Skip to content

Commit

Permalink
fix: improve jsonRPC error UX for eth1 + execution (#5949)
Browse files Browse the repository at this point in the history
* Add support for eth1 provider state

* Fix the tests types

* Fix transient enum const type error

* Add a utility for elapsed time

* Update the eth1 provider to check elapsed time

* Fix the typs

* Fix the condition for the error tracker

* Simplify the waitForElapsedTime

* Update the name for tracker

* Fix lint error
  • Loading branch information
nazarhussain authored Sep 29, 2023
1 parent 9cd65cc commit 3cfa9cd
Show file tree
Hide file tree
Showing 13 changed files with 265 additions and 82 deletions.
2 changes: 1 addition & 1 deletion packages/beacon-node/src/chain/initState.ts
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ export async function initStateFromEth1({

const builder = new GenesisBuilder({
config,
eth1Provider: new Eth1Provider(config, opts, signal),
eth1Provider: new Eth1Provider(config, {...opts, logger}, signal),
logger,
signal,
pendingStatus:
Expand Down
6 changes: 3 additions & 3 deletions packages/beacon-node/src/eth1/eth1DepositDataTracker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -175,16 +175,16 @@ export class Eth1DepositDataTracker {
await sleep(sleepTimeMs, this.signal);
}
} catch (e) {
this.metrics?.eth1.depositTrackerUpdateErrors.inc(1);

// From Infura: 429 Too Many Requests
if (e instanceof HttpRpcError && e.status === 429) {
this.logger.debug("Eth1 provider rate limited", {}, e);
await sleep(RATE_LIMITED_WAIT_MS, this.signal);
// only log error if state switched from online to some other state
} else if (!isErrorAborted(e)) {
this.logger.error("Error updating eth1 chain cache", {}, e as Error);
await sleep(MIN_WAIT_ON_ERROR_MS, this.signal);
}

this.metrics?.eth1.depositTrackerUpdateErrors.inc(1);
}
}
}
Expand Down
8 changes: 7 additions & 1 deletion packages/beacon-node/src/eth1/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,13 @@ export class Eth1ForBlockProduction implements IEth1ForBlockProduction {
modules: Eth1DepositDataTrackerModules & Eth1MergeBlockTrackerModules & {eth1Provider?: IEth1Provider}
) {
const eth1Provider =
modules.eth1Provider || new Eth1Provider(modules.config, opts, modules.signal, modules.metrics?.eth1HttpClient);
modules.eth1Provider ||
new Eth1Provider(
modules.config,
{...opts, logger: modules.logger},
modules.signal,
modules.metrics?.eth1HttpClient
);

this.eth1DepositDataTracker = opts.disableEth1DepositDataTracker
? null
Expand Down
8 changes: 8 additions & 0 deletions packages/beacon-node/src/eth1/interface.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,14 @@ export interface IEth1Provider {
getBlocksByNumber(fromBlock: number, toBlock: number): Promise<EthJsonRpcBlockRaw[]>;
getDepositEvents(fromBlock: number, toBlock: number): Promise<phase0.DepositEvent[]>;
validateContract(): Promise<void>;
getState(): Eth1ProviderState;
}

export enum Eth1ProviderState {
ONLINE = "ONLINE",
OFFLINE = "OFFLINE",
ERROR = "ERROR",
AUTH_FAILED = "AUTH_FAILED",
}

export type Eth1DataAndDeposits = {
Expand Down
62 changes: 58 additions & 4 deletions packages/beacon-node/src/eth1/provider/eth1Provider.ts
Original file line number Diff line number Diff line change
@@ -1,15 +1,25 @@
import {toHexString} from "@chainsafe/ssz";
import {phase0} from "@lodestar/types";
import {ChainConfig} from "@lodestar/config";
import {fromHex} from "@lodestar/utils";
import {fromHex, isErrorAborted, createElapsedTimeTracker} from "@lodestar/utils";
import {Logger} from "@lodestar/logger";

import {FetchError, isFetchError} from "@lodestar/api";
import {linspace} from "../../util/numpy.js";
import {depositEventTopics, parseDepositLog} from "../utils/depositContract.js";
import {Eth1Block, IEth1Provider} from "../interface.js";
import {Eth1Block, Eth1ProviderState, IEth1Provider} from "../interface.js";
import {DEFAULT_PROVIDER_URLS, Eth1Options} from "../options.js";
import {isValidAddress} from "../../util/address.js";
import {EthJsonRpcBlockRaw} from "../interface.js";
import {JsonRpcHttpClient, JsonRpcHttpClientMetrics, ReqOpts} from "./jsonRpcHttpClient.js";
import {HTTP_CONNECTION_ERROR_CODES, HTTP_FATAL_ERROR_CODES} from "../../execution/engine/utils.js";
import {
ErrorJsonRpcResponse,
HttpRpcError,
JsonRpcHttpClient,
JsonRpcHttpClientEvent,
JsonRpcHttpClientMetrics,
ReqOpts,
} from "./jsonRpcHttpClient.js";
import {isJsonRpcTruncatedError, quantityToNum, numToQuantity, dataToBytes} from "./utils.js";

/* eslint-disable @typescript-eslint/naming-convention */
Expand Down Expand Up @@ -42,17 +52,23 @@ const getBlockByHashOpts: ReqOpts = {routeId: "getBlockByHash"};
const getBlockNumberOpts: ReqOpts = {routeId: "getBlockNumber"};
const getLogsOpts: ReqOpts = {routeId: "getLogs"};

const isOneMinutePassed = createElapsedTimeTracker({minElapsedTime: 60_000});

export class Eth1Provider implements IEth1Provider {
readonly deployBlock: number;
private readonly depositContractAddress: string;
private readonly rpc: JsonRpcHttpClient;
// The default state is ONLINE, it will be updated to offline if we receive a http error
private state: Eth1ProviderState = Eth1ProviderState.ONLINE;
private logger?: Logger;

constructor(
config: Pick<ChainConfig, "DEPOSIT_CONTRACT_ADDRESS">,
opts: Pick<Eth1Options, "depositContractDeployBlock" | "providerUrls" | "jwtSecretHex">,
opts: Pick<Eth1Options, "depositContractDeployBlock" | "providerUrls" | "jwtSecretHex"> & {logger?: Logger},
signal?: AbortSignal,
metrics?: JsonRpcHttpClientMetrics | null
) {
this.logger = opts.logger;
this.deployBlock = opts.depositContractDeployBlock ?? 0;
this.depositContractAddress = toHexString(config.DEPOSIT_CONTRACT_ADDRESS);
this.rpc = new JsonRpcHttpClient(opts.providerUrls ?? DEFAULT_PROVIDER_URLS, {
Expand All @@ -62,6 +78,44 @@ export class Eth1Provider implements IEth1Provider {
jwtSecret: opts.jwtSecretHex ? fromHex(opts.jwtSecretHex) : undefined,
metrics: metrics,
});

this.rpc.emitter.on(JsonRpcHttpClientEvent.RESPONSE, () => {
const oldState = this.state;
this.state = Eth1ProviderState.ONLINE;

if (oldState !== Eth1ProviderState.ONLINE) {
this.logger?.info("Eth1Provider is back online", {oldState, newState: this.state});
}
});

this.rpc.emitter.on(JsonRpcHttpClientEvent.ERROR, ({error}) => {
if (isErrorAborted(error)) {
this.state = Eth1ProviderState.ONLINE;
} else if ((error as unknown) instanceof HttpRpcError || (error as unknown) instanceof ErrorJsonRpcResponse) {
this.state = Eth1ProviderState.ERROR;
} else if (error && isFetchError(error) && HTTP_FATAL_ERROR_CODES.includes((error as FetchError).code)) {
this.state = Eth1ProviderState.OFFLINE;
} else if (error && isFetchError(error) && HTTP_CONNECTION_ERROR_CODES.includes((error as FetchError).code)) {
this.state = Eth1ProviderState.AUTH_FAILED;
}

if (this.state !== Eth1ProviderState.ONLINE) {
if (isOneMinutePassed()) {
this.logger?.error(
"Eth1Provider faced error",
{
state: this.state,
lastErrorAt: new Date(Date.now() - isOneMinutePassed.msSinceLastCall).toLocaleTimeString(),
},
error
);
}
}
});
}

getState(): Eth1ProviderState {
return this.state;
}

async validateContract(): Promise<void> {
Expand Down
125 changes: 83 additions & 42 deletions packages/beacon-node/src/eth1/provider/jsonRpcHttpClient.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,33 @@
import {EventEmitter} from "events";
import StrictEventEmitter from "strict-event-emitter-types";
import {fetch} from "@lodestar/api";
import {ErrorAborted, TimeoutError, isValidHttpUrl, retry} from "@lodestar/utils";
import {IGauge, IHistogram} from "../../metrics/interface.js";
import {IJson, RpcPayload} from "../interface.js";
import {encodeJwtToken} from "./jwt.js";

export enum JsonRpcHttpClientEvent {
/**
* When registered this event will be emitted before the client throws an error.
* This is useful for defining the error behavior in a common place at the time of declaration of the client.
*/
ERROR = "jsonRpcHttpClient:error",
/**
* When registered this event will be emitted before the client returns the valid response to the caller.
* This is useful for defining some common behavior for each request/response cycle
*/
RESPONSE = "jsonRpcHttpClient:response",
}

export type JsonRpcHttpClientEvents = {
[JsonRpcHttpClientEvent.ERROR]: (event: {payload?: unknown; error: Error}) => void;
[JsonRpcHttpClientEvent.RESPONSE]: (event: {payload?: unknown; response: unknown}) => void;
};

export class JsonRpcHttpClientEventEmitter extends (EventEmitter as {
new (): StrictEventEmitter<EventEmitter, JsonRpcHttpClientEvents>;
}) {}

/**
* Limits the amount of response text printed with RPC or parsing errors
*/
Expand Down Expand Up @@ -46,6 +71,7 @@ export interface IJsonRpcHttpClient {
fetch<R, P = IJson[]>(payload: RpcPayload<P>, opts?: ReqOpts): Promise<R>;
fetchWithRetries<R, P = IJson[]>(payload: RpcPayload<P>, opts?: ReqOpts): Promise<R>;
fetchBatch<R>(rpcPayloadArr: RpcPayload[], opts?: ReqOpts): Promise<R[]>;
emitter: JsonRpcHttpClientEventEmitter;
}

export class JsonRpcHttpClient implements IJsonRpcHttpClient {
Expand All @@ -58,6 +84,7 @@ export class JsonRpcHttpClient implements IJsonRpcHttpClient {
*/
private readonly jwtSecret?: Uint8Array;
private readonly metrics: JsonRpcHttpClientMetrics | null;
readonly emitter = new JsonRpcHttpClientEventEmitter();

constructor(
private readonly urls: string[],
Expand Down Expand Up @@ -107,58 +134,80 @@ export class JsonRpcHttpClient implements IJsonRpcHttpClient {
* Perform RPC request
*/
async fetch<R, P = IJson[]>(payload: RpcPayload<P>, opts?: ReqOpts): Promise<R> {
const res: RpcResponse<R> = await this.fetchJson({jsonrpc: "2.0", id: this.id++, ...payload}, opts);
return parseRpcResponse(res, payload);
return this.wrapWithEvents(
async () => {
const res: RpcResponse<R> = await this.fetchJson({jsonrpc: "2.0", id: this.id++, ...payload}, opts);
return parseRpcResponse(res, payload);
},
{payload}
);
}

/**
* Perform RPC request with retry
*/
async fetchWithRetries<R, P = IJson[]>(payload: RpcPayload<P>, opts?: ReqOpts): Promise<R> {
const routeId = opts?.routeId ?? "unknown";

const res = await retry<RpcResponse<R>>(
async (attempt) => {
/** If this is a retry, increment the retry counter for this method */
if (attempt > 1) {
this.opts?.metrics?.retryCount.inc({routeId});
return this.wrapWithEvents(async () => {
const routeId = opts?.routeId ?? "unknown";

const res = await retry<RpcResponse<R>>(
async (attempt) => {
/** If this is a retry, increment the retry counter for this method */
if (attempt > 1) {
this.opts?.metrics?.retryCount.inc({routeId});
}
return this.fetchJson({jsonrpc: "2.0", id: this.id++, ...payload}, opts);
},
{
retries: opts?.retryAttempts ?? this.opts?.retryAttempts ?? 1,
retryDelay: opts?.retryDelay ?? this.opts?.retryDelay ?? 0,
shouldRetry: opts?.shouldRetry,
}
return this.fetchJson({jsonrpc: "2.0", id: this.id++, ...payload}, opts);
},
{
retries: opts?.retryAttempts ?? this.opts?.retryAttempts ?? 1,
retryDelay: opts?.retryDelay ?? this.opts?.retryDelay ?? 0,
shouldRetry: opts?.shouldRetry,
}
);
return parseRpcResponse(res, payload);
);
return parseRpcResponse(res, payload);
}, payload);
}

/**
* Perform RPC batched request
* Type-wise assumes all requests results have the same type
*/
async fetchBatch<R>(rpcPayloadArr: RpcPayload[], opts?: ReqOpts): Promise<R[]> {
if (rpcPayloadArr.length === 0) return [];

const resArr: RpcResponse<R>[] = await this.fetchJson(
rpcPayloadArr.map(({method, params}) => ({jsonrpc: "2.0", method, params, id: this.id++})),
opts
);
return this.wrapWithEvents(async () => {
if (rpcPayloadArr.length === 0) return [];

const resArr: RpcResponse<R>[] = await this.fetchJson(
rpcPayloadArr.map(({method, params}) => ({jsonrpc: "2.0", method, params, id: this.id++})),
opts
);

if (!Array.isArray(resArr)) {
// Nethermind may reply to batch request with a JSON RPC error
if ((resArr as RpcResponseError).error !== undefined) {
throw new ErrorJsonRpcResponse(resArr as RpcResponseError, "batch");
}

if (!Array.isArray(resArr)) {
// Nethermind may reply to batch request with a JSON RPC error
if ((resArr as RpcResponseError).error !== undefined) {
throw new ErrorJsonRpcResponse(resArr as RpcResponseError, "batch");
throw Error(`expected array of results, got ${resArr} - ${jsonSerializeTry(resArr)}`);
}

throw Error(`expected array of results, got ${resArr} - ${jsonSerializeTry(resArr)}`);
}
return resArr.map((res, i) => parseRpcResponse(res, rpcPayloadArr[i]));
}, rpcPayloadArr);
}

return resArr.map((res, i) => parseRpcResponse(res, rpcPayloadArr[i]));
private async wrapWithEvents<T>(func: () => Promise<T>, payload?: unknown): Promise<T> {
try {
const response = await func();
this.emitter.emit(JsonRpcHttpClientEvent.RESPONSE, {payload, response});
return response;
} catch (error) {
this.emitter.emit(JsonRpcHttpClientEvent.ERROR, {payload, error: error as Error});
throw error;
}
}

private async fetchJson<R, T = unknown>(json: T, opts?: ReqOpts): Promise<R> {
if (this.urls.length === 0) throw Error("No url provided");

const routeId = opts?.routeId ?? "unknown";
let lastError: Error | null = null;

Expand All @@ -170,21 +219,13 @@ export class JsonRpcHttpClient implements IJsonRpcHttpClient {
try {
return await this.fetchJsonOneUrl<R, T>(this.urls[i], json, opts);
} catch (e) {
lastError = e as Error;
if (this.opts?.shouldNotFallback?.(e as Error)) {
throw e;
break;
}

lastError = e as Error;
}
}

if (lastError !== null) {
throw lastError;
} else if (this.urls.length === 0) {
throw Error("No url provided");
} else {
throw Error("Unknown error");
}
throw lastError ?? Error("Unknown error");
}

/**
Expand Down
Loading

0 comments on commit 3cfa9cd

Please sign in to comment.