Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/yk/add-missing-licenses' into yk…
Browse files Browse the repository at this point in the history
…/add-missing-licenses
  • Loading branch information
yashkohli88 committed Dec 24, 2024
2 parents a335588 + b170cff commit 2d70d75
Show file tree
Hide file tree
Showing 21 changed files with 1,113 additions and 76 deletions.
11 changes: 10 additions & 1 deletion app.js
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,9 @@ function createApp(config) {
const harvestQueue = config.harvest.queue()
initializers.push(async () => harvestQueue.initialize())

const upgradeHandler = config.upgrade.service({ queue: config.upgrade.queue })
initializers.push(async () => upgradeHandler.initialize())

const definitionService = require('./business/definitionService')(
harvestStore,
harvestService,
Expand All @@ -67,7 +70,8 @@ function createApp(config) {
curationService,
definitionStore,
searchService,
cachingService
cachingService,
upgradeHandler
)
// Circular dependency. Reach in and set the curationService's definitionService. Sigh.
curationService.definitionService = definitionService
Expand Down Expand Up @@ -100,6 +104,10 @@ function createApp(config) {
crawlerSecret
)

// enable heap stats logging at an interval if configured
const trySetHeapLoggingAtInterval = require('./lib/heapLogger')
trySetHeapLoggingAtInterval(config, logger)

const app = express()
app.use(cors())
app.options('*', cors())
Expand Down Expand Up @@ -230,6 +238,7 @@ function createApp(config) {
// kick off the queue processors
require('./providers/curation/process')(curationQueue, curationService, logger)
require('./providers/harvest/process')(harvestQueue, definitionService, logger)
upgradeHandler.setupProcessing(definitionService, logger)

// Signal system is up and ok (no error)
callback()
Expand Down
10 changes: 9 additions & 1 deletion bin/config.js
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,10 @@ module.exports = {
definition: {
store: loadFactory(config.get('DEFINITION_STORE_PROVIDER') || 'file', 'definition')
},
upgrade: {
queue: loadFactory(config.get('DEFINITION_UPGRADE_QUEUE_PROVIDER') || 'memory', 'upgrade.queue'),
service: loadFactory(config.get('DEFINITION_UPGRADE_PROVIDER') || 'versionCheck', 'upgrade.service')
},
attachment: {
store: loadFactory(config.get('ATTACHMENT_STORE_PROVIDER') || 'file', 'attachment')
},
Expand Down Expand Up @@ -100,5 +104,9 @@ module.exports = {
crawlerKey: config.get('APPINSIGHTS_CRAWLER_APIKEY')
},
appVersion: config.get('APP_VERSION'),
buildsha: config.get('BUILD_SHA')
buildsha: config.get('BUILD_SHA'),
heapstats: {
logHeapstats: config.get('LOG_NODE_HEAPSTATS'),
logInverval: config.get('LOG_NODE_HEAPSTATS_INTERVAL_MS')
}
}
27 changes: 21 additions & 6 deletions business/definitionService.js
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ const currentSchema = '1.7.0'
const weights = { declared: 30, discovered: 25, consistency: 15, spdx: 15, texts: 15, date: 30, source: 70 }

class DefinitionService {
constructor(harvestStore, harvestService, summary, aggregator, curation, store, search, cache) {
constructor(harvestStore, harvestService, summary, aggregator, curation, store, search, cache, upgradeHandler) {
this.harvestStore = harvestStore
this.harvestService = harvestService
this.summaryService = summary
Expand All @@ -48,9 +48,15 @@ class DefinitionService {
this.definitionStore = store
this.search = search
this.cache = cache
this.upgradeHandler = upgradeHandler
if (this.upgradeHandler) this.upgradeHandler.currentSchema = currentSchema
this.logger = logger()
}

get currentSchema() {
return currentSchema
}

/**
* Get the final representation of the specified definition and optionally apply the indicated
* curation.
Expand All @@ -68,11 +74,10 @@ class DefinitionService {
return this.compute(coordinates, curation)
}
const existing = await this._cacheExistingAside(coordinates, force)
let result
if (get(existing, '_meta.schemaVersion') === currentSchema) {
let result = await this.upgradeHandler.validate(existing)
if (result) {
// Log line used for /status page insights
this.logger.info('computed definition available', { coordinates: coordinates.toString() })
result = existing
} else result = force ? await this.computeAndStore(coordinates) : await this.computeStoreAndCurate(coordinates)
return this._trimDefinition(this._cast(result), expand)
}
Expand Down Expand Up @@ -598,5 +603,15 @@ class DefinitionService {
}
}

module.exports = (harvestStore, harvestService, summary, aggregator, curation, store, search, cache) =>
new DefinitionService(harvestStore, harvestService, summary, aggregator, curation, store, search, cache)
module.exports = (harvestStore, harvestService, summary, aggregator, curation, store, search, cache, versionHandler) =>
new DefinitionService(
harvestStore,
harvestService,
summary,
aggregator,
curation,
store,
search,
cache,
versionHandler
)
6 changes: 5 additions & 1 deletion full.env.json
Original file line number Diff line number Diff line change
Expand Up @@ -47,5 +47,9 @@
"CRAWLER_QUEUE_PROVIDER": "memory",
"CRAWLER_SERVICE_PORT": "5000",
"CRAWLER_GITHUB_TOKEN": "< GitHub PAT here >",
"SCANCODE_HOME": "< ScanCode install location e.g., c:\\installs\\scancode-toolkit-2.2.1 >"
"SCANCODE_HOME": "< ScanCode install location e.g., c:\\installs\\scancode-toolkit-2.2.1 >",

"========== Heapstats Logging settings (OPTIONAL) ==========": "",
"LOG_NODE_HEAPSTATS": "<true|false>",
"LOG_NODE_HEAPSTATS_INTERVAL_MS": "<time_in_milliseconds (e.g. '30000')>"
}
86 changes: 86 additions & 0 deletions lib/heapLogger.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
// (c) Copyright 2024, Microsoft and ClearlyDefined contributors. Licensed under the MIT license.
// SPDX-License-Identifier: MIT

// ===================================================
// Log the heap statistics at regular intervals
// ===================================================
// NOTE: set 'LOG_NODE_HEAPSTATS' env var to 'true' to log heap stats
// NOTE: set 'LOG_NODE_HEAPSTATS_INTERVAL_MS' env var to '<time_in_milliseconds>' for logging interval
// NOTE: To better understand heap stats being logged, check:
// - https://nodejs.org/docs/v22.12.0/api/v8.html#v8getheapspacestatistics
// - https://nodejs.org/docs/v22.12.0/api/v8.html#v8getheapstatistics
function trySetHeapLoggingAtInterval(config, logger) {
logger.debug('heapLogger.js :: Entered "trySetHeapLoggingAtInterval"...')

const shouldLogHeapstats = config.heapstats.logHeapstats
? config.heapstats.logHeapstats.toLowerCase() === 'true'
: false

logger.debug(`heapLogger.js :: "shouldLogHeapstats" set to "${shouldLogHeapstats}"`)

if (shouldLogHeapstats) {
const v8 = require('v8')

const addCommas = num => Number(num).toLocaleString()
const isNumeric = num => !isNaN(Number(num))

// Set the heapstats logging interval
const maybeInterval = config.heapstats.logInverval
const heapStatsInverval = maybeInterval && isNumeric(maybeInterval) ? maybeInterval : 30000

logger.debug(`heapLogger.js :: heap stats logging interval will be "${heapStatsInverval}" ms`)

// Function to log the heap space statistics
const logHeapSpaceStats = () => {
// Get the current timestamp
const currentTimestamp = new Date().toISOString()

// Get the heap space statistics
const heapSpaceStats = v8.getHeapSpaceStatistics()

heapSpaceStats.forEach(space => {
const heapStatsMessage =
`[${currentTimestamp}] Heap Space Statistics: ` +
`Space Name: '${space.space_name}', ` +
`Space Size: '${addCommas(space.space_size)}' bytes, ` +
`Space Used Size: '${addCommas(space.space_used_size)}' bytes, ` +
`Space Available Size: '${addCommas(space.space_available_size)}' bytes, ` +
`Physical Space Size: '${addCommas(space.physical_space_size)}' bytes` +
'\n--------------------------'

logger.info(heapStatsMessage)
})

// Get the heap statistics
const heapStats = v8.getHeapStatistics()

const heapStatsMessage =
`[${currentTimestamp}] Heap Statistics: ` +
`Total Heap Size: '${addCommas(heapStats.total_heap_size)}' bytes, ` +
`Total Heap Size Executable: '${addCommas(heapStats.total_heap_size_executable)}' bytes, ` +
`Total Physical Size: '${addCommas(heapStats.total_physical_size)}' bytes, ` +
`Total Available Size: '${addCommas(heapStats.total_available_size)}' bytes, ` +
`Used Heap Size: '${addCommas(heapStats.used_heap_size)}' bytes, ` +
`Heap Size Limit: '${addCommas(heapStats.heap_size_limit)}' bytes` +
'\n--------------------------'

logger.info(heapStatsMessage)
}

// Only run if not in a test environment
if (process.argv.every(arg => !arg.includes('mocha'))) {
logger.debug(`heapLogger.js :: setting heap stats logging at "${heapStatsInverval}" ms interval...`)

// Set the interval to log the heap space statistics
setInterval(logHeapSpaceStats, heapStatsInverval)

logger.debug(`heapLogger.js :: set heap stats logging at "${heapStatsInverval}" ms interval.`)
}
} else {
logger.debug('heapLogger.js :: heap stats logging not enabled.')
}

logger.debug('heapLogger.js :: Exiting "trySetHeapLoggingAtInterval".')
}

module.exports = trySetHeapLoggingAtInterval
6 changes: 5 additions & 1 deletion minimal.env.json
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,9 @@

"========== Service Curation settings ==========": "",
"CURATION_GITHUB_REPO": "sample-curated-data",
"CURATION_GITHUB_TOKEN": "<GitHub token here>"
"CURATION_GITHUB_TOKEN": "<GitHub token here>",

"========== Heapstats Logging settings (OPTIONAL) ==========": "",
"LOG_NODE_HEAPSTATS": "<true|false>",
"LOG_NODE_HEAPSTATS_INTERVAL_MS": "<time_in_milliseconds (e.g. '30000')>"
}
4 changes: 2 additions & 2 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "service",
"version": "2.1.0",
"version": "2.2.0",
"description": "Service side of clearlydefined.io.",
"scripts": {
"test": "npm run mocha && npm run lint",
Expand All @@ -12,7 +12,7 @@
"prettier:check": "prettier . --check",
"prettier:write": "prettier . --write",
"dev": "nodemon ./bin/www",
"start": "node ./bin/www",
"start": "node --max-old-space-size=8192 ./bin/www",
"postinstall": "patch-package"
},
"license": "MIT",
Expand Down
10 changes: 10 additions & 0 deletions providers/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,16 @@ module.exports = {
auth: {
github: require('../middleware/githubConfig')
},
upgrade: {
queue: {
azure: require('../providers/upgrade/azureQueueConfig'),
memory: require('../providers/upgrade/memoryQueueConfig')
},
service: {
versionCheck: require('../providers/upgrade/defVersionCheck').factory,
upgradeQueue: require('../providers/upgrade/defUpgradeQueueConfig')
}
},
curation: {
queue: {
azure: require('../providers/curation/azureQueueConfig'),
Expand Down
20 changes: 17 additions & 3 deletions providers/queueing/memoryQueue.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ class MemoryQueue {
this.logger = logger()
this.data = []
this.messageId = 0
this.decoder = options.decoder
}

async initialize() {}
Expand All @@ -33,14 +34,19 @@ class MemoryQueue {
const message = this.data[0]
if (!message) return null
this.data[0].dequeueCount++
if (message.dequeueCount <= 5) return Promise.resolve({ original: message, data: JSON.parse(message.messageText) })
if (message.dequeueCount <= 5) return Promise.resolve({ original: message, data: this._parseData(message) })
await this.delete({ original: message })
return this.dequeue()
}

_parseData({ messageText }) {
return JSON.parse(this.decoder(messageText))
}

/** Similar to dequeue() but returns an array instead. See AzureStorageQueue.dequeueMultiple() */
async dequeueMultiple() {
return [await this.dequeue()]
const message = await this.dequeue()
return message ? [message] : []
}

/**
Expand All @@ -58,4 +64,12 @@ class MemoryQueue {
}
}

module.exports = () => new MemoryQueue()
const factory = (opts = {}) => {
const defaultOpts = {
decoder: text => text
}
const mergedOpts = { ...defaultOpts, ...opts }
return new MemoryQueue(mergedOpts)
}

module.exports = factory
22 changes: 22 additions & 0 deletions providers/upgrade/azureQueueConfig.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
// (c) Copyright 2024, SAP SE and ClearlyDefined contributors. Licensed under the MIT license.
// SPDX-License-Identifier: MIT

const config = require('painless-config')
const AzureStorageQueue = require('../queueing/azureStorageQueue')

const defaultOptions = {
connectionString:
config.get('DEFINITION_UPGRADE_QUEUE_CONNECTION_STRING') || config.get('HARVEST_AZBLOB_CONNECTION_STRING'),
queueName: config.get('DEFINITION_UPGRADE_QUEUE_NAME') || 'definitions-upgrade',
dequeueOptions: {
numOfMessages: config.get('DEFINITION_UPGRADE_DEQUEUE_BATCH_SIZE') || 16,
visibilityTimeout: 10 * 60 // 10 min. The default value is 30 seconds.
}
}

function azure(options) {
const realOptions = options || defaultOptions
return new AzureStorageQueue(realOptions)
}

module.exports = azure
50 changes: 50 additions & 0 deletions providers/upgrade/defUpgradeQueue.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
// (c) Copyright 2024, SAP SE and ClearlyDefined contributors. Licensed under the MIT license.
// SPDX-License-Identifier: MIT

const { DefinitionVersionChecker } = require('./defVersionCheck')
const { setup } = require('./process')

class DefinitionQueueUpgrader extends DefinitionVersionChecker {
async validate(definition) {
if (!definition) return
const result = await super.validate(definition)
if (result) return result

await this._queueUpgrade(definition)
return definition
}

async _queueUpgrade(definition) {
if (!this._upgrade) throw new Error('Upgrade queue is not set')
try {
const message = this._constructMessage(definition)
await this._upgrade.queue(message)
this.logger.info('Queued for definition upgrade ', {
coordinates: DefinitionVersionChecker.getCoordinates(definition)
})
} catch (error) {
//continue if queueing fails and requeue at the next request.
this.logger.error(`Error queueing for definition upgrade ${error.message}`, {
error,
coordinates: DefinitionVersionChecker.getCoordinates(definition)
})
}
}

_constructMessage(definition) {
const { coordinates, _meta } = definition
const content = { coordinates, _meta }
return Buffer.from(JSON.stringify(content)).toString('base64')
}

async initialize() {
this._upgrade = this.options.queue()
return this._upgrade.initialize()
}

setupProcessing(definitionService, logger, once) {
return setup(this._upgrade, definitionService, logger, once)
}
}

module.exports = DefinitionQueueUpgrader
Loading

0 comments on commit 2d70d75

Please sign in to comment.