diff --git a/example/protocol/go.mod b/example/protocol/go.mod index 430e45c..8039755 100644 --- a/example/protocol/go.mod +++ b/example/protocol/go.mod @@ -3,7 +3,7 @@ module protocol/terminal go 1.23.2 require ( - github.com/cuteLittleDevil/go-jt808/protocol v1.1.0 + github.com/cuteLittleDevil/go-jt808/protocol v1.2.0 github.com/cuteLittleDevil/go-jt808/service v0.1.3 github.com/cuteLittleDevil/go-jt808/shared v1.1.0 github.com/cuteLittleDevil/go-jt808/terminal v0.1.3 diff --git a/example/protocol/go.sum b/example/protocol/go.sum index 6ca15e4..d1d904b 100644 --- a/example/protocol/go.sum +++ b/example/protocol/go.sum @@ -1,5 +1,5 @@ -github.com/cuteLittleDevil/go-jt808/protocol v1.1.0 h1:DA0EV/D++eHKnic5LFwvxzU6/ZlYDup7q1jI3nM0DBQ= -github.com/cuteLittleDevil/go-jt808/protocol v1.1.0/go.mod h1:LEgUyC8x29ITq6H7ycpOpgaHjjdquzYyzUXgbN5kYkI= +github.com/cuteLittleDevil/go-jt808/protocol v1.2.0 h1:OdzyjWnLs7TPeMHu0NvtiIDe16OK58c/bUJCgRaWNhM= +github.com/cuteLittleDevil/go-jt808/protocol v1.2.0/go.mod h1:LEgUyC8x29ITq6H7ycpOpgaHjjdquzYyzUXgbN5kYkI= github.com/cuteLittleDevil/go-jt808/service v0.1.3 h1:cpXmYdyjCUts769fIOVS/V/HtxHvBgYMjtUeDFZMbyE= github.com/cuteLittleDevil/go-jt808/service v0.1.3/go.mod h1:XOum4zCLCnMScfLUI5fr0acYsv3539gm3O0S5QUDttk= github.com/cuteLittleDevil/go-jt808/shared v1.1.0 h1:1YkXZnBmu2i7MzKZEiTh0iB6QczbZx1v+6jd4kP0CQk= diff --git a/protocol/model/t_terminal_params.go b/protocol/model/t_terminal_params.go index 7aa781e..e2cef3b 100644 --- a/protocol/model/t_terminal_params.go +++ b/protocol/model/t_terminal_params.go @@ -960,7 +960,7 @@ func (t *TerminalParamDetails) String() string { if len(t.OtherContent) > 0 { ids := make([]int, 0, len(t.OtherContent)) - for id, _ := range t.OtherContent { + for id := range t.OtherContent { ids = append(ids, int(id)) } sort.Ints(ids) diff --git a/service/active_message.go b/service/active_message.go new file mode 100644 index 0000000..94139fd --- /dev/null +++ b/service/active_message.go @@ -0,0 +1,57 @@ +package service + +import ( + "fmt" + "github.com/cuteLittleDevil/go-jt808/protocol/jt808" + "github.com/cuteLittleDevil/go-jt808/shared/consts" + "strings" + "sync" + "time" +) + +type ActiveMessage struct { + // once 保证完成情况只能被触发一次 + once sync.Once + // header 设备消息固体头 使用的是第一次报文的固定头 + header *jt808.Header + // replyChan 用于获取终端应答情况 + replyChan chan *Message + // completeChan 用于判断完成情况 + completeChan chan struct{} + // Key 唯一标识符 默认手机号 + Key string `json:"key"` + // Command 平台下发的指令 + Command consts.JT808CommandType `json:"command"` + // Body 平台下发的数据 + Body []byte `json:"body"` + // Data 平台最终下发的数据 + Data []byte `json:"data"` + // OverTimeDuration 超时时间 默认5秒 + OverTimeDuration time.Duration `json:"overTimeDuration"` +} + +func NewActiveMessage(key string, command consts.JT808CommandType, body []byte, overTimeDuration time.Duration) *ActiveMessage { + return &ActiveMessage{Key: key, Command: command, Body: body, OverTimeDuration: overTimeDuration} +} + +func (a *ActiveMessage) hasComplete() bool { + select { + case <-a.completeChan: + return true + default: + } + ok := true + a.once.Do(func() { + close(a.completeChan) + ok = false + }) + return ok +} + +func (a *ActiveMessage) String() string { + return strings.Join([]string{ + fmt.Sprintf("key[%s]", a.Key), + fmt.Sprintf("指令[%x] [%s]", a.Command, a.Command), + fmt.Sprintf("body[%x]", a.Body), + }, "\n") +} diff --git a/service/connection.go b/service/connection.go index 6162f84..b2d4bec 100644 --- a/service/connection.go +++ b/service/connection.go @@ -3,28 +3,41 @@ package service import ( "errors" "fmt" + "github.com/cuteLittleDevil/go-jt808/protocol/model" "github.com/cuteLittleDevil/go-jt808/shared/consts" "io" "log/slog" "net" "sync" + "time" ) type connection struct { - conn *net.TCPConn - handles map[consts.JT808CommandType]Handler - stopOnce sync.Once - stopChan chan struct{} - msgBuffChan chan *Message + conn *net.TCPConn + handles map[consts.JT808CommandType]Handler + stopOnce sync.Once + stopChan chan struct{} + msgBuffChan chan *Message + activeMsgChan chan *ActiveMessage + // platformSerialNumber 平台流水号 到了math.MaxUint16后+1重新变成0 + platformSerialNumber uint16 + joinFunc func(message *Message, activeChan chan<- *ActiveMessage) (string, error) + leaveFunc func(key string) + key string } -func newConnection(conn *net.TCPConn, handles map[consts.JT808CommandType]Handler) *connection { +func newConnection(conn *net.TCPConn, handles map[consts.JT808CommandType]Handler, + join func(message *Message, activeChan chan<- *ActiveMessage) (string, error), leave func(key string)) *connection { return &connection{ - conn: conn, - handles: handles, - stopOnce: sync.Once{}, - stopChan: make(chan struct{}), - msgBuffChan: make(chan *Message, 10), + conn: conn, + handles: handles, + stopOnce: sync.Once{}, + stopChan: make(chan struct{}), + msgBuffChan: make(chan *Message, 10), + activeMsgChan: make(chan *ActiveMessage, 3), + platformSerialNumber: uint16(0), + joinFunc: join, + leaveFunc: leave, } } @@ -34,13 +47,15 @@ func (c *connection) Start() { } func (c *connection) reader() { - // 消息体长度最大为 10bit 也就是 1023 的字节 - curData := make([]byte, 1023) - pack := newPackageParse() - + var ( + once sync.Once + // 消息体长度最大为 10bit 也就是 1023 的字节 + curData = make([]byte, 1023) + pack = newPackageParse() + ) defer func() { c.stop() - curData = nil + clear(curData) pack.clear() }() @@ -52,10 +67,12 @@ func (c *connection) reader() { if n, err := c.conn.Read(curData); err != nil { if errors.Is(err, net.ErrClosed) || errors.Is(err, io.EOF) { slog.Debug("connection close", + slog.Any("platform num", c.platformSerialNumber), slog.Any("err", err)) return } slog.Error("read data", + slog.Any("platform num", c.platformSerialNumber), slog.Any("err", err)) return } else if n > 0 { @@ -68,16 +85,32 @@ func (c *connection) reader() { return } for _, msg := range msgs { - key := consts.JT808CommandType(msg.JTMessage.Header.ID) - if handler, ok := c.handles[key]; ok { + command := consts.JT808CommandType(msg.JTMessage.Header.ID) + fmt.Println(uint16(command), command.String()) + if handler, ok := c.handles[command]; ok { msg.Handler = handler msg.OnReadExecutionEvent(msg) } else { slog.Warn("key not found", slog.Int("id", int(msg.JTMessage.Header.ID)), - slog.String("remark", key.String())) + slog.String("remark", command.String())) continue } + fail := false + once.Do(func() { + if key, err := c.joinFunc(msg, c.activeMsgChan); err != nil { + fail = true + slog.Warn("key exist", + slog.String("effective data", fmt.Sprintf("%x", effectiveData)), + slog.Any("err", err)) + return + } else { + c.key = key + } + }) + if fail { + return + } select { case c.msgBuffChan <- msg: default: @@ -85,45 +118,30 @@ func (c *connection) reader() { slog.String("original data", fmt.Sprintf("%x", msg.OriginalData))) } } - } } } } func (c *connection) write() { - // 到了math.MaxUint16后+1重新变成0 - num := uint16(0) + record := map[uint16]*ActiveMessage{} for { select { case <-c.stopChan: + clear(record) return + case activeMsg, ok := <-c.activeMsgChan: + if ok { + c.onActiveEvent(activeMsg, record) + } case msg, ok := <-c.msgBuffChan: if ok { - header := msg.JTMessage.Header - header.ReplyID = uint16(msg.ReplyProtocol()) - header.PlatformSerialNumber = num - num++ - if has := msg.HasReply(); !has { - continue - } - body, err := msg.ReplyBody(msg.JTMessage) - if err != nil { - slog.Warn("reply body fail", - slog.String("original data", fmt.Sprintf("%x", msg.OriginalData)), - slog.Any("err", err)) - continue - } - data := header.Encode(body) - _, err = c.conn.Write(data) - if err != nil { - slog.Warn("write fail", - slog.String("data", fmt.Sprintf("%x", data)), - slog.Any("err", err)) + if len(record) > 0 { // 说明现在有主动的请求 等待回复中 + if c.onActiveRespondEvent(record, msg) { + continue + } } - msg.ReplyData = data - msg.WriteErr = err - msg.OnWriteExecutionEvent(*msg) + c.defaultReplyEvent(msg) } else if msg != nil && len(msg.OriginalData) > 0 { slog.Warn("msgBuffChan is close", slog.String("original data", fmt.Sprintf("%x", msg.OriginalData))) @@ -134,7 +152,124 @@ func (c *connection) write() { func (c *connection) stop() { c.stopOnce.Do(func() { + c.leaveFunc(c.key) + clear(c.handles) close(c.msgBuffChan) + close(c.activeMsgChan) close(c.stopChan) }) } + +func (c *connection) defaultReplyEvent(msg *Message) { + header := msg.JTMessage.Header + header.ReplyID = uint16(msg.ReplyProtocol()) + header.PlatformSerialNumber = c.platformSerialNumber + if has := msg.HasReply(); !has { + return + } + body, err := msg.ReplyBody(msg.JTMessage) + if err != nil { + slog.Warn("reply body fail", + slog.String("original data", fmt.Sprintf("%x", msg.OriginalData)), + slog.Any("err", err)) + return + } + c.platformSerialNumber++ + data := header.Encode(body) + if _, err = c.conn.Write(data); err != nil { + slog.Warn("write fail", + slog.String("data", fmt.Sprintf("%x", data)), + slog.Any("err", err)) + msg.WriteErr = errors.Join(ErrWriteDataFail, err) + } + msg.ReplyData = data + msg.OnWriteExecutionEvent(*msg) +} + +func (c *connection) onActiveEvent(activeMsg *ActiveMessage, record map[uint16]*ActiveMessage) { + header := activeMsg.header + header.ReplyID = uint16(activeMsg.Command) + header.PlatformSerialNumber = c.platformSerialNumber + num := c.platformSerialNumber + record[num] = activeMsg + c.platformSerialNumber++ + data := header.Encode(activeMsg.Body) + activeMsg.Data = data + _, err := c.conn.Write(data) + if err != nil { + slog.Warn("write fail", + slog.String("data", fmt.Sprintf("%x", data)), + slog.Any("err", err)) + if activeMsg.hasComplete() { + return + } + delete(record, num) + activeMsg.replyChan <- newErrMessage(errors.Join(ErrWriteDataFail, err)) + return + } + go func(msg *ActiveMessage, seq uint16) { + duration := 5 * time.Second + if msg.OverTimeDuration > 0 { + duration = msg.OverTimeDuration + } + time.Sleep(duration) + if activeMsg.hasComplete() { + return + } + delete(record, seq) + activeMsg.replyChan <- newErrMessage(errors.Join(ErrWriteDataOverTime, + fmt.Errorf("overtime is [%.2f]second", duration.Seconds()))) + }(activeMsg, num) + if v, ok := c.handles[activeMsg.Command]; ok { + msg := NewMessage(data) + if err != nil { + msg.WriteErr = errors.Join(ErrWriteDataFail, err) + } + v.OnWriteExecutionEvent(*msg) + } +} + +func (c *connection) onActiveRespondEvent(record map[uint16]*ActiveMessage, msg *Message) bool { + type respond struct { + JT808Handler + HasRespondFunc func(seq uint16) bool + } + tmp := respond{ + JT808Handler: nil, + HasRespondFunc: nil, + } + switch consts.JT808CommandType(msg.JTMessage.Header.ID) { + case consts.T0001GeneralRespond: + t0x0001 := &model.T0x0001{} + tmp.JT808Handler = t0x0001 + tmp.HasRespondFunc = func(seq uint16) bool { + return seq == t0x0001.SerialNumber + } + case consts.T0104QueryParameter: + t0x0104 := &model.T0x0104{} + tmp.JT808Handler = t0x0104 + tmp.HasRespondFunc = func(seq uint16) bool { + return seq == t0x0104.RespondSerialNumber + } + } + if tmp.HasRespondFunc != nil { + if err := tmp.Parse(msg.JTMessage); err != nil { + slog.Warn("parse fail", + slog.String("original data", fmt.Sprintf("%x", msg.OriginalData)), + slog.Any("err", err)) + return true + } + for k, v := range record { + if tmp.HasRespondFunc(k) { + if v.hasComplete() { + return true + } + delete(record, k) + v.replyChan <- msg + return true + } + } + } + + return false +} diff --git a/service/error.go b/service/error.go new file mode 100644 index 0000000..3b41827 --- /dev/null +++ b/service/error.go @@ -0,0 +1,9 @@ +package service + +import "errors" + +var ( + ErrWriteDataFail = errors.New("write data fail") + ErrWriteDataOverTime = errors.New("write data is overtime") + ErrNotExistKey = errors.New("key not exist") +) diff --git a/service/go.mod b/service/go.mod index 83d9030..0ddb7eb 100644 --- a/service/go.mod +++ b/service/go.mod @@ -3,7 +3,7 @@ module github.com/cuteLittleDevil/go-jt808/service go 1.23.2 require ( - github.com/cuteLittleDevil/go-jt808/protocol v1.1.0 + github.com/cuteLittleDevil/go-jt808/protocol v1.2.0 github.com/cuteLittleDevil/go-jt808/shared v1.1.0 ) diff --git a/service/go.sum b/service/go.sum index 2396887..f383b57 100644 --- a/service/go.sum +++ b/service/go.sum @@ -1,5 +1,5 @@ -github.com/cuteLittleDevil/go-jt808/protocol v1.1.0 h1:DA0EV/D++eHKnic5LFwvxzU6/ZlYDup7q1jI3nM0DBQ= -github.com/cuteLittleDevil/go-jt808/protocol v1.1.0/go.mod h1:LEgUyC8x29ITq6H7ycpOpgaHjjdquzYyzUXgbN5kYkI= +github.com/cuteLittleDevil/go-jt808/protocol v1.2.0 h1:OdzyjWnLs7TPeMHu0NvtiIDe16OK58c/bUJCgRaWNhM= +github.com/cuteLittleDevil/go-jt808/protocol v1.2.0/go.mod h1:LEgUyC8x29ITq6H7ycpOpgaHjjdquzYyzUXgbN5kYkI= github.com/cuteLittleDevil/go-jt808/shared v1.1.0 h1:1YkXZnBmu2i7MzKZEiTh0iB6QczbZx1v+6jd4kP0CQk= github.com/cuteLittleDevil/go-jt808/shared v1.1.0/go.mod h1:BMWFmkDRLNjcXcuiPm/yphfWfZ6xNuTAJDkDDNhysOM= golang.org/x/text v0.19.0 h1:kTxAhCbGbxhK0IwgSKiMO5awPoDQ0RpfiVYBfK860YM= diff --git a/service/message.go b/service/message.go index 76a97a4..e440b92 100644 --- a/service/message.go +++ b/service/message.go @@ -18,3 +18,7 @@ func NewMessage(originalData []byte) *Message { OriginalData: originalData, } } + +func newErrMessage(err error) *Message { + return &Message{WriteErr: err} +} diff --git a/service/option.go b/service/option.go index 7d37a17..00db7e6 100644 --- a/service/option.go +++ b/service/option.go @@ -17,6 +17,7 @@ type Options struct { CustomHandleFunc func() map[consts.JT808CommandType]Handler Addr string Network string + KeyFunc func(message *Message) string } func (o *Options) Apply(opts []Option) { @@ -51,3 +52,9 @@ func WithCustomHandleFunc(customHandleFunc func() map[consts.JT808CommandType]Ha o.CustomHandleFunc = customHandleFunc }} } + +func WithKeyFunc(keyFunc func(message *Message) string) Option { + return Option{F: func(o *Options) { + o.KeyFunc = keyFunc + }} +} diff --git a/service/packet_parse.go b/service/packet_parse.go index 12fa1c7..8b76dd3 100644 --- a/service/packet_parse.go +++ b/service/packet_parse.go @@ -21,13 +21,13 @@ func newPackageParse() *packageParse { } func (p *packageParse) clear() { - p.historyData = nil + clear(p.historyData) for id, datas := range p.subcontractingRecord { slog.Warn("package no complete", slog.Any("id", id), slog.Int("data sum", len(datas))) } - p.subcontractingRecord = nil + clear(p.subcontractingRecord) } // parse 返回一个或者多个完成的包 diff --git a/service/service.go b/service/service.go index e0b6408..389bf13 100644 --- a/service/service.go +++ b/service/service.go @@ -9,13 +9,23 @@ import ( type GoJT808 struct { opts *Options + *sessionManager } func New(opts ...Option) *GoJT808 { options := NewOptions(opts) - return &GoJT808{ + g := &GoJT808{ opts: options, } + keyFunc := func(message *Message) string { + return message.Header.TerminalPhoneNo + } + if g.opts.KeyFunc != nil { + keyFunc = g.opts.KeyFunc + } + g.sessionManager = newSessionManager(keyFunc) + go g.sessionManager.run() + return g } func (g *GoJT808) Run() { @@ -49,11 +59,15 @@ func (g *GoJT808) Run() { handles[k] = v } } - conn := newConnection(c, handles) + conn := newConnection(c, handles, g.sessionManager.join, g.sessionManager.leave) go conn.Start() } } +func (g *GoJT808) SendActiveMessage(activeMsg *ActiveMessage) *Message { + return g.sessionManager.write(activeMsg) +} + func (g *GoJT808) createDefaultHandle() map[consts.JT808CommandType]Handler { return map[consts.JT808CommandType]Handler{ consts.T0100Register: newDefaultHandle(&model.T0x0100{}), @@ -61,5 +75,7 @@ func (g *GoJT808) createDefaultHandle() map[consts.JT808CommandType]Handler { consts.T0002HeartBeat: newDefaultHandle(&model.T0x0002{}), consts.T0200LocationReport: newDefaultHandle(&model.T0x0200{}), consts.T0704LocationBatchUpload: newDefaultHandle(&model.T0x0704{}), + consts.T0104QueryParameter: newDefaultHandle(&model.T0x0104{}), + consts.P8104QueryTerminalParams: newDefaultHandle(&model.P0x8104{}), } } diff --git a/service/session_manager.go b/service/session_manager.go new file mode 100644 index 0000000..fa87925 --- /dev/null +++ b/service/session_manager.go @@ -0,0 +1,91 @@ +package service + +import ( + "errors" + "fmt" + "github.com/cuteLittleDevil/go-jt808/protocol/jt808" + "time" +) + +type sessionOperationFunc func(record map[string]*session) + +type ( + sessionManager struct { + operationFuncChan chan sessionOperationFunc + keyFunc func(message *Message) string + } + + session struct { + header *jt808.Header + // 加入时间 + joinTime time.Time + // 数据发送到终端 + activeMsgChan chan<- *ActiveMessage + } +) + +func newSessionManager(keyFunc func(message *Message) string) *sessionManager { + return &sessionManager{ + operationFuncChan: make(chan sessionOperationFunc, 10), + keyFunc: keyFunc, + } +} + +func (s *sessionManager) run() { + record := make(map[string]*session, 1000) + for { + select { + case opFunc := <-s.operationFuncChan: + opFunc(record) + } + } +} + +func (s *sessionManager) join(message *Message, activeChan chan<- *ActiveMessage) (string, error) { + ch := make(chan error) + defer close(ch) + key := s.keyFunc(message) + s.operationFuncChan <- func(record map[string]*session) { + if v, ok := record[key]; ok { + ch <- fmt.Errorf("exist key join time[%s]", v.joinTime.Format(time.RFC3339)) + return + } + record[key] = &session{ + header: message.Header, + joinTime: time.Now(), + activeMsgChan: activeChan, + } + ch <- nil + } + return key, <-ch +} + +func (s *sessionManager) leave(key string) { + ch := make(chan struct{}) + s.operationFuncChan <- func(record map[string]*session) { + defer close(ch) + if _, ok := record[key]; ok { + delete(record, key) + } + } + <-ch + return +} + +func (s *sessionManager) write(activeMsg *ActiveMessage) *Message { + replyChan := make(chan *Message) + defer close(replyChan) + s.operationFuncChan <- func(record map[string]*session) { + key := activeMsg.Key + if v, ok := record[key]; ok { + activeMsg.header = v.header + activeMsg.completeChan = make(chan struct{}) + activeMsg.replyChan = replyChan + v.activeMsgChan <- activeMsg + return + } + replyChan <- newErrMessage(errors.Join(ErrNotExistKey, + fmt.Errorf("key=[%s] sum=[%d] ", key, len(record)))) + } + return <-replyChan +}