Skip to content

Commit

Permalink
update README, allowed custom handle ping event
Browse files Browse the repository at this point in the history
  • Loading branch information
PotatoCloud committed Aug 2, 2023
1 parent 5fef027 commit f234e53
Show file tree
Hide file tree
Showing 9 changed files with 154 additions and 33 deletions.
97 changes: 97 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,101 @@
go get github.com/RealFax/q-websocket@latest
```

## quick start

```go
// echo server

package main

import (
qWebsocket "github.com/RealFax/q-websocket"

"github.com/gobwas/ws/wsutil"
"github.com/panjf2000/gnet/v2"

"log"
)

func echo(req *qWebsocket.HandlerParams) {
wsutil.WriteServerText(req.Writer, req.Request)
}

func main() {
server := qWebsocket.NewServer(
"tcp://0.0.0.0:9090",
qWebsocket.WithHandler(echo),
)

if err := server.ListenAndServe(gnet.WithMulticore(true)); err != nil {
log.Fatal("server error:", err)
}
}
```

## using proto
_proto is a low-level handler built into q-websocket, providing simple router_

```go
// using proto

package main

import (
qWebsocket "github.com/RealFax/q-websocket"
"github.com/RealFax/q-websocket/proto"

"github.com/gobwas/ws"
"github.com/panjf2000/gnet/v2"

"log"
"strings"
)

// define proto struct

type Proto struct {
Type uint32 `json:"type"`
Message string `json:"message"`
}

func (p *Proto) Key() uint32 { return p.Type }
func (p *Proto) Value() Proto { return *p }
func (p *Proto) Self() *Proto { return p }

const (
Echo uint32 = iota
Pong
)

