Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: use binary diff to persist finalized states #7005

Open
wants to merge 9 commits into
base: feature/differential-archive
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
746 changes: 746 additions & 0 deletions dashboards/lodestar_historical_state_regen.json

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions packages/beacon-node/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@
"@chainsafe/ssz": "^0.17.1",
"@chainsafe/threads": "^1.11.1",
"@chainsafe/pubkey-index-map": "2.0.0",
"@chainsafe/xdelta3-node": "^1.0.2",
"@ethersproject/abi": "^5.7.0",
"@fastify/bearer-auth": "^10.0.1",
"@fastify/cors": "^10.0.1",
Expand Down
10 changes: 9 additions & 1 deletion packages/beacon-node/src/api/impl/lodestar/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import {GossipType} from "../../../network/index.js";
import {IBeaconDb} from "../../../db/interface.js";
import {ApiModules} from "../types.js";
import {profileNodeJS, writeHeapSnapshot} from "../../../util/profile.js";
import {StateArchiveMode} from "../../../chain/options.js";

export function getLodestarApi({
chain,
Expand Down Expand Up @@ -185,7 +186,14 @@ export function getLodestarApi({
},

/**
 * Dump the root index entries of the state archive that matches the configured
 * `chain.opts.stateArchiveMode`.
 *
 * - `Frequency` reads from `db.stateArchive`
 * - `Differential` reads from `db.hierarchicalStateArchiveRepository`
 *
 * A leftover unconditional `return` used to sit in front of the switch, which made the
 * dispatch below unreachable and always served the frequency archive.
 */
async dumpDbStateIndex() {
  switch (chain.opts.stateArchiveMode) {
    case StateArchiveMode.Frequency: {
      return {data: await db.stateArchive.dumpRootIndexEntries()};
    }
    case StateArchiveMode.Differential: {
      return {data: await db.hierarchicalStateArchiveRepository.dumpRootIndexEntries()};
    }
  }
},
};
}
Expand Down
27 changes: 23 additions & 4 deletions packages/beacon-node/src/chain/archiver/archiver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import {Metrics} from "../../metrics/metrics.js";
import {FrequencyStateArchiveStrategy} from "./strategies/frequencyStateArchiveStrategy.js";
import {archiveBlocks} from "./archiveBlocks.js";
import {StateArchiveMode, ArchiverOpts, StateArchiveStrategy} from "./interface.js";
import {DifferentialStateArchiveStrategy} from "./strategies/diffStateArchiveStrategy.js";

export const DEFAULT_STATE_ARCHIVE_MODE = StateArchiveMode.Frequency;

Expand All @@ -33,10 +34,28 @@ export class Archiver {
opts: ArchiverOpts,
private readonly metrics?: Metrics | null
) {
if (opts.stateArchiveMode === StateArchiveMode.Frequency) {
this.statesArchiverStrategy = new FrequencyStateArchiveStrategy(chain.regen, db, logger, opts, chain.bufferPool);
} else {
throw new Error(`State archive strategy "${opts.stateArchiveMode}" currently not supported.`);
switch (opts.stateArchiveMode) {
case StateArchiveMode.Frequency:
this.statesArchiverStrategy = new FrequencyStateArchiveStrategy(
chain.regen,
db,
logger,
opts,
chain.bufferPool
);
break;
case StateArchiveMode.Differential:
this.statesArchiverStrategy = new DifferentialStateArchiveStrategy(
chain.historicalStateRegen,
chain.regen,
db,
logger,
opts,
chain.bufferPool
);
break;
default:
throw new Error(`State archive strategy "${opts.stateArchiveMode}" currently not supported.`);
}

this.stateArchiveMode = opts.stateArchiveMode;
Expand Down
4 changes: 1 addition & 3 deletions packages/beacon-node/src/chain/archiver/interface.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,7 @@ import {RootHex} from "@lodestar/types";

/** Selects which state archive strategy the archiver instantiates. */
export enum StateArchiveMode {
/** Archive states with the frequency based strategy. */
Frequency = "frequency",
/** Archive states with the differential strategy, see https://github.com/ChainSafe/lodestar/pull/7005 */
Differential = "diff",
}

export interface StatesArchiverOpts {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
import {CheckpointWithHex} from "@lodestar/fork-choice";
import {RootHex} from "@lodestar/types";
import {Metrics} from "../../../metrics/metrics.js";
import {StateArchiveStrategy, StatesArchiverOpts} from "../interface.js";
import {IStateRegenerator} from "../../regen/interface.js";
import {IBeaconDb} from "../../../db/interface.js";
import {Logger} from "@lodestar/logger";
import {BufferPool} from "../../../util/bufferPool.js";
import {IHistoricalStateRegen} from "../../historicalState/types.js";
import {CachedBeaconStateAllForks, computeStartSlotAtEpoch} from "@lodestar/state-transition";

/**
 * State archive strategy that hands finalized checkpoint states over to
 * `IHistoricalStateRegen.storeHistoricalState` for persistence.
 *
 * `onCheckpoint` and `onFinalizedCheckpoint` are not implemented yet and throw when called.
 */
export class DifferentialStateArchiveStrategy implements StateArchiveStrategy {
  constructor(
    private readonly historicalStateRegen: IHistoricalStateRegen | undefined,
    private readonly regen: IStateRegenerator,
    private readonly db: IBeaconDb,
    private readonly logger: Logger,
    private readonly opts: StatesArchiverOpts,
    private readonly bufferPool?: BufferPool | null
  ) {}

  /**
   * Not implemented for this strategy.
   * @throws Error always
   */
  onCheckpoint(_stateRoot: RootHex, _metrics?: Metrics | null): Promise<void> {
    throw new Error("Method not implemented.");
  }

  /**
   * Not implemented for this strategy.
   * @throws Error always
   */
  onFinalizedCheckpoint(_finalized: CheckpointWithHex, _metrics?: Metrics | null): Promise<void> {
    throw new Error("Method not implemented.");
  }

  /**
   * Fetch the state of the finalized checkpoint and pass its serialized bytes to the historical state regen.
   *
   * - If the checkpoint state is not available a warning is logged and nothing is stored.
   * - If no `historicalStateRegen` was provided nothing is stored.
   *
   * @param finalized the finalized checkpoint whose state should be archived
   */
  async maybeArchiveState(finalized: CheckpointWithHex): Promise<void> {
    // starting from Mar 2024, the finalized state could be from disk or in memory
    const state = await this.regen.getCheckpointStateOrBytes(finalized);
    if (state === null) {
      this.logger.warn("Checkpoint state not available to archive.", {epoch: finalized.epoch, root: finalized.rootHex});
      return;
    }

    // Serialized state. `Array.isArray()` is always false for typed arrays, so the bytes case
    // has to be detected with `instanceof`, otherwise it falls through to the branch below
    // and fails on the missing `serialize()` method.
    if (state instanceof Uint8Array) {
      // Bytes carry no parsed slot, so use the first slot of the finalized epoch
      return this.historicalStateRegen?.storeHistoricalState(computeStartSlotAtEpoch(finalized.epoch), state);
    }

    // In-memory state, serialize it before handing it over
    const cachedState = state as CachedBeaconStateAllForks;
    return this.historicalStateRegen?.storeHistoricalState(cachedState.slot, cachedState.serialize());
  }
}
26 changes: 20 additions & 6 deletions packages/beacon-node/src/chain/chain.ts
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ import {
CommonBlockBody,
FindHeadFnName,
} from "./interface.js";
import {IChainOptions} from "./options.js";
import {IChainOptions, StateArchiveMode} from "./options.js";
import {QueuedStateRegenerator, RegenCaller} from "./regen/index.js";
import {ForkchoiceCaller, initializeForkChoice} from "./forkChoice/index.js";
import {IBlsVerifier, BlsSingleThreadVerifier, BlsMultiThreadWorkerPool} from "./bls/index.js";
Expand Down Expand Up @@ -89,7 +89,7 @@ import {BlockAttributes, produceBlockBody, produceCommonBlockBody} from "./produ
import {computeNewStateRoot} from "./produceBlock/computeNewStateRoot.js";
import {BlockInput} from "./blocks/types.js";
import {SeenAttestationDatas} from "./seenCache/seenAttestationData.js";
import {HistoricalStateRegen} from "./historicalState/index.js";
import {IHistoricalStateRegen} from "./historicalState/index.js";
import {BlockRewards, computeBlockRewards} from "./rewards/blockRewards.js";
import {ShufflingCache} from "./shufflingCache.js";
import {BlockStateCacheImpl} from "./stateCache/blockStateCacheImpl.js";
Expand Down Expand Up @@ -130,7 +130,7 @@ export class BeaconChain implements IBeaconChain {
readonly regen: QueuedStateRegenerator;
readonly lightClientServer?: LightClientServer;
readonly reprocessController: ReprocessController;
readonly historicalStateRegen?: HistoricalStateRegen;
readonly historicalStateRegen?: IHistoricalStateRegen;

// Ops pool
readonly attestationPool: AttestationPool;
Expand Down Expand Up @@ -201,7 +201,7 @@ export class BeaconChain implements IBeaconChain {
eth1: IEth1ForBlockProduction;
executionEngine: IExecutionEngine;
executionBuilder?: IExecutionBuilder;
historicalStateRegen?: HistoricalStateRegen;
historicalStateRegen?: IHistoricalStateRegen;
}
) {
this.opts = opts;
Expand Down Expand Up @@ -532,8 +532,22 @@ export class BeaconChain implements IBeaconChain {
};
}

const data = await this.db.stateArchive.getByRoot(fromHex(stateRoot));
return data && {state: data, executionOptimistic: false, finalized: true};
switch (this.opts.stateArchiveMode) {
case StateArchiveMode.Frequency: {
const data = await this.db.stateArchive.getByRoot(fromHex(stateRoot));
return data && {state: data, executionOptimistic: false, finalized: true};
}
case StateArchiveMode.Differential: {
const slot = await this.db.hierarchicalStateArchiveRepository.getSlotByRoot(fromHex(stateRoot));
if (!slot) return null;

const stateBytes = await this.historicalStateRegen?.getHistoricalState(slot);
if (!stateBytes) return null;

const state = this.config.getForkTypes(slot).BeaconState.deserialize(stateBytes);
return {state: state as unknown as BeaconStateAllForks, executionOptimistic: false, finalized: true};
}
}
}

getStateByCheckpoint(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
import path from "node:path";
import {ModuleThread, Thread, spawn, Worker} from "@chainsafe/threads";
import {chainConfigToJson} from "@lodestar/config";
import {LoggerNode} from "@lodestar/logger/node";
import {
HistoricalStateRegenInitModules,
HistoricalStateRegenModules,
HistoricalStateWorkerApi,
HistoricalStateWorkerData,
IHistoricalStateRegen,
} from "./types.js";
import {HierarchicalLayers} from "./utils/hierarchicalLayers.js";
import {StateArchiveMode} from "../archiver/interface.js";

// Worker constructor consider the path relative to the current working directory
const WORKER_DIR = process.env.NODE_ENV === "test" ? "../../../lib/chain/historicalState" : "./";

/**
 * HistoricalStateRegen uses hierarchical binary differences to minimize the effort and storage
 * required to regenerate historical states.
 * As this is a compute-intensive job, it runs in a separate worker thread.
 *
 * @see following [doc](../../../../docs/pages/contribution/advance-topics/historical-state-regen.md) for further details.
 */
export class HistoricalStateRegen implements IHistoricalStateRegen {
  private readonly api: ModuleThread<HistoricalStateWorkerApi>;
  private readonly logger: LoggerNode;
  private readonly stateArchiveMode: StateArchiveMode;

  /**
   * @param modules already spawned worker api, logger, archive mode and an optional abort signal.
   * When the signal aborts the worker is closed.
   */
  constructor(modules: HistoricalStateRegenModules) {
    this.api = modules.api;
    this.logger = modules.logger;
    this.stateArchiveMode = modules.stateArchiveMode;
    modules.signal?.addEventListener(
      "abort",
      () => {
        // The event target ignores the returned promise, so a failing close() must be handled here
        // or it surfaces as an unhandled rejection
        this.close().catch((e) => {
          this.logger.error("Error closing historical state worker", {}, e as Error);
        });
      },
      {once: true}
    );
  }

  /**
   * Spawn the `worker.js` thread located in `WORKER_DIR` and wrap it in a `HistoricalStateRegen`.
   *
   * @param modules config, options, logger and optional metrics / layers config used to build the worker data
   * @returns instance bound to the spawned worker
   */
  static async init(modules: HistoricalStateRegenInitModules): Promise<HistoricalStateRegen> {
    const workerData: HistoricalStateWorkerData = {
      chainConfigJson: chainConfigToJson(modules.config),
      genesisValidatorsRoot: modules.config.genesisValidatorsRoot,
      genesisTime: modules.opts.genesisTime,
      maxConcurrency: 1,
      maxLength: 50,
      dbLocation: modules.opts.dbLocation,
      metricsEnabled: Boolean(modules.metrics),
      loggerOpts: modules.logger.toOpts(),
      // Without an explicit config use what `HierarchicalLayers.fromString()` yields when called
      // with no argument (presumably the default layout, TODO confirm)
      hierarchicalLayersConfig: modules.hierarchicalLayersConfig ?? HierarchicalLayers.fromString().toString(),
    };

    const worker = new Worker(path.join(WORKER_DIR, "worker.js"), {
      workerData,
    } as ConstructorParameters<typeof Worker>[1]);

    const api = await spawn<HistoricalStateWorkerApi>(worker, {
      // A Lodestar Node may do very expensive task at start blocking the event loop and causing
      // the initialization to timeout. The number below is big enough to almost disable the timeout
      timeout: 5 * 60 * 1000,
    });

    return new HistoricalStateRegen({...modules, api});
  }

  /** Forward to the worker's `scrapeMetrics()` and return its result. */
  async scrapeMetrics(): Promise<string> {
    return this.api.scrapeMetrics();
  }

  /**
   * Ask the worker to close and then terminate its thread.
   * The thread is terminated even if the worker's `close()` rejects; that error is re-thrown afterwards.
   */
  async close(): Promise<void> {
    try {
      await this.api.close();
    } finally {
      this.logger.debug("Terminating historical state worker");
      await Thread.terminate(this.api);
      this.logger.debug("Terminated historical state worker");
    }
  }

  /**
   * Request the serialized state at `slot` from the worker, using the configured archive mode.
   * @returns the bytes returned by the worker, or `null`
   */
  async getHistoricalState(slot: number): Promise<Uint8Array | null> {
    return this.api.getHistoricalState(slot, this.stateArchiveMode);
  }

  /** Send the serialized state of `slot` to the worker for storage, using the configured archive mode. */
  async storeHistoricalState(slot: number, stateBytes: Uint8Array): Promise<void> {
    return this.api.storeHistoricalState(slot, stateBytes, this.stateArchiveMode);
  }
}
70 changes: 3 additions & 67 deletions packages/beacon-node/src/chain/historicalState/index.ts
Original file line number Diff line number Diff line change
@@ -1,67 +1,3 @@
import path from "node:path";
import {ModuleThread, Thread, spawn, Worker} from "@chainsafe/threads";
import {chainConfigToJson} from "@lodestar/config";
import {LoggerNode} from "@lodestar/logger/node";
import {
HistoricalStateRegenInitModules,
HistoricalStateRegenModules,
HistoricalStateWorkerApi,
HistoricalStateWorkerData,
} from "./types.js";

// Worker constructor consider the path relative to the current working directory
const WORKER_DIR = process.env.NODE_ENV === "test" ? "../../../lib/chain/historicalState" : "./";

/**
 * Wrapper around the historical state worker thread.
 * Regenerating historical states is done in that separate thread to limit
 * the damage it can cause.
 */
export class HistoricalStateRegen implements HistoricalStateWorkerApi {
  private readonly api: ModuleThread<HistoricalStateWorkerApi>;
  private readonly logger: LoggerNode;

  /**
   * @param modules spawned worker api, logger and an optional abort signal that closes the worker
   */
  constructor(modules: HistoricalStateRegenModules) {
    const {api, logger, signal} = modules;
    this.api = api;
    this.logger = logger;
    signal?.addEventListener("abort", () => this.close(), {once: true});
  }

  /**
   * Start `worker.js` from `WORKER_DIR` and return an instance bound to it.
   */
  static async init(modules: HistoricalStateRegenInitModules): Promise<HistoricalStateRegen> {
    const {config, opts, metrics, logger} = modules;

    const workerData: HistoricalStateWorkerData = {
      chainConfigJson: chainConfigToJson(config),
      genesisValidatorsRoot: config.genesisValidatorsRoot,
      genesisTime: opts.genesisTime,
      maxConcurrency: 1,
      maxLength: 50,
      dbLocation: opts.dbLocation,
      metricsEnabled: Boolean(metrics),
      loggerOpts: logger.toOpts(),
    };

    const workerFile = path.join(WORKER_DIR, "worker.js");
    const workerOptions = {workerData} as ConstructorParameters<typeof Worker>[1];
    const worker = new Worker(workerFile, workerOptions);

    // A Lodestar Node may do very expensive task at start blocking the event loop and causing
    // the initialization to timeout. The number below is big enough to almost disable the timeout
    const spawnTimeoutMs = 5 * 60 * 1000;
    const api = await spawn<HistoricalStateWorkerApi>(worker, {timeout: spawnTimeoutMs});

    return new HistoricalStateRegen({...modules, api});
  }

  /** Return the result of the worker's `scrapeMetrics()`. */
  async scrapeMetrics(): Promise<string> {
    const scraped = await this.api.scrapeMetrics();
    return scraped;
  }

  /** Close the worker api, then terminate the worker thread. */
  async close(): Promise<void> {
    const {api, logger} = this;
    await api.close();
    logger.debug("Terminating historical state worker");
    await Thread.terminate(api);
    logger.debug("Terminated historical state worker");
  }

  /** Request the serialized state at `slot` from the worker. */
  async getHistoricalState(slot: number): Promise<Uint8Array> {
    const stateBytes = await this.api.getHistoricalState(slot);
    return stateBytes;
  }
}
export * from "./historicalStateRegen.js";
export * from "./types.js";
export * from "./utils/hierarchicalLayers.js";
Loading
Loading