Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

sync2: ATX integration #6448

Open
wants to merge 34 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 26 commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
ca2d5c6
sql: add Database.Connection/WithConnection, interface cleanup
ivan4th Nov 11, 2024
6e3aa16
sync2: dbset: use single connection for each sync session
ivan4th Nov 11, 2024
4828b67
sync2: ATX integration
ivan4th Nov 11, 2024
eff6963
sync2: multipeer: fix edge cases
ivan4th Nov 11, 2024
4984889
sync2: dbset: fix connection leak in non-loaded DBSets
ivan4th Nov 11, 2024
a89964a
Merge branch 'sync2/dbset-conns' into sync2/atxs
ivan4th Nov 11, 2024
bb31cc6
sql: revert removing Rollback method of the Migration interface
ivan4th Nov 13, 2024
3e5a401
sql: remove Database.Connection() method, keep WithConnection()
ivan4th Nov 13, 2024
2753560
Merge branch 'feature/long-db-conns' into sync2/dbset-conns
ivan4th Nov 13, 2024
f5cae06
sql: allow multiple connections to in-memory database
ivan4th Nov 13, 2024
9e585e5
sync2: fixup for temporary OrderedSet copies
ivan4th Nov 13, 2024
042d0d7
Merge branch 'sync2/dbset-conns' into sync2/fix-multipeer
ivan4th Nov 13, 2024
294bc6c
Merge branch 'sync2/fix-multipeer' into sync2/atxs
ivan4th Nov 13, 2024
0914f1f
Merge branch 'develop' into sync2/dbset-conns
ivan4th Nov 20, 2024
1ff57ab
Merge branch 'sync2/dbset-conns' into sync2/atxs
ivan4th Nov 20, 2024
ce73f54
sync2: use saner retry scheme for fetched ATXs
ivan4th Nov 20, 2024
f411f7e
syncer: rename "v2" field to "reconcSync" in the config
ivan4th Nov 20, 2024
4ac694b
Merge branch 'sync2/dbset-conns' into sync2/fix-multipeer
ivan4th Nov 21, 2024
bb7226f
sync2: add server options and request rate limits
ivan4th Nov 20, 2024
50afbf8
Merge branch 'develop' into sync2/atxs
ivan4th Nov 24, 2024
284f836
sync2: fix mainnet/testnet configs
ivan4th Nov 24, 2024
a8d87f1
sync2: only handle synced keys during commit
ivan4th Nov 25, 2024
24768fb
Merge branch 'develop' into sync2/fix-multipeer
ivan4th Nov 26, 2024
bb54fd8
Merge branch 'develop' into sync2/atxs
ivan4th Nov 26, 2024
244ceb1
Merge branch 'sync2/fix-multipeer' into sync2/atxs
ivan4th Nov 26, 2024
9092fe7
Merge branch 'develop' into sync2/atxs
ivan4th Dec 5, 2024
2987bbc
sync2: address comments
ivan4th Dec 6, 2024
7dab83a
sync2: make most Seqs/SeqResults non-cyclic
ivan4th Dec 6, 2024
858787a
sync2: address comments
ivan4th Dec 6, 2024
f869ded
sync2: refactor ATXHandler.Commit()
ivan4th Dec 6, 2024
ad4924b
sync2: fixup
ivan4th Dec 6, 2024
8a1e693
Simplify interfaces
fasmat Dec 13, 2024
c22bd45
Fixup after Fetcher update
ivan4th Dec 15, 2024
6e40bda
Address comments
ivan4th Dec 15, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions config/mainnet.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/spacemeshos/go-spacemesh/hare4"
"github.com/spacemeshos/go-spacemesh/miner"
"github.com/spacemeshos/go-spacemesh/p2p"
"github.com/spacemeshos/go-spacemesh/sync2"
"github.com/spacemeshos/go-spacemesh/syncer"
"github.com/spacemeshos/go-spacemesh/syncer/atxsync"
"github.com/spacemeshos/go-spacemesh/syncer/malsync"
Expand Down Expand Up @@ -77,6 +78,14 @@ func MainnetConfig() Config {

hare4conf := hare4.DefaultConfig()
hare4conf.Enable = false

oldAtxSyncCfg := sync2.DefaultConfig()
oldAtxSyncCfg.MultiPeerReconcilerConfig.SyncInterval = time.Hour
oldAtxSyncCfg.MaxDepth = 16
newAtxSyncCfg := sync2.DefaultConfig()
newAtxSyncCfg.MaxDepth = 21
newAtxSyncCfg.MultiPeerReconcilerConfig.SyncInterval = 5 * time.Minute
fasmat marked this conversation as resolved.
Show resolved Hide resolved

return Config{
BaseConfig: BaseConfig{
DataDirParent: defaultDataDir,
Expand Down Expand Up @@ -212,6 +221,17 @@ func MainnetConfig() Config {
DisableMeshAgreement: true,
AtxSync: atxsync.DefaultConfig(),
MalSync: malsync.DefaultConfig(),
ReconcSync: syncer.ReconcSyncConfig{
OldAtxSyncCfg: oldAtxSyncCfg,
NewAtxSyncCfg: newAtxSyncCfg,
ParallelLoadLimit: 10,
HardTimeout: 10 * time.Minute,
ServerConfig: fetch.ServerConfig{
Queue: 200,
Requests: 100,
Interval: time.Second,
},
},
},
Recovery: checkpoint.DefaultConfig(),
Cache: datastore.DefaultConfig(),
Expand Down
19 changes: 19 additions & 0 deletions config/presets/testnet.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/spacemeshos/go-spacemesh/hare4"
"github.com/spacemeshos/go-spacemesh/miner"
"github.com/spacemeshos/go-spacemesh/p2p"
"github.com/spacemeshos/go-spacemesh/sync2"
"github.com/spacemeshos/go-spacemesh/syncer"
"github.com/spacemeshos/go-spacemesh/syncer/atxsync"
"github.com/spacemeshos/go-spacemesh/syncer/malsync"
Expand Down Expand Up @@ -65,6 +66,13 @@ func testnet() config.Config {
hare4conf := hare4.DefaultConfig()
hare4conf.Enable = false
defaultdir := filepath.Join(home, "spacemesh-testnet", "/")

oldAtxSyncCfg := sync2.DefaultConfig()
oldAtxSyncCfg.MaxDepth = 16
newAtxSyncCfg := sync2.DefaultConfig()
newAtxSyncCfg.MaxDepth = 21
newAtxSyncCfg.MultiPeerReconcilerConfig.SyncInterval = 5 * time.Minute

return config.Config{
Preset: "testnet",
BaseConfig: config.BaseConfig{
Expand Down Expand Up @@ -163,6 +171,17 @@ func testnet() config.Config {
OutOfSyncThresholdLayers: 10,
AtxSync: atxsync.DefaultConfig(),
MalSync: malsync.DefaultConfig(),
ReconcSync: syncer.ReconcSyncConfig{
OldAtxSyncCfg: oldAtxSyncCfg,
NewAtxSyncCfg: newAtxSyncCfg,
ParallelLoadLimit: 10,
HardTimeout: time.Minute,
ServerConfig: fetch.ServerConfig{
Queue: 200,
Requests: 100,
Interval: time.Second,
},
},
},
Recovery: checkpoint.DefaultConfig(),
Cache: datastore.DefaultConfig(),
Expand Down
13 changes: 11 additions & 2 deletions fetch/fetch.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
"sync"
"time"

corehost "github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/protocol"
"go.uber.org/zap"
Expand Down Expand Up @@ -116,7 +117,7 @@
Interval time.Duration `mapstructure:"interval"`
}

func (s ServerConfig) toOpts() []server.Opt {
func (s ServerConfig) ToOpts() []server.Opt {
opts := []server.Opt{}
if s.Queue != 0 {
opts = append(opts, server.WithQueueSize(s.Queue))
Expand Down Expand Up @@ -375,7 +376,7 @@
if f.cfg.EnableServerMetrics {
opts = append(opts, server.WithMetrics())
}
opts = append(opts, f.cfg.getServerConfig(protocol).toOpts()...)
opts = append(opts, f.cfg.getServerConfig(protocol).ToOpts()...)
f.servers[protocol] = server.New(host, protocol, handler, opts...)
}

Expand Down Expand Up @@ -1025,3 +1026,11 @@
})
return peers
}

func (f *Fetch) Host() corehost.Host {
return f.host.(corehost.Host)

Check warning on line 1031 in fetch/fetch.go

View check run for this annotation

Codecov / codecov/patch

fetch/fetch.go#L1030-L1031

Added lines #L1030 - L1031 were not covered by tests
}

func (f *Fetch) Peers() *peers.Peers {
return f.peers

Check warning on line 1035 in fetch/fetch.go

View check run for this annotation

Codecov / codecov/patch

fetch/fetch.go#L1034-L1035

Added lines #L1034 - L1035 were not covered by tests
}
5 changes: 4 additions & 1 deletion fetch/limiter.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package fetch

import (
"context"

"github.com/spacemeshos/go-spacemesh/common/types"
)

type limiter interface {
Expand All @@ -10,7 +12,8 @@ type limiter interface {
}

type getHashesOpts struct {
limiter limiter
limiter limiter
callback func(types.Hash32, error)
}

type noLimit struct{}
Expand Down
36 changes: 30 additions & 6 deletions fetch/mesh_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
return nil
}

options := system.GetAtxOpts{}
var options system.GetAtxOpts
for _, opt := range opts {
opt(&options)
}
Expand All @@ -42,10 +42,17 @@
zap.Bool("limiting", !options.LimitingOff),
)
hashes := types.ATXIDsToHashes(ids)
if options.LimitingOff {
return f.getHashes(ctx, hashes, datastore.ATXDB, f.validators.atx.HandleMessage)
handler := f.validators.atx.HandleMessage
var ghOpts []getHashesOpt
if !options.LimitingOff {
ghOpts = append(ghOpts, withLimiter(f.getAtxsLimiter))
}
return f.getHashes(ctx, hashes, datastore.ATXDB, f.validators.atx.HandleMessage, withLimiter(f.getAtxsLimiter))
if options.Callback != nil {
ghOpts = append(ghOpts, withHashCallback(func(hash types.Hash32, err error) {
options.Callback(types.ATXID(hash), err)
}))
}
return f.getHashes(ctx, hashes, datastore.ATXDB, handler, ghOpts...)
}

type dataReceiver func(context.Context, types.Hash32, p2p.Peer, []byte) error
Expand All @@ -58,6 +65,12 @@
}
}

func withHashCallback(callback func(types.Hash32, error)) getHashesOpt {
return func(o *getHashesOpts) {
o.callback = callback
}
}

func (f *Fetch) getHashes(
ctx context.Context,
hashes []types.Hash32,
Expand All @@ -66,7 +79,8 @@
opts ...getHashesOpt,
) error {
options := getHashesOpts{
limiter: noLimit{},
limiter: noLimit{},
callback: func(types.Hash32, error) {},
}
for _, opt := range opts {
opt(&options)
Expand All @@ -83,18 +97,26 @@
for i, hash := range hashes {
if err := options.limiter.Acquire(ctx, 1); err != nil {
pendingMetric.Add(float64(i - len(hashes)))
return fmt.Errorf("acquiring slot to get hash: %w", err)
err = fmt.Errorf("acquiring slot to get hash: %w", err)
for _, h := range hashes[i:] {
options.callback(h, err)
}
return err

Check warning on line 104 in fetch/mesh_data.go

View check run for this annotation

Codecov / codecov/patch

fetch/mesh_data.go#L100-L104

Added lines #L100 - L104 were not covered by tests
Comment on lines +100 to +104
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That seems very inefficient calling possibly millions of callbacks to communicate the same error where the caller side probably doesn't even care about any error besides the first?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's rather likely to be a small subset of ATX IDs e.g. not downloaded due to hs/1 request throttling, and these requests will be retried later

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The point still stands - getHashes might be called with 5.0 Mio hashes at once - if the limit cannot be acquired because throttling is active then millions of callbacks are called.

As far as I can see this only affects this section of the code:

go-spacemesh/sync2/atxs.go

Lines 110 to 129 in ad4924b

err := h.f.GetAtxs(ctx, cs.items, system.WithATXCallback(func(id types.ATXID, err error) {
mtx.Lock()
defer mtx.Unlock()
switch {
case err == nil:
cs.numDownloaded++
someSucceeded = true
delete(cs.state, id)
case errors.Is(err, pubsub.ErrValidationReject):
h.logger.Debug("failed to download ATX",
zap.String("atx", id.ShortString()), zap.Error(err))
delete(cs.state, id)
case cs.state[id] >= h.maxAttempts-1:
h.logger.Debug("failed to download ATX: max attempts reached",
zap.String("atx", id.ShortString()))
delete(cs.state, id)
default:
cs.state[id]++
}
}))

This will print a debug log with the exact same error for every ATX and increment every element in cs.state. This could be handled much simpler (and arguably more efficiently) without requiring to keep track of the retries of every single hash:

Arguably this is out of the scope of this PR but this should be addressed. It makes no sense to register a hash for a peer then requesting that hash in a batch and let the fetcher again reconstruct from which peer to fetch that hash from. Error handling is also bad, because for every fetched hash an error has to be returned via callback or aggregated in a &fetcher.BatchError{}. Instead imo it would be much simpler to just have a (blocking) getHash method that fetches a hash from a given peer and returns an error if something went wrong. Then the caller can easily parallize requests and match errors to peers & hashes.

Internally the fetcher can still group requests into single batches, request those from individual peers and deserialise the batched result. This also makes it easier to figure out what when wrong if something did go wrong. Right now we have a lot of log errors of the kind validation ran for unknown hash and hash missing from ongoing requests because the fetcher fails to match requests of hashes to peers and/or callers with how it is structured at the moment.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should do a serious fetcher cleanup after we switch to syncv2 at least for ATXs and malfeasance proofs. Right now it's probably a bit too early as we'll have to update v1 syncers that are on their way out.
Simplifying blob fetch logic that currently uses promises etc. should be one of the goals, other being removing non-streaming fetcher client and server code.

}
p, err := f.getHash(ctx, hash, hint, receiver)
if err != nil {
options.limiter.Release(1)
pendingMetric.Add(float64(i - len(hashes)))
for _, h := range hashes[i:] {
options.callback(h, err)
}

Check warning on line 112 in fetch/mesh_data.go

View check run for this annotation

Codecov / codecov/patch

fetch/mesh_data.go#L110-L112

Added lines #L110 - L112 were not covered by tests
Comment on lines +110 to +112
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

see above

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As above

return err
}
if p == nil {
// data is available locally
options.limiter.Release(1)
pendingMetric.Add(-1)
options.callback(hash, nil)

Check warning on line 119 in fetch/mesh_data.go

View check run for this annotation

Codecov / codecov/patch

fetch/mesh_data.go#L119

Added line #L119 was not covered by tests
continue
}

Expand All @@ -103,6 +125,7 @@
case <-ctx.Done():
options.limiter.Release(1)
pendingMetric.Add(-1)
options.callback(hash, ctx.Err())

Check warning on line 128 in fetch/mesh_data.go

View check run for this annotation

Codecov / codecov/patch

fetch/mesh_data.go#L128

Added line #L128 was not covered by tests
return ctx.Err()
case <-p.completed:
options.limiter.Release(1)
Expand All @@ -118,6 +141,7 @@
bfailure.Add(hash, p.err)
mu.Unlock()
}
options.callback(hash, p.err)
return nil
}
})
Expand Down
21 changes: 17 additions & 4 deletions fetch/mesh_data_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"context"
"errors"
"fmt"
"sync"
"testing"

