From a570048187f1e729c0be3939239fcfa3aa0a008e Mon Sep 17 00:00:00 2001 From: twoeths Date: Fri, 11 Oct 2024 09:37:29 +0700 Subject: [PATCH] fix: improve forkchoice (#7142) * fix: reuse deltas in computeDeltas * fix: remodel queuedAttestations * fix: call forkchoice.updateTime() once per clock slot * fix: recomputeForkChoiceHead() at slot boundary * fix: improve computeDeltas() - handle newBalances = oldBalances * fix: do not compute forkchoice head at slot boundary * fix: prepareNextSlot unit test --- .../src/api/impl/validator/index.ts | 12 ----- .../src/chain/blocks/importBlock.ts | 3 +- packages/beacon-node/src/chain/chain.ts | 12 ++--- .../beacon-node/src/chain/forkChoice/index.ts | 5 ++ packages/beacon-node/src/chain/interface.ts | 3 +- .../beacon-node/src/chain/prepareNextSlot.ts | 5 +- .../beacon-node/src/metrics/metrics/beacon.ts | 4 +- .../test/unit/chain/prepareNextSlot.test.ts | 2 +- .../fork-choice/src/forkChoice/forkChoice.ts | 51 ++++++++++++------- .../fork-choice/src/forkChoice/interface.ts | 11 ---- .../src/protoArray/computeDeltas.ts | 11 ++-- 11 files changed, 61 insertions(+), 58 deletions(-) diff --git a/packages/beacon-node/src/api/impl/validator/index.ts b/packages/beacon-node/src/api/impl/validator/index.ts index 8629bbb1f47c..e1ea415443b9 100644 --- a/packages/beacon-node/src/api/impl/validator/index.ts +++ b/packages/beacon-node/src/api/impl/validator/index.ts @@ -389,10 +389,6 @@ export function getValidatorApi( notWhileSyncing(); await waitForSlot(slot); // Must never request for a future slot > currentSlot - // Process the queued attestations in the forkchoice for correct head estimation - // forkChoice.updateTime() might have already been called by the onSlot clock - // handler, in which case this should just return. - chain.forkChoice.updateTime(slot); parentBlockRoot = fromHex(chain.getProposerHead(slot).blockRoot); } else { parentBlockRoot = inParentBlockRoot; @@ -459,10 +455,6 @@ export function getValidatorApi( notWhileSyncing(); await waitForSlot(slot); // Must never request for a future slot > currentSlot - // Process the queued attestations in the forkchoice for correct head estimation - // forkChoice.updateTime() might have already been called by the onSlot clock - // handler, in which case this should just return. - chain.forkChoice.updateTime(slot); parentBlockRoot = fromHex(chain.getProposerHead(slot).blockRoot); } else { parentBlockRoot = inParentBlockRoot; @@ -535,10 +527,6 @@ export function getValidatorApi( notWhileSyncing(); await waitForSlot(slot); // Must never request for a future slot > currentSlot - // Process the queued attestations in the forkchoice for correct head estimation - // forkChoice.updateTime() might have already been called by the onSlot clock - // handler, in which case this should just return. - chain.forkChoice.updateTime(slot); const parentBlockRoot = fromHex(chain.getProposerHead(slot).blockRoot); notOnOutOfRangeData(parentBlockRoot); diff --git a/packages/beacon-node/src/chain/blocks/importBlock.ts b/packages/beacon-node/src/chain/blocks/importBlock.ts index d19d6a60c564..596f01f391a4 100644 --- a/packages/beacon-node/src/chain/blocks/importBlock.ts +++ b/packages/beacon-node/src/chain/blocks/importBlock.ts @@ -19,6 +19,7 @@ import {ChainEvent, ReorgEventData} from "../emitter.js"; import {REPROCESS_MIN_TIME_TO_NEXT_SLOT_SEC} from "../reprocess.js"; import type {BeaconChain} from "../chain.js"; import {callInNextEventLoop} from "../../util/eventLoop.js"; +import {ForkchoiceCaller} from "../forkChoice/index.js"; import {FullyVerifiedBlock, ImportBlockOpts, AttestationImportOpt, BlockInputType} from "./types.js"; import {getCheckpointFromState} from "./utils/checkpoint.js"; import {writeBlockInputToDb} from "./writeBlockInputToDb.js"; @@ -208,7 +209,7 @@ export async function importBlock( // 5. Compute head. If new head, immediately stateCache.setHeadState() const oldHead = this.forkChoice.getHead(); - const newHead = this.recomputeForkChoiceHead(); + const newHead = this.recomputeForkChoiceHead(ForkchoiceCaller.importBlock); const currFinalizedEpoch = this.forkChoice.getFinalizedCheckpoint().epoch; if (newHead.blockRoot !== oldHead.blockRoot) { diff --git a/packages/beacon-node/src/chain/chain.ts b/packages/beacon-node/src/chain/chain.ts index 371f660abe2e..4bd8cd2bea3d 100644 --- a/packages/beacon-node/src/chain/chain.ts +++ b/packages/beacon-node/src/chain/chain.ts @@ -60,7 +60,7 @@ import { } from "./interface.js"; import {IChainOptions} from "./options.js"; import {QueuedStateRegenerator, RegenCaller} from "./regen/index.js"; -import {initializeForkChoice} from "./forkChoice/index.js"; +import {ForkchoiceCaller, initializeForkChoice} from "./forkChoice/index.js"; import {IBlsVerifier, BlsSingleThreadVerifier, BlsMultiThreadWorkerPool} from "./bls/index.js"; import { SeenAttesters, @@ -784,9 +784,9 @@ export class BeaconChain implements IBeaconChain { }; } - recomputeForkChoiceHead(): ProtoBlock { + recomputeForkChoiceHead(caller: ForkchoiceCaller): ProtoBlock { this.metrics?.forkChoice.requests.inc(); - const timer = this.metrics?.forkChoice.findHead.startTimer({entrypoint: FindHeadFnName.recomputeForkChoiceHead}); + const timer = this.metrics?.forkChoice.findHead.startTimer({caller}); try { return this.forkChoice.updateAndGetHead({mode: UpdateHeadOpt.GetCanonicialHead}).head; @@ -800,7 +800,7 @@ export class BeaconChain implements IBeaconChain { predictProposerHead(slot: Slot): ProtoBlock { this.metrics?.forkChoice.requests.inc(); - const timer = this.metrics?.forkChoice.findHead.startTimer({entrypoint: FindHeadFnName.predictProposerHead}); + const timer = this.metrics?.forkChoice.findHead.startTimer({caller: FindHeadFnName.predictProposerHead}); try { return this.forkChoice.updateAndGetHead({mode: UpdateHeadOpt.GetPredictedProposerHead, slot}).head; @@ -814,7 +814,7 @@ export class BeaconChain implements IBeaconChain { getProposerHead(slot: Slot): ProtoBlock { this.metrics?.forkChoice.requests.inc(); - const timer = this.metrics?.forkChoice.findHead.startTimer({entrypoint: FindHeadFnName.getProposerHead}); + const timer = this.metrics?.forkChoice.findHead.startTimer({caller: FindHeadFnName.getProposerHead}); const secFromSlot = this.clock.secFromSlot(slot); try { @@ -1060,8 +1060,8 @@ export class BeaconChain implements IBeaconChain { if (this.forkChoice.irrecoverableError) { this.processShutdownCallback(this.forkChoice.irrecoverableError); } - this.forkChoice.updateTime(slot); + this.forkChoice.updateTime(slot); this.metrics?.clockSlot.set(slot); this.attestationPool.prune(slot); diff --git a/packages/beacon-node/src/chain/forkChoice/index.ts b/packages/beacon-node/src/chain/forkChoice/index.ts index 346a4afe1e7f..839975de4e26 100644 --- a/packages/beacon-node/src/chain/forkChoice/index.ts +++ b/packages/beacon-node/src/chain/forkChoice/index.ts @@ -27,6 +27,11 @@ export type ForkChoiceOpts = RawForkChoiceOpts & { forkchoiceConstructor?: typeof ForkChoice; }; +export enum ForkchoiceCaller { + prepareNextSlot = "prepare_next_slot", + importBlock = "import_block", +} + /** * Fork Choice extended with a ChainEventEmitter */ diff --git a/packages/beacon-node/src/chain/interface.ts b/packages/beacon-node/src/chain/interface.ts index 531f60dc0e63..3b44ffd594ae 100644 --- a/packages/beacon-node/src/chain/interface.ts +++ b/packages/beacon-node/src/chain/interface.ts @@ -58,6 +58,7 @@ import {ShufflingCache} from "./shufflingCache.js"; import {BlockRewards} from "./rewards/blockRewards.js"; import {AttestationsRewards} from "./rewards/attestationsRewards.js"; import {SyncCommitteeRewards} from "./rewards/syncCommitteeRewards.js"; +import {ForkchoiceCaller} from "./forkChoice/index.js"; export {BlockType, type AssembledBlockType}; export {type ProposerPreparationData}; @@ -204,7 +205,7 @@ export interface IBeaconChain { getStatus(): phase0.Status; - recomputeForkChoiceHead(): ProtoBlock; + recomputeForkChoiceHead(caller: ForkchoiceCaller): ProtoBlock; /** When proposerBoostReorg is enabled, this is called at slot n-1 to predict the head block to build on if we are proposing at slot n */ predictProposerHead(slot: Slot): ProtoBlock; diff --git a/packages/beacon-node/src/chain/prepareNextSlot.ts b/packages/beacon-node/src/chain/prepareNextSlot.ts index 48724ab25b0b..bda618758842 100644 --- a/packages/beacon-node/src/chain/prepareNextSlot.ts +++ b/packages/beacon-node/src/chain/prepareNextSlot.ts @@ -18,6 +18,7 @@ import {isQueueErrorAborted} from "../util/queue/index.js"; import {prepareExecutionPayload, getPayloadAttributesForSSE} from "./produceBlock/produceBlockBody.js"; import {IBeaconChain} from "./interface.js"; import {RegenCaller} from "./regen/index.js"; +import {ForkchoiceCaller} from "./forkChoice/index.js"; /* With 12s slot times, this scheduler will run 4s before the start of each slot (`12 / 3 = 4`). */ export const SCHEDULER_LOOKAHEAD_FACTOR = 3; @@ -77,7 +78,9 @@ export class PrepareNextSlotScheduler { await sleep(slotMs - slotMs / SCHEDULER_LOOKAHEAD_FACTOR, this.signal); // calling updateHead() here before we produce a block to reduce reorg possibility - const {slot: headSlot, blockRoot: headRoot} = this.chain.recomputeForkChoiceHead(); + const {slot: headSlot, blockRoot: headRoot} = this.chain.recomputeForkChoiceHead( + ForkchoiceCaller.prepareNextSlot + ); // PS: previously this was comparing slots, but that gave no leway on the skipped // slots on epoch bounday. Making it more fluid. diff --git a/packages/beacon-node/src/metrics/metrics/beacon.ts b/packages/beacon-node/src/metrics/metrics/beacon.ts index 6e86328ee561..1737a5a2468f 100644 --- a/packages/beacon-node/src/metrics/metrics/beacon.ts +++ b/packages/beacon-node/src/metrics/metrics/beacon.ts @@ -58,11 +58,11 @@ export function createBeaconMetrics(register: RegistryMetricCreator) { // Non-spec'ed forkChoice: { - findHead: register.histogram<{entrypoint: string}>({ + findHead: register.histogram<{caller: string}>({ name: "beacon_fork_choice_find_head_seconds", help: "Time taken to find head in seconds", buckets: [0.1, 1, 10], - labelNames: ["entrypoint"], + labelNames: ["caller"], }), requests: register.gauge({ name: "beacon_fork_choice_requests_total", diff --git a/packages/beacon-node/test/unit/chain/prepareNextSlot.test.ts b/packages/beacon-node/test/unit/chain/prepareNextSlot.test.ts index 9ce121e976d0..c7d0a7801fec 100644 --- a/packages/beacon-node/test/unit/chain/prepareNextSlot.test.ts +++ b/packages/beacon-node/test/unit/chain/prepareNextSlot.test.ts @@ -98,7 +98,7 @@ describe("PrepareNextSlot scheduler", () => { scheduler.prepareForNextSlot(2 * SLOTS_PER_EPOCH - 1), vi.advanceTimersByTimeAsync((config.SECONDS_PER_SLOT * 1000 * 2) / 3), ]); - expect(chainStub.recomputeForkChoiceHead).toHaveBeenCalledWith(); + expect(chainStub.recomputeForkChoiceHead).toHaveBeenCalledOnce(); expect(regenStub.getBlockSlotState).not.toHaveBeenCalled(); }); diff --git a/packages/fork-choice/src/forkChoice/forkChoice.ts b/packages/fork-choice/src/forkChoice/forkChoice.ts index 828a16725444..6ca3fd3a183a 100644 --- a/packages/fork-choice/src/forkChoice/forkChoice.ts +++ b/packages/fork-choice/src/forkChoice/forkChoice.ts @@ -1,4 +1,4 @@ -import {Logger, fromHex, toRootHex} from "@lodestar/utils"; +import {Logger, MapDef, fromHex, toRootHex} from "@lodestar/utils"; import {SLOTS_PER_HISTORICAL_ROOT, SLOTS_PER_EPOCH, INTERVALS_PER_SLOT} from "@lodestar/params"; import {bellatrix, Slot, ValidatorIndex, phase0, ssz, RootHex, Epoch, Root, BeaconBlock} from "@lodestar/types"; import { @@ -34,7 +34,6 @@ import {ForkChoiceError, ForkChoiceErrorCode, InvalidBlockCode, InvalidAttestati import { IForkChoice, LatestMessage, - QueuedAttestation, PowBlockHex, EpochDifference, AncestorResult, @@ -91,7 +90,9 @@ export class ForkChoice implements IForkChoice { * Attestations that arrived at the current slot and must be queued for later processing. * NOT currently tracked in the protoArray */ - private readonly queuedAttestations = new Set(); + private readonly queuedAttestations: MapDef>> = new MapDef( + () => new MapDef(() => new Set()) + ); // Note: as of Jun 2022 Lodestar metrics show that 100% of the times updateHead() is called, synced = false. // Because we are processing attestations from gossip, recomputing scores is always necessary @@ -128,9 +129,13 @@ export class ForkChoice implements IForkChoice { } getMetrics(): ForkChoiceMetrics { + let numAttestations = 0; + for (const indicesByRoot of this.queuedAttestations.values()) { + numAttestations += Array.from(indicesByRoot.values()).reduce((acc, indices) => acc + indices.size, 0); + } return { votes: this.votes.length, - queuedAttestations: this.queuedAttestations.size, + queuedAttestations: numAttestations, validatedAttestationDatas: this.validatedAttestationDatas.size, balancesLength: this.balances.length, nodes: this.protoArray.nodes.length, @@ -716,12 +721,13 @@ export class ForkChoice implements IForkChoice { // Attestations can only affect the fork choice of subsequent slots. // Delay consideration in the fork choice until their slot is in the past. // ``` - this.queuedAttestations.add({ - slot: slot, - attestingIndices: attestation.attestingIndices, - blockRoot: blockRootHex, - targetEpoch, - }); + const byRoot = this.queuedAttestations.getOrDefault(slot); + const validatorIndices = byRoot.getOrDefault(blockRootHex); + for (const validatorIndex of attestation.attestingIndices) { + if (!this.fcStore.equivocatingIndices.has(validatorIndex)) { + validatorIndices.add(validatorIndex); + } + } } } @@ -751,6 +757,11 @@ export class ForkChoice implements IForkChoice { /** * Call `onTick` for all slots between `fcStore.getCurrentSlot()` and the provided `currentSlot`. + * This should only be called once per slot because: + * - calling this multiple times in the same slot does not update `votes` + * - new attestations in the current slot must stay in the queue + * - new attestations in the old slots are applied to the `votes` already + * - also side effect of this function is `validatedAttestationDatas` reseted */ updateTime(currentSlot: Slot): void { if (this.fcStore.currentSlot >= currentSlot) return; @@ -1352,15 +1363,19 @@ export class ForkChoice implements IForkChoice { */ private processAttestationQueue(): void { const currentSlot = this.fcStore.currentSlot; - for (const attestation of this.queuedAttestations.values()) { - // Delay consideration in the fork choice until their slot is in the past. - if (attestation.slot < currentSlot) { - this.queuedAttestations.delete(attestation); - const {blockRoot, targetEpoch} = attestation; - const blockRootHex = blockRoot; - for (const validatorIndex of attestation.attestingIndices) { - this.addLatestMessage(validatorIndex, targetEpoch, blockRootHex); + for (const [slot, byRoot] of this.queuedAttestations.entries()) { + const targetEpoch = computeEpochAtSlot(slot); + if (slot < currentSlot) { + this.queuedAttestations.delete(slot); + for (const [blockRoot, validatorIndices] of byRoot.entries()) { + const blockRootHex = blockRoot; + for (const validatorIndex of validatorIndices) { + // equivocatingIndices was checked in onAttestation + this.addLatestMessage(validatorIndex, targetEpoch, blockRootHex); + } } + } else { + break; } } } diff --git a/packages/fork-choice/src/forkChoice/interface.ts b/packages/fork-choice/src/forkChoice/interface.ts index 0b6d56a88bf2..9ac8cdfac81b 100644 --- a/packages/fork-choice/src/forkChoice/interface.ts +++ b/packages/fork-choice/src/forkChoice/interface.ts @@ -250,14 +250,3 @@ export type LatestMessage = { epoch: Epoch; root: RootHex; }; - -/** - * Used for queuing attestations from the current slot. Only contains the minimum necessary - * information about the attestation. - */ -export type QueuedAttestation = { - slot: Slot; - attestingIndices: ValidatorIndex[]; - blockRoot: RootHex; - targetEpoch: Epoch; -}; diff --git a/packages/fork-choice/src/protoArray/computeDeltas.ts b/packages/fork-choice/src/protoArray/computeDeltas.ts index 8301c1e77a75..2b87deb098da 100644 --- a/packages/fork-choice/src/protoArray/computeDeltas.ts +++ b/packages/fork-choice/src/protoArray/computeDeltas.ts @@ -3,6 +3,9 @@ import {EffectiveBalanceIncrements} from "@lodestar/state-transition"; import {VoteTracker} from "./interface.js"; import {ProtoArrayError, ProtoArrayErrorCode} from "./errors.js"; +// reuse arrays to avoid memory reallocation and gc +const deltas = new Array(); + /** * Returns a list of `deltas`, where there is one delta for each of the indices in `indices` * @@ -19,10 +22,8 @@ export function computeDeltas( newBalances: EffectiveBalanceIncrements, equivocatingIndices: Set ): number[] { - const deltas = new Array(numProtoNodes); - for (let i = 0; i < numProtoNodes; i++) { - deltas[i] = 0; - } + deltas.length = numProtoNodes; + deltas.fill(0); // avoid creating new variables in the loop to potentially reduce GC pressure let oldBalance, newBalance: number; @@ -47,7 +48,7 @@ export function computeDeltas( // It is possible that there was a vote for an unknown validator if we change our justified // state to a new state with a higher epoch that is on a different fork because that fork may have // on-boarded fewer validators than the prior fork. - newBalance = newBalances[vIndex] ?? 0; + newBalance = newBalances === oldBalances ? oldBalance : newBalances[vIndex] ?? 0; if (equivocatingIndices.size > 0 && equivocatingIndices.has(vIndex)) { // this function could be called multiple times but we only want to process slashing validator for 1 time