Skip to content

Commit

Permalink
Update dvrip source processing
Browse files Browse the repository at this point in the history
  • Loading branch information
AlexxIT committed Oct 11, 2023
1 parent 843a3ae commit cb710ea
Show file tree
Hide file tree
Showing 3 changed files with 106 additions and 79 deletions.
70 changes: 57 additions & 13 deletions pkg/dvrip/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package dvrip

import (
"bufio"
"bytes"
"crypto/md5"
"encoding/binary"
"encoding/json"
Expand All @@ -28,7 +29,8 @@ type Client struct {
seq uint32
stream string

rd io.Reader
rd io.Reader
buf []byte
}

func (c *Client) Dial(rawURL string) (err error) {
Expand Down Expand Up @@ -87,47 +89,47 @@ func (c *Client) Login(user, pass string) (err error) {
SofiaHash(pass), user,
)

if _, err = c.Request(Login, []byte(data)); err != nil {
if _, err = c.WriteCmd(Login, []byte(data)); err != nil {
return
}

_, err = c.ResponseJSON()
_, err = c.ReadJSON()
return
}

func (c *Client) Play() error {
format := `{"Name":"OPMonitor","SessionID":"0x%08X","OPMonitor":{"Action":"%s","Parameter":%s}}` + "\x0A\x00"

data := fmt.Sprintf(format, c.session, "Claim", c.stream)
if _, err := c.Request(OPMonitorClaim, []byte(data)); err != nil {
if _, err := c.WriteCmd(OPMonitorClaim, []byte(data)); err != nil {
return err
}
if _, err := c.ResponseJSON(); err != nil {
if _, err := c.ReadJSON(); err != nil {
return err
}

data = fmt.Sprintf(format, c.session, "Start", c.stream)
_, err := c.Request(OPMonitorStart, []byte(data))
_, err := c.WriteCmd(OPMonitorStart, []byte(data))
return err
}

func (c *Client) Talk() error {
format := `{"Name":"OPTalk","SessionID":"0x%08X","OPTalk":{"Action":"%s"}}` + "\x0A\x00"

data := fmt.Sprintf(format, c.session, "Claim")
if _, err := c.Request(OPTalkClaim, []byte(data)); err != nil {
if _, err := c.WriteCmd(OPTalkClaim, []byte(data)); err != nil {
return err
}
if _, err := c.ResponseJSON(); err != nil {
if _, err := c.ReadJSON(); err != nil {
return err
}

data = fmt.Sprintf(format, c.session, "Start")
_, err := c.Request(OPTalkStart, []byte(data))
_, err := c.WriteCmd(OPTalkStart, []byte(data))
return err
}

func (c *Client) Request(cmd uint16, payload []byte) (n int, err error) {
func (c *Client) WriteCmd(cmd uint16, payload []byte) (n int, err error) {
b := make([]byte, 20, 128)
b[0] = 255
binary.LittleEndian.PutUint32(b[4:], c.session)
Expand All @@ -145,7 +147,7 @@ func (c *Client) Request(cmd uint16, payload []byte) (n int, err error) {
return c.conn.Write(b)
}

func (c *Client) Response() (b []byte, err error) {
func (c *Client) ReadChunk() (b []byte, err error) {
if err = c.conn.SetReadDeadline(time.Now().Add(time.Second * 5)); err != nil {
return
}
Expand All @@ -170,10 +172,52 @@ func (c *Client) Response() (b []byte, err error) {
return
}

func (c *Client) ReadPacket() (pType byte, payload []byte, err error) {
var b []byte

// many cameras may split packet to multiple chunks
// some rare cameras may put multiple packets to single chunk
for len(c.buf) < 16 {
if b, err = c.ReadChunk(); err != nil {
return 0, nil, err
}
c.buf = append(c.buf, b...)
}

if !bytes.HasPrefix(c.buf, []byte{0, 0, 1}) {
return 0, nil, fmt.Errorf("dvrip: wrong packet: %0.16x", c.buf)
}

var size int

switch pType = c.buf[3]; pType {
case 0xFC, 0xFE:
size = int(binary.LittleEndian.Uint32(c.buf[12:])) + 16
case 0xFD: // PFrame
size = int(binary.LittleEndian.Uint32(c.buf[4:])) + 8
case 0xFA, 0xF9:
size = int(binary.LittleEndian.Uint16(c.buf[6:])) + 8
default:
return 0, nil, fmt.Errorf("dvrip: unknown packet type: %X", pType)
}

for len(c.buf) < size {
if b, err = c.ReadChunk(); err != nil {
return 0, nil, err
}
c.buf = append(c.buf, b...)
}

payload = c.buf[:size]
c.buf = c.buf[size:]

return
}

type Response map[string]any

func (c *Client) ResponseJSON() (res Response, err error) {
b, err := c.Response()
func (c *Client) ReadJSON() (res Response, err error) {
b, err := c.ReadChunk()
if err != nil {
return
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/dvrip/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ func (c *Consumer) AddTrack(media *core.Media, _ *core.Codec, track *core.Receiv

for len(payload) >= PacketSize {
buf = append(buf[:8], payload[:PacketSize]...)
if n, err := c.client.Request(OPTalkData, buf); err != nil {
if n, err := c.client.WriteCmd(OPTalkData, buf); err != nil {
c.Send += n
}

Expand Down
113 changes: 48 additions & 65 deletions pkg/dvrip/producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,21 +29,21 @@ type Producer struct {

func (c *Producer) Start() error {
for {
tag, size, b, err := c.readPacket()
pType, b, err := c.client.ReadPacket()
if err != nil {
return err
}

//log.Printf("[DVR] type: %d, len: %d", dataType, len(b))

switch tag {
switch pType {
case 0xFC, 0xFE, 0xFD:
if c.video == nil {
continue
}

var payload []byte
if tag != 0xFD {
if pType != 0xFD {
payload = b[16:] // iframe
} else {
payload = b[8:] // pframe
Expand All @@ -65,31 +65,29 @@ func (c *Producer) Start() error {
continue
}

for b != nil {
payload := b[8:size]
if len(b) > size {
b = b[size:]
} else {
b = nil
}

c.audioTS += uint32(len(payload))
c.audioSeq++

packet := &rtp.Packet{
Header: rtp.Header{
Version: 2,
Marker: true,
SequenceNumber: c.audioSeq,
Timestamp: c.audioTS,
},
Payload: payload,
}

//log.Printf("[DVR] len: %d, ts: %10d", len(packet.Payload), packet.Timestamp)

c.audio.WriteRTP(packet)
payload := b[8:]

c.audioTS += uint32(len(payload))
c.audioSeq++

packet := &rtp.Packet{
Header: rtp.Header{
Version: 2,
Marker: true,
SequenceNumber: c.audioSeq,
Timestamp: c.audioTS,
},
Payload: payload,
}

//log.Printf("[DVR] len: %d, ts: %10d", len(packet.Payload), packet.Timestamp)

c.audio.WriteRTP(packet)

case 0xF9: // unknown

default:
println(fmt.Sprintf("dvrip: unknown packet type: %d", pType))
}
}
}
Expand All @@ -105,14 +103,33 @@ func (c *Producer) probe() error {

rd := core.NewReadBuffer(c.client.rd)
rd.BufferSize = core.ProbeSize
defer rd.Reset()
defer func() {
c.client.buf = nil
rd.Reset()
}()

c.client.rd = rd

timeout := time.Now().Add(core.ProbeTimeout)
// some awful cameras has VERY rare keyframes
// so we wait video+audio for default probe time
// and wait anything for 15 seconds
timeoutBoth := time.Now().Add(core.ProbeTimeout)
timeoutAny := time.Now().Add(time.Second * 15)

for (c.video == nil || c.audio == nil) && time.Now().Before(timeout) {
tag, _, b, err := c.readPacket()
for {
if now := time.Now(); now.Before(timeoutBoth) {
if c.video != nil && c.audio != nil {
return nil
}
} else if now.Before(timeoutAny) {
if c.video != nil || c.audio != nil {
return nil
}
} else {
return errors.New("dvrip: can't probe medias")
}

tag, b, err := c.client.ReadPacket()
if err != nil {
return err
}
Expand Down Expand Up @@ -147,40 +164,6 @@ func (c *Producer) probe() error {
c.addAudioTrack(b[4], b[5])
}
}

return nil
}

func (c *Producer) readPacket() (tag byte, size int, data []byte, err error) {
if data, err = c.client.Response(); err != nil {
return 0, 0, nil, err
}

switch tag = data[3]; tag {
case 0xFC, 0xFE:
size = int(binary.LittleEndian.Uint32(data[12:])) + 16
case 0xFD: // PFrame
size = int(binary.LittleEndian.Uint32(data[4:])) + 8
case 0xFA, 0xF9:
size = int(binary.LittleEndian.Uint16(data[6:])) + 8
default:
return 0, 0, nil, fmt.Errorf("unknown type: %X", tag)
}

// collect data from multiple packets
for len(data) < size {
b, err := c.client.Response()
if err != nil {
return 0, 0, nil, err
}
data = append(data, b...)
}

if len(data) > size {
return 0, 0, nil, errors.New("wrong size")
}

return
}

func (c *Producer) addVideoTrack(mediaCode byte, payload []byte) {
Expand Down

0 comments on commit cb710ea

Please sign in to comment.