-
-
Notifications
You must be signed in to change notification settings - Fork 185
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Feature/update priority #209
base: main
Are you sure you want to change the base?
Changes from all commits
a949f8d
4cdcbf7
8383aac
a60bfc1
cafe886
6a370fe
ccb7a53
686f0d8
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||||||
---|---|---|---|---|---|---|---|---|---|---|
|
@@ -43,6 +43,9 @@ export default class PQueue<QueueType extends Queue<RunFunction, EnqueueOptionsT | |||||||||
|
||||||||||
readonly #throwOnTimeout: boolean; | ||||||||||
|
||||||||||
/** Use to assign a unique identifier to a promise function, if not explicitly specified */ | ||||||||||
#idAssigner = 1; | ||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think this should be a BigInt, to ensure it will never overflow. |
||||||||||
|
||||||||||
/** | ||||||||||
Per-operation timeout in milliseconds. Operations fulfill once `timeout` elapses if they haven't already. | ||||||||||
|
||||||||||
|
@@ -228,12 +231,21 @@ export default class PQueue<QueueType extends Queue<RunFunction, EnqueueOptionsT | |||||||||
}); | ||||||||||
} | ||||||||||
|
||||||||||
setPriority(id: string, priority: number) { | ||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Doc comment |
||||||||||
this.#queue.setPriority(id, priority); | ||||||||||
} | ||||||||||
|
||||||||||
/** | ||||||||||
Adds a sync or async task to the queue. Always returns a promise. | ||||||||||
*/ | ||||||||||
async add<TaskResultType>(function_: Task<TaskResultType>, options: {throwOnTimeout: true} & Exclude<EnqueueOptionsType, 'throwOnTimeout'>): Promise<TaskResultType>; | ||||||||||
async add<TaskResultType>(function_: Task<TaskResultType>, options?: Partial<EnqueueOptionsType>): Promise<TaskResultType | void>; | ||||||||||
async add<TaskResultType>(function_: Task<TaskResultType>, options: Partial<EnqueueOptionsType> = {}): Promise<TaskResultType | void> { | ||||||||||
// Incase id is not defined | ||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||||||
if (options.id === undefined) { | ||||||||||
options.id = (this.#idAssigner++).toString(); | ||||||||||
} | ||||||||||
Comment on lines
+245
to
+247
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||||||
|
||||||||||
options = { | ||||||||||
timeout: this.timeout, | ||||||||||
throwOnTimeout: this.#throwOnTimeout, | ||||||||||
|
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
|
@@ -69,6 +69,7 @@ export type QueueAddOptions = { | |||||
@default 0 | ||||||
*/ | ||||||
readonly priority?: number; | ||||||
id?: string; | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Doc comment. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||
} & TaskOptions & TimeoutOptions; | ||||||
|
||||||
export type TaskOptions = { | ||||||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -17,10 +17,11 @@ export default class PriorityQueue implements Queue<RunFunction, PriorityQueueOp | |
|
||
const element = { | ||
priority: options.priority, | ||
id: options.id, | ||
run, | ||
}; | ||
|
||
if (this.size && this.#queue[this.size - 1]!.priority! >= options.priority!) { | ||
if (this.size === 0 || this.#queue[this.size - 1]!.priority! >= options.priority!) { | ||
this.#queue.push(element); | ||
return; | ||
} | ||
|
@@ -32,6 +33,20 @@ export default class PriorityQueue implements Queue<RunFunction, PriorityQueueOp | |
this.#queue.splice(index, 0, element); | ||
} | ||
|
||
setPriority(id: string, priority: number) { | ||
const existingIndex: number = this.#queue.findIndex((element: Readonly<PriorityQueueOptions>) => element.id === id); | ||
if (existingIndex === -1) { | ||
throw new Error('Invalid Index - No promise function of specified id available in the queue.'); | ||
} | ||
|
||
const [item] = this.#queue.splice(existingIndex, 1); | ||
if (item === undefined) { | ||
throw new Error('Undefined Item - No promise function of specified id available in the queue.'); | ||
} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I don't think these should throw. It should be allowed to set a priority without knowing beforehand whether the has already finished. |
||
|
||
this.enqueue(item.run, {priority, id}); | ||
} | ||
|
||
dequeue(): RunFunction | undefined { | ||
const item = this.#queue.shift(); | ||
return item?.run; | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -6,7 +6,7 @@ import inRange from 'in-range'; | |
import timeSpan from 'time-span'; | ||
import randomInt from 'random-int'; | ||
import pDefer from 'p-defer'; | ||
import PQueue, {AbortError} from '../source/index.js'; | ||
import PQueue from '../source/index.js'; | ||
|
||
const fixture = Symbol('fixture'); | ||
|
||
|
@@ -1134,3 +1134,43 @@ test('aborting multiple jobs at the same time', async t => { | |
await t.throwsAsync(task2, {instanceOf: DOMException}); | ||
t.like(queue, {size: 0, pending: 0}); | ||
}); | ||
|
||
test('.setPriority() - execute a promise before planned', async t => { | ||
const result: string[] = []; | ||
const queue = new PQueue({concurrency: 1}); | ||
queue.add(async () => { | ||
await delay(400); | ||
result.push('🐌'); | ||
}, {id: '🐌'}); | ||
queue.add(async () => { | ||
await delay(400); | ||
result.push('🦆'); | ||
}, {id: '🦆'}); | ||
queue.add(async () => { | ||
await delay(400); | ||
result.push('🐢'); | ||
}, {id: '🐢'}); | ||
queue.setPriority('🐢', 1); | ||
await queue.onIdle(); | ||
t.deepEqual(result, ['🐌', '🐢', '🦆']); | ||
}); | ||
|
||
test.failing('.setPriority() - with invalid "id"', async t => { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
const result: string[] = []; | ||
const queue = new PQueue({concurrency: 1}); | ||
queue.add(async () => { | ||
await delay(400); | ||
result.push('🐌'); | ||
}, {id: '🐌'}); | ||
queue.add(async () => { | ||
await delay(400); | ||
result.push('🦆'); | ||
}, {id: '🦆'}); | ||
queue.add(async () => { | ||
await delay(400); | ||
result.push('🐢'); | ||
}, {id: '🐢'}); | ||
queue.setPriority('⚡️', 1); | ||
await queue.onIdle(); | ||
t.deepEqual(result, ['🐌', '🐢', '🦆']); | ||
}); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.