Skip to content

Commit

Permalink
make sleep interruptible
Browse files Browse the repository at this point in the history
  • Loading branch information
jkmw committed Aug 9, 2023
1 parent aa70ae4 commit bfee0e4
Show file tree
Hide file tree
Showing 5 changed files with 43 additions and 8 deletions.
4 changes: 4 additions & 0 deletions pkg/proc/basejob.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,10 @@ func (job *baseJob) Signal(sig os.Signal) {
)
}

// Reset discards the job's current phase by overwriting it with a
// zero-value JobPhase.
func (job *baseJob) Reset() {
	var initial JobPhase
	job.phase = initial
}

// MarkForRestart sets the job's restart flag to true. It does nothing else:
// no signal is sent and no context is cancelled here.
func (job *baseJob) MarkForRestart() {
job.restart = true
}
Expand Down
30 changes: 28 additions & 2 deletions pkg/proc/job_common.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,11 @@ func (job *CommonJob) Run(ctx context.Context, _ chan<- error) error {
}()

for { // restart failed jobs as long mittnite is running
if job.stop {
return nil
}

job.ctx, job.interrupt = context.WithCancel(context.Background())
err := job.startOnce(ctx, p)
switch err {
case nil:
Expand All @@ -98,7 +103,7 @@ func (job *CommonJob) Run(ctx context.Context, _ chan<- error) error {
if maxAttempts == -1 || attempts < maxAttempts {
currBackOff := backOff
backOffRetries++
if backOffRetries > maxBackOffRetries {
if backOffRetries >= maxBackOffRetries {
backOff = calculateNextBackOff(currBackOff, maxBackOff)
backOffRetries = 0
}
Expand All @@ -110,7 +115,7 @@ func (job *CommonJob) Run(ctx context.Context, _ chan<- error) error {
WithField("job.nextRestartIn", currBackOff.String()).
Info("remaining attempts")

time.Sleep(currBackOff)
job.crashLoopSleep(currBackOff)
continue
}

Expand Down Expand Up @@ -202,11 +207,13 @@ func (job *CommonJob) IsRunning() bool {
// Restart flags the job for restart, calls SignalAll with SIGTERM and then
// cancels the job's current context so that a pending crashLoopSleep returns
// immediately instead of waiting out the full back-off.
func (job *CommonJob) Restart() {
	job.restart = true
	job.SignalAll(syscall.SIGTERM)

	// interrupt is only assigned once Run has created the job context;
	// calling a nil CancelFunc would panic, so skip it for jobs that have
	// not entered Run yet.
	if job.interrupt != nil {
		job.interrupt()
	}
}

// Stop flags the job as stopped, calls SignalAll with SIGTERM and then
// cancels the job's current context so that a pending crashLoopSleep returns
// immediately and the Run loop can observe the stop flag.
func (job *CommonJob) Stop() {
	job.stop = true
	job.SignalAll(syscall.SIGTERM)

	// interrupt is only assigned once Run has created the job context;
	// calling a nil CancelFunc would panic, so skip it for jobs that have
	// not entered Run yet.
	if job.interrupt != nil {
		job.interrupt()
	}
}

func (job *CommonJob) Status() *CommonJobStatus {
Expand Down Expand Up @@ -240,3 +247,22 @@ func (job *CommonJob) executeWatchCommand(watchCmd *config.WatchCommand) error {
Info("executing watch command")
return cmd.Run()
}

// crashLoopSleep pauses the caller for the given back-off duration. It
// returns as soon as either the duration has elapsed or job.ctx is
// cancelled, whichever happens first.
//
// A stoppable timer is used rather than a helper goroutine feeding an
// unbuffered channel: with the goroutine approach, an early cancellation
// left the goroutine blocked forever on a send that nobody received.
func (job *CommonJob) crashLoopSleep(duration time.Duration) {
	timer := time.NewTimer(duration)
	// Release the timer promptly when the sleep is interrupted early.
	defer timer.Stop()

	select {
	case <-timer.C:
	case <-job.ctx.Done():
	}
}
9 changes: 5 additions & 4 deletions pkg/proc/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ func (r *Runner) Init() error {

// exec walks r.jobs by index and hands every entry to startJob.
//
// NOTE(review): two startJob calls appear in the loop body, one without and
// one with a JobPhaseReasonUnknown argument. This looks like the before/after
// pair of a diff whose markers were lost — confirm which line belongs here.
func (r *Runner) exec() {
for i := range r.jobs {
r.startJob(r.jobs[i])
r.startJob(r.jobs[i], JobPhaseReasonUnknown)
}
}

Expand All @@ -155,7 +155,7 @@ func (r *Runner) jobExistsAndIsControllable(job *CommonJob) bool {

// addAndStartJob passes the job to addJobIfNotExists and then to startJob.
// In the two-argument call the job's own current phase reason is passed on
// as the initial phase.
//
// NOTE(review): two startJob calls appear below, one without and one with
// the phase argument. This looks like the before/after pair of a diff whose
// markers were lost — confirm which line belongs here.
func (r *Runner) addAndStartJob(job Job) {
r.addJobIfNotExists(job)
r.startJob(job)
r.startJob(job, job.GetPhase().Reason)
}

func (r *Runner) addJobIfNotExists(job Job) {
Expand All @@ -167,9 +167,10 @@ func (r *Runner) addJobIfNotExists(job Job) {
r.jobs = append(r.jobs, job)
}

func (r *Runner) startJob(job Job) {
func (r *Runner) startJob(job Job, initialPhase JobPhaseReason) {
job.GetPhase().Set(initialPhase)
phase := job.GetPhase()
if phase.Is(JobPhaseReasonStopped) || phase.Is(JobPhaseReasonFailed) {
if phase.Is(JobPhaseReasonStopped) || phase.Is(JobPhaseReasonFailed) || phase.Is(JobPhaseReasonCrashLooping) {
return
}

Expand Down
4 changes: 2 additions & 2 deletions pkg/proc/runner_api_v1.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,15 +63,15 @@ func (r *Runner) apiV1JobMiddleware(next http.Handler) http.Handler {
// apiV1StartJob is an HTTP handler that starts the job attached to the
// request context, unless job.IsRunning() already reports true. It always
// answers with 200 OK, whether or not a start was triggered.
//
// The job is read from the request context under contextKeyJob with an
// unchecked type assertion, so the handler panics if that value is not a
// *CommonJob (presumably stored by apiV1JobMiddleware — verify).
//
// NOTE(review): two startJob calls appear below, one without and one with a
// JobPhaseReasonUnknown argument. This looks like the before/after pair of a
// diff whose markers were lost — confirm which line belongs here.
func (r *Runner) apiV1StartJob(writer http.ResponseWriter, req *http.Request) {
job := req.Context().Value(contextKeyJob).(*CommonJob)
if !job.IsRunning() {
r.startJob(job)
r.startJob(job, JobPhaseReasonUnknown)
}
writer.WriteHeader(http.StatusOK)
}

func (r *Runner) apiV1RestartJob(writer http.ResponseWriter, req *http.Request) {
job := req.Context().Value(contextKeyJob).(*CommonJob)
if !job.IsRunning() {
r.startJob(job)
r.startJob(job, JobPhaseReasonUnknown)
} else {
job.Restart()
}
Expand Down
4 changes: 4 additions & 0 deletions pkg/proc/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,9 @@ func NewApi(listenAddress string) *Api {
type baseJob struct {
Config *config.BaseJobConfig

ctx context.Context
interrupt context.CancelFunc

cmd *exec.Cmd
restart bool
stop bool
Expand Down Expand Up @@ -97,6 +100,7 @@ type Job interface {
Init()
Run(context.Context, chan<- error) error
Watch()
Reset()

GetPhase() *JobPhase
GetName() string
Expand Down

0 comments on commit bfee0e4

Please sign in to comment.