diff --git a/p2p/net/swarm/dial_ranker.go b/p2p/net/swarm/dial_ranker.go index 7e58876b91..154a0344a1 100644 --- a/p2p/net/swarm/dial_ranker.go +++ b/p2p/net/swarm/dial_ranker.go @@ -22,11 +22,15 @@ const ( // RelayDelay is the duration by which relay dials are delayed relative to direct addresses RelayDelay = 500 * time.Millisecond + + // delay for other transport addresses. This will apply to /webrtc-direct. + PublicOtherDelay = 1 * time.Second + PrivateOtherDelay = 100 * time.Millisecond ) // NoDelayDialRanker ranks addresses with no delay. This is useful for simultaneous connect requests. func NoDelayDialRanker(addrs []ma.Multiaddr) []network.AddrDelay { - return getAddrDelay(addrs, 0, 0, 0) + return getAddrDelay(addrs, 0, 0, 0, 0) } // DefaultDialRanker determines the ranking of outgoing connection attempts. @@ -67,8 +71,11 @@ func NoDelayDialRanker(addrs []ma.Multiaddr) []network.AddrDelay { // 5. If only one of TCP IPv6 or TCP IPv4 addresses are present, dial the TCP address with the lowest port // first. After this we dial the rest of the TCP addresses delayed by 250ms (PublicTCPDelay) for public // addresses, and 30ms (PrivateTCPDelay) for local addresses. -// 6. When a TCP socket is connected and awaiting security and muxer upgrade, we stop new dials for 2*PrivateTCPDelay +// 6. When a TCP socket is connected and awaiting security and muxer upgrade, we stop new dials for 2*PublicTCPDelay // to allow for the upgrade to complete. +// 7. WebRTC Direct, and other IP transport addresses are dialed 1 second after the last QUIC or TCP dial. +// We only ever need to dial these if the peer doesn't have any other transport available, in which +// case these are dialed immediately. // // We dial lowest ports first as they are more likely to be the listen port. func DefaultDialRanker(addrs []ma.Multiaddr) []network.AddrDelay { @@ -83,13 +90,18 @@ func DefaultDialRanker(addrs []ma.Multiaddr) []network.AddrDelay { } res := make([]network.AddrDelay, 0, len(addrs)) + res = append(res, getAddrDelay(pvt, PrivateTCPDelay, PrivateQUICDelay, PrivateOtherDelay, 0)...) + res = append(res, getAddrDelay(public, PublicTCPDelay, PublicQUICDelay, PublicOtherDelay, 0)...) + res = append(res, getAddrDelay(relay, PublicTCPDelay, PublicQUICDelay, PublicOtherDelay, relayOffset)...) + var maxDelay time.Duration + if len(res) > 0 { + maxDelay = res[len(res)-1].Delay + } + for i := 0; i < len(addrs); i++ { - res = append(res, network.AddrDelay{Addr: addrs[i], Delay: 0}) + res = append(res, network.AddrDelay{Addr: addrs[i], Delay: maxDelay + PublicOtherDelay}) } - res = append(res, getAddrDelay(pvt, PrivateTCPDelay, PrivateQUICDelay, 0)...) - res = append(res, getAddrDelay(public, PublicTCPDelay, PublicQUICDelay, 0)...) - res = append(res, getAddrDelay(relay, PublicTCPDelay, PublicQUICDelay, relayOffset)...) return res } @@ -98,7 +110,7 @@ func DefaultDialRanker(addrs []ma.Multiaddr) []network.AddrDelay { // offset is used to delay all addresses by a fixed duration. This is useful for delaying all relay // addresses relative to direct addresses. func getAddrDelay(addrs []ma.Multiaddr, tcpDelay time.Duration, quicDelay time.Duration, - offset time.Duration) []network.AddrDelay { + otherDelay time.Duration, offset time.Duration) []network.AddrDelay { if len(addrs) == 0 { return nil } @@ -158,6 +170,7 @@ func getAddrDelay(addrs []ma.Multiaddr, tcpDelay time.Duration, quicDelay time.D res := make([]network.AddrDelay, 0, len(addrs)) var tcpFirstDialDelay time.Duration + var lastQUICOrTCPDelay time.Duration for i, addr := range addrs { var delay time.Duration switch { @@ -176,6 +189,7 @@ func getAddrDelay(addrs []ma.Multiaddr, tcpDelay time.Duration, quicDelay time.D delay = quicDelay } } + lastQUICOrTCPDelay = delay tcpFirstDialDelay = delay + tcpDelay case isProtocolAddr(addr, ma.P_TCP): // We dial an IPv6 address, then after tcpDelay an IPv4 @@ -193,6 +207,10 @@ func getAddrDelay(addrs []ma.Multiaddr, tcpDelay time.Duration, quicDelay time.D } } delay += tcpFirstDialDelay + lastQUICOrTCPDelay = delay + // if it's neither quic, webtransport, tcp, or websocket address + default: + delay = lastQUICOrTCPDelay + otherDelay } res = append(res, network.AddrDelay{Addr: addr, Delay: offset + delay}) } @@ -230,6 +248,9 @@ func score(a ma.Multiaddr) int { pi, _ := strconv.Atoi(p) return ip4Weight + pi + (1 << 20) } + if _, err := a.ValueForProtocol(ma.P_WEBRTC_DIRECT); err == nil { + return 1 << 21 + } return (1 << 30) } diff --git a/p2p/net/swarm/dial_ranker_test.go b/p2p/net/swarm/dial_ranker_test.go index 5ef3cc27f1..f7a6172122 100644 --- a/p2p/net/swarm/dial_ranker_test.go +++ b/p2p/net/swarm/dial_ranker_test.go @@ -29,6 +29,7 @@ func TestNoDelayDialRanker(t *testing.T) { q3v1 := ma.StringCast("/ip4/1.2.3.4/udp/3/quic-v1") q4 := ma.StringCast("/ip4/1.2.3.4/udp/4/quic-v1") t1 := ma.StringCast("/ip4/1.2.3.5/tcp/1/") + wrtc1 := ma.StringCast("/ip4/1.1.1.1/udp/1/webrtc-direct") testCase := []struct { name string @@ -37,7 +38,7 @@ func TestNoDelayDialRanker(t *testing.T) { }{ { name: "quic+webtransport filtered when quicv1", - addrs: []ma.Multiaddr{q1, q2, q3, q4, q1v1, q2v1, q3v1, wt1, t1}, + addrs: []ma.Multiaddr{q1, q2, q3, q4, q1v1, q2v1, q3v1, wt1, t1, wrtc1}, output: []network.AddrDelay{ {Addr: q1, Delay: 0}, {Addr: q2, Delay: 0}, @@ -48,6 +49,7 @@ func TestNoDelayDialRanker(t *testing.T) { {Addr: q3v1, Delay: 0}, {Addr: wt1, Delay: 0}, {Addr: t1, Delay: 0}, + {Addr: wrtc1, Delay: 0}, }, }, } @@ -287,3 +289,69 @@ func TestDelayRankerRelay(t *testing.T) { }) } } + +func TestDelayRankerOtherTransportDelay(t *testing.T) { + q1v1 := ma.StringCast("/ip4/1.2.3.4/udp/1/quic-v1") + q1v16 := ma.StringCast("/ip6/1::2/udp/1/quic-v1") + t1 := ma.StringCast("/ip4/1.2.3.5/tcp/1/") + t1v6 := ma.StringCast("/ip6/1::2/tcp/1") + wrtc1 := ma.StringCast("/ip4/1.2.3.4/udp/1/webrtc-direct") + wrtc1v6 := ma.StringCast("/ip6/1::2/udp/1/webrtc-direct") + onion1 := ma.StringCast("/onion3/vww6ybal4bd7szmgncyruucpgfkqahzddi37ktceo3ah7ngmcopnpyyd:1234") + onlyIP := ma.StringCast("/ip4/1.2.3.4/") + testCase := []struct { + name string + addrs []ma.Multiaddr + output []network.AddrDelay + }{ + { + name: "quic-with-other", + addrs: []ma.Multiaddr{q1v1, q1v16, wrtc1, wrtc1v6, onion1, onlyIP}, + output: []network.AddrDelay{ + {Addr: q1v16, Delay: 0}, + {Addr: q1v1, Delay: PublicQUICDelay}, + {Addr: wrtc1, Delay: PublicQUICDelay + PublicOtherDelay}, + {Addr: wrtc1v6, Delay: PublicQUICDelay + PublicOtherDelay}, + {Addr: onlyIP, Delay: PublicQUICDelay + PublicOtherDelay}, + {Addr: onion1, Delay: PublicQUICDelay + 2*PublicOtherDelay}, + }, + }, + { + name: "quic-and-tcp-with-other", + addrs: []ma.Multiaddr{q1v1, t1, t1v6, wrtc1, wrtc1v6, onion1, onlyIP}, + output: []network.AddrDelay{ + {Addr: q1v1, Delay: 0}, + {Addr: t1v6, Delay: PublicQUICDelay}, + {Addr: t1, Delay: 2 * PublicQUICDelay}, + {Addr: wrtc1, Delay: 2*PublicQUICDelay + PublicOtherDelay}, + {Addr: wrtc1v6, Delay: 2*PublicQUICDelay + PublicOtherDelay}, + {Addr: onlyIP, Delay: 2*PublicQUICDelay + PublicOtherDelay}, + {Addr: onion1, Delay: 2*PublicQUICDelay + 2*PublicOtherDelay}, + }, + }, + { + name: "only-non-ip-addr", + addrs: []ma.Multiaddr{onion1}, + output: []network.AddrDelay{ + {Addr: onion1, Delay: PublicOtherDelay}, + }, + }, + } + for _, tc := range testCase { + t.Run(tc.name, func(t *testing.T) { + res := DefaultDialRanker(tc.addrs) + if len(res) != len(tc.output) { + log.Errorf("expected %s got %s", tc.output, res) + t.Errorf("expected elems: %d got: %d", len(tc.output), len(res)) + return + } + sortAddrDelays(res) + sortAddrDelays(tc.output) + for i := 0; i < len(tc.output); i++ { + if !tc.output[i].Addr.Equal(res[i].Addr) || tc.output[i].Delay != res[i].Delay { + t.Fatalf("expected %+v got %+v", tc.output, res) + } + } + }) + } +} diff --git a/p2p/net/swarm/dial_worker.go b/p2p/net/swarm/dial_worker.go index 9d097f3c26..5bb9559e52 100644 --- a/p2p/net/swarm/dial_worker.go +++ b/p2p/net/swarm/dial_worker.go @@ -2,6 +2,7 @@ package swarm import ( "context" + "fmt" "math" "sync" "time" @@ -295,6 +296,9 @@ loop: } ad.dialed = true ad.dialRankingDelay = now.Sub(ad.createdAt) + if _, err := ad.addr.ValueForProtocol(ma.P_WEBRTC_DIRECT); err == nil { + fmt.Println("dial ranking delay", ad.addr, ad.dialRankingDelay) + } err := w.s.dialNextAddr(ad.ctx, w.peer, ad.addr, w.resch) if err != nil { // Errored without attempting a dial. This happens in case of diff --git a/p2p/test/basichost/basic_host_test.go b/p2p/test/basichost/basic_host_test.go index 9cd442dbf0..0197387b1b 100644 --- a/p2p/test/basichost/basic_host_test.go +++ b/p2p/test/basichost/basic_host_test.go @@ -8,11 +8,14 @@ import ( "time" "github.com/libp2p/go-libp2p" + "github.com/libp2p/go-libp2p/core/host" "github.com/libp2p/go-libp2p/core/network" "github.com/libp2p/go-libp2p/core/peer" "github.com/libp2p/go-libp2p/core/peerstore" + "github.com/libp2p/go-libp2p/p2p/net/swarm" "github.com/libp2p/go-libp2p/p2p/protocol/circuitv2/client" "github.com/libp2p/go-libp2p/p2p/protocol/circuitv2/relay" + libp2pquic "github.com/libp2p/go-libp2p/p2p/transport/quic" libp2pwebrtc "github.com/libp2p/go-libp2p/p2p/transport/webrtc" libp2pwebtransport "github.com/libp2p/go-libp2p/p2p/transport/webtransport" ma "github.com/multiformats/go-multiaddr" @@ -199,3 +202,69 @@ func TestAddrFactorCertHashAppend(t *testing.T) { return hasWebRTC && hasWebTransport }, 5*time.Second, 100*time.Millisecond) } + +func TestOnlyWebRTCDirectDialNoDelay(t *testing.T) { + // This tests that only webrtc-direct dials are dialled immediately + // and not delayed by dial ranker. + h1, err := libp2p.New( + libp2p.Transport(libp2pwebrtc.New), + libp2p.ListenAddrStrings( + "/ip4/0.0.0.0/udp/0/webrtc-direct", + ), + ) + require.NoError(t, err) + h2, err := libp2p.New( + libp2p.Transport(libp2pwebrtc.New), + libp2p.NoListenAddrs, + ) + require.NoError(t, err) + + ctx, cancel := context.WithTimeout(context.Background(), swarm.PrivateOtherDelay-10*time.Millisecond) + defer cancel() + err = h2.Connect(ctx, peer.AddrInfo{ID: h1.ID(), Addrs: h1.Addrs()}) + require.NoError(t, err) +} + +func TestWebRTCWithQUICManyConnections(t *testing.T) { + // Correctly fixes: https://github.com/libp2p/js-libp2p/issues/2805 + + // The server has both /quic-v1 and /webrtc-direct listen addresses + h, err := libp2p.New( + libp2p.Transport(libp2pquic.NewTransport), + libp2p.Transport(libp2pwebrtc.New), + libp2p.ListenAddrStrings("/ip4/0.0.0.0/udp/0/quic-v1"), + libp2p.ListenAddrStrings("/ip4/0.0.0.0/udp/0/webrtc-direct"), + libp2p.ResourceManager(&network.NullResourceManager{}), + ) + require.NoError(t, err) + defer h.Close() + + const N = 200 + // These N dialers have both /quic-v1 and /webrtc-direct transports + var dialers [N]host.Host + for i := 0; i < N; i++ { + dialers[i], err = libp2p.New(libp2p.NoListenAddrs) + require.NoError(t, err) + defer dialers[i].Close() + } + // This dialer has only /webrtc-direct transport + d, err := libp2p.New(libp2p.Transport(libp2pwebrtc.New), libp2p.NoListenAddrs) + require.NoError(t, err) + defer d.Close() + + for i := 0; i < N; i++ { + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + // With happy eyeballs these dialers will connect over only /quic-v1 + // and not stall the /webrtc-direct handshake goroutines. + // it is fine if the dial fails, we just want to ensure that there's space + // in the /webrtc-direct listen queue + _ = dialers[i].Connect(ctx, peer.AddrInfo{ID: h.ID(), Addrs: h.Addrs()}) + } + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + // The webrtc only dialer should be able to connect to the peer + err = d.Connect(ctx, peer.AddrInfo{ID: h.ID(), Addrs: h.Addrs()}) + require.NoError(t, err) +}