From 8d879e0f4f9cf489504c4b777b9d8f4375700880 Mon Sep 17 00:00:00 2001
From: kimmy lin <30611210+countvonzero@users.noreply.github.com>
Date: Thu, 14 Sep 2023 19:19:17 -0700
Subject: [PATCH] extract active set and save

---
Review notes (free-form text in the patch notes area; not part of the commit):

What the patch does, file by file:

* mesh/janitor.go adds ExtractActiveSet(db). It walks every layer from the
  first layer of epoch 2 up to and including ballots.LatestLayer(db). Ballots
  with nil EpochData or an empty ActiveSet are skipped. For every other ballot
  the active set is stored with activesets.Add under EpochData.ActiveSetHash,
  the ballot's ActiveSet field is cleared, and the re-encoded ballot is
  written back with ballots.UpdateBlob. sql.ErrObjectExists from
  activesets.Add is tolerated, so "unique" counts only the sets stored by this
  run while "extracted" counts every rewritten ballot.
* sql/ballots/ballots.go adds UpdateBlob(db, bid, blob), which overwrites the
  ballot column of the row whose id equals bid.
* sql/database.go adds the WithV4PreMigration option. Open calls the callback
  only when migrations are configured and version(db) returns exactly 3, and
  it does so before the migration transaction is opened; the callback is
  handed db, not tx. The registerFunctions loop over pooled connections moves
  from before the migrations to after them.
* sql/functions.go drops the prune_actives SQL function together with its
  codec and types imports.
* sql/migrations/0004_next.sql no longer calls prune_actives, and it now
  deletes every row of proposals and proposal_transactions instead of only
  rows with layer < 19000.
* node/node.go passes mesh.ExtractActiveSet to sql.Open through the new
  option.
* mesh/janitor_test.go and sql/ballots/ballots_test.go cover the two new
  functions.

Points to confirm before merging:

* NOTE(review): the callback receives db rather than the migration tx, so its
  writes are presumably not rolled back if a later migration step fails -
  confirm. The empty-ActiveSet skip and the ErrObjectExists tolerance suggest
  that a second run is expected to be harmless.
* NOTE(review): a database whose version is below 3 never runs the callback,
  and 0004 no longer strips active sets from ballots - confirm that leaving
  those ballots untouched is intended.
* NOTE(review): registerFunctions now runs after the migrations, so
  add_uint64 is presumably unavailable to migration SQL - confirm that no
  migration uses it.
* ExtractActiveSet, UpdateBlob and WithV4PreMigration are exported without doc
  comments, while neighbouring declarations (Get, WithConnections,
  WithLatencyMetering) have them.
* sql/migrations/0004_next.sql ends without a trailing newline.

 mesh/janitor.go              | 51 ++++++++++++++++++++++++++++++++++++
 mesh/janitor_test.go         | 37 ++++++++++++++++++++++++++
 node/node.go                 |  1 +
 sql/ballots/ballots.go       | 11 ++++++++
 sql/ballots/ballots_test.go  | 21 +++++++++++++++
 sql/database.go              | 28 +++++++++++++++-----
 sql/functions.go             | 19 --------------
 sql/migrations/0004_next.sql |  8 +++---
 8 files changed, 145 insertions(+), 31 deletions(-)

diff --git a/mesh/janitor.go b/mesh/janitor.go
index fa14a60ab86..222cfc5fe4f 100644
--- a/mesh/janitor.go
+++ b/mesh/janitor.go
@@ -2,13 +2,19 @@ package mesh
 
 import (
 	"context"
+	"errors"
+	"fmt"
 	"time"
 
 	"go.uber.org/zap"
 
+	"github.com/spacemeshos/go-spacemesh/codec"
 	"github.com/spacemeshos/go-spacemesh/common/types"
+	"github.com/spacemeshos/go-spacemesh/log"
 	"github.com/spacemeshos/go-spacemesh/mesh/metrics"
 	"github.com/spacemeshos/go-spacemesh/sql"
+	"github.com/spacemeshos/go-spacemesh/sql/activesets"
+	"github.com/spacemeshos/go-spacemesh/sql/ballots"
 	"github.com/spacemeshos/go-spacemesh/sql/certificates"
 	"github.com/spacemeshos/go-spacemesh/sql/proposals"
 	"github.com/spacemeshos/go-spacemesh/sql/transactions"
@@ -59,3 +65,48 @@ func Prune(
 		}
 	}
 }
