Skip to content

Commit

Permalink
hls: fix discontinuity of TS counters between segments
Browse files Browse the repository at this point in the history
  • Loading branch information
aler9 committed Sep 7, 2021
1 parent 98bf53a commit ef4b925
Show file tree
Hide file tree
Showing 5 changed files with 253 additions and 215 deletions.
130 changes: 23 additions & 107 deletions internal/hls/muxer.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@ import (
"time"

"github.com/aler9/gortsplib"

"github.com/aler9/rtsp-simple-server/internal/h264"
)

const (
Expand All @@ -18,20 +16,9 @@ const (

// Muxer is a HLS muxer.
type Muxer struct {
hlsSegmentCount int
hlsSegmentDuration time.Duration
videoTrack *gortsplib.Track
audioTrack *gortsplib.Track

h264Conf *gortsplib.TrackConfigH264
aacConf *gortsplib.TrackConfigAAC
videoDTSEst *h264.DTSEstimator
audioAUCount int
currentSegment *segment
startPCR time.Time
startPTS time.Duration
primaryPlaylist *primaryPlaylist
streamPlaylist *streamPlaylist
primaryPlaylist *muxerPrimaryPlaylist
streamPlaylist *muxerStreamPlaylist
tsGenerator *muxerTSGenerator
}

// NewMuxer allocates a Muxer.
Expand All @@ -58,16 +45,23 @@ func NewMuxer(
}
}

primaryPlaylist := newMuxerPrimaryPlaylist(videoTrack, audioTrack, h264Conf)

streamPlaylist := newMuxerStreamPlaylist(hlsSegmentCount)

tsGenerator := newMuxerTSGenerator(
hlsSegmentCount,
hlsSegmentDuration,
videoTrack,
audioTrack,
h264Conf,
aacConf,
streamPlaylist)

m := &Muxer{
hlsSegmentCount: hlsSegmentCount,
hlsSegmentDuration: hlsSegmentDuration,
videoTrack: videoTrack,
audioTrack: audioTrack,
h264Conf: h264Conf,
aacConf: aacConf,
currentSegment: newSegment(videoTrack, audioTrack, h264Conf, aacConf),
primaryPlaylist: newPrimaryPlaylist(videoTrack, audioTrack, h264Conf),
streamPlaylist: newStreamPlaylist(hlsSegmentCount),
primaryPlaylist: primaryPlaylist,
streamPlaylist: streamPlaylist,
tsGenerator: tsGenerator,
}

return m, nil
Expand All @@ -80,93 +74,15 @@ func (m *Muxer) Close() {

// WriteH264 writes H264 NALUs, grouped by PTS, into the muxer.
func (m *Muxer) WriteH264(pts time.Duration, nalus [][]byte) error {
idrPresent := func() bool {
for _, nalu := range nalus {
typ := h264.NALUType(nalu[0] & 0x1F)
if typ == h264.NALUTypeIDR {
return true
}
}
return false
}()

// skip group silently until we find one with a IDR
if !m.currentSegment.firstPacketWritten && !idrPresent {
return nil
}

if m.currentSegment.firstPacketWritten {
if idrPresent &&
m.currentSegment.duration() >= m.hlsSegmentDuration {
m.streamPlaylist.pushSegment(m.currentSegment)

m.currentSegment = newSegment(m.videoTrack, m.audioTrack, m.h264Conf, m.aacConf)
m.currentSegment.setStartPCR(m.startPCR)
}
} else {
m.startPCR = time.Now()
m.startPTS = pts
m.currentSegment.setStartPCR(m.startPCR)
m.videoDTSEst = h264.NewDTSEstimator()
}

pts -= m.startPTS

err := m.currentSegment.writeH264(
m.videoDTSEst.Feed(pts)+pcrOffset,
pts+pcrOffset,
idrPresent,
nalus)
if err != nil {
return err
}

return nil
return m.tsGenerator.writeH264(pts, nalus)
}

// WriteAAC writes AAC AUs, grouped by PTS, into the muxer.
func (m *Muxer) WriteAAC(pts time.Duration, aus [][]byte) error {
if m.videoTrack == nil {
if m.currentSegment.firstPacketWritten {
if m.audioAUCount >= segmentMinAUCount &&
m.currentSegment.duration() >= m.hlsSegmentDuration {
m.audioAUCount = 0

m.streamPlaylist.pushSegment(m.currentSegment)

m.currentSegment = newSegment(m.videoTrack, m.audioTrack, m.h264Conf, m.aacConf)
m.currentSegment.setStartPCR(m.startPCR)
}
} else {
m.startPCR = time.Now()
m.startPTS = pts
m.currentSegment.setStartPCR(m.startPCR)
}
} else {
if !m.currentSegment.firstPacketWritten {
return nil
}
}

pts = pts - m.startPTS + pcrOffset

for _, au := range aus {
err := m.currentSegment.writeAAC(pts, au)
if err != nil {
return err
}

if m.videoTrack == nil {
m.audioAUCount++
}

pts += 1000 * time.Second / time.Duration(m.aacConf.SampleRate)
}

return nil
return m.tsGenerator.writeAAC(pts, aus)
}

// PrimaryPlaylist returns a reader to read the primary playlist
// PrimaryPlaylist returns a reader to read the primary playlist.
func (m *Muxer) PrimaryPlaylist() io.Reader {
return m.primaryPlaylist.reader()
}
Expand All @@ -176,7 +92,7 @@ func (m *Muxer) StreamPlaylist() io.Reader {
return m.streamPlaylist.reader()
}

// Segment returns a reader to read a segment.
// Segment returns a reader to read a segment listed in the stream playlist.
func (m *Muxer) Segment(fname string) io.Reader {
return m.streamPlaylist.segment(fname)
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,20 +9,20 @@ import (
"github.com/aler9/gortsplib"
)

type primaryPlaylist struct {
type muxerPrimaryPlaylist struct {
videoTrack *gortsplib.Track
audioTrack *gortsplib.Track
h264Conf *gortsplib.TrackConfigH264

cnt []byte
}

func newPrimaryPlaylist(
func newMuxerPrimaryPlaylist(
videoTrack *gortsplib.Track,
audioTrack *gortsplib.Track,
h264Conf *gortsplib.TrackConfigH264,
) *primaryPlaylist {
p := &primaryPlaylist{
) *muxerPrimaryPlaylist {
p := &muxerPrimaryPlaylist{
videoTrack: videoTrack,
audioTrack: audioTrack,
h264Conf: h264Conf,
Expand All @@ -45,6 +45,6 @@ func newPrimaryPlaylist(
return p
}

func (p *primaryPlaylist) reader() io.Reader {
func (p *muxerPrimaryPlaylist) reader() io.Reader {
return bytes.NewReader(p.cnt)
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,27 +21,27 @@ func (r *asyncReader) Read(buf []byte) (int, error) {
return r.reader.Read(buf)
}

type streamPlaylist struct {
type muxerStreamPlaylist struct {
hlsSegmentCount int

mutex sync.Mutex
cond *sync.Cond
closed bool
segments []*segment
segmentByName map[string]*segment
segments []*muxerTSSegment
segmentByName map[string]*muxerTSSegment
segmentDeleteCount int
}

func newStreamPlaylist(hlsSegmentCount int) *streamPlaylist {
p := &streamPlaylist{
func newMuxerStreamPlaylist(hlsSegmentCount int) *muxerStreamPlaylist {
p := &muxerStreamPlaylist{
hlsSegmentCount: hlsSegmentCount,
segmentByName: make(map[string]*segment),
segmentByName: make(map[string]*muxerTSSegment),
}
p.cond = sync.NewCond(&p.mutex)
return p
}

func (p *streamPlaylist) close() {
func (p *muxerStreamPlaylist) close() {
func() {
p.mutex.Lock()
defer p.mutex.Unlock()
Expand All @@ -51,7 +51,7 @@ func (p *streamPlaylist) close() {
p.cond.Broadcast()
}

func (p *streamPlaylist) reader() io.Reader {
func (p *muxerStreamPlaylist) reader() io.Reader {
return &asyncReader{generator: func() []byte {
p.mutex.Lock()
defer p.mutex.Unlock()
Expand Down Expand Up @@ -94,7 +94,7 @@ func (p *streamPlaylist) reader() io.Reader {
}}
}

func (p *streamPlaylist) segment(fname string) io.Reader {
func (p *muxerStreamPlaylist) segment(fname string) io.Reader {
base := strings.TrimSuffix(fname, ".ts")

p.mutex.Lock()
Expand All @@ -108,7 +108,7 @@ func (p *streamPlaylist) segment(fname string) io.Reader {
return f.reader()
}

func (p *streamPlaylist) pushSegment(t *segment) {
func (p *muxerStreamPlaylist) pushSegment(t *muxerTSSegment) {
func() {
p.mutex.Lock()
defer p.mutex.Unlock()
Expand Down
Loading

0 comments on commit ef4b925

Please sign in to comment.