diff --git a/README.md b/README.md index cd3ece0..8c6ca1e 100644 --- a/README.md +++ b/README.md @@ -314,6 +314,7 @@ The following API endpoints have been implemented #### Public Topics V5 - [Orderbook](https://bybit-exchange.github.io/docs/v5/websocket/public/orderbook) +- [Kline](https://bybit-exchange.github.io/docs/v5/websocket/public/kline) #### Private Topics V5 diff --git a/integrationtest-ws/v5/public_test.go b/integrationtest-ws/v5/public_test.go index 3d99597..96f6afc 100644 --- a/integrationtest-ws/v5/public_test.go +++ b/integrationtest-ws/v5/public_test.go @@ -31,3 +31,25 @@ func TestV5Public_OrderBook(t *testing.T) { require.NoError(t, svc.Run()) } + +func TestV5Public_Kline(t *testing.T) { + wsClient := bybit.NewTestWebsocketClient().WithAuthFromEnv() + svc, err := wsClient.V5().Public(bybit.CategoryV5Linear) + require.NoError(t, err) + + _, err = svc.SubscribeKline( + bybit.V5WebsocketPublicKlineParamKey{ + Interval: Interval5, + Symbol: bybit.SymbolV5BTCUSDT, + }, + func(response bybit.V5WebsocketPublicKlineResponse) error { + goldenFilename := "./testdata/public-v5-kline.json" + testhelper.Compare(t, goldenFilename, testhelper.ConvertToJSON(response)) + testhelper.UpdateFile(t, goldenFilename, testhelper.ConvertToJSON(response)) + return nil + }, + ) + require.NoError(t, err) + + require.NoError(t, svc.Run()) +} diff --git a/integrationtest-ws/v5/testdata/piublic-v5-kline.json b/integrationtest-ws/v5/testdata/piublic-v5-kline.json new file mode 100644 index 0000000..8874eb6 --- /dev/null +++ b/integrationtest-ws/v5/testdata/piublic-v5-kline.json @@ -0,0 +1,20 @@ +{ + "topic": "kline.5.BTCUSDT", + "data": [ + { + "start": 1672324800000, + "end": 1672325099999, + "interval": "5", + "open": "16649.5", + "close": "16677", + "high": "16677", + "low": "16608", + "volume": "2.081", + "turnover": "34666.4005", + "confirm": false, + "timestamp": 1672324988882 + } + ], + "ts": 1672324988882, + "type": "snapshot" +} \ No newline at end of file diff --git a/v5_client_web_socket_service.go b/v5_client_web_socket_service.go index 050ed0a..56a92e2 100644 --- a/v5_client_web_socket_service.go +++ b/v5_client_web_socket_service.go @@ -25,7 +25,8 @@ func (s *V5WebsocketService) Public(category CategoryV5) (V5WebsocketPublicServi return &V5WebsocketPublicService{ client: s.client, connection: c, - paramOrderBookMap: map[V5WebsocketPublicOrderBookParamKey]func(V5WebsocketPublicOrderBookResponse) error{}, + paramOrderBookMap: make(map[V5WebsocketPublicOrderBookParamKey]func(V5WebsocketPublicOrderBookResponse) error), + paramKlineMap: make(map[V5WebsocketPublicKlineParamKey]func(V5WebsocketPublicKlineResponse) error), }, nil } @@ -39,9 +40,9 @@ func (s *V5WebsocketService) Private() (V5WebsocketPrivateServiceI, error) { return &V5WebsocketPrivateService{ client: s.client, connection: c, - paramOrderMap: map[V5WebsocketPrivateParamKey]func(V5WebsocketPrivateOrderResponse) error{}, - paramPositionMap: map[V5WebsocketPrivateParamKey]func(V5WebsocketPrivatePositionResponse) error{}, - paramWalletMap: map[V5WebsocketPrivateParamKey]func(V5WebsocketPrivateWalletResponse) error{}, + paramOrderMap: make(map[V5WebsocketPrivateParamKey]func(V5WebsocketPrivateOrderResponse) error), + paramPositionMap: make(map[V5WebsocketPrivateParamKey]func(V5WebsocketPrivatePositionResponse) error), + paramWalletMap: make(map[V5WebsocketPrivateParamKey]func(V5WebsocketPrivateWalletResponse) error), }, nil } diff --git a/v5_ws_public.go b/v5_ws_public.go index 92d2495..8517cbf 100644 --- a/v5_ws_public.go +++ b/v5_ws_public.go @@ -23,6 +23,11 @@ type V5WebsocketPublicServiceI interface { V5WebsocketPublicOrderBookParamKey, func(V5WebsocketPublicOrderBookResponse) error, ) (func() error, error) + + SubscribeKline( + V5WebsocketPublicKlineParamKey, + func(V5WebsocketPublicKlineResponse) error, + ) (func() error, error) } // V5WebsocketPublicService : @@ -31,6 +36,7 @@ type V5WebsocketPublicService struct { connection *websocket.Conn paramOrderBookMap map[V5WebsocketPublicOrderBookParamKey]func(V5WebsocketPublicOrderBookResponse) error + paramKlineMap map[V5WebsocketPublicKlineParamKey]func(V5WebsocketPublicKlineResponse) error } const ( @@ -49,6 +55,9 @@ type V5WebsocketPublicTopic string const ( // V5WebsocketPublicTopicOrderBook : V5WebsocketPublicTopicOrderBook = "orderbook" + + // V5WebsocketPublicTopicKline : + V5WebsocketPublicTopicKline = "kline" ) // judgeTopic : @@ -61,6 +70,8 @@ func (s *V5WebsocketPublicService) judgeTopic(respBody []byte) (V5WebsocketPubli switch { case strings.Contains(topic, "orderbook"): return V5WebsocketPublicTopicOrderBook, nil + case strings.Contains(topic, "kline"): + return V5WebsocketPublicTopicKline, nil } } return "", nil @@ -145,6 +156,20 @@ func (s *V5WebsocketPublicService) Run() error { if err != nil { return err } + if err := f(resp); err != nil { + return err + } + case V5WebsocketPublicTopicKline: + var resp V5WebsocketPublicKlineResponse + if err := s.parseResponse(message, &resp); err != nil { + return err + } + + f, err := s.retrieveKlineFunc(resp.Key()) + if err != nil { + return err + } + if err := f(resp); err != nil { return err } diff --git a/v5_ws_public_kline.go b/v5_ws_public_kline.go new file mode 100644 index 0000000..3d73159 --- /dev/null +++ b/v5_ws_public_kline.go @@ -0,0 +1,123 @@ +package bybit + +import ( + "encoding/json" + "errors" + "fmt" + "strings" + + "github.com/gorilla/websocket" +) + +// SubscribeKline : +func (s *V5WebsocketPublicService) SubscribeKline( + key V5WebsocketPublicKlineParamKey, + f func(V5WebsocketPublicKlineResponse) error, +) (func() error, error) { + if err := s.addParamKlineFunc(key, f); err != nil { + return nil, err + } + param := struct { + Op string `json:"op"` + Args []interface{} `json:"args"` + }{ + Op: "subscribe", + Args: []interface{}{key.Topic()}, + } + buf, err := json.Marshal(param) + if err != nil { + return nil, err + } + if err := s.connection.WriteMessage(websocket.TextMessage, buf); err != nil { + return nil, err + } + return func() error { + param := struct { + Op string `json:"op"` + Args []interface{} `json:"args"` + }{ + Op: "unsubscribe", + Args: []interface{}{key.Topic()}, + } + buf, err := json.Marshal(param) + if err != nil { + return err + } + if err := s.connection.WriteMessage(websocket.TextMessage, []byte(buf)); err != nil { + return err + } + s.removeParamKlineFunc(key) + return nil + }, nil +} + +// V5WebsocketPublicKlineParamKey : +type V5WebsocketPublicKlineParamKey struct { + Interval Interval + Symbol SymbolV5 +} + +// Topic : +func (k *V5WebsocketPublicKlineParamKey) Topic() string { + return fmt.Sprintf("%s.%s.%s", V5WebsocketPublicTopicKline, k.Interval, k.Symbol) +} + +// V5WebsocketPublicKlineResponse : +type V5WebsocketPublicKlineResponse struct { + Topic string `json:"topic"` + Type string `json:"type"` + TimeStamp int64 `json:"ts"` + Data []V5WebsocketPublicKlineData `json:"data"` +} + +// V5WebsocketPublicKlineData : +type V5WebsocketPublicKlineData struct { + Start int `json:"start"` + End int `json:"end"` + Interval Interval `json:"interval"` + Open string `json:"open"` + Close string `json:"close"` + High string `json:"high"` + Low string `json:"low"` + Volume string `json:"volume"` + Turnover string `json:"turnover"` + Confirm bool `json:"confirm"` + Timestamp int `json:"timestamp"` +} + +// Key : +func (r *V5WebsocketPublicKlineResponse) Key() V5WebsocketPublicKlineParamKey { + topic := r.Topic + arr := strings.Split(topic, ".") + if arr[0] != V5WebsocketPublicTopicKline || len(arr) != 3 { + return V5WebsocketPublicKlineParamKey{} + } + + return V5WebsocketPublicKlineParamKey{ + Interval: Interval(arr[1]), + Symbol: SymbolV5(arr[2]), + } +} + +// addParamKlineFunc : +func (s *V5WebsocketPublicService) addParamKlineFunc(key V5WebsocketPublicKlineParamKey, f func(V5WebsocketPublicKlineResponse) error) error { + if _, exist := s.paramKlineMap[key]; exist { + return errors.New("already registered for this key") + } + s.paramKlineMap[key] = f + return nil +} + +// removeParamTradeFunc : +func (s *V5WebsocketPublicService) removeParamKlineFunc(key V5WebsocketPublicKlineParamKey) { + delete(s.paramKlineMap, key) +} + +// retrievePositionFunc : +func (s *V5WebsocketPublicService) retrieveKlineFunc(key V5WebsocketPublicKlineParamKey) (func(V5WebsocketPublicKlineResponse) error, error) { + f, exist := s.paramKlineMap[key] + if !exist { + return nil, errors.New("func not found") + } + return f, nil +} diff --git a/v5_ws_public_kline_test.go b/v5_ws_public_kline_test.go new file mode 100644 index 0000000..e8aaa19 --- /dev/null +++ b/v5_ws_public_kline_test.go @@ -0,0 +1,66 @@ +package bybit + +import ( + "encoding/json" + "testing" + + "github.com/hirokisan/bybit/v2/testhelper" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestWebsocketV5Public_Kline(t *testing.T) { + respBody := map[string]interface{}{ + "topic": "kline.5.BTCUSDT", + "type": "snapshot", + "ts": 1672324988882, + "data": []map[string]interface{}{ + { + "start": 1672324800000, + "end": 1672325099999, + "interval": "5", + "open": "16649.5", + "close": "16677", + "high": "16677", + "low": "16608", + "volume": "2.081", + "turnover": "34666.4005", + "confirm": false, + "timestamp": 1672324988882, + }, + }, + } + bytesBody, err := json.Marshal(respBody) + require.NoError(t, err) + + category := CategoryV5Linear + + server, teardown := testhelper.NewWebsocketServer( + testhelper.WithWebsocketHandlerOption(V5WebsocketPublicPathFor(category), bytesBody), + ) + defer teardown() + + wsClient := NewTestWebsocketClient(). + WithBaseURL(server.URL) + + svc, err := wsClient.V5().Public(category) + require.NoError(t, err) + + { + _, err := svc.SubscribeKline( + V5WebsocketPublicKlineParamKey{ + Interval: Interval5, + Symbol: SymbolV5BTCUSDT, + }, + func(response V5WebsocketPublicKlineResponse) error { + assert.Equal(t, respBody["topic"], response.Topic) + return nil + }, + ) + require.NoError(t, err) + } + + assert.NoError(t, svc.Run()) + assert.NoError(t, svc.Ping()) + assert.NoError(t, svc.Close()) +} diff --git a/v5_ws_public_orderbook.go b/v5_ws_public_orderbook.go index 53c5fe7..05a2855 100644 --- a/v5_ws_public_orderbook.go +++ b/v5_ws_public_orderbook.go @@ -60,7 +60,7 @@ type V5WebsocketPublicOrderBookParamKey struct { // Topic : func (k *V5WebsocketPublicOrderBookParamKey) Topic() string { - return fmt.Sprintf("orderbook.%d.%s", k.Depth, k.Symbol) + return fmt.Sprintf("%s.%d.%s", V5WebsocketPublicTopicOrderBook, k.Depth, k.Symbol) } // V5WebsocketPublicOrderBookResponse : @@ -134,7 +134,7 @@ func (b *V5WebsocketPublicOrderBookAsks) UnmarshalJSON(data []byte) error { func (r *V5WebsocketPublicOrderBookResponse) Key() V5WebsocketPublicOrderBookParamKey { topic := r.Topic arr := strings.Split(topic, ".") - if arr[0] != "orderbook" || len(arr) != 3 { + if arr[0] != V5WebsocketPublicTopicOrderBook || len(arr) != 3 { return V5WebsocketPublicOrderBookParamKey{} } depth, err := strconv.Atoi(arr[1]) @@ -149,11 +149,11 @@ func (r *V5WebsocketPublicOrderBookResponse) Key() V5WebsocketPublicOrderBookPar } // addParamOrderBookFunc : -func (s *V5WebsocketPublicService) addParamOrderBookFunc(param V5WebsocketPublicOrderBookParamKey, f func(V5WebsocketPublicOrderBookResponse) error) error { - if _, exist := s.paramOrderBookMap[param]; exist { +func (s *V5WebsocketPublicService) addParamOrderBookFunc(key V5WebsocketPublicOrderBookParamKey, f func(V5WebsocketPublicOrderBookResponse) error) error { + if _, exist := s.paramOrderBookMap[key]; exist { return errors.New("already registered for this param") } - s.paramOrderBookMap[param] = f + s.paramOrderBookMap[key] = f return nil }