Skip to content

Commit

Permalink
Pause and Resume consumer (#874)
Browse files Browse the repository at this point in the history
  • Loading branch information
scottf authored Mar 7, 2024
1 parent c4dd08b commit a7c301f
Show file tree
Hide file tree
Showing 11 changed files with 268 additions and 1 deletion.
5 changes: 5 additions & 0 deletions src/NATS.Client/Internals/JetStreamConstants.cs
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,11 @@ public static class JetStreamConstants
/// </summary>
public const string JsapiConsumerDelete = "CONSUMER.DELETE.{0}.{1}";

/// <summary>
/// JSAPI_CONSUMER_PAUSE is used to delete consumers.
/// </summary>
public const string JsapiConsumerPause = "CONSUMER.PAUSE.{0}.{1}";

/// <summary>
/// JSAPI_CONSUMER_NAMES is used to return a list of consumer names
/// </summary>
Expand Down
22 changes: 22 additions & 0 deletions src/NATS.Client/Internals/JsonUtils.cs
Original file line number Diff line number Diff line change
Expand Up @@ -151,12 +151,34 @@ public static DateTime AsDate(JSONNode node)
return DateTime.MinValue;
}
}

public static DateTime? AsOptionalDate(JSONNode node)
{
try
{
if (node.IsNull)
{
return null;
}
return DateTime.Parse(node.Value).ToUniversalTime();
}
catch (Exception)
{
return null;
}
}

public static string ToString(DateTime dt)
{
// Assume MinValue is Unset
return dt.Equals(DateTime.MinValue) ? null : UnsafeToString(dt);
}

public static string ToString(DateTime? dt)
{
// Assume MinValue is Unset
return !dt.HasValue || dt.Equals(DateTime.MinValue) ? null : UnsafeToString(dt.Value);
}

