Skip to content

Commit

Permalink
based layer4 conn timeout impl
Browse files Browse the repository at this point in the history
  • Loading branch information
PotatoCloud committed Aug 2, 2023
1 parent 9ba4bc4 commit 5fef027
Show file tree
Hide file tree
Showing 6 changed files with 88 additions and 11 deletions.
10 changes: 10 additions & 0 deletions default.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package qWebsocket

import "time"

const (
DefaultWorkerPoolExpiry = time.Second * 10
DefaultWorkerPoolNonBlocking = true

DefaultConnTimeout = time.Second * 15
)
13 changes: 9 additions & 4 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,18 @@ module github.com/RealFax/q-websocket

go 1.20

require (
github.com/gobwas/ws v1.2.1
github.com/google/uuid v1.3.0
github.com/jellydator/ttlcache/v3 v3.0.1
github.com/panjf2000/ants/v2 v2.8.1
github.com/panjf2000/gnet/v2 v2.3.1
github.com/pkg/errors v0.8.1
)

require (
github.com/gobwas/httphead v0.1.0 // indirect
github.com/gobwas/pool v0.2.1 // indirect
github.com/gobwas/ws v1.2.1 // indirect
github.com/google/uuid v1.3.0 // indirect
github.com/panjf2000/ants/v2 v2.8.1 // indirect
github.com/panjf2000/gnet/v2 v2.3.1 // indirect
github.com/valyala/bytebufferpool v1.0.0 // indirect
go.uber.org/atomic v1.11.0 // indirect
go.uber.org/multierr v1.11.0 // indirect
Expand Down
3 changes: 3 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,13 @@ github.com/gobwas/ws v1.2.1 h1:F2aeBZrm2NDsc7vbovKrWSogd4wvfAxg0FQ89/iqOTk=
github.com/gobwas/ws v1.2.1/go.mod h1:hRKAFb8wOxFROYNsT1bqfWnhX+b5MFeJM9r2ZSwg/KY=
github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I=
github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/jellydator/ttlcache/v3 v3.0.1 h1:cHgCSMS7TdQcoprXnWUptJZzyFsqs18Lt8VVhRuZYVU=
github.com/jellydator/ttlcache/v3 v3.0.1/go.mod h1:WwTaEmcXQ3MTjOm4bsZoDFiCu/hMvNWLO1w67RXz6h4=
github.com/panjf2000/ants/v2 v2.8.1 h1:C+n/f++aiW8kHCExKlpX6X+okmxKXP7DWLutxuAPuwQ=
github.com/panjf2000/ants/v2 v2.8.1/go.mod h1:KIBmYG9QQX5U2qzFP/yQJaq/nSb6rahS9iEHkrCMgM8=
github.com/panjf2000/gnet/v2 v2.3.1 h1:J7vHkNxwsevVIw3u/6LCXgcnpGBk5iKqhQ2RMblGodc=
github.com/panjf2000/gnet/v2 v2.3.1/go.mod h1:Ik5lTy2nmBg9Uvjfcf2KRYs+EXVNOLyxPHpFOFlqu+M=
github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I=
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
Expand Down
11 changes: 11 additions & 0 deletions options.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,11 @@ import (
"context"
"crypto/tls"
"github.com/gobwas/ws"
"github.com/jellydator/ttlcache/v3"
"github.com/panjf2000/ants/v2"
"github.com/panjf2000/gnet/v2"
"net"
"time"
)

// ---------- server options ---------- //
Expand All @@ -30,6 +33,14 @@ func WithUpgrader(upgrader *ws.Upgrader) OptionFunc {
}
}

func WithConnTimeout(timeout time.Duration) OptionFunc {
return func(s *Server) {
s.keepConnTable = ttlcache.New[string, gnet.Conn](
ttlcache.WithTTL[string, gnet.Conn](timeout),
)
}
}

func WithOnCloseHandler(handler OnCloseHandlerFunc) OptionFunc {
return func(s *Server) {
s.onCloseHandler = handler
Expand Down
60 changes: 54 additions & 6 deletions server.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,10 @@ import (
"context"
"github.com/gobwas/ws"
"github.com/gobwas/ws/wsutil"
"github.com/jellydator/ttlcache/v3"
"github.com/panjf2000/ants/v2"
"github.com/panjf2000/gnet/v2"
"github.com/pkg/errors"
"log"
"sync/atomic"
"time"
Expand All @@ -19,26 +21,33 @@ type Server struct {
engine gnet.Engine
workerPool *ants.Pool
upgrader *ws.Upgrader
keepConnTable *ttlcache.Cache[string, gnet.Conn]
onCloseHandler OnCloseHandlerFunc
handler HandlerFunc
}

func (s *Server) withDefault() {
// check & init

if s.ctx == nil {
WithContext(context.Background())(s)
}

if s.workerPool == nil {
WithWorkerPool(1024*1024, ants.Options{
ExpiryDuration: time.Second * 10,
Nonblocking: true,
ExpiryDuration: DefaultWorkerPoolExpiry,
Nonblocking: DefaultWorkerPoolNonBlocking,
})(s)
}

if s.upgrader == nil {
WithUpgrader(emptyUpgrader)(s)
}

if s.keepConnTable == nil {
WithConnTimeout(DefaultConnTimeout)(s)
}

if s.onCloseHandler == nil {
WithOnCloseHandler(EmptyOnCloseHandler)(s)
}
Expand All @@ -58,6 +67,30 @@ func (s *Server) closeWS(conn *GNetUpgraderConn, statusCode ws.StatusCode, reaso
}())))
}

