Skip to content

Commit

Permalink
change kafka broker settings
Browse files Browse the repository at this point in the history
  • Loading branch information
csuvajit committed Apr 7, 2024
1 parent 725aa36 commit 780985e
Show file tree
Hide file tree
Showing 5 changed files with 51 additions and 13 deletions.
16 changes: 9 additions & 7 deletions apps/service-auth/src/app.module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,11 @@ import { KafkaConsumerModule, KafkaProducerModule } from '@app/kafka';
import { MongoDbModule } from '@app/mongodb';
import { RedisModule } from '@app/redis';
import { Module } from '@nestjs/common';
import { ConfigModule } from '@nestjs/config';
import { ConfigModule, ConfigService } from '@nestjs/config';
import { logLevel } from 'kafkajs';
import { AuthModule } from './auth/auth.module';
import { ClansModule } from './clans/clans.module';
import { ConsumerModule } from './consumer/consumer.module';
import { GuildsModule } from './guilds/guilds.module';
import { LinksModule } from './links/links.module';
import { PlayersModule } from './players/players.module';
Expand All @@ -21,31 +22,32 @@ import { PlayersModule } from './players/players.module';
LinksModule,
PlayersModule,
KafkaProducerModule.forRootAsync({
useFactory() {
useFactory: (configService: ConfigService) => {
return {
kafkaConfig: {
clientId: 'kafka-client-id',
brokers: ['localhost:9092'],
brokers: [configService.getOrThrow('KAFKA_BROKER')],
logLevel: logLevel.NOTHING,
},
producerConfig: {},
};
},
inject: [],
inject: [ConfigService],
}),
KafkaConsumerModule.forRootAsync({
useFactory() {
useFactory: (configService: ConfigService) => {
return {
kafkaConfig: {
clientId: 'kafka-client-id',
brokers: ['localhost:9092'],
brokers: [configService.getOrThrow('KAFKA_BROKER')],
logLevel: logLevel.NOTHING,
},
consumerConfig: { groupId: 'kafka-consumer-group' },
};
},
inject: [],
inject: [ConfigService],
}),
ConsumerModule,
],
controllers: [],
providers: [],
Expand Down
7 changes: 7 additions & 0 deletions apps/service-auth/src/consumer/consumer.module.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
import { Module } from '@nestjs/common';
import { ConsumerService } from './consumer.service';

@Module({
providers: [ConsumerService],
})
export class ConsumerModule {}
28 changes: 28 additions & 0 deletions apps/service-auth/src/consumer/consumer.service.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
import { KAFKA_CONSUMER } from '@app/kafka';
import { Inject, Injectable } from '@nestjs/common';
import { Consumer } from 'kafkajs';

enum LogType {
CLAN_LEVEL_CHANGE = 'clan_level_change',
CLAN_WAR_LEAGUE_CHANGE = 'clan_war_league_change',
CAPITAL_LEAGUE_CHANGE = 'capital_league_change',
CLAN_MEMBER_CHANGE = 'clan_member_change',
}

@Injectable()
export class ConsumerService {
constructor(@Inject(KAFKA_CONSUMER) private consumer: Consumer) {}

async onModuleInit() {
this.consumer.subscribe({ topics: Object.values(LogType), fromBeginning: true });

await this.consumer.run({
eachMessage: async ({ message, topic }) => {
console.log({
topic,
value: message.value?.toString(),
});
},
});
}
}
12 changes: 6 additions & 6 deletions apps/service-clans/src/service-clans.module.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
import { ClashClientModule } from '@app/clash-client';
import { KafkaProducerModule } from '@app/kafka';
import { MongoDbModule } from '@app/mongodb';
import { RedisModule } from '@app/redis';
import { Module } from '@nestjs/common';
import { ConfigModule } from '@nestjs/config';
import { ConfigModule, ConfigService } from '@nestjs/config';
import { logLevel } from 'kafkajs';
import { ServiceClansController } from './service-clans.controller';
import { ClansService } from './service-clans.service';
import { KafkaProducerModule } from '@app/kafka';
import { logLevel } from 'kafkajs';

@Module({
imports: [
Expand All @@ -15,17 +15,17 @@ import { logLevel } from 'kafkajs';
RedisModule,
ClashClientModule,
KafkaProducerModule.forRootAsync({
useFactory() {
useFactory: (configService: ConfigService) => {
return {
kafkaConfig: {
clientId: 'kafka-client-id',
brokers: ['localhost:9092'],
brokers: [configService.getOrThrow('KAFKA_BROKER')],
logLevel: logLevel.NOTHING,
},
producerConfig: {},
};
},
inject: [],
inject: [ConfigService],
}),
],
controllers: [ServiceClansController],
Expand Down
1 change: 1 addition & 0 deletions libs/mongodb/src/mongodb.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ export class MongoDbService {
.aggregate<TrackedClanList>([
{
$match: {
active: true,
paused: false,
},
},
Expand Down

0 comments on commit 780985e

Please sign in to comment.