diff --git a/cmd/server/main.go b/cmd/server/main.go index 4c10da8e..c58874c1 100644 --- a/cmd/server/main.go +++ b/cmd/server/main.go @@ -111,7 +111,7 @@ func runService(c *cli.Context) error { } bus := psrpc.NewRedisMessageBus(rc) - ioClient, err := info.NewIOClient(bus) + ioClient, err := info.NewIOClient(&conf.BaseConfig, bus) if err != nil { return err } diff --git a/pkg/config/base.go b/pkg/config/base.go index b602c132..bfac0703 100644 --- a/pkg/config/base.go +++ b/pkg/config/base.go @@ -42,6 +42,8 @@ type BaseConfig struct { EnableChromeSandbox bool `yaml:"enable_chrome_sandbox"` // enable Chrome sandbox, requires extra docker configuration MaxUploadQueue int `yaml:"max_upload_queue"` // maximum upload queue size, in minutes DisallowLocalStorage bool `yaml:"disallow_local_storage"` // require an upload config for all requests + IOCreateTimeout time.Duration `yaml:"io_create_timeout"` // timeout for CreateEgress calls + IOUpdateTimeout time.Duration `yaml:"io_update_timeout"` // timeout for UpdateEgress calls SessionLimits `yaml:"session_limits"` // session duration limits StorageConfig *StorageConfig `yaml:"storage,omitempty"` // storage config diff --git a/pkg/config/service.go b/pkg/config/service.go index 90539bcf..4e7c21de 100644 --- a/pkg/config/service.go +++ b/pkg/config/service.go @@ -17,6 +17,7 @@ package config import ( "fmt" "os" + "time" "gopkg.in/yaml.v3" @@ -39,6 +40,9 @@ const ( defaultTemplatePort = 7980 defaultTemplateBaseTemplate = "http://localhost:%d/" + + defaultIOCreateTimeout = time.Second * 15 + defaultIOUpdateTimeout = time.Second * 30 ) type ServiceConfig struct { @@ -101,6 +105,13 @@ func (c *ServiceConfig) InitDefaults() { c.TemplateBase = fmt.Sprintf(defaultTemplateBaseTemplate, c.TemplatePort) } + if c.IOCreateTimeout == 0 { + c.IOCreateTimeout = defaultIOCreateTimeout + } + if c.IOUpdateTimeout == 0 { + c.IOUpdateTimeout = defaultIOUpdateTimeout + } + // Setting CPU costs from config. Ensure that CPU costs are positive if c.RoomCompositeCpuCost <= 0 { c.RoomCompositeCpuCost = roomCompositeCpuCost diff --git a/pkg/info/io.go b/pkg/info/io.go index de672acf..5cf6d4e9 100644 --- a/pkg/info/io.go +++ b/pkg/info/io.go @@ -23,6 +23,7 @@ import ( "github.com/frostbyte73/core" "go.uber.org/atomic" + "github.com/livekit/egress/pkg/config" "github.com/livekit/egress/pkg/errors" "github.com/livekit/protocol/egress" "github.com/livekit/protocol/livekit" @@ -32,9 +33,7 @@ import ( ) const ( - createTimeout = time.Second * 10 - ioTimeout = time.Second * 30 - maxBackoff = time.Minute * 10 + maxBackoff = time.Minute * 10 ) type IOClient interface { @@ -48,6 +47,9 @@ type IOClient interface { type ioClient struct { rpc.IOInfoClient + createTimeout time.Duration + updateTimeout time.Duration + mu sync.Mutex egresses map[string]*egressCreation updates chan *update @@ -66,16 +68,18 @@ type update struct { info *livekit.EgressInfo } -func NewIOClient(bus psrpc.MessageBus) (IOClient, error) { - client, err := rpc.NewIOInfoClient(bus, psrpc.WithClientTimeout(ioTimeout)) +func NewIOClient(conf *config.BaseConfig, bus psrpc.MessageBus) (IOClient, error) { + client, err := rpc.NewIOInfoClient(bus) if err != nil { return nil, err } c := &ioClient{ - IOInfoClient: client, - egresses: make(map[string]*egressCreation), - updates: make(chan *update, 1000), + IOInfoClient: client, + createTimeout: conf.IOCreateTimeout, + updateTimeout: conf.IOUpdateTimeout, + egresses: make(map[string]*egressCreation), + updates: make(chan *update, 1000), } c.healthy.Store(true) go c.updateWorker() @@ -92,7 +96,7 @@ func (c *ioClient) CreateEgress(ctx context.Context, info *livekit.EgressInfo) c errChan := make(chan error, 1) go func() { - _, err := c.IOInfoClient.CreateEgress(ctx, info, psrpc.WithRequestTimeout(createTimeout)) + _, err := c.IOInfoClient.CreateEgress(ctx, info, psrpc.WithRequestTimeout(c.createTimeout)) c.mu.Lock() defer c.mu.Unlock() @@ -153,7 +157,7 @@ func (c *ioClient) updateWorker() { func (c *ioClient) sendUpdate(u *update) { d := time.Millisecond * 250 for { - if _, err := c.IOInfoClient.UpdateEgress(u.ctx, u.info); err != nil { + if _, err := c.IOInfoClient.UpdateEgress(u.ctx, u.info, psrpc.WithRequestTimeout(c.updateTimeout)); err != nil { if errors.Is(err, psrpc.ErrRequestTimedOut) { if c.healthy.Swap(false) { logger.Infow("io connection unhealthy") diff --git a/test/integration_test.go b/test/integration_test.go index b98e37cb..92f74e4c 100644 --- a/test/integration_test.go +++ b/test/integration_test.go @@ -45,7 +45,7 @@ func TestEgress(t *testing.T) { require.NoError(t, err) bus := psrpc.NewRedisMessageBus(rc) - ioClient, err := info.NewIOClient(bus) + ioClient, err := info.NewIOClient(&r.ServiceConfig.BaseConfig, bus) require.NoError(t, err) svc, err := server.NewServer(r.ServiceConfig, bus, ioClient)