From 5bfb3e880d1913fe1655a02a02d011ebf4f3a6ac Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=A9r=C3=B4me=20Benoit?= Date: Sun, 24 Dec 2023 12:09:29 +0100 Subject: [PATCH] fix: fix worker node cross tasks stealing MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Jérôme Benoit --- CHANGELOG.md | 5 ++++ src/pools/abstract-pool.ts | 39 ++++++++++++++++++++++++----- src/pools/pool.ts | 2 ++ src/pools/worker-node.ts | 1 + src/pools/worker.ts | 5 ++++ tests/pools/abstract-pool.test.mjs | 7 ++++-- tests/pools/worker-node.test.mjs | 1 + tests/worker/thread-worker.test.mjs | 19 ++++++-------- 8 files changed, 59 insertions(+), 20 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 4bbfcb10..6e25cdcd 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,6 +8,11 @@ and this project adheres to ## [Unreleased] +### Fixed + +- Avoid worker node cross tasks stealing. +- Ensure only half the pool worker nodes can steal tasks. + ## [0.1.9] - 2023-12-22 ### Changed diff --git a/src/pools/abstract-pool.ts b/src/pools/abstract-pool.ts index 2fa396c6..8b18a4be 100644 --- a/src/pools/abstract-pool.ts +++ b/src/pools/abstract-pool.ts @@ -296,6 +296,14 @@ export abstract class AbstractPool< : accumulator, 0, ), + ...(this.opts.enableTasksQueue === true && + { + stealingWorkerNodes: this.workerNodes.reduce( + (accumulator, workerNode) => + workerNode.info.stealing ? accumulator + 1 : accumulator, + 0, + ), + }), busyWorkerNodes: this.workerNodes.reduce( (accumulator, _workerNode, workerNodeKey) => this.isWorkerNodeBusy(workerNodeKey) ? accumulator + 1 : accumulator, @@ -1375,6 +1383,10 @@ export abstract class AbstractPool< }) } + private cannotStealTask(): boolean { + return this.workerNodes.length <= 1 || this.info.queuedTasks === 0 + } + private handleTask(workerNodeKey: number, task: Task): void { if (this.shallExecuteTask(workerNodeKey)) { this.executeTask(workerNodeKey, task) @@ -1387,7 +1399,7 @@ export abstract class AbstractPool< if (workerNodeKey === -1) { return } - if (this.workerNodes.length <= 1) { + if (this.cannotStealTask()) { return } while (this.tasksQueueSize(workerNodeKey) > 0) { @@ -1481,15 +1493,21 @@ export abstract class AbstractPool< event: CustomEvent, previousStolenTask?: Task, ): void => { - if (this.workerNodes.length <= 1) { - return - } const { workerNodeKey } = event.detail if (workerNodeKey == null) { throw new Error( - 'WorkerNode event detail workerNodeKey attribute must be defined', + 'WorkerNode event detail workerNodeKey property must be defined', ) } + if ( + this.cannotStealTask() || (this.info.stealingWorkerNodes as number) > + Math.floor(this.workerNodes.length / 2) + ) { + if (previousStolenTask != null) { + this.getWorkerInfo(workerNodeKey).stealing = false + } + return + } const workerNodeTasksUsage = this.workerNodes[workerNodeKey].usage.tasks if ( previousStolenTask != null && @@ -1497,6 +1515,7 @@ export abstract class AbstractPool< (workerNodeTasksUsage.executing > 0 || this.tasksQueueSize(workerNodeKey) > 0) ) { + this.getWorkerInfo(workerNodeKey).stealing = false for ( const taskName of this.workerNodes[workerNodeKey].info .taskFunctionNames as string[] @@ -1511,6 +1530,7 @@ export abstract class AbstractPool< ) return } + this.getWorkerInfo(workerNodeKey).stealing = true const stolenTask = this.workerNodeStealTask(workerNodeKey) if ( this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) && @@ -1556,6 +1576,7 @@ export abstract class AbstractPool< const sourceWorkerNode = workerNodes.find( (sourceWorkerNode, sourceWorkerNodeKey) => sourceWorkerNode.info.ready && + !sourceWorkerNode.info.stealing && sourceWorkerNodeKey !== workerNodeKey && sourceWorkerNode.usage.tasks.queued > 0, ) @@ -1576,7 +1597,10 @@ export abstract class AbstractPool< private readonly handleBackPressureEvent = ( event: CustomEvent, ): void => { - if (this.workerNodes.length <= 1) { + if ( + this.cannotStealTask() || (this.info.stealingWorkerNodes as number) > + Math.floor(this.workerNodes.length / 2) + ) { return } const { workerId } = event.detail @@ -1596,16 +1620,19 @@ export abstract class AbstractPool< if ( sourceWorkerNode.usage.tasks.queued > 0 && workerNode.info.ready && + !workerNode.info.stealing && workerNode.info.id !== workerId && workerNode.usage.tasks.queued < (this.opts.tasksQueueOptions?.size as number) - sizeOffset ) { + this.getWorkerInfo(workerNodeKey).stealing = true const task = sourceWorkerNode.popTask() as Task this.handleTask(workerNodeKey, task) this.updateTaskStolenStatisticsWorkerUsage( workerNodeKey, task.name as string, ) + this.getWorkerInfo(workerNodeKey).stealing = false } } } diff --git a/src/pools/pool.ts b/src/pools/pool.ts index eecf5e18..49077dc5 100644 --- a/src/pools/pool.ts +++ b/src/pools/pool.ts @@ -71,6 +71,8 @@ export interface PoolInfo { readonly workerNodes: number /** Pool idle worker nodes. */ readonly idleWorkerNodes: number + /** Pool stealing worker nodes. */ + readonly stealingWorkerNodes?: number /** Pool busy worker nodes. */ readonly busyWorkerNodes: number readonly executedTasks: number diff --git a/src/pools/worker-node.ts b/src/pools/worker-node.ts index fd922b9c..952ddd7e 100644 --- a/src/pools/worker-node.ts +++ b/src/pools/worker-node.ts @@ -196,6 +196,7 @@ export class WorkerNode type: getWorkerType(worker) as WorkerType, dynamic: false, ready: false, + stealing: false, } } diff --git a/src/pools/worker.ts b/src/pools/worker.ts index 873580e1..1e6b0e87 100644 --- a/src/pools/worker.ts +++ b/src/pools/worker.ts @@ -146,6 +146,11 @@ export interface WorkerInfo { * Ready flag. */ ready: boolean + /** + * Stealing flag. + * This flag is set to `true` when worker node is stealing tasks from another worker node. + */ + stealing: boolean /** * Task function names. */ diff --git a/tests/pools/abstract-pool.test.mjs b/tests/pools/abstract-pool.test.mjs index 08bd0aeb..8f409e03 100644 --- a/tests/pools/abstract-pool.test.mjs +++ b/tests/pools/abstract-pool.test.mjs @@ -1028,6 +1028,7 @@ Deno.test({ type: WorkerTypes.web, dynamic: false, ready: true, + stealing: false, }) } await pool.destroy() @@ -1046,6 +1047,7 @@ Deno.test({ type: WorkerTypes.web, dynamic: false, ready: true, + stealing: false, }) } await pool.destroy() @@ -1447,7 +1449,7 @@ Deno.test({ stub( pool, 'hasBackPressure', - returnsNext(Array(5).fill(true)), + returnsNext(Array(7).fill(true)), ) expect(pool.emitter.eventNames()).toStrictEqual([]) const promises = new Set() @@ -1476,6 +1478,7 @@ Deno.test({ maxSize: expect.any(Number), workerNodes: expect.any(Number), idleWorkerNodes: expect.any(Number), + stealingWorkerNodes: expect.any(Number), busyWorkerNodes: expect.any(Number), executedTasks: expect.any(Number), executingTasks: expect.any(Number), @@ -1485,7 +1488,7 @@ Deno.test({ stolenTasks: expect.any(Number), failedTasks: expect.any(Number), }) - assertSpyCalls(pool.hasBackPressure, 5) + assertSpyCalls(pool.hasBackPressure, 7) pool.hasBackPressure.restore() await pool.destroy() }, diff --git a/tests/pools/worker-node.test.mjs b/tests/pools/worker-node.test.mjs index 7e123d36..3fc728e1 100644 --- a/tests/pools/worker-node.test.mjs +++ b/tests/pools/worker-node.test.mjs @@ -120,6 +120,7 @@ Deno.test({ type: WorkerTypes.web, dynamic: false, ready: false, + stealing: false, }) expect(threadWorkerNode.usage).toStrictEqual({ tasks: { diff --git a/tests/worker/thread-worker.test.mjs b/tests/worker/thread-worker.test.mjs index 179c8e23..ecbb07fa 100644 --- a/tests/worker/thread-worker.test.mjs +++ b/tests/worker/thread-worker.test.mjs @@ -32,6 +32,9 @@ Deno.test('Thread worker test suite', async (t) => { return 2 } const worker = new ThreadWorker({ fn1, fn2 }) + worker.port = { + postMessage: stub(() => {}), + } expect(worker.removeTaskFunction(0, fn1)).toStrictEqual({ status: false, error: new TypeError('name parameter is not a string'), @@ -40,9 +43,6 @@ Deno.test('Thread worker test suite', async (t) => { status: false, error: new TypeError('name parameter is an empty string'), }) - worker.port = { - postMessage: stub(() => {}), - } expect(worker.taskFunctions.get(DEFAULT_TASK_NAME)).toBeInstanceOf( Function, ) @@ -85,17 +85,12 @@ Deno.test('Thread worker test suite', async (t) => { }) await t.step( - 'Verify worker invokes the postMessage() method on port property', + 'Verify that sendToMainWorker() method invokes the port property postMessage() method', () => { - class SpyWorker extends ThreadWorker { - constructor(fn) { - super(fn) - this.port = { - postMessage: stub(() => {}), - } - } + const worker = new ThreadWorker(() => {}) + worker.port = { + postMessage: stub(() => {}), } - const worker = new SpyWorker(() => {}) worker.sendToMainWorker({ ok: 1 }) assertSpyCalls(worker.port.postMessage, 1) worker.port.postMessage.restore()