diff --git a/hexkit/providers/akafka/provider.py b/hexkit/providers/akafka/provider.py index 44b467df..eb2ccfca 100644 --- a/hexkit/providers/akafka/provider.py +++ b/hexkit/providers/akafka/provider.py @@ -23,9 +23,9 @@ import json import logging -from contextlib import asynccontextmanager import ssl -from typing import Any, Callable, Protocol, TypeVar, Optional +from contextlib import asynccontextmanager +from typing import Any, Callable, Optional, Protocol, TypeVar from aiokafka import AIOKafkaConsumer, AIOKafkaProducer from aiokafka.helpers import create_ssl_context @@ -74,7 +74,7 @@ class KafkaConfig(BaseSettings): security_protocol: Literal["PLAINTEXT", "SSL"] = Field( "PLAINTEXT", description="Protocol used to communicate with brokers. " - + "Valid values are: PLAINTEXT, SSL." + + "Valid values are: PLAINTEXT, SSL.", ) ssl_cafile: str = Field( None, @@ -83,7 +83,7 @@ class KafkaConfig(BaseSettings): cafile, capath, cadata) default system CA will be used if found by OpenSSL. For more information see :meth:`~ssl.SSLContext.load_verify_locations`. - Default: :data:`None`""" + Default: :data:`None`""", ) ssl_certfile: str = Field( None, @@ -91,20 +91,20 @@ class KafkaConfig(BaseSettings): the client certificate, as well as any CA certificates needed to establish the certificate's authenticity. For more information see :meth:`~ssl.SSLContext.load_cert_chain`. - Default: :data:`None`.""" + Default: :data:`None`.""", ) ssl_keyfile: str = Field( None, description=""""optional filename containing the client private key. For more information see :meth:`~ssl.SSLContext.load_cert_chain`. - Default: :data:`None`.""" + Default: :data:`None`.""", ) ssl_password: str = Field( None, description="""optional password to be used when loading the certificate chain. For more information see :meth:`~ssl.SSLContext.load_cert_chain`. - Default: :data:`None`.""" + Default: :data:`None`.""", ) @@ -119,19 +119,22 @@ def generate_client_id(*, service_name: str, instance_id: str) -> str: """ return f"{service_name}.{instance_id}" + def generate_ssl_context(config: KafkaConfig) -> Optional[ssl.SSLContext]: """ Generate ssl_context for connecting to Kafka broker via an encrypted SSL connection """ - if config.security_protocol == "SSL": - return create_ssl_context( - cafile=config.ssl_cafile, # CA used to sign certificate. - # `CARoot` of JKS store container - certfile=config.ssl_certfile, # Signed certificate - keyfile=config.ssl_keyfile, # Private Key file of `certfile` certificate - password=config.ssl_password + return ( + create_ssl_context( + cafile=config.ssl_cafile, + certfile=config.ssl_certfile, + keyfile=config.ssl_keyfile, + password=config.ssl_password, ) - return None + if config.security_protocol == "SSL" + else None + ) + class KafkaProducerCompatible(Protocol): """A python duck type protocol describing an AIOKafkaProducer or equivalent.""" @@ -140,6 +143,8 @@ def __init__( self, *, bootstrap_servers: str, + security_protocol: str, + ssl_context: Optional[ssl.SSLContext], client_id: str, key_serializer: Callable[[Any], bytes], value_serializer: Callable[[Any], bytes], @@ -273,6 +278,8 @@ def __init__( self, *topics: Ascii, bootstrap_servers: str, + security_protocol: str, + ssl_context: Optional[ssl.SSLContext], client_id: str, group_id: str, auto_offset_reset: Literal["earliest"],