Skip to content

Commit

Permalink
Fix comments
Browse files Browse the repository at this point in the history
  • Loading branch information
walzph committed Oct 2, 2023
1 parent 2d677e9 commit b040384
Showing 1 changed file with 22 additions and 15 deletions.
37 changes: 22 additions & 15 deletions hexkit/providers/akafka/provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,9 @@

import json
import logging
from contextlib import asynccontextmanager
import ssl
from typing import Any, Callable, Protocol, TypeVar, Optional
from contextlib import asynccontextmanager
from typing import Any, Callable, Optional, Protocol, TypeVar

from aiokafka import AIOKafkaConsumer, AIOKafkaProducer
from aiokafka.helpers import create_ssl_context
Expand Down Expand Up @@ -74,7 +74,7 @@ class KafkaConfig(BaseSettings):
security_protocol: Literal["PLAINTEXT", "SSL"] = Field(
"PLAINTEXT",
description="Protocol used to communicate with brokers. "
+ "Valid values are: PLAINTEXT, SSL."
+ "Valid values are: PLAINTEXT, SSL.",
)
ssl_cafile: str = Field(
None,
Expand All @@ -83,28 +83,28 @@ class KafkaConfig(BaseSettings):
cafile, capath, cadata) default system CA will be used if found by
OpenSSL. For more information see
:meth:`~ssl.SSLContext.load_verify_locations`.
Default: :data:`None`"""
Default: :data:`None`""",
)
ssl_certfile: str = Field(
None,
description="""optional filename of file in PEM format containing
the client certificate, as well as any CA certificates needed to
establish the certificate's authenticity. For more information see
:meth:`~ssl.SSLContext.load_cert_chain`.
Default: :data:`None`."""
Default: :data:`None`.""",
)
ssl_keyfile: str = Field(
None,
description=""""optional filename containing the client private key.
For more information see :meth:`~ssl.SSLContext.load_cert_chain`.
Default: :data:`None`."""
Default: :data:`None`.""",
)
ssl_password: str = Field(
None,
description="""optional password to be used when loading the
certificate chain. For more information see
:meth:`~ssl.SSLContext.load_cert_chain`.
Default: :data:`None`."""
Default: :data:`None`.""",
)


Expand All @@ -119,19 +119,22 @@ def generate_client_id(*, service_name: str, instance_id: str) -> str:
"""
return f"{service_name}.{instance_id}"


def generate_ssl_context(config: KafkaConfig) -> Optional[ssl.SSLContext]:
"""
Generate ssl_context for connecting to Kafka broker via an encrypted SSL connection
"""
if config.security_protocol == "SSL":
return create_ssl_context(
cafile=config.ssl_cafile, # CA used to sign certificate.
# `CARoot` of JKS store container
certfile=config.ssl_certfile, # Signed certificate
keyfile=config.ssl_keyfile, # Private Key file of `certfile` certificate
password=config.ssl_password
return (
create_ssl_context(
cafile=config.ssl_cafile,
certfile=config.ssl_certfile,
keyfile=config.ssl_keyfile,
password=config.ssl_password,
)
return None
if config.security_protocol == "SSL"
else None
)


class KafkaProducerCompatible(Protocol):
"""A python duck type protocol describing an AIOKafkaProducer or equivalent."""
Expand All @@ -140,6 +143,8 @@ def __init__(
self,
*,
bootstrap_servers: str,
security_protocol: str,
ssl_context: Optional[ssl.SSLContext],
client_id: str,
key_serializer: Callable[[Any], bytes],
value_serializer: Callable[[Any], bytes],
Expand Down Expand Up @@ -273,6 +278,8 @@ def __init__(
self,
*topics: Ascii,
bootstrap_servers: str,
security_protocol: str,
ssl_context: Optional[ssl.SSLContext],
client_id: str,
group_id: str,
auto_offset_reset: Literal["earliest"],
Expand Down

0 comments on commit b040384

Please sign in to comment.