From f00991a063d88c848f8a79b26f7b46e7ef75fff9 Mon Sep 17 00:00:00 2001
From: PeterN <79838809+peter-quix@users.noreply.github.com>
Date: Thu, 24 Oct 2024 12:29:40 +0100
Subject: [PATCH] Update QuixStreamingClient to use V2 API (#61)
* Update to use V2 and librdkafka endpoint
* Add back Quixstreams.Streaming.Samples
---
builds/csharp/nuget/build_nugets.py | 4 +-
src/Quix.Streams.sln | 9 +-
src/QuixStreams.Kafka/KafkaConsumer.cs | 3 +-
src/QuixStreams.Kafka/KafkaProducer.cs | 3 +-
.../Configuration.cs | 41 +++++++++
src/QuixStreams.Streaming.Samples/Program.cs | 19 +++-
.../QuixStreams.Streaming.Samples.csproj | 27 ++++++
.../QuixStreams.Streaming.Samples.sln | 25 ++++++
.../Configuration/SecurityOptions.cs | 33 +++----
.../KafkaStreamingClient.cs | 9 +-
.../QuixStreamingClient.cs | 86 ++++---------------
.../Configuration/SecurityOptionsBuilder.cs | 32 ++++++-
12 files changed, 191 insertions(+), 100 deletions(-)
create mode 100644 src/QuixStreams.Streaming.Samples/Configuration.cs
create mode 100644 src/QuixStreams.Streaming.Samples/QuixStreams.Streaming.Samples.csproj
create mode 100644 src/QuixStreams.Streaming.Samples/QuixStreams.Streaming.Samples.sln
diff --git a/builds/csharp/nuget/build_nugets.py b/builds/csharp/nuget/build_nugets.py
index 47b1edda..444eec76 100644
--- a/builds/csharp/nuget/build_nugets.py
+++ b/builds/csharp/nuget/build_nugets.py
@@ -7,8 +7,8 @@
from typing import List
version = "0.7.5.0"
-informal_version = "0.7.5.0-dev1"
-nuget_version = "0.7.5.0-dev1"
+informal_version = "0.7.5.0-dev2"
+nuget_version = "0.7.5.0-dev2"
def updatecsproj(projfilepath):
diff --git a/src/Quix.Streams.sln b/src/Quix.Streams.sln
index 1c51ca2e..c457484c 100644
--- a/src/Quix.Streams.sln
+++ b/src/Quix.Streams.sln
@@ -51,6 +51,8 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "QuixStreams.IntegrationTest
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "QuixStreams.Kafka.Transport.Tests", "QuixStreams.Kafka.Transport.Tests\QuixStreams.Kafka.Transport.Tests.csproj", "{AC3B6BC9-822B-4D21-951E-846B5C15EE2B}"
EndProject
+Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "QuixStreams.Streaming.Samples", "QuixStreams.Streaming.Samples\QuixStreams.Streaming.Samples.csproj", "{5AFAD0A1-8E84-4F81-9497-14177DCEAC6D}"
+EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
@@ -59,7 +61,7 @@ Global
GlobalSection(ProjectConfigurationPlatforms) = postSolution
{E317338C-BD9B-455A-82AC-81400D7F1C68}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{E317338C-BD9B-455A-82AC-81400D7F1C68}.Debug|Any CPU.Build.0 = Debug|Any CPU
- {E317338C-BD9B-455A-82AC-81400D7F1C68}.Release|Any CPU.ActiveCfg = Release|Any CPU
+ {E317338C-BD9B-455A-82AC-81400D7F1C68}.Release|Any CPU.ActiveCfg = Release|Any CPU
{E317338C-BD9B-455A-82AC-81400D7F1C68}.Release|Any CPU.Build.0 = Release|Any CPU
{350E3A07-A7F5-4ED0-8328-059CC80F07E9}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{350E3A07-A7F5-4ED0-8328-059CC80F07E9}.Debug|Any CPU.Build.0 = Debug|Any CPU
@@ -144,6 +146,10 @@ Global
{AC3B6BC9-822B-4D21-951E-846B5C15EE2B}.Debug|Any CPU.Build.0 = Debug|Any CPU
{AC3B6BC9-822B-4D21-951E-846B5C15EE2B}.Release|Any CPU.ActiveCfg = Release|Any CPU
{AC3B6BC9-822B-4D21-951E-846B5C15EE2B}.Release|Any CPU.Build.0 = Release|Any CPU
+ {5AFAD0A1-8E84-4F81-9497-14177DCEAC6D}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+ {5AFAD0A1-8E84-4F81-9497-14177DCEAC6D}.Debug|Any CPU.Build.0 = Debug|Any CPU
+ {5AFAD0A1-8E84-4F81-9497-14177DCEAC6D}.Release|Any CPU.ActiveCfg = Release|Any CPU
+ {5AFAD0A1-8E84-4F81-9497-14177DCEAC6D}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
@@ -166,6 +172,7 @@ Global
{4A3B1B05-F64E-4BE7-9C27-53340CD4D6E5} = {F1EDBE48-54DD-4ABE-A6FB-2ECC9C96F260}
{1F88D7A0-5F93-4816-B314-7D26DA3F8FA1} = {F1EDBE48-54DD-4ABE-A6FB-2ECC9C96F260}
{AC3B6BC9-822B-4D21-951E-846B5C15EE2B} = {F1EDBE48-54DD-4ABE-A6FB-2ECC9C96F260}
+ {5AFAD0A1-8E84-4F81-9497-14177DCEAC6D} = {A017302E-839F-4D1C-AD50-5C6AF5F7DE3F}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {C6290472-9588-465F-889C-D5CA8A845449}
diff --git a/src/QuixStreams.Kafka/KafkaConsumer.cs b/src/QuixStreams.Kafka/KafkaConsumer.cs
index 6f5f8de7..cc24dc47 100644
--- a/src/QuixStreams.Kafka/KafkaConsumer.cs
+++ b/src/QuixStreams.Kafka/KafkaConsumer.cs
@@ -157,7 +157,8 @@ string GetConfigId()
foreach (var keyValuePair in this.config)
{
if (keyValuePair.Key?.IndexOf("password", StringComparison.InvariantCultureIgnoreCase) > -1 ||
- keyValuePair.Key?.IndexOf("username", StringComparison.InvariantCultureIgnoreCase) > -1)
+ keyValuePair.Key?.IndexOf("username", StringComparison.InvariantCultureIgnoreCase) > -1 ||
+ keyValuePair.Key?.IndexOf("ssl.ca.pem", StringComparison.InvariantCultureIgnoreCase) > -1)
{
logBuilder.AppendLine($"= {keyValuePair.Key}: [REDACTED]");
}
diff --git a/src/QuixStreams.Kafka/KafkaProducer.cs b/src/QuixStreams.Kafka/KafkaProducer.cs
index 6ac5c8f2..c4307d21 100644
--- a/src/QuixStreams.Kafka/KafkaProducer.cs
+++ b/src/QuixStreams.Kafka/KafkaProducer.cs
@@ -135,7 +135,8 @@ string CreateConfigId(ProducerTopicConfiguration topicConfiguration)
foreach (var keyValuePair in this.config)
{
if (keyValuePair.Key?.IndexOf("password", StringComparison.InvariantCultureIgnoreCase) > -1 ||
- keyValuePair.Key?.IndexOf("username", StringComparison.InvariantCultureIgnoreCase) > -1)
+ keyValuePair.Key?.IndexOf("username", StringComparison.InvariantCultureIgnoreCase) > -1 ||
+ keyValuePair.Key?.IndexOf("ssl.ca.pem", StringComparison.InvariantCultureIgnoreCase) > -1)
{
logBuilder.AppendLine($"= {keyValuePair.Key}: [REDACTED]");
}
diff --git a/src/QuixStreams.Streaming.Samples/Configuration.cs b/src/QuixStreams.Streaming.Samples/Configuration.cs
new file mode 100644
index 00000000..6b74753d
--- /dev/null
+++ b/src/QuixStreams.Streaming.Samples/Configuration.cs
@@ -0,0 +1,41 @@
+using Microsoft.Extensions.Configuration;
+using QuixStreams.Streaming.Configuration;
+
+namespace QuixStreams.Streaming.Samples
+{
+ public class Configuration
+ {
+ public static KafkaConfiguration Config;
+
+
+ public static QuixStreamingClientConfig QuixStreamingClientConfig;
+
+ static Configuration()
+ {
+ var builder = new ConfigurationBuilder();
+ builder.AddJsonFile("appsettings.json", optional: false);
+ var appConfig = builder.Build();
+
+ Config = new KafkaConfiguration();
+ appConfig.Bind("KafkaConfiguration", Config);
+
+ QuixStreamingClientConfig = new QuixStreamingClientConfig();
+ appConfig.Bind("QuixStreamingClientConfig", QuixStreamingClientConfig);
+
+ }
+ }
+
+ public class KafkaConfiguration
+ {
+ public string BrokerList { get; set; }
+ public string Topic { get; set; }
+ public string ConsumerId { get; set; }
+ public SecurityOptions Security{ get; set; }
+ }
+
+ public class QuixStreamingClientConfig
+ {
+ public string PortalApi { get; set; }
+ public string Token { get; set; }
+ }
+}
\ No newline at end of file
diff --git a/src/QuixStreams.Streaming.Samples/Program.cs b/src/QuixStreams.Streaming.Samples/Program.cs
index 90af181d..02864fb5 100644
--- a/src/QuixStreams.Streaming.Samples/Program.cs
+++ b/src/QuixStreams.Streaming.Samples/Program.cs
@@ -91,7 +91,24 @@ private static void ExampleReadWriteUsingQuixStreamingClient(in CancellationToke
var quixStreamClient = new QuixStreamingClient(QuixStreams.Streaming.Samples.Configuration.QuixStreamingClientConfig.Token);
quixStreamClient.ApiUrl = new Uri(QuixStreams.Streaming.Samples.Configuration.QuixStreamingClientConfig.PortalApi);
- var topicProducer = quixStreamClient.GetTopicConsumer("iddqd");
+ using var topicConsumer = quixStreamClient.GetTopicConsumer("test-topic-sdk");
+ using var topicProducer = quixStreamClient.GetTopicProducer("test-topic-sdk");
+
+ var packageReceived = 0;
+ topicConsumer.OnStreamReceived += (sender, consumer) =>
+ {
+ Console.WriteLine("Stream {0} received", consumer.StreamId);
+ consumer.OnPackageReceived += (o, args) =>
+ {
+ packageReceived++;
+ };
+ };
+ topicConsumer.Subscribe();
+ var stream = topicProducer.GetOrCreateStream("test-stream");
+ stream.Timeseries.Buffer.AddTimestamp(DateTime.UtcNow).AddValue("parameter1", "somevalue").Publish();
+ stream.Flush();
+ stream.Close();
+ SpinWait.SpinUntil(() => packageReceived > 0, TimeSpan.FromSeconds(5));
}
}
}
diff --git a/src/QuixStreams.Streaming.Samples/QuixStreams.Streaming.Samples.csproj b/src/QuixStreams.Streaming.Samples/QuixStreams.Streaming.Samples.csproj
new file mode 100644
index 00000000..baf2e8dc
--- /dev/null
+++ b/src/QuixStreams.Streaming.Samples/QuixStreams.Streaming.Samples.csproj
@@ -0,0 +1,27 @@
+
+
+
+ Exe
+ net7.0;netstandard2.1
+ 8.0
+ Debug;Release
+ AnyCPU
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+ PreserveNewest
+
+
+
+
diff --git a/src/QuixStreams.Streaming.Samples/QuixStreams.Streaming.Samples.sln b/src/QuixStreams.Streaming.Samples/QuixStreams.Streaming.Samples.sln
new file mode 100644
index 00000000..82cc6a3b
--- /dev/null
+++ b/src/QuixStreams.Streaming.Samples/QuixStreams.Streaming.Samples.sln
@@ -0,0 +1,25 @@
+
+Microsoft Visual Studio Solution File, Format Version 12.00
+# Visual Studio Version 16
+VisualStudioVersion = 25.0.1706.3
+MinimumVisualStudioVersion = 10.0.40219.1
+Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "QuixStreams.Streaming.Samples", "QuixStreams.Streaming.Samples.csproj", "{FCFCF867-C987-4746-ADD3-EBA8F3363CA0}"
+EndProject
+Global
+ GlobalSection(SolutionConfigurationPlatforms) = preSolution
+ Debug|Any CPU = Debug|Any CPU
+ Release|Any CPU = Release|Any CPU
+ EndGlobalSection
+ GlobalSection(ProjectConfigurationPlatforms) = postSolution
+ {FCFCF867-C987-4746-ADD3-EBA8F3363CA0}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+ {FCFCF867-C987-4746-ADD3-EBA8F3363CA0}.Debug|Any CPU.Build.0 = Debug|Any CPU
+ {FCFCF867-C987-4746-ADD3-EBA8F3363CA0}.Release|Any CPU.ActiveCfg = Release|Any CPU
+ {FCFCF867-C987-4746-ADD3-EBA8F3363CA0}.Release|Any CPU.Build.0 = Release|Any CPU
+ EndGlobalSection
+ GlobalSection(SolutionProperties) = preSolution
+ HideSolutionNode = FALSE
+ EndGlobalSection
+ GlobalSection(ExtensibilityGlobals) = postSolution
+ SolutionGuid = {FC022A17-D1F9-4D53-B133-B31D19F5D99E}
+ EndGlobalSection
+EndGlobal
diff --git a/src/QuixStreams.Streaming/Configuration/SecurityOptions.cs b/src/QuixStreams.Streaming/Configuration/SecurityOptions.cs
index 38c5dd70..609b4992 100644
--- a/src/QuixStreams.Streaming/Configuration/SecurityOptions.cs
+++ b/src/QuixStreams.Streaming/Configuration/SecurityOptions.cs
@@ -1,4 +1,6 @@
-namespace QuixStreams.Streaming.Configuration
+using System;
+
+namespace QuixStreams.Streaming.Configuration
{
///
/// A class representing security options for configuring SSL encryption with SASL authentication in Kafka.
@@ -23,7 +25,15 @@ public class SecurityOptions
///
/// The path to the folder or file containing the certificate authority certificate(s) to validate the ssl connection.
///
+ [Obsolete("Use SslCaContent instead")]
public string SslCertificates { get; set; }
+
+ ///
+ /// The content of the SSL certificate authority to use.
+ /// This is the same as ssl.ca.pem in librdkafka.
+ /// If specified, is ignored
+ ///
+ public string SslCaContent { get; set; }
///
/// Use SSL
@@ -41,26 +51,5 @@ public class SecurityOptions
public SecurityOptions()
{
}
-
- ///
- /// Initializes a new instance of that is configured for SSL encryption with SASL authentication
- ///
- /// The path to the folder or file containing the certificate authority certificate(s) to validate the ssl connection. Example: "./certificates/ca.cert"
- /// The username for the SASL authentication
- /// The password for the SASL authentication
- /// The SASL mechanism to use. Defaulting to ScramSha256
- public SecurityOptions(string sslCertificates, string username, string password, SaslMechanism saslMechanism = Configuration.SaslMechanism.ScramSha256)
- {
- this.SslCertificates = sslCertificates;
- this.Username = username;
- this.Password = password;
- this.SaslMechanism = saslMechanism;
-
- // Assume that if we get sslCertificates it's because we will use ssl
- this.UseSsl = !string.IsNullOrEmpty(this.SslCertificates);
-
- // Assume that if we have username, we will use Sasl
- this.UseSasl = !string.IsNullOrEmpty(this.Username);
- }
}
}
diff --git a/src/QuixStreams.Streaming/KafkaStreamingClient.cs b/src/QuixStreams.Streaming/KafkaStreamingClient.cs
index eabfb944..2fb6ed80 100644
--- a/src/QuixStreams.Streaming/KafkaStreamingClient.cs
+++ b/src/QuixStreams.Streaming/KafkaStreamingClient.cs
@@ -130,7 +130,14 @@ public KafkaStreamingClient(string brokerAddress, SecurityOptions securityOption
if (securityOptions.UseSsl)
{
- securityOptionsBuilder.SetSslEncryption(securityOptions.SslCertificates);
+ if (!string.IsNullOrWhiteSpace(securityOptions.SslCaContent))
+ {
+ securityOptionsBuilder.SetSslCaContent(securityOptions.SslCaContent);
+ }
+ else
+ {
+ securityOptionsBuilder.SetSslEncryption(securityOptions.SslCertificates);
+ }
}
else
{
diff --git a/src/QuixStreams.Streaming/QuixStreamingClient.cs b/src/QuixStreams.Streaming/QuixStreamingClient.cs
index b0298a6c..5306fdf9 100644
--- a/src/QuixStreams.Streaming/QuixStreamingClient.cs
+++ b/src/QuixStreams.Streaming/QuixStreamingClient.cs
@@ -720,13 +720,19 @@ private async Task CreateStreamingClientForWorkspace(Works
}
logger.LogWarning("Workspace {0} is in state {1} instead of {2}.", ws.WorkspaceId, ws.Status, WorkspaceStatus.Ready);
}
-
+
var securityOptions = new SecurityOptions();
if (ws.Broker.SecurityMode == BrokerSecurityMode.Ssl || ws.Broker.SecurityMode == BrokerSecurityMode.SaslSsl)
{
+ var librdKafkaConfig = await GetWorkspaceLibrdKafkaConfig(ws.WorkspaceId);
securityOptions.UseSsl = true;
- securityOptions.SslCertificates = await GetWorkspaceCertificatePath(ws).ConfigureAwait(false);
+ if (librdKafkaConfig.TryGetValue("ssl.ca.cert", out var sslcacert))
+ {
+ byte[] data = Convert.FromBase64String(sslcacert);
+ string decodedString = System.Text.Encoding.UTF8.GetString(data);
+ securityOptions.SslCaContent = decodedString;
+ }
if (!brokerProperties.ContainsKey("ssl.endpoint.identification.algorithm"))
{
brokerProperties["ssl.endpoint.identification.algorithm"] = "none"; // default back to None
@@ -770,74 +776,7 @@ private async Task CreateStreamingClientForWorkspace(Works
var client = new KafkaStreamingClient(ws.Broker.Address, securityOptions, brokerProperties, debug);
return wsToStreamingClientDict.GetOrAdd(ws.WorkspaceId, client);
}
-
- private async Task GetWorkspaceCertificatePath(Workspace ws)
- {
- if (!ws.Broker.HasCertificate) return null;
- var targetFolder = Path.Combine(Directory.GetCurrentDirectory(), "certificates", ws.WorkspaceId);
- var certPath = Path.Combine(targetFolder, "ca.cert");
- if (!File.Exists(certPath))
- {
- var wsLock = workspaceLocks.GetOrAdd(ws.WorkspaceId, new object());
- lock (wsLock)
- {
- if (!File.Exists(certPath))
- {
- async Task HelperFunc()
- {
- Directory.CreateDirectory(targetFolder);
- this.logger.LogTrace("Certificate is not yet downloaded for workspace {0}.", ws.Name);
- var zipPath = Path.Combine(targetFolder, "certs.zip");
- if (!File.Exists(zipPath))
- {
- this.logger.LogTrace("Downloading certificate for workspace {0}.", ws.Name);
- var response = await this.SendRequestToApi(HttpMethod.Get, new Uri(ApiUrl, $"workspaces/{ws.WorkspaceId}/certificates")).ConfigureAwait(false);
- if (response.StatusCode == HttpStatusCode.NoContent)
- {
- ws.Broker.HasCertificate = false;
- return null;
- }
-
- using var fs = File.Open(zipPath, FileMode.Create);
- await response.Content.CopyToAsync(fs).ConfigureAwait(false);
- }
-
- var hasCert = false;
-
- using (var file = File.OpenRead(zipPath))
- using (var zip = new ZipArchive(file, ZipArchiveMode.Read))
- {
- foreach (var entry in zip.Entries)
- {
- if (entry.Name != "ca.cert") continue;
- using var stream = entry.Open();
- using var fs = File.Open(certPath, FileMode.Create);
- await stream.CopyToAsync(fs).ConfigureAwait(false);
- hasCert = true;
- }
- }
- File.Delete(zipPath);
- this.logger.LogTrace("Certificate is now available for workspace {0}", ws.Name);
- if (!hasCert)
- {
- this.logger.LogWarning("Expected to find certificate for workspace {0}, but the downloaded zip had none.", ws.Name);
- return null;
- }
- return certPath;
- }
- return HelperFunc().ConfigureAwait(true).GetAwaiter().GetResult();
- }
- this.logger.LogTrace("Certificate is downloaded by another thread for workspace {0}", ws.Name);
- }
- }
- else
- {
- this.logger.LogTrace("Certificate is already available for workspace {0}", ws.Name);
- }
-
- return certPath;
- }
-
+
private async Task> GetWorkspaces()
{
var result = await GetModelFromApi>("workspaces", true, true).ConfigureAwait(false);
@@ -845,6 +784,12 @@ private async Task> GetWorkspaces()
return result;
}
+ private async Task> GetWorkspaceLibrdKafkaConfig(string workspaceId)
+ {
+ var result = await GetModelFromApi>($"workspaces/{workspaceId}/broker/librdkafka", true, true).ConfigureAwait(false);
+ return result;
+ }
+
private Task> GetTopics(Workspace workspace, bool useCache)
{
return GetModelFromApi>($"{workspace.WorkspaceId}/topics", true, useCache);
@@ -891,6 +836,7 @@ private async Task SendRequestToApi(HttpMethod method, Uri
httpRequest.Content = new StringContent(JsonConvert.SerializeObject(bodyModel), Encoding.UTF8, "application/json");
}
httpRequest.Headers.Authorization = new AuthenticationHeaderValue("Bearer", token);
+ httpRequest.Headers.Add("X-Version", "2.0");
try
{
var response =
diff --git a/src/QuixStreams.Telemetry/Configuration/SecurityOptionsBuilder.cs b/src/QuixStreams.Telemetry/Configuration/SecurityOptionsBuilder.cs
index a4caa835..00e0da65 100644
--- a/src/QuixStreams.Telemetry/Configuration/SecurityOptionsBuilder.cs
+++ b/src/QuixStreams.Telemetry/Configuration/SecurityOptionsBuilder.cs
@@ -39,6 +39,27 @@ public SecurityOptionsBuilder SetSslEncryption(string certificatePath = null)
return this;
}
+
+ ///
+ /// Configures the builder to use SSL encryption with CA content
+ /// This is the same as ssl.ca.pem in librdkafka
+ ///
+ /// The builder
+ public SecurityOptionsBuilder SetSslCaContent(string caContent = null)
+ {
+ encryptionSelected = EncryptionSelected.SSL;
+
+ if (caContent != null)
+ {
+ // validate file existence
+ sslEncryptionConfiguration = new SslEncryptionConfiguration
+ {
+ CaContent = caContent
+ };
+ }
+
+ return this;
+ }
///
/// Configures the builder to use PLAINTEXT (no encryption)
@@ -83,6 +104,8 @@ public SecurityOptionsBuilder SetNoAuthentication()
private class SslEncryptionConfiguration
{
public string CaLocation { get; set; }
+
+ public string CaContent { get; set; }
}
private class SaslConfiguration
@@ -177,7 +200,14 @@ void SetSslEncryptionDetails()
{
if (sslEncryptionConfiguration != null)
{
- kafkaConfiguration["ssl.ca.location"] = sslEncryptionConfiguration.CaLocation;
+ if (!string.IsNullOrWhiteSpace(sslEncryptionConfiguration.CaContent))
+ {
+ kafkaConfiguration["ssl.ca.pem"] = sslEncryptionConfiguration.CaContent;
+ }
+ else
+ {
+ kafkaConfiguration["ssl.ca.location"] = sslEncryptionConfiguration.CaLocation;
+ }
}
}