Skip to content

Commit

Permalink
feat: wait for metrics model to be available locally
Browse files Browse the repository at this point in the history
  • Loading branch information
stbrody committed Jul 29, 2024
1 parent db314c2 commit 9677435
Show file tree
Hide file tree
Showing 2 changed files with 73 additions and 26 deletions.
93 changes: 70 additions & 23 deletions packages/core/src/ceramic.ts
Original file line number Diff line number Diff line change
Expand Up @@ -617,29 +617,37 @@ export class Ceramic implements StreamReaderWriter, StreamStateLoader {
if (this._metricsConfig?.metricsPublisherEnabled) {
// First, subscribe the node to the Model used for NodeMetrics
const metricsModel = NodeMetrics.getModel(this._networkOptions.name)
await this.repository.index.indexModels([{ streamID: metricsModel }])
await this.recon.registerInterest(metricsModel, this.did.id)

// Now start the NodeMetrics system.
const ipfsVersion = await this.ipfs.version()
const ipfsId = await this.ipfs.id()

NodeMetrics.start({
ceramic: this,
network: this._networkOptions.name,
ceramicVersion: this._versionInfo.cliPackageVersion,
ipfsVersion: ipfsVersion.version,
intervalMS: this._metricsConfig?.metricsPublishIntervalMS || DEFAULT_PUBLISH_INTERVAL_MS,
nodeId: ipfsId.publicKey, // what makes the best ID for the node?
nodeName: '', // daemon.hostname is not useful
nodeAuthDID: this.did.id,
nodeIPAddr: '', // daemon.hostname is not the external name
nodePeerId: ipfsId.publicKey,
logger: this._logger,
})
this._logger.imp(
`Publishing Node Metrics publicly to the Ceramic Network. To learn more, including how to disable publishing, please see the NODE_METRICS.md file for your branch, e.g. https://github.com/ceramicnetwork/js-ceramic/blob/develop/docs-dev/NODE_METRICS.md`
)
const modelLoaded = await this._waitForMetricsModel(metricsModel)
if (modelLoaded) {
await this.repository.index.indexModels([{ streamID: metricsModel }])
await this.recon.registerInterest(metricsModel, this.did.id)

// Now start the NodeMetrics system.
const ipfsVersion = await this.ipfs.version()
const ipfsId = await this.ipfs.id()

NodeMetrics.start({
ceramic: this,
network: this._networkOptions.name,
ceramicVersion: this._versionInfo.cliPackageVersion,
ipfsVersion: ipfsVersion.version,
intervalMS:
this._metricsConfig?.metricsPublishIntervalMS || DEFAULT_PUBLISH_INTERVAL_MS,
nodeId: ipfsId.publicKey, // what makes the best ID for the node?
nodeName: '', // daemon.hostname is not useful
nodeAuthDID: this.did.id,
nodeIPAddr: '', // daemon.hostname is not the external name
nodePeerId: ipfsId.publicKey,
logger: this._logger,
})
this._logger.imp(
`Publishing Node Metrics publicly to the Ceramic Network. To learn more, including how to disable publishing, please see the NODE_METRICS.md file for your branch, e.g. https://github.com/ceramicnetwork/js-ceramic/blob/develop/docs-dev/NODE_METRICS.md`
)
} else {
await this._logger.warn(
`Could not load Model ${metricsModel} used to publish Node Metrics, disabling metrics publishing`
)
}
}
} else {
// warn that the node does not have an authenticated did
Expand All @@ -655,6 +663,45 @@ export class Ceramic implements StreamReaderWriter, StreamStateLoader {
)
}

/**
* Waits for Model used to publish NodeMetrics to be available locally.
* Since we subscribe to the metamodel at startup, so long as some connected node on the network
* has the model, it should eventually be available locally.
* @param model
* @returns whether or not the model was loaded successfully
*/
async _waitForMetricsModel(model: StreamID): Promise<boolean> {
const maxWaitDuration = 1000 * 15 // 15 seconds
const retryInterval = 100
const maxRetries = maxWaitDuration / retryInterval
const delay = async function (ms) {
return new Promise<void>((resolve) => setTimeout(() => resolve(), ms))
}

for (let attemptNum = 0; attemptNum < maxRetries; attemptNum++) {
try {
await this.dispatcher.getFromIpfs(model.cid)
if (attemptNum > 0) {
this._logger.imp(`Model ${model} used to publish Node Metrics loaded successfully`)
}
return true
} catch (err) {
if (attemptNum == 0) {
this._logger.imp(
`Waiting for Model ${model} used to publish Node Metrics to be available locally`
)
} else if (attemptNum % 10 == 0) {
// log error once a second
this._logger.err(`Error loading Model ${model} used to publish Node Metrics: ${err}`)
}

await delay(retryInterval)
attemptNum++
}
}
return false
}

/**
* Runs some checks at node startup to ensure that the node is healthy and properly configured.
* Throws an Error if any issues are detected
Expand Down
6 changes: 3 additions & 3 deletions packages/core/src/dispatcher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -359,7 +359,7 @@ export class Dispatcher {
*/
async retrieveCommit(cid: CID | string, streamId?: StreamID): Promise<any> {
try {
return await this._getFromIpfs(cid)
return await this.getFromIpfs(cid)
} catch (e) {
if (streamId) {
this._logger.err(
Expand All @@ -380,7 +380,7 @@ export class Dispatcher {
*/
async retrieveFromIPFS(cid: CID | string, path?: string): Promise<any> {
try {
return await this._getFromIpfs(cid, path)
return await this.getFromIpfs(cid, path)
} catch (e) {
this._logger.err(`Error while loading CID ${cid.toString()} from IPFS: ${e}`)
throw e
Expand Down Expand Up @@ -416,7 +416,7 @@ export class Dispatcher {
/**
* Helper function for loading a CID from IPFS
*/
private async _getFromIpfs(cid: CID | string, path?: string): Promise<any> {
async getFromIpfs(cid: CID | string, path?: string): Promise<any> {
const asCid = typeof cid === 'string' ? CID.parse(cid) : cid

// Lookup CID in cache before looking it up IPFS
Expand Down

0 comments on commit 9677435

Please sign in to comment.