From b04cda07e248d5985f7cec3f801ebd3a095da1db Mon Sep 17 00:00:00 2001 From: kimmy lin <30611210+countvonzero@users.noreply.github.com> Date: Mon, 11 Sep 2023 19:28:12 -0700 Subject: [PATCH 1/9] prune data and vacuum --- blocks/generator.go | 15 ----- blocks/generator_test.go | 17 ----- cmd/root.go | 4 ++ config/config.go | 35 +++++----- config/presets/fastnet.go | 1 + mesh/janitor.go | 57 ++++++++++++++++ mesh/janitor_test.go | 96 +++++++++++++++++++++++++++ node/node.go | 46 +++++++++++-- sql/certificates/certs.go | 10 +++ sql/certificates/certs_test.go | 16 +++++ sql/transactions/transactions.go | 11 +++ sql/transactions/transactions_test.go | 31 +++++++++ sql/vacuum.go | 10 +++ 13 files changed, 297 insertions(+), 52 deletions(-) create mode 100644 mesh/janitor.go create mode 100644 mesh/janitor_test.go create mode 100644 sql/vacuum.go diff --git a/blocks/generator.go b/blocks/generator.go index b1aa78e40e..3d5e389bd9 100644 --- a/blocks/generator.go +++ b/blocks/generator.go @@ -135,20 +135,7 @@ func (g *Generator) Stop() { } } -func (g *Generator) pruneAsync() { - g.eg.Go(func() error { - lid := g.msh.ProcessedLayer() - start := time.Now() - if err := proposals.DeleteBefore(g.cdb, lid); err != nil { - g.logger.With().Error("failed to delete old proposals", lid, log.Err(err)) - } - deleteLatency.Observe(time.Since(start).Seconds()) - return nil - }) -} - func (g *Generator) run() error { - g.pruneAsync() var maxLayer types.LayerID for { select { @@ -180,12 +167,10 @@ func (g *Generator) run() error { if len(g.optimisticOutput) > 0 { g.processOptimisticLayers(maxLayer) } - g.pruneAsync() case <-time.After(g.cfg.GenBlockInterval): if len(g.optimisticOutput) > 0 { g.processOptimisticLayers(maxLayer) } - g.pruneAsync() } } } diff --git a/blocks/generator_test.go b/blocks/generator_test.go index 2ad8415bd0..678438589d 100644 --- a/blocks/generator_test.go +++ b/blocks/generator_test.go @@ -344,17 +344,7 @@ func Test_run(t *testing.T) { txIDs := createAndSaveTxs(t, numTXs, tg.cdb) signers, atxes := createATXs(t, tg.cdb, (layerID.GetEpoch() - 1).FirstLayer(), numProposals) activeSet := types.ToATXIDs(atxes) - // generate some proposals before this layer - oldest := layerID - 10 - for lid := oldest; lid < layerID; lid++ { - createProposals(t, tg.cdb, lid, types.EmptyLayerHash, signers, activeSet, txIDs) - } plist := createProposals(t, tg.cdb, layerID, meshHash, signers, activeSet, txIDs) - for lid := oldest; lid <= layerID; lid++ { - got, err := proposals.GetByLayer(tg.cdb, lid) - require.NoError(t, err) - require.Len(t, got, len(signers)) - } pids := types.ToProposalIDs(plist) tg.mockFetch.EXPECT().GetProposals(gomock.Any(), pids) @@ -413,13 +403,6 @@ func Test_run(t *testing.T) { tg.hareCh <- hare.LayerOutput{Ctx: context.Background(), Layer: layerID, Proposals: pids} require.Eventually(t, func() bool { return len(tg.hareCh) == 0 }, time.Second, 100*time.Millisecond) tg.Stop() - for lid := oldest; lid < layerID; lid++ { - _, err := proposals.GetByLayer(tg.cdb, lid) - require.ErrorIs(t, err, sql.ErrNotFound) - } - got, err := proposals.GetByLayer(tg.cdb, layerID) - require.NoError(t, err) - require.Len(t, got, len(signers)) }) } } diff --git a/cmd/root.go b/cmd/root.go index 253591679a..54acea63b0 100644 --- a/cmd/root.go +++ b/cmd/root.go @@ -78,6 +78,10 @@ func AddCommands(cmd *cobra.Command) { cfg.DatabaseConnections, "configure number of active connections to enable parallel read requests") cmd.PersistentFlags().BoolVar(&cfg.DatabaseLatencyMetering, "db-latency-metering", cfg.DatabaseLatencyMetering, "if enabled collect latency histogram for every database query") + cmd.PersistentFlags().BoolVar(&cfg.DatabaseCompactState, "db-compact-state", + cfg.DatabaseCompactState, "if enabled compact database on startup") + cmd.PersistentFlags().DurationVar(&cfg.DatabasePruneInterval, "db-prune-interval", + cfg.DatabasePruneInterval, "configure interval for database pruning") /** ======================== P2P Flags ========================== **/ diff --git a/config/config.go b/config/config.go index b53710e4f9..c2fd1c535f 100644 --- a/config/config.go +++ b/config/config.go @@ -108,8 +108,10 @@ type BaseConfig struct { OptFilterThreshold int `mapstructure:"optimistic-filtering-threshold"` TickSize uint64 `mapstructure:"tick-size"` - DatabaseConnections int `mapstructure:"db-connections"` - DatabaseLatencyMetering bool `mapstructure:"db-latency-metering"` + DatabaseConnections int `mapstructure:"db-connections"` + DatabaseLatencyMetering bool `mapstructure:"db-latency-metering"` + DatabaseCompactState bool `mapstructure:"db-compact-state"` + DatabasePruneInterval time.Duration `mapstructure:"db-prune-interval"` NetworkHRP string `mapstructure:"network-hrp"` @@ -173,20 +175,21 @@ func DefaultTestConfig() Config { // DefaultBaseConfig returns a default configuration for spacemesh. func defaultBaseConfig() BaseConfig { return BaseConfig{ - DataDirParent: defaultDataDir, - FileLock: filepath.Join(os.TempDir(), "spacemesh.lock"), - CollectMetrics: false, - MetricsPort: 1010, - ProfilerName: "gp-spacemesh", - LayerDuration: 30 * time.Second, - LayersPerEpoch: 3, - PoETServers: []string{"127.0.0.1"}, - TxsPerProposal: 100, - BlockGasLimit: math.MaxUint64, - OptFilterThreshold: 90, - TickSize: 100, - DatabaseConnections: 16, - NetworkHRP: "sm", + DataDirParent: defaultDataDir, + FileLock: filepath.Join(os.TempDir(), "spacemesh.lock"), + CollectMetrics: false, + MetricsPort: 1010, + ProfilerName: "gp-spacemesh", + LayerDuration: 30 * time.Second, + LayersPerEpoch: 3, + PoETServers: []string{"127.0.0.1"}, + TxsPerProposal: 100, + BlockGasLimit: math.MaxUint64, + OptFilterThreshold: 90, + TickSize: 100, + DatabaseConnections: 16, + DatabasePruneInterval: 30 * time.Minute, + NetworkHRP: "sm", } } diff --git a/config/presets/fastnet.go b/config/presets/fastnet.go index b7d4ad726c..e7bf8d0d19 100644 --- a/config/presets/fastnet.go +++ b/config/presets/fastnet.go @@ -22,6 +22,7 @@ func fastnet() config.Config { conf.NetworkHRP = "stest" types.SetNetworkHRP(conf.NetworkHRP) // set to generate coinbase conf.BaseConfig.OptFilterThreshold = 90 + conf.BaseConfig.DatabasePruneInterval = time.Minute // set for systest TestEquivocation conf.BaseConfig.MinerGoodAtxsPercent = 50 diff --git a/mesh/janitor.go b/mesh/janitor.go new file mode 100644 index 0000000000..30fad37c72 --- /dev/null +++ b/mesh/janitor.go @@ -0,0 +1,57 @@ +package mesh + +import ( + "context" + "time" + + "go.uber.org/zap" + + "github.com/spacemeshos/go-spacemesh/common/types" + "github.com/spacemeshos/go-spacemesh/sql" + "github.com/spacemeshos/go-spacemesh/sql/certificates" + "github.com/spacemeshos/go-spacemesh/sql/proposals" + "github.com/spacemeshos/go-spacemesh/sql/transactions" +) + +func Prune( + ctx context.Context, + logger *zap.Logger, + db sql.Executor, + lc layerClock, + safeDist uint32, + interval time.Duration, +) { + logger.With().Info("db pruning launched", + zap.Uint32("dist", safeDist), + zap.Duration("interval", interval), + ) + for { + select { + case <-ctx.Done(): + return + case <-time.After(interval): + oldest := lc.CurrentLayer() - types.LayerID(safeDist) + if err := proposals.Delete(db, oldest); err != nil { + logger.Error("failed to delete proposals", + zap.Stringer("lid", oldest), + zap.Error(err), + ) + } + logger.Info("proposals pruned", zap.Stringer("lid", oldest)) + if err := certificates.DeleteCert(db, oldest); err != nil { + logger.Error("failed to delete certificates", + zap.Stringer("lid", oldest), + zap.Error(err), + ) + } + logger.Info("certificates pruned", zap.Stringer("lid", oldest)) + if err := transactions.DeleteProposalTxs(db, oldest); err != nil { + logger.Error("failed to delete proposal tx mapping", + zap.Stringer("lid", oldest), + zap.Error(err), + ) + } + logger.Info("proposal tx pruned", zap.Stringer("lid", oldest)) + } + } +} diff --git a/mesh/janitor_test.go b/mesh/janitor_test.go new file mode 100644 index 0000000000..2d10472999 --- /dev/null +++ b/mesh/janitor_test.go @@ -0,0 +1,96 @@ +package mesh + +import ( + "context" + "testing" + "time" + + "github.com/stretchr/testify/require" + "go.uber.org/mock/gomock" + "golang.org/x/sync/errgroup" + + "github.com/spacemeshos/go-spacemesh/common/types" + "github.com/spacemeshos/go-spacemesh/log/logtest" + "github.com/spacemeshos/go-spacemesh/mesh/mocks" + "github.com/spacemeshos/go-spacemesh/sql" + "github.com/spacemeshos/go-spacemesh/sql/ballots" + "github.com/spacemeshos/go-spacemesh/sql/certificates" + "github.com/spacemeshos/go-spacemesh/sql/proposals" + "github.com/spacemeshos/go-spacemesh/sql/transactions" +) + +func TestPrune(t *testing.T) { + db := sql.InMemory() + current := types.LayerID(10) + mc := mocks.NewMocklayerClock(gomock.NewController(t)) + done := make(chan struct{}) + count := 0 + mc.EXPECT().CurrentLayer().DoAndReturn(func() types.LayerID { + if count == 0 { + close(done) + } + count++ + return current + }).AnyTimes() + lyrProps := make([]*types.Proposal, 0, current) + for lid := types.LayerID(0); lid < current; lid++ { + blt := types.NewExistingBallot(types.RandomBallotID(), types.RandomEdSignature(), types.NodeID{1}, lid) + require.NoError(t, ballots.Add(db, &blt)) + p := &types.Proposal{ + InnerProposal: types.InnerProposal{ + Ballot: blt, + TxIDs: []types.TransactionID{types.RandomTransactionID(), types.RandomTransactionID()}, + }, + Signature: types.RandomEdSignature(), + } + p.SetID(types.RandomProposalID()) + require.NoError(t, proposals.Add(db, p)) + require.NoError(t, certificates.Add(db, lid, &types.Certificate{BlockID: types.RandomBlockID()})) + for _, tid := range p.TxIDs { + require.NoError(t, transactions.AddToProposal(db, tid, lid, p.ID())) + } + lyrProps = append(lyrProps, p) + } + confidenceDist := uint32(3) + ctx, cancel := context.WithCancel(context.Background()) + var eg errgroup.Group + eg.Go(func() error { + Prune(ctx, logtest.New(t).Zap(), db, mc, confidenceDist, time.Millisecond) + return nil + }) + require.Eventually(t, func() bool { + select { + case <-done: + oldest := current - types.LayerID(confidenceDist) + for lid := types.LayerID(0); lid < oldest; lid++ { + _, err := certificates.CertifiedBlock(db, lid) + require.ErrorIs(t, err, sql.ErrNotFound) + _, err = proposals.GetByLayer(db, lid) + require.ErrorIs(t, err, sql.ErrNotFound) + for _, tid := range lyrProps[lid].TxIDs { + exists, err := transactions.HasProposalTX(db, lyrProps[lid].ID(), tid) + require.NoError(t, err) + require.False(t, exists) + } + } + for lid := oldest; lid < current; lid++ { + got, err := certificates.CertifiedBlock(db, lid) + require.NoError(t, err) + require.NotEqual(t, types.EmptyBlockID, got) + pps, err := proposals.GetByLayer(db, lid) + require.NoError(t, err) + require.NotEmpty(t, pps) + for _, tid := range lyrProps[lid].TxIDs { + exists, err := transactions.HasProposalTX(db, lyrProps[lid].ID(), tid) + require.NoError(t, err) + require.True(t, exists) + } + } + return true + default: + return false + } + }, time.Second, 10*time.Millisecond) + cancel() + eg.Wait() +} diff --git a/node/node.go b/node/node.go index 4d3c27d4cd..17d26f0597 100644 --- a/node/node.go +++ b/node/node.go @@ -165,6 +165,12 @@ func GetCommand() *cobra.Command { return err } + if app.Config.DatabaseCompactState { + if err := app.compactDB(); err != nil { + return err + } + } + // This blocks until the context is finished or until an error is produced err = app.Start(ctx) cleanupCtx, cleanupCancel := context.WithTimeout(context.Background(), 30*time.Second) @@ -638,11 +644,17 @@ func (app *App) initServices(ctx context.Context) error { }) executor := mesh.NewExecutor(app.cachedDB, state, app.conState, app.addLogger(ExecutorLogger, lg)) - msh, err := mesh.NewMesh(app.cachedDB, app.clock, trtl, executor, app.conState, app.addLogger(MeshLogger, lg)) + mlog := app.addLogger(MeshLogger, lg) + msh, err := mesh.NewMesh(app.cachedDB, app.clock, trtl, executor, app.conState, mlog) if err != nil { return fmt.Errorf("failed to create mesh: %w", err) } + app.eg.Go(func() error { + mesh.Prune(ctx, mlog.Zap(), app.db, app.clock, app.Config.HareEligibility.ConfidenceParam, app.Config.DatabasePruneInterval) + return nil + }) + fetcherWrapped := &layerFetcher{} atxHandler := activation.NewHandler( app.cachedDB, @@ -1322,7 +1334,34 @@ func (app *App) LoadOrCreateEdSigner() (*signing.EdSigner, error) { return edSgn, nil } -func (app *App) setupDBs(ctx context.Context, lg log.Log, dbPath string) error { +func (app *App) compactDB() error { + dbPath, err := filepath.Abs(filepath.Join(app.Config.DataDir(), dbFile)) + if err != nil { + return fmt.Errorf("compact abs: %w", err) + } + if _, err := os.Stat(dbPath); err != nil { + if os.IsNotExist(err) { + return nil + } + return fmt.Errorf("compact stat: %w", err) + } + sqldb, err := sql.Open("file:" + dbPath) + if err != nil { + return fmt.Errorf("compact open %w", err) + } + app.log.With().Info("starting db compaction") + if err := sql.VacuumDB(sqldb); err != nil { + return fmt.Errorf("compact vacuum: %w", err) + } + if err := sqldb.Close(); err != nil { + return fmt.Errorf("compact close: %w", err) + } + app.log.With().Info("finished db compaction") + return nil +} + +func (app *App) setupDBs(ctx context.Context, lg log.Log) error { + dbPath := app.Config.DataDir() if err := os.MkdirAll(dbPath, os.ModePerm); err != nil { return fmt.Errorf("failed to create %s: %w", dbPath, err) } @@ -1407,7 +1446,6 @@ func (app *App) startSynchronous(ctx context.Context) (err error) { } if app.Config.ProfilerURL != "" { - app.profilerService, err = pyroscope.Start(pyroscope.Config{ ApplicationName: app.Config.ProfilerName, // app.Config.ProfilerURL should be the pyroscope server address @@ -1456,7 +1494,7 @@ func (app *App) startSynchronous(ctx context.Context) (err error) { return fmt.Errorf("failed to initialize p2p host: %w", err) } - if err := app.setupDBs(ctx, lg, app.Config.DataDir()); err != nil { + if err := app.setupDBs(ctx, lg); err != nil { return err } if err := app.initServices(ctx); err != nil { diff --git a/sql/certificates/certs.go b/sql/certificates/certs.go index 9ec75165d0..5ee460cb3b 100644 --- a/sql/certificates/certs.go +++ b/sql/certificates/certs.go @@ -164,3 +164,13 @@ func SetInvalid(db sql.Executor, lid types.LayerID, bid types.BlockID) error { } return nil } + +func DeleteCert(db sql.Executor, lid types.LayerID) error { + if _, err := db.Exec(`update certificates set cert = null where layer < ?1;`, + func(stmt *sql.Statement) { + stmt.BindInt64(1, int64(lid)) + }, nil); err != nil { + return fmt.Errorf("delete %s: %w", lid, err) + } + return nil +} diff --git a/sql/certificates/certs_test.go b/sql/certificates/certs_test.go index b345de33bf..ebf69d79ae 100644 --- a/sql/certificates/certs_test.go +++ b/sql/certificates/certs_test.go @@ -160,6 +160,22 @@ func TestCertifiedBlock(t *testing.T) { require.Equal(t, types.EmptyBlockID, got) } +func TestDeleteCert(t *testing.T) { + db := sql.InMemory() + require.NoError(t, Add(db, types.LayerID(2), &types.Certificate{BlockID: types.BlockID{2}})) + require.NoError(t, Add(db, types.LayerID(3), &types.Certificate{BlockID: types.BlockID{3}})) + require.NoError(t, Add(db, types.LayerID(4), &types.Certificate{BlockID: types.BlockID{4}})) + require.NoError(t, DeleteCert(db, 4)) + + _, err := CertifiedBlock(db, types.LayerID(2)) + require.ErrorIs(t, err, sql.ErrNotFound) + _, err = CertifiedBlock(db, types.LayerID(3)) + require.ErrorIs(t, err, sql.ErrNotFound) + got, err := CertifiedBlock(db, types.LayerID(4)) + require.NoError(t, err) + require.Equal(t, types.BlockID{4}, got) +} + func TestFirstInEpoch(t *testing.T) { db := sql.InMemory() diff --git a/sql/transactions/transactions.go b/sql/transactions/transactions.go index 4c7e654107..785b1fae77 100644 --- a/sql/transactions/transactions.go +++ b/sql/transactions/transactions.go @@ -59,6 +59,17 @@ func AddToProposal(db sql.Executor, tid types.TransactionID, lid types.LayerID, return nil } +func DeleteProposalTxs(db sql.Executor, lid types.LayerID) error { + if _, err := db.Exec(` + delete from proposal_transactions where layer < ?1;`, + func(stmt *sql.Statement) { + stmt.BindInt64(1, int64(lid)) + }, nil); err != nil { + return fmt.Errorf("DeleteProposalTxs %d: %w", lid, err) + } + return nil +} + // HasProposalTX returns true if the given transaction is included in the given proposal. func HasProposalTX(db sql.Executor, pid types.ProposalID, tid types.TransactionID) (bool, error) { rows, err := db.Exec("select 1 from proposal_transactions where pid = ?1 and tid = ?2", diff --git a/sql/transactions/transactions_test.go b/sql/transactions/transactions_test.go index 28a8070ea5..d2b3f0e684 100644 --- a/sql/transactions/transactions_test.go +++ b/sql/transactions/transactions_test.go @@ -154,6 +154,37 @@ func TestAddToProposal(t *testing.T) { require.False(t, has) } +func TestDeleteProposalTxs(t *testing.T) { + db := sql.InMemory() + proposals := map[types.LayerID][]types.ProposalID{ + types.LayerID(10): {{1, 1}, {1, 2}}, + types.LayerID(11): {{2, 1}, {2, 2}}, + } + tids := []types.TransactionID{{1, 2}, {2, 3}} + for lid, pids := range proposals { + for _, tid := range tids { + for _, pid := range pids { + require.NoError(t, transactions.AddToProposal(db, tid, lid, pid)) + } + } + } + require.NoError(t, transactions.DeleteProposalTxs(db, types.LayerID(11))) + for _, pid := range proposals[types.LayerID(10)] { + for _, tid := range tids { + has, err := transactions.HasProposalTX(db, pid, tid) + require.NoError(t, err) + require.False(t, has) + } + } + for _, pid := range proposals[types.LayerID(11)] { + for _, tid := range tids { + has, err := transactions.HasProposalTX(db, pid, tid) + require.NoError(t, err) + require.True(t, has) + } + } +} + func TestAddToBlock(t *testing.T) { db := sql.InMemory() diff --git a/sql/vacuum.go b/sql/vacuum.go new file mode 100644 index 0000000000..877d7aadeb --- /dev/null +++ b/sql/vacuum.go @@ -0,0 +1,10 @@ +package sql + +import "fmt" + +func VacuumDB(db Executor) error { + if _, err := db.Exec("vacuum", nil, nil); err != nil { + return fmt.Errorf("vacuum %w", err) + } + return nil +} From e09bee20eba920829d97f6090afc8578f0be8f47 Mon Sep 17 00:00:00 2001 From: kimmy lin <30611210+countvonzero@users.noreply.github.com> Date: Mon, 11 Sep 2023 19:52:02 -0700 Subject: [PATCH 2/9] set mainnet config --- config/mainnet.go | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/config/mainnet.go b/config/mainnet.go index cf9e01ca7a..9b28038c6e 100644 --- a/config/mainnet.go +++ b/config/mainnet.go @@ -42,11 +42,12 @@ func MainnetConfig() Config { logging.TrtlLoggerLevel = zapcore.WarnLevel.String() return Config{ BaseConfig: BaseConfig{ - DataDirParent: defaultDataDir, - FileLock: filepath.Join(os.TempDir(), "spacemesh.lock"), - MetricsPort: 1010, - DatabaseConnections: 16, - NetworkHRP: "sm", + DataDirParent: defaultDataDir, + FileLock: filepath.Join(os.TempDir(), "spacemesh.lock"), + MetricsPort: 1010, + DatabaseConnections: 16, + DatabasePruneInterval: 30 * time.Minute, + NetworkHRP: "sm", LayerDuration: 5 * time.Minute, LayerAvgSize: 50, From 7f6d1532d4f3e10ad7e8c9b5c00e1dd46fd93e60 Mon Sep 17 00:00:00 2001 From: kimmy lin <30611210+countvonzero@users.noreply.github.com> Date: Tue, 12 Sep 2023 20:10:05 -0700 Subject: [PATCH 3/9] use free page count as vacuum criteria --- cmd/root.go | 4 ++-- config/config.go | 2 +- node/node.go | 26 +++++++++++++++++----- sql/vacuum.go | 10 --------- sql/vacuum/vacuum.go | 29 ++++++++++++++++++++++++ sql/vacuum/vacuum_test.go | 47 +++++++++++++++++++++++++++++++++++++++ 6 files changed, 99 insertions(+), 19 deletions(-) delete mode 100644 sql/vacuum.go create mode 100644 sql/vacuum/vacuum.go create mode 100644 sql/vacuum/vacuum_test.go diff --git a/cmd/root.go b/cmd/root.go index 54acea63b0..c067d02412 100644 --- a/cmd/root.go +++ b/cmd/root.go @@ -78,8 +78,8 @@ func AddCommands(cmd *cobra.Command) { cfg.DatabaseConnections, "configure number of active connections to enable parallel read requests") cmd.PersistentFlags().BoolVar(&cfg.DatabaseLatencyMetering, "db-latency-metering", cfg.DatabaseLatencyMetering, "if enabled collect latency histogram for every database query") - cmd.PersistentFlags().BoolVar(&cfg.DatabaseCompactState, "db-compact-state", - cfg.DatabaseCompactState, "if enabled compact database on startup") + cmd.PersistentFlags().IntVar(&cfg.DatabaseCompactStatePct, "db-compact-state-pct", + cfg.DatabaseCompactStatePct, "configure the threshold to trigger state compaction on startup") cmd.PersistentFlags().DurationVar(&cfg.DatabasePruneInterval, "db-prune-interval", cfg.DatabasePruneInterval, "configure interval for database pruning") diff --git a/config/config.go b/config/config.go index c2fd1c535f..9c296422e5 100644 --- a/config/config.go +++ b/config/config.go @@ -110,7 +110,7 @@ type BaseConfig struct { DatabaseConnections int `mapstructure:"db-connections"` DatabaseLatencyMetering bool `mapstructure:"db-latency-metering"` - DatabaseCompactState bool `mapstructure:"db-compact-state"` + DatabaseCompactStatePct int `mapstructure:"db-compact-state-pct"` DatabasePruneInterval time.Duration `mapstructure:"db-prune-interval"` NetworkHRP string `mapstructure:"network-hrp"` diff --git a/node/node.go b/node/node.go index 17d26f0597..aae13010e9 100644 --- a/node/node.go +++ b/node/node.go @@ -67,6 +67,7 @@ import ( "github.com/spacemeshos/go-spacemesh/sql" "github.com/spacemeshos/go-spacemesh/sql/layers" dbmetrics "github.com/spacemeshos/go-spacemesh/sql/metrics" + "github.com/spacemeshos/go-spacemesh/sql/vacuum" "github.com/spacemeshos/go-spacemesh/syncer" "github.com/spacemeshos/go-spacemesh/syncer/blockssync" "github.com/spacemeshos/go-spacemesh/system" @@ -165,8 +166,8 @@ func GetCommand() *cobra.Command { return err } - if app.Config.DatabaseCompactState { - if err := app.compactDB(); err != nil { + if app.Config.DatabaseCompactStatePct > 0 { + if err := app.compactDB(app.Config.DatabaseCompactStatePct); err != nil { return err } } @@ -1334,7 +1335,7 @@ func (app *App) LoadOrCreateEdSigner() (*signing.EdSigner, error) { return edSgn, nil } -func (app *App) compactDB() error { +func (app *App) compactDB(minPct int) error { dbPath, err := filepath.Abs(filepath.Join(app.Config.DataDir(), dbFile)) if err != nil { return fmt.Errorf("compact abs: %w", err) @@ -1349,14 +1350,27 @@ func (app *App) compactDB() error { if err != nil { return fmt.Errorf("compact open %w", err) } - app.log.With().Info("starting db compaction") - if err := sql.VacuumDB(sqldb); err != nil { + freePct, err := vacuum.FreePct(sqldb) + if err != nil { + return fmt.Errorf("compact check free pct: %w", err) + } + if freePct < minPct { + app.log.With().Info("db free space less that configured. not compacting", + log.Int("configured", minPct), + log.Int("actual", freePct), + ) + } + app.log.With().Info("starting db compaction", + log.Int("configured", minPct), + log.Int("actual", freePct), + ) + if err := vacuum.VacuumDB(sqldb); err != nil { return fmt.Errorf("compact vacuum: %w", err) } if err := sqldb.Close(); err != nil { return fmt.Errorf("compact close: %w", err) } - app.log.With().Info("finished db compaction") + app.log.Info("finished db compaction") return nil } diff --git a/sql/vacuum.go b/sql/vacuum.go deleted file mode 100644 index 877d7aadeb..0000000000 --- a/sql/vacuum.go +++ /dev/null @@ -1,10 +0,0 @@ -package sql - -import "fmt" - -func VacuumDB(db Executor) error { - if _, err := db.Exec("vacuum", nil, nil); err != nil { - return fmt.Errorf("vacuum %w", err) - } - return nil -} diff --git a/sql/vacuum/vacuum.go b/sql/vacuum/vacuum.go new file mode 100644 index 0000000000..0b56a98db0 --- /dev/null +++ b/sql/vacuum/vacuum.go @@ -0,0 +1,29 @@ +package vacuum + +import ( + "fmt" + + "github.com/spacemeshos/go-spacemesh/sql" +) + +func VacuumDB(db sql.Executor) error { + if _, err := db.Exec("vacuum", nil, nil); err != nil { + return fmt.Errorf("vacuum %w", err) + } + if _, err := db.Exec("pragma wal_checkpoint(TRUNCATE)", nil, nil); err != nil { + return fmt.Errorf("wal checkpoint %w", err) + } + return nil +} + +func FreePct(db sql.Executor) (int, error) { + var pct int + if _, err := db.Exec(`SELECT freelist_count, page_count FROM pragma_freelist_count(), pragma_page_count();`, + nil, func(stmt *sql.Statement) bool { + pct = 100 * stmt.ColumnInt(0) / stmt.ColumnInt(1) + return false + }); err != nil { + return 0, fmt.Errorf("db page size %w", err) + } + return pct, nil +} diff --git a/sql/vacuum/vacuum_test.go b/sql/vacuum/vacuum_test.go new file mode 100644 index 0000000000..ef6071474e --- /dev/null +++ b/sql/vacuum/vacuum_test.go @@ -0,0 +1,47 @@ +package vacuum + +import ( + "testing" + + "github.com/stretchr/testify/require" + + "github.com/spacemeshos/go-spacemesh/common/types" + "github.com/spacemeshos/go-spacemesh/sql" + "github.com/spacemeshos/go-spacemesh/sql/ballots" + "github.com/spacemeshos/go-spacemesh/sql/proposals" +) + +func TestVacuumDB(t *testing.T) { + db := sql.InMemory() + require.NoError(t, VacuumDB(db)) +} + +func TestFreedPct(t *testing.T) { + db := sql.InMemory() + + got, err := FreePct(db) + require.NoError(t, err) + require.Equal(t, 0, got) + + for i := 0; i < 100; i++ { + ballot := types.NewExistingBallot(types.RandomBallotID(), types.RandomEdSignature(), types.NodeID{1}, types.LayerID(i)) + proposal := &types.Proposal{ + InnerProposal: types.InnerProposal{ + Ballot: ballot, + }, + Signature: types.RandomEdSignature(), + } + proposal.SetID(types.RandomProposalID()) + require.NoError(t, ballots.Add(db, &ballot)) + require.NoError(t, proposals.Add(db, proposal)) + } + + got, err = FreePct(db) + require.NoError(t, err) + require.Equal(t, 0, got) + + require.NoError(t, proposals.Delete(db, 80)) + got, err = FreePct(db) + require.NoError(t, err) + require.Greater(t, got, 10) +} From 1be203803312d6d7e8f7621ab51881dfd4fe384c Mon Sep 17 00:00:00 2001 From: kimmy lin <30611210+countvonzero@users.noreply.github.com> Date: Thu, 14 Sep 2023 00:16:24 -0700 Subject: [PATCH 4/9] code review feedback --- cmd/root.go | 2 -- config/config.go | 1 - mesh/janitor.go | 10 ++++++--- mesh/metrics/prometheus.go | 25 ++++++++++++++++----- node/node.go | 46 -------------------------------------- 5 files changed, 26 insertions(+), 58 deletions(-) diff --git a/cmd/root.go b/cmd/root.go index c067d02412..8c7142a4ab 100644 --- a/cmd/root.go +++ b/cmd/root.go @@ -78,8 +78,6 @@ func AddCommands(cmd *cobra.Command) { cfg.DatabaseConnections, "configure number of active connections to enable parallel read requests") cmd.PersistentFlags().BoolVar(&cfg.DatabaseLatencyMetering, "db-latency-metering", cfg.DatabaseLatencyMetering, "if enabled collect latency histogram for every database query") - cmd.PersistentFlags().IntVar(&cfg.DatabaseCompactStatePct, "db-compact-state-pct", - cfg.DatabaseCompactStatePct, "configure the threshold to trigger state compaction on startup") cmd.PersistentFlags().DurationVar(&cfg.DatabasePruneInterval, "db-prune-interval", cfg.DatabasePruneInterval, "configure interval for database pruning") diff --git a/config/config.go b/config/config.go index 9c296422e5..b0795bfbc5 100644 --- a/config/config.go +++ b/config/config.go @@ -110,7 +110,6 @@ type BaseConfig struct { DatabaseConnections int `mapstructure:"db-connections"` DatabaseLatencyMetering bool `mapstructure:"db-latency-metering"` - DatabaseCompactStatePct int `mapstructure:"db-compact-state-pct"` DatabasePruneInterval time.Duration `mapstructure:"db-prune-interval"` NetworkHRP string `mapstructure:"network-hrp"` diff --git a/mesh/janitor.go b/mesh/janitor.go index 30fad37c72..fa14a60ab8 100644 --- a/mesh/janitor.go +++ b/mesh/janitor.go @@ -7,6 +7,7 @@ import ( "go.uber.org/zap" "github.com/spacemeshos/go-spacemesh/common/types" + "github.com/spacemeshos/go-spacemesh/mesh/metrics" "github.com/spacemeshos/go-spacemesh/sql" "github.com/spacemeshos/go-spacemesh/sql/certificates" "github.com/spacemeshos/go-spacemesh/sql/proposals" @@ -31,27 +32,30 @@ func Prune( return case <-time.After(interval): oldest := lc.CurrentLayer() - types.LayerID(safeDist) + t0 := time.Now() if err := proposals.Delete(db, oldest); err != nil { logger.Error("failed to delete proposals", zap.Stringer("lid", oldest), zap.Error(err), ) } - logger.Info("proposals pruned", zap.Stringer("lid", oldest)) + metrics.PruneProposalLatency.Observe(time.Since(t0).Seconds()) + t1 := time.Now() if err := certificates.DeleteCert(db, oldest); err != nil { logger.Error("failed to delete certificates", zap.Stringer("lid", oldest), zap.Error(err), ) } - logger.Info("certificates pruned", zap.Stringer("lid", oldest)) + metrics.PruneCertLatency.Observe(time.Since(t1).Seconds()) + t2 := time.Now() if err := transactions.DeleteProposalTxs(db, oldest); err != nil { logger.Error("failed to delete proposal tx mapping", zap.Stringer("lid", oldest), zap.Error(err), ) } - logger.Info("proposal tx pruned", zap.Stringer("lid", oldest)) + metrics.PrunePropTxLatency.Observe(time.Since(t2).Seconds()) } } } diff --git a/mesh/metrics/prometheus.go b/mesh/metrics/prometheus.go index 6c46fb6499..2ef85ff42b 100644 --- a/mesh/metrics/prometheus.go +++ b/mesh/metrics/prometheus.go @@ -12,10 +12,23 @@ const ( ) // LayerNumBlocks is number of blocks in layer. -var LayerNumBlocks = metrics.NewHistogramWithBuckets( - "layer_num_blocks", - Subsystem, - "Number of blocks in layer", - []string{}, - prometheus.ExponentialBuckets(1, 2, 16), +var ( + LayerNumBlocks = metrics.NewHistogramWithBuckets( + "layer_num_blocks", + Subsystem, + "Number of blocks in layer", + []string{}, + prometheus.ExponentialBuckets(1, 2, 16), + ) + + pruneLatency = metrics.NewHistogramWithBuckets( + "prune_seconds", + Subsystem, + "prune time in seconds", + []string{"step"}, + prometheus.ExponentialBuckets(0.01, 2, 10), + ) + PruneProposalLatency = pruneLatency.WithLabelValues("proposal") + PruneCertLatency = pruneLatency.WithLabelValues("cert") + PrunePropTxLatency = pruneLatency.WithLabelValues("proptxs") ) diff --git a/node/node.go b/node/node.go index aae13010e9..8ba5f19208 100644 --- a/node/node.go +++ b/node/node.go @@ -67,7 +67,6 @@ import ( "github.com/spacemeshos/go-spacemesh/sql" "github.com/spacemeshos/go-spacemesh/sql/layers" dbmetrics "github.com/spacemeshos/go-spacemesh/sql/metrics" - "github.com/spacemeshos/go-spacemesh/sql/vacuum" "github.com/spacemeshos/go-spacemesh/syncer" "github.com/spacemeshos/go-spacemesh/syncer/blockssync" "github.com/spacemeshos/go-spacemesh/system" @@ -166,12 +165,6 @@ func GetCommand() *cobra.Command { return err } - if app.Config.DatabaseCompactStatePct > 0 { - if err := app.compactDB(app.Config.DatabaseCompactStatePct); err != nil { - return err - } - } - // This blocks until the context is finished or until an error is produced err = app.Start(ctx) cleanupCtx, cleanupCancel := context.WithTimeout(context.Background(), 30*time.Second) @@ -1335,45 +1328,6 @@ func (app *App) LoadOrCreateEdSigner() (*signing.EdSigner, error) { return edSgn, nil } -func (app *App) compactDB(minPct int) error { - dbPath, err := filepath.Abs(filepath.Join(app.Config.DataDir(), dbFile)) - if err != nil { - return fmt.Errorf("compact abs: %w", err) - } - if _, err := os.Stat(dbPath); err != nil { - if os.IsNotExist(err) { - return nil - } - return fmt.Errorf("compact stat: %w", err) - } - sqldb, err := sql.Open("file:" + dbPath) - if err != nil { - return fmt.Errorf("compact open %w", err) - } - freePct, err := vacuum.FreePct(sqldb) - if err != nil { - return fmt.Errorf("compact check free pct: %w", err) - } - if freePct < minPct { - app.log.With().Info("db free space less that configured. not compacting", - log.Int("configured", minPct), - log.Int("actual", freePct), - ) - } - app.log.With().Info("starting db compaction", - log.Int("configured", minPct), - log.Int("actual", freePct), - ) - if err := vacuum.VacuumDB(sqldb); err != nil { - return fmt.Errorf("compact vacuum: %w", err) - } - if err := sqldb.Close(); err != nil { - return fmt.Errorf("compact close: %w", err) - } - app.log.Info("finished db compaction") - return nil -} - func (app *App) setupDBs(ctx context.Context, lg log.Log) error { dbPath := app.Config.DataDir() if err := os.MkdirAll(dbPath, os.ModePerm); err != nil { From 98b24de2eddc751f74c0449789621db41d45deea Mon Sep 17 00:00:00 2001 From: kimmy lin <30611210+countvonzero@users.noreply.github.com> Date: Thu, 14 Sep 2023 00:16:31 -0700 Subject: [PATCH 5/9] add db migrations --- sql/database.go | 21 ++++++--- sql/functions.go | 19 ++++++++ sql/migrations.go | 21 ++++++--- .../{0003_next.sql => 0003_v1.1.5.sql} | 0 sql/migrations/0004_next.sql | 5 ++ sql/migrations_test.go | 2 +- sql/vacuum.go | 15 ++++++ sql/vacuum/vacuum.go | 29 ------------ sql/vacuum/vacuum_test.go | 47 ------------------- sql/vacuum_test.go | 12 +++++ 10 files changed, 81 insertions(+), 90 deletions(-) rename sql/migrations/{0003_next.sql => 0003_v1.1.5.sql} (100%) create mode 100644 sql/migrations/0004_next.sql create mode 100644 sql/vacuum.go delete mode 100644 sql/vacuum/vacuum.go delete mode 100644 sql/vacuum/vacuum_test.go create mode 100644 sql/vacuum_test.go diff --git a/sql/database.go b/sql/database.go index df95109e6f..732f9fd10f 100644 --- a/sql/database.go +++ b/sql/database.go @@ -115,7 +115,18 @@ func Open(uri string, opts ...Opt) (*Database, error) { if config.enableLatency { db.latency = newQueryLatency() } + for i := 0; i < config.connections; i++ { + conn := pool.Get(context.Background()) + if err := registerFunctions(conn); err != nil { + return nil, err + } + pool.Put(conn) + } if config.migrations != nil { + before, err := version(db) + if err != nil { + return nil, err + } tx, err := db.Tx(context.Background()) if err != nil { return nil, err @@ -128,13 +139,11 @@ func Open(uri string, opts ...Opt) (*Database, error) { if err != nil { return nil, err } - } - for i := 0; i < config.connections; i++ { - conn := pool.Get(context.Background()) - if err := registerFunctions(conn); err != nil { - return nil, err + if before == 3 { + if err := Vacuum(db); err != nil { + return nil, err + } } - defer pool.Put(conn) } return db, nil } diff --git a/sql/functions.go b/sql/functions.go index 07f2c21c85..fb2ce92b24 100644 --- a/sql/functions.go +++ b/sql/functions.go @@ -4,6 +4,9 @@ import ( "fmt" sqlite "github.com/go-llsqlite/crawshaw" + + "github.com/spacemeshos/go-spacemesh/codec" + "github.com/spacemeshos/go-spacemesh/common/types" ) func registerFunctions(conn *sqlite.Conn) error { @@ -16,5 +19,21 @@ func registerFunctions(conn *sqlite.Conn) error { }, nil, nil); err != nil { return fmt.Errorf("registering add_uint64: %w", err) } + // function to prune active set from old ballots + if err := conn.CreateFunction("prune_actives", true, 1, func(ctx sqlite.Context, values ...sqlite.Value) { + var ballot types.Ballot + if err := codec.Decode(values[0].Blob(), &ballot); err != nil { + ctx.ResultError(err) + } else { + ballot.ActiveSet = nil + if blob, err := codec.Encode(&ballot); err != nil { + ctx.ResultError(err) + } else { + ctx.ResultBlob(blob) + } + } + }, nil, nil); err != nil { + return fmt.Errorf("registering prune_actives: %w", err) + } return nil } diff --git a/sql/migrations.go b/sql/migrations.go index c6bf08f9fe..aeec91e0bd 100644 --- a/sql/migrations.go +++ b/sql/migrations.go @@ -23,6 +23,17 @@ type migration struct { // Migrations is interface for migrations provider. type Migrations func(Executor) error +func version(db Executor) (int, error) { + var current int + if _, err := db.Exec("PRAGMA user_version;", nil, func(stmt *Statement) bool { + current = stmt.ColumnInt(0) + return true + }); err != nil { + return 0, fmt.Errorf("read user_version %w", err) + } + return current, nil +} + func embeddedMigrations(db Executor) error { var migrations []migration fs.WalkDir(embedded, "migrations", func(path string, d fs.DirEntry, err error) error { @@ -62,13 +73,9 @@ func embeddedMigrations(db Executor) error { return migrations[i].order < migrations[j].order }) - var current int - - if _, err := db.Exec("PRAGMA user_version;", nil, func(stmt *Statement) bool { - current = stmt.ColumnInt(0) - return true - }); err != nil { - return fmt.Errorf("read user_version %w", err) + current, err := version(db) + if err != nil { + return err } for _, m := range migrations { diff --git a/sql/migrations/0003_next.sql b/sql/migrations/0003_v1.1.5.sql similarity index 100% rename from sql/migrations/0003_next.sql rename to sql/migrations/0003_v1.1.5.sql diff --git a/sql/migrations/0004_next.sql b/sql/migrations/0004_next.sql new file mode 100644 index 0000000000..f74ef85c5f --- /dev/null +++ b/sql/migrations/0004_next.sql @@ -0,0 +1,5 @@ +DELETE FROM proposals WHERE layer < 19000; +DELETE FROM proposal_transactions WHERE layer < 19000; +UPDATE certificates SET cert = NULL WHERE layer < 19000; +UPDATE ballots SET ballot = prune_actives(ballot); + diff --git a/sql/migrations_test.go b/sql/migrations_test.go index 8a0c637c46..2cb0de2b0f 100644 --- a/sql/migrations_test.go +++ b/sql/migrations_test.go @@ -15,5 +15,5 @@ func TestMigrationsAppliedOnce(t *testing.T) { return true }) require.NoError(t, err) - require.Equal(t, version, 3) + require.Equal(t, version, 4) } diff --git a/sql/vacuum.go b/sql/vacuum.go new file mode 100644 index 0000000000..e53b7c234d --- /dev/null +++ b/sql/vacuum.go @@ -0,0 +1,15 @@ +package sql + +import ( + "fmt" +) + +func Vacuum(db Executor) error { + if _, err := db.Exec("vacuum", nil, nil); err != nil { + return fmt.Errorf("vacuum %w", err) + } + if _, err := db.Exec("pragma wal_checkpoint(TRUNCATE)", nil, nil); err != nil { + return fmt.Errorf("wal checkpoint %w", err) + } + return nil +} diff --git a/sql/vacuum/vacuum.go b/sql/vacuum/vacuum.go deleted file mode 100644 index 0b56a98db0..0000000000 --- a/sql/vacuum/vacuum.go +++ /dev/null @@ -1,29 +0,0 @@ -package vacuum - -import ( - "fmt" - - "github.com/spacemeshos/go-spacemesh/sql" -) - -func VacuumDB(db sql.Executor) error { - if _, err := db.Exec("vacuum", nil, nil); err != nil { - return fmt.Errorf("vacuum %w", err) - } - if _, err := db.Exec("pragma wal_checkpoint(TRUNCATE)", nil, nil); err != nil { - return fmt.Errorf("wal checkpoint %w", err) - } - return nil -} - -func FreePct(db sql.Executor) (int, error) { - var pct int - if _, err := db.Exec(`SELECT freelist_count, page_count FROM pragma_freelist_count(), pragma_page_count();`, - nil, func(stmt *sql.Statement) bool { - pct = 100 * stmt.ColumnInt(0) / stmt.ColumnInt(1) - return false - }); err != nil { - return 0, fmt.Errorf("db page size %w", err) - } - return pct, nil -} diff --git a/sql/vacuum/vacuum_test.go b/sql/vacuum/vacuum_test.go deleted file mode 100644 index ef6071474e..0000000000 --- a/sql/vacuum/vacuum_test.go +++ /dev/null @@ -1,47 +0,0 @@ -package vacuum - -import ( - "testing" - - "github.com/stretchr/testify/require" - - "github.com/spacemeshos/go-spacemesh/common/types" - "github.com/spacemeshos/go-spacemesh/sql" - "github.com/spacemeshos/go-spacemesh/sql/ballots" - "github.com/spacemeshos/go-spacemesh/sql/proposals" -) - -func TestVacuumDB(t *testing.T) { - db := sql.InMemory() - require.NoError(t, VacuumDB(db)) -} - -func TestFreedPct(t *testing.T) { - db := sql.InMemory() - - got, err := FreePct(db) - require.NoError(t, err) - require.Equal(t, 0, got) - - for i := 0; i < 100; i++ { - ballot := types.NewExistingBallot(types.RandomBallotID(), types.RandomEdSignature(), types.NodeID{1}, types.LayerID(i)) - proposal := &types.Proposal{ - InnerProposal: types.InnerProposal{ - Ballot: ballot, - }, - Signature: types.RandomEdSignature(), - } - proposal.SetID(types.RandomProposalID()) - require.NoError(t, ballots.Add(db, &ballot)) - require.NoError(t, proposals.Add(db, proposal)) - } - - got, err = FreePct(db) - require.NoError(t, err) - require.Equal(t, 0, got) - - require.NoError(t, proposals.Delete(db, 80)) - got, err = FreePct(db) - require.NoError(t, err) - require.Greater(t, got, 10) -} diff --git a/sql/vacuum_test.go b/sql/vacuum_test.go new file mode 100644 index 0000000000..b994516279 --- /dev/null +++ b/sql/vacuum_test.go @@ -0,0 +1,12 @@ +package sql + +import ( + "testing" + + "github.com/stretchr/testify/require" +) + +func TestVacuumDB(t *testing.T) { + db := InMemory() + require.NoError(t, Vacuum(db)) +} From 6f21bfc9d29afb4e9ee8691fa8bb96ba738dee95 Mon Sep 17 00:00:00 2001 From: kimmy lin <30611210+countvonzero@users.noreply.github.com> Date: Thu, 14 Sep 2023 19:19:17 -0700 Subject: [PATCH 6/9] extract active set and save --- mesh/janitor.go | 51 ++++++++++++++++++++++++++++++++++++ mesh/janitor_test.go | 37 ++++++++++++++++++++++++++ node/node.go | 1 + sql/ballots/ballots.go | 11 ++++++++ sql/ballots/ballots_test.go | 21 +++++++++++++++ sql/database.go | 28 +++++++++++++++----- sql/functions.go | 19 -------------- sql/migrations/0004_next.sql | 8 +++--- 8 files changed, 145 insertions(+), 31 deletions(-) diff --git a/mesh/janitor.go b/mesh/janitor.go index fa14a60ab8..222cfc5fe4 100644 --- a/mesh/janitor.go +++ b/mesh/janitor.go @@ -2,13 +2,19 @@ package mesh import ( "context" + "errors" + "fmt" "time" "go.uber.org/zap" + "github.com/spacemeshos/go-spacemesh/codec" "github.com/spacemeshos/go-spacemesh/common/types" + "github.com/spacemeshos/go-spacemesh/log" "github.com/spacemeshos/go-spacemesh/mesh/metrics" "github.com/spacemeshos/go-spacemesh/sql" + "github.com/spacemeshos/go-spacemesh/sql/activesets" + "github.com/spacemeshos/go-spacemesh/sql/ballots" "github.com/spacemeshos/go-spacemesh/sql/certificates" "github.com/spacemeshos/go-spacemesh/sql/proposals" "github.com/spacemeshos/go-spacemesh/sql/transactions" @@ -59,3 +65,48 @@ func Prune( } } } + +func ExtractActiveSet(db sql.Executor) error { + latest, err := ballots.LatestLayer(db) + if err != nil { + return fmt.Errorf("extract get latest: %w", err) + } + extracted := 0 + unique := 0 + log.With().Info("extracting ballots active sets", + log.Uint32("from", types.EpochID(2).FirstLayer().Uint32()), + log.Uint32("to", latest.Uint32()), + ) + for lid := types.EpochID(2).FirstLayer(); lid <= latest; lid++ { + blts, err := ballots.Layer(db, lid) + if err != nil { + return fmt.Errorf("extract layer %d: %w", lid, err) + } + for _, b := range blts { + if b.EpochData == nil { + continue + } + if len(b.ActiveSet) == 0 { + continue + } + if err := activesets.Add(db, b.EpochData.ActiveSetHash, &types.EpochActiveSet{ + Epoch: b.Layer.GetEpoch(), + Set: b.ActiveSet, + }); err != nil && !errors.Is(err, sql.ErrObjectExists) { + return fmt.Errorf("add active set %s (%s): %w", b.ID().String(), b.EpochData.ActiveSetHash.ShortString(), err) + } else if err == nil { + unique++ + } + b.ActiveSet = nil + if err := ballots.UpdateBlob(db, b.ID(), codec.MustEncode(b)); err != nil { + return fmt.Errorf("update ballot %s: %w", b.ID().String(), err) + } + extracted++ + } + } + log.With().Info("extracted active sets from ballots", + log.Int("num", extracted), + log.Int("unique", unique), + ) + return nil +} diff --git a/mesh/janitor_test.go b/mesh/janitor_test.go index 2d10472999..d867bfb866 100644 --- a/mesh/janitor_test.go +++ b/mesh/janitor_test.go @@ -13,6 +13,7 @@ import ( "github.com/spacemeshos/go-spacemesh/log/logtest" "github.com/spacemeshos/go-spacemesh/mesh/mocks" "github.com/spacemeshos/go-spacemesh/sql" + "github.com/spacemeshos/go-spacemesh/sql/activesets" "github.com/spacemeshos/go-spacemesh/sql/ballots" "github.com/spacemeshos/go-spacemesh/sql/certificates" "github.com/spacemeshos/go-spacemesh/sql/proposals" @@ -94,3 +95,39 @@ func TestPrune(t *testing.T) { cancel() eg.Wait() } + +func TestExtractActiveSet(t *testing.T) { + db := sql.InMemory() + current := types.LayerID(20) + blts := make([]*types.Ballot, 0, current) + hashes := []types.Hash32{types.RandomHash(), types.RandomHash()} + actives := [][]types.ATXID{types.RandomActiveSet(11), types.RandomActiveSet(19)} + for lid := types.EpochID(2).FirstLayer(); lid < current; lid++ { + blt := types.NewExistingBallot(types.RandomBallotID(), types.RandomEdSignature(), types.NodeID{1}, lid) + if lid%3 == 0 { + blt.EpochData = &types.EpochData{ + ActiveSetHash: hashes[0], + } + blt.ActiveSet = actives[0] + } + if lid%3 == 1 { + blt.EpochData = &types.EpochData{ + ActiveSetHash: hashes[1], + } + blt.ActiveSet = actives[1] + } + require.NoError(t, ballots.Add(db, &blt)) + blts = append(blts, &blt) + } + require.NoError(t, ExtractActiveSet(db)) + for _, b := range blts { + got, err := ballots.Get(db, b.ID()) + require.NoError(t, err) + require.Empty(t, got.ActiveSet) + } + for i, h := range hashes { + got, err := activesets.Get(db, h) + require.NoError(t, err) + require.Equal(t, actives[i], got.Set) + } +} diff --git a/node/node.go b/node/node.go index 8ba5f19208..e7fa797388 100644 --- a/node/node.go +++ b/node/node.go @@ -1336,6 +1336,7 @@ func (app *App) setupDBs(ctx context.Context, lg log.Log) error { sqlDB, err := sql.Open("file:"+filepath.Join(dbPath, dbFile), sql.WithConnections(app.Config.DatabaseConnections), sql.WithLatencyMetering(app.Config.DatabaseLatencyMetering), + sql.WithV4PreMigration(mesh.ExtractActiveSet), ) if err != nil { return fmt.Errorf("open sqlite db %w", err) diff --git a/sql/ballots/ballots.go b/sql/ballots/ballots.go index d543b2ced2..d539a3ef05 100644 --- a/sql/ballots/ballots.go +++ b/sql/ballots/ballots.go @@ -70,6 +70,17 @@ func Has(db sql.Executor, id types.BallotID) (bool, error) { return rows > 0, nil } +func UpdateBlob(db sql.Executor, bid types.BallotID, blob []byte) error { + if _, err := db.Exec(`update ballots set ballot = ?2 where id = ?1;`, + func(stmt *sql.Statement) { + stmt.BindBytes(1, bid.Bytes()) + stmt.BindBytes(2, blob[:]) + }, nil); err != nil { + return fmt.Errorf("update blob %s: %w", bid.String(), err) + } + return nil +} + // Get ballot with id from database. func Get(db sql.Executor, id types.BallotID) (rst *types.Ballot, err error) { if rows, err := db.Exec(`select pubkey, ballot, length(identities.proof) diff --git a/sql/ballots/ballots_test.go b/sql/ballots/ballots_test.go index 7c2a0b6277..ef384b394d 100644 --- a/sql/ballots/ballots_test.go +++ b/sql/ballots/ballots_test.go @@ -7,6 +7,7 @@ import ( "github.com/stretchr/testify/require" + "github.com/spacemeshos/go-spacemesh/codec" "github.com/spacemeshos/go-spacemesh/common/types" "github.com/spacemeshos/go-spacemesh/signing" "github.com/spacemeshos/go-spacemesh/sql" @@ -82,6 +83,26 @@ func TestAdd(t *testing.T) { require.True(t, stored.IsMalicious()) } +func TestUpdateBlob(t *testing.T) { + db := sql.InMemory() + nodeID := types.RandomNodeID() + ballot := types.NewExistingBallot(types.BallotID{1}, types.RandomEdSignature(), nodeID, types.LayerID(0)) + ballot.EpochData = &types.EpochData{ + ActiveSetHash: types.RandomHash(), + } + ballot.ActiveSet = types.RandomActiveSet(199) + require.NoError(t, Add(db, &ballot)) + got, err := Get(db, types.BallotID{1}) + require.NoError(t, err) + require.Equal(t, ballot, *got) + + ballot.ActiveSet = nil + require.NoError(t, UpdateBlob(db, types.BallotID{1}, codec.MustEncode(&ballot))) + got, err = Get(db, types.BallotID{1}) + require.NoError(t, err) + require.Empty(t, got.ActiveSet) +} + func TestHas(t *testing.T) { db := sql.InMemory() ballot := types.NewExistingBallot(types.BallotID{1}, types.EmptyEdSignature, types.EmptyNodeID, types.LayerID(0)) diff --git a/sql/database.go b/sql/database.go index 732f9fd10f..ed04b6cd70 100644 --- a/sql/database.go +++ b/sql/database.go @@ -59,6 +59,9 @@ type conf struct { connections int migrations Migrations enableLatency bool + + // TODO: remove after state is pruned for majority + v4preMigration func(Executor) error } // WithConnections overwrites number of pooled connections. @@ -75,6 +78,12 @@ func WithMigrations(migrations Migrations) Opt { } } +func WithV4PreMigration(cb func(Executor) error) Opt { + return func(c *conf) { + c.v4preMigration = cb + } +} + // WithLatencyMetering enables metric that track latency for every database query. // Note that it will be a significant amount of data, and should not be enabled on // multiple nodes by default. @@ -115,18 +124,16 @@ func Open(uri string, opts ...Opt) (*Database, error) { if config.enableLatency { db.latency = newQueryLatency() } - for i := 0; i < config.connections; i++ { - conn := pool.Get(context.Background()) - if err := registerFunctions(conn); err != nil { - return nil, err - } - pool.Put(conn) - } if config.migrations != nil { before, err := version(db) if err != nil { return nil, err } + if before == 3 && config.v4preMigration != nil { + if err := config.v4preMigration(db); err != nil { + return nil, err + } + } tx, err := db.Tx(context.Background()) if err != nil { return nil, err @@ -145,6 +152,13 @@ func Open(uri string, opts ...Opt) (*Database, error) { } } } + for i := 0; i < config.connections; i++ { + conn := pool.Get(context.Background()) + if err := registerFunctions(conn); err != nil { + return nil, err + } + pool.Put(conn) + } return db, nil } diff --git a/sql/functions.go b/sql/functions.go index fb2ce92b24..07f2c21c85 100644 --- a/sql/functions.go +++ b/sql/functions.go @@ -4,9 +4,6 @@ import ( "fmt" sqlite "github.com/go-llsqlite/crawshaw" - - "github.com/spacemeshos/go-spacemesh/codec" - "github.com/spacemeshos/go-spacemesh/common/types" ) func registerFunctions(conn *sqlite.Conn) error { @@ -19,21 +16,5 @@ func registerFunctions(conn *sqlite.Conn) error { }, nil, nil); err != nil { return fmt.Errorf("registering add_uint64: %w", err) } - // function to prune active set from old ballots - if err := conn.CreateFunction("prune_actives", true, 1, func(ctx sqlite.Context, values ...sqlite.Value) { - var ballot types.Ballot - if err := codec.Decode(values[0].Blob(), &ballot); err != nil { - ctx.ResultError(err) - } else { - ballot.ActiveSet = nil - if blob, err := codec.Encode(&ballot); err != nil { - ctx.ResultError(err) - } else { - ctx.ResultBlob(blob) - } - } - }, nil, nil); err != nil { - return fmt.Errorf("registering prune_actives: %w", err) - } return nil } diff --git a/sql/migrations/0004_next.sql b/sql/migrations/0004_next.sql index f74ef85c5f..7b8a0aded0 100644 --- a/sql/migrations/0004_next.sql +++ b/sql/migrations/0004_next.sql @@ -1,5 +1,3 @@ -DELETE FROM proposals WHERE layer < 19000; -DELETE FROM proposal_transactions WHERE layer < 19000; -UPDATE certificates SET cert = NULL WHERE layer < 19000; -UPDATE ballots SET ballot = prune_actives(ballot); - +DELETE FROM proposals; +DELETE FROM proposal_transactions; +UPDATE certificates SET cert = NULL WHERE layer < 19000; \ No newline at end of file From 35b94e051d5e4cb01d13a49b7ff3861f1fed2994 Mon Sep 17 00:00:00 2001 From: kimmy lin <30611210+countvonzero@users.noreply.github.com> Date: Sun, 17 Sep 2023 15:33:02 -0700 Subject: [PATCH 7/9] code review feedback and delay pruning of activeset to next update --- blocks/metrics.go | 8 -- mesh/janitor.go | 112 -------------------- mesh/metrics/prometheus.go | 11 -- node/node.go | 6 +- prune/interface.go | 11 ++ prune/metrics.go | 22 ++++ prune/mocks.go | 73 +++++++++++++ prune/prune.go | 60 +++++++++++ mesh/janitor_test.go => prune/prune_test.go | 44 +------- sql/ballots/util/extract.go | 58 ++++++++++ sql/ballots/util/extract_test.go | 57 ++++++++++ sql/certificates/certs.go | 2 +- sql/certificates/certs_test.go | 2 +- sql/database.go | 21 ++-- sql/transactions/transactions.go | 2 +- sql/transactions/transactions_test.go | 2 +- sql/vacuum.go | 5 + 17 files changed, 309 insertions(+), 187 deletions(-) delete mode 100644 mesh/janitor.go create mode 100644 prune/interface.go create mode 100644 prune/metrics.go create mode 100644 prune/mocks.go create mode 100644 prune/prune.go rename mesh/janitor_test.go => prune/prune_test.go (69%) create mode 100644 sql/ballots/util/extract.go create mode 100644 sql/ballots/util/extract_test.go diff --git a/blocks/metrics.go b/blocks/metrics.go index 9c2cb381f8..2a818408bc 100644 --- a/blocks/metrics.go +++ b/blocks/metrics.go @@ -30,14 +30,6 @@ var ( failFetchCnt = blockGenCount.WithLabelValues(failFetch) failGenCnt = blockGenCount.WithLabelValues(failGen) failErrCnt = blockGenCount.WithLabelValues(internalErr) - - deleteLatency = metrics.NewHistogramWithBuckets( - "delete_duration", - namespace, - "duration in second to delete old proposals", - []string{}, - prometheus.ExponentialBuckets(0.01, 2, 10), - ).WithLabelValues() ) type collector struct { diff --git a/mesh/janitor.go b/mesh/janitor.go deleted file mode 100644 index 222cfc5fe4..0000000000 --- a/mesh/janitor.go +++ /dev/null @@ -1,112 +0,0 @@ -package mesh - -import ( - "context" - "errors" - "fmt" - "time" - - "go.uber.org/zap" - - "github.com/spacemeshos/go-spacemesh/codec" - "github.com/spacemeshos/go-spacemesh/common/types" - "github.com/spacemeshos/go-spacemesh/log" - "github.com/spacemeshos/go-spacemesh/mesh/metrics" - "github.com/spacemeshos/go-spacemesh/sql" - "github.com/spacemeshos/go-spacemesh/sql/activesets" - "github.com/spacemeshos/go-spacemesh/sql/ballots" - "github.com/spacemeshos/go-spacemesh/sql/certificates" - "github.com/spacemeshos/go-spacemesh/sql/proposals" - "github.com/spacemeshos/go-spacemesh/sql/transactions" -) - -func Prune( - ctx context.Context, - logger *zap.Logger, - db sql.Executor, - lc layerClock, - safeDist uint32, - interval time.Duration, -) { - logger.With().Info("db pruning launched", - zap.Uint32("dist", safeDist), - zap.Duration("interval", interval), - ) - for { - select { - case <-ctx.Done(): - return - case <-time.After(interval): - oldest := lc.CurrentLayer() - types.LayerID(safeDist) - t0 := time.Now() - if err := proposals.Delete(db, oldest); err != nil { - logger.Error("failed to delete proposals", - zap.Stringer("lid", oldest), - zap.Error(err), - ) - } - metrics.PruneProposalLatency.Observe(time.Since(t0).Seconds()) - t1 := time.Now() - if err := certificates.DeleteCert(db, oldest); err != nil { - logger.Error("failed to delete certificates", - zap.Stringer("lid", oldest), - zap.Error(err), - ) - } - metrics.PruneCertLatency.Observe(time.Since(t1).Seconds()) - t2 := time.Now() - if err := transactions.DeleteProposalTxs(db, oldest); err != nil { - logger.Error("failed to delete proposal tx mapping", - zap.Stringer("lid", oldest), - zap.Error(err), - ) - } - metrics.PrunePropTxLatency.Observe(time.Since(t2).Seconds()) - } - } -} - -func ExtractActiveSet(db sql.Executor) error { - latest, err := ballots.LatestLayer(db) - if err != nil { - return fmt.Errorf("extract get latest: %w", err) - } - extracted := 0 - unique := 0 - log.With().Info("extracting ballots active sets", - log.Uint32("from", types.EpochID(2).FirstLayer().Uint32()), - log.Uint32("to", latest.Uint32()), - ) - for lid := types.EpochID(2).FirstLayer(); lid <= latest; lid++ { - blts, err := ballots.Layer(db, lid) - if err != nil { - return fmt.Errorf("extract layer %d: %w", lid, err) - } - for _, b := range blts { - if b.EpochData == nil { - continue - } - if len(b.ActiveSet) == 0 { - continue - } - if err := activesets.Add(db, b.EpochData.ActiveSetHash, &types.EpochActiveSet{ - Epoch: b.Layer.GetEpoch(), - Set: b.ActiveSet, - }); err != nil && !errors.Is(err, sql.ErrObjectExists) { - return fmt.Errorf("add active set %s (%s): %w", b.ID().String(), b.EpochData.ActiveSetHash.ShortString(), err) - } else if err == nil { - unique++ - } - b.ActiveSet = nil - if err := ballots.UpdateBlob(db, b.ID(), codec.MustEncode(b)); err != nil { - return fmt.Errorf("update ballot %s: %w", b.ID().String(), err) - } - extracted++ - } - } - log.With().Info("extracted active sets from ballots", - log.Int("num", extracted), - log.Int("unique", unique), - ) - return nil -} diff --git a/mesh/metrics/prometheus.go b/mesh/metrics/prometheus.go index 2ef85ff42b..631bcdc033 100644 --- a/mesh/metrics/prometheus.go +++ b/mesh/metrics/prometheus.go @@ -20,15 +20,4 @@ var ( []string{}, prometheus.ExponentialBuckets(1, 2, 16), ) - - pruneLatency = metrics.NewHistogramWithBuckets( - "prune_seconds", - Subsystem, - "prune time in seconds", - []string{"step"}, - prometheus.ExponentialBuckets(0.01, 2, 10), - ) - PruneProposalLatency = pruneLatency.WithLabelValues("proposal") - PruneCertLatency = pruneLatency.WithLabelValues("cert") - PrunePropTxLatency = pruneLatency.WithLabelValues("proptxs") ) diff --git a/node/node.go b/node/node.go index e7fa797388..22dc567898 100644 --- a/node/node.go +++ b/node/node.go @@ -63,8 +63,10 @@ import ( "github.com/spacemeshos/go-spacemesh/p2p" "github.com/spacemeshos/go-spacemesh/p2p/pubsub" "github.com/spacemeshos/go-spacemesh/proposals" + "github.com/spacemeshos/go-spacemesh/prune" "github.com/spacemeshos/go-spacemesh/signing" "github.com/spacemeshos/go-spacemesh/sql" + "github.com/spacemeshos/go-spacemesh/sql/ballots/util" "github.com/spacemeshos/go-spacemesh/sql/layers" dbmetrics "github.com/spacemeshos/go-spacemesh/sql/metrics" "github.com/spacemeshos/go-spacemesh/syncer" @@ -645,7 +647,7 @@ func (app *App) initServices(ctx context.Context) error { } app.eg.Go(func() error { - mesh.Prune(ctx, mlog.Zap(), app.db, app.clock, app.Config.HareEligibility.ConfidenceParam, app.Config.DatabasePruneInterval) + prune.Prune(ctx, mlog.Zap(), app.db, app.clock, app.Config.Tortoise.Hdist, app.Config.DatabasePruneInterval) return nil }) @@ -1336,7 +1338,7 @@ func (app *App) setupDBs(ctx context.Context, lg log.Log) error { sqlDB, err := sql.Open("file:"+filepath.Join(dbPath, dbFile), sql.WithConnections(app.Config.DatabaseConnections), sql.WithLatencyMetering(app.Config.DatabaseLatencyMetering), - sql.WithV4PreMigration(mesh.ExtractActiveSet), + sql.WithV4Migration(util.ExtractActiveSet), ) if err != nil { return fmt.Errorf("open sqlite db %w", err) diff --git a/prune/interface.go b/prune/interface.go new file mode 100644 index 0000000000..559a9639f6 --- /dev/null +++ b/prune/interface.go @@ -0,0 +1,11 @@ +package prune + +import ( + "github.com/spacemeshos/go-spacemesh/common/types" +) + +//go:generate mockgen -typed -package=prune -destination=./mocks.go -source=./interface.go + +type layerClock interface { + CurrentLayer() types.LayerID +} diff --git a/prune/metrics.go b/prune/metrics.go new file mode 100644 index 0000000000..97eb060fc8 --- /dev/null +++ b/prune/metrics.go @@ -0,0 +1,22 @@ +package prune + +import ( + "github.com/prometheus/client_golang/prometheus" + + "github.com/spacemeshos/go-spacemesh/metrics" +) + +const namespace = "prune" + +var ( + pruneLatency = metrics.NewHistogramWithBuckets( + "prune_seconds", + namespace, + "prune time in seconds", + []string{"step"}, + prometheus.ExponentialBuckets(0.01, 2, 10), + ) + proposalLatency = pruneLatency.WithLabelValues("proposal") + certLatency = pruneLatency.WithLabelValues("cert") + propTxLatency = pruneLatency.WithLabelValues("proptxs") +) diff --git a/prune/mocks.go b/prune/mocks.go new file mode 100644 index 0000000000..41857e78ab --- /dev/null +++ b/prune/mocks.go @@ -0,0 +1,73 @@ +// Code generated by MockGen. DO NOT EDIT. +// Source: ./interface.go + +// Package prune is a generated GoMock package. +package prune + +import ( + reflect "reflect" + + types "github.com/spacemeshos/go-spacemesh/common/types" + gomock "go.uber.org/mock/gomock" +) + +// MocklayerClock is a mock of layerClock interface. +type MocklayerClock struct { + ctrl *gomock.Controller + recorder *MocklayerClockMockRecorder +} + +// MocklayerClockMockRecorder is the mock recorder for MocklayerClock. +type MocklayerClockMockRecorder struct { + mock *MocklayerClock +} + +// NewMocklayerClock creates a new mock instance. +func NewMocklayerClock(ctrl *gomock.Controller) *MocklayerClock { + mock := &MocklayerClock{ctrl: ctrl} + mock.recorder = &MocklayerClockMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MocklayerClock) EXPECT() *MocklayerClockMockRecorder { + return m.recorder +} + +// CurrentLayer mocks base method. +func (m *MocklayerClock) CurrentLayer() types.LayerID { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "CurrentLayer") + ret0, _ := ret[0].(types.LayerID) + return ret0 +} + +// CurrentLayer indicates an expected call of CurrentLayer. +func (mr *MocklayerClockMockRecorder) CurrentLayer() *layerClockCurrentLayerCall { + mr.mock.ctrl.T.Helper() + call := mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CurrentLayer", reflect.TypeOf((*MocklayerClock)(nil).CurrentLayer)) + return &layerClockCurrentLayerCall{Call: call} +} + +// layerClockCurrentLayerCall wrap *gomock.Call +type layerClockCurrentLayerCall struct { + *gomock.Call +} + +// Return rewrite *gomock.Call.Return +func (c *layerClockCurrentLayerCall) Return(arg0 types.LayerID) *layerClockCurrentLayerCall { + c.Call = c.Call.Return(arg0) + return c +} + +// Do rewrite *gomock.Call.Do +func (c *layerClockCurrentLayerCall) Do(f func() types.LayerID) *layerClockCurrentLayerCall { + c.Call = c.Call.Do(f) + return c +} + +// DoAndReturn rewrite *gomock.Call.DoAndReturn +func (c *layerClockCurrentLayerCall) DoAndReturn(f func() types.LayerID) *layerClockCurrentLayerCall { + c.Call = c.Call.DoAndReturn(f) + return c +} diff --git a/prune/prune.go b/prune/prune.go new file mode 100644 index 0000000000..b5722be7c1 --- /dev/null +++ b/prune/prune.go @@ -0,0 +1,60 @@ +package prune + +import ( + "context" + "time" + + "go.uber.org/zap" + + "github.com/spacemeshos/go-spacemesh/common/types" + "github.com/spacemeshos/go-spacemesh/sql" + "github.com/spacemeshos/go-spacemesh/sql/certificates" + "github.com/spacemeshos/go-spacemesh/sql/proposals" + "github.com/spacemeshos/go-spacemesh/sql/transactions" +) + +func Prune( + ctx context.Context, + logger *zap.Logger, + db sql.Executor, + lc layerClock, + safeDist uint32, + interval time.Duration, +) { + logger.With().Info("db pruning launched", + zap.Uint32("dist", safeDist), + zap.Duration("interval", interval), + ) + for { + select { + case <-ctx.Done(): + return + case <-time.After(interval): + oldest := lc.CurrentLayer() - types.LayerID(safeDist) + t0 := time.Now() + if err := proposals.DeleteBefore(db, oldest); err != nil { + logger.Error("failed to delete proposals", + zap.Stringer("lid", oldest), + zap.Error(err), + ) + } + proposalLatency.Observe(time.Since(t0).Seconds()) + t1 := time.Now() + if err := certificates.DeleteCertBefore(db, oldest); err != nil { + logger.Error("failed to delete certificates", + zap.Stringer("lid", oldest), + zap.Error(err), + ) + } + certLatency.Observe(time.Since(t1).Seconds()) + t2 := time.Now() + if err := transactions.DeleteProposalTxsBefore(db, oldest); err != nil { + logger.Error("failed to delete proposal tx mapping", + zap.Stringer("lid", oldest), + zap.Error(err), + ) + } + propTxLatency.Observe(time.Since(t2).Seconds()) + } + } +} diff --git a/mesh/janitor_test.go b/prune/prune_test.go similarity index 69% rename from mesh/janitor_test.go rename to prune/prune_test.go index d867bfb866..286725459e 100644 --- a/mesh/janitor_test.go +++ b/prune/prune_test.go @@ -1,4 +1,4 @@ -package mesh +package prune import ( "context" @@ -11,9 +11,7 @@ import ( "github.com/spacemeshos/go-spacemesh/common/types" "github.com/spacemeshos/go-spacemesh/log/logtest" - "github.com/spacemeshos/go-spacemesh/mesh/mocks" "github.com/spacemeshos/go-spacemesh/sql" - "github.com/spacemeshos/go-spacemesh/sql/activesets" "github.com/spacemeshos/go-spacemesh/sql/ballots" "github.com/spacemeshos/go-spacemesh/sql/certificates" "github.com/spacemeshos/go-spacemesh/sql/proposals" @@ -23,7 +21,7 @@ import ( func TestPrune(t *testing.T) { db := sql.InMemory() current := types.LayerID(10) - mc := mocks.NewMocklayerClock(gomock.NewController(t)) + mc := NewMocklayerClock(gomock.NewController(t)) done := make(chan struct{}) count := 0 mc.EXPECT().CurrentLayer().DoAndReturn(func() types.LayerID { @@ -93,41 +91,5 @@ func TestPrune(t *testing.T) { } }, time.Second, 10*time.Millisecond) cancel() - eg.Wait() -} - -func TestExtractActiveSet(t *testing.T) { - db := sql.InMemory() - current := types.LayerID(20) - blts := make([]*types.Ballot, 0, current) - hashes := []types.Hash32{types.RandomHash(), types.RandomHash()} - actives := [][]types.ATXID{types.RandomActiveSet(11), types.RandomActiveSet(19)} - for lid := types.EpochID(2).FirstLayer(); lid < current; lid++ { - blt := types.NewExistingBallot(types.RandomBallotID(), types.RandomEdSignature(), types.NodeID{1}, lid) - if lid%3 == 0 { - blt.EpochData = &types.EpochData{ - ActiveSetHash: hashes[0], - } - blt.ActiveSet = actives[0] - } - if lid%3 == 1 { - blt.EpochData = &types.EpochData{ - ActiveSetHash: hashes[1], - } - blt.ActiveSet = actives[1] - } - require.NoError(t, ballots.Add(db, &blt)) - blts = append(blts, &blt) - } - require.NoError(t, ExtractActiveSet(db)) - for _, b := range blts { - got, err := ballots.Get(db, b.ID()) - require.NoError(t, err) - require.Empty(t, got.ActiveSet) - } - for i, h := range hashes { - got, err := activesets.Get(db, h) - require.NoError(t, err) - require.Equal(t, actives[i], got.Set) - } + require.NoError(t, eg.Wait()) } diff --git a/sql/ballots/util/extract.go b/sql/ballots/util/extract.go new file mode 100644 index 0000000000..52e097a06c --- /dev/null +++ b/sql/ballots/util/extract.go @@ -0,0 +1,58 @@ +package util + +import ( + "errors" + "fmt" + + "github.com/spacemeshos/go-spacemesh/common/types" + "github.com/spacemeshos/go-spacemesh/log" + "github.com/spacemeshos/go-spacemesh/sql" + "github.com/spacemeshos/go-spacemesh/sql/activesets" + "github.com/spacemeshos/go-spacemesh/sql/ballots" +) + +func ExtractActiveSet(db sql.Executor) error { + latest, err := ballots.LatestLayer(db) + if err != nil { + return fmt.Errorf("extract get latest: %w", err) + } + extracted := 0 + unique := 0 + log.With().Info("extracting ballots active sets", + log.Uint32("from", types.EpochID(2).FirstLayer().Uint32()), + log.Uint32("to", latest.Uint32()), + ) + for lid := types.EpochID(2).FirstLayer(); lid <= latest; lid++ { + blts, err := ballots.Layer(db, lid) + if err != nil { + return fmt.Errorf("extract layer %d: %w", lid, err) + } + for _, b := range blts { + if b.EpochData == nil { + continue + } + if len(b.ActiveSet) == 0 { + continue + } + if err := activesets.Add(db, b.EpochData.ActiveSetHash, &types.EpochActiveSet{ + Epoch: b.Layer.GetEpoch(), + Set: b.ActiveSet, + }); err != nil && !errors.Is(err, sql.ErrObjectExists) { + return fmt.Errorf("add active set %s (%s): %w", b.ID().String(), b.EpochData.ActiveSetHash.ShortString(), err) + } else if err == nil { + unique++ + } + // TODO: prune ballot active set after migration 4 is released + //b.ActiveSet = nil + //if err := ballots.UpdateBlob(db, b.ID(), codec.MustEncode(b)); err != nil { + // return fmt.Errorf("update ballot %s: %w", b.ID().String(), err) + //} + extracted++ + } + } + log.With().Info("extracted active sets from ballots", + log.Int("num", extracted), + log.Int("unique", unique), + ) + return nil +} diff --git a/sql/ballots/util/extract_test.go b/sql/ballots/util/extract_test.go new file mode 100644 index 0000000000..c9df4416a3 --- /dev/null +++ b/sql/ballots/util/extract_test.go @@ -0,0 +1,57 @@ +package util + +import ( + "os" + "testing" + + "github.com/stretchr/testify/require" + + "github.com/spacemeshos/go-spacemesh/common/types" + "github.com/spacemeshos/go-spacemesh/sql" + "github.com/spacemeshos/go-spacemesh/sql/activesets" + "github.com/spacemeshos/go-spacemesh/sql/ballots" +) + +func TestMain(m *testing.M) { + types.SetLayersPerEpoch(4) + res := m.Run() + os.Exit(res) +} + +func TestExtractActiveSet(t *testing.T) { + db := sql.InMemory() + current := types.LayerID(20) + blts := make([]*types.Ballot, 0, current) + hashes := []types.Hash32{types.RandomHash(), types.RandomHash()} + actives := [][]types.ATXID{types.RandomActiveSet(11), types.RandomActiveSet(19)} + for lid := types.EpochID(2).FirstLayer(); lid < current; lid++ { + blt := types.NewExistingBallot(types.RandomBallotID(), types.RandomEdSignature(), types.NodeID{1}, lid) + if lid%3 == 0 { + blt.EpochData = &types.EpochData{ + ActiveSetHash: hashes[0], + } + blt.ActiveSet = actives[0] + } + if lid%3 == 1 { + blt.EpochData = &types.EpochData{ + ActiveSetHash: hashes[1], + } + blt.ActiveSet = actives[1] + } + require.NoError(t, ballots.Add(db, &blt)) + blts = append(blts, &blt) + } + require.NoError(t, ExtractActiveSet(db)) + for _, b := range blts { + got, err := ballots.Get(db, b.ID()) + require.NoError(t, err) + if b.Layer%3 != 2 { + require.NotEmpty(t, got.ActiveSet) + } + } + for i, h := range hashes { + got, err := activesets.Get(db, h) + require.NoError(t, err) + require.Equal(t, actives[i], got.Set) + } +} diff --git a/sql/certificates/certs.go b/sql/certificates/certs.go index 5ee460cb3b..b5010091a0 100644 --- a/sql/certificates/certs.go +++ b/sql/certificates/certs.go @@ -165,7 +165,7 @@ func SetInvalid(db sql.Executor, lid types.LayerID, bid types.BlockID) error { return nil } -func DeleteCert(db sql.Executor, lid types.LayerID) error { +func DeleteCertBefore(db sql.Executor, lid types.LayerID) error { if _, err := db.Exec(`update certificates set cert = null where layer < ?1;`, func(stmt *sql.Statement) { stmt.BindInt64(1, int64(lid)) diff --git a/sql/certificates/certs_test.go b/sql/certificates/certs_test.go index ebf69d79ae..cfe4ff9104 100644 --- a/sql/certificates/certs_test.go +++ b/sql/certificates/certs_test.go @@ -165,7 +165,7 @@ func TestDeleteCert(t *testing.T) { require.NoError(t, Add(db, types.LayerID(2), &types.Certificate{BlockID: types.BlockID{2}})) require.NoError(t, Add(db, types.LayerID(3), &types.Certificate{BlockID: types.BlockID{3}})) require.NoError(t, Add(db, types.LayerID(4), &types.Certificate{BlockID: types.BlockID{4}})) - require.NoError(t, DeleteCert(db, 4)) + require.NoError(t, DeleteCertBefore(db, 4)) _, err := CertifiedBlock(db, types.LayerID(2)) require.ErrorIs(t, err, sql.ErrNotFound) diff --git a/sql/database.go b/sql/database.go index ed04b6cd70..9b9c3aafed 100644 --- a/sql/database.go +++ b/sql/database.go @@ -61,7 +61,7 @@ type conf struct { enableLatency bool // TODO: remove after state is pruned for majority - v4preMigration func(Executor) error + v4Migration func(Executor) error } // WithConnections overwrites number of pooled connections. @@ -78,9 +78,9 @@ func WithMigrations(migrations Migrations) Opt { } } -func WithV4PreMigration(cb func(Executor) error) Opt { +func WithV4Migration(cb func(Executor) error) Opt { return func(c *conf) { - c.v4preMigration = cb + c.v4Migration = cb } } @@ -129,11 +129,6 @@ func Open(uri string, opts ...Opt) (*Database, error) { if err != nil { return nil, err } - if before == 3 && config.v4preMigration != nil { - if err := config.v4preMigration(db); err != nil { - return nil, err - } - } tx, err := db.Tx(context.Background()) if err != nil { return nil, err @@ -146,7 +141,15 @@ func Open(uri string, opts ...Opt) (*Database, error) { if err != nil { return nil, err } - if before == 3 { + after, err := version(db) + if err != nil { + return nil, err + } + if before <= 3 && after == 4 && config.v4Migration != nil { + // v4 migration (active set extraction) needs the 3rd migration to execute first + if err := config.v4Migration(db); err != nil { + return nil, err + } if err := Vacuum(db); err != nil { return nil, err } diff --git a/sql/transactions/transactions.go b/sql/transactions/transactions.go index 785b1fae77..536b0bf99f 100644 --- a/sql/transactions/transactions.go +++ b/sql/transactions/transactions.go @@ -59,7 +59,7 @@ func AddToProposal(db sql.Executor, tid types.TransactionID, lid types.LayerID, return nil } -func DeleteProposalTxs(db sql.Executor, lid types.LayerID) error { +func DeleteProposalTxsBefore(db sql.Executor, lid types.LayerID) error { if _, err := db.Exec(` delete from proposal_transactions where layer < ?1;`, func(stmt *sql.Statement) { diff --git a/sql/transactions/transactions_test.go b/sql/transactions/transactions_test.go index d2b3f0e684..a698b64f4d 100644 --- a/sql/transactions/transactions_test.go +++ b/sql/transactions/transactions_test.go @@ -168,7 +168,7 @@ func TestDeleteProposalTxs(t *testing.T) { } } } - require.NoError(t, transactions.DeleteProposalTxs(db, types.LayerID(11))) + require.NoError(t, transactions.DeleteProposalTxsBefore(db, types.LayerID(11))) for _, pid := range proposals[types.LayerID(10)] { for _, tid := range tids { has, err := transactions.HasProposalTX(db, pid, tid) diff --git a/sql/vacuum.go b/sql/vacuum.go index e53b7c234d..0f874fe14f 100644 --- a/sql/vacuum.go +++ b/sql/vacuum.go @@ -2,14 +2,19 @@ package sql import ( "fmt" + + "github.com/spacemeshos/go-spacemesh/log" ) func Vacuum(db Executor) error { + log.Info("vacuuming db...") if _, err := db.Exec("vacuum", nil, nil); err != nil { return fmt.Errorf("vacuum %w", err) } + log.Info("checkpointing db...") if _, err := db.Exec("pragma wal_checkpoint(TRUNCATE)", nil, nil); err != nil { return fmt.Errorf("wal checkpoint %w", err) } + log.Info("db vacuum completed") return nil } From ffe37de20a4631c6062e39e9ffd3aaf6f5fe3445 Mon Sep 17 00:00:00 2001 From: kimmy lin <30611210+countvonzero@users.noreply.github.com> Date: Mon, 18 Sep 2023 17:38:01 -0700 Subject: [PATCH 8/9] update changelog --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 8c0ae35ec3..5934bd7870 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -19,6 +19,7 @@ Support for old certificate sync protocol is dropped. This update is incompatibl ### Features ### Improvements +* [#4998](https://github.com/spacemeshos/go-spacemesh/pull/4998) First phase of state size reduction. Also prune ephemeral data periodically. * [#5021](https://github.com/spacemeshos/go-spacemesh/pull/5021) Drop support for old certificate sync protocol. * [#5024](https://github.com/spacemeshos/go-spacemesh/pull/5024) Active set will be saved in state separately from ballots. From 9bb6af005ab4d26c7f88988a78a1d50ad91ef0c3 Mon Sep 17 00:00:00 2001 From: kimmy lin <30611210+countvonzero@users.noreply.github.com> Date: Mon, 18 Sep 2023 18:28:07 -0700 Subject: [PATCH 9/9] small cleanup --- CHANGELOG.md | 3 ++- mesh/metrics/prometheus.go | 14 ++++++-------- 2 files changed, 8 insertions(+), 9 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 5934bd7870..1f2aafd23b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -19,7 +19,8 @@ Support for old certificate sync protocol is dropped. This update is incompatibl ### Features ### Improvements -* [#4998](https://github.com/spacemeshos/go-spacemesh/pull/4998) First phase of state size reduction. Also prune ephemeral data periodically. +* [#4998](https://github.com/spacemeshos/go-spacemesh/pull/4998) First phase of state size reduction. + Ephemeral data are deleted and state compacted at the time of upgrade. In steady-state, data is pruned periodically. * [#5021](https://github.com/spacemeshos/go-spacemesh/pull/5021) Drop support for old certificate sync protocol. * [#5024](https://github.com/spacemeshos/go-spacemesh/pull/5024) Active set will be saved in state separately from ballots. diff --git a/mesh/metrics/prometheus.go b/mesh/metrics/prometheus.go index 631bcdc033..6c46fb6499 100644 --- a/mesh/metrics/prometheus.go +++ b/mesh/metrics/prometheus.go @@ -12,12 +12,10 @@ const ( ) // LayerNumBlocks is number of blocks in layer. -var ( - LayerNumBlocks = metrics.NewHistogramWithBuckets( - "layer_num_blocks", - Subsystem, - "Number of blocks in layer", - []string{}, - prometheus.ExponentialBuckets(1, 2, 16), - ) +var LayerNumBlocks = metrics.NewHistogramWithBuckets( + "layer_num_blocks", + Subsystem, + "Number of blocks in layer", + []string{}, + prometheus.ExponentialBuckets(1, 2, 16), )