Skip to content

Commit

Permalink
google pub/sub updates and fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
jaredwray committed Sep 12, 2024
1 parent db50a0d commit f9a1565
Show file tree
Hide file tree
Showing 6 changed files with 75 additions and 33 deletions.
22 changes: 21 additions & 1 deletion src/airhorn.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,11 @@ import { TemplateService } from './template-service.js';
import { ProviderService } from './provider-service.js';
import { AirhornProviderType } from './provider-type.js';
import { type AirhornSubscription } from './subscription.js';
import { AirhornStore, type AirhornStoreProvider, type CreateAirhornSubscription } from './store.js';
import { type AirhornNotification } from './notification.js';
import { AirhornQueue, type AirhornQueueProvider } from './queue.js';
import {
AirhornStore, type CreateAirhornNotification, type AirhornStoreProvider, type CreateAirhornSubscription,
} from './store.js';

export type AirhornOptions = {
TEMPLATE_PATH?: string;
Expand All @@ -15,6 +19,7 @@ export type AirhornOptions = {
AWS_SNS_REGION?: string;
FIREBASE_CERT?: string;
STORE_PROVIDER?: AirhornStoreProvider;
QUEUE_PROVIDER?: AirhornQueueProvider;
};

export class Airhorn {
Expand All @@ -26,6 +31,7 @@ export class Airhorn {
private readonly _templateService = new TemplateService();
private readonly _providerService = new ProviderService();
private readonly _store?: AirhornStore;
private readonly _queue?: AirhornQueue;

constructor(options?: AirhornOptions) {
if (options) {
Expand All @@ -37,6 +43,10 @@ export class Airhorn {
if (this.options.STORE_PROVIDER) {
this._store = new AirhornStore(this.options.STORE_PROVIDER);
}

if (this.options.QUEUE_PROVIDER) {
this._queue = new AirhornQueue(this.options.QUEUE_PROVIDER);
}

Check warning on line 49 in src/airhorn.ts

View check run for this annotation

Codecov / codecov/patch

src/airhorn.ts#L48-L49

Added lines #L48 - L49 were not covered by tests
}

public get templates(): TemplateService {
Expand Down Expand Up @@ -135,6 +145,16 @@ export class Airhorn {

throw new Error('Airhorn store not available');
}

public async publishNotification(notification: CreateAirhornNotification): Promise<void> {
if (this._queue && this._store) {
const updatedNotification = await this._store.createNotification(notification);

Check warning on line 151 in src/airhorn.ts

View check run for this annotation

Codecov / codecov/patch

src/airhorn.ts#L150-L151

Added lines #L150 - L151 were not covered by tests

await this._queue.publishNotification(updatedNotification);
}

Check warning on line 154 in src/airhorn.ts

View check run for this annotation

Codecov / codecov/patch

src/airhorn.ts#L153-L154

Added lines #L153 - L154 were not covered by tests

throw new Error('Airhorn queue and store needed for notifications');
}

Check warning on line 157 in src/airhorn.ts

View check run for this annotation

Codecov / codecov/patch

src/airhorn.ts#L156-L157

Added lines #L156 - L157 were not covered by tests
}

export { AirhornProviderType } from './provider-type.js';
Expand Down
1 change: 1 addition & 0 deletions src/notification.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ export enum AirhornNotificationStatus {
export type AirhornNotification = {
id: string;
to: string;
from: string;
subscriptionId: string;
externalId?: string;
providerType: AirhornProviderType;
Expand Down
41 changes: 37 additions & 4 deletions src/queue-providers/google-pubsub.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
/* eslint-disable @typescript-eslint/no-empty-function,@typescript-eslint/class-literal-property-style */
/* eslint-disable @typescript-eslint/class-literal-property-style */
import { PubSub } from '@google-cloud/pubsub';
import { type AirhornNotification } from '../notification.js';
import { type AirhornQueueProvider } from '../queue.js';
Expand Down Expand Up @@ -55,11 +55,44 @@ export class GooglePubSubQueue implements AirhornQueueProvider {
return this._pubsub.topic(this.topicName);
}

async publishNotification(notification: AirhornNotification): Promise<void> {}
async publish(notification: AirhornNotification): Promise<void> {
await this.createTopic();
const topic = await this.getTopic();
const data = JSON.stringify(notification);
await topic.publishMessage({ data });
}

Check warning on line 63 in src/queue-providers/google-pubsub.ts

View check run for this annotation

Codecov / codecov/patch

src/queue-providers/google-pubsub.ts#L59-L63

Added lines #L59 - L63 were not covered by tests

async subscribe(callback: (notification: AirhornNotification, acknowledge: () => void) => void): Promise<void> {
await this.createTopic();
const topic = await this.getTopic();
const subscriptionName = this._topicName + '-subscription';
const subscription = topic.subscription(subscriptionName);

Check warning on line 69 in src/queue-providers/google-pubsub.ts

View check run for this annotation

Codecov / codecov/patch

src/queue-providers/google-pubsub.ts#L66-L69

Added lines #L66 - L69 were not covered by tests

async acknowledgeNotification(notification: AirhornNotification): Promise<void> {}
const [exists] = await subscription.exists();
if (!exists) {
await topic.createSubscription(subscriptionName);
}

Check warning on line 74 in src/queue-providers/google-pubsub.ts

View check run for this annotation

Codecov / codecov/patch

src/queue-providers/google-pubsub.ts#L71-L74

Added lines #L71 - L74 were not covered by tests

async listenForNotifications(queueName: string, callback: (notification: AirhornNotification) => void): Promise<void> {}
subscription.on('message', message => {
const notification = JSON.parse(message.data.toString());
const acknowledge = () => {
message.ack();
};

Check warning on line 80 in src/queue-providers/google-pubsub.ts

View check run for this annotation

Codecov / codecov/patch

src/queue-providers/google-pubsub.ts#L76-L80

Added lines #L76 - L80 were not covered by tests

callback(notification as AirhornNotification, acknowledge);
});
}

Check warning on line 84 in src/queue-providers/google-pubsub.ts

View check run for this annotation

Codecov / codecov/patch

src/queue-providers/google-pubsub.ts#L82-L84

Added lines #L82 - L84 were not covered by tests

async clearSubscription(): Promise<void> {
const topic = await this.getTopic();
const subscriptionName = this._topicName + '-subscription';
const subscription = topic.subscription(subscriptionName);

Check warning on line 89 in src/queue-providers/google-pubsub.ts

View check run for this annotation

Codecov / codecov/patch

src/queue-providers/google-pubsub.ts#L87-L89

Added lines #L87 - L89 were not covered by tests

const [exists] = await subscription.exists();
if (exists) {
await subscription.delete();
}
}

Check warning on line 95 in src/queue-providers/google-pubsub.ts

View check run for this annotation

Codecov / codecov/patch

src/queue-providers/google-pubsub.ts#L91-L95

Added lines #L91 - L95 were not covered by tests

async createTopic(): Promise<void> {
if (this._topicCreated) {
Expand Down
23 changes: 7 additions & 16 deletions src/queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,31 +3,22 @@ import { type AirhornNotification } from './notification.js';
export type AirhornQueueProvider = {
name: string;
uri: string;
publishNotification(notification: AirhornNotification): Promise<void>;
acknowledgeNotification(notification: AirhornNotification): Promise<void>;
listenForNotifications(queueName: string, callback: (notification: AirhornNotification) => void): Promise<void>;
};

export type AirhornQueueOptions = {
provider: AirhornQueueProvider;
publish(notification: AirhornNotification): Promise<void>;
subscribe(callback: (notification: AirhornNotification, acknowledge: () => void) => void): Promise<void>;
};

export class AirhornQueue {
public provider: AirhornQueueProvider;

constructor(options: AirhornQueueOptions) {
this.provider = options.provider;
constructor(provider: AirhornQueueProvider) {
this.provider = provider;
}

public async publishNotification(notification: AirhornNotification) {
await this.provider.publishNotification(notification);
}

public async acknowledgeNotification(notification: AirhornNotification) {
await this.provider.acknowledgeNotification(notification);
await this.provider.publish(notification);
}

public async listenForNotifications(queueName: string, callback: (notification: AirhornNotification) => void) {
await this.provider.listenForNotifications(queueName, callback);
public async subscribe(callback: (notification: AirhornNotification, acknowledge: () => void) => void) {
await this.provider.subscribe(callback);
}
}
1 change: 1 addition & 0 deletions src/store-providers/mongo.ts
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,7 @@ export class MongoStoreProvider implements AirhornStoreProvider {
const notification: AirhornNotification = {
id: document._id,
to: document.to,
from: document.from,
subscriptionId: document.subscriptionId,
externalId: document.externalId,
providerType: document.providerType as AirhornProviderType,
Expand Down
20 changes: 8 additions & 12 deletions test/queue.test.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
/* eslint-disable @typescript-eslint/no-empty-function */
import { subscribe } from 'node:diagnostics_channel';
import {describe, test, expect} from 'vitest';
import {AirhornQueue} from '../src/queue.js';
import { AirhornNotificationStatus, type AirhornNotification } from '../src/notification.js';
Expand All @@ -7,14 +8,14 @@ import { AirhornProviderType } from '../src/provider-type.js';
const providerMock = {
name: 'mock',
uri: 'mock://localhost',
async publishNotification(notification: AirhornNotification) {},
async acknowledgeNotification(notification: AirhornNotification) {},
async listenForNotifications(queueName: string, callback: (notification: AirhornNotification) => void) {},
async publish(notification: AirhornNotification) {},
async subscribe(callback: (notification: AirhornNotification, acknowledge: () => void) => void) {},
};

const notificationMock: AirhornNotification = {
id: '1',
to: '1',
from: '1',
subscriptionId: '1',
templateName: '1',
providerName: '1',
Expand All @@ -27,23 +28,18 @@ const notificationMock: AirhornNotification = {

describe('AirhornQueue', async () => {
test('should create a new instance of AirhornQueue', async () => {
const queue = new AirhornQueue({ provider: providerMock });
const queue = new AirhornQueue(providerMock);
expect(queue.provider).toEqual(providerMock);
expect(queue.provider.name).toEqual('mock');
});

test('should publish a notification', async () => {
const queue = new AirhornQueue({ provider: providerMock });
const queue = new AirhornQueue(providerMock);
await queue.publishNotification(notificationMock);
});

test('should acknowledge a notification', async () => {
const queue = new AirhornQueue({ provider: providerMock });
await queue.acknowledgeNotification(notificationMock);
});

test('should listen for notifications', async () => {
const queue = new AirhornQueue({ provider: providerMock });
await queue.listenForNotifications('queueName', (notification: AirhornNotification) => {});
const queue = new AirhornQueue(providerMock);
await queue.subscribe((notification, acknowledge) => {});
});
});

0 comments on commit f9a1565

Please sign in to comment.