Skip to content

Commit

Permalink
Implement initial kafka logging
Browse files Browse the repository at this point in the history
  • Loading branch information
TheByronHimes committed Nov 15, 2023
1 parent 4c05b00 commit ddf0c35
Show file tree
Hide file tree
Showing 2 changed files with 76 additions and 14 deletions.
2 changes: 1 addition & 1 deletion src/hexkit/custom_types.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@


# A type indicating that a string should be ascii-compatible.
# Technically it is an alias for `str` so it only serves documention purposes.
# Technically it is an alias for `str` so it only serves documentation purposes.
Ascii = str


Expand Down
88 changes: 75 additions & 13 deletions src/hexkit/providers/akafka/provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@
import logging
import ssl
from contextlib import asynccontextmanager
from typing import Any, Callable, Optional, Protocol, TypeVar
from pathlib import Path
from typing import Any, Callable, Literal, Optional, Protocol, TypeVar

from aiokafka import AIOKafkaConsumer, AIOKafkaProducer
from aiokafka.helpers import create_ssl_context
Expand All @@ -37,11 +38,6 @@
from hexkit.protocols.eventpub import EventPublisherProtocol
from hexkit.protocols.eventsub import EventSubscriberProtocol

try: # workaround for https://github.com/pydantic/pydantic/issues/5821
from typing_extensions import Literal
except ImportError:
from typing import Literal # type: ignore

__all__ = [
"KafkaConfig",
"KafkaEventPublisher",
Expand Down Expand Up @@ -95,6 +91,58 @@ class KafkaConfig(BaseSettings):
"",
description="Optional password to be used for the client private key.",
)
kafka_log_output_filename: Optional[str] = Field(
"",
examples=["kafka.log"],
description="Name of file used to capture log output. Leave blank to write "
+ "to standard output.",
)
kafka_log_output_mode: str = Field(
"a",
examples=["w", "a"],
description="Mode to use for logging to file, such as 'w', 'a', etc. "
+ "Has no effect if `kafka_log_output_filename` is empty.",
)
kafka_log_format: str = Field(
"%(asctime)s - %(name)s - %(levelname)s - %(message)s",
description="Format string for the log messages.",
)
kafka_log_level: int = Field(
logging.INFO,
examples=[logging.INFO, logging.WARNING, logging.CRITICAL],
description="Threshold level for logging. Only logs of this level and higher "
+ "will be captured.",
)


def get_configured_logger(*, config: KafkaConfig, name: str) -> logging.Logger:
"""Produce a Kafka-specific logger according to KafkaConfig."""
logger = logging.getLogger(name=name)

# logger objects are singletons, so don't repeat configuration.
configured = getattr(logger, "configured", False)
if configured:
return logger

logger.setLevel(config.kafka_log_level)

formatter = logging.Formatter(fmt=config.kafka_log_format)
if config.kafka_log_output_filename:
output_filename = Path(config.kafka_log_output_filename)
file_handler = logging.FileHandler(
filename=output_filename,
mode=config.kafka_log_output_mode,
encoding="utf-8",
)
file_handler.setFormatter(fmt=formatter)
logger.addHandler(file_handler)
else:
stream_handler = logging.StreamHandler()
stream_handler.setFormatter(fmt=formatter)
logger.addHandler(stream_handler)

setattr(logger, "configured", True) # noqa: B010
return logger


class EventTypeNotFoundError(RuntimeError):
Expand Down Expand Up @@ -198,23 +246,28 @@ async def construct(
"ascii"
),
)

log = get_configured_logger(config=config, name=client_id)

try:
await producer.start()
yield cls(producer=producer)
yield cls(producer=producer, log=log)
finally:
await producer.stop()

def __init__(
self,
*,
producer: KafkaProducerCompatible,
log: logging.Logger,
):
"""Please do not call directly! Should be called by the `construct` method.
Args:
producer:
hands over a started AIOKafkaProducer.
"""
self._producer = producer
self._log = log

async def _publish_validated(
self, *, payload: JsonObject, type_: Ascii, key: Ascii, topic: Ascii
Expand All @@ -232,6 +285,7 @@ async def _publish_validated(
await self._producer.send_and_wait(
topic, key=key, value=payload, headers=event_headers
)
self._log.info("published event type: '%s'", event_headers[0][1])


class ConsumerEvent(Protocol):
Expand Down Expand Up @@ -352,16 +406,23 @@ async def construct(
event_value.decode("ascii")
),
)

log = get_configured_logger(config=config, name=client_id)

try:
await consumer.start()
yield cls(consumer=consumer, translator=translator)
yield cls(consumer=consumer, translator=translator, log=log)
finally:
await consumer.stop()

# pylint: disable=too-many-arguments
# (some arguments are only used for testing)
def __init__(
self, *, consumer: KafkaConsumerCompatible, translator: EventSubscriberProtocol
self,
*,
consumer: KafkaConsumerCompatible,
translator: EventSubscriberProtocol,
log: logging.Logger,
):
"""Please do not call directly! Should be called by the `construct` method.
Args:
Expand All @@ -375,6 +436,7 @@ def __init__(
self._consumer = consumer
self._translator = translator
self._types_whitelist = translator.types_of_interest
self._log = log

@staticmethod
def _get_event_label(event: ConsumerEvent) -> str:
Expand All @@ -391,26 +453,26 @@ async def _consume_event(self, event: ConsumerEvent) -> None:
try:
type_ = get_event_type(event)
except EventTypeNotFoundError:
logging.warning("Ignored an event without type: %s", event_label)
self._log.warning("Ignored an event without type: '%s'", event_label)
return

if type_ in self._types_whitelist:
logging.info('Consuming event of type "%s": %s', type_, event_label)
self._log.info("Consuming event of type '%s': %s", type_, event_label)

try:
# blocks until event processing is completed:
await self._translator.consume(
payload=event.value, type_=type_, topic=event.topic
)
except Exception:
logging.error(
self._log.error(
"A fatal error occurred while processing the event: %s",
event_label,
)
raise

else:
logging.info("Ignored event of type %s: %s", type_, event_label)
self._log.info("Ignored event of type '%s': %s", type_, event_label)

async def run(self, forever: bool = True) -> None:
"""
Expand Down

0 comments on commit ddf0c35

Please sign in to comment.