From 6017dba9a99ca80bd7b90b424b995b1f0d45babb Mon Sep 17 00:00:00 2001 From: Spencer T Brody Date: Mon, 15 Jul 2024 15:49:16 -0400 Subject: [PATCH 1/5] fix: Subscribe to node metrics model before publishing to it. --- packages/common/package.json | 1 - packages/core/package.json | 2 +- packages/core/src/__tests__/recon.test.ts | 2 +- packages/core/src/ceramic.ts | 9 ++++++++- packages/core/src/recon.ts | 9 +++++++-- 5 files changed, 17 insertions(+), 6 deletions(-) diff --git a/packages/common/package.json b/packages/common/package.json index fcb89e4d2d..0cf35b97c7 100644 --- a/packages/common/package.json +++ b/packages/common/package.json @@ -43,7 +43,6 @@ "clean": "npx rimraf ./lib" }, "dependencies": { - "@ceramicnetwork/node-metrics": "^1.0.3", "@ceramicnetwork/streamid": "^5.5.0", "@didtools/cacao": "^3.0.0", "@didtools/key-webauthn": "^2.0.2", diff --git a/packages/core/package.json b/packages/core/package.json index a56295461c..a5a6d02ec6 100644 --- a/packages/core/package.json +++ b/packages/core/package.json @@ -51,7 +51,7 @@ "@ceramicnetwork/indexing": "^5.1.0", "@ceramicnetwork/ipfs-topology": "^6.1.0", "@ceramicnetwork/job-queue": "^5.1.0", - "@ceramicnetwork/node-metrics": "^1.0.3", + "@ceramicnetwork/node-metrics": "^1.0.5", "@ceramicnetwork/observability": "^1.4.4", "@ceramicnetwork/pinning-aggregation": "^6.1.0", "@ceramicnetwork/pinning-ipfs-backend": "^6.1.0", diff --git a/packages/core/src/__tests__/recon.test.ts b/packages/core/src/__tests__/recon.test.ts index b9c99bd42a..f7ee84d479 100644 --- a/packages/core/src/__tests__/recon.test.ts +++ b/packages/core/src/__tests__/recon.test.ts @@ -98,7 +98,7 @@ describe('ReconApi', () => { await reconApi.registerInterest(MODEL) expect(mockSendRequest).toHaveBeenCalledWith( `${RECON_URL}/ceramic/interests/model/${MODEL.toString()}`, - { method: 'POST' } + { method: 'POST', body: {}, headers: { 'Content-Type': 'application/json' } } ) }) }) diff --git a/packages/core/src/ceramic.ts b/packages/core/src/ceramic.ts index c45c8c6d78..57a07ad6d5 100644 --- a/packages/core/src/ceramic.ts +++ b/packages/core/src/ceramic.ts @@ -231,6 +231,7 @@ export class Ceramic implements StreamReaderWriter, StreamStateLoader { private _signer: CeramicSigner public readonly dispatcher: Dispatcher public readonly loggerProvider: LoggerProvider + public readonly recon: IReconApi public readonly admin: AdminApi public readonly feed: PublicFeed readonly repository: Repository @@ -329,6 +330,7 @@ export class Ceramic implements StreamReaderWriter, StreamStateLoader { ) const pinApi = new LocalPinApi(this.repository, this._logger) this.repository.index.setSyncQueryApi(this.syncApi) + this.recon = modules.reconApi this.admin = new LocalAdminApi( this._logger, localIndex, @@ -562,7 +564,6 @@ export class Ceramic implements StreamReaderWriter, StreamStateLoader { } await this._startupChecks() - await this._startMetrics() } catch (err) { this._logger.err(err) @@ -614,6 +615,12 @@ export class Ceramic implements StreamReaderWriter, StreamStateLoader { // If authenticated into the node, we can start publishing metrics // publishing metrics is enabled by default, even if no metrics config if (this._metricsConfig?.metricsPublisherEnabled) { + // First, subscribe the node to the Model used for NodeMetrics + const metricsModel = NodeMetrics.getModel(this._networkOptions.name) + await this.repository.index.indexModels([{ streamID: metricsModel }]) + await this.recon.registerInterest(metricsModel, this.did.id) + + // Now start the NodeMetrics system. const ipfsVersion = await this.ipfs.version() const ipfsId = await this.ipfs.id() diff --git a/packages/core/src/recon.ts b/packages/core/src/recon.ts index c1a4efa4f9..fd6762e44e 100644 --- a/packages/core/src/recon.ts +++ b/packages/core/src/recon.ts @@ -57,7 +57,7 @@ export interface ReconEventFeedResponse { */ export interface IReconApi extends Observable { init(initialCursor?: string): Promise - registerInterest(model: StreamID): Promise + registerInterest(model: StreamID, controller?: string): Promise put(car: CAR, opts?: AbortOptions): Promise enabled: boolean stop(): void @@ -118,14 +118,19 @@ export class ReconApi extends Observable implements IRec /** * Registers interest in a model * @param model stream id of the model to register interest in + * @param controller restrict the interest range to just events with this controller */ - async registerInterest(model: StreamID): Promise { + async registerInterest(model: StreamID, controller?: string): Promise { if (!this.enabled) { throw new Error(`Recon: disabled, not registering interest in model ${model.toString()}`) } try { + const headers = { 'Content-Type': 'application/json' } + const body = { ...(controller && { controller }) } await this.#sendRequest(this.#url + `/ceramic/interests/model/${model.toString()}`, { method: 'POST', + headers, + body, }) this.#logger.debug(`Recon: added interest for model ${model.toString()}`) } catch (err) { From b0fa60ad668cf5027ff31bdcb8fe5c169e29b797 Mon Sep 17 00:00:00 2001 From: Spencer T Brody Date: Fri, 19 Jul 2024 14:41:29 -0400 Subject: [PATCH 2/5] feat: wait for metrics model to be available locally --- packages/core/src/ceramic.ts | 42 +++++++++++++++++++++++++++++++++ packages/core/src/dispatcher.ts | 6 ++--- 2 files changed, 45 insertions(+), 3 deletions(-) diff --git a/packages/core/src/ceramic.ts b/packages/core/src/ceramic.ts index 57a07ad6d5..03cb872311 100644 --- a/packages/core/src/ceramic.ts +++ b/packages/core/src/ceramic.ts @@ -617,6 +617,7 @@ export class Ceramic implements StreamReaderWriter, StreamStateLoader { if (this._metricsConfig?.metricsPublisherEnabled) { // First, subscribe the node to the Model used for NodeMetrics const metricsModel = NodeMetrics.getModel(this._networkOptions.name) + await this._waitForMetricsModel(metricsModel) await this.repository.index.indexModels([{ streamID: metricsModel }]) await this.recon.registerInterest(metricsModel, this.did.id) @@ -655,6 +656,47 @@ export class Ceramic implements StreamReaderWriter, StreamStateLoader { ) } + /** + * Waits for Model used to publish NodeMetrics to be available locally. + * Since we subscribe to the metamodel at startup, so long as some connected node on the network + * has the model, it should eventually be available locally. + * @param model + */ + async _waitForMetricsModel(model: StreamID): Promise { + let attemptNum = 0 + let backoffMs = 100 + const maxBackoffMs = 1000 * 10 + const delay = async function (ms) { + return new Promise((resolve) => setTimeout(() => resolve(), ms)) + } + + // eslint-disable-next-line no-constant-condition + while (true) { + try { + await this.dispatcher.getFromIpfs(model.cid) + if (attemptNum > 0) { + this._logger.imp(`Model ${model} used to publish NodeMetrics loaded successfully`) + } + return + } catch (err) { + if (attemptNum == 0) { + this._logger.imp( + `Waiting for Model ${model} used to publish NodeMetrics to be available locally` + ) + } + if (attemptNum >= 5) { + this._logger.err(`Error loading Model ${model} used to publish NodeMetrics: ${err}`) + } + + await delay(backoffMs) + attemptNum++ + if (backoffMs <= maxBackoffMs) { + backoffMs *= 2 + } + } + } + } + /** * Runs some checks at node startup to ensure that the node is healthy and properly configured. * Throws an Error if any issues are detected diff --git a/packages/core/src/dispatcher.ts b/packages/core/src/dispatcher.ts index 1492d326bd..9fb7d2e8f1 100644 --- a/packages/core/src/dispatcher.ts +++ b/packages/core/src/dispatcher.ts @@ -339,7 +339,7 @@ export class Dispatcher { */ async retrieveCommit(cid: CID | string, streamId?: StreamID): Promise { try { - return await this._getFromIpfs(cid) + return await this.getFromIpfs(cid) } catch (e) { if (streamId) { this._logger.err( @@ -360,7 +360,7 @@ export class Dispatcher { */ async retrieveFromIPFS(cid: CID | string, path?: string): Promise { try { - return await this._getFromIpfs(cid, path) + return await this.getFromIpfs(cid, path) } catch (e) { this._logger.err(`Error while loading CID ${cid.toString()} from IPFS: ${e}`) throw e @@ -396,7 +396,7 @@ export class Dispatcher { /** * Helper function for loading a CID from IPFS */ - private async _getFromIpfs(cid: CID | string, path?: string): Promise { + async getFromIpfs(cid: CID | string, path?: string): Promise { const asCid = typeof cid === 'string' ? CID.parse(cid) : cid // Lookup CID in cache before looking it up IPFS From dc2f00e01a7f844ac8837d6a1b9f7eca1e65420b Mon Sep 17 00:00:00 2001 From: Spencer T Brody Date: Tue, 23 Jul 2024 17:20:51 -0400 Subject: [PATCH 3/5] Do not fail to start if metrics model unavailable --- packages/core/src/ceramic.ts | 85 +++++++++++++++++++----------------- 1 file changed, 45 insertions(+), 40 deletions(-) diff --git a/packages/core/src/ceramic.ts b/packages/core/src/ceramic.ts index 03cb872311..8dce76e9c1 100644 --- a/packages/core/src/ceramic.ts +++ b/packages/core/src/ceramic.ts @@ -617,30 +617,37 @@ export class Ceramic implements StreamReaderWriter, StreamStateLoader { if (this._metricsConfig?.metricsPublisherEnabled) { // First, subscribe the node to the Model used for NodeMetrics const metricsModel = NodeMetrics.getModel(this._networkOptions.name) - await this._waitForMetricsModel(metricsModel) - await this.repository.index.indexModels([{ streamID: metricsModel }]) - await this.recon.registerInterest(metricsModel, this.did.id) - - // Now start the NodeMetrics system. - const ipfsVersion = await this.ipfs.version() - const ipfsId = await this.ipfs.id() - - NodeMetrics.start({ - ceramic: this, - network: this._networkOptions.name, - ceramicVersion: this._versionInfo.cliPackageVersion, - ipfsVersion: ipfsVersion.version, - intervalMS: this._metricsConfig?.metricsPublishIntervalMS || DEFAULT_PUBLISH_INTERVAL_MS, - nodeId: ipfsId.publicKey, // what makes the best ID for the node? - nodeName: '', // daemon.hostname is not useful - nodeAuthDID: this.did.id, - nodeIPAddr: '', // daemon.hostname is not the external name - nodePeerId: ipfsId.publicKey, - logger: this._logger, - }) - this._logger.imp( - `Publishing Node Metrics publicly to the Ceramic Network. To learn more, including how to disable publishing, please see the NODE_METRICS.md file for your branch, e.g. https://github.com/ceramicnetwork/js-ceramic/blob/develop/docs-dev/NODE_METRICS.md` - ) + const modelLoaded = await this._waitForMetricsModel(metricsModel) + if (modelLoaded) { + await this.repository.index.indexModels([{ streamID: metricsModel }]) + await this.recon.registerInterest(metricsModel, this.did.id) + + // Now start the NodeMetrics system. + const ipfsVersion = await this.ipfs.version() + const ipfsId = await this.ipfs.id() + + NodeMetrics.start({ + ceramic: this, + network: this._networkOptions.name, + ceramicVersion: this._versionInfo.cliPackageVersion, + ipfsVersion: ipfsVersion.version, + intervalMS: + this._metricsConfig?.metricsPublishIntervalMS || DEFAULT_PUBLISH_INTERVAL_MS, + nodeId: ipfsId.publicKey, // what makes the best ID for the node? + nodeName: '', // daemon.hostname is not useful + nodeAuthDID: this.did.id, + nodeIPAddr: '', // daemon.hostname is not the external name + nodePeerId: ipfsId.publicKey, + logger: this._logger, + }) + this._logger.imp( + `Publishing Node Metrics publicly to the Ceramic Network. To learn more, including how to disable publishing, please see the NODE_METRICS.md file for your branch, e.g. https://github.com/ceramicnetwork/js-ceramic/blob/develop/docs-dev/NODE_METRICS.md` + ) + } else { + await this._logger.warn( + `Could not load Model ${metricsModel} used to publish Node Metrics, disabling metrics publishing` + ) + } } } else { // warn that the node does not have an authenticated did @@ -661,40 +668,38 @@ export class Ceramic implements StreamReaderWriter, StreamStateLoader { * Since we subscribe to the metamodel at startup, so long as some connected node on the network * has the model, it should eventually be available locally. * @param model + * @returns whether or not the model was loaded successfully */ - async _waitForMetricsModel(model: StreamID): Promise { - let attemptNum = 0 - let backoffMs = 100 - const maxBackoffMs = 1000 * 10 + async _waitForMetricsModel(model: StreamID): Promise { + const maxWaitDuration = 1000 * 10 // 10 seconds + const retryInterval = 100 + const maxRetries = maxWaitDuration / retryInterval const delay = async function (ms) { return new Promise((resolve) => setTimeout(() => resolve(), ms)) } - // eslint-disable-next-line no-constant-condition - while (true) { + for (let attemptNum = 0; attemptNum < maxRetries; attemptNum++) { try { await this.dispatcher.getFromIpfs(model.cid) if (attemptNum > 0) { - this._logger.imp(`Model ${model} used to publish NodeMetrics loaded successfully`) + this._logger.imp(`Model ${model} used to publish Node Metrics loaded successfully`) } - return + return true } catch (err) { if (attemptNum == 0) { this._logger.imp( - `Waiting for Model ${model} used to publish NodeMetrics to be available locally` + `Waiting for Model ${model} used to publish Node Metrics to be available locally` ) - } - if (attemptNum >= 5) { - this._logger.err(`Error loading Model ${model} used to publish NodeMetrics: ${err}`) + } else if (attemptNum % 10 == 0) { + // log error once a second + this._logger.err(`Error loading Model ${model} used to publish Node Metrics: ${err}`) } - await delay(backoffMs) + await delay(retryInterval) attemptNum++ - if (backoffMs <= maxBackoffMs) { - backoffMs *= 2 - } } } + return false } /** From 4df64a27f4d6adb5bfccf52f48a3e7854bea7511 Mon Sep 17 00:00:00 2001 From: Spencer T Brody Date: Tue, 23 Jul 2024 17:43:00 -0400 Subject: [PATCH 4/5] Increase timeout to 15 seconds --- packages/core/src/ceramic.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/core/src/ceramic.ts b/packages/core/src/ceramic.ts index 8dce76e9c1..3a878a6345 100644 --- a/packages/core/src/ceramic.ts +++ b/packages/core/src/ceramic.ts @@ -671,7 +671,7 @@ export class Ceramic implements StreamReaderWriter, StreamStateLoader { * @returns whether or not the model was loaded successfully */ async _waitForMetricsModel(model: StreamID): Promise { - const maxWaitDuration = 1000 * 10 // 10 seconds + const maxWaitDuration = 1000 * 15 // 10 seconds const retryInterval = 100 const maxRetries = maxWaitDuration / retryInterval const delay = async function (ms) { From d8339983f0f8fa6c6a7c26fd3ffeff7af07e0f4f Mon Sep 17 00:00:00 2001 From: Spencer T Brody Date: Tue, 23 Jul 2024 17:51:27 -0400 Subject: [PATCH 5/5] update comment --- packages/core/src/ceramic.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/core/src/ceramic.ts b/packages/core/src/ceramic.ts index 3a878a6345..4a16c4609b 100644 --- a/packages/core/src/ceramic.ts +++ b/packages/core/src/ceramic.ts @@ -671,7 +671,7 @@ export class Ceramic implements StreamReaderWriter, StreamStateLoader { * @returns whether or not the model was loaded successfully */ async _waitForMetricsModel(model: StreamID): Promise { - const maxWaitDuration = 1000 * 15 // 10 seconds + const maxWaitDuration = 1000 * 15 // 15 seconds const retryInterval = 100 const maxRetries = maxWaitDuration / retryInterval const delay = async function (ms) {