- Installation
- Module Initialization
- RmqGlobalService
- Module-Specific Configuration
- Throw error and handling
Start by installing the @diy0r/nestjs-rabbitmq
package:
npm i @diy0r/nestjs-rabbitmq
In your root module, import RmqModule
:
import { RmqModule, IRMQExtendedOptions } from '@diy0r/nestjs-rabbitmq';
const extendedOptions: IRMQExtendedOptions = {
typeChannel: TypeChannel.CONFIR_CHANNEL,
globalBroker: {
replyTo: {
queue: '',
options: { exclusive: true },
consumeOptions: { noAck: true },
},
//..
},
//...
};
@Module({
imports: [
RmqModule.forRoot({
connectOptions: {
username: 'username',
password: 'password',
hostname: 'localhost',
port: 5672,
vhost: '/',
protocol: 'amqp',
},
extendedOptions, //optional
}),
],
providers: [SomeService],
})
export class AppModule {}
@Module({
imports: [
RmqModule.forRootAsync({
useFactory: async (...providers: any[]) => ({
connectOptions: {
username: 'username',
password: 'password',
hostname: 'localhost',
port: 5672,
vhost: '/',
protocol: 'amqp',
},
extendedOptions, //optional
}),
inject: [],
imports: [],
}),
],
providers: [SomeService],
})
export class AppModule {}
You can also connect using the standard URI. For more information, see the RabbitMQ URI Specification.
RmqModule.forRoot({
connectOptions: 'amqp://username:password@localhost:5672/',
extendedOptions, //optional
});
In forRoot({ connectOptions, extendedOptions })
, we pass an object consisting of two parameters. The first, connectOptions, is used for connections, and the second, extendedOptions, is for additional settings.
-
connectOptions: IRMQConnectConfig - standard parameters amqlib.connect for connection. Parameters can be an object or a URI.
-
extendedOptions: IRMQExtendedOptions - environment settings (optional)
- typeChannel - channel type, default is
TypeChannel.CHANNEL
- prefetch - Configuration for message prefetching. See
- count - The maximum number of messages that can be sent over the channel before receiving an acknowledgment. A falsy value indicates no limit.
- isGlobal - If
true
, applies the prefetch limit to the entire channel (for versions before RabbitMQ 3.3.0). For newer versions, this parameter is ignored as prefetch is applied per consumer.
- socketOptions - here you can configure SSL/TLS. See SSL/TLS and Client properties
- globalBroker - specify if you want to use RPC from
RmqGlobalService
- replyTo - needed for setting up the
replyTo
queue- queue - queue name. It's recommended to leave it as an empty string so RabbitMQ will automatically generate a unique name.
- options - options passed as the second parameter in assertQueue
- consumeOptions - configure the consumer with a callback that will be invoked with each message. See
- errorHandler (class) - custom error handler for dealing with errors from replies that extends
RMQErrorHandler
.
- serDes (object) - The serDes parameter is an object that defines serialization and deserialization functions for handling messages. See
- messageTimeout - Number of milliseconds the
send
method will wait for the response before a timeout error. Default is 40,000. - serviceName - an arbitrary identifier for the originating application
- replyTo - needed for setting up the
- typeChannel - channel type, default is
We recommend specifying the TypeChannel.CONFIR_CHANNEL
channel type to get more accurate statuses for your requests.
After successful connection, RmqGlobalService
will be available globally and you will be able to import your module from any part of your application and send messages.
@Injectable()
export class SomeService {
constructor(private readonly rmqGlobalService: RmqGlobalService) {}
//...
}
We recommend specifying interfaces for the objects you send and receive.
interface IReply {
message: object;
}
interface ISend {
age: number;
name: string;
}
async sendNotify(obj: ISend) {
const { status } = await this.rmqGlobalService.notify<ISend>(
'exchange',
'user.login',
obj,
);
return status;
}
- 'this.rmqGlobalService.notify(exchange, routingKey, options)' - asynchronous request that returns the message status
- exchange - exchange name
- routingKey - routing key
- options - standard publish options (optional)
If you have defined globalBroker
in forRoot
, you will have access to the RPC method send
,otherwise, you will catch an error when calling it.
async sendMessage(obj: ISend) {
const message = await this.rmqGlobalService.send<ISend, IReply>(
'exchange name',
'user.rpc',
obj,
);
return message;
}
-
'this.rmqGlobalService.send<ISend, IReply>(exchange, routingKey, options)' - asynchronous request.
- exchange - exchange name
- routingKey - routing key
- options - standard publish options (optional)
- timeout - if supplied, the message will have its own timeout.(custom parameter)
Unlike the standard sendToQueue method from amqlib, this asynchronous method differs in that messages go through Serialization before being sent.
async sendToQueue(queue: string, obj:ISend) {
const status = await this.rmqGlobalService.sendToQueue<ISend>(queue, obj);
return status;
}
Return either true, meaning βkeep sendingβ, or false, meaning βplease wait for a βdrainβ eventβ.See Flow control
- 'this.rmqGlobalService.sendToQueue()' - asynchronous function
- queue - queue name
If you want access to the standard RabbitMQ channel, you always have access to async getter this.rmqGlobalService.channel
and all its methods See.
async getChannel() {
const channel: Channel | ConfirmChannel = await this.rmqGlobalService.channel;
return channel;
}
By default, messages are parsed using the JSON.parse method when they are received and converted to a string using JSON.stringify when they are published. If you want to change this behavior you can use your own parsers.
const extendedOptions: IRMQExtendedOptions = {
typeChannel: TypeChannel.CONFIR_CHANNEL,
globalBroker: {
serDes: {
deserialize: (message: Buffer): any => OtherDeserializer(message),
serialize: (message: any): Buffer => OtherSerialize(message),
},
//...
},
//...
};
The deserialize
method is responsible for converting a message from its raw format (typically a Buffer in Node.js) into a JavaScript object or another suitable format for processing.
The serialize
method is responsible for converting a JavaScript object or any other data format into a raw format (typically a Buffer) suitable for transmission over RabbitMQ.
In this case, it will be applied to all methods of RmqGlobalService
.
The forFeature
and forFeatureAsync
methods allow configuring RabbitMQ parameters at the specific module level in your NestJS application. This is useful for setting up different exchanges, queues, and other RabbitMQ parameters specific to that module.
To use forFeature in your module, import RmqModule
and configure the parameters in the forFeature
method.
@Module({
imports: [
RmqModule.forFeature({
exchange: {
exchange: 'exchange_name',
type: 'topic',
options: {
durable: true,
autoDelete: true,
},
},
queue: {
queue: 'queue',
options: { durable: true },
consumeOptions: { noAck: false },
},
replyTo: {
queue: '',
options: { exclusive: true },
consumeOptions: { noAck: true },
},
}),
],
providers: [MyService, MyRmqEvents],
})
export class MyModule {}
RmqModule.forFeatureAsync({
inject: [],
imports: [],
useFactory: async () => ({
exchange,
queue,
replyTo,
//...
}),
interceptors: [],
//..
});
useFactory should return an object of type IModuleBroker
, and serdes
, interceptors
, and middlewares
come after useFactory
, not inside it, because these parameters are needed to extend the functionality. That's why they come after it.
In the `forFeature(ModuleBroker)` method, you need to pass an object of type `IModuleBroker`:
- exchange - exchange parameters
- exchange - The name of the exchange, which should be unique.
- type - exchange type (Currently only the topic is supported)
- options - The default queue parameters from
Options.AssertExchange
.(optional) See Channel.assertExchange.
- queue - An object containing the queue parameters. (optional)
- queue - The name of the queue, which should be unique.
- options - The default queue parameters from
Options.AssertQueue
. See Channel AssertQueue. - consumeOptions - configure the consumer with a callback that will be invoked with each message. See Channel Consume
- replyTo - needed for setting up the
replyTo
queue (optional)- queue - The name of the queue, which should be unique.
- options - The default queue parameters from
Options.AssertQueue
. See Channel AssertQueue. - consumeOptions - configure the consumer with a callback that will be invoked with each message. See Channel Consume
- errorHandler (class) - custom error handler for dealing with errors from replies that extends
RMQErrorHandler
.
- serviceName - The name of your service. (optional)
- messageTimeout - The timeout for waiting for a response, with a default value of 40000ms. (optional)
In @diy0r/nestjs-rabbitmq
, requests are processed through the @MessageRoute
and @MessageNonRoute
decorators. You need to declare the queue
to use these decorators.
@Injectable()
export class MyRmqEvents {
constructor(private readonly rmqService: RmqService) {}
@MessageRoute('text.text')
received(obj: any, consumeMessage: ConsumeMessage) {
this.rmqService.ack(consumeMessage); // call if 'consumeOptions: { noAck: false }'
return { message: obj };
}
@MessageRoute('*.*.rpc') // subscribe with a pattern!
receivedTopic(obj: any, consumeMessage: ConsumeMessage) {
return { message: obj };
}
@MessageRoute('rpc.#') // subscribe with a pattern!
receivedTopicPattern(obj: any, consumeMessage: ConsumeMessage) {
//...
return { message: obj };
}
@MessageRoute('*.rpc.mix.#') //subscribe with a mix pattern!
recivedMixTopic(obj: any, consumeMessage: ConsumeMessage) {
return { message: obj };
}
@MessageRoute('notify.rpc')
receivedTopicNotify(obj: any, consumeMessage: ConsumeMessage) {
this.rmqService.ack(consumeMessage);
//...
}
@MessageNonRoute()
receivedNonRoute(obj: any, consumeMessage: ConsumeMessage) {
this.rmqService.ack(consumeMessage);
return { message: obj };
}
}
@MessageRoute(RoutingKey: string)
method(message: ISend, consumeMessage: ConsumeMessage) {}
message
- what the listener receivesconsumeMessage
- more information from the message (not just content) from amqlib
The @MessageRoute decorator will automatically bind the queue and RoutingKey to the exchange specified in the forFeature method, and this routing key will only be visible inside the specific module (e.g., MyModule
). Just like magic!
In some cases, when a non-processable message arrives in the queue, it will stay there forever unless it is processed. In such cases, the @MessageNonRoute
decorator can help. The method bound to this decorator will be called only when the request cannot find a bound routing key to the exchange.
If you want to manually acknowledge messages, set consumeOptions: { noAck: false }
in the queue. If you want the library to automatically acknowledge messages, set noAck: true
, so you don't have to explicitly call ack
.
Middleware allows you to execute additional logic before message processing.You can declare middleware at various levels, including the module level, provider level, and specific endpoint level. It's important to note that the middleware runs before any interceptors.
To declare an middleware, implement the IRmqMiddleware
abstract class.
export class EventMiddleware implements IRmqMiddleware {
async use(message: ConsumeMessage, content: any): Promise<void | any> {
content.args = '1,2,...';
}
}
or you can directly return:
export class EventMiddleware implements IRmqMiddleware {
async use(message: ConsumeMessage, content: any): Promise<void | any> {
if (content.args !== '2,1') return { status: 'error' };
}
}
If nothing is returned, the execution of the middleware chain will continue.
EventMiddleware implements the use
method, which takes two parameters: message
, which is the standard ConsumeMessage
from amqplib, and content
, which is your message that has passed through deserialization.
At the module level, you can declare middleware that will apply to all message handlers within that module.
@Module({
imports: [
RmqModule.forFeature({
exchange: {
/* exchange parameters */
},
queue: {
/* queue parameters */
},
middlewares: [EventMiddleware, OtherMiddleware], // Middleware is applied to all handlers in the module
}),
],
providers: [MyService, MyRmqEvents],
})
export class MyModule {}
At the provider level, you can declare middleware that will apply to all message handlers within that provider.
@Injectable()
@RmqMiddleware(EventMiddleware) // Middleware is applied to all methods in the MyRmqEvents
export class MyRmqEvents {
@MessageRoute('text.text')
received(obj: any, consumeMessage: ConsumeMessage) {
return { message: obj };
}
// Class methods ...
}
Middleware can be applied at the specific endpoint (method) level.
@Injectable()
export class MyRmqEvents {
@MessageRoute('text.text')
@RmqMiddleware(EventMiddleware) // Middleware is applied only to this method
handleTextText(obj: any, consumeMessage: ConsumeMessage) {
return { message: obj };
}
}
Initially, middleware at the module level will be invoked, then at the class level, followed by the endpoint level.
Interceptors allow you to intercept and modify the processing of a message both before and after it is processed by a handler, just like Interceptors from Nest.js. You can declare interceptors at various levels, including the module level, the provider level, and the specific endpoint level. It's important to note that interceptors are executed after the middleware!
To declare an interceptor, implement the abstract class IRmqInterceptor
with a decorator @Injectable
.
@Injectable()
export class EventInterceptor implements IRmqInterceptor {
constructor(private readonly rmqSerivce: RmqService) {}
async intercept(
message: ConsumeMessage,
content: any,
): Promise<ReverseFunction> {
console.log('Before...');
return async (content: any, message: ConsumeMessage) => {
console.log(`After... ${Date.now() - now}ms`);
};
}
}
EventInterceptor implements the intercept
method, which takes two parameters: message
, which is the standard ConsumeMessage
from amqplib, and content
, which is your message that has passed through deserialization. The intercept
method returns an asynchronous function that will be invoked after processing.
At the module level, you can declare interceptors that will apply to all message handlers within that module.
@Module({
imports: [
RmqModule.forFeature({
exchange: {
/* exchange parameters */
},
queue: {
/* queue parameters */
},
interceptors: [EventInterceptor, OtherInterceptor], // Interceptors are applied to all handlers in the module!
}),
],
providers: [MyService, MyRmqEvents],
})
export class MyModule {}
The interceptor
will be invoked from left to right.
At the provider level, you can declare interceptors that will apply to all message handlers within that provider.
@Injectable()
@RmqInterceptor(EventInterceptor) // Interceptor is applied to all methods in the MyRmqEvents
export class MyRmqEvents {
@MessageRoute('text.text')
received(obj: any, consumeMessage: ConsumeMessage) {
return { message: obj };
}
// Class methods ...
}
At the specific endpoint (method) level, allowing you to configure interceptor for each endpoint individually.
@Injectable()
export class MyRmqEvents {
@MessageRoute('text.text')
@RmqInterceptor(EventInterceptor) // Interceptor is applied only to this method
handleTextText(obj: any, consumeMessage: ConsumeMessage) {
return { message: obj };
}
}
Initially, interceptors at the module level will be invoked, then at the class level, followed by the endpoint level, and then in reverse order.
If you just want to send messages and receive responses, simply specify the exchange and replyTo, and you will have access to all methods from RmqService as shown below. It is mainly useful when you are writing an API gateway and want to maintain modularity.
@Injectable()
export class MyService {
constructor(private readonly rmqService: RmqService) {}
async sendMessage(obj: ISend) {
const message = await this.rmqGlobalService.send<ISend, IReply>(
'user.rpc',
obj,
);
return message;
}
async sendNotifyService(obj: Record<string, any>) {
const message = await this.rmqService.notify<ISend>('notify.rpc', obj);
return message;
}
-
'this.rmqService.send<ISend, IReply>(routingKey, options)' - asynchronous request
- routingKey - routing key
- options - standard publish options (optional) - timeout - if supplied, the message will have its own timeout.(custom parameter)
To use send
, you must have replyTo
declared in forFeature.
- 'this.rmqService.notify(routingKey, options)' - asynchronous request that returns the message status
- routingKey - routing key
- options - standard publish options (optional)
Note that in these examples we do not specify the exchange in the requests because the send and notify methods use the exchange specified on MyModule in method forFeature
.
const isConnected = this.rmqService.healthCheck();
'this.rmqService.healthCheck()' - Check if you are still connected to RMQ.
this.rmqService.ack(message);
- 'this.rmqService.ack(message:ConsumeMessage)' - Acknowledge the given message, or all messages up to and including the given message. See
this.rmqService.nack(message);
- 'this.rmqService.nack(message:ConsumeMessage)' - Reject a message. This instructs the server to either requeue the message or throw it away (which may result in it being dead-lettered). See
The serDes
allows for both the deserialization and serialization of messages.
It's important to note the execution order, as this is performed only once.
When a message reaches an endpoint, the framework checks for the @SerDes
decorator on method(endpoint). If found, the methods specified there are used. If not, it searches for the decorator in the parent class(Provider). If the decorator is still not found, it falls back to using the serDes
object from your module. If none of these are defined, the default serialization and deserialization are used.
At the module level, the SerDes configuration applies to all message handlers within that module. This is useful for ensuring consistent message processing across the entire module.
@Module({
imports: [
RmqModule.forFeature({
exchange: {
/* exchange parameters */
},
queue: {
/* queue parameters */
},
serDes: {
deserialize: (message: Buffer): any => OtherDeserializer(message),
serialize: (message: any): Buffer => OtherSerializer(message),
}, // serDes is applied to all handlers in the module!
}),
],
})
export class MyModule {}
You can also configure SerDes at the provider level, applying it to all message handlers within a specific class(provider).
@Injectable()
@SerDes({
deserialize: (message: Buffer): any => OtherDeserializer(message),
serialize: (message: any): Buffer => OtherSerializer(message),
}) // SerDes is applied to all methods in the MyRmqEvents
export class MyRmqEvents {
@MessageRoute('text.text')
received(obj: any, consumeMessage: ConsumeMessage) {
return { message: obj };
}
// Class methods ...
}
At the specific endpoint (method) level, allowing you to configure SerDes for each endpoint individually.
@Injectable()
export class MyRmqEvents {
@MessageRoute('text.text')
@SerDes({
deserialize: (message: Buffer): any => OtherDeserializer(message),
serialize: (message: any): Buffer => OtherSerializer(message),
}) // SerDes is applied only to this method
handleTextText(obj: any, consumeMessage: ConsumeMessage) {
return { message: obj };
}
}
If you want to use a custom error handler to handle errors in responses, use the errorHandler property in the replyTo
parameters in forRoot
and forFeature
, and pass a class that extends RMQErrorHandler
If you call rmqGlobalService.send
, the errorHandler
from forRoot
will be used, and if you call rmqService.send
, the errorHandler
from forFeature
will be used.
export class MyRMQErrorHandler extends RMQErrorHandler {
public static handle(
headers: IRmqErrorHeaders | MessagePropertyHeaders,
): Error | RMQError {
// do something ...
return new RMQError(
headers['-x-error'],
headers['-x-service'],
headers['-x-status-code'],
headers['-x-host'],
headers['-x-date'],
);
}
}
async sendMessage (obj: ISend){
try {
const message = await this.rmqGlobalService.send<ISend, IReply>(
'user.rpc',
obj,
);
} catch (error: RMQError) {
//...
}
};
@MessageRoute()
handleTextText(obj: ISend, consumeMessage: ConsumeMessage) {
throw new RMQError('Error message', 2);
//or
throw new Error('Error message');
}