Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
mtmk committed Aug 29, 2023
1 parent 0a8537d commit c2547a7
Show file tree
Hide file tree
Showing 5 changed files with 119 additions and 56 deletions.
63 changes: 47 additions & 16 deletions sandbox/Example.JetStream.PullConsumer/Program.cs
Original file line number Diff line number Diff line change
@@ -1,11 +1,7 @@
using System.Threading.Channels;
using Example.JetStream.PullConsumer;
using Example.JetStream.PullConsumer;
using Microsoft.Extensions.Logging;
using NATS.Client.Core;
using NATS.Client.Core.Internal;
using NATS.Client.JetStream;
using NATS.Client.JetStream.Internal;
using NATS.Client.JetStream.Models;

var options = NatsOptions.Default with { LoggerFactory = new MinimumConsoleLoggerFactory(LogLevel.Error) };

Expand All @@ -19,17 +15,52 @@
var expires = TimeSpan.FromSeconds(10);
var batch = 10;

await using var sub = await consumer.ConsumeAsync<RawData>(new NatsJSConsumeOpts
if (args.Length > 0 && args[0] == "fetch")
{
MaxMsgs = batch,
Expires = expires,
IdleHeartbeat = idle,
Serializer = new RawDataSerializer(),
});

await foreach (var jsMsg in sub.Msgs.ReadAllAsync())
while (true)
{
Console.WriteLine($"___\nFETCH {batch}");
await using var sub = await consumer.FetchAsync<RawData>(new NatsJSFetchOpts
{
MaxMsgs = batch, Expires = expires, IdleHeartbeat = idle, Serializer = new RawDataSerializer(),
});
await foreach (var jsMsg in sub.Msgs.ReadAllAsync())
{
var msg = jsMsg.Msg;
Console.WriteLine($"data: {msg.Data}");
await jsMsg.AckAsync();
}
}
}
else if (args.Length > 0 && args[0] == "next")
{
var msg = jsMsg.Msg;
Console.WriteLine($"data: {msg.Data}");
await jsMsg.AckAsync();
while (true)
{
Console.WriteLine("___\nNEXT");
var next = await consumer.NextAsync<RawData>(new NatsJSNextOpts
{
Expires = expires, IdleHeartbeat = idle, Serializer = new RawDataSerializer(),
});
if (next is { } jsMsg)
{
var msg = jsMsg.Msg;
Console.WriteLine($"data: {msg.Data}");
await jsMsg.AckAsync();
}
}
}
else
{
Console.WriteLine("___\nCONSUME");
await using var sub = await consumer.ConsumeAsync<RawData>(new NatsJSConsumeOpts
{
MaxMsgs = batch, Expires = expires, IdleHeartbeat = idle, Serializer = new RawDataSerializer(),
});

await foreach (var jsMsg in sub.Msgs.ReadAllAsync())
{
var msg = jsMsg.Msg;
Console.WriteLine($"data: {msg.Data}");
await jsMsg.AckAsync();
}
}
20 changes: 15 additions & 5 deletions src/NATS.Client.JetStream/NatsJSConsumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -77,19 +77,29 @@ await sub.CallMsgNextAsync(
cancellationToken);

sub.ResetPending();
sub.ResetHeartbeatTimer();

return sub;
}

public async ValueTask<NatsJSMsg<T?>> NextAsync<T>(CancellationToken cancellationToken = default)
public async ValueTask<NatsJSMsg<T?>?> NextAsync<T>(NatsJSNextOpts opts, CancellationToken cancellationToken = default)
{
await using var f = await FetchAsync<T>(new NatsJSFetchOpts { MaxMsgs = 1 }, cancellationToken: cancellationToken);
await using var f = await FetchAsync<T>(
new NatsJSFetchOpts
{
MaxMsgs = 1,
IdleHeartbeat = opts.IdleHeartbeat,
Expires = opts.Expires,
Serializer = opts.Serializer,
},
cancellationToken: cancellationToken);

await foreach (var natsJSMsg in f.Msgs.ReadAllAsync(cancellationToken))
{
return natsJSMsg;
}

throw new NatsJSException("No data");
return default;
}

public async ValueTask<INatsJSSubConsume<T>> FetchAsync<T>(
Expand All @@ -114,8 +124,6 @@ public async ValueTask<INatsJSSubConsume<T>> FetchAsync<T>(
Capacity = 1,
FullMode = BoundedChannelFullMode.Wait,
},
MaxMsgs = (int)max.MaxMsgs,
Timeout = timeouts.Expires,
};

var sub = new NatsJSSubFetch<T>(
Expand Down Expand Up @@ -144,6 +152,8 @@ await sub.CallMsgNextAsync(
},
cancellationToken);

sub.ResetHeartbeatTimer();

return sub;
}

Expand Down
9 changes: 9 additions & 0 deletions src/NATS.Client.JetStream/NatsJSOpts.cs
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,15 @@ public record NatsJSNextOpts
/// Amount idle time the server should wait before sending a heartbeat. For requests with expires > 30s, heartbeats should be enabled by default
/// </summary>
public TimeSpan? IdleHeartbeat { get; init; }

/// <summary>
/// Serializer to use to deserialize the message if a model is being used.
/// </summary>
/// <remarks>
/// If not set, serializer set in connection options or the default JSON serializer
/// will be used.
/// </remarks>
public INatsSerializer? Serializer { get; init; }
}

