Skip to content

Commit

Permalink
autorelay: send addresses on eventbus; dont wrap address factory
Browse files Browse the repository at this point in the history
  • Loading branch information
sukunrt committed Dec 3, 2024
1 parent 8423de3 commit 762a43e
Show file tree
Hide file tree
Showing 8 changed files with 223 additions and 126 deletions.
34 changes: 8 additions & 26 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -439,6 +439,8 @@ func (cfg *Config) newBasicHost(swrm *swarm.Swarm, eventBus event.Bus) (*bhost.B
DisableIdentifyAddressDiscovery: cfg.DisableIdentifyAddressDiscovery,
EnableAutoNATv2: cfg.EnableAutoNATv2,
AutoNATv2Dialer: autonatv2Dialer,
EnableAutoRelay: cfg.EnableAutoRelay,
AutoRelayOpts: cfg.AutoRelayOpts,
})
if err != nil {
return nil, err
Expand Down Expand Up @@ -518,28 +520,6 @@ func (cfg *Config) NewNode() (host.Host, error) {
)
}

// enable autorelay
fxopts = append(fxopts,
fx.Invoke(func(h *bhost.BasicHost, lifecycle fx.Lifecycle) error {
if cfg.EnableAutoRelay {
if !cfg.DisableMetrics {
mt := autorelay.WithMetricsTracer(
autorelay.NewMetricsTracer(autorelay.WithRegisterer(cfg.PrometheusRegisterer)))
mtOpts := []autorelay.Option{mt}
cfg.AutoRelayOpts = append(mtOpts, cfg.AutoRelayOpts...)
}

ar, err := autorelay.NewAutoRelay(h, cfg.AutoRelayOpts...)
if err != nil {
return err
}
lifecycle.Append(fx.StartStopHook(ar.Start, ar.Close))
return nil
}
return nil
}),
)

