Skip to content

Commit

Permalink
Add support for default job limits. (#432)
Browse files Browse the repository at this point in the history
  • Loading branch information
jankaspar committed Sep 23, 2020
1 parent 29bcedd commit 77f209b
Show file tree
Hide file tree
Showing 5 changed files with 93 additions and 10 deletions.
1 change: 1 addition & 0 deletions internal/armada/configuration/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ type SchedulingConfig struct {
MaximalResourceFractionToSchedulePerQueue map[string]float64
MaximalResourceFractionPerQueue map[string]float64
Lease LeaseSettings
DefaultJobLimits common.ComputeResources
}

type EventRetentionPolicy struct {
Expand Down
36 changes: 33 additions & 3 deletions internal/armada/repository/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,11 @@ import (
"github.com/go-redis/redis"
"github.com/gogo/protobuf/proto"
log "github.com/sirupsen/logrus"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"

"github.com/G-Research/armada/internal/armada/authorization"
"github.com/G-Research/armada/internal/common"
"github.com/G-Research/armada/internal/common/util"
"github.com/G-Research/armada/internal/common/validation"
"github.com/G-Research/armada/pkg/api"
Expand Down Expand Up @@ -42,11 +45,15 @@ type JobRepository interface {
}

type RedisJobRepository struct {
db redis.UniversalClient
db redis.UniversalClient
defaultJobLimits common.ComputeResources
}

func NewRedisJobRepository(db redis.UniversalClient) *RedisJobRepository {
return &RedisJobRepository{db: db}
func NewRedisJobRepository(db redis.UniversalClient, defaultJobLimits common.ComputeResources) *RedisJobRepository {
if defaultJobLimits == nil {
defaultJobLimits = common.ComputeResources{}
}
return &RedisJobRepository{db: db, defaultJobLimits: defaultJobLimits}
}

func (repo *RedisJobRepository) CreateJobs(request *api.JobSubmitRequest, principal authorization.Principal) ([]*api.Job, error) {
Expand All @@ -62,6 +69,7 @@ func (repo *RedisJobRepository) CreateJobs(request *api.JobSubmitRequest, princi

for i, item := range request.JobRequestItems {

repo.applyDefaults(item.PodSpec)
e := validation.ValidatePodSpec(item.PodSpec)
if e != nil {
return nil, fmt.Errorf("error validating pod spec of job with index %v: %v", i, e)
Expand Down Expand Up @@ -550,6 +558,28 @@ func (repo *RedisJobRepository) leaseJobs(clusterId string, jobs []*api.Job) ([]
return leasedJobs, nil
}

func (repo *RedisJobRepository) applyDefaults(spec *v1.PodSpec) {
if spec != nil {
for i := range spec.Containers {
c := &spec.Containers[i]
if c.Resources.Limits == nil {
c.Resources.Limits = map[v1.ResourceName]resource.Quantity{}
}
if c.Resources.Requests == nil {
c.Resources.Requests = map[v1.ResourceName]resource.Quantity{}
}
for k, v := range repo.defaultJobLimits {
_, limitExists := c.Resources.Limits[v1.ResourceName(k)]
_, requestExists := c.Resources.Limits[v1.ResourceName(k)]
if !limitExists && !requestExists {
c.Resources.Requests[v1.ResourceName(k)] = v
c.Resources.Limits[v1.ResourceName(k)] = v
}
}
}
}
}

func leaseJob(db redis.Cmdable, queueName string, clusterId string, jobId string, now time.Time) *redis.Cmd {
return leaseJobScript.Run(db, []string{jobQueuePrefix + queueName, jobLeasedPrefix + queueName, jobClusterMapKey},
clusterId, jobId, float64(now.UnixNano()))
Expand Down
62 changes: 57 additions & 5 deletions internal/armada/repository/job_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"k8s.io/apimachinery/pkg/api/resource"

"github.com/G-Research/armada/internal/armada/authorization"
"github.com/G-Research/armada/internal/common"
"github.com/G-Research/armada/pkg/api"
)

Expand Down Expand Up @@ -229,6 +230,49 @@ func TestGetQueueActiveJobSets(t *testing.T) {
})
}

func TestCreateJob_ApplyDefaultLimitss(t *testing.T) {
defaults := common.ComputeResources{
"cpu": resource.MustParse("1"),
"memory": resource.MustParse("512Mi"),
"ephemeral-storage": resource.MustParse("4Gi")}

withRepositoryUsingJobDefaults(defaults, func(r *RedisJobRepository) {
testCases := map[*v1.ResourceList]v1.ResourceList{
nil: {
"cpu": resource.MustParse("1"),
"memory": resource.MustParse("512Mi"),
"ephemeral-storage": resource.MustParse("4Gi"),
},
{
"cpu": resource.MustParse("2"),
}: {
"cpu": resource.MustParse("2"),
"memory": resource.MustParse("512Mi"),
"ephemeral-storage": resource.MustParse("4Gi"),
},
{
"nvidia/gpu": resource.MustParse("3"),
}: {
"cpu": resource.MustParse("1"),
"memory": resource.MustParse("512Mi"),
"ephemeral-storage": resource.MustParse("4Gi"),
"nvidia/gpu": resource.MustParse("3"),
},
}

for requirements, expected := range testCases {
resources := v1.ResourceRequirements{}
if requirements != nil {
resources.Requests = *requirements
resources.Limits = *requirements
}
job := addTestJobWithRequirements(t, r, "test", resources)
assert.Equal(t, expected, job.PodSpec.Containers[0].Resources.Limits)
assert.Equal(t, expected, job.PodSpec.Containers[0].Resources.Requests)
}
})
}

func addLeasedJob(t *testing.T, r *RedisJobRepository, queue string, cluster string) *api.Job {
job := addTestJob(t, r, queue)
leased, e := r.TryLeaseJobs(cluster, queue, []*api.Job{job})
Expand All @@ -242,6 +286,14 @@ func addTestJob(t *testing.T, r *RedisJobRepository, queue string) *api.Job {
cpu := resource.MustParse("1")
memory := resource.MustParse("512Mi")

return addTestJobWithRequirements(t, r, queue, v1.ResourceRequirements{
Limits: v1.ResourceList{"cpu": cpu, "memory": memory},
Requests: v1.ResourceList{"cpu": cpu, "memory": memory},
})
}

func addTestJobWithRequirements(t *testing.T, r *RedisJobRepository, queue string, requirements v1.ResourceRequirements) *api.Job {

jobs, e := r.CreateJobs(&api.JobSubmitRequest{
Queue: queue,
JobSetId: "set1",
Expand All @@ -251,10 +303,7 @@ func addTestJob(t *testing.T, r *RedisJobRepository, queue string) *api.Job {
PodSpec: &v1.PodSpec{
Containers: []v1.Container{
{
Resources: v1.ResourceRequirements{
Limits: v1.ResourceList{"cpu": cpu, "memory": memory},
Requests: v1.ResourceList{"cpu": cpu, "memory": memory},
},
Resources: requirements,
},
},
},
Expand All @@ -272,13 +321,16 @@ func addTestJob(t *testing.T, r *RedisJobRepository, queue string) *api.Job {
}

func withRepository(action func(r *RedisJobRepository)) {
withRepositoryUsingJobDefaults(nil, action)
}

func withRepositoryUsingJobDefaults(jobDefaultLimit common.ComputeResources, action func(r *RedisJobRepository)) {
client := redis.NewClient(&redis.Options{Addr: "localhost:6379", DB: 10})
defer client.FlushDB()
defer client.Close()

client.FlushDB()

repo := NewRedisJobRepository(client)
repo := NewRedisJobRepository(client, jobDefaultLimit)
action(repo)
}
2 changes: 1 addition & 1 deletion internal/armada/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func Serve(config *configuration.ArmadaConfig) (func(), *sync.WaitGroup) {
db := createRedisClient(&config.Redis)
eventsDb := createRedisClient(&config.EventsRedis)

jobRepository := repository.NewRedisJobRepository(db)
jobRepository := repository.NewRedisJobRepository(db, config.Scheduling.DefaultJobLimits)
usageRepository := repository.NewRedisUsageRepository(db)
queueRepository := repository.NewRedisQueueRepository(db)
schedulingInfoRepository := repository.NewRedisSchedulingInfoRepository(db)
Expand Down
2 changes: 1 addition & 1 deletion internal/armada/server/submit_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ func withSubmitServer(action func(s *SubmitServer, events repository.EventReposi
// using real redis instance as miniredis does not support streams
client := redis.NewClient(&redis.Options{Addr: "localhost:6379", DB: 10})

jobRepo := repository.NewRedisJobRepository(client)
jobRepo := repository.NewRedisJobRepository(client, nil)
queueRepo := repository.NewRedisQueueRepository(client)
eventRepo := repository.NewRedisEventRepository(client, configuration.EventRetentionPolicy{ExpiryEnabled: false})
schedulingInfoRepository := repository.NewRedisSchedulingInfoRepository(client)
Expand Down

0 comments on commit 77f209b

Please sign in to comment.