Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add flags to NatsMsg and JetStream Request #652

Merged
merged 16 commits into from
Nov 22, 2024
152 changes: 138 additions & 14 deletions src/NATS.Client.Core/NatsMsg.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,14 @@

namespace NATS.Client.Core;

[Flags]
public enum NatsMsgFlags : byte
{
None = 0,
Empty = 1,
NoResponders = 2,
}

/// <summary>
/// This interface provides an optional contract when passing
/// messages to processing methods which is usually helpful in
Expand Down Expand Up @@ -103,12 +111,6 @@ public interface INatsMsg<T>
/// <summary>
/// NATS message structure as defined by the protocol.
/// </summary>
/// <param name="Subject">The destination subject to publish to.</param>
/// <param name="ReplyTo">The reply subject that subscribers can use to send a response back to the publisher/requester.</param>
/// <param name="Size">Message size in bytes.</param>
/// <param name="Headers">Pass additional information using name-value pairs.</param>
/// <param name="Data">Serializable data object.</param>
/// <param name="Connection">NATS connection this message is associated to.</param>
/// <typeparam name="T">Specifies the type of data that may be sent to the NATS Server.</typeparam>
/// <remarks>
/// <para>Connection property is used to provide reply functionality.</para>
Expand All @@ -119,20 +121,121 @@ public interface INatsMsg<T>
/// </code>
/// </para>
/// </remarks>
public readonly record struct NatsMsg<T>(
string Subject,
string? ReplyTo,
int Size,
NatsHeaders? Headers,
T? Data,
INatsConnection? Connection) : INatsMsg<T>
public readonly record struct NatsMsg<T> : INatsMsg<T>
{
/*
2 30
+--+------------------------------+
|EN| Message Size |
+--+------------------------------+
E: Empty flag
N: No responders flag

# Size is 30 bits:
Max Size: 1,073,741,823 (0x3FFFFFFF / 00111111111111111111111111111111)
Uint.Max: 4,294,967,295
Int.Max: 2,147,483,647
8mb: 8,388,608
*/
private readonly uint _flagsAndSize;

/// <summary>
/// NATS message structure as defined by the protocol.
/// </summary>
/// <param name="subject">The destination subject to publish to.</param>
/// <param name="replyTo">The reply subject that subscribers can use to send a response back to the publisher/requester.</param>
/// <param name="size">Message size in bytes.</param>
/// <param name="headers">Pass additional information using name-value pairs.</param>
/// <param name="data">Serializable data object.</param>
/// <param name="connection">NATS connection this message is associated to.</param>
/// <param name="flags">Message flags to indicate no responders and empty payloads.</param>
/// <remarks>
/// <para>Connection property is used to provide reply functionality.</para>
/// <para>
/// Message size is calculated using the same method NATS server uses:
/// <code lang="C#">
/// int size = subject.Length + replyTo.Length + headers.Length + payload.Length;
/// </code>
/// </para>
/// </remarks>
public NatsMsg(
string subject,
string? replyTo,
int size,
NatsHeaders? headers,
T? data,
INatsConnection? connection,
NatsMsgFlags flags = default)
{
Subject = subject;
ReplyTo = replyTo;
Size = size;
mtmk marked this conversation as resolved.
Show resolved Hide resolved
Flags = flags;
Headers = headers;
Data = data;
Connection = connection;
}

/// <inheritdoc />
public NatsException? Error => Headers?.Error;

/// <summary>The destination subject to publish to.</summary>
public string Subject { get; init; }

/// <summary>The reply subject that subscribers can use to send a response back to the publisher/requester.</summary>
public string? ReplyTo { get; init; }

/// <summary>Message size in bytes.</summary>
public int Size
{
// Extract the lower 30 bits
get => (int)(_flagsAndSize & 0x3FFFFFFF);

// Clear the lower 30 bits and set the new number
init
{
// Mask the input value to fit within 30 bits (clear upper bits)
var numberPart = (uint)(value & 0x3FFFFFFF);

// Clear the lower 30 bits and set the new number value
// Preserve the flags, update the number
_flagsAndSize = (_flagsAndSize & 0xC0000000) | numberPart;
}
}

public NatsMsgFlags Flags
{
// Extract the two leftmost bits (31st and 30th bit)
// Mask with 0b11 to get two bits
get => (NatsMsgFlags)((_flagsAndSize >> 30) & 0b11);

init
{
// Clear the current flag bits (set to 0) and then set the new flag value
var flagsPart = (uint)value << 30;
_flagsAndSize = (_flagsAndSize & 0x3FFFFFFF) | flagsPart;
}
}

/// <summary>Pass additional information using name-value pairs.</summary>
public NatsHeaders? Headers { get; init; }

/// <summary>Serializable data object.</summary>
public T? Data { get; init; }

/// <summary>NATS connection this message is associated to.</summary>
public INatsConnection? Connection { get; init; }

public bool IsEmpty => (Flags & NatsMsgFlags.Empty) == NatsMsgFlags.Empty;

public bool HasNoResponders => (Flags & NatsMsgFlags.NoResponders) == NatsMsgFlags.NoResponders;
mtmk marked this conversation as resolved.
Show resolved Hide resolved

/// <inheritdoc />
public void EnsureSuccess()
{
if (HasNoResponders)
throw new NatsNoRespondersException();

if (Error != null)
throw Error;
}
Expand Down Expand Up @@ -197,6 +300,17 @@ public ValueTask ReplyAsync<TReply>(NatsMsg<TReply> msg, INatsSerialize<TReply>?
return Connection.PublishAsync(msg with { Subject = ReplyTo }, serializer, opts, cancellationToken);
}

public void Deconstruct(out string subject, out string? replyTo, out int size, out NatsHeaders? headers, out T? data, out INatsConnection? connection, out NatsMsgFlags flags)
{
subject = Subject;
replyTo = ReplyTo;
size = Size;
headers = Headers;
data = Data;
connection = Connection;
flags = Flags;
}

internal static NatsMsg<T> Build(
string subject,
string? replyTo,
Expand All @@ -207,6 +321,16 @@ internal static NatsMsg<T> Build(
INatsDeserialize<T> serializer)
{
NatsHeaders? headers = null;
var flags = NatsMsgFlags.None;

if (payloadBuffer.Length == 0)
{
flags |= NatsMsgFlags.Empty;
if (NatsSubBase.IsHeader503(headersBuffer))
mtmk marked this conversation as resolved.
Show resolved Hide resolved
{
flags |= NatsMsgFlags.NoResponders;
}
}

if (headersBuffer != null)
{
Expand Down Expand Up @@ -277,7 +401,7 @@ internal static NatsMsg<T> Build(
}
}

return new NatsMsg<T>(subject, replyTo, (int)size, headers, data, connection);
return new NatsMsg<T>(subject, replyTo, (int)size, headers, data, connection, flags);
}

[MemberNotNull(nameof(Connection))]
Expand Down
6 changes: 5 additions & 1 deletion src/NATS.Client.Core/NatsSubBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,7 @@ public virtual async ValueTask ReceiveAsync(string subject, string? replyTo, Rea
{
switch (Opts)
{
case { ThrowIfNoResponders: true } when headersBuffer is { Length: >= 12 } && headersBuffer.Value.Slice(8, 4).ToSpan().SequenceEqual(NoRespondersHeaderSequence):
case { ThrowIfNoResponders: true } when IsHeader503(headersBuffer):
SetException(new NatsNoRespondersException());
return;
case { StopOnEmptyMsg: true }:
Expand Down Expand Up @@ -311,6 +311,10 @@ public virtual async ValueTask ReceiveAsync(string subject, string? replyTo, Rea
}
}

internal static bool IsHeader503(ReadOnlySequence<byte>? headersBuffer) =>
headersBuffer is { Length: >= 12 }
&& headersBuffer.Value.Slice(8, 4).ToSpan().SequenceEqual(NoRespondersHeaderSequence);

internal void ClearException() => Interlocked.Exchange(ref _exception, null);

/// <summary>
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
using System.Buffers;
using System.Text.Json;
using NATS.Client.Core;

namespace NATS.Client.JetStream.Internal;

internal sealed class NatsJSForcedJsonDocumentSerializer<T> : INatsDeserialize<T>
mtmk marked this conversation as resolved.
Show resolved Hide resolved
{
public static readonly NatsJSForcedJsonDocumentSerializer<T> Default = new();

public T? Deserialize(in ReadOnlySequence<byte> buffer)
{
if (buffer.Length == 0)
{
return default;
}

// Force return JsonDocument instead of T
return (T)(object)JsonDocument.Parse(buffer);
}
}
66 changes: 53 additions & 13 deletions src/NATS.Client.JetStream/NatsJSContext.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
using System.Diagnostics.CodeAnalysis;
using System.Runtime.CompilerServices;
using System.Text.Json;
using Microsoft.Extensions.Logging;
using NATS.Client.Core;
using NATS.Client.JetStream.Internal;
Expand Down Expand Up @@ -285,49 +286,88 @@ internal async ValueTask<NatsJSResponse<TResponse>> JSRequestAsync<TRequest, TRe
CancellationToken cancellationToken = default)
where TRequest : class
where TResponse : class
{
var (response, exception) = await TryJSRequestAsync<TRequest, TResponse>(subject, request, cancellationToken).ConfigureAwait(false);
if (exception != null)
{
throw exception;
}

if (response != null)
return response.Value;

throw new Exception("State error: No response received");
}

internal async ValueTask<(NatsJSResponse<TResponse>?, Exception?)> TryJSRequestAsync<TRequest, TResponse>(
string subject,
TRequest? request,
CancellationToken cancellationToken = default)
where TRequest : class
where TResponse : class
{
if (request != null)
{
// TODO: Can't validate using JSON serializer context at the moment.
// Validator.ValidateObject(request, new ValidationContext(request));
}

await using var sub = await Connection.CreateRequestSubAsync<TRequest, TResponse>(
await using var sub = await Connection.CreateRequestSubAsync<TRequest, JsonDocument>(
subject: subject,
data: request,
headers: default,
replyOpts: new NatsSubOpts { Timeout = Connection.Opts.RequestTimeout },
requestSerializer: NatsJSJsonSerializer<TRequest>.Default,
replySerializer: NatsJSErrorAwareJsonSerializer<TResponse>.Default,
replySerializer: NatsJSForcedJsonDocumentSerializer<JsonDocument>.Default,
cancellationToken: cancellationToken)
.ConfigureAwait(false);

await foreach (var msg in sub.Msgs.ReadAllAsync(cancellationToken).ConfigureAwait(false))
{
if (msg.Error is { } error)
// We need to determine what type we're deserializing into
// .NET 6 new APIs to the rescue: we can read the buffer once
// by deserializing into a document, inspect and using the new
// API deserialize to the final type from the document.
if (msg.Data == null)
{
if (error.InnerException is NatsJSApiErrorException jsError)
{
return new NatsJSResponse<TResponse>(default, jsError.Error);
}
return (default, new NatsJSException("No response data received"));
}

var jsonDocument = msg.Data;
mtmk marked this conversation as resolved.
Show resolved Hide resolved

throw error;
if (jsonDocument.RootElement.TryGetProperty("error", out var errorElement))
{
var error = errorElement.Deserialize(JetStream.NatsJSJsonSerializerContext.Default.ApiError) ?? throw new NatsJSException("Can't parse JetStream error JSON payload");
return (new NatsJSResponse<TResponse>(default, error), default);
}

if (msg.Data == null)
var jsonTypeInfo = NatsJSJsonSerializerContext.DefaultContext.GetTypeInfo(typeof(TResponse));
if (jsonTypeInfo == null)
{
throw new NatsJSException("No response data received");
return (default, new NatsJSException($"Unknown response type {typeof(TResponse)}"));
}

return new NatsJSResponse<TResponse>(msg.Data, default);
var response = (TResponse?)jsonDocument.RootElement.Deserialize(jsonTypeInfo);

if (msg.Error is { } messageError)
{
return (default, messageError);
}

return (new NatsJSResponse<TResponse>(response, default), default);
}

if (sub is NatsSubBase { EndReason: NatsSubEndReason.Exception, Exception: not null } sb)
{
throw sb.Exception;
return (default, sb.Exception);
}

if (sub.EndReason != NatsSubEndReason.None)
{
return (default, new NatsJSApiNoResponseException(sub.EndReason));
}

throw new NatsJSApiNoResponseException();
return (default, new NatsJSApiNoResponseException());
}

private static void ConvertDomain(StreamSource streamSource)
Expand Down
Loading