Skip to content

Commit

Permalink
fix: consumer init 시점 변경
Browse files Browse the repository at this point in the history
  • Loading branch information
jcy0308 committed Aug 29, 2024
1 parent c58b0f6 commit af03ca0
Showing 1 changed file with 6 additions and 5 deletions.
11 changes: 6 additions & 5 deletions src/kafka/kafka.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,14 @@ import {
OnModuleInit,
OnModuleDestroy,
InternalServerErrorException,
OnApplicationBootstrap,
} from '@nestjs/common';
import { Kafka, EachMessagePayload, Consumer } from 'kafkajs';
import { PuppeteerService } from '../puppeteer/puppeteer.service';
import { MinioService } from '../minio/minio.service';

@Injectable()
export class KafkaService implements OnModuleInit, OnModuleDestroy {
export class KafkaService implements OnApplicationBootstrap, OnModuleDestroy {
private kafka: Kafka;
private consumer: Consumer;
private topic = process.env.TOPIC;
Expand All @@ -28,15 +29,15 @@ export class KafkaService implements OnModuleInit, OnModuleDestroy {
password: process.env.SASL_PASSWORD,
},
});
}

async onModuleInit() {
this.consumer = this.kafka.consumer({
groupId: process.env.CONSUMER_GROUP,
});
await this.consumer.connect();
await this.consumer.subscribe({ topic: this.topic, fromBeginning: false });
this.consumer.connect();
}

async onApplicationBootstrap() {
await this.consumer.subscribe({ topic: this.topic, fromBeginning: false });
await this.consumer
.run({
autoCommit: false,
Expand Down

0 comments on commit af03ca0

Please sign in to comment.