diff --git a/packages/base-test-utils/src/index.ts b/packages/base-test-utils/src/index.ts index 8a452d9ef2..41faa04f89 100644 --- a/packages/base-test-utils/src/index.ts +++ b/packages/base-test-utils/src/index.ts @@ -54,6 +54,7 @@ export class BaseTestUtils { throw new Error(baseErrMsg + ': ' + customMsg) } + // TODO: De-dupe this with `delayOrAbort` in abort-signal-utils.ts static async delay(ms: number, signal?: AbortSignal): Promise { return new Promise((resolve, reject) => { const timeout = setTimeout(() => resolve(), ms) diff --git a/packages/common/src/utils/abort-signal-utils.ts b/packages/common/src/utils/abort-signal-utils.ts index 007622d25a..304a594dfd 100644 --- a/packages/common/src/utils/abort-signal-utils.ts +++ b/packages/common/src/utils/abort-signal-utils.ts @@ -66,3 +66,18 @@ export async function abortable( original.removeEventListener('abort', onAbort) }) } + +export async function delayOrAbort(ms: number, signal?: AbortSignal): Promise { + return new Promise((resolve, reject) => { + const timeout = setTimeout(() => resolve(), ms) + if (signal) { + const handleAbort = () => { + clearTimeout(timeout) + signal.removeEventListener('abort', handleAbort) + reject(signal.reason) + } + if (signal.aborted) handleAbort() + signal.addEventListener('abort', handleAbort) + } + }) +} diff --git a/packages/core/src/ceramic.ts b/packages/core/src/ceramic.ts index b1df5925a4..79f3a69009 100644 --- a/packages/core/src/ceramic.ts +++ b/packages/core/src/ceramic.ts @@ -24,6 +24,7 @@ import { CeramicSigner, StreamStateLoader, StreamReaderWriter, + delayOrAbort, } from '@ceramicnetwork/common' import { DEFAULT_TRACE_SAMPLE_RATIO, @@ -615,35 +616,21 @@ export class Ceramic implements StreamReaderWriter, StreamStateLoader { // If authenticated into the node, we can start publishing metrics // publishing metrics is enabled by default, even if no metrics config if (this._metricsConfig?.metricsPublisherEnabled) { - // First, subscribe the node to the Model used for NodeMetrics - const metricsModel = NodeMetrics.getModel(this._networkOptions.name) - await this.repository.index.indexModels([{ streamID: metricsModel }]) - await this.recon.registerInterest(metricsModel, this.did.id) - - // Now start the NodeMetrics system. - const ipfsVersion = await this.ipfs.version() - const ipfsId = await this.ipfs.id() - - NodeMetrics.start({ - ceramic: this, - network: this._networkOptions.name, - ceramicVersion: this._versionInfo.cliPackageVersion, - ipfsVersion: ipfsVersion.version, - intervalMS: this._metricsConfig?.metricsPublishIntervalMS || DEFAULT_PUBLISH_INTERVAL_MS, - nodeId: ipfsId.publicKey, // what makes the best ID for the node? - nodeName: '', // daemon.hostname is not useful - nodeAuthDID: this.did.id, - nodeIPAddr: '', // daemon.hostname is not the external name - nodePeerId: ipfsId.publicKey, - logger: this._logger, - }) - this._logger.imp( - `Publishing Node Metrics publicly to the Ceramic Network. To learn more, including how to disable publishing, please see the NODE_METRICS.md file for your branch, e.g. https://github.com/ceramicnetwork/js-ceramic/blob/develop/docs-dev/NODE_METRICS.md` - ) + if (EnvironmentUtils.useRustCeramic()) { + // Start a background job that will wait for the Model to be available (synced over Recon) + // and then start publishing to it. + const metricsModel = NodeMetrics.getModel(this._networkOptions.name) + void this._waitForMetricsModel(metricsModel).then( + this._startPublishingNodeMetrics.bind(this, metricsModel) + ) + } else { + this._logger.warn( + `Disabling publishing of Node Metrics because we are not connected to a Recon-compatible p2p node` + ) + } } } else { - // warn that the node does not have an authenticated did - this._logger.imp( + this._logger.warn( `The ceramic daemon is running without an authenticated DID. This means that this node cannot itself publish streams, including node metrics, and cannot use a DID as the method to authenticate with the Ceramic Anchor Service. See https://developers.ceramic.network/docs/composedb/guides/composedb-server/access-mainnet#updating-to-did-based-authentication for instructions on how to update your node to use DID authentication.` ) } @@ -655,6 +642,74 @@ export class Ceramic implements StreamReaderWriter, StreamStateLoader { ) } + /** + * Starts up the subsystem to periodically publish Node Metrics to a Stream. + * Requires the data for the NodeMetrics Model to already be available locally + * in the ceramic-one blockstore. + * @param metricsModel - the StreamID of the Model that Node Metrics should be published to. + */ + async _startPublishingNodeMetrics(metricsModel: StreamID): Promise { + await this.repository.index.indexModels([{ streamID: metricsModel }]) + await this.recon.registerInterest(metricsModel, this.did.id) + + // Now start the NodeMetrics system. + const ipfsVersion = await this.ipfs.version() + const ipfsId = await this.ipfs.id() + + NodeMetrics.start({ + ceramic: this, + network: this._networkOptions.name, + ceramicVersion: this._versionInfo.cliPackageVersion, + ipfsVersion: ipfsVersion.version, + intervalMS: this._metricsConfig?.metricsPublishIntervalMS || DEFAULT_PUBLISH_INTERVAL_MS, + nodeId: ipfsId.publicKey, // what makes the best ID for the node? + nodeName: '', // daemon.hostname is not useful + nodeAuthDID: this.did.id, + nodeIPAddr: '', // daemon.hostname is not the external name + nodePeerId: ipfsId.publicKey, + logger: this._logger, + }) + this._logger.imp( + `Publishing Node Metrics publicly to the Ceramic Network. To learn more, including how to disable publishing, please see the NODE_METRICS.md file for your branch, e.g. https://github.com/ceramicnetwork/js-ceramic/blob/develop/docs-dev/NODE_METRICS.md` + ) + } + + /** + * Waits for Model used to publish NodeMetrics to be available locally. + * Since we subscribe to the metamodel at startup, so long as some connected node on the network + * has the model, it should eventually be available locally. + * @param model + */ + async _waitForMetricsModel(model: StreamID): Promise { + let attemptNum = 0 + let backoffMs = 100 + const maxBackoffMs = 1000 * 60 // Caps off at checking once per minute + + while (!this._shutdownSignal.isShuttingDown()) { + try { + await this.dispatcher.getFromIpfs(model.cid) + if (attemptNum > 0) { + this._logger.imp(`Model ${model} used to publish Node Metrics loaded successfully`) + } + return + } catch (err) { + if (attemptNum == 0) { + this._logger.imp( + `Waiting for Model ${model} used to publish Node Metrics to be available locally` + ) + } else if (attemptNum % 5 == 0) { + this._logger.err(`Error loading Model ${model} used to publish Node Metrics: ${err}`) + } + + await this._shutdownSignal.abortable((signal) => delayOrAbort(backoffMs, signal)) + attemptNum++ + if (backoffMs <= maxBackoffMs) { + backoffMs *= 2 + } + } + } + } + /** * Runs some checks at node startup to ensure that the node is healthy and properly configured. * Throws an Error if any issues are detected diff --git a/packages/core/src/dispatcher.ts b/packages/core/src/dispatcher.ts index 559210105e..c2cef4518d 100644 --- a/packages/core/src/dispatcher.ts +++ b/packages/core/src/dispatcher.ts @@ -359,7 +359,7 @@ export class Dispatcher { */ async retrieveCommit(cid: CID | string, streamId?: StreamID): Promise { try { - return await this._getFromIpfs(cid) + return await this.getFromIpfs(cid) } catch (e) { if (streamId) { this._logger.err( @@ -380,7 +380,7 @@ export class Dispatcher { */ async retrieveFromIPFS(cid: CID | string, path?: string): Promise { try { - return await this._getFromIpfs(cid, path) + return await this.getFromIpfs(cid, path) } catch (e) { this._logger.err(`Error while loading CID ${cid.toString()} from IPFS: ${e}`) throw e @@ -416,7 +416,7 @@ export class Dispatcher { /** * Helper function for loading a CID from IPFS */ - private async _getFromIpfs(cid: CID | string, path?: string): Promise { + async getFromIpfs(cid: CID | string, path?: string): Promise { const asCid = typeof cid === 'string' ? CID.parse(cid) : cid // Lookup CID in cache before looking it up IPFS diff --git a/packages/core/src/shutdown-signal.ts b/packages/core/src/shutdown-signal.ts index aa9ee1ab72..f061b88b0f 100644 --- a/packages/core/src/shutdown-signal.ts +++ b/packages/core/src/shutdown-signal.ts @@ -7,6 +7,7 @@ import { Observer, Subject } from 'rxjs' */ export class ShutdownSignal { private subject: Subject = new Subject() + private shuttingDown = false /** * Subscribers to the signal. @@ -20,6 +21,11 @@ export class ShutdownSignal { */ abort(): void { this.subject.complete() + this.shuttingDown = true + } + + isShuttingDown(): boolean { + return this.shuttingDown } /**