Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pause and Resume consumer #874

Merged
merged 4 commits into from
Mar 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions src/NATS.Client/Internals/JetStreamConstants.cs
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,11 @@ public static class JetStreamConstants
/// </summary>
public const string JsapiConsumerDelete = "CONSUMER.DELETE.{0}.{1}";

/// <summary>
/// JSAPI_CONSUMER_PAUSE is used to delete consumers.
/// </summary>
public const string JsapiConsumerPause = "CONSUMER.PAUSE.{0}.{1}";

/// <summary>
/// JSAPI_CONSUMER_NAMES is used to return a list of consumer names
/// </summary>
Expand Down
22 changes: 22 additions & 0 deletions src/NATS.Client/Internals/JsonUtils.cs
Original file line number Diff line number Diff line change
Expand Up @@ -151,12 +151,34 @@ public static DateTime AsDate(JSONNode node)
return DateTime.MinValue;
}
}

public static DateTime? AsOptionalDate(JSONNode node)
{
try
{
if (node.IsNull)
{
return null;
}
return DateTime.Parse(node.Value).ToUniversalTime();
}
catch (Exception)
{
return null;
}
}

public static string ToString(DateTime dt)
{
// Assume MinValue is Unset
return dt.Equals(DateTime.MinValue) ? null : UnsafeToString(dt);
}

public static string ToString(DateTime? dt)
{
// Assume MinValue is Unset
return !dt.HasValue || dt.Equals(DateTime.MinValue) ? null : UnsafeToString(dt.Value);
}

