Skip to content

Commit

Permalink
feat: 添加平台下发指令到终端功能
Browse files Browse the repository at this point in the history
  • Loading branch information
cuteLittleDevil committed Oct 27, 2024
1 parent 05639f1 commit 83ad132
Show file tree
Hide file tree
Showing 13 changed files with 375 additions and 56 deletions.
2 changes: 1 addition & 1 deletion example/protocol/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ module protocol/terminal
go 1.23.2

require (
github.com/cuteLittleDevil/go-jt808/protocol v1.1.0
github.com/cuteLittleDevil/go-jt808/protocol v1.2.0
github.com/cuteLittleDevil/go-jt808/service v0.1.3
github.com/cuteLittleDevil/go-jt808/shared v1.1.0
github.com/cuteLittleDevil/go-jt808/terminal v0.1.3
Expand Down
4 changes: 2 additions & 2 deletions example/protocol/go.sum
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
github.com/cuteLittleDevil/go-jt808/protocol v1.1.0 h1:DA0EV/D++eHKnic5LFwvxzU6/ZlYDup7q1jI3nM0DBQ=
github.com/cuteLittleDevil/go-jt808/protocol v1.1.0/go.mod h1:LEgUyC8x29ITq6H7ycpOpgaHjjdquzYyzUXgbN5kYkI=
github.com/cuteLittleDevil/go-jt808/protocol v1.2.0 h1:OdzyjWnLs7TPeMHu0NvtiIDe16OK58c/bUJCgRaWNhM=
github.com/cuteLittleDevil/go-jt808/protocol v1.2.0/go.mod h1:LEgUyC8x29ITq6H7ycpOpgaHjjdquzYyzUXgbN5kYkI=
github.com/cuteLittleDevil/go-jt808/service v0.1.3 h1:cpXmYdyjCUts769fIOVS/V/HtxHvBgYMjtUeDFZMbyE=
github.com/cuteLittleDevil/go-jt808/service v0.1.3/go.mod h1:XOum4zCLCnMScfLUI5fr0acYsv3539gm3O0S5QUDttk=
github.com/cuteLittleDevil/go-jt808/shared v1.1.0 h1:1YkXZnBmu2i7MzKZEiTh0iB6QczbZx1v+6jd4kP0CQk=
Expand Down
2 changes: 1 addition & 1 deletion protocol/model/t_terminal_params.go
Original file line number Diff line number Diff line change
Expand Up @@ -960,7 +960,7 @@ func (t *TerminalParamDetails) String() string {

if len(t.OtherContent) > 0 {
ids := make([]int, 0, len(t.OtherContent))
for id, _ := range t.OtherContent {
for id := range t.OtherContent {
ids = append(ids, int(id))
}
sort.Ints(ids)
Expand Down
57 changes: 57 additions & 0 deletions service/active_message.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package service

import (
"fmt"
"github.com/cuteLittleDevil/go-jt808/protocol/jt808"
"github.com/cuteLittleDevil/go-jt808/shared/consts"
"strings"
"sync"
"time"
)

type ActiveMessage struct {
// once 保证完成情况只能被触发一次
once sync.Once
// header 设备消息固体头 使用的是第一次报文的固定头
header *jt808.Header
// replyChan 用于获取终端应答情况
replyChan chan *Message
// completeChan 用于判断完成情况
completeChan chan struct{}
// Key 唯一标识符 默认手机号
Key string `json:"key"`
// Command 平台下发的指令
Command consts.JT808CommandType `json:"command"`
// Body 平台下发的数据
Body []byte `json:"body"`
// Data 平台最终下发的数据
Data []byte `json:"data"`
// OverTimeDuration 超时时间 默认5秒
OverTimeDuration time.Duration `json:"overTimeDuration"`
}

func NewActiveMessage(key string, command consts.JT808CommandType, body []byte, overTimeDuration time.Duration) *ActiveMessage {
return &ActiveMessage{Key: key, Command: command, Body: body, OverTimeDuration: overTimeDuration}
}

func (a *ActiveMessage) hasComplete() bool {
select {
case <-a.completeChan:
return true
default:
}
ok := true
a.once.Do(func() {
close(a.completeChan)
ok = false
})
return ok
}

func (a *ActiveMessage) String() string {
return strings.Join([]string{
fmt.Sprintf("key[%s]", a.Key),
fmt.Sprintf("指令[%x] [%s]", a.Command, a.Command),
fmt.Sprintf("body[%x]", a.Body),
}, "\n")
}
225 changes: 180 additions & 45 deletions service/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,28 +3,41 @@ package service
import (
"errors"
"fmt"
"github.com/cuteLittleDevil/go-jt808/protocol/model"
"github.com/cuteLittleDevil/go-jt808/shared/consts"
"io"
"log/slog"
"net"
"sync"
"time"
)

type connection struct {
conn *net.TCPConn
handles map[consts.JT808CommandType]Handler
stopOnce sync.Once
stopChan chan struct{}
msgBuffChan chan *Message
conn *net.TCPConn
handles map[consts.JT808CommandType]Handler
stopOnce sync.Once
stopChan chan struct{}
msgBuffChan chan *Message
activeMsgChan chan *ActiveMessage
// platformSerialNumber 平台流水号 到了math.MaxUint16后+1重新变成0
platformSerialNumber uint16
joinFunc func(message *Message, activeChan chan<- *ActiveMessage) (string, error)
leaveFunc func(key string)
key string
}

func newConnection(conn *net.TCPConn, handles map[consts.JT808CommandType]Handler) *connection {
func newConnection(conn *net.TCPConn, handles map[consts.JT808CommandType]Handler,
join func(message *Message, activeChan chan<- *ActiveMessage) (string, error), leave func(key string)) *connection {
return &connection{
conn: conn,
handles: handles,
stopOnce: sync.Once{},
stopChan: make(chan struct{}),
msgBuffChan: make(chan *Message, 10),
conn: conn,
handles: handles,
stopOnce: sync.Once{},
stopChan: make(chan struct{}),
msgBuffChan: make(chan *Message, 10),
activeMsgChan: make(chan *ActiveMessage, 3),
platformSerialNumber: uint16(0),
joinFunc: join,
leaveFunc: leave,
}
}

Expand All @@ -34,13 +47,15 @@ func (c *connection) Start() {
}

func (c *connection) reader() {
// 消息体长度最大为 10bit 也就是 1023 的字节
curData := make([]byte, 1023)
pack := newPackageParse()

var (
once sync.Once
// 消息体长度最大为 10bit 也就是 1023 的字节
curData = make([]byte, 1023)
pack = newPackageParse()
)
defer func() {
c.stop()
curData = nil
clear(curData)
pack.clear()
}()

Expand All @@ -52,10 +67,12 @@ func (c *connection) reader() {
if n, err := c.conn.Read(curData); err != nil {
if errors.Is(err, net.ErrClosed) || errors.Is(err, io.EOF) {
slog.Debug("connection close",
slog.Any("platform num", c.platformSerialNumber),
slog.Any("err", err))
return
}
slog.Error("read data",
slog.Any("platform num", c.platformSerialNumber),
slog.Any("err", err))
return
} else if n > 0 {
Expand All @@ -68,62 +85,63 @@ func (c *connection) reader() {
return
}
for _, msg := range msgs {
key := consts.JT808CommandType(msg.JTMessage.Header.ID)
if handler, ok := c.handles[key]; ok {
command := consts.JT808CommandType(msg.JTMessage.Header.ID)
fmt.Println(uint16(command), command.String())
if handler, ok := c.handles[command]; ok {
msg.Handler = handler
msg.OnReadExecutionEvent(msg)
} else {
slog.Warn("key not found",
slog.Int("id", int(msg.JTMessage.Header.ID)),
slog.String("remark", key.String()))
slog.String("remark", command.String()))
continue
}
fail := false
once.Do(func() {
if key, err := c.joinFunc(msg, c.activeMsgChan); err != nil {
fail = true
slog.Warn("key exist",
slog.String("effective data", fmt.Sprintf("%x", effectiveData)),
slog.Any("err", err))
return
} else {
c.key = key
}
})
if fail {
return
}
select {
case c.msgBuffChan <- msg:
default:
slog.Warn("msg buff full",
slog.String("original data", fmt.Sprintf("%x", msg.OriginalData)))
}
}

}
}
}
}

func (c *connection) write() {
// 到了math.MaxUint16后+1重新变成0
num := uint16(0)
record := map[uint16]*ActiveMessage{}
for {
select {
case <-c.stopChan:
clear(record)
return
case activeMsg, ok := <-c.activeMsgChan:
if ok {
c.onActiveEvent(activeMsg, record)
}
case msg, ok := <-c.msgBuffChan:
if ok {
header := msg.JTMessage.Header
header.ReplyID = uint16(msg.ReplyProtocol())
header.PlatformSerialNumber = num
num++
if has := msg.HasReply(); !has {
continue
}
body, err := msg.ReplyBody(msg.JTMessage)
if err != nil {
slog.Warn("reply body fail",
slog.String("original data", fmt.Sprintf("%x", msg.OriginalData)),
slog.Any("err", err))
continue
}
data := header.Encode(body)
_, err = c.conn.Write(data)
if err != nil {
slog.Warn("write fail",
slog.String("data", fmt.Sprintf("%x", data)),
slog.Any("err", err))
if len(record) > 0 { // 说明现在有主动的请求 等待回复中
if c.onActiveRespondEvent(record, msg) {
continue
}
}
msg.ReplyData = data
msg.WriteErr = err
msg.OnWriteExecutionEvent(*msg)
c.defaultReplyEvent(msg)
} else if msg != nil && len(msg.OriginalData) > 0 {
slog.Warn("msgBuffChan is close",
slog.String("original data", fmt.Sprintf("%x", msg.OriginalData)))
Expand All @@ -134,7 +152,124 @@ func (c *connection) write() {

func (c *connection) stop() {
c.stopOnce.Do(func() {
c.leaveFunc(c.key)
clear(c.handles)
close(c.msgBuffChan)
close(c.activeMsgChan)
close(c.stopChan)
})
}

func (c *connection) defaultReplyEvent(msg *Message) {
header := msg.JTMessage.Header
header.ReplyID = uint16(msg.ReplyProtocol())
header.PlatformSerialNumber = c.platformSerialNumber
if has := msg.HasReply(); !has {
return
}
body, err := msg.ReplyBody(msg.JTMessage)
if err != nil {
slog.Warn("reply body fail",
slog.String("original data", fmt.Sprintf("%x", msg.OriginalData)),
slog.Any("err", err))
return
}
c.platformSerialNumber++
data := header.Encode(body)
if _, err = c.conn.Write(data); err != nil {
slog.Warn("write fail",
slog.String("data", fmt.Sprintf("%x", data)),
slog.Any("err", err))
msg.WriteErr = errors.Join(ErrWriteDataFail, err)
}
msg.ReplyData = data
msg.OnWriteExecutionEvent(*msg)
}

func (c *connection) onActiveEvent(activeMsg *ActiveMessage, record map[uint16]*ActiveMessage) {
header := activeMsg.header
header.ReplyID = uint16(activeMsg.Command)
header.PlatformSerialNumber = c.platformSerialNumber
num := c.platformSerialNumber
record[num] = activeMsg
c.platformSerialNumber++
data := header.Encode(activeMsg.Body)
activeMsg.Data = data
_, err := c.conn.Write(data)
if err != nil {
slog.Warn("write fail",
slog.String("data", fmt.Sprintf("%x", data)),
slog.Any("err", err))
if activeMsg.hasComplete() {
return
}
delete(record, num)
activeMsg.replyChan <- newErrMessage(errors.Join(ErrWriteDataFail, err))
return
}
go func(msg *ActiveMessage, seq uint16) {
duration := 5 * time.Second
if msg.OverTimeDuration > 0 {
duration = msg.OverTimeDuration
}
time.Sleep(duration)
if activeMsg.hasComplete() {
return
}
delete(record, seq)
activeMsg.replyChan <- newErrMessage(errors.Join(ErrWriteDataOverTime,
fmt.Errorf("overtime is [%.2f]second", duration.Seconds())))
}(activeMsg, num)
if v, ok := c.handles[activeMsg.Command]; ok {
msg := NewMessage(data)
if err != nil {
msg.WriteErr = errors.Join(ErrWriteDataFail, err)
}
v.OnWriteExecutionEvent(*msg)
}
}

func (c *connection) onActiveRespondEvent(record map[uint16]*ActiveMessage, msg *Message) bool {
type respond struct {
JT808Handler
HasRespondFunc func(seq uint16) bool
}
tmp := respond{
JT808Handler: nil,
HasRespondFunc: nil,
}
switch consts.JT808CommandType(msg.JTMessage.Header.ID) {
case consts.T0001GeneralRespond:
t0x0001 := &model.T0x0001{}
tmp.JT808Handler = t0x0001
tmp.HasRespondFunc = func(seq uint16) bool {
return seq == t0x0001.SerialNumber
}
case consts.T0104QueryParameter:
t0x0104 := &model.T0x0104{}
tmp.JT808Handler = t0x0104
tmp.HasRespondFunc = func(seq uint16) bool {
return seq == t0x0104.RespondSerialNumber
}
}
if tmp.HasRespondFunc != nil {
if err := tmp.Parse(msg.JTMessage); err != nil {
slog.Warn("parse fail",
slog.String("original data", fmt.Sprintf("%x", msg.OriginalData)),
slog.Any("err", err))
return true
}
for k, v := range record {
if tmp.HasRespondFunc(k) {
if v.hasComplete() {
return true
}
delete(record, k)
v.replyChan <- msg
return true
}
}
}

return false
}
9 changes: 9 additions & 0 deletions service/error.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package service

import "errors"

var (
ErrWriteDataFail = errors.New("write data fail")
ErrWriteDataOverTime = errors.New("write data is overtime")
ErrNotExistKey = errors.New("key not exist")
)
Loading

0 comments on commit 83ad132

Please sign in to comment.