-
Notifications
You must be signed in to change notification settings - Fork 46
/
pipeline.go
107 lines (94 loc) · 2.77 KB
/
pipeline.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
package protocol
import (
"context"
"net"
"github.com/fakeyanss/jt808-server-go/internal/protocol/model"
)
// tcp/udp 消息处理组
type Pipeline struct {
fh *JT808FrameHandler // FrameHandler instance
pc *JT808PacketCodec // PacketCodec instance
mp *JT808MsgProcessor // MsgHandler instance
}
func NewPipeline(conn net.Conn) *Pipeline {
return &Pipeline{
fh: NewJT808FrameHandler(conn),
pc: NewJT808PacketCodec(),
mp: NewJT808MsgProcessor(),
}
}
// 处理函数封装
type delegateFunc func(context.Context, *Pipeline) (context.Context, error)
func (p *Pipeline) ProcessConnRead(ctx context.Context) error {
actions := []delegateFunc{
recv(),
decode(),
process(),
encode(),
send(),
}
return p.callWithBlocking(ctx, actions)
}
func (p *Pipeline) ProcessConnWrite(ctx context.Context) error {
actions := []delegateFunc{
encode(),
send(),
}
return p.callWithBlocking(ctx, actions)
}
func (p *Pipeline) callWithBlocking(ctx context.Context, funcs []delegateFunc) error {
// todo: 重构err定义,通过errors.Cause, 区分breakErr, continueErr
curCtx := ctx
var err error
for _, f := range funcs {
curCtx, err = f(curCtx, p)
if curCtx == nil || err != nil {
break
}
}
return err
}
func recv() delegateFunc {
return delegateFunc(func(ctx context.Context, p *Pipeline) (context.Context, error) {
framePayload, err := p.fh.Recv(ctx)
nxtCtx := context.WithValue(ctx, model.FrameCtxKey{}, framePayload)
return nxtCtx, err
})
}
func decode() delegateFunc {
return delegateFunc(func(ctx context.Context, p *Pipeline) (context.Context, error) {
framePayload := ctx.Value(model.FrameCtxKey{}).(FramePayload)
packet, err := p.pc.Decode(framePayload)
nxtCtx := context.WithValue(ctx, model.PacketDecodeCtxKey{}, packet)
return nxtCtx, err
})
}
func process() delegateFunc {
return delegateFunc(func(ctx context.Context, p *Pipeline) (context.Context, error) {
packet := ctx.Value(model.PacketDecodeCtxKey{}).(*model.PacketData)
if packet == nil { // 不需要处理
return nil, nil
}
pd, err := p.mp.Process(ctx, packet)
nxtCtx := context.WithValue(ctx, model.ProcessDataCtxKey{}, pd)
return nxtCtx, err
})
}
func encode() delegateFunc {
return delegateFunc(func(ctx context.Context, p *Pipeline) (context.Context, error) {
pd := ctx.Value(model.ProcessDataCtxKey{}).(*model.ProcessData)
if pd == nil || pd.Outgoing == nil { // 不需要回复,不用后续处理
return nil, nil
}
pkt, err := p.pc.Encode(pd.Outgoing)
nxtCtx := context.WithValue(ctx, model.PacketEncodeCtxKey{}, pkt)
return nxtCtx, err
})
}
func send() delegateFunc {
return delegateFunc(func(ctx context.Context, p *Pipeline) (context.Context, error) {
packet := ctx.Value(model.PacketEncodeCtxKey{}).([]byte)
err := p.fh.Send(packet)
return ctx, err
})
}