From e7cc58aa286c6ac5f7a0c21b0304f5c6f4323597 Mon Sep 17 00:00:00 2001 From: David Colburn Date: Tue, 12 Nov 2024 10:40:41 -0800 Subject: [PATCH] add backup_storage_used to egress info --- go.mod | 2 +- go.sum | 4 +- pkg/config/config_test.go | 3 +- pkg/config/pipeline.go | 7 +- pkg/handler/handler.go | 6 +- pkg/handler/handler_rpc.go | 6 +- pkg/info/info.go | 78 --------------------- pkg/pipeline/controller.go | 31 ++++---- pkg/pipeline/debug.go | 2 +- pkg/pipeline/sink/file.go | 2 +- pkg/pipeline/sink/image.go | 2 +- pkg/pipeline/sink/segments.go | 2 +- pkg/pipeline/sink/sink.go | 6 +- pkg/pipeline/sink/uploader/uploader.go | 20 +++--- pkg/pipeline/sink/uploader/uploader_test.go | 5 +- pkg/pipeline/source/web.go | 3 +- pkg/server/server_rpc.go | 14 ++-- 17 files changed, 56 insertions(+), 137 deletions(-) delete mode 100644 pkg/info/info.go diff --git a/go.mod b/go.mod index 2f1867a8..f365beba 100644 --- a/go.mod +++ b/go.mod @@ -23,7 +23,7 @@ require ( github.com/gorilla/websocket v1.5.3 github.com/livekit/livekit-server v1.7.3-0.20241017190429-44a74fc06ae7 github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1 - github.com/livekit/protocol v1.27.2-0.20241107143305-3c72d6581b6f + github.com/livekit/protocol v1.27.2-0.20241112182453-391b78ffe9a7 github.com/livekit/psrpc v0.6.1-0.20240924010758-9f0a4268a3b9 github.com/livekit/server-sdk-go/v2 v2.2.2-0.20241015094126-b8538ae5d67b github.com/pion/rtp v1.8.9 diff --git a/go.sum b/go.sum index f0a5d1ac..686f7b00 100644 --- a/go.sum +++ b/go.sum @@ -218,8 +218,8 @@ github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1 h1:jm09419p0lqTkD github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1/go.mod h1:Rs3MhFwutWhGwmY1VQsygw28z5bWcnEYmS1OG9OxjOQ= github.com/livekit/mediatransportutil v0.0.0-20240730083616-559fa5ece598 h1:yLlkHk2feSLHstD9n4VKg7YEBR4rLODTI4WE8gNBEnQ= github.com/livekit/mediatransportutil v0.0.0-20240730083616-559fa5ece598/go.mod h1:jwKUCmObuiEDH0iiuJHaGMXwRs3RjrB4G6qqgkr/5oE= -github.com/livekit/protocol v1.27.2-0.20241107143305-3c72d6581b6f h1:JDr/L79siZUP5rFH20QVj1n2ZqwUB9eyRPmFaaeIsQw= -github.com/livekit/protocol v1.27.2-0.20241107143305-3c72d6581b6f/go.mod h1:BrACGxSTlbAe+T9uXLOiiWyYrJ2gNc0mTYmZJPq/4aA= +github.com/livekit/protocol v1.27.2-0.20241112182453-391b78ffe9a7 h1:acdgG8T+8lrgT58KZPq/oRy/Gc8xa8/CKLgGLI0pq/I= +github.com/livekit/protocol v1.27.2-0.20241112182453-391b78ffe9a7/go.mod h1:BrACGxSTlbAe+T9uXLOiiWyYrJ2gNc0mTYmZJPq/4aA= github.com/livekit/psrpc v0.6.1-0.20240924010758-9f0a4268a3b9 h1:33oBjGpVD9tYkDXQU42tnHl8eCX9G6PVUToBVuCUyOs= github.com/livekit/psrpc v0.6.1-0.20240924010758-9f0a4268a3b9/go.mod h1:CQUBSPfYYAaevg1TNCc6/aYsa8DJH4jSRFdCeSZk5u0= github.com/livekit/server-sdk-go/v2 v2.2.2-0.20241015094126-b8538ae5d67b h1:R1GpKwVbSYsG08k5sIkNCukvnrkOE18R8IO1YeujR8o= diff --git a/pkg/config/config_test.go b/pkg/config/config_test.go index 2adc254f..ba7c323e 100644 --- a/pkg/config/config_test.go +++ b/pkg/config/config_test.go @@ -20,7 +20,6 @@ import ( "github.com/stretchr/testify/require" - "github.com/livekit/egress/pkg/info" "github.com/livekit/protocol/livekit" ) @@ -79,7 +78,7 @@ func TestSegmentNaming(t *testing.T) { expectedStorageDir: "conf_test/", expectedPlaylistFilename: "playlist.m3u8", expectedLivePlaylistFilename: "", expectedSegmentPrefix: "conf_test_2/filename", }, } { - p := &PipelineConfig{Info: &info.EgressInfo{EgressId: "egress_ID"}} + p := &PipelineConfig{Info: &livekit.EgressInfo{EgressId: "egress_ID"}} o, err := p.getSegmentConfig(&livekit.SegmentedFileOutput{ FilenamePrefix: test.filenamePrefix, PlaylistName: test.playlistName, diff --git a/pkg/config/pipeline.go b/pkg/config/pipeline.go index 30dfa1b2..27ee368a 100644 --- a/pkg/config/pipeline.go +++ b/pkg/config/pipeline.go @@ -28,7 +28,6 @@ import ( "gopkg.in/yaml.v3" "github.com/livekit/egress/pkg/errors" - "github.com/livekit/egress/pkg/info" "github.com/livekit/egress/pkg/types" "github.com/livekit/protocol/egress" "github.com/livekit/protocol/livekit" @@ -55,8 +54,8 @@ type PipelineConfig struct { OutputCount atomic.Int32 `yaml:"-"` FinalizationRequired bool `yaml:"-"` - Info *info.EgressInfo `yaml:"-"` - Manifest *Manifest `yaml:"-"` + Info *livekit.EgressInfo `yaml:"-"` + Manifest *Manifest `yaml:"-"` } type SourceConfig struct { @@ -164,7 +163,7 @@ func (p *PipelineConfig) Update(request *rpc.StartEgressRequest) error { // start with defaults now := time.Now().UnixNano() - p.Info = &info.EgressInfo{ + p.Info = &livekit.EgressInfo{ EgressId: request.EgressId, RoomId: request.RoomId, Status: livekit.EgressStatus_EGRESS_STARTING, diff --git a/pkg/handler/handler.go b/pkg/handler/handler.go index 2109e8b0..3ef871a9 100644 --- a/pkg/handler/handler.go +++ b/pkg/handler/handler.go @@ -95,7 +95,7 @@ func (h *Handler) Run() { h.initialized.Break() if err != nil { h.conf.Info.SetFailed(err) - _, _ = h.ipcServiceClient.HandlerUpdate(context.Background(), (*livekit.EgressInfo)(h.conf.Info)) + _, _ = h.ipcServiceClient.HandlerUpdate(context.Background(), h.conf.Info) return } @@ -109,7 +109,7 @@ func (h *Handler) Run() { _, _ = h.ipcServiceClient.HandlerFinished(ctx, &ipc.HandlerFinishedRequest{ EgressId: h.conf.Info.EgressId, Metrics: m, - Info: (*livekit.EgressInfo)(res), + Info: res, }) } @@ -118,5 +118,5 @@ func (h *Handler) Kill() { if h.controller == nil { return } - h.controller.SendEOS(context.Background(), "handler killed") + h.controller.SendEOS(context.Background(), livekit.EndReasonKilled) } diff --git a/pkg/handler/handler_rpc.go b/pkg/handler/handler_rpc.go index 2bc76b87..e08c5971 100644 --- a/pkg/handler/handler_rpc.go +++ b/pkg/handler/handler_rpc.go @@ -35,7 +35,7 @@ func (h *Handler) UpdateStream(ctx context.Context, req *livekit.UpdateStreamReq if err != nil { return nil, err } - return (*livekit.EgressInfo)(h.controller.Info), nil + return h.controller.Info, nil } func (h *Handler) StopEgress(ctx context.Context, _ *livekit.StopEgressRequest) (*livekit.EgressInfo, error) { @@ -47,6 +47,6 @@ func (h *Handler) StopEgress(ctx context.Context, _ *livekit.StopEgressRequest) return nil, errors.ErrEgressNotFound } - h.controller.SendEOS(ctx, "StopEgress API") - return (*livekit.EgressInfo)(h.controller.Info), nil + h.controller.SendEOS(ctx, livekit.EndReasonAPI) + return h.controller.Info, nil } diff --git a/pkg/info/info.go b/pkg/info/info.go deleted file mode 100644 index 16d0bb8a..00000000 --- a/pkg/info/info.go +++ /dev/null @@ -1,78 +0,0 @@ -// Copyright 2023 LiveKit, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package info - -import ( - "net/http" - "time" - - "github.com/livekit/egress/pkg/errors" - "github.com/livekit/protocol/livekit" - "github.com/livekit/psrpc" -) - -type EgressInfo livekit.EgressInfo - -const ( - MsgStartNotReceived = "Start signal not received" - MsgLimitReached = "Session limit reached" - MsgLimitReachedWithoutStart = "Session limit reached before start signal" - MsgStoppedBeforeStarted = "Stop called before pipeline could start" -) - -func (e *EgressInfo) UpdateStatus(status livekit.EgressStatus) { - e.Status = status - e.UpdatedAt = time.Now().UnixNano() -} - -func (e *EgressInfo) SetLimitReached() { - now := time.Now().UnixNano() - e.Status = livekit.EgressStatus_EGRESS_LIMIT_REACHED - e.Error = MsgLimitReached - e.ErrorCode = int32(http.StatusRequestEntityTooLarge) - e.UpdatedAt = now - e.EndedAt = now -} - -func (e *EgressInfo) SetAborted(msg string) { - now := time.Now().UnixNano() - e.Status = livekit.EgressStatus_EGRESS_ABORTED - e.Error = msg - e.ErrorCode = int32(http.StatusPreconditionFailed) - e.UpdatedAt = now - e.EndedAt = now -} - -func (e *EgressInfo) SetFailed(err error) { - now := time.Now().UnixNano() - e.Status = livekit.EgressStatus_EGRESS_FAILED - e.UpdatedAt = now - e.EndedAt = now - e.Error = err.Error() - var perr psrpc.Error - if errors.As(err, &perr) { - // unknown is treated the same as an internal error (500) - if !errors.Is(perr.Code(), psrpc.Unknown) { - e.ErrorCode = int32(perr.ToHttp()) - } - } -} - -func (e *EgressInfo) SetComplete() { - now := time.Now().UnixNano() - e.Status = livekit.EgressStatus_EGRESS_COMPLETE - e.UpdatedAt = now - e.EndedAt = now -} diff --git a/pkg/pipeline/controller.go b/pkg/pipeline/controller.go index d6e42ee6..541fd198 100644 --- a/pkg/pipeline/controller.go +++ b/pkg/pipeline/controller.go @@ -29,7 +29,6 @@ import ( "github.com/livekit/egress/pkg/config" "github.com/livekit/egress/pkg/errors" "github.com/livekit/egress/pkg/gstreamer" - "github.com/livekit/egress/pkg/info" "github.com/livekit/egress/pkg/ipc" "github.com/livekit/egress/pkg/pipeline/builder" "github.com/livekit/egress/pkg/pipeline/sink" @@ -196,7 +195,7 @@ func (c *Controller) BuildPipeline() error { return nil } -func (c *Controller) Run(ctx context.Context) *info.EgressInfo { +func (c *Controller) Run(ctx context.Context) *livekit.EgressInfo { ctx, span := tracer.Start(ctx, "Pipeline.Run") defer span.End() @@ -208,7 +207,7 @@ func (c *Controller) Run(ctx context.Context) *info.EgressInfo { // close when room ends go func() { <-c.src.EndRecording() - c.SendEOS(ctx, "source closed") + c.SendEOS(ctx, livekit.EndReasonSrcClosed) }() // wait until room is ready @@ -218,7 +217,7 @@ func (c *Controller) Run(ctx context.Context) *info.EgressInfo { select { case <-c.stopped.Watch(): c.src.Close() - c.Info.SetAborted(info.MsgStartNotReceived) + c.Info.SetAborted(livekit.MsgStartNotReceived) return c.Info case <-start: // continue @@ -280,7 +279,7 @@ func (c *Controller) UpdateStream(ctx context.Context, req *livekit.UpdateStream // add stream info to results c.mu.Lock() c.Info.StreamResults = append(c.Info.StreamResults, stream.StreamInfo) - if list := (*livekit.EgressInfo)(c.Info).GetStream(); list != nil { + if list := c.Info.GetStream(); list != nil { list.Info = append(list.Info, stream.StreamInfo) } c.mu.Unlock() @@ -325,7 +324,7 @@ func (c *Controller) streamFinished(ctx context.Context, stream *config.Stream) // end egress if no outputs remaining if c.OutputCount.Load() == 0 { - c.SendEOS(ctx, "all streams removed") + c.SendEOS(ctx, livekit.EndReasonStreamsStopped) return nil } @@ -368,7 +367,7 @@ func (c *Controller) onEOSSent() { // made it through the pipeline by the time endRecording is closed if c.SourceType == types.SourceTypeSDK && !c.AudioEnabled { // this will not actually send a second EOS, but will make sure everything is in the correct state - c.SendEOS(context.Background(), "source closed") + c.SendEOS(context.Background(), livekit.EndReasonSrcClosed) } } @@ -381,12 +380,12 @@ func (c *Controller) SendEOS(ctx context.Context, reason string) { c.limitTimer.Stop() } - c.Info.Details = fmt.Sprintf("end reason: %s", reason) + c.Info.SetEndReason(reason) logger.Debugw("stopping pipeline", "reason", reason) switch c.Info.Status { case livekit.EgressStatus_EGRESS_STARTING: - c.Info.SetAborted(info.MsgStoppedBeforeStarted) + c.Info.SetAborted(livekit.MsgStoppedBeforeStarted) c.p.Stop() case livekit.EgressStatus_EGRESS_ABORTED, @@ -395,11 +394,11 @@ func (c *Controller) SendEOS(ctx context.Context, reason string) { case livekit.EgressStatus_EGRESS_ACTIVE: c.Info.UpdateStatus(livekit.EgressStatus_EGRESS_ENDING) - _, _ = c.ipcServiceClient.HandlerUpdate(ctx, (*livekit.EgressInfo)(c.Info)) + _, _ = c.ipcServiceClient.HandlerUpdate(ctx, c.Info) c.sendEOS() case livekit.EgressStatus_EGRESS_ENDING: - _, _ = c.ipcServiceClient.HandlerUpdate(ctx, (*livekit.EgressInfo)(c.Info)) + _, _ = c.ipcServiceClient.HandlerUpdate(ctx, c.Info) c.sendEOS() case livekit.EgressStatus_EGRESS_LIMIT_REACHED: @@ -454,7 +453,7 @@ func (c *Controller) Close() { // ensure egress ends with a final state switch c.Info.Status { case livekit.EgressStatus_EGRESS_STARTING: - c.Info.SetAborted(info.MsgStoppedBeforeStarted) + c.Info.SetAborted(livekit.MsgStoppedBeforeStarted) case livekit.EgressStatus_EGRESS_ACTIVE, livekit.EgressStatus_EGRESS_ENDING: @@ -492,13 +491,13 @@ func (c *Controller) startSessionLimitTimer(ctx context.Context) { c.limitTimer = time.AfterFunc(timeout, func() { switch c.Info.Status { case livekit.EgressStatus_EGRESS_STARTING: - c.Info.SetAborted(info.MsgLimitReachedWithoutStart) + c.Info.SetAborted(livekit.MsgLimitReachedWithoutStart) case livekit.EgressStatus_EGRESS_ACTIVE: c.Info.SetLimitReached() } if c.playing.IsBroken() { - c.SendEOS(ctx, "time limit reached") + c.SendEOS(ctx, livekit.EndReasonLimitReached) } else { c.p.Stop() } @@ -538,7 +537,7 @@ func (c *Controller) updateStartTime(startedAt int64) { if c.Info.Status == livekit.EgressStatus_EGRESS_STARTING { c.Info.UpdateStatus(livekit.EgressStatus_EGRESS_ACTIVE) - _, _ = c.ipcServiceClient.HandlerUpdate(context.Background(), (*livekit.EgressInfo)(c.Info)) + _, _ = c.ipcServiceClient.HandlerUpdate(context.Background(), c.Info) } } @@ -576,7 +575,7 @@ func (c *Controller) streamUpdated(ctx context.Context) { } } - _, _ = c.ipcServiceClient.HandlerUpdate(ctx, (*livekit.EgressInfo)(c.Info)) + _, _ = c.ipcServiceClient.HandlerUpdate(ctx, c.Info) } func (c *Controller) updateEndTime() { diff --git a/pkg/pipeline/debug.go b/pkg/pipeline/debug.go index 4ccb9d4a..191215a7 100644 --- a/pkg/pipeline/debug.go +++ b/pkg/pipeline/debug.go @@ -36,7 +36,7 @@ func (c *Controller) GetGstPipelineDebugDot() string { } func (c *Controller) uploadDebugFiles() { - u, err := uploader.New(&c.Debug.StorageConfig, nil, c.monitor) + u, err := uploader.New(&c.Debug.StorageConfig, nil, c.monitor, nil) if err != nil { logger.Errorw("failed to create uploader", err) return diff --git a/pkg/pipeline/sink/file.go b/pkg/pipeline/sink/file.go index a4f71ef0..bab3f08a 100644 --- a/pkg/pipeline/sink/file.go +++ b/pkg/pipeline/sink/file.go @@ -58,7 +58,7 @@ func (s *FileSink) Close() error { } func (s *FileSink) UploadManifest(filepath string) (string, bool, error) { - if s.DisableManifest && !s.ManifestRequired() { + if s.DisableManifest && !s.conf.Info.BackupStorageUsed { return "", false, nil } diff --git a/pkg/pipeline/sink/image.go b/pkg/pipeline/sink/image.go index deeaefcf..4124bddb 100644 --- a/pkg/pipeline/sink/image.go +++ b/pkg/pipeline/sink/image.go @@ -151,7 +151,7 @@ func (s *ImageSink) Close() error { } func (s *ImageSink) UploadManifest(filepath string) (string, bool, error) { - if s.DisableManifest && !s.ManifestRequired() { + if s.DisableManifest && !s.conf.Info.BackupStorageUsed { return "", false, nil } diff --git a/pkg/pipeline/sink/segments.go b/pkg/pipeline/sink/segments.go index 377418d9..3c3d7397 100644 --- a/pkg/pipeline/sink/segments.go +++ b/pkg/pipeline/sink/segments.go @@ -322,7 +322,7 @@ func (s *SegmentSink) Close() error { } func (s *SegmentSink) UploadManifest(filepath string) (string, bool, error) { - if s.DisableManifest && !s.ManifestRequired() { + if s.DisableManifest && !s.conf.Info.BackupStorageUsed { return "", false, nil } diff --git a/pkg/pipeline/sink/sink.go b/pkg/pipeline/sink/sink.go index d33934c5..c730f9b1 100644 --- a/pkg/pipeline/sink/sink.go +++ b/pkg/pipeline/sink/sink.go @@ -41,7 +41,7 @@ func CreateSinks(p *config.PipelineConfig, callbacks *gstreamer.Callbacks, monit case types.EgressTypeFile: o := c[0].(*config.FileConfig) - u, err := uploader.New(o.StorageConfig, p.BackupConfig, monitor) + u, err := uploader.New(o.StorageConfig, p.BackupConfig, monitor, p.Info) if err != nil { return nil, err } @@ -51,7 +51,7 @@ func CreateSinks(p *config.PipelineConfig, callbacks *gstreamer.Callbacks, monit case types.EgressTypeSegments: o := c[0].(*config.SegmentConfig) - u, err := uploader.New(o.StorageConfig, p.BackupConfig, monitor) + u, err := uploader.New(o.StorageConfig, p.BackupConfig, monitor, p.Info) if err != nil { return nil, err } @@ -75,7 +75,7 @@ func CreateSinks(p *config.PipelineConfig, callbacks *gstreamer.Callbacks, monit for _, ci := range c { o := ci.(*config.ImageConfig) - u, err := uploader.New(o.StorageConfig, p.BackupConfig, monitor) + u, err := uploader.New(o.StorageConfig, p.BackupConfig, monitor, p.Info) if err != nil { return nil, err } diff --git a/pkg/pipeline/sink/uploader/uploader.go b/pkg/pipeline/sink/uploader/uploader.go index 681af39b..4569c381 100644 --- a/pkg/pipeline/sink/uploader/uploader.go +++ b/pkg/pipeline/sink/uploader/uploader.go @@ -21,6 +21,7 @@ import ( "github.com/livekit/egress/pkg/config" "github.com/livekit/egress/pkg/stats" "github.com/livekit/egress/pkg/types" + "github.com/livekit/protocol/livekit" "github.com/livekit/protocol/logger" "github.com/livekit/psrpc" ) @@ -36,13 +37,13 @@ type uploader interface { } type Uploader struct { - primary uploader - backup uploader - backupUsed bool - monitor *stats.HandlerMonitor + primary uploader + backup uploader + info *livekit.EgressInfo + monitor *stats.HandlerMonitor } -func New(conf, backup *config.StorageConfig, monitor *stats.HandlerMonitor) (*Uploader, error) { +func New(conf, backup *config.StorageConfig, monitor *stats.HandlerMonitor, info *livekit.EgressInfo) (*Uploader, error) { p, err := getUploader(conf) if err != nil { return nil, err @@ -51,6 +52,7 @@ func New(conf, backup *config.StorageConfig, monitor *stats.HandlerMonitor) (*Up u := &Uploader{ primary: p, monitor: monitor, + info: info, } if backup != nil { @@ -109,7 +111,9 @@ func (u *Uploader) Upload( if u.backup != nil { location, size, backupErr := u.backup.upload(localFilepath, storageFilepath, outputType) if backupErr == nil { - u.backupUsed = true + if u.info != nil { + u.info.SetBackupUsed() + } if u.monitor != nil { u.monitor.IncBackupStorageWrites(string(outputType)) } @@ -125,7 +129,3 @@ func (u *Uploader) Upload( return "", 0, primaryErr } - -func (u *Uploader) ManifestRequired() bool { - return u.backupUsed -} diff --git a/pkg/pipeline/sink/uploader/uploader_test.go b/pkg/pipeline/sink/uploader/uploader_test.go index 07af5c9e..23e508b8 100644 --- a/pkg/pipeline/sink/uploader/uploader_test.go +++ b/pkg/pipeline/sink/uploader/uploader_test.go @@ -10,6 +10,7 @@ import ( "github.com/stretchr/testify/require" "github.com/livekit/egress/pkg/config" + "github.com/livekit/protocol/livekit" ) func TestUploader(t *testing.T) { @@ -37,7 +38,8 @@ func TestUploader(t *testing.T) { GeneratePresignedUrl: true, } - u, err := New(primary, backup, nil) + info := &livekit.EgressInfo{} + u, err := New(primary, backup, nil, info) require.NoError(t, err) filepath := "uploader_test.go" @@ -48,6 +50,7 @@ func TestUploader(t *testing.T) { require.NotZero(t, size) require.NotEmpty(t, location) + require.True(t, info.BackupStorageUsed) response, err := http.Get(location) require.NoError(t, err) diff --git a/pkg/pipeline/source/web.go b/pkg/pipeline/source/web.go index 4f9e97d1..c6aedfc9 100644 --- a/pkg/pipeline/source/web.go +++ b/pkg/pipeline/source/web.go @@ -30,7 +30,6 @@ import ( "github.com/livekit/egress/pkg/config" "github.com/livekit/egress/pkg/errors" - "github.com/livekit/egress/pkg/info" "github.com/livekit/protocol/logger" "github.com/livekit/protocol/tracer" ) @@ -51,7 +50,7 @@ type WebSource struct { startRecording chan struct{} endRecording chan struct{} - info *info.EgressInfo + info *livekit.EgressInfo } func init() { diff --git a/pkg/server/server_rpc.go b/pkg/server/server_rpc.go index 9c737f78..60f6bfc8 100644 --- a/pkg/server/server_rpc.go +++ b/pkg/server/server_rpc.go @@ -73,27 +73,25 @@ func (s *Server) StartEgress(ctx context.Context, req *rpc.StartEgressRequest) ( "request", p.Info.Request, ) - info := (*livekit.EgressInfo)(p.Info) - - errChan := s.ioClient.CreateEgress(ctx, info) - launchErr := s.launchProcess(req, info) + errChan := s.ioClient.CreateEgress(ctx, p.Info) + launchErr := s.launchProcess(req, p.Info) createErr := <-errChan if launchErr != nil { if createErr == nil { // send failed update if it was saved to db - s.processEnded(req, info, launchErr) + s.processEnded(req, p.Info, launchErr) } return nil, launchErr } else if createErr != nil { // launched but failed to save - abort and return error - info.Error = createErr.Error() - info.ErrorCode = int32(http.StatusInternalServerError) + p.Info.Error = createErr.Error() + p.Info.ErrorCode = int32(http.StatusInternalServerError) s.AbortProcess(req.EgressId, createErr) return nil, createErr } - return (*livekit.EgressInfo)(p.Info), nil + return p.Info, nil } func (s *Server) launchProcess(req *rpc.StartEgressRequest, info *livekit.EgressInfo) error {