Skip to content

Commit

Permalink
Merge pull request #316 from cloudwego/release-v0.6.0
Browse files Browse the repository at this point in the history
chore: release v0.6.0
  • Loading branch information
joway authored Mar 4, 2024
2 parents 4b0bb96 + 7ba622b commit 8c8b872
Show file tree
Hide file tree
Showing 17 changed files with 405 additions and 75 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/codeql-analysis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,10 @@ jobs:

steps:
- name: Checkout repository
uses: actions/checkout@v3
uses: actions/checkout@v4

- name: Set up Go
uses: actions/setup-go@v2
uses: actions/setup-go@v5
with:
go-version: "1.20"

Expand Down
12 changes: 6 additions & 6 deletions .github/workflows/pr-check.yml
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,9 @@ jobs:
os: [ X64, ARM64 ]
runs-on: ${{ matrix.os }}
steps:
- uses: actions/checkout@v3
- uses: actions/checkout@v4
- name: Set up Go
uses: actions/setup-go@v3
uses: actions/setup-go@v5
with:
go-version: ${{ matrix.go }}
# - uses: actions/cache@v2
Expand All @@ -28,9 +28,9 @@ jobs:
windows-test:
runs-on: windows-latest
steps:
- uses: actions/checkout@v3
- uses: actions/checkout@v4
- name: Set up Go
uses: actions/setup-go@v3
uses: actions/setup-go@v5
with:
go-version: "1.20"
# - uses: actions/cache@v2
Expand All @@ -44,9 +44,9 @@ jobs:
style-test:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- uses: actions/checkout@v4
- name: Set up Go
uses: actions/setup-go@v3
uses: actions/setup-go@v5
with:
go-version: 1.16
- name: Check License Header
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/release-check.yml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ jobs:
build:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
- uses: actions/checkout@v4

- name: Check Source Branch
run: python2 -c "exit(0 if '${{ github.head_ref }}'.startswith('release') or '${{ github.head_ref }}'.startswith('hotfix') else 1)"
Expand Down
2 changes: 1 addition & 1 deletion connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import (
"time"
)

// CloseCallback will be called when the connection is closed.
// CloseCallback will be called after the connection is closed.
// Return: error is unused which will be ignored directly.
type CloseCallback func(connection Connection) error

Expand Down
20 changes: 20 additions & 0 deletions connection_errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package netpoll

import (
"fmt"
"net"
"syscall"
)

Expand Down Expand Up @@ -51,6 +52,10 @@ func Exception(err error, suffix string) error {
return &exception{no: no, suffix: suffix}
}

var (
_ net.Error = (*exception)(nil)
)

