Publisher/Subscriber pattern plus messaging queue and notifications written in Typescript.
npm install pubsub-ts
import {PubSub} from "pubsub-ts";
let subscriber: PubSub.Subscriber = new PubSub.Subscriber();
subscriber.on('postMsg', (notification: PubSub.Notification) => {
this.msg = notification.body;
});
subscriber.off('postMsg');
subscriber.start();
subscriber.pause();
import {PubSub} from "pubsub-ts";
let publisher: PubSub.Publisher = new PubSub.Publisher();
publisher.add(subscriber);
publisher.delete(subscriber);
publisher.notify('postMsg', 'a message');
You can also prioritize a Notification.
publisher.notifyPriority('postMsg', 'priority message');
Or send a Notification as urgent to be delivered immediately.
publisher.notifyUrgent('postMsg', 'urgent message');
Subscribers are equipped with a queue of notifications. You can control when the Subscriber should start() receiving notifications that were posted to its queue as well as pause() the queue.
This allows to preserve the order of notifications until the subscriber is ready to receive them. For example when waiting for an async operation to complete.
When a Subscriber is in the paused state, notifications are pushed into the queue for later processing.
When a Subscriber is in the started state, the queue will resume processing notifications.
NOTE: Subscribers are paused by default. You must invoke start() to enable processing from the queue. This is done so you have explicit control as to when notifications should start being posted to the subscriber.
import {PubSub} from "pubsub-ts";
class TransactionAgent extends PubSub.Subscriber {
constructor() {
this.on('connectionChange', this.onConnection);
this.on('withdrawal', this.onWithdrawal);
}
public onConnection(notification: PubSub.Notification): void {
if (notification.body.status === 'online') {
// Start receiving notifications
this.start();
} else {
// Pause receiving notifications
this.pause();
}
}
public onWithdrawal(notification: PubSub.Notification): void {
let amount: number = notification.body;
console.log('[TransactionAgent]', amount);
}
}
class ATM extends PubSub.Publisher {
constructor() {
this.add(new TransactionAgent());
}
public withdrawAmount(amount: number): void {
this.notify('withdrawal', amount);
}
public connectionChange(status: string): void {
this.notify('connectionChange', {status: status});
}
}
let atm: ATM = new ATM();
// 1. ATM not connected. Offline.
// 2. User requests withdrawal.
atm.withdrawAmount(1500);
// 3. ATM back online. Connection ready.
atm.connectionChange('online');
// 4. Transaction agent starts processing from the queue
// console log:
[TransactionAgent], 1500