Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: verifySignatureSetsSameSigningRoot bls api #5734

Closed
wants to merge 8 commits into from
11 changes: 11 additions & 0 deletions packages/beacon-node/src/chain/bls/interface.ts
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,17 @@ export interface IBlsVerifier {
*/
verifySignatureSets(sets: ISignatureSet[], opts?: VerifySignatureOpts): Promise<boolean>;

/**
 * Variant of `verifySignatureSets` for signature sets that all share the same signing root.
 * - Every set in `sets` is expected to have the same signing root.
 * - Resolves to an array of booleans; each element indicates whether the signature set
 *   at the same index is valid.
 * - Only the `verifyOnMainThread` option is supported.
 */
verifySignatureSetsSameSigningRoot(
sets: ISignatureSet[],
opts?: Pick<VerifySignatureOpts, "verifyOnMainThread">
): Promise<boolean[]>;

/** For multithread pool awaits terminating all workers */
close(): Promise<void>;

Expand Down
12 changes: 11 additions & 1 deletion packages/beacon-node/src/chain/bls/maybeBatch.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,18 @@ export type SignatureSetDeserialized = {
* Verify signature sets with batch verification or regular core verification, depending on the set count.
* Abstracted in a separate file to be consumed by the threaded pool and the main thread implementation.
*/
export function verifySignatureSetsMaybeBatch(sets: SignatureSetDeserialized[]): boolean {
export function verifySignatureSetsMaybeBatch(sets: SignatureSetDeserialized[], isSameMessage = false): boolean {
if (sets.length >= MIN_SET_COUNT_TO_BATCH) {
if (isSameMessage) {
twoeths marked this conversation as resolved.
Show resolved Hide resolved
// Consumers need to make sure that all sets have the same message
const aggregatedPubkey = bls.PublicKey.aggregate(sets.map((set) => set.publicKey));
const aggregatedSignature = bls.Signature.aggregate(
// true = validate signature
sets.map((set) => bls.Signature.fromBytes(set.signature, CoordType.affine, true))
);
return aggregatedSignature.verify(aggregatedPubkey, sets[0].message);
}

return bls.Signature.verifyMultipleSignatures(
sets.map((s) => ({
publicKey: s.publicKey,
Expand Down
245 changes: 186 additions & 59 deletions packages/beacon-node/src/chain/bls/multithread/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,17 @@ import {Metrics} from "../../../metrics/index.js";
import {IBlsVerifier, VerifySignatureOpts} from "../interface.js";
import {getAggregatedPubkey, getAggregatedPubkeysCount} from "../utils.js";
import {verifySignatureSetsMaybeBatch} from "../maybeBatch.js";
import {BlsWorkReq, BlsWorkResult, WorkerData, WorkResultCode} from "./types.js";
import {
ApiName,
BlsWorkReq,
BlsWorkResult,
JobItem,
Jobs,
QueueItem,
SerializedSet,
WorkerData,
WorkResultCode,
} from "./types.js";
import {chunkifyMaximizeChunkSize} from "./utils.js";
import {defaultPoolSize} from "./poolSize.js";

Expand Down Expand Up @@ -62,15 +72,25 @@ const MAX_BUFFER_WAIT_MS = 100;
const MAX_JOBS_CAN_ACCEPT_WORK = 512;

/** Methods the pool invokes on a worker to verify signature sets. */
type WorkerApi = {
/** Verifies serialized sets submitted as one same-message job group; resolves to a `BlsWorkResult`. */
verifyManySignatureSetsSameMessage(sets: SerializedSet[]): Promise<BlsWorkResult>;
/** Verifies a list of regular work requests; resolves to a `BlsWorkResult`. */
verifyManySignatureSets(workReqArr: BlsWorkReq[]): Promise<BlsWorkResult>;
};

/** A queued verification request together with the callbacks that settle its promise. */
type JobQueueItem<R = boolean> = {
/** Fulfils the promise handed out when the job was queued. */
resolve: (result: R | PromiseLike<R>) => void;
/** Rejects the promise handed out when the job was queued. */
reject: (error?: Error) => void;
/** Millisecond timestamp taken when the job was created; used to measure queue wait time. */
addedTimeMs: number;
/** The work request carried by this job. */
workReq: BlsWorkReq;
};
/**
 * Type guard: a job item is a multi-signature job when its work request carries a `sets` field.
 */
function isMultiSigsJobItem(job: JobItem<BlsWorkReq> | JobItem<SerializedSet>): job is JobItem<BlsWorkReq> {
  const {sets} = (job as JobItem<BlsWorkReq>).workReq;
  return sets !== undefined && sets !== null;
}

/**
 * Type guard: a queue item that is not an array is a single multi-signature job item.
 */
function isMultiSigsQueueItem(queueItem: QueueItem): queueItem is JobItem<BlsWorkReq> {
  const isSameMessageGroup = Array.isArray(queueItem);
  return isSameMessageGroup === false;
}

/**
 * Type guard: a queue item that is an array is a group of same-message job items.
 */
function isSameMessageQueueItem(queueItem: QueueItem): queueItem is JobItem<SerializedSet>[] {
  const isSameMessageGroup = Array.isArray(queueItem);
  return isSameMessageGroup;
}

/**
 * Type guard: a prepared work package holds multi-signature jobs when it is not flagged as same-message.
 */
function isMultiSigsJobs(jobs: Jobs): jobs is {isSameMessageJobs: false; jobs: JobItem<BlsWorkReq>[]} {
  const {isSameMessageJobs} = jobs;
  return isSameMessageJobs ? false : true;
}

enum WorkerStatusCode {
notInitialized,
Expand Down Expand Up @@ -106,9 +126,9 @@ export class BlsMultiThreadWorkerPool implements IBlsVerifier {

private readonly format: PointFormat;
private readonly workers: WorkerDescriptor[];
private readonly jobs: JobQueueItem[] = [];
private readonly jobs: QueueItem[] = [];
private bufferedJobs: {
matthewkeil marked this conversation as resolved.
Show resolved Hide resolved
jobs: JobQueueItem[];
jobs: QueueItem[];
sigCount: number;
firstPush: number;
timeout: NodeJS.Timeout;
Expand Down Expand Up @@ -163,7 +183,7 @@ export class BlsMultiThreadWorkerPool implements IBlsVerifier {
}))
);
} finally {
if (timer) timer();
if (timer) timer({api: ApiName.verifySignatureSets});
}
}

Expand All @@ -190,14 +210,63 @@ export class BlsMultiThreadWorkerPool implements IBlsVerifier {
return results.every((isValid) => isValid === true);
}

/**
 * Verify signature sets that all share the same signing root and report validity per set.
 *
 * - When `opts.verifyOnMainThread` is set and `blsVerifyAllMultiThread` is off, all sets are first
 *   verified together on the main thread via `verifySignatureSetsMaybeBatch` in same-message mode.
 *   If that passes, every set is reported valid; otherwise each set is re-verified individually
 *   through `verifySignatureSets` with batching and main-thread verification disabled.
 * - Otherwise the sets are split into chunks of at most `MAX_SIGNATURE_SETS_PER_JOB` and each chunk
 *   is queued as a same-message job; the per-chunk results are flattened into one array.
 *
 * @returns one boolean per input set, in input order
 */
async verifySignatureSetsSameSigningRoot(
sets: ISignatureSet[],
opts?: Pick<VerifySignatureOpts, "verifyOnMainThread">
): Promise<boolean[]> {
// Main-thread path: only taken when requested and not overridden by blsVerifyAllMultiThread
if (opts?.verifyOnMainThread && !this.blsVerifyAllMultiThread) {
const isSameMessage = true;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What do you think about a metric for main thread time similar to the other case?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we also be counting the sets and keys to see what kind of gains we get under load?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thanks for reminding this, all metrics are available, I only need to add "api" label there

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we also be counting the sets and keys to see what kind of gains we get under load?

we counted the api call, regarding the gain we can count on bls.test.ts perf test

let isAllValid = false;
const timer = this.metrics?.blsThreadPool.mainThreadDurationInThreadPool.startTimer();
try {
isAllValid = verifySignatureSetsMaybeBatch(
sets.map((set) => ({
publicKey: getAggregatedPubkey(set),
message: set.signingRoot.valueOf(),
signature: set.signature,
})),
isSameMessage
);
} finally {
// Stop the timer even if verification throws, labelling the observation with this api
if (timer) timer({api: ApiName.verifySignatureSetsSameSigningRoot});
}

if (isAllValid) {
return sets.map(() => true);
}
// when retrying, always verify on a worker thread
return Promise.all(
// batchable = false because at least one of the signatures is invalid
// verifyOnMainThread = false because it takes time to verify all signatures
sets.map((set) => this.verifySignatureSets([set], {batchable: false, verifyOnMainThread: false}))
);
}

// distribute to multiple workers if there are more than MAX_SIGNATURE_SETS_PER_JOB sets
const chunkResults = await Promise.all(
chunkifyMaximizeChunkSize(sets, MAX_SIGNATURE_SETS_PER_JOB).map(async (setsWorker) =>
Promise.all(this.queueSameMessageSignatureSets(setsWorker))
)
);

return chunkResults.flat();
}

async close(): Promise<void> {
if (this.bufferedJobs) {
clearTimeout(this.bufferedJobs.timeout);
}

// Abort all jobs
for (const job of this.jobs) {
job.reject(new QueueError({code: QueueErrorCode.QUEUE_ABORTED}));
if (isMultiSigsQueueItem(job)) {
job.reject(new QueueError({code: QueueErrorCode.QUEUE_ABORTED}));
} else {
// this is an array of same message job items
for (const jobItem of job) {
jobItem.reject(new QueueError({code: QueueErrorCode.QUEUE_ABORTED}));
}
}
}
this.jobs.splice(0, this.jobs.length);

Expand Down Expand Up @@ -249,25 +318,40 @@ export class BlsMultiThreadWorkerPool implements IBlsVerifier {
return workers;
}

/**
 * Queue a group of signature sets as a single same-message job.
 * Returns one promise per input set, in input order; each promise is settled through
 * the `resolve` / `reject` callbacks stored on the corresponding job item.
 */
private queueSameMessageSignatureSets(workReq: ISignatureSet[]): Promise<boolean>[] {
  this.checkWorkers();

  // All items of the group share one enqueue timestamp
  const addedTimeMs = Date.now();
  const sameMessageJobs: JobItem<SerializedSet>[] = [];

  // The promise executor runs synchronously, so `sameMessageJobs` is fully populated
  // by the time `map` returns.
  const toPendingResult = (signatureSet: ISignatureSet): Promise<boolean> =>
    new Promise<boolean>((resolve, reject) => {
      const serializedSet = {
        publicKey: getAggregatedPubkey(signatureSet).toBytes(this.format),
        message: signatureSet.signingRoot,
        signature: signatureSet.signature,
      };
      sameMessageJobs.push({resolve, reject, addedTimeMs, workReq: serializedSet});
    });
  const pendingResults = workReq.map((signatureSet) => toPendingResult(signatureSet));

  // Same message jobs are not buffered: the whole group is pushed as one queue item
  // and `runJob` is scheduled for the next macro event loop cycle.
  this.jobs.push(sameMessageJobs);
  setTimeout(this.runJob, 0);

  return pendingResults;
}

/**
* Register BLS work to be done eventually in a worker
*/
private async queueBlsWork(workReq: BlsWorkReq): Promise<boolean> {
if (this.closed) {
throw new QueueError({code: QueueErrorCode.QUEUE_ABORTED});
}

// TODO: Consider if limiting queue size is necessary here.
// It would be bad to reject signatures because the node is slow.
// However, if the worker communication broke jobs won't ever finish

if (
this.workers.length > 0 &&
this.workers[0].status.code === WorkerStatusCode.initializationError &&
this.workers.every((worker) => worker.status.code === WorkerStatusCode.initializationError)
) {
throw this.workers[0].status.error;
}
this.checkWorkers();

return new Promise<boolean>((resolve, reject) => {
const job = {resolve, reject, addedTimeMs: Date.now(), workReq};
Expand Down Expand Up @@ -301,6 +385,24 @@ export class BlsMultiThreadWorkerPool implements IBlsVerifier {
});
}

/**
 * Guard run before queueing work.
 * Throws a QUEUE_ABORTED `QueueError` if the pool is closed, or the first worker's
 * initialization error if every worker failed to initialize.
 */
private checkWorkers(): void {
  if (this.closed) {
    throw new QueueError({code: QueueErrorCode.QUEUE_ABORTED});
  }

  // TODO: Consider if limiting queue size is necessary here.
  // It would be bad to reject signatures because the node is slow.
  // However, if the worker communication broke jobs won't ever finish

  if (this.workers.length === 0) {
    return;
  }

  const firstStatus = this.workers[0].status;
  if (firstStatus.code !== WorkerStatusCode.initializationError) {
    return;
  }

  const allFailedToInit = this.workers.every(
    (worker) => worker.status.code === WorkerStatusCode.initializationError
  );
  if (allFailedToInit) {
    throw firstStatus.error;
  }
}

/**
* Potentially submit jobs to an idle worker, only if there's a worker and jobs
*/
Expand All @@ -316,7 +418,8 @@ export class BlsMultiThreadWorkerPool implements IBlsVerifier {
}

// Prepare work package
const jobs = this.prepareWork();
const typedJobs = prepareWork(this.jobs);
const {jobs} = typedJobs;
if (jobs.length === 0) {
return;
}
Expand All @@ -328,12 +431,13 @@ export class BlsMultiThreadWorkerPool implements IBlsVerifier {
const workerApi = worker.status.workerApi;
worker.status = {code: WorkerStatusCode.running, workerApi};
this.workersBusy++;
const api = isMultiSigsJobs(typedJobs) ? ApiName.verifySignatureSets : ApiName.verifySignatureSetsSameSigningRoot;

try {
let startedSigSets = 0;
for (const job of jobs) {
this.metrics?.blsThreadPool.jobWaitTime.observe((Date.now() - job.addedTimeMs) / 1000);
startedSigSets += job.workReq.sets.length;
this.metrics?.blsThreadPool.jobWaitTime.observe({api}, (Date.now() - job.addedTimeMs) / 1000);
startedSigSets += isMultiSigsJobItem(job) ? job.workReq.sets.length : 1;
}

this.metrics?.blsThreadPool.totalJobsGroupsStarted.inc(1);
Expand All @@ -345,7 +449,10 @@ export class BlsMultiThreadWorkerPool implements IBlsVerifier {
// Only downside is that the job promise may be resolved twice, but that's not an issue

const jobStartNs = process.hrtime.bigint();
const workResult = await workerApi.verifyManySignatureSets(jobs.map((job) => job.workReq));
this.metrics?.blsThreadPool.workerApiCalls.inc({api});
const workResult = isMultiSigsJobs(typedJobs)
? await workerApi.verifyManySignatureSets(typedJobs.jobs.map((job) => job.workReq))
: await workerApi.verifyManySignatureSetsSameMessage(typedJobs.jobs.map((job) => job.workReq));
const jobEndNs = process.hrtime.bigint();
const {workerId, batchRetries, batchSigsSuccess, workerStartNs, workerEndNs, results} = workResult;

Expand All @@ -356,7 +463,7 @@ export class BlsMultiThreadWorkerPool implements IBlsVerifier {
for (let i = 0; i < jobs.length; i++) {
const job = jobs[i];
const jobResult = results[i];
const sigSetCount = job.workReq.sets.length;
const sigSetCount = isMultiSigsJobItem(job) ? job.workReq.sets.length : 1;
if (!jobResult) {
job.reject(Error(`No jobResult for index ${i}`));
errorCount += sigSetCount;
Expand All @@ -375,14 +482,15 @@ export class BlsMultiThreadWorkerPool implements IBlsVerifier {
const latencyToWorkerSec = Number(workerStartNs - jobStartNs) / 1e9;
const latencyFromWorkerSec = Number(jobEndNs - workerEndNs) / 1e9;

this.metrics?.blsThreadPool.timePerSigSet.observe(workerJobTimeSec / startedSigSets);
this.metrics?.blsThreadPool.jobsWorkerTime.inc({workerId}, workerJobTimeSec);
this.metrics?.blsThreadPool.latencyToWorker.observe(latencyToWorkerSec);
this.metrics?.blsThreadPool.latencyFromWorker.observe(latencyFromWorkerSec);
this.metrics?.blsThreadPool.successJobsSignatureSetsCount.inc(successCount);
this.metrics?.blsThreadPool.errorJobsSignatureSetsCount.inc(errorCount);
this.metrics?.blsThreadPool.batchRetries.inc(batchRetries);
this.metrics?.blsThreadPool.batchSigsSuccess.inc(batchSigsSuccess);
this.metrics?.blsThreadPool.timePerSigSet.observe({api}, workerJobTimeSec / startedSigSets);
this.metrics?.blsThreadPool.jobsWorkerTimeByWorkerId.inc({workerId}, workerJobTimeSec);
this.metrics?.blsThreadPool.jobsWorkerTimeByApi.inc({api}, workerJobTimeSec);
this.metrics?.blsThreadPool.latencyToWorker.observe({api}, latencyToWorkerSec);
this.metrics?.blsThreadPool.latencyFromWorker.observe({api}, latencyFromWorkerSec);
this.metrics?.blsThreadPool.successJobsSignatureSetsCount.inc({api}, successCount);
this.metrics?.blsThreadPool.errorJobsSignatureSetsCount.inc({api}, errorCount);
this.metrics?.blsThreadPool.batchRetries.inc({api}, batchRetries);
this.metrics?.blsThreadPool.batchSigsSuccess.inc({api}, batchSigsSuccess);
} catch (e) {
// Worker communications should never reject
if (!this.closed) this.logger.error("BlsMultiThreadWorkerPool error", {}, e as Error);
Expand All @@ -399,26 +507,6 @@ export class BlsMultiThreadWorkerPool implements IBlsVerifier {
setTimeout(this.runJob, 0);
};

/**
* Grab pending work up to a max number of signatures
*/
private prepareWork(): JobQueueItem<boolean>[] {
const jobs: JobQueueItem<boolean>[] = [];
let totalSigs = 0;

while (totalSigs < MAX_SIGNATURE_SETS_PER_JOB) {
const job = this.jobs.shift();
if (!job) {
break;
}

jobs.push(job);
totalSigs += job.workReq.sets.length;
}

return jobs;
}

/**
* Add all buffered jobs to the job queue and potentially run them immediately
*/
Expand All @@ -441,3 +529,42 @@ export class BlsMultiThreadWorkerPool implements IBlsVerifier {
);
}
}

/**
 * Take pending work from the front of `jobs` (mutating it) and return it as one typed package.
 *
 * - If the first queue item is a same-message group, that group alone is returned.
 * - Otherwise multi-signature job items are collected until the signature-set budget
 *   `maxSignatureSet` is reached, the queue is empty, or a same-message group is next in line.
 */
export function prepareWork(jobs: QueueItem[], maxSignatureSet = MAX_SIGNATURE_SETS_PER_JOB): Jobs {
  const multiSigsJobs: JobItem<BlsWorkReq>[] = [];
  let sigSetCount = 0;

  while (sigSetCount < maxSignatureSet) {
    const head = jobs[0];
    if (!head) {
      break;
    }

    if (multiSigsJobs.length > 0) {
      // already collecting multi sigs jobs: stop as soon as a different kind of item shows up
      if (!isMultiSigsQueueItem(head)) {
        break;
      }
    } else if (isSameMessageQueueItem(head)) {
      // a same message group at the front of the queue is dispatched on its own
      jobs.shift();
      return {isSameMessageJobs: true, jobs: head};
    }

    // should not happen, job should be multi sigs
    if (!isMultiSigsJobItem(head)) {
      throw Error("Unexpected job type");
    }

    multiSigsJobs.push(head);
    jobs.shift();
    sigSetCount += head.workReq.sets.length;
  }

  return {isSameMessageJobs: false, jobs: multiSigsJobs};
}
Loading