From 9b4d49bfa4a51f90bf9c85358a295761f0542785 Mon Sep 17 00:00:00 2001 From: Ziya Suzen Date: Wed, 4 Dec 2024 17:21:12 +0000 Subject: [PATCH] Try-get implementation --- sandbox/MicroBenchmark/KVBench.cs | 27 +++- sandbox/MicroBenchmark/MicroBenchmark.csproj | 2 +- src/NATS.Client.Core/NatsResult.cs | 7 + .../NatsKVException.cs | 2 + src/NATS.Client.KeyValueStore/NatsKVStore.cs | 135 ++++++++++++++++++ 5 files changed, 168 insertions(+), 5 deletions(-) diff --git a/sandbox/MicroBenchmark/KVBench.cs b/sandbox/MicroBenchmark/KVBench.cs index cb3c6cd00..7e773d414 100644 --- a/sandbox/MicroBenchmark/KVBench.cs +++ b/sandbox/MicroBenchmark/KVBench.cs @@ -1,8 +1,10 @@ -using BenchmarkDotNet.Attributes; +using BenchmarkDotNet.Attributes; using NATS.Client.Core; using NATS.Client.JetStream; using NATS.Client.KeyValueStore; +#pragma warning disable CS8618 + namespace MicroBenchmark; [MemoryDiagnoser] @@ -13,7 +15,7 @@ public class KVBench private NatsConnection _nats; private NatsJSContext _js; private NatsKVContext _kv; - private INatsKVStore _store; + private NatsKVStore _store; [Params(64, 512, 1024)] public int Iter { get; set; } @@ -24,18 +26,35 @@ public async Task SetupAsync() _nats = new NatsConnection(); _js = new NatsJSContext(_nats); _kv = new NatsKVContext(_js); - _store = await _kv.CreateStoreAsync("benchmark"); + _store = (NatsKVStore)(await _kv.CreateStoreAsync("benchmark")); } [Benchmark] public async ValueTask TryGetAsync() + { + var total = 0; + for (var i = 0; i < Iter; i++) + { + var result = await _store.TryGetEntryAsync("does.not.exist"); + if (result is { Success: false, Error: NatsKVKeyNotFoundException }) + total++; + } + + if (total != Iter) + throw new Exception(); + + return total; + } + + [Benchmark] + public async ValueTask GetAsyncNew() { var total = 0; for (var i = 0; i < Iter; i++) { try { - await _store.GetEntryAsync("does.not.exist"); + await _store.GetEntryAsyncNew("does.not.exist"); } catch (NatsKVKeyNotFoundException) { diff --git a/sandbox/MicroBenchmark/MicroBenchmark.csproj b/sandbox/MicroBenchmark/MicroBenchmark.csproj index 6e3ae052e..133cafa6b 100644 --- a/sandbox/MicroBenchmark/MicroBenchmark.csproj +++ b/sandbox/MicroBenchmark/MicroBenchmark.csproj @@ -10,7 +10,7 @@ - + diff --git a/src/NATS.Client.Core/NatsResult.cs b/src/NATS.Client.Core/NatsResult.cs index 3b246aa6b..90d4e61aa 100644 --- a/src/NATS.Client.Core/NatsResult.cs +++ b/src/NATS.Client.Core/NatsResult.cs @@ -29,6 +29,13 @@ public NatsResult(Exception error) public static implicit operator NatsResult(Exception error) => new(error); + [MethodImpl(MethodImplOptions.NoInlining)] + public void EnsureSuccess() + { + if (_error != null) + throw _error; + } + private static T ThrowValueIsNotSetException() => throw CreateInvalidOperationException("Result value is not set"); private static Exception ThrowErrorIsNotSetException() => throw CreateInvalidOperationException("Result error is not set"); diff --git a/src/NATS.Client.KeyValueStore/NatsKVException.cs b/src/NATS.Client.KeyValueStore/NatsKVException.cs index d5d8ad36e..e89c73622 100644 --- a/src/NATS.Client.KeyValueStore/NatsKVException.cs +++ b/src/NATS.Client.KeyValueStore/NatsKVException.cs @@ -54,6 +54,8 @@ public NatsKVCreateException() public class NatsKVKeyNotFoundException : NatsKVException { + public static readonly NatsKVKeyNotFoundException Default = new(); + public NatsKVKeyNotFoundException() : base("Key not found") { diff --git a/src/NATS.Client.KeyValueStore/NatsKVStore.cs b/src/NATS.Client.KeyValueStore/NatsKVStore.cs index cd2a32fc9..197b30290 100644 --- a/src/NATS.Client.KeyValueStore/NatsKVStore.cs +++ b/src/NATS.Client.KeyValueStore/NatsKVStore.cs @@ -292,6 +292,141 @@ public async ValueTask> GetEntryAsync(string key, ulong revisi } } + public async ValueTask> GetEntryAsyncNew(string key, ulong revision = default, INatsDeserialize? serializer = default, CancellationToken cancellationToken = default) + { + var result = await TryGetEntryAsync(key, revision, serializer, cancellationToken); + result.EnsureSuccess(); + return result.Value; + } + + /// + public async ValueTask>> TryGetEntryAsync(string key, ulong revision = default, INatsDeserialize? serializer = default, CancellationToken cancellationToken = default) + { + ValidateKey(key); + serializer ??= JetStreamContext.Connection.Opts.SerializerRegistry.GetDeserializer(); + + var request = new StreamMsgGetRequest(); + var keySubject = $"$KV.{Bucket}.{key}"; + + if (revision == default) + { + request.LastBySubj = keySubject; + } + else + { + request.Seq = revision; + request.NextBySubj = keySubject; + } + + if (_stream.Info.Config.AllowDirect) + { + var direct = await _stream.GetDirectAsync(request, serializer, cancellationToken); + + if (direct is { Headers: { } headers } msg) + { + if (headers.Code == 404) + return NatsKVKeyNotFoundException.Default; + + if (!headers.TryGetLastValue(NatsSubject, out var subject)) + return new NatsKVException("Missing sequence header"); + + if (revision != default) + { + if (!string.Equals(subject, keySubject, StringComparison.Ordinal)) + { + return new NatsKVException("Unexpected subject"); + } + } + + if (!headers.TryGetLastValue(NatsSequence, out var sequenceValue)) + return new NatsKVException("Missing sequence header"); + + if (!ulong.TryParse(sequenceValue, out var sequence)) + return new NatsKVException("Can't parse sequence header"); + + if (!headers.TryGetLastValue(NatsTimeStamp, out var timestampValue)) + return new NatsKVException("Missing timestamp header"); + + if (!DateTimeOffset.TryParse(timestampValue, out var timestamp)) + return new NatsKVException("Can't parse timestamp header"); + + var operation = NatsKVOperation.Put; + if (headers.TryGetValue(KVOperation, out var operationValues)) + { + if (operationValues.Count != 1) + return new NatsKVException("Unexpected number of operation headers"); + + if (!Enum.TryParse(operationValues[0], ignoreCase: true, out operation)) + return new NatsKVException("Can't parse operation header"); + } + + if (operation is NatsKVOperation.Del or NatsKVOperation.Purge) + { + return new NatsKVKeyDeletedException(sequence); + } + + return new NatsKVEntry(Bucket, key) + { + Bucket = Bucket, + Key = key, + Created = timestamp, + Revision = sequence, + Operation = operation, + Value = msg.Data, + Delta = 0, + UsedDirectGet = true, + Error = msg.Error, + }; + } + else + { + return new NatsKVException("Missing headers"); + } + } + else + { + var response = await _stream.GetAsync(request, cancellationToken); + + if (revision != default) + { + if (string.Equals(response.Message.Subject, keySubject, StringComparison.Ordinal)) + { + return new NatsKVException("Unexpected subject"); + } + } + + T? data; + NatsDeserializeException? deserializeException = null; + if (response.Message.Data.Length > 0) + { + var buffer = new ReadOnlySequence(response.Message.Data); + + try + { + data = serializer.Deserialize(buffer); + } + catch (Exception e) + { + deserializeException = new NatsDeserializeException(buffer.ToArray(), e); + data = default; + } + } + else + { + data = default; + } + + return new NatsKVEntry(Bucket, key) + { + Created = response.Message.Time, + Revision = response.Message.Seq, + Value = data, + UsedDirectGet = false, + Error = deserializeException, + }; + } + } + /// public IAsyncEnumerable> WatchAsync(string key, INatsDeserialize? serializer = default, NatsKVWatchOpts? opts = default, CancellationToken cancellationToken = default) => WatchAsync([key], serializer, opts, cancellationToken);