Skip to content

Commit

Permalink
NewSubscriber does not need datastore and topic args
Browse files Browse the repository at this point in the history
  • Loading branch information
gammazero committed Jan 8, 2024
1 parent 1d8fbe0 commit 9f1e40b
Show file tree
Hide file tree
Showing 7 changed files with 49 additions and 51 deletions.
16 changes: 8 additions & 8 deletions dagsync/announce_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func TestAnnounceReplace(t *testing.T) {
blocksSeenByHook[c] = struct{}{}
}

sub, err := dagsync.NewSubscriber(dstHost, dstStore, dstLnkS, testTopic, dagsync.RecvAnnounce(),
sub, err := dagsync.NewSubscriber(dstHost, dstLnkS, dagsync.RecvAnnounce(testTopic),
dagsync.BlockHook(blockHook))
require.NoError(t, err)
defer sub.Close()
Expand Down Expand Up @@ -170,7 +170,7 @@ func TestAnnounce_LearnsHttpPublisherAddr(t *testing.T) {
subh := test.MkTestHost(t)
subds := dssync.MutexWrap(datastore.NewMapDatastore())
subls := test.MkLinkSystem(subds)
sub, err := dagsync.NewSubscriber(subh, subds, subls, testTopic, dagsync.RecvAnnounce(), dagsync.StrictAdsSelector(false))
sub, err := dagsync.NewSubscriber(subh, subls, dagsync.RecvAnnounce(testTopic), dagsync.StrictAdsSelector(false))
require.NoError(t, err)
defer sub.Close()

Expand Down Expand Up @@ -224,13 +224,13 @@ func TestAnnounceRepublish(t *testing.T) {

topics := test.WaitForMeshWithMessage(t, testTopic, dstHost, dstHost2)

sub2, err := dagsync.NewSubscriber(dstHost2, dstStore2, dstLnkS2, testTopic,
dagsync.RecvAnnounce(announce.WithTopic(topics[1])), dagsync.StrictAdsSelector(false))
sub2, err := dagsync.NewSubscriber(dstHost2, dstLnkS2,
dagsync.RecvAnnounce("", announce.WithTopic(topics[1])), dagsync.StrictAdsSelector(false))
require.NoError(t, err)
defer sub2.Close()

sub1, err := dagsync.NewSubscriber(dstHost, dstStore, dstLnkS, testTopic,
dagsync.RecvAnnounce(announce.WithTopic(topics[0]), announce.WithResend(true)),
sub1, err := dagsync.NewSubscriber(dstHost, dstLnkS,
dagsync.RecvAnnounce("", announce.WithTopic(topics[0]), announce.WithResend(true)),
dagsync.StrictAdsSelector(false))
require.NoError(t, err)
defer sub1.Close()
Expand Down Expand Up @@ -377,8 +377,8 @@ func initPubSub(t *testing.T, srcStore, dstStore datastore.Batching, allowPeer f
dstHost.Peerstore().AddAddrs(srcHost.ID(), srcHost.Addrs(), time.Hour)
dstLnkS := test.MkLinkSystem(dstStore)

sub, err := dagsync.NewSubscriber(dstHost, dstStore, dstLnkS, testTopic,
dagsync.RecvAnnounce(announce.WithTopic(topics[1]), announce.WithAllowPeer(allowPeer)))
sub, err := dagsync.NewSubscriber(dstHost, dstLnkS,
dagsync.RecvAnnounce("", announce.WithTopic(topics[1]), announce.WithAllowPeer(allowPeer)))
require.NoError(t, err)

err = srcHost.Connect(context.Background(), dstHost.Peerstore().PeerInfo(dstHost.ID()))
Expand Down
8 changes: 3 additions & 5 deletions dagsync/example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,16 +68,14 @@ func ExampleSubscriber() {
srcHost.Peerstore().AddAddrs(dstHost.ID(), dstHost.Addrs(), time.Hour)
dstHost.Peerstore().AddAddrs(srcHost.ID(), srcHost.Addrs(), time.Hour)

sub, err := dagsync.NewSubscriber(dstHost, dstStore, dstLnkSys, "/indexer/ingest/testnet")
sub, err := dagsync.NewSubscriber(dstHost, dstLnkSys, dagsync.RecvAnnounce("/indexer/ingest/testnet"))
if err != nil {
panic(err)
}
defer sub.Close()

// Connections must be made after Subscriber is created, because the
// gossip pubsub must be created before connections are made. Otherwise,
// the connecting hosts will not see the destination host has pubsub and
// messages will not get published.
// Connections are made after Subscriber is created, so that the connecting
// host sees that the destination host has pubsub.
dstPeerInfo := dstHost.Peerstore().PeerInfo(dstHost.ID())
if err = srcHost.Connect(context.Background(), dstPeerInfo); err != nil {
panic(err)
Expand Down
2 changes: 1 addition & 1 deletion dagsync/http_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ func setupPublisherSubscriber(t *testing.T, subscriberOptions []dagsync.Option)
dstHost := test.MkTestHost(t)

subscriberOptions = append(subscriberOptions, dagsync.StrictAdsSelector(false))
sub, err := dagsync.NewSubscriber(dstHost, dstStore, dstLinkSys, testTopic, subscriberOptions...)
sub, err := dagsync.NewSubscriber(dstHost, dstLinkSys, subscriberOptions...)
require.NoError(t, err)
t.Cleanup(func() {
sub.Close()
Expand Down
8 changes: 5 additions & 3 deletions dagsync/option.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,9 @@ type config struct {
lastKnownSync LastKnownSyncFunc
maxAsyncSyncs int

hasRcvr bool
rcvrOpts []announce.Option
hasRcvr bool
rcvrOpts []announce.Option
rcvrTopic string

adsDepthLimit int64
entriesDepthLimit int64
Expand Down Expand Up @@ -179,10 +180,11 @@ func SegmentDepthLimit(depth int64) Option {
}

// RecvAnnounce enables an announcement message receiver.
func RecvAnnounce(opts ...announce.Option) Option {
func RecvAnnounce(topic string, opts ...announce.Option) Option {
return func(c *config) error {
c.hasRcvr = true
c.rcvrOpts = opts
c.rcvrTopic = topic
return nil
}
}
Expand Down
5 changes: 2 additions & 3 deletions dagsync/subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (

"github.com/gammazero/channelqueue"
"github.com/ipfs/go-cid"
"github.com/ipfs/go-datastore"
logging "github.com/ipfs/go-log/v2"
"github.com/ipld/go-ipld-prime"
cidlink "github.com/ipld/go-ipld-prime/linking/cid"
Expand Down Expand Up @@ -174,7 +173,7 @@ func wrapBlockHook() (*sync.RWMutex, map[peer.ID]func(peer.ID, cid.Cid), func(pe

// NewSubscriber creates a new Subscriber that processes pubsub messages and
// syncs dags advertised using the specified selector.
func NewSubscriber(host host.Host, ds datastore.Batching, lsys ipld.LinkSystem, topic string, options ...Option) (*Subscriber, error) {
func NewSubscriber(host host.Host, lsys ipld.LinkSystem, options ...Option) (*Subscriber, error) {
opts, err := getOpts(options)
if err != nil {
return nil, err
Expand Down Expand Up @@ -242,7 +241,7 @@ func NewSubscriber(host host.Host, ds datastore.Batching, lsys ipld.LinkSystem,
if opts.maxAsyncSyncs > 0 {
s.syncSem = make(chan struct{}, opts.maxAsyncSyncs)
}
s.receiver, err = announce.NewReceiver(host, topic, opts.rcvrOpts...)
s.receiver, err = announce.NewReceiver(host, opts.rcvrTopic, opts.rcvrOpts...)
if err != nil {
return nil, fmt.Errorf("failed to create announcement receiver: %w", err)
}
Expand Down
23 changes: 11 additions & 12 deletions dagsync/subscriber_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ func TestScopedBlockHook(t *testing.T) {
subLsys := test.MkLinkSystem(subDS)

var calledGeneralBlockHookTimes int64
sub, err := dagsync.NewSubscriber(subHost, subDS, subLsys, testTopic,
sub, err := dagsync.NewSubscriber(subHost, subLsys,
dagsync.BlockHook(func(i peer.ID, c cid.Cid, _ dagsync.SegmentSyncActions) {
atomic.AddInt64(&calledGeneralBlockHookTimes, 1)
}),
Expand Down Expand Up @@ -128,7 +128,7 @@ func TestSyncedCidsReturned(t *testing.T) {
subDS := dssync.MutexWrap(datastore.NewMapDatastore())
subLsys := test.MkLinkSystem(subDS)

sub, err := dagsync.NewSubscriber(subHost, subDS, subLsys, testTopic, dagsync.StrictAdsSelector(false))
sub, err := dagsync.NewSubscriber(subHost, subLsys, dagsync.StrictAdsSelector(false))
require.NoError(t, err)

onFinished, cancel := sub.OnSyncFinished()
Expand Down Expand Up @@ -191,7 +191,7 @@ func TestConcurrentSync(t *testing.T) {
subLsys := test.MkLinkSystem(subDS)

var calledTimes int64
sub, err := dagsync.NewSubscriber(subHost, subDS, subLsys, testTopic,
sub, err := dagsync.NewSubscriber(subHost, subLsys,
dagsync.BlockHook(func(i peer.ID, c cid.Cid, _ dagsync.SegmentSyncActions) {
atomic.AddInt64(&calledTimes, 1)
}),
Expand Down Expand Up @@ -418,8 +418,8 @@ func TestRoundTrip(t *testing.T) {
t.Log("block hook got", c, "from", p)
}

sub, err := dagsync.NewSubscriber(dstHost, dstStore, dstLnkS, testTopic,
dagsync.RecvAnnounce(announce.WithTopic(topics[2])),
sub, err := dagsync.NewSubscriber(dstHost, dstLnkS,
dagsync.RecvAnnounce("", announce.WithTopic(topics[2])),
dagsync.BlockHook(blockHook),
dagsync.StrictAdsSelector(false),
)
Expand Down Expand Up @@ -623,8 +623,8 @@ func TestMaxAsyncSyncs(t *testing.T) {
bhMutex.Unlock()
}

sub, err := dagsync.NewSubscriber(dstHost, dstStore, dstLnkS, testTopic,
dagsync.RecvAnnounce(),
sub, err := dagsync.NewSubscriber(dstHost, dstLnkS,
dagsync.RecvAnnounce(testTopic),
dagsync.BlockHook(blockHook),
dagsync.StrictAdsSelector(false),
// If this value is > 1, then test must fail.
Expand Down Expand Up @@ -717,8 +717,8 @@ func TestMaxAsyncSyncs(t *testing.T) {
dstLnkS, blocked = test.MkBlockedLinkSystem(dstStore)
blocksSeenByHook = make(map[cid.Cid]struct{})

sub, err = dagsync.NewSubscriber(dstHost, dstStore, dstLnkS, testTopic,
dagsync.RecvAnnounce(),
sub, err = dagsync.NewSubscriber(dstHost, dstLnkS,
dagsync.RecvAnnounce(testTopic),
dagsync.BlockHook(blockHook),
dagsync.StrictAdsSelector(false),
dagsync.MaxAsyncConcurrency(2),
Expand Down Expand Up @@ -778,7 +778,7 @@ func TestCloseSubscriber(t *testing.T) {

lsys := test.MkLinkSystem(st)

sub, err := dagsync.NewSubscriber(sh, st, lsys, testTopic, dagsync.StrictAdsSelector(false))
sub, err := dagsync.NewSubscriber(sh, lsys, dagsync.StrictAdsSelector(false))
require.NoError(t, err)

watcher, cncl := sub.OnSyncFinished()
Expand Down Expand Up @@ -866,7 +866,6 @@ func newHostSystem(t *testing.T) hostSystem {
return hostSystem{
privKey: privKey,
host: test.MkTestHost(t, libp2p.Identity(privKey)),
ds: ds,
lsys: test.MkLinkSystem(ds),
}
}
Expand All @@ -890,7 +889,7 @@ func (b dagsyncPubSubBuilder) Build(t *testing.T, topicName string, pubSys hostS
}

subOpts = append(subOpts, dagsync.StrictAdsSelector(false))
sub, err := dagsync.NewSubscriber(subSys.host, subSys.ds, subSys.lsys, topicName, subOpts...)
sub, err := dagsync.NewSubscriber(subSys.host, subSys.lsys, subOpts...)
require.NoError(t, err)

return pub, sub, senders
Expand Down
38 changes: 19 additions & 19 deletions dagsync/sync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,8 @@ func TestLatestSyncSuccess(t *testing.T) {
require.NoError(t, err)
defer pub.Close()

sub, err := dagsync.NewSubscriber(dstHost, dstStore, dstLnkS, testTopic,
dagsync.RecvAnnounce(announce.WithTopic(topics[1])),
sub, err := dagsync.NewSubscriber(dstHost, dstLnkS,
dagsync.RecvAnnounce("", announce.WithTopic(topics[1])),
dagsync.StrictAdsSelector(false))
require.NoError(t, err)
defer sub.Close()
Expand Down Expand Up @@ -86,9 +86,9 @@ func TestSyncFn(t *testing.T) {
blocksSeenByHook[c] = struct{}{}
}

sub, err := dagsync.NewSubscriber(dstHost, dstStore, dstLnkS, testTopic,
sub, err := dagsync.NewSubscriber(dstHost, dstLnkS,
dagsync.BlockHook(blockHook),
dagsync.RecvAnnounce(announce.WithTopic(topics[1])),
dagsync.RecvAnnounce("", announce.WithTopic(topics[1])),
dagsync.StrictAdsSelector(false))
require.NoError(t, err)
defer sub.Close()
Expand Down Expand Up @@ -197,8 +197,8 @@ func TestPartialSync(t *testing.T) {
defer pub.Close()
test.MkChain(srcLnkS, true)

sub, err := dagsync.NewSubscriber(dstHost, dstStore, dstLnkS, testTopic,
dagsync.RecvAnnounce(announce.WithTopic(topics[1])),
sub, err := dagsync.NewSubscriber(dstHost, dstLnkS,
dagsync.RecvAnnounce("", announce.WithTopic(topics[1])),
dagsync.StrictAdsSelector(false))
require.NoError(t, err)
defer sub.Close()
Expand Down Expand Up @@ -251,8 +251,8 @@ func TestStepByStepSync(t *testing.T) {
require.NoError(t, err)
defer pub.Close()

sub, err := dagsync.NewSubscriber(dstHost, dstStore, dstLnkS, testTopic,
dagsync.RecvAnnounce(announce.WithTopic(topics[1])),
sub, err := dagsync.NewSubscriber(dstHost, dstLnkS,
dagsync.RecvAnnounce("", announce.WithTopic(topics[1])),
dagsync.StrictAdsSelector(false))
require.NoError(t, err)
defer sub.Close()
Expand Down Expand Up @@ -291,8 +291,8 @@ func TestLatestSyncFailure(t *testing.T) {
t.Log("source host:", srcHost.ID())
t.Log("targer host:", dstHost.ID())

sub, err := dagsync.NewSubscriber(dstHost, dstStore, dstLnkS, testTopic,
dagsync.RecvAnnounce(), dagsync.StrictAdsSelector(false))
sub, err := dagsync.NewSubscriber(dstHost, dstLnkS,
dagsync.RecvAnnounce(testTopic), dagsync.StrictAdsSelector(false))
require.NoError(t, err)
defer sub.Close()

Expand Down Expand Up @@ -326,8 +326,8 @@ func TestLatestSyncFailure(t *testing.T) {
sub.Close()

dstStore = dssync.MutexWrap(datastore.NewMapDatastore())

Check failure on line 328 in dagsync/sync_test.go

View workflow job for this annotation

GitHub Actions / go-check / All

this value of dstStore is never used (SA4006)
sub2, err := dagsync.NewSubscriber(dstHost, dstStore, dstLnkS, testTopic,
dagsync.RecvAnnounce(), dagsync.StrictAdsSelector(false))
sub2, err := dagsync.NewSubscriber(dstHost, dstLnkS,
dagsync.RecvAnnounce(testTopic), dagsync.StrictAdsSelector(false))
require.NoError(t, err)
defer sub2.Close()

Expand Down Expand Up @@ -367,8 +367,8 @@ func TestUpdatePeerstoreAddr(t *testing.T) {
dstStore := dssync.MutexWrap(datastore.NewMapDatastore())
dstLnkS := test.MkLinkSystem(dstStore)
dstHost := test.MkTestHost(t)
sub, err := dagsync.NewSubscriber(dstHost, dstStore, dstLnkS, testTopic,
dagsync.RecvAnnounce(), dagsync.StrictAdsSelector(false))
sub, err := dagsync.NewSubscriber(dstHost, dstLnkS,
dagsync.RecvAnnounce(testTopic), dagsync.StrictAdsSelector(false))
require.NoError(t, err)
defer sub.Close()

Expand Down Expand Up @@ -410,8 +410,8 @@ func TestSyncOnAnnounceIPNI(t *testing.T) {
dstHost := test.MkTestHost(t)
dstLnkS := test.MkLinkSystem(dstStore)

sub, err := dagsync.NewSubscriber(dstHost, dstStore, dstLnkS, testTopic,
dagsync.RecvAnnounce(), dagsync.StrictAdsSelector(false))
sub, err := dagsync.NewSubscriber(dstHost, dstLnkS,
dagsync.RecvAnnounce(testTopic), dagsync.StrictAdsSelector(false))
require.NoError(t, err)
defer sub.Close()

Expand Down Expand Up @@ -446,8 +446,8 @@ func TestSyncOnAnnounceHTTP(t *testing.T) {
dstHost := test.MkTestHost(t)
dstLnkS := test.MkLinkSystem(dstStore)

sub, err := dagsync.NewSubscriber(dstHost, dstStore, dstLnkS, testTopic,
dagsync.RecvAnnounce(), dagsync.StrictAdsSelector(false))
sub, err := dagsync.NewSubscriber(dstHost, dstLnkS,
dagsync.RecvAnnounce(testTopic), dagsync.StrictAdsSelector(false))
require.NoError(t, err)
defer sub.Close()

Expand Down Expand Up @@ -489,7 +489,7 @@ func TestCancelDeadlock(t *testing.T) {
require.NoError(t, err)
defer pub.Close()

sub, err := dagsync.NewSubscriber(dstHost, dstStore, dstLnkS, testTopic, dagsync.StrictAdsSelector(false))
sub, err := dagsync.NewSubscriber(dstHost, dstLnkS, dagsync.StrictAdsSelector(false))
require.NoError(t, err)
defer sub.Close()

Expand Down

0 comments on commit 9f1e40b

Please sign in to comment.