Skip to content

Commit

Permalink
Node Rescue works with ioredis-mock (#475)
Browse files Browse the repository at this point in the history
* spec helper uses imports

* ioredis-mock works

* make methods private

* pretest command includes build

* test both ioredis and ioredis-mock the same way
  • Loading branch information
evantahler authored Nov 16, 2020
1 parent b5aa9d5 commit c908bfc
Show file tree
Hide file tree
Showing 9 changed files with 601 additions and 24 deletions.
87 changes: 87 additions & 0 deletions __tests__/integration/ioredis-mock.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
import { Queue, Worker, Scheduler } from "../../src";
import * as RedisMock from "ioredis-mock";
import specHelper from "../utils/specHelper";

// for ioredis-mock, we need to re-use a shared connection
// setting "pkg" is important!
const REDIS = new RedisMock();
const connectionDetails = { redis: REDIS, pkg: "ioredis-mock" };

const jobs = {
add: {
perform: async (a, b) => {
const response = a + b;
return response;
},
},
};

describe("testing with ioredis-mock package", () => {
let queue: Queue;
let scheduler: Scheduler;
let worker: Worker;

afterAll(async () => {
await queue.end();
await scheduler.end();
await worker.end();
});

test("a queue can be created", async () => {
queue = new Queue(
{ connection: connectionDetails, queues: ["math"] },
jobs
);
await queue.connect();
});

test("a scheduler can be created", async () => {
scheduler = new Scheduler(
{ connection: connectionDetails, queues: ["math"] },
jobs
);
await scheduler.connect();
// await scheduler.start();
});

test("a worker can be created", async () => {
worker = new Worker(
{
connection: connectionDetails,
queues: ["math"],
timeout: specHelper.timeout,
},
jobs
);
await worker.connect();
// worker.start();
});

test("a job can be enqueued", async () => {
await queue.enqueueIn(1, "math", "add", [1, 2]);
const times = await queue.scheduledAt("math", "add", [1, 2]);
expect(times.length).toBe(1);
});

test("the scheduler can promote the job", async () => {
await scheduler.poll();
const times = await queue.scheduledAt("math", "add", [1, 2]);
expect(times.length).toBe(0);
const jobsLength = await queue.length("math");
expect(jobsLength).toBe(1);
});

test("the worker can work the job", async (done) => {
await worker.start();
worker.on("success", async (q, job, result, duration) => {
expect(q).toBe("math");
expect(job.class).toBe("add");
expect(result).toBe(3);
expect(worker.result).toBe(result);
expect(duration).toBeGreaterThanOrEqual(0);

worker.removeAllListeners("success");
done();
});
});
});
89 changes: 89 additions & 0 deletions __tests__/integration/ioredis.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
import { Queue, Worker, Scheduler } from "../../src";
import specHelper from "../utils/specHelper";

const connectionDetails = {
pkg: "ioredis",
host: "127.0.0.1",
password: null,
port: 6379,
database: parseInt(process.env.JEST_WORKER_ID || "0"),
};

const jobs = {
add: {
perform: async (a, b) => {
const response = a + b;
return response;
},
},
};

describe("testing with ioredis package", () => {
let queue: Queue;
let scheduler: Scheduler;
let worker: Worker;

afterAll(async () => {
await queue.end();
await scheduler.end();
await worker.end();
});

test("a queue can be created", async () => {
queue = new Queue(
{ connection: connectionDetails, queues: ["math"] },
jobs
);
await queue.connect();
});

test("a scheduler can be created", async () => {
scheduler = new Scheduler(
{ connection: connectionDetails, queues: ["math"] },
jobs
);
await scheduler.connect();
// await scheduler.start();
});

test("a worker can be created", async () => {
worker = new Worker(
{
connection: connectionDetails,
queues: ["math"],
timeout: specHelper.timeout,
},
jobs
);
await worker.connect();
// worker.start();
});

test("a job can be enqueued", async () => {
await queue.enqueueIn(1, "math", "add", [1, 2]);
const times = await queue.scheduledAt("math", "add", [1, 2]);
expect(times.length).toBe(1);
});

test("the scheduler can promote the job", async () => {
await scheduler.poll();
const times = await queue.scheduledAt("math", "add", [1, 2]);
expect(times.length).toBe(0);
const jobsLength = await queue.length("math");
expect(jobsLength).toBe(1);
});

test("the worker can work the job", async (done) => {
await worker.start();
worker.on("success", async (q, job, result, duration) => {
expect(q).toBe("math");
expect(job.class).toBe("add");
expect(result).toBe(3);
expect(worker.result).toBe(result);
expect(duration).toBeGreaterThanOrEqual(0);

worker.removeAllListeners("success");
done();
});
});
});
10 changes: 7 additions & 3 deletions __tests__/utils/specHelper.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
const Redis = require("ioredis");
import * as Redis from "ioredis";
import * as NodeResque from "../../src/index";

const namespace = `resque-test-${process.env.JEST_WORKER_ID || 0}`;
const queue = "test_queue";
const pkg = "ioredis";
const NodeResque = require("../../src/index");

const SpecHelper = {
pkg: pkg,
Expand All @@ -21,11 +22,14 @@ const SpecHelper = {
},

connect: async function () {
this.redis = Redis.createClient(
if (!this.connectionDetails.options) this.connectionDetails.options = {};
this.connectionDetails.options.db = this.connectionDetails?.options?.database;
this.redis = new Redis(
this.connectionDetails.port,
this.connectionDetails.host,
this.connectionDetails.options
);

this.redis.setMaxListeners(0);
if (
this.connectionDetails.password !== null &&
Expand Down
176 changes: 176 additions & 0 deletions examples/example-mock.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,176 @@
#!/usr/bin/env ts-node

import { Queue, Scheduler, Worker } from "../src";
/* In your projects:
import { Queue, Scheduler, Worker } from "node-resque";
*/

import * as RedisMock from "ioredis-mock";

async function boot() {
// ////////////////////////
// SET UP THE CONNECTION //
// ////////////////////////

// for ioredis-mock, we need to re-use a shared connection
// setting "pkg" is important!
const connectionDetails = { redis: new RedisMock(), pkg: "ioredis-mock" };

// ///////////////////////////
// DEFINE YOUR WORKER TASKS //
// ///////////////////////////

let jobsToComplete = 0;

const jobs = {
add: {
plugins: ["JobLock"],
pluginOptions: {
JobLock: {},
},
perform: async (a, b) => {
await new Promise((resolve) => {
setTimeout(resolve, 1000);
});
jobsToComplete--;
tryShutdown();

const answer = a + b;
return answer;
},
},
subtract: {
perform: (a, b) => {
jobsToComplete--;
tryShutdown();

const answer = a - b;
return answer;
},
},
};

// just a helper for this demo
async function tryShutdown() {
if (jobsToComplete === 0) {
await new Promise((resolve) => {
setTimeout(resolve, 500);
});
await scheduler.end();
await worker.end();
process.exit();
}
}

// /////////////////
// START A WORKER //
// /////////////////

const worker = new Worker(
{ connection: connectionDetails, queues: ["math", "otherQueue"] },
jobs
);
await worker.connect();
worker.start();

// ////////////////////
// START A SCHEDULER //
// ////////////////////

const scheduler = new Scheduler({ connection: connectionDetails });
await scheduler.connect();
scheduler.start();

// //////////////////////
// REGESTER FOR EVENTS //
// //////////////////////

worker.on("start", () => {
console.log("worker started");
});
worker.on("end", () => {
console.log("worker ended");
});
worker.on("cleaning_worker", (worker, pid) => {
console.log(`cleaning old worker ${worker}`);
});
worker.on("poll", (queue) => {
console.log(`worker polling ${queue}`);
});
worker.on("ping", (time) => {
console.log(`worker check in @ ${time}`);
});
worker.on("job", (queue, job) => {
console.log(`working job ${queue} ${JSON.stringify(job)}`);
});
worker.on("reEnqueue", (queue, job, plugin) => {
console.log(`reEnqueue job (${plugin}) ${queue} ${JSON.stringify(job)}`);
});
worker.on("success", (queue, job, result, duration) => {
console.log(
`job success ${queue} ${JSON.stringify(job)} >> ${result} (${duration}ms)`
);
});
worker.on("failure", (queue, job, failure, duration) => {
console.log(
`job failure ${queue} ${JSON.stringify(
job
)} >> ${failure} (${duration}ms)`
);
});
worker.on("error", (error, queue, job) => {
console.log(`error ${queue} ${JSON.stringify(job)} >> ${error}`);
});
worker.on("pause", () => {
console.log("worker paused");
});

scheduler.on("start", () => {
console.log("scheduler started");
});
scheduler.on("end", () => {
console.log("scheduler ended");
});
scheduler.on("poll", () => {
console.log("scheduler polling");
});
scheduler.on("leader", () => {
console.log("scheduler became leader");
});
scheduler.on("error", (error) => {
console.log(`scheduler error >> ${error}`);
});
scheduler.on("cleanStuckWorker", (workerName, errorPayload, delta) => {
console.log(
`failing ${workerName} (stuck for ${delta}s) and failing job ${errorPayload}`
);
});
scheduler.on("workingTimestamp", (timestamp) => {
console.log(`scheduler working timestamp ${timestamp}`);
});
scheduler.on("transferredJob", (timestamp, job) => {
console.log(`scheduler enquing job ${timestamp} >> ${JSON.stringify(job)}`);
});

// //////////////////////
// CONNECT TO A QUEUE //
// //////////////////////

const queue = new Queue({ connection: connectionDetails }, jobs);
queue.on("error", function (error) {
console.log(error);
});
await queue.connect();
await queue.enqueue("math", "add", [1, 2]);
await queue.enqueue("math", "add", [1, 2]);
await queue.enqueue("math", "add", [2, 3]);
await queue.enqueueIn(3000, "math", "subtract", [2, 1]);
jobsToComplete = 4;
}

boot();

// and when you are done
// await queue.end()
// await scheduler.end()
// await worker.end()
Loading

0 comments on commit c908bfc

Please sign in to comment.