Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

swarm: delay /webrtc-direct dials by 1 second #3078

Merged
merged 5 commits into from
Dec 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 28 additions & 7 deletions p2p/net/swarm/dial_ranker.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,15 @@ const (

// RelayDelay is the duration by which relay dials are delayed relative to direct addresses
RelayDelay = 500 * time.Millisecond

// delay for other transport addresses. This will apply to /webrtc-direct.
PublicOtherDelay = 1 * time.Second
PrivateOtherDelay = 100 * time.Millisecond
)

// NoDelayDialRanker ranks addresses with no delay. This is useful for simultaneous connect requests.
func NoDelayDialRanker(addrs []ma.Multiaddr) []network.AddrDelay {
return getAddrDelay(addrs, 0, 0, 0)
return getAddrDelay(addrs, 0, 0, 0, 0)
}

// DefaultDialRanker determines the ranking of outgoing connection attempts.
Expand Down Expand Up @@ -67,8 +71,11 @@ func NoDelayDialRanker(addrs []ma.Multiaddr) []network.AddrDelay {
// 5. If only one of TCP IPv6 or TCP IPv4 addresses are present, dial the TCP address with the lowest port
// first. After this we dial the rest of the TCP addresses delayed by 250ms (PublicTCPDelay) for public
// addresses, and 30ms (PrivateTCPDelay) for local addresses.
// 6. When a TCP socket is connected and awaiting security and muxer upgrade, we stop new dials for 2*PrivateTCPDelay
// 6. When a TCP socket is connected and awaiting security and muxer upgrade, we stop new dials for 2*PublicTCPDelay
// to allow for the upgrade to complete.
// 7. WebRTC Direct, and other IP transport addresses are dialed 1 second after the last QUIC or TCP dial.
// We only ever need to dial these if the peer doesn't have any other transport available, in which
// case these are dialed immediately.
//
// We dial lowest ports first as they are more likely to be the listen port.
func DefaultDialRanker(addrs []ma.Multiaddr) []network.AddrDelay {
Expand All @@ -83,13 +90,18 @@ func DefaultDialRanker(addrs []ma.Multiaddr) []network.AddrDelay {
}

res := make([]network.AddrDelay, 0, len(addrs))
res = append(res, getAddrDelay(pvt, PrivateTCPDelay, PrivateQUICDelay, PrivateOtherDelay, 0)...)
res = append(res, getAddrDelay(public, PublicTCPDelay, PublicQUICDelay, PublicOtherDelay, 0)...)
res = append(res, getAddrDelay(relay, PublicTCPDelay, PublicQUICDelay, PublicOtherDelay, relayOffset)...)
var maxDelay time.Duration
if len(res) > 0 {
maxDelay = res[len(res)-1].Delay
}

for i := 0; i < len(addrs); i++ {
res = append(res, network.AddrDelay{Addr: addrs[i], Delay: 0})
res = append(res, network.AddrDelay{Addr: addrs[i], Delay: maxDelay + PublicOtherDelay})
}

res = append(res, getAddrDelay(pvt, PrivateTCPDelay, PrivateQUICDelay, 0)...)
res = append(res, getAddrDelay(public, PublicTCPDelay, PublicQUICDelay, 0)...)
res = append(res, getAddrDelay(relay, PublicTCPDelay, PublicQUICDelay, relayOffset)...)
return res
}

Expand All @@ -98,7 +110,7 @@ func DefaultDialRanker(addrs []ma.Multiaddr) []network.AddrDelay {
// offset is used to delay all addresses by a fixed duration. This is useful for delaying all relay
// addresses relative to direct addresses.
func getAddrDelay(addrs []ma.Multiaddr, tcpDelay time.Duration, quicDelay time.Duration,
offset time.Duration) []network.AddrDelay {
otherDelay time.Duration, offset time.Duration) []network.AddrDelay {
if len(addrs) == 0 {
return nil
}
Expand Down Expand Up @@ -158,6 +170,7 @@ func getAddrDelay(addrs []ma.Multiaddr, tcpDelay time.Duration, quicDelay time.D

res := make([]network.AddrDelay, 0, len(addrs))
var tcpFirstDialDelay time.Duration
var lastQUICOrTCPDelay time.Duration
for i, addr := range addrs {
var delay time.Duration
switch {
Expand All @@ -176,6 +189,7 @@ func getAddrDelay(addrs []ma.Multiaddr, tcpDelay time.Duration, quicDelay time.D
delay = quicDelay
}
}
lastQUICOrTCPDelay = delay
tcpFirstDialDelay = delay + tcpDelay
case isProtocolAddr(addr, ma.P_TCP):
// We dial an IPv6 address, then after tcpDelay an IPv4
Expand All @@ -193,6 +207,10 @@ func getAddrDelay(addrs []ma.Multiaddr, tcpDelay time.Duration, quicDelay time.D
}
}
delay += tcpFirstDialDelay
lastQUICOrTCPDelay = delay
// if it's neither quic, webtransport, tcp, or websocket address
default:
delay = lastQUICOrTCPDelay + otherDelay
}
res = append(res, network.AddrDelay{Addr: addr, Delay: offset + delay})
}
Expand Down Expand Up @@ -230,6 +248,9 @@ func score(a ma.Multiaddr) int {
pi, _ := strconv.Atoi(p)
return ip4Weight + pi + (1 << 20)
}
if _, err := a.ValueForProtocol(ma.P_WEBRTC_DIRECT); err == nil {
return 1 << 21
}
return (1 << 30)
}

Expand Down
70 changes: 69 additions & 1 deletion p2p/net/swarm/dial_ranker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ func TestNoDelayDialRanker(t *testing.T) {
q3v1 := ma.StringCast("/ip4/1.2.3.4/udp/3/quic-v1")
q4 := ma.StringCast("/ip4/1.2.3.4/udp/4/quic-v1")
t1 := ma.StringCast("/ip4/1.2.3.5/tcp/1/")
wrtc1 := ma.StringCast("/ip4/1.1.1.1/udp/1/webrtc-direct")

testCase := []struct {
name string
Expand All @@ -37,7 +38,7 @@ func TestNoDelayDialRanker(t *testing.T) {
}{
{
name: "quic+webtransport filtered when quicv1",
addrs: []ma.Multiaddr{q1, q2, q3, q4, q1v1, q2v1, q3v1, wt1, t1},
addrs: []ma.Multiaddr{q1, q2, q3, q4, q1v1, q2v1, q3v1, wt1, t1, wrtc1},
output: []network.AddrDelay{
{Addr: q1, Delay: 0},
{Addr: q2, Delay: 0},
Expand All @@ -48,6 +49,7 @@ func TestNoDelayDialRanker(t *testing.T) {
{Addr: q3v1, Delay: 0},
{Addr: wt1, Delay: 0},
{Addr: t1, Delay: 0},
{Addr: wrtc1, Delay: 0},
},
},
}
Expand Down Expand Up @@ -287,3 +289,69 @@ func TestDelayRankerRelay(t *testing.T) {
})
}
}

func TestDelayRankerOtherTransportDelay(t *testing.T) {
q1v1 := ma.StringCast("/ip4/1.2.3.4/udp/1/quic-v1")
q1v16 := ma.StringCast("/ip6/1::2/udp/1/quic-v1")
t1 := ma.StringCast("/ip4/1.2.3.5/tcp/1/")
t1v6 := ma.StringCast("/ip6/1::2/tcp/1")
wrtc1 := ma.StringCast("/ip4/1.2.3.4/udp/1/webrtc-direct")
wrtc1v6 := ma.StringCast("/ip6/1::2/udp/1/webrtc-direct")
onion1 := ma.StringCast("/onion3/vww6ybal4bd7szmgncyruucpgfkqahzddi37ktceo3ah7ngmcopnpyyd:1234")
onlyIP := ma.StringCast("/ip4/1.2.3.4/")
testCase := []struct {
name string
addrs []ma.Multiaddr
output []network.AddrDelay
}{
{
name: "quic-with-other",
addrs: []ma.Multiaddr{q1v1, q1v16, wrtc1, wrtc1v6, onion1, onlyIP},
output: []network.AddrDelay{
{Addr: q1v16, Delay: 0},
{Addr: q1v1, Delay: PublicQUICDelay},
{Addr: wrtc1, Delay: PublicQUICDelay + PublicOtherDelay},
{Addr: wrtc1v6, Delay: PublicQUICDelay + PublicOtherDelay},
{Addr: onlyIP, Delay: PublicQUICDelay + PublicOtherDelay},
{Addr: onion1, Delay: PublicQUICDelay + 2*PublicOtherDelay},
},
},
{
name: "quic-and-tcp-with-other",
addrs: []ma.Multiaddr{q1v1, t1, t1v6, wrtc1, wrtc1v6, onion1, onlyIP},
output: []network.AddrDelay{
{Addr: q1v1, Delay: 0},
{Addr: t1v6, Delay: PublicQUICDelay},
{Addr: t1, Delay: 2 * PublicQUICDelay},
{Addr: wrtc1, Delay: 2*PublicQUICDelay + PublicOtherDelay},
{Addr: wrtc1v6, Delay: 2*PublicQUICDelay + PublicOtherDelay},
{Addr: onlyIP, Delay: 2*PublicQUICDelay + PublicOtherDelay},
{Addr: onion1, Delay: 2*PublicQUICDelay + 2*PublicOtherDelay},
},
},
{
name: "only-non-ip-addr",
addrs: []ma.Multiaddr{onion1},
output: []network.AddrDelay{
{Addr: onion1, Delay: PublicOtherDelay},
},
},
}
for _, tc := range testCase {
t.Run(tc.name, func(t *testing.T) {
res := DefaultDialRanker(tc.addrs)
if len(res) != len(tc.output) {
log.Errorf("expected %s got %s", tc.output, res)
t.Errorf("expected elems: %d got: %d", len(tc.output), len(res))
return
}
sortAddrDelays(res)
sortAddrDelays(tc.output)
for i := 0; i < len(tc.output); i++ {
if !tc.output[i].Addr.Equal(res[i].Addr) || tc.output[i].Delay != res[i].Delay {
t.Fatalf("expected %+v got %+v", tc.output, res)
}
}
})
}
}
4 changes: 4 additions & 0 deletions p2p/net/swarm/dial_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package swarm

import (
"context"
"fmt"
"math"
"sync"
"time"
Expand Down Expand Up @@ -295,6 +296,9 @@ loop:
}
ad.dialed = true
ad.dialRankingDelay = now.Sub(ad.createdAt)
if _, err := ad.addr.ValueForProtocol(ma.P_WEBRTC_DIRECT); err == nil {
fmt.Println("dial ranking delay", ad.addr, ad.dialRankingDelay)
}
err := w.s.dialNextAddr(ad.ctx, w.peer, ad.addr, w.resch)
if err != nil {
// Errored without attempting a dial. This happens in case of
Expand Down
69 changes: 69 additions & 0 deletions p2p/test/basichost/basic_host_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,14 @@ import (
"time"

"github.com/libp2p/go-libp2p"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/peerstore"
"github.com/libp2p/go-libp2p/p2p/net/swarm"
"github.com/libp2p/go-libp2p/p2p/protocol/circuitv2/client"
"github.com/libp2p/go-libp2p/p2p/protocol/circuitv2/relay"
libp2pquic "github.com/libp2p/go-libp2p/p2p/transport/quic"
libp2pwebrtc "github.com/libp2p/go-libp2p/p2p/transport/webrtc"
libp2pwebtransport "github.com/libp2p/go-libp2p/p2p/transport/webtransport"
ma "github.com/multiformats/go-multiaddr"
Expand Down Expand Up @@ -199,3 +202,69 @@ func TestAddrFactorCertHashAppend(t *testing.T) {
return hasWebRTC && hasWebTransport
}, 5*time.Second, 100*time.Millisecond)
}

func TestOnlyWebRTCDirectDialNoDelay(t *testing.T) {
// This tests that only webrtc-direct dials are dialled immediately
// and not delayed by dial ranker.
h1, err := libp2p.New(
libp2p.Transport(libp2pwebrtc.New),
libp2p.ListenAddrStrings(
"/ip4/0.0.0.0/udp/0/webrtc-direct",
),
)
require.NoError(t, err)
h2, err := libp2p.New(
libp2p.Transport(libp2pwebrtc.New),
libp2p.NoListenAddrs,
)
require.NoError(t, err)

ctx, cancel := context.WithTimeout(context.Background(), swarm.PrivateOtherDelay-10*time.Millisecond)
defer cancel()
err = h2.Connect(ctx, peer.AddrInfo{ID: h1.ID(), Addrs: h1.Addrs()})
require.NoError(t, err)
}

func TestWebRTCWithQUICManyConnections(t *testing.T) {
// Correctly fixes: https://github.com/libp2p/js-libp2p/issues/2805

// The server has both /quic-v1 and /webrtc-direct listen addresses
h, err := libp2p.New(
libp2p.Transport(libp2pquic.NewTransport),
libp2p.Transport(libp2pwebrtc.New),
libp2p.ListenAddrStrings("/ip4/0.0.0.0/udp/0/quic-v1"),
libp2p.ListenAddrStrings("/ip4/0.0.0.0/udp/0/webrtc-direct"),
libp2p.ResourceManager(&network.NullResourceManager{}),
)
require.NoError(t, err)
defer h.Close()

const N = 200
// These N dialers have both /quic-v1 and /webrtc-direct transports
var dialers [N]host.Host
for i := 0; i < N; i++ {
dialers[i], err = libp2p.New(libp2p.NoListenAddrs)
require.NoError(t, err)
defer dialers[i].Close()
}
// This dialer has only /webrtc-direct transport
d, err := libp2p.New(libp2p.Transport(libp2pwebrtc.New), libp2p.NoListenAddrs)
require.NoError(t, err)
defer d.Close()

for i := 0; i < N; i++ {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
// With happy eyeballs these dialers will connect over only /quic-v1
// and not stall the /webrtc-direct handshake goroutines.
// it is fine if the dial fails, we just want to ensure that there's space
// in the /webrtc-direct listen queue
_ = dialers[i].Connect(ctx, peer.AddrInfo{ID: h.ID(), Addrs: h.Addrs()})
}

ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
// The webrtc only dialer should be able to connect to the peer
err = d.Connect(ctx, peer.AddrInfo{ID: h.ID(), Addrs: h.Addrs()})
require.NoError(t, err)
}
Loading