Skip to content

Commit

Permalink
feat: poll proposer duties of next epoch in advance
Browse files Browse the repository at this point in the history
  • Loading branch information
twoeths committed Jul 24, 2023
1 parent 3257345 commit 0c351d7
Show file tree
Hide file tree
Showing 6 changed files with 100 additions and 19 deletions.
43 changes: 33 additions & 10 deletions packages/beacon-node/src/api/impl/validator/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import {ApiModules} from "../types.js";
import {RegenCaller} from "../../../chain/regen/index.js";
import {getValidatorStatus} from "../beacon/state/utils.js";
import {validateGossipFnRetryUnknownRoot} from "../../../network/processor/gossipHandlers.js";
import {SCHEDULER_LOOKAHEAD_FACTOR} from "../../../chain/prepareNextSlot.js";
import {computeSubnetForCommitteesAtSlot, getPubkeysForIndices} from "./utils.js";

/**
Expand Down Expand Up @@ -118,15 +119,23 @@ export function getValidatorApi({
* Prevents a validator from not being able to get the attestater duties correctly if the beacon and validator clocks are off
*/
async function waitForNextClosestEpoch(): Promise<void> {
const nextEpoch = chain.clock.currentEpoch + 1;
const secPerEpoch = SLOTS_PER_EPOCH * config.SECONDS_PER_SLOT;
const nextEpochStartSec = chain.genesisTime + nextEpoch * secPerEpoch;
const msToNextEpoch = nextEpochStartSec * 1000 - Date.now();
const msToNextEpoch = timeToNextEpochInMs();
if (msToNextEpoch > 0 && msToNextEpoch < MAX_API_CLOCK_DISPARITY_MS) {
const nextEpoch = chain.clock.currentEpoch + 1;
await chain.clock.waitForSlot(computeStartSlotAtEpoch(nextEpoch));
}
}

/**
* Compute ms to the next epoch.
*/
function timeToNextEpochInMs(): number {
const nextEpoch = chain.clock.currentEpoch + 1;
const secPerEpoch = SLOTS_PER_EPOCH * config.SECONDS_PER_SLOT;
const nextEpochStartSec = chain.genesisTime + nextEpoch * secPerEpoch;
return nextEpochStartSec * 1000 - Date.now();
}

