From 3f3014c03d10665a98d24c4fa26698b857dfa9c8 Mon Sep 17 00:00:00 2001 From: Naz Date: Wed, 6 Jan 2021 13:45:23 +1300 Subject: [PATCH] Refactored job manager interface to consist of single addJob method refs #122 - In future changes there's a plan to add "inline" scheduled jobs, which would conflict current method naming. - The amount of parameters in the methods was more than 3, so it made sense to transform them into an options object - Scheduling still doesn't work for "inline" jobs. This should be solved as a part of upstream library (https://github.com/breejs/bree/issues/68) --- packages/job-manager/lib/job-manager.js | 121 +++++++++--------- packages/job-manager/test/job-manager.test.js | 72 ++++++++--- 2 files changed, 115 insertions(+), 78 deletions(-) diff --git a/packages/job-manager/lib/job-manager.js b/packages/job-manager/lib/job-manager.js index 76d8bc4d..7a9863d4 100644 --- a/packages/job-manager/lib/job-manager.js +++ b/packages/job-manager/lib/job-manager.js @@ -36,7 +36,7 @@ class JobManager { this.bree = new Bree({ root: false, // set this to `false` to prevent requiring a root directory of jobs - hasSeconds: true, // precission is needed to avoid task ovelaps after immidiate execution + hasSeconds: true, // precission is needed to avoid task ovelaps after immediate execution outputWorkerMetadata: true, logger: logging, errorHandler: errorHandler @@ -46,83 +46,80 @@ class JobManager { } /** - * Adds job to queue in current even loop + * By default schedules an "offloaded" job. If `offloaded: true` parameter is set, + * puts an "inline" immediate job into the queue. * - * @param {Function|String} job - function or path to a module defining a job - * @param {Object} [data] - data to be passed into the job + * @param {Object} GhostJob - job options + * @prop {Function | String} GhostJob.job - function or path to a module defining a job + * @prop {String} [GhostJob.name] - unique job name, if not provided takes function name or job script filename + * @prop {String | Date} [GhostJob.at] - Date, cron or human readable schedule format. Manage will do immediate execution if not specified. Not supported for "inline" jobs + * @prop {Object} [GhostJob.data] - data to be passed into the job + * @prop {Boolean} [GhostJob.offloaded] - creates an "offloaded" job running in a worker thread by default. If set to "false" runs an "inline" job on the same event loop */ - addJob(job, data) { - this.logging.info('Adding one off job to the queue'); - - this.queue.push(async () => { - try { - if (typeof job === 'function') { - await job(data); + addJob({name, at, job, data, offloaded = true}) { + if (offloaded) { + this.logging.info('Adding offloaded job to the queue'); + let schedule; + + if (!name) { + if (typeof job === 'string') { + name = path.parse(job).name; } else { - await require(job)(data); + throw new Error('Name parameter should be present if job is a function'); } - } catch (err) { - // NOTE: each job should be written in a safe way and handle all errors internally - // if the error is caught here jobs implementaton should be changed - this.logging.error(new errors.IgnitionError({ - level: 'critical', - errorType: 'UnhandledJobError', - message: 'Processed job threw an unhandled error', - context: (typeof job === 'function') ? 'function' : job, - err - })); - - throw err; } - }, handler); - } - /** - * Schedules recuring job offloaded to per-job event-loop (thread or a process) - * - * @param {String | Date} at - Date, cron or human readable schedule format - * @param {Function|String} job - function or path to a module defining a job - * @param {Object} [data] - data to be passed into the job - * @param {String} [name] - job name - */ - scheduleJob(at, job, data, name) { - let schedule; + if (at && !(at instanceof Date)) { + if (isCronExpression(at)) { + schedule = later.parse.cron(at, true); + } else { + schedule = later.parse.text(at); + } - if (!name) { - if (typeof job === 'string') { - name = path.parse(job).name; - } else { - throw new Error('Name parameter should be present if job is a function'); - } - } + if ((schedule.error && schedule.error !== -1) || schedule.schedules.length === 0) { + throw new Error('Invalid schedule format'); + } - if (at && !(at instanceof Date)) { - if (isCronExpression(at)) { - schedule = later.parse.cron(at, true); + this.logging.info(`Scheduling job ${name} at ${at}. Next run on: ${later.schedule(schedule).next()}`); + } else if (at !== undefined) { + this.logging.info(`Scheduling job ${name} at ${at}`); } else { - schedule = later.parse.text(at); + this.logging.info(`Scheduling job ${name} to run immediately`); } - if ((schedule.error && schedule.error !== -1) || schedule.schedules.length === 0) { - throw new Error('Invalid schedule format'); - } - - this.logging.info(`Scheduling job ${name} at ${at}. Next run on: ${later.schedule(schedule).next()}`); - } else if (at !== undefined) { - this.logging.info(`Scheduling job ${name} at ${at}`); + const breeJob = assembleBreeJob(at, job, data, name); + this.bree.add(breeJob); + return this.bree.start(name); } else { - this.logging.info(`Scheduling job ${name} to run immediately`); + this.logging.info('Adding one off inline job to the queue'); + + this.queue.push(async () => { + try { + if (typeof job === 'function') { + await job(data); + } else { + await require(job)(data); + } + } catch (err) { + // NOTE: each job should be written in a safe way and handle all errors internally + // if the error is caught here jobs implementaton should be changed + this.logging.error(new errors.IgnitionError({ + level: 'critical', + errorType: 'UnhandledJobError', + message: 'Processed job threw an unhandled error', + context: (typeof job === 'function') ? 'function' : job, + err + })); + + throw err; + } + }, handler); } - - const breeJob = assembleBreeJob(at, job, data, name); - this.bree.add(breeJob); - return this.bree.start(name); } /** - * Removes a job from sqcheduled (offloaded) jobs queue. - * There is no way to remove jovs from in-line (same event loop) jobs - * added through `addJob` method. + * Removes an "offloaded" job from scheduled jobs queue. + * It's NOT yet possible to remove "inline" jobs (will be possible when scheduling is added https://github.com/breejs/bree/issues/68). * The method will throw an Error if job with provided name does not exist. * * NOTE: current implementation does not guarante running job termination diff --git a/packages/job-manager/test/job-manager.test.js b/packages/job-manager/test/job-manager.test.js index f092f111..61adfb7a 100644 --- a/packages/job-manager/test/job-manager.test.js +++ b/packages/job-manager/test/job-manager.test.js @@ -23,15 +23,18 @@ describe('Job Manager', function () { const jobManager = new JobManager({logging}); should.exist(jobManager.addJob); - should.exist(jobManager.scheduleJob); }); - describe('Add a Job', function () { + describe('Inline jobs', function () { it('adds a job to a queue', async function () { const spy = sinon.spy(); const jobManager = new JobManager({logging}); - jobManager.addJob(spy, 'test data'); + jobManager.addJob({ + job: spy, + data: 'test data', + offloaded: false + }); should(jobManager.queue.idle()).be.false(); // give time to execute the job @@ -46,7 +49,11 @@ describe('Job Manager', function () { const spy = sinon.stub().throws(); const jobManager = new JobManager({logging}); - jobManager.addJob(spy, 'test data'); + jobManager.addJob({ + job: spy, + data: 'test data', + offloaded: false + }); should(jobManager.queue.idle()).be.false(); // give time to execute the job @@ -59,12 +66,15 @@ describe('Job Manager', function () { }); }); - describe('Schedule a Job', function () { + describe('Offloaded jobs', function () { it('fails to schedule for invalid scheduling expression', function () { const jobManager = new JobManager({logging}); try { - jobManager.scheduleJob('invalid expression', 'jobName', {}); + jobManager.addJob({ + at: 'invalid expression', + name: 'jobName' + }); } catch (err) { err.message.should.equal('Invalid schedule format'); } @@ -74,7 +84,10 @@ describe('Job Manager', function () { const jobManager = new JobManager({logging}); try { - jobManager.scheduleJob('invalid expression', () => {}, {}); + jobManager.addJob({ + at: 'invalid expression', + job: () => {} + }); } catch (err) { err.message.should.equal('Name parameter should be present if job is a function'); } @@ -86,7 +99,11 @@ describe('Job Manager', function () { const jobPath = path.resolve(__dirname, './jobs/simple.js'); const clock = FakeTimers.install({now: Date.now()}); - jobManager.scheduleJob(timeInTenSeconds, jobPath, null, 'job-in-ten'); + jobManager.addJob({ + at: timeInTenSeconds, + job: jobPath, + name: 'job-in-ten' + }); should(jobManager.bree.timeouts['job-in-ten']).type('object'); should(jobManager.bree.workers['job-in-ten']).type('undefined'); @@ -119,7 +136,10 @@ describe('Job Manager', function () { const clock = FakeTimers.install({now: Date.now()}); const jobPath = path.resolve(__dirname, './jobs/simple.js'); - jobManager.scheduleJob(undefined, jobPath, undefined, 'job-now'); + jobManager.addJob({ + job: jobPath, + name: 'job-now' + }); should(jobManager.bree.timeouts['job-now']).type('object'); @@ -148,7 +168,10 @@ describe('Job Manager', function () { const clock = FakeTimers.install({now: Date.now()}); const jobPath = path.resolve(__dirname, './jobs/simple.js'); - jobManager.scheduleJob(undefined, jobPath, undefined, 'job-now'); + jobManager.addJob({ + job: jobPath, + name: 'job-now' + }); should(jobManager.bree.timeouts['job-now']).type('object'); @@ -170,7 +193,10 @@ describe('Job Manager', function () { should(jobManager.bree.workers['job-now']).type('undefined'); (() => { - jobManager.scheduleJob(undefined, jobPath, undefined, 'job-now'); + jobManager.addJob({ + job: jobPath, + name: 'job-now' + }); }).should.throw('Job #1 has a duplicate job name of job-now'); clock.uninstall(); @@ -183,7 +209,10 @@ describe('Job Manager', function () { const spyHandler = sinon.spy(); const jobManager = new JobManager({logging, errorHandler: spyHandler}); - jobManager.scheduleJob(undefined, job, undefined, 'will-fail'); + jobManager.addJob({ + job, + name: 'will-fail' + }); // give time to execute the job // has to be this long because in Node v10 the communication is @@ -204,7 +233,11 @@ describe('Job Manager', function () { const timeInTenSeconds = new Date(Date.now() + 10); const jobPath = path.resolve(__dirname, './jobs/simple.js'); - jobManager.scheduleJob(timeInTenSeconds, jobPath, null, 'job-in-ten'); + jobManager.addJob({ + at: timeInTenSeconds, + job: jobPath, + name: 'job-in-ten' + }); jobManager.bree.config.jobs[0].name.should.equal('job-in-ten'); await jobManager.removeJob('job-in-ten'); @@ -214,10 +247,14 @@ describe('Job Manager', function () { }); describe('Shutdown', function () { - it('gracefully shuts down a synchronous jobs', async function () { + it('gracefully shuts down an inline jobs', async function () { const jobManager = new JobManager({logging}); - jobManager.addJob(require('./jobs/timed-job'), 200); + jobManager.addJob({ + job: require('./jobs/timed-job'), + data: 200, + offloaded: false + }); should(jobManager.queue.idle()).be.false(); @@ -229,7 +266,10 @@ describe('Job Manager', function () { it('gracefully shuts down an interval job', async function () { const jobManager = new JobManager({logging}); - jobManager.scheduleJob('every 5 seconds', path.resolve(__dirname, './jobs/graceful.js')); + jobManager.addJob({ + at: 'every 5 seconds', + job: path.resolve(__dirname, './jobs/graceful.js') + }); await delay(1); // let the job execution kick in