Skip to content

Commit

Permalink
Create an INatsJSMsg<T> interface (#266)
Browse files Browse the repository at this point in the history
* Created an INatsJSMsg interface

An INatsJSMsg interface can help with unit testing in common .net

* There maybe performance implications due to struct to class change
  assuming not noticeable. Struct to class change should not introduce
  functional change sine it is immutable.

* This would be a breaking change but the impact would be relatively
  small in most code bases.

* Made NatsJSMsg.ctor public

This could help with unit testing

* Warnings and format fixes

* Revert "Created an INatsJSMsg interface"

This reverts commit bac3b14.

* Revert "Warnings and format fixes"

This reverts commit 88f68fe.

* Use message interfaces only when required

* Message interface docs

* Fixed interface after merge
  • Loading branch information
mtmk authored Dec 12, 2023
1 parent e662bdd commit 780997b
Show file tree
Hide file tree
Showing 4 changed files with 308 additions and 3 deletions.
92 changes: 91 additions & 1 deletion src/NATS.Client.Core/NatsMsg.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,96 @@

namespace NATS.Client.Core;

/// <summary>
/// This interface provides an optional contract when passing
/// messages to processing methods which is usually helpful in
/// creating test doubles in unit testing.
/// </summary>
/// <remarks>
/// <para>
/// Using this interface is optional and should not affect functionality.
/// </para>
/// <para>
/// There is a performance penalty when using this interface because
/// <see cref="NatsMsg{T}"/> is a value type and boxing is required.
/// A boxing allocation occurs when a value type is converted to the
/// interface type. This is because the interface type is a reference
/// type and the value type must be converted to a reference type.
/// You should benchmark your application to determine if the
/// interface is worth the performance penalty or makes any noticeable
/// degradation in performance.
/// </para>
/// </remarks>
/// <typeparam name="T">Data type of the payload</typeparam>
public interface INatsMsg<T>
{
/// <summary>The destination subject to publish to.</summary>
string Subject { get; init; }

/// <summary>The reply subject that subscribers can use to send a response back to the publisher/requester.</summary>
string? ReplyTo { get; init; }

/// <summary>Message size in bytes.</summary>
int Size { get; init; }

/// <summary>Pass additional information using name-value pairs.</summary>
NatsHeaders? Headers { get; init; }

/// <summary>Serializable data object.</summary>
T? Data { get; init; }

/// <summary>NATS connection this message is associated to.</summary>
INatsConnection? Connection { get; init; }

/// <summary>
/// Reply with an empty message.
/// </summary>
/// <param name="headers">Optional message headers.</param>
/// <param name="replyTo">Optional reply-to subject.</param>
/// <param name="opts">A <see cref="NatsPubOpts"/> for publishing options.</param>
/// <param name="cancellationToken">A <see cref="CancellationToken"/> used to cancel the command.</param>
/// <returns>A <see cref="ValueTask"/> that represents the asynchronous send operation.</returns>
ValueTask ReplyAsync(NatsHeaders? headers = default, string? replyTo = default, NatsPubOpts? opts = default, CancellationToken cancellationToken = default);

/// <summary>
/// Reply to this message.
/// </summary>
/// <param name="data">Serializable data object.</param>
/// <param name="headers">Optional message headers.</param>
/// <param name="replyTo">Optional reply-to subject.</param>
/// <param name="serializer">Serializer to use for the message type.</param>
/// <param name="opts">A <see cref="NatsPubOpts"/> for publishing options.</param>
/// <param name="cancellationToken">A <see cref="CancellationToken"/> used to cancel the command.</param>
/// <typeparam name="TReply">Specifies the type of data that may be sent to the NATS Server.</typeparam>
/// <returns>A <see cref="ValueTask"/> that represents the asynchronous send operation.</returns>
/// <remarks>
/// <para>
/// Publishes a new message using the reply-to subject from the this message as the destination subject.
/// </para>
/// <para>
/// If the <paramref name="serializer"/> is not specified, the <see cref="INatsSerializerRegistry"/> assigned to
/// the <see cref="NatsConnection"/> will be used to find a serializer for the type <typeparamref name="TReply"/>.
/// You can specify a <see cref="INatsSerializerRegistry"/> in <see cref="NatsOpts"/> when creating a
/// <see cref="NatsConnection"/>. If not specified, <see cref="NatsDefaultSerializerRegistry"/> will be used.
/// </para>
/// </remarks>
ValueTask ReplyAsync<TReply>(TReply data, NatsHeaders? headers = default, string? replyTo = default, INatsSerialize<TReply>? serializer = default, NatsPubOpts? opts = default, CancellationToken cancellationToken = default);

/// <summary>
/// Reply to this message.
/// </summary>
/// <param name="msg">A <see cref="NatsMsg{T}"/> representing message details.</param>
/// <param name="serializer">Serializer to use for the message type.</param>
/// <param name="opts">A <see cref="NatsPubOpts"/> for publishing options.</param>
/// <param name="cancellationToken">A <see cref="CancellationToken"/> used to cancel the command.</param>
/// <typeparam name="TReply">Specifies the type of data that may be sent to the NATS Server.</typeparam>
/// <returns>A <see cref="ValueTask"/> that represents the asynchronous send operation.</returns>
/// <remarks>
/// Publishes a new message using the reply-to subject from the this message as the destination subject.
/// </remarks>
ValueTask ReplyAsync<TReply>(NatsMsg<TReply> msg, INatsSerialize<TReply>? serializer = default, NatsPubOpts? opts = default, CancellationToken cancellationToken = default);
}

/// <summary>
/// NATS message structure as defined by the protocol.
/// </summary>
Expand All @@ -28,7 +118,7 @@ public readonly record struct NatsMsg<T>(
int Size,
NatsHeaders? Headers,
T? Data,
INatsConnection? Connection)
INatsConnection? Connection) : INatsMsg<T>
{
internal static NatsMsg<T> Build(
string subject,
Expand Down
121 changes: 119 additions & 2 deletions src/NATS.Client.JetStream/NatsJSMsg.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,134 @@

namespace NATS.Client.JetStream;

/// <summary>
/// This interface provides an optional contract when passing
/// messages to processing methods which is usually helpful in
/// creating test doubles in unit testing.
/// </summary>
/// <remarks>
/// <para>
/// Using this interface is optional and should not affect functionality.
/// </para>
/// <para>
/// There is a performance penalty when using this interface because
/// <see cref="NatsJSMsg{T}"/> is a value type and boxing is required.
/// A boxing allocation occurs when a value type is converted to the
/// interface type. This is because the interface type is a reference
/// type and the value type must be converted to a reference type.
/// You should benchmark your application to determine if the
/// interface is worth the performance penalty or makes any noticeable
/// degradation in performance.
/// </para>
/// </remarks>
/// <typeparam name="T">Data type of the payload</typeparam>
public interface INatsJSMsg<out T>
{
/// <summary>
/// Subject of the user message.
/// </summary>
string Subject { get; }

/// <summary>
/// Message size in bytes.
/// </summary>
/// <remarks>
/// Message size is calculated using the same method NATS server uses:
/// <code lang="C#">
/// int size = subject.Length + replyTo.Length + headers.Length + payload.Length;
/// </code>
/// </remarks>
int Size { get; }

/// <summary>
/// Headers of the user message if set.
/// </summary>
NatsHeaders? Headers { get; }

/// <summary>
/// Deserialized user data.
/// </summary>
T? Data { get; }

/// <summary>
/// The connection messages was delivered on.
/// </summary>
INatsConnection? Connection { get; }

/// <summary>
/// Additional metadata about the message.
/// </summary>
NatsJSMsgMetadata? Metadata { get; }

/// <summary>
/// Reply with an empty message.
/// </summary>
/// <param name="headers">Optional message headers.</param>
/// <param name="replyTo">Optional reply-to subject.</param>
/// <param name="opts">A <see cref="NatsPubOpts"/> for publishing options.</param>
/// <param name="cancellationToken">A <see cref="CancellationToken"/> used to cancel the command.</param>
/// <returns>A <see cref="ValueTask"/> that represents the asynchronous send operation.</returns>
ValueTask ReplyAsync(NatsHeaders? headers = default, string? replyTo = default, NatsPubOpts? opts = default, CancellationToken cancellationToken = default);

/// <summary>
/// Acknowledges the message was completely handled.
/// </summary>
/// <param name="opts">Ack options.</param>
/// <param name="cancellationToken">A <see cref="CancellationToken"/> used to cancel the call.</param>
/// <returns>A <see cref="ValueTask"/> representing the async call.</returns>
ValueTask AckAsync(AckOpts opts = default, CancellationToken cancellationToken = default);

/// <summary>
/// Signals that the message will not be processed now and processing can move onto the next message.
/// </summary>
/// <param name="delay">Delay redelivery of the message.</param>
/// <param name="opts">Ack options.</param>
/// <param name="cancellationToken">A <see cref="CancellationToken"/> used to cancel the call.</param>
/// <returns>A <see cref="ValueTask"/> representing the async call.</returns>
/// <remarks>
/// Messages rejected using <c>-NAK</c> will be resent by the NATS JetStream server after the configured timeout
/// or the delay parameter if it's specified.
/// </remarks>
ValueTask NakAsync(AckOpts opts = default, TimeSpan delay = default, CancellationToken cancellationToken = default);

/// <summary>
/// Indicates that work is ongoing and the wait period should be extended.
/// </summary>
/// <param name="opts">Ack options.</param>
/// <param name="cancellationToken">A <see cref="CancellationToken"/> used to cancel the call.</param>
/// <returns>A <see cref="ValueTask"/> representing the async call.</returns>
/// <remarks>
/// <para>
/// Time period is defined by the consumer's <c>ack_wait</c> configuration on the server which is
/// defined as how long to allow messages to remain un-acknowledged before attempting redelivery.
/// </para>
/// <para>
/// This message must be sent before the <c>ack_wait</c> period elapses. The period should be extended
/// by another amount of time equal to <c>ack_wait</c> by the NATS JetStream server.
/// </para>
/// </remarks>
ValueTask AckProgressAsync(AckOpts opts = default, CancellationToken cancellationToken = default);

/// <summary>
/// Instructs the server to stop redelivery of the message without acknowledging it as successfully processed.
/// </summary>
/// <param name="opts">Ack options.</param>
/// <param name="cancellationToken">A <see cref="CancellationToken"/> used to cancel the call.</param>
/// <returns>A <see cref="ValueTask"/> representing the async call.</returns>
ValueTask AckTerminateAsync(AckOpts opts = default, CancellationToken cancellationToken = default);
}

/// <summary>
/// NATS JetStream message with <see cref="NatsMsg{T}"/> and control messages.
/// </summary>
/// <typeparam name="T">User message type</typeparam>
public readonly struct NatsJSMsg<T>
public readonly struct NatsJSMsg<T> : INatsJSMsg<T>
{
private readonly NatsJSContext _context;
private readonly NatsMsg<T> _msg;
private readonly Lazy<NatsJSMsgMetadata?> _replyToDateTimeAndSeq;

internal NatsJSMsg(NatsMsg<T> msg, NatsJSContext context)
public NatsJSMsg(NatsMsg<T> msg, NatsJSContext context)
{
_msg = msg;
_context = context;
Expand Down
49 changes: 49 additions & 0 deletions tests/NATS.Client.Core.Tests/MessageInterfaceTest.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
namespace NATS.Client.Core.Tests;

public class MessageInterfaceTest
{
[Fact]
public async Task Sub_custom_builder_test()
{
await using var server = NatsServer.Start();
await using var nats = server.CreateClientConnection();

var sync = 0;
var sub = Task.Run(async () =>
{
var count = 0;
await foreach (var natsMsg in nats.SubscribeAsync<string>("foo.*"))
{
if (natsMsg.Subject == "foo.sync")
{
Interlocked.Increment(ref sync);
continue;
}

if (natsMsg.Subject == "foo.end")
{
break;
}

// Boxing allocation: conversion from 'NatsMsg<string>' to 'INatsMsg<string>' requires boxing of the value type
// vvvvvvv
ProcessMessage(count++, natsMsg);
}
});

await Retry.Until(
reason: "subscription is ready",
condition: () => Volatile.Read(ref sync) > 0,
action: async () => await nats.PubAsync("foo.sync"),
retryDelay: TimeSpan.FromSeconds(1));

for (var i = 0; i < 10; i++)
await nats.PublishAsync(subject: $"foo.{i}", data: $"test_msg_{i}");

await nats.PubAsync("foo.end");

await sub;
}

private void ProcessMessage(int count, INatsMsg<string> natsMsg) => natsMsg.Data.Should().Be($"test_msg_{count}");
}
49 changes: 49 additions & 0 deletions tests/NATS.Client.JetStream.Tests/MessageInterfaceTest.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
using NATS.Client.Core.Tests;

namespace NATS.Client.JetStream.Tests;

public class MessageInterfaceTest
{
[Fact]
public async Task Using_message_interface()
{
await using var server = NatsServer.StartJS();
await using var nats = server.CreateClientConnection();
var js = new NatsJSContext(nats);

var cts = new CancellationTokenSource(TimeSpan.FromSeconds(30));

await js.CreateStreamAsync("s1", new[] { "s1.*" }, cts.Token);

var ack = await js.PublishAsync("s1.foo", "test_msg", cancellationToken: cts.Token);
ack.EnsureSuccess();

var consumer = await js.CreateConsumerAsync("s1", "c1", cancellationToken: cts.Token);

await foreach (var natsJSMsg in consumer.ConsumeAsync<string>(cancellationToken: cts.Token))
{
// Boxing allocation: conversion from 'NatsJSMsg<string>' to 'INatsJSMsg<string>' requires boxing of the value type
// vvvvvvvvv
await ProcessMessageAsync(natsJSMsg, cts.Token);
break;
}

await Retry.Until(
"ack pending 0",
async () =>
{
var c = await js.GetConsumerAsync("s1", "c1", cts.Token);
return c.Info.NumAckPending == 0;
},
retryDelay: TimeSpan.FromSeconds(1),
timeout: TimeSpan.FromSeconds(20));
await consumer.RefreshAsync(cts.Token);
Assert.Equal(0, consumer.Info.NumAckPending);
}

private async Task ProcessMessageAsync(INatsJSMsg<string> natsJSMsg, CancellationToken cancellationToken = default)
{
natsJSMsg.Data.Should().Be("test_msg");
await natsJSMsg.AckAsync(cancellationToken: cancellationToken);
}
}

0 comments on commit 780997b

Please sign in to comment.