From e6427964ba28ac87bbf479bb8da0c2eaa4155ab6 Mon Sep 17 00:00:00 2001 From: jankaspar <2270833+jankaspar@users.noreply.github.com> Date: Tue, 17 Dec 2019 16:05:33 +0000 Subject: [PATCH] Add support for targeting node labels (#283) * Add support for targetting node labels fixes #280 * Add test for label matching. --- .../DotNet/Armada.Client/ClientGenerated.cs | 6 + internal/armada/api/api.swagger.go | 12 + internal/armada/api/api.swagger.json | 12 + internal/armada/api/queue.pb.go | 627 ++++++++++++++++-- internal/armada/api/queue.proto | 6 + internal/armada/api/submit.pb.go | 261 ++++++-- internal/armada/api/submit.proto | 1 + internal/armada/repository/job.go | 2 + internal/armada/server/lease.go | 37 +- internal/armada/server/lease_test.go | 28 + internal/executor/application.go | 3 +- internal/executor/configuration/types.go | 3 +- .../executor/service/cluster_allocation.go | 4 +- .../executor/service/cluster_utilisation.go | 46 +- .../service/cluster_utilisation_test.go | 25 + internal/executor/service/job_lease.go | 14 +- 16 files changed, 949 insertions(+), 138 deletions(-) create mode 100644 internal/armada/server/lease_test.go diff --git a/client/DotNet/Armada.Client/ClientGenerated.cs b/client/DotNet/Armada.Client/ClientGenerated.cs index 06f93996920..a95c1a1ba54 100644 --- a/client/DotNet/Armada.Client/ClientGenerated.cs +++ b/client/DotNet/Armada.Client/ClientGenerated.cs @@ -531,6 +531,9 @@ public partial class ApiJob [Newtonsoft.Json.JsonProperty("Queue", Required = Newtonsoft.Json.Required.Default, NullValueHandling = Newtonsoft.Json.NullValueHandling.Ignore)] public string Queue { get; set; } + [Newtonsoft.Json.JsonProperty("RequiredNodeLabels", Required = Newtonsoft.Json.Required.Default, NullValueHandling = Newtonsoft.Json.NullValueHandling.Ignore)] + public System.Collections.Generic.IDictionary RequiredNodeLabels { get; set; } + } @@ -801,6 +804,9 @@ public partial class ApiJobSubmitRequestItem [Newtonsoft.Json.JsonProperty("Priority", Required = Newtonsoft.Json.Required.Default, NullValueHandling = Newtonsoft.Json.NullValueHandling.Ignore)] public double? Priority { get; set; } + [Newtonsoft.Json.JsonProperty("RequiredNodeLabels", Required = Newtonsoft.Json.Required.Default, NullValueHandling = Newtonsoft.Json.NullValueHandling.Ignore)] + public System.Collections.Generic.IDictionary RequiredNodeLabels { get; set; } + } diff --git a/internal/armada/api/api.swagger.go b/internal/armada/api/api.swagger.go index e967b443485..250019922c9 100644 --- a/internal/armada/api/api.swagger.go +++ b/internal/armada/api/api.swagger.go @@ -252,6 +252,12 @@ func SwaggerJsonTemplate() string { " },\n" + " \"Queue\": {\n" + " \"type\": \"string\"\n" + + " },\n" + + " \"RequiredNodeLabels\": {\n" + + " \"type\": \"object\",\n" + + " \"additionalProperties\": {\n" + + " \"type\": \"string\"\n" + + " }\n" + " }\n" + " }\n" + " },\n" + @@ -536,6 +542,12 @@ func SwaggerJsonTemplate() string { " \"Priority\": {\n" + " \"type\": \"number\",\n" + " \"format\": \"double\"\n" + + " },\n" + + " \"RequiredNodeLabels\": {\n" + + " \"type\": \"object\",\n" + + " \"additionalProperties\": {\n" + + " \"type\": \"string\"\n" + + " }\n" + " }\n" + " }\n" + " },\n" + diff --git a/internal/armada/api/api.swagger.json b/internal/armada/api/api.swagger.json index 9f383090a92..a2edbef7075 100644 --- a/internal/armada/api/api.swagger.json +++ b/internal/armada/api/api.swagger.json @@ -241,6 +241,12 @@ }, "Queue": { "type": "string" + }, + "RequiredNodeLabels": { + "type": "object", + "additionalProperties": { + "type": "string" + } } } }, @@ -525,6 +531,12 @@ "Priority": { "type": "number", "format": "double" + }, + "RequiredNodeLabels": { + "type": "object", + "additionalProperties": { + "type": "string" + } } } }, diff --git a/internal/armada/api/queue.pb.go b/internal/armada/api/queue.pb.go index 038cf79a45e..25e862ad974 100644 --- a/internal/armada/api/queue.pb.go +++ b/internal/armada/api/queue.pb.go @@ -33,16 +33,17 @@ var _ = time.Kitchen const _ = proto.GoGoProtoPackageIsVersion2 // please upgrade the proto package type Job struct { - Id string `protobuf:"bytes,1,opt,name=Id,proto3" json:"Id,omitempty"` - JobSetId string `protobuf:"bytes,2,opt,name=JobSetId,proto3" json:"JobSetId,omitempty"` - Queue string `protobuf:"bytes,3,opt,name=Queue,proto3" json:"Queue,omitempty"` - Namespace string `protobuf:"bytes,7,opt,name=Namespace,proto3" json:"Namespace,omitempty"` - Labels map[string]string `protobuf:"bytes,9,rep,name=Labels,proto3" json:"Labels,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"` - Annotations map[string]string `protobuf:"bytes,10,rep,name=Annotations,proto3" json:"Annotations,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"` - Owner string `protobuf:"bytes,8,opt,name=Owner,proto3" json:"Owner,omitempty"` - Priority float64 `protobuf:"fixed64,4,opt,name=Priority,proto3" json:"Priority,omitempty"` - PodSpec *v1.PodSpec `protobuf:"bytes,5,opt,name=PodSpec,proto3" json:"PodSpec,omitempty"` - Created time.Time `protobuf:"bytes,6,opt,name=Created,proto3,stdtime" json:"Created"` + Id string `protobuf:"bytes,1,opt,name=Id,proto3" json:"Id,omitempty"` + JobSetId string `protobuf:"bytes,2,opt,name=JobSetId,proto3" json:"JobSetId,omitempty"` + Queue string `protobuf:"bytes,3,opt,name=Queue,proto3" json:"Queue,omitempty"` + Namespace string `protobuf:"bytes,7,opt,name=Namespace,proto3" json:"Namespace,omitempty"` + Labels map[string]string `protobuf:"bytes,9,rep,name=Labels,proto3" json:"Labels,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"` + Annotations map[string]string `protobuf:"bytes,10,rep,name=Annotations,proto3" json:"Annotations,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"` + RequiredNodeLabels map[string]string `protobuf:"bytes,11,rep,name=RequiredNodeLabels,proto3" json:"RequiredNodeLabels,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"` + Owner string `protobuf:"bytes,8,opt,name=Owner,proto3" json:"Owner,omitempty"` + Priority float64 `protobuf:"fixed64,4,opt,name=Priority,proto3" json:"Priority,omitempty"` + PodSpec *v1.PodSpec `protobuf:"bytes,5,opt,name=PodSpec,proto3" json:"PodSpec,omitempty"` + Created time.Time `protobuf:"bytes,6,opt,name=Created,proto3,stdtime" json:"Created"` } func (m *Job) Reset() { *m = Job{} } @@ -120,6 +121,13 @@ func (m *Job) GetAnnotations() map[string]string { return nil } +func (m *Job) GetRequiredNodeLabels() map[string]string { + if m != nil { + return m.RequiredNodeLabels + } + return nil +} + func (m *Job) GetOwner() string { if m != nil { return m.Owner @@ -149,8 +157,9 @@ func (m *Job) GetCreated() time.Time { } type LeaseRequest struct { - ClusterId string `protobuf:"bytes,1,opt,name=ClusterId,proto3" json:"ClusterId,omitempty"` - Resources map[string]resource.Quantity `protobuf:"bytes,2,rep,name=Resources,proto3" json:"Resources" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"` + ClusterId string `protobuf:"bytes,1,opt,name=ClusterId,proto3" json:"ClusterId,omitempty"` + Resources map[string]resource.Quantity `protobuf:"bytes,2,rep,name=Resources,proto3" json:"Resources" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"` + AvailableLabels []*NodeLabeling `protobuf:"bytes,3,rep,name=AvailableLabels,proto3" json:"AvailableLabels,omitempty"` } func (m *LeaseRequest) Reset() { *m = LeaseRequest{} } @@ -200,6 +209,57 @@ func (m *LeaseRequest) GetResources() map[string]resource.Quantity { return nil } +func (m *LeaseRequest) GetAvailableLabels() []*NodeLabeling { + if m != nil { + return m.AvailableLabels + } + return nil +} + +type NodeLabeling struct { + Labels map[string]string `protobuf:"bytes,3,rep,name=Labels,proto3" json:"Labels,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"` +} + +func (m *NodeLabeling) Reset() { *m = NodeLabeling{} } +func (m *NodeLabeling) String() string { return proto.CompactTextString(m) } +func (*NodeLabeling) ProtoMessage() {} +func (*NodeLabeling) Descriptor() ([]byte, []int) { + return fileDescriptor_d29cc2425c808266, []int{2} +} +func (m *NodeLabeling) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *NodeLabeling) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_NodeLabeling.Marshal(b, m, deterministic) + } else { + b = b[:cap(b)] + n, err := m.MarshalTo(b) + if err != nil { + return nil, err + } + return b[:n], nil + } +} +func (m *NodeLabeling) XXX_Merge(src proto.Message) { + xxx_messageInfo_NodeLabeling.Merge(m, src) +} +func (m *NodeLabeling) XXX_Size() int { + return m.Size() +} +func (m *NodeLabeling) XXX_DiscardUnknown() { + xxx_messageInfo_NodeLabeling.DiscardUnknown(m) +} + +var xxx_messageInfo_NodeLabeling proto.InternalMessageInfo + +func (m *NodeLabeling) GetLabels() map[string]string { + if m != nil { + return m.Labels + } + return nil +} + type JobLease struct { Job []*Job `protobuf:"bytes,1,rep,name=Job,proto3" json:"Job,omitempty"` } @@ -208,7 +268,7 @@ func (m *JobLease) Reset() { *m = JobLease{} } func (m *JobLease) String() string { return proto.CompactTextString(m) } func (*JobLease) ProtoMessage() {} func (*JobLease) Descriptor() ([]byte, []int) { - return fileDescriptor_d29cc2425c808266, []int{2} + return fileDescriptor_d29cc2425c808266, []int{3} } func (m *JobLease) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -252,7 +312,7 @@ func (m *IdList) Reset() { *m = IdList{} } func (m *IdList) String() string { return proto.CompactTextString(m) } func (*IdList) ProtoMessage() {} func (*IdList) Descriptor() ([]byte, []int) { - return fileDescriptor_d29cc2425c808266, []int{3} + return fileDescriptor_d29cc2425c808266, []int{4} } func (m *IdList) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -297,7 +357,7 @@ func (m *RenewLeaseRequest) Reset() { *m = RenewLeaseRequest{} } func (m *RenewLeaseRequest) String() string { return proto.CompactTextString(m) } func (*RenewLeaseRequest) ProtoMessage() {} func (*RenewLeaseRequest) Descriptor() ([]byte, []int) { - return fileDescriptor_d29cc2425c808266, []int{4} + return fileDescriptor_d29cc2425c808266, []int{5} } func (m *RenewLeaseRequest) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -349,7 +409,7 @@ func (m *ReturnLeaseRequest) Reset() { *m = ReturnLeaseRequest{} } func (m *ReturnLeaseRequest) String() string { return proto.CompactTextString(m) } func (*ReturnLeaseRequest) ProtoMessage() {} func (*ReturnLeaseRequest) Descriptor() ([]byte, []int) { - return fileDescriptor_d29cc2425c808266, []int{5} + return fileDescriptor_d29cc2425c808266, []int{6} } func (m *ReturnLeaseRequest) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -396,8 +456,11 @@ func init() { proto.RegisterType((*Job)(nil), "api.Job") proto.RegisterMapType((map[string]string)(nil), "api.Job.AnnotationsEntry") proto.RegisterMapType((map[string]string)(nil), "api.Job.LabelsEntry") + proto.RegisterMapType((map[string]string)(nil), "api.Job.RequiredNodeLabelsEntry") proto.RegisterType((*LeaseRequest)(nil), "api.LeaseRequest") proto.RegisterMapType((map[string]resource.Quantity)(nil), "api.LeaseRequest.ResourcesEntry") + proto.RegisterType((*NodeLabeling)(nil), "api.NodeLabeling") + proto.RegisterMapType((map[string]string)(nil), "api.NodeLabeling.LabelsEntry") proto.RegisterType((*JobLease)(nil), "api.JobLease") proto.RegisterType((*IdList)(nil), "api.IdList") proto.RegisterType((*RenewLeaseRequest)(nil), "api.RenewLeaseRequest") @@ -407,51 +470,57 @@ func init() { func init() { proto.RegisterFile("internal/armada/api/queue.proto", fileDescriptor_d29cc2425c808266) } var fileDescriptor_d29cc2425c808266 = []byte{ - // 703 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x94, 0x54, 0xdd, 0x4e, 0xdb, 0x4c, - 0x10, 0x8d, 0x13, 0x08, 0xc9, 0xe6, 0xfb, 0x28, 0xac, 0x10, 0x75, 0x4d, 0x95, 0x44, 0xb9, 0x40, - 0xb9, 0x28, 0x6b, 0x91, 0xb6, 0x12, 0x6d, 0x25, 0x24, 0xfe, 0xa4, 0x26, 0x42, 0x2d, 0x98, 0xbe, - 0xc0, 0x3a, 0x9e, 0x1a, 0x2b, 0xb1, 0xd7, 0xac, 0xd7, 0xa0, 0xbc, 0x05, 0x8f, 0xc5, 0x25, 0x97, - 0x95, 0x2a, 0xb5, 0x08, 0x5e, 0xa1, 0x0f, 0x50, 0xed, 0xfa, 0x17, 0xd2, 0x1b, 0xee, 0x3c, 0xb3, - 0xe7, 0xcc, 0x9e, 0x99, 0x33, 0x6b, 0xd4, 0xf1, 0x02, 0x01, 0x3c, 0xa0, 0x53, 0x93, 0x72, 0x9f, - 0x3a, 0xd4, 0xa4, 0xa1, 0x67, 0x5e, 0xc4, 0x10, 0x03, 0x09, 0x39, 0x13, 0x0c, 0xd7, 0x68, 0xe8, - 0x19, 0x1d, 0x97, 0x31, 0x77, 0x0a, 0xa6, 0x4a, 0xd9, 0xf1, 0x77, 0x53, 0x78, 0x3e, 0x44, 0x82, - 0xfa, 0x61, 0x82, 0x32, 0x7a, 0x93, 0x9d, 0x88, 0x78, 0x4c, 0xb1, 0xc7, 0x8c, 0x83, 0x79, 0xb9, - 0x6d, 0xba, 0x10, 0x00, 0xa7, 0x02, 0x9c, 0x14, 0xf3, 0xae, 0xc0, 0xf8, 0x74, 0x7c, 0xee, 0x05, - 0xc0, 0x67, 0x66, 0x38, 0x71, 0x15, 0x89, 0x43, 0xc4, 0x62, 0x3e, 0x86, 0x39, 0xd6, 0x96, 0xeb, - 0x89, 0xf3, 0xd8, 0x26, 0x63, 0xe6, 0x9b, 0x2e, 0x73, 0x59, 0xa1, 0x41, 0x46, 0x2a, 0x50, 0x5f, - 0x29, 0x7c, 0xe3, 0xa9, 0x52, 0xf0, 0x43, 0x31, 0x4b, 0x0e, 0x7b, 0x7f, 0x6a, 0xa8, 0x36, 0x62, - 0x36, 0x5e, 0x46, 0xd5, 0xa1, 0xa3, 0x6b, 0x5d, 0xad, 0xdf, 0xb4, 0xaa, 0x43, 0x07, 0x1b, 0xa8, - 0x31, 0x62, 0xf6, 0x19, 0x88, 0xa1, 0xa3, 0x57, 0x55, 0x36, 0x8f, 0xf1, 0x1a, 0x5a, 0x3c, 0x95, - 0xe3, 0xd0, 0x6b, 0xea, 0x20, 0x09, 0xf0, 0x6b, 0xd4, 0xfc, 0x42, 0x7d, 0x88, 0x42, 0x3a, 0x06, - 0x7d, 0x49, 0x9d, 0x14, 0x09, 0xfc, 0x06, 0xd5, 0x8f, 0xa9, 0x0d, 0xd3, 0x48, 0x6f, 0x76, 0x6b, - 0xfd, 0xd6, 0x60, 0x8d, 0xd0, 0xd0, 0x23, 0x23, 0x66, 0x93, 0x24, 0x7d, 0x14, 0x08, 0x3e, 0xb3, - 0x52, 0x0c, 0xfe, 0x84, 0x5a, 0x7b, 0x41, 0xc0, 0x04, 0x15, 0x1e, 0x0b, 0x22, 0x1d, 0x29, 0xca, - 0xab, 0x9c, 0x52, 0x3a, 0x4b, 0x78, 0x65, 0xb4, 0x94, 0xf7, 0xf5, 0x2a, 0x00, 0xae, 0x37, 0x12, - 0x79, 0x2a, 0x90, 0x0d, 0x9d, 0x70, 0x8f, 0x71, 0x4f, 0xcc, 0xf4, 0x85, 0xae, 0xd6, 0xd7, 0xac, - 0x3c, 0xc6, 0xef, 0xd1, 0xd2, 0x09, 0x73, 0xce, 0x42, 0x18, 0xeb, 0x8b, 0x5d, 0xad, 0xdf, 0x1a, - 0x6c, 0x90, 0xc4, 0x18, 0x75, 0xa3, 0x34, 0x8f, 0x5c, 0x6e, 0x93, 0x14, 0x62, 0x65, 0x58, 0xbc, - 0x8b, 0x96, 0x0e, 0x38, 0x48, 0x63, 0xf4, 0xba, 0xa2, 0x19, 0x24, 0x19, 0x35, 0xc9, 0x46, 0x4d, - 0xbe, 0x65, 0x4b, 0xb1, 0xdf, 0xb8, 0xf9, 0xd5, 0xa9, 0x5c, 0xff, 0xee, 0x68, 0x56, 0x46, 0x32, - 0x3e, 0xa0, 0x56, 0xa9, 0x79, 0xbc, 0x82, 0x6a, 0x13, 0x98, 0xa5, 0x1e, 0xc8, 0x4f, 0xd9, 0xc9, - 0x25, 0x9d, 0xc6, 0x90, 0x3a, 0x90, 0x04, 0x1f, 0xab, 0x3b, 0x9a, 0xb1, 0x8b, 0x56, 0x9e, 0x0e, - 0xe1, 0x39, 0xfc, 0xde, 0x9d, 0x86, 0xfe, 0x3b, 0x06, 0x1a, 0x81, 0x05, 0x17, 0x31, 0x44, 0x42, - 0xba, 0x77, 0x30, 0x8d, 0x23, 0x01, 0x3c, 0x5f, 0x83, 0x22, 0x81, 0x0f, 0x51, 0xd3, 0x4a, 0xb7, - 0x31, 0xd2, 0xab, 0xca, 0x8d, 0xae, 0x9a, 0x4d, 0xb9, 0x06, 0xc9, 0x21, 0x4a, 0xcf, 0xfe, 0x82, - 0xec, 0xd8, 0x2a, 0x88, 0xc6, 0x14, 0x2d, 0x3f, 0x86, 0xfc, 0x43, 0xf2, 0x61, 0x59, 0x72, 0x6b, - 0x40, 0x4a, 0x46, 0xe4, 0x2f, 0x84, 0x84, 0x13, 0x57, 0xdd, 0x9e, 0xbd, 0x10, 0x72, 0x1a, 0xd3, - 0x40, 0x78, 0x62, 0x56, 0x6e, 0x71, 0x53, 0x6d, 0xb0, 0x12, 0x88, 0x0d, 0xb5, 0xe4, 0xba, 0xa6, - 0x94, 0x37, 0xb2, 0x3d, 0xb2, 0x64, 0xb2, 0x67, 0xa0, 0xfa, 0xd0, 0x39, 0xf6, 0x22, 0x21, 0xd5, - 0x0c, 0x9d, 0x48, 0xa1, 0x9a, 0x96, 0xfc, 0xec, 0x1d, 0xa0, 0x55, 0x0b, 0x02, 0xb8, 0x7a, 0xc6, - 0xa8, 0xd2, 0x22, 0xd5, 0xa2, 0xc8, 0x67, 0x84, 0x2d, 0x10, 0x31, 0x0f, 0x9e, 0x51, 0x65, 0x0d, - 0x2d, 0x8e, 0x98, 0x9d, 0xbf, 0xbd, 0x24, 0x18, 0xfc, 0xd4, 0xd0, 0x8b, 0x3d, 0xd7, 0xe5, 0xe0, - 0xca, 0xfd, 0x49, 0x9e, 0xdd, 0x16, 0x6a, 0xaa, 0xba, 0x23, 0x66, 0x47, 0x78, 0x75, 0xce, 0x14, - 0xe3, 0xff, 0xac, 0xdb, 0x64, 0x12, 0xdb, 0x08, 0x15, 0x1d, 0xe1, 0x75, 0x75, 0x38, 0xd7, 0xa2, - 0xd1, 0x52, 0xf9, 0x74, 0x2c, 0xbb, 0xa8, 0x55, 0xd2, 0x8f, 0x5f, 0xa6, 0x9c, 0xa7, 0x1d, 0x19, - 0xeb, 0x73, 0xdb, 0x7f, 0x24, 0x7f, 0x34, 0x78, 0x53, 0x5e, 0x19, 0x32, 0x2e, 0x0e, 0x59, 0x00, - 0xb8, 0x5c, 0xfa, 0xd1, 0x3d, 0xfb, 0xfa, 0xcd, 0x7d, 0x5b, 0xbb, 0xbd, 0x6f, 0x6b, 0x77, 0xf7, - 0x6d, 0xed, 0xfa, 0xa1, 0x5d, 0xb9, 0x7d, 0x68, 0x57, 0x7e, 0x3c, 0xb4, 0x2b, 0x76, 0x5d, 0x55, - 0x7c, 0xfb, 0x37, 0x00, 0x00, 0xff, 0xff, 0x3f, 0x65, 0x03, 0x5b, 0x9a, 0x05, 0x00, 0x00, + // 787 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xa4, 0x54, 0xdd, 0x6e, 0xdb, 0x36, + 0x14, 0xb6, 0xec, 0xda, 0xb1, 0xa9, 0xae, 0x6d, 0x88, 0xa0, 0xd5, 0xd4, 0xcd, 0x36, 0x7c, 0x51, + 0xf8, 0x62, 0xa5, 0x10, 0x6f, 0x05, 0xba, 0x15, 0x08, 0x90, 0x3f, 0x60, 0x36, 0x82, 0x2e, 0x55, + 0xf7, 0x02, 0x94, 0x75, 0xa6, 0x10, 0x96, 0x45, 0x85, 0xa2, 0x1c, 0xf8, 0x6e, 0x2f, 0x30, 0x20, + 0x2f, 0xb0, 0xf7, 0xc9, 0x65, 0x2e, 0x07, 0x0c, 0xd8, 0x86, 0xe4, 0x45, 0x06, 0x52, 0x3f, 0x56, + 0xec, 0xec, 0xc2, 0xd8, 0x1d, 0xcf, 0xe1, 0xf7, 0x7d, 0x3c, 0xbf, 0x44, 0x3d, 0x16, 0x49, 0x10, + 0x11, 0x0d, 0x1d, 0x2a, 0xe6, 0xd4, 0xa7, 0x0e, 0x8d, 0x99, 0x73, 0x99, 0x42, 0x0a, 0x24, 0x16, + 0x5c, 0x72, 0xdc, 0xa0, 0x31, 0xb3, 0x7b, 0x01, 0xe7, 0x41, 0x08, 0x8e, 0x76, 0x79, 0xe9, 0x2f, + 0x8e, 0x64, 0x73, 0x48, 0x24, 0x9d, 0xc7, 0x19, 0xca, 0x1e, 0xcc, 0xde, 0x27, 0x84, 0x71, 0xcd, + 0x9e, 0x72, 0x01, 0xce, 0x62, 0xdf, 0x09, 0x20, 0x02, 0x41, 0x25, 0xf8, 0x39, 0xe6, 0xbb, 0x15, + 0x66, 0x4e, 0xa7, 0x17, 0x2c, 0x02, 0xb1, 0x74, 0xe2, 0x59, 0xa0, 0x49, 0x02, 0x12, 0x9e, 0x8a, + 0x29, 0x6c, 0xb0, 0xde, 0x06, 0x4c, 0x5e, 0xa4, 0x1e, 0x99, 0xf2, 0xb9, 0x13, 0xf0, 0x80, 0xaf, + 0x62, 0x50, 0x96, 0x36, 0xf4, 0x29, 0x87, 0xbf, 0x5e, 0x8f, 0x14, 0xe6, 0xb1, 0x5c, 0x66, 0x97, + 0x83, 0xdf, 0x9a, 0xa8, 0x31, 0xe1, 0x1e, 0x7e, 0x86, 0xea, 0x63, 0xdf, 0x32, 0xfa, 0xc6, 0xb0, + 0xe3, 0xd6, 0xc7, 0x3e, 0xb6, 0x51, 0x7b, 0xc2, 0xbd, 0xcf, 0x20, 0xc7, 0xbe, 0x55, 0xd7, 0xde, + 0xd2, 0xc6, 0x7b, 0xa8, 0xf9, 0x49, 0x95, 0xc3, 0x6a, 0xe8, 0x8b, 0xcc, 0xc0, 0x5f, 0xa1, 0xce, + 0x47, 0x3a, 0x87, 0x24, 0xa6, 0x53, 0xb0, 0x76, 0xf4, 0xcd, 0xca, 0x81, 0xbf, 0x41, 0xad, 0x33, + 0xea, 0x41, 0x98, 0x58, 0x9d, 0x7e, 0x63, 0x68, 0x8e, 0xf6, 0x08, 0x8d, 0x19, 0x99, 0x70, 0x8f, + 0x64, 0xee, 0xd3, 0x48, 0x8a, 0xa5, 0x9b, 0x63, 0xf0, 0x07, 0x64, 0x1e, 0x46, 0x11, 0x97, 0x54, + 0x32, 0x1e, 0x25, 0x16, 0xd2, 0x94, 0x2f, 0x4b, 0x4a, 0xe5, 0x2e, 0xe3, 0x55, 0xd1, 0xf8, 0x1c, + 0x61, 0x17, 0x2e, 0x53, 0x26, 0xc0, 0xff, 0xc8, 0x7d, 0xc8, 0x9f, 0x35, 0xb5, 0x46, 0xbf, 0xd4, + 0xd8, 0x84, 0x64, 0x52, 0x8f, 0x70, 0x55, 0xc2, 0x3f, 0x5d, 0x45, 0x20, 0xac, 0x76, 0x96, 0xb0, + 0x36, 0x54, 0x89, 0xce, 0x05, 0xe3, 0x82, 0xc9, 0xa5, 0xf5, 0xa4, 0x6f, 0x0c, 0x0d, 0xb7, 0xb4, + 0xf1, 0x3b, 0xb4, 0x73, 0xce, 0xfd, 0xcf, 0x31, 0x4c, 0xad, 0x66, 0xdf, 0x18, 0x9a, 0xa3, 0xd7, + 0x24, 0x6b, 0xb5, 0x7e, 0x5f, 0x8d, 0x03, 0x59, 0xec, 0x93, 0x1c, 0xe2, 0x16, 0x58, 0x7c, 0x80, + 0x76, 0x8e, 0x05, 0xa8, 0x56, 0x5b, 0x2d, 0x4d, 0xb3, 0x49, 0xd6, 0x3c, 0x52, 0x34, 0x8f, 0xfc, + 0x5c, 0x8c, 0xd9, 0x51, 0xfb, 0xe6, 0xaf, 0x5e, 0xed, 0xfa, 0xef, 0x9e, 0xe1, 0x16, 0x24, 0xfb, + 0x7b, 0x64, 0x56, 0x72, 0xc1, 0x2f, 0x50, 0x63, 0x06, 0xcb, 0xbc, 0xab, 0xea, 0xa8, 0x32, 0x59, + 0xd0, 0x30, 0x85, 0xbc, 0xa7, 0x99, 0xf1, 0x43, 0xfd, 0xbd, 0x61, 0x1f, 0xa0, 0x17, 0xeb, 0x65, + 0xdd, 0x8a, 0x7f, 0x8a, 0x5e, 0xfd, 0x47, 0x49, 0xb7, 0x91, 0x19, 0xfc, 0x5e, 0x47, 0x4f, 0xcf, + 0x80, 0x26, 0xa0, 0xc4, 0x20, 0x91, 0x6a, 0xac, 0x8e, 0xc3, 0x34, 0x91, 0x20, 0xca, 0xf9, 0x5c, + 0x39, 0xf0, 0x09, 0xea, 0xb8, 0xf9, 0x9a, 0x24, 0x56, 0xbd, 0xd2, 0xe2, 0xaa, 0x06, 0x29, 0x21, + 0x3a, 0x9e, 0xa3, 0x27, 0xaa, 0x70, 0xee, 0x8a, 0x88, 0x3f, 0xa0, 0xe7, 0x87, 0x0b, 0xca, 0x42, + 0xea, 0x85, 0xc5, 0xb8, 0x34, 0xb4, 0xd6, 0xae, 0xd6, 0x2a, 0xf3, 0x61, 0x51, 0xe0, 0xae, 0x23, + 0xed, 0x10, 0x3d, 0x7b, 0xa8, 0xff, 0x48, 0xbe, 0x27, 0xd5, 0x7c, 0xcd, 0x11, 0xa9, 0x0c, 0x43, + 0xb9, 0xf7, 0x24, 0x9e, 0x05, 0xfa, 0xb9, 0x62, 0xef, 0xc9, 0xa7, 0x94, 0x46, 0x92, 0xc9, 0x65, + 0xb5, 0x3e, 0xbf, 0x1a, 0xe8, 0x69, 0x35, 0x1e, 0xfc, 0xae, 0x5c, 0xac, 0x2c, 0xe4, 0xaf, 0x37, + 0x42, 0x7e, 0x6c, 0xc3, 0xfe, 0xc7, 0xa4, 0x0c, 0xde, 0xe8, 0xaf, 0x41, 0x17, 0x18, 0xdb, 0xfa, + 0xf7, 0xb0, 0x0c, 0xfd, 0x74, 0xbb, 0x58, 0x2e, 0x57, 0x39, 0x07, 0x36, 0x6a, 0x8d, 0xfd, 0x33, + 0x96, 0x48, 0xa5, 0x3e, 0xf6, 0x13, 0x8d, 0xea, 0xb8, 0xea, 0x38, 0x38, 0x46, 0xbb, 0x2e, 0x44, + 0x70, 0xb5, 0x45, 0xab, 0x73, 0x91, 0xfa, 0x4a, 0xe4, 0x47, 0xb5, 0xe8, 0x32, 0x15, 0xd1, 0x16, + 0x2a, 0x7b, 0xa8, 0x39, 0xe1, 0x5e, 0xf9, 0xa9, 0x65, 0xc6, 0xe8, 0x4f, 0x03, 0x3d, 0x3f, 0x0c, + 0x02, 0x01, 0x81, 0x5a, 0xa3, 0xec, 0x3f, 0x7b, 0x8b, 0x3a, 0x5a, 0x77, 0xc2, 0xbd, 0x04, 0xef, + 0x6e, 0x0c, 0x95, 0xfd, 0x45, 0x91, 0x6d, 0x56, 0x89, 0x7d, 0x84, 0x56, 0x19, 0xe1, 0x97, 0xfa, + 0x72, 0x23, 0x45, 0xdb, 0xd4, 0xfe, 0xbc, 0x2c, 0x07, 0xc8, 0xac, 0xc4, 0x8f, 0x5f, 0xe5, 0x9c, + 0xf5, 0x8c, 0xec, 0x97, 0x1b, 0x9f, 0xc0, 0xa9, 0xfa, 0xc1, 0xf1, 0x1b, 0xf5, 0x64, 0xcc, 0x85, + 0x3c, 0xe1, 0x11, 0xe0, 0xaa, 0xf4, 0x83, 0x77, 0x8e, 0xac, 0x9b, 0xbb, 0xae, 0x71, 0x7b, 0xd7, + 0x35, 0xfe, 0xb9, 0xeb, 0x1a, 0xd7, 0xf7, 0xdd, 0xda, 0xed, 0x7d, 0xb7, 0xf6, 0xc7, 0x7d, 0xb7, + 0xe6, 0xb5, 0xb4, 0xe2, 0xb7, 0xff, 0x06, 0x00, 0x00, 0xff, 0xff, 0xd6, 0xbd, 0xb6, 0x8d, 0xf3, + 0x06, 0x00, 0x00, } // Reference imports to suppress errors if they are not otherwise used. @@ -728,6 +797,23 @@ func (m *Job) MarshalTo(dAtA []byte) (int, error) { i += copy(dAtA[i:], v) } } + if len(m.RequiredNodeLabels) > 0 { + for k, _ := range m.RequiredNodeLabels { + dAtA[i] = 0x5a + i++ + v := m.RequiredNodeLabels[k] + mapSize := 1 + len(k) + sovQueue(uint64(len(k))) + 1 + len(v) + sovQueue(uint64(len(v))) + i = encodeVarintQueue(dAtA, i, uint64(mapSize)) + dAtA[i] = 0xa + i++ + i = encodeVarintQueue(dAtA, i, uint64(len(k))) + i += copy(dAtA[i:], k) + dAtA[i] = 0x12 + i++ + i = encodeVarintQueue(dAtA, i, uint64(len(v))) + i += copy(dAtA[i:], v) + } + } return i, nil } @@ -778,6 +864,53 @@ func (m *LeaseRequest) MarshalTo(dAtA []byte) (int, error) { i += n3 } } + if len(m.AvailableLabels) > 0 { + for _, msg := range m.AvailableLabels { + dAtA[i] = 0x1a + i++ + i = encodeVarintQueue(dAtA, i, uint64(msg.Size())) + n, err := msg.MarshalTo(dAtA[i:]) + if err != nil { + return 0, err + } + i += n + } + } + return i, nil +} + +func (m *NodeLabeling) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalTo(dAtA) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *NodeLabeling) MarshalTo(dAtA []byte) (int, error) { + var i int + _ = i + var l int + _ = l + if len(m.Labels) > 0 { + for k, _ := range m.Labels { + dAtA[i] = 0x1a + i++ + v := m.Labels[k] + mapSize := 1 + len(k) + sovQueue(uint64(len(k))) + 1 + len(v) + sovQueue(uint64(len(v))) + i = encodeVarintQueue(dAtA, i, uint64(mapSize)) + dAtA[i] = 0xa + i++ + i = encodeVarintQueue(dAtA, i, uint64(len(k))) + i += copy(dAtA[i:], k) + dAtA[i] = 0x12 + i++ + i = encodeVarintQueue(dAtA, i, uint64(len(v))) + i += copy(dAtA[i:], v) + } + } return i, nil } @@ -973,6 +1106,14 @@ func (m *Job) Size() (n int) { n += mapEntrySize + 1 + sovQueue(uint64(mapEntrySize)) } } + if len(m.RequiredNodeLabels) > 0 { + for k, v := range m.RequiredNodeLabels { + _ = k + _ = v + mapEntrySize := 1 + len(k) + sovQueue(uint64(len(k))) + 1 + len(v) + sovQueue(uint64(len(v))) + n += mapEntrySize + 1 + sovQueue(uint64(mapEntrySize)) + } + } return n } @@ -995,6 +1136,29 @@ func (m *LeaseRequest) Size() (n int) { n += mapEntrySize + 1 + sovQueue(uint64(mapEntrySize)) } } + if len(m.AvailableLabels) > 0 { + for _, e := range m.AvailableLabels { + l = e.Size() + n += 1 + l + sovQueue(uint64(l)) + } + } + return n +} + +func (m *NodeLabeling) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + if len(m.Labels) > 0 { + for k, v := range m.Labels { + _ = k + _ = v + mapEntrySize := 1 + len(k) + sovQueue(uint64(len(k))) + 1 + len(v) + sovQueue(uint64(len(v))) + n += mapEntrySize + 1 + sovQueue(uint64(mapEntrySize)) + } + } return n } @@ -1600,6 +1764,133 @@ func (m *Job) Unmarshal(dAtA []byte) error { } m.Annotations[mapkey] = mapvalue iNdEx = postIndex + case 11: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field RequiredNodeLabels", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowQueue + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthQueue + } + postIndex := iNdEx + msglen + if postIndex < 0 { + return ErrInvalidLengthQueue + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + if m.RequiredNodeLabels == nil { + m.RequiredNodeLabels = make(map[string]string) + } + var mapkey string + var mapvalue string + for iNdEx < postIndex { + entryPreIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowQueue + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + if fieldNum == 1 { + var stringLenmapkey uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowQueue + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLenmapkey |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + intStringLenmapkey := int(stringLenmapkey) + if intStringLenmapkey < 0 { + return ErrInvalidLengthQueue + } + postStringIndexmapkey := iNdEx + intStringLenmapkey + if postStringIndexmapkey < 0 { + return ErrInvalidLengthQueue + } + if postStringIndexmapkey > l { + return io.ErrUnexpectedEOF + } + mapkey = string(dAtA[iNdEx:postStringIndexmapkey]) + iNdEx = postStringIndexmapkey + } else if fieldNum == 2 { + var stringLenmapvalue uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowQueue + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLenmapvalue |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + intStringLenmapvalue := int(stringLenmapvalue) + if intStringLenmapvalue < 0 { + return ErrInvalidLengthQueue + } + postStringIndexmapvalue := iNdEx + intStringLenmapvalue + if postStringIndexmapvalue < 0 { + return ErrInvalidLengthQueue + } + if postStringIndexmapvalue > l { + return io.ErrUnexpectedEOF + } + mapvalue = string(dAtA[iNdEx:postStringIndexmapvalue]) + iNdEx = postStringIndexmapvalue + } else { + iNdEx = entryPreIndex + skippy, err := skipQueue(dAtA[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthQueue + } + if (iNdEx + skippy) > postIndex { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + m.RequiredNodeLabels[mapkey] = mapvalue + iNdEx = postIndex default: iNdEx = preIndex skippy, err := skipQueue(dAtA[iNdEx:]) @@ -1814,6 +2105,220 @@ func (m *LeaseRequest) Unmarshal(dAtA []byte) error { } m.Resources[mapkey] = *mapvalue iNdEx = postIndex + case 3: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field AvailableLabels", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowQueue + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthQueue + } + postIndex := iNdEx + msglen + if postIndex < 0 { + return ErrInvalidLengthQueue + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.AvailableLabels = append(m.AvailableLabels, &NodeLabeling{}) + if err := m.AvailableLabels[len(m.AvailableLabels)-1].Unmarshal(dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + default: + iNdEx = preIndex + skippy, err := skipQueue(dAtA[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthQueue + } + if (iNdEx + skippy) < 0 { + return ErrInvalidLengthQueue + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func (m *NodeLabeling) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowQueue + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: NodeLabeling: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: NodeLabeling: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 3: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Labels", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowQueue + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthQueue + } + postIndex := iNdEx + msglen + if postIndex < 0 { + return ErrInvalidLengthQueue + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + if m.Labels == nil { + m.Labels = make(map[string]string) + } + var mapkey string + var mapvalue string + for iNdEx < postIndex { + entryPreIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowQueue + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + if fieldNum == 1 { + var stringLenmapkey uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowQueue + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLenmapkey |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + intStringLenmapkey := int(stringLenmapkey) + if intStringLenmapkey < 0 { + return ErrInvalidLengthQueue + } + postStringIndexmapkey := iNdEx + intStringLenmapkey + if postStringIndexmapkey < 0 { + return ErrInvalidLengthQueue + } + if postStringIndexmapkey > l { + return io.ErrUnexpectedEOF + } + mapkey = string(dAtA[iNdEx:postStringIndexmapkey]) + iNdEx = postStringIndexmapkey + } else if fieldNum == 2 { + var stringLenmapvalue uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowQueue + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLenmapvalue |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + intStringLenmapvalue := int(stringLenmapvalue) + if intStringLenmapvalue < 0 { + return ErrInvalidLengthQueue + } + postStringIndexmapvalue := iNdEx + intStringLenmapvalue + if postStringIndexmapvalue < 0 { + return ErrInvalidLengthQueue + } + if postStringIndexmapvalue > l { + return io.ErrUnexpectedEOF + } + mapvalue = string(dAtA[iNdEx:postStringIndexmapvalue]) + iNdEx = postStringIndexmapvalue + } else { + iNdEx = entryPreIndex + skippy, err := skipQueue(dAtA[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthQueue + } + if (iNdEx + skippy) > postIndex { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + m.Labels[mapkey] = mapvalue + iNdEx = postIndex default: iNdEx = preIndex skippy, err := skipQueue(dAtA[iNdEx:]) diff --git a/internal/armada/api/queue.proto b/internal/armada/api/queue.proto index 95675b1a017..242fe5ff659 100644 --- a/internal/armada/api/queue.proto +++ b/internal/armada/api/queue.proto @@ -15,6 +15,7 @@ message Job { string Namespace = 7; map Labels = 9; map Annotations = 10; + map RequiredNodeLabels = 11; string Owner = 8; double Priority = 4; k8s.io.api.core.v1.PodSpec PodSpec = 5; @@ -24,6 +25,11 @@ message Job { message LeaseRequest { string ClusterId = 1; map Resources = 2 [(gogoproto.nullable) = false]; + repeated NodeLabeling AvailableLabels = 3; +} + +message NodeLabeling { + map Labels = 3; } message JobLease { diff --git a/internal/armada/api/submit.pb.go b/internal/armada/api/submit.pb.go index 7232dedc9c8..7d76040d2ae 100644 --- a/internal/armada/api/submit.pb.go +++ b/internal/armada/api/submit.pb.go @@ -29,11 +29,12 @@ var _ = math.Inf const _ = proto.GoGoProtoPackageIsVersion2 // please upgrade the proto package type JobSubmitRequestItem struct { - Priority float64 `protobuf:"fixed64,1,opt,name=Priority,proto3" json:"Priority,omitempty"` - Namespace string `protobuf:"bytes,3,opt,name=Namespace,proto3" json:"Namespace,omitempty"` - Labels map[string]string `protobuf:"bytes,4,rep,name=Labels,proto3" json:"Labels,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"` - Annotations map[string]string `protobuf:"bytes,5,rep,name=Annotations,proto3" json:"Annotations,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"` - PodSpec *v1.PodSpec `protobuf:"bytes,2,opt,name=PodSpec,proto3" json:"PodSpec,omitempty"` + Priority float64 `protobuf:"fixed64,1,opt,name=Priority,proto3" json:"Priority,omitempty"` + Namespace string `protobuf:"bytes,3,opt,name=Namespace,proto3" json:"Namespace,omitempty"` + Labels map[string]string `protobuf:"bytes,4,rep,name=Labels,proto3" json:"Labels,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"` + Annotations map[string]string `protobuf:"bytes,5,rep,name=Annotations,proto3" json:"Annotations,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"` + RequiredNodeLabels map[string]string `protobuf:"bytes,6,rep,name=RequiredNodeLabels,proto3" json:"RequiredNodeLabels,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"` + PodSpec *v1.PodSpec `protobuf:"bytes,2,opt,name=PodSpec,proto3" json:"PodSpec,omitempty"` } func (m *JobSubmitRequestItem) Reset() { *m = JobSubmitRequestItem{} } @@ -97,6 +98,13 @@ func (m *JobSubmitRequestItem) GetAnnotations() map[string]string { return nil } +func (m *JobSubmitRequestItem) GetRequiredNodeLabels() map[string]string { + if m != nil { + return m.RequiredNodeLabels + } + return nil +} + func (m *JobSubmitRequestItem) GetPodSpec() *v1.PodSpec { if m != nil { return m.PodSpec @@ -441,6 +449,7 @@ func init() { proto.RegisterType((*JobSubmitRequestItem)(nil), "api.JobSubmitRequestItem") proto.RegisterMapType((map[string]string)(nil), "api.JobSubmitRequestItem.AnnotationsEntry") proto.RegisterMapType((map[string]string)(nil), "api.JobSubmitRequestItem.LabelsEntry") + proto.RegisterMapType((map[string]string)(nil), "api.JobSubmitRequestItem.RequiredNodeLabelsEntry") proto.RegisterType((*JobSubmitRequest)(nil), "api.JobSubmitRequest") proto.RegisterType((*JobCancelRequest)(nil), "api.JobCancelRequest") proto.RegisterType((*JobSubmitResponseItem)(nil), "api.JobSubmitResponseItem") @@ -452,50 +461,52 @@ func init() { func init() { proto.RegisterFile("internal/armada/api/submit.proto", fileDescriptor_83bbfbf574fac779) } var fileDescriptor_83bbfbf574fac779 = []byte{ - // 674 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x94, 0x54, 0xcf, 0x6b, 0xd4, 0x40, - 0x14, 0xee, 0xec, 0xb6, 0xd5, 0x7d, 0x2b, 0x75, 0x1d, 0xdb, 0x9a, 0xa6, 0x25, 0x2c, 0x01, 0x65, - 0xe9, 0x21, 0xa1, 0x15, 0xa1, 0x16, 0x14, 0x74, 0x69, 0x65, 0x97, 0x52, 0x6b, 0x8a, 0x0a, 0x7a, - 0x9a, 0x6c, 0x9e, 0x25, 0x36, 0x9b, 0x49, 0x27, 0x93, 0x4a, 0x11, 0x2f, 0x82, 0x87, 0x9e, 0x14, - 0xfc, 0xa7, 0x3c, 0x16, 0xbc, 0x78, 0x94, 0xd6, 0x3f, 0x44, 0x32, 0x93, 0x6d, 0xd3, 0xdd, 0xad, - 0xe0, 0x6d, 0xde, 0xcb, 0xf7, 0xbe, 0xf7, 0xbd, 0x1f, 0x79, 0xd0, 0x0c, 0x63, 0x89, 0x22, 0x66, - 0x91, 0xcb, 0x44, 0x9f, 0x05, 0xcc, 0x65, 0x49, 0xe8, 0xa6, 0x99, 0xdf, 0x0f, 0xa5, 0x93, 0x08, - 0x2e, 0x39, 0xad, 0xb2, 0x24, 0x34, 0x17, 0xf7, 0x38, 0xdf, 0x8b, 0xd0, 0x55, 0x2e, 0x3f, 0x7b, - 0xe7, 0x62, 0x3f, 0x91, 0x47, 0x1a, 0x61, 0xda, 0xfb, 0x6b, 0xa9, 0x13, 0x72, 0x15, 0xda, 0xe3, - 0x02, 0xdd, 0xc3, 0x15, 0x77, 0x0f, 0x63, 0x14, 0x4c, 0x62, 0x50, 0x60, 0x96, 0x0a, 0x82, 0x1c, - 0xc3, 0xe2, 0x98, 0x4b, 0x26, 0x43, 0x1e, 0xa7, 0xfa, 0xab, 0xfd, 0xb5, 0x0a, 0xb3, 0x5d, 0xee, - 0xef, 0xaa, 0xbc, 0x1e, 0x1e, 0x64, 0x98, 0xca, 0x8e, 0xc4, 0x3e, 0x35, 0xe1, 0xfa, 0x8e, 0x08, - 0xb9, 0x08, 0xe5, 0x91, 0x41, 0x9a, 0xa4, 0x45, 0xbc, 0x73, 0x9b, 0x2e, 0x41, 0x6d, 0x9b, 0xf5, - 0x31, 0x4d, 0x58, 0x0f, 0x8d, 0x6a, 0x93, 0xb4, 0x6a, 0xde, 0x85, 0x83, 0x3e, 0x82, 0xe9, 0x2d, - 0xe6, 0x63, 0x94, 0x1a, 0x93, 0xcd, 0x6a, 0xab, 0xbe, 0x7a, 0xd7, 0x61, 0x49, 0xe8, 0x8c, 0x4b, - 0xe2, 0x68, 0xdc, 0x46, 0x2c, 0xc5, 0x91, 0x57, 0x04, 0xd1, 0x2d, 0xa8, 0x3f, 0xb9, 0x90, 0x69, - 0x4c, 0x29, 0x8e, 0xe5, 0xab, 0x39, 0x4a, 0x60, 0x4d, 0x54, 0x0e, 0xa7, 0x0f, 0xe0, 0xda, 0x0e, - 0x0f, 0x76, 0x13, 0xec, 0x19, 0x95, 0x26, 0x69, 0xd5, 0x57, 0x17, 0x1d, 0xdd, 0x33, 0x45, 0x98, - 0xf7, 0xcc, 0x39, 0x5c, 0x71, 0x0a, 0x88, 0x37, 0xc0, 0x9a, 0x0f, 0xa1, 0x5e, 0xd2, 0x46, 0x1b, - 0x50, 0xdd, 0x47, 0xdd, 0x87, 0x9a, 0x97, 0x3f, 0xe9, 0x2c, 0x4c, 0x1d, 0xb2, 0x28, 0x43, 0xc5, - 0x5a, 0xf3, 0xb4, 0xb1, 0x5e, 0x59, 0x23, 0xe6, 0x63, 0x68, 0x0c, 0x4b, 0xfa, 0x9f, 0x78, 0xfb, - 0x98, 0x40, 0x63, 0xb8, 0xd0, 0x1c, 0xfe, 0x22, 0xc3, 0x0c, 0x0b, 0x0a, 0x6d, 0xe4, 0x33, 0xca, - 0x91, 0x28, 0x3b, 0x41, 0xc1, 0x73, 0x6e, 0xd3, 0x36, 0xdc, 0xec, 0x72, 0xbf, 0xd4, 0xa8, 0xd4, - 0xa8, 0xaa, 0x56, 0x2e, 0x5c, 0xd9, 0x4a, 0x6f, 0x38, 0xc2, 0x7e, 0xa3, 0xa4, 0xb4, 0x59, 0xdc, - 0xc3, 0xa8, 0x24, 0xa5, 0xcb, 0xfd, 0x4e, 0x30, 0x90, 0xa2, 0x8c, 0x7f, 0x4a, 0x39, 0x17, 0x5f, - 0x2d, 0x89, 0xb7, 0xdb, 0x30, 0x57, 0x12, 0x91, 0x26, 0x3c, 0x4e, 0x51, 0x6d, 0xde, 0xf8, 0x04, - 0xb3, 0x30, 0xb5, 0x21, 0x04, 0x17, 0x83, 0x86, 0x29, 0xc3, 0x7e, 0x0b, 0xb7, 0x46, 0x48, 0xe8, - 0xa6, 0x52, 0x5d, 0xe6, 0x4c, 0x0d, 0xa2, 0x6a, 0x37, 0x87, 0x6b, 0xbf, 0x80, 0x78, 0x23, 0x31, - 0xf6, 0x17, 0x52, 0x08, 0xa7, 0x14, 0x26, 0xf3, 0xfd, 0x2e, 0x14, 0xa9, 0x37, 0xbd, 0x07, 0x33, - 0x83, 0x1f, 0x62, 0x93, 0xf5, 0x64, 0xa1, 0x8c, 0x78, 0x43, 0x5e, 0x6a, 0x01, 0xbc, 0x4c, 0x51, - 0x3c, 0xff, 0x10, 0xa3, 0xd0, 0x33, 0xa8, 0x79, 0x25, 0x0f, 0x6d, 0x42, 0xfd, 0x99, 0xe0, 0x59, - 0x52, 0x00, 0x26, 0x15, 0xa0, 0xec, 0xb2, 0xd7, 0x80, 0xea, 0x11, 0x44, 0x6a, 0xa7, 0x3c, 0x4c, - 0xb3, 0x48, 0x52, 0x1b, 0x6e, 0x14, 0x5e, 0x0c, 0x3a, 0x81, 0xae, 0xb0, 0xe6, 0x5d, 0xf2, 0xad, - 0x1e, 0x57, 0x60, 0x5a, 0x97, 0x4a, 0x5f, 0x01, 0xe8, 0x57, 0x97, 0xfb, 0x29, 0x9d, 0x1b, 0xbb, - 0x04, 0xe6, 0xfc, 0xf8, 0xfe, 0xd8, 0x0b, 0x9f, 0x7f, 0xfe, 0xf9, 0x5e, 0xb9, 0x6d, 0xcf, 0xe4, - 0x07, 0xe6, 0x3d, 0xf7, 0x8b, 0x3b, 0xb5, 0x4e, 0x96, 0xe9, 0x6b, 0x00, 0x9d, 0xf2, 0x32, 0xef, - 0xa5, 0x9d, 0x31, 0xef, 0x28, 0xf7, 0x68, 0x11, 0xa3, 0xc4, 0x3d, 0x85, 0xc9, 0x89, 0xb7, 0xa1, - 0xde, 0x16, 0xc8, 0x24, 0xea, 0x11, 0x80, 0xa2, 0x50, 0x6f, 0x73, 0xde, 0xd1, 0x37, 0xcd, 0x19, - 0x1c, 0x45, 0x67, 0x23, 0x3f, 0x8a, 0xf6, 0xa2, 0x62, 0x9b, 0x33, 0x1b, 0x39, 0xdb, 0x41, 0x0e, - 0x75, 0x3f, 0xe6, 0xb3, 0xfa, 0xb4, 0x4e, 0x96, 0x9f, 0x1a, 0x3f, 0x4e, 0x2d, 0x72, 0x72, 0x6a, - 0x91, 0xdf, 0xa7, 0x16, 0xf9, 0x76, 0x66, 0x4d, 0x9c, 0x9c, 0x59, 0x13, 0xbf, 0xce, 0xac, 0x09, - 0x7f, 0x5a, 0xd1, 0xdc, 0xff, 0x1b, 0x00, 0x00, 0xff, 0xff, 0x69, 0xaa, 0x6d, 0x95, 0x92, 0x05, - 0x00, 0x00, + // 711 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x94, 0x94, 0x5d, 0x6b, 0x13, 0x4d, + 0x14, 0xc7, 0x3b, 0x49, 0x9a, 0xe7, 0xc9, 0xc9, 0x43, 0x9f, 0x38, 0xf6, 0x65, 0xbb, 0x2d, 0x21, + 0x2c, 0x28, 0xa1, 0x17, 0xbb, 0xb4, 0x22, 0xd4, 0x82, 0x82, 0x86, 0x54, 0x12, 0x4a, 0xad, 0x5b, + 0x54, 0xd0, 0xab, 0xd9, 0xec, 0xb1, 0xac, 0x4d, 0x76, 0xb6, 0xb3, 0xb3, 0x95, 0x22, 0xde, 0x08, + 0x5e, 0xf4, 0x4e, 0xf0, 0xd3, 0xf8, 0x0d, 0xbc, 0x2c, 0x78, 0xe3, 0xa5, 0xb4, 0x7e, 0x10, 0xd9, + 0x99, 0x4d, 0xbb, 0xcd, 0x8b, 0xd0, 0xbb, 0x39, 0x67, 0xff, 0xe7, 0x37, 0xff, 0x33, 0x67, 0x76, + 0xa0, 0x11, 0x84, 0x12, 0x45, 0xc8, 0xfa, 0x0e, 0x13, 0x03, 0xe6, 0x33, 0x87, 0x45, 0x81, 0x13, + 0x27, 0xde, 0x20, 0x90, 0x76, 0x24, 0xb8, 0xe4, 0xb4, 0xc8, 0xa2, 0xc0, 0x5c, 0x39, 0xe0, 0xfc, + 0xa0, 0x8f, 0x8e, 0x4a, 0x79, 0xc9, 0x5b, 0x07, 0x07, 0x91, 0x3c, 0xd1, 0x0a, 0xd3, 0x3a, 0xdc, + 0x8c, 0xed, 0x80, 0xab, 0xd2, 0x1e, 0x17, 0xe8, 0x1c, 0xaf, 0x3b, 0x07, 0x18, 0xa2, 0x60, 0x12, + 0xfd, 0x4c, 0xb3, 0x9a, 0x01, 0x52, 0x0d, 0x0b, 0x43, 0x2e, 0x99, 0x0c, 0x78, 0x18, 0xeb, 0xaf, + 0xd6, 0xb7, 0x12, 0xcc, 0x77, 0xb9, 0xb7, 0xaf, 0xf6, 0x75, 0xf1, 0x28, 0xc1, 0x58, 0x76, 0x24, + 0x0e, 0xa8, 0x09, 0xff, 0xee, 0x89, 0x80, 0x8b, 0x40, 0x9e, 0x18, 0xa4, 0x41, 0x9a, 0xc4, 0xbd, + 0x8c, 0xe9, 0x2a, 0x54, 0x76, 0xd9, 0x00, 0xe3, 0x88, 0xf5, 0xd0, 0x28, 0x36, 0x48, 0xb3, 0xe2, + 0x5e, 0x25, 0xe8, 0x43, 0x28, 0xef, 0x30, 0x0f, 0xfb, 0xb1, 0x51, 0x6a, 0x14, 0x9b, 0xd5, 0x8d, + 0x3b, 0x36, 0x8b, 0x02, 0x7b, 0xd2, 0x26, 0xb6, 0xd6, 0xb5, 0x43, 0x29, 0x4e, 0xdc, 0xac, 0x88, + 0xee, 0x40, 0xf5, 0xf1, 0x95, 0x4d, 0x63, 0x56, 0x31, 0xd6, 0xa6, 0x33, 0x72, 0x62, 0x0d, 0xca, + 0x97, 0x53, 0x06, 0x34, 0x15, 0x07, 0x02, 0xfd, 0x5d, 0xee, 0x63, 0x66, 0xac, 0xac, 0xa0, 0xeb, + 0xd3, 0xa1, 0xe3, 0x35, 0x9a, 0x3d, 0x01, 0x46, 0xef, 0xc3, 0x3f, 0x7b, 0xdc, 0xdf, 0x8f, 0xb0, + 0x67, 0x14, 0x1a, 0xa4, 0x59, 0xdd, 0x58, 0xb1, 0xf5, 0x58, 0x14, 0x3e, 0x1d, 0x8b, 0x7d, 0xbc, + 0x6e, 0x67, 0x12, 0x77, 0xa8, 0x35, 0x1f, 0x40, 0x35, 0x47, 0xa6, 0x35, 0x28, 0x1e, 0xa2, 0x3e, + 0xea, 0x8a, 0x9b, 0x2e, 0xe9, 0x3c, 0xcc, 0x1e, 0xb3, 0x7e, 0x82, 0x8a, 0x5a, 0x71, 0x75, 0xb0, + 0x55, 0xd8, 0x24, 0xe6, 0x23, 0xa8, 0x8d, 0x76, 0x7d, 0xa3, 0xfa, 0x36, 0x2c, 0x4d, 0x69, 0xf0, + 0x26, 0x18, 0xeb, 0x94, 0x40, 0x6d, 0xf4, 0xf4, 0x52, 0xf9, 0xf3, 0x04, 0x13, 0xcc, 0x10, 0x3a, + 0x48, 0x6f, 0x53, 0xaa, 0x44, 0xd9, 0xf1, 0x33, 0xce, 0x65, 0x4c, 0x5b, 0xf0, 0x7f, 0x97, 0x7b, + 0xb9, 0xd3, 0x8f, 0x8d, 0xa2, 0x9a, 0xcf, 0xf2, 0xd4, 0xf9, 0xb8, 0xa3, 0x15, 0xd6, 0x6b, 0x65, + 0xa5, 0xc5, 0xc2, 0x1e, 0xf6, 0x73, 0x56, 0xba, 0xdc, 0xeb, 0xf8, 0x43, 0x2b, 0x2a, 0xf8, 0xab, + 0x95, 0x4b, 0xf3, 0xc5, 0x9c, 0x79, 0xab, 0x05, 0x0b, 0x39, 0x13, 0x71, 0xc4, 0xc3, 0x18, 0xd5, + 0x3f, 0x32, 0x79, 0x83, 0x79, 0x98, 0x6d, 0x0b, 0xc1, 0xc5, 0xf0, 0xc0, 0x54, 0x60, 0xbd, 0x81, + 0x5b, 0x63, 0x10, 0xba, 0xad, 0x5c, 0xe7, 0x99, 0xb1, 0x41, 0x54, 0xef, 0xe6, 0x68, 0xef, 0x57, + 0x12, 0x77, 0xac, 0xc6, 0xfa, 0x4c, 0x32, 0xe3, 0x94, 0x42, 0x29, 0xfd, 0x13, 0x33, 0x47, 0x6a, + 0x4d, 0xef, 0xc2, 0xdc, 0xf0, 0xd7, 0xdd, 0x66, 0x3d, 0x99, 0x39, 0x23, 0xee, 0x48, 0x96, 0xd6, + 0x01, 0x5e, 0xc4, 0x28, 0x9e, 0xbd, 0x0f, 0x51, 0xe8, 0x19, 0x54, 0xdc, 0x5c, 0x86, 0x36, 0xa0, + 0xfa, 0x54, 0xf0, 0x24, 0xca, 0x04, 0x25, 0x25, 0xc8, 0xa7, 0xac, 0x4d, 0xa0, 0x7a, 0x04, 0x7d, + 0x75, 0x35, 0x5d, 0x8c, 0x93, 0xbe, 0xa4, 0x16, 0xfc, 0x97, 0x65, 0xd1, 0xef, 0xf8, 0xba, 0xc3, + 0x8a, 0x7b, 0x2d, 0xb7, 0x71, 0x5a, 0x80, 0xb2, 0x6e, 0x95, 0xbe, 0x04, 0xd0, 0xab, 0x2e, 0xf7, + 0x62, 0xba, 0x30, 0xf1, 0x12, 0x98, 0x8b, 0x93, 0xcf, 0xc7, 0x5a, 0xfe, 0xf4, 0xe3, 0xf7, 0xd7, + 0xc2, 0x6d, 0x6b, 0x2e, 0x7d, 0x0a, 0xdf, 0x71, 0x2f, 0x7b, 0x51, 0xb7, 0xc8, 0x1a, 0x7d, 0x05, + 0xa0, 0xb7, 0xbc, 0xce, 0xbd, 0x76, 0x67, 0xcc, 0x25, 0x95, 0x1e, 0x6f, 0x62, 0x1c, 0xdc, 0x53, + 0x9a, 0x14, 0xbc, 0x0b, 0xd5, 0x96, 0x40, 0x26, 0x51, 0x8f, 0x00, 0x14, 0x42, 0xad, 0xcd, 0x45, + 0x5b, 0xbf, 0xbe, 0xf6, 0xf0, 0xf9, 0xb6, 0xdb, 0xe9, 0xf3, 0x6d, 0xad, 0x28, 0xda, 0x82, 0x59, + 0x4b, 0x69, 0x47, 0xa9, 0xd4, 0xf9, 0x90, 0xce, 0xea, 0xe3, 0x16, 0x59, 0x7b, 0x62, 0x7c, 0x3f, + 0xaf, 0x93, 0xb3, 0xf3, 0x3a, 0xf9, 0x75, 0x5e, 0x27, 0x5f, 0x2e, 0xea, 0x33, 0x67, 0x17, 0xf5, + 0x99, 0x9f, 0x17, 0xf5, 0x19, 0xaf, 0xac, 0x30, 0xf7, 0xfe, 0x04, 0x00, 0x00, 0xff, 0xff, 0x40, + 0x7e, 0x00, 0x7e, 0x3c, 0x06, 0x00, 0x00, } // Reference imports to suppress errors if they are not otherwise used. @@ -707,6 +718,23 @@ func (m *JobSubmitRequestItem) MarshalTo(dAtA []byte) (int, error) { i += copy(dAtA[i:], v) } } + if len(m.RequiredNodeLabels) > 0 { + for k, _ := range m.RequiredNodeLabels { + dAtA[i] = 0x32 + i++ + v := m.RequiredNodeLabels[k] + mapSize := 1 + len(k) + sovSubmit(uint64(len(k))) + 1 + len(v) + sovSubmit(uint64(len(v))) + i = encodeVarintSubmit(dAtA, i, uint64(mapSize)) + dAtA[i] = 0xa + i++ + i = encodeVarintSubmit(dAtA, i, uint64(len(k))) + i += copy(dAtA[i:], k) + dAtA[i] = 0x12 + i++ + i = encodeVarintSubmit(dAtA, i, uint64(len(v))) + i += copy(dAtA[i:], v) + } + } return i, nil } @@ -983,6 +1011,14 @@ func (m *JobSubmitRequestItem) Size() (n int) { n += mapEntrySize + 1 + sovSubmit(uint64(mapEntrySize)) } } + if len(m.RequiredNodeLabels) > 0 { + for k, v := range m.RequiredNodeLabels { + _ = k + _ = v + mapEntrySize := 1 + len(k) + sovSubmit(uint64(len(k))) + 1 + len(v) + sovSubmit(uint64(len(v))) + n += mapEntrySize + 1 + sovSubmit(uint64(mapEntrySize)) + } + } return n } @@ -1480,6 +1516,133 @@ func (m *JobSubmitRequestItem) Unmarshal(dAtA []byte) error { } m.Annotations[mapkey] = mapvalue iNdEx = postIndex + case 6: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field RequiredNodeLabels", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowSubmit + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthSubmit + } + postIndex := iNdEx + msglen + if postIndex < 0 { + return ErrInvalidLengthSubmit + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + if m.RequiredNodeLabels == nil { + m.RequiredNodeLabels = make(map[string]string) + } + var mapkey string + var mapvalue string + for iNdEx < postIndex { + entryPreIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowSubmit + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + if fieldNum == 1 { + var stringLenmapkey uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowSubmit + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLenmapkey |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + intStringLenmapkey := int(stringLenmapkey) + if intStringLenmapkey < 0 { + return ErrInvalidLengthSubmit + } + postStringIndexmapkey := iNdEx + intStringLenmapkey + if postStringIndexmapkey < 0 { + return ErrInvalidLengthSubmit + } + if postStringIndexmapkey > l { + return io.ErrUnexpectedEOF + } + mapkey = string(dAtA[iNdEx:postStringIndexmapkey]) + iNdEx = postStringIndexmapkey + } else if fieldNum == 2 { + var stringLenmapvalue uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowSubmit + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLenmapvalue |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + intStringLenmapvalue := int(stringLenmapvalue) + if intStringLenmapvalue < 0 { + return ErrInvalidLengthSubmit + } + postStringIndexmapvalue := iNdEx + intStringLenmapvalue + if postStringIndexmapvalue < 0 { + return ErrInvalidLengthSubmit + } + if postStringIndexmapvalue > l { + return io.ErrUnexpectedEOF + } + mapvalue = string(dAtA[iNdEx:postStringIndexmapvalue]) + iNdEx = postStringIndexmapvalue + } else { + iNdEx = entryPreIndex + skippy, err := skipSubmit(dAtA[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthSubmit + } + if (iNdEx + skippy) > postIndex { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + m.RequiredNodeLabels[mapkey] = mapvalue + iNdEx = postIndex default: iNdEx = preIndex skippy, err := skipSubmit(dAtA[iNdEx:]) diff --git a/internal/armada/api/submit.proto b/internal/armada/api/submit.proto index 562c414dd8a..04cb773ada5 100644 --- a/internal/armada/api/submit.proto +++ b/internal/armada/api/submit.proto @@ -11,6 +11,7 @@ message JobSubmitRequestItem { string Namespace = 3; map Labels = 4; map Annotations = 5; + map RequiredNodeLabels = 6; k8s.io.api.core.v1.PodSpec PodSpec = 2; } diff --git a/internal/armada/repository/job.go b/internal/armada/repository/job.go index a48c6b80473..979bcb7053e 100644 --- a/internal/armada/repository/job.go +++ b/internal/armada/repository/job.go @@ -63,6 +63,8 @@ func (repo *RedisJobRepository) CreateJobs(request *api.JobSubmitRequest, princi Labels: item.Labels, Annotations: item.Annotations, + RequiredNodeLabels: item.RequiredNodeLabels, + Priority: item.Priority, PodSpec: item.PodSpec, diff --git a/internal/armada/server/lease.go b/internal/armada/server/lease.go index 0863cc0b324..59fcd5981ad 100644 --- a/internal/armada/server/lease.go +++ b/internal/armada/server/lease.go @@ -98,14 +98,14 @@ func (q AggregatedQueueServer) LeaseJobs(ctx context.Context, request *api.Lease jobs := []*api.Job{} if !q.schedulingConfig.UseProbabilisticSchedulingForAllResources { - jobs, e = q.assignJobs(request.ClusterId, slices) + jobs, e = q.assignJobs(request, slices) if e != nil { log.Errorf("Error when leasing jobs for cluster %s: %s", request.ClusterId, e) return nil, e } } - additionalJobs, e := q.distributeRemainder(scarcity, request.ClusterId, activeQueuePriority, slices) + additionalJobs, e := q.distributeRemainder(request, scarcity, activeQueuePriority, slices) if e != nil { log.Errorf("Error when leasing jobs for cluster %s: %s", request.ClusterId, e) return nil, e @@ -146,11 +146,11 @@ func (q *AggregatedQueueServer) ReportDone(ctx context.Context, idList *api.IdLi return &api.IdList{cleaned}, e } -func (q *AggregatedQueueServer) assignJobs(clusterId string, slices map[*api.Queue]common.ComputeResourcesFloat) ([]*api.Job, error) { +func (q *AggregatedQueueServer) assignJobs(request *api.LeaseRequest, slices map[*api.Queue]common.ComputeResourcesFloat) ([]*api.Job, error) { jobs := make([]*api.Job, 0) // TODO: parallelize for queue, slice := range slices { - leased, remainder, e := q.leaseJobs(clusterId, queue, slice, -1) + leased, remainder, e := q.leaseJobs(request, queue, slice, -1) if e != nil { log.Error(e) continue @@ -161,7 +161,7 @@ func (q *AggregatedQueueServer) assignJobs(clusterId string, slices map[*api.Que return jobs, nil } -func (q *AggregatedQueueServer) distributeRemainder(resourceScarcity map[string]float64, clusterId string, priorities map[*api.Queue]scheduling.QueuePriorityInfo, slices map[*api.Queue]common.ComputeResourcesFloat) ([]*api.Job, error) { +func (q *AggregatedQueueServer) distributeRemainder(request *api.LeaseRequest, resourceScarcity map[string]float64, priorities map[*api.Queue]scheduling.QueuePriorityInfo, slices map[*api.Queue]common.ComputeResourcesFloat) ([]*api.Job, error) { jobs := []*api.Job{} remainder := common.ComputeResourcesFloat{} shares := map[*api.Queue]float64{} @@ -177,7 +177,7 @@ func (q *AggregatedQueueServer) distributeRemainder(resourceScarcity map[string] queue := q.pickQueueRandomly(shares) emptySteps++ - leased, remaining, e := q.leaseJobs(clusterId, queue, remainder, 1) + leased, remaining, e := q.leaseJobs(request, queue, remainder, 1) if e != nil { log.Error(e) continue @@ -215,7 +215,7 @@ func (q *AggregatedQueueServer) pickQueueRandomly(shares map[*api.Queue]float64) return lastQueue } -func (q *AggregatedQueueServer) leaseJobs(clusterId string, queue *api.Queue, slice common.ComputeResourcesFloat, limit int) ([]*api.Job, common.ComputeResourcesFloat, error) { +func (q *AggregatedQueueServer) leaseJobs(request *api.LeaseRequest, queue *api.Queue, slice common.ComputeResourcesFloat, limit int) ([]*api.Job, common.ComputeResourcesFloat, error) { jobs := make([]*api.Job, 0) remainder := slice for slice.IsValid() { @@ -230,7 +230,7 @@ func (q *AggregatedQueueServer) leaseJobs(clusterId string, queue *api.Queue, sl requirement := common.TotalResourceRequest(job.PodSpec).AsFloat() remainder = slice.DeepCopy() remainder.Sub(requirement) - if remainder.IsValid() { + if remainder.IsValid() && matchRequirements(job, request) { slice = remainder candidates = append(candidates, job) } @@ -239,7 +239,7 @@ func (q *AggregatedQueueServer) leaseJobs(clusterId string, queue *api.Queue, sl } } - leased, e := q.jobRepository.TryLeaseJobs(clusterId, queue.Name, candidates) + leased, e := q.jobRepository.TryLeaseJobs(request.ClusterId, queue.Name, candidates) if e != nil { return nil, slice, e } @@ -256,11 +256,28 @@ func (q *AggregatedQueueServer) leaseJobs(clusterId string, queue *api.Queue, sl } } - go reportJobsLeased(q.eventRepository, jobs, clusterId) + go reportJobsLeased(q.eventRepository, jobs, request.ClusterId) return jobs, slice, nil } +func matchRequirements(job *api.Job, request *api.LeaseRequest) bool { + if len(job.RequiredNodeLabels) == 0 { + return true + } + +Labels: + for _, labeling := range request.AvailableLabels { + for k, v := range job.RequiredNodeLabels { + if labeling.Labels[k] != v { + continue Labels + } + } + return true + } + return false +} + func filterPriorityMapByKeys(original map[*api.Queue]scheduling.QueuePriorityInfo, keys []*api.Queue) map[*api.Queue]scheduling.QueuePriorityInfo { result := make(map[*api.Queue]scheduling.QueuePriorityInfo) for _, key := range keys { diff --git a/internal/armada/server/lease_test.go b/internal/armada/server/lease_test.go new file mode 100644 index 00000000000..bdc91762743 --- /dev/null +++ b/internal/armada/server/lease_test.go @@ -0,0 +1,28 @@ +package server + +import ( + "testing" + + "github.com/stretchr/testify/assert" + + "github.com/G-Research/armada/internal/armada/api" +) + +func Test_matchRequirements(t *testing.T) { + + job := &api.Job{RequiredNodeLabels: map[string]string{"armada/region": "eu", "armada/zone": "1"}} + + assert.False(t, matchRequirements(job, &api.LeaseRequest{})) + assert.False(t, matchRequirements(job, &api.LeaseRequest{AvailableLabels: []*api.NodeLabeling{ + {Labels: map[string]string{"armada/region": "eu"}}, + {Labels: map[string]string{"armada/zone": "2"}}, + }})) + assert.False(t, matchRequirements(job, &api.LeaseRequest{AvailableLabels: []*api.NodeLabeling{ + {Labels: map[string]string{"armada/region": "eu", "armada/zone": "2"}}, + }})) + + assert.True(t, matchRequirements(job, &api.LeaseRequest{AvailableLabels: []*api.NodeLabeling{ + {Labels: map[string]string{"x": "y"}}, + {Labels: map[string]string{"armada/region": "eu", "armada/zone": "1", "x": "y"}}, + }})) +} diff --git a/internal/executor/application.go b/internal/executor/application.go index 058af22a221..b4790b0beeb 100644 --- a/internal/executor/application.go +++ b/internal/executor/application.go @@ -56,7 +56,8 @@ func StartUp(config configuration.ExecutorConfiguration) (func(), *sync.WaitGrou clusterUtilisationService := service.NewClusterUtilisationService( clusterContext, - usageClient) + usageClient, + config.Kubernetes.TrackedNodeLabels) stuckPodDetector := service.NewPodProgressMonitorService( clusterContext, diff --git a/internal/executor/configuration/types.go b/internal/executor/configuration/types.go index 4d06a297e2a..4409aaae5d6 100644 --- a/internal/executor/configuration/types.go +++ b/internal/executor/configuration/types.go @@ -11,7 +11,8 @@ type ApplicationConfiguration struct { } type KubernetesConfiguration struct { - ImpersonateUsers bool + ImpersonateUsers bool + TrackedNodeLabels []string } type TaskConfiguration struct { diff --git a/internal/executor/service/cluster_allocation.go b/internal/executor/service/cluster_allocation.go index 7e24f4ab394..d307a8d31fb 100644 --- a/internal/executor/service/cluster_allocation.go +++ b/internal/executor/service/cluster_allocation.go @@ -40,13 +40,13 @@ func NewClusterAllocationService( } func (allocationService *ClusterAllocationService) AllocateSpareClusterCapacity() { - availableResource, err := allocationService.utilisationService.GetAvailableClusterCapacity() + availableResource, availableLabels, err := allocationService.utilisationService.GetAvailableClusterCapacity() if err != nil { log.Errorf("Failed to allocate spare cluster capacity because %s", err) return } - newJobs, err := allocationService.leaseService.RequestJobLeases(availableResource) + newJobs, err := allocationService.leaseService.RequestJobLeases(availableResource, availableLabels) cpu := (*availableResource)["cpu"] memory := (*availableResource)["memory"] diff --git a/internal/executor/service/cluster_utilisation.go b/internal/executor/service/cluster_utilisation.go index 6c2d04f9393..4fe58df0d6d 100644 --- a/internal/executor/service/cluster_utilisation.go +++ b/internal/executor/service/cluster_utilisation.go @@ -15,21 +15,24 @@ import ( ) type UtilisationService interface { - GetAvailableClusterCapacity() (*common.ComputeResources, error) + GetAvailableClusterCapacity() (*common.ComputeResources, []map[string]string, error) } type ClusterUtilisationService struct { - clusterContext context.ClusterContext - usageClient api.UsageClient + clusterContext context.ClusterContext + usageClient api.UsageClient + trackedNodeLabels []string } func NewClusterUtilisationService( clusterContext context.ClusterContext, - usageClient api.UsageClient) *ClusterUtilisationService { + usageClient api.UsageClient, + trackedNodeLabels []string) *ClusterUtilisationService { return &ClusterUtilisationService{ - clusterContext: clusterContext, - usageClient: usageClient} + clusterContext: clusterContext, + usageClient: usageClient, + trackedNodeLabels: trackedNodeLabels} } func (clusterUtilisationService *ClusterUtilisationService) ReportClusterUtilisation() { @@ -63,15 +66,15 @@ func (clusterUtilisationService *ClusterUtilisationService) ReportClusterUtilisa } } -func (clusterUtilisationService *ClusterUtilisationService) GetAvailableClusterCapacity() (*common.ComputeResources, error) { +func (clusterUtilisationService *ClusterUtilisationService) GetAvailableClusterCapacity() (*common.ComputeResources, []map[string]string, error) { processingNodes, err := clusterUtilisationService.getAllAvailableProcessingNodes() if err != nil { - return new(common.ComputeResources), fmt.Errorf("Failed getting available cluster capacity due to: %s", err) + return new(common.ComputeResources), nil, fmt.Errorf("Failed getting available cluster capacity due to: %s", err) } allPods, err := clusterUtilisationService.clusterContext.GetAllPods() if err != nil { - return new(common.ComputeResources), fmt.Errorf("Failed getting available cluster capacity due to: %s", err) + return new(common.ComputeResources), nil, fmt.Errorf("Failed getting available cluster capacity due to: %s", err) } allPodsRequiringResource := getAllPodsRequiringResourceOnProcessingNodes(allPods, processingNodes) @@ -83,7 +86,9 @@ func (clusterUtilisationService *ClusterUtilisationService) GetAvailableClusterC availableResource := totalNodeResource.DeepCopy() availableResource.Sub(totalPodResource) - return &availableResource, nil + availableLabels := getDistinctNodesLabels(clusterUtilisationService.trackedNodeLabels, processingNodes) + + return &availableResource, availableLabels, nil } func (clusterUtilisationService *ClusterUtilisationService) getAllAvailableProcessingNodes() ([]*v1.Node, error) { @@ -200,3 +205,24 @@ func getUsageByQueue(pods []*v1.Pod) map[string]common.ComputeResources { return utilisationByQueue } + +func getDistinctNodesLabels(labels []string, nodes []*v1.Node) []map[string]string { + result := []map[string]string{} + existing := map[string]bool{} + for _, n := range nodes { + selectedLabels := map[string]string{} + id := "" + for _, key := range labels { + value, ok := n.Labels[key] + if ok { + selectedLabels[key] = value + } + id += "|" + value + } + if !existing[id] { + result = append(result, selectedLabels) + existing[id] = true + } + } + return result +} diff --git a/internal/executor/service/cluster_utilisation_test.go b/internal/executor/service/cluster_utilisation_test.go index a6874a0866b..29d6db8a44f 100644 --- a/internal/executor/service/cluster_utilisation_test.go +++ b/internal/executor/service/cluster_utilisation_test.go @@ -216,6 +216,31 @@ func TestGetUsageByQueue_HandlesEmptyList(t *testing.T) { assert.Equal(t, len(result), 0) } +func Test_getDistinctNodesLabels(t *testing.T) { + + nodes := []*v1.Node{ + {ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{ + "A": "x", + "B": "x", + }}}, + {ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{ + "A": "x", + "B": "x", + }}}, + {ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{ + "B": "y", + }}}, + } + labels := []string{"A", "B"} + + result := getDistinctNodesLabels(labels, nodes) + + assert.Equal(t, []map[string]string{ + {"A": "x", "B": "x"}, + {"B": "y"}, + }, result) +} + func hasKey(value map[string]common.ComputeResources, key string) bool { _, ok := value[key] return ok diff --git a/internal/executor/service/job_lease.go b/internal/executor/service/job_lease.go index 41dc8f43584..902218496d5 100644 --- a/internal/executor/service/job_lease.go +++ b/internal/executor/service/job_lease.go @@ -18,7 +18,7 @@ import ( type LeaseService interface { ReturnLease(pod *v1.Pod) error - RequestJobLeases(availableResource *common.ComputeResources) ([]*api.Job, error) + RequestJobLeases(availableResource *common.ComputeResources, availableLabels []map[string]string) ([]*api.Job, error) ReportDone(pods []*v1.Pod) error } @@ -36,10 +36,16 @@ func NewJobLeaseService( queueClient: queueClient} } -func (jobLeaseService *JobLeaseService) RequestJobLeases(availableResource *common.ComputeResources) ([]*api.Job, error) { +func (jobLeaseService *JobLeaseService) RequestJobLeases(availableResource *common.ComputeResources, availableLabels []map[string]string) ([]*api.Job, error) { + labeling := []*api.NodeLabeling{} + for _, l := range availableLabels { + labeling = append(labeling, &api.NodeLabeling{Labels: l}) + } + leaseRequest := api.LeaseRequest{ - ClusterId: jobLeaseService.clusterContext.GetClusterId(), - Resources: *availableResource, + ClusterId: jobLeaseService.clusterContext.GetClusterId(), + Resources: *availableResource, + AvailableLabels: labeling, } ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) defer cancel()