From 54c007fd246eb22a6d2a1a1e2f77acd1cb2a8e99 Mon Sep 17 00:00:00 2001 From: Ziya Suzen Date: Fri, 2 Feb 2024 14:25:28 +0000 Subject: [PATCH] Flush buffers cleanly on dispose --- src/NATS.Client.Core/Commands/CommandWriter.cs | 14 ++++++++++++-- .../Commands/PriorityCommandWriter.cs | 1 - src/NATS.Client.Core/NatsConnection.cs | 5 +---- tests/NATS.Client.Core.Tests/ProtocolTest.cs | 18 +++++++++++------- 4 files changed, 24 insertions(+), 14 deletions(-) diff --git a/src/NATS.Client.Core/Commands/CommandWriter.cs b/src/NATS.Client.Core/Commands/CommandWriter.cs index 6edb8c2d6..98f887615 100644 --- a/src/NATS.Client.Core/Commands/CommandWriter.cs +++ b/src/NATS.Client.Core/Commands/CommandWriter.cs @@ -76,7 +76,7 @@ public void Reset(ISocketConnection socketConnection) lock (_lock) { _socketConnection = socketConnection; - _ctsReader = CancellationTokenSource.CreateLinkedTokenSource(_cts.Token); + _ctsReader = new CancellationTokenSource(); _readerLoopTask = Task.Run(async () => { @@ -85,7 +85,7 @@ public void Reset(ISocketConnection socketConnection) } } - public async Task FlushAsync() + public async Task CancelReaderLoopAsync() { CancellationTokenSource? cts; Task? readerTask; @@ -126,6 +126,16 @@ public async ValueTask DisposeAsync() _channelLock.Writer.TryComplete(); _channelSize.Writer.TryComplete(); await _pipeWriter.CompleteAsync().ConfigureAwait(false); + + Task? readerTask; + lock (_lock) + { + readerTask = _readerLoopTask; + } + + if (readerTask != null) + await readerTask.ConfigureAwait(false); + } public async ValueTask ConnectAsync(ClientOpts connectOpts, CancellationToken cancellationToken) diff --git a/src/NATS.Client.Core/Commands/PriorityCommandWriter.cs b/src/NATS.Client.Core/Commands/PriorityCommandWriter.cs index 4bd0cce06..3020e84f4 100644 --- a/src/NATS.Client.Core/Commands/PriorityCommandWriter.cs +++ b/src/NATS.Client.Core/Commands/PriorityCommandWriter.cs @@ -18,7 +18,6 @@ public async ValueTask DisposeAsync() { if (Interlocked.Increment(ref _disposed) == 1) { - await CommandWriter.FlushAsync().ConfigureAwait(false); // disposing command writer marks pipe writer as complete await CommandWriter.DisposeAsync().ConfigureAwait(false); } diff --git a/src/NATS.Client.Core/NatsConnection.cs b/src/NATS.Client.Core/NatsConnection.cs index 1afe66f92..0a285d285 100644 --- a/src/NATS.Client.Core/NatsConnection.cs +++ b/src/NATS.Client.Core/NatsConnection.cs @@ -500,7 +500,7 @@ private async void ReconnectLoop() // If dispose this client, WaitForClosed throws OperationCanceledException so stop reconnect-loop correctly. await _socket!.WaitForClosed.ConfigureAwait(false); - await CommandWriter.FlushAsync().ConfigureAwait(false); + await CommandWriter.CancelReaderLoopAsync().ConfigureAwait(false); _logger.LogTrace(NatsLogEvents.Connection, "Connection {Name} is closed. Will cleanup and reconnect", _name); lock (_gate) @@ -802,9 +802,6 @@ private async ValueTask DisposeSocketComponentAsync(IAsyncDisposable component, // Dispose Reader(Drain read buffers but no reads more) private async ValueTask DisposeSocketAsync(bool asyncReaderDispose) { - // writer's internal buffer/channel is not thread-safe, must wait until complete. - await CommandWriter.FlushAsync().ConfigureAwait(false); - if (_socket != null) { await DisposeSocketComponentAsync(_socket, "socket").ConfigureAwait(false); diff --git a/tests/NATS.Client.Core.Tests/ProtocolTest.cs b/tests/NATS.Client.Core.Tests/ProtocolTest.cs index 704f2242c..6c7228c35 100644 --- a/tests/NATS.Client.Core.Tests/ProtocolTest.cs +++ b/tests/NATS.Client.Core.Tests/ProtocolTest.cs @@ -343,10 +343,13 @@ await Retry.Until( } Assert.StartsWith("SUB foo", frames[0]); - Assert.StartsWith("PUB bar1", frames[1]); - Assert.StartsWith("PUB bar2", frames[2]); - Assert.StartsWith("PUB bar3", frames[3]); - Assert.StartsWith("PUB foo", frames[4]); + + for (var i = 0; i < 100; i++) + { + Assert.StartsWith($"PUB bar{i}", frames[i + 1]); + } + + Assert.StartsWith("PUB foo", frames[101]); await nats.DisposeAsync(); } @@ -425,9 +428,10 @@ internal override async ValueTask WriteReconnectCommandsAsync(CommandWriter comm await base.WriteReconnectCommandsAsync(commandWriter, sid); // Any additional commands to send on reconnect - await commandWriter.PublishAsync("bar1", default, default, default, NatsRawSerializer.Default, default); - await commandWriter.PublishAsync("bar2", default, default, default, NatsRawSerializer.Default, default); - await commandWriter.PublishAsync("bar3", default, default, default, NatsRawSerializer.Default, default); + for (var i = 0; i < 100; i++) + { + await commandWriter.PublishAsync($"bar{i}", default, default, default, NatsRawSerializer.Default, default); + } } protected override ValueTask ReceiveInternalAsync(string subject, string? replyTo, ReadOnlySequence? headersBuffer, ReadOnlySequence payloadBuffer)