Skip to content

Commit

Permalink
feat: individual job concurrency
Browse files Browse the repository at this point in the history
  • Loading branch information
adiologydev committed Aug 31, 2024
1 parent 2e2d60e commit 590d802
Show file tree
Hide file tree
Showing 3 changed files with 54 additions and 29 deletions.
66 changes: 42 additions & 24 deletions packages/tinyjobs/src/lib/TinyJobsHandler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,17 +29,22 @@ type JobsMap = Map<

class TinyJobs<T> {
private queue: Queue;
private worker: Worker;
// private worker: Worker;
private redis: IORedis.Redis;

private options = {
private workers: Map<string, Worker> = new Map();

private options: {
removeOnComplete: boolean;
removeOnFailure: boolean;
concurrency: number;
connection?: ConnectionOptions;
} = {
removeOnComplete: false,
removeOnFailure: false,
concurrency: 1,
connection: {},
};

private readonly removeOnComplete = true;
private readonly removeOnFailure = true;

public jobs: JobsMap = new Map();
public events: TinyJobEventsHandler;

Expand All @@ -52,16 +57,17 @@ class TinyJobs<T> {
concurrency,
} = tinyJobsParams ?? {};

this.options = {
...this.options,
connection: bullConnection,
concurrency: concurrency ?? 1,
};

this.queue = new Queue(queueName, {
connection: bullConnection ?? {},
...queueOptions,
});

this.worker = new Worker(queueName, this.processQueue.bind(this), {
connection: bullConnection ?? {},
concurrency: concurrency ?? 1,
});

this.redis = new IORedis.Redis(redisConnection ?? {});
this.events = new TinyJobEventsHandler({ queueName, redis: this.redis });

Expand All @@ -74,19 +80,28 @@ class TinyJobs<T> {
if (!jobClass)
throw new Error(`No handler registered for job type: ${job.name}`);

if (jobClass instanceof TinyJob) {
if (jobClass instanceof TinyJob)
await Promise.resolve(jobClass.run(job.data));
} else {
throw new Error("Invalid job type.");
}
else throw new Error("Invalid job type.");
}

public async registerJob(job: new () => TinyJob) {
public async registerJob(job: new () => TinyJob, path?: string) {
if (this.jobs.has(job.name))
throw new Error(`Job with name ${job.name} already registered.`);

const implementation = new job();
const { name, cron, delay } = implementation;
const { name, cron, delay, conccurency } = implementation;

const jobWorker = new Worker(
this.queue.name,
this.processQueue.bind(this),
{
concurrency: conccurency ?? this.options.concurrency,
connection: this.options.connection ?? {},
}
);

this.workers.set(name, jobWorker);

this.jobs.set(name, {
implementation: implementation,
Expand All @@ -100,8 +115,8 @@ class TinyJobs<T> {
repeat: {
pattern: implementation.cron,
},
removeOnComplete: this.removeOnComplete,
removeOnFail: this.removeOnFailure,
removeOnComplete: this.options.removeOnComplete,
removeOnFail: this.options.removeOnFailure,
delay: implementation.delay,
});
}
Expand All @@ -120,9 +135,9 @@ class TinyJobs<T> {
);

const jobs = await loadJobsFromDir(jobsDir);
for (const job of jobs) {
for (const [jobName, job] of jobs) {
if (typeof job === "function") {
await this.registerJob(job);
await this.registerJob(job, jobName);
} else {
throw new Error(`Invalid job type: ${typeof job}`);
}
Expand All @@ -144,8 +159,8 @@ class TinyJobs<T> {
}
: undefined,
delay: delay ?? undefined,
removeOnComplete: this.removeOnComplete,
removeOnFail: this.removeOnFailure,
removeOnComplete: this.options.removeOnComplete,
removeOnFail: this.options.removeOnFailure,
...options,
});

Expand All @@ -160,8 +175,11 @@ class TinyJobs<T> {
}

private async gracefulShutdown(signal: NodeJS.Signals) {
await this.worker.close();
for (const worker of this.workers.values()) await worker.close();

// await this.worker.close();
await this.redis.quit();

process.exit(signal === "SIGTERM" ? 0 : 1);
}
}
Expand Down
10 changes: 9 additions & 1 deletion packages/tinyjobs/src/structures/Job.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,18 +24,26 @@ class Job {
name: string;
cron?: string;
delay?: number;
conccurency?: number;

/**
* Creates an instance of Job.
* @param {string} name The name of the job.
* @param {string} [cron] The cron pattern for the job.
* @param {number} [delay] The delay in milliseconds for the job.
* @param {number} [concurrency] The concurrency for the job. Defaults to global concurrency of TinyJobs.
* @memberof Job
*/
constructor(options: { name: string; cron?: string; delay?: number }) {
constructor(options: {
name: string;
cron?: string;
delay?: number;
concurrency?: number;
}) {
this.name = options.name;
this.cron = options.cron;
this.delay = options.delay;
this.conccurency = options.concurrency;
}

async run(payload?: Record<string, unknown>) {
Expand Down
7 changes: 3 additions & 4 deletions packages/tinyjobs/src/utils/jobs.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,12 @@ export const loadJobsFromDir = async (dir: string) => {
!file.endsWith(".d.ts")
);

const jobs: TinyJob[] = [];
const jobs = new Map<string, TinyJob>();

for (const file of files) {
const JobClass = require(path.join(dir, file)).default;
if (JobClass.prototype instanceof TinyJob) {
jobs.push(JobClass);
}
if (JobClass.prototype instanceof TinyJob)
jobs.set(`${dir}/${file}`, JobClass);
}

return jobs;
Expand Down

0 comments on commit 590d802

Please sign in to comment.