Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
d80tb7 committed Sep 17, 2023
1 parent edc1426 commit 2fe5a7f
Show file tree
Hide file tree
Showing 42 changed files with 383 additions and 346 deletions.
2 changes: 1 addition & 1 deletion internal/armada/server/lease.go
Original file line number Diff line number Diff line change
Expand Up @@ -476,7 +476,7 @@ func (q *AggregatedQueueServer) getJobs(ctx *armadacontext.Context, req *api.Str
defer cancel()
}

var fairnessCostProvider fairness.FairnessCostProvider
var fairnessCostProvider fairness.CostProvider
totalResources := schedulerobjects.ResourceList{Resources: totalCapacity}
if q.schedulingConfig.FairnessModel == configuration.DominantResourceFairness {
fairnessCostProvider, err = fairness.NewDominantResourceFairness(
Expand Down
47 changes: 47 additions & 0 deletions internal/common/util/tabbed_string_builder.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package util

import (
"fmt"
"strings"
"text/tabwriter"
)

// TabbedStringBuilder is a wrapper around a *tabwriter.Writer that allows for efficiently building
// tab-aligned strings.
// This exists as *tabwriter.Writer exposes a more complicated interface which returns errors to the
// caller, propagated for the underlying IOWriter. In this case we ensure that the underlying Writer is
// a strings.Builder. This never returns errors therefore we can provide a simpler interface where the caller
// doesn't need to consider error handling.
type TabbedStringBuilder struct {
sb *strings.Builder
writer *tabwriter.Writer
}

// NewTabbedStringBuilder creates a new TabbedStringBuilder. All parameters are equivalent to those defined in tabwriter.NewWriter
func NewTabbedStringBuilder(minwidth, tabwidth, padding int, padchar byte, flags uint) *TabbedStringBuilder {
sb := &strings.Builder{}
return &TabbedStringBuilder{
sb: sb,
writer: tabwriter.NewWriter(sb, minwidth, tabwidth, padding, padchar, flags),
}
}

// Writef formats according to a format specifier and writes to the underlying writer
func (t *TabbedStringBuilder) Writef(format string, a ...any) {
// should be safe ignore the error here as strings.Builder never errors
_, _ = fmt.Fprintf(t.writer, format, a...)
}

// Write the string to the underlying writer
func (t *TabbedStringBuilder) Write(a ...any) {
// should be safe ignore the error here as strings.Builder never errors
_, _ = fmt.Fprint(t.writer, a...)
}

// String returns the accumulated string.
// Flush on the underlying writer is automatically called
func (t *TabbedStringBuilder) String() string {
// should be safe ignore the error here as strings.Builder never errors
_ = t.writer.Flush()
return t.sb.String()
}
20 changes: 20 additions & 0 deletions internal/common/util/tabbed_string_builder_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package util

import (
"testing"

"github.com/stretchr/testify/assert"
)

func TestTabbedStringBuilder_TestSimple(t *testing.T) {
w := NewTabbedStringBuilder(1, 1, 1, ' ', 0)
w.Writef("a:\t%s", "b")
assert.Equal(t, "a: b", w.String())
}

func TestTabbedStringBuilderWriter_TestComplex(t *testing.T) {
w := NewTabbedStringBuilder(1, 1, 1, ' ', 0)
w.Writef("a:\t%s\n", "b")
w.Writef("a:\t%.2f\t%d\n", 1.5, 2)
assert.Equal(t, "a: b\na: 1.50 2\n", w.String())
}
2 changes: 1 addition & 1 deletion internal/scheduler/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ func PreemptedJobsFromSchedulerResult[T interfaces.LegacySchedulerJob](sr *Sched
return rv
}

// ScheduledJobsFromScheduleResult returns the slice of scheduled jobs in the result,
// ScheduledJobsFromSchedulerResult returns the slice of scheduled jobs in the result,
// cast to type T.
func ScheduledJobsFromSchedulerResult[T interfaces.LegacySchedulerJob](sr *SchedulerResult) []T {
rv := make([]T, len(sr.ScheduledJobs))
Expand Down
2 changes: 1 addition & 1 deletion internal/scheduler/configuration/configuration.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ const (
type Configuration struct {
// Database configuration
Postgres configuration.PostgresConfig
// Redis Comnfig
// Redis Config
Redis config.RedisConfig
// General Pulsar configuration
Pulsar configuration.PulsarConfig
Expand Down
4 changes: 2 additions & 2 deletions internal/scheduler/constraints/constraints.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (
)

const (
// Indicates that the limit on resources scheduled per round has been exceeded.
// Indicates that the limit on resources scheduled per round has been exceeded.
MaximumResourcesScheduledUnschedulableReason = "maximum resources scheduled"

// Indicates that a queue has been assigned more than its allowed amount of resources.
Expand Down Expand Up @@ -120,7 +120,7 @@ func ScaleQuantity(q resource.Quantity, f float64) resource.Quantity {
return q
}

func (constraints *SchedulingConstraints) CheckRoundConstraints(sctx *schedulercontext.SchedulingContext, queue string) (bool, string, error) {
func (constraints *SchedulingConstraints) CheckRoundConstraints(sctx *schedulercontext.SchedulingContext) (bool, string, error) {
// MaximumResourcesToSchedule check.
if !sctx.ScheduledResources.IsStrictlyLessOrEqual(constraints.MaximumResourcesToSchedule) {
return false, MaximumResourcesScheduledUnschedulableReason, nil
Expand Down
2 changes: 1 addition & 1 deletion internal/scheduler/constraints/constraints_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ func TestConstraints(t *testing.T) {
}{} // TODO: Add tests.
for name, tc := range tests {
t.Run(name, func(t *testing.T) {
ok, unschedulableReason, err := tc.constraints.CheckRoundConstraints(tc.sctx, tc.queue)
ok, unschedulableReason, err := tc.constraints.CheckRoundConstraints(tc.sctx)
require.NoError(t, err)
require.Equal(t, tc.globalUnschedulableReason == "", ok)
require.Equal(t, tc.globalUnschedulableReason, unschedulableReason)
Expand Down
136 changes: 60 additions & 76 deletions internal/scheduler/context/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,6 @@ package context

import (
"fmt"
"strings"
"text/tabwriter"
"time"

"github.com/openconfig/goyang/pkg/indent"
Expand All @@ -17,6 +15,7 @@ import (
armadamaps "github.com/armadaproject/armada/internal/common/maps"
armadaslices "github.com/armadaproject/armada/internal/common/slices"
"github.com/armadaproject/armada/internal/common/types"
"github.com/armadaproject/armada/internal/common/util"
schedulerconfig "github.com/armadaproject/armada/internal/scheduler/configuration"
"github.com/armadaproject/armada/internal/scheduler/fairness"
"github.com/armadaproject/armada/internal/scheduler/interfaces"
Expand All @@ -38,7 +37,7 @@ type SchedulingContext struct {
// Default priority class.
DefaultPriorityClass string
// Determines how fairness is computed.
FairnessCostProvider fairness.FairnessCostProvider
FairnessCostProvider fairness.CostProvider
// Limits job scheduling rate globally across all queues.
// Use the "Started" time to ensure limiter state remains constant within each scheduling round.
Limiter *rate.Limiter
Expand Down Expand Up @@ -66,7 +65,7 @@ type SchedulingContext struct {
// Used to efficiently generate scheduling keys.
SchedulingKeyGenerator *schedulerobjects.SchedulingKeyGenerator
// Record of job scheduling requirements known to be unfeasible.
// Used to immediately reject new jobs with identical reqirements.
// Used to immediately reject new jobs with identical requirements.
// Maps to the JobSchedulingContext of a previous job attempted to schedule with the same key.
UnfeasibleSchedulingKeys map[schedulerobjects.SchedulingKey]*JobSchedulingContext
}
Expand All @@ -76,7 +75,7 @@ func NewSchedulingContext(
pool string,
priorityClasses map[string]types.PriorityClass,
defaultPriorityClass string,
fairnessCostProvider fairness.FairnessCostProvider,
fairnessCostProvider fairness.CostProvider,
limiter *rate.Limiter,
totalResources schedulerobjects.ResourceList,
) *SchedulingContext {
Expand Down Expand Up @@ -177,31 +176,30 @@ func (sctx *SchedulingContext) TotalCost() float64 {
}

func (sctx *SchedulingContext) ReportString(verbosity int32) string {
var sb strings.Builder
w := tabwriter.NewWriter(&sb, 1, 1, 1, ' ', 0)
fmt.Fprintf(w, "Started:\t%s\n", sctx.Started)
fmt.Fprintf(w, "Finished:\t%s\n", sctx.Finished)
fmt.Fprintf(w, "Duration:\t%s\n", sctx.Finished.Sub(sctx.Started))
fmt.Fprintf(w, "Termination reason:\t%s\n", sctx.TerminationReason)
fmt.Fprintf(w, "Total capacity:\t%s\n", sctx.TotalResources.CompactString())
fmt.Fprintf(w, "Scheduled resources:\t%s\n", sctx.ScheduledResources.CompactString())
fmt.Fprintf(w, "Preempted resources:\t%s\n", sctx.EvictedResources.CompactString())
fmt.Fprintf(w, "Number of gangs scheduled:\t%d\n", sctx.NumScheduledGangs)
fmt.Fprintf(w, "Number of jobs scheduled:\t%d\n", sctx.NumScheduledJobs)
fmt.Fprintf(w, "Number of jobs preempted:\t%d\n", sctx.NumEvictedJobs)
w := util.NewTabbedStringBuilder(1, 1, 1, ' ', 0)
w.Writef("Started:\t%s\n", sctx.Started)
w.Writef("Finished:\t%s\n", sctx.Finished)
w.Writef("Duration:\t%s\n", sctx.Finished.Sub(sctx.Started))
w.Writef("Termination reason:\t%s\n", sctx.TerminationReason)
w.Writef("Total capacity:\t%s\n", sctx.TotalResources.CompactString())
w.Writef("Scheduled resources:\t%s\n", sctx.ScheduledResources.CompactString())
w.Writef("Preempted resources:\t%s\n", sctx.EvictedResources.CompactString())
w.Writef("Number of gangs scheduled:\t%d\n", sctx.NumScheduledGangs)
w.Writef("Number of jobs scheduled:\t%d\n", sctx.NumScheduledJobs)
w.Writef("Number of jobs preempted:\t%d\n", sctx.NumEvictedJobs)
scheduled := armadamaps.Filter(
sctx.QueueSchedulingContexts,
func(_ string, qctx *QueueSchedulingContext) bool {
return len(qctx.SuccessfulJobSchedulingContexts) > 0
},
)
if verbosity <= 0 {
fmt.Fprintf(w, "Scheduled queues:\t%v\n", maps.Keys(scheduled))
w.Writef("Scheduled queues:\t%v\n", maps.Keys(scheduled))
} else {
fmt.Fprint(w, "Scheduled queues:\n")
w.Write("Scheduled queues:\n")
for queueName, qctx := range scheduled {
fmt.Fprintf(w, "\t%s:\n", queueName)
fmt.Fprint(w, indent.String("\t\t", qctx.ReportString(verbosity-2)))
w.Writef("\t%s:\n", queueName)
w.Write(indent.String("\t\t", qctx.ReportString(verbosity-2)))
}
}
preempted := armadamaps.Filter(
Expand All @@ -211,16 +209,15 @@ func (sctx *SchedulingContext) ReportString(verbosity int32) string {
},
)
if verbosity <= 0 {
fmt.Fprintf(w, "Preempted queues:\t%v\n", maps.Keys(preempted))
w.Writef("Preempted queues:\t%v\n", maps.Keys(preempted))
} else {
fmt.Fprint(w, "Preempted queues:\n")
w.Write("Preempted queues:\n")
for queueName, qctx := range preempted {
fmt.Fprintf(w, "\t%s:\n", queueName)
fmt.Fprint(w, indent.String("\t\t", qctx.ReportString(verbosity-2)))
w.Writef("\t%s:\n", queueName)
w.Writef(indent.String("\t\t", qctx.ReportString(verbosity-2)))
}
}
w.Flush()
return sb.String()
return w.String()
}

func (sctx *SchedulingContext) AddGangSchedulingContext(gctx *GangSchedulingContext) (bool, error) {
Expand Down Expand Up @@ -367,13 +364,6 @@ type QueueSchedulingContext struct {
EvictedJobsById map[string]bool
}

func GetSchedulingContextFromQueueSchedulingContext(qctx *QueueSchedulingContext) *SchedulingContext {
if qctx == nil {
return nil
}
return qctx.SchedulingContext
}

func (qctx *QueueSchedulingContext) String() string {
return qctx.ReportString(0)
}
Expand All @@ -391,48 +381,47 @@ func (qctx *QueueSchedulingContext) GetWeight() float64 {
const maxJobIdsToPrint = 1

func (qctx *QueueSchedulingContext) ReportString(verbosity int32) string {
var sb strings.Builder
w := tabwriter.NewWriter(&sb, 1, 1, 1, ' ', 0)
w := util.NewTabbedStringBuilder(1, 1, 1, ' ', 0)
if verbosity >= 0 {
fmt.Fprintf(w, "Time:\t%s\n", qctx.Created)
fmt.Fprintf(w, "Queue:\t%s\n", qctx.Queue)
w.Writef("Time:\t%s\n", qctx.Created)
w.Writef("Queue:\t%s\n", qctx.Queue)
}
fmt.Fprintf(w, "Scheduled resources:\t%s\n", qctx.ScheduledResourcesByPriorityClass.AggregateByResource().CompactString())
fmt.Fprintf(w, "Scheduled resources (by priority):\t%s\n", qctx.ScheduledResourcesByPriorityClass.String())
fmt.Fprintf(w, "Preempted resources:\t%s\n", qctx.EvictedResourcesByPriorityClass.AggregateByResource().CompactString())
fmt.Fprintf(w, "Preempted resources (by priority):\t%s\n", qctx.EvictedResourcesByPriorityClass.String())
w.Writef("Scheduled resources:\t%s\n", qctx.ScheduledResourcesByPriorityClass.AggregateByResource().CompactString())
w.Writef("Scheduled resources (by priority):\t%s\n", qctx.ScheduledResourcesByPriorityClass.String())
w.Writef("Preempted resources:\t%s\n", qctx.EvictedResourcesByPriorityClass.AggregateByResource().CompactString())
w.Writef("Preempted resources (by priority):\t%s\n", qctx.EvictedResourcesByPriorityClass.String())
if verbosity >= 0 {
fmt.Fprintf(w, "Total allocated resources after scheduling:\t%s\n", qctx.Allocated.CompactString())
fmt.Fprintf(w, "Total allocated resources after scheduling by priority class:\t%s\n", qctx.AllocatedByPriorityClass)
fmt.Fprintf(w, "Number of jobs scheduled:\t%d\n", len(qctx.SuccessfulJobSchedulingContexts))
fmt.Fprintf(w, "Number of jobs preempted:\t%d\n", len(qctx.EvictedJobsById))
fmt.Fprintf(w, "Number of jobs that could not be scheduled:\t%d\n", len(qctx.UnsuccessfulJobSchedulingContexts))
w.Writef("Total allocated resources after scheduling:\t%s\n", qctx.Allocated.CompactString())
w.Writef("Total allocated resources after scheduling by priority class:\t%s\n", qctx.AllocatedByPriorityClass)
w.Writef("Number of jobs scheduled:\t%d\n", len(qctx.SuccessfulJobSchedulingContexts))
w.Writef("Number of jobs preempted:\t%d\n", len(qctx.EvictedJobsById))
w.Writef("Number of jobs that could not be scheduled:\t%d\n", len(qctx.UnsuccessfulJobSchedulingContexts))
if len(qctx.SuccessfulJobSchedulingContexts) > 0 {
jobIdsToPrint := maps.Keys(qctx.SuccessfulJobSchedulingContexts)
if len(jobIdsToPrint) > maxJobIdsToPrint {
jobIdsToPrint = jobIdsToPrint[0:maxJobIdsToPrint]
}
fmt.Fprintf(w, "Scheduled jobs:\t%v", jobIdsToPrint)
w.Writef("Scheduled jobs:\t%v", jobIdsToPrint)
if len(jobIdsToPrint) != len(qctx.SuccessfulJobSchedulingContexts) {
fmt.Fprintf(w, " (and %d others not shown)\n", len(qctx.SuccessfulJobSchedulingContexts)-len(jobIdsToPrint))
w.Writef(" (and %d others not shown)\n", len(qctx.SuccessfulJobSchedulingContexts)-len(jobIdsToPrint))
} else {
fmt.Fprint(w, "\n")
w.Write("\n")
}
}
if len(qctx.EvictedJobsById) > 0 {
jobIdsToPrint := maps.Keys(qctx.EvictedJobsById)
if len(jobIdsToPrint) > maxJobIdsToPrint {
jobIdsToPrint = jobIdsToPrint[0:maxJobIdsToPrint]
}
fmt.Fprintf(w, "Preempted jobs:\t%v", jobIdsToPrint)
w.Writef("Preempted jobs:\t%v", jobIdsToPrint)
if len(jobIdsToPrint) != len(qctx.EvictedJobsById) {
fmt.Fprintf(w, " (and %d others not shown)\n", len(qctx.EvictedJobsById)-len(jobIdsToPrint))
w.Writef(" (and %d others not shown)\n", len(qctx.EvictedJobsById)-len(jobIdsToPrint))
} else {
fmt.Fprint(w, "\n")
w.Write("\n")
}
}
if len(qctx.UnsuccessfulJobSchedulingContexts) > 0 {
fmt.Fprint(w, "Unschedulable jobs:\n")
w.Write("Unschedulable jobs:\n")
jobIdsByReason := armadaslices.MapAndGroupByFuncs(
maps.Values(qctx.UnsuccessfulJobSchedulingContexts),
func(jctx *JobSchedulingContext) string {
Expand All @@ -450,12 +439,11 @@ func (qctx *QueueSchedulingContext) ReportString(verbosity int32) string {
if len(jobIds) <= 0 {
continue
}
fmt.Fprintf(w, "\t%d:\t%s (e.g., %s)\n", len(jobIds), reason, jobIds[0])
w.Writef("\t%d:\t%s (e.g., %s)\n", len(jobIds), reason, jobIds[0])
}
}
}
w.Flush()
return sb.String()
return w.String()
}

func (qctx *QueueSchedulingContext) AddGangSchedulingContext(gctx *GangSchedulingContext) error {
Expand Down Expand Up @@ -603,20 +591,18 @@ type JobSchedulingContext struct {
}

func (jctx *JobSchedulingContext) String() string {
var sb strings.Builder
w := tabwriter.NewWriter(&sb, 1, 1, 1, ' ', 0)
fmt.Fprintf(w, "Time:\t%s\n", jctx.Created)
fmt.Fprintf(w, "Job ID:\t%s\n", jctx.JobId)
w := util.NewTabbedStringBuilder(1, 1, 1, ' ', 0)
w.Writef("Time:\t%s\n", jctx.Created)
w.Writef("Job ID:\t%s\n", jctx.JobId)
if jctx.UnschedulableReason != "" {
fmt.Fprintf(w, "UnschedulableReason:\t%s\n", jctx.UnschedulableReason)
w.Writef("UnschedulableReason:\t%s\n", jctx.UnschedulableReason)
} else {
fmt.Fprint(w, "UnschedulableReason:\tnone\n")
w.Write("UnschedulableReason:\tnone\n")
}
if jctx.PodSchedulingContext != nil {
fmt.Fprint(w, jctx.PodSchedulingContext.String())
w.Write(jctx.PodSchedulingContext.String())
}
w.Flush()
return sb.String()
return w.String()
}

func (jctx *JobSchedulingContext) IsSuccessful() bool {
Expand Down Expand Up @@ -658,22 +644,20 @@ type PodSchedulingContext struct {
}

func (pctx *PodSchedulingContext) String() string {
var sb strings.Builder
w := tabwriter.NewWriter(&sb, 1, 1, 1, ' ', 0)
w := util.NewTabbedStringBuilder(1, 1, 1, ' ', 0)
if pctx.NodeId != "" {
fmt.Fprintf(w, "Node:\t%s\n", pctx.NodeId)
w.Writef("Node:\t%s\n", pctx.NodeId)
} else {
fmt.Fprint(w, "Node:\tnone\n")
w.Write("Node:\tnone\n")
}
fmt.Fprintf(w, "Number of nodes in cluster:\t%d\n", pctx.NumNodes)
w.Writef("Number of nodes in cluster:\t%d\n", pctx.NumNodes)
if len(pctx.NumExcludedNodesByReason) == 0 {
fmt.Fprint(w, "Excluded nodes:\tnone\n")
w.Write("Excluded nodes:\tnone\n")
} else {
fmt.Fprint(w, "Excluded nodes:\n")
w.Write("Excluded nodes:\n")
for reason, count := range pctx.NumExcludedNodesByReason {
fmt.Fprintf(w, "\t%d:\t%s\n", count, reason)
w.Writef("\t%d:\t%s\n", count, reason)
}
}
w.Flush()
return sb.String()
return w.String()
}
Loading

0 comments on commit 2fe5a7f

Please sign in to comment.