Skip to content

Commit

Permalink
add tests for present behavior
Browse files Browse the repository at this point in the history
  • Loading branch information
sukunrt committed Dec 3, 2024
1 parent fd76100 commit d77303e
Show file tree
Hide file tree
Showing 3 changed files with 229 additions and 57 deletions.
13 changes: 7 additions & 6 deletions p2p/host/basic/address_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,13 @@ type addressService struct {
addrsChangeChan chan struct{}
addrsUpdated chan struct{}
autoRelayAddrsSub event.Subscription
autoRelayAddrs func() []ma.Multiaddr
reachability func() network.Reachability
ifaceAddrs *interfaceAddrsCache
wg sync.WaitGroup
ctx context.Context
ctxCancel context.CancelFunc
// There are wrapped in to functions for mocking
autoRelayAddrs func() []ma.Multiaddr
reachability func() network.Reachability
ifaceAddrs *interfaceAddrsCache
wg sync.WaitGroup
ctx context.Context
ctxCancel context.CancelFunc
}

func NewAddressService(h *BasicHost, natmgr func(network.Network) NATManager,
Expand Down
174 changes: 174 additions & 0 deletions p2p/host/basic/address_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,10 @@ package basichost

import (
"testing"
"time"

"github.com/libp2p/go-libp2p/core/network"
swarmt "github.com/libp2p/go-libp2p/p2p/net/swarm/testing"
ma "github.com/multiformats/go-multiaddr"
manet "github.com/multiformats/go-multiaddr/net"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -96,3 +99,174 @@ func TestAppendNATAddrs(t *testing.T) {
})
}
}

type mockNatManager struct {
GetMappingFunc func(addr ma.Multiaddr) ma.Multiaddr
HasDiscoveredNATFunc func() bool
}

func (m *mockNatManager) Close() error {
return nil
}

func (m *mockNatManager) GetMapping(addr ma.Multiaddr) ma.Multiaddr {
return m.GetMappingFunc(addr)
}

func (m *mockNatManager) HasDiscoveredNAT() bool {
return m.HasDiscoveredNATFunc()
}

var _ NATManager = &mockNatManager{}

type mockObservedAddrs struct {
OwnObservedAddrsFunc func() []ma.Multiaddr
ObservedAddrsForFunc func(ma.Multiaddr) []ma.Multiaddr
}

func (m *mockObservedAddrs) OwnObservedAddrs() []ma.Multiaddr {
return m.OwnObservedAddrsFunc()
}

func (m *mockObservedAddrs) ObservedAddrsFor(local ma.Multiaddr) []ma.Multiaddr {
return m.ObservedAddrsForFunc(local)
}

