Skip to content

Commit

Permalink
feat(express-relay): Support websocket in the js sdk (#1301)
Browse files Browse the repository at this point in the history
  • Loading branch information
m30m authored Feb 22, 2024
1 parent 3c1348c commit f22c0c8
Show file tree
Hide file tree
Showing 6 changed files with 423 additions and 171 deletions.
14 changes: 6 additions & 8 deletions express_relay/sdk/js/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -35,24 +35,22 @@ import {

const client = new Client({ baseUrl: "https://per-staging.dourolabs.app/" });

function calculateOpportunityBid(
opportunity: OpportunityParams
): BidInfo | null {
function calculateOpportunityBid(opportunity: Opportunity): BidInfo | null {
// searcher implementation here
// if the opportunity is not suitable for the searcher, return null
}
const opportunities = await client.getOpportunities();

for (const opportunity of opportunities) {
const bidInfo = calculateOpportunityBid(order);
if (bidInfo === null) continue;
client.setOpportunityHandler(async (opportunity: Opportunity) => {
const bidInfo = calculateOpportunityBid(opportunity);
if (bidInfo === null) return;
const opportunityBid = await client.signOpportunityBid(
opportunity,
bidInfo,
privateKey // searcher private key with appropriate permissions and assets
);
await client.submitOpportunityBid(opportunityBid);
}
});
await client.subscribeChains([chain_id]); // chain id you want to subscribe to
```

### Example
Expand Down
10 changes: 6 additions & 4 deletions express_relay/sdk/js/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@pythnetwork/express-relay-evm-js",
"version": "0.1.0",
"version": "0.1.1",
"description": "Utilities for interacting with the express relay protocol",
"homepage": "https://pyth.network",
"author": {
Expand Down Expand Up @@ -36,20 +36,22 @@
"directory": "express_relay/sdk/js"
},
"dependencies": {
"isomorphic-ws": "^5.0.0",
"openapi-client-axios": "^7.5.4",
"openapi-fetch": "^0.8.2",
"viem": "^2.7.6"
"viem": "^2.7.6",
"ws": "^8.16.0"
},
"devDependencies": {
"@types/yargs": "^17.0.10",
"@typescript-eslint/eslint-plugin": "^5.21.0",
"@typescript-eslint/parser": "^5.21.0",
"eslint": "^8.56.0",
"jest": "^27.5.1",
"openapi-typescript": "^6.5.5",
"prettier": "^2.6.2",
"typescript": "^5.1",
"yargs": "^17.4.1",
"jest": "^27.5.1"
"yargs": "^17.4.1"
},
"license": "Apache-2.0"
}
64 changes: 32 additions & 32 deletions express_relay/sdk/js/src/examples/SimpleSearcher.ts
Original file line number Diff line number Diff line change
@@ -1,13 +1,9 @@
import yargs from "yargs";
import { hideBin } from "yargs/helpers";
import { Client } from "../index";
import { checkHex, Client } from "../index";
import { privateKeyToAccount } from "viem/accounts";
import { isHex } from "viem";

function sleep(ms: number) {
return new Promise((resolve) => setTimeout(resolve, ms));
}

const argv = yargs(hideBin(process.argv))
.option("endpoint", {
description:
Expand All @@ -18,6 +14,7 @@ const argv = yargs(hideBin(process.argv))
.option("chain-id", {
description: "Chain id to fetch opportunities for. e.g: sepolia",
type: "string",
demandOption: true,
})
.option("bid", {
description: "Bid amount in wei",
Expand All @@ -43,35 +40,38 @@ async function run() {
throw new Error(`Invalid private key: ${argv.privateKey}`);
}
const DAY_IN_SECONDS = 60 * 60 * 24;
// eslint-disable-next-line no-constant-condition
while (true) {
const opportunities = await client.getOpportunities(argv.chainId);
console.log(`Fetched ${opportunities.length} opportunities`);
for (const opportunity of opportunities) {
const bid = BigInt(argv.bid);
// Bid info should be generated by evaluating the opportunity
// here for simplicity we are using a constant bid and 24 hours of validity
const bidInfo = {
amount: bid,
validUntil: BigInt(Math.round(Date.now() / 1000 + DAY_IN_SECONDS)),
};
const opportunityBid = await client.signOpportunityBid(
opportunity,
bidInfo,
argv.privateKey
client.setOpportunityHandler(async (opportunity) => {
const bid = BigInt(argv.bid);
// Bid info should be generated by evaluating the opportunity
// here for simplicity we are using a constant bid and 24 hours of validity
const bidInfo = {
amount: bid,
validUntil: BigInt(Math.round(Date.now() / 1000 + DAY_IN_SECONDS)),
};
const opportunityBid = await client.signOpportunityBid(
opportunity,
bidInfo,
checkHex(argv.privateKey)
);
try {
await client.submitOpportunityBid(opportunityBid);
console.log(
`Successful bid ${bid} on opportunity ${opportunity.opportunityId}`
);
} catch (error) {
console.error(
`Failed to bid on opportunity ${opportunity.opportunityId}: ${error}`
);
try {
await client.submitOpportunityBid(opportunityBid);
console.log(
`Successful bid ${bid} on opportunity ${opportunity.opportunityId}`
);
} catch (error) {
console.error(
`Failed to bid on opportunity ${opportunity.opportunityId}: ${error}`
);
}
}
await sleep(5000);
});
try {
await client.subscribeChains([argv.chainId]);
console.log(
`Subscribed to chain ${argv.chainId}. Waiting for opportunities...`
);
} catch (error) {
console.error(error);
client.websocket?.close();
}
}

Expand Down
190 changes: 171 additions & 19 deletions express_relay/sdk/js/src/index.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import type { paths } from "./types";
import createClient, { ClientOptions } from "openapi-fetch";
import type { paths, components } from "./types";
import createClient, {
ClientOptions as FetchClientOptions,
} from "openapi-fetch";
import {
Address,
encodeAbiParameters,
Expand All @@ -10,7 +12,7 @@ import {
keccak256,
} from "viem";
import { privateKeyToAccount, sign, signatureToHex } from "viem/accounts";

import WebSocket from "isomorphic-ws";
/**
* ERC20 token with contract address and amount
*/
Expand Down Expand Up @@ -118,11 +120,172 @@ function checkTokenQty(token: { contract: string; amount: string }): TokenQty {
};
}

type ClientOptions = FetchClientOptions & { baseUrl: string };

export interface WsOptions {
/**
* Max time to wait for a response from the server in milliseconds
*/
response_timeout: number;
}

const DEFAULT_WS_OPTIONS: WsOptions = {
response_timeout: 5000,
};

export class Client {
private clientOptions?: ClientOptions;
public clientOptions: ClientOptions;
public wsOptions: WsOptions;
public websocket?: WebSocket;
public idCounter = 0;
public callbackRouter: Record<
string,
(response: components["schemas"]["ServerResultMessage"]) => void
> = {};
private websocketOpportunityCallback?: (
opportunity: Opportunity
) => Promise<void>;

constructor(clientOptions?: ClientOptions) {
constructor(clientOptions: ClientOptions, wsOptions?: WsOptions) {
this.clientOptions = clientOptions;
this.wsOptions = { ...DEFAULT_WS_OPTIONS, ...wsOptions };
}

private connectWebsocket() {
const websocketEndpoint = new URL(this.clientOptions.baseUrl);
websocketEndpoint.protocol =
websocketEndpoint.protocol === "https:" ? "wss:" : "ws:";
websocketEndpoint.pathname = "/v1/ws";

this.websocket = new WebSocket(websocketEndpoint.toString());
this.websocket.on("message", async (data) => {
const message:
| components["schemas"]["ServerResultResponse"]
| components["schemas"]["ServerUpdateResponse"] = JSON.parse(
data.toString()
);
if ("id" in message && message.id) {
const callback = this.callbackRouter[message.id];
if (callback !== undefined) {
callback(message);
delete this.callbackRouter[message.id];
}
} else if ("type" in message && message.type === "new_opportunity") {
if (this.websocketOpportunityCallback !== undefined) {
const convertedOpportunity = this.convertOpportunity(
message.opportunity
);
if (convertedOpportunity !== undefined) {
await this.websocketOpportunityCallback(convertedOpportunity);
}
}
} else if ("error" in message) {
// Can not route error messages to the callback router as they don't have an id
console.error(message.error);
}
});
}

/**
* Converts an opportunity from the server to the client format
* Returns undefined if the opportunity version is not supported
* @param opportunity
*/
private convertOpportunity(
opportunity: components["schemas"]["OpportunityParamsWithMetadata"]
): Opportunity | undefined {
if (opportunity.version != "v1") {
console.warn(
`Can not handle opportunity version: ${opportunity.version}. Please upgrade your client.`
);
return undefined;
}
return {
chainId: opportunity.chain_id,
opportunityId: opportunity.opportunity_id,
permissionKey: checkHex(opportunity.permission_key),
contract: checkAddress(opportunity.contract),
calldata: checkHex(opportunity.calldata),
value: BigInt(opportunity.value),
repayTokens: opportunity.repay_tokens.map(checkTokenQty),
receiptTokens: opportunity.receipt_tokens.map(checkTokenQty),
};
}

public setOpportunityHandler(
callback: (opportunity: Opportunity) => Promise<void>
) {
this.websocketOpportunityCallback = callback;
}

/**
* Subscribes to the specified chains
*
* The opportunity handler will be called for opportunities on the specified chains
* If the opportunity handler is not set, an error will be thrown
* @param chains
*/
async subscribeChains(chains: string[]) {
if (this.websocketOpportunityCallback === undefined) {
throw new Error("Opportunity handler not set");
}
return this.sendWebsocketMessage({
method: "subscribe",
params: {
chain_ids: chains,
},
});
}

/**
* Unsubscribes from the specified chains
*
* The opportunity handler will no longer be called for opportunities on the specified chains
* @param chains
*/
async unsubscribeChains(chains: string[]) {
return this.sendWebsocketMessage({
method: "unsubscribe",
params: {
chain_ids: chains,
},
});
}

async sendWebsocketMessage(
msg: components["schemas"]["ClientMessage"]
): Promise<void> {
const msg_with_id: components["schemas"]["ClientRequest"] = {
...msg,
id: (this.idCounter++).toString(),
};
return new Promise((resolve, reject) => {
this.callbackRouter[msg_with_id.id] = (response) => {
if (response.status === "success") {
resolve();
} else {
reject(response.result);
}
};
if (this.websocket === undefined) {
this.connectWebsocket();
}
if (this.websocket !== undefined) {
if (this.websocket.readyState === WebSocket.CONNECTING) {
this.websocket.on("open", () => {
this.websocket?.send(JSON.stringify(msg_with_id));
});
} else if (this.websocket.readyState === WebSocket.OPEN) {
this.websocket.send(JSON.stringify(msg_with_id));
} else {
reject("Websocket connection closing or already closed");
}
}
setTimeout(() => {
delete this.callbackRouter[msg_with_id.id];
reject("Websocket response timeout");
}, this.wsOptions.response_timeout);
});
}

/**
Expand All @@ -138,22 +301,11 @@ export class Client {
throw new Error("No opportunities found");
}
return opportunities.data.flatMap((opportunity) => {
if (opportunity.version != "v1") {
console.warn(
`Can not handle opportunity version: ${opportunity.version}. Please upgrade your client.`
);
const convertedOpportunity = this.convertOpportunity(opportunity);
if (convertedOpportunity === undefined) {
return [];
}
return {
chainId: opportunity.chain_id,
opportunityId: opportunity.opportunity_id,
permissionKey: checkHex(opportunity.permission_key),
contract: checkAddress(opportunity.contract),
calldata: checkHex(opportunity.calldata),
value: BigInt(opportunity.value),
repayTokens: opportunity.repay_tokens.map(checkTokenQty),
receiptTokens: opportunity.receipt_tokens.map(checkTokenQty),
};
return convertedOpportunity;
});
}

Expand Down
Loading

0 comments on commit f22c0c8

Please sign in to comment.