diff --git a/app.js b/app.js
index ce566c9cf..c0dfa06f3 100644
--- a/app.js
+++ b/app.js
@@ -59,6 +59,9 @@ function createApp(config) {
   const harvestQueue = config.harvest.queue()
   initializers.push(async () => harvestQueue.initialize())
 
+  const upgradeHandler = config.upgrade.service({ queue: config.upgrade.queue })
+  initializers.push(async () => upgradeHandler.initialize())
+
   const definitionService = require('./business/definitionService')(
     harvestStore,
     harvestService,
@@ -67,7 +70,8 @@ function createApp(config) {
     curationService,
     definitionStore,
     searchService,
-    cachingService
+    cachingService,
+    upgradeHandler
   )
   // Circular dependency. Reach in and set the curationService's definitionService. Sigh.
   curationService.definitionService = definitionService
@@ -100,6 +104,10 @@ function createApp(config) {
     crawlerSecret
   )
 
+  // enable heap stats logging at an interval if configured
+  const trySetHeapLoggingAtInterval = require('./lib/heapLogger')
+  trySetHeapLoggingAtInterval(config, logger)
+
   const app = express()
   app.use(cors())
   app.options('*', cors())
@@ -230,6 +238,7 @@ function createApp(config) {
   // kick off the queue processors
   require('./providers/curation/process')(curationQueue, curationService, logger)
   require('./providers/harvest/process')(harvestQueue, definitionService, logger)
+  upgradeHandler.setupProcessing(definitionService, logger)
 
   // Signal system is up and ok (no error)
   callback()
diff --git a/bin/config.js b/bin/config.js
index eea97035a..3ec0993e4 100644
--- a/bin/config.js
+++ b/bin/config.js
@@ -57,6 +57,10 @@ module.exports = {
   definition: {
     store: loadFactory(config.get('DEFINITION_STORE_PROVIDER') || 'file', 'definition')
   },
+  upgrade: {
+    queue: loadFactory(config.get('DEFINITION_UPGRADE_QUEUE_PROVIDER') || 'memory', 'upgrade.queue'),
+    service: loadFactory(config.get('DEFINITION_UPGRADE_PROVIDER') || 'versionCheck', 'upgrade.service')
+  },
   attachment: {
     store: loadFactory(config.get('ATTACHMENT_STORE_PROVIDER') || 'file', 'attachment')
   },
@@ -100,5 +104,9 @@ module.exports = {
     crawlerKey: config.get('APPINSIGHTS_CRAWLER_APIKEY')
   },
   appVersion: config.get('APP_VERSION'),
-  buildsha: config.get('BUILD_SHA')
+  buildsha: config.get('BUILD_SHA'),
+  heapstats: {
+    logHeapstats: config.get('LOG_NODE_HEAPSTATS'),
+    logInverval: config.get('LOG_NODE_HEAPSTATS_INTERVAL_MS')
+  }
 }
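Note: the new `upgrade` entry in bin/config.js selects both the upgrade service and its backing queue from environment variables, defaulting to the in-process version check and the in-memory queue. A minimal sketch of an environment that switches to the queue-backed handler on Azure storage follows; the variable names come from this diff (bin/config.js, providers/index.js and providers/upgrade/azureQueueConfig.js), the values are placeholders.

// Illustrative environment only - not part of the patch.
process.env.DEFINITION_UPGRADE_PROVIDER = 'upgradeQueue' // default: 'versionCheck'
process.env.DEFINITION_UPGRADE_QUEUE_PROVIDER = 'azure' // default: 'memory'
process.env.DEFINITION_UPGRADE_QUEUE_CONNECTION_STRING = '<azure storage connection string>'
process.env.DEFINITION_UPGRADE_QUEUE_NAME = 'definitions-upgrade' // the default queue name
// Optional heap statistics logging, wired up in app.js via lib/heapLogger.js
process.env.LOG_NODE_HEAPSTATS = 'true'
process.env.LOG_NODE_HEAPSTATS_INTERVAL_MS = '30000'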
diff --git a/business/definitionService.js b/business/definitionService.js
index 5a80def55..e18313dbb 100644
--- a/business/definitionService.js
+++ b/business/definitionService.js
@@ -39,7 +39,7 @@ const currentSchema = '1.7.0'
 const weights = { declared: 30, discovered: 25, consistency: 15, spdx: 15, texts: 15, date: 30, source: 70 }
 
 class DefinitionService {
-  constructor(harvestStore, harvestService, summary, aggregator, curation, store, search, cache) {
+  constructor(harvestStore, harvestService, summary, aggregator, curation, store, search, cache, upgradeHandler) {
     this.harvestStore = harvestStore
     this.harvestService = harvestService
     this.summaryService = summary
@@ -48,9 +48,15 @@ class DefinitionService {
     this.definitionStore = store
     this.search = search
     this.cache = cache
+    this.upgradeHandler = upgradeHandler
+    if (this.upgradeHandler) this.upgradeHandler.currentSchema = currentSchema
     this.logger = logger()
   }
 
+  get currentSchema() {
+    return currentSchema
+  }
+
   /**
    * Get the final representation of the specified definition and optionally apply the indicated
   * curation.
@@ -68,11 +74,10 @@ class DefinitionService {
       return this.compute(coordinates, curation)
     }
     const existing = await this._cacheExistingAside(coordinates, force)
-    let result
-    if (get(existing, '_meta.schemaVersion') === currentSchema) {
+    let result = await this.upgradeHandler.validate(existing)
+    if (result) {
       // Log line used for /status page insights
       this.logger.info('computed definition available', { coordinates: coordinates.toString() })
-      result = existing
     } else result = force ? await this.computeAndStore(coordinates) : await this.computeStoreAndCurate(coordinates)
     return this._trimDefinition(this._cast(result), expand)
   }
@@ -598,5 +603,15 @@ class DefinitionService {
   }
 }
 
-module.exports = (harvestStore, harvestService, summary, aggregator, curation, store, search, cache) =>
-  new DefinitionService(harvestStore, harvestService, summary, aggregator, curation, store, search, cache)
+module.exports = (harvestStore, harvestService, summary, aggregator, curation, store, search, cache, versionHandler) =>
+  new DefinitionService(
+    harvestStore,
+    harvestService,
+    summary,
+    aggregator,
+    curation,
+    store,
+    search,
+    cache,
+    versionHandler
+  )
diff --git a/full.env.json b/full.env.json
index 8deb15c99..3c668a218 100644
--- a/full.env.json
+++ b/full.env.json
@@ -47,5 +47,9 @@
   "CRAWLER_QUEUE_PROVIDER": "memory",
   "CRAWLER_SERVICE_PORT": "5000",
   "CRAWLER_GITHUB_TOKEN": "< GitHub PAT here >",
-  "SCANCODE_HOME": "< ScanCode install location e.g., c:\\installs\\scancode-toolkit-2.2.1 >"
+  "SCANCODE_HOME": "< ScanCode install location e.g., c:\\installs\\scancode-toolkit-2.2.1 >",
+
+  "========== Heapstats Logging settings (OPTIONAL) ==========": "",
+  "LOG_NODE_HEAPSTATS": "",
+  "LOG_NODE_HEAPSTATS_INTERVAL_MS": ""
 }
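Note: the behavior DefinitionService.get() now relies on is the validate() contract of the upgrade handler: it resolves to the definition when its schema version is current (or when a stale definition has merely been queued for background upgrade), and to undefined when the definition must be recomputed right away. A small sketch of that branch, condensed from the get() hunk above rather than added by the patch:

// Sketch only.
const existing = await this._cacheExistingAside(coordinates, force)
let result = await this.upgradeHandler.validate(existing)
if (!result) {
  // stale or missing definition: recompute now
  result = force ? await this.computeAndStore(coordinates) : await this.computeStoreAndCurate(coordinates)
}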
diff --git a/lib/heapLogger.js b/lib/heapLogger.js
new file mode 100644
index 000000000..345221100
--- /dev/null
+++ b/lib/heapLogger.js
@@ -0,0 +1,86 @@
+// (c) Copyright 2024, Microsoft and ClearlyDefined contributors. Licensed under the MIT license.
+// SPDX-License-Identifier: MIT
+
+// ===================================================
+// Log the heap statistics at regular intervals
+// ===================================================
+// NOTE: set 'LOG_NODE_HEAPSTATS' env var to 'true' to log heap stats
+// NOTE: set 'LOG_NODE_HEAPSTATS_INTERVAL_MS' env var to the logging interval in milliseconds (defaults to 30000)
+// NOTE: To better understand heap stats being logged, check:
+// - https://nodejs.org/docs/v22.12.0/api/v8.html#v8getheapspacestatistics
+// - https://nodejs.org/docs/v22.12.0/api/v8.html#v8getheapstatistics
+function trySetHeapLoggingAtInterval(config, logger) {
+  logger.debug('heapLogger.js :: Entered "trySetHeapLoggingAtInterval"...')
+
+  const shouldLogHeapstats = config.heapstats.logHeapstats
+    ? config.heapstats.logHeapstats.toLowerCase() === 'true'
+    : false
+
+  logger.debug(`heapLogger.js :: "shouldLogHeapstats" set to "${shouldLogHeapstats}"`)
+
+  if (shouldLogHeapstats) {
+    const v8 = require('v8')
+
+    const addCommas = num => Number(num).toLocaleString()
+    const isNumeric = num => !isNaN(Number(num))
+
+    // Set the heapstats logging interval
+    const maybeInterval = config.heapstats.logInverval
+    const heapStatsInverval = maybeInterval && isNumeric(maybeInterval) ? maybeInterval : 30000
+
+    logger.debug(`heapLogger.js :: heap stats logging interval will be "${heapStatsInverval}" ms`)
+
+    // Function to log the heap space statistics
+    const logHeapSpaceStats = () => {
+      // Get the current timestamp
+      const currentTimestamp = new Date().toISOString()
+
+      // Get the heap space statistics
+      const heapSpaceStats = v8.getHeapSpaceStatistics()
+
+      heapSpaceStats.forEach(space => {
+        const heapStatsMessage =
+          `[${currentTimestamp}] Heap Space Statistics: ` +
+          `Space Name: '${space.space_name}', ` +
+          `Space Size: '${addCommas(space.space_size)}' bytes, ` +
+          `Space Used Size: '${addCommas(space.space_used_size)}' bytes, ` +
+          `Space Available Size: '${addCommas(space.space_available_size)}' bytes, ` +
+          `Physical Space Size: '${addCommas(space.physical_space_size)}' bytes` +
+          '\n--------------------------'
+
+        logger.info(heapStatsMessage)
+      })
+
+      // Get the heap statistics
+      const heapStats = v8.getHeapStatistics()
+
+      const heapStatsMessage =
+        `[${currentTimestamp}] Heap Statistics: ` +
+        `Total Heap Size: '${addCommas(heapStats.total_heap_size)}' bytes, ` +
+        `Total Heap Size Executable: '${addCommas(heapStats.total_heap_size_executable)}' bytes, ` +
+        `Total Physical Size: '${addCommas(heapStats.total_physical_size)}' bytes, ` +
+        `Total Available Size: '${addCommas(heapStats.total_available_size)}' bytes, ` +
+        `Used Heap Size: '${addCommas(heapStats.used_heap_size)}' bytes, ` +
+        `Heap Size Limit: '${addCommas(heapStats.heap_size_limit)}' bytes` +
+        '\n--------------------------'
+
+      logger.info(heapStatsMessage)
+    }
+
+    // Only run if not in a test environment
+    if (process.argv.every(arg => !arg.includes('mocha'))) {
+      logger.debug(`heapLogger.js :: setting heap stats logging at "${heapStatsInverval}" ms interval...`)
+
+      // Set the interval to log the heap space statistics
+      setInterval(logHeapSpaceStats, heapStatsInverval)
+
+      logger.debug(`heapLogger.js :: set heap stats logging at "${heapStatsInverval}" ms interval.`)
+    }
+  } else {
+    logger.debug('heapLogger.js :: heap stats logging not enabled.')
+  }
+
+  logger.debug('heapLogger.js :: Exiting "trySetHeapLoggingAtInterval".')
+}
+
+module.exports = trySetHeapLoggingAtInterval
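Note: a minimal usage sketch for lib/heapLogger.js (app.js above does the equivalent at startup); the config and logger shapes here are assumptions modeled on bin/config.js and the console, not objects defined by this patch.

// Sketch only; property names follow bin/config.js.
const trySetHeapLoggingAtInterval = require('./lib/heapLogger')
const config = { heapstats: { logHeapstats: 'true', logInverval: '60000' } } // missing/non-numeric interval falls back to 30000 ms
const logger = { debug: console.debug, info: console.info }
trySetHeapLoggingAtInterval(config, logger) // starts a setInterval; skipped when running under mocha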
--write", "dev": "nodemon ./bin/www", - "start": "node ./bin/www", + "start": "node --max-old-space-size=8192 ./bin/www", "postinstall": "patch-package" }, "license": "MIT", diff --git a/providers/index.js b/providers/index.js index 4471581fd..2d94b59e8 100644 --- a/providers/index.js +++ b/providers/index.js @@ -27,6 +27,16 @@ module.exports = { auth: { github: require('../middleware/githubConfig') }, + upgrade: { + queue: { + azure: require('../providers/upgrade/azureQueueConfig'), + memory: require('../providers/upgrade/memoryQueueConfig') + }, + service: { + versionCheck: require('../providers/upgrade/defVersionCheck').factory, + upgradeQueue: require('../providers/upgrade/defUpgradeQueueConfig') + } + }, curation: { queue: { azure: require('../providers/curation/azureQueueConfig'), diff --git a/providers/queueing/memoryQueue.js b/providers/queueing/memoryQueue.js index e90ae2c6c..5228bd669 100644 --- a/providers/queueing/memoryQueue.js +++ b/providers/queueing/memoryQueue.js @@ -9,6 +9,7 @@ class MemoryQueue { this.logger = logger() this.data = [] this.messageId = 0 + this.decoder = options.decoder } async initialize() {} @@ -33,14 +34,19 @@ class MemoryQueue { const message = this.data[0] if (!message) return null this.data[0].dequeueCount++ - if (message.dequeueCount <= 5) return Promise.resolve({ original: message, data: JSON.parse(message.messageText) }) + if (message.dequeueCount <= 5) return Promise.resolve({ original: message, data: this._parseData(message) }) await this.delete({ original: message }) return this.dequeue() } + _parseData({ messageText }) { + return JSON.parse(this.decoder(messageText)) + } + /** Similar to dequeue() but returns an array instead. See AzureStorageQueue.dequeueMultiple() */ async dequeueMultiple() { - return [await this.dequeue()] + const message = await this.dequeue() + return message ? [message] : [] } /** @@ -58,4 +64,12 @@ class MemoryQueue { } } -module.exports = () => new MemoryQueue() +const factory = (opts = {}) => { + const defaultOpts = { + decoder: text => text + } + const mergedOpts = { ...defaultOpts, ...opts } + return new MemoryQueue(mergedOpts) +} + +module.exports = factory diff --git a/providers/upgrade/azureQueueConfig.js b/providers/upgrade/azureQueueConfig.js new file mode 100644 index 000000000..219ac3f37 --- /dev/null +++ b/providers/upgrade/azureQueueConfig.js @@ -0,0 +1,22 @@ +// (c) Copyright 2024, SAP SE and ClearlyDefined contributors. Licensed under the MIT license. +// SPDX-License-Identifier: MIT + +const config = require('painless-config') +const AzureStorageQueue = require('../queueing/azureStorageQueue') + +const defaultOptions = { + connectionString: + config.get('DEFINITION_UPGRADE_QUEUE_CONNECTION_STRING') || config.get('HARVEST_AZBLOB_CONNECTION_STRING'), + queueName: config.get('DEFINITION_UPGRADE_QUEUE_NAME') || 'definitions-upgrade', + dequeueOptions: { + numOfMessages: config.get('DEFINITION_UPGRADE_DEQUEUE_BATCH_SIZE') || 16, + visibilityTimeout: 10 * 60 // 10 min. The default value is 30 seconds. + } +} + +function azure(options) { + const realOptions = options || defaultOptions + return new AzureStorageQueue(realOptions) +} + +module.exports = azure diff --git a/providers/upgrade/defUpgradeQueue.js b/providers/upgrade/defUpgradeQueue.js new file mode 100644 index 000000000..e8d103a40 --- /dev/null +++ b/providers/upgrade/defUpgradeQueue.js @@ -0,0 +1,50 @@ +// (c) Copyright 2024, SAP SE and ClearlyDefined contributors. Licensed under the MIT license. 
diff --git a/providers/queueing/memoryQueue.js b/providers/queueing/memoryQueue.js
index e90ae2c6c..5228bd669 100644
--- a/providers/queueing/memoryQueue.js
+++ b/providers/queueing/memoryQueue.js
@@ -9,6 +9,7 @@ class MemoryQueue {
     this.logger = logger()
     this.data = []
     this.messageId = 0
+    this.decoder = options.decoder
   }
 
   async initialize() {}
@@ -33,14 +34,19 @@ class MemoryQueue {
     const message = this.data[0]
     if (!message) return null
     this.data[0].dequeueCount++
-    if (message.dequeueCount <= 5) return Promise.resolve({ original: message, data: JSON.parse(message.messageText) })
+    if (message.dequeueCount <= 5) return Promise.resolve({ original: message, data: this._parseData(message) })
     await this.delete({ original: message })
     return this.dequeue()
   }
 
+  _parseData({ messageText }) {
+    return JSON.parse(this.decoder(messageText))
+  }
+
   /** Similar to dequeue() but returns an array instead. See AzureStorageQueue.dequeueMultiple() */
   async dequeueMultiple() {
-    return [await this.dequeue()]
+    const message = await this.dequeue()
+    return message ? [message] : []
   }
 
   /**
@@ -58,4 +64,12 @@ class MemoryQueue {
   }
 }
 
-module.exports = () => new MemoryQueue()
+const factory = (opts = {}) => {
+  const defaultOpts = {
+    decoder: text => text
+  }
+  const mergedOpts = { ...defaultOpts, ...opts }
+  return new MemoryQueue(mergedOpts)
+}
+
+module.exports = factory
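Note: the new decoder option defaults to the identity function, so existing consumers of the memory queue are unaffected; providers/upgrade/memoryQueueConfig.js later in this patch supplies a base64 decoder. A short sketch of the round trip, to be run inside an async function (illustrative, not part of the patch):

// Sketch only - mirrors memoryQueueConfig.js and the memoryQueueTest cases below.
const MemoryQueue = require('./providers/queueing/memoryQueue')
const queue = MemoryQueue({ decoder: text => Buffer.from(text, 'base64').toString('utf8') })
await queue.queue(Buffer.from(JSON.stringify({ somekey: 1 })).toString('base64'))
const [message] = await queue.dequeueMultiple() // messageText is decoded, then JSON.parse'd
// message.data.somekey === 1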
diff --git a/providers/upgrade/azureQueueConfig.js b/providers/upgrade/azureQueueConfig.js
new file mode 100644
index 000000000..219ac3f37
--- /dev/null
+++ b/providers/upgrade/azureQueueConfig.js
@@ -0,0 +1,22 @@
+// (c) Copyright 2024, SAP SE and ClearlyDefined contributors. Licensed under the MIT license.
+// SPDX-License-Identifier: MIT
+
+const config = require('painless-config')
+const AzureStorageQueue = require('../queueing/azureStorageQueue')
+
+const defaultOptions = {
+  connectionString:
+    config.get('DEFINITION_UPGRADE_QUEUE_CONNECTION_STRING') || config.get('HARVEST_AZBLOB_CONNECTION_STRING'),
+  queueName: config.get('DEFINITION_UPGRADE_QUEUE_NAME') || 'definitions-upgrade',
+  dequeueOptions: {
+    numOfMessages: config.get('DEFINITION_UPGRADE_DEQUEUE_BATCH_SIZE') || 16,
+    visibilityTimeout: 10 * 60 // 10 min. The default value is 30 seconds.
+  }
+}
+
+function azure(options) {
+  const realOptions = options || defaultOptions
+  return new AzureStorageQueue(realOptions)
+}
+
+module.exports = azure
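Note: the dequeue options above bound how the upgrade queue is drained: DEFINITION_UPGRADE_DEQUEUE_BATCH_SIZE caps the messages pulled per poll (default 16), and the 10-minute visibilityTimeout keeps a message hidden while its definition is recomputed. The factory also accepts an options object directly; a sketch with illustrative values (option names follow this file):

// Sketch only.
const azure = require('./providers/upgrade/azureQueueConfig')
const queue = azure({
  connectionString: process.env.DEFINITION_UPGRADE_QUEUE_CONNECTION_STRING,
  queueName: 'definitions-upgrade',
  dequeueOptions: { numOfMessages: 16, visibilityTimeout: 10 * 60 }
})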
diff --git a/providers/upgrade/defUpgradeQueue.js b/providers/upgrade/defUpgradeQueue.js
new file mode 100644
index 000000000..e8d103a40
--- /dev/null
+++ b/providers/upgrade/defUpgradeQueue.js
@@ -0,0 +1,50 @@
+// (c) Copyright 2024, SAP SE and ClearlyDefined contributors. Licensed under the MIT license.
+// SPDX-License-Identifier: MIT
+
+const { DefinitionVersionChecker } = require('./defVersionCheck')
+const { setup } = require('./process')
+
+class DefinitionQueueUpgrader extends DefinitionVersionChecker {
+  async validate(definition) {
+    if (!definition) return
+    const result = await super.validate(definition)
+    if (result) return result
+
+    await this._queueUpgrade(definition)
+    return definition
+  }
+
+  async _queueUpgrade(definition) {
+    if (!this._upgrade) throw new Error('Upgrade queue is not set')
+    try {
+      const message = this._constructMessage(definition)
+      await this._upgrade.queue(message)
+      this.logger.info('Queued for definition upgrade ', {
+        coordinates: DefinitionVersionChecker.getCoordinates(definition)
+      })
+    } catch (error) {
+      // continue if queueing fails and requeue at the next request.
+      this.logger.error(`Error queueing for definition upgrade ${error.message}`, {
+        error,
+        coordinates: DefinitionVersionChecker.getCoordinates(definition)
+      })
+    }
+  }
+
+  _constructMessage(definition) {
+    const { coordinates, _meta } = definition
+    const content = { coordinates, _meta }
+    return Buffer.from(JSON.stringify(content)).toString('base64')
+  }
+
+  async initialize() {
+    this._upgrade = this.options.queue()
+    return this._upgrade.initialize()
+  }
+
+  setupProcessing(definitionService, logger, once) {
+    return setup(this._upgrade, definitionService, logger, once)
+  }
+}
+
+module.exports = DefinitionQueueUpgrader
diff --git a/providers/upgrade/defUpgradeQueueConfig.js b/providers/upgrade/defUpgradeQueueConfig.js
new file mode 100644
index 000000000..f95588c13
--- /dev/null
+++ b/providers/upgrade/defUpgradeQueueConfig.js
@@ -0,0 +1,12 @@
+// (c) Copyright 2024, SAP SE and ClearlyDefined contributors. Licensed under the MIT license.
+// SPDX-License-Identifier: MIT
+
+const DefinitionQueueUpgrader = require('./defUpgradeQueue')
+const memory = require('../queueing/memoryQueue')
+
+function serviceFactory(options = {}) {
+  const mergedOptions = { queue: memory, ...options }
+  return new DefinitionQueueUpgrader(mergedOptions)
+}
+
+module.exports = serviceFactory
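Note: a sketch of the message format produced by _constructMessage() above and of how the upgrade queue's base64 decoder reverses it. Only coordinates and _meta are queued; the definition values shown here are illustrative, not taken from the patch.

// Sketch only - both ends of this round trip appear in this patch.
const definition = {
  coordinates: { type: 'npm', provider: 'npmjs', name: 'lodash', revision: '4.17.11' },
  _meta: { schemaVersion: '1.6.1' }
}
const encoded = Buffer.from(JSON.stringify({ coordinates: definition.coordinates, _meta: definition._meta })).toString('base64')
const decoded = JSON.parse(Buffer.from(encoded, 'base64').toString('utf8'))
// decoded.coordinates and decoded._meta match the original; nothing else is sent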
diff --git a/providers/upgrade/defVersionCheck.js b/providers/upgrade/defVersionCheck.js
new file mode 100644
index 000000000..8b3801299
--- /dev/null
+++ b/providers/upgrade/defVersionCheck.js
@@ -0,0 +1,47 @@
+// (c) Copyright 2024, SAP SE and ClearlyDefined contributors. Licensed under the MIT license.
+// SPDX-License-Identifier: MIT
+
+const logger = require('../logging/logger')
+const { gte } = require('semver')
+const { get } = require('lodash')
+const EntityCoordinates = require('../../lib/entityCoordinates')
+
+class DefinitionVersionChecker {
+  constructor(options = {}) {
+    this.options = options
+    this.logger = this.options.logger || logger()
+  }
+
+  set currentSchema(schemaVersion) {
+    this._currentSchema = schemaVersion
+  }
+
+  get currentSchema() {
+    return this._currentSchema
+  }
+
+  async validate(definition) {
+    if (!this._currentSchema) throw new Error('Current schema version is not set')
+    const defSchemaVersion = get(definition, '_meta.schemaVersion')
+    this.logger.debug(`Definition version: %s, Current schema version: %s `, defSchemaVersion, this._currentSchema, {
+      coordinates: DefinitionVersionChecker.getCoordinates(definition)
+    })
+    if (defSchemaVersion && gte(defSchemaVersion, this._currentSchema)) return definition
+  }
+
+  async initialize() {
+    // do nothing for initialization
+  }
+
+  setupProcessing() {
+    // do nothing for set up processing
+  }
+
+  static getCoordinates(definition) {
+    return definition?.coordinates && EntityCoordinates.fromObject(definition.coordinates).toString()
+  }
+}
+
+const factory = options => new DefinitionVersionChecker(options)
+
+module.exports = { DefinitionVersionChecker, factory }
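Note: validate() accepts any definition whose _meta.schemaVersion is at or above the current schema, via semver's gte(); anything else resolves to undefined so the caller recomputes. For example (values illustrative):

const { gte } = require('semver')
gte('1.7.0', '1.7.0') // true  -> definition returned unchanged
gte('1.8.0', '1.7.0') // true  -> newer-than-current definitions also pass
gte('1.6.1', '1.7.0') // false -> validate() resolves to undefined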
diff --git a/providers/upgrade/memoryQueueConfig.js b/providers/upgrade/memoryQueueConfig.js
new file mode 100644
index 000000000..aed034223
--- /dev/null
+++ b/providers/upgrade/memoryQueueConfig.js
@@ -0,0 +1,14 @@
+// (c) Copyright 2024, SAP SE and ClearlyDefined contributors. Licensed under the MIT license.
+// SPDX-License-Identifier: MIT
+
+const MemoryQueue = require('../queueing/memoryQueue')
+
+const encodedMessageQueueFactory = opts => {
+  const defaultOpts = {
+    decoder: text => Buffer.from(text, 'base64').toString('utf8')
+  }
+  const mergedOpts = { ...defaultOpts, ...opts }
+  return MemoryQueue(mergedOpts)
+}
+
+module.exports = encodedMessageQueueFactory
diff --git a/providers/upgrade/process.js b/providers/upgrade/process.js
new file mode 100644
index 000000000..7aa82f964
--- /dev/null
+++ b/providers/upgrade/process.js
@@ -0,0 +1,94 @@
+// (c) Copyright 2024, SAP SE and ClearlyDefined contributors. Licensed under the MIT license.
+// SPDX-License-Identifier: MIT
+
+const { get } = require('lodash')
+const EntityCoordinates = require('../../lib/entityCoordinates')
+const { factory } = require('./defVersionCheck')
+const Cache = require('../caching/memory')
+
+class QueueHandler {
+  constructor(queue, logger, messageHandler = { processMessage: async () => {} }) {
+    this._queue = queue
+    this.logger = logger
+    this._messageHandler = messageHandler
+  }
+
+  async work(once) {
+    let isQueueEmpty = true
+    try {
+      const messages = await this._queue.dequeueMultiple()
+      if (messages && messages.length > 0) isQueueEmpty = false
+      const results = await Promise.allSettled(
+        messages.map(async message => {
+          await this._messageHandler.processMessage(message)
+          await this._queue.delete(message)
+        })
+      )
+      results.filter(result => result.status === 'rejected').forEach(result => this.logger.error(result.reason))
+    } catch (error) {
+      this.logger.error(error)
+    } finally {
+      if (!once) setTimeout(this.work.bind(this), isQueueEmpty ? 10000 : 0)
+    }
+  }
+}
+
+class DefinitionUpgrader {
+  static defaultTtlSeconds = 60 * 5 /* 5 mins */
+  static delayInMSeconds = 500
+
+  constructor(
+    definitionService,
+    logger,
+    defVersionChecker,
+    cache = Cache({ defaultTtlSeconds: DefinitionUpgrader.defaultTtlSeconds })
+  ) {
+    this.logger = logger
+    this._definitionService = definitionService
+    this._defVersionChecker = defVersionChecker
+    this._defVersionChecker.currentSchema = definitionService.currentSchema
+    this._upgradeLock = cache
+  }
+
+  async processMessage(message) {
+    let coordinates = get(message, 'data.coordinates')
+    if (!coordinates) return
+    coordinates = EntityCoordinates.fromObject(coordinates)
+
+    while (this._upgradeLock.get(coordinates.toString())) {
+      await new Promise(resolve => setTimeout(resolve, DefinitionUpgrader.delayInMSeconds))
+    }
+    try {
+      this._upgradeLock.set(coordinates.toString(), true)
+      await this._upgradeIfNecessary(coordinates)
+    } finally {
+      this._upgradeLock.delete(coordinates.toString())
+    }
+  }
+
+  async _upgradeIfNecessary(coordinates) {
+    try {
+      const existing = await this._definitionService.getStored(coordinates)
+      let result = await this._defVersionChecker.validate(existing)
+      if (!result) {
+        await this._definitionService.computeStoreAndCurate(coordinates)
+        this.logger.info('Handled definition upgrade for %s', coordinates)
+      } else {
+        this.logger.debug('Skipped definition upgrade for %s', coordinates)
+      }
+    } catch (error) {
+      const context = `Error handling definition upgrade for ${coordinates.toString()}`
+      const newError = new Error(`${context}: ${error.message}`)
+      newError.stack = error.stack
+      throw newError
+    }
+  }
+}
+
+function setup(_queue, _definitionService, _logger, once = false, _defVersionChecker = factory({ logger: _logger })) {
+  const defUpgrader = new DefinitionUpgrader(_definitionService, _logger, _defVersionChecker)
+  const queueHandler = new QueueHandler(_queue, _logger, defUpgrader)
+  return queueHandler.work(once)
+}
+
+module.exports = { DefinitionUpgrader, QueueHandler, setup }
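Note: app.js drives this processor through upgradeHandler.setupProcessing(definitionService, logger), which delegates to the setup() exported here. A short sketch of the two modes, to be run inside an async function; the queue, service and logger are assumed to be the ones wired up in app.js.

// Sketch only.
const { setup } = require('./providers/upgrade/process')
await setup(upgradeQueue, definitionService, logger) // keeps polling; 10 s back-off when the queue is empty
await setup(upgradeQueue, definitionService, logger, true) // once = true: drain a single batch, as the tests do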
diff --git a/test/business/definitionServiceTest.js b/test/business/definitionServiceTest.js
index 93a5e5d8f..db15f8b6c 100644
--- a/test/business/definitionServiceTest.js
+++ b/test/business/definitionServiceTest.js
@@ -15,6 +15,10 @@ const expect = chai.expect
 const FileHarvestStore = require('../../providers/stores/fileHarvestStore')
 const SummaryService = require('../../business/summarizer')
 const AggregatorService = require('../../business/aggregator')
+const DefinitionQueueUpgrader = require('../../providers/upgrade/defUpgradeQueue')
+const memoryQueue = require('../../providers/upgrade/memoryQueueConfig')
+const { DefinitionVersionChecker } = require('../../providers/upgrade/defVersionCheck')
+const util = require('util')
 
 describe('Definition Service', () => {
   it('invalidates single coordinate', async () => {
@@ -314,26 +318,147 @@ describe('Definition Service Facet management', () => {
 })
 
 describe('Integration test', () => {
-  let fileHarvestStore
-  beforeEach(() => {
-    fileHarvestStore = createFileHarvestStore()
+  describe('compute', () => {
+    let fileHarvestStore
+    beforeEach(() => {
+      fileHarvestStore = createFileHarvestStore()
+    })
+
+    it('computes the same definition with latest harvest data', async () => {
+      const coordinates = EntityCoordinates.fromString('npm/npmjs/-/debug/3.1.0')
+      const allHarvestData = await fileHarvestStore.getAll(coordinates)
+      delete allHarvestData['scancode']['2.9.0+b1'] //remove invalid scancode version
+      let service = setupServiceToCalculateDefinition(allHarvestData)
+      const baseline_def = await service.compute(coordinates)
+
+      const latestHarvestData = await fileHarvestStore.getAllLatest(coordinates)
+      service = setupServiceToCalculateDefinition(latestHarvestData)
+      const comparison_def = await service.compute(coordinates)
+
+      //updated timestamp is not deterministic
+      expect(comparison_def._meta.updated).to.not.equal(baseline_def._meta.updated)
+      comparison_def._meta.updated = baseline_def._meta.updated
+      expect(comparison_def).to.deep.equal(baseline_def)
+    })
   })
 
-  it('computes the same definition with latest harvest data', async () => {
-    const coordinates = EntityCoordinates.fromString('npm/npmjs/-/debug/3.1.0')
-    const allHarvestData = await fileHarvestStore.getAll(coordinates)
-    delete allHarvestData['scancode']['2.9.0+b1'] //remove invalid scancode version
-    let service = setupDefinitionService(allHarvestData)
-    const baseline_def = await service.compute(coordinates)
-
-    const latestHarvestData = await fileHarvestStore.getAllLatest(coordinates)
-    service = setupDefinitionService(latestHarvestData)
-    const comparison_def = await service.compute(coordinates)
-
-    //updated timestamp is not deterministic
-    expect(comparison_def._meta.updated).to.not.equal(baseline_def._meta.updated)
-    comparison_def._meta.updated = baseline_def._meta.updated
-    expect(comparison_def).to.deep.equal(baseline_def)
+  describe('Handle schema version upgrade', () => {
+    const coordinates = EntityCoordinates.fromString('npm/npmjs/-/test/1.0')
+    const definition = { _meta: { schemaVersion: '1.7.0' }, coordinates }
+
+    let logger, upgradeHandler
+    beforeEach(() => {
+      logger = {
+        debug: (format, ...args) => console.debug(util.format(format, ...args)),
+        error: (format, ...args) => console.error(util.format(format, ...args)),
+        info: (format, ...args) => console.info(util.format(format, ...args))
+      }
+    })
+
+    const handleVersionedDefinition = function () {
+      describe('verify schema version', () => {
+        it('logs and harvests new definitions with empty tools', async () => {
+          const { service } = setupServiceForUpgrade(null, upgradeHandler)
+          service._harvest = sinon.stub()
+          await service.get(coordinates)
+          expect(service._harvest.calledOnce).to.be.true
+          expect(service._harvest.getCall(0).args[0]).to.eq(coordinates)
+        })
+
+        it('computes if definition does not exist', async () => {
+          const { service } = setupServiceForUpgrade(null, upgradeHandler)
+          service.computeStoreAndCurate = sinon.stub().resolves(definition)
+          await service.get(coordinates)
+          expect(service.computeStoreAndCurate.calledOnce).to.be.true
+          expect(service.computeStoreAndCurate.getCall(0).args[0]).to.eq(coordinates)
+        })
+
+        it('returns the up-to-date definition', async () => {
+          const { service } = setupServiceForUpgrade(definition, upgradeHandler)
+          service.computeStoreAndCurate = sinon.stub()
+          const result = await service.get(coordinates)
+          expect(service.computeStoreAndCurate.called).to.be.false
+          expect(result).to.deep.equal(definition)
+        })
+      })
+    }
+
+    describe('schema version check', () => {
+      beforeEach(async () => {
+        upgradeHandler = new DefinitionVersionChecker({ logger })
+        await upgradeHandler.initialize()
+      })
+
+      handleVersionedDefinition()
+
+      context('with stale definitions', () => {
+        it('recomputes a definition with the updated schema version', async () => {
+          const staleDef = { ...createDefinition(null, null, ['foo']), _meta: { schemaVersion: '1.0.0' }, coordinates }
+          const { service, store } = setupServiceForUpgrade(staleDef, upgradeHandler)
+          const result = await service.get(coordinates)
+          expect(result._meta.schemaVersion).to.eq('1.7.0')
+          expect(result.coordinates).to.deep.equal(coordinates)
+          expect(store.store.calledOnce).to.be.true
+        })
+      })
+    })
+
+    describe('queueing schema version updates', () => {
+      let queue, staleDef
+      beforeEach(async () => {
+        queue = memoryQueue()
+        const queueFactory = sinon.stub().returns(queue)
+        upgradeHandler = new DefinitionQueueUpgrader({ logger, queue: queueFactory })
+        await upgradeHandler.initialize()
+        staleDef = { ...createDefinition(null, null, ['foo']), _meta: { schemaVersion: '1.0.0' }, coordinates }
+      })
+
+      handleVersionedDefinition()
+
+      context('with stale definitions', () => {
+        it('returns a stale definition, queues update, recomputes and retrieves the updated definition', async () => {
+          const { service, store } = setupServiceForUpgrade(staleDef, upgradeHandler)
+          const result = await service.get(coordinates)
+          expect(result).to.deep.equal(staleDef)
+          expect(queue.data.length).to.eq(1)
+          await upgradeHandler.setupProcessing(service, logger, true)
+          const newResult = await service.get(coordinates)
+          expect(newResult._meta.schemaVersion).to.eq('1.7.0')
+          expect(store.store.calledOnce).to.be.true
+          expect(queue.data.length).to.eq(0)
+        })
+
+        it('computes once when the same coordinates is queued twice', async () => {
+          const { service, store } = setupServiceForUpgrade(staleDef, upgradeHandler)
+          await service.get(coordinates)
+          const result = await service.get(coordinates)
+          expect(result).to.deep.equal(staleDef)
+          expect(queue.data.length).to.eq(2)
+          await upgradeHandler.setupProcessing(service, logger, true)
+          expect(queue.data.length).to.eq(1)
+          await upgradeHandler.setupProcessing(service, logger, true)
+          const newResult = await service.get(coordinates)
+          expect(newResult._meta.schemaVersion).to.eq('1.7.0')
+          expect(store.store.calledOnce).to.be.true
+          expect(queue.data.length).to.eq(0)
+        })
+
+        it('computes once when the same coordinates is queued twice within one dequeue batch', async () => {
+          const { service, store } = setupServiceForUpgrade(staleDef, upgradeHandler)
+          await service.get(coordinates)
+          await service.get(coordinates)
+          queue.dequeueMultiple = sinon.stub().callsFake(async () => {
+            const message1 = await queue.dequeue()
+            const message2 = await queue.dequeue()
+            return Promise.resolve([message1, message2])
+          })
+          await upgradeHandler.setupProcessing(service, logger, true)
+          const newResult = await service.get(coordinates)
+          expect(newResult._meta.schemaVersion).to.eq('1.7.0')
+          expect(store.store.calledOnce).to.be.true
+        })
+      })
+    })
   })
 })
@@ -348,7 +473,7 @@ function createFileHarvestStore() {
   return FileHarvestStore(options)
 }
 
-function setupDefinitionService(rawHarvestData) {
+function setupServiceToCalculateDefinition(rawHarvestData) {
   const harvestStore = { getAllLatest: () => Promise.resolve(rawHarvestData) }
   const summary = SummaryService({})
 
@@ -363,15 +488,47 @@ function setupDefinitionService(rawHarvestData) {
   return setupWithDelegates(curator, harvestStore, summary, aggregator)
 }
 
-function setupWithDelegates(curator, harvestStore, summary, aggregator) {
-  const store = { delete: sinon.stub(), get: sinon.stub(), store: sinon.stub() }
+function setupServiceForUpgrade(definition, upgradeHandler) {
+  let storedDef = definition && { ...definition }
+  const store = {
+    get: sinon.stub().resolves(storedDef),
+    store: sinon.stub().callsFake(def => (storedDef = def))
+  }
+  const harvestStore = { getAllLatest: () => Promise.resolve(null) }
+  const summary = { summarizeAll: () => Promise.resolve(null) }
+  const aggregator = { process: () => Promise.resolve(definition) }
+  const curator = {
+    get: () => Promise.resolve(),
+    apply: (_coordinates, _curationSpec, definition) => Promise.resolve(definition),
+    autoCurate: () => {}
+  }
+  const service = setupWithDelegates(curator, harvestStore, summary, aggregator, store, upgradeHandler)
+  return { service, store }
+}
+
+function setupWithDelegates(
+  curator,
+  harvestStore,
+  summary,
+  aggregator,
+  store = { delete: sinon.stub(), get: sinon.stub(), store: sinon.stub() },
+  upgradeHandler = { validate: def => Promise.resolve(def) }
+) {
   const search = { delete: sinon.stub(), store: sinon.stub() }
   const cache = { delete: sinon.stub(), get: sinon.stub(), set: sinon.stub() }
-
   const harvestService = { harvest: () => sinon.stub() }
-  const service = DefinitionService(harvestStore, harvestService, summary, aggregator, curator, store, search, cache)
+  const service = DefinitionService(
+    harvestStore,
+    harvestService,
+    summary,
+    aggregator,
+    curator,
+    store,
+    search,
+    cache,
+    upgradeHandler
+  )
   service.logger = { info: sinon.stub(), debug: () => {} }
-  service._harvest = sinon.stub()
 
   return service
 }
@@ -411,7 +568,18 @@ function setup(definition, coordinateSpec, curation) {
   const harvestService = { harvest: () => sinon.stub() }
   const summary = { summarizeAll: () => Promise.resolve(null) }
   const aggregator = { process: () => Promise.resolve(definition) }
-  const service = DefinitionService(harvestStore, harvestService, summary, aggregator, curator, store, search, cache)
+  const upgradeHandler = { validate: def => Promise.resolve(def) }
+  const service = DefinitionService(
+    harvestStore,
+    harvestService,
+    summary,
+    aggregator,
+    curator,
+    store,
+    search,
+    cache,
+    upgradeHandler
+  )
   service.logger = { info: sinon.stub(), debug: sinon.stub() }
   service._harvest = sinon.stub()
   const coordinates = EntityCoordinates.fromString(coordinateSpec || 'npm/npmjs/-/test/1.0')
diff --git a/test/providers/queueing/memoryQueueTest.js b/test/providers/queueing/memoryQueueTest.js
index 3a241d749..0e1d1f932 100644
--- a/test/providers/queueing/memoryQueueTest.js
+++ b/test/providers/queueing/memoryQueueTest.js
@@ -5,52 +5,104 @@ const assert = require('assert')
 const MemoryQueue = require('../../../providers/queueing/memoryQueue')
 
 describe('memory queue operations', () => {
-  it('queues messages', async () => {
-    const memQueue = MemoryQueue()
-    await memQueue.queue(JSON.stringify({ somekey: 1 }))
-    await memQueue.queue(JSON.stringify({ somekey: 2 }))
-    assert.equal(memQueue.data.length, 2)
-  })
+  let memQueue
+  context('with unencoded messages', () => {
+    beforeEach(() => {
+      memQueue = MemoryQueue()
+    })
 
-  it('dequeues messages', async () => {
-    const memQueue = MemoryQueue()
-    await memQueue.queue(JSON.stringify({ somekey: 1 }))
-    await memQueue.queue(JSON.stringify({ somekey: 2 }))
+    it('queues messages', async () => {
+      await memQueue.queue(JSON.stringify({ somekey: 1 }))
+      await memQueue.queue(JSON.stringify({ somekey: 2 }))
+      assert.equal(memQueue.data.length, 2)
+    })
 
-    let message1 = await memQueue.dequeue()
-    assert.equal(message1.data.somekey, 1)
+    it('dequeues messages', async () => {
+      await memQueue.queue(JSON.stringify({ somekey: 1 }))
+      await memQueue.queue(JSON.stringify({ somekey: 2 }))
 
-    await memQueue.delete(message1)
+      let message1 = await memQueue.dequeue()
+      assert.equal(message1.data.somekey, 1)
 
-    let message2 = await memQueue.dequeue()
-    assert.equal(message2.data.somekey, 2)
+      await memQueue.delete(message1)
 
-    await memQueue.delete(message2)
+      let message2 = await memQueue.dequeue()
+      assert.equal(message2.data.somekey, 2)
 
-    let message3 = await memQueue.dequeue()
-    assert.equal(message3, null)
-  })
+      await memQueue.delete(message2)
+
+      let message3 = await memQueue.dequeue()
+      assert.equal(message3, null)
+    })
+
+    it('dequeue count increases to 5', async () => {
+      await memQueue.queue(JSON.stringify({ somekey: 1 }))
+
+      let message = await memQueue.dequeue()
+      assert.equal(message.original.dequeueCount, 1)
+
+      message = await memQueue.dequeue()
+      assert.equal(message.original.dequeueCount, 2)
+
+      message = await memQueue.dequeue()
+      assert.equal(message.original.dequeueCount, 3)
 
-  it('dequeue count increases to 5', async () => {
-    const memQueue = MemoryQueue()
-    await memQueue.queue(JSON.stringify({ somekey: 1 }))
+      message = await memQueue.dequeue()
+      assert.equal(message.original.dequeueCount, 4)
 
-    let message = await memQueue.dequeue()
-    assert.equal(message.original.dequeueCount, 1)
+      message = await memQueue.dequeue()
+      assert.equal(message.original.dequeueCount, 5)
+
+      message = await memQueue.dequeue()
+      assert.equal(message, null)
+    })
+
+    it('handles dequeueing multiple messages', async () => {
+      await memQueue.queue(JSON.stringify({ somekey: 1 }))
+      await memQueue.queue(JSON.stringify({ somekey: 2 }))
+
+      let messages = await memQueue.dequeueMultiple()
+      assert.equal(messages.length, 1)
+      assert.equal(messages[0].data.somekey, 1)
+      await memQueue.delete(messages[0])
+
+      messages = await memQueue.dequeueMultiple()
+      assert.equal(messages.length, 1)
+      assert.equal(messages[0].data.somekey, 2)
+    })
+
+    it('handles dequeueing multiple messages of an empty queue', async () => {
+      let messages = await memQueue.dequeueMultiple()
+      assert.equal(messages.length, 0)
+    })
+  })
 
-    message = await memQueue.dequeue()
-    assert.equal(message.original.dequeueCount, 2)
+  context('with encoded messages', () => {
+    beforeEach(() => {
+      memQueue = MemoryQueue({
+        decoder: text => Buffer.from(text, 'base64').toString('utf8')
+      })
+    })
+    const encode = content => Buffer.from(JSON.stringify(content)).toString('base64')
 
-    message = await memQueue.dequeue()
-    assert.equal(message.original.dequeueCount, 3)
+    it('queues messages', async () => {
+      await memQueue.queue(encode({ somekey: 1 }))
+      await memQueue.queue(encode({ somekey: 2 }))
+      assert.equal(memQueue.data.length, 2)
+    })
 
-    message = await memQueue.dequeue()
-    assert.equal(message.original.dequeueCount, 4)
+    it('handles dequeueing multiple messages', async () => {
+      await memQueue.queue(encode({ somekey: 1 }))
+      await memQueue.queue(encode({ somekey: 2 }))
 
-    message = await memQueue.dequeue()
-    assert.equal(message.original.dequeueCount, 5)
+      let messages = await memQueue.dequeueMultiple()
+      assert.equal(messages.length, 1)
+      assert.equal(messages[0].data.somekey, 1)
+      await memQueue.delete(messages[0])
 
-    message = await memQueue.dequeue()
-    assert.equal(message, null)
+      messages = await memQueue.dequeueMultiple()
+      assert.equal(messages.length, 1)
+      assert.equal(messages[0].data.somekey, 2)
+    })
   })
 })
diff --git a/test/providers/upgrade/defUpgradeQueue.js b/test/providers/upgrade/defUpgradeQueue.js
new file mode 100644
index 000000000..8d286d19c
--- /dev/null
+++ b/test/providers/upgrade/defUpgradeQueue.js
@@ -0,0 +1,140 @@
+// (c) Copyright 2024, SAP SE and ClearlyDefined contributors. Licensed under the MIT license.
+// SPDX-License-Identifier: MIT
+
+const chaiAsPromised = require('chai-as-promised')
+const chai = require('chai')
+chai.use(chaiAsPromised)
+const { expect } = require('chai')
+const sinon = require('sinon')
+const DefinitionQueueUpgrader = require('../../../providers/upgrade/defUpgradeQueue')
+const MemoryQueue = require('../../../providers/upgrade/memoryQueueConfig')
+
+describe('DefinitionQueueUpgrader', () => {
+  let logger
+  beforeEach(() => {
+    logger = { debug: sinon.stub(), error: sinon.stub() }
+  })
+
+  describe('Unit tests', () => {
+    const definition = { coordinates: 'test', _meta: { schemaVersion: '1.0.0' } }
+    let queue, upgrader
+
+    beforeEach(async () => {
+      queue = {
+        queue: sinon.stub().resolves(),
+        initialize: sinon.stub().resolves()
+      }
+      const queueFactory = sinon.stub().returns(queue)
+      upgrader = new DefinitionQueueUpgrader({ logger, queue: queueFactory })
+    })
+
+    it('returns an instance of DefinitionQueueUpgrader', () => {
+      expect(upgrader).to.be.an.instanceOf(DefinitionQueueUpgrader)
+    })
+
+    it('sets and gets current schema version', () => {
+      upgrader.currentSchema = '1.0.0'
+      expect(upgrader.currentSchema).to.equal('1.0.0')
+    })
+
+    it('initializes', async () => {
+      await upgrader.initialize()
+      expect(queue.initialize.calledOnce).to.be.true
+    })
+
+    it('connects to queue after setupProcessing', async () => {
+      await upgrader.initialize()
+      const definitionService = { currentSchema: '1.0.0' }
+      queue.dequeueMultiple = sinon.stub().resolves([])
+      upgrader.setupProcessing(definitionService, logger, true)
+      expect(queue.dequeueMultiple.calledOnce).to.be.true
+    })
+
+    context('validate', () => {
+      it('fails if current schema version is not set', async () => {
+        await expect(upgrader.validate(definition)).to.be.rejectedWith(Error)
+      })
+
+      it('fails if it is not initialized', async () => {
+        upgrader.currentSchema = '1.0.0'
+        const stale = { coordinates: 'test', _meta: { schemaVersion: '0.0.1' } }
+        await expect(upgrader.validate(stale)).to.be.rejectedWith(Error)
+      })
+    })
+
+    context('validate after set up', () => {
+      beforeEach(async () => {
+        await upgrader.initialize()
+        upgrader.currentSchema = '1.0.0'
+      })
+
+      it('does not queue null definition', async () => {
+        const result = await upgrader.validate(null)
+        expect(result).to.be.not.ok
+        expect(queue.queue.called).to.be.false
+      })
+
+      it('does not queue an up-to-date definition', async () => {
+        const definition = { coordinates: 'test', _meta: { schemaVersion: '1.0.0' } }
+        const result = await upgrader.validate(definition)
+        expect(result).to.deep.equal(definition)
+        expect(queue.queue.called).to.be.false
+      })
+
+      it('queues and returns a stale definition', async () => {
+        const definition = { coordinates: 'test', _meta: { schemaVersion: '0.0.1' } }
+        const result = await upgrader.validate(definition)
+        expect(result).to.deep.equal(definition)
+        expect(queue.queue.calledOnce).to.be.true
+      })
+
+      it('logs error when queueing throws', async () => {
+        const staleDef = {
+          coordinates: {
+            type: 'npm',
+            provider: 'npmjs',
+            name: 'lodash',
+            revision: '4.17.11'
+          },
+          _meta: { schemaVersion: '0.0.1' }
+        }
+        queue.queue.rejects(new Error('test'))
+        const result = await upgrader.validate(staleDef)
+        expect(result).to.deep.equal(staleDef)
+        expect(logger.error.calledOnce).to.be.true
+        const { coordinates } = logger.error.args[0][1]
+        expect(coordinates).to.eq('npm/npmjs/-/lodash/4.17.11')
+      })
+    })
+  })
+
+  describe('Integration tests', () => {
+    let queue, upgrader
+
+    beforeEach(async () => {
+      queue = MemoryQueue()
+      upgrader = new DefinitionQueueUpgrader({ logger, queue: sinon.stub().returns(queue) })
+      await upgrader.initialize()
+      upgrader.currentSchema = '1.0.0'
+    })
+
+    it('queues the correct message that can be decoded correctly', async () => {
+      const staleDef = {
+        coordinates: {
+          type: 'npm',
+          provider: 'npmjs',
+          name: 'lodash',
+          revision: '4.17.11'
+        },
+        _meta: { schemaVersion: '0.0.1' }
+      }
+      const result = await upgrader.validate(staleDef)
+      expect(result).to.deep.equal(staleDef)
+      expect(queue.data.length).to.equal(1)
+
+      const message = await queue.dequeue()
+      const coordinates = message.data.coordinates
+      expect(coordinates).to.deep.equal(staleDef.coordinates)
+    })
+  })
+})
diff --git a/test/providers/upgrade/defVersionCheck.js b/test/providers/upgrade/defVersionCheck.js
new file mode 100644
index 000000000..cf33a49a3
--- /dev/null
+++ b/test/providers/upgrade/defVersionCheck.js
@@ -0,0 +1,76 @@
+// (c) Copyright 2024, SAP SE and ClearlyDefined contributors. Licensed under the MIT license.
+// SPDX-License-Identifier: MIT
+
+const chaiAsPromised = require('chai-as-promised')
+const chai = require('chai')
+chai.use(chaiAsPromised)
+const { expect } = require('chai')
+const sinon = require('sinon')
+const { DefinitionVersionChecker, factory } = require('../../../providers/upgrade/defVersionCheck')
+
+describe('DefinitionVersionChecker', () => {
+  let logger, checker
+  beforeEach(() => {
+    logger = { debug: sinon.stub() }
+    checker = new DefinitionVersionChecker({ logger })
+  })
+
+  it('returns an instance of DefinitionVersionChecker', () => {
+    expect(checker).to.be.an.instanceOf(DefinitionVersionChecker)
+  })
+
+  it('creates a new instance of DefinitionVersionChecker using factory', () => {
+    const checker = factory({ logger: logger })
+    expect(checker).to.be.an.instanceOf(DefinitionVersionChecker)
+  })
+
+  it('sets and gets current schema version', () => {
+    checker.currentSchema = '1.0.0'
+    expect(checker.currentSchema).to.equal('1.0.0')
+  })
+
+  it('initializes and returns undefined', async () => {
+    const result = await checker.initialize()
+    expect(result).to.be.not.ok
+  })
+
+  it('returns after setupProcessing', async () => {
+    const result = checker.setupProcessing()
+    expect(result).to.be.not.ok
+  })
+
+  it('throws an error in validate if current schema version is not set', async () => {
+    const definition = { _meta: { schemaVersion: '1.0.0' } }
+    await expect(checker.validate(definition)).to.be.rejectedWith(Error)
+  })
+
+  context('validate after current schema version is set', () => {
+    beforeEach(() => {
+      checker.currentSchema = '1.0.0'
+    })
+
+    it('returns the definition if it is up-to-date', async () => {
+      const definition = { _meta: { schemaVersion: '1.0.0' } }
+      const result = await checker.validate(definition)
+      expect(result).to.deep.equal(definition)
+    })
+
+    it('returns undefined for a stale definition', async () => {
+      const definition = { _meta: { schemaVersion: '0.1.0' } }
+      const result = await checker.validate(definition)
+      expect(result).to.be.undefined
+    })
+
+    it('returns undefined for a definition without schema version', async () => {
+      const definition = {}
+      const result = await checker.validate(definition)
+      expect(result).to.be.undefined
+    })
+
+    it('handles null', async () => {
+      checker.currentSchema = '1.0.0'
+      const result = await checker.validate(null)
+      expect(result).to.be.not.ok
+    })
+  })
+})
diff --git a/test/providers/upgrade/processTest.js b/test/providers/upgrade/processTest.js
new file mode 100644
index 000000000..1ea7fe72f
--- /dev/null
+++ b/test/providers/upgrade/processTest.js
@@ -0,0 +1,212 @@
+// (c) Copyright 2024, SAP SE and ClearlyDefined contributors. Licensed under the MIT license.
+// SPDX-License-Identifier: MIT
+
+const chaiAsPromised = require('chai-as-promised')
+const chai = require('chai')
+chai.use(chaiAsPromised)
+const { expect } = require('chai')
+const sinon = require('sinon')
+const { QueueHandler, DefinitionUpgrader } = require('../../../providers/upgrade/process')
+const EntityCoordinates = require('../../../lib/entityCoordinates')
+
+describe('Definition Upgrade Queue Processing', () => {
+  let logger
+
+  beforeEach(() => {
+    logger = {
+      info: sinon.stub(),
+      error: sinon.stub(),
+      debug: sinon.stub()
+    }
+  })
+
+  describe('QueueHandler', () => {
+    let queue, messageHandler, handler
+
+    beforeEach(() => {
+      queue = {
+        dequeueMultiple: sinon.stub(),
+        delete: sinon.stub().resolves()
+      }
+      messageHandler = {
+        processMessage: sinon.stub()
+      }
+      handler = new QueueHandler(queue, logger, messageHandler)
+    })
+
+    it('returns an instance of QueueHandler', () => {
+      expect(handler).to.be.an.instanceOf(QueueHandler)
+    })
+
+    it('works on a queue', () => {
+      queue.dequeueMultiple.resolves([])
+      handler.work(true)
+      expect(queue.dequeueMultiple.calledOnce).to.be.true
+      expect(messageHandler.processMessage.notCalled).to.be.true
+      expect(queue.delete.notCalled).to.be.true
+    })
+
+    it('processes one message', async () => {
+      queue.dequeueMultiple.resolves([{ message: 'test' }])
+      await handler.work(true)
+      expect(queue.dequeueMultiple.calledOnce).to.be.true
+      expect(messageHandler.processMessage.calledOnce).to.be.true
+      expect(queue.delete.calledOnce).to.be.true
+    })
+
+    it('processes multiple messages', async () => {
+      queue.dequeueMultiple.resolves([{ message: 'testA' }, { message: 'testB' }])
+      await handler.work(true)
+      expect(queue.dequeueMultiple.calledOnce).to.be.true
+      expect(messageHandler.processMessage.calledTwice).to.be.true
+      expect(queue.delete.calledTwice).to.be.true
+    })
+
+    it('handles if error is thrown', async () => {
+      queue.dequeueMultiple.resolves([{ message: 'testA' }])
+      messageHandler.processMessage = sinon.stub().throws()
+      await handler.work(true)
+      expect(queue.dequeueMultiple.calledOnce).to.be.true
+      expect(messageHandler.processMessage.calledOnce).to.be.true
+      expect(queue.delete.called).to.be.false
+      expect(logger.error.calledOnce).to.be.true
+    })
+
+    it('handles both successful and unsuccessful messages', async () => {
+      queue.dequeueMultiple.resolves([{ message: 'testA' }, { message: 'testB' }])
+      messageHandler.processMessage = sinon.stub().onFirstCall().throws().onSecondCall().resolves()
+      await handler.work(true)
+      expect(queue.dequeueMultiple.calledOnce).to.be.true
+      expect(messageHandler.processMessage.calledTwice).to.be.true
+      expect(queue.delete.calledOnce).to.be.true
+      expect(logger.error.calledOnce).to.be.true
+    })
+  })
+
+  describe('DefinitionUpgrader', () => {
+    const coordinates = 'pypi/pypi/-/test/revision'
+    const definition = Object.freeze({ coordinates: EntityCoordinates.fromString(coordinates) })
+    const message = Object.freeze({ data: { coordinates: definition.coordinates } })
+    let definitionService, versionChecker, upgrader
+
+    beforeEach(() => {
+      definitionService = {
+        getStored: sinon.stub(),
+        computeStoreAndCurate: sinon.stub().resolves()
+      }
+      versionChecker = {
+        validate: sinon.stub()
+      }
+      upgrader = new DefinitionUpgrader(definitionService, logger, versionChecker)
+    })
+
+    it('recomputes a definition, if a definition is not up-to-date', async () => {
+      definitionService.getStored.resolves(definition)
+      versionChecker.validate.resolves()
+
+      await upgrader.processMessage(message)
+      expect(definitionService.getStored.calledOnce).to.be.true
+      expect(versionChecker.validate.calledOnce).to.be.true
+      expect(definitionService.computeStoreAndCurate.calledOnce).to.be.true
+    })
+
+    it('skips compute if a definition is up-to-date', async () => {
+      definitionService.getStored.resolves(definition)
+      versionChecker.validate.resolves(definition)
+
+      await upgrader.processMessage(message)
+      expect(definitionService.getStored.calledOnce).to.be.true
+      expect(versionChecker.validate.calledOnce).to.be.true
+      expect(definitionService.computeStoreAndCurate.notCalled).to.be.true
+    })
+
+    it('computes if a definition does not exist', async () => {
+      definitionService.getStored.resolves()
+      versionChecker.validate.resolves()
+
+      await upgrader.processMessage(message)
+      expect(definitionService.getStored.calledOnce).to.be.true
+      expect(versionChecker.validate.calledOnce).to.be.true
+      expect(definitionService.computeStoreAndCurate.calledOnce).to.be.true
+    })
+
+    it('skips if there is no coordinates', async () => {
+      await upgrader.processMessage({ data: {} })
+      expect(definitionService.getStored.notCalled).to.be.true
+      expect(versionChecker.validate.notCalled).to.be.true
+      expect(definitionService.computeStoreAndCurate.notCalled).to.be.true
+    })
+
+    it('handles exception by rethrowing with coordinates and the original error message', async () => {
+      definitionService.getStored.resolves(definition)
+      versionChecker.validate.resolves()
+      definitionService.computeStoreAndCurate.rejects(new Error('test'))
+
+      await expect(upgrader.processMessage(message)).to.be.rejectedWith(Error, /pypi\/pypi\/-\/test\/revision: test/)
+    })
+  })
+
+  describe('Integration Test', () => {
+    const definition = Object.freeze({
+      coordinates: { type: 'pypi', provider: 'pypi', name: 'test', revision: 'revision' },
+      _meta: { schemaVersion: '0.0.1' }
+    })
+    const message = Object.freeze({ data: { ...definition } })
+
+    let queue, handler, definitionService, versionChecker
+    beforeEach(() => {
+      let definitionUpgrader
+      ;({ definitionService, versionChecker, definitionUpgrader } = setupDefinitionUpgrader(logger))
+      queue = {
+        dequeueMultiple: sinon.stub().resolves([message]),
+        delete: sinon.stub().resolves()
+      }
+      handler = new QueueHandler(queue, logger, definitionUpgrader)
+    })
+
+    it('handles exception and logs the coordinates and the original error message', async () => {
+      definitionService.getStored.resolves(definition)
+      versionChecker.validate.resolves()
+      definitionService.computeStoreAndCurate.rejects(new Error('test'))
+
+      await handler.work(true)
+      expect(queue.dequeueMultiple.calledOnce).to.be.true
+      expect(queue.delete.called).to.be.false
+      expect(logger.error.calledOnce).to.be.true
+      expect(logger.error.args[0][0].message).to.match(/pypi\/pypi\/-\/test\/revision: test/)
+    })
+
+    it('skips compute if a definition is up-to-date', async () => {
+      definitionService.getStored.resolves(definition)
+      versionChecker.validate.resolves(definition)
+
+      await handler.work(true)
+      expect(definitionService.getStored.calledOnce).to.be.true
+      expect(versionChecker.validate.calledOnce).to.be.true
+      expect(definitionService.computeStoreAndCurate.notCalled).to.be.true
+      expect(queue.delete.called).to.be.true
+    })
+
+    it('recomputes a definition, if a definition is not up-to-date', async () => {
+      definitionService.getStored.resolves(definition)
+      versionChecker.validate.resolves()
+      await handler.work(true)
+      expect(definitionService.getStored.calledOnce).to.be.true
+      expect(versionChecker.validate.calledOnce).to.be.true
+      expect(definitionService.computeStoreAndCurate.calledOnce).to.be.true
+      expect(queue.delete.called).to.be.true
+    })
+  })
+})
+
+function setupDefinitionUpgrader(logger) {
+  const definitionService = {
+    getStored: sinon.stub(),
+    computeStoreAndCurate: sinon.stub().resolves()
+  }
+  const versionChecker = {
+    validate: sinon.stub()
+  }
+  const definitionUpgrader = new DefinitionUpgrader(definitionService, logger, versionChecker)
+  return { definitionService, versionChecker, definitionUpgrader }
+}