From b160f5038a699b124d54e2f7067af69bd56e1ccf Mon Sep 17 00:00:00 2001 From: sukun Date: Mon, 2 Dec 2024 00:37:38 +0530 Subject: [PATCH] cleanup interface address handling --- p2p/host/autorelay/relay_finder.go | 1 - p2p/host/basic/address_service.go | 257 ++++++++++++++--------------- p2p/host/basic/basic_host.go | 62 ++++++- p2p/host/basic/basic_host_test.go | 23 --- 4 files changed, 176 insertions(+), 167 deletions(-) diff --git a/p2p/host/autorelay/relay_finder.go b/p2p/host/autorelay/relay_finder.go index d4fabddaa0..7dd127d464 100644 --- a/p2p/host/autorelay/relay_finder.go +++ b/p2p/host/autorelay/relay_finder.go @@ -485,7 +485,6 @@ func (rf *relayFinder) tryNode(ctx context.Context, pi peer.AddrInfo) (supportsR if err := rf.host.Connect(ctx, pi); err != nil { return false, fmt.Errorf("error connecting to relay %s: %w", pi.ID, err) } - conns := rf.host.Network().ConnsToPeer(pi.ID) for _, conn := range conns { if isRelayAddr(conn.RemoteMultiaddr()) { diff --git a/p2p/host/basic/address_service.go b/p2p/host/basic/address_service.go index 6c3a021f1c..c7166e7ac0 100644 --- a/p2p/host/basic/address_service.go +++ b/p2p/host/basic/address_service.go @@ -42,6 +42,7 @@ type addressService struct { autorelay *autorelay.AutoRelay addrsFactory AddrsFactory addrChangeChan chan struct{} + relayAddrsSub event.Subscription ctx context.Context ctxCancel context.CancelFunc @@ -49,9 +50,7 @@ type addressService struct { updateLocalIPv4Backoff backoff.ExpBackoff updateLocalIPv6Backoff backoff.ExpBackoff - mx sync.RWMutex - allInterfaceAddrs []ma.Multiaddr - filteredInterfaceAddrs []ma.Multiaddr + ifaceAddrs *interfaceAddrsCache } func NewAddressService(h *BasicHost, natmgr func(network.Network) NATManager, @@ -64,6 +63,10 @@ func NewAddressService(h *BasicHost, natmgr func(network.Network) NATManager, if natmgr != nil { nmgr = natmgr(h.Network()) } + addrSub, err := h.EventBus().Subscribe(new(event.EvtAutoRelayAddrs)) + if err != nil { + return nil, err + } ctx, cancel := context.WithCancel(context.Background()) as := &addressService{ s: h.Network(), @@ -77,10 +80,11 @@ func NewAddressService(h *BasicHost, natmgr func(network.Network) NATManager, autorelay: h.autorelay, addrsFactory: addrFactory, addrChangeChan: make(chan struct{}, 1), + relayAddrsSub: addrSub, ctx: ctx, ctxCancel: cancel, + ifaceAddrs: &interfaceAddrsCache{}, } - as.updateLocalIpAddr() if !as.disableSignedPeerRecord { cab, ok := peerstore.GetCertifiedAddrBook(as.peerstore) if !ok { @@ -172,10 +176,6 @@ func (a *addressService) background() { defer ticker.Stop() for { - // Update our local IP addresses before checking our current addresses. - if len(a.s.ListenAddresses()) > 0 { - a.updateLocalIpAddr() - } curr := a.Addrs() emitAddrChange(curr, lastAddrs) lastAddrs = curr @@ -183,6 +183,7 @@ func (a *addressService) background() { select { case <-ticker.C: case <-a.addrChangeChan: + case <-a.relayAddrsSub.Out(): case <-a.ctx.Done(): return } @@ -225,10 +226,7 @@ func (a *addressService) AllAddrs() []ma.Multiaddr { return nil } - a.mx.RLock() - filteredIfaceAddrs := a.filteredInterfaceAddrs - allIfaceAddrs := a.allInterfaceAddrs - a.mx.RUnlock() + filteredIfaceAddrs := a.ifaceAddrs.Filtered() // Iterate over all _unresolved_ listen addresses, resolving our primary // interface only to avoid advertising too many addresses. @@ -272,7 +270,7 @@ func (a *addressService) AllAddrs() []ma.Multiaddr { // No. // in case the router gives us a wrong address or we're behind a double-NAT. // also add observed addresses - resolved, err := manet.ResolveUnspecifiedAddress(listen, allIfaceAddrs) + resolved, err := manet.ResolveUnspecifiedAddress(listen, a.ifaceAddrs.All()) if err != nil { // This can happen if we try to resolve /ip6/::/... // without any IPv6 interface addresses. @@ -324,88 +322,6 @@ func (a *addressService) AllAddrs() []ma.Multiaddr { return finalAddrs } -func (a *addressService) updateLocalIpAddr() { - a.mx.Lock() - defer a.mx.Unlock() - - a.filteredInterfaceAddrs = nil - a.allInterfaceAddrs = nil - - // Try to use the default ipv4/6 addresses. - // TODO: Remove this. We should advertise all interface addresses. - if r, err := netroute.New(); err != nil { - log.Debugw("failed to build Router for kernel's routing table", "error", err) - } else { - - var localIPv4 net.IP - var ran bool - err, ran = a.updateLocalIPv4Backoff.Run(func() error { - _, _, localIPv4, err = r.Route(net.IPv4zero) - return err - }) - - if ran && err != nil { - log.Debugw("failed to fetch local IPv4 address", "error", err) - } else if ran && localIPv4.IsGlobalUnicast() { - maddr, err := manet.FromIP(localIPv4) - if err == nil { - a.filteredInterfaceAddrs = append(a.filteredInterfaceAddrs, maddr) - } - } - - var localIPv6 net.IP - err, ran = a.updateLocalIPv6Backoff.Run(func() error { - _, _, localIPv6, err = r.Route(net.IPv6unspecified) - return err - }) - - if ran && err != nil { - log.Debugw("failed to fetch local IPv6 address", "error", err) - } else if ran && localIPv6.IsGlobalUnicast() { - maddr, err := manet.FromIP(localIPv6) - if err == nil { - a.filteredInterfaceAddrs = append(a.filteredInterfaceAddrs, maddr) - } - } - } - - // Resolve the interface addresses - ifaceAddrs, err := manet.InterfaceMultiaddrs() - if err != nil { - // This usually shouldn't happen, but we could be in some kind - // of funky restricted environment. - log.Errorw("failed to resolve local interface addresses", "error", err) - - // Add the loopback addresses to the filtered addrs and use them as the non-filtered addrs. - // Then bail. There's nothing else we can do here. - a.filteredInterfaceAddrs = append(a.filteredInterfaceAddrs, manet.IP4Loopback, manet.IP6Loopback) - a.allInterfaceAddrs = a.filteredInterfaceAddrs - return - } - - for _, addr := range ifaceAddrs { - // Skip link-local addrs, they're mostly useless. - if !manet.IsIP6LinkLocal(addr) { - a.allInterfaceAddrs = append(a.allInterfaceAddrs, addr) - } - } - - // If netroute failed to get us any interface addresses, use all of - // them. - if len(a.filteredInterfaceAddrs) == 0 { - // Add all addresses. - a.filteredInterfaceAddrs = a.allInterfaceAddrs - } else { - // Only add loopback addresses. Filter these because we might - // not _have_ an IPv6 loopback address. - for _, addr := range a.allInterfaceAddrs { - if manet.IsIPLoopback(addr) { - a.filteredInterfaceAddrs = append(a.filteredInterfaceAddrs, addr) - } - } - } -} - func (a *addressService) addCertHashes(addrs []ma.Multiaddr) []ma.Multiaddr { // This is a temporary workaround/hack that fixes #2233. Once we have a // proper address pipeline, rework this. See the issue for more context. @@ -498,54 +414,125 @@ func (a *addressService) makeUpdatedAddrEvent(prev, current []ma.Multiaddr) *eve return evt } -func trimHostAddrList(addrs []ma.Multiaddr, maxSize int) []ma.Multiaddr { - totalSize := 0 - for _, a := range addrs { - totalSize += len(a.Bytes()) +const ifaceAddrsTTL = time.Minute + +type interfaceAddrsCache struct { + mx sync.RWMutex + filtered []ma.Multiaddr + all []ma.Multiaddr + updateLocalIPv4Backoff backoff.ExpBackoff + updateLocalIPv6Backoff backoff.ExpBackoff + lastUpdated time.Time +} + +func (i *interfaceAddrsCache) Filtered() []ma.Multiaddr { + i.mx.RLock() + if time.Now().After(i.lastUpdated.Add(ifaceAddrsTTL)) { + i.mx.RUnlock() + return i.update(true) } - if totalSize <= maxSize { - return addrs + defer i.mx.RUnlock() + return i.filtered +} + +func (i *interfaceAddrsCache) All() []ma.Multiaddr { + i.mx.RLock() + if time.Now().After(i.lastUpdated.Add(ifaceAddrsTTL)) { + i.mx.RUnlock() + return i.update(false) } + defer i.mx.RUnlock() + return i.all +} - score := func(addr ma.Multiaddr) int { - var res int - if manet.IsPublicAddr(addr) { - res |= 1 << 12 - } else if !manet.IsIPLoopback(addr) { - res |= 1 << 11 +func (i *interfaceAddrsCache) update(filtered bool) []ma.Multiaddr { + i.mx.Lock() + defer i.mx.Unlock() + if !time.Now().After(i.lastUpdated.Add(ifaceAddrsTTL)) { + if filtered { + return i.filtered } - var protocolWeight int - ma.ForEach(addr, func(c ma.Component) bool { - switch c.Protocol().Code { - case ma.P_QUIC_V1: - protocolWeight = 5 - case ma.P_TCP: - protocolWeight = 4 - case ma.P_WSS: - protocolWeight = 3 - case ma.P_WEBTRANSPORT: - protocolWeight = 2 - case ma.P_WEBRTC_DIRECT: - protocolWeight = 1 - case ma.P_P2P: - return false + return i.all + } + i.updateUnlocked() + i.lastUpdated = time.Now() + if filtered { + return i.filtered + } + return i.all +} + +func (i *interfaceAddrsCache) updateUnlocked() { + i.filtered = nil + i.all = nil + + // Try to use the default ipv4/6 addresses. + // TODO: Remove this. We should advertise all interface addresses. + if r, err := netroute.New(); err != nil { + log.Debugw("failed to build Router for kernel's routing table", "error", err) + } else { + + var localIPv4 net.IP + var ran bool + err, ran = i.updateLocalIPv4Backoff.Run(func() error { + _, _, localIPv4, err = r.Route(net.IPv4zero) + return err + }) + + if ran && err != nil { + log.Debugw("failed to fetch local IPv4 address", "error", err) + } else if ran && localIPv4.IsGlobalUnicast() { + maddr, err := manet.FromIP(localIPv4) + if err == nil { + i.filtered = append(i.filtered, maddr) } - return true + } + + var localIPv6 net.IP + err, ran = i.updateLocalIPv6Backoff.Run(func() error { + _, _, localIPv6, err = r.Route(net.IPv6unspecified) + return err }) - res |= 1 << protocolWeight - return res + + if ran && err != nil { + log.Debugw("failed to fetch local IPv6 address", "error", err) + } else if ran && localIPv6.IsGlobalUnicast() { + maddr, err := manet.FromIP(localIPv6) + if err == nil { + i.filtered = append(i.filtered, maddr) + } + } } - slices.SortStableFunc(addrs, func(a, b ma.Multiaddr) int { - return score(b) - score(a) // b-a for reverse order - }) - totalSize = 0 - for i, a := range addrs { - totalSize += len(a.Bytes()) - if totalSize > maxSize { - addrs = addrs[:i] - break + // Resolve the interface addresses + ifaceAddrs, err := manet.InterfaceMultiaddrs() + if err != nil { + // This usually shouldn't happen, but we could be in some kind + // of funky restricted environment. + log.Errorw("failed to resolve local interface addresses", "error", err) + + // Add the loopback addresses to the filtered addrs and use them as the non-filtered addrs. + // Then bail. There's nothing else we can do here. + i.filtered = append(i.filtered, manet.IP4Loopback, manet.IP6Loopback) + i.all = i.filtered + return + } + + // remove link local ipv6 addresses + i.all = slices.DeleteFunc(ifaceAddrs, manet.IsIP6LinkLocal) + + // If netroute failed to get us any interface addresses, use all of + // them. + if len(i.filtered) == 0 { + // Add all addresses. + i.filtered = i.all + } else { + // Only add loopback addresses. Filter these because we might + // not _have_ an IPv6 loopback address. + for _, addr := range i.all { + if manet.IsIPLoopback(addr) { + i.filtered = append(i.filtered, addr) + } } } - return addrs } diff --git a/p2p/host/basic/basic_host.go b/p2p/host/basic/basic_host.go index fb555b4545..801656c499 100644 --- a/p2p/host/basic/basic_host.go +++ b/p2p/host/basic/basic_host.go @@ -5,6 +5,7 @@ import ( "errors" "fmt" "io" + "slices" "sync" "time" @@ -33,6 +34,7 @@ import ( logging "github.com/ipfs/go-log/v2" ma "github.com/multiformats/go-multiaddr" + manet "github.com/multiformats/go-multiaddr/net" msmux "github.com/multiformats/go-multistream" ) @@ -97,7 +99,6 @@ type BasicHost struct { autoNat autonat.AutoNAT autonatv2 *autonatv2.AutoNAT - addrSub event.Subscription autorelay *autorelay.AutoRelay addressService *addressService } @@ -180,11 +181,6 @@ func NewHost(n network.Network, opts *HostOpts) (*BasicHost, error) { return nil, err } - addrSub, err := opts.EventBus.Subscribe(new(event.EvtAutoRelayAddrs)) - if err != nil { - return nil, err - } - hostCtx, cancel := context.WithCancel(context.Background()) h := &BasicHost{ network: n, @@ -195,7 +191,6 @@ func NewHost(n network.Network, opts *HostOpts) (*BasicHost, error) { ctx: hostCtx, ctxCancel: cancel, disableSignedPeerRecord: opts.DisableSignedPeerRecord, - addrSub: addrSub, } if h.emitters.evtLocalProtocolsUpdated, err = h.eventbus.Emitter(&event.EvtLocalProtocolsUpdated{}, eventbus.Stateful); err != nil { @@ -596,7 +591,6 @@ func (h *BasicHost) Connect(ctx context.Context, pi peer.AddrInfo) error { return nil } } - return h.dialPeer(ctx, pi.ID) } @@ -728,6 +722,58 @@ func (h *BasicHost) Close() error { return nil } +func trimHostAddrList(addrs []ma.Multiaddr, maxSize int) []ma.Multiaddr { + totalSize := 0 + for _, a := range addrs { + totalSize += len(a.Bytes()) + } + if totalSize <= maxSize { + return addrs + } + + score := func(addr ma.Multiaddr) int { + var res int + if manet.IsPublicAddr(addr) { + res |= 1 << 12 + } else if !manet.IsIPLoopback(addr) { + res |= 1 << 11 + } + var protocolWeight int + ma.ForEach(addr, func(c ma.Component) bool { + switch c.Protocol().Code { + case ma.P_QUIC_V1: + protocolWeight = 5 + case ma.P_TCP: + protocolWeight = 4 + case ma.P_WSS: + protocolWeight = 3 + case ma.P_WEBTRANSPORT: + protocolWeight = 2 + case ma.P_WEBRTC_DIRECT: + protocolWeight = 1 + case ma.P_P2P: + return false + } + return true + }) + res |= 1 << protocolWeight + return res + } + + slices.SortStableFunc(addrs, func(a, b ma.Multiaddr) int { + return score(b) - score(a) // b-a for reverse order + }) + totalSize = 0 + for i, a := range addrs { + totalSize += len(a.Bytes()) + if totalSize > maxSize { + addrs = addrs[:i] + break + } + } + return addrs +} + type streamWrapper struct { network.Stream rw io.ReadWriteCloser diff --git a/p2p/host/basic/basic_host_test.go b/p2p/host/basic/basic_host_test.go index a9c7ab28fd..2d254956d2 100644 --- a/p2p/host/basic/basic_host_test.go +++ b/p2p/host/basic/basic_host_test.go @@ -206,29 +206,6 @@ func TestHostAddrsFactory(t *testing.T) { } } -func TestLocalIPChangesWhenListenAddrChanges(t *testing.T) { - // no listen addrs - h, err := NewHost(swarmt.GenSwarm(t, swarmt.OptDialOnly), nil) - require.NoError(t, err) - h.Start() - defer h.Close() - - h.addressService.mx.Lock() - h.addressService.filteredInterfaceAddrs = nil - h.addressService.allInterfaceAddrs = nil - h.addressService.mx.Unlock() - - // change listen addrs and verify local IP addr is not nil again - require.NoError(t, h.Network().Listen(ma.StringCast("/ip4/0.0.0.0/tcp/0"))) - h.SignalAddressChange() - time.Sleep(1 * time.Second) - - h.addrMu.RLock() - defer h.addrMu.RUnlock() - require.NotEmpty(t, h.addressService.filteredInterfaceAddrs) - require.NotEmpty(t, h.addressService.allInterfaceAddrs) -} - func TestAllAddrs(t *testing.T) { // no listen addrs h, err := NewHost(swarmt.GenSwarm(t, swarmt.OptDialOnly), nil)