diff --git a/.dev/compose.backbone.env b/.dev/compose.backbone.env index 35291066..d3dace30 100644 --- a/.dev/compose.backbone.env +++ b/.dev/compose.backbone.env @@ -1 +1 @@ -BACKBONE_VERSION=6.5.1 +BACKBONE_VERSION=6.7.1 diff --git a/config/default.json b/config/default.json index 8621155b..68d56eee 100644 --- a/config/default.json +++ b/config/default.json @@ -10,11 +10,12 @@ "appenders": { "fileAppender": { "type": "dateFile", - "filename": "/var/log/enmeshed-connector/latest.log" + "filename": "./latest.log", + "layout": { "type": "pattern", "pattern": "[%d] [%p] %c - %m %x{correlationId}" } }, "consoleAppender": { "type": "stdout", - "layout": { "type": "pattern", "pattern": "%[[%d] [%p] %c - %m%]" } + "layout": { "type": "pattern", "pattern": "%[[%d] [%p] %c - %m %x{correlationId}%]" } }, "console": { "type": "logLevelFilter", diff --git a/config/test.json b/config/test.json index 2bbd1163..43a3ceb5 100644 --- a/config/test.json +++ b/config/test.json @@ -7,7 +7,7 @@ "appenders": { "consoleAppender": { "type": "stdout", - "layout": { "type": "pattern", "pattern": "%[[%d] [%p] %c - %m%]" } + "layout": { "type": "pattern", "pattern": "%[[%d] [%p] %c - %m %x{correlationId}%]" } }, "console": { "type": "logLevelFilter", diff --git a/package-lock.json b/package-lock.json index b174aaa5..1aa8bda0 100644 --- a/package-lock.json +++ b/package-lock.json @@ -21,6 +21,7 @@ "amqplib": "^0.10.4", "axios": "^1.7.7", "compression": "1.7.4", + "correlation-id": "^5.2.0", "cors": "2.8.5", "eventsource": "^2.0.2", "express": "4.21.0", diff --git a/package.json b/package.json index 001c9c11..19c08de5 100644 --- a/package.json +++ b/package.json @@ -89,6 +89,7 @@ "amqplib": "^0.10.4", "axios": "^1.7.7", "compression": "1.7.4", + "correlation-id": "^5.2.0", "cors": "2.8.5", "eventsource": "^2.0.2", "express": "4.21.0", diff --git a/src/ConnectorRuntime.ts b/src/ConnectorRuntime.ts index 781a31e5..39b1418c 100644 --- a/src/ConnectorRuntime.ts +++ b/src/ConnectorRuntime.ts @@ -8,6 +8,7 @@ import { ConsumptionController } from "@nmshd/consumption"; import { ConsumptionServices, DataViewExpander, GetIdentityInfoResponse, ModuleConfiguration, Runtime, RuntimeHealth, RuntimeServices, TransportServices } from "@nmshd/runtime"; import { AccountController, TransportCoreErrors } from "@nmshd/transport"; import axios from "axios"; +import correlator from "correlation-id"; import fs from "fs"; import { HttpsProxyAgent } from "https-proxy-agent"; import { validate as validateSchema } from "jsonschema"; @@ -57,7 +58,7 @@ export class ConnectorRuntime extends Runtime { private healthChecker: HealthChecker; private constructor(connectorConfig: ConnectorRuntimeConfig, loggerFactory: NodeLoggerFactory) { - super(connectorConfig, loggerFactory); + super(connectorConfig, loggerFactory, undefined, correlator); } public static async create(connectorConfig: ConnectorRuntimeConfig): Promise { diff --git a/src/index.ts b/src/index.ts index b7bbe2e3..0142b68f 100644 --- a/src/index.ts +++ b/src/index.ts @@ -1,4 +1,5 @@ import { RuntimeConfig } from "@nmshd/runtime"; +import correlator from "correlation-id"; import nconf from "nconf"; import { ConnectorRuntime } from "./ConnectorRuntime"; import { ConnectorRuntimeConfig } from "./ConnectorRuntimeConfig"; @@ -28,6 +29,7 @@ export function createConnectorConfig(overrides?: RuntimeConfig): ConnectorRunti .file("default-file", { file: "config/default.json" }); const connectorConfig = nconf.get() as ConnectorRuntimeConfig; + addCorrelationIdSupportToLogger(connectorConfig); if (connectorConfig.modules.sync.enabled && connectorConfig.modules.sse.enabled) { // eslint-disable-next-line no-console @@ -38,6 +40,21 @@ export function createConnectorConfig(overrides?: RuntimeConfig): ConnectorRunti return connectorConfig; } +function addCorrelationIdSupportToLogger(connectorConfig: ConnectorRuntimeConfig) { + Object.entries(connectorConfig.logging.appenders).forEach(([_key, appender]) => { + if ("layout" in appender && appender.layout.type === "pattern") { + const tokens = appender.layout.tokens; + + appender.layout.tokens = { + ...tokens, + correlationId: () => { + return correlator.getId() ?? ""; + } + }; + } + }); +} + const envKeyMapping: Record = { DATABASE_NAME: "database:dbName", // eslint-disable-line @typescript-eslint/naming-convention API_KEY: "infrastructure:httpServer:apiKey", // eslint-disable-line @typescript-eslint/naming-convention diff --git a/src/infrastructure/httpServer/HttpServer.ts b/src/infrastructure/httpServer/HttpServer.ts index d511d428..f8ce0450 100644 --- a/src/infrastructure/httpServer/HttpServer.ts +++ b/src/infrastructure/httpServer/HttpServer.ts @@ -1,5 +1,6 @@ import { sleep } from "@js-soft/ts-utils"; import compression from "compression"; +import correlator from "correlation-id"; import cors, { CorsOptions } from "cors"; import express, { Application, RequestHandler } from "express"; import helmet, { HelmetOptions } from "helmet"; @@ -58,6 +59,18 @@ export class HttpServer extends ConnectorInfrastructure private configure(): void { this.logger.debug("Configuring middleware..."); + this.app.use((req, res, next) => { + let correlationId = req.headers["x-correlation-id"]; + if (Array.isArray(correlationId)) { + correlationId = correlationId[0]; + } + if (correlationId) { + correlator.withId(correlationId, next); + } else { + correlator.withId(next); + } + }); + this.app.use(helmet(this.getHelmetOptions())); this.app.use(requestLogger(this.logger)); diff --git a/src/modules/sse/SseModule.ts b/src/modules/sse/SseModule.ts index 388f398c..8c50cb1d 100644 --- a/src/modules/sse/SseModule.ts +++ b/src/modules/sse/SseModule.ts @@ -1,4 +1,5 @@ import { ILogger } from "@js-soft/logging-abstractions"; +import correlator from "correlation-id"; import eventSourceModule from "eventsource"; import { ConnectorMode } from "../../ConnectorMode"; import { ConnectorRuntime } from "../../ConnectorRuntime"; @@ -88,11 +89,12 @@ export default class SseModule extends ConnectorRuntimeModule { + const syncResult = await services.transportServices.account.syncEverything(); + if (syncResult.isError) { + this.logger.error(syncResult); + } + }); } public stop(): void { diff --git a/src/modules/sync/SyncModule.ts b/src/modules/sync/SyncModule.ts index 7b493eda..974d987e 100644 --- a/src/modules/sync/SyncModule.ts +++ b/src/modules/sync/SyncModule.ts @@ -1,3 +1,4 @@ +import correlator from "correlation-id"; import { ConnectorRuntimeModule, ConnectorRuntimeModuleConfiguration } from "../../ConnectorRuntimeModule"; export interface SyncModuleConfiguration extends ConnectorRuntimeModuleConfiguration { @@ -16,8 +17,10 @@ export default class SyncModule extends ConnectorRuntimeModule { + const result = await this.runtime.getServices().transportServices.account.syncEverything(); + if (result.isError) this.logger.error("Sync failed", result.error); + }); this.syncTimeout = setTimeout(async () => await this.sync(), this.configuration.interval * 1000); } diff --git a/src/modules/webhooks/WebhooksModule.ts b/src/modules/webhooks/WebhooksModule.ts index f56bb9a9..0c16194a 100644 --- a/src/modules/webhooks/WebhooksModule.ts +++ b/src/modules/webhooks/WebhooksModule.ts @@ -2,6 +2,7 @@ import { Event, DataEvent as tsUtilsDataEvent } from "@js-soft/ts-utils"; import { DataEvent } from "@nmshd/runtime"; import agentKeepAlive, { HttpsAgent as AgentKeepAliveHttps } from "agentkeepalive"; import axios, { AxiosInstance } from "axios"; +import correlator from "correlation-id"; import { ConnectorRuntimeModule } from "../../ConnectorRuntimeModule"; import { ConfigModel, Webhook } from "./ConfigModel"; import { ConfigParser } from "./ConfigParser"; @@ -45,7 +46,13 @@ export default class WebhooksModule extends ConnectorRuntimeModule 299) { this.logger.warn(`Request to webhook '${url}' returned status ${response.status}. Expected value between 200 and 299.`); diff --git a/test/correlationId.test.ts b/test/correlationId.test.ts new file mode 100644 index 00000000..6679d10f --- /dev/null +++ b/test/correlationId.test.ts @@ -0,0 +1,65 @@ +import { AxiosInstance } from "axios"; +import { randomUUID } from "crypto"; +import { DateTime } from "luxon"; +import { ConnectorClientWithMetadata, Launcher } from "./lib/Launcher"; +import { getTimeout } from "./lib/setTimeout"; +import { establishRelationship } from "./lib/testUtils"; + +const launcher = new Launcher(); +let axiosClient: AxiosInstance; +let connectorClient1: ConnectorClientWithMetadata; +let connectorClient2: ConnectorClientWithMetadata; +let account2Address: string; +const uuidRegex = new RegExp("[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}"); + +beforeAll(async () => { + [connectorClient1, connectorClient2] = await launcher.launch(2); + axiosClient = connectorClient1["account"]["httpClient"]; + await establishRelationship(connectorClient1, connectorClient2); + account2Address = (await connectorClient2.account.getIdentityInfo()).result.address; +}, getTimeout(30000)); + +afterAll(() => launcher.stop()); + +describe("test the correlation ids", () => { + // eslint-disable-next-line jest/expect-expect + test("should send a random correlation id via webhook", async () => { + connectorClient1._eventBus?.reset(); + + await axiosClient.post("/api/v2/Requests/Outgoing", { + content: { + items: [{ "@type": "ReadAttributeRequestItem", mustBeAccepted: false, query: { "@type": "IdentityAttributeQuery", valueType: "Surname" } }], + expiresAt: DateTime.now().plus({ hour: 1 }).toISO() + }, + peer: account2Address + }); + + await connectorClient1._eventBus?.waitForEvent("consumption.outgoingRequestCreated", (event: any) => { + return uuidRegex.test(event.headers["x-correlation-id"]); + }); + }); + + // eslint-disable-next-line jest/expect-expect + test("should send a custom correlation id via webhook", async () => { + connectorClient1._eventBus?.reset(); + + const customCorrelationId = randomUUID(); + + await axiosClient.post( + "/api/v2/Requests/Outgoing", + { + content: { + items: [{ "@type": "ReadAttributeRequestItem", mustBeAccepted: false, query: { "@type": "IdentityAttributeQuery", valueType: "Surname" } }], + expiresAt: DateTime.now().plus({ hour: 1 }).toISO() + }, + peer: account2Address + }, + // eslint-disable-next-line @typescript-eslint/naming-convention + { headers: { "x-correlation-id": customCorrelationId } } + ); + + await connectorClient1._eventBus?.waitForEvent("consumption.outgoingRequestCreated", (event: any) => { + return event.headers["x-correlation-id"] === customCorrelationId; + }); + }); +}); diff --git a/test/lib/DataEventWithHeader.ts b/test/lib/DataEventWithHeader.ts new file mode 100644 index 00000000..8d1614fe --- /dev/null +++ b/test/lib/DataEventWithHeader.ts @@ -0,0 +1,11 @@ +import { DataEvent } from "@js-soft/ts-utils"; +import { IncomingHttpHeaders } from "node:http"; + +export class DataEventWithHeaders extends DataEvent { + public readonly headers: IncomingHttpHeaders; + + public constructor(namespace: string, data: T, headers: IncomingHttpHeaders) { + super(namespace, data); + this.headers = headers; + } +} diff --git a/test/lib/Launcher.ts b/test/lib/Launcher.ts index 8cd94992..1015a86e 100644 --- a/test/lib/Launcher.ts +++ b/test/lib/Launcher.ts @@ -1,4 +1,3 @@ -import { DataEvent } from "@js-soft/ts-utils"; import { ConnectorClient } from "@nmshd/connector-sdk"; import { Random, RandomCharacterRange } from "@nmshd/transport"; import { ChildProcess, spawn } from "child_process"; @@ -7,6 +6,7 @@ import http, { Server } from "node:http"; import https from "node:https"; import inspector from "node:inspector"; import path from "path"; +import { DataEventWithHeaders } from "./DataEventWithHeader"; import { MockEventBus } from "./MockEventBus"; import getPort from "./getPort"; import waitForConnector from "./waitForConnector"; @@ -129,7 +129,7 @@ export class Launcher { .use((req, res) => { res.status(200).send("OK"); - eventBus.publish(new DataEvent(req.body.trigger, req.body.data)); + eventBus.publish(new DataEventWithHeaders(req.body.trigger, req.body.data, req.headers)); }) .listen(port); } diff --git a/test/lib/MockEventBus.ts b/test/lib/MockEventBus.ts index 7c9055ce..eeda238e 100644 --- a/test/lib/MockEventBus.ts +++ b/test/lib/MockEventBus.ts @@ -22,13 +22,13 @@ export class MockEventBus extends EventEmitter2EventBus { this.publishPromises.push(this.emitter.emitAsync(namespace, event)); } - public async waitForEvent(subscriptionTarget: string, predicate?: (event: TEvent) => boolean): Promise { + public async waitForEvent(subscriptionTarget: string, predicate?: (event: TEvent) => boolean, timeout?: number): Promise { const alreadyTriggeredEvents = this.publishedEvents.find((e) => e.namespace === subscriptionTarget && (!predicate || predicate(e as TEvent))) as TEvent | undefined; if (alreadyTriggeredEvents) { return alreadyTriggeredEvents; } - const event = await waitForEvent(this, subscriptionTarget, predicate); + const event = await waitForEvent(this, subscriptionTarget, predicate, timeout); return event; }