From 5d04567ec9b1a3a2e05405385deab5e6f25bf1dc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=BA=90=E6=96=87=E9=9B=A8?= <41315874+fumiama@users.noreply.github.com> Date: Tue, 16 Jul 2024 22:16:01 +0900 Subject: [PATCH] feat(tcp): add config option dialtimeout --- gold/link/nat.go | 8 ++++++++ gold/p2p/tcp/init.go | 2 ++ gold/p2p/tcp/tcp.go | 24 +++++++++++++++++++----- 3 files changed, 29 insertions(+), 5 deletions(-) diff --git a/gold/link/nat.go b/gold/link/nat.go index 5e4942a..a144722 100644 --- a/gold/link/nat.go +++ b/gold/link/nat.go @@ -19,6 +19,9 @@ func (l *Link) keepAlive(dur int64) { logrus.Infoln("[nat] start to keep alive") t := time.NewTicker(time.Second * time.Duration(dur)) for range t.C { + if l.status == LINK_STATUS_DOWN || l.me.loop == nil { + return + } n, err := l.WriteAndPut(head.NewPacket(head.ProtoHello, l.me.srcport, l.peerip, l.me.dstport, nil), false) if err == nil { logrus.Infoln("[nat] send", n, "bytes keep alive packet") @@ -78,6 +81,11 @@ func (l *Link) onQuery(packet []byte) { return } + if l == nil || l.me == nil { + logrus.Errorln("[nat] nil link/me") + return + } + // 2. notify分发 // ---- 封装 Notify 到 新的 packet // ---- 调用 l.Send 发送到对方 diff --git a/gold/p2p/tcp/init.go b/gold/p2p/tcp/init.go index cb240cc..e1873e8 100644 --- a/gold/p2p/tcp/init.go +++ b/gold/p2p/tcp/init.go @@ -9,6 +9,7 @@ import ( ) type Config struct { + DialTimeout time.Duration PeersTimeout time.Duration ReceiveChannelSize int } @@ -28,6 +29,7 @@ func newEndpoint(endpoint string, configs ...any) *EndPoint { addr: net.TCPAddrFromAddrPort( netip.MustParseAddrPort(endpoint), ), + dialtimeout: cfg.DialTimeout, peerstimeout: cfg.PeersTimeout, recvchansize: cfg.ReceiveChannelSize, } diff --git a/gold/p2p/tcp/tcp.go b/gold/p2p/tcp/tcp.go index 3604923..61118e4 100644 --- a/gold/p2p/tcp/tcp.go +++ b/gold/p2p/tcp/tcp.go @@ -5,6 +5,7 @@ import ( "io" "math/rand" "net" + "reflect" "strconv" "time" @@ -16,6 +17,7 @@ import ( type EndPoint struct { addr *net.TCPAddr + dialtimeout time.Duration peerstimeout time.Duration recvchansize int } @@ -45,9 +47,9 @@ func (ep *EndPoint) Listen() (p2p.Conn, error) { return nil, err } ep.addr = lstn.Addr().(*net.TCPAddr) - timeout := ep.peerstimeout - if timeout < time.Second { - timeout = time.Second + peerstimeout := ep.peerstimeout + if peerstimeout < time.Second { + peerstimeout = time.Second } chansz := ep.recvchansize if chansz < 32 { @@ -56,7 +58,7 @@ func (ep *EndPoint) Listen() (p2p.Conn, error) { conn := &Conn{ addr: ep, lstn: lstn, - peers: ttl.NewCacheOn(timeout, [4]func(string, *net.TCPConn){ + peers: ttl.NewCacheOn(peerstimeout, [4]func(string, *net.TCPConn){ nil, nil, func(s string, t *net.TCPConn) { @@ -122,6 +124,7 @@ func (conn *Conn) accept() { continue } ep := newEndpoint(tcpconn.RemoteAddr().String(), &Config{ + DialTimeout: conn.addr.dialtimeout, PeersTimeout: conn.addr.peerstimeout, ReceiveChannelSize: conn.addr.recvchansize, }) @@ -203,11 +206,22 @@ func (conn *Conn) WriteToPeer(b []byte, ep p2p.EndPoint) (n int, err error) { } tcpconn := conn.peers.Get(tcpep.String()) if tcpconn == nil { + dialtimeout := tcpep.dialtimeout + if dialtimeout < time.Second { + dialtimeout = time.Second + } + logrus.Infoln("[tcp] dial to", tcpep.addr, "timeout", dialtimeout) + var cn net.Conn // must use another port to send because there's no exsiting conn - tcpconn, err = net.DialTCP(tcpep.Network(), nil, tcpep.addr) + cn, err = net.DialTimeout(tcpep.Network(), tcpep.addr.String(), dialtimeout) if err != nil { return } + tcpconn, ok = cn.(*net.TCPConn) + if !ok { + return 0, errors.New("expect *net.TCPConn but got " + reflect.ValueOf(cn).Type().String()) + } + logrus.Infoln("[tcp] dial to", tcpep.addr, "success, local:", tcpconn.LocalAddr()) conn.peers.Set(tcpep.String(), tcpconn) } cnt, err := io.Copy(tcpconn, &packet{