Skip to content

Commit

Permalink
Remove unused addrs from peerstore (#128)
Browse files Browse the repository at this point in the history
* Remove unused addrs from peerstore

Remove unused addrs from peerstore to prevent addresses that are no longer valid from continuing to be used.

Fixes #127

Update non-critical log messages to log at debug level.

* Fix log message
* Add test to check that old address is removed from peerstore
  • Loading branch information
gammazero authored Sep 27, 2023
1 parent c886fd4 commit 262133f
Show file tree
Hide file tree
Showing 2 changed files with 81 additions and 18 deletions.
54 changes: 36 additions & 18 deletions dagsync/subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,10 @@ type Subscriber struct {
expSyncWG sync.WaitGroup

// selector sequence for advertisements
//
// This selector sequence is wrapped with selector logic that stops
// traversal when the latest synced link is reached. So, this only
// specifies the selection sequence itself.
adsSelectorSeq ipld.Node

// selectorOne selects one multihash entries or HAMT block.
Expand Down Expand Up @@ -347,8 +351,6 @@ func (s *Subscriber) doClose() error {
func (s *Subscriber) OnSyncFinished() (<-chan SyncFinished, context.CancelFunc) {
// Channel is buffered to prevent distribute() from blocking if a reader is
// not reading the channel immediately.
log.Info("Configuring subscriber OnSyncFinished...")

cq := channelqueue.New[SyncFinished](-1)
ch := cq.In()
s.addEventChan <- ch
Expand All @@ -363,7 +365,7 @@ func (s *Subscriber) OnSyncFinished() (<-chan SyncFinished, context.CancelFunc)
}
ch = nil
}
log.Info("Subscriber OnSyncFinished configured.")
log.Debug("Subscriber OnSyncFinished configured")
return cq.Out(), cncl
}

Expand Down Expand Up @@ -396,16 +398,6 @@ func (s *Subscriber) RemoveHandler(peerID peer.ID) bool {
// It is the responsibility of the caller to make sure the given CID appears
// after the latest sync in order to avid re-syncing of content that may have
// previously been synced.
//
// The selector sequence, sel, can optionally be specified to customize the
// selection sequence during traversal. If unspecified, the default selector
// sequence is used.
//
// Note that the selector sequence is wrapped with a selector logic that will
// stop traversal when the latest synced link is reached. Therefore, it must
// only specify the selection sequence itself.
//
// See: ExploreRecursiveWithStopNode.
func (s *Subscriber) SyncAdChain(ctx context.Context, peerInfo peer.AddrInfo, options ...SyncOption) (cid.Cid, error) {
s.expSyncMutex.Lock()
if s.expSyncClosed {
Expand Down Expand Up @@ -483,7 +475,7 @@ func (s *Subscriber) SyncAdChain(ctx context.Context, peerInfo peer.AddrInfo, op
}

log = log.With("cid", nextCid)
log.Info("Sync starting advertisement chain at head CID")
log.Debug("Start advertisement chain sync")

if ctx.Err() != nil {
return cid.Undef, fmt.Errorf("sync canceled: %w", ctx.Err())
Expand Down Expand Up @@ -554,7 +546,7 @@ func (s *Subscriber) SyncHAMTEntries(ctx context.Context, peerInfo peer.AddrInfo

func (s *Subscriber) syncEntries(ctx context.Context, peerInfo peer.AddrInfo, entCid cid.Cid, sel ipld.Node, bh BlockHookFunc, segdl int64) error {
if entCid == cid.Undef {
log.Info("No entries to sync", "peer", peerInfo.ID)
log.Infow("No entries to sync", "peer", peerInfo.ID)
return nil
}

Expand All @@ -577,7 +569,7 @@ func (s *Subscriber) syncEntries(ctx context.Context, peerInfo peer.AddrInfo, en
return err
}

log.Info("Start entries sync", "peer", peerInfo.ID, "cid", entCid)
log.Debugw("Start entries sync", "peer", peerInfo.ID, "cid", entCid)

// Check for an existing handler for the specified peer (publisher). If
// none, create one if allowed.
Expand Down Expand Up @@ -755,6 +747,28 @@ func (s *Subscriber) Announce(ctx context.Context, nextCid cid.Cid, peerInfo pee
return s.receiver.Direct(ctx, nextCid, peerInfo)
}

// delNotPresent removes from the peerStore the peer's multiaddrs that are not
// present in the specified addrs slice.
func delNotPresent(peerStore peerstore.Peerstore, peerID peer.ID, addrs []multiaddr.Multiaddr) {
var del []multiaddr.Multiaddr
oldAddrs := peerStore.Addrs(peerID)
for _, old := range oldAddrs {
keep := false
for _, new := range addrs {
if old.Equal(new) {
keep = true
break
}
}
if !keep {
del = append(del, old)
}
}
if len(del) != 0 {
peerStore.SetAddrs(peerID, del, 0)
}
}

func (s *Subscriber) makeSyncer(peerInfo peer.AddrInfo, doUpdate bool) (Syncer, func(), error) {
// Check for an HTTP address in peerAddrs, or if not given, in the http
// peerstore. This gives a preference to use ipnisync over dtsync.
Expand All @@ -763,6 +777,9 @@ func (s *Subscriber) makeSyncer(peerInfo peer.AddrInfo, doUpdate bool) (Syncer,
httpAddrs = s.httpPeerstore.Addrs(peerInfo.ID)
} else {
httpAddrs = mautil.FindHTTPAddrs(peerInfo.Addrs)
if doUpdate && len(httpAddrs) != 0 {
delNotPresent(s.httpPeerstore, peerInfo.ID, httpAddrs)
}
}

var update func()
Expand Down Expand Up @@ -790,6 +807,7 @@ func (s *Subscriber) makeSyncer(peerInfo peer.AddrInfo, doUpdate bool) (Syncer,
if doUpdate {
peerStore := s.host.Peerstore()
if peerStore != nil && len(peerInfo.Addrs) != 0 {
delNotPresent(peerStore, peerInfo.ID, peerInfo.Addrs)
// Add it to peerstore with a small TTL first, and extend it if/when
// sync with it completes. In case the peerstore already has this
// address and the existing TTL is greater than this temp one, this is
Expand Down Expand Up @@ -1005,7 +1023,7 @@ func (h *handler) handle(ctx context.Context, nextCid cid.Cid, sel ipld.Node, sy
if err != nil {
return 0, err
}
log.Infow("Non-segmented sync completed", "syncedCount", syncedCount)
log.Debugw("Non-segmented sync completed", "syncedCount", syncedCount)
return syncedCount, nil
}

Expand Down Expand Up @@ -1060,6 +1078,6 @@ SegSyncLoop:
}
}

log.Infow("Segmented sync completed", "syncedCount", syncedCount)
log.Debugw("Segmented sync completed", "syncedCount", syncedCount)
return syncedCount, nil
}
45 changes: 45 additions & 0 deletions dagsync/sync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package dagsync_test

import (
"context"
"path"
"testing"
"time"

Expand All @@ -17,6 +18,7 @@ import (
"github.com/ipni/go-libipni/dagsync/ipnisync"
"github.com/ipni/go-libipni/dagsync/test"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/multiformats/go-multiaddr"
"github.com/stretchr/testify/require"
)

Expand Down Expand Up @@ -399,6 +401,49 @@ func TestSyncOnAnnounceDataTransfer(t *testing.T) {
announceTest(t, pub, sub, dstStore, watcher, pubInfo, chainLnks[0])
}

func TestUpdatePeerstoreAddr(t *testing.T) {
dstStore := dssync.MutexWrap(datastore.NewMapDatastore())
dstHost := test.MkTestHost(t)
dstLnkS := test.MkLinkSystem(dstStore)

sub, err := dagsync.NewSubscriber(dstHost, dstStore, dstLnkS, testTopic,
dagsync.RecvAnnounce(), dagsync.StrictAdsSelector(false))
require.NoError(t, err)
defer sub.Close()

watcher, cncl := sub.OnSyncFinished()
defer cncl()

srcHost := test.MkTestHost(t)
srcStore := dssync.MutexWrap(datastore.NewMapDatastore())
srcLnkS := test.MkLinkSystem(srcStore)
pub, err := dtsync.NewPublisher(srcHost, srcStore, srcLnkS, testTopic)
require.NoError(t, err)
defer pub.Close()
require.NoError(t, test.WaitForP2PPublisher(pub, dstHost, testTopic))

srcHost.Peerstore().AddAddrs(dstHost.ID(), dstHost.Addrs(), time.Hour)
dstHost.Peerstore().AddAddrs(srcHost.ID(), srcHost.Addrs(), time.Hour)

// Store the whole chain in source node
chainLnks := test.MkChain(srcLnkS, true)

pubInfo := peer.AddrInfo{
ID: pub.ID(),
Addrs: pub.Addrs(),
}

announceTest(t, pub, sub, dstStore, watcher, pubInfo, chainLnks[2])
require.Equal(t, pubInfo.Addrs, dstHost.Peerstore().Addrs(pub.ID()))

// Update publisher address, sync, and check that peerstore matches.
maddr, err := multiaddr.NewMultiaddr("/dns4/localhost/tcp/" + path.Base(pub.Addrs()[0].String()))
require.NoError(t, err)
pubInfo.Addrs = []multiaddr.Multiaddr{maddr}
announceTest(t, pub, sub, dstStore, watcher, pubInfo, chainLnks[1])
require.Equal(t, pubInfo.Addrs, dstHost.Peerstore().Addrs(pub.ID()))
}

func TestSyncOnAnnounceIPNI(t *testing.T) {
dstStore := dssync.MutexWrap(datastore.NewMapDatastore())
dstHost := test.MkTestHost(t)
Expand Down

0 comments on commit 262133f

Please sign in to comment.