diff --git a/src/NATS.Client.JetStream/INatsJSContext.cs b/src/NATS.Client.JetStream/INatsJSContext.cs index a23eff2c8..bc1c89e41 100644 --- a/src/NATS.Client.JetStream/INatsJSContext.cs +++ b/src/NATS.Client.JetStream/INatsJSContext.cs @@ -208,6 +208,20 @@ ValueTask CreateStreamAsync( StreamConfig config, CancellationToken cancellationToken = default); + /// + /// Creates a new stream if it doesn't exist or update if the stream already exists. + /// + /// Stream configuration request to be sent to NATS JetStream server. + /// A used to cancel the API call. + /// The NATS JetStream stream object which can be used to manage the stream. + /// There was an issue retrieving the response. + /// Server responded with an error. + /// The stream name in is invalid. + /// The name in is null. + ValueTask CreateOrUpdateStreamAsync( + StreamConfig config, + CancellationToken cancellationToken = default); + /// /// Deletes a stream. /// diff --git a/src/NATS.Client.JetStream/NatsJSContext.Streams.cs b/src/NATS.Client.JetStream/NatsJSContext.Streams.cs index b9b428b89..135d33457 100644 --- a/src/NATS.Client.JetStream/NatsJSContext.Streams.cs +++ b/src/NATS.Client.JetStream/NatsJSContext.Streams.cs @@ -59,6 +59,33 @@ public async ValueTask CreateStreamAsync( return new NatsJSStream(this, response); } + /// + /// Creates a new stream if it doesn't exist or update if the stream already exists. + /// + /// Stream configuration request to be sent to NATS JetStream server. + /// A used to cancel the API call. + /// The NATS JetStream stream object which can be used to manage the stream. + /// There was an issue retrieving the response. + /// Server responded with an error. + /// The stream name in is invalid. + /// The name in is null. + public async ValueTask CreateOrUpdateStreamAsync(StreamConfig config, CancellationToken cancellationToken = default) + { + ThrowIfInvalidStreamName(config.Name, nameof(config.Name)); + var response = await JSRequestAsync( + subject: $"{Opts.Prefix}.STREAM.UPDATE.{config.Name}", + request: config, + cancellationToken); + + if (response.Error is { Code: 404 }) + { + return await CreateStreamAsync(config, cancellationToken); + } + + response.EnsureSuccess(); + return new NatsJSStream(this, response.Response!); + } + /// /// Deletes a stream. /// diff --git a/tests/NATS.Client.JetStream.Tests/JetStreamTest.cs b/tests/NATS.Client.JetStream.Tests/JetStreamTest.cs index 4cbc9cd6f..310bc9553 100644 --- a/tests/NATS.Client.JetStream.Tests/JetStreamTest.cs +++ b/tests/NATS.Client.JetStream.Tests/JetStreamTest.cs @@ -26,6 +26,9 @@ public async Task Stream_invalid_name_test(string? streamName) // Create stream await Assert.ThrowsAnyAsync(async () => await jsmContext.CreateStreamAsync(cfg)); + // Create or update stream + await Assert.ThrowsAnyAsync(async () => await jsmContext.CreateOrUpdateStreamAsync(cfg)); + // Delete stream await Assert.ThrowsAnyAsync(async () => await jsmContext.DeleteStreamAsync(streamName!)); diff --git a/tests/NATS.Client.JetStream.Tests/ManageStreamTest.cs b/tests/NATS.Client.JetStream.Tests/ManageStreamTest.cs index c36db8ab3..7f607319b 100644 --- a/tests/NATS.Client.JetStream.Tests/ManageStreamTest.cs +++ b/tests/NATS.Client.JetStream.Tests/ManageStreamTest.cs @@ -127,4 +127,61 @@ public async Task Delete_one_msg() Assert.Equal(2, stream.Info.State.Subjects?.Count); } + + [Fact] + public async Task Create_or_update_stream_should_be_create_stream_if_stream_doesnt_exist() + { + var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10)); + + await using var server = NatsServer.StartJS(); + var nats = server.CreateClientConnection(); + var js = new NatsJSContext(nats); + + var streamConfig = new StreamConfig("s1", ["s1.*"]) + { Storage = StreamConfigStorage.File }; + + var accountInfoBefore = await js.GetAccountInfoAsync(cts.Token); + await js.CreateOrUpdateStreamAsync(streamConfig, cts.Token); + var accountInfoAfter = await js.GetAccountInfoAsync(cts.Token); + + Assert.Equal(0, accountInfoBefore.Streams); + Assert.Equal(1, accountInfoAfter.Streams); + } + + [Fact] + public async Task Create_or_update_stream_should_be_update_stream_if_stream_exist() + { + var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10)); + + await using var server = NatsServer.StartJS(); + var nats = server.CreateClientConnection(); + var js = new NatsJSContext(nats); + + var streamConfig = new StreamConfig("s1", ["s1.*"]) + { Storage = StreamConfigStorage.File, NoAck = false }; + var streamConfigForUpdated = streamConfig with { NoAck = true }; + + var stream = await js.CreateOrUpdateStreamAsync(streamConfig, cts.Token); + var updatedStream = await js.CreateOrUpdateStreamAsync(streamConfigForUpdated, cts.Token); + + Assert.False(stream.Info.Config.NoAck); + Assert.True(updatedStream.Info.Config.NoAck); + } + + [Fact] + public async Task Create_or_update_stream_should_be_throwing_update_operation_errors() + { + var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10)); + + await using var server = NatsServer.StartJS(); + var nats = server.CreateClientConnection(); + var js = new NatsJSContext(nats); + + var streamConfig = new StreamConfig("s1", ["s1.*"]) + { Storage = StreamConfigStorage.File }; + var streamConfigForUpdated = streamConfig with { Storage = StreamConfigStorage.Memory }; + + await js.CreateOrUpdateStreamAsync(streamConfig, cts.Token); + await Assert.ThrowsAsync(async () => await js.CreateOrUpdateStreamAsync(streamConfigForUpdated, cts.Token)); + } } diff --git a/tests/NATS.Client.KeyValueStore.Tests/NatsKVContextFactoryTest.cs b/tests/NATS.Client.KeyValueStore.Tests/NatsKVContextFactoryTest.cs index f3700d222..d82eac404 100644 --- a/tests/NATS.Client.KeyValueStore.Tests/NatsKVContextFactoryTest.cs +++ b/tests/NATS.Client.KeyValueStore.Tests/NatsKVContextFactoryTest.cs @@ -78,6 +78,8 @@ public class MockJsContext : INatsJSContext public ValueTask CreateStreamAsync(StreamConfig config, CancellationToken cancellationToken = default) => throw new NotImplementedException(); + public ValueTask CreateOrUpdateStreamAsync(StreamConfig config, CancellationToken cancellationToken = default) => throw new NotImplementedException(); + public ValueTask DeleteStreamAsync(string stream, CancellationToken cancellationToken = default) => throw new NotImplementedException(); public ValueTask PurgeStreamAsync(string stream, StreamPurgeRequest request, CancellationToken cancellationToken = default) => throw new NotImplementedException(); diff --git a/tests/NATS.Client.ObjectStore.Tests/NatsObjContextFactoryTest.cs b/tests/NATS.Client.ObjectStore.Tests/NatsObjContextFactoryTest.cs index adc5eef32..78d94dee2 100644 --- a/tests/NATS.Client.ObjectStore.Tests/NatsObjContextFactoryTest.cs +++ b/tests/NATS.Client.ObjectStore.Tests/NatsObjContextFactoryTest.cs @@ -78,6 +78,8 @@ public class MockJsContext : INatsJSContext public ValueTask CreateStreamAsync(StreamConfig config, CancellationToken cancellationToken = default) => throw new NotImplementedException(); + public ValueTask CreateOrUpdateStreamAsync(StreamConfig config, CancellationToken cancellationToken = default) => throw new NotImplementedException(); + public ValueTask DeleteStreamAsync(string stream, CancellationToken cancellationToken = default) => throw new NotImplementedException(); public ValueTask PurgeStreamAsync(string stream, StreamPurgeRequest request, CancellationToken cancellationToken = default) => throw new NotImplementedException();