diff --git a/src/NATS.Client.Core/NatsSubBase.cs b/src/NATS.Client.Core/NatsSubBase.cs index c2c2bc9f6..8d1b0ad12 100644 --- a/src/NATS.Client.Core/NatsSubBase.cs +++ b/src/NATS.Client.Core/NatsSubBase.cs @@ -10,6 +10,7 @@ namespace NATS.Client.Core; public enum NatsSubEndReason { None, + NoMsgs, MaxMsgs, MaxBytes, Timeout, diff --git a/src/NATS.Client.JetStream/Internal/NatsJSFetch.cs b/src/NATS.Client.JetStream/Internal/NatsJSFetch.cs index 2d69b663d..f01bd171d 100644 --- a/src/NATS.Client.JetStream/Internal/NatsJSFetch.cs +++ b/src/NATS.Client.JetStream/Internal/NatsJSFetch.cs @@ -172,7 +172,11 @@ protected override async ValueTask ReceiveInternalAsync( var headers = new NatsHeaders(); if (Connection.HeaderParser.ParseHeaders(new SequenceReader(headersBuffer.Value), headers)) { - if (headers is { Code: 408, Message: NatsHeaders.Messages.RequestTimeout }) + if (headers is { Code: 404 }) + { + EndSubscription(NatsSubEndReason.NoMsgs); + } + else if (headers is { Code: 408, Message: NatsHeaders.Messages.RequestTimeout }) { EndSubscription(NatsSubEndReason.Timeout); } diff --git a/src/NATS.Client.JetStream/NatsJSConsumer.cs b/src/NATS.Client.JetStream/NatsJSConsumer.cs index de55fcd6d..7dc1df3d0 100644 --- a/src/NATS.Client.JetStream/NatsJSConsumer.cs +++ b/src/NATS.Client.JetStream/NatsJSConsumer.cs @@ -197,6 +197,20 @@ await sub.CallMsgNextAsync( } } + public async IAsyncEnumerable> FetchNoWait( + NatsJSFetchOpts? opts = default, + [EnumeratorCancellation] CancellationToken cancellationToken = default) + { + ThrowIfDeleted(); + opts ??= _context.Opts.DefaultFetchOpts; + + await using var fc = await FetchAsync(opts with { NoWait = true }, cancellationToken); + await foreach (var jsMsg in fc.Msgs.ReadAllAsync(cancellationToken)) + { + yield return jsMsg; + } + } + /// /// Consume a set number of messages from the stream using this consumer. /// @@ -235,13 +249,20 @@ public async ValueTask> FetchAsync( await _context.Connection.SubAsync(sub: sub, cancellationToken); await sub.CallMsgNextAsync( - new ConsumerGetnextRequest - { - Batch = max.MaxMsgs, - MaxBytes = max.MaxBytes, - IdleHeartbeat = timeouts.IdleHeartbeat.ToNanos(), - Expires = timeouts.Expires.ToNanos(), - }, + opts.NoWait + + // When no wait is set we don't need to send the idle heartbeat and expiration + // If no message is available the server will respond with a 404 immediately + // If messages are available the server will send a 408 direct after the last message + ? new ConsumerGetnextRequest {Batch = max.MaxMsgs, MaxBytes = max.MaxBytes, NoWait = opts.NoWait} + : new ConsumerGetnextRequest + { + Batch = max.MaxMsgs, + MaxBytes = max.MaxBytes, + IdleHeartbeat = timeouts.IdleHeartbeat.ToNanos(), + Expires = timeouts.Expires.ToNanos(), + NoWait = opts.NoWait, + }, cancellationToken); sub.ResetHeartbeatTimer(); diff --git a/src/NATS.Client.JetStream/NatsJSOpts.cs b/src/NATS.Client.JetStream/NatsJSOpts.cs index c2bd7d6b4..8e5081e7b 100644 --- a/src/NATS.Client.JetStream/NatsJSOpts.cs +++ b/src/NATS.Client.JetStream/NatsJSOpts.cs @@ -154,6 +154,11 @@ public record NatsJSFetchOpts /// public TimeSpan? IdleHeartbeat { get; init; } + /// + /// Does not wait for messages to be available + /// + internal bool NoWait { get; init; } + /// /// Serializer to use to deserialize the message if a model is being used. /// diff --git a/tests/NATS.Client.JetStream.Tests/ConsumerFetchTest.cs b/tests/NATS.Client.JetStream.Tests/ConsumerFetchTest.cs index bc0223c63..ceeb15245 100644 --- a/tests/NATS.Client.JetStream.Tests/ConsumerFetchTest.cs +++ b/tests/NATS.Client.JetStream.Tests/ConsumerFetchTest.cs @@ -39,6 +39,34 @@ public async Task Fetch_test() Assert.Equal(10, count); } + [Fact] + public async Task FetchNoWait_test() + { + var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10)); + await using var server = NatsServer.StartJS(); + await using var nats = server.CreateClientConnection(); + var js = new NatsJSContext(nats); + await js.CreateStreamAsync("s1", new[] { "s1.*" }, cts.Token); + await js.CreateConsumerAsync("s1", "c1", cancellationToken: cts.Token); + + for (var i = 0; i < 10; i++) + { + var ack = await js.PublishAsync("s1.foo", new TestData { Test = i }, cancellationToken: cts.Token); + ack.EnsureSuccess(); + } + + var consumer = await js.GetConsumerAsync("s1", "c1", cts.Token); + var count = 0; + await foreach (var msg in consumer.FetchNoWait(new NatsJSFetchOpts { MaxMsgs = 10 }, cancellationToken: cts.Token)) + { + await msg.AckAsync(new AckOpts(WaitUntilSent: true), cts.Token); + Assert.Equal(count, msg.Data!.Test); + count++; + } + + Assert.Equal(10, count); + } + [Fact] public async Task Fetch_dispose_test() {