diff --git a/.gitignore b/.gitignore index a71cfd2a..40dbd167 100644 --- a/.gitignore +++ b/.gitignore @@ -137,3 +137,6 @@ dmypy.json .DS_Store desktop.ini thumbs.db + +# SSL keystore +.ssl/ diff --git a/src/hexkit/providers/akafka/testutils.py b/src/hexkit/providers/akafka/testutils.py index 9ee6c65f..d4206a1f 100644 --- a/src/hexkit/providers/akafka/testutils.py +++ b/src/hexkit/providers/akafka/testutils.py @@ -30,6 +30,7 @@ import jks import pytest_asyncio from aiokafka import AIOKafkaConsumer, TopicPartition +from cryptography import x509 from kafka import KafkaAdminClient from kafka.errors import KafkaError from OpenSSL import crypto @@ -402,63 +403,97 @@ async def expect_events( ) -def generate_ssl_certificates(): +def generate_ssl_certificates(): # noqa: PLR0915 """Generate ssl keys""" - key = crypto.PKey() - key.generate_key(crypto.TYPE_RSA, 2048) + ca_key = crypto.PKey() + ca_key.generate_key(crypto.TYPE_RSA, 2048) # generate a self signed certificate - cert = crypto.X509() - cert.get_subject().CN = "my.server.example.com" - cert.set_serial_number(473289472) - cert.gmtime_adj_notBefore(0) - cert.gmtime_adj_notAfter(365 * 24 * 60 * 60) - cert.set_issuer(cert.get_subject()) - cert.set_pubkey(key) - cert.sign(key, "sha256") - - # dumping the key and cert to ASN1 - dumped_cert = crypto.dump_certificate(crypto.FILETYPE_ASN1, cert) - dumped_key = crypto.dump_privatekey(crypto.FILETYPE_ASN1, key) + ca_cert = crypto.X509() + ca_cert.get_subject().CN = "GHGA Test Certificate Authority" + ca_cert.set_serial_number(x509.random_serial_number()) + ca_cert.gmtime_adj_notBefore(0) + ca_cert.gmtime_adj_notAfter(365 * 24 * 60 * 60) + ca_cert.set_issuer(ca_cert.get_subject()) + ca_cert.set_pubkey(ca_key) + ca_cert.sign(ca_key, "sha256") + + server_key = crypto.PKey() + server_key.generate_key(crypto.TYPE_RSA, 2048) + + # generate a self signed certificate + server_cert = crypto.X509() + server_cert.get_subject().CN = "Broker 1" + server_cert.set_serial_number(x509.random_serial_number()) + server_cert.gmtime_adj_notBefore(0) + server_cert.gmtime_adj_notAfter(365 * 24 * 60 * 60) + server_cert.set_issuer(ca_cert.get_subject()) + server_cert.set_pubkey(server_key) + server_cert.sign(ca_key, "sha256") + + # dumping the ca key and cert to ASN1 + dumped_ca_cert = crypto.dump_certificate(crypto.FILETYPE_ASN1, ca_cert) + dumped_ca_key = crypto.dump_privatekey(crypto.FILETYPE_ASN1, ca_key) + # dumping the server key and cert to ASN1 + dumped_server_cert = crypto.dump_certificate(crypto.FILETYPE_ASN1, server_cert) + dumped_server_key = crypto.dump_privatekey(crypto.FILETYPE_ASN1, server_key) # creating a private key entry - pke = jks.PrivateKeyEntry.new( - "self signed cert", [dumped_cert], dumped_key, "rsa_raw" + ca_pke = jks.PrivateKeyEntry.new( + "self signed cert", [dumped_ca_cert], dumped_ca_key, "rsa_raw" ) + # creating a private key entry + server_pke = jks.PrivateKeyEntry.new( + "self signed cert", [dumped_server_cert], dumped_server_key, "rsa_raw" + ) # if we want the private key entry to have a unique password, we can encrypt it beforehand # if it is not ecrypted when saved, it will be encrypted with the same password as the keystore - # pke.encrypt('my_private_key_password') + # pke.encrypt("") # os.mkdir("ssl") - ssl_dir = Path("/tmp/ssl") # noqa: S108 + ssl_dir = Path("/workspace/.ssl") Path(ssl_dir).mkdir(exist_ok=True) - cert_path = os.path.abspath(f"{ssl_dir}/kafka-server-cert.crt") - key_path = os.path.abspath(f"{ssl_dir}/kafka-server-private.key") - jks_path = os.path.abspath(f"{ssl_dir}/kafka-server.keystore.jks") + ca_cert_path = os.path.abspath(f"{ssl_dir}/ca-cert.crt") + ca_key_path = os.path.abspath(f"{ssl_dir}/ca-private.key") + ca_jks_path = os.path.abspath(f"{ssl_dir}/ca.truststore.jks") + server_cert_path = os.path.abspath(f"{ssl_dir}/server-cert.crt") + server_key_path = os.path.abspath(f"{ssl_dir}/server-private.key") + server_jks_path = os.path.abspath(f"{ssl_dir}/server.keystore.jks") cred_path = os.path.abspath(f"{ssl_dir}/password.txt") keystore_password = "password" # noqa: S105 - with open(cert_path, "w") as f: - f.write(crypto.dump_certificate(crypto.FILETYPE_PEM, cert).decode("utf-8")) + with open(ca_cert_path, "w") as f: + f.write(crypto.dump_certificate(crypto.FILETYPE_PEM, ca_cert).decode("utf-8")) - with open(key_path, "w") as f: - f.write(crypto.dump_privatekey(crypto.FILETYPE_PEM, key).decode("utf-8")) + with open(ca_key_path, "w") as f: + f.write(crypto.dump_privatekey(crypto.FILETYPE_PEM, ca_key).decode("utf-8")) + with open(server_cert_path, "w") as f: + f.write( + crypto.dump_certificate(crypto.FILETYPE_PEM, server_cert).decode("utf-8") + ) + + with open(server_key_path, "w") as f: + f.write(crypto.dump_privatekey(crypto.FILETYPE_PEM, server_key).decode("utf-8")) with open(cred_path, "w") as f: f.write(keystore_password) # creating a jks keystore with the private key, and saving it - keystore = jks.KeyStore.new("jks", [pke]) - keystore.save(jks_path, keystore_password) + ca_keystore = jks.KeyStore.new("jks", [ca_pke]) + ca_keystore.save(ca_jks_path, keystore_password) + + # creating a jks keystore with the private key, and saving it + server_keystore = jks.KeyStore.new("jks", [server_pke]) + server_keystore.save(server_jks_path, keystore_password) return { "ssl_dir": ssl_dir, - "ssl_certfile": cert_path, - "ssl_keyfile": key_path, - "ssl_cafile": cert_path, + "ssl_certfile": ca_cert_path, + "ssl_keyfile": ca_key_path, + "ssl_cafile": ca_cert_path, "ssl_password": cred_path, - "keystore_location": jks_path, + "keystore_location": ca_jks_path, } @@ -469,7 +504,9 @@ def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self.ssl_config = generate_ssl_certificates() - self.with_volume_mapping(str(self.ssl_config["ssl_dir"]), "/etc/kafka/secrets") + self.with_volume_mapping( + "/Users/w620-admin/workspace/ghga/hexkit/.ssl", "/etc/kafka/secrets" + ) self.with_env("KAFKA_LISTENERS", "SSL://0.0.0.0:9093,BROKER://0.0.0.0:9092") self.with_env( @@ -477,15 +514,15 @@ def __init__(self, *args, **kwargs): ) self.with_env("KAFKA_SECURITY_PROTOCOL", "SSL") - self.with_env("KAFKA_SSL_KEYSTORE_FILENAME", "kafka-server.keystore.jks") + self.with_env("KAFKA_SSL_KEYSTORE_FILENAME", "server.keystore.jks") self.with_env("KAFKA_SSL_KEYSTORE_CREDENTIALS", "password.txt") self.with_env("KAFKA_SSL_KEY_CREDENTIALS", "password.txt") - self.with_env("KAFKA_SSL_TRUSTSTORE_FILENAME", "kafka-server.keystore.jks") + self.with_env("KAFKA_SSL_TRUSTSTORE_FILENAME", "ca.truststore.jks") self.with_env("KAFKA_SSL_TRUSTSTORE_CREDENTIALS", "password.txt") self.with_env("KAFKA_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM", " ") - self.with_env("KAFKA_SSL_CLIENT_AUTH", "required") + self.with_env("KAFKA_SSL_CLIENT_AUTH", "requested") self.with_env("KAFKA_INTER_BROKER_LISTENER_NAME", "SSL") diff --git a/tests/integration/test_akafka.py b/tests/integration/test_akafka.py index c55d8c32..fd1dfaa8 100644 --- a/tests/integration/test_akafka.py +++ b/tests/integration/test_akafka.py @@ -84,10 +84,10 @@ async def test_kafka_event_subscriber(kafka_fixture: KafkaFixture): # noqa: F81 service_instance_id="1", kafka_servers=kafka_fixture.kafka_servers, security_protocol="SSL", - ssl_cafile="/tmp/ssl/kafka-server-cert.crt", - ssl_certfile="/tmp/ssl/kafka-server-cert.crt", - ssl_keyfile="/tmp/ssl/kafka-server-private.key", - ssl_password="password", + ssl_cafile="/workspace/.ssl/ca-cert.crt" + # # ssl_certfile="/workspace/.ssl/kafka-server-cert.crt", + # # ssl_keyfile="/workspace/.ssl/kafka-server-private.key", + # ssl_password="password", ) async with KafkaEventSubscriber.construct(