Skip to content

Commit

Permalink
Backport: Improve events emitted by node when post proofing (#5470)
Browse files Browse the repository at this point in the history
* Backport: Improve events emitted by node when post proofing
  • Loading branch information
fasmat authored Jan 19, 2024
1 parent 489db02 commit 6ebcdf4
Show file tree
Hide file tree
Showing 7 changed files with 124 additions and 70 deletions.
15 changes: 12 additions & 3 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,20 @@

See [RELEASE](./RELEASE.md) for workflow instructions.

## Release v1.3.4
## Release v1.3.5

### Highlights
### Improvements

### Features
* [#5470](https://github.com/spacemeshos/go-spacemesh/pull/5470)
Fixed a bug in event reporting where the node reports a disconnection from the PoST service as a "PoST failed" event.
Disconnections cannot be avoided completely and do not interrupt the PoST proofing process. As long as the PoST
service reconnects within a reasonable time, the node will continue to operate normally without reporting any errors
via the event API.

Users of a remote setup should make sure that the PoST service is actually running and can reach the node. Observe
the log of both apps for indications of a connection problem.

## Release v1.3.4

### Improvements

Expand Down
23 changes: 1 addition & 22 deletions activation/activation.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,27 +160,6 @@ func NewBuilder(
return b
}

func (b *Builder) proof(ctx context.Context, challenge []byte) (*types.Post, *types.PostInfo, error) {
for {
client, err := b.postService.Client(b.signer.NodeID())
if err == nil {
events.EmitPostStart(challenge)
post, postInfo, err := client.Proof(ctx, challenge)
if err != nil {
events.EmitPostFailure()
return nil, nil, err
}
events.EmitPostComplete(challenge)
return post, postInfo, err
}
select {
case <-ctx.Done():
return nil, nil, ctx.Err()
case <-time.After(2 * time.Second):
}
}
}

// Smeshing returns true iff atx builder is smeshing.
func (b *Builder) Smeshing() bool {
b.smeshingMutex.Lock()
Expand Down Expand Up @@ -286,7 +265,7 @@ func (b *Builder) buildInitialPost(ctx context.Context) error {

// Create the initial post and save it.
startTime := time.Now()
post, postInfo, err := b.proof(ctx, shared.ZeroChallenge)
post, postInfo, err := b.nipostBuilder.Proof(ctx, shared.ZeroChallenge)
if err != nil {
return fmt.Errorf("post execution: %w", err)
}
Expand Down
57 changes: 25 additions & 32 deletions activation/activation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -277,13 +277,11 @@ func TestBuilder_StartSmeshingCoinbase(t *testing.T) {
tab := newTestBuilder(t)
coinbase := types.Address{1, 1, 1}

tab.mpostClient.EXPECT().
Proof(gomock.Any(), gomock.Any()).
DoAndReturn(func(ctx context.Context, b []byte) (*types.Post, *types.PostInfo, error) {
tab.mnipost.EXPECT().Proof(gomock.Any(), shared.ZeroChallenge).DoAndReturn(
func(ctx context.Context, b []byte) (*types.Post, *types.PostInfo, error) {
<-ctx.Done()
return nil, nil, ctx.Err()
}).
AnyTimes()
})
tab.mValidator.EXPECT().
Post(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).
AnyTimes().
Expand All @@ -302,12 +300,11 @@ func TestBuilder_StartSmeshingCoinbase(t *testing.T) {
func TestBuilder_RestartSmeshing(t *testing.T) {
getBuilder := func(t *testing.T) *Builder {
tab := newTestBuilder(t)
tab.mpostClient.EXPECT().Proof(gomock.Any(), shared.ZeroChallenge).AnyTimes().DoAndReturn(
func(ctx context.Context, _ []byte) (*types.Post, *types.PostInfo, error) {
tab.mnipost.EXPECT().Proof(gomock.Any(), shared.ZeroChallenge).AnyTimes().DoAndReturn(
func(ctx context.Context, b []byte) (*types.Post, *types.PostInfo, error) {
<-ctx.Done()
return nil, nil, ctx.Err()
},
)
})

ch := make(chan struct{})
close(ch)
Expand Down Expand Up @@ -364,13 +361,11 @@ func TestBuilder_StopSmeshing_Delete(t *testing.T) {
return nil, ctx.Err()
}).
AnyTimes()
tab.mpostClient.EXPECT().
Proof(gomock.Any(), gomock.Any()).
DoAndReturn(func(ctx context.Context, b []byte) (*types.Post, *types.PostInfo, error) {
tab.mnipost.EXPECT().Proof(gomock.Any(), shared.ZeroChallenge).DoAndReturn(
func(ctx context.Context, b []byte) (*types.Post, *types.PostInfo, error) {
<-ctx.Done()
return nil, nil, ctx.Err()
}).
AnyTimes()
}).AnyTimes()

// Create state files
// TODO(mafa): fully migrate to DB
Expand Down Expand Up @@ -1261,15 +1256,14 @@ func TestBuilder_RetryPublishActivationTx(t *testing.T) {

func TestBuilder_InitialProofGeneratedOnce(t *testing.T) {
tab := newTestBuilder(t, WithPoetConfig(PoetConfig{PhaseShift: layerDuration * 4}))
tab.mpostClient.EXPECT().Proof(gomock.Any(), shared.ZeroChallenge).
Return(
&types.Post{Indices: make([]byte, 10)},
&types.PostInfo{
CommitmentATX: types.RandomATXID(),
Nonce: new(types.VRFPostIndex),
},
nil,
)
tab.mnipost.EXPECT().Proof(gomock.Any(), shared.ZeroChallenge).Return(
&types.Post{Indices: make([]byte, 10)},
&types.PostInfo{
CommitmentATX: types.RandomATXID(),
Nonce: new(types.VRFPostIndex),
},
nil,
)
tab.mValidator.EXPECT().
Post(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).
AnyTimes().
Expand Down Expand Up @@ -1299,15 +1293,14 @@ func TestBuilder_InitialProofGeneratedOnce(t *testing.T) {

func TestBuilder_InitialPostIsPersisted(t *testing.T) {
tab := newTestBuilder(t, WithPoetConfig(PoetConfig{PhaseShift: layerDuration * 4}))
tab.mpostClient.EXPECT().Proof(gomock.Any(), shared.ZeroChallenge).
Return(
&types.Post{Indices: make([]byte, 10)},
&types.PostInfo{
CommitmentATX: types.RandomATXID(),
Nonce: new(types.VRFPostIndex),
},
nil,
)
tab.mnipost.EXPECT().Proof(gomock.Any(), shared.ZeroChallenge).Return(
&types.Post{Indices: make([]byte, 10)},
&types.PostInfo{
CommitmentATX: types.RandomATXID(),
Nonce: new(types.VRFPostIndex),
},
nil,
)
tab.mValidator.EXPECT().
Post(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).
AnyTimes().
Expand Down
4 changes: 4 additions & 0 deletions activation/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package activation

import (
"context"
"fmt"
"io"
"time"

Expand Down Expand Up @@ -67,6 +68,7 @@ type layerClock interface {

type nipostBuilder interface {
BuildNIPost(ctx context.Context, challenge *types.NIPostChallenge) (*types.NIPost, error)
Proof(ctx context.Context, challenge []byte) (*types.Post, *types.PostInfo, error)
DataDir() string
}

Expand Down Expand Up @@ -127,6 +129,8 @@ type poetDbAPI interface {
ValidateAndStore(ctx context.Context, proofMessage *types.PoetProofMessage) error
}

var ErrPostClientClosed = fmt.Errorf("post client closed")

type postService interface {
Client(nodeId types.NodeID) (PostClient, error)
}
Expand Down
40 changes: 40 additions & 0 deletions activation/mocks.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

50 changes: 39 additions & 11 deletions activation/nipost.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,13 +129,45 @@ func (nb *NIPostBuilder) DataDir() string {
return nb.dataDir
}

func (nb *NIPostBuilder) proof(ctx context.Context, challenge []byte) (*types.Post, *types.PostInfo, error) {
client, err := nb.postService.Client(nb.signer.NodeID())
if err != nil {
return nil, nil, err
}
func (nb *NIPostBuilder) Proof(ctx context.Context, challenge []byte) (*types.Post, *types.PostInfo, error) {
started := false
retries := 0
for {
client, err := nb.postService.Client(nb.signer.NodeID())
if err != nil {
select {
case <-ctx.Done():
if started {
events.EmitPostFailure()
}
return nil, nil, ctx.Err()
case <-time.After(2 * time.Second): // Wait a few seconds and try connecting again
retries++
if retries%10 == 0 { // every 20 seconds inform user about lost connection (for remote post service)
// TODO(mafa): emit event warning user about lost connection
nb.log.Warn("post service not connected - waiting for reconnection", zap.Error(err))
}
continue
}
}
if !started {
events.EmitPostStart(challenge)
started = true
}

return client.Proof(ctx, challenge)
retries = 0
post, postInfo, err := client.Proof(ctx, challenge)
switch {
case errors.Is(err, ErrPostClientClosed):
continue
case err != nil:
events.EmitPostFailure()
return nil, nil, err
default: // err == nil
events.EmitPostComplete(challenge)
return post, postInfo, err
}
}
}

// UpdatePoETProvers updates poetProver reference. It should not be executed concurrently with BuildNIPoST.
Expand Down Expand Up @@ -272,14 +304,10 @@ func (nb *NIPostBuilder) BuildNIPost(ctx context.Context, challenge *types.NIPos

nb.log.Info("starting post execution", zap.Binary("challenge", nb.state.PoetProofRef[:]))
startTime := time.Now()
events.EmitPostStart(nb.state.PoetProofRef[:])

proof, postInfo, err := nb.proof(postCtx, nb.state.PoetProofRef[:])
proof, postInfo, err := nb.Proof(postCtx, nb.state.PoetProofRef[:])
if err != nil {
events.EmitPostFailure()
return nil, fmt.Errorf("failed to generate Post: %w", err)
}
events.EmitPostComplete(nb.state.PoetProofRef[:])
postGenDuration := time.Since(startTime)
nb.log.Info("finished post execution", zap.Duration("duration", postGenDuration))
metrics.PostDuration.Set(float64(postGenDuration.Nanoseconds()))
Expand Down
5 changes: 3 additions & 2 deletions api/grpcserver/post_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (

pb "github.com/spacemeshos/api/release/go/spacemesh/v1"

"github.com/spacemeshos/go-spacemesh/activation"
"github.com/spacemeshos/go-spacemesh/common/types"
)

Expand Down Expand Up @@ -149,7 +150,7 @@ func (pc *postClient) send(ctx context.Context, req *pb.NodeRequest) (*pb.Servic
// send command
select {
case <-pc.closed:
return nil, fmt.Errorf("post client closed")
return nil, activation.ErrPostClientClosed
case <-ctx.Done():
return nil, ctx.Err()
case pc.con <- cmd:
Expand All @@ -158,7 +159,7 @@ func (pc *postClient) send(ctx context.Context, req *pb.NodeRequest) (*pb.Servic
// receive response
select {
case <-pc.closed:
return nil, fmt.Errorf("post client closed")
return nil, activation.ErrPostClientClosed
case <-ctx.Done():
return nil, ctx.Err()
case resp := <-resp:
Expand Down

0 comments on commit 6ebcdf4

Please sign in to comment.