Skip to content

Commit

Permalink
Add ca, server certs for ssl testing
Browse files Browse the repository at this point in the history
  • Loading branch information
walzph committed Nov 2, 2023
1 parent a7951a0 commit ff8e1d7
Show file tree
Hide file tree
Showing 3 changed files with 80 additions and 40 deletions.
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -137,3 +137,6 @@ dmypy.json
.DS_Store
desktop.ini
thumbs.db

# SSL keystore
.ssl/
109 changes: 73 additions & 36 deletions src/hexkit/providers/akafka/testutils.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import jks
import pytest_asyncio
from aiokafka import AIOKafkaConsumer, TopicPartition
from cryptography import x509
from kafka import KafkaAdminClient
from kafka.errors import KafkaError
from OpenSSL import crypto
Expand Down Expand Up @@ -402,63 +403,97 @@ async def expect_events(
)


def generate_ssl_certificates():
def generate_ssl_certificates(): # noqa: PLR0915
"""Generate ssl keys"""
key = crypto.PKey()
key.generate_key(crypto.TYPE_RSA, 2048)
ca_key = crypto.PKey()
ca_key.generate_key(crypto.TYPE_RSA, 2048)

# generate a self signed certificate
cert = crypto.X509()
cert.get_subject().CN = "my.server.example.com"
cert.set_serial_number(473289472)
cert.gmtime_adj_notBefore(0)
cert.gmtime_adj_notAfter(365 * 24 * 60 * 60)
cert.set_issuer(cert.get_subject())
cert.set_pubkey(key)
cert.sign(key, "sha256")

# dumping the key and cert to ASN1
dumped_cert = crypto.dump_certificate(crypto.FILETYPE_ASN1, cert)
dumped_key = crypto.dump_privatekey(crypto.FILETYPE_ASN1, key)
ca_cert = crypto.X509()
ca_cert.get_subject().CN = "GHGA Test Certificate Authority"
ca_cert.set_serial_number(x509.random_serial_number())
ca_cert.gmtime_adj_notBefore(0)
ca_cert.gmtime_adj_notAfter(365 * 24 * 60 * 60)
ca_cert.set_issuer(ca_cert.get_subject())
ca_cert.set_pubkey(ca_key)
ca_cert.sign(ca_key, "sha256")

server_key = crypto.PKey()
server_key.generate_key(crypto.TYPE_RSA, 2048)

# generate a self signed certificate
server_cert = crypto.X509()
server_cert.get_subject().CN = "Broker 1"
server_cert.set_serial_number(x509.random_serial_number())
server_cert.gmtime_adj_notBefore(0)
server_cert.gmtime_adj_notAfter(365 * 24 * 60 * 60)
server_cert.set_issuer(ca_cert.get_subject())
server_cert.set_pubkey(server_key)
server_cert.sign(ca_key, "sha256")

# dumping the ca key and cert to ASN1
dumped_ca_cert = crypto.dump_certificate(crypto.FILETYPE_ASN1, ca_cert)
dumped_ca_key = crypto.dump_privatekey(crypto.FILETYPE_ASN1, ca_key)
# dumping the server key and cert to ASN1
dumped_server_cert = crypto.dump_certificate(crypto.FILETYPE_ASN1, server_cert)
dumped_server_key = crypto.dump_privatekey(crypto.FILETYPE_ASN1, server_key)

