Skip to content

Commit

Permalink
Fixed an exception which happens when PutAsync is used more than once…
Browse files Browse the repository at this point in the history
… and activity logging is enabled in main project

Added some tests.
  • Loading branch information
uhfath committed Nov 11, 2024
1 parent 7c00b41 commit be90bf0
Show file tree
Hide file tree
Showing 3 changed files with 70 additions and 3 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -119,3 +119,4 @@ nuget/*.unitypackage

# MacOS folder attributes
.DS_Store
/tests/NATS.Client.TestUtilities/Properties/launchSettings.json
5 changes: 2 additions & 3 deletions src/NATS.Client.ObjectStore/NatsObjStore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,6 @@ public class NatsObjStore : INatsObjStore
private const string NatsRollup = "Nats-Rollup";
private const string RollupSubject = "sub";

private static readonly NatsHeaders NatsRollupHeaders = new() { { NatsRollup, RollupSubject } };

private readonly NatsObjContext _objContext;
private readonly INatsJSStream _stream;

Expand Down Expand Up @@ -603,7 +601,8 @@ public async ValueTask DeleteAsync(string key, CancellationToken cancellationTok

private async ValueTask PublishMeta(ObjectMetadata meta, CancellationToken cancellationToken)
{
var ack = await JetStreamContext.PublishAsync(GetMetaSubject(meta.Name), meta, serializer: NatsObjJsonSerializer<ObjectMetadata>.Default, headers: NatsRollupHeaders, cancellationToken: cancellationToken);
var natsRollupHeaders = new NatsHeaders { { NatsRollup, RollupSubject } };
var ack = await JetStreamContext.PublishAsync(GetMetaSubject(meta.Name), meta, serializer: NatsObjJsonSerializer<ObjectMetadata>.Default, headers: natsRollupHeaders, cancellationToken: cancellationToken);
ack.EnsureSuccess();
}

Expand Down
67 changes: 67 additions & 0 deletions tests/NATS.Client.ObjectStore.Tests/ObjectStoreTest.cs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
using System.Diagnostics;
using System.Security.Cryptography;
using System.Text;
using NATS.Client.Core.Tests;
Expand Down Expand Up @@ -484,4 +485,70 @@ public async Task Put_get_serialization_when_default_serializer_is_not_used()
var info = await store.GetInfoAsync("k1", cancellationToken: cancellationToken);
Assert.Equal("k1", info.Name);
}

[Fact]
public async Task Put_with_activity()
{
var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10));
var cancellationToken = cts.Token;

using var activitySource = new ActivitySource($"NATS-debug-{nameof(Put_with_activity)}");
using var activityListener = new ActivityListener
{
ShouldListenTo = _ => true,
SampleUsingParentId = (ref ActivityCreationOptions<string> _) => ActivitySamplingResult.AllData,
Sample = (ref ActivityCreationOptions<ActivityContext> _) => ActivitySamplingResult.AllData,
};
using var activity = activitySource.StartActivity(ActivityKind.Client);
ActivitySource.AddActivityListener(activityListener);

await using var server = NatsServer.StartJS();
await using var nats = server.CreateClientConnection();
var js = new NatsJSContext(nats);
var obj = new NatsObjContext(js);

var store = await obj.CreateObjectStoreAsync(new NatsObjConfig("b1"), cancellationToken);

var data = new byte[1024];
Random.Shared.NextBytes(data);

const string filename = $"_tmp_test_file_{nameof(Put_with_activity)}.bin";
await File.WriteAllBytesAsync(filename, data, cancellationToken);

await store.PutAsync("my/random/data_1.bin", File.OpenRead(filename), cancellationToken: cancellationToken);
}

[Fact]
public async Task Put_multiple_times_with_activity()
{
var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10));
var cancellationToken = cts.Token;

using var activitySource = new ActivitySource($"NATS-debug-{nameof(Put_multiple_times_with_activity)}");
using var activityListener = new ActivityListener
{
ShouldListenTo = _ => true,
SampleUsingParentId = (ref ActivityCreationOptions<string> _) => ActivitySamplingResult.AllData,
Sample = (ref ActivityCreationOptions<ActivityContext> _) => ActivitySamplingResult.AllData,
};
using var activity = activitySource.StartActivity(ActivityKind.Client);
ActivitySource.AddActivityListener(activityListener);

await using var server = NatsServer.StartJS();
await using var nats = server.CreateClientConnection();
var js = new NatsJSContext(nats);
var obj = new NatsObjContext(js);

var store = await obj.CreateObjectStoreAsync(new NatsObjConfig("b1"), cancellationToken);

var data = new byte[1024];
Random.Shared.NextBytes(data);

const string filename = $"_tmp_test_file_{nameof(Put_multiple_times_with_activity)}.bin";
await File.WriteAllBytesAsync(filename, data, cancellationToken);

await store.PutAsync("my/random/data_1.bin", File.OpenRead(filename), cancellationToken: cancellationToken);
await store.PutAsync("my/random/data_2.bin", File.OpenRead(filename), cancellationToken: cancellationToken);
await store.PutAsync("my/random/data_3.bin", File.OpenRead(filename), cancellationToken: cancellationToken);
}
}

0 comments on commit be90bf0

Please sign in to comment.