Skip to content

Commit

Permalink
feat: add strategy support for state archives (#7170)
Browse files Browse the repository at this point in the history
* Add strategy support for state-archives

* Comment new strategy temporarily

* Fix types

* Fix unit tests

* Fix unit tests

* Hide new option

* Rename ArchiveMode.Full to ArchiveMode.Frequency

* Rename ArchiveMode to StateArchiveMode

* Update the code as per feedback

* Fix cli package import
  • Loading branch information
nazarhussain authored Oct 21, 2024
1 parent e6c1c5b commit c9cf3ea
Show file tree
Hide file tree
Showing 13 changed files with 261 additions and 198 deletions.
166 changes: 166 additions & 0 deletions packages/beacon-node/src/chain/archiver/archiver.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,166 @@
import {Logger} from "@lodestar/utils";
import {CheckpointWithHex} from "@lodestar/fork-choice";
import {IBeaconDb} from "../../db/index.js";
import {JobItemQueue} from "../../util/queue/index.js";
import {IBeaconChain} from "../interface.js";
import {ChainEvent} from "../emitter.js";
import {Metrics} from "../../metrics/metrics.js";
import {FrequencyStateArchiveStrategy} from "./strategies/frequencyStateArchiveStrategy.js";
import {archiveBlocks} from "./archiveBlocks.js";
import {StateArchiveMode, ArchiverOpts, StateArchiveStrategy} from "./interface.js";

/**
 * Default state archive mode: the frequency-based strategy.
 * NOTE(review): presumably consumed by option defaults elsewhere — no usage is visible in this file.
 */
export const DEFAULT_STATE_ARCHIVE_MODE = StateArchiveMode.Frequency;

/** `maxLength` passed to the `JobItemQueue` that the Archiver uses for finalized checkpoints. */
export const PROCESS_FINALIZED_CHECKPOINT_QUEUE_LEN = 256;

/**
 * Archives finalized blocks and states and prunes caches / fork choice.
 *
 * Work is triggered by the chain's `forkChoiceFinalized` and `checkpoint`
 * events (unless `opts.disableArchiveOnCheckpoint` is set) or by an explicit
 * call to `persistToDisk`. Finalized checkpoints are pushed onto a job queue
 * and handled by `processFinalizedCheckpoint`.
 */
export class Archiver {
  private stateArchiveMode: StateArchiveMode;
  private jobQueue: JobItemQueue<[CheckpointWithHex], void>;

  private prevFinalized: CheckpointWithHex;
  private readonly statesArchiverStrategy: StateArchiveStrategy;
  private archiveBlobEpochs?: number;

  /**
   * @param db beacon database, used for block archiving and backfilled ranges
   * @param chain chain instance providing fork choice, regen, emitter, clock, config
   * @param logger logger used for progress and error reporting
   * @param signal abort signal; given to the job queue and used to unsubscribe from chain events
   * @param opts archiver options, including which state archive strategy to use
   * @param metrics optional metrics handle forwarded to the state archive strategy
   * @throws Error when `opts.stateArchiveMode` is not a supported strategy
   */
  constructor(
    private readonly db: IBeaconDb,
    private readonly chain: IBeaconChain,
    private readonly logger: Logger,
    signal: AbortSignal,
    opts: ArchiverOpts,
    private readonly metrics?: Metrics | null
  ) {
    // Only the frequency strategy is wired up; anything else is rejected up front.
    if (opts.stateArchiveMode !== StateArchiveMode.Frequency) {
      throw new Error(`State archive strategy "${opts.stateArchiveMode}" currently not supported.`);
    }
    this.statesArchiverStrategy = new FrequencyStateArchiveStrategy(chain.regen, db, logger, opts, chain.bufferPool);

    this.stateArchiveMode = opts.stateArchiveMode;
    this.archiveBlobEpochs = opts.archiveBlobEpochs;
    this.prevFinalized = chain.forkChoice.getFinalizedCheckpoint();

    const queueOpts = {maxLength: PROCESS_FINALIZED_CHECKPOINT_QUEUE_LEN, signal};
    this.jobQueue = new JobItemQueue<[CheckpointWithHex], void>(this.processFinalizedCheckpoint, queueOpts);

    const listenToChainEvents = !opts.disableArchiveOnCheckpoint;
    if (listenToChainEvents) {
      this.chain.emitter.on(ChainEvent.forkChoiceFinalized, this.onFinalizedCheckpoint);
      this.chain.emitter.on(ChainEvent.checkpoint, this.onCheckpoint);

      // Drop both listeners again once the signal aborts.
      const unsubscribe = (): void => {
        this.chain.emitter.off(ChainEvent.forkChoiceFinalized, this.onFinalizedCheckpoint);
        this.chain.emitter.off(ChainEvent.checkpoint, this.onCheckpoint);
      };
      signal.addEventListener("abort", unsubscribe, {once: true});
    }
  }

  /**
   * Hands the current finalized checkpoint to the strategy's `maybeArchiveState`.
   * Whether a state is actually written is decided by the strategy.
   */
  async persistToDisk(): Promise<void> {
    const latestFinalized = this.chain.forkChoice.getFinalizedCheckpoint();
    return this.statesArchiverStrategy.maybeArchiveState(latestFinalized);
  }

  /** `forkChoiceFinalized` listener: enqueue the checkpoint for processing. */
  private onFinalizedCheckpoint = async (finalized: CheckpointWithHex): Promise<void> =>
    this.jobQueue.push(finalized);

  /**
   * `checkpoint` listener: prune regen for the current finalized/justified epochs and
   * head state root, then notify the archive strategy without awaiting it.
   */
  private onCheckpoint = (): void => {
    const {stateRoot: headStateRoot} = this.chain.forkChoice.getHead();
    const finalizedEpoch = this.chain.forkChoice.getFinalizedCheckpoint().epoch;
    const justifiedEpoch = this.chain.forkChoice.getJustifiedCheckpoint().epoch;
    this.chain.regen.pruneOnCheckpoint(finalizedEpoch, justifiedEpoch, headStateRoot);

    // Fire and forget: a failure in the strategy is logged, never thrown to the emitter.
    const strategyRun = this.statesArchiverStrategy.onCheckpoint(headStateRoot, this.metrics);
    strategyRun.catch((error) => {
      this.logger.error("Error during state archive", {stateArchiveMode: this.stateArchiveMode}, error);
    });
  };

  /**
   * Job-queue worker for one finalized checkpoint. In order: archive blocks, notify the
   * strategy, maybe archive the state, prune regen, prune fork choice, update backfill ranges.
   * Any error is logged and swallowed so the queue keeps running.
   */
  private processFinalizedCheckpoint = async (finalized: CheckpointWithHex): Promise<void> => {
    try {
      const epoch = finalized.epoch;
      this.logger.verbose("Start processing finalized checkpoint", {epoch, rootHex: finalized.rootHex});

      const {config, forkChoice, lightClientServer, clock} = this.chain;
      await archiveBlocks(
        config,
        this.db,
        forkChoice,
        lightClientServer,
        this.logger,
        finalized,
        clock.currentEpoch,
        this.archiveBlobEpochs
      );
      this.prevFinalized = finalized;

      await this.statesArchiverStrategy.onFinalizedCheckpoint(finalized, this.metrics);

      // State archiving has to come after block archiving so a restart is handled cleanly.
      await this.statesArchiverStrategy.maybeArchiveState(finalized, this.metrics);

      this.chain.regen.pruneOnFinalized(epoch);

      // Fork choice is pruned only here: the tasks above rely on the extended fork choice.
      const pruned = this.chain.forkChoice.prune(finalized.rootHex);
      await this.updateBackfillRange(finalized);

      this.logger.verbose("Finish processing finalized checkpoint", {
        epoch,
        rootHex: finalized.rootHex,
        prunedBlocks: pruned.length,
      });
    } catch (err) {
      this.logger.error("Error processing finalized checkpoint", {epoch: finalized.epoch}, err as Error);
    }
  };

  /**
   * Records forward progress in `backfilledRanges`.
   *
   * Backfill sync works on verified connected ranges stored as key -> value, meaning a
   * verified jump from slot `key` back to slot `value`. When the finalized block is ahead
   * of the anchor slot, an entry finalizedSlot -> anchorSlot is written so backfill sync
   * can jump back after a restart. Older forward entries strictly between the anchor
   * slot and the finalized slot are then deleted. Entries at or below the anchor slot
   * belong to the backfill sync process itself and are left alone.
   *
   * Errors are logged and swallowed.
   */
  private updateBackfillRange = async (finalized: CheckpointWithHex): Promise<void> => {
    try {
      const finalizedBlock = this.chain.forkChoice.getBlockHex(finalized.rootHex);
      // Nothing to record unless the finalized block is known and strictly ahead of the anchor.
      if (!finalizedBlock || !(finalizedBlock.slot > this.chain.anchorStateLatestBlockSlot)) {
        return;
      }

      // Mark the sequence from the finalized block's slot back to the anchor slot as filled.
      await this.db.backfilledRanges.put(finalizedBlock.slot, this.chain.anchorStateLatestBlockSlot);

      // Previously written forward entries: > anchor slot and < finalized slot.
      const staleEntries = await this.db.backfilledRanges.entries({
        gt: this.chain.anchorStateLatestBlockSlot,
        lt: finalizedBlock.slot,
      });
      this.logger.debug("updated backfilledRanges", {
        key: finalizedBlock.slot,
        value: this.chain.anchorStateLatestBlockSlot,
      });

      if (staleEntries.length === 0) {
        return;
      }
      const staleKeys = staleEntries.map((entry) => entry.key);
      await this.db.backfilledRanges.batchDelete(staleKeys);
      this.logger.debug(
        `Forward Sync - cleaned up backfilledRanges between ${finalizedBlock.slot},${this.chain.anchorStateLatestBlockSlot}`,
        {seqs: JSON.stringify(staleEntries)}
      );
    } catch (err) {
      this.logger.error("Error updating backfilledRanges on finalization", {epoch: finalized.epoch}, err as Error);
    }
  };
}
172 changes: 2 additions & 170 deletions packages/beacon-node/src/chain/archiver/index.ts
Original file line number Diff line number Diff line change
@@ -1,170 +1,2 @@
import {Logger} from "@lodestar/utils";
import {CheckpointWithHex} from "@lodestar/fork-choice";
import {IBeaconDb} from "../../db/index.js";
import {JobItemQueue} from "../../util/queue/index.js";
import {IBeaconChain} from "../interface.js";
import {ChainEvent} from "../emitter.js";
import {Metrics} from "../../metrics/metrics.js";
import {StatesArchiver, StatesArchiverOpts} from "./archiveStates.js";
import {archiveBlocks} from "./archiveBlocks.js";

/** `maxLength` passed to the `JobItemQueue` that the Archiver uses for finalized checkpoints. */
const PROCESS_FINALIZED_CHECKPOINT_QUEUE_LEN = 256;

/** Options accepted by `Archiver`: the state-archiver options plus two archiver-level settings. */
export type ArchiverOpts = StatesArchiverOpts & {
// When truthy, the Archiver constructor does not subscribe to chain events.
disableArchiveOnCheckpoint?: boolean;
// Forwarded unchanged to `archiveBlocks` when a finalized checkpoint is processed.
archiveBlobEpochs?: number;
};

/**
 * Numeric counters describing block proposals.
 * NOTE(review): field meanings are inferred from their names only; no producer or
 * consumer is visible in this file — confirm before relying on them.
 */
type ProposalStats = {
total: number;
finalized: number;
orphaned: number;
missed: number;
};

/**
 * Aggregate statistics: one `ProposalStats` for all validators, one for attached
 * validators, plus three plain counters.
 * NOTE(review): nothing in this file reads or writes this type; the counter semantics
 * are only what their names suggest — confirm against the producer.
 */
export type FinalizedStats = {
allValidators: ProposalStats;
attachedValidators: ProposalStats;
finalizedCanonicalCheckpointsCount: number;
finalizedFoundCheckpointsInStateCache: number;
finalizedAttachedValidatorsCount: number;
};

/**
 * Archives finalized blocks and states and prunes caches / fork choice.
 *
 * Work is triggered by the chain's `forkChoiceFinalized` and `checkpoint`
 * events (unless `opts.disableArchiveOnCheckpoint` is set) or by an explicit
 * call to `persistToDisk`. Finalized checkpoints are pushed onto a job queue
 * and handled by `processFinalizedCheckpoint`.
 */
export class Archiver {
  private jobQueue: JobItemQueue<[CheckpointWithHex], void>;

  private prevFinalized: CheckpointWithHex;
  private readonly statesArchiver: StatesArchiver;
  private archiveBlobEpochs?: number;

  /**
   * @param db beacon database, used for block archiving and backfilled ranges
   * @param chain chain instance providing fork choice, regen, emitter, clock, config
   * @param logger logger used for progress and error reporting
   * @param signal abort signal; given to the job queue and used to unsubscribe from chain events
   * @param opts archiver options
   * @param metrics optional metrics handle forwarded to the states archiver
   */
  constructor(
    private readonly db: IBeaconDb,
    private readonly chain: IBeaconChain,
    private readonly logger: Logger,
    signal: AbortSignal,
    opts: ArchiverOpts,
    private readonly metrics?: Metrics | null
  ) {
    this.archiveBlobEpochs = opts.archiveBlobEpochs;
    this.statesArchiver = new StatesArchiver(chain.regen, db, logger, opts, chain.bufferPool);
    this.prevFinalized = chain.forkChoice.getFinalizedCheckpoint();

    const queueOpts = {maxLength: PROCESS_FINALIZED_CHECKPOINT_QUEUE_LEN, signal};
    this.jobQueue = new JobItemQueue<[CheckpointWithHex], void>(this.processFinalizedCheckpoint, queueOpts);

    const listenToChainEvents = !opts.disableArchiveOnCheckpoint;
    if (listenToChainEvents) {
      this.chain.emitter.on(ChainEvent.forkChoiceFinalized, this.onFinalizedCheckpoint);
      this.chain.emitter.on(ChainEvent.checkpoint, this.onCheckpoint);

      // Drop both listeners again once the signal aborts.
      const unsubscribe = (): void => {
        this.chain.emitter.off(ChainEvent.forkChoiceFinalized, this.onFinalizedCheckpoint);
        this.chain.emitter.off(ChainEvent.checkpoint, this.onCheckpoint);
      };
      signal.addEventListener("abort", unsubscribe, {once: true});
    }
  }

  /** Passes the current finalized checkpoint to the states archiver's `archiveState`. */
  async persistToDisk(): Promise<void> {
    const latestFinalized = this.chain.forkChoice.getFinalizedCheckpoint();
    await this.statesArchiver.archiveState(latestFinalized);
  }

  /** `forkChoiceFinalized` listener: enqueue the checkpoint for processing. */
  private onFinalizedCheckpoint = async (finalized: CheckpointWithHex): Promise<void> =>
    this.jobQueue.push(finalized);

  /**
   * `checkpoint` listener: prune regen for the current finalized/justified epochs
   * and head state root.
   */
  private onCheckpoint = (): void => {
    const {stateRoot: headStateRoot} = this.chain.forkChoice.getHead();
    const finalizedEpoch = this.chain.forkChoice.getFinalizedCheckpoint().epoch;
    const justifiedEpoch = this.chain.forkChoice.getJustifiedCheckpoint().epoch;
    this.chain.regen.pruneOnCheckpoint(finalizedEpoch, justifiedEpoch, headStateRoot);
  };

  /**
   * Job-queue worker for one finalized checkpoint. In order: archive blocks, maybe archive
   * the state, prune regen, prune fork choice, update backfill ranges.
   * Any error is logged and swallowed so the queue keeps running.
   */
  private processFinalizedCheckpoint = async (finalized: CheckpointWithHex): Promise<void> => {
    try {
      const epoch = finalized.epoch;
      this.logger.verbose("Start processing finalized checkpoint", {epoch, rootHex: finalized.rootHex});

      const {config, forkChoice, lightClientServer, clock} = this.chain;
      await archiveBlocks(
        config,
        this.db,
        forkChoice,
        lightClientServer,
        this.logger,
        finalized,
        clock.currentEpoch,
        this.archiveBlobEpochs
      );
      this.prevFinalized = finalized;

      // State archiving has to come after block archiving so a restart is handled cleanly.
      await this.statesArchiver.maybeArchiveState(finalized, this.metrics);

      this.chain.regen.pruneOnFinalized(epoch);

      // Fork choice is pruned only here: the tasks above rely on the extended fork choice.
      const pruned = this.chain.forkChoice.prune(finalized.rootHex);
      await this.updateBackfillRange(finalized);

      this.logger.verbose("Finish processing finalized checkpoint", {
        epoch,
        rootHex: finalized.rootHex,
        prunedBlocks: pruned.length,
      });
    } catch (err) {
      this.logger.error("Error processing finalized checkpoint", {epoch: finalized.epoch}, err as Error);
    }
  };

  /**
   * Records forward progress in `backfilledRanges`.
   *
   * Backfill sync works on verified connected ranges stored as key -> value, meaning a
   * verified jump from slot `key` back to slot `value`. When the finalized block is ahead
   * of the anchor slot, an entry finalizedSlot -> anchorSlot is written so backfill sync
   * can jump back after a restart. Older forward entries strictly between the anchor
   * slot and the finalized slot are then deleted. Entries at or below the anchor slot
   * belong to the backfill sync process itself and are left alone.
   *
   * Errors are logged and swallowed.
   */
  private updateBackfillRange = async (finalized: CheckpointWithHex): Promise<void> => {
    try {
      const finalizedBlock = this.chain.forkChoice.getBlockHex(finalized.rootHex);
      // Nothing to record unless the finalized block is known and strictly ahead of the anchor.
      if (!finalizedBlock || !(finalizedBlock.slot > this.chain.anchorStateLatestBlockSlot)) {
        return;
      }

      // Mark the sequence from the finalized block's slot back to the anchor slot as filled.
      await this.db.backfilledRanges.put(finalizedBlock.slot, this.chain.anchorStateLatestBlockSlot);

      // Previously written forward entries: > anchor slot and < finalized slot.
      const staleEntries = await this.db.backfilledRanges.entries({
        gt: this.chain.anchorStateLatestBlockSlot,
        lt: finalizedBlock.slot,
      });
      this.logger.debug("updated backfilledRanges", {
        key: finalizedBlock.slot,
        value: this.chain.anchorStateLatestBlockSlot,
      });

      if (staleEntries.length === 0) {
        return;
      }
      const staleKeys = staleEntries.map((entry) => entry.key);
      await this.db.backfilledRanges.batchDelete(staleKeys);
      this.logger.debug(
        `Forward Sync - cleaned up backfilledRanges between ${finalizedBlock.slot},${this.chain.anchorStateLatestBlockSlot}`,
        {seqs: JSON.stringify(staleEntries)}
      );
    } catch (err) {
      this.logger.error("Error updating backfilledRanges on finalization", {epoch: finalized.epoch}, err as Error);
    }
  };
}
// Barrel file: re-export everything from the archiver implementation and its interface module.
export * from "./archiver.js";
export * from "./interface.js";
47 changes: 47 additions & 0 deletions packages/beacon-node/src/chain/archiver/interface.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
import {CheckpointWithHex} from "@lodestar/fork-choice";
import {Metrics} from "../../metrics/metrics.js";
import {RootHex} from "@lodestar/types";

/**
 * Identifies which strategy is used to archive states.
 * `Frequency` is the only member at present; a differential mode is planned but commented out.
 */
export enum StateArchiveMode {
Frequency = "frequency",
// New strategy to be implemented
// WIP: https://github.com/ChainSafe/lodestar/pull/7005
// Differential = "diff",
}

/** Options that control how states are archived. */
export interface StatesArchiverOpts {
/**
* Minimum number of epochs between archived states
*/
archiveStateEpochFrequency: number;
/**
* Strategy to store archive states
*/
stateArchiveMode: StateArchiveMode;
}

/** Options accepted by `Archiver`: the state-archiver options plus two archiver-level settings. */
export type ArchiverOpts = StatesArchiverOpts & {
// When truthy, the Archiver constructor does not subscribe to chain events.
disableArchiveOnCheckpoint?: boolean;
// Forwarded unchanged to `archiveBlocks` when a finalized checkpoint is processed.
archiveBlobEpochs?: number;
};

/**
 * Numeric counters describing block proposals.
 * NOTE(review): field meanings are inferred from their names only; no producer or
 * consumer is visible in this file — confirm before relying on them.
 */
export type ProposalStats = {
total: number;
finalized: number;
orphaned: number;
missed: number;
};

/**
 * Aggregate statistics: one `ProposalStats` for all validators, one for attached
 * validators, plus three plain counters.
 * NOTE(review): nothing in this file reads or writes this type; the counter semantics
 * are only what their names suggest — confirm against the producer.
 */
export type FinalizedStats = {
allValidators: ProposalStats;
attachedValidators: ProposalStats;
finalizedCanonicalCheckpointsCount: number;
finalizedFoundCheckpointsInStateCache: number;
finalizedAttachedValidatorsCount: number;
};

/**
 * Contract a state archive strategy must implement. The `Archiver` drives it through
 * these three hooks; every hook is asynchronous and takes an optional metrics handle.
 */
export interface StateArchiveStrategy {
// Called by the Archiver on each chain `checkpoint` event with the head state root.
onCheckpoint(stateRoot: RootHex, metrics?: Metrics | null): Promise<void>;
// Called by the Archiver while processing a finalized checkpoint, after blocks are archived.
onFinalizedCheckpoint(finalized: CheckpointWithHex, metrics?: Metrics | null): Promise<void>;
// Called after `onFinalizedCheckpoint` for each finalized checkpoint, and by `Archiver.persistToDisk`.
// NOTE(review): the "maybe" suggests the strategy decides whether to write a state — confirm per implementation.
maybeArchiveState(finalized: CheckpointWithHex, metrics?: Metrics | null): Promise<void>;
}
Loading

0 comments on commit c9cf3ea

Please sign in to comment.