Skip to content

Commit

Permalink
Flush buffers cleanly on dispose
Browse files Browse the repository at this point in the history
  • Loading branch information
mtmk committed Feb 2, 2024
1 parent 51fa05d commit 54c007f
Show file tree
Hide file tree
Showing 4 changed files with 24 additions and 14 deletions.
14 changes: 12 additions & 2 deletions src/NATS.Client.Core/Commands/CommandWriter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ public void Reset(ISocketConnection socketConnection)
lock (_lock)
{
_socketConnection = socketConnection;
_ctsReader = CancellationTokenSource.CreateLinkedTokenSource(_cts.Token);
_ctsReader = new CancellationTokenSource();

_readerLoopTask = Task.Run(async () =>
{
Expand All @@ -85,7 +85,7 @@ public void Reset(ISocketConnection socketConnection)
}
}

public async Task FlushAsync()
public async Task CancelReaderLoopAsync()
{
CancellationTokenSource? cts;
Task? readerTask;
Expand Down Expand Up @@ -126,6 +126,16 @@ public async ValueTask DisposeAsync()
_channelLock.Writer.TryComplete();
_channelSize.Writer.TryComplete();
await _pipeWriter.CompleteAsync().ConfigureAwait(false);

Task? readerTask;
lock (_lock)
{
readerTask = _readerLoopTask;
}

if (readerTask != null)
await readerTask.ConfigureAwait(false);

}

Check warning on line 139 in src/NATS.Client.Core/Commands/CommandWriter.cs

View workflow job for this annotation

GitHub Actions / test (main)

Check warning on line 139 in src/NATS.Client.Core/Commands/CommandWriter.cs

View workflow job for this annotation

GitHub Actions / test (main)

Check warning on line 139 in src/NATS.Client.Core/Commands/CommandWriter.cs

View workflow job for this annotation

GitHub Actions / test (v2.9.22)

Check warning on line 139 in src/NATS.Client.Core/Commands/CommandWriter.cs

View workflow job for this annotation

GitHub Actions / test (v2.9.22)

Check warning on line 139 in src/NATS.Client.Core/Commands/CommandWriter.cs

View workflow job for this annotation

GitHub Actions / test (v2.10.9)

Check warning on line 139 in src/NATS.Client.Core/Commands/CommandWriter.cs

View workflow job for this annotation

GitHub Actions / test (v2.10.9)

Check warning on line 139 in src/NATS.Client.Core/Commands/CommandWriter.cs

View workflow job for this annotation

GitHub Actions / check

A closing brace should not be preceded by a blank line

Check warning on line 139 in src/NATS.Client.Core/Commands/CommandWriter.cs

View workflow job for this annotation

GitHub Actions / check

A closing brace should not be preceded by a blank line

Check warning on line 139 in src/NATS.Client.Core/Commands/CommandWriter.cs

View workflow job for this annotation

GitHub Actions / memory test (main)

Check warning on line 139 in src/NATS.Client.Core/Commands/CommandWriter.cs

View workflow job for this annotation

GitHub Actions / memory test (main)

Check warning on line 139 in src/NATS.Client.Core/Commands/CommandWriter.cs

View workflow job for this annotation

GitHub Actions / memory test (main)

Check warning on line 139 in src/NATS.Client.Core/Commands/CommandWriter.cs

View workflow job for this annotation

GitHub Actions / memory test (main)

Check warning on line 139 in src/NATS.Client.Core/Commands/CommandWriter.cs

View workflow job for this annotation

GitHub Actions / memory test (v2.10.9)

Check warning on line 139 in src/NATS.Client.Core/Commands/CommandWriter.cs

View workflow job for this annotation

GitHub Actions / memory test (v2.10.9)

Check warning on line 139 in src/NATS.Client.Core/Commands/CommandWriter.cs

View workflow job for this annotation

GitHub Actions / memory test (v2.10.9)

Check warning on line 139 in src/NATS.Client.Core/Commands/CommandWriter.cs

View workflow job for this annotation

GitHub Actions / memory test (v2.10.9)

Check warning on line 139 in src/NATS.Client.Core/Commands/CommandWriter.cs

View workflow job for this annotation

GitHub Actions / dotnet (v2.10.9)

Check warning on line 139 in src/NATS.Client.Core/Commands/CommandWriter.cs

View workflow job for this annotation

GitHub Actions / dotnet (v2.10.9)

Check warning on line 139 in src/NATS.Client.Core/Commands/CommandWriter.cs

View workflow job for this annotation

GitHub Actions / dotnet (v2.10.9)

Check warning on line 139 in src/NATS.Client.Core/Commands/CommandWriter.cs

View workflow job for this annotation

GitHub Actions / dotnet (v2.10.9)

Check warning on line 139 in src/NATS.Client.Core/Commands/CommandWriter.cs

View workflow job for this annotation

GitHub Actions / dotnet (v2.10.9)

Check warning on line 139 in src/NATS.Client.Core/Commands/CommandWriter.cs

View workflow job for this annotation

GitHub Actions / dotnet (v2.10.9)

Check warning on line 139 in src/NATS.Client.Core/Commands/CommandWriter.cs

View workflow job for this annotation

GitHub Actions / dotnet (main)

Check warning on line 139 in src/NATS.Client.Core/Commands/CommandWriter.cs

View workflow job for this annotation

GitHub Actions / dotnet (main)

Check warning on line 139 in src/NATS.Client.Core/Commands/CommandWriter.cs

View workflow job for this annotation

GitHub Actions / dotnet (main)

Check warning on line 139 in src/NATS.Client.Core/Commands/CommandWriter.cs

View workflow job for this annotation

GitHub Actions / dotnet (main)

Check warning on line 139 in src/NATS.Client.Core/Commands/CommandWriter.cs

View workflow job for this annotation

GitHub Actions / dotnet (main)

Check warning on line 139 in src/NATS.Client.Core/Commands/CommandWriter.cs

View workflow job for this annotation

GitHub Actions / dotnet (main)

Check warning on line 139 in src/NATS.Client.Core/Commands/CommandWriter.cs

View workflow job for this annotation

GitHub Actions / dotnet (v2.9.22)

Check warning on line 139 in src/NATS.Client.Core/Commands/CommandWriter.cs

View workflow job for this annotation

GitHub Actions / dotnet (v2.9.22)

Check warning on line 139 in src/NATS.Client.Core/Commands/CommandWriter.cs

View workflow job for this annotation

GitHub Actions / dotnet (v2.9.22)

Check warning on line 139 in src/NATS.Client.Core/Commands/CommandWriter.cs

View workflow job for this annotation

GitHub Actions / dotnet (v2.9.22)

Check warning on line 139 in src/NATS.Client.Core/Commands/CommandWriter.cs

View workflow job for this annotation

GitHub Actions / dotnet (v2.9.22)

Check warning on line 139 in src/NATS.Client.Core/Commands/CommandWriter.cs

View workflow job for this annotation

GitHub Actions / dotnet (v2.9.22)

Check warning on line 139 in src/NATS.Client.Core/Commands/CommandWriter.cs

View workflow job for this annotation

GitHub Actions / memory test (v2.9.22)

Check warning on line 139 in src/NATS.Client.Core/Commands/CommandWriter.cs

View workflow job for this annotation

GitHub Actions / memory test (v2.9.22)

Check warning on line 139 in src/NATS.Client.Core/Commands/CommandWriter.cs

View workflow job for this annotation

GitHub Actions / memory test (v2.9.22)

Check warning on line 139 in src/NATS.Client.Core/Commands/CommandWriter.cs

View workflow job for this annotation

GitHub Actions / memory test (v2.9.22)


public async ValueTask ConnectAsync(ClientOpts connectOpts, CancellationToken cancellationToken)
Expand Down
1 change: 0 additions & 1 deletion src/NATS.Client.Core/Commands/PriorityCommandWriter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ public async ValueTask DisposeAsync()
{
if (Interlocked.Increment(ref _disposed) == 1)
{
await CommandWriter.FlushAsync().ConfigureAwait(false);
// disposing command writer marks pipe writer as complete
await CommandWriter.DisposeAsync().ConfigureAwait(false);
}
Expand Down
5 changes: 1 addition & 4 deletions src/NATS.Client.Core/NatsConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -500,7 +500,7 @@ private async void ReconnectLoop()
// If dispose this client, WaitForClosed throws OperationCanceledException so stop reconnect-loop correctly.
await _socket!.WaitForClosed.ConfigureAwait(false);

await CommandWriter.FlushAsync().ConfigureAwait(false);
await CommandWriter.CancelReaderLoopAsync().ConfigureAwait(false);

_logger.LogTrace(NatsLogEvents.Connection, "Connection {Name} is closed. Will cleanup and reconnect", _name);
lock (_gate)
Expand Down Expand Up @@ -802,9 +802,6 @@ private async ValueTask DisposeSocketComponentAsync(IAsyncDisposable component,
// Dispose Reader(Drain read buffers but no reads more)
private async ValueTask DisposeSocketAsync(bool asyncReaderDispose)
{
// writer's internal buffer/channel is not thread-safe, must wait until complete.
await CommandWriter.FlushAsync().ConfigureAwait(false);

if (_socket != null)
{
await DisposeSocketComponentAsync(_socket, "socket").ConfigureAwait(false);
Expand Down
18 changes: 11 additions & 7 deletions tests/NATS.Client.Core.Tests/ProtocolTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -343,10 +343,13 @@ await Retry.Until(
}

Assert.StartsWith("SUB foo", frames[0]);
Assert.StartsWith("PUB bar1", frames[1]);
Assert.StartsWith("PUB bar2", frames[2]);
Assert.StartsWith("PUB bar3", frames[3]);
Assert.StartsWith("PUB foo", frames[4]);

for (var i = 0; i < 100; i++)
{
Assert.StartsWith($"PUB bar{i}", frames[i + 1]);
}

Assert.StartsWith("PUB foo", frames[101]);

await nats.DisposeAsync();
}
Expand Down Expand Up @@ -425,9 +428,10 @@ internal override async ValueTask WriteReconnectCommandsAsync(CommandWriter comm
await base.WriteReconnectCommandsAsync(commandWriter, sid);

// Any additional commands to send on reconnect
await commandWriter.PublishAsync("bar1", default, default, default, NatsRawSerializer<byte>.Default, default);
await commandWriter.PublishAsync("bar2", default, default, default, NatsRawSerializer<byte>.Default, default);
await commandWriter.PublishAsync("bar3", default, default, default, NatsRawSerializer<byte>.Default, default);
for (var i = 0; i < 100; i++)
{
await commandWriter.PublishAsync($"bar{i}", default, default, default, NatsRawSerializer<byte>.Default, default);
}
}

protected override ValueTask ReceiveInternalAsync(string subject, string? replyTo, ReadOnlySequence<byte>? headersBuffer, ReadOnlySequence<byte> payloadBuffer)
Expand Down

0 comments on commit 54c007f

Please sign in to comment.