diff --git a/providers/src/airflow/providers/apache/kafka/hooks/consume.py b/providers/src/airflow/providers/apache/kafka/hooks/consume.py index 9e0d0f2454b6..8996e72b6191 100644 --- a/providers/src/airflow/providers/apache/kafka/hooks/consume.py +++ b/providers/src/airflow/providers/apache/kafka/hooks/consume.py @@ -23,6 +23,18 @@ from airflow.providers.apache.kafka.hooks.base import KafkaBaseHook +class KafkaAuthenticationError(Exception): + """Custom exception for Kafka authentication failures.""" + + pass + + +def error_callback(err): + """Handle kafka errors.""" + print("Exception received: ", err) + raise KafkaAuthenticationError(f"Authentication failed: {err}") + + class KafkaConsumerHook(KafkaBaseHook): """ A hook for creating a Kafka Consumer. @@ -36,6 +48,7 @@ def __init__(self, topics: Sequence[str], kafka_config_id=KafkaBaseHook.default_ self.topics = topics def _get_client(self, config) -> Consumer: + config["error_cb"] = error_callback return Consumer(config) def get_consumer(self) -> Consumer: