From b432a2ff474cf0ff14535a5b67fc0c7d2a0ca5bd Mon Sep 17 00:00:00 2001 From: Ziya Suzen Date: Wed, 3 Jan 2024 09:11:33 +0000 Subject: [PATCH] Explicitly set service listener serialization (#304) This is necessary when the default serializer isn't using a Raw serialization for NatsMemory type which is used by the service listener. --- .../Internal/SvcListener.cs | 2 +- .../NATS.Client.Services.Tests.csproj | 1 + .../ServicesSerializationTest.cs | 43 +++++++++++++++++++ tests/NATS.Client.TestUtilities/Utils.cs | 6 ++- 4 files changed, 49 insertions(+), 3 deletions(-) create mode 100644 tests/NATS.Client.Services.Tests/ServicesSerializationTest.cs diff --git a/src/NATS.Client.Services/Internal/SvcListener.cs b/src/NATS.Client.Services/Internal/SvcListener.cs index 16a1fce77..1c138ac46 100644 --- a/src/NATS.Client.Services/Internal/SvcListener.cs +++ b/src/NATS.Client.Services/Internal/SvcListener.cs @@ -29,7 +29,7 @@ public ValueTask StartAsync() _cts = CancellationTokenSource.CreateLinkedTokenSource(_cancellationToken); _readLoop = Task.Run(async () => { - await foreach (var msg in _nats.SubscribeAsync>(_subject, _queueGroup, cancellationToken: _cts.Token)) + await foreach (var msg in _nats.SubscribeAsync>(_subject, _queueGroup, serializer: NatsRawSerializer>.Default, cancellationToken: _cts.Token)) { await _channel.Writer.WriteAsync(new SvcMsg(_type, msg), _cancellationToken).ConfigureAwait(false); } diff --git a/tests/NATS.Client.Services.Tests/NATS.Client.Services.Tests.csproj b/tests/NATS.Client.Services.Tests/NATS.Client.Services.Tests.csproj index 53faec348..0f784d731 100644 --- a/tests/NATS.Client.Services.Tests/NATS.Client.Services.Tests.csproj +++ b/tests/NATS.Client.Services.Tests/NATS.Client.Services.Tests.csproj @@ -28,6 +28,7 @@ + diff --git a/tests/NATS.Client.Services.Tests/ServicesSerializationTest.cs b/tests/NATS.Client.Services.Tests/ServicesSerializationTest.cs new file mode 100644 index 000000000..5143597bb --- /dev/null +++ b/tests/NATS.Client.Services.Tests/ServicesSerializationTest.cs @@ -0,0 +1,43 @@ +using System.Buffers; +using NATS.Client.Core.Tests; +using NATS.Client.Serializers.Json; +using NATS.Client.Services.Internal; +using NATS.Client.Services.Models; + +namespace NATS.Client.Services.Tests; + +public class ServicesSerializationTest +{ + private readonly ITestOutputHelper _output; + + public ServicesSerializationTest(ITestOutputHelper output) => _output = output; + + [Fact] + public async Task Service_info_and_stat_request_serialization() + { + await using var server = NatsServer.Start(); + + // Set serializer registry to use anything but a raw bytes (NatsMemory in this case) serializer + await using var nats = server.CreateClientConnection(new NatsOpts { SerializerRegistry = NatsJsonSerializerRegistry.Default }); + + var svc = new NatsSvcContext(nats); + + var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10)); + var cancellationToken = cts.Token; + + await using var s1 = await svc.AddServiceAsync("s1", "1.0.0", cancellationToken: cancellationToken); + + var infosTask = nats.FindServicesAsync("$SRV.INFO", 1, NatsSrvJsonSerializer.Default, cancellationToken); + var statsTask = nats.FindServicesAsync("$SRV.STATS", 1, NatsSrvJsonSerializer.Default, cancellationToken); + + var infos = await infosTask; + Assert.Single(infos); + Assert.Equal("s1", infos[0].Name); + Assert.Equal("1.0.0", infos[0].Version); + + var stats = await statsTask; + Assert.Single(stats); + Assert.Equal("s1", stats[0].Name); + Assert.Equal("1.0.0", stats[0].Version); + } +} diff --git a/tests/NATS.Client.TestUtilities/Utils.cs b/tests/NATS.Client.TestUtilities/Utils.cs index 3a21588fe..47024a41e 100644 --- a/tests/NATS.Client.TestUtilities/Utils.cs +++ b/tests/NATS.Client.TestUtilities/Utils.cs @@ -135,7 +135,9 @@ public static async Task> FindServicesAsync(this NatsConnection nats, await Retry.Until("service is found", async () => { var count = 0; - await foreach (var msg in nats.RequestManyAsync(subject, null, replySerializer: serializer, replyOpts: replyOpts, cancellationToken: ct).ConfigureAwait(false)) + + // nats cli sends an empty JSON object '{}' as the request payload so we do the same here + await foreach (var msg in nats.RequestManyAsync(subject, "{}", replySerializer: serializer, replyOpts: replyOpts, cancellationToken: ct).ConfigureAwait(false)) { if (++count == limit) break; @@ -145,7 +147,7 @@ await Retry.Until("service is found", async () => }); var count = 0; - await foreach (var msg in nats.RequestManyAsync(subject, null, replySerializer: serializer, replyOpts: replyOpts, cancellationToken: ct).ConfigureAwait(false)) + await foreach (var msg in nats.RequestManyAsync(subject, "{}", replySerializer: serializer, replyOpts: replyOpts, cancellationToken: ct).ConfigureAwait(false)) { responses.Add(msg.Data!); if (++count == limit)