Skip to content

Commit

Permalink
Add label to pod to show if it is preemptible or not (#3517)
Browse files Browse the repository at this point in the history
This is useful to external tooling, so it knows which pods it can/can't preempt

Signed-off-by: JamesMurkin <jamesmurkin@hotmail.com>
Co-authored-by: Chris Martin <council_tax@hotmail.com>
  • Loading branch information
JamesMurkin and d80tb7 authored Apr 18, 2024
1 parent 36ad274 commit fa8b9d6
Show file tree
Hide file tree
Showing 3 changed files with 129 additions and 6 deletions.
69 changes: 65 additions & 4 deletions internal/scheduler/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,15 @@ package scheduler

import (
"context"
"strconv"
"strings"

"github.com/apache/pulsar-client-go/pulsar"
"github.com/gogo/protobuf/proto"
"github.com/gogo/protobuf/types"
"github.com/google/uuid"
"github.com/pkg/errors"
log "github.com/sirupsen/logrus"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/clock"

Expand All @@ -17,13 +19,16 @@ import (
"github.com/armadaproject/armada/internal/common/logging"
"github.com/armadaproject/armada/internal/common/pulsarutils"
"github.com/armadaproject/armada/internal/common/schedulers"
priorityTypes "github.com/armadaproject/armada/internal/common/types"
"github.com/armadaproject/armada/internal/common/util"
"github.com/armadaproject/armada/internal/scheduler/database"
"github.com/armadaproject/armada/internal/scheduler/schedulerobjects"
"github.com/armadaproject/armada/pkg/armadaevents"
"github.com/armadaproject/armada/pkg/executorapi"
)

const armadaJobPreemptibleLabel = "armada_preemptible"

// ExecutorApi is the gRPC service executors use to synchronise their state with that of the scheduler.
type ExecutorApi struct {
// Used to send Pulsar messages when, e.g., executors report a job has finished.
Expand All @@ -36,6 +41,8 @@ type ExecutorApi struct {
legacyExecutorRepository database.ExecutorRepository
// Allowed priority class priorities.
allowedPriorities []int32
// Known priority classes
priorityClasses map[string]priorityTypes.PriorityClass
// Max size of Pulsar messages produced.
maxPulsarMessageSizeBytes uint
// See scheduling schedulingConfig.
Expand All @@ -52,6 +59,7 @@ func NewExecutorApi(producer pulsar.Producer,
allowedPriorities []int32,
nodeIdLabel string,
priorityClassNameOverride *string,
priorityClasses map[string]priorityTypes.PriorityClass,
maxPulsarMessageSizeBytes uint,
) (*ExecutorApi, error) {
if len(allowedPriorities) == 0 {
Expand All @@ -66,6 +74,7 @@ func NewExecutorApi(producer pulsar.Producer,
maxPulsarMessageSizeBytes: maxPulsarMessageSizeBytes,
nodeIdLabel: nodeIdLabel,
priorityClassNameOverride: priorityClassNameOverride,
priorityClasses: priorityClasses,
clock: clock.RealClock{},
}, nil
}
Expand Down Expand Up @@ -131,10 +140,6 @@ func (srv *ExecutorApi) LeaseJobRuns(stream executorapi.ExecutorApi_LeaseJobRuns
return err
}

if srv.priorityClassNameOverride != nil {
srv.setPriorityClassName(submitMsg, *srv.priorityClassNameOverride)
}

srv.addNodeIdSelector(submitMsg, lease.Node)

if len(lease.PodRequirementsOverlay) > 0 {
Expand All @@ -146,6 +151,13 @@ func (srv *ExecutorApi) LeaseJobRuns(stream executorapi.ExecutorApi_LeaseJobRuns
addAnnotations(submitMsg, PodRequirementsOverlay.Annotations)
}

srv.addPreemptibleLabel(submitMsg)

// This must happen after anything that relies on the priorityClassName
if srv.priorityClassNameOverride != nil {
srv.setPriorityClassName(submitMsg, *srv.priorityClassNameOverride)
}

var groups []string
if len(lease.Groups) > 0 {
groups, err = compress.DecompressStringArray(lease.Groups, decompressor)
Expand Down Expand Up @@ -202,6 +214,40 @@ func setPriorityClassName(podSpec *armadaevents.PodSpecWithAvoidList, priorityCl
podSpec.PodSpec.PriorityClassName = priorityClassName
}

func (srv *ExecutorApi) addPreemptibleLabel(job *armadaevents.SubmitJob) {
isPremptible := srv.isPreemptible(job)
labels := map[string]string{armadaJobPreemptibleLabel: strconv.FormatBool(isPremptible)}
addLabels(job, labels)
}

func (srv *ExecutorApi) isPreemptible(job *armadaevents.SubmitJob) bool {
priorityClassName := ""

if job.MainObject != nil {
switch typed := job.MainObject.Object.(type) {
case *armadaevents.KubernetesMainObject_PodSpec:
if typed.PodSpec != nil && typed.PodSpec.PodSpec != nil {
priorityClassName = typed.PodSpec.PodSpec.PriorityClassName
}
default:
return false
}
}

priority, known := srv.priorityClasses[priorityClassName]
if priorityClassName == "" {
log.Errorf("priority class name not set on pod %s", job.JobId.String())
return false
}

if !known {
log.Errorf("unknown priority class found %s on job %s", priorityClassName, job.JobId.String())
return false
}

return priority.Preemptible
}

func (srv *ExecutorApi) addNodeIdSelector(job *armadaevents.SubmitJob, nodeId string) {
if job == nil || nodeId == "" {
return
Expand Down Expand Up @@ -239,6 +285,21 @@ func addTolerations(job *armadaevents.SubmitJob, tolerations []v1.Toleration) {
}
}

func addLabels(job *armadaevents.SubmitJob, labels map[string]string) {
if job == nil || len(labels) == 0 {
return
}
if job.ObjectMeta == nil {
job.ObjectMeta = &armadaevents.ObjectMeta{}
}
if job.ObjectMeta.Labels == nil {
job.ObjectMeta.Labels = make(map[string]string, len(labels))
}
for k, v := range labels {
job.ObjectMeta.Labels[k] = v
}
}

func addAnnotations(job *armadaevents.SubmitJob, annotations map[string]string) {
if job == nil || len(annotations) == 0 {
return
Expand Down
65 changes: 63 additions & 2 deletions internal/scheduler/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/armadaproject/armada/internal/common/mocks"
protoutil "github.com/armadaproject/armada/internal/common/proto"
"github.com/armadaproject/armada/internal/common/pulsarutils"
"github.com/armadaproject/armada/internal/common/types"
"github.com/armadaproject/armada/internal/scheduler/database"
schedulermocks "github.com/armadaproject/armada/internal/scheduler/mocks"
"github.com/armadaproject/armada/internal/scheduler/schedulerobjects"
Expand All @@ -29,6 +30,16 @@ import (

const nodeIdName = "kubernetes.io/hostname"

const (
armadaDefaultPriorityClassName = "armada-default"
armadaPreemptiblePriorityClassName = "armada-preemptible"
)

var priorityClasses = map[string]types.PriorityClass{
armadaDefaultPriorityClassName: {Preemptible: false},
armadaPreemptiblePriorityClassName: {Preemptible: true},
}

func TestExecutorApi_LeaseJobRuns(t *testing.T) {
const maxJobsPerCall = uint(100)
testClock := clock.NewFakeClock(time.Now())
Expand Down Expand Up @@ -83,7 +94,9 @@ func TestExecutorApi_LeaseJobRuns(t *testing.T) {

submit, compressedSubmit := submitMsg(
t,
nil,
&armadaevents.ObjectMeta{
Labels: map[string]string{armadaJobPreemptibleLabel: "false"},
},
&v1.PodSpec{
NodeSelector: map[string]string{nodeIdName: "node-id"},
},
Expand All @@ -98,7 +111,12 @@ func TestExecutorApi_LeaseJobRuns(t *testing.T) {
SubmitMessage: compressedSubmit,
}

submitWithoutNodeSelector, compressedSubmitNoNodeSelector := submitMsg(t, nil, nil)
submitWithoutNodeSelector, compressedSubmitNoNodeSelector := submitMsg(t,
&armadaevents.ObjectMeta{
Labels: map[string]string{armadaJobPreemptibleLabel: "false"},
},
nil,
)
leaseWithoutNode := &database.JobRunLease{
RunID: uuid.New(),
Queue: "test-queue",
Expand All @@ -108,6 +126,26 @@ func TestExecutorApi_LeaseJobRuns(t *testing.T) {
SubmitMessage: compressedSubmitNoNodeSelector,
}

preemptibleSubmit, preemptibleCompressedSubmit := submitMsg(
t,
&armadaevents.ObjectMeta{
Labels: map[string]string{armadaJobPreemptibleLabel: "true"},
},
&v1.PodSpec{
PriorityClassName: armadaPreemptiblePriorityClassName,
NodeSelector: map[string]string{nodeIdName: "node-id"},
},
)
preemptibleLease := &database.JobRunLease{
RunID: uuid.New(),
Queue: "test-queue",
JobSet: "test-jobset",
UserID: "test-user",
Node: "node-id",
Groups: compressedGroups,
SubmitMessage: preemptibleCompressedSubmit,
}

tolerations := []v1.Toleration{
{
Key: "whale",
Expand Down Expand Up @@ -137,6 +175,7 @@ func TestExecutorApi_LeaseJobRuns(t *testing.T) {
t,
&armadaevents.ObjectMeta{
Annotations: map[string]string{"runtime_gang_cardinality": "3"},
Labels: map[string]string{armadaJobPreemptibleLabel: "false"},
},
&v1.PodSpec{
NodeSelector: map[string]string{nodeIdName: "node-id"},
Expand Down Expand Up @@ -218,6 +257,26 @@ func TestExecutorApi_LeaseJobRuns(t *testing.T) {
},
},
},
"preemptible job lease": {
request: defaultRequest,
leases: []*database.JobRunLease{preemptibleLease},
expectedExecutor: defaultExpectedExecutor,
expectedMsgs: []*executorapi.LeaseStreamMessage{
{
Event: &executorapi.LeaseStreamMessage_Lease{Lease: &executorapi.JobRunLease{
JobRunId: armadaevents.ProtoUuidFromUuid(preemptibleLease.RunID),
Queue: preemptibleLease.Queue,
Jobset: preemptibleLease.JobSet,
User: preemptibleLease.UserID,
Groups: groups,
Job: preemptibleSubmit,
}},
},
{
Event: &executorapi.LeaseStreamMessage_End{End: &executorapi.EndMarker{}},
},
},
},
"do nothing": {
request: defaultRequest,
expectedExecutor: defaultExpectedExecutor,
Expand Down Expand Up @@ -270,6 +329,7 @@ func TestExecutorApi_LeaseJobRuns(t *testing.T) {
[]int32{1000, 2000},
"kubernetes.io/hostname",
nil,
priorityClasses,
4*1024*1024,
)
require.NoError(t, err)
Expand Down Expand Up @@ -397,6 +457,7 @@ func TestExecutorApi_Publish(t *testing.T) {
[]int32{1000, 2000},
"kubernetes.io/hostname",
nil,
priorityClasses,
4*1024*1024,
)

Expand Down
1 change: 1 addition & 0 deletions internal/scheduler/schedulerapp.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,7 @@ func Run(config schedulerconfig.Configuration) error {
types.AllowedPriorities(config.Scheduling.PriorityClasses),
config.Scheduling.NodeIdLabel,
config.Scheduling.PriorityClassNameOverride,
config.Scheduling.PriorityClasses,
config.Pulsar.MaxAllowedMessageSize,
)
if err != nil {
Expand Down

0 comments on commit fa8b9d6

Please sign in to comment.