From 740f5fa44eaeffc4fcbace3b2f0da0fcee8ddccd Mon Sep 17 00:00:00 2001 From: Robat Williams Date: Thu, 21 Dec 2023 13:58:59 +0000 Subject: [PATCH] Tests and fixes for concurrency limiter --- src/functions/ConcurrencyLimitedFetch.mjs | 3 +- .../ConcurrencyLimitedFetch.test.mjs | 84 +++++++++++++++++++ 2 files changed, 86 insertions(+), 1 deletion(-) create mode 100644 src/functions/ConcurrencyLimitedFetch.test.mjs diff --git a/src/functions/ConcurrencyLimitedFetch.mjs b/src/functions/ConcurrencyLimitedFetch.mjs index 28a23bb..62bcbe4 100644 --- a/src/functions/ConcurrencyLimitedFetch.mjs +++ b/src/functions/ConcurrencyLimitedFetch.mjs @@ -36,7 +36,7 @@ export default class ConcurrencyLimitedFetch { _process() { if ( this._queue.length === 0 || - this._pendingCount > ConcurrencyLimitedFetch._PENDING_LIMIT + this._pendingCount >= ConcurrencyLimitedFetch._PENDING_LIMIT ) { return; } @@ -45,6 +45,7 @@ export default class ConcurrencyLimitedFetch { if (task.args.options.signal.aborted) { task.reject(task.args.options.signal.reason); + return; } this._pendingCount++; diff --git a/src/functions/ConcurrencyLimitedFetch.test.mjs b/src/functions/ConcurrencyLimitedFetch.test.mjs new file mode 100644 index 0000000..42e29a8 --- /dev/null +++ b/src/functions/ConcurrencyLimitedFetch.test.mjs @@ -0,0 +1,84 @@ +import assert from 'node:assert'; +import { describe, it } from 'node:test'; +import ConcurrencyLimitedFetch from './ConcurrencyLimitedFetch.mjs'; + +describe('ConcurrencyLimitedFetch', () => { + it('fetches immediately when none are queued or pending', (t) => { + t.mock.method(global, 'fetch', () => Promise.resolve()); + const fetcher = new ConcurrencyLimitedFetch(); + + fetcher.fetch('', makeFetchOptions()); + + assert.strictEqual(fetch.mock.callCount(), 1); + }); + + it('fetches immediately when none are queued and fewer than limit are pending', (t) => { + t.mock.method(global, 'fetch', () => Promise.resolve()); + const fetcher = new ConcurrencyLimitedFetch(); + + fetcher.fetch('', makeFetchOptions()); + fetcher.fetch('', makeFetchOptions()); + fetcher.fetch('', makeFetchOptions()); + + assert.strictEqual(fetch.mock.callCount(), 3); + }); + + it('queues when more than limit are pending', (t) => { + t.mock.method(global, 'fetch', () => Promise.resolve()); + const fetcher = new ConcurrencyLimitedFetch(); + + for (let i = 0; i < 15; i++) { + fetcher.fetch('', makeFetchOptions()); + } + + assert.strictEqual(fetch.mock.callCount(), 10); + }); + + it('fetches the next queued when a pending completes', async (t) => { + t.mock.method(global, 'fetch', () => Promise.resolve()); + const fetcher = new ConcurrencyLimitedFetch(); + + for (let i = 0; i < 15; i++) { + fetcher.fetch('', makeFetchOptions()); + } + + await flushPromises(); + assert.strictEqual(fetch.mock.callCount(), 15); + }); + + it('does not fetch when a pending completes and there is none queued', async (t) => { + t.mock.method(global, 'fetch', () => Promise.resolve()); + const fetcher = new ConcurrencyLimitedFetch(); + + fetcher.fetch('', makeFetchOptions()); + + await flushPromises(); + assert.strictEqual(fetch.mock.callCount(), 1); + }); + + it('does not fetch if signal was aborted while was in queue', async (t) => { + t.mock.method(global, 'fetch', () => Promise.resolve()); + const fetcher = new ConcurrencyLimitedFetch(); + + for (let i = 0; i < 10; i++) { + fetcher.fetch('', makeFetchOptions()); + } + + const abortController = new AbortController(); + const aborted = fetcher.fetch('', makeFetchOptions({ abortController })); + abortController.abort(); + + await assert.rejects(aborted, 'AbortError: This operation was aborted'); + assert.strictEqual(fetch.mock.callCount(), 10); + }); +}); + +function makeFetchOptions({ abortController } = {}) { + return { + signal: (abortController ?? new AbortController()).signal, + }; +} + +function flushPromises() { + return new Promise(setImmediate); +}