From c2547a7c79a0f27f5cb31c82db6717c4d8bcd294 Mon Sep 17 00:00:00 2001 From: Ziya Suzen Date: Tue, 29 Aug 2023 12:17:37 +0100 Subject: [PATCH] wip --- .../Example.JetStream.PullConsumer/Program.cs | 63 +++++++++++---- src/NATS.Client.JetStream/NatsJSConsumer.cs | 20 +++-- src/NATS.Client.JetStream/NatsJSOpts.cs | 9 +++ src/NATS.Client.JetStream/NatsJSSubConsume.cs | 4 +- src/NATS.Client.JetStream/NatsJSSubFetch.cs | 79 +++++++++++-------- 5 files changed, 119 insertions(+), 56 deletions(-) diff --git a/sandbox/Example.JetStream.PullConsumer/Program.cs b/sandbox/Example.JetStream.PullConsumer/Program.cs index 36904b296..7542c8b6a 100644 --- a/sandbox/Example.JetStream.PullConsumer/Program.cs +++ b/sandbox/Example.JetStream.PullConsumer/Program.cs @@ -1,11 +1,7 @@ -using System.Threading.Channels; -using Example.JetStream.PullConsumer; +using Example.JetStream.PullConsumer; using Microsoft.Extensions.Logging; using NATS.Client.Core; -using NATS.Client.Core.Internal; using NATS.Client.JetStream; -using NATS.Client.JetStream.Internal; -using NATS.Client.JetStream.Models; var options = NatsOptions.Default with { LoggerFactory = new MinimumConsoleLoggerFactory(LogLevel.Error) }; @@ -19,17 +15,52 @@ var expires = TimeSpan.FromSeconds(10); var batch = 10; -await using var sub = await consumer.ConsumeAsync(new NatsJSConsumeOpts +if (args.Length > 0 && args[0] == "fetch") { - MaxMsgs = batch, - Expires = expires, - IdleHeartbeat = idle, - Serializer = new RawDataSerializer(), -}); - -await foreach (var jsMsg in sub.Msgs.ReadAllAsync()) + while (true) + { + Console.WriteLine($"___\nFETCH {batch}"); + await using var sub = await consumer.FetchAsync(new NatsJSFetchOpts + { + MaxMsgs = batch, Expires = expires, IdleHeartbeat = idle, Serializer = new RawDataSerializer(), + }); + await foreach (var jsMsg in sub.Msgs.ReadAllAsync()) + { + var msg = jsMsg.Msg; + Console.WriteLine($"data: {msg.Data}"); + await jsMsg.AckAsync(); + } + } +} +else if (args.Length > 0 && args[0] == "next") { - var msg = jsMsg.Msg; - Console.WriteLine($"data: {msg.Data}"); - await jsMsg.AckAsync(); + while (true) + { + Console.WriteLine("___\nNEXT"); + var next = await consumer.NextAsync(new NatsJSNextOpts + { + Expires = expires, IdleHeartbeat = idle, Serializer = new RawDataSerializer(), + }); + if (next is { } jsMsg) + { + var msg = jsMsg.Msg; + Console.WriteLine($"data: {msg.Data}"); + await jsMsg.AckAsync(); + } + } +} +else +{ + Console.WriteLine("___\nCONSUME"); + await using var sub = await consumer.ConsumeAsync(new NatsJSConsumeOpts + { + MaxMsgs = batch, Expires = expires, IdleHeartbeat = idle, Serializer = new RawDataSerializer(), + }); + + await foreach (var jsMsg in sub.Msgs.ReadAllAsync()) + { + var msg = jsMsg.Msg; + Console.WriteLine($"data: {msg.Data}"); + await jsMsg.AckAsync(); + } } diff --git a/src/NATS.Client.JetStream/NatsJSConsumer.cs b/src/NATS.Client.JetStream/NatsJSConsumer.cs index 4811b33bc..b2fd50813 100644 --- a/src/NATS.Client.JetStream/NatsJSConsumer.cs +++ b/src/NATS.Client.JetStream/NatsJSConsumer.cs @@ -77,19 +77,29 @@ await sub.CallMsgNextAsync( cancellationToken); sub.ResetPending(); + sub.ResetHeartbeatTimer(); return sub; } - public async ValueTask> NextAsync(CancellationToken cancellationToken = default) + public async ValueTask?> NextAsync(NatsJSNextOpts opts, CancellationToken cancellationToken = default) { - await using var f = await FetchAsync(new NatsJSFetchOpts { MaxMsgs = 1 }, cancellationToken: cancellationToken); + await using var f = await FetchAsync( + new NatsJSFetchOpts + { + MaxMsgs = 1, + IdleHeartbeat = opts.IdleHeartbeat, + Expires = opts.Expires, + Serializer = opts.Serializer, + }, + cancellationToken: cancellationToken); + await foreach (var natsJSMsg in f.Msgs.ReadAllAsync(cancellationToken)) { return natsJSMsg; } - throw new NatsJSException("No data"); + return default; } public async ValueTask> FetchAsync( @@ -114,8 +124,6 @@ public async ValueTask> FetchAsync( Capacity = 1, FullMode = BoundedChannelFullMode.Wait, }, - MaxMsgs = (int)max.MaxMsgs, - Timeout = timeouts.Expires, }; var sub = new NatsJSSubFetch( @@ -144,6 +152,8 @@ await sub.CallMsgNextAsync( }, cancellationToken); + sub.ResetHeartbeatTimer(); + return sub; } diff --git a/src/NATS.Client.JetStream/NatsJSOpts.cs b/src/NATS.Client.JetStream/NatsJSOpts.cs index dfcc584f7..92450703b 100644 --- a/src/NATS.Client.JetStream/NatsJSOpts.cs +++ b/src/NATS.Client.JetStream/NatsJSOpts.cs @@ -81,6 +81,15 @@ public record NatsJSNextOpts /// Amount idle time the server should wait before sending a heartbeat. For requests with expires > 30s, heartbeats should be enabled by default /// public TimeSpan? IdleHeartbeat { get; init; } + + /// + /// Serializer to use to deserialize the message if a model is being used. + /// + /// + /// If not set, serializer set in connection options or the default JSON serializer + /// will be used. + /// + public INatsSerializer? Serializer { get; init; } } public record NatsJSFetchOpts diff --git a/src/NATS.Client.JetStream/NatsJSSubConsume.cs b/src/NATS.Client.JetStream/NatsJSSubConsume.cs index 9c852ef48..63878f288 100644 --- a/src/NATS.Client.JetStream/NatsJSSubConsume.cs +++ b/src/NATS.Client.JetStream/NatsJSSubConsume.cs @@ -91,6 +91,8 @@ public ValueTask CallMsgNextAsync(ConsumerGetnextRequest request, CancellationTo public void ResetPending() => _pending = _batch; + public void ResetHeartbeatTimer() => _timer.Change(_hbTimeout, Timeout.Infinite); + public override async ValueTask DisposeAsync() { await base.DisposeAsync(); @@ -204,8 +206,6 @@ protected override void TryComplete() _notifications.Writer.TryComplete(); } - private void ResetHeartbeatTimer() => _timer.Change(_hbTimeout, Timeout.Infinite); - private void CheckPending() { if (_pending <= _threshold) diff --git a/src/NATS.Client.JetStream/NatsJSSubFetch.cs b/src/NATS.Client.JetStream/NatsJSSubFetch.cs index 73a7dd2e9..e4f351380 100644 --- a/src/NATS.Client.JetStream/NatsJSSubFetch.cs +++ b/src/NATS.Client.JetStream/NatsJSSubFetch.cs @@ -27,6 +27,8 @@ public class NatsJSSubFetch : NatsSubBase, INatsJSSubConsume private readonly long _idle; private readonly long _hbTimeout; + private long _pending; + public NatsJSSubFetch( long batch, TimeSpan expires, @@ -50,6 +52,7 @@ public NatsJSSubFetch( _expires = expires.ToNanos(); _idle = idle.ToNanos(); _hbTimeout = (int)(idle * 2).TotalMilliseconds; + _pending = _batch; _userMsgs = Channel.CreateBounded>(NatsSub.GetChannelOptions(opts?.ChannelOptions)); Msgs = _userMsgs.Reader; @@ -79,6 +82,8 @@ public ValueTask CallMsgNextAsync(ConsumerGetnextRequest request, CancellationTo headers: default, cancellationToken); + public void ResetHeartbeatTimer() => _timer.Change(_hbTimeout, Timeout.Infinite); + public override async ValueTask DisposeAsync() { await base.DisposeAsync(); @@ -114,55 +119,65 @@ protected override ValueTask ReceiveInternalAsync( ReadOnlySequence? headersBuffer, ReadOnlySequence payloadBuffer) { - ResetHeartbeatTimer(); - if (subject == Subject) + try { - if (headersBuffer.HasValue) + ResetHeartbeatTimer(); + if (subject == Subject) { - var headers = new NatsHeaders(); - if (Connection.HeaderParser.ParseHeaders(new SequenceReader(headersBuffer.Value), headers)) + if (headersBuffer.HasValue) { - if (headers is { Code: 408, Message: NatsHeaders.Messages.RequestTimeout }) + var headers = new NatsHeaders(); + if (Connection.HeaderParser.ParseHeaders(new SequenceReader(headersBuffer.Value), headers)) { - } - else if (headers is { Code: 100, Message: NatsHeaders.Messages.IdleHeartbeat }) - { - EndSubscription(NatsSubEndReason.Timeout); + if (headers is { Code: 408, Message: NatsHeaders.Messages.RequestTimeout }) + { + EndSubscription(NatsSubEndReason.Timeout); + } + else if (headers is { Code: 100, Message: NatsHeaders.Messages.IdleHeartbeat }) + { + } + else + { + _notifications.Writer.TryWrite(new NatsJSNotification(headers.Code, headers.MessageText)); + } } else { - _notifications.Writer.TryWrite(new NatsJSNotification(headers.Code, headers.MessageText)); + _logger.LogError( + "Can't parse headers: {HeadersBuffer}", + Encoding.ASCII.GetString(headersBuffer.Value.ToArray())); + throw new NatsJSException("Can't parse headers"); } } else { - _logger.LogError( - "Can't parse headers: {HeadersBuffer}", - Encoding.ASCII.GetString(headersBuffer.Value.ToArray())); - throw new NatsJSException("Can't parse headers"); + throw new NatsJSException("No header found"); } + + return ValueTask.CompletedTask; } else { - throw new NatsJSException("No header found"); + var msg = new NatsJSMsg(NatsMsg.Build( + subject, + replyTo, + headersBuffer, + payloadBuffer, + Connection, + Connection.HeaderParser, + _serializer)); + + _pending--; + + return _userMsgs.Writer.WriteAsync(msg); } - - return ValueTask.CompletedTask; } - else + finally { - var msg = new NatsJSMsg(NatsMsg.Build( - subject, - replyTo, - headersBuffer, - payloadBuffer, - Connection, - Connection.HeaderParser, - _serializer)); - - DecrementMaxMsgs(); - - return _userMsgs.Writer.WriteAsync(msg); + if (_pending == 0) + { + EndSubscription(NatsSubEndReason.MaxMsgs); + } } } @@ -172,8 +187,6 @@ protected override void TryComplete() _notifications.Writer.TryComplete(); } - private void ResetHeartbeatTimer() => _timer.Change(_hbTimeout, Timeout.Infinite); - private async Task NotificationsLoop() { await foreach (var notification in _notifications.Reader.ReadAllAsync())