Skip to content

Commit

Permalink
feat: verify gossip attestation messages in batch (#5896)
Browse files Browse the repository at this point in the history
* feat: validate gossip attestations same att data in batch

* feat: consume bls verifySignatureSetsSameMessage api

* chore: metrics to count percent of attestations batched

* feat: add minSameMessageSignatureSetsToBatch flag

* chore: remove unused metrics

* feat: chain.beaconAttestationBatchValidation cli option

* fix: create worker with defaultPoolSize - 1

* feat: enforce each queue item 50ms old

* chore: switch back to regular Map in IndexedGossipQueueMinSize

* fix: network.beaconAttestationBatchValidation flag

* fix: useFakeTimer in beforeEach

* fix: address attDataBase64 for both linear & indexed queues

* refactor: add getBatchHandlers

* fix: block event handler in the next event loop

* chore: refactor GossipHandlers type

* chore: set min and max batch to 32 & 128

* fix: for await in validateGossipAttestationsSameAttData

* chore: refactor phase* in batch function to step*

* fix: add and use blsPoolSize which is defaultPoolSize - 1

* chore: address PR comments

* feat: more metrics for the IndexedGossipQueue
  • Loading branch information
twoeths authored Aug 22, 2023
1 parent 8c6ad38 commit 7ee07da
Show file tree
Hide file tree
Showing 29 changed files with 1,134 additions and 261 deletions.
13 changes: 8 additions & 5 deletions packages/beacon-node/src/chain/blocks/importBlock.ts
Original file line number Diff line number Diff line change
Expand Up @@ -89,11 +89,14 @@ export async function importBlock(

this.metrics?.importBlock.bySource.inc({source});
this.logger.verbose("Added block to forkchoice and state cache", {slot: block.message.slot, root: blockRootHex});
this.emitter.emit(routes.events.EventType.block, {
block: toHexString(this.config.getForkTypes(block.message.slot).BeaconBlock.hashTreeRoot(block.message)),
slot: block.message.slot,
executionOptimistic: blockSummary != null && isOptimisticBlock(blockSummary),
});
// We want to import block asap so call all event handler in the next event loop
setTimeout(() => {
this.emitter.emit(routes.events.EventType.block, {
block: toHexString(this.config.getForkTypes(block.message.slot).BeaconBlock.hashTreeRoot(block.message)),
slot: block.message.slot,
executionOptimistic: blockSummary != null && isOptimisticBlock(blockSummary),
});
}, 0);

// 3. Import attestations to fork choice
//
Expand Down
7 changes: 5 additions & 2 deletions packages/beacon-node/src/chain/bls/multithread/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,9 @@ export type BlsMultiThreadWorkerPoolOptions = {
blsVerifyAllMultiThread?: boolean;
};

// 1 worker for the main thread
const blsPoolSize = Math.max(defaultPoolSize - 1, 1);

/**
* Split big signature sets into smaller sets so they can be sent to multiple workers.
*
Expand Down Expand Up @@ -133,7 +136,7 @@ export class BlsMultiThreadWorkerPool implements IBlsVerifier {
// THe worker is not able to deserialize from uncompressed
// `Error: err _wrapDeserialize`
this.format = implementation === "blst-native" ? PointFormat.uncompressed : PointFormat.compressed;
this.workers = this.createWorkers(implementation, defaultPoolSize);
this.workers = this.createWorkers(implementation, blsPoolSize);

if (metrics) {
metrics.blsThreadPool.queueLength.addCollect(() => {
Expand All @@ -145,7 +148,7 @@ export class BlsMultiThreadWorkerPool implements IBlsVerifier {

canAcceptWork(): boolean {
return (
this.workersBusy < defaultPoolSize &&
this.workersBusy < blsPoolSize &&
// TODO: Should also bound the jobs queue?
this.jobs.length < MAX_JOBS_CAN_ACCEPT_WORK
);
Expand Down
5 changes: 5 additions & 0 deletions packages/beacon-node/src/chain/options.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ export type IChainOptions = BlockProcessOpts &
/** Option to load a custom kzg trusted setup in txt format */
trustedSetup?: string;
broadcastValidationStrictness?: string;
minSameMessageSignatureSetsToBatch: number;
};

export type BlockProcessOpts = {
Expand Down Expand Up @@ -83,4 +84,8 @@ export const defaultChainOptions: IChainOptions = {
// for attestation validation, having this value ensures we don't have to regen states most of the time
maxSkipSlots: 32,
broadcastValidationStrictness: "warn",
// should be less than or equal to MIN_SIGNATURE_SETS_TO_BATCH_VERIFY
// batching too much may block the I/O thread so if useWorker=false, suggest this value to be 32
// since this batch attestation work is designed to work with useWorker=true, make this the lowest value
minSameMessageSignatureSetsToBatch: 2,
};
191 changes: 156 additions & 35 deletions packages/beacon-node/src/chain/validation/attestation.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,10 @@ import {ATTESTATION_SUBNET_COUNT, SLOTS_PER_EPOCH, ForkName, ForkSeq} from "@lod
import {
computeEpochAtSlot,
CachedBeaconStateAllForks,
ISignatureSet,
getAttestationDataSigningRoot,
createSingleSignatureSetFromComponents,
SingleSignatureSet,
} from "@lodestar/state-transition";
import {IBeaconChain} from "..";
import {AttestationError, AttestationErrorCode, GossipAction} from "../errors/index.js";
import {MAXIMUM_GOSSIP_CLOCK_DISPARITY_SEC} from "../../constants/index.js";
import {RegenCaller} from "../regen/index.js";
Expand All @@ -21,6 +20,13 @@ import {
} from "../../util/sszBytes.js";
import {AttestationDataCacheEntry} from "../seenCache/seenAttestationData.js";
import {sszDeserializeAttestation} from "../../network/gossip/topic.js";
import {Result, wrapError} from "../../util/wrapError.js";
import {IBeaconChain} from "../interface.js";

export type BatchResult = {
results: Result<AttestationValidationResult>[];
batchableBls: boolean;
};

export type AttestationValidationResult = {
attestation: phase0.Attestation;
Expand All @@ -40,6 +46,12 @@ export type GossipAttestation = {
serializedData: Uint8Array;
// available in NetworkProcessor since we check for unknown block root attestations
attSlot: Slot;
attDataBase64?: string | null;
};

export type Step0Result = AttestationValidationResult & {
signatureSet: SingleSignatureSet;
validatorIndex: number;
};

/**
Expand All @@ -49,11 +61,7 @@ export type GossipAttestation = {
const SHUFFLING_LOOK_AHEAD_EPOCHS = 1;

/**
* Validate attestations from gossip
* - Only deserialize the attestation if needed, use the cached AttestationData instead
* - This is to avoid deserializing similar attestation multiple times which could help the gc
* - subnet is required
* - do not prioritize bls signature set
* Validate a single gossip attestation, do not prioritize bls signature set
*/
export async function validateGossipAttestation(
fork: ForkName,
Expand All @@ -62,7 +70,110 @@ export async function validateGossipAttestation(
/** Optional, to allow verifying attestations through API with unknown subnet */
subnet: number
): Promise<AttestationValidationResult> {
return validateAttestation(fork, chain, attestationOrBytes, subnet);
const prioritizeBls = false;
return validateAttestation(fork, chain, attestationOrBytes, subnet, prioritizeBls);
}

/**
* Verify gossip attestations of the same attestation data. The main advantage is we can batch verify bls signatures
* through verifySignatureSetsSameMessage bls api to improve performance.
* - If there are less than 2 signatures (minSameMessageSignatureSetsToBatch), verify each signature individually with batchable = true
* - do not prioritize bls signature set
*/
export async function validateGossipAttestationsSameAttData(
fork: ForkName,
chain: IBeaconChain,
attestationOrBytesArr: AttestationOrBytes[],
subnet: number,
// for unit test, consumers do not need to pass this
step0ValidationFn = validateGossipAttestationNoSignatureCheck
): Promise<BatchResult> {
if (attestationOrBytesArr.length === 0) {
return {results: [], batchableBls: false};
}

// step0: do all verifications except for signature verification
// this for await pattern below seems to be bad but it's not
// for seen AttestationData, it's the same to await Promise.all() pattern
// for unseen AttestationData, the 1st call will be cached and the rest will be fast
const step0ResultOrErrors: Result<Step0Result>[] = [];
for (const attestationOrBytes of attestationOrBytesArr) {
const resultOrError = await wrapError(step0ValidationFn(fork, chain, attestationOrBytes, subnet));
step0ResultOrErrors.push(resultOrError);
}

// step1: verify signatures of all valid attestations
// map new index to index in resultOrErrors
const newIndexToOldIndex = new Map<number, number>();
const signatureSets: SingleSignatureSet[] = [];
let newIndex = 0;
const step0Results: Step0Result[] = [];
for (const [i, resultOrError] of step0ResultOrErrors.entries()) {
if (resultOrError.err) {
continue;
}
step0Results.push(resultOrError.result);
newIndexToOldIndex.set(newIndex, i);
signatureSets.push(resultOrError.result.signatureSet);
newIndex++;
}

let signatureValids: boolean[];
const batchableBls = signatureSets.length >= chain.opts.minSameMessageSignatureSetsToBatch;
if (batchableBls) {
// all signature sets should have same signing root since we filtered in network processor
signatureValids = await chain.bls.verifySignatureSetsSameMessage(
signatureSets.map((set) => ({publicKey: set.pubkey, signature: set.signature})),
signatureSets[0].signingRoot
);
} else {
// don't want to block the main thread if there are too few signatures
signatureValids = await Promise.all(
signatureSets.map((set) => chain.bls.verifySignatureSets([set], {batchable: true}))
);
}

// phase0 post validation
for (const [i, sigValid] of signatureValids.entries()) {
const oldIndex = newIndexToOldIndex.get(i);
if (oldIndex == null) {
// should not happen
throw Error(`Cannot get old index for index ${i}`);
}

const {validatorIndex, attestation} = step0Results[i];
const targetEpoch = attestation.data.target.epoch;
if (sigValid) {
// Now that the attestation has been fully verified, store that we have received a valid attestation from this validator.
//
// It's important to double check that the attestation still hasn't been observed, since
// there can be a race-condition if we receive two attestations at the same time and
// process them in different threads.
if (chain.seenAttesters.isKnown(targetEpoch, validatorIndex)) {
step0ResultOrErrors[oldIndex] = {
err: new AttestationError(GossipAction.IGNORE, {
code: AttestationErrorCode.ATTESTATION_ALREADY_KNOWN,
targetEpoch,
validatorIndex,
}),
};
}

// valid
chain.seenAttesters.add(targetEpoch, validatorIndex);
} else {
step0ResultOrErrors[oldIndex] = {
err: new AttestationError(GossipAction.IGNORE, {
code: AttestationErrorCode.INVALID_SIGNATURE,
}),
};
}
}

return {
results: step0ResultOrErrors,
batchableBls,
};
}

/**
Expand All @@ -81,17 +192,42 @@ export async function validateApiAttestation(
}

/**
* Only deserialize the attestation if needed, use the cached AttestationData instead
* This is to avoid deserializing similar attestation multiple times which could help the gc
* Validate a single unaggregated attestation
* subnet is null for api attestations
*/
async function validateAttestation(
export async function validateAttestation(
fork: ForkName,
chain: IBeaconChain,
attestationOrBytes: AttestationOrBytes,
/** Optional, to allow verifying attestations through API with unknown subnet */
subnet: number | null,
prioritizeBls = false
): Promise<AttestationValidationResult> {
const step0Result = await validateGossipAttestationNoSignatureCheck(fork, chain, attestationOrBytes, subnet);
const {attestation, signatureSet, validatorIndex} = step0Result;
const isValid = await chain.bls.verifySignatureSets([signatureSet], {batchable: true, priority: prioritizeBls});

if (isValid) {
const targetEpoch = attestation.data.target.epoch;
chain.seenAttesters.add(targetEpoch, validatorIndex);
return step0Result;
} else {
throw new AttestationError(GossipAction.IGNORE, {
code: AttestationErrorCode.INVALID_SIGNATURE,
});
}
}

/**
* Only deserialize the attestation if needed, use the cached AttestationData instead
* This is to avoid deserializing similar attestation multiple times which could help the gc
*/
async function validateGossipAttestationNoSignatureCheck(
fork: ForkName,
chain: IBeaconChain,
attestationOrBytes: AttestationOrBytes,
/** Optional, to allow verifying attestations through API with unknown subnet */
subnet: number | null
): Promise<Step0Result> {
// Do checks in this order:
// - do early checks (w/o indexed attestation)
// - > obtain indexed attestation and committes per slot
Expand All @@ -105,11 +241,14 @@ async function validateAttestation(
let attestationOrCache:
| {attestation: phase0.Attestation; cache: null}
| {attestation: null; cache: AttestationDataCacheEntry; serializedData: Uint8Array};
let attDataBase64: AttDataBase64 | null;
let attDataBase64: AttDataBase64 | null = null;
if (attestationOrBytes.serializedData) {
// gossip
attDataBase64 = getAttDataBase64FromAttestationSerialized(attestationOrBytes.serializedData);
const attSlot = attestationOrBytes.attSlot;
// for old LIFO linear gossip queue we don't have attDataBase64
// for indexed gossip queue we have attDataBase64
attDataBase64 =
attestationOrBytes.attDataBase64 ?? getAttDataBase64FromAttestationSerialized(attestationOrBytes.serializedData);
const cachedAttData = attDataBase64 !== null ? chain.seenAttestationDatas.get(attSlot, attDataBase64) : null;
if (cachedAttData === null) {
const attestation = sszDeserializeAttestation(attestationOrBytes.serializedData);
Expand Down Expand Up @@ -263,7 +402,7 @@ async function validateAttestation(

// [REJECT] The signature of attestation is valid.
const attestingIndices = [validatorIndex];
let signatureSet: ISignatureSet;
let signatureSet: SingleSignatureSet;
let attDataRootHex: RootHex;
const signature = attestationOrCache.attestation
? attestationOrCache.attestation.signature
Expand Down Expand Up @@ -304,25 +443,7 @@ async function validateAttestation(
}
}

if (!(await chain.bls.verifySignatureSets([signatureSet], {batchable: true, priority: prioritizeBls}))) {
throw new AttestationError(GossipAction.REJECT, {code: AttestationErrorCode.INVALID_SIGNATURE});
}

// Now that the attestation has been fully verified, store that we have received a valid attestation from this validator.
//
// It's important to double check that the attestation still hasn't been observed, since
// there can be a race-condition if we receive two attestations at the same time and
// process them in different threads.
if (chain.seenAttesters.isKnown(targetEpoch, validatorIndex)) {
throw new AttestationError(GossipAction.IGNORE, {
code: AttestationErrorCode.ATTESTATION_ALREADY_KNOWN,
targetEpoch,
validatorIndex,
});
}

chain.seenAttesters.add(targetEpoch, validatorIndex);

// no signature check, leave that for step1
const indexedAttestation: phase0.IndexedAttestation = {
attestingIndices,
data: attData,
Expand All @@ -336,7 +457,7 @@ async function validateAttestation(
data: attData,
signature,
};
return {attestation, indexedAttestation, subnet: expectedSubnet, attDataRootHex};
return {attestation, indexedAttestation, subnet: expectedSubnet, attDataRootHex, signatureSet, validatorIndex};
}

/**
Expand Down
26 changes: 23 additions & 3 deletions packages/beacon-node/src/metrics/metrics/lodestar.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,9 @@ export function createLodestarMetrics(
help: "Count of total gossip validation queue length",
labelNames: ["topic"],
}),
dropRatio: register.gauge<"topic">({
name: "lodestar_gossip_validation_queue_current_drop_ratio",
help: "Current drop ratio of gossip validation queue",
keySize: register.gauge<"topic">({
name: "lodestar_gossip_validation_queue_key_size",
help: "Count of total gossip validation queue key size",
labelNames: ["topic"],
}),
droppedJobs: register.gauge<"topic">({
Expand All @@ -65,6 +65,17 @@ export function createLodestarMetrics(
help: "Current count of jobs being run on network processor for topic",
labelNames: ["topic"],
}),
// this metric links to the beacon_attestation topic only as this is the only topics that are batch
keyAge: register.histogram({
name: "lodestar_gossip_validation_queue_key_age_seconds",
help: "Age of the first item of each key in the indexed queues in seconds",
buckets: [0.01, 0.02, 0.05, 0.1, 0.2, 0.5, 1, 5],
}),
queueTime: register.histogram({
name: "lodestar_gossip_validation_queue_time_seconds",
help: "Total time an item stays in queue until it is processed in seconds",
buckets: [0.01, 0.02, 0.05, 0.1, 0.2, 0.5, 1, 5],
}),
},

networkProcessor: {
Expand Down Expand Up @@ -575,6 +586,15 @@ export function createLodestarMetrics(
labelNames: ["caller"],
buckets: [0, 1, 2, 4, 8, 16, 32, 64],
}),
attestationBatchHistogram: register.histogram({
name: "lodestar_gossip_attestation_verified_in_batch_histogram",
help: "Number of attestations verified in batch",
buckets: [1, 2, 4, 8, 16, 32, 64, 128],
}),
attestationNonBatchCount: register.gauge({
name: "lodestar_gossip_attestation_verified_non_batch_count",
help: "Count of attestations NOT verified in batch",
}),
},

// Gossip block
Expand Down
Loading

0 comments on commit 7ee07da

Please sign in to comment.