Add support for targeting node labels (#283)
* Add support for targeting node labels
fixes #280

* Add test for label matching.
jankaspar authored Dec 17, 2019
1 parent 1365f1c commit e642796
Showing 16 changed files with 949 additions and 138 deletions.
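
For orientation, here is a minimal sketch of a submission using the new field: a JobSubmitRequestItem that should only be leased to clusters advertising nodes labeled armada/region=eu and armada/zone=1. The RequiredNodeLabels, Priority, and PodSpec fields come from submit.proto in this commit; the example function, label values, and container spec are illustrative assumptions, and the enclosing JobSubmitRequest (queue, job set id) is omitted.

package example

import (
	v1 "k8s.io/api/core/v1"

	"github.com/G-Research/armada/internal/armada/api"
)

// exampleSubmitItem builds a request item that should only be leased to clusters
// that advertise nodes carrying both of the required labels.
// Sketch only: label keys/values and the container spec are assumptions.
func exampleSubmitItem() *api.JobSubmitRequestItem {
	return &api.JobSubmitRequestItem{
		Priority: 1,
		RequiredNodeLabels: map[string]string{
			"armada/region": "eu",
			"armada/zone":   "1",
		},
		PodSpec: &v1.PodSpec{
			Containers: []v1.Container{{
				Name:    "sleep",
				Image:   "alpine:3.10",
				Command: []string{"sleep", "60"},
			}},
		},
	}
}
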
6 changes: 6 additions & 0 deletions client/DotNet/Armada.Client/ClientGenerated.cs
@@ -531,6 +531,9 @@ public partial class ApiJob
[Newtonsoft.Json.JsonProperty("Queue", Required = Newtonsoft.Json.Required.Default, NullValueHandling = Newtonsoft.Json.NullValueHandling.Ignore)]
public string Queue { get; set; }

[Newtonsoft.Json.JsonProperty("RequiredNodeLabels", Required = Newtonsoft.Json.Required.Default, NullValueHandling = Newtonsoft.Json.NullValueHandling.Ignore)]
public System.Collections.Generic.IDictionary<string, string> RequiredNodeLabels { get; set; }


}

@@ -801,6 +804,9 @@ public partial class ApiJobSubmitRequestItem
[Newtonsoft.Json.JsonProperty("Priority", Required = Newtonsoft.Json.Required.Default, NullValueHandling = Newtonsoft.Json.NullValueHandling.Ignore)]
public double? Priority { get; set; }

[Newtonsoft.Json.JsonProperty("RequiredNodeLabels", Required = Newtonsoft.Json.Required.Default, NullValueHandling = Newtonsoft.Json.NullValueHandling.Ignore)]
public System.Collections.Generic.IDictionary<string, string> RequiredNodeLabels { get; set; }


}

12 changes: 12 additions & 0 deletions internal/armada/api/api.swagger.go
@@ -252,6 +252,12 @@ func SwaggerJsonTemplate() string {
" },\n" +
" \"Queue\": {\n" +
" \"type\": \"string\"\n" +
" },\n" +
" \"RequiredNodeLabels\": {\n" +
" \"type\": \"object\",\n" +
" \"additionalProperties\": {\n" +
" \"type\": \"string\"\n" +
" }\n" +
" }\n" +
" }\n" +
" },\n" +
@@ -536,6 +542,12 @@ func SwaggerJsonTemplate() string {
" \"Priority\": {\n" +
" \"type\": \"number\",\n" +
" \"format\": \"double\"\n" +
" },\n" +
" \"RequiredNodeLabels\": {\n" +
" \"type\": \"object\",\n" +
" \"additionalProperties\": {\n" +
" \"type\": \"string\"\n" +
" }\n" +
" }\n" +
" }\n" +
" },\n" +
12 changes: 12 additions & 0 deletions internal/armada/api/api.swagger.json
@@ -241,6 +241,12 @@
},
"Queue": {
"type": "string"
},
"RequiredNodeLabels": {
"type": "object",
"additionalProperties": {
"type": "string"
}
}
}
},
@@ -525,6 +531,12 @@
"Priority": {
"type": "number",
"format": "double"
},
"RequiredNodeLabels": {
"type": "object",
"additionalProperties": {
"type": "string"
}
}
}
},
627 changes: 566 additions & 61 deletions internal/armada/api/queue.pb.go

Large diffs are not rendered by default.

6 changes: 6 additions & 0 deletions internal/armada/api/queue.proto
@@ -15,6 +15,7 @@ message Job {
string Namespace = 7;
map<string, string> Labels = 9;
map<string, string> Annotations = 10;
map<string, string> RequiredNodeLabels = 11;
string Owner = 8;
double Priority = 4;
k8s.io.api.core.v1.PodSpec PodSpec = 5;
@@ -24,6 +25,11 @@ message Job {
message LeaseRequest {
string ClusterId = 1;
map<string, k8s.io.apimachinery.pkg.api.resource.Quantity> Resources = 2 [(gogoproto.nullable) = false];
repeated NodeLabeling AvailableLabels = 3;
}

message NodeLabeling {
map<string,string> Labels = 3;
}

message JobLease {
261 changes: 212 additions & 49 deletions internal/armada/api/submit.pb.go

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions internal/armada/api/submit.proto
@@ -11,6 +11,7 @@ message JobSubmitRequestItem {
string Namespace = 3;
map<string, string> Labels = 4;
map<string, string> Annotations = 5;
map<string, string> RequiredNodeLabels = 6;
k8s.io.api.core.v1.PodSpec PodSpec = 2;
}

2 changes: 2 additions & 0 deletions internal/armada/repository/job.go
@@ -63,6 +63,8 @@ func (repo *RedisJobRepository) CreateJobs(request *api.JobSubmitRequest, princi
Labels: item.Labels,
Annotations: item.Annotations,

RequiredNodeLabels: item.RequiredNodeLabels,

Priority: item.Priority,

PodSpec: item.PodSpec,
37 changes: 27 additions & 10 deletions internal/armada/server/lease.go
@@ -98,14 +98,14 @@ func (q AggregatedQueueServer) LeaseJobs(ctx context.Context, request *api.Lease

jobs := []*api.Job{}
if !q.schedulingConfig.UseProbabilisticSchedulingForAllResources {
jobs, e = q.assignJobs(request.ClusterId, slices)
jobs, e = q.assignJobs(request, slices)
if e != nil {
log.Errorf("Error when leasing jobs for cluster %s: %s", request.ClusterId, e)
return nil, e
}
}

additionalJobs, e := q.distributeRemainder(scarcity, request.ClusterId, activeQueuePriority, slices)
additionalJobs, e := q.distributeRemainder(request, scarcity, activeQueuePriority, slices)
if e != nil {
log.Errorf("Error when leasing jobs for cluster %s: %s", request.ClusterId, e)
return nil, e
@@ -146,11 +146,11 @@ func (q *AggregatedQueueServer) ReportDone(ctx context.Context, idList *api.IdLi
return &api.IdList{cleaned}, e
}

func (q *AggregatedQueueServer) assignJobs(clusterId string, slices map[*api.Queue]common.ComputeResourcesFloat) ([]*api.Job, error) {
func (q *AggregatedQueueServer) assignJobs(request *api.LeaseRequest, slices map[*api.Queue]common.ComputeResourcesFloat) ([]*api.Job, error) {
jobs := make([]*api.Job, 0)
// TODO: parallelize
for queue, slice := range slices {
leased, remainder, e := q.leaseJobs(clusterId, queue, slice, -1)
leased, remainder, e := q.leaseJobs(request, queue, slice, -1)
if e != nil {
log.Error(e)
continue
@@ -161,7 +161,7 @@ func (q *AggregatedQueueServer) assignJobs(clusterId string, slices map[*api.Que
return jobs, nil
}

func (q *AggregatedQueueServer) distributeRemainder(resourceScarcity map[string]float64, clusterId string, priorities map[*api.Queue]scheduling.QueuePriorityInfo, slices map[*api.Queue]common.ComputeResourcesFloat) ([]*api.Job, error) {
func (q *AggregatedQueueServer) distributeRemainder(request *api.LeaseRequest, resourceScarcity map[string]float64, priorities map[*api.Queue]scheduling.QueuePriorityInfo, slices map[*api.Queue]common.ComputeResourcesFloat) ([]*api.Job, error) {
jobs := []*api.Job{}
remainder := common.ComputeResourcesFloat{}
shares := map[*api.Queue]float64{}
@@ -177,7 +177,7 @@ func (q *AggregatedQueueServer) distributeRemainder(resourceScarcity map[string]
queue := q.pickQueueRandomly(shares)
emptySteps++

leased, remaining, e := q.leaseJobs(clusterId, queue, remainder, 1)
leased, remaining, e := q.leaseJobs(request, queue, remainder, 1)
if e != nil {
log.Error(e)
continue
@@ -215,7 +215,7 @@ func (q *AggregatedQueueServer) pickQueueRandomly(shares map[*api.Queue]float64)
return lastQueue
}

func (q *AggregatedQueueServer) leaseJobs(clusterId string, queue *api.Queue, slice common.ComputeResourcesFloat, limit int) ([]*api.Job, common.ComputeResourcesFloat, error) {
func (q *AggregatedQueueServer) leaseJobs(request *api.LeaseRequest, queue *api.Queue, slice common.ComputeResourcesFloat, limit int) ([]*api.Job, common.ComputeResourcesFloat, error) {
jobs := make([]*api.Job, 0)
remainder := slice
for slice.IsValid() {
@@ -230,7 +230,7 @@ func (q *AggregatedQueueServer) leaseJobs(clusterId string, queue *api.Queue, sl
requirement := common.TotalResourceRequest(job.PodSpec).AsFloat()
remainder = slice.DeepCopy()
remainder.Sub(requirement)
if remainder.IsValid() {
if remainder.IsValid() && matchRequirements(job, request) {
slice = remainder
candidates = append(candidates, job)
}
@@ -239,7 +239,7 @@ func (q *AggregatedQueueServer) leaseJobs(clusterId string, queue *api.Queue, sl
}
}

leased, e := q.jobRepository.TryLeaseJobs(clusterId, queue.Name, candidates)
leased, e := q.jobRepository.TryLeaseJobs(request.ClusterId, queue.Name, candidates)
if e != nil {
return nil, slice, e
}
@@ -256,11 +256,28 @@ func (q *AggregatedQueueServer) leaseJobs(clusterId string, queue *api.Queue, sl
}
}

go reportJobsLeased(q.eventRepository, jobs, clusterId)
go reportJobsLeased(q.eventRepository, jobs, request.ClusterId)

return jobs, slice, nil
}

func matchRequirements(job *api.Job, request *api.LeaseRequest) bool {
if len(job.RequiredNodeLabels) == 0 {
return true
}

Labels:
for _, labeling := range request.AvailableLabels {
for k, v := range job.RequiredNodeLabels {
if labeling.Labels[k] != v {
continue Labels
}
}
return true
}
return false
}

func filterPriorityMapByKeys(original map[*api.Queue]scheduling.QueuePriorityInfo, keys []*api.Queue) map[*api.Queue]scheduling.QueuePriorityInfo {
result := make(map[*api.Queue]scheduling.QueuePriorityInfo)
for _, key := range keys {
28 changes: 28 additions & 0 deletions internal/armada/server/lease_test.go
@@ -0,0 +1,28 @@
package server

import (
"testing"

"github.com/stretchr/testify/assert"

"github.com/G-Research/armada/internal/armada/api"
)

func Test_matchRequirements(t *testing.T) {

job := &api.Job{RequiredNodeLabels: map[string]string{"armada/region": "eu", "armada/zone": "1"}}

assert.False(t, matchRequirements(job, &api.LeaseRequest{}))
assert.False(t, matchRequirements(job, &api.LeaseRequest{AvailableLabels: []*api.NodeLabeling{
{Labels: map[string]string{"armada/region": "eu"}},
{Labels: map[string]string{"armada/zone": "2"}},
}}))
assert.False(t, matchRequirements(job, &api.LeaseRequest{AvailableLabels: []*api.NodeLabeling{
{Labels: map[string]string{"armada/region": "eu", "armada/zone": "2"}},
}}))

assert.True(t, matchRequirements(job, &api.LeaseRequest{AvailableLabels: []*api.NodeLabeling{
{Labels: map[string]string{"x": "y"}},
{Labels: map[string]string{"armada/region": "eu", "armada/zone": "1", "x": "y"}},
}}))
}
3 changes: 2 additions & 1 deletion internal/executor/application.go
@@ -56,7 +56,8 @@ func StartUp(config configuration.ExecutorConfiguration) (func(), *sync.WaitGrou

clusterUtilisationService := service.NewClusterUtilisationService(
clusterContext,
usageClient)
usageClient,
config.Kubernetes.TrackedNodeLabels)

stuckPodDetector := service.NewPodProgressMonitorService(
clusterContext,
3 changes: 2 additions & 1 deletion internal/executor/configuration/types.go
@@ -11,7 +11,8 @@ type ApplicationConfiguration struct {
}

type KubernetesConfiguration struct {
ImpersonateUsers bool
ImpersonateUsers bool
TrackedNodeLabels []string
}

type TaskConfiguration struct {
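
The new TrackedNodeLabels setting controls which node label keys the executor collects and reports back to the server. A hedged example of populating it; the label keys mirror the ones used in this commit's tests, and the executor's actual config file format is not shown in this diff.

package example

import "github.com/G-Research/armada/internal/executor/configuration"

// Only label keys listed in TrackedNodeLabels are read from nodes and advertised
// to the server as available labels. Values here are illustrative assumptions.
var exampleKubernetesConfig = configuration.KubernetesConfiguration{
	ImpersonateUsers:  false,
	TrackedNodeLabels: []string{"armada/region", "armada/zone"},
}
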
4 changes: 2 additions & 2 deletions internal/executor/service/cluster_allocation.go
@@ -40,13 +40,13 @@ func NewClusterAllocationService(
}

func (allocationService *ClusterAllocationService) AllocateSpareClusterCapacity() {
availableResource, err := allocationService.utilisationService.GetAvailableClusterCapacity()
availableResource, availableLabels, err := allocationService.utilisationService.GetAvailableClusterCapacity()
if err != nil {
log.Errorf("Failed to allocate spare cluster capacity because %s", err)
return
}

newJobs, err := allocationService.leaseService.RequestJobLeases(availableResource)
newJobs, err := allocationService.leaseService.RequestJobLeases(availableResource, availableLabels)

cpu := (*availableResource)["cpu"]
memory := (*availableResource)["memory"]
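
The lease-service side that forwards availableLabels in the LeaseRequest is not shown in this view. As a rough sketch, and an assumption rather than the commit's actual code, the []map[string]string returned by GetAvailableClusterCapacity could be wrapped into the NodeLabeling message defined in queue.proto above:

package example

import "github.com/G-Research/armada/internal/armada/api"

// toNodeLabeling wraps each distinct node label set in the NodeLabeling message,
// ready to be set as LeaseRequest.AvailableLabels alongside ClusterId and Resources.
// Hypothetical helper for illustration; the commit's real conversion code is not shown here.
func toNodeLabeling(availableLabels []map[string]string) []*api.NodeLabeling {
	result := make([]*api.NodeLabeling, 0, len(availableLabels))
	for _, labels := range availableLabels {
		result = append(result, &api.NodeLabeling{Labels: labels})
	}
	return result
}
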
46 changes: 36 additions & 10 deletions internal/executor/service/cluster_utilisation.go
@@ -15,21 +15,24 @@ import (
)

type UtilisationService interface {
GetAvailableClusterCapacity() (*common.ComputeResources, error)
GetAvailableClusterCapacity() (*common.ComputeResources, []map[string]string, error)
}

type ClusterUtilisationService struct {
clusterContext context.ClusterContext
usageClient api.UsageClient
clusterContext context.ClusterContext
usageClient api.UsageClient
trackedNodeLabels []string
}

func NewClusterUtilisationService(
clusterContext context.ClusterContext,
usageClient api.UsageClient) *ClusterUtilisationService {
usageClient api.UsageClient,
trackedNodeLabels []string) *ClusterUtilisationService {

return &ClusterUtilisationService{
clusterContext: clusterContext,
usageClient: usageClient}
clusterContext: clusterContext,
usageClient: usageClient,
trackedNodeLabels: trackedNodeLabels}
}

func (clusterUtilisationService *ClusterUtilisationService) ReportClusterUtilisation() {
@@ -63,15 +66,15 @@ func (clusterUtilisationService *ClusterUtilisationService) ReportClusterUtilisa
}
}

func (clusterUtilisationService *ClusterUtilisationService) GetAvailableClusterCapacity() (*common.ComputeResources, error) {
func (clusterUtilisationService *ClusterUtilisationService) GetAvailableClusterCapacity() (*common.ComputeResources, []map[string]string, error) {
processingNodes, err := clusterUtilisationService.getAllAvailableProcessingNodes()
if err != nil {
return new(common.ComputeResources), fmt.Errorf("Failed getting available cluster capacity due to: %s", err)
return new(common.ComputeResources), nil, fmt.Errorf("Failed getting available cluster capacity due to: %s", err)
}

allPods, err := clusterUtilisationService.clusterContext.GetAllPods()
if err != nil {
return new(common.ComputeResources), fmt.Errorf("Failed getting available cluster capacity due to: %s", err)
return new(common.ComputeResources), nil, fmt.Errorf("Failed getting available cluster capacity due to: %s", err)
}

allPodsRequiringResource := getAllPodsRequiringResourceOnProcessingNodes(allPods, processingNodes)
@@ -83,7 +86,9 @@ func (clusterUtilisationService *ClusterUtilisationService) GetAvailableClusterC
availableResource := totalNodeResource.DeepCopy()
availableResource.Sub(totalPodResource)

return &availableResource, nil
availableLabels := getDistinctNodesLabels(clusterUtilisationService.trackedNodeLabels, processingNodes)

return &availableResource, availableLabels, nil
}

func (clusterUtilisationService *ClusterUtilisationService) getAllAvailableProcessingNodes() ([]*v1.Node, error) {
Expand Down Expand Up @@ -200,3 +205,24 @@ func getUsageByQueue(pods []*v1.Pod) map[string]common.ComputeResources {

return utilisationByQueue
}

func getDistinctNodesLabels(labels []string, nodes []*v1.Node) []map[string]string {
result := []map[string]string{}
existing := map[string]bool{}
for _, n := range nodes {
selectedLabels := map[string]string{}
id := ""
for _, key := range labels {
value, ok := n.Labels[key]
if ok {
selectedLabels[key] = value
}
id += "|" + value
}
if !existing[id] {
result = append(result, selectedLabels)
existing[id] = true
}
}
return result
}
25 changes: 25 additions & 0 deletions internal/executor/service/cluster_utilisation_test.go
@@ -216,6 +216,31 @@ func TestGetUsageByQueue_HandlesEmptyList(t *testing.T) {
assert.Equal(t, len(result), 0)
}

func Test_getDistinctNodesLabels(t *testing.T) {

nodes := []*v1.Node{
{ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{
"A": "x",
"B": "x",
}}},
{ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{
"A": "x",
"B": "x",
}}},
{ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{
"B": "y",
}}},
}
labels := []string{"A", "B"}

result := getDistinctNodesLabels(labels, nodes)

assert.Equal(t, []map[string]string{
{"A": "x", "B": "x"},
{"B": "y"},
}, result)
}

func hasKey(value map[string]common.ComputeResources, key string) bool {
_, ok := value[key]
return ok