diff --git a/CHANGELOG.md b/CHANGELOG.md index 15873569..bd6d7910 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,6 +8,10 @@ and this project adheres to ## [Unreleased] +### Fixed + +- Fix worker nodes priority queue k-buckets initialization. + ## [0.4.0] - 2024-05-07 ### Changed diff --git a/src/pools/utils.ts b/src/pools/utils.ts index dea5adc7..0111e426 100644 --- a/src/pools/utils.ts +++ b/src/pools/utils.ts @@ -191,6 +191,21 @@ export const checkWorkerNodeArguments = ( 'Cannot construct a worker node with a tasks queue back pressure size option that is not a positive integer', ) } + if (opts.tasksQueueBucketSize == null) { + throw new TypeError( + 'Cannot construct a worker node without a tasks queue bucket size option', + ) + } + if (!Number.isSafeInteger(opts.tasksQueueBucketSize)) { + throw new TypeError( + 'Cannot construct a worker node with a tasks queue bucket size option that is not an integer', + ) + } + if (opts.tasksQueueBucketSize <= 0) { + throw new RangeError( + 'Cannot construct a worker node with a tasks queue bucket size option that is not a positive integer', + ) + } } /** diff --git a/src/pools/worker-node.ts b/src/pools/worker-node.ts index 675cbbff..332b2066 100644 --- a/src/pools/worker-node.ts +++ b/src/pools/worker-node.ts @@ -79,7 +79,7 @@ export class WorkerNode ) } this.tasksQueueBackPressureSize = opts.tasksQueueBackPressureSize! - this.tasksQueue = new PriorityQueue>() + this.tasksQueue = new PriorityQueue>(opts.tasksQueueBucketSize) this.onBackPressureStarted = false this.taskFunctionsUsage = new Map() } diff --git a/tests/pools/abstract-pool.test.mjs b/tests/pools/abstract-pool.test.mjs index 286baca3..e3d81597 100644 --- a/tests/pools/abstract-pool.test.mjs +++ b/tests/pools/abstract-pool.test.mjs @@ -826,6 +826,7 @@ Deno.test({ expect(workerNode.tasksQueue).toBeInstanceOf(PriorityQueue) expect(workerNode.tasksQueue.size).toBe(0) expect(workerNode.tasksQueue.maxSize).toBe(0) + expect(workerNode.tasksQueue.k).toBe(numberOfWorkers * 2) } await pool.destroy() pool = new DynamicThreadPool( @@ -838,6 +839,7 @@ Deno.test({ expect(workerNode.tasksQueue).toBeInstanceOf(PriorityQueue) expect(workerNode.tasksQueue.size).toBe(0) expect(workerNode.tasksQueue.maxSize).toBe(0) + expect(workerNode.tasksQueue.k).toBe(numberOfWorkers * 2) } await pool.destroy() }, diff --git a/tests/pools/selection-strategies/worker-choice-strategies-context.test.mjs b/tests/pools/selection-strategies/worker-choice-strategies-context.test.mjs index ba6f8781..6a4540e4 100644 --- a/tests/pools/selection-strategies/worker-choice-strategies-context.test.mjs +++ b/tests/pools/selection-strategies/worker-choice-strategies-context.test.mjs @@ -14,7 +14,7 @@ import { RoundRobinWorkerChoiceStrategy } from '../../../src/pools/selection-str import { WeightedRoundRobinWorkerChoiceStrategy } from '../../../src/pools/selection-strategies/weighted-round-robin-worker-choice-strategy.ts' import { WorkerChoiceStrategiesContext } from '../../../src/pools/selection-strategies/worker-choice-strategies-context.ts' -Deno.test('Worker choice strategy context test suite', async (t) => { +Deno.test('Worker choice strategies context test suite', async (t) => { const min = 1 const max = 3 const fixedPool = new FixedThreadPool( diff --git a/tests/pools/worker-node.test.mjs b/tests/pools/worker-node.test.mjs index 2bfdd610..63c24c6d 100644 --- a/tests/pools/worker-node.test.mjs +++ b/tests/pools/worker-node.test.mjs @@ -11,7 +11,7 @@ Deno.test({ const threadWorkerNode = new WorkerNode( WorkerTypes.web, new URL('./../worker-files/thread/testWorker.mjs', import.meta.url), - { tasksQueueBackPressureSize: 12 }, + { tasksQueueBackPressureSize: 12, tasksQueueBucketSize: 6 }, ) await t.step('Worker node instantiation', () => { @@ -113,6 +113,71 @@ Deno.test({ 'Cannot construct a worker node with a tasks queue back pressure size option that is not a positive integer', ), ) + expect( + () => + new WorkerNode( + WorkerTypes.web, + new URL('./../worker-files/thread/testWorker.mjs', import.meta.url), + { + tasksQueueBackPressureSize: 12, + }, + ), + ).toThrow( + new TypeError( + 'Cannot construct a worker node without a tasks queue bucket size option', + ), + ) + expect( + () => + new WorkerNode( + WorkerTypes.web, + new URL('./../worker-files/thread/testWorker.mjs', import.meta.url), + { + tasksQueueBackPressureSize: 12, + tasksQueueBucketSize: 'invalidTasksQueueBucketSize', + }, + ), + ).toThrow( + new TypeError( + 'Cannot construct a worker node with a tasks queue bucket size option that is not an integer', + ), + ) + expect( + () => + new WorkerNode( + WorkerTypes.web, + new URL('./../worker-files/thread/testWorker.mjs', import.meta.url), + { tasksQueueBackPressureSize: 12, tasksQueueBucketSize: 0.2 }, + ), + ).toThrow( + new TypeError( + 'Cannot construct a worker node with a tasks queue bucket size option that is not an integer', + ), + ) + expect( + () => + new WorkerNode( + WorkerTypes.web, + new URL('./../worker-files/thread/testWorker.mjs', import.meta.url), + { tasksQueueBackPressureSize: 12, tasksQueueBucketSize: 0 }, + ), + ).toThrow( + new RangeError( + 'Cannot construct a worker node with a tasks queue bucket size option that is not a positive integer', + ), + ) + expect( + () => + new WorkerNode( + WorkerTypes.web, + new URL('./../worker-files/thread/testWorker.mjs', import.meta.url), + { tasksQueueBackPressureSize: 12, tasksQueueBucketSize: -1 }, + ), + ).toThrow( + new RangeError( + 'Cannot construct a worker node with a tasks queue bucket size option that is not a positive integer', + ), + ) expect(threadWorkerNode).toBeInstanceOf(WorkerNode) expect(threadWorkerNode.worker).toBeInstanceOf(Worker) expect(threadWorkerNode.info).toStrictEqual({ @@ -151,6 +216,7 @@ Deno.test({ expect(threadWorkerNode.tasksQueueBackPressureSize).toBe(12) expect(threadWorkerNode.tasksQueue).toBeInstanceOf(PriorityQueue) expect(threadWorkerNode.tasksQueue.size).toBe(0) + expect(threadWorkerNode.tasksQueue.k).toBe(6) expect(threadWorkerNode.tasksQueueSize()).toBe( threadWorkerNode.tasksQueue.size, )