Skip to content

Commit

Permalink
rm rpc watcher, use sg instead
Browse files Browse the repository at this point in the history
  • Loading branch information
rouzwelt committed Nov 11, 2024
1 parent 331449f commit d0863a5
Show file tree
Hide file tree
Showing 7 changed files with 339 additions and 661 deletions.
105 changes: 57 additions & 48 deletions src/cli.ts
Original file line number Diff line number Diff line change
@@ -1,27 +1,26 @@
import { config } from "dotenv";
import { SgOrder } from "./query";
import { Command } from "commander";
import { getMetaInfo } from "./config";
import { BigNumber, ethers } from "ethers";
import { Context } from "@opentelemetry/api";
import { getAddOrders, SgOrder } from "./query";
import { Resource } from "@opentelemetry/resources";
import { getOrderDetails, clear, getConfig } from ".";
import { ErrorSeverity, errorSnapshot } from "./error";
import { Tracer } from "@opentelemetry/sdk-trace-base";
import { ProcessPairReportStatus } from "./processOrders";
import { sleep, getOrdersTokens, isBigNumberish } from "./utils";
import { CompressionAlgorithm } from "@opentelemetry/otlp-exporter-base";
import { BotConfig, BundledOrders, CliOptions, ViemClient } from "./types";
import { OTLPTraceExporter } from "@opentelemetry/exporter-trace-otlp-http";
import { SEMRESATTRS_SERVICE_NAME } from "@opentelemetry/semantic-conventions";
import { sleep, getOrdersTokens, isBigNumberish, getblockNumber } from "./utils";
import { getOrderbookOwnersProfileMapFromSg, prepareOrdersForRound } from "./order";
import { manageAccounts, rotateProviders, sweepToMainWallet, sweepToEth } from "./account";
import {
watchOrderbook,
watchAllOrderbooks,
WatchedOrderbookOrders,
handleOrderbooksNewLogs,
} from "./watcher";
prepareOrdersForRound,
getOrderbookOwnersProfileMapFromSg,
handleAddOrderbookOwnersProfileMap,
handleRemoveOrderbookOwnersProfileMap,
} from "./order";
import {
diag,
trace,
Expand Down Expand Up @@ -413,6 +412,7 @@ export async function startup(argv: any, version?: string, tracer?: Tracer, ctx?
}
}
}
const lastReadOrdersTimestamp = Math.floor(Date.now() / 1000);
const tokens = getOrdersTokens(ordersDetails);
options.tokens = [...tokens];

Expand All @@ -425,7 +425,6 @@ export async function startup(argv: any, version?: string, tracer?: Tracer, ctx?
tracer,
ctx,
);
const blockNumber = (await getblockNumber(config.viemClient as any as ViemClient)) ?? 1n;

return {
roundGap,
Expand All @@ -439,7 +438,7 @@ export async function startup(argv: any, version?: string, tracer?: Tracer, ctx?
(options as CliOptions).ownerProfile,
),
tokens,
blockNumber,
lastReadOrdersTimestamp,
};
}

Expand Down Expand Up @@ -485,7 +484,7 @@ export const main = async (argv: any, version?: string) => {
config,
orderbooksOwnersProfileMap,
tokens,
blockNumber: bn,
lastReadOrdersTimestamp,
} = await tracer.startActiveSpan("startup", async (startupSpan) => {
const ctx = trace.setSpan(context.active(), startupSpan);
try {
Expand All @@ -508,14 +507,11 @@ export const main = async (argv: any, version?: string) => {
}
});

let blockNumber = bn;
const obs: string[] = [];
const watchedOrderbooksOrders: Record<string, WatchedOrderbookOrders> = {};
orderbooksOwnersProfileMap.forEach((_, ob) => {
obs.push(ob.toLowerCase());
});
const unwatchers = watchAllOrderbooks(obs, config.watchClient, watchedOrderbooksOrders);

const lastReadOrdersTimestampMap = options.subgraph.map((v) => ({
sg: v,
lastReadTimestampAdd: lastReadOrdersTimestamp,
lastReadTimestampRemove: lastReadOrdersTimestamp,
}));
const day = 24 * 60 * 60 * 1000;
let lastGasReset = Date.now() + day;
let lastInterval = Date.now() + poolUpdateInterval;
Expand All @@ -539,27 +535,6 @@ export const main = async (argv: any, version?: string) => {
"meta.dockerTag": process?.env?.DOCKER_TAG ?? "N/A",
});

