Skip to content

Commit

Permalink
Added info and stats to services (#274)
Browse files Browse the repository at this point in the history
* Added info and stats to services

Added info and stats accessors to service object as they are returned
from the service query APIs $SRV.INFO and STATS

* Renamed Info and Stata methods

* Revert "Renamed Info and Stata methods"

This reverts commit 2f8dfc5.
  • Loading branch information
mtmk authored Dec 11, 2023
1 parent 3460b1e commit 9ed8a14
Show file tree
Hide file tree
Showing 3 changed files with 104 additions and 69 deletions.
13 changes: 13 additions & 0 deletions src/NATS.Client.Services/INatsSvcServer.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using NATS.Client.Core;
using NATS.Client.Services.Models;

namespace NATS.Client.Services;

Expand Down Expand Up @@ -39,4 +40,16 @@ public interface INatsSvcServer : IAsyncDisposable
/// <param name="cancellationToken">A <see cref="CancellationToken"/> may be used to cancel th call in the future.</param>
/// <returns>A <seealso cref="ValueTask"/> representing the asynchronous operation.</returns>
ValueTask<NatsSvcServer.Group> AddGroupAsync(string name, string? queueGroup = default, CancellationToken cancellationToken = default);

/// <summary>
/// Get current stats for the service.
/// </summary>
/// <returns>Stats response object</returns>
StatsResponse GetStats();

/// <summary>
/// Get current info for the service.
/// </summary>
/// <returns>Info response object</returns>
InfoResponse GetInfo();
}
124 changes: 68 additions & 56 deletions src/NATS.Client.Services/NatsSvcServer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,71 @@ public ValueTask<Group> AddGroupAsync(string name, string? queueGroup = default,
return ValueTask.FromResult(group);
}

/// <inheritdoc />
public StatsResponse GetStats()
{
var endPoints = _endPoints.Select(ep =>
{
JsonNode? statsData;
try
{
statsData = _config.StatsHandler?.Invoke(ep.Value);
}
catch (Exception ex)
{
_logger.LogError(ex, "Error calling stats handler for {Endpoint}", ep.Key);
statsData = null;
}

return new EndpointStats
{
Name = ep.Key,
Subject = ep.Value.Subject,
QueueGroup = ep.Value.QueueGroup!,
Data = statsData!,
ProcessingTime = ep.Value.ProcessingTime,
NumRequests = ep.Value.Requests,
NumErrors = ep.Value.Errors,
LastError = ep.Value.LastError ?? string.Empty,
AverageProcessingTime = ep.Value.AverageProcessingTime,
};
}).ToList();

var response = new StatsResponse
{
Name = _config.Name,
Id = _id,
Version = _config.Version,
Metadata = _config.Metadata!,
Endpoints = endPoints,
Started = _started,
};
return response;
}

/// <inheritdoc />
public InfoResponse GetInfo()
{
var endPoints = _endPoints.Select(ep => new EndpointInfo
{
Name = ep.Key,
Subject = ep.Value.Subject,
QueueGroup = ep.Value.QueueGroup!,
Metadata = ep.Value.Metadata!,
}).ToList();

var infoResponse = new InfoResponse
{
Name = _config.Name,
Id = _id,
Version = _config.Version,
Description = _config.Description!,
Metadata = _config.Metadata!,
Endpoints = endPoints,
};
return infoResponse;
}

/// <summary>
/// Stop the service.
/// </summary>
Expand Down Expand Up @@ -172,7 +237,7 @@ private async Task MsgLoop()
}

