From 8e2fbbdce4f4dc9e543992f7d0a80f5a38825ac3 Mon Sep 17 00:00:00 2001 From: Ziya Suzen Date: Fri, 5 Jan 2024 16:05:44 +0000 Subject: [PATCH] Svc info subs do not use queue group (#308) --- .../Internal/SvcListener.cs | 4 +-- src/NATS.Client.Services/NatsSvcServer.cs | 3 +- .../ServicesTests.cs | 33 +++++++++++++++++++ 3 files changed, 37 insertions(+), 3 deletions(-) diff --git a/src/NATS.Client.Services/Internal/SvcListener.cs b/src/NATS.Client.Services/Internal/SvcListener.cs index 1c138ac46..7220155a2 100644 --- a/src/NATS.Client.Services/Internal/SvcListener.cs +++ b/src/NATS.Client.Services/Internal/SvcListener.cs @@ -9,12 +9,12 @@ internal class SvcListener : IAsyncDisposable private readonly Channel _channel; private readonly SvcMsgType _type; private readonly string _subject; - private readonly string _queueGroup; + private readonly string? _queueGroup; private readonly CancellationToken _cancellationToken; private Task? _readLoop; private CancellationTokenSource? _cts; - public SvcListener(NatsConnection nats, Channel channel, SvcMsgType type, string subject, string queueGroup, CancellationToken cancellationToken) + public SvcListener(NatsConnection nats, Channel channel, SvcMsgType type, string subject, string? queueGroup, CancellationToken cancellationToken) { _nats = nats; _channel = channel; diff --git a/src/NATS.Client.Services/NatsSvcServer.cs b/src/NATS.Client.Services/NatsSvcServer.cs index 2b55b42f3..f99efbeaf 100644 --- a/src/NATS.Client.Services/NatsSvcServer.cs +++ b/src/NATS.Client.Services/NatsSvcServer.cs @@ -193,7 +193,8 @@ internal async ValueTask StartAsync() var type = svcType.ToString().ToUpper(); foreach (var subject in new[] { $"$SRV.{type}", $"$SRV.{type}.{name}", $"$SRV.{type}.{name}.{_id}" }) { - var svcListener = new SvcListener(_nats, _channel, svcType, subject, _config.QueueGroup, _cancellationToken); + // for discovery subjects do not use a queue group + var svcListener = new SvcListener(_nats, _channel, svcType, subject, default, _cancellationToken); await svcListener.StartAsync(); _svcListeners.Add(svcListener); } diff --git a/tests/NATS.Client.Services.Tests/ServicesTests.cs b/tests/NATS.Client.Services.Tests/ServicesTests.cs index 1f3f6c139..36c43569b 100644 --- a/tests/NATS.Client.Services.Tests/ServicesTests.cs +++ b/tests/NATS.Client.Services.Tests/ServicesTests.cs @@ -231,4 +231,37 @@ await s2.AddEndpointAsync( Assert.Equal("s2baz", eps.Data["ep_name"]?.GetValue()); } } + + [Fact] + public async Task Add_multiple_service_listeners_ping_info_and_stats() + { + await using var server = NatsServer.Start(); + await using var nats = server.CreateClientConnection(); + var svc = new NatsSvcContext(nats); + + var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10)); + var cancellationToken = cts.Token; + + await using var s1 = await svc.AddServiceAsync("s1", "1.0.0", cancellationToken: cancellationToken); + await using var s2 = await svc.AddServiceAsync("s2", "2.0.0", cancellationToken: cancellationToken); + + var pingsTask = nats.FindServicesAsync("$SRV.PING", 2, NatsSrvJsonSerializer.Default, cancellationToken); + var infosTask = nats.FindServicesAsync("$SRV.INFO", 2, NatsSrvJsonSerializer.Default, cancellationToken); + var statsTask = nats.FindServicesAsync("$SRV.STATS", 2, NatsSrvJsonSerializer.Default, cancellationToken); + + var pings = await pingsTask; + Assert.Equal(2, pings.Count); + Assert.Equal("1.0.0", pings.First(s => s.Name == "s1").Version); + Assert.Equal("2.0.0", pings.First(s => s.Name == "s2").Version); + + var infos = await infosTask; + Assert.Equal(2, infos.Count); + Assert.Equal("1.0.0", infos.First(s => s.Name == "s1").Version); + Assert.Equal("2.0.0", infos.First(s => s.Name == "s2").Version); + + var stats = await statsTask; + Assert.Equal(2, stats.Count); + Assert.Equal("1.0.0", stats.First(s => s.Name == "s1").Version); + Assert.Equal("2.0.0", stats.First(s => s.Name == "s2").Version); + } }