// watch new obs
for (const newOb of newMeta["meta.orderbooks"]) {
const ob = newOb.toLowerCase();
if (!obs.includes(ob)) {
obs.push(ob);
if (!watchedOrderbooksOrders[ob]) {
watchedOrderbooksOrders[ob] = { orderLogs: [] };
}
if (!unwatchers[ob]) {
unwatchers[ob] = watchOrderbook(
ob,
config.watchClient,
watchedOrderbooksOrders[ob],
blockNumber,
);
}
}
}
const tempBn = await getblockNumber(config.viemClient as any as ViemClient);
if (tempBn !== undefined) blockNumber = (tempBn * 95n) / 100n;

await tracer.startActiveSpan(
"check-wallet-balance",
{},
Expand Down Expand Up @@ -735,15 +710,49 @@ export const main = async (argv: any, version?: string) => {
}

try {
// check for new orders
await handleOrderbooksNewLogs(
orderbooksOwnersProfileMap,
watchedOrderbooksOrders,
config.viemClient as any as ViemClient,
tokens,
options.ownerProfile,
roundSpan,
// check for new orders changes
const now = Math.floor(Date.now() / 1000);
const addOrdersResult = await Promise.allSettled(
lastReadOrdersTimestampMap.map((v) =>
getAddOrders(v.sg, v.lastReadTimestampAdd, now, options.timeout, roundSpan),
),
);
for (let i = 0; i < addOrdersResult.length; i++) {
const res = addOrdersResult[i];
if (res.status === "fulfilled") {
lastReadOrdersTimestampMap[i].lastReadTimestampAdd = now;
await handleAddOrderbookOwnersProfileMap(
orderbooksOwnersProfileMap,
res.value.map((v) => v.order),
config.viemClient as any as ViemClient,
tokens,
options.ownerProfile,
roundSpan,
);
}
}
const rmOrdersResult = await Promise.allSettled(
lastReadOrdersTimestampMap.map((v) =>
getAddOrders(
v.sg,
v.lastReadTimestampRemove,
now,
options.timeout,
roundSpan,
),
),
);
for (let i = 0; i < rmOrdersResult.length; i++) {
const res = rmOrdersResult[i];
if (res.status === "fulfilled") {
lastReadOrdersTimestampMap[i].lastReadTimestampRemove = now;
await handleRemoveOrderbookOwnersProfileMap(
orderbooksOwnersProfileMap,
res.value.map((v) => v.order),
roundSpan,
);
}
}
} catch {
/**/
}
Expand Down
118 changes: 108 additions & 10 deletions src/order.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { OrderV3 } from "./abis";
import { SgOrder } from "./query";
import { toOrder } from "./watcher";
import { Span } from "@opentelemetry/api";
import { hexlify } from "ethers/lib/utils";
import { getTokenSymbol, shuffleArray } from "./utils";
import { decodeAbiParameters, parseAbiParameters } from "viem";
import {
Expand All @@ -19,6 +20,28 @@ import {
*/
export const DEFAULT_OWNER_LIMIT = 25 as const;

export function toOrder(orderLog: any): Order {
return {
owner: orderLog.owner.toLowerCase(),
nonce: orderLog.nonce.toLowerCase(),
evaluable: {
interpreter: orderLog.evaluable.interpreter.toLowerCase(),
store: orderLog.evaluable.store.toLowerCase(),
bytecode: orderLog.evaluable.bytecode.toLowerCase(),
},
validInputs: orderLog.validInputs.map((v: any) => ({
token: v.token.toLowerCase(),
decimals: v.decimals,
vaultId: hexlify(v.vaultId),
})),
validOutputs: orderLog.validOutputs.map((v: any) => ({
token: v.token.toLowerCase(),
decimals: v.decimals,
vaultId: hexlify(v.vaultId),
})),
};
}

/**
* Get all pairs of an order
*/
Expand Down Expand Up @@ -96,17 +119,19 @@ export async function getOrderPairs(
}
return pairs;
}

/**
* Get a map of per owner orders per orderbook
* @param ordersDetails - Order details queried from subgraph
* Handles new orders fetched from sg to the owner profile map
*/
export async function getOrderbookOwnersProfileMapFromSg(
export async function handleAddOrderbookOwnersProfileMap(
orderbooksOwnersProfileMap: OrderbooksOwnersProfileMap,
ordersDetails: SgOrder[],
viemClient: ViemClient,
tokens: TokenDetails[],
ownerLimits?: Record<string, number>,
): Promise<OrderbooksOwnersProfileMap> {
const orderbookOwnersProfileMap: OrderbooksOwnersProfileMap = new Map();
span?: Span,
) {
const changes: Record<string, string[]> = {};
for (let i = 0; i < ordersDetails.length; i++) {
const orderDetails = ordersDetails[i];
const orderbook = orderDetails.orderbook.id.toLowerCase();
Expand All @@ -116,11 +141,18 @@ export async function getOrderbookOwnersProfileMapFromSg(
orderDetails.orderBytes as `0x${string}`,
)[0],
);
const orderbookOwnerProfileItem = orderbookOwnersProfileMap.get(orderbook);
if (span) {
if (!changes[orderbook]) changes[orderbook] = [];
if (!changes[orderbook].includes(orderDetails.orderHash.toLowerCase())) {
changes[orderbook].push(orderDetails.orderHash.toLowerCase());
}
}
const orderbookOwnerProfileItem = orderbooksOwnersProfileMap.get(orderbook);
if (orderbookOwnerProfileItem) {
const ownerProfile = orderbookOwnerProfileItem.get(orderStruct.owner.toLowerCase());
if (ownerProfile) {
if (!ownerProfile.orders.has(orderDetails.orderHash.toLowerCase())) {
const order = ownerProfile.orders.get(orderDetails.orderHash.toLowerCase());
if (!order) {
ownerProfile.orders.set(orderDetails.orderHash.toLowerCase(), {
active: true,
order: orderStruct,
Expand All @@ -132,6 +164,8 @@ export async function getOrderbookOwnersProfileMapFromSg(
),
consumedTakeOrders: [],
});
} else {
if (!order.active) order.active = true;
}
} else {
const ordersProfileMap: OrdersProfileMap = new Map();
Expand Down Expand Up @@ -159,10 +193,74 @@ export async function getOrderbookOwnersProfileMapFromSg(
limit: ownerLimits?.[orderStruct.owner.toLowerCase()] ?? DEFAULT_OWNER_LIMIT,
orders: ordersProfileMap,
});
orderbookOwnersProfileMap.set(orderbook, ownerProfileMap);
orderbooksOwnersProfileMap.set(orderbook, ownerProfileMap);
}
}
if (span) {
for (const orderbook in changes) {
span.setAttribute(`orderbooksChanges.${orderbook}.addedOrders`, changes[orderbook]);
}
}
}

/**
* Handles new removed orders fetched from sg to the owner profile map
*/
export async function handleRemoveOrderbookOwnersProfileMap(
orderbooksOwnersProfileMap: OrderbooksOwnersProfileMap,
ordersDetails: SgOrder[],
span?: Span,
) {
const changes: Record<string, string[]> = {};
for (let i = 0; i < ordersDetails.length; i++) {
const orderDetails = ordersDetails[i];
const orderbook = orderDetails.orderbook.id.toLowerCase();
const orderStruct = toOrder(
decodeAbiParameters(
parseAbiParameters(OrderV3),
orderDetails.orderBytes as `0x${string}`,
)[0],
);
if (span) {
if (!changes[orderbook]) changes[orderbook] = [];
if (!changes[orderbook].includes(orderDetails.orderHash.toLowerCase())) {
changes[orderbook].push(orderDetails.orderHash.toLowerCase());
}
}
const orderbookOwnerProfileItem = orderbooksOwnersProfileMap.get(orderbook);
if (orderbookOwnerProfileItem) {
const ownerProfile = orderbookOwnerProfileItem.get(orderStruct.owner.toLowerCase());
if (ownerProfile) {
ownerProfile.orders.delete(orderDetails.orderHash.toLowerCase());
}
}
}
if (span) {
for (const orderbook in changes) {
span.setAttribute(`orderbooksChanges.${orderbook}.removedOrders`, changes[orderbook]);
}
}
return orderbookOwnersProfileMap;
}

/**
* Get a map of per owner orders per orderbook
* @param ordersDetails - Order details queried from subgraph
*/
export async function getOrderbookOwnersProfileMapFromSg(
ordersDetails: SgOrder[],
viemClient: ViemClient,
tokens: TokenDetails[],
ownerLimits?: Record<string, number>,
): Promise<OrderbooksOwnersProfileMap> {
const orderbooksOwnersProfileMap: OrderbooksOwnersProfileMap = new Map();
await handleAddOrderbookOwnersProfileMap(
orderbooksOwnersProfileMap,
ordersDetails,
viemClient,
tokens,
ownerLimits,
);
return orderbooksOwnersProfileMap;
}

/**
Expand Down
Loading

0 comments on commit d0863a5

Please sign in to comment.