diff --git a/src/kafka/kafka.service.ts b/src/kafka/kafka.service.ts index fb7872e..d29c3d1 100644 --- a/src/kafka/kafka.service.ts +++ b/src/kafka/kafka.service.ts @@ -3,13 +3,14 @@ import { OnModuleInit, OnModuleDestroy, InternalServerErrorException, + OnApplicationBootstrap, } from '@nestjs/common'; import { Kafka, EachMessagePayload, Consumer } from 'kafkajs'; import { PuppeteerService } from '../puppeteer/puppeteer.service'; import { MinioService } from '../minio/minio.service'; @Injectable() -export class KafkaService implements OnModuleInit, OnModuleDestroy { +export class KafkaService implements OnApplicationBootstrap, OnModuleDestroy { private kafka: Kafka; private consumer: Consumer; private topic = process.env.TOPIC; @@ -28,15 +29,15 @@ export class KafkaService implements OnModuleInit, OnModuleDestroy { password: process.env.SASL_PASSWORD, }, }); - } - async onModuleInit() { this.consumer = this.kafka.consumer({ groupId: process.env.CONSUMER_GROUP, }); - await this.consumer.connect(); - await this.consumer.subscribe({ topic: this.topic, fromBeginning: false }); + this.consumer.connect(); + } + async onApplicationBootstrap() { + await this.consumer.subscribe({ topic: this.topic, fromBeginning: false }); await this.consumer .run({ autoCommit: false,