diff --git a/src/common/constants.js b/src/common/constants.js index b5dae423..6498ce35 100644 --- a/src/common/constants.js +++ b/src/common/constants.js @@ -294,3 +294,5 @@ module.exports.WORKER_STATUS = { REGISTERED: "registered", LOGGEDIN: "logged-in" }; + +module.exports.PERIODIC_ANNOUNCEMENT_INTERVAL_MILI = 12 * 60 * 60 * 1000; // 12 hours diff --git a/src/worker/controller/NodeController.js b/src/worker/controller/NodeController.js index b4df57df..8b1b4ed4 100644 --- a/src/worker/controller/NodeController.js +++ b/src/worker/controller/NodeController.js @@ -92,9 +92,12 @@ class NodeController { // TODO:: taskManager see this._initTaskManager() this._taskManager = null; - // // init ethereum api + // init ethereum api this._ethereumApi = null; + // a list of system scheduled timers + this._scheduledTimers = []; + // TODO: consider a more cleaner approach this._workerInitialzied = false; this._workerInitInProgress = false; @@ -381,6 +384,9 @@ class NodeController { if (this._webserver) { this._webserver.stop(); } + for (const timer of this._scheduledTimers) { + timer.unref(); + } } /** @@ -471,6 +477,13 @@ class NodeController { }); } + /** Add a timer to the scheduled timers list + * @param {Timer} timer + * */ + addTimer(timer) { + this._scheduledTimers.push(timer); + } + addPeer(maStr) { nodeUtils.connectionStrToPeerInfo(maStr, (err, peerInfo) => { const action = NOTIFICATION["DISCOVERED"]; diff --git a/src/worker/controller/actions/InitWorkerAction.js b/src/worker/controller/actions/InitWorkerAction.js index 3aade827..fe13c39b 100644 --- a/src/worker/controller/actions/InitWorkerAction.js +++ b/src/worker/controller/actions/InitWorkerAction.js @@ -19,7 +19,6 @@ * */ const constants = require("../../../common/constants"); -const waterfall = require("async/waterfall"); class InitWorkerAction { constructor(controller) { @@ -28,12 +27,13 @@ class InitWorkerAction { /** * @param {Function} optional callback (err)=>{} * */ - execute(params) { + async execute(params) { const callback = params.callback; const C = constants.NODE_NOTIFICATIONS; + let error = null; if (this._controller.isWorkerInitialized()) { - this._controller.logger().debug("Worker was already initialized.. Skipping"); + this._controller.logger().debug("worker was already initialized.. Skipping"); if (callback) { callback(null); } @@ -41,36 +41,23 @@ class InitWorkerAction { } this._controller.startInitWorker(); - // methods - const syncState = cb => { - if (!this._controller.hasEthereum()) { - return cb(null); + try { + if (this._controller.hasEthereum()) { + this._controller.logger().debug("starting sync"); + await this._controller.asyncExecCmd(C.SYNC_RECEIVER_PIPELINE, {}); } - this._controller.logger().debug("Starting sync"); - this._controller.execCmd(C.SYNC_RECEIVER_PIPELINE, { - onEnd: (err, statusResult) => { - this._controller.logger().debug("Finished sync"); - cb(err); - } - }); - }; - const announceState = cb => { - this._controller.logger().debug("Starting announcing local state"); - this._controller.execCmd(C.ANNOUNCE_LOCAL_STATE, { - onResponse: (error, content) => { - if (error) { - this._controller.logger().error("failed announcing " + error); - } else { - content.forEach(ecid => { - this._controller.logger().debug("providing: " + ecid.getKeccack256()); - }); - } - cb(error); - } + + this._controller.logger().debug("starting announcing local state"); + const content = await this._controller.asyncExecCmd(C.ANNOUNCE_LOCAL_STATE, {}); + content.result.forEach(ecid => { + this._controller.logger().debug("providing: " + ecid.getKeccack256()); }); - }; - const backgroundServices = cb => { + + // TODO:: everything that runs in an infinite loop in the program should be started here. + // TODO:: for example we could start here a process to always ping enigma-core and check if ok + this._controller.logger().debug("starting background services"); if (this._controller.hasEthereum()) { + // subscribe to new epoch events this._controller .ethereum() .services() @@ -85,63 +72,38 @@ class InitWorkerAction { }.bind(this) ); } - // TODO:: everything that runs in an infinite loop in the program should be started here. - // TODO:: for example we could start here a process to always ping enigma-core and check if ok // subscribe to self (for responding to rpc requests of other workers) this._controller.execCmd(C.SELF_KEY_SUBSCRIBE, {}); - // log finish this stage - this._controller.logger().debug("started background services"); - cb(null); - }; - const registerWorker = async () => { - if (this._controller.hasEthereum()) { - let workerParams = null; + // add timer for checking synchronization status and announcing local state + const timer = setInterval(async () => { + this._controller.logger().info("starting periodic local state announcement"); try { - workerParams = await this._controller.asyncExecCmd(C.GET_ETH_WORKER_PARAM); - } catch (err) { - return this._controller - .logger() - .error("error InitWorkerAction- Reading worker params from ethereum failed" + err); - } - // If the worker is already logged-in, nothing to do - if ( - workerParams.status === constants.ETHEREUM_WORKER_STATUS.LOGGEDIN || - workerParams.status === constants.ETHEREUM_WORKER_STATUS.LOGGEDOUT - ) { - this._controller.logger().info("InitWorkerAction- worker is already registered"); - return; - } - try { - await this._controller.asyncExecCmd(C.REGISTER); + this._controller.logger().debug("starting sync"); + await this._controller.asyncExecute(C.SYNC_RECEIVER_PIPELINE, {}); + this._controller.logger().debug("starting announcing local state"); + const content = this._controller.asyncExecCmd(C.ANNOUNCE_LOCAL_STATE, {}); + content.result.forEach(ecid => { + this._controller.logger().debug("providing: " + ecid.getKeccack256()); + }); + this._controller.logger().info(`periodic local state announcement finished successfully`); } catch (err) { - return this._controller.logger().error("error InitWorkerAction- Register to ethereum failed" + err); - } - } - }; - waterfall( - [ - // Sync State - syncState, - // Announce State: - announceState, - // Background Services: - backgroundServices, - // register and login worker - registerWorker - ], - err => { - if (err) { - this._controller.logger().error("error InitWorkerAction " + err); - } else { - this._controller.logger().info("success InitWorkerAction"); - } - this._controller.initWorkerDone(); - if (callback) { - callback(err); + this._controller.logger().error(`error in periodic local state announcement: ${err}`); } - } - ); + }, constants.PERIODIC_ANNOUNCEMENT_INTERVAL_MILI); + this._controller.addTimer(timer); + + // register worker + await this._controller.asyncExecCmd(C.REGISTER); + this._controller.logger().info("success InitWorkerAction"); + } catch (err) { + this._controller.logger().error("error InitWorkerAction " + err); + error = err; + } + this._controller.initWorkerDone(); + if (callback) { + callback(error); + } } asyncExecute(params) { diff --git a/src/worker/controller/actions/sync/AnnounceLocalStateAction.js b/src/worker/controller/actions/sync/AnnounceLocalStateAction.js index 72d803b9..17e341b7 100644 --- a/src/worker/controller/actions/sync/AnnounceLocalStateAction.js +++ b/src/worker/controller/actions/sync/AnnounceLocalStateAction.js @@ -38,7 +38,7 @@ class AnnounceLocalStateAction { asyncExecute(params) { const action = this; return new Promise((resolve, reject) => { - params.callback = function(status, result) { + params.onResponse = function(status, result) { resolve({ status: status, result: result }); }; action.execute(params); diff --git a/src/worker/controller/actions/sync/ReceiveAllPipelineAction.js b/src/worker/controller/actions/sync/ReceiveAllPipelineAction.js index 17ee7216..d22c7383 100644 --- a/src/worker/controller/actions/sync/ReceiveAllPipelineAction.js +++ b/src/worker/controller/actions/sync/ReceiveAllPipelineAction.js @@ -113,7 +113,7 @@ class ReceiveAllPipelineAction { asyncExecute(params) { const action = this; return new Promise((resolve, reject) => { - params.callback = function(err, results) { + params.onEnd = function(err, results) { if (err) { reject(err); } else {