diff --git a/packages/tests/src/lib/service_node.ts b/packages/tests/src/lib/service_node.ts index 14ea9a78b9..29ea61c5cc 100644 --- a/packages/tests/src/lib/service_node.ts +++ b/packages/tests/src/lib/service_node.ts @@ -290,7 +290,7 @@ export class ServiceNode { } return this.restCall( - `/relay/v1/auto/message`, + `/relay/v1/auto/messages`, "POST", message, async (response) => response.status === 200 diff --git a/packages/tests/src/types.ts b/packages/tests/src/types.ts index 6f7b6fe705..33bb757475 100644 --- a/packages/tests/src/types.ts +++ b/packages/tests/src/types.ts @@ -15,6 +15,7 @@ export interface Args { discv5Discovery?: boolean; storeMessageDbUrl?: string; pubsubTopic?: Array; + contentTopic?: Array; websocketSupport?: boolean; tcpPort?: number; restPort?: number; diff --git a/packages/tests/src/utils/generate_test_data.ts b/packages/tests/src/utils/generate_test_data.ts index c09d8ebf67..8644e26437 100644 --- a/packages/tests/src/utils/generate_test_data.ts +++ b/packages/tests/src/utils/generate_test_data.ts @@ -1,7 +1,5 @@ import { createDecoder, createEncoder, Decoder, Encoder } from "@waku/core"; -import { DOCKER_IMAGE_NAME } from "../lib/service_node"; - // Utility to generate test data for multiple topics tests. export function generateTestData(topicCount: number): { contentTopics: string[]; @@ -22,19 +20,3 @@ export function generateTestData(topicCount: number): { decoders }; } - -// Utility to add test conditions based on nwaku/go-waku versions -export function isNwakuAtLeast(requiredVersion: string): boolean { - const versionRegex = /(?:v)?(\d+\.\d+(?:\.\d+)?)/; - const match = DOCKER_IMAGE_NAME.match(versionRegex); - - if (match) { - const version = match[0].substring(1); // Remove the 'v' prefix - return ( - version.localeCompare(requiredVersion, undefined, { numeric: true }) >= 0 - ); - } else { - // If there is no match we assume that it's a version close to master so we return True - return true; - } -} diff --git a/packages/tests/src/utils/index.ts b/packages/tests/src/utils/index.ts index 1a27402477..1024c0d884 100644 --- a/packages/tests/src/utils/index.ts +++ b/packages/tests/src/utils/index.ts @@ -6,3 +6,4 @@ export * from "./delay.js"; export * from "./base64_utf8.js"; export * from "./waitForConnections.js"; export * from "./custom_mocha_hooks.js"; +export * from "./waku_versions_utils.js"; diff --git a/packages/tests/src/utils/waku_versions_utils.ts b/packages/tests/src/utils/waku_versions_utils.ts new file mode 100644 index 0000000000..e2f6cf5c9c --- /dev/null +++ b/packages/tests/src/utils/waku_versions_utils.ts @@ -0,0 +1,34 @@ +import { Logger } from "@waku/utils"; + +import { DOCKER_IMAGE_NAME } from "../lib/service_node"; + +const log = new Logger("test:utils"); + +// Utility to add test conditions based on nwaku/go-waku versions +export function isNwakuAtLeast(requiredVersion: string): boolean { + const versionRegex = /(?:v)?(\d+\.\d+(?:\.\d+)?)/; + const match = DOCKER_IMAGE_NAME.match(versionRegex); + + if (match) { + const version = match[0].substring(1); // Remove the 'v' prefix + return ( + version.localeCompare(requiredVersion, undefined, { numeric: true }) >= 0 + ); + } else { + // If there is no match we assume that it's a version close to master so we return True + return true; + } +} + +// Utility to resolve autosharding cluster ID +export function resolveAutoshardingCluster(clusterId: number): number { + if (isNwakuAtLeast("0.27.0")) { + log.info(`Using clusterID ${clusterId} for autosharding`); + return clusterId; + } else { + // for versions older than 0.27.0 the autosharding cluster was hardcoded to 1 + // https://github.com/waku-org/nwaku/pull/2505 + log.warn("Falling back to clusterID 1 for autosharding"); + return 1; + } +} diff --git a/packages/tests/tests/filter/single_node/multiple_pubsub.node.spec.ts b/packages/tests/tests/filter/single_node/multiple_pubsub.node.spec.ts index 7fd42b1a79..55f176434c 100644 --- a/packages/tests/tests/filter/single_node/multiple_pubsub.node.spec.ts +++ b/packages/tests/tests/filter/single_node/multiple_pubsub.node.spec.ts @@ -19,8 +19,10 @@ import { expect } from "chai"; import { afterEachCustom, beforeEachCustom, + isNwakuAtLeast, makeLogFileName, MessageCollector, + resolveAutoshardingCluster, ServiceNode, tearDownNodes } from "../../../src/index.js"; @@ -186,6 +188,7 @@ describe("Waku Filter V2: Multiple PubsubTopics", function () { describe("Waku Filter V2 (Autosharding): Multiple PubsubTopics", function () { // Set the timeout for all tests in this suite. Can be overwritten at test level this.timeout(30000); + const clusterId = resolveAutoshardingCluster(3); let waku: LightNode; let nwaku: ServiceNode; let nwaku2: ServiceNode; @@ -196,39 +199,45 @@ describe("Waku Filter V2 (Autosharding): Multiple PubsubTopics", function () { const customContentTopic2 = "/myapp/1/latest/proto"; const autoshardingPubsubTopic1 = contentTopicToPubsubTopic( customContentTopic1, - 3 + clusterId ); const autoshardingPubsubTopic2 = contentTopicToPubsubTopic( customContentTopic2, - 3 + clusterId ); const contentTopicInfo: ContentTopicInfo = { - clusterId: 3, + clusterId: clusterId, contentTopics: [customContentTopic1, customContentTopic2] }; const customEncoder1 = createEncoder({ contentTopic: customContentTopic1, pubsubTopicShardInfo: { - clusterId: 3, + clusterId: clusterId, shard: contentTopicToShardIndex(customContentTopic1) } }); const customDecoder1 = createDecoder(customContentTopic1, { - clusterId: 3, + clusterId: clusterId, shard: contentTopicToShardIndex(customContentTopic1) }); const customEncoder2 = createEncoder({ contentTopic: customContentTopic2, pubsubTopicShardInfo: { - clusterId: 3, + clusterId: clusterId, shard: contentTopicToShardIndex(customContentTopic2) } }); const customDecoder2 = createDecoder(customContentTopic2, { - clusterId: 3, + clusterId: clusterId, shard: contentTopicToShardIndex(customContentTopic2) }); + before(async () => { + if (!isNwakuAtLeast("0.27.0")) { + this.ctx.skip(); + } + }); + beforeEachCustom(this, async () => { [nwaku, waku] = await runNodes( this.ctx, @@ -309,7 +318,8 @@ describe("Waku Filter V2 (Autosharding): Multiple PubsubTopics", function () { lightpush: true, relay: true, pubsubTopic: [autoshardingPubsubTopic2], - clusterId: 3 + clusterId: clusterId, + contentTopic: [customContentTopic2] }); await waku.dial(await nwaku2.getMultiaddrWithId()); await waitForRemotePeer(waku, [Protocols.Filter, Protocols.LightPush]); diff --git a/packages/tests/tests/filter/single_node/utils.ts b/packages/tests/tests/filter/single_node/utils.ts index f52959c3fc..59195f0302 100644 --- a/packages/tests/tests/filter/single_node/utils.ts +++ b/packages/tests/tests/filter/single_node/utils.ts @@ -1,5 +1,6 @@ import { waitForRemotePeer } from "@waku/core"; import { + ContentTopicInfo, DefaultPubsubTopic, LightNode, ProtocolCreateOptions, @@ -26,17 +27,25 @@ export async function runNodes( ): Promise<[ServiceNode, LightNode]> { const nwaku = new ServiceNode(makeLogFileName(context)); + function isContentTopicInfo(info: ShardingParams): info is ContentTopicInfo { + return (info as ContentTopicInfo).contentTopics !== undefined; + } + await nwaku.start( { filter: true, lightpush: true, relay: true, pubsubTopic: pubsubTopics, - ...(shardInfo && { clusterId: shardInfo.clusterId }) + // Conditionally include clusterId if shardInfo exists + ...(shardInfo && { clusterId: shardInfo.clusterId }), + // Conditionally include contentTopic if shardInfo exists and clusterId is 1 + ...(shardInfo && + isContentTopicInfo(shardInfo) && + shardInfo.clusterId === 1 && { contentTopic: shardInfo.contentTopics }) }, { retries: 3 } ); - const waku_options: ProtocolCreateOptions = { staticNoiseKey: NOISE_KEY_1, libp2p: { addresses: { listen: ["/ip4/0.0.0.0/tcp/0/ws"] } }, diff --git a/packages/tests/tests/getPeers.spec.ts b/packages/tests/tests/getPeers.spec.ts index f8ba7a69fa..bc5f6e690b 100644 --- a/packages/tests/tests/getPeers.spec.ts +++ b/packages/tests/tests/getPeers.spec.ts @@ -20,7 +20,9 @@ import Sinon from "sinon"; import { afterEachCustom, beforeEachCustom, + isNwakuAtLeast, makeLogFileName, + resolveAutoshardingCluster, ServiceNode, tearDownNodes } from "../src/index.js"; @@ -30,6 +32,13 @@ describe("getConnectedPeersForProtocolAndShard", function () { let serviceNode1: ServiceNode; let serviceNode2: ServiceNode; const contentTopic = "/test/2/waku-light-push/utf8"; + const autoshardingClusterId = resolveAutoshardingCluster(6); + + before(async () => { + if (!isNwakuAtLeast("0.27.0")) { + this.ctx.skip(); + } + }); beforeEachCustom(this, async () => { serviceNode1 = new ServiceNode(makeLogFileName(this.ctx) + "1"); @@ -231,7 +240,7 @@ describe("getConnectedPeersForProtocolAndShard", function () { this.timeout(15000); const shardInfo: ContentTopicInfo = { - clusterId: 2, + clusterId: autoshardingClusterId, contentTopics: [contentTopic] }; @@ -240,6 +249,7 @@ describe("getConnectedPeersForProtocolAndShard", function () { peerExchange: true, clusterId: shardInfo.clusterId, pubsubTopic: shardInfoToPubsubTopics(shardInfo), + contentTopic: [contentTopic], lightpush: true, relay: true }); @@ -263,12 +273,12 @@ describe("getConnectedPeersForProtocolAndShard", function () { this.timeout(15000); const shardInfo1: ContentTopicInfo = { - clusterId: 2, + clusterId: autoshardingClusterId, contentTopics: [contentTopic] }; const shardInfo2: ContentTopicInfo = { - clusterId: 2, + clusterId: autoshardingClusterId, contentTopics: ["/test/5/waku-light-push/utf8"] }; @@ -278,6 +288,7 @@ describe("getConnectedPeersForProtocolAndShard", function () { peerExchange: true, clusterId: shardInfo1.clusterId, pubsubTopic: shardInfoToPubsubTopics(shardInfo1), + contentTopic: [contentTopic], lightpush: true, relay: true }); @@ -288,6 +299,7 @@ describe("getConnectedPeersForProtocolAndShard", function () { peerExchange: true, clusterId: shardInfo2.clusterId, pubsubTopic: shardInfoToPubsubTopics(shardInfo2), + contentTopic: [contentTopic], lightpush: true, relay: true }); @@ -315,12 +327,12 @@ describe("getConnectedPeersForProtocolAndShard", function () { this.timeout(15000); const shardInfo1: ContentTopicInfo = { - clusterId: 2, + clusterId: autoshardingClusterId, contentTopics: [contentTopic] }; const shardInfo2: ContentTopicInfo = { - clusterId: 3, + clusterId: 2, contentTopics: [contentTopic] }; @@ -330,6 +342,7 @@ describe("getConnectedPeersForProtocolAndShard", function () { peerExchange: true, clusterId: shardInfo1.clusterId, pubsubTopic: shardInfoToPubsubTopics(shardInfo1), + contentTopic: [contentTopic], lightpush: true, relay: true }); @@ -367,12 +380,12 @@ describe("getConnectedPeersForProtocolAndShard", function () { this.timeout(15000); const shardInfo1: ContentTopicInfo = { - clusterId: 2, + clusterId: autoshardingClusterId, contentTopics: [contentTopic] }; const shardInfo2: ContentTopicInfo = { - clusterId: 3, + clusterId: 2, contentTopics: ["/test/5/waku-light-push/utf8"] }; @@ -382,6 +395,7 @@ describe("getConnectedPeersForProtocolAndShard", function () { peerExchange: true, clusterId: shardInfo1.clusterId, pubsubTopic: shardInfoToPubsubTopics(shardInfo1), + contentTopic: [contentTopic], lightpush: true, relay: true }); diff --git a/packages/tests/tests/light-push/single_node/multiple_pubsub.node.spec.ts b/packages/tests/tests/light-push/single_node/multiple_pubsub.node.spec.ts index 1144e5095c..b615ee4ed0 100644 --- a/packages/tests/tests/light-push/single_node/multiple_pubsub.node.spec.ts +++ b/packages/tests/tests/light-push/single_node/multiple_pubsub.node.spec.ts @@ -19,8 +19,10 @@ import { expect } from "chai"; import { afterEachCustom, beforeEachCustom, + isNwakuAtLeast, makeLogFileName, MessageCollector, + resolveAutoshardingCluster, ServiceNode, tearDownNodes } from "../../../src/index.js"; @@ -177,7 +179,7 @@ describe("Waku Light Push (Autosharding): Multiple PubsubTopics", function () { let nwaku2: ServiceNode; let messageCollector: MessageCollector; - const clusterId = 2; + const clusterId = resolveAutoshardingCluster(4); const customContentTopic1 = "/waku/2/content/test.js"; const customContentTopic2 = "/myapp/1/latest/proto"; const autoshardingPubsubTopic1 = contentTopicToPubsubTopic( @@ -203,6 +205,12 @@ describe("Waku Light Push (Autosharding): Multiple PubsubTopics", function () { let nimPeerId: PeerId; + before(async () => { + if (!isNwakuAtLeast("0.27.0")) { + this.ctx.skip(); + } + }); + beforeEachCustom(this, async () => { [nwaku, waku] = await runNodes( this.ctx, diff --git a/packages/tests/tests/light-push/utils.ts b/packages/tests/tests/light-push/utils.ts index 3e131c470c..04a6ccc055 100644 --- a/packages/tests/tests/light-push/utils.ts +++ b/packages/tests/tests/light-push/utils.ts @@ -1,5 +1,6 @@ import { createEncoder, waitForRemotePeer } from "@waku/core"; import { + ContentTopicInfo, DefaultPubsubTopic, LightNode, Protocols, @@ -23,13 +24,23 @@ export async function runNodes( shardInfo?: ShardingParams ): Promise<[ServiceNode, LightNode]> { const nwaku = new ServiceNode(makeLogFileName(context)); + + function isContentTopicInfo(info: ShardingParams): info is ContentTopicInfo { + return (info as ContentTopicInfo).contentTopics !== undefined; + } + await nwaku.start( { lightpush: true, filter: true, relay: true, pubsubTopic: pubsubTopics, - ...(shardInfo && { clusterId: shardInfo.clusterId }) + // Conditionally include clusterId if shardInfo exists + ...(shardInfo && { clusterId: shardInfo.clusterId }), + // Conditionally include contentTopic if shardInfo exists and clusterId is 1 + ...(shardInfo && + isContentTopicInfo(shardInfo) && + shardInfo.clusterId === 1 && { contentTopic: shardInfo.contentTopics }) }, { retries: 3 } ); diff --git a/packages/tests/tests/metadata.spec.ts b/packages/tests/tests/metadata.spec.ts index d121f14c62..3b48320404 100644 --- a/packages/tests/tests/metadata.spec.ts +++ b/packages/tests/tests/metadata.spec.ts @@ -67,13 +67,12 @@ describe("Metadata Protocol", function () { expect(shardInfoRes).to.not.be.undefined; expect(shardInfoRes.clusterId).to.equal(shardInfo.clusterId); - expect(shardInfoRes.shards).to.deep.equal(shardInfo.shards); + expect(shardInfoRes.shards).to.include.members(shardInfo.shards); const activeConnections = waku.libp2p.getConnections(); expect(activeConnections.length).to.equal(1); }); - // Had to use cluster 0 because of https://github.com/waku-org/js-waku/issues/1848 it("same cluster, different shard: nodes connect", async function () { const shardInfo1: ShardInfo = { clusterId: 0, @@ -115,7 +114,7 @@ describe("Metadata Protocol", function () { expect(shardInfoRes).to.not.be.undefined; expect(shardInfoRes.clusterId).to.equal(shardInfo1.clusterId); - expect(shardInfoRes.shards).to.deep.equal(shardInfo1.shards); + expect(shardInfoRes.shards).to.include.members(shardInfo1.shards); const activeConnections = waku.libp2p.getConnections(); expect(activeConnections.length).to.equal(1); @@ -219,6 +218,6 @@ describe("Metadata Protocol", function () { expect(metadataShardInfo).not.be.undefined; expect(metadataShardInfo!.clusterId).to.eq(shardInfo.clusterId); - expect(metadataShardInfo.shards).to.deep.eq(shardInfo.shards); + expect(metadataShardInfo.shards).to.include.members(shardInfo.shards); }); }); diff --git a/packages/tests/tests/relay/multiple_pubsub.node.spec.ts b/packages/tests/tests/relay/multiple_pubsub.node.spec.ts index b5261e92a1..cde14fbcef 100644 --- a/packages/tests/tests/relay/multiple_pubsub.node.spec.ts +++ b/packages/tests/tests/relay/multiple_pubsub.node.spec.ts @@ -26,6 +26,7 @@ import { NOISE_KEY_1, NOISE_KEY_2, NOISE_KEY_3, + resolveAutoshardingCluster, tearDownNodes } from "../../src/index.js"; import { TestDecoder } from "../filter/utils.js"; @@ -317,6 +318,7 @@ describe("Waku Relay, multiple pubsub topics", function () { describe("Waku Relay (Autosharding), multiple pubsub topics", function () { this.timeout(15000); + const clusterId = resolveAutoshardingCluster(7); let waku1: RelayNode; let waku2: RelayNode; let waku3: RelayNode; @@ -325,18 +327,18 @@ describe("Waku Relay (Autosharding), multiple pubsub topics", function () { const customContentTopic2 = "/myapp/1/latest/proto"; const autoshardingPubsubTopic1 = contentTopicToPubsubTopic( customContentTopic1, - 3 + clusterId ); const autoshardingPubsubTopic2 = contentTopicToPubsubTopic( customContentTopic2, - 3 + clusterId ); const contentTopicInfo1: ContentTopicInfo = { - clusterId: 3, + clusterId: clusterId, contentTopics: [customContentTopic1] }; const contentTopicInfo2: ContentTopicInfo = { - clusterId: 3, + clusterId: clusterId, contentTopics: [customContentTopic2] }; const customEncoder1 = createEncoder({ @@ -356,7 +358,7 @@ describe("Waku Relay (Autosharding), multiple pubsub topics", function () { pubsubTopicToSingleShardInfo(autoshardingPubsubTopic2) ); const contentTopicInfoBothShards: ContentTopicInfo = { - clusterId: 3, + clusterId: clusterId, contentTopics: [customContentTopic1, customContentTopic2] }; diff --git a/packages/tests/tests/sdk/content_topic.spec.ts b/packages/tests/tests/sdk/content_topic.spec.ts index 483e495169..31991ac48a 100644 --- a/packages/tests/tests/sdk/content_topic.spec.ts +++ b/packages/tests/tests/sdk/content_topic.spec.ts @@ -3,7 +3,6 @@ import { bytesToUtf8, createEncoder, createLightNode, - DEFAULT_CLUSTER_ID, defaultLibp2p, LightNode, Protocols, @@ -20,12 +19,18 @@ import { } from "@waku/utils"; import { expect } from "chai"; -import { makeLogFileName, ServiceNode, tearDownNodes } from "../../src"; +import { + makeLogFileName, + resolveAutoshardingCluster, + ServiceNode, + tearDownNodes +} from "../../src"; // skipped: https://github.com/waku-org/js-waku/issues/1914 describe.skip("SDK: Creating by Content Topic", function () { const ContentTopic = "/myapp/1/latest/proto"; const testMessage = "Test123"; + const clusterId = resolveAutoshardingCluster(2); let nwaku: ServiceNode; let waku: LightNode; let waku2: LightNode; @@ -34,13 +39,13 @@ describe.skip("SDK: Creating by Content Topic", function () { this.timeout(15000); nwaku = new ServiceNode(makeLogFileName(this) + "1"); await nwaku.start({ - pubsubTopic: [contentTopicToPubsubTopic(ContentTopic)], + pubsubTopic: [contentTopicToPubsubTopic(ContentTopic, clusterId)], lightpush: true, relay: true, filter: true, discv5Discovery: true, peerExchange: true, - clusterId: DEFAULT_CLUSTER_ID + clusterId: clusterId }); }); @@ -50,7 +55,10 @@ describe.skip("SDK: Creating by Content Topic", function () { }); it("given a content topic, creates a waku node and filter subscription", async function () { - const expectedPubsubTopic = contentTopicToPubsubTopic(ContentTopic); + const expectedPubsubTopic = contentTopicToPubsubTopic( + ContentTopic, + clusterId + ); waku = ( await subscribeToContentTopic(ContentTopic, () => {}, { @@ -62,7 +70,10 @@ describe.skip("SDK: Creating by Content Topic", function () { }); it("given a waku node and content topic, creates a filter subscription", async function () { - const expectedPubsubTopic = contentTopicToPubsubTopic(ContentTopic); + const expectedPubsubTopic = contentTopicToPubsubTopic( + ContentTopic, + clusterId + ); waku = await createLightNode({ shardInfo: { contentTopics: [ContentTopic] } @@ -96,7 +107,7 @@ describe.skip("SDK: Creating by Content Topic", function () { await waitForRemotePeer(waku2, [Protocols.LightPush]); const encoder = createEncoder({ pubsubTopicShardInfo: pubsubTopicToSingleShardInfo( - contentTopicToPubsubTopic(ContentTopic) + contentTopicToPubsubTopic(ContentTopic, clusterId) ), contentTopic: ContentTopic }); @@ -136,7 +147,7 @@ describe.skip("SDK: Creating by Content Topic", function () { await waitForRemotePeer(waku2, [Protocols.LightPush]); const encoder = createEncoder({ pubsubTopicShardInfo: pubsubTopicToSingleShardInfo( - contentTopicToPubsubTopic(ContentTopic) + contentTopicToPubsubTopic(ContentTopic, clusterId) ), contentTopic: ContentTopic }); @@ -161,7 +172,7 @@ describe.skip("SDK: Creating by Content Topic", function () { const encoder = createEncoder({ pubsubTopicShardInfo: pubsubTopicToSingleShardInfo( - contentTopicToPubsubTopic(ContentTopic) + contentTopicToPubsubTopic(ContentTopic, clusterId) ), contentTopic: ContentTopic }); diff --git a/packages/tests/tests/sharding/auto_sharding.spec.ts b/packages/tests/tests/sharding/auto_sharding.spec.ts new file mode 100644 index 0000000000..e1b623405a --- /dev/null +++ b/packages/tests/tests/sharding/auto_sharding.spec.ts @@ -0,0 +1,374 @@ +import { LightNode, ProtocolError, Protocols } from "@waku/interfaces"; +import { + createEncoder, + createLightNode, + utf8ToBytes, + waitForRemotePeer +} from "@waku/sdk"; +import { + contentTopicToPubsubTopic, + contentTopicToShardIndex +} from "@waku/utils"; +import { expect } from "chai"; + +import { + afterEachCustom, + beforeEachCustom, + isNwakuAtLeast, + makeLogFileName, + MessageCollector, + resolveAutoshardingCluster, + ServiceNode, + tearDownNodes +} from "../../src/index.js"; + +const ContentTopic = "/waku/2/content/test.js"; +const ContentTopic2 = "/myapp/1/latest/proto"; + +describe("Autosharding: Running Nodes", function () { + this.timeout(50000); + const clusterId = resolveAutoshardingCluster(10); + let waku: LightNode; + let nwaku: ServiceNode; + let messageCollector: MessageCollector; + + before(async () => { + if (!isNwakuAtLeast("0.27.0")) { + this.ctx.skip(); + } + }); + + beforeEachCustom(this, async () => { + nwaku = new ServiceNode(makeLogFileName(this.ctx)); + messageCollector = new MessageCollector(nwaku); + }); + + afterEachCustom(this, async () => { + await tearDownNodes(nwaku, waku); + }); + + describe("Different clusters and topics", function () { + // js-waku allows autosharding for cluster IDs different than 1 + // we have 2 dedicated tests for this case + it("Cluster ID 0 - Default/Global Cluster", async function () { + const clusterId = 0; + const pubsubTopics = [contentTopicToPubsubTopic(ContentTopic, clusterId)]; + await nwaku.start({ + store: true, + lightpush: true, + relay: true, + clusterId: clusterId, + pubsubTopic: pubsubTopics + }); + + waku = await createLightNode({ + shardInfo: { + clusterId: clusterId, + contentTopics: [ContentTopic] + } + }); + await waku.dial(await nwaku.getMultiaddrWithId()); + await waitForRemotePeer(waku, [Protocols.LightPush]); + + const encoder = createEncoder({ + contentTopic: ContentTopic, + pubsubTopicShardInfo: { + clusterId: clusterId, + shard: contentTopicToShardIndex(ContentTopic) + } + }); + + const request = await waku.lightPush.send(encoder, { + payload: utf8ToBytes("Hello World") + }); + + expect(request.successes.length).to.eq(1); + expect( + await messageCollector.waitForMessagesAutosharding(1, { + contentTopic: ContentTopic + }) + ).to.eq(true); + }); + + it("Non Autosharding Cluster", async function () { + const clusterId = 5; + const pubsubTopics = [contentTopicToPubsubTopic(ContentTopic, clusterId)]; + await nwaku.start({ + store: true, + lightpush: true, + relay: true, + clusterId: clusterId, + pubsubTopic: pubsubTopics + }); + + waku = await createLightNode({ + shardInfo: { + clusterId: clusterId, + contentTopics: [ContentTopic] + } + }); + await waku.dial(await nwaku.getMultiaddrWithId()); + await waitForRemotePeer(waku, [Protocols.LightPush]); + + const encoder = createEncoder({ + contentTopic: ContentTopic, + pubsubTopicShardInfo: { + clusterId: clusterId, + shard: contentTopicToShardIndex(ContentTopic) + } + }); + + const request = await waku.lightPush.send(encoder, { + payload: utf8ToBytes("Hello World") + }); + + expect(request.successes.length).to.eq(1); + expect( + await messageCollector.waitForMessagesAutosharding(1, { + contentTopic: ContentTopic + }) + ).to.eq(true); + }); + + const numTest = 10; + for (let i = 0; i < numTest; i++) { + // Random ContentTopic + const applicationName = `app${Math.floor(Math.random() * 100)}`; // Random application name app0 to app99 + const version = Math.floor(Math.random() * 10) + 1; // Random version between 1 and 10 + const topicName = `topic${Math.floor(Math.random() * 1000)}`; // Random topic name topic0 to topic999 + const encodingList = ["proto", "json", "xml", "test.js", "utf8"]; // Potential encodings + const encoding = + encodingList[Math.floor(Math.random() * encodingList.length)]; // Random encoding + const ContentTopic = `/${applicationName}/${version}/${topicName}/${encoding}`; + + it(`random auto sharding ${ + i + 1 + } - Cluster ID: ${clusterId}, Content Topic: ${ContentTopic}`, async function () { + const pubsubTopics = [ + contentTopicToPubsubTopic(ContentTopic, clusterId) + ]; + + await nwaku.start({ + store: true, + lightpush: true, + relay: true, + clusterId: clusterId, + pubsubTopic: pubsubTopics, + contentTopic: [ContentTopic] + }); + + waku = await createLightNode({ + shardInfo: { + clusterId: clusterId, + contentTopics: [ContentTopic] + } + }); + + await waku.dial(await nwaku.getMultiaddrWithId()); + await waitForRemotePeer(waku, [Protocols.LightPush]); + + const encoder = createEncoder({ + contentTopic: ContentTopic, + pubsubTopicShardInfo: { + clusterId: clusterId, + shard: contentTopicToShardIndex(ContentTopic) + } + }); + + const request = await waku.lightPush.send(encoder, { + payload: utf8ToBytes("Hello World") + }); + + expect(request.successes.length).to.eq(1); + expect( + await messageCollector.waitForMessagesAutosharding(1, { + contentTopic: ContentTopic + }) + ).to.eq(true); + }); + } + + it("Wrong topic", async function () { + const wrongTopic = "wrong_format"; + try { + contentTopicToPubsubTopic(wrongTopic, clusterId); + throw new Error("Wrong topic should've thrown an error"); + } catch (err) { + if ( + !(err instanceof Error) || + !err.message.includes("Content topic format is invalid") + ) { + throw err; + } + } + }); + }); + + describe("Others", function () { + it("configure the node with multiple content topics", async function () { + const pubsubTopics = [ + contentTopicToPubsubTopic(ContentTopic, clusterId), + contentTopicToPubsubTopic(ContentTopic2, clusterId) + ]; + + await nwaku.start({ + store: true, + lightpush: true, + relay: true, + clusterId: clusterId, + pubsubTopic: pubsubTopics, + contentTopic: [ContentTopic, ContentTopic2] + }); + + waku = await createLightNode({ + shardInfo: { + clusterId: clusterId, + // For autosharding, we configure multiple pubsub topics by using two content topics that hash to different shards + contentTopics: [ContentTopic, ContentTopic2] + } + }); + await waku.dial(await nwaku.getMultiaddrWithId()); + await waitForRemotePeer(waku, [Protocols.LightPush]); + + const encoder1 = createEncoder({ + contentTopic: ContentTopic, + pubsubTopicShardInfo: { + clusterId: clusterId, + shard: contentTopicToShardIndex(ContentTopic) + } + }); + + const encoder2 = createEncoder({ + contentTopic: ContentTopic2, + pubsubTopicShardInfo: { + clusterId: clusterId, + shard: contentTopicToShardIndex(ContentTopic2) + } + }); + + const request1 = await waku.lightPush.send(encoder1, { + payload: utf8ToBytes("Hello World") + }); + expect(request1.successes.length).to.eq(1); + expect( + await messageCollector.waitForMessagesAutosharding(1, { + contentTopic: ContentTopic + }) + ).to.eq(true); + + const request2 = await waku.lightPush.send(encoder2, { + payload: utf8ToBytes("Hello World") + }); + expect(request2.successes.length).to.eq(1); + expect( + await messageCollector.waitForMessagesAutosharding(1, { + contentTopic: ContentTopic + }) + ).to.eq(true); + }); + + it("using a protocol with unconfigured pubsub topic should fail", async function () { + const pubsubTopics = [contentTopicToPubsubTopic(ContentTopic, clusterId)]; + await nwaku.start({ + store: true, + lightpush: true, + relay: true, + clusterId: clusterId, + pubsubTopic: pubsubTopics, + contentTopic: [ContentTopic] + }); + + waku = await createLightNode({ + shardInfo: { + clusterId: clusterId, + contentTopics: [ContentTopic] + } + }); + + // use a content topic that is not configured + const encoder = createEncoder({ + contentTopic: ContentTopic2, + pubsubTopicShardInfo: { + clusterId: clusterId, + shard: contentTopicToShardIndex(ContentTopic2) + } + }); + + const { successes, failures } = await waku.lightPush.send(encoder, { + payload: utf8ToBytes("Hello World") + }); + + if (successes.length > 0 || failures?.length === 0) { + throw new Error("The request should've thrown an error"); + } + + const errors = failures?.map((failure) => failure.error); + expect(errors).to.include(ProtocolError.TOPIC_NOT_CONFIGURED); + }); + + it("start node with ApplicationInfo", async function () { + const pubsubTopics = [contentTopicToPubsubTopic(ContentTopic, clusterId)]; + + await nwaku.start({ + store: true, + lightpush: true, + relay: true, + clusterId: clusterId, + pubsubTopic: pubsubTopics, + contentTopic: [ContentTopic] + }); + + waku = await createLightNode({ + shardInfo: { + clusterId: clusterId, + application: ContentTopic.split("/")[1], + version: ContentTopic.split("/")[2] + } + }); + await waku.dial(await nwaku.getMultiaddrWithId()); + await waitForRemotePeer(waku, [Protocols.LightPush]); + + const encoder = createEncoder({ + contentTopic: ContentTopic, + pubsubTopicShardInfo: { + clusterId: clusterId, + shard: contentTopicToShardIndex(ContentTopic) + } + }); + + const request = await waku.lightPush.send(encoder, { + payload: utf8ToBytes("Hello World") + }); + + expect(request.successes.length).to.eq(1); + expect( + await messageCollector.waitForMessagesAutosharding(1, { + contentTopic: ContentTopic + }) + ).to.eq(true); + }); + + it("start node with empty content topic", async function () { + try { + waku = await createLightNode({ + shardInfo: { + clusterId: clusterId, + contentTopics: [] + } + }); + throw new Error( + "Starting the node with no content topic should've thrown an error" + ); + } catch (err) { + if ( + !(err instanceof Error) || + !err.message.includes( + "Missing minimum required configuration options for static sharding or autosharding" + ) + ) { + throw err; + } + } + }); + }); +}); diff --git a/packages/tests/tests/sharding/peer_management.spec.ts b/packages/tests/tests/sharding/peer_management.spec.ts index 5e49f9bb21..95341e43a9 100644 --- a/packages/tests/tests/sharding/peer_management.spec.ts +++ b/packages/tests/tests/sharding/peer_management.spec.ts @@ -20,7 +20,9 @@ import { afterEachCustom, beforeEachCustom, delay, + isNwakuAtLeast, makeLogFileName, + resolveAutoshardingCluster, ServiceNode, tearDownNodes } from "../../src/index.js"; @@ -205,8 +207,14 @@ describe("Static Sharding: Peer Management", function () { }); describe("Autosharding: Peer Management", function () { - const ContentTopic = "/waku/2/content/test.js"; - const clusterId = 2; + const ContentTopic = "/myapp/1/latest/proto"; + const clusterId = resolveAutoshardingCluster(8); + + before(async () => { + if (!isNwakuAtLeast("0.27.0")) { + this.ctx.skip(); + } + }); describe("Peer Exchange", function () { let waku: LightNode; @@ -241,7 +249,8 @@ describe("Autosharding: Peer Management", function () { discv5Discovery: true, peerExchange: true, relay: true, - clusterId: clusterId + clusterId: clusterId, + contentTopic: [ContentTopic] }); const enr1 = (await nwaku1.info()).enrUri; @@ -252,7 +261,8 @@ describe("Autosharding: Peer Management", function () { peerExchange: true, discv5BootstrapNode: enr1, relay: true, - clusterId: clusterId + clusterId: clusterId, + contentTopic: [ContentTopic] }); const enr2 = (await nwaku2.info()).enrUri; @@ -263,7 +273,8 @@ describe("Autosharding: Peer Management", function () { peerExchange: true, discv5BootstrapNode: enr2, relay: true, - clusterId: clusterId + clusterId: clusterId, + contentTopic: [ContentTopic] }); const nwaku3Ma = await nwaku3.getMultiaddrWithId(); @@ -332,7 +343,8 @@ describe("Autosharding: Peer Management", function () { discv5Discovery: true, peerExchange: true, discv5BootstrapNode: enr1, - clusterId: clusterId + clusterId: clusterId, + contentTopic: [ContentTopic] }); const enr2 = (await nwaku2.info()).enrUri; @@ -343,7 +355,8 @@ describe("Autosharding: Peer Management", function () { discv5Discovery: true, peerExchange: true, discv5BootstrapNode: enr2, - clusterId: clusterId + clusterId: clusterId, + contentTopic: [ContentTopic] }); const nwaku3Ma = await nwaku3.getMultiaddrWithId(); diff --git a/packages/tests/tests/sharding/running_nodes.spec.ts b/packages/tests/tests/sharding/running_nodes.spec.ts deleted file mode 100644 index 1d51141990..0000000000 --- a/packages/tests/tests/sharding/running_nodes.spec.ts +++ /dev/null @@ -1,152 +0,0 @@ -import { - LightNode, - ProtocolError, - Protocols, - ShardInfo, - SingleShardInfo -} from "@waku/interfaces"; -import { - createEncoder, - createLightNode, - utf8ToBytes, - waitForRemotePeer -} from "@waku/sdk"; -import { contentTopicToShardIndex } from "@waku/utils"; -import { expect } from "chai"; - -import { - afterEachCustom, - beforeEachCustom, - makeLogFileName, - ServiceNode, - tearDownNodes -} from "../../src/index.js"; - -const shardInfoFirstShard: ShardInfo = { clusterId: 0, shards: [2] }; -const shardInfoBothShards: ShardInfo = { clusterId: 0, shards: [2, 3] }; -const singleShardInfo1: SingleShardInfo = { clusterId: 0, shard: 2 }; -const singleShardInfo2: SingleShardInfo = { clusterId: 0, shard: 3 }; -const ContentTopic = "/waku/2/content/test.js"; -const ContentTopic2 = "/myapp/1/latest/proto"; - -describe("Static Sharding: Running Nodes", function () { - let waku: LightNode; - let nwaku: ServiceNode; - - beforeEachCustom(this, async () => { - nwaku = new ServiceNode(makeLogFileName(this.ctx)); - await nwaku.start({ store: true, lightpush: true, relay: true }); - }); - - afterEachCustom(this, async () => { - await tearDownNodes(nwaku, waku); - }); - - it("configure the node with multiple pubsub topics", async function () { - this.timeout(15_000); - waku = await createLightNode({ - shardInfo: shardInfoBothShards - }); - await waku.dial(await nwaku.getMultiaddrWithId()); - await waitForRemotePeer(waku, [Protocols.LightPush]); - - const encoder1 = createEncoder({ - contentTopic: ContentTopic, - pubsubTopicShardInfo: singleShardInfo1 - }); - - const encoder2 = createEncoder({ - contentTopic: ContentTopic, - pubsubTopicShardInfo: singleShardInfo2 - }); - - const request1 = await waku.lightPush.send(encoder1, { - payload: utf8ToBytes("Hello World") - }); - - const request2 = await waku.lightPush.send(encoder2, { - payload: utf8ToBytes("Hello World") - }); - - expect(request1.successes.length).to.eq(1); - expect(request2.successes.length).to.eq(1); - }); - - it("using a protocol with unconfigured pubsub topic should fail", async function () { - this.timeout(15_000); - waku = await createLightNode({ - shardInfo: shardInfoFirstShard - }); - - // use a pubsub topic that is not configured - const encoder = createEncoder({ - contentTopic: ContentTopic, - pubsubTopicShardInfo: singleShardInfo2 - }); - - const { successes, failures } = await waku.lightPush.send(encoder, { - payload: utf8ToBytes("Hello World") - }); - - if (successes.length > 0 || failures?.length === 0) { - throw new Error("The request should've thrown an error"); - } - - const errors = failures?.map((failure) => failure.error); - expect(errors).to.include(ProtocolError.TOPIC_NOT_CONFIGURED); - }); -}); - -describe("Autosharding: Running Nodes", function () { - let waku: LightNode; - let nwaku: ServiceNode; - - beforeEachCustom(this, async () => { - nwaku = new ServiceNode(makeLogFileName(this.ctx)); - await nwaku.start({ store: true, lightpush: true, relay: true }); - }); - - afterEachCustom(this, async () => { - await tearDownNodes(nwaku, waku); - }); - - it("configure the node with multiple pubsub topics", async function () { - this.timeout(15_000); - waku = await createLightNode({ - shardInfo: { - clusterId: 0, - // For autosharding, we configure multiple pubsub topics by using two content topics that hash to different shards - contentTopics: [ContentTopic, ContentTopic2] - } - }); - await waku.dial(await nwaku.getMultiaddrWithId()); - await waitForRemotePeer(waku, [Protocols.LightPush]); - - const encoder1 = createEncoder({ - contentTopic: ContentTopic, - pubsubTopicShardInfo: { - clusterId: 0, - shard: contentTopicToShardIndex(ContentTopic) - } - }); - - const encoder2 = createEncoder({ - contentTopic: ContentTopic, - pubsubTopicShardInfo: { - clusterId: 0, - shard: contentTopicToShardIndex(ContentTopic2) - } - }); - - const request1 = await waku.lightPush.send(encoder1, { - payload: utf8ToBytes("Hello World") - }); - - const request2 = await waku.lightPush.send(encoder2, { - payload: utf8ToBytes("Hello World") - }); - - expect(request1.successes.length).to.eq(1); - expect(request2.successes.length).to.eq(1); - }); -}); diff --git a/packages/tests/tests/sharding/static_sharding.spec.ts b/packages/tests/tests/sharding/static_sharding.spec.ts new file mode 100644 index 0000000000..b4558ecb5a --- /dev/null +++ b/packages/tests/tests/sharding/static_sharding.spec.ts @@ -0,0 +1,268 @@ +import { + LightNode, + ProtocolError, + Protocols, + ShardInfo, + SingleShardInfo +} from "@waku/interfaces"; +import { + createEncoder, + createLightNode, + utf8ToBytes, + waitForRemotePeer +} from "@waku/sdk"; +import { + shardInfoToPubsubTopics, + singleShardInfosToShardInfo, + singleShardInfoToPubsubTopic +} from "@waku/utils"; +import { expect } from "chai"; + +import { + afterEachCustom, + beforeEachCustom, + makeLogFileName, + MessageCollector, + ServiceNode, + tearDownNodes +} from "../../src/index.js"; + +const ContentTopic = "/waku/2/content/test.js"; + +describe("Static Sharding: Running Nodes", function () { + this.timeout(15_000); + let waku: LightNode; + let nwaku: ServiceNode; + let messageCollector: MessageCollector; + + beforeEachCustom(this, async () => { + nwaku = new ServiceNode(makeLogFileName(this.ctx)); + messageCollector = new MessageCollector(nwaku); + }); + + afterEachCustom(this, async () => { + await tearDownNodes(nwaku, waku); + }); + + describe("Different clusters and shards", function () { + // Will be skipped until https://github.com/waku-org/js-waku/issues/1874 is fixed + it.skip("shard 0", async function () { + const singleShardInfo = { clusterId: 0, shard: 0 }; + const encoder = createEncoder({ + contentTopic: ContentTopic, + pubsubTopicShardInfo: singleShardInfo + }); + expect(encoder.pubsubTopic).to.eq( + singleShardInfoToPubsubTopic(singleShardInfo) + ); + }); + + // dedicated test for Default Cluster ID 0 + it("Cluster ID 0 - Default/Global Cluster", async function () { + const singleShardInfo = { clusterId: 0, shard: 1 }; + const shardInfo = singleShardInfosToShardInfo([singleShardInfo]); + + await nwaku.start({ + store: true, + lightpush: true, + relay: true, + pubsubTopic: shardInfoToPubsubTopics(shardInfo) + }); + + waku = await createLightNode({ + shardInfo: shardInfo + }); + await waku.dial(await nwaku.getMultiaddrWithId()); + await waitForRemotePeer(waku, [Protocols.LightPush]); + + const encoder = createEncoder({ + contentTopic: ContentTopic, + pubsubTopicShardInfo: singleShardInfo + }); + + const request = await waku.lightPush.send(encoder, { + payload: utf8ToBytes("Hello World") + }); + + expect(request.successes.length).to.eq(1); + expect( + await messageCollector.waitForMessages(1, { + pubsubTopic: shardInfoToPubsubTopics(shardInfo)[0] + }) + ).to.eq(true); + }); + + const numTest = 10; + for (let i = 0; i < numTest; i++) { + // Random clusterId between 2 and 1000 + const clusterId = Math.floor(Math.random() * 999) + 2; + + // Random shardId between 1 and 1000 + const shardId = Math.floor(Math.random() * 1000) + 1; + + it(`random static sharding ${ + i + 1 + } - Cluster ID: ${clusterId}, Shard ID: ${shardId}`, async function () { + afterEach(async () => { + await tearDownNodes(nwaku, waku); + }); + + const singleShardInfo = { clusterId: clusterId, shard: shardId }; + const shardInfo = singleShardInfosToShardInfo([singleShardInfo]); + + await nwaku.start({ + store: true, + lightpush: true, + relay: true, + clusterId: clusterId, + pubsubTopic: shardInfoToPubsubTopics(shardInfo) + }); + + waku = await createLightNode({ + shardInfo: shardInfo + }); + await waku.dial(await nwaku.getMultiaddrWithId()); + await waitForRemotePeer(waku, [Protocols.LightPush]); + + const encoder = createEncoder({ + contentTopic: ContentTopic, + pubsubTopicShardInfo: singleShardInfo + }); + + const request = await waku.lightPush.send(encoder, { + payload: utf8ToBytes("Hello World") + }); + + expect(request.successes.length).to.eq(1); + expect( + await messageCollector.waitForMessages(1, { + pubsubTopic: shardInfoToPubsubTopics(shardInfo)[0] + }) + ).to.eq(true); + }); + } + }); + + describe("Others", function () { + const clusterId = 2; + let shardInfo: ShardInfo; + + const shardInfoFirstShard: ShardInfo = { + clusterId: clusterId, + shards: [2] + }; + const shardInfoBothShards: ShardInfo = { + clusterId: clusterId, + shards: [2, 3] + }; + const singleShardInfo1: SingleShardInfo = { + clusterId: clusterId, + shard: 2 + }; + const singleShardInfo2: SingleShardInfo = { + clusterId: clusterId, + shard: 3 + }; + + beforeEachCustom(this, async () => { + shardInfo = { + clusterId: clusterId, + shards: [2] + }; + + await nwaku.start({ + store: true, + lightpush: true, + relay: true, + clusterId: clusterId, + pubsubTopic: shardInfoToPubsubTopics(shardInfo) + }); + }); + + afterEachCustom(this, async () => { + await tearDownNodes(nwaku, waku); + }); + + it("configure the node with multiple pubsub topics", async function () { + waku = await createLightNode({ + shardInfo: shardInfoBothShards + }); + await waku.dial(await nwaku.getMultiaddrWithId()); + await waitForRemotePeer(waku, [Protocols.LightPush]); + + const encoder1 = createEncoder({ + contentTopic: ContentTopic, + pubsubTopicShardInfo: singleShardInfo1 + }); + + const encoder2 = createEncoder({ + contentTopic: ContentTopic, + pubsubTopicShardInfo: singleShardInfo2 + }); + + const request1 = await waku.lightPush.send(encoder1, { + payload: utf8ToBytes("Hello World2") + }); + expect(request1.successes.length).to.eq(1); + expect( + await messageCollector.waitForMessages(1, { + pubsubTopic: shardInfoToPubsubTopics(shardInfo)[0] + }) + ).to.eq(true); + + const request2 = await waku.lightPush.send(encoder2, { + payload: utf8ToBytes("Hello World3") + }); + expect(request2.successes.length).to.eq(1); + expect( + await messageCollector.waitForMessages(1, { + pubsubTopic: shardInfoToPubsubTopics(shardInfo)[0] + }) + ).to.eq(true); + }); + + it("using a protocol with unconfigured pubsub topic should fail", async function () { + this.timeout(15_000); + waku = await createLightNode({ + shardInfo: shardInfoFirstShard + }); + + // use a pubsub topic that is not configured + const encoder = createEncoder({ + contentTopic: ContentTopic, + pubsubTopicShardInfo: singleShardInfo2 + }); + + const { successes, failures } = await waku.lightPush.send(encoder, { + payload: utf8ToBytes("Hello World") + }); + + if (successes.length > 0 || failures?.length === 0) { + throw new Error("The request should've thrown an error"); + } + + const errors = failures?.map((failure) => failure.error); + expect(errors).to.include(ProtocolError.TOPIC_NOT_CONFIGURED); + }); + + it("start node with empty shard", async function () { + try { + waku = await createLightNode({ + shardInfo: { clusterId: clusterId, shards: [] } + }); + throw new Error( + "Starting the node with no shard should've thrown an error" + ); + } catch (err) { + if ( + !(err instanceof Error) || + !err.message.includes( + "Missing minimum required configuration options for static sharding or autosharding" + ) + ) { + throw err; + } + } + }); + }); +}); diff --git a/packages/tests/tests/store/multiple_pubsub.spec.ts b/packages/tests/tests/store/multiple_pubsub.spec.ts index cfac638e96..2fd1869397 100644 --- a/packages/tests/tests/store/multiple_pubsub.spec.ts +++ b/packages/tests/tests/store/multiple_pubsub.spec.ts @@ -11,8 +11,10 @@ import { expect } from "chai"; import { afterEachCustom, beforeEachCustom, + isNwakuAtLeast, makeLogFileName, NOISE_KEY_1, + resolveAutoshardingCluster, ServiceNode, tearDownNodes } from "../../src/index.js"; @@ -180,8 +182,7 @@ describe("Waku Store, custom pubsub topic", function () { }); }); -// Skipped until https://github.com/waku-org/js-waku/issues/1845 gets fixed -describe.skip("Waku Store (Autosharding), custom pubsub topic", function () { +describe("Waku Store (Autosharding), custom pubsub topic", function () { this.timeout(15000); let waku: LightNode; let nwaku: ServiceNode; @@ -189,7 +190,7 @@ describe.skip("Waku Store (Autosharding), custom pubsub topic", function () { const customContentTopic1 = "/waku/2/content/utf8"; const customContentTopic2 = "/myapp/1/latest/proto"; - const clusterId = 2; + const clusterId = resolveAutoshardingCluster(5); const autoshardingPubsubTopic1 = contentTopicToPubsubTopic( customContentTopic1, clusterId @@ -215,11 +216,18 @@ describe.skip("Waku Store (Autosharding), custom pubsub topic", function () { contentTopics: [customContentTopic1, customContentTopic2] }; + before(async () => { + if (!isNwakuAtLeast("0.27.0")) { + this.ctx.skip(); + } + }); + beforeEachCustom(this, async () => { nwaku = new ServiceNode(makeLogFileName(this.ctx)); await nwaku.start({ store: true, pubsubTopic: [autoshardingPubsubTopic1, autoshardingPubsubTopic2], + contentTopic: [customContentTopic1, customContentTopic2], relay: true, clusterId }); @@ -293,6 +301,7 @@ describe.skip("Waku Store (Autosharding), custom pubsub topic", function () { await nwaku2.start({ store: true, pubsubTopic: [autoshardingPubsubTopic2], + contentTopic: [customContentTopic2], relay: true, clusterId }); diff --git a/packages/utils/src/common/sharding.spec.ts b/packages/utils/src/common/sharding.spec.ts index f20e549c73..7ae9f2bc18 100644 --- a/packages/utils/src/common/sharding.spec.ts +++ b/packages/utils/src/common/sharding.spec.ts @@ -1,10 +1,18 @@ +import { DEFAULT_CLUSTER_ID, DefaultPubsubTopic } from "@waku/interfaces"; import { expect } from "chai"; import { contentTopicsByPubsubTopic, contentTopicToPubsubTopic, contentTopicToShardIndex, - ensureValidContentTopic + determinePubsubTopic, + ensurePubsubTopicIsConfigured, + ensureShardingConfigured, + ensureValidContentTopic, + pubsubTopicToSingleShardInfo, + shardInfoToPubsubTopics, + singleShardInfosToShardInfo, + singleShardInfoToPubsubTopic } from "./sharding"; const testInvalidCases = ( @@ -95,19 +103,37 @@ describe("ensureValidContentTopic", () => { }); describe("contentTopicToShardIndex", () => { - it("converts content topics to expected shard index", () => { - const contentTopics: [string, number][] = [ - ["/toychat/2/huilong/proto", 3], - ["/myapp/1/latest/proto", 0], - ["/waku/2/content/test.js", 1], - ["/toychat/2/huilong/proto", 3], - ["/0/toychat/2/huilong/proto", 3], - ["/statusim/1/community/cbor", 4], - ["/0/statusim/1/community/cbor", 4] - ]; - for (const [topic, shard] of contentTopics) { - expect(contentTopicToShardIndex(topic)).to.eq(shard); - } + const contentTopicsWithExpectedShards: [string, number][] = [ + ["/toychat/2/huilong/proto", 3], + ["/myapp/1/latest/proto", 0], + ["/waku/2/content/test.js", 1], + ["/toychat/2/huilong/proto", 3], + ["/0/toychat/2/huilong/proto", 3], + ["/statusim/1/community/cbor", 4], + ["/0/statusim/1/community/cbor", 4], + ["/app/22/sometopic/someencoding", 2], + ["/app/27/sometopic/someencoding", 5], + ["/app/20/sometopic/someencoding", 7], + ["/app/29/sometopic/someencoding", 6] + ]; + contentTopicsWithExpectedShards.forEach(([topic, expectedShard]) => { + it(`should correctly map ${topic} to shard index ${expectedShard}`, () => { + expect(contentTopicToShardIndex(topic)).to.eq(expectedShard); + }); + }); + + const testCases: [number, string, number][] = [ + [16, "/app/20/sometopic/someencoding", 15], + [2, "/app/20/sometopic/someencoding", 1], + [1, "/app/20/sometopic/someencoding", 0] + ]; + + testCases.forEach(([networkShards, topic, expectedShard]) => { + it(`should correctly map ${topic} to shard index ${expectedShard} with networkShards ${networkShards}`, () => { + expect(contentTopicToShardIndex(topic, networkShards)).to.eq( + expectedShard + ); + }); }); it("topics with same application and version share the same shard", () => { @@ -133,15 +159,387 @@ describe("contentTopicsByPubsubTopic", () => { expect(grouped.get(pubsubTopic)?.includes(contentTopic)).to.be.true; } }); -}); -describe("contentTopicsByPubsubTopic", () => { - it("groups content topics by expected pubsub topic", () => { - const contentTopics = ["/toychat/2/huilong/proto", "/myapp/1/latest/proto"]; + it("groups multiple content topics into the same pubsub topic when they share the same shard index", () => { + const contentTopics = [ + "/app/22/sometopic/someencoding", + "/app/22/anothertopic/otherencoding" + ]; const grouped = contentTopicsByPubsubTopic(contentTopics); - for (const contentTopic of contentTopics) { - const pubsubTopic = contentTopicToPubsubTopic(contentTopic); - expect(grouped.get(pubsubTopic)?.includes(contentTopic)).to.be.true; - } + expect(grouped.size).to.eq(1); // Only one pubsub topic expected + const pubsubTopic = contentTopicToPubsubTopic(contentTopics[0]); + expect(grouped.get(pubsubTopic)?.length).to.eq(2); // Both topics should be grouped under the same pubsub topic + }); + + it("handles different clusterIds correctly", () => { + const contentTopics = ["/app/22/sometopic/someencoding"]; + const clusterId1 = 1; + const clusterId2 = 2; + const grouped1 = contentTopicsByPubsubTopic(contentTopics, clusterId1); + const grouped2 = contentTopicsByPubsubTopic(contentTopics, clusterId2); + const pubsubTopic1 = contentTopicToPubsubTopic( + contentTopics[0], + clusterId1 + ); + const pubsubTopic2 = contentTopicToPubsubTopic( + contentTopics[0], + clusterId2 + ); + expect(pubsubTopic1).not.to.equal(pubsubTopic2); + expect(grouped1.has(pubsubTopic1)).to.be.true; + expect(grouped1.has(pubsubTopic2)).to.be.false; + expect(grouped2.has(pubsubTopic1)).to.be.false; + expect(grouped2.has(pubsubTopic2)).to.be.true; + }); + + it("handles different networkShards values correctly", () => { + const contentTopics = ["/app/20/sometopic/someencoding"]; + const networkShards1 = 8; + const networkShards2 = 16; + const grouped1 = contentTopicsByPubsubTopic( + contentTopics, + DEFAULT_CLUSTER_ID, + networkShards1 + ); + const grouped2 = contentTopicsByPubsubTopic( + contentTopics, + DEFAULT_CLUSTER_ID, + networkShards2 + ); + const pubsubTopic1 = contentTopicToPubsubTopic( + contentTopics[0], + DEFAULT_CLUSTER_ID, + networkShards1 + ); + const pubsubTopic2 = contentTopicToPubsubTopic( + contentTopics[0], + DEFAULT_CLUSTER_ID, + networkShards2 + ); + expect(pubsubTopic1).not.to.equal(pubsubTopic2); + expect(grouped1.has(pubsubTopic1)).to.be.true; + expect(grouped1.has(pubsubTopic2)).to.be.false; + expect(grouped2.has(pubsubTopic1)).to.be.false; + expect(grouped2.has(pubsubTopic2)).to.be.true; + }); + + it("throws an error for improperly formatted content topics", () => { + const invalidContentTopics = ["/invalid/format"]; + expect(() => contentTopicsByPubsubTopic(invalidContentTopics)).to.throw(); + }); +}); + +describe("singleShardInfoToPubsubTopic", () => { + it("should convert a SingleShardInfo object to the correct PubsubTopic", () => { + const singleShardInfo = { clusterId: 2, shard: 2 }; + const expectedTopic = "/waku/2/rs/2/2"; + expect(singleShardInfoToPubsubTopic(singleShardInfo)).to.equal( + expectedTopic + ); + }); +}); + +describe("singleShardInfosToShardInfo", () => { + it("should aggregate SingleShardInfos into a ShardInfo", () => { + const singleShardInfos = [ + { clusterId: 1, shard: 2 }, + { clusterId: 1, shard: 3 }, + { clusterId: 1, shard: 5 } + ]; + const expectedShardInfo = { clusterId: 1, shards: [2, 3, 5] }; + expect(singleShardInfosToShardInfo(singleShardInfos)).to.deep.equal( + expectedShardInfo + ); + }); + + it("should throw an error for empty SingleShardInfos array", () => { + expect(() => singleShardInfosToShardInfo([])).to.throw("Invalid shard"); + }); + + it("should throw an error for SingleShardInfos with different clusterIds", () => { + const invalidShardInfos = [ + { clusterId: 1, shard: 2 }, + { clusterId: 2, shard: 3 } + ]; + expect(() => singleShardInfosToShardInfo(invalidShardInfos)).to.throw( + "Passed shard infos have different clusterIds" + ); + }); +}); + +describe("shardInfoToPubsubTopics", () => { + it("should convert content topics to PubsubTopics for autosharding", () => { + const shardInfo = { + contentTopics: ["/app/v1/topic1/proto", "/app/v1/topic2/proto"] + }; + const topics = shardInfoToPubsubTopics(shardInfo); + expect(topics).to.be.an("array").that.includes("/waku/2/rs/1/4"); + expect(topics.length).to.equal(1); + }); + + it("should return unique PubsubTopics for static sharding", () => { + const shardInfo = { clusterId: 1, shards: [0, 1, 0] }; // Duplicate shard to test uniqueness + const topics = shardInfoToPubsubTopics(shardInfo); + expect(topics).to.have.members(["/waku/2/rs/1/0", "/waku/2/rs/1/1"]); + expect(topics.length).to.equal(2); + }); + + it("should handle application and version for autosharding", () => { + const shardInfo = { application: "app", version: "v1" }; + const topics = shardInfoToPubsubTopics(shardInfo); + expect(topics).to.be.an("array").that.includes("/waku/2/rs/1/4"); + expect(topics.length).to.equal(1); + }); + + [0, 1, 6].forEach((clusterId) => { + it(`should handle clusterId, application and version for autosharding with cluster iD ${clusterId}`, () => { + const shardInfo = { + clusterId: clusterId, + application: "app", + version: "v1" + }; + const topics = shardInfoToPubsubTopics(shardInfo); + expect(topics) + .to.be.an("array") + .that.includes(`/waku/2/rs/${clusterId}/4`); + expect(topics.length).to.equal(1); + }); + }); + + it("should return empty list for no shard", () => { + const shardInfo = { clusterId: 1, shards: [] }; + const topics = shardInfoToPubsubTopics(shardInfo); + expect(topics.length).to.equal(0); + }); + + it("should throw an error if shards are undefined for static sharding", () => { + const shardInfo = { clusterId: 1, shards: undefined }; + expect(() => shardInfoToPubsubTopics(shardInfo)).to.throw("Invalid shard"); + }); + + it("should throw an error for missing required configuration", () => { + const shardInfo = {}; + expect(() => shardInfoToPubsubTopics(shardInfo)).to.throw( + "Missing required configuration in shard parameters" + ); + }); +}); + +describe("pubsubTopicToSingleShardInfo with various invalid formats", () => { + const invalidTopics = [ + "/waku/1/rs/1/2", // Invalid Waku version + "/waku/2/r/1/2", // Invalid path segment + "/incorrect/format", // Completely incorrect format + "/waku/2/rs", // Missing both clusterId and shard + "/waku/2/rs/1/2/extra" // Extra trailing data + ]; + + it("should extract SingleShardInfo from a valid PubsubTopic", () => { + const topic = "/waku/2/rs/1/2"; + const expectedInfo = { clusterId: 1, shard: 2 }; + expect(pubsubTopicToSingleShardInfo(topic)).to.deep.equal(expectedInfo); + }); + + invalidTopics.forEach((topic) => { + it(`should throw an error for invalid PubsubTopic format: ${topic}`, () => { + expect(() => pubsubTopicToSingleShardInfo(topic)).to.throw( + "Invalid pubsub topic" + ); + }); + }); + + const nonNumericValues = ["x", "y", "$", "!", "\\", "-", "", " "]; + nonNumericValues.forEach((value) => { + it(`should throw an error for non-numeric clusterId: /waku/2/rs/${value}/1`, () => { + expect(() => + pubsubTopicToSingleShardInfo(`/waku/2/rs/${value}/1`) + ).to.throw("Invalid clusterId or shard"); + }); + + it(`should throw an error for non-numeric shard: /waku/2/rs/1/${value}`, () => { + expect(() => + pubsubTopicToSingleShardInfo(`/waku/2/rs/1/${value}`) + ).to.throw("Invalid clusterId or shard"); + }); + }); +}); + +describe("ensurePubsubTopicIsConfigured", () => { + it("should not throw an error for a single configured topic", () => { + const topic = "/waku/2/rs/1/2"; + const configuredTopics = [topic]; + expect(() => + ensurePubsubTopicIsConfigured(topic, configuredTopics) + ).not.to.throw(); + }); + + it("should not throw an error when the topic is within a list of configured topics", () => { + const topic = "/waku/2/rs/1/2"; + const configuredTopics = ["/waku/2/rs/1/1", topic, "/waku/2/rs/1/3"]; + expect(() => + ensurePubsubTopicIsConfigured(topic, configuredTopics) + ).not.to.throw(); + }); + + it("should throw an error for an unconfigured topic", () => { + const topic = "/waku/2/rs/1/2"; + const configuredTopics = ["/waku/2/rs/1/3"]; + expect(() => + ensurePubsubTopicIsConfigured(topic, configuredTopics) + ).to.throw(); + }); +}); + +describe("determinePubsubTopic", () => { + const contentTopic = "/app/46/sometopic/someencoding"; + it("should return the pubsub topic directly if a string is provided", () => { + const topic = "/waku/2/rs/1/3"; + expect(determinePubsubTopic(contentTopic, topic)).to.equal(topic); + }); + + it("should return a calculated topic if SingleShardInfo is provided", () => { + const info = { clusterId: 1, shard: 2 }; + const expectedTopic = "/waku/2/rs/1/2"; + expect(determinePubsubTopic(contentTopic, info)).to.equal(expectedTopic); + }); + + it("should fall back to default pubsub topic when pubsubTopicShardInfo is not provided", () => { + expect(determinePubsubTopic(contentTopic)).to.equal(DefaultPubsubTopic); + }); + + it("should process correctly when SingleShardInfo has no clusterId but has a shard", () => { + const info = { shard: 0 }; + const expectedTopic = `/waku/2/rs/${DEFAULT_CLUSTER_ID}/6`; + expect(determinePubsubTopic(contentTopic, info as any)).to.equal( + expectedTopic + ); + }); + + it("should derive a pubsub topic using contentTopic when SingleShardInfo only contains clusterId", () => { + const info = { clusterId: 2 }; + const expectedTopic = contentTopicToPubsubTopic( + contentTopic, + info.clusterId + ); + expect(determinePubsubTopic(contentTopic, info as any)).to.equal( + expectedTopic + ); + }); +}); + +describe("ensureShardingConfigured", () => { + it("should return valid sharding parameters for static sharding", () => { + const shardInfo = { clusterId: 1, shards: [0, 1] }; + const result = ensureShardingConfigured(shardInfo); + expect(result.shardingParams).to.deep.include({ + clusterId: 1, + shards: [0, 1] + }); + expect(result.shardInfo).to.deep.include({ clusterId: 1, shards: [0, 1] }); + expect(result.pubsubTopics).to.have.members([ + "/waku/2/rs/1/0", + "/waku/2/rs/1/1" + ]); + }); + + it("should return valid sharding parameters for content topics autosharding", () => { + const shardInfo = { contentTopics: ["/app/v1/topic1/proto"] }; + const result = ensureShardingConfigured(shardInfo); + expect(result.shardingParams).to.deep.include({ + contentTopics: ["/app/v1/topic1/proto"] + }); + const expectedPubsubTopic = contentTopicToPubsubTopic( + "/app/v1/topic1/proto", + DEFAULT_CLUSTER_ID + ); + expect(result.shardInfo.shards).to.include( + contentTopicToShardIndex("/app/v1/topic1/proto") + ); + expect(result.pubsubTopics).to.include(expectedPubsubTopic); + }); + + it("should configure sharding based on application and version for autosharding", () => { + const shardInfo = { application: "app", version: "v1" }; + const result = ensureShardingConfigured(shardInfo); + expect(result.shardingParams).to.deep.include({ + application: "app", + version: "v1" + }); + const expectedPubsubTopic = contentTopicToPubsubTopic( + `/app/v1/default/default` + ); + expect(result.pubsubTopics).to.include(expectedPubsubTopic); + expect(result.shardInfo.shards).to.include( + pubsubTopicToSingleShardInfo(expectedPubsubTopic).shard + ); + }); + + [0, 1, 4].forEach((clusterId) => { + it(`should configure sharding based on clusterId, application and version for autosharding with cluster iD ${clusterId}`, () => { + const shardInfo = { + clusterId: clusterId, + application: "app", + version: "v1" + }; + const result = ensureShardingConfigured(shardInfo); + expect(result.shardingParams).to.deep.include({ + application: "app", + version: "v1" + }); + const expectedPubsubTopic = contentTopicToPubsubTopic( + `/app/v1/default/default`, + shardInfo.clusterId + ); + expect(result.pubsubTopics).to.include(expectedPubsubTopic); + expect(result.shardInfo.shards).to.include( + pubsubTopicToSingleShardInfo(expectedPubsubTopic).shard + ); + }); + }); + + it("should throw an error for missing sharding configuration", () => { + const shardInfo = {}; + expect(() => ensureShardingConfigured(shardInfo)).to.throw(); + }); + + it("handles empty shards array correctly", () => { + const shardInfo = { clusterId: 1, shards: [] }; + expect(() => ensureShardingConfigured(shardInfo)).to.throw(); + }); + + it("handles empty contentTopics array correctly", () => { + const shardInfo = { contentTopics: [] }; + expect(() => ensureShardingConfigured(shardInfo)).to.throw(); + }); +}); + +describe("contentTopicToPubsubTopic", () => { + it("should correctly map a content topic to a pubsub topic", () => { + const contentTopic = "/app/v1/topic1/proto"; + expect(contentTopicToPubsubTopic(contentTopic)).to.equal("/waku/2/rs/1/4"); + }); + + it("should map different content topics to different pubsub topics based on shard index", () => { + const contentTopic1 = "/app/v1/topic1/proto"; + const contentTopic2 = "/app/v2/topic2/proto"; + const pubsubTopic1 = contentTopicToPubsubTopic(contentTopic1); + const pubsubTopic2 = contentTopicToPubsubTopic(contentTopic2); + expect(pubsubTopic1).not.to.equal(pubsubTopic2); + }); + + it("should use the provided clusterId for the pubsub topic", () => { + const contentTopic = "/app/v1/topic1/proto"; + const clusterId = 2; + expect(contentTopicToPubsubTopic(contentTopic, clusterId)).to.equal( + "/waku/2/rs/2/4" + ); + }); + + it("should correctly map a content topic to a pubsub topic for different network shard sizes", () => { + const contentTopic = "/app/v1/topic1/proto"; + const networkShards = 16; + expect(contentTopicToPubsubTopic(contentTopic, 1, networkShards)).to.equal( + "/waku/2/rs/1/4" + ); }); }); diff --git a/packages/utils/src/common/sharding.ts b/packages/utils/src/common/sharding.ts index 3ad5a77780..6d9a98f540 100644 --- a/packages/utils/src/common/sharding.ts +++ b/packages/utils/src/common/sharding.ts @@ -66,7 +66,8 @@ export const shardInfoToPubsubTopics = ( // Autosharding: single shard from application and version return [ contentTopicToPubsubTopic( - `/${shardInfo.application}/${shardInfo.version}/default/default` + `/${shardInfo.application}/${shardInfo.version}/default/default`, + shardInfo.clusterId ) ]; } else { @@ -293,7 +294,8 @@ export const ensureShardingConfigured = ( if (isApplicationVersionConfigured) { const pubsubTopic = contentTopicToPubsubTopic( - `/${application}/${version}/default/default` + `/${application}/${version}/default/default`, + clusterId ); return { shardingParams: { clusterId, application, version },