Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[APIPUB-72] Develop solution for processing rate limited/left over messages when warned #74

Merged
merged 5 commits into from
Sep 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion docs/API-Publisher-Configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,9 @@ Defines general behavior of the Ed-Fi API Publisher.
| Options:UseChangeVersionPaging<br/>`--useChangeVersionPaging` | Indicates whether or not to use change version paging.<br/>(_Default value: false_) |
| Options:ChangeVersionPagingWindowSize<br/>`--changeVersionPagingWindowSize` | Indicates the change version paging window size.<br/>(_Default value: 25000_) |
| Options:EnableRateLimit<br/>`--enableRateLimit` | Indicates whether or not to use rate limiting.<br/>(_Default value: false_) |
| Options:RateLimitNumberExecutions<br/>`--rateLimitNumberExecutions` | Indicates the maximum number of executions allowed within the defined time window.<br/>(_Default value: 100_) |
| Options:RateLimitNumberExecutions<br/>`--rateLimitNumberExecutions` | Indicates the maximum number of executions allowed within the defined time window.<br/>(_Default value: 30_) |
| Options:RateLimitTimeSeconds<br/>`--rateLimitTimeSeconds` | Indicates the the time span for the rate limit in seconds.<br/>(_Default value: 1_) |
| Options:RateLimitMaxRetries<br/>`--rateLimitMaxRetries` | Indicates the number of times the Ed-Fi API publisher will attempt to _resend_ a request, rejected by rate limiting, to the source or destination APIs before determining that the failure is permanent.<br/>(_Default value: 10_) |

## API Connections

Expand Down
3 changes: 2 additions & 1 deletion src/EdFi.Tools.ApiPublisher.Cli/apiPublisherSettings.json
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,9 @@
"useChangeVersionPaging": false,
"changeVersionPagingWindowSize": 25000,
"enableRateLimit": false,
"rateLimitNumberExecutions": 100,
"rateLimitNumberExecutions": 30,
"rateLimitTimeSeconds": 1,
"rateLimitMaxRetries": 10,
"useReversePaging": false
},
"authorizationFailureHandling": [
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ public ApiSourceResourceItemProvider(ISourceEdFiApiClientProvider sourceEdFiApiC
}
catch (RateLimitRejectedException)
{
_logger.Warning($"{sourceEdFiApiClient.DataManagementApiSegment}{resourceItemUrl}: Rate limit exceeded. Please try again later.");
_logger.Fatal($"{sourceEdFiApiClient.DataManagementApiSegment}{resourceItemUrl}: Rate limit exceeded. Please try again later.");
return (false, null);
}
//----------------------------------------------------------------------------------------------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,7 @@ public EdFiApiSourceTotalCountProvider(ISourceEdFiApiClientProvider sourceEdFiAp

string requestUri =
$"{edFiApiClient.DataManagementApiSegment}{resourceUrl}?offset=0&limit=1&totalCount=true{changeWindowQueryStringParameters}";

return RequestHelpers.SendGetRequestAsync(edFiApiClient, resourceUrl, requestUri, ct).Result;
return await RequestHelpers.SendGetRequestAsync(edFiApiClient, resourceUrl, requestUri, ct);
}, new Context(), cancellationToken);

string responseContent = null;
Expand Down Expand Up @@ -137,7 +136,7 @@ await HandleResourceCountRequestErrorAsync(resourceUrl, errorHandlingBlock, apiR
}
catch (RateLimitRejectedException)
{
_logger.Warning($"{edFiApiClient.DataManagementApiSegment}{resourceUrl}: Rate limit exceeded. Please try again later.");
_logger.Fatal($"{edFiApiClient.DataManagementApiSegment}{resourceUrl}: Rate limit exceeded. Please try again later.");
return (false, 0);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ public async Task<IEnumerable<TProcessDataMessage>> HandleStreamResourcePageAsyn
}
catch (RateLimitRejectedException)
{
_logger.Warning($"{message.ResourceUrl}: Rate limit exceeded. Please try again later.");
_logger.Fatal($"{message.ResourceUrl}: Rate limit exceeded. Please try again later.");
}
break;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,7 @@ private TransformManyBlock<GetItemForKeyChangeMessage, ChangeKeyMessage> CreateG
}
catch (RateLimitRejectedException)
{
_logger.Warning($"{message.ResourceUrl}: Rate limit exceeded. Please try again later.");
_logger.Fatal($"{message.ResourceUrl}: Rate limit exceeded. Please try again later.");
throw;
}
catch (Exception ex)
Expand Down Expand Up @@ -367,7 +367,7 @@ private TransformManyBlock<ChangeKeyMessage, ErrorItemMessage> CreateChangeKeyBl
}
catch (RateLimitRejectedException)
{
_logger.Warning($"{msg.ResourceUrl}: Rate limit exceeded. Please try again later.");
_logger.Fatal($"{msg.ResourceUrl}: Rate limit exceeded. Please try again later.");
throw;
}
catch (Exception ex)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ private TransformManyBlock<GetItemForDeletionMessage, DeleteItemMessage> CreateG
}
catch (RateLimitRejectedException)
{
_logger.Warning($"{msg.ResourceUrl}: Rate limit exceeded. Please try again later.");
_logger.Fatal($"{msg.ResourceUrl}: Rate limit exceeded. Please try again later.");
throw;
}
catch (Exception ex)
Expand Down Expand Up @@ -315,7 +315,7 @@ private TransformManyBlock<DeleteItemMessage, ErrorItemMessage> CreateDeleteReso
}
catch (RateLimitRejectedException)
{
_logger.Warning($"{msg.ResourceUrl}: Rate limit exceeded. Please try again later.");
_logger.Fatal($"{msg.ResourceUrl}: Rate limit exceeded. Please try again later.");
throw;
}
catch (Exception ex)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -413,8 +413,8 @@ await HandlePostItemMessage(
}
catch (RateLimitRejectedException)
{
_logger.Warning($"{postItemMessage.ResourceUrl}: Rate limit exceeded. Please try again later.");
throw;
_logger.Fatal($"{postItemMessage.ResourceUrl}: Rate limit exceeded. Please try again later.");
return Enumerable.Empty<ErrorItemMessage>();
}
catch (Exception ex)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,10 +91,12 @@ public int MaxDegreeOfParallelismForPostResourceItem

