Skip to content

Commit

Permalink
Watcher fix and API naming fix (#273)
Browse files Browse the repository at this point in the history
* Watcher fix and API naming fix

* Fixed watcher not to exit after receiving initial set of
  objects. Introduced a new option to accomodate for that
  behaviour rather than returning null for example.
* Also fixed the method naming to be affixed 'Async'.

* Build and format fixes
  • Loading branch information
mtmk authored Dec 8, 2023
1 parent 5f925ea commit 3460b1e
Show file tree
Hide file tree
Showing 9 changed files with 74 additions and 22 deletions.
2 changes: 1 addition & 1 deletion sandbox/Example.ObjectStore/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
var obj = new NatsObjContext(js);

Log("Create object store...");
var store = await obj.CreateObjectStore("test-bucket");
var store = await obj.CreateObjectStoreAsync("test-bucket");

var data = new byte[1024 * 1024 * 10];
Random.Shared.NextBytes(data);
Expand Down
4 changes: 2 additions & 2 deletions src/NATS.Client.ObjectStore/INatsObjContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,15 @@ public interface INatsObjContext
/// <param name="bucket">Bucket name.</param>
/// <param name="cancellationToken">A <see cref="CancellationToken"/> used to cancel the API call.</param>
/// <returns>Object store object.</returns>
ValueTask<INatsObjStore> CreateObjectStore(string bucket, CancellationToken cancellationToken = default);
ValueTask<INatsObjStore> CreateObjectStoreAsync(string bucket, CancellationToken cancellationToken = default);

/// <summary>
/// Create a new object store.
/// </summary>
/// <param name="config">Object store configuration.</param>
/// <param name="cancellationToken">A <see cref="CancellationToken"/> used to cancel the API call.</param>
/// <returns>Object store object.</returns>
ValueTask<INatsObjStore> CreateObjectStore(NatsObjConfig config, CancellationToken cancellationToken = default);
ValueTask<INatsObjStore> CreateObjectStoreAsync(NatsObjConfig config, CancellationToken cancellationToken = default);

/// <summary>
/// Get an existing object store.
Expand Down
6 changes: 3 additions & 3 deletions src/NATS.Client.ObjectStore/NatsObjContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -26,16 +26,16 @@ public class NatsObjContext : INatsObjContext
/// <param name="bucket">Bucket name.</param>
/// <param name="cancellationToken">A <see cref="CancellationToken"/> used to cancel the API call.</param>
/// <returns>Object store object.</returns>
public ValueTask<INatsObjStore> CreateObjectStore(string bucket, CancellationToken cancellationToken = default) =>
CreateObjectStore(new NatsObjConfig(bucket), cancellationToken);
public ValueTask<INatsObjStore> CreateObjectStoreAsync(string bucket, CancellationToken cancellationToken = default) =>
CreateObjectStoreAsync(new NatsObjConfig(bucket), cancellationToken);

/// <summary>
/// Create a new object store.
/// </summary>
/// <param name="config">Object store configuration.</param>
/// <param name="cancellationToken">A <see cref="CancellationToken"/> used to cancel the API call.</param>
/// <returns>Object store object.</returns>
public async ValueTask<INatsObjStore> CreateObjectStore(NatsObjConfig config, CancellationToken cancellationToken = default)
public async ValueTask<INatsObjStore> CreateObjectStoreAsync(NatsObjConfig config, CancellationToken cancellationToken = default)
{
ValidateBucketName(config.Bucket);

Expand Down
8 changes: 7 additions & 1 deletion src/NATS.Client.ObjectStore/NatsObjStore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -519,6 +519,7 @@ public IAsyncEnumerable<ObjectMetadata> ListAsync(NatsObjListOpts? opts = defaul
opts ??= new NatsObjListOpts();
var watchOpts = new NatsObjWatchOpts
{
InitialSetOnly = true,
UpdatesOnly = false,
IgnoreDeletes = !opts.ShowDeleted,
};
Expand Down Expand Up @@ -588,7 +589,7 @@ public async IAsyncEnumerable<ObjectMetadata> WatchAsync(NatsObjWatchOpts? opts
}
}

if (!opts.UpdatesOnly)
if (opts.InitialSetOnly)
{
if (metadata.NumPending == 0)
break;
Expand Down Expand Up @@ -672,6 +673,11 @@ public record NatsObjWatchOpts
public bool IncludeHistory { get; init; }

public bool UpdatesOnly { get; init; }

/// <summary>
/// Only return the initial set of objects and don't watch for further updates.
/// </summary>
public bool InitialSetOnly { get; init; }
}

public record NatsObjListOpts
Expand Down
2 changes: 1 addition & 1 deletion tests/NATS.Client.CheckNativeAot/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,7 @@ async Task ObjectStoreTests()
var js = new NatsJSContext(nats);
var ob = new NatsObjContext(js);

var store = await ob.CreateObjectStore(new NatsObjConfig("b1"), cancellationToken);
var store = await ob.CreateObjectStoreAsync(new NatsObjConfig("b1"), cancellationToken);

var stringBuilder = new StringBuilder();
for (var i = 0; i < 9; i++)
Expand Down
22 changes: 11 additions & 11 deletions tests/NATS.Client.ObjectStore.Tests/ObjectStoreTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ public async Task Create_delete_object_store()
var js = new NatsJSContext(nats);
var ob = new NatsObjContext(js);

await ob.CreateObjectStore(new NatsObjConfig("b1"), cancellationToken);
await ob.CreateObjectStoreAsync(new NatsObjConfig("b1"), cancellationToken);

await foreach (var stream in js.ListStreamsAsync(cancellationToken: cancellationToken))
{
Expand All @@ -51,7 +51,7 @@ public async Task Put_chunks()
var js = new NatsJSContext(nats);
var ob = new NatsObjContext(js);

var store = await ob.CreateObjectStore(new NatsObjConfig("b1"), cancellationToken);
var store = await ob.CreateObjectStoreAsync(new NatsObjConfig("b1"), cancellationToken);

var stringBuilder = new StringBuilder();
for (var i = 0; i < 9; i++)
Expand Down Expand Up @@ -113,7 +113,7 @@ public async Task Get_chunks()
var js = new NatsJSContext(nats);
var ob = new NatsObjContext(js);

var store = await ob.CreateObjectStore(new NatsObjConfig("b1"), cancellationToken);
var store = await ob.CreateObjectStoreAsync(new NatsObjConfig("b1"), cancellationToken);

var stringBuilder = new StringBuilder();
for (var i = 0; i < 9; i++)
Expand Down Expand Up @@ -167,7 +167,7 @@ public async Task Delete_object()
var js = new NatsJSContext(nats);
var ob = new NatsObjContext(js);

var store = await ob.CreateObjectStore(new NatsObjConfig("b1"), cancellationToken);
var store = await ob.CreateObjectStoreAsync(new NatsObjConfig("b1"), cancellationToken);
await store.PutAsync("k1", new byte[] { 65, 66, 67 }, cancellationToken);

var info = await store.GetInfoAsync("k1", cancellationToken: cancellationToken);
Expand Down Expand Up @@ -205,7 +205,7 @@ public async Task Put_and_get_large_file()
var js = new NatsJSContext(nats);
var obj = new NatsObjContext(js);

var store = await obj.CreateObjectStore(new NatsObjConfig("b1"), cancellationToken);
var store = await obj.CreateObjectStoreAsync(new NatsObjConfig("b1"), cancellationToken);

var data = new byte[1024 * 1024 * 10];
Random.Shared.NextBytes(data);
Expand Down Expand Up @@ -236,8 +236,8 @@ public async Task Add_link()
var js = new NatsJSContext(nats);
var obj = new NatsObjContext(js);

var store1 = await obj.CreateObjectStore(new NatsObjConfig("b1"), cancellationToken);
var store2 = await obj.CreateObjectStore(new NatsObjConfig("b2"), cancellationToken);
var store1 = await obj.CreateObjectStoreAsync(new NatsObjConfig("b1"), cancellationToken);
var store2 = await obj.CreateObjectStoreAsync(new NatsObjConfig("b2"), cancellationToken);

await store1.PutAsync("k1", new byte[] { 42 }, cancellationToken: cancellationToken);

Expand Down Expand Up @@ -283,7 +283,7 @@ public async Task Seal_and_get_status()
var js = new NatsJSContext(nats);
var obj = new NatsObjContext(js);

var store = await obj.CreateObjectStore(new NatsObjConfig("b1"), cancellationToken);
var store = await obj.CreateObjectStoreAsync(new NatsObjConfig("b1"), cancellationToken);

await store.PutAsync("k1", new byte[] { 42 }, cancellationToken: cancellationToken);

Expand Down Expand Up @@ -322,7 +322,7 @@ public async Task List()
var js = new NatsJSContext(nats);
var obj = new NatsObjContext(js);

var store = await obj.CreateObjectStore(new NatsObjConfig("b1"), cancellationToken);
var store = await obj.CreateObjectStoreAsync(new NatsObjConfig("b1"), cancellationToken);

await store.PutAsync("k1", new byte[] { 42 }, cancellationToken: cancellationToken);
await store.PutAsync("k2", new byte[] { 43 }, cancellationToken: cancellationToken);
Expand Down Expand Up @@ -379,8 +379,8 @@ public async Task Compressed_storage()
var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10));
var cancellationToken = cts.Token;

var store1 = await obj.CreateObjectStore(new NatsObjConfig("b1") { Compression = false }, cancellationToken);
var store2 = await obj.CreateObjectStore(new NatsObjConfig("b2") { Compression = true }, cancellationToken);
var store1 = await obj.CreateObjectStoreAsync(new NatsObjConfig("b1") { Compression = false }, cancellationToken);
var store2 = await obj.CreateObjectStoreAsync(new NatsObjConfig("b2") { Compression = true }, cancellationToken);

Assert.Equal("b1", store1.Bucket);
Assert.Equal("b2", store2.Bucket);
Expand Down
46 changes: 46 additions & 0 deletions tests/NATS.Client.ObjectStore.Tests/WatcherTest.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
using NATS.Client.Core.Tests;

namespace NATS.Client.ObjectStore.Tests;

public class WatcherTest
{
[Fact]
public async Task Watcher_test()
{
await using var server = NatsServer.StartJS();
await using var nats = server.CreateClientConnection();
var js = new NatsJSContext(nats);
var ob = new NatsObjContext(js);

var cts = new CancellationTokenSource(TimeSpan.FromSeconds(30));
var cancellationToken = cts.Token;

var store = await ob.CreateObjectStoreAsync("b1", cancellationToken);

await store.PutAsync("k0", new byte[] { 0 }, cancellationToken);

var signal = new WaitSignal();

var watcher = Task.Run(
async () =>
{
var count = 0;
await foreach (var info in store.WatchAsync(cancellationToken: cancellationToken))
{
count++;
signal.Pulse();
Assert.Equal(count, info.Size);
if (count == 3)
break;
}
},
cancellationToken);

await signal;

await store.PutAsync("k1", new byte[] { 0, 1 }, cancellationToken);
await store.PutAsync("k1", new byte[] { 0, 1, 3 }, cancellationToken);

await watcher;
}
}
2 changes: 1 addition & 1 deletion tests/NATS.Net.DocsExamples/ObjectStore/IntroPage.cs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ public async Task Run()
}

