From a1efa81a6389488d2f449af15b1f82f5f5c37999 Mon Sep 17 00:00:00 2001 From: Florin Barbu Date: Mon, 4 Mar 2024 18:08:14 +0200 Subject: [PATCH 01/16] sharding tests refactor --- packages/tests/src/lib/service_node.ts | 2 +- packages/tests/src/types.ts | 1 + .../single_node/multiple_pubsub.node.spec.ts | 18 +- .../tests/tests/filter/single_node/utils.ts | 13 +- packages/tests/tests/getPeers.spec.ts | 20 +- .../single_node/multiple_pubsub.node.spec.ts | 2 +- packages/tests/tests/light-push/utils.ts | 13 +- packages/tests/tests/metadata.spec.ts | 1 - .../tests/relay/multiple_pubsub.node.spec.ts | 11 +- .../tests/sharding/auto_sharding.spec.ts | 380 ++++++++++++++++++ .../tests/sharding/peer_management.spec.ts | 17 +- .../tests/sharding/running_nodes.spec.ts | 167 -------- .../tests/sharding/static_sharding.spec.ts | 316 +++++++++++++++ .../tests/tests/store/multiple_pubsub.spec.ts | 7 +- 14 files changed, 765 insertions(+), 203 deletions(-) create mode 100644 packages/tests/tests/sharding/auto_sharding.spec.ts delete mode 100644 packages/tests/tests/sharding/running_nodes.spec.ts create mode 100644 packages/tests/tests/sharding/static_sharding.spec.ts diff --git a/packages/tests/src/lib/service_node.ts b/packages/tests/src/lib/service_node.ts index 1ff3af85d2..88a212516f 100644 --- a/packages/tests/src/lib/service_node.ts +++ b/packages/tests/src/lib/service_node.ts @@ -294,7 +294,7 @@ export class ServiceNode { } return this.restCall( - `/relay/v1/auto/message`, + `/relay/v1/auto/messages`, "POST", message, async (response) => response.status === 200 diff --git a/packages/tests/src/types.ts b/packages/tests/src/types.ts index 60e6b1401e..bfe9c38237 100644 --- a/packages/tests/src/types.ts +++ b/packages/tests/src/types.ts @@ -17,6 +17,7 @@ export interface Args { discv5Discovery?: boolean; storeMessageDbUrl?: string; pubsubTopic?: Array; + contentTopic?: Array; rpcPrivate?: boolean; websocketSupport?: boolean; tcpPort?: number; diff --git a/packages/tests/tests/filter/single_node/multiple_pubsub.node.spec.ts b/packages/tests/tests/filter/single_node/multiple_pubsub.node.spec.ts index 7fd42b1a79..c083674b96 100644 --- a/packages/tests/tests/filter/single_node/multiple_pubsub.node.spec.ts +++ b/packages/tests/tests/filter/single_node/multiple_pubsub.node.spec.ts @@ -186,6 +186,7 @@ describe("Waku Filter V2: Multiple PubsubTopics", function () { describe("Waku Filter V2 (Autosharding): Multiple PubsubTopics", function () { // Set the timeout for all tests in this suite. Can be overwritten at test level this.timeout(30000); + const clusterId = 1; let waku: LightNode; let nwaku: ServiceNode; let nwaku2: ServiceNode; @@ -196,36 +197,36 @@ describe("Waku Filter V2 (Autosharding): Multiple PubsubTopics", function () { const customContentTopic2 = "/myapp/1/latest/proto"; const autoshardingPubsubTopic1 = contentTopicToPubsubTopic( customContentTopic1, - 3 + clusterId ); const autoshardingPubsubTopic2 = contentTopicToPubsubTopic( customContentTopic2, - 3 + clusterId ); const contentTopicInfo: ContentTopicInfo = { - clusterId: 3, + clusterId: clusterId, contentTopics: [customContentTopic1, customContentTopic2] }; const customEncoder1 = createEncoder({ contentTopic: customContentTopic1, pubsubTopicShardInfo: { - clusterId: 3, + clusterId: clusterId, shard: contentTopicToShardIndex(customContentTopic1) } }); const customDecoder1 = createDecoder(customContentTopic1, { - clusterId: 3, + clusterId: clusterId, shard: contentTopicToShardIndex(customContentTopic1) }); const customEncoder2 = createEncoder({ contentTopic: customContentTopic2, pubsubTopicShardInfo: { - clusterId: 3, + clusterId: clusterId, shard: contentTopicToShardIndex(customContentTopic2) } }); const customDecoder2 = createDecoder(customContentTopic2, { - clusterId: 3, + clusterId: clusterId, shard: contentTopicToShardIndex(customContentTopic2) }); @@ -309,7 +310,8 @@ describe("Waku Filter V2 (Autosharding): Multiple PubsubTopics", function () { lightpush: true, relay: true, pubsubTopic: [autoshardingPubsubTopic2], - clusterId: 3 + clusterId: clusterId, + contentTopic: [customContentTopic2] }); await waku.dial(await nwaku2.getMultiaddrWithId()); await waitForRemotePeer(waku, [Protocols.Filter, Protocols.LightPush]); diff --git a/packages/tests/tests/filter/single_node/utils.ts b/packages/tests/tests/filter/single_node/utils.ts index f52959c3fc..59195f0302 100644 --- a/packages/tests/tests/filter/single_node/utils.ts +++ b/packages/tests/tests/filter/single_node/utils.ts @@ -1,5 +1,6 @@ import { waitForRemotePeer } from "@waku/core"; import { + ContentTopicInfo, DefaultPubsubTopic, LightNode, ProtocolCreateOptions, @@ -26,17 +27,25 @@ export async function runNodes( ): Promise<[ServiceNode, LightNode]> { const nwaku = new ServiceNode(makeLogFileName(context)); + function isContentTopicInfo(info: ShardingParams): info is ContentTopicInfo { + return (info as ContentTopicInfo).contentTopics !== undefined; + } + await nwaku.start( { filter: true, lightpush: true, relay: true, pubsubTopic: pubsubTopics, - ...(shardInfo && { clusterId: shardInfo.clusterId }) + // Conditionally include clusterId if shardInfo exists + ...(shardInfo && { clusterId: shardInfo.clusterId }), + // Conditionally include contentTopic if shardInfo exists and clusterId is 1 + ...(shardInfo && + isContentTopicInfo(shardInfo) && + shardInfo.clusterId === 1 && { contentTopic: shardInfo.contentTopics }) }, { retries: 3 } ); - const waku_options: ProtocolCreateOptions = { staticNoiseKey: NOISE_KEY_1, libp2p: { addresses: { listen: ["/ip4/0.0.0.0/tcp/0/ws"] } }, diff --git a/packages/tests/tests/getPeers.spec.ts b/packages/tests/tests/getPeers.spec.ts index fa8705d86d..7498dc9396 100644 --- a/packages/tests/tests/getPeers.spec.ts +++ b/packages/tests/tests/getPeers.spec.ts @@ -72,7 +72,6 @@ describe("getConnectedPeersForProtocolAndShard", function () { expect(peers.length).to.be.greaterThan(0); }); - // Had to use cluster 0 because of https://github.com/waku-org/js-waku/issues/1848 it("same cluster, different shard: nodes connect", async function () { this.timeout(15000); @@ -219,7 +218,7 @@ describe("getConnectedPeersForProtocolAndShard", function () { this.timeout(15000); const shardInfo: ContentTopicInfo = { - clusterId: 2, + clusterId: 1, contentTopics: [contentTopic] }; @@ -228,6 +227,7 @@ describe("getConnectedPeersForProtocolAndShard", function () { peerExchange: true, clusterId: shardInfo.clusterId, pubsubTopic: shardInfoToPubsubTopics(shardInfo), + contentTopic: [contentTopic], lightpush: true, relay: true }); @@ -251,12 +251,12 @@ describe("getConnectedPeersForProtocolAndShard", function () { this.timeout(15000); const shardInfo1: ContentTopicInfo = { - clusterId: 2, + clusterId: 1, contentTopics: [contentTopic] }; const shardInfo2: ContentTopicInfo = { - clusterId: 2, + clusterId: 1, contentTopics: ["/test/5/waku-light-push/utf8"] }; @@ -266,6 +266,7 @@ describe("getConnectedPeersForProtocolAndShard", function () { peerExchange: true, clusterId: shardInfo1.clusterId, pubsubTopic: shardInfoToPubsubTopics(shardInfo1), + contentTopic: [contentTopic], lightpush: true, relay: true }); @@ -276,6 +277,7 @@ describe("getConnectedPeersForProtocolAndShard", function () { peerExchange: true, clusterId: shardInfo2.clusterId, pubsubTopic: shardInfoToPubsubTopics(shardInfo2), + contentTopic: [contentTopic], lightpush: true, relay: true }); @@ -303,12 +305,12 @@ describe("getConnectedPeersForProtocolAndShard", function () { this.timeout(15000); const shardInfo1: ContentTopicInfo = { - clusterId: 2, + clusterId: 1, contentTopics: [contentTopic] }; const shardInfo2: ContentTopicInfo = { - clusterId: 3, + clusterId: 2, contentTopics: [contentTopic] }; @@ -318,6 +320,7 @@ describe("getConnectedPeersForProtocolAndShard", function () { peerExchange: true, clusterId: shardInfo1.clusterId, pubsubTopic: shardInfoToPubsubTopics(shardInfo1), + contentTopic: [contentTopic], lightpush: true, relay: true }); @@ -355,12 +358,12 @@ describe("getConnectedPeersForProtocolAndShard", function () { this.timeout(15000); const shardInfo1: ContentTopicInfo = { - clusterId: 2, + clusterId: 1, contentTopics: [contentTopic] }; const shardInfo2: ContentTopicInfo = { - clusterId: 3, + clusterId: 2, contentTopics: ["/test/5/waku-light-push/utf8"] }; @@ -370,6 +373,7 @@ describe("getConnectedPeersForProtocolAndShard", function () { peerExchange: true, clusterId: shardInfo1.clusterId, pubsubTopic: shardInfoToPubsubTopics(shardInfo1), + contentTopic: [contentTopic], lightpush: true, relay: true }); diff --git a/packages/tests/tests/light-push/single_node/multiple_pubsub.node.spec.ts b/packages/tests/tests/light-push/single_node/multiple_pubsub.node.spec.ts index d16893f3e2..613498c962 100644 --- a/packages/tests/tests/light-push/single_node/multiple_pubsub.node.spec.ts +++ b/packages/tests/tests/light-push/single_node/multiple_pubsub.node.spec.ts @@ -185,7 +185,7 @@ describe("Waku Light Push (Autosharding): Multiple PubsubTopics", function () { let nwaku2: ServiceNode; let messageCollector: MessageCollector; - const clusterId = 2; + const clusterId = 1; const customContentTopic1 = "/waku/2/content/test.js"; const customContentTopic2 = "/myapp/1/latest/proto"; const autoshardingPubsubTopic1 = contentTopicToPubsubTopic( diff --git a/packages/tests/tests/light-push/utils.ts b/packages/tests/tests/light-push/utils.ts index 3e131c470c..04a6ccc055 100644 --- a/packages/tests/tests/light-push/utils.ts +++ b/packages/tests/tests/light-push/utils.ts @@ -1,5 +1,6 @@ import { createEncoder, waitForRemotePeer } from "@waku/core"; import { + ContentTopicInfo, DefaultPubsubTopic, LightNode, Protocols, @@ -23,13 +24,23 @@ export async function runNodes( shardInfo?: ShardingParams ): Promise<[ServiceNode, LightNode]> { const nwaku = new ServiceNode(makeLogFileName(context)); + + function isContentTopicInfo(info: ShardingParams): info is ContentTopicInfo { + return (info as ContentTopicInfo).contentTopics !== undefined; + } + await nwaku.start( { lightpush: true, filter: true, relay: true, pubsubTopic: pubsubTopics, - ...(shardInfo && { clusterId: shardInfo.clusterId }) + // Conditionally include clusterId if shardInfo exists + ...(shardInfo && { clusterId: shardInfo.clusterId }), + // Conditionally include contentTopic if shardInfo exists and clusterId is 1 + ...(shardInfo && + isContentTopicInfo(shardInfo) && + shardInfo.clusterId === 1 && { contentTopic: shardInfo.contentTopics }) }, { retries: 3 } ); diff --git a/packages/tests/tests/metadata.spec.ts b/packages/tests/tests/metadata.spec.ts index 172accb1bb..2d5779fd66 100644 --- a/packages/tests/tests/metadata.spec.ts +++ b/packages/tests/tests/metadata.spec.ts @@ -62,7 +62,6 @@ describe("Metadata Protocol", function () { expect(activeConnections.length).to.equal(1); }); - // Had to use cluster 0 because of https://github.com/waku-org/js-waku/issues/1848 it("same cluster, different shard: nodes connect", async function () { const shardInfo1: ShardInfo = { clusterId: 0, diff --git a/packages/tests/tests/relay/multiple_pubsub.node.spec.ts b/packages/tests/tests/relay/multiple_pubsub.node.spec.ts index c2e086e22f..20990818c0 100644 --- a/packages/tests/tests/relay/multiple_pubsub.node.spec.ts +++ b/packages/tests/tests/relay/multiple_pubsub.node.spec.ts @@ -317,6 +317,7 @@ describe("Waku Relay, multiple pubsub topics", function () { describe("Waku Relay (Autosharding), multiple pubsub topics", function () { this.timeout(15000); + const clusterID = 1; let waku1: RelayNode; let waku2: RelayNode; let waku3: RelayNode; @@ -325,18 +326,18 @@ describe("Waku Relay (Autosharding), multiple pubsub topics", function () { const customContentTopic2 = "/myapp/1/latest/proto"; const autoshardingPubsubTopic1 = contentTopicToPubsubTopic( customContentTopic1, - 3 + clusterID ); const autoshardingPubsubTopic2 = contentTopicToPubsubTopic( customContentTopic2, - 3 + clusterID ); const contentTopicInfo1: ContentTopicInfo = { - clusterId: 3, + clusterId: clusterID, contentTopics: [customContentTopic1] }; const contentTopicInfo2: ContentTopicInfo = { - clusterId: 3, + clusterId: clusterID, contentTopics: [customContentTopic2] }; const customEncoder1 = createEncoder({ @@ -356,7 +357,7 @@ describe("Waku Relay (Autosharding), multiple pubsub topics", function () { pubsubTopicToSingleShardInfo(autoshardingPubsubTopic2) ); const contentTopicInfoBothShards: ContentTopicInfo = { - clusterId: 3, + clusterId: clusterID, contentTopics: [customContentTopic1, customContentTopic2] }; diff --git a/packages/tests/tests/sharding/auto_sharding.spec.ts b/packages/tests/tests/sharding/auto_sharding.spec.ts new file mode 100644 index 0000000000..5bc6f17a1e --- /dev/null +++ b/packages/tests/tests/sharding/auto_sharding.spec.ts @@ -0,0 +1,380 @@ +import { LightNode, Protocols } from "@waku/interfaces"; +import { + createEncoder, + createLightNode, + utf8ToBytes, + waitForRemotePeer +} from "@waku/sdk"; +import { + contentTopicToPubsubTopic, + contentTopicToShardIndex +} from "@waku/utils"; +import { expect } from "chai"; + +import { + afterEachCustom, + beforeEachCustom, + makeLogFileName, + MessageCollector, + ServiceNode, + tearDownNodes +} from "../../src/index.js"; + +const ContentTopic = "/waku/2/content/test.js"; +const ContentTopic2 = "/myapp/1/latest/proto"; + +describe("Autosharding: Running Nodes", function () { + this.timeout(15_000); + let clusterId = 1; + let waku: LightNode; + let nwaku: ServiceNode; + let messageCollector: MessageCollector; + + beforeEachCustom(this, async () => { + nwaku = new ServiceNode(makeLogFileName(this.ctx)); + messageCollector = new MessageCollector(nwaku); + }); + + afterEachCustom(this, async () => { + await tearDownNodes(nwaku, waku); + }); + + describe("Different clusters and topics", function () { + // js-waku allows autosharding for cluster IDs different than 1 + // we have 2 dedicated tests for this case + it("Cluster ID 0 - Default/Global Cluster", async function () { + clusterId = 0; + const pubsubTopics = [contentTopicToPubsubTopic(ContentTopic, clusterId)]; + await nwaku.start({ + store: true, + lightpush: true, + relay: true, + clusterId: clusterId, + pubsubTopic: pubsubTopics + }); + + waku = await createLightNode({ + shardInfo: { + clusterId: clusterId, + contentTopics: [ContentTopic] + } + }); + await waku.dial(await nwaku.getMultiaddrWithId()); + await waitForRemotePeer(waku, [Protocols.LightPush]); + + const encoder = createEncoder({ + contentTopic: ContentTopic, + pubsubTopicShardInfo: { + clusterId: clusterId, + shard: contentTopicToShardIndex(ContentTopic) + } + }); + + const request = await waku.lightPush.send(encoder, { + payload: utf8ToBytes("Hello World") + }); + + expect(request.recipients.length).to.eq(1); + expect( + await messageCollector.waitForMessagesAutosharding(1, { + contentTopic: ContentTopic + }) + ).to.eq(true); + }); + + it("Non Autosharding Cluster", async function () { + clusterId = 5; + const pubsubTopics = [contentTopicToPubsubTopic(ContentTopic, clusterId)]; + await nwaku.start({ + store: true, + lightpush: true, + relay: true, + clusterId: clusterId, + pubsubTopic: pubsubTopics + }); + + waku = await createLightNode({ + shardInfo: { + clusterId: clusterId, + contentTopics: [ContentTopic] + } + }); + await waku.dial(await nwaku.getMultiaddrWithId()); + await waitForRemotePeer(waku, [Protocols.LightPush]); + + const encoder = createEncoder({ + contentTopic: ContentTopic, + pubsubTopicShardInfo: { + clusterId: clusterId, + shard: contentTopicToShardIndex(ContentTopic) + } + }); + + const request = await waku.lightPush.send(encoder, { + payload: utf8ToBytes("Hello World") + }); + + expect(request.recipients.length).to.eq(1); + expect( + await messageCollector.waitForMessagesAutosharding(1, { + contentTopic: ContentTopic + }) + ).to.eq(true); + }); + + const numTest = 10; + for (let i = 0; i < numTest; i++) { + // Random ContentTopic + const applicationName = `app${Math.floor(Math.random() * 100)}`; // Random application name app0 to app99 + const version = Math.floor(Math.random() * 10) + 1; // Random version between 1 and 10 + const topicName = `topic${Math.floor(Math.random() * 1000)}`; // Random topic name topic0 to topic999 + const encodingList = ["proto", "json", "xml", "test.js", "utf8"]; // Potential encodings + const encoding = + encodingList[Math.floor(Math.random() * encodingList.length)]; // Random encoding + const ContentTopic = `/${applicationName}/${version}/${topicName}/${encoding}`; + + it(`random auto sharding ${ + i + 1 + } - Cluster ID: ${clusterId}, Content Topic: ${ContentTopic}`, async function () { + const pubsubTopics = [ + contentTopicToPubsubTopic(ContentTopic, clusterId) + ]; + + await nwaku.start({ + store: true, + lightpush: true, + relay: true, + clusterId: clusterId, + pubsubTopic: pubsubTopics, + contentTopic: [ContentTopic] + }); + + waku = await createLightNode({ + shardInfo: { + clusterId: clusterId, + contentTopics: [ContentTopic] + } + }); + + await waku.dial(await nwaku.getMultiaddrWithId()); + await waitForRemotePeer(waku, [Protocols.LightPush]); + + const encoder = createEncoder({ + contentTopic: ContentTopic, + pubsubTopicShardInfo: { + clusterId: clusterId, + shard: contentTopicToShardIndex(ContentTopic) + } + }); + + const request = await waku.lightPush.send(encoder, { + payload: utf8ToBytes("Hello World") + }); + + expect(request.recipients.length).to.eq(1); + expect( + await messageCollector.waitForMessagesAutosharding(1, { + contentTopic: ContentTopic + }) + ).to.eq(true); + }); + } + + it("Wrong topic", async function () { + const wrongTopic = "wrong_format"; + try { + contentTopicToPubsubTopic(wrongTopic, clusterId); + throw new Error("Wrong topic should've thrown an error"); + } catch (err) { + if ( + !(err instanceof Error) || + !err.message.includes("Content topic format is invalid") + ) { + throw err; + } + } + }); + + describe("Others", function () { + it("configure the node with multiple content topics", async function () { + const pubsubTopics = [ + contentTopicToPubsubTopic(ContentTopic, clusterId), + contentTopicToPubsubTopic(ContentTopic2, clusterId) + ]; + + await nwaku.start({ + store: true, + lightpush: true, + relay: true, + clusterId: clusterId, + pubsubTopic: pubsubTopics, + contentTopic: [ContentTopic, ContentTopic2] + }); + + waku = await createLightNode({ + shardInfo: { + clusterId: clusterId, + // For autosharding, we configure multiple pubsub topics by using two content topics that hash to different shards + contentTopics: [ContentTopic, ContentTopic2] + } + }); + await waku.dial(await nwaku.getMultiaddrWithId()); + await waitForRemotePeer(waku, [Protocols.LightPush]); + + const encoder1 = createEncoder({ + contentTopic: ContentTopic, + pubsubTopicShardInfo: { + clusterId: clusterId, + shard: contentTopicToShardIndex(ContentTopic) + } + }); + + const encoder2 = createEncoder({ + contentTopic: ContentTopic2, + pubsubTopicShardInfo: { + clusterId: clusterId, + shard: contentTopicToShardIndex(ContentTopic2) + } + }); + + const request1 = await waku.lightPush.send(encoder1, { + payload: utf8ToBytes("Hello World") + }); + expect(request1.recipients.length).to.eq(1); + expect( + await messageCollector.waitForMessagesAutosharding(1, { + contentTopic: ContentTopic + }) + ).to.eq(true); + + const request2 = await waku.lightPush.send(encoder2, { + payload: utf8ToBytes("Hello World") + }); + expect(request2.recipients.length).to.eq(1); + expect( + await messageCollector.waitForMessagesAutosharding(1, { + contentTopic: ContentTopic + }) + ).to.eq(true); + }); + + it("using a protocol with unconfigured pubsub topic should fail", async function () { + const pubsubTopics = [ + contentTopicToPubsubTopic(ContentTopic, clusterId) + ]; + await nwaku.start({ + store: true, + lightpush: true, + relay: true, + clusterId: clusterId, + pubsubTopic: pubsubTopics, + contentTopic: [ContentTopic] + }); + + waku = await createLightNode({ + shardInfo: { + clusterId: clusterId, + contentTopics: [ContentTopic] + } + }); + + // use a content topic that is not configured + const encoder = createEncoder({ + contentTopic: ContentTopic2, + pubsubTopicShardInfo: { + clusterId: clusterId, + shard: contentTopicToShardIndex(ContentTopic2) + } + }); + + try { + await waku.lightPush.send(encoder, { + payload: utf8ToBytes("Hello World") + }); + throw new Error("The request should've thrown an error"); + } catch (err) { + if ( + !(err instanceof Error) || + !err.message.includes( + `Pubsub topic ${contentTopicToPubsubTopic( + ContentTopic2, + clusterId + )} has not been configured on this instance. Configured topics are: ${ + pubsubTopics[0] + }` + ) + ) { + throw err; + } + } + }); + + it("start node with ApplicationInfo", async function () { + const pubsubTopics = [ + contentTopicToPubsubTopic(ContentTopic, clusterId) + ]; + + await nwaku.start({ + store: true, + lightpush: true, + relay: true, + clusterId: clusterId, + pubsubTopic: pubsubTopics, + contentTopic: [ContentTopic] + }); + + waku = await createLightNode({ + shardInfo: { + clusterId: clusterId, + application: ContentTopic.split("/")[1], + version: ContentTopic.split("/")[2] + } + }); + await waku.dial(await nwaku.getMultiaddrWithId()); + await waitForRemotePeer(waku, [Protocols.LightPush]); + + const encoder = createEncoder({ + contentTopic: ContentTopic, + pubsubTopicShardInfo: { + clusterId: clusterId, + shard: contentTopicToShardIndex(ContentTopic) + } + }); + + const request = await waku.lightPush.send(encoder, { + payload: utf8ToBytes("Hello World") + }); + + expect(request.recipients.length).to.eq(1); + expect( + await messageCollector.waitForMessagesAutosharding(1, { + contentTopic: ContentTopic + }) + ).to.eq(true); + }); + + it("start node with empty content topic", async function () { + try { + waku = await createLightNode({ + shardInfo: { + clusterId: clusterId, + contentTopics: [] + } + }); + throw new Error( + "Starting the node with no content topic should've thrown an error" + ); + } catch (err) { + if ( + !(err instanceof Error) || + !err.message.includes( + "Missing minimum required configuration options for static sharding or autosharding" + ) + ) { + throw err; + } + } + }); + }); + }); +}); diff --git a/packages/tests/tests/sharding/peer_management.spec.ts b/packages/tests/tests/sharding/peer_management.spec.ts index 4cf75792bd..e52030cbec 100644 --- a/packages/tests/tests/sharding/peer_management.spec.ts +++ b/packages/tests/tests/sharding/peer_management.spec.ts @@ -206,7 +206,7 @@ describe("Static Sharding: Peer Management", function () { describe("Autosharding: Peer Management", function () { const ContentTopic = "/waku/2/content/test.js"; - const clusterId = 2; + const clusterId = 1; describe("Peer Exchange", function () { let waku: LightNode; @@ -241,7 +241,8 @@ describe("Autosharding: Peer Management", function () { discv5Discovery: true, peerExchange: true, relay: true, - clusterId: clusterId + clusterId: clusterId, + contentTopic: [ContentTopic] }); const enr1 = (await nwaku1.info()).enrUri; @@ -252,7 +253,8 @@ describe("Autosharding: Peer Management", function () { peerExchange: true, discv5BootstrapNode: enr1, relay: true, - clusterId: clusterId + clusterId: clusterId, + contentTopic: [ContentTopic] }); const enr2 = (await nwaku2.info()).enrUri; @@ -263,7 +265,8 @@ describe("Autosharding: Peer Management", function () { peerExchange: true, discv5BootstrapNode: enr2, relay: true, - clusterId: clusterId + clusterId: clusterId, + contentTopic: [ContentTopic] }); const nwaku3Ma = await nwaku3.getMultiaddrWithId(); @@ -332,7 +335,8 @@ describe("Autosharding: Peer Management", function () { discv5Discovery: true, peerExchange: true, discv5BootstrapNode: enr1, - clusterId: clusterId + clusterId: clusterId, + contentTopic: [ContentTopic] }); const enr2 = (await nwaku2.info()).enrUri; @@ -343,7 +347,8 @@ describe("Autosharding: Peer Management", function () { discv5Discovery: true, peerExchange: true, discv5BootstrapNode: enr2, - clusterId: clusterId + clusterId: clusterId, + contentTopic: [ContentTopic] }); const nwaku3Ma = await nwaku3.getMultiaddrWithId(); diff --git a/packages/tests/tests/sharding/running_nodes.spec.ts b/packages/tests/tests/sharding/running_nodes.spec.ts deleted file mode 100644 index f122f33d52..0000000000 --- a/packages/tests/tests/sharding/running_nodes.spec.ts +++ /dev/null @@ -1,167 +0,0 @@ -import { - LightNode, - Protocols, - ShardInfo, - SingleShardInfo -} from "@waku/interfaces"; -import { - createEncoder, - createLightNode, - utf8ToBytes, - waitForRemotePeer -} from "@waku/sdk"; -import { - contentTopicToShardIndex, - singleShardInfoToPubsubTopic -} from "@waku/utils"; -import { expect } from "chai"; - -import { - afterEachCustom, - beforeEachCustom, - makeLogFileName, - ServiceNode, - tearDownNodes -} from "../../src/index.js"; - -const PubsubTopic1 = singleShardInfoToPubsubTopic({ - clusterId: 0, - shard: 2 -}); -const PubsubTopic2 = singleShardInfoToPubsubTopic({ - clusterId: 0, - shard: 3 -}); -const shardInfoFirstShard: ShardInfo = { clusterId: 0, shards: [2] }; -const shardInfoBothShards: ShardInfo = { clusterId: 0, shards: [2, 3] }; -const singleShardInfo1: SingleShardInfo = { clusterId: 0, shard: 2 }; -const singleShardInfo2: SingleShardInfo = { clusterId: 0, shard: 3 }; -const ContentTopic = "/waku/2/content/test.js"; -const ContentTopic2 = "/myapp/1/latest/proto"; - -describe("Static Sharding: Running Nodes", function () { - let waku: LightNode; - let nwaku: ServiceNode; - - beforeEachCustom(this, async () => { - nwaku = new ServiceNode(makeLogFileName(this.ctx)); - await nwaku.start({ store: true, lightpush: true, relay: true }); - }); - - afterEachCustom(this, async () => { - await tearDownNodes(nwaku, waku); - }); - - it("configure the node with multiple pubsub topics", async function () { - this.timeout(15_000); - waku = await createLightNode({ - shardInfo: shardInfoBothShards - }); - await waku.dial(await nwaku.getMultiaddrWithId()); - await waitForRemotePeer(waku, [Protocols.LightPush]); - - const encoder1 = createEncoder({ - contentTopic: ContentTopic, - pubsubTopicShardInfo: singleShardInfo1 - }); - - const encoder2 = createEncoder({ - contentTopic: ContentTopic, - pubsubTopicShardInfo: singleShardInfo2 - }); - - const request1 = await waku.lightPush.send(encoder1, { - payload: utf8ToBytes("Hello World") - }); - - const request2 = await waku.lightPush.send(encoder2, { - payload: utf8ToBytes("Hello World") - }); - - expect(request1.recipients.length).to.eq(1); - expect(request2.recipients.length).to.eq(1); - }); - - it("using a protocol with unconfigured pubsub topic should fail", async function () { - this.timeout(15_000); - waku = await createLightNode({ - shardInfo: shardInfoFirstShard - }); - - // use a pubsub topic that is not configured - const encoder = createEncoder({ - contentTopic: ContentTopic, - pubsubTopicShardInfo: singleShardInfo2 - }); - - try { - await waku.lightPush.send(encoder, { - payload: utf8ToBytes("Hello World") - }); - throw new Error("The request should've thrown an error"); - } catch (err) { - if ( - !(err instanceof Error) || - !err.message.includes( - `Pubsub topic ${PubsubTopic2} has not been configured on this instance. Configured topics are: ${PubsubTopic1}` - ) - ) { - throw err; - } - } - }); -}); - -describe("Autosharding: Running Nodes", function () { - let waku: LightNode; - let nwaku: ServiceNode; - - beforeEachCustom(this, async () => { - nwaku = new ServiceNode(makeLogFileName(this.ctx)); - await nwaku.start({ store: true, lightpush: true, relay: true }); - }); - - afterEachCustom(this, async () => { - await tearDownNodes(nwaku, waku); - }); - - it("configure the node with multiple pubsub topics", async function () { - this.timeout(15_000); - waku = await createLightNode({ - shardInfo: { - clusterId: 0, - // For autosharding, we configure multiple pubsub topics by using two content topics that hash to different shards - contentTopics: [ContentTopic, ContentTopic2] - } - }); - await waku.dial(await nwaku.getMultiaddrWithId()); - await waitForRemotePeer(waku, [Protocols.LightPush]); - - const encoder1 = createEncoder({ - contentTopic: ContentTopic, - pubsubTopicShardInfo: { - clusterId: 0, - shard: contentTopicToShardIndex(ContentTopic) - } - }); - - const encoder2 = createEncoder({ - contentTopic: ContentTopic, - pubsubTopicShardInfo: { - clusterId: 0, - shard: contentTopicToShardIndex(ContentTopic2) - } - }); - - const request1 = await waku.lightPush.send(encoder1, { - payload: utf8ToBytes("Hello World") - }); - - const request2 = await waku.lightPush.send(encoder2, { - payload: utf8ToBytes("Hello World") - }); - - expect(request1.recipients.length).to.eq(1); - expect(request2.recipients.length).to.eq(1); - }); -}); diff --git a/packages/tests/tests/sharding/static_sharding.spec.ts b/packages/tests/tests/sharding/static_sharding.spec.ts new file mode 100644 index 0000000000..9e86070030 --- /dev/null +++ b/packages/tests/tests/sharding/static_sharding.spec.ts @@ -0,0 +1,316 @@ +import { + LightNode, + Protocols, + ShardInfo, + SingleShardInfo +} from "@waku/interfaces"; +import { + createEncoder, + createLightNode, + utf8ToBytes, + waitForRemotePeer +} from "@waku/sdk"; +import { + shardInfoToPubsubTopics, + singleShardInfosToShardInfo, + singleShardInfoToPubsubTopic +} from "@waku/utils"; +import { expect } from "chai"; + +import { + afterEachCustom, + beforeEachCustom, + makeLogFileName, + MessageCollector, + ServiceNode, + tearDownNodes +} from "../../src/index.js"; + +const ContentTopic = "/waku/2/content/test.js"; + +describe("Static Sharding: Running Nodes", function () { + this.timeout(15_000); + let waku: LightNode; + let nwaku: ServiceNode; + let messageCollector: MessageCollector; + + beforeEachCustom(this, async () => { + nwaku = new ServiceNode(makeLogFileName(this.ctx)); + messageCollector = new MessageCollector(nwaku); + }); + + afterEachCustom(this, async () => { + await tearDownNodes(nwaku, waku); + }); + + describe("Different clusters and shards", function () { + // Will be skipped until https://github.com/waku-org/js-waku/issues/1874 is fixed + it.skip("shard 0", async function () { + const singleShardInfo = { clusterId: 0, shard: 0 }; + const encoder = createEncoder({ + contentTopic: ContentTopic, + pubsubTopicShardInfo: singleShardInfo + }); + expect(encoder.pubsubTopic).to.eq( + singleShardInfoToPubsubTopic(singleShardInfo) + ); + }); + + // dedicated test for Default Cluster ID 0 + it("Cluster ID 0 - Default/Global Cluster", async function () { + const singleShardInfo = { clusterId: 0, shard: 1 }; + const shardInfo = singleShardInfosToShardInfo([singleShardInfo]); + + await nwaku.start({ + store: true, + lightpush: true, + relay: true, + pubsubTopic: shardInfoToPubsubTopics(shardInfo) + }); + + waku = await createLightNode({ + shardInfo: shardInfo + }); + await waku.dial(await nwaku.getMultiaddrWithId()); + await waitForRemotePeer(waku, [Protocols.LightPush]); + + const encoder = createEncoder({ + contentTopic: ContentTopic, + pubsubTopicShardInfo: singleShardInfo + }); + + const request = await waku.lightPush.send(encoder, { + payload: utf8ToBytes("Hello World") + }); + + expect(request.recipients.length).to.eq(1); + expect( + await messageCollector.waitForMessages(1, { + pubsubTopic: shardInfoToPubsubTopics(shardInfo)[0] + }) + ).to.eq(true); + }); + + // dedicated test for Cluster ID 1 - WAKU Network + it("Cluster ID 1 - WAKU Network", async function () { + const singleShardInfo = { clusterId: 1, shard: 1 }; + const shardInfo = singleShardInfosToShardInfo([singleShardInfo]); + + await nwaku.start({ + store: true, + lightpush: true, + relay: true, + clusterId: 1, + contentTopic: [ContentTopic], + pubsubTopic: shardInfoToPubsubTopics(shardInfo) + }); + + waku = await createLightNode({ + shardInfo: shardInfo + }); + await waku.dial(await nwaku.getMultiaddrWithId()); + await waitForRemotePeer(waku, [Protocols.LightPush]); + + const encoder = createEncoder({ + contentTopic: ContentTopic, + pubsubTopicShardInfo: singleShardInfo + }); + + const request = await waku.lightPush.send(encoder, { + payload: utf8ToBytes("Hello World") + }); + + expect(request.recipients.length).to.eq(1); + expect( + await messageCollector.waitForMessages(1, { + pubsubTopic: shardInfoToPubsubTopics(shardInfo)[0] + }) + ).to.eq(true); + }); + + const numTest = 10; + for (let i = 0; i < numTest; i++) { + // Random clusterId between 2 and 1000 + const clusterId = Math.floor(Math.random() * 999) + 2; + + // Random shardId between 1 and 1000 + const shardId = Math.floor(Math.random() * 1000) + 1; + + it(`random static sharding ${ + i + 1 + } - Cluster ID: ${clusterId}, Shard ID: ${shardId}`, async function () { + afterEach(async () => { + await tearDownNodes(nwaku, waku); + }); + + const singleShardInfo = { clusterId: clusterId, shard: shardId }; + const shardInfo = singleShardInfosToShardInfo([singleShardInfo]); + + await nwaku.start({ + store: true, + lightpush: true, + relay: true, + clusterId: clusterId, + pubsubTopic: shardInfoToPubsubTopics(shardInfo) + }); + + waku = await createLightNode({ + shardInfo: shardInfo + }); + await waku.dial(await nwaku.getMultiaddrWithId()); + await waitForRemotePeer(waku, [Protocols.LightPush]); + + const encoder = createEncoder({ + contentTopic: ContentTopic, + pubsubTopicShardInfo: singleShardInfo + }); + + const request = await waku.lightPush.send(encoder, { + payload: utf8ToBytes("Hello World") + }); + + expect(request.recipients.length).to.eq(1); + expect( + await messageCollector.waitForMessages(1, { + pubsubTopic: shardInfoToPubsubTopics(shardInfo)[0] + }) + ).to.eq(true); + }); + } + }); + + describe("Others", function () { + const clusterId = 2; + let shardInfo: ShardInfo; + + const PubsubTopic1 = singleShardInfoToPubsubTopic({ + clusterId: clusterId, + shard: 2 + }); + const PubsubTopic2 = singleShardInfoToPubsubTopic({ + clusterId: clusterId, + shard: 3 + }); + const shardInfoFirstShard: ShardInfo = { + clusterId: clusterId, + shards: [2] + }; + const shardInfoBothShards: ShardInfo = { + clusterId: clusterId, + shards: [2, 3] + }; + const singleShardInfo1: SingleShardInfo = { + clusterId: clusterId, + shard: 2 + }; + const singleShardInfo2: SingleShardInfo = { + clusterId: clusterId, + shard: 3 + }; + + beforeEachCustom(this, async () => { + shardInfo = { + clusterId: clusterId, + shards: [2] + }; + + await nwaku.start({ + store: true, + lightpush: true, + relay: true, + clusterId: clusterId, + pubsubTopic: shardInfoToPubsubTopics(shardInfo) + }); + }); + + afterEachCustom(this, async () => { + await tearDownNodes(nwaku, waku); + }); + + it("configure the node with multiple pubsub topics", async function () { + waku = await createLightNode({ + shardInfo: shardInfoBothShards + }); + await waku.dial(await nwaku.getMultiaddrWithId()); + await waitForRemotePeer(waku, [Protocols.LightPush]); + + const encoder1 = createEncoder({ + contentTopic: ContentTopic, + pubsubTopicShardInfo: singleShardInfo1 + }); + + const encoder2 = createEncoder({ + contentTopic: ContentTopic, + pubsubTopicShardInfo: singleShardInfo2 + }); + + const request1 = await waku.lightPush.send(encoder1, { + payload: utf8ToBytes("Hello World2") + }); + expect(request1.recipients.length).to.eq(1); + expect( + await messageCollector.waitForMessages(1, { + pubsubTopic: shardInfoToPubsubTopics(shardInfo)[0] + }) + ).to.eq(true); + + const request2 = await waku.lightPush.send(encoder2, { + payload: utf8ToBytes("Hello World3") + }); + expect(request2.recipients.length).to.eq(1); + expect( + await messageCollector.waitForMessages(1, { + pubsubTopic: shardInfoToPubsubTopics(shardInfo)[0] + }) + ).to.eq(true); + }); + + it("using a protocol with unconfigured pubsub topic should fail", async function () { + waku = await createLightNode({ + shardInfo: shardInfoFirstShard + }); + + // use a pubsub topic that is not configured + const encoder = createEncoder({ + contentTopic: ContentTopic, + pubsubTopicShardInfo: singleShardInfo2 + }); + + try { + await waku.lightPush.send(encoder, { + payload: utf8ToBytes("Hello World") + }); + throw new Error("The request should've thrown an error"); + } catch (err) { + if ( + !(err instanceof Error) || + !err.message.includes( + `Pubsub topic ${PubsubTopic2} has not been configured on this instance. Configured topics are: ${PubsubTopic1}` + ) + ) { + throw err; + } + } + }); + + it("start node with empty shard", async function () { + try { + waku = await createLightNode({ + shardInfo: { clusterId: clusterId, shards: [] } + }); + throw new Error( + "Starting the node with no shard should've thrown an error" + ); + } catch (err) { + if ( + !(err instanceof Error) || + !err.message.includes( + "Missing minimum required configuration options for static sharding or autosharding" + ) + ) { + throw err; + } + } + }); + }); +}); diff --git a/packages/tests/tests/store/multiple_pubsub.spec.ts b/packages/tests/tests/store/multiple_pubsub.spec.ts index cfac638e96..a842afbe5f 100644 --- a/packages/tests/tests/store/multiple_pubsub.spec.ts +++ b/packages/tests/tests/store/multiple_pubsub.spec.ts @@ -180,8 +180,7 @@ describe("Waku Store, custom pubsub topic", function () { }); }); -// Skipped until https://github.com/waku-org/js-waku/issues/1845 gets fixed -describe.skip("Waku Store (Autosharding), custom pubsub topic", function () { +describe("Waku Store (Autosharding), custom pubsub topic", function () { this.timeout(15000); let waku: LightNode; let nwaku: ServiceNode; @@ -189,7 +188,7 @@ describe.skip("Waku Store (Autosharding), custom pubsub topic", function () { const customContentTopic1 = "/waku/2/content/utf8"; const customContentTopic2 = "/myapp/1/latest/proto"; - const clusterId = 2; + const clusterId = 1; const autoshardingPubsubTopic1 = contentTopicToPubsubTopic( customContentTopic1, clusterId @@ -220,6 +219,7 @@ describe.skip("Waku Store (Autosharding), custom pubsub topic", function () { await nwaku.start({ store: true, pubsubTopic: [autoshardingPubsubTopic1, autoshardingPubsubTopic2], + contentTopic: [customContentTopic1, customContentTopic2], relay: true, clusterId }); @@ -293,6 +293,7 @@ describe.skip("Waku Store (Autosharding), custom pubsub topic", function () { await nwaku2.start({ store: true, pubsubTopic: [autoshardingPubsubTopic2], + contentTopic: [customContentTopic2], relay: true, clusterId }); From 279b1bcef76d662c8b1664f2b8e1f836fe948321 Mon Sep 17 00:00:00 2001 From: Florin Barbu Date: Mon, 4 Mar 2024 19:05:53 +0200 Subject: [PATCH 02/16] small fixes --- .../tests/sharding/auto_sharding.spec.ts | 314 +++++++++--------- 1 file changed, 155 insertions(+), 159 deletions(-) diff --git a/packages/tests/tests/sharding/auto_sharding.spec.ts b/packages/tests/tests/sharding/auto_sharding.spec.ts index 5bc6f17a1e..c5ad359929 100644 --- a/packages/tests/tests/sharding/auto_sharding.spec.ts +++ b/packages/tests/tests/sharding/auto_sharding.spec.ts @@ -24,8 +24,8 @@ const ContentTopic = "/waku/2/content/test.js"; const ContentTopic2 = "/myapp/1/latest/proto"; describe("Autosharding: Running Nodes", function () { - this.timeout(15_000); - let clusterId = 1; + this.timeout(50000); + const clusterId = 1; let waku: LightNode; let nwaku: ServiceNode; let messageCollector: MessageCollector; @@ -43,7 +43,7 @@ describe("Autosharding: Running Nodes", function () { // js-waku allows autosharding for cluster IDs different than 1 // we have 2 dedicated tests for this case it("Cluster ID 0 - Default/Global Cluster", async function () { - clusterId = 0; + const clusterId = 0; const pubsubTopics = [contentTopicToPubsubTopic(ContentTopic, clusterId)]; await nwaku.start({ store: true, @@ -83,7 +83,7 @@ describe("Autosharding: Running Nodes", function () { }); it("Non Autosharding Cluster", async function () { - clusterId = 5; + const clusterId = 5; const pubsubTopics = [contentTopicToPubsubTopic(ContentTopic, clusterId)]; await nwaku.start({ store: true, @@ -194,187 +194,183 @@ describe("Autosharding: Running Nodes", function () { } } }); + }); - describe("Others", function () { - it("configure the node with multiple content topics", async function () { - const pubsubTopics = [ - contentTopicToPubsubTopic(ContentTopic, clusterId), - contentTopicToPubsubTopic(ContentTopic2, clusterId) - ]; + describe("Others", function () { + it("configure the node with multiple content topics", async function () { + const pubsubTopics = [ + contentTopicToPubsubTopic(ContentTopic, clusterId), + contentTopicToPubsubTopic(ContentTopic2, clusterId) + ]; - await nwaku.start({ - store: true, - lightpush: true, - relay: true, + await nwaku.start({ + store: true, + lightpush: true, + relay: true, + clusterId: clusterId, + pubsubTopic: pubsubTopics, + contentTopic: [ContentTopic, ContentTopic2] + }); + + waku = await createLightNode({ + shardInfo: { clusterId: clusterId, - pubsubTopic: pubsubTopics, - contentTopic: [ContentTopic, ContentTopic2] - }); + // For autosharding, we configure multiple pubsub topics by using two content topics that hash to different shards + contentTopics: [ContentTopic, ContentTopic2] + } + }); + await waku.dial(await nwaku.getMultiaddrWithId()); + await waitForRemotePeer(waku, [Protocols.LightPush]); - waku = await createLightNode({ - shardInfo: { - clusterId: clusterId, - // For autosharding, we configure multiple pubsub topics by using two content topics that hash to different shards - contentTopics: [ContentTopic, ContentTopic2] - } - }); - await waku.dial(await nwaku.getMultiaddrWithId()); - await waitForRemotePeer(waku, [Protocols.LightPush]); + const encoder1 = createEncoder({ + contentTopic: ContentTopic, + pubsubTopicShardInfo: { + clusterId: clusterId, + shard: contentTopicToShardIndex(ContentTopic) + } + }); - const encoder1 = createEncoder({ - contentTopic: ContentTopic, - pubsubTopicShardInfo: { - clusterId: clusterId, - shard: contentTopicToShardIndex(ContentTopic) - } - }); + const encoder2 = createEncoder({ + contentTopic: ContentTopic2, + pubsubTopicShardInfo: { + clusterId: clusterId, + shard: contentTopicToShardIndex(ContentTopic2) + } + }); - const encoder2 = createEncoder({ - contentTopic: ContentTopic2, - pubsubTopicShardInfo: { - clusterId: clusterId, - shard: contentTopicToShardIndex(ContentTopic2) - } - }); + const request1 = await waku.lightPush.send(encoder1, { + payload: utf8ToBytes("Hello World") + }); + expect(request1.recipients.length).to.eq(1); + expect( + await messageCollector.waitForMessagesAutosharding(1, { + contentTopic: ContentTopic + }) + ).to.eq(true); - const request1 = await waku.lightPush.send(encoder1, { - payload: utf8ToBytes("Hello World") - }); - expect(request1.recipients.length).to.eq(1); - expect( - await messageCollector.waitForMessagesAutosharding(1, { - contentTopic: ContentTopic - }) - ).to.eq(true); + const request2 = await waku.lightPush.send(encoder2, { + payload: utf8ToBytes("Hello World") + }); + expect(request2.recipients.length).to.eq(1); + expect( + await messageCollector.waitForMessagesAutosharding(1, { + contentTopic: ContentTopic + }) + ).to.eq(true); + }); - const request2 = await waku.lightPush.send(encoder2, { - payload: utf8ToBytes("Hello World") - }); - expect(request2.recipients.length).to.eq(1); - expect( - await messageCollector.waitForMessagesAutosharding(1, { - contentTopic: ContentTopic - }) - ).to.eq(true); + it("using a protocol with unconfigured pubsub topic should fail", async function () { + const pubsubTopics = [contentTopicToPubsubTopic(ContentTopic, clusterId)]; + await nwaku.start({ + store: true, + lightpush: true, + relay: true, + clusterId: clusterId, + pubsubTopic: pubsubTopics, + contentTopic: [ContentTopic] }); - it("using a protocol with unconfigured pubsub topic should fail", async function () { - const pubsubTopics = [ - contentTopicToPubsubTopic(ContentTopic, clusterId) - ]; - await nwaku.start({ - store: true, - lightpush: true, - relay: true, + waku = await createLightNode({ + shardInfo: { clusterId: clusterId, - pubsubTopic: pubsubTopics, - contentTopic: [ContentTopic] - }); + contentTopics: [ContentTopic] + } + }); - waku = await createLightNode({ - shardInfo: { - clusterId: clusterId, - contentTopics: [ContentTopic] - } - }); + // use a content topic that is not configured + const encoder = createEncoder({ + contentTopic: ContentTopic2, + pubsubTopicShardInfo: { + clusterId: clusterId, + shard: contentTopicToShardIndex(ContentTopic2) + } + }); - // use a content topic that is not configured - const encoder = createEncoder({ - contentTopic: ContentTopic2, - pubsubTopicShardInfo: { - clusterId: clusterId, - shard: contentTopicToShardIndex(ContentTopic2) - } + try { + await waku.lightPush.send(encoder, { + payload: utf8ToBytes("Hello World") }); - - try { - await waku.lightPush.send(encoder, { - payload: utf8ToBytes("Hello World") - }); - throw new Error("The request should've thrown an error"); - } catch (err) { - if ( - !(err instanceof Error) || - !err.message.includes( - `Pubsub topic ${contentTopicToPubsubTopic( - ContentTopic2, - clusterId - )} has not been configured on this instance. Configured topics are: ${ - pubsubTopics[0] - }` - ) - ) { - throw err; - } + throw new Error("The request should've thrown an error"); + } catch (err) { + if ( + !(err instanceof Error) || + !err.message.includes( + `Pubsub topic ${contentTopicToPubsubTopic( + ContentTopic2, + clusterId + )} has not been configured on this instance. Configured topics are: ${ + pubsubTopics[0] + }` + ) + ) { + throw err; } + } + }); + + it("start node with ApplicationInfo", async function () { + const pubsubTopics = [contentTopicToPubsubTopic(ContentTopic, clusterId)]; + + await nwaku.start({ + store: true, + lightpush: true, + relay: true, + clusterId: clusterId, + pubsubTopic: pubsubTopics, + contentTopic: [ContentTopic] }); - it("start node with ApplicationInfo", async function () { - const pubsubTopics = [ - contentTopicToPubsubTopic(ContentTopic, clusterId) - ]; + waku = await createLightNode({ + shardInfo: { + clusterId: clusterId, + application: ContentTopic.split("/")[1], + version: ContentTopic.split("/")[2] + } + }); + await waku.dial(await nwaku.getMultiaddrWithId()); + await waitForRemotePeer(waku, [Protocols.LightPush]); - await nwaku.start({ - store: true, - lightpush: true, - relay: true, + const encoder = createEncoder({ + contentTopic: ContentTopic, + pubsubTopicShardInfo: { clusterId: clusterId, - pubsubTopic: pubsubTopics, - contentTopic: [ContentTopic] - }); + shard: contentTopicToShardIndex(ContentTopic) + } + }); + const request = await waku.lightPush.send(encoder, { + payload: utf8ToBytes("Hello World") + }); + + expect(request.recipients.length).to.eq(1); + expect( + await messageCollector.waitForMessagesAutosharding(1, { + contentTopic: ContentTopic + }) + ).to.eq(true); + }); + + it("start node with empty content topic", async function () { + try { waku = await createLightNode({ shardInfo: { clusterId: clusterId, - application: ContentTopic.split("/")[1], - version: ContentTopic.split("/")[2] - } - }); - await waku.dial(await nwaku.getMultiaddrWithId()); - await waitForRemotePeer(waku, [Protocols.LightPush]); - - const encoder = createEncoder({ - contentTopic: ContentTopic, - pubsubTopicShardInfo: { - clusterId: clusterId, - shard: contentTopicToShardIndex(ContentTopic) + contentTopics: [] } }); - - const request = await waku.lightPush.send(encoder, { - payload: utf8ToBytes("Hello World") - }); - - expect(request.recipients.length).to.eq(1); - expect( - await messageCollector.waitForMessagesAutosharding(1, { - contentTopic: ContentTopic - }) - ).to.eq(true); - }); - - it("start node with empty content topic", async function () { - try { - waku = await createLightNode({ - shardInfo: { - clusterId: clusterId, - contentTopics: [] - } - }); - throw new Error( - "Starting the node with no content topic should've thrown an error" - ); - } catch (err) { - if ( - !(err instanceof Error) || - !err.message.includes( - "Missing minimum required configuration options for static sharding or autosharding" - ) - ) { - throw err; - } + throw new Error( + "Starting the node with no content topic should've thrown an error" + ); + } catch (err) { + if ( + !(err instanceof Error) || + !err.message.includes( + "Missing minimum required configuration options for static sharding or autosharding" + ) + ) { + throw err; } - }); + } }); }); }); From 9172ea518ca2638aace8f40403dddade0893edc1 Mon Sep 17 00:00:00 2001 From: Florin Barbu Date: Thu, 7 Mar 2024 12:23:25 +0200 Subject: [PATCH 03/16] adjust clusterID based on version --- .../tests/src/utils/generate_test_data.ts | 18 --------- packages/tests/src/utils/index.ts | 1 + .../tests/src/utils/waku_versions_utils.ts | 34 +++++++++++++++++ .../single_node/multiple_pubsub.node.spec.ts | 3 +- packages/tests/tests/getPeers.spec.ts | 12 +++--- .../single_node/multiple_pubsub.node.spec.ts | 3 +- .../tests/relay/multiple_pubsub.node.spec.ts | 13 ++++--- .../tests/sharding/auto_sharding.spec.ts | 3 +- .../tests/sharding/peer_management.spec.ts | 3 +- .../tests/sharding/static_sharding.spec.ts | 37 ------------------- .../tests/tests/store/multiple_pubsub.spec.ts | 3 +- 11 files changed, 59 insertions(+), 71 deletions(-) create mode 100644 packages/tests/src/utils/waku_versions_utils.ts diff --git a/packages/tests/src/utils/generate_test_data.ts b/packages/tests/src/utils/generate_test_data.ts index c09d8ebf67..8644e26437 100644 --- a/packages/tests/src/utils/generate_test_data.ts +++ b/packages/tests/src/utils/generate_test_data.ts @@ -1,7 +1,5 @@ import { createDecoder, createEncoder, Decoder, Encoder } from "@waku/core"; -import { DOCKER_IMAGE_NAME } from "../lib/service_node"; - // Utility to generate test data for multiple topics tests. export function generateTestData(topicCount: number): { contentTopics: string[]; @@ -22,19 +20,3 @@ export function generateTestData(topicCount: number): { decoders }; } - -// Utility to add test conditions based on nwaku/go-waku versions -export function isNwakuAtLeast(requiredVersion: string): boolean { - const versionRegex = /(?:v)?(\d+\.\d+(?:\.\d+)?)/; - const match = DOCKER_IMAGE_NAME.match(versionRegex); - - if (match) { - const version = match[0].substring(1); // Remove the 'v' prefix - return ( - version.localeCompare(requiredVersion, undefined, { numeric: true }) >= 0 - ); - } else { - // If there is no match we assume that it's a version close to master so we return True - return true; - } -} diff --git a/packages/tests/src/utils/index.ts b/packages/tests/src/utils/index.ts index 1a27402477..1024c0d884 100644 --- a/packages/tests/src/utils/index.ts +++ b/packages/tests/src/utils/index.ts @@ -6,3 +6,4 @@ export * from "./delay.js"; export * from "./base64_utf8.js"; export * from "./waitForConnections.js"; export * from "./custom_mocha_hooks.js"; +export * from "./waku_versions_utils.js"; diff --git a/packages/tests/src/utils/waku_versions_utils.ts b/packages/tests/src/utils/waku_versions_utils.ts new file mode 100644 index 0000000000..0eba84cea5 --- /dev/null +++ b/packages/tests/src/utils/waku_versions_utils.ts @@ -0,0 +1,34 @@ +import { Logger } from "@waku/utils"; + +import { DOCKER_IMAGE_NAME } from "../lib/service_node"; + +const log = new Logger("test:utils"); + +// Utility to add test conditions based on nwaku/go-waku versions +export function isNwakuAtLeast(requiredVersion: string): boolean { + const versionRegex = /(?:v)?(\d+\.\d+(?:\.\d+)?)/; + const match = DOCKER_IMAGE_NAME.match(versionRegex); + + if (match) { + const version = match[0].substring(1); // Remove the 'v' prefix + return ( + version.localeCompare(requiredVersion, undefined, { numeric: true }) >= 0 + ); + } else { + // If there is no match we assume that it's a version close to master so we return True + return true; + } +} + +// Utility to resolve authosharding cluster ID +export function resolveAutoshardingCluster(clusterId: number): number { + if (isNwakuAtLeast("0.26.0")) { + log.info(`Using clusterID ${clusterId} for autosharding`); + return clusterId; + } else { + // for versions older than 0.26.0 the authosharding cluster was hardcoded to 1 + // https://github.com/waku-org/nwaku/pull/2505 + log.warn("Falling back to clusterID 1 for autosharding"); + return 1; + } +} diff --git a/packages/tests/tests/filter/single_node/multiple_pubsub.node.spec.ts b/packages/tests/tests/filter/single_node/multiple_pubsub.node.spec.ts index c083674b96..0f9c59df8b 100644 --- a/packages/tests/tests/filter/single_node/multiple_pubsub.node.spec.ts +++ b/packages/tests/tests/filter/single_node/multiple_pubsub.node.spec.ts @@ -21,6 +21,7 @@ import { beforeEachCustom, makeLogFileName, MessageCollector, + resolveAutoshardingCluster, ServiceNode, tearDownNodes } from "../../../src/index.js"; @@ -186,7 +187,7 @@ describe("Waku Filter V2: Multiple PubsubTopics", function () { describe("Waku Filter V2 (Autosharding): Multiple PubsubTopics", function () { // Set the timeout for all tests in this suite. Can be overwritten at test level this.timeout(30000); - const clusterId = 1; + const clusterId = resolveAutoshardingCluster(3); let waku: LightNode; let nwaku: ServiceNode; let nwaku2: ServiceNode; diff --git a/packages/tests/tests/getPeers.spec.ts b/packages/tests/tests/getPeers.spec.ts index 7498dc9396..ba673259d2 100644 --- a/packages/tests/tests/getPeers.spec.ts +++ b/packages/tests/tests/getPeers.spec.ts @@ -21,6 +21,7 @@ import { afterEachCustom, beforeEachCustom, makeLogFileName, + resolveAutoshardingCluster, ServiceNode, tearDownNodes } from "../src/index.js"; @@ -30,6 +31,7 @@ describe("getConnectedPeersForProtocolAndShard", function () { let serviceNode1: ServiceNode; let serviceNode2: ServiceNode; const contentTopic = "/test/2/waku-light-push/utf8"; + const autoshardingClusterId = resolveAutoshardingCluster(6); beforeEachCustom(this, async () => { serviceNode1 = new ServiceNode(makeLogFileName(this.ctx) + "1"); @@ -218,7 +220,7 @@ describe("getConnectedPeersForProtocolAndShard", function () { this.timeout(15000); const shardInfo: ContentTopicInfo = { - clusterId: 1, + clusterId: autoshardingClusterId, contentTopics: [contentTopic] }; @@ -251,12 +253,12 @@ describe("getConnectedPeersForProtocolAndShard", function () { this.timeout(15000); const shardInfo1: ContentTopicInfo = { - clusterId: 1, + clusterId: autoshardingClusterId, contentTopics: [contentTopic] }; const shardInfo2: ContentTopicInfo = { - clusterId: 1, + clusterId: autoshardingClusterId, contentTopics: ["/test/5/waku-light-push/utf8"] }; @@ -305,7 +307,7 @@ describe("getConnectedPeersForProtocolAndShard", function () { this.timeout(15000); const shardInfo1: ContentTopicInfo = { - clusterId: 1, + clusterId: autoshardingClusterId, contentTopics: [contentTopic] }; @@ -358,7 +360,7 @@ describe("getConnectedPeersForProtocolAndShard", function () { this.timeout(15000); const shardInfo1: ContentTopicInfo = { - clusterId: 1, + clusterId: autoshardingClusterId, contentTopics: [contentTopic] }; diff --git a/packages/tests/tests/light-push/single_node/multiple_pubsub.node.spec.ts b/packages/tests/tests/light-push/single_node/multiple_pubsub.node.spec.ts index 613498c962..19afcd835c 100644 --- a/packages/tests/tests/light-push/single_node/multiple_pubsub.node.spec.ts +++ b/packages/tests/tests/light-push/single_node/multiple_pubsub.node.spec.ts @@ -22,6 +22,7 @@ import { beforeEachCustom, makeLogFileName, MessageCollector, + resolveAutoshardingCluster, ServiceNode, tearDownNodes } from "../../../src/index.js"; @@ -185,7 +186,7 @@ describe("Waku Light Push (Autosharding): Multiple PubsubTopics", function () { let nwaku2: ServiceNode; let messageCollector: MessageCollector; - const clusterId = 1; + const clusterId = resolveAutoshardingCluster(4); const customContentTopic1 = "/waku/2/content/test.js"; const customContentTopic2 = "/myapp/1/latest/proto"; const autoshardingPubsubTopic1 = contentTopicToPubsubTopic( diff --git a/packages/tests/tests/relay/multiple_pubsub.node.spec.ts b/packages/tests/tests/relay/multiple_pubsub.node.spec.ts index 20990818c0..b06f51472e 100644 --- a/packages/tests/tests/relay/multiple_pubsub.node.spec.ts +++ b/packages/tests/tests/relay/multiple_pubsub.node.spec.ts @@ -26,6 +26,7 @@ import { NOISE_KEY_1, NOISE_KEY_2, NOISE_KEY_3, + resolveAutoshardingCluster, tearDownNodes } from "../../src/index.js"; import { TestDecoder } from "../filter/utils.js"; @@ -317,7 +318,7 @@ describe("Waku Relay, multiple pubsub topics", function () { describe("Waku Relay (Autosharding), multiple pubsub topics", function () { this.timeout(15000); - const clusterID = 1; + const clusterId = resolveAutoshardingCluster(7); let waku1: RelayNode; let waku2: RelayNode; let waku3: RelayNode; @@ -326,18 +327,18 @@ describe("Waku Relay (Autosharding), multiple pubsub topics", function () { const customContentTopic2 = "/myapp/1/latest/proto"; const autoshardingPubsubTopic1 = contentTopicToPubsubTopic( customContentTopic1, - clusterID + clusterId ); const autoshardingPubsubTopic2 = contentTopicToPubsubTopic( customContentTopic2, - clusterID + clusterId ); const contentTopicInfo1: ContentTopicInfo = { - clusterId: clusterID, + clusterId: clusterId, contentTopics: [customContentTopic1] }; const contentTopicInfo2: ContentTopicInfo = { - clusterId: clusterID, + clusterId: clusterId, contentTopics: [customContentTopic2] }; const customEncoder1 = createEncoder({ @@ -357,7 +358,7 @@ describe("Waku Relay (Autosharding), multiple pubsub topics", function () { pubsubTopicToSingleShardInfo(autoshardingPubsubTopic2) ); const contentTopicInfoBothShards: ContentTopicInfo = { - clusterId: clusterID, + clusterId: clusterId, contentTopics: [customContentTopic1, customContentTopic2] }; diff --git a/packages/tests/tests/sharding/auto_sharding.spec.ts b/packages/tests/tests/sharding/auto_sharding.spec.ts index c5ad359929..2b38fdb34d 100644 --- a/packages/tests/tests/sharding/auto_sharding.spec.ts +++ b/packages/tests/tests/sharding/auto_sharding.spec.ts @@ -16,6 +16,7 @@ import { beforeEachCustom, makeLogFileName, MessageCollector, + resolveAutoshardingCluster, ServiceNode, tearDownNodes } from "../../src/index.js"; @@ -25,7 +26,7 @@ const ContentTopic2 = "/myapp/1/latest/proto"; describe("Autosharding: Running Nodes", function () { this.timeout(50000); - const clusterId = 1; + const clusterId = resolveAutoshardingCluster(10); let waku: LightNode; let nwaku: ServiceNode; let messageCollector: MessageCollector; diff --git a/packages/tests/tests/sharding/peer_management.spec.ts b/packages/tests/tests/sharding/peer_management.spec.ts index e52030cbec..ad52a293c5 100644 --- a/packages/tests/tests/sharding/peer_management.spec.ts +++ b/packages/tests/tests/sharding/peer_management.spec.ts @@ -21,6 +21,7 @@ import { beforeEachCustom, delay, makeLogFileName, + resolveAutoshardingCluster, ServiceNode, tearDownNodes } from "../../src/index.js"; @@ -206,7 +207,7 @@ describe("Static Sharding: Peer Management", function () { describe("Autosharding: Peer Management", function () { const ContentTopic = "/waku/2/content/test.js"; - const clusterId = 1; + const clusterId = resolveAutoshardingCluster(8); describe("Peer Exchange", function () { let waku: LightNode; diff --git a/packages/tests/tests/sharding/static_sharding.spec.ts b/packages/tests/tests/sharding/static_sharding.spec.ts index 9e86070030..49d7908d27 100644 --- a/packages/tests/tests/sharding/static_sharding.spec.ts +++ b/packages/tests/tests/sharding/static_sharding.spec.ts @@ -91,43 +91,6 @@ describe("Static Sharding: Running Nodes", function () { ).to.eq(true); }); - // dedicated test for Cluster ID 1 - WAKU Network - it("Cluster ID 1 - WAKU Network", async function () { - const singleShardInfo = { clusterId: 1, shard: 1 }; - const shardInfo = singleShardInfosToShardInfo([singleShardInfo]); - - await nwaku.start({ - store: true, - lightpush: true, - relay: true, - clusterId: 1, - contentTopic: [ContentTopic], - pubsubTopic: shardInfoToPubsubTopics(shardInfo) - }); - - waku = await createLightNode({ - shardInfo: shardInfo - }); - await waku.dial(await nwaku.getMultiaddrWithId()); - await waitForRemotePeer(waku, [Protocols.LightPush]); - - const encoder = createEncoder({ - contentTopic: ContentTopic, - pubsubTopicShardInfo: singleShardInfo - }); - - const request = await waku.lightPush.send(encoder, { - payload: utf8ToBytes("Hello World") - }); - - expect(request.recipients.length).to.eq(1); - expect( - await messageCollector.waitForMessages(1, { - pubsubTopic: shardInfoToPubsubTopics(shardInfo)[0] - }) - ).to.eq(true); - }); - const numTest = 10; for (let i = 0; i < numTest; i++) { // Random clusterId between 2 and 1000 diff --git a/packages/tests/tests/store/multiple_pubsub.spec.ts b/packages/tests/tests/store/multiple_pubsub.spec.ts index a842afbe5f..240b1622fa 100644 --- a/packages/tests/tests/store/multiple_pubsub.spec.ts +++ b/packages/tests/tests/store/multiple_pubsub.spec.ts @@ -13,6 +13,7 @@ import { beforeEachCustom, makeLogFileName, NOISE_KEY_1, + resolveAutoshardingCluster, ServiceNode, tearDownNodes } from "../../src/index.js"; @@ -188,7 +189,7 @@ describe("Waku Store (Autosharding), custom pubsub topic", function () { const customContentTopic1 = "/waku/2/content/utf8"; const customContentTopic2 = "/myapp/1/latest/proto"; - const clusterId = 1; + const clusterId = resolveAutoshardingCluster(5); const autoshardingPubsubTopic1 = contentTopicToPubsubTopic( customContentTopic1, clusterId From 5ee3527e15de455ef520cfa5dbf67d59e8e78f06 Mon Sep 17 00:00:00 2001 From: Florin Barbu Date: Thu, 7 Mar 2024 13:08:00 +0200 Subject: [PATCH 04/16] fix typo --- packages/tests/src/utils/waku_versions_utils.ts | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/packages/tests/src/utils/waku_versions_utils.ts b/packages/tests/src/utils/waku_versions_utils.ts index 0eba84cea5..d5a878d0c4 100644 --- a/packages/tests/src/utils/waku_versions_utils.ts +++ b/packages/tests/src/utils/waku_versions_utils.ts @@ -20,13 +20,13 @@ export function isNwakuAtLeast(requiredVersion: string): boolean { } } -// Utility to resolve authosharding cluster ID +// Utility to resolve autosharding cluster ID export function resolveAutoshardingCluster(clusterId: number): number { if (isNwakuAtLeast("0.26.0")) { log.info(`Using clusterID ${clusterId} for autosharding`); return clusterId; } else { - // for versions older than 0.26.0 the authosharding cluster was hardcoded to 1 + // for versions older than 0.26.0 the autosharding cluster was hardcoded to 1 // https://github.com/waku-org/nwaku/pull/2505 log.warn("Falling back to clusterID 1 for autosharding"); return 1; From 21e284e16ffd91f47248abf81a708573b598a63b Mon Sep 17 00:00:00 2001 From: Florin Barbu Date: Fri, 8 Mar 2024 10:56:52 +0200 Subject: [PATCH 05/16] fix dispatchEvent test --- packages/tests/tests/connection-mananger/methods.spec.ts | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/packages/tests/tests/connection-mananger/methods.spec.ts b/packages/tests/tests/connection-mananger/methods.spec.ts index a43c1f2b8c..1a2175e1cf 100644 --- a/packages/tests/tests/connection-mananger/methods.spec.ts +++ b/packages/tests/tests/connection-mananger/methods.spec.ts @@ -232,8 +232,7 @@ describe("Public methods", function () { ).to.eq(0); }); - // Will be skipped until https://github.com/waku-org/js-waku/issues/1835 is fixed - it.skip("dispatchEvent via connectionManager", async function () { + it("dispatchEvent via connectionManager", async function () { const peerIdBootstrap = await createSecp256k1PeerId(); await waku.libp2p.peerStore.save(peerIdBootstrap, { tags: { @@ -252,7 +251,9 @@ describe("Public methods", function () { ); }); waku.connectionManager.dispatchEvent( - new CustomEvent("peer:connect", { detail: peerIdBootstrap }) + new CustomEvent(EPeersByDiscoveryEvents.PEER_CONNECT_BOOTSTRAP, { + detail: peerIdBootstrap + }) ); expect(await peerConnectedBootstrap).to.eq(true); }); From 11423fb19c7ab227c4d75d9becf387e31a8a9e44 Mon Sep 17 00:00:00 2001 From: Florin Barbu Date: Fri, 8 Mar 2024 14:26:55 +0200 Subject: [PATCH 06/16] sharding unit tests --- packages/utils/src/common/sharding.spec.ts | 404 +++++++++++++++++++-- 1 file changed, 382 insertions(+), 22 deletions(-) diff --git a/packages/utils/src/common/sharding.spec.ts b/packages/utils/src/common/sharding.spec.ts index f20e549c73..131ac2ba6a 100644 --- a/packages/utils/src/common/sharding.spec.ts +++ b/packages/utils/src/common/sharding.spec.ts @@ -1,10 +1,18 @@ +import { DEFAULT_CLUSTER_ID, DefaultPubsubTopic } from "@waku/interfaces"; import { expect } from "chai"; import { contentTopicsByPubsubTopic, contentTopicToPubsubTopic, contentTopicToShardIndex, - ensureValidContentTopic + determinePubsubTopic, + ensurePubsubTopicIsConfigured, + ensureShardingConfigured, + ensureValidContentTopic, + pubsubTopicToSingleShardInfo, + shardInfoToPubsubTopics, + singleShardInfosToShardInfo, + singleShardInfoToPubsubTopic } from "./sharding"; const testInvalidCases = ( @@ -95,19 +103,37 @@ describe("ensureValidContentTopic", () => { }); describe("contentTopicToShardIndex", () => { - it("converts content topics to expected shard index", () => { - const contentTopics: [string, number][] = [ - ["/toychat/2/huilong/proto", 3], - ["/myapp/1/latest/proto", 0], - ["/waku/2/content/test.js", 1], - ["/toychat/2/huilong/proto", 3], - ["/0/toychat/2/huilong/proto", 3], - ["/statusim/1/community/cbor", 4], - ["/0/statusim/1/community/cbor", 4] - ]; - for (const [topic, shard] of contentTopics) { - expect(contentTopicToShardIndex(topic)).to.eq(shard); - } + const contentTopicsWithExpectedShards: [string, number][] = [ + ["/toychat/2/huilong/proto", 3], + ["/myapp/1/latest/proto", 0], + ["/waku/2/content/test.js", 1], + ["/toychat/2/huilong/proto", 3], + ["/0/toychat/2/huilong/proto", 3], + ["/statusim/1/community/cbor", 4], + ["/0/statusim/1/community/cbor", 4], + ["/app/22/sometopic/someencoding", 2], + ["/app/27/sometopic/someencoding", 5], + ["/app/20/sometopic/someencoding", 7], + ["/app/29/sometopic/someencoding", 6] + ]; + contentTopicsWithExpectedShards.forEach(([topic, expectedShard]) => { + it(`should correctly map ${topic} to shard index ${expectedShard}`, () => { + expect(contentTopicToShardIndex(topic)).to.eq(expectedShard); + }); + }); + + const testCases: [number, string, number][] = [ + [16, "/app/20/sometopic/someencoding", 15], + [2, "/app/20/sometopic/someencoding", 1], + [1, "/app/20/sometopic/someencoding", 0] + ]; + + testCases.forEach(([networkShards, topic, expectedShard]) => { + it(`should correctly map ${topic} to shard index ${expectedShard} with networkShards ${networkShards}`, () => { + expect(contentTopicToShardIndex(topic, networkShards)).to.eq( + expectedShard + ); + }); }); it("topics with same application and version share the same shard", () => { @@ -133,15 +159,349 @@ describe("contentTopicsByPubsubTopic", () => { expect(grouped.get(pubsubTopic)?.includes(contentTopic)).to.be.true; } }); -}); -describe("contentTopicsByPubsubTopic", () => { - it("groups content topics by expected pubsub topic", () => { - const contentTopics = ["/toychat/2/huilong/proto", "/myapp/1/latest/proto"]; + it("groups multiple content topics into the same pubsub topic when they share the same shard index", () => { + const contentTopics = [ + "/app/22/sometopic/someencoding", + "/app/22/anothertopic/otherencoding" + ]; const grouped = contentTopicsByPubsubTopic(contentTopics); - for (const contentTopic of contentTopics) { - const pubsubTopic = contentTopicToPubsubTopic(contentTopic); - expect(grouped.get(pubsubTopic)?.includes(contentTopic)).to.be.true; - } + expect(grouped.size).to.eq(1); // Only one pubsub topic expected + const pubsubTopic = contentTopicToPubsubTopic(contentTopics[0]); + expect(grouped.get(pubsubTopic)?.length).to.eq(2); // Both topics should be grouped under the same pubsub topic + }); + + it("handles different clusterIds correctly", () => { + const contentTopics = ["/app/22/sometopic/someencoding"]; + const clusterId1 = 1; + const clusterId2 = 2; + const grouped1 = contentTopicsByPubsubTopic(contentTopics, clusterId1); + const grouped2 = contentTopicsByPubsubTopic(contentTopics, clusterId2); + const pubsubTopic1 = contentTopicToPubsubTopic( + contentTopics[0], + clusterId1 + ); + const pubsubTopic2 = contentTopicToPubsubTopic( + contentTopics[0], + clusterId2 + ); + expect(pubsubTopic1).not.to.equal(pubsubTopic2); + expect(grouped1.has(pubsubTopic1)).to.be.true; + expect(grouped1.has(pubsubTopic2)).to.be.false; + expect(grouped2.has(pubsubTopic1)).to.be.false; + expect(grouped2.has(pubsubTopic2)).to.be.true; + }); + + it("handles different networkShards values correctly", () => { + const contentTopics = ["/app/20/sometopic/someencoding"]; + const networkShards1 = 8; + const networkShards2 = 16; + const grouped1 = contentTopicsByPubsubTopic( + contentTopics, + DEFAULT_CLUSTER_ID, + networkShards1 + ); + const grouped2 = contentTopicsByPubsubTopic( + contentTopics, + DEFAULT_CLUSTER_ID, + networkShards2 + ); + const pubsubTopic1 = contentTopicToPubsubTopic( + contentTopics[0], + DEFAULT_CLUSTER_ID, + networkShards1 + ); + const pubsubTopic2 = contentTopicToPubsubTopic( + contentTopics[0], + DEFAULT_CLUSTER_ID, + networkShards2 + ); + expect(pubsubTopic1).not.to.equal(pubsubTopic2); + expect(grouped1.has(pubsubTopic1)).to.be.true; + expect(grouped1.has(pubsubTopic2)).to.be.false; + expect(grouped2.has(pubsubTopic1)).to.be.false; + expect(grouped2.has(pubsubTopic2)).to.be.true; + }); + + it("throws an error for improperly formatted content topics", () => { + const invalidContentTopics = ["/invalid/format"]; + expect(() => contentTopicsByPubsubTopic(invalidContentTopics)).to.throw(); + }); +}); + +describe("singleShardInfoToPubsubTopic", () => { + it("should convert a SingleShardInfo object to the correct PubsubTopic", () => { + const singleShardInfo = { clusterId: 1, shard: 2 }; + const expectedTopic = "/waku/2/rs/1/2"; + expect(singleShardInfoToPubsubTopic(singleShardInfo)).to.equal( + expectedTopic + ); + }); +}); + +describe("singleShardInfosToShardInfo", () => { + it("should aggregate SingleShardInfos into a ShardInfo", () => { + const singleShardInfos = [ + { clusterId: 1, shard: 2 }, + { clusterId: 1, shard: 3 }, + { clusterId: 1, shard: 5 } + ]; + const expectedShardInfo = { clusterId: 1, shards: [2, 3, 5] }; + expect(singleShardInfosToShardInfo(singleShardInfos)).to.deep.equal( + expectedShardInfo + ); + }); + + it("should throw an error for empty SingleShardInfos array", () => { + expect(() => singleShardInfosToShardInfo([])).to.throw("Invalid shard"); + }); + + it("should throw an error for SingleShardInfos with different clusterIds", () => { + const invalidShardInfos = [ + { clusterId: 1, shard: 2 }, + { clusterId: 2, shard: 3 } + ]; + expect(() => singleShardInfosToShardInfo(invalidShardInfos)).to.throw( + "Passed shard infos have different clusterIds" + ); + }); +}); + +describe("shardInfoToPubsubTopics", () => { + it("should convert content topics to PubsubTopics for autosharding", () => { + const shardInfo = { + contentTopics: ["/app/v1/topic1/proto", "/app/v1/topic2/proto"] + }; + const topics = shardInfoToPubsubTopics(shardInfo); + expect(topics).to.be.an("array").that.includes("/waku/2/rs/1/4"); + expect(topics.length).to.equal(1); + }); + + it("should return unique PubsubTopics for static sharding", () => { + const shardInfo = { clusterId: 1, shards: [0, 1, 0] }; // Duplicate shard to test uniqueness + const topics = shardInfoToPubsubTopics(shardInfo); + expect(topics).to.have.members(["/waku/2/rs/1/0", "/waku/2/rs/1/1"]); + expect(topics.length).to.equal(2); + }); + + it("should handle application and version for autosharding", () => { + const shardInfo = { application: "app", version: "v1" }; + const topics = shardInfoToPubsubTopics(shardInfo); + expect(topics).to.be.an("array").that.includes("/waku/2/rs/1/4"); + expect(topics.length).to.equal(1); + }); + + it("should return empty list for no shard", () => { + const shardInfo = { clusterId: 1, shards: [] }; + const topics = shardInfoToPubsubTopics(shardInfo); + expect(topics.length).to.equal(0); + }); + + it("should throw an error if shards are undefined for static sharding", () => { + const shardInfo = { clusterId: 1, shards: undefined }; + expect(() => shardInfoToPubsubTopics(shardInfo)).to.throw("Invalid shard"); + }); + + it("should throw an error for missing required configuration", () => { + const shardInfo = {}; + expect(() => shardInfoToPubsubTopics(shardInfo)).to.throw( + "Missing required configuration in shard parameters" + ); + }); +}); + +describe("pubsubTopicToSingleShardInfo with various invalid formats", () => { + const invalidTopics = [ + "/waku/1/rs/1/2", // Invalid Waku version + "/waku/2/r/1/2", // Invalid path segment + "/incorrect/format", // Completely incorrect format + "/waku/2/rs", // Missing both clusterId and shard + "/waku/2/rs/1/2/extra" // Extra trailing data + ]; + + it("should extract SingleShardInfo from a valid PubsubTopic", () => { + const topic = "/waku/2/rs/1/2"; + const expectedInfo = { clusterId: 1, shard: 2 }; + expect(pubsubTopicToSingleShardInfo(topic)).to.deep.equal(expectedInfo); + }); + + invalidTopics.forEach((topic) => { + it(`should throw an error for invalid PubsubTopic format: ${topic}`, () => { + expect(() => pubsubTopicToSingleShardInfo(topic)).to.throw( + "Invalid pubsub topic" + ); + }); + }); + + const nonNumericValues = ["x", "y", "$", "!", "\\", "-", "", " "]; + nonNumericValues.forEach((value) => { + it(`should throw an error for non-numeric clusterId: /waku/2/rs/${value}/1`, () => { + expect(() => + pubsubTopicToSingleShardInfo(`/waku/2/rs/${value}/1`) + ).to.throw("Invalid clusterId or shard"); + }); + + it(`should throw an error for non-numeric shard: /waku/2/rs/1/${value}`, () => { + expect(() => + pubsubTopicToSingleShardInfo(`/waku/2/rs/1/${value}`) + ).to.throw("Invalid clusterId or shard"); + }); + }); +}); + +describe("ensurePubsubTopicIsConfigured", () => { + it("should not throw an error for a single configured topic", () => { + const topic = "/waku/2/rs/1/2"; + const configuredTopics = [topic]; + expect(() => + ensurePubsubTopicIsConfigured(topic, configuredTopics) + ).not.to.throw(); + }); + + it("should not throw an error when the topic is within a list of configured topics", () => { + const topic = "/waku/2/rs/1/2"; + const configuredTopics = ["/waku/2/rs/1/1", topic, "/waku/2/rs/1/3"]; + expect(() => + ensurePubsubTopicIsConfigured(topic, configuredTopics) + ).not.to.throw(); + }); + + it("should throw an error for an unconfigured topic", () => { + const topic = "/waku/2/rs/1/2"; + const configuredTopics = ["/waku/2/rs/1/3"]; + expect(() => + ensurePubsubTopicIsConfigured(topic, configuredTopics) + ).to.throw(); + }); +}); + +describe("determinePubsubTopic", () => { + const contentTopic = "/app/46/sometopic/someencoding"; + it("should return the pubsub topic directly if a string is provided", () => { + const topic = "/waku/2/rs/1/3"; + expect(determinePubsubTopic(contentTopic, topic)).to.equal(topic); + }); + + it("should return a calculated topic if SingleShardInfo is provided", () => { + const info = { clusterId: 1, shard: 2 }; + const expectedTopic = "/waku/2/rs/1/2"; + expect(determinePubsubTopic(contentTopic, info)).to.equal(expectedTopic); + }); + + it("should fall back to default pubsub topic when pubsubTopicShardInfo is not provided", () => { + expect(determinePubsubTopic(contentTopic)).to.equal(DefaultPubsubTopic); + }); + + it("should process correctly when SingleShardInfo has no clusterId but has a shard", () => { + const info = { shard: 0 }; + const expectedTopic = `/waku/2/rs/${DEFAULT_CLUSTER_ID}/6`; + expect(determinePubsubTopic(contentTopic, info as any)).to.equal( + expectedTopic + ); + }); + + it("should derive a pubsub topic using contentTopic when SingleShardInfo only contains clusterId", () => { + const info = { clusterId: 2 }; + const expectedTopic = contentTopicToPubsubTopic( + contentTopic, + info.clusterId + ); + expect(determinePubsubTopic(contentTopic, info as any)).to.equal( + expectedTopic + ); + }); +}); + +describe("ensureShardingConfigured", () => { + it("should return valid sharding parameters for static sharding", () => { + const shardInfo = { clusterId: 1, shards: [0, 1] }; + const result = ensureShardingConfigured(shardInfo); + expect(result.shardingParams).to.deep.include({ + clusterId: 1, + shards: [0, 1] + }); + expect(result.shardInfo).to.deep.include({ clusterId: 1, shards: [0, 1] }); + expect(result.pubsubTopics).to.have.members([ + "/waku/2/rs/1/0", + "/waku/2/rs/1/1" + ]); + }); + + it("should return valid sharding parameters for content topics autosharding", () => { + const shardInfo = { contentTopics: ["/app/v1/topic1/proto"] }; + const result = ensureShardingConfigured(shardInfo); + expect(result.shardingParams).to.deep.include({ + contentTopics: ["/app/v1/topic1/proto"] + }); + const expectedPubsubTopic = contentTopicToPubsubTopic( + "/app/v1/topic1/proto", + DEFAULT_CLUSTER_ID + ); + expect(result.shardInfo.shards).to.include( + contentTopicToShardIndex("/app/v1/topic1/proto") + ); + expect(result.pubsubTopics).to.include(expectedPubsubTopic); + }); + + it("should configure sharding based on application and version for autosharding", () => { + const shardInfo = { application: "app", version: "v1" }; + const result = ensureShardingConfigured(shardInfo); + expect(result.shardingParams).to.deep.include({ + application: "app", + version: "v1" + }); + const expectedPubsubTopic = contentTopicToPubsubTopic( + `/app/v1/default/default` + ); + expect(result.pubsubTopics).to.include(expectedPubsubTopic); + expect(result.shardInfo.shards).to.include( + pubsubTopicToSingleShardInfo(expectedPubsubTopic).shard + ); + }); + + it("should throw an error for missing sharding configuration", () => { + const shardInfo = {}; + expect(() => ensureShardingConfigured(shardInfo)).to.throw(); + }); + + it("handles empty shards array correctly", () => { + const shardInfo = { clusterId: 1, shards: [] }; + expect(() => ensureShardingConfigured(shardInfo)).to.throw(); + }); + + it("handles empty contentTopics array correctly", () => { + const shardInfo = { contentTopics: [] }; + expect(() => ensureShardingConfigured(shardInfo)).to.throw(); + }); +}); + +describe("contentTopicToPubsubTopic", () => { + it("should correctly map a content topic to a pubsub topic", () => { + const contentTopic = "/app/v1/topic1/proto"; + expect(contentTopicToPubsubTopic(contentTopic)).to.equal("/waku/2/rs/1/4"); + }); + + it("should map different content topics to different pubsub topics based on shard index", () => { + const contentTopic1 = "/app/v1/topic1/proto"; + const contentTopic2 = "/app/v2/topic2/proto"; + const pubsubTopic1 = contentTopicToPubsubTopic(contentTopic1); + const pubsubTopic2 = contentTopicToPubsubTopic(contentTopic2); + expect(pubsubTopic1).not.to.equal(pubsubTopic2); + }); + + it("should use the provided clusterId for the pubsub topic", () => { + const contentTopic = "/app/v1/topic1/proto"; + const clusterId = 2; + expect(contentTopicToPubsubTopic(contentTopic, clusterId)).to.equal( + "/waku/2/rs/2/4" + ); + }); + + it("should correctly map a content topic to a pubsub topic for different network shard sizes", () => { + const contentTopic = "/app/v1/topic1/proto"; + const networkShards = 16; + expect(contentTopicToPubsubTopic(contentTopic, 1, networkShards)).to.equal( + "/waku/2/rs/1/4" + ); }); }); From d3f414dd08082db9cc323b0627e98efa8fdb3232 Mon Sep 17 00:00:00 2001 From: Florin Barbu Date: Fri, 8 Mar 2024 15:30:08 +0200 Subject: [PATCH 07/16] port adjustment --- packages/tests/src/lib/service_node.ts | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/packages/tests/src/lib/service_node.ts b/packages/tests/src/lib/service_node.ts index 9019daf5f2..b2742d0d7e 100644 --- a/packages/tests/src/lib/service_node.ts +++ b/packages/tests/src/lib/service_node.ts @@ -113,7 +113,8 @@ export class ServiceNode { // waku nodes takes some time to bind port so to decrease chances of conflict // we also randomize the first port that portfinder will try - const startPort = Math.floor(Math.random() * (65535 - 1025) + 1025); + // depending on getPorts count adjust the random function in such a way that max port is 65535 + const startPort = Math.floor(Math.random() * (65530 - 1025) + 1025); const ports: Ports = await new Promise((resolve, reject) => { portfinder.getPorts(4, { port: startPort }, (err, ports) => { From 77d297bf1951955d61d516e92859d2fd13c13807 Mon Sep 17 00:00:00 2001 From: Florin Barbu Date: Mon, 11 Mar 2024 15:49:38 +0200 Subject: [PATCH 08/16] update unit tests --- packages/utils/src/common/sharding.spec.ts | 42 ++++++++++++++++++++-- 1 file changed, 40 insertions(+), 2 deletions(-) diff --git a/packages/utils/src/common/sharding.spec.ts b/packages/utils/src/common/sharding.spec.ts index 131ac2ba6a..7ae9f2bc18 100644 --- a/packages/utils/src/common/sharding.spec.ts +++ b/packages/utils/src/common/sharding.spec.ts @@ -231,8 +231,8 @@ describe("contentTopicsByPubsubTopic", () => { describe("singleShardInfoToPubsubTopic", () => { it("should convert a SingleShardInfo object to the correct PubsubTopic", () => { - const singleShardInfo = { clusterId: 1, shard: 2 }; - const expectedTopic = "/waku/2/rs/1/2"; + const singleShardInfo = { clusterId: 2, shard: 2 }; + const expectedTopic = "/waku/2/rs/2/2"; expect(singleShardInfoToPubsubTopic(singleShardInfo)).to.equal( expectedTopic ); @@ -291,6 +291,21 @@ describe("shardInfoToPubsubTopics", () => { expect(topics.length).to.equal(1); }); + [0, 1, 6].forEach((clusterId) => { + it(`should handle clusterId, application and version for autosharding with cluster iD ${clusterId}`, () => { + const shardInfo = { + clusterId: clusterId, + application: "app", + version: "v1" + }; + const topics = shardInfoToPubsubTopics(shardInfo); + expect(topics) + .to.be.an("array") + .that.includes(`/waku/2/rs/${clusterId}/4`); + expect(topics.length).to.equal(1); + }); + }); + it("should return empty list for no shard", () => { const shardInfo = { clusterId: 1, shards: [] }; const topics = shardInfoToPubsubTopics(shardInfo); @@ -459,6 +474,29 @@ describe("ensureShardingConfigured", () => { ); }); + [0, 1, 4].forEach((clusterId) => { + it(`should configure sharding based on clusterId, application and version for autosharding with cluster iD ${clusterId}`, () => { + const shardInfo = { + clusterId: clusterId, + application: "app", + version: "v1" + }; + const result = ensureShardingConfigured(shardInfo); + expect(result.shardingParams).to.deep.include({ + application: "app", + version: "v1" + }); + const expectedPubsubTopic = contentTopicToPubsubTopic( + `/app/v1/default/default`, + shardInfo.clusterId + ); + expect(result.pubsubTopics).to.include(expectedPubsubTopic); + expect(result.shardInfo.shards).to.include( + pubsubTopicToSingleShardInfo(expectedPubsubTopic).shard + ); + }); + }); + it("should throw an error for missing sharding configuration", () => { const shardInfo = {}; expect(() => ensureShardingConfigured(shardInfo)).to.throw(); From f9fd0b1d3fb3bdfb69c73e63614bd5b5c2ec2a65 Mon Sep 17 00:00:00 2001 From: Florin Barbu Date: Mon, 11 Mar 2024 15:58:45 +0200 Subject: [PATCH 09/16] fix 1902 --- packages/utils/src/common/sharding.ts | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/packages/utils/src/common/sharding.ts b/packages/utils/src/common/sharding.ts index 236ebc0f1f..fdf6babed8 100644 --- a/packages/utils/src/common/sharding.ts +++ b/packages/utils/src/common/sharding.ts @@ -66,7 +66,8 @@ export const shardInfoToPubsubTopics = ( // Autosharding: single shard from application and version return [ contentTopicToPubsubTopic( - `/${shardInfo.application}/${shardInfo.version}/default/default` + `/${shardInfo.application}/${shardInfo.version}/default/default`, + shardInfo.clusterId ) ]; } else { @@ -291,7 +292,8 @@ export const ensureShardingConfigured = ( if (isApplicationVersionConfigured) { const pubsubTopic = contentTopicToPubsubTopic( - `/${application}/${version}/default/default` + `/${application}/${version}/default/default`, + clusterId ); return { shardingParams: { clusterId, application, version }, From 04a2ff9189482d95998b245663e0f2bda0667e06 Mon Sep 17 00:00:00 2001 From: Florin Barbu Date: Mon, 11 Mar 2024 16:05:53 +0200 Subject: [PATCH 10/16] adjust content topic tests --- .../tests/tests/sdk/content_topic.spec.ts | 29 +++++++++++++------ 1 file changed, 20 insertions(+), 9 deletions(-) diff --git a/packages/tests/tests/sdk/content_topic.spec.ts b/packages/tests/tests/sdk/content_topic.spec.ts index 32776a7d7e..c37f2197eb 100644 --- a/packages/tests/tests/sdk/content_topic.spec.ts +++ b/packages/tests/tests/sdk/content_topic.spec.ts @@ -3,7 +3,6 @@ import { bytesToUtf8, createEncoder, createLightNode, - DEFAULT_CLUSTER_ID, defaultLibp2p, LightNode, Protocols, @@ -20,11 +19,17 @@ import { } from "@waku/utils"; import { expect } from "chai"; -import { makeLogFileName, ServiceNode, tearDownNodes } from "../../src"; +import { + makeLogFileName, + resolveAutoshardingCluster, + ServiceNode, + tearDownNodes +} from "../../src"; describe("SDK: Creating by Content Topic", function () { const ContentTopic = "/myapp/1/latest/proto"; const testMessage = "Test123"; + const clusterId = resolveAutoshardingCluster(2); let nwaku: ServiceNode; let waku: LightNode; let waku2: LightNode; @@ -33,13 +38,13 @@ describe("SDK: Creating by Content Topic", function () { this.timeout(15000); nwaku = new ServiceNode(makeLogFileName(this) + "1"); await nwaku.start({ - pubsubTopic: [contentTopicToPubsubTopic(ContentTopic)], + pubsubTopic: [contentTopicToPubsubTopic(ContentTopic, clusterId)], lightpush: true, relay: true, filter: true, discv5Discovery: true, peerExchange: true, - clusterId: DEFAULT_CLUSTER_ID + clusterId: clusterId }); }); @@ -49,7 +54,10 @@ describe("SDK: Creating by Content Topic", function () { }); it("given a content topic, creates a waku node and filter subscription", async function () { - const expectedPubsubTopic = contentTopicToPubsubTopic(ContentTopic); + const expectedPubsubTopic = contentTopicToPubsubTopic( + ContentTopic, + clusterId + ); waku = ( await subscribeToContentTopic(ContentTopic, () => {}, { @@ -61,7 +69,10 @@ describe("SDK: Creating by Content Topic", function () { }); it("given a waku node and content topic, creates a filter subscription", async function () { - const expectedPubsubTopic = contentTopicToPubsubTopic(ContentTopic); + const expectedPubsubTopic = contentTopicToPubsubTopic( + ContentTopic, + clusterId + ); waku = await createLightNode({ shardInfo: { contentTopics: [ContentTopic] } @@ -95,7 +106,7 @@ describe("SDK: Creating by Content Topic", function () { await waitForRemotePeer(waku2, [Protocols.LightPush]); const encoder = createEncoder({ pubsubTopicShardInfo: pubsubTopicToSingleShardInfo( - contentTopicToPubsubTopic(ContentTopic) + contentTopicToPubsubTopic(ContentTopic, clusterId) ), contentTopic: ContentTopic }); @@ -135,7 +146,7 @@ describe("SDK: Creating by Content Topic", function () { await waitForRemotePeer(waku2, [Protocols.LightPush]); const encoder = createEncoder({ pubsubTopicShardInfo: pubsubTopicToSingleShardInfo( - contentTopicToPubsubTopic(ContentTopic) + contentTopicToPubsubTopic(ContentTopic, clusterId) ), contentTopic: ContentTopic }); @@ -160,7 +171,7 @@ describe("SDK: Creating by Content Topic", function () { const encoder = createEncoder({ pubsubTopicShardInfo: pubsubTopicToSingleShardInfo( - contentTopicToPubsubTopic(ContentTopic) + contentTopicToPubsubTopic(ContentTopic, clusterId) ), contentTopic: ContentTopic }); From 639b1863f359cb5c37d834f85d11a87c39380e12 Mon Sep 17 00:00:00 2001 From: Florin Barbu Date: Tue, 12 Mar 2024 12:29:02 +0200 Subject: [PATCH 11/16] adjust metdata tests --- packages/tests/tests/metadata.spec.ts | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/packages/tests/tests/metadata.spec.ts b/packages/tests/tests/metadata.spec.ts index 2d5779fd66..a9e01922d8 100644 --- a/packages/tests/tests/metadata.spec.ts +++ b/packages/tests/tests/metadata.spec.ts @@ -56,7 +56,7 @@ describe("Metadata Protocol", function () { await waku.libp2p.services.metadata?.query(nwaku1PeerId); expect(shardInfoRes).to.not.be.undefined; expect(shardInfoRes?.clusterId).to.equal(shardInfo.clusterId); - expect(shardInfoRes?.shards).to.deep.equal(shardInfo.shards); + expect(shardInfoRes?.shards).to.include.members(shardInfo.shards); const activeConnections = waku.libp2p.getConnections(); expect(activeConnections.length).to.equal(1); @@ -92,7 +92,7 @@ describe("Metadata Protocol", function () { await waku.libp2p.services.metadata?.query(nwaku1PeerId); expect(shardInfoRes).to.not.be.undefined; expect(shardInfoRes?.clusterId).to.equal(shardInfo1.clusterId); - expect(shardInfoRes?.shards).to.deep.equal(shardInfo1.shards); + expect(shardInfoRes?.shards).to.include.members(shardInfo1.shards); const activeConnections = waku.libp2p.getConnections(); expect(activeConnections.length).to.equal(1); @@ -196,6 +196,6 @@ describe("Metadata Protocol", function () { expect(metadataShardInfo).not.be.undefined; expect(metadataShardInfo!.clusterId).to.eq(shardInfo.clusterId); - expect(metadataShardInfo.shards).to.deep.eq(shardInfo.shards); + expect(metadataShardInfo.shards).to.include.members(shardInfo.shards); }); }); From a9a89ee4137395415162965e397f1e53519dc031 Mon Sep 17 00:00:00 2001 From: Florin Barbu Date: Tue, 12 Mar 2024 13:37:53 +0200 Subject: [PATCH 12/16] small adjustments --- packages/tests/tests/sharding/auto_sharding.spec.ts | 12 ++++++------ .../tests/tests/sharding/peer_management.spec.ts | 2 +- .../tests/tests/sharding/static_sharding.spec.ts | 8 ++++---- 3 files changed, 11 insertions(+), 11 deletions(-) diff --git a/packages/tests/tests/sharding/auto_sharding.spec.ts b/packages/tests/tests/sharding/auto_sharding.spec.ts index 2b38fdb34d..057dff9834 100644 --- a/packages/tests/tests/sharding/auto_sharding.spec.ts +++ b/packages/tests/tests/sharding/auto_sharding.spec.ts @@ -75,7 +75,7 @@ describe("Autosharding: Running Nodes", function () { payload: utf8ToBytes("Hello World") }); - expect(request.recipients.length).to.eq(1); + expect(request.successes.length).to.eq(1); expect( await messageCollector.waitForMessagesAutosharding(1, { contentTopic: ContentTopic @@ -115,7 +115,7 @@ describe("Autosharding: Running Nodes", function () { payload: utf8ToBytes("Hello World") }); - expect(request.recipients.length).to.eq(1); + expect(request.successes.length).to.eq(1); expect( await messageCollector.waitForMessagesAutosharding(1, { contentTopic: ContentTopic @@ -172,7 +172,7 @@ describe("Autosharding: Running Nodes", function () { payload: utf8ToBytes("Hello World") }); - expect(request.recipients.length).to.eq(1); + expect(request.successes.length).to.eq(1); expect( await messageCollector.waitForMessagesAutosharding(1, { contentTopic: ContentTopic @@ -242,7 +242,7 @@ describe("Autosharding: Running Nodes", function () { const request1 = await waku.lightPush.send(encoder1, { payload: utf8ToBytes("Hello World") }); - expect(request1.recipients.length).to.eq(1); + expect(request1.successes.length).to.eq(1); expect( await messageCollector.waitForMessagesAutosharding(1, { contentTopic: ContentTopic @@ -252,7 +252,7 @@ describe("Autosharding: Running Nodes", function () { const request2 = await waku.lightPush.send(encoder2, { payload: utf8ToBytes("Hello World") }); - expect(request2.recipients.length).to.eq(1); + expect(request2.successes.length).to.eq(1); expect( await messageCollector.waitForMessagesAutosharding(1, { contentTopic: ContentTopic @@ -343,7 +343,7 @@ describe("Autosharding: Running Nodes", function () { payload: utf8ToBytes("Hello World") }); - expect(request.recipients.length).to.eq(1); + expect(request.successes.length).to.eq(1); expect( await messageCollector.waitForMessagesAutosharding(1, { contentTopic: ContentTopic diff --git a/packages/tests/tests/sharding/peer_management.spec.ts b/packages/tests/tests/sharding/peer_management.spec.ts index ede0a84ab6..e3063a64ad 100644 --- a/packages/tests/tests/sharding/peer_management.spec.ts +++ b/packages/tests/tests/sharding/peer_management.spec.ts @@ -206,7 +206,7 @@ describe("Static Sharding: Peer Management", function () { }); describe("Autosharding: Peer Management", function () { - const ContentTopic = "/waku/2/content/test.js"; + const ContentTopic = "/myapp/1/latest/proto"; const clusterId = resolveAutoshardingCluster(8); describe("Peer Exchange", function () { diff --git a/packages/tests/tests/sharding/static_sharding.spec.ts b/packages/tests/tests/sharding/static_sharding.spec.ts index 49d7908d27..dc6f0542ff 100644 --- a/packages/tests/tests/sharding/static_sharding.spec.ts +++ b/packages/tests/tests/sharding/static_sharding.spec.ts @@ -83,7 +83,7 @@ describe("Static Sharding: Running Nodes", function () { payload: utf8ToBytes("Hello World") }); - expect(request.recipients.length).to.eq(1); + expect(request.successes.length).to.eq(1); expect( await messageCollector.waitForMessages(1, { pubsubTopic: shardInfoToPubsubTopics(shardInfo)[0] @@ -132,7 +132,7 @@ describe("Static Sharding: Running Nodes", function () { payload: utf8ToBytes("Hello World") }); - expect(request.recipients.length).to.eq(1); + expect(request.successes.length).to.eq(1); expect( await messageCollector.waitForMessages(1, { pubsubTopic: shardInfoToPubsubTopics(shardInfo)[0] @@ -210,7 +210,7 @@ describe("Static Sharding: Running Nodes", function () { const request1 = await waku.lightPush.send(encoder1, { payload: utf8ToBytes("Hello World2") }); - expect(request1.recipients.length).to.eq(1); + expect(request1.successes.length).to.eq(1); expect( await messageCollector.waitForMessages(1, { pubsubTopic: shardInfoToPubsubTopics(shardInfo)[0] @@ -220,7 +220,7 @@ describe("Static Sharding: Running Nodes", function () { const request2 = await waku.lightPush.send(encoder2, { payload: utf8ToBytes("Hello World3") }); - expect(request2.recipients.length).to.eq(1); + expect(request2.successes.length).to.eq(1); expect( await messageCollector.waitForMessages(1, { pubsubTopic: shardInfoToPubsubTopics(shardInfo)[0] From 36a68e12902a500d08752f2e5691e9dd1898178d Mon Sep 17 00:00:00 2001 From: Florin Barbu Date: Tue, 12 Mar 2024 14:00:27 +0200 Subject: [PATCH 13/16] fix --- .../tests/sharding/auto_sharding.spec.ts | 28 ++++++----------- .../tests/sharding/static_sharding.spec.ts | 31 ++++++------------- 2 files changed, 19 insertions(+), 40 deletions(-) diff --git a/packages/tests/tests/sharding/auto_sharding.spec.ts b/packages/tests/tests/sharding/auto_sharding.spec.ts index 057dff9834..6d2612ef8d 100644 --- a/packages/tests/tests/sharding/auto_sharding.spec.ts +++ b/packages/tests/tests/sharding/auto_sharding.spec.ts @@ -1,4 +1,4 @@ -import { LightNode, Protocols } from "@waku/interfaces"; +import { LightNode, ProtocolError, Protocols } from "@waku/interfaces"; import { createEncoder, createLightNode, @@ -287,26 +287,16 @@ describe("Autosharding: Running Nodes", function () { } }); - try { - await waku.lightPush.send(encoder, { - payload: utf8ToBytes("Hello World") - }); + const { successes, failures } = await waku.lightPush.send(encoder, { + payload: utf8ToBytes("Hello World") + }); + + if (successes.length > 0 || failures?.length === 0) { throw new Error("The request should've thrown an error"); - } catch (err) { - if ( - !(err instanceof Error) || - !err.message.includes( - `Pubsub topic ${contentTopicToPubsubTopic( - ContentTopic2, - clusterId - )} has not been configured on this instance. Configured topics are: ${ - pubsubTopics[0] - }` - ) - ) { - throw err; - } } + + const errors = failures?.map((failure) => failure.error); + expect(errors).to.include(ProtocolError.TOPIC_NOT_CONFIGURED); }); it("start node with ApplicationInfo", async function () { diff --git a/packages/tests/tests/sharding/static_sharding.spec.ts b/packages/tests/tests/sharding/static_sharding.spec.ts index dc6f0542ff..b4558ecb5a 100644 --- a/packages/tests/tests/sharding/static_sharding.spec.ts +++ b/packages/tests/tests/sharding/static_sharding.spec.ts @@ -1,5 +1,6 @@ import { LightNode, + ProtocolError, Protocols, ShardInfo, SingleShardInfo @@ -146,14 +147,6 @@ describe("Static Sharding: Running Nodes", function () { const clusterId = 2; let shardInfo: ShardInfo; - const PubsubTopic1 = singleShardInfoToPubsubTopic({ - clusterId: clusterId, - shard: 2 - }); - const PubsubTopic2 = singleShardInfoToPubsubTopic({ - clusterId: clusterId, - shard: 3 - }); const shardInfoFirstShard: ShardInfo = { clusterId: clusterId, shards: [2] @@ -229,6 +222,7 @@ describe("Static Sharding: Running Nodes", function () { }); it("using a protocol with unconfigured pubsub topic should fail", async function () { + this.timeout(15_000); waku = await createLightNode({ shardInfo: shardInfoFirstShard }); @@ -239,21 +233,16 @@ describe("Static Sharding: Running Nodes", function () { pubsubTopicShardInfo: singleShardInfo2 }); - try { - await waku.lightPush.send(encoder, { - payload: utf8ToBytes("Hello World") - }); + const { successes, failures } = await waku.lightPush.send(encoder, { + payload: utf8ToBytes("Hello World") + }); + + if (successes.length > 0 || failures?.length === 0) { throw new Error("The request should've thrown an error"); - } catch (err) { - if ( - !(err instanceof Error) || - !err.message.includes( - `Pubsub topic ${PubsubTopic2} has not been configured on this instance. Configured topics are: ${PubsubTopic1}` - ) - ) { - throw err; - } } + + const errors = failures?.map((failure) => failure.error); + expect(errors).to.include(ProtocolError.TOPIC_NOT_CONFIGURED); }); it("start node with empty shard", async function () { From 1cf81515608e47d7a847599164e3e9093aede088 Mon Sep 17 00:00:00 2001 From: Florin Barbu Date: Wed, 13 Mar 2024 09:14:20 +0200 Subject: [PATCH 14/16] update resolveAutoshardingCluster version --- packages/tests/src/utils/waku_versions_utils.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/tests/src/utils/waku_versions_utils.ts b/packages/tests/src/utils/waku_versions_utils.ts index d5a878d0c4..52fb493be8 100644 --- a/packages/tests/src/utils/waku_versions_utils.ts +++ b/packages/tests/src/utils/waku_versions_utils.ts @@ -22,7 +22,7 @@ export function isNwakuAtLeast(requiredVersion: string): boolean { // Utility to resolve autosharding cluster ID export function resolveAutoshardingCluster(clusterId: number): number { - if (isNwakuAtLeast("0.26.0")) { + if (isNwakuAtLeast("0.27.0")) { log.info(`Using clusterID ${clusterId} for autosharding`); return clusterId; } else { From ab430a1494104154a3a738b66d28360ef5a06fcf Mon Sep 17 00:00:00 2001 From: Florin Barbu Date: Thu, 14 Mar 2024 10:20:27 +0200 Subject: [PATCH 15/16] skip autosharding tests for nwaku < 0.27.0 --- packages/tests/src/utils/waku_versions_utils.ts | 2 +- .../tests/filter/single_node/multiple_pubsub.node.spec.ts | 7 +++++++ packages/tests/tests/getPeers.spec.ts | 7 +++++++ .../light-push/single_node/multiple_pubsub.node.spec.ts | 7 +++++++ packages/tests/tests/sharding/auto_sharding.spec.ts | 7 +++++++ packages/tests/tests/store/multiple_pubsub.spec.ts | 7 +++++++ 6 files changed, 36 insertions(+), 1 deletion(-) diff --git a/packages/tests/src/utils/waku_versions_utils.ts b/packages/tests/src/utils/waku_versions_utils.ts index 52fb493be8..e2f6cf5c9c 100644 --- a/packages/tests/src/utils/waku_versions_utils.ts +++ b/packages/tests/src/utils/waku_versions_utils.ts @@ -26,7 +26,7 @@ export function resolveAutoshardingCluster(clusterId: number): number { log.info(`Using clusterID ${clusterId} for autosharding`); return clusterId; } else { - // for versions older than 0.26.0 the autosharding cluster was hardcoded to 1 + // for versions older than 0.27.0 the autosharding cluster was hardcoded to 1 // https://github.com/waku-org/nwaku/pull/2505 log.warn("Falling back to clusterID 1 for autosharding"); return 1; diff --git a/packages/tests/tests/filter/single_node/multiple_pubsub.node.spec.ts b/packages/tests/tests/filter/single_node/multiple_pubsub.node.spec.ts index 0f9c59df8b..55f176434c 100644 --- a/packages/tests/tests/filter/single_node/multiple_pubsub.node.spec.ts +++ b/packages/tests/tests/filter/single_node/multiple_pubsub.node.spec.ts @@ -19,6 +19,7 @@ import { expect } from "chai"; import { afterEachCustom, beforeEachCustom, + isNwakuAtLeast, makeLogFileName, MessageCollector, resolveAutoshardingCluster, @@ -231,6 +232,12 @@ describe("Waku Filter V2 (Autosharding): Multiple PubsubTopics", function () { shard: contentTopicToShardIndex(customContentTopic2) }); + before(async () => { + if (!isNwakuAtLeast("0.27.0")) { + this.ctx.skip(); + } + }); + beforeEachCustom(this, async () => { [nwaku, waku] = await runNodes( this.ctx, diff --git a/packages/tests/tests/getPeers.spec.ts b/packages/tests/tests/getPeers.spec.ts index 42795143fa..bc5f6e690b 100644 --- a/packages/tests/tests/getPeers.spec.ts +++ b/packages/tests/tests/getPeers.spec.ts @@ -20,6 +20,7 @@ import Sinon from "sinon"; import { afterEachCustom, beforeEachCustom, + isNwakuAtLeast, makeLogFileName, resolveAutoshardingCluster, ServiceNode, @@ -33,6 +34,12 @@ describe("getConnectedPeersForProtocolAndShard", function () { const contentTopic = "/test/2/waku-light-push/utf8"; const autoshardingClusterId = resolveAutoshardingCluster(6); + before(async () => { + if (!isNwakuAtLeast("0.27.0")) { + this.ctx.skip(); + } + }); + beforeEachCustom(this, async () => { serviceNode1 = new ServiceNode(makeLogFileName(this.ctx) + "1"); serviceNode2 = new ServiceNode(makeLogFileName(this.ctx) + "2"); diff --git a/packages/tests/tests/light-push/single_node/multiple_pubsub.node.spec.ts b/packages/tests/tests/light-push/single_node/multiple_pubsub.node.spec.ts index 89cc16428c..b615ee4ed0 100644 --- a/packages/tests/tests/light-push/single_node/multiple_pubsub.node.spec.ts +++ b/packages/tests/tests/light-push/single_node/multiple_pubsub.node.spec.ts @@ -19,6 +19,7 @@ import { expect } from "chai"; import { afterEachCustom, beforeEachCustom, + isNwakuAtLeast, makeLogFileName, MessageCollector, resolveAutoshardingCluster, @@ -204,6 +205,12 @@ describe("Waku Light Push (Autosharding): Multiple PubsubTopics", function () { let nimPeerId: PeerId; + before(async () => { + if (!isNwakuAtLeast("0.27.0")) { + this.ctx.skip(); + } + }); + beforeEachCustom(this, async () => { [nwaku, waku] = await runNodes( this.ctx, diff --git a/packages/tests/tests/sharding/auto_sharding.spec.ts b/packages/tests/tests/sharding/auto_sharding.spec.ts index 6d2612ef8d..e1b623405a 100644 --- a/packages/tests/tests/sharding/auto_sharding.spec.ts +++ b/packages/tests/tests/sharding/auto_sharding.spec.ts @@ -14,6 +14,7 @@ import { expect } from "chai"; import { afterEachCustom, beforeEachCustom, + isNwakuAtLeast, makeLogFileName, MessageCollector, resolveAutoshardingCluster, @@ -31,6 +32,12 @@ describe("Autosharding: Running Nodes", function () { let nwaku: ServiceNode; let messageCollector: MessageCollector; + before(async () => { + if (!isNwakuAtLeast("0.27.0")) { + this.ctx.skip(); + } + }); + beforeEachCustom(this, async () => { nwaku = new ServiceNode(makeLogFileName(this.ctx)); messageCollector = new MessageCollector(nwaku); diff --git a/packages/tests/tests/store/multiple_pubsub.spec.ts b/packages/tests/tests/store/multiple_pubsub.spec.ts index 240b1622fa..2fd1869397 100644 --- a/packages/tests/tests/store/multiple_pubsub.spec.ts +++ b/packages/tests/tests/store/multiple_pubsub.spec.ts @@ -11,6 +11,7 @@ import { expect } from "chai"; import { afterEachCustom, beforeEachCustom, + isNwakuAtLeast, makeLogFileName, NOISE_KEY_1, resolveAutoshardingCluster, @@ -215,6 +216,12 @@ describe("Waku Store (Autosharding), custom pubsub topic", function () { contentTopics: [customContentTopic1, customContentTopic2] }; + before(async () => { + if (!isNwakuAtLeast("0.27.0")) { + this.ctx.skip(); + } + }); + beforeEachCustom(this, async () => { nwaku = new ServiceNode(makeLogFileName(this.ctx)); await nwaku.start({ From 9987c70142dc6d271c141ffe4366a8cf5cf921f3 Mon Sep 17 00:00:00 2001 From: Florin Barbu Date: Thu, 14 Mar 2024 10:59:03 +0200 Subject: [PATCH 16/16] skip autosharding tests for nwaku < 0.27.0 --- packages/tests/tests/sharding/peer_management.spec.ts | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/packages/tests/tests/sharding/peer_management.spec.ts b/packages/tests/tests/sharding/peer_management.spec.ts index e3063a64ad..95341e43a9 100644 --- a/packages/tests/tests/sharding/peer_management.spec.ts +++ b/packages/tests/tests/sharding/peer_management.spec.ts @@ -20,6 +20,7 @@ import { afterEachCustom, beforeEachCustom, delay, + isNwakuAtLeast, makeLogFileName, resolveAutoshardingCluster, ServiceNode, @@ -209,6 +210,12 @@ describe("Autosharding: Peer Management", function () { const ContentTopic = "/myapp/1/latest/proto"; const clusterId = resolveAutoshardingCluster(8); + before(async () => { + if (!isNwakuAtLeast("0.27.0")) { + this.ctx.skip(); + } + }); + describe("Peer Exchange", function () { let waku: LightNode; let nwaku1: ServiceNode;