Skip to content

Commit

Permalink
feat: implement ShufflingCache
Browse files Browse the repository at this point in the history
  • Loading branch information
twoeths committed Oct 10, 2023
1 parent e42d6cc commit 95a885a
Show file tree
Hide file tree
Showing 13 changed files with 387 additions and 140 deletions.
7 changes: 7 additions & 0 deletions packages/beacon-node/src/chain/blocks/importBlock.ts
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ export async function importBlock(
const blockRootHex = toHexString(blockRoot);
const currentEpoch = computeEpochAtSlot(this.forkChoice.getTime());
const blockEpoch = computeEpochAtSlot(block.message.slot);
const parentEpoch = computeEpochAtSlot(parentBlockSlot);
const prevFinalizedEpoch = this.forkChoice.getFinalizedCheckpoint().epoch;
const blockDelaySec = (fullyVerifiedBlock.seenTimestampSec - postState.genesisTime) % this.config.SECONDS_PER_SLOT;

Expand Down Expand Up @@ -331,6 +332,12 @@ export async function importBlock(
this.logger.verbose("After importBlock caching postState without SSZ cache", {slot: postState.slot});
}

if (parentEpoch < blockEpoch) {
// current epoch and previous epoch are likely cached in previous states
this.shufflingCache.processState(postState, postState.epochCtx.nextShuffling.epoch);
this.logger.verbose("Processed shuffling for next epoch", {parentEpoch, blockEpoch, slot: block.message.slot});
}

if (block.message.slot % SLOTS_PER_EPOCH === 0) {
// Cache state to preserve epoch transition work
const checkpointState = postState;
Expand Down
9 changes: 8 additions & 1 deletion packages/beacon-node/src/chain/chain.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ import {IExecutionEngine, IExecutionBuilder} from "../execution/index.js";
import {Clock, ClockEvent, IClock} from "../util/clock.js";
import {ensureDir, writeIfNotExist} from "../util/file.js";
import {isOptimisticBlock} from "../util/forkChoice.js";
import {CheckpointStateCache, StateContextCache} from "./stateCache/index.js";
import {BlockProcessor, ImportBlockOpts} from "./blocks/index.js";
import {ChainEventEmitter, ChainEvent} from "./emitter.js";
import {IBeaconChain, ProposerPreparationData, BlockHash, StateGetOpts} from "./interface.js";
Expand Down Expand Up @@ -75,6 +74,9 @@ import {BlockAttributes, produceBlockBody} from "./produceBlock/produceBlockBody
import {computeNewStateRoot} from "./produceBlock/computeNewStateRoot.js";
import {BlockInput} from "./blocks/types.js";
import {SeenAttestationDatas} from "./seenCache/seenAttestationData.js";
import {ShufflingCache} from "./shufflingCache.js";
import {StateContextCache} from "./stateCache/stateContextCache.js";
import {CheckpointStateCache} from "./stateCache/stateContextCheckpointsCache.js";

/**
* Arbitrary constants, blobs should be consumed immediately in the same slot they are produced.
Expand Down Expand Up @@ -130,6 +132,7 @@ export class BeaconChain implements IBeaconChain {

readonly beaconProposerCache: BeaconProposerCache;
readonly checkpointBalancesCache: CheckpointBalancesCache;
readonly shufflingCache: ShufflingCache;
// TODO DENEB: Prune data structure every time period, for both old entries
/** Map keyed by executionPayload.blockHash of the block for those blobs */
readonly producedBlobSidecarsCache = new Map<BlockHash, {blobSidecars: deneb.BlobSidecars; slot: Slot}>();
Expand Down Expand Up @@ -211,6 +214,7 @@ export class BeaconChain implements IBeaconChain {

this.beaconProposerCache = new BeaconProposerCache(opts);
this.checkpointBalancesCache = new CheckpointBalancesCache();
this.shufflingCache = new ShufflingCache(metrics);

// Restore state caches
// anchorState may already by a CachedBeaconState. If so, don't create the cache again, since deserializing all
Expand All @@ -225,6 +229,9 @@ export class BeaconChain implements IBeaconChain {
pubkey2index: new PubkeyIndexMap(),
index2pubkey: [],
});
this.shufflingCache.processState(cachedState, cachedState.epochCtx.previousShuffling.epoch);
this.shufflingCache.processState(cachedState, cachedState.epochCtx.currentShuffling.epoch);
this.shufflingCache.processState(cachedState, cachedState.epochCtx.nextShuffling.epoch);

// Persist single global instance of state caches
this.pubkey2index = cachedState.epochCtx.pubkey2index;
Expand Down
2 changes: 2 additions & 0 deletions packages/beacon-node/src/chain/interface.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import {CheckpointBalancesCache} from "./balancesCache.js";
import {IChainOptions} from "./options.js";
import {AssembledBlockType, BlockAttributes, BlockType} from "./produceBlock/produceBlockBody.js";
import {SeenAttestationDatas} from "./seenCache/seenAttestationData.js";
import {ShufflingCache} from "./shufflingCache.js";

export {BlockType, type AssembledBlockType};
export {type ProposerPreparationData};
Expand Down Expand Up @@ -93,6 +94,7 @@ export interface IBeaconChain {

readonly beaconProposerCache: BeaconProposerCache;
readonly checkpointBalancesCache: CheckpointBalancesCache;
readonly shufflingCache: ShufflingCache;
readonly producedBlobSidecarsCache: Map<BlockHash, {blobSidecars: deneb.BlobSidecars; slot: Slot}>;
readonly producedBlindedBlobSidecarsCache: Map<BlockHash, {blobSidecars: deneb.BlindedBlobSidecars; slot: Slot}>;
readonly producedBlockRoot: Set<RootHex>;
Expand Down
61 changes: 61 additions & 0 deletions packages/beacon-node/src/chain/shufflingCache.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
import {CachedBeaconStateAllForks, EpochShuffling, getShufflingDecisionBlock} from "@lodestar/state-transition";
import {Epoch, RootHex} from "@lodestar/types";
import {Metrics} from "../metrics/metrics.js";

/**
* Same value to CheckpointBalancesCache, with the assumption that we don't have to use it for old epochs. In the worse case:
* - when loading state bytes from disk, we need to compute shuffling for all epochs (~1s as of Sep 2023)
* - don't have shuffling to verify attestations, need to do 1 epoch transition to add shuffling to this cache. This never happens
* with default chain option of maxSkipSlots = 32
**/
const MAX_SHUFFLING_CACHE_SIZE = 4;

type ShufflingCacheItem = {
decisionBlockHex: RootHex;
shuffling: EpochShuffling;
};

/**
* A shuffling cache to help:
* - get committee quickly for attestation verification
* - skip computing shuffling when loading state bytes from disk
*/
export class ShufflingCache {
private readonly items: ShufflingCacheItem[] = [];

constructor(metrics: Metrics | null = null) {
if (metrics) {
metrics.shufflingCache.size.addCollect(() => metrics.shufflingCache.size.set(this.items.length));
}
}

processState(state: CachedBeaconStateAllForks, shufflingEpoch: Epoch): void {
const decisionBlockHex = getShufflingDecisionBlock(state, shufflingEpoch);
const index = this.items.findIndex(
(item) => item.shuffling.epoch === shufflingEpoch && item.decisionBlockHex === decisionBlockHex
);
if (index === -1) {
if (this.items.length === MAX_SHUFFLING_CACHE_SIZE) {
this.items.shift();
}
let shuffling: EpochShuffling;
if (shufflingEpoch === state.epochCtx.nextShuffling.epoch) {
shuffling = state.epochCtx.nextShuffling;
} else if (shufflingEpoch === state.epochCtx.currentShuffling.epoch) {
shuffling = state.epochCtx.currentShuffling;
} else if (shufflingEpoch === state.epochCtx.previousShuffling.epoch) {
shuffling = state.epochCtx.previousShuffling;
} else {
throw new Error(`Shuffling not found from state ${state.slot} for epoch ${shufflingEpoch}`);
}
this.items.push({decisionBlockHex, shuffling});
}
}

get(shufflingEpoch: Epoch, dependentRootHex: RootHex): EpochShuffling | null {
return (
this.items.find((item) => item.shuffling.epoch === shufflingEpoch && item.decisionBlockHex === dependentRootHex)
?.shuffling ?? null
);
}
}
43 changes: 22 additions & 21 deletions packages/beacon-node/src/chain/validation/aggregateAndProof.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,6 @@ import {phase0, RootHex, ssz, ValidatorIndex} from "@lodestar/types";
import {
computeEpochAtSlot,
isAggregatorFromCommitteeLength,
getIndexedAttestationSignatureSet,
ISignatureSet,
createAggregateSignatureSetFromComponents,
} from "@lodestar/state-transition";
import {IBeaconChain} from "..";
Expand All @@ -14,8 +12,9 @@ import {RegenCaller} from "../regen/index.js";
import {getAttDataBase64FromSignedAggregateAndProofSerialized} from "../../util/sszBytes.js";
import {getSelectionProofSignatureSet, getAggregateAndProofSignatureSet} from "./signatureSets/index.js";
import {
getAttestationDataSigningRoot,
getCommitteeIndices,
getStateForAttestationVerification,
getShufflingForAttestationVerification,
verifyHeadBlockAndTargetRoot,
verifyPropagationSlotRange,
} from "./attestation.js";
Expand Down Expand Up @@ -142,17 +141,24 @@ async function validateAggregateAndProof(
// -- i.e. get_ancestor(store, aggregate.data.beacon_block_root, compute_start_slot_at_epoch(store.finalized_checkpoint.epoch)) == store.finalized_checkpoint.root
// > Altready check in `chain.forkChoice.hasBlock(attestation.data.beaconBlockRoot)`

const attHeadState = await getStateForAttestationVerification(
const shuffling = await getShufflingForAttestationVerification(
chain,
attSlot,
attEpoch,
attHeadBlock,
RegenCaller.validateGossipAggregateAndProof
RegenCaller.validateGossipAttestation
);

if (shuffling === null) {
throw new AttestationError(GossipAction.IGNORE, {
code: AttestationErrorCode.NO_COMMITTEE_FOR_SLOT_AND_INDEX,
index: attIndex,
slot: attSlot,
});
}

const committeeIndices: number[] = cachedAttData
? cachedAttData.committeeIndices
: getCommitteeIndices(attHeadState, attSlot, attIndex);
: getCommitteeIndices(shuffling, attSlot, attIndex);

const attestingIndices = aggregate.aggregationBits.intersectValues(committeeIndices);
const indexedAttestation: phase0.IndexedAttestation = {
Expand Down Expand Up @@ -185,21 +191,16 @@ async function validateAggregateAndProof(
// by the validator with index aggregate_and_proof.aggregator_index.
// [REJECT] The aggregator signature, signed_aggregate_and_proof.signature, is valid.
// [REJECT] The signature of aggregate is valid.
const aggregator = attHeadState.epochCtx.index2pubkey[aggregateAndProof.aggregatorIndex];
let indexedAttestationSignatureSet: ISignatureSet;
if (cachedAttData) {
const {signingRoot} = cachedAttData;
indexedAttestationSignatureSet = createAggregateSignatureSetFromComponents(
indexedAttestation.attestingIndices.map((i) => chain.index2pubkey[i]),
signingRoot,
indexedAttestation.signature
);
} else {
indexedAttestationSignatureSet = getIndexedAttestationSignatureSet(attHeadState, indexedAttestation);
}
const aggregator = chain.index2pubkey[aggregateAndProof.aggregatorIndex];
const signingRoot = cachedAttData ? cachedAttData.signingRoot : getAttestationDataSigningRoot(chain.config, attData);
const indexedAttestationSignatureSet = createAggregateSignatureSetFromComponents(
indexedAttestation.attestingIndices.map((i) => chain.index2pubkey[i]),
signingRoot,
indexedAttestation.signature
);
const signatureSets = [
getSelectionProofSignatureSet(attHeadState, attSlot, aggregator, signedAggregateAndProof),
getAggregateAndProofSignatureSet(attHeadState, attEpoch, aggregator, signedAggregateAndProof),
getSelectionProofSignatureSet(chain.config, attSlot, aggregator, signedAggregateAndProof),
getAggregateAndProofSignatureSet(chain.config, attEpoch, aggregator, signedAggregateAndProof),
indexedAttestationSignatureSet,
];
// no need to write to SeenAttestationDatas
Expand Down
Loading

0 comments on commit 95a885a

Please sign in to comment.