Skip to content

Commit

Permalink
add ws kline support (#103)
Browse files Browse the repository at this point in the history
  • Loading branch information
JiamingKe authored Mar 11, 2023
1 parent 95ea813 commit bfcb751
Show file tree
Hide file tree
Showing 8 changed files with 267 additions and 9 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -314,6 +314,7 @@ The following API endpoints have been implemented
#### Public Topics V5

- [Orderbook](https://bybit-exchange.github.io/docs/v5/websocket/public/orderbook)
- [Kline](https://bybit-exchange.github.io/docs/v5/websocket/public/kline)

#### Private Topics V5

Expand Down
22 changes: 22 additions & 0 deletions integrationtest-ws/v5/public_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,3 +31,25 @@ func TestV5Public_OrderBook(t *testing.T) {

require.NoError(t, svc.Run())
}

func TestV5Public_Kline(t *testing.T) {
wsClient := bybit.NewTestWebsocketClient().WithAuthFromEnv()
svc, err := wsClient.V5().Public(bybit.CategoryV5Linear)
require.NoError(t, err)

_, err = svc.SubscribeKline(
bybit.V5WebsocketPublicKlineParamKey{
Interval: Interval5,
Symbol: bybit.SymbolV5BTCUSDT,
},
func(response bybit.V5WebsocketPublicKlineResponse) error {
goldenFilename := "./testdata/public-v5-kline.json"
testhelper.Compare(t, goldenFilename, testhelper.ConvertToJSON(response))
testhelper.UpdateFile(t, goldenFilename, testhelper.ConvertToJSON(response))
return nil
},
)
require.NoError(t, err)

require.NoError(t, svc.Run())
}
20 changes: 20 additions & 0 deletions integrationtest-ws/v5/testdata/piublic-v5-kline.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
{
"topic": "kline.5.BTCUSDT",
"data": [
{
"start": 1672324800000,
"end": 1672325099999,
"interval": "5",
"open": "16649.5",
"close": "16677",
"high": "16677",
"low": "16608",
"volume": "2.081",
"turnover": "34666.4005",
"confirm": false,
"timestamp": 1672324988882
}
],
"ts": 1672324988882,
"type": "snapshot"
}
9 changes: 5 additions & 4 deletions v5_client_web_socket_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@ func (s *V5WebsocketService) Public(category CategoryV5) (V5WebsocketPublicServi
return &V5WebsocketPublicService{
client: s.client,
connection: c,
paramOrderBookMap: map[V5WebsocketPublicOrderBookParamKey]func(V5WebsocketPublicOrderBookResponse) error{},
paramOrderBookMap: make(map[V5WebsocketPublicOrderBookParamKey]func(V5WebsocketPublicOrderBookResponse) error),
paramKlineMap: make(map[V5WebsocketPublicKlineParamKey]func(V5WebsocketPublicKlineResponse) error),
}, nil
}

Expand All @@ -39,9 +40,9 @@ func (s *V5WebsocketService) Private() (V5WebsocketPrivateServiceI, error) {
return &V5WebsocketPrivateService{
client: s.client,
connection: c,
paramOrderMap: map[V5WebsocketPrivateParamKey]func(V5WebsocketPrivateOrderResponse) error{},
paramPositionMap: map[V5WebsocketPrivateParamKey]func(V5WebsocketPrivatePositionResponse) error{},
paramWalletMap: map[V5WebsocketPrivateParamKey]func(V5WebsocketPrivateWalletResponse) error{},
paramOrderMap: make(map[V5WebsocketPrivateParamKey]func(V5WebsocketPrivateOrderResponse) error),
paramPositionMap: make(map[V5WebsocketPrivateParamKey]func(V5WebsocketPrivatePositionResponse) error),
paramWalletMap: make(map[V5WebsocketPrivateParamKey]func(V5WebsocketPrivateWalletResponse) error),
}, nil
}

Expand Down
25 changes: 25 additions & 0 deletions v5_ws_public.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,11 @@ type V5WebsocketPublicServiceI interface {
V5WebsocketPublicOrderBookParamKey,
func(V5WebsocketPublicOrderBookResponse) error,
) (func() error, error)

SubscribeKline(
V5WebsocketPublicKlineParamKey,
func(V5WebsocketPublicKlineResponse) error,
) (func() error, error)
}

// V5WebsocketPublicService :
Expand All @@ -31,6 +36,7 @@ type V5WebsocketPublicService struct {
connection *websocket.Conn

paramOrderBookMap map[V5WebsocketPublicOrderBookParamKey]func(V5WebsocketPublicOrderBookResponse) error
paramKlineMap map[V5WebsocketPublicKlineParamKey]func(V5WebsocketPublicKlineResponse) error
}

const (
Expand All @@ -49,6 +55,9 @@ type V5WebsocketPublicTopic string
const (
// V5WebsocketPublicTopicOrderBook :
V5WebsocketPublicTopicOrderBook = "orderbook"

// V5WebsocketPublicTopicKline :
V5WebsocketPublicTopicKline = "kline"
)

// judgeTopic :
Expand All @@ -61,6 +70,8 @@ func (s *V5WebsocketPublicService) judgeTopic(respBody []byte) (V5WebsocketPubli
switch {
case strings.Contains(topic, "orderbook"):
return V5WebsocketPublicTopicOrderBook, nil
case strings.Contains(topic, "kline"):
return V5WebsocketPublicTopicKline, nil
}
}
return "", nil
Expand Down Expand Up @@ -145,6 +156,20 @@ func (s *V5WebsocketPublicService) Run() error {
if err != nil {
return err
}
if err := f(resp); err != nil {
return err
}
case V5WebsocketPublicTopicKline:
var resp V5WebsocketPublicKlineResponse
if err := s.parseResponse(message, &resp); err != nil {
return err
}

f, err := s.retrieveKlineFunc(resp.Key())
if err != nil {
return err
}

if err := f(resp); err != nil {
return err
}
Expand Down
123 changes: 123 additions & 0 deletions v5_ws_public_kline.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
package bybit

import (
"encoding/json"
"errors"
"fmt"
"strings"

"github.com/gorilla/websocket"
)

// SubscribeKline :
func (s *V5WebsocketPublicService) SubscribeKline(
key V5WebsocketPublicKlineParamKey,
f func(V5WebsocketPublicKlineResponse) error,
) (func() error, error) {
if err := s.addParamKlineFunc(key, f); err != nil {
return nil, err
}
param := struct {
Op string `json:"op"`
Args []interface{} `json:"args"`
}{
Op: "subscribe",
Args: []interface{}{key.Topic()},
}
buf, err := json.Marshal(param)
if err != nil {
return nil, err
}
if err := s.connection.WriteMessage(websocket.TextMessage, buf); err != nil {
return nil, err
}
return func() error {
param := struct {
Op string `json:"op"`
Args []interface{} `json:"args"`
}{
Op: "unsubscribe",
Args: []interface{}{key.Topic()},
}
buf, err := json.Marshal(param)
if err != nil {
return err
}
if err := s.connection.WriteMessage(websocket.TextMessage, []byte(buf)); err != nil {
return err
}
s.removeParamKlineFunc(key)
return nil
}, nil
}

// V5WebsocketPublicKlineParamKey :
type V5WebsocketPublicKlineParamKey struct {
Interval Interval
Symbol SymbolV5
}

// Topic :
func (k *V5WebsocketPublicKlineParamKey) Topic() string {
return fmt.Sprintf("%s.%s.%s", V5WebsocketPublicTopicKline, k.Interval, k.Symbol)
}

// V5WebsocketPublicKlineResponse :
type V5WebsocketPublicKlineResponse struct {
Topic string `json:"topic"`
Type string `json:"type"`
TimeStamp int64 `json:"ts"`
Data []V5WebsocketPublicKlineData `json:"data"`
}

// V5WebsocketPublicKlineData :
type V5WebsocketPublicKlineData struct {
Start int `json:"start"`
End int `json:"end"`
Interval Interval `json:"interval"`
Open string `json:"open"`
Close string `json:"close"`
High string `json:"high"`
Low string `json:"low"`
Volume string `json:"volume"`
Turnover string `json:"turnover"`
Confirm bool `json:"confirm"`
Timestamp int `json:"timestamp"`
}

// Key :
func (r *V5WebsocketPublicKlineResponse) Key() V5WebsocketPublicKlineParamKey {
topic := r.Topic
arr := strings.Split(topic, ".")
if arr[0] != V5WebsocketPublicTopicKline || len(arr) != 3 {
return V5WebsocketPublicKlineParamKey{}
}

return V5WebsocketPublicKlineParamKey{
Interval: Interval(arr[1]),
Symbol: SymbolV5(arr[2]),
}
}

// addParamKlineFunc :
func (s *V5WebsocketPublicService) addParamKlineFunc(key V5WebsocketPublicKlineParamKey, f func(V5WebsocketPublicKlineResponse) error) error {
if _, exist := s.paramKlineMap[key]; exist {
return errors.New("already registered for this key")
}
s.paramKlineMap[key] = f
return nil
}

// removeParamTradeFunc :
func (s *V5WebsocketPublicService) removeParamKlineFunc(key V5WebsocketPublicKlineParamKey) {
delete(s.paramKlineMap, key)
}

// retrievePositionFunc :
func (s *V5WebsocketPublicService) retrieveKlineFunc(key V5WebsocketPublicKlineParamKey) (func(V5WebsocketPublicKlineResponse) error, error) {
f, exist := s.paramKlineMap[key]
if !exist {
return nil, errors.New("func not found")
}
return f, nil
}
66 changes: 66 additions & 0 deletions v5_ws_public_kline_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
package bybit

import (
"encoding/json"
"testing"

"github.com/hirokisan/bybit/v2/testhelper"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestWebsocketV5Public_Kline(t *testing.T) {
respBody := map[string]interface{}{
"topic": "kline.5.BTCUSDT",
"type": "snapshot",
"ts": 1672324988882,
"data": []map[string]interface{}{
{
"start": 1672324800000,
"end": 1672325099999,
"interval": "5",
"open": "16649.5",
"close": "16677",
"high": "16677",
"low": "16608",
"volume": "2.081",
"turnover": "34666.4005",
"confirm": false,
"timestamp": 1672324988882,
},
},
}
bytesBody, err := json.Marshal(respBody)
require.NoError(t, err)

category := CategoryV5Linear

server, teardown := testhelper.NewWebsocketServer(
testhelper.WithWebsocketHandlerOption(V5WebsocketPublicPathFor(category), bytesBody),
)
defer teardown()

wsClient := NewTestWebsocketClient().
WithBaseURL(server.URL)

svc, err := wsClient.V5().Public(category)
require.NoError(t, err)

{
_, err := svc.SubscribeKline(
V5WebsocketPublicKlineParamKey{
Interval: Interval5,
Symbol: SymbolV5BTCUSDT,
},
func(response V5WebsocketPublicKlineResponse) error {
assert.Equal(t, respBody["topic"], response.Topic)
return nil
},
)
require.NoError(t, err)
}

assert.NoError(t, svc.Run())
assert.NoError(t, svc.Ping())
assert.NoError(t, svc.Close())
}
10 changes: 5 additions & 5 deletions v5_ws_public_orderbook.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ type V5WebsocketPublicOrderBookParamKey struct {

// Topic :
func (k *V5WebsocketPublicOrderBookParamKey) Topic() string {
return fmt.Sprintf("orderbook.%d.%s", k.Depth, k.Symbol)
return fmt.Sprintf("%s.%d.%s", V5WebsocketPublicTopicOrderBook, k.Depth, k.Symbol)
}

// V5WebsocketPublicOrderBookResponse :
Expand Down Expand Up @@ -134,7 +134,7 @@ func (b *V5WebsocketPublicOrderBookAsks) UnmarshalJSON(data []byte) error {
func (r *V5WebsocketPublicOrderBookResponse) Key() V5WebsocketPublicOrderBookParamKey {
topic := r.Topic
arr := strings.Split(topic, ".")
if arr[0] != "orderbook" || len(arr) != 3 {
if arr[0] != V5WebsocketPublicTopicOrderBook || len(arr) != 3 {
return V5WebsocketPublicOrderBookParamKey{}
}
depth, err := strconv.Atoi(arr[1])
Expand All @@ -149,11 +149,11 @@ func (r *V5WebsocketPublicOrderBookResponse) Key() V5WebsocketPublicOrderBookPar
}

// addParamOrderBookFunc :
func (s *V5WebsocketPublicService) addParamOrderBookFunc(param V5WebsocketPublicOrderBookParamKey, f func(V5WebsocketPublicOrderBookResponse) error) error {
if _, exist := s.paramOrderBookMap[param]; exist {
func (s *V5WebsocketPublicService) addParamOrderBookFunc(key V5WebsocketPublicOrderBookParamKey, f func(V5WebsocketPublicOrderBookResponse) error) error {
if _, exist := s.paramOrderBookMap[key]; exist {
return errors.New("already registered for this param")
}
s.paramOrderBookMap[param] = f
s.paramOrderBookMap[key] = f
return nil
}

Expand Down

0 comments on commit bfcb751

Please sign in to comment.