Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Video selector #472

Merged
merged 16 commits into from
Aug 25, 2023
2 changes: 2 additions & 0 deletions pkg/errors/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ var (
ErrNoConfig = psrpc.NewErrorf(psrpc.Internal, "missing config")
ErrGhostPadFailed = psrpc.NewErrorf(psrpc.Internal, "failed to add ghost pad to bin")
ErrStreamAlreadyExists = psrpc.NewErrorf(psrpc.AlreadyExists, "stream already exists")
ErrBinAlreadyAdded = psrpc.NewErrorf(psrpc.Internal, "bin already added to pipeline")
ErrWrongHierarchy = psrpc.NewErrorf(psrpc.Internal, "pipeline can contain bins or elements, not both")
ErrNonStreamingPipeline = psrpc.NewErrorf(psrpc.InvalidArgument, "UpdateStream called on non-streaming egress")
ErrEgressNotFound = psrpc.NewErrorf(psrpc.NotFound, "egress not found")
ErrNoCompatibleCodec = psrpc.NewErrorf(psrpc.InvalidArgument, "no supported codec is compatible with all outputs")
Expand Down
17 changes: 17 additions & 0 deletions pkg/gstreamer/bin.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ type Bin struct {
getSrcPad func(string) *gst.Pad
getSinkPad func(string) *gst.Pad

added bool
srcs []*Bin // source bins
elements []*gst.Element // elements within this bin
queues map[string]*gst.Element // used with BinTypeMultiStream
Expand All @@ -59,6 +60,14 @@ func (b *Bin) AddSourceBin(src *Bin) error {
b.mu.Lock()
defer b.mu.Unlock()

src.mu.Lock()
alreadyAdded := src.added
src.added = true
src.mu.Unlock()
if alreadyAdded {
return errors.ErrBinAlreadyAdded
}

b.srcs = append(b.srcs, src)
if err := b.pipeline.Add(src.bin.Element); err != nil {
return errors.ErrGstPipelineError(err)
Expand Down Expand Up @@ -89,6 +98,14 @@ func (b *Bin) AddSinkBin(sink *Bin) error {
b.mu.Lock()
defer b.mu.Unlock()

sink.mu.Lock()
alreadyAdded := sink.added
sink.added = true
sink.mu.Unlock()
if alreadyAdded {
return errors.ErrBinAlreadyAdded
}

b.sinks = append(b.sinks, sink)
if err := b.pipeline.Add(sink.bin.Element); err != nil {
return errors.ErrGstPipelineError(err)
Expand Down
9 changes: 5 additions & 4 deletions pkg/gstreamer/callbacks.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package gstreamer

import (
"sync"
"time"

"github.com/tinyzimmer/go-gst/gst"

Expand All @@ -34,7 +35,7 @@ type Callbacks struct {
// source callbacks
onTrackAdded []func(*config.TrackSource)
onTrackMuted []func(string)
onTrackUnmuted []func(string)
onTrackUnmuted []func(string, time.Duration)
onTrackRemoved []func(string)

// internal
Expand Down Expand Up @@ -101,16 +102,16 @@ func (c *Callbacks) OnTrackMuted(trackID string) {
c.mu.RUnlock()
}

func (c *Callbacks) AddOnTrackUnmuted(f func(string)) {
func (c *Callbacks) AddOnTrackUnmuted(f func(string, time.Duration)) {
c.mu.Lock()
c.onTrackUnmuted = append(c.onTrackUnmuted, f)
c.mu.Unlock()
}

func (c *Callbacks) OnTrackUnmuted(trackID string) {
func (c *Callbacks) OnTrackUnmuted(trackID string, pts time.Duration) {
c.mu.RLock()
for _, onTrackUnmuted := range c.onTrackUnmuted {
onTrackUnmuted(trackID)
onTrackUnmuted(trackID, pts)
}
c.mu.RUnlock()
}
Expand Down
38 changes: 36 additions & 2 deletions pkg/gstreamer/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,10 @@ type Pipeline struct {

loop *glib.MainLoop

started core.Fuse
running chan struct{}
binsAdded bool
elementsAdded bool
started core.Fuse
running chan struct{}
}

// A pipeline can have either elements or src and sink bins. If you add both you will get a wrong hierarchy error
Expand All @@ -54,6 +56,38 @@ func NewPipeline(name string, latency uint64, callbacks *Callbacks) (*Pipeline,
}, nil
}

func (p *Pipeline) AddSourceBin(src *Bin) error {
if p.elementsAdded {
return errors.ErrWrongHierarchy
}
p.binsAdded = true
return p.Bin.AddSourceBin(src)
}

func (p *Pipeline) AddSinkBin(sink *Bin) error {
if p.elementsAdded {
return errors.ErrWrongHierarchy
}
p.binsAdded = true
return p.Bin.AddSinkBin(sink)
}

func (p *Pipeline) AddElement(e *gst.Element) error {
if p.binsAdded {
return errors.ErrWrongHierarchy
}
p.elementsAdded = true
return p.Bin.AddElement(e)
}

func (p *Pipeline) AddElements(elements ...*gst.Element) error {
if p.binsAdded {
return errors.ErrWrongHierarchy
}
p.elementsAdded = true
return p.Bin.AddElements(elements...)
}

func (p *Pipeline) Link() error {
return p.link()
}
Expand Down
64 changes: 27 additions & 37 deletions pkg/pipeline/builder/audio.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,6 @@ func buildWebAudioInput(b *gstreamer.Bin, p *config.PipelineConfig) error {
if err = pulseSrc.SetProperty("device", fmt.Sprintf("%s.monitor", p.Info.EgressId)); err != nil {
return errors.ErrGstPipelineError(err)
}

if err = b.AddElement(pulseSrc); err != nil {
return err
}
Expand All @@ -84,48 +83,40 @@ func buildWebAudioInput(b *gstreamer.Bin, p *config.PipelineConfig) error {

func buildSDKAudioInput(b *gstreamer.Bin, p *config.PipelineConfig) error {
if p.AudioTrack != nil {
appSrcBin, err := buildAudioAppSrcBin(b, p)
if err != nil {
if err := buildAudioAppSrcBin(b, p); err != nil {
return err
}
if err = b.AddSourceBin(appSrcBin); err != nil {
return err
}
}

testSrcBin, err := buildAudioTestSrcBin(b, p)
if err != nil {
return err
}
if err = b.AddSourceBin(testSrcBin); err != nil {
if err := buildAudioTestSrcBin(b, p); err != nil {
return err
}

if err = addAudioMixer(b, p); err != nil {
if err := addAudioMixer(b, p); err != nil {
return err
}
if p.AudioTranscoding {
if err = addAudioEncoder(b, p); err != nil {
if err := addAudioEncoder(b, p); err != nil {
return err
}
}

return nil
}

func buildAudioAppSrcBin(audioBin *gstreamer.Bin, p *config.PipelineConfig) (*gstreamer.Bin, error) {
func buildAudioAppSrcBin(audioBin *gstreamer.Bin, p *config.PipelineConfig) error {
track := p.AudioTrack

b := audioBin.NewBin(track.TrackID)
b.SetEOSFunc(track.EOSFunc)
if err := audioBin.AddSourceBin(b); err != nil {
return err
}

track.AppSrc.Element.SetArg("format", "time")
if err := track.AppSrc.Element.SetProperty("is-live", true); err != nil {
return nil, err
return err
}

if err := b.AddElement(track.AppSrc.Element); err != nil {
return nil, err
return err
}

switch track.MimeType {
Expand All @@ -134,61 +125,60 @@ func buildAudioAppSrcBin(audioBin *gstreamer.Bin, p *config.PipelineConfig) (*gs
"application/x-rtp,media=audio,payload=%d,encoding-name=OPUS,clock-rate=%d",
track.PayloadType, track.ClockRate,
))); err != nil {
return nil, errors.ErrGstPipelineError(err)
return errors.ErrGstPipelineError(err)
}

rtpOpusDepay, err := gst.NewElement("rtpopusdepay")
if err != nil {
return nil, errors.ErrGstPipelineError(err)
return errors.ErrGstPipelineError(err)
}

opusDec, err := gst.NewElement("opusdec")
if err != nil {
return nil, errors.ErrGstPipelineError(err)
return errors.ErrGstPipelineError(err)
}

if err = b.AddElements(rtpOpusDepay, opusDec); err != nil {
return nil, err
return err
}

default:
return nil, errors.ErrNotSupported(string(track.MimeType))
return errors.ErrNotSupported(string(track.MimeType))
}

if err := addAudioConverter(b, p); err != nil {
return nil, err
return err
}

return b, nil
return nil
}

func buildAudioTestSrcBin(audioBin *gstreamer.Bin, p *config.PipelineConfig) (*gstreamer.Bin, error) {
func buildAudioTestSrcBin(audioBin *gstreamer.Bin, p *config.PipelineConfig) error {
b := audioBin.NewBin("audio_test_src")
if err := audioBin.AddSourceBin(b); err != nil {
return err
}

audioTestSrc, err := gst.NewElement("audiotestsrc")
if err != nil {
return nil, errors.ErrGstPipelineError(err)
return errors.ErrGstPipelineError(err)
}
if err = audioTestSrc.SetProperty("volume", 0.0); err != nil {
return nil, errors.ErrGstPipelineError(err)
return errors.ErrGstPipelineError(err)
}
if err = audioTestSrc.SetProperty("do-timestamp", true); err != nil {
return nil, errors.ErrGstPipelineError(err)
return errors.ErrGstPipelineError(err)
}
if err = audioTestSrc.SetProperty("is-live", true); err != nil {
return nil, errors.ErrGstPipelineError(err)
return errors.ErrGstPipelineError(err)
}

audioCaps, err := newAudioCapsFilter(p)
if err != nil {
return nil, err
}

if err = b.AddElements(audioTestSrc, audioCaps); err != nil {
return nil, err
return err
}

return b, nil
return b.AddElements(audioTestSrc, audioCaps)
}

func addAudioConverter(b *gstreamer.Bin, p *config.PipelineConfig) error {
Expand Down
4 changes: 2 additions & 2 deletions pkg/pipeline/builder/segment.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,8 +95,8 @@ func BuildSegmentBin(pipeline *gstreamer.Pipeline, p *config.PipelineConfig) (*g
return nil, errors.ErrGstPipelineError(err)
}

b.SetGetSrcPad(func(b string) *gst.Pad {
if b == "audio" {
b.SetGetSrcPad(func(name string) *gst.Pad {
if name == "audio" {
return sink.GetRequestPad("audio_%u")
} else {
return h264parse.GetStaticPad("sink")
Expand Down
Loading