Skip to content

Commit

Permalink
create new app transactions-processor
Browse files Browse the repository at this point in the history
  • Loading branch information
GuticaStefan committed Sep 19, 2024
1 parent 06bdad5 commit f3c1ad8
Show file tree
Hide file tree
Showing 14 changed files with 219 additions and 2 deletions.
4 changes: 4 additions & 0 deletions .multiversx/config/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@ apps:
port: 3001
privatePort: 4001
useCachingInterceptor: true
transactionsProcessor:
port: 3002
privatePort: 4002
useCachingInterceptor: true
libs:
common:
network: ${NETWORK}
Expand Down
28 changes: 28 additions & 0 deletions apps/transactions-processor/src/app.module.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
import { Module } from '@nestjs/common';

Check failure on line 1 in apps/transactions-processor/src/app.module.ts

View workflow job for this annotation

GitHub Actions / build (16.x)

File is not of any known element type
import { ApiMetricsController, CommonConfigModule, DynamicModuleUtils, HealthCheckController } from '@libs/common';
import { ApiMetricsModule } from '@libs/common';
import { LoggingModule } from '@multiversx/sdk-nestjs-common';
import { AppConfigModule } from './config/app-config.module';
import { ScheduleModule } from '@nestjs/schedule';
import { ProcessorService } from './processor/processor.service';
import { DatabaseModule } from '@libs/database';

@Module({
imports: [
LoggingModule,
ApiMetricsModule,
AppConfigModule,
CommonConfigModule,
ScheduleModule.forRoot(),
DynamicModuleUtils.getCachingModule(),
DatabaseModule,
],
providers: [
ProcessorService,
],
controllers: [
ApiMetricsController,
HealthCheckController,
],
})
export class AppModule { }
13 changes: 13 additions & 0 deletions apps/transactions-processor/src/config/app-config.module.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
import { Global, Module } from "@nestjs/common";

Check failure on line 1 in apps/transactions-processor/src/config/app-config.module.ts

View workflow job for this annotation

GitHub Actions / build (16.x)

File is not of any known element type
import { AppConfigService } from "./app-config.service";

@Global()
@Module({
providers: [
AppConfigService,
],
exports: [
AppConfigService,
],
})
export class AppConfigModule { }
7 changes: 7 additions & 0 deletions apps/transactions-processor/src/config/app-config.service.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
import { configuration } from "@libs/common/config/configuration";

Check failure on line 1 in apps/transactions-processor/src/config/app-config.service.ts

View workflow job for this annotation

GitHub Actions / build (16.x)

File is not of any known element type
import { Injectable } from "@nestjs/common";

@Injectable()
export class AppConfigService {
readonly config = configuration().apps.transactionsProcessor;
}
44 changes: 44 additions & 0 deletions apps/transactions-processor/src/main.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
import * as dotenv from 'dotenv';

Check failure on line 1 in apps/transactions-processor/src/main.ts

View workflow job for this annotation

GitHub Actions / build (16.x)

File is not of any known element type
import { resolve } from 'path';

// Determine which .env file to load based on NODE_ENV
const envPath = process.env.NODE_ENV === 'infra' ? '.env' : `.env.${process.env.NODE_ENV ?? 'mainnet'}`;
dotenv.config({
path: resolve(process.cwd(), envPath),
});

import 'module-alias/register';
import { NestFactory } from '@nestjs/core';
import { CommonConfigService, PubSubListenerModule } from '@libs/common';
import { AppModule } from './app.module';
import { MicroserviceOptions, Transport } from '@nestjs/microservices';
import { WINSTON_MODULE_NEST_PROVIDER } from 'nest-winston';
import { AppConfigService } from './config/app-config.service';