func (s *Server) setupTimeoutHandler() {
// on connect timeout handler
s.keepConnTable.OnEviction(func(
_ context.Context,
reason ttlcache.EvictionReason,
item *ttlcache.Item[string, gnet.Conn],
) {
atomic.AddInt64(&s.connNum, -1)
upgraderConn, ok := item.Value().Context().(*GNetUpgraderConn)
if !ok {
item.Value().Close()
return
}
s.closeWS(upgraderConn, ws.StatusGoingAway, errors.New("timeout"))
})

// start monitor connect ttl
go s.keepConnTable.Start()
go func() {
time.Sleep(time.Hour * 1)
s.keepConnTable.DeleteExpired()
}()
}

func (s *Server) Online() int64 {
return atomic.LoadInt64(&s.connNum)
}
Expand All @@ -78,13 +111,17 @@ func (s *Server) OnBoot(eng gnet.Engine) gnet.Action {

func (s *Server) OnShutdown(_ gnet.Engine) {}

func (s *Server) OnOpen(_ gnet.Conn) ([]byte, gnet.Action) {
func (s *Server) OnOpen(c gnet.Conn) ([]byte, gnet.Action) {
atomic.AddInt64(&s.connNum, 1)
// monitor conn timeout
s.keepConnTable.Set(c.RemoteAddr().String(), c, ttlcache.DefaultTTL)
return nil, gnet.None
}

func (s *Server) OnClose(_ gnet.Conn, _ error) gnet.Action {
func (s *Server) OnClose(c gnet.Conn, _ error) gnet.Action {
atomic.AddInt64(&s.connNum, -1)
// conn closed, remove conn in monitor list
s.keepConnTable.Delete(c.RemoteAddr().String())
return gnet.None
}

Expand All @@ -93,6 +130,9 @@ func (s *Server) OnTick() (time.Duration, gnet.Action) {
}

func (s *Server) OnTraffic(c gnet.Conn) gnet.Action {
// reset conn ttl
s.keepConnTable.Set(c.RemoteAddr().String(), c, time.Second*5)

if c.Context() == nil {
c.SetContext(NewUpgraderConn(c))
}
Expand All @@ -103,6 +143,7 @@ func (s *Server) OnTraffic(c gnet.Conn) gnet.Action {
return gnet.None
}

// trying upgrader conn
if !upgraderConn.successUpgraded {
if _, err := s.upgrader.Upgrade(upgraderConn); err != nil {
log.Printf("[-] upgrade error: %s, remote: %s\n", err.Error(), c.RemoteAddr())
Expand All @@ -114,18 +155,22 @@ func (s *Server) OnTraffic(c gnet.Conn) gnet.Action {
return gnet.None
}

// waiting client message
messages, err := wsutil.ReadClientMessage(upgraderConn, nil)
if err != nil {
log.Printf("[-] read client message error: %s, remote: %s\n", err.Error(), c.RemoteAddr())
s.closeWS(upgraderConn, ws.StatusUnsupportedData, err)
return gnet.Close
}

// handle client message
for _, message := range messages {
switch message.OpCode {
case ws.OpPing:
wsutil.WriteServerMessage(upgraderConn, ws.OpPong, nil)
upgraderConn.UpdateActive()
case ws.OpText:
case ws.OpText, ws.OpBinary:
// async handle
s.workerPool.Submit(func() {
s.handler(&HandlerParams{
OpCode: message.OpCode,
Expand All @@ -145,11 +190,14 @@ func (s *Server) OnTraffic(c gnet.Conn) gnet.Action {
}

func (s *Server) ListenAndServer(opts ...gnet.Option) error {
s.setupTimeoutHandler()
return gnet.Run(s, s.addr, opts...)
}

func NewServer(addr string, opts ...OptionFunc) *Server {
s := &Server{addr: addr}
s := &Server{
addr: addr,
}
for _, opt := range opts {
opt(s)
}
Expand Down
2 changes: 1 addition & 1 deletion server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ func Handler(req *qWebsocket.HandlerParams) {

func TestServer_ListenAndServer(t *testing.T) {
server := qWebsocket.NewServer(
"tcp://127.0.0.1:8080",
"tcp://127.0.0.1:9010",
qWebsocket.WithUpgrader(&ws.Upgrader{
OnRequest: qWebsocket.RequestProxy(func(req *url.URL) error {
return nil
Expand Down

0 comments on commit 5fef027

Please sign in to comment.