From 37b8c8722154f0cd9328ccb128bc93e62aae2d94 Mon Sep 17 00:00:00 2001 From: bzeron <214598929@qq.com> Date: Mon, 4 Nov 2019 17:50:49 +0800 Subject: [PATCH] init --- .env | 16 ++ .env-sandbox | 16 ++ .gitignore | 9 + Dockerfile | 28 +++ README.md | 108 ++++++++- README_CN.md | 90 ++++++++ api/api.go | 102 +++++++++ api/event.go | 47 ++++ api/monitor.go | 45 ++++ api/order_book.go | 55 +++++ app/app.go | 146 ++++++++++++ build.sh | 8 + builder/builder.go | 342 ++++++++++++++++++++++++++++ builder/depth.go | 33 +++ demo/python-demo/__init__.py | 0 demo/python-demo/config.py | 5 + demo/python-demo/level3/__init__.py | 0 demo/python-demo/level3/rpc.py | 108 +++++++++ demo/python-demo/order_book_demo.py | 60 +++++ docker-entrypoint.sh | 10 + events/l3_events.go | 180 +++++++++++++++ go.mod | 11 + helper/helper.go | 16 ++ helper/str/string.go | 28 +++ kucoin_market.go | 28 +++ level3stream/stream_data_model.go | 93 ++++++++ service/redis.go | 40 ++++ utils/log/log.go | 24 ++ utils/log/log_test.go | 19 ++ utils/recovery/recovery.go | 91 ++++++++ 30 files changed, 1757 insertions(+), 1 deletion(-) create mode 100644 .env create mode 100644 .env-sandbox create mode 100644 .gitignore create mode 100644 Dockerfile create mode 100644 README_CN.md create mode 100644 api/api.go create mode 100644 api/event.go create mode 100644 api/monitor.go create mode 100644 api/order_book.go create mode 100644 app/app.go create mode 100755 build.sh create mode 100644 builder/builder.go create mode 100644 builder/depth.go create mode 100644 demo/python-demo/__init__.py create mode 100644 demo/python-demo/config.py create mode 100644 demo/python-demo/level3/__init__.py create mode 100644 demo/python-demo/level3/rpc.py create mode 100644 demo/python-demo/order_book_demo.py create mode 100755 docker-entrypoint.sh create mode 100644 events/l3_events.go create mode 100644 go.mod create mode 100644 helper/helper.go create mode 100644 helper/str/string.go create mode 100644 kucoin_market.go create mode 100644 level3stream/stream_data_model.go create mode 100644 service/redis.go create mode 100644 utils/log/log.go create mode 100644 utils/log/log_test.go create mode 100644 utils/recovery/recovery.go diff --git a/.env b/.env new file mode 100644 index 0000000..c15f584 --- /dev/null +++ b/.env @@ -0,0 +1,16 @@ +# API_SKIP_VERIFY_TLS=1 + +API_BASE_URI=https://api.kucoin.com + +# If open order book true otherwise false +ENABLE_ORDER_BOOK=true + +# If open event watcher true otherwise false +ENABLE_EVENT_WATCHER=true + +# Password for RPS calls. Pass the same when calling +RPC_TOKEN=market-token + +REDIS_HOST=127.0.0.1:6379 +REDIS_PASSWORD= +REDIS_DB= \ No newline at end of file diff --git a/.env-sandbox b/.env-sandbox new file mode 100644 index 0000000..0dcccc1 --- /dev/null +++ b/.env-sandbox @@ -0,0 +1,16 @@ +# API_SKIP_VERIFY_TLS=1 + +API_BASE_URI=https://openapi-sandbox.kucoin.com + +# If open order book true otherwise false +ENABLE_ORDER_BOOK=true + +# If open event watcher true otherwise false +ENABLE_EVENT_WATCHER=true + +# Password for RPS calls. Pass the same when calling +RPC_TOKEN=market-token + +REDIS_HOST=127.0.0.1:6379 +REDIS_PASSWORD= +REDIS_DB= \ No newline at end of file diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..2bee759 --- /dev/null +++ b/.gitignore @@ -0,0 +1,9 @@ +.idea +vendor +kucoin_market +.env.local +__pycache__ +go.sum +kucoin-market-for-linux +kucoin-market-for-mac +kucoin-market-for-windows.exe \ No newline at end of file diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000..8f859c1 --- /dev/null +++ b/Dockerfile @@ -0,0 +1,28 @@ +FROM golang:1.13-stretch as builder + +RUN export GO111MODULE=on \ + && export GOPROXY=https://goproxy.io \ + && mkdir -p /go/src/github.com/Kucoin/kucoin-level3-sdk + +COPY . /go/src/github.com/Kucoin/kucoin-level3-sdk + +RUN cd /go/src/github.com/Kucoin/kucoin-level3-sdk \ + && CGO_ENABLED=0 go build -ldflags '-s -w' -o /go/bin/kucoin_market kucoin_market.go + +FROM debian:stretch + +RUN apt-get update \ + && apt-get install ca-certificates -y + +COPY --from=builder /go/bin/kucoin_market /usr/local/bin/ + +# .env => /app/.env +WORKDIR /app +VOLUME /app + +EXPOSE 9090 + +COPY docker-entrypoint.sh /usr/local/bin/ +ENTRYPOINT ["docker-entrypoint.sh"] + +CMD ["kucoin_market", "-c", "/app/.env", "-symbol", "BTC-USDT", "-p", "9090", "-rpckey", "BTC-USDT"] diff --git a/README.md b/README.md index 06c97db..f3e8093 100644 --- a/README.md +++ b/README.md @@ -1 +1,107 @@ -# Kucoin-Level3-sdk \ No newline at end of file +# Kucoin Level3 market + +## guide + [中文文档](README_CN.md) + +## Installation + +1. install dependencies + +``` +go get github.com/JetBlink/orderbook +go get github.com/go-redis/redis +go get github.com/gorilla/websocket +go get github.com/joho/godotenv +go get github.com/Kucoin/kucoin-go-sdk +go get github.com/shopspring/decimal +``` + +2. build + +``` +CGO_ENABLED=0 go build -ldflags '-s -w' -o kucoin_market kucoin_market.go +``` + +or you can download the latest available [release](https://github.com/Kucoin/kucoin-level3-sdk/releases) + +## Usage + +1. [vim .env](.env): + ``` + # API_SKIP_VERIFY_TLS=1 + + API_BASE_URI=https://api.kucoin.com + + # If open order book true otherwise false + ENABLE_ORDER_BOOK=true + + # If open event watcher true otherwise false + ENABLE_EVENT_WATCHER=true + + # Password for RPS calls. Pass the same when calling + RPC_TOKEN=market-token + + REDIS_HOST=127.0.0.1:6379 + REDIS_PASSWORD= + REDIS_DB= + ``` + +1. Run Command: + + ``` + ./kucoin_market -c .env -symbol BTC-USDT -p 9090 -rpckey BTC-USDT + ``` + + +## Docker Usage + +1. Build docker image + + ``` + docker build -t kucoin_market . + ``` + +1. [vim .env](.env) + +1. Run + + ``` + docker run --rm -it -v $(pwd)/.env:/app/.env --net=host kucoin_market + ``` + +## RPC Method + +> endpoint : 127.0.0.1:9090 +> the sdk rpc is based on golang jsonrpc 1.0 over tcp. + +see:[python jsonrpc client demo](./demo/python-demo/level3/rpc.py) + +* Get Part Order Book + ``` + {"method": "Server.GetPartOrderBook", "params": [{"token": "your-rpc-token", "number": 1}], "id": 0} + ``` + +* Get Full Order Book + ``` + {"method": "Server.GetOrderBook", "params": [{"token": "your-rpc-token"}], "id": 0} + ``` + +* Add Event ClientOids To Channels + ``` + {"method": "Server.AddEventClientOidsToChannels", "params": [{"token": "your-rpc-token", "data": {"clientOid": ["channel-1", "channel-2"]}}], "id": 0} + ``` + +* Add Event OrderIds To Channels + ``` + {"method": "Server.AddEventOrderIdsToChannels", "params": [{"token": "your-rpc-token", "data": {"orderId": ["channel-1", "channel-2"]}}], "id": 0} + ``` +## Python-Demo + +> the demo including orderbook display + +see:[python use_level3 demo](./demo/python-demo/order_book_demo.py) +- Run order_book.py + ``` + command: python order_book.py + describe: display orderbook + ``` diff --git a/README_CN.md b/README_CN.md new file mode 100644 index 0000000..21d4205 --- /dev/null +++ b/README_CN.md @@ -0,0 +1,90 @@ +# Kucoin Level3 market + +## 入门文档 + [英文文档](README.md) + +## 安装 + +1. install dependencies + +``` +go get github.com/JetBlink/orderbook +go get github.com/go-redis/redis +go get github.com/gorilla/websocket +go get github.com/joho/godotenv +go get github.com/Kucoin/kucoin-go-sdk +go get github.com/shopspring/decimal +``` + +2. build + +``` +CGO_ENABLED=0 go build -ldflags '-s -w' -o kucoin_market kucoin_market.go +``` + +或者直接下载已经编译完成的二进制文件 + +## 用法 + +1. [vim .env](.env): + ``` + # API_SKIP_VERIFY_TLS=1 + + API_BASE_URI=https://api.kucoin.com + + # If open order book true otherwise false + ENABLE_ORDER_BOOK=true + + # If open event watcher true otherwise false + ENABLE_EVENT_WATCHER=true + + # Password for RPS calls. Pass the same when calling + RPC_TOKEN=market-token + + REDIS_HOST=127.0.0.1:6379 + REDIS_PASSWORD= + REDIS_DB= + ``` + +1. 运行命令: + + ``` + ./kucoin_market -c .env -symbol BTC-USDT -p 9090 -rpckey BTC-USDT + ``` + +## RPC Method + +> endpoint : 127.0.0.1:9090 +> the sdk rpc is based on golang jsonrpc 1.0 over tcp. + +see:[python jsonrpc client demo](./demo/python-demo/level3/rpc.py) + +* Get Part Order Book + ``` + {"method": "Server.GetPartOrderBook", "params": [{"token": "your-rpc-token", "number": 1}], "id": 0} + ``` + +* Get Full Order Book + ``` + {"method": "Server.GetOrderBook", "params": [{"token": "your-rpc-token"}], "id": 0} + ``` + +* Add Event ClientOids To Channels + ``` + {"method": "Server.AddEventClientOidsToChannels", "params": [{"token": "your-rpc-token", "data": {"clientOid": ["channel-1", "channel-2"]}}], "id": 0} + ``` + +* Add Event OrderIds To Channels + ``` + {"method": "Server.AddEventOrderIdsToChannels", "params": [{"token": "your-rpc-token", "data": {"orderId": ["channel-1", "channel-2"]}}], "id": 0} + ``` +## Python-Demo + +> python的demo包含了一个本地orderbook的展示 +see:[python use_level3 demo](./demo/python-demo/order_book_demo.py) + +- Run order_book.py + ``` + command: python order_book.py + describe: display orderbook + ``` \ No newline at end of file diff --git a/api/api.go b/api/api.go new file mode 100644 index 0000000..eebbb81 --- /dev/null +++ b/api/api.go @@ -0,0 +1,102 @@ +package api + +import ( + "encoding/json" + "fmt" + "net" + "net/rpc" + "net/rpc/jsonrpc" + + "github.com/Kucoin/kucoin-level3-sdk/builder" + "github.com/Kucoin/kucoin-level3-sdk/events" + "github.com/Kucoin/kucoin-level3-sdk/utils/log" +) + +//Server is api server +type Server struct { + level3Builder *builder.Builder + eventWatcher *events.Watcher + + apiPort string + token string +} + +//InitRpcServer init rpc server +func InitRpcServer(apiPort string, token string, level3Builder *builder.Builder, watcher *events.Watcher) { + if apiPort == "" || token == "" { + panic(fmt.Sprintf("missing configuration,apiPort: %s, token: %s", apiPort, token)) + } + + apiPort = ":" + apiPort + if err := rpc.Register(&Server{ + level3Builder: level3Builder, + eventWatcher: watcher, + + apiPort: apiPort, + token: token, + }); err != nil { + panic("api server run failed, error: %s" + err.Error()) + } + + log.Warn("start running rpc server, port: %s", apiPort) + + listener, err := net.Listen("tcp", apiPort) + if err != nil { + panic("api server run failed, error: %s" + err.Error()) + } + + for { + conn, err := listener.Accept() + if err != nil { + continue + } + + go jsonrpc.ServeConn(conn) + } +} + +//TokenMessage is token type message +type TokenMessage struct { + Token string `json:"token"` +} + +//Response is api response +type Response struct { + Code string `json:"code"` + Data interface{} `json:"data"` + Error string `json:"error"` +} + +func (s *Server) checkToken(token string) string { + if token != s.token { + return s.failure(TokenErrorCode, "error token") + } + + return "" +} + +func (s *Server) success(data interface{}) string { + response, _ := json.Marshal(&Response{ + Code: "0", + Data: data, + Error: "", + }) + + return string(response) +} + +const ( + ServerErrorCode = "10" + TokenErrorCode = "20" + TickerErrorCode = "30" +) + +func (s *Server) failure(code string, err string) string { + response, _ := json.Marshal(&Response{ + Code: code, + Data: "", + Error: err, + }) + + return string(response) +} diff --git a/api/event.go b/api/event.go new file mode 100644 index 0000000..fabb192 --- /dev/null +++ b/api/event.go @@ -0,0 +1,47 @@ +package api + +type AddEventOrderIdsMessage struct { + Data map[string][]string `json:"data"` + TokenMessage +} + +type AddEventClientOidsMessage struct { + Data map[string][]string `json:"data"` + TokenMessage +} + +func (s *Server) AddEventOrderIdsToChannels(message *AddEventOrderIdsMessage, reply *string) error { + if err := s.checkToken(message.Token); err != "" { + *reply = err + return nil + } + + if len(message.Data) == 0 { + *reply = s.failure(ServerErrorCode, "empty event data") + return nil + } + + s.eventWatcher.AddEventOrderIdsToChannels(message.Data) + + *reply = s.success("") + return nil +} + +// You must subscribe in advance according to the ClientOids subscription, +// or you will miss the receive message because of without the mapping relationship between message and orderId~ +func (s *Server) AddEventClientOidsToChannels(message *AddEventClientOidsMessage, reply *string) error { + if err := s.checkToken(message.Token); err != "" { + *reply = err + return nil + } + + if len(message.Data) == 0 { + *reply = s.failure(ServerErrorCode, "empty event data") + return nil + } + + s.eventWatcher.AddEventClientOidsToChannels(message.Data) + + *reply = s.success("") + return nil +} diff --git a/api/monitor.go b/api/monitor.go new file mode 100644 index 0000000..dc50c46 --- /dev/null +++ b/api/monitor.go @@ -0,0 +1,45 @@ +package api + +import ( + "encoding/json" + "time" +) + +func (s *Server) GetChanLen(message *TokenMessage, reply *string) error { + if err := s.checkToken(message.Token); err != "" { + *reply = err + return nil + } + + ret := map[string]int{ + "level3Builder.Messages": len(s.level3Builder.Messages), + "eventWatcher.Messages": len(s.eventWatcher.Messages), + } + + if data, err := json.Marshal(ret); err == nil { + *reply = s.success(string(data)) + return nil + } + + *reply = s.failure(ServerErrorCode, "json failed") + return nil +} + +func (s *Server) Time(message *TokenMessage, reply *string) error { + if err := s.checkToken(message.Token); err != "" { + *reply = err + return nil + } + + ret := map[string]int64{ + "time": time.Now().Unix(), + } + + if data, err := json.Marshal(ret); err == nil { + *reply = s.success(string(data)) + return nil + } + + *reply = s.failure(ServerErrorCode, "json failed") + return nil +} diff --git a/api/order_book.go b/api/order_book.go new file mode 100644 index 0000000..0718132 --- /dev/null +++ b/api/order_book.go @@ -0,0 +1,55 @@ +package api + +func (s *Server) GetOrderBook(message *TokenMessage, reply *string) error { + if err := s.checkToken(message.Token); err != "" { + *reply = err + return nil + } + + data, err := s.level3Builder.SnapshotBytes() + if err != nil { + *reply = s.failure(TickerErrorCode, err.Error()) + return nil + } + + *reply = s.success(string(data)) + return nil +} + +type GetPartOrderBookMessage struct { + Number int `json:"number"` + TokenMessage +} + +func (s *Server) GetPartOrderBook(message *GetPartOrderBookMessage, reply *string) error { + if err := s.checkToken(message.Token); err != "" { + *reply = err + return nil + } + + data, err := s.level3Builder.GetPartOrderBook(message.Number) + if err != nil { + *reply = s.failure(TickerErrorCode, err.Error()) + return nil + } + + *reply = s.success(string(data)) + return nil +} + + +func (s *Server) GetTicker(message *TokenMessage, reply *string) error { + if err := s.checkToken(message.Token); err != "" { + *reply = err + return nil + } + + data, err := s.level3Builder.GetTicker() + if err != nil { + *reply = s.failure(TickerErrorCode, err.Error()) + return nil + } + + *reply = s.success(string(data)) + return nil +} diff --git a/app/app.go b/app/app.go new file mode 100644 index 0000000..6fa1a4a --- /dev/null +++ b/app/app.go @@ -0,0 +1,146 @@ +package app + +import ( + "encoding/json" + "os" + "strconv" + + "github.com/Kucoin/kucoin-go-sdk" + "github.com/Kucoin/kucoin-level3-sdk/api" + "github.com/Kucoin/kucoin-level3-sdk/builder" + "github.com/Kucoin/kucoin-level3-sdk/events" + "github.com/Kucoin/kucoin-level3-sdk/service" + "github.com/Kucoin/kucoin-level3-sdk/utils/log" +) + +type App struct { + apiService *kucoin.ApiService + symbol string + + enableOrderBook bool + level3Builder *builder.Builder + + enableEventWatcher bool + eventWatcher *events.Watcher + redisPool *service.Redis + + rpcPort string + rpcToken string +} + +func NewApp(symbol string, rpcPort string, rpcKey string) *App { + if symbol == "" { + panic("symbol is required") + } + + if rpcPort == "" { + panic("rpcPort is required") + } + + if rpcKey == "" { + panic("rpckey is required") + } + + apiService := kucoin.NewApiServiceFromEnv() + level3Builder := builder.NewBuilder(apiService, symbol) + + var redisHost = os.Getenv("REDIS_HOST") + var redisPassword = os.Getenv("REDIS_PASSWORD") + var redisDBEnv = os.Getenv("REDIS_DB") + var redisDB = 0 + if redisDBEnv != "" { + redisDB, _ = strconv.Atoi(redisDBEnv) + } + redisPool := service.NewRedis(redisHost, redisPassword, redisDB, rpcKey, symbol, rpcPort) + + eventWatcher := events.NewWatcher(redisPool) + + return &App{ + apiService: apiService, + symbol: symbol, + + enableOrderBook: os.Getenv("ENABLE_ORDER_BOOK") == "true", + level3Builder: level3Builder, + + enableEventWatcher: os.Getenv("ENABLE_EVENT_WATCHER") == "true", + redisPool: redisPool, + eventWatcher: eventWatcher, + + rpcPort: rpcPort, + rpcToken: os.Getenv("RPC_TOKEN"), + } +} + +func (app *App) Run() { + if app.enableOrderBook { + go app.level3Builder.ReloadOrderBook() + } + + if app.enableEventWatcher { + go app.eventWatcher.Run() + } + + //rpc server + go api.InitRpcServer(app.rpcPort, app.rpcToken, app.level3Builder, app.eventWatcher) + + app.websocket() +} + +func (app *App) writeMessage(msgRawData json.RawMessage) { + //log.Info("raw message : %s", kucoin.ToJsonString(msgRawData)) + if app.enableOrderBook { + app.level3Builder.Messages <- msgRawData + } + + if app.enableEventWatcher { + app.eventWatcher.Messages <- msgRawData + } + + const msgLenLimit = 50 + if len(app.level3Builder.Messages) > msgLenLimit || + len(app.eventWatcher.Messages) > msgLenLimit { + log.Error( + "msgLenLimit: app.level3Builder.Messages: %d, app.eventWatcher.Messages: %d, app.verify.Messages: %d", + len(app.level3Builder.Messages), + len(app.eventWatcher.Messages), + ) + } +} + +func (app *App) websocket() { + //todo recover dingTalk ? + apiService := app.apiService + + rsp, err := apiService.WebSocketPublicToken() + if err != nil { + panic(err) + } + + tk := &kucoin.WebSocketTokenModel{} + if err := rsp.ReadData(tk); err != nil { + panic(err) + } + + c := apiService.NewWebSocketClient(tk) + + mc, ec, err := c.Connect() + if err != nil { + panic(err) + } + + ch := kucoin.NewSubscribeMessage("/market/level3:"+app.symbol, false) + if err := c.Subscribe(ch); err != nil { + panic(err) + } + + for { + select { + case err := <-ec: + c.Stop() + panic(err) + + case msg := <-mc: + app.writeMessage(msg.RawData) + } + } +} diff --git a/build.sh b/build.sh new file mode 100755 index 0000000..74172a3 --- /dev/null +++ b/build.sh @@ -0,0 +1,8 @@ +#!/bin/bash + +CGO_ENABLED=0 GOOS=linux GOARCH=amd64 go build -ldflags '-s -w' -o kucoin-market-for-linux kucoin_market.go + +CGO_ENABLED=0 go build -ldflags '-s -w' -o kucoin-market-for-mac kucoin_market.go + +CGO_ENABLED=0 GOOS=windows GOARCH=amd64 go build -ldflags '-s -w' -o kucoin-market-for-windows.exe kucoin_market.go + diff --git a/builder/builder.go b/builder/builder.go new file mode 100644 index 0000000..4a2ab16 --- /dev/null +++ b/builder/builder.go @@ -0,0 +1,342 @@ +package builder + +import ( + "encoding/json" + "errors" + "fmt" + "github.com/Kucoin/kucoin-go-sdk" + "sync" + + "github.com/JetBlink/orderbook/base" + "github.com/JetBlink/orderbook/level3" + "github.com/Kucoin/kucoin-level3-sdk/helper" + "github.com/Kucoin/kucoin-level3-sdk/level3stream" + "github.com/Kucoin/kucoin-level3-sdk/utils/log" + "github.com/shopspring/decimal" +) + +type Builder struct { + apiService *kucoin.ApiService + symbol string + lock *sync.RWMutex + Messages chan json.RawMessage + + fullOrderBook *level3.OrderBook + + latestMatch struct { + Price string `json:"price"` + Size string `json:"size"` + } +} + +func NewBuilder(apiService *kucoin.ApiService, symbol string) *Builder { + return &Builder{ + apiService: apiService, + symbol: symbol, + lock: &sync.RWMutex{}, + Messages: make(chan json.RawMessage, helper.MaxMsgChanLen), + } +} + +func (b *Builder) resetOrderBook() { + b.lock.Lock() + b.fullOrderBook = level3.NewOrderBook() + b.lock.Unlock() +} + +func (b *Builder) ReloadOrderBook() { + defer func() { + if r := recover(); r != nil { + log.Error("ReloadOrderBook panic : %v", r) + b.ReloadOrderBook() + } + }() + + log.Warn("start running ReloadOrderBook, symbol: %s", b.symbol) + b.resetOrderBook() + + b.playback() + + for msg := range b.Messages { + l3Data, err := level3stream.NewStreamDataModel(msg) + if err != nil { + panic(err) + } + b.updateFromStream(l3Data) + } +} + +func (b *Builder) playback() { + log.Warn("prepare playback...") + + const tempMsgChanMaxLen = 200 + tempMsgChan := make(chan *level3stream.StreamDataModel, tempMsgChanMaxLen) + firstSequence := "" + var fullOrderBook *DepthResponse + + for msg := range b.Messages { + l3Data, err := level3stream.NewStreamDataModel(msg) + if err != nil { + panic(err) + } + + tempMsgChan <- l3Data + + if firstSequence == "" { + firstSequence = l3Data.Sequence + log.Error("firstSequence: %s", firstSequence) + } + + if len(tempMsgChan) > 5 { + if fullOrderBook == nil { + log.Warn("start getting full level3 order book data, symbol: %s", b.symbol) + fullOrderBook, err = b.GetAtomicFullOrderBook() + if err != nil { + continue + } + log.Error("got full level3 order book data, Sequence: %d", fullOrderBook.Sequence) + } + + if fullOrderBook != nil && fullOrderBook.Sequence < firstSequence { + log.Error("full data Sequence %d is too small", fullOrderBook.Sequence) + fullOrderBook = nil + continue + } + + if fullOrderBook != nil && fullOrderBook.Sequence <= l3Data.Sequence { + log.Warn("sequence match, start playback, tempMsgChan: %d", len(tempMsgChan)) + + b.lock.Lock() + b.AddDepthToOrderBook(fullOrderBook) + b.lock.Unlock() + + n := len(tempMsgChan) + for i := 0; i < n; i++ { + b.updateFromStream(<-tempMsgChan) + } + + log.Warn("finish playback.") + break + } + + if len(tempMsgChan) > tempMsgChanMaxLen-5 { + panic("playback failed, tempMsgChan is too long, retry...") + } + } + } +} + +func (b *Builder) AddDepthToOrderBook(depth *DepthResponse) { + b.fullOrderBook.Sequence = helper.ParseUint64OrPanic(depth.Sequence) + + for index, elem := range depth.Asks { + order, err := level3.NewOrder(elem[0], base.AskSide, elem[1], elem[2], uint64(index), nil) + if err != nil { + panic(err) + } + + if err := b.fullOrderBook.AddOrder(order); err != nil { + panic(err) + } + } + + for index, elem := range depth.Bids { + order, err := level3.NewOrder(elem[0], base.BidSide, elem[1], elem[2], uint64(index), nil) + if err != nil { + panic(err) + } + + if err := b.fullOrderBook.AddOrder(order); err != nil { + panic(err) + } + } +} + +func (b *Builder) updateFromStream(msg *level3stream.StreamDataModel) { + //time.Now().UnixNano() + //log.Info("msg: %s", string(msg.GetRawMessage())) + + b.lock.Lock() + defer b.lock.Unlock() + + skip, err := b.updateSequence(msg) + if err != nil { + panic(err) + } + + if !skip { + b.updateOrderBook(msg) + } +} + +func (b *Builder) updateSequence(msg *level3stream.StreamDataModel) (bool, error) { + fullOrderBookSequenceValue := b.fullOrderBook.Sequence + msgSequenceValue := helper.ParseUint64OrPanic(msg.Sequence) + + if fullOrderBookSequenceValue+1 > msgSequenceValue { + return true, nil + } + + if fullOrderBookSequenceValue+1 != msgSequenceValue { + return false, errors.New(fmt.Sprintf( + "currentSequence: %d, msgSequence: %s, the sequence is not continuous, 当前chanLen: %d", + b.fullOrderBook.Sequence, + msg.Sequence, + len(b.Messages), + )) + } + + //更新 + //!!! sequence 需要更新,通过判断 sequence 是否自增来校验数据完整性,否则重放数据。 + b.fullOrderBook.Sequence = msgSequenceValue + + return false, nil +} + +func (b *Builder) updateOrderBook(msg *level3stream.StreamDataModel) { + //[3]string{"orderId", "price", "size"} + //var item = [3]string{msg.OrderId, msg.Price, msg.Size} + + side := "" + switch msg.Side { + case level3stream.SellSide: + side = base.AskSide + case level3stream.BuySide: + side = base.BidSide + default: + panic("error side: " + msg.Side) + } + + switch msg.Type { + case level3stream.MessageReceivedType: + case level3stream.MessageOpenType: + data := &level3stream.StreamDataOpenModel{} + if err := json.Unmarshal(msg.GetRawMessage(), data); err != nil { + panic(err) + } + + if data.Price == "" || data.Size == "0" { + return + } + + order, err := level3.NewOrder(data.OrderId, side, data.Price, data.Size, helper.ParseUint64OrPanic(data.Time), nil) + if err != nil { + log.Error(string(msg.GetRawMessage())) + panic(err) + } + if err := b.fullOrderBook.AddOrder(order); err != nil { + panic(err) + } + case level3stream.MessageDoneType: + data := &level3stream.StreamDataDoneModel{} + if err := json.Unmarshal(msg.GetRawMessage(), data); err != nil { + panic(err) + } + if err := b.fullOrderBook.RemoveByOrderId(data.OrderId); err != nil { + panic(err) + } + + case level3stream.MessageMatchType: + data := &level3stream.StreamDataMatchModel{} + if err := json.Unmarshal(msg.GetRawMessage(), data); err != nil { + panic(err) + } + sizeValue, err := decimal.NewFromString(data.Size) + if err != nil { + panic(err) + } + if err := b.fullOrderBook.MatchOrder(data.MakerOrderId, sizeValue); err != nil { + panic(err) + } + b.latestMatch.Price = data.Price + b.latestMatch.Size = data.Size + + case level3stream.MessageChangeType: + data := &level3stream.StreamDataChangeModel{} + if err := json.Unmarshal(msg.GetRawMessage(), data); err != nil { + panic(err) + } + sizeValue, err := decimal.NewFromString(data.NewSize) + if err != nil { + panic(err) + } + if err := b.fullOrderBook.ChangeOrder(data.OrderId, sizeValue); err != nil { + panic(err) + } + + default: + panic("error msg type: " + msg.Type) + } +} + +func (b *Builder) Snapshot() (*FullOrderBook, error) { + data, err := b.SnapshotBytes() + if err != nil { + return nil, err + } + + ret := &FullOrderBook{} + if err := json.Unmarshal(data, ret); err != nil { + return nil, err + } + + return ret, nil +} + +func (b *Builder) SnapshotBytes() ([]byte, error) { + b.lock.RLock() + data, err := json.Marshal(b.fullOrderBook) + b.lock.RUnlock() + if err != nil { + return nil, err + } + + return data, nil +} + +func (b *Builder) GetPartOrderBook(number int) ([]byte, error) { + defer func() { + if r := recover(); r != nil { + log.Error("GetPartOrderBook panic : %v", r) + } + }() + + b.lock.RLock() + defer b.lock.RUnlock() + + data, err := json.Marshal(map[string]interface{}{ + "sequence": b.fullOrderBook.Sequence, + base.AskSide: b.fullOrderBook.GetPartOrderBookBySide(base.AskSide, number), + base.BidSide: b.fullOrderBook.GetPartOrderBookBySide(base.BidSide, number), + }) + + if err != nil { + return nil, err + } + + return data, nil +} + +func (b *Builder) GetTicker() ([]byte, error) { + defer func() { + if r := recover(); r != nil { + log.Error("GetTicker panic : %v", r) + } + }() + + b.lock.RLock() + defer b.lock.RUnlock() + + data, err := json.Marshal(map[string]interface{}{ + "sequence": b.fullOrderBook.Sequence, + "match": b.latestMatch, + base.AskSide: b.fullOrderBook.GetPartOrderBookBySide(base.AskSide, 1), + base.BidSide: b.fullOrderBook.GetPartOrderBookBySide(base.BidSide, 1), + }) + + if err != nil { + return nil, err + } + + return data, nil +} diff --git a/builder/depth.go b/builder/depth.go new file mode 100644 index 0000000..6b46299 --- /dev/null +++ b/builder/depth.go @@ -0,0 +1,33 @@ +package builder + +import "errors" + +type DepthResponse struct { + Sequence string `json:"sequence"` + Asks [][3]string `json:"asks"` + Bids [][3]string `json:"bids"` +} + +type FullOrderBook struct { + Sequence uint64 `json:"sequence"` + Asks [][3]string `json:"asks"` + Bids [][3]string `json:"bids"` +} + +func (b *Builder) GetAtomicFullOrderBook() (*DepthResponse, error) { + rsp, err := b.apiService.AtomicFullOrderBook(b.symbol) + if err != nil { + return nil, err + } + + c := &DepthResponse{} + if err := rsp.ReadData(c); err != nil { + return nil, err + } + + if c.Sequence == "" { + return nil, errors.New("empty key sequence") + } + + return c, nil +} diff --git a/demo/python-demo/__init__.py b/demo/python-demo/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/demo/python-demo/config.py b/demo/python-demo/config.py new file mode 100644 index 0000000..8b70f8c --- /dev/null +++ b/demo/python-demo/config.py @@ -0,0 +1,5 @@ +rpc_config = { + 'host': '127.0.0.1', + 'port': 9090, + 'token': 'market-token' +} diff --git a/demo/python-demo/level3/__init__.py b/demo/python-demo/level3/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/demo/python-demo/level3/rpc.py b/demo/python-demo/level3/rpc.py new file mode 100644 index 0000000..74d1023 --- /dev/null +++ b/demo/python-demo/level3/rpc.py @@ -0,0 +1,108 @@ +import json +import socket + + +class RPC(object): + __slots__ = ('port', 'host', 'token', 'conn') + + def __init__(self, host: str, port: int, token: str): + """ + :param host: + :param port: + :param token: + """ + self.port = port + self.host = host + self.token = token + """ + create tcp connect + """ + self.conn = socket.create_connection((self.host, self.port)) + + def read(self) -> str: + """ + :return: + """ + rev = b'' + while True: + c = self.conn.recv(1) + if c == b'\n' or c == b'': + break + else: + rev += c + return rev.decode("utf-8") + + def execute(self, data: map) -> map: + """ + :param data: + :return: + """ + data['id'] = 0 + + self.conn.sendall(json.dumps(data).encode()) + + response = json.loads(self.read()) + + if response.get('id') != 0: + raise Exception("expected id=%s, received id=%s: %s" % (0, response.get('id'), response.get('error'))) + + if response.get('error') is not None: + raise Exception(response.get('error')) + + result = json.loads(response.get('result')) + + if result['code'] != '0': + raise Exception("rpc execute fail: %s" % result['error']) + + return result + + def close(self): + """ + :return: + """ + self.conn.close() + + def call(self, method: str, **kwargs): + """ + :param method: + :param kwargs: + :return: + """ + params = { + 'token': self.token, + } + if kwargs: + params.update(kwargs) + + data = { + 'method': "Server.%s" % method, + 'params': [params], + } + + return self.execute(data) + + def get_ticker(self, number): + result = self.call("GetPartOrderBook", number=number) + ticker = json.loads(result['data']) + if ticker['sequence'] == 0: + raise Exception("rpc get ticker fail: sequence is null") + return ticker + + def get_all_ticker(self): + result = self.call("GetOrderBook") + ticker = json.loads(result['data']) + if ticker['sequence'] == 0: + raise Exception("rpc get all ticker fail: sequence is null") + return ticker + + def add_event_order_id(self, data, channel): + args = {} + for i in data: + args[i] = [channel] + return self.call("AddEventOrderIdsToChannels", data=args) + + def add_event_client_id(self, data, channel): + args = {} + for i in data: + args[i] = [channel] + return self.call("AddEventClientOidsToChannels", data=args) diff --git a/demo/python-demo/order_book_demo.py b/demo/python-demo/order_book_demo.py new file mode 100644 index 0000000..6569338 --- /dev/null +++ b/demo/python-demo/order_book_demo.py @@ -0,0 +1,60 @@ +import time +from level3.rpc import RPC +from config import rpc_config +from decimal import Decimal +import os +import platform + +if __name__ == '__main__': + cmd = '' + system_os = platform.system() + if system_os == 'Windows': + cmd = 'cls' + elif system_os == 'Darwin' or system_os == 'Linux': + cmd = 'clear' + else: + raise Exception('unsupported system') + + while True: + rpc = RPC(rpc_config['host'], rpc_config['port'], rpc_config['token']) + + data = rpc.get_ticker(100) + + asks = data['asks'] + bids = data['bids'] + + price_list = [{}, {}] + + for ask in asks: + if ask[1] not in price_list[0].keys(): + price_list[0].update({ask[1]: Decimal(ask[2])}) + else: + price_list[0].update({ + ask[1]: price_list[0][ask[1]] + Decimal(ask[2]) + }) + if len(price_list[0]) >= 13: + price_list[0].pop(ask[1]) + break + + for bid in bids: + if bid[1] not in price_list[1].keys(): + price_list[1].update({bid[1]: Decimal(bid[2])}) + else: + price_list[1].update({ + bid[1]: price_list[1][bid[1]] + Decimal(bid[2]) + }) + if len(price_list[1]) >= 13: + price_list[1].pop(bid[1]) + break + + d1 = sorted(price_list[0].items(), key=lambda v: v[0], reverse=True) + d2 = sorted(price_list[1].items(), key=lambda v: v[0], reverse=True) + + os.system(cmd) + + for d in d1: + print("{} => {}".format(d[0], d[1])) + print("---Spread---") + for d in d2: + print("{} => {}".format(d[0], d[1])) + time.sleep(0.5) diff --git a/docker-entrypoint.sh b/docker-entrypoint.sh new file mode 100755 index 0000000..2f90106 --- /dev/null +++ b/docker-entrypoint.sh @@ -0,0 +1,10 @@ +#!/bin/bash + +set -e + +# first arg is `-f` or `--some-option` +if [ "${1#-}" != "$1" ]; then + set -- kucoin_market "$@" +fi + +exec "$@" diff --git a/events/l3_events.go b/events/l3_events.go new file mode 100644 index 0000000..ec255d2 --- /dev/null +++ b/events/l3_events.go @@ -0,0 +1,180 @@ +package events + +import ( + "encoding/json" + "sync" + + "github.com/Kucoin/kucoin-level3-sdk/helper" + "github.com/Kucoin/kucoin-level3-sdk/level3stream" + "github.com/Kucoin/kucoin-level3-sdk/service" + "github.com/Kucoin/kucoin-level3-sdk/utils/log" +) + +type Watcher struct { + Messages chan json.RawMessage + redisPool *service.Redis + lock *sync.RWMutex + + orderIds map[string]map[string]bool + clientOids map[string]map[string]bool +} + +func NewWatcher(redisPool *service.Redis) *Watcher { + return &Watcher{ + Messages: make(chan json.RawMessage, helper.MaxMsgChanLen), + redisPool: redisPool, + lock: &sync.RWMutex{}, + + orderIds: make(map[string]map[string]bool), + clientOids: make(map[string]map[string]bool), + } +} + +//todo 以后增加 对 已经交易和完成订单的跟踪,方便本地维护订单的状态 +func (w *Watcher) Run() { + log.Warn("start running Watcher") + + for msg := range w.Messages { + if !w.existEventOrderIds() { + continue + } + + l3Data, err := level3stream.NewStreamDataModel(msg) + if err != nil { + panic(err) + } + + switch l3Data.Type { + case level3stream.MessageReceivedType: + data := &level3stream.StreamDataReceivedModel{} + if err := json.Unmarshal(l3Data.GetRawMessage(), data); err != nil { + panic(err) + } + + w.lock.RLock() + channelsMap, ok := w.clientOids[data.ClientOid] + channels := getMapKeys(channelsMap) + w.lock.RUnlock() + if ok { + w.removeEventClientOid(data.ClientOid) + w.AddEventOrderIdsToChannels(map[string][]string{ + data.OrderId: channels, + }) + } + + w.publish(data.OrderId, string(l3Data.GetRawMessage())) + + case level3stream.MessageOpenType: + data := &level3stream.StreamDataOpenModel{} + if err := json.Unmarshal(l3Data.GetRawMessage(), data); err != nil { + panic(err) + } + w.publish(data.OrderId, string(l3Data.GetRawMessage())) + + case level3stream.MessageMatchType: + data := &level3stream.StreamDataMatchModel{} + if err := json.Unmarshal(l3Data.GetRawMessage(), data); err != nil { + panic(err) + } + + w.publish(data.MakerOrderId, string(l3Data.GetRawMessage())) + w.publish(data.TakerOrderId, string(l3Data.GetRawMessage())) + + case level3stream.MessageDoneType: + data := &level3stream.StreamDataDoneModel{} + if err := json.Unmarshal(l3Data.GetRawMessage(), data); err != nil { + panic(err) + } + + w.publish(data.OrderId, string(l3Data.GetRawMessage())) + w.removeEventOrderId(data.OrderId) + + case level3stream.MessageChangeType: + data := &level3stream.StreamDataChangeModel{} + if err := json.Unmarshal(l3Data.GetRawMessage(), data); err != nil { + panic(err) + } + + w.publish(data.OrderId, string(l3Data.GetRawMessage())) + + default: + panic("error msg type: " + l3Data.Type) + } + } +} + +func getMapKeys(data map[string]bool) []string { + keys := make([]string, 0, len(data)) + for k := range data { + keys = append(keys, k) + } + + return keys +} + +func (w *Watcher) publish(orderId string, message string) { + w.lock.RLock() + channelsMap, ok := w.orderIds[orderId] + channels := getMapKeys(channelsMap) + w.lock.RUnlock() + + if ok { + for _, channel := range channels { + if err := w.redisPool.Publish(channel, message); err != nil { + log.Error("redis publish to %s, msg: %s, error: %s", channel, message, err.Error()) + return + } + } + } +} + +func (w *Watcher) existEventOrderIds() bool { + w.lock.RLock() + defer w.lock.RUnlock() + if len(w.orderIds) == 0 && len(w.clientOids) == 0 { + return false + } + + return true +} + +func (w *Watcher) AddEventOrderIdsToChannels(data map[string][]string) { + w.lock.Lock() + defer w.lock.Unlock() + + for orderId, channels := range data { + for _, channel := range channels { + if w.orderIds[orderId] == nil { + w.orderIds[orderId] = make(map[string]bool) + } + w.orderIds[orderId][channel] = true + } + } +} + +func (w *Watcher) AddEventClientOidsToChannels(data map[string][]string) { + w.lock.Lock() + for clientOid, channels := range data { + for _, channel := range channels { + if w.clientOids[clientOid] == nil { + w.clientOids[clientOid] = make(map[string]bool) + } + w.clientOids[clientOid][channel] = true + } + } + w.lock.Unlock() +} + +func (w *Watcher) removeEventOrderId(orderId string) { + w.lock.Lock() + defer w.lock.Unlock() + + delete(w.orderIds, orderId) +} + +func (w *Watcher) removeEventClientOid(clientOid string) { + w.lock.Lock() + defer w.lock.Unlock() + + delete(w.clientOids, clientOid) +} diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..9cb62a6 --- /dev/null +++ b/go.mod @@ -0,0 +1,11 @@ +module github.com/Kucoin/kucoin-level3-sdk + +go 1.13 + +require ( + github.com/JetBlink/orderbook v1.0.0 + github.com/Kucoin/kucoin-go-sdk v1.1.10 + github.com/go-redis/redis v6.15.6+incompatible + github.com/joho/godotenv v1.3.0 + github.com/shopspring/decimal v0.0.0-20191009025716-f1972eb1d1f5 +) diff --git a/helper/helper.go b/helper/helper.go new file mode 100644 index 0000000..e8c5c52 --- /dev/null +++ b/helper/helper.go @@ -0,0 +1,16 @@ +package helper + +import ( + "strconv" +) + +const MaxMsgChanLen = 1024 + +func ParseUint64OrPanic(item string) uint64 { + value, err := strconv.ParseUint(item, 10, 64) + if err != nil { + panic(err) + } + + return value +} diff --git a/helper/str/string.go b/helper/str/string.go new file mode 100644 index 0000000..e887684 --- /dev/null +++ b/helper/str/string.go @@ -0,0 +1,28 @@ +package str + +import ( + "errors" + + "github.com/shopspring/decimal" +) + +func Diff(a string, b string) error { + if a == b { + return nil + } + + aF, err := decimal.NewFromString(a) + if err != nil { + return err + } + bF, err := decimal.NewFromString(b) + if err != nil { + return err + } + + if !aF.Equal(bF) { + return errors.New("not equal: " + a + " != " + b) + } + + return nil +} diff --git a/kucoin_market.go b/kucoin_market.go new file mode 100644 index 0000000..1a0ba5b --- /dev/null +++ b/kucoin_market.go @@ -0,0 +1,28 @@ +package main + +import ( + "flag" + "github.com/Kucoin/kucoin-level3-sdk/app" + + "github.com/Kucoin/kucoin-level3-sdk/utils/log" + "github.com/joho/godotenv" +) + +func main() { + envFile := flag.String("c", ".env", ".env file") + symbol := flag.String("symbol", "", "SYMBOL") + rpcPort := flag.String("p", "", "rpc port") + rpcKey := flag.String("rpckey", "", "market maker redis rpckey") + + flag.Parse() + + loadEnv(*envFile) + app.NewApp(*symbol, *rpcPort, *rpcKey).Run() +} + +func loadEnv(file string) { + err := godotenv.Load(file) + if err != nil { + log.Error("Error loading .env file: %s", file) + } +} diff --git a/level3stream/stream_data_model.go b/level3stream/stream_data_model.go new file mode 100644 index 0000000..68b4899 --- /dev/null +++ b/level3stream/stream_data_model.go @@ -0,0 +1,93 @@ +package level3stream + +import ( + "encoding/json" +) + +type StreamDataModel struct { + Sequence string `json:"sequence"` + Symbol string `json:"symbol"` + Type string `json:"type"` + Side string `json:"side"` + rawMessage json.RawMessage +} + +func NewStreamDataModel(msgData json.RawMessage) (*StreamDataModel, error) { + l3Data := &StreamDataModel{} + + if err := json.Unmarshal(msgData, l3Data); err != nil { + return nil, err + } + l3Data.rawMessage = msgData + + return l3Data, nil +} + +func (l3Data *StreamDataModel) GetRawMessage() json.RawMessage { + return l3Data.rawMessage +} + +const ( + BuySide = "buy" + SellSide = "sell" + + LimitOrderType = "limit" + MarketOrderType = "market" + + MessageDoneCanceled = "canceled" + MessageDoneFilled = "filled" + + MessageReceivedType = "received" + MessageOpenType = "open" + MessageDoneType = "done" + MessageMatchType = "match" + MessageChangeType = "change" +) + +type StreamDataReceivedModel struct { + OrderType string `json:"orderType"` + Side string `json:"side"` + //Size string `json:"size"` + Price string `json:"price"` + //Funds string `json:"funds"` + OrderId string `json:"orderId"` + Time string `json:"time"` + ClientOid string `json:"clientOid"` +} + +type StreamDataOpenModel struct { + Side string `json:"side"` + Size string `json:"size"` + OrderId string `json:"orderId"` + Price string `json:"price"` + Time string `json:"time"` + //RemainSize string `json:"remainSize"` +} + +type StreamDataDoneModel struct { + Side string `json:"side"` + Size string `json:"size"` + Reason string `json:"reason"` + OrderId string `json:"orderId"` + Price string `json:"price"` + Time string `json:"time"` +} + +type StreamDataMatchModel struct { + Side string `json:"side"` + Size string `json:"size"` + Price string `json:"price"` + TakerOrderId string `json:"takerOrderId"` + MakerOrderId string `json:"makerOrderId"` + Time string `json:"time"` + TradeId string `json:"tradeId"` +} + +type StreamDataChangeModel struct { + Side string `json:"side"` + NewSize string `json:"newSize"` + OldSize string `json:"oldSize"` + Price string `json:"price"` + OrderId string `json:"orderId"` + Time string `json:"time"` +} diff --git a/service/redis.go b/service/redis.go new file mode 100644 index 0000000..787bbbc --- /dev/null +++ b/service/redis.go @@ -0,0 +1,40 @@ +package service + +import ( + "github.com/Kucoin/kucoin-level3-sdk/utils/log" + "github.com/go-redis/redis" +) + +type Redis struct { + redisPool *redis.Client +} + +const RedisKeyPrefix = "kucoinMarket:rpcKey:" + +func NewRedis(addr, password string, db int, rpcKey string, symbol string, rpcPort string) *Redis { + log.Warn("connect to redis: " + addr) + + redisPool := redis.NewClient(&redis.Options{ + Addr: addr, + Password: password, // no password set + DB: db, // use default DB + //DialTimeout: 10 * time.Second, + //ReadTimeout: 30 * time.Second, + //WriteTimeout: 30 * time.Second, + //PoolSize: 10, + //PoolTimeout: 30 * time.Second, + }) + + //redisKey like: kucoinMarket:rpcKey:KCS-USDT:rpcKey + if err := redisPool.Set(RedisKeyPrefix+symbol+":"+rpcKey, rpcPort, 0).Err(); err != nil { + panic("connect to redis failed: " + err.Error()) + } + + return &Redis{ + redisPool: redisPool, + } +} + +func (r *Redis) Publish(channel string, message interface{}) error { + return r.redisPool.Publish(channel, message).Err() +} diff --git a/utils/log/log.go b/utils/log/log.go new file mode 100644 index 0000000..3fbb7f6 --- /dev/null +++ b/utils/log/log.go @@ -0,0 +1,24 @@ +package log + +import ( + "log" + "os" +) + +var logger = log.New(os.Stdout, "", log.LstdFlags) + +func Info(format string, v ...interface{}) { + logger.Printf("[Info] "+format+"\n", v...) +} + +func Warn(format string, v ...interface{}) { + logger.Printf("\033[33m[Warn] "+format+"\033[0m\n", v...) +} + +func Error(format string, v ...interface{}) { + logger.Printf("\033[31m[Error] "+format+"\033[0m\n", v...) +} + +func Fatal(format string, v ...interface{}) { + logger.Fatalf("\033[31m[Fatal] "+format+"\033[0m\n", v...) +} diff --git a/utils/log/log_test.go b/utils/log/log_test.go new file mode 100644 index 0000000..84d1806 --- /dev/null +++ b/utils/log/log_test.go @@ -0,0 +1,19 @@ +package log + +import "testing" + +func TestInfo(t *testing.T) { + Info("this is a info msg: %s", "hello world.") +} + +func TestWarn(t *testing.T) { + Warn("this is a warn msg: %s", "hello world.") +} + +func TestError(t *testing.T) { + Error("this is a error msg: %s", "hello world.") +} + +func TestFatal(t *testing.T) { + //Fatal("this is a error msg: %s", "hello world.") +} diff --git a/utils/recovery/recovery.go b/utils/recovery/recovery.go new file mode 100644 index 0000000..9eca5bb --- /dev/null +++ b/utils/recovery/recovery.go @@ -0,0 +1,91 @@ +package recovery + +import ( + "bytes" + "fmt" + "io/ioutil" + "runtime" + "time" +) + +func Recover(handler func(stack string)) func() { + return func() { + if r := recover(); r != nil { + stack := Stack(3) + reset := string([]byte{27, 91, 48, 109}) + handler(fmt.Sprintf("[%s] panic recovered: %s\n%s%s", TimeFormat(time.Now()), r, stack, reset)) + } + } +} + +var ( + dunno = []byte("???") + centerDot = []byte("·") + dot = []byte(".") + slash = []byte("/") +) + +// stack returns a nicely formatted stack frame, skipping skip frames. +func Stack(skip int) string { + buf := new(bytes.Buffer) // the returned data + // As we loop, we open files and read them. These variables record the currently + // loaded file. + var lines [][]byte + var lastFile string + for i := skip; ; i++ { // Skip the expected number of frames + pc, file, line, ok := runtime.Caller(i) + if !ok { + break + } + // Print this much at least. If we can't find the source, it won't show. + fmt.Fprintf(buf, "%s:%d (0x%x)\n", file, line, pc) + if file != lastFile { + data, err := ioutil.ReadFile(file) + if err != nil { + continue + } + lines = bytes.Split(data, []byte{'\n'}) + lastFile = file + } + fmt.Fprintf(buf, "\t%s: %s\n", function(pc), source(lines, line)) + } + return string(buf.Bytes()) +} + +// source returns a space-trimmed slice of the n'th line. +func source(lines [][]byte, n int) []byte { + n-- // in stack trace, lines are 1-indexed but our array is 0-indexed + if n < 0 || n >= len(lines) { + return dunno + } + return bytes.TrimSpace(lines[n]) +} + +// function returns, if possible, the name of the function containing the PC. +func function(pc uintptr) []byte { + fn := runtime.FuncForPC(pc) + if fn == nil { + return dunno + } + name := []byte(fn.Name()) + // The name includes the path name to the package, which is unnecessary + // since the file name is already included. Plus, it has center dots. + // That is, we see + // runtime/debug.*T·ptrmethod + // and want + // *T.ptrmethod + // Also the package path might contains dot (e.g. code.google.com/...), + // so first eliminate the path prefix + if lastslash := bytes.LastIndex(name, slash); lastslash >= 0 { + name = name[lastslash+1:] + } + if period := bytes.Index(name, dot); period >= 0 { + name = name[period+1:] + } + name = bytes.Replace(name, centerDot, dot, -1) + return name +} + +func TimeFormat(t time.Time) string { + return t.Format("2006/01/02 15:04:05") +}