Skip to content

Commit

Permalink
Fix
Browse files Browse the repository at this point in the history
  • Loading branch information
benjackwhite committed Jan 19, 2025
1 parent e9a3ce7 commit 4a6575c
Showing 1 changed file with 7 additions and 11 deletions.
18 changes: 7 additions & 11 deletions plugin-server/src/ingestion/ingestion-consumer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import { Histogram } from 'prom-client'

import { BatchConsumer, startBatchConsumer } from '../kafka/batch-consumer'
import { createRdConnectionConfigFromEnvVars } from '../kafka/config'
import { KafkaProducerWrapper } from '../kafka/producer'
import { IngestionOverflowMode } from '../main/ingestion-queues/batch-processing/each-batch-ingestion'
import { ingestionOverflowingMessagesTotal } from '../main/ingestion-queues/batch-processing/metrics'
import { addSentryBreadcrumbsEventListeners } from '../main/ingestion-queues/kafka-metrics'
Expand All @@ -15,8 +16,6 @@ import {
} from '../main/ingestion-queues/metrics'
import { runInstrumentedFunction } from '../main/utils'
import { Hub, PipelineEvent, PluginServerService } from '../types'
import { createKafkaProducerWrapper } from '../utils/db/hub'
import { KafkaProducerWrapper } from '../utils/db/kafka-producer-wrapper'
import { normalizeEvent } from '../utils/event'
import { retryIfRetriable } from '../utils/retries'
import { status } from '../utils/status'
Expand Down Expand Up @@ -53,10 +52,10 @@ const KNOWN_SET_EVENTS = new Set([
abstract class IngestionConsumer {
batchConsumer?: BatchConsumer
isStopping = false
protected kafkaProducer?: KafkaProducerWrapper
protected abstract name: string
protected heartbeat = () => {}
protected promises: Set<Promise<any>> = new Set()
protected kafkaProducer?: KafkaProducerWrapper

protected scheduleWork<T>(promise: Promise<T>): Promise<T> {
this.promises.add(promise)
Expand Down Expand Up @@ -101,7 +100,7 @@ abstract class IngestionConsumer {
}): Promise<void> {
this.batchConsumer = await startBatchConsumer({
...options,
connectionConfig: createRdConnectionConfigFromEnvVars(this.hub),
connectionConfig: createRdConnectionConfigFromEnvVars(this.hub, 'consumer'),
autoCommit: true,
sessionTimeout: this.hub.KAFKA_CONSUMPTION_SESSION_TIMEOUT_MS,
maxPollIntervalMs: this.hub.KAFKA_CONSUMPTION_MAX_POLL_INTERVAL_MS,
Expand Down Expand Up @@ -148,13 +147,10 @@ abstract class IngestionConsumer {
}

public async start(): Promise<void> {
// NOTE: This is only for starting shared services
await Promise.all([
createKafkaProducerWrapper(this.hub).then((producer) => {
this.kafkaProducer = producer
this.kafkaProducer.producer.connect()
}),
])
await KafkaProducerWrapper.create(this.hub).then((producer) => {
this.kafkaProducer = producer
this.kafkaProducer.producer.connect()
})
}

public async stop(): Promise<void> {
Expand Down

0 comments on commit 4a6575c

Please sign in to comment.