From bbce7682f4f47976a7f8238dd623891f411d113f Mon Sep 17 00:00:00 2001 From: Marco Munizaga Date: Fri, 27 Jan 2023 12:55:56 -0800 Subject: [PATCH] rcmgr: Fix connection accounting --- p2p/transport/quic/conn.go | 6 +++++- p2p/transport/quic/listener.go | 19 +++++++++++-------- p2p/transport/quic/transport.go | 12 +++++++++--- 3 files changed, 25 insertions(+), 12 deletions(-) diff --git a/p2p/transport/quic/conn.go b/p2p/transport/quic/conn.go index 999615ceb8..261e19a2d1 100644 --- a/p2p/transport/quic/conn.go +++ b/p2p/transport/quic/conn.go @@ -32,8 +32,12 @@ var _ tpt.CapableConn = &conn{} // It must be called even if the peer closed the connection in order for // garbage collection to properly work in this package. func (c *conn) Close() error { + return c.closeWithError(0, "") +} + +func (c *conn) closeWithError(errCode quic.ApplicationErrorCode, errString string) error { c.transport.removeConn(c.quicConn) - err := c.quicConn.CloseWithError(0, "") + err := c.quicConn.CloseWithError(errCode, errString) c.scope.Done() return err } diff --git a/p2p/transport/quic/listener.go b/p2p/transport/quic/listener.go index ea6b68bd6c..08be39b83a 100644 --- a/p2p/transport/quic/listener.go +++ b/p2p/transport/quic/listener.go @@ -56,15 +56,13 @@ func (l *listener) Accept() (tpt.CapableConn, error) { } c, err := l.setupConn(qconn) if err != nil { - qconn.CloseWithError(1, err.Error()) continue } + l.transport.addConn(qconn, c) if l.transport.gater != nil && !(l.transport.gater.InterceptAccept(c) && l.transport.gater.InterceptSecured(network.DirInbound, c.remotePeerID, c)) { - c.scope.Done() - qconn.CloseWithError(errorCodeConnectionGating, "connection gated") + c.closeWithError(errorCodeConnectionGating, "connection gated") continue } - l.transport.addConn(qconn, c) // return through active hole punching if any key := holePunchKey{addr: qconn.RemoteAddr().String(), peer: c.remotePeerID} @@ -84,7 +82,7 @@ func (l *listener) Accept() (tpt.CapableConn, error) { } } -func (l *listener) setupConn(qconn quic.Connection) (*conn, error) { +func (l *listener) setupConn(qconn quic.Connection) (_c *conn, _err error) { remoteMultiaddr, err := quicreuse.ToQuicMultiaddr(qconn.RemoteAddr(), qconn.ConnectionState().Version) if err != nil { return nil, err @@ -95,23 +93,28 @@ func (l *listener) setupConn(qconn quic.Connection) (*conn, error) { log.Debugw("resource manager blocked incoming connection", "addr", qconn.RemoteAddr(), "error", err) return nil, err } + // err defer + defer func() { + if _err != nil { + qconn.CloseWithError(1, _err.Error()) + connScope.Done() + } + }() + // The tls.Config used to establish this connection already verified the certificate chain. // Since we don't have any way of knowing which tls.Config was used though, // we have to re-determine the peer's identity here. // Therefore, this is expected to never fail. remotePubKey, err := p2ptls.PubKeyFromCertChain(qconn.ConnectionState().TLS.PeerCertificates) if err != nil { - connScope.Done() return nil, err } remotePeerID, err := peer.IDFromPublicKey(remotePubKey) if err != nil { - connScope.Done() return nil, err } if err := connScope.SetPeer(remotePeerID); err != nil { log.Debugw("resource manager blocked incoming connection for peer", "peer", remotePeerID, "addr", qconn.RemoteAddr(), "error", err) - connScope.Done() return nil, err } diff --git a/p2p/transport/quic/transport.go b/p2p/transport/quic/transport.go index 325471973e..cb45b38b84 100644 --- a/p2p/transport/quic/transport.go +++ b/p2p/transport/quic/transport.go @@ -102,7 +102,7 @@ func NewTransport(key ic.PrivKey, connManager *quicreuse.ConnManager, psk pnet.P } // Dial dials a new QUIC connection -func (t *transport) Dial(ctx context.Context, raddr ma.Multiaddr, p peer.ID) (tpt.CapableConn, error) { +func (t *transport) Dial(ctx context.Context, raddr ma.Multiaddr, p peer.ID) (_c tpt.CapableConn, _err error) { tlsConf, keyCh := t.identity.ConfigForPeer(p) if ok, isClient, _ := network.GetSimultaneousConnect(ctx); ok && !isClient { return t.holePunch(ctx, raddr, p) @@ -113,9 +113,16 @@ func (t *transport) Dial(ctx context.Context, raddr ma.Multiaddr, p peer.ID) (tp log.Debugw("resource manager blocked outgoing connection", "peer", p, "addr", raddr, "error", err) return nil, err } + + // err defer + defer func() { + if _err != nil { + scope.Done() + } + }() + if err := scope.SetPeer(p); err != nil { log.Debugw("resource manager blocked outgoing connection for peer", "peer", p, "addr", raddr, "error", err) - scope.Done() return nil, err } pconn, err := t.connManager.DialQUIC(ctx, raddr, tlsConf, t.allowWindowIncrease) @@ -131,7 +138,6 @@ func (t *transport) Dial(ctx context.Context, raddr ma.Multiaddr, p peer.ID) (tp } if remotePubKey == nil { pconn.CloseWithError(1, "") - scope.Done() return nil, errors.New("p2p/transport/quic BUG: expected remote pub key to be set") }