Skip to content

Commit

Permalink
feat: implement BalancesTreeCache (#7084)
Browse files Browse the repository at this point in the history
* feat: implement balances tree cache

* fix: set balancesTreeCache when clone EpochCache
  • Loading branch information
twoeths authored Sep 14, 2024
1 parent 0aeae26 commit 604a2a7
Show file tree
Hide file tree
Showing 14 changed files with 166 additions and 35 deletions.
2 changes: 1 addition & 1 deletion packages/beacon-node/src/chain/archiver/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ export class Archiver {
// should be after ArchiveBlocksTask to handle restart cleanly
await this.statesArchiver.maybeArchiveState(finalized);

this.chain.regen.pruneOnFinalized(finalizedEpoch);
this.chain.pruneOnFinalized(finalizedEpoch);

// tasks rely on extended fork choice
const prunedBlocks = this.chain.forkChoice.prune(finalized.rootHex);
Expand Down
38 changes: 38 additions & 0 deletions packages/beacon-node/src/chain/balancesTreeCache.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
import {ListBasicTreeViewDU, UintNumberType} from "@chainsafe/ssz";
import {IBalancesTreeCache, CachedBeaconStateAllForks} from "@lodestar/state-transition";
import {Metrics} from "../metrics/index.js";

const MAX_ITEMS = 2;

export class BalancesTreeCache implements IBalancesTreeCache {
private readonly unusedBalancesTrees: ListBasicTreeViewDU<UintNumberType>[] = [];

constructor(private readonly metrics: Metrics | null = null) {
if (metrics) {
metrics.balancesTreeCache.size.addCollect(() => {
metrics.balancesTreeCache.size.set(this.unusedBalancesTrees.length);
});
}
}

processUnusedState(state: CachedBeaconStateAllForks | undefined): void {
if (state === undefined) {
return;
}

this.unusedBalancesTrees.push(state.balances);
while (this.unusedBalancesTrees.length > MAX_ITEMS) {
this.unusedBalancesTrees.shift();
}
}

getUnusedBalances(): ListBasicTreeViewDU<UintNumberType> | undefined {
if (this.unusedBalancesTrees.length === 0) {
this.metrics?.balancesTreeCache.miss.inc();
return undefined;
}

this.metrics?.balancesTreeCache.hit.inc();
return this.unusedBalancesTrees.shift();
}
}
11 changes: 10 additions & 1 deletion packages/beacon-node/src/chain/blocks/importBlock.ts
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,16 @@ export async function importBlock(

// This adds the state necessary to process the next block
// Some block event handlers require state being in state cache so need to do this before emitting EventType.block
this.regen.processState(blockRootHex, postState);
this.regen.processState(blockRootHex, postState).then((prunedStates) => {
if (prunedStates) {
for (const states of prunedStates.values()) {
// cp states on the same epoch shares the same balances seed tree so only need one of them
this.balancesTreeCache.processUnusedState(states[0]);
}
}
}).catch((e) => {
this.logger.error("Regen error to process state for block", {slot: blockSlot, root: blockRootHex}, e as Error);
});

this.metrics?.importBlock.bySource.inc({source});
this.logger.verbose("Added block to forkchoice and state cache", {slot: blockSlot, root: blockRootHex});
Expand Down
14 changes: 14 additions & 0 deletions packages/beacon-node/src/chain/chain.ts
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ import {DbCPStateDatastore} from "./stateCache/datastore/db.js";
import {FileCPStateDatastore} from "./stateCache/datastore/file.js";
import {SyncCommitteeRewards, computeSyncCommitteeRewards} from "./rewards/syncCommitteeRewards.js";
import {AttestationsRewards, computeAttestationsRewards} from "./rewards/attestationsRewards.js";
import {BalancesTreeCache} from "./balancesTreeCache.js";

/**
* Arbitrary constants, blobs and payloads should be consumed immediately in the same slot
Expand Down Expand Up @@ -158,6 +159,7 @@ export class BeaconChain implements IBeaconChain {
readonly beaconProposerCache: BeaconProposerCache;
readonly checkpointBalancesCache: CheckpointBalancesCache;
readonly shufflingCache: ShufflingCache;
readonly balancesTreeCache: BalancesTreeCache;
/** Map keyed by executionPayload.blockHash of the block for those blobs */
readonly producedContentsCache = new Map<BlockHash, deneb.Contents>();

Expand Down Expand Up @@ -247,6 +249,7 @@ export class BeaconChain implements IBeaconChain {
this.beaconProposerCache = new BeaconProposerCache(opts);
this.checkpointBalancesCache = new CheckpointBalancesCache();
this.shufflingCache = new ShufflingCache(metrics, this.opts);
this.balancesTreeCache = new BalancesTreeCache(metrics);

// Restore state caches
// anchorState may already by a CachedBeaconState. If so, don't create the cache again, since deserializing all
Expand All @@ -260,6 +263,7 @@ export class BeaconChain implements IBeaconChain {
config,
pubkey2index: new PubkeyIndexMap(),
index2pubkey: [],
balancesTreeCache: this.balancesTreeCache,
});
this.shufflingCache.processState(cachedState, cachedState.epochCtx.previousShuffling.epoch);
this.shufflingCache.processState(cachedState, cachedState.epochCtx.currentShuffling.epoch);
Expand Down Expand Up @@ -863,6 +867,16 @@ export class BeaconChain implements IBeaconChain {
}
}

pruneOnFinalized(finalizedEpoch: Epoch): void {
const prunedStates = this.regen.pruneOnFinalized(finalizedEpoch);
if (prunedStates) {
// cp states on the same epoch shares the same balances seed tree so only need one of them
for (const states of prunedStates.values()) {
this.balancesTreeCache.processUnusedState(states[0]);
}
}
}

/**
* Regenerate state for attestation verification, this does not happen with default chain option of maxSkipSlots = 32 .
* However, need to handle just in case. Lodestar doesn't support multiple regen state requests for attestation verification
Expand Down
2 changes: 2 additions & 0 deletions packages/beacon-node/src/chain/interface.ts
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,8 @@ export interface IBeaconChain {
blockRef: BeaconBlock | BlindedBeaconBlock,
validatorIds?: (ValidatorIndex | string)[]
): Promise<SyncCommitteeRewards>;

pruneOnFinalized(finalizedEpoch: Epoch): void;
}

export type SSZObjectType =
Expand Down
22 changes: 16 additions & 6 deletions packages/beacon-node/src/chain/regen/queued.ts
Original file line number Diff line number Diff line change
Expand Up @@ -148,16 +148,26 @@ export class QueuedStateRegenerator implements IStateRegenerator {
this.blockStateCache.prune(headStateRoot);
}

pruneOnFinalized(finalizedEpoch: number): void {
this.checkpointStateCache.pruneFinalized(finalizedEpoch);
pruneOnFinalized(finalizedEpoch: number): Map<Epoch, CachedBeaconStateAllForks[]> | null {
const prunedStates = this.checkpointStateCache.pruneFinalized(finalizedEpoch);
this.blockStateCache.deleteAllBeforeEpoch(finalizedEpoch);

return prunedStates;
}

processState(blockRootHex: RootHex, postState: CachedBeaconStateAllForks): void {
async processState(
blockRootHex: RootHex,
postState: CachedBeaconStateAllForks
): Promise<Map<Epoch, CachedBeaconStateAllForks[]> | null> {
this.blockStateCache.add(postState);
this.checkpointStateCache.processState(blockRootHex, postState).catch((e) => {
this.logger.debug("Error processing block state", {blockRootHex, slot: postState.slot}, e);
});
let prunedStates: Map<Epoch, CachedBeaconStateAllForks[]> | null = null;
try {
prunedStates = await this.checkpointStateCache.processState(blockRootHex, postState);
} catch (e) {
this.logger.debug("Error processing block state", {blockRootHex, slot: postState.slot}, e as Error);
}

return prunedStates;
}

addCheckpointState(cp: phase0.Checkpoint, item: CachedBeaconStateAllForks): void {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,9 +59,9 @@ export class InMemoryCheckpointStateCache implements CheckpointStateCache {
return this.getLatest(rootHex, maxEpoch, opts);
}

async processState(): Promise<number> {
async processState(): Promise<Map<Epoch, CachedBeaconStateAllForks[]> | null> {
// do nothing, this class does not support prunning
return 0;
return null;
}

get(cp: CheckpointHex, opts?: StateCloneOpts): CachedBeaconStateAllForks | null {
Expand Down Expand Up @@ -122,12 +122,17 @@ export class InMemoryCheckpointStateCache implements CheckpointStateCache {
return previousHits;
}

pruneFinalized(finalizedEpoch: Epoch): void {
pruneFinalized(finalizedEpoch: Epoch): Map<Epoch, CachedBeaconStateAllForks[]> {
const result = new Map<Epoch, CachedBeaconStateAllForks[]>();

for (const epoch of this.epochIndex.keys()) {
if (epoch < finalizedEpoch) {
this.deleteAllEpochItems(epoch);
const deletedStates = this.deleteAllEpochItems(epoch);
result.set(epoch, deletedStates);
}
}

return result;
}

prune(finalizedEpoch: Epoch, justifiedEpoch: Epoch): void {
Expand All @@ -153,11 +158,19 @@ export class InMemoryCheckpointStateCache implements CheckpointStateCache {
}
}

deleteAllEpochItems(epoch: Epoch): void {
deleteAllEpochItems(epoch: Epoch): CachedBeaconStateAllForks[] {
const states = [];
for (const rootHex of this.epochIndex.get(epoch) || []) {
this.cache.delete(toCheckpointKey({rootHex, epoch}));
const key = toCheckpointKey({rootHex, epoch});
const state = this.cache.get(key);
if (state) {
states.push(state);
}
this.cache.delete(key);
}
this.epochIndex.delete(epoch);

return states;
}

clear(): void {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -421,14 +421,17 @@ export class PersistentCheckpointStateCache implements CheckpointStateCache {
/**
* Prune all checkpoint states before the provided finalized epoch.
*/
pruneFinalized(finalizedEpoch: Epoch): void {
pruneFinalized(finalizedEpoch: Epoch): Map<Epoch, CachedBeaconStateAllForks[]> | null {
for (const epoch of this.epochIndex.keys()) {
if (epoch < finalizedEpoch) {
this.deleteAllEpochItems(epoch).catch((e) =>
this.logger.debug("Error delete all epoch items", {epoch, finalizedEpoch}, e as Error)
);
}
}

// not likely to return anything in-memory state because we may persist states even before they are finalized
return null;
}

/**
Expand Down Expand Up @@ -481,12 +484,14 @@ export class PersistentCheckpointStateCache implements CheckpointStateCache {
*
* As of Mar 2024, it takes <=350ms to persist a holesky state on fast server
*/
async processState(blockRootHex: RootHex, state: CachedBeaconStateAllForks): Promise<number> {
let persistCount = 0;
async processState(
blockRootHex: RootHex,
state: CachedBeaconStateAllForks
): Promise<Map<Epoch, CachedBeaconStateAllForks[]> | null> {
// it's important to sort the epochs in ascending order, in case of big reorg we always want to keep the most recent checkpoint states
const sortedEpochs = Array.from(this.epochIndex.keys()).sort((a, b) => a - b);
if (sortedEpochs.length <= this.maxEpochsInMemory) {
return 0;
return null;
}

const blockSlot = state.slot;
Expand All @@ -502,24 +507,19 @@ export class PersistentCheckpointStateCache implements CheckpointStateCache {
// normally the block persist happens at 2/3 of slot 0 of epoch, if it's already late then just skip to allow other tasks to run
// there are plenty of chances in the same epoch to persist checkpoint states, also if block is late it could be reorged
this.logger.verbose("Skip persist checkpoint states", {blockSlot, root: blockRootHex});
return 0;
return null;
}

const persistEpochs = sortedEpochs.slice(0, sortedEpochs.length - this.maxEpochsInMemory);

const result = new Map<Epoch, CachedBeaconStateAllForks[]>();
for (const lowestEpoch of persistEpochs) {
// usually there is only 0 or 1 epoch to persist in this loop
persistCount += await this.processPastEpoch(blockRootHex, state, lowestEpoch);
const prunedStates = await this.processPastEpoch(blockRootHex, state, lowestEpoch);
result.set(lowestEpoch, prunedStates);
}

if (persistCount > 0) {
this.logger.verbose("Persisted checkpoint states", {
slot: blockSlot,
root: blockRootHex,
persistCount,
persistEpochs: persistEpochs.length,
});
}
return persistCount;
return result;
}

/**
Expand Down Expand Up @@ -648,13 +648,16 @@ export class PersistentCheckpointStateCache implements CheckpointStateCache {
* Performance note:
* - In normal condition, we persist 1 checkpoint state per epoch.
* - In reorged condition, we may persist multiple (most likely 2) checkpoint states per epoch.
*
* Return the pruned states from memory
*/
private async processPastEpoch(
blockRootHex: RootHex,
state: CachedBeaconStateAllForks,
epoch: Epoch
): Promise<number> {
): Promise<CachedBeaconStateAllForks[]> {
let persistCount = 0;
const prunedStates: CachedBeaconStateAllForks[] = [];
const epochBoundarySlot = computeStartSlotAtEpoch(epoch);
const epochBoundaryRoot =
epochBoundarySlot === state.slot ? fromHexString(blockRootHex) : getBlockRootAtSlot(state, epochBoundarySlot);
Expand Down Expand Up @@ -735,10 +738,20 @@ export class PersistentCheckpointStateCache implements CheckpointStateCache {
this.metrics?.statePruneFromMemoryCount.inc();
this.logger.verbose("Pruned checkpoint state from memory", logMeta);
}

prunedStates.push(state);
}
}

return persistCount;
if (persistCount > 0) {
this.logger.verbose("Persisted checkpoint states", {
stateSlot: state.slot,
blockRoot: blockRootHex,
persistCount,
});
}

return prunedStates;
}

/**
Expand Down
7 changes: 5 additions & 2 deletions packages/beacon-node/src/chain/stateCache/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,11 @@ export interface CheckpointStateCache {
): Promise<CachedBeaconStateAllForks | null>;
updatePreComputedCheckpoint(rootHex: RootHex, epoch: Epoch): number | null;
prune(finalizedEpoch: Epoch, justifiedEpoch: Epoch): void;
pruneFinalized(finalizedEpoch: Epoch): void;
processState(blockRootHex: RootHex, state: CachedBeaconStateAllForks): Promise<number>;
pruneFinalized(finalizedEpoch: Epoch): Map<Epoch, CachedBeaconStateAllForks[]> | null;
processState(
blockRootHex: RootHex,
state: CachedBeaconStateAllForks
): Promise<Map<Epoch, CachedBeaconStateAllForks[]> | null>;
clear(): void;
dumpSummary(): routes.lodestar.StateCacheItem[];
/** Expose beacon states stored in cache. Use with caution */
Expand Down
15 changes: 15 additions & 0 deletions packages/beacon-node/src/metrics/metrics/lodestar.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1315,6 +1315,21 @@ export function createLodestarMetrics(
}),
},

balancesTreeCache: {
size: register.gauge({
name: "lodestar_balances_tree_cache_size",
help: "Balances tree cache size",
}),
hit: register.gauge({
name: "lodestar_balances_tree_cache_hit_total",
help: "Total number of balances tree cache hits",
}),
miss: register.gauge({
name: "lodestar_balances_tree_cache_miss_total",
help: "Total number of balances tree cache misses",
}),
},

seenCache: {
aggregatedAttestations: {
superSetCheckTotal: register.histogram({
Expand Down
5 changes: 5 additions & 0 deletions packages/state-transition/src/cache/balancesTreeCache.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
import {UintNumberType, ListBasicTreeViewDU} from "@chainsafe/ssz";

export interface IBalancesTreeCache {
getUnusedBalances(): ListBasicTreeViewDU<UintNumberType> | undefined;
}
Loading

0 comments on commit 604a2a7

Please sign in to comment.