diff --git a/.vscode/settings.json b/.vscode/settings.json new file mode 100644 index 0000000000..6c6f460db0 --- /dev/null +++ b/.vscode/settings.json @@ -0,0 +1,13 @@ +{ + "[markdown]": { + "editor.unicodeHighlight.ambiguousCharacters": false, + "editor.unicodeHighlight.invisibleCharacters": false, + "editor.wordWrap": "on", + "editor.quickSuggestions": { + "comments": "off", + "strings": "off", + "other": "off" + }, + "editor.formatOnSave": false + } +} diff --git a/doc/CONFIGURATION.md b/doc/CONFIGURATION.md index 78c09f84a7..e3ea10497f 100644 --- a/doc/CONFIGURATION.md +++ b/doc/CONFIGURATION.md @@ -556,7 +556,8 @@ const node = await createLibp2p({ streamMuxers: [mplex()], connectionEncryption: [noise()], connectionManager: { - maxConnections: Infinity, + maxIncomingConnections: Infinity, + maxOutgoingConnections: Infinity, minConnections: 0, pollInterval: 2000, // The below values will only be taken into account when Metrics are enabled diff --git a/doc/LIMITS.md b/doc/LIMITS.md index 6173b413a7..b7b50f1494 100644 --- a/doc/LIMITS.md +++ b/doc/LIMITS.md @@ -23,8 +23,6 @@ This is important for [DoS](https://en.wikipedia.org/wiki/Denial-of-service_atta It's possible to limit the total amount of connections a node is able to make (combining incoming and outgoing). When this limit is reached and an attempt to open a new connection is made, existing connections may be closed to make room for the new connection (see [Closing connections][#closing-connections]). -* Note: there currently isn't a way to specify different limits for incoming vs. outgoing. Connection limits are applied across both incoming and outgoing connections combined. There is a backlog item for this [here](https://github.com/libp2p/js-libp2p/issues/1508). - We can also limit the number of connections in a "pending" state. These connections have been opened by a remote peer but peer IDs have yet to be exchanged and/or connection encryption and multiplexing negotiated. Once this limit is hit further connections will be closed unless the remote peer has an address in the [allow list](#allowdeny-lists). All fields are optional. The default values are defined in [src/connection-manager/index.ts](https://github.com/libp2p/js-libp2p/blob/master/src/connection-manager/index.ts) - please see that file for the current values. @@ -33,9 +31,14 @@ All fields are optional. The default values are defined in [src/connection-manag const node = await createLibp2pNode({ connectionManager: { /** - * The total number of connections allowed to be open at one time + * The total number of incoming connections allowed to be open at one time */ - maxConnections: number + maxIncomingConnections: number + + /** + * The total number of outgoing connections allowed to be open at one time + */ + maxOutgoingConnections: number /** * If the number of open connections goes below this number, the node @@ -255,9 +258,9 @@ const node = await createLibp2pNode({ /** * Once this many connections are open on this listener any further connections * will be rejected - this will have no effect if it is larger than the value - * configured for the ConnectionManager maxConnections parameter + * configured for the ConnectionManager maxIncomingConnections parameter */ - maxConnections: number + maxIncomingConnections: number }) ] }) @@ -310,7 +313,7 @@ Important details for ascertaining this are: As a result, the max amount of memory buffered by libp2p is approximately: ``` -connectionManager.maxConnections * +(connectionManager.maxOutgoingConnections + connectionManager.maxIncomingConnections) * (muxer.maxUnprocessedMessageQueueSize + (muxer.maxInboundStreams * muxer.maxStreamBufferSize) + (muxer.maxOutboundStreams * muxer.maxStreamBufferSize) diff --git a/src/config.ts b/src/config.ts index e856061c99..9cb805622a 100644 --- a/src/config.ts +++ b/src/config.ts @@ -20,7 +20,8 @@ const DefaultConfig: Partial = { announceFilter: (multiaddrs: Multiaddr[]) => multiaddrs }, connectionManager: { - maxConnections: 300, + maxIncomingConnections: 300, + maxOutgoingConnections: 300, minConnections: 50, autoDial: true, autoDialInterval: 10000, diff --git a/src/connection-manager/auto-dialler.ts b/src/connection-manager/auto-dialler.ts index 2b26b00ad2..136ba73a6f 100644 --- a/src/connection-manager/auto-dialler.ts +++ b/src/connection-manager/auto-dialler.ts @@ -20,7 +20,7 @@ export interface AutoDiallerInit { enabled?: boolean /** - * The minimum number of connections to avoid pruning + * The minimum number of incoming connections to avoid pruning */ minConnections?: number @@ -107,7 +107,7 @@ export class AutoDialler implements Startable { this.autoDialTimeout.clear() } - const minConnections = this.options.minConnections + const { minConnections } = this.options // Already has enough connections if (this.components.connectionManager.getConnections().length >= minConnections) { diff --git a/src/connection-manager/index.ts b/src/connection-manager/index.ts index 0c40953863..eb74810c8e 100644 --- a/src/connection-manager/index.ts +++ b/src/connection-manager/index.ts @@ -27,7 +27,12 @@ export interface ConnectionManagerConfig { /** * The maximum number of connections libp2p is willing to have before it starts disconnecting. Defaults to `Infinity` */ - maxConnections: number + maxIncomingConnections: number + + /** + * The maximum number of outgoing connections to keep open + */ + maxOutgoingConnections: number /** * The minimum number of connections below which libp2p not activate preemptive disconnections. Defaults to `0`. @@ -100,7 +105,7 @@ export interface ConnectionManagerConfig { /** * A list of multiaddrs that will always be allowed (except if they are in the - * deny list) to open connections to this node even if we've reached maxConnections + * deny list) to open connections to this node even if we've reached maxnOutgoingConnections */ allow?: string[] @@ -124,7 +129,8 @@ export interface ConnectionManagerConfig { } const defaultOptions: Partial = { - maxConnections: Infinity, + maxIncomingConnections: Infinity, + maxOutgoingConnections: Infinity, minConnections: 0, maxEventLoopDelay: Infinity, pollInterval: 2000, @@ -167,8 +173,11 @@ export class DefaultConnectionManager extends EventEmitter { + if (connection.stat.direction === 'inbound') { + inboundConnections++ + } else { + outboundConnections++ + } + }) + + const numIncomingToPrune = inboundConnections - this.opts.maxIncomingConnections + const numOutgoingToPrune = outboundConnections - this.opts.maxOutgoingConnections + + await this._checkMaxLimit('maxOutgoingConnections', outboundConnections, numOutgoingToPrune) + await this._checkMaxLimit('maxIncomingConnections', inboundConnections, numIncomingToPrune) - await this._checkMaxLimit('maxConnections', numConnections, toPrune) this.dispatchEvent(new CustomEvent('peer:connect', { detail: connection })) } @@ -489,6 +512,16 @@ export class DefaultConnectionManager extends EventEmitter connection.stat.direction === 'outbound').length + + if ((totalOutboundConnections + 1) > this.opts.maxOutgoingConnections) { + throw errCode( + new Error('Connection Manager max connections exceeded'), + codes.ERR_CONNECTION_DENIED + ) + } + let timeoutController: TimeoutController | undefined if (options?.signal == null) { @@ -684,23 +717,27 @@ export class DefaultConnectionManager extends EventEmitter= Number(this.opts.maxIncomingPendingConnections)) { + log('connection from %s refused - incomingPendingConnections exceeded by peer %s', maConn.remoteAddr) + return false + } + + const connections = this.getConnections() + + const totalIncomingConnections = connections.filter(connection => connection.stat.direction === 'inbound').length + // check allow list const allowConnection = this.allow.some(ma => { return maConn.remoteAddr.toString().startsWith(ma.toString()) }) - if (allowConnection) { - this.incomingPendingConnections++ - - return true - } - - // check pending connections - if (this.incomingPendingConnections === this.opts.maxIncomingPendingConnections) { - log('connection from %s refused - incomingPendingConnections exceeded by peer %s', maConn.remoteAddr) + if ((totalIncomingConnections + 1 > this.opts.maxIncomingConnections) && !allowConnection) { + log('connection from %s refused - maxIncomingConnections exceeded', maConn.remoteAddr) return false } + // Check the rate limiter if (maConn.remoteAddr.isThinWaistAddress()) { const host = maConn.remoteAddr.nodeAddress().address @@ -712,14 +749,9 @@ export class DefaultConnectionManager extends EventEmitter { upgrader, peerStore }, { - maxConnections: 1000, + maxIncomingConnections: 1000, minConnections: 50, + maxOutgoingConnections: 1000, autoDialInterval: 1000, inboundUpgradeTimeout: 1000 }) @@ -97,8 +98,9 @@ describe('Connection Manager', () => { upgrader, peerStore }, { - maxConnections: 1000, + maxIncomingConnections: 1000, minConnections: 50, + maxOutgoingConnections: 1000, autoDialInterval: 1000, inboundUpgradeTimeout: 1000 }) @@ -233,7 +235,7 @@ describe('libp2p.connections', () => { }, connectionManager: { minConnections, - maxConnections: 1 + maxIncomingConnections: 1 } } }) @@ -265,7 +267,7 @@ describe('libp2p.connections', () => { }, connectionManager: { minConnections, - maxConnections: 1 + maxIncomingConnections: 1 } } }) diff --git a/test/connection-manager/index.spec.ts b/test/connection-manager/index.spec.ts index b5668b2148..054b2abad7 100644 --- a/test/connection-manager/index.spec.ts +++ b/test/connection-manager/index.spec.ts @@ -17,10 +17,12 @@ import type { Dialer } from '@libp2p/interface-connection-manager' import type { Connection } from '@libp2p/interface-connection' import type { Upgrader } from '@libp2p/interface-transport' import type { PeerStore } from '@libp2p/interface-peer-store' +import { codes as ErrorCodes } from '../../src/errors.js' const defaultOptions = { - maxConnections: 10, + maxIncomingConnections: 10, minConnections: 1, + maxOutgoingConnections: 10, autoDialInterval: Infinity, inboundUpgradeTimeout: 10000 } @@ -69,7 +71,7 @@ describe('Connection Manager', () => { libp2p = await createNode({ config: createBaseOptions({ connectionManager: { - maxConnections: max, + maxIncomingConnections: max, minConnections: 2 } }), @@ -119,7 +121,7 @@ describe('Connection Manager', () => { libp2p = await createNode({ config: createBaseOptions({ connectionManager: { - maxConnections: max, + maxIncomingConnections: max, minConnections: 2 } }), @@ -169,12 +171,77 @@ describe('Connection Manager', () => { expect(shortestLivedWithLowestTagSpy).to.have.property('callCount', 1) }) - it('should close connection when the maximum has been reached even without tags', async () => { + it('should not close connection that is on the allowlist when pruning', async () => { + const max = 5 + const remoteAddrPeerId = await createEd25519PeerId() + + libp2p = await createNode({ + config: createBaseOptions({ + connectionManager: { + maxIncomingConnections: max, + minConnections: 0, + allow: [ + '/ip4/83.13.55.32' + ] + } + }), + started: false + }) + + await libp2p.start() + + const connectionManager = libp2p.connectionManager as DefaultConnectionManager + const connectionManagerMaybeDisconnectOneSpy = sinon.spy(connectionManager, '_pruneConnections') + const spies = new Map>>() + + // Max out connections + for (let i = 1; i < max; i++) { + const connection = mockConnection(mockMultiaddrConnection(mockDuplex(), await createEd25519PeerId())) + const spy = sinon.spy(connection, 'close') + const value = i * 10 + spies.set(value, spy) + await libp2p.peerStore.tagPeer(connection.remotePeer, 'test-tag', { + value + }) + await connectionManager._onConnect(new CustomEvent('connection', { detail: connection })) + } + + // Connect to the peer on the allowed list + const connection = mockConnection(mockMultiaddrConnection(mockDuplex(), remoteAddrPeerId)) + + // Tag that allowed peer with lowest value + const value = 0 * 10 + await libp2p.peerStore.tagPeer(connection.remotePeer, 'test-tag', { + value + }) + + await connectionManager._onConnect(new CustomEvent('connection', { detail: connection })) + + // get the lowest value + const lowest = Array.from(spies.keys()).sort((a, b) => { + if (a > b) { + return 1 + } + + if (a < b) { + return -1 + } + + return 0 + })[0] + const lowestSpy = spies.get(lowest) + + expect(connectionManagerMaybeDisconnectOneSpy.callCount).to.equal(0) + // expect lowest value spy NOT to be called since the peer is in the allow list + expect(lowestSpy).to.have.property('callCount', 0) + }) + + it('should close connection when the maximum incoming has been reached even without tags', async () => { const max = 5 libp2p = await createNode({ config: createBaseOptions({ connectionManager: { - maxConnections: max, + maxIncomingConnections: max, minConnections: 0 } }), @@ -198,16 +265,17 @@ describe('Connection Manager', () => { expect(spy).to.have.property('callCount', 1) }) - it('should fail if the connection manager has mismatched connection limit options', async () => { + it('should fail if the connection manager has mismatched incoming + outgoing connection limit options', async () => { await expect(createNode({ config: createBaseOptions({ connectionManager: { - maxConnections: 5, - minConnections: 6 + maxIncomingConnections: 5, + maxOutgoingConnections: 1, + minConnections: 7 } }), started: false - })).to.eventually.rejected('maxConnections must be greater') + })).to.eventually.rejected('maxIncomingConnections must be greater') }) it('should reconnect to important peers on startup', async () => { @@ -263,35 +331,64 @@ describe('Connection Manager', () => { .to.eventually.be.false() }) - it('should deny connections when maxConnections is exceeded', async () => { - const dialer = stubInterface() - dialer.dial.resolves(stubInterface()) - - const connectionManager = new DefaultConnectionManager({ - peerId: libp2p.peerId, - upgrader: stubInterface(), - peerStore: stubInterface(), - dialer - }, { - ...defaultOptions, - maxConnections: 1 + it('should deny connections when maxIncomingConnections is exceeded', async () => { + libp2p = await createNode({ + config: createBaseOptions({ + connectionManager: { + maxIncomingConnections: 1 + } + }), + started: false }) - // max out the connection limit - await connectionManager.openConnection(await createEd25519PeerId()) + await libp2p.start() + + const connectionManager = libp2p.connectionManager as DefaultConnectionManager + + // max out the connection limit by having an inbound connection already + const connection = mockConnection(mockMultiaddrConnection(mockDuplex(), await createEd25519PeerId())) + connection.stat.direction = 'inbound' + await connectionManager._onConnect(new CustomEvent('connection', { detail: connection })) + expect(connectionManager.getConnections()).to.have.lengthOf(1) - // an inbound connection is opened - const remotePeer = await createEd25519PeerId() - const maConn = mockMultiaddrConnection({ + // another inbound connection is opened + const remotePeer2 = await createEd25519PeerId() + const maConn2 = mockMultiaddrConnection({ source: [], sink: async () => {} - }, remotePeer) + }, remotePeer2) - await expect(connectionManager.acceptIncomingConnection(maConn)) + await expect(connectionManager.acceptIncomingConnection(maConn2)) .to.eventually.be.false() }) + it('should throw an error when attempting to connect and maxOutgoingConnections is exceeded', async () => { + libp2p = await createNode({ + config: createBaseOptions({ + connectionManager: { + maxOutgoingConnections: 1 + } + }), + started: false + }) + + await libp2p.start() + + const connectionManager = libp2p.connectionManager as DefaultConnectionManager + + // max out the connection limit by having an inbound connection already + const connection = mockConnection(mockMultiaddrConnection(mockDuplex(), await createEd25519PeerId())) + connection.stat.direction = 'outbound' + await connectionManager._onConnect(new CustomEvent('connection', { detail: connection })) + + expect(connectionManager.getConnections()).to.have.lengthOf(1) + + // another outbound connection is opened + await expect(connectionManager.openConnection(await createEd25519PeerId())).to.eventually.be.rejected() + .and.to.have.property('code', ErrorCodes.ERR_CONNECTION_DENIED) + }) + it('should deny connections from peers that connect too frequently', async () => { const dialer = stubInterface() dialer.dial.resolves(stubInterface()) @@ -334,7 +431,7 @@ describe('Connection Manager', () => { dialer }, { ...defaultOptions, - maxConnections: 1, + maxIncomingConnections: 1, allow: [ '/ip4/83.13.55.32' ] diff --git a/test/dialing/direct.node.ts b/test/dialing/direct.node.ts index 86ff236db8..c51f292f25 100644 --- a/test/dialing/direct.node.ts +++ b/test/dialing/direct.node.ts @@ -79,7 +79,8 @@ describe('Dialing (direct, TCP)', () => { }) localComponents.peerStore = new PersistentPeerStore(localComponents) localComponents.connectionManager = new DefaultConnectionManager(localComponents, { - maxConnections: 100, + maxIncomingConnections: 100, + maxOutgoingConnections: 100, minConnections: 50, autoDialInterval: 1000, inboundUpgradeTimeout: 1000 diff --git a/test/dialing/direct.spec.ts b/test/dialing/direct.spec.ts index 1084db1270..9e6bc78406 100644 --- a/test/dialing/direct.spec.ts +++ b/test/dialing/direct.spec.ts @@ -51,7 +51,8 @@ describe('Dialing (direct, WebSockets)', () => { addressFilter: localComponents.connectionGater.filterMultiaddrForPeer }) localComponents.connectionManager = new DefaultConnectionManager(localComponents, { - maxConnections: 100, + maxIncomingConnections: 100, + maxOutgoingConnections: 100, minConnections: 50, autoDialInterval: 1000, inboundUpgradeTimeout: 1000 diff --git a/test/fetch/index.spec.ts b/test/fetch/index.spec.ts index 27a7e80db3..2d532a3088 100644 --- a/test/fetch/index.spec.ts +++ b/test/fetch/index.spec.ts @@ -35,7 +35,8 @@ async function createComponents (index: number) { components.peerStore = new PersistentPeerStore(components) components.connectionManager = new DefaultConnectionManager(components, { minConnections: 50, - maxConnections: 1000, + maxOutgoingConnections: 1000, + maxIncomingConnections: 1000, autoDialInterval: 1000, inboundUpgradeTimeout: 1000 }) diff --git a/test/identify/index.spec.ts b/test/identify/index.spec.ts index 5a5f0b247e..1639d9dcd6 100644 --- a/test/identify/index.spec.ts +++ b/test/identify/index.spec.ts @@ -59,7 +59,8 @@ async function createComponents (index: number) { components.peerStore = new PersistentPeerStore(components) components.connectionManager = new DefaultConnectionManager(components, { minConnections: 50, - maxConnections: 1000, + maxIncomingConnections: 1000, + maxOutgoingConnections: 1000, autoDialInterval: 1000, inboundUpgradeTimeout: 1000 }) diff --git a/test/identify/push.spec.ts b/test/identify/push.spec.ts index c6482aecce..b2dbddd29c 100644 --- a/test/identify/push.spec.ts +++ b/test/identify/push.spec.ts @@ -57,7 +57,8 @@ async function createComponents (index: number): Promise { components.peerStore = new PersistentPeerStore(components) components.connectionManager = new DefaultConnectionManager(components, { minConnections: 50, - maxConnections: 1000, + maxIncomingConnections: 1000, + maxOutgoingConnections: 1000, autoDialInterval: 1000, inboundUpgradeTimeout: 1000 }) diff --git a/test/ping/index.spec.ts b/test/ping/index.spec.ts index 4fedd39806..9bc5a703a0 100644 --- a/test/ping/index.spec.ts +++ b/test/ping/index.spec.ts @@ -35,7 +35,8 @@ async function createComponents (index: number): Promise { components.peerStore = new PersistentPeerStore(components) components.connectionManager = new DefaultConnectionManager(components, { minConnections: 50, - maxConnections: 1000, + maxOutgoingConnections: 1000, + maxIncomingConnections: 1000, autoDialInterval: 1000, inboundUpgradeTimeout: 1000 }) diff --git a/test/registrar/registrar.spec.ts b/test/registrar/registrar.spec.ts index d9fac96a4f..bb1f0e8c69 100644 --- a/test/registrar/registrar.spec.ts +++ b/test/registrar/registrar.spec.ts @@ -43,7 +43,8 @@ describe('registrar', () => { components.peerStore = new PersistentPeerStore(components) components.connectionManager = new DefaultConnectionManager(components, { minConnections: 50, - maxConnections: 1000, + maxOutgoingConnections: 1000, + maxIncomingConnections: 1000, autoDialInterval: 1000, inboundUpgradeTimeout: 1000 })