#region store
var store = await obj.CreateObjectStore("test-bucket");
var store = await obj.CreateObjectStoreAsync("test-bucket");
#endregion

await File.WriteAllTextAsync("data.bin", "tests");
Expand Down
4 changes: 2 additions & 2 deletions tests/Nats.Client.Compat/ObjectStoreCompat.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ public async Task TestDefaultBucket(NatsConnection nats, NatsMsg<Memory<byte>> m
// Test.Log($"JSON: {json}");
var config = json["config"];

await ob.CreateObjectStore(new NatsObjConfig(config["bucket"].GetValue<string>()));
await ob.CreateObjectStoreAsync(new NatsObjConfig(config["bucket"].GetValue<string>()));
await msg.ReplyAsync();
}

Expand All @@ -42,7 +42,7 @@ public async Task TestCustomBucket(NatsConnection nats, NatsMsg<Memory<byte>> ms
var bucket = config["bucket"].GetValue<string>();
var numberOfReplicas = config["num_replicas"].GetValue<int>();

await ob.CreateObjectStore(new NatsObjConfig(bucket)
await ob.CreateObjectStoreAsync(new NatsObjConfig(bucket)
{
Description = description,
MaxAge = fromSeconds,
Expand Down

0 comments on commit 3460b1e

Please sign in to comment.