diff --git a/src/v3/tests/basic.test.ts b/src/v3/tests/basic.test.ts index ad22919..93d5b26 100644 --- a/src/v3/tests/basic.test.ts +++ b/src/v3/tests/basic.test.ts @@ -260,185 +260,188 @@ test.describe('events', { timeout: 500 }, () => { }) }) -test('step error', { timeout: 500 }, async (t) => { - class CustomError extends Error { - override name = 'CustomError' - } +test.describe('errors', async () => { + test('step error', { timeout: 500 }, async (t) => { + class CustomError extends Error { + override name = 'CustomError' + } - const asyncJob = new Job({ - id: 'asyncJob', - input: z.object({ a: z.number() }), - }, async () => { - await Job.run({ id: 'add-one', backoff: 20 }, async () => { - throw new CustomError('Step error') + const asyncJob = new Job({ + id: 'asyncJob', + input: z.object({ a: z.number() }), + }, async () => { + await Job.run({ id: 'add-one', backoff: 20 }, async () => { + throw new CustomError('Step error') + }) }) - }) - const syncJob = new Job({ - id: 'syncJob', - input: z.object({ a: z.number() }), - }, async () => { - await Job.run({ id: 'add-one', backoff: 20 }, () => { - throw new CustomError('Step error') + const syncJob = new Job({ + id: 'syncJob', + input: z.object({ a: z.number() }), + }, async () => { + await Job.run({ id: 'add-one', backoff: 20 }, () => { + throw new CustomError('Step error') + }) }) - }) - - const queue = new Queue({ - id: 'basic', - jobs: { asyncJob, syncJob }, - storage: new SQLiteStorage() - }) + const queue = new Queue({ + id: 'basic', + jobs: { asyncJob, syncJob }, + storage: new SQLiteStorage() + }) - let runs = 1 - asyncJob.emitter.on('run', () => runs++) - syncJob.emitter.on('run', () => runs++) - - performance.mark('before-async') - await assert.rejects(invoke(queue.jobs.asyncJob, { a: 1 }), { message: 'Step error', name: 'CustomError' }) - performance.mark('after-async') - t.diagnostic(`Runs to complete the job: ${runs} (async)`) - const asyncDuration = performance.measure('async', 'before-async', 'after-async').duration - t.diagnostic(`Duration: ${asyncDuration.toFixed(2)}ms`) - assert(asyncDuration > 40, 'Min. 40ms total: 3 retry implies 2 intervals of 20ms') - assert(asyncDuration < 60, 'Max. 60ms total: 3 retry implies 2 intervals of 20ms') - performance.clearMarks() - assert.strictEqual(runs, 4, 'Job should have been retried 3 times (+1 because async Job.run needs a loop to resolve)') - - runs = 1 - // @ts-expect-error -- purposefully testing passing a string - await assert.rejects(invoke(queue.jobs.asyncJob, { a: '1' }), { message: 'Input parsing failed', name: 'NonRecoverableError' }) - t.diagnostic(`Runs to complete the job: ${runs} (async, non recoverable)`) - assert.strictEqual(runs, 1, 'Job should have been retried 0 times') - - runs = 1 - performance.mark('before-sync') - await assert.rejects(invoke(queue.jobs.syncJob, { a: 1 }), { message: 'Step error', name: 'CustomError' }) - performance.mark('after-sync') - t.diagnostic(`Runs to complete the job: ${runs} (sync)`) - const syncDuration = performance.measure('sync', 'before-sync', 'after-sync').duration - t.diagnostic(`Duration: ${syncDuration.toFixed(2)}ms`) - assert(syncDuration > 40, 'Min. 40ms total: 3 retry implies 2 intervals of 20ms') - assert(syncDuration < 60, 'Max. 60ms total: 3 retry implies 2 intervals of 20ms') - performance.clearMarks() - assert.strictEqual(runs, 3, 'Job should have been retried 3 times') - - runs = 1 - // @ts-expect-error -- purposefully testing passing a string - await assert.rejects(invoke(queue.jobs.syncJob, { a: '1' }), { message: 'Input parsing failed', name: 'NonRecoverableError' }) - t.diagnostic(`Runs to complete the job: ${runs} (sync, non recoverable)`) - assert.strictEqual(runs, 1, 'Job should have been retried 0 times') - await queue.close() -}) + let runs = 1 + asyncJob.emitter.on('run', () => runs++) + syncJob.emitter.on('run', () => runs++) + + performance.mark('before-async') + await assert.rejects(invoke(queue.jobs.asyncJob, { a: 1 }), { message: 'Step error', name: 'CustomError' }) + performance.mark('after-async') + t.diagnostic(`Runs to complete the job: ${runs} (async)`) + const asyncDuration = performance.measure('async', 'before-async', 'after-async').duration + t.diagnostic(`Duration: ${asyncDuration.toFixed(2)}ms`) + assert(asyncDuration > 40, 'Min. 40ms total: 3 retry implies 2 intervals of 20ms') + assert(asyncDuration < 60, 'Max. 60ms total: 3 retry implies 2 intervals of 20ms') + performance.clearMarks() + assert.strictEqual(runs, 4, 'Job should have been retried 3 times (+1 because async Job.run needs a loop to resolve)') + + runs = 1 + // @ts-expect-error -- purposefully testing passing a string + await assert.rejects(invoke(queue.jobs.asyncJob, { a: '1' }), { message: 'Input parsing failed', name: 'NonRecoverableError' }) + t.diagnostic(`Runs to complete the job: ${runs} (async, non recoverable)`) + assert.strictEqual(runs, 1, 'Job should have been retried 0 times') + + runs = 1 + performance.mark('before-sync') + await assert.rejects(invoke(queue.jobs.syncJob, { a: 1 }), { message: 'Step error', name: 'CustomError' }) + performance.mark('after-sync') + t.diagnostic(`Runs to complete the job: ${runs} (sync)`) + const syncDuration = performance.measure('sync', 'before-sync', 'after-sync').duration + t.diagnostic(`Duration: ${syncDuration.toFixed(2)}ms`) + assert(syncDuration > 40, 'Min. 40ms total: 3 retry implies 2 intervals of 20ms') + assert(syncDuration < 60, 'Max. 60ms total: 3 retry implies 2 intervals of 20ms') + performance.clearMarks() + assert.strictEqual(runs, 3, 'Job should have been retried 3 times') + + runs = 1 + // @ts-expect-error -- purposefully testing passing a string + await assert.rejects(invoke(queue.jobs.syncJob, { a: '1' }), { message: 'Input parsing failed', name: 'NonRecoverableError' }) + t.diagnostic(`Runs to complete the job: ${runs} (sync, non recoverable)`) + assert.strictEqual(runs, 1, 'Job should have been retried 0 times') -test('step error without backoff delay / with custom `retry` fn', { timeout: 500 }, async (t) => { - let count = 0 - const aaa = new Job({ - id: 'aaa', - onStart(params) { - performance.mark(`start`) - }, - onError(params) { - performance.mark(`error`) - }, - }, async () => Job.run({ - id: 'add-one', - backoff: 0, - retry(attempt, error) { - return attempt < 3 - }, - }, async () => { - count++ - throw new Error('Step error') + await queue.close() }) - ) - const db = new Database() - db.pragma('journal_mode = WAL') + test('step error without backoff delay / with custom `retry` fn', { timeout: 500 }, async (t) => { + let count = 0 + const aaa = new Job({ + id: 'aaa', + onStart(params) { + performance.mark(`start`) + }, + onError(params) { + performance.mark(`error`) + }, + }, async () => Job.run({ + id: 'add-one', + backoff: 0, + retry(attempt, error) { + return attempt < 3 + }, + }, async () => { + count++ + throw new Error('Step error') + }) + ) - const queue = new Queue({ - id: 'basic', - jobs: { aaa }, - storage: new SQLiteStorage({ db }) - }) + const db = new Database() + db.pragma('journal_mode = WAL') - await invoke(queue.jobs.aaa, { a: 1 }).catch(() => { }) - t.diagnostic(`Runs to complete the job: ${count}`) + const queue = new Queue({ + id: 'basic', + jobs: { aaa }, + storage: new SQLiteStorage({ db }) + }) - const steps = db.prepare('SELECT * FROM steps').all() as Step[] - assert.equal(steps.length, 1) - assert.equal(steps[0]!.status, 'failed') - assert.equal(steps[0]!.runs, 3) - assert.equal(count, 3) + await invoke(queue.jobs.aaa, { a: 1 }).catch(() => { }) + t.diagnostic(`Runs to complete the job: ${count}`) - const duration = performance.measure('test', 'start', 'error').duration - t.diagnostic(`Duration: ${duration.toFixed(2)}ms`) - assert(duration < 2, 'Duration should be less than 2ms') + const steps = db.prepare('SELECT * FROM steps').all() as Step[] + assert.equal(steps.length, 1) + assert.equal(steps[0]!.status, 'failed') + assert.equal(steps[0]!.runs, 3) + assert.equal(count, 3) - await queue.close() - db.close() - performance.clearMarks() -}) + const duration = performance.measure('test', 'start', 'error').duration + t.diagnostic(`Duration: ${duration.toFixed(2)}ms`) + assert(duration < 2, 'Duration should be less than 2ms') -test('parsing error (input / output)', { timeout: 500 }, async (t) => { - const aaa = new Job({ - id: 'aaa', - input: z.object({ a: z.union([z.number(), z.string()]) }), - output: z.object({ b: z.number() }), - }, async (input) => { - return { b: input.a } + await queue.close() + db.close() + performance.clearMarks() }) - const db = new Database() - db.pragma('journal_mode = WAL') + test('parsing error (input / output)', { timeout: 500 }, async (t) => { + const aaa = new Job({ + id: 'aaa', + input: z.object({ a: z.union([z.number(), z.string()]) }), + output: z.object({ b: z.number() }), + }, async (input) => { + return { b: input.a } + }) - const queue = new Queue({ - id: 'basic', - jobs: { aaa }, - storage: new SQLiteStorage({ db }) - }) + const db = new Database() + db.pragma('journal_mode = WAL') - normal: { - const fn = mock.fn() - await invoke(queue.jobs.aaa, { a: 1 }).catch(fn) - assert.equal(fn.mock.calls.length, 0) - - const [input, output, ...rest] = db.prepare('SELECT * FROM steps').all() as Step[] - assert.equal(input!.status, 'completed') - assert.equal(input!.step, 'system/parse-input#0') - assert.equal(output!.status, 'completed') - assert.equal(output!.step, 'system/parse-output#0') - assert.equal(rest.length, 0) - db.exec('DELETE FROM steps') - } - input: { - const fn = mock.fn() - // @ts-expect-error -- purposefully testing passing an invalid input - await invoke(queue.jobs.aaa, { a: true }).catch(fn) - assert.equal(fn.mock.calls.length, 1) - - const [input, output, ...rest] = db.prepare('SELECT * FROM steps').all() as Step[] - assert.equal(input!.status, 'failed') - assert.equal(input!.step, 'system/parse-input#0') - assert.equal(output, undefined) - assert.equal(rest.length, 0) - db.exec('DELETE FROM steps') - } - output: { - const fn = mock.fn() - await invoke(queue.jobs.aaa, { a: '1' }).catch(fn) - assert.equal(fn.mock.calls.length, 1) - - const [input, output, ...rest] = db.prepare('SELECT * FROM steps').all() as Step[] - assert.equal(input!.status, 'completed') - assert.equal(input!.step, 'system/parse-input#0') - assert.equal(output!.status, 'failed') - assert.equal(output!.step, 'system/parse-output#0') - assert.equal(rest.length, 0) - db.exec('DELETE FROM steps') - } + const queue = new Queue({ + id: 'basic', + jobs: { aaa }, + storage: new SQLiteStorage({ db }) + }) - await queue.close() - db.close() + normal: { + const fn = mock.fn() + await invoke(queue.jobs.aaa, { a: 1 }).catch(fn) + assert.equal(fn.mock.calls.length, 0) + + const [input, output, ...rest] = db.prepare('SELECT * FROM steps').all() as Step[] + assert.equal(input!.status, 'completed') + assert.equal(input!.step, 'system/parse-input#0') + assert.equal(output!.status, 'completed') + assert.equal(output!.step, 'system/parse-output#0') + assert.equal(rest.length, 0) + db.exec('DELETE FROM steps') + } + input: { + const fn = mock.fn() + // @ts-expect-error -- purposefully testing passing an invalid input + await invoke(queue.jobs.aaa, { a: true }).catch(fn) + assert.equal(fn.mock.calls.length, 1) + + const [input, output, ...rest] = db.prepare('SELECT * FROM steps').all() as Step[] + assert.equal(input!.status, 'failed') + assert.equal(input!.step, 'system/parse-input#0') + assert.equal(output, undefined) + assert.equal(rest.length, 0) + db.exec('DELETE FROM steps') + } + output: { + const fn = mock.fn() + await invoke(queue.jobs.aaa, { a: '1' }).catch(fn) + assert.equal(fn.mock.calls.length, 1) + + const [input, output, ...rest] = db.prepare('SELECT * FROM steps').all() as Step[] + assert.equal(input!.status, 'completed') + assert.equal(input!.step, 'system/parse-input#0') + assert.equal(output!.status, 'failed') + assert.equal(output!.step, 'system/parse-output#0') + assert.equal(rest.length, 0) + db.exec('DELETE FROM steps') + } + + await queue.close() + db.close() + }) }) + diff --git a/src/v3/tests/cancel.test.ts b/src/v3/tests/cancel.test.ts index 6511d87..0ed54a2 100644 --- a/src/v3/tests/cancel.test.ts +++ b/src/v3/tests/cancel.test.ts @@ -7,170 +7,175 @@ import Database from "better-sqlite3" type Step = { step: string, status: string } -test('cancel during Job.run', { timeout: 500 }, async (t) => { - const db = new Database() - db.pragma('journal_mode = WAL') - - let done = false - const aaa = new Job({ - id: 'aaa', - output: z.object({ hello: z.string() }), - }, async () => { - await Job.run('bbb', async () => { - await new Promise(r => setTimeout(r, 100)) - done = true +test.describe('cancel', () => { + + test('cancel during Job.run', { timeout: 500 }, async (t) => { + const db = new Database() + db.pragma('journal_mode = WAL') + + let done = false + const aaa = new Job({ + id: 'aaa', + output: z.object({ hello: z.string() }), + }, async () => { + await Job.run('bbb', async () => { + await new Promise(r => setTimeout(r, 100)) + done = true + }) + return { hello: 'world' } + }) + const queue = new Queue({ + id: 'cancel', + jobs: { aaa }, + storage: new SQLiteStorage({ db }) }) - return { hello: 'world' } - }) - const queue = new Queue({ - id: 'cancel', - jobs: { aaa }, - storage: new SQLiteStorage({ db }) - }) - - const before = Date.now() - const promise = invoke(queue.jobs.aaa, {}) - await new Promise(r => setTimeout(r, 10)) - queue.jobs.aaa.cancel({}, { type: 'explicit' }) - const result = await promise - const after = Date.now() - assert.strictEqual(done, false, 'Step should not have completed') - assert.notDeepEqual(result, { hello: 'world' }, 'Job should not have completed') + const before = Date.now() + const promise = invoke(queue.jobs.aaa, {}) + await new Promise(r => setTimeout(r, 10)) + queue.jobs.aaa.cancel({}, { type: 'explicit' }) + const result = await promise + const after = Date.now() - t.diagnostic(`Task duration: ${after - before}ms`) - assert(after - before < 100, 'Task should have been cancelled before it completed') + assert.strictEqual(done, false, 'Step should not have completed') + assert.notDeepEqual(result, { hello: 'world' }, 'Job should not have completed') - const steps = db.prepare('SELECT * FROM steps').all() as Step[] - assert.strictEqual(steps.length, 1) - assert.strictEqual(steps[0]!.status, 'running', 'Step is still running, we do not abort ongoing user-land code') + t.diagnostic(`Task duration: ${after - before}ms`) + assert(after - before < 100, 'Task should have been cancelled before it completed') - await queue.close() - assert.strictEqual(done, true) + const steps = db.prepare('SELECT * FROM steps').all() as Step[] + assert.strictEqual(steps.length, 1) + assert.strictEqual(steps[0]!.status, 'running', 'Step is still running, we do not abort ongoing user-land code') - const stepsAfterClose = db.prepare('SELECT * FROM steps').all() as Step[] - assert.strictEqual(stepsAfterClose.length, 1) - assert.strictEqual(stepsAfterClose[0]!.status, 'completed', 'Closing the queue awaits all ongoing promises to finish') + await queue.close() + assert.strictEqual(done, true) - db.close() -}) + const stepsAfterClose = db.prepare('SELECT * FROM steps').all() as Step[] + assert.strictEqual(stepsAfterClose.length, 1) + assert.strictEqual(stepsAfterClose[0]!.status, 'completed', 'Closing the queue awaits all ongoing promises to finish') -test('cancel before Job.run', { timeout: 500 }, async (t) => { - const db = new Database() - db.pragma('journal_mode = WAL') + db.close() + }) - let done = false - const aaa = new Job({ - id: 'aaa', - output: z.object({ hello: z.string() }), - }, async () => { - await Job.run('bbb', async () => { - done = true - await new Promise(r => setTimeout(r, 100)) + test('cancel before Job.run', { timeout: 500 }, async (t) => { + const db = new Database() + db.pragma('journal_mode = WAL') + + let done = false + const aaa = new Job({ + id: 'aaa', + output: z.object({ hello: z.string() }), + }, async () => { + await Job.run('bbb', async () => { + done = true + await new Promise(r => setTimeout(r, 100)) + }) + return { hello: 'world' } + }) + const queue = new Queue({ + id: 'cancel', + jobs: { aaa }, + storage: new SQLiteStorage({ db }) }) - return { hello: 'world' } - }) - const queue = new Queue({ - id: 'cancel', - jobs: { aaa }, - storage: new SQLiteStorage({ db }) - }) - const promise = invoke(queue.jobs.aaa, {}) - queue.jobs.aaa.cancel({}, { type: 'explicit' }) - const result = await promise - assert.strictEqual(done, false) - assert.notDeepEqual(result, { hello: 'world' }) + const promise = invoke(queue.jobs.aaa, {}) + queue.jobs.aaa.cancel({}, { type: 'explicit' }) + const result = await promise + assert.strictEqual(done, false) + assert.notDeepEqual(result, { hello: 'world' }) - await queue.close() + await queue.close() - const steps = db.prepare('SELECT * FROM steps').all() as Step[] - assert.strictEqual(steps.length, 0, 'No steps should have been created') + const steps = db.prepare('SELECT * FROM steps').all() as Step[] + assert.strictEqual(steps.length, 0, 'No steps should have been created') - db.close() -}) + db.close() + }) -test('cancel during Job.sleep', { timeout: 500 }, async (t) => { - const db = new Database() - db.pragma('journal_mode = WAL') + test('cancel during Job.sleep', { timeout: 500 }, async (t) => { + const db = new Database() + db.pragma('journal_mode = WAL') - let done = false - const aaa = new Job({ - id: 'aaa', - output: z.object({ hello: z.string() }), - }, async () => { - await Job.sleep(100) - done = true - return { hello: 'world' } - }) - const queue = new Queue({ - id: 'cancel', - jobs: { aaa }, - storage: new SQLiteStorage({ db }) - }) + let done = false + const aaa = new Job({ + id: 'aaa', + output: z.object({ hello: z.string() }), + }, async () => { + await Job.sleep(100) + done = true + return { hello: 'world' } + }) + const queue = new Queue({ + id: 'cancel', + jobs: { aaa }, + storage: new SQLiteStorage({ db }) + }) - const promise = invoke(queue.jobs.aaa, {}) - await new Promise(r => setTimeout(r, 10)) - queue.jobs.aaa.cancel({}, { type: 'explicit' }) - const result = await promise - assert.strictEqual(done, false) - assert.notDeepEqual(result, { hello: 'world' }) + const promise = invoke(queue.jobs.aaa, {}) + await new Promise(r => setTimeout(r, 10)) + queue.jobs.aaa.cancel({}, { type: 'explicit' }) + const result = await promise + assert.strictEqual(done, false) + assert.notDeepEqual(result, { hello: 'world' }) - const steps = db.prepare('SELECT * FROM steps').all() as Step[] - assert.strictEqual(steps.length, 1, 'Only the sleep step should have been created') - assert.strictEqual(steps[0]?.status, 'stalled', 'Sleep step is sleeping') + const steps = db.prepare('SELECT * FROM steps').all() as Step[] + assert.strictEqual(steps.length, 1, 'Only the sleep step should have been created') + assert.strictEqual(steps[0]?.status, 'stalled', 'Sleep step is sleeping') - await queue.close() + await queue.close() - const stepsAfterClose = db.prepare('SELECT * FROM steps').all() as Step[] - assert.strictEqual(stepsAfterClose[0]?.status, 'stalled', 'Sleep step is still sleeping, no active promise maintained the queue open') + const stepsAfterClose = db.prepare('SELECT * FROM steps').all() as Step[] + assert.strictEqual(stepsAfterClose[0]?.status, 'stalled', 'Sleep step is still sleeping, no active promise maintained the queue open') - db.close() -}) + db.close() + }) -test('cancel during Job.waitFor', { timeout: 500 }, async (t) => { - const db = new Database() - db.pragma('journal_mode = WAL') + test('cancel during Job.waitFor', { timeout: 500 }, async (t) => { + const db = new Database() + db.pragma('journal_mode = WAL') - const pipe = new Pipe({ - id: 'pipe', - input: z.object({ hello: z.string() }), - }) + const pipe = new Pipe({ + id: 'pipe', + input: z.object({ hello: z.string() }), + }) - let done = false - const aaa = new Job({ - id: 'aaa', - output: z.object({ hello: z.string() }), - }, async () => { - const result = await Job.waitFor(pipe) - done = true - return result - }) + let done = false + const aaa = new Job({ + id: 'aaa', + output: z.object({ hello: z.string() }), + }, async () => { + const result = await Job.waitFor(pipe) + done = true + return result + }) - const queue = new Queue({ - id: 'cancel', - jobs: { aaa }, - pipes: { pipe }, - storage: new SQLiteStorage({ db }) - }) + const queue = new Queue({ + id: 'cancel', + jobs: { aaa }, + pipes: { pipe }, + storage: new SQLiteStorage({ db }) + }) - const promise = invoke(queue.jobs.aaa, {}) - await new Promise(r => setTimeout(r, 10)) - queue.jobs.aaa.cancel({}, { type: 'explicit' }) - queue.pipes.pipe.dispatch({ hello: 'world' }) - const result = await promise - assert.strictEqual(done, false) - assert.notDeepEqual(result, { hello: 'world' }) + const promise = invoke(queue.jobs.aaa, {}) + await new Promise(r => setTimeout(r, 10)) + queue.jobs.aaa.cancel({}, { type: 'explicit' }) + queue.pipes.pipe.dispatch({ hello: 'world' }) + const result = await promise + assert.strictEqual(done, false) + assert.notDeepEqual(result, { hello: 'world' }) - const steps = db.prepare('SELECT * FROM steps').all() as Step[] - assert.strictEqual(steps.length, 1, 'Only the waitFor step should have been created') - assert.strictEqual(steps[0]?.status, 'waiting', 'WaitFor step is waiting') + const steps = db.prepare('SELECT * FROM steps').all() as Step[] + assert.strictEqual(steps.length, 1, 'Only the waitFor step should have been created') + assert.strictEqual(steps[0]?.status, 'waiting', 'WaitFor step is waiting') - await queue.close() + await queue.close() - const stepsAfterClose = db.prepare('SELECT * FROM steps').all() as Step[] - assert.strictEqual(stepsAfterClose[0]?.status, 'waiting', 'WaitFor step is still waiting, no active promise maintained the queue open') + const stepsAfterClose = db.prepare('SELECT * FROM steps').all() as Step[] + assert.strictEqual(stepsAfterClose[0]?.status, 'waiting', 'WaitFor step is still waiting, no active promise maintained the queue open') - db.close() -}) \ No newline at end of file + db.close() + }) + + +})