func TestAddressService(t *testing.T) {
getAddrService := func() *addressService {
h, err := NewHost(swarmt.GenSwarm(t), &HostOpts{DisableIdentifyAddressDiscovery: true})
require.NoError(t, err)
t.Cleanup(func() { h.Close() })

as := h.addressService
return as
}

t.Run("NAT Address", func(t *testing.T) {
as := getAddrService()
as.natmgr = &mockNatManager{
HasDiscoveredNATFunc: func() bool { return true },
GetMappingFunc: func(addr ma.Multiaddr) ma.Multiaddr {
if _, err := addr.ValueForProtocol(ma.P_UDP); err == nil {
return ma.StringCast("/ip4/1.2.3.4/udp/1/quic-v1")
}
return nil
},
}
require.Contains(t, as.Addrs(), ma.StringCast("/ip4/1.2.3.4/udp/1/quic-v1"))
})

t.Run("NAT And Observed Address", func(t *testing.T) {
as := getAddrService()
as.natmgr = &mockNatManager{
HasDiscoveredNATFunc: func() bool { return true },
GetMappingFunc: func(addr ma.Multiaddr) ma.Multiaddr {
if _, err := addr.ValueForProtocol(ma.P_UDP); err == nil {
return ma.StringCast("/ip4/1.2.3.4/udp/1/quic-v1")
}
return nil
},
}
as.observedAddrsService = &mockObservedAddrs{
ObservedAddrsForFunc: func(addr ma.Multiaddr) []ma.Multiaddr {
if _, err := addr.ValueForProtocol(ma.P_TCP); err == nil {
return []ma.Multiaddr{ma.StringCast("/ip4/2.2.2.2/tcp/1")}
}
return nil
},
}
require.Contains(t, as.Addrs(), ma.StringCast("/ip4/1.2.3.4/udp/1/quic-v1"))
require.Contains(t, as.Addrs(), ma.StringCast("/ip4/2.2.2.2/tcp/1"))
})
t.Run("Only Observed Address", func(t *testing.T) {
as := getAddrService()
as.natmgr = nil
as.observedAddrsService = &mockObservedAddrs{
ObservedAddrsForFunc: func(addr ma.Multiaddr) []ma.Multiaddr {
if _, err := addr.ValueForProtocol(ma.P_TCP); err == nil {
return []ma.Multiaddr{ma.StringCast("/ip4/2.2.2.2/tcp/1")}
}
return nil
},
OwnObservedAddrsFunc: func() []ma.Multiaddr {
return []ma.Multiaddr{ma.StringCast("/ip4/3.3.3.3/udp/1/quic-v1")}
},
}
require.NotContains(t, as.Addrs(), ma.StringCast("/ip4/2.2.2.2/tcp/1"))
require.Contains(t, as.Addrs(), ma.StringCast("/ip4/3.3.3.3/udp/1/quic-v1"))
})
t.Run("Public Addrs Removed When Private", func(t *testing.T) {
as := getAddrService()
as.natmgr = nil
as.observedAddrsService = &mockObservedAddrs{
OwnObservedAddrsFunc: func() []ma.Multiaddr {
return []ma.Multiaddr{ma.StringCast("/ip4/3.3.3.3/udp/1/quic-v1")}
},
}
as.reachability = func() network.Reachability {
return network.ReachabilityPrivate
}
relayAddr := ma.StringCast("/ip4/1.2.3.4/udp/1/quic-v1/p2p/QmdXGaeGiVA745XorV1jr11RHxB9z4fqykm6xCUPX1aTJo/p2p-circuit")
as.autoRelayAddrs = func() []ma.Multiaddr {
return []ma.Multiaddr{relayAddr}
}
require.NotContains(t, as.Addrs(), ma.StringCast("/ip4/3.3.3.3/udp/1/quic-v1"))
require.Contains(t, as.Addrs(), relayAddr)
require.Contains(t, as.AllAddrs(), ma.StringCast("/ip4/3.3.3.3/udp/1/quic-v1"))
})

t.Run("AddressFactory gets relay addresses", func(t *testing.T) {
as := getAddrService()
as.natmgr = nil
as.observedAddrsService = &mockObservedAddrs{
OwnObservedAddrsFunc: func() []ma.Multiaddr {
return []ma.Multiaddr{ma.StringCast("/ip4/3.3.3.3/udp/1/quic-v1")}
},
}
as.reachability = func() network.Reachability {
return network.ReachabilityPrivate
}
relayAddr := ma.StringCast("/ip4/1.2.3.4/udp/1/quic-v1/p2p/QmdXGaeGiVA745XorV1jr11RHxB9z4fqykm6xCUPX1aTJo/p2p-circuit")
as.autoRelayAddrs = func() []ma.Multiaddr {
return []ma.Multiaddr{relayAddr}
}
as.addrsFactory = func(addrs []ma.Multiaddr) []ma.Multiaddr {
for _, a := range addrs {
if a.Equal(relayAddr) {
return []ma.Multiaddr{ma.StringCast("/ip4/3.3.3.3/udp/1/quic-v1")}
}
}
return nil
}
require.Contains(t, as.Addrs(), ma.StringCast("/ip4/3.3.3.3/udp/1/quic-v1"))
require.NotContains(t, as.Addrs(), relayAddr)
})

t.Run("updates addresses on signaling", func(t *testing.T) {
as := getAddrService()
as.Start()
as.natmgr = nil
updateChan := make(chan struct{})
a1 := ma.StringCast("/ip4/1.1.1.1/udp/1/quic-v1")
a2 := ma.StringCast("/ip4/1.1.1.1/tcp/1")
as.addrsFactory = func(addrs []ma.Multiaddr) []ma.Multiaddr {
select {
case <-updateChan:
return []ma.Multiaddr{a2}
default:
return []ma.Multiaddr{a1}
}
}
require.Contains(t, as.Addrs(), a1)
require.NotContains(t, as.Addrs(), a2)
close(updateChan)
as.SignalAddressChange()
select {
case <-as.AddrsUpdated():
require.Contains(t, as.Addrs(), a2)
require.NotContains(t, as.Addrs(), a1)
case <-time.After(2 * time.Second):
t.Fatal("expected addrs to be updated")
}
})
}
99 changes: 48 additions & 51 deletions p2p/host/basic/basic_host.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ type BasicHost struct {

disableSignedPeerRecord bool
signKey crypto.PrivKey
caBook peerstore.CertifiedAddrBook

autoNATMx sync.RWMutex
autoNat autonat.AutoNAT
Expand Down Expand Up @@ -309,11 +310,12 @@ func NewHost(n network.Network, opts *HostOpts) (*BasicHost, error) {
if !ok {
return nil, errors.New("peerstore should also be a certified address book")
}
h.caBook = cab
rec, err := h.makeSignedPeerRecord(h.addressService.Addrs())
if err != nil {
return nil, fmt.Errorf("failed to create signed record for self: %w", err)
}
if _, err := cab.ConsumePeerRecord(rec, peerstore.PermanentAddrTTL); err != nil {
if _, err := h.caBook.ConsumePeerRecord(rec, peerstore.PermanentAddrTTL); err != nil {
return nil, fmt.Errorf("failed to persist signed record to peerstore: %w", err)
}
}
Expand Down Expand Up @@ -398,56 +400,6 @@ func (h *BasicHost) newStreamHandler(s network.Stream) {
handle(protoID, s)
}

func (h *BasicHost) background() {
defer h.refCount.Done()
var lastAddrs []ma.Multiaddr

// TODO: Deprecate this event and logic
emitAddrChange := func(currentAddrs []ma.Multiaddr, lastAddrs []ma.Multiaddr) {
changeEvt := h.makeUpdatedAddrEvent(lastAddrs, currentAddrs)
if changeEvt == nil {
return
}
// Our addresses have changed.
// store the signed peer record in the peer store.
if !h.disableSignedPeerRecord {
cabook, ok := peerstore.GetCertifiedAddrBook(h.Peerstore())
if !ok {
log.Errorf("peerstore doesn't implement certified address book")
return
}
if _, err := cabook.ConsumePeerRecord(changeEvt.SignedPeerRecord, peerstore.PermanentAddrTTL); err != nil {
log.Errorf("failed to persist signed peer record in peer store, err=%s", err)
return
}
}
// update host addresses in the peer store
removedAddrs := make([]ma.Multiaddr, 0, len(changeEvt.Removed))
for _, ua := range changeEvt.Removed {
removedAddrs = append(removedAddrs, ua.Address)
}
h.Peerstore().SetAddrs(h.ID(), currentAddrs, peerstore.PermanentAddrTTL)
h.Peerstore().SetAddrs(h.ID(), removedAddrs, 0)

// emit addr change event
if err := h.emitters.evtLocalAddrsUpdated.Emit(*changeEvt); err != nil {
log.Warnf("error emitting event for updated addrs: %s", err)
}
}

for {
curr := h.Addrs()
emitAddrChange(curr, lastAddrs)
lastAddrs = curr

select {
case <-h.addressService.AddrsUpdated():
case <-h.ctx.Done():
return
}
}
}

func (h *BasicHost) makeUpdatedAddrEvent(prev, current []ma.Multiaddr) *event.EvtLocalAddressesUpdated {
if prev == nil && current == nil {
return nil
Expand Down Expand Up @@ -515,6 +467,51 @@ func (h *BasicHost) makeSignedPeerRecord(addrs []ma.Multiaddr) (*record.Envelope
return record.Seal(rec, h.signKey)
}

func (h *BasicHost) background() {
defer h.refCount.Done()
var lastAddrs []ma.Multiaddr

// TODO: Deprecate this event and logic
emitAddrChange := func(currentAddrs []ma.Multiaddr, lastAddrs []ma.Multiaddr) {
changeEvt := h.makeUpdatedAddrEvent(lastAddrs, currentAddrs)
if changeEvt == nil {
return
}
// Our addresses have changed.
// store the signed peer record in the peer store.
if !h.disableSignedPeerRecord {
if _, err := h.caBook.ConsumePeerRecord(changeEvt.SignedPeerRecord, peerstore.PermanentAddrTTL); err != nil {
log.Errorf("failed to persist signed peer record in peer store, err=%s", err)
return
}
}
// update host addresses in the peer store
removedAddrs := make([]ma.Multiaddr, 0, len(changeEvt.Removed))
for _, ua := range changeEvt.Removed {
removedAddrs = append(removedAddrs, ua.Address)
}
h.Peerstore().SetAddrs(h.ID(), currentAddrs, peerstore.PermanentAddrTTL)
h.Peerstore().SetAddrs(h.ID(), removedAddrs, 0)

// emit addr change event
if err := h.emitters.evtLocalAddrsUpdated.Emit(*changeEvt); err != nil {
log.Warnf("error emitting event for updated addrs: %s", err)
}
}

for {
curr := h.Addrs()
emitAddrChange(curr, lastAddrs)
lastAddrs = curr

select {
case <-h.addressService.AddrsUpdated():
case <-h.ctx.Done():
return
}
}
}

// ID returns the (local) peer.ID associated with this Host
func (h *BasicHost) ID() peer.ID {
return h.Network().LocalPeer()
Expand Down

0 comments on commit d77303e

Please sign in to comment.