Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
  • Loading branch information
RunOnFluxBot committed Nov 14, 2024
1 parent 7bb0e74 commit bd7c3b2
Show file tree
Hide file tree
Showing 2 changed files with 69 additions and 118 deletions.
186 changes: 69 additions & 117 deletions services/appsService.js
Original file line number Diff line number Diff line change
Expand Up @@ -8974,48 +8974,6 @@ function getAppPorts(appSpecs) {
return appPorts;
}

async function testTrySpawningGlobalApplication() {
// get all the applications list names missing instances
const pipeline = [
{
$lookup: {
from: 'zelappslocation',
localField: 'name',
foreignField: 'name',
as: 'locations',
},
},
{
$addFields: {
actual: { $size: '$locations.name' },
},
},
{
$match: {
$expr: { $lt: ['$actual', { $ifNull: ['$instances', 3] }] },
},
},
{
$project: {
_id: 0,
name: '$name',
actual: '$actual',
required: '$instances',
nodes: { $ifNull: ['$nodes', []] },
geolocation: { $ifNull: ['$geolocation', []] },
},
},
{ $sort: { name: 1 } },
];

const db = dbHelper.databaseConnection();
const database = db.db(config.database.appsglobal.database);
log.info('testTrySpawningGlobalApplication');
const globalAppNamesLocation = await dbHelper.aggregateInDatabase(database, globalAppsInformation, pipeline);
log.info(JSON.stringify(globalAppNamesLocation));
log.info('testTrySpawningGlobalApplication');
}

