diff --git a/go.mod b/go.mod index 7ab19f51..e74d05bc 100644 --- a/go.mod +++ b/go.mod @@ -17,11 +17,11 @@ require ( github.com/go-logr/logr v1.2.4 github.com/googleapis/gax-go/v2 v2.11.0 github.com/gorilla/websocket v1.5.0 - github.com/livekit/livekit-server v1.4.4-0.20230612120056-afa773374840 + github.com/livekit/livekit-server v1.4.5-0.20230814182001-77c8e824735b github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1 github.com/livekit/protocol v1.6.0 github.com/livekit/psrpc v0.3.2 - github.com/livekit/server-sdk-go v1.0.16-0.20230811173715-bab67d4a7670 + github.com/livekit/server-sdk-go v1.0.16-0.20230815025737-c12cd2eb8fe8 github.com/pion/rtp v1.8.1 github.com/pion/webrtc/v3 v3.2.14 github.com/pkg/errors v0.9.1 @@ -73,7 +73,7 @@ require ( github.com/klauspost/compress v1.16.5 // indirect github.com/klauspost/cpuid/v2 v2.0.9 // indirect github.com/lithammer/shortuuid/v4 v4.0.0 // indirect - github.com/livekit/mediatransportutil v0.0.0-20230716190407-fc4944cbc33a // indirect + github.com/livekit/mediatransportutil v0.0.0-20230814030822-8d5de0008b08 // indirect github.com/mackerelio/go-osstat v0.2.4 // indirect github.com/magefile/mage v1.15.0 // indirect github.com/mailru/easyjson v0.7.7 // indirect @@ -85,7 +85,7 @@ require ( github.com/nats-io/nuid v1.0.1 // indirect github.com/pion/datachannel v1.5.5 // indirect github.com/pion/dtls/v2 v2.2.7 // indirect - github.com/pion/ice/v2 v2.3.9 // indirect + github.com/pion/ice/v2 v2.3.10 // indirect github.com/pion/interceptor v0.1.17 // indirect github.com/pion/logging v0.2.2 // indirect github.com/pion/mdns v0.0.7 // indirect @@ -96,7 +96,7 @@ require ( github.com/pion/srtp/v2 v2.0.16 // indirect github.com/pion/stun v0.6.1 // indirect github.com/pion/transport/v2 v2.2.1 // indirect - github.com/pion/turn/v2 v2.1.2 // indirect + github.com/pion/turn/v2 v2.1.3 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect github.com/prometheus/client_model v0.3.0 // indirect github.com/prometheus/common v0.42.0 // indirect @@ -111,8 +111,8 @@ require ( go.uber.org/goleak v1.1.12 // indirect go.uber.org/multierr v1.6.0 // indirect golang.org/x/crypto v0.11.0 // indirect - golang.org/x/exp v0.0.0-20230801115018-d63ba01acd4b // indirect - golang.org/x/net v0.12.0 // indirect + golang.org/x/exp v0.0.0-20230810033253-352e893a4cad // indirect + golang.org/x/net v0.13.0 // indirect golang.org/x/oauth2 v0.9.0 // indirect golang.org/x/sync v0.3.0 // indirect golang.org/x/sys v0.10.0 // indirect diff --git a/go.sum b/go.sum index 85c6fcfc..d0281fc6 100644 --- a/go.sum +++ b/go.sum @@ -171,18 +171,18 @@ github.com/livekit/go-glib v0.0.0-20230223001336-834490045522 h1:AlU57PAPgzde7q9 github.com/livekit/go-glib v0.0.0-20230223001336-834490045522/go.mod h1:ltV0gO6xNFzZhsIRbFXv8RTq9NGoNT2dmAER4YmZfaM= github.com/livekit/go-gst v0.2.34-0.20230623013326-3ca8cb8f62e5 h1:psIQLAp+yJETA+vZTVphV072nvCgcwC55xcufo1I5v0= github.com/livekit/go-gst v0.2.34-0.20230623013326-3ca8cb8f62e5/go.mod h1:0hI+orMYVT61TEh429LvmoV9UmyqjeTqdJ3DW2TX114= -github.com/livekit/livekit-server v1.4.4-0.20230612120056-afa773374840 h1:bi2ZWnpdrSu18Tx344mTIoRO0qSVqgvCC++nAADxto4= -github.com/livekit/livekit-server v1.4.4-0.20230612120056-afa773374840/go.mod h1:V8OlzLcbkG/8rQLLphsT379avf/CvXHRW8YlslL9q2M= +github.com/livekit/livekit-server v1.4.5-0.20230814182001-77c8e824735b h1:o+fyF5iijKHAMbzXFW6g4pRH/bR/cELRbTZHwRheF24= +github.com/livekit/livekit-server v1.4.5-0.20230814182001-77c8e824735b/go.mod h1:Dbul6eFPxLw/ItxGjovO1a2Gzrs2EHEuXgjADz6squI= github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1 h1:jm09419p0lqTkDaKb5iXdynYrzB84ErPPO4LbRASk58= github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1/go.mod h1:Rs3MhFwutWhGwmY1VQsygw28z5bWcnEYmS1OG9OxjOQ= -github.com/livekit/mediatransportutil v0.0.0-20230716190407-fc4944cbc33a h1:JWpPHcMFuw0fP4swE89CfMgeUXiSN5IKvCJL/5HLI3A= -github.com/livekit/mediatransportutil v0.0.0-20230716190407-fc4944cbc33a/go.mod h1:xirUXW8xnLGmfCwUeAv/nj1VGo1OO1BmgxrYP7jK/14= +github.com/livekit/mediatransportutil v0.0.0-20230814030822-8d5de0008b08 h1:e0qwVjrtzmADgNZpdgSJgyhlF6BgrHkpdnkONL8pLrw= +github.com/livekit/mediatransportutil v0.0.0-20230814030822-8d5de0008b08/go.mod h1:xirUXW8xnLGmfCwUeAv/nj1VGo1OO1BmgxrYP7jK/14= github.com/livekit/protocol v1.6.0 h1:19S+vFZqnivKIOpyR3DEK/mSaykQ3UEf7H2G/mBOE54= github.com/livekit/protocol v1.6.0/go.mod h1:SUS9foM1xBzw/AFrgTJuFX/oSuwlnIbHmpdiPdCvwEM= github.com/livekit/psrpc v0.3.2 h1:eAaJhASme33gtoBhCRLH9jsnWcdm1tHWf0WzaDk56ew= github.com/livekit/psrpc v0.3.2/go.mod h1:n6JntEg+zT6Ji8InoyTpV7wusPNwGqqtxmHlkNhDN0U= -github.com/livekit/server-sdk-go v1.0.16-0.20230811173715-bab67d4a7670 h1:s22ZySAWzFymYn4B577XfyfDGULFJbyzu2nzohOA1GM= -github.com/livekit/server-sdk-go v1.0.16-0.20230811173715-bab67d4a7670/go.mod h1:4OIkcDpQJuOQJEThr+Z8+WQfkcs12BXC+9cuhigLnDM= +github.com/livekit/server-sdk-go v1.0.16-0.20230815025737-c12cd2eb8fe8 h1:Vm64l3U2rWAB2IdG/0/CXIZTgVPmldsQrDAOcq+zQQs= +github.com/livekit/server-sdk-go v1.0.16-0.20230815025737-c12cd2eb8fe8/go.mod h1:4OIkcDpQJuOQJEThr+Z8+WQfkcs12BXC+9cuhigLnDM= github.com/mackerelio/go-osstat v0.2.4 h1:qxGbdPkFo65PXOb/F/nhDKpF2nGmGaCFDLXoZjJTtUs= github.com/mackerelio/go-osstat v0.2.4/go.mod h1:Zy+qzGdZs3A9cuIqmgbJvwbmLQH9dJvtio5ZjJTbdlQ= github.com/magefile/mage v1.15.0 h1:BvGheCMAsG3bWUDbZ8AyXXpCNwU9u5CB6sM+HNb9HYg= @@ -219,8 +219,9 @@ github.com/pion/datachannel v1.5.5 h1:10ef4kwdjije+M9d7Xm9im2Y3O6A6ccQb0zcqZcJew github.com/pion/datachannel v1.5.5/go.mod h1:iMz+lECmfdCMqFRhXhcA/219B0SQlbpoR2V118yimL0= github.com/pion/dtls/v2 v2.2.7 h1:cSUBsETxepsCSFSxC3mc/aDo14qQLMSL+O6IjG28yV8= github.com/pion/dtls/v2 v2.2.7/go.mod h1:8WiMkebSHFD0T+dIU+UeBaoV7kDhOW5oDCzZ7WZ/F9s= -github.com/pion/ice/v2 v2.3.9 h1:7yZpHf3PhPxJGT4JkMj1Y8Rl5cQ6fB709iz99aeMd/U= github.com/pion/ice/v2 v2.3.9/go.mod h1:lT3kv5uUIlHfXHU/ZRD7uKD/ufM202+eTa3C/umgGf4= +github.com/pion/ice/v2 v2.3.10 h1:T3bUJKqh7pGEdMyTngUcTeQd6io9X8JjgsVWZDannnY= +github.com/pion/ice/v2 v2.3.10/go.mod h1:hHGCibDfmXGqukayQw979xEctASp2Pe5Oe0iDU8pRus= github.com/pion/interceptor v0.1.17 h1:prJtgwFh/gB8zMqGZoOgJPHivOwVAp61i2aG61Du/1w= github.com/pion/interceptor v0.1.17/go.mod h1:SY8kpmfVBvrbUzvj2bsXz7OJt5JvmVNZ+4Kjq7FcwrI= github.com/pion/logging v0.2.2 h1:M9+AIj/+pxNsDfAT64+MAVgJO0rsyLnoJKCqf//DoeY= @@ -251,8 +252,9 @@ github.com/pion/transport/v2 v2.1.0/go.mod h1:AdSw4YBZVDkZm8fpoz+fclXyQwANWmZAlD github.com/pion/transport/v2 v2.2.0/go.mod h1:AdSw4YBZVDkZm8fpoz+fclXyQwANWmZAlDuQdctTThQ= github.com/pion/transport/v2 v2.2.1 h1:7qYnCBlpgSJNYMbLCKuSY9KbQdBFoETvPNETv0y4N7c= github.com/pion/transport/v2 v2.2.1/go.mod h1:cXXWavvCnFF6McHTft3DWS9iic2Mftcz1Aq29pGcU5g= -github.com/pion/turn/v2 v2.1.2 h1:wj0cAoGKltaZ790XEGW9HwoUewqjliwmhtxCuB2ApyM= github.com/pion/turn/v2 v2.1.2/go.mod h1:1kjnPkBcex3dhCU2Am+AAmxDcGhLX3WnMfmkNpvSTQU= +github.com/pion/turn/v2 v2.1.3 h1:pYxTVWG2gpC97opdRc5IGsQ1lJ9O/IlNhkzj7MMrGAA= +github.com/pion/turn/v2 v2.1.3/go.mod h1:huEpByKKHix2/b9kmTAM3YoX6MKP+/D//0ClgUYR2fY= github.com/pion/webrtc/v3 v3.2.14 h1:GlqnBnnLlcYYA/LOwqLLU1plZYwx0Y/e/57bZ2tzQcU= github.com/pion/webrtc/v3 v3.2.14/go.mod h1:r1mtixc2MH847mmQTPwlEvGge7D18C2T5qp8jI9Lm44= github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= @@ -329,8 +331,8 @@ golang.org/x/crypto v0.10.0/go.mod h1:o4eNf7Ede1fv+hwOwZsTHl9EsPFO6q6ZvYR8vYfY45 golang.org/x/crypto v0.11.0 h1:6Ewdq3tDic1mg5xRO4milcWCfMVQhI4NkqWWvqejpuA= golang.org/x/crypto v0.11.0/go.mod h1:xgJhtzW8F9jGdVFWZESrid1U1bjeNy4zgy5cRr/CIio= golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= -golang.org/x/exp v0.0.0-20230801115018-d63ba01acd4b h1:r+vk0EmXNmekl0S0BascoeeoHk/L7wmaW2QF90K+kYI= -golang.org/x/exp v0.0.0-20230801115018-d63ba01acd4b/go.mod h1:FXUEEKJgO7OQYeo8N01OfiKP8RXMtf6e8aTskBGqWdc= +golang.org/x/exp v0.0.0-20230810033253-352e893a4cad h1:g0bG7Z4uG+OgH2QDODnjp6ggkk1bJDsINcuWmJN1iJU= +golang.org/x/exp v0.0.0-20230810033253-352e893a4cad/go.mod h1:FXUEEKJgO7OQYeo8N01OfiKP8RXMtf6e8aTskBGqWdc= golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE= golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvxsM5YxQ5yQlVC4a0KAMCusXpPoU= golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc= @@ -366,8 +368,9 @@ golang.org/x/net v0.8.0/go.mod h1:QVkue5JL9kW//ek3r6jTKnTFis1tRmNAW2P1shuFdJc= golang.org/x/net v0.9.0/go.mod h1:d48xBJpPfHeWQsugry2m+kC02ZBRGRgulfHnEXEuWns= golang.org/x/net v0.10.0/go.mod h1:0qNGK6F8kojg2nk9dLZ2mShWaEBan6FAoqfSigmmuDg= golang.org/x/net v0.11.0/go.mod h1:2L/ixqYpgIVXmeoSA/4Lu7BzTG4KIyPIryS4IsOd1oQ= -golang.org/x/net v0.12.0 h1:cfawfvKITfUsFCeJIHJrbSxpeu/E81khclypR0GVT50= golang.org/x/net v0.12.0/go.mod h1:zEVYFnQC7m/vmpQFELhcD1EWkZlX69l4oqgmer6hfKA= +golang.org/x/net v0.13.0 h1:Nvo8UFsZ8X3BhAC9699Z1j7XQ3rsZnUUm7jfBEk1ueY= +golang.org/x/net v0.13.0/go.mod h1:zEVYFnQC7m/vmpQFELhcD1EWkZlX69l4oqgmer6hfKA= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/oauth2 v0.0.0-20200107190931-bf48bf16ab8d/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= golang.org/x/oauth2 v0.9.0 h1:BPpt2kU7oMRq3kCHAA1tbSEshXRw1LpG2ztgDwrzuAs= diff --git a/pkg/config/pipeline.go b/pkg/config/pipeline.go index cd3ee245..36ef8fb5 100644 --- a/pkg/config/pipeline.go +++ b/pkg/config/pipeline.go @@ -90,10 +90,12 @@ type SDKSourceParams struct { } type TrackSource struct { - TrackID string - Kind lksdk.TrackKind - AppSrc *app.Source - Codec webrtc.RTPCodecParameters + TrackID string + Kind lksdk.TrackKind + AppSrc *app.Source + MimeType types.MimeType + PayloadType webrtc.PayloadType + ClockRate uint32 } type AudioConfig struct { @@ -390,8 +392,7 @@ func (p *PipelineConfig) Update(request *rpc.StartEgressRequest) error { } } - if p.TrackID == "" { - // Track egress output format decision happens after join + if p.RequestType != types.RequestTypeTrack { err := p.validateAndUpdateOutputParams() if err != nil { return err diff --git a/pkg/errors/errors.go b/pkg/errors/errors.go index 9341ec69..6cc46756 100644 --- a/pkg/errors/errors.go +++ b/pkg/errors/errors.go @@ -31,7 +31,6 @@ var ( ErrNoCompatibleCodec = psrpc.NewErrorf(psrpc.InvalidArgument, "no supported codec is compatible with all outputs") ErrNoCompatibleFileOutputType = psrpc.NewErrorf(psrpc.InvalidArgument, "no supported file output type is compatible with the selected codecs") ErrResourceExhausted = psrpc.NewErrorf(psrpc.ResourceExhausted, "not enough CPU") - ErrVideoWebsocket = psrpc.NewErrorf(psrpc.InvalidArgument, "cannot send video over websocket") ErrInvalidTrack = psrpc.NewErrorf(psrpc.Internal, "unexpected track type") ) diff --git a/pkg/pipeline/input/audio.go b/pkg/pipeline/input/audio.go index 3446d8d2..c9bdd7f5 100644 --- a/pkg/pipeline/input/audio.go +++ b/pkg/pipeline/input/audio.go @@ -16,7 +16,6 @@ package input import ( "fmt" - "strings" "github.com/tinyzimmer/go-gst/gst" @@ -72,14 +71,12 @@ func (a *audioInput) buildAppSource(track *config.TrackSource) error { } a.src = []*gst.Element{track.AppSrc.Element} - switch { - case strings.EqualFold(track.Codec.MimeType, string(types.MimeTypeOpus)): - if err := track.AppSrc.Element.SetProperty("caps", gst.NewCapsFromString( - fmt.Sprintf( - "application/x-rtp,media=audio,payload=%d,encoding-name=OPUS,clock-rate=%d", - track.Codec.PayloadType, track.Codec.ClockRate, - ), - )); err != nil { + switch track.MimeType { + case types.MimeTypeOpus: + if err := track.AppSrc.Element.SetProperty("caps", gst.NewCapsFromString(fmt.Sprintf( + "application/x-rtp,media=audio,payload=%d,encoding-name=OPUS,clock-rate=%d", + track.PayloadType, track.ClockRate, + ))); err != nil { return errors.ErrGstPipelineError(err) } @@ -96,7 +93,7 @@ func (a *audioInput) buildAppSource(track *config.TrackSource) error { a.src = append(a.src, rtpOpusDepay, opusDec) default: - return errors.ErrNotSupported(track.Codec.MimeType) + return errors.ErrNotSupported(string(track.MimeType)) } return nil @@ -209,9 +206,10 @@ func newAudioCapsFilter(p *config.PipelineConfig) (*gst.Element, error) { "audio/x-raw,format=S16LE,layout=interleaved,rate=48000,channels=2", ) case types.MimeTypeAAC: - caps = gst.NewCapsFromString( - fmt.Sprintf("audio/x-raw,format=S16LE,layout=interleaved,rate=%d,channels=2", p.AudioFrequency), - ) + caps = gst.NewCapsFromString(fmt.Sprintf( + "audio/x-raw,format=S16LE,layout=interleaved,rate=%d,channels=2", + p.AudioFrequency, + )) default: return nil, errors.ErrNotSupported(string(p.AudioOutCodec)) } diff --git a/pkg/pipeline/input/video.go b/pkg/pipeline/input/video.go index 00803757..d979af87 100644 --- a/pkg/pipeline/input/video.go +++ b/pkg/pipeline/input/video.go @@ -16,7 +16,6 @@ package input import ( "fmt" - "strings" "github.com/tinyzimmer/go-gst/gst" @@ -60,8 +59,10 @@ func (v *videoInput) buildWebInput(p *config.PipelineConfig) error { if err != nil { return errors.ErrGstPipelineError(err) } - if err = caps.SetProperty("caps", gst.NewCapsFromString( - fmt.Sprintf("video/x-raw,framerate=%d/1", p.Framerate), + if err = caps.SetProperty("caps", gst.NewCapsFromString(fmt.Sprintf( + "video/x-raw,framerate=%d/1", + p.Framerate, + ), )); err != nil { return errors.ErrGstPipelineError(err) } @@ -87,11 +88,11 @@ func (v *videoInput) buildAppSource(p *config.PipelineConfig, track *config.Trac } v.src = append(v.src, track.AppSrc.Element) - switch { - case strings.EqualFold(track.Codec.MimeType, string(types.MimeTypeH264)): + switch track.MimeType { + case types.MimeTypeH264: if err := track.AppSrc.Element.SetProperty("caps", gst.NewCapsFromString(fmt.Sprintf( "application/x-rtp,media=video,payload=%d,encoding-name=H264,clock-rate=%d", - track.Codec.PayloadType, track.Codec.ClockRate, + track.PayloadType, track.ClockRate, ))); err != nil { return errors.ErrGstPipelineError(err) } @@ -121,23 +122,20 @@ func (v *videoInput) buildAppSource(p *config.PipelineConfig, track *config.Trac v.src = append(v.src, avDecH264) } else { - h264parse, err := gst.NewElement("h264parse") + h264Parse, err := gst.NewElement("h264parse") if err != nil { return errors.ErrGstPipelineError(err) } - v.src = append(v.src, h264parse) - + v.src = append(v.src, h264Parse) return nil } - case strings.EqualFold(track.Codec.MimeType, string(types.MimeTypeVP8)): - if err := track.AppSrc.Element.SetProperty("caps", gst.NewCapsFromString( - fmt.Sprintf( - "application/x-rtp,media=video,payload=%d,encoding-name=VP8,clock-rate=%d", - track.Codec.PayloadType, track.Codec.ClockRate, - ), - )); err != nil { + case types.MimeTypeVP8: + if err := track.AppSrc.Element.SetProperty("caps", gst.NewCapsFromString(fmt.Sprintf( + "application/x-rtp,media=video,payload=%d,encoding-name=VP8,clock-rate=%d", + track.PayloadType, track.ClockRate, + ))); err != nil { return errors.ErrGstPipelineError(err) } @@ -158,8 +156,49 @@ func (v *videoInput) buildAppSource(p *config.PipelineConfig, track *config.Trac v.src = append(v.src, vp8Dec) + case types.MimeTypeVP9: + if err := track.AppSrc.Element.SetProperty("caps", gst.NewCapsFromString(fmt.Sprintf( + "application/x-rtp,media=video,payload=%d,encoding-name=VP9,clock-rate=%d", + track.PayloadType, track.ClockRate, + ))); err != nil { + return errors.ErrGstPipelineError(err) + } + + rtpVP9Depay, err := gst.NewElement("rtpvp9depay") + if err != nil { + return errors.ErrGstPipelineError(err) + } + v.src = append(v.src, rtpVP9Depay) + + if p.VideoTranscoding { + vp9Dec, err := gst.NewElement("vp9dec") + if err != nil { + return errors.ErrGstPipelineError(err) + } + + v.src = append(v.src, vp9Dec) + } else { + vp9Parse, err := gst.NewElement("vp9parse") + if err != nil { + return errors.ErrGstPipelineError(err) + } + + vp9Caps, err := gst.NewElement("capsfilter") + if err != nil { + return errors.ErrGstPipelineError(err) + } + if err = vp9Caps.SetProperty("caps", gst.NewCapsFromString( + "video/x-vp9,width=[16,2147483647],height=[16,2147483647]", + )); err != nil { + return errors.ErrGstPipelineError(err) + } + + v.src = append(v.src, vp9Parse, vp9Caps) + return nil + } + default: - return errors.ErrNotSupported(track.Codec.MimeType) + return errors.ErrNotSupported(string(track.MimeType)) } videoQueue, err := builder.BuildQueue("video_input_queue", p.Latency, true) @@ -186,11 +225,10 @@ func (v *videoInput) buildAppSource(p *config.PipelineConfig, track *config.Trac if err != nil { return errors.ErrGstPipelineError(err) } - if err = caps.SetProperty("caps", gst.NewCapsFromString( - fmt.Sprintf("video/x-raw,framerate=%d/1,format=I420,width=%d,height=%d,colorimetry=bt709,chroma-site=mpeg2,pixel-aspect-ratio=1/1", - p.Framerate, p.Width, p.Height, - )), - ); err != nil { + if err = caps.SetProperty("caps", gst.NewCapsFromString(fmt.Sprintf( + "video/x-raw,framerate=%d/1,format=I420,width=%d,height=%d,colorimetry=bt709,chroma-site=mpeg2,pixel-aspect-ratio=1/1", + p.Framerate, p.Width, p.Height, + ))); err != nil { return errors.ErrGstPipelineError(err) } @@ -217,13 +255,11 @@ func (v *videoInput) buildEncoder(p *config.PipelineConfig) error { return errors.ErrGstPipelineError(err) } x264Enc.SetArg("speed-preset", "veryfast") - if p.KeyFrameInterval != 0 { if err = x264Enc.SetProperty("key-int-max", uint(p.KeyFrameInterval*float64(p.Framerate))); err != nil { return errors.ErrGstPipelineError(err) } } - if p.GetSegmentConfig() != nil { // Avoid key frames other than at segments boundaries as splitmuxsink can become inconsistent otherwise if err = x264Enc.SetProperty("option-string", "scenecut=0"); err != nil { @@ -235,16 +271,46 @@ func (v *videoInput) buildEncoder(p *config.PipelineConfig) error { if err != nil { return errors.ErrGstPipelineError(err) } - - if err = caps.SetProperty("caps", gst.NewCapsFromString( - fmt.Sprintf("video/x-h264,profile=%s", p.VideoProfile), - )); err != nil { + if err = caps.SetProperty("caps", gst.NewCapsFromString(fmt.Sprintf( + "video/x-h264,profile=%s", + p.VideoProfile, + ))); err != nil { return errors.ErrGstPipelineError(err) } v.encoder = append(v.encoder, x264Enc, caps) return nil + case types.MimeTypeVP9: + vp9Enc, err := gst.NewElement("vp9enc") + if err != nil { + return errors.ErrGstPipelineError(err) + } + if err = vp9Enc.SetProperty("deadline", int64(1)); err != nil { + return errors.ErrGstPipelineError(err) + } + if err = vp9Enc.SetProperty("row-mt", true); err != nil { + return errors.ErrGstPipelineError(err) + } + if err = vp9Enc.SetProperty("tile-columns", 3); err != nil { + return errors.ErrGstPipelineError(err) + } + if err = vp9Enc.SetProperty("tile-rows", 1); err != nil { + return errors.ErrGstPipelineError(err) + } + if err = vp9Enc.SetProperty("frame-parallel", true); err != nil { + return errors.ErrGstPipelineError(err) + } + if err = vp9Enc.SetProperty("max-quantizer", 52); err != nil { + return errors.ErrGstPipelineError(err) + } + if err = vp9Enc.SetProperty("min-quantizer", 2); err != nil { + return errors.ErrGstPipelineError(err) + } + + v.encoder = append(v.encoder, vp9Enc) + return errors.ErrNotSupported(fmt.Sprintf("%s encoding", p.VideoOutCodec)) + default: return errors.ErrNotSupported(fmt.Sprintf("%s encoding", p.VideoOutCodec)) } diff --git a/pkg/pipeline/source/sdk.go b/pkg/pipeline/source/sdk.go index 4eea2a34..ca4a0024 100644 --- a/pkg/pipeline/source/sdk.go +++ b/pkg/pipeline/source/sdk.go @@ -256,99 +256,92 @@ func (s *SDKSource) onTrackSubscribed(track *webrtc.TrackRemote, pub *lksdk.Remo }() s.active.Inc() - t := s.sync.AddTrack(track, rp.Identity()) - if !s.initialized.IsBroken() { - s.mu.Lock() - switch s.RequestType { - case types.RequestTypeTrackComposite: - if s.Identity == "" || track.Kind() == webrtc.RTPCodecTypeVideo { - s.Identity = rp.Identity() - s.filenameReplacements["{publisher_identity}"] = s.Identity - } - case types.RequestTypeTrack: - if track.Kind() == webrtc.RTPCodecTypeAudio { - s.TrackKind = "audio" - } else { - s.TrackKind = "video" - // check for video over websocket - if s.Outputs[types.EgressTypeWebsocket] != nil { - onSubscribeErr = errors.ErrVideoWebsocket - return - } - } - s.TrackSource = strings.ToLower(pub.Source().String()) - - s.filenameReplacements["{track_id}"] = s.TrackID - s.filenameReplacements["{track_type}"] = s.TrackKind - s.filenameReplacements["{track_source}"] = s.TrackSource - s.filenameReplacements["{publisher_identity}"] = s.Identity - } - s.mu.Unlock() + ts := &config.TrackSource{ + TrackID: pub.SID(), + Kind: pub.Kind(), + MimeType: types.MimeType(strings.ToLower(track.Codec().MimeType)), + PayloadType: track.Codec().PayloadType, + ClockRate: track.Codec().ClockRate, } - var codec types.MimeType - var writeBlanks bool - - switch { - case strings.EqualFold(track.Codec().MimeType, string(types.MimeTypeOpus)): - codec = types.MimeTypeOpus - + switch ts.MimeType { + case types.MimeTypeOpus: s.AudioEnabled = true - s.AudioInCodec = codec + s.AudioInCodec = ts.MimeType if s.AudioOutCodec == "" { - // This should only happen for track egress - s.AudioOutCodec = codec + s.AudioOutCodec = ts.MimeType } s.AudioTranscoding = true - if s.RequestType == types.RequestTypeTrack { - if o := s.GetFileConfig(); o != nil { - o.OutputType = types.OutputTypeOGG - } + writer, err := s.createWriter(track, rp, ts, false) + if err != nil { + onSubscribeErr = err + return } - case strings.EqualFold(track.Codec().MimeType, string(types.MimeTypeVP8)): - codec = types.MimeTypeVP8 + s.audioWriter = writer + s.AudioTrack = ts + case types.MimeTypeH264, types.MimeTypeVP8, types.MimeTypeVP9: s.VideoEnabled = true - s.VideoInCodec = codec + s.VideoInCodec = ts.MimeType if s.VideoOutCodec == "" { - // This should only happen for track egress - s.VideoOutCodec = codec + s.VideoOutCodec = ts.MimeType } - if s.VideoOutCodec != codec { + if s.VideoInCodec != s.VideoOutCodec { s.VideoTranscoding = true - writeBlanks = true } - if s.RequestType == types.RequestTypeTrack { - if o := s.GetFileConfig(); o != nil { - o.OutputType = types.OutputTypeWebM - } + writeBlanks := s.VideoTranscoding && ts.MimeType != types.MimeTypeVP9 + writer, err := s.createWriter(track, rp, ts, writeBlanks) + if err != nil { + onSubscribeErr = err + return } - case strings.EqualFold(track.Codec().MimeType, string(types.MimeTypeH264)): - codec = types.MimeTypeH264 + s.videoWriter = writer + s.VideoTrack = ts - s.VideoEnabled = true - s.VideoInCodec = codec - if s.VideoOutCodec == "" { - // This should only happen for track egress - s.VideoOutCodec = types.MimeTypeH264 - } + default: + onSubscribeErr = errors.ErrNotSupported(string(ts.MimeType)) + return + } - if s.RequestType == types.RequestTypeTrack { + if !s.initialized.IsBroken() { + s.mu.Lock() + switch s.RequestType { + case types.RequestTypeTrackComposite: + if s.Identity == "" || track.Kind() == webrtc.RTPCodecTypeVideo { + s.Identity = rp.Identity() + s.filenameReplacements["{publisher_identity}"] = s.Identity + } + case types.RequestTypeTrack: + s.TrackKind = pub.Kind().String() + if pub.Kind() == lksdk.TrackKindVideo && s.Outputs[types.EgressTypeWebsocket] != nil { + onSubscribeErr = errors.ErrIncompatible("websocket", ts.MimeType) + return + } + s.TrackSource = strings.ToLower(pub.Source().String()) if o := s.GetFileConfig(); o != nil { - o.OutputType = types.OutputTypeMP4 + o.OutputType = types.TrackOutputTypes[ts.MimeType] } - } - default: - onSubscribeErr = errors.ErrNotSupported(track.Codec().MimeType) - return + s.filenameReplacements["{track_id}"] = s.TrackID + s.filenameReplacements["{track_type}"] = s.TrackKind + s.filenameReplacements["{track_source}"] = s.TrackSource + s.filenameReplacements["{publisher_identity}"] = s.Identity + } + s.mu.Unlock() } +} +func (s *SDKSource) createWriter( + track *webrtc.TrackRemote, + rp *lksdk.RemoteParticipant, + ts *config.TrackSource, + writeBlanks bool, +) (*sdk.AppWriter, error) { var logFilename string if s.Debug.EnableProfiling { if s.Debug.ToUploadConfig() == nil { @@ -361,33 +354,16 @@ func (s *SDKSource) onTrackSubscribed(track *webrtc.TrackRemote, pub *lksdk.Remo <-s.GstReady src, err := gst.NewElementWithName("appsrc", track.ID()) if err != nil { - onSubscribeErr = errors.ErrGstPipelineError(err) - return + return nil, errors.ErrGstPipelineError(err) } - appSrc := app.SrcFromElement(src) - writer, err := sdk.NewAppWriter(track, rp, codec, appSrc, s.sync, t, writeBlanks, logFilename) + ts.AppSrc = app.SrcFromElement(src) + writer, err := sdk.NewAppWriter(track, rp, ts, s.sync, writeBlanks, logFilename) if err != nil { - logger.Errorw("could not create app writer", err) - onSubscribeErr = err - return - } - - ts := &config.TrackSource{ - TrackID: pub.SID(), - Kind: pub.Kind(), - AppSrc: appSrc, - Codec: track.Codec(), + return nil, err } - switch track.Kind() { - case webrtc.RTPCodecTypeAudio: - s.audioWriter = writer - s.AudioTrack = ts - case webrtc.RTPCodecTypeVideo: - s.videoWriter = writer - s.VideoTrack = ts - } + return writer, nil } func (s *SDKSource) onTrackMuted(pub lksdk.TrackPublication, _ lksdk.Participant) { @@ -408,15 +384,6 @@ func (s *SDKSource) onTrackUnmuted(pub lksdk.TrackPublication, _ lksdk.Participa } } -func (s *SDKSource) onTrackUnsubscribed(_ *webrtc.TrackRemote, pub *lksdk.RemoteTrackPublication, _ *lksdk.RemoteParticipant) { - if w := s.getWriterForTrack(pub.SID()); w != nil { - w.Drain(true) - if s.active.Dec() == 0 { - s.onDisconnected() - } - } -} - func (s *SDKSource) getWriterForTrack(trackID string) *sdk.AppWriter { if s.audioWriter != nil && s.audioWriter.TrackID() == trackID { return s.audioWriter @@ -427,6 +394,10 @@ func (s *SDKSource) getWriterForTrack(trackID string) *sdk.AppWriter { return nil } +func (s *SDKSource) onTrackUnsubscribed(_ *webrtc.TrackRemote, pub *lksdk.RemoteTrackPublication, _ *lksdk.RemoteParticipant) { + s.onTrackFinished(pub.SID()) +} + func (s *SDKSource) onTrackFinished(trackID string) { var w *sdk.AppWriter diff --git a/pkg/pipeline/source/sdk/appwriter.go b/pkg/pipeline/source/sdk/appwriter.go index 378873c1..e14600d4 100644 --- a/pkg/pipeline/source/sdk/appwriter.go +++ b/pkg/pipeline/source/sdk/appwriter.go @@ -29,6 +29,7 @@ import ( "github.com/tinyzimmer/go-gst/gst/app" "go.uber.org/atomic" + "github.com/livekit/egress/pkg/config" "github.com/livekit/egress/pkg/errors" "github.com/livekit/egress/pkg/types" "github.com/livekit/protocol/logger" @@ -55,7 +56,6 @@ type AppWriter struct { logger logger.Logger logFile *os.File track *webrtc.TrackRemote - identity string codec types.MimeType src *app.Source startTime time.Time @@ -83,22 +83,19 @@ type AppWriter struct { func NewAppWriter( track *webrtc.TrackRemote, rp *lksdk.RemoteParticipant, - codec types.MimeType, - src *app.Source, + ts *config.TrackSource, sync *synchronizer.Synchronizer, - syncInfo *synchronizer.TrackSynchronizer, writeBlanks bool, logFilename string, ) (*AppWriter, error) { w := &AppWriter{ logger: logger.GetLogger().WithValues("trackID", track.ID(), "kind", track.Kind().String()), track: track, - identity: rp.Identity(), - codec: codec, - src: src, + codec: ts.MimeType, + src: ts.AppSrc, writeBlanks: writeBlanks, sync: sync, - TrackSynchronizer: syncInfo, + TrackSynchronizer: sync.AddTrack(track, rp.Identity()), playing: core.NewFuse(), draining: core.NewFuse(), endStream: core.NewFuse(), @@ -115,28 +112,33 @@ func NewAppWriter( } var depacketizer rtp.Depacketizer - switch codec { - case types.MimeTypeVP8: - depacketizer = &codecs.VP8Packet{} - w.translator = NewVP8Translator(w.logger) - w.sendPLI = func() { rp.WritePLI(track.SSRC()) } + switch ts.MimeType { + case types.MimeTypeOpus: + depacketizer = &codecs.OpusPacket{} + w.translator = NewNullTranslator() case types.MimeTypeH264: depacketizer = &codecs.H264Packet{} w.translator = NewH264Translator() w.sendPLI = func() { rp.WritePLI(track.SSRC()) } - case types.MimeTypeOpus: - depacketizer = &codecs.OpusPacket{} - w.translator = NewOpusTranslator() + case types.MimeTypeVP8: + depacketizer = &codecs.VP8Packet{} + w.translator = NewVP8Translator(w.logger) + w.sendPLI = func() { rp.WritePLI(track.SSRC()) } + + case types.MimeTypeVP9: + depacketizer = &codecs.VP9Packet{} + w.translator = NewNullTranslator() + w.sendPLI = func() { rp.WritePLI(track.SSRC()) } default: - return nil, errors.ErrNotSupported(track.Codec().MimeType) + return nil, errors.ErrNotSupported(string(ts.MimeType)) } w.buffer = jitter.NewBuffer( depacketizer, - track.Codec().ClockRate, + ts.ClockRate, latency, jitter.WithPacketDroppedHandler(w.sendPLI), jitter.WithLogger(w.logger), diff --git a/pkg/pipeline/source/sdk/translator.go b/pkg/pipeline/source/sdk/translator.go index b6f8ebfc..f56c36b8 100644 --- a/pkg/pipeline/source/sdk/translator.go +++ b/pkg/pipeline/source/sdk/translator.go @@ -149,16 +149,16 @@ func (t *H264Translator) UpdateBlankFrame(pkt *rtp.Packet) error { return nil } -// Opus +// Null -type OpusTranslator struct{} +type NullTranslator struct{} -func NewOpusTranslator() Translator { - return &OpusTranslator{} +func NewNullTranslator() Translator { + return &NullTranslator{} } -func (t *OpusTranslator) Translate(_ *rtp.Packet) {} +func (t *NullTranslator) Translate(_ *rtp.Packet) {} -func (t *OpusTranslator) UpdateBlankFrame(_ *rtp.Packet) error { +func (t *NullTranslator) UpdateBlankFrame(_ *rtp.Packet) error { return nil } diff --git a/pkg/types/types.go b/pkg/types/types.go index fceb8e42..9424ccff 100644 --- a/pkg/types/types.go +++ b/pkg/types/types.go @@ -45,6 +45,7 @@ const ( MimeTypeRawAudio MimeType = "audio/x-raw" MimeTypeH264 MimeType = "video/h264" MimeTypeVP8 MimeType = "video/vp8" + MimeTypeVP9 MimeType = "video/vp9" MimeTypeRawVideo MimeType = "video/x-raw" // video profiles @@ -124,6 +125,7 @@ var ( }, OutputTypeIVF: { MimeTypeVP8: true, + MimeTypeVP9: true, }, OutputTypeMP4: { MimeTypeAAC: true, @@ -138,6 +140,7 @@ var ( OutputTypeWebM: { MimeTypeOpus: true, MimeTypeVP8: true, + MimeTypeVP9: true, }, OutputTypeRTMP: { MimeTypeAAC: true, @@ -152,6 +155,7 @@ var ( MimeTypeOpus: true, MimeTypeH264: true, MimeTypeVP8: true, + MimeTypeVP9: true, }, } @@ -175,6 +179,13 @@ var ( AudioVideoFileOutputTypes = []OutputType{ OutputTypeMP4, } + + TrackOutputTypes = map[MimeType]OutputType{ + MimeTypeOpus: OutputTypeOGG, + MimeTypeH264: OutputTypeMP4, + MimeTypeVP8: OutputTypeWebM, + MimeTypeVP9: OutputTypeWebM, + } ) func GetOutputTypeCompatibleWithCodecs(types []OutputType, audioCodecs map[MimeType]bool, videoCodecs map[MimeType]bool) OutputType { diff --git a/test/ffprobe.go b/test/ffprobe.go index 94b00582..ed5abab7 100644 --- a/test/ffprobe.go +++ b/test/ffprobe.go @@ -153,7 +153,7 @@ func verify(t *testing.T, in string, p *config.PipelineConfig, res *livekit.Egre require.Len(t, res.GetSegmentResults(), 1) segments := res.GetSegmentResults()[0] expected := int64(math.Ceil(actual / float64(p.GetSegmentConfig().SegmentDuration))) - require.True(t, segments.SegmentCount == expected || segments.SegmentCount == expected-1) + require.InDelta(t, expected, segments.SegmentCount, 1) case types.EgressTypeWebsocket: size, err := strconv.Atoi(info.Format.Size) @@ -221,6 +221,8 @@ func verify(t *testing.T, in string, p *config.PipelineConfig, res *livekit.Egre } case types.MimeTypeVP8: require.Equal(t, "vp8", stream.CodecName) + case types.MimeTypeVP9: + require.Equal(t, "vp9", stream.CodecName) } switch p.Outputs[egressType].GetOutputType() { diff --git a/test/integration.go b/test/integration.go index bca6fd8b..8c5c20ec 100644 --- a/test/integration.go +++ b/test/integration.go @@ -53,11 +53,13 @@ var ( types.MimeTypeOpus: "/workspace/test/sample/matrix-trailer.ogg", types.MimeTypeH264: "/workspace/test/sample/matrix-trailer.h264", types.MimeTypeVP8: "/workspace/test/sample/matrix-trailer-vp8.ivf", + types.MimeTypeVP9: "/workspace/test/sample/matrix-trailer-vp9.ivf", } frameDurations = map[types.MimeType]time.Duration{ types.MimeTypeH264: time.Microsecond * 41708, types.MimeTypeVP8: time.Microsecond * 41708, + types.MimeTypeVP9: time.Microsecond * 41708, } ) diff --git a/test/track.go b/test/track.go index d1d814c2..af66e6f7 100644 --- a/test/track.go +++ b/test/track.go @@ -61,13 +61,6 @@ func (r *Runner) testTrackFile(t *testing.T) { outputType: types.OutputTypeOGG, filename: "t_{track_source}_{time}.ogg", }, - { - name: "VP8", - videoOnly: true, - videoCodec: types.MimeTypeVP8, - outputType: types.OutputTypeWebM, - filename: "t_{track_type}_{time}.webm", - }, { name: "H264", videoOnly: true, @@ -75,6 +68,20 @@ func (r *Runner) testTrackFile(t *testing.T) { outputType: types.OutputTypeMP4, filename: "t_{track_id}_{time}.mp4", }, + { + name: "VP8", + videoOnly: true, + videoCodec: types.MimeTypeVP8, + outputType: types.OutputTypeWebM, + filename: "t_{track_type}_{time}.webm", + }, + // { + // name: "VP9", + // videoOnly: true, + // videoCodec: types.MimeTypeVP9, + // outputType: types.OutputTypeWebM, + // filename: "t_{track_type}_{time}.webm", + // }, } { r.runTrackTest(t, test.name, test.audioCodec, test.videoCodec, func(t *testing.T, audioTrackID, videoTrackID string) { trackID := audioTrackID