From 269e747c67b8da128d417b3809a0178257c74e38 Mon Sep 17 00:00:00 2001 From: lenka Date: Tue, 28 Jan 2020 12:10:56 +0200 Subject: [PATCH 1/3] periodic state announcement --- src/common/constants.js | 2 ++ .../controller/actions/InitWorkerAction.js | 18 ++++++++++++++++++ 2 files changed, 20 insertions(+) diff --git a/src/common/constants.js b/src/common/constants.js index b5dae423..6498ce35 100644 --- a/src/common/constants.js +++ b/src/common/constants.js @@ -294,3 +294,5 @@ module.exports.WORKER_STATUS = { REGISTERED: "registered", LOGGEDIN: "logged-in" }; + +module.exports.PERIODIC_ANNOUNCEMENT_INTERVAL_MILI = 12 * 60 * 60 * 1000; // 12 hours diff --git a/src/worker/controller/actions/InitWorkerAction.js b/src/worker/controller/actions/InitWorkerAction.js index 972140e4..90029dcb 100644 --- a/src/worker/controller/actions/InitWorkerAction.js +++ b/src/worker/controller/actions/InitWorkerAction.js @@ -87,6 +87,24 @@ class InitWorkerAction { // subscribe to self (for responding to rpc requests of other workers) this._controller.execCmd(C.SELF_KEY_SUBSCRIBE, {}); // log finish this stage + setInterval(() => { + this._controller.logger().info("Starting periodic local state announcement"); + waterfall( + [ + // Sync State + syncState, + // Announce State: + announceState + ], + err => { + if (err) { + this._controller.logger().error(`error in periodic local state announcement: ${err}`); + } else { + this._controller.logger().info(`periodic local state announcement finished successfully`); + } + } + ); + }, constants.PERIODIC_ANNOUNCEMENT_INTERVAL_MILI); this._controller.logger().debug("started background services"); cb(null); }; From d87e6805d82950ac65e367e27be02587bf9c13f4 Mon Sep 17 00:00:00 2001 From: lenka Date: Tue, 28 Jan 2020 18:15:31 +0200 Subject: [PATCH 2/3] unref the timer interval --- src/worker/controller/NodeController.js | 15 ++++++++++++++- src/worker/controller/actions/InitWorkerAction.js | 11 +++++++---- 2 files changed, 21 insertions(+), 5 deletions(-) diff --git a/src/worker/controller/NodeController.js b/src/worker/controller/NodeController.js index b4df57df..8b1b4ed4 100644 --- a/src/worker/controller/NodeController.js +++ b/src/worker/controller/NodeController.js @@ -92,9 +92,12 @@ class NodeController { // TODO:: taskManager see this._initTaskManager() this._taskManager = null; - // // init ethereum api + // init ethereum api this._ethereumApi = null; + // a list of system scheduled timers + this._scheduledTimers = []; + // TODO: consider a more cleaner approach this._workerInitialzied = false; this._workerInitInProgress = false; @@ -381,6 +384,9 @@ class NodeController { if (this._webserver) { this._webserver.stop(); } + for (const timer of this._scheduledTimers) { + timer.unref(); + } } /** @@ -471,6 +477,13 @@ class NodeController { }); } + /** Add a timer to the scheduled timers list + * @param {Timer} timer + * */ + addTimer(timer) { + this._scheduledTimers.push(timer); + } + addPeer(maStr) { nodeUtils.connectionStrToPeerInfo(maStr, (err, peerInfo) => { const action = NOTIFICATION["DISCOVERED"]; diff --git a/src/worker/controller/actions/InitWorkerAction.js b/src/worker/controller/actions/InitWorkerAction.js index 90029dcb..78696104 100644 --- a/src/worker/controller/actions/InitWorkerAction.js +++ b/src/worker/controller/actions/InitWorkerAction.js @@ -67,6 +67,8 @@ class InitWorkerAction { }); }; const backgroundServices = cb => { + // TODO:: everything that runs in an infinite loop in the program should be started here. + // TODO:: for example we could start here a process to always ping enigma-core and check if ok if (this._controller.hasEthereum()) { this._controller .ethereum() @@ -82,12 +84,11 @@ class InitWorkerAction { }.bind(this) ); } - // TODO:: everything that runs in an infinite loop in the program should be started here. - // TODO:: for example we could start here a process to always ping enigma-core and check if ok // subscribe to self (for responding to rpc requests of other workers) this._controller.execCmd(C.SELF_KEY_SUBSCRIBE, {}); - // log finish this stage - setInterval(() => { + + // add timer for checking synchronization status and announcing local state + const timer = setInterval(() => { this._controller.logger().info("Starting periodic local state announcement"); waterfall( [ @@ -105,6 +106,8 @@ class InitWorkerAction { } ); }, constants.PERIODIC_ANNOUNCEMENT_INTERVAL_MILI); + this._controller.addTimer(timer); + // log finish this stage this._controller.logger().debug("started background services"); cb(null); }; From c30a8b9fb382fba63198bd1d8ce469cf69d5ab7b Mon Sep 17 00:00:00 2001 From: lenka Date: Wed, 5 Feb 2020 17:39:42 +0200 Subject: [PATCH 3/3] code review --- .../controller/actions/InitWorkerAction.js | 139 +++++------------- .../actions/sync/AnnounceLocalStateAction.js | 2 +- .../actions/sync/ReceiveAllPipelineAction.js | 2 +- 3 files changed, 42 insertions(+), 101 deletions(-) diff --git a/src/worker/controller/actions/InitWorkerAction.js b/src/worker/controller/actions/InitWorkerAction.js index 32ae00fd..fe13c39b 100644 --- a/src/worker/controller/actions/InitWorkerAction.js +++ b/src/worker/controller/actions/InitWorkerAction.js @@ -19,7 +19,6 @@ * */ const constants = require("../../../common/constants"); -const waterfall = require("async/waterfall"); class InitWorkerAction { constructor(controller) { @@ -28,12 +27,13 @@ class InitWorkerAction { /** * @param {Function} optional callback (err)=>{} * */ - execute(params) { + async execute(params) { const callback = params.callback; const C = constants.NODE_NOTIFICATIONS; + let error = null; if (this._controller.isWorkerInitialized()) { - this._controller.logger().debug("Worker was already initialized.. Skipping"); + this._controller.logger().debug("worker was already initialized.. Skipping"); if (callback) { callback(null); } @@ -41,38 +41,23 @@ class InitWorkerAction { } this._controller.startInitWorker(); - // methods - const syncState = cb => { - if (!this._controller.hasEthereum()) { - return cb(null); + try { + if (this._controller.hasEthereum()) { + this._controller.logger().debug("starting sync"); + await this._controller.asyncExecCmd(C.SYNC_RECEIVER_PIPELINE, {}); } - this._controller.logger().debug("Starting sync"); - this._controller.execCmd(C.SYNC_RECEIVER_PIPELINE, { - onEnd: (err, statusResult) => { - this._controller.logger().debug("Finished sync"); - cb(err); - } - }); - }; - const announceState = cb => { - this._controller.logger().debug("Starting announcing local state"); - this._controller.execCmd(C.ANNOUNCE_LOCAL_STATE, { - onResponse: (error, content) => { - if (error) { - this._controller.logger().error("failed announcing " + error); - } else { - content.forEach(ecid => { - this._controller.logger().debug("providing: " + ecid.getKeccack256()); - }); - } - cb(error); - } + + this._controller.logger().debug("starting announcing local state"); + const content = await this._controller.asyncExecCmd(C.ANNOUNCE_LOCAL_STATE, {}); + content.result.forEach(ecid => { + this._controller.logger().debug("providing: " + ecid.getKeccack256()); }); - }; - const backgroundServices = cb => { + // TODO:: everything that runs in an infinite loop in the program should be started here. // TODO:: for example we could start here a process to always ping enigma-core and check if ok + this._controller.logger().debug("starting background services"); if (this._controller.hasEthereum()) { + // subscribe to new epoch events this._controller .ethereum() .services() @@ -91,78 +76,34 @@ class InitWorkerAction { this._controller.execCmd(C.SELF_KEY_SUBSCRIBE, {}); // add timer for checking synchronization status and announcing local state - const timer = setInterval(() => { - this._controller.logger().info("Starting periodic local state announcement"); - waterfall( - [ - // Sync State - syncState, - // Announce State: - announceState - ], - err => { - if (err) { - this._controller.logger().error(`error in periodic local state announcement: ${err}`); - } else { - this._controller.logger().info(`periodic local state announcement finished successfully`); - } - } - ); - }, constants.PERIODIC_ANNOUNCEMENT_INTERVAL_MILI); - this._controller.addTimer(timer); - // log finish this stage - this._controller.logger().debug("started background services"); - cb(null); - }; - const registerWorker = async () => { - if (this._controller.hasEthereum()) { - let workerParams = null; - + const timer = setInterval(async () => { + this._controller.logger().info("starting periodic local state announcement"); try { - workerParams = await this._controller.asyncExecCmd(C.GET_ETH_WORKER_PARAM); + this._controller.logger().debug("starting sync"); + await this._controller.asyncExecute(C.SYNC_RECEIVER_PIPELINE, {}); + this._controller.logger().debug("starting announcing local state"); + const content = this._controller.asyncExecCmd(C.ANNOUNCE_LOCAL_STATE, {}); + content.result.forEach(ecid => { + this._controller.logger().debug("providing: " + ecid.getKeccack256()); + }); + this._controller.logger().info(`periodic local state announcement finished successfully`); } catch (err) { - return this._controller - .logger() - .error("error InitWorkerAction- Reading worker params from ethereum failed" + err); + this._controller.logger().error(`error in periodic local state announcement: ${err}`); } - // If the worker is already logged-in, nothing to do - if ( - workerParams.status === constants.ETHEREUM_WORKER_STATUS.LOGGEDIN || - workerParams.status === constants.ETHEREUM_WORKER_STATUS.LOGGEDOUT - ) { - this._controller.logger().info("InitWorkerAction- worker is already registered"); - return; - } - try { - await this._controller.asyncExecCmd(C.REGISTER); - } catch (err) { - return this._controller.logger().error("error InitWorkerAction- Register to ethereum failed" + err); - } - } - }; - waterfall( - [ - // Sync State - syncState, - // Announce State: - announceState, - // Background Services: - backgroundServices, - // register and login worker - registerWorker - ], - err => { - if (err) { - this._controller.logger().error("error InitWorkerAction " + err); - } else { - this._controller.logger().info("success InitWorkerAction"); - } - this._controller.initWorkerDone(); - if (callback) { - callback(err); - } - } - ); + }, constants.PERIODIC_ANNOUNCEMENT_INTERVAL_MILI); + this._controller.addTimer(timer); + + // register worker + await this._controller.asyncExecCmd(C.REGISTER); + this._controller.logger().info("success InitWorkerAction"); + } catch (err) { + this._controller.logger().error("error InitWorkerAction " + err); + error = err; + } + this._controller.initWorkerDone(); + if (callback) { + callback(error); + } } asyncExecute(params) { diff --git a/src/worker/controller/actions/sync/AnnounceLocalStateAction.js b/src/worker/controller/actions/sync/AnnounceLocalStateAction.js index 72d803b9..17e341b7 100644 --- a/src/worker/controller/actions/sync/AnnounceLocalStateAction.js +++ b/src/worker/controller/actions/sync/AnnounceLocalStateAction.js @@ -38,7 +38,7 @@ class AnnounceLocalStateAction { asyncExecute(params) { const action = this; return new Promise((resolve, reject) => { - params.callback = function(status, result) { + params.onResponse = function(status, result) { resolve({ status: status, result: result }); }; action.execute(params); diff --git a/src/worker/controller/actions/sync/ReceiveAllPipelineAction.js b/src/worker/controller/actions/sync/ReceiveAllPipelineAction.js index 17ee7216..d22c7383 100644 --- a/src/worker/controller/actions/sync/ReceiveAllPipelineAction.js +++ b/src/worker/controller/actions/sync/ReceiveAllPipelineAction.js @@ -113,7 +113,7 @@ class ReceiveAllPipelineAction { asyncExecute(params) { const action = this; return new Promise((resolve, reject) => { - params.callback = function(err, results) { + params.onEnd = function(err, results) { if (err) { reject(err); } else {