diff --git a/samples/Farfetch.LoadShedding.Samples.WebApi/Farfetch.LoadShedding.Samples.WebApi.csproj b/samples/Farfetch.LoadShedding.Samples.WebApi/Farfetch.LoadShedding.Samples.WebApi.csproj index e94fb92..5e6f506 100644 --- a/samples/Farfetch.LoadShedding.Samples.WebApi/Farfetch.LoadShedding.Samples.WebApi.csproj +++ b/samples/Farfetch.LoadShedding.Samples.WebApi/Farfetch.LoadShedding.Samples.WebApi.csproj @@ -1,9 +1,10 @@ - + net8.0 enable enable + CS1591,SA1600 diff --git a/src/Farfetch.LoadShedding/Tasks/ConcurrentCounter.cs b/src/Farfetch.LoadShedding/Tasks/ConcurrentCounter.cs index 1d3cf82..1bc5308 100644 --- a/src/Farfetch.LoadShedding/Tasks/ConcurrentCounter.cs +++ b/src/Farfetch.LoadShedding/Tasks/ConcurrentCounter.cs @@ -1,4 +1,4 @@ -namespace Farfetch.LoadShedding.Tasks +namespace Farfetch.LoadShedding.Tasks { internal class ConcurrentCounter { diff --git a/src/Farfetch.LoadShedding/Tasks/TaskManager.cs b/src/Farfetch.LoadShedding/Tasks/TaskManager.cs index ac13252..955d567 100644 --- a/src/Farfetch.LoadShedding/Tasks/TaskManager.cs +++ b/src/Farfetch.LoadShedding/Tasks/TaskManager.cs @@ -1,4 +1,4 @@ -using System; +using System; using System.Threading; using System.Threading.Tasks; using Farfetch.LoadShedding.Constants; @@ -28,11 +28,7 @@ public TaskManager( this._events = events; - this._taskQueue = new TaskQueue(maxQueueSize) - { - OnItemEnqueued = (count, item) => this.NotifyItemEnqueued(count, item), - OnItemDequeued = (count, item) => this.NotifyItemDequeued(count, item), - }; + this._taskQueue = new TaskQueue(maxQueueSize); this._events?.ConcurrencyLimitChanged?.Raise(new LimitChangedEventArgs(this._counter.Limit)); this._events?.QueueLimitChanged?.Raise(new LimitChangedEventArgs(maxQueueSize)); @@ -101,6 +97,8 @@ public async Task AcquireAsync(Priority priority, CancellationToken ca { try { + this.NotifyItemEnqueued(this._taskQueue.Count, item); + await item .WaitAsync(this._queueTimeout, cancellationToken) .ConfigureAwait(false); @@ -108,6 +106,7 @@ await item finally { this._taskQueue.Remove(item); + this.NotifyItemDequeued(this._taskQueue.Count, item); } } diff --git a/src/Farfetch.LoadShedding/Tasks/TaskQueue.cs b/src/Farfetch.LoadShedding/Tasks/TaskQueue.cs index 2f113ce..4d28890 100644 --- a/src/Farfetch.LoadShedding/Tasks/TaskQueue.cs +++ b/src/Farfetch.LoadShedding/Tasks/TaskQueue.cs @@ -1,4 +1,4 @@ -using System; +using System; using System.Collections.Generic; using System.Linq; using System.Runtime.CompilerServices; @@ -9,7 +9,7 @@ namespace Farfetch.LoadShedding.Tasks { internal class TaskQueue { - private readonly ConcurrentCounter _counter = new ConcurrentCounter(); + private readonly ConcurrentCounter _counter = new(); private readonly IDictionary _queues = new SortedDictionary() { @@ -31,10 +31,6 @@ public int Limit set => this._counter.Limit = value; } - public Action OnItemEnqueued { get; set; } - - public Action OnItemDequeued { get; set; } - public void Enqueue(TaskItem item) { int count = this.EnqueueItem(item); @@ -54,7 +50,7 @@ public TaskItem Dequeue() if (nextQueueItem != null) { - this.DecrementCounter(nextQueueItem); + this.DecrementCounter(); } return nextQueueItem; @@ -64,7 +60,7 @@ public void Remove(TaskItem item) { if (this._queues[item.Priority].Remove(item)) { - this.DecrementCounter(item); + this.DecrementCounter(); } } @@ -82,8 +78,6 @@ private int EnqueueItem(TaskItem item) var count = this._counter.Increment(); - this.OnItemEnqueued?.Invoke(count, item); - return count; } @@ -99,16 +93,14 @@ private void RejectLastItem() return; } - this._counter.Decrement(); - - this.DecrementCounter(lastItem); + this.DecrementCounter(); lastItem.Reject(); } - private void DecrementCounter(TaskItem nextQueueItem) + private void DecrementCounter() { - this.OnItemDequeued?.Invoke(this._counter.Decrement(), nextQueueItem); + this._counter.Decrement(); } } } diff --git a/tests/integration-tests/Farfetch.LoadShedding.IntegrationTests/Farfetch.LoadShedding.IntegrationTests.csproj b/tests/integration-tests/Farfetch.LoadShedding.IntegrationTests/Farfetch.LoadShedding.IntegrationTests.csproj index e472ba4..7e5981f 100644 --- a/tests/integration-tests/Farfetch.LoadShedding.IntegrationTests/Farfetch.LoadShedding.IntegrationTests.csproj +++ b/tests/integration-tests/Farfetch.LoadShedding.IntegrationTests/Farfetch.LoadShedding.IntegrationTests.csproj @@ -1,11 +1,11 @@ - + net8.0 enable enable - false + CS1591,SA1600,SA1606 diff --git a/tests/integration-tests/Farfetch.LoadShedding.IntegrationTests/Tests/Limiters/AdaptativeConcurrencyLimiterTests.cs b/tests/integration-tests/Farfetch.LoadShedding.IntegrationTests/Tests/Limiters/AdaptativeConcurrencyLimiterTests.cs index 216b565..8860792 100644 --- a/tests/integration-tests/Farfetch.LoadShedding.IntegrationTests/Tests/Limiters/AdaptativeConcurrencyLimiterTests.cs +++ b/tests/integration-tests/Farfetch.LoadShedding.IntegrationTests/Tests/Limiters/AdaptativeConcurrencyLimiterTests.cs @@ -53,7 +53,7 @@ public async Task GetAsync_WithReducedLimitAndQueueSize_SomeRequestsAreRejected( // Act var tasks = Enumerable - .Range(0, 160) + .Range(0, 1000) .Select(_ => Task.Run(() => client.GetAsync("/api/people"))); var results = await Task.WhenAll(tasks.ToArray()); @@ -61,7 +61,42 @@ public async Task GetAsync_WithReducedLimitAndQueueSize_SomeRequestsAreRejected( // Assert Assert.True(results.Count(x => x.IsSuccessStatusCode) >= MinSuccessfulRequests); Assert.Contains(results, x => x.StatusCode == HttpStatusCode.ServiceUnavailable); - await AssertMetrics(client); + await AssertMetrics(client, Priority.Normal); + } + + [Fact] + public async Task GetAsync_WithReducedLimitAndQueueSizeAndMultiplePriorities_SomeRequestsAreRejected() + { + // Arrange + const int InitialConcurrencyLimit = 100, InitialQueueSize = 4, MinSuccessfulRequests = 44; + + var options = new ConcurrencyOptions + { + QueueTimeoutInMs = 2, + InitialConcurrencyLimit = InitialConcurrencyLimit, + InitialQueueSize = InitialQueueSize, + MinQueueSize = InitialQueueSize, + }; + + var client = this.GetClient(options, x => x.UseHeaderPriorityResolver()); + + // Act + var tasks = Enumerable + .Range(0, 1000) + .Select(i => Task.Run(() => + { + var message = new HttpRequestMessage(HttpMethod.Get, "/api/people"); + message.Headers.Add("X-Priority", ((Priority)(i % 3)).ToString().ToLower()); + return client.SendAsync(message); + })); + + var results = await Task.WhenAll(tasks.ToArray()); + + // Assert + Assert.True(results.Count(x => x.IsSuccessStatusCode) >= MinSuccessfulRequests); + Assert.Contains(results, x => x.StatusCode == HttpStatusCode.ServiceUnavailable); + + await AssertMetrics(client, Priority.Critical, Priority.Normal, Priority.NonCritical); } /// @@ -95,12 +130,6 @@ public async Task GetAsync_WithHighLimitAndQueueSize_NoneRequestsIsRejected() Assert.Equal(ExpectedRejectedRequests, results.Count(x => x.StatusCode == HttpStatusCode.ServiceUnavailable)); } - /// - /// - /// - /// - /// - /// A representing the result of the asynchronous operation. [Theory] [InlineData("critical", Priority.Critical)] [InlineData("normal", Priority.Normal)] @@ -135,11 +164,6 @@ public async Task GetAsync_WithHeaderPriority_ResolvePriorityFromHeaderValue(str Assert.True(_enqueuedItems.All(x => x == priority)); } - /// - /// - /// - /// - /// A representing the result of the asynchronous operation. [Theory] [InlineData("critical")] [InlineData("normal")] @@ -234,7 +258,9 @@ public async Task GetMetrics_WithDisableMetrics_ShouldNotExportDisableMetrics() Assert.NotNull(metrics.Content); Assert.Equal("text/plain", metrics.Content?.Headers?.ContentType?.MediaType); - var content = metrics.Content?.ReadAsStringAsync().GetAwaiter().GetResult(); + Assert.NotNull(metrics.Content); + + var content = await metrics.Content.ReadAsStringAsync(); Assert.DoesNotContain("http_requests_concurrency_limit_total", content); Assert.DoesNotContain("http_requests_queue_limit_total", content); @@ -289,7 +315,7 @@ public HttpClient GetClient( return testServer.CreateClient(); } - private static async Task AssertMetrics(HttpClient client) + private static async Task AssertMetrics(HttpClient client, params Priority[] priorities) { var metrics = await client.GetAsync("/monitoring/metrics"); @@ -303,12 +329,21 @@ private static async Task AssertMetrics(HttpClient client) Assert.Contains("http_requests_concurrency_items_total", content); Assert.Contains("http_requests_concurrency_limit_total", content); Assert.Contains("http_requests_task_processing_time_seconds", content); - Assert.Contains("http_requests_queue_items_total{method=\"GET\",priority=\"normal\"}", content); Assert.Contains("http_requests_queue_limit_total", content); - Assert.Contains("http_requests_queue_time_seconds_sum{method=\"GET\",priority=\"normal\"}", content); - Assert.Contains("http_requests_queue_time_seconds_count{method=\"GET\",priority=\"normal\"}", content); - Assert.Contains("http_requests_queue_time_seconds_bucket{method=\"GET\",priority=\"normal\",le=\"0.0005\"}", content); - Assert.Contains("http_requests_rejected_total{method=\"GET\",priority=\"normal\",reason=\"max_queue_items\"}", content); + + foreach(var priority in priorities) + { + var priorityText = priority.ToString().ToLower(); + Assert.Contains($"http_requests_queue_items_total{{method=\"GET\",priority=\"{priorityText}\"}}", content); + Assert.Contains($"http_requests_queue_time_seconds_sum{{method=\"GET\",priority=\"{priorityText}\"}}", content); + Assert.Contains($"http_requests_queue_time_seconds_count{{method=\"GET\",priority=\"{priorityText}\"}}", content); + Assert.Contains($"http_requests_queue_time_seconds_bucket{{method=\"GET\",priority=\"{priorityText}\",le=\"0.0005\"}}", content); + } + + var lowPriority = priorities.Max(); + + Assert.Contains($"http_requests_rejected_total{{method=\"GET\",priority=\"{lowPriority.ToString().ToLower()}\",reason=\"max_queue_items\"}}", content); + Assert.DoesNotContain($"http_requests_rejected_total{{method=\"UNKNOWN\",priority=\"{lowPriority.ToString().ToLower()}\",reason=\"max_queue_items\"}}", content); } } } diff --git a/tests/unit-tests/Farfetch.LoadShedding.Tests/Farfetch.LoadShedding.Tests.csproj b/tests/unit-tests/Farfetch.LoadShedding.Tests/Farfetch.LoadShedding.Tests.csproj index 6e38a18..248a1d1 100644 --- a/tests/unit-tests/Farfetch.LoadShedding.Tests/Farfetch.LoadShedding.Tests.csproj +++ b/tests/unit-tests/Farfetch.LoadShedding.Tests/Farfetch.LoadShedding.Tests.csproj @@ -4,8 +4,8 @@ net8.0 enable enable - false + CS1591,SA1600 diff --git a/tests/unit-tests/Farfetch.LoadShedding.Tests/Tasks/TaskQueueTests.cs b/tests/unit-tests/Farfetch.LoadShedding.Tests/Tasks/TaskQueueTests.cs index 8cb2ad8..cb0747b 100644 --- a/tests/unit-tests/Farfetch.LoadShedding.Tests/Tasks/TaskQueueTests.cs +++ b/tests/unit-tests/Farfetch.LoadShedding.Tests/Tasks/TaskQueueTests.cs @@ -67,5 +67,25 @@ public void Enqueue_TaskWithHigherPriorityQueueLimitIsReached_EnqueueItemAndReje Assert.Equal(TaskResult.Pending, highPriorityTask.Status); Assert.Equal(TaskResult.Rejected, lowPriorityTask.Status); } + + [Fact] + public void Remove_ItemAlreadyRejected_ShouldNotDecrementCounter() + { + // Arrange + this._target.Limit = 2; + + var lastTask = new TaskItem(Priority.NonCritical); + + this._target.Enqueue(new TaskItem(Priority.Critical)); + this._target.Enqueue(new TaskItem(Priority.Critical)); + this._target.Enqueue(lastTask); + + // Act + this._target.Remove(lastTask); + + // Assert + Assert.Equal(2, _target.Count); + Assert.Equal(TaskResult.Rejected, lastTask.Status); + } } }