Jobstream Queue Engine is a wrapper around SQS. The goals of this project are to:
- Simplify the steps required for setting up asynchronous queue-based processing by abstracting away all of the boilerplate scaffolding and usage of the AWS SQS client SDK
- Provide an extremely simple interface for enqueueing and dequeueing messages to and from an AWS SQS queue, leaving your code clean and easy to reason about
Using npm:
$> npm install -s jobstream-queue-engine
In NodeJS:
- Create a class for each asynchronous processing job. See the example-jobstream-workers folder of this repository for an example.
// Import the Jobstream class
import Jobstream from "jobstream-queue-engine";
// Create a class that extends Jobstream. It must include 2 methods:
// 1. `static configFilePath()` - Path to a YAML config file that includes the following keys:
// * sqs_queue_name
// * aws_region
// * aws_account_id
// 2. `async perform({ message })` - Processing logic that the asynchronous job is intended
// to carry out.
class SampleAsyncJob extends Jobstream {
static configFilePath() {
/*
* The following keys are required in the config file:
* - sqs_queue_name (example: 'SampleQueueFifo')
* - aws_region (example: 'us-east-1')
* - aws_account_id (example: '123456789012')
*/
return '/path/to/config.yaml';
}
async perform({ message }) {
/*
* Some set of steps to execute for processing the incoming message.
*
* --IMPORTANT--
* - A truthy return value will proceed to delete the message from SQS so that it is
* not re-enqueued.
* - A falsy return value will have SQS re-enqueue the message. AWS SQS will automatically
* re-enqueue messages up to the configured maxReceiveCount of the queue, or indefinitely
* if not configured.
*/
}
}
- Create an instance of the class you just created using the static "create" method that is exposed by
Jobstream
// The workerId argument is optional. When supplied, it appends the workerId to the job's logs.
// This is particularly useful in the likely scenario that you have multiple instances of the
// same type of job simultaenously processing messages.
const sampleRequestAsyncJob = await SampleRequestAsyncJob.create({ workerId: workerId });
- Enqueue a message using any instance
// Note: Messages are base64 encoded in transit. Encryption can be enabled on AWS for more
// stringent security requirements.
const response = await sampleRequestAsyncJob.enqueue({
message: {
foo: 'bar',
},
});
- Dequeue a message using any instance
// Note: The dequeue method will poll SQS for a message. If a message is received, it will then
// be base64 decoded and supplied as the 'message' parameter in your class's
// 'perform({ message })' method.
const message = await sampleRequestAsyncJob.dequeue();
For a complete example end-to-end, see this repo.
- JSON-formatted config files
- Support for message 'attributes' (different from the message 'body')
- SQS queue setup and configuration via AWS Cloudformation
- Integration with SNS and/or AWS Kinesis