From a94b49f7119648651e75e9de1be9ec9085ccfa6b Mon Sep 17 00:00:00 2001 From: Nico Flaig Date: Sat, 9 Mar 2024 19:49:17 +0100 Subject: [PATCH] Add add retry functionality to http client (from #6387) --- packages/api/src/utils/client/httpClient.ts | 79 +++++++++++++++------ packages/api/src/utils/client/request.ts | 2 + 2 files changed, 58 insertions(+), 23 deletions(-) diff --git a/packages/api/src/utils/client/httpClient.ts b/packages/api/src/utils/client/httpClient.ts index 7c2f1a956456..2ec479eb338f 100644 --- a/packages/api/src/utils/client/httpClient.ts +++ b/packages/api/src/utils/client/httpClient.ts @@ -1,4 +1,4 @@ -import {ErrorAborted, Logger, TimeoutError, isValidHttpUrl} from "@lodestar/utils"; +import {ErrorAborted, Logger, TimeoutError, isValidHttpUrl, retry} from "@lodestar/utils"; import {WireFormat, mergeHeaders} from "../headers.js"; import {Endpoint} from "../types.js"; import {ApiRequestInit, ApiRequestInitRequired, RouteDefinitionExtra, createApiRequest} from "./request.js"; @@ -8,6 +8,8 @@ import {fetch, isFetchError} from "./fetch.js"; /** A higher default timeout, validator will sets its own shorter timeoutMs */ const DEFAULT_TIMEOUT_MS = 60_000; +const DEFAULT_RETRIES = 0; +const DEFAULT_RETRY_DELAY = 200; const DEFAULT_REQUEST_WIRE_FORMAT = WireFormat.json; const DEFAULT_RESPONSE_WIRE_FORMAT = WireFormat.ssz; @@ -41,6 +43,7 @@ export type HttpClientModules = { export class HttpClient implements IHttpClient { readonly urlsInits: ApiRequestInitRequired[] = []; + readonly globalInit: ApiRequestInitRequired; private readonly fetch: typeof fetch; private readonly metrics: null | Metrics; @@ -55,23 +58,27 @@ export class HttpClient implements IHttpClient { constructor(opts: HttpClientOptions, {logger, metrics}: HttpClientModules = {}) { // Cast to all types optional since they are defined with syntax `HttpClientOptions = A | B` const {baseUrl, urls = []} = opts as {baseUrl?: string; urls?: (string | ApiRequestInit)[]}; - const globalInit = opts.globalInit; - const defaults = { + const defaults: ApiRequestInitRequired = { baseUrl: "", timeoutMs: DEFAULT_TIMEOUT_MS, + retries: DEFAULT_RETRIES, + retryDelay: DEFAULT_RETRY_DELAY, requestWireFormat: DEFAULT_REQUEST_WIRE_FORMAT, responseWireFormat: DEFAULT_RESPONSE_WIRE_FORMAT, }; + this.globalInit = { + ...defaults, + ...opts.globalInit, + }; // opts.baseUrl is equivalent to `urls: [{baseUrl}]` // unshift opts.baseUrl to urls, without mutating opts.urls for (const [i, urlOrInit] of [...(baseUrl ? [baseUrl] : []), ...urls].entries()) { const init = typeof urlOrInit === "string" ? {baseUrl: urlOrInit} : urlOrInit; const urlInit: ApiRequestInitRequired = { - ...defaults, - ...globalInit, + ...this.globalInit, ...init, - headers: mergeHeaders(globalInit?.headers, init.headers), + headers: mergeHeaders(this.globalInit.headers, init.headers), }; if (!urlInit.baseUrl) { @@ -106,13 +113,42 @@ export class HttpClient implements IHttpClient { } } - /** - * Request with possible retries - */ async request( definition: RouteDefinitionExtra, args: E["args"], localInit: ApiRequestInit + ): Promise> { + const retries = localInit.retries ?? this.globalInit.retries; + + if (retries > 0) { + const routeId = definition.operationId; + + return retry( + async (attempt) => { + const res = await this.requestWithFallbacks(definition, args, localInit); + if (!res.ok && attempt <= retries) { + throw await res.error(); + } + return res; + }, + { + retries, + retryDelay: localInit.retryDelay ?? this.globalInit.retryDelay, + signal: this.globalInit.signal ?? undefined, + onRetry: (e, attempt) => { + this.logger?.debug("Retrying request", {routeId, attempt, lastError: e.message}); + }, + } + ); + } else { + return this.requestWithFallbacks(definition, args, localInit); + } + } + + private async requestWithFallbacks( + definition: RouteDefinitionExtra, + args: E["args"], + localInit: ApiRequestInit ): Promise> { // Early return when no fallback URLs are setup if (this.urlsInits.length === 1) { @@ -200,20 +236,20 @@ export class HttpClient implements IHttpClient { throw Error("loop ended without return or rejection"); } - async _request( + private async _request( definition: RouteDefinitionExtra, args: E["args"], localInit: ApiRequestInit, urlIndex = 0 ): Promise> { - const globalInit = this.urlsInits[urlIndex]; - if (globalInit === undefined) { + const urlInit = this.urlsInits[urlIndex]; + if (urlInit === undefined) { throw new Error(`Url at index ${urlIndex} does not exist`); } const init = { - ...globalInit, + ...urlInit, ...localInit, - headers: mergeHeaders(globalInit.headers, localInit.headers), + headers: mergeHeaders(urlInit.headers, localInit.headers), }; // Implement fetch timeout @@ -221,12 +257,10 @@ export class HttpClient implements IHttpClient { const timeout = setTimeout(() => controller.abort(), init.timeoutMs); init.signal = controller.signal; - // Attach global/local signal to this request's controller - const onGlobalSignalAbort = (): void => controller.abort(); - const signalGlobal = globalInit.signal; - const signalLocal = localInit.signal; - signalGlobal?.addEventListener("abort", onGlobalSignalAbort); - signalLocal?.addEventListener("abort", onGlobalSignalAbort); + // Attach global/url/local signal to this request's controller + const onSignalAbort = (): void => controller.abort(); + const abortSignals = [this.globalInit.signal, urlInit.signal, localInit.signal]; + abortSignals.forEach((s) => s?.addEventListener("abort", onSignalAbort)); const routeId = definition.operationId; const timer = this.metrics?.requestTime.startTimer({routeId}); @@ -255,7 +289,7 @@ export class HttpClient implements IHttpClient { this.metrics?.requestErrors.inc({routeId, baseUrl: init.baseUrl}); if (isAbortedError(e)) { - if (signalGlobal?.aborted || signalLocal?.aborted) { + if (abortSignals.some((s) => s?.aborted)) { throw new ErrorAborted(`${definition.operationId} request`); } else if (controller.signal.aborted) { throw new TimeoutError(`${definition.operationId} request`); @@ -269,8 +303,7 @@ export class HttpClient implements IHttpClient { timer?.(); clearTimeout(timeout); - signalGlobal?.removeEventListener("abort", onGlobalSignalAbort); - signalLocal?.removeEventListener("abort", onGlobalSignalAbort); + abortSignals.forEach((s) => s?.removeEventListener("abort", onSignalAbort)); } } } diff --git a/packages/api/src/utils/client/request.ts b/packages/api/src/utils/client/request.ts index 59a39f4cb82d..0361bb5c8f64 100644 --- a/packages/api/src/utils/client/request.ts +++ b/packages/api/src/utils/client/request.ts @@ -14,6 +14,8 @@ export type ExtraRequestInit = { requestWireFormat?: WireFormat; responseWireFormat?: WireFormat; timeoutMs?: number; + retries?: number; + retryDelay?: number; }; export type OptionalRequestInit = {