Skip to content

Commit

Permalink
Add headless services to armada (#820)
Browse files Browse the repository at this point in the history
* Add Headless service

* Add label selector code to headless services

* Work on tests

* Lint

* Add tests

* Lint

* Remove custom selector from headless service

* Checkpoint before changing groupIngressConfig

* Separate services from ingress interface

* Add tests

* Lint

* Create internal datastructure for services

* Update deserialize

* Add service_config file

* Remove unused enum values

* Change name to IngressServiceConfig

* rerun pipe

* Add renamed file
  • Loading branch information
theAntiYeti authored Dec 14, 2021
1 parent 64b07a2 commit bb7ce57
Show file tree
Hide file tree
Showing 22 changed files with 1,310 additions and 323 deletions.
36 changes: 32 additions & 4 deletions client/DotNet/Armada.Client/ClientGenerated.cs
Original file line number Diff line number Diff line change
Expand Up @@ -904,14 +904,12 @@ public partial class ApiIngressConfig

}

/// <summary>Ingress type is being kept here to maintain backwards compatibility for a while.</summary>
[System.CodeDom.Compiler.GeneratedCode("NJsonSchema", "10.0.27.0 (Newtonsoft.Json v12.0.0.0)")]
public enum ApiIngressType
{
[System.Runtime.Serialization.EnumMember(Value = @"NodePort")]
NodePort = 0,

[System.Runtime.Serialization.EnumMember(Value = @"Ingress")]
Ingress = 1,
Ingress = 0,

}

Expand Down Expand Up @@ -963,6 +961,9 @@ public partial class ApiJob
[Newtonsoft.Json.JsonProperty("requiredNodeLabels", Required = Newtonsoft.Json.Required.Default, NullValueHandling = Newtonsoft.Json.NullValueHandling.Ignore)]
public System.Collections.Generic.IDictionary<string, string> RequiredNodeLabels { get; set; }

[Newtonsoft.Json.JsonProperty("services", Required = Newtonsoft.Json.Required.Default, NullValueHandling = Newtonsoft.Json.NullValueHandling.Ignore)]
public System.Collections.Generic.ICollection<ApiServiceConfig> Services { get; set; }


}

Expand Down Expand Up @@ -1441,6 +1442,9 @@ public partial class ApiJobSubmitRequestItem
[Newtonsoft.Json.JsonProperty("requiredNodeLabels", Required = Newtonsoft.Json.Required.Default, NullValueHandling = Newtonsoft.Json.NullValueHandling.Ignore)]
public System.Collections.Generic.IDictionary<string, string> RequiredNodeLabels { get; set; }

[Newtonsoft.Json.JsonProperty("services", Required = Newtonsoft.Json.Required.Default, NullValueHandling = Newtonsoft.Json.NullValueHandling.Ignore)]
public System.Collections.Generic.ICollection<ApiServiceConfig> Services { get; set; }


}

Expand Down Expand Up @@ -1696,6 +1700,30 @@ public partial class ApiQueueInfo

}

[System.CodeDom.Compiler.GeneratedCode("NJsonSchema", "10.0.27.0 (Newtonsoft.Json v12.0.0.0)")]
public partial class ApiServiceConfig
{
[Newtonsoft.Json.JsonProperty("ports", Required = Newtonsoft.Json.Required.Default, NullValueHandling = Newtonsoft.Json.NullValueHandling.Ignore)]
public System.Collections.Generic.ICollection<long> Ports { get; set; }

[Newtonsoft.Json.JsonProperty("type", Required = Newtonsoft.Json.Required.Default, NullValueHandling = Newtonsoft.Json.NullValueHandling.Ignore)]
[Newtonsoft.Json.JsonConverter(typeof(Newtonsoft.Json.Converters.StringEnumConverter))]
public ApiServiceType? Type { get; set; }


}

[System.CodeDom.Compiler.GeneratedCode("NJsonSchema", "10.0.27.0 (Newtonsoft.Json v12.0.0.0)")]
public enum ApiServiceType
{
[System.Runtime.Serialization.EnumMember(Value = @"NodePort")]
NodePort = 0,

[System.Runtime.Serialization.EnumMember(Value = @"Headless")]
Headless = 1,

}

