Skip to content

Commit

Permalink
Ensure timestamps are as accurate as possible
Browse files Browse the repository at this point in the history
  • Loading branch information
samcm committed May 27, 2024
1 parent d55cf4a commit 8b015a8
Showing 1 changed file with 25 additions and 9 deletions.
34 changes: 25 additions & 9 deletions eth/pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,8 @@ func (p *PubSub) handleBeaconBlock(ctx context.Context, msg *pubsub.Message) err
}

func (p *PubSub) handleAttestation(ctx context.Context, msg *pubsub.Message) error {
now := time.Now()

if msg == nil || msg.Topic == nil || *msg.Topic == "" {
return fmt.Errorf("nil message or topic")
}
Expand All @@ -199,7 +201,6 @@ func (p *PubSub) handleAttestation(ctx context.Context, msg *pubsub.Message) err
return fmt.Errorf("decode attestation gossip message: %w", err)
}

now := time.Now()
evt := &host.TraceEvent{
Type: eventTypeHandleMessage,
PeerID: p.host.ID(),
Expand All @@ -226,6 +227,8 @@ func (p *PubSub) handleAttestation(ctx context.Context, msg *pubsub.Message) err
}

func (p *PubSub) handleAggregateAndProof(ctx context.Context, msg *pubsub.Message) error {
now := time.Now()

ap := &ethtypes.SignedAggregateAttestationAndProof{}
if err := p.cfg.Encoder.DecodeGossip(msg.Data, ap); err != nil {
return fmt.Errorf("decode aggregate and proof message: %w", err)
Expand All @@ -234,7 +237,7 @@ func (p *PubSub) handleAggregateAndProof(ctx context.Context, msg *pubsub.Messag
evt := &host.TraceEvent{
Type: eventTypeHandleMessage,
PeerID: p.host.ID(),
Timestamp: time.Now(),
Timestamp: now,
Payload: map[string]any{
"PeerID": msg.ReceivedFrom.String(),
"MsgID": hex.EncodeToString([]byte(msg.ID)),
Expand All @@ -260,6 +263,8 @@ func (p *PubSub) handleAggregateAndProof(ctx context.Context, msg *pubsub.Messag
}

func (p *PubSub) handleExitMessage(ctx context.Context, msg *pubsub.Message) error {
now := time.Now()

ve := &ethtypes.VoluntaryExit{}
if err := p.cfg.Encoder.DecodeGossip(msg.Data, ve); err != nil {
return fmt.Errorf("decode voluntary exit message: %w", err)
Expand All @@ -268,7 +273,7 @@ func (p *PubSub) handleExitMessage(ctx context.Context, msg *pubsub.Message) err
evt := &host.TraceEvent{
Type: eventTypeHandleMessage,
PeerID: p.host.ID(),
Timestamp: time.Now(),
Timestamp: now,
Payload: map[string]any{
"PeerID": msg.ReceivedFrom.String(),
"MsgID": hex.EncodeToString([]byte(msg.ID)),
Expand All @@ -288,6 +293,8 @@ func (p *PubSub) handleExitMessage(ctx context.Context, msg *pubsub.Message) err
}

func (p *PubSub) handleAttesterSlashingMessage(ctx context.Context, msg *pubsub.Message) error {
now := time.Now()

as := &ethtypes.AttesterSlashing{}
if err := p.cfg.Encoder.DecodeGossip(msg.Data, as); err != nil {
return fmt.Errorf("decode attester slashing message: %w", err)
Expand All @@ -296,7 +303,7 @@ func (p *PubSub) handleAttesterSlashingMessage(ctx context.Context, msg *pubsub.
evt := &host.TraceEvent{
Type: eventTypeHandleMessage,
PeerID: p.host.ID(),
Timestamp: time.Now(),
Timestamp: now,
Payload: map[string]any{
"PeerID": msg.ReceivedFrom.String(),
"MsgID": hex.EncodeToString([]byte(msg.ID)),
Expand All @@ -316,6 +323,8 @@ func (p *PubSub) handleAttesterSlashingMessage(ctx context.Context, msg *pubsub.
}

func (p *PubSub) handleProposerSlashingMessage(ctx context.Context, msg *pubsub.Message) error {
now := time.Now()

ps := &ethtypes.ProposerSlashing{}
if err := p.cfg.Encoder.DecodeGossip(msg.Data, ps); err != nil {
return fmt.Errorf("decode proposer slashing message: %w", err)
Expand All @@ -324,7 +333,7 @@ func (p *PubSub) handleProposerSlashingMessage(ctx context.Context, msg *pubsub.
evt := &host.TraceEvent{
Type: eventTypeHandleMessage,
PeerID: p.host.ID(),
Timestamp: time.Now(),
Timestamp: now,
Payload: map[string]any{
"PeerID": msg.ReceivedFrom.String(),
"MsgID": hex.EncodeToString([]byte(msg.ID)),
Expand All @@ -348,6 +357,8 @@ func (p *PubSub) handleProposerSlashingMessage(ctx context.Context, msg *pubsub.
}

func (p *PubSub) handleContributtionAndProofMessage(ctx context.Context, msg *pubsub.Message) error {
now := time.Now()

cp := &ethtypes.SignedContributionAndProof{}
if err := p.cfg.Encoder.DecodeGossip(msg.Data, cp); err != nil {
return fmt.Errorf("decode contribution and proof message: %w", err)
Expand All @@ -356,7 +367,7 @@ func (p *PubSub) handleContributtionAndProofMessage(ctx context.Context, msg *pu
evt := &host.TraceEvent{
Type: eventTypeHandleMessage,
PeerID: p.host.ID(),
Timestamp: time.Now(),
Timestamp: now,
Payload: map[string]any{
"PeerID": msg.ReceivedFrom.String(),
"MsgID": hex.EncodeToString([]byte(msg.ID)),
Expand All @@ -379,6 +390,8 @@ func (p *PubSub) handleContributtionAndProofMessage(ctx context.Context, msg *pu
}

func (p *PubSub) handleSyncCommitteeMessage(ctx context.Context, msg *pubsub.Message) error {
now := time.Now()

sc := &ethtypes.SyncCommitteeMessage{}
if err := p.cfg.Encoder.DecodeGossip(msg.Data, sc); err != nil {
return fmt.Errorf("decode sync committee message: %w", err)
Expand All @@ -387,7 +400,7 @@ func (p *PubSub) handleSyncCommitteeMessage(ctx context.Context, msg *pubsub.Mes
evt := &host.TraceEvent{
Type: eventTypeHandleMessage,
PeerID: p.host.ID(),
Timestamp: time.Now(),
Timestamp: now,
Payload: map[string]any{
"PeerID": msg.ReceivedFrom.String(),
"MsgID": hex.EncodeToString([]byte(msg.ID)),
Expand All @@ -409,6 +422,8 @@ func (p *PubSub) handleSyncCommitteeMessage(ctx context.Context, msg *pubsub.Mes
}

func (p *PubSub) handleBlsToExecutionChangeMessage(ctx context.Context, msg *pubsub.Message) error {
now := time.Now()

pb := &ethtypes.BLSToExecutionChange{}
if err := p.cfg.Encoder.DecodeGossip(msg.Data, pb); err != nil {
return fmt.Errorf("decode bls to execution change message: %w", err)
Expand All @@ -417,7 +432,7 @@ func (p *PubSub) handleBlsToExecutionChangeMessage(ctx context.Context, msg *pub
evt := &host.TraceEvent{
Type: eventTypeHandleMessage,
PeerID: p.host.ID(),
Timestamp: time.Now(),
Timestamp: now,
Payload: map[string]any{
"PeerID": msg.ReceivedFrom.String(),
"MsgID": hex.EncodeToString([]byte(msg.ID)),
Expand All @@ -438,6 +453,8 @@ func (p *PubSub) handleBlsToExecutionChangeMessage(ctx context.Context, msg *pub
}

func (p *PubSub) handleBlobSidecar(ctx context.Context, msg *pubsub.Message) error {
now := time.Now()

switch p.cfg.ForkVersion {
case DenebForkVersion:
var blob ethtypes.BlobSidecar
Expand All @@ -451,7 +468,6 @@ func (p *PubSub) handleBlobSidecar(ctx context.Context, msg *pubsub.Message) err
slotStart := p.cfg.GenesisTime.Add(time.Duration(slot) * p.cfg.SecondsPerSlot)
proposerIndex := blob.GetSignedBlockHeader().GetHeader().GetProposerIndex()

now := time.Now()
evt := &host.TraceEvent{
Type: "HANDLE_MESSAGE",
PeerID: p.host.ID(),
Expand Down

0 comments on commit 8b015a8

Please sign in to comment.