Skip to content

Commit

Permalink
Scheduler: resource waste metric (#3347)
Browse files Browse the repository at this point in the history
* wip

Signed-off-by: mohamed <mohamedabdelfatah2027@gmail.com>

* update scheduler jobRun with repoRun's timestamps

Signed-off-by: Mohamed Abdelfatah <mohamedabdelfatah2027@gmail.com>

* add waste metrics for scheduler

Signed-off-by: Mohamed Abdelfatah <mohamedabdelfatah2027@gmail.com>

* fix test

Signed-off-by: Mohamed Abdelfatah <mohamedabdelfatah2027@gmail.com>

* lint

Signed-off-by: Mohamed Abdelfatah <mohamedabdelfatah2027@gmail.com>

* rework metrics and scheduler job_run

Signed-off-by: Mohamed Abdelfatah <mohamedabdelfatah2027@gmail.com>

* fix test

Signed-off-by: Mohamed Abdelfatah <mohamedabdelfatah2027@gmail.com>

* lint

Signed-off-by: Mohamed Abdelfatah <mohamedabdelfatah2027@gmail.com>

* add withPending

Signed-off-by: Mohamed Abdelfatah <mohamedabdelfatah2027@gmail.com>

* switch to priorState by closest timestamp

Signed-off-by: Mohamed Abdelfatah <mohamedabdelfatah2027@gmail.com>

* add leased

Signed-off-by: Mohamed Abdelfatah <mohamedabdelfatah2027@gmail.com>

* fix typo

Signed-off-by: Mohamed Abdelfatah <mohamedabdelfatah2027@gmail.com>

* update diff value

Signed-off-by: Mohamed Abdelfatah <mohamedabdelfatah2027@gmail.com>

* revert to multi arguments

Signed-off-by: Mohamed Abdelfatah <mohamedabdelfatah2027@gmail.com>

* test fix

Signed-off-by: Mohamed Abdelfatah <mohamedabdelfatah2027@gmail.com>

* add queued

Signed-off-by: Mohamed Abdelfatah <mohamedabdelfatah2027@gmail.com>

* fix queued time

Signed-off-by: Mohamed Abdelfatah <mohamedabdelfatah2027@gmail.com>

* duration fix

Signed-off-by: Mohamed Abdelfatah <mohamedabdelfatah2027@gmail.com>

* add missing time stamps

Signed-off-by: Mohamed Abdelfatah <mohamedabdelfatah2027@gmail.com>

* resolve conflict

Signed-off-by: Mohamed Abdelfatah <mohamedabdelfatah2027@gmail.com>

---------

Signed-off-by: mohamed <mohamedabdelfatah2027@gmail.com>
Signed-off-by: Mohamed Abdelfatah <mohamedabdelfatah2027@gmail.com>
Co-authored-by: Albin Severinson <albin@severinson.org>
  • Loading branch information
Mo-Fatah and severinson authored Feb 8, 2024
1 parent 4d64d30 commit b552605
Show file tree
Hide file tree
Showing 7 changed files with 406 additions and 31 deletions.
8 changes: 8 additions & 0 deletions internal/scheduler/jobdb/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -629,6 +629,14 @@ func (job *Job) WithNewRun(executor string, nodeId, nodeName string, scheduledAt
false,
false,
false,
false,
nil,
nil,
nil,
nil,
nil,
false,
false,
))
}

Expand Down
117 changes: 116 additions & 1 deletion internal/scheduler/jobdb/job_run.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package jobdb

