From 075f749e31d06a5bd41099a7bd9893493f0927dd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=94=D0=B5=D0=BC=D0=B8=D0=B4=D0=BE=D0=B2=20=D0=98=D0=B2?= =?UTF-8?q?=D0=B0=D0=BD?= Date: Sat, 23 Nov 2024 10:58:29 +0500 Subject: [PATCH] #636 - JetStream Batch Get Client support * Added implementation GetBatchDirectAsync --- src/NATS.Client.JetStream/INatsJSStream.cs | 6 ++--- src/NATS.Client.JetStream/NatsJSStream.cs | 29 ++++++++++++++++++++++ 2 files changed, 32 insertions(+), 3 deletions(-) diff --git a/src/NATS.Client.JetStream/INatsJSStream.cs b/src/NATS.Client.JetStream/INatsJSStream.cs index 7b041304d..840de8dd3 100644 --- a/src/NATS.Client.JetStream/INatsJSStream.cs +++ b/src/NATS.Client.JetStream/INatsJSStream.cs @@ -117,10 +117,10 @@ ValueTask UpdateAsync( /// /// Batch message request. /// Serializer to use for the message type. + /// true to send the last empty message with eobCode in the header; otherwise false /// A used to cancel the API call. - /// - /// - IAsyncEnumerable> GetBatchDirectAsync(StreamMsgBatchGetRequest request, INatsDeserialize? serializer = default, CancellationToken cancellationToken = default); + /// There was an issue, stream must have allow direct set. + IAsyncEnumerable> GetBatchDirectAsync(StreamMsgBatchGetRequest request, INatsDeserialize? serializer = default, bool includeEob = false, CancellationToken cancellationToken = default); ValueTask GetAsync(StreamMsgGetRequest request, CancellationToken cancellationToken = default); } diff --git a/src/NATS.Client.JetStream/NatsJSStream.cs b/src/NATS.Client.JetStream/NatsJSStream.cs index 54f8b63e0..50ab3da2f 100644 --- a/src/NATS.Client.JetStream/NatsJSStream.cs +++ b/src/NATS.Client.JetStream/NatsJSStream.cs @@ -181,6 +181,27 @@ public ValueTask> GetDirectAsync(StreamMsgGetRequest request, INat cancellationToken: cancellationToken); } + /// + /// Request a direct batch message + /// + /// Batch message request. + /// Serializer to use for the message type. + /// true to send the last empty message with eobCode in the header; otherwise false + /// A used to cancel the API call. + /// There was an issue, stream must have allow direct set. + public IAsyncEnumerable> GetBatchDirectAsync(StreamMsgBatchGetRequest request, INatsDeserialize? serializer = default, bool includeEob = false, CancellationToken cancellationToken = default) + { + ValidateStream(); + + return _context.Connection.RequestManyAsync( + subject: $"{_context.Opts.Prefix}.DIRECT.GET.{_name}", + data: request, + requestSerializer: NatsJSJsonSerializer.Default, + replySerializer: serializer, + replyOpts: new NatsSubOpts() { StopOnEmptyMsg = !includeEob, ThrowIfNoResponders = true }, + cancellationToken: cancellationToken); + } + public ValueTask GetAsync(StreamMsgGetRequest request, CancellationToken cancellationToken = default) => _context.JSRequestResponseAsync( subject: $"{_context.Opts.Prefix}.STREAM.MSG.GET.{_name}", @@ -192,4 +213,12 @@ private void ThrowIfDeleted() if (_deleted) throw new NatsJSException($"Stream '{_name}' is deleted"); } + + private void ValidateStream() + { + if (!Info.Config.AllowDirect) + { + throw new InvalidOperationException("StreamMsgBatchGetRequest is not permitted when AllowDirect on stream disable"); + } + } }