async function bootstrap() {
const transactionProcessorApp = await NestFactory.create(AppModule);
const appConfigService = transactionProcessorApp.get<AppConfigService>(AppConfigService);
const commonConfigService = transactionProcessorApp.get<CommonConfigService>(CommonConfigService);

await transactionProcessorApp.listen(appConfigService.config.port);

const pubSubApp = await NestFactory.createMicroservice<MicroserviceOptions>(
PubSubListenerModule.forRoot(),
{
transport: Transport.REDIS,
options: {
host: commonConfigService.config.redis.host,
port: commonConfigService.config.redis.port,
retryAttempts: 100,
retryDelay: 1000,
retryStrategy: () => 1000,
},
},
);
pubSubApp.useLogger(pubSubApp.get(WINSTON_MODULE_NEST_PROVIDER));
// eslint-disable-next-line @typescript-eslint/no-floating-promises
pubSubApp.listen();
}

// eslint-disable-next-line @typescript-eslint/no-floating-promises
bootstrap();
46 changes: 46 additions & 0 deletions apps/transactions-processor/src/processor/processor.service.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
import { Locker } from "@multiversx/sdk-nestjs-common";

Check failure on line 1 in apps/transactions-processor/src/processor/processor.service.ts

View workflow job for this annotation

GitHub Actions / build (16.x)

File is not of any known element type
import { Injectable } from "@nestjs/common";
import { Cron, CronExpression } from "@nestjs/schedule";

import { DynamicCollectionRepository } from "@libs/database/collections";
import { EventProcessor, EventProcessorOptions } from "@libs/services/event-processor/event.processor";

