Skip to content

Commit

Permalink
Merge pull request #23 from 0x00101010/message_handlers
Browse files Browse the repository at this point in the history
feat: All the remaining pubsub message handlers
  • Loading branch information
guillaumemichel committed May 17, 2024
2 parents b3e91a8 + 66b1d87 commit e362cd6
Show file tree
Hide file tree
Showing 2 changed files with 273 additions and 5 deletions.
9 changes: 5 additions & 4 deletions eth/node_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -376,12 +376,12 @@ func desiredPubSubBaseTopics() []string {
return []string{
p2p.GossipBlockMessage,
p2p.GossipAggregateAndProofMessage,
// p2p.GossipAttestationMessage,
p2p.GossipAttestationMessage,
p2p.GossipExitMessage,
p2p.GossipAttesterSlashingMessage,
p2p.GossipProposerSlashingMessage,
p2p.GossipContributionAndProofMessage,
// p2p.GossipSyncCommitteeMessage,
p2p.GossipSyncCommitteeMessage,
p2p.GossipBlsToExecutionChangeMessage,
p2p.GossipBlobSidecarMessage,
}
Expand Down Expand Up @@ -475,8 +475,9 @@ func (n *NodeConfig) getDefaultTopicScoreParams(encoder encoder.NetworkEncoding,
desiredTopics := n.getDesiredFullTopics(encoder)
topicScores := make(map[string]*pubsub.TopicScoreParams, len(desiredTopics))
for _, topic := range desiredTopics {
params := topicToScoreParamsMapper(topic, activeValidators)
topicScores[topic] = params
if params := topicToScoreParamsMapper(topic, activeValidators); params != nil {
topicScores[topic] = params
}
}
return topicScores
}
269 changes: 268 additions & 1 deletion eth/pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ import (
"github.com/probe-lab/hermes/tele"
)

const eventTypeHandleMessage = "HANDLE_MESSAGE"

type PubSubConfig struct {
Topics []string
ForkVersion ForkVersion
Expand Down Expand Up @@ -109,6 +111,22 @@ func (p *PubSub) mapPubSubTopicWithHandlers(topic string) host.TopicHandler {
switch {
case strings.Contains(topic, p2p.GossipBlockMessage):
return p.handleBeaconBlock
case strings.Contains(topic, p2p.GossipAggregateAndProofMessage):
return p.handleAggregateAndProof
case strings.Contains(topic, p2p.GossipAttestationMessage):
return p.handleAttestation
case strings.Contains(topic, p2p.GossipExitMessage):
return p.handleExitMessage
case strings.Contains(topic, p2p.GossipAttesterSlashingMessage):
return p.handleAttesterSlashingMessage
case strings.Contains(topic, p2p.GossipProposerSlashingMessage):
return p.handleProposerSlashingMessage
case strings.Contains(topic, p2p.GossipContributionAndProofMessage):
return p.handleContributtionAndProofMessage
case strings.Contains(topic, p2p.GossipSyncCommitteeMessage):
return p.handleSyncCommitteeMessage
case strings.Contains(topic, p2p.GossipBlsToExecutionChangeMessage):
return p.handleBlsToExecutionChangeMessage
case strings.Contains(topic, p2p.GossipBlobSidecarMessage):
return p.handleBlobSidecar
default:
Expand Down Expand Up @@ -146,7 +164,7 @@ func (p *PubSub) handleBeaconBlock(ctx context.Context, msg *pubsub.Message) err
slotStart := p.cfg.GenesisTime.Add(time.Duration(slot) * p.cfg.SecondsPerSlot)

evt := &host.TraceEvent{
Type: "HANDLE_MESSAGE",
Type: eventTypeHandleMessage,
PeerID: p.host.ID(),
Timestamp: now,
Payload: map[string]any{
Expand All @@ -168,6 +186,255 @@ func (p *PubSub) handleBeaconBlock(ctx context.Context, msg *pubsub.Message) err
return nil
}

func (p *PubSub) handleAttestation(ctx context.Context, msg *pubsub.Message) error {
if msg == nil || msg.Topic == nil || *msg.Topic == "" {
return fmt.Errorf("nil message or topic")
}

attestation := ethtypes.Attestation{}
err := p.cfg.Encoder.DecodeGossip(msg.Data, &attestation)
if err != nil {
return fmt.Errorf("decode attestation gossip message: %w", err)
}

now := time.Now()
evt := &host.TraceEvent{
Type: eventTypeHandleMessage,
PeerID: p.host.ID(),
Timestamp: now,
Payload: map[string]any{
"PeerID": msg.ReceivedFrom.String(),
"MsgID": hex.EncodeToString([]byte(msg.ID)),
"MsgSize": len(msg.Data),
"Topic": msg.GetTopic(),
"Seq": msg.GetSeqno(),
"CommIdx": attestation.GetData().GetCommitteeIndex(),
"Slot": attestation.GetData().GetSlot(),
"BeaconBlockRoot": attestation.GetData().GetBeaconBlockRoot(),
"Source": attestation.GetData().GetSource(),
"Target": attestation.GetData().GetTarget(),
},
}

if err := p.cfg.DataStream.PutRecord(ctx, evt); err != nil {
slog.Warn("failed putting topic handler event", tele.LogAttrError(err))
}

return nil
}

func (p *PubSub) handleAggregateAndProof(ctx context.Context, msg *pubsub.Message) error {
ap := &ethtypes.SignedAggregateAttestationAndProof{}
if err := p.cfg.Encoder.DecodeGossip(msg.Data, ap); err != nil {
return fmt.Errorf("decode aggregate and proof message: %w", err)
}

evt := &host.TraceEvent{
Type: eventTypeHandleMessage,
PeerID: p.host.ID(),
Timestamp: time.Now(),
Payload: map[string]any{
"PeerID": msg.ReceivedFrom.String(),
"MsgID": hex.EncodeToString([]byte(msg.ID)),
"MsgSize": len(msg.Data),
"Topic": msg.GetTopic(),
"Seq": msg.GetSeqno(),
"Sig": hexutil.Encode(ap.GetSignature()),
"AggIdx": ap.GetMessage().GetAggregatorIndex(),
"SelectionProof": hexutil.Encode(ap.GetMessage().GetSelectionProof()),
// There are other details in the SignedAggregateAttestationAndProof message, add them when needed.
},
}

if err := p.cfg.DataStream.PutRecord(ctx, evt); err != nil {
slog.Warn(
"failed putting topic handler event",
"topic", msg.GetTopic(),
"err", tele.LogAttrError(err),
)
}

return nil
}

func (p *PubSub) handleExitMessage(ctx context.Context, msg *pubsub.Message) error {
ve := &ethtypes.VoluntaryExit{}
if err := p.cfg.Encoder.DecodeGossip(msg.Data, ve); err != nil {
return fmt.Errorf("decode voluntary exit message: %w", err)
}

evt := &host.TraceEvent{
Type: eventTypeHandleMessage,
PeerID: p.host.ID(),
Timestamp: time.Now(),
Payload: map[string]any{
"PeerID": msg.ReceivedFrom.String(),
"MsgID": hex.EncodeToString([]byte(msg.ID)),
"MsgSize": len(msg.Data),
"Topic": msg.GetTopic(),
"Seq": msg.GetSeqno(),
"Epoch": ve.GetEpoch(),
"ValIdx": ve.GetValidatorIndex(),
},
}

if err := p.cfg.DataStream.PutRecord(ctx, evt); err != nil {
slog.Warn("failed putting voluntary exit event", tele.LogAttrError(err))
}

return nil
}

func (p *PubSub) handleAttesterSlashingMessage(ctx context.Context, msg *pubsub.Message) error {
as := &ethtypes.AttesterSlashing{}
if err := p.cfg.Encoder.DecodeGossip(msg.Data, as); err != nil {
return fmt.Errorf("decode attester slashing message: %w", err)
}

evt := &host.TraceEvent{
Type: eventTypeHandleMessage,
PeerID: p.host.ID(),
Timestamp: time.Now(),
Payload: map[string]any{
"PeerID": msg.ReceivedFrom.String(),
"MsgID": hex.EncodeToString([]byte(msg.ID)),
"MsgSize": len(msg.Data),
"Topic": msg.GetTopic(),
"Seq": msg.GetSeqno(),
"Att1_indices": as.GetAttestation_1().GetAttestingIndices(),
"Att2_indices": as.GetAttestation_2().GetAttestingIndices(),
},
}

if err := p.cfg.DataStream.PutRecord(ctx, evt); err != nil {
slog.Warn("failed putting attester slashing event", tele.LogAttrError(err))
}

return nil
}

func (p *PubSub) handleProposerSlashingMessage(ctx context.Context, msg *pubsub.Message) error {
ps := &ethtypes.ProposerSlashing{}
if err := p.cfg.Encoder.DecodeGossip(msg.Data, ps); err != nil {
return fmt.Errorf("decode proposer slashing message: %w", err)
}

evt := &host.TraceEvent{
Type: eventTypeHandleMessage,
PeerID: p.host.ID(),
Timestamp: time.Now(),
Payload: map[string]any{
"PeerID": msg.ReceivedFrom.String(),
"MsgID": hex.EncodeToString([]byte(msg.ID)),
"MsgSize": len(msg.Data),
"Topic": msg.GetTopic(),
"Seq": msg.GetSeqno(),
"Header1_Slot": ps.GetHeader_1().GetHeader().GetSlot(),
"Header1_ProposerIndex": ps.GetHeader_1().GetHeader().GetProposerIndex(),
"Header1_StateRoot": hexutil.Encode(ps.GetHeader_1().GetHeader().GetStateRoot()),
"Header2_Slot": ps.GetHeader_2().GetHeader().GetSlot(),
"Header2_ProposerIndex": ps.GetHeader_2().GetHeader().GetProposerIndex(),
"Header2_StateRoot": hexutil.Encode(ps.GetHeader_2().GetHeader().GetStateRoot()),
},
}

if err := p.cfg.DataStream.PutRecord(ctx, evt); err != nil {
slog.Warn("failed putting proposer slashing event", tele.LogAttrError(err))
}

return nil
}

func (p *PubSub) handleContributtionAndProofMessage(ctx context.Context, msg *pubsub.Message) error {
cp := &ethtypes.SignedContributionAndProof{}
if err := p.cfg.Encoder.DecodeGossip(msg.Data, cp); err != nil {
return fmt.Errorf("decode contribution and proof message: %w", err)
}

evt := &host.TraceEvent{
Type: eventTypeHandleMessage,
PeerID: p.host.ID(),
Timestamp: time.Now(),
Payload: map[string]any{
"PeerID": msg.ReceivedFrom.String(),
"MsgID": hex.EncodeToString([]byte(msg.ID)),
"MsgSize": len(msg.Data),
"Topic": msg.GetTopic(),
"Seq": msg.GetSeqno(),
"Sig": hexutil.Encode(cp.GetSignature()),
"AggIdx": cp.GetMessage().GetAggregatorIndex(),
"Contrib_Slot": cp.GetMessage().GetContribution().GetSlot(),
"Contrib_SubCommitteeIdx": cp.GetMessage().GetContribution().GetSubcommitteeIndex(),
"Contrib_BlockRoot": cp.GetMessage().GetContribution().GetBlockRoot(),
},
}

if err := p.cfg.DataStream.PutRecord(ctx, evt); err != nil {
slog.Warn("failed putting contribution and proof event", tele.LogAttrError(err))
}

return nil
}

func (p *PubSub) handleSyncCommitteeMessage(ctx context.Context, msg *pubsub.Message) error {
sc := &ethtypes.SyncCommitteeMessage{}
if err := p.cfg.Encoder.DecodeGossip(msg.Data, sc); err != nil {
return fmt.Errorf("decode sync committee message: %w", err)
}

evt := &host.TraceEvent{
Type: eventTypeHandleMessage,
PeerID: p.host.ID(),
Timestamp: time.Now(),
Payload: map[string]any{
"PeerID": msg.ReceivedFrom.String(),
"MsgID": hex.EncodeToString([]byte(msg.ID)),
"MsgSize": len(msg.Data),
"Topic": msg.GetTopic(),
"Seq": msg.GetSeqno(),
"Slot": sc.GetSlot(),
"ValIdx": sc.GetValidatorIndex(),
"BlockRoot": hexutil.Encode(sc.GetBlockRoot()),
"Signature": hexutil.Encode(sc.GetSignature()),
},
}

if err := p.cfg.DataStream.PutRecord(ctx, evt); err != nil {
slog.Warn("failed putting sync committee event", tele.LogAttrError(err))
}

return nil
}

func (p *PubSub) handleBlsToExecutionChangeMessage(ctx context.Context, msg *pubsub.Message) error {
pb := &ethtypes.BLSToExecutionChange{}
if err := p.cfg.Encoder.DecodeGossip(msg.Data, pb); err != nil {
return fmt.Errorf("decode bls to execution change message: %w", err)
}

evt := &host.TraceEvent{
Type: eventTypeHandleMessage,
PeerID: p.host.ID(),
Timestamp: time.Now(),
Payload: map[string]any{
"PeerID": msg.ReceivedFrom.String(),
"MsgID": hex.EncodeToString([]byte(msg.ID)),
"MsgSize": len(msg.Data),
"Topic": msg.GetTopic(),
"Seq": msg.GetSeqno(),
"ValIdx": pb.GetValidatorIndex(),
"FromBlsPubkey": hexutil.Encode(pb.GetFromBlsPubkey()),
"ToExecutionAddress": hexutil.Encode(pb.GetToExecutionAddress()),
},
}

if err := p.cfg.DataStream.PutRecord(ctx, evt); err != nil {
slog.Warn("failed putting bls to execution change event", tele.LogAttrError(err))
}

return nil
}

func (p *PubSub) handleBlobSidecar(ctx context.Context, msg *pubsub.Message) error {
switch p.cfg.ForkVersion {
case DenebForkVersion:
Expand Down

0 comments on commit e362cd6

Please sign in to comment.