Skip to content
This repository has been archived by the owner on Sep 27, 2022. It is now read-only.

Commit

Permalink
Refactored job manager interface to consist of single addJob method
Browse files Browse the repository at this point in the history
refs #122

- In future changes there's a plan to add "inline" scheduled jobs, which would conflict current method naming.
- The amount of parameters in the methods was more than 3, so it made sense to transform them into an options object
- Scheduling still doesn't work for "inline" jobs. This should be solved as a part of upstream library (breejs/bree#68)
  • Loading branch information
naz committed Jan 6, 2021
1 parent c24caf4 commit 3f3014c
Show file tree
Hide file tree
Showing 2 changed files with 115 additions and 78 deletions.
121 changes: 59 additions & 62 deletions packages/job-manager/lib/job-manager.js
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ class JobManager {

this.bree = new Bree({
root: false, // set this to `false` to prevent requiring a root directory of jobs
hasSeconds: true, // precission is needed to avoid task ovelaps after immidiate execution
hasSeconds: true, // precission is needed to avoid task ovelaps after immediate execution
outputWorkerMetadata: true,
logger: logging,
errorHandler: errorHandler
Expand All @@ -46,83 +46,80 @@ class JobManager {
}

/**
* Adds job to queue in current even loop
* By default schedules an "offloaded" job. If `offloaded: true` parameter is set,
* puts an "inline" immediate job into the queue.
*
* @param {Function|String} job - function or path to a module defining a job
* @param {Object} [data] - data to be passed into the job
* @param {Object} GhostJob - job options
* @prop {Function | String} GhostJob.job - function or path to a module defining a job
* @prop {String} [GhostJob.name] - unique job name, if not provided takes function name or job script filename
* @prop {String | Date} [GhostJob.at] - Date, cron or human readable schedule format. Manage will do immediate execution if not specified. Not supported for "inline" jobs
* @prop {Object} [GhostJob.data] - data to be passed into the job
* @prop {Boolean} [GhostJob.offloaded] - creates an "offloaded" job running in a worker thread by default. If set to "false" runs an "inline" job on the same event loop
*/
addJob(job, data) {
this.logging.info('Adding one off job to the queue');

this.queue.push(async () => {
try {
if (typeof job === 'function') {
await job(data);
addJob({name, at, job, data, offloaded = true}) {
if (offloaded) {
this.logging.info('Adding offloaded job to the queue');
let schedule;

if (!name) {
if (typeof job === 'string') {
name = path.parse(job).name;
} else {
await require(job)(data);
throw new Error('Name parameter should be present if job is a function');
}
} catch (err) {
// NOTE: each job should be written in a safe way and handle all errors internally
// if the error is caught here jobs implementaton should be changed
this.logging.error(new errors.IgnitionError({
level: 'critical',
errorType: 'UnhandledJobError',
message: 'Processed job threw an unhandled error',
context: (typeof job === 'function') ? 'function' : job,
err
}));

throw err;
}
}, handler);
}

/**
* Schedules recuring job offloaded to per-job event-loop (thread or a process)
*
* @param {String | Date} at - Date, cron or human readable schedule format
* @param {Function|String} job - function or path to a module defining a job
* @param {Object} [data] - data to be passed into the job
* @param {String} [name] - job name
*/
scheduleJob(at, job, data, name) {
let schedule;
if (at && !(at instanceof Date)) {
if (isCronExpression(at)) {
schedule = later.parse.cron(at, true);
} else {
schedule = later.parse.text(at);
}

if (!name) {
if (typeof job === 'string') {
name = path.parse(job).name;
} else {
throw new Error('Name parameter should be present if job is a function');
}
}
if ((schedule.error && schedule.error !== -1) || schedule.schedules.length === 0) {
throw new Error('Invalid schedule format');
}

if (at && !(at instanceof Date)) {
if (isCronExpression(at)) {
schedule = later.parse.cron(at, true);
this.logging.info(`Scheduling job ${name} at ${at}. Next run on: ${later.schedule(schedule).next()}`);
} else if (at !== undefined) {
this.logging.info(`Scheduling job ${name} at ${at}`);
} else {
schedule = later.parse.text(at);
this.logging.info(`Scheduling job ${name} to run immediately`);
}

if ((schedule.error && schedule.error !== -1) || schedule.schedules.length === 0) {
throw new Error('Invalid schedule format');
}

this.logging.info(`Scheduling job ${name} at ${at}. Next run on: ${later.schedule(schedule).next()}`);
} else if (at !== undefined) {
this.logging.info(`Scheduling job ${name} at ${at}`);
const breeJob = assembleBreeJob(at, job, data, name);
this.bree.add(breeJob);
return this.bree.start(name);
} else {
this.logging.info(`Scheduling job ${name} to run immediately`);
this.logging.info('Adding one off inline job to the queue');

this.queue.push(async () => {
try {
if (typeof job === 'function') {
await job(data);
} else {
await require(job)(data);
}
} catch (err) {
// NOTE: each job should be written in a safe way and handle all errors internally
// if the error is caught here jobs implementaton should be changed
this.logging.error(new errors.IgnitionError({
level: 'critical',
errorType: 'UnhandledJobError',
message: 'Processed job threw an unhandled error',
context: (typeof job === 'function') ? 'function' : job,
err
}));

throw err;
}
}, handler);
}

const breeJob = assembleBreeJob(at, job, data, name);
this.bree.add(breeJob);
return this.bree.start(name);
}

/**
* Removes a job from sqcheduled (offloaded) jobs queue.
* There is no way to remove jovs from in-line (same event loop) jobs
* added through `addJob` method.
* Removes an "offloaded" job from scheduled jobs queue.
* It's NOT yet possible to remove "inline" jobs (will be possible when scheduling is added https://github.com/breejs/bree/issues/68).
* The method will throw an Error if job with provided name does not exist.
*
* NOTE: current implementation does not guarante running job termination
Expand Down
72 changes: 56 additions & 16 deletions packages/job-manager/test/job-manager.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,18 @@ describe('Job Manager', function () {
const jobManager = new JobManager({logging});

should.exist(jobManager.addJob);
should.exist(jobManager.scheduleJob);
});

describe('Add a Job', function () {
describe('Inline jobs', function () {
it('adds a job to a queue', async function () {
const spy = sinon.spy();
const jobManager = new JobManager({logging});

jobManager.addJob(spy, 'test data');
jobManager.addJob({
job: spy,
data: 'test data',
offloaded: false
});
should(jobManager.queue.idle()).be.false();

// give time to execute the job
Expand All @@ -46,7 +49,11 @@ describe('Job Manager', function () {
const spy = sinon.stub().throws();
const jobManager = new JobManager({logging});

jobManager.addJob(spy, 'test data');
jobManager.addJob({
job: spy,
data: 'test data',
offloaded: false
});
should(jobManager.queue.idle()).be.false();

// give time to execute the job
Expand All @@ -59,12 +66,15 @@ describe('Job Manager', function () {
});
});

describe('Schedule a Job', function () {
describe('Offloaded jobs', function () {
it('fails to schedule for invalid scheduling expression', function () {
const jobManager = new JobManager({logging});

try {
jobManager.scheduleJob('invalid expression', 'jobName', {});
jobManager.addJob({
at: 'invalid expression',
name: 'jobName'
});
} catch (err) {
err.message.should.equal('Invalid schedule format');
}
Expand All @@ -74,7 +84,10 @@ describe('Job Manager', function () {
const jobManager = new JobManager({logging});

try {
jobManager.scheduleJob('invalid expression', () => {}, {});
jobManager.addJob({
at: 'invalid expression',
job: () => {}
});
} catch (err) {
err.message.should.equal('Name parameter should be present if job is a function');
}
Expand All @@ -86,7 +99,11 @@ describe('Job Manager', function () {
const jobPath = path.resolve(__dirname, './jobs/simple.js');

const clock = FakeTimers.install({now: Date.now()});
jobManager.scheduleJob(timeInTenSeconds, jobPath, null, 'job-in-ten');
jobManager.addJob({
at: timeInTenSeconds,
job: jobPath,
name: 'job-in-ten'
});

should(jobManager.bree.timeouts['job-in-ten']).type('object');
should(jobManager.bree.workers['job-in-ten']).type('undefined');
Expand Down Expand Up @@ -119,7 +136,10 @@ describe('Job Manager', function () {
const clock = FakeTimers.install({now: Date.now()});

const jobPath = path.resolve(__dirname, './jobs/simple.js');
jobManager.scheduleJob(undefined, jobPath, undefined, 'job-now');
jobManager.addJob({
job: jobPath,
name: 'job-now'
});

should(jobManager.bree.timeouts['job-now']).type('object');

Expand Down Expand Up @@ -148,7 +168,10 @@ describe('Job Manager', function () {
const clock = FakeTimers.install({now: Date.now()});

const jobPath = path.resolve(__dirname, './jobs/simple.js');
jobManager.scheduleJob(undefined, jobPath, undefined, 'job-now');
jobManager.addJob({
job: jobPath,
name: 'job-now'
});

should(jobManager.bree.timeouts['job-now']).type('object');

Expand All @@ -170,7 +193,10 @@ describe('Job Manager', function () {
should(jobManager.bree.workers['job-now']).type('undefined');

(() => {
jobManager.scheduleJob(undefined, jobPath, undefined, 'job-now');
jobManager.addJob({
job: jobPath,
name: 'job-now'
});
}).should.throw('Job #1 has a duplicate job name of job-now');

clock.uninstall();
Expand All @@ -183,7 +209,10 @@ describe('Job Manager', function () {
const spyHandler = sinon.spy();
const jobManager = new JobManager({logging, errorHandler: spyHandler});

jobManager.scheduleJob(undefined, job, undefined, 'will-fail');
jobManager.addJob({
job,
name: 'will-fail'
});

// give time to execute the job
// has to be this long because in Node v10 the communication is
Expand All @@ -204,7 +233,11 @@ describe('Job Manager', function () {
const timeInTenSeconds = new Date(Date.now() + 10);
const jobPath = path.resolve(__dirname, './jobs/simple.js');

jobManager.scheduleJob(timeInTenSeconds, jobPath, null, 'job-in-ten');
jobManager.addJob({
at: timeInTenSeconds,
job: jobPath,
name: 'job-in-ten'
});
jobManager.bree.config.jobs[0].name.should.equal('job-in-ten');

await jobManager.removeJob('job-in-ten');
Expand All @@ -214,10 +247,14 @@ describe('Job Manager', function () {
});

describe('Shutdown', function () {
it('gracefully shuts down a synchronous jobs', async function () {
it('gracefully shuts down an inline jobs', async function () {
const jobManager = new JobManager({logging});

jobManager.addJob(require('./jobs/timed-job'), 200);
jobManager.addJob({
job: require('./jobs/timed-job'),
data: 200,
offloaded: false
});

should(jobManager.queue.idle()).be.false();

Expand All @@ -229,7 +266,10 @@ describe('Job Manager', function () {
it('gracefully shuts down an interval job', async function () {
const jobManager = new JobManager({logging});

jobManager.scheduleJob('every 5 seconds', path.resolve(__dirname, './jobs/graceful.js'));
jobManager.addJob({
at: 'every 5 seconds',
job: path.resolve(__dirname, './jobs/graceful.js')
});

await delay(1); // let the job execution kick in

Expand Down

0 comments on commit 3f3014c

Please sign in to comment.