Skip to content

Commit

Permalink
Pipeline/Bin subclassing (#468)
Browse files Browse the repository at this point in the history
* pipeline/bin

* pipeline/bin

* pipeline refactor

* fix streaming

* fix linking

* fix segments

* more pad edge cases

* segment multi broken

* fix multi

* always quit, update comments
  • Loading branch information
frostbyte73 authored Aug 23, 2023
1 parent 918f62c commit 45d36b1
Show file tree
Hide file tree
Showing 38 changed files with 2,283 additions and 2,116 deletions.
4 changes: 2 additions & 2 deletions pkg/config/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ type BaseConfig struct {
WsUrl string `yaml:"ws_url"` // (env LIVEKIT_WS_URL)

// optional
Logging logger.Config `yaml:"logging"` // logging config
Logging *logger.Config `yaml:"logging"` // logging config
TemplateBase string `yaml:"template_base"` // custom template base url
BackupStorage string `yaml:"backup_storage"` // backup file location for failed uploads
ClusterID string `yaml:"cluster_id"` // cluster this instance belongs to
Expand Down Expand Up @@ -106,7 +106,7 @@ func (c *BaseConfig) initLogger(values ...interface{}) error {
return err
}

zl, err := logger.NewZapLogger(&c.Logging)
zl, err := logger.NewZapLogger(c.Logging)
if err != nil {
return err
}
Expand Down
24 changes: 9 additions & 15 deletions pkg/config/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/livekit/egress/pkg/types"
"github.com/livekit/protocol/egress"
"github.com/livekit/protocol/livekit"
"github.com/livekit/protocol/logger"
"github.com/livekit/protocol/rpc"
"github.com/livekit/protocol/tracer"
"github.com/livekit/protocol/utils"
Expand All @@ -51,12 +52,13 @@ type PipelineConfig struct {
SourceConfig `yaml:"-"`
AudioConfig `yaml:"-"`
VideoConfig `yaml:"-"`
*Callbacks `yaml:"-"`

Outputs map[types.EgressType]OutputConfig `yaml:"-"`
OutputCount int `yaml:"-"`
FinalizationRequired bool `yaml:"-"`

OnUpdate func(context.Context, *livekit.EgressInfo) `yaml:"-"`

Info *livekit.EgressInfo `yaml:"-"`
}

Expand Down Expand Up @@ -96,6 +98,7 @@ type TrackSource struct {
MimeType types.MimeType
PayloadType webrtc.PayloadType
ClockRate uint32
EOSFunc func()
}

type AudioConfig struct {
Expand All @@ -119,23 +122,14 @@ type VideoConfig struct {
KeyFrameInterval float64
}

type Callbacks struct {
GstReady chan struct{} `yaml:"-"`
OnTrackMuted func(string) `yaml:"-"`
OnTrackUnmuted func(string) `yaml:"-"`
OnTrackAdded func(*TrackSource) `yaml:"-"`
OnTrackRemoved func(trackID string) `yaml:"-"`
OnUpdate func(context.Context, *livekit.EgressInfo) `yaml:"-"`
OnFailure func(error) `yaml:"-"`
}

func NewPipelineConfig(confString string, req *rpc.StartEgressRequest) (*PipelineConfig, error) {
p := &PipelineConfig{
BaseConfig: BaseConfig{},
Outputs: make(map[types.EgressType]OutputConfig),
Callbacks: &Callbacks{
GstReady: make(chan struct{}),
BaseConfig: BaseConfig{
Logging: &logger.Config{
Level: "info",
},
},
Outputs: make(map[types.EgressType]OutputConfig),
}

if err := yaml.Unmarshal([]byte(confString), p); err != nil {
Expand Down
2 changes: 1 addition & 1 deletion pkg/config/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ type CPUCostConfig struct {
func NewServiceConfig(confString string) (*ServiceConfig, error) {
conf := &ServiceConfig{
BaseConfig: BaseConfig{
Logging: logger.Config{
Logging: &logger.Config{
Level: "info",
},
ApiKey: os.Getenv("LIVEKIT_API_KEY"),
Expand Down
13 changes: 13 additions & 0 deletions pkg/errors/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ import (
"fmt"
"strings"

"github.com/tinyzimmer/go-gst/gst"

"github.com/livekit/psrpc"
)

Expand All @@ -32,6 +34,7 @@ var (
ErrNoCompatibleFileOutputType = psrpc.NewErrorf(psrpc.InvalidArgument, "no supported file output type is compatible with the selected codecs")
ErrResourceExhausted = psrpc.NewErrorf(psrpc.ResourceExhausted, "not enough CPU")
ErrSubscriptionFailed = psrpc.NewErrorf(psrpc.Internal, "failed to subscribe to track")
ErrPipelineFrozen = psrpc.NewErrorf(psrpc.Internal, "pipeline frozen")
)

func New(err string) error {
Expand Down Expand Up @@ -122,6 +125,10 @@ func ErrProcessStartFailed(err error) error {
return psrpc.NewError(psrpc.Internal, err)
}

func ErrStateChangeFailed(bin string, state gst.State) error {
return psrpc.NewErrorf(psrpc.Internal, "%s failed to change state to %s", bin, state.String())
}

type ErrArray struct {
errs []error
}
Expand All @@ -130,6 +137,12 @@ func (e *ErrArray) AppendErr(err error) {
e.errs = append(e.errs, err)
}

func (e *ErrArray) Check(err error) {
if err != nil {
e.errs = append(e.errs, err)
}
}

func (e *ErrArray) ToError() psrpc.Error {
if len(e.errs) == 0 {
return nil
Expand Down
Loading

0 comments on commit 45d36b1

Please sign in to comment.