diff --git a/.env.devnet b/.env.devnet index c562a53..bbee777 100644 --- a/.env.devnet +++ b/.env.devnet @@ -2,6 +2,7 @@ NETWORK=mainnet API_URL=https://api.multiversx.com DATA_API_CEX_URL=https://data-api.multiversx.com/v1/quotes/cex DATA_API_XEXCHANGE_URL=https://data-api.multiversx.com/v1/quotes/xexchange +DATA_API_HATOM_URL=https://data-api.multiversx.com/v1/quotes/hatom # DUNE_API_URL=http://localhost:3001/api/v1/table DUNE_API_URL=https://api.dune.com/api/v1/table DUNE_NAMESPACE=stefanmvx diff --git a/.env.mainnet b/.env.mainnet index aacbf4d..59b77d9 100644 --- a/.env.mainnet +++ b/.env.mainnet @@ -2,6 +2,7 @@ NETWORK=mainnet API_URL=https://api.multiversx.com DATA_API_CEX_URL=https://data-api.multiversx.com/v1/quotes/cex DATA_API_XEXCHANGE_URL=https://data-api.multiversx.com/v1/quotes/xexchange +DATA_API_HATOM_URL=https://data-api.multiversx.com/v1/quotes/hatom DUNE_API_URL=http://localhost:3001/api/v1/table # DUNE_API_URL=https://api.dune.com/api/v1/table DUNE_NAMESPACE=stefanmvx diff --git a/.env.testnet b/.env.testnet index c562a53..bbee777 100644 --- a/.env.testnet +++ b/.env.testnet @@ -2,6 +2,7 @@ NETWORK=mainnet API_URL=https://api.multiversx.com DATA_API_CEX_URL=https://data-api.multiversx.com/v1/quotes/cex DATA_API_XEXCHANGE_URL=https://data-api.multiversx.com/v1/quotes/xexchange +DATA_API_HATOM_URL=https://data-api.multiversx.com/v1/quotes/hatom # DUNE_API_URL=http://localhost:3001/api/v1/table DUNE_API_URL=https://api.dune.com/api/v1/table DUNE_NAMESPACE=stefanmvx diff --git a/.eslintrc.js b/.eslintrc.js index e8fb7e7..823fba4 100644 --- a/.eslintrc.js +++ b/.eslintrc.js @@ -24,8 +24,8 @@ module.exports = { }, 'boundaries/elements': [ { - type: 'apps/api', - pattern: 'apps/api', + type: 'apps/events-processor', + pattern: 'apps/events-processor', }, { type: 'apps/dune-simulator', @@ -70,7 +70,7 @@ module.exports = { default: 'disallow', rules: [ { - from: 'apps/api', + from: 'apps/events-processor', allow: ['libs/common', 'libs/entities', 'libs/services'] }, { @@ -83,12 +83,12 @@ module.exports = { }, { from: 'libs/services', - allow: ['libs/common', 'libs/entities', 'libs/database', 'apps/api', 'apps/dune-simulator'] + allow: ['libs/common', 'libs/entities', 'libs/database', 'apps/events-processor', 'apps/dune-simulator'] }, { from: 'libs/common', allow: ['libs/entities'] - } + }, ] }], 'boundaries/no-unknown': [2], diff --git a/.multiversx/config/config.yaml b/.multiversx/config/config.yaml index fd37903..0655c45 100644 --- a/.multiversx/config/config.yaml +++ b/.multiversx/config/config.yaml @@ -1,5 +1,5 @@ apps: - api: + eventsProcessor: port: 3000 privatePort: 4000 useCachingInterceptor: true @@ -14,6 +14,7 @@ libs: api: ${API_URL} dataApiCex: ${DATA_API_CEX_URL} dataApiXexchange: ${DATA_API_XEXCHANGE_URL} + dataApiHatom: ${DATA_API_HATOM_URL} duneApi: ${DUNE_API_URL} database: host: 'localhost' diff --git a/README.md b/README.md index 2e85578..7ec45a9 100644 --- a/README.md +++ b/README.md @@ -1,9 +1,110 @@ -REST API facade template for microservices that interacts with the MultiversX blockchain. +MultiversX to Dune analytics boilerplate -## Quick start +## Introduction + +This repository features a starting point for extracting, transforming and loading MultiversX specific data into Dune Analytics. + +It includes examples on how to process different Hatom events, such as lending, borrowing and liquidation. + +It also includes a `dune simulator` that exposes the same Rest API interface as Dune Analytics, and is also able to generate charts. This will be very useful for testing. + +Here's an example of a chart generated by the simulator: + +![Dune simulator chart](/assets/simulator-chart.png) + +Here's an example of a chart generated in a Dune Dashboard: + +![Dune analytics charts](/assets/dune-chart.png) + +## Installation + +You might need additional packages installed on your PC in order to install all dependencies (canvas, for example). +Before running `npm install` on `MacOS` (for example), make sure you install all the packages, as following: +``` +brew install pkg-config cairo pango libpng jpeg giflib librsvg +``` 1. Run `npm install` in the project directory -2. Optionally make edits to `config/config.yaml` and/or `.env` files +2. Update `config/config.yaml` and/or `.env` files + +## Extending or contributing + +At the time of the writing, there is no official Dune account to be used, so one that will extend or integrate this project will have to create his own account and datasets. + +In order to contribute, one can follow the implementation of the already integrated features. + +### Architecture + +The project relies on a so-called component `event processor` (similar to `transaction processor` for those familiar with MultiversX microservices) that can return +via a callback all the events that match the provided criteria. + +Calls to this component are triggered via `cron jobs` that will initiate data fetching at given intervals. + +All the received events are then processed by services specific to each use-case. They will make conversions, will update different fields (such as prices), and so on. +After the processing, they will send all the processed events to an accumulator. + +From time to time (by using a different `cron job`), the accumulator will push data to Dune Analytics. + +In testing phases (or when using sensitive data), there's also a different app called `dune simulator` that can receive the events and generate charts. + +Let's see how we can integrate an use case. + +### Use case: Hatom borrowing events + +Let's follow, for example, how Hatom borrowing events processing was integrated: + +1. First, we need to create a service. Have a look at the `libs/services/src/events/hatom.borrow.events.service.ts` service to see how we process events and how we send them to the accumulator. +2. Then, we need to import that service into `libs/services/src/event-processor/processor.service.ts`. +3. After that, we need to create a new cron job for this use-case. In this function, we will initialize an `event processor` instance and we'll configure the desired options: +``` +@Cron(CronExpression.EVERY_10_SECONDS) + async handleHatomBorrowEventsUSDT() { + await Locker.lock('hatom-borrow-USDT-f8c08c', async () => { + const eventProcessorOptions = new EventProcessorOptions({ + elasticUrl: 'https://index.multiversx.com', + eventIdentifiers: ['borrow'], + emitterAddresses: ['erd1qqqqqqqqqqqqqpgqkrgsvct7hfx7ru30mfzk3uy6pxzxn6jj78ss84aldu'], + pageSize: 500, + getLastProcessedTimestamp: async () => { + return await this.dynamicCollectionService.getLastProcessedTimestamp('hatom-borrow-USDT-f8c08c'); + }, + setLastProcessedTimestamp: async (nonce) => { + await this.dynamicCollectionService.setLastProcessedTimestamp('hatom-borrow-USDT-f8c08c', nonce); + }, + onEventsReceived: async (highestTimestamp, events) => { + highestTimestamp; + await this.hatomBorrowService.hatomBorrowParser(events as EventLog[], 'USDT-f8c08c'); + }, + }); + const eventProcessor = new EventProcessor(); + await eventProcessor.start(eventProcessorOptions); + }); + } +``` + +As you can see, we want to receive all the events emitted by the address `erd1qqqqqqqqqqqqqpgqkrgsvct7hfx7ru30mfzk3uy6pxzxn6jj78ss84aldu` and have the identifier `borrow`. + +Inside the functions that handle the last processed timestamps, we will store them into MongoDB for persistance. + +Inside the `onEventsReceived` function, we call our service that will further process the raw events. + +For this example, since we need to query multiple addresses for getting all the `borrow` events, we can either create multiple cron jobs, either set multiple entries in `emitterAddresses`. + +### Dune Analytics + +For interacting with Dune Analytics, you'll need to obtain an API key and set it into `.env.{network}` file. + +After the data is imported into Dune Analytics, there are a few steps until making the first dashboard: +1. Create queries. Dune Analytics uses `DuneSQL, a Trino fork` so make sure you read their documentation +2. Create a dashboard +3. Import existing queries into the dashboard. + +Here's an example of a dashboard: + +![Dashboard example](/assets/dune-dashboard.png) + +You can also click on each chart to see the query that generated it. +The example dashboard is available here: https://dune.com/bro9966/test-hatom-dashboard ## Dependencies @@ -26,17 +127,13 @@ Endpoints that can be used by anyone (public endpoints). Endpoints that are not exposed on the internet For example: We do not want to expose our metrics and cache interactions to anyone (/metrics /cache) -### `Cache Warmer` - -This is used to keep the application cache in sync with new updates. +### `Events Processor` -### `Transaction Processor` +This is used to fetch specific events from index.multiversx.com, extract necessary dataset and send it to Dune via API. -This is used for scanning the transactions from MultiversX Blockchain. +### `Dune Simulator` -### `Queue Worker` - -This is used for concurrently processing heavy jobs. +This is used to simulate Dune responses and behaviour, in order to verify data before making it public. ### `Grafana dashboard` @@ -50,7 +147,7 @@ This is a MultiversX project built on Nest.js framework. ### Environment variables -In order to simplify the scripts, the templates will use the following environment variables: +In order to simplify the scripts, we'll use the following environment variables: - `NODE_ENV` @@ -64,7 +161,7 @@ In order to simplify the scripts, the templates will use the following environme **Description**: Specifies which part of the application to start. -**Possible Values**: `api`, `cache-warmer`, `transactions-processor`, `queue-worker` +**Possible Values**: `events-processor`, `dune-simulator` **Usage**: Selects the specific application module to run. @@ -84,84 +181,26 @@ In order to simplify the scripts, the templates will use the following environme **Usage**: When set to true, the application starts in watch mode, which automatically reloads the app on code changes. - -### `npm run start` - -Runs the app in the production mode. -Make requests to [http://localhost:3001](http://localhost:3001). - -Redis Server is required to be installed. - -## Running the api +## Running the events-processor ```bash # development watch mode on devnet -$ NODE_ENV=devnet NODE_APP=api NODE_WATCH=true npm run start -or -$ NODE_ENV=devnet NODE_WATCH=true npm run start:api +$ NODE_ENV=devnet NODE_APP=events-processor NODE_WATCH=true npm run start:events-processor # development debug mode on devnet -$ NODE_ENV=devnet NODE_APP=api NODE_DEBUG=true npm run start -or -$ NODE_ENV=devnet NODE_DEBUG=true npm run start:api - -# development mode -$ NODE_ENV=devnet NODE_APP=api npm run start -or -$ NODE_ENV=devnet npm run start:api - -# production mode -$ NODE_ENV=mainnet NODE_APP=api npm run start -or -$ NODE_ENV=mainnet npm run start:api +$ NODE_ENV=devnet NODE_APP=events-processor NODE_DEBUG=true npm run start:events-processor ``` -## Running the transactions-processor +## Running the dune-simulator ```bash # development watch mode on devnet -$ NODE_ENV=devnet NODE_APP=transactions-processor NODE_WATCH=true npm run start -or -$ NODE_ENV=devnet NODE_WATCH=true npm run start:transactions-processor +$ NODE_ENV=devnet NODE_APP=dune-simulator NODE_WATCH=true npm run start:dune-simulator # development debug mode on devnet -$ NODE_ENV=devnet NODE_APP=transactions-processor NODE_DEBUG=true npm run start -or -$ NODE_ENV=devnet NODE_DEBUG=true npm run start:transactions-processor - -# development mode on devnet -$ NODE_ENV=devnet NODE_APP=transactions-processor npm run start -or -$ NODE_ENV=devnet npm run start:transactions-processor - -# production mode -$ NODE_ENV=mainnet npm run start:transactions-processor +$ NODE_ENV=devnet NODE_APP=dune-simulator NODE_DEBUG=true npm run start:dune-simulator ``` -## Running the queue-worker - -```bash -# development watch mode on devnet -$ NODE_ENV=devnet NODE_APP=queue-worker NODE_WATCH=true npm run start -or -$ NODE_ENV=devnet NODE_WATCH=true npm run start:queue-worker - -# development debug mode on devnet -$ NODE_ENV=devnet NODE_APP=queue-worker NODE_DEBUG=true npm run start -or -$ NODE_ENV=devnet NODE_DEBUG=true npm run start:queue-worker - -# development mode on devnet -$ NODE_ENV=devnet NODE_APP=queue-worker npm run start -or -$ NODE_ENV=devnet npm run start:queue-worker - -# production mode -$ NODE_ENV=mainnet npm run start:queue-worker -``` - -Requests can be made to http://localhost:3001 for the api. The app will reload when you'll make edits (if opened in watch mode). You will also see any lint errors in the console.​ - ### `npm run test` ```bash @@ -174,3 +213,8 @@ $ npm run test:e2e # test coverage $ npm run test:cov ``` + +### How to start +1. start docker containers +2. start dune-simulator app (if you want to store data locally on your machine) +3. start events-processor app diff --git a/apps/api/src/endpoints/endpoints.module.ts b/apps/api/src/endpoints/endpoints.module.ts deleted file mode 100644 index c4c6ff9..0000000 --- a/apps/api/src/endpoints/endpoints.module.ts +++ /dev/null @@ -1,13 +0,0 @@ -import { Module } from "@nestjs/common"; -import { DynamicModuleUtils } from "@libs/common"; -import { EventsModule } from "./events/events.module"; - -@Module({ - imports: [ - EventsModule, - ], - providers: [ - DynamicModuleUtils.getNestJsApiConfigService(), - ], -}) -export class EndpointsModule { } diff --git a/apps/api/src/endpoints/events/events.controller.ts b/apps/api/src/endpoints/events/events.controller.ts deleted file mode 100644 index 48c86aa..0000000 --- a/apps/api/src/endpoints/events/events.controller.ts +++ /dev/null @@ -1,29 +0,0 @@ -import { Body, Controller, Param, Post } from "@nestjs/common"; -import { ApiTags } from "@nestjs/swagger"; -import { EventLog } from "./entities/event.log"; -import { HatomBorrowEventsService, LiquidityEventsService } from "@libs/services/events"; - -@Controller('/events') -@ApiTags('events') -export class EventsController { - constructor( - private readonly liquidityService: LiquidityEventsService, - private readonly hatomBorrowService: HatomBorrowEventsService, - ) { } - - @Post("/liquidity-webhook") - async liquidityWebhook( - @Body() body: EventLog[], - ): Promise { - await this.liquidityService.liquidityWebhook(body); - } - - @Post("/hatom-webhook/:borrowed_token") - async hatomBorrowWebhook( - @Body() body: EventLog[], - @Param('borrowed_token') borrowedToken: string, - ): Promise { - await this.hatomBorrowService.hatomBorrowWebhook(body, borrowedToken); - } - -} diff --git a/apps/api/src/endpoints/events/events.module.ts b/apps/api/src/endpoints/events/events.module.ts deleted file mode 100644 index 2c213b9..0000000 --- a/apps/api/src/endpoints/events/events.module.ts +++ /dev/null @@ -1,13 +0,0 @@ -import { Module } from "@nestjs/common"; -import { ServicesModule } from "@libs/services/services.module"; -import { EventsController } from "./events.controller"; - -@Module({ - imports: [ - ServicesModule, - ], - controllers: [ - EventsController, - ], -}) -export class EventsModule { } diff --git a/apps/dune-simulator/src/endpoints/dune-simulator/csv.middleware.ts b/apps/dune-simulator/src/endpoints/dune-simulator/csv.middleware.ts index b45ac04..4c2f90c 100644 --- a/apps/dune-simulator/src/endpoints/dune-simulator/csv.middleware.ts +++ b/apps/dune-simulator/src/endpoints/dune-simulator/csv.middleware.ts @@ -4,17 +4,17 @@ import csv from 'csv-parser'; @Injectable() export class CsvParserMiddleware implements NestMiddleware { - use(req: Request, _: Response, next: NextFunction) { - if (req.headers['content-type'] === 'text/csv') { - const results: any = []; - req.pipe(csv()) - .on('data', (data: any) => results.push(data)) - .on('end', () => { - req.body = results; - next(); - }); - } else { - next(); - } + use(req: Request, _: Response, next: NextFunction) { + if (req.headers['content-type'] === 'text/csv') { + const results: any = []; + req.pipe(csv()) + .on('data', (data: any) => results.push(data)) + .on('end', () => { + req.body = results; + next(); + }); + } else { + next(); } + } } diff --git a/apps/dune-simulator/src/endpoints/dune-simulator/dune-simulator.controller.ts b/apps/dune-simulator/src/endpoints/dune-simulator/dune-simulator.controller.ts index bb63cf1..4de42b1 100644 --- a/apps/dune-simulator/src/endpoints/dune-simulator/dune-simulator.controller.ts +++ b/apps/dune-simulator/src/endpoints/dune-simulator/dune-simulator.controller.ts @@ -7,76 +7,76 @@ import { Response } from "express"; @Controller('/api/v1/table') @ApiTags('dune-simulator') export class DuneSimulatorController { - constructor( - private readonly duneSimulatorService: DuneSimulatorService, - ) { } + constructor( + private readonly duneSimulatorService: DuneSimulatorService, + ) { } - @Post("/create") - async createTable( - @Headers('x-dune-api-key') apiKey: string, - @Headers('content-type') contentType: string, - @Body() body: CreateTableBody, - ): Promise { - try { - const response = await this.duneSimulatorService.createTable(apiKey, contentType, body); - return response; - } catch (error) { - throw error; - } + @Post("/create") + async createTable( + @Headers('x-dune-api-key') apiKey: string, + @Headers('content-type') contentType: string, + @Body() body: CreateTableBody, + ): Promise { + try { + const response = await this.duneSimulatorService.createTable(apiKey, contentType, body); + return response; + } catch (error) { + throw error; } + } - @Post("/:name_space/:table_name/insert") - async insertIntoTable( - @Param('name_space') nameSpace: string, - @Param('table_name') tableName: string, - @Headers('x-dune-api-key') apiKey: string, - @Headers('content-type') contentType: string, - @Body() body: any[], - ): Promise<{ 'rows_written': number, 'bytes_written': number }> { - try { - const response = await this.duneSimulatorService.insertIntoTable(nameSpace, tableName, body, apiKey, contentType); - return response; - } catch (error) { - throw error; - } + @Post("/:name_space/:table_name/insert") + async insertIntoTable( + @Param('name_space') nameSpace: string, + @Param('table_name') tableName: string, + @Headers('x-dune-api-key') apiKey: string, + @Headers('content-type') contentType: string, + @Body() body: any[], + ): Promise<{ 'rows_written': number, 'bytes_written': number }> { + try { + const response = await this.duneSimulatorService.insertIntoTable(nameSpace, tableName, body, apiKey, contentType); + return response; + } catch (error) { + throw error; } + } - @Get("/generate/chart/:table_name/:x_axis/:y_axis/png") - async generateChartPng( - @Param('table_name') tableName: string, - @Param('x_axis') xAxis: string, - @Param('y_axis') yAxis: string, - @Res() res: Response, - ): Promise { - try { + @Get("/generate/chart/:table_name/:x_axis/:y_axis/png") + async generateChartPng( + @Param('table_name') tableName: string, + @Param('x_axis') xAxis: string, + @Param('y_axis') yAxis: string, + @Res() res: Response, + ): Promise { + try { - const imageBuffer = await this.duneSimulatorService.generateChartPng(tableName, xAxis, yAxis); + const imageBuffer = await this.duneSimulatorService.generateChartPng(tableName, xAxis, yAxis); - res.setHeader('Content-Type', 'image/png'); - res.send(imageBuffer); - - } catch (error) { - throw error; - } + res.setHeader('Content-Type', 'image/png'); + res.send(imageBuffer); + } catch (error) { + throw error; } - @Get("/generate/chart/:table_name/:x_axis/:y_axis/html") - async generateChartHtml( - @Param('table_name') tableName: string, - @Param('x_axis') xAxis: string, - @Param('y_axis') yAxis: string, - @Res() res: Response, - ): Promise { - try { - const htmlContent = await this.duneSimulatorService.generateChartHtml(tableName, xAxis, yAxis); + } - res.setHeader('Content-Type', 'text/html'); - res.send(htmlContent); + @Get("/generate/chart/:table_name/:x_axis/:y_axis/html") + async generateChartHtml( + @Param('table_name') tableName: string, + @Param('x_axis') xAxis: string, + @Param('y_axis') yAxis: string, + @Res() res: Response, + ): Promise { + try { + const htmlContent = await this.duneSimulatorService.generateChartHtml(tableName, xAxis, yAxis); - } catch (error) { - throw error; - } + res.setHeader('Content-Type', 'text/html'); + res.send(htmlContent); + } catch (error) { + throw error; } + + } } diff --git a/apps/dune-simulator/src/endpoints/dune-simulator/dune-simulator.module.ts b/apps/dune-simulator/src/endpoints/dune-simulator/dune-simulator.module.ts index 7fc4f61..807448f 100644 --- a/apps/dune-simulator/src/endpoints/dune-simulator/dune-simulator.module.ts +++ b/apps/dune-simulator/src/endpoints/dune-simulator/dune-simulator.module.ts @@ -3,17 +3,17 @@ import { DuneSimulatorController } from "./dune-simulator.controller"; import { DuneSimulatorServicesModule } from "@libs/services"; import { CsvParserMiddleware } from "./csv.middleware"; @Module({ - imports: [ - DuneSimulatorServicesModule, - ], - controllers: [ - DuneSimulatorController, - ], + imports: [ + DuneSimulatorServicesModule, + ], + controllers: [ + DuneSimulatorController, + ], }) export class DuneSimulatorModule { - configure(consumer: MiddlewareConsumer) { - consumer - .apply(CsvParserMiddleware) - .forRoutes({ path: '/api/v1/table/:name_space/:table_name/insert', method: RequestMethod.POST }); - } + configure(consumer: MiddlewareConsumer) { + consumer + .apply(CsvParserMiddleware) + .forRoutes({ path: '/api/v1/table/:name_space/:table_name/insert', method: RequestMethod.POST }); + } } diff --git a/apps/dune-simulator/src/endpoints/dune-simulator/entities/create.table.ts b/apps/dune-simulator/src/endpoints/dune-simulator/entities/create.table.ts index 1559474..fec0449 100644 --- a/apps/dune-simulator/src/endpoints/dune-simulator/entities/create.table.ts +++ b/apps/dune-simulator/src/endpoints/dune-simulator/entities/create.table.ts @@ -1,34 +1,34 @@ import { ApiProperty } from "@nestjs/swagger"; export class TableSchema { - constructor(init?: Partial) { - Object.assign(this, init); - } - @ApiProperty() - name!: string; - - @ApiProperty() - type!: string; + constructor(init?: Partial) { + Object.assign(this, init); + } + @ApiProperty() + name!: string; + + @ApiProperty() + type!: string; } export class CreateTableBody { - constructor(init?: Partial) { - Object.assign(this, init); - } + constructor(init?: Partial) { + Object.assign(this, init); + } - @ApiProperty() - namespace!: string; + @ApiProperty() + namespace!: string; - @ApiProperty() - table_name!: string; + @ApiProperty() + table_name!: string; - @ApiProperty() - description?: string = ""; + @ApiProperty() + description?: string = ""; - @ApiProperty() - schema!: TableSchema[]; + @ApiProperty() + schema!: TableSchema[]; - @ApiProperty() - is_private!: boolean; + @ApiProperty() + is_private!: boolean; } diff --git a/apps/dune-simulator/src/endpoints/dune-simulator/entities/csv.file.ts b/apps/dune-simulator/src/endpoints/dune-simulator/entities/csv.file.ts index 83ad7e9..6c5c0aa 100644 --- a/apps/dune-simulator/src/endpoints/dune-simulator/entities/csv.file.ts +++ b/apps/dune-simulator/src/endpoints/dune-simulator/entities/csv.file.ts @@ -2,24 +2,24 @@ import { ApiProperty } from "@nestjs/swagger"; import BigNumber from "bignumber.js"; export class CsvFileEntity { - constructor(init?: Partial) { - Object.assign(this, init); - } + constructor(init?: Partial) { + Object.assign(this, init); + } - @ApiProperty() - timestamp!: string; + @ApiProperty() + timestamp!: string; - @ApiProperty() - volumeusd!: BigNumber; + @ApiProperty() + volumeusd!: BigNumber; } export class CsvFile { - constructor(init?: Partial) { - Object.assign(this, init); - } - @ApiProperty() - headers!: string; + constructor(init?: Partial) { + Object.assign(this, init); + } + @ApiProperty() + headers!: string; - @ApiProperty() - schema!: string[]; + @ApiProperty() + schema!: string[]; } diff --git a/apps/dune-simulator/src/endpoints/endpoints.module.ts b/apps/dune-simulator/src/endpoints/endpoints.module.ts index 1031e56..fb6428d 100644 --- a/apps/dune-simulator/src/endpoints/endpoints.module.ts +++ b/apps/dune-simulator/src/endpoints/endpoints.module.ts @@ -3,11 +3,11 @@ import { DynamicModuleUtils } from "@libs/common"; import { DuneSimulatorModule } from "./dune-simulator/dune-simulator.module"; @Module({ - imports: [ - DuneSimulatorModule, - ], - providers: [ - DynamicModuleUtils.getNestJsApiConfigService(), - ], + imports: [ + DuneSimulatorModule, + ], + providers: [ + DynamicModuleUtils.getNestJsApiConfigService(), + ], }) export class EndpointsModule { } diff --git a/apps/api/docs/swagger.md b/apps/events-processor/docs/swagger.md similarity index 100% rename from apps/api/docs/swagger.md rename to apps/events-processor/docs/swagger.md diff --git a/apps/api/src/config/app-config.module.ts b/apps/events-processor/src/config/app-config.module.ts similarity index 100% rename from apps/api/src/config/app-config.module.ts rename to apps/events-processor/src/config/app-config.module.ts diff --git a/apps/api/src/config/app-config.service.ts b/apps/events-processor/src/config/app-config.service.ts similarity index 82% rename from apps/api/src/config/app-config.service.ts rename to apps/events-processor/src/config/app-config.service.ts index 58d59cd..1db089a 100644 --- a/apps/api/src/config/app-config.service.ts +++ b/apps/events-processor/src/config/app-config.service.ts @@ -3,7 +3,7 @@ import { Injectable } from "@nestjs/common"; @Injectable() export class AppConfigService { - readonly config = configuration().apps.api; + readonly config = configuration().apps.eventsProcessor; getDuneNamespace(): string { return configuration().libs.common.features.dune.namespace ?? ""; @@ -25,6 +25,10 @@ export class AppConfigService { return configuration().libs.common.urls.dataApiXexchange ?? ""; } + getDataApiHatomUrl(): string { + return configuration().libs.common.urls.dataApiHatom ?? ""; + } + getDuneApiUrl(): string { return configuration().libs.common.urls.duneApi ?? ""; } diff --git a/apps/api/src/main.ts b/apps/events-processor/src/main.ts similarity index 100% rename from apps/api/src/main.ts rename to apps/events-processor/src/main.ts diff --git a/apps/api/src/private.app.module.ts b/apps/events-processor/src/private.app.module.ts similarity index 91% rename from apps/api/src/private.app.module.ts rename to apps/events-processor/src/private.app.module.ts index 644ae12..93e62fa 100644 --- a/apps/api/src/private.app.module.ts +++ b/apps/events-processor/src/private.app.module.ts @@ -4,6 +4,7 @@ import { ApiMetricsModule, DynamicModuleUtils } from '@libs/common'; import { LoggingModule } from '@multiversx/sdk-nestjs-common'; import { CommonConfigModule } from '@libs/common/config/common.config.module'; import { AppConfigModule } from './config/app-config.module'; +import { ServicesModule } from '@libs/services'; @Module({ imports: [ @@ -12,6 +13,7 @@ import { AppConfigModule } from './config/app-config.module'; DynamicModuleUtils.getCachingModule(), CommonConfigModule, AppConfigModule, + ServicesModule, ], providers: [ DynamicModuleUtils.getNestJsApiConfigService(), diff --git a/apps/api/src/endpoints/events/entities/event.log.ts b/apps/events-processor/src/processor/entities/event.log.ts similarity index 100% rename from apps/api/src/endpoints/events/entities/event.log.ts rename to apps/events-processor/src/processor/entities/event.log.ts diff --git a/apps/api/src/endpoints/events/entities/index.ts b/apps/events-processor/src/processor/entities/index.ts similarity index 100% rename from apps/api/src/endpoints/events/entities/index.ts rename to apps/events-processor/src/processor/entities/index.ts diff --git a/apps/api/src/public.app.module.ts b/apps/events-processor/src/public.app.module.ts similarity index 87% rename from apps/api/src/public.app.module.ts rename to apps/events-processor/src/public.app.module.ts index e255c65..d1f73f2 100644 --- a/apps/api/src/public.app.module.ts +++ b/apps/events-processor/src/public.app.module.ts @@ -1,18 +1,18 @@ import { Module } from '@nestjs/common'; -import { EndpointsModule } from './endpoints/endpoints.module'; import { DynamicModuleUtils } from '@libs/common'; import { LoggingModule } from '@multiversx/sdk-nestjs-common'; import { CommonConfigModule } from '@libs/common/config/common.config.module'; import { AppConfigModule } from './config/app-config.module'; import { ScheduleModule } from '@nestjs/schedule'; +import { ServicesModule } from '@libs/services'; @Module({ imports: [ LoggingModule, - EndpointsModule, AppConfigModule, CommonConfigModule, ScheduleModule.forRoot(), + ServicesModule, ], providers: [ DynamicModuleUtils.getNestJsApiConfigService(), diff --git a/apps/api/test/app.e2e-spec.ts b/apps/events-processor/test/app.e2e-spec.ts similarity index 100% rename from apps/api/test/app.e2e-spec.ts rename to apps/events-processor/test/app.e2e-spec.ts diff --git a/apps/api/test/jest-e2e.json b/apps/events-processor/test/jest-e2e.json similarity index 100% rename from apps/api/test/jest-e2e.json rename to apps/events-processor/test/jest-e2e.json diff --git a/apps/api/tsconfig.app.json b/apps/events-processor/tsconfig.app.json similarity index 100% rename from apps/api/tsconfig.app.json rename to apps/events-processor/tsconfig.app.json diff --git a/assets/dune-chart.png b/assets/dune-chart.png new file mode 100644 index 0000000..191da6e Binary files /dev/null and b/assets/dune-chart.png differ diff --git a/assets/dune-dashboard.png b/assets/dune-dashboard.png new file mode 100644 index 0000000..f34036a Binary files /dev/null and b/assets/dune-dashboard.png differ diff --git a/assets/simulator-chart.png b/assets/simulator-chart.png new file mode 100644 index 0000000..901d082 Binary files /dev/null and b/assets/simulator-chart.png differ diff --git a/config/config.yaml b/config/config.yaml index fd37903..0655c45 100644 --- a/config/config.yaml +++ b/config/config.yaml @@ -1,5 +1,5 @@ apps: - api: + eventsProcessor: port: 3000 privatePort: 4000 useCachingInterceptor: true @@ -14,6 +14,7 @@ libs: api: ${API_URL} dataApiCex: ${DATA_API_CEX_URL} dataApiXexchange: ${DATA_API_XEXCHANGE_URL} + dataApiHatom: ${DATA_API_HATOM_URL} duneApi: ${DUNE_API_URL} database: host: 'localhost' diff --git a/config/schema.yaml b/config/schema.yaml index 23820ff..ad974d1 100644 --- a/config/schema.yaml +++ b/config/schema.yaml @@ -1,6 +1,6 @@ title: config apps: - api: + eventsProcessor: port: integer privatePort: integer useCachingInterceptor: boolean @@ -17,6 +17,7 @@ libs: api: string dataApiCex: string dataApiXexchange: string + dataApiHatom: string duneApi: string database: host: string diff --git a/libs/common/src/entities/config.d.ts b/libs/common/src/entities/config.d.ts index 5c61796..6a5fa4a 100644 --- a/libs/common/src/entities/config.d.ts +++ b/libs/common/src/entities/config.d.ts @@ -2,9 +2,9 @@ export interface Config { apps: { - api: { + eventsProcessor: { port: number; - privatePort: number; + privatePort: port; useCachingInterceptor: boolean; }; duneSimulator: { @@ -20,6 +20,7 @@ export interface Config { api: string; dataApiCex: string; dataApiXexchange: string; + dataApiHatom: string; duneApi: string; }; database: { diff --git a/libs/database/src/collections/dynamic.collection.ts b/libs/database/src/collections/dynamic.collection.ts index 08467a2..6825263 100644 --- a/libs/database/src/collections/dynamic.collection.ts +++ b/libs/database/src/collections/dynamic.collection.ts @@ -56,7 +56,6 @@ export class DynamicCollectionRepository { return collectionDocuments; } - private mapToMongooseType(fieldType: string) { switch (fieldType.toLowerCase()) { case 'varchar': @@ -67,5 +66,33 @@ export class DynamicCollectionRepository { throw new HttpException(`Unsupported field type: ${fieldType}`, HttpStatus.BAD_REQUEST); } } + + public async setLastProcessedTimestamp(key: string, nonce: number) { + const collectionName = `${key}_last_processed_nonce`; + const existingCollections = await this.connection.db.listCollections({ name: collectionName }).toArray(); + if (existingCollections.length === 0) { + const schema = new Schema({ + nonce: { type: Number, required: true }, + }); + this.connection.model(collectionName, schema); + } + + const dynamicModel = this.connection.db.collection(collectionName); + await dynamicModel.updateOne({}, { $set: { "nonce": nonce } }, { upsert: true }); + } + + public async getLastProcessedTimestamp(key: string) { + const collectionName = `${key}_last_processed_nonce`; + const existingCollections = await this.connection.db.listCollections({ name: collectionName }).toArray(); + if (existingCollections.length === 0) { + return 0; + } + const dynamicModel = this.connection.db.collection(collectionName); + const lastProcessedNonce = await dynamicModel.findOne(); + if (!lastProcessedNonce) { + return 0; + } + return lastProcessedNonce.nonce; + } } diff --git a/libs/services/src/data/data.service.ts b/libs/services/src/data/data.service.ts index ca45c4f..487d708 100644 --- a/libs/services/src/data/data.service.ts +++ b/libs/services/src/data/data.service.ts @@ -4,7 +4,7 @@ import { Injectable } from "@nestjs/common"; import BigNumber from "bignumber.js"; import axios from 'axios'; import moment from "moment"; -import { AppConfigService } from "apps/api/src/config/app-config.service"; +import { AppConfigService } from "apps/events-processor/src/config/app-config.service"; import { OriginLogger } from "@multiversx/sdk-nestjs-common"; interface TokenPrice { @@ -23,16 +23,29 @@ export class DataService { private readonly appConfigService: AppConfigService, ) { } - async getTokenPrice(tokenId: string, date: moment.Moment): Promise { + async getTokenPrice(tokenId: string, date: moment.Moment, market?: string): Promise { return await this.cachingService.getOrSet( CacheInfo.TokenPrice(tokenId, date).key, - async () => await this.getTokenPriceRaw(tokenId, date), + async () => await this.getTokenPriceRaw(tokenId, date, market), CacheInfo.TokenPrice(tokenId, date).ttl ); } - async getTokenPriceRaw(tokenId: string, date: moment.Moment): Promise { + async getTokenPriceRaw(tokenId: string, date: moment.Moment, market?: string): Promise { try { + if (market) { + switch (market) { + case 'hatom': + return (await axios.get(`${this.appConfigService.getDataApiHatomUrl()}/${tokenId}?date=${date.format('YYYY-MM-DD')}`)).data.price; + case 'xexchange': + return (await axios.get(`${this.appConfigService.getDataApiXexchangeUrl()}/${tokenId}?date=${date.format('YYYY-MM-DD')}`)).data.price; + case 'cex': + return (await axios.get(`${this.appConfigService.getDataApiCexUrl()}/${tokenId}?date=${date.format('YYYY-MM-DD')}`)).data.price; + default: + throw Error('Invalid market !'); + } + } + if (tokenId.startsWith('USD')) { return (await axios.get(`${this.appConfigService.getDataApiCexUrl()}/${tokenId}?date=${date.format('YYYY-MM-DD')}`)).data.price; } diff --git a/libs/services/src/dune-sender/dune-sender.service.ts b/libs/services/src/dune-sender/dune-sender.service.ts index 23a1adf..d14ada4 100644 --- a/libs/services/src/dune-sender/dune-sender.service.ts +++ b/libs/services/src/dune-sender/dune-sender.service.ts @@ -2,103 +2,103 @@ import { Injectable } from "@nestjs/common"; import { Cron, CronExpression } from "@nestjs/schedule"; import { Lock, OriginLogger } from "@multiversx/sdk-nestjs-common"; import { CsvRecordsService } from "../records"; -import { AppConfigService } from "apps/api/src/config/app-config.service"; +import { AppConfigService } from "apps/events-processor/src/config/app-config.service"; import axios from 'axios'; import { TableSchema } from "apps/dune-simulator/src/endpoints/dune-simulator/entities"; import { toSnakeCase } from "libs/services/utils"; @Injectable() export class DuneSenderService { - private readonly logger = new OriginLogger(DuneSenderService.name); + private readonly logger = new OriginLogger(DuneSenderService.name); - constructor( - private readonly csvRecordsService: CsvRecordsService, - private readonly appConfigService: AppConfigService, - ) { } + constructor( + private readonly csvRecordsService: CsvRecordsService, + private readonly appConfigService: AppConfigService, + ) { } - @Cron(CronExpression.EVERY_10_SECONDS) - @Lock({ name: 'send-csv-to-dune', verbose: false }) - async sendCsvRecordsToDune(): Promise { - const records: Record = this.csvRecordsService.getRecords(); + @Cron(CronExpression.EVERY_10_SECONDS) + @Lock({ name: 'send-csv-to-dune', verbose: false }) + async sendCsvRecordsToDune(): Promise { + const records: Record = this.csvRecordsService.getRecords(); - await this.sendCsvToDune(records); - } + await this.sendCsvToDune(records); + } - async sendCsvToDune(records: Record) { - for (const [csvFileName, lines] of Object.entries(records)) { - if (lines.length === 0) { - continue; - } - const [resultString, linesLength] = await this.csvRecordsService.formatRecord(csvFileName); + async sendCsvToDune(records: Record) { + for (const [csvFileName, lines] of Object.entries(records)) { + if (lines.length === 0) { + continue; + } + const [resultString, linesLength] = await this.csvRecordsService.formatRecord(csvFileName); - const csvData: Buffer = Buffer.from(resultString, 'utf-8'); + const csvData: Buffer = Buffer.from(resultString, 'utf-8'); - this.logger.log("starting sending data from file " + csvFileName); + this.logger.log("starting sending data from file " + csvFileName); - const formattedCsvFileName = toSnakeCase(csvFileName); + const formattedCsvFileName = toSnakeCase(csvFileName); - const isRecordSent = await this.insertCsvDataToTable(formattedCsvFileName, csvData); + const isRecordSent = await this.insertCsvDataToTable(formattedCsvFileName, csvData); - if (isRecordSent) { - await this.csvRecordsService.deleteFirstRecords(csvFileName, linesLength); - } - } + if (isRecordSent) { + await this.csvRecordsService.deleteFirstRecords(csvFileName, linesLength); + } } + } + + async createTable(tableName: string): Promise { + try { + const schema: TableSchema[] = this.csvRecordsService.getHeaders(tableName); + + const url = `${this.appConfigService.getDuneApiUrl()}/create`; + const payload = { + 'namespace': this.appConfigService.getDuneNamespace(), + 'table_name': tableName, + 'description': 'test', + 'schema': schema, + "is_private": false, + }; + + const response = await axios.post(url, JSON.stringify(payload), { + headers: { + 'content-type': 'application/json', + 'x-dune-api-key': this.appConfigService.getDuneApiKey(), + }, + }); + this.logger.log(response.data); + } catch (error) { + if (axios.isAxiosError(error) && error.response) { + this.logger.log(error.response.data); + return false; + } - async createTable(tableName: string): Promise { - try { - const schema: TableSchema[] = this.csvRecordsService.getHeaders(tableName); - - const url = `${this.appConfigService.getDuneApiUrl()}/create`; - const payload = { - 'namespace': this.appConfigService.getDuneNamespace(), - 'table_name': tableName, - 'description': 'test', - 'schema': schema, - "is_private": false, - }; - - const response = await axios.post(url, JSON.stringify(payload), { - headers: { - 'content-type': 'application/json', - 'x-dune-api-key': this.appConfigService.getDuneApiKey(), - }, - }); - this.logger.log(response.data); - } catch (error) { - if (axios.isAxiosError(error) && error.response) { - this.logger.log(error.response.data); - return false; - } - - } - return true; } - - async insertCsvDataToTable(tableName: string, data: Buffer): Promise { - - try { - const url = `${this.appConfigService.getDuneApiUrl()}/${this.appConfigService.getDuneNamespace()}/${tableName}/insert`; - const response = await axios.post(url, data, { - headers: { - 'content-type': 'text/csv', - 'x-dune-api-key': this.appConfigService.getDuneApiKey(), - }, - }); - this.logger.log(response.data); - } catch (error) { - if (axios.isAxiosError(error) && error.response) { - this.logger.log(error.response.data); - if (error.response.status === 404) { - this.logger.log("Trying to create local table !"); - const isTableCreated = await this.createTable(tableName); - if (isTableCreated) { - this.logger.log("Table was created"); - } - } - } - return false; + return true; + } + + async insertCsvDataToTable(tableName: string, data: Buffer): Promise { + + try { + const url = `${this.appConfigService.getDuneApiUrl()}/${this.appConfigService.getDuneNamespace()}/${tableName}/insert`; + const response = await axios.post(url, data, { + headers: { + 'content-type': 'text/csv', + 'x-dune-api-key': this.appConfigService.getDuneApiKey(), + }, + }); + this.logger.log(response.data); + } catch (error) { + if (axios.isAxiosError(error) && error.response) { + this.logger.log(error.response.data); + if (error.response.status === 404) { + this.logger.log("Trying to create local table !"); + const isTableCreated = await this.createTable(tableName); + if (isTableCreated) { + this.logger.log("Table was created"); + } } - return true; + } + return false; } + return true; + } } diff --git a/libs/services/src/dune-simulator/dune-simulator.service.ts b/libs/services/src/dune-simulator/dune-simulator.service.ts index 16f47c6..df5deb6 100644 --- a/libs/services/src/dune-simulator/dune-simulator.service.ts +++ b/libs/services/src/dune-simulator/dune-simulator.service.ts @@ -5,187 +5,188 @@ import ChartJSImage from 'chartjs-to-image'; import path from "path"; import fs from "fs"; import { DynamicCollectionRepository } from "@libs/database/collections"; + @Injectable() export class DuneSimulatorService { - private readonly logger = new OriginLogger(DuneSimulatorService.name); - - constructor( - private readonly dynamicCollectionRepository: DynamicCollectionRepository, - ) { } + private readonly logger = new OriginLogger(DuneSimulatorService.name); - async createTable(apiKey: string, contentType: string, body: CreateTableBody): Promise { - if (contentType !== 'application/json') { - throw new HttpException("Content-Type header is not application/json", HttpStatus.BAD_REQUEST); - } + constructor( + private readonly dynamicCollectionRepository: DynamicCollectionRepository, + ) { } - if (!body.namespace || !apiKey) { - throw new HttpException(`You are not authorized to create a table under the ${body.namespace} namespace`, HttpStatus.UNAUTHORIZED); - } + async createTable(apiKey: string, contentType: string, body: CreateTableBody): Promise { + if (contentType !== 'application/json') { + throw new HttpException("Content-Type header is not application/json", HttpStatus.BAD_REQUEST); + } - try { - await this.dynamicCollectionRepository.createTable(body.table_name, body.schema); - } catch (error) { - throw error; - } + if (!body.namespace || !apiKey) { + throw new HttpException(`You are not authorized to create a table under the ${body.namespace} namespace`, HttpStatus.UNAUTHORIZED); + } - return { - 'namespace': body.namespace, - 'table_name': body.table_name, - 'full_name': `dune.${body.namespace}.${body.table_name}`, - 'example_query': `select * from dune.${body.namespace}.${body.table_name}`, - 'already_existed': false, - 'message': "Table created successfully", - }; + try { + await this.dynamicCollectionRepository.createTable(body.table_name, body.schema); + } catch (error) { + throw error; } - async insertIntoTable( - nameSpace: string, - tableName: string, - data: any[], - apiKey: string, - contentType: string - ): Promise<{ 'rows_written': number, 'bytes_written': number }> { - if (!nameSpace || !apiKey) { - throw new HttpException(`You are not authorized to write to the table named ${nameSpace}.${tableName}`, HttpStatus.UNAUTHORIZED); - } - if (contentType !== 'text/csv') { - throw new HttpException(`{Invalid content type ${contentType}. We support CSV (Content-Type: text/csv) and newline delimited JSON (Content-Type: application/x-ndjson).` - , HttpStatus.BAD_REQUEST); - } + return { + 'namespace': body.namespace, + 'table_name': body.table_name, + 'full_name': `dune.${body.namespace}.${body.table_name}`, + 'example_query': `select * from dune.${body.namespace}.${body.table_name}`, + 'already_existed': false, + 'message': "Table created successfully", + }; + } + + async insertIntoTable( + nameSpace: string, + tableName: string, + data: any[], + apiKey: string, + contentType: string + ): Promise<{ 'rows_written': number, 'bytes_written': number }> { + if (!nameSpace || !apiKey) { + throw new HttpException(`You are not authorized to write to the table named ${nameSpace}.${tableName}`, HttpStatus.UNAUTHORIZED); + } + if (contentType !== 'text/csv') { + throw new HttpException(`{Invalid content type ${contentType}. We support CSV (Content-Type: text/csv) and newline delimited JSON (Content-Type: application/x-ndjson).` + , HttpStatus.BAD_REQUEST); + } - try { - await this.dynamicCollectionRepository.insertIntoTable(tableName, data); - } catch (error) { - this.logger.error(error); - throw error; - } + try { + await this.dynamicCollectionRepository.insertIntoTable(tableName, data); + } catch (error) { + this.logger.error(error); + throw error; + } - const rowsWritten = data.length; + const rowsWritten = data.length; - const jsonString = JSON.stringify(data); + const jsonString = JSON.stringify(data); - const bytes_written = Buffer.byteLength(jsonString, 'utf8'); - return { 'rows_written': rowsWritten, 'bytes_written': bytes_written }; - } + const bytes_written = Buffer.byteLength(jsonString, 'utf8'); + return { 'rows_written': rowsWritten, 'bytes_written': bytes_written }; + } - async generateChartPng(tableName: string, xAxis: string, yAxis: string) { - try { + async generateChartPng(tableName: string, xAxis: string, yAxis: string) { + try { - const points = await this.getCsvDataFromDB(tableName, xAxis, yAxis); - const chart = this.createChart(points, xAxis, yAxis, tableName, 800, 600); - const buffer = await chart.toBinary(); - return buffer; + const points = await this.getCsvDataFromDB(tableName, xAxis, yAxis); + const chart = this.createChart(points, xAxis, yAxis, tableName, 800, 600); + const buffer = await chart.toBinary(); + return buffer; - } catch (error) { - throw error; - } + } catch (error) { + throw error; } + } - async generateChartHtml(tableName: string, xAxis: string, yAxis: string) { - try { - const points = await this.getCsvDataFromDB(tableName, xAxis, yAxis); + async generateChartHtml(tableName: string, xAxis: string, yAxis: string) { + try { + const points = await this.getCsvDataFromDB(tableName, xAxis, yAxis); - const templatePath = path.join(process.cwd(), 'libs/services/src/dune-simulator', 'chart-template.html'); - const htmlTemplate = fs.readFileSync(templatePath, 'utf8'); + const templatePath = path.join(process.cwd(), 'libs/services/src/dune-simulator', 'chart-template.html'); + const htmlTemplate = fs.readFileSync(templatePath, 'utf8'); - const updatedHtml = htmlTemplate - .replace(/{{chartTitle}}/g, tableName) - .replace(/'{{labels}}'/g, JSON.stringify(points.map(point => new Date(point[0]).toISOString()))) - .replace(/'{{data}}'/g, JSON.stringify(points.map(point => point[1]))) - .replace(/{{xTitle}}/g, xAxis) - .replace(/{{yTitle}}/g, yAxis); + const updatedHtml = htmlTemplate + .replace(/{{chartTitle}}/g, tableName) + .replace(/'{{labels}}'/g, JSON.stringify(points.map(point => new Date(point[0]).toISOString()))) + .replace(/'{{data}}'/g, JSON.stringify(points.map(point => point[1]))) + .replace(/{{xTitle}}/g, xAxis) + .replace(/{{yTitle}}/g, yAxis); - return updatedHtml; - } catch (error) { - throw error; - } + return updatedHtml; + } catch (error) { + throw error; } + } + + + createChart(points: number[][], + xTitle: string, + yTitle: string, + chartName: string, + width: number, + height: number + ): ChartJSImage { + const downsampler = require('downsample-lttb'); + + const downsampledData: number[][] = points.length > 2500 ? downsampler.processData(points, 2500) : points; + + const chart = new ChartJSImage(); + chart.setConfig({ + type: 'line', + data: { + datasets: [{ + label: chartName, + borderColor: 'rgb(75, 192, 192)', + backgroundColor: 'rgba(75, 192, 192, 0.2)', + data: downsampledData.map(item => { return { x: item[0], y: item[1] }; }), + fill: true, + pointRadius: 0, + }], + }, + options: { + title: { + display: true, + text: `${chartName} Area Chart`, + }, + scales: { + xAxes: [{ + type: 'time', + time: { + unit: 'month', + displayFormats: { + month: 'MMM yyyy', + }, + tooltipFormat: 'MMM yyyy', + }, + scaleLabel: { + display: true, + labelString: xTitle, + }, + ticks: { + autoSkip: true, + maxRotation: 0, + minRotation: 0, + }, - - createChart(points: number[][], - xTitle: string, - yTitle: string, - chartName: string, - width: number, - height: number - ): ChartJSImage { - const downsampler = require('downsample-lttb'); - - const downsampledData: number[][] = points.length > 2500 ? downsampler.processData(points, 2500) : points; - - const chart = new ChartJSImage(); - chart.setConfig({ - type: 'line', - data: { - datasets: [{ - label: chartName, - borderColor: 'rgb(75, 192, 192)', - backgroundColor: 'rgba(75, 192, 192, 0.2)', - data: downsampledData.map(item => { return { x: item[0], y: item[1] }; }), - fill: true, - pointRadius: 0, - }], + }], + yAxes: [{ + scaleLabel: { + display: true, + labelString: yTitle, }, - options: { - title: { - display: true, - text: `${chartName} Area Chart`, - }, - scales: { - xAxes: [{ - type: 'time', - time: { - unit: 'month', - displayFormats: { - month: 'MMM yyyy', - }, - tooltipFormat: 'MMM yyyy', - }, - scaleLabel: { - display: true, - labelString: xTitle, - }, - ticks: { - autoSkip: true, - maxRotation: 0, - minRotation: 0, - }, - - }], - yAxes: [{ - scaleLabel: { - display: true, - labelString: yTitle, - }, - ticks: { - beginAtZero: true, - }, - }], - }, + ticks: { + beginAtZero: true, }, - }); - chart.setWidth(width).setHeight(height); + }], + }, + }, + }); + chart.setWidth(width).setHeight(height); - return chart; - } + return chart; + } - async getCsvDataFromDB(tableName: string, xAxis: string, yAxis: string): Promise { - const records = await this.dynamicCollectionRepository.getCollectionDocuments(tableName); + async getCsvDataFromDB(tableName: string, xAxis: string, yAxis: string): Promise { + const records = await this.dynamicCollectionRepository.getCollectionDocuments(tableName); - const points = []; + const points = []; - try { - for (const record of records) { - const value = parseFloat(record[yAxis]); - if (!isNaN(value)) { - points.push([Date.parse(record[xAxis]), value]); - } - } - } catch (error) { - throw error; + try { + for (const record of records) { + const value = parseFloat(record[yAxis]); + if (!isNaN(value)) { + points.push([Date.parse(record[xAxis]), value]); } - - return points; + } + } catch (error) { + throw error; } + + return points; + } } diff --git a/libs/services/src/event-processor/event.processor.ts b/libs/services/src/event-processor/event.processor.ts index 5f53f65..517662d 100644 --- a/libs/services/src/event-processor/event.processor.ts +++ b/libs/services/src/event-processor/event.processor.ts @@ -20,7 +20,7 @@ export class EventProcessor { } const maxHeight = await lastTimestampFunc(); - if (!maxHeight) { + if (maxHeight === undefined) { throw new Error(`Cannot get the last processed timestamp via the provided getLastProcessedTimestamp()`); } diff --git a/libs/services/src/event-processor/index.ts b/libs/services/src/event-processor/index.ts new file mode 100644 index 0000000..d18b131 --- /dev/null +++ b/libs/services/src/event-processor/index.ts @@ -0,0 +1,2 @@ +export * from './event.processor'; +export * from './processor.service'; diff --git a/libs/services/src/event-processor/processor.service.ts b/libs/services/src/event-processor/processor.service.ts new file mode 100644 index 0000000..80685dc --- /dev/null +++ b/libs/services/src/event-processor/processor.service.ts @@ -0,0 +1,115 @@ +import { DynamicCollectionRepository } from "@libs/database/collections"; +import { EventProcessor, EventProcessorOptions } from "./event.processor"; +import { Locker } from "@multiversx/sdk-nestjs-common"; +import { Injectable } from "@nestjs/common"; +import { Cron, CronExpression } from "@nestjs/schedule"; +import { EventLog } from "apps/events-processor/src/processor/entities"; +import { HatomBorrowEventsService, HatomEnterMarketEventsService } from "../events"; + + +@Injectable() +export class ProcessorService { + constructor( + private readonly dynamicCollectionService: DynamicCollectionRepository, + private readonly hatomEnterMarketService: HatomEnterMarketEventsService, + private readonly hatomBorrowService: HatomBorrowEventsService, + ) { } + + @Cron(CronExpression.EVERY_10_SECONDS) + async handleHatomEnterMarketEvents() { + await Locker.lock('hatom-enter-market', async () => { + const eventProcessorOptions = new EventProcessorOptions({ + elasticUrl: 'https://index.multiversx.com', + eventIdentifiers: ['enterMarkets'], + emitterAddresses: ['erd1qqqqqqqqqqqqqpgqxp28qpnv7rfcmk6qrgxgw5uf2fnp84ar78ssqdk6hr'], + pageSize: 500, + getLastProcessedTimestamp: async () => { + return await this.dynamicCollectionService.getLastProcessedTimestamp('hatom-enter-market'); + }, + setLastProcessedTimestamp: async (nonce) => { + await this.dynamicCollectionService.setLastProcessedTimestamp('hatom-enter-market', nonce); + }, + onEventsReceived: async (highestTimestamp, events) => { + highestTimestamp; + await this.hatomEnterMarketService.hatomEnterMarketParser(events as EventLog[]); + }, + }); + + const eventProcessor = new EventProcessor(); + await eventProcessor.start(eventProcessorOptions); + }); + } + + + @Cron(CronExpression.EVERY_10_SECONDS) + async handleHatomBorrowEventsUSDT() { + await Locker.lock('hatom-borrow-USDT-f8c08c', async () => { + const eventProcessorOptions = new EventProcessorOptions({ + elasticUrl: 'https://index.multiversx.com', + eventIdentifiers: ['borrow'], + emitterAddresses: ['erd1qqqqqqqqqqqqqpgqkrgsvct7hfx7ru30mfzk3uy6pxzxn6jj78ss84aldu'], + pageSize: 500, + getLastProcessedTimestamp: async () => { + return await this.dynamicCollectionService.getLastProcessedTimestamp('hatom-borrow-USDT-f8c08c'); + }, + setLastProcessedTimestamp: async (nonce) => { + await this.dynamicCollectionService.setLastProcessedTimestamp('hatom-borrow-USDT-f8c08c', nonce); + }, + onEventsReceived: async (highestTimestamp, events) => { + highestTimestamp; + await this.hatomBorrowService.hatomBorrowParser(events as EventLog[], 'USDT-f8c08c'); + }, + }); + const eventProcessor = new EventProcessor(); + await eventProcessor.start(eventProcessorOptions); + }); + } + + @Cron(CronExpression.EVERY_10_SECONDS) + async handleHatomUsdcBorrowEventsUSDC() { + await Locker.lock('hatom-borrow-USDC-c76f1f', async () => { + const eventProcessorOptions = new EventProcessorOptions({ + elasticUrl: 'https://index.multiversx.com', + eventIdentifiers: ['borrow'], + emitterAddresses: ['erd1qqqqqqqqqqqqqpgqvxn0cl35r74tlw2a8d794v795jrzfxyf78sstg8pjr'], + pageSize: 500, + getLastProcessedTimestamp: async () => { + return await this.dynamicCollectionService.getLastProcessedTimestamp('hatom-borrow-USDC-c76f1f'); + }, + setLastProcessedTimestamp: async (nonce) => { + await this.dynamicCollectionService.setLastProcessedTimestamp('hatom-borrow-USDC-c76f1f', nonce); + }, + onEventsReceived: async (highestTimestamp, events) => { + highestTimestamp; + await this.hatomBorrowService.hatomBorrowParser(events as EventLog[], 'USDC-c76f1f'); + }, + }); + const eventProcessor = new EventProcessor(); + await eventProcessor.start(eventProcessorOptions); + }); + } + + @Cron(CronExpression.EVERY_10_SECONDS) + async handleHatomBorrowEventsWEGLD() { + await Locker.lock('hatom-borrow-WEGLD-bd4d79', async () => { + const eventProcessorOptions = new EventProcessorOptions({ + elasticUrl: 'https://index.multiversx.com', + eventIdentifiers: ['borrow'], + emitterAddresses: ['erd1qqqqqqqqqqqqqpgq35qkf34a8svu4r2zmfzuztmeltqclapv78ss5jleq3'], + pageSize: 500, + getLastProcessedTimestamp: async () => { + return await this.dynamicCollectionService.getLastProcessedTimestamp('hatom-borrow-WEGLD-bd4d79'); + }, + setLastProcessedTimestamp: async (nonce) => { + await this.dynamicCollectionService.setLastProcessedTimestamp('hatom-borrow-WEGLD-bd4d79', nonce); + }, + onEventsReceived: async (highestTimestamp, events) => { + highestTimestamp; + await this.hatomBorrowService.hatomBorrowParser(events as EventLog[], 'WEGLD-bd4d79'); + }, + }); + const eventProcessor = new EventProcessor(); + await eventProcessor.start(eventProcessorOptions); + }); + } +} diff --git a/libs/services/src/event-processor/test.ts b/libs/services/src/event-processor/test.ts deleted file mode 100644 index 57a1fd8..0000000 --- a/libs/services/src/event-processor/test.ts +++ /dev/null @@ -1,19 +0,0 @@ -import { EventProcessor, EventProcessorOptions } from './event.processor'; - -const eventsProcessor = new EventProcessor(); - -void eventsProcessor.start(new EventProcessorOptions({ - elasticUrl: 'https://index.multiversx.com', - eventIdentifiers: ['ESDTTransfer'], - emitterAddresses: ['erd1z08z0svvqs84dnrh5hrm47agcxmch79fslmupzgqcgfdtpp3slwqlna25a'], - pageSize: 3, - getLastProcessedTimestamp: async () => { - await Promise.resolve(); - return 1726700632; - }, - onEventsReceived: async (highestTimestamp, events) => { - await Promise.resolve(); - console.log(`onEventsReceived -> highestTimestamp: ${highestTimestamp}`); - console.log(`onEventsReceived -> events: ${JSON.stringify(events)}`); - }, -})); diff --git a/libs/services/src/events/hatom.borrow.events.service.ts b/libs/services/src/events/hatom.borrow.events.service.ts index 55a80b5..99671da 100644 --- a/libs/services/src/events/hatom.borrow.events.service.ts +++ b/libs/services/src/events/hatom.borrow.events.service.ts @@ -1,5 +1,5 @@ import { Injectable } from "@nestjs/common"; -import { EventLog } from "apps/api/src/endpoints/events/entities"; +import { EventLog } from "apps/events-processor/src/processor/entities"; import { Address } from "@multiversx/sdk-core"; import BigNumber from "bignumber.js"; import { CsvRecordsService } from "../records"; @@ -9,89 +9,91 @@ import { TableSchema } from "apps/dune-simulator/src/endpoints/dune-simulator/en import { joinCsvAttributes } from "libs/services/utils"; interface BorrowEvent { - eventName: string; - borrowerAddress: string; - amount: BigNumber; - newAccountBorrow: BigNumber; - newTotalBorrows: BigNumber; - newBorrowerIndex: BigNumber; + eventName: string; + borrowerAddress: string; + amount: BigNumber; + newAccountBorrow: BigNumber; + newTotalBorrows: BigNumber; + newBorrowerIndex: BigNumber; } @Injectable() export class HatomBorrowEventsService { - private readonly headers: TableSchema[] = [ - { name: 'borrower_address', type: 'varchar' }, - { name: 'timestamp', type: 'varchar' }, - { name: 'borrowed_amount', type: 'double' }, - { name: 'borrowed_amount_in_egld', type: 'double' }, - { name: 'borrowed_amount_in_usd', type: 'double' }, - { name: 'total_borrowed', type: 'double' }, - { name: 'account_borrowed', type: 'double' }, - { name: 'borrowed_token', type: 'varchar' }, - ]; - constructor( - private readonly csvRecordsService: CsvRecordsService, - private readonly dataService: DataService, - ) { } + private readonly headers: TableSchema[] = [ + { name: 'borrower_address', type: 'varchar' }, + { name: 'timestamp', type: 'varchar' }, + { name: 'borrowed_amount', type: 'double' }, + { name: 'borrowed_amount_in_egld', type: 'double' }, + { name: 'borrowed_amount_in_usd', type: 'double' }, + { name: 'total_borrowed', type: 'double' }, + { name: 'account_borrowed', type: 'double' }, + { name: 'borrowed_token', type: 'varchar' }, + ]; + constructor( + private readonly csvRecordsService: CsvRecordsService, + private readonly dataService: DataService, + ) { } - public async hatomBorrowWebhook(eventsLog: EventLog[], borrowedToken: string): Promise { + public async hatomBorrowParser(eventsLog: EventLog[], borrowedToken: string): Promise { - for (const eventLog of eventsLog) { - const borrowEventInHex = '626f72726f775f6576656e74'; // 'borrow_event' + for (const eventLog of eventsLog) { + const borrowEventInHex = '626f72726f775f6576656e74'; // 'borrow_event' + const borrowEventTopicsLength = 6; - if (eventLog.identifier === "borrow" && eventLog.topics[0] === borrowEventInHex) { - const currentEvent = this.decodeTopics(eventLog); - const eventDate = moment.unix(eventLog.timestamp); + if (eventLog.identifier === "borrow" && eventLog.topics.length === borrowEventTopicsLength && eventLog.topics[0] === borrowEventInHex) { + const currentEvent = this.decodeTopics(eventLog); - const [borrowedAmountInEGLD, borrowedAmountInUSD] = await this.convertBorrowedAmount(currentEvent, borrowedToken, eventDate); - const tokenPrecision = await this.dataService.getTokenPrecision(borrowedToken); + const eventDate = moment.unix(eventLog.timestamp); - await this.csvRecordsService.pushRecord( - `hatom_borrow_events`, - [ - joinCsvAttributes( - currentEvent.borrowerAddress, - eventDate.format('YYYY-MM-DD HH:mm:ss.SSS'), - currentEvent.amount.shiftedBy(-tokenPrecision).decimalPlaces(4), - borrowedAmountInEGLD.shiftedBy(-tokenPrecision).decimalPlaces(4), - borrowedAmountInUSD.shiftedBy(-tokenPrecision).decimalPlaces(4), - currentEvent.newTotalBorrows.shiftedBy(-tokenPrecision).decimalPlaces(4), - currentEvent.newAccountBorrow.shiftedBy(-tokenPrecision).decimalPlaces(4), - borrowedToken, - ), - ], - this.headers - ); - } - } + const [borrowedAmountInEGLD, borrowedAmountInUSD] = await this.convertBorrowedAmount(currentEvent, borrowedToken, eventDate); + const tokenPrecision = await this.dataService.getTokenPrecision(borrowedToken); + + await this.csvRecordsService.pushRecord( + `hatom_borrow_events`, + [ + joinCsvAttributes( + currentEvent.borrowerAddress, + eventDate.format('YYYY-MM-DD HH:mm:ss.SSS'), + currentEvent.amount.shiftedBy(-tokenPrecision).decimalPlaces(4), + borrowedAmountInEGLD.shiftedBy(-tokenPrecision).decimalPlaces(4), + borrowedAmountInUSD.shiftedBy(-tokenPrecision).decimalPlaces(4), + currentEvent.newTotalBorrows.shiftedBy(-tokenPrecision).decimalPlaces(4), + currentEvent.newAccountBorrow.shiftedBy(-tokenPrecision).decimalPlaces(4), + borrowedToken, + ), + ], + this.headers + ); + } } + } - decodeTopics(eventLog: EventLog): BorrowEvent { - const currentEvent: BorrowEvent = { - eventName: Buffer.from(eventLog.topics[0], 'hex').toString(), - borrowerAddress: Address.newFromHex(Buffer.from(eventLog.topics[1], 'hex').toString('hex')).toBech32(), - amount: BigNumber(Buffer.from(eventLog.topics[2], 'hex').toString('hex'), 16), - newAccountBorrow: BigNumber(Buffer.from(eventLog.topics[3], 'hex').toString('hex'), 16), - newTotalBorrows: BigNumber(Buffer.from(eventLog.topics[4], 'hex').toString('hex'), 16), - newBorrowerIndex: BigNumber(Buffer.from(eventLog.topics[5], 'hex').toString('hex'), 16), - }; + decodeTopics(eventLog: EventLog): BorrowEvent { + const currentEvent: BorrowEvent = { + eventName: Buffer.from(eventLog.topics[0], 'hex').toString(), + borrowerAddress: Address.newFromHex(Buffer.from(eventLog.topics[1], 'hex').toString('hex')).toBech32(), + amount: BigNumber(Buffer.from(eventLog.topics[2], 'hex').toString('hex'), 16), + newAccountBorrow: BigNumber(Buffer.from(eventLog.topics[3], 'hex').toString('hex'), 16), + newTotalBorrows: BigNumber(Buffer.from(eventLog.topics[4], 'hex').toString('hex'), 16), + newBorrowerIndex: BigNumber(Buffer.from(eventLog.topics[5], 'hex').toString('hex'), 16), + }; - return currentEvent; - } + return currentEvent; + } - async convertBorrowedAmount(currentEvent: BorrowEvent, borrowedToken: string, date: moment.Moment): Promise<[BigNumber, BigNumber]> { - let borrowedAmountInEGLD, borrowedAmountInUSD; + async convertBorrowedAmount(currentEvent: BorrowEvent, borrowedToken: string, date: moment.Moment): Promise<[BigNumber, BigNumber]> { + let borrowedAmountInEGLD, borrowedAmountInUSD; - const egldPrice = await this.dataService.getTokenPrice('WEGLD-bd4d79', date); - if (borrowedToken === 'WEGLD-bd4d79') { - borrowedAmountInEGLD = currentEvent.amount; - borrowedAmountInUSD = borrowedAmountInEGLD.multipliedBy(egldPrice); - } else { - borrowedAmountInUSD = currentEvent.amount; - borrowedAmountInEGLD = borrowedAmountInUSD.dividedBy(egldPrice); - } - return [borrowedAmountInEGLD, borrowedAmountInUSD]; + const egldPrice = await this.dataService.getTokenPrice('WEGLD-bd4d79', date); + if (borrowedToken === 'WEGLD-bd4d79') { + borrowedAmountInEGLD = currentEvent.amount; + borrowedAmountInUSD = borrowedAmountInEGLD.multipliedBy(egldPrice); + } else { + borrowedAmountInUSD = currentEvent.amount; + borrowedAmountInEGLD = borrowedAmountInUSD.dividedBy(egldPrice); } + return [borrowedAmountInEGLD, borrowedAmountInUSD]; + } } diff --git a/libs/services/src/events/hatom.enter.market.events.service.ts b/libs/services/src/events/hatom.enter.market.events.service.ts new file mode 100644 index 0000000..e899631 --- /dev/null +++ b/libs/services/src/events/hatom.enter.market.events.service.ts @@ -0,0 +1,133 @@ +import { Injectable } from "@nestjs/common"; +import { EventLog } from "apps/events-processor/src/processor/entities"; +import { Address } from "@multiversx/sdk-core"; +import BigNumber from "bignumber.js"; +import { CsvRecordsService } from "../records"; +import moment from "moment"; +import { DataService } from "../data"; +import { TableSchema } from "apps/dune-simulator/src/endpoints/dune-simulator/entities"; +import { joinCsvAttributes } from "libs/services/utils"; + +interface BorrowEvent { + eventName: string; + moneyMarket: string; + borrowerAddress: string; + amount: BigNumber; +} + +@Injectable() +export class HatomEnterMarketEventsService { + private readonly headers: TableSchema[] = [ + { name: 'timestamp', type: 'varchar' }, + { name: 'money_market', type: 'varchar' }, + { name: 'token_amount', type: 'double' }, + { name: 'token_id', type: 'varchar' }, + { name: 'borrower_address', type: 'varchar' }, + { name: 'value_in_egld', type: 'double' }, + { name: 'value_in_usd', type: 'double' }, + { name: 'total_value_in_egld', type: 'double' }, + { name: 'total_value_in_usd', type: 'double' }, + ]; + private readonly moneyMarketNotFound = "Money Market Not Found!"; + private totalValueInUsd: BigNumber = new BigNumber(0); + private totalValueInEgld: BigNumber = new BigNumber(0); + + constructor( + private readonly csvRecordsService: CsvRecordsService, + private readonly dataService: DataService, + ) { } + + public async hatomEnterMarketParser(eventsLog: EventLog[]): Promise { + + for (const eventLog of eventsLog) { + const enterMarketEventInHex = '656e7465725f6d61726b65745f6576656e74'; // 'enter_market_event' + const enterMarketTopicsLength = 4; + if (eventLog.identifier === "enterMarkets" && eventLog.topics.length === enterMarketTopicsLength && eventLog.topics[0] === enterMarketEventInHex) { + const currentEvent = this.decodeTopics(eventLog); + + const eventDate = moment.unix(eventLog.timestamp); + const tokenID = this.getTokenIdByMoneyMarket(currentEvent.moneyMarket); + if (tokenID === this.moneyMarketNotFound) { + continue; + } + + const [valueInEgld, valueInUsd] = await this.convertTokenValue(currentEvent, tokenID, eventDate); + const tokenPrecision = await this.dataService.getTokenPrecision(tokenID); + + await this.csvRecordsService.pushRecord( + `hatom_enter_market_events`, + [ + joinCsvAttributes( + eventDate.format('YYYY-MM-DD HH:mm:ss.SSS'), + currentEvent.moneyMarket, + currentEvent.amount.shiftedBy(-tokenPrecision).decimalPlaces(4), + tokenID, + currentEvent.borrowerAddress, + valueInEgld.shiftedBy(-tokenPrecision).decimalPlaces(4), + valueInUsd.shiftedBy(-tokenPrecision).decimalPlaces(4), + this.totalValueInEgld.shiftedBy(-tokenPrecision).decimalPlaces(4), + this.totalValueInUsd.shiftedBy(-tokenPrecision).decimalPlaces(4), + ), + ], + this.headers, + ); + } + } + } + + decodeTopics(eventLog: EventLog): BorrowEvent { + const currentEvent: BorrowEvent = { + eventName: Buffer.from(eventLog.topics[0], 'hex').toString(), + moneyMarket: Address.newFromHex(Buffer.from(eventLog.topics[1], 'hex').toString('hex')).toBech32(), + borrowerAddress: Address.newFromHex(Buffer.from(eventLog.topics[2], 'hex').toString('hex')).toBech32(), + amount: BigNumber(Buffer.from(eventLog.topics[3], 'hex').toString('hex'), 16), + }; + + return currentEvent; + } + + async convertTokenValue(currentEvent: BorrowEvent, tokenID: string, date: moment.Moment): Promise<[BigNumber, BigNumber]> { + const egldPrice = await this.dataService.getTokenPrice('WEGLD-bd4d79', date); + const tokenPrice = await this.dataService.getTokenPrice(tokenID, date, 'hatom'); + + const valueInUsd = currentEvent.amount.multipliedBy(tokenPrice); + const valueInEgld = valueInUsd.dividedBy(egldPrice); + + this.totalValueInUsd = this.totalValueInUsd.plus(valueInUsd); + this.totalValueInEgld = this.totalValueInEgld.plus(valueInEgld); + + return [valueInEgld, valueInUsd]; + } + + getTokenIdByMoneyMarket(moneyMarket: string) { + switch (moneyMarket) { + case 'erd1qqqqqqqqqqqqqpgqta0tv8d5pjzmwzshrtw62n4nww9kxtl278ssspxpxu': + return 'HUTK-4fa4b2'; + case 'erd1qqqqqqqqqqqqqpgqkrgsvct7hfx7ru30mfzk3uy6pxzxn6jj78ss84aldu': + return 'HUSDC-d80042'; + case 'erd1qqqqqqqqqqqqqpgqvxn0cl35r74tlw2a8d794v795jrzfxyf78sstg8pjr': + return 'HUSDT-6f0914'; + case 'erd1qqqqqqqqqqqqqpgqxmn4jlazsjp6gnec95423egatwcdfcjm78ss5q550k': + return 'HSEGLD-c13a4e'; + case 'erd1qqqqqqqqqqqqqpgq35qkf34a8svu4r2zmfzuztmeltqclapv78ss5jleq3': + return 'HEGLD-d61095'; + case 'erd1qqqqqqqqqqqqqpgqz9pvuz22qvqxfqpk6r3rluj0u2can55c78ssgcqs00': + return 'HWTAO-2e9136'; + case 'erd1qqqqqqqqqqqqqpgqxerzmkr80xc0qwa8vvm5ug9h8e2y7jgsqk2svevje0': + return 'HHTM-e03ba5'; + case 'erd1qqqqqqqqqqqqqpgq8h8upp38fe9p4ny9ecvsett0usu2ep7978ssypgmrs': + return 'HWETH-b3d17e'; + case 'erd1qqqqqqqqqqqqqpgqg47t8v5nwzvdxgf6g5jkxleuplu8y4f678ssfcg5gy': + return 'HWBTC-49ca31'; + case 'erd1qqqqqqqqqqqqqpgqdvrqup8k9mxvhvnc7cnzkcs028u95s5378ssr9d72p': + return 'HBUSD-ac1fca'; + case 'erd1qqqqqqqqqqqqqpgq7sspywe6e2ehy7dn5dz00ved3aa450mv78ssllmln6': + return 'HSWTAO-6df80c'; + default: + return this.moneyMarketNotFound; + } + } +} + + + diff --git a/libs/services/src/events/index.ts b/libs/services/src/events/index.ts index 7437fe5..e8d0503 100644 --- a/libs/services/src/events/index.ts +++ b/libs/services/src/events/index.ts @@ -1,2 +1,3 @@ export * from './liquidity.events.service'; export * from './hatom.borrow.events.service'; +export * from './hatom.enter.market.events.service'; diff --git a/libs/services/src/events/liquidity.events.service.ts b/libs/services/src/events/liquidity.events.service.ts index de6fd84..04e8228 100644 --- a/libs/services/src/events/liquidity.events.service.ts +++ b/libs/services/src/events/liquidity.events.service.ts @@ -1,5 +1,5 @@ import { Injectable } from "@nestjs/common"; -import { EventLog } from "apps/api/src/endpoints/events/entities"; +import { EventLog } from "apps/events-processor/src/processor/entities"; import moment from "moment"; import BigNumber from "bignumber.js"; import { AddLiquidityEvent, RemoveLiquidityEvent } from "@multiversx/sdk-exchange"; @@ -10,89 +10,89 @@ import { joinCsvAttributes } from "libs/services/utils"; @Injectable() export class LiquidityEventsService { - private lastFirstTokenReserves: { [key: string]: BigNumber } = {}; - private lastSecondTokenReserves: { [key: string]: BigNumber } = {}; - - private lastDate: { [key: string]: moment.Moment } = {}; - private readonly headers: TableSchema[] = [ - { name: 'timestamp', type: 'varchar' }, - { name: 'volumeusd', type: 'double' }, - ]; - constructor( - private readonly dataService: DataService, - private readonly csvRecordsService: CsvRecordsService, - ) { } - - public async liquidityWebhook(eventsLog: EventLog[]): Promise { - let currentEvent: AddLiquidityEvent | RemoveLiquidityEvent; - - for (const eventLog of eventsLog) { - // We need to parse an event only when we receive data from events-log-service - - // eventLog.topics = eventLog.topics.map((topic) => Buffer.from(topic, 'hex').toString('base64')); - // eventLog.data = Buffer.from(eventLog.data, 'hex').toString('base64'); - // eventLog.additionalData = eventLog.additionalData.map((data) => Buffer.from(data, 'hex').toString('base64')); - - switch (eventLog.identifier) { - case "addLiquidity": - currentEvent = new AddLiquidityEvent(eventLog); - break; - case "removeLiquidity": - currentEvent = new RemoveLiquidityEvent(eventLog); - break; - default: - continue; - } - - const firstTokenId = currentEvent.getFirstToken()?.tokenID ?? ""; - const secondTokenId = currentEvent.getSecondToken()?.tokenID ?? ""; - const csvFileName = `${firstTokenId}_${secondTokenId}`; - const eventDate = moment.unix(currentEvent.getTimestamp()?.toNumber() ?? 0); - - if (this.lastDate[csvFileName]) { - const diff = this.computeHoursDifference(eventDate, this.lastDate[csvFileName]); - - for (let i = 0; i < diff; i++) { - this.lastDate[csvFileName].add(1, 'hour').startOf('hour'); - const liquidity = await this.computeLiquidty(this.lastFirstTokenReserves[csvFileName], this.lastSecondTokenReserves[csvFileName], firstTokenId, secondTokenId, this.lastDate[csvFileName]); - await this.csvRecordsService.pushRecord( - csvFileName, - [ - joinCsvAttributes( - this.lastDate[csvFileName].format('YYYY-MM-DD HH:mm:ss.SSS'), - liquidity.decimalPlaces(4), - ), - ], - this.headers); - } - } - - this.lastFirstTokenReserves[csvFileName] = currentEvent.getFirstTokenReserves() ?? new BigNumber(0); - this.lastSecondTokenReserves[csvFileName] = currentEvent.getSecondTokenReserves() ?? new BigNumber(0); - this.lastDate[csvFileName] = eventDate; + private lastFirstTokenReserves: { [key: string]: BigNumber } = {}; + private lastSecondTokenReserves: { [key: string]: BigNumber } = {}; + + private lastDate: { [key: string]: moment.Moment } = {}; + private readonly headers: TableSchema[] = [ + { name: 'timestamp', type: 'varchar' }, + { name: 'volumeusd', type: 'double' }, + ]; + constructor( + private readonly dataService: DataService, + private readonly csvRecordsService: CsvRecordsService, + ) { } + + public async liquidityWebhook(eventsLog: EventLog[]): Promise { + let currentEvent: AddLiquidityEvent | RemoveLiquidityEvent; + + for (const eventLog of eventsLog) { + // We need to parse an event only when we receive data from events-log-service + + // eventLog.topics = eventLog.topics.map((topic) => Buffer.from(topic, 'hex').toString('base64')); + // eventLog.data = Buffer.from(eventLog.data, 'hex').toString('base64'); + // eventLog.additionalData = eventLog.additionalData.map((data) => Buffer.from(data, 'hex').toString('base64')); + + switch (eventLog.identifier) { + case "addLiquidity": + currentEvent = new AddLiquidityEvent(eventLog); + break; + case "removeLiquidity": + currentEvent = new RemoveLiquidityEvent(eventLog); + break; + default: + continue; + } + + const firstTokenId = currentEvent.getFirstToken()?.tokenID ?? ""; + const secondTokenId = currentEvent.getSecondToken()?.tokenID ?? ""; + const csvFileName = `${firstTokenId}_${secondTokenId}`; + const eventDate = moment.unix(currentEvent.getTimestamp()?.toNumber() ?? 0); + + if (this.lastDate[csvFileName]) { + const diff = this.computeHoursDifference(eventDate, this.lastDate[csvFileName]); + + for (let i = 0; i < diff; i++) { + this.lastDate[csvFileName].add(1, 'hour').startOf('hour'); + const liquidity = await this.computeLiquidty(this.lastFirstTokenReserves[csvFileName], this.lastSecondTokenReserves[csvFileName], firstTokenId, secondTokenId, this.lastDate[csvFileName]); + await this.csvRecordsService.pushRecord( + csvFileName, + [ + joinCsvAttributes( + this.lastDate[csvFileName].format('YYYY-MM-DD HH:mm:ss.SSS'), + liquidity.decimalPlaces(4), + ), + ], + this.headers); } - } + } - private computeHoursDifference(currentDate: moment.Moment, previousDate: moment.Moment): number { - let diff = currentDate.diff(previousDate, 'hours'); + this.lastFirstTokenReserves[csvFileName] = currentEvent.getFirstTokenReserves() ?? new BigNumber(0); + this.lastSecondTokenReserves[csvFileName] = currentEvent.getSecondTokenReserves() ?? new BigNumber(0); + this.lastDate[csvFileName] = eventDate; + } + } - if (previousDate.minutes() > currentDate.minutes()) { - diff++; - } + private computeHoursDifference(currentDate: moment.Moment, previousDate: moment.Moment): number { + let diff = currentDate.diff(previousDate, 'hours'); - return diff; + if (previousDate.minutes() > currentDate.minutes()) { + diff++; } - private async computeLiquidty(firstTokenReserves: BigNumber, secondTokenReserves: BigNumber, firstTokenId: string, secondTokenId: string, date: moment.Moment): Promise { - const firstTokenPrice = await this.dataService.getTokenPrice(firstTokenId, date); - const secondTokenPrice = await this.dataService.getTokenPrice(secondTokenId, date); + return diff; + } - const firstTokenPrecision = await this.dataService.getTokenPrecision(firstTokenId); - const secondTokenPrecision = await this.dataService.getTokenPrecision(secondTokenId); + private async computeLiquidty(firstTokenReserves: BigNumber, secondTokenReserves: BigNumber, firstTokenId: string, secondTokenId: string, date: moment.Moment): Promise { + const firstTokenPrice = await this.dataService.getTokenPrice(firstTokenId, date); + const secondTokenPrice = await this.dataService.getTokenPrice(secondTokenId, date); - const firstTokenReservePrice = firstTokenReserves.multipliedBy(firstTokenPrice).shiftedBy(-firstTokenPrecision); - const secondTokenReservePrice = secondTokenReserves.multipliedBy(secondTokenPrice).shiftedBy(-secondTokenPrecision); + const firstTokenPrecision = await this.dataService.getTokenPrecision(firstTokenId); + const secondTokenPrecision = await this.dataService.getTokenPrecision(secondTokenId); - return firstTokenReservePrice.plus(secondTokenReservePrice); - } + const firstTokenReservePrice = firstTokenReserves.multipliedBy(firstTokenPrice).shiftedBy(-firstTokenPrecision); + const secondTokenReservePrice = secondTokenReserves.multipliedBy(secondTokenPrice).shiftedBy(-secondTokenPrecision); + + return firstTokenReservePrice.plus(secondTokenReservePrice); + } } diff --git a/libs/services/src/records/csv-records.service.ts b/libs/services/src/records/csv-records.service.ts index 9b41ed9..e523af0 100644 --- a/libs/services/src/records/csv-records.service.ts +++ b/libs/services/src/records/csv-records.service.ts @@ -4,7 +4,7 @@ import { CacheInfo } from '@libs/common'; import { RedlockService } from '@multiversx/sdk-nestjs-cache'; import { TableSchema } from 'apps/dune-simulator/src/endpoints/dune-simulator/entities'; import { toSnakeCase } from 'libs/services/utils'; - +import '@multiversx/sdk-nestjs-common/lib/utils/extensions/string.extensions'; @Injectable() export class CsvRecordsService { private csvRecords: Record = {}; diff --git a/libs/services/src/services.module.ts b/libs/services/src/services.module.ts index 758fb50..a0d2cfe 100644 --- a/libs/services/src/services.module.ts +++ b/libs/services/src/services.module.ts @@ -1,10 +1,12 @@ import { Global, Module } from '@nestjs/common'; import { DatabaseModule } from '@libs/database'; import { DynamicModuleUtils } from '@libs/common'; -import { HatomBorrowEventsService, LiquidityEventsService } from './events'; +import { HatomBorrowEventsService, HatomEnterMarketEventsService, LiquidityEventsService } from './events'; import { DataService } from './data'; import { DuneSenderService } from './dune-sender'; import { CsvRecordsService } from './records'; +import { EventProcessor } from './event-processor/event.processor'; +import { ProcessorService } from './event-processor/processor.service'; @Global() @Module({ @@ -19,6 +21,9 @@ import { CsvRecordsService } from './records'; DuneSenderService, CsvRecordsService, HatomBorrowEventsService, + HatomEnterMarketEventsService, + EventProcessor, + ProcessorService, ], exports: [ LiquidityEventsService, @@ -26,6 +31,9 @@ import { CsvRecordsService } from './records'; DuneSenderService, CsvRecordsService, HatomBorrowEventsService, + HatomEnterMarketEventsService, + EventProcessor, + ProcessorService, ], }) export class ServicesModule { } diff --git a/nest-cli.json b/nest-cli.json index fc27df8..f431576 100644 --- a/nest-cli.json +++ b/nest-cli.json @@ -1,24 +1,24 @@ { "$schema": "https://json.schemastore.org/nest-cli", "collection": "@nestjs/schematics", - "sourceRoot": "apps/api/src", + "sourceRoot": "apps/events-processor/src", "compilerOptions": { "webpack": false, "plugins": [ "@nestjs/swagger" ], - "tsConfigPath": "apps/api/tsconfig.app.json" + "tsConfigPath": "apps/events-processor/tsconfig.app.json" }, "monorepo": true, - "root": "apps/api", + "root": "apps/events-processor", "projects": { - "api": { + "events-processor": { "type": "application", - "root": "apps/api", + "root": "apps/events-processor", "entryFile": "main", - "sourceRoot": "apps/api/src", + "sourceRoot": "apps/events-processor/src", "compilerOptions": { - "tsConfigPath": "apps/api/tsconfig.app.json", + "tsConfigPath": "apps/events-processor/tsconfig.app.json", "assets": [ { "include": "../config/config.yaml", @@ -30,7 +30,7 @@ }, { "include": "../docs/swagger.md", - "outDir": "./dist/apps/api/docs" + "outDir": "./dist/apps/events-processor/docs" } ] } diff --git a/package.json b/package.json index 7908ff1..a68597d 100644 --- a/package.json +++ b/package.json @@ -11,7 +11,7 @@ "build:all": "nest build api && nest build dune-simulator && nest build common", "build:infra": "npm run build:all && npm run copy-config-infra", "format": "prettier --write \"src/**/*.ts\" \"test/**/*.ts\"", - "start:api": "cross-env NODE_APP=api npm run start", + "start:events-processor": "cross-env NODE_APP=events-processor npm run start", "start:dune-simulator": "cross-env NODE_APP=dune-simulator npm run start", "start": "npm run copy-config && npm run nest-start", "copy-config-infra": "mkdir -p dist/config && cp .multiversx/config/config.yaml dist/config/config.yaml && cp config/schema.yaml dist/config/schema.yaml",