Skip to content

Commit

Permalink
fix(flow): respect defaultJobOptions from queue opts (#1080) fixes #1034
Browse files Browse the repository at this point in the history




* docs(flow): add descriptions of opts param
  • Loading branch information
roggervalf authored Feb 20, 2022
1 parent 3b3ff30 commit 0aca072
Show file tree
Hide file tree
Showing 3 changed files with 158 additions and 5 deletions.
44 changes: 39 additions & 5 deletions docs/gitbook/guide/flows/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ The above call will return instances for all the jobs added to the queue.
Note that the parent queue does not need to be the same queue as the one used for the children.
{% endhint %}

When the parent job is processed it is possible to access the results generated by its child jobs. For example lets assume the following worker for the child jobs:
When the parent job is processed it is possible to access the results generated by its child jobs. For example, lets assume the following worker for the child jobs:

```typescript
import { Worker } from 'bullmq';
Expand All @@ -69,7 +69,7 @@ const stepsQueue = new Worker('steps', async job => {
});
```

we can implement a parent worker that sums the costs of the children's jobs using the "_getChildrenValues_" method. This method returns an object with job keys as keys and the result of that given job as a value:
We can implement a parent worker that sums the costs of the children's jobs using the "_getChildrenValues_" method. This method returns an object with job keys as keys and the result of that given job as a value:

```typescript
import { Worker } from 'bullmq';
Expand Down Expand Up @@ -115,21 +115,21 @@ The order of processing would be: 'chassis', 'wheels' and finally 'engine'.

## Getters

There are some special getters that can be used in order to get jobs related to a flow. First we have a method in the Job class to get all the dependencies for a given job:
There are some special getters that can be used in order to get jobs related to a flow. First, we have a method in the Job class to get all the dependencies for a given job:

```typescript
const dependencies = await job.getDependencies();
```

it will return all the **direct** **dependencies**, i.e. the children of a given job.

The Job class also provides a another method that we presented above to get all the values produced by the children of a given job:
The Job class also provides another method that we presented above to get all the values produced by the children of a given job:

```typescript
const values = await job.getChildrenValues();
```

Also a new property is available in the Job class, _**parentKey,**_ with a fully qualified key for the job parent.
Also, a new property is available in the Job class, _**parentKey,**_ with a fully qualified key for the job parent.

Finally, there is also a new state where a job can be in, "waiting-children", for parent jobs that have not yet had their children completed:

Expand All @@ -138,6 +138,40 @@ const state = await job.getState();
// state will be "waiting-children"
```

## Provide options

Something to take in count that when adding a flow, there is an extra param **opts**, where you can add your queue options, this is a dictionary of options for the queues that you use in your flow, these options would affect each of the jobs that belongs to the mapped queue options.

```typescript
import { FlowProducer } from 'bullmq';
const flowProducer = new FlowProducer();

const queueName = 'assembly-line';
const chain = await flowProducer.add(
{
name: 'car',
data: { step: 'engine' },
queueName,
children: [
{
name: 'car',
data: { step: 'wheels' },
queueName,
},
],
},
{
queuesOptions: {
[queueName]: {
defaultJobOptions: {
removeOnComplete: true,
},
},
},
},
);
```

## Jobs removal

BullMQ also provides seamless removal functionality for jobs that are part of a flow.
Expand Down
4 changes: 4 additions & 0 deletions src/classes/flow-producer.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
import { EventEmitter } from 'events';
import { get } from 'lodash';
import { Redis, Pipeline } from 'ioredis';
import { v4 } from 'uuid';
import {
FlowJob,
FlowQueuesOpts,
FlowOpts,
QueueBaseOptions,
QueueOptions,
RedisClient,
} from '../interfaces';
import { getParentKey, jobIdForGroup } from '../utils';
Expand Down Expand Up @@ -201,13 +203,15 @@ export class FlowProducer extends EventEmitter {
);
const queueOpts = queuesOpts && queuesOpts[node.queueName];

const jobsOpts = get(queueOpts, 'defaultJobOptions');
const jobId = jobIdForGroup(node.opts, node.data, queueOpts) || v4();

const job = new Job(
queue,
node.name,
node.data,
{
...(jobsOpts ? jobsOpts : {}),
...node.opts,
parent: parent?.parentOpts,
},
Expand Down
115 changes: 115 additions & 0 deletions tests/test_flow.ts
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,121 @@ describe('flows', () => {
await removeAllQueueData(new IORedis(), parentQueueName);
});

describe('when defaultJobOptions is provided', async () => {
it('processes children before the parent', async () => {
const parentQueueName = `parent-queue-${v4()}`;
const parentQueue = new Queue(parentQueueName, { connection });
const name = 'child-job';
const values = [
{ bar: 'something' },
{ baz: 'something' },
{ qux: 'something' },
];

let childrenProcessor,
processedChildren = 0;
const processingChildren = new Promise<void>(
resolve =>
(childrenProcessor = async (job: Job) => {
processedChildren++;

if (processedChildren == values.length) {
resolve();
}
return values[job.data.idx];
}),
);

const parentProcessor = async (job: Job) => {
const { processed, nextProcessedCursor } = await job.getDependencies({
processed: {},
});
expect(nextProcessedCursor).to.be.equal(0);
expect(Object.keys(processed)).to.have.length(3);

const childrenValues = await job.getChildrenValues();

for (let i = 0; i < values.length; i++) {
const jobKey = queue.toKey(tree.children[i].job.id);
expect(childrenValues[jobKey]).to.be.deep.equal(values[i]);
}
};

const parentWorker = new Worker(parentQueueName, parentProcessor, {
connection,
});
const childrenWorker = new Worker(queueName, childrenProcessor, {
connection,
});
await parentWorker.waitUntilReady();
await childrenWorker.waitUntilReady();

const completed = new Promise<void>(resolve => {
parentWorker.on('completed', async (job: Job) => {
expect(job.finishedOn).to.be.string;
const gotJob = await parentQueue.getJob(job.id);
expect(gotJob).to.be.undefined;
const counts = await parentQueue.getJobCounts('completed');
expect(counts.completed).to.be.equal(0);
resolve();
});
});

const flow = new FlowProducer({ connection });
const tree = await flow.add(
{
name: 'parent-job',
queueName: parentQueueName,
data: {},
children: [
{ name, data: { idx: 0, foo: 'bar' }, queueName },
{ name, data: { idx: 1, foo: 'baz' }, queueName },
{ name, data: { idx: 2, foo: 'qux' }, queueName },
],
},
{
queuesOptions: {
[parentQueueName]: {
defaultJobOptions: {
removeOnComplete: true,
},
},
},
},
);

expect(tree).to.have.property('job');
expect(tree).to.have.property('children');

const { children, job } = tree;
const parentState = await job.getState();

expect(parentState).to.be.eql('waiting-children');
expect(children).to.have.length(3);

expect(children[0].job.id).to.be.ok;
expect(children[0].job.data.foo).to.be.eql('bar');
expect(children[0].job.parent).to.deep.equal({
id: job.id,
queueKey: `bull:${parentQueueName}`,
});
expect(children[1].job.id).to.be.ok;
expect(children[1].job.data.foo).to.be.eql('baz');
expect(children[2].job.id).to.be.ok;
expect(children[2].job.data.foo).to.be.eql('qux');

await processingChildren;
await childrenWorker.close();

await completed;
await parentWorker.close();

await flow.close();

await removeAllQueueData(new IORedis(), parentQueueName);
});
});

describe('when backoff strategy is provided', async () => {
it('retries a job after a delay if a fixed backoff is given', async () => {
const name = 'child-job';
Expand Down

0 comments on commit 0aca072

Please sign in to comment.