From 83991ffcb395a0e4dfe90023247abbd7b33d4efb Mon Sep 17 00:00:00 2001 From: sukun Date: Sun, 1 Dec 2024 18:39:47 +0530 Subject: [PATCH 1/5] addressservice: extract out addressing logic from basichost The goal is to keep modifying it and removing the dependency on basichost by relying on events. At that point, we can expose this as a separate service usable by both basic and blank hosts --- p2p/host/basic/address_service.go | 551 ++++++++++++++++++++++++++++++ p2p/host/basic/basic_host.go | 524 +++------------------------- p2p/host/basic/basic_host_test.go | 22 +- 3 files changed, 606 insertions(+), 491 deletions(-) create mode 100644 p2p/host/basic/address_service.go diff --git a/p2p/host/basic/address_service.go b/p2p/host/basic/address_service.go new file mode 100644 index 0000000000..6c3a021f1c --- /dev/null +++ b/p2p/host/basic/address_service.go @@ -0,0 +1,551 @@ +package basichost + +import ( + "context" + "errors" + "fmt" + "net" + "slices" + "sync" + "time" + + "github.com/libp2p/go-libp2p/core/event" + "github.com/libp2p/go-libp2p/core/network" + "github.com/libp2p/go-libp2p/core/peer" + "github.com/libp2p/go-libp2p/core/peerstore" + "github.com/libp2p/go-libp2p/core/record" + "github.com/libp2p/go-libp2p/core/transport" + "github.com/libp2p/go-libp2p/p2p/host/autonat" + "github.com/libp2p/go-libp2p/p2p/host/autorelay" + "github.com/libp2p/go-libp2p/p2p/host/basic/internal/backoff" + "github.com/libp2p/go-libp2p/p2p/host/eventbus" + "github.com/libp2p/go-libp2p/p2p/protocol/identify" + libp2pwebrtc "github.com/libp2p/go-libp2p/p2p/transport/webrtc" + libp2pwebtransport "github.com/libp2p/go-libp2p/p2p/transport/webtransport" + "github.com/libp2p/go-netroute" + ma "github.com/multiformats/go-multiaddr" + manet "github.com/multiformats/go-multiaddr/net" +) + +type peerRecordFunc func([]ma.Multiaddr) (*record.Envelope, error) + +type addressService struct { + s network.Network + natmgr NATManager + ids identify.IDService + peerRecord peerRecordFunc + disableSignedPeerRecord bool + peerstore peerstore.Peerstore + id peer.ID + autonat autonat.AutoNAT + emitter event.Emitter + autorelay *autorelay.AutoRelay + addrsFactory AddrsFactory + addrChangeChan chan struct{} + ctx context.Context + ctxCancel context.CancelFunc + + wg sync.WaitGroup + updateLocalIPv4Backoff backoff.ExpBackoff + updateLocalIPv6Backoff backoff.ExpBackoff + + mx sync.RWMutex + allInterfaceAddrs []ma.Multiaddr + filteredInterfaceAddrs []ma.Multiaddr +} + +func NewAddressService(h *BasicHost, natmgr func(network.Network) NATManager, + addrFactory AddrsFactory) (*addressService, error) { + emitter, err := h.eventbus.Emitter(&event.EvtLocalAddressesUpdated{}, eventbus.Stateful) + if err != nil { + return nil, err + } + var nmgr NATManager + if natmgr != nil { + nmgr = natmgr(h.Network()) + } + ctx, cancel := context.WithCancel(context.Background()) + as := &addressService{ + s: h.Network(), + ids: h.IDService(), + peerRecord: h.makeSignedPeerRecord, + disableSignedPeerRecord: h.disableSignedPeerRecord, + peerstore: h.Peerstore(), + id: h.ID(), + natmgr: nmgr, + emitter: emitter, + autorelay: h.autorelay, + addrsFactory: addrFactory, + addrChangeChan: make(chan struct{}, 1), + ctx: ctx, + ctxCancel: cancel, + } + as.updateLocalIpAddr() + if !as.disableSignedPeerRecord { + cab, ok := peerstore.GetCertifiedAddrBook(as.peerstore) + if !ok { + return nil, errors.New("peerstore should also be a certified address book") + } + h.caBook = cab + rec, err := as.peerRecord(as.Addrs()) + if err != nil { + return nil, fmt.Errorf("failed to create signed record for self: %w", err) + } + if _, err := cab.ConsumePeerRecord(rec, peerstore.PermanentAddrTTL); err != nil { + return nil, fmt.Errorf("failed to persist signed record to peerstore: %w", err) + } + } + return as, nil +} + +func (a *addressService) SetAutoNAT(an autonat.AutoNAT) { + a.autonat = an +} + +func (a *addressService) Start() { + a.wg.Add(1) + go a.background() +} + +func (a *addressService) Close() { + a.ctxCancel() + a.wg.Wait() + if a.natmgr != nil { + err := a.natmgr.Close() + if err != nil { + log.Warnf("error closing natmgr: %s", err) + } + } + err := a.emitter.Close() + if err != nil { + log.Warnf("error closing addrs update emitter: %s", err) + } +} + +func (a *addressService) SignalAddressChange() { + select { + case a.addrChangeChan <- struct{}{}: + default: + } + +} + +func (a *addressService) background() { + defer a.wg.Done() + var lastAddrs []ma.Multiaddr + + emitAddrChange := func(currentAddrs []ma.Multiaddr, lastAddrs []ma.Multiaddr) { + changeEvt := a.makeUpdatedAddrEvent(lastAddrs, currentAddrs) + if changeEvt == nil { + return + } + // Our addresses have changed. + // store the signed peer record in the peer store. + if !a.disableSignedPeerRecord { + cabook, ok := peerstore.GetCertifiedAddrBook(a.peerstore) + if !ok { + log.Errorf("peerstore doesn't implement certified address book") + return + } + if _, err := cabook.ConsumePeerRecord(changeEvt.SignedPeerRecord, peerstore.PermanentAddrTTL); err != nil { + log.Errorf("failed to persist signed peer record in peer store, err=%s", err) + return + } + } + // update host addresses in the peer store + removedAddrs := make([]ma.Multiaddr, 0, len(changeEvt.Removed)) + for _, ua := range changeEvt.Removed { + removedAddrs = append(removedAddrs, ua.Address) + } + a.peerstore.SetAddrs(a.id, currentAddrs, peerstore.PermanentAddrTTL) + a.peerstore.SetAddrs(a.id, removedAddrs, 0) + + // emit addr change event + if err := a.emitter.Emit(*changeEvt); err != nil { + log.Warnf("error emitting event for updated addrs: %s", err) + } + } + + // periodically schedules an IdentifyPush to update our peers for changes + // in our address set (if needed) + ticker := time.NewTicker(addrChangeTickrInterval) + defer ticker.Stop() + + for { + // Update our local IP addresses before checking our current addresses. + if len(a.s.ListenAddresses()) > 0 { + a.updateLocalIpAddr() + } + curr := a.Addrs() + emitAddrChange(curr, lastAddrs) + lastAddrs = curr + + select { + case <-ticker.C: + case <-a.addrChangeChan: + case <-a.ctx.Done(): + return + } + } +} + +// Addrs returns the node's dialable addresses both public and private. +// If autorealy is enabled and node reachability is private, it returns +// the node's relay addresses and private network addresses. +func (a *addressService) Addrs() []ma.Multiaddr { + addrs := a.AllAddrs() + // Delete public addresses if the node's reachability is private, and we have autorelay. + if a.autonat != nil && a.autorelay != nil && a.autonat.Status() == network.ReachabilityPrivate { + addrs = slices.DeleteFunc(addrs, func(a ma.Multiaddr) bool { return manet.IsPublicAddr(a) }) + addrs = append(addrs, a.autorelay.RelayAddrs()...) + } + // Make a copy. Consumers can modify the slice elements + addrs = slices.Clone(a.addrsFactory(addrs)) + // Add certhashes for the addresses provided by the user via address factory. + return a.addCertHashes(ma.Unique(addrs)) +} + +// GetHolePunchAddrs returns the node's public direct listen addresses. +func (a *addressService) GetHolePunchAddrs() []ma.Multiaddr { + addrs := a.AllAddrs() + addrs = slices.Clone(a.addrsFactory(addrs)) + // AllAddrs may ignore observed addresses in favour of NAT mappings. + // Use both for hole punching. + addrs = append(addrs, a.ids.OwnObservedAddrs()...) + addrs = ma.Unique(addrs) + return slices.DeleteFunc(addrs, func(a ma.Multiaddr) bool { return !manet.IsPublicAddr(a) }) +} + +var p2pCircuitAddr = ma.StringCast("/p2p-circuit") + +// AllAddrs returns all the addresses the host is listening on except circuit addresses. +func (a *addressService) AllAddrs() []ma.Multiaddr { + listenAddrs := a.s.ListenAddresses() + if len(listenAddrs) == 0 { + return nil + } + + a.mx.RLock() + filteredIfaceAddrs := a.filteredInterfaceAddrs + allIfaceAddrs := a.allInterfaceAddrs + a.mx.RUnlock() + + // Iterate over all _unresolved_ listen addresses, resolving our primary + // interface only to avoid advertising too many addresses. + finalAddrs := make([]ma.Multiaddr, 0, 8) + if resolved, err := manet.ResolveUnspecifiedAddresses(listenAddrs, filteredIfaceAddrs); err != nil { + // This can happen if we're listening on no addrs, or listening + // on IPv6 addrs, but only have IPv4 interface addrs. + log.Debugw("failed to resolve listen addrs", "error", err) + } else { + finalAddrs = append(finalAddrs, resolved...) + } + + finalAddrs = ma.Unique(finalAddrs) + + // use nat mappings if we have them + if a.natmgr != nil && a.natmgr.HasDiscoveredNAT() { + // We have successfully mapped ports on our NAT. Use those + // instead of observed addresses (mostly). + // Next, apply this mapping to our addresses. + for _, listen := range listenAddrs { + extMaddr := a.natmgr.GetMapping(listen) + if extMaddr == nil { + // not mapped + continue + } + + // if the router reported a sane address + if !manet.IsIPUnspecified(extMaddr) { + // Add in the mapped addr. + finalAddrs = append(finalAddrs, extMaddr) + } else { + log.Warn("NAT device reported an unspecified IP as it's external address") + } + + // Did the router give us a routable public addr? + if manet.IsPublicAddr(extMaddr) { + // well done + continue + } + + // No. + // in case the router gives us a wrong address or we're behind a double-NAT. + // also add observed addresses + resolved, err := manet.ResolveUnspecifiedAddress(listen, allIfaceAddrs) + if err != nil { + // This can happen if we try to resolve /ip6/::/... + // without any IPv6 interface addresses. + continue + } + + for _, addr := range resolved { + // Now, check if we have any observed addresses that + // differ from the one reported by the router. Routers + // don't always give the most accurate information. + observed := a.ids.ObservedAddrsFor(addr) + + if len(observed) == 0 { + continue + } + + // Drop the IP from the external maddr + _, extMaddrNoIP := ma.SplitFirst(extMaddr) + + for _, obsMaddr := range observed { + // Extract a public observed addr. + ip, _ := ma.SplitFirst(obsMaddr) + if ip == nil || !manet.IsPublicAddr(ip) { + continue + } + + finalAddrs = append(finalAddrs, ma.Join(ip, extMaddrNoIP)) + } + } + } + } else { + var observedAddrs []ma.Multiaddr + if a.ids != nil { + observedAddrs = a.ids.OwnObservedAddrs() + } + finalAddrs = append(finalAddrs, observedAddrs...) + } + finalAddrs = ma.Unique(finalAddrs) + // Remove /p2p-circuit addresses from the list. + // The p2p-circuit tranport listener reports its address as just /p2p-circuit + // This is useless for dialing. Users need to manage their circuit addresses themselves, + // or use AutoRelay. + finalAddrs = slices.DeleteFunc(finalAddrs, func(a ma.Multiaddr) bool { + return a.Equal(p2pCircuitAddr) + }) + // Add certhashes for /webrtc-direct, /webtransport, etc addresses discovered + // using identify. + finalAddrs = a.addCertHashes(finalAddrs) + return finalAddrs +} + +func (a *addressService) updateLocalIpAddr() { + a.mx.Lock() + defer a.mx.Unlock() + + a.filteredInterfaceAddrs = nil + a.allInterfaceAddrs = nil + + // Try to use the default ipv4/6 addresses. + // TODO: Remove this. We should advertise all interface addresses. + if r, err := netroute.New(); err != nil { + log.Debugw("failed to build Router for kernel's routing table", "error", err) + } else { + + var localIPv4 net.IP + var ran bool + err, ran = a.updateLocalIPv4Backoff.Run(func() error { + _, _, localIPv4, err = r.Route(net.IPv4zero) + return err + }) + + if ran && err != nil { + log.Debugw("failed to fetch local IPv4 address", "error", err) + } else if ran && localIPv4.IsGlobalUnicast() { + maddr, err := manet.FromIP(localIPv4) + if err == nil { + a.filteredInterfaceAddrs = append(a.filteredInterfaceAddrs, maddr) + } + } + + var localIPv6 net.IP + err, ran = a.updateLocalIPv6Backoff.Run(func() error { + _, _, localIPv6, err = r.Route(net.IPv6unspecified) + return err + }) + + if ran && err != nil { + log.Debugw("failed to fetch local IPv6 address", "error", err) + } else if ran && localIPv6.IsGlobalUnicast() { + maddr, err := manet.FromIP(localIPv6) + if err == nil { + a.filteredInterfaceAddrs = append(a.filteredInterfaceAddrs, maddr) + } + } + } + + // Resolve the interface addresses + ifaceAddrs, err := manet.InterfaceMultiaddrs() + if err != nil { + // This usually shouldn't happen, but we could be in some kind + // of funky restricted environment. + log.Errorw("failed to resolve local interface addresses", "error", err) + + // Add the loopback addresses to the filtered addrs and use them as the non-filtered addrs. + // Then bail. There's nothing else we can do here. + a.filteredInterfaceAddrs = append(a.filteredInterfaceAddrs, manet.IP4Loopback, manet.IP6Loopback) + a.allInterfaceAddrs = a.filteredInterfaceAddrs + return + } + + for _, addr := range ifaceAddrs { + // Skip link-local addrs, they're mostly useless. + if !manet.IsIP6LinkLocal(addr) { + a.allInterfaceAddrs = append(a.allInterfaceAddrs, addr) + } + } + + // If netroute failed to get us any interface addresses, use all of + // them. + if len(a.filteredInterfaceAddrs) == 0 { + // Add all addresses. + a.filteredInterfaceAddrs = a.allInterfaceAddrs + } else { + // Only add loopback addresses. Filter these because we might + // not _have_ an IPv6 loopback address. + for _, addr := range a.allInterfaceAddrs { + if manet.IsIPLoopback(addr) { + a.filteredInterfaceAddrs = append(a.filteredInterfaceAddrs, addr) + } + } + } +} + +func (a *addressService) addCertHashes(addrs []ma.Multiaddr) []ma.Multiaddr { + // This is a temporary workaround/hack that fixes #2233. Once we have a + // proper address pipeline, rework this. See the issue for more context. + type transportForListeninger interface { + TransportForListening(a ma.Multiaddr) transport.Transport + } + + type addCertHasher interface { + AddCertHashes(m ma.Multiaddr) (ma.Multiaddr, bool) + } + + s, ok := a.s.(transportForListeninger) + if !ok { + return addrs + } + + // Copy addrs slice since we'll be modifying it. + addrsOld := addrs + addrs = make([]ma.Multiaddr, len(addrsOld)) + copy(addrs, addrsOld) + + for i, addr := range addrs { + wtOK, wtN := libp2pwebtransport.IsWebtransportMultiaddr(addr) + webrtcOK, webrtcN := libp2pwebrtc.IsWebRTCDirectMultiaddr(addr) + if (wtOK && wtN == 0) || (webrtcOK && webrtcN == 0) { + t := s.TransportForListening(addr) + tpt, ok := t.(addCertHasher) + if !ok { + continue + } + addrWithCerthash, added := tpt.AddCertHashes(addr) + if !added { + log.Debugf("Couldn't add certhashes to multiaddr: %s", addr) + continue + } + addrs[i] = addrWithCerthash + } + } + return addrs +} + +func (a *addressService) makeUpdatedAddrEvent(prev, current []ma.Multiaddr) *event.EvtLocalAddressesUpdated { + if prev == nil && current == nil { + return nil + } + prevmap := make(map[string]ma.Multiaddr, len(prev)) + currmap := make(map[string]ma.Multiaddr, len(current)) + evt := &event.EvtLocalAddressesUpdated{Diffs: true} + addrsAdded := false + + for _, addr := range prev { + prevmap[string(addr.Bytes())] = addr + } + for _, addr := range current { + currmap[string(addr.Bytes())] = addr + } + for _, addr := range currmap { + _, ok := prevmap[string(addr.Bytes())] + updated := event.UpdatedAddress{Address: addr} + if ok { + updated.Action = event.Maintained + } else { + updated.Action = event.Added + addrsAdded = true + } + evt.Current = append(evt.Current, updated) + delete(prevmap, string(addr.Bytes())) + } + for _, addr := range prevmap { + updated := event.UpdatedAddress{Action: event.Removed, Address: addr} + evt.Removed = append(evt.Removed, updated) + } + + if !addrsAdded && len(evt.Removed) == 0 { + return nil + } + + // Our addresses have changed. Make a new signed peer record. + if !a.disableSignedPeerRecord { + // add signed peer record to the event + sr, err := a.peerRecord(current) + if err != nil { + log.Errorf("error creating a signed peer record from the set of current addresses, err=%s", err) + // drop this change + return nil + } + evt.SignedPeerRecord = sr + } + + return evt +} + +func trimHostAddrList(addrs []ma.Multiaddr, maxSize int) []ma.Multiaddr { + totalSize := 0 + for _, a := range addrs { + totalSize += len(a.Bytes()) + } + if totalSize <= maxSize { + return addrs + } + + score := func(addr ma.Multiaddr) int { + var res int + if manet.IsPublicAddr(addr) { + res |= 1 << 12 + } else if !manet.IsIPLoopback(addr) { + res |= 1 << 11 + } + var protocolWeight int + ma.ForEach(addr, func(c ma.Component) bool { + switch c.Protocol().Code { + case ma.P_QUIC_V1: + protocolWeight = 5 + case ma.P_TCP: + protocolWeight = 4 + case ma.P_WSS: + protocolWeight = 3 + case ma.P_WEBTRANSPORT: + protocolWeight = 2 + case ma.P_WEBRTC_DIRECT: + protocolWeight = 1 + case ma.P_P2P: + return false + } + return true + }) + res |= 1 << protocolWeight + return res + } + + slices.SortStableFunc(addrs, func(a, b ma.Multiaddr) int { + return score(b) - score(a) // b-a for reverse order + }) + totalSize = 0 + for i, a := range addrs { + totalSize += len(a.Bytes()) + if totalSize > maxSize { + addrs = addrs[:i] + break + } + } + return addrs +} diff --git a/p2p/host/basic/basic_host.go b/p2p/host/basic/basic_host.go index 5da306d54a..fb555b4545 100644 --- a/p2p/host/basic/basic_host.go +++ b/p2p/host/basic/basic_host.go @@ -5,8 +5,6 @@ import ( "errors" "fmt" "io" - "net" - "slices" "sync" "time" @@ -19,10 +17,8 @@ import ( "github.com/libp2p/go-libp2p/core/peerstore" "github.com/libp2p/go-libp2p/core/protocol" "github.com/libp2p/go-libp2p/core/record" - "github.com/libp2p/go-libp2p/core/transport" "github.com/libp2p/go-libp2p/p2p/host/autonat" "github.com/libp2p/go-libp2p/p2p/host/autorelay" - "github.com/libp2p/go-libp2p/p2p/host/basic/internal/backoff" "github.com/libp2p/go-libp2p/p2p/host/eventbus" "github.com/libp2p/go-libp2p/p2p/host/pstoremanager" "github.com/libp2p/go-libp2p/p2p/host/relaysvc" @@ -35,11 +31,8 @@ import ( libp2pwebtransport "github.com/libp2p/go-libp2p/p2p/transport/webtransport" "github.com/prometheus/client_golang/prometheus" - "github.com/libp2p/go-netroute" - logging "github.com/ipfs/go-log/v2" ma "github.com/multiformats/go-multiaddr" - manet "github.com/multiformats/go-multiaddr/net" msmux "github.com/multiformats/go-multistream" ) @@ -95,13 +88,7 @@ type BasicHost struct { evtLocalAddrsUpdated event.Emitter } - addrChangeChan chan struct{} - - addrMu sync.RWMutex - updateLocalIPv4Backoff backoff.ExpBackoff - updateLocalIPv6Backoff backoff.ExpBackoff - filteredInterfaceAddrs []ma.Multiaddr - allInterfaceAddrs []ma.Multiaddr + addrMu sync.RWMutex disableSignedPeerRecord bool signKey crypto.PrivKey @@ -109,9 +96,10 @@ type BasicHost struct { autoNat autonat.AutoNAT - autonatv2 *autonatv2.AutoNAT - addrSub event.Subscription - autorelay *autorelay.AutoRelay + autonatv2 *autonatv2.AutoNAT + addrSub event.Subscription + autorelay *autorelay.AutoRelay + addressService *addressService } var _ host.Host = (*BasicHost)(nil) @@ -196,54 +184,26 @@ func NewHost(n network.Network, opts *HostOpts) (*BasicHost, error) { if err != nil { return nil, err } + hostCtx, cancel := context.WithCancel(context.Background()) h := &BasicHost{ network: n, psManager: psManager, mux: msmux.NewMultistreamMuxer[protocol.ID](), negtimeout: DefaultNegotiationTimeout, - AddrsFactory: DefaultAddrsFactory, eventbus: opts.EventBus, - addrChangeChan: make(chan struct{}, 1), ctx: hostCtx, ctxCancel: cancel, disableSignedPeerRecord: opts.DisableSignedPeerRecord, addrSub: addrSub, } - h.updateLocalIpAddr() - if h.emitters.evtLocalProtocolsUpdated, err = h.eventbus.Emitter(&event.EvtLocalProtocolsUpdated{}, eventbus.Stateful); err != nil { return nil, err } - if h.emitters.evtLocalAddrsUpdated, err = h.eventbus.Emitter(&event.EvtLocalAddressesUpdated{}, eventbus.Stateful); err != nil { - return nil, err - } - - if !h.disableSignedPeerRecord { - cab, ok := peerstore.GetCertifiedAddrBook(n.Peerstore()) - if !ok { - return nil, errors.New("peerstore should also be a certified address book") - } - h.caBook = cab + if !opts.DisableSignedPeerRecord { h.signKey = h.Peerstore().PrivKey(h.ID()) - if h.signKey == nil { - return nil, errors.New("unable to access host key") - } - - // persist a signed peer record for self to the peerstore. - rec := peer.PeerRecordFromAddrInfo(peer.AddrInfo{ - ID: h.ID(), - Addrs: h.Addrs(), - }) - ev, err := record.Seal(rec, h.signKey) - if err != nil { - return nil, fmt.Errorf("failed to create signed record for self: %w", err) - } - if _, err := cab.ConsumePeerRecord(ev, peerstore.PermanentAddrTTL); err != nil { - return nil, fmt.Errorf("failed to persist signed record to peerstore: %w", err) - } } if opts.MultistreamMuxer != nil { @@ -273,6 +233,31 @@ func NewHost(n network.Network, opts *HostOpts) (*BasicHost, error) { return nil, fmt.Errorf("failed to create Identify service: %s", err) } + if opts.EnableAutoRelay { + if opts.EnableMetrics { + mt := autorelay.WithMetricsTracer( + autorelay.NewMetricsTracer(autorelay.WithRegisterer(opts.PrometheusRegisterer))) + mtOpts := []autorelay.Option{mt} + opts.AutoRelayOpts = append(mtOpts, opts.AutoRelayOpts...) + } + + ar, err := autorelay.NewAutoRelay(h, opts.AutoRelayOpts...) + if err != nil { + return nil, fmt.Errorf("failed to create autorelay: %w", err) + } + h.autorelay = ar + } + + addrFactory := DefaultAddrsFactory + if opts.AddrsFactory != nil { + addrFactory = opts.AddrsFactory + } + + h.addressService, err = NewAddressService(h, opts.NATManager, addrFactory) + if err != nil { + return nil, fmt.Errorf("failed to create address service: %w", err) + } + if opts.EnableHolePunching { if opts.EnableMetrics { hpOpts := []holepunch.Option{ @@ -280,16 +265,7 @@ func NewHost(n network.Network, opts *HostOpts) (*BasicHost, error) { opts.HolePunchingOptions = append(hpOpts, opts.HolePunchingOptions...) } - h.hps, err = holepunch.NewService(h, h.ids, func() []ma.Multiaddr { - addrs := h.AllAddrs() - if opts.AddrsFactory != nil { - addrs = slices.Clone(opts.AddrsFactory(addrs)) - } - // AllAddrs may ignore observed addresses in favour of NAT mappings. Use both for hole punching. - addrs = append(addrs, h.ids.OwnObservedAddrs()...) - addrs = ma.Unique(addrs) - return slices.DeleteFunc(addrs, func(a ma.Multiaddr) bool { return !manet.IsPublicAddr(a) }) - }, opts.HolePunchingOptions...) + h.hps, err = holepunch.NewService(h, h.ids, h.addressService.GetHolePunchAddrs, opts.HolePunchingOptions...) if err != nil { return nil, fmt.Errorf("failed to create hole punch service: %w", err) } @@ -299,14 +275,6 @@ func NewHost(n network.Network, opts *HostOpts) (*BasicHost, error) { h.negtimeout = opts.NegotiationTimeout } - if opts.AddrsFactory != nil { - h.AddrsFactory = opts.AddrsFactory - } - - if opts.NATManager != nil { - h.natmgr = opts.NATManager(n) - } - if opts.ConnManager == nil { h.cmgr = &connmgr.NullConnMgr{} } else { @@ -340,21 +308,6 @@ func NewHost(n network.Network, opts *HostOpts) (*BasicHost, error) { } } - if opts.EnableAutoRelay { - if opts.EnableMetrics { - mt := autorelay.WithMetricsTracer( - autorelay.NewMetricsTracer(autorelay.WithRegisterer(opts.PrometheusRegisterer))) - mtOpts := []autorelay.Option{mt} - opts.AutoRelayOpts = append(mtOpts, opts.AutoRelayOpts...) - } - - ar, err := autorelay.NewAutoRelay(h, opts.AutoRelayOpts...) - if err != nil { - return nil, fmt.Errorf("failed to create autorelay: %w", err) - } - h.autorelay = ar - } - n.SetStreamHandler(h.newStreamHandler) // register to be notified when the network's listen addrs change, @@ -370,92 +323,9 @@ func NewHost(n network.Network, opts *HostOpts) (*BasicHost, error) { return h, nil } -func (h *BasicHost) updateLocalIpAddr() { - h.addrMu.Lock() - defer h.addrMu.Unlock() - - h.filteredInterfaceAddrs = nil - h.allInterfaceAddrs = nil - - // Try to use the default ipv4/6 addresses. - - if r, err := netroute.New(); err != nil { - log.Debugw("failed to build Router for kernel's routing table", "error", err) - } else { - - var localIPv4 net.IP - var ran bool - err, ran = h.updateLocalIPv4Backoff.Run(func() error { - _, _, localIPv4, err = r.Route(net.IPv4zero) - return err - }) - - if ran && err != nil { - log.Debugw("failed to fetch local IPv4 address", "error", err) - } else if ran && localIPv4.IsGlobalUnicast() { - maddr, err := manet.FromIP(localIPv4) - if err == nil { - h.filteredInterfaceAddrs = append(h.filteredInterfaceAddrs, maddr) - } - } - - var localIPv6 net.IP - err, ran = h.updateLocalIPv6Backoff.Run(func() error { - _, _, localIPv6, err = r.Route(net.IPv6unspecified) - return err - }) - - if ran && err != nil { - log.Debugw("failed to fetch local IPv6 address", "error", err) - } else if ran && localIPv6.IsGlobalUnicast() { - maddr, err := manet.FromIP(localIPv6) - if err == nil { - h.filteredInterfaceAddrs = append(h.filteredInterfaceAddrs, maddr) - } - } - } - - // Resolve the interface addresses - ifaceAddrs, err := manet.InterfaceMultiaddrs() - if err != nil { - // This usually shouldn't happen, but we could be in some kind - // of funky restricted environment. - log.Errorw("failed to resolve local interface addresses", "error", err) - - // Add the loopback addresses to the filtered addrs and use them as the non-filtered addrs. - // Then bail. There's nothing else we can do here. - h.filteredInterfaceAddrs = append(h.filteredInterfaceAddrs, manet.IP4Loopback, manet.IP6Loopback) - h.allInterfaceAddrs = h.filteredInterfaceAddrs - return - } - - for _, addr := range ifaceAddrs { - // Skip link-local addrs, they're mostly useless. - if !manet.IsIP6LinkLocal(addr) { - h.allInterfaceAddrs = append(h.allInterfaceAddrs, addr) - } - } - - // If netroute failed to get us any interface addresses, use all of - // them. - if len(h.filteredInterfaceAddrs) == 0 { - // Add all addresses. - h.filteredInterfaceAddrs = h.allInterfaceAddrs - } else { - // Only add loopback addresses. Filter these because we might - // not _have_ an IPv6 loopback address. - for _, addr := range h.allInterfaceAddrs { - if manet.IsIPLoopback(addr) { - h.filteredInterfaceAddrs = append(h.filteredInterfaceAddrs, addr) - } - } - } -} - // Start starts background tasks in the host func (h *BasicHost) Start() { h.psManager.Start() - h.refCount.Add(1) h.ids.Start() if h.autorelay != nil { h.autorelay.Start() @@ -466,7 +336,7 @@ func (h *BasicHost) Start() { log.Errorf("autonat v2 failed to start: %s", err) } } - go h.background() + h.addressService.Start() } // newStreamHandler is the remote-opened stream handler for network.Network @@ -521,61 +391,7 @@ func (h *BasicHost) newStreamHandler(s network.Stream) { // changed. // Warning: this interface is unstable and may disappear in the future. func (h *BasicHost) SignalAddressChange() { - select { - case h.addrChangeChan <- struct{}{}: - default: - } -} - -func (h *BasicHost) makeUpdatedAddrEvent(prev, current []ma.Multiaddr) *event.EvtLocalAddressesUpdated { - if prev == nil && current == nil { - return nil - } - prevmap := make(map[string]ma.Multiaddr, len(prev)) - currmap := make(map[string]ma.Multiaddr, len(current)) - evt := &event.EvtLocalAddressesUpdated{Diffs: true} - addrsAdded := false - - for _, addr := range prev { - prevmap[string(addr.Bytes())] = addr - } - for _, addr := range current { - currmap[string(addr.Bytes())] = addr - } - for _, addr := range currmap { - _, ok := prevmap[string(addr.Bytes())] - updated := event.UpdatedAddress{Address: addr} - if ok { - updated.Action = event.Maintained - } else { - updated.Action = event.Added - addrsAdded = true - } - evt.Current = append(evt.Current, updated) - delete(prevmap, string(addr.Bytes())) - } - for _, addr := range prevmap { - updated := event.UpdatedAddress{Action: event.Removed, Address: addr} - evt.Removed = append(evt.Removed, updated) - } - - if !addrsAdded && len(evt.Removed) == 0 { - return nil - } - - // Our addresses have changed. Make a new signed peer record. - if !h.disableSignedPeerRecord { - // add signed peer record to the event - sr, err := h.makeSignedPeerRecord(current) - if err != nil { - log.Errorf("error creating a signed peer record from the set of current addresses, err=%s", err) - // drop this change - return nil - } - evt.SignedPeerRecord = sr - } - - return evt + h.addressService.SignalAddressChange() } func (h *BasicHost) makeSignedPeerRecord(addrs []ma.Multiaddr) (*record.Envelope, error) { @@ -594,61 +410,6 @@ func (h *BasicHost) makeSignedPeerRecord(addrs []ma.Multiaddr) (*record.Envelope return record.Seal(rec, h.signKey) } -func (h *BasicHost) background() { - defer h.refCount.Done() - var lastAddrs []ma.Multiaddr - - emitAddrChange := func(currentAddrs []ma.Multiaddr, lastAddrs []ma.Multiaddr) { - changeEvt := h.makeUpdatedAddrEvent(lastAddrs, currentAddrs) - if changeEvt == nil { - return - } - // Our addresses have changed. - // store the signed peer record in the peer store. - if !h.disableSignedPeerRecord { - if _, err := h.caBook.ConsumePeerRecord(changeEvt.SignedPeerRecord, peerstore.PermanentAddrTTL); err != nil { - log.Errorf("failed to persist signed peer record in peer store, err=%s", err) - return - } - } - // update host addresses in the peer store - removedAddrs := make([]ma.Multiaddr, 0, len(changeEvt.Removed)) - for _, ua := range changeEvt.Removed { - removedAddrs = append(removedAddrs, ua.Address) - } - h.Peerstore().SetAddrs(h.ID(), currentAddrs, peerstore.PermanentAddrTTL) - h.Peerstore().SetAddrs(h.ID(), removedAddrs, 0) - - // emit addr change event - if err := h.emitters.evtLocalAddrsUpdated.Emit(*changeEvt); err != nil { - log.Warnf("error emitting event for updated addrs: %s", err) - } - } - - // periodically schedules an IdentifyPush to update our peers for changes - // in our address set (if needed) - ticker := time.NewTicker(addrChangeTickrInterval) - defer ticker.Stop() - - for { - // Update our local IP addresses before checking our current addresses. - if len(h.network.ListenAddresses()) > 0 { - h.updateLocalIpAddr() - } - curr := h.Addrs() - emitAddrChange(curr, lastAddrs) - lastAddrs = curr - - select { - case <-ticker.C: - case <-h.addrChangeChan: - case <-h.addrSub.Out(): - case <-h.ctx.Done(): - return - } - } -} - // ID returns the (local) peer.ID associated with this Host func (h *BasicHost) ID() peer.ID { return h.Network().LocalPeer() @@ -872,15 +633,7 @@ func (h *BasicHost) ConnManager() connmgr.ConnManager { // When used with AutoRelay, and if the host is not publicly reachable, // this will only have host's private, relay, and no public addresses. func (h *BasicHost) Addrs() []ma.Multiaddr { - addrs := h.AllAddrs() - // Make a copy. Consumers can modify the slice elements - if h.autoNat != nil && h.autorelay != nil && h.autoNat.Status() == network.ReachabilityPrivate { - addrs = slices.DeleteFunc(addrs, func(a ma.Multiaddr) bool { return manet.IsPublicAddr(a) }) - addrs = append(addrs, h.autorelay.RelayAddrs()...) - } - addrs = slices.Clone(h.AddrsFactory(addrs)) - // Add certhashes for the addresses provided by the user via address factory. - return h.addCertHashes(ma.Unique(addrs)) + return h.addressService.Addrs() } // NormalizeMultiaddr returns a multiaddr suitable for equality checks. @@ -900,205 +653,9 @@ func (h *BasicHost) NormalizeMultiaddr(addr ma.Multiaddr) ma.Multiaddr { return addr } -var p2pCircuitAddr = ma.StringCast("/p2p-circuit") - // AllAddrs returns all the addresses the host is listening on except circuit addresses. func (h *BasicHost) AllAddrs() []ma.Multiaddr { - listenAddrs := h.Network().ListenAddresses() - if len(listenAddrs) == 0 { - return nil - } - - h.addrMu.RLock() - filteredIfaceAddrs := h.filteredInterfaceAddrs - allIfaceAddrs := h.allInterfaceAddrs - h.addrMu.RUnlock() - - // Iterate over all _unresolved_ listen addresses, resolving our primary - // interface only to avoid advertising too many addresses. - finalAddrs := make([]ma.Multiaddr, 0, 8) - if resolved, err := manet.ResolveUnspecifiedAddresses(listenAddrs, filteredIfaceAddrs); err != nil { - // This can happen if we're listening on no addrs, or listening - // on IPv6 addrs, but only have IPv4 interface addrs. - log.Debugw("failed to resolve listen addrs", "error", err) - } else { - finalAddrs = append(finalAddrs, resolved...) - } - - finalAddrs = ma.Unique(finalAddrs) - - // use nat mappings if we have them - if h.natmgr != nil && h.natmgr.HasDiscoveredNAT() { - // We have successfully mapped ports on our NAT. Use those - // instead of observed addresses (mostly). - // Next, apply this mapping to our addresses. - for _, listen := range listenAddrs { - extMaddr := h.natmgr.GetMapping(listen) - if extMaddr == nil { - // not mapped - continue - } - - // if the router reported a sane address - if !manet.IsIPUnspecified(extMaddr) { - // Add in the mapped addr. - finalAddrs = append(finalAddrs, extMaddr) - } else { - log.Warn("NAT device reported an unspecified IP as it's external address") - } - - // Did the router give us a routable public addr? - if manet.IsPublicAddr(extMaddr) { - // well done - continue - } - - // No. - // in case the router gives us a wrong address or we're behind a double-NAT. - // also add observed addresses - resolved, err := manet.ResolveUnspecifiedAddress(listen, allIfaceAddrs) - if err != nil { - // This can happen if we try to resolve /ip6/::/... - // without any IPv6 interface addresses. - continue - } - - for _, addr := range resolved { - // Now, check if we have any observed addresses that - // differ from the one reported by the router. Routers - // don't always give the most accurate information. - observed := h.ids.ObservedAddrsFor(addr) - - if len(observed) == 0 { - continue - } - - // Drop the IP from the external maddr - _, extMaddrNoIP := ma.SplitFirst(extMaddr) - - for _, obsMaddr := range observed { - // Extract a public observed addr. - ip, _ := ma.SplitFirst(obsMaddr) - if ip == nil || !manet.IsPublicAddr(ip) { - continue - } - - finalAddrs = append(finalAddrs, ma.Join(ip, extMaddrNoIP)) - } - } - } - } else { - var observedAddrs []ma.Multiaddr - if h.ids != nil { - observedAddrs = h.ids.OwnObservedAddrs() - } - finalAddrs = append(finalAddrs, observedAddrs...) - } - finalAddrs = ma.Unique(finalAddrs) - // Remove /p2p-circuit addresses from the list. - // The p2p-circuit tranport listener reports its address as just /p2p-circuit - // This is useless for dialing. Users need to manage their circuit addresses themselves, - // or use AutoRelay. - finalAddrs = slices.DeleteFunc(finalAddrs, func(a ma.Multiaddr) bool { - return a.Equal(p2pCircuitAddr) - }) - // Add certhashes for /webrtc-direct, /webtransport, etc addresses discovered - // using identify. - finalAddrs = h.addCertHashes(finalAddrs) - return finalAddrs -} - -func (h *BasicHost) addCertHashes(addrs []ma.Multiaddr) []ma.Multiaddr { - // This is a temporary workaround/hack that fixes #2233. Once we have a - // proper address pipeline, rework this. See the issue for more context. - type transportForListeninger interface { - TransportForListening(a ma.Multiaddr) transport.Transport - } - - type addCertHasher interface { - AddCertHashes(m ma.Multiaddr) (ma.Multiaddr, bool) - } - - s, ok := h.Network().(transportForListeninger) - if !ok { - return addrs - } - - // Copy addrs slice since we'll be modifying it. - addrsOld := addrs - addrs = make([]ma.Multiaddr, len(addrsOld)) - copy(addrs, addrsOld) - - for i, addr := range addrs { - wtOK, wtN := libp2pwebtransport.IsWebtransportMultiaddr(addr) - webrtcOK, webrtcN := libp2pwebrtc.IsWebRTCDirectMultiaddr(addr) - if (wtOK && wtN == 0) || (webrtcOK && webrtcN == 0) { - t := s.TransportForListening(addr) - tpt, ok := t.(addCertHasher) - if !ok { - continue - } - addrWithCerthash, added := tpt.AddCertHashes(addr) - if !added { - log.Debugf("Couldn't add certhashes to multiaddr: %s", addr) - continue - } - addrs[i] = addrWithCerthash - } - } - return addrs -} - -func trimHostAddrList(addrs []ma.Multiaddr, maxSize int) []ma.Multiaddr { - totalSize := 0 - for _, a := range addrs { - totalSize += len(a.Bytes()) - } - if totalSize <= maxSize { - return addrs - } - - score := func(addr ma.Multiaddr) int { - var res int - if manet.IsPublicAddr(addr) { - res |= 1 << 12 - } else if !manet.IsIPLoopback(addr) { - res |= 1 << 11 - } - var protocolWeight int - ma.ForEach(addr, func(c ma.Component) bool { - switch c.Protocol().Code { - case ma.P_QUIC_V1: - protocolWeight = 5 - case ma.P_TCP: - protocolWeight = 4 - case ma.P_WSS: - protocolWeight = 3 - case ma.P_WEBTRANSPORT: - protocolWeight = 2 - case ma.P_WEBRTC_DIRECT: - protocolWeight = 1 - case ma.P_P2P: - return false - } - return true - }) - res |= 1 << protocolWeight - return res - } - - slices.SortStableFunc(addrs, func(a, b ma.Multiaddr) int { - return score(b) - score(a) // b-a for reverse order - }) - totalSize = 0 - for i, a := range addrs { - totalSize += len(a.Bytes()) - if totalSize > maxSize { - addrs = addrs[:i] - break - } - } - return addrs + return h.addressService.AllAddrs() } // SetAutoNat sets the autonat service for the host. @@ -1108,6 +665,7 @@ func (h *BasicHost) SetAutoNat(a autonat.AutoNAT) { if h.autoNat == nil { h.autoNat = a } + h.addressService.SetAutoNAT(h.autoNat) } // GetAutoNat returns the host's AutoNAT service, if AutoNAT is enabled. @@ -1127,6 +685,9 @@ func (h *BasicHost) Close() error { if h.cmgr != nil { h.cmgr.Close() } + + h.addressService.Close() + if h.ids != nil { h.ids.Close() } @@ -1147,7 +708,6 @@ func (h *BasicHost) Close() error { } _ = h.emitters.evtLocalProtocolsUpdated.Close() - _ = h.emitters.evtLocalAddrsUpdated.Close() if err := h.network.Close(); err != nil { log.Errorf("swarm close failed: %v", err) @@ -1160,8 +720,6 @@ func (h *BasicHost) Close() error { h.refCount.Wait() - _ = h.addrSub.Close() - if h.Network().ResourceManager() != nil { h.Network().ResourceManager().Close() } diff --git a/p2p/host/basic/basic_host_test.go b/p2p/host/basic/basic_host_test.go index 2a7a772976..a9c7ab28fd 100644 --- a/p2p/host/basic/basic_host_test.go +++ b/p2p/host/basic/basic_host_test.go @@ -8,6 +8,7 @@ import ( "reflect" "strings" "sync" + "sync/atomic" "testing" "time" @@ -212,10 +213,10 @@ func TestLocalIPChangesWhenListenAddrChanges(t *testing.T) { h.Start() defer h.Close() - h.addrMu.Lock() - h.filteredInterfaceAddrs = nil - h.allInterfaceAddrs = nil - h.addrMu.Unlock() + h.addressService.mx.Lock() + h.addressService.filteredInterfaceAddrs = nil + h.addressService.allInterfaceAddrs = nil + h.addressService.mx.Unlock() // change listen addrs and verify local IP addr is not nil again require.NoError(t, h.Network().Listen(ma.StringCast("/ip4/0.0.0.0/tcp/0"))) @@ -224,8 +225,8 @@ func TestLocalIPChangesWhenListenAddrChanges(t *testing.T) { h.addrMu.RLock() defer h.addrMu.RUnlock() - require.NotEmpty(t, h.filteredInterfaceAddrs) - require.NotEmpty(t, h.allInterfaceAddrs) + require.NotEmpty(t, h.addressService.filteredInterfaceAddrs) + require.NotEmpty(t, h.addressService.allInterfaceAddrs) } func TestAllAddrs(t *testing.T) { @@ -619,8 +620,13 @@ func TestAddrChangeImmediatelyIfAddressNonEmpty(t *testing.T) { ctx := context.Background() taddrs := []ma.Multiaddr{ma.StringCast("/ip4/1.2.3.4/tcp/1234")} - starting := make(chan struct{}) + starting := make(chan struct{}, 1) + var count atomic.Int32 h, err := NewHost(swarmt.GenSwarm(t), &HostOpts{AddrsFactory: func(addrs []ma.Multiaddr) []ma.Multiaddr { + // The first call here is made from the constructor. Don't block. + if count.Add(1) == 1 { + return addrs + } <-starting return taddrs }}) @@ -628,11 +634,11 @@ func TestAddrChangeImmediatelyIfAddressNonEmpty(t *testing.T) { defer h.Close() sub, err := h.EventBus().Subscribe(&event.EvtLocalAddressesUpdated{}) - close(starting) if err != nil { t.Error(err) } defer sub.Close() + close(starting) h.Start() expected := event.EvtLocalAddressesUpdated{ From e8dba37aedb7273ca35bce4679681498de250da8 Mon Sep 17 00:00:00 2001 From: sukun Date: Mon, 2 Dec 2024 00:37:38 +0530 Subject: [PATCH 2/5] cleanup interface address handling --- p2p/host/basic/address_service.go | 257 ++++++++++++++---------------- p2p/host/basic/basic_host.go | 62 ++++++- p2p/host/basic/basic_host_test.go | 23 --- 3 files changed, 176 insertions(+), 166 deletions(-) diff --git a/p2p/host/basic/address_service.go b/p2p/host/basic/address_service.go index 6c3a021f1c..c7166e7ac0 100644 --- a/p2p/host/basic/address_service.go +++ b/p2p/host/basic/address_service.go @@ -42,6 +42,7 @@ type addressService struct { autorelay *autorelay.AutoRelay addrsFactory AddrsFactory addrChangeChan chan struct{} + relayAddrsSub event.Subscription ctx context.Context ctxCancel context.CancelFunc @@ -49,9 +50,7 @@ type addressService struct { updateLocalIPv4Backoff backoff.ExpBackoff updateLocalIPv6Backoff backoff.ExpBackoff - mx sync.RWMutex - allInterfaceAddrs []ma.Multiaddr - filteredInterfaceAddrs []ma.Multiaddr + ifaceAddrs *interfaceAddrsCache } func NewAddressService(h *BasicHost, natmgr func(network.Network) NATManager, @@ -64,6 +63,10 @@ func NewAddressService(h *BasicHost, natmgr func(network.Network) NATManager, if natmgr != nil { nmgr = natmgr(h.Network()) } + addrSub, err := h.EventBus().Subscribe(new(event.EvtAutoRelayAddrs)) + if err != nil { + return nil, err + } ctx, cancel := context.WithCancel(context.Background()) as := &addressService{ s: h.Network(), @@ -77,10 +80,11 @@ func NewAddressService(h *BasicHost, natmgr func(network.Network) NATManager, autorelay: h.autorelay, addrsFactory: addrFactory, addrChangeChan: make(chan struct{}, 1), + relayAddrsSub: addrSub, ctx: ctx, ctxCancel: cancel, + ifaceAddrs: &interfaceAddrsCache{}, } - as.updateLocalIpAddr() if !as.disableSignedPeerRecord { cab, ok := peerstore.GetCertifiedAddrBook(as.peerstore) if !ok { @@ -172,10 +176,6 @@ func (a *addressService) background() { defer ticker.Stop() for { - // Update our local IP addresses before checking our current addresses. - if len(a.s.ListenAddresses()) > 0 { - a.updateLocalIpAddr() - } curr := a.Addrs() emitAddrChange(curr, lastAddrs) lastAddrs = curr @@ -183,6 +183,7 @@ func (a *addressService) background() { select { case <-ticker.C: case <-a.addrChangeChan: + case <-a.relayAddrsSub.Out(): case <-a.ctx.Done(): return } @@ -225,10 +226,7 @@ func (a *addressService) AllAddrs() []ma.Multiaddr { return nil } - a.mx.RLock() - filteredIfaceAddrs := a.filteredInterfaceAddrs - allIfaceAddrs := a.allInterfaceAddrs - a.mx.RUnlock() + filteredIfaceAddrs := a.ifaceAddrs.Filtered() // Iterate over all _unresolved_ listen addresses, resolving our primary // interface only to avoid advertising too many addresses. @@ -272,7 +270,7 @@ func (a *addressService) AllAddrs() []ma.Multiaddr { // No. // in case the router gives us a wrong address or we're behind a double-NAT. // also add observed addresses - resolved, err := manet.ResolveUnspecifiedAddress(listen, allIfaceAddrs) + resolved, err := manet.ResolveUnspecifiedAddress(listen, a.ifaceAddrs.All()) if err != nil { // This can happen if we try to resolve /ip6/::/... // without any IPv6 interface addresses. @@ -324,88 +322,6 @@ func (a *addressService) AllAddrs() []ma.Multiaddr { return finalAddrs } -func (a *addressService) updateLocalIpAddr() { - a.mx.Lock() - defer a.mx.Unlock() - - a.filteredInterfaceAddrs = nil - a.allInterfaceAddrs = nil - - // Try to use the default ipv4/6 addresses. - // TODO: Remove this. We should advertise all interface addresses. - if r, err := netroute.New(); err != nil { - log.Debugw("failed to build Router for kernel's routing table", "error", err) - } else { - - var localIPv4 net.IP - var ran bool - err, ran = a.updateLocalIPv4Backoff.Run(func() error { - _, _, localIPv4, err = r.Route(net.IPv4zero) - return err - }) - - if ran && err != nil { - log.Debugw("failed to fetch local IPv4 address", "error", err) - } else if ran && localIPv4.IsGlobalUnicast() { - maddr, err := manet.FromIP(localIPv4) - if err == nil { - a.filteredInterfaceAddrs = append(a.filteredInterfaceAddrs, maddr) - } - } - - var localIPv6 net.IP - err, ran = a.updateLocalIPv6Backoff.Run(func() error { - _, _, localIPv6, err = r.Route(net.IPv6unspecified) - return err - }) - - if ran && err != nil { - log.Debugw("failed to fetch local IPv6 address", "error", err) - } else if ran && localIPv6.IsGlobalUnicast() { - maddr, err := manet.FromIP(localIPv6) - if err == nil { - a.filteredInterfaceAddrs = append(a.filteredInterfaceAddrs, maddr) - } - } - } - - // Resolve the interface addresses - ifaceAddrs, err := manet.InterfaceMultiaddrs() - if err != nil { - // This usually shouldn't happen, but we could be in some kind - // of funky restricted environment. - log.Errorw("failed to resolve local interface addresses", "error", err) - - // Add the loopback addresses to the filtered addrs and use them as the non-filtered addrs. - // Then bail. There's nothing else we can do here. - a.filteredInterfaceAddrs = append(a.filteredInterfaceAddrs, manet.IP4Loopback, manet.IP6Loopback) - a.allInterfaceAddrs = a.filteredInterfaceAddrs - return - } - - for _, addr := range ifaceAddrs { - // Skip link-local addrs, they're mostly useless. - if !manet.IsIP6LinkLocal(addr) { - a.allInterfaceAddrs = append(a.allInterfaceAddrs, addr) - } - } - - // If netroute failed to get us any interface addresses, use all of - // them. - if len(a.filteredInterfaceAddrs) == 0 { - // Add all addresses. - a.filteredInterfaceAddrs = a.allInterfaceAddrs - } else { - // Only add loopback addresses. Filter these because we might - // not _have_ an IPv6 loopback address. - for _, addr := range a.allInterfaceAddrs { - if manet.IsIPLoopback(addr) { - a.filteredInterfaceAddrs = append(a.filteredInterfaceAddrs, addr) - } - } - } -} - func (a *addressService) addCertHashes(addrs []ma.Multiaddr) []ma.Multiaddr { // This is a temporary workaround/hack that fixes #2233. Once we have a // proper address pipeline, rework this. See the issue for more context. @@ -498,54 +414,125 @@ func (a *addressService) makeUpdatedAddrEvent(prev, current []ma.Multiaddr) *eve return evt } -func trimHostAddrList(addrs []ma.Multiaddr, maxSize int) []ma.Multiaddr { - totalSize := 0 - for _, a := range addrs { - totalSize += len(a.Bytes()) +const ifaceAddrsTTL = time.Minute + +type interfaceAddrsCache struct { + mx sync.RWMutex + filtered []ma.Multiaddr + all []ma.Multiaddr + updateLocalIPv4Backoff backoff.ExpBackoff + updateLocalIPv6Backoff backoff.ExpBackoff + lastUpdated time.Time +} + +func (i *interfaceAddrsCache) Filtered() []ma.Multiaddr { + i.mx.RLock() + if time.Now().After(i.lastUpdated.Add(ifaceAddrsTTL)) { + i.mx.RUnlock() + return i.update(true) } - if totalSize <= maxSize { - return addrs + defer i.mx.RUnlock() + return i.filtered +} + +func (i *interfaceAddrsCache) All() []ma.Multiaddr { + i.mx.RLock() + if time.Now().After(i.lastUpdated.Add(ifaceAddrsTTL)) { + i.mx.RUnlock() + return i.update(false) } + defer i.mx.RUnlock() + return i.all +} - score := func(addr ma.Multiaddr) int { - var res int - if manet.IsPublicAddr(addr) { - res |= 1 << 12 - } else if !manet.IsIPLoopback(addr) { - res |= 1 << 11 +func (i *interfaceAddrsCache) update(filtered bool) []ma.Multiaddr { + i.mx.Lock() + defer i.mx.Unlock() + if !time.Now().After(i.lastUpdated.Add(ifaceAddrsTTL)) { + if filtered { + return i.filtered } - var protocolWeight int - ma.ForEach(addr, func(c ma.Component) bool { - switch c.Protocol().Code { - case ma.P_QUIC_V1: - protocolWeight = 5 - case ma.P_TCP: - protocolWeight = 4 - case ma.P_WSS: - protocolWeight = 3 - case ma.P_WEBTRANSPORT: - protocolWeight = 2 - case ma.P_WEBRTC_DIRECT: - protocolWeight = 1 - case ma.P_P2P: - return false + return i.all + } + i.updateUnlocked() + i.lastUpdated = time.Now() + if filtered { + return i.filtered + } + return i.all +} + +func (i *interfaceAddrsCache) updateUnlocked() { + i.filtered = nil + i.all = nil + + // Try to use the default ipv4/6 addresses. + // TODO: Remove this. We should advertise all interface addresses. + if r, err := netroute.New(); err != nil { + log.Debugw("failed to build Router for kernel's routing table", "error", err) + } else { + + var localIPv4 net.IP + var ran bool + err, ran = i.updateLocalIPv4Backoff.Run(func() error { + _, _, localIPv4, err = r.Route(net.IPv4zero) + return err + }) + + if ran && err != nil { + log.Debugw("failed to fetch local IPv4 address", "error", err) + } else if ran && localIPv4.IsGlobalUnicast() { + maddr, err := manet.FromIP(localIPv4) + if err == nil { + i.filtered = append(i.filtered, maddr) } - return true + } + + var localIPv6 net.IP + err, ran = i.updateLocalIPv6Backoff.Run(func() error { + _, _, localIPv6, err = r.Route(net.IPv6unspecified) + return err }) - res |= 1 << protocolWeight - return res + + if ran && err != nil { + log.Debugw("failed to fetch local IPv6 address", "error", err) + } else if ran && localIPv6.IsGlobalUnicast() { + maddr, err := manet.FromIP(localIPv6) + if err == nil { + i.filtered = append(i.filtered, maddr) + } + } } - slices.SortStableFunc(addrs, func(a, b ma.Multiaddr) int { - return score(b) - score(a) // b-a for reverse order - }) - totalSize = 0 - for i, a := range addrs { - totalSize += len(a.Bytes()) - if totalSize > maxSize { - addrs = addrs[:i] - break + // Resolve the interface addresses + ifaceAddrs, err := manet.InterfaceMultiaddrs() + if err != nil { + // This usually shouldn't happen, but we could be in some kind + // of funky restricted environment. + log.Errorw("failed to resolve local interface addresses", "error", err) + + // Add the loopback addresses to the filtered addrs and use them as the non-filtered addrs. + // Then bail. There's nothing else we can do here. + i.filtered = append(i.filtered, manet.IP4Loopback, manet.IP6Loopback) + i.all = i.filtered + return + } + + // remove link local ipv6 addresses + i.all = slices.DeleteFunc(ifaceAddrs, manet.IsIP6LinkLocal) + + // If netroute failed to get us any interface addresses, use all of + // them. + if len(i.filtered) == 0 { + // Add all addresses. + i.filtered = i.all + } else { + // Only add loopback addresses. Filter these because we might + // not _have_ an IPv6 loopback address. + for _, addr := range i.all { + if manet.IsIPLoopback(addr) { + i.filtered = append(i.filtered, addr) + } } } - return addrs } diff --git a/p2p/host/basic/basic_host.go b/p2p/host/basic/basic_host.go index fb555b4545..801656c499 100644 --- a/p2p/host/basic/basic_host.go +++ b/p2p/host/basic/basic_host.go @@ -5,6 +5,7 @@ import ( "errors" "fmt" "io" + "slices" "sync" "time" @@ -33,6 +34,7 @@ import ( logging "github.com/ipfs/go-log/v2" ma "github.com/multiformats/go-multiaddr" + manet "github.com/multiformats/go-multiaddr/net" msmux "github.com/multiformats/go-multistream" ) @@ -97,7 +99,6 @@ type BasicHost struct { autoNat autonat.AutoNAT autonatv2 *autonatv2.AutoNAT - addrSub event.Subscription autorelay *autorelay.AutoRelay addressService *addressService } @@ -180,11 +181,6 @@ func NewHost(n network.Network, opts *HostOpts) (*BasicHost, error) { return nil, err } - addrSub, err := opts.EventBus.Subscribe(new(event.EvtAutoRelayAddrs)) - if err != nil { - return nil, err - } - hostCtx, cancel := context.WithCancel(context.Background()) h := &BasicHost{ network: n, @@ -195,7 +191,6 @@ func NewHost(n network.Network, opts *HostOpts) (*BasicHost, error) { ctx: hostCtx, ctxCancel: cancel, disableSignedPeerRecord: opts.DisableSignedPeerRecord, - addrSub: addrSub, } if h.emitters.evtLocalProtocolsUpdated, err = h.eventbus.Emitter(&event.EvtLocalProtocolsUpdated{}, eventbus.Stateful); err != nil { @@ -596,7 +591,6 @@ func (h *BasicHost) Connect(ctx context.Context, pi peer.AddrInfo) error { return nil } } - return h.dialPeer(ctx, pi.ID) } @@ -728,6 +722,58 @@ func (h *BasicHost) Close() error { return nil } +func trimHostAddrList(addrs []ma.Multiaddr, maxSize int) []ma.Multiaddr { + totalSize := 0 + for _, a := range addrs { + totalSize += len(a.Bytes()) + } + if totalSize <= maxSize { + return addrs + } + + score := func(addr ma.Multiaddr) int { + var res int + if manet.IsPublicAddr(addr) { + res |= 1 << 12 + } else if !manet.IsIPLoopback(addr) { + res |= 1 << 11 + } + var protocolWeight int + ma.ForEach(addr, func(c ma.Component) bool { + switch c.Protocol().Code { + case ma.P_QUIC_V1: + protocolWeight = 5 + case ma.P_TCP: + protocolWeight = 4 + case ma.P_WSS: + protocolWeight = 3 + case ma.P_WEBTRANSPORT: + protocolWeight = 2 + case ma.P_WEBRTC_DIRECT: + protocolWeight = 1 + case ma.P_P2P: + return false + } + return true + }) + res |= 1 << protocolWeight + return res + } + + slices.SortStableFunc(addrs, func(a, b ma.Multiaddr) int { + return score(b) - score(a) // b-a for reverse order + }) + totalSize = 0 + for i, a := range addrs { + totalSize += len(a.Bytes()) + if totalSize > maxSize { + addrs = addrs[:i] + break + } + } + return addrs +} + type streamWrapper struct { network.Stream rw io.ReadWriteCloser diff --git a/p2p/host/basic/basic_host_test.go b/p2p/host/basic/basic_host_test.go index a9c7ab28fd..2d254956d2 100644 --- a/p2p/host/basic/basic_host_test.go +++ b/p2p/host/basic/basic_host_test.go @@ -206,29 +206,6 @@ func TestHostAddrsFactory(t *testing.T) { } } -func TestLocalIPChangesWhenListenAddrChanges(t *testing.T) { - // no listen addrs - h, err := NewHost(swarmt.GenSwarm(t, swarmt.OptDialOnly), nil) - require.NoError(t, err) - h.Start() - defer h.Close() - - h.addressService.mx.Lock() - h.addressService.filteredInterfaceAddrs = nil - h.addressService.allInterfaceAddrs = nil - h.addressService.mx.Unlock() - - // change listen addrs and verify local IP addr is not nil again - require.NoError(t, h.Network().Listen(ma.StringCast("/ip4/0.0.0.0/tcp/0"))) - h.SignalAddressChange() - time.Sleep(1 * time.Second) - - h.addrMu.RLock() - defer h.addrMu.RUnlock() - require.NotEmpty(t, h.addressService.filteredInterfaceAddrs) - require.NotEmpty(t, h.addressService.allInterfaceAddrs) -} - func TestAllAddrs(t *testing.T) { // no listen addrs h, err := NewHost(swarmt.GenSwarm(t, swarmt.OptDialOnly), nil) From 12e911006877644259fde5812ee630a6a0c6cd58 Mon Sep 17 00:00:00 2001 From: sukun Date: Mon, 2 Dec 2024 14:34:41 +0530 Subject: [PATCH 3/5] cleanup nat addr handling; add tests --- p2p/host/basic/address_service.go | 266 +++++++++++++------------ p2p/host/basic/address_service_test.go | 98 +++++++++ p2p/host/basic/basic_host.go | 1 - 3 files changed, 238 insertions(+), 127 deletions(-) create mode 100644 p2p/host/basic/address_service_test.go diff --git a/p2p/host/basic/address_service.go b/p2p/host/basic/address_service.go index c7166e7ac0..72949f79d1 100644 --- a/p2p/host/basic/address_service.go +++ b/p2p/host/basic/address_service.go @@ -19,7 +19,6 @@ import ( "github.com/libp2p/go-libp2p/p2p/host/autorelay" "github.com/libp2p/go-libp2p/p2p/host/basic/internal/backoff" "github.com/libp2p/go-libp2p/p2p/host/eventbus" - "github.com/libp2p/go-libp2p/p2p/protocol/identify" libp2pwebrtc "github.com/libp2p/go-libp2p/p2p/transport/webrtc" libp2pwebtransport "github.com/libp2p/go-libp2p/p2p/transport/webtransport" "github.com/libp2p/go-netroute" @@ -29,28 +28,29 @@ import ( type peerRecordFunc func([]ma.Multiaddr) (*record.Envelope, error) -type addressService struct { - s network.Network - natmgr NATManager - ids identify.IDService - peerRecord peerRecordFunc - disableSignedPeerRecord bool - peerstore peerstore.Peerstore - id peer.ID - autonat autonat.AutoNAT - emitter event.Emitter - autorelay *autorelay.AutoRelay - addrsFactory AddrsFactory - addrChangeChan chan struct{} - relayAddrsSub event.Subscription - ctx context.Context - ctxCancel context.CancelFunc - - wg sync.WaitGroup - updateLocalIPv4Backoff backoff.ExpBackoff - updateLocalIPv6Backoff backoff.ExpBackoff +type observedAddrsService interface { + OwnObservedAddrs() []ma.Multiaddr + ObservedAddrsFor(local ma.Multiaddr) []ma.Multiaddr +} - ifaceAddrs *interfaceAddrsCache +type addressService struct { + net network.Network + peerstore peerstore.Peerstore + id peer.ID + addrsFactory AddrsFactory + // peerRecord is used to create peer records when the addresses change + peerRecord peerRecordFunc + autonat autonat.AutoNAT + autorelay *autorelay.AutoRelay + natmgr NATManager + observedAddrsService observedAddrsService + addrChangeChan chan struct{} + relayAddrsSub event.Subscription + emitter event.Emitter + ifaceAddrs *interfaceAddrsCache + wg sync.WaitGroup + ctx context.Context + ctxCancel context.CancelFunc } func NewAddressService(h *BasicHost, natmgr func(network.Network) NATManager, @@ -67,30 +67,32 @@ func NewAddressService(h *BasicHost, natmgr func(network.Network) NATManager, if err != nil { return nil, err } + peerRecord := h.makeSignedPeerRecord + if !h.disableSignedPeerRecord { + peerRecord = nil + } ctx, cancel := context.WithCancel(context.Background()) as := &addressService{ - s: h.Network(), - ids: h.IDService(), - peerRecord: h.makeSignedPeerRecord, - disableSignedPeerRecord: h.disableSignedPeerRecord, - peerstore: h.Peerstore(), - id: h.ID(), - natmgr: nmgr, - emitter: emitter, - autorelay: h.autorelay, - addrsFactory: addrFactory, - addrChangeChan: make(chan struct{}, 1), - relayAddrsSub: addrSub, - ctx: ctx, - ctxCancel: cancel, - ifaceAddrs: &interfaceAddrsCache{}, - } - if !as.disableSignedPeerRecord { + net: h.Network(), + peerstore: h.Peerstore(), + observedAddrsService: h.IDService(), + id: h.ID(), + peerRecord: peerRecord, + natmgr: nmgr, + emitter: emitter, + autorelay: h.autorelay, + addrsFactory: addrFactory, + addrChangeChan: make(chan struct{}, 1), + relayAddrsSub: addrSub, + ctx: ctx, + ctxCancel: cancel, + ifaceAddrs: &interfaceAddrsCache{}, + } + if as.peerRecord != nil { cab, ok := peerstore.GetCertifiedAddrBook(as.peerstore) if !ok { return nil, errors.New("peerstore should also be a certified address book") } - h.caBook = cab rec, err := as.peerRecord(as.Addrs()) if err != nil { return nil, fmt.Errorf("failed to create signed record for self: %w", err) @@ -124,6 +126,10 @@ func (a *addressService) Close() { if err != nil { log.Warnf("error closing addrs update emitter: %s", err) } + err = a.relayAddrsSub.Close() + if err != nil { + log.Warnf("error closing addrs update emitter: %s", err) + } } func (a *addressService) SignalAddressChange() { @@ -131,7 +137,6 @@ func (a *addressService) SignalAddressChange() { case a.addrChangeChan <- struct{}{}: default: } - } func (a *addressService) background() { @@ -145,7 +150,7 @@ func (a *addressService) background() { } // Our addresses have changed. // store the signed peer record in the peer store. - if !a.disableSignedPeerRecord { + if a.peerRecord != nil { cabook, ok := peerstore.GetCertifiedAddrBook(a.peerstore) if !ok { log.Errorf("peerstore doesn't implement certified address book") @@ -212,7 +217,7 @@ func (a *addressService) GetHolePunchAddrs() []ma.Multiaddr { addrs = slices.Clone(a.addrsFactory(addrs)) // AllAddrs may ignore observed addresses in favour of NAT mappings. // Use both for hole punching. - addrs = append(addrs, a.ids.OwnObservedAddrs()...) + addrs = append(addrs, a.observedAddrsService.OwnObservedAddrs()...) addrs = ma.Unique(addrs) return slices.DeleteFunc(addrs, func(a ma.Multiaddr) bool { return !manet.IsPublicAddr(a) }) } @@ -221,96 +226,20 @@ var p2pCircuitAddr = ma.StringCast("/p2p-circuit") // AllAddrs returns all the addresses the host is listening on except circuit addresses. func (a *addressService) AllAddrs() []ma.Multiaddr { - listenAddrs := a.s.ListenAddresses() + listenAddrs := a.net.ListenAddresses() if len(listenAddrs) == 0 { return nil } - filteredIfaceAddrs := a.ifaceAddrs.Filtered() - - // Iterate over all _unresolved_ listen addresses, resolving our primary - // interface only to avoid advertising too many addresses. finalAddrs := make([]ma.Multiaddr, 0, 8) - if resolved, err := manet.ResolveUnspecifiedAddresses(listenAddrs, filteredIfaceAddrs); err != nil { - // This can happen if we're listening on no addrs, or listening - // on IPv6 addrs, but only have IPv4 interface addrs. - log.Debugw("failed to resolve listen addrs", "error", err) - } else { - finalAddrs = append(finalAddrs, resolved...) - } - - finalAddrs = ma.Unique(finalAddrs) + finalAddrs = a.appendInterfaceAddrs(finalAddrs, listenAddrs) // use nat mappings if we have them - if a.natmgr != nil && a.natmgr.HasDiscoveredNAT() { - // We have successfully mapped ports on our NAT. Use those - // instead of observed addresses (mostly). - // Next, apply this mapping to our addresses. - for _, listen := range listenAddrs { - extMaddr := a.natmgr.GetMapping(listen) - if extMaddr == nil { - // not mapped - continue - } - - // if the router reported a sane address - if !manet.IsIPUnspecified(extMaddr) { - // Add in the mapped addr. - finalAddrs = append(finalAddrs, extMaddr) - } else { - log.Warn("NAT device reported an unspecified IP as it's external address") - } - - // Did the router give us a routable public addr? - if manet.IsPublicAddr(extMaddr) { - // well done - continue - } - - // No. - // in case the router gives us a wrong address or we're behind a double-NAT. - // also add observed addresses - resolved, err := manet.ResolveUnspecifiedAddress(listen, a.ifaceAddrs.All()) - if err != nil { - // This can happen if we try to resolve /ip6/::/... - // without any IPv6 interface addresses. - continue - } - - for _, addr := range resolved { - // Now, check if we have any observed addresses that - // differ from the one reported by the router. Routers - // don't always give the most accurate information. - observed := a.ids.ObservedAddrsFor(addr) - - if len(observed) == 0 { - continue - } - - // Drop the IP from the external maddr - _, extMaddrNoIP := ma.SplitFirst(extMaddr) - - for _, obsMaddr := range observed { - // Extract a public observed addr. - ip, _ := ma.SplitFirst(obsMaddr) - if ip == nil || !manet.IsPublicAddr(ip) { - continue - } - - finalAddrs = append(finalAddrs, ma.Join(ip, extMaddrNoIP)) - } - } - } - } else { - var observedAddrs []ma.Multiaddr - if a.ids != nil { - observedAddrs = a.ids.OwnObservedAddrs() - } - finalAddrs = append(finalAddrs, observedAddrs...) - } + finalAddrs = a.appendNATAddrs(finalAddrs, listenAddrs) finalAddrs = ma.Unique(finalAddrs) + // Remove /p2p-circuit addresses from the list. - // The p2p-circuit tranport listener reports its address as just /p2p-circuit + // The p2p-circuit transport listener reports its address as just /p2p-circuit // This is useless for dialing. Users need to manage their circuit addresses themselves, // or use AutoRelay. finalAddrs = slices.DeleteFunc(finalAddrs, func(a ma.Multiaddr) bool { @@ -322,6 +251,35 @@ func (a *addressService) AllAddrs() []ma.Multiaddr { return finalAddrs } +func (a *addressService) appendInterfaceAddrs(result []ma.Multiaddr, listenAddrs []ma.Multiaddr) []ma.Multiaddr { + // resolving any unspecified listen addressees to use only the primary + // interface to avoid advertising too many addresses. + if resolved, err := manet.ResolveUnspecifiedAddresses(listenAddrs, a.ifaceAddrs.Filtered()); err != nil { + log.Warnw("failed to resolve listen addrs", "error", err) + } else { + result = append(result, resolved...) + } + result = ma.Unique(result) + return result +} + +func (a *addressService) appendNATAddrs(result []ma.Multiaddr, listenAddrs []ma.Multiaddr) []ma.Multiaddr { + ifaceAddrs := a.ifaceAddrs.All() + // use nat mappings if we have them + if a.natmgr != nil && a.natmgr.HasDiscoveredNAT() { + // we have a NAT device + for _, listen := range listenAddrs { + extMaddr := a.natmgr.GetMapping(listen) + result = appendValidNATAddrs(result, listen, extMaddr, a.observedAddrsService.ObservedAddrsFor, ifaceAddrs) + } + } else { + if a.observedAddrsService != nil { + result = append(result, a.observedAddrsService.OwnObservedAddrs()...) + } + } + return result +} + func (a *addressService) addCertHashes(addrs []ma.Multiaddr) []ma.Multiaddr { // This is a temporary workaround/hack that fixes #2233. Once we have a // proper address pipeline, rework this. See the issue for more context. @@ -333,7 +291,7 @@ func (a *addressService) addCertHashes(addrs []ma.Multiaddr) []ma.Multiaddr { AddCertHashes(m ma.Multiaddr) (ma.Multiaddr, bool) } - s, ok := a.s.(transportForListeninger) + s, ok := a.net.(transportForListeninger) if !ok { return addrs } @@ -400,7 +358,7 @@ func (a *addressService) makeUpdatedAddrEvent(prev, current []ma.Multiaddr) *eve } // Our addresses have changed. Make a new signed peer record. - if !a.disableSignedPeerRecord { + if a.peerRecord != nil { // add signed peer record to the event sr, err := a.peerRecord(current) if err != nil { @@ -536,3 +494,59 @@ func (i *interfaceAddrsCache) updateUnlocked() { } } } + +func getAllPossibleLocalAddrs(listenAddr ma.Multiaddr, ifaceAddrs []ma.Multiaddr) []ma.Multiaddr { + // If the nat mapping fails, use the observed addrs + resolved, err := manet.ResolveUnspecifiedAddress(listenAddr, ifaceAddrs) + if err != nil { + log.Warnf("failed to resolve listen addr %s, %s: %s", listenAddr, ifaceAddrs, err) + return nil + } + return append(resolved, listenAddr) +} + +// appendValidNATAddrs adds the NAT-ed addresses to the result. If the NAT device doesn't provide +// us with a public IP address, we use the observed addresses. +func appendValidNATAddrs(result []ma.Multiaddr, listenAddr ma.Multiaddr, natMapping ma.Multiaddr, + obsAddrsFunc func(ma.Multiaddr) []ma.Multiaddr, + ifaceAddrs []ma.Multiaddr) []ma.Multiaddr { + if natMapping == nil { + allAddrs := getAllPossibleLocalAddrs(listenAddr, ifaceAddrs) + for _, a := range allAddrs { + result = append(result, obsAddrsFunc(a)...) + } + return result + } + + // if the router reported a sane address, use it. + if !manet.IsIPUnspecified(natMapping) { + result = append(result, natMapping) + } else { + log.Warn("NAT device reported an unspecified IP as it's external address") + } + + // If the router gave us a public address, use it and ignore observed addresses + if manet.IsPublicAddr(natMapping) { + return result + } + + // Router gave us a private IP; maybe we're behind a CGNAT. + // See if we have a public IP from observed addresses. + _, extMaddrNoIP := ma.SplitFirst(natMapping) + if extMaddrNoIP == nil { + return result + } + + allAddrs := getAllPossibleLocalAddrs(listenAddr, ifaceAddrs) + for _, addr := range allAddrs { + for _, obsMaddr := range obsAddrsFunc(addr) { + // Extract a public observed addr. + ip, _ := ma.SplitFirst(obsMaddr) + if ip == nil || !manet.IsPublicAddr(ip) { + continue + } + result = append(result, ma.Join(ip, extMaddrNoIP)) + } + } + return result +} diff --git a/p2p/host/basic/address_service_test.go b/p2p/host/basic/address_service_test.go new file mode 100644 index 0000000000..e099ba5737 --- /dev/null +++ b/p2p/host/basic/address_service_test.go @@ -0,0 +1,98 @@ +package basichost + +import ( + "testing" + + ma "github.com/multiformats/go-multiaddr" + manet "github.com/multiformats/go-multiaddr/net" + "github.com/stretchr/testify/require" +) + +func TestAppendNATAddrs(t *testing.T) { + if1, if2 := ma.StringCast("/ip4/192.168.0.100"), ma.StringCast("/ip4/1.1.1.1") + ifaceAddrs := []ma.Multiaddr{if1, if2} + tcpListenAddr, udpListenAddr := ma.StringCast("/ip4/0.0.0.0/tcp/1"), ma.StringCast("/ip4/0.0.0.0/udp/2/quic-v1") + cases := []struct { + Name string + Listen ma.Multiaddr + Nat ma.Multiaddr + ObsAddrFunc func(ma.Multiaddr) []ma.Multiaddr + Expected []ma.Multiaddr + }{ + { + Name: "nat map success", + // nat mapping success, obsaddress ignored + Listen: ma.StringCast("/ip4/0.0.0.0/udp/1/quic-v1"), + Nat: ma.StringCast("/ip4/1.1.1.1/udp/10/quic-v1"), + ObsAddrFunc: func(m ma.Multiaddr) []ma.Multiaddr { + return []ma.Multiaddr{ma.StringCast("/ip4/2.2.2.2/udp/100/quic-v1")} + }, + Expected: []ma.Multiaddr{ma.StringCast("/ip4/1.1.1.1/udp/10/quic-v1")}, + }, + { + Name: "nat map failure", + //nat mapping fails, obs addresses added + Listen: ma.StringCast("/ip4/0.0.0.0/tcp/1"), + Nat: nil, + ObsAddrFunc: func(a ma.Multiaddr) []ma.Multiaddr { + ip, _ := ma.SplitFirst(a) + if ip == nil { + return nil + } + if ip.Equal(if1) { + return []ma.Multiaddr{ma.StringCast("/ip4/2.2.2.2/tcp/100")} + } else { + return []ma.Multiaddr{ma.StringCast("/ip4/3.3.3.3/tcp/100")} + } + }, + Expected: []ma.Multiaddr{ma.StringCast("/ip4/2.2.2.2/tcp/100"), ma.StringCast("/ip4/3.3.3.3/tcp/100")}, + }, + { + Name: "nat map success but CGNAT", + //nat addr added, obs address added with nat provided port + Listen: tcpListenAddr, + Nat: ma.StringCast("/ip4/100.100.1.1/tcp/100"), + ObsAddrFunc: func(a ma.Multiaddr) []ma.Multiaddr { + ip, _ := ma.SplitFirst(a) + if ip == nil { + return nil + } + if ip.Equal(if1) { + return []ma.Multiaddr{ma.StringCast("/ip4/2.2.2.2/tcp/20")} + } else { + return []ma.Multiaddr{ma.StringCast("/ip4/3.3.3.3/tcp/30")} + } + }, + Expected: []ma.Multiaddr{ + ma.StringCast("/ip4/100.100.1.1/tcp/100"), + ma.StringCast("/ip4/2.2.2.2/tcp/100"), + ma.StringCast("/ip4/3.3.3.3/tcp/100"), + }, + }, + { + Name: "uses unspecified address for obs address", + // observed address manager should be queries with both specified and unspecified addresses + // udp observed addresses are mapped to unspecified addresses + Listen: udpListenAddr, + Nat: nil, + ObsAddrFunc: func(a ma.Multiaddr) []ma.Multiaddr { + if manet.IsIPUnspecified(a) { + return []ma.Multiaddr{ma.StringCast("/ip4/3.3.3.3/udp/20/quic-v1")} + } + return []ma.Multiaddr{ma.StringCast("/ip4/2.2.2.2/udp/20/quic-v1")} + }, + Expected: []ma.Multiaddr{ + ma.StringCast("/ip4/2.2.2.2/udp/20/quic-v1"), + ma.StringCast("/ip4/3.3.3.3/udp/20/quic-v1"), + }, + }, + } + for _, tc := range cases { + t.Run(tc.Name, func(t *testing.T) { + res := appendValidNATAddrs(nil, + tc.Listen, tc.Nat, tc.ObsAddrFunc, ifaceAddrs) + res = ma.Unique(res) + require.ElementsMatch(t, tc.Expected, res, "%s\n%s", tc.Expected, res) + }) + } +} diff --git a/p2p/host/basic/basic_host.go b/p2p/host/basic/basic_host.go index 801656c499..61f4e1ebbd 100644 --- a/p2p/host/basic/basic_host.go +++ b/p2p/host/basic/basic_host.go @@ -94,7 +94,6 @@ type BasicHost struct { disableSignedPeerRecord bool signKey crypto.PrivKey - caBook peerstore.CertifiedAddrBook autoNat autonat.AutoNAT From fd761008b19dd7f47bc94e9d042457d94334442f Mon Sep 17 00:00:00 2001 From: sukun Date: Tue, 3 Dec 2024 17:34:09 +0530 Subject: [PATCH 4/5] make everything mockable --- p2p/host/basic/address_service.go | 206 ++++++++----------------- p2p/host/basic/address_service_test.go | 2 +- p2p/host/basic/basic_host.go | 142 +++++++++++++++-- p2p/host/basic/basic_host_test.go | 2 +- 4 files changed, 189 insertions(+), 163 deletions(-) diff --git a/p2p/host/basic/address_service.go b/p2p/host/basic/address_service.go index 72949f79d1..73d359c857 100644 --- a/p2p/host/basic/address_service.go +++ b/p2p/host/basic/address_service.go @@ -2,8 +2,6 @@ package basichost import ( "context" - "errors" - "fmt" "net" "slices" "sync" @@ -11,14 +9,9 @@ import ( "github.com/libp2p/go-libp2p/core/event" "github.com/libp2p/go-libp2p/core/network" - "github.com/libp2p/go-libp2p/core/peer" - "github.com/libp2p/go-libp2p/core/peerstore" "github.com/libp2p/go-libp2p/core/record" "github.com/libp2p/go-libp2p/core/transport" - "github.com/libp2p/go-libp2p/p2p/host/autonat" - "github.com/libp2p/go-libp2p/p2p/host/autorelay" "github.com/libp2p/go-libp2p/p2p/host/basic/internal/backoff" - "github.com/libp2p/go-libp2p/p2p/host/eventbus" libp2pwebrtc "github.com/libp2p/go-libp2p/p2p/transport/webrtc" libp2pwebtransport "github.com/libp2p/go-libp2p/p2p/transport/webtransport" "github.com/libp2p/go-netroute" @@ -34,19 +27,15 @@ type observedAddrsService interface { } type addressService struct { - net network.Network - peerstore peerstore.Peerstore - id peer.ID - addrsFactory AddrsFactory - // peerRecord is used to create peer records when the addresses change - peerRecord peerRecordFunc - autonat autonat.AutoNAT - autorelay *autorelay.AutoRelay + net network.Network + addrsFactory AddrsFactory natmgr NATManager observedAddrsService observedAddrsService - addrChangeChan chan struct{} - relayAddrsSub event.Subscription - emitter event.Emitter + addrsChangeChan chan struct{} + addrsUpdated chan struct{} + autoRelayAddrsSub event.Subscription + autoRelayAddrs func() []ma.Multiaddr + reachability func() network.Reachability ifaceAddrs *interfaceAddrsCache wg sync.WaitGroup ctx context.Context @@ -55,10 +44,6 @@ type addressService struct { func NewAddressService(h *BasicHost, natmgr func(network.Network) NATManager, addrFactory AddrsFactory) (*addressService, error) { - emitter, err := h.eventbus.Emitter(&event.EvtLocalAddressesUpdated{}, eventbus.Stateful) - if err != nil { - return nil, err - } var nmgr NATManager if natmgr != nil { nmgr = natmgr(h.Network()) @@ -67,47 +52,35 @@ func NewAddressService(h *BasicHost, natmgr func(network.Network) NATManager, if err != nil { return nil, err } - peerRecord := h.makeSignedPeerRecord - if !h.disableSignedPeerRecord { - peerRecord = nil + + var autoRelayAddrs func() []ma.Multiaddr + if h.autorelay != nil { + autoRelayAddrs = h.autorelay.RelayAddrs } + ctx, cancel := context.WithCancel(context.Background()) as := &addressService{ net: h.Network(), - peerstore: h.Peerstore(), observedAddrsService: h.IDService(), - id: h.ID(), - peerRecord: peerRecord, natmgr: nmgr, - emitter: emitter, - autorelay: h.autorelay, addrsFactory: addrFactory, - addrChangeChan: make(chan struct{}, 1), - relayAddrsSub: addrSub, - ctx: ctx, - ctxCancel: cancel, + addrsChangeChan: make(chan struct{}, 1), + addrsUpdated: make(chan struct{}, 1), + autoRelayAddrsSub: addrSub, + autoRelayAddrs: autoRelayAddrs, ifaceAddrs: &interfaceAddrsCache{}, - } - if as.peerRecord != nil { - cab, ok := peerstore.GetCertifiedAddrBook(as.peerstore) - if !ok { - return nil, errors.New("peerstore should also be a certified address book") - } - rec, err := as.peerRecord(as.Addrs()) - if err != nil { - return nil, fmt.Errorf("failed to create signed record for self: %w", err) - } - if _, err := cab.ConsumePeerRecord(rec, peerstore.PermanentAddrTTL); err != nil { - return nil, fmt.Errorf("failed to persist signed record to peerstore: %w", err) - } + reachability: func() network.Reachability { + if h.GetAutoNat() != nil { + return h.GetAutoNat().Status() + } + return network.ReachabilityUnknown + }, + ctx: ctx, + ctxCancel: cancel, } return as, nil } -func (a *addressService) SetAutoNAT(an autonat.AutoNAT) { - a.autonat = an -} - func (a *addressService) Start() { a.wg.Add(1) go a.background() @@ -122,11 +95,7 @@ func (a *addressService) Close() { log.Warnf("error closing natmgr: %s", err) } } - err := a.emitter.Close() - if err != nil { - log.Warnf("error closing addrs update emitter: %s", err) - } - err = a.relayAddrsSub.Close() + err := a.autoRelayAddrsSub.Close() if err != nil { log.Warnf("error closing addrs update emitter: %s", err) } @@ -134,61 +103,36 @@ func (a *addressService) Close() { func (a *addressService) SignalAddressChange() { select { - case a.addrChangeChan <- struct{}{}: + case a.addrsChangeChan <- struct{}{}: default: } } +func (a *addressService) AddrsUpdated() chan struct{} { + return a.addrsUpdated +} + func (a *addressService) background() { defer a.wg.Done() - var lastAddrs []ma.Multiaddr - emitAddrChange := func(currentAddrs []ma.Multiaddr, lastAddrs []ma.Multiaddr) { - changeEvt := a.makeUpdatedAddrEvent(lastAddrs, currentAddrs) - if changeEvt == nil { - return - } - // Our addresses have changed. - // store the signed peer record in the peer store. - if a.peerRecord != nil { - cabook, ok := peerstore.GetCertifiedAddrBook(a.peerstore) - if !ok { - log.Errorf("peerstore doesn't implement certified address book") - return - } - if _, err := cabook.ConsumePeerRecord(changeEvt.SignedPeerRecord, peerstore.PermanentAddrTTL); err != nil { - log.Errorf("failed to persist signed peer record in peer store, err=%s", err) - return - } - } - // update host addresses in the peer store - removedAddrs := make([]ma.Multiaddr, 0, len(changeEvt.Removed)) - for _, ua := range changeEvt.Removed { - removedAddrs = append(removedAddrs, ua.Address) - } - a.peerstore.SetAddrs(a.id, currentAddrs, peerstore.PermanentAddrTTL) - a.peerstore.SetAddrs(a.id, removedAddrs, 0) - - // emit addr change event - if err := a.emitter.Emit(*changeEvt); err != nil { - log.Warnf("error emitting event for updated addrs: %s", err) - } - } + var prev []ma.Multiaddr - // periodically schedules an IdentifyPush to update our peers for changes - // in our address set (if needed) ticker := time.NewTicker(addrChangeTickrInterval) defer ticker.Stop() - for { curr := a.Addrs() - emitAddrChange(curr, lastAddrs) - lastAddrs = curr + if a.areAddrsDifferent(prev, curr) { + select { + case a.addrsUpdated <- struct{}{}: + default: + } + } + prev = curr select { case <-ticker.C: - case <-a.addrChangeChan: - case <-a.relayAddrsSub.Out(): + case <-a.addrsChangeChan: + case <-a.autoRelayAddrsSub.Out(): case <-a.ctx.Done(): return } @@ -201,9 +145,9 @@ func (a *addressService) background() { func (a *addressService) Addrs() []ma.Multiaddr { addrs := a.AllAddrs() // Delete public addresses if the node's reachability is private, and we have autorelay. - if a.autonat != nil && a.autorelay != nil && a.autonat.Status() == network.ReachabilityPrivate { + if a.reachability() == network.ReachabilityPrivate && a.autoRelayAddrs != nil { addrs = slices.DeleteFunc(addrs, func(a ma.Multiaddr) bool { return manet.IsPublicAddr(a) }) - addrs = append(addrs, a.autorelay.RelayAddrs()...) + addrs = append(addrs, a.autoRelayAddrs()...) } // Make a copy. Consumers can modify the slice elements addrs = slices.Clone(a.addrsFactory(addrs)) @@ -245,6 +189,7 @@ func (a *addressService) AllAddrs() []ma.Multiaddr { finalAddrs = slices.DeleteFunc(finalAddrs, func(a ma.Multiaddr) bool { return a.Equal(p2pCircuitAddr) }) + // Add certhashes for /webrtc-direct, /webtransport, etc addresses discovered // using identify. finalAddrs = a.addCertHashes(finalAddrs) @@ -270,7 +215,7 @@ func (a *addressService) appendNATAddrs(result []ma.Multiaddr, listenAddrs []ma. // we have a NAT device for _, listen := range listenAddrs { extMaddr := a.natmgr.GetMapping(listen) - result = appendValidNATAddrs(result, listen, extMaddr, a.observedAddrsService.ObservedAddrsFor, ifaceAddrs) + result = appendNATAddrsForListenAddrs(result, listen, extMaddr, a.observedAddrsService.ObservedAddrsFor, ifaceAddrs) } } else { if a.observedAddrsService != nil { @@ -321,55 +266,26 @@ func (a *addressService) addCertHashes(addrs []ma.Multiaddr) []ma.Multiaddr { return addrs } -func (a *addressService) makeUpdatedAddrEvent(prev, current []ma.Multiaddr) *event.EvtLocalAddressesUpdated { - if prev == nil && current == nil { - return nil +func (a *addressService) areAddrsDifferent(prev, current []ma.Multiaddr) bool { + prevmap := make(map[string]struct{}) + currmap := make(map[string]struct{}) + for _, p := range prev { + prevmap[string(p.Bytes())] = struct{}{} } - prevmap := make(map[string]ma.Multiaddr, len(prev)) - currmap := make(map[string]ma.Multiaddr, len(current)) - evt := &event.EvtLocalAddressesUpdated{Diffs: true} - addrsAdded := false - - for _, addr := range prev { - prevmap[string(addr.Bytes())] = addr - } - for _, addr := range current { - currmap[string(addr.Bytes())] = addr - } - for _, addr := range currmap { - _, ok := prevmap[string(addr.Bytes())] - updated := event.UpdatedAddress{Address: addr} - if ok { - updated.Action = event.Maintained - } else { - updated.Action = event.Added - addrsAdded = true - } - evt.Current = append(evt.Current, updated) - delete(prevmap, string(addr.Bytes())) + for _, c := range current { + currmap[string(c.Bytes())] = struct{}{} } - for _, addr := range prevmap { - updated := event.UpdatedAddress{Action: event.Removed, Address: addr} - evt.Removed = append(evt.Removed, updated) - } - - if !addrsAdded && len(evt.Removed) == 0 { - return nil + for p := range prevmap { + if _, ok := currmap[p]; !ok { + return true + } } - - // Our addresses have changed. Make a new signed peer record. - if a.peerRecord != nil { - // add signed peer record to the event - sr, err := a.peerRecord(current) - if err != nil { - log.Errorf("error creating a signed peer record from the set of current addresses, err=%s", err) - // drop this change - return nil + for c := range currmap { + if _, ok := prevmap[c]; !ok { + return true } - evt.SignedPeerRecord = sr } - - return evt + return false } const ifaceAddrsTTL = time.Minute @@ -505,9 +421,9 @@ func getAllPossibleLocalAddrs(listenAddr ma.Multiaddr, ifaceAddrs []ma.Multiaddr return append(resolved, listenAddr) } -// appendValidNATAddrs adds the NAT-ed addresses to the result. If the NAT device doesn't provide +// appendNATAddrsForListenAddrs adds the NAT-ed addresses to the result. If the NAT device doesn't provide // us with a public IP address, we use the observed addresses. -func appendValidNATAddrs(result []ma.Multiaddr, listenAddr ma.Multiaddr, natMapping ma.Multiaddr, +func appendNATAddrsForListenAddrs(result []ma.Multiaddr, listenAddr ma.Multiaddr, natMapping ma.Multiaddr, obsAddrsFunc func(ma.Multiaddr) []ma.Multiaddr, ifaceAddrs []ma.Multiaddr) []ma.Multiaddr { if natMapping == nil { diff --git a/p2p/host/basic/address_service_test.go b/p2p/host/basic/address_service_test.go index e099ba5737..6ee611c8a2 100644 --- a/p2p/host/basic/address_service_test.go +++ b/p2p/host/basic/address_service_test.go @@ -89,7 +89,7 @@ func TestAppendNATAddrs(t *testing.T) { } for _, tc := range cases { t.Run(tc.Name, func(t *testing.T) { - res := appendValidNATAddrs(nil, + res := appendNATAddrsForListenAddrs(nil, tc.Listen, tc.Nat, tc.ObsAddrFunc, ifaceAddrs) res = ma.Unique(res) require.ElementsMatch(t, tc.Expected, res, "%s\n%s", tc.Expected, res) diff --git a/p2p/host/basic/basic_host.go b/p2p/host/basic/basic_host.go index 61f4e1ebbd..ac7cb9bd0f 100644 --- a/p2p/host/basic/basic_host.go +++ b/p2p/host/basic/basic_host.go @@ -81,8 +81,6 @@ type BasicHost struct { eventbus event.Bus relayManager *relaysvc.RelayManager - AddrsFactory AddrsFactory - negtimeout time.Duration emitters struct { @@ -90,12 +88,11 @@ type BasicHost struct { evtLocalAddrsUpdated event.Emitter } - addrMu sync.RWMutex - disableSignedPeerRecord bool signKey crypto.PrivKey - autoNat autonat.AutoNAT + autoNATMx sync.RWMutex + autoNat autonat.AutoNAT autonatv2 *autonatv2.AutoNAT autorelay *autorelay.AutoRelay @@ -195,6 +192,9 @@ func NewHost(n network.Network, opts *HostOpts) (*BasicHost, error) { if h.emitters.evtLocalProtocolsUpdated, err = h.eventbus.Emitter(&event.EvtLocalProtocolsUpdated{}, eventbus.Stateful); err != nil { return nil, err } + if h.emitters.evtLocalAddrsUpdated, err = h.eventbus.Emitter(&event.EvtLocalAddressesUpdated{}, eventbus.Stateful); err != nil { + return nil, err + } if !opts.DisableSignedPeerRecord { h.signKey = h.Peerstore().PrivKey(h.ID()) @@ -304,10 +304,24 @@ func NewHost(n network.Network, opts *HostOpts) (*BasicHost, error) { n.SetStreamHandler(h.newStreamHandler) + if !h.disableSignedPeerRecord { + cab, ok := peerstore.GetCertifiedAddrBook(h.Peerstore()) + if !ok { + return nil, errors.New("peerstore should also be a certified address book") + } + rec, err := h.makeSignedPeerRecord(h.addressService.Addrs()) + if err != nil { + return nil, fmt.Errorf("failed to create signed record for self: %w", err) + } + if _, err := cab.ConsumePeerRecord(rec, peerstore.PermanentAddrTTL); err != nil { + return nil, fmt.Errorf("failed to persist signed record to peerstore: %w", err) + } + } + // register to be notified when the network's listen addrs change, // so we can update our address set and push events if needed listenHandler := func(network.Network, ma.Multiaddr) { - h.SignalAddressChange() + h.addressService.SignalAddressChange() } n.Notify(&network.NotifyBundle{ ListenF: listenHandler, @@ -330,7 +344,10 @@ func (h *BasicHost) Start() { log.Errorf("autonat v2 failed to start: %s", err) } } + h.refCount.Add(1) + go h.background() h.addressService.Start() + } // newStreamHandler is the remote-opened stream handler for network.Network @@ -381,11 +398,105 @@ func (h *BasicHost) newStreamHandler(s network.Stream) { handle(protoID, s) } -// SignalAddressChange signals to the host that it needs to determine whether our listen addresses have recently -// changed. -// Warning: this interface is unstable and may disappear in the future. -func (h *BasicHost) SignalAddressChange() { - h.addressService.SignalAddressChange() +func (h *BasicHost) background() { + defer h.refCount.Done() + var lastAddrs []ma.Multiaddr + + // TODO: Deprecate this event and logic + emitAddrChange := func(currentAddrs []ma.Multiaddr, lastAddrs []ma.Multiaddr) { + changeEvt := h.makeUpdatedAddrEvent(lastAddrs, currentAddrs) + if changeEvt == nil { + return + } + // Our addresses have changed. + // store the signed peer record in the peer store. + if !h.disableSignedPeerRecord { + cabook, ok := peerstore.GetCertifiedAddrBook(h.Peerstore()) + if !ok { + log.Errorf("peerstore doesn't implement certified address book") + return + } + if _, err := cabook.ConsumePeerRecord(changeEvt.SignedPeerRecord, peerstore.PermanentAddrTTL); err != nil { + log.Errorf("failed to persist signed peer record in peer store, err=%s", err) + return + } + } + // update host addresses in the peer store + removedAddrs := make([]ma.Multiaddr, 0, len(changeEvt.Removed)) + for _, ua := range changeEvt.Removed { + removedAddrs = append(removedAddrs, ua.Address) + } + h.Peerstore().SetAddrs(h.ID(), currentAddrs, peerstore.PermanentAddrTTL) + h.Peerstore().SetAddrs(h.ID(), removedAddrs, 0) + + // emit addr change event + if err := h.emitters.evtLocalAddrsUpdated.Emit(*changeEvt); err != nil { + log.Warnf("error emitting event for updated addrs: %s", err) + } + } + + for { + curr := h.Addrs() + emitAddrChange(curr, lastAddrs) + lastAddrs = curr + + select { + case <-h.addressService.AddrsUpdated(): + case <-h.ctx.Done(): + return + } + } +} + +func (h *BasicHost) makeUpdatedAddrEvent(prev, current []ma.Multiaddr) *event.EvtLocalAddressesUpdated { + if prev == nil && current == nil { + return nil + } + prevmap := make(map[string]ma.Multiaddr, len(prev)) + currmap := make(map[string]ma.Multiaddr, len(current)) + evt := &event.EvtLocalAddressesUpdated{Diffs: true} + addrsAdded := false + + for _, addr := range prev { + prevmap[string(addr.Bytes())] = addr + } + for _, addr := range current { + currmap[string(addr.Bytes())] = addr + } + for _, addr := range currmap { + _, ok := prevmap[string(addr.Bytes())] + updated := event.UpdatedAddress{Address: addr} + if ok { + updated.Action = event.Maintained + } else { + updated.Action = event.Added + addrsAdded = true + } + evt.Current = append(evt.Current, updated) + delete(prevmap, string(addr.Bytes())) + } + for _, addr := range prevmap { + updated := event.UpdatedAddress{Action: event.Removed, Address: addr} + evt.Removed = append(evt.Removed, updated) + } + + if !addrsAdded && len(evt.Removed) == 0 { + return nil + } + + // Our addresses have changed. Make a new signed peer record. + if !h.disableSignedPeerRecord { + // add signed peer record to the event + sr, err := h.makeSignedPeerRecord(current) + if err != nil { + log.Errorf("error creating a signed peer record from the set of current addresses, err=%s", err) + // drop this change + return nil + } + evt.SignedPeerRecord = sr + } + + return evt } func (h *BasicHost) makeSignedPeerRecord(addrs []ma.Multiaddr) (*record.Envelope, error) { @@ -653,18 +764,17 @@ func (h *BasicHost) AllAddrs() []ma.Multiaddr { // SetAutoNat sets the autonat service for the host. func (h *BasicHost) SetAutoNat(a autonat.AutoNAT) { - h.addrMu.Lock() - defer h.addrMu.Unlock() + h.autoNATMx.Lock() + defer h.autoNATMx.Unlock() if h.autoNat == nil { h.autoNat = a } - h.addressService.SetAutoNAT(h.autoNat) } // GetAutoNat returns the host's AutoNAT service, if AutoNAT is enabled. func (h *BasicHost) GetAutoNat() autonat.AutoNAT { - h.addrMu.Lock() - defer h.addrMu.Unlock() + h.autoNATMx.Lock() + defer h.autoNATMx.Unlock() return h.autoNat } diff --git a/p2p/host/basic/basic_host_test.go b/p2p/host/basic/basic_host_test.go index 2d254956d2..dcf174410b 100644 --- a/p2p/host/basic/basic_host_test.go +++ b/p2p/host/basic/basic_host_test.go @@ -734,7 +734,7 @@ func TestHostAddrChangeDetection(t *testing.T) { lk.Lock() currentAddrSet = i lk.Unlock() - h.SignalAddressChange() + h.addressService.SignalAddressChange() evt := waitForAddrChangeEvent(ctx, sub, t) if !updatedAddrEventsEqual(expectedEvents[i-1], evt) { t.Errorf("change events not equal: \n\texpected: %v \n\tactual: %v", expectedEvents[i-1], evt) From 5fcce02b3e16dd35a20e64d26a46ebac1a9190f9 Mon Sep 17 00:00:00 2001 From: sukun Date: Tue, 3 Dec 2024 20:21:18 +0530 Subject: [PATCH 5/5] add tests for present behavior --- p2p/host/basic/address_service.go | 56 ++++---- p2p/host/basic/address_service_test.go | 174 +++++++++++++++++++++++++ p2p/host/basic/basic_host.go | 99 +++++++------- 3 files changed, 251 insertions(+), 78 deletions(-) diff --git a/p2p/host/basic/address_service.go b/p2p/host/basic/address_service.go index 73d359c857..fb1225d76a 100644 --- a/p2p/host/basic/address_service.go +++ b/p2p/host/basic/address_service.go @@ -9,7 +9,6 @@ import ( "github.com/libp2p/go-libp2p/core/event" "github.com/libp2p/go-libp2p/core/network" - "github.com/libp2p/go-libp2p/core/record" "github.com/libp2p/go-libp2p/core/transport" "github.com/libp2p/go-libp2p/p2p/host/basic/internal/backoff" libp2pwebrtc "github.com/libp2p/go-libp2p/p2p/transport/webrtc" @@ -19,8 +18,6 @@ import ( manet "github.com/multiformats/go-multiaddr/net" ) -type peerRecordFunc func([]ma.Multiaddr) (*record.Envelope, error) - type observedAddrsService interface { OwnObservedAddrs() []ma.Multiaddr ObservedAddrsFor(local ma.Multiaddr) []ma.Multiaddr @@ -34,12 +31,13 @@ type addressService struct { addrsChangeChan chan struct{} addrsUpdated chan struct{} autoRelayAddrsSub event.Subscription - autoRelayAddrs func() []ma.Multiaddr - reachability func() network.Reachability - ifaceAddrs *interfaceAddrsCache - wg sync.WaitGroup - ctx context.Context - ctxCancel context.CancelFunc + // There are wrapped in to functions for mocking + autoRelayAddrs func() []ma.Multiaddr + reachability func() network.Reachability + ifaceAddrs *interfaceAddrsCache + wg sync.WaitGroup + ctx context.Context + ctxCancel context.CancelFunc } func NewAddressService(h *BasicHost, natmgr func(network.Network) NATManager, @@ -177,19 +175,22 @@ func (a *addressService) AllAddrs() []ma.Multiaddr { finalAddrs := make([]ma.Multiaddr, 0, 8) finalAddrs = a.appendInterfaceAddrs(finalAddrs, listenAddrs) - - // use nat mappings if we have them finalAddrs = a.appendNATAddrs(finalAddrs, listenAddrs) finalAddrs = ma.Unique(finalAddrs) - // Remove /p2p-circuit addresses from the list. - // The p2p-circuit transport listener reports its address as just /p2p-circuit - // This is useless for dialing. Users need to manage their circuit addresses themselves, + // Remove "/p2p-circuit" addresses from the list. + // The p2p-circuit listener reports its address as just /p2p-circuit. This is + // useless for dialing. Users need to manage their circuit addresses themselves, // or use AutoRelay. finalAddrs = slices.DeleteFunc(finalAddrs, func(a ma.Multiaddr) bool { return a.Equal(p2pCircuitAddr) }) + // Remove any unspecified address from the list + finalAddrs = slices.DeleteFunc(finalAddrs, func(a ma.Multiaddr) bool { + return manet.IsIPUnspecified(a) + }) + // Add certhashes for /webrtc-direct, /webtransport, etc addresses discovered // using identify. finalAddrs = a.addCertHashes(finalAddrs) @@ -208,19 +209,23 @@ func (a *addressService) appendInterfaceAddrs(result []ma.Multiaddr, listenAddrs return result } +// appendNATAddrs appends the NAT-ed addrs for the listenAddrs. For unspecified listen addrs it appends the +// public address for all the interfaces. +// This automatically infers addresses from other transport addresses. For example, it'll infer a webtransport +// address from a quic observed address. +// +// TODO: Merge the natmgr and identify.ObservedAddrManager in to one NatMapper module. func (a *addressService) appendNATAddrs(result []ma.Multiaddr, listenAddrs []ma.Multiaddr) []ma.Multiaddr { ifaceAddrs := a.ifaceAddrs.All() - // use nat mappings if we have them - if a.natmgr != nil && a.natmgr.HasDiscoveredNAT() { - // we have a NAT device - for _, listen := range listenAddrs { - extMaddr := a.natmgr.GetMapping(listen) - result = appendNATAddrsForListenAddrs(result, listen, extMaddr, a.observedAddrsService.ObservedAddrsFor, ifaceAddrs) - } - } else { + if a.natmgr == nil || !a.natmgr.HasDiscoveredNAT() { if a.observedAddrsService != nil { result = append(result, a.observedAddrsService.OwnObservedAddrs()...) } + return result + } + for _, listen := range listenAddrs { + extMaddr := a.natmgr.GetMapping(listen) + result = appendNATAddrsForListenAddrs(result, listen, extMaddr, a.observedAddrsService.ObservedAddrsFor, ifaceAddrs) } return result } @@ -241,11 +246,6 @@ func (a *addressService) addCertHashes(addrs []ma.Multiaddr) []ma.Multiaddr { return addrs } - // Copy addrs slice since we'll be modifying it. - addrsOld := addrs - addrs = make([]ma.Multiaddr, len(addrsOld)) - copy(addrs, addrsOld) - for i, addr := range addrs { wtOK, wtN := libp2pwebtransport.IsWebtransportMultiaddr(addr) webrtcOK, webrtcN := libp2pwebrtc.IsWebRTCDirectMultiaddr(addr) @@ -411,6 +411,8 @@ func (i *interfaceAddrsCache) updateUnlocked() { } } +// getAllPossibleLocalAddrs gives all the possible address returned for `conn.LocalAddr` correspoinding +// to the `listenAddr` func getAllPossibleLocalAddrs(listenAddr ma.Multiaddr, ifaceAddrs []ma.Multiaddr) []ma.Multiaddr { // If the nat mapping fails, use the observed addrs resolved, err := manet.ResolveUnspecifiedAddress(listenAddr, ifaceAddrs) diff --git a/p2p/host/basic/address_service_test.go b/p2p/host/basic/address_service_test.go index 6ee611c8a2..d86cb162e7 100644 --- a/p2p/host/basic/address_service_test.go +++ b/p2p/host/basic/address_service_test.go @@ -2,7 +2,10 @@ package basichost import ( "testing" + "time" + "github.com/libp2p/go-libp2p/core/network" + swarmt "github.com/libp2p/go-libp2p/p2p/net/swarm/testing" ma "github.com/multiformats/go-multiaddr" manet "github.com/multiformats/go-multiaddr/net" "github.com/stretchr/testify/require" @@ -96,3 +99,174 @@ func TestAppendNATAddrs(t *testing.T) { }) } } + +type mockNatManager struct { + GetMappingFunc func(addr ma.Multiaddr) ma.Multiaddr + HasDiscoveredNATFunc func() bool +} + +func (m *mockNatManager) Close() error { + return nil +} + +func (m *mockNatManager) GetMapping(addr ma.Multiaddr) ma.Multiaddr { + return m.GetMappingFunc(addr) +} + +func (m *mockNatManager) HasDiscoveredNAT() bool { + return m.HasDiscoveredNATFunc() +} + +var _ NATManager = &mockNatManager{} + +type mockObservedAddrs struct { + OwnObservedAddrsFunc func() []ma.Multiaddr + ObservedAddrsForFunc func(ma.Multiaddr) []ma.Multiaddr +} + +func (m *mockObservedAddrs) OwnObservedAddrs() []ma.Multiaddr { + return m.OwnObservedAddrsFunc() +} + +func (m *mockObservedAddrs) ObservedAddrsFor(local ma.Multiaddr) []ma.Multiaddr { + return m.ObservedAddrsForFunc(local) +} + +func TestAddressService(t *testing.T) { + getAddrService := func() *addressService { + h, err := NewHost(swarmt.GenSwarm(t), &HostOpts{DisableIdentifyAddressDiscovery: true}) + require.NoError(t, err) + t.Cleanup(func() { h.Close() }) + + as := h.addressService + return as + } + + t.Run("NAT Address", func(t *testing.T) { + as := getAddrService() + as.natmgr = &mockNatManager{ + HasDiscoveredNATFunc: func() bool { return true }, + GetMappingFunc: func(addr ma.Multiaddr) ma.Multiaddr { + if _, err := addr.ValueForProtocol(ma.P_UDP); err == nil { + return ma.StringCast("/ip4/1.2.3.4/udp/1/quic-v1") + } + return nil + }, + } + require.Contains(t, as.Addrs(), ma.StringCast("/ip4/1.2.3.4/udp/1/quic-v1")) + }) + + t.Run("NAT And Observed Address", func(t *testing.T) { + as := getAddrService() + as.natmgr = &mockNatManager{ + HasDiscoveredNATFunc: func() bool { return true }, + GetMappingFunc: func(addr ma.Multiaddr) ma.Multiaddr { + if _, err := addr.ValueForProtocol(ma.P_UDP); err == nil { + return ma.StringCast("/ip4/1.2.3.4/udp/1/quic-v1") + } + return nil + }, + } + as.observedAddrsService = &mockObservedAddrs{ + ObservedAddrsForFunc: func(addr ma.Multiaddr) []ma.Multiaddr { + if _, err := addr.ValueForProtocol(ma.P_TCP); err == nil { + return []ma.Multiaddr{ma.StringCast("/ip4/2.2.2.2/tcp/1")} + } + return nil + }, + } + require.Contains(t, as.Addrs(), ma.StringCast("/ip4/1.2.3.4/udp/1/quic-v1")) + require.Contains(t, as.Addrs(), ma.StringCast("/ip4/2.2.2.2/tcp/1")) + }) + t.Run("Only Observed Address", func(t *testing.T) { + as := getAddrService() + as.natmgr = nil + as.observedAddrsService = &mockObservedAddrs{ + ObservedAddrsForFunc: func(addr ma.Multiaddr) []ma.Multiaddr { + if _, err := addr.ValueForProtocol(ma.P_TCP); err == nil { + return []ma.Multiaddr{ma.StringCast("/ip4/2.2.2.2/tcp/1")} + } + return nil + }, + OwnObservedAddrsFunc: func() []ma.Multiaddr { + return []ma.Multiaddr{ma.StringCast("/ip4/3.3.3.3/udp/1/quic-v1")} + }, + } + require.NotContains(t, as.Addrs(), ma.StringCast("/ip4/2.2.2.2/tcp/1")) + require.Contains(t, as.Addrs(), ma.StringCast("/ip4/3.3.3.3/udp/1/quic-v1")) + }) + t.Run("Public Addrs Removed When Private", func(t *testing.T) { + as := getAddrService() + as.natmgr = nil + as.observedAddrsService = &mockObservedAddrs{ + OwnObservedAddrsFunc: func() []ma.Multiaddr { + return []ma.Multiaddr{ma.StringCast("/ip4/3.3.3.3/udp/1/quic-v1")} + }, + } + as.reachability = func() network.Reachability { + return network.ReachabilityPrivate + } + relayAddr := ma.StringCast("/ip4/1.2.3.4/udp/1/quic-v1/p2p/QmdXGaeGiVA745XorV1jr11RHxB9z4fqykm6xCUPX1aTJo/p2p-circuit") + as.autoRelayAddrs = func() []ma.Multiaddr { + return []ma.Multiaddr{relayAddr} + } + require.NotContains(t, as.Addrs(), ma.StringCast("/ip4/3.3.3.3/udp/1/quic-v1")) + require.Contains(t, as.Addrs(), relayAddr) + require.Contains(t, as.AllAddrs(), ma.StringCast("/ip4/3.3.3.3/udp/1/quic-v1")) + }) + + t.Run("AddressFactory gets relay addresses", func(t *testing.T) { + as := getAddrService() + as.natmgr = nil + as.observedAddrsService = &mockObservedAddrs{ + OwnObservedAddrsFunc: func() []ma.Multiaddr { + return []ma.Multiaddr{ma.StringCast("/ip4/3.3.3.3/udp/1/quic-v1")} + }, + } + as.reachability = func() network.Reachability { + return network.ReachabilityPrivate + } + relayAddr := ma.StringCast("/ip4/1.2.3.4/udp/1/quic-v1/p2p/QmdXGaeGiVA745XorV1jr11RHxB9z4fqykm6xCUPX1aTJo/p2p-circuit") + as.autoRelayAddrs = func() []ma.Multiaddr { + return []ma.Multiaddr{relayAddr} + } + as.addrsFactory = func(addrs []ma.Multiaddr) []ma.Multiaddr { + for _, a := range addrs { + if a.Equal(relayAddr) { + return []ma.Multiaddr{ma.StringCast("/ip4/3.3.3.3/udp/1/quic-v1")} + } + } + return nil + } + require.Contains(t, as.Addrs(), ma.StringCast("/ip4/3.3.3.3/udp/1/quic-v1")) + require.NotContains(t, as.Addrs(), relayAddr) + }) + + t.Run("updates addresses on signaling", func(t *testing.T) { + as := getAddrService() + as.natmgr = nil + updateChan := make(chan struct{}) + a1 := ma.StringCast("/ip4/1.1.1.1/udp/1/quic-v1") + a2 := ma.StringCast("/ip4/1.1.1.1/tcp/1") + as.addrsFactory = func(addrs []ma.Multiaddr) []ma.Multiaddr { + select { + case <-updateChan: + return []ma.Multiaddr{a2} + default: + return []ma.Multiaddr{a1} + } + } + as.Start() + require.Contains(t, as.Addrs(), a1) + require.NotContains(t, as.Addrs(), a2) + close(updateChan) + as.SignalAddressChange() + select { + case <-as.AddrsUpdated(): + require.Contains(t, as.Addrs(), a2) + require.NotContains(t, as.Addrs(), a1) + case <-time.After(2 * time.Second): + t.Fatal("expected addrs to be updated") + } + }) +} diff --git a/p2p/host/basic/basic_host.go b/p2p/host/basic/basic_host.go index ac7cb9bd0f..404949a287 100644 --- a/p2p/host/basic/basic_host.go +++ b/p2p/host/basic/basic_host.go @@ -90,6 +90,7 @@ type BasicHost struct { disableSignedPeerRecord bool signKey crypto.PrivKey + caBook peerstore.CertifiedAddrBook autoNATMx sync.RWMutex autoNat autonat.AutoNAT @@ -309,11 +310,12 @@ func NewHost(n network.Network, opts *HostOpts) (*BasicHost, error) { if !ok { return nil, errors.New("peerstore should also be a certified address book") } + h.caBook = cab rec, err := h.makeSignedPeerRecord(h.addressService.Addrs()) if err != nil { return nil, fmt.Errorf("failed to create signed record for self: %w", err) } - if _, err := cab.ConsumePeerRecord(rec, peerstore.PermanentAddrTTL); err != nil { + if _, err := h.caBook.ConsumePeerRecord(rec, peerstore.PermanentAddrTTL); err != nil { return nil, fmt.Errorf("failed to persist signed record to peerstore: %w", err) } } @@ -398,56 +400,6 @@ func (h *BasicHost) newStreamHandler(s network.Stream) { handle(protoID, s) } -func (h *BasicHost) background() { - defer h.refCount.Done() - var lastAddrs []ma.Multiaddr - - // TODO: Deprecate this event and logic - emitAddrChange := func(currentAddrs []ma.Multiaddr, lastAddrs []ma.Multiaddr) { - changeEvt := h.makeUpdatedAddrEvent(lastAddrs, currentAddrs) - if changeEvt == nil { - return - } - // Our addresses have changed. - // store the signed peer record in the peer store. - if !h.disableSignedPeerRecord { - cabook, ok := peerstore.GetCertifiedAddrBook(h.Peerstore()) - if !ok { - log.Errorf("peerstore doesn't implement certified address book") - return - } - if _, err := cabook.ConsumePeerRecord(changeEvt.SignedPeerRecord, peerstore.PermanentAddrTTL); err != nil { - log.Errorf("failed to persist signed peer record in peer store, err=%s", err) - return - } - } - // update host addresses in the peer store - removedAddrs := make([]ma.Multiaddr, 0, len(changeEvt.Removed)) - for _, ua := range changeEvt.Removed { - removedAddrs = append(removedAddrs, ua.Address) - } - h.Peerstore().SetAddrs(h.ID(), currentAddrs, peerstore.PermanentAddrTTL) - h.Peerstore().SetAddrs(h.ID(), removedAddrs, 0) - - // emit addr change event - if err := h.emitters.evtLocalAddrsUpdated.Emit(*changeEvt); err != nil { - log.Warnf("error emitting event for updated addrs: %s", err) - } - } - - for { - curr := h.Addrs() - emitAddrChange(curr, lastAddrs) - lastAddrs = curr - - select { - case <-h.addressService.AddrsUpdated(): - case <-h.ctx.Done(): - return - } - } -} - func (h *BasicHost) makeUpdatedAddrEvent(prev, current []ma.Multiaddr) *event.EvtLocalAddressesUpdated { if prev == nil && current == nil { return nil @@ -515,6 +467,51 @@ func (h *BasicHost) makeSignedPeerRecord(addrs []ma.Multiaddr) (*record.Envelope return record.Seal(rec, h.signKey) } +func (h *BasicHost) background() { + defer h.refCount.Done() + var lastAddrs []ma.Multiaddr + + // TODO: Deprecate this event and logic once we have a new event for address with reachability + emitAddrChange := func(currentAddrs []ma.Multiaddr, lastAddrs []ma.Multiaddr) { + changeEvt := h.makeUpdatedAddrEvent(lastAddrs, currentAddrs) + if changeEvt == nil { + return + } + // Our addresses have changed. + // store the signed peer record in the peer store. + if !h.disableSignedPeerRecord { + if _, err := h.caBook.ConsumePeerRecord(changeEvt.SignedPeerRecord, peerstore.PermanentAddrTTL); err != nil { + log.Errorf("failed to persist signed peer record in peer store, err=%s", err) + return + } + } + // update host addresses in the peer store + removedAddrs := make([]ma.Multiaddr, 0, len(changeEvt.Removed)) + for _, ua := range changeEvt.Removed { + removedAddrs = append(removedAddrs, ua.Address) + } + h.Peerstore().SetAddrs(h.ID(), currentAddrs, peerstore.PermanentAddrTTL) + h.Peerstore().SetAddrs(h.ID(), removedAddrs, 0) + + // emit addr change event + if err := h.emitters.evtLocalAddrsUpdated.Emit(*changeEvt); err != nil { + log.Warnf("error emitting event for updated addrs: %s", err) + } + } + + for { + curr := h.Addrs() + emitAddrChange(curr, lastAddrs) + lastAddrs = curr + + select { + case <-h.addressService.AddrsUpdated(): + case <-h.ctx.Done(): + return + } + } +} + // ID returns the (local) peer.ID associated with this Host func (h *BasicHost) ID() peer.ID { return h.Network().LocalPeer()