-
Notifications
You must be signed in to change notification settings - Fork 261
Feat/binance exchange ws get trade history/get latest trade cursor #719
base: master
Are you sure you want to change the base?
Feat/binance exchange ws get trade history/get latest trade cursor #719
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
added some comments. this one turned out to be a bit heavier than the first two.
plugins/binanceExchange_ws.go
Outdated
Name string `json:"e"` | ||
} | ||
|
||
// "E": 1499405658658, // Event time |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can you please put these comments next to the struct, will be much easier to read and more meaningful there.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
PS: Is this struct copied from somewhere where we can import it rather than defining it ourselves?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
PPS: if it's not defined elsewhere but it captures the return values of the binance web socket API then it's probably better suited to live in this new file /support/sdk/binance-ws.go
plugins/binanceExchange_ws.go
Outdated
}) | ||
|
||
return &stream{doneC: doneC, stopC: stopC, cleanup: func() { | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if there's nothing defined in the cleanup function then we can pass nil
?
(or was the intention to pass in some cleanup code here?)
plugins/binanceExchange_ws.go
Outdated
@@ -220,6 +394,19 @@ func subcribeBook(symbol string, state *mapEvents) (*stream, error) { | |||
|
|||
} | |||
|
|||
//ListenKey expires every 60 minutes | |||
func keepAliveStreamService(client *binance.Client, key string) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is this the function that keeps pinging binance so it maintains the connection?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yes
plugins/binanceExchange_ws.go
Outdated
log.Printf("Error keepAliveStreamService %v\n", err) | ||
} | ||
|
||
go keepAliveStreamService(client, key) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why run this in a new goroutine?
We should run this without kicking off a new goroutine and the code that calls keepAliveStreamService
for the first time should kick off the goroutine.
the way it works right now (if I'm reading correctly, not sure), then when you call makeBinanceWs
it will sleep the main thread for 50 minutes. If that's true then that may not be what we are looking for and the above suggestion may help improve the situation
plugins/binanceExchange_ws.go
Outdated
listenKey, err := binanceClient.NewStartUserStreamService().Do(context.Background()) | ||
|
||
if err != nil { | ||
return nil, err |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can you wrap the error like:
fmt.Errorf("some meaningful message like 'unable to start new user stream service': %s", err)
wrapping errors provide a little more context and a better stack trace.
err := json.Unmarshal(message, event) | ||
|
||
if err != nil { | ||
log.Printf("Error unmarshal %s to eventExecutionReport\n", string(message)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If this fails here then does that mean that subcribeUserStream
has failed?
If so then we should also cause subcribeUserStream
to return an error in this case.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the only way to capture errors when binance it's sending events it's to use another map(state) to save the error when happens and when any method from the interface it's called to check if for the stream "X" which it's using is any error
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If the stream errors in update 1, but passes in update 2, and the user fetched the value of update 2 only, we don’t care about the error in update 1.
With that in mind, what I’m thinking is that when we write a value to the existing map (the data structure), we also include an error. So if there is an error in the stream we should write an error value.
When the user fetched the value they will check the error first and if it is an error they will handle appropriately (bubble upstream IIRC).
That allows us to keep the date of the response and error encapsulated into one map. What do you think?
return | ||
} | ||
|
||
userStreamLock.Lock() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why do we need to lock here?
wondering in which scenario may this be executed twice, or maybe I've missed something
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this is to avoid any misusage of the callback and to not have invalid data, because I am modifying the slice in the callback and the possibility for that callback to be executed by two goroutines exists I've added the mutex
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ok will look at this a little more closely when I’m on a computer
data, isOk := history.data.(History) | ||
|
||
if !isOk { | ||
log.Printf("Error conversion %v\n", ErrConversionHistory) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should this also cause the stream to fail and throw an error somewhere?
I'm concerned that all these errors here will be hidden errors that will not cause the code to crash.
we want the bot to crash rather than throw silent errors
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
one option it's to panic when I get errors from the stream, the second it's the one that I've written above
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
See above comment. Once we agree on a pattern we can apply to all streams.
lastCursor = cursor | ||
} | ||
} else { | ||
log.Printf("Error converting cursor %v\n", ErrConversionCursor) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
maybe we should return an error here rather than just logging. we want the bot to crash if there is any error
plugins/binanceExchange_ws.go
Outdated
@@ -140,12 +220,14 @@ func makeMapEvents() *mapEvents { | |||
type events struct { | |||
SymbolStats *mapEvents | |||
BookStats *mapEvents | |||
OrderEvents *mapEvents |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why is this called OrderEvents
?
I'm confused because we are fetching the User Stream and getting Trade History, so was expecting this to be called either TradeHistoryEvents
or UserEvents
plugins/binanceExchange_ws.go
Outdated
|
||
if err != nil { | ||
log.Printf("Error keepAliveStreamService %v\n", err) | ||
panic(err) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should find a way to not panic here. Sorry didn’t see this earlier.
Panics are very bad in kelp for many reasons and I can go into detail if interested.
Guidelines for submitting code to Kelp
Before Submitting a PR
Problem Identification
Research Solution
Technical Design Discussion
Implementation
Pull Request
Handling of PRs
Typically, this is what is expected from the author of a Pull Request:
Draft PRs