From 82a722725d26dcef6ea3cea9033f35357607406d Mon Sep 17 00:00:00 2001 From: Francis Li Date: Thu, 2 May 2024 16:46:46 -0700 Subject: [PATCH] Add blob handler to capture blob p2p messages and produce event --- eth/node_config.go | 4 ++-- eth/pubsub.go | 47 ++++++++++++++++++++++++++++++++++++++++++++++ host/host.go | 1 + 3 files changed, 50 insertions(+), 2 deletions(-) diff --git a/eth/node_config.go b/eth/node_config.go index fdfbb79..d372483 100644 --- a/eth/node_config.go +++ b/eth/node_config.go @@ -383,7 +383,7 @@ func desiredPubSubBaseTopics() []string { p2p.GossipContributionAndProofMessage, // p2p.GossipSyncCommitteeMessage, p2p.GossipBlsToExecutionChangeMessage, - // p2p.GossipBlobSidecarMessage, + p2p.GossipBlobSidecarMessage, } } @@ -441,7 +441,7 @@ func hasSubnets(topic string) (subnets uint64, hasSubnets bool) { } func (n *NodeConfig) composeEthTopic(base string, encoder encoder.NetworkEncoding, subnet uint64) string { - if subnet > 1 { // as far as I know, there aren't subnets with index 0 + if subnet > 0 { // as far as I know, there aren't subnets with index 0 return fmt.Sprintf(base, n.ForkDigest, subnet) + encoder.ProtocolSuffix() } else { return fmt.Sprintf(base, n.ForkDigest) + encoder.ProtocolSuffix() diff --git a/eth/pubsub.go b/eth/pubsub.go index 91f5a73..540f171 100644 --- a/eth/pubsub.go +++ b/eth/pubsub.go @@ -8,6 +8,7 @@ import ( "strings" "time" + "github.com/ethereum/go-ethereum/common/hexutil" pubsub "github.com/libp2p/go-libp2p-pubsub" pubsubpb "github.com/libp2p/go-libp2p-pubsub/pb" "github.com/libp2p/go-libp2p/core/peer" @@ -108,6 +109,8 @@ func (p *PubSub) mapPubSubTopicWithHandlers(topic string) host.TopicHandler { switch { case strings.Contains(topic, p2p.GossipBlockMessage): return p.handleBeaconBlock + case strings.Contains(topic, p2p.GossipBlobSidecarMessage): + return p.handleBlobSidecar default: return p.host.TracedTopicHandler(host.NoopHandler) } @@ -165,6 +168,50 @@ func (p *PubSub) handleBeaconBlock(ctx context.Context, msg *pubsub.Message) err return nil } +func (p *PubSub) handleBlobSidecar(ctx context.Context, msg *pubsub.Message) error { + switch p.cfg.ForkVersion { + case DenebForkVersion: + var blob ethtypes.BlobSidecar + err := p.cfg.Encoder.DecodeGossip(msg.Data, &blob) + if err != nil { + slog.Error("decode blob sidecar gossip message", tele.LogAttrError(err)) + return err + } + + slot := blob.GetSignedBlockHeader().GetHeader().GetSlot() + slotStart := p.cfg.GenesisTime.Add(time.Duration(slot) * p.cfg.SecondsPerSlot) + proposerIndex := blob.GetSignedBlockHeader().GetHeader().GetProposerIndex() + + now := time.Now() + evt := &host.TraceEvent{ + Type: "HANDLE_MESSAGE", + PeerID: p.host.ID(), + Timestamp: now, + Payload: map[string]any{ + "PeerID": msg.ReceivedFrom.String(), + "MsgID": hex.EncodeToString([]byte(msg.ID)), + "MsgSize": len(msg.Data), + "Topic": msg.GetTopic(), + "Seq": msg.GetSeqno(), + "Slot": slot, + "ValIdx": proposerIndex, + "TimeInSlot": now.Sub(slotStart).Seconds(), + "StateRoot": hexutil.Encode(blob.GetSignedBlockHeader().GetHeader().GetStateRoot()), + "BodyRoot": hexutil.Encode(blob.GetSignedBlockHeader().GetHeader().GetBodyRoot()), + "ParentRoot": hexutil.Encode(blob.GetSignedBlockHeader().GetHeader().GetParentRoot()), + }, + } + + if err := p.cfg.DataStream.PutRecord(ctx, evt); err != nil { + slog.Warn("failed putting topic handler event", tele.LogAttrError(err)) + } + default: + return fmt.Errorf("non recognized fork-version: %d", p.cfg.ForkVersion[:]) + } + + return nil +} + type GenericSignedBeaconBlock interface { GetBlock() GenericBeaconBlock } diff --git a/host/host.go b/host/host.go index f07e0e2..e09dc24 100644 --- a/host/host.go +++ b/host/host.go @@ -270,6 +270,7 @@ func (h *Host) PrivateListenMaddr() (ma.Multiaddr, error) { func (h *Host) TracedTopicHandler(handler TopicHandler) TopicHandler { return func(ctx context.Context, msg *pubsub.Message) error { + slog.Debug("Handling gossip message", "topic", msg.GetTopic()) evt := &TraceEvent{ Type: "HANDLE_MESSAGE", PeerID: h.ID(),