Skip to content

Commit

Permalink
Rework restarting players to include backoff
Browse files Browse the repository at this point in the history
Also split logging into 2 levels - INFO and ERROR
  • Loading branch information
stephenafamo committed Mar 14, 2024
1 parent e9e8555 commit bf10506
Show file tree
Hide file tree
Showing 6 changed files with 61 additions and 36 deletions.
42 changes: 28 additions & 14 deletions conductor.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import (
"log/slog"
"sync"
"time"

"github.com/cenkalti/backoff/v4"
)

const defaultTimeout time.Duration = 9 * time.Second
Expand Down Expand Up @@ -35,6 +37,11 @@ func (c *Conductor) playWithLogger(ctx context.Context, logger Logger) error {
c.playing = map[string]struct{}{}
}

// Only use the provided logger if the conductor's logger is nil
if c.Logger != nil {
logger = c.Logger
}

var wg sync.WaitGroup
var lock sync.RWMutex

Expand Down Expand Up @@ -65,7 +72,7 @@ func (c *Conductor) playWithLogger(ctx context.Context, logger Logger) error {

wg.Add(len(c.Players))
for name, p := range c.Players {
go c.conductPlayer(ctxWthCancel, &wg, &lock, errs, name, p, logger)
go c.conductPlayer(ctxWthCancel, &wg, &lock, errs, name, p, logger.WithGroup(name))
}

// Wait for all the players to be done in another goroutine
Expand All @@ -78,10 +85,10 @@ func (c *Conductor) playWithLogger(ctx context.Context, logger Logger) error {
case err := <-errs:
return fmt.Errorf("error occured in a player: %w", err)
case <-timedCtx.Done():
logger.Log("conductor stopped after timeout")
logger.Info("conductor stopped after timeout")
return c.getTimeoutError(&lock)
case <-allDone:
logger.Log("conductor exited sucessfully")
logger.Info("conductor exited sucessfully")
return nil
}
}
Expand All @@ -99,22 +106,29 @@ func (c *Conductor) conductPlayer(ctx context.Context, wg *sync.WaitGroup, lock
c.playing[name] = struct{}{}
lock.Unlock()

l.Log("starting player", slog.String("name", name))
l.Info("starting player", slog.String("name", name))

err := ErrRestart
for errors.Is(err, ErrRestart) {
if c, ok := p.(*Conductor); ok {
err = c.playWithLogger(ctx, subConductorLogger{name: name, l: l})
} else {
err = p.Play(ctx)
}
var bkoff backoff.BackOff = &backoff.StopBackOff{}
if p, ok := p.(PlayerWithBackoff); ok {
bkoff = p.Backoff()
}

if err != nil {
l.Log("error in player", slog.String("name", name))
bkoff = backoff.WithContext(bkoff, ctx)

err := backoff.RetryNotify(func() error {
if c, ok := p.(*Conductor); ok {
return c.playWithLogger(ctx, l)
}
return p.Play(ctx)
}, bkoff, func(err error, d time.Duration) {
l.Error("player failed", slog.Any("err", err), slog.Duration("backoff", d))
})
if err != nil && !errors.Is(err, context.Canceled) {
l.Error("player error", slog.Any("err", err))
errs <- InstrumentError{name, err}
}
l.Log("stopped player", slog.String("name", name))

l.Info("player stopped")
}

lock.Lock()
Expand Down
4 changes: 0 additions & 4 deletions errors.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,10 @@
package orchestra

import (
"errors"
"fmt"
"strings"
)

// Return this error from a player to signal the conductor to restart it
var ErrRestart = errors.New("restart")

// InstrumentError is an error that happens in an instrument started by a conductor
// It carries the name of the instrument
type InstrumentError struct {
Expand Down
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
module github.com/stephenafamo/orchestra

go 1.21

require github.com/cenkalti/backoff/v4 v4.2.1
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
github.com/cenkalti/backoff/v4 v4.2.1 h1:y4OZtCnogmCPw98Zjyt5a6+QwPLGkiQsYW5oUqylYbM=
github.com/cenkalti/backoff/v4 v4.2.1/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE=
11 changes: 11 additions & 0 deletions player.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,24 @@ import (
"context"
"os"
"os/signal"

"github.com/cenkalti/backoff/v4"
)

// Player is a long running background worker
type Player interface {
Play(context.Context) error
}

// PlayerWithBackoff is a player that can be restarted with a backoff strategy
type PlayerWithBackoff interface {
Player
// A backoff strategy to use when the player fails but returns ErrRestart
// NOTE: This is only called once before the player is started, so it should be
// idempotent
Backoff() backoff.BackOff
}

// PlayUntilSignal starts the player and stops when it receives os.Signals
func PlayUntilSignal(ctx context.Context, p Player, sig ...os.Signal) error {
ctx, cancel := signal.NotifyContext(ctx, sig...)
Expand Down
36 changes: 18 additions & 18 deletions types.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,39 +7,39 @@ import (

// Logger is accepted by some Players ([Conductor], [ServerPlayer])
type Logger interface {
Log(msg string, attrs ...slog.Attr)
Info(msg string, attrs ...slog.Attr)
Error(msg string, attrs ...slog.Attr)
WithGroup(name string) Logger
}

var _ slogInterface = &slog.Logger{}

type slogInterface interface {
LogAttrs(ctx context.Context, level slog.Level, msg string, attrs ...slog.Attr)
WithGroup(name string) *slog.Logger
}

func LoggerFromSlog(level slog.Level, l slogInterface) Logger {
return slogLogger{level, l}
func LoggerFromSlog(infoLevel, errorLevel slog.Level, l slogInterface) Logger {
return slogLogger{infoLevel, errorLevel, l}
}

// DefaultLogger is used when a conductor's logger is nil
var DefaultLogger Logger = LoggerFromSlog(slog.LevelInfo, slog.Default())
var DefaultLogger Logger = LoggerFromSlog(slog.LevelInfo, slog.LevelError, slog.Default())

type slogLogger struct {
lvl slog.Level
logger slogInterface
lvlInfo slog.Level
lvlError slog.Level
logger slogInterface
}

func (d slogLogger) Log(msg string, attrs ...slog.Attr) {
d.logger.LogAttrs(context.Background(), d.lvl, msg, attrs...)
func (d slogLogger) Info(msg string, attrs ...slog.Attr) {
d.logger.LogAttrs(context.Background(), d.lvlInfo, msg, attrs...)
}

type subConductorLogger struct {
name string
l Logger
func (d slogLogger) Error(msg string, attrs ...slog.Attr) {
d.logger.LogAttrs(context.Background(), d.lvlError, msg, attrs...)
}

func (s subConductorLogger) Log(msg string, attrs ...slog.Attr) {
l := s.l
if s.l == nil {
l = DefaultLogger
}

l.Log(msg, append([]slog.Attr{slog.String("conductor", s.name)}, attrs...)...)
func (d slogLogger) WithGroup(name string) Logger {
return slogLogger{d.lvlInfo, d.lvlError, d.logger.WithGroup(name)}
}

0 comments on commit bf10506

Please sign in to comment.