Skip to content
This repository has been archived by the owner on Aug 12, 2024. It is now read-only.

Commit

Permalink
Merge pull request #2 from sh7ning-mirror/master
Browse files Browse the repository at this point in the history
Optimize playback
  • Loading branch information
Ive20 authored Mar 10, 2020
2 parents de3419f + 3f0ca52 commit d0f1b0d
Showing 1 changed file with 16 additions and 15 deletions.
31 changes: 16 additions & 15 deletions builder/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,11 @@ import (
"encoding/json"
"errors"
"fmt"
"github.com/Kucoin/kucoin-go-sdk"
"sync"

"github.com/JetBlink/orderbook/base"
"github.com/JetBlink/orderbook/level3"
"github.com/Kucoin/kucoin-go-sdk"
"github.com/Kucoin/kucoin-level3-sdk/helper"
"github.com/Kucoin/kucoin-level3-sdk/level3stream"
"github.com/Kucoin/kucoin-level3-sdk/utils/log"
Expand All @@ -34,7 +34,7 @@ func NewBuilder(apiService *kucoin.ApiService, symbol string) *Builder {
apiService: apiService,
symbol: symbol,
lock: &sync.RWMutex{},
Messages: make(chan json.RawMessage, helper.MaxMsgChanLen),
Messages: make(chan json.RawMessage, helper.MaxMsgChanLen*1024),
}
}

Expand All @@ -45,12 +45,12 @@ func (b *Builder) resetOrderBook() {
}

func (b *Builder) ReloadOrderBook() {
defer func() {
if r := recover(); r != nil {
log.Error("ReloadOrderBook panic : %v", r)
b.ReloadOrderBook()
}
}()
//defer func() {
// if r := recover(); r != nil {
// log.Error("ReloadOrderBook panic : %v", r)
// b.ReloadOrderBook()
// }
//}()

log.Warn("start running ReloadOrderBook, symbol: %s", b.symbol)
b.resetOrderBook()
Expand All @@ -69,7 +69,7 @@ func (b *Builder) ReloadOrderBook() {
func (b *Builder) playback() {
log.Warn("prepare playback...")

const tempMsgChanMaxLen = 200
const tempMsgChanMaxLen = 10240
tempMsgChan := make(chan *level3stream.StreamDataModel, tempMsgChanMaxLen)
firstSequence := ""
var fullOrderBook *DepthResponse
Expand All @@ -92,13 +92,18 @@ func (b *Builder) playback() {
log.Warn("start getting full level3 order book data, symbol: %s", b.symbol)
fullOrderBook, err = b.GetAtomicFullOrderBook()
if err != nil {
panic(err)
continue
}
log.Error("got full level3 order book data, Sequence: %d", fullOrderBook.Sequence)
log.Error("got full level3 order book data, Sequence: %s", fullOrderBook.Sequence)
}

if len(tempMsgChan) > tempMsgChanMaxLen-5 {
panic("playback failed, tempMsgChan is too long, retry...")
}

if fullOrderBook != nil && fullOrderBook.Sequence < firstSequence {
log.Error("full data Sequence %d is too small", fullOrderBook.Sequence)
log.Error("full data Sequence %s is too small", fullOrderBook.Sequence)
fullOrderBook = nil
continue
}
Expand All @@ -118,10 +123,6 @@ func (b *Builder) playback() {
log.Warn("finish playback.")
break
}

if len(tempMsgChan) > tempMsgChanMaxLen-5 {
panic("playback failed, tempMsgChan is too long, retry...")
}
}
}
}
Expand Down

0 comments on commit d0f1b0d

Please sign in to comment.