Skip to content
This repository has been archived by the owner on Jun 21, 2020. It is now read-only.

Announce periodically (each 12 hours) #305

Open
wants to merge 6 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/common/constants.js
Original file line number Diff line number Diff line change
Expand Up @@ -294,3 +294,5 @@ module.exports.WORKER_STATUS = {
REGISTERED: "registered",
LOGGEDIN: "logged-in"
};

module.exports.PERIODIC_ANNOUNCEMENT_INTERVAL_MILI = 12 * 60 * 60 * 1000; // 12 hours
15 changes: 14 additions & 1 deletion src/worker/controller/NodeController.js
Original file line number Diff line number Diff line change
Expand Up @@ -92,9 +92,12 @@ class NodeController {
// TODO:: taskManager see this._initTaskManager()
this._taskManager = null;

// // init ethereum api
// init ethereum api
this._ethereumApi = null;

// a list of system scheduled timers
this._scheduledTimers = [];

// TODO: consider a more cleaner approach
this._workerInitialzied = false;
this._workerInitInProgress = false;
Expand Down Expand Up @@ -381,6 +384,9 @@ class NodeController {
if (this._webserver) {
this._webserver.stop();
}
for (const timer of this._scheduledTimers) {
timer.unref();
}
}

/**
Expand Down Expand Up @@ -471,6 +477,13 @@ class NodeController {
});
}

/** Add a timer to the scheduled timers list
* @param {Timer} timer
* */
addTimer(timer) {
this._scheduledTimers.push(timer);
}

addPeer(maStr) {
nodeUtils.connectionStrToPeerInfo(maStr, (err, peerInfo) => {
const action = NOTIFICATION["DISCOVERED"];
Expand Down
124 changes: 43 additions & 81 deletions src/worker/controller/actions/InitWorkerAction.js
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
* */

const constants = require("../../../common/constants");
const waterfall = require("async/waterfall");

class InitWorkerAction {
constructor(controller) {
Expand All @@ -28,49 +27,37 @@ class InitWorkerAction {
/**
* @param {Function} optional callback (err)=>{}
* */
execute(params) {
async execute(params) {
const callback = params.callback;
const C = constants.NODE_NOTIFICATIONS;
let error = null;

if (this._controller.isWorkerInitialized()) {
this._controller.logger().debug("Worker was already initialized.. Skipping");
this._controller.logger().debug("worker was already initialized.. Skipping");
if (callback) {
callback(null);
}
return;
}

this._controller.startInitWorker();
// methods
const syncState = cb => {
if (!this._controller.hasEthereum()) {
return cb(null);
try {
if (this._controller.hasEthereum()) {
this._controller.logger().debug("starting sync");
await this._controller.asyncExecCmd(C.SYNC_RECEIVER_PIPELINE, {});
}
this._controller.logger().debug("Starting sync");
this._controller.execCmd(C.SYNC_RECEIVER_PIPELINE, {
onEnd: (err, statusResult) => {
this._controller.logger().debug("Finished sync");
cb(err);
}
});
};
const announceState = cb => {
this._controller.logger().debug("Starting announcing local state");
this._controller.execCmd(C.ANNOUNCE_LOCAL_STATE, {
onResponse: (error, content) => {
if (error) {
this._controller.logger().error("failed announcing " + error);
} else {
content.forEach(ecid => {
this._controller.logger().debug("providing: " + ecid.getKeccack256());
});
}
cb(error);
}

this._controller.logger().debug("starting announcing local state");
const content = await this._controller.asyncExecCmd(C.ANNOUNCE_LOCAL_STATE, {});
content.result.forEach(ecid => {
this._controller.logger().debug("providing: " + ecid.getKeccack256());
});
};
const backgroundServices = cb => {

// TODO:: everything that runs in an infinite loop in the program should be started here.
// TODO:: for example we could start here a process to always ping enigma-core and check if ok
this._controller.logger().debug("starting background services");
if (this._controller.hasEthereum()) {
// subscribe to new epoch events
this._controller
.ethereum()
.services()
Expand All @@ -85,63 +72,38 @@ class InitWorkerAction {
}.bind(this)
);
}
// TODO:: everything that runs in an infinite loop in the program should be started here.
// TODO:: for example we could start here a process to always ping enigma-core and check if ok
// subscribe to self (for responding to rpc requests of other workers)
this._controller.execCmd(C.SELF_KEY_SUBSCRIBE, {});
// log finish this stage
this._controller.logger().debug("started background services");
cb(null);
};
const registerWorker = async () => {
if (this._controller.hasEthereum()) {
let workerParams = null;

// add timer for checking synchronization status and announcing local state
const timer = setInterval(async () => {
this._controller.logger().info("starting periodic local state announcement");
try {
workerParams = await this._controller.asyncExecCmd(C.GET_ETH_WORKER_PARAM);
} catch (err) {
return this._controller
.logger()
.error("error InitWorkerAction- Reading worker params from ethereum failed" + err);
}
// If the worker is already logged-in, nothing to do
if (
workerParams.status === constants.ETHEREUM_WORKER_STATUS.LOGGEDIN ||
workerParams.status === constants.ETHEREUM_WORKER_STATUS.LOGGEDOUT
) {
this._controller.logger().info("InitWorkerAction- worker is already registered");
return;
}
try {
await this._controller.asyncExecCmd(C.REGISTER);
this._controller.logger().debug("starting sync");
await this._controller.asyncExecute(C.SYNC_RECEIVER_PIPELINE, {});
this._controller.logger().debug("starting announcing local state");
const content = this._controller.asyncExecCmd(C.ANNOUNCE_LOCAL_STATE, {});
content.result.forEach(ecid => {
this._controller.logger().debug("providing: " + ecid.getKeccack256());
});
this._controller.logger().info(`periodic local state announcement finished successfully`);
} catch (err) {
return this._controller.logger().error("error InitWorkerAction- Register to ethereum failed" + err);
}
}
};
waterfall(
[
// Sync State
syncState,
// Announce State:
announceState,
// Background Services:
backgroundServices,
// register and login worker
registerWorker
],
err => {
if (err) {
this._controller.logger().error("error InitWorkerAction " + err);
} else {
this._controller.logger().info("success InitWorkerAction");
}
this._controller.initWorkerDone();
if (callback) {
callback(err);
this._controller.logger().error(`error in periodic local state announcement: ${err}`);
}
}
);
}, constants.PERIODIC_ANNOUNCEMENT_INTERVAL_MILI);
this._controller.addTimer(timer);

// register worker
await this._controller.asyncExecCmd(C.REGISTER);
this._controller.logger().info("success InitWorkerAction");
} catch (err) {
this._controller.logger().error("error InitWorkerAction " + err);
error = err;
}
this._controller.initWorkerDone();
if (callback) {
callback(error);
}
}

asyncExecute(params) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ class AnnounceLocalStateAction {
asyncExecute(params) {
const action = this;
return new Promise((resolve, reject) => {
params.callback = function(status, result) {
params.onResponse = function(status, result) {
resolve({ status: status, result: result });
};
action.execute(params);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ class ReceiveAllPipelineAction {
asyncExecute(params) {
const action = this;
return new Promise((resolve, reject) => {
params.callback = function(err, results) {
params.onEnd = function(err, results) {
if (err) {
reject(err);
} else {
Expand Down