Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add backup_storage_used to egress info #807

Merged
merged 2 commits into from
Nov 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ require (
github.com/gorilla/websocket v1.5.3
github.com/livekit/livekit-server v1.7.3-0.20241017190429-44a74fc06ae7
github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1
github.com/livekit/protocol v1.27.2-0.20241107143305-3c72d6581b6f
github.com/livekit/protocol v1.27.2-0.20241112182453-391b78ffe9a7
github.com/livekit/psrpc v0.6.1-0.20240924010758-9f0a4268a3b9
github.com/livekit/server-sdk-go/v2 v2.2.2-0.20241015094126-b8538ae5d67b
github.com/pion/rtp v1.8.9
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -218,8 +218,8 @@ github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1 h1:jm09419p0lqTkD
github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1/go.mod h1:Rs3MhFwutWhGwmY1VQsygw28z5bWcnEYmS1OG9OxjOQ=
github.com/livekit/mediatransportutil v0.0.0-20240730083616-559fa5ece598 h1:yLlkHk2feSLHstD9n4VKg7YEBR4rLODTI4WE8gNBEnQ=
github.com/livekit/mediatransportutil v0.0.0-20240730083616-559fa5ece598/go.mod h1:jwKUCmObuiEDH0iiuJHaGMXwRs3RjrB4G6qqgkr/5oE=
github.com/livekit/protocol v1.27.2-0.20241107143305-3c72d6581b6f h1:JDr/L79siZUP5rFH20QVj1n2ZqwUB9eyRPmFaaeIsQw=
github.com/livekit/protocol v1.27.2-0.20241107143305-3c72d6581b6f/go.mod h1:BrACGxSTlbAe+T9uXLOiiWyYrJ2gNc0mTYmZJPq/4aA=
github.com/livekit/protocol v1.27.2-0.20241112182453-391b78ffe9a7 h1:acdgG8T+8lrgT58KZPq/oRy/Gc8xa8/CKLgGLI0pq/I=
github.com/livekit/protocol v1.27.2-0.20241112182453-391b78ffe9a7/go.mod h1:BrACGxSTlbAe+T9uXLOiiWyYrJ2gNc0mTYmZJPq/4aA=
github.com/livekit/psrpc v0.6.1-0.20240924010758-9f0a4268a3b9 h1:33oBjGpVD9tYkDXQU42tnHl8eCX9G6PVUToBVuCUyOs=
github.com/livekit/psrpc v0.6.1-0.20240924010758-9f0a4268a3b9/go.mod h1:CQUBSPfYYAaevg1TNCc6/aYsa8DJH4jSRFdCeSZk5u0=
github.com/livekit/server-sdk-go/v2 v2.2.2-0.20241015094126-b8538ae5d67b h1:R1GpKwVbSYsG08k5sIkNCukvnrkOE18R8IO1YeujR8o=
Expand Down
3 changes: 1 addition & 2 deletions pkg/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (

"github.com/stretchr/testify/require"

"github.com/livekit/egress/pkg/info"
"github.com/livekit/protocol/livekit"
)

Expand Down Expand Up @@ -79,7 +78,7 @@ func TestSegmentNaming(t *testing.T) {
expectedStorageDir: "conf_test/", expectedPlaylistFilename: "playlist.m3u8", expectedLivePlaylistFilename: "", expectedSegmentPrefix: "conf_test_2/filename",
},
} {
p := &PipelineConfig{Info: &info.EgressInfo{EgressId: "egress_ID"}}
p := &PipelineConfig{Info: &livekit.EgressInfo{EgressId: "egress_ID"}}
o, err := p.getSegmentConfig(&livekit.SegmentedFileOutput{
FilenamePrefix: test.filenamePrefix,
PlaylistName: test.playlistName,
Expand Down
7 changes: 3 additions & 4 deletions pkg/config/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ import (
"gopkg.in/yaml.v3"

"github.com/livekit/egress/pkg/errors"
"github.com/livekit/egress/pkg/info"
"github.com/livekit/egress/pkg/types"
"github.com/livekit/protocol/egress"
"github.com/livekit/protocol/livekit"
Expand All @@ -55,8 +54,8 @@ type PipelineConfig struct {
OutputCount atomic.Int32 `yaml:"-"`
FinalizationRequired bool `yaml:"-"`

Info *info.EgressInfo `yaml:"-"`
Manifest *Manifest `yaml:"-"`
Info *livekit.EgressInfo `yaml:"-"`
Manifest *Manifest `yaml:"-"`
}

type SourceConfig struct {
Expand Down Expand Up @@ -164,7 +163,7 @@ func (p *PipelineConfig) Update(request *rpc.StartEgressRequest) error {

// start with defaults
now := time.Now().UnixNano()
p.Info = &info.EgressInfo{
p.Info = &livekit.EgressInfo{
EgressId: request.EgressId,
RoomId: request.RoomId,
Status: livekit.EgressStatus_EGRESS_STARTING,
Expand Down
6 changes: 3 additions & 3 deletions pkg/handler/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ func (h *Handler) Run() {
h.initialized.Break()
if err != nil {
h.conf.Info.SetFailed(err)
_, _ = h.ipcServiceClient.HandlerUpdate(context.Background(), (*livekit.EgressInfo)(h.conf.Info))
_, _ = h.ipcServiceClient.HandlerUpdate(context.Background(), h.conf.Info)
return
}

Expand All @@ -109,7 +109,7 @@ func (h *Handler) Run() {
_, _ = h.ipcServiceClient.HandlerFinished(ctx, &ipc.HandlerFinishedRequest{
EgressId: h.conf.Info.EgressId,
Metrics: m,
Info: (*livekit.EgressInfo)(res),
Info: res,
})
}

Expand All @@ -118,5 +118,5 @@ func (h *Handler) Kill() {
if h.controller == nil {
return
}
h.controller.SendEOS(context.Background(), "handler killed")
h.controller.SendEOS(context.Background(), livekit.EndReasonKilled)
}
6 changes: 3 additions & 3 deletions pkg/handler/handler_rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func (h *Handler) UpdateStream(ctx context.Context, req *livekit.UpdateStreamReq
if err != nil {
return nil, err
}
return (*livekit.EgressInfo)(h.controller.Info), nil
return h.controller.Info, nil
}

func (h *Handler) StopEgress(ctx context.Context, _ *livekit.StopEgressRequest) (*livekit.EgressInfo, error) {
Expand All @@ -47,6 +47,6 @@ func (h *Handler) StopEgress(ctx context.Context, _ *livekit.StopEgressRequest)
return nil, errors.ErrEgressNotFound
}

h.controller.SendEOS(ctx, "StopEgress API")
return (*livekit.EgressInfo)(h.controller.Info), nil
h.controller.SendEOS(ctx, livekit.EndReasonAPI)
return h.controller.Info, nil
}
78 changes: 0 additions & 78 deletions pkg/info/info.go

This file was deleted.

31 changes: 15 additions & 16 deletions pkg/pipeline/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ import (
"github.com/livekit/egress/pkg/config"
"github.com/livekit/egress/pkg/errors"
"github.com/livekit/egress/pkg/gstreamer"
"github.com/livekit/egress/pkg/info"
"github.com/livekit/egress/pkg/ipc"
"github.com/livekit/egress/pkg/pipeline/builder"
"github.com/livekit/egress/pkg/pipeline/sink"
Expand Down Expand Up @@ -196,7 +195,7 @@ func (c *Controller) BuildPipeline() error {
return nil
}

func (c *Controller) Run(ctx context.Context) *info.EgressInfo {
func (c *Controller) Run(ctx context.Context) *livekit.EgressInfo {
ctx, span := tracer.Start(ctx, "Pipeline.Run")
defer span.End()

Expand All @@ -208,7 +207,7 @@ func (c *Controller) Run(ctx context.Context) *info.EgressInfo {
// close when room ends
go func() {
<-c.src.EndRecording()
c.SendEOS(ctx, "source closed")
c.SendEOS(ctx, livekit.EndReasonSrcClosed)
}()

// wait until room is ready
Expand All @@ -218,7 +217,7 @@ func (c *Controller) Run(ctx context.Context) *info.EgressInfo {
select {
case <-c.stopped.Watch():
c.src.Close()
c.Info.SetAborted(info.MsgStartNotReceived)
c.Info.SetAborted(livekit.MsgStartNotReceived)
return c.Info
case <-start:
// continue
Expand Down Expand Up @@ -280,7 +279,7 @@ func (c *Controller) UpdateStream(ctx context.Context, req *livekit.UpdateStream
// add stream info to results
c.mu.Lock()
c.Info.StreamResults = append(c.Info.StreamResults, stream.StreamInfo)
if list := (*livekit.EgressInfo)(c.Info).GetStream(); list != nil {
if list := c.Info.GetStream(); list != nil {
list.Info = append(list.Info, stream.StreamInfo)
}
c.mu.Unlock()
Expand Down Expand Up @@ -325,7 +324,7 @@ func (c *Controller) streamFinished(ctx context.Context, stream *config.Stream)

// end egress if no outputs remaining
if c.OutputCount.Load() == 0 {
c.SendEOS(ctx, "all streams removed")
c.SendEOS(ctx, livekit.EndReasonStreamsStopped)
return nil
}

Expand Down Expand Up @@ -368,7 +367,7 @@ func (c *Controller) onEOSSent() {
// made it through the pipeline by the time endRecording is closed
if c.SourceType == types.SourceTypeSDK && !c.AudioEnabled {
// this will not actually send a second EOS, but will make sure everything is in the correct state
c.SendEOS(context.Background(), "source closed")
c.SendEOS(context.Background(), livekit.EndReasonSrcClosed)
}
}

Expand All @@ -381,12 +380,12 @@ func (c *Controller) SendEOS(ctx context.Context, reason string) {
c.limitTimer.Stop()
}

c.Info.Details = fmt.Sprintf("end reason: %s", reason)
c.Info.SetEndReason(reason)
logger.Debugw("stopping pipeline", "reason", reason)

switch c.Info.Status {
case livekit.EgressStatus_EGRESS_STARTING:
c.Info.SetAborted(info.MsgStoppedBeforeStarted)
c.Info.SetAborted(livekit.MsgStoppedBeforeStarted)
c.p.Stop()

case livekit.EgressStatus_EGRESS_ABORTED,
Expand All @@ -395,11 +394,11 @@ func (c *Controller) SendEOS(ctx context.Context, reason string) {

case livekit.EgressStatus_EGRESS_ACTIVE:
c.Info.UpdateStatus(livekit.EgressStatus_EGRESS_ENDING)
_, _ = c.ipcServiceClient.HandlerUpdate(ctx, (*livekit.EgressInfo)(c.Info))
_, _ = c.ipcServiceClient.HandlerUpdate(ctx, c.Info)
c.sendEOS()

case livekit.EgressStatus_EGRESS_ENDING:
_, _ = c.ipcServiceClient.HandlerUpdate(ctx, (*livekit.EgressInfo)(c.Info))
_, _ = c.ipcServiceClient.HandlerUpdate(ctx, c.Info)
c.sendEOS()

case livekit.EgressStatus_EGRESS_LIMIT_REACHED:
Expand Down Expand Up @@ -454,7 +453,7 @@ func (c *Controller) Close() {
// ensure egress ends with a final state
switch c.Info.Status {
case livekit.EgressStatus_EGRESS_STARTING:
c.Info.SetAborted(info.MsgStoppedBeforeStarted)
c.Info.SetAborted(livekit.MsgStoppedBeforeStarted)

case livekit.EgressStatus_EGRESS_ACTIVE,
livekit.EgressStatus_EGRESS_ENDING:
Expand Down Expand Up @@ -492,13 +491,13 @@ func (c *Controller) startSessionLimitTimer(ctx context.Context) {
c.limitTimer = time.AfterFunc(timeout, func() {
switch c.Info.Status {
case livekit.EgressStatus_EGRESS_STARTING:
c.Info.SetAborted(info.MsgLimitReachedWithoutStart)
c.Info.SetAborted(livekit.MsgLimitReachedWithoutStart)

case livekit.EgressStatus_EGRESS_ACTIVE:
c.Info.SetLimitReached()
}
if c.playing.IsBroken() {
c.SendEOS(ctx, "time limit reached")
c.SendEOS(ctx, livekit.EndReasonLimitReached)
} else {
c.p.Stop()
}
Expand Down Expand Up @@ -538,7 +537,7 @@ func (c *Controller) updateStartTime(startedAt int64) {

if c.Info.Status == livekit.EgressStatus_EGRESS_STARTING {
c.Info.UpdateStatus(livekit.EgressStatus_EGRESS_ACTIVE)
_, _ = c.ipcServiceClient.HandlerUpdate(context.Background(), (*livekit.EgressInfo)(c.Info))
_, _ = c.ipcServiceClient.HandlerUpdate(context.Background(), c.Info)
}
}

Expand Down Expand Up @@ -576,7 +575,7 @@ func (c *Controller) streamUpdated(ctx context.Context) {
}
}

_, _ = c.ipcServiceClient.HandlerUpdate(ctx, (*livekit.EgressInfo)(c.Info))
_, _ = c.ipcServiceClient.HandlerUpdate(ctx, c.Info)
}

func (c *Controller) updateEndTime() {
Expand Down
2 changes: 1 addition & 1 deletion pkg/pipeline/debug.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func (c *Controller) GetGstPipelineDebugDot() string {
}

func (c *Controller) uploadDebugFiles() {
u, err := uploader.New(&c.Debug.StorageConfig, nil, c.monitor)
u, err := uploader.New(&c.Debug.StorageConfig, nil, c.monitor, nil)
if err != nil {
logger.Errorw("failed to create uploader", err)
return
Expand Down
2 changes: 1 addition & 1 deletion pkg/pipeline/sink/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func (s *FileSink) Close() error {
}

func (s *FileSink) UploadManifest(filepath string) (string, bool, error) {
if s.DisableManifest && !s.ManifestRequired() {
if s.DisableManifest && !s.conf.Info.BackupStorageUsed {
return "", false, nil
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/pipeline/sink/image.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ func (s *ImageSink) Close() error {
}

func (s *ImageSink) UploadManifest(filepath string) (string, bool, error) {
if s.DisableManifest && !s.ManifestRequired() {
if s.DisableManifest && !s.conf.Info.BackupStorageUsed {
return "", false, nil
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/pipeline/sink/segments.go
Original file line number Diff line number Diff line change
Expand Up @@ -322,7 +322,7 @@ func (s *SegmentSink) Close() error {
}

func (s *SegmentSink) UploadManifest(filepath string) (string, bool, error) {
if s.DisableManifest && !s.ManifestRequired() {
if s.DisableManifest && !s.conf.Info.BackupStorageUsed {
return "", false, nil
}

Expand Down
6 changes: 3 additions & 3 deletions pkg/pipeline/sink/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func CreateSinks(p *config.PipelineConfig, callbacks *gstreamer.Callbacks, monit
case types.EgressTypeFile:
o := c[0].(*config.FileConfig)

u, err := uploader.New(o.StorageConfig, p.BackupConfig, monitor)
u, err := uploader.New(o.StorageConfig, p.BackupConfig, monitor, p.Info)
if err != nil {
return nil, err
}
Expand All @@ -51,7 +51,7 @@ func CreateSinks(p *config.PipelineConfig, callbacks *gstreamer.Callbacks, monit
case types.EgressTypeSegments:
o := c[0].(*config.SegmentConfig)

u, err := uploader.New(o.StorageConfig, p.BackupConfig, monitor)
u, err := uploader.New(o.StorageConfig, p.BackupConfig, monitor, p.Info)
if err != nil {
return nil, err
}
Expand All @@ -75,7 +75,7 @@ func CreateSinks(p *config.PipelineConfig, callbacks *gstreamer.Callbacks, monit
for _, ci := range c {
o := ci.(*config.ImageConfig)

u, err := uploader.New(o.StorageConfig, p.BackupConfig, monitor)
u, err := uploader.New(o.StorageConfig, p.BackupConfig, monitor, p.Info)
if err != nil {
return nil, err
}
Expand Down
Loading