diff --git a/cmd/tradelogs/main.go b/cmd/tradelogs/main.go index c260ffa..f74484f 100644 --- a/cmd/tradelogs/main.go +++ b/cmd/tradelogs/main.go @@ -129,7 +129,6 @@ func run(c *cli.Context) error { bc := tradelogs.NewBroadcaster(tradeLogChan) go bc.BroadcastLog() - go bc.CheckDisconnect() httpTradelogs := tradelogs.New(l, s, c.String(libapp.HTTPServerFlag.Name), bc) go func() { if err := httpTradelogs.Run(); err != nil { diff --git a/go.mod b/go.mod index 4eaf443..f8b3f51 100644 --- a/go.mod +++ b/go.mod @@ -5,7 +5,6 @@ go 1.21.1 require ( cloud.google.com/go/bigquery v1.56.0 github.com/KyberNetwork/cclog v1.1.0 - github.com/KyberNetwork/tradinglib v0.4.19 github.com/TheZeroSlave/zapsentry v1.20.2 github.com/ethereum/go-ethereum v1.13.14 github.com/getsentry/sentry-go v0.26.0 @@ -34,6 +33,7 @@ require ( github.com/apache/arrow/go/v12 v12.0.0 // indirect github.com/apache/thrift v0.16.0 // indirect github.com/bits-and-blooms/bitset v1.13.0 // indirect + github.com/btcsuite/btcd/chaincfg/chainhash v1.0.2 // indirect github.com/bytedance/sonic v1.10.2 // indirect github.com/cespare/cp v1.1.1 // indirect github.com/cespare/xxhash/v2 v2.2.0 // indirect diff --git a/go.sum b/go.sum index 080f217..38922d0 100644 --- a/go.sum +++ b/go.sum @@ -33,8 +33,6 @@ github.com/KyberNetwork/cclog v1.1.0 h1:3gqKpSayABuTjS4J7H8qcQHrlfyggpjmHUnCYQT0 github.com/KyberNetwork/cclog v1.1.0/go.mod h1:vf9+yocFGEyqqObn4Gr9rtj4ffnuKVpKWiSCtfsxIHg= github.com/KyberNetwork/evmlistener v0.4.7 h1:SlJwzqngj2N2ot/1M391GbwE/A6wyKfV9z1RIlxZpyI= github.com/KyberNetwork/evmlistener v0.4.7/go.mod h1:7ylrHTrF9bRJRcVdw02f4P2k3ohc1/gKGQ3HgaYtfE8= -github.com/KyberNetwork/tradinglib v0.4.19 h1:pVyRacHZY9xMhEiJHjVPKVhscdSQjoG2pIR291FnKSo= -github.com/KyberNetwork/tradinglib v0.4.19/go.mod h1:HwQjz6Iv2Nn/cQYnToYCZytjhzxLHs60JMXLeiZKcvc= github.com/Masterminds/squirrel v1.5.4 h1:uUcX/aBc8O7Fg9kaISIUsHXdKuqehiXAMQTYX8afzqM= github.com/Masterminds/squirrel v1.5.4/go.mod h1:NNaOrjSoIDfDA40n7sr2tPNZRfjzjA400rg+riTZj10= github.com/Microsoft/go-winio v0.6.1 h1:9/kr64B9VUZrLm5YYwbGtUJnMgqWVOdUAXu6Migciow= @@ -463,8 +461,6 @@ github.com/shopspring/decimal v1.3.1/go.mod h1:DKyhrW/HYNuLGql+MJL6WCR6knT2jwCFR github.com/shurcooL/sanitized_anchor_name v1.0.0/go.mod h1:1NzhyTcUVG4SuEtjjoZeVRXNmyL/1OwPU0+IJeTBvfc= github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d/go.mod h1:OnSkiWE9lh6wB0YB77sQom3nweQdgAjqCqsofrRNTgc= github.com/smartystreets/goconvey v1.6.4/go.mod h1:syvi0/a8iFYH4r/RixwvyeAJjdLS9QV7WQ/tjFTllLA= -github.com/sourcegraph/conc v0.3.0 h1:OQTbbt6P72L20UqAkXXuLOj79LfEanQ+YQFNpLA9ySo= -github.com/sourcegraph/conc v0.3.0/go.mod h1:Sdozi7LEKbFPqYX2/J+iBAM6HpqSLTASQIKqDmF7Mt0= github.com/spf13/afero v1.1.2/go.mod h1:j4pytiNVoe2o6bmDsKpLACNPDBIoEAkihy7loJ1B0CQ= github.com/spf13/cast v1.3.0/go.mod h1:Qx5cxh0v+4UWYiBimWS+eyWzqEqokIECu5etghLkUJE= github.com/spf13/cobra v0.0.5/go.mod h1:3K3wKZymM7VvHMDS9+Akkh4K60UwM26emMESw8tLCHU= diff --git a/internal/server/tradelogs/broadcaster.go b/internal/server/tradelogs/broadcaster.go index 30b9c1d..2eb47ab 100644 --- a/internal/server/tradelogs/broadcaster.go +++ b/internal/server/tradelogs/broadcaster.go @@ -49,6 +49,19 @@ func (b *Broadcaster) BroadcastLog() { func (b *Broadcaster) addConn(event, maker string, conn *websocket.Conn) { id := xid.New().String() b.l.Infow("connected socket", "id", id) + go func() { + msgType, msg, err := conn.ReadMessage() + b.l.Infow("read msg result", "id", id, "msgType", msgType, "msg", msg, "err", err) + if err != nil { + b.mu.Lock() + e, ok := b.clients[fmt.Sprintf("%s-%s", event, maker)] + if !ok { + return + } + delete(e, id) + b.mu.Unlock() + } + }() b.mu.Lock() cons, ok := b.clients[combine(event, maker)] if !ok { @@ -68,20 +81,11 @@ func combine(event, maker string) string { return fmt.Sprintf("%s-%s", event, maker) } -func (b *Broadcaster) CheckDisconnect() { - for { - b.mu.Lock() - for _, cons := range b.clients { - for id, c := range cons { - if _, _, err := c.ws.ReadMessage(); err != nil { - if websocket.IsCloseError(err, websocket.CloseNormalClosure, websocket.CloseGoingAway, websocket.CloseNoStatusReceived) { - b.l.Infow("socket is closed", "id", id) - delete(cons, id) - } - } - } +func (b *Broadcaster) Test() { + for range time.NewTicker(time.Second * 5).C { + b.tradeLogChan <- storage.TradeLog{ + EventHash: "0xac75f773e3a92f1a02b12134d65e1f47f8a14eabe4eaf1e24624918e6a8b269f", + Maker: "0x807cF9A772d5a3f9CeFBc1192e939D62f0D9bD38", } - b.mu.Unlock() - time.Sleep(time.Minute) } } diff --git a/internal/server/tradelogs/client_test.go b/internal/server/tradelogs/client_test.go new file mode 100644 index 0000000..9e40e0a --- /dev/null +++ b/internal/server/tradelogs/client_test.go @@ -0,0 +1,38 @@ +package server + +import ( + "encoding/json" + "log" + "testing" + "time" + + "github.com/KyberNetwork/tradelogs/pkg/storage" + "github.com/gorilla/websocket" + "github.com/stretchr/testify/require" +) + +const ( + url1 = "wss://tradelogs.kyberengineering.io/eventlogws?maker=0x807cF9A772d5a3f9CeFBc1192e939D62f0D9bD38&event_hash=0xac75f773e3a92f1a02b12134d65e1f47f8a14eabe4eaf1e24624918e6a8b269f" + url2 = "ws://localhost:8080/eventlogws?maker=0x807cF9A772d5a3f9CeFBc1192e939D62f0D9bD38&event_hash=0xac75f773e3a92f1a02b12134d65e1f47f8a14eabe4eaf1e24624918e6a8b269f" +) + +func TestWSClient(t *testing.T) { + t.Skip() + for { + conn, _, err := websocket.DefaultDialer.Dial(url2, nil) + if err != nil { + log.Fatal("dial:", err) + } + for range time.NewTicker(time.Second).C { + if _, data, err := conn.ReadMessage(); err == nil { + var l storage.TradeLog + err := json.Unmarshal(data, &l) + require.NoError(t, err) + log.Println("receive:", l) + } else { + conn.Close() + break + } + } + } +}