Skip to content

Commit

Permalink
✨ feat: Alert modification enhancements
Browse files Browse the repository at this point in the history
Delete alert API
Add support for lt_price alert type
  • Loading branch information
mtsammy40 committed Aug 10, 2024
1 parent beddd2a commit 4013313
Show file tree
Hide file tree
Showing 9 changed files with 112 additions and 8 deletions.
1 change: 1 addition & 0 deletions .idea/crypto_alerts_api.iml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions .idea/modules.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion src/constants/constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@ export default {
channels: {
pending_notifications: 'pending-notifications',
update_alert_triggered: 'update_alert_triggered',
add_alert_listener: 'add_alert_listener'
add_alert_listener: 'add_alert_listener',
delete_alert_listener: 'delete_alert_listener'
},
notification_workflow_ids: {
crypto_price_alert: 'crypto-price-alert'
Expand Down
2 changes: 1 addition & 1 deletion src/interfaces/alert.model.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import {ObjectId} from "mongodb";
export default interface AlertModel {
_id?: ObjectId | undefined;
pair: string;
type: 'gt_price'
type: 'gt_price' | 'lt_price';
status: 'active' | 'triggered' | 'cancelled';
price: {
current?: number;
Expand Down
2 changes: 1 addition & 1 deletion src/interfaces/create-alert-dto.model.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import { NotificationChannel } from './alert.model';

export default interface CreateAlertDtoModel {
pair: string;
type: 'gt_price'
type: 'gt_price' | 'lt_price';
price: {
value: number;
current?: number;
Expand Down
27 changes: 25 additions & 2 deletions src/repository/alert-repository.ts
Original file line number Diff line number Diff line change
Expand Up @@ -51,12 +51,24 @@ export default class AlertRepository {
try {
return await (await this.client())
.collection('alerts')
.find().toArray();
.find()
.map(doc => {
return {
_id: doc._id,
pair: doc.pair,
type: doc.type,
status: doc.status,
price: doc.price,
notification: doc.notification,
triggerInfo: doc.triggerInfo,
created_at: doc.created_at,
} as AlertModel;
})
.toArray();
} catch (e) {
console.error('Error listing alerts', e);
throw new AppError('Error listing alerts');
}

}

async listActiveAlertSymbols() {
Expand Down Expand Up @@ -102,4 +114,15 @@ export default class AlertRepository {
throw new AppError('Error updating alert');
}
}

async delete(id: string) {
try {
return await (await this.client())
.collection(this._collectionName)
.findOneAndDelete({_id: new ObjectId(id)});
} catch (e) {
console.error('Error deleting alert', e);
throw new AppError('Error deleting alert');
}
}
}
22 changes: 21 additions & 1 deletion src/routes/alerts.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,18 @@ const router = express.Router();

type AlertsResponse = {
content: any[]
error?: string
};

router.get<{}, AlertsResponse>('/', (req, res) => {
console.log('GET /routes/v1/alerts', req.query);
res.json({ content: [] });
alertService.list()
.then((alerts) => {
res.json({ content: alerts });
}).catch((e) => {
console.error('Error listing alerts', e);
res.status(500).json({ error: 'Error listing alerts', content: []});
});
});

router.post<CreateAlertDtoModel, {}>('/', (req, res) => {
Expand All @@ -28,4 +35,17 @@ router.post<CreateAlertDtoModel, {}>('/', (req, res) => {
});
});

router.delete<{ id: string }, {}>('/:id', (req, res) => {
console.log('DELETE /routes/v1/alerts/:id', req.params);
alertService.delete(req.params.id)
.then(() => {
console.log('AlertModel deleted');
res.status(204).end();
})
.catch((e: any) => {
console.error('Error deleting alert', e);
res.status(500).json({ error: 'Error deleting alert' });
});
});

export default router;
32 changes: 32 additions & 0 deletions src/services/alerts.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import Ajv from "ajv";
import create_alert_dto_schema from "../schemas/create-alert-dto.schema";
import RedisConfig from "../config/redis.config";
import Constants from "../constants/constants";
import {WithId} from "mongodb";

export default class AlertsService {
_alertsRepository = new AlertRepository();
Expand Down Expand Up @@ -47,6 +48,37 @@ export default class AlertsService {
(await this._publisher).publish(Constants.channels.add_alert_listener, JSON.stringify(alert));
}

async list(): Promise<any[]> {
return new Promise(async (resolve, reject) => {
this._alertsRepository.list()
.then((alerts) => {
if(alerts) {
resolve(alerts);
} else {
reject('No alerts found');
}
})
.catch((e) => reject(e));
});
}

async delete(id: string) {
this._alertsRepository
.delete(id)
.then(async (doc) => {
if(!doc) {
console.error('Alert not found');
throw new Error('Alert not found');
}
console.log('Alert deleted ', doc._id);
(await this._publisher).publish(Constants.channels.delete_alert_listener, JSON.stringify(doc));
})
.catch((e: any) => {
console.error('Error deleting alert', e);
throw new Error('Error deleting alert');
});
}

async updateAsTriggered(message: string) {
console.debug('Updating alert as triggered ', message);
const alert: AlertModel = JSON.parse(message);
Expand Down
30 changes: 28 additions & 2 deletions src/workers/price-listener.worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import AlertModel from "../interfaces/alert.model";
import MarkPriceUpdate from "../interfaces/mark-price-update.model";
import RedisConfig from "../config/redis.config";
import Constants from "../constants/constants";
import Alerts from "../routes/alerts";


export default class PriceListenerWorker {
Expand Down Expand Up @@ -46,7 +47,8 @@ export default class PriceListenerWorker {

async startSubscribers() {
console.log('Starting price-watcher-worker subscribers');
(await this._subscriber).subscribe(Constants.channels.add_alert_listener, (message) => this.addListener(message))
(await this._subscriber).subscribe(Constants.channels.add_alert_listener, (message) => this.addListener(message));
(await this._subscriber).subscribe(Constants.channels.delete_alert_listener, (message) => this.removeListener(message))
}

async startMonitoring(symbols: string[]) {
Expand Down Expand Up @@ -79,7 +81,7 @@ export default class PriceListenerWorker {
if (message.id) {
this.handlePendingResponse(message);
} else if (message.stream) {
console.log('Stream message: ', message);
// console.info('Stream message: ', message);
this.processPriceUpdate(message);
}
});
Expand Down Expand Up @@ -120,6 +122,13 @@ export default class PriceListenerWorker {
this._pendingResponses.set(ref.toString(), payload);
}

private unsubscribeIfNoAlerts(pair: string) {
const alertsForPair = this._alerts.get(pair);
if (!alertsForPair || !alertsForPair.length) {
this.unsubscribe([pair]);
}
}

private unsubscribe(pairs: string[]) {
console.log('Unsubscribing from pairs: ', pairs.join(', '));
const params = pairs
Expand All @@ -143,6 +152,15 @@ export default class PriceListenerWorker {
this._alerts.set(alert.pair, [...alertsForPair]);
}

removeListener(message: string) {
const alert: AlertModel = JSON.parse(message);
console.log(`Removing alert ${alert._id} from listener `);
this.unsubscribeIfNoAlerts(alert.pair);
let alertsForPair = this._alerts.get(alert.pair) || [];
alertsForPair = [...alertsForPair.filter(a => a._id !== alert._id)];
this._alerts.set(alert.pair, [...alertsForPair]);
}

private handlePendingResponse(message: any) {
const ref = message.id.toString();
const pendingResponse = this._pendingResponses.get(ref);
Expand Down Expand Up @@ -199,6 +217,14 @@ export default class PriceListenerWorker {
.catch(console.error)
}
break;
case 'lt_price':
if (Number(price) <= alert.price.value) {
this.evictFromListener(alert);
this.triggerAlert(alert, price)
.then(() => console.log(`Alert ${alert._id} triggered.`))
.catch(console.error)
}
break;
default:
console.log(`Unhandled alert type: ${alert.type} for id ${alert._id}`);
}
Expand Down

0 comments on commit 4013313

Please sign in to comment.