diff --git a/internal/ingest/ingest.go b/internal/ingest/ingest.go index dfd32948d..30451d49b 100644 --- a/internal/ingest/ingest.go +++ b/internal/ingest/ingest.go @@ -437,6 +437,9 @@ func (ing *Ingester) Sync(ctx context.Context, peerInfo peer.AddrInfo, depth int if err != nil { return cid.Undef, fmt.Errorf("failed to get latest sync: %w", err) } + // If explicitly syncing, then start from the last fully processed + // advertisement and not the last seen advertisement. + ing.sub.SetLatestSync(peerInfo.ID, latest) } if depth != 0 { @@ -805,6 +808,9 @@ func (ing *Ingester) autoSync() { } log.Info("Auto-syncing the latest advertisement with publisher") + if latest, ok := ing.getLastKnownSync(pubID); ok { + ing.sub.SetLatestSync(pubID, latest) + } peerInfo := peer.AddrInfo{ ID: pubID, Addrs: []multiaddr.Multiaddr{pubAddr}, @@ -820,7 +826,8 @@ func (ing *Ingester) autoSync() { } } -// GetLatestSync gets the latest CID synced for the peer. +// GetLatestSync gets the latest CID synced for the peer. If no error is +// returned, then returned CID is never cid.Undef. func (ing *Ingester) GetLatestSync(publisherID peer.ID) (cid.Cid, error) { b, err := ing.ds.Get(context.Background(), datastore.NewKey(syncPrefix+publisherID.String())) if err != nil { @@ -829,10 +836,13 @@ func (ing *Ingester) GetLatestSync(publisherID peer.ID) (cid.Cid, error) { } return cid.Undef, err } + // Returns error if b is nil or empty. _, c, err := cid.CidFromBytes(b) return c, err } +// getLastKnownSync returns the CID of the last fully processed advertisement +// and a boolean indicating that a defined CID was successfully found. func (ing *Ingester) getLastKnownSync(publisherID peer.ID) (cid.Cid, bool) { c, err := ing.GetLatestSync(publisherID) if err != nil { diff --git a/server/admin/handler.go b/server/admin/handler.go index f4044b53e..97f6b3cb1 100644 --- a/server/admin/handler.go +++ b/server/admin/handler.go @@ -320,19 +320,19 @@ func (h *adminHandler) handlePostSyncs(w http.ResponseWriter, r *http.Request) { log = log.With("address", syncAddr) } - log.Info("Syncing with peer") - // Start the sync, but do not wait for it to complete. h.pendingSyncsLock.Lock() if _, ok := h.pendingSyncsPeers[peerID]; ok { h.pendingSyncsLock.Unlock() + log.Info("Manual sync ignored because another sync is in progress") msg := fmt.Sprintf("Peer %s has already a sync in progress", peerID.String()) - http.Error(w, msg, http.StatusBadRequest) + http.Error(w, msg, http.StatusConflict) return } h.pendingSyncsPeers[peerID] = struct{}{} h.pendingSyncsLock.Unlock() + log.Info("Syncing with peer") h.pendingSyncs.Add(1) go func() { peerInfo := peer.AddrInfo{