Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add SSL Support to Kafka Provider Configuration #66

Merged
merged 29 commits into from
Nov 13, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
a0081f3
Add security_protocol, ssl_context to KafkaConfig
Sep 13, 2023
9dc45e1
Change field type and unravel ssl_context fields
Sep 20, 2023
df3a8e3
Fix comments
walzph Oct 2, 2023
d487be4
update to template
KerstenBreuer Oct 6, 2023
1085ed7
update to template
KerstenBreuer Oct 7, 2023
731170f
fix mypy
KerstenBreuer Oct 7, 2023
2cd8641
WIP test
dontseyit Nov 2, 2023
77da1d5
Add ca, server certs for ssl testing
Nov 2, 2023
9b1b55a
Update gitignore from template
Cito Nov 7, 2023
e9042ce
Revert server cert creation
Cito Nov 7, 2023
263de33
Add tests for Kafka SSL access
Cito Nov 7, 2023
5122baa
Fix init script location
Cito Nov 7, 2023
52ffff9
Remove duplicate steps
Cito Nov 7, 2023
385a7f9
Add ports and step names in test workflow
Cito Nov 7, 2023
4a73bed
Add delete topic enable setting
Cito Nov 7, 2023
148aa7a
Make tests work from outside the docker container
Cito Nov 8, 2023
379b3e5
Fix docker-compose options
Cito Nov 8, 2023
b05a98d
Add a prefix to kafka config settings
Cito Nov 8, 2023
a783b8a
Rename certificates to make more sense
Cito Nov 8, 2023
d8a049e
Use test container for testing Kafka with SSL
Cito Nov 9, 2023
3717266
Update from template
Cito Nov 9, 2023
35d4eba
Update lock files
Cito Nov 9, 2023
12bf063
Fix tests when not running in docker
Cito Nov 9, 2023
80ced22
Use latest docker image for kafka everywhere
Cito Nov 9, 2023
d9ff85f
Bump version
Cito Nov 9, 2023
576e4ef
Point to GitHub issue that we work around
Cito Nov 10, 2023
1e7036e
Rename module with test container
Cito Nov 10, 2023
af44944
Update pyproject.toml
Cito Nov 10, 2023
b3e09d5
Recreate lock files
Cito Nov 10, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 10 additions & 12 deletions .devcontainer/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -23,24 +23,23 @@ services:
user: vscode

zookeeper:
image: confluentinc/cp-zookeeper:7.3.1
# used ports: 2181
image: confluentinc/cp-zookeeper:7.5.1
environment:
ZOOKEEPER_SERVER_ID: 1
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000

kafka:
image: confluentinc/cp-server:7.3.1
image: confluentinc/cp-server:7.5.1
restart: always
depends_on:
- zookeeper
# used ports: 9092
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
KAFKA_DELETE_TOPIC_ENABLE: "true"
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
KAFKA_CONFLUENT_LICENSE_TOPIC_REPLICATION_FACTOR: 1
Expand All @@ -55,9 +54,8 @@ services:
- kafka
- zookeeper
environment:
- KAFKA_CLUSTERS_0_NAME=local
- KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS=kafka:9092
- KAFKA_CLUSTERS_0_ZOOKEEPER=zookeeper:2181
# used ports: 8080
# The port is automatically forwarded to localhost.
# Please see the "PORTS" panel for details.
KAFKA_CLUSTERS_0_NAME: local
KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:9092
KAFKA_CLUSTERS_0_ZOOKEEPER: zookeeper:2181
ports:
- 8080:8080
6 changes: 6 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,12 @@ dmypy.json
# ignore VS Code settings:
.vscode/

# key stores
*.key
*.rnd
.keystore
.ssl/

# desktop settings and thumbnails
.DS_Store
desktop.ini
Expand Down
1 change: 0 additions & 1 deletion .static_files_ignore
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
.github/workflows/check_openapi_spec.yaml
.github/workflows/check_readme.yaml
.github/workflows/cd.yaml
.github/workflows/dev_cd.yaml

