-
Notifications
You must be signed in to change notification settings - Fork 12
/
worker-pool.js
125 lines (108 loc) · 4.65 KB
/
worker-pool.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
const { EventEmitter } = require('events');
const { Worker } = require('worker_threads');
const { FastNEARError } = require('./error');
// Symbol-keyed slot on each Worker object that holds the bookkeeping for the
// task currently assigned to it (or null when the worker is idle).
const kTaskInfo = Symbol('kTaskInfo');
// Internal event emitted on the pool whenever a worker is pushed onto the free list.
const kWorkerFreedEvent = Symbol('kWorkerFreedEvent');

/**
 * Parses a contract timeout (in milliseconds) from its textual form.
 *
 * A missing, empty, non-numeric, zero or negative value falls back to
 * `fallbackMs`: `setTimeout` coerces NaN and values below 1 to a 1 ms delay,
 * which would make every contract call time out almost immediately.
 *
 * @param {string|undefined} rawValue - Raw setting, e.g. an environment variable value.
 * @param {number} [fallbackMs=1000] - Timeout used when `rawValue` is unusable.
 * @returns {number} Positive integer timeout in milliseconds.
 */
function parseContractTimeoutMs(rawValue, fallbackMs = 1000) {
  // Always pass the radix so the value is read as decimal.
  const parsed = Number.parseInt(rawValue, 10);
  return Number.isFinite(parsed) && parsed > 0 ? parsed : fallbackMs;
}

// Maximum time a single contract call may run before its worker is terminated.
// Configurable through the FAST_NEAR_CONTRACT_TIMEOUT_MS environment variable.
const CONTRACT_TIMEOUT_MS = parseContractTimeoutMs(process.env.FAST_NEAR_CONTRACT_TIMEOUT_MS);
// NOTE: Mostly lifted from here https://amagiacademy.com/blog/posts/2021-04-09/node-worker-threads-pool
/**
 * Fixed-size pool of worker threads that run contract calls.
 *
 * Each worker handles one task at a time. While a task runs, the worker may
 * send host-call requests (currently only `storage_read`) back to the pool,
 * which answers them from `storage`. A worker that crashes, exits or exceeds
 * the contract timeout is discarded and replaced with a fresh one.
 *
 * Emits `'error'` when a worker fails while it has no task assigned.
 */
class WorkerPool extends EventEmitter {
  /**
   * @param {number} numThreads - Number of worker threads to spawn up front.
   * @param {object} storage - Object exposing `getLatestData(compKey, blockHeight)`,
   *   used to answer `storage_read` requests coming from workers.
   */
  constructor(numThreads, storage) {
    super();
    this.numThreads = numThreads;
    this.workers = [];
    this.freeWorkers = [];
    this.storage = storage;
    this.running = true;
    for (let i = 0; i < numThreads; i++) {
      this.addNewWorker();
    }
  }

  /**
   * Spawns one worker thread, wires up its message/exit/error handling and
   * registers it as free.
   */
  addNewWorker() {
    const worker = new Worker(`${__dirname}/worker.js`);

    worker.on('message', ({ result, logs, error, errorCode, methodName, compKey }) => {
      const task = worker[kTaskInfo];
      if (!task) {
        // Stray message from a worker whose task already settled (or that is
        // being torn down): there is nobody left to notify.
        return;
      }
      const { resolve, reject } = task;

      // A message without `methodName` is the final outcome of the task, so
      // the worker can be handed back to the pool before settling the promise.
      if (!methodName) {
        clearTimeout(task.timeoutHandle);
        worker[kTaskInfo] = null;
        this.freeWorkers.push(worker);
        this.emit(kWorkerFreedEvent);
      }

      if (error) {
        if (errorCode) {
          // TODO: Should we preserve call stack when possible?
          return reject(new FastNEARError(errorCode, error.message));
        }
        return reject(error);
      }

      if (!methodName) {
        return resolve({ result, logs });
      }

      // Otherwise the worker is asking the host to perform `methodName`.
      const keyBuffer = Buffer.from(compKey);
      const blockHeightBuffer = Buffer.from(task.blockHeight.toString());
      switch (methodName) {
        case 'storage_read':
          (async () => {
            const data = await this.storage.getLatestData(keyBuffer, blockHeightBuffer);
            worker.postMessage(data);
          })().catch((storageError) => {
            // Hand the failure to the worker so it can report it for the task.
            worker.postMessage({ error: storageError });
          });
          break;
      }
    });

    // Any exit of a pooled worker is treated as a failure and routed through
    // the same handler as 'error'.
    worker.once('exit', (code) => {
      worker.emit('error', new Error(`Worker stopped with exit code ${code}`));
    });

    worker.on('error', (err) => {
      if (!this.running) {
        return;
      }
      // A crashing worker reports 'error' and then 'exit' (re-emitted above as
      // 'error'). Only the first notification may replace the worker;
      // otherwise an unrelated worker would be dropped and the pool would grow.
      if (!this.removeWorker(worker)) {
        return;
      }

      const task = worker[kTaskInfo];
      worker[kTaskInfo] = null;
      if (task) {
        clearTimeout(task.timeoutHandle);
        const { contractId, methodName, didTimeout, reject } = task;
        const failure = didTimeout
          ? new FastNEARError('executionTimedOut', `${contractId}.${methodName} execution timed out`, { accountId: contractId, methodName })
          : err;
        reject(failure);
      }

      // Spawn the replacement before emitting 'error': emit() throws when no
      // 'error' listener is attached, which must not cost the pool a worker.
      this.addNewWorker();
      if (!task) {
        this.emit('error', err);
      }
    });

    this.workers.push(worker);
    this.freeWorkers.push(worker);
    this.emit(kWorkerFreedEvent);
  }

