diff --git a/CHANGELOG.md b/CHANGELOG.md index 9e83968b9fc..f4cc9861323 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,20 @@ See [RELEASE](./RELEASE.md) for workflow instructions. +## UNRELEASED + +### Upgrade information + +### Highlights + +### Features + +### Improvements + +* [#4965](https://github.com/spacemeshos/go-spacemesh/pull/4965) Updates to PoST: + * Prevent errors when shutting down the node that can result in a crash + * `postdata_metadata.json` is now updated atomically to prevent corruption of the file. + ## v1.1.4 ### Upgrade information diff --git a/activation/post_verifier.go b/activation/post_verifier.go index 55e0b869715..765830065a7 100644 --- a/activation/post_verifier.go +++ b/activation/post_verifier.go @@ -22,6 +22,8 @@ type verifyPostJob struct { type OffloadingPostVerifier struct { eg errgroup.Group + stop context.CancelFunc + stopped <-chan struct{} log log.Log workers []*postVerifierWorker channel chan<- *verifyPostJob @@ -71,23 +73,30 @@ func NewOffloadingPostVerifier(verifiers []PostVerifier, logger log.Log) *Offloa } logger.With().Info("created post verifier", log.Int("num_workers", numWorkers)) - return &OffloadingPostVerifier{ + ctx, cancel := context.WithCancel(context.Background()) + stopped := make(chan struct{}) + v := &OffloadingPostVerifier{ log: logger, workers: workers, channel: channel, + stopped: stopped, + stop: func() { + cancel() + select { + case <-stopped: + default: + close(stopped) + } + }, } -} -func (v *OffloadingPostVerifier) Start(ctx context.Context) { v.log.Info("starting post verifier") for _, worker := range v.workers { worker := worker v.eg.Go(func() error { return worker.start(ctx) }) } - <-ctx.Done() - v.log.Info("stopping post verifier") - v.eg.Wait() - v.log.Info("stopped post verifier") + v.log.Info("started post verifier") + return v } func (v *OffloadingPostVerifier) Verify(ctx context.Context, p *shared.Proof, m *shared.ProofMetadata, opts ...verifying.OptionFunc) error { @@ -97,8 +106,11 @@ func (v *OffloadingPostVerifier) Verify(ctx context.Context, p *shared.Proof, m opts: opts, result: make(chan error, 1), } + select { case v.channel <- job: + case <-v.stopped: + return fmt.Errorf("verifier is closed") case <-ctx.Done(): return fmt.Errorf("submitting verifying job: %w", ctx.Err()) } @@ -106,17 +118,24 @@ func (v *OffloadingPostVerifier) Verify(ctx context.Context, p *shared.Proof, m select { case res := <-job.result: return res + case <-v.stopped: + return fmt.Errorf("verifier is closed") case <-ctx.Done(): return fmt.Errorf("waiting for verification result: %w", ctx.Err()) } } func (v *OffloadingPostVerifier) Close() error { + v.log.Info("stopping post verifier") + v.stop() + v.eg.Wait() + for _, worker := range v.workers { if err := worker.verifier.Close(); err != nil { return err } } + v.log.Info("stopped post verifier") return nil } diff --git a/activation/post_verifier_test.go b/activation/post_verifier_test.go index 7fb6fed82d2..e1932cabd69 100644 --- a/activation/post_verifier_test.go +++ b/activation/post_verifier_test.go @@ -4,6 +4,7 @@ import ( "context" "errors" "testing" + "time" "github.com/spacemeshos/post/shared" "github.com/stretchr/testify/require" @@ -26,29 +27,19 @@ func TestOffloadingPostVerifier(t *testing.T) { []activation.PostVerifier{verifier}, log.NewDefault(t.Name()), ) + defer offloadingVerifier.Close() + verifier.EXPECT().Close().Return(nil) - var eg errgroup.Group - eg.Go(func() error { - offloadingVerifier.Start(ctx) - return nil - }) - - { - verifier.EXPECT().Verify(ctx, &proof, &metadata, gomock.Any()).Return(nil) - err := offloadingVerifier.Verify(ctx, &proof, &metadata) - require.NoError(t, err) - } - { - verifier.EXPECT().Verify(ctx, &proof, &metadata, gomock.Any()).Return(errors.New("invalid proof!")) - err := offloadingVerifier.Verify(ctx, &proof, &metadata) - require.ErrorContains(t, err, "invalid proof!") - } + verifier.EXPECT().Verify(gomock.Any(), &proof, &metadata, gomock.Any()).Return(nil) + err := offloadingVerifier.Verify(ctx, &proof, &metadata) + require.NoError(t, err) - cancel() - require.NoError(t, eg.Wait()) + verifier.EXPECT().Verify(gomock.Any(), &proof, &metadata, gomock.Any()).Return(errors.New("invalid proof!")) + err = offloadingVerifier.Verify(ctx, &proof, &metadata) + require.ErrorContains(t, err, "invalid proof!") } -func TestPostVerfierDetectsInvalidProof(t *testing.T) { +func TestPostVerifierDetectsInvalidProof(t *testing.T) { verifier, err := activation.NewPostVerifier(activation.PostConfig{}, log.NewDefault(t.Name())) require.NoError(t, err) defer verifier.Close() @@ -67,26 +58,53 @@ func TestPostVerifierVerifyAfterStop(t *testing.T) { []activation.PostVerifier{verifier}, log.NewDefault(t.Name()), ) + defer offloadingVerifier.Close() + verifier.EXPECT().Close().Return(nil) + + verifier.EXPECT().Verify(gomock.Any(), &proof, &metadata, gomock.Any()).Return(nil) + err := offloadingVerifier.Verify(ctx, &proof, &metadata) + require.NoError(t, err) + + // Stop the verifier + verifier.EXPECT().Close().Return(nil) + offloadingVerifier.Close() + + err = offloadingVerifier.Verify(ctx, &proof, &metadata) + require.EqualError(t, err, "verifier is closed") +} + +func TestPostVerifierNoRaceOnClose(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + proof := shared.Proof{} + metadata := shared.ProofMetadata{} + + verifier := activation.NewMockPostVerifier(gomock.NewController(t)) + offloadingVerifier := activation.NewOffloadingPostVerifier( + []activation.PostVerifier{verifier}, + log.NewDefault(t.Name()), + ) + defer offloadingVerifier.Close() + verifier.EXPECT().Close().AnyTimes().Return(nil) + verifier.EXPECT().Verify(gomock.Any(), &proof, &metadata, gomock.Any()).AnyTimes().Return(nil) + // Stop the verifier var eg errgroup.Group eg.Go(func() error { - offloadingVerifier.Start(ctx) - return nil + time.Sleep(50 * time.Millisecond) + return offloadingVerifier.Close() }) - { - verifier.EXPECT().Verify(ctx, &proof, &metadata, gomock.Any()).Return(nil) - err := offloadingVerifier.Verify(ctx, &proof, &metadata) - require.NoError(t, err) + for i := 0; i < 10; i++ { + ms := 10 * i + eg.Go(func() error { + time.Sleep(time.Duration(ms) * time.Millisecond) + return offloadingVerifier.Verify(ctx, &proof, &metadata) + }) } - // Stop the verifier - cancel() - require.NoError(t, eg.Wait()) - { - err := offloadingVerifier.Verify(ctx, &proof, &metadata) - require.ErrorIs(t, err, context.Canceled) - } + require.EqualError(t, eg.Wait(), "verifier is closed") } func TestPostVerifierReturnsOnCtxCanceledWhenBlockedVerifying(t *testing.T) { @@ -98,15 +116,8 @@ func TestPostVerifierReturnsOnCtxCanceledWhenBlockedVerifying(t *testing.T) { // empty list of verifiers - no one will verify the proof }, log.NewDefault(t.Name())) - var eg errgroup.Group - eg.Go(func() error { - v.Start(ctx) - return nil - }) + require.NoError(t, v.Close()) - cancel() err := v.Verify(ctx, &shared.Proof{}, &shared.ProofMetadata{}) - require.ErrorIs(t, err, context.Canceled) - - require.NoError(t, eg.Wait()) + require.EqualError(t, err, "verifier is closed") } diff --git a/go.mod b/go.mod index d4e8ea7b77a..1fb4b33e637 100644 --- a/go.mod +++ b/go.mod @@ -40,7 +40,7 @@ require ( github.com/spacemeshos/go-scale v1.1.10 github.com/spacemeshos/merkle-tree v0.2.3 github.com/spacemeshos/poet v0.9.1 - github.com/spacemeshos/post v0.9.3 + github.com/spacemeshos/post v0.9.4 github.com/spf13/afero v1.9.5 github.com/spf13/cobra v1.7.0 github.com/spf13/pflag v1.0.5 diff --git a/go.sum b/go.sum index a5dc994df44..a516d2b8e0a 100644 --- a/go.sum +++ b/go.sum @@ -622,8 +622,8 @@ github.com/spacemeshos/merkle-tree v0.2.3 h1:zGEgOR9nxAzJr0EWjD39QFngwFEOxfxMloE github.com/spacemeshos/merkle-tree v0.2.3/go.mod h1:VomOcQ5pCBXz7goiWMP5hReyqOfDXGSKbrH2GB9Htww= github.com/spacemeshos/poet v0.9.1 h1:10wiGjuuGCZq9mAuQjRvNCpE5Uv0KU6yW5eynUCIECU= github.com/spacemeshos/poet v0.9.1/go.mod h1:ccxzHl5IRSenCSqonPI8M+5vPNwN+o3QU2X41bhbcao= -github.com/spacemeshos/post v0.9.3 h1:3khU8uSxjicOYrfg3tFP26YyTkbsmbJt1uLJN93ScH4= -github.com/spacemeshos/post v0.9.3/go.mod h1:sWxWEfxH4wc4D2KCY0kBMu4ezz/v52Km/N023SaCcNE= +github.com/spacemeshos/post v0.9.4 h1:l/KGneUnLH5imy2Uml7G9kBnJuzl4ag0ku3U0OWaTxc= +github.com/spacemeshos/post v0.9.4/go.mod h1:YjYLMcFFSpxrI86TW2rhyWCWRRFG+8LPiONgARNuaP4= github.com/spacemeshos/sha256-simd v0.1.0 h1:G7Mfu5RYdQiuE+wu4ZyJ7I0TI74uqLhFnKblEnSpjYI= github.com/spacemeshos/sha256-simd v0.1.0/go.mod h1:O8CClVIilId7RtuCMV2+YzMj6qjVn75JsxOxaE8vcfM= github.com/spaolacci/murmur3 v1.1.0 h1:7c1g84S4BPRrfL5Xrdp6fOJ206sU9y293DDHaoy0bLI= diff --git a/node/node.go b/node/node.go index 18d890a1ee6..b1e210c7973 100644 --- a/node/node.go +++ b/node/node.go @@ -1031,13 +1031,6 @@ func (app *App) startServices(ctx context.Context) error { if err := app.fetcher.Start(); err != nil { return fmt.Errorf("failed to start fetcher: %w", err) } - app.eg.Go(func() error { - app.postVerifier.Start(ctx) - // Start only returns when the context expires (which means the system - // is shutting down). We do the closing of the post verifier here so - // it's ensured that start has returned before we call Close. - return app.postVerifier.Close() - }) app.syncer.Start() app.beaconProtocol.Start(ctx) @@ -1210,6 +1203,10 @@ func (app *App) stopServices(ctx context.Context) { _ = app.atxBuilder.StopSmeshing(false) } + if app.postVerifier != nil { + app.postVerifier.Close() + } + if app.hare != nil { app.hare.Close() }