Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add flags to NatsMsg and JetStream Request #652

Merged
merged 16 commits into from
Nov 22, 2024
152 changes: 138 additions & 14 deletions src/NATS.Client.Core/NatsMsg.cs
Original file line number Diff line number Diff line change
@@ -1,9 +1,18 @@
using System.Buffers;
using System.Diagnostics.CodeAnalysis;
using System.Xml.Linq;
using NATS.Client.Core.Internal;

namespace NATS.Client.Core;

[Flags]
public enum NatsMsgFlags : byte
{
None = 0,
Empty = 1,
NoResponders = 2,
}

/// <summary>
/// This interface provides an optional contract when passing
/// messages to processing methods which is usually helpful in
Expand Down Expand Up @@ -103,12 +112,6 @@ public interface INatsMsg<T>
/// <summary>
/// NATS message structure as defined by the protocol.
/// </summary>
/// <param name="Subject">The destination subject to publish to.</param>
/// <param name="ReplyTo">The reply subject that subscribers can use to send a response back to the publisher/requester.</param>
/// <param name="Size">Message size in bytes.</param>
/// <param name="Headers">Pass additional information using name-value pairs.</param>
/// <param name="Data">Serializable data object.</param>
/// <param name="Connection">NATS connection this message is associated to.</param>
/// <typeparam name="T">Specifies the type of data that may be sent to the NATS Server.</typeparam>
/// <remarks>
/// <para>Connection property is used to provide reply functionality.</para>
Expand All @@ -119,20 +122,120 @@ public interface INatsMsg<T>
/// </code>
/// </para>
/// </remarks>
public readonly record struct NatsMsg<T>(
string Subject,
string? ReplyTo,
int Size,
NatsHeaders? Headers,
T? Data,
INatsConnection? Connection) : INatsMsg<T>
public readonly record struct NatsMsg<T> : INatsMsg<T>
{
/*
2 30
+--+------------------------------+
|EN| Message Size |
+--+------------------------------+
E: Empty flag
N: No responders flag

# Size is 30 bits:
Max Size: 1,073,741,823 (0x3FFFFFFF / 00111111111111111111111111111111)
Uint.Max: 4,294,967,295
Int.Max: 2,147,483,647
8mb: 8,388,608
*/
private readonly uint _flagsAndSize;

/// <summary>
/// NATS message structure as defined by the protocol.
/// </summary>
/// <param name="subject">The destination subject to publish to.</param>
/// <param name="replyTo">The reply subject that subscribers can use to send a response back to the publisher/requester.</param>
/// <param name="size">Message size in bytes.</param>
/// <param name="headers">Pass additional information using name-value pairs.</param>
/// <param name="data">Serializable data object.</param>
/// <param name="connection">NATS connection this message is associated to.</param>
/// <param name="flags">Message flags to indicate no responders and empty payloads.</param>
/// <remarks>
/// <para>Connection property is used to provide reply functionality.</para>
/// <para>
/// Message size is calculated using the same method NATS server uses:
/// <code lang="C#">
/// int size = subject.Length + replyTo.Length + headers.Length + payload.Length;
/// </code>
/// </para>
/// </remarks>
public NatsMsg(
string subject,
string? replyTo,
int size,
NatsHeaders? headers,
T? data,
INatsConnection? connection,
NatsMsgFlags flags = default)
{
Subject = subject;
ReplyTo = replyTo;
_flagsAndSize = ((uint)flags << 30) | (uint)(size & 0x3FFFFFFF);
Headers = headers;
Data = data;
Connection = connection;
}

/// <inheritdoc />
public NatsException? Error => Headers?.Error;

/// <summary>The destination subject to publish to.</summary>
public string Subject { get; init; }

/// <summary>The reply subject that subscribers can use to send a response back to the publisher/requester.</summary>
public string? ReplyTo { get; init; }

/// <summary>Message size in bytes.</summary>
public int Size
{
// Extract the lower 30 bits
get => (int)(_flagsAndSize & 0x3FFFFFFF);

// Clear the lower 30 bits and set the new number
init
{
// Mask the input value to fit within 30 bits (clear upper bits)
var numberPart = (uint)(value & 0x3FFFFFFF);

// Clear the lower 30 bits and set the new number value
// Preserve the flags, update the number
_flagsAndSize = (_flagsAndSize & 0xC0000000) | numberPart;
}
}

public NatsMsgFlags Flags
{
// Extract the two leftmost bits (31st and 30th bit)
// Mask with 0b11 to get two bits
get => (NatsMsgFlags)((_flagsAndSize >> 30) & 0b11);

init
{
// Clear the current flag bits (set to 0) and then set the new flag value
var flagsPart = (uint)value << 30;
_flagsAndSize = (_flagsAndSize & 0x3FFFFFFF) | flagsPart;
}
}

/// <summary>Pass additional information using name-value pairs.</summary>
public NatsHeaders? Headers { get; init; }

/// <summary>Serializable data object.</summary>
public T? Data { get; init; }

/// <summary>NATS connection this message is associated to.</summary>
public INatsConnection? Connection { get; init; }

public bool IsEmpty => (_flagsAndSize & 0x40000000) != 0;

public bool HasNoResponders => (_flagsAndSize & 0x80000000) != 0;

/// <inheritdoc />
public void EnsureSuccess()
{
if (HasNoResponders)
throw new NatsNoRespondersException();

if (Error != null)
throw Error;
}
Expand Down Expand Up @@ -197,6 +300,17 @@ public ValueTask ReplyAsync<TReply>(NatsMsg<TReply> msg, INatsSerialize<TReply>?
return Connection.PublishAsync(msg with { Subject = ReplyTo }, serializer, opts, cancellationToken);
}

public void Deconstruct(out string subject, out string? replyTo, out int size, out NatsHeaders? headers, out T? data, out INatsConnection? connection, out NatsMsgFlags flags)
{
subject = Subject;
replyTo = ReplyTo;
size = Size;
headers = Headers;
data = Data;
connection = Connection;
flags = Flags;
}

internal static NatsMsg<T> Build(
string subject,
string? replyTo,
Expand All @@ -207,6 +321,16 @@ internal static NatsMsg<T> Build(
INatsDeserialize<T> serializer)
{
NatsHeaders? headers = null;
var flags = NatsMsgFlags.None;

if (payloadBuffer.Length == 0)
{
flags |= NatsMsgFlags.Empty;
if (NatsSubBase.IsHeader503(headersBuffer))
mtmk marked this conversation as resolved.
Show resolved Hide resolved
{
flags |= NatsMsgFlags.NoResponders;
}
}

if (headersBuffer != null)
{
Expand Down Expand Up @@ -277,7 +401,7 @@ internal static NatsMsg<T> Build(
}
}

return new NatsMsg<T>(subject, replyTo, (int)size, headers, data, connection);
return new NatsMsg<T>(subject, replyTo, (int)size, headers, data, connection, flags);
}

[MemberNotNull(nameof(Connection))]
Expand Down
38 changes: 38 additions & 0 deletions src/NATS.Client.Core/NatsResult.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
using System.Runtime.CompilerServices;

namespace NATS.Client.Core;

public readonly struct NatsResult<T>
{
private readonly T? _value;
private readonly Exception? _error;

public NatsResult(T value)
{
_value = value;
_error = null;
}

public NatsResult(Exception error)
{
_value = default;
_error = error;
}

public T Value => _value ?? ThrowValueIsNotSetException();

public Exception Error => _error ?? ThrowErrorIsNotSetException();

public bool Success => _error == null;

public static implicit operator NatsResult<T>(T value) => new(value);

public static implicit operator NatsResult<T>(Exception error) => new(error);

private static T ThrowValueIsNotSetException() => throw CreateInvalidOperationException("Result value is not set");

private static Exception ThrowErrorIsNotSetException() => throw CreateInvalidOperationException("Result error is not set");

[MethodImpl(MethodImplOptions.NoInlining)]
private static Exception CreateInvalidOperationException(string message) => new InvalidOperationException(message);
}
6 changes: 5 additions & 1 deletion src/NATS.Client.Core/NatsSubBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,7 @@ public virtual async ValueTask ReceiveAsync(string subject, string? replyTo, Rea
{
switch (Opts)
{
case { ThrowIfNoResponders: true } when headersBuffer is { Length: >= 12 } && headersBuffer.Value.Slice(8, 4).ToSpan().SequenceEqual(NoRespondersHeaderSequence):
case { ThrowIfNoResponders: true } when IsHeader503(headersBuffer):
SetException(new NatsNoRespondersException());
return;
case { StopOnEmptyMsg: true }:
Expand Down Expand Up @@ -311,6 +311,10 @@ public virtual async ValueTask ReceiveAsync(string subject, string? replyTo, Rea
}
}

internal static bool IsHeader503(ReadOnlySequence<byte>? headersBuffer) =>
headersBuffer is { Length: >= 12 }
&& headersBuffer.Value.Slice(8, 4).ToSpan().SequenceEqual(NoRespondersHeaderSequence);

internal void ClearException() => Interlocked.Exchange(ref _exception, null);

/// <summary>
Expand Down

This file was deleted.

12 changes: 12 additions & 0 deletions src/NATS.Client.JetStream/Internal/NatsJSJsonDocumentSerializer.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
using System.Buffers;
using System.Text.Json;
using NATS.Client.Core;

namespace NATS.Client.JetStream.Internal;

internal sealed class NatsJSJsonDocumentSerializer : INatsDeserialize<JsonDocument>
{
public static readonly NatsJSJsonDocumentSerializer Default = new();

public JsonDocument? Deserialize(in ReadOnlySequence<byte> buffer) => buffer.Length == 0 ? default : JsonDocument.Parse(buffer);
}
Loading