Skip to content

Commit

Permalink
Add blob handler to capture blob p2p messages and produce event
Browse files Browse the repository at this point in the history
  • Loading branch information
0x00101010 committed May 3, 2024
1 parent 307e7aa commit ab67cde
Show file tree
Hide file tree
Showing 3 changed files with 58 additions and 2 deletions.
4 changes: 2 additions & 2 deletions eth/node_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -383,7 +383,7 @@ func desiredPubSubBaseTopics() []string {
p2p.GossipContributionAndProofMessage,
// p2p.GossipSyncCommitteeMessage,
p2p.GossipBlsToExecutionChangeMessage,
// p2p.GossipBlobSidecarMessage,
p2p.GossipBlobSidecarMessage,
}
}

Expand Down Expand Up @@ -441,7 +441,7 @@ func hasSubnets(topic string) (subnets uint64, hasSubnets bool) {
}

func (n *NodeConfig) composeEthTopic(base string, encoder encoder.NetworkEncoding, subnet uint64) string {
if subnet > 1 { // as far as I know, there aren't subnets with index 0
if subnet > 0 { // as far as I know, there aren't subnets with index 0
return fmt.Sprintf(base, n.ForkDigest, subnet) + encoder.ProtocolSuffix()
} else {
return fmt.Sprintf(base, n.ForkDigest) + encoder.ProtocolSuffix()
Expand Down
55 changes: 55 additions & 0 deletions eth/pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,8 @@ func (p *PubSub) mapPubSubTopicWithHandlers(topic string) host.TopicHandler {
switch {
case strings.Contains(topic, p2p.GossipBlockMessage):
return p.handleBeaconBlock
case strings.Contains(topic, p2p.GossipBlobSidecarMessage):
return p.handleBlobSidecar
default:
return p.host.TracedTopicHandler(host.NoopHandler)
}
Expand Down Expand Up @@ -165,6 +167,59 @@ func (p *PubSub) handleBeaconBlock(ctx context.Context, msg *pubsub.Message) err
return nil
}

func (p *PubSub) handleBlobSidecar(ctx context.Context, msg *pubsub.Message) error {
switch p.cfg.ForkVersion {
case DenebForkVersion:
var blob ethtypes.BlobSidecar
err := p.cfg.Encoder.DecodeGossip(msg.Data, &blob)
if err != nil {
slog.Error("decode blob sidecar gossip message", tele.LogAttrError(err))
return err
}

slot := blob.GetSignedBlockHeader().GetHeader().GetSlot()
slotStart := p.cfg.GenesisTime.Add(time.Duration(slot) * p.cfg.SecondsPerSlot)
proposerIndex := blob.GetSignedBlockHeader().GetHeader().GetProposerIndex()

now := time.Now()
evt := &host.TraceEvent{
Type: "HANDLE_MESSAGE",
PeerID: p.host.ID(),
Timestamp: now,
Payload: map[string]any{
"PeerID": msg.ReceivedFrom.String(),
"MsgID": hex.EncodeToString([]byte(msg.ID)),
"MsgSize": len(msg.Data),
"Topic": msg.GetTopic(),
"Seq": msg.GetSeqno(),
"Slot": slot,
"ValIdx": proposerIndex,
"TimeInSlot": now.Sub(slotStart).Seconds(),
},
}
slog.Info(
"Handling blob gossip message",
"PeerID", p.host.ID(),
"RemotePeerID", msg.ReceivedFrom.String(),
"MsgID", hex.EncodeToString([]byte(msg.ID)),
"MsgSize", len(msg.Data),
"Topic", msg.GetTopic(),
"Seq", msg.GetSeqno(),
"Slot", slot,
"ValIdx", proposerIndex,
"TimeInSlot", now.Sub(slotStart).Seconds(),
)

if err := p.cfg.DataStream.PutRecord(ctx, evt); err != nil {
slog.Warn("failed putting topic handler event", tele.LogAttrError(err))
}
default:
return fmt.Errorf("non recognized fork-version: %d", p.cfg.ForkVersion[:])
}

return nil
}

type GenericSignedBeaconBlock interface {
GetBlock() GenericBeaconBlock
}
Expand Down
1 change: 1 addition & 0 deletions host/host.go
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,7 @@ func (h *Host) PrivateListenMaddr() (ma.Multiaddr, error) {

func (h *Host) TracedTopicHandler(handler TopicHandler) TopicHandler {
return func(ctx context.Context, msg *pubsub.Message) error {
slog.Debug("Handling gossip message", "topic", msg.GetTopic())
evt := &TraceEvent{
Type: "HANDLE_MESSAGE",
PeerID: h.ID(),
Expand Down

0 comments on commit ab67cde

Please sign in to comment.