scripts/script_utils/fastapi_app_location.py

Expand Down
7 changes: 4 additions & 3 deletions examples/stream_calc/sc_tests/integration/test_event_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,11 @@
from testcontainers.kafka import KafkaContainer

from hexkit.custom_types import JsonObject
from hexkit.providers.akafka.testcontainer import DEFAULT_IMAGE as KAFKA_IMAGE
from stream_calc.config import Config
from stream_calc.main import main

DEFAULT_CONFIG = Config()
DEFAULT_CONFIG = Config() # type: ignore


class Event(NamedTuple):
Expand Down Expand Up @@ -200,14 +201,14 @@ async def test_receive_calc_publish(cases: list[Case] = deepcopy(CASES)):
the results.
"""

with KafkaContainer() as kafka:
with KafkaContainer(image=KAFKA_IMAGE) as kafka:
kafka_server = kafka.get_bootstrap_server()

submit_test_problems(cases, kafka_server=kafka_server)

# run the stream_calc app:
# (for each problem separately to avoid running forever)
config = Config(kafka_servers=[kafka_server])
config = Config(kafka_servers=[kafka_server]) # type: ignore
for _ in cases:
await main(config=config, run_forever=False)

Expand Down
4 changes: 3 additions & 1 deletion examples/stream_calc/stream_calc/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,9 @@ def get_container(config: Config) -> Container:
return container


async def main(*, config: Config = Config(), run_forever: bool = True) -> None:
async def main(
*, config: Config = Config(), run_forever: bool = True # type: ignore
) -> None:
"""
Coroutine to run the stream calculator.
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"

