Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ordered consumer optimizations #336

Merged
merged 2 commits into from
Jan 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/NATS.Client.Core/NatsConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ internal ValueTask UnsubscribeAsync(int sid)
return ValueTask.CompletedTask;
}

internal void MessageDropped<T>(NatsSub<T> natsSub, int pending, NatsMsg<T> msg)
internal void MessageDropped<T>(NatsSubBase natsSub, int pending, NatsMsg<T> msg)
{
var subject = msg.Subject;
_logger.LogWarning("Dropped message from {Subject} with {Pending} pending messages", subject, pending);
Expand Down
2 changes: 1 addition & 1 deletion src/NATS.Client.JetStream/INatsJSContextFactory.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
using NATS.Client.Core;
using NATS.Client.Core;

namespace NATS.Client.JetStream;

Expand Down
16 changes: 7 additions & 9 deletions src/NATS.Client.JetStream/Internal/NatsJSOrderedConsume.cs
Original file line number Diff line number Diff line change
Expand Up @@ -94,17 +94,15 @@ public NatsJSOrderedConsume(
Timeout.Infinite,
Timeout.Infinite);

// Keep user channel small to avoid blocking the user code
// when disposed otherwise channel reader will continue delivering messages
// if there are messages queued up already. This channel is used to pass messages
// to the user from the subscription channel (which should be set to a
// sufficiently large value to avoid blocking socket reads in the
// NATS connection).
_userMsgs = Channel.CreateBounded<NatsJSMsg<TMsg>>(1);
// This channel is used to pass messages to the user from the subscription.
_userMsgs = Channel.CreateBounded<NatsJSMsg<TMsg>>(
Connection.GetChannelOpts(Connection.Opts, opts?.ChannelOpts),
msg => Connection.MessageDropped(this, _userMsgs?.Reader.Count ?? 0, msg.Msg));
Msgs = _userMsgs.Reader;

// Capacity as 1 is enough here since it's used for signaling only.
_pullRequests = Channel.CreateBounded<PullRequest>(1);
// Pull request channel is set as unbounded because we don't want to drop
// them and minimize potential lock contention.
_pullRequests = Channel.CreateUnbounded<PullRequest>();
_pullTask = Task.Run(PullLoop);

ResetPending();
Expand Down
2 changes: 2 additions & 0 deletions src/NATS.Client.JetStream/NatsJSMsg.cs
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,8 @@ public NatsJSMsg(NatsMsg<T> msg, NatsJSContext context)
/// </summary>
public string? ReplyTo => _msg.ReplyTo;

internal NatsMsg<T> Msg => _msg;

/// <summary>
/// Reply with an empty message.
/// </summary>
Expand Down
27 changes: 2 additions & 25 deletions src/NATS.Client.JetStream/NatsJSOrderedConsumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -119,32 +119,9 @@ public async IAsyncEnumerable<NatsJSMsg<T>> ConsumeAsync<T>(
{
NatsJSMsg<T> msg;

try
{
var canRead = cc.Msgs.TryRead(out msg);
if (!canRead)
break;
}
catch (OperationCanceledException)
{
var canRead = cc.Msgs.TryRead(out msg);
if (!canRead)
break;
}
catch (NatsJSProtocolException pe)
{
protocolException = pe;
goto CONSUME_LOOP;
}
catch (NatsJSConnectionException e)
{
_logger.LogWarning(NatsJSLogEvents.Retry, "{Error}. Retrying...", e.Message);
goto CONSUME_LOOP;
}
catch (NatsJSTimeoutException e)
{
notificationHandler?.Invoke(new NatsJSTimeoutNotification(), cancellationToken);
_logger.LogWarning(NatsJSLogEvents.Retry, "{Error}. Retrying...", e.Message);
goto CONSUME_LOOP;
}

if (msg.Metadata is not { } metadata)
continue;
Expand Down
9 changes: 4 additions & 5 deletions tests/NATS.Client.JetStream.Tests/NatsJsContextFactoryTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -103,12 +103,11 @@ public class MockConnection : INatsConnection

public string NewInbox() => throw new NotImplementedException();

public ValueTask<NatsMsg<TReply>> RequestAsync<TRequest, TReply>(string subject, TRequest? data, NatsHeaders? headers = default, INatsSerialize<TRequest>? requestSerializer = default, INatsDeserialize<TReply>? replySerializer = default, NatsPubOpts? requestOpts = default,
NatsSubOpts? replyOpts = default, CancellationToken cancellationToken = default) =>
throw new NotImplementedException();
public ValueTask<NatsMsg<TReply>> RequestAsync<TRequest, TReply>(string subject, TRequest? data, NatsHeaders? headers = default, INatsSerialize<TRequest>? requestSerializer = default, INatsDeserialize<TReply>? replySerializer = default, NatsPubOpts? requestOpts = default, NatsSubOpts? replyOpts = default, CancellationToken cancellationToken = default)
=> throw new NotImplementedException();

public IAsyncEnumerable<NatsMsg<TReply>> RequestManyAsync<TRequest, TReply>(string subject, TRequest? data, NatsHeaders? headers = default, INatsSerialize<TRequest>? requestSerializer = default, INatsDeserialize<TReply>? replySerializer = default, NatsPubOpts? requestOpts = default,
NatsSubOpts? replyOpts = default, CancellationToken cancellationToken = default) => throw new NotImplementedException();
public IAsyncEnumerable<NatsMsg<TReply>> RequestManyAsync<TRequest, TReply>(string subject, TRequest? data, NatsHeaders? headers = default, INatsSerialize<TRequest>? requestSerializer = default, INatsDeserialize<TReply>? replySerializer = default, NatsPubOpts? requestOpts = default, NatsSubOpts? replyOpts = default, CancellationToken cancellationToken = default)
=> throw new NotImplementedException();

public ValueTask ConnectAsync() => throw new NotImplementedException();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ public async Task Create_Context_Test()
context.Should().NotBeNull();
}


[Fact]
public void Create_Context_WithMockConnection_Test()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ public async Task Create_Context_Test()
context.Should().NotBeNull();
}


[Fact]
public void Create_Context_WithMockConnection_Test()
{
Expand Down