From 06af3e2289dbf6b9f1b6324a0e58d7d5530b003e Mon Sep 17 00:00:00 2001 From: k <30611210+countvonzero@users.noreply.github.com> Date: Tue, 19 Sep 2023 01:28:44 +0000 Subject: [PATCH] prune data and compact state (#4998) ## Motivation Closes #3049 Closes #3588 ## Changes - prune data consistently with tortoise hdist distance. - proposals - certificates - proposal id<->tid mapping. Certificates are needed for consensus within hdist. The same distance is used for all of them for simplicity. - extract active set data from ballots and save them in activesets table. - vacuum and checkpoint database after migration 4 is complete. This PR concludes the first update described in https://github.com/spacemeshos/go-spacemesh/issues/4984#issuecomment-1720169998 --- CHANGELOG.md | 2 + blocks/generator.go | 15 --- blocks/generator_test.go | 17 ---- blocks/metrics.go | 8 -- cmd/root.go | 2 + config/config.go | 34 +++---- config/mainnet.go | 11 ++- config/presets/fastnet.go | 1 + node/node.go | 17 +++- prune/interface.go | 11 +++ prune/metrics.go | 22 +++++ prune/mocks.go | 73 ++++++++++++++ prune/prune.go | 60 ++++++++++++ prune/prune_test.go | 95 +++++++++++++++++++ sql/ballots/ballots.go | 11 +++ sql/ballots/ballots_test.go | 21 ++++ sql/ballots/util/extract.go | 58 +++++++++++ sql/ballots/util/extract_test.go | 57 +++++++++++ sql/certificates/certs.go | 10 ++ sql/certificates/certs_test.go | 16 ++++ sql/database.go | 28 +++++- sql/migrations.go | 21 ++-- .../{0003_next.sql => 0003_v1.1.5.sql} | 0 sql/migrations/0004_next.sql | 3 + sql/migrations_test.go | 2 +- sql/transactions/transactions.go | 11 +++ sql/transactions/transactions_test.go | 31 ++++++ sql/vacuum.go | 20 ++++ sql/vacuum_test.go | 12 +++ 29 files changed, 595 insertions(+), 74 deletions(-) create mode 100644 prune/interface.go create mode 100644 prune/metrics.go create mode 100644 prune/mocks.go create mode 100644 prune/prune.go create mode 100644 prune/prune_test.go create mode 100644 sql/ballots/util/extract.go create mode
100644 sql/ballots/util/extract_test.go rename sql/migrations/{0003_next.sql => 0003_v1.1.5.sql} (100%) create mode 100644 sql/migrations/0004_next.sql create mode 100644 sql/vacuum.go create mode 100644 sql/vacuum_test.go diff --git a/CHANGELOG.md b/CHANGELOG.md index 8c0ae35ec3a..1f2aafd23b1 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -19,6 +19,8 @@ Support for old certificate sync protocol is dropped. This update is incompatibl ### Features ### Improvements +* [#4998](https://github.com/spacemeshos/go-spacemesh/pull/4998) First phase of state size reduction. + Ephemeral data are deleted and state compacted at the time of upgrade. In steady-state, data is pruned periodically. * [#5021](https://github.com/spacemeshos/go-spacemesh/pull/5021) Drop support for old certificate sync protocol. * [#5024](https://github.com/spacemeshos/go-spacemesh/pull/5024) Active set will be saved in state separately from ballots. diff --git a/blocks/generator.go b/blocks/generator.go index b1aa78e40eb..3d5e389bd91 100644 --- a/blocks/generator.go +++ b/blocks/generator.go @@ -135,20 +135,7 @@ func (g *Generator) Stop() { } } -func (g *Generator) pruneAsync() { - g.eg.Go(func() error { - lid := g.msh.ProcessedLayer() - start := time.Now() - if err := proposals.DeleteBefore(g.cdb, lid); err != nil { - g.logger.With().Error("failed to delete old proposals", lid, log.Err(err)) - } - deleteLatency.Observe(time.Since(start).Seconds()) - return nil - }) -} - func (g *Generator) run() error { - g.pruneAsync() var maxLayer types.LayerID for { select { @@ -180,12 +167,10 @@ func (g *Generator) run() error { if len(g.optimisticOutput) > 0 { g.processOptimisticLayers(maxLayer) } - g.pruneAsync() case <-time.After(g.cfg.GenBlockInterval): if len(g.optimisticOutput) > 0 { g.processOptimisticLayers(maxLayer) } - g.pruneAsync() } } } diff --git a/blocks/generator_test.go b/blocks/generator_test.go index 2ad8415bd01..678438589d8 100644 --- a/blocks/generator_test.go +++ b/blocks/generator_test.go 
@@ -344,17 +344,7 @@ func Test_run(t *testing.T) { txIDs := createAndSaveTxs(t, numTXs, tg.cdb) signers, atxes := createATXs(t, tg.cdb, (layerID.GetEpoch() - 1).FirstLayer(), numProposals) activeSet := types.ToATXIDs(atxes) - // generate some proposals before this layer - oldest := layerID - 10 - for lid := oldest; lid < layerID; lid++ { - createProposals(t, tg.cdb, lid, types.EmptyLayerHash, signers, activeSet, txIDs) - } plist := createProposals(t, tg.cdb, layerID, meshHash, signers, activeSet, txIDs) - for lid := oldest; lid <= layerID; lid++ { - got, err := proposals.GetByLayer(tg.cdb, lid) - require.NoError(t, err) - require.Len(t, got, len(signers)) - } pids := types.ToProposalIDs(plist) tg.mockFetch.EXPECT().GetProposals(gomock.Any(), pids) @@ -413,13 +403,6 @@ func Test_run(t *testing.T) { tg.hareCh <- hare.LayerOutput{Ctx: context.Background(), Layer: layerID, Proposals: pids} require.Eventually(t, func() bool { return len(tg.hareCh) == 0 }, time.Second, 100*time.Millisecond) tg.Stop() - for lid := oldest; lid < layerID; lid++ { - _, err := proposals.GetByLayer(tg.cdb, lid) - require.ErrorIs(t, err, sql.ErrNotFound) - } - got, err := proposals.GetByLayer(tg.cdb, layerID) - require.NoError(t, err) - require.Len(t, got, len(signers)) }) } } diff --git a/blocks/metrics.go b/blocks/metrics.go index 9c2cb381f83..2a818408bc2 100644 --- a/blocks/metrics.go +++ b/blocks/metrics.go @@ -30,14 +30,6 @@ var ( failFetchCnt = blockGenCount.WithLabelValues(failFetch) failGenCnt = blockGenCount.WithLabelValues(failGen) failErrCnt = blockGenCount.WithLabelValues(internalErr) - - deleteLatency = metrics.NewHistogramWithBuckets( - "delete_duration", - namespace, - "duration in second to delete old proposals", - []string{}, - prometheus.ExponentialBuckets(0.01, 2, 10), - ).WithLabelValues() ) type collector struct { diff --git a/cmd/root.go b/cmd/root.go index 253591679a9..8c7142a4aba 100644 --- a/cmd/root.go +++ b/cmd/root.go @@ -78,6 +78,8 @@ func AddCommands(cmd 
*cobra.Command) { cfg.DatabaseConnections, "configure number of active connections to enable parallel read requests") cmd.PersistentFlags().BoolVar(&cfg.DatabaseLatencyMetering, "db-latency-metering", cfg.DatabaseLatencyMetering, "if enabled collect latency histogram for every database query") + cmd.PersistentFlags().DurationVar(&cfg.DatabasePruneInterval, "db-prune-interval", + cfg.DatabasePruneInterval, "configure interval for database pruning") /** ======================== P2P Flags ========================== **/ diff --git a/config/config.go b/config/config.go index b53710e4f97..b0795bfbc54 100644 --- a/config/config.go +++ b/config/config.go @@ -108,8 +108,9 @@ type BaseConfig struct { OptFilterThreshold int `mapstructure:"optimistic-filtering-threshold"` TickSize uint64 `mapstructure:"tick-size"` - DatabaseConnections int `mapstructure:"db-connections"` - DatabaseLatencyMetering bool `mapstructure:"db-latency-metering"` + DatabaseConnections int `mapstructure:"db-connections"` + DatabaseLatencyMetering bool `mapstructure:"db-latency-metering"` + DatabasePruneInterval time.Duration `mapstructure:"db-prune-interval"` NetworkHRP string `mapstructure:"network-hrp"` @@ -173,20 +174,21 @@ func DefaultTestConfig() Config { // DefaultBaseConfig returns a default configuration for spacemesh. 
func defaultBaseConfig() BaseConfig { return BaseConfig{ - DataDirParent: defaultDataDir, - FileLock: filepath.Join(os.TempDir(), "spacemesh.lock"), - CollectMetrics: false, - MetricsPort: 1010, - ProfilerName: "gp-spacemesh", - LayerDuration: 30 * time.Second, - LayersPerEpoch: 3, - PoETServers: []string{"127.0.0.1"}, - TxsPerProposal: 100, - BlockGasLimit: math.MaxUint64, - OptFilterThreshold: 90, - TickSize: 100, - DatabaseConnections: 16, - NetworkHRP: "sm", + DataDirParent: defaultDataDir, + FileLock: filepath.Join(os.TempDir(), "spacemesh.lock"), + CollectMetrics: false, + MetricsPort: 1010, + ProfilerName: "gp-spacemesh", + LayerDuration: 30 * time.Second, + LayersPerEpoch: 3, + PoETServers: []string{"127.0.0.1"}, + TxsPerProposal: 100, + BlockGasLimit: math.MaxUint64, + OptFilterThreshold: 90, + TickSize: 100, + DatabaseConnections: 16, + DatabasePruneInterval: 30 * time.Minute, + NetworkHRP: "sm", } } diff --git a/config/mainnet.go b/config/mainnet.go index cf9e01ca7a8..9b28038c6e9 100644 --- a/config/mainnet.go +++ b/config/mainnet.go @@ -42,11 +42,12 @@ func MainnetConfig() Config { logging.TrtlLoggerLevel = zapcore.WarnLevel.String() return Config{ BaseConfig: BaseConfig{ - DataDirParent: defaultDataDir, - FileLock: filepath.Join(os.TempDir(), "spacemesh.lock"), - MetricsPort: 1010, - DatabaseConnections: 16, - NetworkHRP: "sm", + DataDirParent: defaultDataDir, + FileLock: filepath.Join(os.TempDir(), "spacemesh.lock"), + MetricsPort: 1010, + DatabaseConnections: 16, + DatabasePruneInterval: 30 * time.Minute, + NetworkHRP: "sm", LayerDuration: 5 * time.Minute, LayerAvgSize: 50, diff --git a/config/presets/fastnet.go b/config/presets/fastnet.go index b7d4ad726c9..e7bf8d0d194 100644 --- a/config/presets/fastnet.go +++ b/config/presets/fastnet.go @@ -22,6 +22,7 @@ func fastnet() config.Config { conf.NetworkHRP = "stest" types.SetNetworkHRP(conf.NetworkHRP) // set to generate coinbase conf.BaseConfig.OptFilterThreshold = 90 + 
conf.BaseConfig.DatabasePruneInterval = time.Minute // set for systest TestEquivocation conf.BaseConfig.MinerGoodAtxsPercent = 50 diff --git a/node/node.go b/node/node.go index 4d3c27d4cdf..22dc5678982 100644 --- a/node/node.go +++ b/node/node.go @@ -63,8 +63,10 @@ import ( "github.com/spacemeshos/go-spacemesh/p2p" "github.com/spacemeshos/go-spacemesh/p2p/pubsub" "github.com/spacemeshos/go-spacemesh/proposals" + "github.com/spacemeshos/go-spacemesh/prune" "github.com/spacemeshos/go-spacemesh/signing" "github.com/spacemeshos/go-spacemesh/sql" + "github.com/spacemeshos/go-spacemesh/sql/ballots/util" "github.com/spacemeshos/go-spacemesh/sql/layers" dbmetrics "github.com/spacemeshos/go-spacemesh/sql/metrics" "github.com/spacemeshos/go-spacemesh/syncer" @@ -638,11 +640,17 @@ func (app *App) initServices(ctx context.Context) error { }) executor := mesh.NewExecutor(app.cachedDB, state, app.conState, app.addLogger(ExecutorLogger, lg)) - msh, err := mesh.NewMesh(app.cachedDB, app.clock, trtl, executor, app.conState, app.addLogger(MeshLogger, lg)) + mlog := app.addLogger(MeshLogger, lg) + msh, err := mesh.NewMesh(app.cachedDB, app.clock, trtl, executor, app.conState, mlog) if err != nil { return fmt.Errorf("failed to create mesh: %w", err) } + app.eg.Go(func() error { + prune.Prune(ctx, mlog.Zap(), app.db, app.clock, app.Config.Tortoise.Hdist, app.Config.DatabasePruneInterval) + return nil + }) + fetcherWrapped := &layerFetcher{} atxHandler := activation.NewHandler( app.cachedDB, @@ -1322,13 +1330,15 @@ func (app *App) LoadOrCreateEdSigner() (*signing.EdSigner, error) { return edSgn, nil } -func (app *App) setupDBs(ctx context.Context, lg log.Log, dbPath string) error { +func (app *App) setupDBs(ctx context.Context, lg log.Log) error { + dbPath := app.Config.DataDir() if err := os.MkdirAll(dbPath, os.ModePerm); err != nil { return fmt.Errorf("failed to create %s: %w", dbPath, err) } sqlDB, err := sql.Open("file:"+filepath.Join(dbPath, dbFile), 
sql.WithConnections(app.Config.DatabaseConnections), sql.WithLatencyMetering(app.Config.DatabaseLatencyMetering), + sql.WithV4Migration(util.ExtractActiveSet), ) if err != nil { return fmt.Errorf("open sqlite db %w", err) @@ -1407,7 +1417,6 @@ func (app *App) startSynchronous(ctx context.Context) (err error) { } if app.Config.ProfilerURL != "" { - app.profilerService, err = pyroscope.Start(pyroscope.Config{ ApplicationName: app.Config.ProfilerName, // app.Config.ProfilerURL should be the pyroscope server address @@ -1456,7 +1465,7 @@ func (app *App) startSynchronous(ctx context.Context) (err error) { return fmt.Errorf("failed to initialize p2p host: %w", err) } - if err := app.setupDBs(ctx, lg, app.Config.DataDir()); err != nil { + if err := app.setupDBs(ctx, lg); err != nil { return err } if err := app.initServices(ctx); err != nil { diff --git a/prune/interface.go b/prune/interface.go new file mode 100644 index 00000000000..559a9639f6d --- /dev/null +++ b/prune/interface.go @@ -0,0 +1,11 @@ +package prune + +import ( + "github.com/spacemeshos/go-spacemesh/common/types" +) + +//go:generate mockgen -typed -package=prune -destination=./mocks.go -source=./interface.go + +type layerClock interface { + CurrentLayer() types.LayerID +} diff --git a/prune/metrics.go b/prune/metrics.go new file mode 100644 index 00000000000..97eb060fc84 --- /dev/null +++ b/prune/metrics.go @@ -0,0 +1,22 @@ +package prune + +import ( + "github.com/prometheus/client_golang/prometheus" + + "github.com/spacemeshos/go-spacemesh/metrics" +) + +const namespace = "prune" + +var ( + pruneLatency = metrics.NewHistogramWithBuckets( + "prune_seconds", + namespace, + "prune time in seconds", + []string{"step"}, + prometheus.ExponentialBuckets(0.01, 2, 10), + ) + proposalLatency = pruneLatency.WithLabelValues("proposal") + certLatency = pruneLatency.WithLabelValues("cert") + propTxLatency = pruneLatency.WithLabelValues("proptxs") +) diff --git a/prune/mocks.go b/prune/mocks.go new file mode 100644 index 
00000000000..41857e78abc --- /dev/null +++ b/prune/mocks.go @@ -0,0 +1,73 @@ +// Code generated by MockGen. DO NOT EDIT. +// Source: ./interface.go + +// Package prune is a generated GoMock package. +package prune + +import ( + reflect "reflect" + + types "github.com/spacemeshos/go-spacemesh/common/types" + gomock "go.uber.org/mock/gomock" +) + +// MocklayerClock is a mock of layerClock interface. +type MocklayerClock struct { + ctrl *gomock.Controller + recorder *MocklayerClockMockRecorder +} + +// MocklayerClockMockRecorder is the mock recorder for MocklayerClock. +type MocklayerClockMockRecorder struct { + mock *MocklayerClock +} + +// NewMocklayerClock creates a new mock instance. +func NewMocklayerClock(ctrl *gomock.Controller) *MocklayerClock { + mock := &MocklayerClock{ctrl: ctrl} + mock.recorder = &MocklayerClockMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MocklayerClock) EXPECT() *MocklayerClockMockRecorder { + return m.recorder +} + +// CurrentLayer mocks base method. +func (m *MocklayerClock) CurrentLayer() types.LayerID { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "CurrentLayer") + ret0, _ := ret[0].(types.LayerID) + return ret0 +} + +// CurrentLayer indicates an expected call of CurrentLayer. 
+func (mr *MocklayerClockMockRecorder) CurrentLayer() *layerClockCurrentLayerCall { + mr.mock.ctrl.T.Helper() + call := mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CurrentLayer", reflect.TypeOf((*MocklayerClock)(nil).CurrentLayer)) + return &layerClockCurrentLayerCall{Call: call} +} + +// layerClockCurrentLayerCall wrap *gomock.Call +type layerClockCurrentLayerCall struct { + *gomock.Call +} + +// Return rewrite *gomock.Call.Return +func (c *layerClockCurrentLayerCall) Return(arg0 types.LayerID) *layerClockCurrentLayerCall { + c.Call = c.Call.Return(arg0) + return c +} + +// Do rewrite *gomock.Call.Do +func (c *layerClockCurrentLayerCall) Do(f func() types.LayerID) *layerClockCurrentLayerCall { + c.Call = c.Call.Do(f) + return c +} + +// DoAndReturn rewrite *gomock.Call.DoAndReturn +func (c *layerClockCurrentLayerCall) DoAndReturn(f func() types.LayerID) *layerClockCurrentLayerCall { + c.Call = c.Call.DoAndReturn(f) + return c +} diff --git a/prune/prune.go b/prune/prune.go new file mode 100644 index 00000000000..b5722be7c12 --- /dev/null +++ b/prune/prune.go @@ -0,0 +1,60 @@ +package prune + +import ( + "context" + "time" + + "go.uber.org/zap" + + "github.com/spacemeshos/go-spacemesh/common/types" + "github.com/spacemeshos/go-spacemesh/sql" + "github.com/spacemeshos/go-spacemesh/sql/certificates" + "github.com/spacemeshos/go-spacemesh/sql/proposals" + "github.com/spacemeshos/go-spacemesh/sql/transactions" +) + +func Prune( + ctx context.Context, + logger *zap.Logger, + db sql.Executor, + lc layerClock, + safeDist uint32, + interval time.Duration, +) { + logger.With().Info("db pruning launched", + zap.Uint32("dist", safeDist), + zap.Duration("interval", interval), + ) + for { + select { + case <-ctx.Done(): + return + case <-time.After(interval): + oldest := lc.CurrentLayer() - types.LayerID(safeDist) + t0 := time.Now() + if err := proposals.DeleteBefore(db, oldest); err != nil { + logger.Error("failed to delete proposals", + zap.Stringer("lid", oldest), + 
zap.Error(err), + ) + } + proposalLatency.Observe(time.Since(t0).Seconds()) + t1 := time.Now() + if err := certificates.DeleteCertBefore(db, oldest); err != nil { + logger.Error("failed to delete certificates", + zap.Stringer("lid", oldest), + zap.Error(err), + ) + } + certLatency.Observe(time.Since(t1).Seconds()) + t2 := time.Now() + if err := transactions.DeleteProposalTxsBefore(db, oldest); err != nil { + logger.Error("failed to delete proposal tx mapping", + zap.Stringer("lid", oldest), + zap.Error(err), + ) + } + propTxLatency.Observe(time.Since(t2).Seconds()) + } + } +} diff --git a/prune/prune_test.go b/prune/prune_test.go new file mode 100644 index 00000000000..286725459e4 --- /dev/null +++ b/prune/prune_test.go @@ -0,0 +1,95 @@ +package prune + +import ( + "context" + "testing" + "time" + + "github.com/stretchr/testify/require" + "go.uber.org/mock/gomock" + "golang.org/x/sync/errgroup" + + "github.com/spacemeshos/go-spacemesh/common/types" + "github.com/spacemeshos/go-spacemesh/log/logtest" + "github.com/spacemeshos/go-spacemesh/sql" + "github.com/spacemeshos/go-spacemesh/sql/ballots" + "github.com/spacemeshos/go-spacemesh/sql/certificates" + "github.com/spacemeshos/go-spacemesh/sql/proposals" + "github.com/spacemeshos/go-spacemesh/sql/transactions" +) + +func TestPrune(t *testing.T) { + db := sql.InMemory() + current := types.LayerID(10) + mc := NewMocklayerClock(gomock.NewController(t)) + done := make(chan struct{}) + count := 0 + mc.EXPECT().CurrentLayer().DoAndReturn(func() types.LayerID { + if count == 0 { + close(done) + } + count++ + return current + }).AnyTimes() + lyrProps := make([]*types.Proposal, 0, current) + for lid := types.LayerID(0); lid < current; lid++ { + blt := types.NewExistingBallot(types.RandomBallotID(), types.RandomEdSignature(), types.NodeID{1}, lid) + require.NoError(t, ballots.Add(db, &blt)) + p := &types.Proposal{ + InnerProposal: types.InnerProposal{ + Ballot: blt, + TxIDs: []types.TransactionID{types.RandomTransactionID(), 
types.RandomTransactionID()}, + }, + Signature: types.RandomEdSignature(), + } + p.SetID(types.RandomProposalID()) + require.NoError(t, proposals.Add(db, p)) + require.NoError(t, certificates.Add(db, lid, &types.Certificate{BlockID: types.RandomBlockID()})) + for _, tid := range p.TxIDs { + require.NoError(t, transactions.AddToProposal(db, tid, lid, p.ID())) + } + lyrProps = append(lyrProps, p) + } + confidenceDist := uint32(3) + ctx, cancel := context.WithCancel(context.Background()) + var eg errgroup.Group + eg.Go(func() error { + Prune(ctx, logtest.New(t).Zap(), db, mc, confidenceDist, time.Millisecond) + return nil + }) + require.Eventually(t, func() bool { + select { + case <-done: + oldest := current - types.LayerID(confidenceDist) + for lid := types.LayerID(0); lid < oldest; lid++ { + _, err := certificates.CertifiedBlock(db, lid) + require.ErrorIs(t, err, sql.ErrNotFound) + _, err = proposals.GetByLayer(db, lid) + require.ErrorIs(t, err, sql.ErrNotFound) + for _, tid := range lyrProps[lid].TxIDs { + exists, err := transactions.HasProposalTX(db, lyrProps[lid].ID(), tid) + require.NoError(t, err) + require.False(t, exists) + } + } + for lid := oldest; lid < current; lid++ { + got, err := certificates.CertifiedBlock(db, lid) + require.NoError(t, err) + require.NotEqual(t, types.EmptyBlockID, got) + pps, err := proposals.GetByLayer(db, lid) + require.NoError(t, err) + require.NotEmpty(t, pps) + for _, tid := range lyrProps[lid].TxIDs { + exists, err := transactions.HasProposalTX(db, lyrProps[lid].ID(), tid) + require.NoError(t, err) + require.True(t, exists) + } + } + return true + default: + return false + } + }, time.Second, 10*time.Millisecond) + cancel() + require.NoError(t, eg.Wait()) +} diff --git a/sql/ballots/ballots.go b/sql/ballots/ballots.go index d543b2ced29..d539a3ef051 100644 --- a/sql/ballots/ballots.go +++ b/sql/ballots/ballots.go @@ -70,6 +70,17 @@ func Has(db sql.Executor, id types.BallotID) (bool, error) { return rows > 0, nil } +func 
UpdateBlob(db sql.Executor, bid types.BallotID, blob []byte) error { + if _, err := db.Exec(`update ballots set ballot = ?2 where id = ?1;`, + func(stmt *sql.Statement) { + stmt.BindBytes(1, bid.Bytes()) + stmt.BindBytes(2, blob[:]) + }, nil); err != nil { + return fmt.Errorf("update blob %s: %w", bid.String(), err) + } + return nil +} + // Get ballot with id from database. func Get(db sql.Executor, id types.BallotID) (rst *types.Ballot, err error) { if rows, err := db.Exec(`select pubkey, ballot, length(identities.proof) diff --git a/sql/ballots/ballots_test.go b/sql/ballots/ballots_test.go index 7c2a0b62770..ef384b394d4 100644 --- a/sql/ballots/ballots_test.go +++ b/sql/ballots/ballots_test.go @@ -7,6 +7,7 @@ import ( "github.com/stretchr/testify/require" + "github.com/spacemeshos/go-spacemesh/codec" "github.com/spacemeshos/go-spacemesh/common/types" "github.com/spacemeshos/go-spacemesh/signing" "github.com/spacemeshos/go-spacemesh/sql" @@ -82,6 +83,26 @@ func TestAdd(t *testing.T) { require.True(t, stored.IsMalicious()) } +func TestUpdateBlob(t *testing.T) { + db := sql.InMemory() + nodeID := types.RandomNodeID() + ballot := types.NewExistingBallot(types.BallotID{1}, types.RandomEdSignature(), nodeID, types.LayerID(0)) + ballot.EpochData = &types.EpochData{ + ActiveSetHash: types.RandomHash(), + } + ballot.ActiveSet = types.RandomActiveSet(199) + require.NoError(t, Add(db, &ballot)) + got, err := Get(db, types.BallotID{1}) + require.NoError(t, err) + require.Equal(t, ballot, *got) + + ballot.ActiveSet = nil + require.NoError(t, UpdateBlob(db, types.BallotID{1}, codec.MustEncode(&ballot))) + got, err = Get(db, types.BallotID{1}) + require.NoError(t, err) + require.Empty(t, got.ActiveSet) +} + func TestHas(t *testing.T) { db := sql.InMemory() ballot := types.NewExistingBallot(types.BallotID{1}, types.EmptyEdSignature, types.EmptyNodeID, types.LayerID(0)) diff --git a/sql/ballots/util/extract.go b/sql/ballots/util/extract.go new file mode 100644 index 
00000000000..52e097a06cc --- /dev/null +++ b/sql/ballots/util/extract.go @@ -0,0 +1,58 @@ +package util + +import ( + "errors" + "fmt" + + "github.com/spacemeshos/go-spacemesh/common/types" + "github.com/spacemeshos/go-spacemesh/log" + "github.com/spacemeshos/go-spacemesh/sql" + "github.com/spacemeshos/go-spacemesh/sql/activesets" + "github.com/spacemeshos/go-spacemesh/sql/ballots" +) + +func ExtractActiveSet(db sql.Executor) error { + latest, err := ballots.LatestLayer(db) + if err != nil { + return fmt.Errorf("extract get latest: %w", err) + } + extracted := 0 + unique := 0 + log.With().Info("extracting ballots active sets", + log.Uint32("from", types.EpochID(2).FirstLayer().Uint32()), + log.Uint32("to", latest.Uint32()), + ) + for lid := types.EpochID(2).FirstLayer(); lid <= latest; lid++ { + blts, err := ballots.Layer(db, lid) + if err != nil { + return fmt.Errorf("extract layer %d: %w", lid, err) + } + for _, b := range blts { + if b.EpochData == nil { + continue + } + if len(b.ActiveSet) == 0 { + continue + } + if err := activesets.Add(db, b.EpochData.ActiveSetHash, &types.EpochActiveSet{ + Epoch: b.Layer.GetEpoch(), + Set: b.ActiveSet, + }); err != nil && !errors.Is(err, sql.ErrObjectExists) { + return fmt.Errorf("add active set %s (%s): %w", b.ID().String(), b.EpochData.ActiveSetHash.ShortString(), err) + } else if err == nil { + unique++ + } + // TODO: prune ballot active set after migration 4 is released + //b.ActiveSet = nil + //if err := ballots.UpdateBlob(db, b.ID(), codec.MustEncode(b)); err != nil { + // return fmt.Errorf("update ballot %s: %w", b.ID().String(), err) + //} + extracted++ + } + } + log.With().Info("extracted active sets from ballots", + log.Int("num", extracted), + log.Int("unique", unique), + ) + return nil +} diff --git a/sql/ballots/util/extract_test.go b/sql/ballots/util/extract_test.go new file mode 100644 index 00000000000..c9df4416a39 --- /dev/null +++ b/sql/ballots/util/extract_test.go @@ -0,0 +1,57 @@ +package util + +import ( 
+ "os" + "testing" + + "github.com/stretchr/testify/require" + + "github.com/spacemeshos/go-spacemesh/common/types" + "github.com/spacemeshos/go-spacemesh/sql" + "github.com/spacemeshos/go-spacemesh/sql/activesets" + "github.com/spacemeshos/go-spacemesh/sql/ballots" +) + +func TestMain(m *testing.M) { + types.SetLayersPerEpoch(4) + res := m.Run() + os.Exit(res) +} + +func TestExtractActiveSet(t *testing.T) { + db := sql.InMemory() + current := types.LayerID(20) + blts := make([]*types.Ballot, 0, current) + hashes := []types.Hash32{types.RandomHash(), types.RandomHash()} + actives := [][]types.ATXID{types.RandomActiveSet(11), types.RandomActiveSet(19)} + for lid := types.EpochID(2).FirstLayer(); lid < current; lid++ { + blt := types.NewExistingBallot(types.RandomBallotID(), types.RandomEdSignature(), types.NodeID{1}, lid) + if lid%3 == 0 { + blt.EpochData = &types.EpochData{ + ActiveSetHash: hashes[0], + } + blt.ActiveSet = actives[0] + } + if lid%3 == 1 { + blt.EpochData = &types.EpochData{ + ActiveSetHash: hashes[1], + } + blt.ActiveSet = actives[1] + } + require.NoError(t, ballots.Add(db, &blt)) + blts = append(blts, &blt) + } + require.NoError(t, ExtractActiveSet(db)) + for _, b := range blts { + got, err := ballots.Get(db, b.ID()) + require.NoError(t, err) + if b.Layer%3 != 2 { + require.NotEmpty(t, got.ActiveSet) + } + } + for i, h := range hashes { + got, err := activesets.Get(db, h) + require.NoError(t, err) + require.Equal(t, actives[i], got.Set) + } +} diff --git a/sql/certificates/certs.go b/sql/certificates/certs.go index 9ec75165d09..b5010091a0d 100644 --- a/sql/certificates/certs.go +++ b/sql/certificates/certs.go @@ -164,3 +164,13 @@ func SetInvalid(db sql.Executor, lid types.LayerID, bid types.BlockID) error { } return nil } + +func DeleteCertBefore(db sql.Executor, lid types.LayerID) error { + if _, err := db.Exec(`update certificates set cert = null where layer < ?1;`, + func(stmt *sql.Statement) { + stmt.BindInt64(1, int64(lid)) + }, nil); err != 
nil { + return fmt.Errorf("delete %s: %w", lid, err) + } + return nil +} diff --git a/sql/certificates/certs_test.go b/sql/certificates/certs_test.go index b345de33bf3..cfe4ff9104e 100644 --- a/sql/certificates/certs_test.go +++ b/sql/certificates/certs_test.go @@ -160,6 +160,22 @@ func TestCertifiedBlock(t *testing.T) { require.Equal(t, types.EmptyBlockID, got) } +func TestDeleteCert(t *testing.T) { + db := sql.InMemory() + require.NoError(t, Add(db, types.LayerID(2), &types.Certificate{BlockID: types.BlockID{2}})) + require.NoError(t, Add(db, types.LayerID(3), &types.Certificate{BlockID: types.BlockID{3}})) + require.NoError(t, Add(db, types.LayerID(4), &types.Certificate{BlockID: types.BlockID{4}})) + require.NoError(t, DeleteCertBefore(db, 4)) + + _, err := CertifiedBlock(db, types.LayerID(2)) + require.ErrorIs(t, err, sql.ErrNotFound) + _, err = CertifiedBlock(db, types.LayerID(3)) + require.ErrorIs(t, err, sql.ErrNotFound) + got, err := CertifiedBlock(db, types.LayerID(4)) + require.NoError(t, err) + require.Equal(t, types.BlockID{4}, got) +} + func TestFirstInEpoch(t *testing.T) { db := sql.InMemory() diff --git a/sql/database.go b/sql/database.go index df95109e6f5..9b9c3aafed5 100644 --- a/sql/database.go +++ b/sql/database.go @@ -59,6 +59,9 @@ type conf struct { connections int migrations Migrations enableLatency bool + + // TODO: remove after state is pruned for majority + v4Migration func(Executor) error } // WithConnections overwrites number of pooled connections. @@ -75,6 +78,12 @@ func WithMigrations(migrations Migrations) Opt { } } +func WithV4Migration(cb func(Executor) error) Opt { + return func(c *conf) { + c.v4Migration = cb + } +} + // WithLatencyMetering enables metric that track latency for every database query. // Note that it will be a significant amount of data, and should not be enabled on // multiple nodes by default. 
@@ -116,6 +125,10 @@ func Open(uri string, opts ...Opt) (*Database, error) { db.latency = newQueryLatency() } if config.migrations != nil { + before, err := version(db) + if err != nil { + return nil, err + } tx, err := db.Tx(context.Background()) if err != nil { return nil, err @@ -128,13 +141,26 @@ func Open(uri string, opts ...Opt) (*Database, error) { if err != nil { return nil, err } + after, err := version(db) + if err != nil { + return nil, err + } + if before <= 3 && after == 4 && config.v4Migration != nil { + // v4 migration (active set extraction) needs the 3rd migration to execute first + if err := config.v4Migration(db); err != nil { + return nil, err + } + if err := Vacuum(db); err != nil { + return nil, err + } + } } for i := 0; i < config.connections; i++ { conn := pool.Get(context.Background()) if err := registerFunctions(conn); err != nil { return nil, err } - defer pool.Put(conn) + pool.Put(conn) } return db, nil } diff --git a/sql/migrations.go b/sql/migrations.go index c6bf08f9fe8..aeec91e0bda 100644 --- a/sql/migrations.go +++ b/sql/migrations.go @@ -23,6 +23,17 @@ type migration struct { // Migrations is interface for migrations provider. 
type Migrations func(Executor) error +func version(db Executor) (int, error) { + var current int + if _, err := db.Exec("PRAGMA user_version;", nil, func(stmt *Statement) bool { + current = stmt.ColumnInt(0) + return true + }); err != nil { + return 0, fmt.Errorf("read user_version %w", err) + } + return current, nil +} + func embeddedMigrations(db Executor) error { var migrations []migration fs.WalkDir(embedded, "migrations", func(path string, d fs.DirEntry, err error) error { @@ -62,13 +73,9 @@ func embeddedMigrations(db Executor) error { return migrations[i].order < migrations[j].order }) - var current int - - if _, err := db.Exec("PRAGMA user_version;", nil, func(stmt *Statement) bool { - current = stmt.ColumnInt(0) - return true - }); err != nil { - return fmt.Errorf("read user_version %w", err) + current, err := version(db) + if err != nil { + return err } for _, m := range migrations { diff --git a/sql/migrations/0003_next.sql b/sql/migrations/0003_v1.1.5.sql similarity index 100% rename from sql/migrations/0003_next.sql rename to sql/migrations/0003_v1.1.5.sql diff --git a/sql/migrations/0004_next.sql b/sql/migrations/0004_next.sql new file mode 100644 index 00000000000..7b8a0aded0d --- /dev/null +++ b/sql/migrations/0004_next.sql @@ -0,0 +1,3 @@ +DELETE FROM proposals; +DELETE FROM proposal_transactions; +UPDATE certificates SET cert = NULL WHERE layer < 19000; \ No newline at end of file diff --git a/sql/migrations_test.go b/sql/migrations_test.go index 8a0c637c467..2cb0de2b0f6 100644 --- a/sql/migrations_test.go +++ b/sql/migrations_test.go @@ -15,5 +15,5 @@ func TestMigrationsAppliedOnce(t *testing.T) { return true }) require.NoError(t, err) - require.Equal(t, version, 3) + require.Equal(t, version, 4) } diff --git a/sql/transactions/transactions.go b/sql/transactions/transactions.go index 4c7e6541078..536b0bf99ff 100644 --- a/sql/transactions/transactions.go +++ b/sql/transactions/transactions.go @@ -59,6 +59,17 @@ func AddToProposal(db sql.Executor, 
tid types.TransactionID, lid types.LayerID, return nil } +func DeleteProposalTxsBefore(db sql.Executor, lid types.LayerID) error { + if _, err := db.Exec(` + delete from proposal_transactions where layer < ?1;`, + func(stmt *sql.Statement) { + stmt.BindInt64(1, int64(lid)) + }, nil); err != nil { + return fmt.Errorf("DeleteProposalTxs %d: %w", lid, err) + } + return nil +} + // HasProposalTX returns true if the given transaction is included in the given proposal. func HasProposalTX(db sql.Executor, pid types.ProposalID, tid types.TransactionID) (bool, error) { rows, err := db.Exec("select 1 from proposal_transactions where pid = ?1 and tid = ?2", diff --git a/sql/transactions/transactions_test.go b/sql/transactions/transactions_test.go index 28a8070ea50..a698b64f4d4 100644 --- a/sql/transactions/transactions_test.go +++ b/sql/transactions/transactions_test.go @@ -154,6 +154,37 @@ func TestAddToProposal(t *testing.T) { require.False(t, has) } +func TestDeleteProposalTxs(t *testing.T) { + db := sql.InMemory() + proposals := map[types.LayerID][]types.ProposalID{ + types.LayerID(10): {{1, 1}, {1, 2}}, + types.LayerID(11): {{2, 1}, {2, 2}}, + } + tids := []types.TransactionID{{1, 2}, {2, 3}} + for lid, pids := range proposals { + for _, tid := range tids { + for _, pid := range pids { + require.NoError(t, transactions.AddToProposal(db, tid, lid, pid)) + } + } + } + require.NoError(t, transactions.DeleteProposalTxsBefore(db, types.LayerID(11))) + for _, pid := range proposals[types.LayerID(10)] { + for _, tid := range tids { + has, err := transactions.HasProposalTX(db, pid, tid) + require.NoError(t, err) + require.False(t, has) + } + } + for _, pid := range proposals[types.LayerID(11)] { + for _, tid := range tids { + has, err := transactions.HasProposalTX(db, pid, tid) + require.NoError(t, err) + require.True(t, has) + } + } +} + func TestAddToBlock(t *testing.T) { db := sql.InMemory() diff --git a/sql/vacuum.go b/sql/vacuum.go new file mode 100644 index 
00000000000..0f874fe14f8 --- /dev/null +++ b/sql/vacuum.go @@ -0,0 +1,20 @@ +package sql + +import ( + "fmt" + + "github.com/spacemeshos/go-spacemesh/log" +) + +func Vacuum(db Executor) error { + log.Info("vacuuming db...") + if _, err := db.Exec("vacuum", nil, nil); err != nil { + return fmt.Errorf("vacuum %w", err) + } + log.Info("checkpointing db...") + if _, err := db.Exec("pragma wal_checkpoint(TRUNCATE)", nil, nil); err != nil { + return fmt.Errorf("wal checkpoint %w", err) + } + log.Info("db vacuum completed") + return nil +} diff --git a/sql/vacuum_test.go b/sql/vacuum_test.go new file mode 100644 index 00000000000..b994516279e --- /dev/null +++ b/sql/vacuum_test.go @@ -0,0 +1,12 @@ +package sql + +import ( + "testing" + + "github.com/stretchr/testify/require" +) + +func TestVacuumDB(t *testing.T) { + db := InMemory() + require.NoError(t, Vacuum(db)) +}