From 1a35ee8971c6687a907bed2c0fbd5c1fcdf880fb Mon Sep 17 00:00:00 2001 From: Nico Flaig Date: Mon, 15 Jul 2024 15:19:58 +0100 Subject: [PATCH] feat: add option to disable thread pool for keystore decryption (#6949) --- .../keymanager/decryptKeystoreDefinitions.ts | 69 ++++++++++++++----- packages/cli/src/cmds/validator/options.ts | 8 +++ .../cli/src/cmds/validator/signers/index.ts | 2 + .../decryptKeystoreDefinitions.test.ts | 23 ++++--- 4 files changed, 77 insertions(+), 25 deletions(-) diff --git a/packages/cli/src/cmds/validator/keymanager/decryptKeystoreDefinitions.ts b/packages/cli/src/cmds/validator/keymanager/decryptKeystoreDefinitions.ts index 2901fd6cdfb5..076da70adcdb 100644 --- a/packages/cli/src/cmds/validator/keymanager/decryptKeystoreDefinitions.ts +++ b/packages/cli/src/cmds/validator/keymanager/decryptKeystoreDefinitions.ts @@ -1,5 +1,7 @@ +import fs from "node:fs"; import path from "node:path"; import bls from "@chainsafe/bls"; +import {Keystore} from "@chainsafe/bls-keystore"; import {SignerLocal, SignerType} from "@lodestar/validator"; import {LogLevel, Logger} from "@lodestar/utils"; import {lockFilepath, unlockFilepath} from "../../../util/lockfile.js"; @@ -7,11 +9,13 @@ import {LocalKeystoreDefinition} from "./interface.js"; import {clearKeystoreCache, loadKeystoreCache, writeKeystoreCache} from "./keystoreCache.js"; import {DecryptKeystoresThreadPool} from "./decryptKeystores/index.js"; -type KeystoreDecryptOptions = { +export type KeystoreDecryptOptions = { ignoreLockFile?: boolean; onDecrypt?: (index: number) => void; // Try to use the cache file if it exists cacheFilePath?: string; + /** Use main thread to decrypt keystores */ + disableThreadPool?: boolean; logger: Pick; signal: AbortSignal; }; @@ -57,14 +61,50 @@ export async function decryptKeystoreDefinitions( const signers = new Array(keystoreCount); const passwords = new Array(keystoreCount); const errors: KeystoreDecryptError[] = []; - const decryptKeystores = new DecryptKeystoresThreadPool(keystoreCount, opts.signal); - for (const [index, definition] of keystoreDefinitions.entries()) { - lockKeystore(definition.keystorePath, opts); + if (!opts.disableThreadPool) { + const decryptKeystores = new DecryptKeystoresThreadPool(keystoreCount, opts.signal); + + for (const [index, definition] of keystoreDefinitions.entries()) { + lockKeystore(definition.keystorePath, opts); + + decryptKeystores.queue( + definition, + (secretKeyBytes: Uint8Array) => { + const signer: SignerLocal = { + type: SignerType.Local, + secretKey: bls.SecretKey.fromBytes(secretKeyBytes), + }; + + signers[index] = signer; + passwords[index] = definition.password; + + if (opts?.onDecrypt) { + opts?.onDecrypt(index); + } + }, + (error: Error) => { + // In-progress tasks can't be canceled, so there's a chance that multiple errors may be caught + // add to the list of errors + errors.push({keystoreFile: path.basename(definition.keystorePath), error}); + // cancel all pending tasks, no need to continue decrypting after we hit one error + decryptKeystores.cancel(); + } + ); + } + + await decryptKeystores.completed(); + } else { + // Decrypt keystores in main thread + for (const [index, definition] of keystoreDefinitions.entries()) { + lockKeystore(definition.keystorePath, opts); + + try { + const keystore = Keystore.parse(fs.readFileSync(definition.keystorePath, "utf8")); + + // Memory-hogging function + const secretKeyBytes = await keystore.decrypt(definition.password); - decryptKeystores.queue( - definition, - (secretKeyBytes: Uint8Array) => { const signer: SignerLocal = { type: SignerType.Local, secretKey: bls.SecretKey.fromBytes(secretKeyBytes), @@ -76,19 +116,14 @@ export async function decryptKeystoreDefinitions( if (opts?.onDecrypt) { opts?.onDecrypt(index); } - }, - (error: Error) => { - // In-progress tasks can't be canceled, so there's a chance that multiple errors may be caught - // add to the list of errors - errors.push({keystoreFile: path.basename(definition.keystorePath), error}); - // cancel all pending tasks, no need to continue decrypting after we hit one error - decryptKeystores.cancel(); + } catch (e) { + errors.push({keystoreFile: path.basename(definition.keystorePath), error: e as Error}); + // stop processing, no need to continue decrypting after we hit one error + break; } - ); + } } - await decryptKeystores.completed(); - if (errors.length > 0) { // If an error occurs, the program isn't going to be running, // so we should unlock all lockfiles we created diff --git a/packages/cli/src/cmds/validator/options.ts b/packages/cli/src/cmds/validator/options.ts index 08548edb1072..fc7cb197c5aa 100644 --- a/packages/cli/src/cmds/validator/options.ts +++ b/packages/cli/src/cmds/validator/options.ts @@ -55,6 +55,7 @@ export type IValidatorCliArgs = AccountValidatorArgs & importKeystores?: string[]; importKeystoresPassword?: string; + disableKeystoresThreadPool?: boolean; "http.requestWireFormat"?: string; "http.responseWireFormat"?: string; @@ -301,6 +302,13 @@ export const validatorOptions: CliCommandOptions = { type: "string", }, + disableKeystoresThreadPool: { + hidden: true, + description: + "Disable thread pool and instead use main thread to decrypt keystores. This can speed up decryption in testing environments like Kurtosis", + type: "boolean", + }, + doppelgangerProtection: { alias: ["doppelgangerProtectionEnabled"], description: "Enables Doppelganger protection", diff --git a/packages/cli/src/cmds/validator/signers/index.ts b/packages/cli/src/cmds/validator/signers/index.ts index be028461c0ee..95daf7e69b0d 100644 --- a/packages/cli/src/cmds/validator/signers/index.ts +++ b/packages/cli/src/cmds/validator/signers/index.ts @@ -99,6 +99,7 @@ export async function getSignersFromArgs( ignoreLockFile: args.force, onDecrypt: needle, cacheFilePath: path.join(accountPaths.cacheDir, "imported_keystores.cache"), + disableThreadPool: args["disableKeystoresThreadPool"], logger, signal, }); @@ -133,6 +134,7 @@ export async function getSignersFromArgs( ignoreLockFile: args.force, onDecrypt: needle, cacheFilePath: path.join(accountPaths.cacheDir, "local_keystores.cache"), + disableThreadPool: args["disableKeystoresThreadPool"], logger, signal, }); diff --git a/packages/cli/test/unit/validator/decryptKeystoreDefinitions.test.ts b/packages/cli/test/unit/validator/decryptKeystoreDefinitions.test.ts index 8f1a82a55c3e..10f6b34bd152 100644 --- a/packages/cli/test/unit/validator/decryptKeystoreDefinitions.test.ts +++ b/packages/cli/test/unit/validator/decryptKeystoreDefinitions.test.ts @@ -5,7 +5,10 @@ import {rimraf} from "rimraf"; import {getKeystoresStr} from "@lodestar/test-utils"; import {cachedSeckeysHex} from "../../utils/cachedKeys.js"; import {testFilesDir} from "../../utils.js"; -import {decryptKeystoreDefinitions} from "../../../src/cmds/validator/keymanager/decryptKeystoreDefinitions.js"; +import { + decryptKeystoreDefinitions, + KeystoreDecryptOptions, +} from "../../../src/cmds/validator/keymanager/decryptKeystoreDefinitions.js"; import {LocalKeystoreDefinition} from "../../../src/cmds/validator/keymanager/interface.js"; import {LockfileError, unlockFilepath} from "../../../src/util/lockfile.js"; @@ -56,16 +59,20 @@ describe("decryptKeystoreDefinitions", () => { } }); - testDecryptKeystoreDefinitions(cacheFilePath); + testDecryptKeystoreDefinitions({cacheFilePath}); }); describe("without keystore cache", () => { testDecryptKeystoreDefinitions(); }); - function testDecryptKeystoreDefinitions(cacheFilePath?: string): void { + describe("disabled thread pool", () => { + testDecryptKeystoreDefinitions({disableThreadPool: true}); + }); + + function testDecryptKeystoreDefinitions(opts?: Partial): void { it("decrypt keystores", async () => { - const signers = await decryptKeystoreDefinitions(definitions, {logger: console, signal, cacheFilePath}); + const signers = await decryptKeystoreDefinitions(definitions, {logger: console, signal, ...opts}); expect(signers.length).toBe(secretKeys.length); for (const signer of signers) { const hexSecret = signer.secretKey.toHex(); @@ -75,11 +82,11 @@ describe("decryptKeystoreDefinitions", () => { }); it("fail to decrypt keystores if lockfiles already exist", async () => { - await decryptKeystoreDefinitions(definitions, {logger: console, signal, cacheFilePath}); + await decryptKeystoreDefinitions(definitions, {logger: console, signal, ...opts}); // lockfiles should exist after the first run try { - await decryptKeystoreDefinitions(definitions, {logger: console, signal, cacheFilePath}); + await decryptKeystoreDefinitions(definitions, {logger: console, signal, ...opts}); expect.fail("Second decrypt should fail due to failure to get lockfile"); } catch (e) { expect((e as LockfileError).code).toBe("ELOCKED"); @@ -87,10 +94,10 @@ describe("decryptKeystoreDefinitions", () => { }); it("decrypt keystores if lockfiles already exist if ignoreLockFile=true", async () => { - await decryptKeystoreDefinitions(definitions, {logger: console, signal, cacheFilePath}); + await decryptKeystoreDefinitions(definitions, {logger: console, signal, ...opts}); // lockfiles should exist after the first run - await decryptKeystoreDefinitions(definitions, {logger: console, signal, cacheFilePath, ignoreLockFile: true}); + await decryptKeystoreDefinitions(definitions, {logger: console, signal, ...opts, ignoreLockFile: true}); }); } });