Skip to content

Commit

Permalink
Merge branch 'unstable' into nflaig/remove-redundant-error-listener
Browse files Browse the repository at this point in the history
  • Loading branch information
nflaig authored Jul 24, 2023
2 parents ec0a60f + 7b5fc63 commit 3ea4321
Show file tree
Hide file tree
Showing 44 changed files with 369 additions and 131 deletions.
11 changes: 9 additions & 2 deletions packages/api/src/beacon/server/events.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import {ChainForkConfig} from "@lodestar/config";
import {Api, ReqTypes, routesData, getEventSerdes} from "../routes/events.js";
import {ServerRoutes} from "../../utils/server/index.js";
import {Api, ReqTypes, routesData, getEventSerdes, eventTypes} from "../routes/events.js";
import {ApiError, ServerRoutes} from "../../utils/server/index.js";
import {ServerApi} from "../../interfaces.js";

export function getRoutes(config: ChainForkConfig, api: ServerApi<Api>): ServerRoutes<Api, ReqTypes> {
Expand All @@ -14,6 +14,13 @@ export function getRoutes(config: ChainForkConfig, api: ServerApi<Api>): ServerR
id: "eventstream",

handler: async (req, res) => {
const validTopics = new Set(Object.values(eventTypes));
for (const topic of req.query.topics) {
if (!validTopics.has(topic)) {
throw new ApiError(400, `Invalid topic: ${topic}`);
}
}

const controller = new AbortController();

try {
Expand Down
5 changes: 4 additions & 1 deletion packages/api/src/beacon/server/index.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import {ChainForkConfig} from "@lodestar/config";
import {Api} from "../routes/index.js";
import {ServerInstance, ServerRoute, RouteConfig, registerRoute} from "../../utils/server/index.js";
import {ApiError, ServerInstance, ServerRoute, RouteConfig, registerRoute} from "../../utils/server/index.js";

import {ServerApi} from "../../interfaces.js";
import * as beacon from "./beacon.js";
Expand All @@ -13,6 +13,9 @@ import * as node from "./node.js";
import * as proof from "./proof.js";
import * as validator from "./validator.js";

// Re-export for usage in beacon-node
export {ApiError};

// Re-export for convenience
export {RouteConfig};

Expand Down
9 changes: 9 additions & 0 deletions packages/api/src/utils/server/errors.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
import {HttpErrorCodes} from "../client/httpStatusCode.js";

export class ApiError extends Error {
statusCode: HttpErrorCodes;
constructor(statusCode: HttpErrorCodes, message?: string) {
super(message);
this.statusCode = statusCode;
}
}
1 change: 1 addition & 0 deletions packages/api/src/utils/server/index.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
export * from "./genericJsonServer.js";
export * from "./registerRoute.js";
export * from "./errors.js";
export * from "./types.js";
10 changes: 2 additions & 8 deletions packages/beacon-node/src/api/impl/errors.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,6 @@
import {HttpErrorCodes} from "@lodestar/api";
import {ApiError} from "@lodestar/api/beacon/server";

export class ApiError extends Error {
statusCode: HttpErrorCodes;
constructor(statusCode: HttpErrorCodes, message?: string) {
super(message);
this.statusCode = statusCode;
}
}
export {ApiError};

export class StateNotFound extends ApiError {
constructor() {
Expand Down
7 changes: 0 additions & 7 deletions packages/beacon-node/src/api/impl/events/index.ts
Original file line number Diff line number Diff line change
@@ -1,19 +1,12 @@
import {routes, ServerApi} from "@lodestar/api";
import {ApiModules} from "../types.js";
import {ApiError} from "../errors.js";

export function getEventsApi({chain}: Pick<ApiModules, "chain" | "config">): ServerApi<routes.events.Api> {
const validTopics = new Set(Object.values(routes.events.eventTypes));

return {
async eventstream(topics, signal, onEvent) {
const onAbortFns: (() => void)[] = [];

for (const topic of topics) {
if (!validTopics.has(topic)) {
throw new ApiError(400, `Unknown topic ${topic}`);
}

// eslint-disable-next-line @typescript-eslint/no-explicit-any
const handler = (data: any): void => {
// TODO: What happens if this handler throws? Does it break the other chain.emitter listeners?
Expand Down
74 changes: 64 additions & 10 deletions packages/beacon-node/src/api/rest/activeSockets.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import http, {Server} from "node:http";
import {Socket} from "node:net";
import {waitFor} from "@lodestar/utils";
import {IGauge} from "../../metrics/index.js";

export type SocketMetrics = {
Expand All @@ -8,17 +9,23 @@ export type SocketMetrics = {
socketsBytesWritten: IGauge;
};

// Use relatively short timeout to speed up shutdown
const GRACEFUL_TERMINATION_TIMEOUT = 1_000;

/**
* From https://github.com/nodejs/node/blob/57bd715d527aba8dae56b975056961b0e429e91e/lib/_http_client.js#L363-L413
* But exposes the count of sockets, and does not have a graceful period
* From https://github.com/gajus/http-terminator/blob/aabca4751552e983f8a59ba896b7fb58ce3b4087/src/factories/createInternalHttpTerminator.ts#L24-L61
* But only handles HTTP sockets, exposes the count of sockets as metrics
*/
export class HttpActiveSocketsTracker {
private sockets = new Set<Socket>();
private terminated = false;
private terminating = false;

constructor(server: Server, metrics: SocketMetrics | null) {
constructor(
private readonly server: Server,
metrics: SocketMetrics | null
) {
server.on("connection", (socket) => {
if (this.terminated) {
if (this.terminating) {
socket.destroy(Error("Closing"));
} else {
this.sockets.add(socket);
Expand All @@ -39,18 +46,65 @@ export class HttpActiveSocketsTracker {
}
}

destroyAll(): void {
this.terminated = true;
/**
* Wait for all connections to drain, forcefully terminate any open connections after timeout
*
* From https://github.com/gajus/http-terminator/blob/aabca4751552e983f8a59ba896b7fb58ce3b4087/src/factories/createInternalHttpTerminator.ts#L78-L165
* But only handles HTTP sockets and does not close server, immediately closes eventstream API connections
*/
async terminate(): Promise<void> {
if (this.terminating) return;
this.terminating = true;

// Can speed up shutdown by a few milliseconds
this.server.closeIdleConnections();

// Inform new incoming requests on keep-alive connections that
// the connection will be closed after the current response
this.server.on("request", (_req, res) => {
if (!res.headersSent) {
res.setHeader("Connection", "close");
}
});

for (const socket of this.sockets) {
// This is the HTTP CONNECT request socket.
// @ts-expect-error Unclear if I am using wrong type or how else this should be handled.
// @ts-expect-error HTTP sockets have reference to server
if (!(socket.server instanceof http.Server)) {
continue;
}

socket.destroy(Error("Closing"));
this.sockets.delete(socket);
// @ts-expect-error Internal property but only way to access response of socket
const res = socket._httpMessage as http.ServerResponse | undefined;

if (res == null) {
// Immediately destroy sockets without an attached HTTP request
this.destroySocket(socket);
} else if (res.getHeader("Content-Type") === "text/event-stream") {
// eventstream API will never stop and must be forcefully closed
socket.end();
} else if (!res.headersSent) {
// Inform existing keep-alive connections that they will be closed after the current response
res.setHeader("Connection", "close");
}
}

// Wait for all connections to drain, forcefully terminate after timeout
try {
await waitFor(() => this.sockets.size === 0, {
timeout: GRACEFUL_TERMINATION_TIMEOUT,
});
} catch {
// Ignore timeout error
} finally {
for (const socket of this.sockets) {
this.destroySocket(socket);
}
}
}

private destroySocket(socket: Socket): void {
socket.destroy(Error("Closing"));
this.sockets.delete(socket);
}
}
25 changes: 6 additions & 19 deletions packages/beacon-node/src/api/rest/base.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,6 @@ export type RestApiServerMetrics = SocketMetrics & {
errors: IGauge<"operationId">;
};

enum Status {
Listening = "listening",
Closed = "closed",
}

/**
* REST API powered by `fastify` server.
*/
Expand All @@ -42,8 +37,6 @@ export class RestApiServer {
protected readonly logger: Logger;
private readonly activeSockets: HttpActiveSocketsTracker;

private status = Status.Closed;

constructor(
private readonly opts: RestApiServerOpts,
modules: RestApiServerModules
Expand Down Expand Up @@ -106,8 +99,7 @@ export class RestApiServer {
server.addHook("onError", async (req, _res, err) => {
// Don't log ErrorAborted errors, they happen on node shutdown and are not useful
// Don't log NodeISSyncing errors, they happen very frequently while syncing and the validator polls duties
// Don't log eventstream aborted errors if server instance is being closed on node shutdown
if (err instanceof ErrorAborted || err instanceof NodeIsSyncing || this.status === Status.Closed) return;
if (err instanceof ErrorAborted || err instanceof NodeIsSyncing) return;

const {operationId} = req.routeConfig as RouteConfig;

Expand All @@ -127,9 +119,6 @@ export class RestApiServer {
* Start the REST API server.
*/
async listen(): Promise<void> {
if (this.status === Status.Listening) return;
this.status = Status.Listening;

try {
const host = this.opts.address;
const address = await this.server.listen({port: this.opts.port, host});
Expand All @@ -139,7 +128,6 @@ export class RestApiServer {
}
} catch (e) {
this.logger.error("Error starting REST api server", this.opts, e as Error);
this.status = Status.Closed;
throw e;
}
}
Expand All @@ -148,17 +136,16 @@ export class RestApiServer {
* Close the server instance and terminate all existing connections.
*/
async close(): Promise<void> {
if (this.status === Status.Closed) return;
this.status = Status.Closed;

// In NodeJS land calling close() only causes new connections to be rejected.
// Existing connections can prevent .close() from resolving for potentially forever.
// In Lodestar case when the BeaconNode wants to close we will just abruptly terminate
// all existing connections for a fast shutdown.
// In Lodestar case when the BeaconNode wants to close we will attempt to gracefully
// close all existing connections but forcefully terminate after timeout for a fast shutdown.
// Inspired by https://github.com/gajus/http-terminator/
this.activeSockets.destroyAll();
await this.activeSockets.terminate();

await this.server.close();

this.logger.debug("REST API server closed");
}

/** For child classes to override */
Expand Down
8 changes: 5 additions & 3 deletions packages/beacon-node/src/metrics/server/http.ts
Original file line number Diff line number Diff line change
Expand Up @@ -90,17 +90,19 @@ export async function getHttpMetricsServer(
async close(): Promise<void> {
// In NodeJS land calling close() only causes new connections to be rejected.
// Existing connections can prevent .close() from resolving for potentially forever.
// In Lodestar case when the BeaconNode wants to close we will just abruptly terminate
// all existing connections for a fast shutdown.
// In Lodestar case when the BeaconNode wants to close we will attempt to gracefully
// close all existing connections but forcefully terminate after timeout for a fast shutdown.
// Inspired by https://github.com/gajus/http-terminator/
activeSockets.destroyAll();
await activeSockets.terminate();

await new Promise<void>((resolve, reject) => {
server.close((err) => {
if (err) reject(err);
else resolve();
});
});

logger.debug("Metrics HTTP server closed");
},
};
}
17 changes: 16 additions & 1 deletion packages/db/test/unit/controller/level.test.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import {execSync} from "node:child_process";
import os from "node:os";
import {expect} from "chai";
import leveldown from "leveldown";
import all from "it-all";
Expand Down Expand Up @@ -135,9 +136,23 @@ describe("LevelDB controller", () => {
expect(approxSize).gt(0, "approximateSize return not > 0");
});

function getDuCommand(): string {
if (os.platform() === "darwin") {
try {
const res = execSync("gdu --help", {encoding: "utf8"});
if (res?.startsWith("Usage: gdu ")) {
return "gdu";
}
} catch {
/* no-op */
}
}
return "du";
}

function getDbSize(): number {
// 116 ./.__testdb
const res = execSync(`du -bs ${dbLocation}`, {encoding: "utf8"});
const res = execSync(`${getDuCommand()} -bs ${dbLocation}`, {encoding: "utf8"});
const match = res.match(/^(\d+)/);
if (!match) throw Error(`Unknown du response \n${res}`);
return parseInt(match[1]);
Expand Down
6 changes: 4 additions & 2 deletions packages/prover/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,10 @@
"type": "module",
"exports": {
".": {
"import": "./lib/index.js",
"browser": "./lib/index.web.js"
"import": "./lib/index.js"
},
"./browser": {
"import": "./lib/browser/index.js"
}
},
"bin": {
Expand Down
3 changes: 3 additions & 0 deletions packages/prover/src/browser/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
export * from "../interfaces.js";
export * from "../proof_provider/index.js";
export {createVerifiedExecutionProvider} from "../web3_provider.js";
2 changes: 1 addition & 1 deletion packages/prover/src/constants.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
// https://github.com/ethereum/consensus-specs/blob/dev/specs/altair/light-client/p2p-interface.md#configuration
export const MAX_REQUEST_LIGHT_CLIENT_UPDATES = 128;
export const MAX_PAYLOAD_HISTORY = 32;
export const UNVERIFIED_RESPONSE_CODE = -33091;
export const VERIFICATION_FAILED_RESPONSE_CODE = -33091;
export const ZERO_ADDRESS = "0x0000000000000000000000000000000000000000";
export const DEFAULT_PROXY_REQUEST_TIMEOUT = 3000;
2 changes: 2 additions & 0 deletions packages/prover/src/index.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
export * from "./interfaces.js";
export * from "./proof_provider/index.js";
export {createVerifiedExecutionProvider} from "./web3_provider.js";
export {createVerifiedExecutionProxy} from "./web3_proxy.js";
export {isVerificationFailedError} from "./utils/json_rpc.js";
2 changes: 0 additions & 2 deletions packages/prover/src/index.web.ts

This file was deleted.

1 change: 1 addition & 0 deletions packages/prover/src/interfaces.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import {ProofProvider} from "./proof_provider/proof_provider.js";
import {JsonRpcRequest, JsonRpcRequestOrBatch, JsonRpcResponse, JsonRpcResponseOrBatch} from "./types.js";
import {ELRpc} from "./utils/rpc.js";

export {NetworkName} from "@lodestar/config/networks";
export enum LCTransport {
Rest = "Rest",
P2P = "P2P",
Expand Down
1 change: 1 addition & 0 deletions packages/prover/src/proof_provider/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
export * from "./proof_provider.js";
15 changes: 13 additions & 2 deletions packages/prover/src/utils/evm.ts
Original file line number Diff line number Diff line change
Expand Up @@ -84,8 +84,19 @@ export async function getVMWithState({

const batchRequests = [];
for (const [address, storageKeys] of Object.entries(storageKeysMap)) {
batchRequests.push({jsonrpc: "2.0", method: "eth_getProof", params: [address, storageKeys, blockHashHex]});
batchRequests.push({jsonrpc: "2.0", method: "eth_getCode", params: [address, blockHashHex]});
batchRequests.push({
jsonrpc: "2.0",
id: rpc.getRequestId(),
method: "eth_getProof",
params: [address, storageKeys, blockHashHex],
});

batchRequests.push({
jsonrpc: "2.0",
id: rpc.getRequestId(),
method: "eth_getCode",
params: [address, blockHashHex],
});
}

// If all responses are valid then we will have even number of responses
Expand Down
Loading

0 comments on commit 3ea4321

Please sign in to comment.