Skip to content

Commit

Permalink
implement crashLoopBackOff for failed processes (#64)
Browse files Browse the repository at this point in the history
* implement crashLoopBackOff for failed processes

+ `maxAttempts = -1` will restart the job forever

* adjust comments

* make sleep interruptible

* remove obsolete retry magic

* set default `maxAttempts` to 0

* make maxAttempts an int-pointer to handle nil-values
  • Loading branch information
jkmw authored Aug 10, 2023
1 parent 6c3b516 commit 0257a36
Show file tree
Hide file tree
Showing 10 changed files with 105 additions and 17 deletions.
12 changes: 12 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,18 @@ job "foo" {
}
```
Setting `maxAttempts` to `-1` will restart the process "forever".
```hcl
job "foo" {
command = "/usr/local/bin/foo"
args = ["bar"]
maxAttempts = -1
canFail = false
workingDirectory = "/some/path"
}
```
You can append a custom environment to the process by setting `env`:
```hcl
Expand Down
2 changes: 1 addition & 1 deletion cmd/mittnitectl/job_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ var jobStatusCmd = cobra.Command{
lipgloss.Left,
styleStatusLeftColumn.Render("allowed to fail:"),
styleHighlight.Render(fmt.Sprintf("%t", resp.Body.Config.CanFail)),
styleStatusAddendum.Render("(max restart attempts: "), styleHighlight.Render(fmt.Sprintf("%d", resp.Body.Config.MaxAttempts)), ")",
styleStatusAddendum.Render("(max restart attempts: "), styleHighlight.Render(fmt.Sprintf("%d", resp.Body.Config.GetMaxAttempts())), ")",
),
lipgloss.JoinHorizontal(
lipgloss.Left,
Expand Down
2 changes: 1 addition & 1 deletion internal/config/ignitionconfig.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func (ignitionConfig *Ignition) GenerateFromConfigDir(configDir string) error {
}

for _, job := range ignitionConfig.Jobs {
if job.MaxAttempts_ != 0 {
if job.MaxAttempts_ != nil {
log.Warnf("field max_attempts in job %s is deprecated in favor of maxAttempts", job.Name)
job.MaxAttempts = job.MaxAttempts_
}
Expand Down
17 changes: 15 additions & 2 deletions internal/config/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,15 +109,28 @@ type JobConfig struct {
// optional fields for "normal" jobs
// these will be ignored if fields for lazy jobs are set
Watches []Watch `hcl:"watch" json:"watch"`
MaxAttempts_ int `hcl:"max_attempts" json:"-"` // deprecated
MaxAttempts int `hcl:"maxAttempts" json:"maxAttempts"`
MaxAttempts_ *int `hcl:"max_attempts" json:"-,omitempty"` // deprecated
MaxAttempts *int `hcl:"maxAttempts" json:"maxAttempts,omitempty"`
OneTime bool `hcl:"oneTime" json:"oneTime"`

// fields required for lazy activation
Laziness *Laziness `hcl:"lazy" json:"lazy"`
Listeners []Listener `hcl:"listen" json:"listen"`
}

// GetMaxAttempts resolves the effective maximum number of restart
// attempts for the job: 3 when no value is configured (nil pointer),
// -1 ("restart forever") for any negative configuration, otherwise
// the configured value itself.
func (jc *JobConfig) GetMaxAttempts() int {
	if jc.MaxAttempts == nil {
		// no value configured: fall back to the default of three attempts
		return 3
	}
	if attempts := *jc.MaxAttempts; attempts >= 0 {
		return attempts
	}
	// every negative configuration is normalized to -1 (unlimited restarts)
	return -1
}

type BootJobConfig struct {
BaseJobConfig `hcl:",squash"`

Expand Down
4 changes: 4 additions & 0 deletions pkg/proc/basejob.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,10 @@ func (job *baseJob) Signal(sig os.Signal) {
)
}

// Reset clears the job's phase back to its zero value.
// NOTE(review): presumably this is so a later start is not skipped by a
// stale stopped/failed/crash-looping phase (see Runner.startJob) — confirm.
func (job *baseJob) Reset() {
	job.phase = JobPhase{}
}

func (job *baseJob) MarkForRestart() {
job.restart = true
}
Expand Down
17 changes: 17 additions & 0 deletions pkg/proc/crashloopbackoff.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package proc

import (
"time"
)

// calculates the next backOff based on the current backOff
func calculateNextBackOff(currBackOff, maxBackoff time.Duration) time.Duration {
if currBackOff.Seconds() <= 1 {
return 2 * time.Second
}
next := time.Duration(2*currBackOff.Seconds()) * time.Second
if next.Seconds() > maxBackoff.Seconds() {
return maxBackoff
}
return next
}
51 changes: 44 additions & 7 deletions pkg/proc/job_common.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,11 @@ import (
log "github.com/sirupsen/logrus"
)

const (
// longest duration between two restarts
maxBackOff = 300 * time.Second
)

func (job *CommonJob) Init() {
job.restart = false
job.stop = false
Expand Down Expand Up @@ -45,12 +50,9 @@ func (job *CommonJob) Run(ctx context.Context, _ chan<- error) error {

l := log.WithField("job.name", job.Config.Name)

backOff := 1 * time.Second
attempts := 0
maxAttempts := job.Config.MaxAttempts

if maxAttempts <= 0 {
maxAttempts = 3
}
maxAttempts := job.Config.GetMaxAttempts()

p := make(chan *os.Process)
defer close(p)
Expand All @@ -62,6 +64,11 @@ func (job *CommonJob) Run(ctx context.Context, _ chan<- error) error {
}()

for { // restart failed jobs as long mittnite is running
if job.stop {
return nil
}

job.ctx, job.interrupt = context.WithCancel(context.Background())
err := job.startOnce(ctx, p)
switch err {
case nil:
Expand All @@ -81,9 +88,18 @@ func (job *CommonJob) Run(ctx context.Context, _ chan<- error) error {
}

attempts++
if attempts < maxAttempts {
if maxAttempts == -1 || attempts < maxAttempts {
currBackOff := backOff
backOff = calculateNextBackOff(currBackOff, maxBackOff)

job.phase.Set(JobPhaseReasonCrashLooping)
l.WithField("job.maxAttempts", maxAttempts).WithField("job.usedAttempts", attempts).Info("remaining attempts")
l.
WithField("job.maxAttempts", maxAttempts).
WithField("job.usedAttempts", attempts).
WithField("job.nextRestartIn", currBackOff.String()).
Info("remaining attempts")

job.crashLoopSleep(currBackOff)
continue
}

Expand Down Expand Up @@ -175,11 +191,13 @@ func (job *CommonJob) IsRunning() bool {
// Restart marks the job for a relaunch and asks the running process
// (group) to terminate with SIGTERM; it also cancels any pending
// crash-loop back-off sleep so the restart is not delayed.
func (job *CommonJob) Restart() {
	// the run loop reads this flag to distinguish a requested restart
	// from a crash
	job.restart = true
	job.SignalAll(syscall.SIGTERM)
	// interrupt is only assigned once Run has started its loop; guard
	// against a restart request arriving before the first iteration,
	// which would otherwise panic on a nil CancelFunc
	if job.interrupt != nil {
		job.interrupt()
	}
}

// Stop marks the job as stopped and asks the running process (group) to
// terminate with SIGTERM; it also cancels any pending crash-loop
// back-off sleep so the stop takes effect immediately.
func (job *CommonJob) Stop() {
	// the run loop reads this flag and exits instead of restarting
	job.stop = true
	job.SignalAll(syscall.SIGTERM)
	// interrupt is only assigned once Run has started its loop; guard
	// against a stop request arriving before the first iteration, which
	// would otherwise panic on a nil CancelFunc
	if job.interrupt != nil {
		job.interrupt()
	}
}

func (job *CommonJob) Status() *CommonJobStatus {
Expand Down Expand Up @@ -213,3 +231,22 @@ func (job *CommonJob) executeWatchCommand(watchCmd *config.WatchCommand) error {
Info("executing watch command")
return cmd.Run()
}

// crashLoopSleep blocks for the given back-off duration, returning early
// when the job's context is cancelled (via Stop or Restart calling
// job.interrupt).
//
// A timer in a single select replaces the previous goroutine-based
// implementation, which leaked: when the context fired first, the helper
// goroutine blocked forever on its unbuffered `timeout <- true` send
// (no receiver left), so its deferred close never ran.
func (job *CommonJob) crashLoopSleep(duration time.Duration) {
	timer := time.NewTimer(duration)
	defer timer.Stop()

	select {
	case <-timer.C:
	case <-job.ctx.Done():
	}
}
9 changes: 5 additions & 4 deletions pkg/proc/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ func (r *Runner) Init() error {

func (r *Runner) exec() {
for i := range r.jobs {
r.startJob(r.jobs[i])
r.startJob(r.jobs[i], JobPhaseReasonUnknown)
}
}

Expand All @@ -155,7 +155,7 @@ func (r *Runner) jobExistsAndIsControllable(job *CommonJob) bool {

func (r *Runner) addAndStartJob(job Job) {
r.addJobIfNotExists(job)
r.startJob(job)
r.startJob(job, job.GetPhase().Reason)
}

func (r *Runner) addJobIfNotExists(job Job) {
Expand All @@ -167,9 +167,10 @@ func (r *Runner) addJobIfNotExists(job Job) {
r.jobs = append(r.jobs, job)
}

func (r *Runner) startJob(job Job) {
func (r *Runner) startJob(job Job, initialPhase JobPhaseReason) {
job.GetPhase().Set(initialPhase)
phase := job.GetPhase()
if phase.Is(JobPhaseReasonStopped) || phase.Is(JobPhaseReasonFailed) {
if phase.Is(JobPhaseReasonStopped) || phase.Is(JobPhaseReasonFailed) || phase.Is(JobPhaseReasonCrashLooping) {
return
}

Expand Down
4 changes: 2 additions & 2 deletions pkg/proc/runner_api_v1.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,15 +63,15 @@ func (r *Runner) apiV1JobMiddleware(next http.Handler) http.Handler {
func (r *Runner) apiV1StartJob(writer http.ResponseWriter, req *http.Request) {
job := req.Context().Value(contextKeyJob).(*CommonJob)
if !job.IsRunning() {
r.startJob(job)
r.startJob(job, JobPhaseReasonUnknown)
}
writer.WriteHeader(http.StatusOK)
}

func (r *Runner) apiV1RestartJob(writer http.ResponseWriter, req *http.Request) {
job := req.Context().Value(contextKeyJob).(*CommonJob)
if !job.IsRunning() {
r.startJob(job)
r.startJob(job, JobPhaseReasonUnknown)
} else {
job.Restart()
}
Expand Down
4 changes: 4 additions & 0 deletions pkg/proc/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,9 @@ func NewApi(listenAddress string) *Api {
type baseJob struct {
Config *config.BaseJobConfig

ctx context.Context
interrupt context.CancelFunc

cmd *exec.Cmd
restart bool
stop bool
Expand Down Expand Up @@ -97,6 +100,7 @@ type Job interface {
Init()
Run(context.Context, chan<- error) error
Watch()
Reset()

GetPhase() *JobPhase
GetName() string
Expand Down

0 comments on commit 0257a36

Please sign in to comment.