@Injectable()
export class ProcessorService {
private eventsProcessor: EventProcessor = new EventProcessor();
// private readonly logger: Logger;

constructor(
private readonly dynamicCollectionService: DynamicCollectionRepository,
// private readonly commonConfigService: CommonConfigService,
// private readonly appConfigService: AppConfigService,
) {
// this.logger = new Logger(ProcessorService.name);
}

@Cron(CronExpression.EVERY_10_SECONDS)
async handleNewTransactions() {
await Locker.lock('newTransactions', async () => {
await this.eventsProcessor.start(new EventProcessorOptions({
elasticUrl: 'https://index.multiversx.com',
eventIdentifiers: ['ESDTTransfer'],
emitterAddresses: ['erd1z08z0svvqs84dnrh5hrm47agcxmch79fslmupzgqcgfdtpp3slwqlna25a'],
pageSize: 3,
getLastProcessedTimestamp: async () => {
const result = await this.dynamicCollectionService.getLastProcessedTimestamp()

Check failure on line 30 in apps/transactions-processor/src/processor/processor.service.ts

View workflow job for this annotation

GitHub Actions / build (16.x)

Missing semicolon
console.log(result);
return result;
},
setLastProcessedTimestamp: async (nonce) => {
await this.dynamicCollectionService.setLastProcessedTimestamp(nonce);
},
onEventsReceived: async (highestTimestamp, events) => {
await Promise.resolve();
console.log(`onEventsReceived -> highestTimestamp: ${highestTimestamp}`);
console.log(`onEventsReceived -> events: ${JSON.stringify(events)}`);
},
}));
})

Check failure on line 43 in apps/transactions-processor/src/processor/processor.service.ts

View workflow job for this annotation

GitHub Actions / build (16.x)

Missing semicolon
}

}
16 changes: 16 additions & 0 deletions apps/transactions-processor/tsconfig.app.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
{
"extends": "../../tsconfig.json",
"compilerOptions": {
"declaration": false,
"outDir": "../../dist"
},
"include": [
"src/**/*"
],
"exclude": [
"node_modules",
"dist",
"test",
"**/*spec.ts"
]
}
4 changes: 4 additions & 0 deletions config/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@ apps:
port: 3001
privatePort: 4001
useCachingInterceptor: true
transactionsProcessor:
port: 3002
privatePort: 4002
useCachingInterceptor: true
libs:
common:
network: ${NETWORK}
Expand Down
4 changes: 4 additions & 0 deletions config/schema.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,10 @@ apps:
port: integer
privatePort: integer
useCachingInterceptor: boolean
transactionsProcessor:
port: integer
privatePort: integer
useCachingInterceptor: boolean
libs:
common:
network:
Expand Down
4 changes: 4 additions & 0 deletions libs/common/src/entities/config.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,10 @@ export interface Config {
privatePort: number;
useCachingInterceptor: boolean;
};
transactionsProcessor: {
port: number;
maxLookBehind: number;
};
};
libs: {
common: {
Expand Down
29 changes: 28 additions & 1 deletion libs/database/src/collections/dynamic.collection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,6 @@ export class DynamicCollectionRepository {
return collectionDocuments;
}


private mapToMongooseType(fieldType: string) {
switch (fieldType.toLowerCase()) {
case 'varchar':
Expand All @@ -67,5 +66,33 @@ export class DynamicCollectionRepository {
throw new HttpException(`Unsupported field type: ${fieldType}`, HttpStatus.BAD_REQUEST);
}
}

public async setLastProcessedTimestamp(nonce: number) {
const collectionName = `last_processed_nonce`;
const existingCollections = await this.connection.db.listCollections({ name: collectionName }).toArray();
if (existingCollections.length === 0) {
const schema = new Schema({
shardId: { type: Number, required: true },
nonce: { type: Number, required: true },
});
this.connection.model(collectionName, schema);
}
const dynamicModel = this.connection.db.collection(collectionName);
await dynamicModel.updateOne({ $set: { nonce } }, { upsert: true });
}

public async getLastProcessedTimestamp() {
const collectionName = `last_processed_nonce`;
const existingCollections = await this.connection.db.listCollections({ name: collectionName }).toArray();
if (existingCollections.length === 0) {
return 0;
}
const dynamicModel = this.connection.db.collection(collectionName);
const lastProcessedNonce = await dynamicModel.findOne();
if (!lastProcessedNonce) {
return 0;
}
return lastProcessedNonce.nonce;
}
}

2 changes: 1 addition & 1 deletion libs/services/src/event-processor/event.processor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ export class EventProcessor {
}

const maxHeight = await lastTimestampFunc();
if (!maxHeight) {
if (maxHeight === undefined) {
throw new Error(`Cannot get the last processed timestamp via the provided getLastProcessedTimestamp()`);
}

Expand Down
19 changes: 19 additions & 0 deletions nest-cli.json
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,25 @@
]
}
},
"transactions-processor": {
"type": "application",
"root": "apps/transactions-processor",
"entryFile": "main",
"sourceRoot": "apps/transactions-processor/src",
"compilerOptions": {
"tsConfigPath": "apps/transactions-processor/tsconfig.app.json",
"assets": [
{
"include": "../config/config.yaml",
"outDir": "./dist/config"
},
{
"include": "../config/schema.yaml",
"outDir": "./dist/config"
}
]
}
},
"common": {
"type": "library",
"root": "libs/common",
Expand Down
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
"format": "prettier --write \"src/**/*.ts\" \"test/**/*.ts\"",
"start:api": "cross-env NODE_APP=api npm run start",
"start:dune-simulator": "cross-env NODE_APP=dune-simulator npm run start",
"start:transactions-processor": "cross-env NODE_APP=transactions-processor npm run start",
"start": "npm run copy-config && npm run nest-start",
"copy-config-infra": "mkdir -p dist/config && cp .multiversx/config/config.yaml dist/config/config.yaml && cp config/schema.yaml dist/config/schema.yaml",
"copy-config": "mkdir -p ./apps/$NODE_APP/config && cp ./config/config.yaml ./apps/$NODE_APP/config/config.yaml && cp ./config/schema.yaml ./apps/$NODE_APP/config/schema.yaml",
Expand Down

0 comments on commit f3c1ad8

Please sign in to comment.