Skip to content

Commit

Permalink
adjust buffer sizes to avoid memory leaks (#43)
Browse files Browse the repository at this point in the history
  • Loading branch information
aler9 committed Jul 20, 2020
1 parent 25f59ab commit 50e2d86
Show file tree
Hide file tree
Showing 3 changed files with 17 additions and 10 deletions.
10 changes: 8 additions & 2 deletions server-client.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@ import (
const (
clientCheckStreamInterval = 5 * time.Second
clientReceiverReportInterval = 10 * time.Second
clientTcpReadBufferSize = 128 * 1024
clientTcpWriteBufferSize = 128 * 1024
clientUdpReadBufferSize = 2048
clientUdpWriteBufferSize = 128 * 1024
)

type serverClientTrack struct {
Expand Down Expand Up @@ -98,7 +102,7 @@ func newServerClient(p *program, nconn net.Conn) *serverClient {
WriteTimeout: p.conf.WriteTimeout,
}),
state: clientStateStarting,
readBuf: newDoubleBuffer(512 * 1024),
readBuf: newDoubleBuffer(clientTcpReadBufferSize),
done: make(chan struct{}),
}

Expand Down Expand Up @@ -788,7 +792,7 @@ func (c *serverClient) runPlay(path string) {
pconf := c.findConfForPath(path)

if c.streamProtocol == gortsplib.StreamProtocolTcp {
c.writeBuf = newDoubleBuffer(2048)
c.writeBuf = newDoubleBuffer(clientTcpWriteBufferSize)
c.events = make(chan serverClientEvent)
}

Expand Down Expand Up @@ -818,6 +822,7 @@ func (c *serverClient) runPlay(path string) {
readDone := make(chan error)
go func() {
buf := make([]byte, 2048)

for {
_, err := c.conn.NetConn().Read(buf)
if err != nil {
Expand Down Expand Up @@ -920,6 +925,7 @@ func (c *serverClient) runRecord(path string) {
for {
frame.Content = c.readBuf.swap()
frame.Content = frame.Content[:cap(frame.Content)]

recv, err := c.conn.ReadFrameOrRequest(frame)
if err != nil {
readDone <- err
Expand Down
4 changes: 2 additions & 2 deletions server-udpl.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@ func newServerUdpListener(p *program, port int, streamType gortsplib.StreamType)
p: p,
nconn: nconn,
streamType: streamType,
readBuf: newDoubleBuffer(2048),
writeBuf: newDoubleBuffer(2048),
readBuf: newDoubleBuffer(clientUdpReadBufferSize),
writeBuf: newDoubleBuffer(clientUdpWriteBufferSize),
writeChan: make(chan *udpAddrBufPair),
done: make(chan struct{}),
}
Expand Down
13 changes: 7 additions & 6 deletions source.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,9 @@ import (
)

const (
sourceRetryInterval = 5 * time.Second
sourceRetryInterval = 5 * time.Second
sourceUdpReadBufferSize = 2048
sourceTcpReadBufferSize = 128 * 1024
)

type source struct {
Expand All @@ -26,7 +28,6 @@ type source struct {
tracks []*gortsplib.Track
serverSdpText []byte
serverSdpParsed *sdp.SessionDescription
readBuf *doubleBuffer

terminate chan struct{}
done chan struct{}
Expand Down Expand Up @@ -71,7 +72,6 @@ func newSource(p *program, path string, sourceStr string, sourceProtocol string)
path: path,
u: u,
proto: proto,
readBuf: newDoubleBuffer(512 * 1024),
terminate: make(chan struct{}),
done: make(chan struct{}),
}
Expand Down Expand Up @@ -226,7 +226,7 @@ func (s *source) runUdp(conn *gortsplib.ConnClient) bool {
go func(trackId int, l *gortsplib.ConnClientUdpListener) {
defer wg.Done()

doubleBuf := newDoubleBuffer(2048)
doubleBuf := newDoubleBuffer(sourceUdpReadBufferSize)
for {
buf := doubleBuf.swap()

Expand All @@ -243,7 +243,7 @@ func (s *source) runUdp(conn *gortsplib.ConnClient) bool {
go func(trackId int, l *gortsplib.ConnClientUdpListener) {
defer wg.Done()

doubleBuf := newDoubleBuffer(2048)
doubleBuf := newDoubleBuffer(sourceUdpReadBufferSize)
for {
buf := doubleBuf.swap()

Expand Down Expand Up @@ -309,11 +309,12 @@ func (s *source) runTcp(conn *gortsplib.ConnClient) bool {
s.p.events <- programEventStreamerReady{s}

frame := &gortsplib.InterleavedFrame{}
doubleBuf := newDoubleBuffer(sourceTcpReadBufferSize)

tcpConnDone := make(chan error)
go func() {
for {
frame.Content = s.readBuf.swap()
frame.Content = doubleBuf.swap()
frame.Content = frame.Content[:cap(frame.Content)]

err := conn.ReadFrame(frame)
Expand Down

0 comments on commit 50e2d86

Please sign in to comment.