Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Merged by Bors] - sync2: ATX integration #6448

Closed
wants to merge 34 commits into from
Closed
Show file tree
Hide file tree
Changes from 31 commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
ca2d5c6
sql: add Database.Connection/WithConnection, interface cleanup
ivan4th Nov 11, 2024
6e3aa16
sync2: dbset: use single connection for each sync session
ivan4th Nov 11, 2024
4828b67
sync2: ATX integration
ivan4th Nov 11, 2024
eff6963
sync2: multipeer: fix edge cases
ivan4th Nov 11, 2024
4984889
sync2: dbset: fix connection leak in non-loaded DBSets
ivan4th Nov 11, 2024
a89964a
Merge branch 'sync2/dbset-conns' into sync2/atxs
ivan4th Nov 11, 2024
bb31cc6
sql: revert removing Rollback method of the Migration interface
ivan4th Nov 13, 2024
3e5a401
sql: remove Database.Connection() method, keep WithConnection()
ivan4th Nov 13, 2024
2753560
Merge branch 'feature/long-db-conns' into sync2/dbset-conns
ivan4th Nov 13, 2024
f5cae06
sql: allow multiple connections to in-memory database
ivan4th Nov 13, 2024
9e585e5
sync2: fixup for temporary OrderedSet copies
ivan4th Nov 13, 2024
042d0d7
Merge branch 'sync2/dbset-conns' into sync2/fix-multipeer
ivan4th Nov 13, 2024
294bc6c
Merge branch 'sync2/fix-multipeer' into sync2/atxs
ivan4th Nov 13, 2024
0914f1f
Merge branch 'develop' into sync2/dbset-conns
ivan4th Nov 20, 2024
1ff57ab
Merge branch 'sync2/dbset-conns' into sync2/atxs
ivan4th Nov 20, 2024
ce73f54
sync2: use saner retry scheme for fetched ATXs
ivan4th Nov 20, 2024
f411f7e
syncer: rename "v2" field to "reconcSync" in the config
ivan4th Nov 20, 2024
4ac694b
Merge branch 'sync2/dbset-conns' into sync2/fix-multipeer
ivan4th Nov 21, 2024
bb7226f
sync2: add server options and request rate limits
ivan4th Nov 20, 2024
50afbf8
Merge branch 'develop' into sync2/atxs
ivan4th Nov 24, 2024
284f836
sync2: fix mainnet/testnet configs
ivan4th Nov 24, 2024
a8d87f1
sync2: only handle synced keys during commit
ivan4th Nov 25, 2024
24768fb
Merge branch 'develop' into sync2/fix-multipeer
ivan4th Nov 26, 2024
bb54fd8
Merge branch 'develop' into sync2/atxs
ivan4th Nov 26, 2024
244ceb1
Merge branch 'sync2/fix-multipeer' into sync2/atxs
ivan4th Nov 26, 2024
9092fe7
Merge branch 'develop' into sync2/atxs
ivan4th Dec 5, 2024
2987bbc
sync2: address comments
ivan4th Dec 6, 2024
7dab83a
sync2: make most Seqs/SeqResults non-cyclic
ivan4th Dec 6, 2024
858787a
sync2: address comments
ivan4th Dec 6, 2024
f869ded
sync2: refactor ATXHandler.Commit()
ivan4th Dec 6, 2024
ad4924b
sync2: fixup
ivan4th Dec 6, 2024
8a1e693
Simplify interfaces
fasmat Dec 13, 2024
c22bd45
Fixup after Fetcher update
ivan4th Dec 15, 2024
6e40bda
Address comments
ivan4th Dec 15, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions config/mainnet.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/spacemeshos/go-spacemesh/hare4"
"github.com/spacemeshos/go-spacemesh/miner"
"github.com/spacemeshos/go-spacemesh/p2p"
"github.com/spacemeshos/go-spacemesh/sync2"
"github.com/spacemeshos/go-spacemesh/syncer"
"github.com/spacemeshos/go-spacemesh/syncer/atxsync"
"github.com/spacemeshos/go-spacemesh/syncer/malsync"
Expand Down Expand Up @@ -77,6 +78,14 @@ func MainnetConfig() Config {

hare4conf := hare4.DefaultConfig()
hare4conf.Enable = false

oldAtxSyncCfg := sync2.DefaultConfig()
oldAtxSyncCfg.MultiPeerReconcilerConfig.SyncInterval = time.Hour
oldAtxSyncCfg.MaxDepth = 16
newAtxSyncCfg := sync2.DefaultConfig()
newAtxSyncCfg.MaxDepth = 21
newAtxSyncCfg.MultiPeerReconcilerConfig.SyncInterval = 5 * time.Minute
fasmat marked this conversation as resolved.
Show resolved Hide resolved

return Config{
BaseConfig: BaseConfig{
DataDirParent: defaultDataDir,
Expand Down Expand Up @@ -212,6 +221,17 @@ func MainnetConfig() Config {
DisableMeshAgreement: true,
AtxSync: atxsync.DefaultConfig(),
MalSync: malsync.DefaultConfig(),
ReconcSync: syncer.ReconcSyncConfig{
OldAtxSyncCfg: oldAtxSyncCfg,
NewAtxSyncCfg: newAtxSyncCfg,
ParallelLoadLimit: 10,
HardTimeout: 10 * time.Minute,
ServerConfig: fetch.ServerConfig{
Queue: 200,
Requests: 100,
Interval: time.Second,
},
},
},
Recovery: checkpoint.DefaultConfig(),
Cache: datastore.DefaultConfig(),
Expand Down
19 changes: 19 additions & 0 deletions config/presets/testnet.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/spacemeshos/go-spacemesh/hare4"
"github.com/spacemeshos/go-spacemesh/miner"
"github.com/spacemeshos/go-spacemesh/p2p"
"github.com/spacemeshos/go-spacemesh/sync2"
"github.com/spacemeshos/go-spacemesh/syncer"
"github.com/spacemeshos/go-spacemesh/syncer/atxsync"
"github.com/spacemeshos/go-spacemesh/syncer/malsync"
Expand Down Expand Up @@ -65,6 +66,13 @@ func testnet() config.Config {
hare4conf := hare4.DefaultConfig()
hare4conf.Enable = false
defaultdir := filepath.Join(home, "spacemesh-testnet", "/")

oldAtxSyncCfg := sync2.DefaultConfig()
oldAtxSyncCfg.MaxDepth = 16
newAtxSyncCfg := sync2.DefaultConfig()
newAtxSyncCfg.MaxDepth = 21
newAtxSyncCfg.MultiPeerReconcilerConfig.SyncInterval = 5 * time.Minute

return config.Config{
Preset: "testnet",
BaseConfig: config.BaseConfig{
Expand Down Expand Up @@ -163,6 +171,17 @@ func testnet() config.Config {
OutOfSyncThresholdLayers: 10,
AtxSync: atxsync.DefaultConfig(),
MalSync: malsync.DefaultConfig(),
ReconcSync: syncer.ReconcSyncConfig{
OldAtxSyncCfg: oldAtxSyncCfg,
NewAtxSyncCfg: newAtxSyncCfg,
ParallelLoadLimit: 10,
HardTimeout: time.Minute,
ServerConfig: fetch.ServerConfig{
Queue: 200,
Requests: 100,
Interval: time.Second,
},
},
},
Recovery: checkpoint.DefaultConfig(),
Cache: datastore.DefaultConfig(),
Expand Down
13 changes: 11 additions & 2 deletions fetch/fetch.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"sync"
"time"

corehost "github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/protocol"
"go.uber.org/zap"
Expand Down Expand Up @@ -116,7 +117,7 @@ type ServerConfig struct {
Interval time.Duration `mapstructure:"interval"`
}

func (s ServerConfig) toOpts() []server.Opt {
func (s ServerConfig) ToOpts() []server.Opt {
opts := []server.Opt{}
if s.Queue != 0 {
opts = append(opts, server.WithQueueSize(s.Queue))
Expand Down Expand Up @@ -375,7 +376,7 @@ func (f *Fetch) registerServer(
if f.cfg.EnableServerMetrics {
opts = append(opts, server.WithMetrics())
}
opts = append(opts, f.cfg.getServerConfig(protocol).toOpts()...)
opts = append(opts, f.cfg.getServerConfig(protocol).ToOpts()...)
f.servers[protocol] = server.New(host, protocol, handler, opts...)
}

Expand Down Expand Up @@ -1025,3 +1026,11 @@ func (f *Fetch) SelectBestShuffled(n int) []p2p.Peer {
})
return peers
}

func (f *Fetch) Host() corehost.Host {
return f.host.(corehost.Host)
}

func (f *Fetch) Peers() *peers.Peers {
return f.peers
}
5 changes: 4 additions & 1 deletion fetch/limiter.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package fetch

import (
"context"

"github.com/spacemeshos/go-spacemesh/common/types"
)

type limiter interface {
Expand All @@ -10,7 +12,8 @@ type limiter interface {
}

type getHashesOpts struct {
limiter limiter
limiter limiter
callback func(types.Hash32, error)
}

type noLimit struct{}
Expand Down
36 changes: 30 additions & 6 deletions fetch/mesh_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ func (f *Fetch) GetAtxs(ctx context.Context, ids []types.ATXID, opts ...system.G
return nil
}

options := system.GetAtxOpts{}
var options system.GetAtxOpts
for _, opt := range opts {
opt(&options)
}
Expand All @@ -42,10 +42,17 @@ func (f *Fetch) GetAtxs(ctx context.Context, ids []types.ATXID, opts ...system.G
zap.Bool("limiting", !options.LimitingOff),
)
hashes := types.ATXIDsToHashes(ids)
if options.LimitingOff {
return f.getHashes(ctx, hashes, datastore.ATXDB, f.validators.atx.HandleMessage)
handler := f.validators.atx.HandleMessage
var ghOpts []getHashesOpt
if !options.LimitingOff {
ghOpts = append(ghOpts, withLimiter(f.getAtxsLimiter))
}
return f.getHashes(ctx, hashes, datastore.ATXDB, f.validators.atx.HandleMessage, withLimiter(f.getAtxsLimiter))
if options.Callback != nil {
ghOpts = append(ghOpts, withHashCallback(func(hash types.Hash32, err error) {
options.Callback(types.ATXID(hash), err)
}))
}
return f.getHashes(ctx, hashes, datastore.ATXDB, handler, ghOpts...)
}

type dataReceiver func(context.Context, types.Hash32, p2p.Peer, []byte) error
Expand All @@ -58,6 +65,12 @@ func withLimiter(l limiter) getHashesOpt {
}
}

func withHashCallback(callback func(types.Hash32, error)) getHashesOpt {
return func(o *getHashesOpts) {
o.callback = callback
}
}

func (f *Fetch) getHashes(
ctx context.Context,
hashes []types.Hash32,
Expand All @@ -66,7 +79,8 @@ func (f *Fetch) getHashes(
opts ...getHashesOpt,
) error {
options := getHashesOpts{
limiter: noLimit{},
limiter: noLimit{},
callback: func(types.Hash32, error) {},
}
for _, opt := range opts {
opt(&options)
Expand All @@ -83,18 +97,26 @@ func (f *Fetch) getHashes(
for i, hash := range hashes {
if err := options.limiter.Acquire(ctx, 1); err != nil {
pendingMetric.Add(float64(i - len(hashes)))
return fmt.Errorf("acquiring slot to get hash: %w", err)
err = fmt.Errorf("acquiring slot to get hash: %w", err)
for _, h := range hashes[i:] {
options.callback(h, err)
}
return err
fasmat marked this conversation as resolved.
Show resolved Hide resolved
}
p, err := f.getHash(ctx, hash, hint, receiver)
if err != nil {
options.limiter.Release(1)
pendingMetric.Add(float64(i - len(hashes)))
for _, h := range hashes[i:] {
options.callback(h, err)
}
fasmat marked this conversation as resolved.
Show resolved Hide resolved
return err
}
if p == nil {
// data is available locally
options.limiter.Release(1)
pendingMetric.Add(-1)
options.callback(hash, nil)
continue
}

Expand All @@ -103,6 +125,7 @@ func (f *Fetch) getHashes(
case <-ctx.Done():
options.limiter.Release(1)
pendingMetric.Add(-1)
options.callback(hash, ctx.Err())
return ctx.Err()
case <-p.completed:
options.limiter.Release(1)
Expand All @@ -118,6 +141,7 @@ func (f *Fetch) getHashes(
bfailure.Add(hash, p.err)
mu.Unlock()
}
options.callback(hash, p.err)
return nil
}
})
Expand Down
21 changes: 17 additions & 4 deletions fetch/mesh_data_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"context"
"errors"
"fmt"
"sync"
"testing"

p2phost "github.com/libp2p/go-libp2p/core/host"
Expand Down Expand Up @@ -87,7 +88,7 @@ func startTestLoop(tb testing.TB, f *Fetch, eg *errgroup.Group, stop chan struct
default:
f.mu.Lock()
for h, req := range f.unprocessed {
require.NoError(tb, req.validator(req.ctx, types.Hash32{}, p2p.NoPeer, []byte{}))
require.NoError(tb, req.validator(req.ctx, h, p2p.NoPeer, []byte{}))
close(req.promise.completed)
delete(f.unprocessed, h)
}
Expand Down Expand Up @@ -596,7 +597,7 @@ func genATXs(tb testing.TB, num uint32) []*types.ActivationTx {
}

func TestGetATXs(t *testing.T) {
atxs := genATXs(t, 2)
atxs := genATXs(t, 4)
f := createFetch(t)
f.mAtxH.EXPECT().
HandleMessage(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).
Expand All @@ -607,10 +608,22 @@ func TestGetATXs(t *testing.T) {
var eg errgroup.Group
startTestLoop(t, f.Fetch, &eg, stop)

atxIDs := types.ToATXIDs(atxs)
require.NoError(t, f.GetAtxs(context.Background(), atxIDs))
atxIDs1 := types.ToATXIDs(atxs[:2])
require.NoError(t, f.GetAtxs(context.Background(), atxIDs1))

atxIDs2 := types.ToATXIDs(atxs[2:])
var recvIDs []types.ATXID
var mtx sync.Mutex
require.NoError(t, f.GetAtxs(context.Background(), atxIDs2,
system.WithATXCallback(func(id types.ATXID, err error) {
mtx.Lock()
defer mtx.Unlock()
require.NoError(t, err)
recvIDs = append(recvIDs, id)
})))
close(stop)
require.NoError(t, eg.Wait())
require.ElementsMatch(t, atxIDs2, recvIDs)
}

func TestGetActiveSet(t *testing.T) {
Expand Down
Loading
Loading