From 61c202e58c580870fb5035bc3d5e9921c6c45e64 Mon Sep 17 00:00:00 2001 From: Thibault You Date: Fri, 13 Aug 2021 14:31:18 +0200 Subject: [PATCH 1/2] :recycle: Adding websockets for FTX --- README.md | 2 +- package.json | 6 +- src/routes/balance.routes.ts | 4 +- src/routes/health.routes.ts | 3 + src/routes/market.routes.ts | 2 +- src/routes/trading.routes.ts | 4 +- src/services/account.service.ts | 2 +- .../exchanges/ftx.exchange.service.ts | 39 +++++- src/services/exchanges/ws/ftx.ws.service.ts | 132 ++++++++++++++++++ src/services/trading/trading.executor.ts | 5 +- src/services/trading/trading.service.ts | 5 +- src/utils/exchanges/common.utils.ts | 8 +- src/utils/exchanges/ftx.utils.ts | 4 +- src/utils/trading/ticker.utils.ts | 4 +- 14 files changed, 199 insertions(+), 21 deletions(-) create mode 100644 src/services/exchanges/ws/ftx.ws.service.ts diff --git a/README.md b/README.md index dd16c31..861c0a8 100644 --- a/README.md +++ b/README.md @@ -36,7 +36,7 @@ You can use the bot with : | [![kraken](https://user-images.githubusercontent.com/51840849/76173629-fc67fb00-61b1-11ea-84fe-f2de582f58a3.jpg)](https://www.kraken.com) | kraken | [Kraken (spot)](https://www.kraken.com) | [API](https://www.kraken.com/features/api) | -### 🚧 Features +### ⚗️ Features - add / read / delete account (or subaccount for FTX) configuration - open a Long / Short position (Futures) or Buy / Sell a token (Spot) in $US diff --git a/package.json b/package.json index 34c3404..e20d15d 100644 --- a/package.json +++ b/package.json @@ -57,20 +57,24 @@ "class-transformer": "0.4.0", "class-transformer-validator": "0.9.1", "class-validator": "0.13.1", + "crypto": "^1.0.1", "express": "4.17.1", "handy-redis": "2.2.2", "node-json-db": "1.3.0", "redis": "3.1.2", "typescript": "4.3.5", "uuid": "8.3.2", - "winston": "3.3.3" + "winston": "3.3.3", + "ws": "^8.1.0" }, "devDependencies": { + "@types/crypto-js": "^4.0.2", "@types/express": "4.17.13", "@types/jest": "26.0.24", "@types/node": "16.4.10", "@types/redis": "2.8.31", "@types/uuid": "8.3.1", + "@types/ws": "^7.4.7", "@typescript-eslint/eslint-plugin": "4.29.0", "@typescript-eslint/parser": "4.29.0", "concurrently": "6.2.0", diff --git a/src/routes/balance.routes.ts b/src/routes/balance.routes.ts index 52df045..1c33105 100644 --- a/src/routes/balance.routes.ts +++ b/src/routes/balance.routes.ts @@ -20,7 +20,7 @@ export const getAccountBalances = async ( try { const account = await readAccount(id); const { exchange } = account; - const balances = await TradingService.getTradeExecutor(exchange) + const balances = await (await TradingService.getTradeExecutor(exchange)) .getExchangeService() .getBalances(account); res.write( @@ -51,7 +51,7 @@ export const getAccountsBalances = async ( try { accountsBalances.push({ account: account.stub, - balances: await TradingService.getTradeExecutor(account.exchange) + balances: await (await TradingService.getTradeExecutor(account.exchange)) .getExchangeService() .getBalances(account) }); diff --git a/src/routes/health.routes.ts b/src/routes/health.routes.ts index 74c1eee..5701d0b 100644 --- a/src/routes/health.routes.ts +++ b/src/routes/health.routes.ts @@ -1,6 +1,7 @@ import { Request, Response, Router } from 'express'; import { Route } from '../constants/routes.constants'; import { HEALTHCHECK_SUCCESS } from '../messages/server.messages'; +import { FTXExchangeWSService } from '../services/exchanges/ws/ftx.ws.service'; import { loggingMiddleware } from '../utils/logger.utils'; const router = Router(); @@ -10,6 +11,8 @@ export const checkHealth = async ( _req: Request, res: Response ): Promise => { + const ftx = new FTXExchangeWSService(); + res.write( JSON.stringify({ message: HEALTHCHECK_SUCCESS diff --git a/src/routes/market.routes.ts b/src/routes/market.routes.ts index b69a974..8703d73 100644 --- a/src/routes/market.routes.ts +++ b/src/routes/market.routes.ts @@ -17,7 +17,7 @@ export const getMarkets = async ( ): Promise => { const exchange = req.params.exchange as ExchangeId; try { - const markets = await TradingService.getTradeExecutor(exchange) + const markets = await (await TradingService.getTradeExecutor(exchange)) .getExchangeService() .getMarkets(); res.write( diff --git a/src/routes/trading.routes.ts b/src/routes/trading.routes.ts index 9311afa..fd09eef 100644 --- a/src/routes/trading.routes.ts +++ b/src/routes/trading.routes.ts @@ -25,7 +25,7 @@ export const postTrade = async (req: Request, res: Response): Promise => { const side = getSide(direction); try { const account = await readAccount(stub); - TradingService.getTradeExecutor(account.exchange).addTrade( + (await TradingService.getTradeExecutor(account.exchange)).addTrade( account, trade ); @@ -50,7 +50,7 @@ export const postTrade = async (req: Request, res: Response): Promise => { const { direction, stub, symbol }: Trade = req.body; const side = getSide(direction); const account = await readAccount(stub); - TradingService.getTradeExecutor(account.exchange).addTrade( + (await TradingService.getTradeExecutor(account.exchange)).addTrade( account, req.body ); diff --git a/src/services/account.service.ts b/src/services/account.service.ts index 4e4c3a1..c47fa96 100644 --- a/src/services/account.service.ts +++ b/src/services/account.service.ts @@ -30,7 +30,7 @@ export const writeAccount = async (account: Account): Promise => { } } catch (err) { try { - await TradingService.getTradeExecutor(exchange) + await (await TradingService.getTradeExecutor(exchange)) .getExchangeService() .refreshSession(account); } catch (err) { diff --git a/src/services/exchanges/ftx.exchange.service.ts b/src/services/exchanges/ftx.exchange.service.ts index 4b70744..22635fb 100644 --- a/src/services/exchanges/ftx.exchange.service.ts +++ b/src/services/exchanges/ftx.exchange.service.ts @@ -4,9 +4,9 @@ import { getAccountId } from '../../utils/account.utils'; import { Exchange, Ticker } from 'ccxt'; import { Side } from '../../constants/trading.constants'; import { IOrderOptions } from '../../interfaces/trading.interfaces'; -import { error } from '../logger.service'; +import { debug, error } from '../logger.service'; import { Trade } from '../../entities/trade.entities'; -import { isFTXSpot } from '../../utils/exchanges/ftx.utils'; +import { getFTXBaseSymbol, isFTXSpot } from '../../utils/exchanges/ftx.utils'; import { OPEN_TRADE_ERROR_MAX_SIZE } from '../../messages/trading.messages'; import { OpenPositionError } from '../../errors/trading.errors'; import { CompositeExchangeService } from './base/composite.exchange.service'; @@ -17,12 +17,43 @@ import { getRelativeOrderSize } from '../../utils/trading/conversion.utils'; import { getInvertedSide, getSide } from '../../utils/trading/side.utils'; +import { FTXExchangeWSService } from './ws/ftx.ws.service'; +import { TICKER_READ_ERROR, TICKER_READ_SUCCESS } from '../../messages/exchanges.messages'; +import { TickerFetchError } from '../../errors/exchange.errors'; +import { getTickerPrice } from '../../utils/trading/ticker.utils'; export class FTXExchangeService extends CompositeExchangeService { + ws: FTXExchangeWSService; + constructor() { super(ExchangeId.FTX); } + init = async () => { + this.ws = await FTXExchangeWSService.init() + } + + getTicker = async (symbol: string): Promise => { + try { + let ticker: Ticker; + const base = getFTXBaseSymbol(symbol) + if (!this.ws.tickers[base]) { + this.ws.addTicker(symbol); + ticker = await this.defaultExchange.fetchTicker(symbol); + } else { + ticker = {symbol, ...this.ws.tickers[base]} as unknown as Ticker + } + debug(TICKER_READ_SUCCESS(this.exchangeId, symbol, ticker)); + return ticker; + } catch (err) { + console.log(err) + error(TICKER_READ_ERROR(this.exchangeId, symbol), err); + throw new TickerFetchError( + TICKER_READ_ERROR(this.exchangeId, symbol, err.message) + ); + } + }; + fetchPositions = async (instance: Exchange): Promise => (await instance.privateGetAccount()).result.positions; @@ -32,8 +63,8 @@ export class FTXExchangeService extends CompositeExchangeService { trade: Trade ): Promise => { const { size } = trade; - const { symbol, info } = ticker; - const { price } = info; + const { symbol } = ticker; + const price = getTickerPrice(ticker, this.exchangeId) // we add a check since FTX is a composite exchange if (isFTXSpot(ticker)) { const balance = await this.getTickerBalance(account, ticker); diff --git a/src/services/exchanges/ws/ftx.ws.service.ts b/src/services/exchanges/ws/ftx.ws.service.ts new file mode 100644 index 0000000..c696acd --- /dev/null +++ b/src/services/exchanges/ws/ftx.ws.service.ts @@ -0,0 +1,132 @@ +import WebSocket from 'ws'; +import * as crypto from 'crypto'; +import { Account } from '../../../entities/account.entities'; +import { getFTXBaseSymbol } from '../../../utils/exchanges/ftx.utils'; + +interface ISocket { + ws: WebSocket; + subs: Array>; +} + +export interface IFTXWSTicker { + bid: number; + ask: number; + // bidSize: number; + // askSize: number; + // last: number; + time: number; +} + +const DEFAULT_SOCKET = 'public'; + +export class FTXExchangeWSService { + sockets: Record = { + [DEFAULT_SOCKET]: { + ws: new WebSocket('wss://ftx.com/ws/', { + perMessageDeflate: false + }), + subs: [] + } + }; // account id / privates sockets + tickers: Record = {}; // ticker symbol / ticker + + static init = async (): Promise => { + const ws = new FTXExchangeWSService() + await ws.connect(ws.sockets[DEFAULT_SOCKET].ws); + console.log(`Opening FTX public socket.`); + return ws; + } + + send = (ws: WebSocket, message: Record, account?: Account) => { + if (message.op === 'subscribe') { + this.sockets[account ? account.stub : DEFAULT_SOCKET].subs.push(message); + } else if (message.op !== 'ping') { + console.log(message); + } + ws.send(JSON.stringify(message)); + }; + + connect = (ws: WebSocket, account?: Account): Promise => { + return new Promise((resolve, _reject) => { + ws.onopen = () => { + setInterval(() => this.send(ws, { op: 'ping' }), 5 * 1000); + resolve(); + }; + + ws.onmessage = (event) => { + let msg; + try { + msg = JSON.parse(event.data.toString()); + } catch (e) { + console.error('FTX sent a bad json', e.data); + } + switch (msg.channel) { + case 'ticker': + this.tickers[getFTXBaseSymbol(msg.market)] = msg.data + break; + // case 'orders': + // case 'fills': + default: + break; + } + }; + + ws.onclose = (event) => { + console.log('Socket is closed. Reconnecting ...', event.reason); + this.connect(ws, account); + }; + + ws.onerror = (err) => { + console.error( + 'Socket encountered error: ', + err.message, + 'Closing socket' + ); + ws.close(); + }; + }); + }; + + addAccount = async (account: Account): Promise => { + const ws = new WebSocket('wss://ftx.com/ws/', { + perMessageDeflate: false + }); + this.sockets[account.stub] = { + ws, + subs: [] + }; + await this.connect(this.sockets[account.stub].ws, account); + const { apiKey, secret, stub, subaccount } = account; + const time = Date.now(); + const sign = crypto + .createHmac('sha256', secret) + .update(time + 'websocket_login') + .digest('hex'); + this.send( + ws, + { + op: 'login', + args: { + key: apiKey, + sign: sign, + time: time, + subaccount: subaccount + } + }, + account + ); + // this.send(ws, { op: 'subscribe', channel: 'fills' }, account); + // this.send(ws, { op: 'subscribe', channel: 'orders' }, account); + console.log(`Opening FTX socket for ${stub} account.`); + }; + + addTicker = (symbol: string): void => { + if (!this.tickers[symbol]) { + this.send(this.sockets[DEFAULT_SOCKET].ws, { + op: 'subscribe', + channel: 'ticker', + market: symbol + }); + } + }; +} diff --git a/src/services/trading/trading.executor.ts b/src/services/trading/trading.executor.ts index 2e90f66..8d8b03a 100644 --- a/src/services/trading/trading.executor.ts +++ b/src/services/trading/trading.executor.ts @@ -36,7 +36,10 @@ export class TradingExecutor { constructor(id: ExchangeId) { this.id = id; - this.exchangeService = initExchangeService(id); + } + + init = async () => { + this.exchangeService = await initExchangeService(this.id); } getExchangeService = (): ExchangeService => this.exchangeService; diff --git a/src/services/trading/trading.service.ts b/src/services/trading/trading.service.ts index 2e07dd6..a50350c 100644 --- a/src/services/trading/trading.service.ts +++ b/src/services/trading/trading.service.ts @@ -7,11 +7,12 @@ export class TradingService { // eslint-disable-next-line @typescript-eslint/no-empty-function private constructor() {} - public static getTradeExecutor = ( + public static getTradeExecutor = async ( exchangeId: ExchangeId - ): TradingExecutor => { + ): Promise => { if (!TradingService.executors.get(exchangeId)) { const service = new TradingExecutor(exchangeId); + await service.init() TradingService.executors.set(exchangeId, service); service.start(); } diff --git a/src/utils/exchanges/common.utils.ts b/src/utils/exchanges/common.utils.ts index e23bf75..e8bcc77 100644 --- a/src/utils/exchanges/common.utils.ts +++ b/src/utils/exchanges/common.utils.ts @@ -33,9 +33,9 @@ export const getExchangeOptions = ( return options; }; -export const initExchangeService = ( +export const initExchangeService = async ( exchangeId: ExchangeId -): ExchangeService => { +): Promise => { switch (exchangeId) { case ExchangeId.Binance: return new BinanceSpotExchangeService(); @@ -47,7 +47,9 @@ export const initExchangeService = ( return new KrakenExchangeService(); case ExchangeId.FTX: default: - return new FTXExchangeService(); + const ex = new FTXExchangeService(); + await ex.init(); + return ex; } }; diff --git a/src/utils/exchanges/ftx.utils.ts b/src/utils/exchanges/ftx.utils.ts index 1b48dbe..822d691 100644 --- a/src/utils/exchanges/ftx.utils.ts +++ b/src/utils/exchanges/ftx.utils.ts @@ -1,4 +1,6 @@ import { Ticker } from 'ccxt'; export const isFTXSpot = (ticker: Ticker): boolean => - ticker.info.type === 'spot'; + !ticker.symbol.includes('PERP'); + + export const getFTXBaseSymbol = (symbol: string) => symbol.split('/')[0].replace('-PERP', '') \ No newline at end of file diff --git a/src/utils/trading/ticker.utils.ts b/src/utils/trading/ticker.utils.ts index 2674ec7..abb6ffb 100644 --- a/src/utils/trading/ticker.utils.ts +++ b/src/utils/trading/ticker.utils.ts @@ -5,7 +5,7 @@ export const getTickerPrice = ( ticker: Ticker, exchangeId: ExchangeId ): number => { - const { last, info } = ticker; + const { last, bid, ask, info } = ticker; switch (exchangeId) { case ExchangeId.Binance: case ExchangeId.BinanceFuturesUSD: @@ -15,6 +15,6 @@ export const getTickerPrice = ( return last; case ExchangeId.FTX: default: - return info.price; + return (bid + ask) / 2; } }; From 87099115816b03feec5b5955d349b03742f5f348 Mon Sep 17 00:00:00 2001 From: Thibault You Date: Fri, 13 Aug 2021 14:48:48 +0200 Subject: [PATCH 2/2] :toilet: Removing garbage --- src/routes/health.routes.ts | 2 -- 1 file changed, 2 deletions(-) diff --git a/src/routes/health.routes.ts b/src/routes/health.routes.ts index 5701d0b..7f755fe 100644 --- a/src/routes/health.routes.ts +++ b/src/routes/health.routes.ts @@ -11,8 +11,6 @@ export const checkHealth = async ( _req: Request, res: Response ): Promise => { - const ftx = new FTXExchangeWSService(); - res.write( JSON.stringify({ message: HEALTHCHECK_SUCCESS