diff --git a/serverless.yml b/serverless.yml index 9af09626..f4f727c8 100644 --- a/serverless.yml +++ b/serverless.yml @@ -169,7 +169,14 @@ functions: memorySize: 1024 events: - schedule: cron(0 1 * * ? *) - + getNetflows: + handler: src/handlers/getNetflows.default + timeout: 30 + memorySize: 2000 + events: + - http: + path: netflows/{period} + method: get resources: # CORS for api gateway errors - ${file(resources/api-gateway-errors.yml)} diff --git a/src/handlers/getNetflows.ts b/src/handlers/getNetflows.ts new file mode 100644 index 00000000..8b5a4926 --- /dev/null +++ b/src/handlers/getNetflows.ts @@ -0,0 +1,18 @@ +import { IResponse, successResponse, errorResponse } from "../utils/lambda-response"; +import wrap from "../utils/wrap"; +import { getNetflows } from "../utils/wrappa/postgres/query"; + +const handler = async (event: AWSLambda.APIGatewayEvent): Promise => { + const period = event.pathParameters?.period?.toLowerCase() as "day" | "week" | "month"; + + if (!period || !["day", "week", "month"].includes(period)) { + return errorResponse({ + message: "Period must be one of: day, week, month", + }); + } + + const response = await getNetflows(period); + return successResponse(response, 10 * 60); // 10 mins cache +}; + +export default wrap(handler); diff --git a/src/utils/wrappa/postgres/query.ts b/src/utils/wrappa/postgres/query.ts index 019492ba..da6455d1 100644 --- a/src/utils/wrappa/postgres/query.ts +++ b/src/utils/wrappa/postgres/query.ts @@ -43,6 +43,8 @@ interface IAggregatedData { total_address_withdrawn: string[]; } +type TimePeriod = "day" | "week" | "month"; + const getBridgeID = async (bridgNetworkName: string, chain: string) => { return ( await sql` @@ -357,6 +359,41 @@ const getLast24HVolume = async (bridgeName: string, volumeType: VolumeType = "bo return parseFloat((+result[0].total_volume / 2).toString()); }; +const getNetflows = async (period: TimePeriod) => { + let intervalPeriod = sql``; + switch (period) { + case "day": + intervalPeriod = sql`interval '1 day'`; + break; + case "week": + intervalPeriod = sql`interval '1 week'`; + break; + case "month": + intervalPeriod = sql`interval '1 month'`; + break; + } + + return await sql<{ chain: string; net_flow: string }[]>` + WITH period_flows AS ( + SELECT + c.chain, + SUM(ha.total_deposited_usd - ha.total_withdrawn_usd) as net_flow + FROM bridges.hourly_aggregated ha + JOIN bridges.config c ON ha.bridge_id = c.id + WHERE ha.ts >= date_trunc(${period}, NOW()) - ${intervalPeriod} + AND ha.ts < date_trunc(${period}, NOW()) + AND LOWER(c.chain) NOT LIKE '%dydx%' + GROUP BY c.chain + ) + SELECT + chain, + net_flow + FROM period_flows + WHERE net_flow IS NOT NULL + ORDER BY ABS(net_flow) DESC; + `; +}; + export { getBridgeID, getConfigsWithDestChain, @@ -370,4 +407,5 @@ export { queryAggregatedDailyTimestampRange, queryAggregatedHourlyTimestampRange, getLast24HVolume, + getNetflows, };