Skip to content

Commit

Permalink
Add tolerations to away jobs (#3362)
Browse files Browse the repository at this point in the history
  • Loading branch information
zuqq committed Feb 1, 2024
1 parent 4b587f9 commit db37556
Show file tree
Hide file tree
Showing 19 changed files with 615 additions and 311 deletions.
11 changes: 11 additions & 0 deletions internal/armada/repository/apimessages/conversions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
v11 "k8s.io/api/networking/v1"
"k8s.io/apimachinery/pkg/api/resource"

"github.com/armadaproject/armada/internal/scheduler/schedulerobjects"
"github.com/armadaproject/armada/pkg/api"
"github.com/armadaproject/armada/pkg/armadaevents"
)
Expand Down Expand Up @@ -278,6 +279,16 @@ func TestConvertLeased(t *testing.T) {
JobRunLeased: &armadaevents.JobRunLeased{
JobId: jobIdProto,
ExecutorId: executorId,
PodRequirementsOverlay: &schedulerobjects.PodRequirements{
Tolerations: []v1.Toleration{
{
Key: "whale",
Value: "true",
Effect: v1.TaintEffectNoSchedule,
},
},
Priority: 1000,
},
},
},
}
Expand Down
10 changes: 10 additions & 0 deletions internal/common/ingest/testfixtures/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,16 @@ var Leased = &armadaevents.EventSequence_Event{
HasScheduledAtPriority: true,
ScheduledAtPriority: 15,
UpdateSequenceNumber: 1,
PodRequirementsOverlay: &schedulerobjects.PodRequirements{
Tolerations: []v1.Toleration{
{
Key: "whale",
Value: "true",
Effect: v1.TaintEffectNoSchedule,
},
},
Priority: 15,
},
},
},
}
Expand Down
32 changes: 29 additions & 3 deletions internal/scheduler/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/gogo/protobuf/types"
"github.com/google/uuid"
"github.com/pkg/errors"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/clock"

