Skip to content

Commit

Permalink
Add SSL Support to Kafka Provider Configuration (#66)
Browse files Browse the repository at this point in the history
  • Loading branch information
walzph authored Nov 13, 2023
1 parent 1b693b3 commit 4c05b00
Show file tree
Hide file tree
Showing 17 changed files with 587 additions and 36 deletions.
22 changes: 10 additions & 12 deletions .devcontainer/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -23,24 +23,23 @@ services:
user: vscode

zookeeper:
image: confluentinc/cp-zookeeper:7.3.1
# used ports: 2181
image: confluentinc/cp-zookeeper:7.5.1
environment:
ZOOKEEPER_SERVER_ID: 1
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000

kafka:
image: confluentinc/cp-server:7.3.1
image: confluentinc/cp-server:7.5.1
restart: always
depends_on:
- zookeeper
# used ports: 9092
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
KAFKA_DELETE_TOPIC_ENABLE: "true"
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
KAFKA_CONFLUENT_LICENSE_TOPIC_REPLICATION_FACTOR: 1
Expand All @@ -55,9 +54,8 @@ services:
- kafka
- zookeeper
environment:
- KAFKA_CLUSTERS_0_NAME=local
- KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS=kafka:9092
- KAFKA_CLUSTERS_0_ZOOKEEPER=zookeeper:2181
# used ports: 8080
# The port is automatically forwarded to localhost.
# Please see the "PORTS" panel for details.
KAFKA_CLUSTERS_0_NAME: local
KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:9092
KAFKA_CLUSTERS_0_ZOOKEEPER: zookeeper:2181
ports:
- 8080:8080
6 changes: 6 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,12 @@ dmypy.json
# ignore VS Code settings:
.vscode/

# key stores
*.key
*.rnd
.keystore
.ssl/

# desktop settings and thumbnails
.DS_Store
desktop.ini
Expand Down
1 change: 0 additions & 1 deletion .static_files_ignore
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
.github/workflows/check_openapi_spec.yaml
.github/workflows/check_readme.yaml
.github/workflows/cd.yaml
.github/workflows/dev_cd.yaml

scripts/script_utils/fastapi_app_location.py

Expand Down
7 changes: 4 additions & 3 deletions examples/stream_calc/sc_tests/integration/test_event_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,11 @@
from testcontainers.kafka import KafkaContainer

from hexkit.custom_types import JsonObject
from hexkit.providers.akafka.testcontainer import DEFAULT_IMAGE as KAFKA_IMAGE
from stream_calc.config import Config
from stream_calc.main import main

DEFAULT_CONFIG = Config()
DEFAULT_CONFIG = Config() # type: ignore


class Event(NamedTuple):
Expand Down Expand Up @@ -200,14 +201,14 @@ async def test_receive_calc_publish(cases: list[Case] = deepcopy(CASES)):
the results.
"""

with KafkaContainer() as kafka:
with KafkaContainer(image=KAFKA_IMAGE) as kafka:
kafka_server = kafka.get_bootstrap_server()

submit_test_problems(cases, kafka_server=kafka_server)

# run the stream_calc app:
# (for each problem separately to avoid running forever)
config = Config(kafka_servers=[kafka_server])
config = Config(kafka_servers=[kafka_server]) # type: ignore
for _ in cases:
await main(config=config, run_forever=False)

Expand Down
4 changes: 3 additions & 1 deletion examples/stream_calc/stream_calc/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,9 @@ def get_container(config: Config) -> Container:
return container


async def main(*, config: Config = Config(), run_forever: bool = True) -> None:
async def main(
*, config: Config = Config(), run_forever: bool = True # type: ignore
) -> None:
"""
Coroutine to run the stream calculator.
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"

[project]
name = "hexkit"
version = "0.11.1"
version = "1.0.0"
description = "A Toolkit for Building Microservices using the Hexagonal Architecture"
readme = "README.md"
authors = [
Expand Down
1 change: 1 addition & 0 deletions requirements-dev.in
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,4 @@
-r requirements-dev-common.in

# additional requirements can be listed her
cryptography >= 41
85 changes: 84 additions & 1 deletion requirements-dev.txt
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
# This file is autogenerated by pip-compile with Python 3.9
# by the following command:
#
# pip-compile --all-extras --generate-hashes --output-file=/workspace/requirements-dev.txt /tmp/tmpye1zf3r4/pyproject.toml /workspace/requirements-dev.in
# pip-compile --all-extras --generate-hashes --output-file=/workspace/requirements-dev.txt /tmp/tmp6008thxb/pyproject.toml /workspace/requirements-dev.in
#
aiokafka==0.8.1 \
--hash=sha256:1e24839088fd6d3ff481cc09a48ea487b997328df11630bc0a1b88255edbcfe9 \
Expand Down Expand Up @@ -95,6 +95,60 @@ certifi==2023.7.22 \
# httpcore
# httpx
# requests
cffi==1.16.0 \
--hash=sha256:0c9ef6ff37e974b73c25eecc13952c55bceed9112be2d9d938ded8e856138bcc \
--hash=sha256:131fd094d1065b19540c3d72594260f118b231090295d8c34e19a7bbcf2e860a \
--hash=sha256:1b8ebc27c014c59692bb2664c7d13ce7a6e9a629be20e54e7271fa696ff2b417 \
--hash=sha256:2c56b361916f390cd758a57f2e16233eb4f64bcbeee88a4881ea90fca14dc6ab \
--hash=sha256:2d92b25dbf6cae33f65005baf472d2c245c050b1ce709cc4588cdcdd5495b520 \
--hash=sha256:31d13b0f99e0836b7ff893d37af07366ebc90b678b6664c955b54561fc36ef36 \
--hash=sha256:32c68ef735dbe5857c810328cb2481e24722a59a2003018885514d4c09af9743 \
--hash=sha256:3686dffb02459559c74dd3d81748269ffb0eb027c39a6fc99502de37d501faa8 \
--hash=sha256:582215a0e9adbe0e379761260553ba11c58943e4bbe9c36430c4ca6ac74b15ed \
--hash=sha256:5b50bf3f55561dac5438f8e70bfcdfd74543fd60df5fa5f62d94e5867deca684 \
--hash=sha256:5bf44d66cdf9e893637896c7faa22298baebcd18d1ddb6d2626a6e39793a1d56 \
--hash=sha256:6602bc8dc6f3a9e02b6c22c4fc1e47aa50f8f8e6d3f78a5e16ac33ef5fefa324 \
--hash=sha256:673739cb539f8cdaa07d92d02efa93c9ccf87e345b9a0b556e3ecc666718468d \
--hash=sha256:68678abf380b42ce21a5f2abde8efee05c114c2fdb2e9eef2efdb0257fba1235 \
--hash=sha256:68e7c44931cc171c54ccb702482e9fc723192e88d25a0e133edd7aff8fcd1f6e \
--hash=sha256:6b3d6606d369fc1da4fd8c357d026317fbb9c9b75d36dc16e90e84c26854b088 \
--hash=sha256:748dcd1e3d3d7cd5443ef03ce8685043294ad6bd7c02a38d1bd367cfd968e000 \
--hash=sha256:7651c50c8c5ef7bdb41108b7b8c5a83013bfaa8a935590c5d74627c047a583c7 \
--hash=sha256:7b78010e7b97fef4bee1e896df8a4bbb6712b7f05b7ef630f9d1da00f6444d2e \
--hash=sha256:7e61e3e4fa664a8588aa25c883eab612a188c725755afff6289454d6362b9673 \
--hash=sha256:80876338e19c951fdfed6198e70bc88f1c9758b94578d5a7c4c91a87af3cf31c \
--hash=sha256:8895613bcc094d4a1b2dbe179d88d7fb4a15cee43c052e8885783fac397d91fe \
--hash=sha256:88e2b3c14bdb32e440be531ade29d3c50a1a59cd4e51b1dd8b0865c54ea5d2e2 \
--hash=sha256:8f8e709127c6c77446a8c0a8c8bf3c8ee706a06cd44b1e827c3e6a2ee6b8c098 \
--hash=sha256:9cb4a35b3642fc5c005a6755a5d17c6c8b6bcb6981baf81cea8bfbc8903e8ba8 \
--hash=sha256:9f90389693731ff1f659e55c7d1640e2ec43ff725cc61b04b2f9c6d8d017df6a \
--hash=sha256:a09582f178759ee8128d9270cd1344154fd473bb77d94ce0aeb2a93ebf0feaf0 \
--hash=sha256:a6a14b17d7e17fa0d207ac08642c8820f84f25ce17a442fd15e27ea18d67c59b \
--hash=sha256:a72e8961a86d19bdb45851d8f1f08b041ea37d2bd8d4fd19903bc3083d80c896 \
--hash=sha256:abd808f9c129ba2beda4cfc53bde801e5bcf9d6e0f22f095e45327c038bfe68e \
--hash=sha256:ac0f5edd2360eea2f1daa9e26a41db02dd4b0451b48f7c318e217ee092a213e9 \
--hash=sha256:b29ebffcf550f9da55bec9e02ad430c992a87e5f512cd63388abb76f1036d8d2 \
--hash=sha256:b2ca4e77f9f47c55c194982e10f058db063937845bb2b7a86c84a6cfe0aefa8b \
--hash=sha256:b7be2d771cdba2942e13215c4e340bfd76398e9227ad10402a8767ab1865d2e6 \
--hash=sha256:b84834d0cf97e7d27dd5b7f3aca7b6e9263c56308ab9dc8aae9784abb774d404 \
--hash=sha256:b86851a328eedc692acf81fb05444bdf1891747c25af7529e39ddafaf68a4f3f \
--hash=sha256:bcb3ef43e58665bbda2fb198698fcae6776483e0c4a631aa5647806c25e02cc0 \
--hash=sha256:c0f31130ebc2d37cdd8e44605fb5fa7ad59049298b3f745c74fa74c62fbfcfc4 \
--hash=sha256:c6a164aa47843fb1b01e941d385aab7215563bb8816d80ff3a363a9f8448a8dc \
--hash=sha256:d8a9d3ebe49f084ad71f9269834ceccbf398253c9fac910c4fd7053ff1386936 \
--hash=sha256:db8e577c19c0fda0beb7e0d4e09e0ba74b1e4c092e0e40bfa12fe05b6f6d75ba \
--hash=sha256:dc9b18bf40cc75f66f40a7379f6a9513244fe33c0e8aa72e2d56b0196a7ef872 \
--hash=sha256:e09f3ff613345df5e8c3667da1d918f9149bd623cd9070c983c013792a9a62eb \
--hash=sha256:e4108df7fe9b707191e55f33efbcb2d81928e10cea45527879a4749cbe472614 \
--hash=sha256:e6024675e67af929088fda399b2094574609396b1decb609c55fa58b028a32a1 \
--hash=sha256:e70f54f1796669ef691ca07d046cd81a29cb4deb1e5f942003f401c0c4a2695d \
--hash=sha256:e715596e683d2ce000574bae5d07bd522c781a822866c20495e52520564f0969 \
--hash=sha256:e760191dd42581e023a68b758769e2da259b5d52e3103c6060ddc02c9edb8d7b \
--hash=sha256:ed86a35631f7bfbb28e108dd96773b9d5a6ce4811cf6ea468bb6a359b256b1e4 \
--hash=sha256:ee07e47c12890ef248766a6e55bd38ebfb2bb8edd4142d56db91b21ea68b7627 \
--hash=sha256:fa3a0128b152627161ce47201262d3140edb5a5c3da88d73a1b790a959126956 \
--hash=sha256:fcc8eb6d5902bb1cf6dc4f187ee3ea80a1eba0a89aba40a5cb20a5087d961357
# via cryptography
cfgv==3.4.0 \
--hash=sha256:b7265b1f29fd3316bfcd2b330d63d024f2bfd8bcb8b0272f8e19a504856c48f9 \
--hash=sha256:e52591d4c5f5dead8e0f673fb16db7949d2cfb3f7da4582893288f0ded8fe560
Expand Down Expand Up @@ -255,6 +309,31 @@ coverage[toml]==7.3.2 \
# via
# coverage
# pytest-cov
cryptography==41.0.5 \
--hash=sha256:0c327cac00f082013c7c9fb6c46b7cc9fa3c288ca702c74773968173bda421bf \
--hash=sha256:0d2a6a598847c46e3e321a7aef8af1436f11c27f1254933746304ff014664d84 \
--hash=sha256:227ec057cd32a41c6651701abc0328135e472ed450f47c2766f23267b792a88e \
--hash=sha256:22892cc830d8b2c89ea60148227631bb96a7da0c1b722f2aac8824b1b7c0b6b8 \
--hash=sha256:392cb88b597247177172e02da6b7a63deeff1937fa6fec3bbf902ebd75d97ec7 \
--hash=sha256:3be3ca726e1572517d2bef99a818378bbcf7d7799d5372a46c79c29eb8d166c1 \
--hash=sha256:573eb7128cbca75f9157dcde974781209463ce56b5804983e11a1c462f0f4e88 \
--hash=sha256:580afc7b7216deeb87a098ef0674d6ee34ab55993140838b14c9b83312b37b86 \
--hash=sha256:5a70187954ba7292c7876734183e810b728b4f3965fbe571421cb2434d279179 \
--hash=sha256:73801ac9736741f220e20435f84ecec75ed70eda90f781a148f1bad546963d81 \
--hash=sha256:7d208c21e47940369accfc9e85f0de7693d9a5d843c2509b3846b2db170dfd20 \
--hash=sha256:8254962e6ba1f4d2090c44daf50a547cd5f0bf446dc658a8e5f8156cae0d8548 \
--hash=sha256:88417bff20162f635f24f849ab182b092697922088b477a7abd6664ddd82291d \
--hash=sha256:a48e74dad1fb349f3dc1d449ed88e0017d792997a7ad2ec9587ed17405667e6d \
--hash=sha256:b948e09fe5fb18517d99994184854ebd50b57248736fd4c720ad540560174ec5 \
--hash=sha256:c707f7afd813478e2019ae32a7c49cd932dd60ab2d2a93e796f68236b7e1fbf1 \
--hash=sha256:d38e6031e113b7421db1de0c1b1f7739564a88f1684c6b89234fbf6c11b75147 \
--hash=sha256:d3977f0e276f6f5bf245c403156673db103283266601405376f075c849a0b936 \
--hash=sha256:da6a0ff8f1016ccc7477e6339e1d50ce5f59b88905585f77193ebd5068f1e797 \
--hash=sha256:e270c04f4d9b5671ebcc792b3ba5d4488bf7c42c3c241a3748e2599776f29696 \
--hash=sha256:e886098619d3815e0ad5790c973afeee2c0e6e04b4da90b88e6bd06e2a0b1b72 \
--hash=sha256:ec3b055ff8f1dce8e6ef28f626e0972981475173d7973d63f271b29c8a2897da \
--hash=sha256:fba1e91467c65fe64a82c689dc6cf58151158993b13eb7a7f3f4b7f395636723
# via -r /workspace/requirements-dev.in
dependency-injector==4.41.0 \
--hash=sha256:02620454ee8101f77a317f3229935ce687480883d72a40858ff4b0c87c935cce \
--hash=sha256:059fbb48333148143e8667a5323d162628dfe27c386bd0ed3deeecfc390338bf \
Expand Down Expand Up @@ -490,6 +569,10 @@ pre-commit==3.4.0 \
--hash=sha256:6bbd5129a64cad4c0dfaeeb12cd8f7ea7e15b77028d985341478c8af3c759522 \
--hash=sha256:96d529a951f8b677f730a7212442027e8ba53f9b04d217c4c67dc56c393ad945
# via -r /workspace/requirements-dev-common.in
pycparser==2.21 \
--hash=sha256:8ee45429555515e1f6b185e78100aea234072576aa43ab53aefcae078162fca9 \
--hash=sha256:e644fdec12f7872f86c58ff790da456218b10f863970249516d60a5eaca77206
# via cffi
pydantic==2.4.2 \
--hash=sha256:94f336138093a5d7f426aac732dcfe7ab4eb4da243c88f891d65deb4a2556ee7 \
--hash=sha256:bc3ddf669d234f4220e6e1c4d96b061abe0998185a8d7855c0126782b7abc8c1
Expand Down
2 changes: 1 addition & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
# This file is autogenerated by pip-compile with Python 3.9
# by the following command:
#
# pip-compile --all-extras --constraint=/workspace/requirements-dev.txt --generate-hashes --output-file=/workspace/requirements.txt /tmp/tmpye1zf3r4/pyproject.toml
# pip-compile --all-extras --constraint=/workspace/requirements-dev.txt --generate-hashes --output-file=/workspace/requirements.txt /tmp/tmp6008thxb/pyproject.toml
#
aiokafka==0.8.1 \
--hash=sha256:1e24839088fd6d3ff481cc09a48ea487b997328df11630bc0a1b88255edbcfe9 \
Expand Down
3 changes: 3 additions & 0 deletions scripts/license_checker.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,9 @@
"xml",
"yaml",
"yml",
"tsv",
"fastq",
"gz",
]

# exclude any files with names that match any of the following regex:
Expand Down
51 changes: 49 additions & 2 deletions src/hexkit/providers/akafka/provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,12 @@

import json
import logging
import ssl
from contextlib import asynccontextmanager
from typing import Any, Callable, Protocol, TypeVar
from typing import Any, Callable, Optional, Protocol, TypeVar

from aiokafka import AIOKafkaConsumer, AIOKafkaProducer
from aiokafka.helpers import create_ssl_context
from pydantic import Field
from pydantic_settings import BaseSettings

Expand Down Expand Up @@ -70,6 +72,29 @@ class KafkaConfig(BaseSettings):
examples=[["localhost:9092"]],
description="A list of connection strings to connect to Kafka bootstrap servers.",
)
kafka_security_protocol: Literal["PLAINTEXT", "SSL"] = Field(
"PLAINTEXT",
description="Protocol used to communicate with brokers. "
+ "Valid values are: PLAINTEXT, SSL.",
)
kafka_ssl_cafile: str = Field(
"",
description="Certificate Authority file path containing certificates"
+ " used to sign broker certificates. If a CA not specified, the default"
+ " system CA will be used if found by OpenSSL.",
)
kafka_ssl_certfile: str = Field(
"",
description="Optional filename of client certificate, as well as any"
+ " CA certificates needed to establish the certificate's authenticity.",
)
kafka_ssl_keyfile: str = Field(
"", description="Optional filename containing the client private key."
)
kafka_ssl_password: str = Field(
"",
description="Optional password to be used for the client private key.",
)


