sync2: ATX integration #6448
base: develop
Conversation
This adds the possibility to take a connection from the pool, use it via the Executor interface, and return it later when it's no longer needed. This avoids connection pool overhead in cases where a lot of queries need to be made but read transactions are not needed. Using read transactions instead of simple connections has the side effect of blocking WAL checkpoints.
Using a single connection for the multiple SQL queries executed during sync avoids noticeable overhead due to SQLite connection pool delays. This change also fixes memory overuse in DBSet: when initializing a DBSet from a database table, there's no need to use an FPTree with a big preallocated pool for the new entries that are added during recent sync.
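A minimal sketch of the intended usage pattern; Executor is reduced to a single method here and ConnPool/WithConnection are hypothetical names, not the exact go-spacemesh sql API:
package sketch

import "fmt"

// Executor stands in for the sql.Executor-style query interface mentioned
// above; the real method set differs, this is illustrative only.
type Executor interface {
	Exec(query string) error
}

// ConnPool is a hypothetical pool exposing a "borrow a connection" helper.
type ConnPool interface {
	// WithConnection takes one connection from the pool, hands it to f as an
	// Executor, and returns it to the pool when f is done.
	WithConnection(f func(Executor) error) error
}

// syncQueries runs many queries on a single borrowed connection: no per-query
// pool round-trips, and no long-lived read transaction that would block WAL
// checkpoints.
func syncQueries(pool ConnPool, queries []string) error {
	return pool.WithConnection(func(ex Executor) error {
		for _, q := range queries {
			if err := ex.Exec(q); err != nil {
				return fmt.Errorf("exec %q: %w", q, err)
			}
		}
		return nil
	})
}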
This adds set reconciliation for ATXs. There are per-epoch syncers, with a lower FPTree depth (16 by default) used for older epochs and a greater FPTree depth (21 by default) used for the current epoch. Both active syncv2 and passive (server-only) syncv2 are disabled by default. It is possible to enable syncv2 in server-only or full (active) mode.
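A rough sketch of the described per-epoch depth selection; the config struct and field names are illustrative assumptions, not the actual configuration keys:
package sketch

// ReconcSyncConfig is illustrative only: field names in the PR may differ.
type ReconcSyncConfig struct {
	Enable            bool // syncv2 is disabled by default
	EnableActiveSync  bool // false means server-only (passive) mode
	OldAtxSyncerDepth int  // FPTree depth for past epochs (default 16)
	NewAtxSyncerDepth int  // FPTree depth for the current epoch (default 21)
}

// depthFor picks the FPTree depth for an epoch's syncer.
func depthFor(cfg ReconcSyncConfig, epoch, current uint32) int {
	if epoch == current {
		return cfg.NewAtxSyncerDepth
	}
	return cfg.OldAtxSyncerDepth
}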
Split sync could become blocked when there were slow peers. Their subranges are assigned to other peers, but there were bugs causing indefinite blocking and panics in these cases. Moreover, after other peers manage to sync the slow peers' subranges ahead of them, we need to interrupt syncing against the slow peers, as it's no longer needed. In multipeer sync, when every peer has failed to sync, e.g. due to a temporary connection interruption, we don't need to wait for the full sync interval; a shorter wait time between retries is used instead.
Codecov Report
Attention: Patch coverage is …
Additional details and impacted files

@@            Coverage Diff            @@
##           develop   #6448     +/-   ##
=========================================
- Coverage     79.9%   79.8%    -0.1%
=========================================
  Files          353     354       +1
  Lines        46602   46885     +283
=========================================
+ Hits         37245   37429     +184
- Misses        7247    7330      +83
- Partials      2110    2126      +16

☔ View full report in Codecov by Sentry.
It turned out that sync interactions happen rather quickly, so it is not really practical to try to begin handling arriving keys (e.g. ATX IDs) during sync itself. Moreover, on-the-fly key handling was actually only used to register peer IDs for each ATX ID to fetch the actual blob from, and that can be done in the handler's Commit() method just as well.
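A rough sketch of that idea, with deliberately simplified, hypothetical types (the real handler and fetcher interfaces in the PR look different):
package sketch

import (
	"context"

	"github.com/spacemeshos/go-spacemesh/common/types"
	"github.com/spacemeshos/go-spacemesh/p2p"
)

// fetcher is a deliberately narrow, hypothetical stand-in for the real fetcher;
// Fetch here plays the role of the batch blob download (GetAtxs in the PR).
type fetcher interface {
	RegisterPeerHashes(peer p2p.Peer, hashes []types.Hash32)
	Fetch(ctx context.Context, hashes []types.Hash32) error
}

type handler struct{ f fetcher }

// Commit sketches the idea described above: instead of handling each key as it
// arrives during reconciliation, remember which peer advertised the IDs and
// fetch the actual blobs in one batch once the sync round is over.
func (h *handler) Commit(ctx context.Context, peer p2p.Peer, received []types.Hash32) error {
	h.f.RegisterPeerHashes(peer, received)
	return h.f.Fetch(ctx, received)
}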
Reviewing this PR took me quite some time because it builds on code that I'm not yet very familiar with. There are quite a few things that I noticed when going through the code:
We use callbacks in places where they make code more complex (and inefficient) than it needs to be. Why are errors propagated using callbacks?
I'm also unsure about the DB abstraction that syncv2 builds on. Why do we have to implement something like this ourselves? Are there no existing solutions that we can use? Is it even needed? It's another layer of abstraction for mostly simple queries as far as I can tell; it also overlaps with the existing sql/builder package that exists to solve mostly the same challenges and was newly implemented only a few months ago.
I'm becoming unsure about the extensive use of iterators in syncv2: they are inconsistent; some are actually rings, others are just slices, which leads to them behaving differently, and the only way to know is to dig into the implementation. Some uses need to know the number of elements in the iterator before working through it. Wouldn't a slice do just as well at that point? I thought we needed iterators because a) we might not know in advance how many elements will be returned and b) collecting elements eagerly would be much more inefficient than using an iterator.
Especially regarding the inefficiency of using slices over iterators: I now believe that just behaving similarly to io.Reader (but for hashes instead of bytes) would solve a lot of the complexity and remove the need for rangesync.Seq. Instead we could replace it with a rangesync.Reader that looks like this:
type Reader interface {
Read([]KeyBytes) (int, error) // what is the need for `KeyBytes` btw? isn't that always a types.Hash32?
}
The caller can still iterate over the elements (by reading in a loop), error propagation is more straightforward, and rangesync.SeqResult is then probably not needed at all? The underlying source can be a mock, a DB, some in-memory structure, even a network connection - just like for the o.g. io.Reader 🙂 wdyt?
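For illustration, a caller could drain such a Reader in a loop, much like io.Reader - here assuming io.EOF marks the end of the stream and KeyBytes is a plain byte slice (both assumptions, not part of the proposal above):
package sketch

import (
	"errors"
	"io"
)

type KeyBytes []byte

// Reader is the interface proposed in the comment above.
type Reader interface {
	Read(buf []KeyBytes) (int, error)
}

// drain shows the caller side: read into a fixed buffer until EOF,
// handling each chunk as it arrives instead of materializing everything.
func drain(r Reader, handle func(KeyBytes) error) error {
	buf := make([]KeyBytes, 1024)
	for {
		n, err := r.Read(buf)
		for _, k := range buf[:n] {
			if herr := handle(k); herr != nil {
				return herr
			}
		}
		switch {
		case errors.Is(err, io.EOF):
			return nil
		case err != nil:
			return err
		}
	}
}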
syncer/syncer.go
Outdated
func (s *Syncer) ensureMalfeasanceInSync(ctx context.Context) error {
	// TODO: use syncv2 for malfeasance proofs:
	// https://github.com/spacemeshos/go-spacemesh/issues/3987
I do not understand how this issue relates to Malfeasance sync?
See this comment: #3987 (comment)
I still don't understand how these two are related. The issue talks about sync issues in general and only mentions malfeasance proofs in passing. Your comment lists options for different sync methods.
- When and how is this issue considered resolved?
- How is this method in particular related to the issue?
Removed the link to the issue to avoid confusion. It does have to do with state sync somewhat, but is indeed more general.
err = fmt.Errorf("acquiring slot to get hash: %w", err)
for _, h := range hashes[i:] {
	options.callback(h, err)
}
return err
That seems very inefficient - possibly millions of callbacks are called to communicate the same error, while the caller probably doesn't even care about any error besides the first?
It's rather likely to be a small subset of ATX IDs, e.g. ones not downloaded due to hs/1 request throttling, and these requests will be retried later.
The point still stands - getHashes might be called with 5 million hashes at once; if the limit cannot be acquired because throttling is active, then millions of callbacks are called.
As far as I can see this only affects this section of the code:
Lines 110 to 129 in ad4924b
err := h.f.GetAtxs(ctx, cs.items, system.WithATXCallback(func(id types.ATXID, err error) {
	mtx.Lock()
	defer mtx.Unlock()
	switch {
	case err == nil:
		cs.numDownloaded++
		someSucceeded = true
		delete(cs.state, id)
	case errors.Is(err, pubsub.ErrValidationReject):
		h.logger.Debug("failed to download ATX",
			zap.String("atx", id.ShortString()), zap.Error(err))
		delete(cs.state, id)
	case cs.state[id] >= h.maxAttempts-1:
		h.logger.Debug("failed to download ATX: max attempts reached",
			zap.String("atx", id.ShortString()))
		delete(cs.state, id)
	default:
		cs.state[id]++
	}
}))
This will print a debug log with the exact same error for every ATX and increment every element in cs.state. This could be handled much more simply (and arguably more efficiently) without keeping track of the retries of every single hash:
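The original suggestion isn't preserved above; as a purely hypothetical illustration of batch-level retry accounting (the names and the getAtxsFunc shape are made up, not the PR's code):
package sketch

import (
	"context"
	"errors"
	"fmt"
)

type ATXID [32]byte

// getAtxsFunc stands in for the batch download call; it reports which IDs are
// still missing. Hypothetical - the real fetcher reports failures via per-ID
// callbacks / fetch.BatchError instead.
type getAtxsFunc func(ctx context.Context, ids []ATXID) (failed []ATXID, err error)

// downloadWithRetries keeps a single attempt counter for the whole batch and
// simply re-requests whatever is still missing, instead of tracking a retry
// count per hash.
func downloadWithRetries(ctx context.Context, get getAtxsFunc, ids []ATXID, maxAttempts int) error {
	remaining := ids
	for attempt := 0; attempt < maxAttempts && len(remaining) > 0; attempt++ {
		failed, err := get(ctx, remaining)
		if errors.Is(err, context.Canceled) {
			return err // give up immediately on cancellation
		}
		// Any other error applies to the batch as a whole; just retry what is still missing.
		remaining = failed
	}
	if len(remaining) > 0 {
		return fmt.Errorf("%d ATXs still missing after %d attempts", len(remaining), maxAttempts)
	}
	return nil
}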
Arguably this is out of the scope of this PR, but it should be addressed. It makes no sense to register a hash for a peer, then request that hash in a batch, and let the fetcher again reconstruct which peer to fetch that hash from. Error handling is also bad, because for every fetched hash an error has to be returned via callback or aggregated in a &fetcher.BatchError{}. Instead, imo it would be much simpler to just have a (blocking) getHash method that fetches a hash from a given peer and returns an error if something went wrong. Then the caller can easily parallelize requests and match errors to peers & hashes. Internally the fetcher can still group requests into single batches, request those from individual peers, and deserialise the batched result. This also makes it easier to figure out what went wrong if something did go wrong. Right now we have a lot of log errors of the kind validation ran for unknown hash and hash missing from ongoing requests, because the fetcher fails to match requests of hashes to peers and/or callers with how it is structured at the moment.
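A hedged sketch of the caller-facing shape suggested here - a blocking per-peer, per-hash call whose error maps directly to one peer and one hash; all names (HashFetcher, GetHash, fetchAll) are illustrative, not existing fetcher API:
package sketch

import (
	"context"
	"fmt"
	"sync"

	"github.com/libp2p/go-libp2p/core/peer"
)

type Hash32 [32]byte

// HashFetcher is the hypothetical interface from the comment above: blocking,
// one peer, one hash, one error - batching stays an internal fetcher concern.
type HashFetcher interface {
	GetHash(ctx context.Context, p peer.ID, hash Hash32) error
}

// fetchAll shows how a caller could parallelize and still attribute every
// error to a concrete (peer, hash) pair.
func fetchAll(ctx context.Context, f HashFetcher, want map[Hash32]peer.ID) []error {
	var (
		mtx  sync.Mutex
		errs []error
		wg   sync.WaitGroup
	)
	for hash, p := range want {
		wg.Add(1)
		go func(hash Hash32, p peer.ID) {
			defer wg.Done()
			if err := f.GetHash(ctx, p, hash); err != nil {
				mtx.Lock()
				errs = append(errs, fmt.Errorf("peer %s, hash %x: %w", p, hash, err))
				mtx.Unlock()
			}
		}(hash, p)
	}
	wg.Wait()
	return errs
}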
We should do a serious fetcher cleanup after we switch to syncv2, at least for ATXs and malfeasance proofs. Right now it's probably a bit too early, as we'd have to update the v1 syncers that are on their way out. Simplifying the blob fetch logic that currently uses promises etc. should be one of the goals, the other being removing the non-streaming fetcher client and server code.
for _, h := range hashes[i:] {
	options.callback(h, err)
}
see above
As above
Thanks a lot for the review!
You mean the fetcher interaction (…)
Passing just … The idea was that, given that we planned to introduce the rqlite/sql dependency in any case (it is to be used for migration code cleanup and clean schema comparison), we could use its SQLite AST representation for SQL query generation as well.
The tests contain the generated SQL, so it can also be seen clearly. Hand-written SQL is often error-prone, and thus it probably doesn't harm to state the intent twice in different ways :) Other than that, …
Re the need for iterators themselves, there are places in the code where the pattern of invoking … The need for cyclic iterators arises from the currently used range representation: https://github.com/spacemeshos/go-spacemesh/blob/develop/dev-docs/sync2-set-reconciliation.md#range-representation Returning temporary slices instead of iterators, which can sometimes turn out to contain millions of items, may cause substantial GC pressure and consequently unnecessarily high CPU usage, which is rather pronounced with the old sync implementation.
I'm afraid that will make e.g. FPTree traversal quite a bit more complicated, as well as the aforementioned iterator combining code. It can of course wrap an iter.Seq under the hood, but then I'm unsure whether it's worth the effort.
Made the sequences non-cyclic in most cases, except for the case of …
I added more comments to the current PR; sorry for taking a while to finish reviewing it, but it is a rather large change...
Regarding your comment:
Returning temporary slices instead of iterators, which can sometimes turn out to contain millions of items, may cause substantial GC pressure and consequently unnecessarily high CPU usage, which is rather pronounced with the old sync implementation.
I do understand this, but using iterators doesn't necessarily solve the problem when what is iterated is a slice (which it is in many cases). Slices - as long as they are not copied - are just pointers that are returned or passed as arguments. Wrapping them in an iterator doesn't necessarily mean there are fewer memory allocations or less pressure on the GC.
I do understand that especially with a DB backend it can be much more efficient to use iterators than slices. That's why I suggested an "io.Reader-inspired" alternative, where not all data has to be read into memory at once, but only as much as is really needed - similar to how the iterators behave in that case.
I'm afraid that will make e.g. FPTree traversal quite a bit more complicated, as well as the aforementioned iterator combining code. It can of course wrap an iter.Seq under the hood, but then I'm unsure whether it's worth the effort.
Yes, it would still be possible to use iter.Seq where it would simplify code, by just wrapping the interface in an iterator.
With the removal of passing along the length of the iterator and the removal of (most) cyclic cases it is probably OK now, but I still feel like in many cases it neither improves readability nor performance but just adds a layer of abstraction 🤷
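To illustrate that last point, a Reader-style source could still be exposed as an iterator where that reads better - a sketch assuming Go 1.23 iter.Seq2 and io.EOF termination (the Reader shape is the hypothetical one from earlier in this thread):
package sketch

import (
	"errors"
	"io"
	"iter"
)

type KeyBytes []byte

type Reader interface {
	Read(buf []KeyBytes) (int, error)
}

// Seq adapts a Reader to an iter.Seq2 so range-over-func code keeps working;
// the error is yielded alongside the keys instead of via a callback.
func Seq(r Reader) iter.Seq2[KeyBytes, error] {
	return func(yield func(KeyBytes, error) bool) {
		buf := make([]KeyBytes, 256)
		for {
			n, err := r.Read(buf)
			for _, k := range buf[:n] {
				if !yield(k, nil) {
					return
				}
			}
			if errors.Is(err, io.EOF) {
				return
			}
			if err != nil {
				yield(nil, err)
				return
			}
		}
	}
}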
sync2/atxs.go
Outdated
someSucceeded, err := h.getAtxs(ctx, cs)
switch {
case err == nil:
case errors.Is(err, context.Canceled):
	return err
case !errors.Is(err, &fetch.BatchError{}):
	h.logger.Debug("failed to download ATXs", zap.Error(err))
}
Sorry, I had this wrong in my previous suggestion. errors.Is is used to check for value equality, not type equality. We probably want to do this here instead:
 someSucceeded, err := h.getAtxs(ctx, cs)
+batchErr := &fetch.BatchError{}
 switch {
 case err == nil:
 case errors.Is(err, context.Canceled):
 	return err
-case !errors.Is(err, &fetch.BatchError{}):
-	h.logger.Debug("failed to download ATXs", zap.Error(err))
+case !errors.As(err, batchErr):
+	h.logger.Debug("failed to download ATXs", zap.Error(batchErr))
 }
The reason being that errors.As will check (and assign) err to batchErr if the type allows it. errors.Is does an equality check which by default will always fail unless err == &fetch.BatchError{}. If the type implements interface{ Is(error) bool } then errors.Is will use the Is method instead. fetch.BatchError has such a method that correctly checks if errors.Is would be true for any of the contained errors, so that a caller can treat a batch error as if it was an error of a single item fetched.
Fixed, but it had to be case !errors.As(err, &batchErr): because the BatchError methods have pointer receivers.
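For reference, a minimal self-contained example of why the extra & is needed when the error type's methods have pointer receivers (BatchError below is a stand-in, not the real fetch.BatchError):
package main

import (
	"errors"
	"fmt"
)

// BatchError mimics an error type whose methods have pointer receivers:
// only *BatchError implements error, the value type BatchError does not.
type BatchError struct{ Errs []error }

func (b *BatchError) Error() string { return fmt.Sprintf("batch: %d error(s)", len(b.Errs)) }

func main() {
	var err error = fmt.Errorf("wrapped: %w", &BatchError{Errs: []error{errors.New("boom")}})

	batchErr := &BatchError{}
	// errors.As needs a pointer to a type that implements error. Since only
	// *BatchError implements error, the target must be **BatchError, i.e. &batchErr.
	// errors.As(err, batchErr) would panic, because the value type BatchError
	// does not implement error.
	if errors.As(err, &batchErr) {
		fmt.Println("found:", batchErr)
	}
}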
sync2/interface.go
Outdated
type Fetcher interface {
	system.AtxFetcher
	Host() host.Host
	Peers() *peers.Peers
	RegisterPeerHash(peer p2p.Peer, hash types.Hash32)
}
I think you can just inline system.AtxFetcher:
 type Fetcher interface {
-	system.AtxFetcher
+	GetAtxs(context.Context, []types.ATXID, ...GetAtxOpt) error
 	Host() host.Host
 	Peers() *peers.Peers
 	RegisterPeerHash(peer p2p.Peer, hash types.Hash32)
 }
Interfaces should be defined on the user side, not in a system package.
With my other suggestions this can be further simplified to
type Fetcher interface {
	GetAtxs(context.Context, []types.ATXID, ...system.GetAtxOpt) error
	RegisterPeerHashes(peer p2p.Peer, hash []types.Hash32)
}
Fixed by merging your branch
sync2/interface.go
Outdated
	system.AtxFetcher
	Host() host.Host
	Peers() *peers.Peers
	RegisterPeerHash(peer p2p.Peer, hash types.Hash32)
Also this method exists twice on the fetcher implementation:
RegisterPeerHashes(peer p2p.Peer, hash []types.Hash32)
RegisterPeerHash(peer p2p.Peer, hash types.Hash32)
would it make sense to drop the second and just use the first instead?
Fixed by merging your branch
@@ -363,6 +363,7 @@ func (ft *FPTree) traverseFrom(
}

// All returns all the items currently in the tree (including those in the IDStore).
// The sequence in SeqResult is either empty or infinite.
My linter complains that there is a meaningless assertion in sync2/fptree/nodepool_test.go:
require.Nil(t, nil, idx3)
A leftover from some older code; removed.
syncer/interface.go
Outdated
@@ -47,7 +48,7 @@ type fetcher interface {
 	GetLayerOpinions(context.Context, p2p.Peer, types.LayerID) ([]byte, error)
 	GetCert(context.Context, types.LayerID, types.BlockID, []p2p.Peer) (*types.Certificate, error)

-	system.AtxFetcher
+	sync2.Fetcher
This interface can be simplified to:
// fetcher is the interface to the low-level fetching.
type fetcher interface {
GetLayerData(context.Context, p2p.Peer, types.LayerID) ([]byte, error)
GetLayerOpinions(context.Context, p2p.Peer, types.LayerID) ([]byte, error)
GetCert(context.Context, types.LayerID, types.BlockID, []p2p.Peer) (*types.Certificate, error)
sync2.Fetcher
GetMalfeasanceProofs(context.Context, []types.NodeID) error
GetBallots(context.Context, []types.BallotID) error
GetBlocks(context.Context, []types.BlockID) error
RegisterPeerHashes(peer p2p.Peer, hashes []types.Hash32)
SelectBestShuffled(int) []p2p.Peer
PeerEpochInfo(context.Context, p2p.Peer, types.EpochID) (*fetch.EpochData, error)
PeerMeshHashes(context.Context, p2p.Peer, *fetch.MeshHashRequest) (*fetch.MeshHashes, error)
}
With my other suggestions this can be further simplified to:
type fetcher interface {
GetLayerData(context.Context, p2p.Peer, types.LayerID) ([]byte, error)
GetLayerOpinions(context.Context, p2p.Peer, types.LayerID) ([]byte, error)
GetCert(context.Context, types.LayerID, types.BlockID, []p2p.Peer) (*types.Certificate, error)
GetAtxs(context.Context, []types.ATXID, ...system.GetAtxOpt) error
GetMalfeasanceProofs(context.Context, []types.NodeID) error
GetBallots(context.Context, []types.BallotID) error
GetBlocks(context.Context, []types.BlockID) error
RegisterPeerHashes(peer p2p.Peer, hashes []types.Hash32)
SelectBestShuffled(int) []p2p.Peer
PeerEpochInfo(context.Context, p2p.Peer, types.EpochID) (*fetch.EpochData, error)
PeerMeshHashes(context.Context, p2p.Peer, *fetch.MeshHashRequest) (*fetch.MeshHashes, error)
}
Fixed by merging your branch
syncer/syncer.go
Outdated
serverOpts := append(
	s.cfg.ReconcSync.ServerConfig.ToOpts(),
	server.WithHardTimeout(s.cfg.ReconcSync.HardTimeout))
s.dispatcher = sync2.NewDispatcher(s.logger, fetcher.(sync2.Fetcher), serverOpts)
This cast shouldn't be necessary:
-s.dispatcher = sync2.NewDispatcher(s.logger, fetcher.(sync2.Fetcher), serverOpts)
+s.dispatcher = sync2.NewDispatcher(s.logger, fetcher, serverOpts)
Also this here is the reason why the sync2.Fetcher interface needs to expose the Host method. I think instead it would be better to pass the host explicitly to NewSyncer and remove the method from the interface and the fetch.Fetch implementation.
Fixed by merging your branch
sync2/atxs.go
Outdated
curSet := dbset.NewDBSet(db, atxsTable(epoch), 32, cfg.MaxDepth)
handler := NewATXHandler(logger, f, cfg.BatchSize, cfg.MaxAttempts,
	cfg.MaxBatchRetries, cfg.FailedBatchDelay, nil)
return NewP2PHashSync(logger, d, name, curSet, 32, f.Peers(), handler, cfg, enableActiveSync)
This is the only use of the fetcher.Peers() method. Just like with host, the peers.Peers service should be passed explicitly to NewATXSyncer and not indirectly via the fetcher. For this it needs to be initialised in node.go and passed to both the fetcher and here, instead of being constructed in the fetcher.
Fixed by merging your branch
syncer/syncer.go
Outdated
hss := sync2.NewATXSyncSource(
	s.logger, s.dispatcher, cdb.Database.(sql.StateDatabase),
	fetcher.(sync2.Fetcher), s.cfg.ReconcSync.EnableActiveSync)
See my other comments about the sync2.Fetcher and syncer.fetcher interfaces.
Motivation
We need more efficient ATX sync.
Description
This adds set reconciliation for ATXs. There are per-epoch syncers, with a lower FPTree depth (16 by default) used for older epochs and a greater FPTree depth (21 by default) used for the current epoch.
Both active syncv2 and passive (server-only) syncv2 are disabled by default. It is possible to enable syncv2 in server-only or full (active) mode.
Test Plan
Test on testnet and mainnet nodes
TODO