diff --git a/src/defaults.js b/src/defaults.js index d0c7fa5..6960556 100644 --- a/src/defaults.js +++ b/src/defaults.js @@ -133,7 +133,7 @@ export function getOptionsWithDefaults (options) { // Idle Queues idleFor: options.idleFor || options['idle-for'] || defaults.idleFor, delete: options.delete || defaults.delete, - unpair: options.delete || defaults.unpair, + unpair: options.unpair || defaults.unpair, // Check create: options.create || defaults.create, diff --git a/src/idleQueues.js b/src/idleQueues.js index 48e6c94..6dc4a15 100644 --- a/src/idleQueues.js +++ b/src/idleQueues.js @@ -37,6 +37,7 @@ const metricNames = [ * Actual SQS call, used in conjunction with cache. */ export async function _cheapIdleCheck (qname, qrl, opt) { + debug('_cheapIdleCheck', qname, qrl) try { const client = getSQSClient() const cmd = new GetQueueAttributesCommand({ AttributeNames: attributeNames, QueueUrl: qrl }) @@ -46,11 +47,13 @@ export async function _cheapIdleCheck (qname, qrl, opt) { result.queue = qname.slice(opt.prefix.length) // We are idle if all the messages attributes are zero result.idle = attributeNames.filter(k => result[k] === '0').length === attributeNames.length + result.exists = true + debug({ result, SQS: 1 }) return { result, SQS: 1 } } catch (e) { + debug({ _cheapIdleCheck: e }) if (e instanceof QueueDoesNotExist) { - // Count deleted queues as idle - return { result: { idle: true }, SQS: 1 } + return { result: { idle: undefined, exists: false }, SQS: 1 } } else { throw e } @@ -62,6 +65,7 @@ export async function _cheapIdleCheck (qname, qrl, opt) { * at this immediate moment. */ export async function cheapIdleCheck (qname, qrl, opt) { + debug('cheapIdleCheck', qname, qrl) // Just call the API if we don't have a cache if (!opt.cacheUri) return _cheapIdleCheck(qname, qrl, opt) @@ -125,19 +129,20 @@ export async function checkIdle (qname, qrl, opt) { const { result: cheapResult, SQS } = await cheapIdleCheck(qname, qrl, opt) debug('cheapResult', cheapResult) - // Short circuit further calls if cheap result shows data - if (cheapResult.idle === false) { + // Short circuit further calls if cheap result is conclusive + if (cheapResult.idle === false || cheapResult.exists === false) { return { queue: qname.slice(opt.prefix.length), cheap: cheapResult, - idle: false, + idle: cheapResult.idle, + exists: cheapResult.exists, apiCalls: { SQS, CloudWatch: 0 } } } // If we get here, there's nothing in the queue at the moment, // so we have to check metrics one at a time - const apiCalls = { SQS: 1, CloudWatch: 0 } + const apiCalls = { SQS, CloudWatch: 0 } const results = [] let idle = true for (const metricName of metricNames) { @@ -158,7 +163,8 @@ export async function checkIdle (qname, qrl, opt) { queue: qname.slice(opt.prefix.length), cheap: cheapResult, apiCalls, - idle + idle, + exists: true }, ...results // merge in results from CloudWatch ) @@ -210,107 +216,96 @@ export async function processQueue (qname, qrl, opt) { } /** - * Processes a queue and its fail queue, treating them as a unit. + * Processes a queue and its fail and delete queue, treating them as a unit. */ -export async function processQueuePair (qname, qrl, opt) { +export async function processQueueSet (qname, qrl, opt) { const isFifo = qname.endsWith('.fifo') const normalizeOptions = Object.assign({}, opt, { fifo: isFifo }) + + // Generate DLQ name/url + const dqname = normalizeDLQName(qname, normalizeOptions) + const dqrl = normalizeDLQName(dqname, normalizeOptions) + // Generate fail queue name/url const fqname = normalizeFailQueueName(qname, normalizeOptions) const fqrl = normalizeFailQueueName(fqname, normalizeOptions) - // Generate DLQ name/url - const dqname = normalizeDLQName(qname, normalizeOptions) - const dqrl = normalizeDLQName(fqname, normalizeOptions) - // Idle check - const result = await checkIdle(qname, qrl, opt) - debug('result', result) + debug({ qname, qrl, dqname, dqrl, fqname, fqrl }) - // Queue is active - const active = !result.idle - if (active) { - if (opt.verbose) console.error(chalk.blue('Queue ') + qname.slice(opt.prefix.length) + chalk.blue(' has been ') + 'active' + chalk.blue(' in the last ') + opt.idleFor + chalk.blue(' minutes.')) - return result - } - - // Queue is idle - if (opt.verbose) console.error(chalk.blue('Queue ') + qname.slice(opt.prefix.length) + chalk.blue(' has been ') + 'idle' + chalk.blue(' for the last ') + opt.idleFor + chalk.blue(' minutes.')) + // Idle check + const qresult = await checkIdle(qname, qrl, opt) + const fqresult = await checkIdle(fqname, fqrl, opt) + const dqresult = await checkIdle(dqname, dqrl, opt) + debug({ qresult, fqresult, dqresult }) - // Check fail queue - try { - const fresult = await checkIdle(fqname, fqrl, opt) - debug('fresult', fresult) - const idleCheckResult = Object.assign( - result, - { idle: result.idle && fresult.idle, failq: fresult }, - { - apiCalls: { - SQS: result.apiCalls.SQS + fresult.apiCalls.SQS, - CloudWatch: result.apiCalls.CloudWatch + fresult.apiCalls.CloudWatch - } + // Start building return value + const result = Object.assign( + { + queue: qname, + idle: ( + qresult.idle && + (!fqresult.exists || fqresult.idle) && + (!fqresult.exists || dqresult.idle) + ), + apiCalls: { + SQS: qresult.apiCalls.SQS + fqresult.apiCalls.SQS + dqresult.apiCalls.SQS, + CloudWatch: qresult.apiCalls.CloudWatch + fqresult.apiCalls.CloudWatch + dqresult.apiCalls.CloudWatch } - ) - - // Queue is active - const factive = !fresult.idle - if (factive) { - if (opt.verbose) console.error(chalk.blue('Queue ') + fqname.slice(opt.prefix.length) + chalk.blue(' has been ') + 'active' + chalk.blue(' in the last ') + opt.idleFor + chalk.blue(' minutes.')) - return idleCheckResult } + ) - // Queue is idle - if (opt.verbose) console.error(chalk.blue('Queue ') + fqname.slice(opt.prefix.length) + chalk.blue(' has been ') + 'idle' + chalk.blue(' for the last ') + opt.idleFor + chalk.blue(' minutes.')) + // Queue is idle + if (qresult.idle && opt.verbose) console.error(chalk.blue('Queue ') + qname.slice(opt.prefix.length) + chalk.blue(' has been ') + 'idle' + chalk.blue(' for the last ') + opt.idleFor + chalk.blue(' minutes.')) + if (fqresult.idle && opt.verbose) console.error(chalk.blue('Queue ') + fqname.slice(opt.prefix.length) + chalk.blue(' has been ') + 'idle' + chalk.blue(' for the last ') + opt.idleFor + chalk.blue(' minutes.')) + if (dqresult.idle && opt.verbose) console.error(chalk.blue('Queue ') + dqname.slice(opt.prefix.length) + chalk.blue(' has been ') + 'idle' + chalk.blue(' for the last ') + opt.idleFor + chalk.blue(' minutes.')) - // Trigger a delete if the user wants it - if (!opt.delete) return idleCheckResult + // Delete if all are idle + const canDelete = ( + (qresult.idle || qresult.exists === false) && + (fqresult.idle || fqresult.exists === false) && + (dqresult.idle || dqresult.exists === false) + ) + debug({ canDelete }) - let dresult, dfresult - try { - dresult = await deleteQueue(qname, qrl, opt) - } catch (e) { - // Ignore queues that don't exist already in case that: 1) there was a double - // call or 2) the SQS node that got the request is not consistent yet - if (!(e instanceof QueueDoesNotExist)) throw e - } - try { - dfresult = await deleteQueue(fqname, fqrl, opt) - } catch (e) { - // Ignore queues that don't exist already in case that: 1) there was a double - // call or 2) the SQS node that got the request is not consistent yet - if (!(e instanceof QueueDoesNotExist)) throw e - } - return Object.assign(idleCheckResult, { - apiCalls: { - // Sum the SQS calls across all four - SQS: [result, fresult, dresult, dfresult] - .map(r => r.apiCalls.SQS) - .reduce((a, b) => a + b, 0), - // Sum the CloudWatch calls across all four - CloudWatch: [result, fresult, dresult, dfresult] - .map(r => r.apiCalls.CloudWatch) - .reduce((a, b) => a + b, 0) + if (opt.delete && canDelete) { + // Normal + const qdresult = await (async () => { + debug({ qresult }) + try { + if (qresult.idle) return deleteQueue(qname, qrl, opt) + } catch (e) { + if (!(e instanceof QueueDoesNotExist)) throw e } - }) - } catch (e) { - // Handle the case where the fail queue has been deleted or was never - // created for some reason - if (!(e instanceof QueueDoesNotExist)) throw e + })() + if (qdresult) { result.apiCalls.SQS += qdresult.apiCalls.SQS } + debug({ qdresult }) - // Fail queue doesn't exist if we get here - if (opt.verbose) console.error(chalk.blue('Queue ') + fqname.slice(opt.prefix.length) + chalk.blue(' does not exist.')) + // Fail + const fqdresult = await (async () => { + debug({ fqresult }) + try { + if (fqresult.idle) return deleteQueue(fqname, fqrl, opt) + } catch (e) { + if (!(e instanceof QueueDoesNotExist)) throw e + } + })() + if (fqdresult) { result.apiCalls.SQS += fqdresult.apiCalls.SQS } + debug({ fqdresult }) - // Handle delete - if (!opt.delete) return result - const deleteResult = await deleteQueue(qname, qrl, opt) - const resultIncludingDelete = Object.assign(result, { - deleted: deleteResult.deleted, - apiCalls: { - SQS: result.apiCalls.SQS + deleteResult.apiCalls.SQS, - CloudWatch: result.apiCalls.CloudWatch + deleteResult.apiCalls.CloudWatch + // Dead + const dqdresult = await (async () => { + debug({ dqresult }) + try { + if (dqresult.idle) return deleteQueue(dqname, dqrl, opt) + } catch (e) { + if (!(e instanceof QueueDoesNotExist)) throw e } - }) - return resultIncludingDelete + })() + if (dqdresult) { result.apiCalls.SQS += dqdresult.apiCalls.SQS } + debug({ dqdresult }) } + + return result } // @@ -333,7 +328,11 @@ export async function idleQueues (queues, options) { const sufFifo = opt.failSuffix + fifoSuffix const isFail = entry.qname.endsWith(suf) const isFifoFail = entry.qname.endsWith(sufFifo) - return opt.includeFailed ? true : (!isFail && !isFifoFail) + const sufDead = opt.dlqSuffix + const sufFifoDead = opt.dlqSuffix + fifoSuffix + const isDead = entry.qname.endsWith(sufDead) + const isFifoDead = entry.qname.endsWith(sufFifoDead) + return opt.includeFailed ? true : (!isFail && !isFifoFail && !isDead && !isFifoDead) }) // But only if we have queues to remove @@ -347,7 +346,7 @@ export async function idleQueues (queues, options) { } // Check each queue in parallel if (opt.unpair) return Promise.all(filteredEntries.map(e => processQueue(e.qname, e.qrl, opt))) - return Promise.all(filteredEntries.map(e => processQueuePair(e.qname, e.qrl, opt))) + return Promise.all(filteredEntries.map(e => processQueueSet(e.qname, e.qrl, opt))) } // Otherwise, let caller know diff --git a/test/idleQueues.test.js b/test/idleQueues.test.js index 59a6ca9..c6090b2 100644 --- a/test/idleQueues.test.js +++ b/test/idleQueues.test.js @@ -43,6 +43,7 @@ describe('_cheapIdleCheck', () => { ApproximateNumberOfMessages: '1', ApproximateNumberOfMessagesNotVisible: '0', idle: false, + exists: true, queue: qname } }) @@ -79,6 +80,7 @@ describe('cheapIdleCheck', () => { ApproximateNumberOfMessages: '1', ApproximateNumberOfMessagesNotVisible: '0', idle: false, + exists: true, queue: qname } })