From 9b0f1e855aa3a1f7b9aec3a4c726568d37595c28 Mon Sep 17 00:00:00 2001 From: Danish Arora <35004822+danisharora099@users.noreply.github.com> Date: Thu, 18 Jul 2024 00:41:21 +0530 Subject: [PATCH] feat: validate messages for individual filter nodes & perform renewals (#2057) * feat: validate messages for individual filter nodes & perform renewals * chore: fix spell check * chore: use a max threshold before peer renewal * chore: switch from a validation cycle timer to adhoc validation * chore: add test * fix: test * chore: address comments * fix: renewal without a new peer available * chore: validating messages should be non-blocking * chore: minor improvements * chore: rm only * chore: fix test --- packages/core/src/lib/filter/index.ts | 9 +- packages/interfaces/src/filter.ts | 1 + packages/sdk/src/protocols/base_protocol.ts | 7 +- packages/sdk/src/protocols/filter.ts | 136 +++++++++++++++--- .../tests/filter/peer_management.spec.ts | 62 +++++++- 5 files changed, 191 insertions(+), 24 deletions(-) diff --git a/packages/core/src/lib/filter/index.ts b/packages/core/src/lib/filter/index.ts index 11cada6b70..78d1f61182 100644 --- a/packages/core/src/lib/filter/index.ts +++ b/packages/core/src/lib/filter/index.ts @@ -35,7 +35,8 @@ export class FilterCore extends BaseProtocol implements IBaseProtocolCore { constructor( private handleIncomingMessage: ( pubsubTopic: PubsubTopic, - wakuMessage: WakuMessage + wakuMessage: WakuMessage, + peerIdStr: string ) => Promise, libp2p: Libp2p, options?: ProtocolCreateOptions @@ -78,7 +79,11 @@ export class FilterCore extends BaseProtocol implements IBaseProtocolCore { return; } - await this.handleIncomingMessage(pubsubTopic, wakuMessage); + await this.handleIncomingMessage( + pubsubTopic, + wakuMessage, + connection.remotePeer.toString() + ); } }).then( () => { diff --git a/packages/interfaces/src/filter.ts b/packages/interfaces/src/filter.ts index 4c56a3d680..2b770b30c1 100644 --- a/packages/interfaces/src/filter.ts +++ b/packages/interfaces/src/filter.ts @@ -16,6 +16,7 @@ import type { IReceiver } from "./receiver.js"; export type SubscribeOptions = { keepAlive?: number; pingsBeforePeerRenewed?: number; + maxMissedMessagesThreshold?: number; }; export type IFilter = IReceiver & IBaseProtocolCore; diff --git a/packages/sdk/src/protocols/base_protocol.ts b/packages/sdk/src/protocols/base_protocol.ts index 27ee1b8832..854c0fabce 100644 --- a/packages/sdk/src/protocols/base_protocol.ts +++ b/packages/sdk/src/protocols/base_protocol.ts @@ -51,14 +51,15 @@ export class BaseProtocolSDK implements IBaseProtocolSDK { public async renewPeer(peerToDisconnect: PeerId): Promise { this.log.info(`Renewing peer ${peerToDisconnect}`); + await this.connectionManager.dropConnection(peerToDisconnect); + const peer = (await this.findAndAddPeers(1))[0]; if (!peer) { - throw new Error( - "Failed to find a new peer to replace the disconnected one" + this.log.error( + "Failed to find a new peer to replace the disconnected one." ); } - await this.connectionManager.dropConnection(peerToDisconnect); this.peers = this.peers.filter((peer) => !peer.id.equals(peerToDisconnect)); this.log.info( `Peer ${peerToDisconnect} disconnected and removed from the peer list` diff --git a/packages/sdk/src/protocols/filter.ts b/packages/sdk/src/protocols/filter.ts index 35da1a4c92..af69f1d218 100644 --- a/packages/sdk/src/protocols/filter.ts +++ b/packages/sdk/src/protocols/filter.ts @@ -4,8 +4,8 @@ import { ConnectionManager, FilterCore } from "@waku/core"; import { type Callback, type ContentTopic, - CoreProtocolResult, - CreateSubscriptionResult, + type CoreProtocolResult, + type CreateSubscriptionResult, type IAsyncIterator, type IDecodedMessage, type IDecoder, @@ -13,13 +13,14 @@ import { type IProtoMessage, type ISubscriptionSDK, type Libp2p, + type PeerIdStr, type ProtocolCreateOptions, ProtocolError, - ProtocolUseOptions, + type ProtocolUseOptions, type PubsubTopic, - SDKProtocolResult, + type SDKProtocolResult, type ShardingParams, - SubscribeOptions, + type SubscribeOptions, type Unsubscribe } from "@waku/interfaces"; import { messageHashStr } from "@waku/message-hash"; @@ -39,9 +40,17 @@ type SubscriptionCallback = { callback: Callback; }; +type ReceivedMessageHashes = { + all: Set; + nodes: { + [peerId: PeerIdStr]: Set; + }; +}; + const log = new Logger("sdk:filter"); const DEFAULT_MAX_PINGS = 3; +const DEFAULT_MAX_MISSED_MESSAGES_THRESHOLD = 3; const DEFAULT_KEEP_ALIVE = 30 * 1000; const DEFAULT_SUBSCRIBE_OPTIONS = { @@ -51,8 +60,11 @@ export class SubscriptionManager implements ISubscriptionSDK { private readonly pubsubTopic: PubsubTopic; readonly receivedMessagesHashStr: string[] = []; private keepAliveTimer: number | null = null; + private readonly receivedMessagesHashes: ReceivedMessageHashes; private peerFailures: Map = new Map(); + private missedMessagesByPeer: Map = new Map(); private maxPingFailures: number = DEFAULT_MAX_PINGS; + private maxMissedMessagesThreshold = DEFAULT_MAX_MISSED_MESSAGES_THRESHOLD; private subscriptionCallbacks: Map< ContentTopic, @@ -67,6 +79,26 @@ export class SubscriptionManager implements ISubscriptionSDK { ) { this.pubsubTopic = pubsubTopic; this.subscriptionCallbacks = new Map(); + const allPeerIdStr = this.getPeers().map((p) => p.id.toString()); + this.receivedMessagesHashes = { + all: new Set(), + nodes: { + ...Object.fromEntries(allPeerIdStr.map((peerId) => [peerId, new Set()])) + } + }; + allPeerIdStr.forEach((peerId) => this.missedMessagesByPeer.set(peerId, 0)); + } + + get messageHashes(): string[] { + return [...this.receivedMessagesHashes.all]; + } + + private addHash(hash: string, peerIdStr?: string): void { + this.receivedMessagesHashes.all.add(hash); + + if (peerIdStr) { + this.receivedMessagesHashes.nodes[peerIdStr].add(hash); + } } public async subscribe( @@ -74,7 +106,11 @@ export class SubscriptionManager implements ISubscriptionSDK { callback: Callback, options: SubscribeOptions = DEFAULT_SUBSCRIBE_OPTIONS ): Promise { + this.keepAliveTimer = options.keepAlive || DEFAULT_KEEP_ALIVE; this.maxPingFailures = options.pingsBeforePeerRenewed || DEFAULT_MAX_PINGS; + this.maxMissedMessagesThreshold = + options.maxMissedMessagesThreshold || + DEFAULT_MAX_MISSED_MESSAGES_THRESHOLD; const decodersArray = Array.isArray(decoders) ? decoders : [decoders]; @@ -146,8 +182,10 @@ export class SubscriptionManager implements ISubscriptionSDK { const results = await Promise.allSettled(promises); const finalResult = this.handleResult(results, "unsubscribe"); - if (this.subscriptionCallbacks.size === 0 && this.keepAliveTimer) { - this.stopKeepAlivePings(); + if (this.subscriptionCallbacks.size === 0) { + if (this.keepAliveTimer) { + this.stopKeepAlivePings(); + } } return finalResult; @@ -180,11 +218,49 @@ export class SubscriptionManager implements ISubscriptionSDK { return finalResult; } - async processIncomingMessage(message: WakuMessage): Promise { + private async validateMessage(): Promise { + for (const hash of this.receivedMessagesHashes.all) { + for (const [peerIdStr, hashes] of Object.entries( + this.receivedMessagesHashes.nodes + )) { + if (!hashes.has(hash)) { + this.incrementMissedMessageCount(peerIdStr); + if (this.shouldRenewPeer(peerIdStr)) { + log.info( + `Peer ${peerIdStr} has missed too many messages, renewing.` + ); + const peerId = this.getPeers().find( + (p) => p.id.toString() === peerIdStr + )?.id; + if (!peerId) { + log.error( + `Unexpected Error: Peer ${peerIdStr} not found in connected peers.` + ); + continue; + } + try { + await this.renewAndSubscribePeer(peerId); + } catch (error) { + log.error(`Failed to renew peer ${peerIdStr}: ${error}`); + } + } + } + } + } + } + + async processIncomingMessage( + message: WakuMessage, + peerIdStr: string + ): Promise { const hashedMessageStr = messageHashStr( this.pubsubTopic, message as IProtoMessage ); + + this.addHash(hashedMessageStr, peerIdStr); + void this.validateMessage(); + if (this.receivedMessagesHashStr.includes(hashedMessageStr)) { log.info("Message already received, skipping"); return; @@ -277,15 +353,29 @@ export class SubscriptionManager implements ISubscriptionSDK { } } - private async renewAndSubscribePeer(peerId: PeerId): Promise { - const newPeer = await this.renewPeer(peerId); - await this.protocol.subscribe( - this.pubsubTopic, - newPeer, - Array.from(this.subscriptionCallbacks.keys()) - ); + private async renewAndSubscribePeer( + peerId: PeerId + ): Promise { + try { + const newPeer = await this.renewPeer(peerId); + await this.protocol.subscribe( + this.pubsubTopic, + newPeer, + Array.from(this.subscriptionCallbacks.keys()) + ); + + this.receivedMessagesHashes.nodes[newPeer.id.toString()] = new Set(); + this.missedMessagesByPeer.set(newPeer.id.toString(), 0); - return newPeer; + return newPeer; + } catch (error) { + log.warn(`Failed to renew peer ${peerId.toString()}: ${error}.`); + return; + } finally { + this.peerFailures.delete(peerId.toString()); + this.missedMessagesByPeer.delete(peerId.toString()); + delete this.receivedMessagesHashes.nodes[peerId.toString()]; + } } private startKeepAlivePings(options: SubscribeOptions): void { @@ -312,6 +402,16 @@ export class SubscriptionManager implements ISubscriptionSDK { clearInterval(this.keepAliveTimer); this.keepAliveTimer = null; } + + private incrementMissedMessageCount(peerIdStr: string): void { + const currentCount = this.missedMessagesByPeer.get(peerIdStr) || 0; + this.missedMessagesByPeer.set(peerIdStr, currentCount + 1); + } + + private shouldRenewPeer(peerIdStr: string): boolean { + const missedMessages = this.missedMessagesByPeer.get(peerIdStr) || 0; + return missedMessages > this.maxMissedMessagesThreshold; + } } class FilterSDK extends BaseProtocolSDK implements IFilterSDK { @@ -326,7 +426,7 @@ class FilterSDK extends BaseProtocolSDK implements IFilterSDK { ) { super( new FilterCore( - async (pubsubTopic: PubsubTopic, wakuMessage: WakuMessage) => { + async (pubsubTopic, wakuMessage, peerIdStr) => { const subscription = this.getActiveSubscription(pubsubTopic); if (!subscription) { log.error( @@ -335,7 +435,7 @@ class FilterSDK extends BaseProtocolSDK implements IFilterSDK { return; } - await subscription.processIncomingMessage(wakuMessage); + await subscription.processIncomingMessage(wakuMessage, peerIdStr); }, libp2p, options diff --git a/packages/tests/tests/filter/peer_management.spec.ts b/packages/tests/tests/filter/peer_management.spec.ts index eaf1218458..c676c884a0 100644 --- a/packages/tests/tests/filter/peer_management.spec.ts +++ b/packages/tests/tests/filter/peer_management.spec.ts @@ -1,7 +1,8 @@ import { DefaultPubsubTopic, ISubscriptionSDK, - LightNode + LightNode, + SDKProtocolResult } from "@waku/interfaces"; import { createDecoder, @@ -16,6 +17,7 @@ import { describe } from "mocha"; import { afterEachCustom, beforeEachCustom, + ServiceNode, ServiceNodesFleet } from "../../src/index.js"; import { @@ -177,4 +179,62 @@ describe("Waku Filter: Peer Management: E2E", function () { waku.filter.numPeersToUse ); }); + + it("Renews peer on consistent missed messages", async function () { + const [serviceNodes, waku] = await runMultipleNodes( + this.ctx, + undefined, + undefined, + 2 + ); + const serviceNodesPeerIdStr = await Promise.all( + serviceNodes.nodes.map(async (node) => + (await node.getPeerId()).toString() + ) + ); + const nodeWithoutDiscovery = new ServiceNode("WithoutDiscovery"); + await nodeWithoutDiscovery.start({ lightpush: true, filter: true }); + const nodeWithouDiscoveryPeerIdStr = ( + await nodeWithoutDiscovery.getPeerId() + ).toString(); + await waku.dial(await nodeWithoutDiscovery.getMultiaddrWithId()); + + const { error, subscription: sub } = + await waku.filter.createSubscription(pubsubTopic); + if (!sub || error) { + throw new Error("Could not create subscription"); + } + + const messages: DecodedMessage[] = []; + const { successes } = await sub.subscribe([decoder], (msg) => { + messages.push(msg); + }); + + expect(successes.length).to.be.greaterThan(0); + expect(successes.length).to.be.equal(waku.filter.numPeersToUse); + + const sendMessage: () => Promise = async () => + waku.lightPush.send(encoder, { + payload: utf8ToBytes("Hello_World") + }); + + await sendMessage(); + + successes + .map((peerId) => + [nodeWithouDiscoveryPeerIdStr, ...serviceNodesPeerIdStr].includes( + peerId.toString() + ) + ) + .forEach((isConnected) => expect(isConnected).to.eq(true)); + + // send 2 more messages + await sendMessage(); + await sendMessage(); + + expect(waku.filter.connectedPeers.length).to.equal(2); + expect( + waku.filter.connectedPeers.map((p) => p.id.toString()) + ).to.not.include(nodeWithouDiscoveryPeerIdStr); + }); });