Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: implement BufferPool for PersistentCPStateCache #6269

Merged
merged 6 commits into from
Jan 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 21 additions & 21 deletions packages/beacon-node/src/chain/blocks/importBlock.ts
Original file line number Diff line number Diff line change
Expand Up @@ -59,10 +59,11 @@ export async function importBlock(
): Promise<void> {
const {blockInput, postState, parentBlockSlot, executionStatus} = fullyVerifiedBlock;
const {block, source} = blockInput;
const blockRoot = this.config.getForkTypes(block.message.slot).BeaconBlock.hashTreeRoot(block.message);
const {slot: blockSlot} = block.message;
const blockRoot = this.config.getForkTypes(blockSlot).BeaconBlock.hashTreeRoot(block.message);
const blockRootHex = toHexString(blockRoot);
const currentEpoch = computeEpochAtSlot(this.forkChoice.getTime());
const blockEpoch = computeEpochAtSlot(block.message.slot);
const blockEpoch = computeEpochAtSlot(blockSlot);
const parentEpoch = computeEpochAtSlot(parentBlockSlot);
const prevFinalizedEpoch = this.forkChoice.getFinalizedCheckpoint().epoch;
const blockDelaySec = (fullyVerifiedBlock.seenTimestampSec - postState.genesisTime) % this.config.SECONDS_PER_SLOT;
Expand All @@ -87,17 +88,16 @@ export async function importBlock(

// This adds the state necessary to process the next block
// Some block event handlers require state being in state cache so need to do this before emitting EventType.block
this.regen.addPostState(postState);
this.regen.processState(blockRootHex, postState);

this.metrics?.importBlock.bySource.inc({source});
this.logger.verbose("Added block to forkchoice and state cache", {slot: block.message.slot, root: blockRootHex});
this.logger.verbose("Added block to forkchoice and state cache", {slot: blockSlot, root: blockRootHex});

// We want to import block asap so call all event handler in the next event loop
setTimeout(() => {
const slot = block.message.slot;
this.emitter.emit(routes.events.EventType.block, {
block: blockRootHex,
slot,
slot: blockSlot,
executionOptimistic: blockSummary != null && isOptimisticBlock(blockSummary),
});

Expand All @@ -106,7 +106,7 @@ export async function importBlock(
const {index, kzgCommitment} = blobSidecar;
this.emitter.emit(routes.events.EventType.blobSidecar, {
blockRoot: blockRootHex,
slot,
slot: blockSlot,
index,
kzgCommitment: toHexString(kzgCommitment),
versionedHash: toHexString(kzgCommitmentToVersionedHash(kzgCommitment)),
Expand Down Expand Up @@ -171,7 +171,7 @@ export async function importBlock(
correctHead,
missedSlotVote,
blockRootHex,
block.message.slot
blockSlot
);
} catch (e) {
// a block has a lot of attestations and it may has same error, we don't want to log all of them
Expand All @@ -185,15 +185,15 @@ export async function importBlock(
}
} else {
// always log other errors
this.logger.warn("Error processing attestation from block", {slot: block.message.slot}, e as Error);
this.logger.warn("Error processing attestation from block", {slot: blockSlot}, e as Error);
}
}
}

for (const {error, count} of invalidAttestationErrorsByCode.values()) {
this.logger.warn(
"Error processing attestations from block",
{slot: block.message.slot, erroredAttestations: count},
{slot: blockSlot, erroredAttestations: count},
error
);
}
Expand All @@ -214,7 +214,7 @@ export async function importBlock(
// all AttesterSlashings are valid before reaching this
this.forkChoice.onAttesterSlashing(slashing);
} catch (e) {
this.logger.warn("Error processing AttesterSlashing from block", {slot: block.message.slot}, e as Error);
this.logger.warn("Error processing AttesterSlashing from block", {slot: blockSlot}, e as Error);
}
}
}
Expand Down Expand Up @@ -297,7 +297,7 @@ export async function importBlock(
parentBlockSlot
);
} catch (e) {
this.logger.verbose("Error lightClientServer.onImportBlock", {slot: block.message.slot}, e as Error);
this.logger.verbose("Error lightClientServer.onImportBlock", {slot: blockSlot}, e as Error);
}
}, 0);
}
Expand Down Expand Up @@ -351,10 +351,10 @@ export async function importBlock(
if (parentEpoch < blockEpoch) {
// current epoch and previous epoch are likely cached in previous states
this.shufflingCache.processState(postState, postState.epochCtx.nextShuffling.epoch);
this.logger.verbose("Processed shuffling for next epoch", {parentEpoch, blockEpoch, slot: block.message.slot});
this.logger.verbose("Processed shuffling for next epoch", {parentEpoch, blockEpoch, slot: blockSlot});
}

if (block.message.slot % SLOTS_PER_EPOCH === 0) {
if (blockSlot % SLOTS_PER_EPOCH === 0) {
// Cache state to preserve epoch transition work
const checkpointState = postState;
const cp = getCheckpointFromState(checkpointState);
Expand Down Expand Up @@ -397,7 +397,7 @@ export async function importBlock(

// Send block events, only for recent enough blocks

if (this.clock.currentSlot - block.message.slot < EVENTSTREAM_EMIT_RECENT_BLOCK_SLOTS) {
if (this.clock.currentSlot - blockSlot < EVENTSTREAM_EMIT_RECENT_BLOCK_SLOTS) {
// NOTE: Skip looping if there are no listeners from the API
if (this.emitter.listenerCount(routes.events.EventType.voluntaryExit)) {
for (const voluntaryExit of block.message.body.voluntaryExits) {
Expand All @@ -417,10 +417,10 @@ export async function importBlock(
}

// Register stat metrics about the block after importing it
this.metrics?.parentBlockDistance.observe(block.message.slot - parentBlockSlot);
this.metrics?.parentBlockDistance.observe(blockSlot - parentBlockSlot);
this.metrics?.proposerBalanceDeltaAny.observe(fullyVerifiedBlock.proposerBalanceDelta);
this.metrics?.registerImportedBlock(block.message, fullyVerifiedBlock);
if (this.config.getForkSeq(block.message.slot) >= ForkSeq.altair) {
if (this.config.getForkSeq(blockSlot) >= ForkSeq.altair) {
this.metrics?.registerSyncAggregateInBlock(
blockEpoch,
(block as altair.SignedBeaconBlock).message.body.syncAggregate,
Expand All @@ -433,18 +433,18 @@ export async function importBlock(
// Gossip blocks need to be imported as soon as possible, waiting attestations could be processed
// in the next event loop. See https://github.com/ChainSafe/lodestar/issues/4789
setTimeout(() => {
this.reprocessController.onBlockImported({slot: block.message.slot, root: blockRootHex}, advancedSlot);
this.reprocessController.onBlockImported({slot: blockSlot, root: blockRootHex}, advancedSlot);
}, 0);

if (opts.seenTimestampSec !== undefined) {
const recvToImportedBlock = Date.now() / 1000 - opts.seenTimestampSec;
this.metrics?.gossipBlock.receivedToBlockImport.observe(recvToImportedBlock);
this.logger.verbose("Imported block", {slot: block.message.slot, recvToImportedBlock});
this.logger.verbose("Imported block", {slot: blockSlot, recvToImportedBlock});
}

this.logger.verbose("Block processed", {
slot: block.message.slot,
slot: blockSlot,
root: blockRootHex,
delaySec: this.clock.secFromSlot(block.message.slot),
delaySec: this.clock.secFromSlot(blockSlot),
});
}
2 changes: 1 addition & 1 deletion packages/beacon-node/src/chain/regen/interface.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ export interface IStateRegenerator extends IStateRegeneratorInternal {
getClosestHeadState(head: ProtoBlock): CachedBeaconStateAllForks | null;
pruneOnCheckpoint(finalizedEpoch: Epoch, justifiedEpoch: Epoch, headStateRoot: RootHex): void;
pruneOnFinalized(finalizedEpoch: Epoch): void;
addPostState(postState: CachedBeaconStateAllForks): void;
processState(blockRootHex: RootHex, postState: CachedBeaconStateAllForks): void;
addCheckpointState(cp: phase0.Checkpoint, item: CachedBeaconStateAllForks): void;
updateHeadState(newHeadStateRoot: RootHex, maybeHeadState: CachedBeaconStateAllForks): void;
updatePreComputedCheckpoint(rootHex: RootHex, epoch: Epoch): number | null;
Expand Down
10 changes: 7 additions & 3 deletions packages/beacon-node/src/chain/regen/queued.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,10 @@ import {IForkChoice, ProtoBlock} from "@lodestar/fork-choice";
import {CachedBeaconStateAllForks, computeEpochAtSlot} from "@lodestar/state-transition";
import {Logger} from "@lodestar/utils";
import {routes} from "@lodestar/api";
import {CheckpointHex, CheckpointStateCache, StateContextCache, toCheckpointHex} from "../stateCache/index.js";
import {CheckpointHex, toCheckpointHex} from "../stateCache/index.js";
import {Metrics} from "../../metrics/index.js";
import {JobItemQueue} from "../../util/queue/index.js";
import {BlockStateCache, CheckpointStateCache} from "../stateCache/types.js";
import {IStateRegenerator, IStateRegeneratorInternal, RegenCaller, RegenFnName, StateCloneOpts} from "./interface.js";
import {StateRegenerator, RegenModules} from "./regen.js";
import {RegenError, RegenErrorCode} from "./errors.js";
Expand Down Expand Up @@ -34,7 +35,7 @@ export class QueuedStateRegenerator implements IStateRegenerator {
private readonly regen: StateRegenerator;

private readonly forkChoice: IForkChoice;
private readonly stateCache: StateContextCache;
private readonly stateCache: BlockStateCache;
private readonly checkpointStateCache: CheckpointStateCache;
private readonly metrics: Metrics | null;
private readonly logger: Logger;
Expand Down Expand Up @@ -88,8 +89,11 @@ export class QueuedStateRegenerator implements IStateRegenerator {
this.stateCache.deleteAllBeforeEpoch(finalizedEpoch);
}

addPostState(postState: CachedBeaconStateAllForks): void {
processState(blockRootHex: RootHex, postState: CachedBeaconStateAllForks): void {
this.stateCache.add(postState);
this.checkpointStateCache.processState(blockRootHex, postState).catch((e) => {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the current CheckpointStateCache implementation does nothing on this call

this.logger.debug("Error processing block state", {blockRootHex, slot: postState.slot}, e);
});
}

addCheckpointState(cp: phase0.Checkpoint, item: CachedBeaconStateAllForks): void {
Expand Down
4 changes: 1 addition & 3 deletions packages/beacon-node/src/chain/stateCache/datastore/db.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import {CachedBeaconStateAllForks} from "@lodestar/state-transition";
import {phase0, ssz} from "@lodestar/types";
import {IBeaconDb} from "../../../db/interface.js";
import {CPStateDatastore, DatastoreKey} from "./types.js";
Expand All @@ -9,9 +8,8 @@ import {CPStateDatastore, DatastoreKey} from "./types.js";
export class DbCPStateDatastore implements CPStateDatastore {
constructor(private readonly db: IBeaconDb) {}

async write(cpKey: phase0.Checkpoint, state: CachedBeaconStateAllForks): Promise<DatastoreKey> {
async write(cpKey: phase0.Checkpoint, stateBytes: Uint8Array): Promise<DatastoreKey> {
const serializedCheckpoint = checkpointToDatastoreKey(cpKey);
const stateBytes = state.serialize();
await this.db.checkpointState.putBinary(serializedCheckpoint, stateBytes);
return serializedCheckpoint;
}
Expand Down
52 changes: 52 additions & 0 deletions packages/beacon-node/src/chain/stateCache/datastore/file.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
import path from "node:path";
import {toHexString, fromHexString} from "@chainsafe/ssz";
import {phase0, ssz} from "@lodestar/types";
import {ensureDir, readFile, readFileNames, removeFile, writeIfNotExist} from "../../../util/file.js";
import {CPStateDatastore, DatastoreKey} from "./types.js";

const CHECKPOINT_STATES_FOLDER = "checkpoint_states";
const CHECKPOINT_FILE_NAME_LENGTH = 82;

/**
* Implementation of CPStateDatastore using file system, this is beneficial for debugging.
*/
export class FileCPStateDatastore implements CPStateDatastore {
private readonly folderPath: string;

constructor(parentDir: string = ".") {
// by default use the beacon folder `/beacon/checkpoint_states`
this.folderPath = path.join(parentDir, CHECKPOINT_STATES_FOLDER);
}

async init(): Promise<void> {
try {
await ensureDir(this.folderPath);
} catch (_) {
// do nothing
}
}

async write(cpKey: phase0.Checkpoint, stateBytes: Uint8Array): Promise<DatastoreKey> {
const serializedCheckpoint = ssz.phase0.Checkpoint.serialize(cpKey);
const filePath = path.join(this.folderPath, toHexString(serializedCheckpoint));
await writeIfNotExist(filePath, stateBytes);
return serializedCheckpoint;
}

async remove(serializedCheckpoint: DatastoreKey): Promise<void> {
const filePath = path.join(this.folderPath, toHexString(serializedCheckpoint));
await removeFile(filePath);
}

async read(serializedCheckpoint: DatastoreKey): Promise<Uint8Array | null> {
const filePath = path.join(this.folderPath, toHexString(serializedCheckpoint));
return readFile(filePath);
}

async readKeys(): Promise<DatastoreKey[]> {
const fileNames = await readFileNames(this.folderPath);
return fileNames
.filter((fileName) => fileName.startsWith("0x") && fileName.length === CHECKPOINT_FILE_NAME_LENGTH)
.map((fileName) => fromHexString(fileName));
}
}
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
import {CachedBeaconStateAllForks} from "@lodestar/state-transition";
import {phase0} from "@lodestar/types";

// With db implementation, persistedKey is serialized data of a checkpoint
export type DatastoreKey = Uint8Array;

// Make this generic to support testing
export interface CPStateDatastore {
write: (cpKey: phase0.Checkpoint, state: CachedBeaconStateAllForks) => Promise<DatastoreKey>;
write: (cpKey: phase0.Checkpoint, stateBytes: Uint8Array) => Promise<DatastoreKey>;
remove: (key: DatastoreKey) => Promise<void>;
read: (key: DatastoreKey) => Promise<Uint8Array | null>;
readKeys: () => Promise<DatastoreKey[]>;
init?: () => Promise<void>;
}
Loading
Loading