From a14d994b4ba1040b51bdf8fc1d5872933d1eae40 Mon Sep 17 00:00:00 2001 From: RunOnFlux Date: Sat, 11 Nov 2023 11:20:05 +0000 Subject: [PATCH] Update from https://github.com/RunOnFlux/flux/commit/54dc5cb029c8e8cb1e39af9047bc6d2eb7379c35 --- services/appsService.js | 97 +++++----- services/fluxCommunication.js | 200 ++++++++------------ services/fluxCommunicationMessagesSender.js | 12 +- services/fluxNetworkHelper.js | 35 ++-- services/utils/establishedConnections.js | 4 +- 5 files changed, 146 insertions(+), 202 deletions(-) diff --git a/services/appsService.js b/services/appsService.js index 4826e14c..ee71512f 100644 --- a/services/appsService.js +++ b/services/appsService.js @@ -1859,30 +1859,23 @@ async function createAppVolume(appSpecifications, appName, isComponent, res) { } // if s flag create .stfolder - const containersData = appSpecifications.containerData.split('|'); - // eslint-disable-next-line no-restricted-syntax - for (let i = 0; i < containersData.length; i += 1) { - const container = containersData[i]; - const containerDataFlags = container.split(':')[1] ? container.split(':')[0] : ''; - if (containerDataFlags.includes('s')) { - const containerFolder = i === 0 ? '' : `/appdata${container.split(':')[1].replace(containersData[0], '')}`; - const stFolderCreation = { - status: 'Creating .stfolder for syncthing...', - }; - log.info(stFolderCreation); - if (res) { - res.write(serviceHelper.ensureString(stFolderCreation)); - } - const execDIRst = `sudo mkdir -p ${appsFolder + appId + containerFolder}/.stfolder`; - // eslint-disable-next-line no-await-in-loop - await cmdAsync(execDIRst); - const stFolderCreation2 = { - status: '.stfolder created', - }; - log.info(stFolderCreation2); - if (res) { - res.write(serviceHelper.ensureString(stFolderCreation2)); - } + const containerDataFlags = appSpecifications.containerData.split('|')[0].split(':')[1] ? appSpecifications.containerData.split('|')[0].split(':')[0] : ''; + if (containerDataFlags.includes('s')) { + const stFolderCreation = { + status: 'Creating .stfolder for syncthing...', + }; + log.info(stFolderCreation); + if (res) { + res.write(serviceHelper.ensureString(stFolderCreation)); + } + const execDIRst = `sudo mkdir -p ${appsFolder + appId}/.stfolder`; + await cmdAsync(execDIRst); + const stFolderCreation2 = { + status: '.stfolder created', + }; + log.info(stFolderCreation2); + if (res) { + res.write(serviceHelper.ensureString(stFolderCreation2)); } } @@ -9823,36 +9816,32 @@ async function stopSyncthingApp(appComponentName, res) { return; } let folderId = null; - // eslint-disable-next-line no-restricted-syntax - for (const syncthingFolder of allSyncthingFolders.data) { - if (syncthingFolder.path === folder || syncthingFolder.path.includes(`${folder}/`)) { + allSyncthingFolders.data.forEach((syncthingFolder) => { + if (syncthingFolder.path === folder) { folderId = syncthingFolder.id; } - if (folderId) { - const adjustSyncthingA = { - status: `Stopping syncthing on folder ${syncthingFolder.path}...`, - }; - // remove folder from syncthing - // eslint-disable-next-line no-await-in-loop - await syncthingService.adjustConfigFolders('delete', undefined, folderId); - // eslint-disable-next-line no-await-in-loop - const restartRequired = await syncthingService.getConfigRestartRequired(); - if (restartRequired.status === 'success' && restartRequired.data.requiresRestart === true) { - // eslint-disable-next-line no-await-in-loop - await syncthingService.systemRestart(); - } - const adjustSyncthingB = { - status: 'Syncthing adjusted', - }; - log.info(adjustSyncthingA); - if (res) { - res.write(serviceHelper.ensureString(adjustSyncthingA)); - } - if (res) { - res.write(serviceHelper.ensureString(adjustSyncthingB)); - } - } - folderId = null; + }); + if (!folderId) { + return; + } + const adjustSyncthingA = { + status: 'Adjusting Syncthing...', + }; + // remove folder from syncthing + await syncthingService.adjustConfigFolders('delete', undefined, folderId); + const restartRequired = await syncthingService.getConfigRestartRequired(); + if (restartRequired.status === 'success' && restartRequired.data.requiresRestart === true) { + await syncthingService.systemRestart(); + } + const adjustSyncthingB = { + status: 'Syncthing adjusted', + }; + log.info(adjustSyncthingA); + if (res) { + res.write(serviceHelper.ensureString(adjustSyncthingA)); + } + if (res) { + res.write(serviceHelper.ensureString(adjustSyncthingB)); } } catch (error) { log.error(error); @@ -9907,7 +9896,7 @@ async function syncthingApps() { const container = containersData[i]; const containerDataFlags = container.split(':')[1] ? container.split(':')[0] : ''; if (containerDataFlags.includes('s')) { - const containerFolder = i === 0 ? '' : `/appdata${container.split(':')[1].replace(containersData[0], '')}`; + const containerFolder = i === 0 ? '' : container.split(':')[1]; const identifier = installedApp.name; const appId = dockerService.getAppIdentifier(identifier); const folder = `${appsFolder + appId + containerFolder}`; @@ -9964,7 +9953,7 @@ async function syncthingApps() { const container = containersData[i]; const containerDataFlags = container.split(':')[1] ? container.split(':')[0] : ''; if (containerDataFlags.includes('s')) { - const containerFolder = i === 0 ? '' : `/appdata${container.split(':')[1].replace(containersData[0], '')}`; + const containerFolder = i === 0 ? '' : container.split(':')[1]; const identifier = `${installedComponent.name}_${installedApp.name}`; const appId = dockerService.getAppIdentifier(identifier); const folder = `${appsFolder + appId + containerFolder}`; diff --git a/services/fluxCommunication.js b/services/fluxCommunication.js index 49886f90..3c07cb67 100644 --- a/services/fluxCommunication.js +++ b/services/fluxCommunication.js @@ -61,9 +61,8 @@ const privateIpsList = [ * To handle temporary app messages. * @param {object} message Message. * @param {string} fromIP Sender's IP address. - * @param {string} port Sender's node Api port. */ -async function handleAppMessages(message, fromIP, port) { +async function handleAppMessages(message, fromIP) { try { // check if we have it in database and if not add // if not in database, rebroadcast to all connections @@ -73,10 +72,10 @@ async function handleAppMessages(message, fromIP, port) { const rebroadcastToPeers = await appsService.storeAppTemporaryMessage(message.data, true); if (rebroadcastToPeers === true) { const messageString = serviceHelper.ensureString(message); - const wsListOut = outgoingConnections.filter((client) => client._socket.remoteAddress !== fromIP && client.port !== port); + const wsListOut = outgoingConnections.filter((client) => client._socket.remoteAddress !== fromIP); fluxCommunicationMessagesSender.sendToAllPeers(messageString, wsListOut); await serviceHelper.delay(100); - const wsList = incomingConnections.filter((client) => client._socket.remoteAddress.replace('::ffff:', '') !== fromIP && client.port !== port); + const wsList = incomingConnections.filter((client) => client._socket.remoteAddress.replace('::ffff:', '') !== fromIP); fluxCommunicationMessagesSender.sendToAllIncomingConnections(messageString, wsList); } } catch (error) { @@ -88,9 +87,8 @@ async function handleAppMessages(message, fromIP, port) { * To handle running app messages. * @param {object} message Message. * @param {string} fromIP Sender's IP address. - * @param {string} port Sender's node Api port. */ -async function handleAppRunningMessage(message, fromIP, port) { +async function handleAppRunningMessage(message, fromIP) { try { // check if we have it exactly like that in database and if not, update // if not in database, rebroadcast to all connections @@ -102,10 +100,10 @@ async function handleAppRunningMessage(message, fromIP, port) { const timestampOK = fluxCommunicationUtils.verifyTimestampInFluxBroadcast(message, currentTimeStamp, 240000); if (rebroadcastToPeers === true && timestampOK) { const messageString = serviceHelper.ensureString(message); - const wsListOut = outgoingConnections.filter((client) => client._socket.remoteAddress !== fromIP && client.port !== port); + const wsListOut = outgoingConnections.filter((client) => client._socket.remoteAddress !== fromIP); fluxCommunicationMessagesSender.sendToAllPeers(messageString, wsListOut); await serviceHelper.delay(500); - const wsList = incomingConnections.filter((client) => client._socket.remoteAddress.replace('::ffff:', '') !== fromIP && client.port !== port); + const wsList = incomingConnections.filter((client) => client._socket.remoteAddress.replace('::ffff:', '') !== fromIP); fluxCommunicationMessagesSender.sendToAllIncomingConnections(messageString, wsList); } } catch (error) { @@ -117,9 +115,8 @@ async function handleAppRunningMessage(message, fromIP, port) { * To handle running app messages. * @param {object} message Message. * @param {string} fromIP Sender's IP address. - * @param {string} port Sender's node Api port. */ -async function handleIPChangedMessage(message, fromIP, port) { +async function handleIPChangedMessage(message, fromIP) { try { // check if we have it any app running on that location and if yes, update information // rebroadcast message to the network if it's valid @@ -130,10 +127,10 @@ async function handleIPChangedMessage(message, fromIP, port) { const timestampOK = fluxCommunicationUtils.verifyTimestampInFluxBroadcast(message, currentTimeStamp, 240000); if (rebroadcastToPeers && timestampOK) { const messageString = serviceHelper.ensureString(message); - const wsListOut = outgoingConnections.filter((client) => client._socket.remoteAddress !== fromIP && client.port !== port); + const wsListOut = outgoingConnections.filter((client) => client._socket.remoteAddress !== fromIP); fluxCommunicationMessagesSender.sendToAllPeers(messageString, wsListOut); await serviceHelper.delay(500); - const wsList = incomingConnections.filter((client) => client._socket.remoteAddress.replace('::ffff:', '') !== fromIP && client.port !== port); + const wsList = incomingConnections.filter((client) => client._socket.remoteAddress.replace('::ffff:', '') !== fromIP); fluxCommunicationMessagesSender.sendToAllIncomingConnections(messageString, wsList); } } catch (error) { @@ -145,9 +142,8 @@ async function handleIPChangedMessage(message, fromIP, port) { * To handle running app messages. * @param {object} message Message. * @param {string} fromIP Sender's IP address. - * @param {string} port Sender's node Api port. */ -async function handleAppRemovedMessage(message, fromIP, port) { +async function handleAppRemovedMessage(message, fromIP) { try { // check if we have it any app running on that location and if yes, delete that information // rebroadcast message to the network if it's valid @@ -158,10 +154,10 @@ async function handleAppRemovedMessage(message, fromIP, port) { const timestampOK = fluxCommunicationUtils.verifyTimestampInFluxBroadcast(message, currentTimeStamp, 240000); if (rebroadcastToPeers && timestampOK) { const messageString = serviceHelper.ensureString(message); - const wsListOut = outgoingConnections.filter((client) => client._socket.remoteAddress !== fromIP && client.port !== port); + const wsListOut = outgoingConnections.filter((client) => client._socket.remoteAddress !== fromIP); fluxCommunicationMessagesSender.sendToAllPeers(messageString, wsListOut); await serviceHelper.delay(500); - const wsList = incomingConnections.filter((client) => client._socket.remoteAddress.replace('::ffff:', '') !== fromIP && client.port !== port); + const wsList = incomingConnections.filter((client) => client._socket.remoteAddress.replace('::ffff:', '') !== fromIP); fluxCommunicationMessagesSender.sendToAllIncomingConnections(messageString, wsList); } } catch (error) { @@ -171,16 +167,14 @@ async function handleAppRemovedMessage(message, fromIP, port) { /** * To handle incoming connection. Several types of verification are performed. - * @param {object} websocket Web socket. + * @param {object} ws Web socket. * @param {object} req Request. * @param {object} expressWS Express web socket. * @returns {void} Return statement is only used here to interrupt the function and nothing is returned. */ // let messageNumber = 0; // eslint-disable-next-line no-unused-vars -function handleIncomingConnection(websocket, req, expressWS) { - const ws = websocket; - const port = req.params.port || 16127; +function handleIncomingConnection(ws, req, expressWS) { // now we are in connections state. push the websocket to our incomingconnections const maxPeers = 4 * config.fluxapps.minIncoming; const maxNumberOfConnections = numberOfFluxNodes / 160 < 9 * config.fluxapps.minIncoming ? numberOfFluxNodes / 160 : 9 * config.fluxapps.minIncoming; @@ -191,19 +185,17 @@ function handleIncomingConnection(websocket, req, expressWS) { }, 1000); return; } - const findPeer = incomingPeers.find((p) => p.ip === ws._socket.remoteAddress.replace('::ffff:', '') && p.port === port); + const findPeer = incomingPeers.find((p) => p.ip === ws._socket.remoteAddress); if (findPeer) { setTimeout(() => { ws.close(1000, 'Peer received is already in incomingPeers list'); }, 1000); return; } - const ipv4Peer = ws._socket.remoteAddress.replace('::ffff:', ''); const peer = { - ip: ipv4Peer, - port, + ip: ws._socket.remoteAddress, }; - + const ipv4Peer = peer.ip.replace('::ffff:', ''); // eslint-disable-next-line no-restricted-syntax for (const privateIp of privateIpsList) { if (ipv4Peer.startsWith(privateIp)) { @@ -214,7 +206,6 @@ function handleIncomingConnection(websocket, req, expressWS) { return; } } - ws.port = port; incomingConnections.push(ws); incomingPeers.push(peer); // verify data integrity, if not signed, close connection @@ -242,7 +233,7 @@ function handleIncomingConnection(websocket, req, expressWS) { } myCacheTemp.set(messageHash, messageHash); // check rate limit - const rateOK = fluxNetworkHelper.lruRateLimit(`${ipv4Peer}:${port}`, 90); + const rateOK = fluxNetworkHelper.lruRateLimit(ipv4Peer, 90); if (!rateOK) { return; // do not react to the message } @@ -265,15 +256,15 @@ function handleIncomingConnection(websocket, req, expressWS) { if (timestampOK === true) { try { if (msgObj.data.type === 'zelappregister' || msgObj.data.type === 'zelappupdate' || msgObj.data.type === 'fluxappregister' || msgObj.data.type === 'fluxappupdate') { - handleAppMessages(msgObj, peer.ip, peer.port); + handleAppMessages(msgObj, peer.ip.replace('::ffff:', '')); } else if (msgObj.data.type === 'fluxapprequest') { fluxCommunicationMessagesSender.respondWithAppMessage(msgObj, ws); } else if (msgObj.data.type === 'fluxapprunning') { - handleAppRunningMessage(msgObj, peer.ip, peer.port); + handleAppRunningMessage(msgObj, peer.ip.replace('::ffff:', '')); } else if (msgObj.data.type === 'fluxipchanged') { - handleIPChangedMessage(msgObj, peer.ip, peer.port); + handleIPChangedMessage(msgObj, peer.ip.replace('::ffff:', '')); } else if (msgObj.data.type === 'fluxappremoved') { - handleAppRemovedMessage(msgObj, peer.ip, peer.port); + handleAppRemovedMessage(msgObj, peer.ip.replace('::ffff:', '')); } else { log.warn(`Unrecognised message type of ${msgObj.data.type}`); } @@ -288,13 +279,13 @@ function handleIncomingConnection(websocket, req, expressWS) { // check if message comes from IP belonging to the public Key const zl = await fluxCommunicationUtils.deterministicFluxList(pubKey); // this itself is sufficient. const possibleNodes = zl.filter((key) => key.pubkey === pubKey); // another check in case sufficient check failed on daemon level - const nodeFound = possibleNodes.find((n) => n.ip.split(':')[0] === peer.ip.replace('::ffff:', '') && (n.ip.split(':')[1] || 16127) === peer.port); + const nodeFound = possibleNodes.find((n) => n.ip === peer.ip.replace('::ffff:', '')); if (!nodeFound) { - log.warn(`Invalid message received from incoming peer ${peer.ip}:${peer.port} which is not an originating node of ${pubKey}.`); + log.warn(`Invalid message received from incoming peer ${peer.ip} which is not an originating node of ${pubKey}.`); ws.close(1000, 'invalid message, disconnect'); // close as of policy violation } else { blockedPubKeysCache.set(pubKey, pubKey); // blocks ALL the nodes corresponding to the pubKey - log.warn(`closing incoming connection, adding peers ${pubKey}:${peer.port} to the blockedList. Originated from ${peer.ip}.`); + log.warn(`closing incoming connection, adding peers ${pubKey} to the blockedList. Originated from ${peer.ip}.`); ws.close(1000, 'invalid message, blocked'); // close as of policy violation? } } catch (e) { @@ -304,9 +295,9 @@ function handleIncomingConnection(websocket, req, expressWS) { }); ws.on('error', async (msg) => { const ip = ws._socket.remoteAddress; - log.warn(`Incoming connection error ${ip}:${port}`); - const ocIndex = incomingConnections.findIndex((incomingCon) => ws._socket.remoteAddress === incomingCon._socket.remoteAddress && ws.port === incomingCon.port); - const foundPeer = incomingPeers.find((mypeer) => mypeer.ip === ip && mypeer.port === port); + log.warn(`Incoming connection error ${ip}`); + const ocIndex = incomingConnections.findIndex((incomingCon) => ws._socket.remoteAddress === incomingCon._socket.remoteAddress); + const foundPeer = incomingPeers.find((mypeer) => mypeer.ip === ip); if (ocIndex > -1) { incomingConnections.splice(ocIndex, 1); } @@ -320,9 +311,9 @@ function handleIncomingConnection(websocket, req, expressWS) { }); ws.on('close', async (msg) => { const ip = ws._socket.remoteAddress; - log.warn(`Incoming connection close ${ip}:${port}`); - const ocIndex = incomingConnections.findIndex((incomingCon) => ws._socket.remoteAddress === incomingCon._socket.remoteAddress && ws.port === incomingCon.port); - const foundPeer = incomingPeers.find((mypeer) => mypeer.ip === ip && mypeer.port === port); + log.warn(`Incoming connection close ${ip}`); + const ocIndex = incomingConnections.findIndex((incomingCon) => ws._socket.remoteAddress === incomingCon._socket.remoteAddress); + const foundPeer = incomingPeers.find((mypeer) => mypeer.ip === ip); if (ocIndex > -1) { incomingConnections.splice(ocIndex, 1); } @@ -387,12 +378,10 @@ async function removePeer(req, res) { const errMessage = messageHelper.createErrorMessage('No IP address specified.'); return res.json(errMessage); } - const justIP = ip.split(':')[0]; - const port = ip.split(':')[1] || 16127; const authorized = await verificationHelper.verifyPrivilege('adminandfluxteam', req); if (authorized === true) { - const closeResponse = await fluxNetworkHelper.closeConnection(justIP, port); + const closeResponse = await fluxNetworkHelper.closeConnection(ip); response = closeResponse; } else { response = messageHelper.errUnauthorizedMessage(); @@ -424,12 +413,10 @@ async function removeIncomingPeer(req, res, expressWS) { const errMessage = messageHelper.createErrorMessage('No IP address specified.'); return res.json(errMessage); } - const justIP = ip.split(':')[0]; - const port = ip.split(':')[1] || 16127; const authorized = await verificationHelper.verifyPrivilege('adminandfluxteam', req); if (authorized === true) { - const closeResponse = await fluxNetworkHelper.closeIncomingConnection(justIP, port, expressWS); + const closeResponse = await fluxNetworkHelper.closeIncomingConnection(ip, expressWS); response = closeResponse; } else { response = messageHelper.errUnauthorizedMessage(); @@ -450,7 +437,6 @@ async function removeIncomingPeer(req, res, expressWS) { * To initiate and handle a connection. Opens a web socket and handles various events during connection. * @param {string} connection IP address (and port if applicable). */ -let socketPortsInformationActive = false; async function initiateAndHandleConnection(connection) { let ip = connection; let port = config.server.apiport; @@ -459,27 +445,13 @@ async function initiateAndHandleConnection(connection) { ip = connection.split(':')[0]; port = connection.split(':')[1]; } - let wsuri = `ws://${ip}:${port}/ws/flux/`; - if (!socketPortsInformationActive) { - const syncStatus = daemonServiceMiscRpcs.isDaemonSynced(); - const daemonHeight = syncStatus.data.height || 0; - if (daemonHeight >= config.socketPortsInformation) { - socketPortsInformationActive = true; - } - } - if (socketPortsInformationActive) { - const myIP = await fluxNetworkHelper.getMyFluxIPandPort(); - const myPort = myIP.split(':')[1] || 16127; - wsuri = `ws://${ip}:${port}/ws/flux/${myPort}`; - } - + const wsuri = `ws://${ip}:${port}/ws/flux/`; const websocket = new WebSocket(wsuri); - websocket.port = port; + websocket.onopen = () => { outgoingConnections.push(websocket); const peer = { ip, // can represent just one ip address, multiport - port, lastPingTime: null, latency: null, }; @@ -490,7 +462,7 @@ async function initiateAndHandleConnection(connection) { websocket.on('pong', () => { try { const curTime = new Date().getTime(); - const foundPeer = outgoingPeers.find((peer) => peer.ip === ip && peer.port === port); + const foundPeer = outgoingPeers.find((peer) => peer.ip === ip); if (foundPeer) { foundPeer.latency = Math.ceil((curTime - foundPeer.lastPingTime) / 2); } @@ -505,7 +477,7 @@ async function initiateAndHandleConnection(connection) { log.info(`Connection to ${connection} closed with code ${evt.code}`); outgoingConnections.splice(ocIndex, 1); } - const foundPeer = outgoingPeers.find((peer) => peer.ip === ip && peer.port === port); + const foundPeer = outgoingPeers.find((peer) => peer.ip === ip); if (foundPeer) { const peerIndex = outgoingPeers.indexOf(foundPeer); if (peerIndex > -1) { @@ -540,7 +512,7 @@ async function initiateAndHandleConnection(connection) { // incoming messages from outgoing connections const currentTimeStamp = Date.now(); // ms // check rate limit - const rateOK = fluxNetworkHelper.lruRateLimit(`${ip}:${port}`, 90); + const rateOK = fluxNetworkHelper.lruRateLimit(ip, 90); if (!rateOK) { return; // do not react to the message } @@ -558,15 +530,15 @@ async function initiateAndHandleConnection(connection) { const messageOK = await fluxCommunicationUtils.verifyOriginalFluxBroadcast(msgObj, undefined, currentTimeStamp); if (messageOK === true) { if (msgObj.data.type === 'zelappregister' || msgObj.data.type === 'zelappupdate' || msgObj.data.type === 'fluxappregister' || msgObj.data.type === 'fluxappupdate') { - handleAppMessages(msgObj, ip, port); + handleAppMessages(msgObj, ip); } else if (msgObj.data.type === 'fluxapprequest') { fluxCommunicationMessagesSender.respondWithAppMessage(msgObj, websocket); } else if (msgObj.data.type === 'fluxapprunning') { - handleAppRunningMessage(msgObj, ip, port); + handleAppRunningMessage(msgObj, ip); } else if (msgObj.data.type === 'fluxipchanged') { - handleIPChangedMessage(msgObj, ip, port); + handleIPChangedMessage(msgObj, ip); } else if (msgObj.data.type === 'fluxappremoved') { - handleAppRemovedMessage(msgObj, ip, port); + handleAppRemovedMessage(msgObj, ip); } else { log.warn(`Unrecognised message type of ${msgObj.data.type}`); } @@ -598,7 +570,7 @@ async function initiateAndHandleConnection(connection) { log.info(`Connection to ${connection} errord with code ${evt.code}`); outgoingConnections.splice(ocIndex, 1); } - const foundPeer = outgoingPeers.find((peer) => peer.ip === ip && peer.port === port); + const foundPeer = outgoingPeers.find((peer) => peer.ip === ip); if (foundPeer) { const peerIndex = outgoingPeers.indexOf(foundPeer); if (peerIndex > -1) { @@ -627,10 +599,9 @@ async function addPeer(req, res) { return res.json(errMessage); } const justIP = ip.split(':')[0]; - const port = ip.split(':')[1] || 16127; - const wsObj = outgoingConnections.find((client) => client._socket.remoteAddress === justIP && client.port === port); + const wsObj = outgoingConnections.find((client) => client._socket.remoteAddress === justIP); if (wsObj) { - const errMessage = messageHelper.createErrorMessage(`Already connected to ${justIP}:${port}`); + const errMessage = messageHelper.createErrorMessage(`Already connected to ${justIP}`); return res.json(errMessage); } const authorized = await verificationHelper.verifyPrivilege('adminandfluxteam', req); @@ -640,7 +611,7 @@ async function addPeer(req, res) { return res.json(message); } initiateAndHandleConnection(ip); - const message = messageHelper.createSuccessMessage(`Outgoing connection to ${ip}:${port} initiated`); + const message = messageHelper.createSuccessMessage(`Outgoing connection to ${ip} initiated`); return res.json(message); } catch (error) { log.error(error); @@ -677,23 +648,22 @@ async function addOutgoingPeer(req, res) { const errMessage = messageHelper.createErrorMessage(`Request ip ${remoteIP4} of ${remoteIP} doesn't match the ip: ${justIP} to connect.`); return res.json(errMessage); } - const port = ip.split(':')[1] || 16127; - const wsObj = outgoingConnections.find((client) => client._socket.remoteAddress === justIP && client.port === port); + const wsObj = outgoingConnections.find((client) => client._socket.remoteAddress === justIP); if (wsObj) { - const errMessage = messageHelper.createErrorMessage(`Already connected to ${justIP}:${port}`); + const errMessage = messageHelper.createErrorMessage(`Already connected to ${justIP}`); return res.json(errMessage); } const nodeList = await fluxCommunicationUtils.deterministicFluxList(); - const fluxNode = nodeList.find((node) => node.ip.split(':')[0] === ip && (node.ip.split(':')[1] || 16127) === port); + const fluxNode = nodeList.find((node) => node.ip === ip); if (!fluxNode) { - const errMessage = messageHelper.createErrorMessage(`FluxNode ${ip}:${port} is not confirmed on the network.`); + const errMessage = messageHelper.createErrorMessage(`FluxNode ${ip} is not confirmed on the network.`); return res.json(errMessage); } initiateAndHandleConnection(ip); - const message = messageHelper.createSuccessMessage(`Outgoing connection to ${ip}:${port} initiated`); + const message = messageHelper.createSuccessMessage(`Outgoing connection to ${ip} initiated`); return res.json(message); } catch (error) { log.error(error); @@ -747,7 +717,7 @@ async function fluxDiscovery() { const fluxNodeIndex = sortedNodeList.findIndex((node) => node.ip === myIP); log.info(`My node was found on index: ${fluxNodeIndex} of ${sortedNodeList.length} nodes`); const minDeterministicOutPeers = Math.min(sortedNodeList.length, 1.5 * config.fluxapps.minOutgoing); - // const minIncomingPeers = Math.min(sortedNodeList.length, 1.5 * config.fluxapps.minIncoming); + const minIncomingPeers = Math.min(sortedNodeList.length, 1.5 * config.fluxapps.minIncoming); log.info(`Current number of outgoing connections:${outgoingConnections.length}`); log.info(`Current number of incoming connections:${incomingConnections.length}`); // always try to connect to deterministic nodes @@ -756,11 +726,9 @@ async function fluxDiscovery() { for (let i = 1; i <= minDeterministicOutPeers; i += 1) { const fixedIndex = fluxNodeIndex + i < sortedNodeList.length ? fluxNodeIndex + i : fluxNodeIndex + i - sortedNodeList.length; const { ip } = sortedNodeList[fixedIndex]; - const ipInc = ip.split(':')[0]; - const portInc = ip.split(':')[1] || 16127; // additional precaution - const clientExists = outgoingConnections.find((client) => client._socket.remoteAddress === ipInc && client.port === portInc); - const clientIncomingExists = incomingConnections.find((client) => client._socket.remoteAddress.replace('::ffff:', '') === ipInc && client.port === portInc); + const clientExists = outgoingConnections.find((client) => client._socket.remoteAddress === ip); + const clientIncomingExists = incomingConnections.find((client) => client._socket.remoteAddress.replace('::ffff:', '') === ip); if (!clientExists && !clientIncomingExists) { deterministicPeerConnections = true; initiateAndHandleConnection(ip); @@ -772,13 +740,12 @@ async function fluxDiscovery() { for (let i = 1; i <= minDeterministicOutPeers; i += 1) { const fixedIndex = fluxNodeIndex - i > 0 ? fluxNodeIndex - i : sortedNodeList.length - fluxNodeIndex - i; const { ip } = sortedNodeList[fixedIndex]; - const ipInc = ip.split(':')[0]; - const portInc = ip.split(':')[1] || 16127; // additional precaution - const clientExists = outgoingConnections.find((client) => client._socket.remoteAddress === ipInc && client.port === portInc); - const clientIncomingExists = incomingConnections.find((client) => client._socket.remoteAddress.replace('::ffff:', '') === ipInc && client.port === portInc); - if (!clientExists && !clientIncomingExists) { + const clientIncomingExists = incomingConnections.find((client) => client._socket.remoteAddress.replace('::ffff:', '') === ip); + if (!clientIncomingExists) { deterministicPeerConnections = true; + const ipInc = ip.split(':')[0]; + const portInc = ip.split(':')[1] || 16127; // eslint-disable-next-line no-await-in-loop await serviceHelper.axiosGet(`http://${ipInc}:${portInc}/flux/addoutgoingpeer/${myIP}`).catch((error) => log.error(error)); } @@ -789,43 +756,40 @@ async function fluxDiscovery() { await serviceHelper.delay(500); let index = 0; - while ((outgoingConnections.length < 16 || [...new Set(outgoingConnections.map((client) => client._socket.remoteAddress))].length < 6) && index < 100) { // Max of 16 outgoing connections - 12 possible deterministic + min. 4 random + while (outgoingConnections.length < (minDeterministicOutPeers + minDeterministicOutPeers / 2) && index < 100) { // Max of 18 outgoing connections - 12 possible deterministic + min. 6 random index += 1; // eslint-disable-next-line no-await-in-loop const connection = await fluxNetworkHelper.getRandomConnection(); if (connection) { - const ipInc = connection.split(':')[0]; - const portInc = connection.split(':')[1] || 16127; + const ip = connection.split(':')[0]; // additional precaution - const sameConnectedIp = currentIpsConnTried.find((connectedIP) => connectedIP === ipInc); - const clientExists = outgoingConnections.find((client) => client._socket.remoteAddress === ipInc && client.port === portInc); - const clientIncomingExists = incomingConnections.find((client) => client._socket.remoteAddress.replace('::ffff:', '') === ipInc && client.port === portInc); + let sameConnectedIp = currentIpsConnTried.find((connectedIP) => connectedIP === ip); + let clientExists = outgoingConnections.find((client) => client._socket.remoteAddress === ip); + let clientIncomingExists = incomingConnections.find((client) => client._socket.remoteAddress.replace('::ffff:', '') === ip); if (!sameConnectedIp && !clientExists && !clientIncomingExists) { log.info(`Adding random Flux peer: ${connection}`); - currentIpsConnTried.push(connection); + currentIpsConnTried.push(ip); initiateAndHandleConnection(connection); } - } - // eslint-disable-next-line no-await-in-loop - await serviceHelper.delay(500); - } - index = 0; - while ((incomingConnections.length < 14 || [...new Set(incomingConnections.map((client) => client._socket.remoteAddress))].length < 6) && index < 100) { // Max of 14 outgoing connections - 12 possible deterministic + min. 2 random (we will get more random as others nodes have more random outgoing connections) - index += 1; - // eslint-disable-next-line no-await-in-loop - const connection = await fluxNetworkHelper.getRandomConnection(); - if (connection) { - const ipInc = connection.split(':')[0]; - const portInc = connection.split(':')[1] || 16127; - // additional precaution - const sameConnectedIp = currentIpsConnTried.find((connectedIP) => connectedIP === ipInc); - const clientExists = outgoingConnections.find((client) => client._socket.remoteAddress === ipInc && client.port === portInc); - const clientIncomingExists = incomingConnections.find((client) => client._socket.remoteAddress.replace('::ffff:', '') === ipInc && client.port === portInc); - if (!sameConnectedIp && !clientExists && !clientIncomingExists) { - log.info(`Asking random Flux ${connection} to add us as a peer`); - currentIpsConnTried.push(connection); + // Max of 8 incoming connections - 8 possible deterministic + x random if needed; + // We can have more incoming connections as it will be outgoing connections from other nodes + random + // we only add randoming incoming peers if currently it's bellow minimum + if (incomingConnections.length < minIncomingPeers) { // eslint-disable-next-line no-await-in-loop - await serviceHelper.axiosGet(`http://${ipInc}:${portInc}/flux/addoutgoingpeer/${myIP}`).catch((error) => log.error(error)); + const connectionInc = await fluxNetworkHelper.getRandomConnection(); + if (connectionInc) { + const ipInc = connectionInc.split(':')[0]; + const portInc = connectionInc.split(':')[1] || 16127; + // additional precaution + sameConnectedIp = currentIpsConnTried.find((connectedIP) => connectedIP === ipInc); + clientExists = outgoingConnections.find((client) => client._socket.remoteAddress === ipInc); + clientIncomingExists = incomingConnections.find((client) => client._socket.remoteAddress.replace('::ffff:', '') === ipInc); + if (!sameConnectedIp && !clientExists && !clientIncomingExists) { + log.info(`Asking random Flux ${connectionInc} to add us as a peer`); + // eslint-disable-next-line no-await-in-loop + await serviceHelper.axiosGet(`http://${ipInc}:${portInc}/flux/addoutgoingpeer/${myIP}`).catch((error) => log.error(error)); + } + } } } // eslint-disable-next-line no-await-in-loop diff --git a/services/fluxCommunicationMessagesSender.js b/services/fluxCommunicationMessagesSender.js index bf528111..61bfe5ca 100644 --- a/services/fluxCommunicationMessagesSender.js +++ b/services/fluxCommunicationMessagesSender.js @@ -39,7 +39,7 @@ async function sendToAllPeers(data, wsList) { if (!data) { const pingTime = new Date().getTime(); client.ping('flux'); // do ping with flux str instead - const foundPeer = outgoingPeers.find((peer) => peer.ip === client._socket.remoteAddress && peer.port === client.port); + const foundPeer = outgoingPeers.find((peer) => peer.ip === client._socket.remoteAddress); if (foundPeer) { foundPeer.lastPingTime = pingTime; } @@ -53,11 +53,10 @@ async function sendToAllPeers(data, wsList) { removals.push(client); try { const ip = client._socket.remoteAddress; - const { port } = client._socket; - const foundPeer = outgoingPeers.find((peer) => peer.ip === ip && peer.port === port); + const foundPeer = outgoingPeers.find((peer) => peer.ip === ip); ipremovals.push(foundPeer); // eslint-disable-next-line no-use-before-define - fluxNetworkHelper.closeConnection(ip, port); + fluxNetworkHelper.closeConnection(ip); } catch (err) { log.error(err); } @@ -110,10 +109,9 @@ async function sendToAllIncomingConnections(data, wsList) { removals.push(client); try { const ip = client._socket.remoteAddress; - const { port } = client; - const foundPeer = incomingPeers.find((peer) => peer.ip === ip && peer.port === port); + const foundPeer = incomingPeers.find((peer) => peer.ip === ip); ipremovals.push(foundPeer); - fluxNetworkHelper.closeIncomingConnection(ip, port, [], client); // this is wrong + fluxNetworkHelper.closeIncomingConnection(ip, [], client); // this is wrong } catch (err) { log.error(err); } diff --git a/services/fluxNetworkHelper.js b/services/fluxNetworkHelper.js index bc864be6..9fa08d29 100644 --- a/services/fluxNetworkHelper.js +++ b/services/fluxNetworkHelper.js @@ -457,22 +457,21 @@ async function getRandomConnection() { /** * To close an outgoing connection. * @param {string} ip IP address. - * @param {string} port node API port. * @returns {object} Message. */ -async function closeConnection(ip, port) { +async function closeConnection(ip) { if (!ip) return messageHelper.createWarningMessage('To close a connection please provide a proper IP number.'); - const wsObj = outgoingConnections.find((client) => client._socket.remoteAddress === ip && client.port === port); + const wsObj = outgoingConnections.find((client) => client._socket.remoteAddress === ip); if (!wsObj) { - return messageHelper.createWarningMessage(`Connection to ${ip}:${port} does not exists.`); + return messageHelper.createWarningMessage(`Connection to ${ip} does not exists.`); } const ocIndex = outgoingConnections.indexOf(wsObj); - const foundPeer = outgoingPeers.find((peer) => peer.ip === ip && peer.port === port); + const foundPeer = outgoingPeers.find((peer) => peer.ip === ip); if (ocIndex === -1) { - return messageHelper.createErrorMessage(`Unable to close connection ${ip}:${port}. Try again later.`); + return messageHelper.createErrorMessage(`Unable to close connection ${ip}. Try again later.`); } wsObj.close(1000, 'purpusfully closed'); - log.info(`Connection to ${ip}:${port} closed`); + log.info(`Connection to ${ip} closed`); outgoingConnections.splice(ocIndex, 1); if (foundPeer) { const peerIndex = outgoingPeers.indexOf(foundPeer); @@ -480,36 +479,35 @@ async function closeConnection(ip, port) { outgoingPeers.splice(peerIndex, 1); } } - return messageHelper.createSuccessMessage(`Outgoing connection to ${ip}:${port} closed`); + return messageHelper.createSuccessMessage(`Outgoing connection to ${ip} closed`); } /** * To close an incoming connection. * @param {string} ip IP address. - * @param {string} port node API port. * @param {object} expressWS Express web socket. * @param {object} clientToClose Web socket for client to close. * @returns {object} Message. */ -async function closeIncomingConnection(ip, port, expressWS, clientToClose) { +async function closeIncomingConnection(ip, expressWS, clientToClose) { if (!ip) return messageHelper.createWarningMessage('To close a connection please provide a proper IP number.'); const clientsSet = expressWS.clients || []; let wsObj = null || clientToClose; clientsSet.forEach((client) => { - if (client._socket.remoteAddress === ip && client.port === port) { + if (client._socket.remoteAddress === ip) { wsObj = client; } }); if (!wsObj) { - return messageHelper.createWarningMessage(`Connection from ${ip}:${port} does not exists.`); + return messageHelper.createWarningMessage(`Connection from ${ip} does not exists.`); } const ocIndex = incomingConnections.indexOf(wsObj); - const foundPeer = incomingPeers.find((peer) => peer.ip === ip && peer.port === port); + const foundPeer = incomingPeers.find((peer) => peer.ip === ip); if (ocIndex === -1) { - return messageHelper.createErrorMessage(`Unable to close incoming connection ${ip}:${port}. Try again later.`); + return messageHelper.createErrorMessage(`Unable to close incoming connection ${ip}. Try again later.`); } wsObj.close(1000, 'purpusfully closed'); - log.info(`Connection from ${ip}:${port} closed`); + log.info(`Connection from ${ip} closed`); incomingConnections.splice(ocIndex, 1); if (foundPeer) { const peerIndex = incomingPeers.indexOf(foundPeer); @@ -517,7 +515,7 @@ async function closeIncomingConnection(ip, port, expressWS, clientToClose) { incomingPeers.splice(peerIndex, 1); } } - return messageHelper.createSuccessMessage(`Incoming connection to ${ip}:${port} closed`); + return messageHelper.createSuccessMessage(`Incoming connection to ${ip} closed`); } /** @@ -656,10 +654,6 @@ function isCommunicationEstablished(req, res) { message = messageHelper.createErrorMessage(`Not enough outgoing connections established to Flux network. Minimum required ${config.fluxapps.minOutgoing} found ${outgoingPeers.length}`); } else if (incomingPeers.length < config.fluxapps.minIncoming) { // depends on other nodes successfully connecting to my node, todo enforcement message = messageHelper.createErrorMessage(`Not enough incoming connections from Flux network. Minimum required ${config.fluxapps.minIncoming} found ${incomingPeers.length}`); - } else if ([...new Set(outgoingPeers.map((peer) => peer.ip))].length < config.fluxapps.minUniqueIpsOutgoing) { // depends on other nodes successfully connecting to my node, todo enforcement - message = messageHelper.createErrorMessage(`Not enough outgoing unique ip's connections established to Flux network. Minimum required ${config.fluxapps.minUniqueIpsOutgoing} found ${[...new Set(outgoingPeers.map((peer) => peer.ip))].length}`); - } else if ([...new Set(incomingPeers.map((peer) => peer.ip))].length < config.fluxapps.minUniqueIpsIncoming) { // depends on other nodes successfully connecting to my node, todo enforcement - message = messageHelper.createErrorMessage(`Not enough incoming unique ip's connections from Flux network. Minimum required ${config.fluxapps.minUniqueIpsIncoming} found ${[...new Set(incomingPeers.map((peer) => peer.ip))].length}`); } else { message = messageHelper.createSuccessMessage('Communication to Flux network is properly established'); } @@ -745,7 +739,6 @@ async function checkMyFluxAvailability(retryNumber = 0) { await serviceHelper.delay(2 * 1000); // await two seconds const newIP = await getMyFluxIPandPort(); // to update node Ip on FluxOs; if (newIP && newIP !== oldIP) { // double check - log.info('FluxBench reported a new IP'); return true; } } if (benchMyIP && benchMyIP.split(':')[0] === myIP.split(':')[0]) { diff --git a/services/utils/establishedConnections.js b/services/utils/establishedConnections.js index bb4e8618..adda55f7 100644 --- a/services/utils/establishedConnections.js +++ b/services/utils/establishedConnections.js @@ -1,7 +1,7 @@ const incomingConnections = []; // websocket list -const incomingPeers = []; // array of objects containing ip, port +const incomingPeers = []; // array of objects containing ip const outgoingConnections = []; // websocket list -const outgoingPeers = []; // array of objects containing ip, port, latency, lastPingTime +const outgoingPeers = []; // array of objects containing ip, latency, lastPingTime module.exports = { incomingConnections,