[project]
name = "hexkit"
version = "0.11.1"
version = "1.0.0"
description = "A Toolkit for Building Microservices using the Hexagonal Architecture"
readme = "README.md"
authors = [
Expand Down
1 change: 1 addition & 0 deletions requirements-dev.in
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,4 @@
-r requirements-dev-common.in

# additional requirements can be listed here
cryptography >= 41
85 changes: 84 additions & 1 deletion requirements-dev.txt
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
# This file is autogenerated by pip-compile with Python 3.9
# by the following command:
#
# pip-compile --all-extras --generate-hashes --output-file=/workspace/requirements-dev.txt /tmp/tmpye1zf3r4/pyproject.toml /workspace/requirements-dev.in
# pip-compile --all-extras --generate-hashes --output-file=/workspace/requirements-dev.txt /tmp/tmp6008thxb/pyproject.toml /workspace/requirements-dev.in
#
aiokafka==0.8.1 \
--hash=sha256:1e24839088fd6d3ff481cc09a48ea487b997328df11630bc0a1b88255edbcfe9 \
Expand Down Expand Up @@ -95,6 +95,60 @@ certifi==2023.7.22 \
# httpcore
# httpx
# requests
cffi==1.16.0 \
--hash=sha256:0c9ef6ff37e974b73c25eecc13952c55bceed9112be2d9d938ded8e856138bcc \
--hash=sha256:131fd094d1065b19540c3d72594260f118b231090295d8c34e19a7bbcf2e860a \
--hash=sha256:1b8ebc27c014c59692bb2664c7d13ce7a6e9a629be20e54e7271fa696ff2b417 \
--hash=sha256:2c56b361916f390cd758a57f2e16233eb4f64bcbeee88a4881ea90fca14dc6ab \
--hash=sha256:2d92b25dbf6cae33f65005baf472d2c245c050b1ce709cc4588cdcdd5495b520 \
--hash=sha256:31d13b0f99e0836b7ff893d37af07366ebc90b678b6664c955b54561fc36ef36 \
--hash=sha256:32c68ef735dbe5857c810328cb2481e24722a59a2003018885514d4c09af9743 \
--hash=sha256:3686dffb02459559c74dd3d81748269ffb0eb027c39a6fc99502de37d501faa8 \
--hash=sha256:582215a0e9adbe0e379761260553ba11c58943e4bbe9c36430c4ca6ac74b15ed \
--hash=sha256:5b50bf3f55561dac5438f8e70bfcdfd74543fd60df5fa5f62d94e5867deca684 \
--hash=sha256:5bf44d66cdf9e893637896c7faa22298baebcd18d1ddb6d2626a6e39793a1d56 \
--hash=sha256:6602bc8dc6f3a9e02b6c22c4fc1e47aa50f8f8e6d3f78a5e16ac33ef5fefa324 \
--hash=sha256:673739cb539f8cdaa07d92d02efa93c9ccf87e345b9a0b556e3ecc666718468d \
--hash=sha256:68678abf380b42ce21a5f2abde8efee05c114c2fdb2e9eef2efdb0257fba1235 \
--hash=sha256:68e7c44931cc171c54ccb702482e9fc723192e88d25a0e133edd7aff8fcd1f6e \
--hash=sha256:6b3d6606d369fc1da4fd8c357d026317fbb9c9b75d36dc16e90e84c26854b088 \
--hash=sha256:748dcd1e3d3d7cd5443ef03ce8685043294ad6bd7c02a38d1bd367cfd968e000 \
--hash=sha256:7651c50c8c5ef7bdb41108b7b8c5a83013bfaa8a935590c5d74627c047a583c7 \
--hash=sha256:7b78010e7b97fef4bee1e896df8a4bbb6712b7f05b7ef630f9d1da00f6444d2e \
--hash=sha256:7e61e3e4fa664a8588aa25c883eab612a188c725755afff6289454d6362b9673 \
--hash=sha256:80876338e19c951fdfed6198e70bc88f1c9758b94578d5a7c4c91a87af3cf31c \
--hash=sha256:8895613bcc094d4a1b2dbe179d88d7fb4a15cee43c052e8885783fac397d91fe \
--hash=sha256:88e2b3c14bdb32e440be531ade29d3c50a1a59cd4e51b1dd8b0865c54ea5d2e2 \
--hash=sha256:8f8e709127c6c77446a8c0a8c8bf3c8ee706a06cd44b1e827c3e6a2ee6b8c098 \
--hash=sha256:9cb4a35b3642fc5c005a6755a5d17c6c8b6bcb6981baf81cea8bfbc8903e8ba8 \
--hash=sha256:9f90389693731ff1f659e55c7d1640e2ec43ff725cc61b04b2f9c6d8d017df6a \
--hash=sha256:a09582f178759ee8128d9270cd1344154fd473bb77d94ce0aeb2a93ebf0feaf0 \
--hash=sha256:a6a14b17d7e17fa0d207ac08642c8820f84f25ce17a442fd15e27ea18d67c59b \
--hash=sha256:a72e8961a86d19bdb45851d8f1f08b041ea37d2bd8d4fd19903bc3083d80c896 \
--hash=sha256:abd808f9c129ba2beda4cfc53bde801e5bcf9d6e0f22f095e45327c038bfe68e \
--hash=sha256:ac0f5edd2360eea2f1daa9e26a41db02dd4b0451b48f7c318e217ee092a213e9 \
--hash=sha256:b29ebffcf550f9da55bec9e02ad430c992a87e5f512cd63388abb76f1036d8d2 \
--hash=sha256:b2ca4e77f9f47c55c194982e10f058db063937845bb2b7a86c84a6cfe0aefa8b \
--hash=sha256:b7be2d771cdba2942e13215c4e340bfd76398e9227ad10402a8767ab1865d2e6 \
--hash=sha256:b84834d0cf97e7d27dd5b7f3aca7b6e9263c56308ab9dc8aae9784abb774d404 \
--hash=sha256:b86851a328eedc692acf81fb05444bdf1891747c25af7529e39ddafaf68a4f3f \
--hash=sha256:bcb3ef43e58665bbda2fb198698fcae6776483e0c4a631aa5647806c25e02cc0 \
--hash=sha256:c0f31130ebc2d37cdd8e44605fb5fa7ad59049298b3f745c74fa74c62fbfcfc4 \
--hash=sha256:c6a164aa47843fb1b01e941d385aab7215563bb8816d80ff3a363a9f8448a8dc \
--hash=sha256:d8a9d3ebe49f084ad71f9269834ceccbf398253c9fac910c4fd7053ff1386936 \
--hash=sha256:db8e577c19c0fda0beb7e0d4e09e0ba74b1e4c092e0e40bfa12fe05b6f6d75ba \
--hash=sha256:dc9b18bf40cc75f66f40a7379f6a9513244fe33c0e8aa72e2d56b0196a7ef872 \
--hash=sha256:e09f3ff613345df5e8c3667da1d918f9149bd623cd9070c983c013792a9a62eb \
--hash=sha256:e4108df7fe9b707191e55f33efbcb2d81928e10cea45527879a4749cbe472614 \
--hash=sha256:e6024675e67af929088fda399b2094574609396b1decb609c55fa58b028a32a1 \
--hash=sha256:e70f54f1796669ef691ca07d046cd81a29cb4deb1e5f942003f401c0c4a2695d \
--hash=sha256:e715596e683d2ce000574bae5d07bd522c781a822866c20495e52520564f0969 \
--hash=sha256:e760191dd42581e023a68b758769e2da259b5d52e3103c6060ddc02c9edb8d7b \
--hash=sha256:ed86a35631f7bfbb28e108dd96773b9d5a6ce4811cf6ea468bb6a359b256b1e4 \
--hash=sha256:ee07e47c12890ef248766a6e55bd38ebfb2bb8edd4142d56db91b21ea68b7627 \
--hash=sha256:fa3a0128b152627161ce47201262d3140edb5a5c3da88d73a1b790a959126956 \
--hash=sha256:fcc8eb6d5902bb1cf6dc4f187ee3ea80a1eba0a89aba40a5cb20a5087d961357
# via cryptography
cfgv==3.4.0 \
--hash=sha256:b7265b1f29fd3316bfcd2b330d63d024f2bfd8bcb8b0272f8e19a504856c48f9 \
--hash=sha256:e52591d4c5f5dead8e0f673fb16db7949d2cfb3f7da4582893288f0ded8fe560
Expand Down Expand Up @@ -255,6 +309,31 @@ coverage[toml]==7.3.2 \
# via
# coverage
# pytest-cov
cryptography==41.0.5 \
--hash=sha256:0c327cac00f082013c7c9fb6c46b7cc9fa3c288ca702c74773968173bda421bf \
--hash=sha256:0d2a6a598847c46e3e321a7aef8af1436f11c27f1254933746304ff014664d84 \
--hash=sha256:227ec057cd32a41c6651701abc0328135e472ed450f47c2766f23267b792a88e \
--hash=sha256:22892cc830d8b2c89ea60148227631bb96a7da0c1b722f2aac8824b1b7c0b6b8 \
--hash=sha256:392cb88b597247177172e02da6b7a63deeff1937fa6fec3bbf902ebd75d97ec7 \
--hash=sha256:3be3ca726e1572517d2bef99a818378bbcf7d7799d5372a46c79c29eb8d166c1 \
--hash=sha256:573eb7128cbca75f9157dcde974781209463ce56b5804983e11a1c462f0f4e88 \
--hash=sha256:580afc7b7216deeb87a098ef0674d6ee34ab55993140838b14c9b83312b37b86 \
--hash=sha256:5a70187954ba7292c7876734183e810b728b4f3965fbe571421cb2434d279179 \
--hash=sha256:73801ac9736741f220e20435f84ecec75ed70eda90f781a148f1bad546963d81 \
--hash=sha256:7d208c21e47940369accfc9e85f0de7693d9a5d843c2509b3846b2db170dfd20 \
--hash=sha256:8254962e6ba1f4d2090c44daf50a547cd5f0bf446dc658a8e5f8156cae0d8548 \
--hash=sha256:88417bff20162f635f24f849ab182b092697922088b477a7abd6664ddd82291d \
--hash=sha256:a48e74dad1fb349f3dc1d449ed88e0017d792997a7ad2ec9587ed17405667e6d \
--hash=sha256:b948e09fe5fb18517d99994184854ebd50b57248736fd4c720ad540560174ec5 \
--hash=sha256:c707f7afd813478e2019ae32a7c49cd932dd60ab2d2a93e796f68236b7e1fbf1 \
--hash=sha256:d38e6031e113b7421db1de0c1b1f7739564a88f1684c6b89234fbf6c11b75147 \
--hash=sha256:d3977f0e276f6f5bf245c403156673db103283266601405376f075c849a0b936 \
--hash=sha256:da6a0ff8f1016ccc7477e6339e1d50ce5f59b88905585f77193ebd5068f1e797 \
--hash=sha256:e270c04f4d9b5671ebcc792b3ba5d4488bf7c42c3c241a3748e2599776f29696 \
--hash=sha256:e886098619d3815e0ad5790c973afeee2c0e6e04b4da90b88e6bd06e2a0b1b72 \
--hash=sha256:ec3b055ff8f1dce8e6ef28f626e0972981475173d7973d63f271b29c8a2897da \
--hash=sha256:fba1e91467c65fe64a82c689dc6cf58151158993b13eb7a7f3f4b7f395636723
# via -r /workspace/requirements-dev.in
dependency-injector==4.41.0 \
--hash=sha256:02620454ee8101f77a317f3229935ce687480883d72a40858ff4b0c87c935cce \
--hash=sha256:059fbb48333148143e8667a5323d162628dfe27c386bd0ed3deeecfc390338bf \
Expand Down Expand Up @@ -490,6 +569,10 @@ pre-commit==3.4.0 \
--hash=sha256:6bbd5129a64cad4c0dfaeeb12cd8f7ea7e15b77028d985341478c8af3c759522 \
--hash=sha256:96d529a951f8b677f730a7212442027e8ba53f9b04d217c4c67dc56c393ad945
# via -r /workspace/requirements-dev-common.in
pycparser==2.21 \
--hash=sha256:8ee45429555515e1f6b185e78100aea234072576aa43ab53aefcae078162fca9 \
--hash=sha256:e644fdec12f7872f86c58ff790da456218b10f863970249516d60a5eaca77206
# via cffi
pydantic==2.4.2 \
--hash=sha256:94f336138093a5d7f426aac732dcfe7ab4eb4da243c88f891d65deb4a2556ee7 \
--hash=sha256:bc3ddf669d234f4220e6e1c4d96b061abe0998185a8d7855c0126782b7abc8c1
Expand Down
2 changes: 1 addition & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
# This file is autogenerated by pip-compile with Python 3.9
# by the following command:
#
# pip-compile --all-extras --constraint=/workspace/requirements-dev.txt --generate-hashes --output-file=/workspace/requirements.txt /tmp/tmpye1zf3r4/pyproject.toml
# pip-compile --all-extras --constraint=/workspace/requirements-dev.txt --generate-hashes --output-file=/workspace/requirements.txt /tmp/tmp6008thxb/pyproject.toml
#
aiokafka==0.8.1 \
--hash=sha256:1e24839088fd6d3ff481cc09a48ea487b997328df11630bc0a1b88255edbcfe9 \
Expand Down
3 changes: 3 additions & 0 deletions scripts/license_checker.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,9 @@
"xml",
"yaml",
"yml",
"tsv",
"fastq",
"gz",
]

