Skip to content

Commit

Permalink
Fix per-pc resource limits
Browse files Browse the repository at this point in the history
  • Loading branch information
severinson committed Jun 22, 2023
1 parent f869cea commit f11417a
Show file tree
Hide file tree
Showing 19 changed files with 768 additions and 596 deletions.
6 changes: 1 addition & 5 deletions internal/armada/configuration/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,12 +233,8 @@ type PriorityClass struct {
Priority int32
// If true, Armada may preempt jobs of this class to improve fairness.
Preemptible bool
// Limits resources assigned to jobs of priority equal to or lower than that of this priority class.
// Limits resources assigned to jobs of this priority class.
// Specifically, jobs of this priority class are only scheduled if doing so does not exceed this limit.
//
// For example, if priority is 10 and MaximumResourceFractionPerQueue is map[string]float64{"cpu": 0.3},
// jobs of this priority class are not scheduled if doing so would cause the total resources assigned
// to jobs of priority 10 or lower from the same queue to exceed 30% of the total.
MaximumResourceFractionPerQueue map[string]float64
// Per-pool override of MaximumResourceFractionPerQueue.
// If missing for a particular pool, MaximumResourceFractionPerQueue is used instead for that pool.
Expand Down
205 changes: 121 additions & 84 deletions internal/armada/server/lease.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import (
schedulerconstraints "github.com/armadaproject/armada/internal/scheduler/constraints"
schedulercontext "github.com/armadaproject/armada/internal/scheduler/context"
"github.com/armadaproject/armada/internal/scheduler/database"
"github.com/armadaproject/armada/internal/scheduler/interfaces"
schedulerinterfaces "github.com/armadaproject/armada/internal/scheduler/interfaces"
"github.com/armadaproject/armada/internal/scheduler/nodedb"
"github.com/armadaproject/armada/internal/scheduler/schedulerobjects"
Expand Down Expand Up @@ -282,10 +283,31 @@ func (q *AggregatedQueueServer) getJobs(ctx context.Context, req *api.StreamingL
})
}

// Map queue names to priority factor for all active queues, i.e.,
// all queues for which the jobs queue has not been deleted automatically by Redis.
queues, err := q.queueRepository.GetAllQueues()
if err != nil {
return nil, err
}
priorityFactorByQueue := make(map[string]float64, len(queues))
apiQueues := make([]*api.Queue, len(queues))
for i, queue := range queues {
priorityFactorByQueue[queue.Name] = float64(queue.PriorityFactor)
apiQueues[i] = &api.Queue{Name: queue.Name}
}
activeQueues, err := q.jobRepository.FilterActiveQueues(apiQueues)
if err != nil {
return nil, err
}
priorityFactorByActiveQueue := make(map[string]float64, len(activeQueues))
for _, queue := range activeQueues {
priorityFactorByActiveQueue[queue.Name] = priorityFactorByQueue[queue.Name]
}

// Nodes to be considered by the scheduler.
lastSeen := q.clock.Now()
nodes := make([]*schedulerobjects.Node, 0, len(req.Nodes))
allocatedByQueueForCluster := make(map[string]schedulerobjects.QuantityByPriorityAndResourceType)
allocatedByQueueAndPriorityClassForCluster := make(map[string]schedulerobjects.QuantityByTAndResourceType[string], len(queues))
jobIdsByGangId := make(map[string]map[string]bool)
gangIdByJobId := make(map[string]string)
nodeIdByJobId := make(map[string]string)
Expand Down Expand Up @@ -332,11 +354,9 @@ func (q *AggregatedQueueServer) getJobs(ctx context.Context, req *api.StreamingL
}

