Skip to content

Commit

Permalink
feat: implement BufferPool for PersistentCPStateCache
Browse files Browse the repository at this point in the history
  • Loading branch information
twoeths committed Jan 9, 2024
1 parent ea49409 commit 527c0a3
Show file tree
Hide file tree
Showing 16 changed files with 590 additions and 113 deletions.
42 changes: 21 additions & 21 deletions packages/beacon-node/src/chain/blocks/importBlock.ts
Original file line number Diff line number Diff line change
Expand Up @@ -59,10 +59,11 @@ export async function importBlock(
): Promise<void> {
const {blockInput, postState, parentBlockSlot, executionStatus} = fullyVerifiedBlock;
const {block, source} = blockInput;
const blockRoot = this.config.getForkTypes(block.message.slot).BeaconBlock.hashTreeRoot(block.message);
const {slot: blockSlot} = block.message;
const blockRoot = this.config.getForkTypes(blockSlot).BeaconBlock.hashTreeRoot(block.message);
const blockRootHex = toHexString(blockRoot);
const currentEpoch = computeEpochAtSlot(this.forkChoice.getTime());
const blockEpoch = computeEpochAtSlot(block.message.slot);
const blockEpoch = computeEpochAtSlot(blockSlot);
const parentEpoch = computeEpochAtSlot(parentBlockSlot);
const prevFinalizedEpoch = this.forkChoice.getFinalizedCheckpoint().epoch;
const blockDelaySec = (fullyVerifiedBlock.seenTimestampSec - postState.genesisTime) % this.config.SECONDS_PER_SLOT;
Expand All @@ -87,17 +88,16 @@ export async function importBlock(

// This adds the state necessary to process the next block
// Some block event handlers require state being in state cache so need to do this before emitting EventType.block
this.regen.addPostState(postState);
this.regen.processState(blockRootHex, postState);

this.metrics?.importBlock.bySource.inc({source});
this.logger.verbose("Added block to forkchoice and state cache", {slot: block.message.slot, root: blockRootHex});
this.logger.verbose("Added block to forkchoice and state cache", {slot: blockSlot, root: blockRootHex});

// We want to import block asap so call all event handler in the next event loop
setTimeout(() => {
const slot = block.message.slot;
this.emitter.emit(routes.events.EventType.block, {
block: blockRootHex,
slot,
slot: blockSlot,
executionOptimistic: blockSummary != null && isOptimisticBlock(blockSummary),
});

Expand All @@ -106,7 +106,7 @@ export async function importBlock(
const {index, kzgCommitment} = blobSidecar;
this.emitter.emit(routes.events.EventType.blobSidecar, {
blockRoot: blockRootHex,
slot,
slot: blockSlot,
index,
kzgCommitment: toHexString(kzgCommitment),
versionedHash: toHexString(kzgCommitmentToVersionedHash(kzgCommitment)),
Expand Down Expand Up @@ -171,7 +171,7 @@ export async function importBlock(
correctHead,
missedSlotVote,
blockRootHex,
block.message.slot
blockSlot
);
} catch (e) {
// a block has a lot of attestations and it may has same error, we don't want to log all of them
Expand All @@ -185,15 +185,15 @@ export async function importBlock(
}
} else {
// always log other errors
this.logger.warn("Error processing attestation from block", {slot: block.message.slot}, e as Error);
this.logger.warn("Error processing attestation from block", {slot: blockSlot}, e as Error);
}
}
}

for (const {error, count} of invalidAttestationErrorsByCode.values()) {
this.logger.warn(
"Error processing attestations from block",
{slot: block.message.slot, erroredAttestations: count},
{slot: blockSlot, erroredAttestations: count},
error
);
}
Expand All @@ -214,7 +214,7 @@ export async function importBlock(
// all AttesterSlashings are valid before reaching this
this.forkChoice.onAttesterSlashing(slashing);
} catch (e) {
this.logger.warn("Error processing AttesterSlashing from block", {slot: block.message.slot}, e as Error);
this.logger.warn("Error processing AttesterSlashing from block", {slot: blockSlot}, e as Error);
}
}
}
Expand Down Expand Up @@ -297,7 +297,7 @@ export async function importBlock(
parentBlockSlot
);
} catch (e) {
this.logger.verbose("Error lightClientServer.onImportBlock", {slot: block.message.slot}, e as Error);
this.logger.verbose("Error lightClientServer.onImportBlock", {slot: blockSlot}, e as Error);
}
}, 0);
}
Expand Down Expand Up @@ -351,10 +351,10 @@ export async function importBlock(
if (parentEpoch < blockEpoch) {
// current epoch and previous epoch are likely cached in previous states
this.shufflingCache.processState(postState, postState.epochCtx.nextShuffling.epoch);
this.logger.verbose("Processed shuffling for next epoch", {parentEpoch, blockEpoch, slot: block.message.slot});
this.logger.verbose("Processed shuffling for next epoch", {parentEpoch, blockEpoch, slot: blockSlot});
}

if (block.message.slot % SLOTS_PER_EPOCH === 0) {
if (blockSlot % SLOTS_PER_EPOCH === 0) {
// Cache state to preserve epoch transition work
const checkpointState = postState;
const cp = getCheckpointFromState(checkpointState);
Expand Down Expand Up @@ -397,7 +397,7 @@ export async function importBlock(

// Send block events, only for recent enough blocks

if (this.clock.currentSlot - block.message.slot < EVENTSTREAM_EMIT_RECENT_BLOCK_SLOTS) {
if (this.clock.currentSlot - blockSlot < EVENTSTREAM_EMIT_RECENT_BLOCK_SLOTS) {
// NOTE: Skip looping if there are no listeners from the API
if (this.emitter.listenerCount(routes.events.EventType.voluntaryExit)) {
for (const voluntaryExit of block.message.body.voluntaryExits) {
Expand All @@ -417,10 +417,10 @@ export async function importBlock(
}

// Register stat metrics about the block after importing it
this.metrics?.parentBlockDistance.observe(block.message.slot - parentBlockSlot);
this.metrics?.parentBlockDistance.observe(blockSlot - parentBlockSlot);
this.metrics?.proposerBalanceDeltaAny.observe(fullyVerifiedBlock.proposerBalanceDelta);
this.metrics?.registerImportedBlock(block.message, fullyVerifiedBlock);
if (this.config.getForkSeq(block.message.slot) >= ForkSeq.altair) {
if (this.config.getForkSeq(blockSlot) >= ForkSeq.altair) {
this.metrics?.registerSyncAggregateInBlock(
blockEpoch,
(block as altair.SignedBeaconBlock).message.body.syncAggregate,
Expand All @@ -433,18 +433,18 @@ export async function importBlock(
// Gossip blocks need to be imported as soon as possible, waiting attestations could be processed
// in the next event loop. See https://github.com/ChainSafe/lodestar/issues/4789
setTimeout(() => {
this.reprocessController.onBlockImported({slot: block.message.slot, root: blockRootHex}, advancedSlot);
this.reprocessController.onBlockImported({slot: blockSlot, root: blockRootHex}, advancedSlot);
}, 0);

if (opts.seenTimestampSec !== undefined) {
const recvToImportedBlock = Date.now() / 1000 - opts.seenTimestampSec;
this.metrics?.gossipBlock.receivedToBlockImport.observe(recvToImportedBlock);
this.logger.verbose("Imported block", {slot: block.message.slot, recvToImportedBlock});
this.logger.verbose("Imported block", {slot: blockSlot, recvToImportedBlock});
}

this.logger.verbose("Block processed", {
slot: block.message.slot,
slot: blockSlot,
root: blockRootHex,
delaySec: this.clock.secFromSlot(block.message.slot),
delaySec: this.clock.secFromSlot(blockSlot),
});
}
2 changes: 1 addition & 1 deletion packages/beacon-node/src/chain/regen/interface.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ export interface IStateRegenerator extends IStateRegeneratorInternal {
getClosestHeadState(head: ProtoBlock): CachedBeaconStateAllForks | null;
pruneOnCheckpoint(finalizedEpoch: Epoch, justifiedEpoch: Epoch, headStateRoot: RootHex): void;
pruneOnFinalized(finalizedEpoch: Epoch): void;
addPostState(postState: CachedBeaconStateAllForks): void;
processState(blockRootHex: RootHex, postState: CachedBeaconStateAllForks): void;
addCheckpointState(cp: phase0.Checkpoint, item: CachedBeaconStateAllForks): void;
updateHeadState(newHeadStateRoot: RootHex, maybeHeadState: CachedBeaconStateAllForks): void;
updatePreComputedCheckpoint(rootHex: RootHex, epoch: Epoch): number | null;
Expand Down
10 changes: 7 additions & 3 deletions packages/beacon-node/src/chain/regen/queued.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,10 @@ import {IForkChoice, ProtoBlock} from "@lodestar/fork-choice";
import {CachedBeaconStateAllForks, computeEpochAtSlot} from "@lodestar/state-transition";
import {Logger} from "@lodestar/utils";
import {routes} from "@lodestar/api";
import {CheckpointHex, CheckpointStateCache, StateContextCache, toCheckpointHex} from "../stateCache/index.js";
import {CheckpointHex, toCheckpointHex} from "../stateCache/index.js";
import {Metrics} from "../../metrics/index.js";
import {JobItemQueue} from "../../util/queue/index.js";
import {BlockStateCache, CheckpointStateCache} from "../stateCache/types.js";
import {IStateRegenerator, IStateRegeneratorInternal, RegenCaller, RegenFnName, StateCloneOpts} from "./interface.js";
import {StateRegenerator, RegenModules} from "./regen.js";
import {RegenError, RegenErrorCode} from "./errors.js";
Expand Down Expand Up @@ -34,7 +35,7 @@ export class QueuedStateRegenerator implements IStateRegenerator {
private readonly regen: StateRegenerator;

private readonly forkChoice: IForkChoice;
private readonly stateCache: StateContextCache;
private readonly stateCache: BlockStateCache;
private readonly checkpointStateCache: CheckpointStateCache;
private readonly metrics: Metrics | null;
private readonly logger: Logger;
Expand Down Expand Up @@ -88,8 +89,11 @@ export class QueuedStateRegenerator implements IStateRegenerator {
this.stateCache.deleteAllBeforeEpoch(finalizedEpoch);
}

addPostState(postState: CachedBeaconStateAllForks): void {
processState(blockRootHex: RootHex, postState: CachedBeaconStateAllForks): void {
this.stateCache.add(postState);
this.checkpointStateCache.processState(blockRootHex, postState).catch((e) => {
this.logger.debug("Error processing block state", {blockRootHex, slot: postState.slot}, e);
});
}

addCheckpointState(cp: phase0.Checkpoint, item: CachedBeaconStateAllForks): void {
Expand Down
4 changes: 1 addition & 3 deletions packages/beacon-node/src/chain/stateCache/datastore/db.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import {CachedBeaconStateAllForks} from "@lodestar/state-transition";
import {phase0, ssz} from "@lodestar/types";
import {IBeaconDb} from "../../../db/interface.js";
import {CPStateDatastore, DatastoreKey} from "./types.js";
Expand All @@ -9,9 +8,8 @@ import {CPStateDatastore, DatastoreKey} from "./types.js";
export class DbCPStateDatastore implements CPStateDatastore {
constructor(private readonly db: IBeaconDb) {}

async write(cpKey: phase0.Checkpoint, state: CachedBeaconStateAllForks): Promise<DatastoreKey> {
async write(cpKey: phase0.Checkpoint, stateBytes: Uint8Array): Promise<DatastoreKey> {
const serializedCheckpoint = checkpointToDatastoreKey(cpKey);
const stateBytes = state.serialize();
await this.db.checkpointState.putBinary(serializedCheckpoint, stateBytes);
return serializedCheckpoint;
}
Expand Down
52 changes: 52 additions & 0 deletions packages/beacon-node/src/chain/stateCache/datastore/file.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
import path from "node:path";
import {toHexString, fromHexString} from "@chainsafe/ssz";
import {phase0, ssz} from "@lodestar/types";
import {ensureDir, readFile, readFileNames, removeFile, writeIfNotExist} from "../../../util/file.js";
import {CPStateDatastore, DatastoreKey} from "./types.js";

const CHECKPOINT_STATES_FOLDER = "checkpoint_states";
const CHECKPOINT_FILE_NAME_LENGTH = 82;

/**
* Implementation of CPStatePersistentApis using file system, this is beneficial for debugging.
*/
export class FileCPStateDatastore implements CPStateDatastore {
private readonly folderPath: string;

constructor(parentDir: string = ".") {
// by default use the beacon folder `/beacon/checkpoint_states`
this.folderPath = path.join(parentDir, CHECKPOINT_STATES_FOLDER);
}

async init(): Promise<void> {
try {
await ensureDir(this.folderPath);
} catch (_) {
// do nothing
}
}

async write(cpKey: phase0.Checkpoint, stateBytes: Uint8Array): Promise<DatastoreKey> {
const serializedCheckpoint = ssz.phase0.Checkpoint.serialize(cpKey);
const filePath = path.join(this.folderPath, toHexString(serializedCheckpoint));
await writeIfNotExist(filePath, stateBytes);
return serializedCheckpoint;
}

async remove(serializedCheckpoint: DatastoreKey): Promise<void> {
const filePath = path.join(this.folderPath, toHexString(serializedCheckpoint));
await removeFile(filePath);
}

async read(serializedCheckpoint: DatastoreKey): Promise<Uint8Array | null> {
const filePath = path.join(this.folderPath, toHexString(serializedCheckpoint));
return readFile(filePath);
}

async readKeys(): Promise<DatastoreKey[]> {
const fileNames = await readFileNames(this.folderPath);
return fileNames
.filter((fileName) => fileName.startsWith("0x") && fileName.length === CHECKPOINT_FILE_NAME_LENGTH)
.map((fileName) => fromHexString(fileName));
}
}
4 changes: 2 additions & 2 deletions packages/beacon-node/src/chain/stateCache/datastore/types.ts
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
import {CachedBeaconStateAllForks} from "@lodestar/state-transition";
import {phase0} from "@lodestar/types";

// With db implementation, persistedKey is serialized data of a checkpoint
export type DatastoreKey = Uint8Array;

// Make this generic to support testing
export interface CPStateDatastore {
write: (cpKey: phase0.Checkpoint, state: CachedBeaconStateAllForks) => Promise<DatastoreKey>;
write: (cpKey: phase0.Checkpoint, stateBytes: Uint8Array) => Promise<DatastoreKey>;
remove: (key: DatastoreKey) => Promise<void>;
read: (key: DatastoreKey) => Promise<Uint8Array | null>;
readKeys: () => Promise<DatastoreKey[]>;
init?: () => Promise<void>;
}
Loading

0 comments on commit 527c0a3

Please sign in to comment.