From 920981ef3e3c5a9816a542bd386f295cab168ce9 Mon Sep 17 00:00:00 2001 From: Sam Calder-Mason Date: Mon, 13 May 2024 16:23:18 +0300 Subject: [PATCH] refactor: Update Dockerfile and handleGossipDataColumnSidecar --- Dockerfile | 4 +- .../vector-kafka-clickhouse-libp2p.yaml | 36 +++++++++++++++++- docker-compose.yml | 3 ++ go.mod | 2 +- go.sum | 4 +- .../gossipsub_data_column_sidecar.go | 38 +++++++++++-------- ...ce_gossipsub_beacon_data_column_sidecar.go | 2 +- 7 files changed, 67 insertions(+), 22 deletions(-) diff --git a/Dockerfile b/Dockerfile index 69bb2c34..d7416fae 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,7 +1,9 @@ FROM golang:1.22 AS builder WORKDIR /src COPY go.sum go.mod ./ -RUN go mod download +ARG GOPROXY +ENV GOPROXY=${GOPROXY} +RUN go mod download -x COPY . . RUN go build -o /bin/app . diff --git a/deploy/local/docker-compose/vector-kafka-clickhouse-libp2p.yaml b/deploy/local/docker-compose/vector-kafka-clickhouse-libp2p.yaml index b9c66488..aa67b521 100644 --- a/deploy/local/docker-compose/vector-kafka-clickhouse-libp2p.yaml +++ b/deploy/local/docker-compose/vector-kafka-clickhouse-libp2p.yaml @@ -809,7 +809,7 @@ transforms: .kzg_proof = .data.kzg_proof .kzg_commitments_inclusion_proof = .data.kzg_commitments_inclusion_proof - .slot = .data.data.slot + .slot = .data.signed_block_header.message.slot slot_start_date_time, err = parse_timestamp(.meta.client.additional_data.slot.start_date_time, format: "%+"); if err == null { .slot_start_date_time = to_unix_timestamp(slot_start_date_time) @@ -847,6 +847,20 @@ transforms: log(., level: "error", rate_limit_secs: 60) } + topicParts, err = split(.meta.client.additional_data.topic, "/") + if err != null { + .error = err + .error_description = "failed to split topic" + } else { + if length(topicParts) != 5 { + .errDebug = { + "topic": .meta.client.additional_data.topic, + } + .error_description = "failed to split topic" + log(., level: "error", rate_limit_secs: 60) + } + } + .topic_layer = topicParts[1] .topic_fork_digest_value = topicParts[2] .topic_name = topicParts[3] @@ -1806,3 +1820,23 @@ sinks: healthcheck: enabled: true skip_unknown_fields: false + libp2p_trace_gossipsub_beacon_data_column_sidecar_clickhouse: + type: clickhouse + inputs: + - libp2p_trace_gossipsub_beacon_data_column_sidecar_formatted + database: default + endpoint: "${CLICKHOUSE_ENDPOINT}" + table: libp2p_gossipsub_beacon_data_column_sidecar + auth: + strategy: basic + user: "${CLICKHOUSE_USER}" + password: "${CLICKHOUSE_PASSWORD}" + batch: + max_bytes: 52428800 + max_events: 200000 + timeout_secs: 1 + buffer: + max_events: 200000 + healthcheck: + enabled: true + skip_unknown_fields: false \ No newline at end of file diff --git a/docker-compose.yml b/docker-compose.yml index eef56856..4c3deb53 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -144,6 +144,8 @@ services: build: context: . dockerfile: Dockerfile + args: + GOPROXY: ${GOPROXY} ports: - "8080:8080" - "9096:9090" @@ -264,6 +266,7 @@ services: "libp2p-trace-handle-status" "libp2p-trace-gossipsub-beacon-block" "libp2p-trace-gossipsub-beacon-attestation" + "libp2p-trace-gossipsub-beacon-data-column-sidecar" ) for topic in "$${topics[@]}"; do echo "Creating topic: $$topic"; diff --git a/go.mod b/go.mod index ffc06b39..776ded68 100644 --- a/go.mod +++ b/go.mod @@ -2,7 +2,7 @@ module github.com/ethpandaops/xatu go 1.22.0 -replace github.com/probe-lab/hermes => github.com/ethpandaops/hermes v0.0.0-20240513080348-eafb396a7027 +replace github.com/probe-lab/hermes => github.com/ethpandaops/hermes v0.0.0-20240513121617-967d7180f2b3 require ( github.com/IBM/sarama v1.43.0 diff --git a/go.sum b/go.sum index d98b546f..52a19bf3 100644 --- a/go.sum +++ b/go.sum @@ -266,8 +266,8 @@ github.com/ethpandaops/ethcore v0.0.0-20240422023000-2a5727b18756 h1:8JWjrRfP14m github.com/ethpandaops/ethcore v0.0.0-20240422023000-2a5727b18756/go.mod h1:ZvKqL6CKxiraefdXPHeJurV2pDD/f2HF2uklDVdrry8= github.com/ethpandaops/ethwallclock v0.3.0 h1:xF5fwtBf+bHFHZKBnwiPFEuelW3sMM7SD3ZNFq1lJY4= github.com/ethpandaops/ethwallclock v0.3.0/go.mod h1:y0Cu+mhGLlem19vnAV2x0hpFS5KZ7oOi2SWYayv9l24= -github.com/ethpandaops/hermes v0.0.0-20240513080348-eafb396a7027 h1:Q9QXjCXYsvFmrlZ+F+w1plLufilUVTLS9trGONbV6IE= -github.com/ethpandaops/hermes v0.0.0-20240513080348-eafb396a7027/go.mod h1:Z65YW+OhX67VHToofu2+wZDdMUgQ1AnvaEPgB9GzTC0= +github.com/ethpandaops/hermes v0.0.0-20240513121617-967d7180f2b3 h1:QLESeVZCK5Tq8h3CdtaRcpf8yHKeCVAnTCLk3lV0eBA= +github.com/ethpandaops/hermes v0.0.0-20240513121617-967d7180f2b3/go.mod h1:Z65YW+OhX67VHToofu2+wZDdMUgQ1AnvaEPgB9GzTC0= github.com/evanphx/json-patch v4.9.0+incompatible/go.mod h1:50XU6AFN0ol/bzJsmQLiYLvXMP4fmwYFNcr97nuDLSk= github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5KwzbycvMj4= github.com/fatih/color v1.10.0/go.mod h1:ELkj/draVOlAH/xkhN6mQ50Qd0MPOk5AAr3maGEBuJM= diff --git a/pkg/clmimicry/gossipsub_data_column_sidecar.go b/pkg/clmimicry/gossipsub_data_column_sidecar.go index 3e8dc439..2c652733 100644 --- a/pkg/clmimicry/gossipsub_data_column_sidecar.go +++ b/pkg/clmimicry/gossipsub_data_column_sidecar.go @@ -2,6 +2,7 @@ package clmimicry import ( "context" + "errors" "fmt" "time" @@ -26,26 +27,31 @@ func (m *Mimicry) handleGossipDataColumnSidecar(ctx context.Context, return fmt.Errorf("invalid sidecar") } - dataColumn := "" - for _, bytes := range eSidecar.DataColumn { - dataColumn += string(bytes) + row := 0 // PeerDAS is 1d initially + + if len(eSidecar.DataColumn) == 0 { + return errors.New("invalid sidecar data column") } - kzgCommitments := make([]string, len(eSidecar.KzgCommitments)) - for i, bytes := range eSidecar.KzgCommitments { - kzgCommitments[i] = fmt.Sprintf("0x%x", bytes) + dataColumn := string(eSidecar.DataColumn[row]) + + if len(eSidecar.KzgCommitments) == 0 { + return errors.New("invalid sidecar kzg commitments") } - kzgProof := "" - for _, bytes := range eSidecar.KzgProof { - kzgProof += string(bytes) + kzgCommitments := []string{} + + for _, bytes := range eSidecar.KzgCommitments { + kzgCommitments = append(kzgCommitments, fmt.Sprintf("0x%x", bytes)) } - kzgCommitmentsInclusionProof := "" - for _, bytes := range eSidecar.KzgCommitmentsInclusionProof { - kzgCommitmentsInclusionProof += string(bytes) + if len(eSidecar.KzgProof) == 0 { + return errors.New("invalid sidecar kzg proof") } + kzgProof := eSidecar.KzgProof[row] + kzgCommitmentsInclusionProof := eSidecar.KzgCommitmentsInclusionProof[row] + signedBlockHeader := &xatuethv1.SignedBeaconBlockHeaderV2{ Message: &xatuethv1.BeaconBlockHeaderV2{ Slot: wrapperspb.UInt64(uint64(eSidecar.SignedBlockHeader.GetHeader().GetSlot())), @@ -59,11 +65,11 @@ func (m *Mimicry) handleGossipDataColumnSidecar(ctx context.Context, sidecar := &gossipsub.DataColumnSidecar{ ColumnIndex: wrapperspb.UInt64(eSidecar.ColumnIndex), - DataColumn: dataColumn, + DataColumn: fmt.Sprintf("0x%x", dataColumn), KzgCommitments: kzgCommitments, - KzgProof: kzgProof, + KzgProof: fmt.Sprintf("0x%x", kzgProof), SignedBlockHeader: signedBlockHeader, - KzgCommitmentsInclusionProof: kzgCommitmentsInclusionProof, + KzgCommitmentsInclusionProof: fmt.Sprintf("0x%x", kzgCommitmentsInclusionProof), } metadata, ok := proto.Clone(clientMeta).(*xatu.ClientMeta) @@ -87,7 +93,7 @@ func (m *Mimicry) handleGossipDataColumnSidecar(ctx context.Context, decoratedEvent := &xatu.DecoratedEvent{ Event: &xatu.Event{ - Name: xatu.Event_LIBP2P_TRACE_GOSSIPSUB_BEACON_ATTESTATION, + Name: xatu.Event_LIBP2P_TRACE_GOSSIPSUB_BEACON_DATA_COLUMN_SIDECAR, DateTime: timestamppb.New(timestamp.Add(m.clockDrift)), Id: uuid.New().String(), }, diff --git a/pkg/server/service/event-ingester/event/libp2p/trace_gossipsub_beacon_data_column_sidecar.go b/pkg/server/service/event-ingester/event/libp2p/trace_gossipsub_beacon_data_column_sidecar.go index 0d1121a8..1b1b1a38 100644 --- a/pkg/server/service/event-ingester/event/libp2p/trace_gossipsub_beacon_data_column_sidecar.go +++ b/pkg/server/service/event-ingester/event/libp2p/trace_gossipsub_beacon_data_column_sidecar.go @@ -29,7 +29,7 @@ func (gsb *TraceGossipSubBeaconDataColumnSidecar) Type() string { } func (gsb *TraceGossipSubBeaconDataColumnSidecar) Validate(ctx context.Context) error { - _, ok := gsb.event.Data.(*xatu.DecoratedEvent_Libp2PTraceGossipsubBeaconAttestation) + _, ok := gsb.event.Data.(*xatu.DecoratedEvent_Libp2PTraceGossipsubBeaconDataColumnSidecar) if !ok { return errors.New("failed to cast event data to TraceGossipSubBeaconDataColumnSidecar") }