Skip to content

Commit

Permalink
sync2: ATX integration (#6448)
Browse files Browse the repository at this point in the history
## Motivation

We need more efficient ATX sync.



Co-authored-by: Matthias <5011972+fasmat@users.noreply.github.com>
  • Loading branch information
ivan4th and fasmat committed Dec 16, 2024
1 parent 282d7b4 commit e6b5371
Show file tree
Hide file tree
Showing 43 changed files with 2,070 additions and 1,031 deletions.
20 changes: 20 additions & 0 deletions config/mainnet.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/spacemeshos/go-spacemesh/hare4"
"github.com/spacemeshos/go-spacemesh/miner"
"github.com/spacemeshos/go-spacemesh/p2p"
"github.com/spacemeshos/go-spacemesh/sync2"
"github.com/spacemeshos/go-spacemesh/syncer"
"github.com/spacemeshos/go-spacemesh/syncer/atxsync"
"github.com/spacemeshos/go-spacemesh/syncer/malsync"
Expand Down Expand Up @@ -77,6 +78,14 @@ func MainnetConfig() Config {

hare4conf := hare4.DefaultConfig()
hare4conf.Enable = false

oldAtxSyncCfg := sync2.DefaultConfig()
oldAtxSyncCfg.MultiPeerReconcilerConfig.SyncInterval = time.Hour
oldAtxSyncCfg.MaxDepth = 16
newAtxSyncCfg := sync2.DefaultConfig()
newAtxSyncCfg.MaxDepth = 21
newAtxSyncCfg.MultiPeerReconcilerConfig.SyncInterval = 5 * time.Minute

return Config{
BaseConfig: BaseConfig{
DataDirParent: defaultDataDir,
Expand Down Expand Up @@ -212,6 +221,17 @@ func MainnetConfig() Config {
DisableMeshAgreement: true,
AtxSync: atxsync.DefaultConfig(),
MalSync: malsync.DefaultConfig(),
ReconcSync: syncer.ReconcSyncConfig{
OldAtxSyncCfg: oldAtxSyncCfg,
NewAtxSyncCfg: newAtxSyncCfg,
ParallelLoadLimit: 10,
HardTimeout: 10 * time.Minute,
ServerConfig: fetch.ServerConfig{
Queue: 200,
Requests: 100,
Interval: time.Second,
},
},
},
Recovery: checkpoint.DefaultConfig(),
Cache: datastore.DefaultConfig(),
Expand Down
19 changes: 19 additions & 0 deletions config/presets/testnet.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/spacemeshos/go-spacemesh/hare4"
"github.com/spacemeshos/go-spacemesh/miner"
"github.com/spacemeshos/go-spacemesh/p2p"
"github.com/spacemeshos/go-spacemesh/sync2"
"github.com/spacemeshos/go-spacemesh/syncer"
"github.com/spacemeshos/go-spacemesh/syncer/atxsync"
"github.com/spacemeshos/go-spacemesh/syncer/malsync"
Expand Down Expand Up @@ -65,6 +66,13 @@ func testnet() config.Config {
hare4conf := hare4.DefaultConfig()
hare4conf.Enable = false
defaultdir := filepath.Join(home, "spacemesh-testnet", "/")

oldAtxSyncCfg := sync2.DefaultConfig()
oldAtxSyncCfg.MaxDepth = 16
newAtxSyncCfg := sync2.DefaultConfig()
newAtxSyncCfg.MaxDepth = 21
newAtxSyncCfg.MultiPeerReconcilerConfig.SyncInterval = 5 * time.Minute

return config.Config{
Preset: "testnet",
BaseConfig: config.BaseConfig{
Expand Down Expand Up @@ -163,6 +171,17 @@ func testnet() config.Config {
OutOfSyncThresholdLayers: 10,
AtxSync: atxsync.DefaultConfig(),
MalSync: malsync.DefaultConfig(),
ReconcSync: syncer.ReconcSyncConfig{
OldAtxSyncCfg: oldAtxSyncCfg,
NewAtxSyncCfg: newAtxSyncCfg,
ParallelLoadLimit: 10,
HardTimeout: time.Minute,
ServerConfig: fetch.ServerConfig{
Queue: 200,
Requests: 100,
Interval: time.Second,
},
},
},
Recovery: checkpoint.DefaultConfig(),
Cache: datastore.DefaultConfig(),
Expand Down
15 changes: 4 additions & 11 deletions fetch/fetch.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ type ServerConfig struct {
Interval time.Duration `mapstructure:"interval"`
}

func (s ServerConfig) toOpts() []server.Opt {
func (s ServerConfig) ToOpts() []server.Opt {
opts := []server.Opt{}
if s.Queue != 0 {
opts = append(opts, server.WithQueueSize(s.Queue))
Expand Down Expand Up @@ -270,6 +270,7 @@ func NewFetch(
cdb *datastore.CachedDB,
proposals *store.Store,
host *p2p.Host,
peerCache *peers.Peers,
opts ...Option,
) (*Fetch, error) {
bs := datastore.NewBlobStore(cdb, proposals)
Expand All @@ -293,7 +294,7 @@ func NewFetch(
opt(f)
}
f.getAtxsLimiter = semaphore.NewWeighted(f.cfg.GetAtxsConcurrency)
f.peers = peers.New()
f.peers = peerCache
// NOTE(dshulyak) this is to avoid tests refactoring.
// there is one test that covers this part.
if host != nil {
Expand Down Expand Up @@ -375,7 +376,7 @@ func (f *Fetch) registerServer(
if f.cfg.EnableServerMetrics {
opts = append(opts, server.WithMetrics())
}
opts = append(opts, f.cfg.getServerConfig(protocol).toOpts()...)
opts = append(opts, f.cfg.getServerConfig(protocol).ToOpts()...)
f.servers[protocol] = server.New(host, protocol, handler, opts...)
}

Expand Down Expand Up @@ -1008,14 +1009,6 @@ func (f *Fetch) RegisterPeerHashes(peer p2p.Peer, hashes []types.Hash32) {
f.hashToPeers.RegisterPeerHashes(peer, hashes)
}

// RegisterPeerHashes registers provided peer for a hash.
func (f *Fetch) RegisterPeerHash(peer p2p.Peer, hash types.Hash32) {
if peer == f.host.ID() {
return
}
f.hashToPeers.Add(hash, peer)
}

func (f *Fetch) SelectBestShuffled(n int) []p2p.Peer {
// shuffle to split the load between peers with good latency.
// and it avoids sticky behavior, when temporarily faulty peer had good latency in the past.
Expand Down
4 changes: 4 additions & 0 deletions fetch/fetch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/spacemeshos/go-spacemesh/common/types"
"github.com/spacemeshos/go-spacemesh/datastore"
"github.com/spacemeshos/go-spacemesh/fetch/mocks"
"github.com/spacemeshos/go-spacemesh/fetch/peers"
"github.com/spacemeshos/go-spacemesh/p2p"
"github.com/spacemeshos/go-spacemesh/p2p/pubsub"
"github.com/spacemeshos/go-spacemesh/p2p/server"
Expand Down Expand Up @@ -87,6 +88,7 @@ func createFetch(tb testing.TB) *testFetch {
cdb,
store.New(),
nil,
peers.New(),
WithContext(context.Background()),
WithConfig(cfg),
WithLogger(lg),
Expand Down Expand Up @@ -133,6 +135,7 @@ func TestFetch_Start(t *testing.T) {
cdb,
store.New(),
nil,
peers.New(),
WithContext(context.Background()),
WithConfig(DefaultConfig()),
WithLogger(lg),
Expand Down Expand Up @@ -407,6 +410,7 @@ func TestFetch_PeerDroppedWhenMessageResultsInValidationReject(t *testing.T) {
cdb,
store.New(),
h,
peers.New(),
WithContext(ctx),
WithConfig(cfg),
WithLogger(lg),
Expand Down
5 changes: 4 additions & 1 deletion fetch/limiter.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package fetch

import (
"context"

"github.com/spacemeshos/go-spacemesh/common/types"
)

type limiter interface {
Expand All @@ -10,7 +12,8 @@ type limiter interface {
}

type getHashesOpts struct {
limiter limiter
limiter limiter
callback func(types.Hash32, error)
}

type noLimit struct{}
Expand Down
36 changes: 30 additions & 6 deletions fetch/mesh_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ func (f *Fetch) GetAtxs(ctx context.Context, ids []types.ATXID, opts ...system.G
return nil
}

options := system.GetAtxOpts{}
var options system.GetAtxOpts
for _, opt := range opts {
opt(&options)
}
Expand All @@ -42,10 +42,17 @@ func (f *Fetch) GetAtxs(ctx context.Context, ids []types.ATXID, opts ...system.G
zap.Bool("limiting", !options.LimitingOff),
)
hashes := types.ATXIDsToHashes(ids)
if options.LimitingOff {
return f.getHashes(ctx, hashes, datastore.ATXDB, f.validators.atx.HandleMessage)
handler := f.validators.atx.HandleMessage
var ghOpts []getHashesOpt
if !options.LimitingOff {
ghOpts = append(ghOpts, withLimiter(f.getAtxsLimiter))
}
return f.getHashes(ctx, hashes, datastore.ATXDB, f.validators.atx.HandleMessage, withLimiter(f.getAtxsLimiter))
if options.Callback != nil {
ghOpts = append(ghOpts, withHashCallback(func(hash types.Hash32, err error) {
options.Callback(types.ATXID(hash), err)
}))
}
return f.getHashes(ctx, hashes, datastore.ATXDB, handler, ghOpts...)
}

type dataReceiver func(context.Context, types.Hash32, p2p.Peer, []byte) error
Expand All @@ -58,6 +65,12 @@ func withLimiter(l limiter) getHashesOpt {
}
}

func withHashCallback(callback func(types.Hash32, error)) getHashesOpt {
return func(o *getHashesOpts) {
o.callback = callback
}
}

func (f *Fetch) getHashes(
ctx context.Context,
hashes []types.Hash32,
Expand All @@ -66,7 +79,8 @@ func (f *Fetch) getHashes(
opts ...getHashesOpt,
) error {
options := getHashesOpts{
limiter: noLimit{},
limiter: noLimit{},
callback: func(types.Hash32, error) {},
}
for _, opt := range opts {
opt(&options)
Expand All @@ -83,18 +97,26 @@ func (f *Fetch) getHashes(
for i, hash := range hashes {
if err := options.limiter.Acquire(ctx, 1); err != nil {
pendingMetric.Add(float64(i - len(hashes)))
return fmt.Errorf("acquiring slot to get hash: %w", err)
err = fmt.Errorf("acquiring slot to get hash: %w", err)
for _, h := range hashes[i:] {
options.callback(h, err)
}
return err
}
p, err := f.getHash(ctx, hash, hint, receiver)
if err != nil {
options.limiter.Release(1)
pendingMetric.Add(float64(i - len(hashes)))
for _, h := range hashes[i:] {
options.callback(h, err)
}
return err
}
if p == nil {
// data is available locally
options.limiter.Release(1)
pendingMetric.Add(-1)
options.callback(hash, nil)
continue
}

Expand All @@ -103,6 +125,7 @@ func (f *Fetch) getHashes(
case <-ctx.Done():
options.limiter.Release(1)
pendingMetric.Add(-1)
options.callback(hash, ctx.Err())
return ctx.Err()
case <-p.completed:
options.limiter.Release(1)
Expand All @@ -118,6 +141,7 @@ func (f *Fetch) getHashes(
bfailure.Add(hash, p.err)
mu.Unlock()
}
options.callback(hash, p.err)
return nil
}
})
Expand Down
23 changes: 19 additions & 4 deletions fetch/mesh_data_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"context"
"errors"
"fmt"
"sync"
"testing"

p2phost "github.com/libp2p/go-libp2p/core/host"
Expand All @@ -20,6 +21,7 @@ import (
"github.com/spacemeshos/go-spacemesh/common/types"
"github.com/spacemeshos/go-spacemesh/datastore"
"github.com/spacemeshos/go-spacemesh/fetch/mocks"
"github.com/spacemeshos/go-spacemesh/fetch/peers"
"github.com/spacemeshos/go-spacemesh/genvm/sdk/wallet"
"github.com/spacemeshos/go-spacemesh/p2p"
"github.com/spacemeshos/go-spacemesh/p2p/peerinfo"
Expand Down Expand Up @@ -87,7 +89,7 @@ func startTestLoop(tb testing.TB, f *Fetch, eg *errgroup.Group, stop chan struct
default:
f.mu.Lock()
for h, req := range f.unprocessed {
require.NoError(tb, req.validator(req.ctx, types.Hash32{}, p2p.NoPeer, []byte{}))
require.NoError(tb, req.validator(req.ctx, h, p2p.NoPeer, []byte{}))
close(req.promise.completed)
delete(f.unprocessed, h)
}
Expand Down Expand Up @@ -596,7 +598,7 @@ func genATXs(tb testing.TB, num uint32) []*types.ActivationTx {
}

func TestGetATXs(t *testing.T) {
atxs := genATXs(t, 2)
atxs := genATXs(t, 4)
f := createFetch(t)
f.mAtxH.EXPECT().
HandleMessage(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).
Expand All @@ -607,10 +609,22 @@ func TestGetATXs(t *testing.T) {
var eg errgroup.Group
startTestLoop(t, f.Fetch, &eg, stop)

atxIDs := types.ToATXIDs(atxs)
require.NoError(t, f.GetAtxs(context.Background(), atxIDs))
atxIDs1 := types.ToATXIDs(atxs[:2])
require.NoError(t, f.GetAtxs(context.Background(), atxIDs1))

atxIDs2 := types.ToATXIDs(atxs[2:])
var recvIDs []types.ATXID
var mtx sync.Mutex
require.NoError(t, f.GetAtxs(context.Background(), atxIDs2,
system.WithATXCallback(func(id types.ATXID, err error) {
mtx.Lock()
defer mtx.Unlock()
require.NoError(t, err)
recvIDs = append(recvIDs, id)
})))
close(stop)
require.NoError(t, eg.Wait())
require.ElementsMatch(t, atxIDs2, recvIDs)
}

func TestGetActiveSet(t *testing.T) {
Expand Down Expand Up @@ -1010,6 +1024,7 @@ func Test_GetAtxsLimiting(t *testing.T) {
host, err := p2p.Upgrade(mesh.Hosts()[0])
require.NoError(t, err)
f, err := NewFetch(cdb, store.New(), host,
peers.New(),
WithContext(context.Background()),
withServers(map[string]requester{hashProtocol: client}),
WithConfig(cfg),
Expand Down
3 changes: 3 additions & 0 deletions fetch/p2p_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/spacemeshos/go-spacemesh/codec"
"github.com/spacemeshos/go-spacemesh/common/types"
"github.com/spacemeshos/go-spacemesh/datastore"
"github.com/spacemeshos/go-spacemesh/fetch/peers"
"github.com/spacemeshos/go-spacemesh/p2p"
"github.com/spacemeshos/go-spacemesh/p2p/server"
"github.com/spacemeshos/go-spacemesh/proposals/store"
Expand Down Expand Up @@ -132,6 +133,7 @@ func createP2PFetch(
tpf.serverCDB,
tpf.serverPDB,
serverHost,
peers.New(),
WithContext(ctx),
WithConfig(p2pFetchCfg(serverStreaming)),
WithLogger(lg),
Expand All @@ -153,6 +155,7 @@ func createP2PFetch(
tpf.clientCDB,
tpf.clientPDB,
clientHost,
peers.New(),
WithContext(ctx),
WithConfig(p2pFetchCfg(clientStreaming)),
WithLogger(lg),
Expand Down
Loading

0 comments on commit e6b5371

Please sign in to comment.