Skip to content

Commit

Permalink
fix(rate-limit): consider paused queue when dynamic rate limit (#1884)
Browse files Browse the repository at this point in the history
  • Loading branch information
roggervalf authored May 9, 2023
1 parent a880069 commit a23f37e
Show file tree
Hide file tree
Showing 3 changed files with 119 additions and 54 deletions.
3 changes: 3 additions & 0 deletions src/classes/scripts.ts
Original file line number Diff line number Diff line change
Expand Up @@ -889,6 +889,9 @@ export class Scripts {
this.queue.keys.wait,
this.queue.keys.stalled,
lockKey,
this.queue.keys.paused,
this.queue.keys.meta,
this.queue.keys.events,
];

const args = [jobId, token];
Expand Down
Original file line number Diff line number Diff line change
@@ -1,18 +1,24 @@
--[[
Function to move job from active state to wait.
Input:
keys[1] active key
keys[2] wait key
KEYS[1] active key
KEYS[2] wait key
keys[3] stalled key
keys[4] job lock key
KEYS[3] stalled key
KEYS[4] job lock key
KEYS[5] paused key
KEYS[6] meta key
KEYS[7] event key
args[1] job id
args[2] lock token
]]
local rcall = redis.call

-- Includes
--- @include "includes/getTargetQueueList"

local jobId = ARGV[1]
local token = ARGV[2]
local lockKey = KEYS[4]
Expand All @@ -21,8 +27,13 @@ local lockToken = rcall("GET", lockKey)
if lockToken == token then
local removed = rcall("LREM", KEYS[1], 1, jobId)
if (removed > 0) then
local target = getTargetQueueList(KEYS[6], KEYS[2], KEYS[5])

rcall("SREM", KEYS[3], jobId)
rcall("RPUSH", KEYS[2], jobId);
rcall("RPUSH", target, jobId)
rcall("DEL", lockKey)

-- Emit waiting event
rcall("XADD", KEYS[7], "*", "event", "waiting", "jobId", jobId)
end
end
149 changes: 100 additions & 49 deletions tests/test_rate_limiter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -261,65 +261,116 @@ describe('Rate Limiter', function () {
await worker.close();
});

it('should obey the rate limit with dynamic limit', async function () {
this.timeout(5000);
describe('when dynamic limit is used', () => {
it('should obey the rate limit', async function () {
this.timeout(5000);

const numJobs = 10;
const dynamicLimit = 250;
const duration = 100;
const margin = 0.95; // 5% margin for CI
const numJobs = 10;
const dynamicLimit = 250;
const duration = 100;
const margin = 0.95; // 5% margin for CI

const worker = new Worker(
queueName,
async job => {
if (job.attemptsMade === 1) {
await worker.rateLimit(dynamicLimit);
throw Worker.RateLimitError();
}
},
{
connection,
limiter: {
max: 1,
duration,
const worker = new Worker(
queueName,
async job => {
if (job.attemptsMade === 1) {
await worker.rateLimit(dynamicLimit);
throw Worker.RateLimitError();
}
},
},
);
{
connection,
limiter: {
max: 1,
duration,
},
},
);

const result = new Promise<void>((resolve, reject) => {
queueEvents.on(
'completed',
// after every job has been completed
after(numJobs, async () => {
await worker.close();
const result = new Promise<void>((resolve, reject) => {
queueEvents.on(
'completed',
// after every job has been completed
after(numJobs, async () => {
await worker.close();

try {
const timeDiff = new Date().getTime() - startTime;
expect(timeDiff).to.be.gte(
(numJobs * dynamicLimit + numJobs * duration) * margin,
);
resolve();
} catch (err) {
reject(err);
}
}),
);
try {
const timeDiff = new Date().getTime() - startTime;
expect(timeDiff).to.be.gte(
(numJobs * dynamicLimit + numJobs * duration) * margin,
);
resolve();
} catch (err) {
reject(err);
}
}),
);

queueEvents.on('failed', async err => {
await worker.close();
reject(err);
queueEvents.on('failed', async err => {
await worker.close();
reject(err);
});
});

const startTime = new Date().getTime();
const jobs = Array.from(Array(numJobs).keys()).map(() => ({
name: 'rate test',
data: {},
}));
await queue.addBulk(jobs);

await result;
await worker.close();
});

const startTime = new Date().getTime();
const jobs = Array.from(Array(numJobs).keys()).map(() => ({
name: 'rate test',
data: {},
}));
await queue.addBulk(jobs);
describe('when queue is paused', () => {
it('moves job to paused', async function () {
const dynamicLimit = 250;
const duration = 100;

const worker = new Worker(
queueName,
async job => {
if (job.attemptsMade === 1) {
await queue.pause();
await delay(150);
await worker.rateLimit(dynamicLimit);
throw Worker.RateLimitError();
}
},
{
connection,
autorun: false,
limiter: {
max: 1,
duration,
},
},
);

await result;
await worker.close();
const result = new Promise<void>((resolve, reject) => {
queueEvents.on(
'waiting',
// after every job has been moved to waiting again
after(2, () => {
resolve();
}),
);
});

await delay(200);
await queue.add('rate test', {});

worker.run();

await result;

const pausedCount = await queue.getJobCountByTypes('paused');
expect(pausedCount).to.equal(1);

await worker.close();
});
});
});

describe('when there are more added jobs than max limiter', () => {
Expand Down

0 comments on commit a23f37e

Please sign in to comment.