Skip to content

Commit

Permalink
feat: nHistoricalStates flag
Browse files Browse the repository at this point in the history
  • Loading branch information
twoeths committed Oct 9, 2023
1 parent 99d6af5 commit 4669871
Show file tree
Hide file tree
Showing 15 changed files with 314 additions and 70 deletions.
11 changes: 11 additions & 0 deletions packages/beacon-node/src/chain/archiver/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -54,11 +54,13 @@ export class Archiver {

if (!opts.disableArchiveOnCheckpoint) {
this.chain.emitter.on(ChainEvent.forkChoiceFinalized, this.onFinalizedCheckpoint);
this.chain.emitter.on(ChainEvent.checkpoint, this.onCheckpoint);

signal.addEventListener(
"abort",
() => {
this.chain.emitter.off(ChainEvent.forkChoiceFinalized, this.onFinalizedCheckpoint);
this.chain.emitter.off(ChainEvent.checkpoint, this.onCheckpoint);
},
{once: true}
);
Expand All @@ -74,6 +76,15 @@ export class Archiver {
return this.jobQueue.push(finalized);
};

/**
 * Handler for `ChainEvent.checkpoint`: asks the regen module to prune its caches using the
 * fork choice's current finalized epoch, justified epoch and head state root.
 */
private onCheckpoint = (): void => {
  const {forkChoice} = this.chain;
  const {stateRoot} = forkChoice.getHead();
  const finalizedEpoch = forkChoice.getFinalizedCheckpoint().epoch;
  const justifiedEpoch = forkChoice.getJustifiedCheckpoint().epoch;
  this.chain.regen.pruneOnCheckpoint(finalizedEpoch, justifiedEpoch, stateRoot);
};

private processFinalizedCheckpoint = async (finalized: CheckpointWithHex): Promise<void> => {
try {
const finalizedEpoch = finalized.epoch;
Expand Down
11 changes: 8 additions & 3 deletions packages/beacon-node/src/chain/chain.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ import {IExecutionEngine, IExecutionBuilder} from "../execution/index.js";
import {Clock, ClockEvent, IClock} from "../util/clock.js";
import {ensureDir, writeIfNotExist} from "../util/file.js";
import {isOptimisticBlock} from "../util/forkChoice.js";
import {CHECKPOINT_STATES_FOLDER, PersistentCheckpointStateCache, StateContextCache} from "./stateCache/index.js";
import {CHECKPOINT_STATES_FOLDER, PersistentCheckpointStateCache, LRUBlockStateCache} from "./stateCache/index.js";
import {BlockProcessor, ImportBlockOpts} from "./blocks/index.js";
import {ChainEventEmitter, ChainEvent} from "./emitter.js";
import {IBeaconChain, ProposerPreparationData, BlockHash, StateGetOpts} from "./interface.js";
Expand Down Expand Up @@ -79,6 +79,7 @@ import {ShufflingCache} from "./shufflingCache.js";
import {MemoryCheckpointStateCache} from "./stateCache/memoryCheckpointsCache.js";
import {FilePersistentApis} from "./stateCache/persistent/file.js";
import {DbPersistentApis} from "./stateCache/persistent/db.js";
import {StateContextCache} from "./stateCache/stateContextCache.js";

/**
* Arbitrary constants, blobs should be consumed immediately in the same slot they are produced.
Expand Down Expand Up @@ -236,11 +237,13 @@ export class BeaconChain implements IBeaconChain {
this.pubkey2index = cachedState.epochCtx.pubkey2index;
this.index2pubkey = cachedState.epochCtx.index2pubkey;

const stateCache = new StateContextCache(this.opts, {metrics});
const stateCache = this.opts.nHistoricalStates
? new LRUBlockStateCache(this.opts, {metrics})
: new StateContextCache({metrics});
const persistentApis = this.opts.persistCheckpointStatesToFile
? new FilePersistentApis(CHECKPOINT_STATES_FOLDER)
: new DbPersistentApis(this.db);
const checkpointStateCache = this.opts.persistentCheckpointStateCache
const checkpointStateCache = this.opts.nHistoricalStates
? new PersistentCheckpointStateCache(
{
metrics,
Expand All @@ -256,6 +259,8 @@ export class BeaconChain implements IBeaconChain {

const {checkpoint} = computeAnchorCheckpoint(config, anchorState);
stateCache.add(cachedState);
// TODO: remove once we go with n-historical states
stateCache.setHeadState(cachedState);
checkpointStateCache.add(checkpoint, cachedState);

const forkChoice = initializeForkChoice(
Expand Down
11 changes: 5 additions & 6 deletions packages/beacon-node/src/chain/options.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,14 @@ import {ArchiverOpts} from "./archiver/index.js";
import {ForkChoiceOpts} from "./forkChoice/index.js";
import {LightClientServerOpts} from "./lightClient/index.js";
import {PersistentCheckpointStateCacheOpts} from "./stateCache/types.js";
import {StateContextCacheOpts} from "./stateCache/stateContextCache.js";
import {LRUBlockStateCacheOpts} from "./stateCache/lruBlockStateCache.js";

export type IChainOptions = BlockProcessOpts &
PoolOpts &
SeenCacheOpts &
ForkChoiceOpts &
ArchiverOpts &
StateContextCacheOpts &
LRUBlockStateCacheOpts &
PersistentCheckpointStateCacheOpts &
LightClientServerOpts & {
blsVerifyAllMainThread?: boolean;
Expand All @@ -31,8 +31,7 @@ export type IChainOptions = BlockProcessOpts &
trustedSetup?: string;
broadcastValidationStrictness?: string;
minSameMessageSignatureSetsToBatch: number;
// TODO: change to n_historical_states
persistentCheckpointStateCache?: boolean;
nHistoricalStates?: boolean;
/** by default persist checkpoint state to db */
persistCheckpointStatesToFile?: boolean;
};
Expand Down Expand Up @@ -97,12 +96,12 @@ export const defaultChainOptions: IChainOptions = {
// since this batch attestation work is designed to work with useWorker=true, make this the lowest value
minSameMessageSignatureSetsToBatch: 2,
// TODO: change to false, leaving here to ease testing
persistentCheckpointStateCache: true,
nHistoricalStates: true,
// by default, persist checkpoint states to db
persistCheckpointStatesToFile: false,

// since Sep 2023, only cache up to 32 states by default. If a big reorg happens it'll load checkpoint state from disk and regen from there.
// TODO: change to 128, leaving here to ease testing
// TODO: change to 128 which is the old StateCache config, only change back to 32 when we enable n-historical state, leaving here to ease testing
maxStates: 32,
// only used when nHistoricalStates = true
maxEpochsInMemory: 2,
Expand Down
2 changes: 2 additions & 0 deletions packages/beacon-node/src/chain/regen/interface.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ export interface IStateRegenerator extends IStateRegeneratorInternal {
getCheckpointStateOrBytes(cp: CheckpointHex): Promise<CachedBeaconStateAllForks | Uint8Array | null>;
getCheckpointStateSync(cp: CheckpointHex): CachedBeaconStateAllForks | null;
getClosestHeadState(head: ProtoBlock): CachedBeaconStateAllForks | null;
// TODO: remove once we go with n-historical state cache
pruneOnCheckpoint(finalizedEpoch: Epoch, justifiedEpoch: Epoch, headStateRoot: RootHex): void;
pruneOnFinalized(finalizedEpoch: Epoch): void;
addPostState(postState: CachedBeaconStateAllForks): void;
addCheckpointState(cp: phase0.Checkpoint, item: CachedBeaconStateAllForks): void;
Expand Down
20 changes: 15 additions & 5 deletions packages/beacon-node/src/chain/regen/queued.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import {IForkChoice, ProtoBlock} from "@lodestar/fork-choice";
import {CachedBeaconStateAllForks, computeEpochAtSlot} from "@lodestar/state-transition";
import {Logger} from "@lodestar/utils";
import {routes} from "@lodestar/api";
import {CheckpointHex, CheckpointStateCache, StateContextCache, toCheckpointHex} from "../stateCache/index.js";
import {CheckpointHex, CheckpointStateCache, BlockStateCache, toCheckpointHex} from "../stateCache/index.js";
import {Metrics} from "../../metrics/index.js";
import {JobItemQueue} from "../../util/queue/index.js";
import {IStateRegenerator, IStateRegeneratorInternal, RegenCaller, RegenFnName, StateCloneOpts} from "./interface.js";
Expand Down Expand Up @@ -34,7 +34,7 @@ export class QueuedStateRegenerator implements IStateRegenerator {
private readonly regen: StateRegenerator;

private readonly forkChoice: IForkChoice;
private readonly stateCache: StateContextCache;
private readonly stateCache: BlockStateCache;
private readonly checkpointStateCache: CheckpointStateCache;
private readonly metrics: Metrics | null;
private readonly logger: Logger;
Expand Down Expand Up @@ -82,6 +82,12 @@ export class QueuedStateRegenerator implements IStateRegenerator {
return this.checkpointStateCache.getLatest(head.blockRoot, Infinity) || this.stateCache.get(head.stateRoot);
}

/**
 * Prune both caches on a checkpoint event: the checkpoint state cache by finalized/justified epoch,
 * then the block state cache, which is given the head state root.
 * TODO: remove this once we go with n-historical state
 */
pruneOnCheckpoint(finalizedEpoch: Epoch, justifiedEpoch: Epoch, headStateRoot: RootHex): void {
  const {checkpointStateCache, stateCache} = this;
  checkpointStateCache.prune(finalizedEpoch, justifiedEpoch);
  stateCache.prune(headStateRoot);
}

pruneOnFinalized(finalizedEpoch: number): void {
this.checkpointStateCache.pruneFinalized(finalizedEpoch);
this.stateCache.deleteAllBeforeEpoch(finalizedEpoch);
Expand All @@ -106,16 +112,20 @@ export class QueuedStateRegenerator implements IStateRegenerator {
: this.stateCache.get(newHeadStateRoot);

if (headState) {
// this move the headState to the front of the queue so it'll not be pruned right away
this.stateCache.add(headState);
// TODO: use add() api instead once we go with n-historical state
this.stateCache.setHeadState(headState);
} else {
// Trigger regen on head change if necessary
this.logger.warn("Head state not available, triggering regen", {stateRoot: newHeadStateRoot});
// it's important to reload state to regen head state here
const shouldReload = true;
// head has changed, so the existing cached head state is no longer useful. Set strong reference to null to free
// up memory for regen step below. During regen, node won't be functional but eventually head will be available
// TODO: remove this once we go with n-historical state
this.stateCache.setHeadState(null);
this.regen.getState(newHeadStateRoot, RegenCaller.processBlock, shouldReload).then(
// this move the headState to the front of the queue so it'll not be pruned right away
(headStateRegen) => this.stateCache.add(headStateRegen),
(headStateRegen) => this.stateCache.setHeadState(headStateRegen),
(e) => this.logger.error("Error on head state regen", {}, e)
);
}
Expand Down
4 changes: 2 additions & 2 deletions packages/beacon-node/src/chain/regen/regen.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import {SLOTS_PER_EPOCH} from "@lodestar/params";
import {ChainForkConfig} from "@lodestar/config";
import {Metrics} from "../../metrics/index.js";
import {IBeaconDb} from "../../db/index.js";
import {CheckpointStateCache, StateContextCache} from "../stateCache/index.js";
import {CheckpointStateCache, BlockStateCache} from "../stateCache/index.js";
import {getCheckpointFromState} from "../blocks/utils/checkpoint.js";
import {ChainEvent, ChainEventEmitter} from "../emitter.js";
import {IStateRegeneratorInternal, RegenCaller, StateCloneOpts} from "./interface.js";
Expand All @@ -24,7 +24,7 @@ import {RegenError, RegenErrorCode} from "./errors.js";
export type RegenModules = {
db: IBeaconDb;
forkChoice: IForkChoice;
stateCache: StateContextCache;
stateCache: BlockStateCache;
checkpointStateCache: CheckpointStateCache;
config: ChainForkConfig;
emitter: ChainEventEmitter;
Expand Down
2 changes: 1 addition & 1 deletion packages/beacon-node/src/chain/shufflingCache.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import {CachedBeaconStateAllForks, EpochShuffling, getShufflingDecisionBlock} fr
import {Epoch, RootHex} from "@lodestar/types";

/**
* Same value to CheckpointBalancesCache, with the assumption that we don't have to use it old epochs. In the worse case:
* Same value as CheckpointBalancesCache, with the assumption that we don't have to use it for old epochs. In the worst case:
* - when loading state bytes from disk, we need to compute shuffling for all epochs (~1s as of Sep 2023)
* - don't have shuffling to verify attestations, need to do 1 epoch transition to add shuffling to this cache. This never happens
* with default chain option of maxSkipSlots = 32
Expand Down
2 changes: 1 addition & 1 deletion packages/beacon-node/src/chain/stateCache/index.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
export * from "./stateContextCache.js";
export * from "./lruBlockStateCache.js";
export * from "./persistentCheckpointsCache.js";
export * from "./types.js";
146 changes: 146 additions & 0 deletions packages/beacon-node/src/chain/stateCache/lruBlockStateCache.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
import {toHexString} from "@chainsafe/ssz";
import {Epoch, RootHex} from "@lodestar/types";
import {CachedBeaconStateAllForks} from "@lodestar/state-transition";
import {routes} from "@lodestar/api";
import {Metrics} from "../../metrics/index.js";
import {LinkedList} from "../../util/array.js";
import {MapTracker} from "./mapMetrics.js";
import {BlockStateCache} from "./types.js";

/** Options accepted by the LRUBlockStateCache constructor. */
export type LRUBlockStateCacheOpts = {
/** Max number of states allowed in the cache; older entries are pruned on add() once exceeded. */
maxStates: number;
};

/**
 * New implementation of BlockStateCache that keeps the most recent n states consistently
 * - Prune per add() instead of per checkpoint so it only keeps n historical states consistently
 * - This is LRU like cache except that we only track the last added time, not the last used time
 * because state could be fetched from multiple places, but we only care about the last added time.
 * get() never reorders entries.
 * - No need to set a separate head state, the head state is always the first item in the list
 *
 * Invariant maintained by every mutating method: `keyOrder` contains exactly the keys of `cache`,
 * most recently added first. Stale keys in `keyOrder` would count against `maxStates` and cause
 * live states to be evicted, so every removal path also updates `keyOrder`.
 */
export class LRUBlockStateCache implements BlockStateCache {
  /**
   * Max number of states allowed in the cache
   */
  readonly maxStates: number;

  /** State root hex -> state */
  private readonly cache: MapTracker<string, CachedBeaconStateAllForks>;
  /** Epoch -> Set of state root hex (the same keys used in `cache`) */
  private readonly epochIndex = new Map<Epoch, Set<string>>();
  /** Key order to implement the LRU-like cache: head is the most recently added key */
  private readonly keyOrder: LinkedList<string>;
  private readonly metrics: Metrics["stateCache"] | null | undefined;

  /**
   * @param opts cache options; `opts.maxStates` bounds the number of cached states
   * @param modules optional metrics; when provided, the cache size is reported on collect
   */
  constructor(opts: LRUBlockStateCacheOpts, {metrics}: {maxStates?: number; metrics?: Metrics | null}) {
    this.maxStates = opts.maxStates;
    this.cache = new MapTracker(metrics?.stateCache);
    if (metrics) {
      this.metrics = metrics.stateCache;
      metrics.stateCache.size.addCollect(() => metrics.stateCache.size.set(this.cache.size));
    }
    this.keyOrder = new LinkedList();
  }

  /**
   * This implementation always move head state to the head of the list
   * so no need to set a separate head state
   * However this is to be consistent with the old StateContextCache
   * A `null` item is ignored.
   * TODO: remove this method, consumer should go with add() api instead
   */
  setHeadState(item: CachedBeaconStateAllForks | null): void {
    if (item !== null) {
      this.add(item);
    }
  }

  /**
   * Look up a state by its state root hex. Does not change the eviction order.
   * @returns the cached state, or null if it is not in the cache
   */
  get(rootHex: RootHex): CachedBeaconStateAllForks | null {
    this.metrics?.lookups.inc();
    const item = this.cache.get(rootHex);
    if (!item) {
      return null;
    }

    this.metrics?.hits.inc();
    this.metrics?.stateClonedCount.observe(item.clonedCount);

    return item;
  }

  /**
   * Add a state keyed by its hash tree root and make it the most recent entry.
   * If the state is already cached it is only moved to the head of the order;
   * otherwise it is inserted and the cache is pruned back to `maxStates`.
   */
  add(item: CachedBeaconStateAllForks): void {
    const key = toHexString(item.hashTreeRoot());
    if (this.cache.get(key)) {
      this.keyOrder.moveToHead(key);
      // same size, no prune
      return;
    }
    this.metrics?.adds.inc();
    this.cache.set(key, item);
    const epoch = item.epochCtx.epoch;
    const stateRoots = this.epochIndex.get(epoch);
    if (stateRoots) {
      stateRoots.add(key);
    } else {
      this.epochIndex.set(epoch, new Set([key]));
    }
    this.keyOrder.unshift(key);
    this.prune();
  }

  /**
   * Remove every state from the cache.
   * `keyOrder` is emptied as well: leftover keys would count against `maxStates` and,
   * if the same state were added again, its duplicate tail key would evict it immediately.
   */
  clear(): void {
    this.cache.clear();
    this.epochIndex.clear();
    while (this.keyOrder.length > 0) {
      this.keyOrder.pop();
    }
  }

  /** Number of states currently cached */
  get size(): number {
    return this.cache.size;
  }

  /**
   * Evict the least recently added states until at most `maxStates` remain.
   *
   * If a recent state is not available, regen from the checkpoint state.
   * Given state 0 => 1 => ... => n, if regen adds back state 0 we should not remove it right away.
   * The LRU-like cache helps with this.
   *
   * @throws Error if the key list reports a non-zero length but yields no key
   */
  prune(): void {
    while (this.keyOrder.length > this.maxStates) {
      const key = this.keyOrder.pop();
      if (!key) {
        // should not happen
        throw new Error("No key");
      }
      const item = this.cache.get(key);
      if (item) {
        const epoch = item.epochCtx.epoch;
        const stateRoots = this.epochIndex.get(epoch);
        if (stateRoots) {
          stateRoots.delete(key);
          // do not keep empty sets around, they would pile up while the chain is not finalizing
          if (stateRoots.size === 0) {
            this.epochIndex.delete(epoch);
          }
        }
        this.cache.delete(key);
      }
    }
  }

  /**
   * Prune per finalized epoch: delete all states whose epoch is strictly lower than `finalizedEpoch`.
   */
  deleteAllBeforeEpoch(finalizedEpoch: Epoch): void {
    const removedKeys = new Set<string>();
    for (const epoch of this.epochIndex.keys()) {
      if (epoch < finalizedEpoch) {
        this.deleteAllEpochItems(epoch, removedKeys);
      }
    }
    if (removedKeys.size > 0) {
      this.removeFromKeyOrder(removedKeys);
    }
  }

  /** ONLY FOR DEBUGGING PURPOSES. For lodestar debug API */
  dumpSummary(): routes.lodestar.StateCacheItem[] {
    return Array.from(this.cache.entries()).map(([key, state]) => ({
      slot: state.slot,
      root: toHexString(state.hashTreeRoot()),
      reads: this.cache.readCount.get(key) ?? 0,
      lastRead: this.cache.lastRead.get(key) ?? 0,
      checkpointState: false,
    }));
  }

  /**
   * Delete all states indexed under `epoch` from `cache` and `epochIndex`.
   * Deleted keys are recorded in `removedKeys` so the caller can also drop them from `keyOrder`.
   */
  private deleteAllEpochItems(epoch: Epoch, removedKeys: Set<string>): void {
    for (const rootHex of this.epochIndex.get(epoch) || []) {
      this.cache.delete(rootHex);
      removedKeys.add(rootHex);
    }
    this.epochIndex.delete(epoch);
  }

  /**
   * Drop `removedKeys` from `keyOrder` while preserving the relative order of the remaining keys.
   * Only pop()/unshift() are used: popping yields keys oldest first, and unshifting them back in
   * that same sequence leaves the newest key at the head again.
   */
  private removeFromKeyOrder(removedKeys: Set<string>): void {
    const keptOldestFirst: string[] = [];
    while (this.keyOrder.length > 0) {
      const key = this.keyOrder.pop();
      if (key != null && !removedKeys.has(key)) {
        keptOldestFirst.push(key);
      }
    }
    for (const key of keptOldestFirst) {
      this.keyOrder.unshift(key);
    }
  }
}
Loading

0 comments on commit 4669871

Please sign in to comment.