From 2d384ee9f7cb4704109f7488bda401bd1c96c2f3 Mon Sep 17 00:00:00 2001 From: Stephen Cresswell <229672+cressie176@users.noreply.github.com> Date: Thu, 24 Aug 2023 06:13:55 +0100 Subject: [PATCH] Add maxQueueDepth --- README.md | 15 ++++++++------ lib/Errors.js | 5 +++++ lib/Pool.js | 2 +- lib/State.js | 5 ++++- test/Pool.test.js | 51 +++++++++++++++++++++++++++++++++++++++++++++-- 5 files changed, 68 insertions(+), 10 deletions(-) diff --git a/README.md b/README.md index eef3f12..5b26d38 100644 --- a/README.md +++ b/README.md @@ -26,9 +26,10 @@ try { | Name | Type | Required | Default | Notes | | -------------------- | ------- | -------- | -------- | ------------------------------------------------------------------------------------------------------------------------------ | -| factory | Factory | Y | | An instance of a resource factory | +| factory | Factory | Y | | An instance of a resource factory. | | minSize | integer | N | 0 | Specifies the minimum pool size. | | maxSize | integer | N | Infinity | Specifies the maximum pool size. | +| maxQueueDepth | integer | N | Infinity | Specifies the maximum acquire queue depth. | | initialiseTimeout | integer | N | | The number of milliseconds the pool will wait to initialise. If unset the pool will wait undefinitely. | | acquireTimeout | integer | Y | | The number of milliseconds the pool will wait to acquire a resource before rejecting. | | acquireRetryInterval | integer | N | 100 | The number of milliseconds the pool will wait before retrying resource acquition after a failure. | @@ -106,10 +107,12 @@ There are equally strong arguments to re-issue the most recently used reosurce a #### Errors -| Code | Notes | -| ----------------------------------- | --------------------------------------------------------------------------- | -| ERR_X‑POOL_OPERATION_TIMEDOUT | The acquire timeout was exceeded | -| ERR_X‑POOL_OPERATION_FAILED | The resource could not be acquired (e.g. because the pool is shutting down) | +| Code | Notes | +| ----------------------------------------- | --------------------------------------------------------------------------- | +| ERR_X‑POOL_OPERATION_TIMEDOUT | The acquire timeout was exceeded | +| ERR_X‑POOL_OPERATION_FAILED | The resource could not be acquired (e.g. because the pool is shutting down) | +| ERR_X‑POOL_MAX_QUEUE_DEPTH_EXCEEDED | The maximum acquire queue depth was exceeded | + ### release(resource: T) : void @@ -237,7 +240,7 @@ Migrating from [generic-pool](https://github.com/coopernurse/node-pool) is relat | ------------------------- | -------------- | --------------------------------------------------------------------------------------------------------------------- | | max | maxSize | | | min | minSize | | -| maxWaitingClients | Not Supported | We suggest using the acquireTimeout option instead. | +| maxWaitingClients | maxQueueDepth | | | testOnBorrow | Not Supported | Use an empty `factory.validate` method instead. | | acquireTimeoutMillis | acquireTimeout | This option is mandatory with X-Pool. | | destroyTimeoutMillis | destroyTimeout | This option is mandatory with X-Pool. | diff --git a/lib/Errors.js b/lib/Errors.js index 32cb5eb..78b6dd4 100644 --- a/lib/Errors.js +++ b/lib/Errors.js @@ -23,6 +23,10 @@ class OperationFailed extends XPoolError { static code = 'ERR_X-POOL_OPERATION_FAILED'; } +class MaxQueueDepthExceeded extends XPoolError { + static code = 'ERR_X-POOL_MAX_QUEUE_DEPTH_EXCEEDED'; +} + class ResourceCreationFailed extends XPoolError { static code = 'ERR_X-POOL_RESOURCE_CREATION_FAILED'; } @@ -40,6 +44,7 @@ module.exports = { ConfigurationError, OperationTimedout, OperationFailed, + MaxQueueDepthExceeded, ResourceCreationFailed, ResourceValidationFailed, ResourceDestructionFailed, diff --git a/lib/Pool.js b/lib/Pool.js index ac9c2df..b9a3364 100644 --- a/lib/Pool.js +++ b/lib/Pool.js @@ -19,7 +19,7 @@ module.exports = class Pool extends EventEmitter { this._destroyTimeout = validateNumber('destroyTimeout', options, true, 1); this._initialiseTimeout = validateNumber('initialiseTimeout', options, false, 1); this._shutdownTimeout = validateNumber('shutdownTimeout', options, false, 1); - this._state = new State({ maxSize: options.maxSize, minSize: options.minSize }); + this._state = new State({ maxSize: options.maxSize, minSize: options.minSize, maxQueueDepth: options.maxQueueDepth }); this.initialise = this._initialiseTimeout ? this._initialiseWithTimeout : this._initialiseWithoutTimeout; } diff --git a/lib/State.js b/lib/State.js index 45ff529..cadd54d 100644 --- a/lib/State.js +++ b/lib/State.js @@ -1,13 +1,15 @@ const { validateNumber, validateUpperBoundary } = require('./validation'); +const { MaxQueueDepthExceeded } = require('./Errors'); module.exports = class State { constructor(options) { this._maxSize = validateNumber('maxSize', options, false, 1) || Infinity; this._minSize = validateNumber('minSize', options, false, 0) || 0; - validateUpperBoundary('minSize', 'maxSize', options); + this._maxQueueDepth = validateNumber('maxQueueDepth', options, false, 1) || Infinity; + this._queued = []; this._acquiringCount = 0; this._acquired = []; @@ -65,6 +67,7 @@ module.exports = class State { } queueAcquireRequest(request) { + if (this._queued.length === this._maxQueueDepth) throw new MaxQueueDepthExceeded(`Maximum queue depth of ${this._maxQueueDepth} exceeded`); this._queued.push(request); } diff --git a/test/Pool.test.js b/test/Pool.test.js index 2004ca6..fba506a 100644 --- a/test/Pool.test.js +++ b/test/Pool.test.js @@ -97,6 +97,27 @@ describe('Pool', () => { }); }); + describe('maxQueueDepth', () => { + + it('should require maxQueueDepth to be a number', () => { + const factory = new TestFactory(); + throws(() => new Pool({ factory, acquireTimeout: 1000, destroyTimeout: 1000, maxQueueDepth: false }), (err) => { + eq(err.code, 'ERR_X-POOL_CONFIGURATION_ERROR'); + eq(err.message, 'The maxQueueDepth option must be a number. Please read the documentation at https://acuminous.github.io/x-pool'); + return true; + }); + }); + + it('should require maxQueueDepth to be at least 1', () => { + const factory = new TestFactory(); + throws(() => new Pool({ factory, acquireTimeout: 1000, destroyTimeout: 1000, maxQueueDepth: 0 }), (err) => { + eq(err.code, 'ERR_X-POOL_CONFIGURATION_ERROR'); + eq(err.message, 'The maxQueueDepth option must be at least 1. Please read the documentation at https://acuminous.github.io/x-pool'); + return true; + }); + }); + }); + describe('acquireTimeout', () => { it('should require an acquireTimeout', () => { @@ -241,6 +262,14 @@ describe('Pool', () => { eq(idle, 1); eq(acquired, 1); }); + + it('should not exceed max queue depth when initialising', async () => { + const resources = new Array(100).fill().map((_, index) => ({ createDelay: 100, value: `R${index + 1}` })); + const factory = new TestFactory(resources); + const pool = createPool({ factory, minSize: 100, maxSize: 100, maxQueueDepth: 3, acquireTimeout: 1000 }); + + await pool.initialise(); + }); }); describe('acquire', () => { @@ -477,6 +506,24 @@ describe('Pool', () => { ok(after - before >= 99, 'Pool was not temporarily blocked'); eq(resource2, 'R2'); }); + + it('should honour max queue depth', async () => { + const resources = ['R1']; + const factory = new TestFactory(resources); + const pool = createPool({ factory, maxSize: 1, maxQueueDepth: 3, acquireTimeout: 100 }); + + await pool.acquire(); + + // The following acquisitions will timeout + pool.acquire().catch(() => {}); + pool.acquire().catch(() => {}); + pool.acquire().catch(() => {}); + + await rejects(() => pool.acquire(), (err) => { + eq(err.code, 'ERR_X-POOL_MAX_QUEUE_DEPTH_EXCEEDED'); + return true; + }); + }); }); describe('release', () => { @@ -1084,8 +1131,8 @@ describe('Pool', () => { }); }); -function createPool({ factory, minSize, maxSize, initialiseTimeout, acquireTimeout = 1000, acquireRetryInterval, destroyTimeout = 1000 }) { - return new Pool({ factory, minSize, maxSize, initialiseTimeout, acquireTimeout, acquireRetryInterval, destroyTimeout }); +function createPool({ factory, minSize, maxSize, maxQueueDepth, initialiseTimeout, acquireTimeout = 1000, acquireRetryInterval, destroyTimeout = 1000 }) { + return new Pool({ factory, minSize, maxSize, maxQueueDepth, initialiseTimeout, acquireTimeout, acquireRetryInterval, destroyTimeout }); } function acquireResources(pool, count) {