From 5362aa77e360fdb6bf76ca9f98e9478ad3ba9663 Mon Sep 17 00:00:00 2001 From: Richie Bendall Date: Fri, 29 Jul 2022 19:52:41 +1200 Subject: [PATCH] Add `empty` event Signed-off-by: Richie Bendall --- package.json | 1 + readme.md | 8 +++++ source/index.ts | 86 ++++++++++++++++++------------------------------- test/test.ts | 33 +++++++++++++++++++ 4 files changed, 74 insertions(+), 54 deletions(-) diff --git a/package.json b/package.json index 52db7cc..10b9b85 100644 --- a/package.json +++ b/package.json @@ -57,6 +57,7 @@ "delay": "^5.0.0", "in-range": "^3.0.0", "nyc": "^15.1.0", + "p-defer": "^4.0.0", "random-int": "^3.0.0", "time-span": "^5.0.0", "ts-node": "^10.4.0", diff --git a/readme.md b/readme.md index 578a6f8..66ceb29 100644 --- a/readme.md +++ b/readme.md @@ -308,10 +308,18 @@ queue.on('error', error => { queue.add(() => Promise.reject(new Error('error'))); ``` +#### empty + +Emitted every time the queue becomes empty. + +Useful if you for example add additional items at a later time. + #### idle Emitted every time the queue becomes empty and all promises have completed; `queue.size === 0 && queue.pending === 0`. +The difference with `empty` is that `idle` guarantees that all work from the queue has finished. `empty` merely signals that the queue is empty, but it could mean that some promises haven't completed yet. + ```js import delay from 'delay'; import PQueue from 'p-queue'; diff --git a/source/index.ts b/source/index.ts index 2965430..b91c61e 100644 --- a/source/index.ts +++ b/source/index.ts @@ -4,15 +4,10 @@ import {Queue, RunFunction} from './queue.js'; import PriorityQueue from './priority-queue.js'; import {QueueAddOptions, Options, TaskOptions} from './options.js'; -type ResolveFunction = (value?: T | PromiseLike) => void; - type Task = | ((options: TaskOptions) => PromiseLike) | ((options: TaskOptions) => TaskResultType); -// eslint-disable-next-line @typescript-eslint/no-empty-function -const empty = (): void => {}; - const timeoutError = new TimeoutError(); /** @@ -20,10 +15,12 @@ The error thrown by `queue.add()` when a job is aborted before it is run. See `s */ export class AbortError extends Error {} +type EventName = 'active' | 'idle' | 'empty' | 'add' | 'next' | 'completed' | 'error'; + /** Promise queue with concurrency control. */ -export default class PQueue = PriorityQueue, EnqueueOptionsType extends QueueAddOptions = QueueAddOptions> extends EventEmitter<'active' | 'idle' | 'add' | 'next' | 'completed' | 'error'> { +export default class PQueue = PriorityQueue, EnqueueOptionsType extends QueueAddOptions = QueueAddOptions> extends EventEmitter { readonly #carryoverConcurrencyCount: boolean; readonly #isIntervalIgnored: boolean; @@ -51,13 +48,14 @@ export default class PQueue) { super(); @@ -88,7 +86,7 @@ export default class PQueue { if (options.throwOnTimeout === undefined ? this.#throwOnTimeout : options.throwOnTimeout) { reject(timeoutError); @@ -331,13 +326,7 @@ export default class PQueue(resolve => { - const existingResolve = this.#resolveEmpty; - this.#resolveEmpty = () => { - existingResolve(); - resolve(); - }; - }); + await this.#onEvent('empty'); } /** @@ -353,16 +342,7 @@ export default class PQueue(resolve => { - const listener = () => { - if (this.#queue.size < limit) { - this.removeListener('next', listener); - resolve(); - } - }; - - this.on('next', listener); - }); + await this.#onEvent('next', () => this.#queue.size < limit); } /** @@ -376,12 +356,21 @@ export default class PQueue(resolve => { - const existingResolve = this.#resolveIdle; - this.#resolveIdle = () => { - existingResolve(); + await this.#onEvent('idle'); + } + + async #onEvent(event: EventName, filter?: () => boolean): Promise { + return new Promise(resolve => { + const listener = () => { + if (filter && !filter()) { + return; + } + + this.off(event, listener); resolve(); }; + + this.on(event, listener); }); } @@ -415,17 +404,6 @@ export default class PQueue { t.is(timesCalled, 2); }); +test('should emit empty event when empty', async t => { + const queue = new PQueue({concurrency: 1}); + + let timesCalled = 0; + queue.on('empty', () => { + timesCalled++; + }); + + const {resolve: resolveJob1, promise: job1Promise} = pDefer(); + const {resolve: resolveJob2, promise: job2Promise} = pDefer(); + + const job1 = queue.add(async () => job1Promise); + const job2 = queue.add(async () => job2Promise); + t.is(queue.size, 1); + t.is(queue.pending, 1); + t.is(timesCalled, 0); + + resolveJob1(); + await job1; + + t.is(queue.size, 0); + t.is(queue.pending, 1); + t.is(timesCalled, 0); + + resolveJob2(); + await job2; + + t.is(queue.size, 0); + t.is(queue.pending, 0); + t.is(timesCalled, 1); +}); + test('should emit add event when adding task', async t => { const queue = new PQueue({concurrency: 1});