Skip to content

Commit

Permalink
move timeouts to config
Browse files Browse the repository at this point in the history
  • Loading branch information
frostbyte73 committed Nov 21, 2024
1 parent 90182fa commit 654dc90
Show file tree
Hide file tree
Showing 5 changed files with 29 additions and 12 deletions.
2 changes: 1 addition & 1 deletion cmd/server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ func runService(c *cli.Context) error {
}

bus := psrpc.NewRedisMessageBus(rc)
ioClient, err := info.NewIOClient(bus)
ioClient, err := info.NewIOClient(&conf.BaseConfig, bus)
if err != nil {
return err
}
Expand Down
2 changes: 2 additions & 0 deletions pkg/config/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ type BaseConfig struct {
EnableChromeSandbox bool `yaml:"enable_chrome_sandbox"` // enable Chrome sandbox, requires extra docker configuration
MaxUploadQueue int `yaml:"max_upload_queue"` // maximum upload queue size, in minutes
DisallowLocalStorage bool `yaml:"disallow_local_storage"` // require an upload config for all requests
IOCreateTimeout time.Duration `yaml:"io_create_timeout"` // timeout for CreateEgress calls
IOUpdateTimeout time.Duration `yaml:"io_update_timeout"` // timeout for UpdateEgress calls

SessionLimits `yaml:"session_limits"` // session duration limits
StorageConfig *StorageConfig `yaml:"storage,omitempty"` // storage config
Expand Down
11 changes: 11 additions & 0 deletions pkg/config/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package config
import (
"fmt"
"os"
"time"

"gopkg.in/yaml.v3"

Expand All @@ -39,6 +40,9 @@ const (

defaultTemplatePort = 7980
defaultTemplateBaseTemplate = "http://localhost:%d/"

defaultIOCreateTimeout = time.Second * 15
defaultIOUpdateTimeout = time.Second * 30
)

type ServiceConfig struct {
Expand Down Expand Up @@ -101,6 +105,13 @@ func (c *ServiceConfig) InitDefaults() {
c.TemplateBase = fmt.Sprintf(defaultTemplateBaseTemplate, c.TemplatePort)
}

if c.IOCreateTimeout == 0 {
c.IOCreateTimeout = defaultIOCreateTimeout
}
if c.IOUpdateTimeout == 0 {
c.IOUpdateTimeout = defaultIOUpdateTimeout
}

// Setting CPU costs from config. Ensure that CPU costs are positive
if c.RoomCompositeCpuCost <= 0 {
c.RoomCompositeCpuCost = roomCompositeCpuCost
Expand Down
24 changes: 14 additions & 10 deletions pkg/info/io.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/frostbyte73/core"
"go.uber.org/atomic"

"github.com/livekit/egress/pkg/config"
"github.com/livekit/egress/pkg/errors"
"github.com/livekit/protocol/egress"
"github.com/livekit/protocol/livekit"
Expand All @@ -32,9 +33,7 @@ import (
)

const (
createTimeout = time.Second * 10
ioTimeout = time.Second * 30
maxBackoff = time.Minute * 10
maxBackoff = time.Minute * 10
)

type IOClient interface {
Expand All @@ -48,6 +47,9 @@ type IOClient interface {
type ioClient struct {
rpc.IOInfoClient

createTimeout time.Duration
updateTimeout time.Duration

mu sync.Mutex
egresses map[string]*egressCreation
updates chan *update
Expand All @@ -66,16 +68,18 @@ type update struct {
info *livekit.EgressInfo
}

func NewIOClient(bus psrpc.MessageBus) (IOClient, error) {
client, err := rpc.NewIOInfoClient(bus, psrpc.WithClientTimeout(ioTimeout))
func NewIOClient(conf *config.BaseConfig, bus psrpc.MessageBus) (IOClient, error) {
client, err := rpc.NewIOInfoClient(bus)
if err != nil {
return nil, err
}

c := &ioClient{
IOInfoClient: client,
egresses: make(map[string]*egressCreation),
updates: make(chan *update, 1000),
IOInfoClient: client,
createTimeout: conf.IOCreateTimeout,
updateTimeout: conf.IOUpdateTimeout,
egresses: make(map[string]*egressCreation),
updates: make(chan *update, 1000),
}
c.healthy.Store(true)
go c.updateWorker()
Expand All @@ -92,7 +96,7 @@ func (c *ioClient) CreateEgress(ctx context.Context, info *livekit.EgressInfo) c

errChan := make(chan error, 1)
go func() {
_, err := c.IOInfoClient.CreateEgress(ctx, info, psrpc.WithRequestTimeout(createTimeout))
_, err := c.IOInfoClient.CreateEgress(ctx, info, psrpc.WithRequestTimeout(c.createTimeout))

c.mu.Lock()
defer c.mu.Unlock()
Expand Down Expand Up @@ -153,7 +157,7 @@ func (c *ioClient) updateWorker() {
func (c *ioClient) sendUpdate(u *update) {
d := time.Millisecond * 250
for {
if _, err := c.IOInfoClient.UpdateEgress(u.ctx, u.info); err != nil {
if _, err := c.IOInfoClient.UpdateEgress(u.ctx, u.info, psrpc.WithRequestTimeout(c.updateTimeout)); err != nil {
if errors.Is(err, psrpc.ErrRequestTimedOut) {
if c.healthy.Swap(false) {
logger.Infow("io connection unhealthy")
Expand Down
2 changes: 1 addition & 1 deletion test/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func TestEgress(t *testing.T) {
require.NoError(t, err)
bus := psrpc.NewRedisMessageBus(rc)

ioClient, err := info.NewIOClient(bus)
ioClient, err := info.NewIOClient(&r.ServiceConfig.BaseConfig, bus)
require.NoError(t, err)

svc, err := server.NewServer(r.ServiceConfig, bus, ioClient)
Expand Down

0 comments on commit 654dc90

Please sign in to comment.