diff --git a/eventbus/kafka/sensor/kafka_sensor.go b/eventbus/kafka/sensor/kafka_sensor.go index 593419cf8d..1f39403701 100644 --- a/eventbus/kafka/sensor/kafka_sensor.go +++ b/eventbus/kafka/sensor/kafka_sensor.go @@ -132,6 +132,13 @@ func (s *KafkaSensor) Initialize() error { return err } + // producer is at risk of deadlocking if Errors channel isn't read. + go func() { + for err := range producer.Errors() { + s.Logger.Errorf("Kafka producer error", zap.Error(err)) + } + }() + s.client = client s.consumer = consumer s.kafkaHandler = &KafkaHandler{