class EventTypeNotFoundError(RuntimeError):
Expand All @@ -84,13 +109,29 @@ def generate_client_id(*, service_name: str, instance_id: str) -> str:
return f"{service_name}.{instance_id}"


def generate_ssl_context(config: KafkaConfig) -> Optional[ssl.SSLContext]:
"""Generate SSL context for an encrypted SSL connection to Kafka broker."""
return (
create_ssl_context(
cafile=config.kafka_ssl_cafile,
certfile=config.kafka_ssl_certfile,
keyfile=config.kafka_ssl_keyfile,
password=config.kafka_ssl_password,
)
if config.kafka_security_protocol == "SSL"
else None
)


class KafkaProducerCompatible(Protocol):
"""A python duck type protocol describing an AIOKafkaProducer or equivalent."""

def __init__(
def __init__( # noqa: PLR0913
self,
*,
bootstrap_servers: str,
security_protocol: str,
ssl_context: Optional[ssl.SSLContext],
client_id: str,
key_serializer: Callable[[Any], bytes],
value_serializer: Callable[[Any], bytes],
Expand Down Expand Up @@ -149,6 +190,8 @@ async def construct(

producer = kafka_producer_cls(
bootstrap_servers=",".join(config.kafka_servers),
security_protocol=config.kafka_security_protocol,
ssl_context=generate_ssl_context(config),
client_id=client_id,
key_serializer=lambda key: key.encode("ascii"),
value_serializer=lambda event_value: json.dumps(event_value).encode(
Expand Down Expand Up @@ -220,6 +263,8 @@ def __init__( # noqa: PLR0913
self,
*topics: Ascii,
bootstrap_servers: str,
security_protocol: str,
ssl_context: Optional[ssl.SSLContext],
client_id: str,
group_id: str,
auto_offset_reset: Literal["earliest"],
Expand Down Expand Up @@ -297,6 +342,8 @@ async def construct(
consumer = kafka_consumer_cls(
*topics,
bootstrap_servers=",".join(config.kafka_servers),
security_protocol=config.kafka_security_protocol,
ssl_context=generate_ssl_context(config),
client_id=client_id,
group_id=config.service_name,
auto_offset_reset="earliest",
Expand Down
Loading

0 comments on commit 4c05b00

Please sign in to comment.