Skip to content

Commit

Permalink
Add add retry functionality to http client (from #6387)
Browse files Browse the repository at this point in the history
  • Loading branch information
nflaig committed Mar 9, 2024
1 parent 03b192c commit a94b49f
Show file tree
Hide file tree
Showing 2 changed files with 58 additions and 23 deletions.
79 changes: 56 additions & 23 deletions packages/api/src/utils/client/httpClient.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import {ErrorAborted, Logger, TimeoutError, isValidHttpUrl} from "@lodestar/utils";
import {ErrorAborted, Logger, TimeoutError, isValidHttpUrl, retry} from "@lodestar/utils";
import {WireFormat, mergeHeaders} from "../headers.js";
import {Endpoint} from "../types.js";
import {ApiRequestInit, ApiRequestInitRequired, RouteDefinitionExtra, createApiRequest} from "./request.js";
Expand All @@ -8,6 +8,8 @@ import {fetch, isFetchError} from "./fetch.js";

/** A higher default timeout, validator will sets its own shorter timeoutMs */
const DEFAULT_TIMEOUT_MS = 60_000;
const DEFAULT_RETRIES = 0;
const DEFAULT_RETRY_DELAY = 200;
const DEFAULT_REQUEST_WIRE_FORMAT = WireFormat.json;
const DEFAULT_RESPONSE_WIRE_FORMAT = WireFormat.ssz;

Expand Down Expand Up @@ -41,6 +43,7 @@ export type HttpClientModules = {

export class HttpClient implements IHttpClient {
readonly urlsInits: ApiRequestInitRequired[] = [];
readonly globalInit: ApiRequestInitRequired;

private readonly fetch: typeof fetch;
private readonly metrics: null | Metrics;
Expand All @@ -55,23 +58,27 @@ export class HttpClient implements IHttpClient {
constructor(opts: HttpClientOptions, {logger, metrics}: HttpClientModules = {}) {
// Cast to all types optional since they are defined with syntax `HttpClientOptions = A | B`
const {baseUrl, urls = []} = opts as {baseUrl?: string; urls?: (string | ApiRequestInit)[]};
const globalInit = opts.globalInit;
const defaults = {
const defaults: ApiRequestInitRequired = {
baseUrl: "",
timeoutMs: DEFAULT_TIMEOUT_MS,
retries: DEFAULT_RETRIES,
retryDelay: DEFAULT_RETRY_DELAY,
requestWireFormat: DEFAULT_REQUEST_WIRE_FORMAT,
responseWireFormat: DEFAULT_RESPONSE_WIRE_FORMAT,
};
this.globalInit = {
...defaults,
...opts.globalInit,
};

// opts.baseUrl is equivalent to `urls: [{baseUrl}]`
// unshift opts.baseUrl to urls, without mutating opts.urls
for (const [i, urlOrInit] of [...(baseUrl ? [baseUrl] : []), ...urls].entries()) {
const init = typeof urlOrInit === "string" ? {baseUrl: urlOrInit} : urlOrInit;
const urlInit: ApiRequestInitRequired = {
...defaults,
...globalInit,
...this.globalInit,
...init,
headers: mergeHeaders(globalInit?.headers, init.headers),
headers: mergeHeaders(this.globalInit.headers, init.headers),
};

if (!urlInit.baseUrl) {
Expand Down Expand Up @@ -106,13 +113,42 @@ export class HttpClient implements IHttpClient {
}
}

/**
* Request with possible retries
*/
async request<E extends Endpoint>(
definition: RouteDefinitionExtra<E>,
args: E["args"],
localInit: ApiRequestInit
): Promise<ApiResponse<E>> {
const retries = localInit.retries ?? this.globalInit.retries;

if (retries > 0) {
const routeId = definition.operationId;

return retry(
async (attempt) => {
const res = await this.requestWithFallbacks(definition, args, localInit);
if (!res.ok && attempt <= retries) {
throw await res.error();
}
return res;
},
{
retries,
retryDelay: localInit.retryDelay ?? this.globalInit.retryDelay,
signal: this.globalInit.signal ?? undefined,
onRetry: (e, attempt) => {
this.logger?.debug("Retrying request", {routeId, attempt, lastError: e.message});
},
}
);
} else {
return this.requestWithFallbacks(definition, args, localInit);
}
}

private async requestWithFallbacks<E extends Endpoint>(
definition: RouteDefinitionExtra<E>,
args: E["args"],
localInit: ApiRequestInit
): Promise<ApiResponse<E>> {
// Early return when no fallback URLs are setup
if (this.urlsInits.length === 1) {
Expand Down Expand Up @@ -200,33 +236,31 @@ export class HttpClient implements IHttpClient {
throw Error("loop ended without return or rejection");
}

async _request<E extends Endpoint>(
private async _request<E extends Endpoint>(
definition: RouteDefinitionExtra<E>,
args: E["args"],
localInit: ApiRequestInit,
urlIndex = 0
): Promise<ApiResponse<E>> {
const globalInit = this.urlsInits[urlIndex];
if (globalInit === undefined) {
const urlInit = this.urlsInits[urlIndex];
if (urlInit === undefined) {
throw new Error(`Url at index ${urlIndex} does not exist`);
}
const init = {
...globalInit,
...urlInit,
...localInit,
headers: mergeHeaders(globalInit.headers, localInit.headers),
headers: mergeHeaders(urlInit.headers, localInit.headers),
};

// Implement fetch timeout
const controller = new AbortController();
const timeout = setTimeout(() => controller.abort(), init.timeoutMs);
init.signal = controller.signal;

// Attach global/local signal to this request's controller
const onGlobalSignalAbort = (): void => controller.abort();
const signalGlobal = globalInit.signal;
const signalLocal = localInit.signal;
signalGlobal?.addEventListener("abort", onGlobalSignalAbort);
signalLocal?.addEventListener("abort", onGlobalSignalAbort);
// Attach global/url/local signal to this request's controller
const onSignalAbort = (): void => controller.abort();
const abortSignals = [this.globalInit.signal, urlInit.signal, localInit.signal];
abortSignals.forEach((s) => s?.addEventListener("abort", onSignalAbort));

const routeId = definition.operationId;
const timer = this.metrics?.requestTime.startTimer({routeId});
Expand Down Expand Up @@ -255,7 +289,7 @@ export class HttpClient implements IHttpClient {
this.metrics?.requestErrors.inc({routeId, baseUrl: init.baseUrl});

if (isAbortedError(e)) {
if (signalGlobal?.aborted || signalLocal?.aborted) {
if (abortSignals.some((s) => s?.aborted)) {
throw new ErrorAborted(`${definition.operationId} request`);
} else if (controller.signal.aborted) {
throw new TimeoutError(`${definition.operationId} request`);
Expand All @@ -269,8 +303,7 @@ export class HttpClient implements IHttpClient {
timer?.();

clearTimeout(timeout);
signalGlobal?.removeEventListener("abort", onGlobalSignalAbort);
signalLocal?.removeEventListener("abort", onGlobalSignalAbort);
abortSignals.forEach((s) => s?.removeEventListener("abort", onSignalAbort));
}
}
}
Expand Down
2 changes: 2 additions & 0 deletions packages/api/src/utils/client/request.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ export type ExtraRequestInit = {
requestWireFormat?: WireFormat;
responseWireFormat?: WireFormat;
timeoutMs?: number;
retries?: number;
retryDelay?: number;
};

export type OptionalRequestInit = {
Expand Down

0 comments on commit a94b49f

Please sign in to comment.