Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add no wait for consumer #152

Merged
merged 16 commits into from
Oct 20, 2023
Merged
1 change: 1 addition & 0 deletions src/NATS.Client.Core/NatsSubBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ namespace NATS.Client.Core;
public enum NatsSubEndReason
{
None,
NoMsgs,
MaxMsgs,
MaxBytes,
Timeout,
Expand Down
6 changes: 5 additions & 1 deletion src/NATS.Client.JetStream/Internal/NatsJSFetch.cs
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,11 @@ protected override async ValueTask ReceiveInternalAsync(
var headers = new NatsHeaders();
if (Connection.HeaderParser.ParseHeaders(new SequenceReader<byte>(headersBuffer.Value), headers))
{
if (headers is { Code: 408, Message: NatsHeaders.Messages.RequestTimeout })
if (headers is { Code: 404 })
{
EndSubscription(NatsSubEndReason.NoMsgs);
}
else if (headers is { Code: 408, Message: NatsHeaders.Messages.RequestTimeout })
{
EndSubscription(NatsSubEndReason.Timeout);
}
Expand Down
35 changes: 28 additions & 7 deletions src/NATS.Client.JetStream/NatsJSConsumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,20 @@ await sub.CallMsgNextAsync(
}
}

public async IAsyncEnumerable<NatsJSMsg<T?>> FetchNoWait<T>(
NatsJSFetchOpts? opts = default,
[EnumeratorCancellation] CancellationToken cancellationToken = default)
{
ThrowIfDeleted();
opts ??= _context.Opts.DefaultFetchOpts;

await using var fc = await FetchAsync<T>(opts with { NoWait = true }, cancellationToken);
await foreach (var jsMsg in fc.Msgs.ReadAllAsync(cancellationToken))
{
yield return jsMsg;
}
}

/// <summary>
/// Consume a set number of messages from the stream using this consumer.
/// </summary>
Expand Down Expand Up @@ -235,13 +249,20 @@ public async ValueTask<INatsJSFetch<T>> FetchAsync<T>(
await _context.Connection.SubAsync(sub: sub, cancellationToken);

await sub.CallMsgNextAsync(
new ConsumerGetnextRequest
{
Batch = max.MaxMsgs,
MaxBytes = max.MaxBytes,
IdleHeartbeat = timeouts.IdleHeartbeat.ToNanos(),
Expires = timeouts.Expires.ToNanos(),
},
opts.NoWait

// When no wait is set we don't need to send the idle heartbeat and expiration
// If no message is available the server will respond with a 404 immediately
// If messages are available the server will send a 408 direct after the last message
? new ConsumerGetnextRequest {Batch = max.MaxMsgs, MaxBytes = max.MaxBytes, NoWait = opts.NoWait}
: new ConsumerGetnextRequest
{
Batch = max.MaxMsgs,
MaxBytes = max.MaxBytes,
IdleHeartbeat = timeouts.IdleHeartbeat.ToNanos(),
Expires = timeouts.Expires.ToNanos(),
NoWait = opts.NoWait,
},
cancellationToken);

sub.ResetHeartbeatTimer();
Expand Down
5 changes: 5 additions & 0 deletions src/NATS.Client.JetStream/NatsJSOpts.cs
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,11 @@ public record NatsJSFetchOpts
/// </summary>
public TimeSpan? IdleHeartbeat { get; init; }

/// <summary>
/// Does not wait for messages to be available
/// </summary>
internal bool NoWait { get; init; }

/// <summary>
/// Serializer to use to deserialize the message if a model is being used.
/// </summary>
Expand Down
28 changes: 28 additions & 0 deletions tests/NATS.Client.JetStream.Tests/ConsumerFetchTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,34 @@ public async Task Fetch_test()
Assert.Equal(10, count);
}

[Fact]
public async Task FetchNoWait_test()
{
var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10));
await using var server = NatsServer.StartJS();
await using var nats = server.CreateClientConnection();
var js = new NatsJSContext(nats);
await js.CreateStreamAsync("s1", new[] { "s1.*" }, cts.Token);
await js.CreateConsumerAsync("s1", "c1", cancellationToken: cts.Token);

for (var i = 0; i < 10; i++)
{
var ack = await js.PublishAsync("s1.foo", new TestData { Test = i }, cancellationToken: cts.Token);
ack.EnsureSuccess();
}

var consumer = await js.GetConsumerAsync("s1", "c1", cts.Token);
var count = 0;
await foreach (var msg in consumer.FetchNoWait<TestData>(new NatsJSFetchOpts { MaxMsgs = 10 }, cancellationToken: cts.Token))
{
await msg.AckAsync(new AckOpts(WaitUntilSent: true), cts.Token);
Assert.Equal(count, msg.Data!.Test);
count++;
}

Assert.Equal(10, count);
}

[Fact]
public async Task Fetch_dispose_test()
{
Expand Down
Loading