# exclude any files with names that match any of the following regex:
Expand Down
51 changes: 49 additions & 2 deletions src/hexkit/providers/akafka/provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,12 @@

import json
import logging
import ssl
from contextlib import asynccontextmanager
from typing import Any, Callable, Protocol, TypeVar
from typing import Any, Callable, Optional, Protocol, TypeVar

from aiokafka import AIOKafkaConsumer, AIOKafkaProducer
from aiokafka.helpers import create_ssl_context
from pydantic import Field
from pydantic_settings import BaseSettings

Expand Down Expand Up @@ -70,6 +72,29 @@ class KafkaConfig(BaseSettings):
examples=[["localhost:9092"]],
description="A list of connection strings to connect to Kafka bootstrap servers.",
)
kafka_security_protocol: Literal["PLAINTEXT", "SSL"] = Field(
"PLAINTEXT",
description="Protocol used to communicate with brokers. "
+ "Valid values are: PLAINTEXT, SSL.",
)
kafka_ssl_cafile: str = Field(
"",
description="Certificate Authority file path containing certificates"
+ " used to sign broker certificates. If a CA not specified, the default"
+ " system CA will be used if found by OpenSSL.",
)
kafka_ssl_certfile: str = Field(
"",
description="Optional filename of client certificate, as well as any"
+ " CA certificates needed to establish the certificate's authenticity.",
)
kafka_ssl_keyfile: str = Field(
"", description="Optional filename containing the client private key."
)
kafka_ssl_password: str = Field(
"",
description="Optional password to be used for the client private key.",
)


