Skip to content

Commit

Permalink
feat(p2p): support tcp protocol
Browse files Browse the repository at this point in the history
  • Loading branch information
fumiama committed Jul 16, 2024
1 parent 17e1f6c commit 739cf86
Show file tree
Hide file tree
Showing 19 changed files with 393 additions and 26 deletions.
2 changes: 1 addition & 1 deletion config/cfg.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ type Config struct {
IP string `yaml:"IP"`
SubNet string `yaml:"SubNet"`
PrivateKey string `yaml:"PrivateKey"`
Network string `yaml:"Network"` // Network udp or ws (WIP)
Network string `yaml:"Network"` // Network udp, tcp or ws (WIP)
EndPoint string `yaml:"EndPoint"`
MTU int64 `yaml:"MTU"`
SpeedLoop uint16 `yaml:"SpeedLoop"`
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ module github.com/fumiama/WireGold
go 1.20

require (
github.com/FloatTech/ttl v0.0.0-20230307105452-d6f7b2b647d1
github.com/FloatTech/ttl v0.0.0-20240715074357-190755f3fece
github.com/RomiChan/syncx v0.0.0-20240418144900-b7402ffdebc7
github.com/fumiama/blake2b-simd v0.0.0-20220412110131-4481822068bb
github.com/fumiama/go-base16384 v1.7.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
github.com/FloatTech/ttl v0.0.0-20230307105452-d6f7b2b647d1 h1:g4pTnDJUW4VbJ9NvoRfUvdjDrHz/6QhfN/LoIIpICbo=
github.com/FloatTech/ttl v0.0.0-20230307105452-d6f7b2b647d1/go.mod h1:fHZFWGquNXuHttu9dUYoKuNbm3dzLETnIOnm1muSfDs=
github.com/FloatTech/ttl v0.0.0-20240715074357-190755f3fece h1:RIrGO+hIOoXxUh0T3TDaWNvinkXH9S2i12cWivT2MZ4=
github.com/FloatTech/ttl v0.0.0-20240715074357-190755f3fece/go.mod h1:fHZFWGquNXuHttu9dUYoKuNbm3dzLETnIOnm1muSfDs=
github.com/RomiChan/syncx v0.0.0-20240418144900-b7402ffdebc7 h1:S/ferNiehVjNaBMNNBxUjLtVmP/YWD6Yh79RfPv4ehU=
github.com/RomiChan/syncx v0.0.0-20240418144900-b7402ffdebc7/go.mod h1:vD7Ra3Q9onRtojoY5sMCLQ7JBgjUsrXDnDKyFxqpf9w=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
Expand Down
9 changes: 7 additions & 2 deletions gold/head/packet.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,11 @@ import (
"github.com/sirupsen/logrus"
)

var (
ErrBadCRCChecksum = errors.New("bad crc checksum")
ErrDataLenLT60 = errors.New("data len < 60")
)

type PacketFlags uint16

func (pf PacketFlags) IsValid() bool {
Expand Down Expand Up @@ -97,12 +102,12 @@ func NewPacket(proto uint8, srcPort uint16, dst net.IP, dstPort uint16, data []b
// Unmarshal 将 data 的数据解码到自身
func (p *Packet) Unmarshal(data []byte) (complete bool, err error) {
if len(data) < 60 {
err = errors.New("data len < 60")
err = ErrDataLenLT60
return
}
p.crc64 = binary.LittleEndian.Uint64(data[52:60])
if crc64.Checksum(data[:52], crc64.MakeTable(crc64.ISO)) != p.crc64 {
err = errors.New("bad crc checksum")
err = ErrBadCRCChecksum
return
}

Expand Down
6 changes: 5 additions & 1 deletion gold/link/link.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,10 @@ import (
base14 "github.com/fumiama/go-base16384"
)

var (
ErrPerrNotExist = errors.New("peer not exist")
)

// Link 是本机到 peer 的连接抽象
type Link struct {
// peer 的公钥
Expand Down Expand Up @@ -56,7 +60,7 @@ func (m *Me) Connect(peer string) (*Link, error) {
if ok {
return p, nil
}
return nil, errors.New("peer not exist")
return nil, ErrPerrNotExist
}

// Close 关闭到 peer 的连接
Expand Down
2 changes: 1 addition & 1 deletion gold/link/listen.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ func (m *Me) listen() (conn p2p.Conn, err error) {
return
}
if err != nil {
logrus.Warnln("[listen] read from udp err, reconnect:", err)
logrus.Warnln("[listen] read from conn err, reconnect:", err)
conn, err = m.ep.Listen()
if err != nil {
logrus.Errorln("[listen] reconnect udp err:", err)
Expand Down
6 changes: 5 additions & 1 deletion gold/link/me.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,12 +51,15 @@ type Me struct {
srcport, dstport, mtu, speedloop uint16
// 报头掩码
mask uint64
// 本机网络端点初始化配置
networkconfigs []any
}

type MyConfig struct {
MyIPwithMask string
MyEndpoint string
Network string
NetworkConfigs []any
PrivateKey *[32]byte
NIC lower.NICIO
SrcPort, DstPort, MTU, SpeedLoop uint16
Expand All @@ -71,7 +74,8 @@ func NewMe(cfg *MyConfig) (m Me) {
if nw == "" {
nw = "udp"
}
m.ep, err = p2p.NewEndPoint(nw, cfg.MyEndpoint)
m.networkconfigs = cfg.NetworkConfigs
m.ep, err = p2p.NewEndPoint(nw, cfg.MyEndpoint, m.networkconfigs...)
if err != nil {
panic(err)
}
Expand Down
7 changes: 6 additions & 1 deletion gold/link/nat.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,12 @@ func (l *Link) onNotify(packet []byte) {
// ---- 遍历 Notify,注册对方的 endpoint 到
// ---- connections,注意使用读写锁connmapmu
for peer, ep := range notify {
addr, err := p2p.NewEndPoint(ep[0], ep[1])
nw, epstr := ep[0], ep[1]
if nw != l.me.ep.Network() {
logrus.Warnln("[nat] ignore different network notify", nw, "addr", epstr)
continue
}
addr, err := p2p.NewEndPoint(nw, epstr, l.me.networkconfigs...)
if err == nil {
p, ok := l.me.IsInPeer(peer)
if ok {
Expand Down
2 changes: 1 addition & 1 deletion gold/link/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ func (m *Me) AddPeer(cfg *PeerConfig) (l *Link) {
}
}
if cfg.EndPoint != "" {
e, err := p2p.NewEndPoint(m.ep.Network(), cfg.EndPoint)
e, err := p2p.NewEndPoint(m.ep.Network(), cfg.EndPoint, m.networkconfigs...)
if err != nil {
panic(err)
}
Expand Down
11 changes: 8 additions & 3 deletions gold/link/send.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,11 @@ import (
"github.com/sirupsen/logrus"
)

var (
ErrDropBigDontFragTransPkt = errors.New("drop big don't fragmnet trans packet")
ErrTTL = errors.New("ttl exceeded")
)

// WriteAndPut 向 peer 发包并将包放回缓存池
func (l *Link) WriteAndPut(p *head.Packet, istransfer bool) (n int, err error) {
defer p.Put()
Expand All @@ -37,7 +42,7 @@ func (l *Link) WriteAndPut(p *head.Packet, istransfer bool) (n int, err error) {
return l.write(p, teatype, sndcnt, uint32(remlen), 0, istransfer, false)
}
if istransfer && p.Flags.DontFrag() && remlen > delta {
return 0, errors.New("drop don't fragmnet big trans packet")
return 0, ErrDropBigDontFragTransPkt
}
ttl := p.TTL
totl := uint32(remlen)
Expand Down Expand Up @@ -93,11 +98,11 @@ func (l *Link) write(p *head.Packet, teatype uint8, additional uint16, datasz ui
d, cl = p.Marshal(l.me.me, teatype, additional, datasz, offset, false, hasmore)
}
if d == nil {
return 0, errors.New("[send] ttl exceeded")
return 0, ErrTTL
}
peerep := l.endpoint
if peerep == nil {
return 0, errors.New("[send] nil endpoint of " + p.Dst.String())
return 0, errors.New("nil endpoint of " + p.Dst.String())
}
bound := 64
endl := "..."
Expand Down
4 changes: 4 additions & 0 deletions gold/p2p/define.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,10 @@ import (
"github.com/RomiChan/syncx"
)

var (
ErrEndpointTypeMistatch = errors.New("endpoint type mismatch")
)

type Initializer func(endpoint string, configs ...any) EndPoint

var factory syncx.Map[string, Initializer]
Expand Down
41 changes: 41 additions & 0 deletions gold/p2p/tcp/init.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package tcp

import (
"net"
"net/netip"
"time"

"github.com/fumiama/WireGold/gold/p2p"
)

type Config struct {
PeersTimeout time.Duration
ReceiveChannelSize int
}

func NewEndpoint(endpoint string, configs ...any) p2p.EndPoint {
return newEndpoint(endpoint, configs...)
}

func newEndpoint(endpoint string, configs ...any) *EndPoint {
var cfg *Config
if len(configs) == 0 || configs[0] == nil {
cfg = &Config{}
} else {
cfg = configs[0].(*Config)
}
return &EndPoint{
addr: net.TCPAddrFromAddrPort(
netip.MustParseAddrPort(endpoint),
),
peerstimeout: cfg.PeersTimeout,
recvchansize: cfg.ReceiveChannelSize,
}
}

func init() {
_, hasexist := p2p.Register("tcp", NewEndpoint)
if hasexist {
panic("network tcp has been registered")
}
}
65 changes: 65 additions & 0 deletions gold/p2p/tcp/pdu.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
package tcp

import (
"encoding/binary"
"io"
"net"

"github.com/fumiama/WireGold/helper"
)

type packetType uint8

const (
packetTypeKeepAlive packetType = iota
packetTypeNormal
)

type packet struct {
typ packetType
len uint16
dat []byte
io.ReaderFrom
io.WriterTo
}

func (p *packet) pack() (net.Buffers, func()) {
d, cl := helper.OpenWriterF(func(w *helper.Writer) {
w.WriteByte(byte(p.typ))
w.WriteUInt16(p.len)
})
return net.Buffers{d, p.dat}, cl
}

func (p *packet) Read(_ []byte) (int, error) {
panic("stub")
}

func (p *packet) Write(_ []byte) (int, error) {
panic("stub")
}

func (p *packet) ReadFrom(r io.Reader) (n int64, err error) {
var buf [3]byte
cnt, err := io.ReadFull(r, buf[:])
n = int64(cnt)
if err != nil {
return
}
p.typ = packetType(buf[0])
p.len = binary.LittleEndian.Uint16(buf[1:3])
w := helper.SelectWriter()
copied, err := io.CopyN(w, r, int64(p.len))
n += copied
if err != nil {
return
}
p.dat = w.Bytes()
return
}

func (p *packet) WriteTo(w io.Writer) (n int64, err error) {
buf, cl := p.pack()
defer cl()
return io.Copy(w, &buf)
}
Loading

0 comments on commit 739cf86

Please sign in to comment.