Skip to content

Commit

Permalink
Tidy up
Browse files Browse the repository at this point in the history
  • Loading branch information
cressie176 committed Aug 25, 2023
1 parent 58b3929 commit 822a125
Show file tree
Hide file tree
Showing 3 changed files with 66 additions and 67 deletions.
127 changes: 61 additions & 66 deletions lib/Pool.js
Original file line number Diff line number Diff line change
Expand Up @@ -25,21 +25,21 @@ module.exports = class Pool extends EventEmitter {

async initialise() {
this._assertRunning();
const initialSize = this._getInitialSize();
const initialSize = this._state.deficit;
return new InitialisePoolOperation(this, { initialSize }).run(async () => {
const resources = this._initialiseTimeout ? await this._initialiseWithTimeout() : await this._initialiseWithoutTimeout();
const resources = this._initialiseTimeout ? await this._initialiseWithTimeout(initialSize) : await this._initialiseWithoutTimeout(initialSize);
await Promise.all(resources.map((resource) => this.release(resource)));
});
}

async _initialiseWithTimeout() {
const task = this._createInitialiseTask();
async _initialiseWithTimeout(size) {
const task = this._createInitialiseTask(size);
return task.execute();
}

async _initialiseWithoutTimeout() {
async _initialiseWithoutTimeout(size) {
const task = { isAborted: () => false };
return this._batchAquire(task);
return this._batchAquire(task, size);
}

async acquire() {
Expand Down Expand Up @@ -105,45 +105,28 @@ module.exports = class Pool extends EventEmitter {
});
}

_createShutdownTask() {
const fn = async () => {
const stopped = this._commenceShutdown();
await this._checkShutdown();
return stopped;
};
return new TimedTask({ name: 'shutdown', fn, timeout: this._destroyTimeout });
}

_commenceShutdown() {
return new Promise((resolve, reject) => {
this._stopping = { resolve, reject };
});
}

async _checkShutdown() {
if (!this._stopping) return;
await this._destroySpareResources();
this.evictBadResources();
if (this._state.isEmpty()) this._completeShutdown();
}

_completeShutdown() {
this._stopping.resolve();
}

_assertRunning() {
if (this._stopping) throw new OperationFailed('The pool has been shutdown');
}

async _destroySpareResources() {
const { spare } = this._state;
return new DestroySpareResourcesOperation(this, { spare }).run((async () => {
const destroyResources = new Array(spare).fill().map(async () => {
const resource = this._state.getIdleResource();
return this._destroyResource(resource);
});
await Promise.all(destroyResources);
}));
_createInitialiseTask(size) {
const fn = async (task) => this._batchAquire(task, size);
return new TimedTask({ name: 'initialise', fn, timeout: this._initialiseTimeout });
}

async _batchAquire(task, size) {
const acquireResources = new Array(size).fill().map(async () => {
let resource;
while (!resource && !task.isAborted()) {
try {
resource = await this.acquire();
} catch (err) {
// Error events already emitted
}
}
return resource;
});
return Promise.all(acquireResources);
}

_createAcquireTask(op) {
Expand Down Expand Up @@ -172,31 +155,6 @@ module.exports = class Pool extends EventEmitter {
resolve();
}

_createInitialiseTask() {
const fn = async (task) => this._batchAquire(task);
return new TimedTask({ name: 'initialise', fn, timeout: this._initialiseTimeout });
}

async _batchAquire(task) {
const batchSize = this._getInitialSize();
const acquireResources = new Array(batchSize).fill().map(async () => {
let resource;
while (!resource && !task.isAborted()) {
try {
resource = await this.acquire();
} catch (err) {
// Error events already emitted
}
}
return resource;
});
return Promise.all(acquireResources);
}

_getInitialSize() {
return Math.max(this._state.minSize - this._state.size, 0);
}

async _acquireResource(op, task) {
let resource;
while (!resource && !task.isAborted()) {
Expand Down Expand Up @@ -272,6 +230,43 @@ module.exports = class Pool extends EventEmitter {
return new TimedTask({ name: 'destroy', fn, timeout: this._destroyTimeout, onLateResolve });
}

_createShutdownTask() {
const fn = async () => {
const stopped = this._commenceShutdown();
await this._checkShutdown();
return stopped;
};
return new TimedTask({ name: 'shutdown', fn, timeout: this._destroyTimeout });
}

_commenceShutdown() {
return new Promise((resolve, reject) => {
this._stopping = { resolve, reject };
});
}

async _checkShutdown() {
if (!this._stopping) return;
await this._destroySpareResources();
this.evictBadResources();
if (this._state.isEmpty()) this._completeShutdown();
}

_completeShutdown() {
this._stopping.resolve();
}

async _destroySpareResources() {
const { spare } = this._state;
return new DestroySpareResourcesOperation(this, { spare }).run((async () => {
const destroyResources = new Array(spare).fill().map(async () => {
const resource = this._state.getIdleResource();
return this._destroyResource(resource);
});
await Promise.all(destroyResources);
}));
}

_emitEvent(event) {
setImmediate(() => this.emit(event.code, event) || this.emit('EVT_X-POOL_Event', event));
}
Expand Down
4 changes: 4 additions & 0 deletions lib/State.js
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,10 @@ module.exports = class State {
return this._maxSize;
}

get deficit() {
return Math.max(this.minSize - this.size, 0);
}

get queued() {
return this._queued.length;
}
Expand Down
2 changes: 1 addition & 1 deletion test/Pool.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -746,7 +746,7 @@ describe('Pool', () => {
const stats1 = pool.stats();
eq(stats1.bad, 2);

await pool.evictBadResources();
pool.evictBadResources();

const stats2 = pool.stats();
eq(stats2.bad, 0);
Expand Down

0 comments on commit 822a125

Please sign in to comment.