-
Notifications
You must be signed in to change notification settings - Fork 54
/
publisher.js
67 lines (59 loc) · 2.11 KB
/
publisher.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
'use strict';
const amqplib = require('amqplib/callback_api');
const config = require('./config');
// Publishes a fixed batch of JSON test messages to the queue named in
// `config.queue` on the AMQP server at `config.amqp`, then closes the channel
// and the connection.

// Number of test messages this script publishes before shutting down.
const MESSAGE_COUNT = 100;

// Log a fatal setup error and terminate the process with a failure exit code.
const exitWithError = err => {
    console.error(err.stack);
    return process.exit(1);
};

// Create connection to AMQP server
amqplib.connect(config.amqp, (connectErr, connection) => {
    if (connectErr) {
        return exitWithError(connectErr);
    }

    // Create channel
    connection.createChannel((channelErr, channel) => {
        if (channelErr) {
            return exitWithError(channelErr);
        }

        // Ensure queue for messages
        channel.assertQueue(config.queue, {
            // Ensure that the queue is not deleted when server restarts
            durable: true
        }, assertErr => {
            if (assertErr) {
                return exitWithError(assertErr);
            }

            // Send one object to the queue.
            // Javascript object is converted to JSON and then into a Buffer.
            // Returns the value of channel.sendToQueue: when it is falsy the
            // caller has to wait for the channel's 'drain' event before
            // sending anything else.
            const publish = content => channel.sendToQueue(config.queue, Buffer.from(JSON.stringify(content)), {
                // Store queued elements on disk
                persistent: true,
                contentType: 'application/json'
            });

            // Report a failure that happened while shutting down. The exit
            // code is only flagged (no process.exit) so that the remaining
            // close steps still get a chance to run.
            const reportCloseError = err => {
                if (err) {
                    console.error(err.stack);
                    process.exitCode = 1;
                }
            };

            // Close connection to AMQP server
            // We need to call channel.close first, otherwise pending
            // messages are not written to the queue
            const shutdown = () => {
                channel.close(channelCloseErr => {
                    reportCloseError(channelCloseErr);
                    connection.close(reportCloseError);
                });
            };

            // Push MESSAGE_COUNT messages to the queue. A loop is used instead
            // of callback recursion so the call stack does not grow with the
            // number of messages; the function re-enters itself only through
            // the 'drain' event.
            let sentCount = 0;
            const sendPending = () => {
                while (sentCount < MESSAGE_COUNT) {
                    sentCount++;
                    const canSendMore = publish({
                        to: 'recipient@example.com',
                        subject: `Test message #${sentCount}`,
                        text: 'hello world!'
                    });
                    if (!canSendMore) {
                        // Resume with the next message once the channel drains
                        channel.once('drain', sendPending);
                        return;
                    }
                }

                console.log('All messages sent!');
                shutdown();
            };

            sendPending();
        });
    });
});