Skip to content

Commit

Permalink
fix: gracefully terminate connections when closing http server
Browse files Browse the repository at this point in the history
  • Loading branch information
nflaig committed Jul 21, 2023
1 parent 3257345 commit d2bf447
Show file tree
Hide file tree
Showing 6 changed files with 159 additions and 11 deletions.
61 changes: 56 additions & 5 deletions packages/beacon-node/src/api/rest/activeSockets.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import http, {Server} from "node:http";
import {Socket} from "node:net";
import {waitFor} from "@lodestar/utils";
import {IGauge} from "../../metrics/index.js";

export type SocketMetrics = {
Expand All @@ -8,15 +9,21 @@ export type SocketMetrics = {
socketsBytesWritten: IGauge;
};

// Use relatively short timeout to speed up shutdown
const GRACEFUL_TERMINATION_TIMEOUT = 1_000;

/**
* From https://github.com/nodejs/node/blob/57bd715d527aba8dae56b975056961b0e429e91e/lib/_http_client.js#L363-L413
* But exposes the count of sockets, and does not have a graceful period
* But exposes the count of sockets, and waits for connections to drain until timeout
*/
export class HttpActiveSocketsTracker {
private sockets = new Set<Socket>();
private terminated = false;

constructor(server: Server, metrics: SocketMetrics | null) {
constructor(
private readonly server: Server,
metrics: SocketMetrics | null
) {
server.on("connection", (socket) => {
if (this.terminated) {
socket.destroy(Error("Closing"));
Expand All @@ -39,18 +46,62 @@ export class HttpActiveSocketsTracker {
}
}

destroyAll(): void {
/**
* Wait for all connections to drain, forcefully terminate any open connections after timeout
*
* From https://github.com/gajus/http-terminator/blob/aabca4751552e983f8a59ba896b7fb58ce3b4087/src/factories/createInternalHttpTerminator.ts#L78-L165
* But only handles HTTP sockets and does not close server, immediately terminates api.eventstream connections
*/
async terminate(): Promise<void> {
if (this.terminated) return;
this.terminated = true;

// Inform new incoming requests (and keep-alive connections) that
// the connection will be closed after the current response
this.server.on("request", (_req, res) => {
if (!res.headersSent) {
res.setHeader("Connection", "close");
}
});

for (const socket of this.sockets) {
// This is the HTTP CONNECT request socket.
// @ts-expect-error Unclear if I am using wrong type or how else this should be handled.
if (!(socket.server instanceof http.Server)) {
continue;
}

socket.destroy(Error("Closing"));
this.sockets.delete(socket);
// @ts-expect-error Unclear if I am using wrong type or how else this should be handled.
const serverResponse = socket._httpMessage as http.ServerResponse | undefined;

if (serverResponse == null) {
// Immediately destroy sockets without an attached HTTP request
this.destroySocket(socket);
} else if (serverResponse.getHeader("Content-Type") === "text/event-stream") {
// api.eventstream will never stop and must be forcefully terminated
this.destroySocket(socket);
} else if (!serverResponse.headersSent) {
// Inform existing keep-alive connections that they will be closed after the current response
serverResponse.setHeader("Connection", "close");
}
}

// Wait for all connections to drain, forcefully terminate after timeout
try {
await waitFor(() => this.sockets.size === 0, {
timeout: GRACEFUL_TERMINATION_TIMEOUT,
});
} catch {
// Ignore timeout errors
} finally {
for (const socket of this.sockets) {
this.destroySocket(socket);
}
}
}

private destroySocket(socket: Socket): void {
socket.destroy(Error("Closing"));
this.sockets.delete(socket);
}
}
8 changes: 5 additions & 3 deletions packages/beacon-node/src/api/rest/base.ts
Original file line number Diff line number Diff line change
Expand Up @@ -153,12 +153,14 @@ export class RestApiServer {

// In NodeJS land calling close() only causes new connections to be rejected.
// Existing connections can prevent .close() from resolving for potentially forever.
// In Lodestar case when the BeaconNode wants to close we will just abruptly terminate
// all existing connections for a fast shutdown.
// In Lodestar case when the BeaconNode wants to close we will attempt to gracefully
// close all existing connections but forcefully terminate after timeout for a fast shutdown.
// Inspired by https://github.com/gajus/http-terminator/
this.activeSockets.destroyAll();
await this.activeSockets.terminate();

await this.server.close();

this.logger.debug("REST API server closed");
}

/** For child classes to override */
Expand Down
8 changes: 5 additions & 3 deletions packages/beacon-node/src/metrics/server/http.ts
Original file line number Diff line number Diff line change
Expand Up @@ -90,17 +90,19 @@ export async function getHttpMetricsServer(
async close(): Promise<void> {
// In NodeJS land calling close() only causes new connections to be rejected.
// Existing connections can prevent .close() from resolving for potentially forever.
// In Lodestar case when the BeaconNode wants to close we will just abruptly terminate
// all existing connections for a fast shutdown.
// In Lodestar case when the BeaconNode wants to close we will attempt to gracefully
// close all existing connections but forcefully terminate after timeout for a fast shutdown.
// Inspired by https://github.com/gajus/http-terminator/
activeSockets.destroyAll();
await activeSockets.terminate();

await new Promise<void>((resolve, reject) => {
server.close((err) => {
if (err) reject(err);
else resolve();
});
});

logger.debug("Metrics HTTP server closed");
},
};
}
1 change: 1 addition & 0 deletions packages/utils/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,3 +16,4 @@ export * from "./timeout.js";
export {RecursivePartial, bnToNum} from "./types.js";
export * from "./verifyMerkleBranch.js";
export * from "./promise.js";
export * from "./waitFor.js";
55 changes: 55 additions & 0 deletions packages/utils/src/waitFor.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
import {ErrorAborted, TimeoutError} from "./errors.js";

export type WaitForOpts = {
/** Time in milliseconds between checking condition */
interval?: number;
/** Time in milliseconds to wait before throwing TimeoutError */
timeout?: number;
/** Signal to abort waiting for condition by throwing ErrorAborted */
signal?: AbortSignal;
};

/**
* Wait for a condition to be true
*
* Simplified and abortable implementation of https://github.com/sindresorhus/p-wait-for
*/
export function waitFor(condition: () => boolean, opts: WaitForOpts = {}): Promise<void> {
return new Promise((resolve, reject) => {
const {interval = 10, timeout = Infinity, signal} = opts;

if (signal && signal.aborted) {
return reject(new ErrorAborted());
}

if (condition()) {
return resolve();
}

let onDone: () => void = () => {};

const timeoutId = setTimeout(() => {
onDone();
reject(new TimeoutError());
}, timeout);

const intervalId = setInterval(() => {
if (condition()) {
onDone();
resolve();
}
}, interval);

const onAbort = (): void => {
onDone();
reject(new ErrorAborted());
};
if (signal) signal.addEventListener("abort", onAbort);

onDone = () => {
clearTimeout(timeoutId);
clearInterval(intervalId);
if (signal) signal.removeEventListener("abort", onAbort);
};
});
}
37 changes: 37 additions & 0 deletions packages/utils/test/unit/waitFor.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
import "../setup.js";
import {expect} from "chai";
import {waitFor} from "../../src/waitFor.js";
import {ErrorAborted, TimeoutError} from "../../src/errors.js";

describe("waitFor", () => {
const interval = 10;
const timeout = 20;

it("Should resolve if condition is already true", async () => {
await expect(waitFor(() => true, {interval, timeout})).to.be.fulfilled;
});

it("Should resolve if condition becomes true within timeout", async () => {
let condition = false;
setTimeout(() => {
condition = true;
}, interval);
await waitFor(() => condition, {interval, timeout});
});

it("Should reject with TimeoutError if condition does not become true within timeout", async () => {
await expect(waitFor(() => false, {interval, timeout})).to.be.rejectedWith(TimeoutError);
});

it("Should reject with ErrorAborted if aborted before condition becomes true", async () => {
const controller = new AbortController();
setTimeout(() => controller.abort(), interval);
await expect(waitFor(() => false, {interval, timeout, signal: controller.signal})).to.be.rejectedWith(ErrorAborted);
});

it("Should reject with ErrorAborted if signal is already aborted", async () => {
const controller = new AbortController();
controller.abort();
await expect(waitFor(() => true, {interval, timeout, signal: controller.signal})).to.be.rejectedWith(ErrorAborted);
});
});

0 comments on commit d2bf447

Please sign in to comment.