From ffdc50115a050ba932b018a7fd62eb32d452ec97 Mon Sep 17 00:00:00 2001 From: David Colburn Date: Wed, 23 Oct 2024 16:06:37 -0400 Subject: [PATCH 1/3] add disallow-local config option --- pkg/config/base.go | 11 +++++----- pkg/config/output_file.go | 7 ++++++- pkg/config/output_image.go | 7 ++++++- pkg/config/output_segment.go | 10 ++++++--- pkg/config/storage.go | 22 ++++++++++++++------ pkg/server/server_rpc.go | 40 ++++++++++++++++++++++-------------- 6 files changed, 66 insertions(+), 31 deletions(-) diff --git a/pkg/config/base.go b/pkg/config/base.go index df7718be..b602c132 100644 --- a/pkg/config/base.go +++ b/pkg/config/base.go @@ -36,11 +36,12 @@ type BaseConfig struct { WsUrl string `yaml:"ws_url"` // (env LIVEKIT_WS_URL) // optional - Logging *logger.Config `yaml:"logging"` // logging config - TemplateBase string `yaml:"template_base"` // custom template base url - ClusterID string `yaml:"cluster_id"` // cluster this instance belongs to - EnableChromeSandbox bool `yaml:"enable_chrome_sandbox"` // enable Chrome sandbox, requires extra docker configuration - MaxUploadQueue int `yaml:"max_upload_queue"` // maximum upload queue size, in minutes + Logging *logger.Config `yaml:"logging"` // logging config + TemplateBase string `yaml:"template_base"` // custom template base url + ClusterID string `yaml:"cluster_id"` // cluster this instance belongs to + EnableChromeSandbox bool `yaml:"enable_chrome_sandbox"` // enable Chrome sandbox, requires extra docker configuration + MaxUploadQueue int `yaml:"max_upload_queue"` // maximum upload queue size, in minutes + DisallowLocalStorage bool `yaml:"disallow_local_storage"` // require an upload config for all requests SessionLimits `yaml:"session_limits"` // session duration limits StorageConfig *StorageConfig `yaml:"storage,omitempty"` // storage config diff --git a/pkg/config/output_file.go b/pkg/config/output_file.go index d09bc0c0..d515b795 100644 --- a/pkg/config/output_file.go +++ b/pkg/config/output_file.go @@ -70,12 +70,17 @@ type fileRequest interface { } func (p *PipelineConfig) getFileConfig(outputType types.OutputType, req fileRequest) (*FileConfig, error) { + sc, err := p.getStorageConfig(req) + if err != nil { + return nil, err + } + conf := &FileConfig{ outputConfig: outputConfig{OutputType: outputType}, FileInfo: &livekit.FileInfo{}, StorageFilepath: clean(req.GetFilepath()), DisableManifest: req.GetDisableManifest(), - StorageConfig: p.getStorageConfig(req), + StorageConfig: sc, } // filename diff --git a/pkg/config/output_image.go b/pkg/config/output_image.go index 3a82b9ff..28649b25 100644 --- a/pkg/config/output_image.go +++ b/pkg/config/output_image.go @@ -64,6 +64,11 @@ func (p *PipelineConfig) getImageConfig(images *livekit.ImageOutput) (*ImageConf return nil, err } + sc, err := p.getStorageConfig(images) + if err != nil { + return nil, err + } + filenamePrefix := clean(images.FilenamePrefix) conf := &ImageConfig{ outputConfig: outputConfig{ @@ -77,7 +82,7 @@ func (p *PipelineConfig) getImageConfig(images *livekit.ImageOutput) (*ImageConf ImagePrefix: filenamePrefix, ImageSuffix: images.FilenameSuffix, DisableManifest: images.DisableManifest, - StorageConfig: p.getStorageConfig(images), + StorageConfig: sc, CaptureInterval: images.CaptureInterval, Width: images.Width, Height: images.Height, diff --git a/pkg/config/output_segment.go b/pkg/config/output_segment.go index f2f79a5a..27d11620 100644 --- a/pkg/config/output_segment.go +++ b/pkg/config/output_segment.go @@ -52,6 +52,11 @@ func (p *PipelineConfig) GetSegmentConfig() *SegmentConfig { // segments should always be added last, so we can check keyframe interval from file/stream func (p *PipelineConfig) getSegmentConfig(segments *livekit.SegmentedFileOutput) (*SegmentConfig, error) { + sc, err := p.getStorageConfig(segments) + if err != nil { + return nil, err + } + conf := &SegmentConfig{ SegmentsInfo: &livekit.SegmentsInfo{}, SegmentPrefix: clean(segments.FilenamePrefix), @@ -60,7 +65,7 @@ func (p *PipelineConfig) getSegmentConfig(segments *livekit.SegmentedFileOutput) LivePlaylistFilename: clean(segments.LivePlaylistName), SegmentDuration: int(segments.SegmentDuration), DisableManifest: segments.DisableManifest, - StorageConfig: p.getStorageConfig(segments), + StorageConfig: sc, } if conf.SegmentDuration == 0 { @@ -74,8 +79,7 @@ func (p *PipelineConfig) getSegmentConfig(segments *livekit.SegmentedFileOutput) } // filename - err := conf.updatePrefixAndPlaylist(p) - if err != nil { + if err = conf.updatePrefixAndPlaylist(p); err != nil { return nil, err } diff --git a/pkg/config/storage.go b/pkg/config/storage.go index d96590ab..1541b291 100644 --- a/pkg/config/storage.go +++ b/pkg/config/storage.go @@ -17,6 +17,7 @@ package config import ( "time" + "github.com/livekit/egress/pkg/errors" "github.com/livekit/protocol/egress" ) @@ -61,7 +62,7 @@ type GCPConfig struct { ProxyConfig *ProxyConfig `yaml:"proxy_config"` } -func (p *PipelineConfig) getStorageConfig(req egress.UploadRequest) *StorageConfig { +func (p *PipelineConfig) getStorageConfig(req egress.UploadRequest) (*StorageConfig, error) { sc := &StorageConfig{} if p.StorageConfig != nil { sc.PathPrefix = p.StorageConfig.PathPrefix @@ -102,7 +103,7 @@ func (p *PipelineConfig) getStorageConfig(req egress.UploadRequest) *StorageConf if sc.S3.MinRetryDelay == 0 { sc.S3.MinRetryDelay = time.Millisecond * 100 } - return sc + return sc, nil } if gcp := req.GetGcp(); gcp != nil { @@ -117,7 +118,7 @@ func (p *PipelineConfig) getStorageConfig(req egress.UploadRequest) *StorageConf Password: gcp.Proxy.Password, } } - return sc + return sc, nil } if azure := req.GetAzure(); azure != nil { @@ -126,7 +127,7 @@ func (p *PipelineConfig) getStorageConfig(req egress.UploadRequest) *StorageConf AccountKey: azure.AccountKey, ContainerName: azure.ContainerName, } - return sc + return sc, nil } if ali := req.GetAliOSS(); ali != nil { @@ -137,8 +138,17 @@ func (p *PipelineConfig) getStorageConfig(req egress.UploadRequest) *StorageConf Endpoint: ali.Endpoint, Bucket: ali.Bucket, } - return sc + return sc, nil } - return p.StorageConfig + sc = p.StorageConfig + if p.DisallowLocalStorage && (sc == nil || sc.IsLocal()) { + return nil, errors.ErrInvalidInput("output") + } + + return sc, nil +} + +func (c *StorageConfig) IsLocal() bool { + return c.S3 == nil && c.GCP == nil && c.Azure == nil && c.AliOSS == nil } diff --git a/pkg/server/server_rpc.go b/pkg/server/server_rpc.go index f068bd6d..9c737f78 100644 --- a/pkg/server/server_rpc.go +++ b/pkg/server/server_rpc.go @@ -73,19 +73,30 @@ func (s *Server) StartEgress(ctx context.Context, req *rpc.StartEgressRequest) ( "request", p.Info.Request, ) - errChan := s.ioClient.CreateEgress(ctx, (*livekit.EgressInfo)(p.Info)) - s.launchProcess(req, (*livekit.EgressInfo)(p.Info)) - if err = <-errChan; err != nil { - s.AbortProcess(req.EgressId, err) - s.monitor.EgressAborted(req) - s.activeRequests.Dec() - return nil, err + info := (*livekit.EgressInfo)(p.Info) + + errChan := s.ioClient.CreateEgress(ctx, info) + launchErr := s.launchProcess(req, info) + createErr := <-errChan + + if launchErr != nil { + if createErr == nil { + // send failed update if it was saved to db + s.processEnded(req, info, launchErr) + } + return nil, launchErr + } else if createErr != nil { + // launched but failed to save - abort and return error + info.Error = createErr.Error() + info.ErrorCode = int32(http.StatusInternalServerError) + s.AbortProcess(req.EgressId, createErr) + return nil, createErr } return (*livekit.EgressInfo)(p.Info), nil } -func (s *Server) launchProcess(req *rpc.StartEgressRequest, info *livekit.EgressInfo) { +func (s *Server) launchProcess(req *rpc.StartEgressRequest, info *livekit.EgressInfo) error { _, span := tracer.Start(context.Background(), "Service.launchProcess") defer span.End() @@ -102,16 +113,14 @@ func (s *Server) launchProcess(req *rpc.StartEgressRequest, info *livekit.Egress if err != nil { span.RecordError(err) logger.Errorw("could not marshal config", err) - s.processEnded(req, info, err) - return + return err } reqString, err := protojson.Marshal(req) if err != nil { span.RecordError(err) logger.Errorw("could not marshal request", err) - s.processEnded(req, info, err) - return + return err } cmd := exec.Command("egress", @@ -125,13 +134,14 @@ func (s *Server) launchProcess(req *rpc.StartEgressRequest, info *livekit.Egress cmd.SysProcAttr = &syscall.SysProcAttr{Setsid: true} if err = s.Launch(context.Background(), handlerID, req, info, cmd); err != nil { - s.processEnded(req, info, err) + return err } else { s.monitor.UpdatePID(info.EgressId, cmd.Process.Pid) go func() { err = cmd.Wait() s.processEnded(req, info, err) }() + return nil } } @@ -145,8 +155,8 @@ func (s *Server) processEnded(req *rpc.StartEgressRequest, info *livekit.EgressI info.Error = "internal error" info.ErrorCode = int32(http.StatusInternalServerError) _ = s.ioClient.UpdateEgress(context.Background(), info) - logger.Errorw("process failed, shutting down", err) - s.Shutdown(false, false) + + logger.Errorw("process failed", err) } avgCPU, maxCPU := s.monitor.EgressEnded(req) From 154e69ac0f6ddb45fb1dab97008e4b5b69dac45e Mon Sep 17 00:00:00 2001 From: David Colburn Date: Thu, 24 Oct 2024 11:34:55 -0400 Subject: [PATCH 2/3] only upload manifest on success --- pkg/pipeline/controller.go | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/pkg/pipeline/controller.go b/pkg/pipeline/controller.go index a098acdb..788fb81b 100644 --- a/pkg/pipeline/controller.go +++ b/pkg/pipeline/controller.go @@ -458,10 +458,13 @@ func (c *Controller) Close() { case livekit.EgressStatus_EGRESS_ACTIVE, livekit.EgressStatus_EGRESS_ENDING: c.Info.SetComplete() - } + fallthrough - // upload manifest and add location to egress info - c.uploadManifest() + case livekit.EgressStatus_EGRESS_LIMIT_REACHED, + livekit.EgressStatus_EGRESS_COMPLETE: + // upload manifest and add location to egress info + c.uploadManifest() + } } func (c *Controller) startSessionLimitTimer(ctx context.Context) { From 4e9d046acb36570228e5c7b185328add9257c13c Mon Sep 17 00:00:00 2001 From: David Colburn Date: Thu, 24 Oct 2024 12:10:59 -0400 Subject: [PATCH 3/3] use estimated cpu from request if available --- go.mod | 2 +- go.sum | 4 ++-- pkg/stats/monitor.go | 55 ++++++++++++++++++++++---------------------- 3 files changed, 31 insertions(+), 30 deletions(-) diff --git a/go.mod b/go.mod index 54fc2336..84ee2630 100644 --- a/go.mod +++ b/go.mod @@ -23,7 +23,7 @@ require ( github.com/gorilla/websocket v1.5.3 github.com/livekit/livekit-server v1.7.3-0.20241017190429-44a74fc06ae7 github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1 - github.com/livekit/protocol v1.26.1-0.20241017190602-ef6fc8f9c752 + github.com/livekit/protocol v1.27.1-0.20241022061022-caa595ed3292 github.com/livekit/psrpc v0.6.1-0.20240924010758-9f0a4268a3b9 github.com/livekit/server-sdk-go/v2 v2.2.2-0.20241015094126-b8538ae5d67b github.com/pion/rtp v1.8.9 diff --git a/go.sum b/go.sum index 87f2c7ed..fed35c3b 100644 --- a/go.sum +++ b/go.sum @@ -218,8 +218,8 @@ github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1 h1:jm09419p0lqTkD github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1/go.mod h1:Rs3MhFwutWhGwmY1VQsygw28z5bWcnEYmS1OG9OxjOQ= github.com/livekit/mediatransportutil v0.0.0-20240730083616-559fa5ece598 h1:yLlkHk2feSLHstD9n4VKg7YEBR4rLODTI4WE8gNBEnQ= github.com/livekit/mediatransportutil v0.0.0-20240730083616-559fa5ece598/go.mod h1:jwKUCmObuiEDH0iiuJHaGMXwRs3RjrB4G6qqgkr/5oE= -github.com/livekit/protocol v1.26.1-0.20241017190602-ef6fc8f9c752 h1:EgULMfdFSW/3ZZckhiF+CwDApYTD3SkqR3MYazKeE5w= -github.com/livekit/protocol v1.26.1-0.20241017190602-ef6fc8f9c752/go.mod h1:nxRzmQBKSYK64gqr7ABWwt78hvrgiO2wYuCojRYb7Gs= +github.com/livekit/protocol v1.27.1-0.20241022061022-caa595ed3292 h1:wVzOLGSjJpCsdKHKKpPxYhXW/JL90l0XYFQbeINSdP4= +github.com/livekit/protocol v1.27.1-0.20241022061022-caa595ed3292/go.mod h1:nxRzmQBKSYK64gqr7ABWwt78hvrgiO2wYuCojRYb7Gs= github.com/livekit/psrpc v0.6.1-0.20240924010758-9f0a4268a3b9 h1:33oBjGpVD9tYkDXQU42tnHl8eCX9G6PVUToBVuCUyOs= github.com/livekit/psrpc v0.6.1-0.20240924010758-9f0a4268a3b9/go.mod h1:CQUBSPfYYAaevg1TNCc6/aYsa8DJH4jSRFdCeSZk5u0= github.com/livekit/server-sdk-go/v2 v2.2.2-0.20241015094126-b8538ae5d67b h1:R1GpKwVbSYsG08k5sIkNCukvnrkOE18R8IO1YeujR8o= diff --git a/pkg/stats/monitor.go b/pkg/stats/monitor.go index aa451063..38755bf9 100644 --- a/pkg/stats/monitor.go +++ b/pkg/stats/monitor.go @@ -163,36 +163,37 @@ func (m *Monitor) canAcceptRequestLocked(req *rpc.StartEgressRequest) ([]interfa "activeWeb", m.webRequests.Load(), } - var accept bool - var required float64 - switch r := req.Request.(type) { - case *rpc.StartEgressRequest_RoomComposite: - if m.webRequests.Load() >= m.cpuCostConfig.MaxConcurrentWeb { - return fields, false - } - if r.RoomComposite.AudioOnly { - required = m.cpuCostConfig.AudioRoomCompositeCpuCost - } else { - required = m.cpuCostConfig.RoomCompositeCpuCost - } - case *rpc.StartEgressRequest_Web: - if m.webRequests.Load() >= m.cpuCostConfig.MaxConcurrentWeb { - return fields, false - } - if r.Web.AudioOnly { - required = m.cpuCostConfig.AudioWebCpuCost - } else { - required = m.cpuCostConfig.WebCpuCost + required := req.EstimatedCpu + if required == 0 { + switch r := req.Request.(type) { + case *rpc.StartEgressRequest_RoomComposite: + if m.webRequests.Load() >= m.cpuCostConfig.MaxConcurrentWeb { + return fields, false + } + if r.RoomComposite.AudioOnly { + required = m.cpuCostConfig.AudioRoomCompositeCpuCost + } else { + required = m.cpuCostConfig.RoomCompositeCpuCost + } + case *rpc.StartEgressRequest_Web: + if m.webRequests.Load() >= m.cpuCostConfig.MaxConcurrentWeb { + return fields, false + } + if r.Web.AudioOnly { + required = m.cpuCostConfig.AudioWebCpuCost + } else { + required = m.cpuCostConfig.WebCpuCost + } + case *rpc.StartEgressRequest_Participant: + required = m.cpuCostConfig.ParticipantCpuCost + case *rpc.StartEgressRequest_TrackComposite: + required = m.cpuCostConfig.TrackCompositeCpuCost + case *rpc.StartEgressRequest_Track: + required = m.cpuCostConfig.TrackCpuCost } - case *rpc.StartEgressRequest_Participant: - required = m.cpuCostConfig.ParticipantCpuCost - case *rpc.StartEgressRequest_TrackComposite: - required = m.cpuCostConfig.TrackCompositeCpuCost - case *rpc.StartEgressRequest_Track: - required = m.cpuCostConfig.TrackCpuCost } - accept = available >= required + accept := available >= required fields = append(fields, "required", required, "canAccept", accept,