// Aggregate total resources allocated by queue for this cluster.
allocatedByQueueForCluster = scheduler.UpdateUsage(
allocatedByQueueForCluster,
jobs,
q.schedulingConfig.Preemption.PriorityClasses,
scheduler.Add,
allocatedByQueueAndPriorityClassForCluster = updateAllocatedByQueueAndPriorityClass(
allocatedByQueueAndPriorityClassForCluster,
add, jobs,
)

// Group gangs.
Expand Down Expand Up @@ -398,30 +418,36 @@ func (q *AggregatedQueueServer) getJobs(ctx context.Context, req *api.StreamingL
return nil, err
}

// Load executor reports for all clusters, and insert an updated report for this cluster.
// Load allocation reports for all executors from Redis.
reportsByExecutor, err := q.usageRepository.GetClusterQueueResourceUsage()
if err != nil {
return nil, err
}
executorReport := &schedulerobjects.ClusterResourceUsageReport{

// Insert an updated report for the current executor, which includes information received in this lease call.
currentExecutorReport := &schedulerobjects.ClusterResourceUsageReport{
Pool: req.Pool,
Created: q.clock.Now(),
ResourcesByQueue: make(map[string]*schedulerobjects.QueueClusterResourceUsage),
ResourcesByQueue: make(map[string]*schedulerobjects.QueueClusterResourceUsage, len(queues)),
}
for queue, allocated := range allocatedByQueueForCluster {
executorReport.ResourcesByQueue[queue] = &schedulerobjects.QueueClusterResourceUsage{
Created: executorReport.Created,
Queue: queue,
ExecutorId: req.ClusterId,
ResourcesByPriority: allocated.DeepCopy(),
for queue, allocatedByPriorityClass := range allocatedByQueueAndPriorityClassForCluster {
currentExecutorReport.ResourcesByQueue[queue] = &schedulerobjects.QueueClusterResourceUsage{
Created: currentExecutorReport.Created,
Queue: queue,
ExecutorId: req.ClusterId,
ResourcesByPriorityClassName: armadamaps.DeepCopy(allocatedByPriorityClass),
}
}
reportsByExecutor[req.ClusterId] = executorReport
if err := q.usageRepository.UpdateClusterQueueResourceUsage(req.ClusterId, executorReport); err != nil {
reportsByExecutor[req.ClusterId] = currentExecutorReport

// Write the updated report into Redis to make the information available to other replicas of the server.
if err := q.usageRepository.UpdateClusterQueueResourceUsage(req.ClusterId, currentExecutorReport); err != nil {
return nil, errors.WithMessagef(err, "failed to update cluster usage for cluster %s", req.ClusterId)
}
allocatedByQueueForPool := q.aggregateUsage(reportsByExecutor, req.Pool)
log.Infof("allocated resources per queue for pool %s before scheduling: %v", req.Pool, allocatedByQueueForPool)

// Aggregate allocation across all clusters.
allocatedByQueueAndPriorityClassForPool := q.aggregateAllocationAcrossExecutor(reportsByExecutor, req.Pool)
log.Infof("allocated resources per queue for pool %s before scheduling: %v", req.Pool, allocatedByQueueAndPriorityClassForPool)

// Store executor details in Redis so they can be used by submit checks and the new scheduler.
if err := q.executorRepository.StoreExecutor(ctx, &schedulerobjects.Executor{
Expand All @@ -435,29 +461,7 @@ func (q *AggregatedQueueServer) getJobs(ctx context.Context, req *api.StreamingL
log.WithError(err).Warnf("could not store executor details for cluster %s", req.ClusterId)
}

// Map queue names to priority factor for all active queues, i.e.,
// all queues for which the jobs queue has not been deleted automatically by Redis.
queues, err := q.queueRepository.GetAllQueues()
if err != nil {
return nil, err
}
priorityFactorByQueue := make(map[string]float64, len(queues))
apiQueues := make([]*api.Queue, len(queues))
for i, queue := range queues {
priorityFactorByQueue[queue.Name] = float64(queue.PriorityFactor)
apiQueues[i] = &api.Queue{Name: queue.Name}
}
activeQueues, err := q.jobRepository.FilterActiveQueues(apiQueues)
if err != nil {
return nil, err
}
priorityFactorByActiveQueue := make(map[string]float64, len(activeQueues))
for _, queue := range activeQueues {
priorityFactorByActiveQueue[queue.Name] = priorityFactorByQueue[queue.Name]
}

// Give Schedule() a 3 second shorter deadline than ctx,
// to give it a chance to finish up before ctx is cancelled.
// Give Schedule() a 3 second shorter deadline than ctx to give it a chance to finish up before ctx deadline.
if deadline, ok := ctx.Deadline(); ok {
var cancel context.CancelFunc
ctx, cancel = context.WithDeadline(ctx, deadline.Add(-3*time.Second))
Expand All @@ -473,7 +477,7 @@ func (q *AggregatedQueueServer) getJobs(ctx context.Context, req *api.StreamingL
schedulerobjects.ResourceList{Resources: totalCapacity},
)
for queue, priorityFactor := range priorityFactorByQueue {
if err := sctx.AddQueueSchedulingContext(queue, priorityFactor, allocatedByQueueForPool[queue]); err != nil {
if err := sctx.AddQueueSchedulingContext(queue, priorityFactor, allocatedByQueueAndPriorityClassForPool[queue]); err != nil {
return nil, err
}
}
Expand Down Expand Up @@ -629,38 +633,37 @@ func (q *AggregatedQueueServer) getJobs(ctx context.Context, req *api.StreamingL
}

// Update resource cluster report to account for preempted/leased jobs and write it to Redis.
allocatedByQueueForCluster = scheduler.UpdateUsage(
allocatedByQueueForCluster,
result.PreemptedJobs,
q.schedulingConfig.Preemption.PriorityClasses,
scheduler.Subtract,
allocatedByQueueAndPriorityClassForCluster = updateAllocatedByQueueAndPriorityClass(
allocatedByQueueAndPriorityClassForCluster,
subtract, result.PreemptedJobs,
)
for queue, m := range allocatedByQueueForCluster {
for queue, m := range allocatedByQueueAndPriorityClassForCluster {
// Any quantity in the negative indicates a resource accounting problem.
if !m.IsStrictlyNonNegative() {
log.Errorf("unexpected negative resource quantity for queue %s: %v", queue, m)
for _, rl := range m {
if !rl.IsStrictlyNonNegative() {
return nil, errors.Errorf("unexpected negative resource quantity for queue %s: %v", queue, m)
}
}
}
allocatedByQueueForCluster = scheduler.UpdateUsage(
allocatedByQueueForCluster,
successfullyLeasedApiJobs,
q.schedulingConfig.Preemption.PriorityClasses,
scheduler.Add,
allocatedByQueueAndPriorityClassForCluster = updateAllocatedByQueueAndPriorityClass(
allocatedByQueueAndPriorityClassForCluster,
add, successfullyLeasedApiJobs,
)
executorReport.Created = q.clock.Now()
for queue, usage := range allocatedByQueueForCluster {
executorReport.ResourcesByQueue[queue] = &schedulerobjects.QueueClusterResourceUsage{
Created: executorReport.Created,
Queue: queue,
ExecutorId: req.ClusterId,
ResourcesByPriority: usage.DeepCopy(),
currentExecutorReport.Created = q.clock.Now()
for queue, usage := range allocatedByQueueAndPriorityClassForCluster {
currentExecutorReport.ResourcesByQueue[queue] = &schedulerobjects.QueueClusterResourceUsage{
Created: currentExecutorReport.Created,
Queue: queue,
ExecutorId: req.ClusterId,
ResourcesByPriorityClassName: armadamaps.DeepCopy(usage),
}
}
if err := q.usageRepository.UpdateClusterQueueResourceUsage(req.ClusterId, executorReport); err != nil {
if err := q.usageRepository.UpdateClusterQueueResourceUsage(req.ClusterId, currentExecutorReport); err != nil {
logging.WithStacktrace(log, err).Errorf("failed to update cluster usage")
}
allocatedByQueueForPool = q.aggregateUsage(reportsByExecutor, req.Pool)
log.Infof("allocated resources per queue for pool %s after scheduling: %v", req.Pool, allocatedByQueueForPool)

allocatedByQueueAndPriorityClassForPool = q.aggregateAllocationAcrossExecutor(reportsByExecutor, req.Pool)
log.Infof("allocated resources per queue for pool %s after scheduling: %v", req.Pool, allocatedByQueueAndPriorityClassForPool)

// Optionally set node id selectors on scheduled jobs.
if q.schedulingConfig.Preemption.SetNodeIdSelector {
Expand Down Expand Up @@ -742,31 +745,65 @@ func (q *AggregatedQueueServer) getJobs(ctx context.Context, req *api.StreamingL
return successfullyLeasedApiJobs, nil
}

// aggregateUsage Creates a map of resource usage first by cluster and then by queue.
// Clusters in pools other than pool are excluded.
func (q *AggregatedQueueServer) aggregateUsage(reportsByCluster map[string]*schedulerobjects.ClusterResourceUsageReport, pool string) map[string]schedulerobjects.QuantityByPriorityAndResourceType {
const activeClusterExpiry = 10 * time.Minute
type addOrSubtract int

const (
add addOrSubtract = iota
subtract
)

func updateAllocatedByQueueAndPriorityClass[T interfaces.LegacySchedulerJob](allocatedByQueueAndPriorityClass map[string]schedulerobjects.QuantityByTAndResourceType[string], op addOrSubtract, jobs []T) map[string]schedulerobjects.QuantityByTAndResourceType[string] {

Check failure on line 755 in internal/armada/server/lease.go

View workflow job for this annotation

GitHub Actions / go-lint (1.20)

line is 266 characters (lll)

Check failure on line 755 in internal/armada/server/lease.go

View workflow job for this annotation

GitHub Actions / go-lint (1.20)

line is 266 characters (lll)
if allocatedByQueueAndPriorityClass == nil {
allocatedByQueueAndPriorityClass = make(map[string]schedulerobjects.QuantityByTAndResourceType[string], 256)
}
for _, job := range jobs {
allocatedByPriorityClassName := allocatedByQueueAndPriorityClass[job.GetQueue()]
if allocatedByPriorityClassName == nil {
allocatedByPriorityClassName = make(map[string]schedulerobjects.ResourceList)
allocatedByQueueAndPriorityClass[job.GetQueue()] = allocatedByPriorityClassName
}
allocated := allocatedByPriorityClassName[job.GetPriorityClassName()]
if op == add {
allocated.AddV1ResourceList(job.GetResourceRequirements().Requests)
} else if op == subtract {
allocated.SubV1ResourceList(job.GetResourceRequirements().Requests)
} else {
panic(fmt.Sprintf("unknown op %d", op))
}
allocatedByPriorityClassName[job.GetPriorityClassName()] = allocated
}
return allocatedByQueueAndPriorityClass
}

func (q *AggregatedQueueServer) aggregateAllocationAcrossExecutor(reportsByExecutor map[string]*schedulerobjects.ClusterResourceUsageReport, pool string) map[string]schedulerobjects.QuantityByTAndResourceType[string] {
now := q.clock.Now()
aggregatedUsageByQueue := make(map[string]schedulerobjects.QuantityByPriorityAndResourceType)
for _, clusterReport := range reportsByCluster {
if clusterReport.Pool != pool {
// Separate resource accounting per pool.
allocatedByQueueAndPriorityClass := make(map[string]schedulerobjects.QuantityByTAndResourceType[string])
for _, executorReport := range reportsByExecutor {
if executorReport.Pool != pool {
// Only consider executors in the specified pool.
continue
}
if !clusterReport.Created.Add(activeClusterExpiry).After(now) {
// Stale report; omit.
continue
if q.schedulingConfig.ExecutorTimeout != 0 {
reportAge := now.Sub(executorReport.Created)
if reportAge > q.schedulingConfig.ExecutorTimeout {
// Stale report; omit.
continue
}
}
for queue, report := range clusterReport.ResourcesByQueue {
quantityByPriorityAndResourceType, ok := aggregatedUsageByQueue[queue]
if !ok {
quantityByPriorityAndResourceType = make(schedulerobjects.QuantityByPriorityAndResourceType)
aggregatedUsageByQueue[queue] = quantityByPriorityAndResourceType
for queue, queueReport := range executorReport.ResourcesByQueue {
allocatedByPriorityClass := allocatedByQueueAndPriorityClass[queue]
if allocatedByPriorityClass == nil {
allocatedByPriorityClass = make(map[string]schedulerobjects.ResourceList)
allocatedByQueueAndPriorityClass[queue] = allocatedByPriorityClass
}
for priorityClassName, allocated := range queueReport.ResourcesByPriorityClassName {
rl := allocatedByPriorityClass[priorityClassName]
rl.Add(allocated)
allocatedByPriorityClass[priorityClassName] = rl
}
quantityByPriorityAndResourceType.Add(report.ResourcesByPriority)
}
}
return aggregatedUsageByQueue
return allocatedByQueueAndPriorityClass
}

func (q *AggregatedQueueServer) decompressJobOwnershipGroups(jobs []*api.Job) error {
Expand Down
4 changes: 2 additions & 2 deletions internal/executor/utilisation/cluster_utilisation.go
Original file line number Diff line number Diff line change
Expand Up @@ -281,8 +281,8 @@ func groupPodsByNodes(pods []*v1.Pod) map[string][]*v1.Pod {
return podsByNodes
}

func allocatedByPriorityAndResourceTypeFromPods(pods []*v1.Pod) schedulerobjects.QuantityByPriorityAndResourceType {
rv := make(schedulerobjects.QuantityByPriorityAndResourceType)
func allocatedByPriorityAndResourceTypeFromPods(pods []*v1.Pod) schedulerobjects.QuantityByTAndResourceType[int32] {
rv := make(schedulerobjects.QuantityByTAndResourceType[int32])
for _, pod := range pods {
var priority int32 = 0
if pod.Spec.Priority != nil {
Expand Down
40 changes: 0 additions & 40 deletions internal/scheduler/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,46 +116,6 @@ func JobsSummary(jobs []interfaces.LegacySchedulerJob) string {
)
}

type AddOrSubtract int

const (
Add AddOrSubtract = iota
Subtract
)

func UpdateUsage[S ~[]E, E interfaces.LegacySchedulerJob](
usage map[string]schedulerobjects.QuantityByPriorityAndResourceType,
jobs S,
priorityClasses map[string]configuration.PriorityClass,
addOrSubtract AddOrSubtract,
) map[string]schedulerobjects.QuantityByPriorityAndResourceType {
if usage == nil {
usage = make(map[string]schedulerobjects.QuantityByPriorityAndResourceType)
}
for _, job := range jobs {
req := PodRequirementFromLegacySchedulerJob(job, priorityClasses)
if req == nil {
continue
}
requests := schedulerobjects.ResourceListFromV1ResourceList(req.ResourceRequirements.Requests)
queue := job.GetQueue()
m := usage[queue]
if m == nil {
m = make(schedulerobjects.QuantityByPriorityAndResourceType)
}
switch addOrSubtract {
case Add:
m.Add(schedulerobjects.QuantityByPriorityAndResourceType{req.Priority: requests})
case Subtract:
m.Sub(schedulerobjects.QuantityByPriorityAndResourceType{req.Priority: requests})
default:
panic(fmt.Sprintf("invalid operation %d", addOrSubtract))
}
usage[queue] = m
}
return usage
}

func jobSchedulingContextsFromJobs[T interfaces.LegacySchedulerJob](jobs []T, executorId string, priorityClasses map[string]configuration.PriorityClass) []*schedulercontext.JobSchedulingContext {
if jobs == nil {
return nil
Expand Down
Loading

0 comments on commit f11417a

Please sign in to comment.