Skip to content

Commit

Permalink
feat: regen to consume state cache reload api (#6456)
Browse files Browse the repository at this point in the history
* feat: regen to consume state cache reload api

* chore: address PR comments
  • Loading branch information
twoeths authored Mar 4, 2024
1 parent 3af0db1 commit d10ed38
Show file tree
Hide file tree
Showing 3 changed files with 76 additions and 27 deletions.
4 changes: 3 additions & 1 deletion packages/beacon-node/src/chain/regen/errors.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ export enum RegenErrorCode {
TOO_MANY_BLOCK_PROCESSED = "REGEN_ERROR_TOO_MANY_BLOCK_PROCESSED",
BLOCK_NOT_IN_DB = "REGEN_ERROR_BLOCK_NOT_IN_DB",
STATE_TRANSITION_ERROR = "REGEN_ERROR_STATE_TRANSITION_ERROR",
INVALID_STATE_ROOT = "REGEN_ERROR_INVALID_STATE_ROOT",
}

export type RegenErrorType =
Expand All @@ -17,7 +18,8 @@ export type RegenErrorType =
| {code: RegenErrorCode.NO_SEED_STATE}
| {code: RegenErrorCode.TOO_MANY_BLOCK_PROCESSED; stateRoot: RootHex | Root}
| {code: RegenErrorCode.BLOCK_NOT_IN_DB; blockRoot: RootHex | Root}
| {code: RegenErrorCode.STATE_TRANSITION_ERROR; error: Error};
| {code: RegenErrorCode.STATE_TRANSITION_ERROR; error: Error}
| {code: RegenErrorCode.INVALID_STATE_ROOT; slot: Slot; expected: RootHex; actual: RootHex};

export class RegenError extends Error {
type: RegenErrorType;
Expand Down
94 changes: 68 additions & 26 deletions packages/beacon-node/src/chain/regen/regen.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,29 +10,34 @@ import {
stateTransition,
} from "@lodestar/state-transition";
import {IForkChoice, ProtoBlock} from "@lodestar/fork-choice";
import {sleep} from "@lodestar/utils";
import {Logger, sleep} from "@lodestar/utils";
import {SLOTS_PER_EPOCH} from "@lodestar/params";
import {ChainForkConfig} from "@lodestar/config";
import {Metrics} from "../../metrics/index.js";
import {IBeaconDb} from "../../db/index.js";
import {CheckpointStateCache, StateContextCache} from "../stateCache/index.js";
import {getCheckpointFromState} from "../blocks/utils/checkpoint.js";
import {ChainEvent, ChainEventEmitter} from "../emitter.js";
import {CheckpointStateCache, BlockStateCache} from "../stateCache/types.js";
import {IStateRegeneratorInternal, RegenCaller, StateCloneOpts} from "./interface.js";
import {RegenError, RegenErrorCode} from "./errors.js";

export type RegenModules = {
db: IBeaconDb;
forkChoice: IForkChoice;
stateCache: StateContextCache;
stateCache: BlockStateCache;
checkpointStateCache: CheckpointStateCache;
config: ChainForkConfig;
emitter: ChainEventEmitter;
logger: Logger;
metrics: Metrics | null;
};

/**
* Regenerates states that have already been processed by the fork choice
* Since Feb 2024, we support reloading checkpoint state from disk via allowDiskReload flag. Due to its performance impact
* this flag is only set to true in this case:
* - getPreState: this is for block processing, it's important to reload state in unfinality time
* - updateHeadState: rarely happen, but it's important to make sure we always can regen head state
*/
export class StateRegenerator implements IStateRegeneratorInternal {
constructor(private readonly modules: RegenModules) {}
Expand All @@ -41,11 +46,12 @@ export class StateRegenerator implements IStateRegeneratorInternal {
* Get the state to run with `block`. May be:
* - If parent is in same epoch -> Exact state at `block.parentRoot`
* - If parent is in prev epoch -> State after `block.parentRoot` dialed forward through epoch transition
* - reload state if needed in this flow
*/
async getPreState(
block: allForks.BeaconBlock,
opts: StateCloneOpts,
rCaller: RegenCaller
regenCaller: RegenCaller
): Promise<CachedBeaconStateAllForks> {
const parentBlock = this.modules.forkChoice.getBlock(block.parentRoot);
if (!parentBlock) {
Expand All @@ -57,18 +63,19 @@ export class StateRegenerator implements IStateRegeneratorInternal {

const parentEpoch = computeEpochAtSlot(parentBlock.slot);
const blockEpoch = computeEpochAtSlot(block.slot);
const allowDiskReload = true;

// This may save us at least one epoch transition.
// If the requested state crosses an epoch boundary
// then we may use the checkpoint state before the block
// We may have the checkpoint state with parent root inside the checkpoint state cache
// through gossip validation.
if (parentEpoch < blockEpoch) {
return this.getCheckpointState({root: block.parentRoot, epoch: blockEpoch}, opts, rCaller);
return this.getCheckpointState({root: block.parentRoot, epoch: blockEpoch}, opts, regenCaller, allowDiskReload);
}

// Otherwise, get the state normally.
return this.getState(parentBlock.stateRoot, rCaller);
return this.getState(parentBlock.stateRoot, regenCaller, allowDiskReload);
}

/**
Expand All @@ -77,20 +84,23 @@ export class StateRegenerator implements IStateRegeneratorInternal {
async getCheckpointState(
cp: phase0.Checkpoint,
opts: StateCloneOpts,
rCaller: RegenCaller
regenCaller: RegenCaller,
allowDiskReload = false
): Promise<CachedBeaconStateAllForks> {
const checkpointStartSlot = computeStartSlotAtEpoch(cp.epoch);
return this.getBlockSlotState(toHexString(cp.root), checkpointStartSlot, opts, rCaller);
return this.getBlockSlotState(toHexString(cp.root), checkpointStartSlot, opts, regenCaller, allowDiskReload);
}

/**
* Get state after block `blockRoot` dialed forward to `slot`
* - allowDiskReload should be used with care, as it will cause the state to be reloaded from disk
*/
async getBlockSlotState(
blockRoot: RootHex,
slot: Slot,
opts: StateCloneOpts,
rCaller: RegenCaller
regenCaller: RegenCaller,
allowDiskReload = false
): Promise<CachedBeaconStateAllForks> {
const block = this.modules.forkChoice.getBlockHex(blockRoot);
if (!block) {
Expand All @@ -108,26 +118,35 @@ export class StateRegenerator implements IStateRegeneratorInternal {
});
}

const latestCheckpointStateCtx = this.modules.checkpointStateCache.getLatest(blockRoot, computeEpochAtSlot(slot));
const {checkpointStateCache} = this.modules;
const epoch = computeEpochAtSlot(slot);
const latestCheckpointStateCtx = allowDiskReload
? await checkpointStateCache.getOrReloadLatest(blockRoot, epoch)
: checkpointStateCache.getLatest(blockRoot, epoch);

// If a checkpoint state exists with the given checkpoint root, it either is in requested epoch
// or needs to have empty slots processed until the requested epoch
if (latestCheckpointStateCtx) {
return processSlotsByCheckpoint(this.modules, latestCheckpointStateCtx, slot, opts);
return processSlotsByCheckpoint(this.modules, latestCheckpointStateCtx, slot, regenCaller, opts);
}

// Otherwise, use the fork choice to get the stateRoot from block at the checkpoint root
// regenerate that state,
// then process empty slots until the requested epoch
const blockStateCtx = await this.getState(block.stateRoot, rCaller);
return processSlotsByCheckpoint(this.modules, blockStateCtx, slot, opts);
const blockStateCtx = await this.getState(block.stateRoot, regenCaller, allowDiskReload);
return processSlotsByCheckpoint(this.modules, blockStateCtx, slot, regenCaller, opts);
}

/**
* Get state by exact root. If not in cache directly, requires finding the block that references the state from the
* forkchoice and replaying blocks to get to it.
* - allowDiskReload should be used with care, as it will cause the state to be reloaded from disk
*/
async getState(stateRoot: RootHex, _rCaller: RegenCaller): Promise<CachedBeaconStateAllForks> {
async getState(
stateRoot: RootHex,
_rCaller: RegenCaller,
allowDiskReload = false
): Promise<CachedBeaconStateAllForks> {
// Trivial case, state at stateRoot is already cached
const cachedStateCtx = this.modules.stateCache.get(stateRoot);
if (cachedStateCtx) {
Expand All @@ -143,15 +162,17 @@ export class StateRegenerator implements IStateRegeneratorInternal {
// gets reversed when replayed
const blocksToReplay = [block];
let state: CachedBeaconStateAllForks | null = null;
for (const b of this.modules.forkChoice.iterateAncestorBlocks(block.parentRoot)) {
const {checkpointStateCache} = this.modules;
// iterateAncestorBlocks only returns ancestor blocks, not the block itself
for (const b of this.modules.forkChoice.iterateAncestorBlocks(block.blockRoot)) {
state = this.modules.stateCache.get(b.stateRoot);
if (state) {
break;
}
state = this.modules.checkpointStateCache.getLatest(
b.blockRoot,
computeEpochAtSlot(blocksToReplay[blocksToReplay.length - 1].slot - 1)
);
const epoch = computeEpochAtSlot(blocksToReplay[blocksToReplay.length - 1].slot - 1);
state = allowDiskReload
? await checkpointStateCache.getOrReloadLatest(b.blockRoot, epoch)
: checkpointStateCache.getLatest(b.blockRoot, epoch);
if (state) {
break;
}
Expand All @@ -172,6 +193,8 @@ export class StateRegenerator implements IStateRegeneratorInternal {
});
}

const replaySlots = blocksToReplay.map((b) => b.slot).join(",");
this.modules.logger.debug("Replaying blocks to get state", {stateRoot, replaySlots});
for (const b of blocksToReplay.reverse()) {
const block = await this.modules.db.block.get(fromHexString(b.blockRoot));
if (!block) {
Expand All @@ -195,11 +218,23 @@ export class StateRegenerator implements IStateRegeneratorInternal {
verifyProposer: false,
verifySignatures: false,
},
null
this.modules.metrics
);

// TODO: Persist states, note that regen could be triggered by old states.
// Should those take a place in the cache?
const stateRoot = toHexString(state.hashTreeRoot());
if (b.stateRoot !== stateRoot) {
throw new RegenError({
slot: b.slot,
code: RegenErrorCode.INVALID_STATE_ROOT,
actual: stateRoot,
expected: b.stateRoot,
});
}

if (allowDiskReload) {
// also with allowDiskReload flag, we "reload" it to the state cache too
this.modules.stateCache.add(state);
}

// this avoids keeping our node busy processing blocks
await sleep(0);
Expand All @@ -210,13 +245,14 @@ export class StateRegenerator implements IStateRegeneratorInternal {
});
}
}
this.modules.logger.debug("Replayed blocks to get state", {stateRoot, replaySlots});

return state;
}

private findFirstStateBlock(stateRoot: RootHex): ProtoBlock {
for (const block of this.modules.forkChoice.forwarditerateAncestorBlocks()) {
if (block !== undefined) {
if (block.stateRoot === stateRoot) {
return block;
}
}
Expand All @@ -237,9 +273,10 @@ async function processSlotsByCheckpoint(
modules: {checkpointStateCache: CheckpointStateCache; metrics: Metrics | null; emitter: ChainEventEmitter},
preState: CachedBeaconStateAllForks,
slot: Slot,
regenCaller: RegenCaller,
opts: StateCloneOpts
): Promise<CachedBeaconStateAllForks> {
let postState = await processSlotsToNearestCheckpoint(modules, preState, slot, opts);
let postState = await processSlotsToNearestCheckpoint(modules, preState, slot, regenCaller, opts);
if (postState.slot < slot) {
postState = processSlots(postState, slot, opts, modules.metrics);
}
Expand All @@ -257,6 +294,7 @@ async function processSlotsToNearestCheckpoint(
modules: {checkpointStateCache: CheckpointStateCache; metrics: Metrics | null; emitter: ChainEventEmitter},
preState: CachedBeaconStateAllForks,
slot: Slot,
regenCaller: RegenCaller,
opts: StateCloneOpts
): Promise<CachedBeaconStateAllForks> {
const preSlot = preState.slot;
Expand All @@ -272,12 +310,16 @@ async function processSlotsToNearestCheckpoint(
) {
// processSlots calls .clone() before mutating
postState = processSlots(postState, nextEpochSlot, opts, metrics);
modules.metrics?.epochTransitionByCaller.inc({caller: regenCaller});

// Cache state to preserve epoch transition work
// this is usually added when we prepare for next slot or validate gossip block
// then when we process the 1st block of epoch, we don't have to do state transition again
// This adds Previous Root Checkpoint State to the checkpoint state cache
// This may becomes the "official" checkpoint state if the 1st block of epoch is skipped
const checkpointState = postState;
const cp = getCheckpointFromState(checkpointState);
checkpointStateCache.add(cp, checkpointState);
emitter.emit(ChainEvent.checkpoint, cp, checkpointState);
emitter.emit(ChainEvent.checkpoint, cp, checkpointState.clone());

// this avoids keeping our node busy processing blocks
await sleep(0);
Expand Down
5 changes: 5 additions & 0 deletions packages/beacon-node/src/metrics/metrics/lodestar.ts
Original file line number Diff line number Diff line change
Expand Up @@ -291,6 +291,11 @@ export function createLodestarMetrics(

// Beacon state transition metrics

epochTransitionByCaller: register.gauge<{caller: RegenCaller}>({
name: "lodestar_epoch_transition_by_caller_total",
help: "Total count of epoch transition by caller",
labelNames: ["caller"],
}),
epochTransitionTime: register.histogram({
name: "lodestar_stfn_epoch_transition_seconds",
help: "Time to process a single epoch transition in seconds",
Expand Down

0 comments on commit d10ed38

Please sign in to comment.