diff --git a/src/NATS.Client.Core/NatsMsg.cs b/src/NATS.Client.Core/NatsMsg.cs index 3d05d3eb8..53bc01750 100644 --- a/src/NATS.Client.Core/NatsMsg.cs +++ b/src/NATS.Client.Core/NatsMsg.cs @@ -3,6 +3,96 @@ namespace NATS.Client.Core; +/// +/// This interface provides an optional contract when passing +/// messages to processing methods which is usually helpful in +/// creating test doubles in unit testing. +/// +/// +/// +/// Using this interface is optional and should not affect functionality. +/// +/// +/// There is a performance penalty when using this interface because +/// is a value type and boxing is required. +/// A boxing allocation occurs when a value type is converted to the +/// interface type. This is because the interface type is a reference +/// type and the value type must be converted to a reference type. +/// You should benchmark your application to determine if the +/// interface is worth the performance penalty or makes any noticeable +/// degradation in performance. +/// +/// +/// Data type of the payload +public interface INatsMsg +{ + /// The destination subject to publish to. + string Subject { get; init; } + + /// The reply subject that subscribers can use to send a response back to the publisher/requester. + string? ReplyTo { get; init; } + + /// Message size in bytes. + int Size { get; init; } + + /// Pass additional information using name-value pairs. + NatsHeaders? Headers { get; init; } + + /// Serializable data object. + T? Data { get; init; } + + /// NATS connection this message is associated to. + INatsConnection? Connection { get; init; } + + /// + /// Reply with an empty message. + /// + /// Optional message headers. + /// Optional reply-to subject. + /// A for publishing options. + /// A used to cancel the command. + /// A that represents the asynchronous send operation. + ValueTask ReplyAsync(NatsHeaders? headers = default, string? replyTo = default, NatsPubOpts? opts = default, CancellationToken cancellationToken = default); + + /// + /// Reply to this message. + /// + /// Serializable data object. + /// Optional message headers. + /// Optional reply-to subject. + /// Serializer to use for the message type. + /// A for publishing options. + /// A used to cancel the command. + /// Specifies the type of data that may be sent to the NATS Server. + /// A that represents the asynchronous send operation. + /// + /// + /// Publishes a new message using the reply-to subject from the this message as the destination subject. + /// + /// + /// If the is not specified, the assigned to + /// the will be used to find a serializer for the type . + /// You can specify a in when creating a + /// . If not specified, will be used. + /// + /// + ValueTask ReplyAsync(TReply data, NatsHeaders? headers = default, string? replyTo = default, INatsSerialize? serializer = default, NatsPubOpts? opts = default, CancellationToken cancellationToken = default); + + /// + /// Reply to this message. + /// + /// A representing message details. + /// Serializer to use for the message type. + /// A for publishing options. + /// A used to cancel the command. + /// Specifies the type of data that may be sent to the NATS Server. + /// A that represents the asynchronous send operation. + /// + /// Publishes a new message using the reply-to subject from the this message as the destination subject. + /// + ValueTask ReplyAsync(NatsMsg msg, INatsSerialize? serializer = default, NatsPubOpts? opts = default, CancellationToken cancellationToken = default); +} + /// /// NATS message structure as defined by the protocol. /// @@ -28,7 +118,7 @@ public readonly record struct NatsMsg( int Size, NatsHeaders? Headers, T? Data, - INatsConnection? Connection) + INatsConnection? Connection) : INatsMsg { internal static NatsMsg Build( string subject, diff --git a/src/NATS.Client.JetStream/NatsJSMsg.cs b/src/NATS.Client.JetStream/NatsJSMsg.cs index 48368a541..9358f1228 100644 --- a/src/NATS.Client.JetStream/NatsJSMsg.cs +++ b/src/NATS.Client.JetStream/NatsJSMsg.cs @@ -6,17 +6,134 @@ namespace NATS.Client.JetStream; +/// +/// This interface provides an optional contract when passing +/// messages to processing methods which is usually helpful in +/// creating test doubles in unit testing. +/// +/// +/// +/// Using this interface is optional and should not affect functionality. +/// +/// +/// There is a performance penalty when using this interface because +/// is a value type and boxing is required. +/// A boxing allocation occurs when a value type is converted to the +/// interface type. This is because the interface type is a reference +/// type and the value type must be converted to a reference type. +/// You should benchmark your application to determine if the +/// interface is worth the performance penalty or makes any noticeable +/// degradation in performance. +/// +/// +/// Data type of the payload +public interface INatsJSMsg +{ + /// + /// Subject of the user message. + /// + string Subject { get; } + + /// + /// Message size in bytes. + /// + /// + /// Message size is calculated using the same method NATS server uses: + /// + /// int size = subject.Length + replyTo.Length + headers.Length + payload.Length; + /// + /// + int Size { get; } + + /// + /// Headers of the user message if set. + /// + NatsHeaders? Headers { get; } + + /// + /// Deserialized user data. + /// + T? Data { get; } + + /// + /// The connection messages was delivered on. + /// + INatsConnection? Connection { get; } + + /// + /// Additional metadata about the message. + /// + NatsJSMsgMetadata? Metadata { get; } + + /// + /// Reply with an empty message. + /// + /// Optional message headers. + /// Optional reply-to subject. + /// A for publishing options. + /// A used to cancel the command. + /// A that represents the asynchronous send operation. + ValueTask ReplyAsync(NatsHeaders? headers = default, string? replyTo = default, NatsPubOpts? opts = default, CancellationToken cancellationToken = default); + + /// + /// Acknowledges the message was completely handled. + /// + /// Ack options. + /// A used to cancel the call. + /// A representing the async call. + ValueTask AckAsync(AckOpts opts = default, CancellationToken cancellationToken = default); + + /// + /// Signals that the message will not be processed now and processing can move onto the next message. + /// + /// Delay redelivery of the message. + /// Ack options. + /// A used to cancel the call. + /// A representing the async call. + /// + /// Messages rejected using -NAK will be resent by the NATS JetStream server after the configured timeout + /// or the delay parameter if it's specified. + /// + ValueTask NakAsync(AckOpts opts = default, TimeSpan delay = default, CancellationToken cancellationToken = default); + + /// + /// Indicates that work is ongoing and the wait period should be extended. + /// + /// Ack options. + /// A used to cancel the call. + /// A representing the async call. + /// + /// + /// Time period is defined by the consumer's ack_wait configuration on the server which is + /// defined as how long to allow messages to remain un-acknowledged before attempting redelivery. + /// + /// + /// This message must be sent before the ack_wait period elapses. The period should be extended + /// by another amount of time equal to ack_wait by the NATS JetStream server. + /// + /// + ValueTask AckProgressAsync(AckOpts opts = default, CancellationToken cancellationToken = default); + + /// + /// Instructs the server to stop redelivery of the message without acknowledging it as successfully processed. + /// + /// Ack options. + /// A used to cancel the call. + /// A representing the async call. + ValueTask AckTerminateAsync(AckOpts opts = default, CancellationToken cancellationToken = default); +} + /// /// NATS JetStream message with and control messages. /// /// User message type -public readonly struct NatsJSMsg +public readonly struct NatsJSMsg : INatsJSMsg { private readonly NatsJSContext _context; private readonly NatsMsg _msg; private readonly Lazy _replyToDateTimeAndSeq; - internal NatsJSMsg(NatsMsg msg, NatsJSContext context) + public NatsJSMsg(NatsMsg msg, NatsJSContext context) { _msg = msg; _context = context; diff --git a/tests/NATS.Client.Core.Tests/MessageInterfaceTest.cs b/tests/NATS.Client.Core.Tests/MessageInterfaceTest.cs new file mode 100644 index 000000000..1acba090b --- /dev/null +++ b/tests/NATS.Client.Core.Tests/MessageInterfaceTest.cs @@ -0,0 +1,49 @@ +namespace NATS.Client.Core.Tests; + +public class MessageInterfaceTest +{ + [Fact] + public async Task Sub_custom_builder_test() + { + await using var server = NatsServer.Start(); + await using var nats = server.CreateClientConnection(); + + var sync = 0; + var sub = Task.Run(async () => + { + var count = 0; + await foreach (var natsMsg in nats.SubscribeAsync("foo.*")) + { + if (natsMsg.Subject == "foo.sync") + { + Interlocked.Increment(ref sync); + continue; + } + + if (natsMsg.Subject == "foo.end") + { + break; + } + + // Boxing allocation: conversion from 'NatsMsg' to 'INatsMsg' requires boxing of the value type + // vvvvvvv + ProcessMessage(count++, natsMsg); + } + }); + + await Retry.Until( + reason: "subscription is ready", + condition: () => Volatile.Read(ref sync) > 0, + action: async () => await nats.PubAsync("foo.sync"), + retryDelay: TimeSpan.FromSeconds(1)); + + for (var i = 0; i < 10; i++) + await nats.PublishAsync(subject: $"foo.{i}", data: $"test_msg_{i}"); + + await nats.PubAsync("foo.end"); + + await sub; + } + + private void ProcessMessage(int count, INatsMsg natsMsg) => natsMsg.Data.Should().Be($"test_msg_{count}"); +} diff --git a/tests/NATS.Client.JetStream.Tests/MessageInterfaceTest.cs b/tests/NATS.Client.JetStream.Tests/MessageInterfaceTest.cs new file mode 100644 index 000000000..a096eb11b --- /dev/null +++ b/tests/NATS.Client.JetStream.Tests/MessageInterfaceTest.cs @@ -0,0 +1,49 @@ +using NATS.Client.Core.Tests; + +namespace NATS.Client.JetStream.Tests; + +public class MessageInterfaceTest +{ + [Fact] + public async Task Using_message_interface() + { + await using var server = NatsServer.StartJS(); + await using var nats = server.CreateClientConnection(); + var js = new NatsJSContext(nats); + + var cts = new CancellationTokenSource(TimeSpan.FromSeconds(30)); + + await js.CreateStreamAsync("s1", new[] { "s1.*" }, cts.Token); + + var ack = await js.PublishAsync("s1.foo", "test_msg", cancellationToken: cts.Token); + ack.EnsureSuccess(); + + var consumer = await js.CreateConsumerAsync("s1", "c1", cancellationToken: cts.Token); + + await foreach (var natsJSMsg in consumer.ConsumeAsync(cancellationToken: cts.Token)) + { + // Boxing allocation: conversion from 'NatsJSMsg' to 'INatsJSMsg' requires boxing of the value type + // vvvvvvvvv + await ProcessMessageAsync(natsJSMsg, cts.Token); + break; + } + + await Retry.Until( + "ack pending 0", + async () => + { + var c = await js.GetConsumerAsync("s1", "c1", cts.Token); + return c.Info.NumAckPending == 0; + }, + retryDelay: TimeSpan.FromSeconds(1), + timeout: TimeSpan.FromSeconds(20)); + await consumer.RefreshAsync(cts.Token); + Assert.Equal(0, consumer.Info.NumAckPending); + } + + private async Task ProcessMessageAsync(INatsJSMsg natsJSMsg, CancellationToken cancellationToken = default) + { + natsJSMsg.Data.Should().Be("test_msg"); + await natsJSMsg.AckAsync(cancellationToken: cancellationToken); + } +}