diff --git a/lib/replicator.js b/lib/replicator.js index 68de631b..912a7673 100644 --- a/lib/replicator.js +++ b/lib/replicator.js @@ -980,6 +980,7 @@ module.exports = class Replicator { this.peers = [] this.findingPeers = 0 // updateable from the outside + this._attached = new Set() this._inflight = new InflightTracker() this._blocks = new BlockTracker() this._hashes = new BlockTracker() @@ -995,6 +996,13 @@ module.exports = class Replicator { this._ifAvailable = 0 this._updatesPending = 0 this._applyingReorg = null + + const self = this + this._onstreamclose = onstreamclose + + function onstreamclose () { + self.detachFrom(this.userData) + } } cork () { @@ -1665,21 +1673,36 @@ module.exports = class Replicator { attachTo (protomux, session) { const makePeer = this._makePeer.bind(this, protomux, session) + this._attached.add(protomux) protomux.pair({ protocol: 'hypercore/alpha', id: this.discoveryKey }, makePeer) + protomux.stream.setMaxListeners(0) + protomux.stream.on('close', this._onstreamclose) + this._ifAvailable++ protomux.stream.opened.then((opened) => { this._ifAvailable-- + if (opened) makePeer() else if (session) session.close().catch(noop) this._checkUpgradeIfAvailable() }) } + detachFrom (protomux) { + if (this._attached.delete(protomux)) { + protomux.stream.removeListener('close', this._onstreamclose) + protomux.unpair({ protocol: 'hypercore/alpha', id: this.discoveryKey }) + } + } + destroy () { for (const peer of this.peers) { - peer.protomux.unpair({ protocol: 'hypercore/alpha', id: this.discoveryKey }) + this.detachFrom(peer.protomux) peer.channel.close() } + for (const protomux of this._attached) { + this.detachFrom(protomux) + } } _makePeer (protomux, session) {