Skip to content

Commit

Permalink
Try-get implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
mtmk committed Dec 4, 2024
1 parent 8cdfbba commit 9b4d49b
Show file tree
Hide file tree
Showing 5 changed files with 168 additions and 5 deletions.
27 changes: 23 additions & 4 deletions sandbox/MicroBenchmark/KVBench.cs
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
using BenchmarkDotNet.Attributes;
using BenchmarkDotNet.Attributes;
using NATS.Client.Core;
using NATS.Client.JetStream;
using NATS.Client.KeyValueStore;

#pragma warning disable CS8618

namespace MicroBenchmark;

[MemoryDiagnoser]
Expand All @@ -13,7 +15,7 @@ public class KVBench
private NatsConnection _nats;
private NatsJSContext _js;
private NatsKVContext _kv;
private INatsKVStore _store;
private NatsKVStore _store;

[Params(64, 512, 1024)]
public int Iter { get; set; }
Expand All @@ -24,18 +26,35 @@ public async Task SetupAsync()
_nats = new NatsConnection();
_js = new NatsJSContext(_nats);
_kv = new NatsKVContext(_js);
_store = await _kv.CreateStoreAsync("benchmark");
_store = (NatsKVStore)(await _kv.CreateStoreAsync("benchmark"));
}

[Benchmark]
public async ValueTask<int> TryGetAsync()
{
var total = 0;
for (var i = 0; i < Iter; i++)
{
var result = await _store.TryGetEntryAsync<int>("does.not.exist");
if (result is { Success: false, Error: NatsKVKeyNotFoundException })
total++;
}

if (total != Iter)
throw new Exception();

return total;
}

