Skip to content

Commit

Permalink
websocket ping pong
Browse files Browse the repository at this point in the history
  • Loading branch information
adnaan committed Jun 10, 2023
1 parent 611c4c6 commit 9b748ca
Showing 1 changed file with 85 additions and 30 deletions.
115 changes: 85 additions & 30 deletions websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,25 +9,47 @@ import (
"net/http"
"strings"
"sync"
"time"

"github.com/gorilla/websocket"
"github.com/livefir/fir/internal/dom"
"github.com/livefir/fir/pubsub"
"k8s.io/klog/v2"
)

const (
// Time allowed to write a message to the peer.
writeWait = 10 * time.Second

// Time allowed to read the next pong message from the peer.
pongWait = 55 * time.Second

// Send pings to peer with this period. Must be less than pongWait.
pingPeriod = (pongWait * 9) / 10

// Maximum message size allowed from peer.
maxMessageSize = 1024
)

func onWebsocket(w http.ResponseWriter, r *http.Request, cntrl *controller) {
conn, err := cntrl.websocketUpgrader.Upgrade(w, r, nil)
if err != nil {
return
}
conn.SetReadLimit(1024)
conn.SetReadLimit(maxMessageSize)
conn.EnableWriteCompression(true)
conn.SetCompressionLevel(5)
conn.SetReadDeadline(time.Now().Add(pongWait))
conn.SetPongHandler(func(string) error {
klog.Errorf("[onWebsocket] pong from %v\n", conn.RemoteAddr())
conn.SetReadDeadline(time.Now().Add(pongWait))
return nil
})
defer conn.Close()
wsConn := &websocketConn{conn: conn}
ctx := context.Background()
done := make(chan struct{})
send := make(chan []byte)
go writePump(conn, send)
wg := &sync.WaitGroup{}
wg.Add(len(cntrl.routes))

Expand Down Expand Up @@ -56,7 +78,7 @@ func onWebsocket(w http.ResponseWriter, r *http.Request, cntrl *controller) {
response: w,
route: route,
}
go renderAndWriteEvent(wsConn, *routeChannel, routeCtx, pubsubEvent)
go renderAndWriteEvent(send, *routeChannel, routeCtx, pubsubEvent)
}
}()

Expand Down Expand Up @@ -92,7 +114,7 @@ func onWebsocket(w http.ResponseWriter, r *http.Request, cntrl *controller) {

go func() {
for pubsubEvent := range reloadSubscriber.C() {
go writeEvent(wsConn, pubsubEvent)
go writeEvent(send, pubsubEvent)
}
}()
}
Expand All @@ -104,7 +126,9 @@ loop:
for {
_, message, err := conn.ReadMessage()
if err != nil {
log.Println("[onWebsocket] c.readMessage error: ", err)
if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway) {
log.Printf("[onWebsocket] error: %v", err)
}
break loop
}

Expand Down Expand Up @@ -150,17 +174,11 @@ loop:
go handleOnEventResult(onEventFunc(eventCtx), eventCtx, publishEvents(ctx, eventCtx))
}
close(done)
close(send)
wg.Wait()
}

type websocketConn struct {
conn *websocket.Conn
sync.Mutex
}

func renderAndWriteEvent(ws *websocketConn, channel string, ctx RouteContext, pubsubEvent pubsub.Event) error {
ws.Lock()
defer ws.Unlock()
func renderAndWriteEvent(send chan []byte, channel string, ctx RouteContext, pubsubEvent pubsub.Event) error {
events := renderDOMEvents(ctx, pubsubEvent)
eventsData, err := json.Marshal(events)
if err != nil {
Expand All @@ -172,30 +190,67 @@ func renderAndWriteEvent(ws *websocketConn, channel string, ctx RouteContext, pu
log.Println(err)
return err
}
klog.Errorf("[writeDOMevents] sending patch op to client:%v, %+v\n", ws.conn.RemoteAddr().String(), string(eventsData))
err = ws.conn.WriteMessage(websocket.TextMessage, eventsData)
if err != nil {
klog.Errorf("[writeDOMevents] error: writing message for channel:%v, closing conn with err %v", channel, err)
ws.conn.Close()
}
send <- eventsData
return err
}

func writeEvent(ws *websocketConn, pubsubEvent pubsub.Event) error {
ws.Lock()
defer ws.Unlock()
reload := dom.Event{
func writeEvent(send chan []byte, pubsubEvent pubsub.Event) error {
domEvent := dom.Event{
Type: pubsubEvent.ID,
}
reloadData, err := json.Marshal([]dom.Event{reload})
eventsData, err := json.Marshal([]dom.Event{domEvent})
if err != nil {
klog.Errorf("[writeReloadEvent] error: marshaling reload event %+v, err %v", reload, err)
klog.Errorf("[writeReloadEvent] error: marshaling dom event %+v, err %v", domEvent, err)
return err
}
err = ws.conn.WriteMessage(websocket.TextMessage, reloadData)
if err != nil {
klog.Errorf("[writeReloadEvent] error: writing message for channel:%v, closing conn with err %v", devReloadChannel, err)
ws.conn.Close()
}
send <- eventsData
return err
}

func writeConn(conn *websocket.Conn, mt int, payload []byte) error {
conn.SetWriteDeadline(time.Now().Add(writeWait))
return conn.WriteMessage(mt, payload)
}

func writePump(conn *websocket.Conn, send chan []byte) {
ticker := time.NewTicker(pingPeriod)
defer func() {
ticker.Stop()
conn.Close()
}()
for {
select {
case message, ok := <-send:
if !ok {
// The hub closed the channel.
writeConn(conn, websocket.CloseMessage, []byte{})
return
}

conn.SetWriteDeadline(time.Now().Add(writeWait))
w, err := conn.NextWriter(websocket.TextMessage)
if err != nil {
return
}
go func() {
klog.Errorf("[writeDOMevents] sending patch op to client:%v, %+v\n", conn.RemoteAddr(), string(message))
}()
w.Write(message)

// Add queued chat messages to the current websocket message.
n := len(send)
for i := 0; i < n; i++ {
w.Write(<-send)
}

if err := w.Close(); err != nil {
return
}
case <-ticker.C:
klog.Errorf("ping to client: %v\n", conn.RemoteAddr())
if err := writeConn(conn, websocket.PingMessage, []byte{}); err != nil {
return
}
}
}
}

0 comments on commit 9b748ca

Please sign in to comment.