Skip to content

Commit

Permalink
fix: unknown http method issue
Browse files Browse the repository at this point in the history
  • Loading branch information
ailtonguitar authored and brmagadutra committed Apr 23, 2024
1 parent 810350e commit 7f017cf
Show file tree
Hide file tree
Showing 8 changed files with 93 additions and 46 deletions.
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
<Project Sdk="Microsoft.NET.Sdk.Web">
<Project Sdk="Microsoft.NET.Sdk.Web">

<PropertyGroup>
<TargetFramework>net8.0</TargetFramework>
<Nullable>enable</Nullable>
<ImplicitUsings>enable</ImplicitUsings>
<NoWarn>CS1591,SA1600</NoWarn>
</PropertyGroup>

<ItemGroup>
Expand Down
2 changes: 1 addition & 1 deletion src/Farfetch.LoadShedding/Tasks/ConcurrentCounter.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
namespace Farfetch.LoadShedding.Tasks
namespace Farfetch.LoadShedding.Tasks
{
internal class ConcurrentCounter
{
Expand Down
11 changes: 5 additions & 6 deletions src/Farfetch.LoadShedding/Tasks/TaskManager.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
using System;
using System;
using System.Threading;
using System.Threading.Tasks;
using Farfetch.LoadShedding.Constants;
Expand Down Expand Up @@ -28,11 +28,7 @@ public TaskManager(

this._events = events;

this._taskQueue = new TaskQueue(maxQueueSize)
{
OnItemEnqueued = (count, item) => this.NotifyItemEnqueued(count, item),
OnItemDequeued = (count, item) => this.NotifyItemDequeued(count, item),
};
this._taskQueue = new TaskQueue(maxQueueSize);

this._events?.ConcurrencyLimitChanged?.Raise(new LimitChangedEventArgs(this._counter.Limit));
this._events?.QueueLimitChanged?.Raise(new LimitChangedEventArgs(maxQueueSize));
Expand Down Expand Up @@ -101,13 +97,16 @@ public async Task<TaskItem> AcquireAsync(Priority priority, CancellationToken ca
{
try
{
this.NotifyItemEnqueued(this._taskQueue.Count, item);

await item
.WaitAsync(this._queueTimeout, cancellationToken)
.ConfigureAwait(false);
}
finally
{
this._taskQueue.Remove(item);
this.NotifyItemDequeued(this._taskQueue.Count, item);
}
}

Expand Down
22 changes: 7 additions & 15 deletions src/Farfetch.LoadShedding/Tasks/TaskQueue.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
using System;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Runtime.CompilerServices;
Expand All @@ -9,7 +9,7 @@ namespace Farfetch.LoadShedding.Tasks
{
internal class TaskQueue
{
private readonly ConcurrentCounter _counter = new ConcurrentCounter();
private readonly ConcurrentCounter _counter = new();

private readonly IDictionary<Priority, TaskItemList> _queues = new SortedDictionary<Priority, TaskItemList>()
{
Expand All @@ -31,10 +31,6 @@ public int Limit
set => this._counter.Limit = value;
}

public Action<int, TaskItem> OnItemEnqueued { get; set; }

public Action<int, TaskItem> OnItemDequeued { get; set; }

public void Enqueue(TaskItem item)
{
int count = this.EnqueueItem(item);
Expand All @@ -54,7 +50,7 @@ public TaskItem Dequeue()

if (nextQueueItem != null)
{
this.DecrementCounter(nextQueueItem);
this.DecrementCounter();
}

return nextQueueItem;
Expand All @@ -64,7 +60,7 @@ public void Remove(TaskItem item)
{
if (this._queues[item.Priority].Remove(item))
{
this.DecrementCounter(item);
this.DecrementCounter();
}
}

Expand All @@ -82,8 +78,6 @@ private int EnqueueItem(TaskItem item)

var count = this._counter.Increment();

this.OnItemEnqueued?.Invoke(count, item);

return count;
}

Expand All @@ -99,16 +93,14 @@ private void RejectLastItem()
return;
}

this._counter.Decrement();

this.DecrementCounter(lastItem);
this.DecrementCounter();

lastItem.Reject();
}

private void DecrementCounter(TaskItem nextQueueItem)
private void DecrementCounter()
{
this.OnItemDequeued?.Invoke(this._counter.Decrement(), nextQueueItem);
this._counter.Decrement();
}
}
}
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
<Project Sdk="Microsoft.NET.Sdk">
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<TargetFramework>net8.0</TargetFramework>
<ImplicitUsings>enable</ImplicitUsings>
<Nullable>enable</Nullable>

<IsPackable>false</IsPackable>
<NoWarn>CS1591,SA1600,SA1606</NoWarn>
</PropertyGroup>

<ItemGroup>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,15 +53,50 @@ public async Task GetAsync_WithReducedLimitAndQueueSize_SomeRequestsAreRejected(

// Act
var tasks = Enumerable
.Range(0, 160)
.Range(0, 1000)
.Select(_ => Task.Run(() => client.GetAsync("/api/people")));

var results = await Task.WhenAll(tasks.ToArray());

// Assert
Assert.True(results.Count(x => x.IsSuccessStatusCode) >= MinSuccessfulRequests);
Assert.Contains(results, x => x.StatusCode == HttpStatusCode.ServiceUnavailable);
await AssertMetrics(client);
await AssertMetrics(client, Priority.Normal);
}

[Fact]
public async Task GetAsync_WithReducedLimitAndQueueSizeAndMultiplePriorities_SomeRequestsAreRejected()
{
// Arrange
const int InitialConcurrencyLimit = 100, InitialQueueSize = 4, MinSuccessfulRequests = 44;

var options = new ConcurrencyOptions
{
QueueTimeoutInMs = 2,
InitialConcurrencyLimit = InitialConcurrencyLimit,
InitialQueueSize = InitialQueueSize,
MinQueueSize = InitialQueueSize,
};

var client = this.GetClient(options, x => x.UseHeaderPriorityResolver());

// Act
var tasks = Enumerable
.Range(0, 1000)
.Select(i => Task.Run(() =>
{
var message = new HttpRequestMessage(HttpMethod.Get, "/api/people");
message.Headers.Add("X-Priority", ((Priority)(i % 3)).ToString().ToLower());
return client.SendAsync(message);
}));

var results = await Task.WhenAll(tasks.ToArray());

// Assert
Assert.True(results.Count(x => x.IsSuccessStatusCode) >= MinSuccessfulRequests);
Assert.Contains(results, x => x.StatusCode == HttpStatusCode.ServiceUnavailable);

await AssertMetrics(client, Priority.Critical, Priority.Normal, Priority.NonCritical);
}

/// <summary>
Expand Down Expand Up @@ -95,12 +130,6 @@ public async Task GetAsync_WithHighLimitAndQueueSize_NoneRequestsIsRejected()
Assert.Equal(ExpectedRejectedRequests, results.Count(x => x.StatusCode == HttpStatusCode.ServiceUnavailable));
}

/// <summary>
///
/// </summary>
/// <param name="headerValue"></param>
/// <param name="priority"></param>
/// <returns>A <see cref="Task"/> representing the result of the asynchronous operation.</returns>
[Theory]
[InlineData("critical", Priority.Critical)]
[InlineData("normal", Priority.Normal)]
Expand Down Expand Up @@ -135,11 +164,6 @@ public async Task GetAsync_WithHeaderPriority_ResolvePriorityFromHeaderValue(str
Assert.True(_enqueuedItems.All(x => x == priority));
}

/// <summary>
///
/// </summary>
/// <param name="headerValue"></param>
/// <returns>A <see cref="Task"/> representing the result of the asynchronous operation.</returns>
[Theory]
[InlineData("critical")]
[InlineData("normal")]
Expand Down Expand Up @@ -234,7 +258,9 @@ public async Task GetMetrics_WithDisableMetrics_ShouldNotExportDisableMetrics()
Assert.NotNull(metrics.Content);
Assert.Equal("text/plain", metrics.Content?.Headers?.ContentType?.MediaType);

var content = metrics.Content?.ReadAsStringAsync().GetAwaiter().GetResult();
Assert.NotNull(metrics.Content);

var content = await metrics.Content.ReadAsStringAsync();

Assert.DoesNotContain("http_requests_concurrency_limit_total", content);
Assert.DoesNotContain("http_requests_queue_limit_total", content);
Expand Down Expand Up @@ -289,7 +315,7 @@ public HttpClient GetClient(
return testServer.CreateClient();
}

private static async Task AssertMetrics(HttpClient client)
private static async Task AssertMetrics(HttpClient client, params Priority[] priorities)
{
var metrics = await client.GetAsync("/monitoring/metrics");

Expand All @@ -303,12 +329,21 @@ private static async Task AssertMetrics(HttpClient client)
Assert.Contains("http_requests_concurrency_items_total", content);
Assert.Contains("http_requests_concurrency_limit_total", content);
Assert.Contains("http_requests_task_processing_time_seconds", content);
Assert.Contains("http_requests_queue_items_total{method=\"GET\",priority=\"normal\"}", content);
Assert.Contains("http_requests_queue_limit_total", content);
Assert.Contains("http_requests_queue_time_seconds_sum{method=\"GET\",priority=\"normal\"}", content);
Assert.Contains("http_requests_queue_time_seconds_count{method=\"GET\",priority=\"normal\"}", content);
Assert.Contains("http_requests_queue_time_seconds_bucket{method=\"GET\",priority=\"normal\",le=\"0.0005\"}", content);
Assert.Contains("http_requests_rejected_total{method=\"GET\",priority=\"normal\",reason=\"max_queue_items\"}", content);

foreach(var priority in priorities)
{
var priorityText = priority.ToString().ToLower();
Assert.Contains($"http_requests_queue_items_total{{method=\"GET\",priority=\"{priorityText}\"}}", content);
Assert.Contains($"http_requests_queue_time_seconds_sum{{method=\"GET\",priority=\"{priorityText}\"}}", content);
Assert.Contains($"http_requests_queue_time_seconds_count{{method=\"GET\",priority=\"{priorityText}\"}}", content);
Assert.Contains($"http_requests_queue_time_seconds_bucket{{method=\"GET\",priority=\"{priorityText}\",le=\"0.0005\"}}", content);
}

var lowPriority = priorities.Max();

Assert.Contains($"http_requests_rejected_total{{method=\"GET\",priority=\"{lowPriority.ToString().ToLower()}\",reason=\"max_queue_items\"}}", content);
Assert.DoesNotContain($"http_requests_rejected_total{{method=\"UNKNOWN\",priority=\"{lowPriority.ToString().ToLower()}\",reason=\"max_queue_items\"}}", content);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@
<TargetFramework>net8.0</TargetFramework>
<ImplicitUsings>enable</ImplicitUsings>
<Nullable>enable</Nullable>

<IsPackable>false</IsPackable>
<NoWarn>CS1591,SA1600</NoWarn>
</PropertyGroup>

<ItemGroup>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,5 +67,25 @@ public void Enqueue_TaskWithHigherPriorityQueueLimitIsReached_EnqueueItemAndReje
Assert.Equal(TaskResult.Pending, highPriorityTask.Status);
Assert.Equal(TaskResult.Rejected, lowPriorityTask.Status);
}

[Fact]
public void Remove_ItemAlreadyRejected_ShouldNotDecrementCounter()
{
// Arrange
this._target.Limit = 2;

var lastTask = new TaskItem(Priority.NonCritical);

this._target.Enqueue(new TaskItem(Priority.Critical));
this._target.Enqueue(new TaskItem(Priority.Critical));
this._target.Enqueue(lastTask);

// Act
this._target.Remove(lastTask);

// Assert
Assert.Equal(2, _target.Count);
Assert.Equal(TaskResult.Rejected, lastTask.Status);
}
}
}

0 comments on commit 7f017cf

Please sign in to comment.