From f234e53f9600b361289f4c21cd8aab6784fb2e64 Mon Sep 17 00:00:00 2001 From: Potato Date: Wed, 2 Aug 2023 15:56:19 +0800 Subject: [PATCH] update README, allowed custom handle ping event --- README.md | 97 +++++++++++++++++++++++++++++++++++++ example/simple-chat/chat.go | 4 +- handler.go | 27 +++++++---- options.go | 6 +++ proto/proto_test.go | 9 ++-- proto/request.go | 10 ++-- server.go | 18 +++++-- server_test.go | 2 +- upgrader.go | 14 +++--- 9 files changed, 154 insertions(+), 33 deletions(-) diff --git a/README.md b/README.md index c4db5e6..b387eea 100644 --- a/README.md +++ b/README.md @@ -6,4 +6,101 @@ go get github.com/RealFax/q-websocket@latest ``` +## quick start + +```go +// echo server + +package main + +import ( + qWebsocket "github.com/RealFax/q-websocket" + + "github.com/gobwas/ws/wsutil" + "github.com/panjf2000/gnet/v2" + + "log" +) + +func echo(req *qWebsocket.HandlerParams) { + wsutil.WriteServerText(req.Writer, req.Request) +} + +func main() { + server := qWebsocket.NewServer( + "tcp://0.0.0.0:9090", + qWebsocket.WithHandler(echo), + ) + + if err := server.ListenAndServe(gnet.WithMulticore(true)); err != nil { + log.Fatal("server error:", err) + } +} +``` + +## using proto +_proto is a low-level handler built into q-websocket, providing simple router_ + +```go +// using proto + +package main + +import ( + qWebsocket "github.com/RealFax/q-websocket" + "github.com/RealFax/q-websocket/proto" + + "github.com/gobwas/ws" + "github.com/panjf2000/gnet/v2" + + "log" + "strings" +) + +// define proto struct + +type Proto struct { + Type uint32 `json:"type"` + Message string `json:"message"` +} + +func (p *Proto) Key() uint32 { return p.Type } +func (p *Proto) Value() Proto { return *p } +func (p *Proto) Self() *Proto { return p } + +const ( + Echo uint32 = iota + Pong +) + +func main() { + engine := proto.New[Proto, uint32](func() proto.Proto[Proto, uint32] { + return new(Proto) + }) + + // echo handler + engine.Register(Echo, func(r *proto.Request[Proto]) { + r.WriteText([]byte(r.Request.Message)) + }) + + // ping pong handler + engine.Register(Pong, func(r *proto.Request[Proto]) { + if strings.ToLower(r.Request.Message) == "ping" { + r.WriteText([]byte("pong")) + return + } + r.WriteClose(ws.StatusGoingAway, "") + }) + + server := qWebsocket.NewServer( + "tcp://127.0.0.1:9090", + qWebsocket.WithHandler(engine.UseHandler()), + ) + + if err := server.ListenAndServe(gnet.WithMulticore(true)); err != nil { + log.Fatal("server error:", err) + } +} +``` + more usage see: [Example](https://github.com/RealFax/q-websocket/tree/master/example) diff --git a/example/simple-chat/chat.go b/example/simple-chat/chat.go index d5420ba..ebac9bd 100644 --- a/example/simple-chat/chat.go +++ b/example/simple-chat/chat.go @@ -141,13 +141,13 @@ func main() { server := qWebsocket.NewServer( "tcp://127.0.0.1:8080", qWebsocket.WithHandler(engine.UseHandler()), - qWebsocket.WithOnCloseHandler(func(conn *qWebsocket.GNetUpgraderConn, _ error) { + qWebsocket.WithOnCloseHandler(func(conn *qWebsocket.Conn, _ error) { s.offline(conn.ID) }), ) go func() { - if err := server.ListenAndServer(gnet.WithMulticore(true)); err != nil { + if err := server.ListenAndServe(gnet.WithMulticore(true)); err != nil { fmt.Println("[-] gnet error:", err) } }() diff --git a/handler.go b/handler.go index 3faa2f6..d367764 100644 --- a/handler.go +++ b/handler.go @@ -2,19 +2,26 @@ package qWebsocket import ( "github.com/gobwas/ws" + "github.com/gobwas/ws/wsutil" "io" ) -type OnCloseHandlerFunc func(conn *GNetUpgraderConn, err error) +type ( + OnCloseHandlerFunc func(conn *Conn, err error) + OnPingHandlerFunc func(conn *Conn) + HandlerFunc func(req *HandlerParams) -type HandlerParams struct { - OpCode ws.OpCode - Request []byte - Writer io.Writer - WsConn *GNetUpgraderConn -} + HandlerParams struct { + OpCode ws.OpCode + Request []byte + Writer io.Writer + WsConn *Conn + } +) -type HandlerFunc func(req *HandlerParams) +func EmptyOnCloseHandler(_ *Conn, _ error) {} +func EmptyHandler(_ *HandlerParams) {} -func EmptyOnCloseHandler(_ *GNetUpgraderConn, _ error) {} -func EmptyHandler(_ *HandlerParams) {} +func DefaultOnPingHandler(c *Conn) { + wsutil.WriteServerMessage(c, ws.OpPong, nil) +} diff --git a/options.go b/options.go index 8187576..bf78df5 100644 --- a/options.go +++ b/options.go @@ -47,6 +47,12 @@ func WithOnCloseHandler(handler OnCloseHandlerFunc) OptionFunc { } } +func WithOnPingHandler(handler OnPingHandlerFunc) OptionFunc { + return func(s *Server) { + s.onPingHandler = handler + } +} + func WithHandler(handler HandlerFunc) OptionFunc { return func(s *Server) { s.handler = handler diff --git a/proto/proto_test.go b/proto/proto_test.go index 3dce05e..5970dfc 100644 --- a/proto/proto_test.go +++ b/proto/proto_test.go @@ -5,7 +5,6 @@ import ( "github.com/RealFax/q-websocket/proto" "github.com/gobwas/ws" "github.com/panjf2000/gnet/v2" - "log" "testing" ) @@ -24,20 +23,20 @@ var engine = proto.New[Proto, uint32](func() proto.Proto[Proto, uint32] { func init() { engine.Register(1, func(r *proto.Request[Proto]) { - log.Println(r.Request.Message) + r.WriteText([]byte(r.Request.Message)) }) engine.Register(2, func(r *proto.Request[Proto]) { - ws.WriteFrame(r.Writer, ws.NewTextFrame([]byte(r.Request.Message))) + r.WriteClose(ws.StatusGoingAway, "") }) } func TestEngine_Handler(t *testing.T) { server := qWebsocket.NewServer( - "tcp://127.0.0.1:8080", + "tcp://127.0.0.1:10001", qWebsocket.WithHandler(engine.UseHandler()), ) - if err := server.ListenAndServer(gnet.WithMulticore(true)); err != nil { + if err := server.ListenAndServe(gnet.WithMulticore(true)); err != nil { t.Fatal(err) } } diff --git a/proto/request.go b/proto/request.go index 71c635d..72ad70c 100644 --- a/proto/request.go +++ b/proto/request.go @@ -4,13 +4,14 @@ import ( "bytes" qWebsocket "github.com/RealFax/q-websocket" "github.com/gobwas/ws" + "github.com/gobwas/ws/wsutil" "io" ) type Request[T any] struct { OpCode ws.OpCode Writer io.Writer - Conn *qWebsocket.GNetUpgraderConn + Conn *qWebsocket.Conn Request *T RawRequest []byte } @@ -20,13 +21,14 @@ func (t Request[T]) Reader() io.Reader { } func (t Request[T]) WriteText(p []byte) error { - return ws.WriteFrame(t.Writer, ws.NewTextFrame(p)) + return wsutil.WriteServerMessage(t.Writer, ws.OpText, p) } func (t Request[T]) WriteBinary(p []byte) error { - return ws.WriteFrame(t.Writer, ws.NewBinaryFrame(p)) + return wsutil.WriteServerMessage(t.Writer, ws.OpBinary, p) } func (t Request[T]) WriteClose(statusCode ws.StatusCode, reason string) error { - return ws.WriteFrame(t.Writer, ws.NewCloseFrame(ws.NewCloseFrameBody(statusCode, reason))) + defer t.Conn.Close() + return wsutil.WriteServerMessage(t.Writer, ws.OpClose, ws.NewCloseFrameBody(statusCode, reason)) } diff --git a/server.go b/server.go index 8f8ca1a..c25125a 100644 --- a/server.go +++ b/server.go @@ -23,6 +23,7 @@ type Server struct { upgrader *ws.Upgrader keepConnTable *ttlcache.Cache[string, gnet.Conn] onCloseHandler OnCloseHandlerFunc + onPingHandler OnPingHandlerFunc handler HandlerFunc } @@ -52,12 +53,16 @@ func (s *Server) withDefault() { WithOnCloseHandler(EmptyOnCloseHandler)(s) } + if s.onPingHandler == nil { + WithOnPingHandler(DefaultOnPingHandler)(s) + } + if s.handler == nil { WithHandler(EmptyHandler)(s) } } -func (s *Server) closeWS(conn *GNetUpgraderConn, statusCode ws.StatusCode, reason error) error { +func (s *Server) closeWS(conn *Conn, statusCode ws.StatusCode, reason error) error { defer s.onCloseHandler(conn, reason) return ws.WriteFrame(conn, ws.NewCloseFrame(ws.NewCloseFrameBody(statusCode, func() string { if reason != nil { @@ -75,7 +80,7 @@ func (s *Server) setupTimeoutHandler() { item *ttlcache.Item[string, gnet.Conn], ) { atomic.AddInt64(&s.connNum, -1) - upgraderConn, ok := item.Value().Context().(*GNetUpgraderConn) + upgraderConn, ok := item.Value().Context().(*Conn) if !ok { item.Value().Close() return @@ -137,7 +142,7 @@ func (s *Server) OnTraffic(c gnet.Conn) gnet.Action { c.SetContext(NewUpgraderConn(c)) } - upgraderConn, ok := c.Context().(*GNetUpgraderConn) + upgraderConn, ok := c.Context().(*Conn) if !ok { log.Printf("[-] invalid context, remote addr: %s", c.RemoteAddr()) return gnet.None @@ -167,7 +172,10 @@ func (s *Server) OnTraffic(c gnet.Conn) gnet.Action { for _, message := range messages { switch message.OpCode { case ws.OpPing: - wsutil.WriteServerMessage(upgraderConn, ws.OpPong, nil) + // async handle + s.workerPool.Submit(func() { + s.onPingHandler(upgraderConn) + }) upgraderConn.UpdateActive() case ws.OpText, ws.OpBinary: // async handle @@ -189,7 +197,7 @@ func (s *Server) OnTraffic(c gnet.Conn) gnet.Action { return gnet.None } -func (s *Server) ListenAndServer(opts ...gnet.Option) error { +func (s *Server) ListenAndServe(opts ...gnet.Option) error { s.setupTimeoutHandler() return gnet.Run(s, s.addr, opts...) } diff --git a/server_test.go b/server_test.go index 9a48d8a..7432d3d 100644 --- a/server_test.go +++ b/server_test.go @@ -42,7 +42,7 @@ func TestServer_ListenAndServer(t *testing.T) { http.ListenAndServe("localhost:9090", nil) }() - if err := server.ListenAndServer(gnet.WithMulticore(true)); err != nil { + if err := server.ListenAndServe(gnet.WithMulticore(true)); err != nil { t.Fatal(err) } } diff --git a/upgrader.go b/upgrader.go index eac280d..a032c48 100644 --- a/upgrader.go +++ b/upgrader.go @@ -13,7 +13,8 @@ var ( emptyUpgrader = &ws.Upgrader{} ) -type GNetUpgraderConn struct { +// Conn is upgraded websocket conn +type Conn struct { gnet.Conn LastActive int64 // atomic successUpgraded bool @@ -22,23 +23,24 @@ type GNetUpgraderConn struct { ctx context.Context } -func (c *GNetUpgraderConn) Context() context.Context { +func (c *Conn) Context() context.Context { return c.ctx } -func (c *GNetUpgraderConn) SetContext(ctx context.Context) { +func (c *Conn) SetContext(ctx context.Context) { c.ctx = ctx } -func (c *GNetUpgraderConn) UpdateActive() { +func (c *Conn) UpdateActive() { atomic.StoreInt64(&c.LastActive, time.Now().Unix()) } -func NewUpgraderConn(conn gnet.Conn) *GNetUpgraderConn { - return &GNetUpgraderConn{ +func NewUpgraderConn(conn gnet.Conn) *Conn { + return &Conn{ Conn: conn, LastActive: time.Now().Unix(), successUpgraded: false, ID: uuid.New().String(), + ctx: context.Background(), } }