From 1bdf3198a09a13f164ecadc186ec012a8d2f9dae Mon Sep 17 00:00:00 2001 From: Caleb Lloyd <2414837+caleblloyd@users.noreply.github.com> Date: Fri, 12 Jan 2024 13:12:47 -0500 Subject: [PATCH] micro-optimizations in ProtocolWriter (#329) * micro-optimizations in ProtocolWriter Signed-off-by: Caleb Lloyd * fix constants bench Signed-off-by: Caleb Lloyd * fix comment Signed-off-by: Caleb Lloyd --------- Signed-off-by: Caleb Lloyd --- Directory.Build.props | 1 + .../MicroBenchmark/CommandConstantsBench.cs | 162 +++++++++++++++ sandbox/MicroBenchmark/MicroBenchmark.csproj | 5 +- .../Commands/ProtocolWriter.cs | 186 ++++++++++-------- 4 files changed, 268 insertions(+), 86 deletions(-) create mode 100644 sandbox/MicroBenchmark/CommandConstantsBench.cs diff --git a/Directory.Build.props b/Directory.Build.props index 2b363fc67..b579b3586 100644 --- a/Directory.Build.props +++ b/Directory.Build.props @@ -1,5 +1,6 @@ + latest enable $(NoWarn);CS1591;SA0001 true diff --git a/sandbox/MicroBenchmark/CommandConstantsBench.cs b/sandbox/MicroBenchmark/CommandConstantsBench.cs new file mode 100644 index 000000000..652934ecc --- /dev/null +++ b/sandbox/MicroBenchmark/CommandConstantsBench.cs @@ -0,0 +1,162 @@ +using System.Buffers.Binary; +using BenchmarkDotNet.Attributes; + +namespace MicroBenchmark; + +[MemoryDiagnoser] +[ShortRunJob] +[PlainExporter] +public class CommandConstantsBench +{ + private const ushort NewLineConst = '\r' + ('\n' << 4); + private const uint PubSpaceConst = 'P' + ('U' << 8) + ('B' << 16) + (' ' << 24); + private const ulong ConnectSpaceConst = 'C' + ('O' << 8) + ('N' << 16) + ('N' << 24) + ((ulong)'E' << 32) + ((ulong)'C' << 40) + ((ulong)'T' << 48) + ((ulong)' ' << 56); + + private static readonly byte[] Bytes = new byte[8]; + private static readonly ushort NewLineReadonly = BinaryPrimitives.ReadUInt16LittleEndian("\r\n"u8); + private static readonly uint PubSpaceReadonly = BinaryPrimitives.ReadUInt32LittleEndian("PUB "u8); + private static readonly ulong ConnectSpaceReadonly = BinaryPrimitives.ReadUInt64LittleEndian("CONNECT "u8); + + [Params(1_000_000)] + public int Iter { get; set; } + + [Params(2, 4, 8)] + public int Size { get; set; } + + private static ReadOnlySpan NewLine => "\r\n"u8; + + private static ReadOnlySpan PubSpace => "PUB "u8; + + private static ReadOnlySpan ConnectSpace => "CONNECT "u8; + + [Benchmark] + public void PubCopy() + { + var dest = Bytes.AsSpan(); + switch (Size) + { + case 2: + for (var i = 0; i < Iter; i++) + { + NewLine.CopyTo(dest); + } + + break; + case 4: + for (var i = 0; i < Iter; i++) + { + PubSpace.CopyTo(dest); + } + + break; + case 8: + for (var i = 0; i < Iter; i++) + { + ConnectSpace.CopyTo(dest); + } + + break; + } + } + + [Benchmark] + public void PubSetIndex() + { + var dest = Bytes.AsSpan(); + switch (Size) + { + case 2: + for (var i = 0; i < Iter; i++) + { + dest[0] = (byte)'\r'; + dest[1] = (byte)'\n'; + } + + break; + case 4: + for (var i = 0; i < Iter; i++) + { + dest[0] = (byte)'P'; + dest[1] = (byte)'U'; + dest[2] = (byte)'B'; + dest[3] = (byte)' '; + } + + break; + case 8: + for (var i = 0; i < Iter; i++) + { + dest[0] = (byte)'C'; + dest[1] = (byte)'O'; + dest[2] = (byte)'N'; + dest[3] = (byte)'N'; + dest[4] = (byte)'E'; + dest[5] = (byte)'C'; + dest[6] = (byte)'T'; + dest[7] = (byte)' '; + } + + break; + } + } + + [Benchmark] + public void PubBinaryConst() + { + var dest = Bytes.AsSpan(); + switch (Size) + { + case 2: + for (var i = 0; i < Iter; i++) + { + BinaryPrimitives.WriteUInt16LittleEndian(dest, NewLineConst); + } + + break; + case 4: + for (var i = 0; i < Iter; i++) + { + BinaryPrimitives.WriteUInt32LittleEndian(dest, PubSpaceConst); + } + + break; + case 8: + for (var i = 0; i < Iter; i++) + { + BinaryPrimitives.WriteUInt64LittleEndian(dest, ConnectSpaceConst); + } + + break; + } + } + + [Benchmark] + public void PubBinaryReadonly() + { + var dest = Bytes.AsSpan(); + switch (Size) + { + case 2: + for (var i = 0; i < Iter; i++) + { + BinaryPrimitives.WriteUInt16LittleEndian(dest, NewLineReadonly); + } + + break; + case 4: + for (var i = 0; i < Iter; i++) + { + BinaryPrimitives.WriteUInt32LittleEndian(dest, PubSpaceReadonly); + } + + break; + case 8: + for (var i = 0; i < Iter; i++) + { + BinaryPrimitives.WriteUInt64LittleEndian(dest, ConnectSpaceReadonly); + } + + break; + } + } +} diff --git a/sandbox/MicroBenchmark/MicroBenchmark.csproj b/sandbox/MicroBenchmark/MicroBenchmark.csproj index dad46b78e..a46099bf1 100644 --- a/sandbox/MicroBenchmark/MicroBenchmark.csproj +++ b/sandbox/MicroBenchmark/MicroBenchmark.csproj @@ -2,7 +2,7 @@ Exe - net6.0 + net8.0 enable enable false @@ -10,8 +10,7 @@ - - + diff --git a/src/NATS.Client.Core/Commands/ProtocolWriter.cs b/src/NATS.Client.Core/Commands/ProtocolWriter.cs index aaf224365..4ddc439d3 100644 --- a/src/NATS.Client.Core/Commands/ProtocolWriter.cs +++ b/src/NATS.Client.Core/Commands/ProtocolWriter.cs @@ -1,5 +1,7 @@ +using System.Buffers.Binary; using System.Buffers.Text; using System.IO.Pipelines; +using System.Runtime.CompilerServices; using System.Text; using System.Text.Json; using NATS.Client.Core.Internal; @@ -9,7 +11,30 @@ namespace NATS.Client.Core.Commands; internal sealed class ProtocolWriter { private const int MaxIntStringLength = 9; // https://github.com/nats-io/nats-server/blob/28a2a1000045b79927ebf6b75eecc19c1b9f1548/server/util.go#L85C8-L85C23 - private const int NewLineLength = 2; // \r\n + private const int NewLineLength = 2; // "\r\n" + private const int PubSpaceLength = 4; // "PUB " + private const int SubSpaceLength = 4; // "SUB " + private const int ConnectSpaceLength = 8; // "CONNECT " + private const int HpubSpaceLength = 5; // "HPUB " + private const int PingNewLineLength = 6; // "PING\r\n" + private const int PongNewLineLength = 6; // "PONG\r\n" + private const int UnsubSpaceLength = 6; // "UNSUB " + private const int UInt16Length = 2; + private const int UInt64Length = 8; + + // 2 bytes, make sure string length is 2 + private static readonly ushort NewLine = BinaryPrimitives.ReadUInt16LittleEndian("\r\n"u8); + + // 4 bytes, make sure string length is 4 + private static readonly uint PubSpace = BinaryPrimitives.ReadUInt32LittleEndian("PUB "u8); + private static readonly uint SubSpace = BinaryPrimitives.ReadUInt32LittleEndian("SUB "u8); + + // 8 bytes, make sure string length is 8 + private static readonly ulong ConnectSpace = BinaryPrimitives.ReadUInt64LittleEndian("CONNECT "u8); + private static readonly ulong HpubSpace = BinaryPrimitives.ReadUInt64LittleEndian("HPUB "u8); + private static readonly ulong PingNewLine = BinaryPrimitives.ReadUInt64LittleEndian("PING\r\n "u8); + private static readonly ulong PongNewLine = BinaryPrimitives.ReadUInt64LittleEndian("PONG\r\n "u8); + private static readonly ulong UnsubSpace = BinaryPrimitives.ReadUInt64LittleEndian("UNSUB "u8); private readonly PipeWriter _writer; private readonly HeaderWriter _headerWriter; @@ -26,24 +51,32 @@ public ProtocolWriter(PipeWriter writer, Encoding subjectEncoding, Encoding head // CONNECT {["option_name":option_value],...} public void WriteConnect(ClientOpts opts) { - WriteConstant(CommandConstants.ConnectWithPadding); + var span = _writer.GetSpan(UInt64Length); + BinaryPrimitives.WriteUInt64LittleEndian(span, ConnectSpace); + _writer.Advance(ConnectSpaceLength); var jsonWriter = new Utf8JsonWriter(_writer); JsonSerializer.Serialize(jsonWriter, opts, JsonContext.Default.ClientOpts); - WriteConstant(CommandConstants.NewLine); + span = _writer.GetSpan(UInt16Length); + BinaryPrimitives.WriteUInt16LittleEndian(span, NewLine); + _writer.Advance(NewLineLength); } // https://docs.nats.io/reference/reference-protocols/nats-protocol#ping-pong public void WritePing() { - WriteConstant(CommandConstants.PingNewLine); + var span = _writer.GetSpan(UInt64Length); + BinaryPrimitives.WriteUInt64LittleEndian(span, PingNewLine); + _writer.Advance(PingNewLineLength); } // https://docs.nats.io/reference/reference-protocols/nats-protocol#ping-pong public void WritePong() { - WriteConstant(CommandConstants.PongNewLine); + var span = _writer.GetSpan(UInt64Length); + BinaryPrimitives.WriteUInt64LittleEndian(span, PongNewLine); + _writer.Advance(PongNewLineLength); } // https://docs.nats.io/reference/reference-protocols/nats-protocol#pub @@ -59,12 +92,12 @@ public int WritePublish(string subject, T? value, NatsHeaders? headers, strin if (headers == null) { // 'PUB ' + subject +' '+ payload len +'\r\n' - ctrlLen = 4 + _subjectEncoding.GetByteCount(subject) + 1 + MaxIntStringLength + 2; + ctrlLen = PubSpaceLength + _subjectEncoding.GetByteCount(subject) + 1 + MaxIntStringLength + NewLineLength; } else { // 'HPUB ' + subject +' '+ header len +' '+ payload len +'\r\n' - ctrlLen = 5 + _subjectEncoding.GetByteCount(subject) + 1 + MaxIntStringLength + 1 + MaxIntStringLength + 2; + ctrlLen = HpubSpaceLength + _subjectEncoding.GetByteCount(subject) + 1 + MaxIntStringLength + 1 + MaxIntStringLength + NewLineLength; } if (replyTo != null) @@ -77,20 +110,13 @@ public int WritePublish(string subject, T? value, NatsHeaders? headers, strin var span = ctrlSpan; if (headers == null) { - span[0] = (byte)'P'; - span[1] = (byte)'U'; - span[2] = (byte)'B'; - span[3] = (byte)' '; - span = span[4..]; + BinaryPrimitives.WriteUInt32LittleEndian(span, PubSpace); + span = span[PubSpaceLength..]; } else { - span[0] = (byte)'H'; - span[1] = (byte)'P'; - span[2] = (byte)'U'; - span[3] = (byte)'B'; - span[4] = (byte)' '; - span = span[5..]; + BinaryPrimitives.WriteUInt64LittleEndian(span, HpubSpace); + span = span[HpubSpaceLength..]; } var written = _subjectEncoding.GetBytes(subject, span); @@ -118,8 +144,7 @@ public int WritePublish(string subject, T? value, NatsHeaders? headers, strin span = span[lenSpan.Length..]; } - span[0] = (byte)'\r'; - span[1] = (byte)'\n'; + BinaryPrimitives.WriteUInt16LittleEndian(span, NewLine); _writer.Advance(ctrlLen); var headersLength = 0L; @@ -140,10 +165,9 @@ public int WritePublish(string subject, T? value, NatsHeaders? headers, strin totalLength += _writer.UnflushedBytes - initialCount; } - span = _writer.GetSpan(2); - span[0] = (byte)'\r'; - span[1] = (byte)'\n'; - _writer.Advance(2); + span = _writer.GetSpan(UInt16Length); + BinaryPrimitives.WriteUInt16LittleEndian(span, NewLine); + _writer.Advance(NewLineLength); // write the length var lenWritten = 0; @@ -151,7 +175,7 @@ public int WritePublish(string subject, T? value, NatsHeaders? headers, strin { if (!Utf8Formatter.TryFormat(headersLength, lenSpan, out lenWritten)) { - throw new NatsException("Can not format integer."); + ThrowOnUtf8FormatFail(); } lenSpan[lenWritten] = (byte)' '; @@ -160,7 +184,7 @@ public int WritePublish(string subject, T? value, NatsHeaders? headers, strin if (!Utf8Formatter.TryFormat(totalLength, lenSpan[lenWritten..], out var tLen)) { - throw new NatsException("Can not format integer."); + ThrowOnUtf8FormatFail(); } lenWritten += tLen; @@ -168,7 +192,7 @@ public int WritePublish(string subject, T? value, NatsHeaders? headers, strin if (trim > 0) { // shift right - ctrlSpan[..(ctrlLen - trim - 2)].CopyTo(ctrlSpan[trim..]); + ctrlSpan[..(ctrlLen - trim - NewLineLength)].CopyTo(ctrlSpan[trim..]); ctrlSpan[..trim].Clear(); } @@ -179,42 +203,45 @@ public int WritePublish(string subject, T? value, NatsHeaders? headers, strin // SUB [queue group] public void WriteSubscribe(int sid, string subject, string? queueGroup, int? maxMsgs) { - var offset = 0; + // 'SUB ' + subject +' '+ sid +'\r\n' + var ctrlLen = SubSpaceLength + _subjectEncoding.GetByteCount(subject) + 1 + MaxIntStringLength + NewLineLength; - var maxLength = CommandConstants.SubWithPadding.Length - + subject.Length + 1 - + (queueGroup == null ? 0 : queueGroup.Length + 1) - + MaxIntStringLength - + NewLineLength; // newline + if (queueGroup != null) + { + // len += queueGroup +' ' + ctrlLen += _subjectEncoding.GetByteCount(queueGroup) + 1; + } - var writableSpan = _writer.GetSpan(maxLength); - CommandConstants.SubWithPadding.CopyTo(writableSpan); - offset += CommandConstants.SubWithPadding.Length; + var span = _writer.GetSpan(ctrlLen); + BinaryPrimitives.WriteUInt32LittleEndian(span, SubSpace); + var size = SubSpaceLength; + span = span[SubSpaceLength..]; - subject.WriteASCIIBytes(writableSpan.Slice(offset)); - offset += subject.Length; - writableSpan.Slice(offset)[0] = (byte)' '; - offset += 1; + var written = _subjectEncoding.GetBytes(subject, span); + span[written] = (byte)' '; + size += written + 1; + span = span[(written + 1)..]; if (queueGroup != null) { - queueGroup.WriteASCIIBytes(writableSpan.Slice(offset)); - offset += queueGroup.Length; - writableSpan.Slice(offset)[0] = (byte)' '; - offset += 1; + written = _subjectEncoding.GetBytes(subject, span); + span[written] = (byte)' '; + size += written + 1; + span = span[(written + 1)..]; } - if (!Utf8Formatter.TryFormat(sid, writableSpan.Slice(offset), out var written)) + if (!Utf8Formatter.TryFormat(sid, span, out written)) { - throw new NatsException("Can not format integer."); + ThrowOnUtf8FormatFail(); } - offset += written; + size += written; + span = span[written..]; - CommandConstants.NewLine.CopyTo(writableSpan.Slice(offset)); - offset += CommandConstants.NewLine.Length; + BinaryPrimitives.WriteUInt16LittleEndian(span, NewLine); + size += NewLineLength; - _writer.Advance(offset); + _writer.Advance(size); // Immediately send UNSUB to minimize the risk of // receiving more messages than in case they are published @@ -229,52 +256,45 @@ public void WriteSubscribe(int sid, string subject, string? queueGroup, int? max // UNSUB [max_msgs] public void WriteUnsubscribe(int sid, int? maxMessages) { - var offset = 0; - var maxLength = CommandConstants.UnsubWithPadding.Length - + MaxIntStringLength - + ((maxMessages != null) ? (1 + MaxIntStringLength) : 0) - + NewLineLength; + // 'UNSUB ' + sid +'\r\n' + var ctrlLen = UnsubSpaceLength + MaxIntStringLength + NewLineLength; + if (maxMessages != null) + { + // len +=' '+ max_msgs + ctrlLen += 1 + MaxIntStringLength; + } - var writableSpan = _writer.GetSpan(maxLength); - CommandConstants.UnsubWithPadding.CopyTo(writableSpan); - offset += CommandConstants.UnsubWithPadding.Length; + var span = _writer.GetSpan(ctrlLen); + BinaryPrimitives.WriteUInt64LittleEndian(span, UnsubSpace); + var size = UnsubSpaceLength; + span = span[UnsubSpaceLength..]; - if (!Utf8Formatter.TryFormat(sid, writableSpan.Slice(offset), out var written)) + if (!Utf8Formatter.TryFormat(sid, span, out var written)) { - throw new NatsException("Can not format integer."); + ThrowOnUtf8FormatFail(); } - offset += written; - + size += written; + span = span[written..]; if (maxMessages != null) { - writableSpan.Slice(offset)[0] = (byte)' '; - offset += 1; - if (!Utf8Formatter.TryFormat(maxMessages.Value, writableSpan.Slice(offset), out written)) + span[0] = (byte)' '; + if (!Utf8Formatter.TryFormat(maxMessages.Value, span[1..], out written)) { - throw new NatsException("Can not format integer."); + ThrowOnUtf8FormatFail(); } - offset += written; + size += written + 1; + span = span[(written + 1)..]; } - CommandConstants.NewLine.CopyTo(writableSpan.Slice(offset)); - offset += CommandConstants.NewLine.Length; + BinaryPrimitives.WriteUInt16LittleEndian(span, NewLine); + size += NewLineLength; - _writer.Advance(offset); + _writer.Advance(size); } - internal void WriteRaw(byte[] protocol) - { - var span = _writer.GetSpan(protocol.Length); - protocol.CopyTo(span); - _writer.Advance(protocol.Length); - } - - private void WriteConstant(ReadOnlySpan constant) - { - var writableSpan = _writer.GetSpan(constant.Length); - constant.CopyTo(writableSpan); - _writer.Advance(constant.Length); - } + // optimization detailed here: https://github.com/nats-io/nats.net.v2/issues/320#issuecomment-1886165748 + [MethodImpl(MethodImplOptions.NoInlining)] + private static void ThrowOnUtf8FormatFail() => throw new NatsException("Can not format integer."); }