From a7c689f5ebbb0b37bbe6bd448fd9c29fa7dc55ac Mon Sep 17 00:00:00 2001 From: Nico Flaig Date: Fri, 2 Aug 2024 23:02:49 +0100 Subject: [PATCH] feat: track syncing status and fetch duties on resynced (#6995) * feat: track syncing status and fetch duties on synced * Rename scheduling function to runOnResynced * Consider prev offline and syncing to trigger resynced event handlers * Add comment to error handler * Add note about el offline and sycning not considered * Align syncing status logs with existing node is syncing logs * Cleanup * Add ssz support to syncing status api * Align beacon node code to return proper types * Keep track of error in prev syncing status * Print slot in error log * Skip on first slot of epoch since tasks are already scheduled * Update api test data * Fix endpoint tests * await scheduled tasks, mostly relevant for testing * Add unit tests * Move beacon heath metric to syncing status tracker * Add beacon health panel to validator client dashboard * Formatting * Improve info called once assertion * Reset mocks after each test --- dashboards/lodestar_validator_client.json | 134 +++++++++++++--- packages/api/src/beacon/routes/node.ts | 34 ++-- .../api/test/unit/beacon/testData/node.ts | 2 +- packages/beacon-node/src/sync/sync.ts | 12 +- .../api/impl/beacon/node/endpoints.test.ts | 7 +- packages/cli/test/sim/endpoints.test.ts | 4 +- packages/state-transition/src/util/epoch.ts | 7 + packages/validator/src/metrics.ts | 6 +- .../validator/src/services/attestation.ts | 17 +- .../src/services/attestationDuties.ts | 10 +- .../validator/src/services/syncCommittee.ts | 17 +- .../src/services/syncCommitteeDuties.ts | 16 +- .../src/services/syncingStatusTracker.ts | 74 +++++++++ packages/validator/src/validator.ts | 32 ++-- .../test/unit/services/attestation.test.ts | 5 + .../unit/services/attestationDuties.test.ts | 124 ++++++++++++--- .../unit/services/syncCommitteDuties.test.ts | 129 +++++++++++++-- .../test/unit/services/syncCommittee.test.ts | 5 + .../services/syncingStatusTracker.test.ts | 149 ++++++++++++++++++ .../validator/test/unit/utils/metrics.test.ts | 2 +- packages/validator/test/utils/apiStub.ts | 3 + packages/validator/test/utils/logger.ts | 14 ++ 22 files changed, 693 insertions(+), 110 deletions(-) create mode 100644 packages/validator/src/services/syncingStatusTracker.ts create mode 100644 packages/validator/test/unit/services/syncingStatusTracker.test.ts diff --git a/dashboards/lodestar_validator_client.json b/dashboards/lodestar_validator_client.json index 8ec6a04437b1..7ede7ba0bcff 100644 --- a/dashboards/lodestar_validator_client.json +++ b/dashboards/lodestar_validator_client.json @@ -71,6 +71,7 @@ "mode": "palette-classic" }, "custom": { + "axisBorderShow": false, "axisCenteredZero": false, "axisColorMode": "text", "axisLabel": "msg / slot", @@ -211,10 +212,12 @@ "fields": "", "values": false }, + "showPercentChange": false, "text": {}, - "textMode": "name" + "textMode": "name", + "wideLayout": true }, - "pluginVersion": "10.1.1", + "pluginVersion": "10.4.1", "targets": [ { "datasource": { @@ -231,7 +234,6 @@ } ], "title": "Lodestar version", - "transformations": [], "type": "stat" }, { @@ -267,10 +269,12 @@ "fields": "", "values": false }, + "showPercentChange": false, "text": {}, - "textMode": "name" + "textMode": "name", + "wideLayout": true }, - "pluginVersion": "10.1.1", + "pluginVersion": "10.4.1", "targets": [ { "datasource": { @@ -321,10 +325,12 @@ "fields": "", "values": false }, + "showPercentChange": false, "text": {}, - "textMode": "name" + "textMode": "name", + "wideLayout": true }, - "pluginVersion": "10.1.1", + "pluginVersion": "10.4.1", "targets": [ { "datasource": { @@ -359,7 +365,7 @@ }, "gridPos": { "h": 2, - "w": 4, + "w": 2, "x": 20, "y": 0 }, @@ -376,9 +382,11 @@ "fields": "", "values": false }, - "textMode": "auto" + "showPercentChange": false, + "textMode": "auto", + "wideLayout": true }, - "pluginVersion": "10.1.1", + "pluginVersion": "10.4.1", "targets": [ { "datasource": { @@ -395,6 +403,83 @@ "title": "VC indices", "type": "stat" }, + { + "datasource": { + "type": "prometheus", + "uid": "${DS_PROMETHEUS}" + }, + "description": "", + "fieldConfig": { + "defaults": { + "color": { + "mode": "thresholds" + }, + "mappings": [ + { + "options": { + "0": { + "color": "green", + "index": 0, + "text": "Ready" + }, + "1": { + "color": "yellow", + "index": 1, + "text": "Syncing" + }, + "2": { + "color": "red", + "index": 2, + "text": "Error" + } + }, + "type": "value" + } + ] + }, + "overrides": [] + }, + "gridPos": { + "h": 2, + "w": 2, + "x": 22, + "y": 0 + }, + "id": 47, + "options": { + "colorMode": "value", + "graphMode": "area", + "justifyMode": "auto", + "orientation": "auto", + "reduceOptions": { + "calcs": [ + "lastNotNull" + ], + "fields": "", + "values": false + }, + "showPercentChange": false, + "textMode": "auto", + "wideLayout": true + }, + "pluginVersion": "10.4.1", + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "${DS_PROMETHEUS}" + }, + "editorMode": "code", + "expr": "vc_beacon_health", + "instant": false, + "legendFormat": "__auto", + "range": true, + "refId": "A" + } + ], + "title": "Beacon health", + "type": "stat" + }, { "datasource": { "type": "prometheus", @@ -429,9 +514,11 @@ "fields": "", "values": false }, - "textMode": "auto" + "showPercentChange": false, + "textMode": "auto", + "wideLayout": true }, - "pluginVersion": "10.1.1", + "pluginVersion": "10.4.1", "targets": [ { "datasource": { @@ -482,9 +569,11 @@ "fields": "", "values": false }, - "textMode": "auto" + "showPercentChange": false, + "textMode": "auto", + "wideLayout": true }, - "pluginVersion": "10.1.1", + "pluginVersion": "10.4.1", "targets": [ { "datasource": { @@ -622,7 +711,7 @@ }, "showHeader": false }, - "pluginVersion": "10.1.1", + "pluginVersion": "10.4.1", "targets": [ { "datasource": { @@ -701,6 +790,7 @@ "mode": "palette-classic" }, "custom": { + "axisBorderShow": false, "axisCenteredZero": false, "axisColorMode": "text", "axisLabel": "", @@ -866,6 +956,7 @@ "mode": "palette-classic" }, "custom": { + "axisBorderShow": false, "axisCenteredZero": false, "axisColorMode": "text", "axisLabel": "", @@ -1019,7 +1110,8 @@ }, "showValue": "never", "tooltip": { - "show": true, + "mode": "single", + "showColorScale": false, "yHistogram": false }, "yAxis": { @@ -1028,7 +1120,7 @@ "unit": "s" } }, - "pluginVersion": "10.1.1", + "pluginVersion": "10.4.1", "reverseYBuckets": false, "targets": [ { @@ -1133,7 +1225,8 @@ }, "showValue": "never", "tooltip": { - "show": true, + "mode": "single", + "showColorScale": false, "yHistogram": false }, "yAxis": { @@ -1142,7 +1235,7 @@ "unit": "s" } }, - "pluginVersion": "10.1.1", + "pluginVersion": "10.4.1", "reverseYBuckets": false, "targets": [ { @@ -2344,8 +2437,7 @@ ], "refresh": "10s", "revision": 1, - "schemaVersion": 38, - "style": "dark", + "schemaVersion": 39, "tags": [ "lodestar" ], diff --git a/packages/api/src/beacon/routes/node.ts b/packages/api/src/beacon/routes/node.ts index 1ff0378c3330..0744b5f07452 100644 --- a/packages/api/src/beacon/routes/node.ts +++ b/packages/api/src/beacon/routes/node.ts @@ -43,6 +43,22 @@ export const PeerCountType = new ContainerType( {jsonCase: "eth2"} ); +export const SyncingStatusType = new ContainerType( + { + /** Head slot node is trying to reach */ + headSlot: ssz.Slot, + /** How many slots node needs to process to reach head. 0 if synced. */ + syncDistance: ssz.Slot, + /** Set to true if the node is syncing, false if the node is synced. */ + isSyncing: ssz.Boolean, + /** Set to true if the node is optimistically tracking head. */ + isOptimistic: ssz.Boolean, + /** Set to true if the connected el client is offline */ + elOffline: ssz.Boolean, + }, + {jsonCase: "eth2"} +); + export type NetworkIdentity = ValueOf; export type PeerState = "disconnected" | "connecting" | "connected" | "disconnecting"; @@ -66,18 +82,7 @@ export type FilterGetPeers = { direction?: PeerDirection[]; }; -export type SyncingStatus = { - /** Head slot node is trying to reach */ - headSlot: string; - /** How many slots node needs to process to reach head. 0 if synced. */ - syncDistance: string; - /** Set to true if the node is syncing, false if the node is synced. */ - isSyncing: boolean; - /** Set to true if the node is optimistically tracking head. */ - isOptimistic: boolean; - /** Set to true if the connected el client is offline */ - elOffline: boolean; -}; +export type SyncingStatus = ValueOf; export enum NodeHealth { READY = HttpStatusCode.OK, @@ -243,7 +248,10 @@ export function getDefinitions(_config: ChainForkConfig): RouteDefinitions = { }, getSyncingStatus: { args: undefined, - res: {data: {headSlot: "1", syncDistance: "2", isSyncing: false, isOptimistic: true, elOffline: false}}, + res: {data: {headSlot: 1, syncDistance: 2, isSyncing: false, isOptimistic: true, elOffline: false}}, }, getHealth: { args: {syncingStatus: 206}, diff --git a/packages/beacon-node/src/sync/sync.ts b/packages/beacon-node/src/sync/sync.ts index c7f01e1eae78..cc8ddc6eb499 100644 --- a/packages/beacon-node/src/sync/sync.ts +++ b/packages/beacon-node/src/sync/sync.ts @@ -93,8 +93,8 @@ export class BeaconSync implements IBeaconSync { // If we are pre/at genesis, signal ready if (currentSlot <= GENESIS_SLOT) { return { - headSlot: "0", - syncDistance: "0", + headSlot: 0, + syncDistance: 0, isSyncing: false, isOptimistic: false, elOffline, @@ -107,16 +107,16 @@ export class BeaconSync implements IBeaconSync { case SyncState.SyncingHead: case SyncState.Stalled: return { - headSlot: String(head.slot), - syncDistance: String(currentSlot - head.slot), + headSlot: head.slot, + syncDistance: currentSlot - head.slot, isSyncing: true, isOptimistic: isOptimisticBlock(head), elOffline, }; case SyncState.Synced: return { - headSlot: String(head.slot), - syncDistance: "0", + headSlot: head.slot, + syncDistance: 0, isSyncing: false, isOptimistic: isOptimisticBlock(head), elOffline, diff --git a/packages/beacon-node/test/e2e/api/impl/beacon/node/endpoints.test.ts b/packages/beacon-node/test/e2e/api/impl/beacon/node/endpoints.test.ts index d85fdb80720f..2d2a0a37c59e 100644 --- a/packages/beacon-node/test/e2e/api/impl/beacon/node/endpoints.test.ts +++ b/packages/beacon-node/test/e2e/api/impl/beacon/node/endpoints.test.ts @@ -1,6 +1,7 @@ import {describe, beforeAll, afterAll, it, expect, vi} from "vitest"; import {createBeaconConfig} from "@lodestar/config"; import {chainConfig as chainConfigDef} from "@lodestar/config/default"; +import {routes} from "@lodestar/api"; import {ApiClient, getClient} from "@lodestar/api/beacon"; import {sleep} from "@lodestar/utils"; import {LogLevel, testLogger} from "../../../../../utils/logger.js"; @@ -46,9 +47,9 @@ describe("beacon node api", function () { it("should return valid syncing status", async () => { const res = await client.node.getSyncingStatus(); - expect(res.value()).toEqual({ - headSlot: "0", - syncDistance: "0", + expect(res.value()).toEqual({ + headSlot: 0, + syncDistance: 0, isSyncing: false, isOptimistic: false, elOffline: false, diff --git a/packages/cli/test/sim/endpoints.test.ts b/packages/cli/test/sim/endpoints.test.ts index a40a18e379eb..07cd5fec0cc4 100644 --- a/packages/cli/test/sim/endpoints.test.ts +++ b/packages/cli/test/sim/endpoints.test.ts @@ -107,8 +107,8 @@ await env.tracker.assert( await env.tracker.assert("BN Not Synced", async () => { const expectedSyncStatus: routes.node.SyncingStatus = { - headSlot: "2", - syncDistance: "0", + headSlot: 2, + syncDistance: 0, isSyncing: false, isOptimistic: false, elOffline: false, diff --git a/packages/state-transition/src/util/epoch.ts b/packages/state-transition/src/util/epoch.ts index bb66fb04eb94..ba182f627de0 100644 --- a/packages/state-transition/src/util/epoch.ts +++ b/packages/state-transition/src/util/epoch.ts @@ -70,3 +70,10 @@ export function computeSyncPeriodAtSlot(slot: Slot): SyncPeriod { export function computeSyncPeriodAtEpoch(epoch: Epoch): SyncPeriod { return Math.floor(epoch / EPOCHS_PER_SYNC_COMMITTEE_PERIOD); } + +/** + * Determine if the given slot is start slot of an epoch + */ +export function isStartSlotOfEpoch(slot: Slot): boolean { + return slot % SLOTS_PER_EPOCH === 0; +} diff --git a/packages/validator/src/metrics.ts b/packages/validator/src/metrics.ts index dc7d1a11ffac..a437328e8d5f 100644 --- a/packages/validator/src/metrics.ts +++ b/packages/validator/src/metrics.ts @@ -8,9 +8,7 @@ export enum MessageSource { export enum BeaconHealth { READY = 0, SYNCING = 1, - NOT_INITIALIZED_OR_ISSUES = 2, - UNKNOWN = 3, - ERROR = 4, + ERROR = 2, } export type Metrics = ReturnType; @@ -279,7 +277,7 @@ export function getMetrics(register: MetricsRegisterExtra, gitData: LodestarGitD beaconHealth: register.gauge({ name: "vc_beacon_health", - help: `Current health status of the beacon(s) the validator is connected too. ${renderEnumNumeric(BeaconHealth)}`, + help: `Current health status of the beacon(s) the validator is connected to. ${renderEnumNumeric(BeaconHealth)}`, }), restApiClient: { diff --git a/packages/validator/src/services/attestation.ts b/packages/validator/src/services/attestation.ts index 57a8a7621a97..9191b251a6de 100644 --- a/packages/validator/src/services/attestation.ts +++ b/packages/validator/src/services/attestation.ts @@ -10,6 +10,7 @@ import {ValidatorStore} from "./validatorStore.js"; import {AttestationDutiesService, AttDutyAndProof} from "./attestationDuties.js"; import {groupAttDutiesByCommitteeIndex} from "./utils.js"; import {ChainHeaderTracker} from "./chainHeaderTracker.js"; +import {SyncingStatusTracker} from "./syncingStatusTracker.js"; import {ValidatorEventEmitter} from "./emitter.js"; export type AttestationServiceOpts = { @@ -40,12 +41,22 @@ export class AttestationService { private readonly validatorStore: ValidatorStore, private readonly emitter: ValidatorEventEmitter, chainHeadTracker: ChainHeaderTracker, + syncingStatusTracker: SyncingStatusTracker, private readonly metrics: Metrics | null, private readonly opts?: AttestationServiceOpts ) { - this.dutiesService = new AttestationDutiesService(logger, api, clock, validatorStore, chainHeadTracker, metrics, { - distributedAggregationSelection: opts?.distributedAggregationSelection, - }); + this.dutiesService = new AttestationDutiesService( + logger, + api, + clock, + validatorStore, + chainHeadTracker, + syncingStatusTracker, + metrics, + { + distributedAggregationSelection: opts?.distributedAggregationSelection, + } + ); // At most every slot, check existing duties from AttestationDutiesService and run tasks clock.runEverySlot(this.runAttestationTasks); diff --git a/packages/validator/src/services/attestationDuties.ts b/packages/validator/src/services/attestationDuties.ts index 1f278aebbd89..ea82bf0a4c72 100644 --- a/packages/validator/src/services/attestationDuties.ts +++ b/packages/validator/src/services/attestationDuties.ts @@ -1,7 +1,7 @@ import {toHexString} from "@chainsafe/ssz"; import {SLOTS_PER_EPOCH} from "@lodestar/params"; import {sleep} from "@lodestar/utils"; -import {computeEpochAtSlot, isAggregatorFromCommitteeLength} from "@lodestar/state-transition"; +import {computeEpochAtSlot, isAggregatorFromCommitteeLength, isStartSlotOfEpoch} from "@lodestar/state-transition"; import {BLSSignature, Epoch, Slot, ValidatorIndex, RootHex} from "@lodestar/types"; import {ApiClient, routes} from "@lodestar/api"; import {batchItems, IClock, LoggerVc} from "../util/index.js"; @@ -9,6 +9,7 @@ import {PubkeyHex} from "../types.js"; import {Metrics} from "../metrics.js"; import {ValidatorStore} from "./validatorStore.js"; import {ChainHeaderTracker, HeadEventData} from "./chainHeaderTracker.js"; +import {SyncingStatusTracker} from "./syncingStatusTracker.js"; /** Only retain `HISTORICAL_DUTIES_EPOCHS` duties prior to the current epoch. */ const HISTORICAL_DUTIES_EPOCHS = 2; @@ -52,6 +53,7 @@ export class AttestationDutiesService { private clock: IClock, private readonly validatorStore: ValidatorStore, chainHeadTracker: ChainHeaderTracker, + syncingStatusTracker: SyncingStatusTracker, private readonly metrics: Metrics | null, private readonly opts?: AttestationDutiesServiceOpts ) { @@ -60,6 +62,12 @@ export class AttestationDutiesService { clock.runEveryEpoch(this.runDutiesTasks); clock.runEverySlot(this.prepareForNextEpoch); chainHeadTracker.runOnNewHead(this.onNewHead); + syncingStatusTracker.runOnResynced(async (slot) => { + // Skip on first slot of epoch since tasks are already scheduled + if (!isStartSlotOfEpoch(slot)) { + return this.runDutiesTasks(computeEpochAtSlot(slot)); + } + }); if (metrics) { metrics.attesterDutiesCount.addCollect(() => { diff --git a/packages/validator/src/services/syncCommittee.ts b/packages/validator/src/services/syncCommittee.ts index 06926724141c..c960adc6986b 100644 --- a/packages/validator/src/services/syncCommittee.ts +++ b/packages/validator/src/services/syncCommittee.ts @@ -11,6 +11,7 @@ import {SyncCommitteeDutiesService, SyncDutyAndProofs} from "./syncCommitteeDuti import {groupSyncDutiesBySubcommitteeIndex, SubcommitteeDuty} from "./utils.js"; import {ChainHeaderTracker} from "./chainHeaderTracker.js"; import {ValidatorEventEmitter} from "./emitter.js"; +import {SyncingStatusTracker} from "./syncingStatusTracker.js"; export type SyncCommitteeServiceOpts = { scAfterBlockDelaySlotFraction?: number; @@ -31,12 +32,22 @@ export class SyncCommitteeService { private readonly validatorStore: ValidatorStore, private readonly emitter: ValidatorEventEmitter, private readonly chainHeaderTracker: ChainHeaderTracker, + readonly syncingStatusTracker: SyncingStatusTracker, private readonly metrics: Metrics | null, private readonly opts?: SyncCommitteeServiceOpts ) { - this.dutiesService = new SyncCommitteeDutiesService(config, logger, api, clock, validatorStore, metrics, { - distributedAggregationSelection: opts?.distributedAggregationSelection, - }); + this.dutiesService = new SyncCommitteeDutiesService( + config, + logger, + api, + clock, + validatorStore, + syncingStatusTracker, + metrics, + { + distributedAggregationSelection: opts?.distributedAggregationSelection, + } + ); // At most every slot, check existing duties from SyncCommitteeDutiesService and run tasks clock.runEverySlot(this.runSyncCommitteeTasks); diff --git a/packages/validator/src/services/syncCommitteeDuties.ts b/packages/validator/src/services/syncCommitteeDuties.ts index edc62dea575c..dd663528f751 100644 --- a/packages/validator/src/services/syncCommitteeDuties.ts +++ b/packages/validator/src/services/syncCommitteeDuties.ts @@ -1,6 +1,12 @@ import {toHexString} from "@chainsafe/ssz"; import {EPOCHS_PER_SYNC_COMMITTEE_PERIOD, SYNC_COMMITTEE_SUBNET_SIZE} from "@lodestar/params"; -import {computeSyncPeriodAtEpoch, computeSyncPeriodAtSlot, isSyncCommitteeAggregator} from "@lodestar/state-transition"; +import { + computeEpochAtSlot, + computeSyncPeriodAtEpoch, + computeSyncPeriodAtSlot, + isStartSlotOfEpoch, + isSyncCommitteeAggregator, +} from "@lodestar/state-transition"; import {ChainForkConfig} from "@lodestar/config"; import {BLSSignature, Epoch, Slot, SyncPeriod, ValidatorIndex} from "@lodestar/types"; import {ApiClient, routes} from "@lodestar/api"; @@ -9,6 +15,7 @@ import {PubkeyHex} from "../types.js"; import {Metrics} from "../metrics.js"; import {ValidatorStore} from "./validatorStore.js"; import {syncCommitteeIndicesToSubnets} from "./utils.js"; +import {SyncingStatusTracker} from "./syncingStatusTracker.js"; /** Only retain `HISTORICAL_DUTIES_PERIODS` duties prior to the current periods. */ const HISTORICAL_DUTIES_PERIODS = 2; @@ -80,12 +87,19 @@ export class SyncCommitteeDutiesService { private readonly api: ApiClient, clock: IClock, private readonly validatorStore: ValidatorStore, + syncingStatusTracker: SyncingStatusTracker, metrics: Metrics | null, private readonly opts?: SyncCommitteeDutiesServiceOpts ) { // Running this task every epoch is safe since a re-org of many epochs is very unlikely // TODO: If the re-org event is reliable consider re-running then clock.runEveryEpoch(this.runDutiesTasks); + syncingStatusTracker.runOnResynced(async (slot) => { + // Skip on first slot of epoch since tasks are already scheduled + if (!isStartSlotOfEpoch(slot)) { + return this.runDutiesTasks(computeEpochAtSlot(slot)); + } + }); if (metrics) { metrics.syncCommitteeDutiesCount.addCollect(() => { diff --git a/packages/validator/src/services/syncingStatusTracker.ts b/packages/validator/src/services/syncingStatusTracker.ts new file mode 100644 index 000000000000..4c38e670092d --- /dev/null +++ b/packages/validator/src/services/syncingStatusTracker.ts @@ -0,0 +1,74 @@ +import {ApiClient, routes} from "@lodestar/api"; +import {Logger} from "@lodestar/utils"; +import {Slot} from "@lodestar/types"; +import {IClock} from "../util/clock.js"; +import {BeaconHealth, Metrics} from "../metrics.js"; + +export type SyncingStatus = routes.node.SyncingStatus; + +type RunOnResyncedFn = (slot: Slot, signal: AbortSignal) => Promise; + +/** + * Track the syncing status of connected beacon node(s) + */ +export class SyncingStatusTracker { + private prevSyncingStatus?: SyncingStatus | Error; + + private readonly fns: RunOnResyncedFn[] = []; + + constructor( + private readonly logger: Logger, + private readonly api: ApiClient, + private readonly clock: IClock, + private readonly metrics: Metrics | null + ) { + this.clock.runEverySlot(this.checkSyncingStatus); + } + + /** + * Run function when node status changes from syncing to synced + * + * Note: does not consider if execution client is offline or syncing and + * hence it is not useful to schedule tasks that require a non-optimistic node. + */ + runOnResynced(fn: RunOnResyncedFn): void { + this.fns.push(fn); + } + + private checkSyncingStatus = async (slot: Slot, signal: AbortSignal): Promise => { + try { + const syncingStatus = (await this.api.node.getSyncingStatus()).value(); + const {isSyncing, headSlot, syncDistance, isOptimistic, elOffline} = syncingStatus; + const prevErrorOrSyncing = this.prevSyncingStatus instanceof Error || this.prevSyncingStatus?.isSyncing === true; + + if (isSyncing === true) { + this.logger.warn("Node is syncing", {slot, headSlot, syncDistance}); + } else if (this.prevSyncingStatus === undefined || prevErrorOrSyncing) { + this.logger.info("Node is synced", {slot, headSlot, isOptimistic, elOffline}); + } + this.logger.verbose("Node syncing status", {slot, ...syncingStatus}); + + this.prevSyncingStatus = syncingStatus; + + this.metrics?.beaconHealth.set( + !isSyncing && !isOptimistic && !elOffline ? BeaconHealth.READY : BeaconHealth.SYNCING + ); + + if (prevErrorOrSyncing && isSyncing === false) { + await Promise.all( + this.fns.map((fn) => + fn(slot, signal).catch((e) => this.logger.error("Error calling resynced event handler", e)) + ) + ); + } + } catch (e) { + // Error likely due to node being offline. In any case, handle failure to + // check syncing status the same way as if node was previously syncing + this.prevSyncingStatus = e as Error; + + this.metrics?.beaconHealth.set(BeaconHealth.ERROR); + + this.logger.error("Failed to check syncing status", {slot}, this.prevSyncingStatus); + } + }; +} diff --git a/packages/validator/src/validator.ts b/packages/validator/src/validator.ts index 461762560fee..1732f54ababf 100644 --- a/packages/validator/src/validator.ts +++ b/packages/validator/src/validator.ts @@ -16,10 +16,11 @@ import {ExternalSignerOptions, pollExternalSignerPubkeys} from "./services/exter import {Interchange, InterchangeFormatVersion, ISlashingProtection} from "./slashingProtection/index.js"; import {assertEqualParams, getLoggerVc, NotEqualParamsError} from "./util/index.js"; import {ChainHeaderTracker} from "./services/chainHeaderTracker.js"; +import {SyncingStatusTracker} from "./services/syncingStatusTracker.js"; import {ValidatorEventEmitter} from "./services/emitter.js"; import {ValidatorStore, Signer, ValidatorProposerConfig, defaultOptions} from "./services/validatorStore.js"; import {LodestarValidatorDatabaseController, ProcessShutdownCallback, PubkeyHex} from "./types.js"; -import {BeaconHealth, Metrics} from "./metrics.js"; +import {Metrics} from "./metrics.js"; import {MetaDataRepository} from "./repositories/metaDataRepository.js"; import {DoppelgangerService} from "./services/doppelgangerService.js"; @@ -35,6 +36,7 @@ export type ValidatorModules = { api: ApiClient; clock: IClock; chainHeaderTracker: ChainHeaderTracker; + syncingStatusTracker: SyncingStatusTracker; logger: Logger; db: LodestarValidatorDatabaseController; metrics: Metrics | null; @@ -89,6 +91,7 @@ export class Validator { private readonly api: ApiClient; private readonly clock: IClock; private readonly chainHeaderTracker: ChainHeaderTracker; + readonly syncingStatusTracker: SyncingStatusTracker; private readonly logger: Logger; private readonly db: LodestarValidatorDatabaseController; private state: Status; @@ -106,6 +109,7 @@ export class Validator { api, clock, chainHeaderTracker, + syncingStatusTracker, logger, db, metrics, @@ -121,6 +125,7 @@ export class Validator { this.api = api; this.clock = clock; this.chainHeaderTracker = chainHeaderTracker; + this.syncingStatusTracker = syncingStatusTracker; this.logger = logger; this.controller = controller; this.db = db; @@ -145,12 +150,6 @@ export class Validator { if (metrics) { this.db.setMetrics(metrics.db); - - this.clock.runEverySlot(() => - this.fetchBeaconHealth() - .then((health) => metrics.beaconHealth.set(health)) - .catch((e) => this.logger.error("Error on fetchBeaconHealth", {}, e)) - ); } // "start" the validator @@ -225,6 +224,7 @@ export class Validator { emitter.setMaxListeners(Infinity); const chainHeaderTracker = new ChainHeaderTracker(logger, api, emitter); + const syncingStatusTracker = new SyncingStatusTracker(logger, api, clock, metrics); const blockProposingService = new BlockProposingService(config, loggerVc, api, clock, validatorStore, metrics, { useProduceBlockV3: opts.useProduceBlockV3, @@ -239,6 +239,7 @@ export class Validator { validatorStore, emitter, chainHeaderTracker, + syncingStatusTracker, metrics, { afterBlockDelaySlotFraction: opts.afterBlockDelaySlotFraction, @@ -255,6 +256,7 @@ export class Validator { validatorStore, emitter, chainHeaderTracker, + syncingStatusTracker, metrics, { scAfterBlockDelaySlotFraction: opts.scAfterBlockDelaySlotFraction, @@ -274,6 +276,7 @@ export class Validator { api, clock, chainHeaderTracker, + syncingStatusTracker, logger, db, metrics, @@ -382,21 +385,6 @@ export class Validator { return this.validatorStore.signVoluntaryExit(publicKey, validator.index, exitEpoch); } - - private async fetchBeaconHealth(): Promise { - try { - const {status: healthCode} = await this.api.node.getHealth(); - - if (healthCode === routes.node.NodeHealth.READY) return BeaconHealth.READY; - if (healthCode === routes.node.NodeHealth.SYNCING) return BeaconHealth.SYNCING; - if (healthCode === routes.node.NodeHealth.NOT_INITIALIZED_OR_ISSUES) - return BeaconHealth.NOT_INITIALIZED_OR_ISSUES; - else return BeaconHealth.UNKNOWN; - } catch (e) { - // TODO: Filter by network error type - return BeaconHealth.ERROR; - } - } } /** Assert the same genesisValidatorRoot and genesisTime */ diff --git a/packages/validator/test/unit/services/attestation.test.ts b/packages/validator/test/unit/services/attestation.test.ts index 0ffec323ee30..11779ee496b3 100644 --- a/packages/validator/test/unit/services/attestation.test.ts +++ b/packages/validator/test/unit/services/attestation.test.ts @@ -10,12 +10,14 @@ import {getApiClientStub, mockApiResponse} from "../../utils/apiStub.js"; import {loggerVc} from "../../utils/logger.js"; import {ClockMock} from "../../utils/clock.js"; import {ChainHeaderTracker} from "../../../src/services/chainHeaderTracker.js"; +import {SyncingStatusTracker} from "../../../src/services/syncingStatusTracker.js"; import {ValidatorEventEmitter} from "../../../src/services/emitter.js"; import {ZERO_HASH, ZERO_HASH_HEX} from "../../utils/types.js"; vi.mock("../../../src/services/validatorStore.js"); vi.mock("../../../src/services/emitter.js"); vi.mock("../../../src/services/chainHeaderTracker.js"); +vi.mock("../../../src/services/syncingStatusTracker.js"); describe("AttestationService", function () { const api = getApiClientStub(); @@ -24,6 +26,8 @@ describe("AttestationService", function () { const emitter = vi.mocked(new ValidatorEventEmitter()); // @ts-expect-error - Mocked class don't need parameters const chainHeadTracker = vi.mocked(new ChainHeaderTracker()); + // @ts-expect-error - Mocked class don't need parameters + const syncingStatusTracker = vi.mocked(new SyncingStatusTracker()); let pubkeys: Uint8Array[]; // Initialize pubkeys in before() so bls is already initialized @@ -62,6 +66,7 @@ describe("AttestationService", function () { validatorStore, emitter, chainHeadTracker, + syncingStatusTracker, null, opts ); diff --git a/packages/validator/test/unit/services/attestationDuties.test.ts b/packages/validator/test/unit/services/attestationDuties.test.ts index ad54c0735eea..751ebcf11205 100644 --- a/packages/validator/test/unit/services/attestationDuties.test.ts +++ b/packages/validator/test/unit/services/attestationDuties.test.ts @@ -13,6 +13,7 @@ import {loggerVc} from "../../utils/logger.js"; import {ClockMock} from "../../utils/clock.js"; import {initValidatorStore} from "../../utils/validatorStore.js"; import {ChainHeaderTracker} from "../../../src/services/chainHeaderTracker.js"; +import {SyncingStatusTracker} from "../../../src/services/syncingStatusTracker.js"; import {ZERO_HASH_HEX} from "../../utils/types.js"; vi.mock("../../../src/services/chainHeaderTracker.js"); @@ -45,10 +46,6 @@ describe("AttestationDutiesService", function () { let controller: AbortController; // To stop clock beforeEach(() => { controller = new AbortController(); - }); - afterEach(() => controller.abort()); - - it("Should fetch indexes and duties", async function () { // Reply with an active validator that has an index const validatorResponse = { ...defaultValidator, @@ -58,7 +55,13 @@ describe("AttestationDutiesService", function () { api.beacon.getStateValidators.mockResolvedValue( mockApiResponse({data: [validatorResponse], meta: {executionOptimistic: false, finalized: false}}) ); + }); + afterEach(() => { + vi.restoreAllMocks(); + controller.abort(); + }); + it("Should fetch indexes and duties", async function () { // Reply with some duties const slot = 1; const epoch = computeEpochAtSlot(slot); @@ -78,9 +81,18 @@ describe("AttestationDutiesService", function () { // Accept all subscriptions api.validator.prepareBeaconCommitteeSubnet.mockResolvedValue(mockApiResponse({})); - // Clock will call runAttesterDutiesTasks() immediately + // Clock will call runDutiesTasks() immediately const clock = new ClockMock(); - const dutiesService = new AttestationDutiesService(loggerVc, api, clock, validatorStore, chainHeadTracker, null); + const syncingStatusTracker = new SyncingStatusTracker(loggerVc, api, clock, null); + const dutiesService = new AttestationDutiesService( + loggerVc, + api, + clock, + validatorStore, + chainHeadTracker, + syncingStatusTracker, + null + ); // Trigger clock onSlot for slot 0 await clock.tickEpochFns(0, controller.signal); @@ -107,16 +119,6 @@ describe("AttestationDutiesService", function () { }); it("Should remove signer from attestation duties", async function () { - // Reply with an active validator that has an index - const validatorResponse = { - ...defaultValidator, - index, - validator: {...defaultValidator.validator, pubkey: pubkeys[0]}, - }; - api.beacon.getStateValidators.mockResolvedValue( - mockApiResponse({data: [validatorResponse], meta: {executionOptimistic: false, finalized: false}}) - ); - // Reply with some duties const slot = 1; const duty: routes.validator.AttesterDuty = { @@ -135,9 +137,18 @@ describe("AttestationDutiesService", function () { // Accept all subscriptions api.validator.prepareBeaconCommitteeSubnet.mockResolvedValue(mockApiResponse({})); - // Clock will call runAttesterDutiesTasks() immediately + // Clock will call runDutiesTasks() immediately const clock = new ClockMock(); - const dutiesService = new AttestationDutiesService(loggerVc, api, clock, validatorStore, chainHeadTracker, null); + const syncingStatusTracker = new SyncingStatusTracker(loggerVc, api, clock, null); + const dutiesService = new AttestationDutiesService( + loggerVc, + api, + clock, + validatorStore, + chainHeadTracker, + syncingStatusTracker, + null + ); // Trigger clock onSlot for slot 0 await clock.tickEpochFns(0, controller.signal); @@ -153,4 +164,81 @@ describe("AttestationDutiesService", function () { dutiesService.removeDutiesForKey(toHexString(pubkeys[0])); expect(Object.fromEntries(dutiesService["dutiesByIndexByEpoch"])).toEqual({}); }); + + it("Should fetch duties when node is resynced", async function () { + // Node is syncing + api.node.getSyncingStatus.mockResolvedValue( + mockApiResponse({data: {headSlot: 0, syncDistance: 1, isSyncing: true, isOptimistic: false, elOffline: false}}) + ); + api.validator.getAttesterDuties.mockRejectedValue(Error("Node is syncing")); + api.validator.prepareBeaconCommitteeSubnet.mockRejectedValue(Error("Node is syncing")); + + // Clock will call runDutiesTasks() immediately + const clock = new ClockMock(); + const syncingStatusTracker = new SyncingStatusTracker(loggerVc, api, clock, null); + const dutiesService = new AttestationDutiesService( + loggerVc, + api, + clock, + validatorStore, + chainHeadTracker, + syncingStatusTracker, + null + ); + + // Trigger clock for slot and epoch + await clock.tickEpochFns(0, controller.signal); + await clock.tickSlotFns(1, controller.signal); + + const dutySlot = 3; + const epoch = computeEpochAtSlot(dutySlot); + + // Duties for slot should be empty as node is still syncing + expect(dutiesService.getDutiesAtSlot(dutySlot)).toEqual([]); + + // Node is synced now + api.node.getSyncingStatus.mockResolvedValue( + mockApiResponse({data: {headSlot: 1, syncDistance: 0, isSyncing: false, isOptimistic: false, elOffline: false}}) + ); + + // Reply with some duties on next call + const duty: routes.validator.AttesterDuty = { + slot: dutySlot, + committeeIndex: 1, + committeeLength: 120, + committeesAtSlot: 120, + validatorCommitteeIndex: 1, + validatorIndex: index, + pubkey: pubkeys[0], + }; + api.validator.getAttesterDuties.mockResolvedValue( + mockApiResponse({data: [duty], meta: {dependentRoot: ZERO_HASH_HEX, executionOptimistic: false}}) + ); + + // Accept all subscriptions + api.validator.prepareBeaconCommitteeSubnet.mockResolvedValue(mockApiResponse({})); + + // Only tick clock for slot to not trigger regular polling + await clock.tickSlotFns(2, controller.signal); + + // Validator index should be persisted + expect(validatorStore.getAllLocalIndices()).toEqual([index]); + expect(validatorStore.getPubkeyOfIndex(index)).toBe(toHexString(pubkeys[0])); + + // Duties for this and next epoch should be persisted + expect(Object.fromEntries(dutiesService["dutiesByIndexByEpoch"].get(epoch)?.dutiesByIndex || new Map())).toEqual({ + // Since the ZERO_HASH won't pass the isAggregator test, selectionProof is null + [index]: {duty, selectionProof: null}, + }); + expect( + Object.fromEntries(dutiesService["dutiesByIndexByEpoch"].get(epoch + 1)?.dutiesByIndex || new Map()) + ).toEqual({ + // Since the ZERO_HASH won't pass the isAggregator test, selectionProof is null + [index]: {duty, selectionProof: null}, + }); + + expect(dutiesService.getDutiesAtSlot(dutySlot)).toEqual([{duty, selectionProof: null}]); + + expect(api.validator.prepareBeaconCommitteeSubnet).toHaveBeenCalledOnce(); + }); }); diff --git a/packages/validator/test/unit/services/syncCommitteDuties.test.ts b/packages/validator/test/unit/services/syncCommitteDuties.test.ts index 52f2071f9102..10bac3ab2fe9 100644 --- a/packages/validator/test/unit/services/syncCommitteDuties.test.ts +++ b/packages/validator/test/unit/services/syncCommitteDuties.test.ts @@ -1,4 +1,4 @@ -import {describe, it, expect, beforeAll, beforeEach, afterEach} from "vitest"; +import {describe, it, expect, beforeAll, beforeEach, afterEach, vi} from "vitest"; import {when} from "vitest-when"; import {toBufferBE} from "bigint-buffer"; import {toHexString} from "@chainsafe/ssz"; @@ -13,6 +13,7 @@ import { SyncDutySubnet, } from "../../../src/services/syncCommitteeDuties.js"; import {ValidatorStore} from "../../../src/services/validatorStore.js"; +import {SyncingStatusTracker} from "../../../src/services/syncingStatusTracker.js"; import {getApiClientStub, mockApiResponse} from "../../utils/apiStub.js"; import {loggerVc} from "../../utils/logger.js"; import {ClockMock} from "../../utils/clock.js"; @@ -63,7 +64,10 @@ describe("SyncCommitteeDutiesService", function () { mockApiResponse({data: validatorResponses, meta: {executionOptimistic: false, finalized: false}}) ); }); - afterEach(() => controller.abort()); + afterEach(() => { + vi.restoreAllMocks(); + controller.abort(); + }); it("Should fetch indexes and duties", async function () { // Reply with some duties @@ -80,14 +84,22 @@ describe("SyncCommitteeDutiesService", function () { // Accept all subscriptions api.validator.prepareSyncCommitteeSubnets.mockResolvedValue(mockApiResponse({})); - // Clock will call runAttesterDutiesTasks() immediately + // Clock will call runDutiesTasks() immediately const clock = new ClockMock(); - const dutiesService = new SyncCommitteeDutiesService(altair0Config, loggerVc, api, clock, validatorStore, null); + const syncingStatusTracker = new SyncingStatusTracker(loggerVc, api, clock, null); + const dutiesService = new SyncCommitteeDutiesService( + altair0Config, + loggerVc, + api, + clock, + validatorStore, + syncingStatusTracker, + null + ); // Trigger clock onSlot for slot 0 await clock.tickEpochFns(0, controller.signal); - // Validator index should be persisted // Validator index should be persisted expect(validatorStore.getAllLocalIndices()).toEqual(indices); for (let i = 0; i < indices.length; i++) { @@ -107,9 +119,9 @@ describe("SyncCommitteeDutiesService", function () { 1: {[indices[0]]: {duty: toSyncDutySubnet(duty)}}, } as typeof dutiesByIndexByPeriodObj); - expect(await dutiesService.getDutiesAtSlot(slot)).toEqual([ + expect(await dutiesService.getDutiesAtSlot(slot)).toEqual([ {duty: toSyncDutySubnet(duty), selectionProofs: [{selectionProof: null, subcommitteeIndex: 0}]}, - ] as SyncDutyAndProofs[]); + ]); expect(api.validator.prepareSyncCommitteeSubnets).toHaveBeenCalledOnce(); }); @@ -143,9 +155,18 @@ describe("SyncCommitteeDutiesService", function () { .calledWith({epoch: 1, indices}) .thenResolve(mockApiResponse({data: [duty2], meta: {executionOptimistic: false}})); - // Clock will call runAttesterDutiesTasks() immediately + // Clock will call runDutiesTasks() immediately const clock = new ClockMock(); - const dutiesService = new SyncCommitteeDutiesService(altair0Config, loggerVc, api, clock, validatorStore, null); + const syncingStatusTracker = new SyncingStatusTracker(loggerVc, api, clock, null); + const dutiesService = new SyncCommitteeDutiesService( + altair0Config, + loggerVc, + api, + clock, + validatorStore, + syncingStatusTracker, + null + ); // Trigger clock onSlot for slot 0 await clock.tickEpochFns(0, controller.signal); @@ -196,9 +217,18 @@ describe("SyncCommitteeDutiesService", function () { // Accept all subscriptions api.validator.prepareSyncCommitteeSubnets.mockResolvedValue(mockApiResponse({})); - // Clock will call runAttesterDutiesTasks() immediately + // Clock will call runDutiesTasks() immediately const clock = new ClockMock(); - const dutiesService = new SyncCommitteeDutiesService(altair0Config, loggerVc, api, clock, validatorStore, null); + const syncingStatusTracker = new SyncingStatusTracker(loggerVc, api, clock, null); + const dutiesService = new SyncCommitteeDutiesService( + altair0Config, + loggerVc, + api, + clock, + validatorStore, + syncingStatusTracker, + null + ); // Trigger clock onSlot for slot 0 await clock.tickEpochFns(0, controller.signal); @@ -236,6 +266,83 @@ describe("SyncCommitteeDutiesService", function () { 1: {[indices[1]]: {duty: toSyncDutySubnet(duty2)}}, } as typeof dutiesByIndexByPeriodObjAfterRemoval); }); + + it("Should fetch duties when node is resynced", async function () { + // Node is syncing + api.node.getSyncingStatus.mockResolvedValue( + mockApiResponse({data: {headSlot: 0, syncDistance: 1, isSyncing: true, isOptimistic: false, elOffline: false}}) + ); + api.validator.getSyncCommitteeDuties.mockRejectedValue(Error("Node is syncing")); + api.validator.prepareSyncCommitteeSubnets.mockRejectedValue(Error("Node is syncing")); + + // Clock will call runDutiesTasks() immediately + const clock = new ClockMock(); + const syncingStatusTracker = new SyncingStatusTracker(loggerVc, api, clock, null); + const dutiesService = new SyncCommitteeDutiesService( + altair0Config, + loggerVc, + api, + clock, + validatorStore, + syncingStatusTracker, + null + ); + + // Trigger clock for slot and epoch + await clock.tickEpochFns(0, controller.signal); + await clock.tickSlotFns(1, controller.signal); + + const dutySlot = 1; + + // Duties for slot should be empty as node is still syncing + expect(await dutiesService.getDutiesAtSlot(dutySlot)).toEqual([]); + + // Node is synced now + api.node.getSyncingStatus.mockResolvedValue( + mockApiResponse({data: {headSlot: 1, syncDistance: 0, isSyncing: false, isOptimistic: false, elOffline: false}}) + ); + + // Reply with some duties + const duty: routes.validator.SyncDuty = { + pubkey: pubkeys[0], + validatorIndex: indices[0], + validatorSyncCommitteeIndices: [7], + }; + api.validator.getSyncCommitteeDuties.mockResolvedValue( + mockApiResponse({data: [duty], meta: {executionOptimistic: false}}) + ); + + // Accept all subscriptions + api.validator.prepareSyncCommitteeSubnets.mockResolvedValue(mockApiResponse({})); + + // Only tick clock for slot to not trigger regular polling + await clock.tickSlotFns(2, controller.signal); + + // Validator index should be persisted + expect(validatorStore.getAllLocalIndices()).toEqual(indices); + for (let i = 0; i < indices.length; i++) { + expect(validatorStore.getPubkeyOfIndex(indices[i])).toBe(toHexString(pubkeys[i])); + } + + // Duties for this and next epoch should be persisted + const dutiesByIndexByPeriodObj = Object.fromEntries( + Array.from(dutiesService["dutiesByIndexByPeriod"].entries()).map(([period, dutiesByIndex]) => [ + period, + Object.fromEntries(dutiesByIndex), + ]) + ); + + expect(dutiesByIndexByPeriodObj).toEqual({ + 0: {[indices[0]]: {duty: toSyncDutySubnet(duty)}}, + 1: {[indices[0]]: {duty: toSyncDutySubnet(duty)}}, + } as typeof dutiesByIndexByPeriodObj); + + expect(await dutiesService.getDutiesAtSlot(dutySlot)).toEqual([ + {duty: toSyncDutySubnet(duty), selectionProofs: [{selectionProof: null, subcommitteeIndex: 0}]}, + ]); + + expect(api.validator.prepareSyncCommitteeSubnets).toHaveBeenCalledOnce(); + }); }); function toSyncDutySubnet(duty: routes.validator.SyncDuty): SyncDutySubnet { diff --git a/packages/validator/test/unit/services/syncCommittee.test.ts b/packages/validator/test/unit/services/syncCommittee.test.ts index c65912f12e9a..66e63ab72a6c 100644 --- a/packages/validator/test/unit/services/syncCommittee.test.ts +++ b/packages/validator/test/unit/services/syncCommittee.test.ts @@ -12,12 +12,14 @@ import {getApiClientStub, mockApiResponse} from "../../utils/apiStub.js"; import {loggerVc} from "../../utils/logger.js"; import {ClockMock} from "../../utils/clock.js"; import {ChainHeaderTracker} from "../../../src/services/chainHeaderTracker.js"; +import {SyncingStatusTracker} from "../../../src/services/syncingStatusTracker.js"; import {ZERO_HASH} from "../../utils/types.js"; import {ValidatorEventEmitter} from "../../../src/services/emitter.js"; vi.mock("../../../src/services/validatorStore.js"); vi.mock("../../../src/services/emitter.js"); vi.mock("../../../src/services/chainHeaderTracker.js"); +vi.mock("../../../src/services/syncingStatusTracker.js"); /* eslint-disable @typescript-eslint/naming-convention */ @@ -28,6 +30,8 @@ describe("SyncCommitteeService", function () { const emitter = vi.mocked(new ValidatorEventEmitter()); // @ts-expect-error - Mocked class don't need parameters const chainHeaderTracker = vi.mocked(new ChainHeaderTracker()); + // @ts-expect-error - Mocked class don't need parameters + const syncingStatusTracker = vi.mocked(new SyncingStatusTracker()); let pubkeys: Uint8Array[]; // Initialize pubkeys in before() so bls is already initialized const config = createChainForkConfig({ @@ -71,6 +75,7 @@ describe("SyncCommitteeService", function () { validatorStore, emitter, chainHeaderTracker, + syncingStatusTracker, null, opts ); diff --git a/packages/validator/test/unit/services/syncingStatusTracker.test.ts b/packages/validator/test/unit/services/syncingStatusTracker.test.ts new file mode 100644 index 000000000000..59029e1b9c51 --- /dev/null +++ b/packages/validator/test/unit/services/syncingStatusTracker.test.ts @@ -0,0 +1,149 @@ +import {describe, it, expect, vi, beforeEach, afterEach, MockedFunction} from "vitest"; +import {getApiClientStub, mockApiResponse} from "../../utils/apiStub.js"; +import {getMockedLogger} from "../../utils/logger.js"; +import {ClockMock} from "../../utils/clock.js"; +import {SyncingStatus, SyncingStatusTracker} from "../../../src/services/syncingStatusTracker.js"; + +describe("SyncingStatusTracker", function () { + const api = getApiClientStub(); + const logger = getMockedLogger(); + + let controller: AbortController; + let clock: ClockMock; + let syncingStatusTracker: SyncingStatusTracker; + let callOnResynced: MockedFunction<() => Promise>; + + beforeEach(() => { + controller = new AbortController(); + clock = new ClockMock(); + syncingStatusTracker = new SyncingStatusTracker(logger, api, clock, null); + callOnResynced = vi.fn().mockResolvedValue(undefined); + syncingStatusTracker.runOnResynced(callOnResynced); + }); + + afterEach(() => { + vi.resetAllMocks(); + controller.abort(); + }); + + it("should handle transition from syncing to synced", async function () { + // Node is syncing + const syncing: SyncingStatus = { + headSlot: 0, + syncDistance: 1, + isSyncing: true, + isOptimistic: false, + elOffline: false, + }; + api.node.getSyncingStatus.mockResolvedValue(mockApiResponse({data: syncing})); + + await clock.tickSlotFns(1, controller.signal); + + expect(logger.warn).toHaveBeenCalledWith("Node is syncing", {slot: 1, headSlot: 0, syncDistance: 1}); + expect(logger.verbose).toHaveBeenCalledWith("Node syncing status", { + slot: 1, + headSlot: 0, + syncDistance: 1, + isSyncing: true, + isOptimistic: false, + elOffline: false, + }); + expect(syncingStatusTracker["prevSyncingStatus"]).toBe(syncing); + + // Transition to synced + const synced: SyncingStatus = { + headSlot: 2, + syncDistance: 0, + isSyncing: false, + isOptimistic: false, + elOffline: false, + }; + api.node.getSyncingStatus.mockResolvedValue(mockApiResponse({data: synced})); + + await clock.tickSlotFns(2, controller.signal); + + expect(logger.info).toHaveBeenCalledWith("Node is synced", { + slot: 2, + headSlot: 2, + isOptimistic: false, + elOffline: false, + }); + expect(logger.verbose).toHaveBeenCalledWith("Node syncing status", { + slot: 2, + headSlot: 2, + syncDistance: 0, + isSyncing: false, + isOptimistic: false, + elOffline: false, + }); + expect(syncingStatusTracker["prevSyncingStatus"]).toBe(synced); + expect(callOnResynced).toHaveBeenCalledOnce(); + }); + + it("should handle errors when checking syncing status", async function () { + // Node is offline + const error = new Error("ECONNREFUSED"); + api.node.getSyncingStatus.mockRejectedValue(error); + + await clock.tickSlotFns(1, controller.signal); + + expect(logger.error).toHaveBeenCalledWith("Failed to check syncing status", {slot: 1}, error); + expect(syncingStatusTracker["prevSyncingStatus"]).toBe(error); + expect(callOnResynced).not.toHaveBeenCalled(); + }); + + it("should not call scheduled tasks if already synced", async function () { + // Node is already synced + const syncedHead1: SyncingStatus = { + headSlot: 1, + syncDistance: 0, + isSyncing: false, + isOptimistic: false, + elOffline: false, + }; + api.node.getSyncingStatus.mockResolvedValue(mockApiResponse({data: syncedHead1})); + + await clock.tickSlotFns(1, controller.signal); + + expect(logger.info).toHaveBeenCalledWith("Node is synced", { + slot: 1, + headSlot: 1, + isOptimistic: false, + elOffline: false, + }); + expect(logger.verbose).toHaveBeenCalledWith("Node syncing status", { + slot: 1, + headSlot: 1, + syncDistance: 0, + isSyncing: false, + isOptimistic: false, + elOffline: false, + }); + expect(syncingStatusTracker["prevSyncingStatus"]).toBe(syncedHead1); + + // Still synced on next tick + const syncedHead2: SyncingStatus = { + headSlot: 2, + syncDistance: 0, + isSyncing: false, + isOptimistic: false, + elOffline: false, + }; + api.node.getSyncingStatus.mockResolvedValue(mockApiResponse({data: syncedHead2})); + + await clock.tickSlotFns(2, controller.signal); + + // info log should only be printed out once, not every slot + expect(logger.info).toHaveBeenCalledOnce(); + expect(logger.verbose).toHaveBeenCalledWith("Node syncing status", { + slot: 2, + headSlot: 2, + syncDistance: 0, + isSyncing: false, + isOptimistic: false, + elOffline: false, + }); + expect(syncingStatusTracker["prevSyncingStatus"]).toBe(syncedHead2); + expect(callOnResynced).not.toHaveBeenCalled(); + }); +}); diff --git a/packages/validator/test/unit/utils/metrics.test.ts b/packages/validator/test/unit/utils/metrics.test.ts index 695e8731b7f6..de4010761001 100644 --- a/packages/validator/test/unit/utils/metrics.test.ts +++ b/packages/validator/test/unit/utils/metrics.test.ts @@ -3,6 +3,6 @@ import {BeaconHealth, renderEnumNumeric} from "../../../src/metrics.js"; describe("renderEnumNumeric", () => { it("BeaconHealth", () => { - expect(renderEnumNumeric(BeaconHealth)).toBe("READY=0, SYNCING=1, NOT_INITIALIZED_OR_ISSUES=2, UNKNOWN=3, ERROR=4"); + expect(renderEnumNumeric(BeaconHealth)).toBe("READY=0, SYNCING=1, ERROR=2"); }); }); diff --git a/packages/validator/test/utils/apiStub.ts b/packages/validator/test/utils/apiStub.ts index ac41c7145128..3c1d80fff75d 100644 --- a/packages/validator/test/utils/apiStub.ts +++ b/packages/validator/test/utils/apiStub.ts @@ -21,6 +21,9 @@ export function getApiClientStub(): ApiClientStub { submitPoolSyncCommitteeSignatures: vi.fn(), submitPoolAttestations: vi.fn(), }, + node: { + getSyncingStatus: vi.fn(), + }, validator: { getProposerDuties: vi.fn(), getAttesterDuties: vi.fn(), diff --git a/packages/validator/test/utils/logger.ts b/packages/validator/test/utils/logger.ts index 8eaed6dfe6c1..44f5190dfe79 100644 --- a/packages/validator/test/utils/logger.ts +++ b/packages/validator/test/utils/logger.ts @@ -1,3 +1,5 @@ +import {vi, Mocked} from "vitest"; +import {Logger} from "@lodestar/logger"; import {getEnvLogger} from "@lodestar/logger/env"; import {getLoggerVc} from "../../src/util/index.js"; import {ClockMock} from "./clock.js"; @@ -13,3 +15,15 @@ import {ClockMock} from "./clock.js"; export const testLogger = getEnvLogger; export const loggerVc = getLoggerVc(getEnvLogger(), new ClockMock()); + +export type MockedLogger = Mocked; + +export function getMockedLogger(): MockedLogger { + return { + debug: vi.fn(), + info: vi.fn(), + warn: vi.fn(), + error: vi.fn(), + verbose: vi.fn(), + }; +}