diff --git a/dagsync/announce_test.go b/dagsync/announce_test.go index 38d9518..f87541e 100644 --- a/dagsync/announce_test.go +++ b/dagsync/announce_test.go @@ -33,7 +33,7 @@ func TestAnnounceReplace(t *testing.T) { blocksSeenByHook[c] = struct{}{} } - sub, err := dagsync.NewSubscriber(dstHost, dstStore, dstLnkS, testTopic, dagsync.RecvAnnounce(), + sub, err := dagsync.NewSubscriber(dstHost, dstLnkS, dagsync.RecvAnnounce(testTopic), dagsync.BlockHook(blockHook)) require.NoError(t, err) defer sub.Close() @@ -170,7 +170,7 @@ func TestAnnounce_LearnsHttpPublisherAddr(t *testing.T) { subh := test.MkTestHost(t) subds := dssync.MutexWrap(datastore.NewMapDatastore()) subls := test.MkLinkSystem(subds) - sub, err := dagsync.NewSubscriber(subh, subds, subls, testTopic, dagsync.RecvAnnounce(), dagsync.StrictAdsSelector(false)) + sub, err := dagsync.NewSubscriber(subh, subls, dagsync.RecvAnnounce(testTopic), dagsync.StrictAdsSelector(false)) require.NoError(t, err) defer sub.Close() @@ -224,13 +224,13 @@ func TestAnnounceRepublish(t *testing.T) { topics := test.WaitForMeshWithMessage(t, testTopic, dstHost, dstHost2) - sub2, err := dagsync.NewSubscriber(dstHost2, dstStore2, dstLnkS2, testTopic, - dagsync.RecvAnnounce(announce.WithTopic(topics[1])), dagsync.StrictAdsSelector(false)) + sub2, err := dagsync.NewSubscriber(dstHost2, dstLnkS2, + dagsync.RecvAnnounce("", announce.WithTopic(topics[1])), dagsync.StrictAdsSelector(false)) require.NoError(t, err) defer sub2.Close() - sub1, err := dagsync.NewSubscriber(dstHost, dstStore, dstLnkS, testTopic, - dagsync.RecvAnnounce(announce.WithTopic(topics[0]), announce.WithResend(true)), + sub1, err := dagsync.NewSubscriber(dstHost, dstLnkS, + dagsync.RecvAnnounce("", announce.WithTopic(topics[0]), announce.WithResend(true)), dagsync.StrictAdsSelector(false)) require.NoError(t, err) defer sub1.Close() @@ -377,8 +377,8 @@ func initPubSub(t *testing.T, srcStore, dstStore datastore.Batching, allowPeer f dstHost.Peerstore().AddAddrs(srcHost.ID(), srcHost.Addrs(), time.Hour) dstLnkS := test.MkLinkSystem(dstStore) - sub, err := dagsync.NewSubscriber(dstHost, dstStore, dstLnkS, testTopic, - dagsync.RecvAnnounce(announce.WithTopic(topics[1]), announce.WithAllowPeer(allowPeer))) + sub, err := dagsync.NewSubscriber(dstHost, dstLnkS, + dagsync.RecvAnnounce("", announce.WithTopic(topics[1]), announce.WithAllowPeer(allowPeer))) require.NoError(t, err) err = srcHost.Connect(context.Background(), dstHost.Peerstore().PeerInfo(dstHost.ID())) diff --git a/dagsync/example_test.go b/dagsync/example_test.go index 1dfb0db..3d7abcc 100644 --- a/dagsync/example_test.go +++ b/dagsync/example_test.go @@ -68,16 +68,14 @@ func ExampleSubscriber() { srcHost.Peerstore().AddAddrs(dstHost.ID(), dstHost.Addrs(), time.Hour) dstHost.Peerstore().AddAddrs(srcHost.ID(), srcHost.Addrs(), time.Hour) - sub, err := dagsync.NewSubscriber(dstHost, dstStore, dstLnkSys, "/indexer/ingest/testnet") + sub, err := dagsync.NewSubscriber(dstHost, dstLnkSys, dagsync.RecvAnnounce("/indexer/ingest/testnet")) if err != nil { panic(err) } defer sub.Close() - // Connections must be made after Subscriber is created, because the - // gossip pubsub must be created before connections are made. Otherwise, - // the connecting hosts will not see the destination host has pubsub and - // messages will not get published. + // Connections are made after Subscriber is created, so that the connecting + // host sees that the destination host has pubsub. dstPeerInfo := dstHost.Peerstore().PeerInfo(dstHost.ID()) if err = srcHost.Connect(context.Background(), dstPeerInfo); err != nil { panic(err) diff --git a/dagsync/http_test.go b/dagsync/http_test.go index cd35c15..b281952 100644 --- a/dagsync/http_test.go +++ b/dagsync/http_test.go @@ -51,7 +51,7 @@ func setupPublisherSubscriber(t *testing.T, subscriberOptions []dagsync.Option) dstHost := test.MkTestHost(t) subscriberOptions = append(subscriberOptions, dagsync.StrictAdsSelector(false)) - sub, err := dagsync.NewSubscriber(dstHost, dstStore, dstLinkSys, testTopic, subscriberOptions...) + sub, err := dagsync.NewSubscriber(dstHost, dstLinkSys, subscriberOptions...) require.NoError(t, err) t.Cleanup(func() { sub.Close() diff --git a/dagsync/option.go b/dagsync/option.go index b35237e..44989e1 100644 --- a/dagsync/option.go +++ b/dagsync/option.go @@ -39,8 +39,9 @@ type config struct { lastKnownSync LastKnownSyncFunc maxAsyncSyncs int - hasRcvr bool - rcvrOpts []announce.Option + hasRcvr bool + rcvrOpts []announce.Option + rcvrTopic string adsDepthLimit int64 entriesDepthLimit int64 @@ -179,10 +180,11 @@ func SegmentDepthLimit(depth int64) Option { } // RecvAnnounce enables an announcement message receiver. -func RecvAnnounce(opts ...announce.Option) Option { +func RecvAnnounce(topic string, opts ...announce.Option) Option { return func(c *config) error { c.hasRcvr = true c.rcvrOpts = opts + c.rcvrTopic = topic return nil } } diff --git a/dagsync/subscriber.go b/dagsync/subscriber.go index 8c789ca..90e2f70 100644 --- a/dagsync/subscriber.go +++ b/dagsync/subscriber.go @@ -11,7 +11,6 @@ import ( "github.com/gammazero/channelqueue" "github.com/ipfs/go-cid" - "github.com/ipfs/go-datastore" logging "github.com/ipfs/go-log/v2" "github.com/ipld/go-ipld-prime" cidlink "github.com/ipld/go-ipld-prime/linking/cid" @@ -174,7 +173,7 @@ func wrapBlockHook() (*sync.RWMutex, map[peer.ID]func(peer.ID, cid.Cid), func(pe // NewSubscriber creates a new Subscriber that processes pubsub messages and // syncs dags advertised using the specified selector. -func NewSubscriber(host host.Host, ds datastore.Batching, lsys ipld.LinkSystem, topic string, options ...Option) (*Subscriber, error) { +func NewSubscriber(host host.Host, lsys ipld.LinkSystem, options ...Option) (*Subscriber, error) { opts, err := getOpts(options) if err != nil { return nil, err @@ -242,7 +241,7 @@ func NewSubscriber(host host.Host, ds datastore.Batching, lsys ipld.LinkSystem, if opts.maxAsyncSyncs > 0 { s.syncSem = make(chan struct{}, opts.maxAsyncSyncs) } - s.receiver, err = announce.NewReceiver(host, topic, opts.rcvrOpts...) + s.receiver, err = announce.NewReceiver(host, opts.rcvrTopic, opts.rcvrOpts...) if err != nil { return nil, fmt.Errorf("failed to create announcement receiver: %w", err) } diff --git a/dagsync/subscriber_test.go b/dagsync/subscriber_test.go index f8d91da..6a01d7d 100644 --- a/dagsync/subscriber_test.go +++ b/dagsync/subscriber_test.go @@ -64,7 +64,7 @@ func TestScopedBlockHook(t *testing.T) { subLsys := test.MkLinkSystem(subDS) var calledGeneralBlockHookTimes int64 - sub, err := dagsync.NewSubscriber(subHost, subDS, subLsys, testTopic, + sub, err := dagsync.NewSubscriber(subHost, subLsys, dagsync.BlockHook(func(i peer.ID, c cid.Cid, _ dagsync.SegmentSyncActions) { atomic.AddInt64(&calledGeneralBlockHookTimes, 1) }), @@ -128,7 +128,7 @@ func TestSyncedCidsReturned(t *testing.T) { subDS := dssync.MutexWrap(datastore.NewMapDatastore()) subLsys := test.MkLinkSystem(subDS) - sub, err := dagsync.NewSubscriber(subHost, subDS, subLsys, testTopic, dagsync.StrictAdsSelector(false)) + sub, err := dagsync.NewSubscriber(subHost, subLsys, dagsync.StrictAdsSelector(false)) require.NoError(t, err) onFinished, cancel := sub.OnSyncFinished() @@ -191,7 +191,7 @@ func TestConcurrentSync(t *testing.T) { subLsys := test.MkLinkSystem(subDS) var calledTimes int64 - sub, err := dagsync.NewSubscriber(subHost, subDS, subLsys, testTopic, + sub, err := dagsync.NewSubscriber(subHost, subLsys, dagsync.BlockHook(func(i peer.ID, c cid.Cid, _ dagsync.SegmentSyncActions) { atomic.AddInt64(&calledTimes, 1) }), @@ -418,8 +418,8 @@ func TestRoundTrip(t *testing.T) { t.Log("block hook got", c, "from", p) } - sub, err := dagsync.NewSubscriber(dstHost, dstStore, dstLnkS, testTopic, - dagsync.RecvAnnounce(announce.WithTopic(topics[2])), + sub, err := dagsync.NewSubscriber(dstHost, dstLnkS, + dagsync.RecvAnnounce("", announce.WithTopic(topics[2])), dagsync.BlockHook(blockHook), dagsync.StrictAdsSelector(false), ) @@ -623,8 +623,8 @@ func TestMaxAsyncSyncs(t *testing.T) { bhMutex.Unlock() } - sub, err := dagsync.NewSubscriber(dstHost, dstStore, dstLnkS, testTopic, - dagsync.RecvAnnounce(), + sub, err := dagsync.NewSubscriber(dstHost, dstLnkS, + dagsync.RecvAnnounce(testTopic), dagsync.BlockHook(blockHook), dagsync.StrictAdsSelector(false), // If this value is > 1, then test must fail. @@ -717,8 +717,8 @@ func TestMaxAsyncSyncs(t *testing.T) { dstLnkS, blocked = test.MkBlockedLinkSystem(dstStore) blocksSeenByHook = make(map[cid.Cid]struct{}) - sub, err = dagsync.NewSubscriber(dstHost, dstStore, dstLnkS, testTopic, - dagsync.RecvAnnounce(), + sub, err = dagsync.NewSubscriber(dstHost, dstLnkS, + dagsync.RecvAnnounce(testTopic), dagsync.BlockHook(blockHook), dagsync.StrictAdsSelector(false), dagsync.MaxAsyncConcurrency(2), @@ -778,7 +778,7 @@ func TestCloseSubscriber(t *testing.T) { lsys := test.MkLinkSystem(st) - sub, err := dagsync.NewSubscriber(sh, st, lsys, testTopic, dagsync.StrictAdsSelector(false)) + sub, err := dagsync.NewSubscriber(sh, lsys, dagsync.StrictAdsSelector(false)) require.NoError(t, err) watcher, cncl := sub.OnSyncFinished() @@ -866,7 +866,6 @@ func newHostSystem(t *testing.T) hostSystem { return hostSystem{ privKey: privKey, host: test.MkTestHost(t, libp2p.Identity(privKey)), - ds: ds, lsys: test.MkLinkSystem(ds), } } @@ -890,7 +889,7 @@ func (b dagsyncPubSubBuilder) Build(t *testing.T, topicName string, pubSys hostS } subOpts = append(subOpts, dagsync.StrictAdsSelector(false)) - sub, err := dagsync.NewSubscriber(subSys.host, subSys.ds, subSys.lsys, topicName, subOpts...) + sub, err := dagsync.NewSubscriber(subSys.host, subSys.lsys, subOpts...) require.NoError(t, err) return pub, sub, senders diff --git a/dagsync/sync_test.go b/dagsync/sync_test.go index 6940c58..929c16b 100644 --- a/dagsync/sync_test.go +++ b/dagsync/sync_test.go @@ -41,8 +41,8 @@ func TestLatestSyncSuccess(t *testing.T) { require.NoError(t, err) defer pub.Close() - sub, err := dagsync.NewSubscriber(dstHost, dstStore, dstLnkS, testTopic, - dagsync.RecvAnnounce(announce.WithTopic(topics[1])), + sub, err := dagsync.NewSubscriber(dstHost, dstLnkS, + dagsync.RecvAnnounce("", announce.WithTopic(topics[1])), dagsync.StrictAdsSelector(false)) require.NoError(t, err) defer sub.Close() @@ -86,9 +86,9 @@ func TestSyncFn(t *testing.T) { blocksSeenByHook[c] = struct{}{} } - sub, err := dagsync.NewSubscriber(dstHost, dstStore, dstLnkS, testTopic, + sub, err := dagsync.NewSubscriber(dstHost, dstLnkS, dagsync.BlockHook(blockHook), - dagsync.RecvAnnounce(announce.WithTopic(topics[1])), + dagsync.RecvAnnounce("", announce.WithTopic(topics[1])), dagsync.StrictAdsSelector(false)) require.NoError(t, err) defer sub.Close() @@ -197,8 +197,8 @@ func TestPartialSync(t *testing.T) { defer pub.Close() test.MkChain(srcLnkS, true) - sub, err := dagsync.NewSubscriber(dstHost, dstStore, dstLnkS, testTopic, - dagsync.RecvAnnounce(announce.WithTopic(topics[1])), + sub, err := dagsync.NewSubscriber(dstHost, dstLnkS, + dagsync.RecvAnnounce("", announce.WithTopic(topics[1])), dagsync.StrictAdsSelector(false)) require.NoError(t, err) defer sub.Close() @@ -251,8 +251,8 @@ func TestStepByStepSync(t *testing.T) { require.NoError(t, err) defer pub.Close() - sub, err := dagsync.NewSubscriber(dstHost, dstStore, dstLnkS, testTopic, - dagsync.RecvAnnounce(announce.WithTopic(topics[1])), + sub, err := dagsync.NewSubscriber(dstHost, dstLnkS, + dagsync.RecvAnnounce("", announce.WithTopic(topics[1])), dagsync.StrictAdsSelector(false)) require.NoError(t, err) defer sub.Close() @@ -291,8 +291,8 @@ func TestLatestSyncFailure(t *testing.T) { t.Log("source host:", srcHost.ID()) t.Log("targer host:", dstHost.ID()) - sub, err := dagsync.NewSubscriber(dstHost, dstStore, dstLnkS, testTopic, - dagsync.RecvAnnounce(), dagsync.StrictAdsSelector(false)) + sub, err := dagsync.NewSubscriber(dstHost, dstLnkS, + dagsync.RecvAnnounce(testTopic), dagsync.StrictAdsSelector(false)) require.NoError(t, err) defer sub.Close() @@ -326,8 +326,8 @@ func TestLatestSyncFailure(t *testing.T) { sub.Close() dstStore = dssync.MutexWrap(datastore.NewMapDatastore()) - sub2, err := dagsync.NewSubscriber(dstHost, dstStore, dstLnkS, testTopic, - dagsync.RecvAnnounce(), dagsync.StrictAdsSelector(false)) + sub2, err := dagsync.NewSubscriber(dstHost, dstLnkS, + dagsync.RecvAnnounce(testTopic), dagsync.StrictAdsSelector(false)) require.NoError(t, err) defer sub2.Close() @@ -367,8 +367,8 @@ func TestUpdatePeerstoreAddr(t *testing.T) { dstStore := dssync.MutexWrap(datastore.NewMapDatastore()) dstLnkS := test.MkLinkSystem(dstStore) dstHost := test.MkTestHost(t) - sub, err := dagsync.NewSubscriber(dstHost, dstStore, dstLnkS, testTopic, - dagsync.RecvAnnounce(), dagsync.StrictAdsSelector(false)) + sub, err := dagsync.NewSubscriber(dstHost, dstLnkS, + dagsync.RecvAnnounce(testTopic), dagsync.StrictAdsSelector(false)) require.NoError(t, err) defer sub.Close() @@ -410,8 +410,8 @@ func TestSyncOnAnnounceIPNI(t *testing.T) { dstHost := test.MkTestHost(t) dstLnkS := test.MkLinkSystem(dstStore) - sub, err := dagsync.NewSubscriber(dstHost, dstStore, dstLnkS, testTopic, - dagsync.RecvAnnounce(), dagsync.StrictAdsSelector(false)) + sub, err := dagsync.NewSubscriber(dstHost, dstLnkS, + dagsync.RecvAnnounce(testTopic), dagsync.StrictAdsSelector(false)) require.NoError(t, err) defer sub.Close() @@ -446,8 +446,8 @@ func TestSyncOnAnnounceHTTP(t *testing.T) { dstHost := test.MkTestHost(t) dstLnkS := test.MkLinkSystem(dstStore) - sub, err := dagsync.NewSubscriber(dstHost, dstStore, dstLnkS, testTopic, - dagsync.RecvAnnounce(), dagsync.StrictAdsSelector(false)) + sub, err := dagsync.NewSubscriber(dstHost, dstLnkS, + dagsync.RecvAnnounce(testTopic), dagsync.StrictAdsSelector(false)) require.NoError(t, err) defer sub.Close() @@ -489,7 +489,7 @@ func TestCancelDeadlock(t *testing.T) { require.NoError(t, err) defer pub.Close() - sub, err := dagsync.NewSubscriber(dstHost, dstStore, dstLnkS, testTopic, dagsync.StrictAdsSelector(false)) + sub, err := dagsync.NewSubscriber(dstHost, dstLnkS, dagsync.StrictAdsSelector(false)) require.NoError(t, err) defer sub.Close()