Skip to content

Commit

Permalink
Merge pull request #530 from agustafson/certificate-loading
Browse files Browse the repository at this point in the history
  • Loading branch information
bplommer authored Apr 6, 2021
2 parents 35091b7 + 53cf695 commit 3ec88f5
Show file tree
Hide file tree
Showing 6 changed files with 134 additions and 0 deletions.
32 changes: 32 additions & 0 deletions docs/src/main/mdoc/certificates.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
---
id: security
title: Security & Certificates
---

## Security: certificates, trust stores, and passwords

The `KafkaCredentialStore` can be used to create the necessary trust stores and passwords to access kafka.

The parameters passed in are string representations of the client private key, client certificate
and service certificate. the `properties` field in `KafkaCredentialStore` can then be applied to
any of the `*Settings` classes by using the `withProperties(kafkaCredentialStore.properties)`.

```scala mdoc
import cats.effect._
import fs2.kafka._
import fs2.kafka.security._

def createKafkaProducerUsingPem[F[_]: Sync, K, V](
caCertificate: String,
accessKey: String,
accessCertificate: String
)(implicit keySer: Serializer[F, K], valSer: Serializer[F, V]): ProducerSettings[F, K, V] =
ProducerSettings[F, K, V]
.withCredentials(
KafkaCredentialStore.fromPemStrings(
caCertificate,
accessKey,
accessCertificate
)
)
```
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,9 @@ package fs2.kafka
import cats.effect.{Blocker, Sync}
import cats.Show
import fs2.kafka.internal.converters.collection._
import fs2.kafka.security.KafkaCredentialStore
import org.apache.kafka.clients.admin.{AdminClient, AdminClientConfig}

import scala.concurrent.duration._

/**
Expand Down Expand Up @@ -201,6 +203,12 @@ sealed abstract class AdminClientSettings[F[_]] {
def withCreateAdminClient(
createAdminClient: Map[String, String] => F[AdminClient]
): AdminClientSettings[F]

/**
* Includes the credentials properties from the provided [[KafkaCredentialStore]]
*/
def withCredentials(credentialsStore: KafkaCredentialStore): AdminClientSettings[F] =
withProperties(credentialsStore.properties)
}

object AdminClientSettings {
Expand Down
8 changes: 8 additions & 0 deletions modules/core/src/main/scala/fs2/kafka/ConsumerSettings.scala
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,11 @@ package fs2.kafka
import cats.effect.{Blocker, Sync}
import cats.Show
import fs2.kafka.internal.converters.collection._
import fs2.kafka.security.KafkaCredentialStore
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.requests.OffsetFetchResponse
import org.apache.kafka.common.serialization.ByteArrayDeserializer

import scala.concurrent.duration._

/**
Expand Down Expand Up @@ -397,6 +399,12 @@ sealed abstract class ConsumerSettings[F[_], K, V] {
* instead be set to `2` and not the specified value.
*/
def withMaxPrefetchBatches(maxPrefetchBatches: Int): ConsumerSettings[F, K, V]

/**
* Includes the credentials properties from the provided [[KafkaCredentialStore]]
*/
def withCredentials(credentialsStore: KafkaCredentialStore): ConsumerSettings[F, K, V] =
withProperties(credentialsStore.properties)
}

object ConsumerSettings {
Expand Down
8 changes: 8 additions & 0 deletions modules/core/src/main/scala/fs2/kafka/ProducerSettings.scala
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,10 @@ package fs2.kafka
import cats.effect.{Blocker, Sync}
import cats.Show
import fs2.kafka.internal.converters.collection._
import fs2.kafka.security.KafkaCredentialStore
import org.apache.kafka.clients.producer.ProducerConfig
import org.apache.kafka.common.serialization.ByteArraySerializer

import scala.concurrent.duration._

/**
Expand Down Expand Up @@ -239,6 +241,12 @@ sealed abstract class ProducerSettings[F[_], K, V] {
def withCreateProducer(
createProducer: Map[String, String] => F[KafkaByteProducer]
): ProducerSettings[F, K, V]

/**
* Includes the credentials properties from the provided [[KafkaCredentialStore]]
*/
def withCredentials(credentialsStore: KafkaCredentialStore): ProducerSettings[F, K, V] =
withProperties(credentialsStore.properties)
}

