Skip to content

Commit

Permalink
Refactor gRPC PostService to be injected into services (#5145)
Browse files Browse the repository at this point in the history
## Motivation
Follow up on #5061 (comment)

This changes the `activation.Builder` and `activation.NIPostBuilder` to use `grpcservice.PostService` directly instead of the later signalling new and dropped connections to them.

## Changes
- Inverse dependency between the three components as discussed in previous PR
- During startup the `grpcservice.PostService` is now always initialized along the other node services, but only registered in the `grpcserver` when it is configured for at least one of the available listeners.

## Test Plan
n/a

## TODO
<!-- This section should be removed when all items are complete -->
- [x] Explain motivation or link existing issue(s)
- [x] Test changes and document test plan
- [x] Update documentation as needed
- [x] Update [changelog](../CHANGELOG.md) as needed
  • Loading branch information
fasmat committed Oct 12, 2023
1 parent 7324dc9 commit 299b49d
Show file tree
Hide file tree
Showing 16 changed files with 631 additions and 737 deletions.
2 changes: 1 addition & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ COPY . .
RUN --mount=type=cache,id=build,target=/root/.cache/go-build make build
RUN --mount=type=cache,id=build,target=/root/.cache/go-build make gen-p2p-identity

# In this last stage, we start from a fresh Alpine image, to reduce the image size and not ship the Go compiler in our production artifacts.
# In this last stage, we start from a fresh image, to reduce the image size and not ship the Go compiler in our production artifacts.
FROM linux AS spacemesh

# Finally we copy the statically compiled Go binary.
Expand Down
48 changes: 11 additions & 37 deletions activation/activation.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ type Config struct {
// it is responsible for initializing post, receiving poet proof and orchestrating nipst. after which it will
// calculate total weight and providing relevant view as proof.
type Builder struct {
pendingPoetClients atomic.Pointer[[]PoetProvingServiceClient]
pendingPoetClients atomic.Pointer[[]poetClient]
started *atomic.Bool

eg errgroup.Group
Expand All @@ -82,14 +82,12 @@ type Builder struct {
regossipInterval time.Duration
cdb *datastore.CachedDB
publisher pubsub.Publisher
postService postService
nipostBuilder nipostBuilder
postSetupProvider postSetupProvider
initialPost *types.Post
validator nipostValidator

postMux sync.Mutex
postClient PostClient

// smeshingMutex protects `StartSmeshing` and `StopSmeshing` from concurrent access
smeshingMutex sync.Mutex

Expand Down Expand Up @@ -117,7 +115,7 @@ func WithPoetRetryInterval(interval time.Duration) BuilderOption {
}

// PoETClientInitializer interfaces for creating PoetProvingServiceClient.
type PoETClientInitializer func(string, PoetConfig) (PoetProvingServiceClient, error)
type PoETClientInitializer func(string, PoetConfig) (poetClient, error)

// WithPoETClientInitializer modifies initialization logic for PoET client. Used during client update.
func WithPoETClientInitializer(initializer PoETClientInitializer) BuilderOption {
Expand Down Expand Up @@ -153,6 +151,7 @@ func NewBuilder(
signer *signing.EdSigner,
cdb *datastore.CachedDB,
publisher pubsub.Publisher,
postService postService,
nipostBuilder nipostBuilder,
postSetupProvider postSetupProvider,
layerClock layerClock,
Expand All @@ -170,6 +169,7 @@ func NewBuilder(
regossipInterval: conf.RegossipInterval,
cdb: cdb,
publisher: publisher,
postService: postService,
nipostBuilder: nipostBuilder,
postSetupProvider: postSetupProvider,
layerClock: layerClock,
Expand All @@ -185,39 +185,13 @@ func NewBuilder(
return b
}

func (b *Builder) Connected(client PostClient) {
b.postMux.Lock()
defer b.postMux.Unlock()

if b.postClient != nil {
b.log.With().Error("post service already connected")
return
}

b.postClient = client
}

func (b *Builder) Disconnected(client PostClient) {
b.postMux.Lock()
defer b.postMux.Unlock()

if b.postClient != client {
b.log.With().Debug("post service not connected")
return
}

b.postClient = nil
}

func (b *Builder) proof(ctx context.Context, challenge []byte) (*types.Post, *types.PostMetadata, error) {
b.postMux.Lock()
defer b.postMux.Unlock()

if b.postClient == nil {
return nil, nil, errors.New("post service not connected")
client, err := b.postService.Client(b.nodeID)
if err != nil {
return nil, nil, err
}

return b.postClient.Proof(ctx, challenge)
return client.Proof(ctx, challenge)
}

// Smeshing returns true iff atx builder is smeshing.
Expand Down Expand Up @@ -417,7 +391,7 @@ func (b *Builder) verifyInitialPost(ctx context.Context, post *types.Post, metad
}
}

func (b *Builder) receivePendingPoetClients() *[]PoetProvingServiceClient {
func (b *Builder) receivePendingPoetClients() *[]poetClient {
return b.pendingPoetClients.Swap(nil)
}

Expand Down Expand Up @@ -552,7 +526,7 @@ func (b *Builder) UpdatePoETServers(ctx context.Context, endpoints []string) err
return nil
})))

clients := make([]PoetProvingServiceClient, 0, len(endpoints))
clients := make([]poetClient, 0, len(endpoints))
for _, endpoint := range endpoints {
client, err := b.poetClientInitializer(endpoint, b.poetCfg)
if err != nil {
Expand Down
14 changes: 8 additions & 6 deletions activation/activation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ type testAtxBuilder struct {
goldenATXID types.ATXID

mpub *mocks.MockPublisher
mpostSvc *MockpostService
mnipost *MocknipostBuilder
mpost *MockpostSetupProvider
mpostClient *MockPostClient
Expand All @@ -117,6 +118,7 @@ func newTestBuilder(tb testing.TB, opts ...BuilderOption) *testAtxBuilder {
coinbase: types.GenerateAddress([]byte("33333")),
goldenATXID: types.ATXID(types.HexToHash32("77777")),
mpub: mocks.NewMockPublisher(ctrl),
mpostSvc: NewMockpostService(ctrl),
mnipost: NewMocknipostBuilder(ctrl),
mpost: NewMockpostSetupProvider(ctrl),
mpostClient: NewMockPostClient(ctrl),
Expand All @@ -138,14 +140,14 @@ func newTestBuilder(tb testing.TB, opts ...BuilderOption) *testAtxBuilder {
close(ch)
return ch
}).AnyTimes()
tab.mpostSvc.EXPECT().Client(tab.nodeID).Return(tab.mpostClient, nil).AnyTimes()

b := NewBuilder(cfg, tab.nodeID, tab.sig, tab.cdb, tab.mpub, tab.mnipost, tab.mpost,
b := NewBuilder(cfg, tab.nodeID, tab.sig, tab.cdb, tab.mpub, tab.mpostSvc, tab.mnipost, tab.mpost,
tab.mclock, tab.msync, lg, opts...)
b.initialPost = &types.Post{
Nonce: 0,
Indices: make([]byte, 10),
}
b.Connected(tab.mpostClient)
tab.Builder = b
dir := tb.TempDir()
tab.mnipost.EXPECT().DataDir().Return(dir).AnyTimes()
Expand Down Expand Up @@ -1142,8 +1144,8 @@ func TestBuilder_InitialPostIsPersisted(t *testing.T) {
func TestBuilder_UpdatePoets(t *testing.T) {
r := require.New(t)

tab := newTestBuilder(t, WithPoETClientInitializer(func(string, PoetConfig) (PoetProvingServiceClient, error) {
poet := NewMockPoetProvingServiceClient(gomock.NewController(t))
tab := newTestBuilder(t, WithPoETClientInitializer(func(string, PoetConfig) (poetClient, error) {
poet := NewMockpoetClient(gomock.NewController(t))
poet.EXPECT().PoetServiceID(gomock.Any()).AnyTimes().Return(types.PoetServiceID{ServiceID: []byte("poetid")}, nil)
return poet, nil
}))
Expand All @@ -1162,8 +1164,8 @@ func TestBuilder_UpdatePoets(t *testing.T) {
func TestBuilder_UpdatePoetsUnstable(t *testing.T) {
r := require.New(t)

tab := newTestBuilder(t, WithPoETClientInitializer(func(string, PoetConfig) (PoetProvingServiceClient, error) {
poet := NewMockPoetProvingServiceClient(gomock.NewController(t))
tab := newTestBuilder(t, WithPoETClientInitializer(func(string, PoetConfig) (poetClient, error) {
poet := NewMockpoetClient(gomock.NewController(t))
poet.EXPECT().PoetServiceID(gomock.Any()).AnyTimes().Return(types.PoetServiceID{ServiceID: []byte("poetid")}, errors.New("ERROR"))
return poet, nil
}))
Expand Down
59 changes: 24 additions & 35 deletions activation/e2e/nipost_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,9 +175,21 @@ func TestNIPostBuilderWithClients(t *testing.T) {

poetDb := activation.NewPoetDb(sql.InMemory(), log.NewFromLog(logger).Named("poetDb"))

svc := grpcserver.NewPostService(logger)
grpcCfg, cleanup := launchServer(t, svc)
t.Cleanup(cleanup)

t.Cleanup(launchPostSupervisor(t, logger, grpcCfg, opts))

require.Eventually(t, func() bool {
_, err := svc.Client(sig.NodeID())
return err == nil
}, 10*time.Second, 100*time.Millisecond, "timed out waiting for connection")

nb, err := activation.NewNIPostBuilder(
sig.NodeID(),
poetDb,
svc,
[]string{poetProver.RestURL().String()},
t.TempDir(),
log.NewFromLog(logger),
Expand All @@ -187,25 +199,6 @@ func TestNIPostBuilderWithClients(t *testing.T) {
)
require.NoError(t, err)

connected := make(chan struct{})
con := grpcserver.NewMockpostConnectionListener(ctrl)
con.EXPECT().Connected(gomock.Any()).DoAndReturn(func(c activation.PostClient) {
close(connected)
}).Times(1)
con.EXPECT().Disconnected(gomock.Any()).Times(1)

svc := grpcserver.NewPostService(logger, nb, con)
grpcCfg, cleanup := launchServer(t, svc)
t.Cleanup(cleanup)

t.Cleanup(launchPostSupervisor(t, logger, grpcCfg, opts))

select {
case <-connected:
case <-time.After(10 * time.Second):
require.Fail(t, "timed out waiting for connection")
}

challenge := types.NIPostChallenge{
PublishEpoch: postGenesisEpoch + 2,
}
Expand Down Expand Up @@ -245,9 +238,12 @@ func TestNIPostBuilder_Close(t *testing.T) {
},
)

svc := grpcserver.NewPostService(logger)

nb, err := activation.NewNIPostBuilder(
sig.NodeID(),
poetDb,
svc,
[]string{poetProver.RestURL().String()},
t.TempDir(),
log.NewFromLog(logger),
Expand Down Expand Up @@ -303,9 +299,14 @@ func TestNewNIPostBuilderNotInitialized(t *testing.T) {

poetDb := activation.NewPoetDb(sql.InMemory(), log.NewFromLog(logger).Named("poetDb"))

svc := grpcserver.NewPostService(logger)
grpcCfg, cleanup := launchServer(t, svc)
t.Cleanup(cleanup)

nb, err := activation.NewNIPostBuilder(
sig.NodeID(),
poetDb,
svc,
[]string{poetProver.RestURL().String()},
t.TempDir(),
logtest.New(t),
Expand All @@ -315,28 +316,16 @@ func TestNewNIPostBuilderNotInitialized(t *testing.T) {
)
require.NoError(t, err)

connected := make(chan struct{})
con := grpcserver.NewMockpostConnectionListener(ctrl)
con.EXPECT().Connected(gomock.Any()).DoAndReturn(func(c activation.PostClient) {
close(connected)
}).Times(1)
con.EXPECT().Disconnected(gomock.Any()).Times(1)

svc := grpcserver.NewPostService(logger, nb, con)
grpcCfg, cleanup := launchServer(t, svc)
t.Cleanup(cleanup)

opts := activation.DefaultPostSetupOpts()
opts.DataDir = t.TempDir()
opts.ProviderID.SetInt64(int64(initialization.CPUProviderID()))
opts.Scrypt.N = 2 // Speedup initialization in tests.
t.Cleanup(launchPostSupervisor(t, logger, grpcCfg, opts))

select {
case <-connected:
case <-time.After(10 * time.Second):
require.Fail(t, "timed out waiting for connection")
}
require.Eventually(t, func() bool {
_, err := svc.Client(sig.NodeID())
return err == nil
}, 10*time.Second, 100*time.Millisecond, "timed out waiting for connection")

challenge := types.NIPostChallenge{
PublishEpoch: postGenesisEpoch + 2,
Expand Down
31 changes: 12 additions & 19 deletions activation/e2e/validation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,9 +69,21 @@ func TestValidator_Validate(t *testing.T) {

poetDb := activation.NewPoetDb(sql.InMemory(), log.NewFromLog(logger).Named("poetDb"))

svc := grpcserver.NewPostService(logger)
grpcCfg, cleanup := launchServer(t, svc)
t.Cleanup(cleanup)

t.Cleanup(launchPostSupervisor(t, logger, grpcCfg, opts))

require.Eventually(t, func() bool {
_, err := svc.Client(sig.NodeID())
return err == nil
}, 10*time.Second, 100*time.Millisecond, "timed out waiting for connection")

nb, err := activation.NewNIPostBuilder(
sig.NodeID(),
poetDb,
svc,
[]string{poetProver.RestURL().String()},
t.TempDir(),
logtest.New(t, zapcore.DebugLevel),
Expand All @@ -81,25 +93,6 @@ func TestValidator_Validate(t *testing.T) {
)
require.NoError(t, err)

connected := make(chan struct{})
con := grpcserver.NewMockpostConnectionListener(ctrl)
con.EXPECT().Connected(gomock.Any()).DoAndReturn(func(c activation.PostClient) {
close(connected)
}).Times(1)
con.EXPECT().Disconnected(gomock.Any()).Times(1)

svc := grpcserver.NewPostService(logger, nb, con)
grpcCfg, cleanup := launchServer(t, svc)
t.Cleanup(cleanup)

t.Cleanup(launchPostSupervisor(t, logger, grpcCfg, opts))

select {
case <-connected:
case <-time.After(10 * time.Second):
require.Fail(t, "timed out waiting for connection")
}

challenge := types.NIPostChallenge{
PublishEpoch: postGenesisEpoch + 2,
}
Expand Down
28 changes: 27 additions & 1 deletion activation/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ type layerClock interface {
}

type nipostBuilder interface {
UpdatePoETProvers([]PoetProvingServiceClient)
UpdatePoETProvers([]poetClient)
BuildNIPost(ctx context.Context, challenge *types.NIPostChallenge) (*types.NIPost, error)
DataDir() string
}
Expand Down Expand Up @@ -78,6 +78,32 @@ type SmeshingProvider interface {
UpdatePoETServers(ctx context.Context, endpoints []string) error
}

// poetClient servers as an interface to communicate with a PoET server.
// It is used to submit challenges and fetch proofs.
type poetClient interface {
Address() string

PowParams(ctx context.Context) (*PoetPowParams, error)

// Submit registers a challenge in the proving service current open round.
Submit(ctx context.Context, deadline time.Time, prefix, challenge []byte, signature types.EdSignature, nodeID types.NodeID, pow PoetPoW) (*types.PoetRound, error)

// PoetServiceID returns the public key of the PoET proving service.
PoetServiceID(context.Context) (types.PoetServiceID, error)

// Proof returns the proof for the given round ID.
Proof(ctx context.Context, roundID string) (*types.PoetProofMessage, []types.Member, error)
}

type poetDbAPI interface {
GetProof(types.PoetProofRef) (*types.PoetProof, *types.Hash32, error)
ValidateAndStore(ctx context.Context, proofMessage *types.PoetProofMessage) error
}

type postService interface {
Client(nodeId types.NodeID) (PostClient, error)
}

type PostClient interface {
Proof(ctx context.Context, challenge []byte) (*types.Post, *types.PostMetadata, error)
}
Loading

0 comments on commit 299b49d

Please sign in to comment.