diff --git a/__tests__/integration/ioredis-mock.ts b/__tests__/integration/ioredis-mock.ts new file mode 100644 index 00000000..fbae246b --- /dev/null +++ b/__tests__/integration/ioredis-mock.ts @@ -0,0 +1,87 @@ +import { Queue, Worker, Scheduler } from "../../src"; +import * as RedisMock from "ioredis-mock"; +import specHelper from "../utils/specHelper"; + +// for ioredis-mock, we need to re-use a shared connection +// setting "pkg" is important! +const REDIS = new RedisMock(); +const connectionDetails = { redis: REDIS, pkg: "ioredis-mock" }; + +const jobs = { + add: { + perform: async (a, b) => { + const response = a + b; + return response; + }, + }, +}; + +describe("testing with ioredis-mock package", () => { + let queue: Queue; + let scheduler: Scheduler; + let worker: Worker; + + afterAll(async () => { + await queue.end(); + await scheduler.end(); + await worker.end(); + }); + + test("a queue can be created", async () => { + queue = new Queue( + { connection: connectionDetails, queues: ["math"] }, + jobs + ); + await queue.connect(); + }); + + test("a scheduler can be created", async () => { + scheduler = new Scheduler( + { connection: connectionDetails, queues: ["math"] }, + jobs + ); + await scheduler.connect(); + // await scheduler.start(); + }); + + test("a worker can be created", async () => { + worker = new Worker( + { + connection: connectionDetails, + queues: ["math"], + timeout: specHelper.timeout, + }, + jobs + ); + await worker.connect(); + // worker.start(); + }); + + test("a job can be enqueued", async () => { + await queue.enqueueIn(1, "math", "add", [1, 2]); + const times = await queue.scheduledAt("math", "add", [1, 2]); + expect(times.length).toBe(1); + }); + + test("the scheduler can promote the job", async () => { + await scheduler.poll(); + const times = await queue.scheduledAt("math", "add", [1, 2]); + expect(times.length).toBe(0); + const jobsLength = await queue.length("math"); + expect(jobsLength).toBe(1); + }); + + test("the worker can work the job", async (done) => { + await worker.start(); + worker.on("success", async (q, job, result, duration) => { + expect(q).toBe("math"); + expect(job.class).toBe("add"); + expect(result).toBe(3); + expect(worker.result).toBe(result); + expect(duration).toBeGreaterThanOrEqual(0); + + worker.removeAllListeners("success"); + done(); + }); + }); +}); diff --git a/__tests__/integration/ioredis.ts b/__tests__/integration/ioredis.ts new file mode 100644 index 00000000..36763da1 --- /dev/null +++ b/__tests__/integration/ioredis.ts @@ -0,0 +1,89 @@ +import { Queue, Worker, Scheduler } from "../../src"; +import specHelper from "../utils/specHelper"; + +const connectionDetails = { + pkg: "ioredis", + host: "127.0.0.1", + password: null, + port: 6379, + database: parseInt(process.env.JEST_WORKER_ID || "0"), +}; + +const jobs = { + add: { + perform: async (a, b) => { + const response = a + b; + return response; + }, + }, +}; + +describe("testing with ioredis package", () => { + let queue: Queue; + let scheduler: Scheduler; + let worker: Worker; + + afterAll(async () => { + await queue.end(); + await scheduler.end(); + await worker.end(); + }); + + test("a queue can be created", async () => { + queue = new Queue( + { connection: connectionDetails, queues: ["math"] }, + jobs + ); + await queue.connect(); + }); + + test("a scheduler can be created", async () => { + scheduler = new Scheduler( + { connection: connectionDetails, queues: ["math"] }, + jobs + ); + await scheduler.connect(); + // await scheduler.start(); + }); + + test("a worker can be created", async () => { + worker = new Worker( + { + connection: connectionDetails, + queues: ["math"], + timeout: specHelper.timeout, + }, + jobs + ); + await worker.connect(); + // worker.start(); + }); + + test("a job can be enqueued", async () => { + await queue.enqueueIn(1, "math", "add", [1, 2]); + const times = await queue.scheduledAt("math", "add", [1, 2]); + expect(times.length).toBe(1); + }); + + test("the scheduler can promote the job", async () => { + await scheduler.poll(); + const times = await queue.scheduledAt("math", "add", [1, 2]); + expect(times.length).toBe(0); + const jobsLength = await queue.length("math"); + expect(jobsLength).toBe(1); + }); + + test("the worker can work the job", async (done) => { + await worker.start(); + worker.on("success", async (q, job, result, duration) => { + expect(q).toBe("math"); + expect(job.class).toBe("add"); + expect(result).toBe(3); + expect(worker.result).toBe(result); + expect(duration).toBeGreaterThanOrEqual(0); + + worker.removeAllListeners("success"); + done(); + }); + }); +}); diff --git a/__tests__/utils/specHelper.ts b/__tests__/utils/specHelper.ts index a82ae84f..de74f4a5 100644 --- a/__tests__/utils/specHelper.ts +++ b/__tests__/utils/specHelper.ts @@ -1,8 +1,9 @@ -const Redis = require("ioredis"); +import * as Redis from "ioredis"; +import * as NodeResque from "../../src/index"; + const namespace = `resque-test-${process.env.JEST_WORKER_ID || 0}`; const queue = "test_queue"; const pkg = "ioredis"; -const NodeResque = require("../../src/index"); const SpecHelper = { pkg: pkg, @@ -21,11 +22,14 @@ const SpecHelper = { }, connect: async function () { - this.redis = Redis.createClient( + if (!this.connectionDetails.options) this.connectionDetails.options = {}; + this.connectionDetails.options.db = this.connectionDetails?.options?.database; + this.redis = new Redis( this.connectionDetails.port, this.connectionDetails.host, this.connectionDetails.options ); + this.redis.setMaxListeners(0); if ( this.connectionDetails.password !== null && diff --git a/examples/example-mock.ts b/examples/example-mock.ts new file mode 100755 index 00000000..950b9046 --- /dev/null +++ b/examples/example-mock.ts @@ -0,0 +1,176 @@ +#!/usr/bin/env ts-node + +import { Queue, Scheduler, Worker } from "../src"; +/* In your projects: +import { Queue, Scheduler, Worker } from "node-resque"; +*/ + +import * as RedisMock from "ioredis-mock"; + +async function boot() { + // //////////////////////// + // SET UP THE CONNECTION // + // //////////////////////// + + // for ioredis-mock, we need to re-use a shared connection + // setting "pkg" is important! + const connectionDetails = { redis: new RedisMock(), pkg: "ioredis-mock" }; + + // /////////////////////////// + // DEFINE YOUR WORKER TASKS // + // /////////////////////////// + + let jobsToComplete = 0; + + const jobs = { + add: { + plugins: ["JobLock"], + pluginOptions: { + JobLock: {}, + }, + perform: async (a, b) => { + await new Promise((resolve) => { + setTimeout(resolve, 1000); + }); + jobsToComplete--; + tryShutdown(); + + const answer = a + b; + return answer; + }, + }, + subtract: { + perform: (a, b) => { + jobsToComplete--; + tryShutdown(); + + const answer = a - b; + return answer; + }, + }, + }; + + // just a helper for this demo + async function tryShutdown() { + if (jobsToComplete === 0) { + await new Promise((resolve) => { + setTimeout(resolve, 500); + }); + await scheduler.end(); + await worker.end(); + process.exit(); + } + } + + // ///////////////// + // START A WORKER // + // ///////////////// + + const worker = new Worker( + { connection: connectionDetails, queues: ["math", "otherQueue"] }, + jobs + ); + await worker.connect(); + worker.start(); + + // //////////////////// + // START A SCHEDULER // + // //////////////////// + + const scheduler = new Scheduler({ connection: connectionDetails }); + await scheduler.connect(); + scheduler.start(); + + // ////////////////////// + // REGESTER FOR EVENTS // + // ////////////////////// + + worker.on("start", () => { + console.log("worker started"); + }); + worker.on("end", () => { + console.log("worker ended"); + }); + worker.on("cleaning_worker", (worker, pid) => { + console.log(`cleaning old worker ${worker}`); + }); + worker.on("poll", (queue) => { + console.log(`worker polling ${queue}`); + }); + worker.on("ping", (time) => { + console.log(`worker check in @ ${time}`); + }); + worker.on("job", (queue, job) => { + console.log(`working job ${queue} ${JSON.stringify(job)}`); + }); + worker.on("reEnqueue", (queue, job, plugin) => { + console.log(`reEnqueue job (${plugin}) ${queue} ${JSON.stringify(job)}`); + }); + worker.on("success", (queue, job, result, duration) => { + console.log( + `job success ${queue} ${JSON.stringify(job)} >> ${result} (${duration}ms)` + ); + }); + worker.on("failure", (queue, job, failure, duration) => { + console.log( + `job failure ${queue} ${JSON.stringify( + job + )} >> ${failure} (${duration}ms)` + ); + }); + worker.on("error", (error, queue, job) => { + console.log(`error ${queue} ${JSON.stringify(job)} >> ${error}`); + }); + worker.on("pause", () => { + console.log("worker paused"); + }); + + scheduler.on("start", () => { + console.log("scheduler started"); + }); + scheduler.on("end", () => { + console.log("scheduler ended"); + }); + scheduler.on("poll", () => { + console.log("scheduler polling"); + }); + scheduler.on("leader", () => { + console.log("scheduler became leader"); + }); + scheduler.on("error", (error) => { + console.log(`scheduler error >> ${error}`); + }); + scheduler.on("cleanStuckWorker", (workerName, errorPayload, delta) => { + console.log( + `failing ${workerName} (stuck for ${delta}s) and failing job ${errorPayload}` + ); + }); + scheduler.on("workingTimestamp", (timestamp) => { + console.log(`scheduler working timestamp ${timestamp}`); + }); + scheduler.on("transferredJob", (timestamp, job) => { + console.log(`scheduler enquing job ${timestamp} >> ${JSON.stringify(job)}`); + }); + + // ////////////////////// + // CONNECT TO A QUEUE // + // ////////////////////// + + const queue = new Queue({ connection: connectionDetails }, jobs); + queue.on("error", function (error) { + console.log(error); + }); + await queue.connect(); + await queue.enqueue("math", "add", [1, 2]); + await queue.enqueue("math", "add", [1, 2]); + await queue.enqueue("math", "add", [2, 3]); + await queue.enqueueIn(3000, "math", "subtract", [2, 1]); + jobsToComplete = 4; +} + +boot(); + +// and when you are done +// await queue.end() +// await scheduler.end() +// await worker.end() diff --git a/package-lock.json b/package-lock.json index 995dc714..6d8dadd0 100644 --- a/package-lock.json +++ b/package-lock.json @@ -1086,6 +1086,12 @@ "integrity": "sha1-45sJrqne+Gao8gbiiK9jkZuuOcQ=", "dev": true }, + "array-from": { + "version": "2.1.1", + "resolved": "https://registry.npmjs.org/array-from/-/array-from-2.1.1.tgz", + "integrity": "sha1-z+nYwmYoudxa7MYqn12PHzUsEZU=", + "dev": true + }, "array-unique": { "version": "0.3.2", "resolved": "https://registry.npmjs.org/array-unique/-/array-unique-0.3.2.tgz", @@ -1584,6 +1590,16 @@ } } }, + "d": { + "version": "1.0.1", + "resolved": "https://registry.npmjs.org/d/-/d-1.0.1.tgz", + "integrity": "sha512-m62ShEObQ39CfralilEQRjH6oAMtNCV1xJyEx5LpRYUVN+EviphDgUc/F3hnYbADmkiNs67Y+3ylmlG7Lnu+FA==", + "dev": true, + "requires": { + "es5-ext": "^0.10.50", + "type": "^1.0.1" + } + }, "dashdash": { "version": "1.14.1", "resolved": "https://registry.npmjs.org/dashdash/-/dashdash-1.14.1.tgz", @@ -1778,6 +1794,77 @@ "is-arrayish": "^0.2.1" } }, + "es5-ext": { + "version": "0.10.53", + "resolved": "https://registry.npmjs.org/es5-ext/-/es5-ext-0.10.53.tgz", + "integrity": "sha512-Xs2Stw6NiNHWypzRTY1MtaG/uJlwCk8kH81920ma8mvN8Xq1gsfhZvpkImLQArw8AHnv8MT2I45J3c0R8slE+Q==", + "dev": true, + "requires": { + "es6-iterator": "~2.0.3", + "es6-symbol": "~3.1.3", + "next-tick": "~1.0.0" + } + }, + "es6-iterator": { + "version": "2.0.3", + "resolved": "https://registry.npmjs.org/es6-iterator/-/es6-iterator-2.0.3.tgz", + "integrity": "sha1-p96IkUGgWpSwhUQDstCg+/qY87c=", + "dev": true, + "requires": { + "d": "1", + "es5-ext": "^0.10.35", + "es6-symbol": "^3.1.1" + } + }, + "es6-map": { + "version": "0.1.5", + "resolved": "https://registry.npmjs.org/es6-map/-/es6-map-0.1.5.tgz", + "integrity": "sha1-kTbgUD3MBqMBaQ8LsU/042TpSfA=", + "dev": true, + "requires": { + "d": "1", + "es5-ext": "~0.10.14", + "es6-iterator": "~2.0.1", + "es6-set": "~0.1.5", + "es6-symbol": "~3.1.1", + "event-emitter": "~0.3.5" + } + }, + "es6-set": { + "version": "0.1.5", + "resolved": "https://registry.npmjs.org/es6-set/-/es6-set-0.1.5.tgz", + "integrity": "sha1-0rPsXU2ADO2BjbU40ol02wpzzLE=", + "dev": true, + "requires": { + "d": "1", + "es5-ext": "~0.10.14", + "es6-iterator": "~2.0.1", + "es6-symbol": "3.1.1", + "event-emitter": "~0.3.5" + }, + "dependencies": { + "es6-symbol": { + "version": "3.1.1", + "resolved": "https://registry.npmjs.org/es6-symbol/-/es6-symbol-3.1.1.tgz", + "integrity": "sha1-vwDvT9q2uhtG7Le2KbTH7VcVzHc=", + "dev": true, + "requires": { + "d": "1", + "es5-ext": "~0.10.14" + } + } + } + }, + "es6-symbol": { + "version": "3.1.3", + "resolved": "https://registry.npmjs.org/es6-symbol/-/es6-symbol-3.1.3.tgz", + "integrity": "sha512-NJ6Yn3FuDinBaBRWl/q5X/s4koRHBrgKAu+yGI6JCBeiu3qrcbJhwT2GeR/EXVfylRk8dpQVJoLEFhK+Mu31NA==", + "dev": true, + "requires": { + "d": "^1.0.1", + "ext": "^1.1.2" + } + }, "escape-string-regexp": { "version": "1.0.5", "resolved": "https://registry.npmjs.org/escape-string-regexp/-/escape-string-regexp-1.0.5.tgz", @@ -1815,6 +1902,16 @@ "integrity": "sha512-kVscqXk4OCp68SZ0dkgEKVi6/8ij300KBWTJq32P/dYeWTSwK41WyTxalN1eRmA5Z9UU/LX9D7FWSmV9SAYx6g==", "dev": true }, + "event-emitter": { + "version": "0.3.5", + "resolved": "https://registry.npmjs.org/event-emitter/-/event-emitter-0.3.5.tgz", + "integrity": "sha1-34xp7vFkeSPHFXuc6DhAYQsCzDk=", + "dev": true, + "requires": { + "d": "1", + "es5-ext": "~0.10.14" + } + }, "exec-sh": { "version": "0.3.4", "resolved": "https://registry.npmjs.org/exec-sh/-/exec-sh-0.3.4.tgz", @@ -1921,6 +2018,23 @@ } } }, + "ext": { + "version": "1.4.0", + "resolved": "https://registry.npmjs.org/ext/-/ext-1.4.0.tgz", + "integrity": "sha512-Key5NIsUxdqKg3vIsdw9dSuXpPCQ297y6wBjL30edxwPgt2E44WcWBZey/ZvUc6sERLTxKdyCu4gZFmUbk1Q7A==", + "dev": true, + "requires": { + "type": "^2.0.0" + }, + "dependencies": { + "type": { + "version": "2.1.0", + "resolved": "https://registry.npmjs.org/type/-/type-2.1.0.tgz", + "integrity": "sha512-G9absDWvhAWCV2gmF1zKud3OyC61nZDwWvBL2DApaVFogI07CprggiQAOOjvp2NRjYWFzPyu7vwtDrQFq8jeSA==", + "dev": true + } + } + }, "extend": { "version": "3.0.2", "resolved": "https://registry.npmjs.org/extend/-/extend-3.0.2.tgz", @@ -2046,6 +2160,31 @@ "bser": "2.1.1" } }, + "fengari": { + "version": "0.1.4", + "resolved": "https://registry.npmjs.org/fengari/-/fengari-0.1.4.tgz", + "integrity": "sha512-6ujqUuiIYmcgkGz8MGAdERU57EIluGGPSUgGPTsco657EHa+srq0S3/YUl/r9kx1+D+d4rGfYObd+m8K22gB1g==", + "dev": true, + "requires": { + "readline-sync": "^1.4.9", + "sprintf-js": "^1.1.1", + "tmp": "^0.0.33" + }, + "dependencies": { + "sprintf-js": { + "version": "1.1.2", + "resolved": "https://registry.npmjs.org/sprintf-js/-/sprintf-js-1.1.2.tgz", + "integrity": "sha512-VE0SOVEHCk7Qc8ulkWw3ntAzXuqf7S2lvwQaDLRnUeIEaKNQJzV6BwmLKhOqT61aGhfUMrXeaBk+oDGCzvhcug==", + "dev": true + } + } + }, + "fengari-interop": { + "version": "0.1.2", + "resolved": "https://registry.npmjs.org/fengari-interop/-/fengari-interop-0.1.2.tgz", + "integrity": "sha512-8iTvaByZVoi+lQJhHH9vC+c/Yaok9CwOqNQZN6JrVpjmWwW4dDkeblBXhnHC+BoI6eF4Cy5NKW3z6ICEjvgywQ==", + "dev": true + }, "fill-range": { "version": "7.0.1", "resolved": "https://registry.npmjs.org/fill-range/-/fill-range-7.0.1.tgz", @@ -2407,6 +2546,22 @@ "standard-as-callback": "^2.0.1" } }, + "ioredis-mock": { + "version": "5.1.0", + "resolved": "https://registry.npmjs.org/ioredis-mock/-/ioredis-mock-5.1.0.tgz", + "integrity": "sha512-6t5ru9KWz6/U2IfvgPKd69MILfnqgz8s5oj3bPsiI50WavHuCTzlAwHF2Yw/MaEeJM5BkBwPIzY0AdTU4bzscw==", + "dev": true, + "requires": { + "array-from": "^2.1.1", + "es6-map": "^0.1.5", + "es6-set": "^0.1.5", + "fengari": "^0.1.4", + "fengari-interop": "^0.1.2", + "lodash": "^4.17.20", + "minimatch": "^3.0.4", + "standard-as-callback": "^2.0.1" + } + }, "ip-regex": { "version": "2.1.0", "resolved": "https://registry.npmjs.org/ip-regex/-/ip-regex-2.1.0.tgz", @@ -4165,6 +4320,12 @@ "integrity": "sha512-Yd3UES5mWCSqR+qNT93S3UoYUkqAZ9lLg8a7g9rimsWmYGK8cVToA4/sF3RrshdyV3sAGMXVUmpMYOw+dLpOuw==", "dev": true }, + "next-tick": { + "version": "1.0.0", + "resolved": "https://registry.npmjs.org/next-tick/-/next-tick-1.0.0.tgz", + "integrity": "sha1-yobR/ogoFpsBICCOPchCS524NCw=", + "dev": true + }, "nice-try": { "version": "1.0.5", "resolved": "https://registry.npmjs.org/nice-try/-/nice-try-1.0.5.tgz", @@ -4354,6 +4515,12 @@ "word-wrap": "~1.2.3" } }, + "os-tmpdir": { + "version": "1.0.2", + "resolved": "https://registry.npmjs.org/os-tmpdir/-/os-tmpdir-1.0.2.tgz", + "integrity": "sha1-u+Z0BseaqFxc/sdm/lc0VV36EnQ=", + "dev": true + }, "p-each-series": { "version": "2.1.0", "resolved": "https://registry.npmjs.org/p-each-series/-/p-each-series-2.1.0.tgz", @@ -4592,6 +4759,12 @@ "type-fest": "^0.8.1" } }, + "readline-sync": { + "version": "1.4.10", + "resolved": "https://registry.npmjs.org/readline-sync/-/readline-sync-1.4.10.tgz", + "integrity": "sha512-gNva8/6UAe8QYepIQH/jQ2qn91Qj0B9sYjMBBs3QOB8F2CXcKgLxQaJRP76sWVRQt+QU+8fAkCbCvjjMFu7Ycw==", + "dev": true + }, "rechoir": { "version": "0.6.2", "resolved": "https://registry.npmjs.org/rechoir/-/rechoir-0.6.2.tgz", @@ -5427,6 +5600,15 @@ "integrity": "sha512-fcwX4mndzpLQKBS1DVYhGAcYaYt7vsHNIvQV+WXMvnow5cgjPphq5CaayLaGsjRdSCKZFNGt7/GYAuXaNOiYCA==", "dev": true }, + "tmp": { + "version": "0.0.33", + "resolved": "https://registry.npmjs.org/tmp/-/tmp-0.0.33.tgz", + "integrity": "sha512-jRCJlojKnZ3addtTOjdIqoRuPEKBvNXcGYqzO6zWZX8KfKEpnGY5jfggJQ3EjKuu8D4bJRr0y+cYJFmYbImXGw==", + "dev": true, + "requires": { + "os-tmpdir": "~1.0.2" + } + }, "tmpl": { "version": "1.0.4", "resolved": "https://registry.npmjs.org/tmpl/-/tmpl-1.0.4.tgz", @@ -5561,6 +5743,12 @@ "integrity": "sha1-WuaBd/GS1EViadEIr6k/+HQ/T2Q=", "dev": true }, + "type": { + "version": "1.2.0", + "resolved": "https://registry.npmjs.org/type/-/type-1.2.0.tgz", + "integrity": "sha512-+5nt5AAniqsCnu2cEQQdpzCAh33kVx8n0VoFidKpB1dVVLAN/F+bgVOqOJqOnEnrhp222clB5p3vUlD+1QAnfg==", + "dev": true + }, "type-check": { "version": "0.3.2", "resolved": "https://registry.npmjs.org/type-check/-/type-check-0.3.2.tgz", diff --git a/package.json b/package.json index d56113f7..c23a8299 100644 --- a/package.json +++ b/package.json @@ -32,6 +32,7 @@ "@types/ioredis": "^4.17.7", "@types/jest": "^26.0.15", "@types/node": "^14.14.6", + "ioredis-mock": "^5.1.0", "jest": "^26.6.3", "node-schedule": "^1.3.2", "prettier": "^2.1.2", @@ -50,7 +51,7 @@ }, "scripts": { "prepare": "npm run build && npm run docs", - "pretest": "npm run lint", + "pretest": "npm run lint && npm run build", "lint": "prettier --check src __tests__ examples \"*.md\"", "pretty": "prettier --write src __tests__ examples \"**/*.md\"", "test": "jest", diff --git a/src/core/connection.ts b/src/core/connection.ts index 229879f5..abba2d46 100644 --- a/src/core/connection.ts +++ b/src/core/connection.ts @@ -3,7 +3,6 @@ import { EventEmitter } from "events"; import * as IORedis from "ioredis"; import * as fs from "fs"; -import * as os from "os"; import * as path from "path"; import { ConnectionOptions } from "../types/options"; @@ -61,17 +60,19 @@ export class Connection extends EventEmitter { this.redis = this.options.redis; await connectionTestAndLoadLua(); } else { - if (this.options.pkg === "ioredis") { - const Pkg = IORedis; - this.options.options.db = this.options.database; - this.redis = new Pkg( + const Pkg = require(this.options.pkg); + if ( + typeof Pkg.createClient === "function" && + this.options.pkg !== "ioredis" + ) { + this.redis = Pkg.createClient( this.options.port, this.options.host, this.options.options ); } else { - const Pkg = require(this.options.pkg); - this.redis = Pkg.createClient( + this.options.options.db = this.options.database; + this.redis = new Pkg( this.options.port, this.options.host, this.options.options @@ -88,13 +89,17 @@ export class Connection extends EventEmitter { this.redis.on("error", (err) => this.eventListeners.error(err)); this.redis.on("end", () => this.eventListeners.end()); - if (!this.options.redis) { + if (!this.options.redis && typeof this.redis.select === "function") { await this.redis.select(this.options.database); } + await connectionTestAndLoadLua(); } loadLua() { + // even though ioredis-mock can run LUA, cjson is not available + if (this.options.pkg === "ioredis-mock") return; + const luaDir = path.join(__dirname, "..", "..", "lua"); const files = fs.readdirSync(luaDir); diff --git a/src/core/scheduler.ts b/src/core/scheduler.ts index 01f7c0c2..df832d32 100644 --- a/src/core/scheduler.ts +++ b/src/core/scheduler.ts @@ -268,10 +268,8 @@ export class Scheduler extends EventEmitter { private async cleanupTimestamp(timestamp: number) { const key = this.connection.key("delayed:" + timestamp); - await this.connection.redis.watch(key); - await this.connection.redis.watch( - this.connection.key("delayed_queue_schedule") - ); + await this.watchIfPossible(key); + await this.watchIfPossible(this.connection.key("delayed_queue_schedule")); const length = await this.connection.redis.llen(key); if (length === 0) { await this.connection.redis @@ -280,7 +278,7 @@ export class Scheduler extends EventEmitter { .zrem(this.connection.key("delayed_queue_schedule"), timestamp) .exec(); } - await this.connection.redis.unwatch(); + await this.unwatchIfPossible(); } private async checkStuckWorkers() { @@ -325,4 +323,16 @@ export class Scheduler extends EventEmitter { const errorPayload = await this.queue.forceCleanWorker(workerName); this.emit("cleanStuckWorker", workerName, errorPayload, delta); } + + private async watchIfPossible(key: string) { + if (typeof this.connection.redis.watch === "function") { + return this.connection.redis.watch(key); + } + } + + private async unwatchIfPossible() { + if (typeof this.connection.redis.unwatch === "function") { + return this.connection.redis.unwatch(); + } + } } diff --git a/src/core/worker.ts b/src/core/worker.ts index a46e922a..939012a1 100644 --- a/src/core/worker.ts +++ b/src/core/worker.ts @@ -422,13 +422,30 @@ export class Worker extends EventEmitter { this.stringQueues() ); - const encodedJob: string = await this.connection.redis["popAndStoreJob"]( - queueKey, - workerKey, - new Date().toString(), - this.queue, - this.name - ); + let encodedJob: string; + + if (this.connection.redis["popAndStoreJob"]) { + encodedJob = await this.connection.redis["popAndStoreJob"]( + queueKey, + workerKey, + new Date().toString(), + this.queue, + this.name + ); + } else { + encodedJob = await this.connection.redis.lpop(queueKey); + if (encodedJob) { + await this.connection.redis.set( + workerKey, + JSON.stringify({ + run_at: new Date().toString(), + queue: this.queue, + worker: this.name, + payload: JSON.parse(encodedJob), + }) + ); + } + } if (encodedJob) currentJob = JSON.parse(encodedJob);