From c9950e9682b9f94eb82848db044390bd50a5d372 Mon Sep 17 00:00:00 2001 From: Marco Munizaga Date: Mon, 11 Nov 2024 19:52:59 -0800 Subject: [PATCH 1/2] fix: identify: push should not dial a new connection --- p2p/protocol/identify/id.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/p2p/protocol/identify/id.go b/p2p/protocol/identify/id.go index 1733a4166c..146d58d9d7 100644 --- a/p2p/protocol/identify/id.go +++ b/p2p/protocol/identify/id.go @@ -347,6 +347,10 @@ func (ids *idService) sendPushes(ctx context.Context) { defer func() { <-sem }() ctx, cancel := context.WithTimeout(ctx, 5*time.Second) defer cancel() + + // We only want to send an identify push if we already have an open + // connection. + ctx = network.WithNoDial(ctx, "id push") str, err := ids.Host.NewStream(ctx, c.RemotePeer(), IDPush) if err != nil { // connection might have been closed recently return From b53c174b85a95b24cda0a4ff7d2dfb1c14b9ccfb Mon Sep 17 00:00:00 2001 From: Marco Munizaga Date: Tue, 12 Nov 2024 15:43:06 -0800 Subject: [PATCH 2/2] send IDPush on given conn --- p2p/protocol/identify/id.go | 36 +++++++++++++++++++++++------------- 1 file changed, 23 insertions(+), 13 deletions(-) diff --git a/p2p/protocol/identify/id.go b/p2p/protocol/identify/id.go index 146d58d9d7..b6d5240ba6 100644 --- a/p2p/protocol/identify/id.go +++ b/p2p/protocol/identify/id.go @@ -348,10 +348,7 @@ func (ids *idService) sendPushes(ctx context.Context) { ctx, cancel := context.WithTimeout(ctx, 5*time.Second) defer cancel() - // We only want to send an identify push if we already have an open - // connection. - ctx = network.WithNoDial(ctx, "id push") - str, err := ids.Host.NewStream(ctx, c.RemotePeer(), IDPush) + str, err := newStreamAndNegotiate(ctx, c, IDPush) if err != nil { // connection might have been closed recently return } @@ -441,25 +438,38 @@ func (ids *idService) IdentifyWait(c network.Conn) <-chan struct{} { return e.IdentifyWaitChan } -func (ids *idService) identifyConn(c network.Conn) error { - ctx, cancel := context.WithTimeout(context.Background(), Timeout) - defer cancel() +// newStreamAndNegotiate opens a new stream on the given connection and negotiates the given protocol. +func newStreamAndNegotiate(ctx context.Context, c network.Conn, proto protocol.ID) (network.Stream, error) { s, err := c.NewStream(network.WithAllowLimitedConn(ctx, "identify")) if err != nil { log.Debugw("error opening identify stream", "peer", c.RemotePeer(), "error", err) - return err + return nil, err + } + err = s.SetDeadline(time.Now().Add(Timeout)) + if err != nil { + return nil, err } - s.SetDeadline(time.Now().Add(Timeout)) - if err := s.SetProtocol(ID); err != nil { + if err := s.SetProtocol(proto); err != nil { log.Warnf("error setting identify protocol for stream: %s", err) - s.Reset() + _ = s.Reset() } // ok give the response to our handler. - if err := msmux.SelectProtoOrFail(ID, s); err != nil { + if err := msmux.SelectProtoOrFail(proto, s); err != nil { log.Infow("failed negotiate identify protocol with peer", "peer", c.RemotePeer(), "error", err) - s.Reset() + _ = s.Reset() + return nil, err + } + return s, nil +} + +func (ids *idService) identifyConn(c network.Conn) error { + ctx, cancel := context.WithTimeout(context.Background(), Timeout) + defer cancel() + s, err := newStreamAndNegotiate(network.WithAllowLimitedConn(ctx, "identify"), c, ID) + if err != nil { + log.Debugw("error opening identify stream", "peer", c.RemotePeer(), "error", err) return err }