Skip to content

Commit

Permalink
fix rerun
Browse files Browse the repository at this point in the history
  • Loading branch information
vrtnd committed Nov 19, 2024
1 parent d845134 commit 91ff94e
Show file tree
Hide file tree
Showing 6 changed files with 35 additions and 60 deletions.
4 changes: 2 additions & 2 deletions serverless.yml
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ functions:
timeout: 900
memorySize: 1024
events:
- schedule: cron(0 0/12 * * ? *)
- schedule: cron(0 * * * ? *)
runAdapterByName:
handler: src/handlers/runAdapterByName.default
timeout: 900
Expand All @@ -168,7 +168,7 @@ functions:
timeout: 900
memorySize: 1024
events:
- schedule: cron(30 0/12 * * ? *)
- schedule: cron(30 * * * ? *)
getNetflows:
handler: src/handlers/getNetflows.default
timeout: 30
Expand Down
14 changes: 4 additions & 10 deletions src/handlers/dailyAggregateAllAdapters.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,11 @@ import { runAggregateDataHistorical } from "../utils/aggregate";
import bridgeNetworkData from "../data/bridgeNetworkData";

export default wrapScheduledLambda(async (_event) => {
const currentDate = new Date();
const yesterdayDate = new Date(currentDate);
yesterdayDate.setDate(currentDate.getDate() - 1);
const fourHoursAgo = convertToUnixTimestamp(new Date()) - 60 * 60 * 4;
const now = convertToUnixTimestamp(new Date());

const startOfYesterday = new Date(yesterdayDate.setUTCHours(0, 0, 0, 0));
const endOfYesterday = new Date(yesterdayDate.setUTCHours(23, 59, 59, 999));

const startTimestamp = convertToUnixTimestamp(startOfYesterday);
const endTimestamp = convertToUnixTimestamp(endOfYesterday);

console.log(`Aggregating data for ${startOfYesterday.toISOString()} to ${endOfYesterday.toISOString()}`);
const startTimestamp = fourHoursAgo;
const endTimestamp = now;

for (const adapter of bridgeNetworkData) {
await runAggregateDataHistorical(startTimestamp, endTimestamp, adapter.id, false);
Expand Down
44 changes: 24 additions & 20 deletions src/handlers/runAdapterFromTo.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import bridgeNetworks from "../data/bridgeNetworkData";
import { runAdapterHistorical } from "../utils/adapter";
import { sql } from "../utils/db";
import { getBridgeID } from "../utils/wrappa/postgres/query";
import { getLatestBlock } from "../utils/blocks";

const handler = async (event: any) => {
try {
Expand All @@ -29,33 +30,38 @@ const handler = async (event: any) => {
console.error(`Could not find bridge config for ${nChain} on ${bridgeName}`);
return;
}

const fromTx = await sql<{ tx_block: number }[]>`
SELECT tx_block FROM bridges.transactions
WHERE bridge_id = ${bridgeConfig.id}
let fromBlock, toBlock;
if (fromTimestamp) {
const fromTx = await sql<{ tx_block: number }[]>`
SELECT tx_block FROM bridges.transactions
WHERE bridge_id = ${bridgeConfig.id}
AND chain = ${nChain}
AND tx_block IS NOT NULL
AND ts <= to_timestamp(${fromTimestamp})
ORDER BY ts DESC LIMIT 1
`;

const toTx = await sql<{ tx_block: number }[]>`
SELECT tx_block FROM bridges.transactions
WHERE bridge_id = ${bridgeConfig.id}
AND chain = ${nChain}
AND tx_block IS NOT NULL
AND ts >= to_timestamp(${toTimestamp})
ORDER BY ts ASC LIMIT 1
`;
`;
fromBlock = fromTx[0].tx_block;
}
if (toTimestamp) {
const toTx = await sql<{ tx_block: number }[]>`
SELECT tx_block FROM bridges.transactions
WHERE bridge_id = ${bridgeConfig.id}
AND chain = ${nChain}
AND tx_block IS NOT NULL
AND ts >= to_timestamp(${toTimestamp})
ORDER BY ts ASC LIMIT 1
`;
toBlock = toTx[0].tx_block;
} else {
const latestBlock = await getLatestBlock(nChain);
toBlock = latestBlock.number;
}

if (!fromTx.length || !toTx.length) {
if (!fromBlock || !toBlock) {
console.error(`Could not find transactions with blocks for ${nChain} on ${bridgeName}`);
return;
}

const fromBlock = fromTx[0].tx_block;
const toBlock = toTx[0].tx_block;

await runAdapterHistorical(fromBlock, toBlock, adapter.id, nChain, true, false, "upsert");

console.log(`Adapter ${bridgeName} ran successfully for chain ${nChain} from block ${fromBlock} to ${toBlock}`);
Expand All @@ -66,8 +72,6 @@ const handler = async (event: any) => {
console.log(`Adapter ${bridgeName} completed for all chains`);
} catch (e) {
console.error(`Adapter failed: ${JSON.stringify(e)}`);
} finally {
await sql.end();
}
};

Expand Down
5 changes: 2 additions & 3 deletions src/handlers/runAllAdaptersHistorical.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,12 @@ async function invokeLambda(functionName: string, event: any) {
const handler = async (_event: any) => {
await closeIdleConnections();
const now = Math.floor(Date.now() / 1000);
const dayAgo = now - 86400;
const fourHoursAgo = now - 60 * 60 * 4;

for (const bridge of bridgeNetworks) {
await invokeLambda("llama-bridges-prod-runAdapterFromTo", {
bridgeName: bridge.bridgeDbName,
fromTimestamp: dayAgo,
toTimestamp: now,
fromTimestamp: fourHoursAgo,
});
}

Expand Down
2 changes: 1 addition & 1 deletion src/helpers/solana.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { Connection } from "@solana/web3.js";

export const getConnection = (): Connection => {
const rpc = process.env.SOLANA_RPC || "https://api.mainnet-beta.solana.com";
const rpc = "https://api.mainnet-beta.solana.com";
const connection = new Connection(rpc);
const getBlock = async (block: number) => {
return new Connection(rpc).getBlock(block, {
Expand Down
26 changes: 2 additions & 24 deletions src/utils/blocks.ts
Original file line number Diff line number Diff line change
Expand Up @@ -60,30 +60,8 @@ const lookupBlock = async (timestamp: number, { chain }: { chain: Chain }) => {
};

async function getBlockTime(slotNumber: number) {
const response = await fetch(process.env.SOLANA_RPC as string, {
method: "POST",
headers: {
"Content-Type": "application/json",
},
body: JSON.stringify({
jsonrpc: "2.0",
id: 1,
method: "getBlockTime",
params: [slotNumber],
}),
});

if (!response.ok) {
throw new Error(`HTTP error! status: ${response.status}`);
}

const data = await response.json();

if (data.error) {
throw new Error(`RPC error: ${data.error.message}`);
}

return data.result;
const response = await connection.getBlockTime(slotNumber);
return response;
}

export async function getLatestBlock(chain: string, bridge?: string): Promise<{ number: number; timestamp: number }> {
Expand Down

0 comments on commit 91ff94e

Please sign in to comment.