From a949f8dfdbc7c6836e9a299c2f4a3d602bab222c Mon Sep 17 00:00:00 2001 From: Raishav Hanspal Date: Wed, 10 Jul 2024 23:57:35 +0530 Subject: [PATCH 1/5] #208 - add update priority logic, add promise started event --- source/index.ts | 18 ++++++++++++++---- source/priority-queue.ts | 6 ++++++ source/queue.ts | 1 + 3 files changed, 21 insertions(+), 4 deletions(-) diff --git a/source/index.ts b/source/index.ts index faf2c9e..eb58d27 100644 --- a/source/index.ts +++ b/source/index.ts @@ -8,7 +8,7 @@ type Task = | ((options: TaskOptions) => PromiseLike) | ((options: TaskOptions) => TaskResultType); -type EventName = 'active' | 'idle' | 'empty' | 'add' | 'next' | 'completed' | 'error'; +type EventName = 'active' | 'idle' | 'empty' | 'add' | 'next' | 'completed' | 'error' | 'started'; /** Promise queue with concurrency control. @@ -43,6 +43,8 @@ export default class PQueue(function_: Task, options: {throwOnTimeout: true} & Exclude): Promise; - async add(function_: Task, options?: Partial): Promise; - async add(function_: Task, options: Partial = {}): Promise { + async add(function_: Task, options: { throwOnTimeout: true } & Exclude, uid?: string): Promise; + async add(function_: Task, options?: Partial, uid?: string): Promise; + async add(function_: Task, options: Partial = {}, uid?: string): Promise { + // incase uid is not defined + uid = (this.#uidAssigner++).toString(); options = { timeout: this.timeout, throwOnTimeout: this.#throwOnTimeout, + uid, ...options, }; @@ -258,6 +267,7 @@ export default class PQueue { @@ -32,6 +33,11 @@ export default class PriorityQueue implements Queue) => element.uid === uid); + item && (item.priority = priority || ((item.priority || 0) + 1)); + } + dequeue(): RunFunction | undefined { const item = this.#queue.shift(); return item?.run; diff --git a/source/queue.ts b/source/queue.ts index be3316c..cd325ee 100644 --- a/source/queue.ts +++ b/source/queue.ts @@ -5,4 +5,5 @@ export type Queue = { filter: (options: Readonly>) => Element[]; dequeue: () => Element | undefined; enqueue: (run: Element, options?: Partial) => void; + prioritize: (uid: string, priority: number) => void; }; From 4cdcbf72b3966adfd66c368c9ed91fdf8f169974 Mon Sep 17 00:00:00 2001 From: Raishav Hanspal Date: Thu, 11 Jul 2024 00:05:19 +0530 Subject: [PATCH 2/5] #208 - update for uidAssigner --- source/index.ts | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/source/index.ts b/source/index.ts index eb58d27..aecf859 100644 --- a/source/index.ts +++ b/source/index.ts @@ -43,6 +43,7 @@ export default class PQueue(function_: Task, options?: Partial, uid?: string): Promise; async add(function_: Task, options: Partial = {}, uid?: string): Promise { // incase uid is not defined - uid = (this.#uidAssigner++).toString(); + !uid && (uid = (this.#uidAssigner++).toString()); options = { timeout: this.timeout, throwOnTimeout: this.#throwOnTimeout, From 8383aacb2a984f7f2850caf73963c446ab1515d2 Mon Sep 17 00:00:00 2001 From: Raishav Hanspal Date: Thu, 11 Jul 2024 23:56:17 +0530 Subject: [PATCH 3/5] #208 - add test cases, add linting fixes --- source/index.ts | 15 +++++++------ source/priority-queue.ts | 20 +++++++++++++++-- test/test.ts | 46 +++++++++++++++++++++++++++++++++++++++- 3 files changed, 72 insertions(+), 9 deletions(-) diff --git a/source/index.ts b/source/index.ts index aecf859..bb2071d 100644 --- a/source/index.ts +++ b/source/index.ts @@ -43,8 +43,8 @@ export default class PQueue(function_: Task, options: { throwOnTimeout: true } & Exclude, uid?: string): Promise; + async add(function_: Task, options: {throwOnTimeout: true} & Exclude, uid?: string): Promise; async add(function_: Task, options?: Partial, uid?: string): Promise; async add(function_: Task, options: Partial = {}, uid?: string): Promise { - // incase uid is not defined - !uid && (uid = (this.#uidAssigner++).toString()); + // Incase uid is not defined + if (uid === undefined) { + uid = (this.#uidAssigner++).toString(); + } + options = { timeout: this.timeout, throwOnTimeout: this.#throwOnTimeout, @@ -268,7 +271,7 @@ export default class PQueue) => element.uid === uid); - item && (item.priority = priority || ((item.priority || 0) + 1)); + const queueIndex: number = this.#queue.findIndex((element: Readonly) => element.uid === uid); + const [item] = this.#queue.splice(queueIndex, 1); + if (item === undefined) { + return; + } + + item.priority = priority ?? ((item.priority ?? 0) + 1); + if (this.size && this.#queue[this.size - 1]!.priority! >= priority!) { + this.#queue.push(item); + return; + } + + const index = lowerBound( + this.#queue, item, + (a: Readonly, b: Readonly) => b.priority! - a.priority!, + ); + + this.#queue.splice(index, 0, item); } dequeue(): RunFunction | undefined { diff --git a/test/test.ts b/test/test.ts index d511eae..ad54b68 100644 --- a/test/test.ts +++ b/test/test.ts @@ -6,7 +6,7 @@ import inRange from 'in-range'; import timeSpan from 'time-span'; import randomInt from 'random-int'; import pDefer from 'p-defer'; -import PQueue, {AbortError} from '../source/index.js'; +import PQueue from '../source/index.js'; const fixture = Symbol('fixture'); @@ -1134,3 +1134,47 @@ test('aborting multiple jobs at the same time', async t => { await t.throwsAsync(task2, {instanceOf: DOMException}); t.like(queue, {size: 0, pending: 0}); }); + +test('.setPriority() - execute a promise before planned', async t => { + const result: string[] = []; + const queue = new PQueue({concurrency: 1}); + queue.add(async () => { + await delay(400); + result.push('🐌'); + }, {}, 'snail'); + queue.add(async () => { + await delay(400); + result.push('🦆'); + }, {}, 'duck'); + queue.add(async () => { + await delay(400); + result.push('🐢'); + }, {}, 'turtle'); + queue.setPriority('turtle', 1); + await queue.onIdle(); + t.deepEqual(result, ['🐌', '🐢', '🦆']); +}); + +test('started event to check when promise function is called', async t => { + const result: string[] = []; + const queue = new PQueue({concurrency: 1}); + queue.add(async () => { + await delay(400); + result.push('🐌'); + }, {}, '🐌'); + queue.add(async () => { + await delay(400); + result.push('🦆'); + }, {}, '🦆'); + queue.add(async () => { + await delay(400); + result.push('🐢'); + }, {}, '🐢'); + queue.on('started', uid => { + if (uid === '🦆') { + t.deepEqual(result, ['🐌', '🐢']); + } + }); + queue.setPriority('🐢', 1); + await queue.onIdle(); +}); From a60bfc19a32c1c5a9601e6b64ba54c1ee5f578ef Mon Sep 17 00:00:00 2001 From: Raishav Hanspal Date: Fri, 12 Jul 2024 09:33:03 +0530 Subject: [PATCH 4/5] #208 - review fixes, remove started event changes --- source/index.ts | 22 ++++++++++------------ source/options.ts | 1 + source/priority-queue.ts | 7 +++---- source/queue.ts | 2 +- test/test.ts | 32 ++++---------------------------- 5 files changed, 19 insertions(+), 45 deletions(-) diff --git a/source/index.ts b/source/index.ts index bb2071d..3459f98 100644 --- a/source/index.ts +++ b/source/index.ts @@ -8,7 +8,7 @@ type Task = | ((options: TaskOptions) => PromiseLike) | ((options: TaskOptions) => TaskResultType); -type EventName = 'active' | 'idle' | 'empty' | 'add' | 'next' | 'completed' | 'error' | 'started'; +type EventName = 'active' | 'idle' | 'empty' | 'add' | 'next' | 'completed' | 'error'; /** Promise queue with concurrency control. @@ -44,7 +44,7 @@ export default class PQueue(function_: Task, options: {throwOnTimeout: true} & Exclude, uid?: string): Promise; - async add(function_: Task, options?: Partial, uid?: string): Promise; - async add(function_: Task, options: Partial = {}, uid?: string): Promise { - // Incase uid is not defined - if (uid === undefined) { - uid = (this.#uidAssigner++).toString(); + async add(function_: Task, options: {throwOnTimeout: true} & Exclude): Promise; + async add(function_: Task, options?: Partial): Promise; + async add(function_: Task, options: Partial = {}): Promise { + // Incase id is not defined + if (options.id === undefined) { + options.id = (this.#idAssigner++).toString(); } options = { timeout: this.timeout, throwOnTimeout: this.#throwOnTimeout, - uid, ...options, }; @@ -271,7 +270,6 @@ export default class PQueue { @@ -33,9 +32,9 @@ export default class PriorityQueue implements Queue) => element.uid === uid); - const [item] = this.#queue.splice(queueIndex, 1); + setPriority(id: string, priority?: number) { + const existingIndex: number = this.#queue.findIndex((element: Readonly) => element.id === id); + const [item] = this.#queue.splice(existingIndex, 1); if (item === undefined) { return; } diff --git a/source/queue.ts b/source/queue.ts index cd325ee..459d29b 100644 --- a/source/queue.ts +++ b/source/queue.ts @@ -5,5 +5,5 @@ export type Queue = { filter: (options: Readonly>) => Element[]; dequeue: () => Element | undefined; enqueue: (run: Element, options?: Partial) => void; - prioritize: (uid: string, priority: number) => void; + setPriority: (id: string, priority: number) => void; }; diff --git a/test/test.ts b/test/test.ts index ad54b68..da828e0 100644 --- a/test/test.ts +++ b/test/test.ts @@ -1141,40 +1141,16 @@ test('.setPriority() - execute a promise before planned', async t => { queue.add(async () => { await delay(400); result.push('🐌'); - }, {}, 'snail'); + }, {id: '🐌'}); queue.add(async () => { await delay(400); result.push('🦆'); - }, {}, 'duck'); + }, {id: '🦆'}); queue.add(async () => { await delay(400); result.push('🐢'); - }, {}, 'turtle'); - queue.setPriority('turtle', 1); - await queue.onIdle(); - t.deepEqual(result, ['🐌', '🐢', '🦆']); -}); - -test('started event to check when promise function is called', async t => { - const result: string[] = []; - const queue = new PQueue({concurrency: 1}); - queue.add(async () => { - await delay(400); - result.push('🐌'); - }, {}, '🐌'); - queue.add(async () => { - await delay(400); - result.push('🦆'); - }, {}, '🦆'); - queue.add(async () => { - await delay(400); - result.push('🐢'); - }, {}, '🐢'); - queue.on('started', uid => { - if (uid === '🦆') { - t.deepEqual(result, ['🐌', '🐢']); - } - }); + }, {id: '🐢'}); queue.setPriority('🐢', 1); await queue.onIdle(); + t.deepEqual(result, ['🐌', '🐢', '🦆']); }); From 28e14c8e5cc86aebf11e9553db293c02b50f6883 Mon Sep 17 00:00:00 2001 From: Raishav Hanspal Date: Sun, 14 Jul 2024 08:11:21 +0530 Subject: [PATCH 5/5] add new event invoke to track when a certain promise fucntion is invoked --- source/index.ts | 3 ++- test/test.ts | 19 +++++++++++++++++++ 2 files changed, 21 insertions(+), 1 deletion(-) diff --git a/source/index.ts b/source/index.ts index 3459f98..a332d3a 100644 --- a/source/index.ts +++ b/source/index.ts @@ -8,7 +8,7 @@ type Task = | ((options: TaskOptions) => PromiseLike) | ((options: TaskOptions) => TaskResultType); -type EventName = 'active' | 'idle' | 'empty' | 'add' | 'next' | 'completed' | 'error'; +type EventName = 'active' | 'idle' | 'empty' | 'add' | 'next' | 'completed' | 'error' | 'invoke'; /** Promise queue with concurrency control. @@ -270,6 +270,7 @@ export default class PQueue { await queue.onIdle(); t.deepEqual(result, ['🐌', '🐢', '🦆']); }); + +test('track event "invoke" to check with respect to concurrency - 1', async t => { + const invoked: Array = []; + const queue = new PQueue({concurrency: 1}); + queue.on('invoke', (data: QueueAddOptions) => { + invoked.push(data.id); + }); + const job1 = queue.add(async () => { + await delay(400); + }, {id: '🐌'}); + const job2 = queue.add(async () => { + await delay(400); + }, {id: '🦆'}); + t.deepEqual(invoked, ['🐌']); + await job1; + t.deepEqual(invoked, ['🐌', '🦆']); + await queue.onIdle(); +});