diff --git a/packages/beacon-node/src/chain/opPools/aggregatedAttestationPool.ts b/packages/beacon-node/src/chain/opPools/aggregatedAttestationPool.ts index 00309d322a11..db7922758732 100644 --- a/packages/beacon-node/src/chain/opPools/aggregatedAttestationPool.ts +++ b/packages/beacon-node/src/chain/opPools/aggregatedAttestationPool.ts @@ -1,12 +1,6 @@ import bls from "@chainsafe/bls"; import {toHexString} from "@chainsafe/ssz"; -import { - ForkName, - MAX_ATTESTATIONS, - MIN_ATTESTATION_INCLUSION_DELAY, - SLOTS_PER_EPOCH, - TIMELY_SOURCE_FLAG_INDEX, -} from "@lodestar/params"; +import {ForkName, MAX_ATTESTATIONS, MIN_ATTESTATION_INCLUSION_DELAY, SLOTS_PER_EPOCH} from "@lodestar/params"; import {phase0, Epoch, Slot, ssz, ValidatorIndex, RootHex} from "@lodestar/types"; import { CachedBeaconStateAllForks, @@ -24,9 +18,17 @@ import {InsertOutcome} from "./types.js"; type DataRootHex = string; +type CommitteeIndex = number; + type AttestationWithScore = {attestation: phase0.Attestation; score: number}; -type GetParticipationFn = (epoch: Epoch, committee: number[]) => Set | null; +/** + * This function returns not seen participation for a given epoch and committee. + * Return null if all validators are seen or no info to check. + */ +type GetNotSeenValidatorsFn = (epoch: Epoch, committee: number[]) => Set | null; + +type ValidateAttestationDataFn = (attData: phase0.AttestationData) => boolean; /** * Limit the max attestations with the same AttestationData. @@ -37,13 +39,13 @@ type GetParticipationFn = (epoch: Epoch, committee: number[]) => Set | n const MAX_RETAINED_ATTESTATIONS_PER_GROUP = 4; /** - * On mainnet, each slot has 64 committees, and each block has 128 attestations max so we don't - * want to store more than 2 per group. + * On mainnet, each slot has 64 committees, and each block has 128 attestations max so in average + * we get 2 attestation per groups. + * Starting from Jan 2024, we have a performance issue getting attestations for a block. Based on the + * fact that lot of groups will have only 1 attestation since it's full of participation increase this number + * a bit higher than average. This also help decrease number of slots to search for attestations. */ -const MAX_ATTESTATIONS_PER_GROUP = 2; - -/** Same to https://github.com/ethereum/eth2.0-specs/blob/v1.1.0-alpha.5/specs/altair/beacon-chain.md#has_flag */ -const TIMELY_SOURCE = 1 << TIMELY_SOURCE_FLAG_INDEX; +const MAX_ATTESTATIONS_PER_GROUP = 3; /** * Maintain a pool of aggregated attestations. Attestations can be retrieved for inclusion in a block @@ -52,19 +54,22 @@ const TIMELY_SOURCE = 1 << TIMELY_SOURCE_FLAG_INDEX; * Note that we want to remove attestations with attesters that were included in the chain. */ export class AggregatedAttestationPool { - private readonly attestationGroupByDataHashBySlot = new MapDef>( - () => new Map() - ); + private readonly attestationGroupByDataHashByIndexBySlot = new MapDef< + Slot, + Map> + >(() => new Map>()); private lowestPermissibleSlot = 0; /** For metrics to track size of the pool */ getAttestationCount(): {attestationCount: number; attestationDataCount: number} { let attestationCount = 0; let attestationDataCount = 0; - for (const attestationGroupByData of this.attestationGroupByDataHashBySlot.values()) { - attestationDataCount += attestationGroupByData.size; - for (const attestationGroup of attestationGroupByData.values()) { - attestationCount += attestationGroup.getAttestationCount(); + for (const attestationGroupByDataByIndex of this.attestationGroupByDataHashByIndexBySlot.values()) { + for (const attestationGroupByData of attestationGroupByDataByIndex.values()) { + attestationDataCount += attestationGroupByData.size; + for (const attestationGroup of attestationGroupByData.values()) { + attestationCount += attestationGroup.getAttestationCount(); + } } } return {attestationCount, attestationDataCount}; @@ -84,7 +89,12 @@ export class AggregatedAttestationPool { return InsertOutcome.Old; } - const attestationGroupByDataHash = this.attestationGroupByDataHashBySlot.getOrDefault(slot); + const attestationGroupByDataHashByIndex = this.attestationGroupByDataHashByIndexBySlot.getOrDefault(slot); + let attestationGroupByDataHash = attestationGroupByDataHashByIndex.get(attestation.data.index); + if (!attestationGroupByDataHash) { + attestationGroupByDataHash = new Map(); + attestationGroupByDataHashByIndex.set(attestation.data.index, attestationGroupByDataHash); + } let attestationGroup = attestationGroupByDataHash.get(dataRootHex); if (!attestationGroup) { attestationGroup = new MatchingDataAttestationGroup(committee, attestation.data); @@ -100,7 +110,7 @@ export class AggregatedAttestationPool { /** Remove attestations which are too old to be included in a block. */ prune(clockSlot: Slot): void { // Only retain SLOTS_PER_EPOCH slots - pruneBySlot(this.attestationGroupByDataHashBySlot, clockSlot, SLOTS_PER_EPOCH); + pruneBySlot(this.attestationGroupByDataHashByIndexBySlot, clockSlot, SLOTS_PER_EPOCH); this.lowestPermissibleSlot = Math.max(clockSlot - SLOTS_PER_EPOCH, 0); } @@ -112,15 +122,19 @@ export class AggregatedAttestationPool { const stateEpoch = state.epochCtx.epoch; const statePrevEpoch = stateEpoch - 1; - const getParticipation = getParticipationFn(state); + const notSeenValidatorsFn = getNotSeenValidatorsFn(state); + const validateAttestationDataFn = getValidateAttestationDataFn(forkChoice, state); const attestationsByScore: AttestationWithScore[] = []; - const slots = Array.from(this.attestationGroupByDataHashBySlot.keys()).sort((a, b) => b - a); + const slots = Array.from(this.attestationGroupByDataHashByIndexBySlot.keys()).sort((a, b) => b - a); + let minScore = Number.MAX_SAFE_INTEGER; + let slotCount = 0; slot: for (const slot of slots) { - const attestationGroupByDataHash = this.attestationGroupByDataHashBySlot.get(slot); + slotCount++; + const attestationGroupByDataHashByIndex = this.attestationGroupByDataHashByIndexBySlot.get(slot); // should not happen - if (!attestationGroupByDataHash) { + if (!attestationGroupByDataHashByIndex) { throw Error(`No aggregated attestation pool for slot=${slot}`); } @@ -134,34 +148,64 @@ export class AggregatedAttestationPool { continue; // Invalid attestations } - const attestationGroups = Array.from(attestationGroupByDataHash.values()); - for (const attestationGroup of attestationGroups) { - if (!isValidAttestationData(forkChoice, state, attestationGroup.data)) { + const slotDelta = stateSlot - slot; + const shuffling = state.epochCtx.getShufflingAtEpoch(epoch); + const slotCommittees = shuffling.committees[slot % SLOTS_PER_EPOCH]; + for (const [committeeIndex, attestationGroupByData] of attestationGroupByDataHashByIndex.entries()) { + // all attestations will be validated against the state in next step so we can get committee from the state + // this is an improvement to save the notSeenValidatorsFn call for the same slot/index instead of the same attestation data + if (committeeIndex > slotCommittees.length) { + // invalid index, should not happen continue; } - const participation = getParticipation(epoch, attestationGroup.committee); - if (participation === null) { + + const committee = slotCommittees[committeeIndex]; + const notSeenAttestingIndices = notSeenValidatorsFn(epoch, committee); + if (notSeenAttestingIndices === null || notSeenAttestingIndices.size === 0) { + continue; + } + + if ( + slotCount > 2 && + attestationsByScore.length >= MAX_ATTESTATIONS && + notSeenAttestingIndices.size / slotDelta < minScore + ) { + // after 2 slots, there are a good chance that we have 2 * MAX_ATTESTATIONS attestations and break the for loop early + // if not, we may have to scan all slots in the pool + // if we have enough attestations and the max possible score is lower than scores of `attestationsByScore`, we should skip + // otherwise it takes time to check attestation, add it and remove it later after the sort by score continue; } - // TODO: Is it necessary to validateAttestation for: - // - Attestation committee index not within current committee count - // - Attestation aggregation bits length does not match committee length - // - // These properties should not change after being validate in gossip - // IF they have to be validated, do it only with one attestation per group since same data - // The committeeCountPerSlot can be precomputed once per slot - - attestationsByScore.push( - ...attestationGroup.getAttestationsForBlock(participation).map((attestation) => ({ - attestation: attestation.attestation, - score: attestation.notSeenAttesterCount / (stateSlot - slot), - })) - ); - - // Stop accumulating attestations there are enough that may have good scoring - if (attestationsByScore.length > MAX_ATTESTATIONS * 2) { - break slot; + for (const attestationGroup of attestationGroupByData.values()) { + if (!validateAttestationDataFn(attestationGroup.data)) { + continue; + } + + // TODO: Is it necessary to validateAttestation for: + // - Attestation committee index not within current committee count + // - Attestation aggregation bits length does not match committee length + // + // These properties should not change after being validate in gossip + // IF they have to be validated, do it only with one attestation per group since same data + // The committeeCountPerSlot can be precomputed once per slot + for (const {attestation, notSeenAttesterCount} of attestationGroup.getAttestationsForBlock( + notSeenAttestingIndices + )) { + const score = notSeenAttesterCount / slotDelta; + if (score < minScore) { + minScore = score; + } + attestationsByScore.push({ + attestation, + score, + }); + } + + // Stop accumulating attestations there are enough that may have good scoring + if (attestationsByScore.length >= MAX_ATTESTATIONS * 2) { + break slot; + } } } } @@ -183,13 +227,15 @@ export class AggregatedAttestationPool { * @param bySlot slot to filter, `bySlot === attestation.data.slot` */ getAll(bySlot?: Slot): phase0.Attestation[] { - let attestationGroupsArr: Map[]; + let attestationGroupsArr: Map[]; if (bySlot === undefined) { - attestationGroupsArr = Array.from(this.attestationGroupByDataHashBySlot.values()); + attestationGroupsArr = Array.from(this.attestationGroupByDataHashByIndexBySlot.values()).flatMap((byIndex) => + Array.from(byIndex.values()) + ); } else { - const attestationGroups = this.attestationGroupByDataHashBySlot.get(bySlot); - if (!attestationGroups) throw Error(`No attestations for slot ${bySlot}`); - attestationGroupsArr = [attestationGroups]; + const attestationGroupsByIndex = this.attestationGroupByDataHashByIndexBySlot.get(bySlot); + if (!attestationGroupsByIndex) throw Error(`No attestations for slot ${bySlot}`); + attestationGroupsArr = Array.from(attestationGroupsByIndex.values()); } const attestations: phase0.Attestation[] = []; @@ -224,6 +270,7 @@ export class MatchingDataAttestationGroup { private readonly attestations: AttestationWithIndex[] = []; constructor( + // TODO: no need committee here readonly committee: ValidatorIndex[], readonly data: phase0.AttestationData ) {} @@ -284,24 +331,18 @@ export class MatchingDataAttestationGroup { return InsertOutcome.NewData; } - getAttestationsForBlock(seenAttestingIndices: Set): AttestationNonParticipant[] { + /** + * Get AttestationNonParticipant for this groups of same attestation data. + * @param notSeenAttestingIndices not seen attestting indices, i.e. indices in the same committee + * @returns an array of AttestationNonParticipant + */ + getAttestationsForBlock(notSeenAttestingIndices: Set): AttestationNonParticipant[] { const attestations: AttestationNonParticipant[] = []; - - const committeeLen = this.committee.length; - const committeeSeenAttesting = new Array(committeeLen); - - // Intersect committee with participation only once for all attestations - for (let i = 0; i < committeeLen; i++) { - committeeSeenAttesting[i] = seenAttestingIndices.has(this.committee[i]); - } - for (const {attestation} of this.attestations) { - const {aggregationBits} = attestation; let notSeenAttesterCount = 0; - - for (let i = 0; i < committeeLen; i++) { - // TODO: Optimize aggregationBits.get() in bulk for the entire BitArray - if (!committeeSeenAttesting[i] && aggregationBits.get(i)) { + const {aggregationBits} = attestation; + for (const notSeenIndex of notSeenAttestingIndices) { + if (aggregationBits.get(notSeenIndex)) { notSeenAttesterCount++; } } @@ -311,9 +352,13 @@ export class MatchingDataAttestationGroup { } } - return attestations - .sort((a, b) => b.notSeenAttesterCount - a.notSeenAttesterCount) - .slice(0, MAX_ATTESTATIONS_PER_GROUP); + if (attestations.length <= MAX_ATTESTATIONS_PER_GROUP) { + return attestations; + } else { + return attestations + .sort((a, b) => b.notSeenAttesterCount - a.notSeenAttesterCount) + .slice(0, MAX_ATTESTATIONS_PER_GROUP); + } } /** Get attestations for API. */ @@ -335,7 +380,7 @@ export function aggregateInto(attestation1: AttestationWithIndex, attestation2: * Pre-compute participation from a CachedBeaconStateAllForks, for use to check if an attestation's committee * has already attested or not. */ -export function getParticipationFn(state: CachedBeaconStateAllForks): GetParticipationFn { +export function getNotSeenValidatorsFn(state: CachedBeaconStateAllForks): GetNotSeenValidatorsFn { if (state.config.getForkName(state.slot) === ForkName.phase0) { // Get attestations to be included in a phase0 block. // As we are close to altair, this is not really important, it's mainly for e2e. @@ -344,18 +389,29 @@ export function getParticipationFn(state: CachedBeaconStateAllForks): GetPartici const phase0State = state as CachedBeaconStatePhase0; const stateEpoch = computeEpochAtSlot(state.slot); - const previousEpochParticipants = extractParticipation( + const previousEpochParticipants = extractParticipationPhase0( phase0State.previousEpochAttestations.getAllReadonly(), state ); - const currentEpochParticipants = extractParticipation(phase0State.currentEpochAttestations.getAllReadonly(), state); - - return (epoch: Epoch) => { - return epoch === stateEpoch - ? currentEpochParticipants - : epoch === stateEpoch - 1 - ? previousEpochParticipants - : null; + const currentEpochParticipants = extractParticipationPhase0( + phase0State.currentEpochAttestations.getAllReadonly(), + state + ); + + return (epoch: Epoch, committee: number[]) => { + const participants = + epoch === stateEpoch ? currentEpochParticipants : epoch === stateEpoch - 1 ? previousEpochParticipants : null; + if (participants === null) { + return null; + } + + const notSeenAttestingIndices = new Set(); + for (const [i, validatorIndex] of committee.entries()) { + if (!participants.has(validatorIndex)) { + notSeenAttestingIndices.add(i); + } + } + return notSeenAttestingIndices.size === 0 ? null : notSeenAttestingIndices; }; } @@ -374,20 +430,24 @@ export function getParticipationFn(state: CachedBeaconStateAllForks): GetPartici const participationStatus = epoch === stateEpoch ? currentParticipation : epoch === stateEpoch - 1 ? previousParticipation : null; - if (participationStatus === null) return null; + if (participationStatus === null) { + return null; + } - const seenValidatorIndices = new Set(); - for (const validatorIndex of committee) { - if (flagIsTimelySource(participationStatus[validatorIndex])) { - seenValidatorIndices.add(validatorIndex); + const notSeenAttestingIndices = new Set(); + for (const [i, validatorIndex] of committee.entries()) { + // no need to check flagIsTimelySource as if validator is not seen, it's participation status is 0 + if (participationStatus[validatorIndex] === 0) { + notSeenAttestingIndices.add(i); } } - return seenValidatorIndices; + // if all validators are seen then return null, we don't need to check for any attestations of same committee again + return notSeenAttestingIndices.size === 0 ? null : notSeenAttestingIndices; }; } } -export function extractParticipation( +export function extractParticipationPhase0( attestations: phase0.PendingAttestation[], state: CachedBeaconStateAllForks ): Set { @@ -408,7 +468,56 @@ export function extractParticipation( } /** - * Do those validations: + * This returns a function to validate if an attestation data is compatible to a state, + * it's an optimized version of isValidAttestationData(). + * Atttestation data is validated by: + * - Validate the source checkpoint + * - Validate shuffling using beacon block root and target epoch + * + * Here we always validate the source checkpoint, and cache beacon block root + target epoch + * to avoid running the same shuffling validation multiple times. + */ +export function getValidateAttestationDataFn( + forkChoice: IForkChoice, + state: CachedBeaconStateAllForks +): ValidateAttestationDataFn { + const cachedValidatedAttestationData = new Map(); + const {previousJustifiedCheckpoint, currentJustifiedCheckpoint} = state; + const stateEpoch = state.epochCtx.epoch; + return (attData: phase0.AttestationData) => { + const targetEpoch = attData.target.epoch; + let justifiedCheckpoint; + // simple check first + if (targetEpoch === stateEpoch) { + justifiedCheckpoint = currentJustifiedCheckpoint; + } else if (targetEpoch === stateEpoch - 1) { + justifiedCheckpoint = previousJustifiedCheckpoint; + } else { + return false; + } + + if (!ssz.phase0.Checkpoint.equals(attData.source, justifiedCheckpoint)) return false; + + // Shuffling can't have changed if we're in the first few epochs + // Also we can't look back 2 epochs if target epoch is 1 or less + if (stateEpoch < 2 || targetEpoch < 2) { + return true; + } + + // the isValidAttestationData does not depend on slot and index + const beaconBlockRootHex = toHex(attData.beaconBlockRoot); + const cacheKey = beaconBlockRootHex + targetEpoch; + let isValid = cachedValidatedAttestationData.get(cacheKey); + if (isValid === undefined) { + isValid = isValidShuffling(forkChoice, state, beaconBlockRootHex, targetEpoch); + cachedValidatedAttestationData.set(cacheKey, isValid); + } + return isValid; + }; +} + +/** + * A straight forward version to validate attestation data. We don't use it, but keep it here for reference. * - Validate the source checkpoint * - Since we validated attestation's signature in gossip validation function, * we only need to validate the shuffling of attestation @@ -441,6 +550,16 @@ export function isValidAttestationData( if (stateEpoch < 2 || targetEpoch < 2) { return true; } + const beaconBlockRootHex = toHex(data.beaconBlockRoot); + return isValidShuffling(forkChoice, state, beaconBlockRootHex, targetEpoch); +} + +function isValidShuffling( + forkChoice: IForkChoice, + state: CachedBeaconStateAllForks, + blockRootHex: RootHex, + targetEpoch: Epoch +): boolean { // Otherwise the shuffling is determined by the block at the end of the target epoch // minus the shuffling lookahead (usually 2). We call this the "pivot". const pivotSlot = computeStartSlotAtEpoch(targetEpoch - 1) - 1; @@ -450,7 +569,7 @@ export function isValidAttestationData( // pivot block is the same as the current state's pivot block. If it is, then the // attestation's shuffling is the same as the current state's. // To account for skipped slots, find the first block at *or before* the pivot slot. - const beaconBlockRootHex = toHex(data.beaconBlockRoot); + const beaconBlockRootHex = blockRootHex; const beaconBlock = forkChoice.getBlockHex(beaconBlockRootHex); if (!beaconBlock) { throw Error(`Attestation data.beaconBlockRoot ${beaconBlockRootHex} not found in forkchoice`); @@ -467,7 +586,3 @@ export function isValidAttestationData( } return attestationDependentRoot === stateDependentRoot; } - -function flagIsTimelySource(flag: number): boolean { - return (flag & TIMELY_SOURCE) === TIMELY_SOURCE; -} diff --git a/packages/beacon-node/test/e2e/sync/finalizedSync.test.ts b/packages/beacon-node/test/e2e/sync/finalizedSync.test.ts new file mode 100644 index 000000000000..d3d06f1fc070 --- /dev/null +++ b/packages/beacon-node/test/e2e/sync/finalizedSync.test.ts @@ -0,0 +1,124 @@ +import {describe, it, afterEach} from "vitest"; +import {assert} from "chai"; +import {fromHexString} from "@chainsafe/ssz"; +import {ChainConfig} from "@lodestar/config"; +import {phase0} from "@lodestar/types"; +import {TimestampFormatCode} from "@lodestar/logger"; +import {SLOTS_PER_EPOCH} from "@lodestar/params"; +import {routes} from "@lodestar/api"; +import {EventData, EventType} from "@lodestar/api/lib/beacon/routes/events.js"; +import {getDevBeaconNode} from "../../utils/node/beacon.js"; +import {waitForEvent} from "../../utils/events/resolver.js"; +import {getAndInitDevValidators} from "../../utils/node/validator.js"; +import {ChainEvent} from "../../../src/chain/index.js"; +import {connect, onPeerConnect} from "../../utils/network.js"; +import {testLogger, LogLevel, TestLoggerOpts} from "../../utils/logger.js"; + +describe( + "sync / finalized sync", + function () { + const validatorCount = 8; + const testParams: Pick = { + // eslint-disable-next-line @typescript-eslint/naming-convention + SECONDS_PER_SLOT: 2, + }; + + const afterEachCallbacks: (() => Promise | void)[] = []; + afterEach(async () => { + while (afterEachCallbacks.length > 0) { + const callback = afterEachCallbacks.pop(); + if (callback) await callback(); + } + }); + + it("should do a finalized sync from another BN", async function () { + // single node at beginning, use main thread to verify bls + const genesisSlotsDelay = 4; + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion + const genesisTime = Math.floor(Date.now() / 1000) + genesisSlotsDelay * testParams.SECONDS_PER_SLOT; + + const testLoggerOpts: TestLoggerOpts = { + level: LogLevel.info, + timestampFormat: { + format: TimestampFormatCode.EpochSlot, + genesisTime, + slotsPerEpoch: SLOTS_PER_EPOCH, + secondsPerSlot: testParams.SECONDS_PER_SLOT, + }, + }; + + const loggerNodeA = testLogger("FinalizedSync-Node-A", testLoggerOpts); + const loggerNodeB = testLogger("FinalizedSync-Node-B", testLoggerOpts); + + const bn = await getDevBeaconNode({ + params: testParams, + options: { + sync: {isSingleNode: true}, + network: {allowPublishToZeroPeers: true, useWorker: false}, + chain: {blsVerifyAllMainThread: true}, + }, + validatorCount, + genesisTime, + logger: loggerNodeA, + }); + + afterEachCallbacks.push(() => bn.close()); + + const {validators} = await getAndInitDevValidators({ + node: bn, + logPrefix: "FinalizedSyncVc", + validatorsPerClient: validatorCount, + validatorClientCount: 1, + startIndex: 0, + useRestApi: false, + testLoggerOpts, + }); + + afterEachCallbacks.push(() => Promise.all(validators.map((validator) => validator.close()))); + + // stop beacon node after validators + afterEachCallbacks.push(() => bn.close()); + + await waitForEvent(bn.chain.emitter, ChainEvent.forkChoiceFinalized, 240000); + loggerNodeA.info("Node A emitted finalized checkpoint event"); + + const bn2 = await getDevBeaconNode({ + params: testParams, + options: { + api: {rest: {enabled: false}}, + network: {useWorker: false}, + chain: {blsVerifyAllMainThread: true}, + }, + validatorCount, + genesisTime, + logger: loggerNodeB, + }); + loggerNodeA.info("Node B created"); + + afterEachCallbacks.push(() => bn2.close()); + afterEachCallbacks.push(() => bn2.close()); + + const headSummary = bn.chain.forkChoice.getHead(); + const head = await bn.db.block.get(fromHexString(headSummary.blockRoot)); + if (!head) throw Error("First beacon node has no head block"); + const waitForSynced = waitForEvent( + bn2.chain.emitter, + routes.events.EventType.head, + 100000, + ({block}) => block === headSummary.blockRoot + ); + + await Promise.all([connect(bn2.network, bn.network), onPeerConnect(bn2.network), onPeerConnect(bn.network)]); + loggerNodeA.info("Node A connected to Node B"); + + try { + await waitForSynced; + loggerNodeB.info("Node B synced to Node A, received head block", {slot: head.message.slot}); + } catch (e) { + assert.fail("Failed to sync to other node in time"); + } + }); + }, + // chain is finalized at slot 32, plus 4 slots for genesis delay => ~72s it should sync pretty fast + {timeout: 90000} +); diff --git a/packages/beacon-node/test/perf/chain/opPools/aggregatedAttestationPool.test.ts b/packages/beacon-node/test/perf/chain/opPools/aggregatedAttestationPool.test.ts index 5dd73bafa3f0..4ed8215ac85b 100644 --- a/packages/beacon-node/test/perf/chain/opPools/aggregatedAttestationPool.test.ts +++ b/packages/beacon-node/test/perf/chain/opPools/aggregatedAttestationPool.test.ts @@ -1,13 +1,13 @@ import {itBench} from "@dapplion/benchmark"; -import {expect} from "chai"; import {BitArray, toHexString} from "@chainsafe/ssz"; import { CachedBeaconStateAltair, computeEpochAtSlot, computeStartSlotAtEpoch, getBlockRootAtSlot, + newFilledArray, } from "@lodestar/state-transition"; -import {HISTORICAL_ROOTS_LIMIT, SLOTS_PER_EPOCH, TIMELY_SOURCE_FLAG_INDEX} from "@lodestar/params"; +import {HISTORICAL_ROOTS_LIMIT, SLOTS_PER_EPOCH} from "@lodestar/params"; import {ExecutionStatus, ForkChoice, IForkChoiceStore, ProtoArray} from "@lodestar/fork-choice"; import {ssz} from "@lodestar/types"; // eslint-disable-next-line import/no-relative-packages @@ -15,33 +15,24 @@ import {generatePerfTestCachedStateAltair} from "../../../../../state-transition import {AggregatedAttestationPool} from "../../../../src/chain/opPools/aggregatedAttestationPool.js"; import {computeAnchorCheckpoint} from "../../../../src/chain/initState.js"; -/** Same to https://github.com/ethereum/eth2.0-specs/blob/v1.1.0-alpha.5/specs/altair/beacon-chain.md#has_flag */ -const TIMELY_SOURCE = 1 << TIMELY_SOURCE_FLAG_INDEX; -function flagIsTimelySource(flag: number): boolean { - return (flag & TIMELY_SOURCE) === TIMELY_SOURCE; -} +const vc = 1_500_000; -// Aug 11 2021 -// getAttestationsForBlock -// ✓ getAttestationsForBlock 4.410948 ops/s 226.7086 ms/op - 64 runs 51.8 s -describe("getAttestationsForBlock", () => { +/** + * Jan 2024 + * getAttestationsForBlock vc=1500000 + * ✔ notSeenSlots=1 numMissedVotes=1 numBadVotes=10 10.48105 ops/s 95.41024 ms/op - 12 runs 18.2 s + * ✔ notSeenSlots=1 numMissedVotes=0 numBadVotes=4 11.44517 ops/s 87.37307 ms/op - 13 runs 14.5 s + * ✔ notSeenSlots=2 numMissedVotes=1 numBadVotes=10 23.86144 ops/s 41.90862 ms/op - 18 runs 34.1 s + */ +describe(`getAttestationsForBlock vc=${vc}`, () => { let originalState: CachedBeaconStateAltair; let protoArray: ProtoArray; let forkchoice: ForkChoice; before(function () { - this.timeout(2 * 60 * 1000); // Generating the states for the first time is very slow - - originalState = generatePerfTestCachedStateAltair({goBackOneSlot: true}); - - const previousEpochParticipationArr = originalState.previousEpochParticipation.getAll(); - const currentEpochParticipationArr = originalState.currentEpochParticipation.getAll(); - - const numPreviousEpochParticipation = previousEpochParticipationArr.filter(flagIsTimelySource).length; - const numCurrentEpochParticipation = currentEpochParticipationArr.filter(flagIsTimelySource).length; + this.timeout(5 * 60 * 1000); // Generating the states for the first time is very slow - expect(numPreviousEpochParticipation).to.equal(250000, "Wrong numPreviousEpochParticipation"); - expect(numCurrentEpochParticipation).to.equal(250000, "Wrong numCurrentEpochParticipation"); + originalState = generatePerfTestCachedStateAltair({goBackOneSlot: true, vc}); const {blockHeader, checkpoint} = computeAnchorCheckpoint(originalState.config, originalState); // TODO figure out why getBlockRootAtSlot(originalState, justifiedSlot) is not the same to justifiedCheckpoint.root @@ -125,17 +116,115 @@ describe("getAttestationsForBlock", () => { forkchoice = new ForkChoice(originalState.config, fcStore, protoArray); }); + // notSeenSlots should be >=1 + for (const [notSeenSlots, numMissedVotes, numBadVotes] of [ + [1, 1, 10], + [1, 0, 4], + // notSeenSlots=2 means the previous block slot is missed + [2, 1, 10], + ]) { + itBench({ + id: `notSeenSlots=${notSeenSlots} numMissedVotes=${numMissedVotes} numBadVotes=${numBadVotes}`, + before: () => { + const state = originalState.clone(); + // by default make all validators have full participation + const previousParticipation = newFilledArray(vc, 0b111); + // origState is at slot 0 of epoch so there is no currentParticipation + const currentParticipation = newFilledArray(vc, 0); + const currentEpoch = computeEpochAtSlot(state.slot); + + for (let epochSlot = 0; epochSlot < SLOTS_PER_EPOCH; epochSlot++) { + const slot = state.slot - 1 - epochSlot; + const slotEpoch = computeEpochAtSlot(slot); + const committeeCount = state.epochCtx.getCommitteeCountPerSlot(slotEpoch); + for (let committeeIndex = 0; committeeIndex < committeeCount; committeeIndex++) { + const duties = state.epochCtx.getBeaconCommittee(slot, committeeIndex); + const participationArr = slotEpoch === currentEpoch ? currentParticipation : previousParticipation; + for (const [i, validatorIndex] of duties.entries()) { + // no attestation in previous slot is included yet as that's the spec + // for slot < previous slot, there is missed votes at every committee so the code need to keep looking for attestations because votes are not seen + if (slot >= state.slot - notSeenSlots || i < numMissedVotes) { + participationArr[validatorIndex] = 0; + } + } + } + } + state.previousEpochParticipation = ssz.altair.EpochParticipation.toViewDU(previousParticipation); + state.currentEpochParticipation = ssz.altair.EpochParticipation.toViewDU(currentParticipation); + state.commit(); + return state; + }, + beforeEach: (state) => { + const pool = getAggregatedAttestationPool(state, numMissedVotes, numBadVotes); + return {state, pool}; + }, + fn: ({state, pool}) => { + pool.getAttestationsForBlock(forkchoice, state); + }, + }); + } +}); + +/** + * Fir dev purpose to find the best way to get not seen validators. + */ +describe.skip("getAttestationsForBlock aggregationBits intersectValues vs get", () => { + const runsFactor = 1000; + // As of Jan 2004 + const committeeLen = 450; + const aggregationBits = BitArray.fromBoolArray(Array.from({length: committeeLen}, () => true)); + const notSeenValidatorIndices = Array.from({length: committeeLen}, (_, i) => i); + itBench({ - id: "getAttestationsForBlock", - beforeEach: () => getAggregatedAttestationPool(originalState), - fn: (pool) => { - // logger.info("Number of attestations in pool", pool.getAll().length); - pool.getAttestationsForBlock(forkchoice, originalState); + id: "aggregationBits.intersectValues()", + fn: () => { + for (let i = 0; i < runsFactor; i++) { + aggregationBits.intersectValues(notSeenValidatorIndices); + } }, + runsFactor, + }); + + itBench({ + id: "aggregationBits.get()", + fn: () => { + for (let i = 0; i < runsFactor; i++) { + for (let j = 0; j < committeeLen; j++) { + aggregationBits.get(j); + } + } + }, + runsFactor, + }); + + itBench({ + id: "aggregationBits.get() with push()", + fn: () => { + for (let i = 0; i < runsFactor; i++) { + const arr: number[] = []; + for (let j = 0; j < committeeLen; j++) { + if (aggregationBits.get(j)) { + arr.push(j); + } + } + } + }, + runsFactor, }); }); -function getAggregatedAttestationPool(state: CachedBeaconStateAltair): AggregatedAttestationPool { +/** + * Create the pool with the following properties: + * - state: at slot n + * - all attestations at slot n - 1 are included in block but they are not enough + * - numMissedVotes: number of missed attestations/votes at every committee + * - numBadVotes: number of bad attestations/votes at every committee, they are not included in block because they are seen in the state + */ +function getAggregatedAttestationPool( + state: CachedBeaconStateAltair, + numMissedVotes: number, + numBadVotes: number +): AggregatedAttestationPool { const pool = new AggregatedAttestationPool(); for (let epochSlot = 0; epochSlot < SLOTS_PER_EPOCH; epochSlot++) { const slot = state.slot - 1 - epochSlot; @@ -145,31 +234,82 @@ function getAggregatedAttestationPool(state: CachedBeaconStateAltair): Aggregate epoch: state.currentJustifiedCheckpoint.epoch, root: state.currentJustifiedCheckpoint.root, }; + for (let committeeIndex = 0; committeeIndex < committeeCount; committeeIndex++) { - const attestation = { - aggregationBits: BitArray.fromBitLen(64), - data: { - slot: slot, - index: committeeIndex, - beaconBlockRoot: getBlockRootAtSlot(state, slot), - source: sourceCheckpoint, - target: { - epoch, - root: getBlockRootAtSlot(state, computeStartSlotAtEpoch(epoch)), - }, + const goodAttData = { + slot: slot, + index: committeeIndex, + beaconBlockRoot: getBlockRootAtSlot(state, slot), + source: sourceCheckpoint, + target: { + epoch, + root: getBlockRootAtSlot(state, computeStartSlotAtEpoch(epoch)), }, - signature: Buffer.alloc(96), }; + // for each good att data group, there are 4 versions of aggregation bits const committee = state.epochCtx.getBeaconCommittee(slot, committeeIndex); - // all attestation has full participation so getAttestationsForBlock() has to do a lot of filter - // aggregate_and_proof messages - pool.add( - attestation, - toHexString(ssz.phase0.AttestationData.hashTreeRoot(attestation.data)), - committee.length, - committee - ); + const committeeLen = committee.length; + const goodVoteBits = BitArray.fromBoolArray(Array.from({length: committeeLen}, () => true)); + // n first validators are totally missed + for (let i = 0; i < numMissedVotes; i++) { + goodVoteBits.set(i, false); + } + // n next validators vote for different att data + for (let i = 0; i < numBadVotes; i++) { + goodVoteBits.set(i + numMissedVotes, false); + } + + // there are 4 different versions of the good vote + for (const endingBits of [0b1000, 0b0100, 0b0010, 0b0001]) { + const aggregationBits = goodVoteBits.clone(); + aggregationBits.set(committeeLen - 1, Boolean(endingBits & 0b0001)); + aggregationBits.set(committeeLen - 2, Boolean(endingBits & 0b0010)); + aggregationBits.set(committeeLen - 3, Boolean(endingBits & 0b0100)); + aggregationBits.set(committeeLen - 4, Boolean(endingBits & 0b1000)); + + const attestation = { + aggregationBits, + data: goodAttData, + signature: Buffer.alloc(96), + }; + // all attestation has full participation so getAttestationsForBlock() has to do a lot of filter + // aggregate_and_proof messages + pool.add( + attestation, + toHexString(ssz.phase0.AttestationData.hashTreeRoot(attestation.data)), + committee.length, + committee + ); + } + + if (epochSlot === 0) { + // epochSlot === 0: attestations will be included in block but it's not enough for block + // epochSlot >= 1: no attestation will be included in block but the code still need to scan through them + continue; + } + + const zeroAggregationBits = BitArray.fromBoolArray(Array.from({length: committeeLen}, () => false)); + + // n first validator votes for n different bad votes, that makes n different att data in the same slot/index + // these votes/attestations will NOT be included in block as they are seen in the state + for (let i = 0; i < numBadVotes; i++) { + const attData = ssz.phase0.AttestationData.clone(goodAttData); + attData.beaconBlockRoot = getBlockRootAtSlot(state, slot - i - 1); + const aggregationBits = zeroAggregationBits.clone(); + aggregationBits.set(i + numMissedVotes, true); + const attestation = { + aggregationBits, + data: attData, + signature: Buffer.alloc(96), + }; + pool.add( + attestation, + toHexString(ssz.phase0.AttestationData.hashTreeRoot(attestation.data)), + committee.length, + committee + ); + } } } return pool; diff --git a/packages/beacon-node/test/unit/chain/opPools/aggregatedAttestationPool.test.ts b/packages/beacon-node/test/unit/chain/opPools/aggregatedAttestationPool.test.ts index 48abfbc35675..00aa8a40168b 100644 --- a/packages/beacon-node/test/unit/chain/opPools/aggregatedAttestationPool.test.ts +++ b/packages/beacon-node/test/unit/chain/opPools/aggregatedAttestationPool.test.ts @@ -2,14 +2,15 @@ import type {SecretKey} from "@chainsafe/bls/types"; import bls from "@chainsafe/bls"; import {BitArray, fromHexString, toHexString} from "@chainsafe/ssz"; import {describe, it, expect, beforeEach, beforeAll, afterEach, vi} from "vitest"; -import {CachedBeaconStateAllForks} from "@lodestar/state-transition"; -import {SLOTS_PER_EPOCH} from "@lodestar/params"; +import {CachedBeaconStateAllForks, newFilledArray} from "@lodestar/state-transition"; +import {FAR_FUTURE_EPOCH, MAX_EFFECTIVE_BALANCE, SLOTS_PER_EPOCH} from "@lodestar/params"; import {ssz, phase0} from "@lodestar/types"; +import {CachedBeaconStateAltair} from "@lodestar/state-transition/src/types.js"; import {MockedForkChoice, getMockedForkChoice} from "../../../mocks/mockedBeaconChain.js"; import { AggregatedAttestationPool, aggregateInto, - getParticipationFn, + getNotSeenValidatorsFn, MatchingDataAttestationGroup, } from "../../../../src/chain/opPools/aggregatedAttestationPool.js"; import {InsertOutcome} from "../../../../src/chain/opPools/types.js"; @@ -18,6 +19,7 @@ import {generateCachedAltairState} from "../../../utils/state.js"; import {renderBitArray} from "../../../utils/render.js"; import {ZERO_HASH_HEX} from "../../../../src/constants/constants.js"; import {generateProtoBlock} from "../../../utils/typeGenerator.js"; +import {generateValidators} from "../../../utils/validator.js"; /** Valid signature of random data to prevent BLS errors */ const validSignature = fromHexString( @@ -29,15 +31,43 @@ describe("AggregatedAttestationPool", function () { const altairForkEpoch = 2020; const currentEpoch = altairForkEpoch + 10; const currentSlot = SLOTS_PER_EPOCH * currentEpoch; - const originalState = generateCachedAltairState({slot: currentSlot + 1}, altairForkEpoch); - let altairState: CachedBeaconStateAllForks; + const committeeIndex = 0; const attestation = ssz.phase0.Attestation.defaultValue(); attestation.data.slot = currentSlot; + attestation.data.index = committeeIndex; attestation.data.target.epoch = currentEpoch; const attDataRootHex = toHexString(ssz.phase0.AttestationData.hashTreeRoot(attestation.data)); - const committee = [0, 1, 2, 3]; + const validatorOpts = { + activationEpoch: 0, + effectiveBalance: MAX_EFFECTIVE_BALANCE, + withdrawableEpoch: FAR_FUTURE_EPOCH, + exitEpoch: FAR_FUTURE_EPOCH, + }; + // this makes a committee length of 4 + const vc = 64; + const committeeLength = 4; + const validators = generateValidators(vc, validatorOpts); + const originalState = generateCachedAltairState({slot: currentSlot + 1, validators}, altairForkEpoch); + const committee = originalState.epochCtx.getBeaconCommittee(currentSlot, committeeIndex); + expect(committee.length).toEqual(committeeLength); + // 0 and 1 in committee are fully participated + const epochParticipation = newFilledArray(vc, 0b111); + for (let i = 0; i < committeeLength; i++) { + if (i === 0 || i === 1) { + epochParticipation[committee[i]] = 0b111; + } else { + epochParticipation[committee[i]] = 0b000; + } + } + (originalState as CachedBeaconStateAltair).previousEpochParticipation = + ssz.altair.EpochParticipation.toViewDU(epochParticipation); + (originalState as CachedBeaconStateAltair).currentEpochParticipation = + ssz.altair.EpochParticipation.toViewDU(epochParticipation); + originalState.commit(); + let altairState: CachedBeaconStateAllForks; + let forkchoiceStub: MockedForkChoice; beforeEach(() => { @@ -53,9 +83,16 @@ describe("AggregatedAttestationPool", function () { it("getParticipationFn", () => { // previousEpochParticipation and currentEpochParticipation is created inside generateCachedState // 0 and 1 are fully participated - const participationFn = getParticipationFn(altairState); - const participation = participationFn(currentEpoch, committee); - expect(participation).toEqual(new Set([0, 1])); + const notSeenValidatorFn = getNotSeenValidatorsFn(altairState); + const participation = notSeenValidatorFn(currentEpoch, committee); + // seen attesting indices are 0, 1 => not seen are 2, 3 + expect(participation).toEqual( + // { + // validatorIndices: [null, null, committee[2], committee[3]], + // attestingIndices: new Set([2, 3]), + // } + new Set([2, 3]) + ); }); // previousEpochParticipation and currentEpochParticipation is created inside generateCachedState @@ -68,7 +105,7 @@ describe("AggregatedAttestationPool", function () { for (const {name, attestingBits, isReturned} of testCases) { it(name, function () { - const aggregationBits = new BitArray(new Uint8Array(attestingBits), 8); + const aggregationBits = new BitArray(new Uint8Array(attestingBits), committeeLength); pool.add( {...attestation, aggregationBits}, attDataRootHex, @@ -180,13 +217,14 @@ describe("MatchingDataAttestationGroup.add()", () => { describe("MatchingDataAttestationGroup.getAttestationsForBlock", () => { const testCases: { id: string; - seenAttestingBits: number[]; + notSeenAttestingBits: number[]; attestationsToAdd: {bits: number[]; notSeenAttesterCount: number}[]; }[] = [ // Note: attestationsToAdd MUST intersect in order to not be aggregated and distort the results { id: "All have attested", - seenAttestingBits: [0b11111111], + // same to seenAttestingBits: [0b11111111], + notSeenAttestingBits: [0b00000000], attestationsToAdd: [ {bits: [0b11111110], notSeenAttesterCount: 0}, {bits: [0b00000011], notSeenAttesterCount: 0}, @@ -194,7 +232,8 @@ describe("MatchingDataAttestationGroup.getAttestationsForBlock", () => { }, { id: "Some have attested", - seenAttestingBits: [0b11110001], // equals to indexes [ 0, 4, 5, 6, 7 ] + // same to seenAttestingBits: [0b11110001] + notSeenAttestingBits: [0b00001110], attestationsToAdd: [ {bits: [0b11111110], notSeenAttesterCount: 3}, {bits: [0b00000011], notSeenAttesterCount: 1}, @@ -202,7 +241,8 @@ describe("MatchingDataAttestationGroup.getAttestationsForBlock", () => { }, { id: "Non have attested", - seenAttestingBits: [0b00000000], + // same to seenAttestingBits: [0b00000000], + notSeenAttestingBits: [0b11111111], attestationsToAdd: [ {bits: [0b11111110], notSeenAttesterCount: 7}, {bits: [0b00000011], notSeenAttesterCount: 2}, @@ -213,7 +253,7 @@ describe("MatchingDataAttestationGroup.getAttestationsForBlock", () => { const attestationData = ssz.phase0.AttestationData.defaultValue(); const committee = linspace(0, 7); - for (const {id, seenAttestingBits, attestationsToAdd} of testCases) { + for (const {id, notSeenAttestingBits, attestationsToAdd} of testCases) { it(id, () => { const attestationGroup = new MatchingDataAttestationGroup(committee, attestationData); @@ -229,8 +269,19 @@ describe("MatchingDataAttestationGroup.getAttestationsForBlock", () => { attestationGroup.add({attestation, trueBitsCount: attestation.aggregationBits.getTrueBitIndexes().length}); } - const indices = new BitArray(new Uint8Array(seenAttestingBits), 8).intersectValues(committee); - const attestationsForBlock = attestationGroup.getAttestationsForBlock(new Set(indices)); + const notSeenAggBits = new BitArray(new Uint8Array(notSeenAttestingBits), 8); + // const notSeenValidatorIndices: (ValidatorIndex | null)[] = []; + const notSeenAttestingIndices = new Set(); + for (let i = 0; i < committee.length; i++) { + // notSeenValidatorIndices.push(notSeenAggBits.get(i) ? committee[i] : null); + if (notSeenAggBits.get(i)) { + notSeenAttestingIndices.add(i); + } + } + const attestationsForBlock = attestationGroup.getAttestationsForBlock( + // notSeenValidatorIndices, + notSeenAttestingIndices + ); for (const [i, {notSeenAttesterCount}] of attestationsToAdd.entries()) { const attestation = attestationsForBlock.find((a) => a.attestation === attestations[i]);