From 994b69e4a163483b85089a2a6ba97d81208e85d1 Mon Sep 17 00:00:00 2001 From: Marco Munizaga Date: Mon, 4 Nov 2024 09:42:44 -0800 Subject: [PATCH 1/6] fix: basichost: Use NegotiationTimeout as fallback timeout for NewStream (#3020) --- p2p/host/basic/basic_host.go | 15 +++++++-- p2p/host/basic/basic_host_test.go | 54 +++++++++++++++++++++++++++++++ 2 files changed, 66 insertions(+), 3 deletions(-) diff --git a/p2p/host/basic/basic_host.go b/p2p/host/basic/basic_host.go index f7d3c5275a..820411bd27 100644 --- a/p2p/host/basic/basic_host.go +++ b/p2p/host/basic/basic_host.go @@ -122,9 +122,10 @@ type HostOpts struct { // MultistreamMuxer is essential for the *BasicHost and will use a sensible default value if omitted. MultistreamMuxer *msmux.MultistreamMuxer[protocol.ID] - // NegotiationTimeout determines the read and write timeouts on streams. - // If 0 or omitted, it will use DefaultNegotiationTimeout. - // If below 0, timeouts on streams will be deactivated. + // NegotiationTimeout determines the read and write timeouts when negotiating + // protocols for streams. If 0 or omitted, it will use + // DefaultNegotiationTimeout. If below 0, timeouts on streams will be + // deactivated. NegotiationTimeout time.Duration // AddrsFactory holds a function which can be used to override or filter the result of Addrs. @@ -689,6 +690,14 @@ func (h *BasicHost) RemoveStreamHandler(pid protocol.ID) { // to create one. If ProtocolID is "", writes no header. // (Thread-safe) func (h *BasicHost) NewStream(ctx context.Context, p peer.ID, pids ...protocol.ID) (str network.Stream, strErr error) { + if _, ok := ctx.Deadline(); !ok { + if h.negtimeout > 0 { + var cancel context.CancelFunc + ctx, cancel = context.WithTimeout(ctx, h.negtimeout) + defer cancel() + } + } + // If the caller wants to prevent the host from dialing, it should use the NoDial option. if nodial, _ := network.GetNoDial(ctx); !nodial { err := h.Connect(ctx, peer.AddrInfo{ID: p}) diff --git a/p2p/host/basic/basic_host_test.go b/p2p/host/basic/basic_host_test.go index 1ab98aae9d..2a7a772976 100644 --- a/p2p/host/basic/basic_host_test.go +++ b/p2p/host/basic/basic_host_test.go @@ -2,6 +2,7 @@ package basichost import ( "context" + "encoding/binary" "fmt" "io" "reflect" @@ -941,3 +942,56 @@ func TestTrimHostAddrList(t *testing.T) { }) } } + +func TestHostTimeoutNewStream(t *testing.T) { + h1, err := NewHost(swarmt.GenSwarm(t), nil) + require.NoError(t, err) + h1.Start() + defer h1.Close() + + const proto = "/testing" + h2 := swarmt.GenSwarm(t) + + h2.SetStreamHandler(func(s network.Stream) { + // First message is multistream header. Just echo it + msHeader := []byte("\x19/multistream/1.0.0\n") + _, err := s.Read(msHeader) + assert.NoError(t, err) + _, err = s.Write(msHeader) + assert.NoError(t, err) + + buf := make([]byte, 1024) + n, err := s.Read(buf) + assert.NoError(t, err) + + msgLen, varintN := binary.Uvarint(buf[:n]) + buf = buf[varintN:] + proto := buf[:int(msgLen)] + if string(proto) == "/ipfs/id/1.0.0\n" { + // Signal we don't support identify + na := []byte("na\n") + n := binary.PutUvarint(buf, uint64(len(na))) + copy(buf[n:], na) + + _, err = s.Write(buf[:int(n)+len(na)]) + assert.NoError(t, err) + } else { + // Stall + time.Sleep(5 * time.Second) + } + t.Log("Resetting") + s.Reset() + }) + + err = h1.Connect(context.Background(), peer.AddrInfo{ + ID: h2.LocalPeer(), + Addrs: h2.ListenAddresses(), + }) + require.NoError(t, err) + + // No context passed in, fallback to negtimeout + h1.negtimeout = time.Second + _, err = h1.NewStream(context.Background(), h2.LocalPeer(), proto) + require.Error(t, err) + require.ErrorContains(t, err, "context deadline exceeded") +} From 2404f75d8f4f3e566fc09a43131b9d7239ad3a0b Mon Sep 17 00:00:00 2001 From: sukun Date: Mon, 18 Nov 2024 12:03:43 +0530 Subject: [PATCH 2/6] webrtc: handshake more connections in parallel (#3040) --- p2p/transport/webrtc/listener.go | 23 ++++++++++++----------- 1 file changed, 12 insertions(+), 11 deletions(-) diff --git a/p2p/transport/webrtc/listener.go b/p2p/transport/webrtc/listener.go index d4ba3c0550..c3e2f29799 100644 --- a/p2p/transport/webrtc/listener.go +++ b/p2p/transport/webrtc/listener.go @@ -33,8 +33,12 @@ func (c *connMultiaddrs) LocalMultiaddr() ma.Multiaddr { return c.local } func (c *connMultiaddrs) RemoteMultiaddr() ma.Multiaddr { return c.remote } const ( - candidateSetupTimeout = 20 * time.Second - DefaultMaxInFlightConnections = 10 + candidateSetupTimeout = 10 * time.Second + // This is higher than other transports(64) as there's no way to detect a peer that has gone away after + // sending the initial connection request message(STUN Binding request). Such peers take up a goroutine + // till connection timeout. As the number of handshakes in parallel is still guarded by the resource + // manager, this higher number is okay. + DefaultMaxInFlightConnections = 128 ) type listener struct { @@ -325,8 +329,7 @@ func (l *listener) Multiaddr() ma.Multiaddr { // addOnConnectionStateChangeCallback adds the OnConnectionStateChange to the PeerConnection. // The channel returned here: // * is closed when the state changes to Connection -// * receives an error when the state changes to Failed -// * doesn't receive anything (nor is closed) when the state changes to Disconnected +// * receives an error when the state changes to Failed or Closed or Disconnected func addOnConnectionStateChangeCallback(pc *webrtc.PeerConnection) <-chan error { errC := make(chan error, 1) var once sync.Once @@ -334,17 +337,15 @@ func addOnConnectionStateChangeCallback(pc *webrtc.PeerConnection) <-chan error switch pc.ConnectionState() { case webrtc.PeerConnectionStateConnected: once.Do(func() { close(errC) }) - case webrtc.PeerConnectionStateFailed: + // PeerConnectionStateFailed happens when we fail to negotiate the connection. + // PeerConnectionStateDisconnected happens when we disconnect immediately after connecting. + // PeerConnectionStateClosed happens when we close the peer connection locally, not when remote closes. We don't need + // to error in this case, but it's a no-op, so it doesn't hurt. + case webrtc.PeerConnectionStateFailed, webrtc.PeerConnectionStateClosed, webrtc.PeerConnectionStateDisconnected: once.Do(func() { errC <- errors.New("peerconnection failed") close(errC) }) - case webrtc.PeerConnectionStateDisconnected: - // the connection can move to a disconnected state and back to a connected state without ICE renegotiation. - // This could happen when underlying UDP packets are lost, and therefore the connection moves to the disconnected state. - // If the connection then receives packets on the connection, it can move back to the connected state. - // If no packets are received until the failed timeout is triggered, the connection moves to the failed state. - log.Warn("peerconnection disconnected") } }) return errC From 09ae1790ef1c1750b9e80802b4cfcf27645fdcc2 Mon Sep 17 00:00:00 2001 From: Marco Munizaga Date: Mon, 18 Nov 2024 14:08:57 -0800 Subject: [PATCH 3/6] fix: obsaddr: do not record observations over relayed conn (#3043) These observations are telling us what the relay's IP address is. This isn't useful for us at all. This also solves an issue during hole punching where we may report the relay's address as our own. --- p2p/protocol/identify/obsaddr.go | 13 ++++++++++++- p2p/protocol/identify/obsaddr_glass_test.go | 18 ++++++++++++++++++ 2 files changed, 30 insertions(+), 1 deletion(-) diff --git a/p2p/protocol/identify/obsaddr.go b/p2p/protocol/identify/obsaddr.go index ffe60345e1..06e54bf5fd 100644 --- a/p2p/protocol/identify/obsaddr.go +++ b/p2p/protocol/identify/obsaddr.go @@ -335,6 +335,11 @@ func (o *ObservedAddrManager) worker() { } } +func isRelayedAddress(a ma.Multiaddr) bool { + _, err := a.ValueForProtocol(ma.P_CIRCUIT) + return err == nil +} + func (o *ObservedAddrManager) shouldRecordObservation(conn connMultiaddrs, observed ma.Multiaddr) (shouldRecord bool, localTW thinWaist, observedTW thinWaist) { if conn == nil || observed == nil { return false, thinWaist{}, thinWaist{} @@ -350,6 +355,12 @@ func (o *ObservedAddrManager) shouldRecordObservation(conn connMultiaddrs, obser return false, thinWaist{}, thinWaist{} } + // Ignore p2p-circuit addresses. These are the observed address of the relay. + // Not useful for us. + if isRelayedAddress(observed) { + return false, thinWaist{}, thinWaist{} + } + // we should only use ObservedAddr when our connection's LocalAddr is one // of our ListenAddrs. If we Dial out using an ephemeral addr, knowing that // address's external mapping is not very useful because the port will not be @@ -410,7 +421,7 @@ func (o *ObservedAddrManager) maybeRecordObservation(conn connMultiaddrs, observ if !shouldRecord { return } - log.Debugw("added own observed listen addr", "observed", observed) + log.Debugw("added own observed listen addr", "conn", conn, "observed", observed) o.mu.Lock() defer o.mu.Unlock() diff --git a/p2p/protocol/identify/obsaddr_glass_test.go b/p2p/protocol/identify/obsaddr_glass_test.go index 31fd4f5726..3211aa5f54 100644 --- a/p2p/protocol/identify/obsaddr_glass_test.go +++ b/p2p/protocol/identify/obsaddr_glass_test.go @@ -53,6 +53,24 @@ func TestShouldRecordObservationWithWebTransport(t *testing.T) { require.True(t, shouldRecord) } +func TestShouldNotRecordObservationWithRelayedAddr(t *testing.T) { + listenAddr := ma.StringCast("/ip4/1.2.3.4/udp/8888/quic-v1/p2p-circuit") + ifaceAddr := ma.StringCast("/ip4/10.0.0.2/udp/9999/quic-v1") + listenAddrs := func() []ma.Multiaddr { return []ma.Multiaddr{listenAddr} } + ifaceListenAddrs := func() ([]ma.Multiaddr, error) { return []ma.Multiaddr{ifaceAddr}, nil } + addrs := func() []ma.Multiaddr { return []ma.Multiaddr{listenAddr} } + + c := &mockConn{ + local: listenAddr, + remote: ma.StringCast("/ip4/1.2.3.6/udp/1236/quic-v1/p2p-circuit"), + } + observedAddr := ma.StringCast("/ip4/1.2.3.4/udp/1231/quic-v1/p2p-circuit") + o, err := NewObservedAddrManager(listenAddrs, addrs, ifaceListenAddrs, normalize) + require.NoError(t, err) + shouldRecord, _, _ := o.shouldRecordObservation(c, observedAddr) + require.False(t, shouldRecord) +} + func TestShouldRecordObservationWithNAT64Addr(t *testing.T) { listenAddr1 := ma.StringCast("/ip4/0.0.0.0/tcp/1234") ifaceAddr1 := ma.StringCast("/ip4/10.0.0.2/tcp/4321") From df02f4d0a19fb9a31dc12a626e0066ca927be911 Mon Sep 17 00:00:00 2001 From: Marco Munizaga Date: Mon, 18 Nov 2024 14:38:22 -0800 Subject: [PATCH 4/6] chore: update go-multistream to v0.6.0 (#3041) Brings from go-multistream that lets us wait for a handshake to finish before closing the stream. --- go.mod | 2 +- go.sum | 4 ++-- test-plans/go.mod | 2 +- test-plans/go.sum | 4 ++-- 4 files changed, 6 insertions(+), 6 deletions(-) diff --git a/go.mod b/go.mod index 9966beb932..c6f3f2b324 100644 --- a/go.mod +++ b/go.mod @@ -44,7 +44,7 @@ require ( github.com/multiformats/go-multibase v0.2.0 github.com/multiformats/go-multicodec v0.9.0 github.com/multiformats/go-multihash v0.2.3 - github.com/multiformats/go-multistream v0.5.0 + github.com/multiformats/go-multistream v0.6.0 github.com/multiformats/go-varint v0.0.7 github.com/pbnjay/memory v0.0.0-20210728143218-7b4eea64cf58 github.com/pion/datachannel v1.5.9 diff --git a/go.sum b/go.sum index 5c288ad7ab..4650eef1f0 100644 --- a/go.sum +++ b/go.sum @@ -246,8 +246,8 @@ github.com/multiformats/go-multicodec v0.9.0/go.mod h1:L3QTQvMIaVBkXOXXtVmYE+LI1 github.com/multiformats/go-multihash v0.0.8/go.mod h1:YSLudS+Pi8NHE7o6tb3D8vrpKa63epEDmG8nTduyAew= github.com/multiformats/go-multihash v0.2.3 h1:7Lyc8XfX/IY2jWb/gI7JP+o7JEq9hOa7BFvVU9RSh+U= github.com/multiformats/go-multihash v0.2.3/go.mod h1:dXgKXCXjBzdscBLk9JkjINiEsCKRVch90MdaGiKsvSM= -github.com/multiformats/go-multistream v0.5.0 h1:5htLSLl7lvJk3xx3qT/8Zm9J4K8vEOf/QGkvOGQAyiE= -github.com/multiformats/go-multistream v0.5.0/go.mod h1:n6tMZiwiP2wUsR8DgfDWw1dydlEqV3l6N3/GBsX6ILA= +github.com/multiformats/go-multistream v0.6.0 h1:ZaHKbsL404720283o4c/IHQXiS6gb8qAN5EIJ4PN5EA= +github.com/multiformats/go-multistream v0.6.0/go.mod h1:MOyoG5otO24cHIg8kf9QW2/NozURlkP/rvi2FQJyCPg= github.com/multiformats/go-varint v0.0.7 h1:sWSGR+f/eu5ABZA2ZpYKBILXTTs9JWpdEM/nEGOHFS8= github.com/multiformats/go-varint v0.0.7/go.mod h1:r8PUYw/fD/SjBCiKOoDlGF6QawOELpZAu9eioSos/OU= github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq1c1nUAm88MOHcQC9l5mIlSMApZMrHA= diff --git a/test-plans/go.mod b/test-plans/go.mod index 8e670a5129..b2eee27810 100644 --- a/test-plans/go.mod +++ b/test-plans/go.mod @@ -61,7 +61,7 @@ require ( github.com/multiformats/go-multibase v0.2.0 // indirect github.com/multiformats/go-multicodec v0.9.0 // indirect github.com/multiformats/go-multihash v0.2.3 // indirect - github.com/multiformats/go-multistream v0.5.0 // indirect + github.com/multiformats/go-multistream v0.6.0 // indirect github.com/multiformats/go-varint v0.0.7 // indirect github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect github.com/onsi/ginkgo/v2 v2.20.2 // indirect diff --git a/test-plans/go.sum b/test-plans/go.sum index 2b532868ba..cbb839c369 100644 --- a/test-plans/go.sum +++ b/test-plans/go.sum @@ -196,8 +196,8 @@ github.com/multiformats/go-multicodec v0.9.0/go.mod h1:L3QTQvMIaVBkXOXXtVmYE+LI1 github.com/multiformats/go-multihash v0.0.8/go.mod h1:YSLudS+Pi8NHE7o6tb3D8vrpKa63epEDmG8nTduyAew= github.com/multiformats/go-multihash v0.2.3 h1:7Lyc8XfX/IY2jWb/gI7JP+o7JEq9hOa7BFvVU9RSh+U= github.com/multiformats/go-multihash v0.2.3/go.mod h1:dXgKXCXjBzdscBLk9JkjINiEsCKRVch90MdaGiKsvSM= -github.com/multiformats/go-multistream v0.5.0 h1:5htLSLl7lvJk3xx3qT/8Zm9J4K8vEOf/QGkvOGQAyiE= -github.com/multiformats/go-multistream v0.5.0/go.mod h1:n6tMZiwiP2wUsR8DgfDWw1dydlEqV3l6N3/GBsX6ILA= +github.com/multiformats/go-multistream v0.6.0 h1:ZaHKbsL404720283o4c/IHQXiS6gb8qAN5EIJ4PN5EA= +github.com/multiformats/go-multistream v0.6.0/go.mod h1:MOyoG5otO24cHIg8kf9QW2/NozURlkP/rvi2FQJyCPg= github.com/multiformats/go-varint v0.0.7 h1:sWSGR+f/eu5ABZA2ZpYKBILXTTs9JWpdEM/nEGOHFS8= github.com/multiformats/go-varint v0.0.7/go.mod h1:r8PUYw/fD/SjBCiKOoDlGF6QawOELpZAu9eioSos/OU= github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq1c1nUAm88MOHcQC9l5mIlSMApZMrHA= From ae0d78aae140b25c7ace752d084eb4695ccda224 Mon Sep 17 00:00:00 2001 From: Marco Munizaga Date: Mon, 18 Nov 2024 14:38:43 -0800 Subject: [PATCH 5/6] fix: Defer resource usage cleanup until the very end (#3042) --- p2p/protocol/circuitv2/relay/relay.go | 4 ++-- p2p/transport/webtransport/conn.go | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/p2p/protocol/circuitv2/relay/relay.go b/p2p/protocol/circuitv2/relay/relay.go index 2ee237d97b..326d17781b 100644 --- a/p2p/protocol/circuitv2/relay/relay.go +++ b/p2p/protocol/circuitv2/relay/relay.go @@ -118,7 +118,7 @@ func (r *Relay) Close() error { r.host.RemoveStreamHandler(proto.ProtoIDv2Hop) r.host.Network().StopNotify(r.notifiee) - r.scope.Done() + defer r.scope.Done() r.cancel() r.gc() if r.metricsTracer != nil { @@ -315,7 +315,7 @@ func (r *Relay) handleConnect(s network.Stream, msg *pbv2.HopMessage) pbv2.Statu connStTime := time.Now() cleanup := func() { - span.Done() + defer span.Done() r.mx.Lock() r.rmConn(src) r.rmConn(dest.ID) diff --git a/p2p/transport/webtransport/conn.go b/p2p/transport/webtransport/conn.go index 0525124711..d914398e0e 100644 --- a/p2p/transport/webtransport/conn.go +++ b/p2p/transport/webtransport/conn.go @@ -71,7 +71,7 @@ func (c *conn) allowWindowIncrease(size uint64) bool { // It must be called even if the peer closed the connection in order for // garbage collection to properly work in this package. func (c *conn) Close() error { - c.scope.Done() + defer c.scope.Done() c.transport.removeConn(c.session) err := c.session.CloseWithError(0, "") _ = c.qconn.CloseWithError(1, "") From 0606295e010406ab8bdf15227b9b090cbadeeaf8 Mon Sep 17 00:00:00 2001 From: Marco Munizaga Date: Tue, 19 Nov 2024 14:04:47 -0800 Subject: [PATCH 6/6] refactor(libp2phttp): don't require specific port for the HTTP host example (#3047) --- p2p/http/example_test.go | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) diff --git a/p2p/http/example_test.go b/p2p/http/example_test.go index 7073b7f0e3..8e94f6e7e0 100644 --- a/p2p/http/example_test.go +++ b/p2p/http/example_test.go @@ -6,6 +6,7 @@ import ( "log" "net" "net/http" + "regexp" "strings" "github.com/libp2p/go-libp2p" @@ -125,18 +126,24 @@ func ExampleHost_overLibp2pStreams() { // Output: Hello HTTP } +var tcpPortRE = regexp.MustCompile(`/tcp/(\d+)`) + func ExampleHost_Serve() { server := libp2phttp.Host{ InsecureAllowHTTP: true, // For our example, we'll allow insecure HTTP - ListenAddrs: []ma.Multiaddr{ma.StringCast("/ip4/127.0.0.1/tcp/50221/http")}, + ListenAddrs: []ma.Multiaddr{ma.StringCast("/ip4/127.0.0.1/tcp/0/http")}, } go server.Serve() defer server.Close() - fmt.Println(server.Addrs()) + for _, a := range server.Addrs() { + s := a.String() + addrWithoutSpecificPort := tcpPortRE.ReplaceAllString(s, "/tcp/") + fmt.Println(addrWithoutSpecificPort) + } - // Output: [/ip4/127.0.0.1/tcp/50221/http] + // Output: /ip4/127.0.0.1/tcp//http } func ExampleHost_SetHTTPHandler() {