Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support fetching historical proposer duties #7130

Merged
merged 14 commits into from
Oct 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
74 changes: 60 additions & 14 deletions packages/beacon-node/src/api/impl/validator/index.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import {PubkeyIndexMap} from "@chainsafe/pubkey-index-map";
import {routes} from "@lodestar/api";
import {ApplicationMethods} from "@lodestar/api/server";
import {
Expand All @@ -10,6 +11,8 @@ import {
computeEpochAtSlot,
getCurrentSlot,
beaconBlockToBlinded,
createCachedBeaconState,
loadState,
} from "@lodestar/state-transition";
import {
GENESIS_SLOT,
Expand Down Expand Up @@ -62,6 +65,7 @@ import {validateSyncCommitteeGossipContributionAndProof} from "../../../chain/va
import {CommitteeSubscription} from "../../../network/subnets/index.js";
import {ApiModules} from "../types.js";
import {RegenCaller} from "../../../chain/regen/index.js";
import {getStateResponseWithRegen} from "../beacon/state/utils.js";
import {validateGossipFnRetryUnknownRoot} from "../../../network/processor/gossipHandlers.js";
import {SCHEDULER_LOOKAHEAD_FACTOR} from "../../../chain/prepareNextSlot.js";
import {ChainEvent, CheckpointHex, CommonBlockBody} from "../../../chain/index.js";
Expand Down Expand Up @@ -888,15 +892,16 @@ export function getValidatorApi(
async getProposerDuties({epoch}) {
notWhileSyncing();

// Early check that epoch is within [current_epoch, current_epoch + 1], or allow for pre-genesis
// Early check that epoch is no more than current_epoch + 1, or allow for pre-genesis
const currentEpoch = currentEpochWithDisparity();
const nextEpoch = currentEpoch + 1;
if (currentEpoch >= 0 && epoch !== currentEpoch && epoch !== nextEpoch) {
throw Error(`Requested epoch ${epoch} must equal current ${currentEpoch} or next epoch ${nextEpoch}`);
if (currentEpoch >= 0 && epoch > nextEpoch) {
throw new ApiError(400, `Requested epoch ${epoch} must not be more than one epoch in the future`);
}

const head = chain.forkChoice.getHead();
let state: CachedBeaconStateAllForks | undefined = undefined;
const startSlot = computeStartSlotAtEpoch(epoch);
const slotMs = config.SECONDS_PER_SLOT * 1000;
const prepareNextSlotLookAheadMs = slotMs / SCHEDULER_LOOKAHEAD_FACTOR;
const toNextEpochMs = msToNextEpoch();
Expand All @@ -914,21 +919,63 @@ export function getValidatorApi(
}

if (!state) {
state = await chain.getHeadStateAtCurrentEpoch(RegenCaller.getDuties);
if (epoch >= currentEpoch - 1) {
twoeths marked this conversation as resolved.
Show resolved Hide resolved
// Cached beacon state stores proposers for previous, current and next epoch. The
// requested epoch is within that range, we can use the head state at current epoch
state = await chain.getHeadStateAtCurrentEpoch(RegenCaller.getDuties);
} else {
const res = await getStateResponseWithRegen(chain, startSlot);

const stateViewDU =
res.state instanceof Uint8Array
? loadState(config, chain.getHeadState(), res.state).state
nflaig marked this conversation as resolved.
Show resolved Hide resolved
: res.state.clone();

state = createCachedBeaconState(
stateViewDU,
{
config: chain.config,
// Not required to compute proposers
pubkey2index: new PubkeyIndexMap(),
index2pubkey: [],
},
{skipSyncPubkeys: true, skipSyncCommitteeCache: true}
);

if (state.epochCtx.epoch !== epoch) {
throw Error(`Loaded state epoch ${state.epochCtx.epoch} does not match requested epoch ${epoch}`);
}
}
}

const stateEpoch = state.epochCtx.epoch;
let indexes: ValidatorIndex[] = [];

if (epoch === stateEpoch) {
indexes = state.epochCtx.getBeaconProposers();
} else if (epoch === stateEpoch + 1) {
// Requesting duties for next epoch is allow since they can be predicted with high probabilities.
// @see `epochCtx.getBeaconProposersNextEpoch` JSDocs for rationale.
indexes = state.epochCtx.getBeaconProposersNextEpoch();
} else {
// Should never happen, epoch is checked to be in bounds above
throw Error(`Proposer duties for epoch ${epoch} not supported, current epoch ${stateEpoch}`);
switch (epoch) {
case stateEpoch:
indexes = state.epochCtx.getBeaconProposers();
break;

case stateEpoch + 1:
// Requesting duties for next epoch is allowed since they can be predicted with high probabilities.
// @see `epochCtx.getBeaconProposersNextEpoch` JSDocs for rationale.
indexes = state.epochCtx.getBeaconProposersNextEpoch();
break;

case stateEpoch - 1: {
const indexesPrevEpoch = state.epochCtx.getBeaconProposersPrevEpoch();
if (indexesPrevEpoch === null) {
// Should not happen as previous proposer duties should be initialized for head state
// and if we load state from `Uint8Array` it will always be the state of requested epoch
throw Error(`Proposer duties for previous epoch ${epoch} not yet initialized`);
}
indexes = indexesPrevEpoch;
break;
}

default:
// Should never happen, epoch is checked to be in bounds above
throw Error(`Proposer duties for epoch ${epoch} not supported, current epoch ${stateEpoch}`);
}

// NOTE: this is the fastest way of getting compressed pubkeys.
Expand All @@ -937,7 +984,6 @@ export function getValidatorApi(
// TODO: Add a flag to just send 0x00 as pubkeys since the Lodestar validator does not need them.
const pubkeys = getPubkeysForIndices(state.validators, indexes);

const startSlot = computeStartSlotAtEpoch(epoch);
const duties: routes.validator.ProposerDuty[] = [];
for (let i = 0; i < SLOTS_PER_EPOCH; i++) {
duties.push({slot: startSlot + i, validatorIndex: indexes[i], pubkey: pubkeys[i]});
Expand Down
4 changes: 4 additions & 0 deletions packages/beacon-node/test/mocks/mockedBeaconChain.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import {vi, Mocked, Mock} from "vitest";
import {PubkeyIndexMap} from "@chainsafe/pubkey-index-map";
import {config as defaultConfig} from "@lodestar/config/default";
import {ChainForkConfig} from "@lodestar/config";
import {ForkChoice, ProtoBlock, EpochDifference} from "@lodestar/fork-choice";
Expand Down Expand Up @@ -126,6 +127,8 @@ vi.mock("../../src/chain/chain.js", async (importActual) => {
// @ts-expect-error
beaconProposerCache: new BeaconProposerCache(),
shufflingCache: new ShufflingCache(),
pubkey2index: new PubkeyIndexMap(),
index2pubkey: [],
produceCommonBlockBody: vi.fn(),
getProposerHead: vi.fn(),
produceBlock: vi.fn(),
Expand All @@ -135,6 +138,7 @@ vi.mock("../../src/chain/chain.js", async (importActual) => {
predictProposerHead: vi.fn(),
getHeadStateAtCurrentEpoch: vi.fn(),
getHeadState: vi.fn(),
getStateBySlot: vi.fn(),
updateBuilderStatus: vi.fn(),
processBlock: vi.fn(),
regenStateForAttestationVerification: vi.fn(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import {routes} from "@lodestar/api";
import {config} from "@lodestar/config/default";
import {MAX_EFFECTIVE_BALANCE, SLOTS_PER_EPOCH} from "@lodestar/params";
import {BeaconStateAllForks} from "@lodestar/state-transition";
import {Slot} from "@lodestar/types";
import {ApiTestModules, getApiTestModules} from "../../../../../utils/api.js";
import {FAR_FUTURE_EPOCH} from "../../../../../../src/constants/index.js";
import {SYNC_TOLERANCE_EPOCHS, getValidatorApi} from "../../../../../../src/api/impl/validator/index.js";
Expand All @@ -13,19 +14,34 @@ import {SyncState} from "../../../../../../src/sync/interface.js";
import {defaultApiOptions} from "../../../../../../src/api/options.js";

describe("get proposers api impl", function () {
const currentEpoch = 2;
const currentSlot = SLOTS_PER_EPOCH * currentEpoch;

let api: ReturnType<typeof getValidatorApi>;
let modules: ApiTestModules;
let state: BeaconStateAllForks;
let cachedState: ReturnType<typeof createCachedBeaconStateTest>;

beforeEach(function () {
vi.useFakeTimers({now: 0});
vi.advanceTimersByTime(currentSlot * config.SECONDS_PER_SLOT * 1000);
modules = getApiTestModules({clock: "real"});
api = getValidatorApi(defaultApiOptions, modules);

initializeState(currentSlot);

modules.chain.getHeadStateAtCurrentEpoch.mockResolvedValue(cachedState);
modules.forkChoice.getHead.mockReturnValue(zeroProtoBlock);
modules.forkChoice.getFinalizedBlock.mockReturnValue(zeroProtoBlock);
modules.db.block.get.mockResolvedValue({message: {stateRoot: Buffer.alloc(32)}} as any);

vi.spyOn(modules.sync, "state", "get").mockReturnValue(SyncState.Synced);
});

function initializeState(slot: Slot): void {
state = generateState(
{
slot: 0,
slot,
validators: generateValidators(25, {
effectiveBalance: MAX_EFFECTIVE_BALANCE,
activationEpoch: 0,
Expand All @@ -37,14 +53,10 @@ describe("get proposers api impl", function () {
);
cachedState = createCachedBeaconStateTest(state, config);

modules.chain.getHeadStateAtCurrentEpoch.mockResolvedValue(cachedState);
modules.forkChoice.getHead.mockReturnValue(zeroProtoBlock);
modules.db.block.get.mockResolvedValue({message: {stateRoot: Buffer.alloc(32)}} as any);

vi.spyOn(modules.sync, "state", "get").mockReturnValue(SyncState.Synced);
vi.spyOn(cachedState.epochCtx, "getBeaconProposersNextEpoch");
vi.spyOn(cachedState.epochCtx, "getBeaconProposers");
});
vi.spyOn(cachedState.epochCtx, "getBeaconProposersPrevEpoch");
}

afterEach(() => {
vi.useRealTimers();
Expand All @@ -54,7 +66,7 @@ describe("get proposers api impl", function () {
vi.advanceTimersByTime((SYNC_TOLERANCE_EPOCHS * SLOTS_PER_EPOCH + 1) * config.SECONDS_PER_SLOT * 1000);
vi.spyOn(modules.sync, "state", "get").mockReturnValue(SyncState.SyncingHead);

await expect(api.getProposerDuties({epoch: 1})).rejects.toThrow("Node is syncing - headSlot 0 currentSlot 9");
await expect(api.getProposerDuties({epoch: 1})).rejects.toThrow("Node is syncing - headSlot 0 currentSlot 25");
});

it("should raise error if node stalled", async () => {
Expand All @@ -65,34 +77,61 @@ describe("get proposers api impl", function () {
});

it("should get proposers for current epoch", async () => {
const {data: result} = (await api.getProposerDuties({epoch: 0})) as {data: routes.validator.ProposerDutyList};
const {data: result} = (await api.getProposerDuties({epoch: currentEpoch})) as {
data: routes.validator.ProposerDutyList;
};

expect(result.length).toBe(SLOTS_PER_EPOCH);
expect(cachedState.epochCtx.getBeaconProposers).toHaveBeenCalledOnce();
expect(cachedState.epochCtx.getBeaconProposersNextEpoch).not.toHaveBeenCalled();
expect(result.map((p) => p.slot)).toEqual(Array.from({length: SLOTS_PER_EPOCH}, (_, i) => i));
expect(cachedState.epochCtx.getBeaconProposersPrevEpoch).not.toHaveBeenCalled();
expect(result.map((p) => p.slot)).toEqual(
Array.from({length: SLOTS_PER_EPOCH}, (_, i) => currentEpoch * SLOTS_PER_EPOCH + i)
);
});

it("should get proposers for next epoch", async () => {
const {data: result} = (await api.getProposerDuties({epoch: 1})) as {data: routes.validator.ProposerDutyList};
const nextEpoch = currentEpoch + 1;
const {data: result} = (await api.getProposerDuties({epoch: nextEpoch})) as {
data: routes.validator.ProposerDutyList;
};

expect(result.length).toBe(SLOTS_PER_EPOCH);
expect(cachedState.epochCtx.getBeaconProposers).not.toHaveBeenCalled();
expect(cachedState.epochCtx.getBeaconProposersNextEpoch).toHaveBeenCalledOnce();
expect(result.map((p) => p.slot)).toEqual(Array.from({length: SLOTS_PER_EPOCH}, (_, i) => SLOTS_PER_EPOCH + i));
expect(cachedState.epochCtx.getBeaconProposersPrevEpoch).not.toHaveBeenCalled();
expect(result.map((p) => p.slot)).toEqual(
Array.from({length: SLOTS_PER_EPOCH}, (_, i) => nextEpoch * SLOTS_PER_EPOCH + i)
);
});

it("should get proposers for historical epoch", async () => {
const historicalEpoch = currentEpoch - 2;
initializeState(currentSlot - 2 * SLOTS_PER_EPOCH);
modules.chain.getStateBySlot.mockResolvedValue({state, executionOptimistic: false, finalized: true});

const {data: result} = (await api.getProposerDuties({epoch: historicalEpoch})) as {
data: routes.validator.ProposerDutyList;
};

expect(result.length).toBe(SLOTS_PER_EPOCH);
// Spy won't be called as `getProposerDuties` will create a new cached beacon state
expect(result.map((p) => p.slot)).toEqual(
Array.from({length: SLOTS_PER_EPOCH}, (_, i) => historicalEpoch * SLOTS_PER_EPOCH + i)
);
});

it("should raise error for more than one epoch in the future", async () => {
await expect(api.getProposerDuties({epoch: 2})).rejects.toThrow(
"Requested epoch 2 must equal current 0 or next epoch 1"
await expect(api.getProposerDuties({epoch: currentEpoch + 2})).rejects.toThrow(
"Requested epoch 4 must not be more than one epoch in the future"
);
});

it("should have different proposer validator public keys for current and next epoch", async () => {
const {data: currentProposers} = (await api.getProposerDuties({epoch: 0})) as {
const {data: currentProposers} = (await api.getProposerDuties({epoch: currentEpoch})) as {
data: routes.validator.ProposerDutyList;
};
const {data: nextProposers} = (await api.getProposerDuties({epoch: 1})) as {
const {data: nextProposers} = (await api.getProposerDuties({epoch: currentEpoch + 1})) as {
data: routes.validator.ProposerDutyList;
};

Expand All @@ -101,21 +140,21 @@ describe("get proposers api impl", function () {
});

it("should have different proposer validator indexes for current and next epoch", async () => {
const {data: currentProposers} = (await api.getProposerDuties({epoch: 0})) as {
const {data: currentProposers} = (await api.getProposerDuties({epoch: currentEpoch})) as {
data: routes.validator.ProposerDutyList;
};
const {data: nextProposers} = (await api.getProposerDuties({epoch: 1})) as {
const {data: nextProposers} = (await api.getProposerDuties({epoch: currentEpoch + 1})) as {
data: routes.validator.ProposerDutyList;
};

expect(currentProposers.map((p) => p.validatorIndex)).not.toEqual(nextProposers.map((p) => p.validatorIndex));
});

it("should have different proposer slots for current and next epoch", async () => {
const {data: currentProposers} = (await api.getProposerDuties({epoch: 0})) as {
const {data: currentProposers} = (await api.getProposerDuties({epoch: currentEpoch})) as {
data: routes.validator.ProposerDutyList;
};
const {data: nextProposers} = (await api.getProposerDuties({epoch: 1})) as {
const {data: nextProposers} = (await api.getProposerDuties({epoch: currentEpoch + 1})) as {
data: routes.validator.ProposerDutyList;
};

Expand Down
4 changes: 4 additions & 0 deletions packages/state-transition/src/cache/epochCache.ts
Original file line number Diff line number Diff line change
Expand Up @@ -877,6 +877,10 @@ export class EpochCache {
return this.proposers;
}

getBeaconProposersPrevEpoch(): ValidatorIndex[] | null {
return this.proposersPrevEpoch;
}

/**
* We allow requesting proposal duties 1 epoch in the future as in normal network conditions it's possible to predict
* the correct shuffling with high probability. While knowing the proposers in advance is not useful for consensus,
Expand Down
Loading