From cdfb908985fe3b150c65487ff1271830bcd899b5 Mon Sep 17 00:00:00 2001 From: Tuyen Nguyen Date: Wed, 11 Oct 2023 10:02:51 +0700 Subject: [PATCH] feat: support promise in ShufflingCache --- packages/beacon-node/src/chain/chain.ts | 43 +++++ packages/beacon-node/src/chain/interface.ts | 6 + .../beacon-node/src/chain/shufflingCache.ts | 149 +++++++++++++++--- .../src/chain/validation/attestation.ts | 89 +++++------ .../src/metrics/metrics/lodestar.ts | 16 ++ .../test/unit/chain/shufflingCache.test.ts | 47 ++++++ .../validation/aggregateAndProof.test.ts | 1 + .../unit/chain/validation/attestation.test.ts | 10 +- 8 files changed, 289 insertions(+), 72 deletions(-) create mode 100644 packages/beacon-node/test/unit/chain/shufflingCache.test.ts diff --git a/packages/beacon-node/src/chain/chain.ts b/packages/beacon-node/src/chain/chain.ts index 38ed86fdbece..72ca2d0c835a 100644 --- a/packages/beacon-node/src/chain/chain.ts +++ b/packages/beacon-node/src/chain/chain.ts @@ -633,6 +633,49 @@ export class BeaconChain implements IBeaconChain { } } + /** + * Regenerate state for attestation verification, this does not happen with default chain option of maxSkipSlots = 32 . + * However, need to handle just in case. Lodestar doesn't support multiple regen state requests for attestation verification + * at the same time, bounded inside "ShufflingCache.insertPromise()" function. + * Leave this function in chain instead of attestatation verification code to make sure we're aware of its performance impact. + */ + async regenStateForAttestationVerification( + attEpoch: Epoch, + shufflingDependentRoot: RootHex, + attHeadBlock: ProtoBlock, + regenCaller: RegenCaller + ): Promise { + // this is to prevent multiple calls to get shuffling for the same epoch and dependent root + // any subsequent calls of the same epoch and dependent root will wait for this promise to resolve + this.shufflingCache.insertPromise(attEpoch, shufflingDependentRoot); + const blockEpoch = computeEpochAtSlot(attHeadBlock.slot); + + let state: CachedBeaconStateAllForks; + if (blockEpoch < attEpoch - 1) { + // thanks to one epoch look ahead, we don't need to dial up to attEpoch + const targetSlot = computeStartSlotAtEpoch(attEpoch - 1); + this.metrics?.gossipAttestation.useHeadBlockStateDialedToTargetEpoch.inc({caller: regenCaller}); + state = await this.regen.getBlockSlotState( + attHeadBlock.blockRoot, + targetSlot, + {dontTransferCache: true}, + regenCaller + ); + } else if (blockEpoch > attEpoch) { + // should not happen, handled inside attestation verification code + throw Error(`Block epoch ${blockEpoch} is after attestation epoch ${attEpoch}`); + } else { + // should use either current or next shuffling of head state + // it's not likely to hit this since these shufflings are cached already + // so handle just in case + this.metrics?.gossipAttestation.useHeadBlockState.inc({caller: regenCaller}); + state = await this.regen.getState(attHeadBlock.stateRoot, regenCaller); + } + + // resolve the promise to unblock other calls of the same epoch and dependent root + this.shufflingCache.processState(state, attEpoch); + } + /** * `ForkChoice.onBlock` must never throw for a block that is valid with respect to the network * `justifiedBalancesGetter()` must never throw and it should always return a state. diff --git a/packages/beacon-node/src/chain/interface.ts b/packages/beacon-node/src/chain/interface.ts index 97fa93c7d1d3..465928df1099 100644 --- a/packages/beacon-node/src/chain/interface.ts +++ b/packages/beacon-node/src/chain/interface.ts @@ -160,6 +160,12 @@ export interface IBeaconChain { persistInvalidSszBytes(type: string, sszBytes: Uint8Array, suffix?: string): void; /** Persist bad items to persistInvalidSszObjectsDir dir, for example invalid state, attestations etc. */ persistInvalidSszView(view: TreeView, suffix?: string): void; + regenStateForAttestationVerification( + attEpoch: Epoch, + shufflingDependentRoot: RootHex, + attHeadBlock: ProtoBlock, + regenCaller: RegenCaller + ): Promise; updateBuilderStatus(clockSlot: Slot): void; regenCanAcceptWork(): boolean; diff --git a/packages/beacon-node/src/chain/shufflingCache.ts b/packages/beacon-node/src/chain/shufflingCache.ts index ffd528a43279..71a709184d47 100644 --- a/packages/beacon-node/src/chain/shufflingCache.ts +++ b/packages/beacon-node/src/chain/shufflingCache.ts @@ -10,52 +10,161 @@ import {Metrics} from "../metrics/metrics.js"; **/ const MAX_SHUFFLING_CACHE_SIZE = 4; +/** + * With default chain option of maxSkipSlots = 32, there should be no shuffling promise. If that happens a lot, it could blow up Lodestar, + * with MAX_SHUFFLING_CACHE_SIZE = 4, only allow 1 promise at a time. + */ +const MAX_SHUFFLING_PROMISE = 1; + +enum CacheItemType { + shuffling, + promise, +} + type ShufflingCacheItem = { + type: CacheItemType.shuffling; decisionBlockHex: RootHex; + epoch: Epoch; shuffling: EpochShuffling; }; +type PromiseCacheItem = { + type: CacheItemType.promise; + decisionBlockHex: RootHex; + epoch: Epoch; + promise: Promise; + resolveFn: (shuffling: EpochShuffling) => void; +}; + +type CacheItem = ShufflingCacheItem | PromiseCacheItem; + /** * A shuffling cache to help: * - get committee quickly for attestation verification + * - if a shuffling is not available (which does not happen with default chain option of maxSkipSlots = 32), track a promise to make sure we don't compute the same shuffling twice * - skip computing shuffling when loading state bytes from disk */ export class ShufflingCache { - private readonly items: ShufflingCacheItem[] = []; + /** LRU cache implemented as an array, pruned every time we add an item */ + private readonly items: CacheItem[] = []; - constructor(metrics: Metrics | null = null) { + constructor( + private readonly metrics: Metrics | null = null, + private maxSize = MAX_SHUFFLING_CACHE_SIZE + ) { if (metrics) { metrics.shufflingCache.size.addCollect(() => metrics.shufflingCache.size.set(this.items.length)); } } + /** + * Extract shuffling from state and add to cache + */ processState(state: CachedBeaconStateAllForks, shufflingEpoch: Epoch): void { const decisionBlockHex = getShufflingDecisionBlock(state, shufflingEpoch); - const index = this.items.findIndex( - (item) => item.shuffling.epoch === shufflingEpoch && item.decisionBlockHex === decisionBlockHex - ); - if (index === -1) { - if (this.items.length === MAX_SHUFFLING_CACHE_SIZE) { - this.items.shift(); - } - let shuffling: EpochShuffling; - if (shufflingEpoch === state.epochCtx.nextShuffling.epoch) { + let shuffling: EpochShuffling; + switch (shufflingEpoch) { + case state.epochCtx.nextShuffling.epoch: shuffling = state.epochCtx.nextShuffling; - } else if (shufflingEpoch === state.epochCtx.currentShuffling.epoch) { + break; + case state.epochCtx.currentShuffling.epoch: shuffling = state.epochCtx.currentShuffling; - } else if (shufflingEpoch === state.epochCtx.previousShuffling.epoch) { + break; + case state.epochCtx.previousShuffling.epoch: shuffling = state.epochCtx.previousShuffling; - } else { + break; + default: throw new Error(`Shuffling not found from state ${state.slot} for epoch ${shufflingEpoch}`); + } + + let found = false; + for (const item of this.items) { + if (item.epoch === shufflingEpoch && item.decisionBlockHex === decisionBlockHex) { + found = true; + if (isPromiseCacheItem(item)) { + // unblock consumers of this promise + item.resolveFn(shuffling); + // then update item type to shuffling + Object.assign(item, {type: CacheItemType.shuffling, shuffling}); + // TODO: remove promise and resolveFn? + // we updated type to CacheItemType.shuffling so the above fields are not used anyway + this.metrics?.shufflingCache.processStateUpdatePromise.inc(); + } else { + // ShufflingCacheItem, do nothing + this.metrics?.shufflingCache.processStateNoOp.inc(); + } + break; } - this.items.push({decisionBlockHex, shuffling}); + } + + if (!found) { + this.add({type: CacheItemType.shuffling, epoch: shufflingEpoch, decisionBlockHex, shuffling}); + this.metrics?.shufflingCache.processStateInsertNew.inc(); } } - get(shufflingEpoch: Epoch, dependentRootHex: RootHex): EpochShuffling | null { - return ( - this.items.find((item) => item.shuffling.epoch === shufflingEpoch && item.decisionBlockHex === dependentRootHex) - ?.shuffling ?? null - ); + /** + * Insert a promise to make sure we don't regen state for the same shuffling. + * Bound by MAX_SHUFFLING_PROMISE to make sure our node does not blow up. + */ + insertPromise(shufflingEpoch: Epoch, dependentRootHex: RootHex): void { + const promiseCount = this.items.filter((item) => isPromiseCacheItem(item)).length; + if (promiseCount >= MAX_SHUFFLING_PROMISE) { + throw new Error( + `Too many shuffling promises: ${promiseCount}, shufflingEpoch: ${shufflingEpoch}, dependentRootHex: ${dependentRootHex}` + ); + } + let resolveFn: ((shuffling: EpochShuffling) => void) | null = null; + const promise = new Promise((resolve) => { + resolveFn = resolve; + }); + if (resolveFn === null) { + throw new Error("Promise Constructor was not executed immediately"); + } + + const cacheItem: PromiseCacheItem = { + type: CacheItemType.promise, + epoch: shufflingEpoch, + decisionBlockHex: dependentRootHex, + promise, + resolveFn, + }; + this.add(cacheItem); + this.metrics?.shufflingCache.insertPromiseCount.inc(); + } + + /** + * Most of the time, this should return a shuffling immediately. + * If there's a promise, it means we are computing the same shuffling, so we wait for the promise to resolve. + * Return null if we don't have a shuffling for this epoch and dependentRootHex. + */ + async get(shufflingEpoch: Epoch, dependentRootHex: RootHex): Promise { + for (const item of this.items) { + if (item.epoch === shufflingEpoch && item.decisionBlockHex === dependentRootHex) { + if (isShufflingCacheItem(item)) { + return item.shuffling; + } else { + return item.promise; + } + } + } + + // not found + return null; } + + private add(cacheItem: CacheItem): void { + if (this.items.length === this.maxSize) { + this.items.shift(); + } + this.items.push(cacheItem); + } +} + +function isShufflingCacheItem(item: CacheItem): item is ShufflingCacheItem { + return item.type === CacheItemType.shuffling; +} + +function isPromiseCacheItem(item: CacheItem): item is PromiseCacheItem { + return item.type === CacheItemType.promise; } diff --git a/packages/beacon-node/src/chain/validation/attestation.ts b/packages/beacon-node/src/chain/validation/attestation.ts index 5db87fa9f5ec..2da29c16f94a 100644 --- a/packages/beacon-node/src/chain/validation/attestation.ts +++ b/packages/beacon-node/src/chain/validation/attestation.ts @@ -4,7 +4,6 @@ import {EpochDifference, ProtoBlock} from "@lodestar/fork-choice"; import {ATTESTATION_SUBNET_COUNT, SLOTS_PER_EPOCH, ForkName, ForkSeq, DOMAIN_BEACON_ATTESTER} from "@lodestar/params"; import { computeEpochAtSlot, - CachedBeaconStateAllForks, createSingleSignatureSetFromComponents, SingleSignatureSet, EpochCacheError, @@ -590,6 +589,46 @@ export async function getShufflingForAttestationVerification( regenCaller: RegenCaller ): Promise { const blockEpoch = computeEpochAtSlot(attHeadBlock.slot); + const shufflingDependentRoot = getShufflingDependentRoot(chain, attEpoch, blockEpoch, attHeadBlock); + + let shuffling = await chain.shufflingCache.get(attEpoch, shufflingDependentRoot); + if (shuffling) { + // most of the time, we should get the shuffling from cache + chain.metrics?.gossipAttestation.shufflingHit.inc({caller: regenCaller}); + return shuffling; + } + + chain.metrics?.gossipAttestation.shufflingMiss.inc({caller: regenCaller}); + try { + // for the 1st time of the same epoch and dependent root, it awaits for the regen state + // from the 2nd time, it should use the same cached promise and it should reach the above code + await chain.regenStateForAttestationVerification(attEpoch, shufflingDependentRoot, attHeadBlock, regenCaller); + } catch (e) { + throw new AttestationError(GossipAction.IGNORE, { + code: AttestationErrorCode.MISSING_STATE_TO_VERIFY_ATTESTATION, + error: e as Error, + }); + } + + shuffling = await chain.shufflingCache.get(attEpoch, shufflingDependentRoot); + if (shuffling) { + chain.metrics?.gossipAttestation.shufflingRegenHit.inc({caller: regenCaller}); + return shuffling; + } else { + chain.metrics?.gossipAttestation.shufflingRegenMiss.inc({caller: regenCaller}); + return null; + } +} + +/** + * Get dependent root of a shuffling given attestation epoch and head block. + */ +export function getShufflingDependentRoot( + chain: IBeaconChain, + attEpoch: Epoch, + blockEpoch: Epoch, + attHeadBlock: ProtoBlock +): RootHex { let shufflingDependentRoot: RootHex; if (blockEpoch === attEpoch) { // current shuffling, this is equivalent to `headState.currentShuffling` @@ -623,53 +662,7 @@ export async function getShufflingForAttestationVerification( throw Error(`attestation epoch ${attEpoch} is before head block epoch ${blockEpoch}`); } - let shuffling = chain.shufflingCache.get(attEpoch, shufflingDependentRoot); - if (shuffling) { - // most of the time, we should get the shuffling from cache - chain.metrics?.gossipAttestation.shufflingHit.inc({caller: regenCaller}); - return shuffling; - } - chain.metrics?.gossipAttestation.shufflingMiss.inc({caller: regenCaller}); - - let state: CachedBeaconStateAllForks; - try { - if (blockEpoch < attEpoch - 1) { - // thanks to one epoch look ahead, we don't need to dial up to attEpoch - const targetSlot = computeStartSlotAtEpoch(attEpoch - 1); - chain.metrics?.gossipAttestation.useHeadBlockStateDialedToTargetEpoch.inc({caller: regenCaller}); - state = await chain.regen.getBlockSlotState( - attHeadBlock.blockRoot, - targetSlot, - {dontTransferCache: true}, - regenCaller - ); - } else if (blockEpoch > attEpoch) { - // should not happen, handled above - throw Error(`Block epoch ${blockEpoch} is after attestation epoch ${attEpoch}`); - } else { - // should use either current or next shuffling of head state - // it's not likely to hit this since these shufflings are cached already - // so handle just in case - chain.metrics?.gossipAttestation.useHeadBlockState.inc({caller: regenCaller}); - state = await chain.regen.getState(attHeadBlock.stateRoot, regenCaller); - } - } catch (e) { - throw new AttestationError(GossipAction.IGNORE, { - code: AttestationErrorCode.MISSING_STATE_TO_VERIFY_ATTESTATION, - error: e as Error, - }); - } - - // add to cache - chain.shufflingCache.processState(state, attEpoch); - shuffling = chain.shufflingCache.get(attEpoch, shufflingDependentRoot); - if (shuffling) { - chain.metrics?.gossipAttestation.shufflingRegenHit.inc({caller: regenCaller}); - return shuffling; - } else { - chain.metrics?.gossipAttestation.shufflingRegenMiss.inc({caller: regenCaller}); - return null; - } + return shufflingDependentRoot; } /** diff --git a/packages/beacon-node/src/metrics/metrics/lodestar.ts b/packages/beacon-node/src/metrics/metrics/lodestar.ts index 0eb22401e4ee..fc591657533c 100644 --- a/packages/beacon-node/src/metrics/metrics/lodestar.ts +++ b/packages/beacon-node/src/metrics/metrics/lodestar.ts @@ -1084,6 +1084,22 @@ export function createLodestarMetrics( name: "lodestar_shuffling_cache_size", help: "Shuffling cache size", }), + processStateInsertNew: register.gauge({ + name: "lodestar_shuffling_cache_process_state_insert_new_total", + help: "Total number of times processState is called resulting a new shuffling", + }), + processStateUpdatePromise: register.gauge({ + name: "lodestar_shuffling_cache_process_state_update_promise_total", + help: "Total number of times processState is called resulting a promise being updated with shuffling", + }), + processStateNoOp: register.gauge({ + name: "lodestar_shuffling_cache_process_state_no_op_total", + help: "Total number of times processState is called resulting no changes", + }), + insertPromiseCount: register.gauge({ + name: "lodestar_shuffling_cache_insert_promise_count", + help: "Total number of times insertPromise is called", + }), }, seenCache: { diff --git a/packages/beacon-node/test/unit/chain/shufflingCache.test.ts b/packages/beacon-node/test/unit/chain/shufflingCache.test.ts new file mode 100644 index 000000000000..7dae2593e7fd --- /dev/null +++ b/packages/beacon-node/test/unit/chain/shufflingCache.test.ts @@ -0,0 +1,47 @@ +import {expect} from "chai"; +import {getShufflingDecisionBlock} from "@lodestar/state-transition"; +// eslint-disable-next-line import/no-relative-packages +import {generateTestCachedBeaconStateOnlyValidators} from "../../../../state-transition/test/perf/util.js"; +import {ShufflingCache} from "../../../src/chain/shufflingCache.js"; + +describe("ShufflingCache", function () { + const vc = 64; + const stateSlot = 100; + const state = generateTestCachedBeaconStateOnlyValidators({vc, slot: stateSlot}); + const currentEpoch = state.epochCtx.currentShuffling.epoch; + let shufflingCache: ShufflingCache; + + beforeEach(() => { + shufflingCache = new ShufflingCache(null, 1); + shufflingCache.processState(state, currentEpoch); + }); + + it("should get shuffling from cache", async function () { + const dependentRoot = getShufflingDecisionBlock(state, currentEpoch); + expect(await shufflingCache.get(currentEpoch, dependentRoot)).to.deep.equal(state.epochCtx.currentShuffling); + }); + + it("should bound by maxSize", async function () { + const dependentRoot = getShufflingDecisionBlock(state, currentEpoch); + expect(await shufflingCache.get(currentEpoch, dependentRoot)).to.deep.equal(state.epochCtx.currentShuffling); + shufflingCache.processState(state, currentEpoch + 1); + // the current shuffling is not available anymore + expect(await shufflingCache.get(currentEpoch, dependentRoot)).to.be.null; + }); + + it("should return shuffling from promise", async function () { + const nextDependentRoot = getShufflingDecisionBlock(state, currentEpoch + 1); + shufflingCache.insertPromise(currentEpoch + 1, nextDependentRoot); + const shufflingRequest0 = shufflingCache.get(currentEpoch + 1, nextDependentRoot); + const shufflingRequest1 = shufflingCache.get(currentEpoch + 1, nextDependentRoot); + shufflingCache.processState(state, currentEpoch + 1); + expect(await shufflingRequest0).to.deep.equal(state.epochCtx.nextShuffling); + expect(await shufflingRequest1).to.deep.equal(state.epochCtx.nextShuffling); + }); + + it("should support 1 promise at a time", async function () { + const nextDependentRoot = getShufflingDecisionBlock(state, currentEpoch + 1); + shufflingCache.insertPromise(currentEpoch + 1, nextDependentRoot); + expect(() => shufflingCache.insertPromise(currentEpoch + 1, nextDependentRoot)).to.throw(); + }); +}); diff --git a/packages/beacon-node/test/unit/chain/validation/aggregateAndProof.test.ts b/packages/beacon-node/test/unit/chain/validation/aggregateAndProof.test.ts index a5b192abee44..5cf8e9f0bb59 100644 --- a/packages/beacon-node/test/unit/chain/validation/aggregateAndProof.test.ts +++ b/packages/beacon-node/test/unit/chain/validation/aggregateAndProof.test.ts @@ -121,6 +121,7 @@ describe("chain / validation / aggregateAndProof", () => { getState(), signedAggregateAndProof.message.aggregate.data.slot + 2 * SLOTS_PER_EPOCH ); + chain.regenStateForAttestationVerification = () => Promise.resolve(); (chain as {regen: IStateRegenerator}).regen = { getState: async () => committeeState, } as Partial as IStateRegenerator; diff --git a/packages/beacon-node/test/unit/chain/validation/attestation.test.ts b/packages/beacon-node/test/unit/chain/validation/attestation.test.ts index a950345f816a..2e8a381c326b 100644 --- a/packages/beacon-node/test/unit/chain/validation/attestation.test.ts +++ b/packages/beacon-node/test/unit/chain/validation/attestation.test.ts @@ -346,6 +346,7 @@ describe("validateAttestation", () => { // simulate https://github.com/ChainSafe/lodestar/issues/4396 // this way we cannot get committeeIndices const committeeState = processSlots(getState(), attestation.data.slot + 2 * SLOTS_PER_EPOCH); + chain.regenStateForAttestationVerification = () => Promise.resolve(); (chain as {regen: IStateRegenerator}).regen = { getState: async () => committeeState, } as Partial as IStateRegenerator; @@ -520,7 +521,7 @@ describe("getShufflingForAttestationVerification", () => { const previousDependentRoot = "0xa916b57729dbfb89a082820e0eb2b669d9d511a675d3d8c888b2f300f10b0bdf"; forkchoiceStub.getDependentRoot.withArgs(attHeadBlock, EpochDifference.previous).returns(previousDependentRoot); const expectedShuffling = {epoch: attEpoch} as EpochShuffling; - shufflingCacheStub.get.withArgs(attEpoch, previousDependentRoot).returns(expectedShuffling); + shufflingCacheStub.get.withArgs(attEpoch, previousDependentRoot).resolves(expectedShuffling); const resultShuffling = await getShufflingForAttestationVerification( chain, attEpoch, @@ -540,7 +541,7 @@ describe("getShufflingForAttestationVerification", () => { const currentDependentRoot = "0xa916b57729dbfb89a082820e0eb2b669d9d511a675d3d8c888b2f300f10b0bdf"; forkchoiceStub.getDependentRoot.withArgs(attHeadBlock, EpochDifference.current).returns(currentDependentRoot); const expectedShuffling = {epoch: attEpoch} as EpochShuffling; - shufflingCacheStub.get.withArgs(attEpoch, currentDependentRoot).returns(expectedShuffling); + shufflingCacheStub.get.withArgs(attEpoch, currentDependentRoot).resolves(expectedShuffling); const resultShuffling = await getShufflingForAttestationVerification( chain, attEpoch, @@ -558,8 +559,9 @@ describe("getShufflingForAttestationVerification", () => { blockRoot, } as Partial as ProtoBlock; const expectedShuffling = {epoch: attEpoch} as EpochShuffling; - shufflingCacheStub.get.withArgs(attEpoch, blockRoot).onFirstCall().returns(null); - shufflingCacheStub.get.withArgs(attEpoch, blockRoot).onSecondCall().returns(expectedShuffling); + shufflingCacheStub.get.withArgs(attEpoch, blockRoot).onFirstCall().resolves(null); + shufflingCacheStub.get.withArgs(attEpoch, blockRoot).onSecondCall().resolves(expectedShuffling); + chain.regenStateForAttestationVerification = () => Promise.resolve(); const resultShuffling = await getShufflingForAttestationVerification( chain, attEpoch,