Skip to content

Commit

Permalink
Update test containers and adapt
Browse files Browse the repository at this point in the history
  • Loading branch information
Cito committed Dec 13, 2024
1 parent 57d4493 commit 6f20c26
Show file tree
Hide file tree
Showing 5 changed files with 38 additions and 73 deletions.
4 changes: 2 additions & 2 deletions .devcontainer/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,14 @@ services:
user: vscode

zookeeper:
image: confluentinc/cp-zookeeper:7.6.1
image: confluentinc/cp-zookeeper:7.8.0
environment:
ZOOKEEPER_SERVER_ID: 1
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000

kafka:
image: confluentinc/cp-server:7.6.1
image: confluentinc/cp-server:7.8.0
restart: always
depends_on:
- zookeeper
Expand Down
94 changes: 29 additions & 65 deletions src/hexkit/providers/akafka/testcontainer.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,27 +16,22 @@

"""Improved Kafka test containers."""

import tarfile
import time
from io import BytesIO
from textwrap import dedent
from typing import Literal, Optional

from testcontainers.core.container import DockerContainer
from testcontainers.core.waiting_utils import wait_for_logs
from testcontainers.kafka import KafkaContainer

__all__ = ["KafkaSSLContainer"]

DEFAULT_IMAGE = "confluentinc/cp-kafka:7.6.1"
DEFAULT_IMAGE = "confluentinc/cp-kafka:7.8.0"

DEFAULT_PORT = 9093 # default port for the Kafka container
BROKER_PORT = 9092 # auxiliary port for inter broker listener


class KafkaSSLContainer(DockerContainer):
class KafkaSSLContainer(KafkaContainer):
"""Kafka container that supports SSL (or actually TLS)."""

TC_START_SCRIPT = "/tc-start.sh"
SECRETS_PATH = "/etc/kafka/secrets"

