Skip to content

Commit

Permalink
refuse old proposals from gossip (#5020)
Browse files Browse the repository at this point in the history
## Motivation
there are a lot of old proposals floating in the gossip network.
deleting old proposals in #4993 forced the node to process/save these old proposals repeatedly

## Changes
- refuse proposals older than current layer
- add metrics/log for time it takes to delete proposals
  • Loading branch information
countvonzero committed Sep 16, 2023
1 parent 306c43a commit 8919df9
Show file tree
Hide file tree
Showing 12 changed files with 144 additions and 17 deletions.
21 changes: 15 additions & 6 deletions blocks/generator.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,20 @@ func (g *Generator) Stop() {
}
}

func (g *Generator) pruneAsync() {
g.eg.Go(func() error {
lid := g.msh.ProcessedLayer()
start := time.Now()
if err := proposals.DeleteBefore(g.cdb, lid); err != nil {
g.logger.With().Error("failed to delete old proposals", lid, log.Err(err))
}
deleteLatency.Observe(time.Since(start).Seconds())
return nil
})
}

func (g *Generator) run() error {
g.pruneAsync()
var maxLayer types.LayerID
for {
select {
Expand Down Expand Up @@ -167,16 +180,12 @@ func (g *Generator) run() error {
if len(g.optimisticOutput) > 0 {
g.processOptimisticLayers(maxLayer)
}
if err := proposals.Delete(g.cdb, out.Layer); err != nil {
g.logger.With().Error("failed to delete old proposals",
out.Layer,
log.Err(err),
)
}
g.pruneAsync()
case <-time.After(g.cfg.GenBlockInterval):
if len(g.optimisticOutput) > 0 {
g.processOptimisticLayers(maxLayer)
}
g.pruneAsync()
}
}
}
Expand Down
6 changes: 6 additions & 0 deletions blocks/generator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ func createTestGenerator(t *testing.T) *testGenerator {
mockCert: mocks.NewMockcertifier(ctrl),
mockPatrol: mocks.NewMocklayerPatrol(ctrl),
}
tg.mockMesh.EXPECT().ProcessedLayer().Return(types.LayerID(1)).AnyTimes()
lg := logtest.New(t)
cdb := datastore.NewCachedDB(sql.InMemory(), lg)
tg.Generator = NewGenerator(cdb, tg.mockExec, tg.mockMesh, tg.mockFetch, tg.mockCert, tg.mockPatrol,
Expand Down Expand Up @@ -325,7 +326,11 @@ func Test_run(t *testing.T) {
t.Run(tc.desc, func(t *testing.T) {
tg := createTestGenerator(t)
tg.cfg.BlockGasLimit = tc.gasLimit
tg.mockMesh = mocks.NewMockmeshProvider(gomock.NewController(t))
tg.msh = tg.mockMesh
layerID := types.GetEffectiveGenesis().Add(100)
processed := layerID - 1
tg.mockMesh.EXPECT().ProcessedLayer().DoAndReturn(func() types.LayerID { return processed }).AnyTimes()
require.NoError(t, layers.SetApplied(tg.cdb, layerID-1, types.EmptyBlockID))
var meshHash types.Hash32
if tc.optimistic {
Expand Down Expand Up @@ -398,6 +403,7 @@ func Test_run(t *testing.T) {
tg.mockMesh.EXPECT().ProcessLayerPerHareOutput(gomock.Any(), layerID, gomock.Any(), tc.optimistic).DoAndReturn(
func(_ context.Context, _ types.LayerID, got types.BlockID, _ bool) error {
require.Equal(t, block.ID(), got)
processed = layerID
return nil
})
tg.mockPatrol.EXPECT().CompleteHare(layerID)
Expand Down
1 change: 1 addition & 0 deletions blocks/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ type layerPatrol interface {
}

type meshProvider interface {
ProcessedLayer() types.LayerID
AddBlockWithTXs(context.Context, *types.Block) error
ProcessLayerPerHareOutput(context.Context, types.LayerID, types.BlockID, bool) error
}
Expand Down
8 changes: 8 additions & 0 deletions blocks/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,14 @@ var (
failFetchCnt = blockGenCount.WithLabelValues(failFetch)
failGenCnt = blockGenCount.WithLabelValues(failGen)
failErrCnt = blockGenCount.WithLabelValues(internalErr)

deleteLatency = metrics.NewHistogramWithBuckets(
"delete_duration",
namespace,
"duration in second to delete old proposals",
[]string{},
prometheus.ExponentialBuckets(0.01, 2, 10),
).WithLabelValues()
)

type collector struct {
Expand Down
38 changes: 38 additions & 0 deletions blocks/mocks/mocks.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 8 additions & 1 deletion proposals/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,11 @@ func (h *Handler) handleProposal(ctx context.Context, expHash types.Hash32, peer
preGenesis.Inc()
return fmt.Errorf("proposal before effective genesis: layer %v", p.Layer)
}
if p.Layer <= h.mesh.ProcessedLayer() {
// old proposals have no use for the node
tooLate.Inc()
return fmt.Errorf("proposal too late: layer %v", p.Layer)
}

latency := receivedTime.Sub(h.clock.LayerToTime(p.Layer))
metrics.ReportMessageLatency(pubsub.ProposalProtocol, pubsub.ProposalProtocol, latency)
Expand Down Expand Up @@ -311,7 +316,9 @@ func (h *Handler) handleProposal(ctx context.Context, expHash types.Hash32, peer
}
proposalDuration.WithLabelValues(dbLookup).Observe(float64(time.Since(t1)))

logger.With().Info("new proposal", log.Int("num_txs", len(p.TxIDs)))
logger.With().Info("new proposal",
log.String("exp hash", expHash.ShortString()),
log.Int("num_txs", len(p.TxIDs)))
t2 := time.Now()
h.fetcher.RegisterPeerHashes(peer, collectHashes(p))
proposalDuration.WithLabelValues(peerHashes).Observe(float64(time.Since(t2)))
Expand Down
34 changes: 26 additions & 8 deletions proposals/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ func (ms *mockSet) decodeAnyBallots() *mockSet {

func (ms *mockSet) setCurrentLayer(layer types.LayerID) *mockSet {
ms.mclock.EXPECT().CurrentLayer().Return(layer).AnyTimes()
ms.mm.EXPECT().ProcessedLayer().Return(layer - 1).AnyTimes()
ms.mclock.EXPECT().LayerToTime(gomock.Any()).Return(time.Now().Add(-5 * time.Second)).AnyTimes()
return ms
}
Expand Down Expand Up @@ -188,6 +189,7 @@ func withTransactions(ids ...types.TransactionID) createProposalOpt {
func createProposal(t *testing.T, opts ...any) *types.Proposal {
t.Helper()
b := types.RandomBallot()
b.Layer = 10000
p := &types.Proposal{
InnerProposal: types.InnerProposal{
Ballot: *b,
Expand Down Expand Up @@ -876,6 +878,20 @@ func TestProposal_BeforeEffectiveGenesis(t *testing.T) {
checkProposal(t, th.cdb, p, false)
}

func TestProposal_TooOld(t *testing.T) {
th := createTestHandlerNoopDecoder(t)
lid := types.LayerID(11)
th.mockSet.setCurrentLayer(lid)
p := createProposal(t)
p.Layer = lid - 1
data := encodeProposal(t, p)
got := th.HandleSyncedProposal(context.Background(), p.ID().AsHash32(), p2p.NoPeer, data)
require.ErrorContains(t, got, "proposal too late")

require.Error(t, th.HandleProposal(context.Background(), "", data))
checkProposal(t, th.cdb, p, false)
}

func TestProposal_BadSignature(t *testing.T) {
th := createTestHandlerNoopDecoder(t)
p := createProposal(t)
Expand All @@ -896,6 +912,7 @@ func TestProposal_InconsistentSmeshers(t *testing.T) {
TxIDs: []types.TransactionID{types.RandomTransactionID(), types.RandomTransactionID()},
},
}
p.Layer = th.clock.CurrentLayer()
signer1, err := signing.NewEdSigner()
require.NoError(t, err)
signer2, err := signing.NewEdSigner()
Expand Down Expand Up @@ -936,7 +953,7 @@ func TestProposal_KnownProposal(t *testing.T) {

func TestProposal_DuplicateTXs(t *testing.T) {
th := createTestHandlerNoopDecoder(t)
lid := types.LayerID(100)
lid := th.clock.CurrentLayer()
supported := []*types.Block{
types.NewExistingBlock(types.BlockID{1}, types.InnerBlock{LayerIndex: lid.Sub(1)}),
types.NewExistingBlock(types.BlockID{2}, types.InnerBlock{LayerIndex: lid.Sub(2)}),
Expand Down Expand Up @@ -973,7 +990,7 @@ func TestProposal_DuplicateTXs(t *testing.T) {

func TestProposal_TXsNotAvailable(t *testing.T) {
th := createTestHandlerNoopDecoder(t)
lid := types.LayerID(100)
lid := th.clock.CurrentLayer()
supported := []*types.Block{
types.NewExistingBlock(types.BlockID{1}, types.InnerBlock{LayerIndex: lid.Sub(1)}),
types.NewExistingBlock(types.BlockID{2}, types.InnerBlock{LayerIndex: lid.Sub(2)}),
Expand Down Expand Up @@ -1011,7 +1028,7 @@ func TestProposal_TXsNotAvailable(t *testing.T) {

func TestProposal_FailedToAddProposalTXs(t *testing.T) {
th := createTestHandlerNoopDecoder(t)
lid := types.LayerID(100)
lid := th.clock.CurrentLayer()
supported := []*types.Block{
types.NewExistingBlock(types.BlockID{1}, types.InnerBlock{LayerIndex: lid.Sub(1)}),
types.NewExistingBlock(types.BlockID{2}, types.InnerBlock{LayerIndex: lid.Sub(2)}),
Expand Down Expand Up @@ -1049,7 +1066,7 @@ func TestProposal_FailedToAddProposalTXs(t *testing.T) {

func TestProposal_ProposalGossip_Concurrent(t *testing.T) {
th := createTestHandlerNoopDecoder(t)
lid := types.LayerID(100)
lid := th.clock.CurrentLayer()
supported := []*types.Block{
types.NewExistingBlock(types.BlockID{1}, types.InnerBlock{LayerIndex: lid.Sub(1)}),
types.NewExistingBlock(types.BlockID{2}, types.InnerBlock{LayerIndex: lid.Sub(2)}),
Expand Down Expand Up @@ -1105,7 +1122,7 @@ func TestProposal_ProposalGossip_Concurrent(t *testing.T) {

func TestProposal_BroadcastMaliciousGossip(t *testing.T) {
th := createTestHandlerNoopDecoder(t)
lid := types.LayerID(100)
lid := th.clock.CurrentLayer()
p := createProposal(t, withLayer(lid))
createAtx(t, th.cdb.Database, p.Layer.GetEpoch()-1, p.AtxID, p.SmesherID)
require.NoError(t, ballots.Add(th.cdb, &p.Ballot))
Expand Down Expand Up @@ -1183,7 +1200,7 @@ func TestProposal_ProposalGossip_Fetched(t *testing.T) {
tc := tc
t.Run(tc.name, func(t *testing.T) {
th := createTestHandlerNoopDecoder(t)
lid := types.LayerID(100)
lid := th.clock.CurrentLayer()
supported := []*types.Block{
types.NewExistingBlock(types.BlockID{1}, types.InnerBlock{LayerIndex: lid.Sub(1)}),
types.NewExistingBlock(types.BlockID{2}, types.InnerBlock{LayerIndex: lid.Sub(2)}),
Expand Down Expand Up @@ -1231,7 +1248,7 @@ func TestProposal_ProposalGossip_Fetched(t *testing.T) {

func TestProposal_ValidProposal(t *testing.T) {
th := createTestHandlerNoopDecoder(t)
lid := types.LayerID(10)
lid := th.clock.CurrentLayer()
blks := []*types.Block{
types.NewExistingBlock(types.BlockID{1}, types.InnerBlock{LayerIndex: lid.Sub(1)}),
types.NewExistingBlock(types.BlockID{2}, types.InnerBlock{LayerIndex: lid.Sub(2)}),
Expand Down Expand Up @@ -1268,7 +1285,7 @@ func TestProposal_ValidProposal(t *testing.T) {

func TestMetrics(t *testing.T) {
th := createTestHandlerNoopDecoder(t)
lid := types.LayerID(100)
lid := th.clock.CurrentLayer()
supported := []*types.Block{
types.NewExistingBlock(types.BlockID{1}, types.InnerBlock{LayerIndex: lid.Sub(1)}),
types.NewExistingBlock(types.BlockID{2}, types.InnerBlock{LayerIndex: lid.Sub(2)}),
Expand Down Expand Up @@ -1470,6 +1487,7 @@ func TestHandleSyncedProposalActiveSet(t *testing.T) {
} {
t.Run(tc.desc, func(t *testing.T) {
th := createTestHandler(t)
th.mm.EXPECT().ProcessedLayer().Return(acceptEmpty - 2).AnyTimes()
th.cfg.AllowEmptyActiveSet = acceptEmpty
pid := p2p.Peer("any")

Expand Down
1 change: 1 addition & 0 deletions proposals/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
//go:generate mockgen -typed -package=proposals -destination=./mocks.go -source=./interface.go

type meshProvider interface {
ProcessedLayer() types.LayerID
AddBallot(context.Context, *types.Ballot) (*types.MalfeasanceProof, error)
AddTXsFromProposal(context.Context, types.LayerID, types.ProposalID, []types.TransactionID) error
}
Expand Down
1 change: 1 addition & 0 deletions proposals/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ var (
malformed = processErrors.WithLabelValues("mal")
failedInit = processErrors.WithLabelValues("init")
known = processErrors.WithLabelValues("known")
tooLate = processErrors.WithLabelValues("late")
preGenesis = processErrors.WithLabelValues("genesis")
badSigProposal = processErrors.WithLabelValues("sigp")
badSigBallot = processErrors.WithLabelValues("sigb")
Expand Down
38 changes: 38 additions & 0 deletions proposals/mocks.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion sql/proposals/proposals.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ func decodeProposal(stmt *sql.Statement) (*types.Proposal, error) {
return proposal, nil
}

func Delete(db sql.Executor, lid types.LayerID) error {
func DeleteBefore(db sql.Executor, lid types.LayerID) error {
if _, err := db.Exec(`delete from proposals where layer < ?1;`,
func(stmt *sql.Statement) {
stmt.BindInt64(1, int64(lid))
Expand Down
2 changes: 1 addition & 1 deletion sql/proposals/proposals_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func TestDelete(t *testing.T) {
require.NoError(t, err)
require.Len(t, got, numProps)
}
require.NoError(t, Delete(db, types.LayerID(maxLayers)))
require.NoError(t, DeleteBefore(db, types.LayerID(maxLayers)))
for i := 1; i < maxLayers; i++ {
_, err := GetByLayer(db, types.LayerID(i))
require.ErrorIs(t, err, sql.ErrNotFound)
Expand Down

0 comments on commit 8919df9

Please sign in to comment.