From ed0fe76c880d290642f23bf6b6e2cb51b0ced3f9 Mon Sep 17 00:00:00 2001 From: Dmitry Shulyak Date: Thu, 28 Sep 2023 07:41:03 +0000 Subject: [PATCH] don't track latency for messages that are not gossiped further (#5064) this is mainly to prevent duplicates from skewing results. hare1 is not fixed in this change. latency metering is dropped for atx protocol, as we don't have good reference for it closes: https://github.com/spacemeshos/go-spacemesh/issues/4623 --- activation/handler.go | 6 ------ activation/handler_test.go | 14 -------------- beacon/handlers.go | 15 +++------------ beacon/weakcoin/handler.go | 5 +---- checkpoint/recovery_test.go | 1 - proposals/handler.go | 5 +---- 6 files changed, 5 insertions(+), 41 deletions(-) diff --git a/activation/handler.go b/activation/handler.go index e46400f77a..eb5e808e77 100644 --- a/activation/handler.go +++ b/activation/handler.go @@ -15,7 +15,6 @@ import ( "github.com/spacemeshos/go-spacemesh/datastore" "github.com/spacemeshos/go-spacemesh/events" "github.com/spacemeshos/go-spacemesh/log" - "github.com/spacemeshos/go-spacemesh/metrics" "github.com/spacemeshos/go-spacemesh/p2p" "github.com/spacemeshos/go-spacemesh/p2p/pubsub" "github.com/spacemeshos/go-spacemesh/signing" @@ -509,11 +508,6 @@ func (h *Handler) handleAtx(ctx context.Context, expHash types.Hash32, peer p2p.
return fmt.Errorf("%w: %w", errMalformedData, err) } - epochStart := h.clock.LayerToTime(atx.PublishEpoch.FirstLayer()) - poetRoundEnd := epochStart.Add(h.poetCfg.PhaseShift - h.poetCfg.CycleGap) - latency := receivedTime.Sub(poetRoundEnd) - metrics.ReportMessageLatency(pubsub.AtxProtocol, pubsub.AtxProtocol, latency) - atx.SetReceived(receivedTime.Local()) if err := atx.Initialize(); err != nil { return fmt.Errorf("failed to derive ID from atx: %w", err) diff --git a/activation/handler_test.go b/activation/handler_test.go index 4ac25d2f70..6b109576bf 100644 --- a/activation/handler_test.go +++ b/activation/handler_test.go @@ -924,7 +924,6 @@ func TestHandler_HandleGossipAtx(t *testing.T) { require.NoError(t, err) atxHdlr.mclock.EXPECT().CurrentLayer().Return(second.PublishEpoch.FirstLayer()) - atxHdlr.mclock.EXPECT().LayerToTime(gomock.Any()).Return(time.Now()) atxHdlr.mockFetch.EXPECT().RegisterPeerHashes(gomock.Any(), gomock.Any()) atxHdlr.mockFetch.EXPECT().GetPoetProof(gomock.Any(), second.GetPoetProofRef()).Return(errors.New("woof")) err = atxHdlr.HandleGossipAtx(context.Background(), "", secondData) @@ -932,7 +931,6 @@ func TestHandler_HandleGossipAtx(t *testing.T) { require.ErrorContains(t, err, "missing poet proof") atxHdlr.mclock.EXPECT().CurrentLayer().Return(second.PublishEpoch.FirstLayer()) - atxHdlr.mclock.EXPECT().LayerToTime(gomock.Any()).Return(time.Now()) atxHdlr.mockFetch.EXPECT().RegisterPeerHashes(gomock.Any(), gomock.Any()) atxHdlr.mockFetch.EXPECT().GetPoetProof(gomock.Any(), second.GetPoetProofRef()) atxHdlr.mockFetch.EXPECT().GetAtxs(gomock.Any(), gomock.Any()).DoAndReturn( @@ -941,7 +939,6 @@ func TestHandler_HandleGossipAtx(t *testing.T) { data, err := codec.Encode(first) require.NoError(t, err) atxHdlr.mclock.EXPECT().CurrentLayer().Return(first.PublishEpoch.FirstLayer()) - atxHdlr.mclock.EXPECT().LayerToTime(gomock.Any()).Return(time.Now()) atxHdlr.mValidator.EXPECT().Post(gomock.Any(), nodeID1, goldenATXID, first.InitialPost, 
gomock.Any(), first.NumUnits) atxHdlr.mValidator.EXPECT().VRFNonce(nodeID1, goldenATXID, &vrfNonce, gomock.Any(), first.NumUnits) atxHdlr.mockFetch.EXPECT().RegisterPeerHashes(gomock.Any(), gomock.Any()) @@ -981,7 +978,6 @@ func TestHandler_HandleSyncedAtx(t *testing.T) { buf, err := codec.Encode(atx) require.NoError(t, err) - atxHdlr.mclock.EXPECT().LayerToTime(gomock.Any()).Return(time.Now()) require.ErrorContains(t, atxHdlr.HandleSyncedAtx(context.Background(), atx.ID().Hash32(), p2p.NoPeer, buf), fmt.Sprintf("nil nipst for atx %v", atx.ID())) }) @@ -999,10 +995,8 @@ func TestHandler_HandleSyncedAtx(t *testing.T) { buf, err := codec.Encode(atx) require.NoError(t, err) - atxHdlr.mclock.EXPECT().LayerToTime(gomock.Any()).Return(time.Now()) require.NoError(t, atxHdlr.HandleSyncedAtx(context.Background(), atx.ID().Hash32(), p2p.NoPeer, buf)) - atxHdlr.mclock.EXPECT().LayerToTime(gomock.Any()).Return(time.Now()) require.Error(t, atxHdlr.HandleGossipAtx(context.Background(), "", buf)) }) t.Run("known atx from local id is allowed", func(t *testing.T) { @@ -1018,11 +1012,7 @@ func TestHandler_HandleSyncedAtx(t *testing.T) { buf, err := codec.Encode(atx) require.NoError(t, err) - - atxHdlr.mclock.EXPECT().LayerToTime(gomock.Any()).Return(time.Now()) require.NoError(t, atxHdlr.HandleSyncedAtx(context.Background(), atx.ID().Hash32(), p2p.NoPeer, buf)) - - atxHdlr.mclock.EXPECT().LayerToTime(gomock.Any()).Return(time.Now()) require.NoError(t, atxHdlr.HandleGossipAtx(context.Background(), localID, buf)) }) @@ -1036,7 +1026,6 @@ func TestHandler_HandleSyncedAtx(t *testing.T) { buf, err := codec.Encode(atx) require.NoError(t, err) - atxHdlr.mclock.EXPECT().LayerToTime(gomock.Any()).Return(time.Now()) require.ErrorContains(t, atxHdlr.HandleSyncedAtx(context.Background(), atx.ID().Hash32(), p2p.NoPeer, buf), "failed to verify atx signature") }) } @@ -1263,7 +1252,6 @@ func TestHandler_AtxWeight(t *testing.T) { peer := p2p.Peer("buddy") proofRef := atx1.GetPoetProofRef() - 
atxHdlr.mclock.EXPECT().LayerToTime(gomock.Any()).Return(time.Now()) atxHdlr.mclock.EXPECT().CurrentLayer().Return(currentLayer) atxHdlr.mockFetch.EXPECT().RegisterPeerHashes(peer, []types.Hash32{proofRef}) atxHdlr.mockFetch.EXPECT().GetPoetProof(gomock.Any(), proofRef) @@ -1304,7 +1292,6 @@ func TestHandler_AtxWeight(t *testing.T) { require.NoError(t, err) proofRef = atx2.GetPoetProofRef() - atxHdlr.mclock.EXPECT().LayerToTime(gomock.Any()).Return(time.Now()) atxHdlr.mclock.EXPECT().CurrentLayer().Return(currentLayer) atxHdlr.mockFetch.EXPECT().RegisterPeerHashes(peer, gomock.Any()).Do( func(_ p2p.Peer, got []types.Hash32) { @@ -1361,7 +1348,6 @@ func TestHandler_WrongHash(t *testing.T) { peer := p2p.Peer("buddy") proofRef := atx.GetPoetProofRef() - atxHdlr.mclock.EXPECT().LayerToTime(gomock.Any()).Return(time.Now()) atxHdlr.mclock.EXPECT().CurrentLayer().Return(currentLayer) atxHdlr.mockFetch.EXPECT().RegisterPeerHashes(peer, []types.Hash32{proofRef}) atxHdlr.mockFetch.EXPECT().GetPoetProof(gomock.Any(), proofRef) diff --git a/beacon/handlers.go b/beacon/handlers.go index 8a5ed08f01..78dee0d889 100644 --- a/beacon/handlers.go +++ b/beacon/handlers.go @@ -62,9 +62,6 @@ func (pd *ProtocolDriver) HandleProposal(ctx context.Context, peer p2p.Peer, msg return errMalformedMessage } - latency := receivedTime.Sub(pd.msgTimes.proposalSendTime(m.EpochID)) - metrics.ReportMessageLatency(pubsub.BeaconProtocol, pubsub.BeaconProposalProtocol, latency) - if !pd.isProposalTimely(&m, receivedTime) { logger.With().Debug("proposal too early", m.EpochID, log.Time("received_at", receivedTime)) return errUntimelyMessage @@ -94,6 +91,7 @@ func (pd *ProtocolDriver) HandleProposal(ctx context.Context, peer p2p.Peer, msg logger.With().Debug("malicious miner proposal potentially valid", log.Stringer("smesher", m.NodeID)) cat = potentiallyValid } + metrics.ReportMessageLatency(pubsub.BeaconProtocol, pubsub.BeaconProposalProtocol, time.Since(pd.msgTimes.proposalSendTime(m.EpochID))) return 
pd.addProposal(m, cat) } @@ -223,17 +221,12 @@ func (pd *ProtocolDriver) HandleFirstVotes(ctx context.Context, peer p2p.Peer, m logger := pd.logger.WithContext(ctx).WithFields(types.FirstRound, log.Stringer("sender", peer)) logger.Debug("new first votes") - receivedTime := time.Now() - var m FirstVotingMessage if err := codec.Decode(msg, &m); err != nil { logger.With().Warning("received invalid first votes", log.Err(err)) return errMalformedMessage } - latency := receivedTime.Sub(pd.msgTimes.firstVoteSendTime(m.EpochID)) - metrics.ReportMessageLatency(pubsub.BeaconProtocol, pubsub.BeaconFirstVotesProtocol, latency) - currentEpoch := pd.currentEpoch() if m.EpochID != currentEpoch { logger.With().Debug("first votes from different epoch", @@ -257,6 +250,7 @@ func (pd *ProtocolDriver) HandleFirstVotes(ctx context.Context, peer p2p.Peer, m } logger.Debug("received first voting message, storing its votes") + metrics.ReportMessageLatency(pubsub.BeaconProtocol, pubsub.BeaconFirstVotesProtocol, time.Since(pd.msgTimes.firstVoteSendTime(m.EpochID))) return pd.storeFirstVotes(m, minerPK) } @@ -342,9 +336,6 @@ func (pd *ProtocolDriver) HandleFollowingVotes(ctx context.Context, peer p2p.Pee return errEpochNotActive } - latency := receivedTime.Sub(pd.msgTimes.followupVoteSendTime(m.EpochID, m.RoundID)) - metrics.ReportMessageLatency(pubsub.BeaconProtocol, pubsub.BeaconFollowingVotesProtocol, latency) - // don't accept votes from future rounds if !pd.isVoteTimely(&m, receivedTime) { logger.With().Debug("following votes too early", m.RoundID, log.Time("received_at", receivedTime)) @@ -356,12 +347,12 @@ func (pd *ProtocolDriver) HandleFollowingVotes(ctx context.Context, peer p2p.Pee return err } + metrics.ReportMessageLatency(pubsub.BeaconProtocol, pubsub.BeaconFollowingVotesProtocol, time.Since(pd.msgTimes.followupVoteSendTime(m.EpochID, m.RoundID))) logger.Debug("received following voting message, counting its votes") if err = pd.storeFollowingVotes(m, nodeID); err != nil { 
logger.With().Warning("failed to store following votes", log.Err(err)) return err } - return nil } diff --git a/beacon/weakcoin/handler.go b/beacon/weakcoin/handler.go index df38878753..495e32ce92 100644 --- a/beacon/weakcoin/handler.go +++ b/beacon/weakcoin/handler.go @@ -16,7 +16,6 @@ import ( // HandleProposal defines method to handle Beacon Weak Coin Messages from gossip. func (wc *WeakCoin) HandleProposal(ctx context.Context, peer p2p.Peer, msg []byte) error { - receivedTime := time.Now() logger := wc.logger.WithContext(ctx) var message Message @@ -25,9 +24,6 @@ func (wc *WeakCoin) HandleProposal(ctx context.Context, peer p2p.Peer, msg []byt return pubsub.ErrValidationReject } - latency := receivedTime.Sub(wc.msgTime.WeakCoinProposalSendTime(message.Epoch, message.Round)) - metrics.ReportMessageLatency(pubsub.BeaconProtocol, pubsub.BeaconWeakCoinProtocol, latency) - if err := wc.receiveMessage(ctx, message); err != nil { if !errors.Is(err, errNotSmallest) { logger.With().Debug("invalid proposal", @@ -39,6 +35,7 @@ func (wc *WeakCoin) HandleProposal(ctx context.Context, peer p2p.Peer, msg []byt } return err } + metrics.ReportMessageLatency(pubsub.BeaconProtocol, pubsub.BeaconWeakCoinProtocol, time.Since(wc.msgTime.WeakCoinProposalSendTime(message.Epoch, message.Round))) return nil } diff --git a/checkpoint/recovery_test.go b/checkpoint/recovery_test.go index c2d23be1de..0ab14a4201 100644 --- a/checkpoint/recovery_test.go +++ b/checkpoint/recovery_test.go @@ -246,7 +246,6 @@ func validateAndPreserveData(tb testing.TB, db *sql.Database, deps []*types.Veri for i, vatx := range deps { encoded, err := codec.Encode(vatx) require.NoError(tb, err) - mclock.EXPECT().LayerToTime(gomock.Any()).Return(time.Now()) mclock.EXPECT().CurrentLayer().Return(vatx.PublishEpoch.FirstLayer()) mfetch.EXPECT().RegisterPeerHashes(gomock.Any(), gomock.Any()) mfetch.EXPECT().GetPoetProof(gomock.Any(), vatx.GetPoetProofRef()) diff --git a/proposals/handler.go b/proposals/handler.go index 
e2fba7036a..b27632dd42 100644 --- a/proposals/handler.go +++ b/proposals/handler.go @@ -248,7 +248,6 @@ func (h *Handler) HandleProposal(ctx context.Context, peer p2p.Peer, data []byte // HandleProposal is the gossip receiver for Proposal. func (h *Handler) handleProposal(ctx context.Context, expHash types.Hash32, peer p2p.Peer, data []byte) error { - receivedTime := time.Now() logger := h.logger.WithContext(ctx) t0 := time.Now() @@ -268,9 +267,6 @@ func (h *Handler) handleProposal(ctx context.Context, expHash types.Hash32, peer return fmt.Errorf("proposal from future: %d/%s", p.Layer, p.ID().String()) } - latency := receivedTime.Sub(h.clock.LayerToTime(p.Layer)) - metrics.ReportMessageLatency(pubsub.ProposalProtocol, pubsub.ProposalProtocol, latency) - if !h.edVerifier.Verify(signing.PROPOSAL, p.SmesherID, p.SignedBytes(), p.Signature) { badSigProposal.Inc() return fmt.Errorf("failed to verify proposal signature") @@ -367,6 +363,7 @@ func (h *Handler) handleProposal(ctx context.Context, expHash types.Hash32, peer } return errMaliciousBallot } + metrics.ReportMessageLatency(pubsub.ProposalProtocol, pubsub.ProposalProtocol, time.Since(h.clock.LayerToTime(p.Layer))) return nil }