# creating a private key entry
pke = jks.PrivateKeyEntry.new(
"self signed cert", [dumped_cert], dumped_key, "rsa_raw"
ca_pke = jks.PrivateKeyEntry.new(
"self signed cert", [dumped_ca_cert], dumped_ca_key, "rsa_raw"
)

# creating a private key entry
server_pke = jks.PrivateKeyEntry.new(
"self signed cert", [dumped_server_cert], dumped_server_key, "rsa_raw"
)
# if we want the private key entry to have a unique password, we can encrypt it beforehand
# if it is not ecrypted when saved, it will be encrypted with the same password as the keystore
# pke.encrypt('my_private_key_password')
# pke.encrypt("")

# os.mkdir("ssl")
ssl_dir = Path("/tmp/ssl") # noqa: S108
ssl_dir = Path("/workspace/.ssl")
Path(ssl_dir).mkdir(exist_ok=True)
cert_path = os.path.abspath(f"{ssl_dir}/kafka-server-cert.crt")
key_path = os.path.abspath(f"{ssl_dir}/kafka-server-private.key")
jks_path = os.path.abspath(f"{ssl_dir}/kafka-server.keystore.jks")
ca_cert_path = os.path.abspath(f"{ssl_dir}/ca-cert.crt")
ca_key_path = os.path.abspath(f"{ssl_dir}/ca-private.key")
ca_jks_path = os.path.abspath(f"{ssl_dir}/ca.truststore.jks")
server_cert_path = os.path.abspath(f"{ssl_dir}/server-cert.crt")
server_key_path = os.path.abspath(f"{ssl_dir}/server-private.key")
server_jks_path = os.path.abspath(f"{ssl_dir}/server.keystore.jks")
cred_path = os.path.abspath(f"{ssl_dir}/password.txt")
keystore_password = "password" # noqa: S105

with open(cert_path, "w") as f:
f.write(crypto.dump_certificate(crypto.FILETYPE_PEM, cert).decode("utf-8"))
with open(ca_cert_path, "w") as f:
f.write(crypto.dump_certificate(crypto.FILETYPE_PEM, ca_cert).decode("utf-8"))

with open(key_path, "w") as f:
f.write(crypto.dump_privatekey(crypto.FILETYPE_PEM, key).decode("utf-8"))
with open(ca_key_path, "w") as f:
f.write(crypto.dump_privatekey(crypto.FILETYPE_PEM, ca_key).decode("utf-8"))
with open(server_cert_path, "w") as f:
f.write(
crypto.dump_certificate(crypto.FILETYPE_PEM, server_cert).decode("utf-8")
)

with open(server_key_path, "w") as f:
f.write(crypto.dump_privatekey(crypto.FILETYPE_PEM, server_key).decode("utf-8"))

with open(cred_path, "w") as f:
f.write(keystore_password)

# creating a jks keystore with the private key, and saving it
keystore = jks.KeyStore.new("jks", [pke])
keystore.save(jks_path, keystore_password)
ca_keystore = jks.KeyStore.new("jks", [ca_pke])
ca_keystore.save(ca_jks_path, keystore_password)

# creating a jks keystore with the private key, and saving it
server_keystore = jks.KeyStore.new("jks", [server_pke])
server_keystore.save(server_jks_path, keystore_password)

return {
"ssl_dir": ssl_dir,
"ssl_certfile": cert_path,
"ssl_keyfile": key_path,
"ssl_cafile": cert_path,
"ssl_certfile": ca_cert_path,
"ssl_keyfile": ca_key_path,
"ssl_cafile": ca_cert_path,
"ssl_password": cred_path,
"keystore_location": jks_path,
"keystore_location": ca_jks_path,
}


Expand All @@ -469,23 +504,25 @@ def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.ssl_config = generate_ssl_certificates()

self.with_volume_mapping(str(self.ssl_config["ssl_dir"]), "/etc/kafka/secrets")
self.with_volume_mapping(
"/Users/w620-admin/workspace/ghga/hexkit/.ssl", "/etc/kafka/secrets"
)

self.with_env("KAFKA_LISTENERS", "SSL://0.0.0.0:9093,BROKER://0.0.0.0:9092")
self.with_env(
"KAFKA_LISTENER_SECURITY_PROTOCOL_MAP", "SSL:SSL,BROKER:PLAINTEXT"
)
self.with_env("KAFKA_SECURITY_PROTOCOL", "SSL")

self.with_env("KAFKA_SSL_KEYSTORE_FILENAME", "kafka-server.keystore.jks")
self.with_env("KAFKA_SSL_KEYSTORE_FILENAME", "server.keystore.jks")
self.with_env("KAFKA_SSL_KEYSTORE_CREDENTIALS", "password.txt")
self.with_env("KAFKA_SSL_KEY_CREDENTIALS", "password.txt")

self.with_env("KAFKA_SSL_TRUSTSTORE_FILENAME", "kafka-server.keystore.jks")
self.with_env("KAFKA_SSL_TRUSTSTORE_FILENAME", "ca.truststore.jks")
self.with_env("KAFKA_SSL_TRUSTSTORE_CREDENTIALS", "password.txt")

self.with_env("KAFKA_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM", " ")
self.with_env("KAFKA_SSL_CLIENT_AUTH", "required")
self.with_env("KAFKA_SSL_CLIENT_AUTH", "requested")

self.with_env("KAFKA_INTER_BROKER_LISTENER_NAME", "SSL")

Expand Down
8 changes: 4 additions & 4 deletions tests/integration/test_akafka.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,10 +84,10 @@ async def test_kafka_event_subscriber(kafka_fixture: KafkaFixture): # noqa: F81
service_instance_id="1",
kafka_servers=kafka_fixture.kafka_servers,
security_protocol="SSL",
ssl_cafile="/tmp/ssl/kafka-server-cert.crt",
ssl_certfile="/tmp/ssl/kafka-server-cert.crt",
ssl_keyfile="/tmp/ssl/kafka-server-private.key",
ssl_password="password",
ssl_cafile="/workspace/.ssl/ca-cert.crt"
# # ssl_certfile="/workspace/.ssl/kafka-server-cert.crt",
# # ssl_keyfile="/workspace/.ssl/kafka-server-private.key",
# ssl_password="password",
)

async with KafkaEventSubscriber.construct(
Expand Down

0 comments on commit ff8e1d7

Please sign in to comment.