Skip to content

Commit

Permalink
First sync depth option for dagsync.Subscriber (#151)
Browse files Browse the repository at this point in the history
  • Loading branch information
gammazero authored Feb 9, 2024
1 parent e98f3fd commit 8f7bd63
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 6 deletions.
13 changes: 13 additions & 0 deletions dagsync/option.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ type config struct {
rcvrOpts []announce.Option

adsDepthLimit int64
firstSyncDepth int64
entriesDepthLimit int64
segDepthLimit int64

Expand Down Expand Up @@ -173,6 +174,18 @@ func EntriesDepthLimit(depth int64) Option {
}
}

// FirstSyncDepth sets the advertisement chain depth to sync on the first sync
// with a new provider. A value of 0, the default, means unlimited depth.
func FirstSyncDepth(depth int64) Option {
return func(c *config) error {
if depth < 0 {
depth = 0
}
c.firstSyncDepth = depth
return nil
}
}

// SegmentDepthLimit sets the maximum recursion depth limit for a segmented sync.
// Setting the depth to a value less than zero disables segmented sync completely.
// Disabled by default.
Expand Down
20 changes: 14 additions & 6 deletions dagsync/subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,8 +98,9 @@ type Subscriber struct {
// async syncs.
syncSem chan struct{}

adsDepthLimit selector.RecursionLimit
segDepthLimit int64
adsDepthLimit selector.RecursionLimit
firstSyncDepth int64
segDepthLimit int64

receiver *announce.Receiver
topicName string
Expand Down Expand Up @@ -230,9 +231,10 @@ func NewSubscriber(host host.Host, ds datastore.Batching, lsys ipld.LinkSystem,
latestSyncHandler: latestSyncHandler{},
lastKnownSync: opts.lastKnownSync,

adsDepthLimit: recursionLimit(opts.adsDepthLimit),
segDepthLimit: opts.segDepthLimit,
topicName: topic,
adsDepthLimit: recursionLimit(opts.adsDepthLimit),
firstSyncDepth: opts.firstSyncDepth,
segDepthLimit: opts.segDepthLimit,
topicName: topic,

selectorOne: ssb.ExploreRecursive(selector.RecursionLimitDepth(0), all).Node(),
selectorAll: ssb.ExploreRecursive(selector.RecursionLimitNone(), all).Node(),
Expand Down Expand Up @@ -472,6 +474,8 @@ func (s *Subscriber) SyncAdChain(ctx context.Context, peerInfo peer.AddrInfo, op
log.Infow("cid to sync to is the stop node. Nothing to do")
return nextCid, nil
}
} else if s.firstSyncDepth != 0 && opts.depthLimit == 0 {
depthLimit = recursionLimit(s.firstSyncDepth)
}

log = log.With("cid", nextCid)
Expand Down Expand Up @@ -854,6 +858,7 @@ func (h *handler) asyncSyncAdChain(ctx context.Context) {
return
}

adsDepthLimit := h.subscriber.adsDepthLimit
nextCid := amsg.Cid
latestSyncLink := h.subscriber.GetLatestSync(h.peerID)
var stopAtCid cid.Cid
Expand All @@ -863,9 +868,12 @@ func (h *handler) asyncSyncAdChain(ctx context.Context) {
log.Infow("CID to sync to is the stop node. Nothing to do.", "peer", h.peerID)
return
}
} else if h.subscriber.firstSyncDepth != 0 {
// If nothing synced yet, use first sync depth if configured.
adsDepthLimit = recursionLimit(h.subscriber.firstSyncDepth)
}

sel := ExploreRecursiveWithStopNode(h.subscriber.adsDepthLimit, h.subscriber.adsSelectorSeq, latestSyncLink)
sel := ExploreRecursiveWithStopNode(adsDepthLimit, h.subscriber.adsSelectorSeq, latestSyncLink)
syncCount, err := h.handle(ctx, nextCid, sel, syncer, h.subscriber.generalBlockHook, h.subscriber.segDepthLimit, stopAtCid)
if err != nil {
// Failed to handle the sync, so allow another announce for the same CID.
Expand Down

0 comments on commit 8f7bd63

Please sign in to comment.