From d7b6baa7600dcaaeaad2a4de247b3740e5d44aef Mon Sep 17 00:00:00 2001 From: Ziya Suzen Date: Fri, 2 Feb 2024 19:46:48 +0000 Subject: [PATCH] Send buffer changes (#346) * Send buffer refinements * Command writer pipeline encapsulation * Default write buffer size * Revert tmp comment outs * HPUB writer fix * Use channel instead of semaphore for locks * Reuse same pipeline avoiding data loss * Tidy up * Reverted ping command * Revert to clearing buffer Other NATS clients do the same * Lock function * Added inlining * Command timeouts * Cancellation fixes * Derive pool rent size from buffer size option * Fixed format * Command timeout test * Test debug * Keep send buffer at message boundaries * Flush buffers cleanly on dispose * Fixing buffer msg boundry issue * Format fixed * Buffer position * Handle socket exception * throw away bytes in send buffer after a failed send (#368) Signed-off-by: Caleb Lloyd --------- Signed-off-by: Caleb Lloyd Co-authored-by: Caleb Lloyd <2414837+caleblloyd@users.noreply.github.com> --- .../Commands/CommandWriter.cs | 780 +++++++----------- .../Commands/NatsPooledBufferWriter.cs | 208 +++++ .../Commands/PriorityCommandWriter.cs | 25 + .../Commands/ProtocolWriter.cs | 283 ++++--- .../Internal/CancellationTimer.cs | 6 +- src/NATS.Client.Core/Internal/HeaderWriter.cs | 37 +- .../NatsPipeliningWriteProtocolProcessor.cs | 629 +++++++------- src/NATS.Client.Core/NatsConnection.cs | 19 +- src/NATS.Client.Core/NatsLogEvents.cs | 1 + src/NATS.Client.Core/NatsOpts.cs | 4 +- .../CancellationTest.cs | 31 +- .../ConnectionRetryTest.cs | 6 +- .../NATS.Client.Core.Tests/NatsHeaderTest.cs | 21 +- tests/NATS.Client.Core.Tests/ProtocolTest.cs | 52 +- .../NATS.Client.Core.Tests/SendBufferTest.cs | 78 ++ tests/NATS.Client.TestUtilities/MockServer.cs | 112 +++ .../TmpFileLogger.cs | 18 + 17 files changed, 1339 insertions(+), 971 deletions(-) create mode 100644 src/NATS.Client.Core/Commands/NatsPooledBufferWriter.cs create mode 100644 src/NATS.Client.Core/Commands/PriorityCommandWriter.cs create mode 100644 tests/NATS.Client.Core.Tests/SendBufferTest.cs create mode 100644 tests/NATS.Client.TestUtilities/MockServer.cs create mode 100644 tests/NATS.Client.TestUtilities/TmpFileLogger.cs diff --git a/src/NATS.Client.Core/Commands/CommandWriter.cs b/src/NATS.Client.Core/Commands/CommandWriter.cs index 818b7b766..31cc150d1 100644 --- a/src/NATS.Client.Core/Commands/CommandWriter.cs +++ b/src/NATS.Client.Core/Commands/CommandWriter.cs @@ -1,229 +1,150 @@ +using System.Buffers; using System.IO.Pipelines; using System.Runtime.CompilerServices; using System.Threading.Channels; +using Microsoft.Extensions.Logging; using NATS.Client.Core.Internal; namespace NATS.Client.Core.Commands; /// -/// Used to track commands that have been enqueued to the PipeReader -/// -internal readonly record struct QueuedCommand(int Size, int Trim = 0); - -/// -/// Sets up a Pipe, and provides methods to write to the PipeWriter +/// Sets up a buffer (Pipe), and provides methods to write protocol messages to the buffer /// When methods complete, they have been queued for sending /// and further cancellation is not possible /// /// /// These methods are in the hot path, and have all been -/// optimized to eliminate allocations and make an initial attempt -/// to run synchronously without the async state machine +/// optimized to eliminate allocations and minimize copying /// internal sealed class CommandWriter : IAsyncDisposable { + // set to a reasonable socket write mem size + private const int MaxSendSize = 16384; + + private readonly ILogger _logger; + private readonly ObjectPool _pool; + private readonly int _arrayPoolInitialSize; + private readonly object _lock = new(); + private readonly CancellationTokenSource _cts; private readonly ConnectionStatsCounter _counter; private readonly TimeSpan _defaultCommandTimeout; private readonly Action _enqueuePing; - private readonly NatsOpts _opts; - private readonly PipeWriter _pipeWriter; private readonly ProtocolWriter _protocolWriter; - private readonly ChannelWriter _queuedCommandsWriter; - private readonly SemaphoreSlim _semLock; + private readonly HeaderWriter _headerWriter; + private readonly Channel _channelLock; + private readonly Channel _channelSize; + private readonly CancellationTimerPool _ctPool; + private readonly PipeReader _pipeReader; + private readonly PipeWriter _pipeWriter; + private ISocketConnection? _socketConnection; private Task? _flushTask; - private bool _disposed; + private Task? _readerLoopTask; + private CancellationTokenSource? _ctsReader; + private volatile bool _disposed; - public CommandWriter(NatsOpts opts, ConnectionStatsCounter counter, Action enqueuePing, TimeSpan? overrideCommandTimeout = default) + public CommandWriter(ObjectPool pool, NatsOpts opts, ConnectionStatsCounter counter, Action enqueuePing, TimeSpan? overrideCommandTimeout = default) { + _logger = opts.LoggerFactory.CreateLogger(); + _pool = pool; + + // Derive ArrayPool rent size from buffer size to + // avoid defining another option. + _arrayPoolInitialSize = opts.WriterBufferSize / 256; + _counter = counter; _defaultCommandTimeout = overrideCommandTimeout ?? opts.CommandTimeout; _enqueuePing = enqueuePing; - _opts = opts; + _protocolWriter = new ProtocolWriter(opts.SubjectEncoding); + _channelLock = Channel.CreateBounded(1); + _channelSize = Channel.CreateUnbounded(new UnboundedChannelOptions { SingleWriter = true, SingleReader = true }); + _headerWriter = new HeaderWriter(opts.HeaderEncoding); + _cts = new CancellationTokenSource(); + var pipe = new Pipe(new PipeOptions( pauseWriterThreshold: opts.WriterBufferSize, // flush will block after hitting - resumeWriterThreshold: opts.WriterBufferSize / 2, // will start flushing again after catching up - minimumSegmentSize: 16384, // segment that is part of an uninterrupted payload can be sent using socket.send + resumeWriterThreshold: opts.WriterBufferSize / 2, useSynchronizationContext: false)); - PipeReader = pipe.Reader; + _pipeReader = pipe.Reader; _pipeWriter = pipe.Writer; - _protocolWriter = new ProtocolWriter(_pipeWriter, opts.SubjectEncoding, opts.HeaderEncoding); - var channel = Channel.CreateUnbounded(new UnboundedChannelOptions { SingleWriter = true, SingleReader = true }); - _semLock = new SemaphoreSlim(1); - QueuedCommandsReader = channel.Reader; - _queuedCommandsWriter = channel.Writer; - } - - public PipeReader PipeReader { get; } - - public ChannelReader QueuedCommandsReader { get; } - - public Queue InFlightCommands { get; } = new(); - - public async ValueTask DisposeAsync() - { - await _semLock.WaitAsync().ConfigureAwait(false); - try - { - if (_disposed) - { - return; - } - _disposed = true; - _queuedCommandsWriter.Complete(); - await _pipeWriter.CompleteAsync().ConfigureAwait(false); - } - finally - { - _semLock.Release(); - } + // We need a new ObjectPool here because of the root token (_cts.Token). + // When the root token is cancelled as this object is disposed, cancellation + // objects in the pooled CancellationTimer should not be reused since the + // root token would already be cancelled which means CancellationTimer tokens + // would always be in a cancelled state. + _ctPool = new CancellationTimerPool(new ObjectPool(opts.ObjectPoolSize), _cts.Token); } - public NatsPipeliningWriteProtocolProcessor CreateNatsPipeliningWriteProtocolProcessor(ISocketConnection socketConnection) => new(socketConnection, this, _opts, _counter); - - public ValueTask ConnectAsync(ClientOpts connectOpts, CancellationToken cancellationToken) + public void Reset(ISocketConnection socketConnection) { -#pragma warning disable CA2016 -#pragma warning disable VSTHRD103 - if (!_semLock.Wait(0)) -#pragma warning restore VSTHRD103 -#pragma warning restore CA2016 - { - return ConnectStateMachineAsync(false, connectOpts, cancellationToken); - } - - if (_flushTask is { IsCompletedSuccessfully: false }) - { - return ConnectStateMachineAsync(true, connectOpts, cancellationToken); - } - - try + lock (_lock) { - if (_disposed) - { - throw new ObjectDisposedException(nameof(CommandWriter)); - } + _socketConnection = socketConnection; + _ctsReader = new CancellationTokenSource(); - var success = false; - try - { - _protocolWriter.WriteConnect(connectOpts); - success = true; - } - finally + _readerLoopTask = Task.Run(async () => { - EnqueueCommand(success); - } - } - finally - { - _semLock.Release(); + await ReaderLoopAsync(_logger, _socketConnection, _pipeReader, _channelSize, _ctsReader.Token).ConfigureAwait(false); + }); } - - return ValueTask.CompletedTask; } - public ValueTask PingAsync(PingCommand pingCommand, CancellationToken cancellationToken) + public async Task CancelReaderLoopAsync() { -#pragma warning disable CA2016 -#pragma warning disable VSTHRD103 - if (!_semLock.Wait(0)) -#pragma warning restore VSTHRD103 -#pragma warning restore CA2016 - { - return PingStateMachineAsync(false, pingCommand, cancellationToken); - } - - if (_flushTask is { IsCompletedSuccessfully: false }) + CancellationTokenSource? cts; + Task? readerTask; + lock (_lock) { - return PingStateMachineAsync(true, pingCommand, cancellationToken); + cts = _ctsReader; + readerTask = _readerLoopTask; } - try + if (cts != null) { - if (_disposed) - { - throw new ObjectDisposedException(nameof(CommandWriter)); - } - - var success = false; - try - { - _protocolWriter.WritePing(); - _enqueuePing(pingCommand); - success = true; - } - finally - { - EnqueueCommand(success); - } - } - finally - { - _semLock.Release(); +#if NET6_0 + cts.Cancel(); +#else + await cts.CancelAsync().ConfigureAwait(false); +#endif } - return ValueTask.CompletedTask; + if (readerTask != null) + await readerTask.WaitAsync(TimeSpan.FromSeconds(3), _cts.Token).ConfigureAwait(false); } - public ValueTask PongAsync(CancellationToken cancellationToken = default) + public async ValueTask DisposeAsync() { -#pragma warning disable CA2016 -#pragma warning disable VSTHRD103 - if (!_semLock.Wait(0)) -#pragma warning restore VSTHRD103 -#pragma warning restore CA2016 + if (_disposed) { - return PongStateMachineAsync(false, cancellationToken); + return; } - if (_flushTask is { IsCompletedSuccessfully: false }) - { - return PongStateMachineAsync(true, cancellationToken); - } + _disposed = true; - try - { - if (_disposed) - { - throw new ObjectDisposedException(nameof(CommandWriter)); - } +#if NET6_0 + _cts.Cancel(); +#else + await _cts.CancelAsync().ConfigureAwait(false); +#endif - var success = false; - try - { - _protocolWriter.WritePong(); - success = true; - } - finally - { - EnqueueCommand(success); - } - } - finally + _channelLock.Writer.TryComplete(); + _channelSize.Writer.TryComplete(); + await _pipeWriter.CompleteAsync().ConfigureAwait(false); + + Task? readerTask; + lock (_lock) { - _semLock.Release(); + readerTask = _readerLoopTask; } - return ValueTask.CompletedTask; + if (readerTask != null) + await readerTask.ConfigureAwait(false); } - public ValueTask PublishAsync(string subject, T? value, NatsHeaders? headers, string? replyTo, INatsSerialize serializer, CancellationToken cancellationToken) + public async ValueTask ConnectAsync(ClientOpts connectOpts, CancellationToken cancellationToken) { -#pragma warning disable CA2016 -#pragma warning disable VSTHRD103 - if (!_semLock.Wait(0)) -#pragma warning restore VSTHRD103 -#pragma warning restore CA2016 - { - return PublishStateMachineAsync(false, subject, value, headers, replyTo, serializer, cancellationToken); - } - - if (_flushTask is { IsCompletedSuccessfully: false }) - { - return PublishStateMachineAsync(true, subject, value, headers, replyTo, serializer, cancellationToken); - } - + var cancellationTimer = _ctPool.Start(_defaultCommandTimeout, cancellationToken); + await LockAsync(cancellationTimer.Token).ConfigureAwait(false); try { if (_disposed) @@ -231,84 +152,28 @@ public ValueTask PublishAsync(string subject, T? value, NatsHeaders? headers, throw new ObjectDisposedException(nameof(CommandWriter)); } - var trim = 0; - var success = false; - try - { - trim = _protocolWriter.WritePublish(subject, value, headers, replyTo, serializer); - success = true; - } - finally + if (_flushTask is { IsCompletedSuccessfully: false }) { - EnqueueCommand(success, trim: trim); + await _flushTask.WaitAsync(cancellationTimer.Token).ConfigureAwait(false); } - } - finally - { - _semLock.Release(); - } - return ValueTask.CompletedTask; - } - - public ValueTask SubscribeAsync(int sid, string subject, string? queueGroup, int? maxMsgs, CancellationToken cancellationToken) - { -#pragma warning disable CA2016 -#pragma warning disable VSTHRD103 - if (!_semLock.Wait(0)) -#pragma warning restore VSTHRD103 -#pragma warning restore CA2016 - { - return SubscribeStateMachineAsync(false, sid, subject, queueGroup, maxMsgs, cancellationToken); - } - - if (_flushTask is { IsCompletedSuccessfully: false }) - { - return SubscribeStateMachineAsync(true, sid, subject, queueGroup, maxMsgs, cancellationToken); - } - - try - { - if (_disposed) - { - throw new ObjectDisposedException(nameof(CommandWriter)); - } + _protocolWriter.WriteConnect(_pipeWriter, connectOpts); - var success = false; - try - { - _protocolWriter.WriteSubscribe(sid, subject, queueGroup, maxMsgs); - success = true; - } - finally - { - EnqueueCommand(success); - } + _channelSize.Writer.TryWrite((int)_pipeWriter.UnflushedBytes); + var flush = _pipeWriter.FlushAsync(CancellationToken.None); + _flushTask = flush.IsCompletedSuccessfully ? null : flush.AsTask(); } finally { - _semLock.Release(); + await UnLockAsync().ConfigureAwait(false); + cancellationTimer.TryReturn(); } - - return ValueTask.CompletedTask; } - public ValueTask UnsubscribeAsync(int sid, CancellationToken cancellationToken) + public async ValueTask PingAsync(PingCommand pingCommand, CancellationToken cancellationToken) { -#pragma warning disable CA2016 -#pragma warning disable VSTHRD103 - if (!_semLock.Wait(0)) -#pragma warning restore VSTHRD103 -#pragma warning restore CA2016 - { - return UnsubscribeStateMachineAsync(false, sid, cancellationToken); - } - - if (_flushTask is { IsCompletedSuccessfully: false }) - { - return UnsubscribeStateMachineAsync(true, sid, cancellationToken); - } - + var cancellationTimer = _ctPool.Start(_defaultCommandTimeout, cancellationToken); + await LockAsync(cancellationTimer.Token).ConfigureAwait(false); try { if (_disposed) @@ -316,55 +181,29 @@ public ValueTask UnsubscribeAsync(int sid, CancellationToken cancellationToken) throw new ObjectDisposedException(nameof(CommandWriter)); } - var success = false; - try - { - _protocolWriter.WriteUnsubscribe(sid, null); - success = true; - } - finally - { - EnqueueCommand(success); - } - } - finally - { - _semLock.Release(); - } - - return ValueTask.CompletedTask; - } - - // only used for internal testing - internal async Task TestStallFlushAsync(TimeSpan timeSpan) - { - await _semLock.WaitAsync().ConfigureAwait(false); - - try - { if (_flushTask is { IsCompletedSuccessfully: false }) { - await _flushTask.ConfigureAwait(false); + await _flushTask.WaitAsync(cancellationTimer.Token).ConfigureAwait(false); } - _flushTask = Task.Delay(timeSpan); + _enqueuePing(pingCommand); + _protocolWriter.WritePing(_pipeWriter); + + _channelSize.Writer.TryWrite((int)_pipeWriter.UnflushedBytes); + var flush = _pipeWriter.FlushAsync(CancellationToken.None); + _flushTask = flush.IsCompletedSuccessfully ? null : flush.AsTask(); } finally { - _semLock.Release(); + await UnLockAsync().ConfigureAwait(false); + cancellationTimer.TryReturn(); } } - private async ValueTask ConnectStateMachineAsync(bool lockHeld, ClientOpts connectOpts, CancellationToken cancellationToken) + public async ValueTask PongAsync(CancellationToken cancellationToken = default) { - if (!lockHeld) - { - if (!await _semLock.WaitAsync(_defaultCommandTimeout, cancellationToken).ConfigureAwait(false)) - { - throw new TimeoutException(); - } - } - + var cancellationTimer = _ctPool.Start(_defaultCommandTimeout, cancellationToken); + await LockAsync(cancellationTimer.Token).ConfigureAwait(false); try { if (_disposed) @@ -374,76 +213,45 @@ private async ValueTask ConnectStateMachineAsync(bool lockHeld, ClientOpts conne if (_flushTask is { IsCompletedSuccessfully: false }) { - await _flushTask.WaitAsync(_defaultCommandTimeout, cancellationToken).ConfigureAwait(false); + await _flushTask.WaitAsync(cancellationTimer.Token).ConfigureAwait(false); } - var success = false; - try - { - _protocolWriter.WriteConnect(connectOpts); - success = true; - } - finally - { - EnqueueCommand(success); - } + _protocolWriter.WritePong(_pipeWriter); + + _channelSize.Writer.TryWrite((int)_pipeWriter.UnflushedBytes); + var flush = _pipeWriter.FlushAsync(CancellationToken.None); + _flushTask = flush.IsCompletedSuccessfully ? null : flush.AsTask(); } finally { - _semLock.Release(); + await UnLockAsync().ConfigureAwait(false); + cancellationTimer.TryReturn(); } } - private async ValueTask PingStateMachineAsync(bool lockHeld, PingCommand pingCommand, CancellationToken cancellationToken) + public ValueTask PublishAsync(string subject, T? value, NatsHeaders? headers, string? replyTo, INatsSerialize serializer, CancellationToken cancellationToken) { - if (!lockHeld) + NatsPooledBufferWriter? headersBuffer = null; + if (headers != null) { - if (!await _semLock.WaitAsync(_defaultCommandTimeout, cancellationToken).ConfigureAwait(false)) - { - throw new TimeoutException(); - } + if (!_pool.TryRent(out headersBuffer)) + headersBuffer = new NatsPooledBufferWriter(_arrayPoolInitialSize); + _headerWriter.Write(headersBuffer, headers); } - try - { - if (_disposed) - { - throw new ObjectDisposedException(nameof(CommandWriter)); - } - - if (_flushTask is { IsCompletedSuccessfully: false }) - { - await _flushTask.WaitAsync(_defaultCommandTimeout, cancellationToken).ConfigureAwait(false); - } + NatsPooledBufferWriter payloadBuffer; + if (!_pool.TryRent(out payloadBuffer!)) + payloadBuffer = new NatsPooledBufferWriter(_arrayPoolInitialSize); + if (value != null) + serializer.Serialize(payloadBuffer, value); - var success = false; - try - { - _protocolWriter.WritePing(); - _enqueuePing(pingCommand); - success = true; - } - finally - { - EnqueueCommand(success); - } - } - finally - { - _semLock.Release(); - } + return PublishLockedAsync(subject, replyTo, payloadBuffer, headersBuffer, cancellationToken); } - private async ValueTask PongStateMachineAsync(bool lockHeld, CancellationToken cancellationToken) + public async ValueTask SubscribeAsync(int sid, string subject, string? queueGroup, int? maxMsgs, CancellationToken cancellationToken) { - if (!lockHeld) - { - if (!await _semLock.WaitAsync(_defaultCommandTimeout, cancellationToken).ConfigureAwait(false)) - { - throw new TimeoutException(); - } - } - + var cancellationTimer = _ctPool.Start(_defaultCommandTimeout, cancellationToken); + await LockAsync(cancellationTimer.Token).ConfigureAwait(false); try { if (_disposed) @@ -453,36 +261,26 @@ private async ValueTask PongStateMachineAsync(bool lockHeld, CancellationToken c if (_flushTask is { IsCompletedSuccessfully: false }) { - await _flushTask.WaitAsync(_defaultCommandTimeout, cancellationToken).ConfigureAwait(false); + await _flushTask.WaitAsync(cancellationTimer.Token).ConfigureAwait(false); } - var success = false; - try - { - _protocolWriter.WritePong(); - success = true; - } - finally - { - EnqueueCommand(success); - } + _protocolWriter.WriteSubscribe(_pipeWriter, sid, subject, queueGroup, maxMsgs); + + _channelSize.Writer.TryWrite((int)_pipeWriter.UnflushedBytes); + var flush = _pipeWriter.FlushAsync(CancellationToken.None); + _flushTask = flush.IsCompletedSuccessfully ? null : flush.AsTask(); } finally { - _semLock.Release(); + await UnLockAsync().ConfigureAwait(false); + cancellationTimer.TryReturn(); } } - private async ValueTask PublishStateMachineAsync(bool lockHeld, string subject, T? value, NatsHeaders? headers, string? replyTo, INatsSerialize serializer, CancellationToken cancellationToken) + public async ValueTask UnsubscribeAsync(int sid, int? maxMsgs, CancellationToken cancellationToken) { - if (!lockHeld) - { - if (!await _semLock.WaitAsync(_defaultCommandTimeout, cancellationToken).ConfigureAwait(false)) - { - throw new TimeoutException(); - } - } - + var cancellationTimer = _ctPool.Start(_defaultCommandTimeout, cancellationToken); + await LockAsync(cancellationTimer.Token).ConfigureAwait(false); try { if (_disposed) @@ -492,167 +290,219 @@ private async ValueTask PublishStateMachineAsync(bool lockHeld, string subjec if (_flushTask is { IsCompletedSuccessfully: false }) { - await _flushTask.WaitAsync(_defaultCommandTimeout, cancellationToken).ConfigureAwait(false); + await _flushTask.WaitAsync(cancellationTimer.Token).ConfigureAwait(false); } - var trim = 0; - var success = false; - try - { - trim = _protocolWriter.WritePublish(subject, value, headers, replyTo, serializer); - success = true; - } - finally - { - EnqueueCommand(success, trim: trim); - } + _protocolWriter.WriteUnsubscribe(_pipeWriter, sid, maxMsgs); + + _channelSize.Writer.TryWrite((int)_pipeWriter.UnflushedBytes); + var flush = _pipeWriter.FlushAsync(CancellationToken.None); + _flushTask = flush.IsCompletedSuccessfully ? null : flush.AsTask(); } finally { - _semLock.Release(); + await UnLockAsync().ConfigureAwait(false); + cancellationTimer.TryReturn(); } } - private async ValueTask SubscribeStateMachineAsync(bool lockHeld, int sid, string subject, string? queueGroup, int? maxMsgs, CancellationToken cancellationToken) - { - if (!lockHeld) - { - if (!await _semLock.WaitAsync(_defaultCommandTimeout, cancellationToken).ConfigureAwait(false)) - { - throw new TimeoutException(); - } - } + // only used for internal testing + internal bool TestStallFlush() => _channelLock.Writer.TryWrite(1); + private static async Task ReaderLoopAsync(ILogger logger, ISocketConnection connection, PipeReader pipeReader, Channel channelSize, CancellationToken cancellationToken) + { try { - if (_disposed) - { - throw new ObjectDisposedException(nameof(CommandWriter)); - } - - if (_flushTask is { IsCompletedSuccessfully: false }) - { - await _flushTask.WaitAsync(_defaultCommandTimeout, cancellationToken).ConfigureAwait(false); - } - - var success = false; - try - { - _protocolWriter.WriteSubscribe(sid, subject, queueGroup, maxMsgs); - success = true; - } - finally - { - EnqueueCommand(success); - } - } - finally - { - _semLock.Release(); + var examinedOffset = 0; + while (true) + { + var result = await pipeReader.ReadAsync(cancellationToken).ConfigureAwait(false); + + if (result.IsCanceled) + { + break; + } + + var buffer = result.Buffer; + var consumed = buffer.Start; + var examined = buffer.GetPosition(examinedOffset); + var readBuffer = buffer.Slice(examinedOffset); + + try + { + if (!buffer.IsEmpty && !readBuffer.IsEmpty) + { + var bufferLength = (int)readBuffer.Length; + + var bytes = ArrayPool.Shared.Rent(bufferLength); + readBuffer.CopyTo(bytes); + var memory = bytes.AsMemory(0, bufferLength); + + try + { + var totalSent = 0; + var totalSize = 0; + while (totalSent < bufferLength) + { + var sendMemory = memory; + if (sendMemory.Length > MaxSendSize) + { + // cap the send size, the OS can only handle so much in a send buffer at a time + // also if the send fails, we have to throw this many bytes away + sendMemory = memory[..MaxSendSize]; + } + + int sent; + Exception? sendEx = null; + try + { + sent = await connection.SendAsync(sendMemory).ConfigureAwait(false); + } + catch (Exception ex) + { + // we have no idea how many bytes were actually sent, so we have to assume they all were + // this could result in message loss, but is consistent with at-most once delivery + sendEx = ex; + sent = sendMemory.Length; + } + + totalSent += sent; + memory = memory[sent..]; + + while (totalSize < totalSent) + { + int peek; + while (!channelSize.Reader.TryPeek(out peek)) + { + // should never happen; channel sizes are written before flush is called + await channelSize.Reader.WaitToReadAsync(cancellationToken).ConfigureAwait(false); + } + + // Don't just mark the message as complete if we have more data to send + if (totalSize + peek > totalSent) + { + break; + } + + int size; + while (!channelSize.Reader.TryRead(out size)) + { + // should never happen; channel sizes are written before flush is called (plus we just peeked) + await channelSize.Reader.WaitToReadAsync(cancellationToken).ConfigureAwait(false); + } + + totalSize += size; + examinedOffset = 0; + } + + // make sure to mark the buffer only at message boundaries. + consumed = buffer.GetPosition(totalSize); + examined = buffer.GetPosition(totalSent); + examinedOffset += totalSent - totalSize; + + // throw if there was a send failure + if (sendEx != null) + { + throw sendEx; + } + } + } + finally + { + ArrayPool.Shared.Return(bytes); + } + } + } + finally + { + // Always examine to the end to potentially unblock writer + pipeReader.AdvanceTo(consumed, examined); + } + + if (result.IsCompleted) + { + break; + } + } + } + catch (OperationCanceledException) + { + // Expected during shutdown + } + catch (InvalidOperationException) + { + // We might still be using the previous pipe reader which might be completed already + } + catch (Exception e) + { + logger.LogError(NatsLogEvents.Buffer, e, "Unexpected error in send buffer reader loop"); } } - private async ValueTask UnsubscribeStateMachineAsync(bool lockHeld, int sid, CancellationToken cancellationToken) + [AsyncMethodBuilder(typeof(PoolingAsyncValueTaskMethodBuilder))] + private async ValueTask PublishLockedAsync(string subject, string? replyTo, NatsPooledBufferWriter payloadBuffer, NatsPooledBufferWriter? headersBuffer, CancellationToken cancellationToken) { - if (!lockHeld) - { - if (!await _semLock.WaitAsync(_defaultCommandTimeout, cancellationToken).ConfigureAwait(false)) - { - throw new TimeoutException(); - } - } - + var cancellationTimer = _ctPool.Start(_defaultCommandTimeout, cancellationToken); + await LockAsync(cancellationTimer.Token).ConfigureAwait(false); try { + var payload = payloadBuffer.WrittenMemory; + var headers = headersBuffer?.WrittenMemory; + if (_disposed) { throw new ObjectDisposedException(nameof(CommandWriter)); } - if (_flushTask is { IsCompletedSuccessfully: false }) - { - await _flushTask.WaitAsync(_defaultCommandTimeout, cancellationToken).ConfigureAwait(false); - } + _protocolWriter.WritePublish(_pipeWriter, subject, replyTo, headers, payload); - var success = false; - try + payloadBuffer.Reset(); + _pool.Return(payloadBuffer); + + if (headersBuffer != null) { - _protocolWriter.WriteUnsubscribe(sid, null); - success = true; + headersBuffer.Reset(); + _pool.Return(headersBuffer); } - finally + + var size = (int)_pipeWriter.UnflushedBytes; + _channelSize.Writer.TryWrite(size); + + var result = await _pipeWriter.FlushAsync(cancellationTimer.Token).ConfigureAwait(false); + if (result.IsCanceled) { - EnqueueCommand(success); + throw new OperationCanceledException(); } } finally { - _semLock.Release(); + await UnLockAsync().ConfigureAwait(false); + cancellationTimer.TryReturn(); } } - /// - /// Enqueues a command, and kicks off a flush - /// - /// - /// Whether the command was successful - /// If true, it will be sent on the wire - /// If false, it will be thrown out - /// - /// - /// Number of bytes to skip from beginning of message - /// when sending on the wire - /// [MethodImpl(MethodImplOptions.AggressiveInlining)] - private void EnqueueCommand(bool success, int trim = 0) + private async ValueTask LockAsync(CancellationToken cancellationToken) { - var size = (int)_pipeWriter.UnflushedBytes; - if (size == 0) + Interlocked.Increment(ref _counter.PendingMessages); + try { - // no unflushed bytes means no command was produced - _flushTask = null; - return; + await _channelLock.Writer.WriteAsync(1, cancellationToken).ConfigureAwait(false); } - - if (success) + catch (TaskCanceledException) { - Interlocked.Add(ref _counter.PendingMessages, 1); + throw new OperationCanceledException(); + } + catch (ChannelClosedException) + { + throw new OperationCanceledException(); } - - _queuedCommandsWriter.TryWrite(new QueuedCommand(Size: size, Trim: success ? trim : size)); - var flush = _pipeWriter.FlushAsync(); - _flushTask = flush.IsCompletedSuccessfully ? null : flush.AsTask(); - } -} - -internal sealed class PriorityCommandWriter : IAsyncDisposable -{ - private readonly NatsPipeliningWriteProtocolProcessor _natsPipeliningWriteProtocolProcessor; - private int _disposed; - - public PriorityCommandWriter(ISocketConnection socketConnection, NatsOpts opts, ConnectionStatsCounter counter, Action enqueuePing) - { - CommandWriter = new CommandWriter(opts, counter, enqueuePing, overrideCommandTimeout: TimeSpan.MaxValue); - _natsPipeliningWriteProtocolProcessor = CommandWriter.CreateNatsPipeliningWriteProtocolProcessor(socketConnection); } - public CommandWriter CommandWriter { get; } - - public async ValueTask DisposeAsync() + [MethodImpl(MethodImplOptions.AggressiveInlining)] + private ValueTask UnLockAsync() { - if (Interlocked.Increment(ref _disposed) == 1) - { - // disposing command writer marks pipe writer as complete - await CommandWriter.DisposeAsync().ConfigureAwait(false); - try - { - // write loop will complete once pipe reader completes - await _natsPipeliningWriteProtocolProcessor.WriteLoop.ConfigureAwait(false); - } - finally - { - await _natsPipeliningWriteProtocolProcessor.DisposeAsync().ConfigureAwait(false); - } - } + Interlocked.Decrement(ref _counter.PendingMessages); + return _channelLock.Reader.ReadAsync(_cts.Token); } } diff --git a/src/NATS.Client.Core/Commands/NatsPooledBufferWriter.cs b/src/NATS.Client.Core/Commands/NatsPooledBufferWriter.cs new file mode 100644 index 000000000..b62d1c617 --- /dev/null +++ b/src/NATS.Client.Core/Commands/NatsPooledBufferWriter.cs @@ -0,0 +1,208 @@ +using System.Buffers; +using System.Numerics; +using System.Runtime.CompilerServices; +using NATS.Client.Core.Internal; + +namespace NATS.Client.Core.Commands; + +// adapted from https://github.com/CommunityToolkit/dotnet/blob/v8.2.2/src/CommunityToolkit.HighPerformance/Buffers/ArrayPoolBufferWriter%7BT%7D.cs +internal sealed class NatsPooledBufferWriter : IBufferWriter, IObjectPoolNode> +{ + private const int DefaultInitialMinBufferSize = 256; + private const int DefaultInitialMaxBufferSize = 65536; + + private readonly ArrayPool _pool; + private readonly int _size; + private T[]? _array; + private int _index; + private NatsPooledBufferWriter? _next; + + public NatsPooledBufferWriter(int size) + { + if (size < DefaultInitialMinBufferSize) + { + size = DefaultInitialMinBufferSize; + } + + if (size > DefaultInitialMaxBufferSize) + { + size = DefaultInitialMaxBufferSize; + } + + _size = size; + _pool = ArrayPool.Shared; + _array = _pool.Rent(size); + _index = 0; + } + + public ref NatsPooledBufferWriter? NextNode => ref _next; + + /// + /// Gets the data written to the underlying buffer so far, as a . + /// + public ReadOnlyMemory WrittenMemory + { + get + { + var array = _array; + + if (array is null) + { + ThrowObjectDisposedException(); + } + + return array!.AsMemory(0, _index); + } + } + + /// + /// Gets the data written to the underlying buffer so far, as a . + /// + public ReadOnlySpan WrittenSpan + { + get + { + var array = _array; + + if (array is null) + { + ThrowObjectDisposedException(); + } + + return array!.AsSpan(0, _index); + } + } + + /// + /// Gets the amount of data written to the underlying buffer so far. + /// + public int WrittenCount + { + get => _index; + } + + /// + public void Advance(int count) + { + var array = _array; + + if (array is null) + { + ThrowObjectDisposedException(); + } + + if (count < 0) + { + ThrowArgumentOutOfRangeExceptionForNegativeCount(); + } + + if (_index > array!.Length - count) + { + ThrowArgumentExceptionForAdvancedTooFar(); + } + + _index += count; + } + + /// + public Memory GetMemory(int sizeHint = 0) + { + CheckBufferAndEnsureCapacity(sizeHint); + + return _array.AsMemory(_index); + } + + /// + public Span GetSpan(int sizeHint = 0) + { + CheckBufferAndEnsureCapacity(sizeHint); + + return _array.AsSpan(_index); + } + + public void Reset() + { + if (_array != null) + _pool.Return(_array); + _array = _pool.Rent(_size); + _index = 0; + } + + /// + public override string ToString() + { + // See comments in MemoryOwner about this + if (typeof(T) == typeof(char) && + _array is char[] chars) + { + return new(chars, 0, _index); + } + + // Same representation used in Span + return $"NatsPooledBufferWriter<{typeof(T)}>[{_index}]"; + } + + [MethodImpl(MethodImplOptions.NoInlining)] + private static void ThrowArgumentOutOfRangeExceptionForNegativeCount() => throw new ArgumentOutOfRangeException("count", "The count can't be a negative value."); + + [MethodImpl(MethodImplOptions.NoInlining)] + private static void ThrowArgumentOutOfRangeExceptionForNegativeSizeHint() => throw new ArgumentOutOfRangeException("sizeHint", "The size hint can't be a negative value."); + + [MethodImpl(MethodImplOptions.NoInlining)] + private static void ThrowArgumentExceptionForAdvancedTooFar() => throw new ArgumentException("The buffer writer has advanced too far."); + + [MethodImpl(MethodImplOptions.NoInlining)] + private static void ThrowObjectDisposedException() => throw new ObjectDisposedException("The current buffer has already been disposed."); + + /// + /// Ensures that has enough free space to contain a given number of new items. + /// + /// The minimum number of items to ensure space for in . + [MethodImpl(MethodImplOptions.AggressiveInlining)] + private void CheckBufferAndEnsureCapacity(int sizeHint) + { + var array = _array; + + if (array is null) + { + ThrowObjectDisposedException(); + } + + if (sizeHint < 0) + { + ThrowArgumentOutOfRangeExceptionForNegativeSizeHint(); + } + + if (sizeHint == 0) + { + sizeHint = 1; + } + + if (sizeHint > array!.Length - _index) + { + ResizeBuffer(sizeHint); + } + } + + /// + /// Resizes to ensure it can fit the specified number of new items. + /// + /// The minimum number of items to ensure space for in . + [MethodImpl(MethodImplOptions.NoInlining)] + private void ResizeBuffer(int sizeHint) + { + var minimumSize = (uint)_index + (uint)sizeHint; + + // The ArrayPool class has a maximum threshold of 1024 * 1024 for the maximum length of + // pooled arrays, and once this is exceeded it will just allocate a new array every time + // of exactly the requested size. In that case, we manually round up the requested size to + // the nearest power of two, to ensure that repeated consecutive writes when the array in + // use is bigger than that threshold don't end up causing a resize every single time. + if (minimumSize > 1024 * 1024) + { + minimumSize = BitOperations.RoundUpToPowerOf2(minimumSize); + } + + _pool.Resize(ref _array, (int)minimumSize); + } +} diff --git a/src/NATS.Client.Core/Commands/PriorityCommandWriter.cs b/src/NATS.Client.Core/Commands/PriorityCommandWriter.cs new file mode 100644 index 000000000..3020e84f4 --- /dev/null +++ b/src/NATS.Client.Core/Commands/PriorityCommandWriter.cs @@ -0,0 +1,25 @@ +using NATS.Client.Core.Internal; + +namespace NATS.Client.Core.Commands; + +internal sealed class PriorityCommandWriter : IAsyncDisposable +{ + private int _disposed; + + public PriorityCommandWriter(ObjectPool pool, ISocketConnection socketConnection, NatsOpts opts, ConnectionStatsCounter counter, Action enqueuePing) + { + CommandWriter = new CommandWriter(pool, opts, counter, enqueuePing, overrideCommandTimeout: Timeout.InfiniteTimeSpan); + CommandWriter.Reset(socketConnection); + } + + public CommandWriter CommandWriter { get; } + + public async ValueTask DisposeAsync() + { + if (Interlocked.Increment(ref _disposed) == 1) + { + // disposing command writer marks pipe writer as complete + await CommandWriter.DisposeAsync().ConfigureAwait(false); + } + } +} diff --git a/src/NATS.Client.Core/Commands/ProtocolWriter.cs b/src/NATS.Client.Core/Commands/ProtocolWriter.cs index fe7bd9e6e..6d477c0d9 100644 --- a/src/NATS.Client.Core/Commands/ProtocolWriter.cs +++ b/src/NATS.Client.Core/Commands/ProtocolWriter.cs @@ -1,6 +1,6 @@ +using System.Buffers; using System.Buffers.Binary; using System.Buffers.Text; -using System.IO.Pipelines; using System.Runtime.CompilerServices; using System.Text; using System.Text.Json; @@ -36,47 +36,40 @@ internal sealed class ProtocolWriter private static readonly ulong PongNewLine = BinaryPrimitives.ReadUInt64LittleEndian("PONG\r\n "u8); private static readonly ulong UnsubSpace = BinaryPrimitives.ReadUInt64LittleEndian("UNSUB "u8); - private readonly PipeWriter _writer; - private readonly HeaderWriter _headerWriter; private readonly Encoding _subjectEncoding; - public ProtocolWriter(PipeWriter writer, Encoding subjectEncoding, Encoding headerEncoding) - { - _writer = writer; - _subjectEncoding = subjectEncoding; - _headerWriter = new HeaderWriter(writer, headerEncoding); - } + public ProtocolWriter(Encoding subjectEncoding) => _subjectEncoding = subjectEncoding; // https://docs.nats.io/reference/reference-protocols/nats-protocol#connect // CONNECT {["option_name":option_value],...} - public void WriteConnect(ClientOpts opts) + public void WriteConnect(IBufferWriter writer, ClientOpts opts) { - var span = _writer.GetSpan(UInt64Length); + var span = writer.GetSpan(UInt64Length); BinaryPrimitives.WriteUInt64LittleEndian(span, ConnectSpace); - _writer.Advance(ConnectSpaceLength); + writer.Advance(ConnectSpaceLength); - var jsonWriter = new Utf8JsonWriter(_writer); + var jsonWriter = new Utf8JsonWriter(writer); JsonSerializer.Serialize(jsonWriter, opts, JsonContext.Default.ClientOpts); - span = _writer.GetSpan(UInt16Length); + span = writer.GetSpan(UInt16Length); BinaryPrimitives.WriteUInt16LittleEndian(span, NewLine); - _writer.Advance(NewLineLength); + writer.Advance(NewLineLength); } // https://docs.nats.io/reference/reference-protocols/nats-protocol#ping-pong - public void WritePing() + public void WritePing(IBufferWriter writer) { - var span = _writer.GetSpan(UInt64Length); + var span = writer.GetSpan(UInt64Length); BinaryPrimitives.WriteUInt64LittleEndian(span, PingNewLine); - _writer.Advance(PingNewLineLength); + writer.Advance(PingNewLineLength); } // https://docs.nats.io/reference/reference-protocols/nats-protocol#ping-pong - public void WritePong() + public void WritePong(IBufferWriter writer) { - var span = _writer.GetSpan(UInt64Length); + var span = writer.GetSpan(UInt64Length); BinaryPrimitives.WriteUInt64LittleEndian(span, PongNewLine); - _writer.Advance(PongNewLineLength); + writer.Advance(PongNewLineLength); } // https://docs.nats.io/reference/reference-protocols/nats-protocol#pub @@ -84,124 +77,21 @@ public void WritePong() // or // https://docs.nats.io/reference/reference-protocols/nats-protocol#hpub // HPUB [reply-to] <#header bytes> <#total bytes>\r\n[headers]\r\n\r\n[payload]\r\n - // - // returns the number of bytes that should be skipped when writing to the wire - public int WritePublish(string subject, T? value, NatsHeaders? headers, string? replyTo, INatsSerialize serializer) + public void WritePublish(IBufferWriter writer, string subject, string? replyTo, ReadOnlyMemory? headers, ReadOnlyMemory payload) { - int ctrlLen; - if (headers == null) - { - // 'PUB ' + subject +' '+ payload len +'\r\n' - ctrlLen = PubSpaceLength + _subjectEncoding.GetByteCount(subject) + 1 + MaxIntStringLength + NewLineLength; - } - else - { - // 'HPUB ' + subject +' '+ header len +' '+ payload len +'\r\n' - ctrlLen = HpubSpaceLength + _subjectEncoding.GetByteCount(subject) + 1 + MaxIntStringLength + 1 + MaxIntStringLength + NewLineLength; - } - - if (replyTo != null) - { - // len += replyTo +' ' - ctrlLen += _subjectEncoding.GetByteCount(replyTo) + 1; - } - - var ctrlSpan = _writer.GetSpan(ctrlLen); - var span = ctrlSpan; - if (headers == null) - { - BinaryPrimitives.WriteUInt32LittleEndian(span, PubSpace); - span = span[PubSpaceLength..]; - } - else - { - BinaryPrimitives.WriteUInt64LittleEndian(span, HpubSpace); - span = span[HpubSpaceLength..]; - } - - var written = _subjectEncoding.GetBytes(subject, span); - span[written] = (byte)' '; - span = span[(written + 1)..]; - - if (replyTo != null) - { - written = _subjectEncoding.GetBytes(replyTo, span); - span[written] = (byte)' '; - span = span[(written + 1)..]; - } - - Span lenSpan; if (headers == null) { - // len = payload len - lenSpan = span[..MaxIntStringLength]; - span = span[lenSpan.Length..]; + WritePub(writer, subject, replyTo, payload); } else { - // len = header len +' '+ payload len - lenSpan = span[..(MaxIntStringLength + 1 + MaxIntStringLength)]; - span = span[lenSpan.Length..]; + WriteHpub(writer, subject, replyTo, headers.Value, payload); } - - BinaryPrimitives.WriteUInt16LittleEndian(span, NewLine); - _writer.Advance(ctrlLen); - - var headersLength = 0L; - var totalLength = 0L; - if (headers != null) - { - headersLength = _headerWriter.Write(headers); - totalLength += headersLength; - } - - // Consider null as empty payload. This way we are able to transmit null values as sentinels. - // Another point is serializer behaviour. For instance JSON serializer seems to serialize null - // as a string "null", others might throw exception. - if (value != null) - { - var initialCount = _writer.UnflushedBytes; - serializer.Serialize(_writer, value); - totalLength += _writer.UnflushedBytes - initialCount; - } - - span = _writer.GetSpan(UInt16Length); - BinaryPrimitives.WriteUInt16LittleEndian(span, NewLine); - _writer.Advance(NewLineLength); - - // write the length - var lenWritten = 0; - if (headers != null) - { - if (!Utf8Formatter.TryFormat(headersLength, lenSpan, out lenWritten)) - { - ThrowOnUtf8FormatFail(); - } - - lenSpan[lenWritten] = (byte)' '; - lenWritten += 1; - } - - if (!Utf8Formatter.TryFormat(totalLength, lenSpan[lenWritten..], out var tLen)) - { - ThrowOnUtf8FormatFail(); - } - - lenWritten += tLen; - var trim = lenSpan.Length - lenWritten; - if (trim > 0) - { - // shift right - ctrlSpan[..(ctrlLen - trim - NewLineLength)].CopyTo(ctrlSpan[trim..]); - ctrlSpan[..trim].Clear(); - } - - return trim; } // https://docs.nats.io/reference/reference-protocols/nats-protocol#sub // SUB [queue group] - public void WriteSubscribe(int sid, string subject, string? queueGroup, int? maxMsgs) + public void WriteSubscribe(IBufferWriter writer, int sid, string subject, string? queueGroup, int? maxMsgs) { // 'SUB ' + subject +' '+ sid +'\r\n' var ctrlLen = SubSpaceLength + _subjectEncoding.GetByteCount(subject) + 1 + MaxIntStringLength + NewLineLength; @@ -212,7 +102,7 @@ public void WriteSubscribe(int sid, string subject, string? queueGroup, int? max ctrlLen += _subjectEncoding.GetByteCount(queueGroup) + 1; } - var span = _writer.GetSpan(ctrlLen); + var span = writer.GetSpan(ctrlLen); BinaryPrimitives.WriteUInt32LittleEndian(span, SubSpace); var size = SubSpaceLength; span = span[SubSpaceLength..]; @@ -241,20 +131,20 @@ public void WriteSubscribe(int sid, string subject, string? queueGroup, int? max BinaryPrimitives.WriteUInt16LittleEndian(span, NewLine); size += NewLineLength; - _writer.Advance(size); + writer.Advance(size); // Immediately send UNSUB to minimize the risk of // receiving more messages than in case they are published // between our SUB and UNSUB calls. if (maxMsgs != null) { - WriteUnsubscribe(sid, maxMsgs); + WriteUnsubscribe(writer, sid, maxMsgs); } } // https://docs.nats.io/reference/reference-protocols/nats-protocol#unsub // UNSUB [max_msgs] - public void WriteUnsubscribe(int sid, int? maxMessages) + public void WriteUnsubscribe(IBufferWriter writer, int sid, int? maxMessages) { // 'UNSUB ' + sid +'\r\n' var ctrlLen = UnsubSpaceLength + MaxIntStringLength + NewLineLength; @@ -264,7 +154,7 @@ public void WriteUnsubscribe(int sid, int? maxMessages) ctrlLen += 1 + MaxIntStringLength; } - var span = _writer.GetSpan(ctrlLen); + var span = writer.GetSpan(ctrlLen); BinaryPrimitives.WriteUInt64LittleEndian(span, UnsubSpace); var size = UnsubSpaceLength; span = span[UnsubSpaceLength..]; @@ -291,10 +181,135 @@ public void WriteUnsubscribe(int sid, int? maxMessages) BinaryPrimitives.WriteUInt16LittleEndian(span, NewLine); size += NewLineLength; - _writer.Advance(size); + writer.Advance(size); } // optimization detailed here: https://github.com/nats-io/nats.net.v2/issues/320#issuecomment-1886165748 [MethodImpl(MethodImplOptions.NoInlining)] private static void ThrowOnUtf8FormatFail() => throw new NatsException("Can not format integer."); + + // PUB [reply-to] <#bytes>\r\n[payload]\r\n + private void WritePub(IBufferWriter writer, string subject, string? replyTo, ReadOnlyMemory payload) + { + Span spanPayloadLength = stackalloc byte[MaxIntStringLength]; + if (!Utf8Formatter.TryFormat(payload.Length, spanPayloadLength, out var payloadLengthWritten)) + { + ThrowOnUtf8FormatFail(); + } + + spanPayloadLength = spanPayloadLength.Slice(0, payloadLengthWritten); + + var total = PubSpaceLength; + + var subjectSpaceLength = _subjectEncoding.GetByteCount(subject) + 1; + total += subjectSpaceLength; + + var replyToLengthSpace = 0; + if (replyTo != null) + { + replyToLengthSpace = _subjectEncoding.GetByteCount(replyTo) + 1; + total += replyToLengthSpace; + } + + total += spanPayloadLength.Length + NewLineLength + payload.Length + NewLineLength; + + var span = writer.GetSpan(total); + + BinaryPrimitives.WriteUInt32LittleEndian(span, PubSpace); + span = span.Slice(PubSpaceLength); + + _subjectEncoding.GetBytes(subject, span); + span[subjectSpaceLength - 1] = (byte)' '; + span = span.Slice(subjectSpaceLength); + + if (replyTo != null) + { + _subjectEncoding.GetBytes(replyTo, span); + span[replyToLengthSpace - 1] = (byte)' '; + span = span.Slice(replyToLengthSpace); + } + + spanPayloadLength.CopyTo(span); + span = span.Slice(spanPayloadLength.Length); + + BinaryPrimitives.WriteUInt16LittleEndian(span, NewLine); + span = span.Slice(NewLineLength); + + payload.Span.CopyTo(span); + span = span.Slice(payload.Length); + + BinaryPrimitives.WriteUInt16LittleEndian(span, NewLine); + + writer.Advance(total); + } + + // HPUB [reply-to] <#header bytes> <#total bytes>\r\n[headers]\r\n\r\n[payload]\r\n + private void WriteHpub(IBufferWriter writer, string subject, string? replyTo, ReadOnlyMemory headers, ReadOnlyMemory payload) + { + Span spanPayloadLength = stackalloc byte[MaxIntStringLength]; + if (!Utf8Formatter.TryFormat(payload.Length + headers.Length, spanPayloadLength, out var payloadLengthWritten)) + { + ThrowOnUtf8FormatFail(); + } + + spanPayloadLength = spanPayloadLength.Slice(0, payloadLengthWritten); + + Span spanHeadersLength = stackalloc byte[MaxIntStringLength + 1]; + if (!Utf8Formatter.TryFormat(headers.Length, spanHeadersLength, out var headersLengthWritten)) + { + ThrowOnUtf8FormatFail(); + } + + spanHeadersLength = spanHeadersLength.Slice(0, headersLengthWritten + 1); + spanHeadersLength[headersLengthWritten] = (byte)' '; + + var total = HpubSpaceLength; + + var subjectSpaceLength = _subjectEncoding.GetByteCount(subject) + 1; + total += subjectSpaceLength; + + var replyToLengthSpace = 0; + if (replyTo != null) + { + replyToLengthSpace = _subjectEncoding.GetByteCount(replyTo) + 1; + total += replyToLengthSpace; + } + + total += spanHeadersLength.Length + spanPayloadLength.Length + NewLineLength + headers.Length + payload.Length + NewLineLength; + + var span = writer.GetSpan(total); + + BinaryPrimitives.WriteUInt64LittleEndian(span, HpubSpace); + span = span.Slice(HpubSpaceLength); + + _subjectEncoding.GetBytes(subject, span); + span[subjectSpaceLength - 1] = (byte)' '; + span = span.Slice(subjectSpaceLength); + + if (replyTo != null) + { + _subjectEncoding.GetBytes(replyTo, span); + span[replyToLengthSpace - 1] = (byte)' '; + span = span.Slice(replyToLengthSpace); + } + + spanHeadersLength.CopyTo(span); + span = span.Slice(spanHeadersLength.Length); + + spanPayloadLength.CopyTo(span); + span = span.Slice(spanPayloadLength.Length); + + BinaryPrimitives.WriteUInt16LittleEndian(span, NewLine); + span = span.Slice(NewLineLength); + + headers.Span.CopyTo(span); + span = span.Slice(headers.Length); + + payload.Span.CopyTo(span); + span = span.Slice(payload.Length); + + BinaryPrimitives.WriteUInt16LittleEndian(span, NewLine); + + writer.Advance(total); + } } diff --git a/src/NATS.Client.Core/Internal/CancellationTimer.cs b/src/NATS.Client.Core/Internal/CancellationTimer.cs index 537184ec6..6fe7b280d 100644 --- a/src/NATS.Client.Core/Internal/CancellationTimer.cs +++ b/src/NATS.Client.Core/Internal/CancellationTimer.cs @@ -66,7 +66,11 @@ public static CancellationTimer Start(ObjectPool pool, CancellationToken rootTok } self._timeout = timeout; - self._cancellationTokenSource.CancelAfter(timeout); + if (timeout != Timeout.InfiniteTimeSpan) + { + self._cancellationTokenSource.CancelAfter(timeout); + } + return self; } diff --git a/src/NATS.Client.Core/Internal/HeaderWriter.cs b/src/NATS.Client.Core/Internal/HeaderWriter.cs index 47b0d8c2e..8bb62694d 100644 --- a/src/NATS.Client.Core/Internal/HeaderWriter.cs +++ b/src/NATS.Client.Core/Internal/HeaderWriter.cs @@ -12,23 +12,19 @@ internal class HeaderWriter private const byte ByteColon = (byte)':'; private const byte ByteSpace = (byte)' '; private const byte ByteDel = 127; - private readonly PipeWriter _pipeWriter; + private readonly Encoding _encoding; - public HeaderWriter(PipeWriter pipeWriter, Encoding encoding) - { - _pipeWriter = pipeWriter; - _encoding = encoding; - } + public HeaderWriter(Encoding encoding) => _encoding = encoding; private static ReadOnlySpan CrLf => new[] { ByteCr, ByteLf }; private static ReadOnlySpan ColonSpace => new[] { ByteColon, ByteSpace }; - internal long Write(NatsHeaders headers) + internal long Write(IBufferWriter bufferWriter, NatsHeaders headers) { - var initialCount = _pipeWriter.UnflushedBytes; - _pipeWriter.WriteSpan(CommandConstants.NatsHeaders10NewLine); + bufferWriter.WriteSpan(CommandConstants.NatsHeaders10NewLine); + var len = CommandConstants.NatsHeaders10NewLine.Length; foreach (var kv in headers) { @@ -38,7 +34,7 @@ internal long Write(NatsHeaders headers) { // write key var keyLength = _encoding.GetByteCount(kv.Key); - var keySpan = _pipeWriter.GetSpan(keyLength); + var keySpan = bufferWriter.GetSpan(keyLength); _encoding.GetBytes(kv.Key, keySpan); if (!ValidateKey(keySpan.Slice(0, keyLength))) { @@ -46,20 +42,26 @@ internal long Write(NatsHeaders headers) $"Invalid header key '{kv.Key}': contains colon, space, or other non-printable ASCII characters"); } - _pipeWriter.Advance(keyLength); - _pipeWriter.Write(ColonSpace); + bufferWriter.Advance(keyLength); + len += keyLength; + + bufferWriter.Write(ColonSpace); + len += ColonSpace.Length; // write values var valueLength = _encoding.GetByteCount(value); - var valueSpan = _pipeWriter.GetSpan(valueLength); + var valueSpan = bufferWriter.GetSpan(valueLength); _encoding.GetBytes(value, valueSpan); if (!ValidateValue(valueSpan.Slice(0, valueLength))) { throw new NatsException($"Invalid header value for key '{kv.Key}': contains CRLF"); } - _pipeWriter.Advance(valueLength); - _pipeWriter.Write(CrLf); + bufferWriter.Advance(valueLength); + len += valueLength; + + bufferWriter.Write(CrLf); + len += CrLf.Length; } } } @@ -67,9 +69,10 @@ internal long Write(NatsHeaders headers) // Even empty header needs to terminate. // We will send NATS/1.0 version line // even if there are no headers. - _pipeWriter.Write(CrLf); + bufferWriter.Write(CrLf); + len += CrLf.Length; - return _pipeWriter.UnflushedBytes - initialCount; + return len; } // cannot contain ASCII Bytes <=32, 58, or 127 diff --git a/src/NATS.Client.Core/Internal/NatsPipeliningWriteProtocolProcessor.cs b/src/NATS.Client.Core/Internal/NatsPipeliningWriteProtocolProcessor.cs index 50661adaa..922cb5b95 100644 --- a/src/NATS.Client.Core/Internal/NatsPipeliningWriteProtocolProcessor.cs +++ b/src/NATS.Client.Core/Internal/NatsPipeliningWriteProtocolProcessor.cs @@ -1,320 +1,309 @@ -using System.Buffers; -using System.Diagnostics; -using System.IO.Pipelines; -using System.Threading.Channels; -using Microsoft.Extensions.Logging; -using NATS.Client.Core.Commands; - -namespace NATS.Client.Core.Internal; - -internal sealed class NatsPipeliningWriteProtocolProcessor : IAsyncDisposable -{ - private readonly CancellationTokenSource _cts; - private readonly ConnectionStatsCounter _counter; - private readonly NatsOpts _opts; - private readonly PipeReader _pipeReader; - private readonly Queue _inFlightCommands; - private readonly ChannelReader _queuedCommandReader; - private readonly ISocketConnection _socketConnection; - private readonly Stopwatch _stopwatch = new Stopwatch(); - private int _disposed; - - public NatsPipeliningWriteProtocolProcessor(ISocketConnection socketConnection, CommandWriter commandWriter, NatsOpts opts, ConnectionStatsCounter counter) - { - _cts = new CancellationTokenSource(); - _counter = counter; - _inFlightCommands = commandWriter.InFlightCommands; - _opts = opts; - _pipeReader = commandWriter.PipeReader; - _queuedCommandReader = commandWriter.QueuedCommandsReader; - _socketConnection = socketConnection; - WriteLoop = Task.Run(WriteLoopAsync); - } - - public Task WriteLoop { get; } - - public async ValueTask DisposeAsync() - { - if (Interlocked.Increment(ref _disposed) == 1) - { -#if NET6_0 - _cts.Cancel(); -#else - await _cts.CancelAsync().ConfigureAwait(false); -#endif - try - { - await WriteLoop.ConfigureAwait(false); // wait to drain writer - } - catch - { - // ignore - } - } - } - - private async Task WriteLoopAsync() - { - var logger = _opts.LoggerFactory.CreateLogger(); - var isEnabledTraceLogging = logger.IsEnabled(LogLevel.Trace); - var cancellationToken = _cts.Token; - var pending = 0; - var trimming = 0; - var examinedOffset = 0; - var sent = 0; - - // memory segment used to consolidate multiple small memory chunks - // should <= (minimumSegmentSize * 0.5) in CommandWriter - // 8520 should fit into 6 packets on 1500 MTU TLS connection or 1 packet on 9000 MTU TLS connection - // assuming 40 bytes TCP overhead + 40 bytes TLS overhead per packet - var consolidateMem = new Memory(new byte[8520]); - - // add up in flight command sum - var inFlightSum = 0; - foreach (var command in _inFlightCommands) - { - inFlightSum += command.Size; - } - - try - { - while (true) - { - // read data from pipe reader - var result = await _pipeReader.ReadAsync(cancellationToken).ConfigureAwait(false); - if (result.IsCanceled) - { - // if the pipe has been canceled, break - break; - } - - var consumedPos = result.Buffer.Start; - var examinedPos = result.Buffer.Start; - try - { - // move from _queuedCommandReader to _inFlightCommands until the total size - // of all _inFlightCommands is >= result.Buffer.Length - while (inFlightSum < result.Buffer.Length) - { - QueuedCommand queuedCommand; - while (!_queuedCommandReader.TryRead(out queuedCommand)) - { - await _queuedCommandReader.WaitToReadAsync(cancellationToken).ConfigureAwait(false); - } - - _inFlightCommands.Enqueue(queuedCommand); - inFlightSum += queuedCommand.Size; - } - - // examinedOffset was processed last iteration, so slice it off the buffer - var buffer = result.Buffer.Slice(examinedOffset); - - // iterate until buffer is empty - // any time buffer sliced and re-assigned, continue should be called - // so that this conditional is checked - while (buffer.Length > 0) - { - // if there are no pending bytes to send, set to next command - if (pending == 0) - { - var peek = _inFlightCommands.Peek(); - pending = peek.Size; - trimming = peek.Trim; - } - - // from this point forward, pending != 0 - // any operation that decrements pending should check if it is 0, - // and dequeue from _inFlightCommands if it is - - // trim any bytes that should not be sent - if (trimming > 0) - { - var trimmed = Math.Min(trimming, (int)buffer.Length); - pending -= trimmed; - trimming -= trimmed; - if (pending == 0) - { - // the entire command was trimmed (canceled) - inFlightSum -= _inFlightCommands.Dequeue().Size; - consumedPos = buffer.GetPosition(trimmed); - examinedPos = consumedPos; - examinedOffset = 0; - buffer = buffer.Slice(trimmed); - - // iterate in case buffer is now empty - continue; - } - - // the command was partially trimmed - examinedPos = buffer.GetPosition(trimmed); - examinedOffset += trimmed; - buffer = buffer.Slice(trimmed); - - // iterate in case buffer is now empty - continue; - } - - if (sent > 0) - { - if (pending <= sent) - { - // the entire command was sent - inFlightSum -= _inFlightCommands.Dequeue().Size; - Interlocked.Add(ref _counter.PendingMessages, -1); - Interlocked.Add(ref _counter.SentMessages, 1); - - // mark the bytes as consumed, and reset pending - sent -= pending; - consumedPos = buffer.GetPosition(pending); - examinedPos = consumedPos; - examinedOffset = 0; - buffer = buffer.Slice(pending); - pending = 0; - - // iterate in case buffer is now empty - continue; - } - - // the command was partially sent - // decrement pending by the number of bytes that were sent - pending -= sent; - examinedPos = buffer.GetPosition(sent); - examinedOffset += sent; - buffer = buffer.Slice(sent); - sent = 0; - - // iterate in case buffer is now empty - continue; - } - - // loop through _inFlightCommands to determine whether any commands - // in the first memory segment need trimming - var sendMem = buffer.First; - var maxSize = 0; - var maxSizeCap = Math.Max(sendMem.Length, consolidateMem.Length); - var doTrim = false; - foreach (var command in _inFlightCommands) - { - if (maxSize == 0) - { - // first command; set to pending - maxSize = pending; - continue; - } - - if (maxSize > maxSizeCap) - { - // over cap - break; - } - - if (command.Trim > 0) - { - // will have to trim - doTrim = true; - break; - } - - maxSize += command.Size; - } - - // adjust the first memory segment to end on a command boundary - if (sendMem.Length > maxSize) - { - sendMem = sendMem[..maxSize]; - } - - // if trimming is required or the first memory segment is smaller than consolidateMem - // consolidate bytes that need to be sent into consolidateMem - if (doTrim || (buffer.Length > sendMem.Length && sendMem.Length < consolidateMem.Length)) - { - var bufferIter = buffer; - var memIter = consolidateMem; - var trimmedSize = 0; - foreach (var command in _inFlightCommands) - { - if (bufferIter.Length == 0 || memIter.Length == 0) - { - break; - } - - int write; - if (trimmedSize == 0) - { - // first command, only write pending data - write = pending; - } - else if (command.Trim == 0) - { - write = command.Size; - } - else - { - if (bufferIter.Length < command.Trim + 1) - { - // not enough bytes to start writing the next command - break; - } - - bufferIter = bufferIter.Slice(command.Trim); - write = command.Size - command.Trim; - if (write == 0) - { - // the entire command was trimmed (canceled) - continue; - } - } - - write = Math.Min(memIter.Length, write); - write = Math.Min((int)bufferIter.Length, write); - bufferIter.Slice(0, write).CopyTo(memIter.Span); - memIter = memIter[write..]; - bufferIter = bufferIter.Slice(write); - trimmedSize += write; - } - - sendMem = consolidateMem[..trimmedSize]; - } - - // perform send - _stopwatch.Restart(); - sent = await _socketConnection.SendAsync(sendMem).ConfigureAwait(false); - _stopwatch.Stop(); - Interlocked.Add(ref _counter.SentBytes, sent); - if (isEnabledTraceLogging) - { - logger.LogTrace("Socket.SendAsync. Size: {0} Elapsed: {1}ms", sent, _stopwatch.Elapsed.TotalMilliseconds); - } - } - } - finally - { - // _pipeReader.AdvanceTo must be called exactly once for every - // _pipeReader.ReadAsync, which is why it is in the finally block - _pipeReader.AdvanceTo(consumedPos, examinedPos); - } - - if (result.IsCompleted) - { - // if the pipe has been completed, break - break; - } - } - } - catch (OperationCanceledException) - { - // ignore, intentionally disposed - } - catch (SocketClosedException) - { - // ignore, will be handled in read loop - } - catch (Exception ex) - { - logger.LogError(ex, "Unexpected error occured in write loop"); - throw; - } - - logger.LogDebug("WriteLoop finished."); - } -} +// using System.Buffers; +// using System.Diagnostics; +// using System.IO.Pipelines; +// using System.Threading.Channels; +// using Microsoft.Extensions.Logging; +// using NATS.Client.Core.Commands; +// +// namespace NATS.Client.Core.Internal; +// +// internal sealed class NatsPipeliningWriteProtocolProcessor : IAsyncDisposable +// { +// private readonly CancellationTokenSource _cts; +// private readonly ConnectionStatsCounter _counter; +// private readonly NatsOpts _opts; +// private readonly PipeReader _pipeReader; +// private readonly Queue _inFlightCommands; +// private readonly ChannelReader _queuedCommandReader; +// private readonly ISocketConnection _socketConnection; +// private readonly Stopwatch _stopwatch = new Stopwatch(); +// private int _disposed; +// +// public NatsPipeliningWriteProtocolProcessor(ISocketConnection socketConnection, CommandWriter commandWriter, NatsOpts opts, ConnectionStatsCounter counter) +// { +// _cts = new CancellationTokenSource(); +// _counter = counter; +// _inFlightCommands = commandWriter.InFlightCommands; +// _opts = opts; +// _pipeReader = commandWriter.PipeReader; +// _queuedCommandReader = commandWriter.QueuedCommandsReader; +// _socketConnection = socketConnection; +// WriteLoop = Task.Run(WriteLoopAsync); +// } +// +// public Task WriteLoop { get; } +// +// public async ValueTask DisposeAsync() +// { +// if (Interlocked.Increment(ref _disposed) == 1) +// { +// #if NET6_0 +// _cts.Cancel(); +// #else +// await _cts.CancelAsync().ConfigureAwait(false); +// #endif +// try +// { +// await WriteLoop.ConfigureAwait(false); // wait to drain writer +// } +// catch +// { +// // ignore +// } +// } +// } +// +// private async Task WriteLoopAsync() +// { +// var logger = _opts.LoggerFactory.CreateLogger(); +// var isEnabledTraceLogging = logger.IsEnabled(LogLevel.Trace); +// var cancellationToken = _cts.Token; +// var pending = 0; +// var trimming = 0; +// var examinedOffset = 0; +// +// // memory segment used to consolidate multiple small memory chunks +// // should <= (minimumSegmentSize * 0.5) in CommandWriter +// // 8520 should fit into 6 packets on 1500 MTU TLS connection or 1 packet on 9000 MTU TLS connection +// // assuming 40 bytes TCP overhead + 40 bytes TLS overhead per packet +// var consolidateMem = new Memory(new byte[8520]); +// +// // add up in flight command sum +// var inFlightSum = 0; +// foreach (var command in _inFlightCommands) +// { +// inFlightSum += command.Size; +// } +// +// try +// { +// while (true) +// { +// var result = await _pipeReader.ReadAsync(cancellationToken).ConfigureAwait(false); +// if (result.IsCanceled) +// { +// break; +// } +// +// var consumedPos = result.Buffer.Start; +// var examinedPos = result.Buffer.Start; +// try +// { +// var buffer = result.Buffer.Slice(examinedOffset); +// while (inFlightSum < result.Buffer.Length) +// { +// QueuedCommand queuedCommand; +// while (!_queuedCommandReader.TryRead(out queuedCommand)) +// { +// await _queuedCommandReader.WaitToReadAsync(cancellationToken).ConfigureAwait(false); +// } +// +// _inFlightCommands.Enqueue(queuedCommand); +// inFlightSum += queuedCommand.Size; +// } +// +// while (buffer.Length > 0) +// { +// if (pending == 0) +// { +// var peek = _inFlightCommands.Peek(); +// pending = peek.Size; +// trimming = peek.Trim; +// } +// +// if (trimming > 0) +// { +// var trimmed = Math.Min(trimming, (int)buffer.Length); +// consumedPos = buffer.GetPosition(trimmed); +// examinedPos = buffer.GetPosition(trimmed); +// examinedOffset = 0; +// buffer = buffer.Slice(trimmed); +// pending -= trimmed; +// trimming -= trimmed; +// if (pending == 0) +// { +// // the entire command was trimmed (canceled) +// inFlightSum -= _inFlightCommands.Dequeue().Size; +// } +// +// continue; +// } +// +// var sendMem = buffer.First; +// var maxSize = 0; +// var maxSizeCap = Math.Max(sendMem.Length, consolidateMem.Length); +// var doTrim = false; +// foreach (var command in _inFlightCommands) +// { +// if (maxSize == 0) +// { +// // first command; set to pending +// maxSize = pending; +// continue; +// } +// +// if (maxSize > maxSizeCap) +// { +// // over cap +// break; +// } +// +// if (command.Trim > 0) +// { +// // will have to trim +// doTrim = true; +// break; +// } +// +// maxSize += command.Size; +// } +// +// if (sendMem.Length > maxSize) +// { +// sendMem = sendMem[..maxSize]; +// } +// +// var bufferIter = buffer; +// if (doTrim || (bufferIter.Length > sendMem.Length && sendMem.Length < consolidateMem.Length)) +// { +// var memIter = consolidateMem; +// var trimmedSize = 0; +// foreach (var command in _inFlightCommands) +// { +// if (bufferIter.Length == 0 || memIter.Length == 0) +// { +// break; +// } +// +// int write; +// if (trimmedSize == 0) +// { +// // first command, only write pending data +// write = pending; +// } +// else if (command.Trim == 0) +// { +// write = command.Size; +// } +// else +// { +// if (bufferIter.Length < command.Trim + 1) +// { +// // not enough bytes to start writing the next command +// break; +// } +// +// bufferIter = bufferIter.Slice(command.Trim); +// write = command.Size - command.Trim; +// if (write == 0) +// { +// // the entire command was trimmed (canceled) +// continue; +// } +// } +// +// write = Math.Min(memIter.Length, write); +// write = Math.Min((int)bufferIter.Length, write); +// bufferIter.Slice(0, write).CopyTo(memIter.Span); +// memIter = memIter[write..]; +// bufferIter = bufferIter.Slice(write); +// trimmedSize += write; +// } +// +// sendMem = consolidateMem[..trimmedSize]; +// } +// +// // perform send +// _stopwatch.Restart(); +// var sent = await _socketConnection.SendAsync(sendMem).ConfigureAwait(false); +// _stopwatch.Stop(); +// Interlocked.Add(ref _counter.SentBytes, sent); +// if (isEnabledTraceLogging) +// { +// logger.LogTrace("Socket.SendAsync. Size: {0} Elapsed: {1}ms", sent, _stopwatch.Elapsed.TotalMilliseconds); +// } +// +// var consumed = 0; +// var sentAndTrimmed = sent; +// while (consumed < sentAndTrimmed) +// { +// if (pending == 0) +// { +// var peek = _inFlightCommands.Peek(); +// pending = peek.Size - peek.Trim; +// consumed += peek.Trim; +// sentAndTrimmed += peek.Trim; +// +// if (pending == 0) +// { +// // the entire command was trimmed (canceled) +// inFlightSum -= _inFlightCommands.Dequeue().Size; +// continue; +// } +// } +// +// if (pending <= sentAndTrimmed - consumed) +// { +// // the entire command was sent +// inFlightSum -= _inFlightCommands.Dequeue().Size; +// Interlocked.Add(ref _counter.PendingMessages, -1); +// Interlocked.Add(ref _counter.SentMessages, 1); +// +// // mark the bytes as consumed, and reset pending +// consumed += pending; +// pending = 0; +// } +// else +// { +// // the entire command was not sent; decrement pending by +// // the number of bytes from the command that was sent +// pending += consumed - sentAndTrimmed; +// break; +// } +// } +// +// if (consumed > 0) +// { +// // mark fully sent commands as consumed +// consumedPos = buffer.GetPosition(consumed); +// examinedOffset = sentAndTrimmed - consumed; +// } +// else +// { +// // no commands were consumed +// examinedOffset += sentAndTrimmed; +// } +// +// // lop off sent bytes for next iteration +// examinedPos = buffer.GetPosition(sentAndTrimmed); +// buffer = buffer.Slice(sentAndTrimmed); +// } +// } +// finally +// { +// _pipeReader.AdvanceTo(consumedPos, examinedPos); +// } +// +// if (result.IsCompleted) +// { +// break; +// } +// } +// } +// catch (OperationCanceledException) +// { +// // ignore, intentionally disposed +// } +// catch (SocketClosedException) +// { +// // ignore, will be handled in read loop +// } +// catch (Exception ex) +// { +// logger.LogError(ex, "Unexpected error occured in write loop"); +// throw; +// } +// +// logger.LogDebug("WriteLoop finished."); +// } +// } diff --git a/src/NATS.Client.Core/NatsConnection.cs b/src/NATS.Client.Core/NatsConnection.cs index 6a6236f29..0a285d285 100644 --- a/src/NATS.Client.Core/NatsConnection.cs +++ b/src/NATS.Client.Core/NatsConnection.cs @@ -56,7 +56,6 @@ public partial class NatsConnection : INatsConnection private volatile NatsUri? _currentConnectUri; private volatile NatsUri? _lastSeedConnectUri; private NatsReadProtocolProcessor? _socketReader; - private NatsPipeliningWriteProtocolProcessor? _socketWriter; private TaskCompletionSource _waitForOpenConnection; private TlsCerts? _tlsCerts; private UserCredentials? _userCredentials; @@ -80,7 +79,7 @@ public NatsConnection(NatsOpts opts) _cancellationTimerPool = new CancellationTimerPool(_pool, _disposedCancellationTokenSource.Token); _name = opts.Name; Counter = new ConnectionStatsCounter(); - CommandWriter = new CommandWriter(Opts, Counter, EnqueuePing); + CommandWriter = new CommandWriter(_pool, Opts, Counter, EnqueuePing); InboxPrefix = NewInbox(opts.InboxPrefix); SubscriptionManager = new SubscriptionManager(this, InboxPrefix); _logger = opts.LoggerFactory.CreateLogger(); @@ -218,7 +217,8 @@ internal ValueTask UnsubscribeAsync(int sid) { try { - return CommandWriter.UnsubscribeAsync(sid, CancellationToken.None); + // TODO: use maxMsgs in INatsSub to unsubscribe. + return CommandWriter.UnsubscribeAsync(sid, null, CancellationToken.None); } catch (Exception ex) { @@ -431,7 +431,7 @@ private async ValueTask SetupReaderWriterAsync(bool reconnect) // Authentication _userCredentials?.Authenticate(_clientOpts, WritableServerInfo); - await using (var priorityCommandWriter = new PriorityCommandWriter(_socket!, Opts, Counter, EnqueuePing)) + await using (var priorityCommandWriter = new PriorityCommandWriter(_pool, _socket!, Opts, Counter, EnqueuePing)) { // add CONNECT and PING command to priority lane await priorityCommandWriter.CommandWriter.ConnectAsync(_clientOpts, CancellationToken.None).ConfigureAwait(false); @@ -455,7 +455,7 @@ private async ValueTask SetupReaderWriterAsync(bool reconnect) } // create the socket writer - _socketWriter = CommandWriter.CreateNatsPipeliningWriteProtocolProcessor(_socket!); + CommandWriter.Reset(_socket!); lock (_gate) { @@ -500,6 +500,8 @@ private async void ReconnectLoop() // If dispose this client, WaitForClosed throws OperationCanceledException so stop reconnect-loop correctly. await _socket!.WaitForClosed.ConfigureAwait(false); + await CommandWriter.CancelReaderLoopAsync().ConfigureAwait(false); + _logger.LogTrace(NatsLogEvents.Connection, "Connection {Name} is closed. Will cleanup and reconnect", _name); lock (_gate) { @@ -800,13 +802,6 @@ private async ValueTask DisposeSocketComponentAsync(IAsyncDisposable component, // Dispose Reader(Drain read buffers but no reads more) private async ValueTask DisposeSocketAsync(bool asyncReaderDispose) { - // writer's internal buffer/channel is not thread-safe, must wait until complete. - if (_socketWriter != null) - { - await DisposeSocketComponentAsync(_socketWriter, "socket writer").ConfigureAwait(false); - _socketWriter = null; - } - if (_socket != null) { await DisposeSocketComponentAsync(_socket, "socket").ConfigureAwait(false); diff --git a/src/NATS.Client.Core/NatsLogEvents.cs b/src/NATS.Client.Core/NatsLogEvents.cs index fc1cd0fe2..15456e79a 100644 --- a/src/NATS.Client.Core/NatsLogEvents.cs +++ b/src/NATS.Client.Core/NatsLogEvents.cs @@ -11,4 +11,5 @@ public static class NatsLogEvents public static readonly EventId Protocol = new(1005, nameof(Protocol)); public static readonly EventId TcpSocket = new(1006, nameof(TcpSocket)); public static readonly EventId Internal = new(1006, nameof(Internal)); + public static readonly EventId Buffer = new(1007, nameof(Buffer)); } diff --git a/src/NATS.Client.Core/NatsOpts.cs b/src/NATS.Client.Core/NatsOpts.cs index 4056427e8..dc7222403 100644 --- a/src/NATS.Client.Core/NatsOpts.cs +++ b/src/NATS.Client.Core/NatsOpts.cs @@ -31,7 +31,9 @@ public sealed record NatsOpts public ILoggerFactory LoggerFactory { get; init; } = NullLoggerFactory.Instance; - public int WriterBufferSize { get; init; } = 1048576; + // Same as default pipelines pause writer size + // Performing better compared to nats bench on localhost + public int WriterBufferSize { get; init; } = 65536; public int ReaderBufferSize { get; init; } = 1048576; diff --git a/tests/NATS.Client.Core.Tests/CancellationTest.cs b/tests/NATS.Client.Core.Tests/CancellationTest.cs index 1148cd569..732c07f84 100644 --- a/tests/NATS.Client.Core.Tests/CancellationTest.cs +++ b/tests/NATS.Client.Core.Tests/CancellationTest.cs @@ -15,15 +15,18 @@ public async Task CommandTimeoutTest() await using var conn = server.CreateClientConnection(NatsOpts.Default with { CommandTimeout = TimeSpan.FromMilliseconds(1) }); await conn.ConnectAsync(); + var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10)); + var cancellationToken = cts.Token; + // stall the flush task - await conn.CommandWriter.TestStallFlushAsync(TimeSpan.FromSeconds(5)); + Assert.True(conn.CommandWriter.TestStallFlush()); // commands that call ConnectAsync throw OperationCanceledException - await Assert.ThrowsAsync(() => conn.PingAsync().AsTask()); - await Assert.ThrowsAsync(() => conn.PublishAsync("test").AsTask()); - await Assert.ThrowsAsync(async () => + await Assert.ThrowsAsync(() => conn.PingAsync(cancellationToken).AsTask()); + await Assert.ThrowsAsync(() => conn.PublishAsync("test", cancellationToken: cancellationToken).AsTask()); + await Assert.ThrowsAsync(async () => { - await foreach (var unused in conn.SubscribeAsync("test")) + await foreach (var unused in conn.SubscribeAsync("test", cancellationToken: cancellationToken)) { } }); @@ -75,4 +78,22 @@ await Assert.ThrowsAsync(async () => } }); } + + [Fact] + public async Task Cancellation_timer() + { + var objectPool = new ObjectPool(10); + var cancellationTimerPool = new CancellationTimerPool(objectPool, CancellationToken.None); + var cancellationTimer = cancellationTimerPool.Start(TimeSpan.FromSeconds(2), CancellationToken.None); + + try + { + await Task.Delay(TimeSpan.FromSeconds(4), cancellationTimer.Token); + _output.WriteLine($"delayed 4 seconds"); + } + catch (Exception e) + { + _output.WriteLine($"Exception: {e.GetType().Name}"); + } + } } diff --git a/tests/NATS.Client.Core.Tests/ConnectionRetryTest.cs b/tests/NATS.Client.Core.Tests/ConnectionRetryTest.cs index 261337c60..132b727c3 100644 --- a/tests/NATS.Client.Core.Tests/ConnectionRetryTest.cs +++ b/tests/NATS.Client.Core.Tests/ConnectionRetryTest.cs @@ -58,6 +58,7 @@ public async Task Retry_and_connect_after_disconnected() [Fact] public async Task Reconnect_doesnt_drop_partially_sent_msgs() { + const int msgSize = 1048576; // 1MiB await using var server = NatsServer.Start(); await using var pubConn = server.CreateClientConnection(); @@ -86,6 +87,7 @@ public async Task Reconnect_doesnt_drop_partially_sent_msgs() } else { + Assert.Equal(msgSize, msg.Data.Length); Interlocked.Increment(ref received); } } @@ -99,7 +101,7 @@ public async Task Reconnect_doesnt_drop_partially_sent_msgs() } var sent = 0; - var data = new byte[1048576]; // 1MiB + var data = new byte[msgSize]; var sendTask = Task.Run(async () => { while (!stopCts.IsCancellationRequested) @@ -131,6 +133,8 @@ public async Task Reconnect_doesnt_drop_partially_sent_msgs() Assert.True(reconnects > 0, "connection did not reconnect"); Assert.True(received <= sent, $"duplicate messages sent on wire- {sent} sent, {received} received"); + _output.WriteLine($"reconnects: {reconnects}, sent: {sent}, received: {received}"); + // some messages may still be lost, as socket could have been disconnected // after socket.WriteAsync returned, but before OS sent // check to ensure that the loss was < 1% diff --git a/tests/NATS.Client.Core.Tests/NatsHeaderTest.cs b/tests/NATS.Client.Core.Tests/NatsHeaderTest.cs index 6b74776af..e2081e740 100644 --- a/tests/NATS.Client.Core.Tests/NatsHeaderTest.cs +++ b/tests/NATS.Client.Core.Tests/NatsHeaderTest.cs @@ -21,8 +21,8 @@ public async Task WriterTests() ["key"] = "a-long-header-value", }; var pipe = new Pipe(new PipeOptions(pauseWriterThreshold: 0)); - var writer = new HeaderWriter(pipe.Writer, Encoding.UTF8); - var written = writer.Write(headers); + var writer = new HeaderWriter(Encoding.UTF8); + var written = writer.Write(pipe.Writer, headers); var text = "NATS/1.0\r\nk1: v1\r\nk2: v2-0\r\nk2: v2-1\r\na-long-header-key: value\r\nkey: a-long-header-value\r\n\r\n"; var expected = new ReadOnlySequence(Encoding.UTF8.GetBytes(text)); @@ -30,7 +30,24 @@ public async Task WriterTests() Assert.Equal(expected.Length, written); await pipe.Writer.FlushAsync(); var result = await pipe.Reader.ReadAtLeastAsync((int)written); + Assert.True(expected.ToSpan().SequenceEqual(result.Buffer.ToSpan())); + _output.WriteLine($"Buffer:\n{result.Buffer.FirstSpan.Dump()}"); + } + [Fact] + public async Task WriterEmptyTests() + { + var headers = new NatsHeaders(); + var pipe = new Pipe(new PipeOptions(pauseWriterThreshold: 0)); + var writer = new HeaderWriter(Encoding.UTF8); + var written = writer.Write(pipe.Writer, headers); + + var text = "NATS/1.0\r\n\r\n"; + var expected = new ReadOnlySequence(Encoding.UTF8.GetBytes(text)); + + Assert.Equal(expected.Length, written); + await pipe.Writer.FlushAsync(); + var result = await pipe.Reader.ReadAtLeastAsync((int)written); Assert.True(expected.ToSpan().SequenceEqual(result.Buffer.ToSpan())); _output.WriteLine($"Buffer:\n{result.Buffer.FirstSpan.Dump()}"); } diff --git a/tests/NATS.Client.Core.Tests/ProtocolTest.cs b/tests/NATS.Client.Core.Tests/ProtocolTest.cs index 35a64e266..6c7228c35 100644 --- a/tests/NATS.Client.Core.Tests/ProtocolTest.cs +++ b/tests/NATS.Client.Core.Tests/ProtocolTest.cs @@ -1,4 +1,5 @@ using System.Buffers; +using System.Collections.Concurrent; using System.Text; using Microsoft.Extensions.Logging; using NATS.Client.TestUtilities; @@ -335,17 +336,29 @@ await Retry.Until( () => proxy.ClientFrames.Any(f => f.Message.StartsWith("PUB foo"))); var frames = proxy.ClientFrames.Select(f => f.Message).ToList(); + + foreach (var frame in frames) + { + _output.WriteLine($"frame: {frame}"); + } + Assert.StartsWith("SUB foo", frames[0]); - Assert.StartsWith("PUB bar1", frames[1]); - Assert.StartsWith("PUB bar2", frames[2]); - Assert.StartsWith("PUB bar3", frames[3]); - Assert.StartsWith("PUB foo", frames[4]); + + for (var i = 0; i < 100; i++) + { + Assert.StartsWith($"PUB bar{i}", frames[i + 1]); + } + + Assert.StartsWith("PUB foo", frames[101]); await nats.DisposeAsync(); } - [Fact] - public async Task Protocol_parser_under_load() + [Theory] + [InlineData(1)] + [InlineData(1024)] + [InlineData(1024 * 1024)] + public async Task Protocol_parser_under_load(int size) { await using var server = NatsServer.Start(); var logger = new InMemoryTestLoggerFactory(LogLevel.Error); @@ -355,24 +368,27 @@ public async Task Protocol_parser_under_load() using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(30)); var signal = new WaitSignal(); - + var counts = new ConcurrentDictionary(); _ = Task.Run( async () => { var count = 0; - await foreach (var unused in nats.SubscribeAsync("x", cancellationToken: cts.Token)) + await foreach (var msg in nats.SubscribeAsync("x.*", cancellationToken: cts.Token)) { - if (++count > 10_000) + if (++count > 100) signal.Pulse(); + counts.AddOrUpdate(msg.Subject, 1, (_, c) => c + 1); } }, cts.Token); + var payload = new byte[size]; + var r = 0; _ = Task.Run( async () => { while (!cts.Token.IsCancellationRequested) - await nats.PublishAsync("x", "x", cancellationToken: cts.Token); + await nats.PublishAsync($"x.{Volatile.Read(ref r)}", payload, cancellationToken: cts.Token); }, cts.Token); @@ -382,12 +398,21 @@ public async Task Protocol_parser_under_load() { await Task.Delay(1_000, cts.Token); await server.RestartAsync(); + Interlocked.Increment(ref r); + await Task.Delay(1_000, cts.Token); } foreach (var log in logger.Logs.Where(x => x.EventId == NatsLogEvents.Protocol && x.LogLevel == LogLevel.Error)) { Assert.DoesNotContain("Unknown Protocol Operation", log.Message); } + + foreach (var (key, value) in counts) + { + _output.WriteLine($"{key} {value}"); + } + + counts.Count.Should().BeGreaterOrEqualTo(3); } private sealed class NatsSubReconnectTest : NatsSubBase @@ -403,9 +428,10 @@ internal override async ValueTask WriteReconnectCommandsAsync(CommandWriter comm await base.WriteReconnectCommandsAsync(commandWriter, sid); // Any additional commands to send on reconnect - await commandWriter.PublishAsync("bar1", default, default, default, NatsRawSerializer.Default, default); - await commandWriter.PublishAsync("bar2", default, default, default, NatsRawSerializer.Default, default); - await commandWriter.PublishAsync("bar3", default, default, default, NatsRawSerializer.Default, default); + for (var i = 0; i < 100; i++) + { + await commandWriter.PublishAsync($"bar{i}", default, default, default, NatsRawSerializer.Default, default); + } } protected override ValueTask ReceiveInternalAsync(string subject, string? replyTo, ReadOnlySequence? headersBuffer, ReadOnlySequence payloadBuffer) diff --git a/tests/NATS.Client.Core.Tests/SendBufferTest.cs b/tests/NATS.Client.Core.Tests/SendBufferTest.cs new file mode 100644 index 000000000..d6d098804 --- /dev/null +++ b/tests/NATS.Client.Core.Tests/SendBufferTest.cs @@ -0,0 +1,78 @@ +using System.Diagnostics; +using NATS.Client.TestUtilities; + +namespace NATS.Client.Core.Tests; + +public class SendBufferTest +{ + private readonly ITestOutputHelper _output; + + public SendBufferTest(ITestOutputHelper output) => _output = output; + + [Fact] + public async Task Send_cancel() + { + void Log(string m) => TmpFileLogger.Log(m); + + using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(30)); + + await using var server = new MockServer( + async (s, cmd) => + { + if (cmd.Name == "PUB" && cmd.Subject == "pause") + { + s.Log("[S] pause"); + await Task.Delay(10_000, cts.Token); + } + }, + Log, + cts.Token); + + Log("__________________________________"); + + await using var nats = new NatsConnection(new NatsOpts { Url = server.Url }); + + Log($"[C] connect {server.Url}"); + await nats.ConnectAsync(); + + Log($"[C] ping"); + var rtt = await nats.PingAsync(cts.Token); + Log($"[C] ping rtt={rtt}"); + + server.Log($"[C] publishing pause..."); + await nats.PublishAsync("pause", "x", cancellationToken: cts.Token); + + server.Log($"[C] publishing 1M..."); + var payload = new byte[1024 * 1024]; + var tasks = new List(); + for (var i = 0; i < 10; i++) + { + var i1 = i; + tasks.Add(Task.Run(async () => + { + var stopwatch = Stopwatch.StartNew(); + + try + { + Log($"[C] ({i1}) publish..."); + await nats.PublishAsync("x", payload, cancellationToken: cts.Token); + } + catch (Exception e) + { + stopwatch.Stop(); + Log($"[C] ({i1}) publish cancelled after {stopwatch.Elapsed.TotalSeconds:n0} s (exception: {e.GetType()})"); + return; + } + + stopwatch.Stop(); + Log($"[C] ({i1}) publish took {stopwatch.Elapsed.TotalSeconds:n3} s"); + })); + } + + for (var i = 0; i < 10; i++) + { + Log($"[C] await tasks {i}..."); + await tasks[i]; + } + } +} diff --git a/tests/NATS.Client.TestUtilities/MockServer.cs b/tests/NATS.Client.TestUtilities/MockServer.cs new file mode 100644 index 000000000..d9a0f273d --- /dev/null +++ b/tests/NATS.Client.TestUtilities/MockServer.cs @@ -0,0 +1,112 @@ +using System.Net; +using System.Net.Sockets; +using System.Text; +using System.Text.RegularExpressions; + +namespace NATS.Client.TestUtilities; + +public class MockServer : IAsyncDisposable +{ + private readonly Action _logger; + private readonly TcpListener _server; + private readonly Task _accept; + + public MockServer( + Func handler, + Action logger, + CancellationToken cancellationToken) + { + _logger = logger; + _server = new TcpListener(IPAddress.Parse("127.0.0.1"), 0); + _server.Start(); + Port = ((IPEndPoint)_server.LocalEndpoint).Port; + + _accept = Task.Run( + async () => + { + var client = await _server.AcceptTcpClientAsync(); + + var stream = client.GetStream(); + + var sw = new StreamWriter(stream, Encoding.ASCII); + await sw.WriteAsync("INFO {}\r\n"); + await sw.FlushAsync(); + + var sr = new StreamReader(stream, Encoding.ASCII); + + while (!cancellationToken.IsCancellationRequested) + { + Log($"[S] >>> READ LINE"); + var line = (await sr.ReadLineAsync())!; + + if (line.StartsWith("CONNECT")) + { + Log($"[S] RCV CONNECT"); + } + else if (line.StartsWith("PING")) + { + Log($"[S] RCV PING"); + await sw.WriteAsync("PONG\r\n"); + await sw.FlushAsync(); + Log($"[S] SND PONG"); + } + else if (line.StartsWith("SUB")) + { + var m = Regex.Match(line, @"^SUB\s+(?\S+)"); + var subject = m.Groups["subject"].Value; + Log($"[S] RCV SUB {subject}"); + await handler(this, new Cmd("SUB", subject, 0)); + } + else if (line.StartsWith("PUB") || line.StartsWith("HPUB")) + { + var m = Regex.Match(line, @"^(H?PUB)\s+(?\S+).*?(?\d+)$"); + var size = int.Parse(m.Groups["size"].Value); + var subject = m.Groups["subject"].Value; + Log($"[S] RCV PUB {subject} {size}"); + var read = 0; + var buffer = new byte[size]; + while (read < size) + { + var received = await stream.ReadAsync(buffer, read, size - read); + read += received; + Log($"[S] RCV {received} bytes (size={size} read={read})"); + } + + await handler(this, new Cmd("PUB", subject, size)); + await sr.ReadLineAsync(); + } + else + { + Log($"[S] RCV LINE: {line}"); + } + } + }, + cancellationToken); + } + + public int Port { get; } + + public string Url => $"127.0.0.1:{Port}"; + + public async ValueTask DisposeAsync() + { + _server.Stop(); + try + { + await _accept; + } + catch (OperationCanceledException) + { + } + catch (SocketException) + { + } + catch (IOException) + { + } + } + + public void Log(string m) => _logger(m); + + public record Cmd(string Name, string Subject, int Size); +} diff --git a/tests/NATS.Client.TestUtilities/TmpFileLogger.cs b/tests/NATS.Client.TestUtilities/TmpFileLogger.cs new file mode 100644 index 000000000..72d8e8ee0 --- /dev/null +++ b/tests/NATS.Client.TestUtilities/TmpFileLogger.cs @@ -0,0 +1,18 @@ +using System.Text; + +namespace NATS.Client.TestUtilities; + +public static class TmpFileLogger +{ + private static readonly object Gate = new(); + + public static void Log(string m) + { + lock (Gate) + { + using var fs = new FileStream("/tmp/test.log", FileMode.Append, FileAccess.Write, FileShare.ReadWrite); + using var sw = new StreamWriter(fs, Encoding.UTF8); + sw.WriteLine($"{DateTime.Now:HH:mm:ss.fff} {m}"); + } + } +}