From b52963a08d7d798072741f0f7a33b8354b7742f6 Mon Sep 17 00:00:00 2001 From: Noah Held <41909795+zuqq@users.noreply.github.com> Date: Mon, 19 Jun 2023 15:47:35 +0100 Subject: [PATCH] Also print queue and job scheduling reports with `--queue` and `--job` (#2586) --- internal/scheduler/context/context.go | 23 +-- internal/scheduler/reports.go | 238 +++++++++++++++----------- 2 files changed, 154 insertions(+), 107 deletions(-) diff --git a/internal/scheduler/context/context.go b/internal/scheduler/context/context.go index 88369f89a12..40902b4eb00 100644 --- a/internal/scheduler/context/context.go +++ b/internal/scheduler/context/context.go @@ -167,7 +167,7 @@ func (sctx *SchedulingContext) ReportString(verbosity int32) string { fmt.Fprint(w, "Scheduled queues:\n") for queueName, qctx := range scheduled { fmt.Fprintf(w, "\t%s:\n", queueName) - fmt.Fprintf(w, indent.String("\t\t", qctx.ReportString(verbosity-1))) + fmt.Fprintf(w, indent.String("\t\t", qctx.ReportString(verbosity-2))) } } preempted := armadamaps.Filter( @@ -182,7 +182,7 @@ func (sctx *SchedulingContext) ReportString(verbosity int32) string { fmt.Fprint(w, "Preempted queues:\n") for queueName, qctx := range preempted { fmt.Fprintf(w, "\t%s:\n", queueName) - fmt.Fprintf(w, indent.String("\t\t", qctx.ReportString(verbosity-1))) + fmt.Fprintf(w, indent.String("\t\t", qctx.ReportString(verbosity-2))) } } w.Flush() @@ -341,19 +341,20 @@ func (qctx *QueueSchedulingContext) String() string { return qctx.ReportString(0) } -const maxPrintedJobIdsByReason = 1 +const maxJobIdsToPrint = 1 func (qctx *QueueSchedulingContext) ReportString(verbosity int32) string { var sb strings.Builder w := tabwriter.NewWriter(&sb, 1, 1, 1, ' ', 0) - if verbosity > 0 { - fmt.Fprintf(w, "Created:\t%s\n", qctx.Created) + if verbosity >= 0 { + fmt.Fprintf(w, "Time:\t%s\n", qctx.Created) + fmt.Fprintf(w, "Queue:\t%s\n", qctx.Queue) } fmt.Fprintf(w, "Scheduled resources:\t%s\n", qctx.ScheduledResourcesByPriority.AggregateByResource().CompactString()) fmt.Fprintf(w, "Scheduled resources (by priority):\t%s\n", qctx.ScheduledResourcesByPriority.String()) fmt.Fprintf(w, "Preempted resources:\t%s\n", qctx.EvictedResourcesByPriority.AggregateByResource().CompactString()) fmt.Fprintf(w, "Preempted resources (by priority):\t%s\n", qctx.EvictedResourcesByPriority.String()) - if verbosity > 0 { + if verbosity >= 0 { fmt.Fprintf(w, "Total allocated resources after scheduling:\t%s\n", qctx.AllocatedByPriority.AggregateByResource().CompactString()) fmt.Fprintf(w, "Total allocated resources after scheduling (by priority):\t%s\n", qctx.AllocatedByPriority.String()) fmt.Fprintf(w, "Number of jobs scheduled:\t%d\n", len(qctx.SuccessfulJobSchedulingContexts)) @@ -361,8 +362,8 @@ func (qctx *QueueSchedulingContext) ReportString(verbosity int32) string { fmt.Fprintf(w, "Number of jobs that could not be scheduled:\t%d\n", len(qctx.UnsuccessfulJobSchedulingContexts)) if len(qctx.SuccessfulJobSchedulingContexts) > 0 { jobIdsToPrint := maps.Keys(qctx.SuccessfulJobSchedulingContexts) - if verbosity <= 1 && len(jobIdsToPrint) > maxPrintedJobIdsByReason { - jobIdsToPrint = jobIdsToPrint[0:maxPrintedJobIdsByReason] + if len(jobIdsToPrint) > maxJobIdsToPrint { + jobIdsToPrint = jobIdsToPrint[0:maxJobIdsToPrint] } fmt.Fprintf(w, "Scheduled jobs:\t%v", jobIdsToPrint) if len(jobIdsToPrint) != len(qctx.SuccessfulJobSchedulingContexts) { @@ -373,8 +374,8 @@ func (qctx *QueueSchedulingContext) ReportString(verbosity int32) string { } if len(qctx.EvictedJobsById) > 0 { jobIdsToPrint := maps.Keys(qctx.EvictedJobsById) - if verbosity <= 1 && len(jobIdsToPrint) > maxPrintedJobIdsByReason { - jobIdsToPrint = jobIdsToPrint[0:maxPrintedJobIdsByReason] + if len(jobIdsToPrint) > maxJobIdsToPrint { + jobIdsToPrint = jobIdsToPrint[0:maxJobIdsToPrint] } fmt.Fprintf(w, "Preempted jobs:\t%v", jobIdsToPrint) if len(jobIdsToPrint) != len(qctx.EvictedJobsById) { @@ -571,7 +572,7 @@ func (jctx *JobSchedulingContext) String() string { var sb strings.Builder w := tabwriter.NewWriter(&sb, 1, 1, 1, ' ', 0) fmt.Fprintf(w, "Time:\t%s\n", jctx.Created) - fmt.Fprintf(w, "Job id:\t%s\n", jctx.JobId) + fmt.Fprintf(w, "Job ID:\t%s\n", jctx.JobId) fmt.Fprintf(w, "Number of nodes in cluster:\t%d\n", jctx.NumNodes) if jctx.UnschedulableReason != "" { fmt.Fprintf(w, "UnschedulableReason:\t%s\n", jctx.UnschedulableReason) diff --git a/internal/scheduler/reports.go b/internal/scheduler/reports.go index 5ed93ccf915..0fd5415de4d 100644 --- a/internal/scheduler/reports.go +++ b/internal/scheduler/reports.go @@ -269,49 +269,6 @@ func (repo *SchedulingContextRepository) addSchedulingContextForJobs(sctx *sched return nil } -func (repo *SchedulingContextRepository) getSchedulingReportStringForQueue(queue string, verbosity int32) string { - mostRecentByExecutor, _ := repo.GetMostRecentSchedulingContextByExecutorForQueue(queue) - mostRecentSuccessfulByExecutor, _ := repo.GetMostRecentSuccessfulSchedulingContextByExecutorForQueue(queue) - mostRecentPreemptingByExecutor, _ := repo.GetMostRecentPreemptingSchedulingContextByExecutorForQueue(queue) - sr := schedulingReport{ - mostRecentByExecutor: mostRecentByExecutor, - mostRecentSuccessfulByExecutor: mostRecentSuccessfulByExecutor, - mostRecentPreemptingByExecutor: mostRecentPreemptingByExecutor, - - sortedExecutorIds: repo.GetSortedExecutorIds(), - } - return sr.ReportString(verbosity) -} - -func (repo *SchedulingContextRepository) getSchedulingReportStringForJob(jobId string, verbosity int32) string { - mostRecentByExecutor, _ := repo.GetMostRecentSchedulingContextByExecutorForJob(jobId) - var sb strings.Builder - w := tabwriter.NewWriter(&sb, 1, 1, 1, ' ', 0) - for _, executorId := range repo.GetSortedExecutorIds() { - fmt.Fprintf(w, "%s:\n", executorId) - sctx := mostRecentByExecutor[executorId] - if sctx != nil { - fmt.Fprint(w, indent.String("\t", "Most recent attempt:\n")) - fmt.Fprint(w, indent.String("\t\t", sctx.ReportString(verbosity))) - } else { - fmt.Fprint(w, indent.String("\t", "Most recent attempt: none\n")) - } - } - w.Flush() - return sb.String() -} - -func (repo *SchedulingContextRepository) getSchedulingReportString(verbosity int32) string { - sr := schedulingReport{ - mostRecentByExecutor: repo.GetMostRecentSchedulingContextByExecutor(), - mostRecentSuccessfulByExecutor: repo.GetMostRecentSuccessfulSchedulingContextByExecutor(), - mostRecentPreemptingByExecutor: repo.GetMostRecentPreemptingSchedulingContextByExecutor(), - - sortedExecutorIds: repo.GetSortedExecutorIds(), - } - return sr.ReportString(verbosity) -} - // GetSchedulingReport is a gRPC endpoint for querying scheduler reports. // TODO: Further separate this from internal contexts. func (repo *SchedulingContextRepository) GetSchedulingReport(_ context.Context, request *schedulerobjects.SchedulingReportRequest) (*schedulerobjects.SchedulingReport, error) { @@ -330,42 +287,147 @@ func (repo *SchedulingContextRepository) GetSchedulingReport(_ context.Context, return &schedulerobjects.SchedulingReport{Report: report}, nil } -type schedulingReport struct { - mostRecentByExecutor SchedulingContextByExecutor - mostRecentSuccessfulByExecutor SchedulingContextByExecutor - mostRecentPreemptingByExecutor SchedulingContextByExecutor - - sortedExecutorIds []string -} - -func (sr schedulingReport) ReportString(verbosity int32) string { +func (repo *SchedulingContextRepository) getSchedulingReportString(verbosity int32) string { + mostRecentByExecutor := repo.GetMostRecentSchedulingContextByExecutor() + mostRecentSuccessfulByExecutor := repo.GetMostRecentSuccessfulSchedulingContextByExecutor() + mostRecentPreemptingByExecutor := repo.GetMostRecentPreemptingSchedulingContextByExecutor() var sb strings.Builder w := tabwriter.NewWriter(&sb, 1, 1, 1, ' ', 0) - for _, executorId := range sr.sortedExecutorIds { + for _, executorId := range repo.GetSortedExecutorIds() { fmt.Fprintf(w, "%s:\n", executorId) - if sctx := sr.mostRecentByExecutor[executorId]; sctx != nil { - fmt.Fprint(w, indent.String("\t", "Most recent attempt:\n")) + if sctx := mostRecentByExecutor[executorId]; sctx != nil { + fmt.Fprint(w, indent.String("\t", "Most recent scheduling round:\n")) fmt.Fprint(w, indent.String("\t\t", sctx.ReportString(verbosity))) } else { - fmt.Fprint(w, indent.String("\t", "Most recent attempt: none\n")) + fmt.Fprint(w, indent.String("\t", "Most recent scheduling round: none\n")) } - if sctx := sr.mostRecentSuccessfulByExecutor[executorId]; sctx != nil { - fmt.Fprint(w, indent.String("\t", "Most recent successful attempt:\n")) + if sctx := mostRecentSuccessfulByExecutor[executorId]; sctx != nil { + fmt.Fprint(w, indent.String("\t", "Most recent scheduling round that scheduled a job:\n")) fmt.Fprint(w, indent.String("\t\t", sctx.ReportString(verbosity))) } else { - fmt.Fprint(w, indent.String("\t", "Most recent successful attempt: none\n")) + fmt.Fprint(w, indent.String("\t", "Most recent scheduling round that scheduled a job: none\n")) } - if sctx := sr.mostRecentPreemptingByExecutor[executorId]; sctx != nil { - fmt.Fprint(w, indent.String("\t", "Most recent preempting attempt:\n")) + if sctx := mostRecentPreemptingByExecutor[executorId]; sctx != nil { + fmt.Fprint(w, indent.String("\t", "Most recent scheduling round that preempted a job:\n")) fmt.Fprint(w, indent.String("\t\t", sctx.ReportString(verbosity))) } else { - fmt.Fprint(w, indent.String("\t", "Most recent preempting attempt: none\n")) + fmt.Fprint(w, indent.String("\t", "Most recent scheduling round that preempted a job: none\n")) + } + } + w.Flush() + return sb.String() +} + +func (repo *SchedulingContextRepository) getSchedulingReportStringForQueue(queue string, verbosity int32) string { + mostRecentByExecutor, _ := repo.GetMostRecentSchedulingContextByExecutorForQueue(queue) + mostRecentSuccessfulByExecutor, _ := repo.GetMostRecentSuccessfulSchedulingContextByExecutorForQueue(queue) + mostRecentPreemptingByExecutor, _ := repo.GetMostRecentPreemptingSchedulingContextByExecutorForQueue(queue) + var sb strings.Builder + w := tabwriter.NewWriter(&sb, 1, 1, 1, ' ', 0) + for _, executorId := range repo.GetSortedExecutorIds() { + fmt.Fprintf(w, "%s:\n", executorId) + if sctx := mostRecentByExecutor[executorId]; sctx != nil { + fmt.Fprintf(w, "\tMost recent scheduling round that considered queue %s:\n", queue) + sr := getSchedulingReportForQueue(sctx, queue) + fmt.Fprint(w, indent.String("\t\t", sr.ReportString(verbosity))) + } else { + fmt.Fprintf(w, "\tMost recent scheduling round that considered queue %s: none\n", queue) + } + if sctx := mostRecentSuccessfulByExecutor[executorId]; sctx != nil { + fmt.Fprintf(w, "\tMost recent scheduling round that scheduled a job from queue %s:\n", queue) + sr := getSchedulingReportForQueue(sctx, queue) + fmt.Fprint(w, indent.String("\t\t", sr.ReportString(verbosity))) + } else { + fmt.Fprintf(w, "\tMost recent scheduling round that scheduled a job from queue %s: none\n", queue) + } + if sctx := mostRecentPreemptingByExecutor[executorId]; sctx != nil { + fmt.Fprintf(w, "\tMost recent scheduling round that preempted a job from queue %s:\n", queue) + sr := getSchedulingReportForQueue(sctx, queue) + fmt.Fprint(w, indent.String("\t\t", sr.ReportString(verbosity))) + } else { + fmt.Fprintf(w, "\tMost recent scheduling round that preempted a job from queue %s: none\n", queue) + } + } + w.Flush() + return sb.String() +} + +func (repo *SchedulingContextRepository) getSchedulingReportStringForJob(jobId string, verbosity int32) string { + mostRecentByExecutor, _ := repo.GetMostRecentSchedulingContextByExecutorForJob(jobId) + var sb strings.Builder + w := tabwriter.NewWriter(&sb, 1, 1, 1, ' ', 0) + for _, executorId := range repo.GetSortedExecutorIds() { + fmt.Fprintf(w, "%s:\n", executorId) + if sctx := mostRecentByExecutor[executorId]; sctx != nil { + fmt.Fprintf(w, "\tMost recent scheduling round that affected job %s:\n", jobId) + sr := getSchedulingReportForJob(sctx, jobId) + fmt.Fprint(w, indent.String("\t\t", sr.ReportString(verbosity))) + } else { + fmt.Fprintf(w, "\tMost recent scheduling round that affected job %s: none\n", jobId) } } w.Flush() return sb.String() } +type schedulingReport struct { + schedulingContext *schedulercontext.SchedulingContext + queueSchedulingContext *schedulercontext.QueueSchedulingContext + jobSchedulingContext *schedulercontext.JobSchedulingContext +} + +func (sr schedulingReport) ReportString(verbosity int32) string { + var sb strings.Builder + w := tabwriter.NewWriter(&sb, 1, 1, 1, ' ', 0) + if sctx := sr.schedulingContext; sctx != nil { + fmt.Fprint(w, "Overall scheduling report:\n") + fmt.Fprint(w, indent.String("\t", sctx.ReportString(verbosity))) + } + if qctx := sr.queueSchedulingContext; qctx != nil { + fmt.Fprintf(w, "Scheduling report for queue %s:\n", qctx.Queue) + fmt.Fprint(w, indent.String("\t", qctx.ReportString(verbosity))) + } + if jctx := sr.jobSchedulingContext; jctx != nil { + fmt.Fprintf(w, "Scheduling report for job %s:\n", jctx.JobId) + fmt.Fprint(w, indent.String("\t", jctx.String())) + } + w.Flush() + return sb.String() +} + +func getSchedulingReportForQueue(sctx *schedulercontext.SchedulingContext, queue string) (sr schedulingReport) { + sr.schedulingContext = sctx + if sctx == nil { + return + } + sr.queueSchedulingContext = sctx.QueueSchedulingContexts[queue] + return +} + +func getSchedulingReportForJob(sctx *schedulercontext.SchedulingContext, jobId string) (sr schedulingReport) { + sr.schedulingContext = sctx + if sctx == nil { + return + } + for _, qctx := range sctx.QueueSchedulingContexts { + for _, jctx := range qctx.SuccessfulJobSchedulingContexts { + if jctx.JobId == jobId { + sr.queueSchedulingContext = qctx + sr.jobSchedulingContext = jctx + return + } + } + for _, jctx := range qctx.UnsuccessfulJobSchedulingContexts { + if jctx.JobId == jobId { + sr.queueSchedulingContext = qctx + sr.jobSchedulingContext = jctx + return + } + } + } + return +} + // GetQueueReport is a gRPC endpoint for querying queue reports. // TODO: Further separate this from internal contexts. func (repo *SchedulingContextRepository) GetQueueReport(_ context.Context, request *schedulerobjects.QueueReportRequest) (*schedulerobjects.QueueReport, error) { @@ -377,34 +439,30 @@ func (repo *SchedulingContextRepository) GetQueueReport(_ context.Context, reque } func (repo *SchedulingContextRepository) getQueueReportString(queue string, verbosity int32) string { - var sb strings.Builder - w := tabwriter.NewWriter(&sb, 1, 1, 1, ' ', 0) - sortedExecutorIds := repo.GetSortedExecutorIds() mostRecentByExecutor, _ := repo.GetMostRecentSchedulingContextByExecutorForQueue(queue) mostRecentSuccessfulByExecutor, _ := repo.GetMostRecentSuccessfulSchedulingContextByExecutorForQueue(queue) mostRecentPreemptingByExecutor, _ := repo.GetMostRecentPreemptingSchedulingContextByExecutorForQueue(queue) - for _, executorId := range sortedExecutorIds { + var sb strings.Builder + w := tabwriter.NewWriter(&sb, 1, 1, 1, ' ', 0) + for _, executorId := range repo.GetSortedExecutorIds() { fmt.Fprintf(w, "%s:\n", executorId) - if sctx := mostRecentByExecutor[executorId]; sctx != nil { - fmt.Fprint(w, indent.String("\t", "Most recent attempt:\n")) - qctx := sctx.QueueSchedulingContexts[queue] - fmt.Fprint(w, indent.String("\t\t", qctx.ReportString(verbosity))) + if sr := getSchedulingReportForQueue(mostRecentByExecutor[executorId], queue); sr.queueSchedulingContext != nil { + fmt.Fprintf(w, "\tMost recent scheduling round that considered queue %s:\n", queue) + fmt.Fprint(w, indent.String("\t\t", sr.queueSchedulingContext.ReportString(verbosity))) } else { - fmt.Fprint(w, indent.String("\t", "Most recent attempt: none\n")) + fmt.Fprintf(w, "\tMost recent scheduling round that considered queue %s: none\n", queue) } - if sctx := mostRecentSuccessfulByExecutor[executorId]; sctx != nil { - fmt.Fprint(w, indent.String("\t", "Most recent successful attempt:\n")) - qctx := sctx.QueueSchedulingContexts[queue] - fmt.Fprint(w, indent.String("\t\t", qctx.ReportString(verbosity))) + if sr := getSchedulingReportForQueue(mostRecentSuccessfulByExecutor[executorId], queue); sr.queueSchedulingContext != nil { + fmt.Fprintf(w, "\tMost recent scheduling round that scheduled a job from queue %s:\n", queue) + fmt.Fprint(w, indent.String("\t\t", sr.queueSchedulingContext.ReportString(verbosity))) } else { - fmt.Fprint(w, indent.String("\t", "Most recent successful attempt: none\n")) + fmt.Fprintf(w, "\tMost recent scheduling round that scheduled a job from queue %s: none\n", queue) } - if sctx := mostRecentPreemptingByExecutor[executorId]; sctx != nil { - fmt.Fprint(w, indent.String("\t", "Most recent preempting attempt:\n")) - qctx := sctx.QueueSchedulingContexts[queue] - fmt.Fprint(w, indent.String("\t\t", qctx.ReportString(verbosity))) + if sr := getSchedulingReportForQueue(mostRecentPreemptingByExecutor[executorId], queue); sr.queueSchedulingContext != nil { + fmt.Fprintf(w, "\tMost recent scheduling round that preempted a job from queue %s:\n", queue) + fmt.Fprint(w, indent.String("\t\t", sr.queueSchedulingContext.ReportString(verbosity))) } else { - fmt.Fprint(w, indent.String("\t", "Most recent preempting attempt: none\n")) + fmt.Fprintf(w, "\tMost recent scheduling round that preempted a job from queue %s: none\n", queue) } } w.Flush() @@ -432,23 +490,11 @@ func (repo *SchedulingContextRepository) getJobReportString(jobId string) string var sb strings.Builder w := tabwriter.NewWriter(&sb, 1, 1, 1, ' ', 0) for _, executorId := range repo.GetSortedExecutorIds() { - sctx := byExecutor[executorId] - var jctx *schedulercontext.JobSchedulingContext - if sctx != nil { - for _, qctx := range sctx.QueueSchedulingContexts { - if jctx, _ = qctx.SuccessfulJobSchedulingContexts[jobId]; jctx != nil { - break - } - if jctx, _ = qctx.UnsuccessfulJobSchedulingContexts[jobId]; jctx != nil { - break - } - } - } - if jctx != nil { + if sr := getSchedulingReportForJob(byExecutor[executorId], jobId); sr.jobSchedulingContext != nil { fmt.Fprintf(w, "%s:\n", executorId) - fmt.Fprint(w, indent.String("\t", jctx.String())) + fmt.Fprint(w, indent.String("\t", sr.jobSchedulingContext.String())) } else { - fmt.Fprintf(w, "%s: no recent attempt\n", executorId) + fmt.Fprintf(w, "%s: no recent scheduling round that affected job %s\n", executorId, jobId) } } w.Flush()