Skip to content
This repository has been archived by the owner on Jun 21, 2020. It is now read-only.

Announce periodically (each 12 hours) #305

Open
wants to merge 6 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/common/constants.js
Original file line number Diff line number Diff line change
Expand Up @@ -294,3 +294,5 @@ module.exports.WORKER_STATUS = {
REGISTERED: "registered",
LOGGEDIN: "logged-in"
};

module.exports.PERIODIC_ANNOUNCEMENT_INTERVAL_MILI = 12 * 60 * 60 * 1000; // 12 hours
15 changes: 14 additions & 1 deletion src/worker/controller/NodeController.js
Original file line number Diff line number Diff line change
Expand Up @@ -92,9 +92,12 @@ class NodeController {
// TODO:: taskManager see this._initTaskManager()
this._taskManager = null;

// // init ethereum api
// init ethereum api
this._ethereumApi = null;

// a list of system scheduled timers
this._scheduledTimers = [];

// TODO: consider a more cleaner approach
this._workerInitialzied = false;
this._workerInitInProgress = false;
Expand Down Expand Up @@ -381,6 +384,9 @@ class NodeController {
if (this._webserver) {
this._webserver.stop();
}
for (const timer of this._scheduledTimers) {
timer.unref();
}
}

/**
Expand Down Expand Up @@ -471,6 +477,13 @@ class NodeController {
});
}

/** Add a timer to the scheduled timers list
* @param {Timer} timer
* */
addTimer(timer) {
this._scheduledTimers.push(timer);
}

addPeer(maStr) {
nodeUtils.connectionStrToPeerInfo(maStr, (err, peerInfo) => {
const action = NOTIFICATION["DISCOVERED"];
Expand Down
124 changes: 43 additions & 81 deletions src/worker/controller/actions/InitWorkerAction.js
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
* */

const constants = require("../../../common/constants");
const waterfall = require("async/waterfall");

class InitWorkerAction {
constructor(controller) {
Expand All @@ -28,49 +27,37 @@ class InitWorkerAction {
/**
* @param {Function} optional callback (err)=>{}
* */
execute(params) {
async execute(params) {
const callback = params.callback;
const C = constants.NODE_NOTIFICATIONS;
let error = null;

if (this._controller.isWorkerInitialized()) {
this._controller.logger().debug("Worker was already initialized.. Skipping");
this._controller.logger().debug("worker was already initialized.. Skipping");
if (callback) {
callback(null);
}
return;
}

this._controller.startInitWorker();
// methods
const syncState = cb => {
if (!this._controller.hasEthereum()) {
return cb(null);
try {
if (this._controller.hasEthereum()) {
this._controller.logger().debug("starting sync");
await this._controller.asyncExecCmd(C.SYNC_RECEIVER_PIPELINE, {});
}
this._controller.logger().debug("Starting sync");
this._controller.execCmd(C.SYNC_RECEIVER_PIPELINE, {
onEnd: (err, statusResult) => {
this._controller.logger().debug("Finished sync");
cb(err);
}
});
};
const announceState = cb => {
this._controller.logger().debug("Starting announcing local state");
this._controller.execCmd(C.ANNOUNCE_LOCAL_STATE, {
onResponse: (error, content) => {
if (error) {
this._controller.logger().error("failed announcing " + error);
} else {
content.forEach(ecid => {
this._controller.logger().debug("providing: " + ecid.getKeccack256());
});
}
cb(error);
}

this._controller.logger().debug("starting announcing local state");
const content = await this._controller.asyncExecCmd(C.ANNOUNCE_LOCAL_STATE, {});
content.result.forEach(ecid => {
this._controller.logger().debug("providing: " + ecid.getKeccack256());
});
};
const backgroundServices = cb => {

// TODO:: everything that runs in an infinite loop in the program should be started here.
// TODO:: for example we could start here a process to always ping enigma-core and check if ok
this._controller.logger().debug("starting background services");
if (this._controller.hasEthereum()) {
// subscribe to new epoch events
this._controller
.ethereum()
.services()
Expand All @@ -85,63 +72,38 @@ class InitWorkerAction {
}.bind(this)
);
}
// TODO:: everything that runs in an infinite loop in the program should be started here.
// TODO:: for example we could start here a process to always ping enigma-core and check if ok
// subscribe to self (for responding to rpc requests of other workers)
this._controller.execCmd(C.SELF_KEY_SUBSCRIBE, {});
// log finish this stage
this._controller.logger().debug("started background services");
cb(null);
};
const registerWorker = async () => {
if (this._controller.hasEthereum()) {
let workerParams = null;

// add timer for checking synchronization status and announcing local state
const timer = setInterval(async () => {
this._controller.logger().info("starting periodic local state announcement");
try {
workerParams = await this._controller.asyncExecCmd(C.GET_ETH_WORKER_PARAM);
} catch (err) {
return this._controller
.logger()
.error("error InitWorkerAction- Reading worker params from ethereum failed" + err);
}
// If the worker is already logged-in, nothing to do
if (
workerParams.status === constants.ETHEREUM_WORKER_STATUS.LOGGEDIN ||
workerParams.status === constants.ETHEREUM_WORKER_STATUS.LOGGEDOUT
) {
this._controller.logger().info("InitWorkerAction- worker is already registered");
return;
}
try {
await this._controller.asyncExecCmd(C.REGISTER);
this._controller.logger().debug("starting sync");
await this._controller.asyncExecute(C.SYNC_RECEIVER_PIPELINE, {});
this._controller.logger().debug("starting announcing local state");
const content = this._controller.asyncExecCmd(C.ANNOUNCE_LOCAL_STATE, {});
content.result.forEach(ecid => {
this._controller.logger().debug("providing: " + ecid.getKeccack256());
});
this._controller.logger().info(`periodic local state announcement finished successfully`);
} catch (err) {
return this._controller.logger().error("error InitWorkerAction- Register to ethereum failed" + err);
}
}
};
waterfall(
[
// Sync State
syncState,
// Announce State:
announceState,
// Background Services:
backgroundServices,
// register and login worker
registerWorker
],
err => {
if (err) {
this._controller.logger().error("error InitWorkerAction " + err);
} else {
this._controller.logger().info("success InitWorkerAction");
}
this._controller.initWorkerDone();
if (callback) {
callback(err);
this._controller.logger().error(`error in periodic local state announcement: ${err}`);
}
}
);
}, constants.PERIODIC_ANNOUNCEMENT_INTERVAL_MILI);
this._controller.addTimer(timer);

// register worker
await this._controller.asyncExecCmd(C.REGISTER);
this._controller.logger().info("success InitWorkerAction");
} catch (err) {
this._controller.logger().error("error InitWorkerAction " + err);
error = err;
}
this._controller.initWorkerDone();
if (callback) {
callback(error);
}
}

asyncExecute(params) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ class AnnounceLocalStateAction {
asyncExecute(params) {
const action = this;
return new Promise((resolve, reject) => {
params.callback = function(status, result) {
params.onResponse = function(status, result) {
resolve({ status: status, result: result });
};
action.execute(params);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ class ReceiveAllPipelineAction {
asyncExecute(params) {
const action = this;
return new Promise((resolve, reject) => {
params.callback = function(err, results) {
params.onEnd = function(err, results) {
if (err) {
reject(err);
} else {
Expand Down