Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: All the remaining pubsub message handlers #23

Merged
merged 9 commits into from
May 17, 2024
9 changes: 5 additions & 4 deletions eth/node_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -376,12 +376,12 @@ func desiredPubSubBaseTopics() []string {
return []string{
p2p.GossipBlockMessage,
p2p.GossipAggregateAndProofMessage,
// p2p.GossipAttestationMessage,
p2p.GossipAttestationMessage,
p2p.GossipExitMessage,
p2p.GossipAttesterSlashingMessage,
p2p.GossipProposerSlashingMessage,
p2p.GossipContributionAndProofMessage,
// p2p.GossipSyncCommitteeMessage,
p2p.GossipSyncCommitteeMessage,
p2p.GossipBlsToExecutionChangeMessage,
// p2p.GossipBlobSidecarMessage,
}
Expand Down Expand Up @@ -475,8 +475,9 @@ func (n *NodeConfig) getDefaultTopicScoreParams(encoder encoder.NetworkEncoding,
desiredTopics := n.getDesiredFullTopics(encoder)
topicScores := make(map[string]*pubsub.TopicScoreParams, len(desiredTopics))
for _, topic := range desiredTopics {
params := topicToScoreParamsMapper(topic, activeValidators)
topicScores[topic] = params
if params := topicToScoreParamsMapper(topic, activeValidators); params != nil {
topicScores[topic] = params
}
}
return topicScores
}
270 changes: 269 additions & 1 deletion eth/pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"strings"
"time"

"github.com/ethereum/go-ethereum/common/hexutil"
pubsub "github.com/libp2p/go-libp2p-pubsub"
pubsubpb "github.com/libp2p/go-libp2p-pubsub/pb"
"github.com/libp2p/go-libp2p/core/peer"
Expand All @@ -21,6 +22,8 @@ import (
"github.com/probe-lab/hermes/tele"
)

const eventTypeHandleMessage = "HANDLE_MESSAGE"

type PubSubConfig struct {
Topics []string
ForkVersion ForkVersion
Expand Down Expand Up @@ -108,6 +111,22 @@ func (p *PubSub) mapPubSubTopicWithHandlers(topic string) host.TopicHandler {
switch {
case strings.Contains(topic, p2p.GossipBlockMessage):
return p.handleBeaconBlock
case strings.Contains(topic, p2p.GossipAggregateAndProofMessage):
return p.handleAggregateAndProof
case strings.Contains(topic, p2p.GossipAttestationMessage):
return p.handleAttestation
case strings.Contains(topic, p2p.GossipExitMessage):
return p.handleExitMessage
case strings.Contains(topic, p2p.GossipAttesterSlashingMessage):
return p.handleAttesterSlashingMessage
case strings.Contains(topic, p2p.GossipProposerSlashingMessage):
return p.handleProposerSlashingMessage
case strings.Contains(topic, p2p.GossipContributionAndProofMessage):
return p.handleContributtionAndProofMessage
case strings.Contains(topic, p2p.GossipSyncCommitteeMessage):
return p.handleSyncCommitteeMessage
case strings.Contains(topic, p2p.GossipBlsToExecutionChangeMessage):
return p.handleBlsToExecutionChangeMessage
default:
return p.host.TracedTopicHandler(host.NoopHandler)
}
Expand Down Expand Up @@ -143,7 +162,7 @@ func (p *PubSub) handleBeaconBlock(ctx context.Context, msg *pubsub.Message) err
slotStart := p.cfg.GenesisTime.Add(time.Duration(slot) * p.cfg.SecondsPerSlot)

evt := &host.TraceEvent{
Type: "HANDLE_MESSAGE",
Type: eventTypeHandleMessage,
PeerID: p.host.ID(),
Timestamp: now,
Payload: map[string]any{
Expand All @@ -165,6 +184,255 @@ func (p *PubSub) handleBeaconBlock(ctx context.Context, msg *pubsub.Message) err
return nil
}

func (p *PubSub) handleAttestation(ctx context.Context, msg *pubsub.Message) error {
if msg == nil || msg.Topic == nil || *msg.Topic == "" {
return fmt.Errorf("nil message or topic")
}

attestation := ethtypes.Attestation{}
err := p.cfg.Encoder.DecodeGossip(msg.Data, &attestation)
if err != nil {
return fmt.Errorf("decode attestation gossip message: %w", err)
}

now := time.Now()
evt := &host.TraceEvent{
Type: eventTypeHandleMessage,
PeerID: p.host.ID(),
Timestamp: now,
Payload: map[string]any{
"PeerID": msg.ReceivedFrom.String(),
"MsgID": hex.EncodeToString([]byte(msg.ID)),
"MsgSize": len(msg.Data),
"Topic": msg.GetTopic(),
"Seq": msg.GetSeqno(),
"CommIdx": attestation.GetData().GetCommitteeIndex(),
"Slot": attestation.GetData().GetSlot(),
"BeaconBlockRoot": attestation.GetData().GetBeaconBlockRoot(),
"Source": attestation.GetData().GetSource(),
"Target": attestation.GetData().GetTarget(),
},
}

if err := p.cfg.DataStream.PutRecord(ctx, evt); err != nil {
slog.Warn("failed putting topic handler event", tele.LogAttrError(err))
}

return nil
}

func (p *PubSub) handleAggregateAndProof(ctx context.Context, msg *pubsub.Message) error {
ap := &ethtypes.SignedAggregateAttestationAndProof{}
if err := p.cfg.Encoder.DecodeGossip(msg.Data, ap); err != nil {
return fmt.Errorf("decode aggregate and proof message: %w", err)
}

evt := &host.TraceEvent{
Type: eventTypeHandleMessage,
PeerID: p.host.ID(),
Timestamp: time.Now(),
Payload: map[string]any{
"PeerID": msg.ReceivedFrom.String(),
"MsgID": hex.EncodeToString([]byte(msg.ID)),
"MsgSize": len(msg.Data),
"Topic": msg.GetTopic(),
"Seq": msg.GetSeqno(),
"Sig": hexutil.Encode(ap.GetSignature()),
"AggIdx": ap.GetMessage().GetAggregatorIndex(),
"SelectionProof": hexutil.Encode(ap.GetMessage().GetSelectionProof()),
// There are other details in the SignedAggregateAttestationAndProof message, add them when needed.
},
}

if err := p.cfg.DataStream.PutRecord(ctx, evt); err != nil {
slog.Warn(
"failed putting topic handler event",
"topic", msg.GetTopic(),
"err", tele.LogAttrError(err),
)
}

return nil
}

func (p *PubSub) handleExitMessage(ctx context.Context, msg *pubsub.Message) error {
ve := &ethtypes.VoluntaryExit{}
if err := p.cfg.Encoder.DecodeGossip(msg.Data, ve); err != nil {
return fmt.Errorf("decode voluntary exit message: %w", err)
}

evt := &host.TraceEvent{
Type: eventTypeHandleMessage,
PeerID: p.host.ID(),
Timestamp: time.Now(),
Payload: map[string]any{
"PeerID": msg.ReceivedFrom.String(),
"MsgID": hex.EncodeToString([]byte(msg.ID)),
"MsgSize": len(msg.Data),
"Topic": msg.GetTopic(),
"Seq": msg.GetSeqno(),
"Epoch": ve.GetEpoch(),
"ValIdx": ve.GetValidatorIndex(),
},
}

if err := p.cfg.DataStream.PutRecord(ctx, evt); err != nil {
slog.Warn("failed putting voluntary exit event", tele.LogAttrError(err))
}

return nil
}

func (p *PubSub) handleAttesterSlashingMessage(ctx context.Context, msg *pubsub.Message) error {
as := &ethtypes.AttesterSlashing{}
if err := p.cfg.Encoder.DecodeGossip(msg.Data, as); err != nil {
return fmt.Errorf("decode attester slashing message: %w", err)
}

evt := &host.TraceEvent{
Type: eventTypeHandleMessage,
PeerID: p.host.ID(),
Timestamp: time.Now(),
Payload: map[string]any{
"PeerID": msg.ReceivedFrom.String(),
"MsgID": hex.EncodeToString([]byte(msg.ID)),
"MsgSize": len(msg.Data),
"Topic": msg.GetTopic(),
"Seq": msg.GetSeqno(),
"Att1_indices": as.GetAttestation_1().GetAttestingIndices(),
"Att2_indices": as.GetAttestation_2().GetAttestingIndices(),
},
}

if err := p.cfg.DataStream.PutRecord(ctx, evt); err != nil {
slog.Warn("failed putting attester slashing event", tele.LogAttrError(err))
}

return nil
}

func (p *PubSub) handleProposerSlashingMessage(ctx context.Context, msg *pubsub.Message) error {
ps := &ethtypes.ProposerSlashing{}
if err := p.cfg.Encoder.DecodeGossip(msg.Data, ps); err != nil {
return fmt.Errorf("decode proposer slashing message: %w", err)
}

evt := &host.TraceEvent{
Type: eventTypeHandleMessage,
PeerID: p.host.ID(),
Timestamp: time.Now(),
Payload: map[string]any{
"PeerID": msg.ReceivedFrom.String(),
"MsgID": hex.EncodeToString([]byte(msg.ID)),
"MsgSize": len(msg.Data),
"Topic": msg.GetTopic(),
"Seq": msg.GetSeqno(),
"Header1_Slot": ps.GetHeader_1().GetHeader().GetSlot(),
"Header1_ProposerIndex": ps.GetHeader_1().GetHeader().GetProposerIndex(),
"Header1_StateRoot": hexutil.Encode(ps.GetHeader_1().GetHeader().GetStateRoot()),
"Header2_Slot": ps.GetHeader_2().GetHeader().GetSlot(),
"Header2_ProposerIndex": ps.GetHeader_2().GetHeader().GetProposerIndex(),
"Header2_StateRoot": hexutil.Encode(ps.GetHeader_2().GetHeader().GetStateRoot()),
},
}

if err := p.cfg.DataStream.PutRecord(ctx, evt); err != nil {
slog.Warn("failed putting proposer slashing event", tele.LogAttrError(err))
}

return nil
}

func (p *PubSub) handleContributtionAndProofMessage(ctx context.Context, msg *pubsub.Message) error {
cp := &ethtypes.SignedContributionAndProof{}
if err := p.cfg.Encoder.DecodeGossip(msg.Data, cp); err != nil {
return fmt.Errorf("decode contribution and proof message: %w", err)
}

evt := &host.TraceEvent{
Type: eventTypeHandleMessage,
PeerID: p.host.ID(),
Timestamp: time.Now(),
Payload: map[string]any{
"PeerID": msg.ReceivedFrom.String(),
"MsgID": hex.EncodeToString([]byte(msg.ID)),
"MsgSize": len(msg.Data),
"Topic": msg.GetTopic(),
"Seq": msg.GetSeqno(),
"Sig": hexutil.Encode(cp.GetSignature()),
"AggIdx": cp.GetMessage().GetAggregatorIndex(),
"Contrib_Slot": cp.GetMessage().GetContribution().GetSlot(),
"Contrib_SubCommitteeIdx": cp.GetMessage().GetContribution().GetSubcommitteeIndex(),
"Contrib_BlockRoot": cp.GetMessage().GetContribution().GetBlockRoot(),
},
}

if err := p.cfg.DataStream.PutRecord(ctx, evt); err != nil {
slog.Warn("failed putting contribution and proof event", tele.LogAttrError(err))
}

return nil
}

func (p *PubSub) handleSyncCommitteeMessage(ctx context.Context, msg *pubsub.Message) error {
sc := &ethtypes.SyncCommitteeMessage{}
if err := p.cfg.Encoder.DecodeGossip(msg.Data, sc); err != nil {
return fmt.Errorf("decode sync committee message: %w", err)
}

evt := &host.TraceEvent{
Type: eventTypeHandleMessage,
PeerID: p.host.ID(),
Timestamp: time.Now(),
Payload: map[string]any{
"PeerID": msg.ReceivedFrom.String(),
"MsgID": hex.EncodeToString([]byte(msg.ID)),
"MsgSize": len(msg.Data),
"Topic": msg.GetTopic(),
"Seq": msg.GetSeqno(),
"Slot": sc.GetSlot(),
"ValIdx": sc.GetValidatorIndex(),
"BlockRoot": hexutil.Encode(sc.GetBlockRoot()),
"Signature": hexutil.Encode(sc.GetSignature()),
},
}

if err := p.cfg.DataStream.PutRecord(ctx, evt); err != nil {
slog.Warn("failed putting sync committee event", tele.LogAttrError(err))
}

return nil
}

func (p *PubSub) handleBlsToExecutionChangeMessage(ctx context.Context, msg *pubsub.Message) error {
pb := &ethtypes.BLSToExecutionChange{}
if err := p.cfg.Encoder.DecodeGossip(msg.Data, pb); err != nil {
return fmt.Errorf("decode bls to execution change message: %w", err)
}

evt := &host.TraceEvent{
Type: eventTypeHandleMessage,
PeerID: p.host.ID(),
Timestamp: time.Now(),
Payload: map[string]any{
"PeerID": msg.ReceivedFrom.String(),
"MsgID": hex.EncodeToString([]byte(msg.ID)),
"MsgSize": len(msg.Data),
"Topic": msg.GetTopic(),
"Seq": msg.GetSeqno(),
"ValIdx": pb.GetValidatorIndex(),
"FromBlsPubkey": hexutil.Encode(pb.GetFromBlsPubkey()),
"ToExecutionAddress": hexutil.Encode(pb.GetToExecutionAddress()),
},
}

if err := p.cfg.DataStream.PutRecord(ctx, evt); err != nil {
slog.Warn("failed putting bls to execution change event", tele.LogAttrError(err))
}

return nil
}

type GenericSignedBeaconBlock interface {
GetBlock() GenericBeaconBlock
}
Expand Down
Loading