Skip to content

Commit

Permalink
update mutex lock (#41)
Browse files Browse the repository at this point in the history
  • Loading branch information
ngocthanh1389 authored Apr 19, 2024
1 parent b4a13e9 commit 2c2430a
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 28 deletions.
66 changes: 39 additions & 27 deletions internal/server/tradelogs/broadcaster.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,35 +35,50 @@ func NewBroadcaster(tradeChan chan storage.TradeLog) *Broadcaster {

func (b *Broadcaster) BroadcastLog() {
for log := range b.tradeLogChan {
b.mu.Lock()
cons := b.clients[combine(log.EventHash, log.Maker)]
for _, c := range cons {
if err := c.ws.WriteJSON(log); err != nil {
b.l.Errorw("error when send msg", "err", err)
}
}
b.mu.Unlock()
b.writeEvent(log)
}
}

func (b *Broadcaster) addConn(event, maker string, conn *websocket.Conn) {
func (b *Broadcaster) newConn(event, maker string, conn *websocket.Conn) {
id := xid.New().String()
b.l.Infow("connected socket", "id", id)
go func() {
msgType, msg, err := conn.ReadMessage()
b.l.Infow("read msg result", "id", id, "msgType", msgType, "msg", msg, "err", err)
if err != nil {
b.mu.Lock()
e, ok := b.clients[fmt.Sprintf("%s-%s", event, maker)]
if !ok {
return
}
conn.Close()
delete(e, id)
b.mu.Unlock()
b.removeConn(conn, id, event, maker)
}
}()
b.addConn(conn, id, event, maker)
}

func combine(event, maker string) string {
return fmt.Sprintf("%s-%s", event, maker)
}

func (b *Broadcaster) Test() {
for range time.NewTicker(time.Second * 5).C {
b.tradeLogChan <- storage.TradeLog{
EventHash: "0xac75f773e3a92f1a02b12134d65e1f47f8a14eabe4eaf1e24624918e6a8b269f",
Maker: "0x807cF9A772d5a3f9CeFBc1192e939D62f0D9bD38",
}
}
}

func (b *Broadcaster) removeConn(conn *websocket.Conn, id, event, maker string) {
b.mu.Lock()
defer b.mu.Unlock()
e, ok := b.clients[fmt.Sprintf("%s-%s", event, maker)]
if !ok {
return
}
conn.Close()
delete(e, id)
}

func (b *Broadcaster) addConn(conn *websocket.Conn, id, event, maker string) {
b.mu.Lock()
defer b.mu.Unlock()
cons, ok := b.clients[combine(event, maker)]
if !ok {
cons = map[string]Con{}
Expand All @@ -75,18 +90,15 @@ func (b *Broadcaster) addConn(event, maker string, conn *websocket.Conn) {
eventHash: event,
}
b.clients[combine(event, maker)] = cons
b.mu.Unlock()
}

func combine(event, maker string) string {
return fmt.Sprintf("%s-%s", event, maker)
}

func (b *Broadcaster) Test() {
for range time.NewTicker(time.Second * 5).C {
b.tradeLogChan <- storage.TradeLog{
EventHash: "0xac75f773e3a92f1a02b12134d65e1f47f8a14eabe4eaf1e24624918e6a8b269f",
Maker: "0x807cF9A772d5a3f9CeFBc1192e939D62f0D9bD38",
func (b *Broadcaster) writeEvent(log storage.TradeLog) {
b.mu.Lock()
defer b.mu.Unlock()
cons := b.clients[combine(log.EventHash, log.Maker)]
for _, c := range cons {
if err := c.ws.WriteJSON(log); err != nil {
b.l.Errorw("error when send msg", "err", err)
}
}
}
2 changes: 1 addition & 1 deletion internal/server/tradelogs/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,5 +120,5 @@ func (s *Server) registerEventLogWS(c *gin.Context) {
responseErr(c, http.StatusInternalServerError, fmt.Errorf("can't create ws"))
return
}
s.bc.addConn(param.EventHash, param.Maker, conn)
s.bc.newConn(param.EventHash, param.Maker, conn)
}

0 comments on commit 2c2430a

Please sign in to comment.