Skip to content

Commit

Permalink
feat: support promise in ShufflingCache
Browse files Browse the repository at this point in the history
  • Loading branch information
twoeths committed Oct 11, 2023
1 parent 95a885a commit cdfb908
Show file tree
Hide file tree
Showing 8 changed files with 289 additions and 72 deletions.
43 changes: 43 additions & 0 deletions packages/beacon-node/src/chain/chain.ts
Original file line number Diff line number Diff line change
Expand Up @@ -633,6 +633,49 @@ export class BeaconChain implements IBeaconChain {
}
}

/**
 * Regenerate the state needed to verify an attestation and feed its shuffling into the shuffling cache.
 *
 * This does not happen with the default chain option of maxSkipSlots = 32, however we need to handle it
 * just in case. Lodestar doesn't support multiple regen state requests for attestation verification at
 * the same time; that bound is enforced inside "ShufflingCache.insertPromise()", which throws when exceeded.
 * This function lives in chain instead of the attestation verification code to make sure we're aware of
 * its performance impact.
 *
 * Throws if the head block epoch is after the attestation epoch, if `insertPromise()` refuses the
 * request, or if regen fails.
 */
async regenStateForAttestationVerification(
  attEpoch: Epoch,
  shufflingDependentRoot: RootHex,
  attHeadBlock: ProtoBlock,
  regenCaller: RegenCaller
): Promise<void> {
  const blockEpoch = computeEpochAtSlot(attHeadBlock.slot);

  // Validate BEFORE registering the promise: a promise inserted for a request that then throws here
  // would never be resolved, leaving later `shufflingCache.get()` callers of the same key waiting on it
  // and keeping the promise slot occupied.
  if (blockEpoch > attEpoch) {
    // should not happen, handled inside attestation verification code
    throw Error(`Block epoch ${blockEpoch} is after attestation epoch ${attEpoch}`);
  }

  // this is to prevent multiple calls to get shuffling for the same epoch and dependent root
  // any subsequent calls of the same epoch and dependent root will wait for this promise to resolve
  // NOTE(review): if regen rejects below, this promise stays pending; the cache API used here only
  // offers processState() to settle it, so handling that case needs a change in ShufflingCache.
  this.shufflingCache.insertPromise(attEpoch, shufflingDependentRoot);

  let state: CachedBeaconStateAllForks;
  if (blockEpoch < attEpoch - 1) {
    // thanks to one epoch look ahead, we don't need to dial up to attEpoch
    const targetSlot = computeStartSlotAtEpoch(attEpoch - 1);
    this.metrics?.gossipAttestation.useHeadBlockStateDialedToTargetEpoch.inc({caller: regenCaller});
    state = await this.regen.getBlockSlotState(
      attHeadBlock.blockRoot,
      targetSlot,
      {dontTransferCache: true},
      regenCaller
    );
  } else {
    // should use either current or next shuffling of head state
    // it's not likely to hit this since these shufflings are cached already
    // so handle just in case
    this.metrics?.gossipAttestation.useHeadBlockState.inc({caller: regenCaller});
    state = await this.regen.getState(attHeadBlock.stateRoot, regenCaller);
  }

  // resolve the promise to unblock other calls of the same epoch and dependent root
  this.shufflingCache.processState(state, attEpoch);
}

/**
* `ForkChoice.onBlock` must never throw for a block that is valid with respect to the network
* `justifiedBalancesGetter()` must never throw and it should always return a state.
Expand Down
6 changes: 6 additions & 0 deletions packages/beacon-node/src/chain/interface.ts
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,12 @@ export interface IBeaconChain {
persistInvalidSszBytes(type: string, sszBytes: Uint8Array, suffix?: string): void;
/** Persist bad items to persistInvalidSszObjectsDir dir, for example invalid state, attestations etc. */
persistInvalidSszView(view: TreeView<CompositeTypeAny>, suffix?: string): void;
/**
 * Regenerate the state required to verify an attestation of `attEpoch` whose head block is `attHeadBlock`.
 * Returns no value; in the BeaconChain implementation the resulting shuffling is handed to the shuffling
 * cache, so callers read it from there afterwards. `regenCaller` is forwarded to regen and used as a metrics label.
 */
