From 5fef02723016368354480a1879ea8db0d67ef871 Mon Sep 17 00:00:00 2001 From: Potato Date: Wed, 2 Aug 2023 14:37:10 +0800 Subject: [PATCH] based layer4 conn timeout impl --- default.go | 10 +++++++++ go.mod | 13 +++++++---- go.sum | 3 +++ options.go | 11 +++++++++ server.go | 60 +++++++++++++++++++++++++++++++++++++++++++++----- server_test.go | 2 +- 6 files changed, 88 insertions(+), 11 deletions(-) create mode 100644 default.go diff --git a/default.go b/default.go new file mode 100644 index 0000000..e4551e3 --- /dev/null +++ b/default.go @@ -0,0 +1,10 @@ +package qWebsocket + +import "time" + +const ( + DefaultWorkerPoolExpiry = time.Second * 10 + DefaultWorkerPoolNonBlocking = true + + DefaultConnTimeout = time.Second * 15 +) diff --git a/go.mod b/go.mod index 154b0a0..fe92922 100644 --- a/go.mod +++ b/go.mod @@ -2,13 +2,18 @@ module github.com/RealFax/q-websocket go 1.20 +require ( + github.com/gobwas/ws v1.2.1 + github.com/google/uuid v1.3.0 + github.com/jellydator/ttlcache/v3 v3.0.1 + github.com/panjf2000/ants/v2 v2.8.1 + github.com/panjf2000/gnet/v2 v2.3.1 + github.com/pkg/errors v0.8.1 +) + require ( github.com/gobwas/httphead v0.1.0 // indirect github.com/gobwas/pool v0.2.1 // indirect - github.com/gobwas/ws v1.2.1 // indirect - github.com/google/uuid v1.3.0 // indirect - github.com/panjf2000/ants/v2 v2.8.1 // indirect - github.com/panjf2000/gnet/v2 v2.3.1 // indirect github.com/valyala/bytebufferpool v1.0.0 // indirect go.uber.org/atomic v1.11.0 // indirect go.uber.org/multierr v1.11.0 // indirect diff --git a/go.sum b/go.sum index 1019c6b..adc3e58 100644 --- a/go.sum +++ b/go.sum @@ -9,10 +9,13 @@ github.com/gobwas/ws v1.2.1 h1:F2aeBZrm2NDsc7vbovKrWSogd4wvfAxg0FQ89/iqOTk= github.com/gobwas/ws v1.2.1/go.mod h1:hRKAFb8wOxFROYNsT1bqfWnhX+b5MFeJM9r2ZSwg/KY= github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I= github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/jellydator/ttlcache/v3 v3.0.1 h1:cHgCSMS7TdQcoprXnWUptJZzyFsqs18Lt8VVhRuZYVU= +github.com/jellydator/ttlcache/v3 v3.0.1/go.mod h1:WwTaEmcXQ3MTjOm4bsZoDFiCu/hMvNWLO1w67RXz6h4= github.com/panjf2000/ants/v2 v2.8.1 h1:C+n/f++aiW8kHCExKlpX6X+okmxKXP7DWLutxuAPuwQ= github.com/panjf2000/ants/v2 v2.8.1/go.mod h1:KIBmYG9QQX5U2qzFP/yQJaq/nSb6rahS9iEHkrCMgM8= github.com/panjf2000/gnet/v2 v2.3.1 h1:J7vHkNxwsevVIw3u/6LCXgcnpGBk5iKqhQ2RMblGodc= github.com/panjf2000/gnet/v2 v2.3.1/go.mod h1:Ik5lTy2nmBg9Uvjfcf2KRYs+EXVNOLyxPHpFOFlqu+M= +github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I= github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= diff --git a/options.go b/options.go index d9c6df9..8187576 100644 --- a/options.go +++ b/options.go @@ -4,8 +4,11 @@ import ( "context" "crypto/tls" "github.com/gobwas/ws" + "github.com/jellydator/ttlcache/v3" "github.com/panjf2000/ants/v2" + "github.com/panjf2000/gnet/v2" "net" + "time" ) // ---------- server options ---------- // @@ -30,6 +33,14 @@ func WithUpgrader(upgrader *ws.Upgrader) OptionFunc { } } +func WithConnTimeout(timeout time.Duration) OptionFunc { + return func(s *Server) { + s.keepConnTable = ttlcache.New[string, gnet.Conn]( + ttlcache.WithTTL[string, gnet.Conn](timeout), + ) + } +} + func WithOnCloseHandler(handler OnCloseHandlerFunc) OptionFunc { return func(s *Server) { s.onCloseHandler = handler diff --git a/server.go b/server.go index 44fd4c4..8f8ca1a 100644 --- a/server.go +++ b/server.go @@ -4,8 +4,10 @@ import ( "context" "github.com/gobwas/ws" "github.com/gobwas/ws/wsutil" + "github.com/jellydator/ttlcache/v3" "github.com/panjf2000/ants/v2" "github.com/panjf2000/gnet/v2" + "github.com/pkg/errors" "log" "sync/atomic" "time" @@ -19,19 +21,22 @@ type Server struct { engine gnet.Engine workerPool *ants.Pool upgrader *ws.Upgrader + keepConnTable *ttlcache.Cache[string, gnet.Conn] onCloseHandler OnCloseHandlerFunc handler HandlerFunc } func (s *Server) withDefault() { + // check & init + if s.ctx == nil { WithContext(context.Background())(s) } if s.workerPool == nil { WithWorkerPool(1024*1024, ants.Options{ - ExpiryDuration: time.Second * 10, - Nonblocking: true, + ExpiryDuration: DefaultWorkerPoolExpiry, + Nonblocking: DefaultWorkerPoolNonBlocking, })(s) } @@ -39,6 +44,10 @@ func (s *Server) withDefault() { WithUpgrader(emptyUpgrader)(s) } + if s.keepConnTable == nil { + WithConnTimeout(DefaultConnTimeout)(s) + } + if s.onCloseHandler == nil { WithOnCloseHandler(EmptyOnCloseHandler)(s) } @@ -58,6 +67,30 @@ func (s *Server) closeWS(conn *GNetUpgraderConn, statusCode ws.StatusCode, reaso }()))) } +func (s *Server) setupTimeoutHandler() { + // on connect timeout handler + s.keepConnTable.OnEviction(func( + _ context.Context, + reason ttlcache.EvictionReason, + item *ttlcache.Item[string, gnet.Conn], + ) { + atomic.AddInt64(&s.connNum, -1) + upgraderConn, ok := item.Value().Context().(*GNetUpgraderConn) + if !ok { + item.Value().Close() + return + } + s.closeWS(upgraderConn, ws.StatusGoingAway, errors.New("timeout")) + }) + + // start monitor connect ttl + go s.keepConnTable.Start() + go func() { + time.Sleep(time.Hour * 1) + s.keepConnTable.DeleteExpired() + }() +} + func (s *Server) Online() int64 { return atomic.LoadInt64(&s.connNum) } @@ -78,13 +111,17 @@ func (s *Server) OnBoot(eng gnet.Engine) gnet.Action { func (s *Server) OnShutdown(_ gnet.Engine) {} -func (s *Server) OnOpen(_ gnet.Conn) ([]byte, gnet.Action) { +func (s *Server) OnOpen(c gnet.Conn) ([]byte, gnet.Action) { atomic.AddInt64(&s.connNum, 1) + // monitor conn timeout + s.keepConnTable.Set(c.RemoteAddr().String(), c, ttlcache.DefaultTTL) return nil, gnet.None } -func (s *Server) OnClose(_ gnet.Conn, _ error) gnet.Action { +func (s *Server) OnClose(c gnet.Conn, _ error) gnet.Action { atomic.AddInt64(&s.connNum, -1) + // conn closed, remove conn in monitor list + s.keepConnTable.Delete(c.RemoteAddr().String()) return gnet.None } @@ -93,6 +130,9 @@ func (s *Server) OnTick() (time.Duration, gnet.Action) { } func (s *Server) OnTraffic(c gnet.Conn) gnet.Action { + // reset conn ttl + s.keepConnTable.Set(c.RemoteAddr().String(), c, time.Second*5) + if c.Context() == nil { c.SetContext(NewUpgraderConn(c)) } @@ -103,6 +143,7 @@ func (s *Server) OnTraffic(c gnet.Conn) gnet.Action { return gnet.None } + // trying upgrader conn if !upgraderConn.successUpgraded { if _, err := s.upgrader.Upgrade(upgraderConn); err != nil { log.Printf("[-] upgrade error: %s, remote: %s\n", err.Error(), c.RemoteAddr()) @@ -114,18 +155,22 @@ func (s *Server) OnTraffic(c gnet.Conn) gnet.Action { return gnet.None } + // waiting client message messages, err := wsutil.ReadClientMessage(upgraderConn, nil) if err != nil { + log.Printf("[-] read client message error: %s, remote: %s\n", err.Error(), c.RemoteAddr()) s.closeWS(upgraderConn, ws.StatusUnsupportedData, err) return gnet.Close } + // handle client message for _, message := range messages { switch message.OpCode { case ws.OpPing: wsutil.WriteServerMessage(upgraderConn, ws.OpPong, nil) upgraderConn.UpdateActive() - case ws.OpText: + case ws.OpText, ws.OpBinary: + // async handle s.workerPool.Submit(func() { s.handler(&HandlerParams{ OpCode: message.OpCode, @@ -145,11 +190,14 @@ func (s *Server) OnTraffic(c gnet.Conn) gnet.Action { } func (s *Server) ListenAndServer(opts ...gnet.Option) error { + s.setupTimeoutHandler() return gnet.Run(s, s.addr, opts...) } func NewServer(addr string, opts ...OptionFunc) *Server { - s := &Server{addr: addr} + s := &Server{ + addr: addr, + } for _, opt := range opts { opt(s) } diff --git a/server_test.go b/server_test.go index b408591..9a48d8a 100644 --- a/server_test.go +++ b/server_test.go @@ -18,7 +18,7 @@ func Handler(req *qWebsocket.HandlerParams) { func TestServer_ListenAndServer(t *testing.T) { server := qWebsocket.NewServer( - "tcp://127.0.0.1:8080", + "tcp://127.0.0.1:9010", qWebsocket.WithUpgrader(&ws.Upgrader{ OnRequest: qWebsocket.RequestProxy(func(req *url.URL) error { return nil