[Benchmark]
public async ValueTask<int> GetAsyncNew()
{
var total = 0;
for (var i = 0; i < Iter; i++)
{
try
{
await _store.GetEntryAsync<int>("does.not.exist");
await _store.GetEntryAsyncNew<int>("does.not.exist");
}
catch (NatsKVKeyNotFoundException)
{
Expand Down
2 changes: 1 addition & 1 deletion sandbox/MicroBenchmark/MicroBenchmark.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
</PropertyGroup>

<ItemGroup>
<PackageReference Include="BenchmarkDotNet" Version="0.13.12" />
<PackageReference Include="BenchmarkDotNet" Version="0.14.0" />
<PackageReference Include="StackExchange.Redis" Version="2.5.43" />
<PackageReference Include="ZLogger" Version="1.6.1" />
<PackageReference Include="NATS.Client" Version="0.14.5" />
Expand Down
7 changes: 7 additions & 0 deletions src/NATS.Client.Core/NatsResult.cs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,13 @@ public NatsResult(Exception error)

public static implicit operator NatsResult<T>(Exception error) => new(error);

[MethodImpl(MethodImplOptions.NoInlining)]
public void EnsureSuccess()
{
if (_error != null)
throw _error;
}

private static T ThrowValueIsNotSetException() => throw CreateInvalidOperationException("Result value is not set");

private static Exception ThrowErrorIsNotSetException() => throw CreateInvalidOperationException("Result error is not set");
Expand Down
2 changes: 2 additions & 0 deletions src/NATS.Client.KeyValueStore/NatsKVException.cs
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ public NatsKVCreateException()

public class NatsKVKeyNotFoundException : NatsKVException
{
public static readonly NatsKVKeyNotFoundException Default = new();

public NatsKVKeyNotFoundException()
: base("Key not found")
{
Expand Down
135 changes: 135 additions & 0 deletions src/NATS.Client.KeyValueStore/NatsKVStore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -292,6 +292,141 @@ public async ValueTask<NatsKVEntry<T>> GetEntryAsync<T>(string key, ulong revisi
}
}

public async ValueTask<NatsKVEntry<T>> GetEntryAsyncNew<T>(string key, ulong revision = default, INatsDeserialize<T>? serializer = default, CancellationToken cancellationToken = default)
{
var result = await TryGetEntryAsync(key, revision, serializer, cancellationToken);
result.EnsureSuccess();
return result.Value;
}

/// <inheritdoc />
public async ValueTask<NatsResult<NatsKVEntry<T>>> TryGetEntryAsync<T>(string key, ulong revision = default, INatsDeserialize<T>? serializer = default, CancellationToken cancellationToken = default)
{
ValidateKey(key);
serializer ??= JetStreamContext.Connection.Opts.SerializerRegistry.GetDeserializer<T>();

var request = new StreamMsgGetRequest();
var keySubject = $"$KV.{Bucket}.{key}";

if (revision == default)
{
request.LastBySubj = keySubject;
}
else
{
request.Seq = revision;
request.NextBySubj = keySubject;
}

if (_stream.Info.Config.AllowDirect)
{
var direct = await _stream.GetDirectAsync<T>(request, serializer, cancellationToken);

if (direct is { Headers: { } headers } msg)
{
if (headers.Code == 404)
return NatsKVKeyNotFoundException.Default;

if (!headers.TryGetLastValue(NatsSubject, out var subject))
return new NatsKVException("Missing sequence header");

if (revision != default)
{
if (!string.Equals(subject, keySubject, StringComparison.Ordinal))
{
return new NatsKVException("Unexpected subject");
}
}

if (!headers.TryGetLastValue(NatsSequence, out var sequenceValue))
return new NatsKVException("Missing sequence header");

if (!ulong.TryParse(sequenceValue, out var sequence))
return new NatsKVException("Can't parse sequence header");

if (!headers.TryGetLastValue(NatsTimeStamp, out var timestampValue))
return new NatsKVException("Missing timestamp header");

if (!DateTimeOffset.TryParse(timestampValue, out var timestamp))
return new NatsKVException("Can't parse timestamp header");

var operation = NatsKVOperation.Put;
if (headers.TryGetValue(KVOperation, out var operationValues))
{
if (operationValues.Count != 1)
return new NatsKVException("Unexpected number of operation headers");

if (!Enum.TryParse(operationValues[0], ignoreCase: true, out operation))
return new NatsKVException("Can't parse operation header");
}

if (operation is NatsKVOperation.Del or NatsKVOperation.Purge)
{
return new NatsKVKeyDeletedException(sequence);
}

return new NatsKVEntry<T>(Bucket, key)
{
Bucket = Bucket,
Key = key,
Created = timestamp,
Revision = sequence,
Operation = operation,
Value = msg.Data,
Delta = 0,
UsedDirectGet = true,
Error = msg.Error,
};
}
else
{
return new NatsKVException("Missing headers");
}
}
else
{
var response = await _stream.GetAsync(request, cancellationToken);

if (revision != default)
{
if (string.Equals(response.Message.Subject, keySubject, StringComparison.Ordinal))
{
return new NatsKVException("Unexpected subject");
}
}

T? data;
NatsDeserializeException? deserializeException = null;
if (response.Message.Data.Length > 0)
{
var buffer = new ReadOnlySequence<byte>(response.Message.Data);

try
{
data = serializer.Deserialize(buffer);
}
catch (Exception e)
{
deserializeException = new NatsDeserializeException(buffer.ToArray(), e);
data = default;
}
}
else
{
data = default;
}

return new NatsKVEntry<T>(Bucket, key)
{
Created = response.Message.Time,
Revision = response.Message.Seq,
Value = data,
UsedDirectGet = false,
Error = deserializeException,
};
}
}

/// <inheritdoc />
public IAsyncEnumerable<NatsKVEntry<T>> WatchAsync<T>(string key, INatsDeserialize<T>? serializer = default, NatsKVWatchOpts? opts = default, CancellationToken cancellationToken = default)
=> WatchAsync<T>([key], serializer, opts, cancellationToken);
Expand Down

0 comments on commit 9b4d49b

Please sign in to comment.