Skip to content

Commit

Permalink
Also print queue and job scheduling reports with --queue and --job (
Browse files Browse the repository at this point in the history
  • Loading branch information
zuqq authored Jun 19, 2023
1 parent 5d79600 commit b52963a
Show file tree
Hide file tree
Showing 2 changed files with 154 additions and 107 deletions.
23 changes: 12 additions & 11 deletions internal/scheduler/context/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ func (sctx *SchedulingContext) ReportString(verbosity int32) string {
fmt.Fprint(w, "Scheduled queues:\n")
for queueName, qctx := range scheduled {
fmt.Fprintf(w, "\t%s:\n", queueName)
fmt.Fprintf(w, indent.String("\t\t", qctx.ReportString(verbosity-1)))
fmt.Fprintf(w, indent.String("\t\t", qctx.ReportString(verbosity-2)))
}
}
preempted := armadamaps.Filter(
Expand All @@ -182,7 +182,7 @@ func (sctx *SchedulingContext) ReportString(verbosity int32) string {
fmt.Fprint(w, "Preempted queues:\n")
for queueName, qctx := range preempted {
fmt.Fprintf(w, "\t%s:\n", queueName)
fmt.Fprintf(w, indent.String("\t\t", qctx.ReportString(verbosity-1)))
fmt.Fprintf(w, indent.String("\t\t", qctx.ReportString(verbosity-2)))
}
}
w.Flush()
Expand Down Expand Up @@ -341,28 +341,29 @@ func (qctx *QueueSchedulingContext) String() string {
return qctx.ReportString(0)
}

const maxPrintedJobIdsByReason = 1
const maxJobIdsToPrint = 1

func (qctx *QueueSchedulingContext) ReportString(verbosity int32) string {
var sb strings.Builder
w := tabwriter.NewWriter(&sb, 1, 1, 1, ' ', 0)
if verbosity > 0 {
fmt.Fprintf(w, "Created:\t%s\n", qctx.Created)
if verbosity >= 0 {
fmt.Fprintf(w, "Time:\t%s\n", qctx.Created)
fmt.Fprintf(w, "Queue:\t%s\n", qctx.Queue)
}
fmt.Fprintf(w, "Scheduled resources:\t%s\n", qctx.ScheduledResourcesByPriority.AggregateByResource().CompactString())
fmt.Fprintf(w, "Scheduled resources (by priority):\t%s\n", qctx.ScheduledResourcesByPriority.String())
fmt.Fprintf(w, "Preempted resources:\t%s\n", qctx.EvictedResourcesByPriority.AggregateByResource().CompactString())
fmt.Fprintf(w, "Preempted resources (by priority):\t%s\n", qctx.EvictedResourcesByPriority.String())
if verbosity > 0 {
if verbosity >= 0 {
fmt.Fprintf(w, "Total allocated resources after scheduling:\t%s\n", qctx.AllocatedByPriority.AggregateByResource().CompactString())
fmt.Fprintf(w, "Total allocated resources after scheduling (by priority):\t%s\n", qctx.AllocatedByPriority.String())
fmt.Fprintf(w, "Number of jobs scheduled:\t%d\n", len(qctx.SuccessfulJobSchedulingContexts))
fmt.Fprintf(w, "Number of jobs preempted:\t%d\n", len(qctx.EvictedJobsById))
fmt.Fprintf(w, "Number of jobs that could not be scheduled:\t%d\n", len(qctx.UnsuccessfulJobSchedulingContexts))
if len(qctx.SuccessfulJobSchedulingContexts) > 0 {
jobIdsToPrint := maps.Keys(qctx.SuccessfulJobSchedulingContexts)
if verbosity <= 1 && len(jobIdsToPrint) > maxPrintedJobIdsByReason {
jobIdsToPrint = jobIdsToPrint[0:maxPrintedJobIdsByReason]
if len(jobIdsToPrint) > maxJobIdsToPrint {
jobIdsToPrint = jobIdsToPrint[0:maxJobIdsToPrint]
}
fmt.Fprintf(w, "Scheduled jobs:\t%v", jobIdsToPrint)
if len(jobIdsToPrint) != len(qctx.SuccessfulJobSchedulingContexts) {
Expand All @@ -373,8 +374,8 @@ func (qctx *QueueSchedulingContext) ReportString(verbosity int32) string {
}
if len(qctx.EvictedJobsById) > 0 {
jobIdsToPrint := maps.Keys(qctx.EvictedJobsById)
if verbosity <= 1 && len(jobIdsToPrint) > maxPrintedJobIdsByReason {
jobIdsToPrint = jobIdsToPrint[0:maxPrintedJobIdsByReason]
if len(jobIdsToPrint) > maxJobIdsToPrint {
jobIdsToPrint = jobIdsToPrint[0:maxJobIdsToPrint]
}
fmt.Fprintf(w, "Preempted jobs:\t%v", jobIdsToPrint)
if len(jobIdsToPrint) != len(qctx.EvictedJobsById) {
Expand Down Expand Up @@ -571,7 +572,7 @@ func (jctx *JobSchedulingContext) String() string {
var sb strings.Builder
w := tabwriter.NewWriter(&sb, 1, 1, 1, ' ', 0)
fmt.Fprintf(w, "Time:\t%s\n", jctx.Created)
fmt.Fprintf(w, "Job id:\t%s\n", jctx.JobId)
fmt.Fprintf(w, "Job ID:\t%s\n", jctx.JobId)
fmt.Fprintf(w, "Number of nodes in cluster:\t%d\n", jctx.NumNodes)
if jctx.UnschedulableReason != "" {
fmt.Fprintf(w, "UnschedulableReason:\t%s\n", jctx.UnschedulableReason)
Expand Down
238 changes: 142 additions & 96 deletions internal/scheduler/reports.go
Original file line number Diff line number Diff line change
Expand Up @@ -269,49 +269,6 @@ func (repo *SchedulingContextRepository) addSchedulingContextForJobs(sctx *sched
return nil
}

func (repo *SchedulingContextRepository) getSchedulingReportStringForQueue(queue string, verbosity int32) string {
mostRecentByExecutor, _ := repo.GetMostRecentSchedulingContextByExecutorForQueue(queue)
mostRecentSuccessfulByExecutor, _ := repo.GetMostRecentSuccessfulSchedulingContextByExecutorForQueue(queue)
mostRecentPreemptingByExecutor, _ := repo.GetMostRecentPreemptingSchedulingContextByExecutorForQueue(queue)
sr := schedulingReport{
mostRecentByExecutor: mostRecentByExecutor,
mostRecentSuccessfulByExecutor: mostRecentSuccessfulByExecutor,
mostRecentPreemptingByExecutor: mostRecentPreemptingByExecutor,

sortedExecutorIds: repo.GetSortedExecutorIds(),
}
return sr.ReportString(verbosity)
}

func (repo *SchedulingContextRepository) getSchedulingReportStringForJob(jobId string, verbosity int32) string {
mostRecentByExecutor, _ := repo.GetMostRecentSchedulingContextByExecutorForJob(jobId)
var sb strings.Builder
w := tabwriter.NewWriter(&sb, 1, 1, 1, ' ', 0)
for _, executorId := range repo.GetSortedExecutorIds() {
fmt.Fprintf(w, "%s:\n", executorId)
sctx := mostRecentByExecutor[executorId]
if sctx != nil {
fmt.Fprint(w, indent.String("\t", "Most recent attempt:\n"))
fmt.Fprint(w, indent.String("\t\t", sctx.ReportString(verbosity)))
} else {
fmt.Fprint(w, indent.String("\t", "Most recent attempt: none\n"))
}
}
w.Flush()
return sb.String()
}

func (repo *SchedulingContextRepository) getSchedulingReportString(verbosity int32) string {
sr := schedulingReport{
mostRecentByExecutor: repo.GetMostRecentSchedulingContextByExecutor(),
mostRecentSuccessfulByExecutor: repo.GetMostRecentSuccessfulSchedulingContextByExecutor(),
mostRecentPreemptingByExecutor: repo.GetMostRecentPreemptingSchedulingContextByExecutor(),

sortedExecutorIds: repo.GetSortedExecutorIds(),
}
return sr.ReportString(verbosity)
}

// GetSchedulingReport is a gRPC endpoint for querying scheduler reports.
// TODO: Further separate this from internal contexts.
func (repo *SchedulingContextRepository) GetSchedulingReport(_ context.Context, request *schedulerobjects.SchedulingReportRequest) (*schedulerobjects.SchedulingReport, error) {
Expand All @@ -330,42 +287,147 @@ func (repo *SchedulingContextRepository) GetSchedulingReport(_ context.Context,
return &schedulerobjects.SchedulingReport{Report: report}, nil
}

type schedulingReport struct {
mostRecentByExecutor SchedulingContextByExecutor
mostRecentSuccessfulByExecutor SchedulingContextByExecutor
mostRecentPreemptingByExecutor SchedulingContextByExecutor

sortedExecutorIds []string
}

func (sr schedulingReport) ReportString(verbosity int32) string {
func (repo *SchedulingContextRepository) getSchedulingReportString(verbosity int32) string {
mostRecentByExecutor := repo.GetMostRecentSchedulingContextByExecutor()
mostRecentSuccessfulByExecutor := repo.GetMostRecentSuccessfulSchedulingContextByExecutor()
mostRecentPreemptingByExecutor := repo.GetMostRecentPreemptingSchedulingContextByExecutor()
var sb strings.Builder
w := tabwriter.NewWriter(&sb, 1, 1, 1, ' ', 0)
for _, executorId := range sr.sortedExecutorIds {
for _, executorId := range repo.GetSortedExecutorIds() {
fmt.Fprintf(w, "%s:\n", executorId)
if sctx := sr.mostRecentByExecutor[executorId]; sctx != nil {
fmt.Fprint(w, indent.String("\t", "Most recent attempt:\n"))
if sctx := mostRecentByExecutor[executorId]; sctx != nil {
fmt.Fprint(w, indent.String("\t", "Most recent scheduling round:\n"))
fmt.Fprint(w, indent.String("\t\t", sctx.ReportString(verbosity)))
} else {
fmt.Fprint(w, indent.String("\t", "Most recent attempt: none\n"))
fmt.Fprint(w, indent.String("\t", "Most recent scheduling round: none\n"))
}
if sctx := sr.mostRecentSuccessfulByExecutor[executorId]; sctx != nil {
fmt.Fprint(w, indent.String("\t", "Most recent successful attempt:\n"))
if sctx := mostRecentSuccessfulByExecutor[executorId]; sctx != nil {
fmt.Fprint(w, indent.String("\t", "Most recent scheduling round that scheduled a job:\n"))
fmt.Fprint(w, indent.String("\t\t", sctx.ReportString(verbosity)))
} else {
fmt.Fprint(w, indent.String("\t", "Most recent successful attempt: none\n"))
fmt.Fprint(w, indent.String("\t", "Most recent scheduling round that scheduled a job: none\n"))
}
if sctx := sr.mostRecentPreemptingByExecutor[executorId]; sctx != nil {
fmt.Fprint(w, indent.String("\t", "Most recent preempting attempt:\n"))
if sctx := mostRecentPreemptingByExecutor[executorId]; sctx != nil {
fmt.Fprint(w, indent.String("\t", "Most recent scheduling round that preempted a job:\n"))
fmt.Fprint(w, indent.String("\t\t", sctx.ReportString(verbosity)))
} else {
fmt.Fprint(w, indent.String("\t", "Most recent preempting attempt: none\n"))
fmt.Fprint(w, indent.String("\t", "Most recent scheduling round that preempted a job: none\n"))
}
}
w.Flush()
return sb.String()
}

func (repo *SchedulingContextRepository) getSchedulingReportStringForQueue(queue string, verbosity int32) string {
mostRecentByExecutor, _ := repo.GetMostRecentSchedulingContextByExecutorForQueue(queue)
mostRecentSuccessfulByExecutor, _ := repo.GetMostRecentSuccessfulSchedulingContextByExecutorForQueue(queue)
mostRecentPreemptingByExecutor, _ := repo.GetMostRecentPreemptingSchedulingContextByExecutorForQueue(queue)
var sb strings.Builder
w := tabwriter.NewWriter(&sb, 1, 1, 1, ' ', 0)
for _, executorId := range repo.GetSortedExecutorIds() {
fmt.Fprintf(w, "%s:\n", executorId)
if sctx := mostRecentByExecutor[executorId]; sctx != nil {
fmt.Fprintf(w, "\tMost recent scheduling round that considered queue %s:\n", queue)
sr := getSchedulingReportForQueue(sctx, queue)
fmt.Fprint(w, indent.String("\t\t", sr.ReportString(verbosity)))
} else {
fmt.Fprintf(w, "\tMost recent scheduling round that considered queue %s: none\n", queue)
}
if sctx := mostRecentSuccessfulByExecutor[executorId]; sctx != nil {
fmt.Fprintf(w, "\tMost recent scheduling round that scheduled a job from queue %s:\n", queue)
sr := getSchedulingReportForQueue(sctx, queue)
fmt.Fprint(w, indent.String("\t\t", sr.ReportString(verbosity)))
} else {
fmt.Fprintf(w, "\tMost recent scheduling round that scheduled a job from queue %s: none\n", queue)
}
if sctx := mostRecentPreemptingByExecutor[executorId]; sctx != nil {
fmt.Fprintf(w, "\tMost recent scheduling round that preempted a job from queue %s:\n", queue)
sr := getSchedulingReportForQueue(sctx, queue)
fmt.Fprint(w, indent.String("\t\t", sr.ReportString(verbosity)))
} else {
fmt.Fprintf(w, "\tMost recent scheduling round that preempted a job from queue %s: none\n", queue)
}
}
w.Flush()
return sb.String()
}

func (repo *SchedulingContextRepository) getSchedulingReportStringForJob(jobId string, verbosity int32) string {
mostRecentByExecutor, _ := repo.GetMostRecentSchedulingContextByExecutorForJob(jobId)
var sb strings.Builder
w := tabwriter.NewWriter(&sb, 1, 1, 1, ' ', 0)
for _, executorId := range repo.GetSortedExecutorIds() {
fmt.Fprintf(w, "%s:\n", executorId)
if sctx := mostRecentByExecutor[executorId]; sctx != nil {
fmt.Fprintf(w, "\tMost recent scheduling round that affected job %s:\n", jobId)
sr := getSchedulingReportForJob(sctx, jobId)
fmt.Fprint(w, indent.String("\t\t", sr.ReportString(verbosity)))
} else {
fmt.Fprintf(w, "\tMost recent scheduling round that affected job %s: none\n", jobId)
}
}
w.Flush()
return sb.String()
}

type schedulingReport struct {
schedulingContext *schedulercontext.SchedulingContext
queueSchedulingContext *schedulercontext.QueueSchedulingContext
jobSchedulingContext *schedulercontext.JobSchedulingContext
}

func (sr schedulingReport) ReportString(verbosity int32) string {
var sb strings.Builder
w := tabwriter.NewWriter(&sb, 1, 1, 1, ' ', 0)
if sctx := sr.schedulingContext; sctx != nil {
fmt.Fprint(w, "Overall scheduling report:\n")
fmt.Fprint(w, indent.String("\t", sctx.ReportString(verbosity)))
}
if qctx := sr.queueSchedulingContext; qctx != nil {
fmt.Fprintf(w, "Scheduling report for queue %s:\n", qctx.Queue)
fmt.Fprint(w, indent.String("\t", qctx.ReportString(verbosity)))
}
if jctx := sr.jobSchedulingContext; jctx != nil {
fmt.Fprintf(w, "Scheduling report for job %s:\n", jctx.JobId)
fmt.Fprint(w, indent.String("\t", jctx.String()))
}
w.Flush()
return sb.String()
}

func getSchedulingReportForQueue(sctx *schedulercontext.SchedulingContext, queue string) (sr schedulingReport) {
sr.schedulingContext = sctx
if sctx == nil {
return
}
sr.queueSchedulingContext = sctx.QueueSchedulingContexts[queue]
return
}

func getSchedulingReportForJob(sctx *schedulercontext.SchedulingContext, jobId string) (sr schedulingReport) {
sr.schedulingContext = sctx
if sctx == nil {
return
}
for _, qctx := range sctx.QueueSchedulingContexts {
for _, jctx := range qctx.SuccessfulJobSchedulingContexts {
if jctx.JobId == jobId {
sr.queueSchedulingContext = qctx
sr.jobSchedulingContext = jctx
return
}
}
for _, jctx := range qctx.UnsuccessfulJobSchedulingContexts {
if jctx.JobId == jobId {
sr.queueSchedulingContext = qctx
sr.jobSchedulingContext = jctx
return
}
}
}
return
}

// GetQueueReport is a gRPC endpoint for querying queue reports.
// TODO: Further separate this from internal contexts.
func (repo *SchedulingContextRepository) GetQueueReport(_ context.Context, request *schedulerobjects.QueueReportRequest) (*schedulerobjects.QueueReport, error) {
Expand All @@ -377,34 +439,30 @@ func (repo *SchedulingContextRepository) GetQueueReport(_ context.Context, reque
}

func (repo *SchedulingContextRepository) getQueueReportString(queue string, verbosity int32) string {
var sb strings.Builder
w := tabwriter.NewWriter(&sb, 1, 1, 1, ' ', 0)
sortedExecutorIds := repo.GetSortedExecutorIds()
mostRecentByExecutor, _ := repo.GetMostRecentSchedulingContextByExecutorForQueue(queue)
mostRecentSuccessfulByExecutor, _ := repo.GetMostRecentSuccessfulSchedulingContextByExecutorForQueue(queue)
mostRecentPreemptingByExecutor, _ := repo.GetMostRecentPreemptingSchedulingContextByExecutorForQueue(queue)
for _, executorId := range sortedExecutorIds {
var sb strings.Builder
w := tabwriter.NewWriter(&sb, 1, 1, 1, ' ', 0)
for _, executorId := range repo.GetSortedExecutorIds() {
fmt.Fprintf(w, "%s:\n", executorId)
if sctx := mostRecentByExecutor[executorId]; sctx != nil {
fmt.Fprint(w, indent.String("\t", "Most recent attempt:\n"))
qctx := sctx.QueueSchedulingContexts[queue]
fmt.Fprint(w, indent.String("\t\t", qctx.ReportString(verbosity)))
if sr := getSchedulingReportForQueue(mostRecentByExecutor[executorId], queue); sr.queueSchedulingContext != nil {
fmt.Fprintf(w, "\tMost recent scheduling round that considered queue %s:\n", queue)
fmt.Fprint(w, indent.String("\t\t", sr.queueSchedulingContext.ReportString(verbosity)))
} else {
fmt.Fprint(w, indent.String("\t", "Most recent attempt: none\n"))
fmt.Fprintf(w, "\tMost recent scheduling round that considered queue %s: none\n", queue)
}
if sctx := mostRecentSuccessfulByExecutor[executorId]; sctx != nil {
fmt.Fprint(w, indent.String("\t", "Most recent successful attempt:\n"))
qctx := sctx.QueueSchedulingContexts[queue]
fmt.Fprint(w, indent.String("\t\t", qctx.ReportString(verbosity)))
if sr := getSchedulingReportForQueue(mostRecentSuccessfulByExecutor[executorId], queue); sr.queueSchedulingContext != nil {
fmt.Fprintf(w, "\tMost recent scheduling round that scheduled a job from queue %s:\n", queue)
fmt.Fprint(w, indent.String("\t\t", sr.queueSchedulingContext.ReportString(verbosity)))
} else {
fmt.Fprint(w, indent.String("\t", "Most recent successful attempt: none\n"))
fmt.Fprintf(w, "\tMost recent scheduling round that scheduled a job from queue %s: none\n", queue)
}
if sctx := mostRecentPreemptingByExecutor[executorId]; sctx != nil {
fmt.Fprint(w, indent.String("\t", "Most recent preempting attempt:\n"))
qctx := sctx.QueueSchedulingContexts[queue]
fmt.Fprint(w, indent.String("\t\t", qctx.ReportString(verbosity)))
if sr := getSchedulingReportForQueue(mostRecentPreemptingByExecutor[executorId], queue); sr.queueSchedulingContext != nil {
fmt.Fprintf(w, "\tMost recent scheduling round that preempted a job from queue %s:\n", queue)
fmt.Fprint(w, indent.String("\t\t", sr.queueSchedulingContext.ReportString(verbosity)))
} else {
fmt.Fprint(w, indent.String("\t", "Most recent preempting attempt: none\n"))
fmt.Fprintf(w, "\tMost recent scheduling round that preempted a job from queue %s: none\n", queue)
}
}
w.Flush()
Expand Down Expand Up @@ -432,23 +490,11 @@ func (repo *SchedulingContextRepository) getJobReportString(jobId string) string
var sb strings.Builder
w := tabwriter.NewWriter(&sb, 1, 1, 1, ' ', 0)
for _, executorId := range repo.GetSortedExecutorIds() {
sctx := byExecutor[executorId]
var jctx *schedulercontext.JobSchedulingContext
if sctx != nil {
for _, qctx := range sctx.QueueSchedulingContexts {
if jctx, _ = qctx.SuccessfulJobSchedulingContexts[jobId]; jctx != nil {
break
}
if jctx, _ = qctx.UnsuccessfulJobSchedulingContexts[jobId]; jctx != nil {
break
}
}
}
if jctx != nil {
if sr := getSchedulingReportForJob(byExecutor[executorId], jobId); sr.jobSchedulingContext != nil {
fmt.Fprintf(w, "%s:\n", executorId)
fmt.Fprint(w, indent.String("\t", jctx.String()))
fmt.Fprint(w, indent.String("\t", sr.jobSchedulingContext.String()))
} else {
fmt.Fprintf(w, "%s: no recent attempt\n", executorId)
fmt.Fprintf(w, "%s: no recent scheduling round that affected job %s\n", executorId, jobId)
}
}
w.Flush()
Expand Down

0 comments on commit b52963a

Please sign in to comment.