From 1a80606c0537af2e315ab85c5894a9e5d713fa6f Mon Sep 17 00:00:00 2001 From: mtmk Date: Fri, 2 Aug 2024 22:40:04 +0100 Subject: [PATCH] Add NATS client implementation (#589) * Add NATS client implementation Added the initial implementation of the NATS client, including publish, subscribe, and request/reply functionalities. NATS.Client package is aimed at basic applications and samples or POCs, where especially serialization setup and other params are set to a sensible defaults. * dotnet format --- NATS.Client.sln | 21 ++ sandbox/Example.Client/Example.Client.csproj | 14 ++ sandbox/Example.Client/Program.cs | 98 +++++++++ sandbox/Example.NativeAot/Program.cs | 2 + src/NATS.Client.Core/INatsClient.cs | 118 ++++++++++ src/NATS.Client.Core/INatsConnection.cs | 111 +--------- src/NATS.Client.Core/INatsSerialize.cs | 63 +++++- src/NATS.Client.Core/NATS.Client.Core.csproj | 2 +- src/NATS.Client.Core/NatsConnection.cs | 3 + .../NatsJsonSerializer.cs | 22 +- src/NATS.Client/NATS.Client.csproj | 25 +++ src/NATS.Client/NatsClient.cs | 65 ++++++ .../NatsClientDefaultSerializer.cs | 26 +++ .../NatsClientDefaultSerializerRegistry.cs | 24 +++ .../NATS.Client.Core.Tests/SerializerTest.cs | 4 + .../CustomSerializerTest.cs | 2 + .../NatsJsContextFactoryTest.cs | 2 + tests/NATS.Client.Tests/ClientTest.cs | 203 ++++++++++++++++++ .../NATS.Client.Tests.csproj | 41 ++++ .../SerializationPage.cs | 2 + 20 files changed, 731 insertions(+), 117 deletions(-) create mode 100644 sandbox/Example.Client/Example.Client.csproj create mode 100644 sandbox/Example.Client/Program.cs create mode 100644 src/NATS.Client.Core/INatsClient.cs create mode 100644 src/NATS.Client/NATS.Client.csproj create mode 100644 src/NATS.Client/NatsClient.cs create mode 100644 src/NATS.Client/NatsClientDefaultSerializer.cs create mode 100644 src/NATS.Client/NatsClientDefaultSerializerRegistry.cs create mode 100644 tests/NATS.Client.Tests/ClientTest.cs create mode 100644 tests/NATS.Client.Tests/NATS.Client.Tests.csproj diff --git a/NATS.Client.sln b/NATS.Client.sln index 5779d3931..d36b69e4b 100644 --- a/NATS.Client.sln +++ b/NATS.Client.sln @@ -109,6 +109,12 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "NATS.Net.OpenTelemetry.Test EndProject Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "NATS.Client.Platform.Windows.Tests", "tests\NATS.Client.Platform.Windows.Tests\NATS.Client.Platform.Windows.Tests.csproj", "{A37994CC-A23A-415E-8B61-9468C7178A55}" EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "NATS.Client", "src\NATS.Client\NATS.Client.csproj", "{48F1F736-3D87-4453-B497-BD9C203B2385}" +EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Example.Client", "sandbox\Example.Client\Example.Client.csproj", "{A15CCDD5-B707-4142-B99A-64F0AB62318A}" +EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "NATS.Client.Tests", "tests\NATS.Client.Tests\NATS.Client.Tests.csproj", "{6DAAAA87-8DDF-4E60-81CE-D8900327DE33}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -291,6 +297,18 @@ Global {A37994CC-A23A-415E-8B61-9468C7178A55}.Debug|Any CPU.Build.0 = Debug|Any CPU {A37994CC-A23A-415E-8B61-9468C7178A55}.Release|Any CPU.ActiveCfg = Release|Any CPU {A37994CC-A23A-415E-8B61-9468C7178A55}.Release|Any CPU.Build.0 = Release|Any CPU + {48F1F736-3D87-4453-B497-BD9C203B2385}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {48F1F736-3D87-4453-B497-BD9C203B2385}.Debug|Any CPU.Build.0 = Debug|Any CPU + {48F1F736-3D87-4453-B497-BD9C203B2385}.Release|Any CPU.ActiveCfg = Release|Any CPU + {48F1F736-3D87-4453-B497-BD9C203B2385}.Release|Any CPU.Build.0 = Release|Any CPU + {A15CCDD5-B707-4142-B99A-64F0AB62318A}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {A15CCDD5-B707-4142-B99A-64F0AB62318A}.Debug|Any CPU.Build.0 = Debug|Any CPU + {A15CCDD5-B707-4142-B99A-64F0AB62318A}.Release|Any CPU.ActiveCfg = Release|Any CPU + {A15CCDD5-B707-4142-B99A-64F0AB62318A}.Release|Any CPU.Build.0 = Release|Any CPU + {6DAAAA87-8DDF-4E60-81CE-D8900327DE33}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {6DAAAA87-8DDF-4E60-81CE-D8900327DE33}.Debug|Any CPU.Build.0 = Debug|Any CPU + {6DAAAA87-8DDF-4E60-81CE-D8900327DE33}.Release|Any CPU.ActiveCfg = Release|Any CPU + {6DAAAA87-8DDF-4E60-81CE-D8900327DE33}.Release|Any CPU.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE @@ -341,6 +359,9 @@ Global {474BA453-9CFF-41C2-B2E7-ADD92CC93E86} = {95A69671-16CA-4133-981C-CC381B7AAA30} {B8554582-DE19-41A2-9784-9B27C9F22429} = {C526E8AB-739A-48D7-8FC4-048978C9B650} {A37994CC-A23A-415E-8B61-9468C7178A55} = {C526E8AB-739A-48D7-8FC4-048978C9B650} + {48F1F736-3D87-4453-B497-BD9C203B2385} = {4827B3EC-73D8-436D-AE2A-5E29AC95FD0C} + {A15CCDD5-B707-4142-B99A-64F0AB62318A} = {95A69671-16CA-4133-981C-CC381B7AAA30} + {6DAAAA87-8DDF-4E60-81CE-D8900327DE33} = {C526E8AB-739A-48D7-8FC4-048978C9B650} EndGlobalSection GlobalSection(ExtensibilityGlobals) = postSolution SolutionGuid = {8CBB7278-D093-448E-B3DE-B5991209A1AA} diff --git a/sandbox/Example.Client/Example.Client.csproj b/sandbox/Example.Client/Example.Client.csproj new file mode 100644 index 000000000..607b57d7b --- /dev/null +++ b/sandbox/Example.Client/Example.Client.csproj @@ -0,0 +1,14 @@ + + + + Exe + net8.0 + enable + enable + + + + + + + diff --git a/sandbox/Example.Client/Program.cs b/sandbox/Example.Client/Program.cs new file mode 100644 index 000000000..835438fa9 --- /dev/null +++ b/sandbox/Example.Client/Program.cs @@ -0,0 +1,98 @@ +// See https://aka.ms/new-console-template for more information + +using System.Text; +using NATS.Client; + +CancellationTokenSource cts = new(); + +await using var client = new NatsClient(); + +// Subscribe for int, string, bytes, json +List tasks = +[ + Task.Run(async () => + { + await foreach (var msg in client.SubscribeAsync("x.int", cancellationToken: cts.Token)) + { + Console.WriteLine($"Received int: {msg.Data}"); + } + }), + + Task.Run(async () => + { + await foreach (var msg in client.SubscribeAsync("x.string", cancellationToken: cts.Token)) + { + Console.WriteLine($"Received string: {msg.Data}"); + } + }), + + Task.Run(async () => + { + await foreach (var msg in client.SubscribeAsync("x.bytes", cancellationToken: cts.Token)) + { + if (msg.Data != null) + { + Console.WriteLine($"Received bytes: {Encoding.UTF8.GetString(msg.Data)}"); + } + } + }), + + Task.Run(async () => + { + await foreach (var msg in client.SubscribeAsync("x.json", cancellationToken: cts.Token)) + { + Console.WriteLine($"Received data: {msg.Data}"); + } + }), + + Task.Run(async () => + { + await foreach (var msg in client.SubscribeAsync("x.service", cancellationToken: cts.Token)) + { + if (msg.Data != null) + { + Console.WriteLine($"Replying to data: {msg.Data}"); + await msg.ReplyAsync($"Thank you {msg.Data.Name} your Id is {msg.Data.Id}!"); + } + } + }), + + Task.Run(async () => + { + var id = 0; + await foreach (var msg in client.SubscribeAsync("x.service2", cancellationToken: cts.Token)) + { + await msg.ReplyAsync(new MyData(id++, $"foo{id}")); + } + }) +]; + +await Task.Delay(1000); + +await client.PublishAsync("x.int", 100); +await client.PublishAsync("x.string", "Hello, World!"); +await client.PublishAsync("x.bytes", new byte[] { 65, 66, 67 }); +await client.PublishAsync("x.json", new MyData(30, "bar")); + +// Request/Reply +{ + var response = await client.RequestAsync("x.service", new MyData(100, "foo")); + Console.WriteLine($"Response: {response.Data}"); +} + +// Request/Reply without request data +for (var i = 0; i < 3; i++) +{ + var response = await client.RequestAsync("x.service2"); + Console.WriteLine($"Response[{i}]: {response.Data}"); +} + +// Use JetStream by referencing NATS.Client.JetStream package +// var js = client.GetJetStream(); +await cts.CancelAsync(); + +await Task.WhenAll(tasks); + +Console.WriteLine("Bye!"); + +public record MyData(int Id, string Name); diff --git a/sandbox/Example.NativeAot/Program.cs b/sandbox/Example.NativeAot/Program.cs index 6eb00b803..0bb978429 100644 --- a/sandbox/Example.NativeAot/Program.cs +++ b/sandbox/Example.NativeAot/Program.cs @@ -222,6 +222,8 @@ public void Serialize(IBufferWriter bufferWriter, T value) throw new NatsException($"Can't deserialize {typeof(T)}"); } + + public INatsSerializer CombineWith(INatsSerializer next) => throw new NotImplementedException(); } public record MyData diff --git a/src/NATS.Client.Core/INatsClient.cs b/src/NATS.Client.Core/INatsClient.cs new file mode 100644 index 000000000..ac1f1dacb --- /dev/null +++ b/src/NATS.Client.Core/INatsClient.cs @@ -0,0 +1,118 @@ +namespace NATS.Client.Core; + +public interface INatsClient : IAsyncDisposable +{ + /// + /// Represents a connection to the NATS server. + /// + INatsConnection Connection { get; } + + /// + /// Connect socket and write CONNECT command to nats server. + /// + ValueTask ConnectAsync(); + + /// + /// Send PING command and await PONG. Return value is similar as Round Trip Time (RTT). + /// + /// A used to cancel the command. + /// A that represents the asynchronous round trip operation. + ValueTask PingAsync(CancellationToken cancellationToken = default); + + /// + /// Publishes a serializable message payload to the given subject name, optionally supplying a reply subject. + /// + /// The destination subject to publish to. + /// Serializable data object. + /// Optional message headers. + /// Optional reply-to subject. + /// Serializer to use for the message type. + /// A for publishing options. + /// A used to cancel the command. + /// Specifies the type of data that may be sent to the NATS Server. + /// A that represents the asynchronous send operation. + ValueTask PublishAsync(string subject, T data, NatsHeaders? headers = default, string? replyTo = default, INatsSerialize? serializer = default, NatsPubOpts? opts = default, CancellationToken cancellationToken = default); + + /// + /// Publishes an empty message payload to the given subject name, optionally supplying a reply subject. + /// + /// The destination subject to publish to. + /// Optional message headers. + /// Optional reply-to subject. + /// A for publishing options. + /// A used to cancel the command. + /// A that represents the asynchronous send operation. + /// + /// Publishing a sentinel usually means a signal to the given subject which could be used to trigger an action + /// or indicate an event for example and of messages. + /// + ValueTask PublishAsync(string subject, NatsHeaders? headers = default, string? replyTo = default, NatsPubOpts? opts = default, CancellationToken cancellationToken = default); + + /// + /// Initiates a subscription to a subject, optionally joining a distributed queue group. + /// + /// The subject name to subscribe to. + /// If specified, the subscriber will join this queue group. + /// Serializer to use for the message type. + /// A for subscription options. + /// A used to cancel the command. + /// Specifies the type of data that may be received from the NATS Server. + /// An asynchronous enumerable of objects + /// + /// Subscribers with the same queue group name, become a queue group, + /// and only one randomly chosen subscriber of the queue group will + /// consume a message each time a message is received by the queue group. + /// + IAsyncEnumerable> SubscribeAsync(string subject, string? queueGroup = default, INatsDeserialize? serializer = default, NatsSubOpts? opts = default, CancellationToken cancellationToken = default); + + /// + /// Request and receive a single reply from a responder. + /// + /// Subject of the responder + /// Data to send to responder + /// Optional message headers + /// Serializer to use for the request message type. + /// Serializer to use for the reply message type. + /// Request publish options + /// Reply handler subscription options + /// Cancel this request + /// Request type + /// Reply type + /// Returns the received from the responder as reply. + /// Raised when cancellation token is used + /// + /// Response can be (null) or one . + /// Reply option's max messages will be set to 1. + /// If reply option's timeout is not defined, then it will be set to NatsOpts.RequestTimeout. + /// + ValueTask> RequestAsync( + string subject, + TRequest? data, + NatsHeaders? headers = default, + INatsSerialize? requestSerializer = default, + INatsDeserialize? replySerializer = default, + NatsPubOpts? requestOpts = default, + NatsSubOpts? replyOpts = default, + CancellationToken cancellationToken = default); + + /// + /// Send an empty request message and await the reply message asynchronously. + /// + /// Subject of the responder + /// Serializer to use for the reply message type. + /// Reply handler subscription options + /// Cancel this request + /// Reply type + /// Returns the received from the responder as reply. + /// Raised when cancellation token is used + /// + /// Response can be (null) or one . + /// Reply option's max messages will be set to 1. + /// If reply option's timeout is not defined, then it will be set to NatsOpts.RequestTimeout. + /// + ValueTask> RequestAsync( + string subject, + INatsDeserialize? replySerializer = default, + NatsSubOpts? replyOpts = default, + CancellationToken cancellationToken = default); +} diff --git a/src/NATS.Client.Core/INatsConnection.cs b/src/NATS.Client.Core/INatsConnection.cs index f83a49580..c54c9454a 100644 --- a/src/NATS.Client.Core/INatsConnection.cs +++ b/src/NATS.Client.Core/INatsConnection.cs @@ -2,7 +2,7 @@ namespace NATS.Client.Core; -public interface INatsConnection : IAsyncDisposable +public interface INatsConnection : INatsClient { event AsyncEventHandler? ConnectionDisconnected; @@ -18,42 +18,6 @@ public interface INatsConnection : IAsyncDisposable NatsConnectionState ConnectionState { get; } - /// - /// Send PING command and await PONG. Return value is similar as Round Trip Time (RTT). - /// - /// A used to cancel the command. - /// A that represents the asynchronous round trip operation. - ValueTask PingAsync(CancellationToken cancellationToken = default); - - /// - /// Publishes a serializable message payload to the given subject name, optionally supplying a reply subject. - /// - /// The destination subject to publish to. - /// Serializable data object. - /// Optional message headers. - /// Optional reply-to subject. - /// Serializer to use for the message type. - /// A for publishing options. - /// A used to cancel the command. - /// Specifies the type of data that may be sent to the NATS Server. - /// A that represents the asynchronous send operation. - ValueTask PublishAsync(string subject, T data, NatsHeaders? headers = default, string? replyTo = default, INatsSerialize? serializer = default, NatsPubOpts? opts = default, CancellationToken cancellationToken = default); - - /// - /// Publishes an empty message payload to the given subject name, optionally supplying a reply subject. - /// - /// The destination subject to publish to. - /// Optional message headers. - /// Optional reply-to subject. - /// A for publishing options. - /// A used to cancel the command. - /// A that represents the asynchronous send operation. - /// - /// Publishing a sentinel usually means a signal to the given subject which could be used to trigger an action - /// or indicate an event for example and of messages. - /// - ValueTask PublishAsync(string subject, NatsHeaders? headers = default, string? replyTo = default, NatsPubOpts? opts = default, CancellationToken cancellationToken = default); - /// /// Publishes a serializable message payload to the given subject name, optionally supplying a reply subject. /// @@ -65,23 +29,6 @@ public interface INatsConnection : IAsyncDisposable /// A that represents the asynchronous send operation. ValueTask PublishAsync(in NatsMsg msg, INatsSerialize? serializer = default, NatsPubOpts? opts = default, CancellationToken cancellationToken = default); - /// - /// Initiates a subscription to a subject, optionally joining a distributed queue group. - /// - /// The subject name to subscribe to. - /// If specified, the subscriber will join this queue group. - /// Serializer to use for the message type. - /// A for subscription options. - /// A used to cancel the command. - /// Specifies the type of data that may be received from the NATS Server. - /// An asynchronous enumerable of objects - /// - /// Subscribers with the same queue group name, become a queue group, - /// and only one randomly chosen subscriber of the queue group will - /// consume a message each time a message is received by the queue group. - /// - IAsyncEnumerable> SubscribeAsync(string subject, string? queueGroup = default, INatsDeserialize? serializer = default, NatsSubOpts? opts = default, CancellationToken cancellationToken = default); - /// /// Initiates a subscription to a subject, optionally joining a distributed queue group /// and returns a object which provides more control over the subscription. @@ -113,57 +60,6 @@ public interface INatsConnection : IAsyncDisposable /// A containing a unique inbox subject. string NewInbox(); - /// - /// Request and receive a single reply from a responder. - /// - /// Subject of the responder - /// Data to send to responder - /// Optional message headers - /// Serializer to use for the request message type. - /// Serializer to use for the reply message type. - /// Request publish options - /// Reply handler subscription options - /// Cancel this request - /// Request type - /// Reply type - /// Returns the received from the responder as reply. - /// Raised when cancellation token is used - /// - /// Response can be (null) or one . - /// Reply option's max messages will be set to 1. - /// If reply option's timeout is not defined, then it will be set to NatsOpts.RequestTimeout. - /// - ValueTask> RequestAsync( - string subject, - TRequest? data, - NatsHeaders? headers = default, - INatsSerialize? requestSerializer = default, - INatsDeserialize? replySerializer = default, - NatsPubOpts? requestOpts = default, - NatsSubOpts? replyOpts = default, - CancellationToken cancellationToken = default); - - /// - /// Send an empty request message and await the reply message asynchronously. - /// - /// Subject of the responder - /// Serializer to use for the reply message type. - /// Reply handler subscription options - /// Cancel this request - /// Reply type - /// Returns the received from the responder as reply. - /// Raised when cancellation token is used - /// - /// Response can be (null) or one . - /// Reply option's max messages will be set to 1. - /// If reply option's timeout is not defined, then it will be set to NatsOpts.RequestTimeout. - /// - ValueTask> RequestAsync( - string subject, - INatsDeserialize? replySerializer = default, - NatsSubOpts? replyOpts = default, - CancellationToken cancellationToken = default); - /// /// Request and receive zero or more replies from a responder. /// @@ -191,9 +87,4 @@ IAsyncEnumerable> RequestManyAsync( NatsPubOpts? requestOpts = default, NatsSubOpts? replyOpts = default, CancellationToken cancellationToken = default); - - /// - /// Connect socket and write CONNECT command to nats server. - /// - ValueTask ConnectAsync(); } diff --git a/src/NATS.Client.Core/INatsSerialize.cs b/src/NATS.Client.Core/INatsSerialize.cs index ddb055532..82f815a9e 100644 --- a/src/NATS.Client.Core/INatsSerialize.cs +++ b/src/NATS.Client.Core/INatsSerialize.cs @@ -14,6 +14,12 @@ namespace NATS.Client.Core; /// Serialized object type public interface INatsSerializer : INatsSerialize, INatsDeserialize { + /// + /// Combines the current serializer with the specified serializer. + /// + /// The serializer to be combined with. + /// The combined serializer. + INatsSerializer CombineWith(INatsSerializer next); } /// @@ -82,7 +88,7 @@ public class NatsDefaultSerializerRegistry : INatsSerializerRegistry /// public class NatsUtf8PrimitivesSerializer : INatsSerializer { - public static readonly NatsUtf8PrimitivesSerializer Default = new(default); + public static readonly NatsUtf8PrimitivesSerializer Default = new(); private readonly INatsSerializer? _next; @@ -90,7 +96,10 @@ public class NatsUtf8PrimitivesSerializer : INatsSerializer /// Creates a new instance of . /// /// The next serializer in chain. - public NatsUtf8PrimitivesSerializer(INatsSerializer? next) => _next = next; + public NatsUtf8PrimitivesSerializer(INatsSerializer? next = default) => _next = next; + + /// + public INatsSerializer CombineWith(INatsSerializer? next) => new NatsUtf8PrimitivesSerializer(next); /// public void Serialize(IBufferWriter bufferWriter, T value) @@ -595,7 +604,10 @@ public class NatsRawSerializer : INatsSerializer /// Creates a new instance of . /// /// Next serializer in chain. - public NatsRawSerializer(INatsSerializer? next) => _next = next; + public NatsRawSerializer(INatsSerializer? next = default) => _next = next; + + /// + public INatsSerializer CombineWith(INatsSerializer? next) => new NatsRawSerializer(next); /// public void Serialize(IBufferWriter bufferWriter, T value) @@ -723,8 +735,10 @@ public sealed class NatsJsonContextSerializerRegistry : INatsSerializerRegistry /// public sealed class NatsJsonContextSerializer : INatsSerializer { + // ReSharper disable once StaticMemberInGenericType private static readonly JsonWriterOptions JsonWriterOpts = new() { Indented = false, SkipValidation = true }; + // ReSharper disable once StaticMemberInGenericType [ThreadStatic] private static Utf8JsonWriter? _jsonWriter; @@ -747,6 +761,9 @@ public NatsJsonContextSerializer(JsonSerializerContext context, INatsSerializer< { } + /// + public INatsSerializer CombineWith(INatsSerializer next) => new NatsJsonContextSerializer(_contexts, next); + /// public void Serialize(IBufferWriter bufferWriter, T value) { @@ -804,6 +821,46 @@ public void Serialize(IBufferWriter bufferWriter, T value) } } +/// +/// Represents a builder for creating a chain of serializers for NATS messages. +/// +/// Serialized object type +public class NatsSerializerBuilder +{ + private readonly List> _serializers = new(); + + /// + /// Adds a serializer to the chain of serializers for NATS messages. + /// + /// The serializer to be added. + /// The updated instance of the serializer builder. + public NatsSerializerBuilder Add(INatsSerializer serializer) + { + _serializers.Add(serializer); + return this; + } + + /// + /// Builds a chain of serializers for NATS messages based on the added serializers. + /// Serializers are combined in the reverse order they were added. + /// + /// The combined serializer chain. + public INatsSerializer Build() + { + if (_serializers.Count == 0) + { + return NatsDefaultSerializer.Default; + } + + for (var i = _serializers.Count - 1; i > 0; i--) + { + _serializers[i - 1] = _serializers[i - 1].CombineWith(_serializers[i]); + } + + return _serializers[0]; + } +} + internal sealed class NullBufferWriter : IBufferWriter { internal static readonly IBufferWriter Instance = new NullBufferWriter(); diff --git a/src/NATS.Client.Core/NATS.Client.Core.csproj b/src/NATS.Client.Core/NATS.Client.Core.csproj index 52a723dc7..8244f952c 100644 --- a/src/NATS.Client.Core/NATS.Client.Core.csproj +++ b/src/NATS.Client.Core/NATS.Client.Core.csproj @@ -8,7 +8,7 @@ pubsub;messaging - NATS client for .NET. + NATS core client for .NET true diff --git a/src/NATS.Client.Core/NatsConnection.cs b/src/NATS.Client.Core/NatsConnection.cs index 7993cc6e8..fbdd77b62 100644 --- a/src/NATS.Client.Core/NatsConnection.cs +++ b/src/NATS.Client.Core/NatsConnection.cs @@ -1,5 +1,6 @@ using System.Buffers; using System.Diagnostics; +using System.Runtime.CompilerServices; using System.Threading.Channels; using Microsoft.Extensions.Logging; using NATS.Client.Core.Commands; @@ -112,6 +113,8 @@ public NatsConnection(NatsOpts opts) public event AsyncEventHandler? MessageDropped; + public INatsConnection Connection => this; + public NatsOpts Opts { get; } public NatsConnectionState ConnectionState diff --git a/src/NATS.Client.Serializers.Json/NatsJsonSerializer.cs b/src/NATS.Client.Serializers.Json/NatsJsonSerializer.cs index 9aa6b0b8d..2c4c56fbd 100644 --- a/src/NATS.Client.Serializers.Json/NatsJsonSerializer.cs +++ b/src/NATS.Client.Serializers.Json/NatsJsonSerializer.cs @@ -11,15 +11,28 @@ namespace NATS.Client.Serializers.Json; /// /// This serializer is not suitable for native AOT deployments since it might rely on reflection /// -public sealed class NatsJsonSerializer : INatsSerialize, INatsDeserialize +public sealed class NatsJsonSerializer : INatsSerializer { + // ReSharper disable once StaticMemberInGenericType private static readonly JsonWriterOptions JsonWriterOpts = new() { Indented = false, SkipValidation = true, }; + // ReSharper disable once StaticMemberInGenericType [ThreadStatic] private static Utf8JsonWriter? _jsonWriter; private readonly JsonSerializerOptions _opts; + /// + /// Reflection-based JSON serializer for NATS. + /// + /// + /// This serializer is not suitable for native AOT deployments since it might rely on reflection + /// + public NatsJsonSerializer() + : this(new JsonSerializerOptions { DefaultIgnoreCondition = JsonIgnoreCondition.WhenWritingNull }) + { + } + /// /// Creates a new instance of with the specified options. /// @@ -29,9 +42,12 @@ public sealed class NatsJsonSerializer : INatsSerialize, INatsDeserialize< /// /// Default instance of with option set to ignore null values when writing. /// - public static NatsJsonSerializer Default { get; } = new(new JsonSerializerOptions { DefaultIgnoreCondition = JsonIgnoreCondition.WhenWritingNull }); + public static NatsJsonSerializer Default { get; } = new(); + + /// + public INatsSerializer CombineWith(INatsSerializer next) => throw new NotSupportedException(); - /// flush + /// public void Serialize(IBufferWriter bufferWriter, T? value) { Utf8JsonWriter writer; diff --git a/src/NATS.Client/NATS.Client.csproj b/src/NATS.Client/NATS.Client.csproj new file mode 100644 index 000000000..834a4f053 --- /dev/null +++ b/src/NATS.Client/NATS.Client.csproj @@ -0,0 +1,25 @@ + + + + netstandard2.0;netstandard2.1;net6.0;net8.0 + enable + enable + true + + + pubsub;messaging + NATS client for .NET + + + false + + + + + + + + + diff --git a/src/NATS.Client/NatsClient.cs b/src/NATS.Client/NatsClient.cs new file mode 100644 index 000000000..0db53d9f5 --- /dev/null +++ b/src/NATS.Client/NatsClient.cs @@ -0,0 +1,65 @@ +using System.Threading.Channels; +using NATS.Client.Core; + +namespace NATS.Client; + +/// +/// Represents a NATS client that provides methods for interacting with NATS server. +/// +public class NatsClient : INatsClient +{ + /// + /// Initializes a new instance of the class. + /// + /// NATS server URL + /// Client name + /// Credentials filepath + public NatsClient( + string url = "nats://localhost:4222", + string name = "NATS .NET Client", + string? credsFile = default) + { + var opts = new NatsOpts + { + Name = name, + Url = url, + SerializerRegistry = NatsClientDefaultSerializerRegistry.Default, + SubPendingChannelFullMode = BoundedChannelFullMode.Wait, + AuthOpts = new NatsAuthOpts { CredsFile = credsFile }, + }; + + Connection = new NatsConnection(opts); + } + + /// + public INatsConnection Connection { get; } + + /// + public ValueTask ConnectAsync() => Connection.ConnectAsync(); + + /// + public ValueTask PingAsync(CancellationToken cancellationToken = default) => Connection.PingAsync(cancellationToken); + + /// + public ValueTask PublishAsync(string subject, T data, NatsHeaders? headers = default, string? replyTo = default, INatsSerialize? serializer = default, NatsPubOpts? opts = default, CancellationToken cancellationToken = default) + => Connection.PublishAsync(subject, data, headers, replyTo, serializer, opts, cancellationToken); + + /// + public ValueTask PublishAsync(string subject, NatsHeaders? headers = default, string? replyTo = default, NatsPubOpts? opts = default, CancellationToken cancellationToken = default) + => Connection.PublishAsync(subject, headers, replyTo, opts, cancellationToken); + + /// + public IAsyncEnumerable> SubscribeAsync(string subject, string? queueGroup = default, INatsDeserialize? serializer = default, NatsSubOpts? opts = default, CancellationToken cancellationToken = default) + => Connection.SubscribeAsync(subject, queueGroup, serializer, opts, cancellationToken); + + /// + public ValueTask> RequestAsync(string subject, TRequest? data, NatsHeaders? headers = default, INatsSerialize? requestSerializer = default, INatsDeserialize? replySerializer = default, NatsPubOpts? requestOpts = default, NatsSubOpts? replyOpts = default, CancellationToken cancellationToken = default) + => Connection.RequestAsync(subject, data, headers, requestSerializer, replySerializer, requestOpts, replyOpts, cancellationToken); + + /// + public ValueTask> RequestAsync(string subject, INatsDeserialize? replySerializer = default, NatsSubOpts? replyOpts = default, CancellationToken cancellationToken = default) + => Connection.RequestAsync(subject, replySerializer, replyOpts, cancellationToken); + + /// + public ValueTask DisposeAsync() => Connection.DisposeAsync(); +} diff --git a/src/NATS.Client/NatsClientDefaultSerializer.cs b/src/NATS.Client/NatsClientDefaultSerializer.cs new file mode 100644 index 000000000..fecc12a9a --- /dev/null +++ b/src/NATS.Client/NatsClientDefaultSerializer.cs @@ -0,0 +1,26 @@ +using NATS.Client.Core; +using NATS.Client.Serializers.Json; + +namespace NATS.Client; + +/// +/// Default serializer interface for NATS messages. +/// +/// Serialized object type +public static class NatsClientDefaultSerializer +{ + /// + /// Default serializer interface for NATS messages. + /// + public static readonly INatsSerializer Default; + + static NatsClientDefaultSerializer() + { + Default = new NatsSerializerBuilder() + .Add(new NatsRawSerializer()) + .Add(new NatsUtf8PrimitivesSerializer()) + .Add(new NatsJsonSerializer()) + .Build(); + Console.WriteLine($"Default serializer for {typeof(T).Name} is {Default.GetType().Name}"); + } +} diff --git a/src/NATS.Client/NatsClientDefaultSerializerRegistry.cs b/src/NATS.Client/NatsClientDefaultSerializerRegistry.cs new file mode 100644 index 000000000..0b4a780bc --- /dev/null +++ b/src/NATS.Client/NatsClientDefaultSerializerRegistry.cs @@ -0,0 +1,24 @@ +using NATS.Client.Core; + +namespace NATS.Client; + +/// +/// Default implementation of the INatsSerializerRegistry interface. +/// It provides the default serializer and deserializer for primitive types, +/// binary data, and JSON serialization. +/// +public class NatsClientDefaultSerializerRegistry : INatsSerializerRegistry +{ + /// + /// Default implementation of the INatsSerializerRegistry interface. + /// It provides the default serializer and deserializer for primitive types, + /// binary data, and JSON serialization. + /// + public static readonly NatsClientDefaultSerializerRegistry Default = new(); + + /// + public INatsSerialize GetSerializer() => NatsClientDefaultSerializer.Default; + + /// + public INatsDeserialize GetDeserializer() => NatsClientDefaultSerializer.Default; +} diff --git a/tests/NATS.Client.Core.Tests/SerializerTest.cs b/tests/NATS.Client.Core.Tests/SerializerTest.cs index 45f3aeb81..d594213a5 100644 --- a/tests/NATS.Client.Core.Tests/SerializerTest.cs +++ b/tests/NATS.Client.Core.Tests/SerializerTest.cs @@ -299,6 +299,8 @@ public class TestSerializer : INatsSerializer public void Serialize(IBufferWriter bufferWriter, T? value) => throw new TestSerializerException(); public T? Deserialize(in ReadOnlySequence buffer) => throw new TestSerializerException(); + + public INatsSerializer CombineWith(INatsSerializer next) => throw new NotImplementedException(); } public class TestSerializerException : Exception; @@ -310,6 +312,8 @@ public class TestSerializerWithEmpty : INatsSerializer : new TestData(Encoding.ASCII.GetString(buffer))); public void Serialize(IBufferWriter bufferWriter, T value) => throw new Exception("not used"); + + public INatsSerializer CombineWith(INatsSerializer next) => throw new NotImplementedException(); } public record TestData(string Name); diff --git a/tests/NATS.Client.JetStream.Tests/CustomSerializerTest.cs b/tests/NATS.Client.JetStream.Tests/CustomSerializerTest.cs index 4b3e89f5d..e93d3345d 100644 --- a/tests/NATS.Client.JetStream.Tests/CustomSerializerTest.cs +++ b/tests/NATS.Client.JetStream.Tests/CustomSerializerTest.cs @@ -67,6 +67,8 @@ public void Serialize(IBufferWriter bufferWriter, T value) } public T Deserialize(in ReadOnlySequence buffer) => (T)(object)new byte[] { 42 }; + + public INatsSerializer CombineWith(INatsSerializer next) => throw new NotImplementedException(); } private class Level42SerializerRegistry : INatsSerializerRegistry diff --git a/tests/NATS.Client.JetStream.Tests/NatsJsContextFactoryTest.cs b/tests/NATS.Client.JetStream.Tests/NatsJsContextFactoryTest.cs index c70453892..879b27d8e 100644 --- a/tests/NATS.Client.JetStream.Tests/NatsJsContextFactoryTest.cs +++ b/tests/NATS.Client.JetStream.Tests/NatsJsContextFactoryTest.cs @@ -96,6 +96,8 @@ public class MockConnection : INatsConnection public NatsOpts Opts { get; } = new(); + public INatsConnection Connection => this; + public NatsConnectionState ConnectionState { get; } = NatsConnectionState.Closed; public ValueTask PingAsync(CancellationToken cancellationToken = default) => throw new NotImplementedException(); diff --git a/tests/NATS.Client.Tests/ClientTest.cs b/tests/NATS.Client.Tests/ClientTest.cs new file mode 100644 index 000000000..226a30140 --- /dev/null +++ b/tests/NATS.Client.Tests/ClientTest.cs @@ -0,0 +1,203 @@ +using System.Text; +using NATS.Client.Core.Tests; + +// ReSharper disable AccessToDisposedClosure +namespace NATS.Client.Tests; + +public class ClientTest +{ + [Fact] + public async Task Client_works_with_all_expected_types_and_falls_back_to_JSON() + { + await using var server = NatsServer.Start(); + await using var client = new NatsClient(server.ClientUrl); + + CancellationTokenSource ctsTestTimeout = new(TimeSpan.FromSeconds(10)); + var ctsStop = CancellationTokenSource.CreateLinkedTokenSource(ctsTestTimeout.Token); + + // Subscribe for int, string, bytes, JSON + TaskCompletionSource tcs1 = new(); + TaskCompletionSource tcs1data = new(); + var task1 = Task.Run( + async () => + { + await foreach (var msg in client.SubscribeAsync("x.int.>", cancellationToken: ctsStop.Token)) + { + if (msg.Subject.EndsWith("sync")) + { + tcs1.TrySetResult(); + continue; + } + + tcs1data.SetResult(msg.Data); + } + }, + ctsTestTimeout.Token); + + TaskCompletionSource tcs2 = new(); + TaskCompletionSource tcs2data = new(); + var task2 = Task.Run( + async () => + { + await foreach (var msg in client.SubscribeAsync("x.string.>", cancellationToken: ctsStop.Token)) + { + if (msg.Subject.EndsWith("sync")) + { + tcs2.TrySetResult(); + continue; + } + + tcs2data.SetResult(msg.Data); + } + }, + ctsTestTimeout.Token); + + TaskCompletionSource tcs3 = new(); + TaskCompletionSource tcs3data = new(); + var task3 = Task.Run( + async () => + { + await foreach (var msg in client.SubscribeAsync("x.bytes.>", cancellationToken: ctsStop.Token)) + { + if (msg.Subject.EndsWith("sync")) + { + tcs3.TrySetResult(); + continue; + } + + tcs3data.SetResult(msg.Data); + } + }, + ctsTestTimeout.Token); + + TaskCompletionSource tcs4 = new(); + TaskCompletionSource tcs4data = new(); + var task4 = Task.Run( + async () => + { + await foreach (var msg in client.SubscribeAsync("x.json.>", cancellationToken: ctsStop.Token)) + { + if (msg.Subject.EndsWith("sync")) + { + tcs4.TrySetResult(); + continue; + } + + tcs4data.SetResult(msg.Data); + } + }, + ctsTestTimeout.Token); + + TaskCompletionSource tcs5 = new(); + var task5 = Task.Run( + async () => + { + await foreach (var msg in client.SubscribeAsync("x.service.>", cancellationToken: ctsStop.Token)) + { + if (msg.Subject.EndsWith("sync")) + { + tcs5.TrySetResult(); + continue; + } + + if (msg.Data != null) + { + await msg.ReplyAsync($"Thank you {msg.Data.Name} your Id is {msg.Data.Id}!", cancellationToken: ctsTestTimeout.Token); + } + } + }, + ctsTestTimeout.Token); + + TaskCompletionSource tcs6 = new(); + var task6 = Task.Run( + async () => + { + var id = 0; + await foreach (var msg in client.SubscribeAsync("x.service2.>", cancellationToken: ctsStop.Token)) + { + if (msg.Subject.EndsWith("sync")) + { + tcs6.TrySetResult(); + continue; + } + + await msg.ReplyAsync(new MyData(id, $"foo{id}"), cancellationToken: ctsTestTimeout.Token).ConfigureAwait(false); + id++; + } + }, + ctsTestTimeout.Token); + + await Retry.Until( + reason: "int synced", + condition: () => + { + var taskIsCompleted = tcs1.Task.IsCompleted; + return taskIsCompleted; + }, + action: () => client.PublishAsync("x.int.sync", cancellationToken: ctsTestTimeout.Token).AsTask()); + + await Retry.Until( + reason: "string synced", + condition: () => tcs2.Task.IsCompleted, + action: () => client.PublishAsync("x.string.sync", cancellationToken: ctsTestTimeout.Token).AsTask()); + + await Retry.Until( + reason: "bytes synced", + condition: () => tcs3.Task.IsCompleted, + action: () => client.PublishAsync("x.bytes.sync", cancellationToken: ctsTestTimeout.Token).AsTask()); + + await Retry.Until( + reason: "json synced", + condition: () => tcs4.Task.IsCompleted, + action: () => client.PublishAsync("x.json.sync", cancellationToken: ctsTestTimeout.Token).AsTask()); + + await Retry.Until( + reason: "service synced", + condition: () => tcs5.Task.IsCompleted, + action: () => client.PublishAsync("x.service.sync", cancellationToken: ctsTestTimeout.Token).AsTask()); + + await Retry.Until( + reason: "service2 synced", + condition: () => tcs6.Task.IsCompleted, + action: () => client.PublishAsync("x.service2.sync", cancellationToken: ctsTestTimeout.Token).AsTask()); + + await client.PublishAsync("x.int.data", 100, cancellationToken: ctsTestTimeout.Token); + var r1 = await tcs1data.Task; + Assert.Equal(100, r1); + + await client.PublishAsync("x.string.data", "Hello, World!", cancellationToken: ctsTestTimeout.Token); + var r2 = await tcs2data.Task; + Assert.Equal("Hello, World!", r2); + + await client.PublishAsync("x.bytes.data", "ABC"u8.ToArray(), cancellationToken: ctsTestTimeout.Token); + var r3 = await tcs3data.Task; + Assert.Equal(Encoding.UTF8.GetBytes("ABC"), r3); + + await client.PublishAsync("x.json.data", new MyData(30, "bar"), cancellationToken: ctsTestTimeout.Token); + var r4 = await tcs4data.Task; + Assert.Equal(new MyData(30, "bar"), r4); + + // Request/Reply + { + var response = await client.RequestAsync("x.service.call", new MyData(100, "foo"), cancellationToken: ctsTestTimeout.Token); + Assert.Equal("Thank you foo your Id is 100!", response.Data); + } + + // Request/Reply without request data + for (var i = 0; i < 3; i++) + { + var response = await client.RequestAsync("x.service2.call", cancellationToken: ctsTestTimeout.Token); + Assert.NotNull(response.Data); + Assert.Equal(i, response.Data.Id); + Assert.Equal($"foo{i}", response.Data.Name); + } + + // Use JetStream by referencing NATS.Client.JetStream package + // var js = client.GetJetStream(); + ctsStop.Cancel(); + + await Task.WhenAll(task1, task2, task3, task4, task5, task6); + } + + private record MyData(int Id, string Name); +} diff --git a/tests/NATS.Client.Tests/NATS.Client.Tests.csproj b/tests/NATS.Client.Tests/NATS.Client.Tests.csproj new file mode 100644 index 000000000..c64f26ef5 --- /dev/null +++ b/tests/NATS.Client.Tests/NATS.Client.Tests.csproj @@ -0,0 +1,41 @@ + + + + net6.0;net8.0 + enable + false + $(NoWarn);CS8002 + enable + $(MSBuildProjectDirectory)\test.runsettings + false + + + + + + + + + runtime; build; native; contentfiles; analyzers; buildtransitive + all + + + runtime; build; native; contentfiles; analyzers; buildtransitive + all + + + + + + + + + + + + + + + + + diff --git a/tests/NATS.Net.DocsExamples/SerializationPage.cs b/tests/NATS.Net.DocsExamples/SerializationPage.cs index f25015ac0..f4e3dbcd2 100644 --- a/tests/NATS.Net.DocsExamples/SerializationPage.cs +++ b/tests/NATS.Net.DocsExamples/SerializationPage.cs @@ -297,6 +297,8 @@ public void Serialize(IBufferWriter bufferWriter, T value) throw new NatsException($"Can't deserialize {typeof(T)}"); } + + public INatsSerializer CombineWith(INatsSerializer next) => throw new NotImplementedException(); } public class MyProtoBufSerializerRegistry : INatsSerializerRegistry