diff --git a/dagsync/subscriber.go b/dagsync/subscriber.go index ec514be..7bf3914 100644 --- a/dagsync/subscriber.go +++ b/dagsync/subscriber.go @@ -111,6 +111,10 @@ type Subscriber struct { expSyncWG sync.WaitGroup // selector sequence for advertisements + // + // This selector sequence is wrapped with selector logic that stops + // traversal when the latest synced link is reached. So, this only + // specifies the selection sequence itself. adsSelectorSeq ipld.Node // selectorOne selects one multihash entries or HAMT block. @@ -347,8 +351,6 @@ func (s *Subscriber) doClose() error { func (s *Subscriber) OnSyncFinished() (<-chan SyncFinished, context.CancelFunc) { // Channel is buffered to prevent distribute() from blocking if a reader is // not reading the channel immediately. - log.Info("Configuring subscriber OnSyncFinished...") - cq := channelqueue.New[SyncFinished](-1) ch := cq.In() s.addEventChan <- ch @@ -363,7 +365,7 @@ func (s *Subscriber) OnSyncFinished() (<-chan SyncFinished, context.CancelFunc) } ch = nil } - log.Info("Subscriber OnSyncFinished configured.") + log.Debug("Subscriber OnSyncFinished configured") return cq.Out(), cncl } @@ -396,16 +398,6 @@ func (s *Subscriber) RemoveHandler(peerID peer.ID) bool { // It is the responsibility of the caller to make sure the given CID appears // after the latest sync in order to avid re-syncing of content that may have // previously been synced. -// -// The selector sequence, sel, can optionally be specified to customize the -// selection sequence during traversal. If unspecified, the default selector -// sequence is used. -// -// Note that the selector sequence is wrapped with a selector logic that will -// stop traversal when the latest synced link is reached. Therefore, it must -// only specify the selection sequence itself. -// -// See: ExploreRecursiveWithStopNode. func (s *Subscriber) SyncAdChain(ctx context.Context, peerInfo peer.AddrInfo, options ...SyncOption) (cid.Cid, error) { s.expSyncMutex.Lock() if s.expSyncClosed { @@ -483,7 +475,7 @@ func (s *Subscriber) SyncAdChain(ctx context.Context, peerInfo peer.AddrInfo, op } log = log.With("cid", nextCid) - log.Info("Sync starting advertisement chain at head CID") + log.Debug("Start advertisement chain sync") if ctx.Err() != nil { return cid.Undef, fmt.Errorf("sync canceled: %w", ctx.Err()) @@ -554,7 +546,7 @@ func (s *Subscriber) SyncHAMTEntries(ctx context.Context, peerInfo peer.AddrInfo func (s *Subscriber) syncEntries(ctx context.Context, peerInfo peer.AddrInfo, entCid cid.Cid, sel ipld.Node, bh BlockHookFunc, segdl int64) error { if entCid == cid.Undef { - log.Info("No entries to sync", "peer", peerInfo.ID) + log.Infow("No entries to sync", "peer", peerInfo.ID) return nil } @@ -577,7 +569,7 @@ func (s *Subscriber) syncEntries(ctx context.Context, peerInfo peer.AddrInfo, en return err } - log.Info("Start entries sync", "peer", peerInfo.ID, "cid", entCid) + log.Debugw("Start entries sync", "peer", peerInfo.ID, "cid", entCid) // Check for an existing handler for the specified peer (publisher). If // none, create one if allowed. @@ -755,6 +747,28 @@ func (s *Subscriber) Announce(ctx context.Context, nextCid cid.Cid, peerInfo pee return s.receiver.Direct(ctx, nextCid, peerInfo) } +// delNotPresent removes from the peerStore the peer's multiaddrs that are not +// present in the specified addrs slice. +func delNotPresent(peerStore peerstore.Peerstore, peerID peer.ID, addrs []multiaddr.Multiaddr) { + var del []multiaddr.Multiaddr + oldAddrs := peerStore.Addrs(peerID) + for _, old := range oldAddrs { + keep := false + for _, new := range addrs { + if old.Equal(new) { + keep = true + break + } + } + if !keep { + del = append(del, old) + } + } + if len(del) != 0 { + peerStore.SetAddrs(peerID, del, 0) + } +} + func (s *Subscriber) makeSyncer(peerInfo peer.AddrInfo, doUpdate bool) (Syncer, func(), error) { // Check for an HTTP address in peerAddrs, or if not given, in the http // peerstore. This gives a preference to use ipnisync over dtsync. @@ -763,6 +777,9 @@ func (s *Subscriber) makeSyncer(peerInfo peer.AddrInfo, doUpdate bool) (Syncer, httpAddrs = s.httpPeerstore.Addrs(peerInfo.ID) } else { httpAddrs = mautil.FindHTTPAddrs(peerInfo.Addrs) + if doUpdate && len(httpAddrs) != 0 { + delNotPresent(s.httpPeerstore, peerInfo.ID, httpAddrs) + } } var update func() @@ -790,6 +807,7 @@ func (s *Subscriber) makeSyncer(peerInfo peer.AddrInfo, doUpdate bool) (Syncer, if doUpdate { peerStore := s.host.Peerstore() if peerStore != nil && len(peerInfo.Addrs) != 0 { + delNotPresent(peerStore, peerInfo.ID, peerInfo.Addrs) // Add it to peerstore with a small TTL first, and extend it if/when // sync with it completes. In case the peerstore already has this // address and the existing TTL is greater than this temp one, this is @@ -1005,7 +1023,7 @@ func (h *handler) handle(ctx context.Context, nextCid cid.Cid, sel ipld.Node, sy if err != nil { return 0, err } - log.Infow("Non-segmented sync completed", "syncedCount", syncedCount) + log.Debugw("Non-segmented sync completed", "syncedCount", syncedCount) return syncedCount, nil } @@ -1060,6 +1078,6 @@ SegSyncLoop: } } - log.Infow("Segmented sync completed", "syncedCount", syncedCount) + log.Debugw("Segmented sync completed", "syncedCount", syncedCount) return syncedCount, nil } diff --git a/dagsync/sync_test.go b/dagsync/sync_test.go index 49f1985..f5606f6 100644 --- a/dagsync/sync_test.go +++ b/dagsync/sync_test.go @@ -2,6 +2,7 @@ package dagsync_test import ( "context" + "path" "testing" "time" @@ -17,6 +18,7 @@ import ( "github.com/ipni/go-libipni/dagsync/ipnisync" "github.com/ipni/go-libipni/dagsync/test" "github.com/libp2p/go-libp2p/core/peer" + "github.com/multiformats/go-multiaddr" "github.com/stretchr/testify/require" ) @@ -399,6 +401,49 @@ func TestSyncOnAnnounceDataTransfer(t *testing.T) { announceTest(t, pub, sub, dstStore, watcher, pubInfo, chainLnks[0]) } +func TestUpdatePeerstoreAddr(t *testing.T) { + dstStore := dssync.MutexWrap(datastore.NewMapDatastore()) + dstHost := test.MkTestHost(t) + dstLnkS := test.MkLinkSystem(dstStore) + + sub, err := dagsync.NewSubscriber(dstHost, dstStore, dstLnkS, testTopic, + dagsync.RecvAnnounce(), dagsync.StrictAdsSelector(false)) + require.NoError(t, err) + defer sub.Close() + + watcher, cncl := sub.OnSyncFinished() + defer cncl() + + srcHost := test.MkTestHost(t) + srcStore := dssync.MutexWrap(datastore.NewMapDatastore()) + srcLnkS := test.MkLinkSystem(srcStore) + pub, err := dtsync.NewPublisher(srcHost, srcStore, srcLnkS, testTopic) + require.NoError(t, err) + defer pub.Close() + require.NoError(t, test.WaitForP2PPublisher(pub, dstHost, testTopic)) + + srcHost.Peerstore().AddAddrs(dstHost.ID(), dstHost.Addrs(), time.Hour) + dstHost.Peerstore().AddAddrs(srcHost.ID(), srcHost.Addrs(), time.Hour) + + // Store the whole chain in source node + chainLnks := test.MkChain(srcLnkS, true) + + pubInfo := peer.AddrInfo{ + ID: pub.ID(), + Addrs: pub.Addrs(), + } + + announceTest(t, pub, sub, dstStore, watcher, pubInfo, chainLnks[2]) + require.Equal(t, pubInfo.Addrs, dstHost.Peerstore().Addrs(pub.ID())) + + // Update publisher address, sync, and check that peerstore matches. + maddr, err := multiaddr.NewMultiaddr("/dns4/localhost/tcp/" + path.Base(pub.Addrs()[0].String())) + require.NoError(t, err) + pubInfo.Addrs = []multiaddr.Multiaddr{maddr} + announceTest(t, pub, sub, dstStore, watcher, pubInfo, chainLnks[1]) + require.Equal(t, pubInfo.Addrs, dstHost.Peerstore().Addrs(pub.ID())) +} + func TestSyncOnAnnounceIPNI(t *testing.T) { dstStore := dssync.MutexWrap(datastore.NewMapDatastore()) dstHost := test.MkTestHost(t)