"github.com/armadaproject/armada/internal/common/armadacontext"
Expand Down Expand Up @@ -130,18 +131,29 @@ func (srv *ExecutorApi) LeaseJobRuns(stream executorapi.ExecutorApi_LeaseJobRuns
if err := unmarshalFromCompressedBytes(lease.SubmitMessage, decompressor, submitMsg); err != nil {
return err
}

if srv.priorityClassNameOverride != nil {
srv.setPriorityClassName(submitMsg, *srv.priorityClassNameOverride)
}

srv.addNodeIdSelector(submitMsg, lease.Node)

if len(lease.PodRequirementsOverlay) > 0 {
PodRequirementsOverlay := schedulerobjects.PodRequirements{}
if err := proto.Unmarshal(lease.PodRequirementsOverlay, &PodRequirementsOverlay); err != nil {
return err
}
addTolerations(submitMsg, PodRequirementsOverlay.Tolerations)
}

var groups []string
if len(lease.Groups) > 0 {
groups, err = compress.DecompressStringArray(lease.Groups, decompressor)
if err != nil {
return err
}
}

err := stream.Send(&executorapi.LeaseStreamMessage{
Event: &executorapi.LeaseStreamMessage_Lease{
Lease: &executorapi.JobRunLease{
Expand Down Expand Up @@ -183,6 +195,13 @@ func (srv *ExecutorApi) setPriorityClassName(job *armadaevents.SubmitJob, priori
}
}

func setPriorityClassName(podSpec *armadaevents.PodSpecWithAvoidList, priorityClassName string) {
if podSpec == nil || podSpec.PodSpec == nil {
return
}
podSpec.PodSpec.PriorityClassName = priorityClassName
}

func (srv *ExecutorApi) addNodeIdSelector(job *armadaevents.SubmitJob, nodeId string) {
if job == nil || nodeId == "" {
return
Expand All @@ -206,11 +225,18 @@ func addNodeSelector(podSpec *armadaevents.PodSpecWithAvoidList, key string, val
}
}

func setPriorityClassName(podSpec *armadaevents.PodSpecWithAvoidList, priorityClassName string) {
if podSpec == nil || podSpec.PodSpec == nil {
func addTolerations(job *armadaevents.SubmitJob, tolerations []v1.Toleration) {
if job == nil || len(tolerations) == 0 {
return
}
podSpec.PodSpec.PriorityClassName = priorityClassName
if job.MainObject != nil {
switch typed := job.MainObject.Object.(type) {
case *armadaevents.KubernetesMainObject_PodSpec:
if typed.PodSpec != nil && typed.PodSpec.PodSpec != nil {
typed.PodSpec.PodSpec.Tolerations = append(typed.PodSpec.PodSpec.Tolerations, tolerations...)
}
}
}
}

// ReportEvents publishes all eventSequences to Pulsar. The eventSequences are compacted for more efficient publishing.
Expand Down
70 changes: 62 additions & 8 deletions internal/scheduler/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/armadaproject/armada/internal/common/armadacontext"
"github.com/armadaproject/armada/internal/common/compress"
"github.com/armadaproject/armada/internal/common/mocks"
protoutil "github.com/armadaproject/armada/internal/common/proto"
"github.com/armadaproject/armada/internal/common/pulsarutils"
"github.com/armadaproject/armada/internal/scheduler/database"
schedulermocks "github.com/armadaproject/armada/internal/scheduler/mocks"
Expand Down Expand Up @@ -80,7 +81,12 @@ func TestExecutorApi_LeaseJobRuns(t *testing.T) {
UnassignedJobRuns: []string{runId3.String()},
}

submit, compressedSubmit := submitMsg(t, "node-id")
submit, compressedSubmit := submitMsg(
t,
&v1.PodSpec{
NodeSelector: map[string]string{nodeIdName: "node-id"},
},
)
defaultLease := &database.JobRunLease{
RunID: uuid.New(),
Queue: "test-queue",
Expand All @@ -91,7 +97,7 @@ func TestExecutorApi_LeaseJobRuns(t *testing.T) {
SubmitMessage: compressedSubmit,
}

submitWithoutNodeSelector, compressedSubmitNoNodeSelector := submitMsg(t, "")
submitWithoutNodeSelector, compressedSubmitNoNodeSelector := submitMsg(t, &v1.PodSpec{})
leaseWithoutNode := &database.JobRunLease{
RunID: uuid.New(),
Queue: "test-queue",
Expand All @@ -101,6 +107,39 @@ func TestExecutorApi_LeaseJobRuns(t *testing.T) {
SubmitMessage: compressedSubmitNoNodeSelector,
}

tolerations := []v1.Toleration{
{
Key: "whale",
Value: "true",
Effect: v1.TaintEffectNoSchedule,
},
}
leaseWithTolerations := &database.JobRunLease{
RunID: uuid.New(),
Queue: "test-queue",
JobSet: "test-jobset",
UserID: "test-user",
Node: "node-id",
Groups: compressedGroups,
// We use the submit message without tolerations here, because the
// run-level tolerations are stored in PodRequirementsOverlay.
SubmitMessage: compressedSubmit,
PodRequirementsOverlay: protoutil.MustMarshall(
&schedulerobjects.PodRequirements{
Tolerations: tolerations,
Priority: 1000,
},
),
}
submitWithTolerations, _ := submitMsg(
t,
&v1.PodSpec{
NodeSelector: map[string]string{nodeIdName: "node-id"},
Tolerations: tolerations,
},
)
submitWithTolerations.JobId = submit.JobId

tests := map[string]struct {
request *executorapi.LeaseRequest
runsToCancel []uuid.UUID
Expand Down Expand Up @@ -154,6 +193,26 @@ func TestExecutorApi_LeaseJobRuns(t *testing.T) {
},
},
},
"run with PodRequirementsOverlay": {
request: defaultRequest,
leases: []*database.JobRunLease{leaseWithTolerations},
expectedExecutor: defaultExpectedExecutor,
expectedMsgs: []*executorapi.LeaseStreamMessage{
{
Event: &executorapi.LeaseStreamMessage_Lease{Lease: &executorapi.JobRunLease{
JobRunId: armadaevents.ProtoUuidFromUuid(leaseWithTolerations.RunID),
Queue: leaseWithTolerations.Queue,
Jobset: leaseWithTolerations.JobSet,
User: leaseWithTolerations.UserID,
Groups: groups,
Job: submitWithTolerations,
}},
},
{
Event: &executorapi.LeaseStreamMessage_End{End: &executorapi.EndMarker{}},
},
},
},
"do nothing": {
request: defaultRequest,
expectedExecutor: defaultExpectedExecutor,
Expand Down Expand Up @@ -347,12 +406,7 @@ func TestExecutorApi_Publish(t *testing.T) {
}
}

func submitMsg(t *testing.T, nodeName string) (*armadaevents.SubmitJob, []byte) {
podSpec := &v1.PodSpec{}
if nodeName != "" {
podSpec.NodeSelector = map[string]string{}
podSpec.NodeSelector[nodeIdName] = nodeName
}
func submitMsg(t *testing.T, podSpec *v1.PodSpec) (*armadaevents.SubmitJob, []byte) {
submitMsg := &armadaevents.SubmitJob{
JobId: armadaevents.ProtoUuidFromUuid(uuid.New()),
MainObject: &armadaevents.KubernetesMainObject{
Expand Down
19 changes: 10 additions & 9 deletions internal/scheduler/database/job_repository.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,14 @@ type hasSerial interface {
}

type JobRunLease struct {
RunID uuid.UUID
Queue string
JobSet string
UserID string
Node string
Groups []byte
SubmitMessage []byte
RunID uuid.UUID
Queue string
JobSet string
UserID string
Node string
Groups []byte
SubmitMessage []byte
PodRequirementsOverlay []byte
}

// JobRepository is an interface to be implemented by structs which provide job and run information
Expand Down Expand Up @@ -246,7 +247,7 @@ func (r *PostgresJobRepository) FetchJobRunLeases(ctx *armadacontext.Context, ex
}

query := `
SELECT jr.run_id, jr.node, j.queue, j.job_set, j.user_id, j.groups, j.submit_message
SELECT jr.run_id, jr.node, j.queue, j.job_set, j.user_id, j.groups, j.submit_message, jr.pod_requirements_overlay
FROM runs jr
LEFT JOIN %s as tmp ON (tmp.run_id = jr.run_id)
JOIN jobs j
Expand All @@ -267,7 +268,7 @@ func (r *PostgresJobRepository) FetchJobRunLeases(ctx *armadacontext.Context, ex
defer rows.Close()
for rows.Next() {
run := JobRunLease{}
err = rows.Scan(&run.RunID, &run.Node, &run.Queue, &run.JobSet, &run.UserID, &run.Groups, &run.SubmitMessage)
err = rows.Scan(&run.RunID, &run.Node, &run.Queue, &run.JobSet, &run.UserID, &run.Groups, &run.SubmitMessage, &run.PodRequirementsOverlay)
if err != nil {
return errors.WithStack(err)
}
Expand Down
39 changes: 30 additions & 9 deletions internal/scheduler/database/job_repository_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,14 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"golang.org/x/exp/slices"
v1 "k8s.io/api/core/v1"

"github.com/armadaproject/armada/internal/common/armadacontext"
"github.com/armadaproject/armada/internal/common/compress"
"github.com/armadaproject/armada/internal/common/database"
protoutil "github.com/armadaproject/armada/internal/common/proto"
"github.com/armadaproject/armada/internal/common/util"
"github.com/armadaproject/armada/internal/scheduler/schedulerobjects"
"github.com/armadaproject/armada/pkg/armadaevents"
)

Expand Down Expand Up @@ -402,6 +404,24 @@ func TestFetchJobRunLeases(t *testing.T) {
JobSet: "test-jobset",
Executor: executorName,
},
{
RunID: uuid.New(),
JobID: dbJobs[2].JobID,
JobSet: "test-jobset",
Executor: executorName,
PodRequirementsOverlay: protoutil.MustMarshall(
&schedulerobjects.PodRequirements{
Tolerations: []v1.Toleration{
{
Key: "whale",
Value: "true",
Effect: v1.TaintEffectNoSchedule,
},
},
Priority: 1000,
},
),
},
{
RunID: uuid.New(),
JobID: dbJobs[0].JobID,
Expand All @@ -424,15 +444,16 @@ func TestFetchJobRunLeases(t *testing.T) {
Succeeded: true, // should be ignored as terminal
},
}
expectedLeases := make([]*JobRunLease, 3)
expectedLeases := make([]*JobRunLease, 4)
for i := range expectedLeases {
expectedLeases[i] = &JobRunLease{
RunID: dbRuns[i].RunID,
Queue: dbJobs[i].Queue,
JobSet: dbJobs[i].JobSet,
UserID: dbJobs[i].UserID,
Groups: dbJobs[i].Groups,
SubmitMessage: dbJobs[i].SubmitMessage,
RunID: dbRuns[i].RunID,
Queue: dbJobs[i].Queue,
JobSet: dbJobs[i].JobSet,
UserID: dbJobs[i].UserID,
Groups: dbJobs[i].Groups,
SubmitMessage: dbJobs[i].SubmitMessage,
PodRequirementsOverlay: dbRuns[i].PodRequirementsOverlay,
}
}
tests := map[string]struct {
Expand Down Expand Up @@ -465,12 +486,12 @@ func TestFetchJobRunLeases(t *testing.T) {
excludedRuns: []uuid.UUID{dbRuns[1].RunID},
maxRowsToFetch: 100,
executor: executorName,
expectedLeases: []*JobRunLease{expectedLeases[0], expectedLeases[2]},
expectedLeases: []*JobRunLease{expectedLeases[0], expectedLeases[2], expectedLeases[3]},
},
"exclude everything": {
dbJobs: dbJobs,
dbRuns: dbRuns,
excludedRuns: []uuid.UUID{dbRuns[0].RunID, dbRuns[1].RunID, dbRuns[2].RunID},
excludedRuns: []uuid.UUID{dbRuns[0].RunID, dbRuns[1].RunID, dbRuns[2].RunID, dbRuns[3].RunID},
maxRowsToFetch: 100,
executor: executorName,
expectedLeases: nil,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
-- The pod_requirements_overlay column holds a serialized PodRequirements
-- object that can be used to modify the pod requirements of a job; for
-- example, it is used to add additional tolerations to runs that are scheduled
-- as away jobs.
ALTER TABLE runs ADD COLUMN pod_requirements_overlay bytea;
Loading

0 comments on commit db37556

Please sign in to comment.