Skip to content

Commit

Permalink
feat(Messaging): Subscribe to the topic and receive payload in the ca…
Browse files Browse the repository at this point in the history
…llback
  • Loading branch information
hchristine committed May 31, 2024
1 parent cc52276 commit 84fa478
Show file tree
Hide file tree
Showing 5 changed files with 63 additions and 6 deletions.
13 changes: 9 additions & 4 deletions example/client.ts
Original file line number Diff line number Diff line change
@@ -1,18 +1,22 @@
import Famcache from '../src';

const cache = new Famcache({
const client = new Famcache({
host: 'localhost',
port: 3577,
});

cache
client
.connect()
.then(() => {
console.log('Connected!');

cache.set('key', '10', 3000)
client.subscribe('topic1', (data) => {
console.log('topic1 received data: ', data);
});

client.set('key', '10', 3000)
.then(() => {
return cache.get('key');
return client.get('key');
})
.then((data) => {
console.log('Received', data);
Expand All @@ -21,3 +25,4 @@ cache
.catch((e) => {
console.log('Failed to connect');
});

33 changes: 31 additions & 2 deletions src/famcache.ts
Original file line number Diff line number Diff line change
@@ -1,17 +1,19 @@
import { Socket } from 'net';
import { randomUUID } from 'crypto';
import type { ConnectionParams } from './params';
import type { QueueResolver } from './types';
import { CacheQuery, get, set, del, publish, unsubscribe } from './transport';
import type { QueueResolver, SubscribeCallback } from './types';
import { CacheQuery, get, set, del, publish, unsubscribe, subscribe, Messaging } from './transport';

class Famcache {
private socket: Socket;
private params: ConnectionParams;
private queue: Map<string, QueueResolver>;
private listeners: Map<string, SubscribeCallback[]>

constructor(params: ConnectionParams) {
this.socket = new Socket();
this.queue = new Map();
this.listeners = new Map();

this.params = params;
}
Expand All @@ -23,6 +25,18 @@ class Famcache {
private listen() {
this.socket.on('data', (data) => {
const payload = data.toString();

if (Messaging.isMessagingEvent(payload)) {
const message = Messaging.fromEvent(payload);

if (!this.listeners.has(message.topic)) {
return;
}

this.listeners.get(message.topic)?.forEach((callback) => callback(message.data));
return;
}

const query = CacheQuery.fromString(payload);

const resolver = this.queue.get(query.id);
Expand Down Expand Up @@ -94,11 +108,26 @@ class Famcache {
this.socket.write(publish(queryId, topic, data));
}

subscribe(topic: string, callback: SubscribeCallback) {
const queryId = this.genId();

this.socket.write(subscribe(queryId, topic))

const listeners = this.listeners.get(topic);

if (!listeners) {
this.listeners.set(topic, [callback]);
} else {
listeners.push(callback);
}
}

unsubscribe(topic: string) {
const queryId = this.genId();

this.socket.write(unsubscribe(queryId, topic));
}

}

export default Famcache;
1 change: 1 addition & 0 deletions src/transport/index.ts
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
export * from './cache-query';
export * from './commands';
export * from './messaging';
19 changes: 19 additions & 0 deletions src/transport/messaging.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
export class Messaging {
static isMessagingEvent(event: string): boolean {
return event.startsWith("MESSAGE ");
}

static fromEvent(event: string): Messaging {
const [_, topic, data] = event.split(' ');

Check failure on line 7 in src/transport/messaging.ts

View workflow job for this annotation

GitHub Actions / lint (21.x)

'_' is assigned a value but never used

return new Messaging(topic, data);
}

public topic: string;
public data: string;

constructor(topic: string, data: string) {
this.topic = topic;
this.data = data;
}
}
3 changes: 3 additions & 0 deletions src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,6 @@ export type QueueResolver = {
resolve: (value: Optional<string>) => void;
reject: (reason: Optional<string>) => void;
};


export type SubscribeCallback = (data: string) => void;

0 comments on commit 84fa478

Please sign in to comment.