def __init__( # noqa: C901, PLR0912, PLR0913
Expand All @@ -58,24 +53,18 @@ def __init__( # noqa: C901, PLR0912, PLR0913
"trusted" must contain the trusted certificates. In "client_auth" you can
specify whether authentication is requested, required or not needed at all.
"""
super().__init__(image, **kwargs)
env = self.with_env
self.port = port
super().__init__(image, port, **kwargs)
ssl = bool(cert or trusted or client_auth)
protocol = "SSL" if ssl else "PLAINTEXT"
self.protocol = protocol
self.with_exposed_ports(port)
self.broker_port = DEFAULT_PORT if port == BROKER_PORT else BROKER_PORT
listeners = f"{protocol}://0.0.0.0:{port},BROKER://0.0.0.0:{self.broker_port}"
protocol_map = f"BROKER:PLAINTEXT,{protocol}:{protocol}"
env("KAFKA_LISTENERS", listeners)
env("KAFKA_INTER_BROKER_LISTENER_NAME", "BROKER")
env("KAFKA_LISTENER_SECURITY_PROTOCOL_MAP", protocol_map)
env("KAFKA_BROKER_ID", "1")
env("KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR", "1")
env("KAFKA_OFFSETS_TOPIC_NUM_PARTITIONS", "1")
env("KAFKA_LOG_FLUSH_INTERVAL_MESSAGES", "10000000")
env("KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS", "0")
self.listeners = (
f"{protocol}://0.0.0.0:{port},BROKER://0.0.0.0:{self.broker_port}"
)
self.security_protocol_map = f"BROKER:PLAINTEXT,{protocol}:{protocol}"
env = self.with_env
env("KAFKA_LISTENERS", self.listeners)
env("KAFKA_LISTENER_SECURITY_PROTOCOL_MAP", self.security_protocol_map)
if ssl:
if cert:
cert = cert.strip().replace("\n", "\\n")
Expand Down Expand Up @@ -106,52 +95,27 @@ def __init__( # noqa: C901, PLR0912, PLR0913
env("KAFKA_SSL_CLIENT_AUTH", client_auth)
env("KAFKA_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM", " ")

def get_bootstrap_server(self) -> str:
"""Get the Kafka bootstrap server."""
host = self.get_container_host_ip()
port = self.get_exposed_port(self.port)
return f"{host}:{port}"

def start(self, timeout: Optional[float] = 30) -> "KafkaSSLContainer":
"""Start the Docker container."""
script = self.TC_START_SCRIPT
command = f'sh -c "while [ ! -f {script} ]; do sleep 0.1; done; sh {script}"'
self.with_command(command)
super().start()
self.tc_start()
wait_for_logs(self, r".*\[KafkaServer id=\d+\] started.*", timeout=timeout)
return self

def tc_start(self) -> None:
"""Start the test container."""
protocol = self.protocol
host = self.get_container_host_ip()
port = self.get_exposed_port(self.port)
listeners = f"{protocol}://{host}:{port},BROKER://127.0.0.1:{self.broker_port}"
script = f"""
#!/bin/bash
c=/etc/confluent/docker
. $c/bash-config
export KAFKA_ADVERTISED_LISTENERS={listeners}
export KAFKA_ZOOKEEPER_CONNECT=localhost:2181
p=zookeeper.properties
echo "clientPort=2181" > $p
echo "dataDir=/var/lib/zookeeper/data" >> $p
echo "dataLogDir=/var/lib/zookeeper/log" >> $p
zookeeper-server-start $p &
# workaround for https://github.com/confluentinc/kafka-images/issues/244
sed -i -E '/^if .*LISTENERS.*SSL:/,/^fi/d' $c/configure
$c/configure && $c/launch
"""
self.create_file(dedent(script).strip().encode("utf-8"), self.TC_START_SCRIPT)

def create_file(self, content: bytes, path: str) -> None:
"""Create a file inside the container."""
with BytesIO() as archive:
with tarfile.TarFile(fileobj=archive, mode="w") as tar:
tarinfo = tarfile.TarInfo(name=path)
tarinfo.size = len(content)
tarinfo.mtime = time.time()
tar.addfile(tarinfo, BytesIO(content))
archive.seek(0)
self.get_wrapped_container().put_archive("/", archive)
data = (
dedent(
f"""
#!/bin/bash
{self.boot_command}
export KAFKA_ADVERTISED_LISTENERS={listeners}
c=/etc/confluent/docker
. $c/bash-config
# workaround for https://github.com/confluentinc/kafka-images/issues/244
sed -i -E '/^if .*LISTENERS.*SSL:/,/^fi/d' $c/configure
$c/configure
$c/launch
"""
)
.strip()
.encode("utf-8")
)
self.create_file(data, KafkaContainer.TC_START_SCRIPT)
2 changes: 1 addition & 1 deletion src/hexkit/providers/mongodb/testutils.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
from hexkit.custom_types import PytestScope
from hexkit.providers.mongodb.provider import MongoDbConfig, MongoDbDaoFactory

MONGODB_IMAGE = "mongo:7.0.9"
MONGODB_IMAGE = "mongo:7.0.15"

__all__ = [
"MONGODB_IMAGE",
Expand Down
2 changes: 1 addition & 1 deletion src/hexkit/providers/s3/testutils/_fixtures.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@
]


LOCALSTACK_IMAGE = "localstack/localstack:3.8.1"
LOCALSTACK_IMAGE = "localstack/localstack:4.0.3"

TEST_FILE_DIR = Path(__file__).parent.parent.resolve() / "test_files"

Expand Down
9 changes: 5 additions & 4 deletions tests/integration/test_mongokafka.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,11 @@
"""Test the DAO pub/sub functionality based on the mongokafka/kafka providers."""

import uuid
from collections.abc import AsyncGenerator
from collections.abc import Generator
from pathlib import Path
from typing import Any, Optional

import pytest
import pytest_asyncio
from pydantic import UUID4, BaseModel
from pymongo import MongoClient
from pymongo.collection import Collection
Expand Down Expand Up @@ -75,9 +74,11 @@
EXAMPLE_TOPIC = "example"


@pytest_asyncio.fixture(autouse=True)
async def correlation_id_fixture() -> AsyncGenerator[str, None]:
@pytest.fixture(autouse=True)
def correlation_id_fixture() -> Generator[str, None, None]:
"""Provides a new correlation ID for each test case."""
# Note: Using an async fixture doesn't work reliably with older Python versions,
# because the context is not preserved even with pytest-asyncio 0.25.
correlation_id = new_correlation_id()
token = correlation_id_var.set(correlation_id)
yield correlation_id
Expand Down

0 comments on commit 6f20c26

Please sign in to comment.