type exception struct {
no syscall.Errno
suffix string
Expand Down Expand Up @@ -88,6 +93,21 @@ func (e *exception) Unwrap() error {
return e.no
}

func (e *exception) Timeout() bool {
switch e.no {
case ErrDialTimeout, ErrReadTimeout, ErrWriteTimeout:
return true
}
if e.no.Timeout() {
return true
}
return false
}

func (e *exception) Temporary() bool {
return e.no.Temporary()
}

// Errors defined in netpoll
var errnos = [...]string{
ErrnoMask & ErrConnClosed: "connection has been closed",
Expand Down
8 changes: 5 additions & 3 deletions connection_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,9 @@ type connection struct {
outputBuffer *LinkBuffer
outputBarrier *barrier
supportZeroCopy bool
maxSize int // The maximum size of data between two Release().
bookSize int // The size of data that can be read at once.
maxSize int // The maximum size of data between two Release().
bookSize int // The size of data that can be read at once.
state int32 // 0: not connected, 1: connected, 2: disconnected. Connection state should be changed sequentially.
}

var (
Expand Down Expand Up @@ -323,6 +324,7 @@ func (c *connection) init(conn Conn, opts *options) (err error) {
c.bookSize, c.maxSize = pagesize, pagesize
c.inputBuffer, c.outputBuffer = NewLinkBuffer(pagesize), NewLinkBuffer()
c.outputBarrier = barrierPool.Get().(*barrier)
c.state = 0

c.initNetFD(conn) // conn must be *netFD{}
c.initFDOperator()
Expand Down Expand Up @@ -447,7 +449,7 @@ func (c *connection) waitReadWithTimeout(n int) (err error) {
return Exception(ErrReadTimeout, c.remoteAddr.String())
case err = <-c.readTrigger:
if err != nil {
return err
goto RET
}
continue
}
Expand Down
1 change: 1 addition & 0 deletions connection_lock.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ type key int32

const (
closing key = iota
connecting
processing
flushing
// total must be at the bottom.
Expand Down
88 changes: 78 additions & 10 deletions connection_onevent.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,10 @@ import (

var runTask = gopool.CtxGo

func setRunner(runner func(ctx context.Context, f func())) {
runTask = runner
}

func disableGopool() error {
runTask = func(ctx context.Context, f func()) {
go f()
Expand All @@ -44,10 +48,11 @@ type gracefulExit interface {
// OnPrepare, OnRequest, CloseCallback share the lock processing,
// which is a CAS lock and can only be cleared by OnRequest.
type onEvent struct {
ctx context.Context
onConnectCallback atomic.Value
onRequestCallback atomic.Value
closeCallbacks atomic.Value // value is latest *callbackNode
ctx context.Context
onConnectCallback atomic.Value
onDisconnectCallback atomic.Value
onRequestCallback atomic.Value
closeCallbacks atomic.Value // value is latest *callbackNode
}

type callbackNode struct {
Expand All @@ -63,6 +68,14 @@ func (c *connection) SetOnConnect(onConnect OnConnect) error {
return nil
}

// SetOnDisconnect set the OnDisconnect callback.
func (c *connection) SetOnDisconnect(onDisconnect OnDisconnect) error {
if onDisconnect != nil {
c.onDisconnectCallback.Store(onDisconnect)
}
return nil
}

// SetOnRequest initialize ctx when setting OnRequest.
func (c *connection) SetOnRequest(onRequest OnRequest) error {
if onRequest == nil {
Expand Down Expand Up @@ -95,6 +108,7 @@ func (c *connection) AddCloseCallback(callback CloseCallback) error {
func (c *connection) onPrepare(opts *options) (err error) {
if opts != nil {
c.SetOnConnect(opts.onConnect)
c.SetOnDisconnect(opts.onDisconnect)
c.SetOnRequest(opts.onRequest)
c.SetReadTimeout(opts.readTimeout)
c.SetWriteTimeout(opts.writeTimeout)
Expand All @@ -120,23 +134,36 @@ func (c *connection) onPrepare(opts *options) (err error) {
func (c *connection) onConnect() {
var onConnect, _ = c.onConnectCallback.Load().(OnConnect)
if onConnect == nil {
atomic.StoreInt32(&c.state, 1)
return
}
if !c.lock(connecting) {
// it never happens because onDisconnect will not lock connecting if c.connected == 0
return
}
var onRequest, _ = c.onRequestCallback.Load().(OnRequest)
var connected int32
c.onProcess(
// only process when conn active and have unread data
func(c *connection) bool {
// if onConnect not called
if atomic.LoadInt32(&connected) == 0 {
if atomic.LoadInt32(&c.state) == 0 {
return true
}
// check for onRequest
return onRequest != nil && c.Reader().Len() > 0
},
func(c *connection) {
if atomic.CompareAndSwapInt32(&connected, 0, 1) {
if atomic.CompareAndSwapInt32(&c.state, 0, 1) {
c.ctx = onConnect(c.ctx, c)

if !c.IsActive() && atomic.CompareAndSwapInt32(&c.state, 1, 2) {
// since we hold connecting lock, so we should help to call onDisconnect here
var onDisconnect, _ = c.onDisconnectCallback.Load().(OnDisconnect)
if onDisconnect != nil {
onDisconnect(c.ctx, c)
}
}
c.unlock(connecting)
return
}
if onRequest != nil {
Expand All @@ -146,12 +173,44 @@ func (c *connection) onConnect() {
)
}

// when onDisconnect called, c.IsActive() must return false
func (c *connection) onDisconnect() {
var onDisconnect, _ = c.onDisconnectCallback.Load().(OnDisconnect)
if onDisconnect == nil {
return
}
var onConnect, _ = c.onConnectCallback.Load().(OnConnect)
if onConnect == nil {
// no need lock if onConnect is nil
atomic.StoreInt32(&c.state, 2)
onDisconnect(c.ctx, c)
return
}
// check if OnConnect finished when onConnect != nil && onDisconnect != nil
if atomic.LoadInt32(&c.state) > 0 && c.lock(connecting) { // means OnConnect already finished
// protect onDisconnect run once
// if CAS return false, means OnConnect already helps to run onDisconnect
if atomic.CompareAndSwapInt32(&c.state, 1, 2) {
onDisconnect(c.ctx, c)
}
c.unlock(connecting)
return
}
// OnConnect is not finished yet, return and let onConnect helps to call onDisconnect
return
}

// onRequest is responsible for executing the closeCallbacks after the connection has been closed.
func (c *connection) onRequest() (needTrigger bool) {
var onRequest, ok = c.onRequestCallback.Load().(OnRequest)
if !ok {
return true
}
// wait onConnect finished first
if atomic.LoadInt32(&c.state) == 0 && c.onConnectCallback.Load() != nil {
// let onConnect to call onRequest
return
}
processed := c.onProcess(
// only process when conn active and have unread data
func(c *connection) bool {
Expand Down Expand Up @@ -206,7 +265,7 @@ func (c *connection) onProcess(isProcessable func(c *connection) bool, process f
}
process(c)
}
// Handling callback if connection has been closed.
// handling callback if connection has been closed.
if closedBy != none {
// if closed by user when processing, it "may" needs detach
needDetach := closedBy == user
Expand All @@ -219,15 +278,24 @@ func (c *connection) onProcess(isProcessable func(c *connection) bool, process f
return
}
c.unlock(processing)
// Double check when exiting.
// Note: Poller's closeCallback call will try to get processing lock failed but here already neer to unlock processing.
// So here we need to check connection state again, to avoid connection leak
// double check close state
if c.status(closing) != 0 && c.lock(processing) {
// poller will get the processing lock failed, here help poller do closeCallback
// fd must already detach by poller
c.closeCallback(false, false)
panicked = false
return
}
// double check isProcessable
if isProcessable(c) && c.lock(processing) {
goto START
}
// task exits
panicked = false
return
}

runTask(c.ctx, task)
return true
}
Expand Down
4 changes: 4 additions & 0 deletions connection_reactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,10 @@ func (c *connection) onHup(p Poll) error {
}
c.triggerRead(Exception(ErrEOF, "peer close"))
c.triggerWrite(Exception(ErrConnClosed, "peer close"))

// call Disconnect callback first
c.onDisconnect()

// It depends on closing by user if OnConnect and OnRequest is nil, otherwise it needs to be released actively.
// It can be confirmed that the OnRequest goroutine has been exited before closeCallback executing,
// and it is safe to close the buffer at this time.
Expand Down
7 changes: 5 additions & 2 deletions connection_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -504,10 +504,10 @@ func TestParallelShortConnection(t *testing.T) {
var received int64
el, err := NewEventLoop(func(ctx context.Context, connection Connection) error {
data, err := connection.Reader().Next(connection.Reader().Len())
atomic.AddInt64(&received, int64(len(data)))
if err != nil {
return err
}
atomic.AddInt64(&received, int64(len(data)))
//t.Logf("conn[%s] received: %d, active: %v", connection.RemoteAddr(), len(data), connection.IsActive())
return nil
})
Expand Down Expand Up @@ -536,10 +536,13 @@ func TestParallelShortConnection(t *testing.T) {
}
wg.Wait()

for atomic.LoadInt64(&received) < int64(totalSize) {
count := 100
for count > 0 && atomic.LoadInt64(&received) < int64(totalSize) {
t.Logf("received: %d, except: %d", atomic.LoadInt64(&received), totalSize)
time.Sleep(time.Millisecond * 100)
count--
}
Equal(t, atomic.LoadInt64(&received), int64(totalSize))
}

func TestConnectionServerClose(t *testing.T) {
Expand Down
19 changes: 19 additions & 0 deletions eventloop.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,20 @@ type EventLoop interface {
Shutdown(ctx context.Context) error
}

/* The Connection Callback Sequence Diagram
| Connection State | Callback Function | Notes
| Connected but not initialized | OnPrepare | Conn is not registered into poller
| Connected and initialized | OnConnect | Conn is ready for read or write
| Read first byte | OnRequest | Conn is ready for read or write
| Peer closed but conn is active | OnDisconnect | Conn access will race with OnRequest function
| Self closed and conn is closed | CloseCallback | Conn is destroyed
Execution Order:
OnPrepare => OnConnect => OnRequest => CloseCallback
OnDisconnect
Note: only OnRequest and OnDisconnect will be executed in parallel
*/

// OnPrepare is used to inject custom preparation at connection initialization,
// which is optional but important in some scenarios. For example, a qps limiter
// can be set by closing overloaded connections directly in OnPrepare.
Expand Down Expand Up @@ -63,6 +77,11 @@ type OnPrepare func(connection Connection) context.Context
// }
type OnConnect func(ctx context.Context, connection Connection) context.Context

// OnDisconnect is called once connection is going to be closed.
// OnDisconnect must return as quick as possible because it will block poller.
// OnDisconnect is different from CloseCallback, you could check with "The Connection Callback Sequence Diagram" section.
type OnDisconnect func(ctx context.Context, connection Connection)

// OnRequest defines the function for handling connection. When data is sent from the connection peer,
// netpoll actively reads the data in LT mode and places it in the connection's input buffer.
// Generally, OnRequest starts handling the data in the following way:
Expand Down
Loading

0 comments on commit 8c8b872

Please sign in to comment.