Skip to content

Commit

Permalink
feat: validate messages for individual filter nodes & perform renewals (
Browse files Browse the repository at this point in the history
#2057)

* feat: validate messages for individual filter nodes & perform renewals

* chore: fix spell check

* chore: use a max threshold before peer renewal

* chore: switch from a validation cycle timer to adhoc validation

* chore: add test

* fix: test

* chore: address comments

* fix: renewal without a new peer available

* chore: validating messages should be non-blocking

* chore: minor improvements

* chore: rm only

* chore: fix test
  • Loading branch information
danisharora099 authored Jul 17, 2024
1 parent 00635b7 commit 9b0f1e8
Show file tree
Hide file tree
Showing 5 changed files with 191 additions and 24 deletions.
9 changes: 7 additions & 2 deletions packages/core/src/lib/filter/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,8 @@ export class FilterCore extends BaseProtocol implements IBaseProtocolCore {
constructor(
private handleIncomingMessage: (
pubsubTopic: PubsubTopic,
wakuMessage: WakuMessage
wakuMessage: WakuMessage,
peerIdStr: string
) => Promise<void>,
libp2p: Libp2p,
options?: ProtocolCreateOptions
Expand Down Expand Up @@ -78,7 +79,11 @@ export class FilterCore extends BaseProtocol implements IBaseProtocolCore {
return;
}

await this.handleIncomingMessage(pubsubTopic, wakuMessage);
await this.handleIncomingMessage(
pubsubTopic,
wakuMessage,
connection.remotePeer.toString()
);
}
}).then(
() => {
Expand Down
1 change: 1 addition & 0 deletions packages/interfaces/src/filter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import type { IReceiver } from "./receiver.js";
export type SubscribeOptions = {
keepAlive?: number;
pingsBeforePeerRenewed?: number;
maxMissedMessagesThreshold?: number;
};

export type IFilter = IReceiver & IBaseProtocolCore;
Expand Down
7 changes: 4 additions & 3 deletions packages/sdk/src/protocols/base_protocol.ts
Original file line number Diff line number Diff line change
Expand Up @@ -51,14 +51,15 @@ export class BaseProtocolSDK implements IBaseProtocolSDK {
public async renewPeer(peerToDisconnect: PeerId): Promise<Peer> {
this.log.info(`Renewing peer ${peerToDisconnect}`);

await this.connectionManager.dropConnection(peerToDisconnect);

const peer = (await this.findAndAddPeers(1))[0];
if (!peer) {
throw new Error(
"Failed to find a new peer to replace the disconnected one"
this.log.error(
"Failed to find a new peer to replace the disconnected one."
);
}

await this.connectionManager.dropConnection(peerToDisconnect);
this.peers = this.peers.filter((peer) => !peer.id.equals(peerToDisconnect));
this.log.info(
`Peer ${peerToDisconnect} disconnected and removed from the peer list`
Expand Down
136 changes: 118 additions & 18 deletions packages/sdk/src/protocols/filter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,22 +4,23 @@ import { ConnectionManager, FilterCore } from "@waku/core";
import {
type Callback,
type ContentTopic,
CoreProtocolResult,
CreateSubscriptionResult,
type CoreProtocolResult,
type CreateSubscriptionResult,
type IAsyncIterator,
type IDecodedMessage,
type IDecoder,
type IFilterSDK,
type IProtoMessage,
type ISubscriptionSDK,
type Libp2p,
type PeerIdStr,
type ProtocolCreateOptions,
ProtocolError,
ProtocolUseOptions,
type ProtocolUseOptions,
type PubsubTopic,
SDKProtocolResult,
type SDKProtocolResult,
type ShardingParams,
SubscribeOptions,
type SubscribeOptions,
type Unsubscribe
} from "@waku/interfaces";
import { messageHashStr } from "@waku/message-hash";
Expand All @@ -39,9 +40,17 @@ type SubscriptionCallback<T extends IDecodedMessage> = {
callback: Callback<T>;
};

type ReceivedMessageHashes = {
all: Set<string>;
nodes: {
[peerId: PeerIdStr]: Set<string>;
};
};

const log = new Logger("sdk:filter");

const DEFAULT_MAX_PINGS = 3;
const DEFAULT_MAX_MISSED_MESSAGES_THRESHOLD = 3;
const DEFAULT_KEEP_ALIVE = 30 * 1000;

const DEFAULT_SUBSCRIBE_OPTIONS = {
Expand All @@ -51,8 +60,11 @@ export class SubscriptionManager implements ISubscriptionSDK {
private readonly pubsubTopic: PubsubTopic;
readonly receivedMessagesHashStr: string[] = [];
private keepAliveTimer: number | null = null;
private readonly receivedMessagesHashes: ReceivedMessageHashes;
private peerFailures: Map<string, number> = new Map();
private missedMessagesByPeer: Map<string, number> = new Map();
private maxPingFailures: number = DEFAULT_MAX_PINGS;
private maxMissedMessagesThreshold = DEFAULT_MAX_MISSED_MESSAGES_THRESHOLD;

private subscriptionCallbacks: Map<
ContentTopic,
Expand All @@ -67,14 +79,38 @@ export class SubscriptionManager implements ISubscriptionSDK {
) {
this.pubsubTopic = pubsubTopic;
this.subscriptionCallbacks = new Map();
const allPeerIdStr = this.getPeers().map((p) => p.id.toString());
this.receivedMessagesHashes = {
all: new Set(),
nodes: {
...Object.fromEntries(allPeerIdStr.map((peerId) => [peerId, new Set()]))
}
};
allPeerIdStr.forEach((peerId) => this.missedMessagesByPeer.set(peerId, 0));
}

get messageHashes(): string[] {
return [...this.receivedMessagesHashes.all];
}

private addHash(hash: string, peerIdStr?: string): void {
this.receivedMessagesHashes.all.add(hash);

if (peerIdStr) {
this.receivedMessagesHashes.nodes[peerIdStr].add(hash);
}
}

public async subscribe<T extends IDecodedMessage>(
decoders: IDecoder<T> | IDecoder<T>[],
callback: Callback<T>,
options: SubscribeOptions = DEFAULT_SUBSCRIBE_OPTIONS
): Promise<SDKProtocolResult> {
this.keepAliveTimer = options.keepAlive || DEFAULT_KEEP_ALIVE;
this.maxPingFailures = options.pingsBeforePeerRenewed || DEFAULT_MAX_PINGS;
this.maxMissedMessagesThreshold =
options.maxMissedMessagesThreshold ||
DEFAULT_MAX_MISSED_MESSAGES_THRESHOLD;

const decodersArray = Array.isArray(decoders) ? decoders : [decoders];

Expand Down Expand Up @@ -146,8 +182,10 @@ export class SubscriptionManager implements ISubscriptionSDK {
const results = await Promise.allSettled(promises);
const finalResult = this.handleResult(results, "unsubscribe");

if (this.subscriptionCallbacks.size === 0 && this.keepAliveTimer) {
this.stopKeepAlivePings();
if (this.subscriptionCallbacks.size === 0) {
if (this.keepAliveTimer) {
this.stopKeepAlivePings();
}
}

return finalResult;
Expand Down Expand Up @@ -180,11 +218,49 @@ export class SubscriptionManager implements ISubscriptionSDK {
return finalResult;
}

async processIncomingMessage(message: WakuMessage): Promise<void> {
private async validateMessage(): Promise<void> {
for (const hash of this.receivedMessagesHashes.all) {
for (const [peerIdStr, hashes] of Object.entries(
this.receivedMessagesHashes.nodes
)) {
if (!hashes.has(hash)) {
this.incrementMissedMessageCount(peerIdStr);
if (this.shouldRenewPeer(peerIdStr)) {
log.info(
`Peer ${peerIdStr} has missed too many messages, renewing.`
);
const peerId = this.getPeers().find(
(p) => p.id.toString() === peerIdStr
)?.id;
if (!peerId) {
log.error(
`Unexpected Error: Peer ${peerIdStr} not found in connected peers.`
);
continue;
}
try {
await this.renewAndSubscribePeer(peerId);
} catch (error) {
log.error(`Failed to renew peer ${peerIdStr}: ${error}`);
}
}
}
}
}
}

async processIncomingMessage(
message: WakuMessage,
peerIdStr: string
): Promise<void> {
const hashedMessageStr = messageHashStr(
this.pubsubTopic,
message as IProtoMessage
);

this.addHash(hashedMessageStr, peerIdStr);
void this.validateMessage();

if (this.receivedMessagesHashStr.includes(hashedMessageStr)) {
log.info("Message already received, skipping");
return;
Expand Down Expand Up @@ -277,15 +353,29 @@ export class SubscriptionManager implements ISubscriptionSDK {
}
}

private async renewAndSubscribePeer(peerId: PeerId): Promise<Peer> {
const newPeer = await this.renewPeer(peerId);
await this.protocol.subscribe(
this.pubsubTopic,
newPeer,
Array.from(this.subscriptionCallbacks.keys())
);
private async renewAndSubscribePeer(
peerId: PeerId
): Promise<Peer | undefined> {
try {
const newPeer = await this.renewPeer(peerId);
await this.protocol.subscribe(
this.pubsubTopic,
newPeer,
Array.from(this.subscriptionCallbacks.keys())
);

this.receivedMessagesHashes.nodes[newPeer.id.toString()] = new Set();
this.missedMessagesByPeer.set(newPeer.id.toString(), 0);

return newPeer;
return newPeer;
} catch (error) {
log.warn(`Failed to renew peer ${peerId.toString()}: ${error}.`);
return;
} finally {
this.peerFailures.delete(peerId.toString());
this.missedMessagesByPeer.delete(peerId.toString());
delete this.receivedMessagesHashes.nodes[peerId.toString()];
}
}

private startKeepAlivePings(options: SubscribeOptions): void {
Expand All @@ -312,6 +402,16 @@ export class SubscriptionManager implements ISubscriptionSDK {
clearInterval(this.keepAliveTimer);
this.keepAliveTimer = null;
}

private incrementMissedMessageCount(peerIdStr: string): void {
const currentCount = this.missedMessagesByPeer.get(peerIdStr) || 0;
this.missedMessagesByPeer.set(peerIdStr, currentCount + 1);
}

private shouldRenewPeer(peerIdStr: string): boolean {
const missedMessages = this.missedMessagesByPeer.get(peerIdStr) || 0;
return missedMessages > this.maxMissedMessagesThreshold;
}
}

class FilterSDK extends BaseProtocolSDK implements IFilterSDK {
Expand All @@ -326,7 +426,7 @@ class FilterSDK extends BaseProtocolSDK implements IFilterSDK {
) {
super(
new FilterCore(
async (pubsubTopic: PubsubTopic, wakuMessage: WakuMessage) => {
async (pubsubTopic, wakuMessage, peerIdStr) => {
const subscription = this.getActiveSubscription(pubsubTopic);
if (!subscription) {
log.error(
Expand All @@ -335,7 +435,7 @@ class FilterSDK extends BaseProtocolSDK implements IFilterSDK {
return;
}

await subscription.processIncomingMessage(wakuMessage);
await subscription.processIncomingMessage(wakuMessage, peerIdStr);
},
libp2p,
options
Expand Down
62 changes: 61 additions & 1 deletion packages/tests/tests/filter/peer_management.spec.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
import {
DefaultPubsubTopic,
ISubscriptionSDK,
LightNode
LightNode,
SDKProtocolResult
} from "@waku/interfaces";
import {
createDecoder,
Expand All @@ -16,6 +17,7 @@ import { describe } from "mocha";
import {
afterEachCustom,
beforeEachCustom,
ServiceNode,
ServiceNodesFleet
} from "../../src/index.js";
import {
Expand Down Expand Up @@ -177,4 +179,62 @@ describe("Waku Filter: Peer Management: E2E", function () {
waku.filter.numPeersToUse
);
});

it("Renews peer on consistent missed messages", async function () {
const [serviceNodes, waku] = await runMultipleNodes(
this.ctx,
undefined,
undefined,
2
);
const serviceNodesPeerIdStr = await Promise.all(
serviceNodes.nodes.map(async (node) =>
(await node.getPeerId()).toString()
)
);
const nodeWithoutDiscovery = new ServiceNode("WithoutDiscovery");
await nodeWithoutDiscovery.start({ lightpush: true, filter: true });
const nodeWithouDiscoveryPeerIdStr = (
await nodeWithoutDiscovery.getPeerId()
).toString();
await waku.dial(await nodeWithoutDiscovery.getMultiaddrWithId());

const { error, subscription: sub } =
await waku.filter.createSubscription(pubsubTopic);
if (!sub || error) {
throw new Error("Could not create subscription");
}

const messages: DecodedMessage[] = [];
const { successes } = await sub.subscribe([decoder], (msg) => {
messages.push(msg);
});

expect(successes.length).to.be.greaterThan(0);
expect(successes.length).to.be.equal(waku.filter.numPeersToUse);

const sendMessage: () => Promise<SDKProtocolResult> = async () =>
waku.lightPush.send(encoder, {
payload: utf8ToBytes("Hello_World")
});

await sendMessage();

successes
.map((peerId) =>
[nodeWithouDiscoveryPeerIdStr, ...serviceNodesPeerIdStr].includes(
peerId.toString()
)
)
.forEach((isConnected) => expect(isConnected).to.eq(true));

// send 2 more messages
await sendMessage();
await sendMessage();

expect(waku.filter.connectedPeers.length).to.equal(2);
expect(
waku.filter.connectedPeers.map((p) => p.id.toString())
).to.not.include(nodeWithouDiscoveryPeerIdStr);
});
});

0 comments on commit 9b0f1e8

Please sign in to comment.