Skip to content

Commit

Permalink
c
Browse files Browse the repository at this point in the history
  • Loading branch information
Sheraff committed Jun 26, 2024
1 parent 0871cc1 commit 8bec2d8
Show file tree
Hide file tree
Showing 2 changed files with 304 additions and 296 deletions.
323 changes: 163 additions & 160 deletions src/v3/tests/basic.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -260,185 +260,188 @@ test.describe('events', { timeout: 500 }, () => {
})
})

test('step error', { timeout: 500 }, async (t) => {
class CustomError extends Error {
override name = 'CustomError'
}
test.describe('errors', async () => {
test('step error', { timeout: 500 }, async (t) => {
class CustomError extends Error {
override name = 'CustomError'
}

const asyncJob = new Job({
id: 'asyncJob',
input: z.object({ a: z.number() }),
}, async () => {
await Job.run({ id: 'add-one', backoff: 20 }, async () => {
throw new CustomError('Step error')
const asyncJob = new Job({
id: 'asyncJob',
input: z.object({ a: z.number() }),
}, async () => {
await Job.run({ id: 'add-one', backoff: 20 }, async () => {
throw new CustomError('Step error')
})
})
})

const syncJob = new Job({
id: 'syncJob',
input: z.object({ a: z.number() }),
}, async () => {
await Job.run({ id: 'add-one', backoff: 20 }, () => {
throw new CustomError('Step error')
const syncJob = new Job({
id: 'syncJob',
input: z.object({ a: z.number() }),
}, async () => {
await Job.run({ id: 'add-one', backoff: 20 }, () => {
throw new CustomError('Step error')
})
})
})

const queue = new Queue({
id: 'basic',
jobs: { asyncJob, syncJob },
storage: new SQLiteStorage()
})

const queue = new Queue({
id: 'basic',
jobs: { asyncJob, syncJob },
storage: new SQLiteStorage()
})

let runs = 1
asyncJob.emitter.on('run', () => runs++)
syncJob.emitter.on('run', () => runs++)

performance.mark('before-async')
await assert.rejects(invoke(queue.jobs.asyncJob, { a: 1 }), { message: 'Step error', name: 'CustomError' })
performance.mark('after-async')
t.diagnostic(`Runs to complete the job: ${runs} (async)`)
const asyncDuration = performance.measure('async', 'before-async', 'after-async').duration
t.diagnostic(`Duration: ${asyncDuration.toFixed(2)}ms`)
assert(asyncDuration > 40, 'Min. 40ms total: 3 retry implies 2 intervals of 20ms')
assert(asyncDuration < 60, 'Max. 60ms total: 3 retry implies 2 intervals of 20ms')
performance.clearMarks()
assert.strictEqual(runs, 4, 'Job should have been retried 3 times (+1 because async Job.run needs a loop to resolve)')

runs = 1
// @ts-expect-error -- purposefully testing passing a string
await assert.rejects(invoke(queue.jobs.asyncJob, { a: '1' }), { message: 'Input parsing failed', name: 'NonRecoverableError' })
t.diagnostic(`Runs to complete the job: ${runs} (async, non recoverable)`)
assert.strictEqual(runs, 1, 'Job should have been retried 0 times')

runs = 1
performance.mark('before-sync')
await assert.rejects(invoke(queue.jobs.syncJob, { a: 1 }), { message: 'Step error', name: 'CustomError' })
performance.mark('after-sync')
t.diagnostic(`Runs to complete the job: ${runs} (sync)`)
const syncDuration = performance.measure('sync', 'before-sync', 'after-sync').duration
t.diagnostic(`Duration: ${syncDuration.toFixed(2)}ms`)
assert(syncDuration > 40, 'Min. 40ms total: 3 retry implies 2 intervals of 20ms')
assert(syncDuration < 60, 'Max. 60ms total: 3 retry implies 2 intervals of 20ms')
performance.clearMarks()
assert.strictEqual(runs, 3, 'Job should have been retried 3 times')

runs = 1
// @ts-expect-error -- purposefully testing passing a string
await assert.rejects(invoke(queue.jobs.syncJob, { a: '1' }), { message: 'Input parsing failed', name: 'NonRecoverableError' })
t.diagnostic(`Runs to complete the job: ${runs} (sync, non recoverable)`)
assert.strictEqual(runs, 1, 'Job should have been retried 0 times')

await queue.close()
})
let runs = 1
asyncJob.emitter.on('run', () => runs++)
syncJob.emitter.on('run', () => runs++)

performance.mark('before-async')
await assert.rejects(invoke(queue.jobs.asyncJob, { a: 1 }), { message: 'Step error', name: 'CustomError' })
performance.mark('after-async')
t.diagnostic(`Runs to complete the job: ${runs} (async)`)
const asyncDuration = performance.measure('async', 'before-async', 'after-async').duration
t.diagnostic(`Duration: ${asyncDuration.toFixed(2)}ms`)
assert(asyncDuration > 40, 'Min. 40ms total: 3 retry implies 2 intervals of 20ms')
assert(asyncDuration < 60, 'Max. 60ms total: 3 retry implies 2 intervals of 20ms')
performance.clearMarks()
assert.strictEqual(runs, 4, 'Job should have been retried 3 times (+1 because async Job.run needs a loop to resolve)')

runs = 1
// @ts-expect-error -- purposefully testing passing a string
await assert.rejects(invoke(queue.jobs.asyncJob, { a: '1' }), { message: 'Input parsing failed', name: 'NonRecoverableError' })
t.diagnostic(`Runs to complete the job: ${runs} (async, non recoverable)`)
assert.strictEqual(runs, 1, 'Job should have been retried 0 times')

runs = 1
performance.mark('before-sync')
await assert.rejects(invoke(queue.jobs.syncJob, { a: 1 }), { message: 'Step error', name: 'CustomError' })
performance.mark('after-sync')
t.diagnostic(`Runs to complete the job: ${runs} (sync)`)
const syncDuration = performance.measure('sync', 'before-sync', 'after-sync').duration
t.diagnostic(`Duration: ${syncDuration.toFixed(2)}ms`)
assert(syncDuration > 40, 'Min. 40ms total: 3 retry implies 2 intervals of 20ms')
assert(syncDuration < 60, 'Max. 60ms total: 3 retry implies 2 intervals of 20ms')
performance.clearMarks()
assert.strictEqual(runs, 3, 'Job should have been retried 3 times')

runs = 1
// @ts-expect-error -- purposefully testing passing a string
await assert.rejects(invoke(queue.jobs.syncJob, { a: '1' }), { message: 'Input parsing failed', name: 'NonRecoverableError' })
t.diagnostic(`Runs to complete the job: ${runs} (sync, non recoverable)`)
assert.strictEqual(runs, 1, 'Job should have been retried 0 times')

test('step error without backoff delay / with custom `retry` fn', { timeout: 500 }, async (t) => {
let count = 0
const aaa = new Job({
id: 'aaa',
onStart(params) {
performance.mark(`start`)
},
onError(params) {
performance.mark(`error`)
},
}, async () => Job.run({
id: 'add-one',
backoff: 0,
retry(attempt, error) {
return attempt < 3
},
}, async () => {
count++
throw new Error('Step error')
await queue.close()
})
)

const db = new Database()
db.pragma('journal_mode = WAL')
test('step error without backoff delay / with custom `retry` fn', { timeout: 500 }, async (t) => {
let count = 0
const aaa = new Job({
id: 'aaa',
onStart(params) {
performance.mark(`start`)
},
onError(params) {
performance.mark(`error`)
},
}, async () => Job.run({
id: 'add-one',
backoff: 0,
retry(attempt, error) {
return attempt < 3
},
}, async () => {
count++
throw new Error('Step error')
})
)

const queue = new Queue({
id: 'basic',
jobs: { aaa },
storage: new SQLiteStorage({ db })
})
const db = new Database()
db.pragma('journal_mode = WAL')

await invoke(queue.jobs.aaa, { a: 1 }).catch(() => { })
t.diagnostic(`Runs to complete the job: ${count}`)
const queue = new Queue({
id: 'basic',
jobs: { aaa },
storage: new SQLiteStorage({ db })
})

const steps = db.prepare('SELECT * FROM steps').all() as Step[]
assert.equal(steps.length, 1)
assert.equal(steps[0]!.status, 'failed')
assert.equal(steps[0]!.runs, 3)
assert.equal(count, 3)
await invoke(queue.jobs.aaa, { a: 1 }).catch(() => { })
t.diagnostic(`Runs to complete the job: ${count}`)

const duration = performance.measure('test', 'start', 'error').duration
t.diagnostic(`Duration: ${duration.toFixed(2)}ms`)
assert(duration < 2, 'Duration should be less than 2ms')
const steps = db.prepare('SELECT * FROM steps').all() as Step[]
assert.equal(steps.length, 1)
assert.equal(steps[0]!.status, 'failed')
assert.equal(steps[0]!.runs, 3)
assert.equal(count, 3)

await queue.close()
db.close()
performance.clearMarks()
})
const duration = performance.measure('test', 'start', 'error').duration
t.diagnostic(`Duration: ${duration.toFixed(2)}ms`)
assert(duration < 2, 'Duration should be less than 2ms')

test('parsing error (input / output)', { timeout: 500 }, async (t) => {
const aaa = new Job({
id: 'aaa',
input: z.object({ a: z.union([z.number(), z.string()]) }),
output: z.object({ b: z.number() }),
}, async (input) => {
return { b: input.a }
await queue.close()
db.close()
performance.clearMarks()
})

const db = new Database()
db.pragma('journal_mode = WAL')
test('parsing error (input / output)', { timeout: 500 }, async (t) => {
const aaa = new Job({
id: 'aaa',
input: z.object({ a: z.union([z.number(), z.string()]) }),
output: z.object({ b: z.number() }),
}, async (input) => {
return { b: input.a }
})

const queue = new Queue({
id: 'basic',
jobs: { aaa },
storage: new SQLiteStorage({ db })
})
const db = new Database()
db.pragma('journal_mode = WAL')

normal: {
const fn = mock.fn()
await invoke(queue.jobs.aaa, { a: 1 }).catch(fn)
assert.equal(fn.mock.calls.length, 0)

const [input, output, ...rest] = db.prepare('SELECT * FROM steps').all() as Step[]
assert.equal(input!.status, 'completed')
assert.equal(input!.step, 'system/parse-input#0')
assert.equal(output!.status, 'completed')
assert.equal(output!.step, 'system/parse-output#0')
assert.equal(rest.length, 0)
db.exec('DELETE FROM steps')
}
input: {
const fn = mock.fn()
// @ts-expect-error -- purposefully testing passing an invalid input
await invoke(queue.jobs.aaa, { a: true }).catch(fn)
assert.equal(fn.mock.calls.length, 1)

const [input, output, ...rest] = db.prepare('SELECT * FROM steps').all() as Step[]
assert.equal(input!.status, 'failed')
assert.equal(input!.step, 'system/parse-input#0')
assert.equal(output, undefined)
assert.equal(rest.length, 0)
db.exec('DELETE FROM steps')
}
output: {
const fn = mock.fn()
await invoke(queue.jobs.aaa, { a: '1' }).catch(fn)
assert.equal(fn.mock.calls.length, 1)

const [input, output, ...rest] = db.prepare('SELECT * FROM steps').all() as Step[]
assert.equal(input!.status, 'completed')
assert.equal(input!.step, 'system/parse-input#0')
assert.equal(output!.status, 'failed')
assert.equal(output!.step, 'system/parse-output#0')
assert.equal(rest.length, 0)
db.exec('DELETE FROM steps')
}
const queue = new Queue({
id: 'basic',
jobs: { aaa },
storage: new SQLiteStorage({ db })
})

await queue.close()
db.close()
normal: {
const fn = mock.fn()
await invoke(queue.jobs.aaa, { a: 1 }).catch(fn)
assert.equal(fn.mock.calls.length, 0)

const [input, output, ...rest] = db.prepare('SELECT * FROM steps').all() as Step[]
assert.equal(input!.status, 'completed')
assert.equal(input!.step, 'system/parse-input#0')
assert.equal(output!.status, 'completed')
assert.equal(output!.step, 'system/parse-output#0')
assert.equal(rest.length, 0)
db.exec('DELETE FROM steps')
}
input: {
const fn = mock.fn()
// @ts-expect-error -- purposefully testing passing an invalid input
await invoke(queue.jobs.aaa, { a: true }).catch(fn)
assert.equal(fn.mock.calls.length, 1)

const [input, output, ...rest] = db.prepare('SELECT * FROM steps').all() as Step[]
assert.equal(input!.status, 'failed')
assert.equal(input!.step, 'system/parse-input#0')
assert.equal(output, undefined)
assert.equal(rest.length, 0)
db.exec('DELETE FROM steps')
}
output: {
const fn = mock.fn()
await invoke(queue.jobs.aaa, { a: '1' }).catch(fn)
assert.equal(fn.mock.calls.length, 1)

const [input, output, ...rest] = db.prepare('SELECT * FROM steps').all() as Step[]
assert.equal(input!.status, 'completed')
assert.equal(input!.step, 'system/parse-input#0')
assert.equal(output!.status, 'failed')
assert.equal(output!.step, 'system/parse-output#0')
assert.equal(rest.length, 0)
db.exec('DELETE FROM steps')
}

await queue.close()
db.close()
})
})

Loading

0 comments on commit 8bec2d8

Please sign in to comment.