From 0efd3100eaded4fce8d50708c41805dfa92cd50f Mon Sep 17 00:00:00 2001 From: Qing Tomlinson Date: Wed, 11 Dec 2024 14:03:52 -0800 Subject: [PATCH] Address review comments -add logging to handle queuing failure -remove redundant JSON.stringify when queuing message (there is no impact to message decoding) -move queueHandler and defUpgrader to local scope in setup -minor refactoring to improve code reuse and readability -add more tests --- providers/upgrade/defUpgradeQueue.js | 19 ++- providers/upgrade/defVersionCheck.js | 6 +- providers/upgrade/process.js | 7 +- test/providers/upgrade/defUpgradeQueue.js | 163 ++++++++++++++-------- 4 files changed, 127 insertions(+), 68 deletions(-) diff --git a/providers/upgrade/defUpgradeQueue.js b/providers/upgrade/defUpgradeQueue.js index cd7a1b69..533d4de7 100644 --- a/providers/upgrade/defUpgradeQueue.js +++ b/providers/upgrade/defUpgradeQueue.js @@ -2,7 +2,6 @@ // SPDX-License-Identifier: MIT const { DefinitionVersionChecker } = require('./defVersionCheck') -const EntityCoordinates = require('../../lib/entityCoordinates') const { setup } = require('./process') class DefinitionQueueUpgrader extends DefinitionVersionChecker { @@ -17,11 +16,19 @@ class DefinitionQueueUpgrader extends DefinitionVersionChecker { async _queueUpgrade(definition) { if (!this._upgrade) throw new Error('Upgrade queue is not set') - const message = this._constructMessage(definition) - await this._upgrade.queue(JSON.stringify(message)) - this.logger.debug('Queued for definition upgrade ', { - coordinates: EntityCoordinates.fromObject(definition.coordinates).toString() - }) + try { + const message = this._constructMessage(definition) + await this._upgrade.queue(message) + this.logger.debug('Queued for definition upgrade ', { + coordinates: DefinitionVersionChecker.getCoordinates(definition) + }) + } catch (error) { + //continue if queuing fails and requeue at the next request. + this.logger.error(`Error queuing for definition upgrade ${error.message}`, { + error, + coordinates: DefinitionVersionChecker.getCoordinates(definition) + }) + } } _constructMessage(definition) { diff --git a/providers/upgrade/defVersionCheck.js b/providers/upgrade/defVersionCheck.js index 07a91a4a..8b380129 100644 --- a/providers/upgrade/defVersionCheck.js +++ b/providers/upgrade/defVersionCheck.js @@ -24,7 +24,7 @@ class DefinitionVersionChecker { if (!this._currentSchema) throw new Error('Current schema version is not set') const defSchemaVersion = get(definition, '_meta.schemaVersion') this.logger.debug(`Definition version: %s, Current schema version: %s `, defSchemaVersion, this._currentSchema, { - coordinates: definition?.coordinates && EntityCoordinates.fromObject(definition.coordinates).toString() + coordinates: DefinitionVersionChecker.getCoordinates(definition) }) if (defSchemaVersion && gte(defSchemaVersion, this._currentSchema)) return definition } @@ -36,6 +36,10 @@ class DefinitionVersionChecker { setupProcessing() { //do nothing for set up processing } + + static getCoordinates(definition) { + return definition?.coordinates && EntityCoordinates.fromObject(definition.coordinates).toString() + } } const factory = options => new DefinitionVersionChecker(options) diff --git a/providers/upgrade/process.js b/providers/upgrade/process.js index 9cfc9239..e2d717ad 100644 --- a/providers/upgrade/process.js +++ b/providers/upgrade/process.js @@ -78,12 +78,9 @@ class DefinitionUpgrader { } } -let queueHandler -let defUpgrader - function setup(_queue, _definitionService, _logger, once = false, _defVersionChecker = factory({ logger: _logger })) { - defUpgrader = new DefinitionUpgrader(_definitionService, _logger, _defVersionChecker) - queueHandler = new QueueHandler(_queue, _logger, defUpgrader) + const defUpgrader = new DefinitionUpgrader(_definitionService, _logger, _defVersionChecker) + const queueHandler = new QueueHandler(_queue, _logger, defUpgrader) return queueHandler.work(once) } diff --git a/test/providers/upgrade/defUpgradeQueue.js b/test/providers/upgrade/defUpgradeQueue.js index da4e9fbb..b6c90423 100644 --- a/test/providers/upgrade/defUpgradeQueue.js +++ b/test/providers/upgrade/defUpgradeQueue.js @@ -7,80 +7,131 @@ chai.use(chaiAsPromised) const { expect } = require('chai') const sinon = require('sinon') const DefinitionQueueUpgrader = require('../../../providers/upgrade/defUpgradeQueue') +const MemoryQueue = require('../../../providers/upgrade/memoryQueueConfig') describe('DefinitionQueueUpgrader', () => { - const definition = { coordinates: 'test', _meta: { schemaVersion: '1.0.0' } } - let queue, upgrader - - beforeEach(async () => { - const logger = { debug: sinon.stub() } - queue = { - queue: sinon.stub().resolves(), - initialize: sinon.stub().resolves() - } - const queueFactory = sinon.stub().returns(queue) - upgrader = new DefinitionQueueUpgrader({ logger, queue: queueFactory }) - }) + const logger = { debug: sinon.stub(), error: sinon.stub() } - it('returns an instance of DefinitionQueueUpgrader', () => { - expect(upgrader).to.be.an.instanceOf(DefinitionQueueUpgrader) - }) + describe('Unit tests', () => { + const definition = { coordinates: 'test', _meta: { schemaVersion: '1.0.0' } } + let queue, upgrader - it('sets and gets current schema version', () => { - upgrader.currentSchema = '1.0.0' - expect(upgrader.currentSchema).to.equal('1.0.0') - }) + beforeEach(async () => { + queue = { + queue: sinon.stub().resolves(), + initialize: sinon.stub().resolves() + } + const queueFactory = sinon.stub().returns(queue) + upgrader = new DefinitionQueueUpgrader({ logger, queue: queueFactory }) + }) - it('initializes', async () => { - await upgrader.initialize() - expect(queue.initialize.calledOnce).to.be.true - }) + it('returns an instance of DefinitionQueueUpgrader', () => { + expect(upgrader).to.be.an.instanceOf(DefinitionQueueUpgrader) + }) - it('connects to queue after setupProcessing', async () => { - await upgrader.initialize() - const definitionService = { currentSchema: '1.0.0' } - const logger = { debug: sinon.stub() } - queue.dequeueMultiple = sinon.stub().resolves([]) - upgrader.setupProcessing(definitionService, logger, true) - expect(queue.dequeueMultiple.calledOnce).to.be.true - }) + it('sets and gets current schema version', () => { + upgrader.currentSchema = '1.0.0' + expect(upgrader.currentSchema).to.equal('1.0.0') + }) - context('validate', () => { - it('fails if current schema version is not set', async () => { - await expect(upgrader.validate(definition)).to.be.rejectedWith(Error) + it('initializes', async () => { + await upgrader.initialize() + expect(queue.initialize.calledOnce).to.be.true }) - it('fails if it is not initialized', async () => { - upgrader.currentSchema = '1.0.0' - const stale = { coordinates: 'test', _meta: { schemaVersion: '0.0.1' } } - await expect(upgrader.validate(stale)).to.be.rejectedWith(Error) + it('connects to queue after setupProcessing', async () => { + await upgrader.initialize() + const definitionService = { currentSchema: '1.0.0' } + queue.dequeueMultiple = sinon.stub().resolves([]) + upgrader.setupProcessing(definitionService, logger, true) + expect(queue.dequeueMultiple.calledOnce).to.be.true + }) + + context('validate', () => { + it('fails if current schema version is not set', async () => { + await expect(upgrader.validate(definition)).to.be.rejectedWith(Error) + }) + + it('fails if it is not initialized', async () => { + upgrader.currentSchema = '1.0.0' + const stale = { coordinates: 'test', _meta: { schemaVersion: '0.0.1' } } + await expect(upgrader.validate(stale)).to.be.rejectedWith(Error) + }) + }) + + context('validate after set up', () => { + beforeEach(async () => { + await upgrader.initialize() + upgrader.currentSchema = '1.0.0' + }) + + it('does not queue null definition', async () => { + const result = await upgrader.validate(null) + expect(result).to.be.not.ok + expect(queue.queue.called).to.be.false + }) + + it('does not queue an up-to-date definition', async () => { + const definition = { coordinates: 'test', _meta: { schemaVersion: '1.0.0' } } + const result = await upgrader.validate(definition) + expect(result).to.deep.equal(definition) + expect(queue.queue.called).to.be.false + }) + + it('queues and returns a stale definition', async () => { + const definition = { coordinates: 'test', _meta: { schemaVersion: '0.0.1' } } + const result = await upgrader.validate(definition) + expect(result).to.deep.equal(definition) + expect(queue.queue.calledOnce).to.be.true + }) + + it('logs erorr when queueing throws', async () => { + const staleDef = { + coordinates: { + type: 'npm', + provider: 'npmjs', + name: 'lodash', + revision: '4.17.11' + }, + _meta: { schemaVersion: '0.0.1' } + } + queue.queue.rejects(new Error('test')) + const result = await upgrader.validate(staleDef) + expect(result).to.deep.equal(staleDef) + expect(logger.error.calledOnce).to.be.true + const { coordinates } = logger.error.args[0][1] + expect(coordinates).to.eq('npm/npmjs/-/lodash/4.17.11') + }) }) }) - context('validate after set up', () => { + describe('Integration tests', () => { + let queue, upgrader + beforeEach(async () => { + queue = MemoryQueue() + upgrader = new DefinitionQueueUpgrader({ logger, queue: sinon.stub().returns(queue) }) await upgrader.initialize() upgrader.currentSchema = '1.0.0' }) - it('does not queue null definition', async () => { - const result = await upgrader.validate(null) - expect(result).to.be.not.ok - expect(queue.queue.called).to.be.false - }) - - it('does not queue an up-to-date definition', async () => { - const definition = { coordinates: 'test', _meta: { schemaVersion: '1.0.0' } } - const result = await upgrader.validate(definition) - expect(result).to.deep.equal(definition) - expect(queue.queue.called).to.be.false - }) + it('queues the correct message that can be decoded correctly', async () => { + const staleDef = { + coordinates: { + type: 'npm', + provider: 'npmjs', + name: 'lodash', + revision: '4.17.11' + }, + _meta: { schemaVersion: '0.0.1' } + } + const result = await upgrader.validate(staleDef) + expect(result).to.deep.equal(staleDef) + expect(queue.data.length).to.equal(1) - it('queues and returns a stale definition', async () => { - const definition = { coordinates: 'test', _meta: { schemaVersion: '0.0.1' } } - const result = await upgrader.validate(definition) - expect(result).to.deep.equal(definition) - expect(queue.queue.calledOnce).to.be.true + const message = await queue.dequeue() + const coordinates = message.data.coordinates + expect(coordinates).to.deep.equal(staleDef.coordinates) }) }) })