Skip to content

Commit

Permalink
Fix error message when kafka is not installed
Browse files Browse the repository at this point in the history
  • Loading branch information
aversey committed Dec 17, 2024
1 parent c25ef25 commit 85d10c9
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 4 deletions.
11 changes: 11 additions & 0 deletions python/hopsworks_common/decorators.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,10 @@
import os

from hopsworks_common.core.constants import (
HAS_CONFLUENT_KAFKA,
HAS_GREAT_EXPECTATIONS,
HAS_POLARS,
confluent_kafka_not_installed_message,
great_expectations_not_installed_message,
polars_not_installed_message,
)
Expand Down Expand Up @@ -96,3 +98,12 @@ def g(*args, **kwds):
return f(*args, **kwds)

return g

def uses_kafka(f):
    """Guard *f* behind the optional confluent-kafka dependency.

    The wrapped callable raises ``ModuleNotFoundError`` with an
    installation hint instead of failing later with an obscure
    ``NameError`` when confluent-kafka is absent.
    """

    @functools.wraps(f)
    def wrapper(*args, **kwargs):
        # Fast path: dependency present, delegate untouched.
        if HAS_CONFLUENT_KAFKA:
            return f(*args, **kwargs)
        # Fail fast with the project's actionable install message.
        raise ModuleNotFoundError(confluent_kafka_not_installed_message)

    return wrapper
24 changes: 20 additions & 4 deletions python/hsfs/core/kafka_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,17 +20,26 @@
from io import BytesIO
from typing import TYPE_CHECKING, Any, Callable, Dict, Literal, Optional, Tuple, Union

import pandas as pd
from hopsworks_common import client
from hopsworks_common.core.constants import HAS_NUMPY
from hopsworks_common.decorators import uses_kafka
from hsfs.core import storage_connector_api
from hsfs.core.constants import HAS_AVRO, HAS_CONFLUENT_KAFKA, HAS_FAST_AVRO
from hsfs.core.constants import (
HAS_AVRO,
HAS_CONFLUENT_KAFKA,
HAS_FAST_AVRO,
HAS_PANDAS,
avro_not_installed_message,
)
from tqdm import tqdm


if HAS_NUMPY:
import numpy as np

if HAS_PANDAS:
import pandas as pd

if HAS_CONFLUENT_KAFKA:
from confluent_kafka import Consumer, KafkaError, Producer, TopicPartition

Expand All @@ -46,6 +55,7 @@
from hsfs.feature_group import ExternalFeatureGroup, FeatureGroup


@uses_kafka
def init_kafka_consumer(
feature_store_id: int,
offline_write_options: Dict[str, Any],
Expand Down Expand Up @@ -112,6 +122,7 @@ def _init_kafka_resources(
return producer, headers, feature_writers, writer


@uses_kafka
def init_kafka_producer(
feature_store_id: int,
offline_write_options: Dict[str, Any],
Expand All @@ -120,6 +131,7 @@ def init_kafka_producer(
return Producer(get_kafka_config(feature_store_id, offline_write_options))


@uses_kafka
def kafka_get_offsets(
topic_name: str,
feature_store_id: int,
Expand Down Expand Up @@ -192,6 +204,9 @@ def get_encoder_func(writer_schema: str) -> callable:
parsed_schema = parse_schema(schema)
return lambda record, outf: schemaless_writer(outf, parsed_schema, record)

if not HAS_AVRO:
raise ModuleNotFoundError(avro_not_installed_message)

parsed_schema = avro.schema.parse(writer_schema)
writer = avro.io.DatumWriter(parsed_schema)
return lambda record, outf: writer.write(record, avro.io.BinaryEncoder(outf))
Expand All @@ -207,11 +222,11 @@ def encode_row(complex_feature_writers, writer, row):
# for avro to be able to serialize them, they need to be python data types
if HAS_NUMPY and isinstance(row[k], np.ndarray):
row[k] = row[k].tolist()
if isinstance(row[k], pd.Timestamp):
if HAS_PANDAS and isinstance(row[k], pd.Timestamp):
row[k] = row[k].to_pydatetime()
if isinstance(row[k], datetime) and row[k].tzinfo is None:
row[k] = row[k].replace(tzinfo=timezone.utc)
if isinstance(row[k], pd._libs.missing.NAType):
if HAS_PANDAS and isinstance(row[k], pd._libs.missing.NAType):
row[k] = None
# encode complex features
row = encode_complex_features(complex_feature_writers, row)
Expand Down Expand Up @@ -244,6 +259,7 @@ def get_kafka_config(
return config


@uses_kafka
def build_ack_callback_and_optional_progress_bar(
n_rows: int, is_multi_part_insert: bool, offline_write_options: Dict[str, Any]
) -> Tuple[Callable, Optional[tqdm]]:
Expand Down

0 comments on commit 85d10c9

Please sign in to comment.