diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index d0baf8a07..d4ed5a0f1 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -94,6 +94,13 @@ jobs: cd tests/NATS.Client.Services.Tests dotnet test -c Release --no-build + - name: Test Simplified + run: | + killall nats-server 2> /dev/null | echo -n + nats-server -v + cd tests/NATS.Client.Simplified.Tests + dotnet test -c Release --no-build + - name: Test OpenTelemetry run: | killall nats-server 2> /dev/null | echo -n diff --git a/src/NATS.Client.Core/Nuid.cs b/src/NATS.Client.Core/Nuid.cs index 8994f3f28..21d822e7d 100644 --- a/src/NATS.Client.Core/Nuid.cs +++ b/src/NATS.Client.Core/Nuid.cs @@ -18,7 +18,7 @@ namespace NATS.Client.Core; [SkipLocalsInit] public sealed class Nuid { - // NuidLength, PrefixLength, SequentialLength were nuint (System.UIntPtr) in the original code + // NuidLength, PrefixLength, SequentialLength were nuint (System.UIntPtr) in the original code, // however, they were changed to uint to fix the compilation error for IL2CPP Unity projects. // With nuint, the following error occurs in Unity Linux IL2CPP builds: // Error: IL2CPP error for method 'System.Char[] NATS.Client.Core.Internal.NuidWriter::Refresh(System.UInt64&)' @@ -88,7 +88,7 @@ private static bool TryWriteNuidCore(Span buffer, Span prefix, ulong ref var digitsPtr = ref MemoryMarshal.GetReference(Digits); // write backwards so the last two characters change the fastest - for (var i = NuidLength; i > PrefixLength;) + for (nuint i = NuidLength; i > PrefixLength;) { i--; var digitIndex = (nuint)(sequential % Base); diff --git a/src/NATS.Client.JetStream/Internal/netstandard.cs b/src/NATS.Client.JetStream/Internal/netstandard.cs index f14282452..66b69b9b4 100644 --- a/src/NATS.Client.JetStream/Internal/netstandard.cs +++ b/src/NATS.Client.JetStream/Internal/netstandard.cs @@ -5,10 +5,6 @@ namespace System.Runtime.CompilerServices { - internal class ExtensionAttribute : Attribute - { - } - internal sealed class CompilerFeatureRequiredAttribute : Attribute { public CompilerFeatureRequiredAttribute(string featureName) diff --git a/src/NATS.Client.JetStream/Models/StreamSource.cs b/src/NATS.Client.JetStream/Models/StreamSource.cs index 684b7d667..0529bcb47 100644 --- a/src/NATS.Client.JetStream/Models/StreamSource.cs +++ b/src/NATS.Client.JetStream/Models/StreamSource.cs @@ -54,4 +54,13 @@ public record StreamSource [System.Text.Json.Serialization.JsonPropertyName("external")] [System.Text.Json.Serialization.JsonIgnore(Condition = System.Text.Json.Serialization.JsonIgnoreCondition.WhenWritingDefault)] public ExternalStreamSource? External { get; set; } + + /// + /// This field is a convenience for setting up an ExternalStream. + /// If set, the value here is used to calculate the JetStreamAPI prefix. + /// This field is never serialized to the server. This value cannot be set + /// if external is set. + /// + [System.Text.Json.Serialization.JsonIgnore] + public string? Domain { get; set; } } diff --git a/src/NATS.Client.JetStream/NatsJSContext.Streams.cs b/src/NATS.Client.JetStream/NatsJSContext.Streams.cs index 53d61cbdd..b9b428b89 100644 --- a/src/NATS.Client.JetStream/NatsJSContext.Streams.cs +++ b/src/NATS.Client.JetStream/NatsJSContext.Streams.cs @@ -20,6 +20,38 @@ public async ValueTask CreateStreamAsync( CancellationToken cancellationToken = default) { ThrowIfInvalidStreamName(config.Name, nameof(config.Name)); + + // keep caller's config intact. + config = config with { }; + + // If we have a mirror and an external domain, convert to ext.APIPrefix. + if (config.Mirror != null && !string.IsNullOrEmpty(config.Mirror.Domain)) + { + config.Mirror = config.Mirror with { }; + ConvertDomain(config.Mirror); + } + + // Check sources for the same. + if (config.Sources != null && config.Sources.Count > 0) + { + ICollection? sources = []; + foreach (var ss in config.Sources) + { + if (!string.IsNullOrEmpty(ss.Domain)) + { + var remappedDomainSource = ss with { }; + ConvertDomain(remappedDomainSource); + sources.Add(remappedDomainSource); + } + else + { + sources.Add(ss); + } + } + + config.Sources = sources; + } + var response = await JSRequestResponseAsync( subject: $"{Opts.Prefix}.STREAM.CREATE.{config.Name}", config, diff --git a/src/NATS.Client.JetStream/NatsJSContext.cs b/src/NATS.Client.JetStream/NatsJSContext.cs index 66ed5cacc..c0c6f9094 100644 --- a/src/NATS.Client.JetStream/NatsJSContext.cs +++ b/src/NATS.Client.JetStream/NatsJSContext.cs @@ -327,6 +327,21 @@ internal async ValueTask> JSRequestAsync throw new ArgumentException("Stream name cannot contain ' ', '.'", paramName); diff --git a/src/NATS.Client.KeyValueStore/NatsKVConfig.cs b/src/NATS.Client.KeyValueStore/NatsKVConfig.cs index dad8b7086..3ff86e67d 100644 --- a/src/NATS.Client.KeyValueStore/NatsKVConfig.cs +++ b/src/NATS.Client.KeyValueStore/NatsKVConfig.cs @@ -1,3 +1,5 @@ +using NATS.Client.JetStream.Models; + namespace NATS.Client.KeyValueStore; /// @@ -61,12 +63,15 @@ public record NatsKVConfig /// public bool Compression { get; init; } - // TODO: Bucket mirror configuration. - // pub mirror: Option, - // Bucket sources configuration. - // pub sources: Option>, - // Allow mirrors using direct API. - // pub mirror_direct: bool, + /// + /// Mirror defines the configuration for mirroring another KeyValue store + /// + public StreamSource? Mirror { get; init; } + + /// + /// Sources defines the configuration for sources of a KeyValue store. + /// + public ICollection? Sources { get; set; } } /// diff --git a/src/NATS.Client.KeyValueStore/NatsKVContext.cs b/src/NATS.Client.KeyValueStore/NatsKVContext.cs index 6c4279739..cd0b1871c 100644 --- a/src/NATS.Client.KeyValueStore/NatsKVContext.cs +++ b/src/NATS.Client.KeyValueStore/NatsKVContext.cs @@ -173,9 +173,6 @@ private static string ExtractBucketName(string streamName) private static StreamConfig CreateStreamConfig(NatsKVConfig config) { - // TODO: KV Mirrors - var subjects = new[] { $"$KV.{config.Bucket}.>" }; - long history; if (config.History > 0) { @@ -203,6 +200,66 @@ private static StreamConfig CreateStreamConfig(NatsKVConfig config) var replicas = config.NumberOfReplicas > 0 ? config.NumberOfReplicas : 1; + string[]? subjects; + StreamSource? mirror; + ICollection? sources; + bool mirrorDirect; + + if (config.Mirror != null) + { + mirror = config.Mirror with + { + Name = config.Mirror.Name.StartsWith(KvStreamNamePrefix) + ? config.Mirror.Name + : BucketToStream(config.Mirror.Name), + }; + mirrorDirect = true; + subjects = default; + sources = default; + } + else if (config.Sources is { Count: > 0 }) + { + sources = []; + foreach (var ss in config.Sources) + { + string? sourceBucketName; + if (ss.Name.StartsWith(KvStreamNamePrefix)) + { + sourceBucketName = ss.Name.Substring(KvStreamNamePrefixLen); + } + else + { + sourceBucketName = ss.Name; + ss.Name = BucketToStream(ss.Name); + } + + if (ss.External == null || sourceBucketName != config.Bucket) + { + ss.SubjectTransforms = + [ + new SubjectTransform + { + Src = $"$KV.{sourceBucketName}.>", + Dest = $"$KV.{config.Bucket}.>", + } + ]; + } + + sources.Add(ss); + } + + subjects = [$"$KV.{config.Bucket}.>"]; + mirror = default; + mirrorDirect = false; + } + else + { + subjects = [$"$KV.{config.Bucket}.>"]; + mirror = default; + sources = default; + mirrorDirect = false; + } + var streamConfig = new StreamConfig { Name = BucketToStream(config.Bucket), @@ -221,10 +278,9 @@ private static StreamConfig CreateStreamConfig(NatsKVConfig config) AllowDirect = true, NumReplicas = replicas, Discard = StreamConfigDiscard.New, - - // TODO: KV mirrors - // MirrorDirect = - // Mirror = + Mirror = mirror, + MirrorDirect = mirrorDirect, + Sources = sources, Retention = StreamConfigRetention.Limits, // from ADR-8 }; diff --git a/tests/NATS.Client.KeyValueStore.Tests/KeyValueStoreTest.cs b/tests/NATS.Client.KeyValueStore.Tests/KeyValueStoreTest.cs index 01c973cf9..8504f04ac 100644 --- a/tests/NATS.Client.KeyValueStore.Tests/KeyValueStoreTest.cs +++ b/tests/NATS.Client.KeyValueStore.Tests/KeyValueStoreTest.cs @@ -648,4 +648,51 @@ public async Task TestDirectMessageRepublishedSubject() Assert.Equal(publishSubject3, kve3.Key); Assert.Equal("tres", kve3.Value); } + + [SkipIfNatsServer(versionEarlierThan: "2.10")] + public async Task Test_CombinedSources() + { + await using var server = NatsServer.StartJS(); + await using var nats = server.CreateClientConnection(); + + var js = new NatsJSContext(nats); + var kv = new NatsKVContext(js); + + var storeSource1 = await kv.CreateStoreAsync("source1"); + var storeSource2 = await kv.CreateStoreAsync("source2"); + + var storeCombined = await kv.CreateStoreAsync(new NatsKVConfig("combined") + { + Sources = [ + new StreamSource { Name = "source1" }, + new StreamSource { Name = "source2" } + ], + }); + + await storeSource1.PutAsync("ss1_a", "a_fromStore1"); + await storeSource2.PutAsync("ss2_b", "b_fromStore2"); + + await Retry.Until( + "async replication is completed", + async () => + { + try + { + await storeCombined.GetEntryAsync("ss1_a"); + await storeCombined.GetEntryAsync("ss2_b"); + } + catch (NatsKVKeyNotFoundException) + { + return false; + } + + return true; + }); + + var entryA = await storeCombined.GetEntryAsync("ss1_a"); + var entryB = await storeCombined.GetEntryAsync("ss2_b"); + + Assert.Equal("a_fromStore1", entryA.Value); + Assert.Equal("b_fromStore2", entryB.Value); + } } diff --git a/tests/NATS.Client.KeyValueStore.Tests/NatsKVContextFactoryTest.cs b/tests/NATS.Client.KeyValueStore.Tests/NatsKVContextFactoryTest.cs index 0270ae9f9..9f63aeada 100644 --- a/tests/NATS.Client.KeyValueStore.Tests/NatsKVContextFactoryTest.cs +++ b/tests/NATS.Client.KeyValueStore.Tests/NatsKVContextFactoryTest.cs @@ -50,6 +50,8 @@ public class MockJsContext : INatsJSContext { public INatsConnection Connection { get; } = new NatsConnection(); + public NatsJSOpts Opts { get; } = new(new NatsOpts()); + public ValueTask CreateOrderedConsumerAsync(string stream, NatsJSOrderedConsumerOpts? opts = default, CancellationToken cancellationToken = default) => throw new NotImplementedException(); public ValueTask CreateOrUpdateConsumerAsync(string stream, ConsumerConfig config, CancellationToken cancellationToken = default) => throw new NotImplementedException(); diff --git a/tests/NATS.Client.ObjectStore.Tests/NatsObjContextFactoryTest.cs b/tests/NATS.Client.ObjectStore.Tests/NatsObjContextFactoryTest.cs index 2ed5bb6f8..1ac2c470b 100644 --- a/tests/NATS.Client.ObjectStore.Tests/NatsObjContextFactoryTest.cs +++ b/tests/NATS.Client.ObjectStore.Tests/NatsObjContextFactoryTest.cs @@ -50,6 +50,8 @@ public class MockJsContext : INatsJSContext { public INatsConnection Connection { get; } = new NatsConnection(); + public NatsJSOpts Opts { get; } = new(new NatsOpts()); + public ValueTask CreateOrderedConsumerAsync(string stream, NatsJSOrderedConsumerOpts? opts = default, CancellationToken cancellationToken = default) => throw new NotImplementedException(); public ValueTask CreateOrUpdateConsumerAsync(string stream, ConsumerConfig config, CancellationToken cancellationToken = default) => throw new NotImplementedException(); diff --git a/tests/NATS.Client.Platform.Windows.Tests/NATS.Client.Platform.Windows.Tests.csproj b/tests/NATS.Client.Platform.Windows.Tests/NATS.Client.Platform.Windows.Tests.csproj index faaf762a1..8f3208efb 100644 --- a/tests/NATS.Client.Platform.Windows.Tests/NATS.Client.Platform.Windows.Tests.csproj +++ b/tests/NATS.Client.Platform.Windows.Tests/NATS.Client.Platform.Windows.Tests.csproj @@ -16,6 +16,8 @@ + + diff --git a/tests/NATS.Client.Simplified.Tests/test.runsettings b/tests/NATS.Client.Simplified.Tests/test.runsettings new file mode 100644 index 000000000..27c41ad33 --- /dev/null +++ b/tests/NATS.Client.Simplified.Tests/test.runsettings @@ -0,0 +1,7 @@ + + + + 1 + 300000 + +