-
Notifications
You must be signed in to change notification settings - Fork 20
/
Copy pathsession.go
541 lines (471 loc) · 14.6 KB
/
session.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
package goetty
import (
"crypto/tls"
"crypto/x509"
"errors"
"fmt"
"io"
"net"
"os"
"sync/atomic"
"time"
"github.com/fagongzi/goetty/v3/buf"
"github.com/fagongzi/goetty/v3/codec"
"go.uber.org/zap"
)
var (
// ErrIllegalState illegal state error
ErrIllegalState = errors.New("illegal state")
// ErrDisableConnect disable to connect
ErrDisableConnect = errors.New("io session is disable to connect")
stateReadyToConnect int32 = 0
stateConnecting int32 = 1
stateConnected int32 = 2
stateClosed int32 = 3
)
// WriteOptions write options
type WriteOptions struct {
// Timeout deadline for write
Timeout time.Duration
// Flush flush data to net.Conn
Flush bool
}
// ReadOptions read options
type ReadOptions struct {
// Timeout deadline for read
Timeout time.Duration
}
// Option option to create IOSession
type Option[IN any, OUT any] func(*baseIO[IN, OUT])
// WithSessionLogger set logger for IOSession
func WithSessionLogger[IN any, OUT any](logger *zap.Logger) Option[IN, OUT] {
return func(bio *baseIO[IN, OUT]) {
bio.logger = logger
}
}
// WithSessionAllocator set mem allocator to build in and out ByteBuf
func WithSessionAllocator[IN any, OUT any](allocator buf.Allocator) Option[IN, OUT] {
return func(bio *baseIO[IN, OUT]) {
bio.options.allocator = allocator
}
}
// WithSessionCodec set codec for IOSession
func WithSessionCodec[IN any, OUT any](codec codec.Codec[IN, OUT]) Option[IN, OUT] {
return func(bio *baseIO[IN, OUT]) {
bio.options.codec = codec
}
}
// WithSessionRWBufferSize set read/write buf size for IOSession
func WithSessionRWBufferSize[IN any, OUT any](read, write int) Option[IN, OUT] {
return func(bio *baseIO[IN, OUT]) {
bio.options.readBufSize = read
bio.options.writeBufSize = write
}
}
// WithSessionConn set IOSession's net.Conn
func WithSessionConn[IN any, OUT any](id uint64, conn net.Conn) Option[IN, OUT] {
return func(bio *baseIO[IN, OUT]) {
bio.conn = conn
bio.id = id
}
}
// WithSessionAware set IOSession's session aware
func WithSessionAware[IN any, OUT any](value IOSessionAware[IN, OUT]) Option[IN, OUT] {
return func(bio *baseIO[IN, OUT]) {
bio.options.aware = value
}
}
// WithSessionReleaseMsgFunc set a func to release message once the message encode into the write buf
func WithSessionReleaseMsgFunc[IN any, OUT any](value func(any)) Option[IN, OUT] {
return func(bio *baseIO[IN, OUT]) {
bio.options.releaseMsgFunc = value
}
}
// WithSessionTLS set tls for client
func WithSessionTLS[IN any, OUT any](tlsConfig *tls.Config) Option[IN, OUT] {
return func(bio *baseIO[IN, OUT]) {
bio.options.dial = func(network, address string, timeout time.Duration) (net.Conn, error) {
return tls.DialWithDialer(&net.Dialer{Timeout: timeout}, network, address, tlsConfig)
}
}
}
// WithSessionDisableCompactAfterGrow set Set whether the buffer should be compressed,
// if it is, it will reset the reader and writer index. Default is true.
func WithSessionDisableCompactAfterGrow[IN any, OUT any]() Option[IN, OUT] {
return func(bio *baseIO[IN, OUT]) {
bio.options.disableCompactAfterGrow = true
}
}
// WithSessionTLSFromCertAndKeys set tls for client
func WithSessionTLSFromCertAndKeys[IN any, OUT any](certFile, keyFile, caFile string, insecureSkipVerify bool) Option[IN, OUT] {
return func(bio *baseIO[IN, OUT]) {
bio.options.dial = func(network, address string, timeout time.Duration) (net.Conn, error) {
cert, err := tls.LoadX509KeyPair(certFile, keyFile)
if err != nil {
return nil, err
}
data, err := os.ReadFile(caFile)
if err != nil {
return nil, err
}
certPool := x509.NewCertPool()
if !certPool.AppendCertsFromPEM(data) {
return nil, fmt.Errorf("append %s to root CAs failed", caFile)
}
conf := &tls.Config{
RootCAs: certPool,
Certificates: []tls.Certificate{cert},
InsecureSkipVerify: insecureSkipVerify,
}
return tls.DialWithDialer(&net.Dialer{Timeout: timeout}, network, address, conf)
}
}
}
// WithSessionDisableAutoResetInBuffer set disable auto reset in buffer. If disabled, the
// application must reset in buffer in the read loop, otherwise there will be a memory leak.
func WithSessionDisableAutoResetInBuffer[IN any, OUT any]() Option[IN, OUT] {
return func(bio *baseIO[IN, OUT]) {
bio.options.disableAutoResetInBuffer = true
}
}
// IOSession internally holds a raw net.Conn on which to provide read and write operations
type IOSession[IN any, OUT any] interface {
// ID session id
ID() uint64
// Connect connect to address, only used at client-side
Connect(addr string, timeout time.Duration) error
// Connected returns true if connection is ok
Connected() bool
// Disconnect disconnect the connection
Disconnect() error
// Close close the session, the read and write buffer will closed, and cannot Connect
// again. IOSession reference count minus 1.
Close() error
// Ref for IOSessions, held by several goroutines, several references are needed. Each
// concurrent process holding an IOSession can Close the IOSession and release the resource
// when the reference count reaches 0.
Ref()
// Read read packet from connection
Read(option ReadOptions) (IN, error)
// Write encodes the msg into a []byte into the buffer according to the codec.Encode.
// If flush is set to false, the data will not be written to the underlying socket.
Write(msg OUT, options WriteOptions) error
// Flush flush the out buffer
Flush(timeout time.Duration) error
// RemoteAddress returns remote address, include ip and port
RemoteAddress() string
// RawConn return raw tcp conn, RawConn should only be used to access the underlying
// attributes of the tcp conn, e.g. set keepalive attributes. Read from RawConn directly
// may lose data since the bytes might have been copied to the InBuf.
// To perform read/write operation on the underlying tcp conn, use BufferedConn instead.
RawConn() net.Conn
// UseConn use the specified conn to handle reads and writes. Note that conn reads and
// writes cannot be handled in other goroutines until UseConn is called.
UseConn(net.Conn)
// OutBuf returns byte buffer which used to encode message into bytes
OutBuf() *buf.ByteBuf
// InBuf returns input buffer which used to decode bytes to message
InBuf() *buf.ByteBuf
}
// BufferedIOSession is a IOSession that can read from the in-buffer first
type BufferedIOSession interface {
// BufferedConn returns a wrapped net.Conn that read from IOSession's in-buffer first
BufferedConn() net.Conn
}
type baseIO[IN any, OUT any] struct {
id uint64
state int32
conn net.Conn
localAddr, remoteAddr string
in *buf.ByteBuf
out *buf.ByteBuf
disableConnect bool
logger *zap.Logger
readCopyBuf []byte
writeCopyBuf []byte
options struct {
aware IOSessionAware[IN, OUT]
codec codec.Codec[IN, OUT]
readBufSize, writeBufSize int
readCopyBufSize, writeCopyBufSize int
releaseMsgFunc func(any)
allocator buf.Allocator
dial func(network, address string, timeout time.Duration) (net.Conn, error)
disableAutoResetInBuffer bool
disableCompactAfterGrow bool
}
atomic struct {
ref int32
}
}
// NewIOSession create a new io session
func NewIOSession[IN any, OUT any](opts ...Option[IN, OUT]) IOSession[IN, OUT] {
bio := &baseIO[IN, OUT]{}
for _, opt := range opts {
opt(bio)
}
bio.adjust()
bio.Ref()
bio.readCopyBuf = make([]byte, bio.options.readCopyBufSize)
bio.writeCopyBuf = make([]byte, bio.options.writeCopyBufSize)
if bio.conn != nil {
bio.initConn()
bio.disableConnect = true
}
if bio.options.aware != nil {
bio.options.aware.Created(bio)
}
return bio
}
func (bio *baseIO[IN, OUT]) adjust() {
bio.logger = adjustLogger(bio.logger).With(zap.Uint64("session-id", bio.id))
if bio.options.readBufSize == 0 {
bio.options.readBufSize = defaultReadBuf
}
if bio.options.readCopyBufSize == 0 {
bio.options.readCopyBufSize = defaultReadCopyBuf
}
if bio.options.writeBufSize == 0 {
bio.options.writeBufSize = defaultWriteBuf
}
if bio.options.writeCopyBufSize == 0 {
bio.options.writeCopyBufSize = defaultWriteCopyBuf
}
if bio.options.releaseMsgFunc == nil {
bio.options.releaseMsgFunc = func(any) {}
}
if bio.options.dial == nil {
bio.options.dial = net.DialTimeout
}
}
func (bio *baseIO[IN, OUT]) ID() uint64 {
return bio.id
}
func (bio *baseIO[IN, OUT]) Connect(addressWithNetwork string, timeout time.Duration) error {
network, address, err := parseAddress(addressWithNetwork)
if err != nil {
return err
}
if bio.disableConnect {
return ErrDisableConnect
}
old := bio.getState()
switch old {
case stateReadyToConnect:
break
case stateClosed:
return fmt.Errorf("the session is closed")
case stateConnecting:
return fmt.Errorf("the session is connecting in other goroutine")
case stateConnected:
return nil
}
if !atomic.CompareAndSwapInt32(&bio.state, stateReadyToConnect, stateConnecting) {
current := bio.getState()
if current == stateConnected {
return nil
}
return fmt.Errorf("the session is closing or connecting is other goroutine")
}
conn, err := bio.options.dial(network, address, timeout)
if nil != err {
atomic.StoreInt32(&bio.state, stateReadyToConnect)
return err
}
bio.conn = conn
bio.initConn()
return nil
}
func (bio *baseIO[IN, OUT]) Connected() bool {
return bio.getState() == stateConnected
}
func (bio *baseIO[IN, OUT]) Disconnect() error {
old := bio.getState()
switch old {
case stateReadyToConnect, stateClosed:
return nil
case stateConnecting:
return fmt.Errorf("the session is connecting in other goroutine")
case stateConnected:
break
}
if !atomic.CompareAndSwapInt32(&bio.state, stateConnected, stateReadyToConnect) {
current := bio.getState()
if current == stateReadyToConnect {
return nil
}
return fmt.Errorf("the session is closing or connecting is other goroutine")
}
bio.closeConn()
atomic.StoreInt32(&bio.state, stateReadyToConnect)
return nil
}
func (bio *baseIO[IN, OUT]) Ref() {
atomic.AddInt32(&bio.atomic.ref, 1)
}
func (bio *baseIO[IN, OUT]) unRef() int32 {
return atomic.AddInt32(&bio.atomic.ref, -1)
}
func (bio *baseIO[IN, OUT]) RawConn() net.Conn {
return bio.conn
}
func (bio *baseIO[IN, OUT]) BufferedConn() net.Conn {
return newBufferedConn[IN, OUT](bio.conn, bio)
}
func (bio *baseIO[IN, OUT]) UseConn(conn net.Conn) {
bio.conn = conn
}
func (bio *baseIO[IN, OUT]) Close() error {
bio.closeConn()
ref := bio.unRef()
if ref < 0 {
panic("invalid ref count")
}
if ref > 0 {
return nil
}
OUTER:
for {
old := bio.getState()
switch old {
case stateReadyToConnect, stateClosed:
break OUTER
case stateConnecting:
return fmt.Errorf("the session is connecting in other goroutine")
case stateConnected:
}
if atomic.CompareAndSwapInt32(&bio.state, stateConnected, stateClosed) {
break
}
}
if bio.out != nil {
bio.out.Close()
}
if bio.in != nil {
bio.in.Close()
}
atomic.StoreInt32(&bio.state, stateClosed)
if bio.options.aware != nil {
bio.options.aware.Closed(bio)
}
bio.logger.Debug("IOSession closed")
return nil
}
func (bio *baseIO[IN, OUT]) Read(options ReadOptions) (IN, error) {
var msg IN
for {
if !bio.Connected() {
return msg, ErrIllegalState
}
var err error
var complete bool
for {
if bio.in.Readable() > 0 {
msg, complete, err = bio.options.codec.Decode(bio.in)
if !complete && err == nil {
msg, complete, err = bio.readFromConn(options.Timeout)
}
} else {
if !bio.options.disableAutoResetInBuffer {
bio.in.Reset()
}
msg, complete, err = bio.readFromConn(options.Timeout)
}
if nil != err {
bio.in.Reset()
return msg, err
}
if complete {
if !bio.options.disableAutoResetInBuffer && bio.in.Readable() == 0 {
bio.in.Reset()
}
return msg, nil
}
}
}
}
func (bio *baseIO[IN, OUT]) Write(
msg OUT,
options WriteOptions) error {
if !bio.Connected() {
return ErrIllegalState
}
err := bio.options.codec.Encode(msg, bio.out, bio.conn)
bio.options.releaseMsgFunc(msg)
if err != nil {
return err
}
if options.Flush && bio.out.Readable() > 0 {
err = bio.Flush(options.Timeout)
if err != nil {
return err
}
}
return nil
}
func (bio *baseIO[IN, OUT]) Flush(timeout time.Duration) error {
defer bio.out.Reset()
if !bio.Connected() {
return ErrIllegalState
}
if timeout != 0 {
bio.conn.SetWriteDeadline(time.Now().Add(timeout))
} else {
bio.conn.SetWriteDeadline(time.Time{})
}
_, err := io.CopyBuffer(bio.conn, bio.out, bio.writeCopyBuf)
if err == nil || err == io.EOF {
return nil
}
return err
}
func (bio *baseIO[IN, OUT]) RemoteAddress() string {
return bio.remoteAddr
}
func (bio *baseIO[IN, OUT]) OutBuf() *buf.ByteBuf {
return bio.out
}
func (bio *baseIO[IN, OUT]) InBuf() *buf.ByteBuf {
return bio.in
}
func (bio *baseIO[IN, OUT]) readFromConn(timeout time.Duration) (IN, bool, error) {
var v IN
if timeout != 0 {
bio.conn.SetReadDeadline(time.Now().Add(timeout))
} else {
bio.conn.SetReadDeadline(time.Time{})
}
n, err := io.CopyBuffer(bio.in, bio.conn, bio.readCopyBuf)
if err != nil {
return v, false, err
}
if n == 0 {
return v, false, io.EOF
}
return bio.options.codec.Decode(bio.in)
}
func (bio *baseIO[IN, OUT]) closeConn() {
if bio.conn != nil {
if err := bio.conn.Close(); err != nil {
bio.logger.Error("close connection failed",
zap.Error(err))
return
}
bio.logger.Debug("connection disconnected")
}
}
func (bio *baseIO[IN, OUT]) getState() int32 {
return atomic.LoadInt32(&bio.state)
}
func (bio *baseIO[IN, OUT]) initConn() {
bio.remoteAddr = bio.conn.RemoteAddr().String()
bio.localAddr = bio.conn.LocalAddr().String()
bio.in = buf.NewByteBuf(bio.options.readBufSize,
buf.WithDisableCompactAfterGrow(bio.options.disableCompactAfterGrow),
buf.WithMemAllocator(bio.options.allocator))
bio.out = buf.NewByteBuf(bio.options.writeBufSize,
buf.WithDisableCompactAfterGrow(bio.options.disableCompactAfterGrow),
buf.WithMemAllocator(bio.options.allocator))
atomic.StoreInt32(&bio.state, stateConnected)
bio.logger.Debug("session init completed")
}