diff --git a/packages/beacon-node/src/chain/archiver/archiveStates.ts b/packages/beacon-node/src/chain/archiver/archiveStates.ts index 2231cd3ff513..53f2033d80e8 100644 --- a/packages/beacon-node/src/chain/archiver/archiveStates.ts +++ b/packages/beacon-node/src/chain/archiver/archiveStates.ts @@ -6,6 +6,8 @@ import {CheckpointWithHex} from "@lodestar/fork-choice"; import {IBeaconDb} from "../../db/index.js"; import {IStateRegenerator} from "../regen/interface.js"; import {getStateSlotFromBytes} from "../../util/multifork.js"; +import {serializeState} from "../serializeState.js"; +import {AllocSource, BufferPool} from "../../util/bufferPool.js"; /** * Minimum number of epochs between single temp archived states @@ -30,7 +32,8 @@ export class StatesArchiver { private readonly regen: IStateRegenerator, private readonly db: IBeaconDb, private readonly logger: Logger, - private readonly opts: StatesArchiverOpts + private readonly opts: StatesArchiverOpts, + private readonly bufferPool?: BufferPool | null ) {} /** @@ -95,8 +98,13 @@ export class StatesArchiver { await this.db.stateArchive.putBinary(slot, finalizedStateOrBytes); this.logger.verbose("Archived finalized state bytes", {epoch: finalized.epoch, slot, root: rootHex}); } else { - // state - await this.db.stateArchive.put(finalizedStateOrBytes.slot, finalizedStateOrBytes); + // serialize state using BufferPool if provided + await serializeState( + finalizedStateOrBytes, + AllocSource.ARCHIVE_STATE, + (stateBytes) => this.db.stateArchive.putBinary(finalizedStateOrBytes.slot, stateBytes), + this.bufferPool + ); // don't delete states before the finalized state, auto-prune will take care of it this.logger.verbose("Archived finalized state", { epoch: finalized.epoch, diff --git a/packages/beacon-node/src/chain/archiver/index.ts b/packages/beacon-node/src/chain/archiver/index.ts index ee0711e05e4b..294c2281e19b 100644 --- a/packages/beacon-node/src/chain/archiver/index.ts +++ b/packages/beacon-node/src/chain/archiver/index.ts @@ -48,7 +48,7 @@ export class Archiver { opts: ArchiverOpts ) { this.archiveBlobEpochs = opts.archiveBlobEpochs; - this.statesArchiver = new StatesArchiver(chain.regen, db, logger, opts); + this.statesArchiver = new StatesArchiver(chain.regen, db, logger, opts, chain.bufferPool); this.prevFinalized = chain.forkChoice.getFinalizedCheckpoint(); this.jobQueue = new JobItemQueue<[CheckpointWithHex], void>(this.processFinalizedCheckpoint, { maxLength: PROCESS_FINALIZED_CHECKPOINT_QUEUE_LEN, diff --git a/packages/beacon-node/src/chain/chain.ts b/packages/beacon-node/src/chain/chain.ts index b410a1e84655..8dbb49798538 100644 --- a/packages/beacon-node/src/chain/chain.ts +++ b/packages/beacon-node/src/chain/chain.ts @@ -119,6 +119,7 @@ export class BeaconChain implements IBeaconChain { readonly config: BeaconConfig; readonly logger: Logger; readonly metrics: Metrics | null; + readonly bufferPool: BufferPool | null; readonly anchorStateLatestBlockSlot: Slot; @@ -272,6 +273,9 @@ export class BeaconChain implements IBeaconChain { const blockStateCache = this.opts.nHistoricalStates ? new FIFOBlockStateCache(this.opts, {metrics}) : new BlockStateCacheImpl({metrics}); + this.bufferPool = this.opts.nHistoricalStates + ? new BufferPool(anchorState.type.tree_serializedSize(anchorState.node), metrics) + : null; const checkpointStateCache = this.opts.nHistoricalStates ? new PersistentCheckpointStateCache( { @@ -280,7 +284,7 @@ export class BeaconChain implements IBeaconChain { clock, shufflingCache: this.shufflingCache, blockStateCache, - bufferPool: new BufferPool(anchorState.type.tree_serializedSize(anchorState.node), metrics), + bufferPool: this.bufferPool, datastore: fileDataStore ? // debug option if we want to investigate any issues with the DB new FileCPStateDatastore() diff --git a/packages/beacon-node/src/chain/interface.ts b/packages/beacon-node/src/chain/interface.ts index ca13dc604ea0..5185662eaa4f 100644 --- a/packages/beacon-node/src/chain/interface.ts +++ b/packages/beacon-node/src/chain/interface.ts @@ -30,6 +30,7 @@ import {IEth1ForBlockProduction} from "../eth1/index.js"; import {IExecutionEngine, IExecutionBuilder} from "../execution/index.js"; import {Metrics} from "../metrics/metrics.js"; import {IClock} from "../util/clock.js"; +import {BufferPool} from "../util/bufferPool.js"; import {ChainEventEmitter} from "./emitter.js"; import {IStateRegenerator, RegenCaller} from "./regen/index.js"; import {IBlsVerifier} from "./bls/index.js"; @@ -86,6 +87,7 @@ export interface IBeaconChain { readonly config: BeaconConfig; readonly logger: Logger; readonly metrics: Metrics | null; + readonly bufferPool: BufferPool | null; /** The initial slot that the chain is started with */ readonly anchorStateLatestBlockSlot: Slot; diff --git a/packages/beacon-node/src/chain/serializeState.ts b/packages/beacon-node/src/chain/serializeState.ts new file mode 100644 index 000000000000..cbb2ecd18cff --- /dev/null +++ b/packages/beacon-node/src/chain/serializeState.ts @@ -0,0 +1,33 @@ +import {CachedBeaconStateAllForks} from "@lodestar/state-transition"; +import {AllocSource, BufferPool} from "../util/bufferPool.js"; + +type ProcessStateBytesFn = (stateBytes: Uint8Array) => Promise; + +/* + * Serialize state using the BufferPool if provided. + */ +export async function serializeState( + state: CachedBeaconStateAllForks, + source: AllocSource, + processFn: ProcessStateBytesFn, + bufferPool?: BufferPool | null +): Promise { + const size = state.type.tree_serializedSize(state.node); + let stateBytes: Uint8Array | null = null; + if (bufferPool) { + const bufferWithKey = bufferPool.alloc(size, source); + if (bufferWithKey) { + stateBytes = bufferWithKey.buffer; + const dataView = new DataView(stateBytes.buffer, stateBytes.byteOffset, stateBytes.byteLength); + state.serializeToBytes({uint8Array: stateBytes, dataView}, 0); + } + } + + if (!stateBytes) { + // we already have metrics in BufferPool so no need to do it here + stateBytes = state.serialize(); + } + + return processFn(stateBytes); + // release the buffer back to the pool automatically +} diff --git a/packages/beacon-node/src/chain/stateCache/persistentCheckpointsCache.ts b/packages/beacon-node/src/chain/stateCache/persistentCheckpointsCache.ts index 823f066abcd6..190b79e58cd6 100644 --- a/packages/beacon-node/src/chain/stateCache/persistentCheckpointsCache.ts +++ b/packages/beacon-node/src/chain/stateCache/persistentCheckpointsCache.ts @@ -8,8 +8,9 @@ import {INTERVALS_PER_SLOT} from "@lodestar/params"; import {Metrics} from "../../metrics/index.js"; import {IClock} from "../../util/clock.js"; import {ShufflingCache} from "../shufflingCache.js"; -import {BufferPool, BufferWithKey} from "../../util/bufferPool.js"; +import {AllocSource, BufferPool, BufferWithKey} from "../../util/bufferPool.js"; import {StateCloneOpts} from "../regen/interface.js"; +import {serializeState} from "../serializeState.js"; import {MapTracker} from "./mapMetrics.js"; import {CPStateDatastore, DatastoreKey, datastoreKeyToCheckpoint} from "./datastore/index.js"; import {CheckpointHex, CacheItemType, CheckpointStateCache, BlockStateCache} from "./types.js"; @@ -29,7 +30,7 @@ type PersistentCheckpointStateCacheModules = { shufflingCache: ShufflingCache; datastore: CPStateDatastore; blockStateCache: BlockStateCache; - bufferPool?: BufferPool; + bufferPool?: BufferPool | null; }; /** checkpoint serialized as a string */ @@ -106,7 +107,7 @@ export class PersistentCheckpointStateCache implements CheckpointStateCache { private readonly datastore: CPStateDatastore; private readonly shufflingCache: ShufflingCache; private readonly blockStateCache: BlockStateCache; - private readonly bufferPool?: BufferPool; + private readonly bufferPool?: BufferPool | null; constructor( { @@ -698,19 +699,20 @@ export class PersistentCheckpointStateCache implements CheckpointStateCache { // persist and do not update epochIndex this.metrics?.statePersistSecFromSlot.observe(this.clock?.secFromSlot(this.clock?.currentSlot ?? 0) ?? 0); const cpPersist = {epoch: epoch, root: fromHexString(rootHex)}; - { - const timer = this.metrics?.stateSerializeDuration.startTimer(); - // automatically free the buffer pool after this scope - using stateBytesWithKey = this.serializeState(state); - let stateBytes = stateBytesWithKey?.buffer; - if (stateBytes == null) { - // fallback logic to use regular way to get state ssz bytes - this.metrics?.persistedStateAllocCount.inc(); - stateBytes = state.serialize(); - } - timer?.(); - persistedKey = await this.datastore.write(cpPersist, stateBytes); - } + // It's not sustainable to allocate ~240MB for each state every epoch, so we use buffer pool to reuse the memory. + // As monitored on holesky as of Jan 2024: + // - This does not increase heap allocation while gc time is the same + // - It helps stabilize persist time and save ~300ms in average (1.5s vs 1.2s) + // - It also helps the state reload to save ~500ms in average (4.3s vs 3.8s) + // - Also `serializeState.test.ts` perf test shows a lot of differences allocating ~240MB once vs per state serialization + const timer = this.metrics?.stateSerializeDuration.startTimer(); + persistedKey = await serializeState( + state, + AllocSource.PERSISTENT_CHECKPOINTS_CACHE_STATE, + (stateBytes) => this.datastore.write(cpPersist, stateBytes), + this.bufferPool + ); + timer?.(); persistCount++; this.logger.verbose("Pruned checkpoint state from memory and persisted to disk", { ...logMeta, @@ -767,29 +769,6 @@ export class PersistentCheckpointStateCache implements CheckpointStateCache { }); } - /* - * It's not sustainable to allocate ~240MB for each state every epoch, so we use buffer pool to reuse the memory. - * As monitored on holesky as of Jan 2024: - * - This does not increase heap allocation while gc time is the same - * - It helps stabilize persist time and save ~300ms in average (1.5s vs 1.2s) - * - It also helps the state reload to save ~500ms in average (4.3s vs 3.8s) - * - Also `serializeState.test.ts` perf test shows a lot of differences allocating ~240MB once vs per state serialization - */ - private serializeState(state: CachedBeaconStateAllForks): BufferWithKey | null { - const size = state.type.tree_serializedSize(state.node); - if (this.bufferPool) { - const bufferWithKey = this.bufferPool.alloc(size); - if (bufferWithKey) { - const stateBytes = bufferWithKey.buffer; - const dataView = new DataView(stateBytes.buffer, stateBytes.byteOffset, stateBytes.byteLength); - state.serializeToBytes({uint8Array: stateBytes, dataView}, 0); - return bufferWithKey; - } - } - - return null; - } - /** * Serialize validators to bytes leveraging the buffer pool to save memory allocation. * - As monitored on holesky as of Jan 2024, it helps save ~500ms state reload time (4.3s vs 3.8s) @@ -800,7 +779,7 @@ export class PersistentCheckpointStateCache implements CheckpointStateCache { const type = state.type.fields.validators; const size = type.tree_serializedSize(state.validators.node); if (this.bufferPool) { - const bufferWithKey = this.bufferPool.alloc(size); + const bufferWithKey = this.bufferPool.alloc(size, AllocSource.PERSISTENT_CHECKPOINTS_CACHE_VALIDATORS); if (bufferWithKey) { const validatorsBytes = bufferWithKey.buffer; const dataView = new DataView(validatorsBytes.buffer, validatorsBytes.byteOffset, validatorsBytes.byteLength); diff --git a/packages/beacon-node/src/metrics/metrics/lodestar.ts b/packages/beacon-node/src/metrics/metrics/lodestar.ts index 55d43922d936..737a900e5f64 100644 --- a/packages/beacon-node/src/metrics/metrics/lodestar.ts +++ b/packages/beacon-node/src/metrics/metrics/lodestar.ts @@ -18,6 +18,7 @@ import {LodestarMetadata} from "../options.js"; import {RegistryMetricCreator} from "../utils/registryMetricCreator.js"; import {OpSource} from "../validatorMonitor.js"; import {CacheItemType} from "../../chain/stateCache/types.js"; +import {AllocSource} from "../../util/bufferPool.js"; export type LodestarMetrics = ReturnType; @@ -1165,13 +1166,15 @@ export function createLodestarMetrics( name: "lodestar_buffer_pool_length", help: "Buffer pool length", }), - hits: register.counter({ + hits: register.counter<{source: AllocSource}>({ name: "lodestar_buffer_pool_hits_total", help: "Total number of buffer pool hits", + labelNames: ["source"], }), - misses: register.counter({ + misses: register.counter<{source: AllocSource}>({ name: "lodestar_buffer_pool_misses_total", help: "Total number of buffer pool misses", + labelNames: ["source"], }), grows: register.counter({ name: "lodestar_buffer_pool_grows_total", @@ -1271,10 +1274,6 @@ export function createLodestarMetrics( name: "lodestar_cp_state_cache_persisted_state_remove_count", help: "Total number of persisted states removed", }), - persistedStateAllocCount: register.counter({ - name: "lodestar_cp_state_cache_persisted_state_alloc_count", - help: "Total number time to allocate memory for persisted state", - }), }, balancesCache: { diff --git a/packages/beacon-node/src/util/bufferPool.ts b/packages/beacon-node/src/util/bufferPool.ts index f9e18a6d64a5..e3cf10fa88b3 100644 --- a/packages/beacon-node/src/util/bufferPool.ts +++ b/packages/beacon-node/src/util/bufferPool.ts @@ -5,6 +5,12 @@ import {Metrics} from "../metrics/metrics.js"; */ const GROW_RATIO = 1.1; +export enum AllocSource { + PERSISTENT_CHECKPOINTS_CACHE_VALIDATORS = "persistent_checkpoints_cache_validators", + PERSISTENT_CHECKPOINTS_CACHE_STATE = "persistent_checkpoints_cache_state", + ARCHIVE_STATE = "archive_state", +} + /** * A simple implementation to manage a single buffer. * This is initially used for state serialization at every epoch and for state reload. @@ -36,24 +42,24 @@ export class BufferPool { * If the buffer is already in use, return null. * Grow the buffer if the requested size is larger than the current buffer. */ - alloc(size: number): BufferWithKey | null { - return this.doAlloc(size, false); + alloc(size: number, source: AllocSource): BufferWithKey | null { + return this.doAlloc(size, source, false); } /** * Same to alloc() but the buffer is not zeroed. */ - allocUnsafe(size: number): BufferWithKey | null { - return this.doAlloc(size, true); + allocUnsafe(size: number, source: AllocSource): BufferWithKey | null { + return this.doAlloc(size, source, true); } - private doAlloc(size: number, isUnsafe = false): BufferWithKey | null { + private doAlloc(size: number, source: AllocSource, isUnsafe = false): BufferWithKey | null { if (this.inUse) { - this.metrics?.misses.inc(); + this.metrics?.misses.inc({source}); return null; } this.inUse = true; - this.metrics?.hits.inc(); + this.metrics?.hits.inc({source}); this.currentKey += 1; if (size > this.buffer.length) { this.metrics?.grows.inc(); diff --git a/packages/beacon-node/test/unit/util/bufferPool.test.ts b/packages/beacon-node/test/unit/util/bufferPool.test.ts index 2c789c19f74d..ff66504ae65f 100644 --- a/packages/beacon-node/test/unit/util/bufferPool.test.ts +++ b/packages/beacon-node/test/unit/util/bufferPool.test.ts @@ -1,12 +1,12 @@ import {describe, it, expect} from "vitest"; -import {BufferPool} from "../../../src/util/bufferPool.js"; +import {AllocSource, BufferPool} from "../../../src/util/bufferPool.js"; describe("BufferPool", () => { const pool = new BufferPool(100); it("should increase length", () => { expect(pool.length).toEqual(110); - using mem = pool.alloc(200); + using mem = pool.alloc(200, AllocSource.PERSISTENT_CHECKPOINTS_CACHE_STATE); if (mem === null) { throw Error("Expected non-null mem"); } @@ -15,15 +15,15 @@ describe("BufferPool", () => { it("should not allow alloc if in use", () => { { - using mem = pool.alloc(20); + using mem = pool.alloc(20, AllocSource.PERSISTENT_CHECKPOINTS_CACHE_STATE); if (mem === null) { throw Error("Expected non-null mem"); } // in the same scope we can't allocate again - expect(pool.alloc(20)).toEqual(null); + expect(pool.alloc(20, AllocSource.PERSISTENT_CHECKPOINTS_CACHE_STATE)).toEqual(null); } // out of the scope we can allocate again - expect(pool.alloc(20)).not.toEqual(null); + expect(pool.alloc(20, AllocSource.PERSISTENT_CHECKPOINTS_CACHE_STATE)).not.toEqual(null); }); });