diff --git a/activation/activation.go b/activation/activation.go index 7dc4b55ef8..ec43a0cf8a 100644 --- a/activation/activation.go +++ b/activation/activation.go @@ -59,9 +59,10 @@ const ( // Config defines configuration for Builder. type Config struct { - CoinbaseAccount types.Address - GoldenATXID types.ATXID - LayersPerEpoch uint32 + CoinbaseAccount types.Address + GoldenATXID types.ATXID + LayersPerEpoch uint32 + RegossipInterval time.Duration } // Builder struct is the struct that orchestrates the creation of activation transactions @@ -79,6 +80,7 @@ type Builder struct { coinbaseAccount types.Address goldenATXID types.ATXID layersPerEpoch uint32 + regossipInterval time.Duration cdb *datastore.CachedDB atxHandler atxHandler publisher pubsub.Publisher @@ -165,6 +167,7 @@ func NewBuilder( coinbaseAccount: conf.CoinbaseAccount, goldenATXID: conf.GoldenATXID, layersPerEpoch: conf.LayersPerEpoch, + regossipInterval: conf.RegossipInterval, cdb: cdb, atxHandler: hdlr, publisher: publisher, @@ -235,7 +238,22 @@ func (b *Builder) StartSmeshing(coinbase types.Address, opts PostSetupOpts) erro b.run(ctx) return nil }) - + if b.regossipInterval != 0 { + b.eg.Go(func() error { + ticker := time.NewTicker(b.regossipInterval) + defer ticker.Stop() + for { + select { + case <-ctx.Done(): + return ctx.Err() + case <-ticker.C: + if err := b.Regossip(ctx); err != nil { + b.log.With().Warning("failed to regossip", log.Context(ctx), log.Err(err)) + } + } + } + }) + } return nil } @@ -724,6 +742,28 @@ func (b *Builder) GetPositioningAtx() (types.ATXID, error) { return id, nil } +func (b *Builder) Regossip(ctx context.Context) error { + epoch := b.layerClock.CurrentLayer().GetEpoch() + atx, err := atxs.GetIDByEpochAndNodeID(b.cdb, epoch, b.signer.NodeID()) + if errors.Is(err, sql.ErrNotFound) { + return nil + } else if err != nil { + return err + } + blob, err := atxs.GetBlob(b.cdb, atx[:]) + if err != nil { + return fmt.Errorf("get blob %s: %w", atx.ShortString(), err) + } + if len(blob) == 0 { + return nil // checkpoint + } + if err := b.publisher.Publish(ctx, pubsub.AtxProtocol, blob); err != nil { + return fmt.Errorf("republish %s: %w", atx.ShortString(), err) + } + b.log.With().Debug("regossipped atx", log.Context(ctx), log.ShortStringer("atx", atx)) + return nil +} + // SignAndFinalizeAtx signs the atx with specified signer and calculates the ID of the ATX. func SignAndFinalizeAtx(signer *signing.EdSigner, atx *types.ActivationTx) error { atx.Signature = signer.Sign(signing.ATX, atx.SignedBytes()) diff --git a/activation/activation_test.go b/activation/activation_test.go index 1a9e9ace1d..b1a04af8e4 100644 --- a/activation/activation_test.go +++ b/activation/activation_test.go @@ -1271,6 +1271,36 @@ func TestWaitPositioningAtx(t *testing.T) { } } +func TestRegossip(t *testing.T) { + layer := types.LayerID(10) + t.Run("not found", func(t *testing.T) { + h := newTestBuilder(t) + h.mclock.EXPECT().CurrentLayer().Return(layer) + require.NoError(t, h.Regossip(context.Background())) + }) + t.Run("success", func(t *testing.T) { + h := newTestBuilder(t) + atx := newActivationTx(t, + h.signer, 0, types.EmptyATXID, types.EmptyATXID, nil, + layer.GetEpoch(), 0, 1, types.Address{}, 1, &types.NIPost{}) + require.NoError(t, atxs.Add(h.cdb.Database, atx)) + blob, err := atxs.GetBlob(h.cdb.Database, atx.ID().Bytes()) + require.NoError(t, err) + h.mclock.EXPECT().CurrentLayer().Return(layer) + + ctx := context.Background() + h.mpub.EXPECT().Publish(ctx, pubsub.AtxProtocol, blob) + require.NoError(t, h.Regossip(ctx)) + }) + t.Run("checkpointed", func(t *testing.T) { + h := newTestBuilder(t) + require.NoError(t, atxs.AddCheckpointed(h.cdb.Database, + &atxs.CheckpointAtx{ID: types.ATXID{1}, Epoch: layer.GetEpoch(), SmesherID: h.sig.NodeID()})) + h.mclock.EXPECT().CurrentLayer().Return(layer) + require.NoError(t, h.Regossip(context.Background())) + }) +} + func TestWaitingToBuildNipostChallengeWithJitter(t *testing.T) { t.Run("before grace period", func(t *testing.T) { // ┌──grace period──┐ diff --git a/activation/handler.go b/activation/handler.go index bfd3a06f58..e46400f77a 100644 --- a/activation/handler.go +++ b/activation/handler.go @@ -39,6 +39,7 @@ type atxChan struct { // Handler processes the atxs received from all nodes and their validity status. type Handler struct { + local p2p.Peer cdb *datastore.CachedDB edVerifier *signing.EdVerifier clock layerClock @@ -57,6 +58,7 @@ type Handler struct { // NewHandler returns a data handler for ATX. func NewHandler( + local p2p.Peer, cdb *datastore.CachedDB, edVerifier *signing.EdVerifier, c layerClock, @@ -71,6 +73,7 @@ func NewHandler( poetCfg PoetConfig, ) *Handler { return &Handler{ + local: local, cdb: cdb, edVerifier: edVerifier, clock: c, @@ -493,6 +496,9 @@ func (h *Handler) HandleGossipAtx(ctx context.Context, peer p2p.Peer, msg []byte log.Err(err), ) } + if errors.Is(err, errKnownAtx) && peer == h.local { + return nil + } return err } diff --git a/activation/handler_test.go b/activation/handler_test.go index 5f573b16b9..4ac25d2f70 100644 --- a/activation/handler_test.go +++ b/activation/handler_test.go @@ -31,7 +31,10 @@ import ( "github.com/spacemeshos/go-spacemesh/system/mocks" ) -const layersPerEpochBig = 1000 +const ( + layersPerEpochBig = 1000 + localID = "local" +) func newMerkleProof(t testing.TB, challenge types.Hash32, otherLeafs []types.Hash32) (types.MerkleProof, types.Hash32) { t.Helper() @@ -101,7 +104,7 @@ func newTestHandler(tb testing.TB, goldenATXID types.ATXID) *testHandler { mbeacon := NewMockAtxReceiver(ctrl) mtortoise := mocks.NewMockTortoise(ctrl) - atxHdlr := NewHandler(cdb, verifier, mclock, mpub, mockFetch, 1, goldenATXID, mValidator, mbeacon, mtortoise, lg, PoetConfig{}) + atxHdlr := NewHandler(localID, cdb, verifier, mclock, mpub, mockFetch, 1, goldenATXID, mValidator, mbeacon, mtortoise, lg, PoetConfig{}) return &testHandler{ Handler: atxHdlr, @@ -1002,6 +1005,26 @@ func TestHandler_HandleSyncedAtx(t *testing.T) { atxHdlr.mclock.EXPECT().LayerToTime(gomock.Any()).Return(time.Now()) require.Error(t, atxHdlr.HandleGossipAtx(context.Background(), "", buf)) }) + t.Run("known atx from local id is allowed", func(t *testing.T) { + t.Parallel() + + atxHdlr := newTestHandler(t, goldenATXID) + + atx := newActivationTx(t, sig, 0, types.EmptyATXID, types.EmptyATXID, nil, 0, 0, 0, types.Address{2, 4, 5}, 2, nil) + + atxHdlr.mbeacon.EXPECT().OnAtx(gomock.Any()) + atxHdlr.mtortoise.EXPECT().OnAtx(gomock.Any()) + require.NoError(t, atxHdlr.ProcessAtx(context.Background(), atx)) + + buf, err := codec.Encode(atx) + require.NoError(t, err) + + atxHdlr.mclock.EXPECT().LayerToTime(gomock.Any()).Return(time.Now()) + require.NoError(t, atxHdlr.HandleSyncedAtx(context.Background(), atx.ID().Hash32(), p2p.NoPeer, buf)) + + atxHdlr.mclock.EXPECT().LayerToTime(gomock.Any()).Return(time.Now()) + require.NoError(t, atxHdlr.HandleGossipAtx(context.Background(), localID, buf)) + }) t.Run("atx with invalid signature", func(t *testing.T) { t.Parallel() diff --git a/checkpoint/recovery_test.go b/checkpoint/recovery_test.go index 264c1bb618..c2d23be1de 100644 --- a/checkpoint/recovery_test.go +++ b/checkpoint/recovery_test.go @@ -228,6 +228,7 @@ func validateAndPreserveData(tb testing.TB, db *sql.Database, deps []*types.Veri mtrtl := smocks.NewMockTortoise(ctrl) cdb := datastore.NewCachedDB(db, lg) atxHandler := activation.NewHandler( + "", cdb, edVerifier, mclock, diff --git a/config/config.go b/config/config.go index 2feb7022a2..139ed2277d 100644 --- a/config/config.go +++ b/config/config.go @@ -118,6 +118,8 @@ type BaseConfig struct { // MinerGoodAtxsPercent is a threshold to decide if tortoise activeset should be // picked from first block insted of synced data. MinerGoodAtxsPercent int `mapstructure:"miner-good-atxs-percent"` + + RegossipAtxInterval time.Duration `mapstructure:"regossip-atx-interval"` } type PublicMetrics struct { diff --git a/config/mainnet.go b/config/mainnet.go index b8472f8a76..03ebe7b927 100644 --- a/config/mainnet.go +++ b/config/mainnet.go @@ -66,6 +66,7 @@ func MainnetConfig() Config { "https://poet-110.spacemesh.network", "https://poet-111.spacemesh.network", }, + RegossipAtxInterval: 2 * time.Hour, }, Genesis: &GenesisConfig{ GenesisTime: "2023-07-14T08:00:00Z", diff --git a/config/presets/fastnet.go b/config/presets/fastnet.go index e7bf8d0d19..6e5e4f37a1 100644 --- a/config/presets/fastnet.go +++ b/config/presets/fastnet.go @@ -55,6 +55,7 @@ func fastnet() config.Config { conf.Sync.Interval = 5 * time.Second conf.Sync.GossipDuration = 10 * time.Second conf.LayersPerEpoch = 4 + conf.RegossipAtxInterval = 30 * time.Second conf.Tortoise.Hdist = 4 conf.Tortoise.Zdist = 2 diff --git a/config/presets/testnet.go b/config/presets/testnet.go index 2c7c404ba1..a646030e77 100644 --- a/config/presets/testnet.go +++ b/config/presets/testnet.go @@ -63,8 +63,9 @@ func testnet() config.Config { OptFilterThreshold: 90, - TickSize: 666514, - PoETServers: []string{}, + TickSize: 666514, + PoETServers: []string{}, + RegossipAtxInterval: time.Hour, }, Genesis: &config.GenesisConfig{ GenesisTime: "2023-09-13T18:00:00Z", diff --git a/node/node.go b/node/node.go index 9797f148ab..e21e5e1d73 100644 --- a/node/node.go +++ b/node/node.go @@ -654,6 +654,7 @@ func (app *App) initServices(ctx context.Context) error { fetcherWrapped := &layerFetcher{} atxHandler := activation.NewHandler( + app.host.ID(), app.cachedDB, app.edVerifier, app.clock, @@ -858,9 +859,10 @@ func (app *App) initServices(ctx context.Context) error { } builderConfig := activation.Config{ - CoinbaseAccount: coinbaseAddr, - GoldenATXID: goldenATXID, - LayersPerEpoch: layersPerEpoch, + CoinbaseAccount: coinbaseAddr, + GoldenATXID: goldenATXID, + LayersPerEpoch: layersPerEpoch, + RegossipInterval: app.Config.RegossipAtxInterval, } atxBuilder := activation.NewBuilder( builderConfig,