Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adjust logging and allow configuration of batch size for dequeueMultiple #1258

Merged
merged 2 commits into from
Dec 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion providers/upgrade/azureQueueConfig.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ const defaultOptions = {
config.get('DEFINITION_UPGRADE_QUEUE_CONNECTION_STRING') || config.get('HARVEST_AZBLOB_CONNECTION_STRING'),
queueName: config.get('DEFINITION_UPGRADE_QUEUE_NAME') || 'definitions-upgrade',
dequeueOptions: {
numOfMessages: 32,
numOfMessages: config.get('DEFINITION_UPGRADE_DEQUEUE_BATCH_SIZE') || 16,
visibilityTimeout: 10 * 60 // 10 min. The default value is 30 seconds.
}
}
Expand Down
6 changes: 3 additions & 3 deletions providers/upgrade/defUpgradeQueue.js
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,12 @@ class DefinitionQueueUpgrader extends DefinitionVersionChecker {
try {
const message = this._constructMessage(definition)
await this._upgrade.queue(message)
this.logger.debug('Queued for definition upgrade ', {
this.logger.info('Queued for definition upgrade ', {
coordinates: DefinitionVersionChecker.getCoordinates(definition)
})
} catch (error) {
//continue if queuing fails and requeue at the next request.
this.logger.error(`Error queuing for definition upgrade ${error.message}`, {
//continue if queueing fails and requeue at the next request.
this.logger.error(`Error queueing for definition upgrade ${error.message}`, {
error,
coordinates: DefinitionVersionChecker.getCoordinates(definition)
})
Expand Down
21 changes: 14 additions & 7 deletions providers/upgrade/process.js
Original file line number Diff line number Diff line change
Expand Up @@ -67,13 +67,20 @@ class DefinitionUpgrader {
}

async _upgradeIfNecessary(coordinates) {
const existing = await this._definitionService.getStored(coordinates)
let result = await this._defVersionChecker.validate(existing)
if (!result) {
await this._definitionService.computeStoreAndCurate(coordinates)
this.logger.info(`Handled definition update for ${coordinates.toString()}`)
} else {
this.logger.debug(`Skipped definition update for ${coordinates.toString()}`)
try {
const existing = await this._definitionService.getStored(coordinates)
let result = await this._defVersionChecker.validate(existing)
if (!result) {
await this._definitionService.computeStoreAndCurate(coordinates)
this.logger.info('Handled definition upgrade for %s', coordinates)
} else {
this.logger.debug('Skipped definition upgrade for %s', coordinates)
}
} catch (error) {
const context = `Error handling definition upgrade for ${coordinates.toString()}`
const newError = new Error(`${context}: ${error.message}`)
newError.stack = error.stack
throw newError
}
}
}
Expand Down
5 changes: 4 additions & 1 deletion test/providers/upgrade/defUpgradeQueue.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,10 @@ const DefinitionQueueUpgrader = require('../../../providers/upgrade/defUpgradeQu
const MemoryQueue = require('../../../providers/upgrade/memoryQueueConfig')

describe('DefinitionQueueUpgrader', () => {
const logger = { debug: sinon.stub(), error: sinon.stub() }
let logger
beforeEach(() => {
logger = { debug: sinon.stub(), error: sinon.stub() }
})

describe('Unit tests', () => {
const definition = { coordinates: 'test', _meta: { schemaVersion: '1.0.0' } }
Expand Down
86 changes: 82 additions & 4 deletions test/providers/upgrade/processTest.js
Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@
// (c) Copyright 2024, SAP SE and ClearlyDefined contributors. Licensed under the MIT license.
// SPDX-License-Identifier: MIT

const chaiAsPromised = require('chai-as-promised')
const chai = require('chai')
chai.use(chaiAsPromised)
const { expect } = require('chai')
const sinon = require('sinon')
const { QueueHandler, DefinitionUpgrader } = require('../../../providers/upgrade/process')
const EntityCoordinates = require('../../../lib/entityCoordinates')

describe('Definition Upgrade Queue Processing', () => {
let logger
Expand Down Expand Up @@ -80,7 +84,9 @@ describe('Definition Upgrade Queue Processing', () => {
})

describe('DefinitionUpgrader', () => {
const definition = Object.freeze({ coordinates: 'pypi/pypi/-/test/revision' })
const coordinates = 'pypi/pypi/-/test/revision'
const definition = Object.freeze({ coordinates: EntityCoordinates.fromString(coordinates) })
const message = Object.freeze({ data: { coordinates: definition.coordinates } })
let definitionService, versionChecker, upgrader

beforeEach(() => {
Expand All @@ -98,7 +104,7 @@ describe('Definition Upgrade Queue Processing', () => {
definitionService.getStored.resolves(definition)
versionChecker.validate.resolves()

await upgrader.processMessage({ data: { coordinates: 'pypi/pypi/-/test/revision' } })
await upgrader.processMessage(message)
expect(definitionService.getStored.calledOnce).to.be.true
expect(versionChecker.validate.calledOnce).to.be.true
expect(definitionService.computeStoreAndCurate.calledOnce).to.be.true
Expand All @@ -108,7 +114,7 @@ describe('Definition Upgrade Queue Processing', () => {
definitionService.getStored.resolves(definition)
versionChecker.validate.resolves(definition)

await upgrader.processMessage({ data: { coordinates: 'pypi/pypi/-/test/revision' } })
await upgrader.processMessage(message)
expect(definitionService.getStored.calledOnce).to.be.true
expect(versionChecker.validate.calledOnce).to.be.true
expect(definitionService.computeStoreAndCurate.notCalled).to.be.true
Expand All @@ -118,7 +124,7 @@ describe('Definition Upgrade Queue Processing', () => {
definitionService.getStored.resolves()
versionChecker.validate.resolves()

await upgrader.processMessage({ data: { coordinates: 'pypi/pypi/-/test/revision' } })
await upgrader.processMessage(message)
expect(definitionService.getStored.calledOnce).to.be.true
expect(versionChecker.validate.calledOnce).to.be.true
expect(definitionService.computeStoreAndCurate.calledOnce).to.be.true
Expand All @@ -130,5 +136,77 @@ describe('Definition Upgrade Queue Processing', () => {
expect(versionChecker.validate.notCalled).to.be.true
expect(definitionService.computeStoreAndCurate.notCalled).to.be.true
})

it('handles exception by rethrowing with coordinates and the original error message', async () => {
definitionService.getStored.resolves(definition)
versionChecker.validate.resolves()
definitionService.computeStoreAndCurate.rejects(new Error('test'))

await expect(upgrader.processMessage(message)).to.be.rejectedWith(Error, /pypi\/pypi\/-\/test\/revision: test/)
})
})

describe('Integration Test', () => {
const definition = Object.freeze({
coordinates: { type: 'pypi', provider: 'pypi', name: 'test', revision: 'revision' },
_meta: { schemaVersion: '0.0.1' }
})
const message = Object.freeze({ data: { ...definition } })

let queue, handler, definitionService, versionChecker
beforeEach(() => {
let definitionUpgrader
;({ definitionService, versionChecker, definitionUpgrader } = setupDefinitionUpgrader(logger))
queue = {
dequeueMultiple: sinon.stub().resolves([message]),
delete: sinon.stub().resolves()
}
handler = new QueueHandler(queue, logger, definitionUpgrader)
})

it('handles exception and logs the coordinates and the original error message', async () => {
definitionService.getStored.resolves(definition)
versionChecker.validate.resolves()
definitionService.computeStoreAndCurate.rejects(new Error('test'))

await handler.work(true)
expect(queue.dequeueMultiple.calledOnce).to.be.true
expect(queue.delete.called).to.be.false
expect(logger.error.calledOnce).to.be.true
expect(logger.error.args[0][0].message).to.match(/pypi\/pypi\/-\/test\/revision: test/)
})

it('skips compute if a definition is up-to-date', async () => {
definitionService.getStored.resolves(definition)
versionChecker.validate.resolves(definition)

await handler.work(true)
expect(definitionService.getStored.calledOnce).to.be.true
expect(versionChecker.validate.calledOnce).to.be.true
expect(definitionService.computeStoreAndCurate.notCalled).to.be.true
expect(queue.delete.called).to.be.true
})

it('recomputes a definition, if a definition is not up-to-date', async () => {
definitionService.getStored.resolves(definition)
versionChecker.validate.resolves()
await handler.work(true)
expect(definitionService.getStored.calledOnce).to.be.true
expect(versionChecker.validate.calledOnce).to.be.true
expect(definitionService.computeStoreAndCurate.calledOnce).to.be.true
expect(queue.delete.called).to.be.true
})
})
})

function setupDefinitionUpgrader(logger) {
const definitionService = {
getStored: sinon.stub(),
computeStoreAndCurate: sinon.stub().resolves()
}
const versionChecker = {
validate: sinon.stub()
}
const definitionUpgrader = new DefinitionUpgrader(definitionService, logger, versionChecker)
return { definitionService, versionChecker, definitionUpgrader }
}
Loading