diff --git a/deploy/local/docker-compose/vector-kafka-clickhouse-libp2p.yaml b/deploy/local/docker-compose/vector-kafka-clickhouse-libp2p.yaml index 25465646..b5f52986 100644 --- a/deploy/local/docker-compose/vector-kafka-clickhouse-libp2p.yaml +++ b/deploy/local/docker-compose/vector-kafka-clickhouse-libp2p.yaml @@ -793,7 +793,7 @@ transforms: log(., level: "error", rate_limit_secs: 60) } - .slot = .data.data.slot + .slot = .data.slot slot_start_date_time, err = parse_timestamp(.meta.client.additional_data.slot.start_date_time, format: "%+"); if err == null { .slot_start_date_time = to_unix_timestamp(slot_start_date_time) @@ -834,7 +834,7 @@ transforms: .propagation_slot_start_diff = .meta.client.additional_data.propagation.slot_start_diff .proposer_index = .data.proposer_index - .blob_index = .data.data.blob_index + .blob_index = .data.index peer_id_key, err = .meta.client.additional_data.metadata.peer_id + .meta.ethereum.network.name if err != null { diff --git a/deploy/migrations/clickhouse/037_libp2p_gossipsub_blob_sidecar.up.sql b/deploy/migrations/clickhouse/037_libp2p_gossipsub_blob_sidecar.up.sql index 19d4261c..9e277fd4 100644 --- a/deploy/migrations/clickhouse/037_libp2p_gossipsub_blob_sidecar.up.sql +++ b/deploy/migrations/clickhouse/037_libp2p_gossipsub_blob_sidecar.up.sql @@ -15,7 +15,7 @@ ON CLUSTER '{cluster}' ( proposer_index UInt32 CODEC(ZSTD(1)), blob_index UInt32 CODEC(ZSTD(1)), peer_id_unique_key Int64, - message_id String CODEC(ZSTD(1)),z + message_id String CODEC(ZSTD(1)), message_size UInt32 Codec(ZSTD(1)), topic_layer LowCardinality(String), topic_fork_digest_value LowCardinality(String), @@ -37,9 +37,9 @@ ON CLUSTER '{cluster}' ( meta_client_geo_autonomous_system_organization Nullable(String) Codec(ZSTD(1)), meta_network_id Int32 Codec(DoubleDelta, ZSTD(1)), meta_network_name LowCardinality(String) -) Engine = ReplicatedMergeTree('/clickhouse/{installation}/{cluster}/tables/{shard}/{database}/{table}', '{replica}') +) Engine = ReplicatedReplacingMergeTree('/clickhouse/{installation}/{cluster}/tables/{shard}/{database}/{table}', '{replica}', updated_date_time) PARTITION BY toStartOfMonth(slot_start_date_time) -ORDER BY (slot_start_date_time, meta_network_name, meta_client_name); +ORDER BY (slot_start_date_time, unique_key, meta_network_name, meta_client_name); ALTER TABLE libp2p_gossipsub_blob_sidecar_local ON CLUSTER '{cluster}' @@ -85,4 +85,4 @@ COMMENT COLUMN meta_network_name 'Name of the network associated with the client CREATE TABLE libp2p_gossipsub_blob_sidecar ON CLUSTER '{cluster}' AS libp2p_gossipsub_blob_sidecar_local -ENGINE = Distributed('{cluster}', '{database}', 'libp2p_gossipsub_blob_sidecar_local', rand()); \ No newline at end of file +ENGINE = Distributed('{cluster}', default, libp2p_gossipsub_blob_sidecar_local, unique_key); \ No newline at end of file diff --git a/pkg/clmimicry/gossipsub_blob_sidecar.go b/pkg/clmimicry/gossipsub_blob_sidecar.go index e11dd5ae..ab47b2f6 100644 --- a/pkg/clmimicry/gossipsub_blob_sidecar.go +++ b/pkg/clmimicry/gossipsub_blob_sidecar.go @@ -28,7 +28,7 @@ func (m *Mimicry) handleGossipBlobSidecar( return fmt.Errorf("invalid slot") } - blobIndex, ok := payload["BlobIndex"].(uint64) + blobIndex, ok := payload["index"].(uint64) if !ok { return fmt.Errorf("invalid blob index") } @@ -65,7 +65,7 @@ func (m *Mimicry) handleGossipBlobSidecar( Id: uuid.New().String(), }, Meta: &xatu.Meta{ - Client: clientMeta, + Client: metadata, }, Data: &xatu.DecoratedEvent_Libp2PTraceGossipsubBlobSidecar{ Libp2PTraceGossipsubBlobSidecar: data, @@ -105,7 +105,7 @@ func (m *Mimicry) createAdditionalGossipSubBlobSidecarData( return nil, fmt.Errorf("invalid message ID") } - msgSize, ok := payload["MsgSize"].(uint32) + msgSize, ok := payload["MsgSize"].(int) if !ok { return nil, fmt.Errorf("invalid message size") } @@ -136,8 +136,9 @@ func (m *Mimicry) createAdditionalGossipSubBlobSidecarData( PeerId: wrapperspb.String(peerID), }, Topic: wrapperspb.String(topic), - MessageSize: wrapperspb.UInt32(msgSize), + MessageSize: wrapperspb.UInt32(uint32(msgSize)), MessageId: wrapperspb.String(msgID), } + return data, nil }