class EventTypeNotFoundError(RuntimeError):
Expand All @@ -84,13 +109,29 @@ def generate_client_id(*, service_name: str, instance_id: str) -> str:
return f"{service_name}.{instance_id}"


def generate_ssl_context(config: KafkaConfig) -> Optional[ssl.SSLContext]:
    """Generate SSL context for an encrypted SSL connection to Kafka broker.

    Returns a context built from the configured CA file, client certificate,
    key file, and key password when the security protocol is "SSL";
    returns None for any other protocol (e.g. "PLAINTEXT").
    """
    # No SSL requested -> no context needed.
    if config.kafka_security_protocol != "SSL":
        return None
    # Delegate context construction to aiokafka's helper, wiring through
    # all SSL-related fields of the Kafka configuration.
    return create_ssl_context(
        cafile=config.kafka_ssl_cafile,
        certfile=config.kafka_ssl_certfile,
        keyfile=config.kafka_ssl_keyfile,
        password=config.kafka_ssl_password,
    )


class KafkaProducerCompatible(Protocol):
"""A python duck type protocol describing an AIOKafkaProducer or equivalent."""

def __init__(
def __init__( # noqa: PLR0913
self,
*,
bootstrap_servers: str,
security_protocol: str,
ssl_context: Optional[ssl.SSLContext],
client_id: str,
key_serializer: Callable[[Any], bytes],
value_serializer: Callable[[Any], bytes],
Expand Down Expand Up @@ -149,6 +190,8 @@ async def construct(

producer = kafka_producer_cls(
bootstrap_servers=",".join(config.kafka_servers),
security_protocol=config.kafka_security_protocol,
ssl_context=generate_ssl_context(config),
client_id=client_id,
key_serializer=lambda key: key.encode("ascii"),
value_serializer=lambda event_value: json.dumps(event_value).encode(
Expand Down Expand Up @@ -220,6 +263,8 @@ def __init__( # noqa: PLR0913
self,
*topics: Ascii,
bootstrap_servers: str,
security_protocol: str,
ssl_context: Optional[ssl.SSLContext],
client_id: str,
group_id: str,
auto_offset_reset: Literal["earliest"],
Expand Down Expand Up @@ -297,6 +342,8 @@ async def construct(
consumer = kafka_consumer_cls(
*topics,
bootstrap_servers=",".join(config.kafka_servers),
security_protocol=config.kafka_security_protocol,
ssl_context=generate_ssl_context(config),
client_id=client_id,
group_id=config.service_name,
auto_offset_reset="earliest",
Expand Down
Loading