regenStateForAttestationVerification(
attEpoch: Epoch,
shufflingDependentRoot: RootHex,
attHeadBlock: ProtoBlock,
regenCaller: RegenCaller
): Promise<void>;
updateBuilderStatus(clockSlot: Slot): void;

regenCanAcceptWork(): boolean;
Expand Down
149 changes: 129 additions & 20 deletions packages/beacon-node/src/chain/shufflingCache.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,52 +10,161 @@ import {Metrics} from "../metrics/metrics.js";
**/
const MAX_SHUFFLING_CACHE_SIZE = 4;

/**
 * Maximum number of pending shuffling promises allowed in the cache at the same time.
 * With the default chain option of maxSkipSlots = 32 there should be no shuffling promise at all.
 * If promises were created a lot it could blow up Lodestar, so with MAX_SHUFFLING_CACHE_SIZE = 4
 * we only allow 1 promise at a time.
 */
const MAX_SHUFFLING_PROMISE = 1;

/** Discriminator tag for the two kinds of items stored in ShufflingCache. */
enum CacheItemType {
// the item holds a computed EpochShuffling
shuffling,
// the item holds a promise of an EpochShuffling that is still being computed
promise,
}

/** Cache entry holding an already available shuffling, keyed by (epoch, decisionBlockHex). */
type ShufflingCacheItem = {
type: CacheItemType.shuffling;
decisionBlockHex: RootHex;
epoch: Epoch;
shuffling: EpochShuffling;
};

/**
 * Cache entry for a shuffling that is being computed, keyed by (epoch, decisionBlockHex).
 * `promise` is what `get()` hands to waiting callers; `resolveFn` is called by `processState()`
 * once the shuffling is known.
 */
type PromiseCacheItem = {
type: CacheItemType.promise;
decisionBlockHex: RootHex;
epoch: Epoch;
promise: Promise<EpochShuffling>;
resolveFn: (shuffling: EpochShuffling) => void;
};

/** Any entry of the shuffling cache, discriminated by its `type` field. */
type CacheItem = ShufflingCacheItem | PromiseCacheItem;

