diff --git a/source/index.ts b/source/index.ts index faf2c9e..a332d3a 100644 --- a/source/index.ts +++ b/source/index.ts @@ -8,7 +8,7 @@ type Task = | ((options: TaskOptions) => PromiseLike) | ((options: TaskOptions) => TaskResultType); -type EventName = 'active' | 'idle' | 'empty' | 'add' | 'next' | 'completed' | 'error'; +type EventName = 'active' | 'idle' | 'empty' | 'add' | 'next' | 'completed' | 'error' | 'invoke'; /** Promise queue with concurrency control. @@ -43,6 +43,9 @@ export default class PQueue(function_: Task, options: {throwOnTimeout: true} & Exclude): Promise; async add(function_: Task, options?: Partial): Promise; async add(function_: Task, options: Partial = {}): Promise { + // Incase id is not defined + if (options.id === undefined) { + options.id = (this.#idAssigner++).toString(); + } + options = { timeout: this.timeout, throwOnTimeout: this.#throwOnTimeout, @@ -258,6 +270,7 @@ export default class PQueue) => element.id === id); + const [item] = this.#queue.splice(existingIndex, 1); + if (item === undefined) { + return; + } + + item.priority = priority ?? ((item.priority ?? 0) + 1); + if (this.size && this.#queue[this.size - 1]!.priority! >= priority!) { + this.#queue.push(item); + return; + } + + const index = lowerBound( + this.#queue, item, + (a: Readonly, b: Readonly) => b.priority! - a.priority!, + ); + + this.#queue.splice(index, 0, item); + } + dequeue(): RunFunction | undefined { const item = this.#queue.shift(); return item?.run; diff --git a/source/queue.ts b/source/queue.ts index be3316c..459d29b 100644 --- a/source/queue.ts +++ b/source/queue.ts @@ -5,4 +5,5 @@ export type Queue = { filter: (options: Readonly>) => Element[]; dequeue: () => Element | undefined; enqueue: (run: Element, options?: Partial) => void; + setPriority: (id: string, priority: number) => void; }; diff --git a/test/test.ts b/test/test.ts index d511eae..a843d2f 100644 --- a/test/test.ts +++ b/test/test.ts @@ -6,7 +6,8 @@ import inRange from 'in-range'; import timeSpan from 'time-span'; import randomInt from 'random-int'; import pDefer from 'p-defer'; -import PQueue, {AbortError} from '../source/index.js'; +import PQueue from '../source/index.js'; +import type {QueueAddOptions} from '../source/index.js'; const fixture = Symbol('fixture'); @@ -1134,3 +1135,41 @@ test('aborting multiple jobs at the same time', async t => { await t.throwsAsync(task2, {instanceOf: DOMException}); t.like(queue, {size: 0, pending: 0}); }); + +test('.setPriority() - execute a promise before planned', async t => { + const result: string[] = []; + const queue = new PQueue({concurrency: 1}); + queue.add(async () => { + await delay(400); + result.push('🐌'); + }, {id: '🐌'}); + queue.add(async () => { + await delay(400); + result.push('🦆'); + }, {id: '🦆'}); + queue.add(async () => { + await delay(400); + result.push('🐢'); + }, {id: '🐢'}); + queue.setPriority('🐢', 1); + await queue.onIdle(); + t.deepEqual(result, ['🐌', '🐢', '🦆']); +}); + +test('track event "invoke" to check with respect to concurrency - 1', async t => { + const invoked: Array = []; + const queue = new PQueue({concurrency: 1}); + queue.on('invoke', (data: QueueAddOptions) => { + invoked.push(data.id); + }); + const job1 = queue.add(async () => { + await delay(400); + }, {id: '🐌'}); + const job2 = queue.add(async () => { + await delay(400); + }, {id: '🦆'}); + t.deepEqual(invoked, ['🐌']); + await job1; + t.deepEqual(invoked, ['🐌', '🦆']); + await queue.onIdle(); +});