Skip to content

Commit

Permalink
Avoid recomputing definition when the coordinates are queued multiple…
Browse files Browse the repository at this point in the history
… times within one batch
  • Loading branch information
qtomlinson committed Dec 3, 2024
1 parent b31e6e1 commit 96938b4
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 3 deletions.
26 changes: 24 additions & 2 deletions providers/upgrade/process.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
const { get } = require('lodash')
const EntityCoordinates = require('../../lib/entityCoordinates')
const { factory } = require('./defVersionCheck')
const Cache = require('../caching/memory')

class QueueHandler {
constructor(queue, logger, messageHandler = { processMessage: async () => {} }) {
Expand Down Expand Up @@ -33,18 +34,39 @@ class QueueHandler {
}

class DefinitionUpgrader {
constructor(definitionService, logger, defVersionChecker) {
static defaultTtlSeconds = 60 * 5 /* 5 mins */
static delayInMSeconds = 500

constructor(
definitionService,
logger,
defVersionChecker,
cache = Cache({ defaultTtlSeconds: DefinitionUpgrader.defaultTtlSeconds })
) {
this.logger = logger
this._definitionService = definitionService
this._defVersionChecker = defVersionChecker
this._defVersionChecker.currentSchema = definitionService.currentSchema
this._upgradeLock = cache
}

async processMessage(message) {
let coordinates = get(message, 'data.coordinates')
if (!coordinates) return

coordinates = EntityCoordinates.fromObject(coordinates)

while (this._upgradeLock.get(coordinates.toString())) {
await new Promise(resolve => setTimeout(resolve, DefinitionUpgrader.delayInMSeconds))
}
try {
this._upgradeLock.set(coordinates.toString(), true)
await this._upgradeIfNecessary(coordinates)
} finally {
this._upgradeLock.delete(coordinates.toString())
}
}

async _upgradeIfNecessary(coordinates) {
const existing = await this._definitionService.getStored(coordinates)
let result = await this._defVersionChecker.validate(existing)
if (!result) {
Expand Down
16 changes: 15 additions & 1 deletion test/business/definitionServiceTest.js
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ const DefinitionQueueUpgrader = require('../../providers/upgrade/defUpgradeQueue
const memoryQueue = require('../../providers/upgrade/memoryQueueConfig')
const { DefinitionVersionChecker } = require('../../providers/upgrade/defVersionCheck')
const util = require('util')
const { fail } = require('assert')

describe('Definition Service', () => {
it('invalidates single coordinate', async () => {
Expand Down Expand Up @@ -443,6 +442,21 @@ describe('Integration test', () => {
expect(store.store.calledOnce).to.be.true
expect(queue.data.length).to.eq(0)
})

it('computes once when the same coordinates is queued twice within one dequeue batch ', async () => {
const { service, store } = setupServiceForUpgrade(staleDef, upgradeHandler)
await service.get(coordinates)
await service.get(coordinates)
queue.dequeueMultiple = sinon.stub().callsFake(async () => {
const message1 = await queue.dequeue()
const message2 = await queue.dequeue()
return Promise.resolve([message1, message2])
})
await upgradeHandler.setupProcessing(service, logger, true)
const newResult = await service.get(coordinates)
expect(newResult._meta.schemaVersion).to.eq('1.7.0')
expect(store.store.calledOnce).to.be.true
})
})
})
})
Expand Down

0 comments on commit 96938b4

Please sign in to comment.