diff --git a/src/node_modules/_datdot-service-helpers/done-task-cleanup.js b/src/node_modules/_datdot-service-helpers/done-task-cleanup.js
index be8c0159..5bf1598a 100644
--- a/src/node_modules/_datdot-service-helpers/done-task-cleanup.js
+++ b/src/node_modules/_datdot-service-helpers/done-task-cleanup.js
@@ -49,27 +49,25 @@ async function done_task_cleanup ({ role, topic, remotestringkey, peers, state,
   if (tasks[stringtopic].connections[remotestringkey] === 'in cleanup') return
   set_status_for_connection({ status: 'in cleanup', tasks, stringtopic, remotestringkey})
   is_target_task = true
-  // stringtopic is one time used & is unique BUT !!!!
-  // for performance-attesting we connect to many targets on same general topic
-
+  // roles: attester2encoder, attester2hoster, performance_attester, storage_attester
-  // performance_attester might also be hoster (or encoder) for same feed
-
-  // performance attester calls done task multiple times for same performance challenge
-  // first time we remove the task, so we need to check if tasks[stringtopic] in all the future calls to avoid err
-
-  /*if (tasks[stringtopic]) */remove_from_roles({ tasks, role, stringtopic, log })
-
+  remove_from_roles({ tasks, role, stringtopic, log })
+
   remove_from_target_tasks({ targets, stringtopic, remotestringkey, log }) // we have always only one task per stringtopic for targets
   const target_tasks = get_target_tasks({ targets, remotestringkey, log })
   if (!target_tasks.length) {
-    log({ type: 'cleanup', data: { text: 'remove from targets', role, remotestringkey, stringtopic }})
+    log({ type: 'cleanup', data: { text: 'remove from targets', role, remotestringkey, stringtopic }})
     remove_from_targets({ targets, remotestringkey, log })
     // remove_from_registry({ sockets, stringtopic, remotestringkey, log })
   }
-
+  swarm.leavePeer(b4a.from(remotestringkey, 'hex')) // stop attempting direct connections to the peer
+  // stringtopic is used one time & is unique, BUT !!!!
+  // for performance-attesting we connect to many targets on the same general topic
+  // performance_attester might also be hoster (or encoder) for the same feed
+  // a performance attester calls done task multiple times for the same performance challenge
+  // the first time we remove the task, so we need to check that tasks[stringtopic] still exists in all future calls to avoid an error
   if (role === 'performance_attester') {
     remove_from_task_connections({ tasks, stringtopic, remotestringkey, log })
     // TODO: leavePeer from topic, even if there still is a task for other stuff (like hoster role etc.)
@@ -124,7 +122,6 @@ async function done_task_cleanup ({ role, topic, remotestringkey, peers, state,
 async function clean_general_task ({ state, role, stringtopic, peers, log }) {
   const { sockets, feeds, tasks, swarm } = state
   const topic = b4a.from(stringtopic, 'hex')
-  var connections_for_socket
 
   remove_from_roles({ tasks, role, stringtopic, log })
 
@@ -153,12 +150,13 @@ async function clean_general_task ({ state, role, stringtopic, peers, log }) {
 
   const topic_tasks = get_tasks_count_for_topic({ tasks, stringtopic, log })
   if (!topic_tasks) {
-    remove_from_tasks({ tasks, stringtopic, log })
+    remove_from_tasks({ tasks, stringtopic, log })
     close_feeds({ feeds, field: stringtopic, log })
     swarm.leave(topic) // stop getting new connections on topic. leave will not close any existing connections.
+    log({ type: 'cleanup', data: { text: 'swarm left', stringtopic }})
   } else {
-    await try_refresh_discovery({ swarm, topic, tasks: tasks[stringtopic].roles, log })
-  }
+    await try_refresh_discovery({ swarm, topic, tasks: tasks[stringtopic].roles, log })
+  }
 }
 
 // STRINGTOPIC NEEDED
@@ -227,12 +225,16 @@ function remove_from_targets ({ targets, remotestringkey, log }) {
   delete targets[remotestringkey]
 }
 function close_streams_and_channels ({ sockets, remotestringkey, log }) {
-  const { socket, replicationStream, channel, mux } = sockets[remotestringkey]
-  if (channel) channel.close()
-  mux.unpair({ protocol: 'datdot/alpha' })
-  mux.destroy()
-  socket.destroy()
-  replicationStream.destroy()
+  try {
+    const { socket, replicationStream, channel, mux } = sockets[remotestringkey]
+    if (channel) channel.close()
+    mux.unpair({ protocol: 'datdot/alpha' })
+    mux.destroy()
+    socket.destroy()
+    replicationStream.destroy()
+  } catch (err) {
+    log({ type: 'cleanup', data: { type: `error: close_streams_and_channels`, remotestringkey, err } })
+  }
 }
 function get_target_tasks ({ targets, remotestringkey, log }) {
   return Object.keys(targets[remotestringkey].tasks)
@@ -270,5 +272,5 @@ function notify_to_close_task ({ state, remotestringkey, stringtopic, is_target_
   }
 }
 function set_status_for_connection ({ status, tasks, stringtopic, remotestringkey}) {
-  tasks[stringtopic].connections[remotestringkey] = status
+  tasks[stringtopic].connections[remotestringkey] = { status }
 }
\ No newline at end of file
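Note on the last hunk above: `set_status_for_connection` now stores an object rather than a bare string, so any caller that still compares the connection entry to a string (for example the `=== 'in cleanup'` check near the top of `done_task_cleanup`) needs to read `.status` instead. A minimal sketch of the new entry shape; the `is_in_cleanup` helper is illustrative and not part of the codebase:

// sketch, assuming tasks[stringtopic].connections[remotestringkey] = { status, tid }
function set_status_for_connection ({ status, tasks, stringtopic, remotestringkey }) {
  tasks[stringtopic].connections[remotestringkey] = { status }
}
function is_in_cleanup ({ tasks, stringtopic, remotestringkey }) {
  // with the object shape, compare the .status field, not the entry itself
  return tasks[stringtopic]?.connections[remotestringkey]?.status === 'in cleanup'
}
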
diff --git a/src/node_modules/_datdot-service-helpers/hyper.js b/src/node_modules/_datdot-service-helpers/hyper.js
index 42bb0f8c..f7ba6c9d 100644
--- a/src/node_modules/_datdot-service-helpers/hyper.js
+++ b/src/node_modules/_datdot-service-helpers/hyper.js
@@ -89,7 +89,7 @@ async function _new_task (account, { newfeed = true, field, feedkey, topic, log
         if (!field) field = stringtopic
         add_to_feeds({ feeds, field, stringkey, feed, log })
       }
-      log({ type: 'hyper', data: { text: 'New feed', stringkey, stringtopic }})
+      log({ type: 'hyper', data: { text: 'New feed', stringkey, field, stringtopic }})
     }
   }
   resolve({ feed })
@@ -171,6 +171,9 @@ async function _connect ( account, bootstrap, { swarm_opts, targets: Targets, on
       // if (!channel.opened) channel.open()
       const count = sockets[remotestringkey].count
       log({ type: 'store connect', data: { text: 'Target - existing socket', remotestringkey, connections_count: count, channel: channel.opened }})
+      // calling handle_target_connection will trigger the onpeer cb for all the target topics for this peer
+      // so make sure the connection is still alive, because when onpeer is called it is assumed there is a connection
+      // and the task can be executed
       handle_target_connection({ account, remotestringkey, log }) // feed will be returned with ontarget callback
     } else {
       // Join peer (if no socket connection && no discovery yet)
@@ -195,63 +198,78 @@
 -----------------------------------------------------------------------------------------------*/
 function onconnection ({ account }) {
   return async (connection, peerInfo) => {
-    return new Promise(async (resolve, reject) => {
-      try {
-        const { state } = account
-        const { targets, tasks, sockets, swarm } = state
-        const remotestringkey = peerInfo.publicKey.toString('hex')
-        const conn_log = account.log.sub(`<-onconnection: me: ${account.noisePublicKey.toString('hex').substring(0,5)}, peer: ${remotestringkey.substring(0,5)} `)
-        conn_log({ type: 'onconnection', data: { text: 'onconnection callback', remotestringkey, /* isInitiator: connection.isInitiator */ } })
-
-        // return if peer is trying to connect while new_channel (make_stream_and_channel) is still in progress
-        // @TODO: we are trying to fix problem where one peer gets onclose and the other doesn't
-        // the one that doesn't before the timeout gets a new onconnection
-        // and tries to reuse the existing channel, while the other peer cleaned the old channel
-        // and is on new onconnection creating a new channel
-        // we need to return until setTimeout removes old channel
-        // if (sockets[remotestringkey] && !sockets[remotestringkey].channel.opened) return
-
-        const tid = setTimeout(() => {
-          conn_log({ type: 'onconnection', data: { text: 'ghost connection timeout', remotestringkey }})
+    const { state } = account
+    const { targets, tasks, sockets, swarm } = state
+    const remotestringkey = peerInfo.publicKey.toString('hex')
+    const conn_log = account.log.sub(`<-onconnection: me: ${account.noisePublicKey.toString('hex').substring(0,5)}, peer: ${remotestringkey.substring(0,5)} `)
+
+    const all_topics = get_tasks({ tasks, log: conn_log })
+    conn_log({
+      type: 'onconnection',
+      data: {
+        text: 'onconnection callback',
+        remotestringkey,
+        existing_socket: sockets[remotestringkey] ? 'yes' : 'no',
+        roles: all_topics.map(stringtopic => JSON.stringify(tasks[stringtopic].roles)),
+        conn: all_topics.map(stringtopic => [stringtopic, JSON.stringify(Object.keys(tasks[stringtopic].connections))]),
+        sockets: Object.keys(sockets).map(key => [ key, sockets[key].count ])
+      }
+    })
+
+    try {
+      // return if peer is trying to connect while new_channel (make_stream_and_channel) is still in progress
+      // @TODO: we are trying to fix problem where one peer gets onclose and the other doesn't
+      // the one that doesn't before the timeout gets a new onconnection
+      // and tries to reuse the existing channel, while the other peer cleaned the old channel
+      // and is on new onconnection creating a new channel
+      // we need to return until setTimeout removes old channel
+      // if (sockets[remotestringkey] && !sockets[remotestringkey].channel.opened) return
+
+      const tid = setTimeout(() => {
+        conn_log({ type: 'onconnection', data: { text: 'ghost connection timeout', remotestringkey }})
+        if (sockets[remotestringkey] && !sockets[remotestringkey].count) {
           // we check for count to see if the connection was a match for any of the tasks (count would then not be a zero)
           // if there was no match or process stopped before the replication, then we close the streams
-          if (sockets[remotestringkey] && !sockets[remotestringkey].count) {
-            close_streams_and_channels({ sockets, remotestringkey, log: conn_log })
-            delete sockets[remotestringkey]
-            conn_log({ type: 'onconnection', data: { text: 'ghost connection removed', remotestringkey }})
-          }
-        }, DEFAULT_TIMEOUT)
-
-        if (!sockets[remotestringkey]) {
-          await make_stream_and_channel({ account, tid, connection, remotestringkey, log: conn_log })
-          resolve()
-        }
-        // this is the target
-        if (targets[remotestringkey]) {
-          handle_target_connection({ account, remotestringkey, log: conn_log })
-          resolve()
+          close_streams_and_channels({ sockets, remotestringkey, log: conn_log })
+          delete sockets[remotestringkey]
+          conn_log({ type: 'onconnection', data: { text: 'ghost connection removed', remotestringkey }})
         }
-        // is peer a client?
-        const peer_topics = peerInfo.topics
-        if (!peer_topics.length) resolve()
-
-        // peer is a client
+      }, DEFAULT_TIMEOUT)
+
+      if (!sockets[remotestringkey]) {
+        await make_stream_and_channel({ account, tid, connection, remotestringkey, log: conn_log })
+      }
+      // this is the target
+      if (targets[remotestringkey]) {
         clearTimeout(tid)
-        for (const topic of peer_topics) {
-          const stringtopic = topic.toString('hex') // only shows topics derived from a feed.discoveryKey
-          if (
-            targets[remotestringkey]?.tasks[stringtopic] || // already handled as target connection
-            !tasks[stringtopic] || // no task
-            // TODO: this only checks if task is being handled by this particular peer
-            tasks[stringtopic]?.connections[remotestringkey] // task is already being handled
-          ) continue
-          handle_matching_topic({ stringtopic, remotestringkey, is_server: false, account, log: conn_log })
-        }
-      } catch (err) {
-        conn_log({ type: 'Error', data: { text: 'Error: in onconnection', err }})
-        reject(err)
+        handle_target_connection({ account, remotestringkey, log: conn_log })
+        // do not return here, further code needs to run too because of the ondiscovery cb
       }
-    })
+      // is peer a client?
+      const peer_topics = peerInfo.topics
+      if (!peer_topics.length) {
+        clearTimeout(tid)
+        return
+      }
+      // peer is a client
+      for (const topic of peer_topics) {
+        const stringtopic = topic.toString('hex') // only shows topics derived from a feed.discoveryKey
+        const discovery = swarm.status(topic)
+        if (
+          !discovery || // peer left the topic, but discovery hasn't yet been updated
+          !discovery.isClient || // peer is not a client anymore, but discovery mode hasn't been updated yet
+          targets[remotestringkey]?.tasks[stringtopic] || // already handled as target connection
+          !tasks[stringtopic] || // no task
+          // TODO: this only checks if task is being handled by this particular peer
+          tasks[stringtopic]?.connections[remotestringkey] // task is already being handled
+        ) continue
+        clearTimeout(tid)
+        handle_matching_topic({ stringtopic, remotestringkey, is_server: false, account, log: conn_log })
+      }
+    } catch (err) {
+      conn_log({ type: 'Error', data: { text: 'Error: in onconnection', err }})
+      throw(err)
+    }
   }
 }
@@ -259,23 +277,27 @@ async function make_stream_and_channel ({ account, tid, connection, remotestring
   log({ type: 'onconnection', data: { text: 'loading replication stream and channel', remotestringkey } })
   return new Promise(async (resolve, reject) => {
     try {
-      const { tasks, sockets, targets } = account.state
+      const { tasks, sockets, targets, swarm } = account.state
       var replicationStream
       log({ type: 'onconnection', data: { text: 'new connection', remotestringkey, isInitiator: connection.isInitiator } })
      replicationStream = hypercore.createProtocolStream(connection, { ondiscoverykey })
       mux = hypercore.getProtocolMuxer(replicationStream)
-      add_to_sockets({ sockets, remotestringkey, connection, replicationStream, mux, tid, log })
+      add_to_sockets({ sockets, remotestringkey, connection, replicationStream, mux, log })
       add_stream_listeners({ state: account.state, remotestringkey, log })
       const channel = await new_channel({ account, remotestringkey, log })
       if (!channel) return log({ type: 'exchange-feedkey', data: { text: 'Error: no channel returned', remotestringkey, existing: sockets[remotestringkey].channel ? 'yes' : 'no' }})
-
+
       resolve({ replicationStream })
-
+
       // peer is a server
       function ondiscoverykey (discoverykey) { // client hasn't replicated the hypercores so they are asking for the 'discovery-key' event
         const stringtopic = discoverykey.toString('hex') // only shows topics derived from a feedn.discoveryKey
+        log({ type: 'ondiscovery', data: { text: 'ondiscovery cb', remotestringkey, stringtopic } })
+        const discovery = swarm.status(discoverykey)
         if (
+          !discovery || // peer left the topic, but discovery hasn't yet been updated
+          !discovery.isServer || // peer is not a server anymore, but discovery mode hasn't been updated yet
           targets[remotestringkey]?.tasks[stringtopic] || // already handled as target connection
           !tasks[stringtopic] || // no task
           tasks[stringtopic]?.connections[remotestringkey] // task is already being handled
@@ -320,11 +342,16 @@ function handle_matching_topic ({ stringtopic, remotestringkey, is_server, accou
 function replicate_feeds ({ state, stringtopic, remotestringkey, replicationStream, log }) {
   try {
-    const { swarm, feeds, sockets } = state
-    const feedkeys = Object.keys(feeds[stringtopic])
+    const { swarm, feeds, targets } = state
     const status = swarm.status(b4a.from(stringtopic, 'hex'))
-    if (!status) return
-    for (const key of feedkeys) {
+    if (!status) return
+    var to_replicate
+    const feedkeys = Object.keys(feeds[stringtopic])
+    targets[remotestringkey]?.tasks[stringtopic] ?
+      to_replicate = feedkeys.filter(key => key !== targets[remotestringkey]?.tasks[stringtopic]?.feed.key.toString('hex'))
+      :
+      to_replicate = feedkeys
+    for (const key of to_replicate) {
       const feed = feeds[stringtopic][key].feed
       if (!feed.opened || feed.closing) {
         log({ type: 'replicate', data: { text: 'error: feed closing or closed', open: feed.opened, closing: feed.closing, stringtopic, feedkey: feed.key.toString('hex') } })
@@ -341,11 +368,10 @@ async function handle_target_connection ({ account, remotestringkey, log }) { //get feedkey from or send to a target peer (on each custom topic)
   const { state } = account
   const { targets, sockets, tasks } = state
-  const { tid, channel, replicationStream } = sockets[remotestringkey]
+  const { channel, replicationStream } = sockets[remotestringkey]
   const string_msg = channel.messages[0] // get string message type for sending strings through the mux channel with that peer (channel.addMessage(...))
   log({ type: 'onconnection', data: { text: `handle target conn`, remotestringkey, target_tasks: Object.keys(targets[remotestringkey].tasks).length } })
-  clearTimeout(tid)
 
   try {
     log({ type: 'exchange-feedkey', data: { text: 'load channel', remotestringkey, opened: channel.opened }})
@@ -365,7 +391,7 @@ //
       // check for socket too because in case of 2 failed attempts in new_channel onclose,
       // we will have connections[remotestringkey)] but we will remove sockets[remotestringkey]
       // so we can create new replicationStream when new connection between same peers is established
-      if (connections[remotestringkey] === 'in progress' && sockets[remotestringkey]) {
+      if (connections[remotestringkey] && sockets[remotestringkey]) {
        log({ type: 'exchange-feedkey', data: { text: 'this task already in progress', remotestringkey }})
        continue
      }
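The next hunk guards the feedkey exchange with an ack timeout: a `tid` is stored on the connection entry and cleared when the `ack-feedkey` message arrives, otherwise the ghost socket is torn down. A standalone sketch of that pattern, with hypothetical names (`send_with_ack_timeout`, `clear_ack_timeout`) that are not part of the codebase:

// sketch only: send a message and run cleanup if no ack arrives in time
function send_with_ack_timeout ({ string_msg, payload, timeout, on_timeout }) {
  const tid = setTimeout(on_timeout, timeout)                 // fires only if clear_ack_timeout is never called
  string_msg.send(JSON.stringify(payload))
  return function clear_ack_timeout () { clearTimeout(tid) }  // call this when the matching ack message arrives
}
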
@@ -378,6 +404,20 @@
           // if (msg.send) { // see if we need to send or to receive a message
           log({ type: 'protomux', data: { text: `send feedkey` } })
+          const tid = setTimeout(() => {
+            log({ type: 'protomux', data: { text: 'error: timeout - feedkey-ack not received', remotestringkey, stringtopic }})
+            set_status_for_connection({ status: 'feedkey-ack-timeout', tasks, stringtopic, remotestringkey})
+            const tasks_with_peer_in_progress = Object.keys(tasks).filter(stringtopic => {
+              const status = tasks[stringtopic]?.connections[remotestringkey]?.status
+              return status && status === 'in progress'
+            })
+            if (sockets[remotestringkey] && !tasks_with_peer_in_progress.length) {
+              close_streams_and_channels({ sockets, remotestringkey, log })
+              delete sockets[remotestringkey]
+              log({ type: 'onconnection', data: { text: 'ghost connection removed', remotestringkey }})
+            }
+          }, DEFAULT_TIMEOUT)
+          tasks[stringtopic].connections[remotestringkey].tid = tid
           string_msg.send(JSON.stringify({ type: 'feedkey', feedkey: feed.key.toString('hex'), stringtopic }))
         }
         if (role === 'performance_attester') {
@@ -393,17 +433,21 @@
 // }
 
 function subscribe_to_topic_messages ({ account, string_msg, stringtopic, remotestringkey, log }) {
-  var registry = string_msg.registry
-  const entry = registry.get(stringtopic)
-  log({ type: 'onconnection', data: { text: `registry entry in target conn`, stringtopic, remotestringkey, entry: entry ? 'yes' : 'no' } })
-  if (!entry) {
-    string_msg.registry.set(stringtopic, { onmessage })
-  } else {
-    const { msg_queue } = entry
-    if (msg_queue) msg_queue.forEach(message => onmessage({ account, message, remotestringkey, string_msg }))
-    entry.msg_queue = void 0
-    entry.onmessage = onmessage
-  }
+  try {
+    var registry = string_msg.registry
+    const entry = registry.get(stringtopic)
+    log({ type: 'registry', data: { text: `add to registry and subscribe to topic messages`, stringtopic, remotestringkey, entry: entry ? 'yes' : 'no' } })
+    if (!entry) {
+      string_msg.registry.set(stringtopic, { onmessage })
+    } else {
+      const { msg_queue } = entry
+      if (msg_queue) msg_queue.forEach(message => onmessage({ account, message, remotestringkey, string_msg }))
+      entry.msg_queue = void 0
+      entry.onmessage = onmessage
+    }
+  } catch (err) {
+    log({ type: 'registry', data: { text: 'error in subscribe to topic messages', remotestringkey, stringtopic, err }})
+  }
 }
 
 function new_channel ({ account, remotestringkey, log }) {
@@ -495,6 +539,10 @@ async function onmessage ({ account, message, remotestringkey, string_msg }) {
     replicate_to_target({ state, ontarget, feed, replicationStream, remotestringkey, stringtopic, log })
   } else if (type === 'ack-feedkey') {
+    const { tid } = tasks[stringtopic].connections[remotestringkey]
+    console.log({ name: log.path, conn: tasks[stringtopic].connections[remotestringkey] })
+    clearTimeout(tid)
+    delete tasks[stringtopic].connections[remotestringkey].tid
     feed = targets[remotestringkey].tasks[stringtopic].feed
     log({ type: 'onmessage protomux', data: { text: 'received ack', feedkey: feed.key.toString('hex') }})
     replicate_to_target({ state, ontarget, feed, replicationStream, remotestringkey, stringtopic, log })
@@ -569,9 +617,9 @@ function add_to_targets ({ targets, remotestringkey, stringtopic, role, ontarget
   else targets[remotestringkey].tasks[stringtopic] = { ontarget, role, feed, msg, done, log }
 }
 
-function add_to_sockets ({ sockets, remotestringkey, connection, replicationStream, mux, tid, log }) {
+function add_to_sockets ({ sockets, remotestringkey, connection, replicationStream, mux, log }) {
   log({ type: 'hyper', data: { text: 'Adding to sockets', remotestringkey, does_exist: sockets[remotestringkey] ? 'yes' : 'no' }})
-  sockets[remotestringkey] = { socket: connection, replicationStream, tid, mux }
+  sockets[remotestringkey] = { socket: connection, replicationStream, mux }
   // count is undefined and will be set to 1 once there is a task match
 }
@@ -664,35 +712,36 @@ async function handle_replicationStream_onclose ({ state, remotestringkey, log }
     decrease_socket_count({ sockets, remotestringkey, log })
     const { count } = sockets[remotestringkey]
     if (!count) {
-      close_streams_and_channels({ sockets, remotestringkey, log })
+      close_streams_and_channels({ sockets, remotestringkey, log })
       delete sockets[remotestringkey]
     }
     if (tasks[stringtopic]?.connections[remotestringkey]) {
-      set_status_for_connection({ status: 'cleaned up', tasks, stringtopic, remotestringkey})
+      set_status_for_connection({ status: 'cleaned up', tasks, stringtopic, remotestringkey})
     }
   }
   // --------- NO TARGET/GENERAL SWARM -------------
   else if (
-    tasks[stringtopic].roles['author']?.count ||
+    tasks[stringtopic].roles['author']?.count ||
     tasks[stringtopic].roles['hoster']?.count ||
     tasks[stringtopic].roles['hoster2author']?.count
   ) {
-    // TASKS
-    var role
-    if (tasks[stringtopic].roles['author']?.count) role = 'author'
-    else if (tasks[stringtopic].roles['hoster']?.count) role = 'hoster'
-    else if (tasks[stringtopic].roles['hoster2author']?.count) role = 'hoster2author'
-
-    await clean_general_task({ state, role, stringtopic, peers: [remotestringkey], log })
-    if (tasks[stringtopic]?.connections[remotestringkey]) {
-      set_status_for_connection({ status: 'cleaned up', tasks, stringtopic, remotestringkey})
-    }
-  } else {
-    // cleanup was triggered due to connection error, peers will try to reconnect and in order to not think
-    // task is still in progres, we set it to ''
-    // if peer disconnected before the connection was added to task connections and sockets, it will not get to handle done task,
-    // but if it was, it will trigger this function
-    set_status_for_connection({ status: '', tasks, stringtopic, remotestringkey})
+    // TASKS
+    var role
+    if (tasks[stringtopic].roles['author']?.count) role = 'author'
+    else if (tasks[stringtopic].roles['hoster']?.count) role = 'hoster'
+    else if (tasks[stringtopic].roles['hoster2author']?.count) role = 'hoster2author'
+
+    await clean_general_task({ state, role, stringtopic, peers: [remotestringkey], log })
+    if (tasks[stringtopic]?.connections[remotestringkey]) {
+      set_status_for_connection({ status: 'cleaned up', tasks, stringtopic, remotestringkey})
+    }
+  } else {
+    // cleanup was triggered due to a connection error, peers will try to reconnect and, in order to not think
+    // the task is still in progress, we set it to null
+    // if peer disconnected before the connection was added to task connections and sockets, it will not get to handle done task,
+    // but if it was, it will trigger this function
+    log({ type: 'handle cleanup', data: { text: 'reset connection status', role, remotestringkey, stringtopic }})
+    set_status_for_connection({ status: null, tasks, stringtopic, remotestringkey})
   }
 }
@@ -742,7 +791,7 @@ account = {
   tasks: { // general swarm tasks or target tasks
     [stringtopic]: {
       connections: {
-        [remotestringkey]: 'in progress' or 'in cleanup'
+        [remotestringkey]: { status: 'in progress' or 'in cleanup', tid }
       },
       roles: {
         author: { count: 0, onpeer: [], done: [] } // { server: true }
diff --git a/src/node_modules/_datdot-service-helpers/try-refresh-discovery.js b/src/node_modules/_datdot-service-helpers/try-refresh-discovery.js
index 576c2391..e77e2b74 100644
--- a/src/node_modules/_datdot-service-helpers/try-refresh-discovery.js
+++ b/src/node_modules/_datdot-service-helpers/try-refresh-discovery.js
@@ -34,6 +34,9 @@ async function try_refresh_discovery ({ swarm, topic, tasks, log }) {
   else if (no_client && any_server_only) mode = { server: true, client: false }
 
   try {
+    // NOTE: things don't get refreshed on the DHT in real time, so the old mode
+    // might still be active for a few more seconds
+    // that is why we add additional checks in the onconnection cb
     const discovery = swarm.status(topic)
     if (discovery.isServer === mode.server && discovery.isClient === mode.client) return resolve(mode)
     await discovery.refresh(mode)
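
Because DHT discovery modes lag behind local join/leave calls (the NOTE in the hunk above), the onconnection and ondiscoverykey callbacks in hyper.js re-check `swarm.status(topic)` before matching a task. A minimal sketch of that guard, assuming a Hyperswarm-style status object with `isClient`/`isServer` flags (falsy once the topic has been left); the helper name is illustrative:

// sketch only: ignore peers whose topic no longer matches our current discovery mode
function topic_still_active ({ swarm, topic, as_client }) {
  const discovery = swarm.status(topic) // falsy once this node has fully left the topic
  if (!discovery) return false
  return as_client ? discovery.isClient : discovery.isServer
}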