p2phost "github.com/libp2p/go-libp2p/core/host"
Expand Down Expand Up @@ -87,7 +88,7 @@ func startTestLoop(tb testing.TB, f *Fetch, eg *errgroup.Group, stop chan struct
default:
f.mu.Lock()
for h, req := range f.unprocessed {
require.NoError(tb, req.validator(req.ctx, types.Hash32{}, p2p.NoPeer, []byte{}))
require.NoError(tb, req.validator(req.ctx, h, p2p.NoPeer, []byte{}))
close(req.promise.completed)
delete(f.unprocessed, h)
}
Expand Down Expand Up @@ -596,7 +597,7 @@ func genATXs(tb testing.TB, num uint32) []*types.ActivationTx {
}

func TestGetATXs(t *testing.T) {
atxs := genATXs(t, 2)
atxs := genATXs(t, 4)
f := createFetch(t)
f.mAtxH.EXPECT().
HandleMessage(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).
Expand All @@ -607,10 +608,22 @@ func TestGetATXs(t *testing.T) {
var eg errgroup.Group
startTestLoop(t, f.Fetch, &eg, stop)

atxIDs := types.ToATXIDs(atxs)
require.NoError(t, f.GetAtxs(context.Background(), atxIDs))
atxIDs1 := types.ToATXIDs(atxs[:2])
require.NoError(t, f.GetAtxs(context.Background(), atxIDs1))

atxIDs2 := types.ToATXIDs(atxs[2:])
var recvIDs []types.ATXID
var mtx sync.Mutex
require.NoError(t, f.GetAtxs(context.Background(), atxIDs2,
system.WithATXCallback(func(id types.ATXID, err error) {
mtx.Lock()
defer mtx.Unlock()
require.NoError(t, err)
recvIDs = append(recvIDs, id)
})))
close(stop)
require.NoError(t, eg.Wait())
require.ElementsMatch(t, atxIDs2, recvIDs)
}

func TestGetActiveSet(t *testing.T) {
Expand Down
Loading
Loading