public static string UnsafeToString(DateTime dt)
{
Expand Down
3 changes: 3 additions & 0 deletions src/NATS.Client/JetStream/ApiConstants.cs
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,9 @@ public static class ApiConstants
public const string Options = "options";
public const string OptStartSeq = "opt_start_seq";
public const string OptStartTime = "opt_start_time";
public const string Paused = "paused";
public const string PauseRemaining = "pause_remaining";
public const string PauseUntil = "pause_until";
public const string Placement = "placement";
public const string ProcessingTime = "processing_time";
public const string Republish = "republish";
Expand Down
23 changes: 23 additions & 0 deletions src/NATS.Client/JetStream/ConsumerConfiguration.cs
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ public sealed class ConsumerConfiguration : JsonSerializable
public string DeliverGroup { get; }
public string SampleFrequency { get; }
public DateTime StartTime { get; }
public DateTime? PauseUntil { get; }
public Duration AckWait { get; }
public Duration IdleHeartbeat { get; }
public Duration MaxExpires { get; }
Expand Down Expand Up @@ -113,6 +114,7 @@ internal ConsumerConfiguration(JSONNode ccNode)
SampleFrequency = ccNode[ApiConstants.SampleFreq].Value;

StartTime = AsDate(ccNode[ApiConstants.OptStartTime]);
PauseUntil = AsOptionalDate(ccNode[ApiConstants.PauseUntil]);
AckWait = AsDuration(ccNode, ApiConstants.AckWait, null);
IdleHeartbeat = AsDuration(ccNode, ApiConstants.IdleHeartbeat, null);
MaxExpires = AsDuration(ccNode, ApiConstants.MaxExpires, null);
Expand Down Expand Up @@ -158,6 +160,7 @@ private ConsumerConfiguration(ConsumerConfigurationBuilder builder)
SampleFrequency = builder._sampleFrequency;

StartTime = builder._startTime;
PauseUntil = builder._pauseUntil;
AckWait = builder._ackWait;
IdleHeartbeat = builder._idleHeartbeat;
MaxExpires = builder._maxExpires;
Expand Down Expand Up @@ -192,6 +195,7 @@ public override JSONNode ToJsonNode()
AddField(o, ApiConstants.DeliverGroup, DeliverGroup);
AddField(o, ApiConstants.OptStartSeq, StartSeq);
AddField(o, ApiConstants.OptStartTime, JsonUtils.ToString(StartTime));
AddField(o, ApiConstants.PauseUntil, JsonUtils.ToString(PauseUntil));
AddField(o, ApiConstants.AckPolicy, AckPolicy.GetString());
AddField(o, ApiConstants.AckWait, AckWait);
AddField(o, ApiConstants.MaxDeliver, MaxDeliver);
Expand Down Expand Up @@ -252,6 +256,7 @@ internal IList<string> GetChanges(ConsumerConfiguration server)
if (InactiveThreshold != null && !InactiveThreshold.Equals(server.InactiveThreshold)) { changes.Add("InactiveThreshold"); }

RecordWouldBeChange(StartTime, server.StartTime, "StartTime", changes);
RecordWouldBeChange(PauseUntil, server.PauseUntil, "PauseUntil", changes);

RecordWouldBeChange(Description, server.Description, "Description", changes);
RecordWouldBeChange(SampleFrequency, server.SampleFrequency, "SampleFrequency", changes);
Expand All @@ -275,6 +280,11 @@ private void RecordWouldBeChange(DateTime request, DateTime server, string field
{
if (request != DateTime.MinValue && !request.Equals(server)) { changes.Add(field); }
}

private void RecordWouldBeChange(DateTime? request, DateTime? server, string field, IList<string> changes)
{
RecordWouldBeChange(request.GetValueOrDefault(DateTime.MinValue), server.GetValueOrDefault(DateTime.MinValue), field, changes);
}

internal static int GetOrUnset(int? val)
{
Expand Down Expand Up @@ -341,6 +351,7 @@ public sealed class ConsumerConfigurationBuilder
internal string _sampleFrequency;

internal DateTime _startTime;
internal DateTime? _pauseUntil;
internal Duration _ackWait;
internal Duration _idleHeartbeat;
internal Duration _maxExpires;
Expand Down Expand Up @@ -379,6 +390,7 @@ public ConsumerConfigurationBuilder(ConsumerConfiguration cc)
_sampleFrequency = cc.SampleFrequency;

_startTime = cc.StartTime;
_pauseUntil = cc.PauseUntil;
_ackWait = cc.AckWait;
_idleHeartbeat = cc.IdleHeartbeat;
_maxExpires = cc.MaxExpires;
Expand Down Expand Up @@ -506,6 +518,17 @@ public ConsumerConfigurationBuilder WithStartTime(DateTime startTime)
return this;
}

/// <summary>
/// Sets the time to pause the consumer until
/// </summary>
/// <param name="pauseUntil">the time to pause the consumer until</param>
/// <returns>The ConsumerConfigurationBuilder</returns>
public ConsumerConfigurationBuilder WithPauseUntil(DateTime? pauseUntil)
{
_pauseUntil = pauseUntil;
return this;
}

/// <summary>
/// Sets the acknowledgement policy of the ConsumerConfiguration.
/// </summary>
Expand Down
9 changes: 8 additions & 1 deletion src/NATS.Client/JetStream/ConsumerInfo.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
// limitations under the License.

using System;
using NATS.Client.Internals;
using NATS.Client.Internals.SimpleJSON;
using static NATS.Client.Internals.JsonUtils;

Expand All @@ -33,7 +34,9 @@ public sealed class ConsumerInfo : ApiResponse
public bool PushBound { get; private set; }
public ulong CalculatedPending => NumPending + Delivered.ConsumerSeq;
public DateTime Timestamp { get; private set; }

public bool Paused { get; private set; }
public Duration PauseRemaining { get; private set; }

internal ConsumerInfo(Msg msg, bool throwOnError) : base(msg, throwOnError)
{
Init(JsonNode);
Expand Down Expand Up @@ -64,6 +67,8 @@ private void Init(JSONNode ciNode)
ClusterInfo = ClusterInfo.OptionalInstance(ciNode[ApiConstants.Cluster]);
PushBound = ciNode[ApiConstants.PushBound].AsBool;
Timestamp = AsDate(ciNode[ApiConstants.Timestamp]);
Paused = ciNode[ApiConstants.Paused].AsBool;
PauseRemaining = Paused ? JsonUtils.AsDuration(ciNode, ApiConstants.PauseRemaining, Duration.Zero) : null;
}

public override string ToString()
Expand All @@ -80,6 +85,8 @@ public override string ToString()
", Timestamp=" + Timestamp +
", Delivered=" + Delivered +
", AckFloor=" + AckFloor +
", Paused=" + Paused +
", PauseRemaining=" + PauseRemaining +
", " + ObjectString("ClusterInfo", ClusterInfo) +
", " + "ConsumerConfiguration" + ConsumerConfiguration.ToJsonString() +
'}';
Expand Down
36 changes: 36 additions & 0 deletions src/NATS.Client/JetStream/ConsumerPauseRequest.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
// Copyright 2024 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at:
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

using System;
using NATS.Client.Internals;
using NATS.Client.Internals.SimpleJSON;

namespace NATS.Client.JetStream
{
public sealed class ConsumerPauseRequest : JsonSerializable
{
public DateTime PauseUntil { get; }

internal ConsumerPauseRequest(DateTime pauseUntil)
{
PauseUntil = pauseUntil;
}

public override JSONNode ToJsonNode()
{
JSONObject jso = new JSONObject();
JsonUtils.AddField(jso, ApiConstants.PauseUntil, PauseUntil);
return jso;
}
}
}
51 changes: 51 additions & 0 deletions src/NATS.Client/JetStream/ConsumerPauseResponse.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
// Copyright 2024 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at:
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

using System;
using NATS.Client.Internals;
using static NATS.Client.Internals.JsonUtils;

namespace NATS.Client.JetStream
{
public sealed class ConsumerPauseResponse : ApiResponse
{
public bool Paused { get; private set; }
public DateTime? PauseUntil { get; private set; }
public Duration PauseRemaining { get; private set; }

internal ConsumerPauseResponse(Msg msg, bool throwOnError) : base(msg, throwOnError)
{
Init();
}

internal ConsumerPauseResponse(string json, bool throwOnError) : base(json, throwOnError)
{
Init();
}

private void Init()
{
Paused = JsonNode[ApiConstants.Paused].AsBool;
if (Paused)
{
PauseUntil = AsDate(JsonNode[ApiConstants.PauseUntil]);
PauseRemaining = JsonUtils.AsDuration(JsonNode, ApiConstants.PauseRemaining, Duration.Zero);
}
else
{
PauseUntil = null;
PauseRemaining = null;
}
}
}
}
18 changes: 18 additions & 0 deletions src/NATS.Client/JetStream/IJetStreamManagement.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

using System;
using System.Collections.Generic;

namespace NATS.Client.JetStream
Expand Down Expand Up @@ -97,6 +98,23 @@ public interface IJetStreamManagement
/// <returns>True if the consumer was deleted.</returns>
bool DeleteConsumer(string streamName, string consumer);

/// <summary>
/// Pauses a consumer.
/// </summary>
/// <param name="streamName">The name of the stream the consumer is attached to.</param>
/// <param name="consumer">The name of the consumer.</param>
/// <param name="pauseUntil">Consumer is paused until this time.</param>
/// <returns>ConsumerPauseResponse.</returns>
ConsumerPauseResponse PauseConsumer(string streamName, string consumer, DateTime pauseUntil);

/// <summary>
/// Resumes a consumer.
/// </summary>
/// <param name="streamName">The name of the stream the consumer is attached to.</param>
/// <param name="consumer">The name of the consumer.</param>
/// <returns>True if the resume succeeded.</returns>
bool ResumeConsumer(string streamName, string consumer);

/// <summary>
/// Gets information for an existing consumer.
/// </summary>
Expand Down
20 changes: 20 additions & 0 deletions src/NATS.Client/JetStream/JetStreamManagement.cs
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,26 @@ public bool DeleteConsumer(string streamName, string consumer)
return new SuccessApiResponse(m, true).Success;
}

public ConsumerPauseResponse PauseConsumer(string streamName, string consumer, DateTime pauseUntil)
{
Validator.ValidateStreamName(streamName, true);
Validator.ValidateNotNull(consumer, nameof(consumer));
string subj = string.Format(JetStreamConstants.JsapiConsumerPause, streamName, consumer);
ConsumerPauseRequest cprq = new ConsumerPauseRequest(pauseUntil);
Msg m = RequestResponseRequired(subj, cprq.Serialize(), Timeout);
return new ConsumerPauseResponse(m, true);
}

public bool ResumeConsumer(string streamName, string consumer)
{
Validator.ValidateStreamName(streamName, true);
Validator.ValidateNotNull(consumer, nameof(consumer));
string subj = string.Format(JetStreamConstants.JsapiConsumerPause, streamName, consumer);
Msg m = RequestResponseRequired(subj, null, Timeout);
ConsumerPauseResponse cpre = new ConsumerPauseResponse(m, true);
return !cpre.Paused;
}

public ConsumerInfo GetConsumerInfo(string streamName, string consumer)
{
Validator.ValidateStreamName(streamName, true);
Expand Down
Loading

0 comments on commit a7c301f

Please sign in to comment.