Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

swarm: delay /webrtc-direct dials by 1 second #3078

Merged
merged 5 commits into from
Dec 5, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 28 additions & 7 deletions p2p/net/swarm/dial_ranker.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,15 @@ const (

// RelayDelay is the duration by which relay dials are delayed relative to direct addresses
RelayDelay = 500 * time.Millisecond

// delay for other transport addresses. This will apply to /webrtc-direct.
PublicOtherDelay = 1 * time.Second
PrivateOtherDelay = 100 * time.Millisecond
)

// NoDelayDialRanker ranks addresses with no delay. This is useful for simultaneous connect requests.
func NoDelayDialRanker(addrs []ma.Multiaddr) []network.AddrDelay {
return getAddrDelay(addrs, 0, 0, 0)
return getAddrDelay(addrs, 0, 0, 0, 0)
}

// DefaultDialRanker determines the ranking of outgoing connection attempts.
Expand Down Expand Up @@ -67,8 +71,11 @@ func NoDelayDialRanker(addrs []ma.Multiaddr) []network.AddrDelay {
// 5. If only one of TCP IPv6 or TCP IPv4 addresses are present, dial the TCP address with the lowest port
// first. After this we dial the rest of the TCP addresses delayed by 250ms (PublicTCPDelay) for public
// addresses, and 30ms (PrivateTCPDelay) for local addresses.
// 6. When a TCP socket is connected and awaiting security and muxer upgrade, we stop new dials for 2*PrivateTCPDelay
// 6. When a TCP socket is connected and awaiting security and muxer upgrade, we stop new dials for 2*PublicTCPDelay
// to allow for the upgrade to complete.
// 7. WebRTC Direct, and other IP transport addresses are dialed 1 second after the last QUIC or TCP dial.
// We only ever need to dial these if the peer doesn't have any other transport available, in which
// case these are dialed immediately.
//
// We dial lowest ports first as they are more likely to be the listen port.
func DefaultDialRanker(addrs []ma.Multiaddr) []network.AddrDelay {
Expand All @@ -83,13 +90,18 @@ func DefaultDialRanker(addrs []ma.Multiaddr) []network.AddrDelay {
}

res := make([]network.AddrDelay, 0, len(addrs))
res = append(res, getAddrDelay(pvt, PrivateTCPDelay, PrivateQUICDelay, PrivateOtherDelay, 0)...)
res = append(res, getAddrDelay(public, PublicTCPDelay, PublicQUICDelay, PublicOtherDelay, 0)...)
res = append(res, getAddrDelay(relay, PublicTCPDelay, PublicQUICDelay, PublicOtherDelay, relayOffset)...)
var maxDelay time.Duration
if len(res) > 0 {
maxDelay = res[len(res)-1].Delay
}

for i := 0; i < len(addrs); i++ {
res = append(res, network.AddrDelay{Addr: addrs[i], Delay: 0})
res = append(res, network.AddrDelay{Addr: addrs[i], Delay: maxDelay + PublicOtherDelay})
}

res = append(res, getAddrDelay(pvt, PrivateTCPDelay, PrivateQUICDelay, 0)...)
res = append(res, getAddrDelay(public, PublicTCPDelay, PublicQUICDelay, 0)...)
res = append(res, getAddrDelay(relay, PublicTCPDelay, PublicQUICDelay, relayOffset)...)
return res
}

Expand All @@ -98,7 +110,7 @@ func DefaultDialRanker(addrs []ma.Multiaddr) []network.AddrDelay {
// offset is used to delay all addresses by a fixed duration. This is useful for delaying all relay
// addresses relative to direct addresses.
func getAddrDelay(addrs []ma.Multiaddr, tcpDelay time.Duration, quicDelay time.Duration,
offset time.Duration) []network.AddrDelay {
otherDelay time.Duration, offset time.Duration) []network.AddrDelay {
if len(addrs) == 0 {
return nil
}
Expand Down Expand Up @@ -158,6 +170,7 @@ func getAddrDelay(addrs []ma.Multiaddr, tcpDelay time.Duration, quicDelay time.D

res := make([]network.AddrDelay, 0, len(addrs))
var tcpFirstDialDelay time.Duration
var lastQUICOrTCPDelay time.Duration
for i, addr := range addrs {
var delay time.Duration
switch {
Expand All @@ -176,6 +189,7 @@ func getAddrDelay(addrs []ma.Multiaddr, tcpDelay time.Duration, quicDelay time.D
delay = quicDelay
}
}
lastQUICOrTCPDelay = delay
tcpFirstDialDelay = delay + tcpDelay
case isProtocolAddr(addr, ma.P_TCP):
// We dial an IPv6 address, then after tcpDelay an IPv4
Expand All @@ -193,6 +207,10 @@ func getAddrDelay(addrs []ma.Multiaddr, tcpDelay time.Duration, quicDelay time.D
}
}
delay += tcpFirstDialDelay
lastQUICOrTCPDelay = delay
// if it's neither quic, webtransport, tcp, or websocket address
default:
delay = lastQUICOrTCPDelay + otherDelay
}
res = append(res, network.AddrDelay{Addr: addr, Delay: offset + delay})
}
Expand Down Expand Up @@ -230,6 +248,9 @@ func score(a ma.Multiaddr) int {
pi, _ := strconv.Atoi(p)
return ip4Weight + pi + (1 << 20)
}
if _, err := a.ValueForProtocol(ma.P_WEBRTC_DIRECT); err == nil {
return 1 << 21
}
return (1 << 30)
}

Expand Down
70 changes: 69 additions & 1 deletion p2p/net/swarm/dial_ranker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ func TestNoDelayDialRanker(t *testing.T) {
q3v1 := ma.StringCast("/ip4/1.2.3.4/udp/3/quic-v1")
q4 := ma.StringCast("/ip4/1.2.3.4/udp/4/quic-v1")
t1 := ma.StringCast("/ip4/1.2.3.5/tcp/1/")
wrtc1 := ma.StringCast("/ip4/1.1.1.1/udp/1/webrtc-direct")

testCase := []struct {
name string
Expand All @@ -37,7 +38,7 @@ func TestNoDelayDialRanker(t *testing.T) {
}{
{
name: "quic+webtransport filtered when quicv1",
addrs: []ma.Multiaddr{q1, q2, q3, q4, q1v1, q2v1, q3v1, wt1, t1},
addrs: []ma.Multiaddr{q1, q2, q3, q4, q1v1, q2v1, q3v1, wt1, t1, wrtc1},
output: []network.AddrDelay{
{Addr: q1, Delay: 0},
{Addr: q2, Delay: 0},
Expand All @@ -48,6 +49,7 @@ func TestNoDelayDialRanker(t *testing.T) {
{Addr: q3v1, Delay: 0},
{Addr: wt1, Delay: 0},
{Addr: t1, Delay: 0},
{Addr: wrtc1, Delay: 0},
},
},
}
Expand Down Expand Up @@ -287,3 +289,69 @@ func TestDelayRankerRelay(t *testing.T) {
})
}
}

func TestDelayRankerOtherTransportDelay(t *testing.T) {
q1v1 := ma.StringCast("/ip4/1.2.3.4/udp/1/quic-v1")
q1v16 := ma.StringCast("/ip6/1::2/udp/1/quic-v1")
t1 := ma.StringCast("/ip4/1.2.3.5/tcp/1/")
t1v6 := ma.StringCast("/ip6/1::2/tcp/1")
wrtc1 := ma.StringCast("/ip4/1.2.3.4/udp/1/webrtc-direct")
wrtc1v6 := ma.StringCast("/ip6/1::2/udp/1/webrtc-direct")
onion1 := ma.StringCast("/onion3/vww6ybal4bd7szmgncyruucpgfkqahzddi37ktceo3ah7ngmcopnpyyd:1234")
onlyIP := ma.StringCast("/ip4/1.2.3.4/")
testCase := []struct {
name string
addrs []ma.Multiaddr
output []network.AddrDelay
}{
{
name: "quic-with-other",
addrs: []ma.Multiaddr{q1v1, q1v16, wrtc1, wrtc1v6, onion1, onlyIP},
output: []network.AddrDelay{
{Addr: q1v16, Delay: 0},
{Addr: q1v1, Delay: PublicQUICDelay},
{Addr: wrtc1, Delay: PublicQUICDelay + PublicOtherDelay},
{Addr: wrtc1v6, Delay: PublicQUICDelay + PublicOtherDelay},
{Addr: onlyIP, Delay: PublicQUICDelay + PublicOtherDelay},
{Addr: onion1, Delay: PublicQUICDelay + 2*PublicOtherDelay},
},
},
{
name: "quic-and-tcp-with-other",
addrs: []ma.Multiaddr{q1v1, t1, t1v6, wrtc1, wrtc1v6, onion1, onlyIP},
output: []network.AddrDelay{
{Addr: q1v1, Delay: 0},
{Addr: t1v6, Delay: PublicQUICDelay},
{Addr: t1, Delay: 2 * PublicQUICDelay},
{Addr: wrtc1, Delay: 2*PublicQUICDelay + PublicOtherDelay},
{Addr: wrtc1v6, Delay: 2*PublicQUICDelay + PublicOtherDelay},
{Addr: onlyIP, Delay: 2*PublicQUICDelay + PublicOtherDelay},
{Addr: onion1, Delay: 2*PublicQUICDelay + 2*PublicOtherDelay},
},
},
{
name: "only-non-ip-addr",
addrs: []ma.Multiaddr{onion1},
output: []network.AddrDelay{
{Addr: onion1, Delay: PublicOtherDelay},
},
},
}
for _, tc := range testCase {
t.Run(tc.name, func(t *testing.T) {
res := DefaultDialRanker(tc.addrs)
if len(res) != len(tc.output) {
log.Errorf("expected %s got %s", tc.output, res)
t.Errorf("expected elems: %d got: %d", len(tc.output), len(res))
return
}
sortAddrDelays(res)
sortAddrDelays(tc.output)
for i := 0; i < len(tc.output); i++ {
if !tc.output[i].Addr.Equal(res[i].Addr) || tc.output[i].Delay != res[i].Delay {
t.Fatalf("expected %+v got %+v", tc.output, res)
}
}
})
}
}
73 changes: 73 additions & 0 deletions p2p/test/basichost/basic_host_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,19 @@ import (
"context"
"fmt"
"strings"
"sync"
"testing"
"time"

"github.com/libp2p/go-libp2p"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/peerstore"
"github.com/libp2p/go-libp2p/p2p/net/swarm"
"github.com/libp2p/go-libp2p/p2p/protocol/circuitv2/client"
"github.com/libp2p/go-libp2p/p2p/protocol/circuitv2/relay"
libp2pquic "github.com/libp2p/go-libp2p/p2p/transport/quic"
libp2pwebrtc "github.com/libp2p/go-libp2p/p2p/transport/webrtc"
libp2pwebtransport "github.com/libp2p/go-libp2p/p2p/transport/webtransport"
ma "github.com/multiformats/go-multiaddr"
Expand Down Expand Up @@ -199,3 +203,72 @@ func TestAddrFactorCertHashAppend(t *testing.T) {
return hasWebRTC && hasWebTransport
}, 5*time.Second, 100*time.Millisecond)
}

