Skip to content

Commit

Permalink
Consume and fetch all
Browse files Browse the repository at this point in the history
  • Loading branch information
mtmk committed Aug 29, 2023
1 parent c2547a7 commit 5cf76c1
Show file tree
Hide file tree
Showing 2 changed files with 56 additions and 1 deletion.
34 changes: 33 additions & 1 deletion sandbox/Example.JetStream.PullConsumer/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,23 @@
}
}
}
else if (args.Length > 0 && args[0] == "fetch-all")
{
while (true)
{
Console.WriteLine($"___\nFETCH {batch}");
var opts = new NatsJSFetchOpts
{
MaxMsgs = batch, Expires = expires, IdleHeartbeat = idle, Serializer = new RawDataSerializer(),
};
await foreach (var jsMsg in consumer.FetchAllAsync<RawData>(opts))
{
var msg = jsMsg.Msg;
Console.WriteLine($"data: {msg.Data}");
await jsMsg.AckAsync();
}
}
}
else if (args.Length > 0 && args[0] == "next")
{
while (true)
Expand All @@ -49,7 +66,7 @@
}
}
}
else
else if (args.Length > 0 && args[0] == "consume")
{
Console.WriteLine("___\nCONSUME");
await using var sub = await consumer.ConsumeAsync<RawData>(new NatsJSConsumeOpts
Expand All @@ -64,3 +81,18 @@
await jsMsg.AckAsync();
}
}
else if (args.Length > 0 && args[0] == "consume-all")
{
Console.WriteLine("___\nCONSUME-ALL");
var opts = new NatsJSConsumeOpts
{
MaxMsgs = batch, Expires = expires, IdleHeartbeat = idle, Serializer = new RawDataSerializer(),
};

await foreach (var jsMsg in consumer.ConsumeAllAsync<RawData>(opts))
{
var msg = jsMsg.Msg;
Console.WriteLine($"data: {msg.Data}");
await jsMsg.AckAsync();
}
}
23 changes: 23 additions & 0 deletions src/NATS.Client.JetStream/NatsJSConsumer.cs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
using System.Runtime.CompilerServices;
using System.Threading.Channels;
using NATS.Client.Core;
using NATS.Client.JetStream.Internal;
Expand Down Expand Up @@ -28,6 +29,17 @@ public async ValueTask<bool> DeleteAsync(CancellationToken cancellationToken = d
return _deleted = await _context.DeleteConsumerAsync(_stream, _consumer, cancellationToken);
}

public async IAsyncEnumerable<NatsJSMsg<T?>> ConsumeAllAsync<T>(
NatsJSConsumeOpts opts,
[EnumeratorCancellation] CancellationToken cancellationToken = default)
{
await using var cc = await ConsumeAsync<T>(opts, cancellationToken);
await foreach (var jsMsg in cc.Msgs.ReadAllAsync(cancellationToken))
{
yield return jsMsg;
}
}

public async ValueTask<INatsJSSubConsume<T>> ConsumeAsync<T>(NatsJSConsumeOpts opts, CancellationToken cancellationToken = default)
{
ThrowIfDeleted();
Expand Down Expand Up @@ -102,6 +114,17 @@ await sub.CallMsgNextAsync(
return default;
}

public async IAsyncEnumerable<NatsJSMsg<T?>> FetchAllAsync<T>(
NatsJSFetchOpts opts,
[EnumeratorCancellation] CancellationToken cancellationToken = default)
{
await using var fc = await FetchAsync<T>(opts, cancellationToken);
await foreach (var jsMsg in fc.Msgs.ReadAllAsync(cancellationToken))
{
yield return jsMsg;
}
}

public async ValueTask<INatsJSSubConsume<T>> FetchAsync<T>(
NatsJSFetchOpts opts,
CancellationToken cancellationToken = default)
Expand Down

0 comments on commit 5cf76c1

Please sign in to comment.