From 5cf76c178f18fdbc87dded4d7c42b21a2070d448 Mon Sep 17 00:00:00 2001 From: Ziya Suzen Date: Tue, 29 Aug 2023 12:29:55 +0100 Subject: [PATCH] Consume and fetch all --- .../Example.JetStream.PullConsumer/Program.cs | 34 ++++++++++++++++++- src/NATS.Client.JetStream/NatsJSConsumer.cs | 23 +++++++++++++ 2 files changed, 56 insertions(+), 1 deletion(-) diff --git a/sandbox/Example.JetStream.PullConsumer/Program.cs b/sandbox/Example.JetStream.PullConsumer/Program.cs index 7542c8b6a..22e7fd59f 100644 --- a/sandbox/Example.JetStream.PullConsumer/Program.cs +++ b/sandbox/Example.JetStream.PullConsumer/Program.cs @@ -32,6 +32,23 @@ } } } +else if (args.Length > 0 && args[0] == "fetch-all") +{ + while (true) + { + Console.WriteLine($"___\nFETCH {batch}"); + var opts = new NatsJSFetchOpts + { + MaxMsgs = batch, Expires = expires, IdleHeartbeat = idle, Serializer = new RawDataSerializer(), + }; + await foreach (var jsMsg in consumer.FetchAllAsync(opts)) + { + var msg = jsMsg.Msg; + Console.WriteLine($"data: {msg.Data}"); + await jsMsg.AckAsync(); + } + } +} else if (args.Length > 0 && args[0] == "next") { while (true) @@ -49,7 +66,7 @@ } } } -else +else if (args.Length > 0 && args[0] == "consume") { Console.WriteLine("___\nCONSUME"); await using var sub = await consumer.ConsumeAsync(new NatsJSConsumeOpts @@ -64,3 +81,18 @@ await jsMsg.AckAsync(); } } +else if (args.Length > 0 && args[0] == "consume-all") +{ + Console.WriteLine("___\nCONSUME-ALL"); + var opts = new NatsJSConsumeOpts + { + MaxMsgs = batch, Expires = expires, IdleHeartbeat = idle, Serializer = new RawDataSerializer(), + }; + + await foreach (var jsMsg in consumer.ConsumeAllAsync(opts)) + { + var msg = jsMsg.Msg; + Console.WriteLine($"data: {msg.Data}"); + await jsMsg.AckAsync(); + } +} diff --git a/src/NATS.Client.JetStream/NatsJSConsumer.cs b/src/NATS.Client.JetStream/NatsJSConsumer.cs index b2fd50813..d6abc6d20 100644 --- a/src/NATS.Client.JetStream/NatsJSConsumer.cs +++ b/src/NATS.Client.JetStream/NatsJSConsumer.cs @@ -1,3 +1,4 @@ +using System.Runtime.CompilerServices; using System.Threading.Channels; using NATS.Client.Core; using NATS.Client.JetStream.Internal; @@ -28,6 +29,17 @@ public async ValueTask DeleteAsync(CancellationToken cancellationToken = d return _deleted = await _context.DeleteConsumerAsync(_stream, _consumer, cancellationToken); } + public async IAsyncEnumerable> ConsumeAllAsync( + NatsJSConsumeOpts opts, + [EnumeratorCancellation] CancellationToken cancellationToken = default) + { + await using var cc = await ConsumeAsync(opts, cancellationToken); + await foreach (var jsMsg in cc.Msgs.ReadAllAsync(cancellationToken)) + { + yield return jsMsg; + } + } + public async ValueTask> ConsumeAsync(NatsJSConsumeOpts opts, CancellationToken cancellationToken = default) { ThrowIfDeleted(); @@ -102,6 +114,17 @@ await sub.CallMsgNextAsync( return default; } + public async IAsyncEnumerable> FetchAllAsync( + NatsJSFetchOpts opts, + [EnumeratorCancellation] CancellationToken cancellationToken = default) + { + await using var fc = await FetchAsync(opts, cancellationToken); + await foreach (var jsMsg in fc.Msgs.ReadAllAsync(cancellationToken)) + { + yield return jsMsg; + } + } + public async ValueTask> FetchAsync( NatsJSFetchOpts opts, CancellationToken cancellationToken = default)