diff --git a/sandbox/MicroBenchmark/KVBench.cs b/sandbox/MicroBenchmark/KVBench.cs index 0bf138731..2e291646e 100644 --- a/sandbox/MicroBenchmark/KVBench.cs +++ b/sandbox/MicroBenchmark/KVBench.cs @@ -8,6 +8,7 @@ namespace MicroBenchmark; [MemoryDiagnoser] +// [ShortRunJob] [PlainExporter] public class KVBench { @@ -25,79 +26,116 @@ public async Task SetupAsync() _store = (NatsKVStore)(await _kv.CreateStoreAsync("benchmark")); } - [Benchmark] - public async ValueTask TryGetAsync() - { - var result = await _store.TryGetEntryAsync("does.not.exist"); - if (result is { Success: false, Error: NatsKVKeyNotFoundException }) - { - return 1; - } - - return 0; - } + // [Benchmark(Baseline = true)] + // public async ValueTask TryGetAsync() + // { + // var result = await _store.TryGetEntryAsync("does.not.exist"); + // if (result is { Success: false, Error: NatsKVKeyNotFoundException }) + // { + // return 1; + // } + // + // return 0; + // } + // + // [Benchmark] + // public async ValueTask TryGetAsync2() + // { + // var result = await _store.TryGetEntryAsync2("does.not.exist"); + // if (result is { Success: false, Error: NatsKVKeyNotFoundException }) + // { + // return 1; + // } + // + // return 0; + // } + // + // [Benchmark] + // public async ValueTask TryGetAsync3() + // { + // var result = await _store.TryGetEntryAsync3("does.not.exist"); + // if (result is { Success: false, Error: NatsKVKeyNotFoundException }) + // { + // return 1; + // } + // + // return 0; + // } [Benchmark(Baseline = true)] - public async ValueTask GetAsync() - { - try - { - await _store.GetEntryAsync("does.not.exist"); - } - catch (NatsKVKeyNotFoundException) - { - return 1; - } - - return 0; - } + public string StringOrig() => _store.StringOrig("does.not.exist"); [Benchmark] - public async ValueTask TryGetMultiAsync() - { - List tasks = new(); - for (var i = 0; i < 100; i++) - { - tasks.Add(Task.Run(async () => - { - var result = await _store.TryGetEntryAsync("does.not.exist"); - if (result is { Success: false, Error: NatsKVKeyNotFoundException }) - { - return 1; - } - - return 0; - })); - } - - await Task.WhenAll(tasks); - - return 0; - } + public string StringInter() => _store.StringInter("does.not.exist"); [Benchmark] - public async ValueTask GetMultiAsync() - { - List tasks = new(); - for (var i = 0; i < 100; i++) - { - tasks.Add(Task.Run(async () => - { - try - { - await _store.GetEntryAsync("does.not.exist"); - } - catch (NatsKVKeyNotFoundException) - { - return 1; - } + public string StringConcat() => _store.StringConcat("does.not.exist"); - return 0; - })); - } - - await Task.WhenAll(tasks); - - return 0; - } + [Benchmark] + public string StringCreate() => _store.StringCreate("does.not.exist"); + + // + // [Benchmark(Baseline = true)] + // public async ValueTask GetAsync() + // { + // try + // { + // await _store.GetEntryAsync("does.not.exist"); + // } + // catch (NatsKVKeyNotFoundException) + // { + // return 1; + // } + // + // return 0; + // } + // + // [Benchmark] + // public async ValueTask TryGetMultiAsync() + // { + // List tasks = new(); + // for (var i = 0; i < 100; i++) + // { + // tasks.Add(Task.Run(async () => + // { + // var result = await _store.TryGetEntryAsync("does.not.exist"); + // if (result is { Success: false, Error: NatsKVKeyNotFoundException }) + // { + // return 1; + // } + // + // return 0; + // })); + // } + // + // await Task.WhenAll(tasks); + // + // return 0; + // } + // + // [Benchmark] + // public async ValueTask GetMultiAsync() + // { + // List tasks = new(); + // for (var i = 0; i < 100; i++) + // { + // tasks.Add(Task.Run(async () => + // { + // try + // { + // await _store.GetEntryAsync("does.not.exist"); + // } + // catch (NatsKVKeyNotFoundException) + // { + // return 1; + // } + // + // return 0; + // })); + // } + // + // await Task.WhenAll(tasks); + // + // return 0; + // } } diff --git a/src/NATS.Client.KeyValueStore/NatsKVStore.cs b/src/NATS.Client.KeyValueStore/NatsKVStore.cs index 5c5072571..fd32d7c8f 100644 --- a/src/NATS.Client.KeyValueStore/NatsKVStore.cs +++ b/src/NATS.Client.KeyValueStore/NatsKVStore.cs @@ -53,12 +53,14 @@ public class NatsKVStore : INatsKVStore private static readonly NatsKVException InvalidTimestampException = new("Can't parse timestamp header"); private static readonly NatsKVException InvalidOperationException = new("Can't parse operation header"); private readonly INatsJSStream _stream; + private readonly string _kvBucket; internal NatsKVStore(string bucket, INatsJSContext context, INatsJSStream stream) { Bucket = bucket; JetStreamContext = context; _stream = stream; + _kvBucket = $"$KV.{Bucket}."; } /// @@ -321,6 +323,294 @@ public async ValueTask>> TryGetEntryAsync(string ke } } + +#if NET8_0_OR_GREATER + static void CreateKeyString(Span span, (string prefix, string key) state) + { + state.prefix.CopyTo(span); + state.key.CopyTo(span[state.prefix.Length..]); + } +#endif + +#if !NETSTANDARD + [AsyncMethodBuilder(typeof(PoolingAsyncValueTaskMethodBuilder<>))] +#endif + public async ValueTask>> TryGetEntryAsync2(string key, ulong revision = default, INatsDeserialize? serializer = default, CancellationToken cancellationToken = default) + { + ValidateKey(key); + serializer ??= JetStreamContext.Connection.Opts.SerializerRegistry.GetDeserializer(); + +#if NET8_0_OR_GREATER + var keySubject = string.Create(key.Length + _kvBucket.Length, (_kvBucket, key), CreateKeyString); +#else + var keySubject = $"{_kvBucket}{key}"; +#endif + + var request = new StreamMsgGetRequest(); + if (revision == default) + { + request.LastBySubj = keySubject; + } + else + { + request.Seq = revision; + request.NextBySubj = keySubject; + } + + if (_stream.Info.Config.AllowDirect) + { + var direct = await _stream.GetDirectAsync(request, serializer, cancellationToken); + + if (direct is { Headers: { } headers } msg) + { + if (headers.Code == 404) + return NatsKVKeyNotFoundException.Default; + + if (!headers.TryGetLastValue(NatsSubject, out var subject)) + return MissingSequenceHeaderException; + + if (revision != default) + { + if (!string.Equals(subject, keySubject, StringComparison.Ordinal)) + { + return UnexpectedSubjectException; + } + } + + if (!headers.TryGetLastValue(NatsSequence, out var sequenceValue)) + return MissingSequenceHeaderException; + + if (!ulong.TryParse(sequenceValue, out var sequence)) + return InvalidSequenceException; + + if (!headers.TryGetLastValue(NatsTimeStamp, out var timestampValue)) + return MissingTimestampHeaderException; + + if (!DateTimeOffset.TryParse(timestampValue, out var timestamp)) + return InvalidTimestampException; + + var operation = NatsKVOperation.Put; + if (headers.TryGetValue(KVOperation, out var operationValues)) + { + if (operationValues.Count != 1) + return UnexpectedNumberOfOperationHeadersException; + + if (!Enum.TryParse(operationValues[0], ignoreCase: true, out operation)) + return InvalidOperationException; + } + + if (operation is NatsKVOperation.Del or NatsKVOperation.Purge) + { + return new NatsKVKeyDeletedException(sequence); + } + + return new NatsKVEntry(Bucket, key) + { + Bucket = Bucket, + Key = key, + Created = timestamp, + Revision = sequence, + Operation = operation, + Value = msg.Data, + Delta = 0, + UsedDirectGet = true, + Error = msg.Error, + }; + } + else + { + return MissingHeadersException; + } + } + else + { + var response = await _stream.GetAsync(request, cancellationToken); + + if (revision != default) + { + if (string.Equals(response.Message.Subject, keySubject, StringComparison.Ordinal)) + { + return UnexpectedSubjectException; + } + } + + T? data; + NatsDeserializeException? deserializeException = null; + if (response.Message.Data.Length > 0) + { + var buffer = new ReadOnlySequence(response.Message.Data); + + try + { + data = serializer.Deserialize(buffer); + } + catch (Exception e) + { + deserializeException = new NatsDeserializeException(buffer.ToArray(), e); + data = default; + } + } + else + { + data = default; + } + + return new NatsKVEntry(Bucket, key) + { + Created = response.Message.Time, + Revision = response.Message.Seq, + Value = data, + UsedDirectGet = false, + Error = deserializeException, + }; + } + } + + #if !NETSTANDARD + [AsyncMethodBuilder(typeof(PoolingAsyncValueTaskMethodBuilder<>))] +#endif + public async ValueTask>> TryGetEntryAsync3(string key, ulong revision = default, INatsDeserialize? serializer = default, CancellationToken cancellationToken = default) + { + ValidateKey(key); + serializer ??= JetStreamContext.Connection.Opts.SerializerRegistry.GetDeserializer(); + + var keySubject = $"{_kvBucket}{key}"; + + var request = new StreamMsgGetRequest(); + if (revision == default) + { + request.LastBySubj = keySubject; + } + else + { + request.Seq = revision; + request.NextBySubj = keySubject; + } + + if (_stream.Info.Config.AllowDirect) + { + var direct = await _stream.GetDirectAsync(request, serializer, cancellationToken); + + if (direct is { Headers: { } headers } msg) + { + if (headers.Code == 404) + return NatsKVKeyNotFoundException.Default; + + if (!headers.TryGetLastValue(NatsSubject, out var subject)) + return MissingSequenceHeaderException; + + if (revision != default) + { + if (!string.Equals(subject, keySubject, StringComparison.Ordinal)) + { + return UnexpectedSubjectException; + } + } + + if (!headers.TryGetLastValue(NatsSequence, out var sequenceValue)) + return MissingSequenceHeaderException; + + if (!ulong.TryParse(sequenceValue, out var sequence)) + return InvalidSequenceException; + + if (!headers.TryGetLastValue(NatsTimeStamp, out var timestampValue)) + return MissingTimestampHeaderException; + + if (!DateTimeOffset.TryParse(timestampValue, out var timestamp)) + return InvalidTimestampException; + + var operation = NatsKVOperation.Put; + if (headers.TryGetValue(KVOperation, out var operationValues)) + { + if (operationValues.Count != 1) + return UnexpectedNumberOfOperationHeadersException; + + if (!Enum.TryParse(operationValues[0], ignoreCase: true, out operation)) + return InvalidOperationException; + } + + if (operation is NatsKVOperation.Del or NatsKVOperation.Purge) + { + return new NatsKVKeyDeletedException(sequence); + } + + return new NatsKVEntry(Bucket, key) + { + Bucket = Bucket, + Key = key, + Created = timestamp, + Revision = sequence, + Operation = operation, + Value = msg.Data, + Delta = 0, + UsedDirectGet = true, + Error = msg.Error, + }; + } + else + { + return MissingHeadersException; + } + } + else + { + var response = await _stream.GetAsync(request, cancellationToken); + + if (revision != default) + { + if (string.Equals(response.Message.Subject, keySubject, StringComparison.Ordinal)) + { + return UnexpectedSubjectException; + } + } + + T? data; + NatsDeserializeException? deserializeException = null; + if (response.Message.Data.Length > 0) + { + var buffer = new ReadOnlySequence(response.Message.Data); + + try + { + data = serializer.Deserialize(buffer); + } + catch (Exception e) + { + deserializeException = new NatsDeserializeException(buffer.ToArray(), e); + data = default; + } + } + else + { + data = default; + } + + return new NatsKVEntry(Bucket, key) + { + Created = response.Message.Time, + Revision = response.Message.Seq, + Value = data, + UsedDirectGet = false, + Error = deserializeException, + }; + } + } + + public string StringOrig(string key) => $"$KV.{Bucket}.{key}"; + + public string StringInter(string key) => $"{_kvBucket}{key}"; + + public string StringConcat(string key) => _kvBucket + key; + + public string StringCreate(string key) + { +#if NET8_0_OR_GREATER + return string.Create(key.Length + _kvBucket.Length, (_kvBucket, key), CreateKeyString); +#else + throw new NotImplementedException(); +#endif + } + /// public IAsyncEnumerable> WatchAsync(string key, INatsDeserialize? serializer = default, NatsKVWatchOpts? opts = default, CancellationToken cancellationToken = default) => WatchAsync([key], serializer, opts, cancellationToken);