Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
  • Loading branch information
RunOnFluxBot committed Nov 8, 2023
1 parent 3b0a933 commit 8ae5f65
Show file tree
Hide file tree
Showing 5 changed files with 84 additions and 108 deletions.
97 changes: 43 additions & 54 deletions services/appsService.js
Original file line number Diff line number Diff line change
Expand Up @@ -1859,30 +1859,23 @@ async function createAppVolume(appSpecifications, appName, isComponent, res) {
}

// if s flag create .stfolder
const containersData = appSpecifications.containerData.split('|');
// eslint-disable-next-line no-restricted-syntax
for (let i = 0; i < containersData.length; i += 1) {
const container = containersData[i];
const containerDataFlags = container.split(':')[1] ? container.split(':')[0] : '';
if (containerDataFlags.includes('s')) {
const containerFolder = i === 0 ? '' : `/appdata${container.split(':')[1].replace(containersData[0], '')}`;
const stFolderCreation = {
status: 'Creating .stfolder for syncthing...',
};
log.info(stFolderCreation);
if (res) {
res.write(serviceHelper.ensureString(stFolderCreation));
}
const execDIRst = `sudo mkdir -p ${appsFolder + appId + containerFolder}/.stfolder`;
// eslint-disable-next-line no-await-in-loop
await cmdAsync(execDIRst);
const stFolderCreation2 = {
status: '.stfolder created',
};
log.info(stFolderCreation2);
if (res) {
res.write(serviceHelper.ensureString(stFolderCreation2));
}
const containerDataFlags = appSpecifications.containerData.split('|')[0].split(':')[1] ? appSpecifications.containerData.split('|')[0].split(':')[0] : '';
if (containerDataFlags.includes('s')) {
const stFolderCreation = {
status: 'Creating .stfolder for syncthing...',
};
log.info(stFolderCreation);
if (res) {
res.write(serviceHelper.ensureString(stFolderCreation));
}
const execDIRst = `sudo mkdir -p ${appsFolder + appId}/.stfolder`;
await cmdAsync(execDIRst);
const stFolderCreation2 = {
status: '.stfolder created',
};
log.info(stFolderCreation2);
if (res) {
res.write(serviceHelper.ensureString(stFolderCreation2));
}
}

Expand Down Expand Up @@ -9823,36 +9816,32 @@ async function stopSyncthingApp(appComponentName, res) {
return;
}
let folderId = null;
// eslint-disable-next-line no-restricted-syntax
for (const syncthingFolder of allSyncthingFolders.data) {
if (syncthingFolder.path === folder || syncthingFolder.path.includes(`${folder}/`)) {
allSyncthingFolders.data.forEach((syncthingFolder) => {
if (syncthingFolder.path === folder) {
folderId = syncthingFolder.id;
}
if (folderId) {
const adjustSyncthingA = {
status: `Stopping syncthing on folder ${syncthingFolder.path}...`,
};
// remove folder from syncthing
// eslint-disable-next-line no-await-in-loop
await syncthingService.adjustConfigFolders('delete', undefined, folderId);
// eslint-disable-next-line no-await-in-loop
const restartRequired = await syncthingService.getConfigRestartRequired();
if (restartRequired.status === 'success' && restartRequired.data.requiresRestart === true) {
// eslint-disable-next-line no-await-in-loop
await syncthingService.systemRestart();
}
const adjustSyncthingB = {
status: 'Syncthing adjusted',
};
log.info(adjustSyncthingA);
if (res) {
res.write(serviceHelper.ensureString(adjustSyncthingA));
}
if (res) {
res.write(serviceHelper.ensureString(adjustSyncthingB));
}
}
folderId = null;
});
if (!folderId) {
return;
}
const adjustSyncthingA = {
status: 'Adjusting Syncthing...',
};
// remove folder from syncthing
await syncthingService.adjustConfigFolders('delete', undefined, folderId);
const restartRequired = await syncthingService.getConfigRestartRequired();
if (restartRequired.status === 'success' && restartRequired.data.requiresRestart === true) {
await syncthingService.systemRestart();
}
const adjustSyncthingB = {
status: 'Syncthing adjusted',
};
log.info(adjustSyncthingA);
if (res) {
res.write(serviceHelper.ensureString(adjustSyncthingA));
}
if (res) {
res.write(serviceHelper.ensureString(adjustSyncthingB));
}
} catch (error) {
log.error(error);
Expand Down Expand Up @@ -9907,7 +9896,7 @@ async function syncthingApps() {
const container = containersData[i];
const containerDataFlags = container.split(':')[1] ? container.split(':')[0] : '';
if (containerDataFlags.includes('s')) {
const containerFolder = i === 0 ? '' : `/appdata${container.split(':')[1].replace(containersData[0], '')}`;
const containerFolder = i === 0 ? '' : container.split(':')[1];
const identifier = installedApp.name;
const appId = dockerService.getAppIdentifier(identifier);
const folder = `${appsFolder + appId + containerFolder}`;
Expand Down Expand Up @@ -9964,7 +9953,7 @@ async function syncthingApps() {
const container = containersData[i];
const containerDataFlags = container.split(':')[1] ? container.split(':')[0] : '';
if (containerDataFlags.includes('s')) {
const containerFolder = i === 0 ? '' : `/appdata${container.split(':')[1].replace(containersData[0], '')}`;
const containerFolder = i === 0 ? '' : container.split(':')[1];
const identifier = `${installedComponent.name}_${installedApp.name}`;
const appId = dockerService.getAppIdentifier(identifier);
const folder = `${appsFolder + appId + containerFolder}`;
Expand Down
84 changes: 36 additions & 48 deletions services/fluxCommunication.js
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ function handleIncomingConnection(ws, req, expressWS) {
}, 1000);
return;
}
const findPeer = incomingPeers.find((p) => p.ip === ws._socket.remoteAddress && p.port === ws._socket.remotePort);
const findPeer = incomingPeers.find((p) => p.ip === ws._socket.remoteAddress);
if (findPeer) {
setTimeout(() => {
ws.close(1000, 'Peer received is already in incomingPeers list');
Expand All @@ -194,7 +194,6 @@ function handleIncomingConnection(ws, req, expressWS) {
}
const peer = {
ip: ws._socket.remoteAddress,
port: ws._socket.remotePort,
};
const ipv4Peer = peer.ip.replace('::ffff:', '');
// eslint-disable-next-line no-restricted-syntax
Expand Down Expand Up @@ -280,13 +279,13 @@ function handleIncomingConnection(ws, req, expressWS) {
// check if message comes from IP belonging to the public Key
const zl = await fluxCommunicationUtils.deterministicFluxList(pubKey); // this itself is sufficient.
const possibleNodes = zl.filter((key) => key.pubkey === pubKey); // another check in case sufficient check failed on daemon level
const nodeFound = possibleNodes.find((n) => n.ip.split(':')[0] === peer.ip.replace('::ffff:', '') && (n.ip.split(':')[1] || 16127) === peer.port);
const nodeFound = possibleNodes.find((n) => n.ip === peer.ip.replace('::ffff:', ''));
if (!nodeFound) {
log.warn(`Invalid message received from incoming peer ${peer.ip}:${peer.port} which is not an originating node of ${pubKey}.`);
log.warn(`Invalid message received from incoming peer ${peer.ip} which is not an originating node of ${pubKey}.`);
ws.close(1000, 'invalid message, disconnect'); // close as of policy violation
} else {
blockedPubKeysCache.set(pubKey, pubKey); // blocks ALL the nodes corresponding to the pubKey
log.warn(`closing incoming connection, adding peers ${pubKey}:${peer.port} to the blockedList. Originated from ${peer.ip}.`);
log.warn(`closing incoming connection, adding peers ${pubKey} to the blockedList. Originated from ${peer.ip}.`);
ws.close(1000, 'invalid message, blocked'); // close as of policy violation?
}
} catch (e) {
Expand All @@ -296,10 +295,9 @@ function handleIncomingConnection(ws, req, expressWS) {
});
ws.on('error', async (msg) => {
const ip = ws._socket.remoteAddress;
const { port } = ws._socket;
log.warn(`Incoming connection error ${ip}:${port}`);
const ocIndex = incomingConnections.findIndex((incomingCon) => ws._socket.remoteAddress === incomingCon._socket.remoteAddress && ws._socket.port === incomingCon._socket.port);
const foundPeer = incomingPeers.find((mypeer) => mypeer.ip === ip && mypeer.port === port);
log.warn(`Incoming connection error ${ip}`);
const ocIndex = incomingConnections.findIndex((incomingCon) => ws._socket.remoteAddress === incomingCon._socket.remoteAddress);
const foundPeer = incomingPeers.find((mypeer) => mypeer.ip === ip);
if (ocIndex > -1) {
incomingConnections.splice(ocIndex, 1);
}
Expand All @@ -313,10 +311,9 @@ function handleIncomingConnection(ws, req, expressWS) {
});
ws.on('close', async (msg) => {
const ip = ws._socket.remoteAddress;
const { port } = ws._socket;
log.warn(`Incoming connection close ${ip}:${port}`);
const ocIndex = incomingConnections.findIndex((incomingCon) => ws._socket.remoteAddress === incomingCon._socket.remoteAddress && ws._socket.port === incomingCon._socket.port);
const foundPeer = incomingPeers.find((mypeer) => mypeer.ip === ip && mypeer.port === port);
log.warn(`Incoming connection close ${ip}`);
const ocIndex = incomingConnections.findIndex((incomingCon) => ws._socket.remoteAddress === incomingCon._socket.remoteAddress);
const foundPeer = incomingPeers.find((mypeer) => mypeer.ip === ip);
if (ocIndex > -1) {
incomingConnections.splice(ocIndex, 1);
}
Expand Down Expand Up @@ -455,19 +452,17 @@ async function initiateAndHandleConnection(connection) {
outgoingConnections.push(websocket);
const peer = {
ip, // can represent just one ip address, multiport
port,
lastPingTime: null,
latency: null,
};
log.info(JSON.stringify(peer));
outgoingPeers.push(peer);
};

// every time a ping is sent a pong as received, measure latency
websocket.on('pong', () => {
try {
const curTime = new Date().getTime();
const foundPeer = outgoingPeers.find((peer) => peer.ip === ip && peer.port === port);
const foundPeer = outgoingPeers.find((peer) => peer.ip === ip);
if (foundPeer) {
foundPeer.latency = Math.ceil((curTime - foundPeer.lastPingTime) / 2);
}
Expand All @@ -482,7 +477,7 @@ async function initiateAndHandleConnection(connection) {
log.info(`Connection to ${connection} closed with code ${evt.code}`);
outgoingConnections.splice(ocIndex, 1);
}
const foundPeer = outgoingPeers.find((peer) => peer.ip === ip && peer.port === port);
const foundPeer = outgoingPeers.find((peer) => peer.ip === ip);
if (foundPeer) {
const peerIndex = outgoingPeers.indexOf(foundPeer);
if (peerIndex > -1) {
Expand Down Expand Up @@ -575,7 +570,7 @@ async function initiateAndHandleConnection(connection) {
log.info(`Connection to ${connection} errord with code ${evt.code}`);
outgoingConnections.splice(ocIndex, 1);
}
const foundPeer = outgoingPeers.find((peer) => peer.ip === ip && peer.port === port);
const foundPeer = outgoingPeers.find((peer) => peer.ip === ip);
if (foundPeer) {
const peerIndex = outgoingPeers.indexOf(foundPeer);
if (peerIndex > -1) {
Expand Down Expand Up @@ -604,10 +599,9 @@ async function addPeer(req, res) {
return res.json(errMessage);
}
const justIP = ip.split(':')[0];
const port = ip.split(':')[1] || 16127;
const wsObj = outgoingConnections.find((client) => client._socket.remoteAddress === justIP && client._socket.port === port);
const wsObj = outgoingConnections.find((client) => client._socket.remoteAddress === justIP);
if (wsObj) {
const errMessage = messageHelper.createErrorMessage(`Already connected to ${justIP}:${port}`);
const errMessage = messageHelper.createErrorMessage(`Already connected to ${justIP}`);
return res.json(errMessage);
}
const authorized = await verificationHelper.verifyPrivilege('adminandfluxteam', req);
Expand All @@ -617,7 +611,7 @@ async function addPeer(req, res) {
return res.json(message);
}
initiateAndHandleConnection(ip);
const message = messageHelper.createSuccessMessage(`Outgoing connection to ${ip}:${port} initiated`);
const message = messageHelper.createSuccessMessage(`Outgoing connection to ${ip} initiated`);
return res.json(message);
} catch (error) {
log.error(error);
Expand Down Expand Up @@ -654,23 +648,22 @@ async function addOutgoingPeer(req, res) {
const errMessage = messageHelper.createErrorMessage(`Request ip ${remoteIP4} of ${remoteIP} doesn't match the ip: ${justIP} to connect.`);
return res.json(errMessage);
}
const port = ip.split(':')[1] || 16127;

const wsObj = outgoingConnections.find((client) => client._socket.remoteAddress === justIP && client._socket.port === port);
const wsObj = outgoingConnections.find((client) => client._socket.remoteAddress === justIP);
if (wsObj) {
const errMessage = messageHelper.createErrorMessage(`Already connected to ${justIP}:${port}`);
const errMessage = messageHelper.createErrorMessage(`Already connected to ${justIP}`);
return res.json(errMessage);
}

const nodeList = await fluxCommunicationUtils.deterministicFluxList();
const fluxNode = nodeList.find((node) => node.ip.split(':')[0] === ip && (node.ip.split(':')[1] || 16127) === port);
const fluxNode = nodeList.find((node) => node.ip === ip);
if (!fluxNode) {
const errMessage = messageHelper.createErrorMessage(`FluxNode ${ip}:${port} is not confirmed on the network.`);
const errMessage = messageHelper.createErrorMessage(`FluxNode ${ip} is not confirmed on the network.`);
return res.json(errMessage);
}

initiateAndHandleConnection(ip);
const message = messageHelper.createSuccessMessage(`Outgoing connection to ${ip}:${port} initiated`);
const message = messageHelper.createSuccessMessage(`Outgoing connection to ${ip} initiated`);
return res.json(message);
} catch (error) {
log.error(error);
Expand Down Expand Up @@ -733,11 +726,9 @@ async function fluxDiscovery() {
for (let i = 1; i <= minDeterministicOutPeers; i += 1) {
const fixedIndex = fluxNodeIndex + i < sortedNodeList.length ? fluxNodeIndex + i : fluxNodeIndex + i - sortedNodeList.length;
const { ip } = sortedNodeList[fixedIndex];
const ipInc = ip.split(':')[0];
const portInc = ip.split(':')[1] || 16127;
// additional precaution
const clientExists = outgoingConnections.find((client) => client._socket.remoteAddress === ipInc && client._socket.port === portInc);
const clientIncomingExists = incomingConnections.find((client) => client._socket.remoteAddress.replace('::ffff:', '') === ipInc && client._socket.port === portInc);
const clientExists = outgoingConnections.find((client) => client._socket.remoteAddress === ip);
const clientIncomingExists = incomingConnections.find((client) => client._socket.remoteAddress.replace('::ffff:', '') === ip);
if (!clientExists && !clientIncomingExists) {
deterministicPeerConnections = true;
initiateAndHandleConnection(ip);
Expand All @@ -749,13 +740,12 @@ async function fluxDiscovery() {
for (let i = 1; i <= minDeterministicOutPeers; i += 1) {
const fixedIndex = fluxNodeIndex - i > 0 ? fluxNodeIndex - i : sortedNodeList.length - fluxNodeIndex - i;
const { ip } = sortedNodeList[fixedIndex];
const ipInc = ip.split(':')[0];
const portInc = ip.split(':')[1] || 16127;
// additional precaution
const clientExists = outgoingConnections.find((client) => client._socket.remoteAddress === ipInc && client._socket.port === portInc);
const clientIncomingExists = incomingConnections.find((client) => client._socket.remoteAddress.replace('::ffff:', '') === ipInc && client._socket.port === portInc);
if (!clientExists && !clientIncomingExists) {
const clientIncomingExists = incomingConnections.find((client) => client._socket.remoteAddress.replace('::ffff:', '') === ip);
if (!clientIncomingExists) {
deterministicPeerConnections = true;
const ipInc = ip.split(':')[0];
const portInc = ip.split(':')[1] || 16127;
// eslint-disable-next-line no-await-in-loop
await serviceHelper.axiosGet(`http://${ipInc}:${portInc}/flux/addoutgoingpeer/${myIP}`).catch((error) => log.error(error));
}
Expand All @@ -771,15 +761,14 @@ async function fluxDiscovery() {
// eslint-disable-next-line no-await-in-loop
const connection = await fluxNetworkHelper.getRandomConnection();
if (connection) {
let ipInc = connection.split(':')[0];
let portInc = connection.split(':')[1] || 16127;
const ip = connection.split(':')[0];
// additional precaution
let sameConnectedIp = currentIpsConnTried.find((connectedIP) => connectedIP === ipInc);
let clientExists = outgoingConnections.find((client) => client._socket.remoteAddress === ipInc && client._socket.port === portInc);
let clientIncomingExists = incomingConnections.find((client) => client._socket.remoteAddress.replace('::ffff:', '') === ipInc && client._socket.port === portInc);
let sameConnectedIp = currentIpsConnTried.find((connectedIP) => connectedIP === ip);
let clientExists = outgoingConnections.find((client) => client._socket.remoteAddress === ip);
let clientIncomingExists = incomingConnections.find((client) => client._socket.remoteAddress.replace('::ffff:', '') === ip);
if (!sameConnectedIp && !clientExists && !clientIncomingExists) {
log.info(`Adding random Flux peer: ${connection}`);
currentIpsConnTried.push(connection);
currentIpsConnTried.push(ip);
initiateAndHandleConnection(connection);
}
// Max of 8 incoming connections - 8 possible deterministic + x random if needed;
Expand All @@ -789,15 +778,14 @@ async function fluxDiscovery() {
// eslint-disable-next-line no-await-in-loop
const connectionInc = await fluxNetworkHelper.getRandomConnection();
if (connectionInc) {
ipInc = connectionInc.split(':')[0];
portInc = connectionInc.split(':')[1] || 16127;
const ipInc = connectionInc.split(':')[0];
const portInc = connectionInc.split(':')[1] || 16127;
// additional precaution
sameConnectedIp = currentIpsConnTried.find((connectedIP) => connectedIP === ipInc);
clientExists = outgoingConnections.find((client) => client._socket.remoteAddress === ipInc && client._socket.port === portInc);
clientIncomingExists = incomingConnections.find((client) => client._socket.remoteAddress.replace('::ffff:', '') === ipInc && client._socket.port === portInc);
clientExists = outgoingConnections.find((client) => client._socket.remoteAddress === ipInc);
clientIncomingExists = incomingConnections.find((client) => client._socket.remoteAddress.replace('::ffff:', '') === ipInc);
if (!sameConnectedIp && !clientExists && !clientIncomingExists) {
log.info(`Asking random Flux ${connectionInc} to add us as a peer`);
currentIpsConnTried.push(connectionInc);
// eslint-disable-next-line no-await-in-loop
await serviceHelper.axiosGet(`http://${ipInc}:${portInc}/flux/addoutgoingpeer/${myIP}`).catch((error) => log.error(error));
}
Expand Down
Loading

0 comments on commit 8ae5f65

Please sign in to comment.