Skip to content

Commit

Permalink
Add support for eth1 provider state
Browse files Browse the repository at this point in the history
  • Loading branch information
nazarhussain committed Sep 11, 2023
1 parent bbfdcb4 commit 9a519ad
Show file tree
Hide file tree
Showing 6 changed files with 178 additions and 73 deletions.
4 changes: 3 additions & 1 deletion packages/beacon-node/src/eth1/eth1DepositDataTracker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,7 @@ export class Eth1DepositDataTracker {

while (!this.signal.aborted) {
lastRunMs = Date.now();
const oldState = this.eth1Provider.getState();

try {
const hasCaughtUp = await this.update();
Expand All @@ -179,7 +180,8 @@ export class Eth1DepositDataTracker {
if (e instanceof HttpRpcError && e.status === 429) {
this.logger.debug("Eth1 provider rate limited", {}, e);
await sleep(RATE_LIMITED_WAIT_MS, this.signal);
} else if (!isErrorAborted(e)) {
// only log error if state switched from online to some other state
} else if (!isErrorAborted(e) && this.eth1Provider.getState() !== oldState) {
this.logger.error("Error updating eth1 chain cache", {}, e as Error);
await sleep(MIN_WAIT_ON_ERROR_MS, this.signal);
}
Expand Down
8 changes: 8 additions & 0 deletions packages/beacon-node/src/eth1/interface.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,14 @@ export interface IEth1Provider {
getBlocksByNumber(fromBlock: number, toBlock: number): Promise<EthJsonRpcBlockRaw[]>;
getDepositEvents(fromBlock: number, toBlock: number): Promise<phase0.DepositEvent[]>;
validateContract(): Promise<void>;
getState(): Eth1ProviderState;
}

export enum Eth1ProviderState {
ONLINE = "ONLINE",
OFFLINE = "OFFLINE",
ERROR = "ERROR",
AUTH_FAILED = "AUTH_FAILED",
}

export type Eth1DataAndDeposits = {
Expand Down
47 changes: 44 additions & 3 deletions packages/beacon-node/src/eth1/provider/eth1Provider.ts
Original file line number Diff line number Diff line change
@@ -1,15 +1,24 @@
import {toHexString} from "@chainsafe/ssz";
import {phase0} from "@lodestar/types";
import {ChainConfig} from "@lodestar/config";
import {fromHex} from "@lodestar/utils";
import {fromHex, isErrorAborted} from "@lodestar/utils";

import {FetchError, isFetchError} from "@lodestar/api";
import {linspace} from "../../util/numpy.js";
import {depositEventTopics, parseDepositLog} from "../utils/depositContract.js";
import {Eth1Block, IEth1Provider} from "../interface.js";
import {Eth1Block, Eth1ProviderState, IEth1Provider} from "../interface.js";
import {DEFAULT_PROVIDER_URLS, Eth1Options} from "../options.js";
import {isValidAddress} from "../../util/address.js";
import {EthJsonRpcBlockRaw} from "../interface.js";
import {JsonRpcHttpClient, JsonRpcHttpClientMetrics, ReqOpts} from "./jsonRpcHttpClient.js";
import {HTTP_CONNECTION_ERROR_CODES, HTTP_FATAL_ERROR_CODES} from "../../execution/engine/utils.js";
import {
ErrorJsonRpcResponse,
HttpRpcError,
JsonRpcHttpClient,
JsonRpcHttpClientEvent,
JsonRpcHttpClientMetrics,
ReqOpts,
} from "./jsonRpcHttpClient.js";
import {isJsonRpcTruncatedError, quantityToNum, numToQuantity, dataToBytes} from "./utils.js";

/* eslint-disable @typescript-eslint/naming-convention */
Expand Down Expand Up @@ -46,6 +55,8 @@ export class Eth1Provider implements IEth1Provider {
readonly deployBlock: number;
private readonly depositContractAddress: string;
private readonly rpc: JsonRpcHttpClient;
// The default state is ONLINE, it will be updated to offline if we receive a http error
private state: Eth1ProviderState = Eth1ProviderState.ONLINE;

constructor(
config: Pick<ChainConfig, "DEPOSIT_CONTRACT_ADDRESS">,
Expand All @@ -62,6 +73,36 @@ export class Eth1Provider implements IEth1Provider {
jwtSecret: opts.jwtSecretHex ? fromHex(opts.jwtSecretHex) : undefined,
metrics: metrics,
});

this.rpc.emitter.on(JsonRpcHttpClientEvent.RESPONSE, () => {
this.state = Eth1ProviderState.ONLINE;
});

this.rpc.emitter.on(JsonRpcHttpClientEvent.ERROR, ({error}) => {
if (isErrorAborted(error)) {
this.state = Eth1ProviderState.ONLINE;
return;
}

if ((error as unknown) instanceof HttpRpcError || (error as unknown) instanceof ErrorJsonRpcResponse) {
this.state = Eth1ProviderState.ERROR;
return;
}

if (error && isFetchError(error) && HTTP_FATAL_ERROR_CODES.includes((error as FetchError).code)) {
this.state = Eth1ProviderState.OFFLINE;
return;
}

if (error && isFetchError(error) && HTTP_CONNECTION_ERROR_CODES.includes((error as FetchError).code)) {
this.state = Eth1ProviderState.AUTH_FAILED;
return;
}
});
}

getState(): Eth1ProviderState {
return this.state;
}

async validateContract(): Promise<void> {
Expand Down
125 changes: 83 additions & 42 deletions packages/beacon-node/src/eth1/provider/jsonRpcHttpClient.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,33 @@
import {EventEmitter} from "events";
import StrictEventEmitter from "strict-event-emitter-types";
import {fetch} from "@lodestar/api";
import {ErrorAborted, TimeoutError, isValidHttpUrl, retry} from "@lodestar/utils";
import {IGauge, IHistogram} from "../../metrics/interface.js";
import {IJson, RpcPayload} from "../interface.js";
import {encodeJwtToken} from "./jwt.js";

export const enum JsonRpcHttpClientEvent {
/**
* When registered this event will be emitted before the client throws an error.
* This is useful for defining the error behavior in a common place at the time of declaration of the client.
*/
ERROR = "jsonRpcHttpClient:error",
/**
* When registered this event will be emitted before the client returns the valid response to the caller.
* This is useful for defining some common behavior for each request/response cycle
*/
RESPONSE = "jsonRpcHttpClient:response",
}

export type JsonRpcHttpClientEvents = {
[JsonRpcHttpClientEvent.ERROR]: (event: {payload?: unknown; error: Error}) => void;
[JsonRpcHttpClientEvent.RESPONSE]: (event: {payload?: unknown; response: unknown}) => void;
};

export class JsonRpcHttpClientEventEmitter extends (EventEmitter as {
new (): StrictEventEmitter<EventEmitter, JsonRpcHttpClientEvents>;
}) {}

/**
* Limits the amount of response text printed with RPC or parsing errors
*/
Expand Down Expand Up @@ -46,6 +71,7 @@ export interface IJsonRpcHttpClient {
fetch<R, P = IJson[]>(payload: RpcPayload<P>, opts?: ReqOpts): Promise<R>;
fetchWithRetries<R, P = IJson[]>(payload: RpcPayload<P>, opts?: ReqOpts): Promise<R>;
fetchBatch<R>(rpcPayloadArr: RpcPayload[], opts?: ReqOpts): Promise<R[]>;
emitter: JsonRpcHttpClientEventEmitter;
}

export class JsonRpcHttpClient implements IJsonRpcHttpClient {
Expand All @@ -58,6 +84,7 @@ export class JsonRpcHttpClient implements IJsonRpcHttpClient {
*/
private readonly jwtSecret?: Uint8Array;
private readonly metrics: JsonRpcHttpClientMetrics | null;
readonly emitter = new JsonRpcHttpClientEventEmitter();

constructor(
private readonly urls: string[],
Expand Down Expand Up @@ -107,58 +134,80 @@ export class JsonRpcHttpClient implements IJsonRpcHttpClient {
* Perform RPC request
*/
async fetch<R, P = IJson[]>(payload: RpcPayload<P>, opts?: ReqOpts): Promise<R> {
const res: RpcResponse<R> = await this.fetchJson({jsonrpc: "2.0", id: this.id++, ...payload}, opts);
return parseRpcResponse(res, payload);
return this.wrapWithEvents(
async () => {
const res: RpcResponse<R> = await this.fetchJson({jsonrpc: "2.0", id: this.id++, ...payload}, opts);
return parseRpcResponse(res, payload);
},
{payload}
);
}

/**
* Perform RPC request with retry
*/
async fetchWithRetries<R, P = IJson[]>(payload: RpcPayload<P>, opts?: ReqOpts): Promise<R> {
const routeId = opts?.routeId ?? "unknown";

const res = await retry<RpcResponse<R>>(
async (attempt) => {
/** If this is a retry, increment the retry counter for this method */
if (attempt > 1) {
this.opts?.metrics?.retryCount.inc({routeId});
return this.wrapWithEvents(async () => {
const routeId = opts?.routeId ?? "unknown";

const res = await retry<RpcResponse<R>>(
async (attempt) => {
/** If this is a retry, increment the retry counter for this method */
if (attempt > 1) {
this.opts?.metrics?.retryCount.inc({routeId});
}
return this.fetchJson({jsonrpc: "2.0", id: this.id++, ...payload}, opts);
},
{
retries: opts?.retryAttempts ?? this.opts?.retryAttempts ?? 1,
retryDelay: opts?.retryDelay ?? this.opts?.retryDelay ?? 0,
shouldRetry: opts?.shouldRetry,
}
return this.fetchJson({jsonrpc: "2.0", id: this.id++, ...payload}, opts);
},
{
retries: opts?.retryAttempts ?? this.opts?.retryAttempts ?? 1,
retryDelay: opts?.retryDelay ?? this.opts?.retryDelay ?? 0,
shouldRetry: opts?.shouldRetry,
}
);
return parseRpcResponse(res, payload);
);
return parseRpcResponse(res, payload);
}, payload);
}

/**
* Perform RPC batched request
* Type-wise assumes all requests results have the same type
*/
async fetchBatch<R>(rpcPayloadArr: RpcPayload[], opts?: ReqOpts): Promise<R[]> {
if (rpcPayloadArr.length === 0) return [];

const resArr: RpcResponse<R>[] = await this.fetchJson(
rpcPayloadArr.map(({method, params}) => ({jsonrpc: "2.0", method, params, id: this.id++})),
opts
);
return this.wrapWithEvents(async () => {
if (rpcPayloadArr.length === 0) return [];

const resArr: RpcResponse<R>[] = await this.fetchJson(
rpcPayloadArr.map(({method, params}) => ({jsonrpc: "2.0", method, params, id: this.id++})),
opts
);

if (!Array.isArray(resArr)) {
// Nethermind may reply to batch request with a JSON RPC error
if ((resArr as RpcResponseError).error !== undefined) {
throw new ErrorJsonRpcResponse(resArr as RpcResponseError, "batch");
}

if (!Array.isArray(resArr)) {
// Nethermind may reply to batch request with a JSON RPC error
if ((resArr as RpcResponseError).error !== undefined) {
throw new ErrorJsonRpcResponse(resArr as RpcResponseError, "batch");
throw Error(`expected array of results, got ${resArr} - ${jsonSerializeTry(resArr)}`);
}

throw Error(`expected array of results, got ${resArr} - ${jsonSerializeTry(resArr)}`);
}
return resArr.map((res, i) => parseRpcResponse(res, rpcPayloadArr[i]));
}, rpcPayloadArr);
}

return resArr.map((res, i) => parseRpcResponse(res, rpcPayloadArr[i]));
private async wrapWithEvents<T>(func: () => Promise<T>, payload?: unknown): Promise<T> {
try {
const response = await func();
this.emitter.emit(JsonRpcHttpClientEvent.RESPONSE, {payload, response});
return response;
} catch (error) {
this.emitter.emit(JsonRpcHttpClientEvent.ERROR, {payload, error: error as Error});
throw error;
}
}

private async fetchJson<R, T = unknown>(json: T, opts?: ReqOpts): Promise<R> {
if (this.urls.length === 0) throw Error("No url provided");

const routeId = opts?.routeId ?? "unknown";
let lastError: Error | null = null;

Expand All @@ -170,21 +219,13 @@ export class JsonRpcHttpClient implements IJsonRpcHttpClient {
try {
return await this.fetchJsonOneUrl<R, T>(this.urls[i], json, opts);
} catch (e) {
lastError = e as Error;
if (this.opts?.shouldNotFallback?.(e as Error)) {
throw e;
break;
}

lastError = e as Error;
}
}

if (lastError !== null) {
throw lastError;
} else if (this.urls.length === 0) {
throw Error("No url provided");
} else {
throw Error("Unknown error");
}
throw lastError ?? Error("Unknown error");
}

/**
Expand Down
30 changes: 11 additions & 19 deletions packages/beacon-node/src/execution/engine/http.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,13 @@ import {
ErrorJsonRpcResponse,
HttpRpcError,
IJsonRpcHttpClient,
JsonRpcHttpClientEvent,
ReqOpts,
} from "../../eth1/provider/jsonRpcHttpClient.js";
import {Metrics} from "../../metrics/index.js";
import {JobItemQueue} from "../../util/queue/index.js";
import {EPOCHS_PER_BATCH} from "../../sync/constants.js";
import {numToQuantity} from "../../eth1/provider/utils.js";
import {IJson, RpcPayload} from "../../eth1/interface.js";
import {
ExecutionPayloadStatus,
ExecutePayloadResponse,
Expand Down Expand Up @@ -110,7 +110,7 @@ export class ExecutionEngineHttp implements IExecutionEngine {
private readonly rpcFetchQueue: JobItemQueue<[EngineRequest], EngineResponse>;

private jobQueueProcessor = async ({method, params, methodOpts}: EngineRequest): Promise<EngineResponse> => {
return this.fetchWithRetries<EngineApiRpcReturnTypes[typeof method], EngineApiRpcParamTypes[typeof method]>(
return this.rpc.fetchWithRetries<EngineApiRpcReturnTypes[typeof method], EngineApiRpcParamTypes[typeof method]>(
{method, params},
methodOpts
);
Expand All @@ -126,22 +126,14 @@ export class ExecutionEngineHttp implements IExecutionEngine {
metrics?.engineHttpProcessorQueue
);
this.logger = logger;
}

protected async fetchWithRetries<R, P = IJson[]>(payload: RpcPayload<P>, opts?: ReqOpts): Promise<R> {
try {
const res = await this.rpc.fetchWithRetries<R, P>(payload, opts);
this.rpc.emitter.on(JsonRpcHttpClientEvent.ERROR, ({error}) => {
this.updateEngineState(getExecutionEngineState({payloadError: error, oldState: this.state}));
});

this.rpc.emitter.on(JsonRpcHttpClientEvent.RESPONSE, () => {
this.updateEngineState(getExecutionEngineState({targetState: ExecutionEngineState.ONLINE, oldState: this.state}));
return res;
} catch (err) {
this.updateEngineState(getExecutionEngineState({payloadError: err, oldState: this.state}));

/*
* TODO: For some error cases as abort, we may not want to escalate the error to the caller
* But for now the higher level code handles such cases so we can just rethrow the error
*/
throw err;
}
});
}

/**
Expand Down Expand Up @@ -370,7 +362,7 @@ export class ExecutionEngineHttp implements IExecutionEngine {
: ForkSeq[fork] >= ForkSeq.capella
? "engine_getPayloadV2"
: "engine_getPayloadV1";
const payloadResponse = await this.fetchWithRetries<
const payloadResponse = await this.rpc.fetchWithRetries<
EngineApiRpcReturnTypes[typeof method],
EngineApiRpcParamTypes[typeof method]
>(
Expand All @@ -390,7 +382,7 @@ export class ExecutionEngineHttp implements IExecutionEngine {
async getPayloadBodiesByHash(blockHashes: RootHex[]): Promise<(ExecutionPayloadBody | null)[]> {
const method = "engine_getPayloadBodiesByHashV1";
assertReqSizeLimit(blockHashes.length, 32);
const response = await this.fetchWithRetries<
const response = await this.rpc.fetchWithRetries<
EngineApiRpcReturnTypes[typeof method],
EngineApiRpcParamTypes[typeof method]
>({method, params: blockHashes});
Expand All @@ -405,7 +397,7 @@ export class ExecutionEngineHttp implements IExecutionEngine {
assertReqSizeLimit(blockCount, 32);
const start = numToQuantity(startBlockNumber);
const count = numToQuantity(blockCount);
const response = await this.fetchWithRetries<
const response = await this.rpc.fetchWithRetries<
EngineApiRpcReturnTypes[typeof method],
EngineApiRpcParamTypes[typeof method]
>({method, params: [start, count]});
Expand Down
Loading

0 comments on commit 9a519ad

Please sign in to comment.