var bh *bhost.BasicHost
fxopts = append(fxopts, fx.Invoke(func(bho *bhost.BasicHost) { bh = bho }))
fxopts = append(fxopts, fx.Invoke(func(h *bhost.BasicHost, lifecycle fx.Lifecycle) {
Expand All @@ -554,12 +534,10 @@ func (cfg *Config) NewNode() (host.Host, error) {
fxopts = append(fxopts, cfg.UserFxOptions...)

app := fx.New(fxopts...)
if err := app.Start(context.Background()); err != nil {
return nil, err
if app.Err() != nil {
return nil, fmt.Errorf("failed to create host: %w", app.Err())
}

if err := cfg.addAutoNAT(bh); err != nil {
app.Stop(context.Background())
if cfg.Routing != nil {
rh.Close()
} else {
Expand All @@ -568,6 +546,10 @@ func (cfg *Config) NewNode() (host.Host, error) {
return nil, err
}

if err := app.Start(context.Background()); err != nil {
return nil, err
}

if cfg.Routing != nil {
return &closableRoutedHost{App: app, RoutedHost: rh}, nil
}
Expand Down
5 changes: 5 additions & 0 deletions core/event/addrs.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,3 +81,8 @@ type EvtLocalAddressesUpdated struct {
// wrapped in a record.Envelope and signed by the Host's private key.
SignedPeerRecord *record.Envelope
}

// EvtAutoRelayAddrsUpdated is sent by the autorelay when the node's relay addresses are updated
type EvtAutoRelayAddrs struct {
RelayAddrs []ma.Multiaddr
}
15 changes: 4 additions & 11 deletions p2p/host/autorelay/addrsplosion.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@ import (

// This function cleans up a relay's address set to remove private addresses and curtail
// addrsplosion.
// TODO: Remove this, we don't need this. The current method tries to select the
// best address for the relay. Instead we should rely on the addresses provided by the
// relay in response to the reservation request.
func cleanupAddressSet(addrs []ma.Multiaddr) []ma.Multiaddr {
var public, private []ma.Multiaddr

Expand All @@ -17,7 +20,7 @@ func cleanupAddressSet(addrs []ma.Multiaddr) []ma.Multiaddr {
continue
}

if manet.IsPublicAddr(a) || isDNSAddr(a) {
if manet.IsPublicAddr(a) {
public = append(public, a)
continue
}
Expand Down Expand Up @@ -51,16 +54,6 @@ func isRelayAddr(a ma.Multiaddr) bool {
return isRelay
}

func isDNSAddr(a ma.Multiaddr) bool {
if first, _ := ma.SplitFirst(a); first != nil {
switch first.Protocol().Code {
case ma.P_DNS, ma.P_DNS4, ma.P_DNS6, ma.P_DNSADDR:
return true
}
}
return false
}

// we have addrsplosion if for some protocol we advertise multiple ports on
// the same base address.
func hasAddrsplosion(addrs []ma.Multiaddr) bool {
Expand Down
37 changes: 13 additions & 24 deletions p2p/host/autorelay/autorelay.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,16 @@ package autorelay
import (
"context"
"errors"
"fmt"
"sync"

"github.com/libp2p/go-libp2p/core/event"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/network"
basic "github.com/libp2p/go-libp2p/p2p/host/basic"
"github.com/libp2p/go-libp2p/p2p/host/eventbus"
ma "github.com/multiformats/go-multiaddr"

logging "github.com/ipfs/go-log/v2"
ma "github.com/multiformats/go-multiaddr"
)

var log = logging.Logger("autorelay")
Expand All @@ -22,8 +22,6 @@ type AutoRelay struct {
ctx context.Context
ctxCancel context.CancelFunc

conf *config

mx sync.Mutex
status network.Reachability

Expand All @@ -34,9 +32,9 @@ type AutoRelay struct {
metricsTracer MetricsTracer
}

func NewAutoRelay(bhost *basic.BasicHost, opts ...Option) (*AutoRelay, error) {
func NewAutoRelay(host host.Host, opts ...Option) (*AutoRelay, error) {
r := &AutoRelay{
host: bhost,
host: host,
status: network.ReachabilityUnknown,
}
conf := defaultConfig
Expand All @@ -46,29 +44,20 @@ func NewAutoRelay(bhost *basic.BasicHost, opts ...Option) (*AutoRelay, error) {
}
}
r.ctx, r.ctxCancel = context.WithCancel(context.Background())
r.conf = &conf
r.relayFinder = newRelayFinder(bhost, conf.peerSource, &conf)
r.metricsTracer = &wrappedMetricsTracer{conf.metricsTracer}

// Update the host address factory to use autorelay addresses if we're private
//
// TODO: Don't update host address factory. Instead send our relay addresses on the eventbus.
// The host can decide how to handle those.
addrF := bhost.AddrsFactory
bhost.AddrsFactory = func(addrs []ma.Multiaddr) []ma.Multiaddr {
addrs = addrF(addrs)
r.mx.Lock()
defer r.mx.Unlock()

if r.status != network.ReachabilityPrivate {
return addrs
}
return r.relayFinder.relayAddrs(addrs)
rf, err := newRelayFinder(host, &conf)
if err != nil {
return nil, fmt.Errorf("failed to create autorelay: %w", err)
}
r.relayFinder = rf
r.metricsTracer = &wrappedMetricsTracer{conf.metricsTracer}

return r, nil
}

func (r *AutoRelay) RelayAddrs() []ma.Multiaddr {
return r.relayFinder.RelayAddrs()
}

func (r *AutoRelay) Start() {
r.refCount.Add(1)
go func() {
Expand Down
70 changes: 69 additions & 1 deletion p2p/host/autorelay/autorelay_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,14 @@ package autorelay_test
import (
"context"
"fmt"
"slices"
"strings"
"sync/atomic"
"testing"
"time"

"github.com/libp2p/go-libp2p"
"github.com/libp2p/go-libp2p/core/event"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
Expand Down Expand Up @@ -96,7 +98,10 @@ func newRelay(t *testing.T) host.Host {
saddr := addr.String()
if strings.HasPrefix(saddr, "/ip4/127.0.0.1/") {
addrNoIP := strings.TrimPrefix(saddr, "/ip4/127.0.0.1")
addrs[i] = ma.StringCast("/dns4/localhost" + addrNoIP)
// .internal is classified as a public address as users
// are free to map this dns to a public ip address for
// use within a LAN
addrs[i] = ma.StringCast("/dns/libp2p.internal" + addrNoIP)
}
}
return addrs
Expand Down Expand Up @@ -517,3 +522,66 @@ func TestNoBusyLoop0MinInterval(t *testing.T) {
val := atomic.LoadUint64(&calledTimes)
require.Less(t, val, uint64(2))
}
func TestAutoRelayAddrsEvent(t *testing.T) {
cl := newMockClock()
r1, r2 := newRelay(t), newRelay(t)
t.Cleanup(func() {
r1.Close()
r2.Close()
})

relayFromP2PAddr := func(a ma.Multiaddr) peer.ID {
r, c := ma.SplitLast(a)
if c.Protocol().Code != ma.P_CIRCUIT {
return ""
}
if id, err := peer.IDFromP2PAddr(r); err == nil {
return id
}
return ""
}

checkPeersExist := func(addrs []ma.Multiaddr, peers ...peer.ID) bool {
for _, p := range peers {
if !slices.ContainsFunc(addrs, func(a ma.Multiaddr) bool { return relayFromP2PAddr(a) == p }) {
return false
}
}
return true
}
peerChan := make(chan peer.AddrInfo, 3)
h := newPrivateNode(t,
func(context.Context, int) <-chan peer.AddrInfo {
return peerChan
},
autorelay.WithClock(cl),
autorelay.WithMinCandidates(1),
autorelay.WithMaxCandidates(10),
autorelay.WithNumRelays(3),
autorelay.WithBootDelay(1*time.Second),
autorelay.WithMinInterval(time.Hour),
)
defer h.Close()

sub, err := h.EventBus().Subscribe(new(event.EvtAutoRelayAddrs))
require.NoError(t, err)

peerChan <- peer.AddrInfo{ID: r1.ID(), Addrs: r1.Addrs()}
cl.AdvanceBy(time.Second)

require.Eventually(t, func() bool {
e := <-sub.Out()
if !checkPeersExist(e.(event.EvtAutoRelayAddrs).RelayAddrs, r1.ID()) {
return false
}
if checkPeersExist(e.(event.EvtAutoRelayAddrs).RelayAddrs, r2.ID()) {
return false
}
return true
}, 5*time.Second, 50*time.Millisecond)
peerChan <- peer.AddrInfo{ID: r2.ID(), Addrs: r2.Addrs()}
require.Eventually(t, func() bool {
e := <-sub.Out()
return checkPeersExist(e.(event.EvtAutoRelayAddrs).RelayAddrs, r1.ID(), r2.ID())
}, 5*time.Second, 50*time.Millisecond)
}
19 changes: 0 additions & 19 deletions p2p/host/autorelay/relay.go

This file was deleted.

Loading

0 comments on commit 762a43e

Please sign in to comment.