From 91ff94e637585595933f9de2bdd52bc89eab20a0 Mon Sep 17 00:00:00 2001 From: vrtnd Date: Tue, 19 Nov 2024 16:21:41 +0300 Subject: [PATCH] fix rerun --- serverless.yml | 4 +-- src/handlers/dailyAggregateAllAdapters.ts | 14 +++----- src/handlers/runAdapterFromTo.ts | 44 ++++++++++++----------- src/handlers/runAllAdaptersHistorical.ts | 5 ++- src/helpers/solana.ts | 2 +- src/utils/blocks.ts | 26 ++------------ 6 files changed, 35 insertions(+), 60 deletions(-) diff --git a/serverless.yml b/serverless.yml index f5c04243..a76f3ff7 100644 --- a/serverless.yml +++ b/serverless.yml @@ -158,7 +158,7 @@ functions: timeout: 900 memorySize: 1024 events: - - schedule: cron(0 0/12 * * ? *) + - schedule: cron(0 * * * ? *) runAdapterByName: handler: src/handlers/runAdapterByName.default timeout: 900 @@ -168,7 +168,7 @@ functions: timeout: 900 memorySize: 1024 events: - - schedule: cron(30 0/12 * * ? *) + - schedule: cron(30 * * * ? *) getNetflows: handler: src/handlers/getNetflows.default timeout: 30 diff --git a/src/handlers/dailyAggregateAllAdapters.ts b/src/handlers/dailyAggregateAllAdapters.ts index 90f68420..ed0ce244 100644 --- a/src/handlers/dailyAggregateAllAdapters.ts +++ b/src/handlers/dailyAggregateAllAdapters.ts @@ -4,17 +4,11 @@ import { runAggregateDataHistorical } from "../utils/aggregate"; import bridgeNetworkData from "../data/bridgeNetworkData"; export default wrapScheduledLambda(async (_event) => { - const currentDate = new Date(); - const yesterdayDate = new Date(currentDate); - yesterdayDate.setDate(currentDate.getDate() - 1); + const fourHoursAgo = convertToUnixTimestamp(new Date()) - 60 * 60 * 4; + const now = convertToUnixTimestamp(new Date()); - const startOfYesterday = new Date(yesterdayDate.setUTCHours(0, 0, 0, 0)); - const endOfYesterday = new Date(yesterdayDate.setUTCHours(23, 59, 59, 999)); - - const startTimestamp = convertToUnixTimestamp(startOfYesterday); - const endTimestamp = convertToUnixTimestamp(endOfYesterday); - - console.log(`Aggregating data for ${startOfYesterday.toISOString()} to ${endOfYesterday.toISOString()}`); + const startTimestamp = fourHoursAgo; + const endTimestamp = now; for (const adapter of bridgeNetworkData) { await runAggregateDataHistorical(startTimestamp, endTimestamp, adapter.id, false); diff --git a/src/handlers/runAdapterFromTo.ts b/src/handlers/runAdapterFromTo.ts index 12a2cd70..5a7c0ebf 100644 --- a/src/handlers/runAdapterFromTo.ts +++ b/src/handlers/runAdapterFromTo.ts @@ -3,6 +3,7 @@ import bridgeNetworks from "../data/bridgeNetworkData"; import { runAdapterHistorical } from "../utils/adapter"; import { sql } from "../utils/db"; import { getBridgeID } from "../utils/wrappa/postgres/query"; +import { getLatestBlock } from "../utils/blocks"; const handler = async (event: any) => { try { @@ -29,33 +30,38 @@ const handler = async (event: any) => { console.error(`Could not find bridge config for ${nChain} on ${bridgeName}`); return; } - - const fromTx = await sql<{ tx_block: number }[]>` - SELECT tx_block FROM bridges.transactions - WHERE bridge_id = ${bridgeConfig.id} + let fromBlock, toBlock; + if (fromTimestamp) { + const fromTx = await sql<{ tx_block: number }[]>` + SELECT tx_block FROM bridges.transactions + WHERE bridge_id = ${bridgeConfig.id} AND chain = ${nChain} AND tx_block IS NOT NULL AND ts <= to_timestamp(${fromTimestamp}) ORDER BY ts DESC LIMIT 1 - `; - - const toTx = await sql<{ tx_block: number }[]>` - SELECT tx_block FROM bridges.transactions - WHERE bridge_id = ${bridgeConfig.id} - AND chain = ${nChain} - AND tx_block IS NOT NULL - AND ts >= to_timestamp(${toTimestamp}) - ORDER BY ts ASC LIMIT 1 - `; + `; + fromBlock = fromTx[0].tx_block; + } + if (toTimestamp) { + const toTx = await sql<{ tx_block: number }[]>` + SELECT tx_block FROM bridges.transactions + WHERE bridge_id = ${bridgeConfig.id} + AND chain = ${nChain} + AND tx_block IS NOT NULL + AND ts >= to_timestamp(${toTimestamp}) + ORDER BY ts ASC LIMIT 1 + `; + toBlock = toTx[0].tx_block; + } else { + const latestBlock = await getLatestBlock(nChain); + toBlock = latestBlock.number; + } - if (!fromTx.length || !toTx.length) { + if (!fromBlock || !toBlock) { console.error(`Could not find transactions with blocks for ${nChain} on ${bridgeName}`); return; } - const fromBlock = fromTx[0].tx_block; - const toBlock = toTx[0].tx_block; - await runAdapterHistorical(fromBlock, toBlock, adapter.id, nChain, true, false, "upsert"); console.log(`Adapter ${bridgeName} ran successfully for chain ${nChain} from block ${fromBlock} to ${toBlock}`); @@ -66,8 +72,6 @@ const handler = async (event: any) => { console.log(`Adapter ${bridgeName} completed for all chains`); } catch (e) { console.error(`Adapter failed: ${JSON.stringify(e)}`); - } finally { - await sql.end(); } }; diff --git a/src/handlers/runAllAdaptersHistorical.ts b/src/handlers/runAllAdaptersHistorical.ts index 4e8cea3b..db415b2a 100644 --- a/src/handlers/runAllAdaptersHistorical.ts +++ b/src/handlers/runAllAdaptersHistorical.ts @@ -25,13 +25,12 @@ async function invokeLambda(functionName: string, event: any) { const handler = async (_event: any) => { await closeIdleConnections(); const now = Math.floor(Date.now() / 1000); - const dayAgo = now - 86400; + const fourHoursAgo = now - 60 * 60 * 4; for (const bridge of bridgeNetworks) { await invokeLambda("llama-bridges-prod-runAdapterFromTo", { bridgeName: bridge.bridgeDbName, - fromTimestamp: dayAgo, - toTimestamp: now, + fromTimestamp: fourHoursAgo, }); } diff --git a/src/helpers/solana.ts b/src/helpers/solana.ts index 01da9b67..89325895 100644 --- a/src/helpers/solana.ts +++ b/src/helpers/solana.ts @@ -1,7 +1,7 @@ import { Connection } from "@solana/web3.js"; export const getConnection = (): Connection => { - const rpc = process.env.SOLANA_RPC || "https://api.mainnet-beta.solana.com"; + const rpc = "https://api.mainnet-beta.solana.com"; const connection = new Connection(rpc); const getBlock = async (block: number) => { return new Connection(rpc).getBlock(block, { diff --git a/src/utils/blocks.ts b/src/utils/blocks.ts index 1df7bdee..e898fec3 100644 --- a/src/utils/blocks.ts +++ b/src/utils/blocks.ts @@ -60,30 +60,8 @@ const lookupBlock = async (timestamp: number, { chain }: { chain: Chain }) => { }; async function getBlockTime(slotNumber: number) { - const response = await fetch(process.env.SOLANA_RPC as string, { - method: "POST", - headers: { - "Content-Type": "application/json", - }, - body: JSON.stringify({ - jsonrpc: "2.0", - id: 1, - method: "getBlockTime", - params: [slotNumber], - }), - }); - - if (!response.ok) { - throw new Error(`HTTP error! status: ${response.status}`); - } - - const data = await response.json(); - - if (data.error) { - throw new Error(`RPC error: ${data.error.message}`); - } - - return data.result; + const response = await connection.getBlockTime(slotNumber); + return response; } export async function getLatestBlock(chain: string, bridge?: string): Promise<{ number: number; timestamp: number }> {