Skip to content

Commit

Permalink
[cache] Add more details in the clusterQueues inactive message. (#3127)
Browse files Browse the repository at this point in the history
* [cache] Add more details in the clusterQueues inactive message.

* Review Remarks

* Review Remarks
  • Loading branch information
trasc authored Sep 26, 2024
1 parent fd12357 commit 23dec6d
Show file tree
Hide file tree
Showing 6 changed files with 174 additions and 99 deletions.
13 changes: 13 additions & 0 deletions apis/kueue/v1beta1/clusterqueue_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,19 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// ClusterQueue Active condition reasons.
//
// These are the machine-readable reason strings reported together with a
// human-readable message describing why a ClusterQueue can or cannot admit
// new workloads.
const (
// ClusterQueueActiveReasonTerminating indicates that the ClusterQueue is
// terminating and therefore can't admit new workloads.
ClusterQueueActiveReasonTerminating = "Terminating"
// ClusterQueueActiveReasonStopped indicates that the ClusterQueue is stopped.
ClusterQueueActiveReasonStopped = "Stopped"
// ClusterQueueActiveReasonFlavorNotFound indicates that the ClusterQueue
// references one or more ResourceFlavors that are missing.
ClusterQueueActiveReasonFlavorNotFound = "FlavorNotFound"
// ClusterQueueActiveReasonAdmissionCheckNotFound indicates that the
// ClusterQueue references one or more AdmissionChecks that are missing.
ClusterQueueActiveReasonAdmissionCheckNotFound = "AdmissionCheckNotFound"
// ClusterQueueActiveReasonAdmissionCheckInactive indicates that the
// ClusterQueue references one or more AdmissionChecks that exist but are
// inactive.
ClusterQueueActiveReasonAdmissionCheckInactive = "AdmissionCheckInactive"
// ClusterQueueActiveReasonMultipleSingleInstanceControllerAdmissionChecks
// indicates that the ClusterQueue references more than one AdmissionCheck
// for a controller that only allows a single one per ClusterQueue.
ClusterQueueActiveReasonMultipleSingleInstanceControllerAdmissionChecks = "MultipleSingleInstanceControllerAdmissionChecks"
// ClusterQueueActiveReasonFlavorIndependentAdmissionCheckAppliedPerFlavor
// indicates that a flavor-independent AdmissionCheck was set at flavor
// level, which is not allowed.
ClusterQueueActiveReasonFlavorIndependentAdmissionCheckAppliedPerFlavor = "FlavorIndependentAdmissionCheckAppliedPerFlavor"
// ClusterQueueActiveReasonUnknown is used when the ClusterQueue can't admit
// new workloads but no more specific reason could be determined.
ClusterQueueActiveReasonUnknown = "Unknown"
// ClusterQueueActiveReasonReady indicates that the ClusterQueue can admit
// new workloads.
ClusterQueueActiveReasonReady = "Ready"
)

// ClusterQueueSpec defines the desired state of ClusterQueue
// +kubebuilder:validation:XValidation:rule="!has(self.cohort) && has(self.resourceGroups) ? self.resourceGroups.all(rg, rg.flavors.all(f, f.resources.all(r, !has(r.borrowingLimit)))) : true", message="borrowingLimit must be nil when cohort is empty"
type ClusterQueueSpec struct {
Expand Down
14 changes: 7 additions & 7 deletions pkg/cache/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3354,31 +3354,31 @@ func TestClusterQueueReadiness(t *testing.T) {
clusterQueueName: "queue1",
wantStatus: metav1.ConditionFalse,
wantReason: "FlavorNotFound",
wantMessage: "Can't admit new workloads: FlavorNotFound",
wantMessage: "Can't admit new workloads: references missing ResourceFlavor(s): [flavor1].",
},
"check not found": {
clusterQueues: []*kueue.ClusterQueue{baseQueue},
resourceFlavors: []*kueue.ResourceFlavor{baseFlavor},
clusterQueueName: "queue1",
wantStatus: metav1.ConditionFalse,
wantReason: "CheckNotFoundOrInactive",
wantMessage: "Can't admit new workloads: CheckNotFoundOrInactive",
wantReason: "AdmissionCheckNotFound",
wantMessage: "Can't admit new workloads: references missing AdmissionCheck(s): [check1].",
},
"check inactive": {
clusterQueues: []*kueue.ClusterQueue{baseQueue},
resourceFlavors: []*kueue.ResourceFlavor{baseFlavor},
admissionChecks: []*kueue.AdmissionCheck{utiltesting.MakeAdmissionCheck("check1").Obj()},
clusterQueueName: "queue1",
wantStatus: metav1.ConditionFalse,
wantReason: "CheckNotFoundOrInactive",
wantMessage: "Can't admit new workloads: CheckNotFoundOrInactive",
wantReason: "AdmissionCheckInactive",
wantMessage: "Can't admit new workloads: references inactive AdmissionCheck(s): [check1].",
},
"flavor and check not found": {
clusterQueues: []*kueue.ClusterQueue{baseQueue},
clusterQueueName: "queue1",
wantStatus: metav1.ConditionFalse,
wantReason: "FlavorNotFound",
wantMessage: "Can't admit new workloads: FlavorNotFound, CheckNotFoundOrInactive",
wantMessage: "Can't admit new workloads: references missing ResourceFlavor(s): [flavor1], references missing AdmissionCheck(s): [check1].",
},
"terminating": {
clusterQueues: []*kueue.ClusterQueue{baseQueue},
Expand Down Expand Up @@ -3406,7 +3406,7 @@ func TestClusterQueueReadiness(t *testing.T) {
clusterQueueName: "queue1",
wantStatus: metav1.ConditionFalse,
wantReason: "Stopped",
wantMessage: "Can't admit new workloads: Stopped",
wantMessage: "Can't admit new workloads: is stopped.",
},
}

Expand Down
130 changes: 82 additions & 48 deletions pkg/cache/clusterqueue.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,10 @@ package cache

import (
"errors"
"fmt"
"maps"
"math"
"slices"
"strings"

corev1 "k8s.io/api/core/v1"
Expand All @@ -35,6 +38,8 @@ import (
"sigs.k8s.io/kueue/pkg/metrics"
"sigs.k8s.io/kueue/pkg/resources"
utilac "sigs.k8s.io/kueue/pkg/util/admissioncheck"
"sigs.k8s.io/kueue/pkg/util/api"
utilmaps "sigs.k8s.io/kueue/pkg/util/maps"
"sigs.k8s.io/kueue/pkg/workload"
)

Expand Down Expand Up @@ -65,15 +70,16 @@ type clusterQueue struct {

AdmittedUsage resources.FlavorResourceQuantities
// localQueues by (namespace/name).
localQueues map[string]*queue
podsReadyTracking bool
hasMissingFlavors bool
hasMissingOrInactiveAdmissionChecks bool
hasMultipleSingleInstanceControllersChecks bool
hasFlavorIndependentAdmissionCheckAppliedPerFlavor bool
admittedWorkloadsCount int
isStopped bool
workloadInfoOptions []workload.InfoOption
localQueues map[string]*queue
podsReadyTracking bool
missingFlavors []kueue.ResourceFlavorReference
missingAdmissionChecks []string
inactiveAdmissionChecks []string
multipleSingleInstanceControllersChecks map[string][]string // key = controllerName
flavorIndependentAdmissionCheckAppliedPerFlavor []string
admittedWorkloadsCount int
isStopped bool
workloadInfoOptions []workload.InfoOption

resourceNode ResourceNode
hierarchy.ClusterQueue[*cohort]
Expand Down Expand Up @@ -219,7 +225,12 @@ func (c *clusterQueue) updateQuotasAndResourceGroups(in []kueue.ResourceGroup) b

func (c *clusterQueue) updateQueueStatus() {
status := active
if c.hasMissingFlavors || c.hasMissingOrInactiveAdmissionChecks || c.isStopped || c.hasMultipleSingleInstanceControllersChecks || c.hasFlavorIndependentAdmissionCheckAppliedPerFlavor {
if c.isStopped ||
len(c.missingFlavors) > 0 ||
len(c.missingAdmissionChecks) > 0 ||
len(c.inactiveAdmissionChecks) > 0 ||
len(c.multipleSingleInstanceControllersChecks) > 0 ||
len(c.flavorIndependentAdmissionCheckAppliedPerFlavor) > 0 {
status = pending
}
if c.Status == terminating {
Expand All @@ -234,45 +245,56 @@ func (c *clusterQueue) updateQueueStatus() {
func (c *clusterQueue) inactiveReason() (string, string) {
switch c.Status {
case terminating:
return "Terminating", "Can't admit new workloads; clusterQueue is terminating"
return kueue.ClusterQueueActiveReasonTerminating, "Can't admit new workloads; clusterQueue is terminating"
case pending:
reasons := make([]string, 0, 3)
messages := make([]string, 0, 3)
if c.isStopped {
reasons = append(reasons, "Stopped")
reasons = append(reasons, kueue.ClusterQueueActiveReasonStopped)
messages = append(messages, "is stopped")
}
if c.hasMissingFlavors {
reasons = append(reasons, "FlavorNotFound")
if len(c.missingFlavors) > 0 {
reasons = append(reasons, kueue.ClusterQueueActiveReasonFlavorNotFound)
messages = append(messages, fmt.Sprintf("references missing ResourceFlavor(s): %v", c.missingFlavors))
}
if c.hasMissingOrInactiveAdmissionChecks {
reasons = append(reasons, "CheckNotFoundOrInactive")
if len(c.missingAdmissionChecks) > 0 {
reasons = append(reasons, kueue.ClusterQueueActiveReasonAdmissionCheckNotFound)
messages = append(messages, fmt.Sprintf("references missing AdmissionCheck(s): %v", c.missingAdmissionChecks))
}

if c.hasMultipleSingleInstanceControllersChecks {
reasons = append(reasons, "MultipleSingleInstanceControllerChecks")
if len(c.inactiveAdmissionChecks) > 0 {
reasons = append(reasons, kueue.ClusterQueueActiveReasonAdmissionCheckInactive)
messages = append(messages, fmt.Sprintf("references inactive AdmissionCheck(s): %v", c.inactiveAdmissionChecks))
}
if len(c.multipleSingleInstanceControllersChecks) > 0 {
reasons = append(reasons, kueue.ClusterQueueActiveReasonMultipleSingleInstanceControllerAdmissionChecks)
for _, controller := range utilmaps.SortedKeys(c.multipleSingleInstanceControllersChecks) {
messages = append(messages, fmt.Sprintf("only one AdmissionCheck of %v can be referenced for controller %q", c.multipleSingleInstanceControllersChecks[controller], controller))
}
}

if c.hasFlavorIndependentAdmissionCheckAppliedPerFlavor {
reasons = append(reasons, "FlavorIndependentAdmissionCheckAppliedPerFlavor")
if len(c.flavorIndependentAdmissionCheckAppliedPerFlavor) > 0 {
reasons = append(reasons, kueue.ClusterQueueActiveReasonFlavorIndependentAdmissionCheckAppliedPerFlavor)
messages = append(messages, fmt.Sprintf("AdmissionCheck(s): %v cannot be set at flavor level", c.flavorIndependentAdmissionCheckAppliedPerFlavor))
}

if len(reasons) == 0 {
return "Unknown", "Can't admit new workloads."
return kueue.ClusterQueueActiveReasonUnknown, "Can't admit new workloads."
}

return reasons[0], strings.Join([]string{"Can't admit new workloads:", strings.Join(reasons, ", ")}, " ")
return reasons[0], api.TruncateConditionMessage(strings.Join([]string{"Can't admit new workloads: ", strings.Join(messages, ", "), "."}, ""))
}
return "Ready", "Can admit new flavors"
return kueue.ClusterQueueActiveReasonReady, "Can admit new workloads"
}

// UpdateWithFlavors updates a ClusterQueue based on the passed ResourceFlavors set.
// Exported only for testing.
func (c *clusterQueue) UpdateWithFlavors(flavors map[kueue.ResourceFlavorReference]*kueue.ResourceFlavor) {
c.hasMissingFlavors = c.updateLabelKeys(flavors)
c.updateLabelKeys(flavors)
c.updateQueueStatus()
}

func (c *clusterQueue) updateLabelKeys(flavors map[kueue.ResourceFlavorReference]*kueue.ResourceFlavor) bool {
var flavorNotFound bool
func (c *clusterQueue) updateLabelKeys(flavors map[kueue.ResourceFlavorReference]*kueue.ResourceFlavor) {
c.missingFlavors = nil
for i := range c.ResourceGroups {
rg := &c.ResourceGroups[i]
if len(rg.Flavors) == 0 {
Expand All @@ -286,61 +308,73 @@ func (c *clusterQueue) updateLabelKeys(flavors map[kueue.ResourceFlavorReference
keys.Insert(k)
}
} else {
flavorNotFound = true
c.missingFlavors = append(c.missingFlavors, fName)
}
}

if keys.Len() > 0 {
rg.LabelKeys = keys
}
}

return flavorNotFound
}

// updateWithAdmissionChecks updates a ClusterQueue based on the passed AdmissionChecks set.
func (c *clusterQueue) updateWithAdmissionChecks(checks map[string]AdmissionCheck) {
hasMissing := false
hasSpecificChecks := false
checksPerController := make(map[string]int, len(c.AdmissionChecks))
checksPerController := make(map[string][]string, len(c.AdmissionChecks))
singleInstanceControllers := sets.New[string]()
var missing []string
var inactive []string
var flavorIndependentCheckOnFlavors []string
for acName, flavors := range c.AdmissionChecks {
if ac, found := checks[acName]; !found {
hasMissing = true
missing = append(missing, acName)
} else {
if !ac.Active {
hasMissing = true
inactive = append(inactive, acName)
}
checksPerController[ac.Controller]++
checksPerController[ac.Controller] = append(checksPerController[ac.Controller], acName)
if ac.SingleInstanceInClusterQueue {
singleInstanceControllers.Insert(ac.Controller)
}
if ac.FlavorIndependent && flavors.Len() != 0 {
hasSpecificChecks = true
flavorIndependentCheckOnFlavors = append(flavorIndependentCheckOnFlavors, acName)
}
}
}

// sort the lists since c.AdmissionChecks is a map
slices.Sort(missing)
slices.Sort(inactive)
slices.Sort(flavorIndependentCheckOnFlavors)

update := false
if hasMissing != c.hasMissingOrInactiveAdmissionChecks {
c.hasMissingOrInactiveAdmissionChecks = hasMissing
if !slices.Equal(c.missingAdmissionChecks, missing) {
c.missingAdmissionChecks = missing
update = true
}

hasMultipleSICC := false
for controller, checks := range checksPerController {
if singleInstanceControllers.Has(controller) && checks > 1 {
hasMultipleSICC = true
}
if !slices.Equal(c.inactiveAdmissionChecks, inactive) {
c.inactiveAdmissionChecks = inactive
update = true
}

// remove the controllers which don't have more than one AC or are not single instance.
maps.DeleteFunc(checksPerController, func(controller string, acs []string) bool {
return len(acs) < 2 || !singleInstanceControllers.Has(controller)
})

// sort the remaining set
for c := range checksPerController {
slices.Sort(checksPerController[c])
}

if c.hasMultipleSingleInstanceControllersChecks != hasMultipleSICC {
c.hasMultipleSingleInstanceControllersChecks = hasMultipleSICC
if !maps.EqualFunc(checksPerController, c.multipleSingleInstanceControllersChecks, slices.Equal) {
c.multipleSingleInstanceControllersChecks = checksPerController
update = true
}

if c.hasFlavorIndependentAdmissionCheckAppliedPerFlavor != hasSpecificChecks {
c.hasFlavorIndependentAdmissionCheckAppliedPerFlavor = hasSpecificChecks
if !slices.Equal(c.flavorIndependentAdmissionCheckAppliedPerFlavor, flavorIndependentCheckOnFlavors) {
c.flavorIndependentAdmissionCheckAppliedPerFlavor = flavorIndependentCheckOnFlavors
update = true
}

Expand Down
Loading

0 comments on commit 23dec6d

Please sign in to comment.