Skip to content

Commit

Permalink
Merge pull request #295 from lesismal/dev
Browse files Browse the repository at this point in the history
Dev
  • Loading branch information
lesismal authored May 14, 2023
2 parents 6e2a0b1 + e72dd12 commit 0bf3b46
Show file tree
Hide file tree
Showing 7 changed files with 276 additions and 93 deletions.
2 changes: 1 addition & 1 deletion engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import (

const (
// DefaultReadBufferSize .
DefaultReadBufferSize = 1024 * 32
DefaultReadBufferSize = 1024 * 64

// DefaultMaxWriteBufferSize .
DefaultMaxWriteBufferSize = 1024 * 1024
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,6 @@ module github.com/lesismal/nbio
go 1.16

require (
github.com/lesismal/llib v1.1.10
github.com/lesismal/llib v1.1.12
golang.org/x/crypto v0.6.0 // indirect
)
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
github.com/lesismal/llib v1.1.10 h1:6k6OYfp5+CYEK2nGAytpC6l9FO+nNs7gA/mpK+lPUkI=
github.com/lesismal/llib v1.1.10/go.mod h1:70tFXXe7P1FZ02AU9l8LgSOK7d7sRrpnkUr3rd3gKSg=
github.com/lesismal/llib v1.1.12 h1:KJFB8bL02V+QGIvILEw/w7s6bKj9Ps9Px97MZP2EOk0=
github.com/lesismal/llib v1.1.12/go.mod h1:70tFXXe7P1FZ02AU9l8LgSOK7d7sRrpnkUr3rd3gKSg=
github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20210513122933-cd7d49e622d5/go.mod h1:P+XmwS30IXTQdn5tA2iutPOUgjI07+tq3H3K9MVA1s8=
Expand Down
37 changes: 34 additions & 3 deletions nbhttp/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,17 @@ type Config struct {
MaxBlockingOnline int
// BlockingReadBufferSize represents read buffer size of blocking mod.
BlockingReadBufferSize int

// WebsocketCompressor .
WebsocketCompressor func() interface {
Compress([]byte) []byte
Close()
}
// WebsocketDecompressor .
WebsocketDecompressor func() interface {
Decompress([]byte) ([]byte, error)
Close()
}
}

// Engine .
Expand Down Expand Up @@ -536,6 +547,21 @@ func (e *Engine) TLSDataHandler(c *nbio.Conn, data []byte) {
}
}

// AddConnTLSNonBlocking .
func (engine *Engine) AddTransferredConn(nbc *nbio.Conn) error {
engine.mux.Lock()
if len(engine.conns) >= engine.MaxLoad {
engine.mux.Unlock()
nbc.Close()
return ErrServiceOverload
}
engine.conns[nbc] = struct{}{}
engine.mux.Unlock()
engine._onOpen(nbc)
engine.AddConn(nbc)
return nil
}

// AddConnNonTLSNonBlocking .
func (engine *Engine) AddConnNonTLSNonBlocking(c net.Conn, tlsConfig *tls.Config, decrease func()) {
nbc, err := nbio.NBConn(c)
Expand Down Expand Up @@ -915,9 +941,14 @@ func NewEngine(conf Config) *Engine {
// g.OnOpen(engine.ServerOnOpen)
g.OnClose(func(c *nbio.Conn, err error) {
c.MustExecute(func() {
parser, ok := c.Session().(*Parser)
if ok && parser != nil {
parser.Close(err)
switch vt := c.Session().(type) {
case *Parser:
vt.Close(err)
case interface {
Close(*Parser, error)
}:
vt.Close(nil, err)
default:
}
engine._onClose(c, err)
engine.mux.Lock()
Expand Down
4 changes: 4 additions & 0 deletions nbhttp/error.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,3 +89,7 @@ var (
// ErrClientClosed .
ErrClientClosed = errors.New("http client closed")
)

var (
ErrServiceOverload = errors.New("service overload")
)
30 changes: 18 additions & 12 deletions nbhttp/websocket/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,21 +146,27 @@ func (c *Conn) WriteMessage(messageType MessageType, data []byte) error {

compress := c.enableWriteCompression && (messageType == TextMessage || messageType == BinaryMessage)
if compress {
compress = true
// compress = true
// if user customize mempool, they should promise it's safe to mempool.Free a buffer which is not from their mempool.Malloc
// or we need to implement a writebuffer that use mempool.Realloc to grow or append the buffer
w := &writeBuffer{
Buffer: bytes.NewBuffer(mempool.Malloc(len(data))),
}
defer w.Close()
w.Reset()
cw := compressWriter(w, c.compressionLevel)
_, err := cw.Write(data)
if err != nil {
compress = false
if c.Engine.WebsocketCompressor != nil {
compressor := c.Engine.WebsocketCompressor()
defer compressor.Close()
data = compressor.Compress(data)
} else {
cw.Close()
data = w.Bytes()
w := &writeBuffer{
Buffer: bytes.NewBuffer(mempool.Malloc(len(data))),
}
defer w.Close()
w.Reset()
cw := compressWriter(w, c.compressionLevel)
_, err := cw.Write(data)
if err != nil {
compress = false
} else {
cw.Close()
data = w.Bytes()
}
}
}

Expand Down
Loading

0 comments on commit 0bf3b46

Please sign in to comment.