Skip to content

Commit

Permalink
Tests and fixes for concurrency limiter
Browse files Browse the repository at this point in the history
  • Loading branch information
robatwilliams committed Dec 21, 2023
1 parent e967a4d commit 740f5fa
Show file tree
Hide file tree
Showing 2 changed files with 86 additions and 1 deletion.
3 changes: 2 additions & 1 deletion src/functions/ConcurrencyLimitedFetch.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ export default class ConcurrencyLimitedFetch {
_process() {
if (
this._queue.length === 0 ||
this._pendingCount > ConcurrencyLimitedFetch._PENDING_LIMIT
this._pendingCount >= ConcurrencyLimitedFetch._PENDING_LIMIT
) {
return;
}
Expand All @@ -45,6 +45,7 @@ export default class ConcurrencyLimitedFetch {

if (task.args.options.signal.aborted) {
task.reject(task.args.options.signal.reason);
return;
}

this._pendingCount++;
Expand Down
84 changes: 84 additions & 0 deletions src/functions/ConcurrencyLimitedFetch.test.mjs
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
import assert from 'node:assert';
import { describe, it } from 'node:test';
import ConcurrencyLimitedFetch from './ConcurrencyLimitedFetch.mjs';

describe('ConcurrencyLimitedFetch', () => {
it('fetches immediately when none are queued or pending', (t) => {
t.mock.method(global, 'fetch', () => Promise.resolve());
const fetcher = new ConcurrencyLimitedFetch();

fetcher.fetch('', makeFetchOptions());

assert.strictEqual(fetch.mock.callCount(), 1);
});

it('fetches immediately when none are queued and fewer than limit are pending', (t) => {
t.mock.method(global, 'fetch', () => Promise.resolve());
const fetcher = new ConcurrencyLimitedFetch();

fetcher.fetch('', makeFetchOptions());
fetcher.fetch('', makeFetchOptions());
fetcher.fetch('', makeFetchOptions());

assert.strictEqual(fetch.mock.callCount(), 3);
});

it('queues when more than limit are pending', (t) => {
t.mock.method(global, 'fetch', () => Promise.resolve());
const fetcher = new ConcurrencyLimitedFetch();

for (let i = 0; i < 15; i++) {
fetcher.fetch('', makeFetchOptions());
}

assert.strictEqual(fetch.mock.callCount(), 10);
});

it('fetches the next queued when a pending completes', async (t) => {
t.mock.method(global, 'fetch', () => Promise.resolve());
const fetcher = new ConcurrencyLimitedFetch();

for (let i = 0; i < 15; i++) {
fetcher.fetch('', makeFetchOptions());
}

await flushPromises();
assert.strictEqual(fetch.mock.callCount(), 15);
});

it('does not fetch when a pending completes and there is none queued', async (t) => {
t.mock.method(global, 'fetch', () => Promise.resolve());
const fetcher = new ConcurrencyLimitedFetch();

fetcher.fetch('', makeFetchOptions());

await flushPromises();
assert.strictEqual(fetch.mock.callCount(), 1);
});

it('does not fetch if signal was aborted while was in queue', async (t) => {
t.mock.method(global, 'fetch', () => Promise.resolve());
const fetcher = new ConcurrencyLimitedFetch();

for (let i = 0; i < 10; i++) {
fetcher.fetch('', makeFetchOptions());
}

const abortController = new AbortController();
const aborted = fetcher.fetch('', makeFetchOptions({ abortController }));
abortController.abort();

await assert.rejects(aborted, 'AbortError: This operation was aborted');
assert.strictEqual(fetch.mock.callCount(), 10);
});
});

function makeFetchOptions({ abortController } = {}) {
return {
signal: (abortController ?? new AbortController()).signal,
};
}

function flushPromises() {
return new Promise(setImmediate);
}

0 comments on commit 740f5fa

Please sign in to comment.