Skip to content

Commit

Permalink
feat(beacon-node): network worker event latency metrics (#5800)
Browse files Browse the repository at this point in the history
* feat(beacon-node): pass metrics to workerEvents

* feat(beacon-node): add types for network worker event metrics

* feat(beacon-node): add metrics for network worker events

* feat(beacon-node): add metric data to network worker events

* fix(beacon-node): move async iterator timestamps to correct location

* fix(beacon-node): Omit unnecessary emittedAt from parameter type

* feat(dashboards): add network worker thread metrics to dashboard

* fix(dashboard): change metric name back to lodestar_ prefix

* fix: run check-types and update missed test types

* Revert "feat(beacon-node): add types for network worker event metrics"

This reverts commit e8dc6d2.

* Revert "fix(beacon-node): move async iterator timestamps to correct location"

This reverts commit 8b3f6c6.

* Revert "fix(beacon-node): Omit unnecessary emittedAt from parameter type"

This reverts commit b7fde56.

* Revert "feat(beacon-node): add metric data to network worker events"

This reverts commit 631f570.

* feat(beacon-node): capture worker message in hrTime

* fix(dashboards): remove re-emit panel from network worker row

* fix(metrics): remove unused re-emit metrics

* Revert "fix: run check-types and update missed test types"

This reverts commit 525d157.

* fix(beacon-node): update metric name at call site

* fix(metrics): update capture to ISU units

* refactor(beacon-node): move initialization out of conditional

* fix: remove unused import

* feat(metrics): add unit to metric name

* feat(metrics): add unit to metric name

* feat: add trace log statement to network worker

* fix: change trace to debug log

* feat(metrics): add eventName to network worker message metrics

* bug(logger): check if trace is broken

* feat(metrics): add worker eventDirection label

* fix(metrics): use string instead of enum for eventDirection

* fix(metrics): remove eventDirection label

* feat(dashboards): add average panel for network worker message

* fix(metrics): update naming per Nico's suggestions

* refactor: remove unused logger from workerEvents.ts

* fix(metrics): add network worker unit name back

* fix(beacon-node): use bigint for hrtime in worker message metric

* fix(dashboards): remove dashboard changes. moved to PR#5827

* fix(dashboards): remove dashboard changes. moved to PR#5827

* fix(dashboards): remove dashboard changes. moved to PR#5827

* fix: constant case for nano conversion

* Revert "fix(beacon-node): use bigint for hrtime in worker message metric"

This reverts commit 59f12ff.

* refactor: remove Sec suffix in metric variable name

* fix(dashboard): make metric name match the updates in PR
  • Loading branch information
matthewkeil authored Oct 12, 2023
1 parent 93709ff commit dd57c96
Show file tree
Hide file tree
Showing 6 changed files with 54 additions and 15 deletions.
20 changes: 10 additions & 10 deletions dashboards/lodestar_networking.json
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
{
"__inputs": [
{
"description": "",
"label": "Prometheus",
"name": "DS_PROMETHEUS",
"type": "datasource",
"label": "Prometheus",
"description": "",
"pluginId": "prometheus",
"pluginName": "Prometheus",
"type": "datasource"
"pluginName": "Prometheus"
}
],
"annotations": {
Expand Down Expand Up @@ -937,7 +937,7 @@
},
"editorMode": "code",
"exemplar": false,
"expr": "(\n sum(rate(\n lodestar_network_worker_wire_events_on_worker_thread_latency_sum[$rate_interval]\n )) \n +\n sum(rate(\n lodestar_network_worker_wire_events_on_main_thread_latency_sum[$rate_interval]\n ))\n)\n/\n(\n sum(rate(\n lodestar_network_worker_wire_events_on_worker_thread_latency_count[$rate_interval]\n ))\n +\n sum(rate(\n lodestar_network_worker_wire_events_on_main_thread_latency_count[$rate_interval]\n ))\n)",
"expr": "(\n sum(rate(\n lodestar_network_worker_wire_events_on_worker_thread_latency_seconds_sum[$rate_interval]\n )) \n +\n sum(rate(\n lodestar_network_worker_wire_events_on_main_thread_latency_seconds_sum[$rate_interval]\n ))\n)\n/\n(\n sum(rate(\n lodestar_network_worker_wire_events_on_worker_thread_latency_seconds_count[$rate_interval]\n ))\n +\n sum(rate(\n lodestar_network_worker_wire_events_on_main_thread_latency_seconds_count[$rate_interval]\n ))\n)",
"hide": false,
"interval": "",
"legendFormat": "Average",
Expand All @@ -951,7 +951,7 @@
},
"editorMode": "code",
"exemplar": false,
"expr": "avg(rate(lodestar_network_worker_wire_events_on_worker_thread_latency_sum[$rate_interval])/rate(lodestar_network_worker_wire_events_on_worker_thread_latency_count[$rate_interval]))",
"expr": "avg(rate(lodestar_network_worker_wire_events_on_worker_thread_latency_seconds_sum[$rate_interval])/rate(lodestar_network_worker_wire_events_on_worker_thread_latency_seconds_count[$rate_interval]))",
"hide": false,
"interval": "",
"legendFormat": "Worker to Main",
Expand All @@ -964,7 +964,7 @@
"uid": "${DS_PROMETHEUS}"
},
"editorMode": "code",
"expr": "avg(rate(lodestar_network_worker_wire_events_on_main_thread_latency_sum[$rate_interval])/rate(lodestar_network_worker_wire_events_on_main_thread_latency_count[$rate_interval]))",
"expr": "avg(rate(lodestar_network_worker_wire_events_on_main_thread_latency_seconds_sum[$rate_interval])/rate(lodestar_network_worker_wire_events_on_main_thread_latency_seconds_count[$rate_interval]))",
"hide": false,
"legendFormat": "Main to Worker",
"range": true,
Expand Down Expand Up @@ -1051,7 +1051,7 @@
},
"editorMode": "code",
"exemplar": false,
"expr": "avg(rate(lodestar_network_worker_wire_events_on_main_thread_latency_sum[$rate_interval])/rate(lodestar_network_worker_wire_events_on_main_thread_latency_count[$rate_interval]))",
"expr": "avg(rate(lodestar_network_worker_wire_events_on_main_thread_latency_seconds_sum[$rate_interval])/rate(lodestar_network_worker_wire_events_on_main_thread_latency_seconds_count[$rate_interval]))",
"hide": true,
"interval": "",
"legendFormat": "Average to Main",
Expand All @@ -1065,7 +1065,7 @@
},
"editorMode": "code",
"exemplar": false,
"expr": "rate(lodestar_network_worker_wire_events_on_main_thread_latency_sum[$rate_interval])/rate(lodestar_network_worker_wire_events_on_main_thread_latency_count[$rate_interval])",
"expr": "rate(lodestar_network_worker_wire_events_on_main_thread_latency_seconds_sum[$rate_interval])/rate(lodestar_network_worker_wire_events_on_main_thread_latency_seconds_count[$rate_interval])",
"interval": "",
"legendFormat": "{{eventName}}",
"range": true,
Expand All @@ -1077,7 +1077,7 @@
"uid": "${DS_PROMETHEUS}"
},
"editorMode": "code",
"expr": "rate(lodestar_network_worker_wire_events_on_worker_thread_latency_sum[$rate_interval])/rate(lodestar_network_worker_wire_events_on_worker_thread_latency_count[$rate_interval])",
"expr": "rate(lodestar_network_worker_wire_events_on_worker_thread_latency_seconds_sum[$rate_interval])/rate(lodestar_network_worker_wire_events_on_worker_thread_latency_seconds_count[$rate_interval])",
"hide": false,
"legendFormat": "{{eventName}}",
"range": true,
Expand Down
6 changes: 6 additions & 0 deletions packages/beacon-node/src/metrics/metrics/lodestar.ts
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,12 @@ export function createLodestarMetrics(
help: "Current count of pending items in reqRespBridgeReqCaller data structure",
}),
},
networkWorkerWireEventsOnMainThreadLatency: register.histogram<"eventName">({
name: "lodestar_network_worker_wire_events_on_main_thread_latency_seconds",
help: "Latency in seconds to transmit network events to main thread across worker port",
labelNames: ["eventName"],
buckets: [0.001, 0.003, 0.01, 0.03, 0.1],
}),