/// <summary>+protobuf=true
/// +protobuf.options.(gogoproto.goproto_stringer)=false
/// +k8s:openapi-gen=true</summary>
Expand Down
1 change: 1 addition & 0 deletions internal/armada/server/submit.go
Original file line number Diff line number Diff line change
Expand Up @@ -458,6 +458,7 @@ func (server *SubmitServer) createJobs(request *api.JobSubmitRequest, owner stri

RequiredNodeLabels: item.RequiredNodeLabels,
Ingress: item.Ingress,
Services: item.Services,

Priority: item.Priority,

Expand Down
4 changes: 4 additions & 0 deletions internal/common/util/map.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,10 @@ func MergeMaps(a map[string]string, b map[string]string) map[string]string {
}

func DeepCopy(a map[string]string) map[string]string {
if a == nil {
return nil
}

result := make(map[string]string)
for k, v := range a {
result[k] = v
Expand Down
8 changes: 4 additions & 4 deletions internal/common/validation/job_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ func Test_ValidateJobSubmitRequestItem(t *testing.T) {
validIngressConfig := &api.JobSubmitRequestItem{
Ingress: []*api.IngressConfig{
{
Type: api.IngressType_NodePort,
Type: api.IngressType_Ingress,
Ports: []uint32{
5,
},
Expand All @@ -26,7 +26,7 @@ func Test_ValidateJobSubmitRequestItem_WithPortRepeatedInSingleConfig(t *testing
validIngressConfig := &api.JobSubmitRequestItem{
Ingress: []*api.IngressConfig{
{
Type: api.IngressType_NodePort,
Type: api.IngressType_Ingress,
Ports: []uint32{
5,
5,
Expand All @@ -41,13 +41,13 @@ func Test_ValidateJobSubmitRequestItem_WithPortRepeatedInSeperateConfig(t *testi
validIngressConfig := &api.JobSubmitRequestItem{
Ingress: []*api.IngressConfig{
{
Type: api.IngressType_NodePort,
Type: api.IngressType_Ingress,
Ports: []uint32{
5,
},
},
{
Type: api.IngressType_NodePort,
Type: api.IngressType_Ingress,
Ports: []uint32{
5,
},
Expand Down
6 changes: 5 additions & 1 deletion internal/executor/job/submit.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,11 @@ func (allocationService *SubmitService) submitPod(job *api.Job, i int) (*v1.Pod,
}

func exposesPorts(job *api.Job, podSpec *v1.PodSpec) bool {
return len(util2.GetServicePorts(job.Ingress, podSpec)) > 0
// This is to workaround needing to get serviceports for service configs
// while maintaining immutability of the configs as they're passed around.
servicesIngressConfig := util2.CombineIngressService(job.Ingress, job.Services)

return len(util2.GetServicePorts(servicesIngressConfig, podSpec)) > 0
}

func isNotRecoverable(status metav1.Status) bool {
Expand Down
59 changes: 59 additions & 0 deletions internal/executor/util/ingress_service_config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package util

import (
"github.com/G-Research/armada/internal/common/util"
"github.com/G-Research/armada/pkg/api"
)

type IngressServiceType int

const (
Ingress IngressServiceType = iota
NodePort
Headless
)

func (st IngressServiceType) String() string {
return []string{"Ingress", "NodePort", "Headless"}[st]
}

type IngressServiceConfig struct {
Type IngressServiceType
Ports []uint32
Annotations map[string]string
TlsEnabled bool
CertName string
}

func CombineIngressService(ingresses []*api.IngressConfig, services []*api.ServiceConfig) []*IngressServiceConfig {
result := []*IngressServiceConfig{}

for _, ing := range ingresses {
result = append(
result,
&IngressServiceConfig{
Type: Ingress,
Ports: util.DeepCopyListUint32(ing.Ports),
Annotations: util.DeepCopy(ing.Annotations),
TlsEnabled: ing.TlsEnabled,
CertName: ing.CertName,
},
)
}

for _, svc := range services {
svcType := NodePort
if svc.Type == api.ServiceType_Headless {
svcType = Headless
}
result = append(
result,
&IngressServiceConfig{
Type: svcType,
Ports: util.DeepCopyListUint32(svc.Ports),
},
)
}

return result
}
78 changes: 46 additions & 32 deletions internal/executor/util/ingress_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,18 +16,20 @@ func GenerateIngresses(job *api.Job, pod *v1.Pod, ingressConfig *configuration.I
services := []*v1.Service{}
ingresses := []*networking.Ingress{}

groupedIngressConfigs := groupIngressConfig(job.Ingress)
for ingressType, configs := range groupedIngressConfigs {
ingressToGen := CombineIngressService(job.Ingress, job.Services)

groupedIngressConfigs := groupIngressConfig(ingressToGen)
for svcType, configs := range groupedIngressConfigs {
if len(GetServicePorts(configs, &pod.Spec)) > 0 {
service := CreateService(job, pod, GetServicePorts(configs, &pod.Spec), ingressType)
service := CreateService(job, pod, GetServicePorts(configs, &pod.Spec), svcType)
services = append(services, service)

if ingressType == api.IngressType_Ingress {
if svcType == Ingress {
for index, config := range configs {
if len(GetServicePorts([]*api.IngressConfig{config}, &pod.Spec)) <= 0 {
if len(GetServicePorts([]*IngressServiceConfig{config}, &pod.Spec)) <= 0 {
continue
}
ingressName := fmt.Sprintf("%s-%s-%d", pod.Name, strings.ToLower(ingressType.String()), index)
ingressName := fmt.Sprintf("%s-%s-%d", pod.Name, strings.ToLower(svcType.String()), index)
ingress := CreateIngress(ingressName, job, pod, service, ingressConfig, config)
ingresses = append(ingresses, ingress)
}
Expand All @@ -38,56 +40,68 @@ func GenerateIngresses(job *api.Job, pod *v1.Pod, ingressConfig *configuration.I
return services, ingresses
}

func groupIngressConfig(configs []*api.IngressConfig) map[api.IngressType][]*api.IngressConfig {
result := make(map[api.IngressType][]*api.IngressConfig, 10)
func groupIngressConfig(configs []*IngressServiceConfig) map[IngressServiceType][]*IngressServiceConfig {
result := gatherIngressConfig(configs)

for ingressType, grp := range result {
result[ingressType] = mergeOnAnnotations(grp)
}

return result
}

func gatherIngressConfig(configs []*IngressServiceConfig) map[IngressServiceType][]*IngressServiceConfig {
result := make(map[IngressServiceType][]*IngressServiceConfig, 10)

for _, config := range configs {
if _, present := result[config.Type]; !present {
result[config.Type] = []*api.IngressConfig{deepCopy(config)}
continue
}
result[config.Type] = append(result[config.Type], deepCopy(config))
}

existingConfigsOfType := result[config.Type]
if config.Type == api.IngressType_NodePort {
existingConfigsOfType[0].Ports = append(existingConfigsOfType[0].Ports, config.Ports...)
} else {
matchFound := false
for _, existingConfig := range existingConfigsOfType {
if util.Equal(config.Annotations, existingConfig.Annotations) {
existingConfig.Ports = append(existingConfig.Ports, config.Ports...)
matchFound = true
}
}
if !matchFound {
result[config.Type] = append(existingConfigsOfType, deepCopy(config))
return result
}

func mergeOnAnnotations(configs []*IngressServiceConfig) []*IngressServiceConfig {
result := make([]*IngressServiceConfig, 0, len(configs))

for _, config := range configs {
matchFound := false

for _, existingConfig := range result {
if util.Equal(config.Annotations, existingConfig.Annotations) {
existingConfig.Ports = append(existingConfig.Ports, config.Ports...)
matchFound = true
}
}
if !matchFound {
result = append(result, deepCopy(config))
}
}

return result
}

func deepCopy(config *api.IngressConfig) *api.IngressConfig {
return &api.IngressConfig{
Type: config.GetType(),
func deepCopy(config *IngressServiceConfig) *IngressServiceConfig {
return &IngressServiceConfig{
Type: config.Type,
Ports: util.DeepCopyListUint32(config.Ports),
Annotations: util.DeepCopy(config.Annotations),
TlsEnabled: config.TlsEnabled,
CertName: config.CertName,
}
}

func GetServicePorts(ingressConfigs []*api.IngressConfig, podSpec *v1.PodSpec) []v1.ServicePort {
func GetServicePorts(svcConfigs []*IngressServiceConfig, podSpec *v1.PodSpec) []v1.ServicePort {
var servicePorts []v1.ServicePort

for _, container := range podSpec.Containers {
ports := container.Ports
for _, ingressConfig := range ingressConfigs {
for _, svcConfig := range svcConfigs {
for _, port := range ports {
//Don't expose host via service, this will already be handled by kubernetes
if port.HostPort > 0 {
continue
}
if contains(ingressConfig, uint32(port.ContainerPort)) {
if contains(svcConfig, uint32(port.ContainerPort)) {
servicePort := v1.ServicePort{
Name: fmt.Sprintf("%s-%d", container.Name, port.ContainerPort),
Port: port.ContainerPort,
Expand All @@ -102,7 +116,7 @@ func GetServicePorts(ingressConfigs []*api.IngressConfig, podSpec *v1.PodSpec) [
return servicePorts
}

func contains(portConfig *api.IngressConfig, port uint32) bool {
func contains(portConfig *IngressServiceConfig, port uint32) bool {
for _, p := range portConfig.Ports {
if p == port {
return true
Expand Down
Loading

0 comments on commit bb7ce57

Please sign in to comment.