From cb710ea2be96f12e7306b3e5c2983be25577476e Mon Sep 17 00:00:00 2001 From: Alex X Date: Wed, 11 Oct 2023 22:23:22 +0300 Subject: [PATCH] Update dvrip source processing --- pkg/dvrip/client.go | 70 +++++++++++++++++++++----- pkg/dvrip/consumer.go | 2 +- pkg/dvrip/producer.go | 113 ++++++++++++++++++------------------------ 3 files changed, 106 insertions(+), 79 deletions(-) diff --git a/pkg/dvrip/client.go b/pkg/dvrip/client.go index eefca775..345f291c 100644 --- a/pkg/dvrip/client.go +++ b/pkg/dvrip/client.go @@ -2,6 +2,7 @@ package dvrip import ( "bufio" + "bytes" "crypto/md5" "encoding/binary" "encoding/json" @@ -28,7 +29,8 @@ type Client struct { seq uint32 stream string - rd io.Reader + rd io.Reader + buf []byte } func (c *Client) Dial(rawURL string) (err error) { @@ -87,11 +89,11 @@ func (c *Client) Login(user, pass string) (err error) { SofiaHash(pass), user, ) - if _, err = c.Request(Login, []byte(data)); err != nil { + if _, err = c.WriteCmd(Login, []byte(data)); err != nil { return } - _, err = c.ResponseJSON() + _, err = c.ReadJSON() return } @@ -99,15 +101,15 @@ func (c *Client) Play() error { format := `{"Name":"OPMonitor","SessionID":"0x%08X","OPMonitor":{"Action":"%s","Parameter":%s}}` + "\x0A\x00" data := fmt.Sprintf(format, c.session, "Claim", c.stream) - if _, err := c.Request(OPMonitorClaim, []byte(data)); err != nil { + if _, err := c.WriteCmd(OPMonitorClaim, []byte(data)); err != nil { return err } - if _, err := c.ResponseJSON(); err != nil { + if _, err := c.ReadJSON(); err != nil { return err } data = fmt.Sprintf(format, c.session, "Start", c.stream) - _, err := c.Request(OPMonitorStart, []byte(data)) + _, err := c.WriteCmd(OPMonitorStart, []byte(data)) return err } @@ -115,19 +117,19 @@ func (c *Client) Talk() error { format := `{"Name":"OPTalk","SessionID":"0x%08X","OPTalk":{"Action":"%s"}}` + "\x0A\x00" data := fmt.Sprintf(format, c.session, "Claim") - if _, err := c.Request(OPTalkClaim, []byte(data)); err != nil { + if _, err := c.WriteCmd(OPTalkClaim, []byte(data)); err != nil { return err } - if _, err := c.ResponseJSON(); err != nil { + if _, err := c.ReadJSON(); err != nil { return err } data = fmt.Sprintf(format, c.session, "Start") - _, err := c.Request(OPTalkStart, []byte(data)) + _, err := c.WriteCmd(OPTalkStart, []byte(data)) return err } -func (c *Client) Request(cmd uint16, payload []byte) (n int, err error) { +func (c *Client) WriteCmd(cmd uint16, payload []byte) (n int, err error) { b := make([]byte, 20, 128) b[0] = 255 binary.LittleEndian.PutUint32(b[4:], c.session) @@ -145,7 +147,7 @@ func (c *Client) Request(cmd uint16, payload []byte) (n int, err error) { return c.conn.Write(b) } -func (c *Client) Response() (b []byte, err error) { +func (c *Client) ReadChunk() (b []byte, err error) { if err = c.conn.SetReadDeadline(time.Now().Add(time.Second * 5)); err != nil { return } @@ -170,10 +172,52 @@ func (c *Client) Response() (b []byte, err error) { return } +func (c *Client) ReadPacket() (pType byte, payload []byte, err error) { + var b []byte + + // many cameras may split packet to multiple chunks + // some rare cameras may put multiple packets to single chunk + for len(c.buf) < 16 { + if b, err = c.ReadChunk(); err != nil { + return 0, nil, err + } + c.buf = append(c.buf, b...) + } + + if !bytes.HasPrefix(c.buf, []byte{0, 0, 1}) { + return 0, nil, fmt.Errorf("dvrip: wrong packet: %0.16x", c.buf) + } + + var size int + + switch pType = c.buf[3]; pType { + case 0xFC, 0xFE: + size = int(binary.LittleEndian.Uint32(c.buf[12:])) + 16 + case 0xFD: // PFrame + size = int(binary.LittleEndian.Uint32(c.buf[4:])) + 8 + case 0xFA, 0xF9: + size = int(binary.LittleEndian.Uint16(c.buf[6:])) + 8 + default: + return 0, nil, fmt.Errorf("dvrip: unknown packet type: %X", pType) + } + + for len(c.buf) < size { + if b, err = c.ReadChunk(); err != nil { + return 0, nil, err + } + c.buf = append(c.buf, b...) + } + + payload = c.buf[:size] + c.buf = c.buf[size:] + + return +} + type Response map[string]any -func (c *Client) ResponseJSON() (res Response, err error) { - b, err := c.Response() +func (c *Client) ReadJSON() (res Response, err error) { + b, err := c.ReadChunk() if err != nil { return } diff --git a/pkg/dvrip/consumer.go b/pkg/dvrip/consumer.go index 3cd61204..7652c079 100644 --- a/pkg/dvrip/consumer.go +++ b/pkg/dvrip/consumer.go @@ -70,7 +70,7 @@ func (c *Consumer) AddTrack(media *core.Media, _ *core.Codec, track *core.Receiv for len(payload) >= PacketSize { buf = append(buf[:8], payload[:PacketSize]...) - if n, err := c.client.Request(OPTalkData, buf); err != nil { + if n, err := c.client.WriteCmd(OPTalkData, buf); err != nil { c.Send += n } diff --git a/pkg/dvrip/producer.go b/pkg/dvrip/producer.go index 4367a684..412dd0a3 100644 --- a/pkg/dvrip/producer.go +++ b/pkg/dvrip/producer.go @@ -29,21 +29,21 @@ type Producer struct { func (c *Producer) Start() error { for { - tag, size, b, err := c.readPacket() + pType, b, err := c.client.ReadPacket() if err != nil { return err } //log.Printf("[DVR] type: %d, len: %d", dataType, len(b)) - switch tag { + switch pType { case 0xFC, 0xFE, 0xFD: if c.video == nil { continue } var payload []byte - if tag != 0xFD { + if pType != 0xFD { payload = b[16:] // iframe } else { payload = b[8:] // pframe @@ -65,31 +65,29 @@ func (c *Producer) Start() error { continue } - for b != nil { - payload := b[8:size] - if len(b) > size { - b = b[size:] - } else { - b = nil - } - - c.audioTS += uint32(len(payload)) - c.audioSeq++ - - packet := &rtp.Packet{ - Header: rtp.Header{ - Version: 2, - Marker: true, - SequenceNumber: c.audioSeq, - Timestamp: c.audioTS, - }, - Payload: payload, - } - - //log.Printf("[DVR] len: %d, ts: %10d", len(packet.Payload), packet.Timestamp) - - c.audio.WriteRTP(packet) + payload := b[8:] + + c.audioTS += uint32(len(payload)) + c.audioSeq++ + + packet := &rtp.Packet{ + Header: rtp.Header{ + Version: 2, + Marker: true, + SequenceNumber: c.audioSeq, + Timestamp: c.audioTS, + }, + Payload: payload, } + + //log.Printf("[DVR] len: %d, ts: %10d", len(packet.Payload), packet.Timestamp) + + c.audio.WriteRTP(packet) + + case 0xF9: // unknown + + default: + println(fmt.Sprintf("dvrip: unknown packet type: %d", pType)) } } } @@ -105,14 +103,33 @@ func (c *Producer) probe() error { rd := core.NewReadBuffer(c.client.rd) rd.BufferSize = core.ProbeSize - defer rd.Reset() + defer func() { + c.client.buf = nil + rd.Reset() + }() c.client.rd = rd - timeout := time.Now().Add(core.ProbeTimeout) + // some awful cameras has VERY rare keyframes + // so we wait video+audio for default probe time + // and wait anything for 15 seconds + timeoutBoth := time.Now().Add(core.ProbeTimeout) + timeoutAny := time.Now().Add(time.Second * 15) - for (c.video == nil || c.audio == nil) && time.Now().Before(timeout) { - tag, _, b, err := c.readPacket() + for { + if now := time.Now(); now.Before(timeoutBoth) { + if c.video != nil && c.audio != nil { + return nil + } + } else if now.Before(timeoutAny) { + if c.video != nil || c.audio != nil { + return nil + } + } else { + return errors.New("dvrip: can't probe medias") + } + + tag, b, err := c.client.ReadPacket() if err != nil { return err } @@ -147,40 +164,6 @@ func (c *Producer) probe() error { c.addAudioTrack(b[4], b[5]) } } - - return nil -} - -func (c *Producer) readPacket() (tag byte, size int, data []byte, err error) { - if data, err = c.client.Response(); err != nil { - return 0, 0, nil, err - } - - switch tag = data[3]; tag { - case 0xFC, 0xFE: - size = int(binary.LittleEndian.Uint32(data[12:])) + 16 - case 0xFD: // PFrame - size = int(binary.LittleEndian.Uint32(data[4:])) + 8 - case 0xFA, 0xF9: - size = int(binary.LittleEndian.Uint16(data[6:])) + 8 - default: - return 0, 0, nil, fmt.Errorf("unknown type: %X", tag) - } - - // collect data from multiple packets - for len(data) < size { - b, err := c.client.Response() - if err != nil { - return 0, 0, nil, err - } - data = append(data, b...) - } - - if len(data) > size { - return 0, 0, nil, errors.New("wrong size") - } - - return } func (c *Producer) addVideoTrack(mediaCode byte, payload []byte) {