diff --git a/packages/beacon-node/src/chain/archiver/archiveStates.ts b/packages/beacon-node/src/chain/archiver/archiveStates.ts index e3ff48b02355..2231cd3ff513 100644 --- a/packages/beacon-node/src/chain/archiver/archiveStates.ts +++ b/packages/beacon-node/src/chain/archiver/archiveStates.ts @@ -5,6 +5,7 @@ import {computeEpochAtSlot, computeStartSlotAtEpoch} from "@lodestar/state-trans import {CheckpointWithHex} from "@lodestar/fork-choice"; import {IBeaconDb} from "../../db/index.js"; import {IStateRegenerator} from "../regen/interface.js"; +import {getStateSlotFromBytes} from "../../util/multifork.js"; /** * Minimum number of epochs between single temp archived states @@ -83,13 +84,26 @@ export class StatesArchiver { * Only the new finalized state is stored to disk */ async archiveState(finalized: CheckpointWithHex): Promise { - const finalizedState = this.regen.getCheckpointStateSync(finalized); - if (!finalizedState) { - throw Error("No state in cache for finalized checkpoint state epoch #" + finalized.epoch); + // starting from Mar 2024, the finalized state could be from disk or in memory + const finalizedStateOrBytes = await this.regen.getCheckpointStateOrBytes(finalized); + const {rootHex} = finalized; + if (!finalizedStateOrBytes) { + throw Error(`No state in cache for finalized checkpoint state epoch #${finalized.epoch} root ${rootHex}`); + } + if (finalizedStateOrBytes instanceof Uint8Array) { + const slot = getStateSlotFromBytes(finalizedStateOrBytes); + await this.db.stateArchive.putBinary(slot, finalizedStateOrBytes); + this.logger.verbose("Archived finalized state bytes", {epoch: finalized.epoch, slot, root: rootHex}); + } else { + // state + await this.db.stateArchive.put(finalizedStateOrBytes.slot, finalizedStateOrBytes); + // don't delete states before the finalized state, auto-prune will take care of it + this.logger.verbose("Archived finalized state", { + epoch: finalized.epoch, + slot: finalizedStateOrBytes.slot, + root: rootHex, + }); } - await this.db.stateArchive.put(finalizedState.slot, finalizedState); - // don't delete states before the finalized state, auto-prune will take care of it - this.logger.verbose("Archived finalized state", {finalizedEpoch: finalized.epoch}); } } diff --git a/packages/beacon-node/src/chain/blocks/importBlock.ts b/packages/beacon-node/src/chain/blocks/importBlock.ts index 0f1f2a7890d9..b755d2026f3c 100644 --- a/packages/beacon-node/src/chain/blocks/importBlock.ts +++ b/packages/beacon-node/src/chain/blocks/importBlock.ts @@ -360,7 +360,7 @@ export async function importBlock( const checkpointState = postState; const cp = getCheckpointFromState(checkpointState); this.regen.addCheckpointState(cp, checkpointState); - this.emitter.emit(ChainEvent.checkpoint, cp, checkpointState); + this.emitter.emit(ChainEvent.checkpoint, cp, checkpointState.clone(true)); // Note: in-lined code from previos handler of ChainEvent.checkpoint this.logger.verbose("Checkpoint processed", toCheckpointHex(cp)); diff --git a/packages/beacon-node/src/chain/regen/interface.ts b/packages/beacon-node/src/chain/regen/interface.ts index b861378ff440..d72f83604e67 100644 --- a/packages/beacon-node/src/chain/regen/interface.ts +++ b/packages/beacon-node/src/chain/regen/interface.ts @@ -36,6 +36,7 @@ export interface IStateRegenerator extends IStateRegeneratorInternal { dumpCacheSummary(): routes.lodestar.StateCacheItem[]; getStateSync(stateRoot: RootHex): CachedBeaconStateAllForks | null; getPreStateSync(block: allForks.BeaconBlock): CachedBeaconStateAllForks | null; + getCheckpointStateOrBytes(cp: CheckpointHex): Promise; getCheckpointStateSync(cp: CheckpointHex): CachedBeaconStateAllForks | null; getClosestHeadState(head: ProtoBlock): CachedBeaconStateAllForks | null; pruneOnCheckpoint(finalizedEpoch: Epoch, justifiedEpoch: Epoch, headStateRoot: RootHex): void; diff --git a/packages/beacon-node/src/chain/regen/queued.ts b/packages/beacon-node/src/chain/regen/queued.ts index a65c462227f3..365a6d871a79 100644 --- a/packages/beacon-node/src/chain/regen/queued.ts +++ b/packages/beacon-node/src/chain/regen/queued.ts @@ -18,7 +18,6 @@ const REGEN_CAN_ACCEPT_WORK_THRESHOLD = 16; type QueuedStateRegeneratorModules = RegenModules & { signal: AbortSignal; - logger: Logger; }; type RegenRequestKey = keyof IStateRegeneratorInternal; @@ -54,6 +53,12 @@ export class QueuedStateRegenerator implements IStateRegenerator { this.logger = modules.logger; } + async init(): Promise { + if (this.checkpointStateCache.init) { + return this.checkpointStateCache.init(); + } + } + canAcceptWork(): boolean { return this.jobQueue.jobLen < REGEN_CAN_ACCEPT_WORK_THRESHOLD; } @@ -105,6 +110,10 @@ export class QueuedStateRegenerator implements IStateRegenerator { return null; } + async getCheckpointStateOrBytes(cp: CheckpointHex): Promise { + return this.checkpointStateCache.getStateOrBytes(cp); + } + getCheckpointStateSync(cp: CheckpointHex): CachedBeaconStateAllForks | null { return this.checkpointStateCache.get(cp); } @@ -145,10 +154,13 @@ export class QueuedStateRegenerator implements IStateRegenerator { } else { // Trigger regen on head change if necessary this.logger.warn("Head state not available, triggering regen", {stateRoot: newHeadStateRoot}); + // it's important to reload state to regen head state here + const allowDiskReload = true; // head has changed, so the existing cached head state is no longer useful. Set strong reference to null to free // up memory for regen step below. During regen, node won't be functional but eventually head will be available + // for legacy StateContextCache only this.stateCache.setHeadState(null); - this.regen.getState(newHeadStateRoot, RegenCaller.processBlock).then( + this.regen.getState(newHeadStateRoot, RegenCaller.processBlock, allowDiskReload).then( (headStateRegen) => this.stateCache.setHeadState(headStateRegen), (e) => this.logger.error("Error on head state regen", {}, e) ); diff --git a/packages/beacon-node/src/chain/regen/regen.ts b/packages/beacon-node/src/chain/regen/regen.ts index ab0e0b5f2dd7..2e32b9ed2d71 100644 --- a/packages/beacon-node/src/chain/regen/regen.ts +++ b/packages/beacon-node/src/chain/regen/regen.ts @@ -319,7 +319,7 @@ async function processSlotsToNearestCheckpoint( const checkpointState = postState; const cp = getCheckpointFromState(checkpointState); checkpointStateCache.add(cp, checkpointState); - emitter.emit(ChainEvent.checkpoint, cp, checkpointState.clone()); + emitter.emit(ChainEvent.checkpoint, cp, checkpointState.clone(true)); // this avoids keeping our node busy processing blocks await sleep(0); diff --git a/packages/beacon-node/src/chain/stateCache/fifoBlockStateCache.ts b/packages/beacon-node/src/chain/stateCache/fifoBlockStateCache.ts index 854983101c04..7cce1566baf3 100644 --- a/packages/beacon-node/src/chain/stateCache/fifoBlockStateCache.ts +++ b/packages/beacon-node/src/chain/stateCache/fifoBlockStateCache.ts @@ -80,7 +80,7 @@ export class FIFOBlockStateCache implements BlockStateCache { this.metrics?.hits.inc(); this.metrics?.stateClonedCount.observe(item.clonedCount); - return item; + return item.clone(true); } /** diff --git a/packages/beacon-node/src/chain/stateCache/persistentCheckpointsCache.ts b/packages/beacon-node/src/chain/stateCache/persistentCheckpointsCache.ts index b57e51f8f2d6..9580ef1701c7 100644 --- a/packages/beacon-node/src/chain/stateCache/persistentCheckpointsCache.ts +++ b/packages/beacon-node/src/chain/stateCache/persistentCheckpointsCache.ts @@ -187,7 +187,7 @@ export class PersistentCheckpointStateCache implements CheckpointStateCache { async getOrReload(cp: CheckpointHex): Promise { const stateOrStateBytesData = await this.getStateOrLoadDb(cp); if (stateOrStateBytesData === null || isCachedBeaconState(stateOrStateBytesData)) { - return stateOrStateBytesData; + return stateOrStateBytesData?.clone(true) ?? null; } const {persistedKey, stateBytes} = stateOrStateBytesData; const logMeta = {persistedKey: toHexString(persistedKey)}; @@ -242,7 +242,7 @@ export class PersistentCheckpointStateCache implements CheckpointStateCache { this.cache.set(cpKey, {type: CacheItemType.inMemory, state: newCachedState, persistedKey}); this.epochIndex.getOrDefault(cp.epoch).add(cp.rootHex); // don't prune from memory here, call it at the last 1/3 of slot 0 of an epoch - return newCachedState; + return newCachedState.clone(true); } catch (e) { this.logger.debug("Reload: error loading cached state", logMeta, e as Error); return null; @@ -312,7 +312,7 @@ export class PersistentCheckpointStateCache implements CheckpointStateCache { if (isInMemoryCacheItem(cacheItem)) { const {state} = cacheItem; this.metrics?.stateClonedCount.observe(state.clonedCount); - return state; + return state.clone(true); } return null; @@ -352,9 +352,9 @@ export class PersistentCheckpointStateCache implements CheckpointStateCache { .filter((e) => e <= maxEpoch); for (const epoch of epochs) { if (this.epochIndex.get(epoch)?.has(rootHex)) { - const inMemoryState = this.get({rootHex, epoch}); - if (inMemoryState) { - return inMemoryState; + const inMemoryClonedState = this.get({rootHex, epoch}); + if (inMemoryClonedState) { + return inMemoryClonedState; } } } @@ -376,9 +376,9 @@ export class PersistentCheckpointStateCache implements CheckpointStateCache { for (const epoch of epochs) { if (this.epochIndex.get(epoch)?.has(rootHex)) { try { - const state = await this.getOrReload({rootHex, epoch}); - if (state) { - return state; + const clonedState = await this.getOrReload({rootHex, epoch}); + if (clonedState) { + return clonedState; } } catch (e) { this.logger.debug("Error get or reload state", {epoch, rootHex}, e as Error); diff --git a/packages/beacon-node/src/chain/stateCache/stateContextCache.ts b/packages/beacon-node/src/chain/stateCache/stateContextCache.ts index 3a04c4f4a258..a11398f2d962 100644 --- a/packages/beacon-node/src/chain/stateCache/stateContextCache.ts +++ b/packages/beacon-node/src/chain/stateCache/stateContextCache.ts @@ -48,7 +48,7 @@ export class StateContextCache implements BlockStateCache { this.metrics?.hits.inc(); this.metrics?.stateClonedCount.observe(item.clonedCount); - return item; + return item.clone(true); } add(item: CachedBeaconStateAllForks): void { diff --git a/packages/beacon-node/src/chain/stateCache/stateContextCheckpointsCache.ts b/packages/beacon-node/src/chain/stateCache/stateContextCheckpointsCache.ts index a177db9b7c87..a7e907a97014 100644 --- a/packages/beacon-node/src/chain/stateCache/stateContextCheckpointsCache.ts +++ b/packages/beacon-node/src/chain/stateCache/stateContextCheckpointsCache.ts @@ -72,7 +72,7 @@ export class CheckpointStateCache implements CheckpointStateCacheInterface { this.metrics?.stateClonedCount.observe(item.clonedCount); - return item; + return item.clone(true); } add(cp: phase0.Checkpoint, item: CachedBeaconStateAllForks): void { diff --git a/packages/beacon-node/src/util/multifork.ts b/packages/beacon-node/src/util/multifork.ts index 81b4921a0a4a..4abeacd2e566 100644 --- a/packages/beacon-node/src/util/multifork.ts +++ b/packages/beacon-node/src/util/multifork.ts @@ -1,5 +1,5 @@ import {ChainForkConfig} from "@lodestar/config"; -import {allForks} from "@lodestar/types"; +import {Slot, allForks} from "@lodestar/types"; import {bytesToInt} from "@lodestar/utils"; import {getSlotFromSignedBeaconBlockSerialized} from "./sszBytes.js"; @@ -36,10 +36,14 @@ export function getStateTypeFromBytes( config: ChainForkConfig, bytes: Buffer | Uint8Array ): allForks.AllForksSSZTypes["BeaconState"] { - const slot = bytesToInt(bytes.subarray(SLOT_BYTES_POSITION_IN_STATE, SLOT_BYTES_POSITION_IN_STATE + SLOT_BYTE_COUNT)); + const slot = getStateSlotFromBytes(bytes); return config.getForkTypes(slot).BeaconState; } +export function getStateSlotFromBytes(bytes: Uint8Array): Slot { + return bytesToInt(bytes.subarray(SLOT_BYTES_POSITION_IN_STATE, SLOT_BYTES_POSITION_IN_STATE + SLOT_BYTE_COUNT)); +} + /** * First field in update is beacon, first field in beacon is slot * diff --git a/packages/beacon-node/test/unit/chain/stateCache/persistentCheckpointsCache.test.ts b/packages/beacon-node/test/unit/chain/stateCache/persistentCheckpointsCache.test.ts index 9c37b863623d..af7c118f5e99 100644 --- a/packages/beacon-node/test/unit/chain/stateCache/persistentCheckpointsCache.test.ts +++ b/packages/beacon-node/test/unit/chain/stateCache/persistentCheckpointsCache.test.ts @@ -132,7 +132,9 @@ describe("PersistentCheckpointStateCache", function () { it("pruneFinalized and getStateOrBytes", async function () { cache.add(cp2, states["cp2"]); - expect(await cache.getStateOrBytes(cp0bHex)).toEqual(states["cp0b"]); + expect(((await cache.getStateOrBytes(cp0bHex)) as CachedBeaconStateAllForks).hashTreeRoot()).toEqual( + states["cp0b"].hashTreeRoot() + ); expect(await cache.processState(toHexString(cp2.root), states["cp2"])).toEqual(1); // cp0 is persisted expect(fileApisBuffer.size).toEqual(1); @@ -484,7 +486,9 @@ describe("PersistentCheckpointStateCache", function () { // regen needs to reload cp0b cache.add(cp0b, states["cp0b"]); - expect(await cache.getStateOrBytes(cp0bHex)).toEqual(states["cp0b"]); + expect(((await cache.getStateOrBytes(cp0bHex)) as CachedBeaconStateAllForks).hashTreeRoot()).toEqual( + states["cp0b"].hashTreeRoot() + ); // regen generates cp1b const cp1b = {epoch: 21, root: root0b}; @@ -670,7 +674,9 @@ describe("PersistentCheckpointStateCache", function () { // simulate regen cache.add(cp0b, states["cp0b"]); - expect(await cache.getStateOrBytes(cp0bHex)).toEqual(states["cp0b"]); + expect(((await cache.getStateOrBytes(cp0bHex)) as CachedBeaconStateAllForks).hashTreeRoot()).toEqual( + states["cp0b"].hashTreeRoot() + ); // root2, regen cp0b const cp1bState = states["cp0b"].clone(); cp1bState.slot = 21 * SLOTS_PER_EPOCH; @@ -847,7 +853,9 @@ describe("PersistentCheckpointStateCache", function () { // simulate reload cp1b cache.add(cp0b, states["cp0b"]); - expect(await cache.getStateOrBytes(cp0bHex)).toEqual(states["cp0b"]); + expect(((await cache.getStateOrBytes(cp0bHex)) as CachedBeaconStateAllForks).hashTreeRoot()).toEqual( + states["cp0b"].hashTreeRoot() + ); const root1b = Buffer.alloc(32, 101); const state1b = states["cp0b"].clone(); state1b.slot = state1a.slot + 1;