public bool EnableRateLimit { get; set; } = false;

public int RateLimitNumberExecutions { get; set; } = 100;
public int RateLimitNumberExecutions { get; set; } = 30;

public double RateLimitTimeSeconds { get; set; } = 1;

public int RateLimitMaxRetries { get; set; } = 5;

public bool UseReversePaging { get; set; } = false;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ public IConfigurationBuilder Create(string[] commandLineArgs)
["--enableRateLimit"] = "Options:EnableRateLimit",
["--rateLimitNumberExecutions"] = "Options:RateLimitNumberExecutions",
["--rateLimitTimeSeconds"] = "Options:RateLimitTimeSeconds",
["--rateLimitMaxRetries"] = "Options:RateLimitMaxRetries",
["--useReversePaging"] = "Options:UseReversePaging",


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
// Licensed to the Ed-Fi Alliance under one or more agreements.
// The Ed-Fi Alliance licenses this file to you under the Apache License, Version 2.0.
// See the LICENSE and NOTICES files in the project root for more information.
using Polly;
using Polly.RateLimit;
using System;
using System.Threading.Tasks;
Expand All @@ -10,6 +11,6 @@ namespace EdFi.Tools.ApiPublisher.Core.Configuration;

public interface IRateLimiting<TResult>
{
AsyncRateLimitPolicy<TResult> GetRateLimitingPolicy();
IAsyncPolicy<TResult> GetRateLimitingPolicy();
Task<TResult> ExecuteAsync(Func<Task<TResult>> action);
}
29 changes: 25 additions & 4 deletions src/EdFi.Tools.ApiPublisher.Core/Configuration/PollyRateLimiter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,28 +8,49 @@
using System.Threading.Tasks;
using System.Net.Http;
using System.Threading.RateLimiting;
using Serilog;

namespace EdFi.Tools.ApiPublisher.Core.Configuration;

public class PollyRateLimiter<TResult> : IRateLimiting<TResult>
{
private readonly AsyncRateLimitPolicy<TResult> _rateLimiter;
private readonly IAsyncPolicy<TResult> _rateLimiter;
private readonly IAsyncPolicy<TResult> _retryPolicyForRateLimit;
private readonly ILogger _logger = Log.ForContext(typeof(PollyRateLimiter<TResult>));

public PollyRateLimiter(Options options)
{
_rateLimiter = Policy.RateLimitAsync<TResult>(
options.RateLimitNumberExecutions,
TimeSpan.FromSeconds(options.RateLimitTimeSeconds),
options.RateLimitNumberExecutions);
_retryPolicyForRateLimit = Policy<TResult>
.Handle<RateLimitRejectedException>()
.WaitAndRetryAsync(options.RateLimitMaxRetries, // Number of retries
retryAttempt => TimeSpan.FromSeconds(options.RateLimitTimeSeconds),
(exception, timeSpan, retryCount, context) =>
{
var delay = TimeSpan.FromSeconds(options.RateLimitTimeSeconds);
_logger.Warning($"Retry {retryCount} due to rate limit exceeded. Waiting {delay.TotalSeconds} seconds before next retry.");
}
);
}

public async Task<TResult> ExecuteAsync(Func<Task<TResult>> action)
{
return await _rateLimiter.ExecuteAsync(action);
try
{
return await _rateLimiter.ExecuteAsync(action);
}
catch (RateLimitRejectedException) {
_logger.Fatal("Rate limit exceeded. Please try again later.");
throw;
}

}

public AsyncRateLimitPolicy<TResult> GetRateLimitingPolicy()
public IAsyncPolicy<TResult> GetRateLimitingPolicy()
{
return _rateLimiter;
return Policy.WrapAsync(_retryPolicyForRateLimit, _rateLimiter);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
using EdFi.Tools.ApiPublisher.Core.Processing.Messages;
using EdFi.Tools.ApiPublisher.Core.Versioning;
using Newtonsoft.Json;
using Polly.RateLimit;
using Serilog;
using Serilog.Events;
using System;
Expand Down Expand Up @@ -192,6 +193,11 @@ await _sourceIsolationApplicator.ApplySourceSnapshotIdentifierAsync(configuratio
await UpdateChangeVersionAsync(configuration, changeWindow)
.ConfigureAwait(false);
}
catch (RateLimitRejectedException ex)
{
_logger.Fatal(ex.Message);
throw;
}
catch (Exception ex)
{
_logger.Fatal($"An unhandled exception occurred during processing: {ex}");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ public void RateLimitedMethod_Should_Throw_RateLimiterRejectedException_On_Overl
options.EnableRateLimit = true;
options.RateLimitNumberExecutions = 5;
options.RateLimitTimeSeconds = 1;
options.RateLimitMaxRetries = 1;
var rateLimiter = new PollyRateLimiter<HttpResponseMessage>(options);

var methodToTest = new MockRateLimitingMethod(rateLimiter);
Expand Down
Loading