diff --git a/python/hopsworks_common/decorators.py b/python/hopsworks_common/decorators.py index c04904ed8..4f3a7798b 100644 --- a/python/hopsworks_common/decorators.py +++ b/python/hopsworks_common/decorators.py @@ -99,7 +99,7 @@ def g(*args, **kwds): return g -def uses_kafka(f): +def uses_confluent_kafka(f): @functools.wraps(f) def g(*args, **kwds): if not HAS_CONFLUENT_KAFKA: diff --git a/python/hsfs/core/kafka_engine.py b/python/hsfs/core/kafka_engine.py index a180855aa..b3bac7509 100644 --- a/python/hsfs/core/kafka_engine.py +++ b/python/hsfs/core/kafka_engine.py @@ -22,7 +22,7 @@ from hopsworks_common import client from hopsworks_common.core.constants import HAS_NUMPY -from hopsworks_common.decorators import uses_kafka +from hopsworks_common.decorators import uses_confluent_kafka from hsfs.core import storage_connector_api from hsfs.core.constants import ( HAS_AVRO, @@ -55,7 +55,7 @@ from hsfs.feature_group import ExternalFeatureGroup, FeatureGroup -@uses_kafka +@uses_confluent_kafka def init_kafka_consumer( feature_store_id: int, offline_write_options: Dict[str, Any], @@ -122,7 +122,7 @@ def _init_kafka_resources( return producer, headers, feature_writers, writer -@uses_kafka +@uses_confluent_kafka def init_kafka_producer( feature_store_id: int, offline_write_options: Dict[str, Any], @@ -131,7 +131,7 @@ def init_kafka_producer( return Producer(get_kafka_config(feature_store_id, offline_write_options)) -@uses_kafka +@uses_confluent_kafka def kafka_get_offsets( topic_name: str, feature_store_id: int, @@ -259,7 +259,7 @@ def get_kafka_config( return config -@uses_kafka +@uses_confluent_kafka def build_ack_callback_and_optional_progress_bar( n_rows: int, is_multi_part_insert: bool, offline_write_options: Dict[str, Any] ) -> Tuple[Callable, Optional[tqdm]]: