Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Send buffer changes #346

Merged
merged 30 commits into from
Feb 2, 2024
Merged

Send buffer changes #346

merged 30 commits into from
Feb 2, 2024

Conversation

mtmk
Copy link
Collaborator

@mtmk mtmk commented Jan 22, 2024

This PR

Method Iter Mean Error StdDev Allocated
PublishAsync 64 156.4 us 20.34 us 1.12 us 488 B
PublishAsync 512 299.8 us 146.19 us 8.01 us 520 B
PublishAsync 1024 482.9 us 218.91 us 12.00 us 488 B

Main

Method Iter Mean Error StdDev Allocated
PublishAsync 64 165.9 us 39.41 us 2.16 us 488 B
PublishAsync 512 375.1 us 81.65 us 4.48 us 488 B
PublishAsync 1024 607.0 us 152.22 us 8.34 us 488 B

@mtmk mtmk linked an issue Jan 22, 2024 that may be closed by this pull request
Copy link
Collaborator

@to11mtm to11mtm left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Left some first-pass comments/questions, will try to review further and pull down for a look.

return ConnectStateMachineAsync(true, connectOpts, cancellationToken);
}

Interlocked.Add(ref _counter.PendingMessages, 1);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Question: Is there a reason to use this over Interlocked.Increment?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

copy/paste error. (Was there an argument about Add() being faster?🤷‍♂️)
Fixed.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(Was there an argument about Add() being faster?🤷‍♂️)

It might be?

As always devil is in the details, I can't comment for ARM but for x86/x64 I did start to look at the tables on Agner Fog. However,

  • It has been a minute since I have peeked at these tables
  • They do not list LOCK INC
  • It's entirely possible that the JIT decides to pick the best option regardless...

However digging did lead me to this which claims that INC doesn't update the carry flag compared to ADD. Whether that still makes a difference in modern arches, I can't say...

This does almost certainly qualify as a micro opt either way, .Increment()/.Decrement() feel more readable but am happy with either option.

}
finally
{
_semLock.Release();
Interlocked.Add(ref _counter.PendingMessages, -1);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Per above, also why not Interlocked.Decrement?

Comment on lines 119 to 128
PipeWriter bw;
lock (_lock)
{
EnqueueCommand(success);
if (_pipeWriter == null)
ThrowOnDisconnected();
bw = _pipeWriter!;
}

_protocolWriter.WriteConnect(bw, connectOpts!);
await bw.FlushAsync(cancellationToken).ConfigureAwait(false);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This feels curious enough that a comment may be helpful?

Main question on first glance, what does the lock + capture buy us over the Semaphore usage?

Copy link
Collaborator Author

@mtmk mtmk Jan 23, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

_pipeWriter is replaced on reconnect in Reset(). I wanted to avoid any possible stale reads. afaik in this case semaphore or channel 'async locks' would wouldn't provide any memory consistency guarantees.

Comment on lines 196 to 213
public ValueTask PublishAsync<T>(string subject, T? value, NatsHeaders? headers, string? replyTo, INatsSerialize<T> serializer, CancellationToken cancellationToken)
{
#pragma warning disable CA2016
#pragma warning disable VSTHRD103
if (!_semLock.Wait(0))
#pragma warning restore VSTHRD103
#pragma warning restore CA2016
NatsPooledBufferWriter<byte>? headersBuffer = null;
if (headers != null)
{
return PublishStateMachineAsync(false, subject, value, headers, replyTo, serializer, cancellationToken);
if (!_pool.TryRent(out headersBuffer))
headersBuffer = new NatsPooledBufferWriter<byte>();
_headerWriter.Write(headersBuffer, headers);
}

if (_flushTask is { IsCompletedSuccessfully: false })
{
return PublishStateMachineAsync(true, subject, value, headers, replyTo, serializer, cancellationToken);
}
NatsPooledBufferWriter<byte> payloadBuffer;
if (!_pool.TryRent(out payloadBuffer!))
payloadBuffer = new NatsPooledBufferWriter<byte>();
if (value != null)
serializer.Serialize(payloadBuffer, value);

return PublishLockedAsync(subject, replyTo, payloadBuffer, headersBuffer, cancellationToken);
}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Interestingly, (not sure if it was a factor in this change) but it looks like this would move or less minimize the concerns I brought up in #318.

Comment on lines 149 to 154
lock (_lock)
{
EnqueueCommand(success);
if (_pipeWriter == null)
ThrowOnDisconnected();
bw = _pipeWriter!;
}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Given the re-use of this pattern, is there any potential value in us trying to consolidate shared logic of the lock/get (and possibly more) into a method call to lower maintenance concerns? (Maybe not at this stage but though it was worth asking.)

private async ValueTask PublishLockedAsync(string subject, string? replyTo, NatsPooledBufferWriter<byte> payloadBuffer, NatsPooledBufferWriter<byte>? headersBuffer, CancellationToken cancellationToken)
{
// Interlocked.Add(ref _counter.PendingMessages, 1);
await _semLock.WaitAsync(cancellationToken).ConfigureAwait(false);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Speaking of, putting the WaitAsync is probably 'better' here overall (the old Wait(0) -> statemachine pattern has some thrashing concerns) but it does raise some interesting questions in my head again as to the driver.

(Below is a language-lawyer disclaimer and... well, hopefully we aren't worried about it, but if we are, wanted to bring it up.)
This is going to be more fair than the old method (b/c it's always WaitAsync, but it is worth noting that SemaphoreSlim like most locking primitives in .NET do not guarantee fairness as to which thread gets released first citation.

If we -do- need stronger ordering/fairness on these, we will need to consider an alternative... and also remember the caveats mentioned in the citation; they are certainly not 'absolutes' however they may require consideration in a solution (again, only if needed, this can be a rabbit hole.)

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

just swapped this out with a channel. WDYT? (@stebet also raised concerns about SemaphoreSlim before)

// 8520 should fit into 6 packets on 1500 MTU TLS connection or 1 packet on 9000 MTU TLS connection
// assuming 40 bytes TCP overhead + 40 bytes TLS overhead per packet
const int maxSendMemoryLength = 8520;
var sendMemory = new Memory<byte>(new byte[maxSendMemoryLength]);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Question: Does this get re-started between connections?
And if yes is there a way to possibly reuse?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it lives as long as the CommandWriter object (and in turn NatsConnection object). On reconnect pipeline is renewed (not pooled at the moment) but the read loop stays in place.

Comment on lines 600 to 605
// The ArrayPool<T> class has a maximum threshold of 1024 * 1024 for the maximum length of
// pooled arrays, and once this is exceeded it will just allocate a new array every time
// of exactly the requested size. In that case, we manually round up the requested size to
// the nearest power of two, to ensure that repeated consecutive writes when the array in
// use is bigger than that threshold don't end up causing a resize every single time.
if (minimumSize > 1024 * 1024)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

<3 the exposition here! Great breakdown of the why.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not me unfortunately 😅 originally taken from the community toolkit (as mentioned in NatsBufferWriter.cs class). I'll make sure to put an attribution comment later.

@mtmk mtmk removed a link to an issue Jan 23, 2024
mtmk added 2 commits January 23, 2024 14:04
# Conflicts:
#	src/NATS.Client.Core/Commands/ProtocolWriter.cs
#	src/NATS.Client.Core/Internal/NatsPipeliningWriteProtocolProcessor.cs
#	tests/NATS.Client.TestUtilities/InMemoryTestLoggerFactory.cs
Copy link
Collaborator

@to11mtm to11mtm left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Left food for thought, primarily around the channel changes.

In summary:

As we are using the channel as a semaphore this way (i.e. using full channel to signal waiting as opposed to an empty channel to signal waiting,) it is better to use WriteAsync throughout if we want to have better ordering fairness and minimize thrashing on concurrent writes.

If we -do- need to 'prioritize' certain locks, we may want to consider how to do (same problem exists with a semaphore

Comment on lines 80 to 83
while (!_channelLock.Writer.TryWrite(1))
{
await _channelLock.Writer.WaitToWriteAsync().ConfigureAwait(false);
}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm. This inversion of channel usage does cause some challenges.

I think, -especially- in the dispose,

try
{
    await _channelLock.Writer.WriteAsync(1).ConfigureAwait(false);
}
catch(ChannelClosedExeption _)
{
}

Would normally be a bit less 'thrashy' in the case of a failed write.

It may be worth considering such in other cases if possible as well, however I need to take a look at overall usage to provide an informed statement.

}
finally
{
_semLock.Release();
while (!_channelLock.Reader.TryRead(out _))
Copy link
Collaborator

@to11mtm to11mtm Jan 23, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we aren't doing so elsewhere, we should probably .TryComplete() the channel here, such that if any other writers/readers happen to be waiting, they will return a false on WaitToWrite/WaitToRead or throw on WriteAsync/ReadAsync

Note: If we use TryComplete we should make sure anything using WaitToXXXAsync bails in an appropriate way if it's false.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

so would you say using ReadAsync() would be better?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🤔

  1. Need to think more about the .Dispose() but probably yes.
  2. I think in general, ReadAsync should be used (unless a strong argument exists for thrashing on non-critical ops)
  3. I'll make a sample helper for the pattern I'm thinking of (or pull branch and provide commit link or draft pr against your branch if that's ok and I get the opportunity to)

Where using a BoundedChannel vs a SemaphoreSlim can be challenging, is in deciding, for lack of better words, 'the right pivot' on how to use the channel.

Depending on the need, and how the underlying synchronization abstraction is implemented, can have a sizeable impact on performance.

Unless there is a compelling reason to do otherwise I think I'm settled on .ReadAsync being a better idea for what we are doing here.

If we need more, we can go deeper; BoundedChannel is lower alloc than SemaphoreSlim for many cases, however there may be future opportunities for a better pooled abstraction[0]

[0] - In general, if a channel has a waiting value on read, it's no alloc. if it's the FIRST waiter with nondefault cancellation token, it gets a pooled instance. if there's a token or the pooled waiter is used, alloc. A little better than semaphoreslim and has in some ways more reliable semantics, but it can be tricky at first.

{
Interlocked.Increment(ref _counter.PendingMessages);

while (!_channelLock.Writer.TryWrite(1))
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK Yeah I think this pattern (.TryWrite loop) even for sub/publish raises some fairness/behavioral consistency concerns.

In general WriteAsync is going to provide a slightly 'more fair' ordered queue for callers than the .TryWrite() loop, as for a bounded channel, once an item is read, things happen in this order:

  1. If there are any pending WriteAsync calls in the queue, and any are not cancelled, the first one is marked as completed and added to the queue (this is still in main read lock, so all thread safe) and the method returns.
  2. If previous didn't return, run through the entire set of waiters and wake them all up.

So, in general WriteAsync will be a lot less thrashy (queue vs free-for-all when contention happens) and it's worth noting that on the happy path (no contention) it won't alloc.

But that order of operations is important to consider; For instance, in extreme cases, one could use WriteAsync for 'priority' operations, and take the thrashing pain on non-priority ops (or some sort of magic.)

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TBH I was aiming at high throughput on a tight loop, but that's just my test and I am actually not seeing much change between TryWrite/loop vs. WriteAsync. Happy to go with WriteAsync here.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TBH I was aiming at high throughput on a tight loop, but that's just my test and I am actually not seeing much change between TryWrite/loop vs. WriteAsync. Happy to go with WriteAsync here.

How hard would it be to add some concurrent tests where we see what happens when we have a lot of threads trying to do the needful?

}
internal sealed class NatsPooledBufferWriter<T> : IBufferWriter<T>, IObjectPoolNode<NatsPooledBufferWriter<T>>
{
private const int DefaultInitialBufferSize = 256;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be nice in future if there was a way to set this (i.e. cases where people know they have tiny messages for low mem environments, or cases where people know they have 1K payloads as the norm).

OTOH I don't think it's a big deal for the latter case (which is the only one I know of being 'real')

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we derive a value from write buffer size for example? I thinking it might not be relevant if the implementation changes in the future.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think a derived value would be fine if the math can work out.

namespace NATS.Client.Core.Commands;

internal struct PingCommand
internal class PingCommand : IValueTaskSource<TimeSpan>, IObjectPoolNode<PingCommand>
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice, this will close #321! <3

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should we pull this into a separate PR? It's kind of irrelevant here and this PR might linger a bit if it ever goes in that is.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Separate PR wouldn't hurt if it's not painful to do in the existing codebase, if we have benchmarks to be sure the alloc savings doesn't cost us, even bettter?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ping command moved here #358

@stebet
Copy link
Collaborator

stebet commented Jan 26, 2024

Taking a look at this now

@stebet
Copy link
Collaborator

stebet commented Jan 26, 2024

This might also be something to take a look at to make things simpler for locking: https://dotnet.github.io/dotNext/features/threading/exclusive.html

@mtmk
Copy link
Collaborator Author

mtmk commented Jan 28, 2024

This might also be something to take a look at to make things simpler for locking: https://dotnet.github.io/dotNext/features/threading/exclusive.html

This library now net8.0 only. tried pulling the code in but there is way too much dependency I gave up at 6 KLOC.

@caleblloyd
Copy link
Collaborator

caleblloyd commented Jan 31, 2024

I added a new benchmark to test Parallel Publish in #367 and here are the results:

main:

Method Concurrency Mean Error StdDev Gen0 Allocated
PublishParallelAsync 1 1.109 s 1.2463 s 0.0683 s 10000.0000 30.53 MB
PublishParallelAsync 2 2.525 s 0.6693 s 0.0367 s 205000.0000 604.19 MB
PublishParallelAsync 4 2.675 s 0.2779 s 0.0152 s 218000.0000 640.63 MB

This PR:

Method Concurrency Mean Error StdDev Gen0 Allocated
PublishParallelAsync 1 1.314 s 0.9356 s 0.0513 s 10000.0000 30.52 MB
PublishParallelAsync 2 1.410 s 0.3972 s 0.0218 s 25000.0000 76.3 MB
PublishParallelAsync 4 1.829 s 1.1481 s 0.0629 s 48000.0000 144.59 MB

Looking great!

src/NATS.Client.Core/Commands/CommandWriter.cs Outdated Show resolved Hide resolved
src/NATS.Client.Core/Commands/CommandWriter.cs Outdated Show resolved Hide resolved
src/NATS.Client.Core/Commands/CommandWriter.cs Outdated Show resolved Hide resolved
src/NATS.Client.Core/Commands/CommandWriter.cs Outdated Show resolved Hide resolved
mtmk and others added 2 commits February 2, 2024 18:02
Copy link
Collaborator

@caleblloyd caleblloyd left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM 🎉

@mtmk mtmk merged commit d7b6baa into main Feb 2, 2024
10 checks passed
@mtmk mtmk deleted the 341-send-buffer-refinement branch February 2, 2024 19:46
mtmk added a commit that referenced this pull request Feb 2, 2024
* Send buffer changes (#346)
* Added deliver_group to consumer config (#366)
* Ping command as value task source (#358)
@mtmk mtmk mentioned this pull request Feb 2, 2024
mtmk added a commit that referenced this pull request Feb 2, 2024
* Send buffer changes (#346)
* Added deliver_group to consumer config (#366)
* Ping command as value task source (#358)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants