Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: improve filter subscriptions #2193

Draft
wants to merge 4 commits into
base: weboko/peer-manager
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 24 additions & 8 deletions packages/interfaces/src/filter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,34 @@ export type SubscriptionCallback<T extends IDecodedMessage> = {
callback: Callback<T>;
};

export type SubscribeOptions = {
keepAlive?: number;
pingsBeforePeerRenewed?: number;
enableLightPushFilterCheck?: boolean;
export type FilterProtocolOptions = {
/**
* Interval with which Filter subscription will attempt to send ping requests to subscribed peers.
*
* @default 60_000
*/
keepAliveIntervalMs: number;

/**
* Number of failed pings allowed to make to a remote peer before attempting to subscribe to a new one.
*
* @default 3
*/
pingsBeforePeerRenewed: number;

/**
* Enables js-waku to send probe LightPush message over subscribed pubsubTopics on created subscription.
* In case message won't be received back through Filter - js-waku will attempt to subscribe to another peer.
*
* @default false
*/
enableLightPushFilterCheck: boolean;
};

export interface ISubscription {
subscribe<T extends IDecodedMessage>(
decoders: IDecoder<T> | IDecoder<T>[],
callback: Callback<T>,
options?: SubscribeOptions
callback: Callback<T>
): Promise<SDKProtocolResult>;

unsubscribe(contentTopics: ContentTopic[]): Promise<SDKProtocolResult>;
Expand All @@ -38,8 +55,7 @@ export interface ISubscription {
export type IFilter = IReceiver & { protocol: IBaseProtocolCore } & {
subscribe<T extends IDecodedMessage>(
decoders: IDecoder<T> | IDecoder<T>[],
callback: Callback<T>,
subscribeOptions?: SubscribeOptions
callback: Callback<T>
): Promise<SubscribeResult>;
};

Expand Down
22 changes: 12 additions & 10 deletions packages/interfaces/src/protocols.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import type { Libp2p } from "@libp2p/interface";
import type { PeerId } from "@libp2p/interface";

import type { ConnectionManagerOptions } from "./connection_manager.js";
import type { FilterProtocolOptions } from "./filter.js";
import type { CreateLibp2pOptions } from "./libp2p.js";
import type { IDecodedMessage } from "./message.js";
import { ThisAndThat, ThisOrThat } from "./misc.js";
Expand All @@ -24,17 +25,11 @@ export type NetworkConfig = StaticSharding | AutoSharding;

export type ProtocolCreateOptions = {
/**
* Configuration for determining the network in use.
*
* If using Static Sharding:
* Default value is configured for The Waku Network.
* The format to specify a shard is: clusterId: number, shards: number[]
* To learn more about the sharding specification, see [Relay Sharding](https://rfc.vac.dev/spec/51/).
* Set the user agent string to be used in identification of the node.
*
* If using Auto Sharding:
* See [Waku v2 Topic Usage Recommendations](https://github.com/vacp2p/rfc-index/blob/main/waku/informational/23/topics.md#content-topics) for details.
* You cannot add or remove content topics after initialization of the node.
* @default "js-waku"
*/
userAgent?: string;

/**
* Configuration for determining the network in use.
Expand Down Expand Up @@ -92,9 +87,16 @@ export type ProtocolCreateOptions = {
bootstrapPeers?: string[];

/**
* Configuration for connection manager. If not specified - default values are applied.
* Configuration for connection manager.
* If not specified - default values are applied.
*/
connectionManager?: Partial<ConnectionManagerOptions>;

/**
* Configuration for Filter protocol.
* If not specified - default values are applied.
*/
filter?: Partial<FilterProtocolOptions>;
};

export type Callback<T extends IDecodedMessage> = (
Expand Down
13 changes: 4 additions & 9 deletions packages/relay/src/create.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,5 @@
import type { RelayNode } from "@waku/interfaces";
import {
createLibp2pAndUpdateOptions,
CreateWakuNodeOptions,
WakuNode,
WakuOptions
} from "@waku/sdk";
import type { ProtocolCreateOptions, RelayNode } from "@waku/interfaces";
import { createLibp2pAndUpdateOptions, WakuNode } from "@waku/sdk";

import { RelayCreateOptions, wakuGossipSub, wakuRelay } from "./relay.js";

Expand All @@ -19,7 +14,7 @@ import { RelayCreateOptions, wakuGossipSub, wakuRelay } from "./relay.js";
* or use this function with caution.
*/
export async function createRelayNode(
options: CreateWakuNodeOptions & Partial<RelayCreateOptions>
options: ProtocolCreateOptions & Partial<RelayCreateOptions>
): Promise<RelayNode> {
options = {
...options,
Expand All @@ -36,7 +31,7 @@ export async function createRelayNode(

return new WakuNode(
pubsubTopics,
options as WakuOptions,
options as ProtocolCreateOptions,
libp2p,
{},
relay
Expand Down
6 changes: 3 additions & 3 deletions packages/sdk/src/create/create.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { type LightNode } from "@waku/interfaces";
import type { LightNode, ProtocolCreateOptions } from "@waku/interfaces";

import { CreateWakuNodeOptions, WakuNode } from "../waku/index.js";
import { WakuNode } from "../waku/index.js";

import { createLibp2pAndUpdateOptions } from "./libp2p.js";

Expand All @@ -10,7 +10,7 @@ import { createLibp2pAndUpdateOptions } from "./libp2p.js";
* Uses Waku Filter V2 by default.
*/
export async function createLightNode(
options: CreateWakuNodeOptions = {}
options: ProtocolCreateOptions = {}
): Promise<LightNode> {
Comment on lines 12 to +13
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ProtocolCreateOptions used for a createNode argument -- perhaps change naming/functionality here?

const { libp2p, pubsubTopics } = await createLibp2pAndUpdateOptions(options);

Expand Down
12 changes: 5 additions & 7 deletions packages/sdk/src/create/libp2p.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,17 +12,12 @@
type IMetadata,
type Libp2p,
type Libp2pComponents,
type ProtocolCreateOptions,
PubsubTopic
} from "@waku/interfaces";
import { derivePubsubTopicsFromNetworkConfig, Logger } from "@waku/utils";
import { createLibp2p } from "libp2p";

import {
CreateWakuNodeOptions,
DefaultPingMaxInboundStreams,
DefaultUserAgent
} from "../waku/index.js";

import { defaultPeerDiscoveries } from "./discovery.js";

type MetadataService = {
Expand All @@ -31,6 +26,9 @@

const log = new Logger("sdk:create");

const DefaultUserAgent = "js-waku";
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: we're using mostly SNAKE_CASE for constants

const DefaultPingMaxInboundStreams = 10;

export async function defaultLibp2p(
pubsubTopics: PubsubTopic[],
options?: Partial<CreateLibp2pOptions>,
Expand Down Expand Up @@ -74,11 +72,11 @@
...metadataService,
...options?.services
}
}) as any as Libp2p; // TODO: make libp2p include it;

Check warning on line 75 in packages/sdk/src/create/libp2p.ts

View workflow job for this annotation

GitHub Actions / check

Unexpected any. Specify a different type

Check warning on line 75 in packages/sdk/src/create/libp2p.ts

View workflow job for this annotation

GitHub Actions / proto

Unexpected any. Specify a different type
}

export async function createLibp2pAndUpdateOptions(
options: CreateWakuNodeOptions
options: ProtocolCreateOptions
): Promise<{ libp2p: Libp2p; pubsubTopics: PubsubTopic[] }> {
const { networkConfig } = options;
const pubsubTopics = derivePubsubTopicsFromNetworkConfig(
Expand Down
6 changes: 1 addition & 5 deletions packages/sdk/src/protocols/filter/constants.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,4 @@
export const DEFAULT_KEEP_ALIVE = 60_000;
export const DEFAULT_MAX_PINGS = 3;
export const DEFAULT_LIGHT_PUSH_FILTER_CHECK = false;
export const DEFAULT_LIGHT_PUSH_FILTER_CHECK_INTERVAL = 10_000;

export const DEFAULT_SUBSCRIBE_OPTIONS = {
keepAlive: DEFAULT_KEEP_ALIVE,
enableLightPushFilterCheck: DEFAULT_LIGHT_PUSH_FILTER_CHECK
};
79 changes: 46 additions & 33 deletions packages/sdk/src/protocols/filter/index.ts
Original file line number Diff line number Diff line change
@@ -1,20 +1,20 @@
import { ConnectionManager, FilterCore } from "@waku/core";
import {
type Callback,
type CreateSubscriptionResult,
type IAsyncIterator,
type IDecodedMessage,
type IDecoder,
type IFilter,
type ILightPush,
type Libp2p,
NetworkConfig,
ProtocolError,
type PubsubTopic,
type SubscribeOptions,
import type {
Callback,
CreateSubscriptionResult,
FilterProtocolOptions,
IAsyncIterator,
IDecodedMessage,
IDecoder,
IFilter,
ILightPush,
IProtoMessage,
Libp2p,
PubsubTopic,
SubscribeResult,
type Unsubscribe
Unsubscribe
} from "@waku/interfaces";
import { NetworkConfig, ProtocolError } from "@waku/interfaces";
import {
ensurePubsubTopicIsConfigured,
groupByContentTopic,
Expand All @@ -25,22 +25,29 @@ import {

import { PeerManager } from "../peer_manager.js";

import { DEFAULT_SUBSCRIBE_OPTIONS } from "./constants.js";
import { SubscriptionManager } from "./subscription_manager.js";
import { MessageCache } from "./message_cache.js";
import { Subscription } from "./subscription.js";
import { buildConfig } from "./utils.js";

const log = new Logger("sdk:filter");

class Filter implements IFilter {
public readonly protocol: FilterCore;

private activeSubscriptions = new Map<string, SubscriptionManager>();
private readonly config: FilterProtocolOptions;
private readonly messageCache: MessageCache;
private activeSubscriptions = new Map<string, Subscription>();

public constructor(
private connectionManager: ConnectionManager,
private libp2p: Libp2p,
private peerManager: PeerManager,
private lightPush?: ILightPush
private lightPush?: ILightPush,
config?: Partial<FilterProtocolOptions>
) {
this.config = buildConfig(config);
this.messageCache = new MessageCache(libp2p);

this.protocol = new FilterCore(
async (pubsubTopic, wakuMessage, peerIdStr) => {
const subscription = this.getActiveSubscription(pubsubTopic);
Expand All @@ -50,6 +57,15 @@ class Filter implements IFilter {
);
return;
}

if (this.messageCache.has(pubsubTopic, wakuMessage as IProtoMessage)) {
log.info(
`Skipping duplicate message for pubsubTopic:${pubsubTopic} peerId:${peerIdStr}`
);
return;
}

this.messageCache.set(pubsubTopic, wakuMessage as IProtoMessage);
Comment on lines +61 to +68
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: we can move this to a private function

await subscription.processIncomingMessage(wakuMessage, peerIdStr);
},

Expand All @@ -66,7 +82,6 @@ class Filter implements IFilter {
*
* @param {IDecoder<T> | IDecoder<T>[]} decoders - A single decoder or an array of decoders to use for decoding messages.
* @param {Callback<T>} callback - The callback function to be invoked with decoded messages.
* @param {SubscribeOptions} [subscribeOptions=DEFAULT_SUBSCRIBE_OPTIONS] - Options for the subscription.
*
* @returns {Promise<SubscribeResult>} A promise that resolves to an object containing:
* - subscription: The created subscription object if successful, or null if failed.
Expand Down Expand Up @@ -100,8 +115,7 @@ class Filter implements IFilter {
*/
public async subscribe<T extends IDecodedMessage>(
decoders: IDecoder<T> | IDecoder<T>[],
callback: Callback<T>,
subscribeOptions: SubscribeOptions = DEFAULT_SUBSCRIBE_OPTIONS
callback: Callback<T>
): Promise<SubscribeResult> {
const uniquePubsubTopics = this.getUniquePubsubTopics(decoders);

Expand All @@ -127,8 +141,7 @@ class Filter implements IFilter {

const { failures, successes } = await subscription.subscribe(
decoders,
callback,
subscribeOptions
callback
);
return {
subscription,
Expand Down Expand Up @@ -173,12 +186,13 @@ class Filter implements IFilter {
this.getActiveSubscription(pubsubTopic) ??
this.setActiveSubscription(
pubsubTopic,
new SubscriptionManager(
new Subscription(
pubsubTopic,
this.protocol,
this.connectionManager,
this.peerManager,
this.libp2p,
this.config,
this.lightPush
)
);
Expand Down Expand Up @@ -206,8 +220,7 @@ class Filter implements IFilter {
*/
public async subscribeWithUnsubscribe<T extends IDecodedMessage>(
decoders: IDecoder<T> | IDecoder<T>[],
callback: Callback<T>,
options: SubscribeOptions = DEFAULT_SUBSCRIBE_OPTIONS
callback: Callback<T>
): Promise<Unsubscribe> {
const uniquePubsubTopics = this.getUniquePubsubTopics<T>(decoders);

Expand All @@ -231,7 +244,7 @@ class Filter implements IFilter {
throw Error(`Failed to create subscription: ${error}`);
}

await subscription.subscribe(decoders, callback, options);
await subscription.subscribe(decoders, callback);

const contentTopics = Array.from(
groupByContentTopic(
Expand All @@ -250,17 +263,16 @@ class Filter implements IFilter {
return toAsyncIterator(this, decoders);
}

//TODO: move to SubscriptionManager
private getActiveSubscription(
pubsubTopic: PubsubTopic
): SubscriptionManager | undefined {
): Subscription | undefined {
return this.activeSubscriptions.get(pubsubTopic);
}

private setActiveSubscription(
pubsubTopic: PubsubTopic,
subscription: SubscriptionManager
): SubscriptionManager {
subscription: Subscription
): Subscription {
this.activeSubscriptions.set(pubsubTopic, subscription);
return subscription;
}
Expand All @@ -285,8 +297,9 @@ class Filter implements IFilter {
export function wakuFilter(
connectionManager: ConnectionManager,
peerManager: PeerManager,
lightPush?: ILightPush
lightPush?: ILightPush,
config?: Partial<FilterProtocolOptions>
): (libp2p: Libp2p) => IFilter {
return (libp2p: Libp2p) =>
new Filter(connectionManager, libp2p, peerManager, lightPush);
new Filter(connectionManager, libp2p, peerManager, lightPush, config);
}
Loading
Loading