diff --git a/src/NATS.Client.Core/NatsConnection.cs b/src/NATS.Client.Core/NatsConnection.cs index 847be4610..fa3d34a4e 100644 --- a/src/NATS.Client.Core/NatsConnection.cs +++ b/src/NATS.Client.Core/NatsConnection.cs @@ -213,7 +213,7 @@ internal ValueTask UnsubscribeAsync(int sid) return ValueTask.CompletedTask; } - internal void MessageDropped(NatsSub natsSub, int pending, NatsMsg msg) + internal void MessageDropped(NatsSubBase natsSub, int pending, NatsMsg msg) { var subject = msg.Subject; _logger.LogWarning("Dropped message from {Subject} with {Pending} pending messages", subject, pending); diff --git a/src/NATS.Client.JetStream/INatsJSContextFactory.cs b/src/NATS.Client.JetStream/INatsJSContextFactory.cs index 086afbd4e..445686815 100644 --- a/src/NATS.Client.JetStream/INatsJSContextFactory.cs +++ b/src/NATS.Client.JetStream/INatsJSContextFactory.cs @@ -1,4 +1,4 @@ -using NATS.Client.Core; +using NATS.Client.Core; namespace NATS.Client.JetStream; diff --git a/src/NATS.Client.JetStream/Internal/NatsJSOrderedConsume.cs b/src/NATS.Client.JetStream/Internal/NatsJSOrderedConsume.cs index e1a38227b..2b6d2d17f 100644 --- a/src/NATS.Client.JetStream/Internal/NatsJSOrderedConsume.cs +++ b/src/NATS.Client.JetStream/Internal/NatsJSOrderedConsume.cs @@ -94,17 +94,15 @@ public NatsJSOrderedConsume( Timeout.Infinite, Timeout.Infinite); - // Keep user channel small to avoid blocking the user code - // when disposed otherwise channel reader will continue delivering messages - // if there are messages queued up already. This channel is used to pass messages - // to the user from the subscription channel (which should be set to a - // sufficiently large value to avoid blocking socket reads in the - // NATS connection). - _userMsgs = Channel.CreateBounded>(1); + // This channel is used to pass messages to the user from the subscription. + _userMsgs = Channel.CreateBounded>( + Connection.GetChannelOpts(Connection.Opts, opts?.ChannelOpts), + msg => Connection.MessageDropped(this, _userMsgs?.Reader.Count ?? 0, msg.Msg)); Msgs = _userMsgs.Reader; - // Capacity as 1 is enough here since it's used for signaling only. - _pullRequests = Channel.CreateBounded(1); + // Pull request channel is set as unbounded because we don't want to drop + // them and minimize potential lock contention. + _pullRequests = Channel.CreateUnbounded(); _pullTask = Task.Run(PullLoop); ResetPending(); diff --git a/src/NATS.Client.JetStream/NatsJSMsg.cs b/src/NATS.Client.JetStream/NatsJSMsg.cs index 25aef4c76..b2f849041 100644 --- a/src/NATS.Client.JetStream/NatsJSMsg.cs +++ b/src/NATS.Client.JetStream/NatsJSMsg.cs @@ -186,6 +186,8 @@ public NatsJSMsg(NatsMsg msg, NatsJSContext context) /// public string? ReplyTo => _msg.ReplyTo; + internal NatsMsg Msg => _msg; + /// /// Reply with an empty message. /// diff --git a/src/NATS.Client.JetStream/NatsJSOrderedConsumer.cs b/src/NATS.Client.JetStream/NatsJSOrderedConsumer.cs index 32e7285de..e0f6398b7 100644 --- a/src/NATS.Client.JetStream/NatsJSOrderedConsumer.cs +++ b/src/NATS.Client.JetStream/NatsJSOrderedConsumer.cs @@ -119,32 +119,9 @@ public async IAsyncEnumerable> ConsumeAsync( { NatsJSMsg msg; - try - { - var canRead = cc.Msgs.TryRead(out msg); - if (!canRead) - break; - } - catch (OperationCanceledException) - { + var canRead = cc.Msgs.TryRead(out msg); + if (!canRead) break; - } - catch (NatsJSProtocolException pe) - { - protocolException = pe; - goto CONSUME_LOOP; - } - catch (NatsJSConnectionException e) - { - _logger.LogWarning(NatsJSLogEvents.Retry, "{Error}. Retrying...", e.Message); - goto CONSUME_LOOP; - } - catch (NatsJSTimeoutException e) - { - notificationHandler?.Invoke(new NatsJSTimeoutNotification(), cancellationToken); - _logger.LogWarning(NatsJSLogEvents.Retry, "{Error}. Retrying...", e.Message); - goto CONSUME_LOOP; - } if (msg.Metadata is not { } metadata) continue; diff --git a/tests/NATS.Client.JetStream.Tests/NatsJsContextFactoryTest.cs b/tests/NATS.Client.JetStream.Tests/NatsJsContextFactoryTest.cs index ee5d6103b..d9130427c 100644 --- a/tests/NATS.Client.JetStream.Tests/NatsJsContextFactoryTest.cs +++ b/tests/NATS.Client.JetStream.Tests/NatsJsContextFactoryTest.cs @@ -103,12 +103,11 @@ public class MockConnection : INatsConnection public string NewInbox() => throw new NotImplementedException(); - public ValueTask> RequestAsync(string subject, TRequest? data, NatsHeaders? headers = default, INatsSerialize? requestSerializer = default, INatsDeserialize? replySerializer = default, NatsPubOpts? requestOpts = default, - NatsSubOpts? replyOpts = default, CancellationToken cancellationToken = default) => - throw new NotImplementedException(); + public ValueTask> RequestAsync(string subject, TRequest? data, NatsHeaders? headers = default, INatsSerialize? requestSerializer = default, INatsDeserialize? replySerializer = default, NatsPubOpts? requestOpts = default, NatsSubOpts? replyOpts = default, CancellationToken cancellationToken = default) + => throw new NotImplementedException(); - public IAsyncEnumerable> RequestManyAsync(string subject, TRequest? data, NatsHeaders? headers = default, INatsSerialize? requestSerializer = default, INatsDeserialize? replySerializer = default, NatsPubOpts? requestOpts = default, - NatsSubOpts? replyOpts = default, CancellationToken cancellationToken = default) => throw new NotImplementedException(); + public IAsyncEnumerable> RequestManyAsync(string subject, TRequest? data, NatsHeaders? headers = default, INatsSerialize? requestSerializer = default, INatsDeserialize? replySerializer = default, NatsPubOpts? requestOpts = default, NatsSubOpts? replyOpts = default, CancellationToken cancellationToken = default) + => throw new NotImplementedException(); public ValueTask ConnectAsync() => throw new NotImplementedException(); diff --git a/tests/NATS.Client.KeyValueStore.Tests/NatsKVContextFactoryTest.cs b/tests/NATS.Client.KeyValueStore.Tests/NatsKVContextFactoryTest.cs index f1a0571b7..6b173149e 100644 --- a/tests/NATS.Client.KeyValueStore.Tests/NatsKVContextFactoryTest.cs +++ b/tests/NATS.Client.KeyValueStore.Tests/NatsKVContextFactoryTest.cs @@ -32,7 +32,6 @@ public async Task Create_Context_Test() context.Should().NotBeNull(); } - [Fact] public void Create_Context_WithMockConnection_Test() { diff --git a/tests/NATS.Client.ObjectStore.Tests/NatsObjContextFactoryTest.cs b/tests/NATS.Client.ObjectStore.Tests/NatsObjContextFactoryTest.cs index 2dd8e9f39..fc4d43ea8 100644 --- a/tests/NATS.Client.ObjectStore.Tests/NatsObjContextFactoryTest.cs +++ b/tests/NATS.Client.ObjectStore.Tests/NatsObjContextFactoryTest.cs @@ -32,7 +32,6 @@ public async Task Create_Context_Test() context.Should().NotBeNull(); } - [Fact] public void Create_Context_WithMockConnection_Test() {