Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix error message when kafka is not installed #441

Merged
merged 5 commits into from
Dec 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions python/hopsworks_common/decorators.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,10 @@
import os

from hopsworks_common.core.constants import (
HAS_CONFLUENT_KAFKA,
HAS_GREAT_EXPECTATIONS,
HAS_POLARS,
confluent_kafka_not_installed_message,
great_expectations_not_installed_message,
polars_not_installed_message,
)
Expand Down Expand Up @@ -96,3 +98,12 @@ def g(*args, **kwds):
return f(*args, **kwds)

return g

def uses_confluent_kafka(f):
    """Guard a callable that depends on the ``confluent_kafka`` package.

    The returned wrapper checks the ``HAS_CONFLUENT_KAFKA`` flag on every
    call, so decorating a function never fails by itself; only invoking it
    without the dependency does.

    Args:
        f: The callable to protect.

    Returns:
        A wrapper carrying the metadata of ``f`` (via ``functools.wraps``)
        that forwards all positional and keyword arguments to ``f`` and
        returns its result.

    Raises:
        ModuleNotFoundError: When the wrapper is called while
            ``HAS_CONFLUENT_KAFKA`` is falsy. The exception message is
            ``confluent_kafka_not_installed_message``.
    """

    @functools.wraps(f)
    def guarded(*args, **kwds):
        # Happy path first: dependency flag is set, delegate unchanged.
        if HAS_CONFLUENT_KAFKA:
            return f(*args, **kwds)
        raise ModuleNotFoundError(confluent_kafka_not_installed_message)

    return guarded
26 changes: 21 additions & 5 deletions python/hsfs/core/kafka_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,17 +20,26 @@
from io import BytesIO
from typing import TYPE_CHECKING, Any, Callable, Dict, Literal, Optional, Tuple, Union

import pandas as pd
from hopsworks_common import client
from hopsworks_common.core.constants import HAS_NUMPY
from hopsworks_common.core.constants import (
HAS_AVRO,
HAS_CONFLUENT_KAFKA,
HAS_FAST_AVRO,
HAS_NUMPY,
HAS_PANDAS,
avro_not_installed_message,
)
from hopsworks_common.decorators import uses_confluent_kafka
from hsfs.core import storage_connector_api
from hsfs.core.constants import HAS_AVRO, HAS_CONFLUENT_KAFKA, HAS_FAST_AVRO
from tqdm import tqdm


if HAS_NUMPY:
import numpy as np

if HAS_PANDAS:
import pandas as pd

if HAS_CONFLUENT_KAFKA:
from confluent_kafka import Consumer, KafkaError, Producer, TopicPartition

Expand All @@ -46,6 +55,7 @@
from hsfs.feature_group import ExternalFeatureGroup, FeatureGroup


@uses_confluent_kafka
def init_kafka_consumer(
feature_store_id: int,
offline_write_options: Dict[str, Any],
Expand Down Expand Up @@ -112,6 +122,7 @@ def _init_kafka_resources(
return producer, headers, feature_writers, writer


@uses_confluent_kafka
def init_kafka_producer(
feature_store_id: int,
offline_write_options: Dict[str, Any],
Expand All @@ -120,6 +131,7 @@ def init_kafka_producer(
return Producer(get_kafka_config(feature_store_id, offline_write_options))


@uses_confluent_kafka
def kafka_get_offsets(
topic_name: str,
feature_store_id: int,
Expand Down Expand Up @@ -192,6 +204,9 @@ def get_encoder_func(writer_schema: str) -> callable:
parsed_schema = parse_schema(schema)
return lambda record, outf: schemaless_writer(outf, parsed_schema, record)

if not HAS_AVRO:
raise ModuleNotFoundError(avro_not_installed_message)

parsed_schema = avro.schema.parse(writer_schema)
writer = avro.io.DatumWriter(parsed_schema)
return lambda record, outf: writer.write(record, avro.io.BinaryEncoder(outf))
Expand All @@ -207,11 +222,11 @@ def encode_row(complex_feature_writers, writer, row):
# for avro to be able to serialize them, they need to be python data types
if HAS_NUMPY and isinstance(row[k], np.ndarray):
row[k] = row[k].tolist()
if isinstance(row[k], pd.Timestamp):
if HAS_PANDAS and isinstance(row[k], pd.Timestamp):
row[k] = row[k].to_pydatetime()
if isinstance(row[k], datetime) and row[k].tzinfo is None:
row[k] = row[k].replace(tzinfo=timezone.utc)
if isinstance(row[k], pd._libs.missing.NAType):
if HAS_PANDAS and isinstance(row[k], pd._libs.missing.NAType):
row[k] = None
# encode complex features
row = encode_complex_features(complex_feature_writers, row)
Expand Down Expand Up @@ -244,6 +259,7 @@ def get_kafka_config(
return config


@uses_confluent_kafka
def build_ack_callback_and_optional_progress_bar(
n_rows: int, is_multi_part_insert: bool, offline_write_options: Dict[str, Any]
) -> Tuple[Callable, Optional[tqdm]]:
Expand Down
3 changes: 2 additions & 1 deletion python/tests/core/test_kafka_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,9 @@
#
import importlib

from hopsworks_common.core import constants
from hsfs import storage_connector
from hsfs.core import constants, kafka_engine
from hsfs.core import kafka_engine


if constants.HAS_CONFLUENT_KAFKA:
Expand Down
Loading