Skip to content

Commit

Permalink
#636 - JetStream Batch Get Client support
Browse files Browse the repository at this point in the history
* Added batch get contract
  • Loading branch information
Ivandemidov00 committed Nov 19, 2024
1 parent a86b4a8 commit ae90e37
Show file tree
Hide file tree
Showing 2 changed files with 90 additions and 0 deletions.
10 changes: 10 additions & 0 deletions src/NATS.Client.JetStream/INatsJSStream.cs
Original file line number Diff line number Diff line change
Expand Up @@ -112,5 +112,15 @@ ValueTask UpdateAsync(

ValueTask<NatsMsg<T>> GetDirectAsync<T>(StreamMsgGetRequest request, INatsDeserialize<T>? serializer = default, CancellationToken cancellationToken = default);

/// <summary>
/// Request a direct batch message
/// </summary>
/// <param name="request">Batch message request.</param>
/// <param name="serializer">Serializer to use for the message type.</param>
/// <param name="cancellationToken">A <see cref="CancellationToken"/> used to cancel the API call.</param>
/// <typeparam name="T"></typeparam>
/// <returns></returns>
IAsyncEnumerable<NatsMsg<T>> GetBatchDirectAsync<T>(StreamMsgBatchGetRequest request, INatsDeserialize<T>? serializer = default, CancellationToken cancellationToken = default);

ValueTask<StreamMsgGetResponse> GetAsync(StreamMsgGetRequest request, CancellationToken cancellationToken = default);
}
80 changes: 80 additions & 0 deletions src/NATS.Client.JetStream/Models/StreamMsgBatchGetRequest.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
using System.ComponentModel.DataAnnotations;
using System.Text.Json.Serialization;

namespace NATS.Client.JetStream.Models;

/// <summary>
/// A request to the JetStream $JS.API.STREAM.MSG.GET API
/// </summary>
public record StreamMsgBatchGetRequest
{
/// <summary>
/// The maximum amount of messages to be returned for this request
/// </summary>
[JsonPropertyName("batch")]
[JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingDefault)]
[Range(-1, int.MaxValue)]
public int Batch { get; set; }

/// <summary>
/// The maximum amount of returned bytes for this request.
/// </summary>
[JsonPropertyName("max_bytes")]
[JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingDefault)]
[Range(-1, int.MaxValue)]
public int MaxBytes { get; set; }

/// <summary>
/// The minimum sequence for returned message
/// </summary>
[JsonPropertyName("seq")]
[JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingDefault)]
[Range(ulong.MinValue, ulong.MaxValue)]
public ulong MinSequence { get; set; }

/// <summary>
/// The minimum start time for returned message
/// </summary>
[JsonPropertyName("start_time")]
[JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingDefault)]
[Range(ulong.MinValue, ulong.MaxValue)]
public DateTime StartTime { get; set; }

/// <summary>
/// The subject used filter messages that should be returned
/// </summary>
[JsonPropertyName("next_by_subj")]
[JsonIgnore(Condition = JsonIgnoreCondition.Never)]
[Required]
#if NET6_0
public string Subject { get; set; } = default!;
#else
#pragma warning disable SA1206
public required string Subject { get; set; }

#pragma warning restore SA1206
#endif

/// <summary>
/// Return last messages mathing the subjects
/// </summary>
[JsonPropertyName("multi_last")]
[JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingDefault)]
[Range(ulong.MinValue, ulong.MaxValue)]
public string[] LastBySubjects { get; set; } = [];

/// <summary>
/// Return message after sequence
/// </summary>
[JsonPropertyName("up_to_seq")]
[JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingDefault)]
[Range(ulong.MinValue, ulong.MaxValue)]
public ulong UpToSequence { get; set; }

/// <summary>
/// Return message after time
/// </summary>
[JsonPropertyName("up_to_time")]
[JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingDefault)]
public DateTimeOffset UpToTime { get; set; }
}

0 comments on commit ae90e37

Please sign in to comment.