diff --git a/dagsync/option.go b/dagsync/option.go
index 0a9682f..f031481 100644
--- a/dagsync/option.go
+++ b/dagsync/option.go
@@ -45,6 +45,7 @@ type config struct {
 
 	adsDepthLimit     int64
 	entriesDepthLimit int64
+	firstSyncDepth    int64
 	segDepthLimit     int64
 
 	strictAdsSelSeq bool
@@ -166,6 +167,18 @@ func EntriesDepthLimit(depth int64) Option {
 	}
 }
 
+// FirstSyncDepth sets the advertisement chain depth to sync on the first sync
+// with a new provider. A value of 0, the default, means unlimited depth.
+func FirstSyncDepth(depth int64) Option {
+	return func(c *config) error {
+		if depth < 0 {
+			depth = 0
+		}
+		c.firstSyncDepth = depth
+		return nil
+	}
+}
+
 // SegmentDepthLimit sets the maximum recursion depth limit for a segmented sync.
 // Setting the depth to a value less than zero disables segmented sync completely.
 // Disabled by default.
diff --git a/dagsync/subscriber.go b/dagsync/subscriber.go
index 8c5db3b..dc76fd6 100644
--- a/dagsync/subscriber.go
+++ b/dagsync/subscriber.go
@@ -99,8 +99,9 @@ type Subscriber struct {
 	// async syncs.
 	syncSem chan struct{}
 
-	adsDepthLimit selector.RecursionLimit
-	segDepthLimit int64
+	adsDepthLimit  selector.RecursionLimit
+	firstSyncDepth int64
+	segDepthLimit  int64
 
 	receiver *announce.Receiver
 
@@ -135,7 +136,7 @@ type SyncFinished struct {
 	// PeerID identifies the peer this SyncFinished event pertains to. This is
 	// the publisher of the advertisement chain.
 	PeerID peer.ID
-	// Count is the number of CID synced.
+	// Count is the number of CIDs synced.
 	Count int
 	// Err is used to return a failure to complete an asynchronous sync in
 	// response to an announcement.
@@ -223,8 +224,9 @@ func NewSubscriber(host host.Host, lsys ipld.LinkSystem, options ...Option) (*Su
 		latestSyncHandler: latestSyncHandler{},
 		lastKnownSync:     opts.lastKnownSync,
 
-		adsDepthLimit: recursionLimit(opts.adsDepthLimit),
-		segDepthLimit: opts.segDepthLimit,
+		adsDepthLimit:  recursionLimit(opts.adsDepthLimit),
+		firstSyncDepth: opts.firstSyncDepth,
+		segDepthLimit:  opts.segDepthLimit,
 
 		selectorOne: ssb.ExploreRecursive(selector.RecursionLimitDepth(0), all).Node(),
 		selectorAll: ssb.ExploreRecursive(selector.RecursionLimitNone(), all).Node(),
@@ -463,6 +465,8 @@ func (s *Subscriber) SyncAdChain(ctx context.Context, peerInfo peer.AddrInfo, op
 			log.Infow("cid to sync to is the stop node. Nothing to do")
 			return nextCid, nil
 		}
+	} else if s.firstSyncDepth != 0 && opts.depthLimit == 0 {
+		depthLimit = recursionLimit(s.firstSyncDepth)
 	}
 
 	log = log.With("cid", nextCid)
@@ -842,6 +846,7 @@ func (h *handler) asyncSyncAdChain(ctx context.Context) {
 		return
 	}
 
+	adsDepthLimit := h.subscriber.adsDepthLimit
 	nextCid := amsg.Cid
 	latestSyncLink := h.subscriber.GetLatestSync(h.peerID)
 	var stopAtCid cid.Cid
@@ -851,9 +856,12 @@ func (h *handler) asyncSyncAdChain(ctx context.Context) {
 			log.Infow("CID to sync to is the stop node. Nothing to do.", "peer", h.peerID)
 			return
 		}
+	} else if h.subscriber.firstSyncDepth != 0 {
+		// If nothing synced yet, use first sync depth if configured.
+		adsDepthLimit = recursionLimit(h.subscriber.firstSyncDepth)
 	}
 
-	sel := ExploreRecursiveWithStopNode(h.subscriber.adsDepthLimit, h.subscriber.adsSelectorSeq, latestSyncLink)
+	sel := ExploreRecursiveWithStopNode(adsDepthLimit, h.subscriber.adsSelectorSeq, latestSyncLink)
 	syncCount, err := h.handle(ctx, nextCid, sel, syncer, h.subscriber.generalBlockHook, h.subscriber.segDepthLimit, stopAtCid)
 	if err != nil {
 		// Failed to handle the sync, so allow another announce for the same CID.
diff --git a/dagsync/sync_test.go b/dagsync/sync_test.go
index f9e21ef..eeca396 100644
--- a/dagsync/sync_test.go
+++ b/dagsync/sync_test.go
@@ -58,6 +58,59 @@ func TestLatestSyncSuccess(t *testing.T) {
 	senderAnnounceTest(t, pub, p2pSender, sub, dstStore, watcher, srcHost.ID(), chainLnks[0])
 }
 
+func TestFirstSyncDepth(t *testing.T) {
+	srcStore := dssync.MutexWrap(datastore.NewMapDatastore())
+	dstStore := dssync.MutexWrap(datastore.NewMapDatastore())
+	srcHost, srcPrivKey := test.MkTestHostPK(t)
+	srcLnkS := test.MkLinkSystem(srcStore)
+
+	dstHost := test.MkTestHost(t)
+	srcHost.Peerstore().AddAddrs(dstHost.ID(), dstHost.Addrs(), time.Hour)
+	dstHost.Peerstore().AddAddrs(srcHost.ID(), srcHost.Addrs(), time.Hour)
+	dstLnkS := test.MkLinkSystem(dstStore)
+
+	pub, err := ipnisync.NewPublisher(srcLnkS, srcPrivKey, ipnisync.WithStreamHost(srcHost))
+	require.NoError(t, err)
+	defer pub.Close()
+
+	sub, err := dagsync.NewSubscriber(dstHost, dstLnkS,
+		dagsync.FirstSyncDepth(1),
+		dagsync.RecvAnnounce(testTopic),
+		dagsync.StrictAdsSelector(false))
+	require.NoError(t, err)
+	defer sub.Close()
+
+	watcher, cncl := sub.OnSyncFinished()
+	defer cncl()
+
+	// Store the whole chain in source node
+	chainLnks := test.MkChain(srcLnkS, true)
+
+	lnk := chainLnks[0]
+	adCid := lnk.(cidlink.Link).Cid
+
+	pub.SetRoot(adCid)
+	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+	defer cancel()
+	pubInfo := peer.AddrInfo{
+		ID:    pub.ID(),
+		Addrs: pub.Addrs(),
+	}
+	err = sub.Announce(ctx, adCid, pubInfo)
+	require.NoError(t, err)
+
+	select {
+	case <-ctx.Done():
+		require.FailNow(t, "timed out waiting for sync to propagate")
+	case syncDone, open := <-watcher:
+		require.True(t, open, "event channel closed without receiving event")
+		require.Equal(t, adCid, syncDone.Cid, "sync returned unexpected cid")
+		_, err := dstStore.Get(context.Background(), datastore.NewKey(adCid.String()))
+		require.NoError(t, err, "data not in receiver store")
+		require.Equal(t, 1, syncDone.Count)
+	}
+}
+
 func TestSyncFn(t *testing.T) {
 	t.Parallel()
 	srcStore := dssync.MutexWrap(datastore.NewMapDatastore())
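For reviewers, below is a minimal sketch of how a caller might wire up the new option outside of the test harness. The helper name and surrounding wiring are illustrative only, and the import paths assume this change lands in the go-libipni module; only dagsync.NewSubscriber and dagsync.FirstSyncDepth come from this diff.

package main

import (
	"github.com/ipld/go-ipld-prime"
	"github.com/libp2p/go-libp2p/core/host"

	"github.com/ipni/go-libipni/dagsync"
)

// newShallowFirstSubscriber is a hypothetical helper showing FirstSyncDepth in
// context. When a publisher has no recorded latest sync, only the most recent
// advertisement is fetched; later syncs use the regular ads depth limit, and an
// explicit per-sync depth limit passed to SyncAdChain still takes precedence.
func newShallowFirstSubscriber(h host.Host, lsys ipld.LinkSystem) (*dagsync.Subscriber, error) {
	return dagsync.NewSubscriber(h, lsys,
		// Limit the first sync with a new publisher to the latest ad only.
		dagsync.FirstSyncDepth(1),
	)
}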