diff --git a/fx_options_test.go b/fx_options_test.go index 48ac79b53d..67d5b8cb0f 100644 --- a/fx_options_test.go +++ b/fx_options_test.go @@ -17,7 +17,9 @@ func TestGetPeerID(t *testing.T) { WithFxOption(fx.Populate(&id)), ) require.NoError(t, err) - defer host.Close() + defer func() { + host.Close() + }() require.Equal(t, host.ID(), id) diff --git a/libp2p_test.go b/libp2p_test.go index 3de82946d8..38759acb46 100644 --- a/libp2p_test.go +++ b/libp2p_test.go @@ -485,6 +485,7 @@ func TestHostAddrsFactoryAddsCerthashes(t *testing.T) { require.NoError(t, err) require.Eventually(t, func() bool { addrs := h.Addrs() + fmt.Println(addrs) for _, a := range addrs { first, last := ma.SplitFunc(a, func(c ma.Component) bool { return c.Protocol().Code == ma.P_CERTHASH diff --git a/p2p/host/basic/address_service.go b/p2p/host/basic/address_service.go new file mode 100644 index 0000000000..6c3a021f1c --- /dev/null +++ b/p2p/host/basic/address_service.go @@ -0,0 +1,551 @@ +package basichost + +import ( + "context" + "errors" + "fmt" + "net" + "slices" + "sync" + "time" + + "github.com/libp2p/go-libp2p/core/event" + "github.com/libp2p/go-libp2p/core/network" + "github.com/libp2p/go-libp2p/core/peer" + "github.com/libp2p/go-libp2p/core/peerstore" + "github.com/libp2p/go-libp2p/core/record" + "github.com/libp2p/go-libp2p/core/transport" + "github.com/libp2p/go-libp2p/p2p/host/autonat" + "github.com/libp2p/go-libp2p/p2p/host/autorelay" + "github.com/libp2p/go-libp2p/p2p/host/basic/internal/backoff" + "github.com/libp2p/go-libp2p/p2p/host/eventbus" + "github.com/libp2p/go-libp2p/p2p/protocol/identify" + libp2pwebrtc "github.com/libp2p/go-libp2p/p2p/transport/webrtc" + libp2pwebtransport "github.com/libp2p/go-libp2p/p2p/transport/webtransport" + "github.com/libp2p/go-netroute" + ma "github.com/multiformats/go-multiaddr" + manet "github.com/multiformats/go-multiaddr/net" +) + +type peerRecordFunc func([]ma.Multiaddr) (*record.Envelope, error) + +type addressService struct { + s network.Network + natmgr NATManager + ids identify.IDService + peerRecord peerRecordFunc + disableSignedPeerRecord bool + peerstore peerstore.Peerstore + id peer.ID + autonat autonat.AutoNAT + emitter event.Emitter + autorelay *autorelay.AutoRelay + addrsFactory AddrsFactory + addrChangeChan chan struct{} + ctx context.Context + ctxCancel context.CancelFunc + + wg sync.WaitGroup + updateLocalIPv4Backoff backoff.ExpBackoff + updateLocalIPv6Backoff backoff.ExpBackoff + + mx sync.RWMutex + allInterfaceAddrs []ma.Multiaddr + filteredInterfaceAddrs []ma.Multiaddr +} + +func NewAddressService(h *BasicHost, natmgr func(network.Network) NATManager, + addrFactory AddrsFactory) (*addressService, error) { + emitter, err := h.eventbus.Emitter(&event.EvtLocalAddressesUpdated{}, eventbus.Stateful) + if err != nil { + return nil, err + } + var nmgr NATManager + if natmgr != nil { + nmgr = natmgr(h.Network()) + } + ctx, cancel := context.WithCancel(context.Background()) + as := &addressService{ + s: h.Network(), + ids: h.IDService(), + peerRecord: h.makeSignedPeerRecord, + disableSignedPeerRecord: h.disableSignedPeerRecord, + peerstore: h.Peerstore(), + id: h.ID(), + natmgr: nmgr, + emitter: emitter, + autorelay: h.autorelay, + addrsFactory: addrFactory, + addrChangeChan: make(chan struct{}, 1), + ctx: ctx, + ctxCancel: cancel, + } + as.updateLocalIpAddr() + if !as.disableSignedPeerRecord { + cab, ok := peerstore.GetCertifiedAddrBook(as.peerstore) + if !ok { + return nil, errors.New("peerstore should also be a certified address book") + } + h.caBook = cab + rec, err := as.peerRecord(as.Addrs()) + if err != nil { + return nil, fmt.Errorf("failed to create signed record for self: %w", err) + } + if _, err := cab.ConsumePeerRecord(rec, peerstore.PermanentAddrTTL); err != nil { + return nil, fmt.Errorf("failed to persist signed record to peerstore: %w", err) + } + } + return as, nil +} + +func (a *addressService) SetAutoNAT(an autonat.AutoNAT) { + a.autonat = an +} + +func (a *addressService) Start() { + a.wg.Add(1) + go a.background() +} + +func (a *addressService) Close() { + a.ctxCancel() + a.wg.Wait() + if a.natmgr != nil { + err := a.natmgr.Close() + if err != nil { + log.Warnf("error closing natmgr: %s", err) + } + } + err := a.emitter.Close() + if err != nil { + log.Warnf("error closing addrs update emitter: %s", err) + } +} + +func (a *addressService) SignalAddressChange() { + select { + case a.addrChangeChan <- struct{}{}: + default: + } + +} + +func (a *addressService) background() { + defer a.wg.Done() + var lastAddrs []ma.Multiaddr + + emitAddrChange := func(currentAddrs []ma.Multiaddr, lastAddrs []ma.Multiaddr) { + changeEvt := a.makeUpdatedAddrEvent(lastAddrs, currentAddrs) + if changeEvt == nil { + return + } + // Our addresses have changed. + // store the signed peer record in the peer store. + if !a.disableSignedPeerRecord { + cabook, ok := peerstore.GetCertifiedAddrBook(a.peerstore) + if !ok { + log.Errorf("peerstore doesn't implement certified address book") + return + } + if _, err := cabook.ConsumePeerRecord(changeEvt.SignedPeerRecord, peerstore.PermanentAddrTTL); err != nil { + log.Errorf("failed to persist signed peer record in peer store, err=%s", err) + return + } + } + // update host addresses in the peer store + removedAddrs := make([]ma.Multiaddr, 0, len(changeEvt.Removed)) + for _, ua := range changeEvt.Removed { + removedAddrs = append(removedAddrs, ua.Address) + } + a.peerstore.SetAddrs(a.id, currentAddrs, peerstore.PermanentAddrTTL) + a.peerstore.SetAddrs(a.id, removedAddrs, 0) + + // emit addr change event + if err := a.emitter.Emit(*changeEvt); err != nil { + log.Warnf("error emitting event for updated addrs: %s", err) + } + } + + // periodically schedules an IdentifyPush to update our peers for changes + // in our address set (if needed) + ticker := time.NewTicker(addrChangeTickrInterval) + defer ticker.Stop() + + for { + // Update our local IP addresses before checking our current addresses. + if len(a.s.ListenAddresses()) > 0 { + a.updateLocalIpAddr() + } + curr := a.Addrs() + emitAddrChange(curr, lastAddrs) + lastAddrs = curr + + select { + case <-ticker.C: + case <-a.addrChangeChan: + case <-a.ctx.Done(): + return + } + } +} + +// Addrs returns the node's dialable addresses both public and private. +// If autorealy is enabled and node reachability is private, it returns +// the node's relay addresses and private network addresses. +func (a *addressService) Addrs() []ma.Multiaddr { + addrs := a.AllAddrs() + // Delete public addresses if the node's reachability is private, and we have autorelay. + if a.autonat != nil && a.autorelay != nil && a.autonat.Status() == network.ReachabilityPrivate { + addrs = slices.DeleteFunc(addrs, func(a ma.Multiaddr) bool { return manet.IsPublicAddr(a) }) + addrs = append(addrs, a.autorelay.RelayAddrs()...) + } + // Make a copy. Consumers can modify the slice elements + addrs = slices.Clone(a.addrsFactory(addrs)) + // Add certhashes for the addresses provided by the user via address factory. + return a.addCertHashes(ma.Unique(addrs)) +} + +// GetHolePunchAddrs returns the node's public direct listen addresses. +func (a *addressService) GetHolePunchAddrs() []ma.Multiaddr { + addrs := a.AllAddrs() + addrs = slices.Clone(a.addrsFactory(addrs)) + // AllAddrs may ignore observed addresses in favour of NAT mappings. + // Use both for hole punching. + addrs = append(addrs, a.ids.OwnObservedAddrs()...) + addrs = ma.Unique(addrs) + return slices.DeleteFunc(addrs, func(a ma.Multiaddr) bool { return !manet.IsPublicAddr(a) }) +} + +var p2pCircuitAddr = ma.StringCast("/p2p-circuit") + +// AllAddrs returns all the addresses the host is listening on except circuit addresses. +func (a *addressService) AllAddrs() []ma.Multiaddr { + listenAddrs := a.s.ListenAddresses() + if len(listenAddrs) == 0 { + return nil + } + + a.mx.RLock() + filteredIfaceAddrs := a.filteredInterfaceAddrs + allIfaceAddrs := a.allInterfaceAddrs + a.mx.RUnlock() + + // Iterate over all _unresolved_ listen addresses, resolving our primary + // interface only to avoid advertising too many addresses. + finalAddrs := make([]ma.Multiaddr, 0, 8) + if resolved, err := manet.ResolveUnspecifiedAddresses(listenAddrs, filteredIfaceAddrs); err != nil { + // This can happen if we're listening on no addrs, or listening + // on IPv6 addrs, but only have IPv4 interface addrs. + log.Debugw("failed to resolve listen addrs", "error", err) + } else { + finalAddrs = append(finalAddrs, resolved...) + } + + finalAddrs = ma.Unique(finalAddrs) + + // use nat mappings if we have them + if a.natmgr != nil && a.natmgr.HasDiscoveredNAT() { + // We have successfully mapped ports on our NAT. Use those + // instead of observed addresses (mostly). + // Next, apply this mapping to our addresses. + for _, listen := range listenAddrs { + extMaddr := a.natmgr.GetMapping(listen) + if extMaddr == nil { + // not mapped + continue + } + + // if the router reported a sane address + if !manet.IsIPUnspecified(extMaddr) { + // Add in the mapped addr. + finalAddrs = append(finalAddrs, extMaddr) + } else { + log.Warn("NAT device reported an unspecified IP as it's external address") + } + + // Did the router give us a routable public addr? + if manet.IsPublicAddr(extMaddr) { + // well done + continue + } + + // No. + // in case the router gives us a wrong address or we're behind a double-NAT. + // also add observed addresses + resolved, err := manet.ResolveUnspecifiedAddress(listen, allIfaceAddrs) + if err != nil { + // This can happen if we try to resolve /ip6/::/... + // without any IPv6 interface addresses. + continue + } + + for _, addr := range resolved { + // Now, check if we have any observed addresses that + // differ from the one reported by the router. Routers + // don't always give the most accurate information. + observed := a.ids.ObservedAddrsFor(addr) + + if len(observed) == 0 { + continue + } + + // Drop the IP from the external maddr + _, extMaddrNoIP := ma.SplitFirst(extMaddr) + + for _, obsMaddr := range observed { + // Extract a public observed addr. + ip, _ := ma.SplitFirst(obsMaddr) + if ip == nil || !manet.IsPublicAddr(ip) { + continue + } + + finalAddrs = append(finalAddrs, ma.Join(ip, extMaddrNoIP)) + } + } + } + } else { + var observedAddrs []ma.Multiaddr + if a.ids != nil { + observedAddrs = a.ids.OwnObservedAddrs() + } + finalAddrs = append(finalAddrs, observedAddrs...) + } + finalAddrs = ma.Unique(finalAddrs) + // Remove /p2p-circuit addresses from the list. + // The p2p-circuit tranport listener reports its address as just /p2p-circuit + // This is useless for dialing. Users need to manage their circuit addresses themselves, + // or use AutoRelay. + finalAddrs = slices.DeleteFunc(finalAddrs, func(a ma.Multiaddr) bool { + return a.Equal(p2pCircuitAddr) + }) + // Add certhashes for /webrtc-direct, /webtransport, etc addresses discovered + // using identify. + finalAddrs = a.addCertHashes(finalAddrs) + return finalAddrs +} + +func (a *addressService) updateLocalIpAddr() { + a.mx.Lock() + defer a.mx.Unlock() + + a.filteredInterfaceAddrs = nil + a.allInterfaceAddrs = nil + + // Try to use the default ipv4/6 addresses. + // TODO: Remove this. We should advertise all interface addresses. + if r, err := netroute.New(); err != nil { + log.Debugw("failed to build Router for kernel's routing table", "error", err) + } else { + + var localIPv4 net.IP + var ran bool + err, ran = a.updateLocalIPv4Backoff.Run(func() error { + _, _, localIPv4, err = r.Route(net.IPv4zero) + return err + }) + + if ran && err != nil { + log.Debugw("failed to fetch local IPv4 address", "error", err) + } else if ran && localIPv4.IsGlobalUnicast() { + maddr, err := manet.FromIP(localIPv4) + if err == nil { + a.filteredInterfaceAddrs = append(a.filteredInterfaceAddrs, maddr) + } + } + + var localIPv6 net.IP + err, ran = a.updateLocalIPv6Backoff.Run(func() error { + _, _, localIPv6, err = r.Route(net.IPv6unspecified) + return err + }) + + if ran && err != nil { + log.Debugw("failed to fetch local IPv6 address", "error", err) + } else if ran && localIPv6.IsGlobalUnicast() { + maddr, err := manet.FromIP(localIPv6) + if err == nil { + a.filteredInterfaceAddrs = append(a.filteredInterfaceAddrs, maddr) + } + } + } + + // Resolve the interface addresses + ifaceAddrs, err := manet.InterfaceMultiaddrs() + if err != nil { + // This usually shouldn't happen, but we could be in some kind + // of funky restricted environment. + log.Errorw("failed to resolve local interface addresses", "error", err) + + // Add the loopback addresses to the filtered addrs and use them as the non-filtered addrs. + // Then bail. There's nothing else we can do here. + a.filteredInterfaceAddrs = append(a.filteredInterfaceAddrs, manet.IP4Loopback, manet.IP6Loopback) + a.allInterfaceAddrs = a.filteredInterfaceAddrs + return + } + + for _, addr := range ifaceAddrs { + // Skip link-local addrs, they're mostly useless. + if !manet.IsIP6LinkLocal(addr) { + a.allInterfaceAddrs = append(a.allInterfaceAddrs, addr) + } + } + + // If netroute failed to get us any interface addresses, use all of + // them. + if len(a.filteredInterfaceAddrs) == 0 { + // Add all addresses. + a.filteredInterfaceAddrs = a.allInterfaceAddrs + } else { + // Only add loopback addresses. Filter these because we might + // not _have_ an IPv6 loopback address. + for _, addr := range a.allInterfaceAddrs { + if manet.IsIPLoopback(addr) { + a.filteredInterfaceAddrs = append(a.filteredInterfaceAddrs, addr) + } + } + } +} + +func (a *addressService) addCertHashes(addrs []ma.Multiaddr) []ma.Multiaddr { + // This is a temporary workaround/hack that fixes #2233. Once we have a + // proper address pipeline, rework this. See the issue for more context. + type transportForListeninger interface { + TransportForListening(a ma.Multiaddr) transport.Transport + } + + type addCertHasher interface { + AddCertHashes(m ma.Multiaddr) (ma.Multiaddr, bool) + } + + s, ok := a.s.(transportForListeninger) + if !ok { + return addrs + } + + // Copy addrs slice since we'll be modifying it. + addrsOld := addrs + addrs = make([]ma.Multiaddr, len(addrsOld)) + copy(addrs, addrsOld) + + for i, addr := range addrs { + wtOK, wtN := libp2pwebtransport.IsWebtransportMultiaddr(addr) + webrtcOK, webrtcN := libp2pwebrtc.IsWebRTCDirectMultiaddr(addr) + if (wtOK && wtN == 0) || (webrtcOK && webrtcN == 0) { + t := s.TransportForListening(addr) + tpt, ok := t.(addCertHasher) + if !ok { + continue + } + addrWithCerthash, added := tpt.AddCertHashes(addr) + if !added { + log.Debugf("Couldn't add certhashes to multiaddr: %s", addr) + continue + } + addrs[i] = addrWithCerthash + } + } + return addrs +} + +func (a *addressService) makeUpdatedAddrEvent(prev, current []ma.Multiaddr) *event.EvtLocalAddressesUpdated { + if prev == nil && current == nil { + return nil + } + prevmap := make(map[string]ma.Multiaddr, len(prev)) + currmap := make(map[string]ma.Multiaddr, len(current)) + evt := &event.EvtLocalAddressesUpdated{Diffs: true} + addrsAdded := false + + for _, addr := range prev { + prevmap[string(addr.Bytes())] = addr + } + for _, addr := range current { + currmap[string(addr.Bytes())] = addr + } + for _, addr := range currmap { + _, ok := prevmap[string(addr.Bytes())] + updated := event.UpdatedAddress{Address: addr} + if ok { + updated.Action = event.Maintained + } else { + updated.Action = event.Added + addrsAdded = true + } + evt.Current = append(evt.Current, updated) + delete(prevmap, string(addr.Bytes())) + } + for _, addr := range prevmap { + updated := event.UpdatedAddress{Action: event.Removed, Address: addr} + evt.Removed = append(evt.Removed, updated) + } + + if !addrsAdded && len(evt.Removed) == 0 { + return nil + } + + // Our addresses have changed. Make a new signed peer record. + if !a.disableSignedPeerRecord { + // add signed peer record to the event + sr, err := a.peerRecord(current) + if err != nil { + log.Errorf("error creating a signed peer record from the set of current addresses, err=%s", err) + // drop this change + return nil + } + evt.SignedPeerRecord = sr + } + + return evt +} + +func trimHostAddrList(addrs []ma.Multiaddr, maxSize int) []ma.Multiaddr { + totalSize := 0 + for _, a := range addrs { + totalSize += len(a.Bytes()) + } + if totalSize <= maxSize { + return addrs + } + + score := func(addr ma.Multiaddr) int { + var res int + if manet.IsPublicAddr(addr) { + res |= 1 << 12 + } else if !manet.IsIPLoopback(addr) { + res |= 1 << 11 + } + var protocolWeight int + ma.ForEach(addr, func(c ma.Component) bool { + switch c.Protocol().Code { + case ma.P_QUIC_V1: + protocolWeight = 5 + case ma.P_TCP: + protocolWeight = 4 + case ma.P_WSS: + protocolWeight = 3 + case ma.P_WEBTRANSPORT: + protocolWeight = 2 + case ma.P_WEBRTC_DIRECT: + protocolWeight = 1 + case ma.P_P2P: + return false + } + return true + }) + res |= 1 << protocolWeight + return res + } + + slices.SortStableFunc(addrs, func(a, b ma.Multiaddr) int { + return score(b) - score(a) // b-a for reverse order + }) + totalSize = 0 + for i, a := range addrs { + totalSize += len(a.Bytes()) + if totalSize > maxSize { + addrs = addrs[:i] + break + } + } + return addrs +} diff --git a/p2p/host/basic/basic_host.go b/p2p/host/basic/basic_host.go index 5e2b6c2440..fb555b4545 100644 --- a/p2p/host/basic/basic_host.go +++ b/p2p/host/basic/basic_host.go @@ -5,8 +5,6 @@ import ( "errors" "fmt" "io" - "net" - "slices" "sync" "time" @@ -19,10 +17,8 @@ import ( "github.com/libp2p/go-libp2p/core/peerstore" "github.com/libp2p/go-libp2p/core/protocol" "github.com/libp2p/go-libp2p/core/record" - "github.com/libp2p/go-libp2p/core/transport" "github.com/libp2p/go-libp2p/p2p/host/autonat" "github.com/libp2p/go-libp2p/p2p/host/autorelay" - "github.com/libp2p/go-libp2p/p2p/host/basic/internal/backoff" "github.com/libp2p/go-libp2p/p2p/host/eventbus" "github.com/libp2p/go-libp2p/p2p/host/pstoremanager" "github.com/libp2p/go-libp2p/p2p/host/relaysvc" @@ -35,11 +31,8 @@ import ( libp2pwebtransport "github.com/libp2p/go-libp2p/p2p/transport/webtransport" "github.com/prometheus/client_golang/prometheus" - "github.com/libp2p/go-netroute" - logging "github.com/ipfs/go-log/v2" ma "github.com/multiformats/go-multiaddr" - manet "github.com/multiformats/go-multiaddr/net" msmux "github.com/multiformats/go-multistream" ) @@ -95,13 +88,7 @@ type BasicHost struct { evtLocalAddrsUpdated event.Emitter } - addrChangeChan chan struct{} - - addrMu sync.RWMutex - updateLocalIPv4Backoff backoff.ExpBackoff - updateLocalIPv6Backoff backoff.ExpBackoff - filteredInterfaceAddrs []ma.Multiaddr - allInterfaceAddrs []ma.Multiaddr + addrMu sync.RWMutex disableSignedPeerRecord bool signKey crypto.PrivKey @@ -109,9 +96,10 @@ type BasicHost struct { autoNat autonat.AutoNAT - autonatv2 *autonatv2.AutoNAT - addrSub event.Subscription - autorelay *autorelay.AutoRelay + autonatv2 *autonatv2.AutoNAT + addrSub event.Subscription + autorelay *autorelay.AutoRelay + addressService *addressService } var _ host.Host = (*BasicHost)(nil) @@ -196,54 +184,26 @@ func NewHost(n network.Network, opts *HostOpts) (*BasicHost, error) { if err != nil { return nil, err } + hostCtx, cancel := context.WithCancel(context.Background()) h := &BasicHost{ network: n, psManager: psManager, mux: msmux.NewMultistreamMuxer[protocol.ID](), negtimeout: DefaultNegotiationTimeout, - AddrsFactory: DefaultAddrsFactory, eventbus: opts.EventBus, - addrChangeChan: make(chan struct{}, 1), ctx: hostCtx, ctxCancel: cancel, disableSignedPeerRecord: opts.DisableSignedPeerRecord, addrSub: addrSub, } - h.updateLocalIpAddr() - if h.emitters.evtLocalProtocolsUpdated, err = h.eventbus.Emitter(&event.EvtLocalProtocolsUpdated{}, eventbus.Stateful); err != nil { return nil, err } - if h.emitters.evtLocalAddrsUpdated, err = h.eventbus.Emitter(&event.EvtLocalAddressesUpdated{}, eventbus.Stateful); err != nil { - return nil, err - } - - if !h.disableSignedPeerRecord { - cab, ok := peerstore.GetCertifiedAddrBook(n.Peerstore()) - if !ok { - return nil, errors.New("peerstore should also be a certified address book") - } - h.caBook = cab + if !opts.DisableSignedPeerRecord { h.signKey = h.Peerstore().PrivKey(h.ID()) - if h.signKey == nil { - return nil, errors.New("unable to access host key") - } - - // persist a signed peer record for self to the peerstore. - rec := peer.PeerRecordFromAddrInfo(peer.AddrInfo{ - ID: h.ID(), - Addrs: h.Addrs(), - }) - ev, err := record.Seal(rec, h.signKey) - if err != nil { - return nil, fmt.Errorf("failed to create signed record for self: %w", err) - } - if _, err := cab.ConsumePeerRecord(ev, peerstore.PermanentAddrTTL); err != nil { - return nil, fmt.Errorf("failed to persist signed record to peerstore: %w", err) - } } if opts.MultistreamMuxer != nil { @@ -273,6 +233,31 @@ func NewHost(n network.Network, opts *HostOpts) (*BasicHost, error) { return nil, fmt.Errorf("failed to create Identify service: %s", err) } + if opts.EnableAutoRelay { + if opts.EnableMetrics { + mt := autorelay.WithMetricsTracer( + autorelay.NewMetricsTracer(autorelay.WithRegisterer(opts.PrometheusRegisterer))) + mtOpts := []autorelay.Option{mt} + opts.AutoRelayOpts = append(mtOpts, opts.AutoRelayOpts...) + } + + ar, err := autorelay.NewAutoRelay(h, opts.AutoRelayOpts...) + if err != nil { + return nil, fmt.Errorf("failed to create autorelay: %w", err) + } + h.autorelay = ar + } + + addrFactory := DefaultAddrsFactory + if opts.AddrsFactory != nil { + addrFactory = opts.AddrsFactory + } + + h.addressService, err = NewAddressService(h, opts.NATManager, addrFactory) + if err != nil { + return nil, fmt.Errorf("failed to create address service: %w", err) + } + if opts.EnableHolePunching { if opts.EnableMetrics { hpOpts := []holepunch.Option{ @@ -280,16 +265,7 @@ func NewHost(n network.Network, opts *HostOpts) (*BasicHost, error) { opts.HolePunchingOptions = append(hpOpts, opts.HolePunchingOptions...) } - h.hps, err = holepunch.NewService(h, h.ids, func() []ma.Multiaddr { - addrs := h.AllAddrs() - if opts.AddrsFactory != nil { - addrs = slices.Clone(opts.AddrsFactory(addrs)) - } - // AllAddrs may ignore observed addresses in favour of NAT mappings. Use both for hole punching. - addrs = append(addrs, h.ids.OwnObservedAddrs()...) - addrs = ma.Unique(addrs) - return slices.DeleteFunc(addrs, func(a ma.Multiaddr) bool { return !manet.IsPublicAddr(a) }) - }, opts.HolePunchingOptions...) + h.hps, err = holepunch.NewService(h, h.ids, h.addressService.GetHolePunchAddrs, opts.HolePunchingOptions...) if err != nil { return nil, fmt.Errorf("failed to create hole punch service: %w", err) } @@ -299,14 +275,6 @@ func NewHost(n network.Network, opts *HostOpts) (*BasicHost, error) { h.negtimeout = opts.NegotiationTimeout } - if opts.AddrsFactory != nil { - h.AddrsFactory = opts.AddrsFactory - } - - if opts.NATManager != nil { - h.natmgr = opts.NATManager(n) - } - if opts.ConnManager == nil { h.cmgr = &connmgr.NullConnMgr{} } else { @@ -340,21 +308,6 @@ func NewHost(n network.Network, opts *HostOpts) (*BasicHost, error) { } } - if opts.EnableAutoRelay { - if opts.EnableMetrics { - mt := autorelay.WithMetricsTracer( - autorelay.NewMetricsTracer(autorelay.WithRegisterer(opts.PrometheusRegisterer))) - mtOpts := []autorelay.Option{mt} - opts.AutoRelayOpts = append(mtOpts, opts.AutoRelayOpts...) - } - - ar, err := autorelay.NewAutoRelay(h, opts.AutoRelayOpts...) - if err != nil { - return nil, fmt.Errorf("failed to create autorelay: %w", err) - } - h.autorelay = ar - } - n.SetStreamHandler(h.newStreamHandler) // register to be notified when the network's listen addrs change, @@ -370,92 +323,9 @@ func NewHost(n network.Network, opts *HostOpts) (*BasicHost, error) { return h, nil } -func (h *BasicHost) updateLocalIpAddr() { - h.addrMu.Lock() - defer h.addrMu.Unlock() - - h.filteredInterfaceAddrs = nil - h.allInterfaceAddrs = nil - - // Try to use the default ipv4/6 addresses. - - if r, err := netroute.New(); err != nil { - log.Debugw("failed to build Router for kernel's routing table", "error", err) - } else { - - var localIPv4 net.IP - var ran bool - err, ran = h.updateLocalIPv4Backoff.Run(func() error { - _, _, localIPv4, err = r.Route(net.IPv4zero) - return err - }) - - if ran && err != nil { - log.Debugw("failed to fetch local IPv4 address", "error", err) - } else if ran && localIPv4.IsGlobalUnicast() { - maddr, err := manet.FromIP(localIPv4) - if err == nil { - h.filteredInterfaceAddrs = append(h.filteredInterfaceAddrs, maddr) - } - } - - var localIPv6 net.IP - err, ran = h.updateLocalIPv6Backoff.Run(func() error { - _, _, localIPv6, err = r.Route(net.IPv6unspecified) - return err - }) - - if ran && err != nil { - log.Debugw("failed to fetch local IPv6 address", "error", err) - } else if ran && localIPv6.IsGlobalUnicast() { - maddr, err := manet.FromIP(localIPv6) - if err == nil { - h.filteredInterfaceAddrs = append(h.filteredInterfaceAddrs, maddr) - } - } - } - - // Resolve the interface addresses - ifaceAddrs, err := manet.InterfaceMultiaddrs() - if err != nil { - // This usually shouldn't happen, but we could be in some kind - // of funky restricted environment. - log.Errorw("failed to resolve local interface addresses", "error", err) - - // Add the loopback addresses to the filtered addrs and use them as the non-filtered addrs. - // Then bail. There's nothing else we can do here. - h.filteredInterfaceAddrs = append(h.filteredInterfaceAddrs, manet.IP4Loopback, manet.IP6Loopback) - h.allInterfaceAddrs = h.filteredInterfaceAddrs - return - } - - for _, addr := range ifaceAddrs { - // Skip link-local addrs, they're mostly useless. - if !manet.IsIP6LinkLocal(addr) { - h.allInterfaceAddrs = append(h.allInterfaceAddrs, addr) - } - } - - // If netroute failed to get us any interface addresses, use all of - // them. - if len(h.filteredInterfaceAddrs) == 0 { - // Add all addresses. - h.filteredInterfaceAddrs = h.allInterfaceAddrs - } else { - // Only add loopback addresses. Filter these because we might - // not _have_ an IPv6 loopback address. - for _, addr := range h.allInterfaceAddrs { - if manet.IsIPLoopback(addr) { - h.filteredInterfaceAddrs = append(h.filteredInterfaceAddrs, addr) - } - } - } -} - // Start starts background tasks in the host func (h *BasicHost) Start() { h.psManager.Start() - h.refCount.Add(1) h.ids.Start() if h.autorelay != nil { h.autorelay.Start() @@ -466,7 +336,7 @@ func (h *BasicHost) Start() { log.Errorf("autonat v2 failed to start: %s", err) } } - go h.background() + h.addressService.Start() } // newStreamHandler is the remote-opened stream handler for network.Network @@ -521,61 +391,7 @@ func (h *BasicHost) newStreamHandler(s network.Stream) { // changed. // Warning: this interface is unstable and may disappear in the future. func (h *BasicHost) SignalAddressChange() { - select { - case h.addrChangeChan <- struct{}{}: - default: - } -} - -func (h *BasicHost) makeUpdatedAddrEvent(prev, current []ma.Multiaddr) *event.EvtLocalAddressesUpdated { - if prev == nil && current == nil { - return nil - } - prevmap := make(map[string]ma.Multiaddr, len(prev)) - currmap := make(map[string]ma.Multiaddr, len(current)) - evt := &event.EvtLocalAddressesUpdated{Diffs: true} - addrsAdded := false - - for _, addr := range prev { - prevmap[string(addr.Bytes())] = addr - } - for _, addr := range current { - currmap[string(addr.Bytes())] = addr - } - for _, addr := range currmap { - _, ok := prevmap[string(addr.Bytes())] - updated := event.UpdatedAddress{Address: addr} - if ok { - updated.Action = event.Maintained - } else { - updated.Action = event.Added - addrsAdded = true - } - evt.Current = append(evt.Current, updated) - delete(prevmap, string(addr.Bytes())) - } - for _, addr := range prevmap { - updated := event.UpdatedAddress{Action: event.Removed, Address: addr} - evt.Removed = append(evt.Removed, updated) - } - - if !addrsAdded && len(evt.Removed) == 0 { - return nil - } - - // Our addresses have changed. Make a new signed peer record. - if !h.disableSignedPeerRecord { - // add signed peer record to the event - sr, err := h.makeSignedPeerRecord(current) - if err != nil { - log.Errorf("error creating a signed peer record from the set of current addresses, err=%s", err) - // drop this change - return nil - } - evt.SignedPeerRecord = sr - } - - return evt + h.addressService.SignalAddressChange() } func (h *BasicHost) makeSignedPeerRecord(addrs []ma.Multiaddr) (*record.Envelope, error) { @@ -594,60 +410,6 @@ func (h *BasicHost) makeSignedPeerRecord(addrs []ma.Multiaddr) (*record.Envelope return record.Seal(rec, h.signKey) } -func (h *BasicHost) background() { - defer h.refCount.Done() - var lastAddrs []ma.Multiaddr - - emitAddrChange := func(currentAddrs []ma.Multiaddr, lastAddrs []ma.Multiaddr) { - changeEvt := h.makeUpdatedAddrEvent(lastAddrs, currentAddrs) - if changeEvt == nil { - return - } - // Our addresses have changed. - // store the signed peer record in the peer store. - if !h.disableSignedPeerRecord { - if _, err := h.caBook.ConsumePeerRecord(changeEvt.SignedPeerRecord, peerstore.PermanentAddrTTL); err != nil { - log.Errorf("failed to persist signed peer record in peer store, err=%s", err) - return - } - } - // update host addresses in the peer store - removedAddrs := make([]ma.Multiaddr, 0, len(changeEvt.Removed)) - for _, ua := range changeEvt.Removed { - removedAddrs = append(removedAddrs, ua.Address) - } - h.Peerstore().SetAddrs(h.ID(), currentAddrs, peerstore.PermanentAddrTTL) - h.Peerstore().SetAddrs(h.ID(), removedAddrs, 0) - - // emit addr change event - if err := h.emitters.evtLocalAddrsUpdated.Emit(*changeEvt); err != nil { - log.Warnf("error emitting event for updated addrs: %s", err) - } - } - - // periodically schedules an IdentifyPush to update our peers for changes - // in our address set (if needed) - ticker := time.NewTicker(addrChangeTickrInterval) - defer ticker.Stop() - - for { - // Update our local IP addresses before checking our current addresses. - if len(h.network.ListenAddresses()) > 0 { - h.updateLocalIpAddr() - } - curr := h.Addrs() - emitAddrChange(curr, lastAddrs) - lastAddrs = curr - - select { - case <-ticker.C: - case <-h.addrChangeChan: - case <-h.ctx.Done(): - return - } - } -} - // ID returns the (local) peer.ID associated with this Host func (h *BasicHost) ID() peer.ID { return h.Network().LocalPeer() @@ -871,15 +633,7 @@ func (h *BasicHost) ConnManager() connmgr.ConnManager { // When used with AutoRelay, and if the host is not publicly reachable, // this will only have host's private, relay, and no public addresses. func (h *BasicHost) Addrs() []ma.Multiaddr { - addrs := h.AllAddrs() - // Make a copy. Consumers can modify the slice elements - if h.autoNat != nil && h.autorelay != nil && h.autoNat.Status() == network.ReachabilityPrivate { - addrs = slices.DeleteFunc(addrs, func(a ma.Multiaddr) bool { return manet.IsPublicAddr(a) }) - addrs = append(addrs, h.autorelay.RelayAddrs()...) - } - addrs = slices.Clone(h.AddrsFactory(addrs)) - // Add certhashes for the addresses provided by the user via address factory. - return h.addCertHashes(ma.Unique(addrs)) + return h.addressService.Addrs() } // NormalizeMultiaddr returns a multiaddr suitable for equality checks. @@ -899,205 +653,9 @@ func (h *BasicHost) NormalizeMultiaddr(addr ma.Multiaddr) ma.Multiaddr { return addr } -var p2pCircuitAddr = ma.StringCast("/p2p-circuit") - // AllAddrs returns all the addresses the host is listening on except circuit addresses. func (h *BasicHost) AllAddrs() []ma.Multiaddr { - listenAddrs := h.Network().ListenAddresses() - if len(listenAddrs) == 0 { - return nil - } - - h.addrMu.RLock() - filteredIfaceAddrs := h.filteredInterfaceAddrs - allIfaceAddrs := h.allInterfaceAddrs - h.addrMu.RUnlock() - - // Iterate over all _unresolved_ listen addresses, resolving our primary - // interface only to avoid advertising too many addresses. - finalAddrs := make([]ma.Multiaddr, 0, 8) - if resolved, err := manet.ResolveUnspecifiedAddresses(listenAddrs, filteredIfaceAddrs); err != nil { - // This can happen if we're listening on no addrs, or listening - // on IPv6 addrs, but only have IPv4 interface addrs. - log.Debugw("failed to resolve listen addrs", "error", err) - } else { - finalAddrs = append(finalAddrs, resolved...) - } - - finalAddrs = ma.Unique(finalAddrs) - - // use nat mappings if we have them - if h.natmgr != nil && h.natmgr.HasDiscoveredNAT() { - // We have successfully mapped ports on our NAT. Use those - // instead of observed addresses (mostly). - // Next, apply this mapping to our addresses. - for _, listen := range listenAddrs { - extMaddr := h.natmgr.GetMapping(listen) - if extMaddr == nil { - // not mapped - continue - } - - // if the router reported a sane address - if !manet.IsIPUnspecified(extMaddr) { - // Add in the mapped addr. - finalAddrs = append(finalAddrs, extMaddr) - } else { - log.Warn("NAT device reported an unspecified IP as it's external address") - } - - // Did the router give us a routable public addr? - if manet.IsPublicAddr(extMaddr) { - // well done - continue - } - - // No. - // in case the router gives us a wrong address or we're behind a double-NAT. - // also add observed addresses - resolved, err := manet.ResolveUnspecifiedAddress(listen, allIfaceAddrs) - if err != nil { - // This can happen if we try to resolve /ip6/::/... - // without any IPv6 interface addresses. - continue - } - - for _, addr := range resolved { - // Now, check if we have any observed addresses that - // differ from the one reported by the router. Routers - // don't always give the most accurate information. - observed := h.ids.ObservedAddrsFor(addr) - - if len(observed) == 0 { - continue - } - - // Drop the IP from the external maddr - _, extMaddrNoIP := ma.SplitFirst(extMaddr) - - for _, obsMaddr := range observed { - // Extract a public observed addr. - ip, _ := ma.SplitFirst(obsMaddr) - if ip == nil || !manet.IsPublicAddr(ip) { - continue - } - - finalAddrs = append(finalAddrs, ma.Join(ip, extMaddrNoIP)) - } - } - } - } else { - var observedAddrs []ma.Multiaddr - if h.ids != nil { - observedAddrs = h.ids.OwnObservedAddrs() - } - finalAddrs = append(finalAddrs, observedAddrs...) - } - finalAddrs = ma.Unique(finalAddrs) - // Remove /p2p-circuit addresses from the list. - // The p2p-circuit tranport listener reports its address as just /p2p-circuit - // This is useless for dialing. Users need to manage their circuit addresses themselves, - // or use AutoRelay. - finalAddrs = slices.DeleteFunc(finalAddrs, func(a ma.Multiaddr) bool { - return a.Equal(p2pCircuitAddr) - }) - // Add certhashes for /webrtc-direct, /webtransport, etc addresses discovered - // using identify. - finalAddrs = h.addCertHashes(finalAddrs) - return finalAddrs -} - -func (h *BasicHost) addCertHashes(addrs []ma.Multiaddr) []ma.Multiaddr { - // This is a temporary workaround/hack that fixes #2233. Once we have a - // proper address pipeline, rework this. See the issue for more context. - type transportForListeninger interface { - TransportForListening(a ma.Multiaddr) transport.Transport - } - - type addCertHasher interface { - AddCertHashes(m ma.Multiaddr) (ma.Multiaddr, bool) - } - - s, ok := h.Network().(transportForListeninger) - if !ok { - return addrs - } - - // Copy addrs slice since we'll be modifying it. - addrsOld := addrs - addrs = make([]ma.Multiaddr, len(addrsOld)) - copy(addrs, addrsOld) - - for i, addr := range addrs { - wtOK, wtN := libp2pwebtransport.IsWebtransportMultiaddr(addr) - webrtcOK, webrtcN := libp2pwebrtc.IsWebRTCDirectMultiaddr(addr) - if (wtOK && wtN == 0) || (webrtcOK && webrtcN == 0) { - t := s.TransportForListening(addr) - tpt, ok := t.(addCertHasher) - if !ok { - continue - } - addrWithCerthash, added := tpt.AddCertHashes(addr) - if !added { - log.Debugf("Couldn't add certhashes to multiaddr: %s", addr) - continue - } - addrs[i] = addrWithCerthash - } - } - return addrs -} - -func trimHostAddrList(addrs []ma.Multiaddr, maxSize int) []ma.Multiaddr { - totalSize := 0 - for _, a := range addrs { - totalSize += len(a.Bytes()) - } - if totalSize <= maxSize { - return addrs - } - - score := func(addr ma.Multiaddr) int { - var res int - if manet.IsPublicAddr(addr) { - res |= 1 << 12 - } else if !manet.IsIPLoopback(addr) { - res |= 1 << 11 - } - var protocolWeight int - ma.ForEach(addr, func(c ma.Component) bool { - switch c.Protocol().Code { - case ma.P_QUIC_V1: - protocolWeight = 5 - case ma.P_TCP: - protocolWeight = 4 - case ma.P_WSS: - protocolWeight = 3 - case ma.P_WEBTRANSPORT: - protocolWeight = 2 - case ma.P_WEBRTC_DIRECT: - protocolWeight = 1 - case ma.P_P2P: - return false - } - return true - }) - res |= 1 << protocolWeight - return res - } - - slices.SortStableFunc(addrs, func(a, b ma.Multiaddr) int { - return score(b) - score(a) // b-a for reverse order - }) - totalSize = 0 - for i, a := range addrs { - totalSize += len(a.Bytes()) - if totalSize > maxSize { - addrs = addrs[:i] - break - } - } - return addrs + return h.addressService.AllAddrs() } // SetAutoNat sets the autonat service for the host. @@ -1107,6 +665,7 @@ func (h *BasicHost) SetAutoNat(a autonat.AutoNAT) { if h.autoNat == nil { h.autoNat = a } + h.addressService.SetAutoNAT(h.autoNat) } // GetAutoNat returns the host's AutoNAT service, if AutoNAT is enabled. @@ -1126,6 +685,9 @@ func (h *BasicHost) Close() error { if h.cmgr != nil { h.cmgr.Close() } + + h.addressService.Close() + if h.ids != nil { h.ids.Close() } @@ -1146,7 +708,6 @@ func (h *BasicHost) Close() error { } _ = h.emitters.evtLocalProtocolsUpdated.Close() - _ = h.emitters.evtLocalAddrsUpdated.Close() if err := h.network.Close(); err != nil { log.Errorf("swarm close failed: %v", err) diff --git a/p2p/host/basic/basic_host_test.go b/p2p/host/basic/basic_host_test.go index 2a7a772976..a9c7ab28fd 100644 --- a/p2p/host/basic/basic_host_test.go +++ b/p2p/host/basic/basic_host_test.go @@ -8,6 +8,7 @@ import ( "reflect" "strings" "sync" + "sync/atomic" "testing" "time" @@ -212,10 +213,10 @@ func TestLocalIPChangesWhenListenAddrChanges(t *testing.T) { h.Start() defer h.Close() - h.addrMu.Lock() - h.filteredInterfaceAddrs = nil - h.allInterfaceAddrs = nil - h.addrMu.Unlock() + h.addressService.mx.Lock() + h.addressService.filteredInterfaceAddrs = nil + h.addressService.allInterfaceAddrs = nil + h.addressService.mx.Unlock() // change listen addrs and verify local IP addr is not nil again require.NoError(t, h.Network().Listen(ma.StringCast("/ip4/0.0.0.0/tcp/0"))) @@ -224,8 +225,8 @@ func TestLocalIPChangesWhenListenAddrChanges(t *testing.T) { h.addrMu.RLock() defer h.addrMu.RUnlock() - require.NotEmpty(t, h.filteredInterfaceAddrs) - require.NotEmpty(t, h.allInterfaceAddrs) + require.NotEmpty(t, h.addressService.filteredInterfaceAddrs) + require.NotEmpty(t, h.addressService.allInterfaceAddrs) } func TestAllAddrs(t *testing.T) { @@ -619,8 +620,13 @@ func TestAddrChangeImmediatelyIfAddressNonEmpty(t *testing.T) { ctx := context.Background() taddrs := []ma.Multiaddr{ma.StringCast("/ip4/1.2.3.4/tcp/1234")} - starting := make(chan struct{}) + starting := make(chan struct{}, 1) + var count atomic.Int32 h, err := NewHost(swarmt.GenSwarm(t), &HostOpts{AddrsFactory: func(addrs []ma.Multiaddr) []ma.Multiaddr { + // The first call here is made from the constructor. Don't block. + if count.Add(1) == 1 { + return addrs + } <-starting return taddrs }}) @@ -628,11 +634,11 @@ func TestAddrChangeImmediatelyIfAddressNonEmpty(t *testing.T) { defer h.Close() sub, err := h.EventBus().Subscribe(&event.EvtLocalAddressesUpdated{}) - close(starting) if err != nil { t.Error(err) } defer sub.Close() + close(starting) h.Start() expected := event.EvtLocalAddressesUpdated{