diff --git a/src/kafka/kafka.service.ts b/src/kafka/kafka.service.ts index fe1d17c..34a2425 100644 --- a/src/kafka/kafka.service.ts +++ b/src/kafka/kafka.service.ts @@ -10,7 +10,7 @@ import { PuppeteerService } from '../puppeteer/puppeteer.service'; import { MinioService } from '../minio/minio.service'; @Injectable() -export class KafkaService implements OnApplicationBootstrap, OnModuleDestroy { +export class KafkaService implements OnModuleInit, OnModuleDestroy { private kafka: Kafka; private consumer: Consumer; private topic = process.env.TOPIC; @@ -27,7 +27,6 @@ export class KafkaService implements OnApplicationBootstrap, OnModuleDestroy { 'kafka-controller-2.kafka-controller-headless.kafka.svc.cluster.local:9092', ], sasl: { - // mechanism: `process.env.SASL_MECHANISM`, mechanism: 'scram-sha-256', username: process.env.SASL_USER, password: process.env.SASL_PASSWORD, @@ -39,9 +38,11 @@ export class KafkaService implements OnApplicationBootstrap, OnModuleDestroy { }); } - async onApplicationBootstrap() { + async onModuleInit() { await this.consumer.connect(); + console.log('connected'); await this.consumer.subscribe({ topic: this.topic, fromBeginning: false }); + console.log('subsribed'); await this.consumer .run({ autoCommit: false, diff --git a/src/main.ts b/src/main.ts index 63be9be..1af218f 100644 --- a/src/main.ts +++ b/src/main.ts @@ -4,6 +4,6 @@ import { AppModule } from './app.module'; async function bootstrap() { const app = await NestFactory.create(AppModule); app.enableShutdownHooks(); - await app.listen(5000); + await app.listen(4000); } bootstrap();