Skip to content

Commit

Permalink
update error codes (#797)
Browse files Browse the repository at this point in the history
  • Loading branch information
frostbyte73 authored Oct 30, 2024
1 parent ea5cdb3 commit e4fd0e0
Show file tree
Hide file tree
Showing 6 changed files with 43 additions and 21 deletions.
10 changes: 9 additions & 1 deletion pkg/config/manifest.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package config

import (
"bytes"
"encoding/json"
"sync"
"time"
Expand Down Expand Up @@ -153,5 +154,12 @@ func (m *Manifest) AddImage(filename string, ts time.Time, location, presignedUr
func (m *Manifest) Close(endedAt int64) ([]byte, error) {
m.EndedAt = endedAt

return json.Marshal(m)
buf := bytes.NewBuffer(nil)
enc := json.NewEncoder(buf)
enc.SetEscapeHTML(false)
if err := enc.Encode(m); err != nil {
return nil, err
}

return buf.Bytes(), nil
}
3 changes: 2 additions & 1 deletion pkg/pipeline/builder/websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/go-gst/go-gst/gst"
"github.com/go-gst/go-gst/gst/app"

"github.com/livekit/egress/pkg/errors"
"github.com/livekit/egress/pkg/gstreamer"
)

Expand All @@ -26,7 +27,7 @@ func BuildWebsocketBin(pipeline *gstreamer.Pipeline, appSinkCallbacks *app.SinkC

appSink, err := app.NewAppSink()
if err != nil {
return nil, err
return nil, errors.ErrGstPipelineError(err)
}
appSink.SetCallbacks(appSinkCallbacks)

Expand Down
3 changes: 2 additions & 1 deletion pkg/pipeline/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import (
"github.com/livekit/protocol/livekit"
"github.com/livekit/protocol/logger"
"github.com/livekit/protocol/tracer"
"github.com/livekit/psrpc"
)

const (
Expand Down Expand Up @@ -349,7 +350,7 @@ func (c *Controller) streamFailed(ctx context.Context, stream *config.Stream, st

// fail egress if no outputs remaining
if c.OutputCount.Load() == 0 {
return streamErr
return psrpc.NewError(psrpc.Unavailable, streamErr)
}

logger.Infow("stream failed",
Expand Down
6 changes: 3 additions & 3 deletions pkg/pipeline/sink/uploader/gcp.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func newGCPUploader(c *config.StorageConfig) (uploader, error) {
if conf.CredentialsJSON != "" {
jwtConfig, err := google.JWTConfigFromJSON([]byte(conf.CredentialsJSON), storageScope)
if err != nil {
return nil, err
return nil, errors.ErrUploadFailed("GCP", err)
}
opts = append(opts, option.WithTokenSource(jwtConfig.TokenSource(context.Background())))
}
Expand All @@ -80,12 +80,12 @@ func newGCPUploader(c *config.StorageConfig) (uploader, error) {
defaultTransport.ProxyConnectHeader.Add("Proxy-Authorization", basicAuth)
}
}
client, err := storage.NewClient(context.Background(), opts...)

client, err := storage.NewClient(context.Background(), opts...)
// restore default transport
http.DefaultTransport = transportClone
if err != nil {
return nil, err
return nil, errors.ErrUploadFailed("GCP", err)
}

u.client = client
Expand Down
5 changes: 3 additions & 2 deletions pkg/pipeline/sink/websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"github.com/livekit/egress/pkg/gstreamer"
"github.com/livekit/egress/pkg/types"
"github.com/livekit/protocol/logger"
"github.com/livekit/psrpc"
)

const pingPeriod = time.Second * 30
Expand All @@ -56,7 +57,7 @@ func newWebsocketSink(o *config.StreamConfig, mimeType types.MimeType, callbacks

conn, _, err := websocket.DefaultDialer.Dial(wsUrl, header)
if err != nil {
return nil, err
return nil, psrpc.NewError(psrpc.InvalidArgument, err)
}

s := &WebsocketSink{
Expand Down Expand Up @@ -88,7 +89,7 @@ func newWebsocketSink(o *config.StreamConfig, mimeType types.MimeType, callbacks
if err == io.EOF {
return gst.FlowEOS
}
callbacks.OnError(err)
callbacks.OnError(psrpc.NewError(psrpc.Unavailable, err))
}

return gst.FlowOK
Expand Down
37 changes: 24 additions & 13 deletions pkg/pipeline/source/sdk.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ type SDKSource struct {
mu sync.RWMutex
initialized core.Fuse
filenameReplacements map[string]string
errors chan error
errors chan *subscriptionInfo

writers map[string]*sdk.AppWriter
subLock sync.RWMutex
Expand All @@ -65,6 +65,11 @@ type SDKSource struct {
endRecording chan struct{}
}

type subscriptionInfo struct {
trackID string
err error
}

func NewSDKSource(ctx context.Context, p *config.PipelineConfig, callbacks *gstreamer.Callbacks) (*SDKSource, error) {
ctx, span := tracer.Start(ctx, "SDKInput.New")
defer span.End()
Expand All @@ -77,7 +82,7 @@ func NewSDKSource(ctx context.Context, p *config.PipelineConfig, callbacks *gstr
close(startRecording)
}),
filenameReplacements: make(map[string]string),
errors: make(chan error, 2),
errors: make(chan *subscriptionInfo, 2),
writers: make(map[string]*sdk.AppWriter),
startRecording: startRecording,
endRecording: make(chan struct{}),
Expand Down Expand Up @@ -224,9 +229,9 @@ func (s *SDKSource) awaitParticipantTracks(identity string) (uint32, uint32, err
done := false
for !done {
select {
case err = <-s.errors:
if err != nil {
return 0, 0, err
case sub := <-s.errors:
if sub.err != nil {
return 0, 0, sub.err
}
subscribed++
if subscribed == expected {
Expand All @@ -244,9 +249,9 @@ func (s *SDKSource) awaitParticipantTracks(identity string) (uint32, uint32, err
for {
select {
// check errors from any tracks published in the meantime
case err = <-s.errors:
if err != nil {
return 0, 0, err
case sub := <-s.errors:
if sub.err != nil {
return 0, 0, sub.err
}
default:
// get dimensions after subscribing so that track info exists
Expand Down Expand Up @@ -289,12 +294,15 @@ func (s *SDKSource) awaitTracks(expecting map[string]struct{}) (uint32, uint32,

for i := 0; i < trackCount; i++ {
select {
case err = <-s.errors:
if err != nil {
return 0, 0, err
case sub := <-s.errors:
if sub.err != nil {
return 0, 0, sub.err
}
delete(expecting, sub.trackID)
case <-deadline:
return 0, 0, errors.ErrSubscriptionFailed
for trackID := range expecting {
return 0, 0, errors.ErrTrackNotFound(trackID)
}
}
}

Expand Down Expand Up @@ -380,7 +388,10 @@ func (s *SDKSource) onTrackSubscribed(track *webrtc.TrackRemote, pub *lksdk.Remo
s.callbacks.OnError(onSubscribeErr)
}
} else {
s.errors <- onSubscribeErr
s.errors <- &subscriptionInfo{
trackID: pub.SID(),
err: onSubscribeErr,
}
}
s.subLock.RUnlock()
}()
Expand Down

0 comments on commit e4fd0e0

Please sign in to comment.