From 4ce748b343dabec7570ed58cf4bbe39fdfaa5ed4 Mon Sep 17 00:00:00 2001 From: backguynn Date: Fri, 22 Sep 2023 15:40:02 +0900 Subject: [PATCH 1/2] add yaml file for CRD (MulticlusterLBService) --- manifest/crds/multicluster-lb-service.yaml | 141 +++++++++++++++++++++ 1 file changed, 141 insertions(+) create mode 100644 manifest/crds/multicluster-lb-service.yaml diff --git a/manifest/crds/multicluster-lb-service.yaml b/manifest/crds/multicluster-lb-service.yaml new file mode 100644 index 0000000..e691edb --- /dev/null +++ b/manifest/crds/multicluster-lb-service.yaml @@ -0,0 +1,141 @@ +--- +apiVersion: apiextensions.k8s.io/v1 +kind: CustomResourceDefinition +metadata: + annotations: + controller-gen.kubebuilder.io/version: v0.13.0 + name: multiclusterlbservices.multicluster.loxilb.io +spec: + group: multicluster.loxilb.io + names: + kind: MultiClusterLBService + listKind: MultiClusterLBServiceList + plural: multiclusterlbservices + singular: multiclusterlbservice + scope: Cluster + versions: + - name: v1 + schema: + openAPIV3Schema: + description: MultiClusterLBService is the Schema for the multiclusterlbservices + API + properties: + apiVersion: + description: 'APIVersion defines the versioned schema of this representation + of an object. Servers should convert recognized schemas to the latest + internal value, and may reject unrecognized values. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#resources' + type: string + kind: + description: 'Kind is a string value representing the REST resource this + object represents. Servers may infer this from the endpoint the client + submits requests to. Cannot be updated. In CamelCase. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#types-kinds' + type: string + metadata: + type: object + spec: + description: MultiClusterLBServiceSpec defines the desired state of MultiClusterLBService + properties: + lbModel: + properties: + endpoints: + items: + properties: + endpointIP: + type: string + state: + type: string + targetPort: + type: integer + weight: + type: integer + required: + - endpointIP + - state + - targetPort + - weight + type: object + type: array + secondaryIPs: + items: + properties: + secondaryIP: + type: string + required: + - secondaryIP + type: object + type: array + serviceArguments: + properties: + BGP: + type: boolean + Monitor: + type: boolean + block: + type: integer + externalIP: + type: string + inactiveTimeOut: + format: int32 + type: integer + managed: + type: boolean + mode: + format: int32 + type: integer + port: + type: integer + probeport: + type: integer + probereq: + type: string + proberesp: + type: string + probetype: + type: string + protocol: + type: string + sel: + description: 'EDIT THIS FILE! THIS IS SCAFFOLDING FOR YOU + TO OWN! NOTE: json tags are required. Any new fields you + add must have json tags for the fields to be serialized.' + type: integer + required: + - BGP + - Monitor + - block + - externalIP + - inactiveTimeOut + - mode + - port + - probeport + - probereq + - proberesp + - probetype + - protocol + - sel + type: object + required: + - endpoints + - secondaryIPs + - serviceArguments + type: object + required: + - lbModel + type: object + status: + description: MultiClusterLBServiceStatus defines the observed state of + MultiClusterLBService + properties: + externalIP: + description: 'INSERT ADDITIONAL STATUS FIELD - define observed state + of cluster Important: Run "make" to regenerate code after modifying + this file' + type: string + required: + - externalIP + type: object + type: object + served: true + storage: true + subresources: + status: {} From 6c56840a6e2c53d0518a9940662a0c5238535bed Mon Sep 17 00:00:00 2001 From: backguynn Date: Mon, 25 Sep 2023 18:26:32 +0900 Subject: [PATCH 2/2] add CRD(multiclusterLBService) manager & modify kube-loxilb --- cmd/loxilb-agent/agent.go | 16 +- manifest/crds/multicluster-lb-service.yaml | 9 - manifest/kube-loxilb.yaml | 11 + .../manager/multicluster/loadbalancer.go | 246 ++++++++++++++++++ pkg/crds/multiclusterlbservice/v1/types.go | 26 +- pkg/k8s/client.go | 21 +- pkg/k8s/node.go | 3 +- 7 files changed, 304 insertions(+), 28 deletions(-) create mode 100644 pkg/agent/manager/multicluster/loadbalancer.go diff --git a/cmd/loxilb-agent/agent.go b/cmd/loxilb-agent/agent.go index bb5df98..3074153 100644 --- a/cmd/loxilb-agent/agent.go +++ b/cmd/loxilb-agent/agent.go @@ -25,6 +25,9 @@ import ( "github.com/loxilb-io/kube-loxilb/pkg/agent/config" "github.com/loxilb-io/kube-loxilb/pkg/agent/manager/loadbalancer" + "github.com/loxilb-io/kube-loxilb/pkg/agent/manager/multicluster" + crdinformers "github.com/loxilb-io/kube-loxilb/pkg/client/informers/externalversions" + "github.com/loxilb-io/kube-loxilb/pkg/api" "github.com/loxilb-io/kube-loxilb/pkg/ippool" "github.com/loxilb-io/kube-loxilb/pkg/k8s" @@ -54,12 +57,14 @@ func run(o *Options) error { klog.Infof(" Build: %s", BuildInfo) // create k8s Clientset, CRD Clientset and SharedInformerFactory for the given config. - k8sClient, _, _, err := k8s.CreateClients(o.config.ClientConnection, "") + k8sClient, _, crdClient, _, err := k8s.CreateClients(o.config.ClientConnection, "") if err != nil { return fmt.Errorf("error creating k8s clients: %v", err) } informerFactory := informers.NewSharedInformerFactory(k8sClient, informerDefaultResync) + crdInformerFactory := crdinformers.NewSharedInformerFactory(crdClient, informerDefaultResync) + multiclusterLBInformer := crdInformerFactory.Multicluster().V1().MultiClusterLBServices() // networkReadyCh is used to notify that the Node's network is ready. // Functions that rely on the Node's network should wait for the channel to close. @@ -176,6 +181,13 @@ func run(o *Options) error { informerFactory, ) + multiClusterLBManager := multicluster.NewMulticlusterLBManager( + crdClient, + loxilbClients, + networkConfig, + multiclusterLBInformer, + ) + go wait.Until(func() { if len(networkConfig.LoxilbURLs) <= 0 { lbManager.DiscoverLoxiLBServices(loxiLBLiveCh, loxiLBPurgeCh) @@ -188,8 +200,10 @@ func run(o *Options) error { log.StartLogFileNumberMonitor(stopCh) informerFactory.Start(stopCh) + crdInformerFactory.Start(stopCh) go lbManager.Run(stopCh, loxiLBLiveCh, loxiLBPurgeCh, loxiLBSelMasterEvent) + go multiClusterLBManager.Run(stopCh, loxiLBLiveCh, loxiLBPurgeCh, loxiLBSelMasterEvent) <-stopCh diff --git a/manifest/crds/multicluster-lb-service.yaml b/manifest/crds/multicluster-lb-service.yaml index e691edb..c1487d5 100644 --- a/manifest/crds/multicluster-lb-service.yaml +++ b/manifest/crds/multicluster-lb-service.yaml @@ -50,7 +50,6 @@ spec: type: integer required: - endpointIP - - state - targetPort - weight type: object @@ -100,23 +99,15 @@ spec: add must have json tags for the fields to be serialized.' type: integer required: - - BGP - - Monitor - block - externalIP - inactiveTimeOut - mode - port - - probeport - - probereq - - proberesp - - probetype - - protocol - sel type: object required: - endpoints - - secondaryIPs - serviceArguments type: object required: diff --git a/manifest/kube-loxilb.yaml b/manifest/kube-loxilb.yaml index 42a050a..297c56b 100644 --- a/manifest/kube-loxilb.yaml +++ b/manifest/kube-loxilb.yaml @@ -60,6 +60,17 @@ rules: - subjectaccessreviews verbs: - create + - apiGroups: + - multicluster.loxilb.io + resources: + - multiclusterlbservices + verbs: + - get + - watch + - list + - create + - update + - delete --- kind: ClusterRoleBinding apiVersion: rbac.authorization.k8s.io/v1 diff --git a/pkg/agent/manager/multicluster/loadbalancer.go b/pkg/agent/manager/multicluster/loadbalancer.go new file mode 100644 index 0000000..3cc405d --- /dev/null +++ b/pkg/agent/manager/multicluster/loadbalancer.go @@ -0,0 +1,246 @@ +/* + * Copyright (c) 2023 NetLOX Inc + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package multicluster + +import ( + "context" + "fmt" + "strings" + "time" + + "github.com/loxilb-io/kube-loxilb/pkg/agent/config" + "github.com/loxilb-io/kube-loxilb/pkg/api" + "github.com/loxilb-io/kube-loxilb/pkg/client/clientset/versioned" + crdInformer "github.com/loxilb-io/kube-loxilb/pkg/client/informers/externalversions/multiclusterlbservice/v1" + crdLister "github.com/loxilb-io/kube-loxilb/pkg/client/listers/multiclusterlbservice/v1" + crdv1 "github.com/loxilb-io/kube-loxilb/pkg/crds/multiclusterlbservice/v1" + + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/client-go/tools/cache" + "k8s.io/client-go/util/workqueue" + "k8s.io/klog/v2" +) + +const ( + mgrName = "MulticlusterLBServiceManager" + defaultWorkers = 4 + resyncPeriod = 60 * time.Second + minRetryDelay = 2 * time.Second + maxRetryDelay = 120 * time.Second +) + +type Manager struct { + crdClient versioned.Interface + loxiClients []*api.LoxiClient + multiclusterLBInformer crdInformer.MultiClusterLBServiceInformer + multiclusterLBLister crdLister.MultiClusterLBServiceLister + multiclusterLBListerSynced cache.InformerSynced + queue workqueue.RateLimitingInterface +} + +// Create and Init Manager. +// Manager is called by kube-loxilb when k8s service is created & updated. +func NewMulticlusterLBManager( + crdClient versioned.Interface, + loxiClients []*api.LoxiClient, + networkConfig *config.NetworkConfig, + multiclusterLBInformer crdInformer.MultiClusterLBServiceInformer) *Manager { + + manager := &Manager{ + crdClient: crdClient, + loxiClients: loxiClients, + multiclusterLBInformer: multiclusterLBInformer, + multiclusterLBLister: multiclusterLBInformer.Lister(), + multiclusterLBListerSynced: multiclusterLBInformer.Informer().HasSynced, + + queue: workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(minRetryDelay, maxRetryDelay), "loadbalancer"), + } + + multiclusterLBInformer.Informer().AddEventHandlerWithResyncPeriod( + cache.ResourceEventHandlerFuncs{ + AddFunc: func(cur interface{}) { + manager.enqueueService(cur) + }, + UpdateFunc: func(old, cur interface{}) { + manager.enqueueService(cur) + }, + DeleteFunc: func(old interface{}) { + manager.enqueueService(old) + }, + }, + resyncPeriod, + ) + + return manager +} + +func (m *Manager) enqueueService(obj interface{}) { + lb, ok := obj.(*crdv1.MultiClusterLBService) + if !ok { + deletedState, ok := obj.(cache.DeletedFinalStateUnknown) + if !ok { + klog.Errorf("Received unexpected object: %v", obj) + return + } + lb, ok = deletedState.Obj.(*crdv1.MultiClusterLBService) + if !ok { + klog.Errorf("DeletedFinalStateUnknown contains non-MultiClusterLBService object: %v", deletedState.Obj) + } + } + + m.queue.Add(lb) +} + +func (m *Manager) Run(stopCh <-chan struct{}, loxiLBLiveCh chan *api.LoxiClient, loxiLBPurgeCh chan *api.LoxiClient, masterEventCh <-chan bool) { + defer m.queue.ShutDown() + + klog.Infof("Starting %s", mgrName) + defer klog.Infof("Shutting down %s", mgrName) + + if !cache.WaitForNamedCacheSync( + mgrName, + stopCh, + m.multiclusterLBListerSynced) { + return + } + + for i := 0; i < defaultWorkers; i++ { + go wait.Until(m.worker, time.Second, stopCh) + } + <-stopCh +} + +func (m *Manager) worker() { + for m.processNextWorkItem() { + } +} + +func (m *Manager) processNextWorkItem() bool { + obj, quit := m.queue.Get() + if quit { + return false + } + + defer m.queue.Done(obj) + + if lb, ok := obj.(*crdv1.MultiClusterLBService); !ok { + m.queue.Forget(obj) + klog.Errorf("Expected string in work queue but got %#v", obj) + return true + } else if err := m.syncMulticlusterLBService(lb); err == nil { + m.queue.Forget(obj) + } else { + m.queue.AddRateLimited(obj) + klog.Errorf("Error syncing CRD MultiClusterLBService %s, requeuing. Error: %v", lb.Name, err) + } + return true +} + +func (m *Manager) syncMulticlusterLBService(lb *crdv1.MultiClusterLBService) error { + startTime := time.Now() + defer func() { + klog.V(4).Infof("Finished syncing MulticlusterLBService %s. (%v)", lb.Name, time.Since(startTime)) + }() + + _, err := m.multiclusterLBLister.Get(lb.Name) + if err != nil { + return m.deleteMulticlusterLBService(lb) + } + return m.addMulticlusterLBService(lb) +} + +func (m *Manager) addMulticlusterLBService(lb *crdv1.MultiClusterLBService) error { + // TODO: This code should be made into a function so that it can be reused. + ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) + defer cancel() + + klog.Infof("ExternalIP: %v", lb.Spec.Model.Service.ExternalIP) + klog.Infof("Port: %v", lb.Spec.Model.Service.Port) + klog.Infof("Protocol: %v", lb.Spec.Model.Service.Protocol) + klog.Infof("Sel: %v", lb.Spec.Model.Service.Sel) + klog.Infof("Mode: %v", lb.Spec.Model.Service.Mode) + klog.Infof("BGP: %v", lb.Spec.Model.Service.BGP) + klog.Infof("Monitor: %v", lb.Spec.Model.Service.Monitor) + klog.Infof("Timeout: %v", lb.Spec.Model.Service.Timeout) + klog.Infof("Block: %v", lb.Spec.Model.Service.Block) + klog.Infof("Managed: %v", lb.Spec.Model.Service.Managed) + klog.Infof("ProbeType: %v", lb.Spec.Model.Service.ProbeType) + klog.Infof("ProbePort: %v", lb.Spec.Model.Service.ProbePort) + klog.Infof("ProbeReq: %v", lb.Spec.Model.Service.ProbeReq) + klog.Infof("ProbeResp: %v", lb.Spec.Model.Service.ProbeResp) + + var errChList []chan error + for _, client := range m.loxiClients { + ch := make(chan error) + go func(c *api.LoxiClient, h chan error) { + var err error + if err = c.LoadBalancer().Create(ctx, &lb.Spec.Model); err != nil { + if !strings.Contains(err.Error(), "exist") { + klog.Errorf("failed to create load-balancer(%s) :%v", c.Url, err) + } else { + err = nil + } + } + h <- err + }(client, ch) + + errChList = append(errChList, ch) + } + + isError := true + for _, errCh := range errChList { + err := <-errCh + if err == nil { + isError = false + } + } + if isError { + klog.Errorf("failed to add load-balancer") + return fmt.Errorf("failed to add loxiLB loadBalancer") + } + + return nil +} + +func (m *Manager) deleteMulticlusterLBService(lb *crdv1.MultiClusterLBService) error { + var errChList []chan error + for _, loxiClient := range m.loxiClients { + ch := make(chan error) + errChList = append(errChList, ch) + + go func(client *api.LoxiClient, ch chan error) { + klog.Infof("called loxilb API: delete lb rule %v", lb.Spec.Model) + ch <- client.LoadBalancer().Delete(context.Background(), &lb.Spec.Model) + }(loxiClient, ch) + } + + isError := true + errStr := "" + for _, errCh := range errChList { + err := <-errCh + if err == nil { + isError = false + break + } else { + errStr = err.Error() + } + } + if isError { + return fmt.Errorf("failed to delete loxiLB LoadBalancer. Error: %v", errStr) + } + return nil +} diff --git a/pkg/crds/multiclusterlbservice/v1/types.go b/pkg/crds/multiclusterlbservice/v1/types.go index 1d5370e..17da6c4 100644 --- a/pkg/crds/multiclusterlbservice/v1/types.go +++ b/pkg/crds/multiclusterlbservice/v1/types.go @@ -17,6 +17,7 @@ package v1 import ( + "github.com/loxilb-io/kube-loxilb/pkg/api" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) @@ -46,22 +47,22 @@ type LoadBalancerService struct { Protocol string `json:"protocol" key:"protocol"` Sel EpSelect `json:"sel"` Mode LbMode `json:"mode"` - BGP bool `json:"BGP" options:"bgp"` - Monitor bool `json:"Monitor"` + BGP bool `json:"BGP,omitempty" options:"bgp"` + Monitor bool `json:"Monitor,omitempty"` Timeout uint32 `json:"inactiveTimeOut"` Block uint16 `json:"block" options:"block"` Managed bool `json:"managed,omitempty"` - ProbeType string `json:"probetype"` - ProbePort uint16 `json:"probeport"` - ProbeReq string `json:"probereq"` - ProbeResp string `json:"proberesp"` + ProbeType string `json:"probetype,omitempty"` + ProbePort uint16 `json:"probeport,omitempty"` + ProbeReq string `json:"probereq,omitempty"` + ProbeResp string `json:"proberesp,omitempty"` } type LoadBalancerEndpoint struct { EndpointIP string `json:"endpointIP"` TargetPort uint16 `json:"targetPort"` Weight uint8 `json:"weight"` - State string `json:"state"` + State string `json:"state,omitempty"` } type LoadBalancerSecIp struct { @@ -72,8 +73,17 @@ type LoadBalancerSecIp struct { type MultiClusterLBServiceSpec struct { Model LoadBalancerModel `json:"lbModel"` } + type LoadBalancerModel struct { Service LoadBalancerService `json:"serviceArguments"` - SecondaryIPs []LoadBalancerSecIp `json:"secondaryIPs"` + SecondaryIPs []LoadBalancerSecIp `json:"secondaryIPs,omitempty"` Endpoints []LoadBalancerEndpoint `json:"endpoints"` } + +func (l *LoadBalancerModel) GetKeyStruct() api.LoxiModel { + return &l.Service +} + +func (lbService *LoadBalancerService) GetKeyStruct() api.LoxiModel { + return lbService +} diff --git a/pkg/k8s/client.go b/pkg/k8s/client.go index 0a5e12f..74cc775 100644 --- a/pkg/k8s/client.go +++ b/pkg/k8s/client.go @@ -24,11 +24,13 @@ import ( componentbaseconfig "k8s.io/component-base/config" "k8s.io/klog/v2" aggregatorclientset "k8s.io/kube-aggregator/pkg/client/clientset_generated/clientset" + + crdclientset "github.com/loxilb-io/kube-loxilb/pkg/client/clientset/versioned" ) // CreateClients creates kube clients from the given config. func CreateClients(config componentbaseconfig.ClientConnectionConfiguration, kubeAPIServerOverride string) ( - clientset.Interface, aggregatorclientset.Interface, apiextensionclientset.Interface, error) { + clientset.Interface, aggregatorclientset.Interface, crdclientset.Interface, apiextensionclientset.Interface, error) { var kubeConfig *rest.Config var err error @@ -46,7 +48,7 @@ func CreateClients(config componentbaseconfig.ClientConnectionConfiguration, kub } if err != nil { - return nil, nil, nil, err + return nil, nil, nil, nil, err } kubeConfig.AcceptContentTypes = config.AcceptContentTypes @@ -56,22 +58,25 @@ func CreateClients(config componentbaseconfig.ClientConnectionConfiguration, kub client, err := clientset.NewForConfig(kubeConfig) if err != nil { - return nil, nil, nil, err + return nil, nil, nil, nil, err } aggregatorClient, err := aggregatorclientset.NewForConfig(kubeConfig) if err != nil { - return nil, nil, nil, err + return nil, nil, nil, nil, err } + // Create client for crd operations - //crdClient, err := crdclientset.NewForConfig(kubeConfig) + crdClient, err := crdclientset.NewForConfig(kubeConfig) if err != nil { - return nil, nil, nil, err + return nil, nil, nil, nil, err } + // Create client for crd manipulations apiExtensionClient, err := apiextensionclientset.NewForConfig(kubeConfig) if err != nil { - return nil, nil, nil, err + return nil, nil, nil, nil, err } - return client, aggregatorClient, apiExtensionClient, nil + + return client, aggregatorClient, crdClient, apiExtensionClient, nil } diff --git a/pkg/k8s/node.go b/pkg/k8s/node.go index 139343f..7ad5e75 100644 --- a/pkg/k8s/node.go +++ b/pkg/k8s/node.go @@ -24,7 +24,6 @@ import ( "time" tk "github.com/loxilb-io/loxilib" - corev1 "k8s.io/api/core/v1" v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" @@ -54,7 +53,7 @@ func GetNodeAddr(node *v1.Node) (net.IP, error) { } // GetServiceLocalEndpoints - Get HostIPs of pods belonging to the given service -func GetServiceLocalEndpoints(kubeClient clientset.Interface, svc *corev1.Service, addrType string) ([]string, error) { +func GetServiceLocalEndpoints(kubeClient clientset.Interface, svc *v1.Service, addrType string) ([]string, error) { var epList []string ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)