diff --git a/packages/beacon-node/src/network/core/networkCoreWorker.ts b/packages/beacon-node/src/network/core/networkCoreWorker.ts index d8ebb3facaa8..8f4eabea6f51 100644 --- a/packages/beacon-node/src/network/core/networkCoreWorker.ts +++ b/packages/beacon-node/src/network/core/networkCoreWorker.ts @@ -113,6 +113,7 @@ wireEventsOnWorkerThread( events, parentPort, networkCoreWorkerMetrics, + logger, networkEventDirection ); wireEventsOnWorkerThread( @@ -120,6 +121,7 @@ wireEventsOnWorkerThread( reqRespBridgeEventBus, parentPort, networkCoreWorkerMetrics, + logger, reqRespBridgeEventDirection ); diff --git a/packages/beacon-node/src/network/core/networkCoreWorkerHandler.ts b/packages/beacon-node/src/network/core/networkCoreWorkerHandler.ts index 3322f2664f04..e1e01de46c33 100644 --- a/packages/beacon-node/src/network/core/networkCoreWorkerHandler.ts +++ b/packages/beacon-node/src/network/core/networkCoreWorkerHandler.ts @@ -73,6 +73,7 @@ export class WorkerNetworkCore implements INetworkCore { modules.events, modules.worker as unknown as worker_threads.Worker, modules.metrics, + modules.logger, networkEventDirection ); wireEventsOnMainThread( @@ -80,6 +81,7 @@ export class WorkerNetworkCore implements INetworkCore { this.reqRespBridgeEventBus, modules.worker as unknown as worker_threads.Worker, modules.metrics, + modules.logger, reqRespBridgeEventDirection ); diff --git a/packages/beacon-node/src/util/workerEvents.ts b/packages/beacon-node/src/util/workerEvents.ts index 5cb4e7553f95..9e82d7398ae8 100644 --- a/packages/beacon-node/src/util/workerEvents.ts +++ b/packages/beacon-node/src/util/workerEvents.ts @@ -1,4 +1,5 @@ import {MessagePort, Worker} from "node:worker_threads"; +import {LoggerNode} from "@lodestar/logger/lib/node.js"; import {Metrics} from "../metrics/metrics.js"; import {NetworkCoreWorkerMetrics} from "../network/core/metrics.js"; import {StrictEventEmitterSingleArg} from "./strictEvents.js"; @@ -28,6 +29,7 @@ export function wireEventsOnWorkerThread( events: StrictEventEmitterSingleArg, parentPort: MessagePort, metrics: NetworkCoreWorkerMetrics | null, + logger: LoggerNode, isWorkerToMain: {[K in keyof EventData]: EventDirection} ): void { // Subscribe to events from main thread @@ -38,9 +40,11 @@ export function wireEventsOnWorkerThread( // This check is not necessary but added for safety in case of improper implemented events isWorkerToMain[data.event] === EventDirection.mainToWorker ) { - events.emit(data.event, data.data); const [sec, nanoSec] = process.hrtime(data.posted); - metrics?.networkWorkerWireEventsOnWorkerThreadLatencySec.observe(sec + nanoSec / 1e9); + const networkWorkerLatency = sec + nanoSec / 1e9; + metrics?.networkWorkerWireEventsOnWorkerThreadLatencySec.observe(networkWorkerLatency); + logger.trace("network worker message latency", networkWorkerLatency); + events.emit(data.event, data.data); } }); @@ -65,6 +69,7 @@ export function wireEventsOnMainThread( events: StrictEventEmitterSingleArg, worker: Pick, metrics: Metrics | null, + logger: LoggerNode, isWorkerToMain: {[K in keyof EventData]: EventDirection} ): void { // Subscribe to events from main thread @@ -75,9 +80,11 @@ export function wireEventsOnMainThread( // This check is not necessary but added for safety in case of improper implemented events isWorkerToMain[data.event] === EventDirection.workerToMain ) { - events.emit(data.event, data.data); const [sec, nanoSec] = process.hrtime(data.posted); - metrics?.networkWorkerWireEventsOnMainThreadLatencySec.observe(sec + nanoSec / 1e9); + const networkWorkerLatency = sec + nanoSec / 1e9; + metrics?.networkWorkerWireEventsOnMainThreadLatencySec.observe(networkWorkerLatency); + logger.trace("network worker message latency", networkWorkerLatency); + events.emit(data.event, data.data); } });