feat: wait for metrics model to be available locally #3265

Closed
wants to merge 8 commits into from
93 changes: 70 additions & 23 deletions packages/core/src/ceramic.ts
@@ -617,29 +617,37 @@ export class Ceramic implements StreamReaderWriter, StreamStateLoader {
if (this._metricsConfig?.metricsPublisherEnabled) {
// First, subscribe the node to the Model used for NodeMetrics
const metricsModel = NodeMetrics.getModel(this._networkOptions.name)
await this.repository.index.indexModels([{ streamID: metricsModel }])
await this.recon.registerInterest(metricsModel, this.did.id)

// Now start the NodeMetrics system.
const ipfsVersion = await this.ipfs.version()
const ipfsId = await this.ipfs.id()

NodeMetrics.start({
ceramic: this,
network: this._networkOptions.name,
ceramicVersion: this._versionInfo.cliPackageVersion,
ipfsVersion: ipfsVersion.version,
intervalMS: this._metricsConfig?.metricsPublishIntervalMS || DEFAULT_PUBLISH_INTERVAL_MS,
nodeId: ipfsId.publicKey, // what makes the best ID for the node?
nodeName: '', // daemon.hostname is not useful
nodeAuthDID: this.did.id,
nodeIPAddr: '', // daemon.hostname is not the external name
nodePeerId: ipfsId.publicKey,
logger: this._logger,
})
this._logger.imp(
`Publishing Node Metrics publicly to the Ceramic Network. To learn more, including how to disable publishing, please see the NODE_METRICS.md file for your branch, e.g. https://github.com/ceramicnetwork/js-ceramic/blob/develop/docs-dev/NODE_METRICS.md`
)
const modelLoaded = await this._waitForMetricsModel(metricsModel)
if (modelLoaded) {
await this.repository.index.indexModels([{ streamID: metricsModel }])
await this.recon.registerInterest(metricsModel, this.did.id)

// Now start the NodeMetrics system.
const ipfsVersion = await this.ipfs.version()
const ipfsId = await this.ipfs.id()

NodeMetrics.start({
ceramic: this,
network: this._networkOptions.name,
ceramicVersion: this._versionInfo.cliPackageVersion,
ipfsVersion: ipfsVersion.version,
intervalMS:
this._metricsConfig?.metricsPublishIntervalMS || DEFAULT_PUBLISH_INTERVAL_MS,
nodeId: ipfsId.publicKey, // what makes the best ID for the node?
nodeName: '', // daemon.hostname is not useful
nodeAuthDID: this.did.id,
nodeIPAddr: '', // daemon.hostname is not the external name
nodePeerId: ipfsId.publicKey,
logger: this._logger,
})
this._logger.imp(
`Publishing Node Metrics publicly to the Ceramic Network. To learn more, including how to disable publishing, please see the NODE_METRICS.md file for your branch, e.g. https://github.com/ceramicnetwork/js-ceramic/blob/develop/docs-dev/NODE_METRICS.md`
)
} else {
await this._logger.warn(
`Could not load Model ${metricsModel} used to publish Node Metrics, disabling metrics publishing`
)
}
}
} else {
// warn that the node does not have an authenticated did
@@ -655,6 +663,45 @@ export class Ceramic implements StreamReaderWriter, StreamStateLoader {
)
}

/**
* Waits for Model used to publish NodeMetrics to be available locally.
* Since we subscribe to the metamodel at startup, so long as some connected node on the network
* has the model, it should eventually be available locally.
* @param model
* @returns whether or not the model was loaded successfully
*/
async _waitForMetricsModel(model: StreamID): Promise<boolean> {
const maxWaitDuration = 1000 * 15 // 15 seconds
const retryInterval = 100
Contributor
I think it's going to take more than 0.1 sec? A cascading backoff might be nice, but this is fine for now.

const maxRetries = maxWaitDuration / retryInterval
// Resolves after `ms` milliseconds
const delay = (ms: number): Promise<void> => {
return new Promise<void>((resolve) => setTimeout(() => resolve(), ms))
}

for (let attemptNum = 0; attemptNum < maxRetries; attemptNum++) {
try {
await this.dispatcher.getFromIpfs(model.cid)
if (attemptNum > 0) {
this._logger.imp(`Model ${model} used to publish Node Metrics loaded successfully`)
}
return true
} catch (err) {
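if (attemptNum === 0) {
this._logger.imp(
`Waiting for Model ${model} used to publish Node Metrics to be available locally`
)
} else if (attemptNum % 10 === 0) {
// log error once a second (every 10th attempt at a 100ms retry interval)
this._logger.err(`Error loading Model ${model} used to publish Node Metrics: ${err}`)
}

// The for loop already increments attemptNum; incrementing it again here
// would halve the number of retries and the effective wait duration.
await delay(retryInterval)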
}
}
return false
}

/**
* Runs some checks at node startup to ensure that the node is healthy and properly configured.
* Throws an Error if any issues are detected
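Following up on the review comment about a cascading backoff, here is a minimal sketch of what that could look like. Both `backoffDelays` and `retryWithBackoff` are hypothetical helpers that do not exist in js-ceramic, and the doubling factor and the 2-second cap on a single delay are assumptions chosen for illustration, not values from this PR.

```typescript
// Hypothetical helper (not part of js-ceramic): computes the sleep schedule for an
// exponential backoff that starts at initialMs, doubles after every attempt, is
// capped at maxDelayMs per sleep, and never sleeps longer than totalBudgetMs overall.
function backoffDelays(initialMs: number, maxDelayMs: number, totalBudgetMs: number): number[] {
  if (initialMs <= 0 || maxDelayMs <= 0) {
    throw new Error('initialMs and maxDelayMs must be positive')
  }
  const delays: number[] = []
  let next = initialMs
  let elapsed = 0
  while (elapsed < totalBudgetMs) {
    // Clamp to the per-sleep cap and to whatever is left of the total budget
    const d = Math.min(next, maxDelayMs, totalBudgetMs - elapsed)
    delays.push(d)
    elapsed += d
    next *= 2
  }
  return delays
}

// Hypothetical helper (not part of js-ceramic): calls `attempt` once, then once more
// after each delay in the schedule. Returns true as soon as an attempt resolves,
// or false if every attempt rejects.
async function retryWithBackoff(
  attempt: () => Promise<unknown>,
  delays: number[],
  sleep: (ms: number) => Promise<void> = (ms) =>
    new Promise<void>((resolve) => setTimeout(() => resolve(), ms))
): Promise<boolean> {
  for (let i = 0; i <= delays.length; i++) {
    try {
      await attempt()
      return true
    } catch {
      // Sleep only if there is another attempt left
      if (i < delays.length) {
        await sleep(delays[i])
      }
    }
  }
  return false
}
```

Under these assumptions, `_waitForMetricsModel` could call something like `retryWithBackoff(() => this.dispatcher.getFromIpfs(model.cid), backoffDelays(100, 2000, 15000))`. That schedule makes at most 12 attempts within the same 15-second budget, rather than 150 attempts at a fixed 100 ms interval. The logging in the current loop would still need to be wired in.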
6 changes: 3 additions & 3 deletions packages/core/src/dispatcher.ts
@@ -359,7 +359,7 @@ export class Dispatcher {
*/
async retrieveCommit(cid: CID | string, streamId?: StreamID): Promise<any> {
try {
return await this._getFromIpfs(cid)
return await this.getFromIpfs(cid)
} catch (e) {
if (streamId) {
this._logger.err(
@@ -380,7 +380,7 @@ export class Dispatcher {
*/
async retrieveFromIPFS(cid: CID | string, path?: string): Promise<any> {
try {
return await this._getFromIpfs(cid, path)
return await this.getFromIpfs(cid, path)
} catch (e) {
this._logger.err(`Error while loading CID ${cid.toString()} from IPFS: ${e}`)
throw e
@@ -416,7 +416,7 @@ export class Dispatcher {
/**
* Helper function for loading a CID from IPFS
*/
private async _getFromIpfs(cid: CID | string, path?: string): Promise<any> {
async getFromIpfs(cid: CID | string, path?: string): Promise<any> {
const asCid = typeof cid === 'string' ? CID.parse(cid) : cid

// Lookup CID in cache before looking it up IPFS