diff --git a/src/NATS.Client.Core/NatsConnection.RequestReply.cs b/src/NATS.Client.Core/NatsConnection.RequestReply.cs index a3576cbf5..f5bfa0838 100644 --- a/src/NATS.Client.Core/NatsConnection.RequestReply.cs +++ b/src/NATS.Client.Core/NatsConnection.RequestReply.cs @@ -7,7 +7,17 @@ namespace NATS.Client.Core; public partial class NatsConnection { - private static readonly NatsSubOpts DefaultReplyOpts = new() { MaxMsgs = 1 }; + private static readonly NatsSubOpts ReplyOptsDefault = new NatsSubOpts + { + MaxMsgs = 1, + ThrowIfNoResponders = true, + }; + + private static readonly NatsSubOpts ReplyManyOptsDefault = new NatsSubOpts + { + StopOnEmptyMsg = true, + ThrowIfNoResponders = true, + }; /// public string NewInbox() => NewInbox(InboxPrefix); @@ -23,20 +33,14 @@ public async ValueTask> RequestAsync( NatsSubOpts? replyOpts = default, CancellationToken cancellationToken = default) { - var opts = SetReplyOptsDefaults(replyOpts); - - await using var sub = await RequestSubAsync(subject, data, headers, requestSerializer, replySerializer, requestOpts, opts, cancellationToken) + replyOpts = SetReplyOptsDefaults(replyOpts); + await using var sub = await RequestSubAsync(subject, data, headers, requestSerializer, replySerializer, requestOpts, replyOpts, cancellationToken) .ConfigureAwait(false); if (await sub.Msgs.WaitToReadAsync(cancellationToken).ConfigureAwait(false)) { if (sub.Msgs.TryRead(out var msg)) { - if (msg.IsNoRespondersError) - { - throw new NatsNoRespondersException(); - } - return msg; } } @@ -55,6 +59,7 @@ public async IAsyncEnumerable> RequestManyAsync(subject, data, headers, requestSerializer, replySerializer, requestOpts, replyOpts, cancellationToken) .ConfigureAwait(false); @@ -62,12 +67,6 @@ public async IAsyncEnumerable> RequestManyAsync( T? Data, INatsConnection? Connection) { - public bool IsNoRespondersError => Headers?.Code == 503; - internal static NatsMsg Build( string subject, string? replyTo, diff --git a/src/NATS.Client.Core/NatsSubBase.cs b/src/NATS.Client.Core/NatsSubBase.cs index 1b97fefeb..1ec37c33f 100644 --- a/src/NATS.Client.Core/NatsSubBase.cs +++ b/src/NATS.Client.Core/NatsSubBase.cs @@ -15,6 +15,7 @@ public enum NatsSubEndReason MaxBytes, Timeout, IdleTimeout, + EmptyMsg, IdleHeartbeatTimeout, StartUpTimeout, Cancelled, @@ -24,6 +25,7 @@ public enum NatsSubEndReason public abstract class NatsSubBase { + private static readonly byte[] NoRespondersHeaderSequence = { (byte)' ', (byte)'5', (byte)'0', (byte)'3' }; private readonly ILogger _logger; private readonly bool _debug; private readonly ISubscriptionManager _manager; @@ -193,6 +195,20 @@ public virtual async ValueTask ReceiveAsync(string subject, string? replyTo, Rea { ResetIdleTimeout(); + // check for empty payload conditions + if (payloadBuffer.Length == 0) + { + switch (Opts) + { + case { ThrowIfNoResponders: true } when headersBuffer is { Length: >= 12 } && headersBuffer.Value.Slice(8, 4).ToSpan().SequenceEqual(NoRespondersHeaderSequence): + SetException(new NatsNoRespondersException()); + return; + case { StopOnEmptyMsg: true }: + EndSubscription(NatsSubEndReason.EmptyMsg); + return; + } + } + try { // Need to await to handle any exceptions diff --git a/src/NATS.Client.Core/NatsSubOpts.cs b/src/NATS.Client.Core/NatsSubOpts.cs index caf2e9302..699893e1f 100644 --- a/src/NATS.Client.Core/NatsSubOpts.cs +++ b/src/NATS.Client.Core/NatsSubOpts.cs @@ -42,6 +42,27 @@ public record NatsSubOpts /// public TimeSpan? IdleTimeout { get; init; } + /// + /// If true, end the subscription upon receiving an empty message. + /// The empty message will not be delivered to the subscription. + /// + /// + /// If not set, all published messages will be received until explicitly + /// unsubscribed or disposed. + /// + public bool? StopOnEmptyMsg { get; init; } + + /// + /// If true, end the subscription and throw an exception if a + /// no responders status message is received. + /// The no responders status message will not be delivered to the subscription. + /// + /// + /// If not set, all published messages will be received until explicitly + /// unsubscribed or disposed. + /// + public bool? ThrowIfNoResponders { get; init; } + /// /// Allows Configuration of options for a subscription. /// diff --git a/src/NATS.Client.JetStream/NatsJSContext.cs b/src/NATS.Client.JetStream/NatsJSContext.cs index 10d0c2397..9ed556ef6 100644 --- a/src/NATS.Client.JetStream/NatsJSContext.cs +++ b/src/NATS.Client.JetStream/NatsJSContext.cs @@ -133,6 +133,9 @@ public async ValueTask PublishAsync( // without the timeout the publish call will hang forever since the server // which received the request won't be there to respond anymore. Timeout = Connection.Opts.RequestTimeout, + + // If JetStream is disabled, a no responders error will be returned + ThrowIfNoResponders = true, }, cancellationToken) .ConfigureAwait(false); @@ -141,11 +144,6 @@ public async ValueTask PublishAsync( { while (sub.Msgs.TryRead(out var msg)) { - if (msg.IsNoRespondersError) - { - throw new NatsNoRespondersException(); - } - if (msg.Data == null) { throw new NatsJSException("No response data received"); diff --git a/tests/NATS.Client.CheckNativeAot/Program.cs b/tests/NATS.Client.CheckNativeAot/Program.cs index 8d9016cc6..dead43d03 100644 --- a/tests/NATS.Client.CheckNativeAot/Program.cs +++ b/tests/NATS.Client.CheckNativeAot/Program.cs @@ -42,6 +42,9 @@ async Task RequestReplyTests() await msg.ReplyAsync(null); // sentinel }); + // make sure subs have started + await nats.PingAsync(); + var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10)); var results = new[] { 2, 3, 4 }; var count = 0; @@ -307,6 +310,9 @@ await grp2.AddEndpointAsync( handler: _ => ValueTask.CompletedTask, cancellationToken: cancellationToken); + // make sure subs have started + await nats.PingAsync(); + // Check that the endpoints are registered correctly { var info = (await nats.FindServicesAsync("$SRV.INFO.s1", 1, NatsSrvJsonSerializer.Default, cancellationToken)).First(); @@ -345,6 +351,9 @@ await s2.AddEndpointAsync( metadata: new Dictionary { { "ep-k1", "ep-v1" } }, cancellationToken: cancellationToken); + // make sure subs have started + await nats.PingAsync(); + // Check default queue group and stats handler { var info = (await nats.FindServicesAsync("$SRV.INFO.s2", 1, NatsSrvJsonSerializer.Default, cancellationToken)).First(); @@ -406,6 +415,9 @@ await s1.AddEndpointAsync( }, cancellationToken: cancellationToken); + // make sure subs have started + await nats.PingAsync(); + var info = (await nats.FindServicesAsync("$SRV.INFO", 1, NatsSrvJsonSerializer.Default, cancellationToken)).First(); AssertSingle(info.Endpoints); var endpointInfo = info.Endpoints.First();