await svcMsg.Msg.ReplyAsync(
new PingResponse
data: new PingResponse
{
Name = _config.Name,
Id = _id,
Expand All @@ -189,24 +254,8 @@ await svcMsg.Msg.ReplyAsync(
// empty request payload
}

var endPoints = _endPoints.Select(ep => new EndpointInfo
{
Name = ep.Key,
Subject = ep.Value.Subject,
QueueGroup = ep.Value.QueueGroup!,
Metadata = ep.Value.Metadata!,
}).ToList();

await svcMsg.Msg.ReplyAsync(
new InfoResponse
{
Name = _config.Name,
Id = _id,
Version = _config.Version,
Description = _config.Description!,
Metadata = _config.Metadata!,
Endpoints = endPoints,
},
data: GetInfo(),
serializer: NatsSrvJsonSerializer<InfoResponse>.Default,
cancellationToken: _cancellationToken);
}
Expand All @@ -217,45 +266,8 @@ await svcMsg.Msg.ReplyAsync(
// empty request payload
}

var endPoints = _endPoints.Select(ep =>
{
JsonNode? statsData;
try
{
statsData = _config.StatsHandler?.Invoke(ep.Value);
}
catch (Exception ex)
{
_logger.LogError(ex, "Error calling stats handler for {Endpoint}", ep.Key);
statsData = null;
}

return new EndpointStats
{
Name = ep.Key,
Subject = ep.Value.Subject,
QueueGroup = ep.Value.QueueGroup!,
Data = statsData!,
ProcessingTime = ep.Value.ProcessingTime,
NumRequests = ep.Value.Requests,
NumErrors = ep.Value.Errors,
LastError = ep.Value.LastError ?? string.Empty,
AverageProcessingTime = ep.Value.AverageProcessingTime,
};
}).ToList();

var response = new StatsResponse
{
Name = _config.Name,
Id = _id,
Version = _config.Version,
Metadata = _config.Metadata!,
Endpoints = endPoints,
Started = _started,
};

await svcMsg.Msg.ReplyAsync(
response,
data: GetStats(),
serializer: NatsSrvJsonSerializer<StatsResponse>.Default,
cancellationToken: _cancellationToken);
}
Expand Down
36 changes: 23 additions & 13 deletions tests/NATS.Client.Services.Tests/ServicesTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -82,10 +82,16 @@ await s1.AddEndpointAsync<int>(
},
cancellationToken: cancellationToken);

var info = (await nats.FindServicesAsync("$SRV.INFO", 1, NatsSrvJsonSerializer<InfoResponse>.Default, cancellationToken)).First();
Assert.Single(info.Endpoints);
var endpointInfo = info.Endpoints.First();
Assert.Equal("e1", endpointInfo.Name);
var info1 = (await nats.FindServicesAsync("$SRV.INFO", 1, NatsSrvJsonSerializer<InfoResponse>.Default, cancellationToken)).First();
var info2 = s1.GetInfo();
foreach (var info in new[] { info1, info2 })
{
Assert.Single(info.Endpoints);
var endpointInfo1 = info.Endpoints.First();
Assert.Equal("e1", endpointInfo1.Name);
}

var endpointInfo = info1.Endpoints.First();

for (var i = 0; i < 10; i++)
{
Expand All @@ -107,15 +113,19 @@ await s1.AddEndpointAsync<int>(
}
}

var stat = (await nats.FindServicesAsync("$SRV.STATS", 1, NatsSrvJsonSerializer<StatsResponse>.Default, cancellationToken)).First();
Assert.Single(stat.Endpoints);
var endpointStats = stat.Endpoints.First();
Assert.Equal("e1", endpointStats.Name);
Assert.Equal(10, endpointStats.NumRequests);
Assert.Equal(3, endpointStats.NumErrors);
Assert.Equal("999:Handler error", endpointStats.LastError);
Assert.True(endpointStats.ProcessingTime > 0);
Assert.True(endpointStats.AverageProcessingTime > 0);
var stat1 = (await nats.FindServicesAsync("$SRV.STATS", 1, NatsSrvJsonSerializer<StatsResponse>.Default, cancellationToken)).First();
var stat2 = s1.GetStats();
foreach (var stat in new[] { stat1, stat2 })
{
Assert.Single(stat.Endpoints);
var endpointStats = stat.Endpoints.First();
Assert.Equal("e1", endpointStats.Name);
Assert.Equal(10, endpointStats.NumRequests);
Assert.Equal(3, endpointStats.NumErrors);
Assert.Equal("999:Handler error", endpointStats.LastError);
Assert.True(endpointStats.ProcessingTime > 0);
Assert.True(endpointStats.AverageProcessingTime > 0);
}
}

[Fact]
Expand Down

0 comments on commit 9ed8a14

Please sign in to comment.