diff --git a/internal/armada/server/lease.go b/internal/armada/server/lease.go index 2b9d7bb753e..fd47a7b3dfc 100644 --- a/internal/armada/server/lease.go +++ b/internal/armada/server/lease.go @@ -476,7 +476,7 @@ func (q *AggregatedQueueServer) getJobs(ctx *armadacontext.Context, req *api.Str defer cancel() } - var fairnessCostProvider fairness.FairnessCostProvider + var fairnessCostProvider fairness.CostProvider totalResources := schedulerobjects.ResourceList{Resources: totalCapacity} if q.schedulingConfig.FairnessModel == configuration.DominantResourceFairness { fairnessCostProvider, err = fairness.NewDominantResourceFairness( diff --git a/internal/common/util/tabbed_string_builder.go b/internal/common/util/tabbed_string_builder.go new file mode 100644 index 00000000000..079a08662a5 --- /dev/null +++ b/internal/common/util/tabbed_string_builder.go @@ -0,0 +1,47 @@ +package util + +import ( + "fmt" + "strings" + "text/tabwriter" +) + +// TabbedStringBuilder is a wrapper around a *tabwriter.Writer that allows for efficiently building +// tab-aligned strings. +// This exists as *tabwriter.Writer exposes a more complicated interface which returns errors to the +// caller, propagated for the underlying IOWriter. In this case we ensure that the underlying Writer is +// a strings.Builder. This never returns errors therefore we can provide a simpler interface where the caller +// doesn't need to consider error handling. +type TabbedStringBuilder struct { + sb *strings.Builder + writer *tabwriter.Writer +} + +// NewTabbedStringBuilder creates a new TabbedStringBuilder. All parameters are equivalent to those defined in tabwriter.NewWriter +func NewTabbedStringBuilder(minwidth, tabwidth, padding int, padchar byte, flags uint) *TabbedStringBuilder { + sb := &strings.Builder{} + return &TabbedStringBuilder{ + sb: sb, + writer: tabwriter.NewWriter(sb, minwidth, tabwidth, padding, padchar, flags), + } +} + +// Writef formats according to a format specifier and writes to the underlying writer +func (t *TabbedStringBuilder) Writef(format string, a ...any) { + // should be safe ignore the error here as strings.Builder never errors + _, _ = fmt.Fprintf(t.writer, format, a...) +} + +// Write the string to the underlying writer +func (t *TabbedStringBuilder) Write(a ...any) { + // should be safe ignore the error here as strings.Builder never errors + _, _ = fmt.Fprint(t.writer, a...) +} + +// String returns the accumulated string. +// Flush on the underlying writer is automatically called +func (t *TabbedStringBuilder) String() string { + // should be safe ignore the error here as strings.Builder never errors + _ = t.writer.Flush() + return t.sb.String() +} diff --git a/internal/common/util/tabbed_string_builder_test.go b/internal/common/util/tabbed_string_builder_test.go new file mode 100644 index 00000000000..76608e84bf4 --- /dev/null +++ b/internal/common/util/tabbed_string_builder_test.go @@ -0,0 +1,20 @@ +package util + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestTabbedStringBuilder_TestSimple(t *testing.T) { + w := NewTabbedStringBuilder(1, 1, 1, ' ', 0) + w.Writef("a:\t%s", "b") + assert.Equal(t, "a: b", w.String()) +} + +func TestTabbedStringBuilderWriter_TestComplex(t *testing.T) { + w := NewTabbedStringBuilder(1, 1, 1, ' ', 0) + w.Writef("a:\t%s\n", "b") + w.Writef("a:\t%.2f\t%d\n", 1.5, 2) + assert.Equal(t, "a: b\na: 1.50 2\n", w.String()) +} diff --git a/internal/scheduler/common.go b/internal/scheduler/common.go index 4b0bc6e2940..be1c2faf0af 100644 --- a/internal/scheduler/common.go +++ b/internal/scheduler/common.go @@ -62,7 +62,7 @@ func PreemptedJobsFromSchedulerResult[T interfaces.LegacySchedulerJob](sr *Sched return rv } -// ScheduledJobsFromScheduleResult returns the slice of scheduled jobs in the result, +// ScheduledJobsFromSchedulerResult returns the slice of scheduled jobs in the result, // cast to type T. func ScheduledJobsFromSchedulerResult[T interfaces.LegacySchedulerJob](sr *SchedulerResult) []T { rv := make([]T, len(sr.ScheduledJobs)) diff --git a/internal/scheduler/configuration/configuration.go b/internal/scheduler/configuration/configuration.go index 2b8fc3f40cc..6f630eefe07 100644 --- a/internal/scheduler/configuration/configuration.go +++ b/internal/scheduler/configuration/configuration.go @@ -22,7 +22,7 @@ const ( type Configuration struct { // Database configuration Postgres configuration.PostgresConfig - // Redis Comnfig + // Redis Config Redis config.RedisConfig // General Pulsar configuration Pulsar configuration.PulsarConfig diff --git a/internal/scheduler/constraints/constraints.go b/internal/scheduler/constraints/constraints.go index b859d71bca1..f43310ad957 100644 --- a/internal/scheduler/constraints/constraints.go +++ b/internal/scheduler/constraints/constraints.go @@ -13,7 +13,7 @@ import ( ) const ( - // Indicates that the limit on resources scheduled per round has been exceeded. + // Indicates that the limit on resources scheduled per round has been exceeded. MaximumResourcesScheduledUnschedulableReason = "maximum resources scheduled" // Indicates that a queue has been assigned more than its allowed amount of resources. @@ -120,7 +120,7 @@ func ScaleQuantity(q resource.Quantity, f float64) resource.Quantity { return q } -func (constraints *SchedulingConstraints) CheckRoundConstraints(sctx *schedulercontext.SchedulingContext, queue string) (bool, string, error) { +func (constraints *SchedulingConstraints) CheckRoundConstraints(sctx *schedulercontext.SchedulingContext) (bool, string, error) { // MaximumResourcesToSchedule check. if !sctx.ScheduledResources.IsStrictlyLessOrEqual(constraints.MaximumResourcesToSchedule) { return false, MaximumResourcesScheduledUnschedulableReason, nil diff --git a/internal/scheduler/constraints/constraints_test.go b/internal/scheduler/constraints/constraints_test.go index 081058191dc..e76ee38e1b3 100644 --- a/internal/scheduler/constraints/constraints_test.go +++ b/internal/scheduler/constraints/constraints_test.go @@ -21,7 +21,7 @@ func TestConstraints(t *testing.T) { }{} // TODO: Add tests. for name, tc := range tests { t.Run(name, func(t *testing.T) { - ok, unschedulableReason, err := tc.constraints.CheckRoundConstraints(tc.sctx, tc.queue) + ok, unschedulableReason, err := tc.constraints.CheckRoundConstraints(tc.sctx) require.NoError(t, err) require.Equal(t, tc.globalUnschedulableReason == "", ok) require.Equal(t, tc.globalUnschedulableReason, unschedulableReason) diff --git a/internal/scheduler/context/context.go b/internal/scheduler/context/context.go index 29c12bbc374..c142c0ea9fa 100644 --- a/internal/scheduler/context/context.go +++ b/internal/scheduler/context/context.go @@ -2,8 +2,6 @@ package context import ( "fmt" - "strings" - "text/tabwriter" "time" "github.com/openconfig/goyang/pkg/indent" @@ -17,6 +15,7 @@ import ( armadamaps "github.com/armadaproject/armada/internal/common/maps" armadaslices "github.com/armadaproject/armada/internal/common/slices" "github.com/armadaproject/armada/internal/common/types" + "github.com/armadaproject/armada/internal/common/util" schedulerconfig "github.com/armadaproject/armada/internal/scheduler/configuration" "github.com/armadaproject/armada/internal/scheduler/fairness" "github.com/armadaproject/armada/internal/scheduler/interfaces" @@ -38,7 +37,7 @@ type SchedulingContext struct { // Default priority class. DefaultPriorityClass string // Determines how fairness is computed. - FairnessCostProvider fairness.FairnessCostProvider + FairnessCostProvider fairness.CostProvider // Limits job scheduling rate globally across all queues. // Use the "Started" time to ensure limiter state remains constant within each scheduling round. Limiter *rate.Limiter @@ -66,7 +65,7 @@ type SchedulingContext struct { // Used to efficiently generate scheduling keys. SchedulingKeyGenerator *schedulerobjects.SchedulingKeyGenerator // Record of job scheduling requirements known to be unfeasible. - // Used to immediately reject new jobs with identical reqirements. + // Used to immediately reject new jobs with identical requirements. // Maps to the JobSchedulingContext of a previous job attempted to schedule with the same key. UnfeasibleSchedulingKeys map[schedulerobjects.SchedulingKey]*JobSchedulingContext } @@ -76,7 +75,7 @@ func NewSchedulingContext( pool string, priorityClasses map[string]types.PriorityClass, defaultPriorityClass string, - fairnessCostProvider fairness.FairnessCostProvider, + fairnessCostProvider fairness.CostProvider, limiter *rate.Limiter, totalResources schedulerobjects.ResourceList, ) *SchedulingContext { @@ -177,18 +176,17 @@ func (sctx *SchedulingContext) TotalCost() float64 { } func (sctx *SchedulingContext) ReportString(verbosity int32) string { - var sb strings.Builder - w := tabwriter.NewWriter(&sb, 1, 1, 1, ' ', 0) - fmt.Fprintf(w, "Started:\t%s\n", sctx.Started) - fmt.Fprintf(w, "Finished:\t%s\n", sctx.Finished) - fmt.Fprintf(w, "Duration:\t%s\n", sctx.Finished.Sub(sctx.Started)) - fmt.Fprintf(w, "Termination reason:\t%s\n", sctx.TerminationReason) - fmt.Fprintf(w, "Total capacity:\t%s\n", sctx.TotalResources.CompactString()) - fmt.Fprintf(w, "Scheduled resources:\t%s\n", sctx.ScheduledResources.CompactString()) - fmt.Fprintf(w, "Preempted resources:\t%s\n", sctx.EvictedResources.CompactString()) - fmt.Fprintf(w, "Number of gangs scheduled:\t%d\n", sctx.NumScheduledGangs) - fmt.Fprintf(w, "Number of jobs scheduled:\t%d\n", sctx.NumScheduledJobs) - fmt.Fprintf(w, "Number of jobs preempted:\t%d\n", sctx.NumEvictedJobs) + w := util.NewTabbedStringBuilder(1, 1, 1, ' ', 0) + w.Writef("Started:\t%s\n", sctx.Started) + w.Writef("Finished:\t%s\n", sctx.Finished) + w.Writef("Duration:\t%s\n", sctx.Finished.Sub(sctx.Started)) + w.Writef("Termination reason:\t%s\n", sctx.TerminationReason) + w.Writef("Total capacity:\t%s\n", sctx.TotalResources.CompactString()) + w.Writef("Scheduled resources:\t%s\n", sctx.ScheduledResources.CompactString()) + w.Writef("Preempted resources:\t%s\n", sctx.EvictedResources.CompactString()) + w.Writef("Number of gangs scheduled:\t%d\n", sctx.NumScheduledGangs) + w.Writef("Number of jobs scheduled:\t%d\n", sctx.NumScheduledJobs) + w.Writef("Number of jobs preempted:\t%d\n", sctx.NumEvictedJobs) scheduled := armadamaps.Filter( sctx.QueueSchedulingContexts, func(_ string, qctx *QueueSchedulingContext) bool { @@ -196,12 +194,12 @@ func (sctx *SchedulingContext) ReportString(verbosity int32) string { }, ) if verbosity <= 0 { - fmt.Fprintf(w, "Scheduled queues:\t%v\n", maps.Keys(scheduled)) + w.Writef("Scheduled queues:\t%v\n", maps.Keys(scheduled)) } else { - fmt.Fprint(w, "Scheduled queues:\n") + w.Write("Scheduled queues:\n") for queueName, qctx := range scheduled { - fmt.Fprintf(w, "\t%s:\n", queueName) - fmt.Fprint(w, indent.String("\t\t", qctx.ReportString(verbosity-2))) + w.Writef("\t%s:\n", queueName) + w.Write(indent.String("\t\t", qctx.ReportString(verbosity-2))) } } preempted := armadamaps.Filter( @@ -211,16 +209,15 @@ func (sctx *SchedulingContext) ReportString(verbosity int32) string { }, ) if verbosity <= 0 { - fmt.Fprintf(w, "Preempted queues:\t%v\n", maps.Keys(preempted)) + w.Writef("Preempted queues:\t%v\n", maps.Keys(preempted)) } else { - fmt.Fprint(w, "Preempted queues:\n") + w.Write("Preempted queues:\n") for queueName, qctx := range preempted { - fmt.Fprintf(w, "\t%s:\n", queueName) - fmt.Fprint(w, indent.String("\t\t", qctx.ReportString(verbosity-2))) + w.Writef("\t%s:\n", queueName) + w.Writef(indent.String("\t\t", qctx.ReportString(verbosity-2))) } } - w.Flush() - return sb.String() + return w.String() } func (sctx *SchedulingContext) AddGangSchedulingContext(gctx *GangSchedulingContext) (bool, error) { @@ -367,13 +364,6 @@ type QueueSchedulingContext struct { EvictedJobsById map[string]bool } -func GetSchedulingContextFromQueueSchedulingContext(qctx *QueueSchedulingContext) *SchedulingContext { - if qctx == nil { - return nil - } - return qctx.SchedulingContext -} - func (qctx *QueueSchedulingContext) String() string { return qctx.ReportString(0) } @@ -391,32 +381,31 @@ func (qctx *QueueSchedulingContext) GetWeight() float64 { const maxJobIdsToPrint = 1 func (qctx *QueueSchedulingContext) ReportString(verbosity int32) string { - var sb strings.Builder - w := tabwriter.NewWriter(&sb, 1, 1, 1, ' ', 0) + w := util.NewTabbedStringBuilder(1, 1, 1, ' ', 0) if verbosity >= 0 { - fmt.Fprintf(w, "Time:\t%s\n", qctx.Created) - fmt.Fprintf(w, "Queue:\t%s\n", qctx.Queue) + w.Writef("Time:\t%s\n", qctx.Created) + w.Writef("Queue:\t%s\n", qctx.Queue) } - fmt.Fprintf(w, "Scheduled resources:\t%s\n", qctx.ScheduledResourcesByPriorityClass.AggregateByResource().CompactString()) - fmt.Fprintf(w, "Scheduled resources (by priority):\t%s\n", qctx.ScheduledResourcesByPriorityClass.String()) - fmt.Fprintf(w, "Preempted resources:\t%s\n", qctx.EvictedResourcesByPriorityClass.AggregateByResource().CompactString()) - fmt.Fprintf(w, "Preempted resources (by priority):\t%s\n", qctx.EvictedResourcesByPriorityClass.String()) + w.Writef("Scheduled resources:\t%s\n", qctx.ScheduledResourcesByPriorityClass.AggregateByResource().CompactString()) + w.Writef("Scheduled resources (by priority):\t%s\n", qctx.ScheduledResourcesByPriorityClass.String()) + w.Writef("Preempted resources:\t%s\n", qctx.EvictedResourcesByPriorityClass.AggregateByResource().CompactString()) + w.Writef("Preempted resources (by priority):\t%s\n", qctx.EvictedResourcesByPriorityClass.String()) if verbosity >= 0 { - fmt.Fprintf(w, "Total allocated resources after scheduling:\t%s\n", qctx.Allocated.CompactString()) - fmt.Fprintf(w, "Total allocated resources after scheduling by priority class:\t%s\n", qctx.AllocatedByPriorityClass) - fmt.Fprintf(w, "Number of jobs scheduled:\t%d\n", len(qctx.SuccessfulJobSchedulingContexts)) - fmt.Fprintf(w, "Number of jobs preempted:\t%d\n", len(qctx.EvictedJobsById)) - fmt.Fprintf(w, "Number of jobs that could not be scheduled:\t%d\n", len(qctx.UnsuccessfulJobSchedulingContexts)) + w.Writef("Total allocated resources after scheduling:\t%s\n", qctx.Allocated.CompactString()) + w.Writef("Total allocated resources after scheduling by priority class:\t%s\n", qctx.AllocatedByPriorityClass) + w.Writef("Number of jobs scheduled:\t%d\n", len(qctx.SuccessfulJobSchedulingContexts)) + w.Writef("Number of jobs preempted:\t%d\n", len(qctx.EvictedJobsById)) + w.Writef("Number of jobs that could not be scheduled:\t%d\n", len(qctx.UnsuccessfulJobSchedulingContexts)) if len(qctx.SuccessfulJobSchedulingContexts) > 0 { jobIdsToPrint := maps.Keys(qctx.SuccessfulJobSchedulingContexts) if len(jobIdsToPrint) > maxJobIdsToPrint { jobIdsToPrint = jobIdsToPrint[0:maxJobIdsToPrint] } - fmt.Fprintf(w, "Scheduled jobs:\t%v", jobIdsToPrint) + w.Writef("Scheduled jobs:\t%v", jobIdsToPrint) if len(jobIdsToPrint) != len(qctx.SuccessfulJobSchedulingContexts) { - fmt.Fprintf(w, " (and %d others not shown)\n", len(qctx.SuccessfulJobSchedulingContexts)-len(jobIdsToPrint)) + w.Writef(" (and %d others not shown)\n", len(qctx.SuccessfulJobSchedulingContexts)-len(jobIdsToPrint)) } else { - fmt.Fprint(w, "\n") + w.Write("\n") } } if len(qctx.EvictedJobsById) > 0 { @@ -424,15 +413,15 @@ func (qctx *QueueSchedulingContext) ReportString(verbosity int32) string { if len(jobIdsToPrint) > maxJobIdsToPrint { jobIdsToPrint = jobIdsToPrint[0:maxJobIdsToPrint] } - fmt.Fprintf(w, "Preempted jobs:\t%v", jobIdsToPrint) + w.Writef("Preempted jobs:\t%v", jobIdsToPrint) if len(jobIdsToPrint) != len(qctx.EvictedJobsById) { - fmt.Fprintf(w, " (and %d others not shown)\n", len(qctx.EvictedJobsById)-len(jobIdsToPrint)) + w.Writef(" (and %d others not shown)\n", len(qctx.EvictedJobsById)-len(jobIdsToPrint)) } else { - fmt.Fprint(w, "\n") + w.Write("\n") } } if len(qctx.UnsuccessfulJobSchedulingContexts) > 0 { - fmt.Fprint(w, "Unschedulable jobs:\n") + w.Write("Unschedulable jobs:\n") jobIdsByReason := armadaslices.MapAndGroupByFuncs( maps.Values(qctx.UnsuccessfulJobSchedulingContexts), func(jctx *JobSchedulingContext) string { @@ -450,12 +439,11 @@ func (qctx *QueueSchedulingContext) ReportString(verbosity int32) string { if len(jobIds) <= 0 { continue } - fmt.Fprintf(w, "\t%d:\t%s (e.g., %s)\n", len(jobIds), reason, jobIds[0]) + w.Writef("\t%d:\t%s (e.g., %s)\n", len(jobIds), reason, jobIds[0]) } } } - w.Flush() - return sb.String() + return w.String() } func (qctx *QueueSchedulingContext) AddGangSchedulingContext(gctx *GangSchedulingContext) error { @@ -603,20 +591,18 @@ type JobSchedulingContext struct { } func (jctx *JobSchedulingContext) String() string { - var sb strings.Builder - w := tabwriter.NewWriter(&sb, 1, 1, 1, ' ', 0) - fmt.Fprintf(w, "Time:\t%s\n", jctx.Created) - fmt.Fprintf(w, "Job ID:\t%s\n", jctx.JobId) + w := util.NewTabbedStringBuilder(1, 1, 1, ' ', 0) + w.Writef("Time:\t%s\n", jctx.Created) + w.Writef("Job ID:\t%s\n", jctx.JobId) if jctx.UnschedulableReason != "" { - fmt.Fprintf(w, "UnschedulableReason:\t%s\n", jctx.UnschedulableReason) + w.Writef("UnschedulableReason:\t%s\n", jctx.UnschedulableReason) } else { - fmt.Fprint(w, "UnschedulableReason:\tnone\n") + w.Write("UnschedulableReason:\tnone\n") } if jctx.PodSchedulingContext != nil { - fmt.Fprint(w, jctx.PodSchedulingContext.String()) + w.Write(jctx.PodSchedulingContext.String()) } - w.Flush() - return sb.String() + return w.String() } func (jctx *JobSchedulingContext) IsSuccessful() bool { @@ -658,22 +644,20 @@ type PodSchedulingContext struct { } func (pctx *PodSchedulingContext) String() string { - var sb strings.Builder - w := tabwriter.NewWriter(&sb, 1, 1, 1, ' ', 0) + w := util.NewTabbedStringBuilder(1, 1, 1, ' ', 0) if pctx.NodeId != "" { - fmt.Fprintf(w, "Node:\t%s\n", pctx.NodeId) + w.Writef("Node:\t%s\n", pctx.NodeId) } else { - fmt.Fprint(w, "Node:\tnone\n") + w.Write("Node:\tnone\n") } - fmt.Fprintf(w, "Number of nodes in cluster:\t%d\n", pctx.NumNodes) + w.Writef("Number of nodes in cluster:\t%d\n", pctx.NumNodes) if len(pctx.NumExcludedNodesByReason) == 0 { - fmt.Fprint(w, "Excluded nodes:\tnone\n") + w.Write("Excluded nodes:\tnone\n") } else { - fmt.Fprint(w, "Excluded nodes:\n") + w.Write("Excluded nodes:\n") for reason, count := range pctx.NumExcludedNodesByReason { - fmt.Fprintf(w, "\t%d:\t%s\n", count, reason) + w.Writef("\t%d:\t%s\n", count, reason) } } - w.Flush() - return sb.String() + return w.String() } diff --git a/internal/scheduler/database/db_pruner.go b/internal/scheduler/database/db_pruner.go index 8da7dd7935d..b0f7f46b28e 100644 --- a/internal/scheduler/database/db_pruner.go +++ b/internal/scheduler/database/db_pruner.go @@ -77,8 +77,8 @@ func PruneDb(ctx *armadacontext.Context, db *pgx.Conn, batchLimit int, keepAfter return nil } - // Delete everything that's present in the batch table - // Do this all in one call so as to be more terse with the syntax + // Delete everything that's present in the batch table. + // We do this in one call in order to be terser with the syntax _, err = tx.Exec(ctx, ` DELETE FROM runs WHERE job_id in (SELECT job_id from batch); DELETE FROM jobs WHERE job_id in (SELECT job_id from batch); diff --git a/internal/scheduler/database/executor_repository.go b/internal/scheduler/database/executor_repository.go index ec50db20126..c06e4a1ebf5 100644 --- a/internal/scheduler/database/executor_repository.go +++ b/internal/scheduler/database/executor_repository.go @@ -67,7 +67,7 @@ func (r *PostgresExecutorRepository) GetLastUpdateTimes(ctx *armadacontext.Conte } lastUpdateTimes := make(map[string]time.Time, len(rows)) for _, row := range rows { - // pgx defaults to local time so we convert to utc here + // pgx defaults to local time, so we convert to utc here lastUpdateTimes[row.ExecutorID] = row.LastUpdated.UTC() } return lastUpdateTimes, nil diff --git a/internal/scheduler/database/job_repository.go b/internal/scheduler/database/job_repository.go index ebc08d03230..2ed463bd4ad 100644 --- a/internal/scheduler/database/job_repository.go +++ b/internal/scheduler/database/job_repository.go @@ -82,39 +82,47 @@ func (r *PostgresJobRepository) FetchJobRunErrors(ctx *armadacontext.Context, ru errorsByRunId := make(map[uuid.UUID]*armadaevents.Error, len(runIds)) decompressor := compress.NewZlibDecompressor() - err := pgx.BeginTxFunc(ctx, r.db, pgx.TxOptions{ - IsoLevel: pgx.ReadCommitted, - AccessMode: pgx.ReadWrite, - DeferrableMode: pgx.Deferrable, - }, func(tx pgx.Tx) error { - for _, chunk := range chunks { - tmpTable, err := insertRunIdsToTmpTable(ctx, tx, chunk) - if err != nil { - return err - } + processChunk := func(tx pgx.Tx, chunk []uuid.UUID) error { + tmpTable, err := insertRunIdsToTmpTable(ctx, tx, chunk) + if err != nil { + return err + } - query := ` + query := ` SELECT job_run_errors.run_id, job_run_errors.error FROM %s as tmp JOIN job_run_errors ON job_run_errors.run_id = tmp.run_id` - rows, err := tx.Query(ctx, fmt.Sprintf(query, tmpTable)) + rows, err := tx.Query(ctx, fmt.Sprintf(query, tmpTable)) + if err != nil { + return err + } + defer rows.Close() + for rows.Next() { + var runId uuid.UUID + var errorBytes []byte + err := rows.Scan(&runId, &errorBytes) + if err != nil { + return errors.WithStack(err) + } + jobError, err := protoutil.DecompressAndUnmarshall(errorBytes, &armadaevents.Error{}, decompressor) if err != nil { - return err + return errors.WithStack(err) } - defer rows.Close() - for rows.Next() { - var runId uuid.UUID - var errorBytes []byte - err := rows.Scan(&runId, &errorBytes) - if err != nil { - return errors.WithStack(err) - } - jobError, err := protoutil.DecompressAndUnmarshall(errorBytes, &armadaevents.Error{}, decompressor) - if err != nil { - return errors.WithStack(err) - } - errorsByRunId[runId] = jobError + errorsByRunId[runId] = jobError + } + return nil + } + + err := pgx.BeginTxFunc(ctx, r.db, pgx.TxOptions{ + IsoLevel: pgx.ReadCommitted, + AccessMode: pgx.ReadWrite, + DeferrableMode: pgx.Deferrable, + }, func(tx pgx.Tx) error { + for _, chunk := range chunks { + err := processChunk(tx, chunk) + if err != nil { + return nil } } return nil diff --git a/internal/scheduler/database/redis_executor_repository_test.go b/internal/scheduler/database/redis_executor_repository_test.go index bf5b0ea9629..afa3b782529 100644 --- a/internal/scheduler/database/redis_executor_repository_test.go +++ b/internal/scheduler/database/redis_executor_repository_test.go @@ -4,6 +4,8 @@ import ( "testing" "time" + log "github.com/sirupsen/logrus" + "github.com/go-redis/redis" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -78,9 +80,10 @@ func TestRedisExecutorRepository_LoadAndSave(t *testing.T) { func withRedisExecutorRepository(action func(repository *RedisExecutorRepository)) { client := redis.NewClient(&redis.Options{Addr: "localhost:6379", DB: 10}) defer client.FlushDB() - defer client.Close() - - client.FlushDB() + defer func() { + err := client.Close() + log.WithError(err).Warn("Error closing redis client") + }() repo := NewRedisExecutorRepository(client, "pulsar") action(repo) } diff --git a/internal/scheduler/fairness/fairness.go b/internal/scheduler/fairness/fairness.go index 36509d6562b..bff0c47a06f 100644 --- a/internal/scheduler/fairness/fairness.go +++ b/internal/scheduler/fairness/fairness.go @@ -19,8 +19,8 @@ type Queue interface { GetWeight() float64 } -// FairnessCostProvider captures algorithms to compute the cost of an allocation. -type FairnessCostProvider interface { +// CostProvider captures algorithms to compute the cost of an allocation. +type CostProvider interface { CostFromQueue(queue Queue) float64 CostFromAllocationAndWeight(allocation schedulerobjects.ResourceList, weight float64) float64 } diff --git a/internal/scheduler/gang_scheduler.go b/internal/scheduler/gang_scheduler.go index fb9a3add118..c3a25c3249e 100644 --- a/internal/scheduler/gang_scheduler.go +++ b/internal/scheduler/gang_scheduler.go @@ -5,7 +5,6 @@ import ( "github.com/hashicorp/go-memdb" - "github.com/armadaproject/armada/internal/common/armadacontext" "github.com/armadaproject/armada/internal/common/util" schedulerconstraints "github.com/armadaproject/armada/internal/scheduler/constraints" schedulercontext "github.com/armadaproject/armada/internal/scheduler/context" @@ -38,13 +37,13 @@ func (sch *GangScheduler) SkipUnsuccessfulSchedulingKeyCheck() { sch.skipUnsuccessfulSchedulingKeyCheck = true } -func (sch *GangScheduler) Schedule(ctx *armadacontext.Context, gctx *schedulercontext.GangSchedulingContext) (ok bool, unschedulableReason string, err error) { +func (sch *GangScheduler) Schedule(gctx *schedulercontext.GangSchedulingContext) (ok bool, unschedulableReason string, err error) { // Exit immediately if this is a new gang and we've exceeded any round limits. // // Because this check occurs before adding the gctx to the sctx, // the round limits can be exceeded by one gang. if !gctx.AllJobsEvicted { - if ok, unschedulableReason, err = sch.constraints.CheckRoundConstraints(sch.schedulingContext, gctx.Queue); err != nil || !ok { + if ok, unschedulableReason, err = sch.constraints.CheckRoundConstraints(sch.schedulingContext); err != nil || !ok { return } } @@ -106,13 +105,13 @@ func (sch *GangScheduler) Schedule(ctx *armadacontext.Context, gctx *schedulerco return } } - return sch.trySchedule(ctx, gctx) + return sch.trySchedule(gctx) } -func (sch *GangScheduler) trySchedule(ctx *armadacontext.Context, gctx *schedulercontext.GangSchedulingContext) (ok bool, unschedulableReason string, err error) { +func (sch *GangScheduler) trySchedule(gctx *schedulercontext.GangSchedulingContext) (ok bool, unschedulableReason string, err error) { // If no node uniformity constraint, try scheduling across all nodes. if gctx.NodeUniformityLabel == "" { - return sch.tryScheduleGang(ctx, gctx) + return sch.tryScheduleGang(gctx) } // Otherwise try scheduling such that all nodes onto which a gang job lands have the same value for gctx.NodeUniformityLabel. @@ -140,7 +139,7 @@ func (sch *GangScheduler) trySchedule(ctx *armadacontext.Context, gctx *schedule } addNodeSelectorToGctx(gctx, gctx.NodeUniformityLabel, value) txn := sch.nodeDb.Txn(true) - if ok, unschedulableReason, err = sch.tryScheduleGangWithTxn(ctx, txn, gctx); err != nil { + if ok, unschedulableReason, err = sch.tryScheduleGangWithTxn(txn, gctx); err != nil { txn.Abort() return } else if ok { @@ -173,20 +172,20 @@ func (sch *GangScheduler) trySchedule(ctx *armadacontext.Context, gctx *schedule return } addNodeSelectorToGctx(gctx, gctx.NodeUniformityLabel, bestValue) - return sch.tryScheduleGang(ctx, gctx) + return sch.tryScheduleGang(gctx) } -func (sch *GangScheduler) tryScheduleGang(ctx *armadacontext.Context, gctx *schedulercontext.GangSchedulingContext) (ok bool, unschedulableReason string, err error) { +func (sch *GangScheduler) tryScheduleGang(gctx *schedulercontext.GangSchedulingContext) (ok bool, unschedulableReason string, err error) { txn := sch.nodeDb.Txn(true) defer txn.Abort() - ok, unschedulableReason, err = sch.tryScheduleGangWithTxn(ctx, txn, gctx) + ok, unschedulableReason, err = sch.tryScheduleGangWithTxn(txn, gctx) if ok && err == nil { txn.Commit() } return } -func (sch *GangScheduler) tryScheduleGangWithTxn(ctx *armadacontext.Context, txn *memdb.Txn, gctx *schedulercontext.GangSchedulingContext) (ok bool, unschedulableReason string, err error) { +func (sch *GangScheduler) tryScheduleGangWithTxn(txn *memdb.Txn, gctx *schedulercontext.GangSchedulingContext) (ok bool, unschedulableReason string, err error) { if ok, err = sch.nodeDb.ScheduleManyWithTxn(txn, gctx.JobSchedulingContexts); err != nil { return } else if !ok { diff --git a/internal/scheduler/gang_scheduler_test.go b/internal/scheduler/gang_scheduler_test.go index cc79703d2b2..add2f03ee7e 100644 --- a/internal/scheduler/gang_scheduler_test.go +++ b/internal/scheduler/gang_scheduler_test.go @@ -9,7 +9,6 @@ import ( "k8s.io/apimachinery/pkg/api/resource" "github.com/armadaproject/armada/internal/armada/configuration" - "github.com/armadaproject/armada/internal/common/armadacontext" armadaslices "github.com/armadaproject/armada/internal/common/slices" schedulerconstraints "github.com/armadaproject/armada/internal/scheduler/constraints" schedulercontext "github.com/armadaproject/armada/internal/scheduler/context" @@ -372,7 +371,7 @@ func TestGangScheduler(t *testing.T) { for i, gang := range tc.Gangs { jctxs := schedulercontext.JobSchedulingContextsFromJobs(testfixtures.TestPriorityClasses, gang) gctx := schedulercontext.NewGangSchedulingContext(jctxs) - ok, reason, err := sch.Schedule(armadacontext.Background(), gctx) + ok, reason, err := sch.Schedule(gctx) require.NoError(t, err) if ok { require.Empty(t, reason) diff --git a/internal/scheduler/jobdb/job_run.go b/internal/scheduler/jobdb/job_run.go index b49f5e5195b..b6b968f0339 100644 --- a/internal/scheduler/jobdb/job_run.go +++ b/internal/scheduler/jobdb/job_run.go @@ -77,7 +77,7 @@ func (run *JobRun) Id() uuid.UUID { return run.id } -// Id returns the id of the job this run is associated with. +// JobId returns the id of the job this run is associated with. func (run *JobRun) JobId() string { return run.jobId } diff --git a/internal/scheduler/jobdb/job_test.go b/internal/scheduler/jobdb/job_test.go index 2b3df48fb95..aa36eb7fea8 100644 --- a/internal/scheduler/jobdb/job_test.go +++ b/internal/scheduler/jobdb/job_test.go @@ -349,7 +349,7 @@ func TestJobSchedulingInfoFieldsInitialised(t *testing.T) { assert.NotNil(t, job.GetNodeSelector()) assert.NotNil(t, job.GetAnnotations()) - // Copy again here, as the fields get mutated so we want a clean copy + // Copy again here; the fields get mutated and we want a clean copy infoWithNilFieldsCopy2 := proto.Clone(infoWithNilFields).(*schedulerobjects.JobSchedulingInfo) updatedJob := baseJob.WithJobSchedulingInfo(infoWithNilFieldsCopy2) assert.NotNil(t, updatedJob.GetNodeSelector()) diff --git a/internal/scheduler/jobiteration.go b/internal/scheduler/jobiteration.go index 04dd63a6490..390211a0c83 100644 --- a/internal/scheduler/jobiteration.go +++ b/internal/scheduler/jobiteration.go @@ -135,14 +135,14 @@ func (repo *InMemoryJobRepository) GetExistingJobsByIds(jobIds []string) ([]inte return rv, nil } -func (repo *InMemoryJobRepository) GetJobIterator(ctx *armadacontext.Context, queue string) (JobIterator, error) { +func (repo *InMemoryJobRepository) GetJobIterator(queue string) (JobIterator, error) { repo.mu.Lock() defer repo.mu.Unlock() return NewInMemoryJobIterator(slices.Clone(repo.jobsByQueue[queue])), nil } // QueuedJobsIterator is an iterator over all jobs in a queue. -// It lazily loads jobs in batches from Redis asynch. +// It lazily loads jobs in batches from Redis async. type QueuedJobsIterator struct { ctx *armadacontext.Context err error @@ -174,7 +174,7 @@ func (it *QueuedJobsIterator) Next() (interfaces.LegacySchedulerJob, error) { return nil, it.err } - // Get one job that was loaded asynchrounsly. + // Get one job that was loaded asynchronously. select { case <-it.ctx.Done(): it.err = it.ctx.Err() // Return an error if called again. diff --git a/internal/scheduler/jobiteration_test.go b/internal/scheduler/jobiteration_test.go index a5990fa3fc4..9f4a437f020 100644 --- a/internal/scheduler/jobiteration_test.go +++ b/internal/scheduler/jobiteration_test.go @@ -321,7 +321,7 @@ func (repo *mockJobRepository) GetExistingJobsByIds(jobIds []string) ([]interfac return rv, nil } -func (repo *mockJobRepository) TryLeaseJobs(clusterId string, queue string, jobs []*api.Job) ([]*api.Job, error) { +func (repo *mockJobRepository) TryLeaseJobs(_ string, _ string, jobs []*api.Job) ([]*api.Job, error) { successfullyLeasedJobs := make([]*api.Job, 0, len(jobs)) for _, job := range jobs { if !repo.leasedJobs[job.Id] { diff --git a/internal/scheduler/kubernetesobjects/affinity/affinity_test.go b/internal/scheduler/kubernetesobjects/affinity/affinity_test.go index fe19741b1f8..c0ef4451170 100644 --- a/internal/scheduler/kubernetesobjects/affinity/affinity_test.go +++ b/internal/scheduler/kubernetesobjects/affinity/affinity_test.go @@ -62,7 +62,7 @@ func vanillaAvoidLabelAffinity(key string, val string) *v1.Affinity { } func vanillaAvoidLabelAffinites(labels []*api.StringKeyValuePair) *v1.Affinity { - mexprs := []v1.NodeSelectorRequirement{} + var mexprs []v1.NodeSelectorRequirement for _, kv := range labels { mexprs = append(mexprs, v1.NodeSelectorRequirement{ diff --git a/internal/scheduler/leader.go b/internal/scheduler/leader.go index 714cf243f52..c5e58e6c97f 100644 --- a/internal/scheduler/leader.go +++ b/internal/scheduler/leader.go @@ -85,7 +85,7 @@ func (lc *StandaloneLeaderController) ValidateToken(tok LeaderToken) bool { return false } -func (lc *StandaloneLeaderController) Run(ctx *armadacontext.Context) error { +func (lc *StandaloneLeaderController) Run(_ *armadacontext.Context) error { return nil } diff --git a/internal/scheduler/leader_client_test.go b/internal/scheduler/leader_client_test.go index 31ba46a8913..7befbb5e332 100644 --- a/internal/scheduler/leader_client_test.go +++ b/internal/scheduler/leader_client_test.go @@ -87,7 +87,7 @@ func (f *FakeLeaderController) GetToken() LeaderToken { return NewLeaderToken() } -func (f *FakeLeaderController) ValidateToken(tok LeaderToken) bool { +func (f *FakeLeaderController) ValidateToken(_ LeaderToken) bool { return f.IsCurrentlyLeader } diff --git a/internal/scheduler/leader_proxying_reports_server_test.go b/internal/scheduler/leader_proxying_reports_server_test.go index 2b83a02da28..ccbb06f0cc8 100644 --- a/internal/scheduler/leader_proxying_reports_server_test.go +++ b/internal/scheduler/leader_proxying_reports_server_test.go @@ -52,7 +52,7 @@ func TestLeaderProxyingSchedulingReportsServer_GetJobReports(t *testing.T) { ctx, cancel := armadacontext.WithTimeout(armadacontext.Background(), 5*time.Second) defer cancel() - sut, clientProvider, jobReportsServer, jobReportsClient := setupLeaderProxyingSchedulerReportsServerTest(t) + sut, clientProvider, jobReportsServer, jobReportsClient := setupLeaderProxyingSchedulerReportsServerTest() clientProvider.IsCurrentProcessLeader = tc.isCurrentProcessLeader request := &schedulerobjects.JobReportRequest{JobId: "job-1"} @@ -117,7 +117,7 @@ func TestLeaderProxyingSchedulingReportsServer_GetSchedulingReport(t *testing.T) ctx, cancel := armadacontext.WithTimeout(armadacontext.Background(), 5*time.Second) defer cancel() - sut, clientProvider, jobReportsServer, jobReportsClient := setupLeaderProxyingSchedulerReportsServerTest(t) + sut, clientProvider, jobReportsServer, jobReportsClient := setupLeaderProxyingSchedulerReportsServerTest() clientProvider.IsCurrentProcessLeader = tc.isCurrentProcessLeader request := &schedulerobjects.SchedulingReportRequest{Verbosity: int32(1)} @@ -182,7 +182,7 @@ func TestLeaderProxyingSchedulingReportsServer_GetQueueReport(t *testing.T) { ctx, cancel := armadacontext.WithTimeout(armadacontext.Background(), 5*time.Second) defer cancel() - sut, clientProvider, jobReportsServer, jobReportsClient := setupLeaderProxyingSchedulerReportsServerTest(t) + sut, clientProvider, jobReportsServer, jobReportsClient := setupLeaderProxyingSchedulerReportsServerTest() clientProvider.IsCurrentProcessLeader = tc.isCurrentProcessLeader request := &schedulerobjects.QueueReportRequest{Verbosity: int32(1), QueueName: "queue-1"} @@ -208,7 +208,7 @@ func TestLeaderProxyingSchedulingReportsServer_GetQueueReport(t *testing.T) { } } -func setupLeaderProxyingSchedulerReportsServerTest(t *testing.T) (*LeaderProxyingSchedulingReportsServer, *FakeClientProvider, *FakeSchedulerReportingServer, *FakeSchedulerReportingClient) { +func setupLeaderProxyingSchedulerReportsServerTest() (*LeaderProxyingSchedulingReportsServer, *FakeClientProvider, *FakeSchedulerReportingServer, *FakeSchedulerReportingClient) { jobReportsServer := NewFakeSchedulerReportingServer() jobReportsClient := NewFakeSchedulerReportingClient() clientProvider := NewFakeClientProvider() @@ -292,17 +292,17 @@ func NewFakeSchedulerReportingClient() *FakeSchedulerReportingClient { } } -func (f *FakeSchedulerReportingClient) GetSchedulingReport(ctx context.Context, request *schedulerobjects.SchedulingReportRequest, opts ...grpc.CallOption) (*schedulerobjects.SchedulingReport, error) { +func (f *FakeSchedulerReportingClient) GetSchedulingReport(ctx context.Context, request *schedulerobjects.SchedulingReportRequest, _ ...grpc.CallOption) (*schedulerobjects.SchedulingReport, error) { f.GetSchedulingReportCalls = append(f.GetSchedulingReportCalls, GetSchedulingReportCall{Context: ctx, Request: request}) return f.GetSchedulingReportResponse, f.Err } -func (f *FakeSchedulerReportingClient) GetQueueReport(ctx context.Context, request *schedulerobjects.QueueReportRequest, opts ...grpc.CallOption) (*schedulerobjects.QueueReport, error) { +func (f *FakeSchedulerReportingClient) GetQueueReport(ctx context.Context, request *schedulerobjects.QueueReportRequest, _ ...grpc.CallOption) (*schedulerobjects.QueueReport, error) { f.GetQueueReportCalls = append(f.GetQueueReportCalls, GetQueueReportCall{Context: ctx, Request: request}) return f.GetQueueReportResponse, f.Err } -func (f *FakeSchedulerReportingClient) GetJobReport(ctx context.Context, request *schedulerobjects.JobReportRequest, opts ...grpc.CallOption) (*schedulerobjects.JobReport, error) { +func (f *FakeSchedulerReportingClient) GetJobReport(ctx context.Context, request *schedulerobjects.JobReportRequest, _ ...grpc.CallOption) (*schedulerobjects.JobReport, error) { f.GetJobReportCalls = append(f.GetJobReportCalls, GetJobReportCall{Context: ctx, Request: request}) return f.GetJobReportResponse, f.Err } @@ -328,6 +328,6 @@ func NewFakeSchedulerReportingClientProvider() *FakeSchedulerReportingClientProv return &FakeSchedulerReportingClientProvider{} } -func (f *FakeSchedulerReportingClientProvider) GetSchedulerReportingClient(conn *grpc.ClientConn) schedulerobjects.SchedulerReportingClient { +func (f *FakeSchedulerReportingClientProvider) GetSchedulerReportingClient(_ *grpc.ClientConn) schedulerobjects.SchedulerReportingClient { return f.Client } diff --git a/internal/scheduler/leader_test.go b/internal/scheduler/leader_test.go index 17fb468b0cf..8e22d9b1a49 100644 --- a/internal/scheduler/leader_test.go +++ b/internal/scheduler/leader_test.go @@ -34,9 +34,9 @@ const ( // Test becoming leader. This test is slightly awkward as the K8s leader election code // in client-go doesn't seem to have a mechanism for manipulating the internal clock. // As a result, the test works as follows: -// * Set up a mock K8s client that updates the lease owener ever 100ms +// * Set up a mock K8s client that updates the lease owner ever 100ms // * Set up the client such that it checks for updates every 10ms -// * assert thart the state transitions are as expected +// * assert that the state transitions are as expected func TestK8sLeaderController_BecomingLeader(t *testing.T) { tests := map[string]struct { states []State // states to be returned from the server. diff --git a/internal/scheduler/nodedb/encoding.go b/internal/scheduler/nodedb/encoding.go index a5d88716e80..9f7b4fac2b3 100644 --- a/internal/scheduler/nodedb/encoding.go +++ b/internal/scheduler/nodedb/encoding.go @@ -52,7 +52,7 @@ func EncodeQuantity(out []byte, val resource.Quantity) []byte { return EncodeInt64(out, val.MilliValue()) } -// EncodeInt64 returns the canonical byte representation of a int64 used within the nodeDb. +// EncodeInt64 returns the canonical byte representation of an int64 used within the nodeDb. // The resulting []byte is such that for two int64 a and b, a.Cmp(b) = bytes.Compare(enc(a), enc(b)). // The byte representation is appended to out, which is returned. func EncodeInt64(out []byte, val int64) []byte { diff --git a/internal/scheduler/nodedb/nodedb.go b/internal/scheduler/nodedb/nodedb.go index a2351d1e75f..269f54335d2 100644 --- a/internal/scheduler/nodedb/nodedb.go +++ b/internal/scheduler/nodedb/nodedb.go @@ -3,9 +3,7 @@ package nodedb import ( "fmt" "math" - "strings" "sync" - "text/tabwriter" "time" "github.com/hashicorp/go-memdb" @@ -34,7 +32,7 @@ const ( // This helps avoid scheduling new jobs onto nodes that make it impossible to re-schedule evicted jobs. evictedPriority int32 = -1 // MinPriority is the smallest possible priority class priority within the NodeDb. - MinPriority int32 = evictedPriority + MinPriority = evictedPriority ) var empty struct{} @@ -309,7 +307,7 @@ func NewNodeDb( priorityClassPriorities := maps.Keys(allowedPriorities) slices.Sort(priorityClassPriorities) nodeDbPriorities := armadaslices.Concatenate([]int32{evictedPriority}, priorityClassPriorities) - schema, indexNameByPriority := nodeDbSchema(nodeDbPriorities, indexedResourceNames) + schema, indexNameByPriority := nodeDbSchema(nodeDbPriorities) db, err := memdb.NewMemDB(schema) if err != nil { return nil, errors.WithStack(err) @@ -388,22 +386,20 @@ func (nodeDb *NodeDb) EnableNewPreemptionStrategy() { } func (nodeDb *NodeDb) String() string { - var sb strings.Builder - w := tabwriter.NewWriter(&sb, 1, 1, 1, ' ', 0) - fmt.Fprintf(w, "Priorities:\t%v\n", configuration.AllowedPriorities(nodeDb.priorityClasses)) - fmt.Fprintf(w, "Indexed resources:\t%v\n", nodeDb.indexedResources) - fmt.Fprintf(w, "Indexed taints:\t%v\n", maps.Keys(nodeDb.indexedTaints)) - fmt.Fprintf(w, "Indexed node labels:\t%v\n", maps.Keys(nodeDb.indexedNodeLabels)) + w := util.NewTabbedStringBuilder(1, 1, 1, ' ', 0) + w.Writef("Priorities:\t%v\n", configuration.AllowedPriorities(nodeDb.priorityClasses)) + w.Writef("Indexed resources:\t%v\n", nodeDb.indexedResources) + w.Writef("Indexed taints:\t%v\n", maps.Keys(nodeDb.indexedTaints)) + w.Writef("Indexed node labels:\t%v\n", maps.Keys(nodeDb.indexedNodeLabels)) if len(nodeDb.nodeTypes) == 0 { - fmt.Fprint(w, "Node types:\tnone\n") + w.Write("Node types:\tnone\n") } else { - fmt.Fprint(w, "Node types:\n") + w.Write("Node types:\n") for _, nodeType := range nodeDb.nodeTypes { - fmt.Fprintf(w, " %d\n", nodeType.Id) + w.Writef(" %d\n", nodeType.Id) } } - w.Flush() - return sb.String() + return w.String() } // IndexedNodeLabelValues returns the set of possible values for a given indexed label across all nodes in the NodeDb. @@ -1116,8 +1112,8 @@ func (nodeDb *NodeDb) AddEvictedJobSchedulingContextWithTxn(txn *memdb.Txn, inde return nil } -func nodeDbSchema(priorities []int32, resources []string) (*memdb.DBSchema, map[int32]string) { - nodesTable, indexNameByPriority := nodesTableSchema(priorities, resources) +func nodeDbSchema(priorities []int32) (*memdb.DBSchema, map[int32]string) { + nodesTable, indexNameByPriority := nodesTableSchema(priorities) evictionsTable := evictionsTableSchema() return &memdb.DBSchema{ Tables: map[string]*memdb.TableSchema{ @@ -1127,7 +1123,7 @@ func nodeDbSchema(priorities []int32, resources []string) (*memdb.DBSchema, map[ }, indexNameByPriority } -func nodesTableSchema(priorities []int32, resources []string) (*memdb.TableSchema, map[int32]string) { +func nodesTableSchema(priorities []int32) (*memdb.TableSchema, map[int32]string) { indexes := make(map[string]*memdb.IndexSchema, len(priorities)+1) indexes["id"] = &memdb.IndexSchema{ Name: "id", diff --git a/internal/scheduler/nodedb/nodedb_test.go b/internal/scheduler/nodedb/nodedb_test.go index 50f7f9c5a9c..d3e9b75c433 100644 --- a/internal/scheduler/nodedb/nodedb_test.go +++ b/internal/scheduler/nodedb/nodedb_test.go @@ -20,7 +20,7 @@ import ( ) func TestNodeDbSchema(t *testing.T) { - schema, _ := nodeDbSchema(testfixtures.TestPriorities, testfixtures.TestResourceNames) + schema, _ := nodeDbSchema(testfixtures.TestPriorities) assert.NoError(t, schema.Validate()) } diff --git a/internal/scheduler/nodedb/nodeiteration.go b/internal/scheduler/nodedb/nodeiteration.go index fb2715c6676..3a9826ef48b 100644 --- a/internal/scheduler/nodedb/nodeiteration.go +++ b/internal/scheduler/nodedb/nodeiteration.go @@ -242,10 +242,10 @@ func (pq *nodeTypesIteratorPQ) Less(i, j int) bool { return pq.less(pq.items[i].node, pq.items[j].node) } -func (it *nodeTypesIteratorPQ) less(a, b *Node) bool { - allocatableByPriorityA := a.AllocatableByPriority[it.priority] - allocatableByPriorityB := b.AllocatableByPriority[it.priority] - for _, t := range it.indexedResources { +func (pq *nodeTypesIteratorPQ) less(a, b *Node) bool { + allocatableByPriorityA := a.AllocatableByPriority[pq.priority] + allocatableByPriorityB := b.AllocatableByPriority[pq.priority] + for _, t := range pq.indexedResources { qa := allocatableByPriorityA.Get(t) qb := allocatableByPriorityB.Get(t) if cmp := qa.Cmp(qb); cmp == -1 { @@ -304,7 +304,7 @@ type NodeTypeIterator struct { // Updated in-place as the iterator makes progress. lowerBound []resource.Quantity // memdb key computed from nodeTypeId and lowerBound. - // Stored here to avoid dynamic allocs. + // Stored here to avoid dynamic allocations. key []byte // Current iterator into the underlying memdb. // Updated in-place whenever lowerBound changes. diff --git a/internal/scheduler/nodedb/unschedulable.go b/internal/scheduler/nodedb/unschedulable.go index 7de03733310..51b35b2ffee 100644 --- a/internal/scheduler/nodedb/unschedulable.go +++ b/internal/scheduler/nodedb/unschedulable.go @@ -3,9 +3,9 @@ package nodedb import v1 "k8s.io/api/core/v1" const ( - unschedulableTaintKey string = "armadaproject.io/unschedulable" - unschedulableTaintValue string = "true" - unschedulableTaintEffect v1.TaintEffect = v1.TaintEffectNoSchedule + unschedulableTaintKey string = "armadaproject.io/unschedulable" + unschedulableTaintValue string = "true" + unschedulableTaintEffect = v1.TaintEffectNoSchedule ) // UnschedulableTaint returns the taint automatically added to unschedulable nodes on inserting into the nodeDb. @@ -16,11 +16,3 @@ func UnschedulableTaint() v1.Taint { Effect: unschedulableTaintEffect, } } - -// UnschedulableToleration returns a toleration that tolerates UnschedulableTaint(). -func UnschedulableToleration() v1.Toleration { - return v1.Toleration{ - Key: unschedulableTaintKey, - Value: unschedulableTaintValue, - } -} diff --git a/internal/scheduler/preempting_queue_scheduler.go b/internal/scheduler/preempting_queue_scheduler.go index adfbe4d86b5..a397619977d 100644 --- a/internal/scheduler/preempting_queue_scheduler.go +++ b/internal/scheduler/preempting_queue_scheduler.go @@ -322,7 +322,7 @@ func (sch *PreemptingQueueScheduler) evict(ctx *armadacontext.Context, evictor * if err := sch.nodeDb.Reset(); err != nil { return nil, nil, err } - if err := addEvictedJobsToNodeDb(ctx, sch.schedulingContext, sch.nodeDb, inMemoryJobRepo); err != nil { + if err := addEvictedJobsToNodeDb(sch.schedulingContext, sch.nodeDb, inMemoryJobRepo); err != nil { return nil, nil, err } } @@ -495,10 +495,10 @@ func (q MinimalQueue) GetWeight() float64 { // addEvictedJobsToNodeDb adds evicted jobs to the NodeDb. // Needed to enable the nodeDb accounting for these when preempting. -func addEvictedJobsToNodeDb(ctx *armadacontext.Context, sctx *schedulercontext.SchedulingContext, nodeDb *nodedb.NodeDb, inMemoryJobRepo *InMemoryJobRepository) error { +func addEvictedJobsToNodeDb(sctx *schedulercontext.SchedulingContext, nodeDb *nodedb.NodeDb, inMemoryJobRepo *InMemoryJobRepository) error { gangItByQueue := make(map[string]*QueuedGangIterator) for _, qctx := range sctx.QueueSchedulingContexts { - jobIt, err := inMemoryJobRepo.GetJobIterator(ctx, qctx.Queue) + jobIt, err := inMemoryJobRepo.GetJobIterator(qctx.Queue) if err != nil { return err } @@ -538,7 +538,7 @@ func addEvictedJobsToNodeDb(ctx *armadacontext.Context, sctx *schedulercontext.S func (sch *PreemptingQueueScheduler) schedule(ctx *armadacontext.Context, inMemoryJobRepo *InMemoryJobRepository, jobRepo JobRepository) (*SchedulerResult, error) { jobIteratorByQueue := make(map[string]JobIterator) for _, qctx := range sch.schedulingContext.QueueSchedulingContexts { - evictedIt, err := inMemoryJobRepo.GetJobIterator(ctx, qctx.Queue) + evictedIt, err := inMemoryJobRepo.GetJobIterator(qctx.Queue) if err != nil { return nil, err } @@ -881,7 +881,7 @@ func (evi *Evictor) Evict(ctx *armadacontext.Context, it nodedb.NodeIterator) (* // TODO: This is only necessary for jobs not scheduled in this cycle. // Since jobs scheduled in this cycle can be re-scheduled onto another node without triggering a preemption. func defaultPostEvictFunc(ctx *armadacontext.Context, job interfaces.LegacySchedulerJob, node *nodedb.Node) { - // Add annotation indicating to the scheduler this this job was evicted. + // Add annotation indicating to the scheduler this job was evicted. annotations := job.GetAnnotations() if annotations == nil { ctx.Errorf("error evicting job %s: annotations not initialised", job.GetId()) diff --git a/internal/scheduler/proxying_reports_server_test.go b/internal/scheduler/proxying_reports_server_test.go index 98f7c11fa97..2048a9e4f1a 100644 --- a/internal/scheduler/proxying_reports_server_test.go +++ b/internal/scheduler/proxying_reports_server_test.go @@ -27,7 +27,7 @@ func TestProxyingSchedulingReportsServer_GetJobReports(t *testing.T) { ctx, cancel := armadacontext.WithTimeout(armadacontext.Background(), 5*time.Second) defer cancel() - sut, jobReportsClient := setupProxyingSchedulerReportsServerTest(t) + sut, jobReportsClient := setupProxyingSchedulerReportsServerTest() request := &schedulerobjects.JobReportRequest{JobId: "job-1"} @@ -65,7 +65,7 @@ func TestProxyingSchedulingReportsServer_GetSchedulingReport(t *testing.T) { ctx, cancel := armadacontext.WithTimeout(armadacontext.Background(), 5*time.Second) defer cancel() - sut, jobReportsClient := setupProxyingSchedulerReportsServerTest(t) + sut, jobReportsClient := setupProxyingSchedulerReportsServerTest() request := &schedulerobjects.SchedulingReportRequest{Verbosity: int32(1)} @@ -103,7 +103,7 @@ func TestProxyingSchedulingReportsServer_GetQueueReport(t *testing.T) { ctx, cancel := armadacontext.WithTimeout(armadacontext.Background(), 5*time.Second) defer cancel() - sut, jobReportsClient := setupProxyingSchedulerReportsServerTest(t) + sut, jobReportsClient := setupProxyingSchedulerReportsServerTest() request := &schedulerobjects.QueueReportRequest{Verbosity: int32(1), QueueName: "queue-1"} @@ -125,7 +125,7 @@ func TestProxyingSchedulingReportsServer_GetQueueReport(t *testing.T) { } } -func setupProxyingSchedulerReportsServerTest(t *testing.T) (*ProxyingSchedulingReportsServer, *FakeSchedulerReportingClient) { +func setupProxyingSchedulerReportsServerTest() (*ProxyingSchedulingReportsServer, *FakeSchedulerReportingClient) { schedulerReportsClient := NewFakeSchedulerReportingClient() sut := NewProxyingSchedulingReportsServer(schedulerReportsClient) return sut, schedulerReportsClient diff --git a/internal/scheduler/queue_scheduler.go b/internal/scheduler/queue_scheduler.go index cf03c7af3fc..a1957b0739f 100644 --- a/internal/scheduler/queue_scheduler.go +++ b/internal/scheduler/queue_scheduler.go @@ -88,7 +88,7 @@ func (sch *QueueScheduler) Schedule(ctx *armadacontext.Context) (*SchedulerResul return nil, err default: } - if ok, unschedulableReason, err := sch.gangScheduler.Schedule(ctx, gctx); err != nil { + if ok, unschedulableReason, err := sch.gangScheduler.Schedule(gctx); err != nil { return nil, err } else if ok { for _, jctx := range gctx.JobSchedulingContexts { @@ -260,7 +260,7 @@ func (it *QueuedGangIterator) hitLookbackLimit() bool { // where the fraction of fair share computation includes the yielded gang. type CandidateGangIterator struct { queueRepository fairness.QueueRepository - fairnessCostProvider fairness.FairnessCostProvider + fairnessCostProvider fairness.CostProvider // If true, this iterator only yields gangs where all jobs are evicted. onlyYieldEvicted bool // If, e.g., onlyYieldEvictedByQueue["A"] is true, @@ -275,7 +275,7 @@ type CandidateGangIterator struct { func NewCandidateGangIterator( queueRepository fairness.QueueRepository, - fairnessCostProvider fairness.FairnessCostProvider, + fairnessCostProvider fairness.CostProvider, iteratorsByQueue map[string]*QueuedGangIterator, ) (*CandidateGangIterator, error) { it := &CandidateGangIterator{ @@ -397,7 +397,7 @@ func (it *CandidateGangIterator) queueCostWithGctx(gctx *schedulercontext.GangSc return it.fairnessCostProvider.CostFromAllocationAndWeight(it.buffer, queue.GetWeight()), nil } -// Priority queue used by CandidateGangIterator to determine from which queue to schedule the next job. +// QueueCandidateGangIteratorPQ is a Priority queue used by CandidateGangIterator to determine from which queue to schedule the next job. type QueueCandidateGangIteratorPQ []*QueueCandidateGangIteratorItem type QueueCandidateGangIteratorItem struct { diff --git a/internal/scheduler/queue_scheduler_test.go b/internal/scheduler/queue_scheduler_test.go index 3832db7ceba..44fd8899177 100644 --- a/internal/scheduler/queue_scheduler_test.go +++ b/internal/scheduler/queue_scheduler_test.go @@ -512,7 +512,7 @@ func TestQueueScheduler(t *testing.T) { ) jobIteratorByQueue := make(map[string]JobIterator) for queue := range tc.PriorityFactorByQueue { - it, err := jobRepo.GetJobIterator(armadacontext.Background(), queue) + it, err := jobRepo.GetJobIterator(queue) require.NoError(t, err) jobIteratorByQueue[queue] = it } diff --git a/internal/scheduler/reports.go b/internal/scheduler/reports.go index aefef6eb884..01b578f0733 100644 --- a/internal/scheduler/reports.go +++ b/internal/scheduler/reports.go @@ -6,7 +6,10 @@ import ( "strings" "sync" "sync/atomic" - "text/tabwriter" + + "strings" + "sync" + "sync/atomic" lru "github.com/hashicorp/golang-lru" "github.com/oklog/ulid" @@ -291,83 +294,77 @@ func (repo *SchedulingContextRepository) getSchedulingReportString(verbosity int mostRecentByExecutor := repo.GetMostRecentSchedulingContextByExecutor() mostRecentSuccessfulByExecutor := repo.GetMostRecentSuccessfulSchedulingContextByExecutor() mostRecentPreemptingByExecutor := repo.GetMostRecentPreemptingSchedulingContextByExecutor() - var sb strings.Builder - w := tabwriter.NewWriter(&sb, 1, 1, 1, ' ', 0) + w := util.NewTabbedStringBuilder(1, 1, 1, ' ', 0) for _, executorId := range repo.GetSortedExecutorIds() { - fmt.Fprintf(w, "%s:\n", executorId) + w.Writef("%s:\n", executorId) if sctx := mostRecentByExecutor[executorId]; sctx != nil { - fmt.Fprint(w, indent.String("\t", "Most recent scheduling round:\n")) - fmt.Fprint(w, indent.String("\t\t", sctx.ReportString(verbosity))) + w.Write(indent.String("\t", "Most recent scheduling round:\n")) + w.Write(indent.String("\t\t", sctx.ReportString(verbosity))) } else { - fmt.Fprint(w, indent.String("\t", "Most recent scheduling round: none\n")) + w.Write(indent.String("\t", "Most recent scheduling round: none\n")) } if sctx := mostRecentSuccessfulByExecutor[executorId]; sctx != nil { - fmt.Fprint(w, indent.String("\t", "Most recent scheduling round that scheduled a job:\n")) - fmt.Fprint(w, indent.String("\t\t", sctx.ReportString(verbosity))) + w.Write(indent.String("\t", "Most recent scheduling round that scheduled a job:\n")) + w.Write(indent.String("\t\t", sctx.ReportString(verbosity))) } else { - fmt.Fprint(w, indent.String("\t", "Most recent scheduling round that scheduled a job: none\n")) + w.Write(indent.String("\t", "Most recent scheduling round that scheduled a job: none\n")) } if sctx := mostRecentPreemptingByExecutor[executorId]; sctx != nil { - fmt.Fprint(w, indent.String("\t", "Most recent scheduling round that preempted a job:\n")) - fmt.Fprint(w, indent.String("\t\t", sctx.ReportString(verbosity))) + w.Write(indent.String("\t", "Most recent scheduling round that preempted a job:\n")) + w.Write(indent.String("\t\t", sctx.ReportString(verbosity))) } else { - fmt.Fprint(w, indent.String("\t", "Most recent scheduling round that preempted a job: none\n")) + w.Write(indent.String("\t", "Most recent scheduling round that preempted a job: none\n")) } } - w.Flush() - return sb.String() + return w.String() } func (repo *SchedulingContextRepository) getSchedulingReportStringForQueue(queue string, verbosity int32) string { mostRecentByExecutor, _ := repo.GetMostRecentSchedulingContextByExecutorForQueue(queue) mostRecentSuccessfulByExecutor, _ := repo.GetMostRecentSuccessfulSchedulingContextByExecutorForQueue(queue) mostRecentPreemptingByExecutor, _ := repo.GetMostRecentPreemptingSchedulingContextByExecutorForQueue(queue) - var sb strings.Builder - w := tabwriter.NewWriter(&sb, 1, 1, 1, ' ', 0) + w := util.NewTabbedStringBuilder(1, 1, 1, ' ', 0) for _, executorId := range repo.GetSortedExecutorIds() { - fmt.Fprintf(w, "%s:\n", executorId) + w.Writef("%s:\n", executorId) if sctx := mostRecentByExecutor[executorId]; sctx != nil { - fmt.Fprintf(w, "\tMost recent scheduling round that considered queue %s:\n", queue) + w.Writef("\tMost recent scheduling round that considered queue %s:\n", queue) sr := getSchedulingReportForQueue(sctx, queue) - fmt.Fprint(w, indent.String("\t\t", sr.ReportString(verbosity))) + w.Write(indent.String("\t\t", sr.ReportString(verbosity))) } else { - fmt.Fprintf(w, "\tMost recent scheduling round that considered queue %s: none\n", queue) + w.Writef("\tMost recent scheduling round that considered queue %s: none\n", queue) } if sctx := mostRecentSuccessfulByExecutor[executorId]; sctx != nil { - fmt.Fprintf(w, "\tMost recent scheduling round that scheduled a job from queue %s:\n", queue) + w.Writef("\tMost recent scheduling round that scheduled a job from queue %s:\n", queue) sr := getSchedulingReportForQueue(sctx, queue) - fmt.Fprint(w, indent.String("\t\t", sr.ReportString(verbosity))) + w.Write(indent.String("\t\t", sr.ReportString(verbosity))) } else { - fmt.Fprintf(w, "\tMost recent scheduling round that scheduled a job from queue %s: none\n", queue) + w.Writef("\tMost recent scheduling round that scheduled a job from queue %s: none\n", queue) } if sctx := mostRecentPreemptingByExecutor[executorId]; sctx != nil { - fmt.Fprintf(w, "\tMost recent scheduling round that preempted a job from queue %s:\n", queue) + w.Writef("\tMost recent scheduling round that preempted a job from queue %s:\n", queue) sr := getSchedulingReportForQueue(sctx, queue) - fmt.Fprint(w, indent.String("\t\t", sr.ReportString(verbosity))) + w.Write(indent.String("\t\t", sr.ReportString(verbosity))) } else { - fmt.Fprintf(w, "\tMost recent scheduling round that preempted a job from queue %s: none\n", queue) + w.Writef("\tMost recent scheduling round that preempted a job from queue %s: none\n", queue) } } - w.Flush() - return sb.String() + return w.String() } func (repo *SchedulingContextRepository) getSchedulingReportStringForJob(jobId string, verbosity int32) string { mostRecentByExecutor, _ := repo.GetMostRecentSchedulingContextByExecutorForJob(jobId) - var sb strings.Builder - w := tabwriter.NewWriter(&sb, 1, 1, 1, ' ', 0) + w := util.NewTabbedStringBuilder(1, 1, 1, ' ', 0) for _, executorId := range repo.GetSortedExecutorIds() { - fmt.Fprintf(w, "%s:\n", executorId) + w.Writef("%s:\n", executorId) if sctx := mostRecentByExecutor[executorId]; sctx != nil { - fmt.Fprintf(w, "\tMost recent scheduling round that affected job %s:\n", jobId) + w.Writef("\tMost recent scheduling round that affected job %s:\n", jobId) sr := getSchedulingReportForJob(sctx, jobId) - fmt.Fprint(w, indent.String("\t\t", sr.ReportString(verbosity))) + w.Write(indent.String("\t\t", sr.ReportString(verbosity))) } else { - fmt.Fprintf(w, "\tMost recent scheduling round that affected job %s: none\n", jobId) + w.Writef("\tMost recent scheduling round that affected job %s: none\n", jobId) } } - w.Flush() - return sb.String() + return w.String() } type schedulingReport struct { @@ -377,22 +374,20 @@ type schedulingReport struct { } func (sr schedulingReport) ReportString(verbosity int32) string { - var sb strings.Builder - w := tabwriter.NewWriter(&sb, 1, 1, 1, ' ', 0) + w := util.NewTabbedStringBuilder(1, 1, 1, ' ', 0) if sctx := sr.schedulingContext; sctx != nil { - fmt.Fprint(w, "Overall scheduling report:\n") - fmt.Fprint(w, indent.String("\t", sctx.ReportString(verbosity))) + w.Write("Overall scheduling report:\n") + w.Write(indent.String("\t", sctx.ReportString(verbosity))) } if qctx := sr.queueSchedulingContext; qctx != nil { - fmt.Fprintf(w, "Scheduling report for queue %s:\n", qctx.Queue) - fmt.Fprint(w, indent.String("\t", qctx.ReportString(verbosity))) + w.Writef("Scheduling report for queue %s:\n", qctx.Queue) + w.Write(indent.String("\t", qctx.ReportString(verbosity))) } if jctx := sr.jobSchedulingContext; jctx != nil { - fmt.Fprintf(w, "Scheduling report for job %s:\n", jctx.JobId) - fmt.Fprint(w, indent.String("\t", jctx.String())) + w.Writef("Scheduling report for job %s:\n", jctx.JobId) + w.Write(indent.String("\t", jctx.String())) } - w.Flush() - return sb.String() + return w.String() } func getSchedulingReportForQueue(sctx *schedulercontext.SchedulingContext, queue string) (sr schedulingReport) { @@ -442,31 +437,29 @@ func (repo *SchedulingContextRepository) getQueueReportString(queue string, verb mostRecentByExecutor, _ := repo.GetMostRecentSchedulingContextByExecutorForQueue(queue) mostRecentSuccessfulByExecutor, _ := repo.GetMostRecentSuccessfulSchedulingContextByExecutorForQueue(queue) mostRecentPreemptingByExecutor, _ := repo.GetMostRecentPreemptingSchedulingContextByExecutorForQueue(queue) - var sb strings.Builder - w := tabwriter.NewWriter(&sb, 1, 1, 1, ' ', 0) + w := util.NewTabbedStringBuilder(1, 1, 1, ' ', 0) for _, executorId := range repo.GetSortedExecutorIds() { - fmt.Fprintf(w, "%s:\n", executorId) + w.Writef("%s:\n", executorId) if sr := getSchedulingReportForQueue(mostRecentByExecutor[executorId], queue); sr.queueSchedulingContext != nil { - fmt.Fprintf(w, "\tMost recent scheduling round that considered queue %s:\n", queue) - fmt.Fprint(w, indent.String("\t\t", sr.queueSchedulingContext.ReportString(verbosity))) + w.Writef("\tMost recent scheduling round that considered queue %s:\n", queue) + w.Write(indent.String("\t\t", sr.queueSchedulingContext.ReportString(verbosity))) } else { - fmt.Fprintf(w, "\tMost recent scheduling round that considered queue %s: none\n", queue) + w.Writef("\tMost recent scheduling round that considered queue %s: none\n", queue) } if sr := getSchedulingReportForQueue(mostRecentSuccessfulByExecutor[executorId], queue); sr.queueSchedulingContext != nil { - fmt.Fprintf(w, "\tMost recent scheduling round that scheduled a job from queue %s:\n", queue) - fmt.Fprint(w, indent.String("\t\t", sr.queueSchedulingContext.ReportString(verbosity))) + w.Writef("\tMost recent scheduling round that scheduled a job from queue %s:\n", queue) + w.Write(indent.String("\t\t", sr.queueSchedulingContext.ReportString(verbosity))) } else { - fmt.Fprintf(w, "\tMost recent scheduling round that scheduled a job from queue %s: none\n", queue) + w.Writef("\tMost recent scheduling round that scheduled a job from queue %s: none\n", queue) } if sr := getSchedulingReportForQueue(mostRecentPreemptingByExecutor[executorId], queue); sr.queueSchedulingContext != nil { - fmt.Fprintf(w, "\tMost recent scheduling round that preempted a job from queue %s:\n", queue) - fmt.Fprint(w, indent.String("\t\t", sr.queueSchedulingContext.ReportString(verbosity))) + w.Writef("\tMost recent scheduling round that preempted a job from queue %s:\n", queue) + w.Write(indent.String("\t\t", sr.queueSchedulingContext.ReportString(verbosity))) } else { - fmt.Fprintf(w, "\tMost recent scheduling round that preempted a job from queue %s: none\n", queue) + w.Writef("\tMost recent scheduling round that preempted a job from queue %s: none\n", queue) } } - w.Flush() - return sb.String() + return w.String() } // GetJobReport is a gRPC endpoint for querying job reports. @@ -487,18 +480,16 @@ func (repo *SchedulingContextRepository) GetJobReport(_ context.Context, request func (repo *SchedulingContextRepository) getJobReportString(jobId string) string { byExecutor, _ := repo.GetMostRecentSchedulingContextByExecutorForJob(jobId) - var sb strings.Builder - w := tabwriter.NewWriter(&sb, 1, 1, 1, ' ', 0) + w := util.NewTabbedStringBuilder(1, 1, 1, ' ', 0) for _, executorId := range repo.GetSortedExecutorIds() { if sr := getSchedulingReportForJob(byExecutor[executorId], jobId); sr.jobSchedulingContext != nil { - fmt.Fprintf(w, "%s:\n", executorId) - fmt.Fprint(w, indent.String("\t", sr.jobSchedulingContext.String())) + w.Writef("%s:\n", executorId) + w.Write(indent.String("\t", sr.jobSchedulingContext.String())) } else { - fmt.Fprintf(w, "%s: no recent scheduling round that affected job %s\n", executorId, jobId) + w.Writef("%s: no recent scheduling round that affected job %s\n", executorId, jobId) } } - w.Flush() - return sb.String() + return w.String() } func (repo *SchedulingContextRepository) GetMostRecentSchedulingContextByExecutor() SchedulingContextByExecutor { @@ -543,15 +534,13 @@ func (repo *SchedulingContextRepository) GetSortedExecutorIds() []string { } func (m SchedulingContextByExecutor) String() string { - var sb strings.Builder - w := tabwriter.NewWriter(&sb, 1, 1, 1, ' ', 0) + w := util.NewTabbedStringBuilder(1, 1, 1, ' ', 0) executorIds := maps.Keys(m) slices.Sort(executorIds) for _, executorId := range executorIds { sctx := m[executorId] - fmt.Fprintf(w, "%s:\n", executorId) - fmt.Fprint(w, indent.String("\t", sctx.String())) + w.Writef("%s:\n", executorId) + w.Write(indent.String("\t", sctx.String())) } - w.Flush() - return sb.String() + return w.String() } diff --git a/internal/scheduler/scheduler.go b/internal/scheduler/scheduler.go index 4084ae726a8..0a92de6606c 100644 --- a/internal/scheduler/scheduler.go +++ b/internal/scheduler/scheduler.go @@ -58,7 +58,7 @@ type Scheduler struct { // The label used when setting node anti affinities. nodeIdLabel string // If an executor fails to report in for this amount of time, - // all jobs assigne to that executor are cancelled. + // all jobs assigned to that executor are cancelled. executorTimeout time.Duration // The time the previous scheduling round ended previousSchedulingRoundEnd time.Time @@ -856,7 +856,7 @@ func (s *Scheduler) ensureDbUpToDate(ctx *armadacontext.Context, pollInterval ti ctx.Infof("Successfully ensured that database state is up to date") return nil } - ctx.Infof("Recevied %d partitions, still waiting on %d", numReceived, numSent-numReceived) + ctx.Infof("Received %d partitions, still waiting on %d", numReceived, numSent-numReceived) s.clock.Sleep(pollInterval) } } diff --git a/internal/scheduler/scheduler_metrics.go b/internal/scheduler/scheduler_metrics.go index 3ba197ebeba..cbc903b213b 100644 --- a/internal/scheduler/scheduler_metrics.go +++ b/internal/scheduler/scheduler_metrics.go @@ -209,7 +209,7 @@ func observeJobAggregates(ctx *armadacontext.Context, metric prometheus.CounterV if err != nil { // A metric failure isn't reason to kill the programme. - ctx.Errorf("error reteriving considered jobs observer for queue %s, priorityClass %s", queue, priorityClassName) + ctx.Errorf("error retrieving considered jobs observer for queue %s, priorityClass %s", queue, priorityClassName) } else { observer.Add(float64(count)) } @@ -224,7 +224,7 @@ func (metrics *SchedulerMetrics) reportNumberOfJobsConsidered(ctx *armadacontext observer, err := metrics.consideredJobs.GetMetricWithLabelValues(queue, pool) if err != nil { - ctx.Errorf("error reteriving considered jobs observer for queue %s, pool %s", queue, pool) + ctx.Errorf("error retrieving considered jobs observer for queue %s, pool %s", queue, pool) } else { observer.Add(float64(count)) } @@ -252,7 +252,7 @@ func (metrics *SchedulerMetrics) reportQueueShares(ctx *armadacontext.Context, s observer, err = metrics.actualSharePerQueue.GetMetricWithLabelValues(queue, pool) if err != nil { - ctx.Errorf("error reteriving considered jobs observer for queue %s, pool %s", queue, pool) + ctx.Errorf("error retrieving considered jobs observer for queue %s, pool %s", queue, pool) } else { observer.Set(actualShare) } diff --git a/internal/scheduler/scheduler_test.go b/internal/scheduler/scheduler_test.go index 584f4552d42..53514a995b6 100644 --- a/internal/scheduler/scheduler_test.go +++ b/internal/scheduler/scheduler_test.go @@ -188,7 +188,7 @@ func TestScheduler_TestCycle(t *testing.T) { expectedJobCancelled []string // ids of jobs we expect to have produced cancelled messages expectedJobReprioritised []string // ids of jobs we expect to have produced reprioritised messages expectedQueued []string // ids of jobs we expect to have produced requeued messages - expectedJobSucceeded []string // ids of jobs we expect to have produced succeeeded messages + expectedJobSucceeded []string // ids of jobs we expect to have produced succeeded messages expectedLeased []string // ids of jobs we expected to be leased in jobdb at the end of the cycle expectedRequeued []string // ids of jobs we expected to be requeued in jobdb at the end of the cycle expectedTerminal []string // ids of jobs we expected to be terminal in jobdb at the end of the cycle @@ -943,31 +943,31 @@ type testJobRepository struct { numReceivedPartitions uint32 } -func (t *testJobRepository) FindInactiveRuns(ctx *armadacontext.Context, runIds []uuid.UUID) ([]uuid.UUID, error) { +func (t *testJobRepository) FindInactiveRuns(_ *armadacontext.Context, _ []uuid.UUID) ([]uuid.UUID, error) { // TODO implement me panic("implement me") } -func (t *testJobRepository) FetchJobRunLeases(ctx *armadacontext.Context, executor string, maxResults uint, excludedRunIds []uuid.UUID) ([]*database.JobRunLease, error) { +func (t *testJobRepository) FetchJobRunLeases(_ *armadacontext.Context, _ string, _ uint, _ []uuid.UUID) ([]*database.JobRunLease, error) { // TODO implement me panic("implement me") } -func (t *testJobRepository) FetchJobUpdates(ctx *armadacontext.Context, jobSerial int64, jobRunSerial int64) ([]database.Job, []database.Run, error) { +func (t *testJobRepository) FetchJobUpdates(_ *armadacontext.Context, _ int64, _ int64) ([]database.Job, []database.Run, error) { if t.shouldError { - return nil, nil, errors.New("error fetchiung job updates") + return nil, nil, errors.New("error fetching job updates") } return t.updatedJobs, t.updatedRuns, nil } -func (t *testJobRepository) FetchJobRunErrors(ctx *armadacontext.Context, runIds []uuid.UUID) (map[uuid.UUID]*armadaevents.Error, error) { +func (t *testJobRepository) FetchJobRunErrors(_ *armadacontext.Context, _ []uuid.UUID) (map[uuid.UUID]*armadaevents.Error, error) { if t.shouldError { return nil, errors.New("error fetching job run errors") } return t.errors, nil } -func (t *testJobRepository) CountReceivedPartitions(ctx *armadacontext.Context, groupId uuid.UUID) (uint32, error) { +func (t *testJobRepository) CountReceivedPartitions(_ *armadacontext.Context, _ uuid.UUID) (uint32, error) { if t.shouldError { return 0, errors.New("error counting received partitions") } @@ -979,18 +979,18 @@ type testExecutorRepository struct { shouldError bool } -func (t testExecutorRepository) GetExecutors(ctx *armadacontext.Context) ([]*schedulerobjects.Executor, error) { +func (t testExecutorRepository) GetExecutors(_ *armadacontext.Context) ([]*schedulerobjects.Executor, error) { panic("not implemented") } -func (t testExecutorRepository) GetLastUpdateTimes(ctx *armadacontext.Context) (map[string]time.Time, error) { +func (t testExecutorRepository) GetLastUpdateTimes(_ *armadacontext.Context) (map[string]time.Time, error) { if t.shouldError { return nil, errors.New("error getting last update time") } return t.updateTimes, nil } -func (t testExecutorRepository) StoreExecutor(ctx *armadacontext.Context, executor *schedulerobjects.Executor) error { +func (t testExecutorRepository) StoreExecutor(_ *armadacontext.Context, _ *schedulerobjects.Executor) error { panic("not implemented") } @@ -1001,7 +1001,7 @@ type testSchedulingAlgo struct { shouldError bool } -func (t *testSchedulingAlgo) Schedule(ctx *armadacontext.Context, txn *jobdb.Txn, jobDb *jobdb.JobDb) (*SchedulerResult, error) { +func (t *testSchedulingAlgo) Schedule(_ *armadacontext.Context, txn *jobdb.Txn, jobDb *jobdb.JobDb) (*SchedulerResult, error) { t.numberOfScheduleCalls++ if t.shouldError { return nil, errors.New("error scheduling jobs") @@ -1049,7 +1049,7 @@ type testPublisher struct { shouldError bool } -func (t *testPublisher) PublishMessages(ctx *armadacontext.Context, events []*armadaevents.EventSequence, _ func() bool) error { +func (t *testPublisher) PublishMessages(_ *armadacontext.Context, events []*armadaevents.EventSequence, _ func() bool) error { t.events = events if t.shouldError { return errors.New("Error when publishing") @@ -1061,7 +1061,7 @@ func (t *testPublisher) Reset() { t.events = nil } -func (t *testPublisher) PublishMarkers(ctx *armadacontext.Context, groupId uuid.UUID) (uint32, error) { +func (t *testPublisher) PublishMarkers(_ *armadacontext.Context, _ uuid.UUID) (uint32, error) { return 100, nil } diff --git a/internal/scheduler/schedulerobjects/executor.go b/internal/scheduler/schedulerobjects/executor.go index 30cf94bbfc4..95c41f9f592 100644 --- a/internal/scheduler/schedulerobjects/executor.go +++ b/internal/scheduler/schedulerobjects/executor.go @@ -7,7 +7,7 @@ import ( func (m *Executor) AllRuns() ([]uuid.UUID, error) { runIds := make([]uuid.UUID, 0) - // add all runids from nodes + // add all runIds from nodes for _, node := range m.Nodes { for runIdStr := range node.StateByJobRunId { runId, err := uuid.Parse(runIdStr) @@ -17,7 +17,7 @@ func (m *Executor) AllRuns() ([]uuid.UUID, error) { runIds = append(runIds, runId) } } - // add all unassigned runids + // add all unassigned runIds for _, runIdStr := range m.UnassignedJobRuns { runId, err := uuid.Parse(runIdStr) if err != nil { diff --git a/internal/scheduler/schedulerobjects/nodematching.go b/internal/scheduler/schedulerobjects/nodematching.go index 6db3402526a..45cefafa8f2 100644 --- a/internal/scheduler/schedulerobjects/nodematching.go +++ b/internal/scheduler/schedulerobjects/nodematching.go @@ -22,7 +22,7 @@ const ( type PodRequirementsNotMetReason interface { fmt.Stringer - // Returns a 64-bit hash of this reason. + // Sum64 returns a 64-bit hash of this reason. Sum64() uint64 } @@ -84,7 +84,7 @@ func (r *UnmatchedNodeSelector) Sum64() uint64 { return h } -func (err *UnmatchedNodeSelector) String() string { +func (r *UnmatchedNodeSelector) String() string { return PodRequirementsNotMetReasonUnmatchedNodeSelector } @@ -102,9 +102,9 @@ func (r *InsufficientResources) Sum64() uint64 { return h } -func (err *InsufficientResources) String() string { - return "pod requires " + err.Required.String() + " " + err.Resource + ", but only " + - err.Available.String() + " is available" +func (r *InsufficientResources) String() string { + return "pod requires " + r.Required.String() + " " + r.Resource + ", but only " + + r.Available.String() + " is available" } // PodRequirementsMet determines whether a pod can be scheduled on nodes of this NodeType. @@ -150,11 +150,11 @@ func StaticPodRequirementsMet(taints []v1.Taint, labels map[string]string, total return matches, reason, err } - for resource, required := range req.ResourceRequirements.Requests { - available := totalResources.Get(string(resource)) + for resourceType, required := range req.ResourceRequirements.Requests { + available := totalResources.Get(string(resourceType)) if required.Cmp(available) == 1 { return false, &InsufficientResources{ - Resource: string(resource), + Resource: string(resourceType), Required: required, Available: available, }, nil @@ -236,11 +236,11 @@ func podNodeAffinityRequirementsMet(nodeLabels map[string]string, req *PodRequir } func podResourceRequirementsMet(allocatableResources ResourceList, req *PodRequirements) (bool, PodRequirementsNotMetReason, error) { - for resource, required := range req.ResourceRequirements.Requests { - available := allocatableResources.Get(string(resource)) + for resourceType, required := range req.ResourceRequirements.Requests { + available := allocatableResources.Get(string(resourceType)) if required.Cmp(available) == 1 { return false, &InsufficientResources{ - Resource: string(resource), + Resource: string(resourceType), Required: required, Available: available, }, nil diff --git a/internal/scheduler/schedulerobjects/nodetype.go b/internal/scheduler/schedulerobjects/nodetype.go index d52d35e36fb..030bf649f08 100644 --- a/internal/scheduler/schedulerobjects/nodetype.go +++ b/internal/scheduler/schedulerobjects/nodetype.go @@ -62,10 +62,10 @@ func NewNodeType(taints []v1.Taint, labels map[string]string, indexedTaints map[ } } -// nodeTypeIdFromTaintsAndLabels generates a id unique for each combination of taints, labels, and unset labels. +// nodeTypeIdFromTaintsAndLabels generates a unique id for each combination of taints, labels, and unset labels. // The id is based on the fnv1a hash. Hash collisions do not affect correctness, only the efficiency of sorting out nodes. // -// We separate taints/labels by $, labels and values by =, and and groups by &, +// We separate taints/labels by $, labels and values by =, and groups by &, // since these characters are not allowed in taints and labels; see // https://kubernetes.io/docs/concepts/overview/working-with-objects/labels/#syntax-and-character-set // https://man.archlinux.org/man/community/kubectl/kubectl-taint.1.en @@ -107,7 +107,7 @@ func getFilteredTaints(taints []v1.Taint, inclusionFilter taintsFilterFunc) []v1 if inclusionFilter == nil { return taints } - filteredTaints := []v1.Taint{} + var filteredTaints []v1.Taint for _, taint := range taints { if !inclusionFilter(&taint) { continue diff --git a/internal/scheduler/schedulerobjects/resourcelist.go b/internal/scheduler/schedulerobjects/resourcelist.go index 9a3c67eb5e3..a33a34626ac 100644 --- a/internal/scheduler/schedulerobjects/resourcelist.go +++ b/internal/scheduler/schedulerobjects/resourcelist.go @@ -2,7 +2,7 @@ package schedulerobjects import ( "fmt" - math "math" + "math" "strings" v1 "k8s.io/api/core/v1" @@ -174,30 +174,30 @@ func (rl *ResourceList) Set(t string, q resource.Quantity) { rl.Resources[t] = q } -func (a *ResourceList) Add(b ResourceList) { - a.initialise() - for t, qb := range b.Resources { - qa := a.Resources[t] +func (rl *ResourceList) Add(other ResourceList) { + rl.initialise() + for t, qb := range other.Resources { + qa := rl.Resources[t] qa.Add(qb) - a.Resources[t] = qa + rl.Resources[t] = qa } } -func (a *ResourceList) AddV1ResourceList(b v1.ResourceList) { - a.initialise() - for t, qb := range b { - qa := a.Resources[string(t)] +func (rl *ResourceList) AddV1ResourceList(other v1.ResourceList) { + rl.initialise() + for t, qb := range other { + qa := rl.Resources[string(t)] qa.Add(qb) - a.Resources[string(t)] = qa + rl.Resources[string(t)] = qa } } -func (a *ResourceList) SubV1ResourceList(b v1.ResourceList) { - a.initialise() - for t, qb := range b { - qa := a.Resources[string(t)] +func (rl *ResourceList) SubV1ResourceList(other v1.ResourceList) { + rl.initialise() + for t, qb := range other { + qa := rl.Resources[string(t)] qa.Sub(qb) - a.Resources[string(t)] = qa + rl.Resources[string(t)] = qa } } @@ -208,12 +208,12 @@ func (rl *ResourceList) AddQuantity(resourceType string, quantity resource.Quant rl.Resources[resourceType] = q } -func (a *ResourceList) Sub(b ResourceList) { - a.initialise() - for t, qb := range b.Resources { - qa := a.Resources[t] +func (rl *ResourceList) Sub(other ResourceList) { + rl.initialise() + for t, qb := range other.Resources { + qa := rl.Resources[t] qa.Sub(qb) - a.Resources[t] = qa + rl.Resources[t] = qa } } @@ -245,8 +245,8 @@ func (rl ResourceList) Zero() { } } -func (a ResourceList) IsZero() bool { - for _, q := range a.Resources { +func (rl ResourceList) IsZero() bool { + for _, q := range rl.Resources { if !q.IsZero() { return false } @@ -254,14 +254,14 @@ func (a ResourceList) IsZero() bool { return true } -func (a ResourceList) Equal(b ResourceList) bool { - for t, qa := range a.Resources { - if qa.Cmp(b.Get(t)) != 0 { +func (rl ResourceList) Equal(other ResourceList) bool { + for t, qa := range rl.Resources { + if qa.Cmp(other.Get(t)) != 0 { return false } } - for t, qb := range b.Resources { - if qb.Cmp(a.Get(t)) != 0 { + for t, qb := range other.Resources { + if qb.Cmp(rl.Get(t)) != 0 { return false } } @@ -269,8 +269,8 @@ func (a ResourceList) Equal(b ResourceList) bool { } // IsStrictlyNonNegative returns true if there is no quantity less than zero. -func (a ResourceList) IsStrictlyNonNegative() bool { - for _, q := range a.Resources { +func (rl ResourceList) IsStrictlyNonNegative() bool { + for _, q := range rl.Resources { if q.Cmp(resource.Quantity{}) == -1 { return false } @@ -279,9 +279,9 @@ func (a ResourceList) IsStrictlyNonNegative() bool { } // IsStrictlyLessOrEqual returns false if there is a quantity in b greater than that in a and true otherwise. -func (a ResourceList) IsStrictlyLessOrEqual(b ResourceList) bool { - for t, q := range b.Resources { - if q.Cmp(a.Get(t)) == -1 { +func (rl ResourceList) IsStrictlyLessOrEqual(other ResourceList) bool { + for t, q := range other.Resources { + if q.Cmp(rl.Get(t)) == -1 { return false } } @@ -408,18 +408,18 @@ func (m AllocatedByPriorityAndResourceType) MarkAllocatable(p int32, rs Resource } } -func (allocatableByPriorityAndResourceType AllocatableByPriorityAndResourceType) Get(priority int32, resourceType string) resource.Quantity { - if allocatableByPriorityAndResourceType == nil { +func (m AllocatableByPriorityAndResourceType) Get(priority int32, resourceType string) resource.Quantity { + if m == nil { return resource.Quantity{} } - quantityByResourceType := allocatableByPriorityAndResourceType[priority] + quantityByResourceType := m[priority] return quantityByResourceType.Get(resourceType) } -func (assignedByPriorityAndResourceType AllocatedByPriorityAndResourceType) Get(priority int32, resourceType string) resource.Quantity { - if assignedByPriorityAndResourceType == nil { +func (m AllocatedByPriorityAndResourceType) Get(priority int32, resourceType string) resource.Quantity { + if m == nil { return resource.Quantity{} } - quantityByResourceType := assignedByPriorityAndResourceType[priority] + quantityByResourceType := m[priority] return quantityByResourceType.Get(resourceType) } diff --git a/internal/scheduler/scheduling_algo.go b/internal/scheduler/scheduling_algo.go index 563ab93f000..21ce7ccc3eb 100644 --- a/internal/scheduler/scheduling_algo.go +++ b/internal/scheduler/scheduling_algo.go @@ -357,7 +357,7 @@ func (l *FairSchedulingAlgo) scheduleOnExecutors( executorId = executors[0].Id } totalResources := fsctx.totalCapacityByPool[pool] - var fairnessCostProvider fairness.FairnessCostProvider + var fairnessCostProvider fairness.CostProvider if l.schedulingConfig.FairnessModel == configuration.DominantResourceFairness { fairnessCostProvider, err = fairness.NewDominantResourceFairness( totalResources,