public static string UnsafeToString(DateTime dt)
{
Expand Down
3 changes: 3 additions & 0 deletions src/NATS.Client/JetStream/ApiConstants.cs
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,9 @@ public static class ApiConstants
public const string Options = "options";
public const string OptStartSeq = "opt_start_seq";
public const string OptStartTime = "opt_start_time";
public const string Paused = "paused";
public const string PauseRemaining = "pause_remaining";
public const string PauseUntil = "pause_until";
public const string Placement = "placement";
public const string ProcessingTime = "processing_time";
public const string Republish = "republish";
Expand Down
23 changes: 23 additions & 0 deletions src/NATS.Client/JetStream/ConsumerConfiguration.cs
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ public sealed class ConsumerConfiguration : JsonSerializable
public string DeliverGroup { get; }
public string SampleFrequency { get; }
public DateTime StartTime { get; }
public DateTime? PauseUntil { get; }
public Duration AckWait { get; }
public Duration IdleHeartbeat { get; }
public Duration MaxExpires { get; }
Expand Down Expand Up @@ -113,6 +114,7 @@ internal ConsumerConfiguration(JSONNode ccNode)
SampleFrequency = ccNode[ApiConstants.SampleFreq].Value;

StartTime = AsDate(ccNode[ApiConstants.OptStartTime]);
PauseUntil = AsOptionalDate(ccNode[ApiConstants.PauseUntil]);
AckWait = AsDuration(ccNode, ApiConstants.AckWait, null);
IdleHeartbeat = AsDuration(ccNode, ApiConstants.IdleHeartbeat, null);
MaxExpires = AsDuration(ccNode, ApiConstants.MaxExpires, null);
Expand Down Expand Up @@ -158,6 +160,7 @@ private ConsumerConfiguration(ConsumerConfigurationBuilder builder)
SampleFrequency = builder._sampleFrequency;

StartTime = builder._startTime;
PauseUntil = builder._pauseUntil;
AckWait = builder._ackWait;
IdleHeartbeat = builder._idleHeartbeat;
MaxExpires = builder._maxExpires;
Expand Down Expand Up @@ -192,6 +195,7 @@ public override JSONNode ToJsonNode()
AddField(o, ApiConstants.DeliverGroup, DeliverGroup);
AddField(o, ApiConstants.OptStartSeq, StartSeq);
AddField(o, ApiConstants.OptStartTime, JsonUtils.ToString(StartTime));
AddField(o, ApiConstants.PauseUntil, JsonUtils.ToString(PauseUntil));
AddField(o, ApiConstants.AckPolicy, AckPolicy.GetString());
AddField(o, ApiConstants.AckWait, AckWait);
AddField(o, ApiConstants.MaxDeliver, MaxDeliver);
Expand Down Expand Up @@ -252,6 +256,7 @@ internal IList<string> GetChanges(ConsumerConfiguration server)
if (InactiveThreshold != null && !InactiveThreshold.Equals(server.InactiveThreshold)) { changes.Add("InactiveThreshold"); }

RecordWouldBeChange(StartTime, server.StartTime, "StartTime", changes);
RecordWouldBeChange(PauseUntil, server.PauseUntil, "PauseUntil", changes);

RecordWouldBeChange(Description, server.Description, "Description", changes);
RecordWouldBeChange(SampleFrequency, server.SampleFrequency, "SampleFrequency", changes);
Expand All @@ -275,6 +280,11 @@ private void RecordWouldBeChange(DateTime request, DateTime server, string field
{
if (request != DateTime.MinValue && !request.Equals(server)) { changes.Add(field); }
}

private void RecordWouldBeChange(DateTime? request, DateTime? server, string field, IList<string> changes)
{
RecordWouldBeChange(request.GetValueOrDefault(DateTime.MinValue), server.GetValueOrDefault(DateTime.MinValue), field, changes);
}

internal static int GetOrUnset(int? val)
{
Expand Down Expand Up @@ -341,6 +351,7 @@ public sealed class ConsumerConfigurationBuilder
internal string _sampleFrequency;

internal DateTime _startTime;
internal DateTime? _pauseUntil;
internal Duration _ackWait;
internal Duration _idleHeartbeat;
internal Duration _maxExpires;
Expand Down Expand Up @@ -379,6 +390,7 @@ public ConsumerConfigurationBuilder(ConsumerConfiguration cc)
_sampleFrequency = cc.SampleFrequency;

_startTime = cc.StartTime;
_pauseUntil = cc.PauseUntil;
_ackWait = cc.AckWait;
_idleHeartbeat = cc.IdleHeartbeat;
_maxExpires = cc.MaxExpires;
Expand Down Expand Up @@ -506,6 +518,17 @@ public ConsumerConfigurationBuilder WithStartTime(DateTime startTime)
return this;
}

/// <summary>
/// Sets the time to pause the consumer until
/// </summary>
/// <param name="pauseUntil">the time to pause the consumer until</param>
/// <returns>The ConsumerConfigurationBuilder</returns>
public ConsumerConfigurationBuilder WithPauseUntil(DateTime? pauseUntil)
{
_pauseUntil = pauseUntil;
return this;
}

/// <summary>
/// Sets the acknowledgement policy of the ConsumerConfiguration.
/// </summary>
Expand Down
9 changes: 8 additions & 1 deletion src/NATS.Client/JetStream/ConsumerInfo.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
// limitations under the License.

using System;
using NATS.Client.Internals;
using NATS.Client.Internals.SimpleJSON;
using static NATS.Client.Internals.JsonUtils;

Expand All @@ -33,7 +34,9 @@ public sealed class ConsumerInfo : ApiResponse
public bool PushBound { get; private set; }
public ulong CalculatedPending => NumPending + Delivered.ConsumerSeq;
public DateTime Timestamp { get; private set; }

public bool Paused { get; private set; }
public Duration PauseRemaining { get; private set; }

internal ConsumerInfo(Msg msg, bool throwOnError) : base(msg, throwOnError)
{
Init(JsonNode);
Expand Down Expand Up @@ -64,6 +67,8 @@ private void Init(JSONNode ciNode)
ClusterInfo = ClusterInfo.OptionalInstance(ciNode[ApiConstants.Cluster]);
PushBound = ciNode[ApiConstants.PushBound].AsBool;
Timestamp = AsDate(ciNode[ApiConstants.Timestamp]);
Paused = ciNode[ApiConstants.Paused].AsBool;
PauseRemaining = Paused ? JsonUtils.AsDuration(ciNode, ApiConstants.PauseRemaining, Duration.Zero) : null;
}

public override string ToString()
Expand All @@ -80,6 +85,8 @@ public override string ToString()
", Timestamp=" + Timestamp +
", Delivered=" + Delivered +
", AckFloor=" + AckFloor +
", Paused=" + Paused +
", PauseRemaining=" + PauseRemaining +
", " + ObjectString("ClusterInfo", ClusterInfo) +
", " + "ConsumerConfiguration" + ConsumerConfiguration.ToJsonString() +
'}';
Expand Down
36 changes: 36 additions & 0 deletions src/NATS.Client/JetStream/ConsumerPauseRequest.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
// Copyright 2024 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at:
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

using System;
using NATS.Client.Internals;
using NATS.Client.Internals.SimpleJSON;

namespace NATS.Client.JetStream
{
public sealed class ConsumerPauseRequest : JsonSerializable
{
public DateTime PauseUntil { get; }

internal ConsumerPauseRequest(DateTime pauseUntil)
{
PauseUntil = pauseUntil;
}

public override JSONNode ToJsonNode()
{
JSONObject jso = new JSONObject();
JsonUtils.AddField(jso, ApiConstants.PauseUntil, PauseUntil);
return jso;
}
}
}
51 changes: 51 additions & 0 deletions src/NATS.Client/JetStream/ConsumerPauseResponse.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
// Copyright 2024 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at:
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

using System;
using NATS.Client.Internals;
using static NATS.Client.Internals.JsonUtils;

namespace NATS.Client.JetStream
{
public sealed class ConsumerPauseResponse : ApiResponse
{
public bool Paused { get; private set; }
public DateTime? PauseUntil { get; private set; }
public Duration PauseRemaining { get; private set; }

internal ConsumerPauseResponse(Msg msg, bool throwOnError) : base(msg, throwOnError)
{
Init();
}

internal ConsumerPauseResponse(string json, bool throwOnError) : base(json, throwOnError)
{
Init();
}

private void Init()
{
Paused = JsonNode[ApiConstants.Paused].AsBool;
if (Paused)
{
PauseUntil = AsDate(JsonNode[ApiConstants.PauseUntil]);
PauseRemaining = JsonUtils.AsDuration(JsonNode, ApiConstants.PauseRemaining, Duration.Zero);
}
else
{
PauseUntil = null;
PauseRemaining = null;
}
}
}
}
18 changes: 18 additions & 0 deletions src/NATS.Client/JetStream/IJetStreamManagement.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

using System;
using System.Collections.Generic;

namespace NATS.Client.JetStream
Expand Down Expand Up @@ -97,6 +98,23 @@ public interface IJetStreamManagement
/// <returns>True if the consumer was deleted.</returns>
bool DeleteConsumer(string streamName, string consumer);

/// <summary>
/// Pauses a consumer.
/// </summary>
/// <param name="streamName">The name of the stream the consumer is attached to.</param>
/// <param name="consumer">The name of the consumer.</param>
/// <param name="pauseUntil">Consumer is paused until this time.</param>
/// <returns>ConsumerPauseResponse.</returns>
ConsumerPauseResponse PauseConsumer(string streamName, string consumer, DateTime pauseUntil);

/// <summary>
/// Resumes a consumer.
/// </summary>
/// <param name="streamName">The name of the stream the consumer is attached to.</param>
/// <param name="consumer">The name of the consumer.</param>
/// <returns>True if the resume succeeded.</returns>
bool ResumeConsumer(string streamName, string consumer);

/// <summary>
/// Gets information for an existing consumer.
/// </summary>
Expand Down
20 changes: 20 additions & 0 deletions src/NATS.Client/JetStream/JetStreamManagement.cs
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,26 @@ public bool DeleteConsumer(string streamName, string consumer)
return new SuccessApiResponse(m, true).Success;
}

public ConsumerPauseResponse PauseConsumer(string streamName, string consumer, DateTime pauseUntil)
{
Validator.ValidateStreamName(streamName, true);
Validator.ValidateNotNull(consumer, nameof(consumer));
string subj = string.Format(JetStreamConstants.JsapiConsumerPause, streamName, consumer);
ConsumerPauseRequest cprq = new ConsumerPauseRequest(pauseUntil);
Msg m = RequestResponseRequired(subj, cprq.Serialize(), Timeout);
return new ConsumerPauseResponse(m, true);
}

public bool ResumeConsumer(string streamName, string consumer)
{
Validator.ValidateStreamName(streamName, true);
Validator.ValidateNotNull(consumer, nameof(consumer));
string subj = string.Format(JetStreamConstants.JsapiConsumerPause, streamName, consumer);
Msg m = RequestResponseRequired(subj, null, Timeout);
ConsumerPauseResponse cpre = new ConsumerPauseResponse(m, true);
return !cpre.Paused;
}

public ConsumerInfo GetConsumerInfo(string streamName, string consumer)
{
Validator.ValidateStreamName(streamName, true);
Expand Down
Loading
Loading