From 93734131e947ffcb4e25c0bd9daab356e1f9f73b Mon Sep 17 00:00:00 2001 From: chanyeong Date: Fri, 30 Aug 2024 18:15:50 +0900 Subject: [PATCH] fix: fromBeginning true --- src/kafka/kafka.service.ts | 37 +++++++++++++------------------ src/minio/minio-client.service.ts | 2 +- 2 files changed, 17 insertions(+), 22 deletions(-) diff --git a/src/kafka/kafka.service.ts b/src/kafka/kafka.service.ts index 52cb1f4..0c74561 100644 --- a/src/kafka/kafka.service.ts +++ b/src/kafka/kafka.service.ts @@ -36,7 +36,7 @@ export class KafkaService implements OnModuleInit, OnModuleDestroy { async onModuleInit() { await this.consumer.connect(); console.log('connected'); - await this.consumer.subscribe({ topic: this.topic, fromBeginning: false }); + await this.consumer.subscribe({ topic: this.topic, fromBeginning: true }); console.log('subsribed'); await this.consumer .run({ @@ -46,27 +46,22 @@ export class KafkaService implements OnModuleInit, OnModuleDestroy { console.info( `partition: ${partition}, messageOffset: ${message.offset}`, ); - const uuid = message.key.toString(); const htmlContent = message.value.toString(); - try { - const pdf = await this.puppeteerService.generatePDF(htmlContent); - await this.minioService.uploadPdf( - uuid, - Buffer.from(pdf), - message.headers, - ); - // 성공 시 커밋 - // throw new BadGatewayException('일부러 에러 발생'); - await this.consumer.commitOffsets([ - { - topic, - partition, - offset: (BigInt(message.offset) + BigInt(1)).toString(), - }, - ]); - } catch (e) { - throw new InternalServerErrorException(`[consumer] ${e.message}`); - } + const pdf = await this.puppeteerService.generatePDF(htmlContent); + await this.minioService.uploadPdf( + message.headers.uuid, + Buffer.from(pdf), + message.headers, + ); + // 성공 시 커밋 + // throw new BadGatewayException('일부러 에러 발생'); + await this.consumer.commitOffsets([ + { + topic, + partition, + offset: (BigInt(message.offset) + BigInt(1)).toString(), + }, + ]); }, }) .catch((e) => console.error(`[example/consumer] ${e.message}`, e)); diff --git a/src/minio/minio-client.service.ts b/src/minio/minio-client.service.ts index 0ec3b3a..c1dbb39 100644 --- a/src/minio/minio-client.service.ts +++ b/src/minio/minio-client.service.ts @@ -15,7 +15,7 @@ export class MinioClientService { this.bucketName = this.configService.get('MINIO_BUCKET_NAME'); } - async uploadPdf(fileName: string, pdfBuffer: Buffer, headers): Promise { + async uploadPdf(fileName, pdfBuffer: Buffer, headers): Promise { const metaData = { 'Content-Type': 'application/pdf', createdAt: new Date().toUTCString(),