From 90e560b32647cb150419d3a85990cf7a05fb218b Mon Sep 17 00:00:00 2001 From: sukun Date: Tue, 3 Dec 2024 14:57:44 +0530 Subject: [PATCH 1/5] swarm: delay /webrtc-direct dials by 1 second Previously these addresses weren't delayed at all. When I initially did the ranker implementation, I was too conservative regarding what to delay. So, non QUIC, WebTransport, TCP, WS addresses were ignored in ranking. We only ever need to dial /webrtc-direct when there's no other address available for the peer, in which case we will dial the addresses immediately. This would have helped with https://github.com/libp2p/js-libp2p/issues/2805 as there would have been fewer peers dialing webrtc and then cancelling because they connected on a better transport. This also introduces an additional 1 second delay for any fancy non IP transports --- p2p/net/swarm/dial_ranker.go | 35 +++++++++++++---- p2p/net/swarm/dial_ranker_test.go | 63 ++++++++++++++++++++++++++++++- 2 files changed, 90 insertions(+), 8 deletions(-) diff --git a/p2p/net/swarm/dial_ranker.go b/p2p/net/swarm/dial_ranker.go index 7e58876b91..154a0344a1 100644 --- a/p2p/net/swarm/dial_ranker.go +++ b/p2p/net/swarm/dial_ranker.go @@ -22,11 +22,15 @@ const ( // RelayDelay is the duration by which relay dials are delayed relative to direct addresses RelayDelay = 500 * time.Millisecond + + // delay for other transport addresses. This will apply to /webrtc-direct. + PublicOtherDelay = 1 * time.Second + PrivateOtherDelay = 100 * time.Millisecond ) // NoDelayDialRanker ranks addresses with no delay. This is useful for simultaneous connect requests. func NoDelayDialRanker(addrs []ma.Multiaddr) []network.AddrDelay { - return getAddrDelay(addrs, 0, 0, 0) + return getAddrDelay(addrs, 0, 0, 0, 0) } // DefaultDialRanker determines the ranking of outgoing connection attempts. @@ -67,8 +71,11 @@ func NoDelayDialRanker(addrs []ma.Multiaddr) []network.AddrDelay { // 5. If only one of TCP IPv6 or TCP IPv4 addresses are present, dial the TCP address with the lowest port // first. After this we dial the rest of the TCP addresses delayed by 250ms (PublicTCPDelay) for public // addresses, and 30ms (PrivateTCPDelay) for local addresses. -// 6. When a TCP socket is connected and awaiting security and muxer upgrade, we stop new dials for 2*PrivateTCPDelay +// 6. When a TCP socket is connected and awaiting security and muxer upgrade, we stop new dials for 2*PublicTCPDelay // to allow for the upgrade to complete. +// 7. WebRTC Direct, and other IP transport addresses are dialed 1 second after the last QUIC or TCP dial. +// We only ever need to dial these if the peer doesn't have any other transport available, in which +// case these are dialed immediately. // // We dial lowest ports first as they are more likely to be the listen port. func DefaultDialRanker(addrs []ma.Multiaddr) []network.AddrDelay { @@ -83,13 +90,18 @@ func DefaultDialRanker(addrs []ma.Multiaddr) []network.AddrDelay { } res := make([]network.AddrDelay, 0, len(addrs)) + res = append(res, getAddrDelay(pvt, PrivateTCPDelay, PrivateQUICDelay, PrivateOtherDelay, 0)...) + res = append(res, getAddrDelay(public, PublicTCPDelay, PublicQUICDelay, PublicOtherDelay, 0)...) + res = append(res, getAddrDelay(relay, PublicTCPDelay, PublicQUICDelay, PublicOtherDelay, relayOffset)...) + var maxDelay time.Duration + if len(res) > 0 { + maxDelay = res[len(res)-1].Delay + } + for i := 0; i < len(addrs); i++ { - res = append(res, network.AddrDelay{Addr: addrs[i], Delay: 0}) + res = append(res, network.AddrDelay{Addr: addrs[i], Delay: maxDelay + PublicOtherDelay}) } - res = append(res, getAddrDelay(pvt, PrivateTCPDelay, PrivateQUICDelay, 0)...) - res = append(res, getAddrDelay(public, PublicTCPDelay, PublicQUICDelay, 0)...) - res = append(res, getAddrDelay(relay, PublicTCPDelay, PublicQUICDelay, relayOffset)...) return res } @@ -98,7 +110,7 @@ func DefaultDialRanker(addrs []ma.Multiaddr) []network.AddrDelay { // offset is used to delay all addresses by a fixed duration. This is useful for delaying all relay // addresses relative to direct addresses. func getAddrDelay(addrs []ma.Multiaddr, tcpDelay time.Duration, quicDelay time.Duration, - offset time.Duration) []network.AddrDelay { + otherDelay time.Duration, offset time.Duration) []network.AddrDelay { if len(addrs) == 0 { return nil } @@ -158,6 +170,7 @@ func getAddrDelay(addrs []ma.Multiaddr, tcpDelay time.Duration, quicDelay time.D res := make([]network.AddrDelay, 0, len(addrs)) var tcpFirstDialDelay time.Duration + var lastQUICOrTCPDelay time.Duration for i, addr := range addrs { var delay time.Duration switch { @@ -176,6 +189,7 @@ func getAddrDelay(addrs []ma.Multiaddr, tcpDelay time.Duration, quicDelay time.D delay = quicDelay } } + lastQUICOrTCPDelay = delay tcpFirstDialDelay = delay + tcpDelay case isProtocolAddr(addr, ma.P_TCP): // We dial an IPv6 address, then after tcpDelay an IPv4 @@ -193,6 +207,10 @@ func getAddrDelay(addrs []ma.Multiaddr, tcpDelay time.Duration, quicDelay time.D } } delay += tcpFirstDialDelay + lastQUICOrTCPDelay = delay + // if it's neither quic, webtransport, tcp, or websocket address + default: + delay = lastQUICOrTCPDelay + otherDelay } res = append(res, network.AddrDelay{Addr: addr, Delay: offset + delay}) } @@ -230,6 +248,9 @@ func score(a ma.Multiaddr) int { pi, _ := strconv.Atoi(p) return ip4Weight + pi + (1 << 20) } + if _, err := a.ValueForProtocol(ma.P_WEBRTC_DIRECT); err == nil { + return 1 << 21 + } return (1 << 30) } diff --git a/p2p/net/swarm/dial_ranker_test.go b/p2p/net/swarm/dial_ranker_test.go index 5ef3cc27f1..81ceb28b82 100644 --- a/p2p/net/swarm/dial_ranker_test.go +++ b/p2p/net/swarm/dial_ranker_test.go @@ -29,6 +29,7 @@ func TestNoDelayDialRanker(t *testing.T) { q3v1 := ma.StringCast("/ip4/1.2.3.4/udp/3/quic-v1") q4 := ma.StringCast("/ip4/1.2.3.4/udp/4/quic-v1") t1 := ma.StringCast("/ip4/1.2.3.5/tcp/1/") + wrtc1 := ma.StringCast("/ip4/1.1.1.1/udp/1/webrtc-direct") testCase := []struct { name string @@ -37,7 +38,7 @@ func TestNoDelayDialRanker(t *testing.T) { }{ { name: "quic+webtransport filtered when quicv1", - addrs: []ma.Multiaddr{q1, q2, q3, q4, q1v1, q2v1, q3v1, wt1, t1}, + addrs: []ma.Multiaddr{q1, q2, q3, q4, q1v1, q2v1, q3v1, wt1, t1, wrtc1}, output: []network.AddrDelay{ {Addr: q1, Delay: 0}, {Addr: q2, Delay: 0}, @@ -48,6 +49,7 @@ func TestNoDelayDialRanker(t *testing.T) { {Addr: q3v1, Delay: 0}, {Addr: wt1, Delay: 0}, {Addr: t1, Delay: 0}, + {Addr: wrtc1, Delay: 0}, }, }, } @@ -287,3 +289,62 @@ func TestDelayRankerRelay(t *testing.T) { }) } } + +func TestDelayRankerOtherTransportDelay(t *testing.T) { + q1v1 := ma.StringCast("/ip4/1.2.3.4/udp/1/quic-v1") + q1v16 := ma.StringCast("/ip6/1::2/udp/1/quic-v1") + t1 := ma.StringCast("/ip4/1.2.3.5/tcp/1/") + t1v6 := ma.StringCast("/ip6/1::2/tcp/1") + wrtc1 := ma.StringCast("/ip4/1.2.3.4/udp/1/webrtc-direct") + wrtc1v6 := ma.StringCast("/ip6/1::2/udp/1/webrtc-direct") + onion1 := ma.StringCast("/onion3/vww6ybal4bd7szmgncyruucpgfkqahzddi37ktceo3ah7ngmcopnpyyd:1234") + onlyIP := ma.StringCast("/ip4/1.2.3.4/") + testCase := []struct { + name string + addrs []ma.Multiaddr + output []network.AddrDelay + }{ + { + name: "quic-with-other", + addrs: []ma.Multiaddr{q1v1, q1v16, wrtc1, wrtc1v6, onion1, onlyIP}, + output: []network.AddrDelay{ + {Addr: q1v16, Delay: 0}, + {Addr: q1v1, Delay: PublicQUICDelay}, + {Addr: wrtc1, Delay: PublicQUICDelay + PublicOtherDelay}, + {Addr: wrtc1v6, Delay: PublicQUICDelay + PublicOtherDelay}, + {Addr: onlyIP, Delay: PublicQUICDelay + PublicOtherDelay}, + {Addr: onion1, Delay: PublicQUICDelay + 2*PublicOtherDelay}, + }, + }, + { + name: "quic-and-tcp-with-other", + addrs: []ma.Multiaddr{q1v1, t1, t1v6, wrtc1, wrtc1v6, onion1, onlyIP}, + output: []network.AddrDelay{ + {Addr: q1v1, Delay: 0}, + {Addr: t1v6, Delay: PublicQUICDelay}, + {Addr: t1, Delay: 2 * PublicQUICDelay}, + {Addr: wrtc1, Delay: 2*PublicQUICDelay + PublicOtherDelay}, + {Addr: wrtc1v6, Delay: 2*PublicQUICDelay + PublicOtherDelay}, + {Addr: onlyIP, Delay: 2*PublicQUICDelay + PublicOtherDelay}, + {Addr: onion1, Delay: 2*PublicQUICDelay + 2*PublicOtherDelay}, + }, + }, + } + for _, tc := range testCase { + t.Run(tc.name, func(t *testing.T) { + res := DefaultDialRanker(tc.addrs) + if len(res) != len(tc.output) { + log.Errorf("expected %s got %s", tc.output, res) + t.Errorf("expected elems: %d got: %d", len(tc.output), len(res)) + return + } + sortAddrDelays(res) + sortAddrDelays(tc.output) + for i := 0; i < len(tc.output); i++ { + if !tc.output[i].Addr.Equal(res[i].Addr) || tc.output[i].Delay != res[i].Delay { + t.Fatalf("expected %+v got %+v", tc.output, res) + } + } + }) + } +} From b352a542d97fa036340a75f3d069bc9330c8b66c Mon Sep 17 00:00:00 2001 From: sukun Date: Tue, 3 Dec 2024 23:21:29 +0530 Subject: [PATCH 2/5] add a basic host test that only webrtc dials aren't delayed --- p2p/net/swarm/dial_ranker_test.go | 7 +++++++ p2p/test/basichost/basic_host_test.go | 24 ++++++++++++++++++++++++ 2 files changed, 31 insertions(+) diff --git a/p2p/net/swarm/dial_ranker_test.go b/p2p/net/swarm/dial_ranker_test.go index 81ceb28b82..f7a6172122 100644 --- a/p2p/net/swarm/dial_ranker_test.go +++ b/p2p/net/swarm/dial_ranker_test.go @@ -329,6 +329,13 @@ func TestDelayRankerOtherTransportDelay(t *testing.T) { {Addr: onion1, Delay: 2*PublicQUICDelay + 2*PublicOtherDelay}, }, }, + { + name: "only-non-ip-addr", + addrs: []ma.Multiaddr{onion1}, + output: []network.AddrDelay{ + {Addr: onion1, Delay: PublicOtherDelay}, + }, + }, } for _, tc := range testCase { t.Run(tc.name, func(t *testing.T) { diff --git a/p2p/test/basichost/basic_host_test.go b/p2p/test/basichost/basic_host_test.go index 9cd442dbf0..63aa3faf98 100644 --- a/p2p/test/basichost/basic_host_test.go +++ b/p2p/test/basichost/basic_host_test.go @@ -11,6 +11,7 @@ import ( "github.com/libp2p/go-libp2p/core/network" "github.com/libp2p/go-libp2p/core/peer" "github.com/libp2p/go-libp2p/core/peerstore" + "github.com/libp2p/go-libp2p/p2p/net/swarm" "github.com/libp2p/go-libp2p/p2p/protocol/circuitv2/client" "github.com/libp2p/go-libp2p/p2p/protocol/circuitv2/relay" libp2pwebrtc "github.com/libp2p/go-libp2p/p2p/transport/webrtc" @@ -199,3 +200,26 @@ func TestAddrFactorCertHashAppend(t *testing.T) { return hasWebRTC && hasWebTransport }, 5*time.Second, 100*time.Millisecond) } + +func TestWebRTCDirectDialDelay(t *testing.T) { + // This tests that only webrtc-direct dials are dialled immediately + // and not delayed by dial ranker. + + h1, err := libp2p.New( + libp2p.Transport(libp2pwebrtc.New), + libp2p.ListenAddrStrings( + "/ip4/0.0.0.0/udp/0/webrtc-direct", + ), + ) + require.NoError(t, err) + h2, err := libp2p.New( + libp2p.Transport(libp2pwebrtc.New), + libp2p.NoListenAddrs, + ) + require.NoError(t, err) + + ctx, cancel := context.WithTimeout(context.Background(), swarm.PrivateOtherDelay) + defer cancel() + err = h2.Connect(ctx, peer.AddrInfo{ID: h1.ID(), Addrs: h1.Addrs()}) + require.NoError(t, err) +} From e98affd7fc31cc506d5c80c67ff7afe7cf8b6d79 Mon Sep 17 00:00:00 2001 From: sukun Date: Wed, 4 Dec 2024 17:59:05 +0530 Subject: [PATCH 3/5] add a test that would've failed before increasing listenqueue size --- p2p/test/basichost/basic_host_test.go | 50 ++++++++++++++++++++++++++- 1 file changed, 49 insertions(+), 1 deletion(-) diff --git a/p2p/test/basichost/basic_host_test.go b/p2p/test/basichost/basic_host_test.go index 63aa3faf98..1ab2e520af 100644 --- a/p2p/test/basichost/basic_host_test.go +++ b/p2p/test/basichost/basic_host_test.go @@ -4,16 +4,19 @@ import ( "context" "fmt" "strings" + "sync" "testing" "time" "github.com/libp2p/go-libp2p" + "github.com/libp2p/go-libp2p/core/host" "github.com/libp2p/go-libp2p/core/network" "github.com/libp2p/go-libp2p/core/peer" "github.com/libp2p/go-libp2p/core/peerstore" "github.com/libp2p/go-libp2p/p2p/net/swarm" "github.com/libp2p/go-libp2p/p2p/protocol/circuitv2/client" "github.com/libp2p/go-libp2p/p2p/protocol/circuitv2/relay" + libp2pquic "github.com/libp2p/go-libp2p/p2p/transport/quic" libp2pwebrtc "github.com/libp2p/go-libp2p/p2p/transport/webrtc" libp2pwebtransport "github.com/libp2p/go-libp2p/p2p/transport/webtransport" ma "github.com/multiformats/go-multiaddr" @@ -218,8 +221,53 @@ func TestWebRTCDirectDialDelay(t *testing.T) { ) require.NoError(t, err) - ctx, cancel := context.WithTimeout(context.Background(), swarm.PrivateOtherDelay) + ctx, cancel := context.WithTimeout(context.Background(), swarm.PrivateOtherDelay-10*time.Millisecond) defer cancel() err = h2.Connect(ctx, peer.AddrInfo{ID: h1.ID(), Addrs: h1.Addrs()}) require.NoError(t, err) } + +func TestWebRTCWithQUICManyConnections(t *testing.T) { + h, err := libp2p.New( + libp2p.Transport(libp2pquic.NewTransport), + libp2p.Transport(libp2pwebrtc.New), + libp2p.ListenAddrStrings("/ip4/0.0.0.0/udp/0/quic-v1"), + libp2p.ListenAddrStrings("/ip4/0.0.0.0/udp/0/webrtc-direct"), + ) + require.NoError(t, err) + defer h.Close() + + const N = 50 + var dialers [N]host.Host + for i := 0; i < N; i++ { + dialers[i], err = libp2p.New(libp2p.NoListenAddrs) + require.NoError(t, err) + defer dialers[i].Close() + } + // only webrtc dialer for dialing the peer later + d, err := libp2p.New(libp2p.Transport(libp2pwebrtc.New), libp2p.NoListenAddrs) + require.NoError(t, err) + defer d.Close() + + startDial := make(chan struct{}) + var wg sync.WaitGroup + wg.Add(N) + for i := 0; i < N; i++ { + go func() { + defer wg.Done() + <-startDial + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + // it is fine if the dial fails, we just want to take up all the webrtc + // listen queue + _ = dialers[i].Connect(ctx, peer.AddrInfo{ID: h.ID(), Addrs: h.Addrs()}) + }() + } + close(startDial) + wg.Wait() + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + err = d.Connect(ctx, peer.AddrInfo{ID: h.ID(), Addrs: h.Addrs()}) + require.NoError(t, err) +} From faeda47002e890f98b71903c63199812fb25b925 Mon Sep 17 00:00:00 2001 From: sukun Date: Wed, 4 Dec 2024 18:00:18 +0530 Subject: [PATCH 4/5] add a test that would've failed before increasing listenqueue size --- p2p/test/basichost/basic_host_test.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/p2p/test/basichost/basic_host_test.go b/p2p/test/basichost/basic_host_test.go index 1ab2e520af..aa6ee688d0 100644 --- a/p2p/test/basichost/basic_host_test.go +++ b/p2p/test/basichost/basic_host_test.go @@ -207,7 +207,6 @@ func TestAddrFactorCertHashAppend(t *testing.T) { func TestWebRTCDirectDialDelay(t *testing.T) { // This tests that only webrtc-direct dials are dialled immediately // and not delayed by dial ranker. - h1, err := libp2p.New( libp2p.Transport(libp2pwebrtc.New), libp2p.ListenAddrStrings( @@ -228,6 +227,8 @@ func TestWebRTCDirectDialDelay(t *testing.T) { } func TestWebRTCWithQUICManyConnections(t *testing.T) { + // Correctly fixes: https://github.com/libp2p/js-libp2p/issues/2805 + h, err := libp2p.New( libp2p.Transport(libp2pquic.NewTransport), libp2p.Transport(libp2pwebrtc.New), From e675e4b509b991f7a78934c11f2b8472f289e116 Mon Sep 17 00:00:00 2001 From: sukun Date: Thu, 5 Dec 2024 02:22:42 +0530 Subject: [PATCH 5/5] improve test name --- p2p/net/swarm/dial_worker.go | 4 ++++ p2p/test/basichost/basic_host_test.go | 32 ++++++++++++--------------- 2 files changed, 18 insertions(+), 18 deletions(-) diff --git a/p2p/net/swarm/dial_worker.go b/p2p/net/swarm/dial_worker.go index 9d097f3c26..5bb9559e52 100644 --- a/p2p/net/swarm/dial_worker.go +++ b/p2p/net/swarm/dial_worker.go @@ -2,6 +2,7 @@ package swarm import ( "context" + "fmt" "math" "sync" "time" @@ -295,6 +296,9 @@ loop: } ad.dialed = true ad.dialRankingDelay = now.Sub(ad.createdAt) + if _, err := ad.addr.ValueForProtocol(ma.P_WEBRTC_DIRECT); err == nil { + fmt.Println("dial ranking delay", ad.addr, ad.dialRankingDelay) + } err := w.s.dialNextAddr(ad.ctx, w.peer, ad.addr, w.resch) if err != nil { // Errored without attempting a dial. This happens in case of diff --git a/p2p/test/basichost/basic_host_test.go b/p2p/test/basichost/basic_host_test.go index aa6ee688d0..0197387b1b 100644 --- a/p2p/test/basichost/basic_host_test.go +++ b/p2p/test/basichost/basic_host_test.go @@ -4,7 +4,6 @@ import ( "context" "fmt" "strings" - "sync" "testing" "time" @@ -204,7 +203,7 @@ func TestAddrFactorCertHashAppend(t *testing.T) { }, 5*time.Second, 100*time.Millisecond) } -func TestWebRTCDirectDialDelay(t *testing.T) { +func TestOnlyWebRTCDirectDialNoDelay(t *testing.T) { // This tests that only webrtc-direct dials are dialled immediately // and not delayed by dial ranker. h1, err := libp2p.New( @@ -229,46 +228,43 @@ func TestWebRTCDirectDialDelay(t *testing.T) { func TestWebRTCWithQUICManyConnections(t *testing.T) { // Correctly fixes: https://github.com/libp2p/js-libp2p/issues/2805 + // The server has both /quic-v1 and /webrtc-direct listen addresses h, err := libp2p.New( libp2p.Transport(libp2pquic.NewTransport), libp2p.Transport(libp2pwebrtc.New), libp2p.ListenAddrStrings("/ip4/0.0.0.0/udp/0/quic-v1"), libp2p.ListenAddrStrings("/ip4/0.0.0.0/udp/0/webrtc-direct"), + libp2p.ResourceManager(&network.NullResourceManager{}), ) require.NoError(t, err) defer h.Close() - const N = 50 + const N = 200 + // These N dialers have both /quic-v1 and /webrtc-direct transports var dialers [N]host.Host for i := 0; i < N; i++ { dialers[i], err = libp2p.New(libp2p.NoListenAddrs) require.NoError(t, err) defer dialers[i].Close() } - // only webrtc dialer for dialing the peer later + // This dialer has only /webrtc-direct transport d, err := libp2p.New(libp2p.Transport(libp2pwebrtc.New), libp2p.NoListenAddrs) require.NoError(t, err) defer d.Close() - startDial := make(chan struct{}) - var wg sync.WaitGroup - wg.Add(N) for i := 0; i < N; i++ { - go func() { - defer wg.Done() - <-startDial - ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) - defer cancel() - // it is fine if the dial fails, we just want to take up all the webrtc - // listen queue - _ = dialers[i].Connect(ctx, peer.AddrInfo{ID: h.ID(), Addrs: h.Addrs()}) - }() + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + // With happy eyeballs these dialers will connect over only /quic-v1 + // and not stall the /webrtc-direct handshake goroutines. + // it is fine if the dial fails, we just want to ensure that there's space + // in the /webrtc-direct listen queue + _ = dialers[i].Connect(ctx, peer.AddrInfo{ID: h.ID(), Addrs: h.Addrs()}) } - close(startDial) - wg.Wait() ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() + // The webrtc only dialer should be able to connect to the peer err = d.Connect(ctx, peer.AddrInfo{ID: h.ID(), Addrs: h.Addrs()}) require.NoError(t, err) }