Skip to content

Commit

Permalink
pushing as it doesnt work
Browse files Browse the repository at this point in the history
  • Loading branch information
jaredwray committed Sep 13, 2024
1 parent f9a1565 commit 48ca0d6
Show file tree
Hide file tree
Showing 4 changed files with 76 additions and 16 deletions.
48 changes: 32 additions & 16 deletions src/queue-providers/google-pubsub.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ export class GooglePubSubQueue implements AirhornQueueProvider {
private readonly _name: string;
private readonly _uri: string;
private readonly _topicName: string;
private readonly _subscriptionName: string;
private readonly _pubsub: PubSub;
private _topicCreated = false;
private readonly _projectId = 'airhorn-project';
Expand All @@ -15,6 +16,7 @@ export class GooglePubSubQueue implements AirhornQueueProvider {
this._name = 'google-pubsub';
this._uri = 'google-pubsub://localhost';
this._topicName = 'airhorn-delivery-queue';
this._subscriptionName = this._topicName + '-subscription';

this._pubsub = new PubSub({ projectId: this._projectId });
}
Expand Down Expand Up @@ -58,39 +60,53 @@ export class GooglePubSubQueue implements AirhornQueueProvider {
async publish(notification: AirhornNotification): Promise<void> {
await this.createTopic();
const topic = await this.getTopic();
const data = JSON.stringify(notification);
await topic.publishMessage({ data });
const data = Buffer.from(JSON.stringify({ message: 'Hello, Pub/Sub emulator!' }));
const publishId = await topic.publishMessage({ data });
console.log(`Message published with ID: ${publishId} to topic: ${topic.name}`);
}

async subscribe(callback: (notification: AirhornNotification, acknowledge: () => void) => void): Promise<void> {
await this.createTopic();
const topic = await this.getTopic();
const subscriptionName = this._topicName + '-subscription';
const subscription = topic.subscription(subscriptionName);
let subscription = topic.subscription(this._subscriptionName);

const [exists] = await subscription.exists();
if (!exists) {
await topic.createSubscription(subscriptionName);
await topic.createSubscription(this._subscriptionName, {
retryPolicy: {
minimumBackoff: {
seconds: 60,
},
maximumBackoff: {
seconds: 600,
}
}
});

subscription = topic.subscription(this._subscriptionName);
}

subscription.on('message', message => {
const notification = JSON.parse(message.data.toString());
const acknowledge = () => {
message.ack();
};

callback(notification as AirhornNotification, acknowledge);
});
const listeners = subscription.listenerCount('message');
if(listeners === 0) {
subscription.on('message', (message) => {
const airhornNotification = JSON.parse(message.data.toString());
console.log('Received message:', message);
const acknowledge = () => {
message.ack();
}

callback(airhornNotification, acknowledge);

Check failure on line 98 in src/queue-providers/google-pubsub.ts

View workflow job for this annotation

GitHub Actions / test-coverage (20)

Unsafe argument of type `any` assigned to a parameter of type `AirhornNotification`.

Check failure on line 98 in src/queue-providers/google-pubsub.ts

View workflow job for this annotation

GitHub Actions / setup-test (20)

Unsafe argument of type `any` assigned to a parameter of type `AirhornNotification`.
});
}
}

async clearSubscription(): Promise<void> {
const topic = await this.getTopic();
const subscriptionName = this._topicName + '-subscription';
const subscription = topic.subscription(subscriptionName);
const subscription = topic.subscription(this._subscriptionName);

const [exists] = await subscription.exists();
if (exists) {
await subscription.delete();
await subscription.close();
}
}

Expand Down
5 changes: 5 additions & 0 deletions src/queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ export type AirhornQueueProvider = {
uri: string;
publish(notification: AirhornNotification): Promise<void>;
subscribe(callback: (notification: AirhornNotification, acknowledge: () => void) => void): Promise<void>;
clearSubscription(): Promise<void>;
};

export class AirhornQueue {
Expand All @@ -21,4 +22,8 @@ export class AirhornQueue {
public async subscribe(callback: (notification: AirhornNotification, acknowledge: () => void) => void) {
await this.provider.subscribe(callback);
}

public async clearSubscription() {
await this.provider.clearSubscription();
}
}
38 changes: 38 additions & 0 deletions test/queue-providers/google-pubsub.test.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,29 @@
import {describe, test, expect} from 'vitest';

Check failure on line 1 in test/queue-providers/google-pubsub.test.ts

View workflow job for this annotation

GitHub Actions / test-coverage (20)

'constants' module was deprecated since v6.3.0. Use 'constants' property of each module instead.

Check failure on line 1 in test/queue-providers/google-pubsub.test.ts

View workflow job for this annotation

GitHub Actions / setup-test (20)

'constants' module was deprecated since v6.3.0. Use 'constants' property of each module instead.
import {GooglePubSubQueue} from '../../src/queue-providers/google-pubsub.js';
import { AirhornNotification, AirhornNotificationStatus } from '../../src/notification.js';
import { AirhornProviderType } from '../../src/provider-type.js';
import exp from 'constants';


// eslint-disable-next-line n/prefer-global/process
process.env.PUBSUB_EMULATOR_HOST = 'localhost:8085';

Check failure on line 10 in test/queue-providers/google-pubsub.test.ts

View workflow job for this annotation

GitHub Actions / test-coverage (20)

Return values from promise executor functions cannot be read.

Check failure on line 10 in test/queue-providers/google-pubsub.test.ts

View workflow job for this annotation

GitHub Actions / setup-test (20)

Return values from promise executor functions cannot be read.
const sleep = (ms: number) => new Promise((resolve) => setTimeout(resolve, ms));

const notificationMock: AirhornNotification = {
id: '1',
to: '1',
from: '1',
subscriptionId: '1',
templateName: '1',
providerName: '1',
providerResponse: [],
providerType: AirhornProviderType.SMTP,
status: AirhornNotificationStatus.QUEUED,
createdAt: new Date(),
modifiedAt: new Date(),
};

describe('GooglePubSubQueue', async () => {
test('should create a new instance of GooglePubSubQueue', async () => {
const queue = new GooglePubSubQueue();
Expand Down Expand Up @@ -36,4 +56,22 @@ describe('GooglePubSubQueue', async () => {
const topic = await queue.getTopic();
expect(topic.name).toEqual('projects/airhorn-project/topics/airhorn-delivery-queue');
});

test('publish and subscribe to a message', async () => {
const queue = new GooglePubSubQueue();
await queue.createTopic();
let itWorked = false;
const onMessage = (notification: AirhornNotification, acknowledge: () => void) => {
console.log('onMessage');
expect(notification).toEqual({message: 'Hello, Pub/Sub emulator!'});
acknowledge();
itWorked = true;
};
await queue.subscribe(onMessage);
await queue.publish(notificationMock);
await queue.publish(notificationMock);
await sleep(1000);
expect(itWorked).toEqual(true);
queue.clearSubscription();

Check failure on line 75 in test/queue-providers/google-pubsub.test.ts

View workflow job for this annotation

GitHub Actions / test-coverage (20)

Promises must be awaited, end with a call to .catch, end with a call to .then with a rejection handler or be explicitly marked as ignored with the `void` operator.

Check failure on line 75 in test/queue-providers/google-pubsub.test.ts

View workflow job for this annotation

GitHub Actions / setup-test (20)

Promises must be awaited, end with a call to .catch, end with a call to .then with a rejection handler or be explicitly marked as ignored with the `void` operator.
});
});
1 change: 1 addition & 0 deletions test/queue.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ const providerMock = {
uri: 'mock://localhost',
async publish(notification: AirhornNotification) {},
async subscribe(callback: (notification: AirhornNotification, acknowledge: () => void) => void) {},
async clearSubscription() {},
};

const notificationMock: AirhornNotification = {
Expand Down

0 comments on commit 48ca0d6

Please sign in to comment.