Skip to content

Commit

Permalink
Update QuixStreamingClient to use V2 API (#61)
Browse files Browse the repository at this point in the history
* Update to use V2 and librdkafka endpoint
* Add back Quixstreams.Streaming.Samples
  • Loading branch information
peter-quix authored Oct 24, 2024
1 parent 438cac9 commit f00991a
Show file tree
Hide file tree
Showing 12 changed files with 191 additions and 100 deletions.
4 changes: 2 additions & 2 deletions builds/csharp/nuget/build_nugets.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@
from typing import List

version = "0.7.5.0"
informal_version = "0.7.5.0-dev1"
nuget_version = "0.7.5.0-dev1"
informal_version = "0.7.5.0-dev2"
nuget_version = "0.7.5.0-dev2"


def updatecsproj(projfilepath):
Expand Down
9 changes: 8 additions & 1 deletion src/Quix.Streams.sln
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "QuixStreams.IntegrationTest
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "QuixStreams.Kafka.Transport.Tests", "QuixStreams.Kafka.Transport.Tests\QuixStreams.Kafka.Transport.Tests.csproj", "{AC3B6BC9-822B-4D21-951E-846B5C15EE2B}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "QuixStreams.Streaming.Samples", "QuixStreams.Streaming.Samples\QuixStreams.Streaming.Samples.csproj", "{5AFAD0A1-8E84-4F81-9497-14177DCEAC6D}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Expand All @@ -59,7 +61,7 @@ Global
GlobalSection(ProjectConfigurationPlatforms) = postSolution
{E317338C-BD9B-455A-82AC-81400D7F1C68}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{E317338C-BD9B-455A-82AC-81400D7F1C68}.Debug|Any CPU.Build.0 = Debug|Any CPU
{E317338C-BD9B-455A-82AC-81400D7F1C68}.Release|Any CPU.ActiveCfg = Release|Any CPU
{E317338C-BD9B-455A-82AC-81400D7F1C68}.Release|Any CPU.ActiveCfg = Release|Any CPU
{E317338C-BD9B-455A-82AC-81400D7F1C68}.Release|Any CPU.Build.0 = Release|Any CPU
{350E3A07-A7F5-4ED0-8328-059CC80F07E9}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{350E3A07-A7F5-4ED0-8328-059CC80F07E9}.Debug|Any CPU.Build.0 = Debug|Any CPU
Expand Down Expand Up @@ -144,6 +146,10 @@ Global
{AC3B6BC9-822B-4D21-951E-846B5C15EE2B}.Debug|Any CPU.Build.0 = Debug|Any CPU
{AC3B6BC9-822B-4D21-951E-846B5C15EE2B}.Release|Any CPU.ActiveCfg = Release|Any CPU
{AC3B6BC9-822B-4D21-951E-846B5C15EE2B}.Release|Any CPU.Build.0 = Release|Any CPU
{5AFAD0A1-8E84-4F81-9497-14177DCEAC6D}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{5AFAD0A1-8E84-4F81-9497-14177DCEAC6D}.Debug|Any CPU.Build.0 = Debug|Any CPU
{5AFAD0A1-8E84-4F81-9497-14177DCEAC6D}.Release|Any CPU.ActiveCfg = Release|Any CPU
{5AFAD0A1-8E84-4F81-9497-14177DCEAC6D}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
Expand All @@ -166,6 +172,7 @@ Global
{4A3B1B05-F64E-4BE7-9C27-53340CD4D6E5} = {F1EDBE48-54DD-4ABE-A6FB-2ECC9C96F260}
{1F88D7A0-5F93-4816-B314-7D26DA3F8FA1} = {F1EDBE48-54DD-4ABE-A6FB-2ECC9C96F260}
{AC3B6BC9-822B-4D21-951E-846B5C15EE2B} = {F1EDBE48-54DD-4ABE-A6FB-2ECC9C96F260}
{5AFAD0A1-8E84-4F81-9497-14177DCEAC6D} = {A017302E-839F-4D1C-AD50-5C6AF5F7DE3F}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {C6290472-9588-465F-889C-D5CA8A845449}
Expand Down
3 changes: 2 additions & 1 deletion src/QuixStreams.Kafka/KafkaConsumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,8 @@ string GetConfigId()
foreach (var keyValuePair in this.config)
{
if (keyValuePair.Key?.IndexOf("password", StringComparison.InvariantCultureIgnoreCase) > -1 ||
keyValuePair.Key?.IndexOf("username", StringComparison.InvariantCultureIgnoreCase) > -1)
keyValuePair.Key?.IndexOf("username", StringComparison.InvariantCultureIgnoreCase) > -1 ||
keyValuePair.Key?.IndexOf("ssl.ca.pem", StringComparison.InvariantCultureIgnoreCase) > -1)
{
logBuilder.AppendLine($"= {keyValuePair.Key}: [REDACTED]");
}
Expand Down
3 changes: 2 additions & 1 deletion src/QuixStreams.Kafka/KafkaProducer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,8 @@ string CreateConfigId(ProducerTopicConfiguration topicConfiguration)
foreach (var keyValuePair in this.config)
{
if (keyValuePair.Key?.IndexOf("password", StringComparison.InvariantCultureIgnoreCase) > -1 ||
keyValuePair.Key?.IndexOf("username", StringComparison.InvariantCultureIgnoreCase) > -1)
keyValuePair.Key?.IndexOf("username", StringComparison.InvariantCultureIgnoreCase) > -1 ||
keyValuePair.Key?.IndexOf("ssl.ca.pem", StringComparison.InvariantCultureIgnoreCase) > -1)
{
logBuilder.AppendLine($"= {keyValuePair.Key}: [REDACTED]");
}
Expand Down
41 changes: 41 additions & 0 deletions src/QuixStreams.Streaming.Samples/Configuration.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
using Microsoft.Extensions.Configuration;
using QuixStreams.Streaming.Configuration;

namespace QuixStreams.Streaming.Samples
{
public class Configuration
{
public static KafkaConfiguration Config;


public static QuixStreamingClientConfig QuixStreamingClientConfig;

static Configuration()
{
var builder = new ConfigurationBuilder();
builder.AddJsonFile("appsettings.json", optional: false);
var appConfig = builder.Build();

Config = new KafkaConfiguration();
appConfig.Bind("KafkaConfiguration", Config);

QuixStreamingClientConfig = new QuixStreamingClientConfig();
appConfig.Bind("QuixStreamingClientConfig", QuixStreamingClientConfig);

}
}

public class KafkaConfiguration
{
public string BrokerList { get; set; }
public string Topic { get; set; }
public string ConsumerId { get; set; }
public SecurityOptions Security{ get; set; }
}

public class QuixStreamingClientConfig
{
public string PortalApi { get; set; }
public string Token { get; set; }
}
}
19 changes: 18 additions & 1 deletion src/QuixStreams.Streaming.Samples/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,24 @@ private static void ExampleReadWriteUsingQuixStreamingClient(in CancellationToke
var quixStreamClient = new QuixStreamingClient(QuixStreams.Streaming.Samples.Configuration.QuixStreamingClientConfig.Token);
quixStreamClient.ApiUrl = new Uri(QuixStreams.Streaming.Samples.Configuration.QuixStreamingClientConfig.PortalApi);

var topicProducer = quixStreamClient.GetTopicConsumer("iddqd");
using var topicConsumer = quixStreamClient.GetTopicConsumer("test-topic-sdk");
using var topicProducer = quixStreamClient.GetTopicProducer("test-topic-sdk");

var packageReceived = 0;
topicConsumer.OnStreamReceived += (sender, consumer) =>
{
Console.WriteLine("Stream {0} received", consumer.StreamId);
consumer.OnPackageReceived += (o, args) =>
{
packageReceived++;
};
};
topicConsumer.Subscribe();
var stream = topicProducer.GetOrCreateStream("test-stream");
stream.Timeseries.Buffer.AddTimestamp(DateTime.UtcNow).AddValue("parameter1", "somevalue").Publish();
stream.Flush();
stream.Close();
SpinWait.SpinUntil(() => packageReceived > 0, TimeSpan.FromSeconds(5));
}
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFrameworks>net7.0;netstandard2.1</TargetFrameworks>
<LangVersion>8.0</LangVersion>
<Configurations>Debug;Release</Configurations>
<Platforms>AnyCPU</Platforms>
</PropertyGroup>

<ItemGroup>
<PackageReference Include="Microsoft.Extensions.Configuration.Binder" Version="7.0.0" />
<PackageReference Include="Microsoft.Extensions.Configuration.Json" Version="7.0.0" />
</ItemGroup>


<ItemGroup>
<ProjectReference Include="..\QuixStreams.Streaming\QuixStreams.Streaming.csproj" />
</ItemGroup>

<ItemGroup>
<None Update="appsettings.json">
<CopyToOutputDirectory>PreserveNewest</CopyToOutputDirectory>
</None>
</ItemGroup>

</Project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@

Microsoft Visual Studio Solution File, Format Version 12.00
# Visual Studio Version 16
VisualStudioVersion = 25.0.1706.3
MinimumVisualStudioVersion = 10.0.40219.1
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "QuixStreams.Streaming.Samples", "QuixStreams.Streaming.Samples.csproj", "{FCFCF867-C987-4746-ADD3-EBA8F3363CA0}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Release|Any CPU = Release|Any CPU
EndGlobalSection
GlobalSection(ProjectConfigurationPlatforms) = postSolution
{FCFCF867-C987-4746-ADD3-EBA8F3363CA0}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{FCFCF867-C987-4746-ADD3-EBA8F3363CA0}.Debug|Any CPU.Build.0 = Debug|Any CPU
{FCFCF867-C987-4746-ADD3-EBA8F3363CA0}.Release|Any CPU.ActiveCfg = Release|Any CPU
{FCFCF867-C987-4746-ADD3-EBA8F3363CA0}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {FC022A17-D1F9-4D53-B133-B31D19F5D99E}
EndGlobalSection
EndGlobal
33 changes: 11 additions & 22 deletions src/QuixStreams.Streaming/Configuration/SecurityOptions.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
namespace QuixStreams.Streaming.Configuration
using System;

namespace QuixStreams.Streaming.Configuration
{
/// <summary>
/// A class representing security options for configuring SSL encryption with SASL authentication in Kafka.
Expand All @@ -23,7 +25,15 @@ public class SecurityOptions
/// <summary>
/// The path to the folder or file containing the certificate authority certificate(s) to validate the ssl connection.
/// </summary>
[Obsolete("Use SslCaContent instead")]
public string SslCertificates { get; set; }

/// <summary>
/// The content of the SSL certificate authority to use.
/// This is the same as ssl.ca.pem in librdkafka.
/// If specified, <see cref="SslCertificates"/> is ignored
/// </summary>
public string SslCaContent { get; set; }

/// <summary>
/// Use SSL
Expand All @@ -41,26 +51,5 @@ public class SecurityOptions
public SecurityOptions()
{
}

/// <summary>
/// Initializes a new instance of <see cref="SecurityOptions"/> that is configured for SSL encryption with SASL authentication
/// </summary>
/// <param name="sslCertificates">The path to the folder or file containing the certificate authority certificate(s) to validate the ssl connection. Example: "./certificates/ca.cert"</param>
/// <param name="username">The username for the SASL authentication</param>
/// <param name="password">The password for the SASL authentication</param>
/// <param name="saslMechanism">The SASL mechanism to use. Defaulting to ScramSha256</param>
public SecurityOptions(string sslCertificates, string username, string password, SaslMechanism saslMechanism = Configuration.SaslMechanism.ScramSha256)
{
this.SslCertificates = sslCertificates;
this.Username = username;
this.Password = password;
this.SaslMechanism = saslMechanism;

// Assume that if we get sslCertificates it's because we will use ssl
this.UseSsl = !string.IsNullOrEmpty(this.SslCertificates);

// Assume that if we have username, we will use Sasl
this.UseSasl = !string.IsNullOrEmpty(this.Username);
}
}
}
9 changes: 8 additions & 1 deletion src/QuixStreams.Streaming/KafkaStreamingClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,14 @@ public KafkaStreamingClient(string brokerAddress, SecurityOptions securityOption

if (securityOptions.UseSsl)
{
securityOptionsBuilder.SetSslEncryption(securityOptions.SslCertificates);
if (!string.IsNullOrWhiteSpace(securityOptions.SslCaContent))
{
securityOptionsBuilder.SetSslCaContent(securityOptions.SslCaContent);
}
else
{
securityOptionsBuilder.SetSslEncryption(securityOptions.SslCertificates);
}
}
else
{
Expand Down
86 changes: 16 additions & 70 deletions src/QuixStreams.Streaming/QuixStreamingClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -720,13 +720,19 @@ private async Task<KafkaStreamingClient> CreateStreamingClientForWorkspace(Works
}
logger.LogWarning("Workspace {0} is in state {1} instead of {2}.", ws.WorkspaceId, ws.Status, WorkspaceStatus.Ready);
}

var securityOptions = new SecurityOptions();

if (ws.Broker.SecurityMode == BrokerSecurityMode.Ssl || ws.Broker.SecurityMode == BrokerSecurityMode.SaslSsl)
{
var librdKafkaConfig = await GetWorkspaceLibrdKafkaConfig(ws.WorkspaceId);
securityOptions.UseSsl = true;
securityOptions.SslCertificates = await GetWorkspaceCertificatePath(ws).ConfigureAwait(false);
if (librdKafkaConfig.TryGetValue("ssl.ca.cert", out var sslcacert))
{
byte[] data = Convert.FromBase64String(sslcacert);
string decodedString = System.Text.Encoding.UTF8.GetString(data);
securityOptions.SslCaContent = decodedString;
}
if (!brokerProperties.ContainsKey("ssl.endpoint.identification.algorithm"))
{
brokerProperties["ssl.endpoint.identification.algorithm"] = "none"; // default back to None
Expand Down Expand Up @@ -770,81 +776,20 @@ private async Task<KafkaStreamingClient> CreateStreamingClientForWorkspace(Works
var client = new KafkaStreamingClient(ws.Broker.Address, securityOptions, brokerProperties, debug);
return wsToStreamingClientDict.GetOrAdd(ws.WorkspaceId, client);
}

private async Task<string> GetWorkspaceCertificatePath(Workspace ws)
{
if (!ws.Broker.HasCertificate) return null;
var targetFolder = Path.Combine(Directory.GetCurrentDirectory(), "certificates", ws.WorkspaceId);
var certPath = Path.Combine(targetFolder, "ca.cert");
if (!File.Exists(certPath))
{
var wsLock = workspaceLocks.GetOrAdd(ws.WorkspaceId, new object());
lock (wsLock)
{
if (!File.Exists(certPath))
{
async Task<string> HelperFunc()
{
Directory.CreateDirectory(targetFolder);
this.logger.LogTrace("Certificate is not yet downloaded for workspace {0}.", ws.Name);
var zipPath = Path.Combine(targetFolder, "certs.zip");
if (!File.Exists(zipPath))
{
this.logger.LogTrace("Downloading certificate for workspace {0}.", ws.Name);
var response = await this.SendRequestToApi(HttpMethod.Get, new Uri(ApiUrl, $"workspaces/{ws.WorkspaceId}/certificates")).ConfigureAwait(false);
if (response.StatusCode == HttpStatusCode.NoContent)
{
ws.Broker.HasCertificate = false;
return null;
}

using var fs = File.Open(zipPath, FileMode.Create);
await response.Content.CopyToAsync(fs).ConfigureAwait(false);
}

var hasCert = false;

using (var file = File.OpenRead(zipPath))
using (var zip = new ZipArchive(file, ZipArchiveMode.Read))
{
foreach (var entry in zip.Entries)
{
if (entry.Name != "ca.cert") continue;
using var stream = entry.Open();
using var fs = File.Open(certPath, FileMode.Create);
await stream.CopyToAsync(fs).ConfigureAwait(false);
hasCert = true;
}
}
File.Delete(zipPath);
this.logger.LogTrace("Certificate is now available for workspace {0}", ws.Name);
if (!hasCert)
{
this.logger.LogWarning("Expected to find certificate for workspace {0}, but the downloaded zip had none.", ws.Name);
return null;
}
return certPath;
}
return HelperFunc().ConfigureAwait(true).GetAwaiter().GetResult();
}
this.logger.LogTrace("Certificate is downloaded by another thread for workspace {0}", ws.Name);
}
}
else
{
this.logger.LogTrace("Certificate is already available for workspace {0}", ws.Name);
}

return certPath;
}


private async Task<List<Workspace>> GetWorkspaces()
{
var result = await GetModelFromApi<List<Workspace>>("workspaces", true, true).ConfigureAwait(false);
if (result.Count == 0) throw new InvalidTokenException("Could not find any workspaces for this token.");
return result;
}

private async Task<IDictionary<string, string>> GetWorkspaceLibrdKafkaConfig(string workspaceId)
{
var result = await GetModelFromApi<IDictionary<string, string>>($"workspaces/{workspaceId}/broker/librdkafka", true, true).ConfigureAwait(false);
return result;
}

private Task<List<Topic>> GetTopics(Workspace workspace, bool useCache)
{
return GetModelFromApi<List<Topic>>($"{workspace.WorkspaceId}/topics", true, useCache);
Expand Down Expand Up @@ -891,6 +836,7 @@ private async Task<HttpResponseMessage> SendRequestToApi(HttpMethod method, Uri
httpRequest.Content = new StringContent(JsonConvert.SerializeObject(bodyModel), Encoding.UTF8, "application/json");
}
httpRequest.Headers.Authorization = new AuthenticationHeaderValue("Bearer", token);
httpRequest.Headers.Add("X-Version", "2.0");
try
{
var response =
Expand Down
Loading

0 comments on commit f00991a

Please sign in to comment.