func TestWebRTCDirectDialDelay(t *testing.T) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't understand this test. Is it trying to make sure that if many peers are connected to a server (via QUIC?), that another peer can also connect to that server via WebRTC?

Copy link
Member Author

@sukunrt sukunrt Dec 4, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There are two tests.

  1. TestOnlyWebRTCDirectDialNoDelay: Tests that dials to a peer with only webrtc address aren't delayed.
  2. TestWebRTCWithManyQUICConnections: Tests that in presence of quic addresses webrtc dials are actually delayed. In case there's no happy eyeballs, the 200 initial dials would have taken up all the goroutines available for handshakes in the webrtc listener.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In case there's no happy eyeballs, the 200 initial dials would have taken up all the goroutines available for handshakes in the webrtc listener.

But you wait for the Connect to finish, so wouldn't the handshake be complete at this point?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, the QUIC handshake will complete not the webrtc one. The webrtc handshake will be cancelled. As there's no way to signal this cancellation in webrtc, the server will have to wait for timeout on this handshake. So, at this point if we get a new webrtc handshake attempt, it'll fail because there were no free listener goroutines. This all happens when there's no happy eyeballs.

With happy eyeballs, the quic handshake should complete without any webrtc dials.

// This tests that only webrtc-direct dials are dialled immediately
// and not delayed by dial ranker.
h1, err := libp2p.New(
libp2p.Transport(libp2pwebrtc.New),
libp2p.ListenAddrStrings(
"/ip4/0.0.0.0/udp/0/webrtc-direct",
),
)
require.NoError(t, err)
h2, err := libp2p.New(
libp2p.Transport(libp2pwebrtc.New),
libp2p.NoListenAddrs,
)
require.NoError(t, err)

