Skip to content

Commit

Permalink
fix: fix worker nodes priority queue k-buckets initialization
Browse files Browse the repository at this point in the history
Signed-off-by: Jérôme Benoit <jerome.benoit@piment-noir.org>
  • Loading branch information
jerome-benoit committed May 8, 2024
1 parent 446cd6c commit f800588
Show file tree
Hide file tree
Showing 6 changed files with 90 additions and 3 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,10 @@ and this project adheres to

## [Unreleased]

### Fixed

- Fix worker nodes priority queue k-buckets initialization.

## [0.4.0] - 2024-05-07

### Changed
Expand Down
15 changes: 15 additions & 0 deletions src/pools/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,21 @@ export const checkWorkerNodeArguments = (
'Cannot construct a worker node with a tasks queue back pressure size option that is not a positive integer',
)
}
if (opts.tasksQueueBucketSize == null) {
throw new TypeError(
'Cannot construct a worker node without a tasks queue bucket size option',
)
}
if (!Number.isSafeInteger(opts.tasksQueueBucketSize)) {
throw new TypeError(
'Cannot construct a worker node with a tasks queue bucket size option that is not an integer',
)
}
if (opts.tasksQueueBucketSize <= 0) {
throw new RangeError(
'Cannot construct a worker node with a tasks queue bucket size option that is not a positive integer',
)
}
}

/**
Expand Down
2 changes: 1 addition & 1 deletion src/pools/worker-node.ts
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ export class WorkerNode<Worker extends IWorker, Data = unknown>
)
}
this.tasksQueueBackPressureSize = opts.tasksQueueBackPressureSize!
this.tasksQueue = new PriorityQueue<Task<Data>>()
this.tasksQueue = new PriorityQueue<Task<Data>>(opts.tasksQueueBucketSize)
this.onBackPressureStarted = false
this.taskFunctionsUsage = new Map<string, WorkerUsage>()
}
Expand Down
2 changes: 2 additions & 0 deletions tests/pools/abstract-pool.test.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -826,6 +826,7 @@ Deno.test({
expect(workerNode.tasksQueue).toBeInstanceOf(PriorityQueue)
expect(workerNode.tasksQueue.size).toBe(0)
expect(workerNode.tasksQueue.maxSize).toBe(0)
expect(workerNode.tasksQueue.k).toBe(numberOfWorkers * 2)
}
await pool.destroy()
pool = new DynamicThreadPool(
Expand All @@ -838,6 +839,7 @@ Deno.test({
expect(workerNode.tasksQueue).toBeInstanceOf(PriorityQueue)
expect(workerNode.tasksQueue.size).toBe(0)
expect(workerNode.tasksQueue.maxSize).toBe(0)
expect(workerNode.tasksQueue.k).toBe(numberOfWorkers * 2)
}
await pool.destroy()
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import { RoundRobinWorkerChoiceStrategy } from '../../../src/pools/selection-str
import { WeightedRoundRobinWorkerChoiceStrategy } from '../../../src/pools/selection-strategies/weighted-round-robin-worker-choice-strategy.ts'
import { WorkerChoiceStrategiesContext } from '../../../src/pools/selection-strategies/worker-choice-strategies-context.ts'

Deno.test('Worker choice strategy context test suite', async (t) => {
Deno.test('Worker choice strategies context test suite', async (t) => {
const min = 1
const max = 3
const fixedPool = new FixedThreadPool(
Expand Down
68 changes: 67 additions & 1 deletion tests/pools/worker-node.test.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ Deno.test({
const threadWorkerNode = new WorkerNode(
WorkerTypes.web,
new URL('./../worker-files/thread/testWorker.mjs', import.meta.url),
{ tasksQueueBackPressureSize: 12 },
{ tasksQueueBackPressureSize: 12, tasksQueueBucketSize: 6 },
)

await t.step('Worker node instantiation', () => {
Expand Down Expand Up @@ -113,6 +113,71 @@ Deno.test({
'Cannot construct a worker node with a tasks queue back pressure size option that is not a positive integer',
),
)
expect(
() =>
new WorkerNode(
WorkerTypes.web,
new URL('./../worker-files/thread/testWorker.mjs', import.meta.url),
{
tasksQueueBackPressureSize: 12,
},
),
).toThrow(
new TypeError(
'Cannot construct a worker node without a tasks queue bucket size option',
),
)
expect(
() =>
new WorkerNode(
WorkerTypes.web,
new URL('./../worker-files/thread/testWorker.mjs', import.meta.url),
{
tasksQueueBackPressureSize: 12,
tasksQueueBucketSize: 'invalidTasksQueueBucketSize',
},
),
).toThrow(
new TypeError(
'Cannot construct a worker node with a tasks queue bucket size option that is not an integer',
),
)
expect(
() =>
new WorkerNode(
WorkerTypes.web,
new URL('./../worker-files/thread/testWorker.mjs', import.meta.url),
{ tasksQueueBackPressureSize: 12, tasksQueueBucketSize: 0.2 },
),
).toThrow(
new TypeError(
'Cannot construct a worker node with a tasks queue bucket size option that is not an integer',
),
)
expect(
() =>
new WorkerNode(
WorkerTypes.web,
new URL('./../worker-files/thread/testWorker.mjs', import.meta.url),
{ tasksQueueBackPressureSize: 12, tasksQueueBucketSize: 0 },
),
).toThrow(
new RangeError(
'Cannot construct a worker node with a tasks queue bucket size option that is not a positive integer',
),
)
expect(
() =>
new WorkerNode(
WorkerTypes.web,
new URL('./../worker-files/thread/testWorker.mjs', import.meta.url),
{ tasksQueueBackPressureSize: 12, tasksQueueBucketSize: -1 },
),
).toThrow(
new RangeError(
'Cannot construct a worker node with a tasks queue bucket size option that is not a positive integer',
),
)
expect(threadWorkerNode).toBeInstanceOf(WorkerNode)
expect(threadWorkerNode.worker).toBeInstanceOf(Worker)
expect(threadWorkerNode.info).toStrictEqual({
Expand Down Expand Up @@ -151,6 +216,7 @@ Deno.test({
expect(threadWorkerNode.tasksQueueBackPressureSize).toBe(12)
expect(threadWorkerNode.tasksQueue).toBeInstanceOf(PriorityQueue)
expect(threadWorkerNode.tasksQueue.size).toBe(0)
expect(threadWorkerNode.tasksQueue.k).toBe(6)
expect(threadWorkerNode.tasksQueueSize()).toBe(
threadWorkerNode.tasksQueue.size,
)
Expand Down

0 comments on commit f800588

Please sign in to comment.