From 77f209be88fc107b3031d195498c3c097cd3889d Mon Sep 17 00:00:00 2001 From: jankaspar <2270833+jankaspar@users.noreply.github.com> Date: Wed, 23 Sep 2020 14:47:52 +0100 Subject: [PATCH] Add support for default job limits. (#432) --- internal/armada/configuration/types.go | 1 + internal/armada/repository/job.go | 36 +++++++++++++-- internal/armada/repository/job_test.go | 62 +++++++++++++++++++++++--- internal/armada/server.go | 2 +- internal/armada/server/submit_test.go | 2 +- 5 files changed, 93 insertions(+), 10 deletions(-) diff --git a/internal/armada/configuration/types.go b/internal/armada/configuration/types.go index 88dc88a207e..2168c5c4069 100644 --- a/internal/armada/configuration/types.go +++ b/internal/armada/configuration/types.go @@ -58,6 +58,7 @@ type SchedulingConfig struct { MaximalResourceFractionToSchedulePerQueue map[string]float64 MaximalResourceFractionPerQueue map[string]float64 Lease LeaseSettings + DefaultJobLimits common.ComputeResources } type EventRetentionPolicy struct { diff --git a/internal/armada/repository/job.go b/internal/armada/repository/job.go index e8aa198b1d7..891c103d11a 100644 --- a/internal/armada/repository/job.go +++ b/internal/armada/repository/job.go @@ -8,8 +8,11 @@ import ( "github.com/go-redis/redis" "github.com/gogo/protobuf/proto" log "github.com/sirupsen/logrus" + v1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/resource" "github.com/G-Research/armada/internal/armada/authorization" + "github.com/G-Research/armada/internal/common" "github.com/G-Research/armada/internal/common/util" "github.com/G-Research/armada/internal/common/validation" "github.com/G-Research/armada/pkg/api" @@ -42,11 +45,15 @@ type JobRepository interface { } type RedisJobRepository struct { - db redis.UniversalClient + db redis.UniversalClient + defaultJobLimits common.ComputeResources } -func NewRedisJobRepository(db redis.UniversalClient) *RedisJobRepository { - return &RedisJobRepository{db: db} +func NewRedisJobRepository(db redis.UniversalClient, defaultJobLimits common.ComputeResources) *RedisJobRepository { + if defaultJobLimits == nil { + defaultJobLimits = common.ComputeResources{} + } + return &RedisJobRepository{db: db, defaultJobLimits: defaultJobLimits} } func (repo *RedisJobRepository) CreateJobs(request *api.JobSubmitRequest, principal authorization.Principal) ([]*api.Job, error) { @@ -62,6 +69,7 @@ func (repo *RedisJobRepository) CreateJobs(request *api.JobSubmitRequest, princi for i, item := range request.JobRequestItems { + repo.applyDefaults(item.PodSpec) e := validation.ValidatePodSpec(item.PodSpec) if e != nil { return nil, fmt.Errorf("error validating pod spec of job with index %v: %v", i, e) @@ -550,6 +558,28 @@ func (repo *RedisJobRepository) leaseJobs(clusterId string, jobs []*api.Job) ([] return leasedJobs, nil } +func (repo *RedisJobRepository) applyDefaults(spec *v1.PodSpec) { + if spec != nil { + for i := range spec.Containers { + c := &spec.Containers[i] + if c.Resources.Limits == nil { + c.Resources.Limits = map[v1.ResourceName]resource.Quantity{} + } + if c.Resources.Requests == nil { + c.Resources.Requests = map[v1.ResourceName]resource.Quantity{} + } + for k, v := range repo.defaultJobLimits { + _, limitExists := c.Resources.Limits[v1.ResourceName(k)] + _, requestExists := c.Resources.Limits[v1.ResourceName(k)] + if !limitExists && !requestExists { + c.Resources.Requests[v1.ResourceName(k)] = v + c.Resources.Limits[v1.ResourceName(k)] = v + } + } + } + } +} + func leaseJob(db redis.Cmdable, queueName string, clusterId string, jobId string, now time.Time) *redis.Cmd { return leaseJobScript.Run(db, []string{jobQueuePrefix + queueName, jobLeasedPrefix + queueName, jobClusterMapKey}, clusterId, jobId, float64(now.UnixNano())) diff --git a/internal/armada/repository/job_test.go b/internal/armada/repository/job_test.go index 106908d7d79..9bc901e4b85 100644 --- a/internal/armada/repository/job_test.go +++ b/internal/armada/repository/job_test.go @@ -10,6 +10,7 @@ import ( "k8s.io/apimachinery/pkg/api/resource" "github.com/G-Research/armada/internal/armada/authorization" + "github.com/G-Research/armada/internal/common" "github.com/G-Research/armada/pkg/api" ) @@ -229,6 +230,49 @@ func TestGetQueueActiveJobSets(t *testing.T) { }) } +func TestCreateJob_ApplyDefaultLimitss(t *testing.T) { + defaults := common.ComputeResources{ + "cpu": resource.MustParse("1"), + "memory": resource.MustParse("512Mi"), + "ephemeral-storage": resource.MustParse("4Gi")} + + withRepositoryUsingJobDefaults(defaults, func(r *RedisJobRepository) { + testCases := map[*v1.ResourceList]v1.ResourceList{ + nil: { + "cpu": resource.MustParse("1"), + "memory": resource.MustParse("512Mi"), + "ephemeral-storage": resource.MustParse("4Gi"), + }, + { + "cpu": resource.MustParse("2"), + }: { + "cpu": resource.MustParse("2"), + "memory": resource.MustParse("512Mi"), + "ephemeral-storage": resource.MustParse("4Gi"), + }, + { + "nvidia/gpu": resource.MustParse("3"), + }: { + "cpu": resource.MustParse("1"), + "memory": resource.MustParse("512Mi"), + "ephemeral-storage": resource.MustParse("4Gi"), + "nvidia/gpu": resource.MustParse("3"), + }, + } + + for requirements, expected := range testCases { + resources := v1.ResourceRequirements{} + if requirements != nil { + resources.Requests = *requirements + resources.Limits = *requirements + } + job := addTestJobWithRequirements(t, r, "test", resources) + assert.Equal(t, expected, job.PodSpec.Containers[0].Resources.Limits) + assert.Equal(t, expected, job.PodSpec.Containers[0].Resources.Requests) + } + }) +} + func addLeasedJob(t *testing.T, r *RedisJobRepository, queue string, cluster string) *api.Job { job := addTestJob(t, r, queue) leased, e := r.TryLeaseJobs(cluster, queue, []*api.Job{job}) @@ -242,6 +286,14 @@ func addTestJob(t *testing.T, r *RedisJobRepository, queue string) *api.Job { cpu := resource.MustParse("1") memory := resource.MustParse("512Mi") + return addTestJobWithRequirements(t, r, queue, v1.ResourceRequirements{ + Limits: v1.ResourceList{"cpu": cpu, "memory": memory}, + Requests: v1.ResourceList{"cpu": cpu, "memory": memory}, + }) +} + +func addTestJobWithRequirements(t *testing.T, r *RedisJobRepository, queue string, requirements v1.ResourceRequirements) *api.Job { + jobs, e := r.CreateJobs(&api.JobSubmitRequest{ Queue: queue, JobSetId: "set1", @@ -251,10 +303,7 @@ func addTestJob(t *testing.T, r *RedisJobRepository, queue string) *api.Job { PodSpec: &v1.PodSpec{ Containers: []v1.Container{ { - Resources: v1.ResourceRequirements{ - Limits: v1.ResourceList{"cpu": cpu, "memory": memory}, - Requests: v1.ResourceList{"cpu": cpu, "memory": memory}, - }, + Resources: requirements, }, }, }, @@ -272,13 +321,16 @@ func addTestJob(t *testing.T, r *RedisJobRepository, queue string) *api.Job { } func withRepository(action func(r *RedisJobRepository)) { + withRepositoryUsingJobDefaults(nil, action) +} +func withRepositoryUsingJobDefaults(jobDefaultLimit common.ComputeResources, action func(r *RedisJobRepository)) { client := redis.NewClient(&redis.Options{Addr: "localhost:6379", DB: 10}) defer client.FlushDB() defer client.Close() client.FlushDB() - repo := NewRedisJobRepository(client) + repo := NewRedisJobRepository(client, jobDefaultLimit) action(repo) } diff --git a/internal/armada/server.go b/internal/armada/server.go index c186c3cac01..987ec9e696a 100644 --- a/internal/armada/server.go +++ b/internal/armada/server.go @@ -39,7 +39,7 @@ func Serve(config *configuration.ArmadaConfig) (func(), *sync.WaitGroup) { db := createRedisClient(&config.Redis) eventsDb := createRedisClient(&config.EventsRedis) - jobRepository := repository.NewRedisJobRepository(db) + jobRepository := repository.NewRedisJobRepository(db, config.Scheduling.DefaultJobLimits) usageRepository := repository.NewRedisUsageRepository(db) queueRepository := repository.NewRedisQueueRepository(db) schedulingInfoRepository := repository.NewRedisSchedulingInfoRepository(db) diff --git a/internal/armada/server/submit_test.go b/internal/armada/server/submit_test.go index f7e2b1c78be..d3f6d8b445c 100644 --- a/internal/armada/server/submit_test.go +++ b/internal/armada/server/submit_test.go @@ -156,7 +156,7 @@ func withSubmitServer(action func(s *SubmitServer, events repository.EventReposi // using real redis instance as miniredis does not support streams client := redis.NewClient(&redis.Options{Addr: "localhost:6379", DB: 10}) - jobRepo := repository.NewRedisJobRepository(client) + jobRepo := repository.NewRedisJobRepository(client, nil) queueRepo := repository.NewRedisQueueRepository(client) eventRepo := repository.NewRedisEventRepository(client, configuration.EventRetentionPolicy{ExpiryEnabled: false}) schedulingInfoRepository := repository.NewRedisSchedulingInfoRepository(client)