Skip to content

Commit

Permalink
Add maxQueueDepth
Browse files Browse the repository at this point in the history
  • Loading branch information
cressie176 committed Aug 24, 2023
1 parent f1188b7 commit 2d384ee
Show file tree
Hide file tree
Showing 5 changed files with 68 additions and 10 deletions.
15 changes: 9 additions & 6 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,10 @@ try {

| Name | Type | Required | Default | Notes |
| -------------------- | ------- | -------- | -------- | ------------------------------------------------------------------------------------------------------------------------------ |
| factory | Factory | Y | | An instance of a resource factory |
| factory | Factory | Y | | An instance of a resource factory. |
| minSize | integer | N | 0 | Specifies the minimum pool size. |
| maxSize | integer | N | Infinity | Specifies the maximum pool size. |
| maxQueueDepth | integer | N | Infinity | Specifies the maximum acquire queue depth. |
| initialiseTimeout | integer | N | | The number of milliseconds the pool will wait to initialise. If unset the pool will wait undefinitely. |
| acquireTimeout | integer | Y | | The number of milliseconds the pool will wait to acquire a resource before rejecting. |
| acquireRetryInterval | integer | N | 100 | The number of milliseconds the pool will wait before retrying resource acquition after a failure. |
Expand Down Expand Up @@ -106,10 +107,12 @@ There are equally strong arguments to re-issue the most recently used reosurce a

#### Errors

| Code | Notes |
| ----------------------------------- | --------------------------------------------------------------------------- |
| ERR_X‑POOL_OPERATION_TIMEDOUT | The acquire timeout was exceeded |
| ERR_X‑POOL_OPERATION_FAILED | The resource could not be acquired (e.g. because the pool is shutting down) |
| Code | Notes |
| ----------------------------------------- | --------------------------------------------------------------------------- |
| ERR_X‑POOL_OPERATION_TIMEDOUT | The acquire timeout was exceeded |
| ERR_X‑POOL_OPERATION_FAILED | The resource could not be acquired (e.g. because the pool is shutting down) |
| ERR_X‑POOL_MAX_QUEUE_DEPTH_EXCEEDED | The maximum acquire queue depth was exceeded |


### release(resource: T) : void

Expand Down Expand Up @@ -237,7 +240,7 @@ Migrating from [generic-pool](https://github.com/coopernurse/node-pool) is relat
| ------------------------- | -------------- | --------------------------------------------------------------------------------------------------------------------- |
| max | maxSize | |
| min | minSize | |
| maxWaitingClients | Not Supported | We suggest using the acquireTimeout option instead. |
| maxWaitingClients | maxQueueDepth | |
| testOnBorrow | Not Supported | Use an empty `factory.validate` method instead. |
| acquireTimeoutMillis | acquireTimeout | This option is mandatory with X-Pool. |
| destroyTimeoutMillis | destroyTimeout | This option is mandatory with X-Pool. |
Expand Down
5 changes: 5 additions & 0 deletions lib/Errors.js
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,10 @@ class OperationFailed extends XPoolError {
static code = 'ERR_X-POOL_OPERATION_FAILED';
}

class MaxQueueDepthExceeded extends XPoolError {
static code = 'ERR_X-POOL_MAX_QUEUE_DEPTH_EXCEEDED';
}

class ResourceCreationFailed extends XPoolError {
static code = 'ERR_X-POOL_RESOURCE_CREATION_FAILED';
}
Expand All @@ -40,6 +44,7 @@ module.exports = {
ConfigurationError,
OperationTimedout,
OperationFailed,
MaxQueueDepthExceeded,
ResourceCreationFailed,
ResourceValidationFailed,
ResourceDestructionFailed,
Expand Down
2 changes: 1 addition & 1 deletion lib/Pool.js
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ module.exports = class Pool extends EventEmitter {
this._destroyTimeout = validateNumber('destroyTimeout', options, true, 1);
this._initialiseTimeout = validateNumber('initialiseTimeout', options, false, 1);
this._shutdownTimeout = validateNumber('shutdownTimeout', options, false, 1);
this._state = new State({ maxSize: options.maxSize, minSize: options.minSize });
this._state = new State({ maxSize: options.maxSize, minSize: options.minSize, maxQueueDepth: options.maxQueueDepth });

this.initialise = this._initialiseTimeout ? this._initialiseWithTimeout : this._initialiseWithoutTimeout;
}
Expand Down
5 changes: 4 additions & 1 deletion lib/State.js
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
const { validateNumber, validateUpperBoundary } = require('./validation');
const { MaxQueueDepthExceeded } = require('./Errors');

module.exports = class State {

constructor(options) {
this._maxSize = validateNumber('maxSize', options, false, 1) || Infinity;
this._minSize = validateNumber('minSize', options, false, 0) || 0;

validateUpperBoundary('minSize', 'maxSize', options);

this._maxQueueDepth = validateNumber('maxQueueDepth', options, false, 1) || Infinity;

this._queued = [];
this._acquiringCount = 0;
this._acquired = [];
Expand Down Expand Up @@ -65,6 +67,7 @@ module.exports = class State {
}

queueAcquireRequest(request) {
if (this._queued.length === this._maxQueueDepth) throw new MaxQueueDepthExceeded(`Maximum queue depth of ${this._maxQueueDepth} exceeded`);
this._queued.push(request);
}

Expand Down
51 changes: 49 additions & 2 deletions test/Pool.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,27 @@ describe('Pool', () => {
});
});

describe('maxQueueDepth', () => {

it('should require maxQueueDepth to be a number', () => {
const factory = new TestFactory();
throws(() => new Pool({ factory, acquireTimeout: 1000, destroyTimeout: 1000, maxQueueDepth: false }), (err) => {
eq(err.code, 'ERR_X-POOL_CONFIGURATION_ERROR');
eq(err.message, 'The maxQueueDepth option must be a number. Please read the documentation at https://acuminous.github.io/x-pool');
return true;
});
});

it('should require maxQueueDepth to be at least 1', () => {
const factory = new TestFactory();
throws(() => new Pool({ factory, acquireTimeout: 1000, destroyTimeout: 1000, maxQueueDepth: 0 }), (err) => {
eq(err.code, 'ERR_X-POOL_CONFIGURATION_ERROR');
eq(err.message, 'The maxQueueDepth option must be at least 1. Please read the documentation at https://acuminous.github.io/x-pool');
return true;
});
});
});

describe('acquireTimeout', () => {

it('should require an acquireTimeout', () => {
Expand Down Expand Up @@ -241,6 +262,14 @@ describe('Pool', () => {
eq(idle, 1);
eq(acquired, 1);
});

it('should not exceed max queue depth when initialising', async () => {
const resources = new Array(100).fill().map((_, index) => ({ createDelay: 100, value: `R${index + 1}` }));
const factory = new TestFactory(resources);
const pool = createPool({ factory, minSize: 100, maxSize: 100, maxQueueDepth: 3, acquireTimeout: 1000 });

await pool.initialise();
});
});

describe('acquire', () => {
Expand Down Expand Up @@ -477,6 +506,24 @@ describe('Pool', () => {
ok(after - before >= 99, 'Pool was not temporarily blocked');
eq(resource2, 'R2');
});

it('should honour max queue depth', async () => {
const resources = ['R1'];
const factory = new TestFactory(resources);
const pool = createPool({ factory, maxSize: 1, maxQueueDepth: 3, acquireTimeout: 100 });

await pool.acquire();

// The following acquisitions will timeout
pool.acquire().catch(() => {});
pool.acquire().catch(() => {});
pool.acquire().catch(() => {});

await rejects(() => pool.acquire(), (err) => {
eq(err.code, 'ERR_X-POOL_MAX_QUEUE_DEPTH_EXCEEDED');
return true;
});
});
});

describe('release', () => {
Expand Down Expand Up @@ -1084,8 +1131,8 @@ describe('Pool', () => {
});
});

function createPool({ factory, minSize, maxSize, initialiseTimeout, acquireTimeout = 1000, acquireRetryInterval, destroyTimeout = 1000 }) {
return new Pool({ factory, minSize, maxSize, initialiseTimeout, acquireTimeout, acquireRetryInterval, destroyTimeout });
function createPool({ factory, minSize, maxSize, maxQueueDepth, initialiseTimeout, acquireTimeout = 1000, acquireRetryInterval, destroyTimeout = 1000 }) {
return new Pool({ factory, minSize, maxSize, maxQueueDepth, initialiseTimeout, acquireTimeout, acquireRetryInterval, destroyTimeout });
}

function acquireResources(pool, count) {
Expand Down

0 comments on commit 2d384ee

Please sign in to comment.