Skip to content

Commit

Permalink
Merge branch 'unstable' into mkeil/network-worker-new-space
Browse files Browse the repository at this point in the history
  • Loading branch information
matthewkeil committed Aug 23, 2023
2 parents be181e7 + 945f892 commit 9a66e0b
Show file tree
Hide file tree
Showing 22 changed files with 540 additions and 49 deletions.
20 changes: 18 additions & 2 deletions dashboards/lodestar_debug_gossipsub.json
Original file line number Diff line number Diff line change
Expand Up @@ -2414,23 +2414,39 @@
"type": "prometheus",
"uid": "${DS_PROMETHEUS}"
},
"editorMode": "code",
"exemplar": false,
"expr": "(sum(rate(gossipsub_async_validation_result_total{acceptance=\"ignore\"} [32m])) by (topic))\n/\n(sum(rate(gossipsub_async_validation_result_total[32m])) by (topic))",
"expr": "(sum(rate(gossipsub_ignored_messages_total[32m])) by (topic))\n/\n(\n sum(rate(gossipsub_msg_received_prevalidation_total[32m])) by (topic)\n)",
"interval": "",
"legendFormat": "IGNORE {{topic}}",
"range": true,
"refId": "A"
},
{
"datasource": {
"type": "prometheus",
"uid": "${DS_PROMETHEUS}"
},
"editorMode": "code",
"exemplar": false,
"expr": "(sum(rate(gossipsub_async_validation_result_total{acceptance=\"reject\"} [32m])) by (topic))\n/\n(sum(rate(gossipsub_async_validation_result_total[32m])) by (topic))",
"expr": "(sum(rate(gossipsub_rejected_messages_total[32m])) by (topic))\n/\n(\n sum(rate(gossipsub_msg_received_prevalidation_total[32m])) by (topic)\n)",
"hide": false,
"interval": "",
"legendFormat": "REJECT {{topic}}",
"range": true,
"refId": "B"
},
{
"datasource": {
"type": "prometheus",
"uid": "${DS_PROMETHEUS}"
},
"editorMode": "code",
"expr": "(sum(rate(gossipsub_unknown_validation_results_total[32m])) by (topic))\n/\n(\n sum(rate(gossipsub_msg_received_prevalidation_total[32m])) by (topic)\n)",
"hide": false,
"legendFormat": "UNKNOWN {{topic}}",
"range": true,
"refId": "C"
}
],
"title": "Async validation result - IGNORE / REJECT rates",
Expand Down
2 changes: 1 addition & 1 deletion packages/beacon-node/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@
"@chainsafe/bls": "7.1.1",
"@chainsafe/blst": "^0.2.9",
"@chainsafe/discv5": "^5.1.0",
"@chainsafe/libp2p-gossipsub": "^10.0.0",
"@chainsafe/libp2p-gossipsub": "^10.1.0",
"@chainsafe/libp2p-noise": "^13.0.0",
"@chainsafe/persistent-merkle-tree": "^0.5.0",
"@chainsafe/prometheus-gc-stats": "^1.0.0",
Expand Down
14 changes: 14 additions & 0 deletions packages/beacon-node/src/network/core/metrics.ts
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,20 @@ export function createNetworkCoreMetrics(register: RegistryMetricCreator) {
name: "lodestar_attnets_service_committee_subscriptions_total",
help: "Count of committee subscriptions",
}),
subscriptionsCommitteeMeshPeers: register.histogram<"subnet">({
name: "lodestar_attnets_service_committee_subscriptions_mesh_peers",
help: "Histogram of mesh peers per committee subscription",
labelNames: ["subnet"],
// Dlow = 6, D = 8, DHi = 12 plus 2 more buckets
buckets: [0, 4, 6, 8, 12],
}),
subscriptionsCommitteeTimeToStableMesh: register.histogram<"subnet">({
name: "lodestar_attnets_service_committee_subscriptions_time_to_stable_mesh_seconds",
help: "Histogram of time until committee subscription is considered healthy (>= 6 mesh peers)",
labelNames: ["subnet"],
// we subscribe 2 slots = 24s before aggregator duty
buckets: [0, 6, 12, 18, 24],
}),
subscriptionsRandom: register.gauge({
name: "lodestar_attnets_service_random_subscriptions_total",
help: "Count of random subscriptions",
Expand Down
2 changes: 2 additions & 0 deletions packages/beacon-node/src/network/options.ts
Original file line number Diff line number Diff line change
Expand Up @@ -40,4 +40,6 @@ export const defaultNetworkOptions: NetworkOptions = {
deterministicLongLivedAttnets: true,
// subscribe 2 slots before aggregator dutied slot to get stable mesh peers as monitored on goerli
slotsToSubscribeBeforeAggregatorDuty: 2,
// this should only be set to true if useWorker is true
beaconAttestationBatchValidation: true,
};
105 changes: 90 additions & 15 deletions packages/beacon-node/src/network/subnets/dllAttnetsService.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
import {computeStartSlotAtEpoch} from "@lodestar/state-transition";
import {ChainForkConfig} from "@lodestar/config";
import {
ATTESTATION_SUBNET_COUNT,
EPOCHS_PER_RANDOM_SUBNET_SUBSCRIPTION,
Expand All @@ -8,12 +6,15 @@ import {
} from "@lodestar/params";
import {Epoch, Slot, ssz} from "@lodestar/types";
import {Logger, MapDef} from "@lodestar/utils";
import {BeaconConfig} from "@lodestar/config";
import {ClockEvent, IClock} from "../../util/clock.js";
import {GossipType} from "../gossip/index.js";
import {MetadataController} from "../metadata.js";
import {SubnetMap, RequestedSubnet} from "../peers/utils/index.js";
import {getActiveForks} from "../forks.js";
import {NetworkCoreMetrics} from "../core/metrics.js";
import {stringifyGossipTopic} from "../gossip/topic.js";
import {GOSSIP_D_LOW} from "../gossip/scoringParameters.js";
import {IAttnetsService, CommitteeSubscription, SubnetsServiceOpts, GossipSubscriber, NodeId} from "./interface.js";
import {computeSubscribedSubnet} from "./util.js";

Expand All @@ -24,6 +25,15 @@ enum SubnetSource {
longLived = "long_lived",
}

type Subnet = number;
// map of subnet to time to form stable mesh as seconds, null if not yet formed
type AggregatorDutyInfo = Map<Subnet, number | null>;

/**
* This value means node is not able to form stable mesh.
*/
const NOT_ABLE_TO_FORM_STABLE_MESH_SEC = -1;

/**
* Manage deleterministic long lived (DLL) subnets and short lived subnets.
* - PeerManager uses attnetsService to know which peers are required for duties and long lived subscriptions
Expand All @@ -42,13 +52,13 @@ export class DLLAttnetsService implements IAttnetsService {
/** ${SUBNETS_PER_NODE} long lived subscriptions, may overlap with `shortLivedSubscriptions` */
private longLivedSubscriptions = new Set<number>();
/**
* Map of an aggregator at a slot and subnet
* Map of an aggregator at a slot and AggregatorDutyInfo
* Used to determine if we should process an attestation.
*/
private aggregatorSlotSubnet = new MapDef<Slot, Set<number>>(() => new Set());
private aggregatorSlotSubnet = new MapDef<Slot, AggregatorDutyInfo>(() => new Map());

constructor(
private readonly config: ChainForkConfig,
private readonly config: BeaconConfig,
private readonly clock: IClock,
private readonly gossip: GossipSubscriber,
private readonly metadata: MetadataController,
Expand Down Expand Up @@ -108,7 +118,7 @@ export class DLLAttnetsService implements IAttnetsService {
this.committeeSubnets.request({subnet, toSlot: slot + 1});
if (isAggregator) {
// need exact slot here
this.aggregatorSlotSubnet.getOrDefault(slot).add(subnet);
this.aggregatorSlotSubnet.getOrDefault(slot).set(subnet, null);
}
}
}
Expand Down Expand Up @@ -154,26 +164,82 @@ export class DLLAttnetsService implements IAttnetsService {
* Run per slot.
* - Subscribe to gossip subnets 2 slots in advance
* - Unsubscribe from expired subnets
* - Track time to stable mesh if not yet formed
*/
private onSlot = (clockSlot: Slot): void => {
try {
for (const [dutiedSlot, subnets] of this.aggregatorSlotSubnet.entries()) {
setTimeout(
() => {
this.onHalfSlot(clockSlot);
},
this.config.SECONDS_PER_SLOT * 0.5 * 1000
);

for (const [dutiedSlot, dutiedInfo] of this.aggregatorSlotSubnet.entries()) {
if (dutiedSlot === clockSlot + this.opts.slotsToSubscribeBeforeAggregatorDuty) {
// Trigger gossip subscription first, in batch
if (subnets.size > 0) {
this.subscribeToSubnets(Array.from(subnets), SubnetSource.committee);
if (dutiedInfo.size > 0) {
this.subscribeToSubnets(Array.from(dutiedInfo.keys()), SubnetSource.committee);
}
// Then, register the subscriptions
Array.from(subnets).map((subnet) => this.shortLivedSubscriptions.request({subnet, toSlot: dutiedSlot}));
for (const subnet of dutiedInfo.keys()) {
this.shortLivedSubscriptions.request({subnet, toSlot: dutiedSlot});
}
}
this.trackTimeToStableMesh(clockSlot, dutiedSlot, dutiedInfo);
}

this.unsubscribeExpiredCommitteeSubnets(clockSlot);
this.pruneExpiredAggregator(clockSlot);
} catch (e) {
this.logger.error("Error on AttnetsService.onSlot", {slot: clockSlot}, e as Error);
}
};

private onHalfSlot = (clockSlot: Slot): void => {
for (const [dutiedSlot, dutiedInfo] of this.aggregatorSlotSubnet.entries()) {
this.trackTimeToStableMesh(clockSlot, dutiedSlot, dutiedInfo);
}
};

/**
* Track time to form stable mesh if not yet formed
*/
private trackTimeToStableMesh(clockSlot: Slot, dutiedSlot: Slot, dutiedInfo: AggregatorDutyInfo): void {
if (dutiedSlot < clockSlot) {
// aggregator duty is expired, set timeToStableMesh to some big value so we know this value is not good
for (const [subnet, timeToFormMesh] of dutiedInfo.entries()) {
if (timeToFormMesh === null) {
dutiedInfo.set(subnet, NOT_ABLE_TO_FORM_STABLE_MESH_SEC);
this.metrics?.attnetsService.subscriptionsCommitteeTimeToStableMesh.observe(
{subnet},
NOT_ABLE_TO_FORM_STABLE_MESH_SEC
);
}
}
} else if (dutiedSlot <= clockSlot + this.opts.slotsToSubscribeBeforeAggregatorDuty) {
// aggregator duty is not expired, track time to stable mesh if this is the 1st time we see mesh peers>=Dlo (6)
for (const [subnet, timeToFormMesh] of dutiedInfo.entries()) {
if (timeToFormMesh === null) {
const topicStr = stringifyGossipTopic(this.config, {
type: gossipType,
fork: this.config.getForkName(dutiedSlot),
subnet,
});
const numMeshPeers = this.gossip.mesh.get(topicStr)?.size ?? 0;
if (numMeshPeers >= GOSSIP_D_LOW) {
const timeToStableMeshSec = this.clock.secFromSlot(
dutiedSlot - this.opts.slotsToSubscribeBeforeAggregatorDuty
);
// set to dutiedInfo so we'll not set to metrics again
dutiedInfo.set(subnet, timeToStableMeshSec);
this.metrics?.attnetsService.subscriptionsCommitteeTimeToStableMesh.observe({subnet}, timeToStableMeshSec);
}
}
}
}
}

/**
* Run per epoch, clean-up operations that are not urgent
* Subscribe to new random subnets every EPOCHS_PER_RANDOM_SUBNET_SUBSCRIPTION epochs
Expand All @@ -183,8 +249,6 @@ export class DLLAttnetsService implements IAttnetsService {
if (epoch % EPOCHS_PER_RANDOM_SUBNET_SUBSCRIPTION === 0) {
this.recomputeLongLivedSubnets();
}
const slot = computeStartSlotAtEpoch(epoch);
this.pruneExpiredAggregator(slot);
} catch (e) {
this.logger.error("Error on AttnetsService.onEpoch", {epoch}, e as Error);
}
Expand Down Expand Up @@ -245,9 +309,9 @@ export class DLLAttnetsService implements IAttnetsService {
* @param currentSlot
*/
private pruneExpiredAggregator(currentSlot: Slot): void {
for (const slot of this.aggregatorSlotSubnet.keys()) {
if (currentSlot > slot) {
this.aggregatorSlotSubnet.delete(slot);
for (const dutiedSlot of this.aggregatorSlotSubnet.keys()) {
if (currentSlot > dutiedSlot) {
this.aggregatorSlotSubnet.delete(dutiedSlot);
}
}
}
Expand Down Expand Up @@ -303,6 +367,17 @@ export class DLLAttnetsService implements IAttnetsService {
private onScrapeLodestarMetrics(metrics: NetworkCoreMetrics): void {
metrics.attnetsService.committeeSubnets.set(this.committeeSubnets.size);
metrics.attnetsService.subscriptionsCommittee.set(this.shortLivedSubscriptions.size);
// track short lived subnet status, >= 6 (Dlo) means healthy, otherwise unhealthy
const currentSlot = this.clock.currentSlot;
for (const {subnet} of this.shortLivedSubscriptions.getActiveTtl(currentSlot)) {
const topicStr = stringifyGossipTopic(this.config, {
type: gossipType,
fork: this.config.getForkName(currentSlot),
subnet,
});
const numMeshPeers = this.gossip.mesh.get(topicStr)?.size ?? 0;
metrics.attnetsService.subscriptionsCommitteeMeshPeers.observe({subnet}, numMeshPeers);
}
metrics.attnetsService.longLivedSubscriptions.set(this.longLivedSubscriptions.size);
let aggregatorCount = 0;
for (const subnets of this.aggregatorSlotSubnet.values()) {
Expand Down
4 changes: 4 additions & 0 deletions packages/beacon-node/src/network/subnets/interface.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,13 @@ export type SubnetsServiceTestOpts = {
shuffleFn?: ShuffleFn;
};

type TopicStr = string;
type PeerIdStr = string;

export type GossipSubscriber = {
subscribeTopic(topic: GossipTopic): void;
unsubscribeTopic(topic: GossipTopic): void;
mesh: Map<TopicStr, Set<PeerIdStr>>;
};

// uint256 in the spec
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ describe("DLLAttnetsService", () => {
beforeEach(function () {
sandbox.useFakeTimers(Date.now());
gossipStub = sandbox.createStubInstance(Eth2Gossipsub) as SinonStubbedInstance<Eth2Gossipsub> & Eth2Gossipsub;
Object.defineProperty(gossipStub, "mesh", {value: new Map()});
clock = new Clock({
genesisTime: Math.floor(Date.now() / 1000),
config,
Expand Down
10 changes: 8 additions & 2 deletions packages/cli/src/cmds/beacon/handler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import {
pruneOldFilesInDir,
} from "../../util/index.js";
import {getVersionData} from "../../util/version.js";
import {LogArgs} from "../../options/logOptions.js";
import {BeaconArgs} from "./options.js";
import {getBeaconPaths} from "./paths.js";
import {initBeaconState} from "./initBeaconState.js";
Expand Down Expand Up @@ -205,8 +206,13 @@ export async function beaconHandlerInit(args: BeaconArgs & GlobalArgs) {
return {config, options, beaconPaths, network, version, commit, peerId, logger};
}

export function initLogger(args: BeaconArgs, dataDir: string, config: ChainForkConfig): LoggerNode {
const defaultLogFilepath = path.join(dataDir, "beacon.log");
export function initLogger(
args: LogArgs & Pick<GlobalArgs, "dataDir">,
dataDir: string,
config: ChainForkConfig,
fileName = "beacon.log"
): LoggerNode {
const defaultLogFilepath = path.join(dataDir, fileName);
const logger = getNodeLogger(parseLoggerArgs(args, {defaultLogFilepath}, config));
try {
cleanOldLogFiles(args, {defaultLogFilepath});
Expand Down
Loading

0 comments on commit 9a66e0b

Please sign in to comment.