Skip to content

Commit

Permalink
feat: archive state using BufferPool if provided (#7042)
Browse files Browse the repository at this point in the history
* fix: archive state using BufferPool if provided

* chore: fix comment
  • Loading branch information
twoeths authored Sep 5, 2024
1 parent 4e22884 commit fe6c4ac
Show file tree
Hide file tree
Showing 9 changed files with 94 additions and 63 deletions.
14 changes: 11 additions & 3 deletions packages/beacon-node/src/chain/archiver/archiveStates.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import {CheckpointWithHex} from "@lodestar/fork-choice";
import {IBeaconDb} from "../../db/index.js";
import {IStateRegenerator} from "../regen/interface.js";
import {getStateSlotFromBytes} from "../../util/multifork.js";
import {serializeState} from "../serializeState.js";
import {AllocSource, BufferPool} from "../../util/bufferPool.js";

/**
* Minimum number of epochs between single temp archived states
Expand All @@ -30,7 +32,8 @@ export class StatesArchiver {
private readonly regen: IStateRegenerator,
private readonly db: IBeaconDb,
private readonly logger: Logger,
private readonly opts: StatesArchiverOpts
private readonly opts: StatesArchiverOpts,
private readonly bufferPool?: BufferPool | null
) {}

/**
Expand Down Expand Up @@ -95,8 +98,13 @@ export class StatesArchiver {
await this.db.stateArchive.putBinary(slot, finalizedStateOrBytes);
this.logger.verbose("Archived finalized state bytes", {epoch: finalized.epoch, slot, root: rootHex});
} else {
// state
await this.db.stateArchive.put(finalizedStateOrBytes.slot, finalizedStateOrBytes);
// serialize state using BufferPool if provided
await serializeState(
finalizedStateOrBytes,
AllocSource.ARCHIVE_STATE,
(stateBytes) => this.db.stateArchive.putBinary(finalizedStateOrBytes.slot, stateBytes),
this.bufferPool
);
// don't delete states before the finalized state, auto-prune will take care of it
this.logger.verbose("Archived finalized state", {
epoch: finalized.epoch,
Expand Down
2 changes: 1 addition & 1 deletion packages/beacon-node/src/chain/archiver/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ export class Archiver {
opts: ArchiverOpts
) {
this.archiveBlobEpochs = opts.archiveBlobEpochs;
this.statesArchiver = new StatesArchiver(chain.regen, db, logger, opts);
this.statesArchiver = new StatesArchiver(chain.regen, db, logger, opts, chain.bufferPool);
this.prevFinalized = chain.forkChoice.getFinalizedCheckpoint();
this.jobQueue = new JobItemQueue<[CheckpointWithHex], void>(this.processFinalizedCheckpoint, {
maxLength: PROCESS_FINALIZED_CHECKPOINT_QUEUE_LEN,
Expand Down
6 changes: 5 additions & 1 deletion packages/beacon-node/src/chain/chain.ts
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ export class BeaconChain implements IBeaconChain {
readonly config: BeaconConfig;
readonly logger: Logger;
readonly metrics: Metrics | null;
readonly bufferPool: BufferPool | null;

readonly anchorStateLatestBlockSlot: Slot;

Expand Down Expand Up @@ -272,6 +273,9 @@ export class BeaconChain implements IBeaconChain {
const blockStateCache = this.opts.nHistoricalStates
? new FIFOBlockStateCache(this.opts, {metrics})
: new BlockStateCacheImpl({metrics});
this.bufferPool = this.opts.nHistoricalStates
? new BufferPool(anchorState.type.tree_serializedSize(anchorState.node), metrics)
: null;
const checkpointStateCache = this.opts.nHistoricalStates
? new PersistentCheckpointStateCache(
{
Expand All @@ -280,7 +284,7 @@ export class BeaconChain implements IBeaconChain {
clock,
shufflingCache: this.shufflingCache,
blockStateCache,
bufferPool: new BufferPool(anchorState.type.tree_serializedSize(anchorState.node), metrics),
bufferPool: this.bufferPool,
datastore: fileDataStore
? // debug option if we want to investigate any issues with the DB
new FileCPStateDatastore()
Expand Down
2 changes: 2 additions & 0 deletions packages/beacon-node/src/chain/interface.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import {IEth1ForBlockProduction} from "../eth1/index.js";
import {IExecutionEngine, IExecutionBuilder} from "../execution/index.js";
import {Metrics} from "../metrics/metrics.js";
import {IClock} from "../util/clock.js";
import {BufferPool} from "../util/bufferPool.js";
import {ChainEventEmitter} from "./emitter.js";
import {IStateRegenerator, RegenCaller} from "./regen/index.js";
import {IBlsVerifier} from "./bls/index.js";
Expand Down Expand Up @@ -86,6 +87,7 @@ export interface IBeaconChain {
readonly config: BeaconConfig;
readonly logger: Logger;
readonly metrics: Metrics | null;
readonly bufferPool: BufferPool | null;

/** The initial slot that the chain is started with */
readonly anchorStateLatestBlockSlot: Slot;
Expand Down
33 changes: 33 additions & 0 deletions packages/beacon-node/src/chain/serializeState.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
import {CachedBeaconStateAllForks} from "@lodestar/state-transition";
import {AllocSource, BufferPool} from "../util/bufferPool.js";

type ProcessStateBytesFn<T> = (stateBytes: Uint8Array) => Promise<T>;

/*
* Serialize state using the BufferPool if provided.
*/
export async function serializeState<T>(
state: CachedBeaconStateAllForks,
source: AllocSource,
processFn: ProcessStateBytesFn<T>,
bufferPool?: BufferPool | null
): Promise<T> {
const size = state.type.tree_serializedSize(state.node);
let stateBytes: Uint8Array | null = null;
if (bufferPool) {
const bufferWithKey = bufferPool.alloc(size, source);
if (bufferWithKey) {
stateBytes = bufferWithKey.buffer;
const dataView = new DataView(stateBytes.buffer, stateBytes.byteOffset, stateBytes.byteLength);
state.serializeToBytes({uint8Array: stateBytes, dataView}, 0);
}
}

if (!stateBytes) {
// we already have metrics in BufferPool so no need to do it here
stateBytes = state.serialize();
}

return processFn(stateBytes);
// release the buffer back to the pool automatically
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,9 @@ import {INTERVALS_PER_SLOT} from "@lodestar/params";
import {Metrics} from "../../metrics/index.js";
import {IClock} from "../../util/clock.js";
import {ShufflingCache} from "../shufflingCache.js";
import {BufferPool, BufferWithKey} from "../../util/bufferPool.js";
import {AllocSource, BufferPool, BufferWithKey} from "../../util/bufferPool.js";
import {StateCloneOpts} from "../regen/interface.js";
import {serializeState} from "../serializeState.js";
import {MapTracker} from "./mapMetrics.js";
import {CPStateDatastore, DatastoreKey, datastoreKeyToCheckpoint} from "./datastore/index.js";
import {CheckpointHex, CacheItemType, CheckpointStateCache, BlockStateCache} from "./types.js";
Expand All @@ -29,7 +30,7 @@ type PersistentCheckpointStateCacheModules = {
shufflingCache: ShufflingCache;
datastore: CPStateDatastore;
blockStateCache: BlockStateCache;
bufferPool?: BufferPool;
bufferPool?: BufferPool | null;
};

/** checkpoint serialized as a string */
Expand Down Expand Up @@ -106,7 +107,7 @@ export class PersistentCheckpointStateCache implements CheckpointStateCache {
private readonly datastore: CPStateDatastore;
private readonly shufflingCache: ShufflingCache;
private readonly blockStateCache: BlockStateCache;
private readonly bufferPool?: BufferPool;
private readonly bufferPool?: BufferPool | null;

constructor(
{
Expand Down Expand Up @@ -698,19 +699,20 @@ export class PersistentCheckpointStateCache implements CheckpointStateCache {
// persist and do not update epochIndex
this.metrics?.statePersistSecFromSlot.observe(this.clock?.secFromSlot(this.clock?.currentSlot ?? 0) ?? 0);
const cpPersist = {epoch: epoch, root: fromHexString(rootHex)};
{
const timer = this.metrics?.stateSerializeDuration.startTimer();
// automatically free the buffer pool after this scope
using stateBytesWithKey = this.serializeState(state);
let stateBytes = stateBytesWithKey?.buffer;
if (stateBytes == null) {
// fallback logic to use regular way to get state ssz bytes
this.metrics?.persistedStateAllocCount.inc();
stateBytes = state.serialize();
}
timer?.();
persistedKey = await this.datastore.write(cpPersist, stateBytes);
}
// It's not sustainable to allocate ~240MB for each state every epoch, so we use buffer pool to reuse the memory.
// As monitored on holesky as of Jan 2024:
// - This does not increase heap allocation while gc time is the same
// - It helps stabilize persist time and save ~300ms in average (1.5s vs 1.2s)
// - It also helps the state reload to save ~500ms in average (4.3s vs 3.8s)
// - Also `serializeState.test.ts` perf test shows a lot of differences allocating ~240MB once vs per state serialization
const timer = this.metrics?.stateSerializeDuration.startTimer();
persistedKey = await serializeState(
state,
AllocSource.PERSISTENT_CHECKPOINTS_CACHE_STATE,
(stateBytes) => this.datastore.write(cpPersist, stateBytes),
this.bufferPool
);
timer?.();
persistCount++;
this.logger.verbose("Pruned checkpoint state from memory and persisted to disk", {
...logMeta,
Expand Down Expand Up @@ -767,29 +769,6 @@ export class PersistentCheckpointStateCache implements CheckpointStateCache {
});
}

/*
* It's not sustainable to allocate ~240MB for each state every epoch, so we use buffer pool to reuse the memory.
* As monitored on holesky as of Jan 2024:
* - This does not increase heap allocation while gc time is the same
* - It helps stabilize persist time and save ~300ms in average (1.5s vs 1.2s)
* - It also helps the state reload to save ~500ms in average (4.3s vs 3.8s)
* - Also `serializeState.test.ts` perf test shows a lot of differences allocating ~240MB once vs per state serialization
*/
private serializeState(state: CachedBeaconStateAllForks): BufferWithKey | null {
const size = state.type.tree_serializedSize(state.node);
if (this.bufferPool) {
const bufferWithKey = this.bufferPool.alloc(size);
if (bufferWithKey) {
const stateBytes = bufferWithKey.buffer;
const dataView = new DataView(stateBytes.buffer, stateBytes.byteOffset, stateBytes.byteLength);
state.serializeToBytes({uint8Array: stateBytes, dataView}, 0);
return bufferWithKey;
}
}

return null;
}

/**
* Serialize validators to bytes leveraging the buffer pool to save memory allocation.
* - As monitored on holesky as of Jan 2024, it helps save ~500ms state reload time (4.3s vs 3.8s)
Expand All @@ -800,7 +779,7 @@ export class PersistentCheckpointStateCache implements CheckpointStateCache {
const type = state.type.fields.validators;
const size = type.tree_serializedSize(state.validators.node);
if (this.bufferPool) {
const bufferWithKey = this.bufferPool.alloc(size);
const bufferWithKey = this.bufferPool.alloc(size, AllocSource.PERSISTENT_CHECKPOINTS_CACHE_VALIDATORS);
if (bufferWithKey) {
const validatorsBytes = bufferWithKey.buffer;
const dataView = new DataView(validatorsBytes.buffer, validatorsBytes.byteOffset, validatorsBytes.byteLength);
Expand Down
11 changes: 5 additions & 6 deletions packages/beacon-node/src/metrics/metrics/lodestar.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import {LodestarMetadata} from "../options.js";
import {RegistryMetricCreator} from "../utils/registryMetricCreator.js";
import {OpSource} from "../validatorMonitor.js";
import {CacheItemType} from "../../chain/stateCache/types.js";
import {AllocSource} from "../../util/bufferPool.js";

export type LodestarMetrics = ReturnType<typeof createLodestarMetrics>;

Expand Down Expand Up @@ -1165,13 +1166,15 @@ export function createLodestarMetrics(
name: "lodestar_buffer_pool_length",
help: "Buffer pool length",
}),
hits: register.counter({
hits: register.counter<{source: AllocSource}>({
name: "lodestar_buffer_pool_hits_total",
help: "Total number of buffer pool hits",
labelNames: ["source"],
}),
misses: register.counter({
misses: register.counter<{source: AllocSource}>({
name: "lodestar_buffer_pool_misses_total",
help: "Total number of buffer pool misses",
labelNames: ["source"],
}),
grows: register.counter({
name: "lodestar_buffer_pool_grows_total",
Expand Down Expand Up @@ -1271,10 +1274,6 @@ export function createLodestarMetrics(
name: "lodestar_cp_state_cache_persisted_state_remove_count",
help: "Total number of persisted states removed",
}),
persistedStateAllocCount: register.counter({
name: "lodestar_cp_state_cache_persisted_state_alloc_count",
help: "Total number time to allocate memory for persisted state",
}),
},

balancesCache: {
Expand Down
20 changes: 13 additions & 7 deletions packages/beacon-node/src/util/bufferPool.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,12 @@ import {Metrics} from "../metrics/metrics.js";
*/
const GROW_RATIO = 1.1;

export enum AllocSource {
PERSISTENT_CHECKPOINTS_CACHE_VALIDATORS = "persistent_checkpoints_cache_validators",
PERSISTENT_CHECKPOINTS_CACHE_STATE = "persistent_checkpoints_cache_state",
ARCHIVE_STATE = "archive_state",
}

/**
* A simple implementation to manage a single buffer.
* This is initially used for state serialization at every epoch and for state reload.
Expand Down Expand Up @@ -36,24 +42,24 @@ export class BufferPool {
* If the buffer is already in use, return null.
* Grow the buffer if the requested size is larger than the current buffer.
*/
alloc(size: number): BufferWithKey | null {
return this.doAlloc(size, false);
alloc(size: number, source: AllocSource): BufferWithKey | null {
return this.doAlloc(size, source, false);
}

/**
* Same to alloc() but the buffer is not zeroed.
*/
allocUnsafe(size: number): BufferWithKey | null {
return this.doAlloc(size, true);
allocUnsafe(size: number, source: AllocSource): BufferWithKey | null {
return this.doAlloc(size, source, true);
}

private doAlloc(size: number, isUnsafe = false): BufferWithKey | null {
private doAlloc(size: number, source: AllocSource, isUnsafe = false): BufferWithKey | null {
if (this.inUse) {
this.metrics?.misses.inc();
this.metrics?.misses.inc({source});
return null;
}
this.inUse = true;
this.metrics?.hits.inc();
this.metrics?.hits.inc({source});
this.currentKey += 1;
if (size > this.buffer.length) {
this.metrics?.grows.inc();
Expand Down
10 changes: 5 additions & 5 deletions packages/beacon-node/test/unit/util/bufferPool.test.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
import {describe, it, expect} from "vitest";
import {BufferPool} from "../../../src/util/bufferPool.js";
import {AllocSource, BufferPool} from "../../../src/util/bufferPool.js";

describe("BufferPool", () => {
const pool = new BufferPool(100);

it("should increase length", () => {
expect(pool.length).toEqual(110);
using mem = pool.alloc(200);
using mem = pool.alloc(200, AllocSource.PERSISTENT_CHECKPOINTS_CACHE_STATE);
if (mem === null) {
throw Error("Expected non-null mem");
}
Expand All @@ -15,15 +15,15 @@ describe("BufferPool", () => {

it("should not allow alloc if in use", () => {
{
using mem = pool.alloc(20);
using mem = pool.alloc(20, AllocSource.PERSISTENT_CHECKPOINTS_CACHE_STATE);
if (mem === null) {
throw Error("Expected non-null mem");
}
// in the same scope we can't allocate again
expect(pool.alloc(20)).toEqual(null);
expect(pool.alloc(20, AllocSource.PERSISTENT_CHECKPOINTS_CACHE_STATE)).toEqual(null);
}

// out of the scope we can allocate again
expect(pool.alloc(20)).not.toEqual(null);
expect(pool.alloc(20, AllocSource.PERSISTENT_CHECKPOINTS_CACHE_STATE)).not.toEqual(null);
});
});

0 comments on commit fe6c4ac

Please sign in to comment.