public record NatsJSFetchOpts
Expand Down
4 changes: 2 additions & 2 deletions src/NATS.Client.JetStream/NatsJSSubConsume.cs
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,8 @@ public ValueTask CallMsgNextAsync(ConsumerGetnextRequest request, CancellationTo

public void ResetPending() => _pending = _batch;

public void ResetHeartbeatTimer() => _timer.Change(_hbTimeout, Timeout.Infinite);

public override async ValueTask DisposeAsync()
{
await base.DisposeAsync();
Expand Down Expand Up @@ -204,8 +206,6 @@ protected override void TryComplete()
_notifications.Writer.TryComplete();
}

private void ResetHeartbeatTimer() => _timer.Change(_hbTimeout, Timeout.Infinite);

private void CheckPending()
{
if (_pending <= _threshold)
Expand Down
79 changes: 46 additions & 33 deletions src/NATS.Client.JetStream/NatsJSSubFetch.cs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ public class NatsJSSubFetch<TMsg> : NatsSubBase, INatsJSSubConsume<TMsg>
private readonly long _idle;
private readonly long _hbTimeout;

private long _pending;

public NatsJSSubFetch(
long batch,
TimeSpan expires,
Expand All @@ -50,6 +52,7 @@ public NatsJSSubFetch(
_expires = expires.ToNanos();
_idle = idle.ToNanos();
_hbTimeout = (int)(idle * 2).TotalMilliseconds;
_pending = _batch;

_userMsgs = Channel.CreateBounded<NatsJSMsg<TMsg?>>(NatsSub.GetChannelOptions(opts?.ChannelOptions));
Msgs = _userMsgs.Reader;
Expand Down Expand Up @@ -79,6 +82,8 @@ public ValueTask CallMsgNextAsync(ConsumerGetnextRequest request, CancellationTo
headers: default,
cancellationToken);

public void ResetHeartbeatTimer() => _timer.Change(_hbTimeout, Timeout.Infinite);

public override async ValueTask DisposeAsync()
{
await base.DisposeAsync();
Expand Down Expand Up @@ -114,55 +119,65 @@ protected override ValueTask ReceiveInternalAsync(
ReadOnlySequence<byte>? headersBuffer,
ReadOnlySequence<byte> payloadBuffer)
{
ResetHeartbeatTimer();
if (subject == Subject)
try
{
if (headersBuffer.HasValue)
ResetHeartbeatTimer();
if (subject == Subject)
{
var headers = new NatsHeaders();
if (Connection.HeaderParser.ParseHeaders(new SequenceReader<byte>(headersBuffer.Value), headers))
if (headersBuffer.HasValue)
{
if (headers is { Code: 408, Message: NatsHeaders.Messages.RequestTimeout })
var headers = new NatsHeaders();
if (Connection.HeaderParser.ParseHeaders(new SequenceReader<byte>(headersBuffer.Value), headers))
{
}
else if (headers is { Code: 100, Message: NatsHeaders.Messages.IdleHeartbeat })
{
EndSubscription(NatsSubEndReason.Timeout);
if (headers is { Code: 408, Message: NatsHeaders.Messages.RequestTimeout })
{
EndSubscription(NatsSubEndReason.Timeout);
}
else if (headers is { Code: 100, Message: NatsHeaders.Messages.IdleHeartbeat })
{
}
else
{
_notifications.Writer.TryWrite(new NatsJSNotification(headers.Code, headers.MessageText));
}
}
else
{
_notifications.Writer.TryWrite(new NatsJSNotification(headers.Code, headers.MessageText));
_logger.LogError(
"Can't parse headers: {HeadersBuffer}",
Encoding.ASCII.GetString(headersBuffer.Value.ToArray()));
throw new NatsJSException("Can't parse headers");
}
}
else
{
_logger.LogError(
"Can't parse headers: {HeadersBuffer}",
Encoding.ASCII.GetString(headersBuffer.Value.ToArray()));
throw new NatsJSException("Can't parse headers");
throw new NatsJSException("No header found");
}

return ValueTask.CompletedTask;
}
else
{
throw new NatsJSException("No header found");
var msg = new NatsJSMsg<TMsg?>(NatsMsg<TMsg?>.Build(
subject,
replyTo,
headersBuffer,
payloadBuffer,
Connection,
Connection.HeaderParser,
_serializer));

_pending--;

return _userMsgs.Writer.WriteAsync(msg);
}

return ValueTask.CompletedTask;
}
else
finally
{
var msg = new NatsJSMsg<TMsg?>(NatsMsg<TMsg?>.Build(
subject,
replyTo,
headersBuffer,
payloadBuffer,
Connection,
Connection.HeaderParser,
_serializer));

DecrementMaxMsgs();

return _userMsgs.Writer.WriteAsync(msg);
if (_pending == 0)
{
EndSubscription(NatsSubEndReason.MaxMsgs);
}
}
}

Expand All @@ -172,8 +187,6 @@ protected override void TryComplete()
_notifications.Writer.TryComplete();
}

private void ResetHeartbeatTimer() => _timer.Change(_hbTimeout, Timeout.Infinite);

private async Task NotificationsLoop()
{
await foreach (var notification in _notifications.Reader.ReadAllAsync())
Expand Down

0 comments on commit c2547a7

Please sign in to comment.