  /**
   * Stops tracking `worker` in both the worker list and the free list.
   *
   * @param {object} worker - Worker to forget.
   * @returns {boolean} `true` if the worker was tracked and has been removed,
   *   `false` if it was not part of the pool (e.g. already removed).
   */
  removeWorker(worker) {
    const index = this.workers.indexOf(worker);
    if (index === -1) {
      return false;
    }
    this.workers.splice(index, 1);
    // A worker that died while idle must not be handed out for a later task.
    const freeIndex = this.freeWorkers.indexOf(worker);
    if (freeIndex !== -1) {
      this.freeWorkers.splice(freeIndex, 1);
    }
    return true;
  }

  /**
   * Runs a contract method on a free worker, waiting for one if all are busy.
   *
   * @param {*} blockHeight - Block height of the call; also used for `storage_read` lookups.
   * @param {*} blockTimestamp - Block timestamp forwarded to the worker.
   * @param {*} wasmModule - Contract module forwarded to the worker.
   * @param {object} account - Account record; `amount`, `locked` and `storage_usage`
   *   must provide `toBuffer(endian, length)`.
   * @param {string} contractId - Account ID of the contract.
   * @param {string} methodName - Contract method to invoke.
   * @param {*} methodArgs - Arguments forwarded to the worker.
   * @returns {Promise<{result: *, logs: *}>} Resolves with the worker's result and logs.
   *   Rejects with the worker's error, or with a `FastNEARError('executionTimedOut')`
   *   when the call runs longer than `CONTRACT_TIMEOUT_MS`.
   */
  runContract(blockHeight, blockTimestamp, wasmModule, account, contractId, methodName, methodArgs) {
    return new Promise((resolve, reject) => {
      if (this.freeWorkers.length === 0) {
        // No free threads, wait until a worker thread becomes free.
        // TODO: Throw (for rate limiting) if there are too many queued callbacks
        this.once(kWorkerFreedEvent, () => {
          this.runContract(blockHeight, blockTimestamp, wasmModule, account, contractId, methodName, methodArgs)
            .then(resolve, reject);
        });
        return;
      }

      // Serialize the account fields before claiming a worker, so invalid
      // account data rejects without taking a worker out of the pool.
      const { amount, locked, storage_usage } = account;
      const accountBalance = amount.toBuffer('le', 16);
      const accountLockedBalance = locked.toBuffer('le', 16);
      const storageUsage = storage_usage.toBuffer('le', 8);

      const worker = this.freeWorkers.pop();
      const task = { resolve, reject, blockHeight, blockTimestamp, contractId, methodName };
      worker[kTaskInfo] = task;
      try {
        worker.postMessage({ wasmModule, blockHeight, blockTimestamp, accountBalance, accountLockedBalance, storageUsage, contractId, methodName, methodArgs });
      } catch (err) {
        // The task never reached the worker (e.g. an argument could not be
        // cloned): return the worker to the pool instead of leaking it.
        worker[kTaskInfo] = null;
        this.freeWorkers.push(worker);
        this.emit(kWorkerFreedEvent);
        reject(err);
        return;
      }

      task.timeoutHandle = setTimeout(() => {
        // Only terminate if this very task is still the one running.
        if (worker[kTaskInfo] === task) {
          task.didTimeout = true;
          // Termination surfaces as an 'exit', which rejects the task with a
          // timeout error and replaces the worker.
          worker.terminate();
        }
      }, CONTRACT_TIMEOUT_MS);
    });
  }

  /**
   * Shuts the pool down: terminates every worker and stops replacing them.
   * Promises of tasks still in flight are not settled by this call.
   */
  close() {
    this.running = false;
    for (const worker of this.workers) {
      const task = worker[kTaskInfo];
      if (task) {
        // Do not keep the event loop alive for a timeout that no longer matters.
        clearTimeout(task.timeoutHandle);
      }
      worker.terminate();
    }
    this.workers = [];
    this.freeWorkers = [];
  }
}
// The pool class is this module's only export.
module.exports = WorkerPool;