Skip to content

Commit

Permalink
Explicitly set service listener serialization (#304)
Browse files Browse the repository at this point in the history
This is necessary when the default serializer isn't using a Raw serialization
for NatsMemory type which is used by the service listener.
  • Loading branch information
mtmk authored Jan 3, 2024
1 parent aa32a49 commit b432a2f
Show file tree
Hide file tree
Showing 4 changed files with 49 additions and 3 deletions.
2 changes: 1 addition & 1 deletion src/NATS.Client.Services/Internal/SvcListener.cs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ public ValueTask StartAsync()
_cts = CancellationTokenSource.CreateLinkedTokenSource(_cancellationToken);
_readLoop = Task.Run(async () =>
{
await foreach (var msg in _nats.SubscribeAsync<NatsMemoryOwner<byte>>(_subject, _queueGroup, cancellationToken: _cts.Token))
await foreach (var msg in _nats.SubscribeAsync<NatsMemoryOwner<byte>>(_subject, _queueGroup, serializer: NatsRawSerializer<NatsMemoryOwner<byte>>.Default, cancellationToken: _cts.Token))
{
await _channel.Writer.WriteAsync(new SvcMsg(_type, msg), _cancellationToken).ConfigureAwait(false);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
</ItemGroup>

<ItemGroup>
<ProjectReference Include="..\..\src\NATS.Client.Serializers.Json\NATS.Client.Serializers.Json.csproj" />
<ProjectReference Include="..\..\src\NATS.Client.Services\NATS.Client.Services.csproj" />
<ProjectReference Include="..\NATS.Client.TestUtilities\NATS.Client.TestUtilities.csproj" />
</ItemGroup>
Expand Down
43 changes: 43 additions & 0 deletions tests/NATS.Client.Services.Tests/ServicesSerializationTest.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
using System.Buffers;
using NATS.Client.Core.Tests;
using NATS.Client.Serializers.Json;
using NATS.Client.Services.Internal;
using NATS.Client.Services.Models;

namespace NATS.Client.Services.Tests;

public class ServicesSerializationTest
{
private readonly ITestOutputHelper _output;

public ServicesSerializationTest(ITestOutputHelper output) => _output = output;

[Fact]
public async Task Service_info_and_stat_request_serialization()
{
await using var server = NatsServer.Start();

// Set serializer registry to use anything but a raw bytes (NatsMemory in this case) serializer
await using var nats = server.CreateClientConnection(new NatsOpts { SerializerRegistry = NatsJsonSerializerRegistry.Default });

var svc = new NatsSvcContext(nats);

var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10));
var cancellationToken = cts.Token;

await using var s1 = await svc.AddServiceAsync("s1", "1.0.0", cancellationToken: cancellationToken);

var infosTask = nats.FindServicesAsync("$SRV.INFO", 1, NatsSrvJsonSerializer<InfoResponse>.Default, cancellationToken);
var statsTask = nats.FindServicesAsync("$SRV.STATS", 1, NatsSrvJsonSerializer<StatsResponse>.Default, cancellationToken);

var infos = await infosTask;
Assert.Single(infos);
Assert.Equal("s1", infos[0].Name);
Assert.Equal("1.0.0", infos[0].Version);

var stats = await statsTask;
Assert.Single(stats);
Assert.Equal("s1", stats[0].Name);
Assert.Equal("1.0.0", stats[0].Version);
}
}
6 changes: 4 additions & 2 deletions tests/NATS.Client.TestUtilities/Utils.cs
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,9 @@ public static async Task<List<T>> FindServicesAsync<T>(this NatsConnection nats,
await Retry.Until("service is found", async () =>
{
var count = 0;
await foreach (var msg in nats.RequestManyAsync<object?, T>(subject, null, replySerializer: serializer, replyOpts: replyOpts, cancellationToken: ct).ConfigureAwait(false))

// nats cli sends an empty JSON object '{}' as the request payload so we do the same here
await foreach (var msg in nats.RequestManyAsync<string, T>(subject, "{}", replySerializer: serializer, replyOpts: replyOpts, cancellationToken: ct).ConfigureAwait(false))
{
if (++count == limit)
break;
Expand All @@ -145,7 +147,7 @@ await Retry.Until("service is found", async () =>
});

var count = 0;
await foreach (var msg in nats.RequestManyAsync<object?, T>(subject, null, replySerializer: serializer, replyOpts: replyOpts, cancellationToken: ct).ConfigureAwait(false))
await foreach (var msg in nats.RequestManyAsync<string, T>(subject, "{}", replySerializer: serializer, replyOpts: replyOpts, cancellationToken: ct).ConfigureAwait(false))
{
responses.Add(msg.Data!);
if (++count == limit)
Expand Down

0 comments on commit b432a2f

Please sign in to comment.