Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: improve jsonRPC error UX for eth1 + execution #5949

Merged
merged 10 commits into from
Sep 29, 2023
2 changes: 1 addition & 1 deletion packages/beacon-node/src/chain/initState.ts
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ export async function initStateFromEth1({

const builder = new GenesisBuilder({
config,
eth1Provider: new Eth1Provider(config, opts, signal),
eth1Provider: new Eth1Provider(config, {...opts, logger}, signal),
logger,
signal,
pendingStatus:
Expand Down
9 changes: 5 additions & 4 deletions packages/beacon-node/src/eth1/eth1DepositDataTracker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,7 @@ export class Eth1DepositDataTracker {

while (!this.signal.aborted) {
lastRunMs = Date.now();
const oldState = this.eth1Provider.getState();

try {
const hasCaughtUp = await this.update();
Expand All @@ -175,16 +176,16 @@ export class Eth1DepositDataTracker {
await sleep(sleepTimeMs, this.signal);
}
} catch (e) {
this.metrics?.eth1.depositTrackerUpdateErrors.inc(1);

// From Infura: 429 Too Many Requests
if (e instanceof HttpRpcError && e.status === 429) {
this.logger.debug("Eth1 provider rate limited", {}, e);
await sleep(RATE_LIMITED_WAIT_MS, this.signal);
} else if (!isErrorAborted(e)) {
this.logger.error("Error updating eth1 chain cache", {}, e as Error);
// only log error if state switched from online to some other state
} else if (!isErrorAborted(e) && this.eth1Provider.getState() !== oldState) {
nazarhussain marked this conversation as resolved.
Show resolved Hide resolved
await sleep(MIN_WAIT_ON_ERROR_MS, this.signal);
}

this.metrics?.eth1.depositTrackerUpdateErrors.inc(1);
}
}
}
Expand Down
8 changes: 7 additions & 1 deletion packages/beacon-node/src/eth1/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,13 @@ export class Eth1ForBlockProduction implements IEth1ForBlockProduction {
modules: Eth1DepositDataTrackerModules & Eth1MergeBlockTrackerModules & {eth1Provider?: IEth1Provider}
) {
const eth1Provider =
modules.eth1Provider || new Eth1Provider(modules.config, opts, modules.signal, modules.metrics?.eth1HttpClient);
modules.eth1Provider ||
new Eth1Provider(
modules.config,
{...opts, logger: modules.logger},
modules.signal,
modules.metrics?.eth1HttpClient
);

this.eth1DepositDataTracker = opts.disableEth1DepositDataTracker
? null
Expand Down
8 changes: 8 additions & 0 deletions packages/beacon-node/src/eth1/interface.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,14 @@ export interface IEth1Provider {
getBlocksByNumber(fromBlock: number, toBlock: number): Promise<EthJsonRpcBlockRaw[]>;
getDepositEvents(fromBlock: number, toBlock: number): Promise<phase0.DepositEvent[]>;
validateContract(): Promise<void>;
getState(): Eth1ProviderState;
}

export enum Eth1ProviderState {
ONLINE = "ONLINE",
OFFLINE = "OFFLINE",
ERROR = "ERROR",
AUTH_FAILED = "AUTH_FAILED",
}

export type Eth1DataAndDeposits = {
Expand Down
62 changes: 58 additions & 4 deletions packages/beacon-node/src/eth1/provider/eth1Provider.ts
Original file line number Diff line number Diff line change
@@ -1,15 +1,25 @@
import {toHexString} from "@chainsafe/ssz";
import {phase0} from "@lodestar/types";
import {ChainConfig} from "@lodestar/config";
import {fromHex} from "@lodestar/utils";
import {fromHex, isErrorAborted, waitForElapsedTime} from "@lodestar/utils";
import {Logger} from "@lodestar/logger";

import {FetchError, isFetchError} from "@lodestar/api";
import {linspace} from "../../util/numpy.js";
import {depositEventTopics, parseDepositLog} from "../utils/depositContract.js";
import {Eth1Block, IEth1Provider} from "../interface.js";
import {Eth1Block, Eth1ProviderState, IEth1Provider} from "../interface.js";
import {DEFAULT_PROVIDER_URLS, Eth1Options} from "../options.js";
import {isValidAddress} from "../../util/address.js";
import {EthJsonRpcBlockRaw} from "../interface.js";
import {JsonRpcHttpClient, JsonRpcHttpClientMetrics, ReqOpts} from "./jsonRpcHttpClient.js";
import {HTTP_CONNECTION_ERROR_CODES, HTTP_FATAL_ERROR_CODES} from "../../execution/engine/utils.js";
import {
ErrorJsonRpcResponse,
HttpRpcError,
JsonRpcHttpClient,
JsonRpcHttpClientEvent,
JsonRpcHttpClientMetrics,
ReqOpts,
} from "./jsonRpcHttpClient.js";
import {isJsonRpcTruncatedError, quantityToNum, numToQuantity, dataToBytes} from "./utils.js";

/* eslint-disable @typescript-eslint/naming-convention */
Expand Down Expand Up @@ -42,17 +52,23 @@ const getBlockByHashOpts: ReqOpts = {routeId: "getBlockByHash"};
const getBlockNumberOpts: ReqOpts = {routeId: "getBlockNumber"};
const getLogsOpts: ReqOpts = {routeId: "getLogs"};

const ifOneMinutePassed = waitForElapsedTime({minElapsedTime: 60_000});

export class Eth1Provider implements IEth1Provider {
readonly deployBlock: number;
private readonly depositContractAddress: string;
private readonly rpc: JsonRpcHttpClient;
// The default state is ONLINE, it will be updated to offline if we receive a http error
private state: Eth1ProviderState = Eth1ProviderState.ONLINE;
private logger?: Logger;

constructor(
config: Pick<ChainConfig, "DEPOSIT_CONTRACT_ADDRESS">,
opts: Pick<Eth1Options, "depositContractDeployBlock" | "providerUrls" | "jwtSecretHex">,
opts: Pick<Eth1Options, "depositContractDeployBlock" | "providerUrls" | "jwtSecretHex"> & {logger?: Logger},
signal?: AbortSignal,
metrics?: JsonRpcHttpClientMetrics | null
) {
this.logger = opts.logger;
this.deployBlock = opts.depositContractDeployBlock ?? 0;
this.depositContractAddress = toHexString(config.DEPOSIT_CONTRACT_ADDRESS);
this.rpc = new JsonRpcHttpClient(opts.providerUrls ?? DEFAULT_PROVIDER_URLS, {
Expand All @@ -62,6 +78,44 @@ export class Eth1Provider implements IEth1Provider {
jwtSecret: opts.jwtSecretHex ? fromHex(opts.jwtSecretHex) : undefined,
metrics: metrics,
});

this.rpc.emitter.on(JsonRpcHttpClientEvent.RESPONSE, () => {
const oldState = this.state;
this.state = Eth1ProviderState.ONLINE;

if (oldState !== Eth1ProviderState.ONLINE) {
this.logger?.info("Eth1Provider is back online", {oldState, newState: this.state});
}
});

this.rpc.emitter.on(JsonRpcHttpClientEvent.ERROR, ({error}) => {
if (isErrorAborted(error)) {
this.state = Eth1ProviderState.ONLINE;
} else if ((error as unknown) instanceof HttpRpcError || (error as unknown) instanceof ErrorJsonRpcResponse) {
this.state = Eth1ProviderState.ERROR;
} else if (error && isFetchError(error) && HTTP_FATAL_ERROR_CODES.includes((error as FetchError).code)) {
this.state = Eth1ProviderState.OFFLINE;
} else if (error && isFetchError(error) && HTTP_CONNECTION_ERROR_CODES.includes((error as FetchError).code)) {
this.state = Eth1ProviderState.AUTH_FAILED;
}

if (this.state !== Eth1ProviderState.ONLINE) {
ifOneMinutePassed(({msSinceLastCall, now}) => {
this.logger?.error(
"Eth1Provider faced error",
{
state: this.state,
lastErrorAt: msSinceLastCall !== undefined ? new Date(now - msSinceLastCall).toLocaleTimeString() : "N/A",
},
error
);
});
}
});
}

getState(): Eth1ProviderState {
return this.state;
}

async validateContract(): Promise<void> {
Expand Down
125 changes: 83 additions & 42 deletions packages/beacon-node/src/eth1/provider/jsonRpcHttpClient.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,33 @@
import {EventEmitter} from "events";
import StrictEventEmitter from "strict-event-emitter-types";
import {fetch} from "@lodestar/api";
import {ErrorAborted, TimeoutError, isValidHttpUrl, retry} from "@lodestar/utils";
import {IGauge, IHistogram} from "../../metrics/interface.js";
import {IJson, RpcPayload} from "../interface.js";
import {encodeJwtToken} from "./jwt.js";

export enum JsonRpcHttpClientEvent {
/**
* When registered this event will be emitted before the client throws an error.
* This is useful for defining the error behavior in a common place at the time of declaration of the client.
*/
ERROR = "jsonRpcHttpClient:error",
/**
* When registered this event will be emitted before the client returns the valid response to the caller.
* This is useful for defining some common behavior for each request/response cycle
*/
RESPONSE = "jsonRpcHttpClient:response",
}

export type JsonRpcHttpClientEvents = {
[JsonRpcHttpClientEvent.ERROR]: (event: {payload?: unknown; error: Error}) => void;
[JsonRpcHttpClientEvent.RESPONSE]: (event: {payload?: unknown; response: unknown}) => void;
};

export class JsonRpcHttpClientEventEmitter extends (EventEmitter as {
new (): StrictEventEmitter<EventEmitter, JsonRpcHttpClientEvents>;
}) {}

/**
* Limits the amount of response text printed with RPC or parsing errors
*/
Expand Down Expand Up @@ -46,6 +71,7 @@ export interface IJsonRpcHttpClient {
fetch<R, P = IJson[]>(payload: RpcPayload<P>, opts?: ReqOpts): Promise<R>;
fetchWithRetries<R, P = IJson[]>(payload: RpcPayload<P>, opts?: ReqOpts): Promise<R>;
fetchBatch<R>(rpcPayloadArr: RpcPayload[], opts?: ReqOpts): Promise<R[]>;
emitter: JsonRpcHttpClientEventEmitter;
}

export class JsonRpcHttpClient implements IJsonRpcHttpClient {
Expand All @@ -58,6 +84,7 @@ export class JsonRpcHttpClient implements IJsonRpcHttpClient {
*/
private readonly jwtSecret?: Uint8Array;
private readonly metrics: JsonRpcHttpClientMetrics | null;
readonly emitter = new JsonRpcHttpClientEventEmitter();

constructor(
private readonly urls: string[],
Expand Down Expand Up @@ -107,58 +134,80 @@ export class JsonRpcHttpClient implements IJsonRpcHttpClient {
* Perform RPC request
*/
async fetch<R, P = IJson[]>(payload: RpcPayload<P>, opts?: ReqOpts): Promise<R> {
const res: RpcResponse<R> = await this.fetchJson({jsonrpc: "2.0", id: this.id++, ...payload}, opts);
return parseRpcResponse(res, payload);
return this.wrapWithEvents(
nazarhussain marked this conversation as resolved.
Show resolved Hide resolved
async () => {
const res: RpcResponse<R> = await this.fetchJson({jsonrpc: "2.0", id: this.id++, ...payload}, opts);
return parseRpcResponse(res, payload);
},
{payload}
);
}

/**
* Perform RPC request with retry
*/
async fetchWithRetries<R, P = IJson[]>(payload: RpcPayload<P>, opts?: ReqOpts): Promise<R> {
const routeId = opts?.routeId ?? "unknown";

const res = await retry<RpcResponse<R>>(
async (attempt) => {
/** If this is a retry, increment the retry counter for this method */
if (attempt > 1) {
this.opts?.metrics?.retryCount.inc({routeId});
return this.wrapWithEvents(async () => {
const routeId = opts?.routeId ?? "unknown";

const res = await retry<RpcResponse<R>>(
async (attempt) => {
/** If this is a retry, increment the retry counter for this method */
if (attempt > 1) {
this.opts?.metrics?.retryCount.inc({routeId});
}
return this.fetchJson({jsonrpc: "2.0", id: this.id++, ...payload}, opts);
},
{
retries: opts?.retryAttempts ?? this.opts?.retryAttempts ?? 1,
retryDelay: opts?.retryDelay ?? this.opts?.retryDelay ?? 0,
shouldRetry: opts?.shouldRetry,
}
return this.fetchJson({jsonrpc: "2.0", id: this.id++, ...payload}, opts);
},
{
retries: opts?.retryAttempts ?? this.opts?.retryAttempts ?? 1,
retryDelay: opts?.retryDelay ?? this.opts?.retryDelay ?? 0,
shouldRetry: opts?.shouldRetry,
}
);
return parseRpcResponse(res, payload);
);
return parseRpcResponse(res, payload);
}, payload);
}

/**
* Perform RPC batched request
* Type-wise assumes all requests results have the same type
*/
async fetchBatch<R>(rpcPayloadArr: RpcPayload[], opts?: ReqOpts): Promise<R[]> {
if (rpcPayloadArr.length === 0) return [];

const resArr: RpcResponse<R>[] = await this.fetchJson(
rpcPayloadArr.map(({method, params}) => ({jsonrpc: "2.0", method, params, id: this.id++})),
opts
);
return this.wrapWithEvents(async () => {
if (rpcPayloadArr.length === 0) return [];

const resArr: RpcResponse<R>[] = await this.fetchJson(
rpcPayloadArr.map(({method, params}) => ({jsonrpc: "2.0", method, params, id: this.id++})),
opts
);

if (!Array.isArray(resArr)) {
// Nethermind may reply to batch request with a JSON RPC error
if ((resArr as RpcResponseError).error !== undefined) {
throw new ErrorJsonRpcResponse(resArr as RpcResponseError, "batch");
}

if (!Array.isArray(resArr)) {
// Nethermind may reply to batch request with a JSON RPC error
if ((resArr as RpcResponseError).error !== undefined) {
throw new ErrorJsonRpcResponse(resArr as RpcResponseError, "batch");
throw Error(`expected array of results, got ${resArr} - ${jsonSerializeTry(resArr)}`);
}

throw Error(`expected array of results, got ${resArr} - ${jsonSerializeTry(resArr)}`);
}
return resArr.map((res, i) => parseRpcResponse(res, rpcPayloadArr[i]));
}, rpcPayloadArr);
}

return resArr.map((res, i) => parseRpcResponse(res, rpcPayloadArr[i]));
private async wrapWithEvents<T>(func: () => Promise<T>, payload?: unknown): Promise<T> {
try {
const response = await func();
this.emitter.emit(JsonRpcHttpClientEvent.RESPONSE, {payload, response});
return response;
} catch (error) {
this.emitter.emit(JsonRpcHttpClientEvent.ERROR, {payload, error: error as Error});
throw error;
}
}

private async fetchJson<R, T = unknown>(json: T, opts?: ReqOpts): Promise<R> {
if (this.urls.length === 0) throw Error("No url provided");
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I previously removed a similar check like this in #5884 as we already check the URLs in the constructor

if (urls.length === 0) {
throw Error("No urls provided to JsonRpcHttpClient");
}

I don't think this error is reachable but if it is we should make sure to validate the URLs earlier.


const routeId = opts?.routeId ?? "unknown";
let lastError: Error | null = null;

Expand All @@ -170,21 +219,13 @@ export class JsonRpcHttpClient implements IJsonRpcHttpClient {
try {
return await this.fetchJsonOneUrl<R, T>(this.urls[i], json, opts);
} catch (e) {
lastError = e as Error;
if (this.opts?.shouldNotFallback?.(e as Error)) {
throw e;
break;
}

lastError = e as Error;
}
}

if (lastError !== null) {
throw lastError;
} else if (this.urls.length === 0) {
throw Error("No url provided");
} else {
throw Error("Unknown error");
}
throw lastError ?? Error("Unknown error");
nazarhussain marked this conversation as resolved.
Show resolved Hide resolved
}

/**
Expand Down
Loading