/**
* A shuffling cache to help:
* - get committee quickly for attestation verification
* - if a shuffling is not available (which does not happen with default chain option of maxSkipSlots = 32), track a promise to make sure we don't compute the same shuffling twice
* - skip computing shuffling when loading state bytes from disk
*/
export class ShufflingCache {
private readonly items: ShufflingCacheItem[] = [];
/** LRU cache implemented as an array, pruned every time we add an item */
private readonly items: CacheItem[] = [];

constructor(metrics: Metrics | null = null) {
/**
 * `metrics` is optional; when provided, a collect callback reports the current number of cached items.
 * `maxSize` is the maximum number of items kept in the cache (enforced when adding), it defaults to
 * MAX_SHUFFLING_CACHE_SIZE.
 */
constructor(
private readonly metrics: Metrics | null = null,
private maxSize = MAX_SHUFFLING_CACHE_SIZE
) {
if (metrics) {
// the size gauge is refreshed lazily, only when metrics are collected
metrics.shufflingCache.size.addCollect(() => metrics.shufflingCache.size.set(this.items.length));
}
}

/**
* Extract shuffling from state and add to cache
*/
processState(state: CachedBeaconStateAllForks, shufflingEpoch: Epoch): void {
const decisionBlockHex = getShufflingDecisionBlock(state, shufflingEpoch);
const index = this.items.findIndex(
(item) => item.shuffling.epoch === shufflingEpoch && item.decisionBlockHex === decisionBlockHex
);
if (index === -1) {
if (this.items.length === MAX_SHUFFLING_CACHE_SIZE) {
this.items.shift();
}
let shuffling: EpochShuffling;
if (shufflingEpoch === state.epochCtx.nextShuffling.epoch) {
let shuffling: EpochShuffling;
switch (shufflingEpoch) {
case state.epochCtx.nextShuffling.epoch:
shuffling = state.epochCtx.nextShuffling;
} else if (shufflingEpoch === state.epochCtx.currentShuffling.epoch) {
break;
case state.epochCtx.currentShuffling.epoch:
shuffling = state.epochCtx.currentShuffling;
} else if (shufflingEpoch === state.epochCtx.previousShuffling.epoch) {
break;
case state.epochCtx.previousShuffling.epoch:
shuffling = state.epochCtx.previousShuffling;
} else {
break;
default:
throw new Error(`Shuffling not found from state ${state.slot} for epoch ${shufflingEpoch}`);
}

let found = false;
for (const item of this.items) {
if (item.epoch === shufflingEpoch && item.decisionBlockHex === decisionBlockHex) {
found = true;
if (isPromiseCacheItem(item)) {
// unblock consumers of this promise
item.resolveFn(shuffling);
// then update item type to shuffling
Object.assign(item, {type: CacheItemType.shuffling, shuffling});
// TODO: remove promise and resolveFn?
// we updated type to CacheItemType.shuffling so the above fields are not used anyway
this.metrics?.shufflingCache.processStateUpdatePromise.inc();
} else {
// ShufflingCacheItem, do nothing
this.metrics?.shufflingCache.processStateNoOp.inc();
}
break;
}
this.items.push({decisionBlockHex, shuffling});
}

if (!found) {
this.add({type: CacheItemType.shuffling, epoch: shufflingEpoch, decisionBlockHex, shuffling});
this.metrics?.shufflingCache.processStateInsertNew.inc();
}
}

get(shufflingEpoch: Epoch, dependentRootHex: RootHex): EpochShuffling | null {
return (
this.items.find((item) => item.shuffling.epoch === shufflingEpoch && item.decisionBlockHex === dependentRootHex)
?.shuffling ?? null
);
/**
 * Register a pending shuffling for (shufflingEpoch, dependentRootHex) so that concurrent callers wait on
 * the same promise instead of regenerating the same state again.
 * Throws when the number of pending promises already in the cache reaches MAX_SHUFFLING_PROMISE,
 * to make sure our node does not blow up.
 */
insertPromise(shufflingEpoch: Epoch, dependentRootHex: RootHex): void {
  // count the promises currently tracked by the cache
  const pendingCount = this.items.reduce((count, entry) => (isPromiseCacheItem(entry) ? count + 1 : count), 0);
  if (pendingCount >= MAX_SHUFFLING_PROMISE) {
    throw new Error(
      `Too many shuffling promises: ${pendingCount}, shufflingEpoch: ${shufflingEpoch}, dependentRootHex: ${dependentRootHex}`
    );
  }

  // capture the resolver so processState() can settle the promise later
  let resolveShuffling: ((shuffling: EpochShuffling) => void) | null = null;
  const pendingShuffling = new Promise<EpochShuffling>((resolve) => {
    resolveShuffling = resolve;
  });
  if (resolveShuffling === null) {
    throw new Error("Promise Constructor was not executed immediately");
  }

  const pendingItem: PromiseCacheItem = {
    type: CacheItemType.promise,
    epoch: shufflingEpoch,
    decisionBlockHex: dependentRootHex,
    promise: pendingShuffling,
    resolveFn: resolveShuffling,
  };
  this.add(pendingItem);
  this.metrics?.shufflingCache.insertPromiseCount.inc();
}

/**
 * Look up the shuffling of (shufflingEpoch, dependentRootHex).
 * - a cached shuffling is returned right away, this is the common case
 * - if the matching item is a promise, the same shuffling is being computed, so the caller waits for it
 * - null means the cache has no item for this epoch and dependentRootHex
 */
async get(shufflingEpoch: Epoch, dependentRootHex: RootHex): Promise<EpochShuffling | null> {
  // first match wins, same as scanning the array from the oldest item
  const match = this.items.find(
    (candidate) => candidate.epoch === shufflingEpoch && candidate.decisionBlockHex === dependentRootHex
  );

  if (match === undefined) {
    // not found
    return null;
  }

  return isShufflingCacheItem(match) ? match.shuffling : match.promise;
}

/**
 * Append an item to the cache, evicting one item first when the cache is full.
 *
 * The oldest item that is NOT a pending promise is evicted in preference: a pending promise that is
 * dropped from `items` can no longer be found and resolved by `processState()`, so every caller already
 * awaiting it through `get()` would wait forever. Only when every item is a pending promise does this
 * fall back to dropping the oldest item.
 */
private add(cacheItem: CacheItem): void {
  if (this.items.length >= this.maxSize) {
    const evictIndex = this.items.findIndex((item) => !isPromiseCacheItem(item));
    if (evictIndex === -1) {
      this.items.shift();
    } else {
      this.items.splice(evictIndex, 1);
    }
  }
  this.items.push(cacheItem);
}
}

/** Type guard: true when the cache item is tagged as holding a computed shuffling. */
function isShufflingCacheItem(item: CacheItem): item is ShufflingCacheItem {
  const {type: tag} = item;
  return tag === CacheItemType.shuffling;
}

/** Type guard: true when the cache item is tagged as a promise of a shuffling. */
function isPromiseCacheItem(item: CacheItem): item is PromiseCacheItem {
  const {type: tag} = item;
  return tag === CacheItemType.promise;
}
89 changes: 41 additions & 48 deletions packages/beacon-node/src/chain/validation/attestation.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import {EpochDifference, ProtoBlock} from "@lodestar/fork-choice";
import {ATTESTATION_SUBNET_COUNT, SLOTS_PER_EPOCH, ForkName, ForkSeq, DOMAIN_BEACON_ATTESTER} from "@lodestar/params";
import {
computeEpochAtSlot,
CachedBeaconStateAllForks,
createSingleSignatureSetFromComponents,
SingleSignatureSet,
EpochCacheError,
Expand Down Expand Up @@ -590,6 +589,46 @@ export async function getShufflingForAttestationVerification(
regenCaller: RegenCaller
): Promise<EpochShuffling | null> {
const blockEpoch = computeEpochAtSlot(attHeadBlock.slot);
const shufflingDependentRoot = getShufflingDependentRoot(chain, attEpoch, blockEpoch, attHeadBlock);

let shuffling = await chain.shufflingCache.get(attEpoch, shufflingDependentRoot);
if (shuffling) {
// most of the time, we should get the shuffling from cache
chain.metrics?.gossipAttestation.shufflingHit.inc({caller: regenCaller});
return shuffling;
}

chain.metrics?.gossipAttestation.shufflingMiss.inc({caller: regenCaller});
try {
// for the 1st time of the same epoch and dependent root, it awaits for the regen state
// from the 2nd time, it should use the same cached promise and it should reach the above code
await chain.regenStateForAttestationVerification(attEpoch, shufflingDependentRoot, attHeadBlock, regenCaller);
} catch (e) {
throw new AttestationError(GossipAction.IGNORE, {
code: AttestationErrorCode.MISSING_STATE_TO_VERIFY_ATTESTATION,
error: e as Error,
});
}

shuffling = await chain.shufflingCache.get(attEpoch, shufflingDependentRoot);
if (shuffling) {
chain.metrics?.gossipAttestation.shufflingRegenHit.inc({caller: regenCaller});
return shuffling;
} else {
chain.metrics?.gossipAttestation.shufflingRegenMiss.inc({caller: regenCaller});
return null;
}
}

/**
* Get dependent root of a shuffling given attestation epoch and head block.
*/
export function getShufflingDependentRoot(
chain: IBeaconChain,
attEpoch: Epoch,
blockEpoch: Epoch,
attHeadBlock: ProtoBlock
): RootHex {
let shufflingDependentRoot: RootHex;
if (blockEpoch === attEpoch) {
// current shuffling, this is equivalent to `headState.currentShuffling`
Expand Down Expand Up @@ -623,53 +662,7 @@ export async function getShufflingForAttestationVerification(
throw Error(`attestation epoch ${attEpoch} is before head block epoch ${blockEpoch}`);
}

let shuffling = chain.shufflingCache.get(attEpoch, shufflingDependentRoot);
if (shuffling) {
// most of the time, we should get the shuffling from cache
chain.metrics?.gossipAttestation.shufflingHit.inc({caller: regenCaller});
return shuffling;
}
chain.metrics?.gossipAttestation.shufflingMiss.inc({caller: regenCaller});

let state: CachedBeaconStateAllForks;
try {
if (blockEpoch < attEpoch - 1) {
// thanks to one epoch look ahead, we don't need to dial up to attEpoch
const targetSlot = computeStartSlotAtEpoch(attEpoch - 1);
chain.metrics?.gossipAttestation.useHeadBlockStateDialedToTargetEpoch.inc({caller: regenCaller});
state = await chain.regen.getBlockSlotState(
attHeadBlock.blockRoot,
targetSlot,
{dontTransferCache: true},
regenCaller
);
} else if (blockEpoch > attEpoch) {
// should not happen, handled above
throw Error(`Block epoch ${blockEpoch} is after attestation epoch ${attEpoch}`);
} else {
// should use either current or next shuffling of head state
// it's not likely to hit this since these shufflings are cached already
// so handle just in case
chain.metrics?.gossipAttestation.useHeadBlockState.inc({caller: regenCaller});
state = await chain.regen.getState(attHeadBlock.stateRoot, regenCaller);
}
} catch (e) {
throw new AttestationError(GossipAction.IGNORE, {
code: AttestationErrorCode.MISSING_STATE_TO_VERIFY_ATTESTATION,
error: e as Error,
});
}

// add to cache
chain.shufflingCache.processState(state, attEpoch);
shuffling = chain.shufflingCache.get(attEpoch, shufflingDependentRoot);
if (shuffling) {
chain.metrics?.gossipAttestation.shufflingRegenHit.inc({caller: regenCaller});
return shuffling;
} else {
chain.metrics?.gossipAttestation.shufflingRegenMiss.inc({caller: regenCaller});
return null;
}
return shufflingDependentRoot;
}

/**
Expand Down
16 changes: 16 additions & 0 deletions packages/beacon-node/src/metrics/metrics/lodestar.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1084,6 +1084,22 @@ export function createLodestarMetrics(
name: "lodestar_shuffling_cache_size",
help: "Shuffling cache size",
}),
processStateInsertNew: register.gauge({
name: "lodestar_shuffling_cache_process_state_insert_new_total",
help: "Total number of times processState is called resulting a new shuffling",
}),
processStateUpdatePromise: register.gauge({
name: "lodestar_shuffling_cache_process_state_update_promise_total",
help: "Total number of times processState is called resulting a promise being updated with shuffling",
}),
processStateNoOp: register.gauge({
name: "lodestar_shuffling_cache_process_state_no_op_total",
help: "Total number of times processState is called resulting no changes",
}),
insertPromiseCount: register.gauge({
name: "lodestar_shuffling_cache_insert_promise_count",
help: "Total number of times insertPromise is called",
}),
},

seenCache: {
Expand Down
Loading

0 comments on commit cdfb908

Please sign in to comment.