Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: track time to stable mesh for aggregator duties #5897

Merged
merged 2 commits into from
Aug 23, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions packages/beacon-node/src/network/core/metrics.ts
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,20 @@ export function createNetworkCoreMetrics(register: RegistryMetricCreator) {
name: "lodestar_attnets_service_committee_subscriptions_total",
help: "Count of committee subscriptions",
}),
subscriptionsCommitteeMeshPeers: register.histogram<"subnet">({
name: "lodestar_attnets_service_committee_subscriptions_mesh_peers",
help: "Histogram of mesh peers per committee subscription",
labelNames: ["subnet"],
// Dlow = 6, D = 8, DHi = 12 plus 2 more buckets
buckets: [0, 4, 6, 8, 12],
}),
subscriptionsCommitteeTimeToStableMesh: register.histogram<"subnet">({
name: "lodestar_attnets_service_committee_subscriptions_time_to_stable_mesh_seconds",
help: "Histogram of time until committee subscription is considered healthy (>= 6 mesh peers)",
labelNames: ["subnet"],
// we subscribe 2 slots = 24s before aggregator duty
buckets: [0, 6, 12, 18, 24],
}),
subscriptionsRandom: register.gauge({
name: "lodestar_attnets_service_random_subscriptions_total",
help: "Count of random subscriptions",
Expand Down
105 changes: 90 additions & 15 deletions packages/beacon-node/src/network/subnets/dllAttnetsService.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
import {computeStartSlotAtEpoch} from "@lodestar/state-transition";
import {ChainForkConfig} from "@lodestar/config";
import {
ATTESTATION_SUBNET_COUNT,
EPOCHS_PER_RANDOM_SUBNET_SUBSCRIPTION,
Expand All @@ -8,12 +6,15 @@ import {
} from "@lodestar/params";
import {Epoch, Slot, ssz} from "@lodestar/types";
import {Logger, MapDef} from "@lodestar/utils";
import {BeaconConfig} from "@lodestar/config";
import {ClockEvent, IClock} from "../../util/clock.js";
import {GossipType} from "../gossip/index.js";
import {MetadataController} from "../metadata.js";
import {SubnetMap, RequestedSubnet} from "../peers/utils/index.js";
import {getActiveForks} from "../forks.js";
import {NetworkCoreMetrics} from "../core/metrics.js";
import {stringifyGossipTopic} from "../gossip/topic.js";
import {GOSSIP_D_LOW} from "../gossip/scoringParameters.js";
import {IAttnetsService, CommitteeSubscription, SubnetsServiceOpts, GossipSubscriber, NodeId} from "./interface.js";
import {computeSubscribedSubnet} from "./util.js";

Expand All @@ -24,6 +25,15 @@ enum SubnetSource {
longLived = "long_lived",
}

type Subnet = number;
// map of subnet to time to form stable mesh as seconds, null if not yet formed
type AggregatorDutyInfo = Map<Subnet, number | null>;

/**
* This value means node is not able to form stable mesh.
*/
const NOT_ABLE_TO_FORM_STABLE_MESH_SEC = -1;

/**
* Manage deleterministic long lived (DLL) subnets and short lived subnets.
* - PeerManager uses attnetsService to know which peers are required for duties and long lived subscriptions
Expand All @@ -42,13 +52,13 @@ export class DLLAttnetsService implements IAttnetsService {
/** ${SUBNETS_PER_NODE} long lived subscriptions, may overlap with `shortLivedSubscriptions` */
private longLivedSubscriptions = new Set<number>();
/**
* Map of an aggregator at a slot and subnet
* Map of an aggregator at a slot and AggregatorDutyInfo
* Used to determine if we should process an attestation.
*/
private aggregatorSlotSubnet = new MapDef<Slot, Set<number>>(() => new Set());
private aggregatorSlotSubnet = new MapDef<Slot, AggregatorDutyInfo>(() => new Map());

constructor(
private readonly config: ChainForkConfig,
private readonly config: BeaconConfig,
private readonly clock: IClock,
private readonly gossip: GossipSubscriber,
private readonly metadata: MetadataController,
Expand Down Expand Up @@ -108,7 +118,7 @@ export class DLLAttnetsService implements IAttnetsService {
this.committeeSubnets.request({subnet, toSlot: slot + 1});
if (isAggregator) {
// need exact slot here
this.aggregatorSlotSubnet.getOrDefault(slot).add(subnet);
this.aggregatorSlotSubnet.getOrDefault(slot).set(subnet, null);
}
}
}
Expand Down Expand Up @@ -154,26 +164,82 @@ export class DLLAttnetsService implements IAttnetsService {
* Run per slot.
* - Subscribe to gossip subnets 2 slots in advance
* - Unsubscribe from expired subnets
* - Track time to stable mesh if not yet formed
*/
private onSlot = (clockSlot: Slot): void => {
try {
for (const [dutiedSlot, subnets] of this.aggregatorSlotSubnet.entries()) {
setTimeout(
() => {
this.onHalfSlot(clockSlot);
},
this.config.SECONDS_PER_SLOT * 0.5 * 1000
);

for (const [dutiedSlot, dutiedInfo] of this.aggregatorSlotSubnet.entries()) {
if (dutiedSlot === clockSlot + this.opts.slotsToSubscribeBeforeAggregatorDuty) {
// Trigger gossip subscription first, in batch
if (subnets.size > 0) {
this.subscribeToSubnets(Array.from(subnets), SubnetSource.committee);
if (dutiedInfo.size > 0) {
this.subscribeToSubnets(Array.from(dutiedInfo.keys()), SubnetSource.committee);
}
// Then, register the subscriptions
Array.from(subnets).map((subnet) => this.shortLivedSubscriptions.request({subnet, toSlot: dutiedSlot}));
for (const subnet of dutiedInfo.keys()) {
this.shortLivedSubscriptions.request({subnet, toSlot: dutiedSlot});
}
}
this.trackTimeToStableMesh(clockSlot, dutiedSlot, dutiedInfo);
}

this.unsubscribeExpiredCommitteeSubnets(clockSlot);
this.pruneExpiredAggregator(clockSlot);
} catch (e) {
this.logger.error("Error on AttnetsService.onSlot", {slot: clockSlot}, e as Error);
}
};

private onHalfSlot = (clockSlot: Slot): void => {
for (const [dutiedSlot, dutiedInfo] of this.aggregatorSlotSubnet.entries()) {
this.trackTimeToStableMesh(clockSlot, dutiedSlot, dutiedInfo);
}
};

/**
* Track time to form stable mesh if not yet formed
*/
private trackTimeToStableMesh(clockSlot: Slot, dutiedSlot: Slot, dutiedInfo: AggregatorDutyInfo): void {
if (dutiedSlot < clockSlot) {
// aggregator duty is expired, set timeToStableMesh to some big value so we know this value is not good
for (const [subnet, timeToFormMesh] of dutiedInfo.entries()) {
if (timeToFormMesh === null) {
dutiedInfo.set(subnet, NOT_ABLE_TO_FORM_STABLE_MESH_SEC);
this.metrics?.attnetsService.subscriptionsCommitteeTimeToStableMesh.observe(
{subnet},
NOT_ABLE_TO_FORM_STABLE_MESH_SEC
);
}
}
} else if (dutiedSlot <= clockSlot + this.opts.slotsToSubscribeBeforeAggregatorDuty) {
// aggregator duty is not expired, track time to stable mesh if this is the 1st time we see mesh peers>=Dlo (6)
for (const [subnet, timeToFormMesh] of dutiedInfo.entries()) {
if (timeToFormMesh === null) {
const topicStr = stringifyGossipTopic(this.config, {
type: gossipType,
fork: this.config.getForkName(dutiedSlot),
subnet,
});
const numMeshPeers = this.gossip.mesh.get(topicStr)?.size ?? 0;
if (numMeshPeers >= GOSSIP_D_LOW) {
const timeToStableMeshSec = this.clock.secFromSlot(
dutiedSlot - this.opts.slotsToSubscribeBeforeAggregatorDuty
);
// set to dutiedInfo so we'll not set to metrics again
dutiedInfo.set(subnet, timeToStableMeshSec);
this.metrics?.attnetsService.subscriptionsCommitteeTimeToStableMesh.observe({subnet}, timeToStableMeshSec);
}
}
}
}
}

/**
* Run per epoch, clean-up operations that are not urgent
* Subscribe to new random subnets every EPOCHS_PER_RANDOM_SUBNET_SUBSCRIPTION epochs
Expand All @@ -183,8 +249,6 @@ export class DLLAttnetsService implements IAttnetsService {
if (epoch % EPOCHS_PER_RANDOM_SUBNET_SUBSCRIPTION === 0) {
this.recomputeLongLivedSubnets();
}
const slot = computeStartSlotAtEpoch(epoch);
this.pruneExpiredAggregator(slot);
} catch (e) {
this.logger.error("Error on AttnetsService.onEpoch", {epoch}, e as Error);
}
Expand Down Expand Up @@ -245,9 +309,9 @@ export class DLLAttnetsService implements IAttnetsService {
* @param currentSlot
*/
private pruneExpiredAggregator(currentSlot: Slot): void {
for (const slot of this.aggregatorSlotSubnet.keys()) {
if (currentSlot > slot) {
this.aggregatorSlotSubnet.delete(slot);
for (const dutiedSlot of this.aggregatorSlotSubnet.keys()) {
if (currentSlot > dutiedSlot) {
this.aggregatorSlotSubnet.delete(dutiedSlot);
}
}
}
Expand Down Expand Up @@ -303,6 +367,17 @@ export class DLLAttnetsService implements IAttnetsService {
private onScrapeLodestarMetrics(metrics: NetworkCoreMetrics): void {
metrics.attnetsService.committeeSubnets.set(this.committeeSubnets.size);
metrics.attnetsService.subscriptionsCommittee.set(this.shortLivedSubscriptions.size);
// track short lived subnet status, >= 6 (Dlo) means healthy, otherwise unhealthy
const currentSlot = this.clock.currentSlot;
for (const {subnet} of this.shortLivedSubscriptions.getActiveTtl(currentSlot)) {
const topicStr = stringifyGossipTopic(this.config, {
type: gossipType,
fork: this.config.getForkName(currentSlot),
subnet,
});
const numMeshPeers = this.gossip.mesh.get(topicStr)?.size ?? 0;
metrics.attnetsService.subscriptionsCommitteeMeshPeers.observe({subnet}, numMeshPeers);
}
metrics.attnetsService.longLivedSubscriptions.set(this.longLivedSubscriptions.size);
let aggregatorCount = 0;
for (const subnets of this.aggregatorSlotSubnet.values()) {
Expand Down
4 changes: 4 additions & 0 deletions packages/beacon-node/src/network/subnets/interface.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,13 @@ export type SubnetsServiceTestOpts = {
shuffleFn?: ShuffleFn;
};

type TopicStr = string;
type PeerIdStr = string;

export type GossipSubscriber = {
subscribeTopic(topic: GossipTopic): void;
unsubscribeTopic(topic: GossipTopic): void;
mesh: Map<TopicStr, Set<PeerIdStr>>;
};

// uint256 in the spec
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ describe("DLLAttnetsService", () => {
beforeEach(function () {
sandbox.useFakeTimers(Date.now());
gossipStub = sandbox.createStubInstance(Eth2Gossipsub) as SinonStubbedInstance<Eth2Gossipsub> & Eth2Gossipsub;
Object.defineProperty(gossipStub, "mesh", {value: new Map()});
clock = new Clock({
genesisTime: Math.floor(Date.now() / 1000),
config,
Expand Down
Loading