+
+func ExtractActiveSet(db sql.Executor) error {
+	latest, err := ballots.LatestLayer(db)
+	if err != nil {
+		return fmt.Errorf("extract get latest: %w", err)
+	}
+	extracted := 0
+	unique := 0
+	log.With().Info("extracting ballots active sets",
+		log.Uint32("from", types.EpochID(2).FirstLayer().Uint32()),
+		log.Uint32("to", latest.Uint32()),
+	)
+	for lid := types.EpochID(2).FirstLayer(); lid <= latest; lid++ {
+		blts, err := ballots.Layer(db, lid)
+		if err != nil {
+			return fmt.Errorf("extract layer %d: %w", lid, err)
+		}
+		for _, b := range blts {
+			if b.EpochData == nil {
+				continue
+			}
+			if len(b.ActiveSet) == 0 {
+				continue
+			}
+			if err := activesets.Add(db, b.EpochData.ActiveSetHash, &types.EpochActiveSet{
+				Epoch: b.Layer.GetEpoch(),
+				Set:   b.ActiveSet,
+			}); err != nil && !errors.Is(err, sql.ErrObjectExists) {
+				return fmt.Errorf("add active set %s (%s): %w", b.ID().String(), b.EpochData.ActiveSetHash.ShortString(), err)
+			} else if err == nil {
+				unique++
+			}
+			b.ActiveSet = nil
+			if err := ballots.UpdateBlob(db, b.ID(), codec.MustEncode(b)); err != nil {
+				return fmt.Errorf("update ballot %s: %w", b.ID().String(), err)
+			}
+			extracted++
+		}
+	}
+	log.With().Info("extracted active sets from ballots",
+		log.Int("num", extracted),
+		log.Int("unique", unique),
+	)
+	return nil
+}
diff --git a/mesh/janitor_test.go b/mesh/janitor_test.go
index 2d104729995..d867bfb866d 100644
--- a/mesh/janitor_test.go
+++ b/mesh/janitor_test.go
@@ -13,6 +13,7 @@ import (
 	"github.com/spacemeshos/go-spacemesh/log/logtest"
 	"github.com/spacemeshos/go-spacemesh/mesh/mocks"
 	"github.com/spacemeshos/go-spacemesh/sql"
+	"github.com/spacemeshos/go-spacemesh/sql/activesets"
 	"github.com/spacemeshos/go-spacemesh/sql/ballots"
 	"github.com/spacemeshos/go-spacemesh/sql/certificates"
 	"github.com/spacemeshos/go-spacemesh/sql/proposals"
@@ -94,3 +95,39 @@ func TestPrune(t *testing.T) {
 	cancel()
 	eg.Wait()
 }
+
+func TestExtractActiveSet(t *testing.T) {
+	db := sql.InMemory()
+	current := types.LayerID(20)
+	blts := make([]*types.Ballot, 0, current)
+	hashes := []types.Hash32{types.RandomHash(), types.RandomHash()}
+	actives := [][]types.ATXID{types.RandomActiveSet(11), types.RandomActiveSet(19)}
+	for lid := types.EpochID(2).FirstLayer(); lid < current; lid++ {
+		blt := types.NewExistingBallot(types.RandomBallotID(), types.RandomEdSignature(), types.NodeID{1}, lid)
+		if lid%3 == 0 {
+			blt.EpochData = &types.EpochData{
+				ActiveSetHash: hashes[0],
+			}
+			blt.ActiveSet = actives[0]
+		}
+		if lid%3 == 1 {
+			blt.EpochData = &types.EpochData{
+				ActiveSetHash: hashes[1],
+			}
+			blt.ActiveSet = actives[1]
+		}
+		require.NoError(t, ballots.Add(db, &blt))
+		blts = append(blts, &blt)
+	}
+	require.NoError(t, ExtractActiveSet(db))
+	for _, b := range blts {
+		got, err := ballots.Get(db, b.ID())
+		require.NoError(t, err)
+		require.Empty(t, got.ActiveSet)
+	}
+	for i, h := range hashes {
+		got, err := activesets.Get(db, h)
+		require.NoError(t, err)
+		require.Equal(t, actives[i], got.Set)
+	}
+}
diff --git a/node/node.go b/node/node.go
index 5114b0d90e4..3779117a309 100644
--- a/node/node.go
+++ b/node/node.go
@@ -1336,6 +1336,7 @@ func (app *App) setupDBs(ctx context.Context, lg log.Log) error {
 	sqlDB, err := sql.Open("file:"+filepath.Join(dbPath, dbFile),
 		sql.WithConnections(app.Config.DatabaseConnections),
 		sql.WithLatencyMetering(app.Config.DatabaseLatencyMetering),
+		sql.WithV4PreMigration(mesh.ExtractActiveSet),
 	)
 	if err != nil {
 		return fmt.Errorf("open sqlite db %w", err)
diff --git a/sql/ballots/ballots.go b/sql/ballots/ballots.go
index 827a29df950..c2f64f314fa 100644
--- a/sql/ballots/ballots.go
+++ b/sql/ballots/ballots.go
@@ -70,6 +70,17 @@ func Has(db sql.Executor, id types.BallotID) (bool, error) {
 	return rows > 0, nil
 }
 
+func UpdateBlob(db sql.Executor, bid types.BallotID, blob []byte) error {
+	if _, err := db.Exec(`update ballots set ballot = ?2 where id = ?1;`,
+		func(stmt *sql.Statement) {
+			stmt.BindBytes(1, bid.Bytes())
+			stmt.BindBytes(2, blob[:])
+		}, nil); err != nil {
+		return fmt.Errorf("update blob %s: %w", bid.String(), err)
+	}
+	return nil
+}
+
 // Get ballot with id from database.
 func Get(db sql.Executor, id types.BallotID) (rst *types.Ballot, err error) {
 	if rows, err := db.Exec(`select pubkey, ballot, length(identities.proof)
diff --git a/sql/ballots/ballots_test.go b/sql/ballots/ballots_test.go
index 88dd3b8b77d..b644f363dee 100644
--- a/sql/ballots/ballots_test.go
+++ b/sql/ballots/ballots_test.go
@@ -7,6 +7,7 @@ import (
 
 	"github.com/stretchr/testify/require"
 
+	"github.com/spacemeshos/go-spacemesh/codec"
 	"github.com/spacemeshos/go-spacemesh/common/types"
 	"github.com/spacemeshos/go-spacemesh/signing"
 	"github.com/spacemeshos/go-spacemesh/sql"
@@ -75,6 +76,26 @@ func TestAdd(t *testing.T) {
 	require.True(t, stored.IsMalicious())
 }
 
+func TestUpdateBlob(t *testing.T) {
+	db := sql.InMemory()
+	nodeID := types.RandomNodeID()
+	ballot := types.NewExistingBallot(types.BallotID{1}, types.RandomEdSignature(), nodeID, types.LayerID(0))
+	ballot.EpochData = &types.EpochData{
+		ActiveSetHash: types.RandomHash(),
+	}
+	ballot.ActiveSet = types.RandomActiveSet(199)
+	require.NoError(t, Add(db, &ballot))
+	got, err := Get(db, types.BallotID{1})
+	require.NoError(t, err)
+	require.Equal(t, ballot, *got)
+
+	ballot.ActiveSet = nil
+	require.NoError(t, UpdateBlob(db, types.BallotID{1}, codec.MustEncode(&ballot)))
+	got, err = Get(db, types.BallotID{1})
+	require.NoError(t, err)
+	require.Empty(t, got.ActiveSet)
+}
+
 func TestHas(t *testing.T) {
 	db := sql.InMemory()
 	ballot := types.NewExistingBallot(types.BallotID{1}, types.EmptyEdSignature, types.EmptyNodeID, types.LayerID(0))
diff --git a/sql/database.go b/sql/database.go
index 732f9fd10f0..ed04b6cd704 100644
--- a/sql/database.go
+++ b/sql/database.go
@@ -59,6 +59,9 @@ type conf struct {
 	connections   int
 	migrations    Migrations
 	enableLatency bool
+
+	// TODO: remove after state is pruned for majority
+	v4preMigration func(Executor) error
 }
 
 // WithConnections overwrites number of pooled connections.
@@ -75,6 +78,12 @@ func WithMigrations(migrations Migrations) Opt {
 	}
 }
 
+func WithV4PreMigration(cb func(Executor) error) Opt {
+	return func(c *conf) {
+		c.v4preMigration = cb
+	}
+}
+
 // WithLatencyMetering enables metric that track latency for every database query.
 // Note that it will be a significant amount of data, and should not be enabled on
 // multiple nodes by default.
@@ -115,18 +124,16 @@ func Open(uri string, opts ...Opt) (*Database, error) {
 	if config.enableLatency {
 		db.latency = newQueryLatency()
 	}
-	for i := 0; i < config.connections; i++ {
-		conn := pool.Get(context.Background())
-		if err := registerFunctions(conn); err != nil {
-			return nil, err
-		}
-		pool.Put(conn)
-	}
 	if config.migrations != nil {
 		before, err := version(db)
 		if err != nil {
 			return nil, err
 		}
+		if before == 3 && config.v4preMigration != nil {
+			if err := config.v4preMigration(db); err != nil {
+				return nil, err
+			}
+		}
 		tx, err := db.Tx(context.Background())
 		if err != nil {
 			return nil, err
@@ -145,6 +152,13 @@ func Open(uri string, opts ...Opt) (*Database, error) {
 			}
 		}
 	}
+	for i := 0; i < config.connections; i++ {
+		conn := pool.Get(context.Background())
+		if err := registerFunctions(conn); err != nil {
+			return nil, err
+		}
+		pool.Put(conn)
+	}
 	return db, nil
 }
 
diff --git a/sql/functions.go b/sql/functions.go
index fb2ce92b241..07f2c21c851 100644
--- a/sql/functions.go
+++ b/sql/functions.go
@@ -4,9 +4,6 @@ import (
 	"fmt"
 
 	sqlite "github.com/go-llsqlite/crawshaw"
-
-	"github.com/spacemeshos/go-spacemesh/codec"
-	"github.com/spacemeshos/go-spacemesh/common/types"
 )
 
 func registerFunctions(conn *sqlite.Conn) error {
@@ -19,21 +16,5 @@ func registerFunctions(conn *sqlite.Conn) error {
 	}, nil, nil); err != nil {
 		return fmt.Errorf("registering add_uint64: %w", err)
 	}
-	// function to prune active set from old ballots
-	if err := conn.CreateFunction("prune_actives", true, 1, func(ctx sqlite.Context, values ...sqlite.Value) {
-		var ballot types.Ballot
-		if err := codec.Decode(values[0].Blob(), &ballot); err != nil {
-			ctx.ResultError(err)
-		} else {
-			ballot.ActiveSet = nil
-			if blob, err := codec.Encode(&ballot); err != nil {
-				ctx.ResultError(err)
-			} else {
-				ctx.ResultBlob(blob)
-			}
-		}
-	}, nil, nil); err != nil {
-		return fmt.Errorf("registering prune_actives: %w", err)
-	}
 	return nil
 }
diff --git a/sql/migrations/0004_next.sql b/sql/migrations/0004_next.sql
index f74ef85c5f7..7b8a0aded0d 100644
--- a/sql/migrations/0004_next.sql
+++ b/sql/migrations/0004_next.sql
@@ -1,5 +1,3 @@
-DELETE FROM proposals WHERE layer < 19000;
-DELETE FROM proposal_transactions WHERE layer < 19000;
-UPDATE certificates SET cert = NULL WHERE layer < 19000;
-UPDATE ballots SET ballot = prune_actives(ballot);
-
+DELETE FROM proposals;
+DELETE FROM proposal_transactions;
+UPDATE certificates SET cert = NULL WHERE layer < 19000;
\ No newline at end of file