forked from nange/easyss
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathrelay.go
155 lines (140 loc) · 3.91 KB
/
relay.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
package easyss
import (
"io"
"net"
"sync"
"time"
"github.com/nange/easypool"
"github.com/nange/easyss/cipherstream"
log "github.com/sirupsen/logrus"
)
// connStateBuf is a pool of 32-byte scratch slices. relay takes one slice per
// call, hands it to NewConnState and returns it to the pool when it is done.
var connStateBuf = sync.Pool{
	New: func() interface{} {
		return make([]byte, 32)
	},
}
// relay copies data in both directions between the cipher stream and the
// plaintext stream, waits for the two copies to finish, and then runs the
// ConnState step functions on the cipher stream.
//
// Results:
//   - n1 is the number of bytes copied from plaintxt to cipher.
//   - n2 is the number of bytes copied from cipher to plaintxt.
//   - needclose is true when the cipher-side connection should be closed by
//     the caller: a cipher read/write/encrypt/decrypt error occurred, the first
//     copy to finish ended in an unexpected way, or the state functions
//     reported an error.
func relay(cipher, plaintxt io.ReadWriteCloser) (n1 int64, n2 int64, needclose bool) {
	type res struct {
		N   int64
		Err error
	}
	// Capacity 1 lets each copy goroutine deliver its result and exit even
	// when the loop below stops after the first result.
	ch1 := make(chan res, 1)
	ch2 := make(chan res, 1)

	go func() {
		n, err := io.Copy(plaintxt, cipher)
		ch2 <- res{N: n, Err: err}
	}()
	go func() {
		n, err := io.Copy(cipher, plaintxt)
		ch1 <- res{N: n, Err: err}
	}()

	var state *ConnState
	// broken records a fatal cipher-stream failure locally.
	// markCipherStreamUnusable only takes effect when cipher is a CipherStream
	// on top of a pooled connection, so the pool mark alone cannot be used to
	// detect the failure for every kind of stream.
	broken := false

RELAY:
	for i := 0; i < 2; i++ {
		select {
		case res1 := <-ch1:
			// One direction is done: expire both connections so the copy in
			// the other direction does not block forever.
			setDeadline2Now(cipher, plaintxt)
			n1 = res1.N
			err := res1.Err
			if cipherstream.EncryptErr(err) || cipherstream.WriteCipherErr(err) {
				log.Warnf("io.Copy err:%+v, maybe underline connection have been closed", err)
				markCipherStreamUnusable(cipher)
				broken = true
				break RELAY
			}
			if i == 0 {
				// The plaintext side finished first: start from FIN_WAIT1.
				log.Infof("read plaintxt stream error, set start state. details:%v", err)
				buf := connStateBuf.Get().([]byte)
				// Runs at most once per call; state keeps using buf until
				// relay returns.
				defer connStateBuf.Put(buf)
				state = NewConnState(FIN_WAIT1, buf)
			} else if err != nil {
				// Second result: a timeout is the expected outcome of the
				// deadline set when the first copy finished.
				if !cipherstream.TimeoutErr(err) {
					log.Errorf("execpt error is net: io timeout. but get:%v", err)
				}
			}

		case res2 := <-ch2:
			setDeadline2Now(cipher, plaintxt)
			n2 = res2.N
			err := res2.Err
			if cipherstream.DecryptErr(err) || cipherstream.ReadCipherErr(err) {
				log.Warnf("io.Copy err:%+v, maybe underline connection have been closed", err)
				markCipherStreamUnusable(cipher)
				broken = true
				break RELAY
			}
			if i == 0 {
				if cipherstream.FINRSTStreamErr(err) {
					// The cipher side finished first: start from CLOSE_WAIT.
					log.Infof("read cipher stream ErrFINRSTStream, set start state")
					buf := connStateBuf.Get().([]byte)
					defer connStateBuf.Put(buf)
					state = NewConnState(CLOSE_WAIT, buf)
				} else {
					log.Errorf("execpt error is ErrFINRSTStream, but get:%v", err)
					markCipherStreamUnusable(cipher)
					broken = true
					break RELAY
				}
			}
		}
	}

	if broken || cipherStreamUnusable(cipher) {
		return n1, n2, true
	}

	setCipherDeadline(cipher)
	if state == nil {
		// Defensive: every path that leaves state unset also sets broken.
		log.Errorf("unexcepted state, some unexcepted error occor")
		return n1, n2, true
	}
	// Run the step functions until one of them yields a nil fn.
	for statefn := state.fn; statefn != nil; {
		statefn = statefn(cipher).fn
	}
	if state.err != nil {
		log.Warnf("state err:%+v, state:%v", state.err, state.state)
		markCipherStreamUnusable(cipher)
		return n1, n2, true
	}
	return n1, n2, false
}
// markCipherStreamUnusable marks the pooled connection underneath cipher as
// unusable. It reports whether the mark was applied; the result is false when
// cipher is not a *cipherstream.CipherStream or when that stream does not wrap
// an *easypool.PoolConn.
func markCipherStreamUnusable(cipher io.ReadWriteCloser) bool {
	stream, isStream := cipher.(*cipherstream.CipherStream)
	if !isStream {
		return false
	}
	pooled, isPooled := stream.ReadWriteCloser.(*easypool.PoolConn)
	if !isPooled {
		return false
	}
	log.Infof("mark cipher stream unusable")
	pooled.MarkUnusable()
	return true
}
// cipherStreamUnusable reports whether the pooled connection underneath cipher
// has been marked unusable. Streams that are not a *cipherstream.CipherStream
// wrapping an *easypool.PoolConn always yield false.
func cipherStreamUnusable(cipher io.ReadWriteCloser) bool {
	stream, isStream := cipher.(*cipherstream.CipherStream)
	if !isStream {
		return false
	}
	pooled, isPooled := stream.ReadWriteCloser.(*easypool.PoolConn)
	if !isPooled {
		return false
	}
	return pooled.IsUnusable()
}
// setDeadline2Now sets the read/write deadline of both relay endpoints to the
// current time, so that I/O blocked on them times out.
//
// plaintxt is touched only when it is a net.Conn. cipher is touched only when
// it is a *cipherstream.CipherStream whose underlying stream is a net.Conn.
// Any other value is left alone. The call is best-effort: a SetDeadline
// failure is logged as a warning and not returned.
func setDeadline2Now(cipher, plaintxt io.ReadWriteCloser) {
	if conn, ok := plaintxt.(net.Conn); ok {
		log.Infof("set plaintxt tcp connection deadline to now")
		if err := conn.SetDeadline(time.Now()); err != nil {
			log.Warnf("set plaintxt tcp connection deadline to now err:%v", err)
		}
	}
	if cs, ok := cipher.(*cipherstream.CipherStream); ok {
		if conn, ok := cs.ReadWriteCloser.(net.Conn); ok {
			log.Infof("set cipher tcp connection deadline to now")
			if err := conn.SetDeadline(time.Now()); err != nil {
				log.Warnf("set cipher tcp connection deadline to now err:%v", err)
			}
		}
	}
}
// setCipherDeadline moves the read/write deadline of the connection underneath
// cipher to 15 seconds from now.
//
// Nothing happens unless cipher is a *cipherstream.CipherStream whose
// underlying stream is a net.Conn. The call is best-effort: a SetDeadline
// failure is logged as a warning and not returned.
func setCipherDeadline(cipher io.ReadWriteCloser) {
	cs, ok := cipher.(*cipherstream.CipherStream)
	if !ok {
		return
	}
	conn, ok := cs.ReadWriteCloser.(net.Conn)
	if !ok {
		return
	}
	log.Infof("set cipher tcp connection deadline to 15 second later")
	if err := conn.SetDeadline(time.Now().Add(15 * time.Second)); err != nil {
		log.Warnf("set cipher tcp connection deadline to 15 second later err:%v", err)
	}
}