Skip to content

Commit

Permalink
refactor: Update Dockerfile and handleGossipDataColumnSidecar
Browse files Browse the repository at this point in the history
  • Loading branch information
samcm committed May 13, 2024
1 parent 0ac4a56 commit 920981e
Show file tree
Hide file tree
Showing 7 changed files with 67 additions and 22 deletions.
4 changes: 3 additions & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
FROM golang:1.22 AS builder
WORKDIR /src
COPY go.sum go.mod ./
RUN go mod download
ARG GOPROXY
ENV GOPROXY=${GOPROXY}
RUN go mod download -x
COPY . .
RUN go build -o /bin/app .

Expand Down
36 changes: 35 additions & 1 deletion deploy/local/docker-compose/vector-kafka-clickhouse-libp2p.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -809,7 +809,7 @@ transforms:
.kzg_proof = .data.kzg_proof
.kzg_commitments_inclusion_proof = .data.kzg_commitments_inclusion_proof
.slot = .data.data.slot
.slot = .data.signed_block_header.message.slot
slot_start_date_time, err = parse_timestamp(.meta.client.additional_data.slot.start_date_time, format: "%+");
if err == null {
.slot_start_date_time = to_unix_timestamp(slot_start_date_time)
Expand Down Expand Up @@ -847,6 +847,20 @@ transforms:
log(., level: "error", rate_limit_secs: 60)
}
topicParts, err = split(.meta.client.additional_data.topic, "/")
if err != null {
.error = err
.error_description = "failed to split topic"
} else {
if length(topicParts) != 5 {
.errDebug = {
"topic": .meta.client.additional_data.topic,
}
.error_description = "failed to split topic"
log(., level: "error", rate_limit_secs: 60)
}
}
.topic_layer = topicParts[1]
.topic_fork_digest_value = topicParts[2]
.topic_name = topicParts[3]
Expand Down Expand Up @@ -1806,3 +1820,23 @@ sinks:
healthcheck:
enabled: true
skip_unknown_fields: false
libp2p_trace_gossipsub_beacon_data_column_sidecar_clickhouse:
type: clickhouse
inputs:
- libp2p_trace_gossipsub_beacon_data_column_sidecar_formatted
database: default
endpoint: "${CLICKHOUSE_ENDPOINT}"
table: libp2p_gossipsub_beacon_data_column_sidecar
auth:
strategy: basic
user: "${CLICKHOUSE_USER}"
password: "${CLICKHOUSE_PASSWORD}"
batch:
max_bytes: 52428800
max_events: 200000
timeout_secs: 1
buffer:
max_events: 200000
healthcheck:
enabled: true
skip_unknown_fields: false
3 changes: 3 additions & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,8 @@ services:
build:
context: .
dockerfile: Dockerfile
args:
GOPROXY: ${GOPROXY}
ports:
- "8080:8080"
- "9096:9090"
Expand Down Expand Up @@ -264,6 +266,7 @@ services:
"libp2p-trace-handle-status"
"libp2p-trace-gossipsub-beacon-block"
"libp2p-trace-gossipsub-beacon-attestation"
"libp2p-trace-gossipsub-beacon-data-column-sidecar"
)
for topic in "$${topics[@]}"; do
echo "Creating topic: $$topic";
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ module github.com/ethpandaops/xatu

go 1.22.0

replace github.com/probe-lab/hermes => github.com/ethpandaops/hermes v0.0.0-20240513080348-eafb396a7027
replace github.com/probe-lab/hermes => github.com/ethpandaops/hermes v0.0.0-20240513121617-967d7180f2b3

