Skip to content

Commit

Permalink
Merge pull request #39 from KyberNetwork/update_ws
Browse files Browse the repository at this point in the history
update disconnect flow
  • Loading branch information
ngocthanh1389 authored Apr 15, 2024
2 parents a80ec8c + dc69270 commit 9857c9a
Show file tree
Hide file tree
Showing 5 changed files with 57 additions and 20 deletions.
1 change: 0 additions & 1 deletion cmd/tradelogs/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,6 @@ func run(c *cli.Context) error {

bc := tradelogs.NewBroadcaster(tradeLogChan)
go bc.BroadcastLog()
go bc.CheckDisconnect()
httpTradelogs := tradelogs.New(l, s, c.String(libapp.HTTPServerFlag.Name), bc)
go func() {
if err := httpTradelogs.Run(); err != nil {
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ go 1.21.1
require (
cloud.google.com/go/bigquery v1.56.0
github.com/KyberNetwork/cclog v1.1.0
github.com/KyberNetwork/tradinglib v0.4.19
github.com/TheZeroSlave/zapsentry v1.20.2
github.com/ethereum/go-ethereum v1.13.14
github.com/getsentry/sentry-go v0.26.0
Expand Down Expand Up @@ -34,6 +33,7 @@ require (
github.com/apache/arrow/go/v12 v12.0.0 // indirect
github.com/apache/thrift v0.16.0 // indirect
github.com/bits-and-blooms/bitset v1.13.0 // indirect
github.com/btcsuite/btcd/chaincfg/chainhash v1.0.2 // indirect
github.com/bytedance/sonic v1.10.2 // indirect
github.com/cespare/cp v1.1.1 // indirect
github.com/cespare/xxhash/v2 v2.2.0 // indirect
Expand Down
4 changes: 0 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,6 @@ github.com/KyberNetwork/cclog v1.1.0 h1:3gqKpSayABuTjS4J7H8qcQHrlfyggpjmHUnCYQT0
github.com/KyberNetwork/cclog v1.1.0/go.mod h1:vf9+yocFGEyqqObn4Gr9rtj4ffnuKVpKWiSCtfsxIHg=
github.com/KyberNetwork/evmlistener v0.4.7 h1:SlJwzqngj2N2ot/1M391GbwE/A6wyKfV9z1RIlxZpyI=
github.com/KyberNetwork/evmlistener v0.4.7/go.mod h1:7ylrHTrF9bRJRcVdw02f4P2k3ohc1/gKGQ3HgaYtfE8=
github.com/KyberNetwork/tradinglib v0.4.19 h1:pVyRacHZY9xMhEiJHjVPKVhscdSQjoG2pIR291FnKSo=
github.com/KyberNetwork/tradinglib v0.4.19/go.mod h1:HwQjz6Iv2Nn/cQYnToYCZytjhzxLHs60JMXLeiZKcvc=
github.com/Masterminds/squirrel v1.5.4 h1:uUcX/aBc8O7Fg9kaISIUsHXdKuqehiXAMQTYX8afzqM=
github.com/Masterminds/squirrel v1.5.4/go.mod h1:NNaOrjSoIDfDA40n7sr2tPNZRfjzjA400rg+riTZj10=
github.com/Microsoft/go-winio v0.6.1 h1:9/kr64B9VUZrLm5YYwbGtUJnMgqWVOdUAXu6Migciow=
Expand Down Expand Up @@ -463,8 +461,6 @@ github.com/shopspring/decimal v1.3.1/go.mod h1:DKyhrW/HYNuLGql+MJL6WCR6knT2jwCFR
github.com/shurcooL/sanitized_anchor_name v1.0.0/go.mod h1:1NzhyTcUVG4SuEtjjoZeVRXNmyL/1OwPU0+IJeTBvfc=
github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d/go.mod h1:OnSkiWE9lh6wB0YB77sQom3nweQdgAjqCqsofrRNTgc=
github.com/smartystreets/goconvey v1.6.4/go.mod h1:syvi0/a8iFYH4r/RixwvyeAJjdLS9QV7WQ/tjFTllLA=
github.com/sourcegraph/conc v0.3.0 h1:OQTbbt6P72L20UqAkXXuLOj79LfEanQ+YQFNpLA9ySo=
github.com/sourcegraph/conc v0.3.0/go.mod h1:Sdozi7LEKbFPqYX2/J+iBAM6HpqSLTASQIKqDmF7Mt0=
github.com/spf13/afero v1.1.2/go.mod h1:j4pytiNVoe2o6bmDsKpLACNPDBIoEAkihy7loJ1B0CQ=
github.com/spf13/cast v1.3.0/go.mod h1:Qx5cxh0v+4UWYiBimWS+eyWzqEqokIECu5etghLkUJE=
github.com/spf13/cobra v0.0.5/go.mod h1:3K3wKZymM7VvHMDS9+Akkh4K60UwM26emMESw8tLCHU=
Expand Down
32 changes: 18 additions & 14 deletions internal/server/tradelogs/broadcaster.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,19 @@ func (b *Broadcaster) BroadcastLog() {
func (b *Broadcaster) addConn(event, maker string, conn *websocket.Conn) {
id := xid.New().String()
b.l.Infow("connected socket", "id", id)
go func() {
msgType, msg, err := conn.ReadMessage()
b.l.Infow("read msg result", "id", id, "msgType", msgType, "msg", msg, "err", err)
if err != nil {
b.mu.Lock()
e, ok := b.clients[fmt.Sprintf("%s-%s", event, maker)]
if !ok {
return
}
delete(e, id)
b.mu.Unlock()
}
}()
b.mu.Lock()
cons, ok := b.clients[combine(event, maker)]
if !ok {
Expand All @@ -68,20 +81,11 @@ func combine(event, maker string) string {
return fmt.Sprintf("%s-%s", event, maker)
}

func (b *Broadcaster) CheckDisconnect() {
for {
b.mu.Lock()
for _, cons := range b.clients {
for id, c := range cons {
if _, _, err := c.ws.ReadMessage(); err != nil {
if websocket.IsCloseError(err, websocket.CloseNormalClosure, websocket.CloseGoingAway, websocket.CloseNoStatusReceived) {
b.l.Infow("socket is closed", "id", id)
delete(cons, id)
}
}
}
func (b *Broadcaster) Test() {
for range time.NewTicker(time.Second * 5).C {
b.tradeLogChan <- storage.TradeLog{
EventHash: "0xac75f773e3a92f1a02b12134d65e1f47f8a14eabe4eaf1e24624918e6a8b269f",
Maker: "0x807cF9A772d5a3f9CeFBc1192e939D62f0D9bD38",
}
b.mu.Unlock()
time.Sleep(time.Minute)
}
}
38 changes: 38 additions & 0 deletions internal/server/tradelogs/client_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package server

import (
"encoding/json"
"log"
"testing"
"time"

"github.com/KyberNetwork/tradelogs/pkg/storage"
"github.com/gorilla/websocket"
"github.com/stretchr/testify/require"
)

const (
url1 = "wss://tradelogs.kyberengineering.io/eventlogws?maker=0x807cF9A772d5a3f9CeFBc1192e939D62f0D9bD38&event_hash=0xac75f773e3a92f1a02b12134d65e1f47f8a14eabe4eaf1e24624918e6a8b269f"
url2 = "ws://localhost:8080/eventlogws?maker=0x807cF9A772d5a3f9CeFBc1192e939D62f0D9bD38&event_hash=0xac75f773e3a92f1a02b12134d65e1f47f8a14eabe4eaf1e24624918e6a8b269f"
)

func TestWSClient(t *testing.T) {
t.Skip()
for {
conn, _, err := websocket.DefaultDialer.Dial(url2, nil)
if err != nil {
log.Fatal("dial:", err)
}
for range time.NewTicker(time.Second).C {
if _, data, err := conn.ReadMessage(); err == nil {
var l storage.TradeLog
err := json.Unmarshal(data, &l)
require.NoError(t, err)
log.Println("receive:", l)
} else {
conn.Close()
break
}
}
}
}

0 comments on commit 9857c9a

Please sign in to comment.