/**
* To try spawning a global application. Performs various checks before the app is spawned. Checks that app is not already running on the FluxNode/IP address.
* Checks if app already has the required number of instances deployed. Checks that application image is not blacklisted. Checks that ports not already in use.
Expand Down Expand Up @@ -9088,80 +9046,75 @@ async function trySpawningGlobalApplication() {
}

// get all the applications list names
const globalAppNamesLocation = await getAllGlobalApplications(['name', 'geolocation', 'nodes']);
// get all the applications list names missing instances
const pipeline = [
{
$lookup: {
from: 'zelappslocation',
localField: 'name',
foreignField: 'name',
as: 'locations',
},
},
{
$addFields: {
actual: { $size: '$locations.name' },
},
},
{
$match: {
$expr: { $lt: ['$actual', { $ifNull: ['$instances', 3] }] },
},
},
{
$project: {
_id: 0,
name: '$name',
actual: '$actual',
required: '$instances',
nodes: { $ifNull: ['$nodes', []] },
geolocation: { $ifNull: ['$geolocation', []] },
},
},
{ $sort: { name: 1 } },
];

const db = dbHelper.databaseConnection();
const database = db.db(config.database.appsglobal.database);
log.info('testTrySpawningGlobalApplication');
let globalAppNamesLocation = await dbHelper.aggregateInDatabase(database, globalAppsInformation, pipeline);
const numberOfGlobalApps = globalAppNamesLocation.length;
if (!numberOfGlobalApps) {
log.info('No installable application found');
await serviceHelper.delay(config.fluxapps.installation.delay * 1000);
await serviceHelper.delay(5 * 60 * 1000);
trySpawningGlobalApplication();
return;
}

const installDelay = config.fluxapps.installation.delay * 1000;
let probLn = Math.log(2 + numberOfGlobalApps); // from ln(2) -> ln(2 + x)
const adjustedDelay = installDelay / probLn;

let appToRun;
// highly favor application that is targetting our node
// get my collateral
const myCollateral = await generalService.obtainNodeCollateralInformation();
// get my ip address
// filter apps only those that include my ip or my collateral
const scopedApps = globalAppNamesLocation.filter((app) => app.nodes && (app.nodes.includes(myIP) || app.nodes.includes(`${myCollateral.txhash}:${myCollateral.txindex}`)));
const scopedAppsToRun = scopedApps.filter((app) => !trySpawningGlobalAppCache.has(app.name));
// check if this app was already evaluated
const numberOfScopedAppsToRun = scopedAppsToRun.length;
if (numberOfScopedAppsToRun) { // some app should be prioritized on our node
const appToRunNumber = Math.floor((Math.random() * numberOfScopedAppsToRun));
appToRun = scopedAppsToRun[appToRunNumber].name;
}

if (appToRun) { // ensure higher rate spawning for scoped apps
probLn *= 2;
}
// if all ok Check hashes comparison if its out turn to start the app. higher than 1% probability.
const randomNumber = Math.floor((Math.random() * (config.fluxapps.installation.probability / probLn))); // higher probability for more apps on network
if (randomNumber !== 0) {
log.info('Other Flux nodes are evaluating application installation');
await serviceHelper.delay(adjustedDelay);
trySpawningGlobalApplication();
return;
let appToRun = null;
let appFromAppsToBeCheckedLater = false;
const appIndex = appsToBeCheckedLater.findIndex((app) => app.timeToCheck >= Date.now());
if (appIndex >= 0) {
appToRun = appsToBeCheckedLater[appIndex].appName;
appsToBeCheckedLater.splice(appIndex, 1);
appFromAppsToBeCheckedLater = true;
} else {
const myNodeLocation = nodeFullGeolocation();
globalAppNamesLocation = globalAppNamesLocation.filter((app) => (app.geolocation.lengh === 0 || app.geolocation.find((loc) => `ac${myNodeLocation}`.startsWith(loc)))
|| (app.nodes.lengh === 0 || app.nodes.find((ip) => ip === myIP)));
// eslint-disable-next-line no-restricted-syntax
for (const appToRunAux of globalAppNamesLocation) {
if (!trySpawningGlobalAppCache.has(appToRunAux.name) && !appsToBeCheckedLater.includes((app) => app.appName === appToRunAux.name)) {
appToRun = appToRunAux.name;
log.info(`Application ${appToRun} selected to try to spawne. Reported as been running in ${appToRunAux.actual} instances and ${appToRunAux.required} are required.`);
break;
}
}
}

let appFromAppsToBeCheckedLater = false;
// no scoped applicaton, run some global app
if (!appToRun) {
// pick a random one
const appToRunNumber = Math.floor((Math.random() * numberOfGlobalApps));
appToRun = globalAppNamesLocation[appToRunNumber].name;

// force switch to run a geo restricted app
if (appToRunNumber % 5 === 0) { // every 5th run we are forcing application instalation that is in the nodes geolocation, esnuring highly geolocated apps spawn fast enough
// get this node location
const myNodeLocation = nodeFullGeolocation();
const appsInMyLocation = globalAppNamesLocation.filter((apps) => apps.geolocation && apps.geolocation.find((loc) => `ac${myNodeLocation}`.startsWith(loc)));
if (appsInMyLocation.length) {
const numberOfMyNodeGeoApps = appsInMyLocation.length;
const randomGeoAppNumber = Math.floor((Math.random() * numberOfMyNodeGeoApps));
// install geo location restricted app instead
appToRun = appsInMyLocation[randomGeoAppNumber].name;
}
} else if (appToRunNumber % 9 === 0) { // we will be checking every few runs if there are apps on appsToBeCheckedLater to be installed that previously passed all checks but were prioritize to be installed on lower tier nodes
const appIndex = appsToBeCheckedLater.findIndex((app) => app.timeToCheck >= Date.now());
if (appIndex >= 0) {
appToRun = appsToBeCheckedLater[appIndex].appName;
appsToBeCheckedLater.splice(appIndex, 1);
appFromAppsToBeCheckedLater = true;
}
}
}

// Check if App was checked in the last 2 hours.
// This is a small help because random can be getting the same app over and over
if (trySpawningGlobalAppCache.has(appToRun)) {
log.info(`App ${appToRun} was already evaluated in the last 2 hours.`);
const delay = numberOfGlobalApps < 20 ? config.fluxapps.installation.delay * 1000 : config.fluxapps.installation.delay * 1000 * 0.1;
await serviceHelper.delay(delay);
log.info('No app currently to be processed');
await serviceHelper.delay(5 * 60 * 1000);
trySpawningGlobalApplication();
return;
}
Expand All @@ -9174,7 +9127,7 @@ async function trySpawningGlobalApplication() {
// check if app not running on this device
if (runningAppList.find((document) => document.ip.includes(adjustedIP))) {
log.info(`Application ${appToRun} is reported as already running on this Flux IP`);
await serviceHelper.delay(adjustedDelay);
await serviceHelper.delay(5 * 60 * 1000);
trySpawningGlobalApplication();
return;
}
Expand All @@ -9185,7 +9138,7 @@ async function trySpawningGlobalApplication() {
}
if (runningApps.data.find((app) => app.Names[0].slice(5) === appToRun)) {
log.info(`${appToRun} application is already running on this Flux`);
await serviceHelper.delay(adjustedDelay);
await serviceHelper.delay(5 * 60 * 1000);
trySpawningGlobalApplication();
return;
}
Expand All @@ -9200,7 +9153,7 @@ async function trySpawningGlobalApplication() {
let minInstances = appSpecifications.instances || config.fluxapps.minimumInstances; // introduced in v3 of apps specs
if (runningAppList.length >= minInstances) {
log.info(`Application ${appToRun} is already spawned on ${runningAppList.length} instances`);
await serviceHelper.delay(adjustedDelay);
await serviceHelper.delay(5 * 60 * 1000);
trySpawningGlobalApplication();
return;
}
Expand All @@ -9222,7 +9175,7 @@ async function trySpawningGlobalApplication() {
const appExists = apps.find((app) => app.name === appSpecifications.name);
if (appExists) { // double checked in installation process.
log.info(`Application ${appSpecifications.name} is already installed`);
await serviceHelper.delay(adjustedDelay);
await serviceHelper.delay(5 * 60 * 1000);
trySpawningGlobalApplication();
return;
}
Expand Down Expand Up @@ -9255,7 +9208,7 @@ async function trySpawningGlobalApplication() {
const portsPubliclyAvailable = await checkInstallingAppPortAvailable(appPorts);
if (portsPubliclyAvailable === false) {
log.error(`Some of application ports of ${appSpecifications.name} are not available publicly. Installation aborted.`);
await serviceHelper.delay(adjustedDelay);
await serviceHelper.delay(5 * 60 * 1000);
trySpawningGlobalApplication();
return;
}
Expand All @@ -9265,7 +9218,7 @@ async function trySpawningGlobalApplication() {
minInstances = appSpecifications.instances || config.fluxapps.minimumInstances; // introduced in v3 of apps specs
if (runningAppList.length >= minInstances) {
log.info(`Application ${appToRun} is already spawned on ${runningAppList.length} instances`);
await serviceHelper.delay(adjustedDelay);
await serviceHelper.delay(5 * 60 * 1000);
trySpawningGlobalApplication();
return;
}
Expand All @@ -9283,7 +9236,7 @@ async function trySpawningGlobalApplication() {
const sameIpRangeNode = runningAppList.find((location) => location.ip.includes(myIpWithoutPort.substring(0, lastIndex)));
if (sameIpRangeNode) {
log.info(`Application ${appToRun} uses syncthing and it is already spawned on Fluxnode with same ip range`);
await serviceHelper.delay(adjustedDelay);
await serviceHelper.delay(5 * 60 * 1000);
trySpawningGlobalApplication();
return;
}
Expand All @@ -9304,7 +9257,7 @@ async function trySpawningGlobalApplication() {
if (component.repotag === componentToInstall.repotag && componentToInstall.repotag.startsWith('presearch/node')) { // applies to presearch specifically
log.info(`${componentToInstall.repotag} Image is already running on this Flux`);
// eslint-disable-next-line no-await-in-loop
await serviceHelper.delay(adjustedDelay);
await serviceHelper.delay(5 * 60 * 1000);
trySpawningGlobalApplication();
return;
}
Expand Down Expand Up @@ -9364,7 +9317,7 @@ async function trySpawningGlobalApplication() {
await fluxCommunicationMessagesSender.broadcastMessageToOutgoing(appRemovedMessage);
await serviceHelper.delay(500);
await fluxCommunicationMessagesSender.broadcastMessageToIncoming(appRemovedMessage);
await serviceHelper.delay(adjustedDelay);
await serviceHelper.delay(5 * 60 * 1000);
trySpawningGlobalApplication();
return;
}
Expand All @@ -9377,7 +9330,7 @@ async function trySpawningGlobalApplication() {
removeAppLocally(appSpecifications.name, null, true, null, true).catch((error) => log.error(error));
}

await serviceHelper.delay(10 * config.fluxapps.installation.delay * 1000);
await serviceHelper.delay(5 * 60 * 1000);
log.info('Reinitiating possible app installation');
trySpawningGlobalApplication();
} catch (error) {
Expand Down Expand Up @@ -13455,5 +13408,4 @@ module.exports = {
getAppSpecsUSDPrice,
checkApplicationsCpuUSage,
monitorNodeStatus,
testTrySpawningGlobalApplication,
};
1 change: 0 additions & 1 deletion services/serviceManager.js
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,6 @@ async function startFluxFunctions() {
log.info('Starting to spawn applications');
appsService.trySpawningGlobalApplication();
}, 125 * 60 * 1000);
appsService.testTrySpawningGlobalApplication();
setInterval(() => {
appsService.checkApplicationsCompliance();
}, 60 * 60 * 1000); // every hour
Expand Down

0 comments on commit bd7c3b2

Please sign in to comment.