Skip to content

Commit

Permalink
refactor: add getBatchHandlers
Browse files Browse the repository at this point in the history
  • Loading branch information
twoeths committed Aug 19, 2023
1 parent 37cfb6e commit 1116943
Show file tree
Hide file tree
Showing 2 changed files with 139 additions and 121 deletions.
1 change: 1 addition & 0 deletions packages/beacon-node/src/network/gossip/interface.ts
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,7 @@ export type GossipHandlerParamGeneric<T extends GossipType> = {
export type GossipHandlers = {
[K in GossipType]:
| ((gossipHandlerParam: GossipHandlerParamGeneric<K>) => Promise<void>)
// TODO: make it generic
| ((gossipHandlerParams: GossipHandlerParamGeneric<K>[]) => Promise<(null | AttestationError)[]>);
};

Expand Down
259 changes: 138 additions & 121 deletions packages/beacon-node/src/network/processor/gossipHandlers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,19 @@ const MAX_UNKNOWN_BLOCK_ROOT_RETRIES = 1;
* - Ethereum Consensus gossipsub protocol strictly defined a single topic for message
*/
export function getGossipHandlers(modules: ValidatorFnsModules, options: GossipHandlerOpts): GossipHandlers {
const defaultHandlers = getDefaultHandlers(modules, options);
if (options.beaconAttestationBatchValidation) {
const batchHandlers = getBatchHandlers(modules, options);
return {...defaultHandlers, ...batchHandlers};
}
return defaultHandlers;
}

/**
* Default handlers validate gossip messages one by one.
* We only have a choice to do batch validation for beacon_attestation topic.
*/
function getDefaultHandlers(modules: ValidatorFnsModules, options: GossipHandlerOpts): GossipHandlers {
const {chain, config, metrics, events, logger, core, aggregatorTracker} = modules;

async function validateBeaconBlock(
Expand Down Expand Up @@ -246,124 +259,6 @@ export function getGossipHandlers(modules: ValidatorFnsModules, options: GossipH
});
}

async function beaconAttestationBatchHandler(
gossipHandlerParams: GossipHandlerParamGeneric<GossipType.beacon_attestation>[]
): Promise<(null | AttestationError)[]> {
const results: (null | AttestationError)[] = [];
const attestationCount = gossipHandlerParams.length;
if (attestationCount === 0) {
return results;
}
// all attestations should have same attestation data as filtered by network processor
const {subnet, fork} = gossipHandlerParams[0].topic;
const validationParams = gossipHandlerParams.map((param) => ({
attestation: null,
serializedData: param.gossipData.serializedData,
attSlot: param.gossipData.msgSlot,
attDataBase64: param.gossipData.indexed,
})) as AttestationOrBytes[];
const {results: validationResults, batchableBls} = await validateGossipAttestationsSameAttData(
fork,
chain,
validationParams,
subnet
);
for (const [i, validationResult] of validationResults.entries()) {
if (validationResult.err) {
results.push(validationResult.err as AttestationError);
continue;
}

results.push(null);

// Handler
const {indexedAttestation, attDataRootHex, attestation} = validationResult.result;
metrics?.registerGossipUnaggregatedAttestation(gossipHandlerParams[i].seenTimestampSec, indexedAttestation);

try {
// Node may be subscribe to extra subnets (long-lived random subnets). For those, validate the messages
// but don't add to attestation pool, to save CPU and RAM
if (aggregatorTracker.shouldAggregate(subnet, indexedAttestation.data.slot)) {
const insertOutcome = chain.attestationPool.add(attestation, attDataRootHex);
metrics?.opPool.attestationPoolInsertOutcome.inc({insertOutcome});
}
} catch (e) {
logger.error("Error adding unaggregated attestation to pool", {subnet}, e as Error);
}

if (!options.dontSendGossipAttestationsToForkchoice) {
try {
chain.forkChoice.onAttestation(indexedAttestation, attDataRootHex);
} catch (e) {
logger.debug("Error adding gossip unaggregated attestation to forkchoice", {subnet}, e as Error);
}
}

chain.emitter.emit(routes.events.EventType.attestation, attestation);
}

if (batchableBls) {
metrics?.gossipAttestation.attestationBatchHistogram.observe(attestationCount);
} else {
metrics?.gossipAttestation.attestationNonBatchCount.inc(attestationCount);
}

return results;
}

async function beaconAttestationHandler({
gossipData,
topic,
seenTimestampSec,
}: GossipHandlerParamGeneric<GossipType.beacon_attestation>): Promise<void> {
const {serializedData, msgSlot} = gossipData;
if (msgSlot == undefined) {
throw Error("msgSlot is undefined for beacon_attestation topic");
}
const {subnet, fork} = topic;

// do not deserialize gossipSerializedData here, it's done in validateGossipAttestation only if needed
let validationResult: AttestationValidationResult;
try {
validationResult = await validateGossipAttestation(
fork,
chain,
{attestation: null, serializedData, attSlot: msgSlot},
subnet
);
} catch (e) {
if (e instanceof AttestationError && e.action === GossipAction.REJECT) {
chain.persistInvalidSszBytes(ssz.phase0.Attestation.typeName, serializedData, "gossip_reject");
}
throw e;
}

// Handler
const {indexedAttestation, attDataRootHex, attestation} = validationResult;
metrics?.registerGossipUnaggregatedAttestation(seenTimestampSec, indexedAttestation);

try {
// Node may be subscribe to extra subnets (long-lived random subnets). For those, validate the messages
// but don't add to attestation pool, to save CPU and RAM
if (aggregatorTracker.shouldAggregate(subnet, indexedAttestation.data.slot)) {
const insertOutcome = chain.attestationPool.add(attestation, attDataRootHex);
metrics?.opPool.attestationPoolInsertOutcome.inc({insertOutcome});
}
} catch (e) {
logger.error("Error adding unaggregated attestation to pool", {subnet}, e as Error);
}

if (!options.dontSendGossipAttestationsToForkchoice) {
try {
chain.forkChoice.onAttestation(indexedAttestation, attDataRootHex);
} catch (e) {
logger.debug("Error adding gossip unaggregated attestation to forkchoice", {subnet}, e as Error);
}
}

chain.emitter.emit(routes.events.EventType.attestation, attestation);
}

return {
[GossipType.beacon_block]: async ({
gossipData,
Expand Down Expand Up @@ -460,9 +355,58 @@ export function getGossipHandlers(modules: ValidatorFnsModules, options: GossipH

chain.emitter.emit(routes.events.EventType.attestation, signedAggregateAndProof.message.aggregate);
},
[GossipType.beacon_attestation]: options.beaconAttestationBatchValidation
? beaconAttestationBatchHandler
: beaconAttestationHandler,
[GossipType.beacon_attestation]: async ({
gossipData,
topic,
seenTimestampSec,
}: GossipHandlerParamGeneric<GossipType.beacon_attestation>): Promise<void> => {
const {serializedData, msgSlot} = gossipData;
if (msgSlot == undefined) {
throw Error("msgSlot is undefined for beacon_attestation topic");
}
const {subnet, fork} = topic;

// do not deserialize gossipSerializedData here, it's done in validateGossipAttestation only if needed
let validationResult: AttestationValidationResult;
try {
validationResult = await validateGossipAttestation(
fork,
chain,
{attestation: null, serializedData, attSlot: msgSlot},
subnet
);
} catch (e) {
if (e instanceof AttestationError && e.action === GossipAction.REJECT) {
chain.persistInvalidSszBytes(ssz.phase0.Attestation.typeName, serializedData, "gossip_reject");
}
throw e;
}

// Handler
const {indexedAttestation, attDataRootHex, attestation} = validationResult;
metrics?.registerGossipUnaggregatedAttestation(seenTimestampSec, indexedAttestation);

try {
// Node may be subscribe to extra subnets (long-lived random subnets). For those, validate the messages
// but don't add to attestation pool, to save CPU and RAM
if (aggregatorTracker.shouldAggregate(subnet, indexedAttestation.data.slot)) {
const insertOutcome = chain.attestationPool.add(attestation, attDataRootHex);
metrics?.opPool.attestationPoolInsertOutcome.inc({insertOutcome});
}
} catch (e) {
logger.error("Error adding unaggregated attestation to pool", {subnet}, e as Error);
}

if (!options.dontSendGossipAttestationsToForkchoice) {
try {
chain.forkChoice.onAttestation(indexedAttestation, attDataRootHex);
} catch (e) {
logger.debug("Error adding gossip unaggregated attestation to forkchoice", {subnet}, e as Error);
}
}

chain.emitter.emit(routes.events.EventType.attestation, attestation);
},

[GossipType.attester_slashing]: async ({
gossipData,
Expand Down Expand Up @@ -606,6 +550,79 @@ export function getGossipHandlers(modules: ValidatorFnsModules, options: GossipH
};
}

/**
* For now, only beacon_attestation topic is batched.
*/
function getBatchHandlers(modules: ValidatorFnsModules, options: GossipHandlerOpts): Partial<GossipHandlers> {
const {chain, metrics, logger, aggregatorTracker} = modules;
return {
[GossipType.beacon_attestation]: async (
gossipHandlerParams: GossipHandlerParamGeneric<GossipType.beacon_attestation>[]
): Promise<(null | AttestationError)[]> => {
const results: (null | AttestationError)[] = [];
const attestationCount = gossipHandlerParams.length;
if (attestationCount === 0) {
return results;
}
// all attestations should have same attestation data as filtered by network processor
const {subnet, fork} = gossipHandlerParams[0].topic;
const validationParams = gossipHandlerParams.map((param) => ({
attestation: null,
serializedData: param.gossipData.serializedData,
attSlot: param.gossipData.msgSlot,
attDataBase64: param.gossipData.indexed,
})) as AttestationOrBytes[];
const {results: validationResults, batchableBls} = await validateGossipAttestationsSameAttData(
fork,
chain,
validationParams,
subnet
);
for (const [i, validationResult] of validationResults.entries()) {
if (validationResult.err) {
results.push(validationResult.err as AttestationError);
continue;
}

results.push(null);

// Handler
const {indexedAttestation, attDataRootHex, attestation} = validationResult.result;
metrics?.registerGossipUnaggregatedAttestation(gossipHandlerParams[i].seenTimestampSec, indexedAttestation);

try {
// Node may be subscribe to extra subnets (long-lived random subnets). For those, validate the messages
// but don't add to attestation pool, to save CPU and RAM
if (aggregatorTracker.shouldAggregate(subnet, indexedAttestation.data.slot)) {
const insertOutcome = chain.attestationPool.add(attestation, attDataRootHex);
metrics?.opPool.attestationPoolInsertOutcome.inc({insertOutcome});
}
} catch (e) {
logger.error("Error adding unaggregated attestation to pool", {subnet}, e as Error);
}

if (!options.dontSendGossipAttestationsToForkchoice) {
try {
chain.forkChoice.onAttestation(indexedAttestation, attDataRootHex);
} catch (e) {
logger.debug("Error adding gossip unaggregated attestation to forkchoice", {subnet}, e as Error);
}
}

chain.emitter.emit(routes.events.EventType.attestation, attestation);
}

if (batchableBls) {
metrics?.gossipAttestation.attestationBatchHistogram.observe(attestationCount);
} else {
metrics?.gossipAttestation.attestationNonBatchCount.inc(attestationCount);
}

return results;
},
};
}

/**
* Retry a function if it throws error code UNKNOWN_OR_PREFINALIZED_BEACON_BLOCK_ROOT
*/
Expand Down

0 comments on commit 1116943

Please sign in to comment.