func main() {
engine := proto.New[Proto, uint32](func() proto.Proto[Proto, uint32] {
return new(Proto)
})

// echo handler
engine.Register(Echo, func(r *proto.Request[Proto]) {
r.WriteText([]byte(r.Request.Message))
})

// ping pong handler
engine.Register(Pong, func(r *proto.Request[Proto]) {
if strings.ToLower(r.Request.Message) == "ping" {
r.WriteText([]byte("pong"))
return
}
r.WriteClose(ws.StatusGoingAway, "")
})

server := qWebsocket.NewServer(
"tcp://127.0.0.1:9090",
qWebsocket.WithHandler(engine.UseHandler()),
)

if err := server.ListenAndServe(gnet.WithMulticore(true)); err != nil {
log.Fatal("server error:", err)
}
}
```

more usage see: [Example](https://github.com/RealFax/q-websocket/tree/master/example)
4 changes: 2 additions & 2 deletions example/simple-chat/chat.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,13 +141,13 @@ func main() {
server := qWebsocket.NewServer(
"tcp://127.0.0.1:8080",
qWebsocket.WithHandler(engine.UseHandler()),
qWebsocket.WithOnCloseHandler(func(conn *qWebsocket.GNetUpgraderConn, _ error) {
qWebsocket.WithOnCloseHandler(func(conn *qWebsocket.Conn, _ error) {
s.offline(conn.ID)
}),
)

go func() {
if err := server.ListenAndServer(gnet.WithMulticore(true)); err != nil {
if err := server.ListenAndServe(gnet.WithMulticore(true)); err != nil {
fmt.Println("[-] gnet error:", err)
}
}()
Expand Down
27 changes: 17 additions & 10 deletions handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,26 @@ package qWebsocket

import (
"github.com/gobwas/ws"
"github.com/gobwas/ws/wsutil"
"io"
)

type OnCloseHandlerFunc func(conn *GNetUpgraderConn, err error)
type (
OnCloseHandlerFunc func(conn *Conn, err error)
OnPingHandlerFunc func(conn *Conn)
HandlerFunc func(req *HandlerParams)

type HandlerParams struct {
OpCode ws.OpCode
Request []byte
Writer io.Writer
WsConn *GNetUpgraderConn
}
HandlerParams struct {
OpCode ws.OpCode
Request []byte
Writer io.Writer
WsConn *Conn
}
)

type HandlerFunc func(req *HandlerParams)
func EmptyOnCloseHandler(_ *Conn, _ error) {}
func EmptyHandler(_ *HandlerParams) {}

func EmptyOnCloseHandler(_ *GNetUpgraderConn, _ error) {}
func EmptyHandler(_ *HandlerParams) {}
func DefaultOnPingHandler(c *Conn) {
wsutil.WriteServerMessage(c, ws.OpPong, nil)
}
6 changes: 6 additions & 0 deletions options.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,12 @@ func WithOnCloseHandler(handler OnCloseHandlerFunc) OptionFunc {
}
}

func WithOnPingHandler(handler OnPingHandlerFunc) OptionFunc {
return func(s *Server) {
s.onPingHandler = handler
}
}

func WithHandler(handler HandlerFunc) OptionFunc {
return func(s *Server) {
s.handler = handler
Expand Down
9 changes: 4 additions & 5 deletions proto/proto_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"github.com/RealFax/q-websocket/proto"
"github.com/gobwas/ws"
"github.com/panjf2000/gnet/v2"
"log"
"testing"
)

Expand All @@ -24,20 +23,20 @@ var engine = proto.New[Proto, uint32](func() proto.Proto[Proto, uint32] {

func init() {
engine.Register(1, func(r *proto.Request[Proto]) {
log.Println(r.Request.Message)
r.WriteText([]byte(r.Request.Message))
})
engine.Register(2, func(r *proto.Request[Proto]) {
ws.WriteFrame(r.Writer, ws.NewTextFrame([]byte(r.Request.Message)))
r.WriteClose(ws.StatusGoingAway, "")
})
}

func TestEngine_Handler(t *testing.T) {
server := qWebsocket.NewServer(
"tcp://127.0.0.1:8080",
"tcp://127.0.0.1:10001",
qWebsocket.WithHandler(engine.UseHandler()),
)

if err := server.ListenAndServer(gnet.WithMulticore(true)); err != nil {
if err := server.ListenAndServe(gnet.WithMulticore(true)); err != nil {
t.Fatal(err)
}
}
10 changes: 6 additions & 4 deletions proto/request.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,14 @@ import (
"bytes"
qWebsocket "github.com/RealFax/q-websocket"
"github.com/gobwas/ws"
"github.com/gobwas/ws/wsutil"
"io"
)

type Request[T any] struct {
OpCode ws.OpCode
Writer io.Writer
Conn *qWebsocket.GNetUpgraderConn
Conn *qWebsocket.Conn
Request *T
RawRequest []byte
}
Expand All @@ -20,13 +21,14 @@ func (t Request[T]) Reader() io.Reader {
}

func (t Request[T]) WriteText(p []byte) error {
return ws.WriteFrame(t.Writer, ws.NewTextFrame(p))
return wsutil.WriteServerMessage(t.Writer, ws.OpText, p)
}

func (t Request[T]) WriteBinary(p []byte) error {
return ws.WriteFrame(t.Writer, ws.NewBinaryFrame(p))
return wsutil.WriteServerMessage(t.Writer, ws.OpBinary, p)
}

func (t Request[T]) WriteClose(statusCode ws.StatusCode, reason string) error {
return ws.WriteFrame(t.Writer, ws.NewCloseFrame(ws.NewCloseFrameBody(statusCode, reason)))
defer t.Conn.Close()
return wsutil.WriteServerMessage(t.Writer, ws.OpClose, ws.NewCloseFrameBody(statusCode, reason))
}
18 changes: 13 additions & 5 deletions server.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ type Server struct {
upgrader *ws.Upgrader
keepConnTable *ttlcache.Cache[string, gnet.Conn]
onCloseHandler OnCloseHandlerFunc
onPingHandler OnPingHandlerFunc
handler HandlerFunc
}

Expand Down Expand Up @@ -52,12 +53,16 @@ func (s *Server) withDefault() {
WithOnCloseHandler(EmptyOnCloseHandler)(s)
}

if s.onPingHandler == nil {
WithOnPingHandler(DefaultOnPingHandler)(s)
}

if s.handler == nil {
WithHandler(EmptyHandler)(s)
}
}

func (s *Server) closeWS(conn *GNetUpgraderConn, statusCode ws.StatusCode, reason error) error {
func (s *Server) closeWS(conn *Conn, statusCode ws.StatusCode, reason error) error {
defer s.onCloseHandler(conn, reason)
return ws.WriteFrame(conn, ws.NewCloseFrame(ws.NewCloseFrameBody(statusCode, func() string {
if reason != nil {
Expand All @@ -75,7 +80,7 @@ func (s *Server) setupTimeoutHandler() {
item *ttlcache.Item[string, gnet.Conn],
) {
atomic.AddInt64(&s.connNum, -1)
upgraderConn, ok := item.Value().Context().(*GNetUpgraderConn)
upgraderConn, ok := item.Value().Context().(*Conn)
if !ok {
item.Value().Close()
return
Expand Down Expand Up @@ -137,7 +142,7 @@ func (s *Server) OnTraffic(c gnet.Conn) gnet.Action {
c.SetContext(NewUpgraderConn(c))
}

upgraderConn, ok := c.Context().(*GNetUpgraderConn)
upgraderConn, ok := c.Context().(*Conn)
if !ok {
log.Printf("[-] invalid context, remote addr: %s", c.RemoteAddr())
return gnet.None
Expand Down Expand Up @@ -167,7 +172,10 @@ func (s *Server) OnTraffic(c gnet.Conn) gnet.Action {
for _, message := range messages {
switch message.OpCode {
case ws.OpPing:
wsutil.WriteServerMessage(upgraderConn, ws.OpPong, nil)
// async handle
s.workerPool.Submit(func() {
s.onPingHandler(upgraderConn)
})
upgraderConn.UpdateActive()
case ws.OpText, ws.OpBinary:
// async handle
Expand All @@ -189,7 +197,7 @@ func (s *Server) OnTraffic(c gnet.Conn) gnet.Action {
return gnet.None
}

func (s *Server) ListenAndServer(opts ...gnet.Option) error {
func (s *Server) ListenAndServe(opts ...gnet.Option) error {
s.setupTimeoutHandler()
return gnet.Run(s, s.addr, opts...)
}
Expand Down
2 changes: 1 addition & 1 deletion server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func TestServer_ListenAndServer(t *testing.T) {
http.ListenAndServe("localhost:9090", nil)
}()

if err := server.ListenAndServer(gnet.WithMulticore(true)); err != nil {
if err := server.ListenAndServe(gnet.WithMulticore(true)); err != nil {
t.Fatal(err)
}
}
14 changes: 8 additions & 6 deletions upgrader.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@ var (
emptyUpgrader = &ws.Upgrader{}
)

type GNetUpgraderConn struct {
// Conn is upgraded websocket conn
type Conn struct {
gnet.Conn
LastActive int64 // atomic
successUpgraded bool
Expand All @@ -22,23 +23,24 @@ type GNetUpgraderConn struct {
ctx context.Context
}

func (c *GNetUpgraderConn) Context() context.Context {
func (c *Conn) Context() context.Context {
return c.ctx
}

func (c *GNetUpgraderConn) SetContext(ctx context.Context) {
func (c *Conn) SetContext(ctx context.Context) {
c.ctx = ctx
}

func (c *GNetUpgraderConn) UpdateActive() {
func (c *Conn) UpdateActive() {
atomic.StoreInt64(&c.LastActive, time.Now().Unix())
}

func NewUpgraderConn(conn gnet.Conn) *GNetUpgraderConn {
return &GNetUpgraderConn{
func NewUpgraderConn(conn gnet.Conn) *Conn {
return &Conn{
Conn: conn,
LastActive: time.Now().Unix(),
successUpgraded: false,
ID: uuid.New().String(),
ctx: context.Background(),
}
}

0 comments on commit f234e53

Please sign in to comment.