require (
github.com/IBM/sarama v1.43.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -266,8 +266,8 @@ github.com/ethpandaops/ethcore v0.0.0-20240422023000-2a5727b18756 h1:8JWjrRfP14m
github.com/ethpandaops/ethcore v0.0.0-20240422023000-2a5727b18756/go.mod h1:ZvKqL6CKxiraefdXPHeJurV2pDD/f2HF2uklDVdrry8=
github.com/ethpandaops/ethwallclock v0.3.0 h1:xF5fwtBf+bHFHZKBnwiPFEuelW3sMM7SD3ZNFq1lJY4=
github.com/ethpandaops/ethwallclock v0.3.0/go.mod h1:y0Cu+mhGLlem19vnAV2x0hpFS5KZ7oOi2SWYayv9l24=
github.com/ethpandaops/hermes v0.0.0-20240513080348-eafb396a7027 h1:Q9QXjCXYsvFmrlZ+F+w1plLufilUVTLS9trGONbV6IE=
github.com/ethpandaops/hermes v0.0.0-20240513080348-eafb396a7027/go.mod h1:Z65YW+OhX67VHToofu2+wZDdMUgQ1AnvaEPgB9GzTC0=
github.com/ethpandaops/hermes v0.0.0-20240513121617-967d7180f2b3 h1:QLESeVZCK5Tq8h3CdtaRcpf8yHKeCVAnTCLk3lV0eBA=
github.com/ethpandaops/hermes v0.0.0-20240513121617-967d7180f2b3/go.mod h1:Z65YW+OhX67VHToofu2+wZDdMUgQ1AnvaEPgB9GzTC0=
github.com/evanphx/json-patch v4.9.0+incompatible/go.mod h1:50XU6AFN0ol/bzJsmQLiYLvXMP4fmwYFNcr97nuDLSk=
github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5KwzbycvMj4=
github.com/fatih/color v1.10.0/go.mod h1:ELkj/draVOlAH/xkhN6mQ50Qd0MPOk5AAr3maGEBuJM=
Expand Down
38 changes: 22 additions & 16 deletions pkg/clmimicry/gossipsub_data_column_sidecar.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package clmimicry

import (
"context"
"errors"
"fmt"
"time"

Expand All @@ -26,26 +27,31 @@ func (m *Mimicry) handleGossipDataColumnSidecar(ctx context.Context,
return fmt.Errorf("invalid sidecar")
}

dataColumn := ""
for _, bytes := range eSidecar.DataColumn {
dataColumn += string(bytes)
row := 0 // PeerDAS is 1d initially

if len(eSidecar.DataColumn) == 0 {
return errors.New("invalid sidecar data column")
}

kzgCommitments := make([]string, len(eSidecar.KzgCommitments))
for i, bytes := range eSidecar.KzgCommitments {
kzgCommitments[i] = fmt.Sprintf("0x%x", bytes)
dataColumn := string(eSidecar.DataColumn[row])

if len(eSidecar.KzgCommitments) == 0 {
return errors.New("invalid sidecar kzg commitments")
}

kzgProof := ""
for _, bytes := range eSidecar.KzgProof {
kzgProof += string(bytes)
kzgCommitments := []string{}

for _, bytes := range eSidecar.KzgCommitments {
kzgCommitments = append(kzgCommitments, fmt.Sprintf("0x%x", bytes))
}

kzgCommitmentsInclusionProof := ""
for _, bytes := range eSidecar.KzgCommitmentsInclusionProof {
kzgCommitmentsInclusionProof += string(bytes)
if len(eSidecar.KzgProof) == 0 {
return errors.New("invalid sidecar kzg proof")
}

kzgProof := eSidecar.KzgProof[row]
kzgCommitmentsInclusionProof := eSidecar.KzgCommitmentsInclusionProof[row]

signedBlockHeader := &xatuethv1.SignedBeaconBlockHeaderV2{
Message: &xatuethv1.BeaconBlockHeaderV2{
Slot: wrapperspb.UInt64(uint64(eSidecar.SignedBlockHeader.GetHeader().GetSlot())),
Expand All @@ -59,11 +65,11 @@ func (m *Mimicry) handleGossipDataColumnSidecar(ctx context.Context,

sidecar := &gossipsub.DataColumnSidecar{
ColumnIndex: wrapperspb.UInt64(eSidecar.ColumnIndex),
DataColumn: dataColumn,
DataColumn: fmt.Sprintf("0x%x", dataColumn),
KzgCommitments: kzgCommitments,
KzgProof: kzgProof,
KzgProof: fmt.Sprintf("0x%x", kzgProof),
SignedBlockHeader: signedBlockHeader,
KzgCommitmentsInclusionProof: kzgCommitmentsInclusionProof,
KzgCommitmentsInclusionProof: fmt.Sprintf("0x%x", kzgCommitmentsInclusionProof),
}

metadata, ok := proto.Clone(clientMeta).(*xatu.ClientMeta)
Expand All @@ -87,7 +93,7 @@ func (m *Mimicry) handleGossipDataColumnSidecar(ctx context.Context,

decoratedEvent := &xatu.DecoratedEvent{
Event: &xatu.Event{
Name: xatu.Event_LIBP2P_TRACE_GOSSIPSUB_BEACON_ATTESTATION,
Name: xatu.Event_LIBP2P_TRACE_GOSSIPSUB_BEACON_DATA_COLUMN_SIDECAR,
DateTime: timestamppb.New(timestamp.Add(m.clockDrift)),
Id: uuid.New().String(),
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ func (gsb *TraceGossipSubBeaconDataColumnSidecar) Type() string {
}

func (gsb *TraceGossipSubBeaconDataColumnSidecar) Validate(ctx context.Context) error {
_, ok := gsb.event.Data.(*xatu.DecoratedEvent_Libp2PTraceGossipsubBeaconAttestation)
_, ok := gsb.event.Data.(*xatu.DecoratedEvent_Libp2PTraceGossipsubBeaconDataColumnSidecar)
if !ok {
return errors.New("failed to cast event data to TraceGossipSubBeaconDataColumnSidecar")
}
Expand Down

0 comments on commit 920981e

Please sign in to comment.