Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

update error codes #797

Merged
merged 2 commits into from
Oct 30, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion pkg/pipeline/builder/websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/go-gst/go-gst/gst"
"github.com/go-gst/go-gst/gst/app"

"github.com/livekit/egress/pkg/errors"
"github.com/livekit/egress/pkg/gstreamer"
)

Expand All @@ -26,7 +27,7 @@ func BuildWebsocketBin(pipeline *gstreamer.Pipeline, appSinkCallbacks *app.SinkC

appSink, err := app.NewAppSink()
if err != nil {
return nil, err
return nil, errors.ErrGstPipelineError(err)
}
appSink.SetCallbacks(appSinkCallbacks)

Expand Down
3 changes: 2 additions & 1 deletion pkg/pipeline/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import (
"github.com/livekit/protocol/livekit"
"github.com/livekit/protocol/logger"
"github.com/livekit/protocol/tracer"
"github.com/livekit/psrpc"
)

const (
Expand Down Expand Up @@ -349,7 +350,7 @@ func (c *Controller) streamFailed(ctx context.Context, stream *config.Stream, st

// fail egress if no outputs remaining
if c.OutputCount.Load() == 0 {
return streamErr
return psrpc.NewError(psrpc.Unavailable, streamErr)
}

logger.Infow("stream failed",
Expand Down
6 changes: 3 additions & 3 deletions pkg/pipeline/sink/uploader/gcp.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func newGCPUploader(c *config.StorageConfig) (uploader, error) {
if conf.CredentialsJSON != "" {
jwtConfig, err := google.JWTConfigFromJSON([]byte(conf.CredentialsJSON), storageScope)
if err != nil {
return nil, err
return nil, errors.ErrUploadFailed("GCP", err)
}
opts = append(opts, option.WithTokenSource(jwtConfig.TokenSource(context.Background())))
}
Expand All @@ -80,12 +80,12 @@ func newGCPUploader(c *config.StorageConfig) (uploader, error) {
defaultTransport.ProxyConnectHeader.Add("Proxy-Authorization", basicAuth)
}
}
client, err := storage.NewClient(context.Background(), opts...)

client, err := storage.NewClient(context.Background(), opts...)
// restore default transport
http.DefaultTransport = transportClone
if err != nil {
return nil, err
return nil, errors.ErrUploadFailed("GCP", err)
}

u.client = client
Expand Down
5 changes: 3 additions & 2 deletions pkg/pipeline/sink/websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"github.com/livekit/egress/pkg/gstreamer"
"github.com/livekit/egress/pkg/types"
"github.com/livekit/protocol/logger"
"github.com/livekit/psrpc"
)

const pingPeriod = time.Second * 30
Expand All @@ -56,7 +57,7 @@ func newWebsocketSink(o *config.StreamConfig, mimeType types.MimeType, callbacks

conn, _, err := websocket.DefaultDialer.Dial(wsUrl, header)
if err != nil {
return nil, err
return nil, psrpc.NewError(psrpc.InvalidArgument, err)
}

s := &WebsocketSink{
Expand Down Expand Up @@ -88,7 +89,7 @@ func newWebsocketSink(o *config.StreamConfig, mimeType types.MimeType, callbacks
if err == io.EOF {
return gst.FlowEOS
}
callbacks.OnError(err)
callbacks.OnError(psrpc.NewError(psrpc.Unavailable, err))
}

return gst.FlowOK
Expand Down
37 changes: 24 additions & 13 deletions pkg/pipeline/source/sdk.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ type SDKSource struct {
mu sync.RWMutex
initialized core.Fuse
filenameReplacements map[string]string
errors chan error
errors chan *subscriptionInfo

writers map[string]*sdk.AppWriter
subLock sync.RWMutex
Expand All @@ -65,6 +65,11 @@ type SDKSource struct {
endRecording chan struct{}
}

type subscriptionInfo struct {
trackID string
err error
}

func NewSDKSource(ctx context.Context, p *config.PipelineConfig, callbacks *gstreamer.Callbacks) (*SDKSource, error) {
ctx, span := tracer.Start(ctx, "SDKInput.New")
defer span.End()
Expand All @@ -77,7 +82,7 @@ func NewSDKSource(ctx context.Context, p *config.PipelineConfig, callbacks *gstr
close(startRecording)
}),
filenameReplacements: make(map[string]string),
errors: make(chan error, 2),
errors: make(chan *subscriptionInfo, 2),
writers: make(map[string]*sdk.AppWriter),
startRecording: startRecording,
endRecording: make(chan struct{}),
Expand Down Expand Up @@ -224,9 +229,9 @@ func (s *SDKSource) awaitParticipantTracks(identity string) (uint32, uint32, err
done := false
for !done {
select {
case err = <-s.errors:
if err != nil {
return 0, 0, err
case sub := <-s.errors:
if sub.err != nil {
return 0, 0, sub.err
}
subscribed++
if subscribed == expected {
Expand All @@ -244,9 +249,9 @@ func (s *SDKSource) awaitParticipantTracks(identity string) (uint32, uint32, err
for {
select {
// check errors from any tracks published in the meantime
case err = <-s.errors:
if err != nil {
return 0, 0, err
case sub := <-s.errors:
if sub.err != nil {
return 0, 0, sub.err
}
default:
// get dimensions after subscribing so that track info exists
Expand Down Expand Up @@ -289,12 +294,15 @@ func (s *SDKSource) awaitTracks(expecting map[string]struct{}) (uint32, uint32,

for i := 0; i < trackCount; i++ {
select {
case err = <-s.errors:
if err != nil {
return 0, 0, err
case sub := <-s.errors:
if sub.err != nil {
return 0, 0, sub.err
}
delete(expecting, sub.trackID)
case <-deadline:
return 0, 0, errors.ErrSubscriptionFailed
for trackID := range expecting {
return 0, 0, errors.ErrTrackNotFound(trackID)
}
}
}

Expand Down Expand Up @@ -380,7 +388,10 @@ func (s *SDKSource) onTrackSubscribed(track *webrtc.TrackRemote, pub *lksdk.Remo
s.callbacks.OnError(onSubscribeErr)
}
} else {
s.errors <- onSubscribeErr
s.errors <- &subscriptionInfo{
trackID: pub.SID(),
err: onSubscribeErr,
}
}
s.subLock.RUnlock()
}()
Expand Down