import (
"fmt"
"time"

"github.com/google/uuid"
"github.com/hashicorp/go-multierror"
Expand Down Expand Up @@ -29,14 +30,30 @@ type JobRun struct {
nodeName string
// Priority class priority that this job was scheduled at.
scheduledAtPriority *int32
// True if the job has been reported as running by the executor.
// True if the run has been reported as pending by the executor.
pending bool
// The time at which the run was reported as pending by the executor.
pendingTime *time.Time
// True if the run has been leased to an executor.
leased bool
// The time at which the run was leased to it's current executor.
leaseTime *time.Time
// True if the run has been reported as running by the executor.
running bool
// The time at which the run was reported as running by the executor.
runningTime *time.Time
// True if the run has been reported as preempted by the executor.
preempted bool
// The time at which the run was reported as preempted by the executor.
preemptedTime *time.Time
// True if the job has been reported as succeeded by the executor.
succeeded bool
// True if the job has been reported as failed by the executor.
failed bool
// True if the job has been reported as cancelled by the executor.
cancelled bool
// The time at which the job was reported as cancelled, failed or succeeded by the executor.
terminatedTime *time.Time
// True if the job has been returned by the executor.
returned bool
// True if the job has been returned and the job was given a chance to run.
Expand Down Expand Up @@ -199,10 +216,18 @@ func (jobDb *JobDb) CreateRun(
nodeId string,
nodeName string,
scheduledAtPriority *int32,
leased bool,
pending bool,
running bool,
preempted bool,
succeeded bool,
failed bool,
cancelled bool,
leaseTime *time.Time,
pendingTime *time.Time,
runningTime *time.Time,
preemptedTime *time.Time,
terminatedTime *time.Time,
returned bool,
runAttempted bool,
) *JobRun {
Expand All @@ -214,10 +239,18 @@ func (jobDb *JobDb) CreateRun(
nodeId: jobDb.stringInterner.Intern(nodeId),
nodeName: jobDb.stringInterner.Intern(nodeName),
scheduledAtPriority: scheduledAtPriority,
leased: leased,
pending: pending,
running: running,
preempted: preempted,
succeeded: succeeded,
failed: failed,
cancelled: cancelled,
leaseTime: leaseTime,
pendingTime: pendingTime,
runningTime: runningTime,
preemptedTime: preemptedTime,
terminatedTime: terminatedTime,
returned: returned,
runAttempted: runAttempted,
}
Expand Down Expand Up @@ -288,18 +321,96 @@ func (run *JobRun) WithCancelled(cancelled bool) *JobRun {
return run
}

func (run *JobRun) WithTerminatedTime(terminatedTime *time.Time) *JobRun {
run = run.DeepCopy()
run.terminatedTime = terminatedTime
return run
}

func (run *JobRun) Pending() bool {
return run.pending
}

func (run *JobRun) WithPending(pending bool) *JobRun {
run = run.DeepCopy()
run.pending = pending
return run
}

func (run *JobRun) PendingTime() *time.Time {
return run.pendingTime
}

func (run *JobRun) WithPendingTime(pendingTime *time.Time) *JobRun {
run = run.DeepCopy()
run.pendingTime = pendingTime
return run
}

func (run *JobRun) Leased() bool {
return run.leased
}

func (run *JobRun) LeaseTime() *time.Time {
return run.leaseTime
}

func (run *JobRun) WithLeased(leased bool) *JobRun {
run = run.DeepCopy()
run.leased = leased
return run
}

func (run *JobRun) WithLeasedTime(leaseTime *time.Time) *JobRun {
run = run.DeepCopy()
run.leaseTime = leaseTime
return run
}

// Running Returns true if the executor has reported the job run as running
func (run *JobRun) Running() bool {
return run.running
}

func (run *JobRun) RunningTime() *time.Time {
return run.runningTime
}

// WithRunning returns a copy of the job run with the running status updated.
func (run *JobRun) WithRunning(running bool) *JobRun {
run = run.DeepCopy()
run.running = running
return run
}

func (run *JobRun) WithRunningTime(runningTime *time.Time) *JobRun {
run = run.DeepCopy()
run.runningTime = runningTime
return run
}

// Preempted Returns true if the executor has reported the job run as preempted
func (run *JobRun) Preempted() bool {
return run.preempted
}

func (run *JobRun) PreemptedTime() *time.Time {
return run.preemptedTime
}

// WithRunning returns a copy of the job run with the running status updated.
func (run *JobRun) WithPreempted(preempted bool) *JobRun {
run = run.DeepCopy()
run.preempted = preempted
return run
}

func (run *JobRun) WithPreemptedTime(preemptedTime *time.Time) *JobRun {
run = run.DeepCopy()
run.preemptedTime = preemptedTime
return run
}

// Returned Returns true if the executor has returned the job run.
func (run *JobRun) Returned() bool {
return run.returned
Expand All @@ -311,6 +422,10 @@ func (run *JobRun) WithReturned(returned bool) *JobRun {
return run
}

func (run *JobRun) TerminatedTime() *time.Time {
return run.terminatedTime
}

// RunAttempted Returns true if the executor has attempted to run the job.
func (run *JobRun) RunAttempted() bool {
return run.runAttempted
Expand Down
28 changes: 26 additions & 2 deletions internal/scheduler/jobdb/job_run_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,14 @@ var baseJobRun = jobDb.CreateRun(
false,
false,
false,
false,
nil,
nil,
nil,
nil,
nil,
false,
false,
)

// Test methods that only have getters
Expand Down Expand Up @@ -103,30 +111,46 @@ func TestDeepCopy(t *testing.T) {
"job id",
1,
"executor",
"nodeId",
"nodeName",
"nodeId",
&scheduledAtPriority,
true,
true,
true,
true,
true,
true,
true,
nil,
nil,
nil,
nil,
nil,
true,
true,
)
expected := jobDb.CreateRun(
run.id,
"job id",
1,
"executor",
"nodeId",
"nodeName",
"nodeId",
&scheduledAtPriority,
true,
true,
true,
true,
true,
true,
true,
nil,
nil,
nil,
nil,
nil,
true,
true,
)
actual := run.DeepCopy()
run.nodeId = "new nodeId"
Expand Down
39 changes: 30 additions & 9 deletions internal/scheduler/jobdb/reconciliation.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ type JobStateTransitions struct {
Job *Job

Queued bool
Scheduled bool
Leased bool
Pending bool
Running bool
Cancelled bool
Expand All @@ -31,7 +31,7 @@ type JobStateTransitions struct {
// applyRunStateTransitions applies the state transitions of a run to that of the associated job.
func (jst JobStateTransitions) applyRunStateTransitions(rst RunStateTransitions) JobStateTransitions {
jst.Queued = jst.Queued || rst.Returned
jst.Scheduled = jst.Scheduled || rst.Scheduled
jst.Leased = jst.Leased || rst.Leased
jst.Pending = jst.Pending || rst.Pending
jst.Running = jst.Running || rst.Running
jst.Cancelled = jst.Cancelled || rst.Cancelled
Expand All @@ -46,7 +46,7 @@ func (jst JobStateTransitions) applyRunStateTransitions(rst RunStateTransitions)
type RunStateTransitions struct {
JobRun *JobRun

Scheduled bool
Leased bool
Returned bool
Pending bool
Running bool
Expand Down Expand Up @@ -153,38 +153,51 @@ func (jobDb *JobDb) reconcileJobDifferences(job *Job, jobRepoJob *database.Job,
return
}

// TODO(albin): Preempted is not supported.
func (jobDb *JobDb) reconcileRunDifferences(jobRun *JobRun, jobRepoRun *database.Run) (rst RunStateTransitions) {
defer func() { rst.JobRun = jobRun }()
if jobRun == nil && jobRepoRun == nil {
return
} else if jobRun == nil && jobRepoRun != nil {
jobRun = jobDb.schedulerRunFromDatabaseRun(jobRepoRun)
rst.Returned = jobRepoRun.Returned
rst.Pending = jobRepoRun.PendingTimestamp != nil
rst.Pending = jobRepoRun.Pending
rst.Leased = jobRepoRun.LeasedTimestamp != nil
rst.Running = jobRepoRun.Running
rst.Preempted = jobRepoRun.Preempted
rst.Cancelled = jobRepoRun.Cancelled
rst.Failed = jobRepoRun.Failed
rst.Succeeded = jobRepoRun.Succeeded
} else if jobRun != nil && jobRepoRun == nil {
return
} else if jobRun != nil && jobRepoRun != nil {
if jobRepoRun.LeasedTimestamp != nil && !jobRun.Leased() {
jobRun = jobRun.WithLeased(true).WithLeasedTime(jobRepoRun.LeasedTimestamp)
rst.Leased = true
}
if jobRepoRun.Pending && !jobRun.Pending() {
jobRun = jobRun.WithPending(true).WithPendingTime(jobRepoRun.PendingTimestamp)
rst.Pending = true
}
if jobRepoRun.Running && !jobRun.Running() {
jobRun = jobRun.WithRunning(true)
jobRun = jobRun.WithRunning(true).WithRunningTime(jobRepoRun.RunningTimestamp)
rst.Running = true
}
if jobRepoRun.Succeeded && !jobRun.Succeeded() {
jobRun = jobRun.WithSucceeded(true).WithRunning(false)
jobRun = jobRun.WithSucceeded(true).WithRunning(false).WithTerminatedTime(jobRepoRun.TerminatedTimestamp)
rst.Succeeded = true
}
if jobRepoRun.Failed && !jobRun.Failed() {
jobRun = jobRun.WithFailed(true).WithRunning(false)
jobRun = jobRun.WithFailed(true).WithRunning(false).WithTerminatedTime(jobRepoRun.TerminatedTimestamp)
rst.Failed = true
}
if jobRepoRun.Cancelled && !jobRun.Cancelled() {
jobRun = jobRun.WithCancelled(true).WithRunning(false)
jobRun = jobRun.WithCancelled(true).WithRunning(false).WithTerminatedTime(jobRepoRun.TerminatedTimestamp)
rst.Cancelled = true
}
if jobRepoRun.Preempted && !jobRun.Preempted() {
jobRun = jobRun.WithPreempted(true).WithRunning(false).WithPreemptedTime(jobRepoRun.TerminatedTimestamp)
rst.Preempted = true
}
if jobRepoRun.Returned && !jobRun.Returned() {
jobRun = jobRun.WithReturned(true).WithRunning(false)
rst.Returned = true
Expand Down Expand Up @@ -241,10 +254,18 @@ func (jobDb *JobDb) schedulerRunFromDatabaseRun(dbRun *database.Run) *JobRun {
nodeId,
dbRun.Node,
dbRun.ScheduledAtPriority,
dbRun.LeasedTimestamp != nil,
dbRun.Pending,
dbRun.Running,
dbRun.Preempted,
dbRun.Succeeded,
dbRun.Failed,
dbRun.Cancelled,
dbRun.LeasedTimestamp,
dbRun.PendingTimestamp,
dbRun.RunningTimestamp,
dbRun.PreemptedTimestamp,
dbRun.TerminatedTimestamp,
dbRun.Returned,
dbRun.RunAttempted,
)
Expand Down
Loading

0 comments on commit b552605

Please sign in to comment.