From 80c17965d0cafa43b86f25d20007ac5d12dc3b8b Mon Sep 17 00:00:00 2001
From: Matthias <5011972+fasmat@users.noreply.github.com>
Date: Thu, 22 Feb 2024 12:48:17 +0000
Subject: [PATCH] Update Recovery for multi-smesher

---
 checkpoint/recovery.go      | 72 ++++++++++++++++++++++++-------------
 checkpoint/recovery_test.go | 19 +++++-----
 node/node.go                | 16 ++++----
 3 files changed, 66 insertions(+), 41 deletions(-)

diff --git a/checkpoint/recovery.go b/checkpoint/recovery.go
index 3d4ee5e13d..ca80a46d96 100644
--- a/checkpoint/recovery.go
+++ b/checkpoint/recovery.go
@@ -45,7 +45,7 @@ type RecoverConfig struct {
 	DbFile         string
 	LocalDbFile    string
 	PreserveOwnAtx bool
-	NodeID         types.NodeID
+	NodeIDs        []types.NodeID
 	Uri            string
 	Restore        types.LayerID
 }
@@ -176,15 +176,41 @@ func recoverFromLocalFile(
 		log.Int("num accounts", len(data.accounts)),
 		log.Int("num atxs", len(data.atxs)),
 	)
-	deps, proofs, err := collectOwnAtxDeps(logger, db, localDB, cfg, data)
-	if err != nil {
-		logger.With().Error("failed to collect deps for own atx", log.Err(err))
-		// continue to recover from checkpoint despite failure to preserve own atx
-	} else if len(deps) > 0 {
-		logger.With().Info("collected own atx deps",
-			log.Context(ctx),
-			log.Int("own atx deps", len(deps)),
-		)
+	allDeps := make([]*types.VerifiedActivationTx, 0)
+	allProofs := make([]*types.PoetProofMessage, 0)
+	if cfg.PreserveOwnAtx {
+		for _, nodeID := range cfg.NodeIDs {
+			deps, proofs, err := collectOwnAtxDeps(logger, db, localDB, nodeID, cfg.GoldenAtx, data)
+			if err != nil {
+				logger.With().Error("failed to collect deps for own atx",
+					nodeID,
+					log.Err(err),
+				)
+				// continue to recover from checkpoint despite failure to preserve own atx
+				continue
+			}
+
+			logger.With().Info("collected own atx deps",
+				log.Context(ctx),
+				nodeID,
+				log.Int("own atx deps", len(deps)),
+			)
+			allDeps = append(allDeps, deps...)
+			allProofs = append(allProofs, proofs...)
+
+			// // deduplicate allDeps and allProofs by sorting and compacting
+			// // TODO: for some reason this causes existing tests to fail - need to investigate
+			// slices.SortFunc(allDeps, func(i, j *types.VerifiedActivationTx) int {
+			// 	return bytes.Compare(i.ID().Bytes(), j.ID().Bytes())
+			// })
+			// allDeps = slices.Compact(allDeps)
+			// slices.SortFunc(allProofs, func(i, j *types.PoetProofMessage) int {
+			// 	iRef, _ := i.Ref()
+			// 	jRef, _ := j.Ref()
+			// 	return bytes.Compare(iRef[:], jRef[:])
+			// })
+			// allProofs = slices.Compact(allProofs)
+		}
 	}
 	if err := db.Close(); err != nil {
 		return nil, fmt.Errorf("close old db: %w", err)
@@ -248,8 +274,8 @@ func recoverFromLocalFile(
 		types.GetEffectiveGenesis(),
 	)
 	var preserve *PreservedData
-	if len(deps) > 0 {
-		preserve = &PreservedData{Deps: deps, Proofs: proofs}
+	if len(allDeps) > 0 {
+		preserve = &PreservedData{Deps: allDeps, Proofs: allProofs}
 	}
 	return preserve, nil
 }
@@ -311,13 +337,11 @@ func collectOwnAtxDeps(
 	logger log.Log,
 	db *sql.Database,
 	localDB *localsql.Database,
-	cfg *RecoverConfig,
+	nodeID types.NodeID,
+	goldenATX types.ATXID,
 	data *recoverydata,
 ) ([]*types.VerifiedActivationTx, []*types.PoetProofMessage, error) {
-	if !cfg.PreserveOwnAtx {
-		return nil, nil, nil
-	}
-	atxid, err := atxs.GetLastIDByNodeID(db, cfg.NodeID)
+	atxid, err := atxs.GetLastIDByNodeID(db, nodeID)
 	if err != nil && !errors.Is(err, sql.ErrNotFound) {
 		return nil, nil, fmt.Errorf("query own last atx id: %w", err)
 	}
@@ -329,8 +353,8 @@ func collectOwnAtxDeps(
 		own = true
 	}
 
-	// check for if miner is building any atx
-	nipostCh, _ := nipost.Challenge(localDB, cfg.NodeID)
+	// check for if smesher is building any atx
+	nipostCh, _ := nipost.Challenge(localDB, nodeID)
 	if ref == types.EmptyATXID {
 		if nipostCh == nil {
 			return nil, nil, nil
@@ -340,7 +364,7 @@ func collectOwnAtxDeps(
 		}
 	}
 
-	all := map[types.ATXID]struct{}{cfg.GoldenAtx: {}, types.EmptyATXID: {}}
+	all := map[types.ATXID]struct{}{goldenATX: {}, types.EmptyATXID: {}}
 	for _, catx := range data.atxs {
 		all[catx.ID] = struct{}{}
 	}
@@ -353,18 +377,16 @@ func collectOwnAtxDeps(
 			ref,
 			log.Bool("own", own),
 		)
-		deps, proofs, err = collectDeps(db, cfg.GoldenAtx, ref, all)
+		deps, proofs, err = collectDeps(db, goldenATX, ref, all)
 		if err != nil {
 			return nil, nil, err
 		}
 	}
 	if nipostCh != nil {
-		logger.With().Info("collecting pending atx and deps",
-			log.Object("nipost", nipostCh),
-		)
+		logger.With().Info("collecting pending atx and deps", log.Object("nipost", nipostCh))
 		// any previous atx in nipost should already be captured earlier
 		// we only care about positioning atx here
-		deps2, proofs2, err := collectDeps(db, cfg.GoldenAtx, nipostCh.PositioningATX, all)
+		deps2, proofs2, err := collectDeps(db, goldenATX, nipostCh.PositioningATX, all)
 		if err != nil {
 			return nil, nil, fmt.Errorf("deps from nipost positioning atx (%v): %w", nipostCh.PositioningATX, err)
 		}
diff --git a/checkpoint/recovery_test.go b/checkpoint/recovery_test.go
index f2b73dd201..500357afe9 100644
--- a/checkpoint/recovery_test.go
+++ b/checkpoint/recovery_test.go
@@ -157,7 +157,7 @@ func TestRecover(t *testing.T) {
 				DbFile:         "test.sql",
 				LocalDbFile:    "local.sql",
 				PreserveOwnAtx: true,
-				NodeID:         types.NodeID{2, 3, 4},
+				NodeIDs:        []types.NodeID{types.RandomNodeID()},
 				Uri:            tc.uri,
 				Restore:        types.LayerID(recoverLayer),
 			}
@@ -204,7 +204,7 @@ func TestRecover_SameRecoveryInfo(t *testing.T) {
 			DataDir:        t.TempDir(),
 			DbFile:         "test.sql",
 			PreserveOwnAtx: true,
-			NodeID:         types.NodeID{2, 3, 4},
+			NodeIDs:        []types.NodeID{types.RandomNodeID()},
 			Uri:            fmt.Sprintf("%s/snapshot-15", ts.URL),
fmt.Sprintf("%s/snapshot-15", ts.URL), Restore: types.LayerID(recoverLayer), } @@ -254,6 +254,9 @@ func validateAndPreserveData( lg, ) mfetch.EXPECT().GetAtxs(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes() + for _, vatx := range deps { + fmt.Println("deps", vatx.ID()) + } for i, vatx := range deps { encoded, err := codec.Encode(vatx) require.NoError(tb, err) @@ -435,7 +438,7 @@ func TestRecover_OwnAtxNotInCheckpoint_Preserve(t *testing.T) { DbFile: "test.sql", LocalDbFile: "local.sql", PreserveOwnAtx: true, - NodeID: sig.NodeID(), + NodeIDs: []types.NodeID{sig.NodeID()}, Uri: fmt.Sprintf("%s/snapshot-15", ts.URL), Restore: types.LayerID(recoverLayer), } @@ -507,7 +510,7 @@ func TestRecover_OwnAtxNotInCheckpoint_Preserve_IncludePending(t *testing.T) { DbFile: "test.sql", LocalDbFile: "local.sql", PreserveOwnAtx: true, - NodeID: sig.NodeID(), + NodeIDs: []types.NodeID{sig.NodeID()}, Uri: fmt.Sprintf("%s/snapshot-15", ts.URL), Restore: types.LayerID(recoverLayer), } @@ -596,7 +599,7 @@ func TestRecover_OwnAtxNotInCheckpoint_Preserve_Still_Initializing(t *testing.T) DbFile: "test.sql", LocalDbFile: "local.sql", PreserveOwnAtx: true, - NodeID: sig.NodeID(), + NodeIDs: []types.NodeID{sig.NodeID()}, Uri: fmt.Sprintf("%s/snapshot-15", ts.URL), Restore: types.LayerID(recoverLayer), } @@ -682,7 +685,7 @@ func TestRecover_OwnAtxNotInCheckpoint_Preserve_DepIsGolden(t *testing.T) { DbFile: "test.sql", LocalDbFile: "local.sql", PreserveOwnAtx: true, - NodeID: sig.NodeID(), + NodeIDs: []types.NodeID{sig.NodeID()}, Uri: fmt.Sprintf("%s/snapshot-15", ts.URL), Restore: types.LayerID(recoverLayer), } @@ -764,7 +767,7 @@ func TestRecover_OwnAtxNotInCheckpoint_DontPreserve(t *testing.T) { DbFile: "test.sql", LocalDbFile: "local.sql", PreserveOwnAtx: false, - NodeID: sig.NodeID(), + NodeIDs: []types.NodeID{sig.NodeID()}, Uri: fmt.Sprintf("%s/snapshot-15", ts.URL), Restore: types.LayerID(recoverLayer), } @@ -834,7 +837,7 @@ func TestRecover_OwnAtxInCheckpoint(t *testing.T) { DbFile: "test.sql", LocalDbFile: "local.sql", PreserveOwnAtx: true, - NodeID: nid, + NodeIDs: []types.NodeID{nid}, Uri: fmt.Sprintf("%s/snapshot-15", ts.URL), Restore: types.LayerID(recoverLayer), } diff --git a/node/node.go b/node/node.go index d7d725f0bb..7793210a50 100644 --- a/node/node.go +++ b/node/node.go @@ -429,16 +429,16 @@ func (app *App) LoadCheckpoint(ctx context.Context) (*checkpoint.PreservedData, if restore == 0 { return nil, fmt.Errorf("restore layer not set") } + nodeIDs := make([]types.NodeID, 0, len(app.signers)) cfg := &checkpoint.RecoverConfig{ GoldenAtx: types.ATXID(app.Config.Genesis.GoldenATX()), DataDir: app.Config.DataDir(), DbFile: dbFile, LocalDbFile: localDbFile, PreserveOwnAtx: app.Config.Recovery.PreserveOwnAtx, - // TODO(mafa): FIXXME! - // NodeID: app.edSgn.NodeID(), - Uri: checkpointFile, - Restore: restore, + NodeIDs: nodeIDs, + Uri: checkpointFile, + Restore: restore, } app.log.WithContext(ctx).With().Info("recover from checkpoint", log.String("url", checkpointFile), @@ -453,11 +453,11 @@ func (app *App) Started() <-chan struct{} { // Lock locks the app for exclusive use. It returns an error if the app is already locked. 
 func (app *App) Lock() error {
-	lockdir := filepath.Dir(app.Config.FileLock)
-	if _, err := os.Stat(lockdir); errors.Is(err, os.ErrNotExist) {
-		err := os.Mkdir(lockdir, os.ModePerm)
+	lockDir := filepath.Dir(app.Config.FileLock)
+	if _, err := os.Stat(lockDir); errors.Is(err, os.ErrNotExist) {
+		err := os.Mkdir(lockDir, os.ModePerm)
 		if err != nil {
-			return fmt.Errorf("creating dir %s for lock %s: %w", lockdir, app.Config.FileLock, err)
+			return fmt.Errorf("creating dir %s for lock %s: %w", lockDir, app.Config.FileLock, err)
 		}
 	}
 	fl := flock.New(app.Config.FileLock)
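
A note on the deduplication left commented out in recoverFromLocalFile (the block marked with the TODO): slices.Compact only merges consecutive elements that compare equal with ==, and for []*types.VerifiedActivationTx and []*types.PoetProofMessage that means pointer identity, so the same dependency collected for two different smeshers is not guaranteed to be merged; the sort also discards the order in which dependencies were collected, which may be what the existing tests trip over. The sketch below shows an order-preserving deduplication keyed by ATX ID and poet proof reference instead. It is not part of this patch: the helper names are illustrative, and it only assumes the ID() and Ref() accessors already used in the commented-out block, with ID() yielding a types.ATXID usable as a map key (as the map in collectOwnAtxDeps suggests).

package checkpoint

import "github.com/spacemeshos/go-spacemesh/common/types"

// dedupDeps drops duplicate ATX dependencies while keeping the order in which
// they were first collected. Keying by ATX ID rather than by pointer merges
// the same dependency collected for two different smeshers.
func dedupDeps(deps []*types.VerifiedActivationTx) []*types.VerifiedActivationTx {
	seen := make(map[types.ATXID]struct{}, len(deps))
	out := deps[:0] // filter in place, reusing the backing array
	for _, dep := range deps {
		if _, ok := seen[dep.ID()]; ok {
			continue
		}
		seen[dep.ID()] = struct{}{}
		out = append(out, dep)
	}
	return out
}

// dedupProofs does the same for poet proofs, keyed by their reference.
// A proof whose reference cannot be computed is kept as-is.
func dedupProofs(proofs []*types.PoetProofMessage) []*types.PoetProofMessage {
	seen := make(map[string]struct{}, len(proofs))
	out := proofs[:0]
	for _, proof := range proofs {
		ref, err := proof.Ref()
		if err != nil {
			out = append(out, proof)
			continue
		}
		if _, ok := seen[string(ref[:])]; ok {
			continue
		}
		seen[string(ref[:])] = struct{}{}
		out = append(out, proof)
	}
	return out
}

With helpers like these, allDeps = dedupDeps(allDeps) and allProofs = dedupProofs(allProofs) could run once after the loop over cfg.NodeIDs rather than inside it, and the collection order the tests see would stay unchanged.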