Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: remove window reference and improve waitForRemotePeer #2194

Merged
merged 6 commits into from
Nov 8, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions packages/sdk/src/reliability_monitor/receiver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ export class ReceiverReliabilityMonitor {
return;
}

const timeout = window.setTimeout(
const timeout = setTimeout(
(async () => {
const receivedAnyMessage = this.verifiedPeers.has(peerIdStr);
const receivedTestMessage = this.receivedMessagesFormPeer.has(
Expand All @@ -136,7 +136,7 @@ export class ReceiverReliabilityMonitor {
await this.renewAndSubscribePeer(peerId);
}) as () => void,
MESSAGE_VERIFICATION_DELAY
);
) as unknown as number;

this.scheduledVerification.set(peerIdStr, timeout);
}
Expand Down
8 changes: 4 additions & 4 deletions packages/sdk/src/waku/wait_for_remote_peer.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import sinon from "sinon";
import { waitForRemotePeer } from "./wait_for_remote_peer.js";
import { WakuNode } from "./waku.js";

describe("waitForRemotePeer", () => {
describe.only("waitForRemotePeer", () => {
let eventTarget = new EventTarget();

beforeEach(() => {
Expand Down Expand Up @@ -121,8 +121,8 @@ describe("waitForRemotePeer", () => {
});

it("should check connected peers if present and suitable", async () => {
const addEventListenerSpy = sinon.spy(eventTarget.addEventListener);
eventTarget.addEventListener = addEventListenerSpy;
const removeEventListenerSpy = sinon.spy(eventTarget.removeEventListener);
eventTarget.removeEventListener = removeEventListenerSpy;

const wakuNode = mockWakuNode({
isStarted: true,
Expand All @@ -144,7 +144,7 @@ describe("waitForRemotePeer", () => {
}

expect(err).to.be.undefined;
expect(addEventListenerSpy.notCalled).to.be.true;
expect(removeEventListenerSpy.notCalled).to.be.true;
});

it("should wait for LightPush peer to be connected", async () => {
Expand Down
91 changes: 54 additions & 37 deletions packages/sdk/src/waku/wait_for_remote_peer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -40,57 +40,76 @@
throw Error("Waku node is not started");
}

for (const protocol of protocols) {
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this improves error handling since Promise.any propagates differently

switch (protocol) {
case Protocols.Relay:
if (!waku.relay)
throw Error("Cannot wait for Relay peer: protocol not mounted");
break;
case Protocols.LightPush:
if (!waku.lightPush)
throw Error("Cannot wait for LightPush peer: protocol not mounted");
break;
case Protocols.Store:
if (!waku.store)
throw Error("Cannot wait for Store peer: protocol not mounted");
break;
case Protocols.Filter:
if (!waku.filter)
throw Error("Cannot wait for Filter peer: protocol not mounted");
break;
}
}

const promises = [waitForProtocols(waku, protocols)];

if (connections.length > 0 && !protocols.includes(Protocols.Relay)) {
const success = await waitForMetadata(waku, protocols);
promises.push(
waitForMetadata(waku, protocols) as unknown as Promise<any[]>

Check warning on line 68 in packages/sdk/src/waku/wait_for_remote_peer.ts

View workflow job for this annotation

GitHub Actions / check

Unexpected any. Specify a different type

Check warning on line 68 in packages/sdk/src/waku/wait_for_remote_peer.ts

View workflow job for this annotation

GitHub Actions / proto

Unexpected any. Specify a different type
);
}

if (success) {
return;
}
if (timeoutMs) {
await rejectOnTimeout(
Promise.any(promises),
timeoutMs,
"Timed out waiting for a remote peer."
);
} else {
await Promise.any(promises);
}
}

type EventListener = (_: CustomEvent<IdentifyResult>) => void;

/**
* Waits for required peers to be connected.
*/
async function waitForProtocols(
waku: IWaku,
protocols: Protocols[]
): Promise<any[]> {

Check warning on line 91 in packages/sdk/src/waku/wait_for_remote_peer.ts

View workflow job for this annotation

GitHub Actions / check

Unexpected any. Specify a different type

Check warning on line 91 in packages/sdk/src/waku/wait_for_remote_peer.ts

View workflow job for this annotation

GitHub Actions / proto

Unexpected any. Specify a different type
const promises = [];

if (protocols.includes(Protocols.Relay)) {
if (!waku.relay) {
throw Error("Cannot wait for Relay peer: protocol not mounted");
}
if (waku.relay && protocols.includes(Protocols.Relay)) {
promises.push(waku.relay.waitForPeers());
}

if (protocols.includes(Protocols.Store)) {
if (!waku.store) {
throw Error("Cannot wait for Store peer: protocol not mounted");
}
if (waku.store && protocols.includes(Protocols.Store)) {
promises.push(waitForConnectedPeer(StoreCodec, waku.libp2p));
}

if (protocols.includes(Protocols.LightPush)) {
if (!waku.lightPush) {
throw Error("Cannot wait for LightPush peer: protocol not mounted");
}
if (waku.lightPush && protocols.includes(Protocols.LightPush)) {
promises.push(waitForConnectedPeer(LightPushCodec, waku.libp2p));
}

if (protocols.includes(Protocols.Filter)) {
if (!waku.filter) {
throw new Error("Cannot wait for Filter peer: protocol not mounted");
}
if (waku.filter && protocols.includes(Protocols.Filter)) {
promises.push(waitForConnectedPeer(FilterCodecs.SUBSCRIBE, waku.libp2p));
}

if (timeoutMs) {
await rejectOnTimeout(
Promise.all(promises),
timeoutMs,
"Timed out waiting for a remote peer."
);
} else {
await Promise.all(promises);
}
return Promise.all(promises);
}

type EventListener = (_: CustomEvent<IdentifyResult>) => void;

/**
* Wait for a peer with the given protocol to be connected.
* If sharding is enabled on the node, it will also wait for the peer to be confirmed by the metadata service.
Expand Down Expand Up @@ -135,12 +154,12 @@
}

/**
* Waits for the metadata from the remote peer.
* Checks existing connections for needed metadata.
*/
async function waitForMetadata(
waku: IWaku,
protocols: Protocols[]
): Promise<boolean> {
): Promise<void> {
const connectedPeers = waku.libp2p.getPeers();
const metadataService = waku.libp2p.services.metadata;
const enabledCodes = mapProtocolsToCodecs(protocols);
Expand All @@ -149,7 +168,7 @@
log.info(
`Skipping waitForMetadata due to missing connections:${connectedPeers.length} or metadataService:${!!metadataService}`
);
return false;
return;
}

for (const peerId of connectedPeers) {
Expand All @@ -173,7 +192,7 @@
);

if (confirmedAllCodecs) {
return true;
return;
}
}
}
Expand All @@ -187,8 +206,6 @@
continue;
}
}

return false;
}

const awaitTimeout = (ms: number, rejectReason: string): Promise<void> =>
Expand Down
Loading