diff --git a/activation/handler.go b/activation/handler.go index e46400f77a..eb5e808e77 100644 --- a/activation/handler.go +++ b/activation/handler.go @@ -15,7 +15,6 @@ import ( "github.com/spacemeshos/go-spacemesh/datastore" "github.com/spacemeshos/go-spacemesh/events" "github.com/spacemeshos/go-spacemesh/log" - "github.com/spacemeshos/go-spacemesh/metrics" "github.com/spacemeshos/go-spacemesh/p2p" "github.com/spacemeshos/go-spacemesh/p2p/pubsub" "github.com/spacemeshos/go-spacemesh/signing" @@ -509,11 +508,6 @@ func (h *Handler) handleAtx(ctx context.Context, expHash types.Hash32, peer p2p. return fmt.Errorf("%w: %w", errMalformedData, err) } - epochStart := h.clock.LayerToTime(atx.PublishEpoch.FirstLayer()) - poetRoundEnd := epochStart.Add(h.poetCfg.PhaseShift - h.poetCfg.CycleGap) - latency := receivedTime.Sub(poetRoundEnd) - metrics.ReportMessageLatency(pubsub.AtxProtocol, pubsub.AtxProtocol, latency) - atx.SetReceived(receivedTime.Local()) if err := atx.Initialize(); err != nil { return fmt.Errorf("failed to derive ID from atx: %w", err) diff --git a/activation/handler_test.go b/activation/handler_test.go index 4ac25d2f70..6b109576bf 100644 --- a/activation/handler_test.go +++ b/activation/handler_test.go @@ -924,7 +924,6 @@ func TestHandler_HandleGossipAtx(t *testing.T) { require.NoError(t, err) atxHdlr.mclock.EXPECT().CurrentLayer().Return(second.PublishEpoch.FirstLayer()) - atxHdlr.mclock.EXPECT().LayerToTime(gomock.Any()).Return(time.Now()) atxHdlr.mockFetch.EXPECT().RegisterPeerHashes(gomock.Any(), gomock.Any()) atxHdlr.mockFetch.EXPECT().GetPoetProof(gomock.Any(), second.GetPoetProofRef()).Return(errors.New("woof")) err = atxHdlr.HandleGossipAtx(context.Background(), "", secondData) @@ -932,7 +931,6 @@ func TestHandler_HandleGossipAtx(t *testing.T) { require.ErrorContains(t, err, "missing poet proof") atxHdlr.mclock.EXPECT().CurrentLayer().Return(second.PublishEpoch.FirstLayer()) - atxHdlr.mclock.EXPECT().LayerToTime(gomock.Any()).Return(time.Now()) atxHdlr.mockFetch.EXPECT().RegisterPeerHashes(gomock.Any(), gomock.Any()) atxHdlr.mockFetch.EXPECT().GetPoetProof(gomock.Any(), second.GetPoetProofRef()) atxHdlr.mockFetch.EXPECT().GetAtxs(gomock.Any(), gomock.Any()).DoAndReturn( @@ -941,7 +939,6 @@ func TestHandler_HandleGossipAtx(t *testing.T) { data, err := codec.Encode(first) require.NoError(t, err) atxHdlr.mclock.EXPECT().CurrentLayer().Return(first.PublishEpoch.FirstLayer()) - atxHdlr.mclock.EXPECT().LayerToTime(gomock.Any()).Return(time.Now()) atxHdlr.mValidator.EXPECT().Post(gomock.Any(), nodeID1, goldenATXID, first.InitialPost, gomock.Any(), first.NumUnits) atxHdlr.mValidator.EXPECT().VRFNonce(nodeID1, goldenATXID, &vrfNonce, gomock.Any(), first.NumUnits) atxHdlr.mockFetch.EXPECT().RegisterPeerHashes(gomock.Any(), gomock.Any()) @@ -981,7 +978,6 @@ func TestHandler_HandleSyncedAtx(t *testing.T) { buf, err := codec.Encode(atx) require.NoError(t, err) - atxHdlr.mclock.EXPECT().LayerToTime(gomock.Any()).Return(time.Now()) require.ErrorContains(t, atxHdlr.HandleSyncedAtx(context.Background(), atx.ID().Hash32(), p2p.NoPeer, buf), fmt.Sprintf("nil nipst for atx %v", atx.ID())) }) @@ -999,10 +995,8 @@ func TestHandler_HandleSyncedAtx(t *testing.T) { buf, err := codec.Encode(atx) require.NoError(t, err) - atxHdlr.mclock.EXPECT().LayerToTime(gomock.Any()).Return(time.Now()) require.NoError(t, atxHdlr.HandleSyncedAtx(context.Background(), atx.ID().Hash32(), p2p.NoPeer, buf)) - atxHdlr.mclock.EXPECT().LayerToTime(gomock.Any()).Return(time.Now()) require.Error(t, atxHdlr.HandleGossipAtx(context.Background(), "", buf)) }) t.Run("known atx from local id is allowed", func(t *testing.T) { @@ -1018,11 +1012,7 @@ func TestHandler_HandleSyncedAtx(t *testing.T) { buf, err := codec.Encode(atx) require.NoError(t, err) - - atxHdlr.mclock.EXPECT().LayerToTime(gomock.Any()).Return(time.Now()) require.NoError(t, atxHdlr.HandleSyncedAtx(context.Background(), atx.ID().Hash32(), p2p.NoPeer, buf)) - - atxHdlr.mclock.EXPECT().LayerToTime(gomock.Any()).Return(time.Now()) require.NoError(t, atxHdlr.HandleGossipAtx(context.Background(), localID, buf)) }) @@ -1036,7 +1026,6 @@ func TestHandler_HandleSyncedAtx(t *testing.T) { buf, err := codec.Encode(atx) require.NoError(t, err) - atxHdlr.mclock.EXPECT().LayerToTime(gomock.Any()).Return(time.Now()) require.ErrorContains(t, atxHdlr.HandleSyncedAtx(context.Background(), atx.ID().Hash32(), p2p.NoPeer, buf), "failed to verify atx signature") }) } @@ -1263,7 +1252,6 @@ func TestHandler_AtxWeight(t *testing.T) { peer := p2p.Peer("buddy") proofRef := atx1.GetPoetProofRef() - atxHdlr.mclock.EXPECT().LayerToTime(gomock.Any()).Return(time.Now()) atxHdlr.mclock.EXPECT().CurrentLayer().Return(currentLayer) atxHdlr.mockFetch.EXPECT().RegisterPeerHashes(peer, []types.Hash32{proofRef}) atxHdlr.mockFetch.EXPECT().GetPoetProof(gomock.Any(), proofRef) @@ -1304,7 +1292,6 @@ func TestHandler_AtxWeight(t *testing.T) { require.NoError(t, err) proofRef = atx2.GetPoetProofRef() - atxHdlr.mclock.EXPECT().LayerToTime(gomock.Any()).Return(time.Now()) atxHdlr.mclock.EXPECT().CurrentLayer().Return(currentLayer) atxHdlr.mockFetch.EXPECT().RegisterPeerHashes(peer, gomock.Any()).Do( func(_ p2p.Peer, got []types.Hash32) { @@ -1361,7 +1348,6 @@ func TestHandler_WrongHash(t *testing.T) { peer := p2p.Peer("buddy") proofRef := atx.GetPoetProofRef() - atxHdlr.mclock.EXPECT().LayerToTime(gomock.Any()).Return(time.Now()) atxHdlr.mclock.EXPECT().CurrentLayer().Return(currentLayer) atxHdlr.mockFetch.EXPECT().RegisterPeerHashes(peer, []types.Hash32{proofRef}) atxHdlr.mockFetch.EXPECT().GetPoetProof(gomock.Any(), proofRef) diff --git a/beacon/handlers.go b/beacon/handlers.go index 8a5ed08f01..78dee0d889 100644 --- a/beacon/handlers.go +++ b/beacon/handlers.go @@ -62,9 +62,6 @@ func (pd *ProtocolDriver) HandleProposal(ctx context.Context, peer p2p.Peer, msg return errMalformedMessage } - latency := receivedTime.Sub(pd.msgTimes.proposalSendTime(m.EpochID)) - metrics.ReportMessageLatency(pubsub.BeaconProtocol, pubsub.BeaconProposalProtocol, latency) - if !pd.isProposalTimely(&m, receivedTime) { logger.With().Debug("proposal too early", m.EpochID, log.Time("received_at", receivedTime)) return errUntimelyMessage @@ -94,6 +91,7 @@ func (pd *ProtocolDriver) HandleProposal(ctx context.Context, peer p2p.Peer, msg logger.With().Debug("malicious miner proposal potentially valid", log.Stringer("smesher", m.NodeID)) cat = potentiallyValid } + metrics.ReportMessageLatency(pubsub.BeaconProtocol, pubsub.BeaconProposalProtocol, time.Since(pd.msgTimes.proposalSendTime(m.EpochID))) return pd.addProposal(m, cat) } @@ -223,17 +221,12 @@ func (pd *ProtocolDriver) HandleFirstVotes(ctx context.Context, peer p2p.Peer, m logger := pd.logger.WithContext(ctx).WithFields(types.FirstRound, log.Stringer("sender", peer)) logger.Debug("new first votes") - receivedTime := time.Now() - var m FirstVotingMessage if err := codec.Decode(msg, &m); err != nil { logger.With().Warning("received invalid first votes", log.Err(err)) return errMalformedMessage } - latency := receivedTime.Sub(pd.msgTimes.firstVoteSendTime(m.EpochID)) - metrics.ReportMessageLatency(pubsub.BeaconProtocol, pubsub.BeaconFirstVotesProtocol, latency) - currentEpoch := pd.currentEpoch() if m.EpochID != currentEpoch { logger.With().Debug("first votes from different epoch", @@ -257,6 +250,7 @@ func (pd *ProtocolDriver) HandleFirstVotes(ctx context.Context, peer p2p.Peer, m } logger.Debug("received first voting message, storing its votes") + metrics.ReportMessageLatency(pubsub.BeaconProtocol, pubsub.BeaconFirstVotesProtocol, time.Since(pd.msgTimes.firstVoteSendTime(m.EpochID))) return pd.storeFirstVotes(m, minerPK) } @@ -342,9 +336,6 @@ func (pd *ProtocolDriver) HandleFollowingVotes(ctx context.Context, peer p2p.Pee return errEpochNotActive } - latency := receivedTime.Sub(pd.msgTimes.followupVoteSendTime(m.EpochID, m.RoundID)) - metrics.ReportMessageLatency(pubsub.BeaconProtocol, pubsub.BeaconFollowingVotesProtocol, latency) - // don't accept votes from future rounds if !pd.isVoteTimely(&m, receivedTime) { logger.With().Debug("following votes too early", m.RoundID, log.Time("received_at", receivedTime)) @@ -356,12 +347,12 @@ func (pd *ProtocolDriver) HandleFollowingVotes(ctx context.Context, peer p2p.Pee return err } + metrics.ReportMessageLatency(pubsub.BeaconProtocol, pubsub.BeaconFollowingVotesProtocol, time.Since(pd.msgTimes.followupVoteSendTime(m.EpochID, m.RoundID))) logger.Debug("received following voting message, counting its votes") if err = pd.storeFollowingVotes(m, nodeID); err != nil { logger.With().Warning("failed to store following votes", log.Err(err)) return err } - return nil } diff --git a/beacon/weakcoin/handler.go b/beacon/weakcoin/handler.go index df38878753..495e32ce92 100644 --- a/beacon/weakcoin/handler.go +++ b/beacon/weakcoin/handler.go @@ -16,7 +16,6 @@ import ( // HandleProposal defines method to handle Beacon Weak Coin Messages from gossip. func (wc *WeakCoin) HandleProposal(ctx context.Context, peer p2p.Peer, msg []byte) error { - receivedTime := time.Now() logger := wc.logger.WithContext(ctx) var message Message @@ -25,9 +24,6 @@ func (wc *WeakCoin) HandleProposal(ctx context.Context, peer p2p.Peer, msg []byt return pubsub.ErrValidationReject } - latency := receivedTime.Sub(wc.msgTime.WeakCoinProposalSendTime(message.Epoch, message.Round)) - metrics.ReportMessageLatency(pubsub.BeaconProtocol, pubsub.BeaconWeakCoinProtocol, latency) - if err := wc.receiveMessage(ctx, message); err != nil { if !errors.Is(err, errNotSmallest) { logger.With().Debug("invalid proposal", @@ -39,6 +35,7 @@ func (wc *WeakCoin) HandleProposal(ctx context.Context, peer p2p.Peer, msg []byt } return err } + metrics.ReportMessageLatency(pubsub.BeaconProtocol, pubsub.BeaconWeakCoinProtocol, time.Since(wc.msgTime.WeakCoinProposalSendTime(message.Epoch, message.Round))) return nil } diff --git a/checkpoint/recovery_test.go b/checkpoint/recovery_test.go index c2d23be1de..0ab14a4201 100644 --- a/checkpoint/recovery_test.go +++ b/checkpoint/recovery_test.go @@ -246,7 +246,6 @@ func validateAndPreserveData(tb testing.TB, db *sql.Database, deps []*types.Veri for i, vatx := range deps { encoded, err := codec.Encode(vatx) require.NoError(tb, err) - mclock.EXPECT().LayerToTime(gomock.Any()).Return(time.Now()) mclock.EXPECT().CurrentLayer().Return(vatx.PublishEpoch.FirstLayer()) mfetch.EXPECT().RegisterPeerHashes(gomock.Any(), gomock.Any()) mfetch.EXPECT().GetPoetProof(gomock.Any(), vatx.GetPoetProofRef()) diff --git a/proposals/handler.go b/proposals/handler.go index e2fba7036a..b27632dd42 100644 --- a/proposals/handler.go +++ b/proposals/handler.go @@ -248,7 +248,6 @@ func (h *Handler) HandleProposal(ctx context.Context, peer p2p.Peer, data []byte // HandleProposal is the gossip receiver for Proposal. func (h *Handler) handleProposal(ctx context.Context, expHash types.Hash32, peer p2p.Peer, data []byte) error { - receivedTime := time.Now() logger := h.logger.WithContext(ctx) t0 := time.Now() @@ -268,9 +267,6 @@ func (h *Handler) handleProposal(ctx context.Context, expHash types.Hash32, peer return fmt.Errorf("proposal from future: %d/%s", p.Layer, p.ID().String()) } - latency := receivedTime.Sub(h.clock.LayerToTime(p.Layer)) - metrics.ReportMessageLatency(pubsub.ProposalProtocol, pubsub.ProposalProtocol, latency) - if !h.edVerifier.Verify(signing.PROPOSAL, p.SmesherID, p.SignedBytes(), p.Signature) { badSigProposal.Inc() return fmt.Errorf("failed to verify proposal signature") @@ -367,6 +363,7 @@ func (h *Handler) handleProposal(ctx context.Context, expHash types.Hash32, peer } return errMaliciousBallot } + metrics.ReportMessageLatency(pubsub.ProposalProtocol, pubsub.ProposalProtocol, time.Since(h.clock.LayerToTime(p.Layer))) return nil }