From 92e25cee945116a63afe85927f1978bf3b188a5b Mon Sep 17 00:00:00 2001 From: Simon Hoss Date: Fri, 29 Sep 2023 23:50:27 +0200 Subject: [PATCH 01/13] Add seq and datetime to msg parsed from reply string --- .../Internal/ReplyToDateTimeAndSeq.cs | 15 ++++++++++ src/NATS.Client.Core/NatsMsg.cs | 29 +++++++++++++++++-- .../Internal/ReplyToDateTimeAndSeqTest.cs | 13 +++++++++ 3 files changed, 55 insertions(+), 2 deletions(-) create mode 100644 src/NATS.Client.Core/Internal/ReplyToDateTimeAndSeq.cs create mode 100644 tests/NATS.Client.Core.Tests/Internal/ReplyToDateTimeAndSeqTest.cs diff --git a/src/NATS.Client.Core/Internal/ReplyToDateTimeAndSeq.cs b/src/NATS.Client.Core/Internal/ReplyToDateTimeAndSeq.cs new file mode 100644 index 000000000..4734024b1 --- /dev/null +++ b/src/NATS.Client.Core/Internal/ReplyToDateTimeAndSeq.cs @@ -0,0 +1,15 @@ +namespace NATS.Client.Core.Internal; + +internal static class ReplyToDateTimeAndSeq +{ + internal static (DateTimeOffset DateTime, long Seq) Parse(string reply) + { + var ackSeperated = reply.Split("."); + var timestamp = long.Parse(ackSeperated[^2]); + var offset = DateTimeOffset.FromUnixTimeMilliseconds(timestamp / 1000000); + var dateTime = new DateTimeOffset(offset.Ticks, TimeSpan.Zero); + var seq = long.Parse(ackSeperated[5]); + + return (dateTime, seq); + } +} diff --git a/src/NATS.Client.Core/NatsMsg.cs b/src/NATS.Client.Core/NatsMsg.cs index 67edc2b61..374991f94 100644 --- a/src/NATS.Client.Core/NatsMsg.cs +++ b/src/NATS.Client.Core/NatsMsg.cs @@ -1,5 +1,6 @@ using System.Buffers; using System.Diagnostics.CodeAnalysis; +using NATS.Client.Core.Internal; namespace NATS.Client.Core; @@ -24,6 +25,8 @@ namespace NATS.Client.Core; public readonly record struct NatsMsg( string Subject, string? ReplyTo, + long? Seq, + DateTimeOffset? DateTime, int Size, NatsHeaders? Headers, ReadOnlyMemory Data, @@ -50,12 +53,22 @@ internal static NatsMsg Build( headers.SetReadOnly(); } + long? seq = null; + DateTimeOffset? dateTime = null; + + if (replyTo != null) + { + var dateTimeAndSeq = ReplyToDateTimeAndSeq.Parse(replyTo); + seq = dateTimeAndSeq.Seq; + dateTime = dateTimeAndSeq.DateTime; + } + var size = subject.Length + (replyTo?.Length ?? 0) + (headersBuffer?.Length ?? 0) + payloadBuffer.Length; - return new NatsMsg(subject, replyTo, (int)size, headers, payloadBuffer.ToArray(), connection); + return new NatsMsg(subject, replyTo, seq, dateTime, (int) size, headers, payloadBuffer.ToArray(), connection); } /// @@ -129,6 +142,8 @@ private void CheckReplyPreconditions() public readonly record struct NatsMsg( string Subject, string? ReplyTo, + long? Seq, + DateTimeOffset? DateTime, int Size, NatsHeaders? Headers, T? Data, @@ -163,12 +178,22 @@ internal static NatsMsg Build( headers.SetReadOnly(); } + long? seq = null; + DateTimeOffset? dateTime = null; + + if (replyTo != null) + { + var dateTimeAndSeq = ReplyToDateTimeAndSeq.Parse(replyTo); + seq = dateTimeAndSeq.Seq; + dateTime = dateTimeAndSeq.DateTime; + } + var size = subject.Length + (replyTo?.Length ?? 0) + (headersBuffer?.Length ?? 0) + payloadBuffer.Length; - return new NatsMsg(subject, replyTo, (int)size, headers, data, connection); + return new NatsMsg(subject, replyTo, seq, dateTime, (int) size, headers, data, connection); } /// diff --git a/tests/NATS.Client.Core.Tests/Internal/ReplyToDateTimeAndSeqTest.cs b/tests/NATS.Client.Core.Tests/Internal/ReplyToDateTimeAndSeqTest.cs new file mode 100644 index 000000000..dcc6c27f3 --- /dev/null +++ b/tests/NATS.Client.Core.Tests/Internal/ReplyToDateTimeAndSeqTest.cs @@ -0,0 +1,13 @@ +namespace NATS.Client.Core.Tests.Internal; + +public class ReplyToDateTimeAndSeqTest +{ + [Fact] + public void ShouldParseReplyToDateTimeAndSeq() + { + var (dateTime, seq) = ReplyToDateTimeAndSeq.Parse("$JS.ACK.UnitTest.GetEvents_0.1.100.1.1696023331771188000.0"); + + dateTime.ToString("O").Should().Be("2023-09-29T21:35:31.7710000+00:00"); + seq.Should().Be(100); + } +} From 02ebb8c2bfae9bad73960d6f1c18ff0b34190723 Mon Sep 17 00:00:00 2001 From: Simon Hoss Date: Sat, 30 Sep 2023 00:09:57 +0200 Subject: [PATCH 02/13] Map NatsJSMsg Seq, DateTime to the underlying message --- src/NATS.Client.JetStream/NatsJSMsg.cs | 30 ++++++++++++++++++-------- 1 file changed, 21 insertions(+), 9 deletions(-) diff --git a/src/NATS.Client.JetStream/NatsJSMsg.cs b/src/NATS.Client.JetStream/NatsJSMsg.cs index c0a4dfc54..cdb68a30a 100644 --- a/src/NATS.Client.JetStream/NatsJSMsg.cs +++ b/src/NATS.Client.JetStream/NatsJSMsg.cs @@ -45,6 +45,16 @@ internal NatsJSMsg(NatsMsg msg, NatsJSContext context) /// public T? Data => _msg.Data; + /// + /// The sequence number of the message + /// + public long? Seq => _msg.Seq; + + /// + /// The time of the message + /// + public DateTimeOffset? DateTime => _msg.DateTime; + /// /// The connection messages was delivered on. /// @@ -56,7 +66,8 @@ internal NatsJSMsg(NatsMsg msg, NatsJSContext context) /// Ack options. /// A used to cancel the call. /// A representing the async call. - public ValueTask AckAsync(AckOpts opts = default, CancellationToken cancellationToken = default) => SendAckAsync(NatsJSConstants.Ack, opts, cancellationToken); + public ValueTask AckAsync(AckOpts opts = default, CancellationToken cancellationToken = default) => + SendAckAsync(NatsJSConstants.Ack, opts, cancellationToken); /// /// Signals that the message will not be processed now and processing can move onto the next message. @@ -67,7 +78,8 @@ internal NatsJSMsg(NatsMsg msg, NatsJSContext context) /// /// Messages rejected using NACK will be resent by the NATS JetStream server after the configured timeout. /// - public ValueTask NackAsync(AckOpts opts = default, CancellationToken cancellationToken = default) => SendAckAsync(NatsJSConstants.Nack, opts, cancellationToken); + public ValueTask NackAsync(AckOpts opts = default, CancellationToken cancellationToken = default) => + SendAckAsync(NatsJSConstants.Nack, opts, cancellationToken); /// /// Indicates that work is ongoing and the wait period should be extended. @@ -85,7 +97,8 @@ internal NatsJSMsg(NatsMsg msg, NatsJSContext context) /// by another amount of time equal to ack_wait by the NATS JetStream server. /// /// - public ValueTask AckProgressAsync(AckOpts opts = default, CancellationToken cancellationToken = default) => SendAckAsync(NatsJSConstants.AckProgress, opts, cancellationToken); + public ValueTask AckProgressAsync(AckOpts opts = default, CancellationToken cancellationToken = default) => + SendAckAsync(NatsJSConstants.AckProgress, opts, cancellationToken); /// /// Instructs the server to stop redelivery of the message without acknowledging it as successfully processed. @@ -93,19 +106,18 @@ internal NatsJSMsg(NatsMsg msg, NatsJSContext context) /// Ack options. /// A used to cancel the call. /// A representing the async call. - public ValueTask AckTerminateAsync(AckOpts opts = default, CancellationToken cancellationToken = default) => SendAckAsync(NatsJSConstants.AckTerminate, opts, cancellationToken); + public ValueTask AckTerminateAsync(AckOpts opts = default, CancellationToken cancellationToken = default) => + SendAckAsync(NatsJSConstants.AckTerminate, opts, cancellationToken); - private ValueTask SendAckAsync(ReadOnlySequence payload, AckOpts opts = default, CancellationToken cancellationToken = default) + private ValueTask SendAckAsync(ReadOnlySequence payload, AckOpts opts = default, + CancellationToken cancellationToken = default) { if (_msg == default) throw new NatsJSException("No user message, can't acknowledge"); return _msg.ReplyAsync( payload: payload, - opts: new NatsPubOpts - { - WaitUntilSent = opts.WaitUntilSent ?? _context.Opts.AckOpts.WaitUntilSent, - }, + opts: new NatsPubOpts {WaitUntilSent = opts.WaitUntilSent ?? _context.Opts.AckOpts.WaitUntilSent,}, cancellationToken: cancellationToken); } } From 3c5abccbc066e4fdc2889b4c58b27131ee664eba Mon Sep 17 00:00:00 2001 From: Simon Hoss Date: Sat, 30 Sep 2023 00:12:33 +0200 Subject: [PATCH 03/13] Correct formatting --- src/NATS.Client.JetStream/NatsJSMsg.cs | 32 ++++++++++++-------------- 1 file changed, 15 insertions(+), 17 deletions(-) diff --git a/src/NATS.Client.JetStream/NatsJSMsg.cs b/src/NATS.Client.JetStream/NatsJSMsg.cs index cdb68a30a..03b735f24 100644 --- a/src/NATS.Client.JetStream/NatsJSMsg.cs +++ b/src/NATS.Client.JetStream/NatsJSMsg.cs @@ -46,19 +46,19 @@ internal NatsJSMsg(NatsMsg msg, NatsJSContext context) public T? Data => _msg.Data; /// - /// The sequence number of the message + /// The connection messages was delivered on. /// - public long? Seq => _msg.Seq; + public INatsConnection? Connection => _msg.Connection; /// - /// The time of the message + /// The sequence number of the message. /// - public DateTimeOffset? DateTime => _msg.DateTime; + public long? Sequence => _msg.Seq; /// - /// The connection messages was delivered on. + /// The time of the message. /// - public INatsConnection? Connection => _msg.Connection; + public DateTimeOffset? DateTime => _msg.DateTime; /// /// Acknowledges the message was completely handled. @@ -66,8 +66,7 @@ internal NatsJSMsg(NatsMsg msg, NatsJSContext context) /// Ack options. /// A used to cancel the call. /// A representing the async call. - public ValueTask AckAsync(AckOpts opts = default, CancellationToken cancellationToken = default) => - SendAckAsync(NatsJSConstants.Ack, opts, cancellationToken); + public ValueTask AckAsync(AckOpts opts = default, CancellationToken cancellationToken = default) => SendAckAsync(NatsJSConstants.Ack, opts, cancellationToken); /// /// Signals that the message will not be processed now and processing can move onto the next message. @@ -78,8 +77,7 @@ public ValueTask AckAsync(AckOpts opts = default, CancellationToken cancellation /// /// Messages rejected using NACK will be resent by the NATS JetStream server after the configured timeout. /// - public ValueTask NackAsync(AckOpts opts = default, CancellationToken cancellationToken = default) => - SendAckAsync(NatsJSConstants.Nack, opts, cancellationToken); + public ValueTask NackAsync(AckOpts opts = default, CancellationToken cancellationToken = default) => SendAckAsync(NatsJSConstants.Nack, opts, cancellationToken); /// /// Indicates that work is ongoing and the wait period should be extended. @@ -97,8 +95,7 @@ public ValueTask NackAsync(AckOpts opts = default, CancellationToken cancellatio /// by another amount of time equal to ack_wait by the NATS JetStream server. /// /// - public ValueTask AckProgressAsync(AckOpts opts = default, CancellationToken cancellationToken = default) => - SendAckAsync(NatsJSConstants.AckProgress, opts, cancellationToken); + public ValueTask AckProgressAsync(AckOpts opts = default, CancellationToken cancellationToken = default) => SendAckAsync(NatsJSConstants.AckProgress, opts, cancellationToken); /// /// Instructs the server to stop redelivery of the message without acknowledging it as successfully processed. @@ -106,18 +103,19 @@ public ValueTask AckProgressAsync(AckOpts opts = default, CancellationToken canc /// Ack options. /// A used to cancel the call. /// A representing the async call. - public ValueTask AckTerminateAsync(AckOpts opts = default, CancellationToken cancellationToken = default) => - SendAckAsync(NatsJSConstants.AckTerminate, opts, cancellationToken); + public ValueTask AckTerminateAsync(AckOpts opts = default, CancellationToken cancellationToken = default) => SendAckAsync(NatsJSConstants.AckTerminate, opts, cancellationToken); - private ValueTask SendAckAsync(ReadOnlySequence payload, AckOpts opts = default, - CancellationToken cancellationToken = default) + private ValueTask SendAckAsync(ReadOnlySequence payload, AckOpts opts = default, CancellationToken cancellationToken = default) { if (_msg == default) throw new NatsJSException("No user message, can't acknowledge"); return _msg.ReplyAsync( payload: payload, - opts: new NatsPubOpts {WaitUntilSent = opts.WaitUntilSent ?? _context.Opts.AckOpts.WaitUntilSent,}, + opts: new NatsPubOpts + { + WaitUntilSent = opts.WaitUntilSent ?? _context.Opts.AckOpts.WaitUntilSent, + }, cancellationToken: cancellationToken); } } From 76ff2bd408ee06a5764fd98a65d55dd07a61f822 Mon Sep 17 00:00:00 2001 From: Simon Hoss Date: Sat, 30 Sep 2023 13:16:11 +0200 Subject: [PATCH 04/13] Implement Msg DateTime and Sequence only in JetStream --- .editorconfig | 1 + .../Internal/ReplyToDateTimeAndSeq.cs | 15 --------- src/NATS.Client.Core/NatsMsg.cs | 29 ++-------------- .../Internal/ReplyToDateTimeAndSeq.cs | 28 ++++++++++++++++ src/NATS.Client.JetStream/NatsJSMsg.cs | 11 +++---- .../Internal/ReplyToDateTimeAndSeqTest.cs | 13 -------- .../Internal/ReplyToDateTimeAndSeqTest.cs | 33 +++++++++++++++++++ 7 files changed, 69 insertions(+), 61 deletions(-) delete mode 100644 src/NATS.Client.Core/Internal/ReplyToDateTimeAndSeq.cs create mode 100644 src/NATS.Client.JetStream/Internal/ReplyToDateTimeAndSeq.cs delete mode 100644 tests/NATS.Client.Core.Tests/Internal/ReplyToDateTimeAndSeqTest.cs create mode 100644 tests/NATS.Client.JetStream.Tests/Internal/ReplyToDateTimeAndSeqTest.cs diff --git a/.editorconfig b/.editorconfig index a41b130ef..408cdcc41 100644 --- a/.editorconfig +++ b/.editorconfig @@ -13,6 +13,7 @@ generated_code = true [*.cs] indent_size = 4 +max_line_length = 300 # changes from VS2019 defaults csharp_style_namespace_declarations = file_scoped diff --git a/src/NATS.Client.Core/Internal/ReplyToDateTimeAndSeq.cs b/src/NATS.Client.Core/Internal/ReplyToDateTimeAndSeq.cs deleted file mode 100644 index 4734024b1..000000000 --- a/src/NATS.Client.Core/Internal/ReplyToDateTimeAndSeq.cs +++ /dev/null @@ -1,15 +0,0 @@ -namespace NATS.Client.Core.Internal; - -internal static class ReplyToDateTimeAndSeq -{ - internal static (DateTimeOffset DateTime, long Seq) Parse(string reply) - { - var ackSeperated = reply.Split("."); - var timestamp = long.Parse(ackSeperated[^2]); - var offset = DateTimeOffset.FromUnixTimeMilliseconds(timestamp / 1000000); - var dateTime = new DateTimeOffset(offset.Ticks, TimeSpan.Zero); - var seq = long.Parse(ackSeperated[5]); - - return (dateTime, seq); - } -} diff --git a/src/NATS.Client.Core/NatsMsg.cs b/src/NATS.Client.Core/NatsMsg.cs index 374991f94..67edc2b61 100644 --- a/src/NATS.Client.Core/NatsMsg.cs +++ b/src/NATS.Client.Core/NatsMsg.cs @@ -1,6 +1,5 @@ using System.Buffers; using System.Diagnostics.CodeAnalysis; -using NATS.Client.Core.Internal; namespace NATS.Client.Core; @@ -25,8 +24,6 @@ namespace NATS.Client.Core; public readonly record struct NatsMsg( string Subject, string? ReplyTo, - long? Seq, - DateTimeOffset? DateTime, int Size, NatsHeaders? Headers, ReadOnlyMemory Data, @@ -53,22 +50,12 @@ internal static NatsMsg Build( headers.SetReadOnly(); } - long? seq = null; - DateTimeOffset? dateTime = null; - - if (replyTo != null) - { - var dateTimeAndSeq = ReplyToDateTimeAndSeq.Parse(replyTo); - seq = dateTimeAndSeq.Seq; - dateTime = dateTimeAndSeq.DateTime; - } - var size = subject.Length + (replyTo?.Length ?? 0) + (headersBuffer?.Length ?? 0) + payloadBuffer.Length; - return new NatsMsg(subject, replyTo, seq, dateTime, (int) size, headers, payloadBuffer.ToArray(), connection); + return new NatsMsg(subject, replyTo, (int)size, headers, payloadBuffer.ToArray(), connection); } /// @@ -142,8 +129,6 @@ private void CheckReplyPreconditions() public readonly record struct NatsMsg( string Subject, string? ReplyTo, - long? Seq, - DateTimeOffset? DateTime, int Size, NatsHeaders? Headers, T? Data, @@ -178,22 +163,12 @@ internal static NatsMsg Build( headers.SetReadOnly(); } - long? seq = null; - DateTimeOffset? dateTime = null; - - if (replyTo != null) - { - var dateTimeAndSeq = ReplyToDateTimeAndSeq.Parse(replyTo); - seq = dateTimeAndSeq.Seq; - dateTime = dateTimeAndSeq.DateTime; - } - var size = subject.Length + (replyTo?.Length ?? 0) + (headersBuffer?.Length ?? 0) + payloadBuffer.Length; - return new NatsMsg(subject, replyTo, seq, dateTime, (int) size, headers, data, connection); + return new NatsMsg(subject, replyTo, (int)size, headers, data, connection); } /// diff --git a/src/NATS.Client.JetStream/Internal/ReplyToDateTimeAndSeq.cs b/src/NATS.Client.JetStream/Internal/ReplyToDateTimeAndSeq.cs new file mode 100644 index 000000000..035858c9f --- /dev/null +++ b/src/NATS.Client.JetStream/Internal/ReplyToDateTimeAndSeq.cs @@ -0,0 +1,28 @@ +namespace NATS.Client.JetStream.Internal; + +internal static class ReplyToDateTimeAndSeq +{ + internal static (DateTimeOffset? DateTime, long? Seq) Parse(string? reply) + { + if (reply == null) + { + return (null, null); + } + + var ackSeperated = reply.Split("."); + + // It must be seperated by 9 dots as defined in + // https://docs.nats.io/reference/reference-protocols/nats_api_reference#acknowledging-messages + if (ackSeperated.Length != 9) + { + return (null, null); + } + + var timestamp = long.Parse(ackSeperated[^2]); + var offset = DateTimeOffset.FromUnixTimeMilliseconds(timestamp / 1000000); + var dateTime = new DateTimeOffset(offset.Ticks, TimeSpan.Zero); + var seq = long.Parse(ackSeperated[5]); + + return (dateTime, seq); + } +} diff --git a/src/NATS.Client.JetStream/NatsJSMsg.cs b/src/NATS.Client.JetStream/NatsJSMsg.cs index 03b735f24..6fde0aa71 100644 --- a/src/NATS.Client.JetStream/NatsJSMsg.cs +++ b/src/NATS.Client.JetStream/NatsJSMsg.cs @@ -12,11 +12,13 @@ public readonly struct NatsJSMsg { private readonly NatsJSContext _context; private readonly NatsMsg _msg; + private readonly Lazy<(DateTimeOffset? DateTime, long? Sequence)> _replyToDateTimeAndSeq; internal NatsJSMsg(NatsMsg msg, NatsJSContext context) { _msg = msg; _context = context; + _replyToDateTimeAndSeq = new Lazy<(DateTimeOffset? DateTime, long? Sequnce)>(() => ReplyToDateTimeAndSeq.Parse(msg.ReplyTo)); } /// @@ -53,12 +55,12 @@ internal NatsJSMsg(NatsMsg msg, NatsJSContext context) /// /// The sequence number of the message. /// - public long? Sequence => _msg.Seq; + public long? Sequence => _replyToDateTimeAndSeq.Value.Sequence; /// /// The time of the message. /// - public DateTimeOffset? DateTime => _msg.DateTime; + public DateTimeOffset? DateTime => _replyToDateTimeAndSeq.Value.DateTime; /// /// Acknowledges the message was completely handled. @@ -112,10 +114,7 @@ private ValueTask SendAckAsync(ReadOnlySequence payload, AckOpts opts = de return _msg.ReplyAsync( payload: payload, - opts: new NatsPubOpts - { - WaitUntilSent = opts.WaitUntilSent ?? _context.Opts.AckOpts.WaitUntilSent, - }, + opts: new NatsPubOpts {WaitUntilSent = opts.WaitUntilSent ?? _context.Opts.AckOpts.WaitUntilSent,}, cancellationToken: cancellationToken); } } diff --git a/tests/NATS.Client.Core.Tests/Internal/ReplyToDateTimeAndSeqTest.cs b/tests/NATS.Client.Core.Tests/Internal/ReplyToDateTimeAndSeqTest.cs deleted file mode 100644 index dcc6c27f3..000000000 --- a/tests/NATS.Client.Core.Tests/Internal/ReplyToDateTimeAndSeqTest.cs +++ /dev/null @@ -1,13 +0,0 @@ -namespace NATS.Client.Core.Tests.Internal; - -public class ReplyToDateTimeAndSeqTest -{ - [Fact] - public void ShouldParseReplyToDateTimeAndSeq() - { - var (dateTime, seq) = ReplyToDateTimeAndSeq.Parse("$JS.ACK.UnitTest.GetEvents_0.1.100.1.1696023331771188000.0"); - - dateTime.ToString("O").Should().Be("2023-09-29T21:35:31.7710000+00:00"); - seq.Should().Be(100); - } -} diff --git a/tests/NATS.Client.JetStream.Tests/Internal/ReplyToDateTimeAndSeqTest.cs b/tests/NATS.Client.JetStream.Tests/Internal/ReplyToDateTimeAndSeqTest.cs new file mode 100644 index 000000000..7c9a64bb8 --- /dev/null +++ b/tests/NATS.Client.JetStream.Tests/Internal/ReplyToDateTimeAndSeqTest.cs @@ -0,0 +1,33 @@ +using NATS.Client.JetStream.Internal; + +namespace NATS.Client.JetStream.Tests.Internal; + +public class ReplyToDateTimeAndSeqTest +{ + [Fact] + public void ShouldParseReplyToDateTimeAndSeq() + { + var (dateTime, seq) = ReplyToDateTimeAndSeq.Parse("$JS.ACK.UnitTest.GetEvents_0.1.100.1.1696023331771188000.0"); + + dateTime!.Value.ToString("O").Should().Be("2023-09-29T21:35:31.7710000+00:00"); + seq.Should().Be(100); + } + + [Fact] + public void ShouldSetNullForReturnWhenReplyToIsNull() + { + var (dateTime, seq) = ReplyToDateTimeAndSeq.Parse(null); + + dateTime.Should().BeNull(); + seq.Should().BeNull(); + } + + [Fact] + public void ShouldSetNullWhenReplyToIsASimpleReqResponse() + { + var (dateTime, seq) = ReplyToDateTimeAndSeq.Parse("_INBOX.1"); + + dateTime.Should().BeNull(); + seq.Should().BeNull(); + } +} From 298cc5ec9fbcac576b8140c224a05dc8b8f1653d Mon Sep 17 00:00:00 2001 From: Simon Hoss Date: Sat, 30 Sep 2023 18:37:52 +0200 Subject: [PATCH 05/13] Parse reply based on official JSAck ADR --- .../Internal/ReplyToDateTimeAndSeq.cs | 66 ++++++++++++++++--- src/NATS.Client.JetStream/NatsJSMsg.cs | 13 ++-- .../NatsJSMsgMetadata.cs | 38 +++++++++++ .../Internal/ReplyToDateTimeAndSeqTest.cs | 47 ++++++++++--- 4 files changed, 135 insertions(+), 29 deletions(-) create mode 100644 src/NATS.Client.JetStream/NatsJSMsgMetadata.cs diff --git a/src/NATS.Client.JetStream/Internal/ReplyToDateTimeAndSeq.cs b/src/NATS.Client.JetStream/Internal/ReplyToDateTimeAndSeq.cs index 035858c9f..90e284c63 100644 --- a/src/NATS.Client.JetStream/Internal/ReplyToDateTimeAndSeq.cs +++ b/src/NATS.Client.JetStream/Internal/ReplyToDateTimeAndSeq.cs @@ -2,27 +2,73 @@ namespace NATS.Client.JetStream.Internal; internal static class ReplyToDateTimeAndSeq { - internal static (DateTimeOffset? DateTime, long? Seq) Parse(string? reply) + private const int V1TokenCounts = 9; + private const int V2TokenCounts = 12; + + private const int AckDomainTokenPos = 2; + private const int AckAccHashTokenPos = 3; + private const int AckStreamTokenPos = 4; + private const int AckConsumerTokenPos = 5; + private const int AckNumDeliveredTokenPos = 6; + private const int AckStreamSeqTokenPos = 7; + private const int AckConsumerSeqTokenPos = 8; + private const int AckTimestampSeqTokenPos = 9; + private const int AckNumPendingTokenPos = 10; + + internal static NatsJSMsgMetadata? Parse(string? reply) { if (reply == null) { - return (null, null); + return null; + } + + var originalTokens = reply.Split(".").AsSpan(); + var tokensLength = originalTokens.Length; + + // Parsed based on https://github.com/nats-io/nats-architecture-and-design/blob/main/adr/ADR-15.md#jsack + + // If lower than 9 or more than 9 but less than 11, report an error + if (tokensLength is < V1TokenCounts or > V1TokenCounts and < V2TokenCounts - 1) + { + return null; } - var ackSeperated = reply.Split("."); + if (originalTokens[0] != "$JS" || originalTokens[1] != "ACK") + { + return null; + } - // It must be seperated by 9 dots as defined in - // https://docs.nats.io/reference/reference-protocols/nats_api_reference#acknowledging-messages - if (ackSeperated.Length != 9) + var tokens = new string[V2TokenCounts].AsSpan(); + + if (tokensLength == V1TokenCounts) { - return (null, null); + originalTokens[..2].CopyTo(tokens); + originalTokens[2..].CopyTo(tokens[4..]); + + tokens[AckDomainTokenPos] = string.Empty; + tokens[AckAccHashTokenPos] = string.Empty; + } + else + { + tokens = originalTokens; + + if (tokens[AckDomainTokenPos] == "_") + { + tokens[AckDomainTokenPos] = string.Empty; + } } - var timestamp = long.Parse(ackSeperated[^2]); + var timestamp = long.Parse(tokens[AckTimestampSeqTokenPos]); var offset = DateTimeOffset.FromUnixTimeMilliseconds(timestamp / 1000000); var dateTime = new DateTimeOffset(offset.Ticks, TimeSpan.Zero); - var seq = long.Parse(ackSeperated[5]); - return (dateTime, seq); + return new NatsJSMsgMetadata( + new NatsJSSequencePair(ulong.Parse(tokens[AckStreamSeqTokenPos]), ulong.Parse(tokens[AckConsumerSeqTokenPos])), + ulong.Parse(tokens[AckNumDeliveredTokenPos]), + ulong.Parse(tokens[AckNumPendingTokenPos]), + dateTime, + tokens[AckStreamTokenPos], + tokens[AckConsumerTokenPos], + tokens[AckDomainTokenPos]); } } diff --git a/src/NATS.Client.JetStream/NatsJSMsg.cs b/src/NATS.Client.JetStream/NatsJSMsg.cs index 6fde0aa71..ed4e49149 100644 --- a/src/NATS.Client.JetStream/NatsJSMsg.cs +++ b/src/NATS.Client.JetStream/NatsJSMsg.cs @@ -12,13 +12,13 @@ public readonly struct NatsJSMsg { private readonly NatsJSContext _context; private readonly NatsMsg _msg; - private readonly Lazy<(DateTimeOffset? DateTime, long? Sequence)> _replyToDateTimeAndSeq; + private readonly Lazy _replyToDateTimeAndSeq; internal NatsJSMsg(NatsMsg msg, NatsJSContext context) { _msg = msg; _context = context; - _replyToDateTimeAndSeq = new Lazy<(DateTimeOffset? DateTime, long? Sequnce)>(() => ReplyToDateTimeAndSeq.Parse(msg.ReplyTo)); + _replyToDateTimeAndSeq = new Lazy(() => ReplyToDateTimeAndSeq.Parse(msg.ReplyTo)); } /// @@ -53,14 +53,9 @@ internal NatsJSMsg(NatsMsg msg, NatsJSContext context) public INatsConnection? Connection => _msg.Connection; /// - /// The sequence number of the message. + /// Additional metadata about the message. /// - public long? Sequence => _replyToDateTimeAndSeq.Value.Sequence; - - /// - /// The time of the message. - /// - public DateTimeOffset? DateTime => _replyToDateTimeAndSeq.Value.DateTime; + public NatsJSMsgMetadata? Metadata => _replyToDateTimeAndSeq.Value; /// /// Acknowledges the message was completely handled. diff --git a/src/NATS.Client.JetStream/NatsJSMsgMetadata.cs b/src/NATS.Client.JetStream/NatsJSMsgMetadata.cs new file mode 100644 index 000000000..e31341a5d --- /dev/null +++ b/src/NATS.Client.JetStream/NatsJSMsgMetadata.cs @@ -0,0 +1,38 @@ +namespace NATS.Client.JetStream; + +/// +/// Additional metadata about the message. +/// +/// +/// The sequence pair for the message. +/// +/// +/// The number of times the message was delivered. +/// +/// +/// The number of messages pending for the consumer. +/// +/// +/// The timestamp of the message. +/// +/// +/// The stream the message was sent to. +/// +/// +/// The consumer the message was sent to. +/// +/// +/// The domain the message was sent to. +/// +public readonly record struct NatsJSMsgMetadata(NatsJSSequencePair Sequence, ulong NumDelivered, ulong NumPending, DateTimeOffset Timestamp, string Stream, string Consumer, string Domain); + +/// +/// The sequence pair for the message. +/// +/// +/// The stream sequence number. +/// +/// +/// The consumer sequence number. +/// +public readonly record struct NatsJSSequencePair(ulong Stream, ulong Consumer); diff --git a/tests/NATS.Client.JetStream.Tests/Internal/ReplyToDateTimeAndSeqTest.cs b/tests/NATS.Client.JetStream.Tests/Internal/ReplyToDateTimeAndSeqTest.cs index 7c9a64bb8..bf1118e5c 100644 --- a/tests/NATS.Client.JetStream.Tests/Internal/ReplyToDateTimeAndSeqTest.cs +++ b/tests/NATS.Client.JetStream.Tests/Internal/ReplyToDateTimeAndSeqTest.cs @@ -5,29 +5,56 @@ namespace NATS.Client.JetStream.Tests.Internal; public class ReplyToDateTimeAndSeqTest { [Fact] - public void ShouldParseReplyToDateTimeAndSeq() + public void ShouldParseV1ReplyToDateTimeAndSeq() { - var (dateTime, seq) = ReplyToDateTimeAndSeq.Parse("$JS.ACK.UnitTest.GetEvents_0.1.100.1.1696023331771188000.0"); + var natsJSMsgMetadata = ReplyToDateTimeAndSeq.Parse("$JS.ACK.UnitTest.GetEvents_0.1.100.1.1696023331771188000.0"); - dateTime!.Value.ToString("O").Should().Be("2023-09-29T21:35:31.7710000+00:00"); - seq.Should().Be(100); + natsJSMsgMetadata!.Value.Timestamp.ToString("O").Should().Be("2023-09-29T21:35:31.7710000+00:00"); + natsJSMsgMetadata.Value.Sequence.Stream.Should().Be(100); + natsJSMsgMetadata.Value.Sequence.Consumer.Should().Be(1); + natsJSMsgMetadata.Value.NumDelivered.Should().Be(1); + natsJSMsgMetadata.Value.NumPending.Should().Be(0); + natsJSMsgMetadata.Value.Stream.Should().Be("UnitTest"); + natsJSMsgMetadata.Value.Consumer.Should().Be("GetEvents_0"); + natsJSMsgMetadata.Value.Domain.Should().BeEmpty(); + } + + [Fact] + public void ShouldParseV2ReplyToDateTimeAndSeq() + { + var natsJSMsgMetadata = ReplyToDateTimeAndSeq.Parse("$JS.ACK.MyDomain.1234.UnitTest.GetEvents_0.1.100.1.1696023331771188000.0"); + + natsJSMsgMetadata!.Value.Timestamp.ToString("O").Should().Be("2023-09-29T21:35:31.7710000+00:00"); + natsJSMsgMetadata.Value.Sequence.Stream.Should().Be(100); + natsJSMsgMetadata.Value.Sequence.Consumer.Should().Be(1); + natsJSMsgMetadata.Value.NumDelivered.Should().Be(1); + natsJSMsgMetadata.Value.NumPending.Should().Be(0); + natsJSMsgMetadata.Value.Stream.Should().Be("UnitTest"); + natsJSMsgMetadata.Value.Consumer.Should().Be("GetEvents_0"); + natsJSMsgMetadata.Value.Domain.Should().Be("MyDomain"); } [Fact] public void ShouldSetNullForReturnWhenReplyToIsNull() { - var (dateTime, seq) = ReplyToDateTimeAndSeq.Parse(null); + var natsJSMsgMetadata = ReplyToDateTimeAndSeq.Parse(null); - dateTime.Should().BeNull(); - seq.Should().BeNull(); + natsJSMsgMetadata.Should().BeNull(); } [Fact] public void ShouldSetNullWhenReplyToIsASimpleReqResponse() { - var (dateTime, seq) = ReplyToDateTimeAndSeq.Parse("_INBOX.1"); + var natsJSMsgMetadata = ReplyToDateTimeAndSeq.Parse("_INBOX.1"); + + natsJSMsgMetadata.Should().BeNull(); + } + + [Fact] + public void ShouldSetNullWhenDoesNotStartWithJsAck() + { + var natsJSMsgMetadata = ReplyToDateTimeAndSeq.Parse("1.2.3.4.5.6.7.8.9"); - dateTime.Should().BeNull(); - seq.Should().BeNull(); + natsJSMsgMetadata.Should().BeNull(); } } From 4ac03bb3bc762aafd3fbbe24d1215cbdb4395486 Mon Sep 17 00:00:00 2001 From: Simon Hoss Date: Thu, 12 Oct 2023 21:12:57 +0200 Subject: [PATCH 06/13] Add no_wait for consumer --- src/NATS.Client.Core/NatsSubBase.cs | 1 + src/NATS.Client.JetStream/INatsJSFetch.cs | 6 ++ .../Internal/NatsJSFetch.cs | 6 +- src/NATS.Client.JetStream/NatsJSConsumer.cs | 19 +++--- src/NATS.Client.JetStream/NatsJSOpts.cs | 5 ++ tests/NATS.Client.JetStream.Tests/ViewTest.cs | 60 +++++++++++++++++++ 6 files changed, 89 insertions(+), 8 deletions(-) create mode 100644 tests/NATS.Client.JetStream.Tests/ViewTest.cs diff --git a/src/NATS.Client.Core/NatsSubBase.cs b/src/NATS.Client.Core/NatsSubBase.cs index 1eb6164c8..fdcb8559b 100644 --- a/src/NATS.Client.Core/NatsSubBase.cs +++ b/src/NATS.Client.Core/NatsSubBase.cs @@ -10,6 +10,7 @@ namespace NATS.Client.Core; public enum NatsSubEndReason { None, + NoMsgs, MaxMsgs, MaxBytes, Timeout, diff --git a/src/NATS.Client.JetStream/INatsJSFetch.cs b/src/NATS.Client.JetStream/INatsJSFetch.cs index e835678e2..311d003cb 100644 --- a/src/NATS.Client.JetStream/INatsJSFetch.cs +++ b/src/NATS.Client.JetStream/INatsJSFetch.cs @@ -1,4 +1,5 @@ using System.Threading.Channels; +using NATS.Client.Core; namespace NATS.Client.JetStream; @@ -7,6 +8,11 @@ namespace NATS.Client.JetStream; /// public interface INatsJSFetch : IAsyncDisposable { + /// + /// The reason why the fetch has ended + /// + NatsSubEndReason EndReason { get; } + void Stop(); } diff --git a/src/NATS.Client.JetStream/Internal/NatsJSFetch.cs b/src/NATS.Client.JetStream/Internal/NatsJSFetch.cs index 08fcf32e9..0c9c1075d 100644 --- a/src/NATS.Client.JetStream/Internal/NatsJSFetch.cs +++ b/src/NATS.Client.JetStream/Internal/NatsJSFetch.cs @@ -169,7 +169,11 @@ protected override async ValueTask ReceiveInternalAsync( var headers = new NatsHeaders(); if (Connection.HeaderParser.ParseHeaders(new SequenceReader(headersBuffer.Value), headers)) { - if (headers is { Code: 408, Message: NatsHeaders.Messages.RequestTimeout }) + if (headers is { Code: 404 }) + { + EndSubscription(NatsSubEndReason.NoMsgs); + } + else if (headers is { Code: 408, Message: NatsHeaders.Messages.RequestTimeout }) { EndSubscription(NatsSubEndReason.Timeout); } diff --git a/src/NATS.Client.JetStream/NatsJSConsumer.cs b/src/NATS.Client.JetStream/NatsJSConsumer.cs index 800435288..b8ce51cc3 100644 --- a/src/NATS.Client.JetStream/NatsJSConsumer.cs +++ b/src/NATS.Client.JetStream/NatsJSConsumer.cs @@ -266,13 +266,18 @@ await _context.Connection.SubAsync( cancellationToken); await sub.CallMsgNextAsync( - new ConsumerGetnextRequest - { - Batch = max.MaxMsgs, - MaxBytes = max.MaxBytes, - IdleHeartbeat = timeouts.IdleHeartbeat.ToNanos(), - Expires = timeouts.Expires.ToNanos(), - }, + opts.NoWait + + // When no wait is set we don't need to send the idle heartbeat and expiration, because if no message is available nats doesn't send 404 instantly + ? new ConsumerGetnextRequest { Batch = max.MaxMsgs, MaxBytes = max.MaxBytes, NoWait = opts.NoWait } + : new ConsumerGetnextRequest + { + Batch = max.MaxMsgs, + MaxBytes = max.MaxBytes, + IdleHeartbeat = timeouts.IdleHeartbeat.ToNanos(), + Expires = timeouts.Expires.ToNanos(), + NoWait = opts.NoWait, + }, cancellationToken); sub.ResetHeartbeatTimer(); diff --git a/src/NATS.Client.JetStream/NatsJSOpts.cs b/src/NATS.Client.JetStream/NatsJSOpts.cs index c2bd7d6b4..86b58a007 100644 --- a/src/NATS.Client.JetStream/NatsJSOpts.cs +++ b/src/NATS.Client.JetStream/NatsJSOpts.cs @@ -154,6 +154,11 @@ public record NatsJSFetchOpts /// public TimeSpan? IdleHeartbeat { get; init; } + /// + /// Does not wait for messages to be available + /// + public bool NoWait { get; init; } + /// /// Serializer to use to deserialize the message if a model is being used. /// diff --git a/tests/NATS.Client.JetStream.Tests/ViewTest.cs b/tests/NATS.Client.JetStream.Tests/ViewTest.cs new file mode 100644 index 000000000..39b49b9ce --- /dev/null +++ b/tests/NATS.Client.JetStream.Tests/ViewTest.cs @@ -0,0 +1,60 @@ +using NATS.Client.Core.Tests; + +namespace NATS.Client.JetStream.Tests; + +public class ViewTest +{ + private readonly ITestOutputHelper _output; + + public ViewTest(ITestOutputHelper output) + { + _output = output; + } + + [Fact] + public async Task Get_all_msgs() + { + var cts = new CancellationTokenSource(TimeSpan.FromSeconds(30)); + + // await using var server = NatsServer.Start( + // outputHelper: _output, + // opts: new NatsServerOptsBuilder() + // .UseTransport(TransportType.Tcp) + // .Trace() + // .UseJetStream() + // .Build()); + + // var (nats, proxy) = server.CreateProxiedClientConnection(); + var nats = new NatsConnection(NatsOpts.Default); + var js = new NatsJSContext(nats); + + await nats.ConnectAsync(); + + // await js.CreateStreamAsync("s1", new[] {"s1.*"}, cts.Token); + + // for (var i = 0; i < 30; i++) + // { + // var ack = await js.PublishAsync("s1.foo", new TestData {Test = i}, cancellationToken: cts.Token); + // ack.EnsureSuccess(); + // } + + var view = await js.GetViewAsync("s1", "s1.*", cts.Token); + + var fetchAllMsgs = await view.FetchAllAsync(cancellationToken: cts.Token); + + var count = 0; + await foreach (var natsJSMsg in fetchAllMsgs.Msgs.ReadAllAsync(cts.Token)) + { + await natsJSMsg.AckAsync(cancellationToken: cts.Token); + + count++; + } + + Assert.Equal(30, count); + } + + private record TestData + { + public int Test { get; init; } + } +} From 858f15f9e1a4a685c4c08fe6d6176b054e9853a4 Mon Sep 17 00:00:00 2001 From: Simon Hoss Date: Thu, 12 Oct 2023 21:16:39 +0200 Subject: [PATCH 07/13] Remove ViewTest --- tests/NATS.Client.JetStream.Tests/ViewTest.cs | 60 ------------------- 1 file changed, 60 deletions(-) delete mode 100644 tests/NATS.Client.JetStream.Tests/ViewTest.cs diff --git a/tests/NATS.Client.JetStream.Tests/ViewTest.cs b/tests/NATS.Client.JetStream.Tests/ViewTest.cs deleted file mode 100644 index 39b49b9ce..000000000 --- a/tests/NATS.Client.JetStream.Tests/ViewTest.cs +++ /dev/null @@ -1,60 +0,0 @@ -using NATS.Client.Core.Tests; - -namespace NATS.Client.JetStream.Tests; - -public class ViewTest -{ - private readonly ITestOutputHelper _output; - - public ViewTest(ITestOutputHelper output) - { - _output = output; - } - - [Fact] - public async Task Get_all_msgs() - { - var cts = new CancellationTokenSource(TimeSpan.FromSeconds(30)); - - // await using var server = NatsServer.Start( - // outputHelper: _output, - // opts: new NatsServerOptsBuilder() - // .UseTransport(TransportType.Tcp) - // .Trace() - // .UseJetStream() - // .Build()); - - // var (nats, proxy) = server.CreateProxiedClientConnection(); - var nats = new NatsConnection(NatsOpts.Default); - var js = new NatsJSContext(nats); - - await nats.ConnectAsync(); - - // await js.CreateStreamAsync("s1", new[] {"s1.*"}, cts.Token); - - // for (var i = 0; i < 30; i++) - // { - // var ack = await js.PublishAsync("s1.foo", new TestData {Test = i}, cancellationToken: cts.Token); - // ack.EnsureSuccess(); - // } - - var view = await js.GetViewAsync("s1", "s1.*", cts.Token); - - var fetchAllMsgs = await view.FetchAllAsync(cancellationToken: cts.Token); - - var count = 0; - await foreach (var natsJSMsg in fetchAllMsgs.Msgs.ReadAllAsync(cts.Token)) - { - await natsJSMsg.AckAsync(cancellationToken: cts.Token); - - count++; - } - - Assert.Equal(30, count); - } - - private record TestData - { - public int Test { get; init; } - } -} From d924db0149eb635fe1272e24018f7f47547dea5f Mon Sep 17 00:00:00 2001 From: Simon Hoss Date: Mon, 16 Oct 2023 22:26:55 +0200 Subject: [PATCH 08/13] Add no wait test --- tests/NATS.Client.JetStream.Tests/ConsumerFetchTest.cs | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/tests/NATS.Client.JetStream.Tests/ConsumerFetchTest.cs b/tests/NATS.Client.JetStream.Tests/ConsumerFetchTest.cs index 05d8c19cc..dd1b4dd8c 100644 --- a/tests/NATS.Client.JetStream.Tests/ConsumerFetchTest.cs +++ b/tests/NATS.Client.JetStream.Tests/ConsumerFetchTest.cs @@ -8,8 +8,10 @@ public class ConsumerFetchTest public ConsumerFetchTest(ITestOutputHelper output) => _output = output; - [Fact] - public async Task Fetch_test() + [Theory] + [InlineData(true)] + [InlineData(false)] + public async Task Fetch_test(bool noWait) { var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10)); await using var server = NatsServer.StartJS(); @@ -27,7 +29,7 @@ public async Task Fetch_test() var consumer = await js.GetConsumerAsync("s1", "c1", cts.Token); var count = 0; await using var fc = - await consumer.FetchAsync(new NatsJSFetchOpts { MaxMsgs = 10 }, cancellationToken: cts.Token); + await consumer.FetchAsync(new NatsJSFetchOpts { MaxMsgs = 10, NoWait = noWait }, cancellationToken: cts.Token); await foreach (var msg in fc.Msgs.ReadAllAsync(cts.Token)) { await msg.AckAsync(new AckOpts(WaitUntilSent: true), cts.Token); From b42b4a6e344700dcdb3e6c2f3fc1f27b7cd7cef0 Mon Sep 17 00:00:00 2001 From: Simon Hoss Date: Tue, 17 Oct 2023 22:19:11 +0200 Subject: [PATCH 09/13] Correct comment --- src/NATS.Client.JetStream/NatsJSConsumer.cs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/NATS.Client.JetStream/NatsJSConsumer.cs b/src/NATS.Client.JetStream/NatsJSConsumer.cs index 0994bfe7d..7c248b0ce 100644 --- a/src/NATS.Client.JetStream/NatsJSConsumer.cs +++ b/src/NATS.Client.JetStream/NatsJSConsumer.cs @@ -258,7 +258,9 @@ public async ValueTask> FetchAsync( await sub.CallMsgNextAsync( opts.NoWait - // When no wait is set we don't need to send the idle heartbeat and expiration, because if no message is available nats doesn't send 404 instantly + // When no wait is set we don't need to send the idle heartbeat and expiration + // If no message is available the server will respond with a 404 immediately + // If messages are available the server will send a 408 direct after the last message ? new ConsumerGetnextRequest { Batch = max.MaxMsgs, MaxBytes = max.MaxBytes, NoWait = opts.NoWait } : new ConsumerGetnextRequest { From 256048a75cac0797bdc2525d233cd846a801ef57 Mon Sep 17 00:00:00 2001 From: Simon Hoss Date: Tue, 17 Oct 2023 22:19:41 +0200 Subject: [PATCH 10/13] Add noWait tests --- .../ConsumerFetchTest.cs | 32 +++++++++++++++++++ 1 file changed, 32 insertions(+) diff --git a/tests/NATS.Client.JetStream.Tests/ConsumerFetchTest.cs b/tests/NATS.Client.JetStream.Tests/ConsumerFetchTest.cs index dd1b4dd8c..005bf693a 100644 --- a/tests/NATS.Client.JetStream.Tests/ConsumerFetchTest.cs +++ b/tests/NATS.Client.JetStream.Tests/ConsumerFetchTest.cs @@ -40,6 +40,38 @@ public async Task Fetch_test(bool noWait) Assert.Equal(10, count); } + [Theory] + [InlineData(0, NatsSubEndReason.NoMsgs)] + [InlineData(5, NatsSubEndReason.Timeout)] + public async Task EndReason_test(int sendCount, NatsSubEndReason endReason) + { + var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10)); + await using var server = NatsServer.StartJS(); + await using var nats = server.CreateClientConnection(); + var js = new NatsJSContext(nats); + await js.CreateStreamAsync("s1", new[] { "s1.*" }, cts.Token); + await js.CreateConsumerAsync("s1", "c1", cancellationToken: cts.Token); + + for (var i = 0; i < sendCount; i++) + { + var ack = await js.PublishAsync("s1.foo", new TestData { Test = i }, cancellationToken: cts.Token); + ack.EnsureSuccess(); + } + + var consumer = await js.GetConsumerAsync("s1", "c1", cts.Token); + var count = 0; + await using var fc = + await consumer.FetchAsync(new NatsJSFetchOpts { MaxMsgs = 10, NoWait = true }, cancellationToken: cts.Token); + await foreach (var msg in fc.Msgs.ReadAllAsync(cts.Token)) + { + await msg.AckAsync(new AckOpts(WaitUntilSent: true), cts.Token); + count++; + } + + Assert.Equal(sendCount, count); + Assert.Equal(endReason, fc.EndReason); + } + private record TestData { public int Test { get; init; } From 6a8e6c7200074bb2aa5f264ed4eaf11ce0a8d6aa Mon Sep 17 00:00:00 2001 From: Simon Hoss Date: Wed, 18 Oct 2023 21:35:51 +0200 Subject: [PATCH 11/13] Add FetchNoWait --- src/NATS.Client.JetStream/NatsJSConsumer.cs | 16 ++++++++++++- src/NATS.Client.JetStream/NatsJSOpts.cs | 2 +- .../ConsumerFetchTest.cs | 24 +++++++------------ 3 files changed, 25 insertions(+), 17 deletions(-) diff --git a/src/NATS.Client.JetStream/NatsJSConsumer.cs b/src/NATS.Client.JetStream/NatsJSConsumer.cs index 7c248b0ce..55707edea 100644 --- a/src/NATS.Client.JetStream/NatsJSConsumer.cs +++ b/src/NATS.Client.JetStream/NatsJSConsumer.cs @@ -207,6 +207,20 @@ await sub.CallMsgNextAsync( } } + public async IAsyncEnumerable> FetchNoWait( + NatsJSFetchOpts? opts = default, + [EnumeratorCancellation] CancellationToken cancellationToken = default) + { + ThrowIfDeleted(); + opts ??= _context.Opts.DefaultFetchOpts; + + await using var fc = await FetchAsync(opts with { NoWait = true }, cancellationToken); + await foreach (var jsMsg in fc.Msgs.ReadAllAsync(cancellationToken)) + { + yield return jsMsg; + } + } + /// /// Consume a set number of messages from the stream using this consumer. /// @@ -261,7 +275,7 @@ await sub.CallMsgNextAsync( // When no wait is set we don't need to send the idle heartbeat and expiration // If no message is available the server will respond with a 404 immediately // If messages are available the server will send a 408 direct after the last message - ? new ConsumerGetnextRequest { Batch = max.MaxMsgs, MaxBytes = max.MaxBytes, NoWait = opts.NoWait } + ? new ConsumerGetnextRequest {Batch = max.MaxMsgs, MaxBytes = max.MaxBytes, NoWait = opts.NoWait} : new ConsumerGetnextRequest { Batch = max.MaxMsgs, diff --git a/src/NATS.Client.JetStream/NatsJSOpts.cs b/src/NATS.Client.JetStream/NatsJSOpts.cs index 86b58a007..8e5081e7b 100644 --- a/src/NATS.Client.JetStream/NatsJSOpts.cs +++ b/src/NATS.Client.JetStream/NatsJSOpts.cs @@ -157,7 +157,7 @@ public record NatsJSFetchOpts /// /// Does not wait for messages to be available /// - public bool NoWait { get; init; } + internal bool NoWait { get; init; } /// /// Serializer to use to deserialize the message if a model is being used. diff --git a/tests/NATS.Client.JetStream.Tests/ConsumerFetchTest.cs b/tests/NATS.Client.JetStream.Tests/ConsumerFetchTest.cs index 005bf693a..0c6ee2505 100644 --- a/tests/NATS.Client.JetStream.Tests/ConsumerFetchTest.cs +++ b/tests/NATS.Client.JetStream.Tests/ConsumerFetchTest.cs @@ -8,10 +8,8 @@ public class ConsumerFetchTest public ConsumerFetchTest(ITestOutputHelper output) => _output = output; - [Theory] - [InlineData(true)] - [InlineData(false)] - public async Task Fetch_test(bool noWait) + [Fact] + public async Task Fetch_test() { var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10)); await using var server = NatsServer.StartJS(); @@ -29,7 +27,7 @@ public async Task Fetch_test(bool noWait) var consumer = await js.GetConsumerAsync("s1", "c1", cts.Token); var count = 0; await using var fc = - await consumer.FetchAsync(new NatsJSFetchOpts { MaxMsgs = 10, NoWait = noWait }, cancellationToken: cts.Token); + await consumer.FetchAsync(new NatsJSFetchOpts { MaxMsgs = 10 }, cancellationToken: cts.Token); await foreach (var msg in fc.Msgs.ReadAllAsync(cts.Token)) { await msg.AckAsync(new AckOpts(WaitUntilSent: true), cts.Token); @@ -40,10 +38,8 @@ public async Task Fetch_test(bool noWait) Assert.Equal(10, count); } - [Theory] - [InlineData(0, NatsSubEndReason.NoMsgs)] - [InlineData(5, NatsSubEndReason.Timeout)] - public async Task EndReason_test(int sendCount, NatsSubEndReason endReason) + [Fact] + public async Task FetchNoWait_test() { var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10)); await using var server = NatsServer.StartJS(); @@ -52,7 +48,7 @@ public async Task EndReason_test(int sendCount, NatsSubEndReason endReason) await js.CreateStreamAsync("s1", new[] { "s1.*" }, cts.Token); await js.CreateConsumerAsync("s1", "c1", cancellationToken: cts.Token); - for (var i = 0; i < sendCount; i++) + for (var i = 0; i < 10; i++) { var ack = await js.PublishAsync("s1.foo", new TestData { Test = i }, cancellationToken: cts.Token); ack.EnsureSuccess(); @@ -60,16 +56,14 @@ public async Task EndReason_test(int sendCount, NatsSubEndReason endReason) var consumer = await js.GetConsumerAsync("s1", "c1", cts.Token); var count = 0; - await using var fc = - await consumer.FetchAsync(new NatsJSFetchOpts { MaxMsgs = 10, NoWait = true }, cancellationToken: cts.Token); - await foreach (var msg in fc.Msgs.ReadAllAsync(cts.Token)) + await foreach (var msg in consumer.FetchNoWait(new NatsJSFetchOpts { MaxMsgs = 10 }, cancellationToken: cts.Token)) { await msg.AckAsync(new AckOpts(WaitUntilSent: true), cts.Token); + Assert.Equal(count, msg.Data!.Test); count++; } - Assert.Equal(sendCount, count); - Assert.Equal(endReason, fc.EndReason); + Assert.Equal(10, count); } private record TestData From bda4fc330fcaf35cad26f07785b10586e3928756 Mon Sep 17 00:00:00 2001 From: Simon Hoss Date: Thu, 19 Oct 2023 10:13:21 +0200 Subject: [PATCH 12/13] Remove EndReason --- src/NATS.Client.JetStream/INatsJSFetch.cs | 5 ----- 1 file changed, 5 deletions(-) diff --git a/src/NATS.Client.JetStream/INatsJSFetch.cs b/src/NATS.Client.JetStream/INatsJSFetch.cs index 311d003cb..b48a62853 100644 --- a/src/NATS.Client.JetStream/INatsJSFetch.cs +++ b/src/NATS.Client.JetStream/INatsJSFetch.cs @@ -8,11 +8,6 @@ namespace NATS.Client.JetStream; /// public interface INatsJSFetch : IAsyncDisposable { - /// - /// The reason why the fetch has ended - /// - NatsSubEndReason EndReason { get; } - void Stop(); } From 500def5cdf7bd4ad42ee3278ba6b8505a11535e8 Mon Sep 17 00:00:00 2001 From: Simon Hoss Date: Thu, 19 Oct 2023 10:14:06 +0200 Subject: [PATCH 13/13] Remove unused namespace --- src/NATS.Client.JetStream/INatsJSFetch.cs | 1 - 1 file changed, 1 deletion(-) diff --git a/src/NATS.Client.JetStream/INatsJSFetch.cs b/src/NATS.Client.JetStream/INatsJSFetch.cs index b48a62853..e835678e2 100644 --- a/src/NATS.Client.JetStream/INatsJSFetch.cs +++ b/src/NATS.Client.JetStream/INatsJSFetch.cs @@ -1,5 +1,4 @@ using System.Threading.Channels; -using NATS.Client.Core; namespace NATS.Client.JetStream;