object ProducerSettings {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* Copyright 2018-2021 OVO Energy Limited
*
* SPDX-License-Identifier: Apache-2.0
*/

package fs2.kafka.security

sealed trait KafkaCredentialStore {
def properties: Map[String, String]
}

object KafkaCredentialStore {
final def fromPemStrings(
caCertificate: String,
clientPrivateKey: String,
clientCertificate: String
): KafkaCredentialStore =
new KafkaCredentialStore {
override val properties: Map[String, String] =
Map(
"security.protocol" -> "SSL",
"ssl.truststore.type" -> "PEM",
"ssl.truststore.certificates" -> caCertificate.replace("\n", ""),
"ssl.keystore.type" -> "PEM",
"ssl.keystore.key" -> clientPrivateKey.replace("\n", ""),
"ssl.keystore.certificate.chain" -> clientCertificate.replace("\n", "")
)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package fs2.kafka.security

import fs2.kafka.BaseSpec

final class KafkaCredentialStoreSpec extends BaseSpec {
describe("KafkaCredentialStore") {
describe("fromPemStrigs") {
it("should create a KafkaCredentialStore with the expected properties") {
val caCert =
"""
|-----BEGIN CERTIFICATE-----
|RmFrZSBDQSBjZXJ0aWZpY2F0ZSBGYWtlIENBIGNlcnRpZmljYXRlIEZha2UgQ0EgY2VydGlmaWNh
|dGUgRmFrZSBDQSBjZXJ0aWZpY2F0ZQ==
|-----END CERTIFICATE-----
|""".stripMargin

val privateKey =
"""
|-----BEGIN PRIVATE KEY-----
|RmFrZSBwcml2YXRlIGtleSBGYWtlIHByaXZhdGUga2V5IEZha2UgcHJpdmF0ZSBrZXkgRmFrZSBw
|cml2YXRlIGtleSBGYWtlIHByaXZhdGUga2V5IA==
|-----END PRIVATE KEY-----
|""".stripMargin

val clientCert =
"""
|-----BEGIN CERTIFICATE-----
|RmFrZSBjbGllbnQgY2VydCBGYWtlIGNsaWVudCBjZXJ0IEZha2UgY2xpZW50IGNlcnQgRmFrZSBj
|bGllbnQgY2VydCBGYWtlIGNsaWVudCBjZXJ0IA==
|-----END CERTIFICATE-----
|""".stripMargin

val store = KafkaCredentialStore.fromPemStrings(caCert, privateKey, clientCert)

assert(
store.properties === Map(
"security.protocol" -> "SSL",
"ssl.truststore.type" -> "PEM",
"ssl.truststore.certificates" -> "-----BEGIN CERTIFICATE-----RmFrZSBDQSBjZXJ0aWZpY2F0ZSBGYWtlIENBIGNlcnRpZmljYXRlIEZha2UgQ0EgY2VydGlmaWNhdGUgRmFrZSBDQSBjZXJ0aWZpY2F0ZQ==-----END CERTIFICATE-----",
"ssl.keystore.type" -> "PEM",
"ssl.keystore.key" -> "-----BEGIN PRIVATE KEY-----RmFrZSBwcml2YXRlIGtleSBGYWtlIHByaXZhdGUga2V5IEZha2UgcHJpdmF0ZSBrZXkgRmFrZSBwcml2YXRlIGtleSBGYWtlIHByaXZhdGUga2V5IA==-----END PRIVATE KEY-----",
"ssl.keystore.certificate.chain" -> "-----BEGIN CERTIFICATE-----RmFrZSBjbGllbnQgY2VydCBGYWtlIGNsaWVudCBjZXJ0IEZha2UgY2xpZW50IGNlcnQgRmFrZSBjbGllbnQgY2VydCBGYWtlIGNsaWVudCBjZXJ0IA==-----END CERTIFICATE-----"
)
)
}
}
}
}

0 comments on commit 3ec88f5

Please sign in to comment.