function currentEpochWithDisparity(): Epoch {
return computeEpochAtSlot(getCurrentSlot(config, chain.genesisTime - MAX_API_CLOCK_DISPARITY_SEC));
}
Expand Down Expand Up @@ -387,15 +396,29 @@ export function getValidatorApi({

// Early check that epoch is within [current_epoch, current_epoch + 1], or allow for pre-genesis
const currentEpoch = currentEpochWithDisparity();
if (currentEpoch >= 0 && epoch !== currentEpoch && epoch !== currentEpoch + 1) {
throw Error(`Requested epoch ${epoch} must equal current ${currentEpoch} or next epoch ${currentEpoch + 1}`);
const nextEpoch = currentEpoch + 1;
if (currentEpoch >= 0 && epoch !== currentEpoch && epoch !== nextEpoch) {
throw Error(`Requested epoch ${epoch} must equal current ${currentEpoch} or next epoch ${nextEpoch}`);
}

// May request for an epoch that's in the future, for getBeaconProposersNextEpoch()
await waitForNextClosestEpoch();

const head = chain.forkChoice.getHead();
const state = await chain.getHeadStateAtCurrentEpoch(RegenCaller.getDuties);
let state: CachedBeaconStateAllForks | undefined = undefined;
const prepareNextSlotLookAheadMs = (config.SECONDS_PER_SLOT * 1000) / SCHEDULER_LOOKAHEAD_FACTOR;
const cpState = chain.regen.getCheckpointStateSync({rootHex: head.blockRoot, epoch});
// validators may request next epoch's duties when it's close to next epoch
// return that asap if PrepareNextSlot already compute beacon proposers for next epoch
if (epoch === nextEpoch && timeToNextEpochInMs() < prepareNextSlotLookAheadMs) {
if (cpState) {
state = cpState;
metrics?.duties.requestNextEpochProposalDutiesHit.inc();
} else {
metrics?.duties.requestNextEpochProposalDutiesMiss.inc();
}
}

if (!state) {
state = await chain.getHeadStateAtCurrentEpoch(RegenCaller.getDuties);
}

const stateEpoch = state.epochCtx.epoch;
let indexes: ValidatorIndex[] = [];
Expand Down
2 changes: 1 addition & 1 deletion packages/beacon-node/src/chain/prepareNextSlot.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import {IBeaconChain} from "./interface.js";
import {RegenCaller} from "./regen/index.js";

/* With 12s slot times, this scheduler will run 4s before the start of each slot (`12 / 3 = 4`). */
const SCHEDULER_LOOKAHEAD_FACTOR = 3;
export const SCHEDULER_LOOKAHEAD_FACTOR = 3;

/* We don't want to do more epoch transition than this */
const PREPARE_EPOCH_LIMIT = 1;
Expand Down
11 changes: 11 additions & 0 deletions packages/beacon-node/src/metrics/metrics/lodestar.ts
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,17 @@ export function createLodestarMetrics(
}),
},

duties: {
requestNextEpochProposalDutiesHit: register.gauge({
name: "lodestar_duties_request_next_epoch_proposal_duties_hit_total",
help: "Total count of requestNextEpochProposalDuties hit",
}),
requestNextEpochProposalDutiesMiss: register.gauge({
name: "lodestar_duties_request_next_epoch_proposal_duties_miss_total",
help: "Total count of requestNextEpochProposalDuties miss",
}),
},

// Beacon state transition metrics

epochTransitionTime: register.histogram({
Expand Down
1 change: 1 addition & 0 deletions packages/validator/src/services/block.ts
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ export class BlockProposingService {
private readonly metrics: Metrics | null
) {
this.dutiesService = new BlockDutiesService(
config,
logger,
api,
clock,
Expand Down
31 changes: 26 additions & 5 deletions packages/validator/src/services/blockDuties.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,19 @@
import {toHexString} from "@chainsafe/ssz";
import {computeEpochAtSlot} from "@lodestar/state-transition";
import {computeEpochAtSlot, computeStartSlotAtEpoch} from "@lodestar/state-transition";
import {BLSPubkey, Epoch, RootHex, Slot} from "@lodestar/types";
import {Api, ApiError, routes} from "@lodestar/api";
import {sleep} from "@lodestar/utils";
import {ChainConfig} from "@lodestar/config";
import {IClock, differenceHex, LoggerVc} from "../util/index.js";
import {PubkeyHex} from "../types.js";
import {Metrics} from "../metrics.js";
import {ValidatorStore} from "./validatorStore.js";

/** This polls block duties 1s before the next epoch */
// TODO: change to 6 to do it 2s before the next epoch
// once we have some improvement on epoch transition time
// see https://github.com/ChainSafe/lodestar/issues/5409
const BLOCK_DUTIES_LOOKAHEAD_FACTOR = 12;
/** Only retain `HISTORICAL_DUTIES_EPOCHS` duties prior to the current epoch */
const HISTORICAL_DUTIES_EPOCHS = 2;
// Re-declaring to not have to depend on `lodestar-params` just for this 0
Expand All @@ -24,9 +31,10 @@ export class BlockDutiesService {
private readonly proposers = new Map<Epoch, BlockDutyAtEpoch>();

constructor(
private readonly config: ChainConfig,
private readonly logger: LoggerVc,
private readonly api: Api,
clock: IClock,
private readonly clock: IClock,
private readonly validatorStore: ValidatorStore,
private readonly metrics: Metrics | null,
notifyBlockProductionFn: NotifyBlockProductionFn
Expand Down Expand Up @@ -75,7 +83,7 @@ export class BlockDutiesService {
}
}

private runBlockDutiesTask = async (slot: Slot): Promise<void> => {
private runBlockDutiesTask = async (slot: Slot, signal: AbortSignal): Promise<void> => {
try {
if (slot < 0) {
// Before genesis, fetch the genesis duties but don't notify block production
Expand All @@ -84,7 +92,7 @@ export class BlockDutiesService {
await this.pollBeaconProposers(GENESIS_EPOCH);
}
} else {
await this.pollBeaconProposersAndNotify(slot);
await this.pollBeaconProposersAndNotify(slot, signal);
}
} catch (e) {
this.logger.error("Error on pollBeaconProposers", {}, e as Error);
Expand Down Expand Up @@ -117,8 +125,10 @@ export class BlockDutiesService {
* through the slow path every time. I.e., the proposal will only happen after we've been able to
* download and process the duties from the BN. This means it is very important to ensure this
* function is as fast as possible.
* - Starting from Jul 2023, if PrepareNextSlotScheduler runs well in bn we already have proposers of next epoch
* some time (< 1/3 slot) before the next epoch
*/
private async pollBeaconProposersAndNotify(currentSlot: Slot): Promise<void> {
private async pollBeaconProposersAndNotify(currentSlot: Slot, signal: AbortSignal): Promise<void> {
// Notify the block proposal service for any proposals that we have in our cache.
const initialBlockProposers = this.getblockProposersAtSlot(currentSlot);
if (initialBlockProposers.length > 0) {
Expand All @@ -143,6 +153,17 @@ export class BlockDutiesService {
this.logger.debug("Detected new block proposer", {currentSlot});
this.metrics?.proposerDutiesReorg.inc();
}

const nextEpoch = computeEpochAtSlot(currentSlot) + 1;
const isLastSlotEpoch = computeStartSlotAtEpoch(nextEpoch) === currentSlot + 1;
if (isLastSlotEpoch) {
const nextSlot = currentSlot + 1;
const lookAheadMs = (this.config.SECONDS_PER_SLOT * 1000) / BLOCK_DUTIES_LOOKAHEAD_FACTOR;
await sleep(this.clock.msToSlot(nextSlot) - lookAheadMs, signal);
this.logger.debug("Polling proposers for next epoch", {nextEpoch, nextSlot});
// Poll proposers for the next epoch
await this.pollBeaconProposers(nextEpoch);
}
}

private async pollBeaconProposers(epoch: Epoch): Promise<void> {
Expand Down
31 changes: 28 additions & 3 deletions packages/validator/test/unit/services/blockDuties.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import bls from "@chainsafe/bls";
import {toHexString} from "@chainsafe/ssz";
import {RootHex} from "@lodestar/types";
import {HttpStatusCode, routes} from "@lodestar/api";
import {chainConfig} from "@lodestar/config/default";
import {toHex} from "@lodestar/utils";
import {BlockDutiesService} from "../../../src/services/blockDuties.js";
import {ValidatorStore} from "../../../src/services/validatorStore.js";
Expand Down Expand Up @@ -49,7 +50,15 @@ describe("BlockDutiesService", function () {
const notifyBlockProductionFn = sinon.stub(); // Returns void

const clock = new ClockMock();
const dutiesService = new BlockDutiesService(loggerVc, api, clock, validatorStore, null, notifyBlockProductionFn);
const dutiesService = new BlockDutiesService(
chainConfig,
loggerVc,
api,
clock,
validatorStore,
null,
notifyBlockProductionFn
);

// Trigger clock onSlot for slot 0
await clock.tickSlotFns(0, controller.signal);
Expand Down Expand Up @@ -84,7 +93,15 @@ describe("BlockDutiesService", function () {

// Clock will call runAttesterDutiesTasks() immediately
const clock = new ClockMock();
const dutiesService = new BlockDutiesService(loggerVc, api, clock, validatorStore, null, notifyBlockProductionFn);
const dutiesService = new BlockDutiesService(
chainConfig,
loggerVc,
api,
clock,
validatorStore,
null,
notifyBlockProductionFn
);

// Trigger clock onSlot for slot 0
api.validator.getProposerDuties.resolves({
Expand Down Expand Up @@ -151,7 +168,15 @@ describe("BlockDutiesService", function () {
const notifyBlockProductionFn = sinon.stub(); // Returns void

const clock = new ClockMock();
const dutiesService = new BlockDutiesService(loggerVc, api, clock, validatorStore, null, notifyBlockProductionFn);
const dutiesService = new BlockDutiesService(
chainConfig,
loggerVc,
api,
clock,
validatorStore,
null,
notifyBlockProductionFn
);

// Trigger clock onSlot for slot 0
await clock.tickSlotFns(0, controller.signal);
Expand Down

0 comments on commit 0c351d7

Please sign in to comment.