From d68b79d6a81e6c84aa0f6419b110faf3962bb5d4 Mon Sep 17 00:00:00 2001 From: k <30611210+countvonzero@users.noreply.github.com> Date: Sat, 16 Sep 2023 18:46:44 +0000 Subject: [PATCH] refuse old proposals from gossip (#5020) ## Motivation there are a lot of old proposals floating in the gossip network. deleting old proposals in #4993 forced the node to process/save these old proposals repeatedly ## Changes - refuse proposals older than current layer - add metrics/log for time it takes to delete proposals --- blocks/generator.go | 21 ++++++++++++------ blocks/generator_test.go | 6 ++++++ blocks/interface.go | 1 + blocks/metrics.go | 8 +++++++ blocks/mocks/mocks.go | 38 +++++++++++++++++++++++++++++++++ proposals/handler.go | 9 +++++++- proposals/handler_test.go | 34 ++++++++++++++++++++++------- proposals/interface.go | 1 + proposals/metrics.go | 1 + proposals/mocks.go | 38 +++++++++++++++++++++++++++++++++ sql/proposals/proposals.go | 2 +- sql/proposals/proposals_test.go | 2 +- 12 files changed, 144 insertions(+), 17 deletions(-) diff --git a/blocks/generator.go b/blocks/generator.go index 21cc9ba4768..b1aa78e40eb 100644 --- a/blocks/generator.go +++ b/blocks/generator.go @@ -135,7 +135,20 @@ func (g *Generator) Stop() { } } +func (g *Generator) pruneAsync() { + g.eg.Go(func() error { + lid := g.msh.ProcessedLayer() + start := time.Now() + if err := proposals.DeleteBefore(g.cdb, lid); err != nil { + g.logger.With().Error("failed to delete old proposals", lid, log.Err(err)) + } + deleteLatency.Observe(time.Since(start).Seconds()) + return nil + }) +} + func (g *Generator) run() error { + g.pruneAsync() var maxLayer types.LayerID for { select { @@ -167,16 +180,12 @@ func (g *Generator) run() error { if len(g.optimisticOutput) > 0 { g.processOptimisticLayers(maxLayer) } - if err := proposals.Delete(g.cdb, out.Layer); err != nil { - g.logger.With().Error("failed to delete old proposals", - out.Layer, - log.Err(err), - ) - } + g.pruneAsync() case <-time.After(g.cfg.GenBlockInterval): if len(g.optimisticOutput) > 0 { g.processOptimisticLayers(maxLayer) } + g.pruneAsync() } } } diff --git a/blocks/generator_test.go b/blocks/generator_test.go index 6866363f2f2..b86bdb619cc 100644 --- a/blocks/generator_test.go +++ b/blocks/generator_test.go @@ -69,6 +69,7 @@ func createTestGenerator(t *testing.T) *testGenerator { mockCert: mocks.NewMockcertifier(ctrl), mockPatrol: mocks.NewMocklayerPatrol(ctrl), } + tg.mockMesh.EXPECT().ProcessedLayer().Return(types.LayerID(1)).AnyTimes() lg := logtest.New(t) cdb := datastore.NewCachedDB(sql.InMemory(), lg) tg.Generator = NewGenerator(cdb, tg.mockExec, tg.mockMesh, tg.mockFetch, tg.mockCert, tg.mockPatrol, @@ -325,7 +326,11 @@ func Test_run(t *testing.T) { t.Run(tc.desc, func(t *testing.T) { tg := createTestGenerator(t) tg.cfg.BlockGasLimit = tc.gasLimit + tg.mockMesh = mocks.NewMockmeshProvider(gomock.NewController(t)) + tg.msh = tg.mockMesh layerID := types.GetEffectiveGenesis().Add(100) + processed := layerID - 1 + tg.mockMesh.EXPECT().ProcessedLayer().DoAndReturn(func() types.LayerID { return processed }).AnyTimes() require.NoError(t, layers.SetApplied(tg.cdb, layerID-1, types.EmptyBlockID)) var meshHash types.Hash32 if tc.optimistic { @@ -398,6 +403,7 @@ func Test_run(t *testing.T) { tg.mockMesh.EXPECT().ProcessLayerPerHareOutput(gomock.Any(), layerID, gomock.Any(), tc.optimistic).DoAndReturn( func(_ context.Context, _ types.LayerID, got types.BlockID, _ bool) error { require.Equal(t, block.ID(), got) + processed = layerID return nil }) tg.mockPatrol.EXPECT().CompleteHare(layerID) diff --git a/blocks/interface.go b/blocks/interface.go index 84f0a8787eb..05c686e49b0 100644 --- a/blocks/interface.go +++ b/blocks/interface.go @@ -14,6 +14,7 @@ type layerPatrol interface { } type meshProvider interface { + ProcessedLayer() types.LayerID AddBlockWithTXs(context.Context, *types.Block) error ProcessLayerPerHareOutput(context.Context, types.LayerID, types.BlockID, bool) error } diff --git a/blocks/metrics.go b/blocks/metrics.go index 2a818408bc2..9c2cb381f83 100644 --- a/blocks/metrics.go +++ b/blocks/metrics.go @@ -30,6 +30,14 @@ var ( failFetchCnt = blockGenCount.WithLabelValues(failFetch) failGenCnt = blockGenCount.WithLabelValues(failGen) failErrCnt = blockGenCount.WithLabelValues(internalErr) + + deleteLatency = metrics.NewHistogramWithBuckets( + "delete_duration", + namespace, + "duration in second to delete old proposals", + []string{}, + prometheus.ExponentialBuckets(0.01, 2, 10), + ).WithLabelValues() ) type collector struct { diff --git a/blocks/mocks/mocks.go b/blocks/mocks/mocks.go index 2bc31816d0d..8c45e4c5dbb 100644 --- a/blocks/mocks/mocks.go +++ b/blocks/mocks/mocks.go @@ -171,6 +171,44 @@ func (c *meshProviderProcessLayerPerHareOutputCall) DoAndReturn(f func(context.C return c } +// ProcessedLayer mocks base method. +func (m *MockmeshProvider) ProcessedLayer() types.LayerID { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "ProcessedLayer") + ret0, _ := ret[0].(types.LayerID) + return ret0 +} + +// ProcessedLayer indicates an expected call of ProcessedLayer. +func (mr *MockmeshProviderMockRecorder) ProcessedLayer() *meshProviderProcessedLayerCall { + mr.mock.ctrl.T.Helper() + call := mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ProcessedLayer", reflect.TypeOf((*MockmeshProvider)(nil).ProcessedLayer)) + return &meshProviderProcessedLayerCall{Call: call} +} + +// meshProviderProcessedLayerCall wrap *gomock.Call +type meshProviderProcessedLayerCall struct { + *gomock.Call +} + +// Return rewrite *gomock.Call.Return +func (c *meshProviderProcessedLayerCall) Return(arg0 types.LayerID) *meshProviderProcessedLayerCall { + c.Call = c.Call.Return(arg0) + return c +} + +// Do rewrite *gomock.Call.Do +func (c *meshProviderProcessedLayerCall) Do(f func() types.LayerID) *meshProviderProcessedLayerCall { + c.Call = c.Call.Do(f) + return c +} + +// DoAndReturn rewrite *gomock.Call.DoAndReturn +func (c *meshProviderProcessedLayerCall) DoAndReturn(f func() types.LayerID) *meshProviderProcessedLayerCall { + c.Call = c.Call.DoAndReturn(f) + return c +} + // Mockexecutor is a mock of executor interface. type Mockexecutor struct { ctrl *gomock.Controller diff --git a/proposals/handler.go b/proposals/handler.go index bbd7184099a..4ee8787b931 100644 --- a/proposals/handler.go +++ b/proposals/handler.go @@ -262,6 +262,11 @@ func (h *Handler) handleProposal(ctx context.Context, expHash types.Hash32, peer preGenesis.Inc() return fmt.Errorf("proposal before effective genesis: layer %v", p.Layer) } + if p.Layer <= h.mesh.ProcessedLayer() { + // old proposals have no use for the node + tooLate.Inc() + return fmt.Errorf("proposal too late: layer %v", p.Layer) + } latency := receivedTime.Sub(h.clock.LayerToTime(p.Layer)) metrics.ReportMessageLatency(pubsub.ProposalProtocol, pubsub.ProposalProtocol, latency) @@ -311,7 +316,9 @@ func (h *Handler) handleProposal(ctx context.Context, expHash types.Hash32, peer } proposalDuration.WithLabelValues(dbLookup).Observe(float64(time.Since(t1))) - logger.With().Info("new proposal", log.Int("num_txs", len(p.TxIDs))) + logger.With().Info("new proposal", + log.String("exp hash", expHash.ShortString()), + log.Int("num_txs", len(p.TxIDs))) t2 := time.Now() h.fetcher.RegisterPeerHashes(peer, collectHashes(p)) proposalDuration.WithLabelValues(peerHashes).Observe(float64(time.Since(t2))) diff --git a/proposals/handler_test.go b/proposals/handler_test.go index 3919513f2f0..79fc2e74114 100644 --- a/proposals/handler_test.go +++ b/proposals/handler_test.go @@ -60,6 +60,7 @@ func (ms *mockSet) decodeAnyBallots() *mockSet { func (ms *mockSet) setCurrentLayer(layer types.LayerID) *mockSet { ms.mclock.EXPECT().CurrentLayer().Return(layer).AnyTimes() + ms.mm.EXPECT().ProcessedLayer().Return(layer - 1).AnyTimes() ms.mclock.EXPECT().LayerToTime(gomock.Any()).Return(time.Now().Add(-5 * time.Second)).AnyTimes() return ms } @@ -188,6 +189,7 @@ func withTransactions(ids ...types.TransactionID) createProposalOpt { func createProposal(t *testing.T, opts ...any) *types.Proposal { t.Helper() b := types.RandomBallot() + b.Layer = 10000 p := &types.Proposal{ InnerProposal: types.InnerProposal{ Ballot: *b, @@ -876,6 +878,20 @@ func TestProposal_BeforeEffectiveGenesis(t *testing.T) { checkProposal(t, th.cdb, p, false) } +func TestProposal_TooOld(t *testing.T) { + th := createTestHandlerNoopDecoder(t) + lid := types.LayerID(11) + th.mockSet.setCurrentLayer(lid) + p := createProposal(t) + p.Layer = lid - 1 + data := encodeProposal(t, p) + got := th.HandleSyncedProposal(context.Background(), p.ID().AsHash32(), p2p.NoPeer, data) + require.ErrorContains(t, got, "proposal too late") + + require.Error(t, th.HandleProposal(context.Background(), "", data)) + checkProposal(t, th.cdb, p, false) +} + func TestProposal_BadSignature(t *testing.T) { th := createTestHandlerNoopDecoder(t) p := createProposal(t) @@ -896,6 +912,7 @@ func TestProposal_InconsistentSmeshers(t *testing.T) { TxIDs: []types.TransactionID{types.RandomTransactionID(), types.RandomTransactionID()}, }, } + p.Layer = th.clock.CurrentLayer() signer1, err := signing.NewEdSigner() require.NoError(t, err) signer2, err := signing.NewEdSigner() @@ -936,7 +953,7 @@ func TestProposal_KnownProposal(t *testing.T) { func TestProposal_DuplicateTXs(t *testing.T) { th := createTestHandlerNoopDecoder(t) - lid := types.LayerID(100) + lid := th.clock.CurrentLayer() supported := []*types.Block{ types.NewExistingBlock(types.BlockID{1}, types.InnerBlock{LayerIndex: lid.Sub(1)}), types.NewExistingBlock(types.BlockID{2}, types.InnerBlock{LayerIndex: lid.Sub(2)}), @@ -973,7 +990,7 @@ func TestProposal_DuplicateTXs(t *testing.T) { func TestProposal_TXsNotAvailable(t *testing.T) { th := createTestHandlerNoopDecoder(t) - lid := types.LayerID(100) + lid := th.clock.CurrentLayer() supported := []*types.Block{ types.NewExistingBlock(types.BlockID{1}, types.InnerBlock{LayerIndex: lid.Sub(1)}), types.NewExistingBlock(types.BlockID{2}, types.InnerBlock{LayerIndex: lid.Sub(2)}), @@ -1011,7 +1028,7 @@ func TestProposal_TXsNotAvailable(t *testing.T) { func TestProposal_FailedToAddProposalTXs(t *testing.T) { th := createTestHandlerNoopDecoder(t) - lid := types.LayerID(100) + lid := th.clock.CurrentLayer() supported := []*types.Block{ types.NewExistingBlock(types.BlockID{1}, types.InnerBlock{LayerIndex: lid.Sub(1)}), types.NewExistingBlock(types.BlockID{2}, types.InnerBlock{LayerIndex: lid.Sub(2)}), @@ -1049,7 +1066,7 @@ func TestProposal_FailedToAddProposalTXs(t *testing.T) { func TestProposal_ProposalGossip_Concurrent(t *testing.T) { th := createTestHandlerNoopDecoder(t) - lid := types.LayerID(100) + lid := th.clock.CurrentLayer() supported := []*types.Block{ types.NewExistingBlock(types.BlockID{1}, types.InnerBlock{LayerIndex: lid.Sub(1)}), types.NewExistingBlock(types.BlockID{2}, types.InnerBlock{LayerIndex: lid.Sub(2)}), @@ -1105,7 +1122,7 @@ func TestProposal_ProposalGossip_Concurrent(t *testing.T) { func TestProposal_BroadcastMaliciousGossip(t *testing.T) { th := createTestHandlerNoopDecoder(t) - lid := types.LayerID(100) + lid := th.clock.CurrentLayer() p := createProposal(t, withLayer(lid)) createAtx(t, th.cdb.Database, p.Layer.GetEpoch()-1, p.AtxID, p.SmesherID) require.NoError(t, ballots.Add(th.cdb, &p.Ballot)) @@ -1183,7 +1200,7 @@ func TestProposal_ProposalGossip_Fetched(t *testing.T) { tc := tc t.Run(tc.name, func(t *testing.T) { th := createTestHandlerNoopDecoder(t) - lid := types.LayerID(100) + lid := th.clock.CurrentLayer() supported := []*types.Block{ types.NewExistingBlock(types.BlockID{1}, types.InnerBlock{LayerIndex: lid.Sub(1)}), types.NewExistingBlock(types.BlockID{2}, types.InnerBlock{LayerIndex: lid.Sub(2)}), @@ -1231,7 +1248,7 @@ func TestProposal_ProposalGossip_Fetched(t *testing.T) { func TestProposal_ValidProposal(t *testing.T) { th := createTestHandlerNoopDecoder(t) - lid := types.LayerID(10) + lid := th.clock.CurrentLayer() blks := []*types.Block{ types.NewExistingBlock(types.BlockID{1}, types.InnerBlock{LayerIndex: lid.Sub(1)}), types.NewExistingBlock(types.BlockID{2}, types.InnerBlock{LayerIndex: lid.Sub(2)}), @@ -1268,7 +1285,7 @@ func TestProposal_ValidProposal(t *testing.T) { func TestMetrics(t *testing.T) { th := createTestHandlerNoopDecoder(t) - lid := types.LayerID(100) + lid := th.clock.CurrentLayer() supported := []*types.Block{ types.NewExistingBlock(types.BlockID{1}, types.InnerBlock{LayerIndex: lid.Sub(1)}), types.NewExistingBlock(types.BlockID{2}, types.InnerBlock{LayerIndex: lid.Sub(2)}), @@ -1470,6 +1487,7 @@ func TestHandleSyncedProposalActiveSet(t *testing.T) { } { t.Run(tc.desc, func(t *testing.T) { th := createTestHandler(t) + th.mm.EXPECT().ProcessedLayer().Return(acceptEmpty - 2).AnyTimes() th.cfg.AllowEmptyActiveSet = acceptEmpty pid := p2p.Peer("any") diff --git a/proposals/interface.go b/proposals/interface.go index ff6d43f74dd..32072abd8ee 100644 --- a/proposals/interface.go +++ b/proposals/interface.go @@ -11,6 +11,7 @@ import ( //go:generate mockgen -typed -package=proposals -destination=./mocks.go -source=./interface.go type meshProvider interface { + ProcessedLayer() types.LayerID AddBallot(context.Context, *types.Ballot) (*types.MalfeasanceProof, error) AddTXsFromProposal(context.Context, types.LayerID, types.ProposalID, []types.TransactionID) error } diff --git a/proposals/metrics.go b/proposals/metrics.go index dc56913e157..006fee19774 100644 --- a/proposals/metrics.go +++ b/proposals/metrics.go @@ -93,6 +93,7 @@ var ( malformed = processErrors.WithLabelValues("mal") failedInit = processErrors.WithLabelValues("init") known = processErrors.WithLabelValues("known") + tooLate = processErrors.WithLabelValues("late") preGenesis = processErrors.WithLabelValues("genesis") badSigProposal = processErrors.WithLabelValues("sigp") badSigBallot = processErrors.WithLabelValues("sigb") diff --git a/proposals/mocks.go b/proposals/mocks.go index 696425c6d04..4d24033af2d 100644 --- a/proposals/mocks.go +++ b/proposals/mocks.go @@ -114,6 +114,44 @@ func (c *meshProviderAddTXsFromProposalCall) DoAndReturn(f func(context.Context, return c } +// ProcessedLayer mocks base method. +func (m *MockmeshProvider) ProcessedLayer() types.LayerID { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "ProcessedLayer") + ret0, _ := ret[0].(types.LayerID) + return ret0 +} + +// ProcessedLayer indicates an expected call of ProcessedLayer. +func (mr *MockmeshProviderMockRecorder) ProcessedLayer() *meshProviderProcessedLayerCall { + mr.mock.ctrl.T.Helper() + call := mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ProcessedLayer", reflect.TypeOf((*MockmeshProvider)(nil).ProcessedLayer)) + return &meshProviderProcessedLayerCall{Call: call} +} + +// meshProviderProcessedLayerCall wrap *gomock.Call +type meshProviderProcessedLayerCall struct { + *gomock.Call +} + +// Return rewrite *gomock.Call.Return +func (c *meshProviderProcessedLayerCall) Return(arg0 types.LayerID) *meshProviderProcessedLayerCall { + c.Call = c.Call.Return(arg0) + return c +} + +// Do rewrite *gomock.Call.Do +func (c *meshProviderProcessedLayerCall) Do(f func() types.LayerID) *meshProviderProcessedLayerCall { + c.Call = c.Call.Do(f) + return c +} + +// DoAndReturn rewrite *gomock.Call.DoAndReturn +func (c *meshProviderProcessedLayerCall) DoAndReturn(f func() types.LayerID) *meshProviderProcessedLayerCall { + c.Call = c.Call.DoAndReturn(f) + return c +} + // MockeligibilityValidator is a mock of eligibilityValidator interface. type MockeligibilityValidator struct { ctrl *gomock.Controller diff --git a/sql/proposals/proposals.go b/sql/proposals/proposals.go index 30a0c3a154d..d410c2b4e92 100644 --- a/sql/proposals/proposals.go +++ b/sql/proposals/proposals.go @@ -186,7 +186,7 @@ func decodeProposal(stmt *sql.Statement) (*types.Proposal, error) { return proposal, nil } -func Delete(db sql.Executor, lid types.LayerID) error { +func DeleteBefore(db sql.Executor, lid types.LayerID) error { if _, err := db.Exec(`delete from proposals where layer < ?1;`, func(stmt *sql.Statement) { stmt.BindInt64(1, int64(lid)) diff --git a/sql/proposals/proposals_test.go b/sql/proposals/proposals_test.go index d9bf0316e9c..886b68e91e9 100644 --- a/sql/proposals/proposals_test.go +++ b/sql/proposals/proposals_test.go @@ -57,7 +57,7 @@ func TestDelete(t *testing.T) { require.NoError(t, err) require.Len(t, got, numProps) } - require.NoError(t, Delete(db, types.LayerID(maxLayers))) + require.NoError(t, DeleteBefore(db, types.LayerID(maxLayers))) for i := 1; i < maxLayers; i++ { _, err := GetByLayer(db, types.LayerID(i)) require.ErrorIs(t, err, sql.ErrNotFound)