regenQueue: {
length: register.gauge({
Expand Down
8 changes: 8 additions & 0 deletions packages/beacon-node/src/network/core/metrics.ts
Original file line number Diff line number Diff line change
Expand Up @@ -333,12 +333,20 @@ export function createNetworkCoreMetrics(register: RegistryMetricCreator) {
};
}

export type NetworkCoreWorkerMetrics = ReturnType<typeof getNetworkCoreWorkerMetrics>;

// eslint-disable-next-line @typescript-eslint/explicit-function-return-type
export function getNetworkCoreWorkerMetrics(register: RegistryMetricCreator) {
return {
reqRespBridgeRespCallerPending: register.gauge({
name: "lodestar_network_worker_reqresp_bridge_caller_pending_count",
help: "Current count of pending elements in respBridgeCaller",
}),
networkWorkerWireEventsOnWorkerThreadLatency: register.histogram<"eventName">({
name: "lodestar_network_worker_wire_events_on_worker_thread_latency_seconds",
help: "Latency in seconds to transmit network events to worker thread across parent port",
labelNames: ["eventName"],
buckets: [0.001, 0.003, 0.01, 0.03, 0.1],
}),
};
}
12 changes: 7 additions & 5 deletions packages/beacon-node/src/network/core/networkCoreWorker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@ import {peerIdToString} from "../../util/peerId.js";
import {profileNodeJS} from "../../util/profile.js";
import {NetworkEventBus, NetworkEventData, networkEventDirection} from "../events.js";
import {wireEventsOnWorkerThread} from "../../util/workerEvents.js";
import {getNetworkCoreWorkerMetrics} from "./metrics.js";
import {NetworkWorkerApi, NetworkWorkerData} from "./types.js";
import {NetworkCore} from "./networkCore.js";
import {
NetworkWorkerThreadEventType,
ReqRespBridgeEventBus,
Expand All @@ -21,9 +24,6 @@ import {
getReqRespBridgeRespEvents,
reqRespBridgeEventDirection,
} from "./events.js";
import {getNetworkCoreWorkerMetrics} from "./metrics.js";
import {NetworkCore} from "./networkCore.js";
import {NetworkWorkerApi, NetworkWorkerData} from "./types.js";

// Cloned data from instantiation
const workerData = worker.workerData as NetworkWorkerData;
Expand Down Expand Up @@ -83,9 +83,9 @@ new AsyncIterableBridgeHandler(getReqRespBridgeReqEvents(reqRespBridgeEventBus),
);
const reqRespBridgeRespCaller = new AsyncIterableBridgeCaller(getReqRespBridgeRespEvents(reqRespBridgeEventBus));

const networkCoreWorkerMetrics = metricsRegister ? getNetworkCoreWorkerMetrics(metricsRegister) : null;
// respBridgeCaller metrics
if (metricsRegister) {
const networkCoreWorkerMetrics = getNetworkCoreWorkerMetrics(metricsRegister);
if (networkCoreWorkerMetrics) {
networkCoreWorkerMetrics.reqRespBridgeRespCallerPending.addCollect(() => {
networkCoreWorkerMetrics.reqRespBridgeRespCallerPending.set(reqRespBridgeRespCaller.pendingCount);
});
Expand All @@ -110,12 +110,14 @@ wireEventsOnWorkerThread<NetworkEventData>(
NetworkWorkerThreadEventType.networkEvent,
events,
parentPort,
networkCoreWorkerMetrics,
networkEventDirection
);
wireEventsOnWorkerThread<ReqRespBridgeEventData>(
NetworkWorkerThreadEventType.reqRespBridgeEvents,
reqRespBridgeEventBus,
parentPort,
networkCoreWorkerMetrics,
reqRespBridgeEventDirection
);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,12 +75,14 @@ export class WorkerNetworkCore implements INetworkCore {
NetworkWorkerThreadEventType.networkEvent,
modules.events,
modules.worker as unknown as worker_threads.Worker,
modules.metrics,
networkEventDirection
);
wireEventsOnMainThread<ReqRespBridgeEventData>(
NetworkWorkerThreadEventType.reqRespBridgeEvents,
this.reqRespBridgeEventBus,
modules.worker as unknown as worker_threads.Worker,
modules.metrics,
reqRespBridgeEventDirection
);

Expand Down
21 changes: 21 additions & 0 deletions packages/beacon-node/src/util/workerEvents.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,16 @@ import {MessagePort, Worker} from "node:worker_threads";
import {Thread} from "@chainsafe/threads";
import {Logger} from "@lodestar/logger";
import {sleep} from "@lodestar/utils";
import {Metrics} from "../metrics/metrics.js";
import {NetworkCoreWorkerMetrics} from "../network/core/metrics.js";
import {StrictEventEmitterSingleArg} from "./strictEvents.js";

const NANO_TO_SECOND_CONVERSION = 1e9;

export type WorkerBridgeEvent<EventData> = {
type: string;
event: keyof EventData;
posted: [number, number];
data: EventData[keyof EventData];
};

Expand All @@ -27,6 +32,7 @@ export function wireEventsOnWorkerThread<EventData>(
mainEventName: string,
events: StrictEventEmitterSingleArg<EventData>,
parentPort: MessagePort,
metrics: NetworkCoreWorkerMetrics | null,
isWorkerToMain: {[K in keyof EventData]: EventDirection}
): void {
// Subscribe to events from main thread
Expand All @@ -37,6 +43,12 @@ export function wireEventsOnWorkerThread<EventData>(
// This check is not necessary but added for safety in case of improper implemented events
isWorkerToMain[data.event] === EventDirection.mainToWorker
) {
const [sec, nanoSec] = process.hrtime(data.posted);
const networkWorkerLatency = sec + nanoSec / NANO_TO_SECOND_CONVERSION;
metrics?.networkWorkerWireEventsOnWorkerThreadLatency.observe(
{eventName: data.event as string},
networkWorkerLatency
);
events.emit(data.event, data.data);
}
});
Expand All @@ -48,6 +60,7 @@ export function wireEventsOnWorkerThread<EventData>(
const workerEvent: WorkerBridgeEvent<EventData> = {
type: mainEventName,
event: eventName,
posted: process.hrtime(),
data,
};
parentPort.postMessage(workerEvent);
Expand All @@ -60,6 +73,7 @@ export function wireEventsOnMainThread<EventData>(
mainEventName: string,
events: StrictEventEmitterSingleArg<EventData>,
worker: Pick<Worker, "on" | "postMessage">,
metrics: Metrics | null,
isWorkerToMain: {[K in keyof EventData]: EventDirection}
): void {
// Subscribe to events from main thread
Expand All @@ -70,6 +84,12 @@ export function wireEventsOnMainThread<EventData>(
// This check is not necessary but added for safety in case of improper implemented events
isWorkerToMain[data.event] === EventDirection.workerToMain
) {
const [sec, nanoSec] = process.hrtime(data.posted);
const networkWorkerLatency = sec + nanoSec / NANO_TO_SECOND_CONVERSION;
metrics?.networkWorkerWireEventsOnMainThreadLatency.observe(
{eventName: data.event as string},
networkWorkerLatency
);
events.emit(data.event, data.data);
}
});
Expand All @@ -81,6 +101,7 @@ export function wireEventsOnMainThread<EventData>(
const workerEvent: WorkerBridgeEvent<EventData> = {
type: mainEventName,
event: eventName,
posted: process.hrtime(),
data,
};
worker.postMessage(workerEvent);
Expand Down

0 comments on commit dd57c96

Please sign in to comment.