Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add disallow_local_storage config option #795

Merged
merged 3 commits into from
Oct 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ require (
github.com/gorilla/websocket v1.5.3
github.com/livekit/livekit-server v1.7.3-0.20241017190429-44a74fc06ae7
github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1
github.com/livekit/protocol v1.26.1-0.20241017190602-ef6fc8f9c752
github.com/livekit/protocol v1.27.1-0.20241022061022-caa595ed3292
github.com/livekit/psrpc v0.6.1-0.20240924010758-9f0a4268a3b9
github.com/livekit/server-sdk-go/v2 v2.2.2-0.20241015094126-b8538ae5d67b
github.com/pion/rtp v1.8.9
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -218,8 +218,8 @@ github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1 h1:jm09419p0lqTkD
github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1/go.mod h1:Rs3MhFwutWhGwmY1VQsygw28z5bWcnEYmS1OG9OxjOQ=
github.com/livekit/mediatransportutil v0.0.0-20240730083616-559fa5ece598 h1:yLlkHk2feSLHstD9n4VKg7YEBR4rLODTI4WE8gNBEnQ=
github.com/livekit/mediatransportutil v0.0.0-20240730083616-559fa5ece598/go.mod h1:jwKUCmObuiEDH0iiuJHaGMXwRs3RjrB4G6qqgkr/5oE=
github.com/livekit/protocol v1.26.1-0.20241017190602-ef6fc8f9c752 h1:EgULMfdFSW/3ZZckhiF+CwDApYTD3SkqR3MYazKeE5w=
github.com/livekit/protocol v1.26.1-0.20241017190602-ef6fc8f9c752/go.mod h1:nxRzmQBKSYK64gqr7ABWwt78hvrgiO2wYuCojRYb7Gs=
github.com/livekit/protocol v1.27.1-0.20241022061022-caa595ed3292 h1:wVzOLGSjJpCsdKHKKpPxYhXW/JL90l0XYFQbeINSdP4=
github.com/livekit/protocol v1.27.1-0.20241022061022-caa595ed3292/go.mod h1:nxRzmQBKSYK64gqr7ABWwt78hvrgiO2wYuCojRYb7Gs=
github.com/livekit/psrpc v0.6.1-0.20240924010758-9f0a4268a3b9 h1:33oBjGpVD9tYkDXQU42tnHl8eCX9G6PVUToBVuCUyOs=
github.com/livekit/psrpc v0.6.1-0.20240924010758-9f0a4268a3b9/go.mod h1:CQUBSPfYYAaevg1TNCc6/aYsa8DJH4jSRFdCeSZk5u0=
github.com/livekit/server-sdk-go/v2 v2.2.2-0.20241015094126-b8538ae5d67b h1:R1GpKwVbSYsG08k5sIkNCukvnrkOE18R8IO1YeujR8o=
Expand Down
11 changes: 6 additions & 5 deletions pkg/config/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,12 @@ type BaseConfig struct {
WsUrl string `yaml:"ws_url"` // (env LIVEKIT_WS_URL)

// optional
Logging *logger.Config `yaml:"logging"` // logging config
TemplateBase string `yaml:"template_base"` // custom template base url
ClusterID string `yaml:"cluster_id"` // cluster this instance belongs to
EnableChromeSandbox bool `yaml:"enable_chrome_sandbox"` // enable Chrome sandbox, requires extra docker configuration
MaxUploadQueue int `yaml:"max_upload_queue"` // maximum upload queue size, in minutes
Logging *logger.Config `yaml:"logging"` // logging config
TemplateBase string `yaml:"template_base"` // custom template base url
ClusterID string `yaml:"cluster_id"` // cluster this instance belongs to
EnableChromeSandbox bool `yaml:"enable_chrome_sandbox"` // enable Chrome sandbox, requires extra docker configuration
MaxUploadQueue int `yaml:"max_upload_queue"` // maximum upload queue size, in minutes
DisallowLocalStorage bool `yaml:"disallow_local_storage"` // require an upload config for all requests

SessionLimits `yaml:"session_limits"` // session duration limits
StorageConfig *StorageConfig `yaml:"storage,omitempty"` // storage config
Expand Down
7 changes: 6 additions & 1 deletion pkg/config/output_file.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,12 +70,17 @@ type fileRequest interface {
}

func (p *PipelineConfig) getFileConfig(outputType types.OutputType, req fileRequest) (*FileConfig, error) {
sc, err := p.getStorageConfig(req)
if err != nil {
return nil, err
}

conf := &FileConfig{
outputConfig: outputConfig{OutputType: outputType},
FileInfo: &livekit.FileInfo{},
StorageFilepath: clean(req.GetFilepath()),
DisableManifest: req.GetDisableManifest(),
StorageConfig: p.getStorageConfig(req),
StorageConfig: sc,
}

// filename
Expand Down
7 changes: 6 additions & 1 deletion pkg/config/output_image.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,11 @@ func (p *PipelineConfig) getImageConfig(images *livekit.ImageOutput) (*ImageConf
return nil, err
}

sc, err := p.getStorageConfig(images)
if err != nil {
return nil, err
}

filenamePrefix := clean(images.FilenamePrefix)
conf := &ImageConfig{
outputConfig: outputConfig{
Expand All @@ -77,7 +82,7 @@ func (p *PipelineConfig) getImageConfig(images *livekit.ImageOutput) (*ImageConf
ImagePrefix: filenamePrefix,
ImageSuffix: images.FilenameSuffix,
DisableManifest: images.DisableManifest,
StorageConfig: p.getStorageConfig(images),
StorageConfig: sc,
CaptureInterval: images.CaptureInterval,
Width: images.Width,
Height: images.Height,
Expand Down
10 changes: 7 additions & 3 deletions pkg/config/output_segment.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,11 @@ func (p *PipelineConfig) GetSegmentConfig() *SegmentConfig {

// segments should always be added last, so we can check keyframe interval from file/stream
func (p *PipelineConfig) getSegmentConfig(segments *livekit.SegmentedFileOutput) (*SegmentConfig, error) {
sc, err := p.getStorageConfig(segments)
if err != nil {
return nil, err
}

conf := &SegmentConfig{
SegmentsInfo: &livekit.SegmentsInfo{},
SegmentPrefix: clean(segments.FilenamePrefix),
Expand All @@ -60,7 +65,7 @@ func (p *PipelineConfig) getSegmentConfig(segments *livekit.SegmentedFileOutput)
LivePlaylistFilename: clean(segments.LivePlaylistName),
SegmentDuration: int(segments.SegmentDuration),
DisableManifest: segments.DisableManifest,
StorageConfig: p.getStorageConfig(segments),
StorageConfig: sc,
}

if conf.SegmentDuration == 0 {
Expand All @@ -74,8 +79,7 @@ func (p *PipelineConfig) getSegmentConfig(segments *livekit.SegmentedFileOutput)
}

// filename
err := conf.updatePrefixAndPlaylist(p)
if err != nil {
if err = conf.updatePrefixAndPlaylist(p); err != nil {
return nil, err
}

Expand Down
22 changes: 16 additions & 6 deletions pkg/config/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package config
import (
"time"

"github.com/livekit/egress/pkg/errors"
"github.com/livekit/protocol/egress"
)

Expand Down Expand Up @@ -61,7 +62,7 @@ type GCPConfig struct {
ProxyConfig *ProxyConfig `yaml:"proxy_config"`
}

func (p *PipelineConfig) getStorageConfig(req egress.UploadRequest) *StorageConfig {
func (p *PipelineConfig) getStorageConfig(req egress.UploadRequest) (*StorageConfig, error) {
sc := &StorageConfig{}
if p.StorageConfig != nil {
sc.PathPrefix = p.StorageConfig.PathPrefix
Expand Down Expand Up @@ -102,7 +103,7 @@ func (p *PipelineConfig) getStorageConfig(req egress.UploadRequest) *StorageConf
if sc.S3.MinRetryDelay == 0 {
sc.S3.MinRetryDelay = time.Millisecond * 100
}
return sc
return sc, nil
}

if gcp := req.GetGcp(); gcp != nil {
Expand All @@ -117,7 +118,7 @@ func (p *PipelineConfig) getStorageConfig(req egress.UploadRequest) *StorageConf
Password: gcp.Proxy.Password,
}
}
return sc
return sc, nil
}

if azure := req.GetAzure(); azure != nil {
Expand All @@ -126,7 +127,7 @@ func (p *PipelineConfig) getStorageConfig(req egress.UploadRequest) *StorageConf
AccountKey: azure.AccountKey,
ContainerName: azure.ContainerName,
}
return sc
return sc, nil
}

if ali := req.GetAliOSS(); ali != nil {
Expand All @@ -137,8 +138,17 @@ func (p *PipelineConfig) getStorageConfig(req egress.UploadRequest) *StorageConf
Endpoint: ali.Endpoint,
Bucket: ali.Bucket,
}
return sc
return sc, nil
}

return p.StorageConfig
sc = p.StorageConfig
if p.DisallowLocalStorage && (sc == nil || sc.IsLocal()) {
return nil, errors.ErrInvalidInput("output")
}

return sc, nil
}

func (c *StorageConfig) IsLocal() bool {
return c.S3 == nil && c.GCP == nil && c.Azure == nil && c.AliOSS == nil
}
9 changes: 6 additions & 3 deletions pkg/pipeline/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -458,10 +458,13 @@ func (c *Controller) Close() {
case livekit.EgressStatus_EGRESS_ACTIVE,
livekit.EgressStatus_EGRESS_ENDING:
c.Info.SetComplete()
}
fallthrough

// upload manifest and add location to egress info
c.uploadManifest()
case livekit.EgressStatus_EGRESS_LIMIT_REACHED,
livekit.EgressStatus_EGRESS_COMPLETE:
// upload manifest and add location to egress info
c.uploadManifest()
}
}

func (c *Controller) startSessionLimitTimer(ctx context.Context) {
Expand Down
40 changes: 25 additions & 15 deletions pkg/server/server_rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,19 +73,30 @@ func (s *Server) StartEgress(ctx context.Context, req *rpc.StartEgressRequest) (
"request", p.Info.Request,
)

errChan := s.ioClient.CreateEgress(ctx, (*livekit.EgressInfo)(p.Info))
s.launchProcess(req, (*livekit.EgressInfo)(p.Info))
if err = <-errChan; err != nil {
s.AbortProcess(req.EgressId, err)
s.monitor.EgressAborted(req)
s.activeRequests.Dec()
return nil, err
info := (*livekit.EgressInfo)(p.Info)

errChan := s.ioClient.CreateEgress(ctx, info)
launchErr := s.launchProcess(req, info)
createErr := <-errChan

if launchErr != nil {
if createErr == nil {
// send failed update if it was saved to db
s.processEnded(req, info, launchErr)
}
return nil, launchErr
} else if createErr != nil {
// launched but failed to save - abort and return error
info.Error = createErr.Error()
info.ErrorCode = int32(http.StatusInternalServerError)
s.AbortProcess(req.EgressId, createErr)
return nil, createErr
}

return (*livekit.EgressInfo)(p.Info), nil
}

func (s *Server) launchProcess(req *rpc.StartEgressRequest, info *livekit.EgressInfo) {
func (s *Server) launchProcess(req *rpc.StartEgressRequest, info *livekit.EgressInfo) error {
_, span := tracer.Start(context.Background(), "Service.launchProcess")
defer span.End()

Expand All @@ -102,16 +113,14 @@ func (s *Server) launchProcess(req *rpc.StartEgressRequest, info *livekit.Egress
if err != nil {
span.RecordError(err)
logger.Errorw("could not marshal config", err)
s.processEnded(req, info, err)
return
return err
}

reqString, err := protojson.Marshal(req)
if err != nil {
span.RecordError(err)
logger.Errorw("could not marshal request", err)
s.processEnded(req, info, err)
return
return err
}

cmd := exec.Command("egress",
Expand All @@ -125,13 +134,14 @@ func (s *Server) launchProcess(req *rpc.StartEgressRequest, info *livekit.Egress
cmd.SysProcAttr = &syscall.SysProcAttr{Setsid: true}

if err = s.Launch(context.Background(), handlerID, req, info, cmd); err != nil {
s.processEnded(req, info, err)
return err
} else {
s.monitor.UpdatePID(info.EgressId, cmd.Process.Pid)
go func() {
err = cmd.Wait()
s.processEnded(req, info, err)
}()
return nil
}
}

Expand All @@ -145,8 +155,8 @@ func (s *Server) processEnded(req *rpc.StartEgressRequest, info *livekit.EgressI
info.Error = "internal error"
info.ErrorCode = int32(http.StatusInternalServerError)
_ = s.ioClient.UpdateEgress(context.Background(), info)
logger.Errorw("process failed, shutting down", err)
s.Shutdown(false, false)

logger.Errorw("process failed", err)
}

avgCPU, maxCPU := s.monitor.EgressEnded(req)
Expand Down
55 changes: 28 additions & 27 deletions pkg/stats/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,36 +163,37 @@ func (m *Monitor) canAcceptRequestLocked(req *rpc.StartEgressRequest) ([]interfa
"activeWeb", m.webRequests.Load(),
}

var accept bool
var required float64
switch r := req.Request.(type) {
case *rpc.StartEgressRequest_RoomComposite:
if m.webRequests.Load() >= m.cpuCostConfig.MaxConcurrentWeb {
return fields, false
}
if r.RoomComposite.AudioOnly {
required = m.cpuCostConfig.AudioRoomCompositeCpuCost
} else {
required = m.cpuCostConfig.RoomCompositeCpuCost
}
case *rpc.StartEgressRequest_Web:
if m.webRequests.Load() >= m.cpuCostConfig.MaxConcurrentWeb {
return fields, false
}
if r.Web.AudioOnly {
required = m.cpuCostConfig.AudioWebCpuCost
} else {
required = m.cpuCostConfig.WebCpuCost
required := req.EstimatedCpu
if required == 0 {
switch r := req.Request.(type) {
case *rpc.StartEgressRequest_RoomComposite:
if m.webRequests.Load() >= m.cpuCostConfig.MaxConcurrentWeb {
return fields, false
}
if r.RoomComposite.AudioOnly {
required = m.cpuCostConfig.AudioRoomCompositeCpuCost
} else {
required = m.cpuCostConfig.RoomCompositeCpuCost
}
case *rpc.StartEgressRequest_Web:
if m.webRequests.Load() >= m.cpuCostConfig.MaxConcurrentWeb {
return fields, false
}
if r.Web.AudioOnly {
required = m.cpuCostConfig.AudioWebCpuCost
} else {
required = m.cpuCostConfig.WebCpuCost
}
case *rpc.StartEgressRequest_Participant:
required = m.cpuCostConfig.ParticipantCpuCost
case *rpc.StartEgressRequest_TrackComposite:
required = m.cpuCostConfig.TrackCompositeCpuCost
case *rpc.StartEgressRequest_Track:
required = m.cpuCostConfig.TrackCpuCost
}
case *rpc.StartEgressRequest_Participant:
required = m.cpuCostConfig.ParticipantCpuCost
case *rpc.StartEgressRequest_TrackComposite:
required = m.cpuCostConfig.TrackCompositeCpuCost
case *rpc.StartEgressRequest_Track:
required = m.cpuCostConfig.TrackCpuCost
}
accept = available >= required

accept := available >= required
fields = append(fields,
"required", required,
"canAccept", accept,
Expand Down