ctx, cancel := context.WithTimeout(context.Background(), swarm.PrivateOtherDelay-10*time.Millisecond)
defer cancel()
err = h2.Connect(ctx, peer.AddrInfo{ID: h1.ID(), Addrs: h1.Addrs()})
require.NoError(t, err)
}

func TestWebRTCWithQUICManyConnections(t *testing.T) {
// Correctly fixes: https://github.com/libp2p/js-libp2p/issues/2805

h, err := libp2p.New(
libp2p.Transport(libp2pquic.NewTransport),
libp2p.Transport(libp2pwebrtc.New),
libp2p.ListenAddrStrings("/ip4/0.0.0.0/udp/0/quic-v1"),
libp2p.ListenAddrStrings("/ip4/0.0.0.0/udp/0/webrtc-direct"),
)
require.NoError(t, err)
defer h.Close()

const N = 50
var dialers [N]host.Host
for i := 0; i < N; i++ {
dialers[i], err = libp2p.New(libp2p.NoListenAddrs)
require.NoError(t, err)
defer dialers[i].Close()
}
// only webrtc dialer for dialing the peer later
d, err := libp2p.New(libp2p.Transport(libp2pwebrtc.New), libp2p.NoListenAddrs)
require.NoError(t, err)
defer d.Close()

startDial := make(chan struct{})
var wg sync.WaitGroup
wg.Add(N)
for i := 0; i < N; i++ {
go func() {
defer wg.Done()
<-startDial
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
// it is fine if the dial fails, we just want to take up all the webrtc
// listen queue
_ = dialers[i].Connect(ctx, peer.AddrInfo{ID: h.ID(), Addrs: h.Addrs()})
}()
}
close(startDial)
wg.Wait()

ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
err = d.Connect(ctx, peer.AddrInfo{ID: h.ID(), Addrs: h.Addrs()})
require.NoError(t, err)
}
Loading