diff --git a/packages/beacon-node/src/network/core/metrics.ts b/packages/beacon-node/src/network/core/metrics.ts index 92791ec00d5a..78bc88d52fe7 100644 --- a/packages/beacon-node/src/network/core/metrics.ts +++ b/packages/beacon-node/src/network/core/metrics.ts @@ -237,6 +237,20 @@ export function createNetworkCoreMetrics(register: RegistryMetricCreator) { name: "lodestar_attnets_service_committee_subscriptions_total", help: "Count of committee subscriptions", }), + subscriptionsCommitteeMeshPeers: register.histogram<"subnet">({ + name: "lodestar_attnets_service_committee_subscriptions_mesh_peers", + help: "Histogram of mesh peers per committee subscription", + labelNames: ["subnet"], + // Dlow = 6, D = 8, DHi = 12 plus 2 more buckets + buckets: [0, 4, 6, 8, 12], + }), + subscriptionsCommitteeTimeToStableMesh: register.histogram<"subnet">({ + name: "lodestar_attnets_service_committee_subscriptions_time_to_stable_mesh_seconds", + help: "Histogram of time until committee subscription is considered healthy (>= 6 mesh peers)", + labelNames: ["subnet"], + // we subscribe 2 slots = 24s before aggregator duty + buckets: [0, 6, 12, 18, 24], + }), subscriptionsRandom: register.gauge({ name: "lodestar_attnets_service_random_subscriptions_total", help: "Count of random subscriptions", diff --git a/packages/beacon-node/src/network/subnets/dllAttnetsService.ts b/packages/beacon-node/src/network/subnets/dllAttnetsService.ts index ec996d9b9394..f7ae0e8d09c2 100644 --- a/packages/beacon-node/src/network/subnets/dllAttnetsService.ts +++ b/packages/beacon-node/src/network/subnets/dllAttnetsService.ts @@ -1,5 +1,3 @@ -import {computeStartSlotAtEpoch} from "@lodestar/state-transition"; -import {ChainForkConfig} from "@lodestar/config"; import { ATTESTATION_SUBNET_COUNT, EPOCHS_PER_RANDOM_SUBNET_SUBSCRIPTION, @@ -8,12 +6,15 @@ import { } from "@lodestar/params"; import {Epoch, Slot, ssz} from "@lodestar/types"; import {Logger, MapDef} from "@lodestar/utils"; +import {BeaconConfig} from "@lodestar/config"; import {ClockEvent, IClock} from "../../util/clock.js"; import {GossipType} from "../gossip/index.js"; import {MetadataController} from "../metadata.js"; import {SubnetMap, RequestedSubnet} from "../peers/utils/index.js"; import {getActiveForks} from "../forks.js"; import {NetworkCoreMetrics} from "../core/metrics.js"; +import {stringifyGossipTopic} from "../gossip/topic.js"; +import {GOSSIP_D_LOW} from "../gossip/scoringParameters.js"; import {IAttnetsService, CommitteeSubscription, SubnetsServiceOpts, GossipSubscriber, NodeId} from "./interface.js"; import {computeSubscribedSubnet} from "./util.js"; @@ -24,6 +25,15 @@ enum SubnetSource { longLived = "long_lived", } +type Subnet = number; +// map of subnet to time to form stable mesh as seconds, null if not yet formed +type AggregatorDutyInfo = Map; + +/** + * This value means node is not able to form stable mesh. + */ +const NOT_ABLE_TO_FORM_STABLE_MESH_SEC = -1; + /** * Manage deleterministic long lived (DLL) subnets and short lived subnets. * - PeerManager uses attnetsService to know which peers are required for duties and long lived subscriptions @@ -42,13 +52,13 @@ export class DLLAttnetsService implements IAttnetsService { /** ${SUBNETS_PER_NODE} long lived subscriptions, may overlap with `shortLivedSubscriptions` */ private longLivedSubscriptions = new Set(); /** - * Map of an aggregator at a slot and subnet + * Map of an aggregator at a slot and AggregatorDutyInfo * Used to determine if we should process an attestation. */ - private aggregatorSlotSubnet = new MapDef>(() => new Set()); + private aggregatorSlotSubnet = new MapDef(() => new Map()); constructor( - private readonly config: ChainForkConfig, + private readonly config: BeaconConfig, private readonly clock: IClock, private readonly gossip: GossipSubscriber, private readonly metadata: MetadataController, @@ -108,7 +118,7 @@ export class DLLAttnetsService implements IAttnetsService { this.committeeSubnets.request({subnet, toSlot: slot + 1}); if (isAggregator) { // need exact slot here - this.aggregatorSlotSubnet.getOrDefault(slot).add(subnet); + this.aggregatorSlotSubnet.getOrDefault(slot).set(subnet, null); } } } @@ -154,26 +164,82 @@ export class DLLAttnetsService implements IAttnetsService { * Run per slot. * - Subscribe to gossip subnets 2 slots in advance * - Unsubscribe from expired subnets + * - Track time to stable mesh if not yet formed */ private onSlot = (clockSlot: Slot): void => { try { - for (const [dutiedSlot, subnets] of this.aggregatorSlotSubnet.entries()) { + setTimeout( + () => { + this.onHalfSlot(clockSlot); + }, + this.config.SECONDS_PER_SLOT * 0.5 * 1000 + ); + + for (const [dutiedSlot, dutiedInfo] of this.aggregatorSlotSubnet.entries()) { if (dutiedSlot === clockSlot + this.opts.slotsToSubscribeBeforeAggregatorDuty) { // Trigger gossip subscription first, in batch - if (subnets.size > 0) { - this.subscribeToSubnets(Array.from(subnets), SubnetSource.committee); + if (dutiedInfo.size > 0) { + this.subscribeToSubnets(Array.from(dutiedInfo.keys()), SubnetSource.committee); } // Then, register the subscriptions - Array.from(subnets).map((subnet) => this.shortLivedSubscriptions.request({subnet, toSlot: dutiedSlot})); + for (const subnet of dutiedInfo.keys()) { + this.shortLivedSubscriptions.request({subnet, toSlot: dutiedSlot}); + } } + this.trackTimeToStableMesh(clockSlot, dutiedSlot, dutiedInfo); } this.unsubscribeExpiredCommitteeSubnets(clockSlot); + this.pruneExpiredAggregator(clockSlot); } catch (e) { this.logger.error("Error on AttnetsService.onSlot", {slot: clockSlot}, e as Error); } }; + private onHalfSlot = (clockSlot: Slot): void => { + for (const [dutiedSlot, dutiedInfo] of this.aggregatorSlotSubnet.entries()) { + this.trackTimeToStableMesh(clockSlot, dutiedSlot, dutiedInfo); + } + }; + + /** + * Track time to form stable mesh if not yet formed + */ + private trackTimeToStableMesh(clockSlot: Slot, dutiedSlot: Slot, dutiedInfo: AggregatorDutyInfo): void { + if (dutiedSlot < clockSlot) { + // aggregator duty is expired, set timeToStableMesh to some big value so we know this value is not good + for (const [subnet, timeToFormMesh] of dutiedInfo.entries()) { + if (timeToFormMesh === null) { + dutiedInfo.set(subnet, NOT_ABLE_TO_FORM_STABLE_MESH_SEC); + this.metrics?.attnetsService.subscriptionsCommitteeTimeToStableMesh.observe( + {subnet}, + NOT_ABLE_TO_FORM_STABLE_MESH_SEC + ); + } + } + } else if (dutiedSlot <= clockSlot + this.opts.slotsToSubscribeBeforeAggregatorDuty) { + // aggregator duty is not expired, track time to stable mesh if this is the 1st time we see mesh peers>=Dlo (6) + for (const [subnet, timeToFormMesh] of dutiedInfo.entries()) { + if (timeToFormMesh === null) { + const topicStr = stringifyGossipTopic(this.config, { + type: gossipType, + fork: this.config.getForkName(dutiedSlot), + subnet, + }); + const numMeshPeers = this.gossip.mesh.get(topicStr)?.size ?? 0; + if (numMeshPeers >= GOSSIP_D_LOW) { + const timeToStableMeshSec = this.clock.secFromSlot( + dutiedSlot - this.opts.slotsToSubscribeBeforeAggregatorDuty + ); + // set to dutiedInfo so we'll not set to metrics again + dutiedInfo.set(subnet, timeToStableMeshSec); + this.metrics?.attnetsService.subscriptionsCommitteeTimeToStableMesh.observe({subnet}, timeToStableMeshSec); + } + } + } + } + } + /** * Run per epoch, clean-up operations that are not urgent * Subscribe to new random subnets every EPOCHS_PER_RANDOM_SUBNET_SUBSCRIPTION epochs @@ -183,8 +249,6 @@ export class DLLAttnetsService implements IAttnetsService { if (epoch % EPOCHS_PER_RANDOM_SUBNET_SUBSCRIPTION === 0) { this.recomputeLongLivedSubnets(); } - const slot = computeStartSlotAtEpoch(epoch); - this.pruneExpiredAggregator(slot); } catch (e) { this.logger.error("Error on AttnetsService.onEpoch", {epoch}, e as Error); } @@ -245,9 +309,9 @@ export class DLLAttnetsService implements IAttnetsService { * @param currentSlot */ private pruneExpiredAggregator(currentSlot: Slot): void { - for (const slot of this.aggregatorSlotSubnet.keys()) { - if (currentSlot > slot) { - this.aggregatorSlotSubnet.delete(slot); + for (const dutiedSlot of this.aggregatorSlotSubnet.keys()) { + if (currentSlot > dutiedSlot) { + this.aggregatorSlotSubnet.delete(dutiedSlot); } } } @@ -303,6 +367,17 @@ export class DLLAttnetsService implements IAttnetsService { private onScrapeLodestarMetrics(metrics: NetworkCoreMetrics): void { metrics.attnetsService.committeeSubnets.set(this.committeeSubnets.size); metrics.attnetsService.subscriptionsCommittee.set(this.shortLivedSubscriptions.size); + // track short lived subnet status, >= 6 (Dlo) means healthy, otherwise unhealthy + const currentSlot = this.clock.currentSlot; + for (const {subnet} of this.shortLivedSubscriptions.getActiveTtl(currentSlot)) { + const topicStr = stringifyGossipTopic(this.config, { + type: gossipType, + fork: this.config.getForkName(currentSlot), + subnet, + }); + const numMeshPeers = this.gossip.mesh.get(topicStr)?.size ?? 0; + metrics.attnetsService.subscriptionsCommitteeMeshPeers.observe({subnet}, numMeshPeers); + } metrics.attnetsService.longLivedSubscriptions.set(this.longLivedSubscriptions.size); let aggregatorCount = 0; for (const subnets of this.aggregatorSlotSubnet.values()) { diff --git a/packages/beacon-node/src/network/subnets/interface.ts b/packages/beacon-node/src/network/subnets/interface.ts index e21d9ebb8d8c..8f9c859be210 100644 --- a/packages/beacon-node/src/network/subnets/interface.ts +++ b/packages/beacon-node/src/network/subnets/interface.ts @@ -38,9 +38,13 @@ export type SubnetsServiceTestOpts = { shuffleFn?: ShuffleFn; }; +type TopicStr = string; +type PeerIdStr = string; + export type GossipSubscriber = { subscribeTopic(topic: GossipTopic): void; unsubscribeTopic(topic: GossipTopic): void; + mesh: Map>; }; // uint256 in the spec diff --git a/packages/beacon-node/test/unit/network/subnets/dllAttnetsService.test.ts b/packages/beacon-node/test/unit/network/subnets/dllAttnetsService.test.ts index c98563c9d3e9..d769635d1afc 100644 --- a/packages/beacon-node/test/unit/network/subnets/dllAttnetsService.test.ts +++ b/packages/beacon-node/test/unit/network/subnets/dllAttnetsService.test.ts @@ -39,6 +39,7 @@ describe("DLLAttnetsService", () => { beforeEach(function () { sandbox.useFakeTimers(Date.now()); gossipStub = sandbox.createStubInstance(Eth2Gossipsub) as SinonStubbedInstance & Eth2Gossipsub; + Object.defineProperty(gossipStub, "mesh", {value: new Map()}); clock = new Clock({ genesisTime: Math.floor(Date.now() / 1000), config,