Skip to content

Commit

Permalink
fix: network.beaconAttestationBatchValidation flag
Browse files Browse the repository at this point in the history
  • Loading branch information
twoeths committed Aug 18, 2023
1 parent e862ed7 commit 36db4a6
Show file tree
Hide file tree
Showing 8 changed files with 148 additions and 92 deletions.
1 change: 0 additions & 1 deletion packages/beacon-node/src/chain/options.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ export type IChainOptions = BlockProcessOpts &
/** Option to load a custom kzg trusted setup in txt format */
trustedSetup?: string;
broadcastValidationStrictness?: string;
beaconAttestationBatchValidation?: boolean;
minSameMessageSignatureSetsToBatch: number;
};

Expand Down
17 changes: 7 additions & 10 deletions packages/beacon-node/src/chain/validation/attestation.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import {
createSingleSignatureSetFromComponents,
SingleSignatureSet,
} from "@lodestar/state-transition";
import {IBeaconChain} from "..";
import {AttestationError, AttestationErrorCode, GossipAction} from "../errors/index.js";
import {MAXIMUM_GOSSIP_CLOCK_DISPARITY_SEC} from "../../constants/index.js";
import {RegenCaller} from "../regen/index.js";
Expand All @@ -21,6 +20,7 @@ import {
import {AttestationDataCacheEntry} from "../seenCache/seenAttestationData.js";
import {sszDeserializeAttestation} from "../../network/gossip/topic.js";
import {Result, wrapError} from "../../util/wrapError.js";
import {IBeaconChain} from "../interface.js";

export type BatchResult = {
results: Result<AttestationValidationResult>[];
Expand Down Expand Up @@ -59,15 +59,6 @@ export type Phase0Result = AttestationValidationResult & {
*/
const SHUFFLING_LOOK_AHEAD_EPOCHS = 1;

/**
* Verify gossip attestations of the same attestation data.
* - If there are less than 32 signatures, verify each signature individually with batchable = true
* - If there are not less than 32 signatures
* - do a quick verify by aggregate all signatures and pubkeys, this takes 4.6ms for 32 signatures and 7.6ms for 64 signatures
* - if one of the signature is invalid, do a fallback verify by verify each signature individually with batchable = false
* - subnet is required
* - do not prioritize bls signature set
*/
export async function validateGossipAttestation(
fork: ForkName,
chain: IBeaconChain,
Expand All @@ -78,6 +69,12 @@ export async function validateGossipAttestation(
return validateAttestation(fork, chain, attestationOrBytes, subnet);
}

/**
* Verify gossip attestations of the same attestation data. The main advantage is we can batch verify bls signatures
* through verifySignatureSetsSameMessage bls api to improve performance.
* - If there are less than 2 signatures (minSameMessageSignatureSetsToBatch), verify each signature individually with batchable = true
* - do not prioritize bls signature set
*/
export async function validateGossipAttestationsSameAttData(
fork: ForkName,
chain: IBeaconChain,
Expand Down
191 changes: 126 additions & 65 deletions packages/beacon-node/src/network/processor/gossipHandlers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import {ForkName, ForkSeq} from "@lodestar/params";
import {routes} from "@lodestar/api";
import {Metrics} from "../../metrics/index.js";
import {OpSource} from "../../metrics/validatorMonitor.js";
import {IBeaconChain} from "../../chain/index.js";
import {
AttestationError,
AttestationErrorCode,
Expand All @@ -29,7 +28,9 @@ import {
validateGossipBlsToExecutionChange,
AggregateAndProofValidationResult,
validateGossipAttestationsSameAttData,
validateGossipAttestation,
AttestationOrBytes,
AttestationValidationResult,
} from "../../chain/validation/index.js";
import {NetworkEvent, NetworkEventBus} from "../events.js";
import {PeerAction} from "../peers/index.js";
Expand All @@ -40,6 +41,7 @@ import {BlockInput, BlockSource, getBlockInput, GossipedInputType} from "../../c
import {sszDeserialize} from "../gossip/topic.js";
import {INetworkCore} from "../core/index.js";
import {INetwork} from "../interface.js";
import {IBeaconChain} from "../../chain/interface.js";
import {AggregatorTracker} from "./aggregatorTracker.js";

/**
Expand All @@ -48,6 +50,8 @@ import {AggregatorTracker} from "./aggregatorTracker.js";
export type GossipHandlerOpts = {
/** By default pass gossip attestations to forkchoice */
dontSendGossipAttestationsToForkchoice?: boolean;
/** By default don't validate gossip attestations in batch */
beaconAttestationBatchValidation?: boolean;
};

export type ValidatorFnsModules = {
Expand Down Expand Up @@ -242,6 +246,124 @@ export function getGossipHandlers(modules: ValidatorFnsModules, options: GossipH
});
}

async function beaconAttestationBatchHandler(
gossipHandlerParams: GossipHandlerParamGeneric<GossipType.beacon_attestation>[]
): Promise<(null | AttestationError)[]> {
const results: (null | AttestationError)[] = [];
const attestationCount = gossipHandlerParams.length;
if (attestationCount === 0) {
return results;
}
// all attestations should have same attestation data as filtered by network processor
const {subnet, fork} = gossipHandlerParams[0].topic;
const validationParams = gossipHandlerParams.map((param) => ({
attestation: null,
serializedData: param.gossipData.serializedData,
attSlot: param.gossipData.msgSlot,
attDataBase64: param.gossipData.indexed,
})) as AttestationOrBytes[];
const {results: validationResults, batchableBls} = await validateGossipAttestationsSameAttData(
fork,
chain,
validationParams,
subnet
);
for (const [i, validationResult] of validationResults.entries()) {
if (validationResult.err) {
results.push(validationResult.err as AttestationError);
continue;
}

results.push(null);

// Handler
const {indexedAttestation, attDataRootHex, attestation} = validationResult.result;
metrics?.registerGossipUnaggregatedAttestation(gossipHandlerParams[i].seenTimestampSec, indexedAttestation);

try {
// Node may be subscribe to extra subnets (long-lived random subnets). For those, validate the messages
// but don't add to attestation pool, to save CPU and RAM
if (aggregatorTracker.shouldAggregate(subnet, indexedAttestation.data.slot)) {
const insertOutcome = chain.attestationPool.add(attestation, attDataRootHex);
metrics?.opPool.attestationPoolInsertOutcome.inc({insertOutcome});
}
} catch (e) {
logger.error("Error adding unaggregated attestation to pool", {subnet}, e as Error);
}

if (!options.dontSendGossipAttestationsToForkchoice) {
try {
chain.forkChoice.onAttestation(indexedAttestation, attDataRootHex);
} catch (e) {
logger.debug("Error adding gossip unaggregated attestation to forkchoice", {subnet}, e as Error);
}
}

chain.emitter.emit(routes.events.EventType.attestation, attestation);
}

if (batchableBls) {
metrics?.gossipAttestation.attestationBatchHistogram.observe(attestationCount);
} else {
metrics?.gossipAttestation.attestationNonBatchCount.inc(attestationCount);
}

return results;
}

async function beaconAttestationHandler({
gossipData,
topic,
seenTimestampSec,
}: GossipHandlerParamGeneric<GossipType.beacon_attestation>): Promise<void> {
const {serializedData, msgSlot} = gossipData;
if (msgSlot == undefined) {
throw Error("msgSlot is undefined for beacon_attestation topic");
}
const {subnet, fork} = topic;

// do not deserialize gossipSerializedData here, it's done in validateGossipAttestation only if needed
let validationResult: AttestationValidationResult;
try {
validationResult = await validateGossipAttestation(
fork,
chain,
{attestation: null, serializedData, attSlot: msgSlot},
subnet
);
} catch (e) {
if (e instanceof AttestationError && e.action === GossipAction.REJECT) {
chain.persistInvalidSszBytes(ssz.phase0.Attestation.typeName, serializedData, "gossip_reject");
}
throw e;
}

// Handler
const {indexedAttestation, attDataRootHex, attestation} = validationResult;
metrics?.registerGossipUnaggregatedAttestation(seenTimestampSec, indexedAttestation);

try {
// Node may be subscribe to extra subnets (long-lived random subnets). For those, validate the messages
// but don't add to attestation pool, to save CPU and RAM
if (aggregatorTracker.shouldAggregate(subnet, indexedAttestation.data.slot)) {
const insertOutcome = chain.attestationPool.add(attestation, attDataRootHex);
metrics?.opPool.attestationPoolInsertOutcome.inc({insertOutcome});
}
} catch (e) {
logger.error("Error adding unaggregated attestation to pool", {subnet}, e as Error);
}

if (!options.dontSendGossipAttestationsToForkchoice) {
try {
chain.forkChoice.onAttestation(indexedAttestation, attDataRootHex);
} catch (e) {
logger.debug("Error adding gossip unaggregated attestation to forkchoice", {subnet}, e as Error);
}
}

chain.emitter.emit(routes.events.EventType.attestation, attestation);
}

return {
[GossipType.beacon_block]: async ({
gossipData,
Expand Down Expand Up @@ -338,70 +460,9 @@ export function getGossipHandlers(modules: ValidatorFnsModules, options: GossipH

chain.emitter.emit(routes.events.EventType.attestation, signedAggregateAndProof.message.aggregate);
},
[GossipType.beacon_attestation]: async (
gossipHandlerParams: GossipHandlerParamGeneric<GossipType.beacon_attestation>[]
) => {
const results: (null | AttestationError)[] = [];
const attestationCount = gossipHandlerParams.length;
if (attestationCount === 0) {
return results;
}
// all attestations should have same attestation data as filtered by network processor
const {subnet, fork} = gossipHandlerParams[0].topic;
const validationParams = gossipHandlerParams.map((param) => ({
attestation: null,
serializedData: param.gossipData.serializedData,
attSlot: param.gossipData.msgSlot,
attDataBase64: param.gossipData.indexed,
})) as AttestationOrBytes[];
const {results: validationResults, batchableBls} = await validateGossipAttestationsSameAttData(
fork,
chain,
validationParams,
subnet
);
for (const [i, validationResult] of validationResults.entries()) {
if (validationResult.err) {
results.push(validationResult.err as AttestationError);
continue;
}

results.push(null);

// Handler
const {indexedAttestation, attDataRootHex, attestation} = validationResult.result;
metrics?.registerGossipUnaggregatedAttestation(gossipHandlerParams[i].seenTimestampSec, indexedAttestation);

try {
// Node may be subscribe to extra subnets (long-lived random subnets). For those, validate the messages
// but don't add to attestation pool, to save CPU and RAM
if (aggregatorTracker.shouldAggregate(subnet, indexedAttestation.data.slot)) {
const insertOutcome = chain.attestationPool.add(attestation, attDataRootHex);
metrics?.opPool.attestationPoolInsertOutcome.inc({insertOutcome});
}
} catch (e) {
logger.error("Error adding unaggregated attestation to pool", {subnet}, e as Error);
}

if (!options.dontSendGossipAttestationsToForkchoice) {
try {
chain.forkChoice.onAttestation(indexedAttestation, attDataRootHex);
} catch (e) {
logger.debug("Error adding gossip unaggregated attestation to forkchoice", {subnet}, e as Error);
}
}

chain.emitter.emit(routes.events.EventType.attestation, attestation);
}

if (batchableBls) {
metrics?.gossipAttestation.attestationBatchHistogram.observe(attestationCount);
} else {
metrics?.gossipAttestation.attestationNonBatchCount.inc(attestationCount);
}

return results;
},
[GossipType.beacon_attestation]: options.beaconAttestationBatchValidation
? beaconAttestationBatchHandler
: beaconAttestationHandler,

[GossipType.attester_slashing]: async ({
gossipData,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,12 @@ import {IndexedGossipQueueAvgTime} from "./indexedAvgTime.js";
* In normal condition, the higher this value the more efficient the signature verification.
* However, if at least 1 signature is invalid, we need to verify each signature separately.
*/
const MAX_GOSSIP_ATTESTATION_BATCH_SIZE = 128;
const MAX_GOSSIP_ATTESTATION_BATCH_SIZE = 64;

/**
* Batching signatures have the cost of signature aggregation which blocks the main thread.
* We should only batch verify when there are at least 32 signatures.
* Minimum signature sets to batch verify without waiting for 50ms.
*/
export const MIN_SIGNATURE_SETS_TO_BATCH_VERIFY = 32;
export const MIN_SIGNATURE_SETS_TO_BATCH_VERIFY = 16;

/**
* Numbers from https://github.com/sigp/lighthouse/blob/b34a79dc0b02e04441ba01fd0f304d1e203d877d/beacon_node/network/src/beacon_processor/mod.rs#L69
Expand Down
2 changes: 1 addition & 1 deletion packages/beacon-node/src/network/processor/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ export class NetworkProcessor {
this.metrics = metrics;
this.logger = logger;
this.events = events;
this.gossipQueues = createGossipQueues(this.chain.opts.beaconAttestationBatchValidation);
this.gossipQueues = createGossipQueues(this.opts.beaconAttestationBatchValidation);
this.gossipTopicConcurrency = mapValues(this.gossipQueues, () => 0);
this.gossipValidatorFn = getGossipValidatorFn(modules.gossipHandlers ?? getGossipHandlers(modules, opts), modules);
this.gossipValidatorBatchFn = getGossipValidatorBatchFn(
Expand Down
9 changes: 0 additions & 9 deletions packages/cli/src/options/beaconNodeOptions/chain.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ export type ChainArgs = {
"chain.archiveStateEpochFrequency": number;
emitPayloadAttributes?: boolean;
broadcastValidationStrictness?: string;
"chain.beaconAttestationBatchValidation"?: boolean;
"chain.minSameMessageSignatureSetsToBatch"?: number;
};

Expand All @@ -48,7 +47,6 @@ export function parseArgs(args: ChainArgs): IBeaconNodeOptions["chain"] {
archiveStateEpochFrequency: args["chain.archiveStateEpochFrequency"],
emitPayloadAttributes: args["emitPayloadAttributes"],
broadcastValidationStrictness: args["broadcastValidationStrictness"],
beaconAttestationBatchValidation: args["chain.beaconAttestationBatchValidation"],
minSameMessageSignatureSetsToBatch:
args["chain.minSameMessageSignatureSetsToBatch"] ?? defaultOptions.chain.minSameMessageSignatureSetsToBatch,
};
Expand Down Expand Up @@ -188,13 +186,6 @@ Will double processing times. Use only for debugging purposes.",
default: "warn",
},

"chain.beaconAttestationBatchValidation": {
hidden: true,
description: "Enable beacon attestation batch validation",
type: "boolean",
group: "chain",
},

"chain.minSameMessageSignatureSetsToBatch": {
hidden: true,
description: "Minimum number of same message signature sets to batch",
Expand Down
9 changes: 9 additions & 0 deletions packages/cli/src/options/beaconNodeOptions/network.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ export type NetworkArgs = {
"network.connectToDiscv5Bootnodes"?: boolean;
"network.discv5FirstQueryDelayMs"?: number;
"network.dontSendGossipAttestationsToForkchoice"?: boolean;
"network.beaconAttestationBatchValidation"?: boolean;
"network.allowPublishToZeroPeers"?: boolean;
"network.gossipsubD"?: number;
"network.gossipsubDLow"?: number;
Expand Down Expand Up @@ -142,6 +143,7 @@ export function parseArgs(args: NetworkArgs): IBeaconNodeOptions["network"] {
connectToDiscv5Bootnodes: args["network.connectToDiscv5Bootnodes"],
discv5FirstQueryDelayMs: args["network.discv5FirstQueryDelayMs"],
dontSendGossipAttestationsToForkchoice: args["network.dontSendGossipAttestationsToForkchoice"],
beaconAttestationBatchValidation: args["network.beaconAttestationBatchValidation"],
allowPublishToZeroPeers: args["network.allowPublishToZeroPeers"],
gossipsubD: args["network.gossipsubD"],
gossipsubDLow: args["network.gossipsubDLow"],
Expand Down Expand Up @@ -323,6 +325,13 @@ export const options: CliCommandOptions<NetworkArgs> = {
group: "network",
},

"network.beaconAttestationBatchValidation": {
hidden: true,
type: "boolean",
description: "Validate gossip attestations in batches",
group: "network",
},

"network.allowPublishToZeroPeers": {
hidden: true,
type: "boolean",
Expand Down
Loading

0 comments on commit 36db4a6

Please sign in to comment.