Skip to content

Commit

Permalink
Explicit sync continues from last fully-processed ad (#2328)
Browse files Browse the repository at this point in the history
An explicit (manual) sync will do nothing if all the advertisements have been synced, but their entries have not yet been synced. This means that either a new advertisement or an explicit resync is required to process the partially synced advertisements.

A common reason for doing a manual sync is to restart a sync that failed, or was interrupted due some error. A new advertisement may not be available to restart processing the advertisement chain, and ipnisync will not send a new advertisement notification because it has already seen the latest advertisement.

Auto-sync suffers from the same problem. If a sync is interrupted after the advertisement chain is synced, but before all the entries are synced, then subsequent polls will do nothing until a new advertisement is published.
  • Loading branch information
gammazero authored Oct 6, 2023
1 parent 114e47b commit 9ef0868
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 4 deletions.
12 changes: 11 additions & 1 deletion internal/ingest/ingest.go
Original file line number Diff line number Diff line change
Expand Up @@ -437,6 +437,9 @@ func (ing *Ingester) Sync(ctx context.Context, peerInfo peer.AddrInfo, depth int
if err != nil {
return cid.Undef, fmt.Errorf("failed to get latest sync: %w", err)
}
// If explicitly syncing, then start from the last fully processed
// advertisement and not the last seen advertisement.
ing.sub.SetLatestSync(peerInfo.ID, latest)
}

if depth != 0 {
Expand Down Expand Up @@ -805,6 +808,9 @@ func (ing *Ingester) autoSync() {
}
log.Info("Auto-syncing the latest advertisement with publisher")

if latest, ok := ing.getLastKnownSync(pubID); ok {
ing.sub.SetLatestSync(pubID, latest)
}
peerInfo := peer.AddrInfo{
ID: pubID,
Addrs: []multiaddr.Multiaddr{pubAddr},
Expand All @@ -820,7 +826,8 @@ func (ing *Ingester) autoSync() {
}
}

// GetLatestSync gets the latest CID synced for the peer.
// GetLatestSync gets the latest CID synced for the peer. If no error is
// returned, then returned CID is never cid.Undef.
func (ing *Ingester) GetLatestSync(publisherID peer.ID) (cid.Cid, error) {
b, err := ing.ds.Get(context.Background(), datastore.NewKey(syncPrefix+publisherID.String()))
if err != nil {
Expand All @@ -829,10 +836,13 @@ func (ing *Ingester) GetLatestSync(publisherID peer.ID) (cid.Cid, error) {
}
return cid.Undef, err
}
// Returns error if b is nil or empty.
_, c, err := cid.CidFromBytes(b)
return c, err
}

// getLastKnownSync returns the CID of the last fully processed advertisement
// and a boolean indicating that a defined CID was successfully found.
func (ing *Ingester) getLastKnownSync(publisherID peer.ID) (cid.Cid, bool) {
c, err := ing.GetLatestSync(publisherID)
if err != nil {
Expand Down
6 changes: 3 additions & 3 deletions server/admin/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -320,19 +320,19 @@ func (h *adminHandler) handlePostSyncs(w http.ResponseWriter, r *http.Request) {
log = log.With("address", syncAddr)
}

log.Info("Syncing with peer")

// Start the sync, but do not wait for it to complete.
h.pendingSyncsLock.Lock()
if _, ok := h.pendingSyncsPeers[peerID]; ok {
h.pendingSyncsLock.Unlock()
log.Info("Manual sync ignored because another sync is in progress")
msg := fmt.Sprintf("Peer %s has already a sync in progress", peerID.String())
http.Error(w, msg, http.StatusBadRequest)
http.Error(w, msg, http.StatusConflict)
return
}
h.pendingSyncsPeers[peerID] = struct{}{}
h.pendingSyncsLock.Unlock()

log.Info("Syncing with peer")
h.pendingSyncs.Add(1)
go func() {
peerInfo := peer.AddrInfo{
Expand Down

0 comments on commit 9ef0868

Please sign in to comment.