Skip to content

Commit

Permalink
Add a prefix to kafka config settings
Browse files Browse the repository at this point in the history
  • Loading branch information
Cito committed Nov 8, 2023
1 parent 379b3e5 commit b05a98d
Show file tree
Hide file tree
Showing 3 changed files with 18 additions and 18 deletions.
24 changes: 12 additions & 12 deletions src/hexkit/providers/akafka/provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,26 +72,26 @@ class KafkaConfig(BaseSettings):
examples=[["localhost:9092"]],
description="A list of connection strings to connect to Kafka bootstrap servers.",
)
security_protocol: Literal["PLAINTEXT", "SSL"] = Field(
kafka_security_protocol: Literal["PLAINTEXT", "SSL"] = Field(
"PLAINTEXT",
description="Protocol used to communicate with brokers. "
+ "Valid values are: PLAINTEXT, SSL.",
)
ssl_cafile: str = Field(
kafka_ssl_cafile: str = Field(
"",
description="Certificate Authority file path containing certificates"
+ " used to sign broker certificates. If a CA not specified, the default"
+ " system CA will be used if found by OpenSSL.",
)
ssl_certfile: str = Field(
kafka_ssl_certfile: str = Field(
"",
description="Optional filename of client certificate, as well as any"
+ " CA certificates needed to establish the certificate's authenticity.",
)
ssl_keyfile: str = Field(
kafka_ssl_keyfile: str = Field(
"", description="Optional filename containing the client private key."
)
ssl_password: str = Field(
kafka_ssl_password: str = Field(
"",
description="Optional password to be used for the client private key.",
)
Expand All @@ -113,12 +113,12 @@ def generate_ssl_context(config: KafkaConfig) -> Optional[ssl.SSLContext]:
"""Generate SSL context for an encrypted SSL connection to Kafka broker."""
return (
create_ssl_context(
cafile=config.ssl_cafile,
certfile=config.ssl_certfile,
keyfile=config.ssl_keyfile,
password=config.ssl_password,
cafile=config.kafka_ssl_cafile,
certfile=config.kafka_ssl_certfile,
keyfile=config.kafka_ssl_keyfile,
password=config.kafka_ssl_password,
)
if config.security_protocol == "SSL"
if config.kafka_security_protocol == "SSL"
else None
)

Expand Down Expand Up @@ -190,7 +190,7 @@ async def construct(

producer = kafka_producer_cls(
bootstrap_servers=",".join(config.kafka_servers),
security_protocol=config.security_protocol,
security_protocol=config.kafka_security_protocol,
ssl_context=generate_ssl_context(config),
client_id=client_id,
key_serializer=lambda key: key.encode("ascii"),
Expand Down Expand Up @@ -342,7 +342,7 @@ async def construct(
consumer = kafka_consumer_cls(
*topics,
bootstrap_servers=",".join(config.kafka_servers),
security_protocol=config.security_protocol,
security_protocol=config.kafka_security_protocol,
ssl_context=generate_ssl_context(config),
client_id=client_id,
group_id=config.service_name,
Expand Down
10 changes: 5 additions & 5 deletions tests/integration/test_akafka.py
Original file line number Diff line number Diff line change
Expand Up @@ -142,11 +142,11 @@ async def test_kafka_ssl():
service_name="test_ssl",
service_instance_id="1",
kafka_servers=["localhost:19092"], # SSL port
security_protocol="SSL",
ssl_cafile=str(secrets_dir / "ca.crt"),
ssl_certfile=str(secrets_dir / "client.crt"),
ssl_keyfile=str(secrets_dir / "client.key"),
ssl_password=password,
kafka_security_protocol="SSL",
kafka_ssl_cafile=str(secrets_dir / "ca.crt"),
kafka_ssl_certfile=str(secrets_dir / "client.crt"),
kafka_ssl_keyfile=str(secrets_dir / "client.key"),
kafka_ssl_password=password,
)

async with KafkaEventPublisher.construct(config=config) as event_publisher:
Expand Down
2 changes: 1 addition & 1 deletion tests/unit/test_akafka.py
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ async def test_kafka_event_subscriber(
consumer_cls = Mock()
consumer_cls.return_value = consumer

# create protocol-compatiple translator mock:
# create protocol-compatible translator mock:
translator = AsyncMock()
if processing_failure and exception:
translator.consume.side_effect = exception()
Expand Down

0 comments on commit b05a98d

Please sign in to comment.