diff --git a/packages/beacon-node/src/network/gossip/interface.ts b/packages/beacon-node/src/network/gossip/interface.ts index 825178532ee8..f61008b50570 100644 --- a/packages/beacon-node/src/network/gossip/interface.ts +++ b/packages/beacon-node/src/network/gossip/interface.ts @@ -171,6 +171,7 @@ export type GossipHandlerParamGeneric = { export type GossipHandlers = { [K in GossipType]: | ((gossipHandlerParam: GossipHandlerParamGeneric) => Promise) + // TODO: make it generic | ((gossipHandlerParams: GossipHandlerParamGeneric[]) => Promise<(null | AttestationError)[]>); }; diff --git a/packages/beacon-node/src/network/processor/gossipHandlers.ts b/packages/beacon-node/src/network/processor/gossipHandlers.ts index ee706dbb7bd4..5e476c9ad413 100644 --- a/packages/beacon-node/src/network/processor/gossipHandlers.ts +++ b/packages/beacon-node/src/network/processor/gossipHandlers.ts @@ -81,6 +81,19 @@ const MAX_UNKNOWN_BLOCK_ROOT_RETRIES = 1; * - Ethereum Consensus gossipsub protocol strictly defined a single topic for message */ export function getGossipHandlers(modules: ValidatorFnsModules, options: GossipHandlerOpts): GossipHandlers { + const defaultHandlers = getDefaultHandlers(modules, options); + if (options.beaconAttestationBatchValidation) { + const batchHandlers = getBatchHandlers(modules, options); + return {...defaultHandlers, ...batchHandlers}; + } + return defaultHandlers; +} + +/** + * Default handlers validate gossip messages one by one. + * We only have a choice to do batch validation for beacon_attestation topic. + */ +function getDefaultHandlers(modules: ValidatorFnsModules, options: GossipHandlerOpts): GossipHandlers { const {chain, config, metrics, events, logger, core, aggregatorTracker} = modules; async function validateBeaconBlock( @@ -246,124 +259,6 @@ export function getGossipHandlers(modules: ValidatorFnsModules, options: GossipH }); } - async function beaconAttestationBatchHandler( - gossipHandlerParams: GossipHandlerParamGeneric[] - ): Promise<(null | AttestationError)[]> { - const results: (null | AttestationError)[] = []; - const attestationCount = gossipHandlerParams.length; - if (attestationCount === 0) { - return results; - } - // all attestations should have same attestation data as filtered by network processor - const {subnet, fork} = gossipHandlerParams[0].topic; - const validationParams = gossipHandlerParams.map((param) => ({ - attestation: null, - serializedData: param.gossipData.serializedData, - attSlot: param.gossipData.msgSlot, - attDataBase64: param.gossipData.indexed, - })) as AttestationOrBytes[]; - const {results: validationResults, batchableBls} = await validateGossipAttestationsSameAttData( - fork, - chain, - validationParams, - subnet - ); - for (const [i, validationResult] of validationResults.entries()) { - if (validationResult.err) { - results.push(validationResult.err as AttestationError); - continue; - } - - results.push(null); - - // Handler - const {indexedAttestation, attDataRootHex, attestation} = validationResult.result; - metrics?.registerGossipUnaggregatedAttestation(gossipHandlerParams[i].seenTimestampSec, indexedAttestation); - - try { - // Node may be subscribe to extra subnets (long-lived random subnets). For those, validate the messages - // but don't add to attestation pool, to save CPU and RAM - if (aggregatorTracker.shouldAggregate(subnet, indexedAttestation.data.slot)) { - const insertOutcome = chain.attestationPool.add(attestation, attDataRootHex); - metrics?.opPool.attestationPoolInsertOutcome.inc({insertOutcome}); - } - } catch (e) { - logger.error("Error adding unaggregated attestation to pool", {subnet}, e as Error); - } - - if (!options.dontSendGossipAttestationsToForkchoice) { - try { - chain.forkChoice.onAttestation(indexedAttestation, attDataRootHex); - } catch (e) { - logger.debug("Error adding gossip unaggregated attestation to forkchoice", {subnet}, e as Error); - } - } - - chain.emitter.emit(routes.events.EventType.attestation, attestation); - } - - if (batchableBls) { - metrics?.gossipAttestation.attestationBatchHistogram.observe(attestationCount); - } else { - metrics?.gossipAttestation.attestationNonBatchCount.inc(attestationCount); - } - - return results; - } - - async function beaconAttestationHandler({ - gossipData, - topic, - seenTimestampSec, - }: GossipHandlerParamGeneric): Promise { - const {serializedData, msgSlot} = gossipData; - if (msgSlot == undefined) { - throw Error("msgSlot is undefined for beacon_attestation topic"); - } - const {subnet, fork} = topic; - - // do not deserialize gossipSerializedData here, it's done in validateGossipAttestation only if needed - let validationResult: AttestationValidationResult; - try { - validationResult = await validateGossipAttestation( - fork, - chain, - {attestation: null, serializedData, attSlot: msgSlot}, - subnet - ); - } catch (e) { - if (e instanceof AttestationError && e.action === GossipAction.REJECT) { - chain.persistInvalidSszBytes(ssz.phase0.Attestation.typeName, serializedData, "gossip_reject"); - } - throw e; - } - - // Handler - const {indexedAttestation, attDataRootHex, attestation} = validationResult; - metrics?.registerGossipUnaggregatedAttestation(seenTimestampSec, indexedAttestation); - - try { - // Node may be subscribe to extra subnets (long-lived random subnets). For those, validate the messages - // but don't add to attestation pool, to save CPU and RAM - if (aggregatorTracker.shouldAggregate(subnet, indexedAttestation.data.slot)) { - const insertOutcome = chain.attestationPool.add(attestation, attDataRootHex); - metrics?.opPool.attestationPoolInsertOutcome.inc({insertOutcome}); - } - } catch (e) { - logger.error("Error adding unaggregated attestation to pool", {subnet}, e as Error); - } - - if (!options.dontSendGossipAttestationsToForkchoice) { - try { - chain.forkChoice.onAttestation(indexedAttestation, attDataRootHex); - } catch (e) { - logger.debug("Error adding gossip unaggregated attestation to forkchoice", {subnet}, e as Error); - } - } - - chain.emitter.emit(routes.events.EventType.attestation, attestation); - } - return { [GossipType.beacon_block]: async ({ gossipData, @@ -460,9 +355,58 @@ export function getGossipHandlers(modules: ValidatorFnsModules, options: GossipH chain.emitter.emit(routes.events.EventType.attestation, signedAggregateAndProof.message.aggregate); }, - [GossipType.beacon_attestation]: options.beaconAttestationBatchValidation - ? beaconAttestationBatchHandler - : beaconAttestationHandler, + [GossipType.beacon_attestation]: async ({ + gossipData, + topic, + seenTimestampSec, + }: GossipHandlerParamGeneric): Promise => { + const {serializedData, msgSlot} = gossipData; + if (msgSlot == undefined) { + throw Error("msgSlot is undefined for beacon_attestation topic"); + } + const {subnet, fork} = topic; + + // do not deserialize gossipSerializedData here, it's done in validateGossipAttestation only if needed + let validationResult: AttestationValidationResult; + try { + validationResult = await validateGossipAttestation( + fork, + chain, + {attestation: null, serializedData, attSlot: msgSlot}, + subnet + ); + } catch (e) { + if (e instanceof AttestationError && e.action === GossipAction.REJECT) { + chain.persistInvalidSszBytes(ssz.phase0.Attestation.typeName, serializedData, "gossip_reject"); + } + throw e; + } + + // Handler + const {indexedAttestation, attDataRootHex, attestation} = validationResult; + metrics?.registerGossipUnaggregatedAttestation(seenTimestampSec, indexedAttestation); + + try { + // Node may be subscribe to extra subnets (long-lived random subnets). For those, validate the messages + // but don't add to attestation pool, to save CPU and RAM + if (aggregatorTracker.shouldAggregate(subnet, indexedAttestation.data.slot)) { + const insertOutcome = chain.attestationPool.add(attestation, attDataRootHex); + metrics?.opPool.attestationPoolInsertOutcome.inc({insertOutcome}); + } + } catch (e) { + logger.error("Error adding unaggregated attestation to pool", {subnet}, e as Error); + } + + if (!options.dontSendGossipAttestationsToForkchoice) { + try { + chain.forkChoice.onAttestation(indexedAttestation, attDataRootHex); + } catch (e) { + logger.debug("Error adding gossip unaggregated attestation to forkchoice", {subnet}, e as Error); + } + } + + chain.emitter.emit(routes.events.EventType.attestation, attestation); + }, [GossipType.attester_slashing]: async ({ gossipData, @@ -606,6 +550,79 @@ export function getGossipHandlers(modules: ValidatorFnsModules, options: GossipH }; } +/** + * For now, only beacon_attestation topic is batched. + */ +function getBatchHandlers(modules: ValidatorFnsModules, options: GossipHandlerOpts): Partial { + const {chain, metrics, logger, aggregatorTracker} = modules; + return { + [GossipType.beacon_attestation]: async ( + gossipHandlerParams: GossipHandlerParamGeneric[] + ): Promise<(null | AttestationError)[]> => { + const results: (null | AttestationError)[] = []; + const attestationCount = gossipHandlerParams.length; + if (attestationCount === 0) { + return results; + } + // all attestations should have same attestation data as filtered by network processor + const {subnet, fork} = gossipHandlerParams[0].topic; + const validationParams = gossipHandlerParams.map((param) => ({ + attestation: null, + serializedData: param.gossipData.serializedData, + attSlot: param.gossipData.msgSlot, + attDataBase64: param.gossipData.indexed, + })) as AttestationOrBytes[]; + const {results: validationResults, batchableBls} = await validateGossipAttestationsSameAttData( + fork, + chain, + validationParams, + subnet + ); + for (const [i, validationResult] of validationResults.entries()) { + if (validationResult.err) { + results.push(validationResult.err as AttestationError); + continue; + } + + results.push(null); + + // Handler + const {indexedAttestation, attDataRootHex, attestation} = validationResult.result; + metrics?.registerGossipUnaggregatedAttestation(gossipHandlerParams[i].seenTimestampSec, indexedAttestation); + + try { + // Node may be subscribe to extra subnets (long-lived random subnets). For those, validate the messages + // but don't add to attestation pool, to save CPU and RAM + if (aggregatorTracker.shouldAggregate(subnet, indexedAttestation.data.slot)) { + const insertOutcome = chain.attestationPool.add(attestation, attDataRootHex); + metrics?.opPool.attestationPoolInsertOutcome.inc({insertOutcome}); + } + } catch (e) { + logger.error("Error adding unaggregated attestation to pool", {subnet}, e as Error); + } + + if (!options.dontSendGossipAttestationsToForkchoice) { + try { + chain.forkChoice.onAttestation(indexedAttestation, attDataRootHex); + } catch (e) { + logger.debug("Error adding gossip unaggregated attestation to forkchoice", {subnet}, e as Error); + } + } + + chain.emitter.emit(routes.events.EventType.attestation, attestation); + } + + if (batchableBls) { + metrics?.gossipAttestation.attestationBatchHistogram.observe(attestationCount); + } else { + metrics?.gossipAttestation.attestationNonBatchCount.inc(attestationCount); + } + + return results; + }, + }; +} + /** * Retry a function if it throws error code UNKNOWN_OR_PREFINALIZED_BEACON_BLOCK_ROOT */