Skip to content

Commit

Permalink
Configurable first sync depth (#152)
Browse files Browse the repository at this point in the history
* Configurable first sync depth

There is now a dagsync.Subscriber option to configure the first sync depth for a new provider.

* Add first sync depth test
  • Loading branch information
gammazero authored Feb 9, 2024
1 parent a71a4b6 commit f92d4c1
Show file tree
Hide file tree
Showing 3 changed files with 80 additions and 6 deletions.
13 changes: 13 additions & 0 deletions dagsync/option.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ type config struct {

adsDepthLimit int64
entriesDepthLimit int64
firstSyncDepth int64
segDepthLimit int64

strictAdsSelSeq bool
Expand Down Expand Up @@ -166,6 +167,18 @@ func EntriesDepthLimit(depth int64) Option {
}
}

// FirstSyncDepth sets the advertisement chain depth to sync on the first sync
// with a new provider. A value of 0, the default, means unlimited depth.
func FirstSyncDepth(depth int64) Option {
return func(c *config) error {
if depth < 0 {
depth = 0
}
c.firstSyncDepth = depth
return nil
}
}

// SegmentDepthLimit sets the maximum recursion depth limit for a segmented sync.
// Setting the depth to a value less than zero disables segmented sync completely.
// Disabled by default.
Expand Down
20 changes: 14 additions & 6 deletions dagsync/subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,8 +99,9 @@ type Subscriber struct {
// async syncs.
syncSem chan struct{}

adsDepthLimit selector.RecursionLimit
segDepthLimit int64
adsDepthLimit selector.RecursionLimit
firstSyncDepth int64
segDepthLimit int64

receiver *announce.Receiver

Expand Down Expand Up @@ -135,7 +136,7 @@ type SyncFinished struct {
// PeerID identifies the peer this SyncFinished event pertains to. This is
// the publisher of the advertisement chain.
PeerID peer.ID
// Count is the number of CID synced.
// Count is the number of CIDs synced.
Count int
// Err is used to return a failure to complete an asynchronous sync in
// response to an announcement.
Expand Down Expand Up @@ -223,8 +224,9 @@ func NewSubscriber(host host.Host, lsys ipld.LinkSystem, options ...Option) (*Su
latestSyncHandler: latestSyncHandler{},
lastKnownSync: opts.lastKnownSync,

adsDepthLimit: recursionLimit(opts.adsDepthLimit),
segDepthLimit: opts.segDepthLimit,
adsDepthLimit: recursionLimit(opts.adsDepthLimit),
firstSyncDepth: opts.firstSyncDepth,
segDepthLimit: opts.segDepthLimit,

selectorOne: ssb.ExploreRecursive(selector.RecursionLimitDepth(0), all).Node(),
selectorAll: ssb.ExploreRecursive(selector.RecursionLimitNone(), all).Node(),
Expand Down Expand Up @@ -463,6 +465,8 @@ func (s *Subscriber) SyncAdChain(ctx context.Context, peerInfo peer.AddrInfo, op
log.Infow("cid to sync to is the stop node. Nothing to do")
return nextCid, nil
}
} else if s.firstSyncDepth != 0 && opts.depthLimit == 0 {
depthLimit = recursionLimit(s.firstSyncDepth)
}

log = log.With("cid", nextCid)
Expand Down Expand Up @@ -842,6 +846,7 @@ func (h *handler) asyncSyncAdChain(ctx context.Context) {
return
}

adsDepthLimit := h.subscriber.adsDepthLimit
nextCid := amsg.Cid
latestSyncLink := h.subscriber.GetLatestSync(h.peerID)
var stopAtCid cid.Cid
Expand All @@ -851,9 +856,12 @@ func (h *handler) asyncSyncAdChain(ctx context.Context) {
log.Infow("CID to sync to is the stop node. Nothing to do.", "peer", h.peerID)
return
}
} else if h.subscriber.firstSyncDepth != 0 {
// If nothing synced yet, use first sync depth if configured.
adsDepthLimit = recursionLimit(h.subscriber.firstSyncDepth)
}

sel := ExploreRecursiveWithStopNode(h.subscriber.adsDepthLimit, h.subscriber.adsSelectorSeq, latestSyncLink)
sel := ExploreRecursiveWithStopNode(adsDepthLimit, h.subscriber.adsSelectorSeq, latestSyncLink)
syncCount, err := h.handle(ctx, nextCid, sel, syncer, h.subscriber.generalBlockHook, h.subscriber.segDepthLimit, stopAtCid)
if err != nil {
// Failed to handle the sync, so allow another announce for the same CID.
Expand Down
53 changes: 53 additions & 0 deletions dagsync/sync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,59 @@ func TestLatestSyncSuccess(t *testing.T) {
senderAnnounceTest(t, pub, p2pSender, sub, dstStore, watcher, srcHost.ID(), chainLnks[0])
}

func TestFirstSyncDepth(t *testing.T) {
srcStore := dssync.MutexWrap(datastore.NewMapDatastore())
dstStore := dssync.MutexWrap(datastore.NewMapDatastore())
srcHost, srcPrivKey := test.MkTestHostPK(t)
srcLnkS := test.MkLinkSystem(srcStore)

dstHost := test.MkTestHost(t)
srcHost.Peerstore().AddAddrs(dstHost.ID(), dstHost.Addrs(), time.Hour)
dstHost.Peerstore().AddAddrs(srcHost.ID(), srcHost.Addrs(), time.Hour)
dstLnkS := test.MkLinkSystem(dstStore)

pub, err := ipnisync.NewPublisher(srcLnkS, srcPrivKey, ipnisync.WithStreamHost(srcHost))
require.NoError(t, err)
defer pub.Close()

sub, err := dagsync.NewSubscriber(dstHost, dstLnkS,
dagsync.FirstSyncDepth(1),
dagsync.RecvAnnounce(testTopic),
dagsync.StrictAdsSelector(false))
require.NoError(t, err)
defer sub.Close()

watcher, cncl := sub.OnSyncFinished()
defer cncl()

// Store the whole chain in source node
chainLnks := test.MkChain(srcLnkS, true)

lnk := chainLnks[0]
adCid := lnk.(cidlink.Link).Cid

pub.SetRoot(adCid)
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
pubInfo := peer.AddrInfo{
ID: pub.ID(),
Addrs: pub.Addrs(),
}
err = sub.Announce(ctx, adCid, pubInfo)
require.NoError(t, err)

select {
case <-ctx.Done():
require.FailNow(t, "timed out waiting for sync to propagate")
case syncDone, open := <-watcher:
require.True(t, open, "event channel closed without receiving event")
require.Equal(t, adCid, syncDone.Cid, "sync returned unexpected cid")
_, err := dstStore.Get(context.Background(), datastore.NewKey(adCid.String()))
require.NoError(t, err, "data not in receiver store")
require.Equal(t, 1, syncDone.Count)
}
}

func TestSyncFn(t *testing.T) {
t.Parallel()
srcStore := dssync.MutexWrap(datastore.NewMapDatastore())
Expand Down

0 comments on commit f92d4c1

Please sign in to comment.