diff --git a/internal/scheduler/api.go b/internal/scheduler/api.go index 92c2e6741b7..a501d242241 100644 --- a/internal/scheduler/api.go +++ b/internal/scheduler/api.go @@ -2,6 +2,7 @@ package scheduler import ( "context" + "strconv" "strings" "github.com/apache/pulsar-client-go/pulsar" @@ -9,6 +10,7 @@ import ( "github.com/gogo/protobuf/types" "github.com/google/uuid" "github.com/pkg/errors" + log "github.com/sirupsen/logrus" v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/util/clock" @@ -17,6 +19,7 @@ import ( "github.com/armadaproject/armada/internal/common/logging" "github.com/armadaproject/armada/internal/common/pulsarutils" "github.com/armadaproject/armada/internal/common/schedulers" + priorityTypes "github.com/armadaproject/armada/internal/common/types" "github.com/armadaproject/armada/internal/common/util" "github.com/armadaproject/armada/internal/scheduler/database" "github.com/armadaproject/armada/internal/scheduler/schedulerobjects" @@ -24,6 +27,8 @@ import ( "github.com/armadaproject/armada/pkg/executorapi" ) +const armadaJobPreemptibleLabel = "armada_preemptible" + // ExecutorApi is the gRPC service executors use to synchronise their state with that of the scheduler. type ExecutorApi struct { // Used to send Pulsar messages when, e.g., executors report a job has finished. @@ -36,6 +41,8 @@ type ExecutorApi struct { legacyExecutorRepository database.ExecutorRepository // Allowed priority class priorities. allowedPriorities []int32 + // Known priority classes + priorityClasses map[string]priorityTypes.PriorityClass // Max size of Pulsar messages produced. maxPulsarMessageSizeBytes uint // See scheduling schedulingConfig. @@ -52,6 +59,7 @@ func NewExecutorApi(producer pulsar.Producer, allowedPriorities []int32, nodeIdLabel string, priorityClassNameOverride *string, + priorityClasses map[string]priorityTypes.PriorityClass, maxPulsarMessageSizeBytes uint, ) (*ExecutorApi, error) { if len(allowedPriorities) == 0 { @@ -66,6 +74,7 @@ func NewExecutorApi(producer pulsar.Producer, maxPulsarMessageSizeBytes: maxPulsarMessageSizeBytes, nodeIdLabel: nodeIdLabel, priorityClassNameOverride: priorityClassNameOverride, + priorityClasses: priorityClasses, clock: clock.RealClock{}, }, nil } @@ -131,10 +140,6 @@ func (srv *ExecutorApi) LeaseJobRuns(stream executorapi.ExecutorApi_LeaseJobRuns return err } - if srv.priorityClassNameOverride != nil { - srv.setPriorityClassName(submitMsg, *srv.priorityClassNameOverride) - } - srv.addNodeIdSelector(submitMsg, lease.Node) if len(lease.PodRequirementsOverlay) > 0 { @@ -146,6 +151,13 @@ func (srv *ExecutorApi) LeaseJobRuns(stream executorapi.ExecutorApi_LeaseJobRuns addAnnotations(submitMsg, PodRequirementsOverlay.Annotations) } + srv.addPreemptibleLabel(submitMsg) + + // This must happen after anything that relies on the priorityClassName + if srv.priorityClassNameOverride != nil { + srv.setPriorityClassName(submitMsg, *srv.priorityClassNameOverride) + } + var groups []string if len(lease.Groups) > 0 { groups, err = compress.DecompressStringArray(lease.Groups, decompressor) @@ -202,6 +214,40 @@ func setPriorityClassName(podSpec *armadaevents.PodSpecWithAvoidList, priorityCl podSpec.PodSpec.PriorityClassName = priorityClassName } +func (srv *ExecutorApi) addPreemptibleLabel(job *armadaevents.SubmitJob) { + isPremptible := srv.isPreemptible(job) + labels := map[string]string{armadaJobPreemptibleLabel: strconv.FormatBool(isPremptible)} + addLabels(job, labels) +} + +func (srv *ExecutorApi) isPreemptible(job *armadaevents.SubmitJob) bool { + priorityClassName := "" + + if job.MainObject != nil { + switch typed := job.MainObject.Object.(type) { + case *armadaevents.KubernetesMainObject_PodSpec: + if typed.PodSpec != nil && typed.PodSpec.PodSpec != nil { + priorityClassName = typed.PodSpec.PodSpec.PriorityClassName + } + default: + return false + } + } + + priority, known := srv.priorityClasses[priorityClassName] + if priorityClassName == "" { + log.Errorf("priority class name not set on pod %s", job.JobId.String()) + return false + } + + if !known { + log.Errorf("unknown priority class found %s on job %s", priorityClassName, job.JobId.String()) + return false + } + + return priority.Preemptible +} + func (srv *ExecutorApi) addNodeIdSelector(job *armadaevents.SubmitJob, nodeId string) { if job == nil || nodeId == "" { return @@ -239,6 +285,21 @@ func addTolerations(job *armadaevents.SubmitJob, tolerations []v1.Toleration) { } } +func addLabels(job *armadaevents.SubmitJob, labels map[string]string) { + if job == nil || len(labels) == 0 { + return + } + if job.ObjectMeta == nil { + job.ObjectMeta = &armadaevents.ObjectMeta{} + } + if job.ObjectMeta.Labels == nil { + job.ObjectMeta.Labels = make(map[string]string, len(labels)) + } + for k, v := range labels { + job.ObjectMeta.Labels[k] = v + } +} + func addAnnotations(job *armadaevents.SubmitJob, annotations map[string]string) { if job == nil || len(annotations) == 0 { return diff --git a/internal/scheduler/api_test.go b/internal/scheduler/api_test.go index 9b1d181bbe9..45197fd6386 100644 --- a/internal/scheduler/api_test.go +++ b/internal/scheduler/api_test.go @@ -19,6 +19,7 @@ import ( "github.com/armadaproject/armada/internal/common/mocks" protoutil "github.com/armadaproject/armada/internal/common/proto" "github.com/armadaproject/armada/internal/common/pulsarutils" + "github.com/armadaproject/armada/internal/common/types" "github.com/armadaproject/armada/internal/scheduler/database" schedulermocks "github.com/armadaproject/armada/internal/scheduler/mocks" "github.com/armadaproject/armada/internal/scheduler/schedulerobjects" @@ -29,6 +30,16 @@ import ( const nodeIdName = "kubernetes.io/hostname" +const ( + armadaDefaultPriorityClassName = "armada-default" + armadaPreemptiblePriorityClassName = "armada-preemptible" +) + +var priorityClasses = map[string]types.PriorityClass{ + armadaDefaultPriorityClassName: {Preemptible: false}, + armadaPreemptiblePriorityClassName: {Preemptible: true}, +} + func TestExecutorApi_LeaseJobRuns(t *testing.T) { const maxJobsPerCall = uint(100) testClock := clock.NewFakeClock(time.Now()) @@ -83,7 +94,9 @@ func TestExecutorApi_LeaseJobRuns(t *testing.T) { submit, compressedSubmit := submitMsg( t, - nil, + &armadaevents.ObjectMeta{ + Labels: map[string]string{armadaJobPreemptibleLabel: "false"}, + }, &v1.PodSpec{ NodeSelector: map[string]string{nodeIdName: "node-id"}, }, @@ -98,7 +111,12 @@ func TestExecutorApi_LeaseJobRuns(t *testing.T) { SubmitMessage: compressedSubmit, } - submitWithoutNodeSelector, compressedSubmitNoNodeSelector := submitMsg(t, nil, nil) + submitWithoutNodeSelector, compressedSubmitNoNodeSelector := submitMsg(t, + &armadaevents.ObjectMeta{ + Labels: map[string]string{armadaJobPreemptibleLabel: "false"}, + }, + nil, + ) leaseWithoutNode := &database.JobRunLease{ RunID: uuid.New(), Queue: "test-queue", @@ -108,6 +126,26 @@ func TestExecutorApi_LeaseJobRuns(t *testing.T) { SubmitMessage: compressedSubmitNoNodeSelector, } + preemptibleSubmit, preemptibleCompressedSubmit := submitMsg( + t, + &armadaevents.ObjectMeta{ + Labels: map[string]string{armadaJobPreemptibleLabel: "true"}, + }, + &v1.PodSpec{ + PriorityClassName: armadaPreemptiblePriorityClassName, + NodeSelector: map[string]string{nodeIdName: "node-id"}, + }, + ) + preemptibleLease := &database.JobRunLease{ + RunID: uuid.New(), + Queue: "test-queue", + JobSet: "test-jobset", + UserID: "test-user", + Node: "node-id", + Groups: compressedGroups, + SubmitMessage: preemptibleCompressedSubmit, + } + tolerations := []v1.Toleration{ { Key: "whale", @@ -137,6 +175,7 @@ func TestExecutorApi_LeaseJobRuns(t *testing.T) { t, &armadaevents.ObjectMeta{ Annotations: map[string]string{"runtime_gang_cardinality": "3"}, + Labels: map[string]string{armadaJobPreemptibleLabel: "false"}, }, &v1.PodSpec{ NodeSelector: map[string]string{nodeIdName: "node-id"}, @@ -218,6 +257,26 @@ func TestExecutorApi_LeaseJobRuns(t *testing.T) { }, }, }, + "preemptible job lease": { + request: defaultRequest, + leases: []*database.JobRunLease{preemptibleLease}, + expectedExecutor: defaultExpectedExecutor, + expectedMsgs: []*executorapi.LeaseStreamMessage{ + { + Event: &executorapi.LeaseStreamMessage_Lease{Lease: &executorapi.JobRunLease{ + JobRunId: armadaevents.ProtoUuidFromUuid(preemptibleLease.RunID), + Queue: preemptibleLease.Queue, + Jobset: preemptibleLease.JobSet, + User: preemptibleLease.UserID, + Groups: groups, + Job: preemptibleSubmit, + }}, + }, + { + Event: &executorapi.LeaseStreamMessage_End{End: &executorapi.EndMarker{}}, + }, + }, + }, "do nothing": { request: defaultRequest, expectedExecutor: defaultExpectedExecutor, @@ -270,6 +329,7 @@ func TestExecutorApi_LeaseJobRuns(t *testing.T) { []int32{1000, 2000}, "kubernetes.io/hostname", nil, + priorityClasses, 4*1024*1024, ) require.NoError(t, err) @@ -397,6 +457,7 @@ func TestExecutorApi_Publish(t *testing.T) { []int32{1000, 2000}, "kubernetes.io/hostname", nil, + priorityClasses, 4*1024*1024, ) diff --git a/internal/scheduler/schedulerapp.go b/internal/scheduler/schedulerapp.go index 600a5465e0f..95ef48d9022 100644 --- a/internal/scheduler/schedulerapp.go +++ b/internal/scheduler/schedulerapp.go @@ -161,6 +161,7 @@ func Run(config schedulerconfig.Configuration) error { types.AllowedPriorities(config.Scheduling.PriorityClasses), config.Scheduling.NodeIdLabel, config.Scheduling.PriorityClassNameOverride, + config.Scheduling.PriorityClasses, config.Pulsar.MaxAllowedMessageSize, ) if err != nil {