diff --git a/adapter/config/default_config.go b/adapter/config/default_config.go index 05e3618dc..b30d833dd 100644 --- a/adapter/config/default_config.go +++ b/adapter/config/default_config.go @@ -37,6 +37,15 @@ var defaultConfig = &Config{ Type: "prometheus", Port: 18006, }, + ControlPlane: controlplane{ + EnableAPIPropagation: false, + Host: "localhost", + EventPort: 18000, + RestPort: 18001, + RetryInterval: 5, + APIsRestPath: "/apis", + SkipSSLVerification: false, + }, }, Envoy: envoy{ ListenerCodecType: "AUTO", diff --git a/adapter/config/types.go b/adapter/config/types.go index d53eac1ed..3a6a6d679 100644 --- a/adapter/config/types.go +++ b/adapter/config/types.go @@ -95,6 +95,8 @@ type adapter struct { Environment string // Metric represents configurations to expose/export go metrics Metrics Metrics + // ControlPlane represents the connection configuration of ControlPlane + ControlPlane controlplane } // Envoy Listener Component related configurations. @@ -443,3 +445,18 @@ type responseDirection struct { type operator struct { Namespaces []string } + +type controlplane struct { + EnableAPIPropagation bool + Host string + EventPort uint16 + RestPort uint16 + RetryInterval time.Duration + Persistence persistence + SkipSSLVerification bool + APIsRestPath string +} + +type persistence struct { + Type string +} diff --git a/adapter/internal/controlplane/eventPublisher.go b/adapter/internal/controlplane/eventPublisher.go new file mode 100644 index 000000000..a9c380ad2 --- /dev/null +++ b/adapter/internal/controlplane/eventPublisher.go @@ -0,0 +1,200 @@ +/* + * Copyright (c) 2024, WSO2 LLC. (http://www.wso2.org) All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package controlplane + +import ( + "bytes" + "crypto/tls" + "encoding/json" + "fmt" + "net/http" + "sync" + "time" + + "io/ioutil" + + "github.com/wso2/apk/adapter/config" + "github.com/wso2/apk/adapter/internal/loggers" + "github.com/wso2/apk/adapter/pkg/utils/tlsutils" +) + +var ( + configOnce sync.Once + host string + port uint16 + eventQueue chan APICPEvent + labelsQueue chan APICRLabelsUpdate + wg sync.WaitGroup + apisRestPath string + skipSSL bool +) + +// EventType represents the type of event. +type EventType string + +const ( + // EventTypeCreate signifies a create event. + EventTypeCreate EventType = "CREATE" + // EventTypeUpdate signifies an update event. + EventTypeUpdate EventType = "UPDATE" + // EventTypeDelete signifies a delete event. + EventTypeDelete EventType = "DELETE" + applicationJSON = "application/json" + retryInterval = 5 +) + +// APICRLabelsUpdate hold the label update required for a specific API CR +type APICRLabelsUpdate struct { + Namespace string + Name string + Labels map[string]string +} + +// APICPEvent represents data for the control plane API. +type APICPEvent struct { + Event EventType `json:"event"` + API API `json:"payload"` + CRName string `json:"-"` + CRNamespace string `json:"-"` +} + +// API holds the data that needs to be sent to agent +type API struct { + APIUUID string `json:"apiUUID"` + APIName string `json:"apiName"` + APIVersion string `json:"apiVersion"` + IsDefaultVersion bool `json:"isDefaultVersion"` + Definition string `json:"definition"` + APIType string `json:"apiType"` + BasePath string `json:"basePath"` + Organization string `json:"organization"` + SystemAPI bool `json:"systemAPI"` + APIProperties []Property `json:"apiProperties,omitempty"` + Environment string `json:"environment,omitempty"` + RevisionID string `json:"revisionID"` +} + +// Property holds key value pair of APIProperties +type Property struct { + Name string `json:"name,omitempty"` + Value string `json:"value,omitempty"` +} + +// init reads the configuration and starts the worker to send data. +func init() { + configOnce.Do(func() { + conf := config.ReadConfigs() + if !conf.Adapter.ControlPlane.EnableAPIPropagation { + loggers.LoggerAPK.Info("Adapter control plane is not enabled. Not starting agent worker.") + return + } + host = conf.Adapter.ControlPlane.Host + port = conf.Adapter.ControlPlane.RestPort + apisRestPath = fmt.Sprintf("https://%s:%d%s", host, port, conf.Adapter.ControlPlane.APIsRestPath) + skipSSL = conf.Adapter.ControlPlane.SkipSSLVerification + eventQueue = make(chan APICPEvent, 1000) + labelsQueue = make(chan APICRLabelsUpdate, 1000) + wg.Add(1) + go sendData() + }) +} + +// SendData sends data as a POST request to the control plane host. +func sendData() { + loggers.LoggerAPK.Infof("A thread assigned to send API events to agent") + tr := &http.Transport{} + if !skipSSL { + _, _, truststoreLocation := tlsutils.GetKeyLocations() + caCertPool := tlsutils.GetTrustedCertPool(truststoreLocation) + tr = &http.Transport{ + TLSClientConfig: &tls.Config{RootCAs: caCertPool}, + } + } else { + tr = &http.Transport{ + TLSClientConfig: &tls.Config{InsecureSkipVerify: true}, + } + } + + // Configuring the http client + client := &http.Client{ + Transport: tr, + } + defer wg.Done() + for event := range eventQueue { + loggers.LoggerAPK.Infof("Sending api event to agent. Event: %+v", event) + jsonData, err := json.Marshal(event) + if err != nil { + loggers.LoggerAPK.Errorf("Error marshalling data. Error %+v", err) + continue + } + for { + resp, err := client.Post( + apisRestPath, + applicationJSON, + bytes.NewBuffer(jsonData), + ) + if err != nil { + loggers.LoggerAPK.Errorf("Error sending data. Error: %+v, Retrying after %d seconds", err, retryInterval) + // Sleep for some time before retrying + time.Sleep(time.Second * retryInterval) + continue + } + defer resp.Body.Close() + if resp.StatusCode != http.StatusOK { + body, _ := ioutil.ReadAll(resp.Body) + loggers.LoggerAPK.Errorf("Error: Unexpected status code: %d, received message: %s, retrying after %d seconds", resp.StatusCode, string(body), retryInterval) + // Sleep for some time before retrying + time.Sleep(time.Second * retryInterval) + continue + } + if event.Event == EventTypeDelete { + // If its a delete event that got propagated to CP then we do not need to update CR. + break + } + var responseMap map[string]interface{} + err = json.NewDecoder(resp.Body).Decode(&responseMap) + if err != nil { + loggers.LoggerAPK.Errorf("Could not decode response body as json. body: %+v", resp.Body) + break + } + // Assuming the response contains an ID field, you can extract it like this: + id, ok := responseMap["id"].(string) + if !ok { + loggers.LoggerAPK.Errorf("Id field not present in response body. encoded body: %+v", responseMap) + break + } + loggers.LoggerAPK.Infof("Adding label update to API %s/%s, Lebels: apiUUID: %s", event.CRNamespace, event.CRName, id) + labelsQueue <- APICRLabelsUpdate{ + Namespace: event.CRNamespace, + Name: event.CRName, + Labels: map[string]string{"apiUUID": id}, + } + break + } + } +} + +// AddToEventQueue adds the api event to queue +func AddToEventQueue(data APICPEvent) { + eventQueue <- data +} + +// GetLabelQueue adds the label change to queue +func GetLabelQueue() *chan APICRLabelsUpdate { + return &labelsQueue +} diff --git a/adapter/internal/operator/controllers/dp/api_controller.go b/adapter/internal/operator/controllers/dp/api_controller.go index 0fb9f966c..591a79db2 100644 --- a/adapter/internal/operator/controllers/dp/api_controller.go +++ b/adapter/internal/operator/controllers/dp/api_controller.go @@ -18,12 +18,17 @@ package dp import ( + "bytes" + "compress/gzip" "context" + "encoding/json" "errors" "fmt" + "io/ioutil" "sync" "github.com/wso2/apk/adapter/config" + "github.com/wso2/apk/adapter/internal/controlplane" "github.com/wso2/apk/adapter/internal/discovery/xds" "github.com/wso2/apk/adapter/internal/discovery/xds/common" "github.com/wso2/apk/adapter/internal/loggers" @@ -90,12 +95,13 @@ var ( // APIReconciler reconciles a API object type APIReconciler struct { - client k8client.Client - ods *synchronizer.OperatorDataStore - ch *chan *synchronizer.APIEvent - successChannel *chan synchronizer.SuccessEvent - statusUpdater *status.UpdateHandler - mgr manager.Manager + client k8client.Client + ods *synchronizer.OperatorDataStore + ch *chan *synchronizer.APIEvent + successChannel *chan synchronizer.SuccessEvent + statusUpdater *status.UpdateHandler + mgr manager.Manager + apiPropagationEnabled bool } // NewAPIController creates a new API controller instance. API Controllers watches for dpv1alpha2.API and gwapiv1b1.HTTPRoute. @@ -110,7 +116,6 @@ func NewAPIController(mgr manager.Manager, operatorDataStore *synchronizer.Opera mgr: mgr, } ctx := context.Background() - c, err := controller.New(constants.APIController, mgr, controller.Options{Reconciler: apiReconciler}) if err != nil { loggers.LoggerAPKOperator.ErrorC(logging.PrintError(logging.Error2619, logging.BLOCKER, "Error applying startup APIs: %v", err.Error())) @@ -118,6 +123,7 @@ func NewAPIController(mgr manager.Manager, operatorDataStore *synchronizer.Opera } conf := config.ReadConfigs() + apiReconciler.apiPropagationEnabled = conf.Adapter.ControlPlane.EnableAPIPropagation predicates := []predicate.Predicate{predicate.NewPredicateFuncs(utils.FilterByNamespaces(conf.Adapter.Operator.Namespaces))} if err := c.Watch(source.Kind(mgr.GetCache(), &dpv1alpha2.API{}), &handler.EnqueueRequestForObject{}, @@ -204,6 +210,7 @@ func NewAPIController(mgr manager.Manager, operatorDataStore *synchronizer.Opera loggers.LoggerAPKOperator.Info("API Controller successfully started. Watching API Objects....") go apiReconciler.handleStatus() + go apiReconciler.handleLabels(ctx) return nil } @@ -242,6 +249,13 @@ func (apiReconciler *APIReconciler) Reconcile(ctx context.Context, req ctrl.Requ if err := apiReconciler.client.Get(ctx, req.NamespacedName, &apiCR); err != nil { apiState, found := apiReconciler.ods.GetCachedAPI(req.NamespacedName) if found && k8error.IsNotFound(err) { + if apiReconciler.apiPropagationEnabled { + // Convert api state to api cp data + loggers.LoggerAPKOperator.Info("Sending API deletion event to agent") + apiCpData := apiReconciler.convertAPIStateToAPICp(ctx, apiState) + apiCpData.Event = controlplane.EventTypeDelete + controlplane.AddToEventQueue(apiCpData) + } // The api doesn't exist in the api Cache, remove it apiReconciler.ods.DeleteCachedAPI(req.NamespacedName) loggers.LoggerAPKOperator.Infof("Delete event received for API : %s with API UUID : %v, hence deleted from API cache", @@ -416,11 +430,24 @@ func (apiReconciler *APIReconciler) resolveAPIRefs(ctx context.Context, api dpv1 loggers.LoggerAPKOperator.Debugf("Child references are retrieved successfully for API CR %s", apiRef.String()) if !api.Status.DeploymentStatus.Accepted { + if apiReconciler.apiPropagationEnabled { + // Publish the api data to CP + apiCpData := apiReconciler.convertAPIStateToAPICp(ctx, *apiState) + apiCpData.Event = controlplane.EventTypeCreate + controlplane.AddToEventQueue(apiCpData) + } + apiReconciler.ods.AddAPIState(apiRef, apiState) apiReconciler.traverseAPIStateAndUpdateOwnerReferences(ctx, *apiState) return &synchronizer.APIEvent{EventType: constants.Create, Events: []synchronizer.APIState{*apiState}, UpdatedEvents: []string{}}, nil } else if cachedAPI, events, updated := apiReconciler.ods.UpdateAPIState(apiRef, apiState); updated { + if apiReconciler.apiPropagationEnabled { + // Publish the api data to CP + apiCpData := apiReconciler.convertAPIStateToAPICp(ctx, *apiState) + apiCpData.Event = controlplane.EventTypeUpdate + controlplane.AddToEventQueue(apiCpData) + } apiReconciler.traverseAPIStateAndUpdateOwnerReferences(ctx, *apiState) loggers.LoggerAPKOperator.Infof("API CR %s with API UUID : %v is updated on %v", apiRef.String(), string(api.ObjectMeta.UID), events) @@ -2011,6 +2038,40 @@ func (apiReconciler *APIReconciler) handleStatus() { } } +func (apiReconciler *APIReconciler) handleLabels(ctx context.Context) { + type patchStringValue struct { + Op string `json:"op"` + Path string `json:"path"` + Value string `json:"value"` + } + + loggers.LoggerAPKOperator.Info("A thread assigned to handle label updates to API CR.") + for labelUpdate := range *controlplane.GetLabelQueue() { + loggers.LoggerAPKOperator.Infof("Starting to process label update for API %s/%s. Labels: %+v", labelUpdate.Namespace, labelUpdate.Name, labelUpdate.Labels) + + patchOps := []patchStringValue{} + for key, value := range labelUpdate.Labels { + patchOps = append(patchOps, patchStringValue{ + Op: "replace", + Path: fmt.Sprintf("/metadata/labels/%s", key), + Value: value, + }) + } + payloadBytes, _ := json.Marshal(patchOps) + apiCR := dpv1alpha2.API{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: labelUpdate.Namespace, + Name: labelUpdate.Name, + }, + } + + err := apiReconciler.client.Patch(ctx, &apiCR, k8client.RawPatch(types.JSONPatchType, payloadBytes)) + if err != nil { + loggers.LoggerAPKOperator.Errorf("Failed to patch api %s/%s with patch: %+v, error: %+v", labelUpdate.Name, labelUpdate.Namespace, patchOps, err) + } + } +} + func (apiReconciler *APIReconciler) handleOwnerReference(ctx context.Context, obj k8client.Object, apiRequests *[]reconcile.Request) { apis := []dpv1alpha2.API{} for _, req := range *apiRequests { @@ -2060,3 +2121,59 @@ func prepareOwnerReference(apiItems []dpv1alpha2.API) []metav1.OwnerReference { } return ownerReferences } + +func (apiReconciler *APIReconciler) convertAPIStateToAPICp(ctx context.Context, apiState synchronizer.APIState) controlplane.APICPEvent { + apiCPEvent := controlplane.APICPEvent{} + spec := apiState.APIDefinition.Spec + configMap := &corev1.ConfigMap{} + apiDef := "" + if spec.DefinitionFileRef != "" { + err := apiReconciler.client.Get(ctx, types.NamespacedName{Namespace: apiState.APIDefinition.Namespace, Name: spec.DefinitionFileRef}, configMap) + if err != nil { + loggers.LoggerAPKOperator.Errorf("Error while loading config map for the api definition: %+v, Error: %v", types.NamespacedName{Namespace: apiState.APIDefinition.Namespace, Name: spec.DefinitionFileRef}, err) + } else { + for _, val := range configMap.BinaryData { + buf := bytes.NewReader(val) + r, err := gzip.NewReader(buf) + if err != nil { + loggers.LoggerAPKOperator.Errorf("Error creating gzip reader. Error: %+v", err) + continue + } + defer r.Close() + decompressed, err := ioutil.ReadAll(r) + if err != nil { + loggers.LoggerAPKOperator.Errorf("Error reading decompressed data. Error: %+v", err) + continue + } + apiDef = string(decompressed) + } + } + } + apiUUID, apiUUIDExists := apiState.APIDefinition.ObjectMeta.Labels["apiUUID"] + if !apiUUIDExists { + apiUUID = spec.APIName + } + revisionID, revisionIDExists := apiState.APIDefinition.ObjectMeta.Labels["revisionID"] + if !revisionIDExists { + revisionID = "0" + } + + api := controlplane.API{ + APIName: spec.APIName, + APIVersion: spec.APIVersion, + IsDefaultVersion: spec.IsDefaultVersion, + APIType: spec.APIType, + BasePath: spec.BasePath, + Organization: spec.Organization, + Environment: spec.Environment, + SystemAPI: spec.SystemAPI, + Definition: apiDef, + APIUUID: apiUUID, + RevisionID: revisionID, + } + apiCPEvent.API = api + apiCPEvent.CRName = apiState.APIDefinition.ObjectMeta.Name + apiCPEvent.CRNamespace = apiState.APIDefinition.ObjectMeta.Namespace + return apiCPEvent + +} diff --git a/helm-charts/templates/data-plane/gateway-components/log-conf.yaml b/helm-charts/templates/data-plane/gateway-components/log-conf.yaml index cedf817ff..6307f4d4f 100644 --- a/helm-charts/templates/data-plane/gateway-components/log-conf.yaml +++ b/helm-charts/templates/data-plane/gateway-components/log-conf.yaml @@ -19,7 +19,16 @@ data: enabled = {{.Values.wso2.apk.metrics.enabled}} type = "{{.Values.wso2.apk.metrics.type| default "prometheus" }}" port = 18006 - {{ end}} + {{ end}} + + {{- if and .Values.wso2.apk.cp .Values.wso2.apk.cp.controlplane }} + [adapter.controlplane] + enableAPIPropagation = {{ .Values.wso2.apk.cp.controlplane.enableApiPropagation | default false }} + host = "{{ .Values.wso2.apk.cp.controlplane.host | default "apim-apk-agent-service.apk.svc.cluster.local" }}" + eventPort = {{ .Values.wso2.apk.cp.controlplane.port | default 18000 }} + restPort = {{ .Values.wso2.apk.cp.controlplane.restPort | default 18001 }} + skipSSLVerification = {{ .Values.wso2.apk.cp.controlplane.skipSSLVerification | default false }} + {{- end }} {{ if and .Values.wso2.apk.dp.gatewayRuntime.deployment .Values.wso2.apk.dp.gatewayRuntime.deployment.router .Values.wso2.apk.dp.gatewayRuntime.deployment.router.configs }} [router] diff --git a/helm-charts/templates/serviceAccount/apk-cluster-role.yaml b/helm-charts/templates/serviceAccount/apk-cluster-role.yaml index 0eb1b6ef8..ccbb849a5 100644 --- a/helm-charts/templates/serviceAccount/apk-cluster-role.yaml +++ b/helm-charts/templates/serviceAccount/apk-cluster-role.yaml @@ -31,7 +31,7 @@ rules: verbs: [ "get","patch","update" ] - apiGroups: ["dp.wso2.com"] resources: ["apis"] - verbs: ["get","list","watch","update","delete","create"] + verbs: ["get","list","watch","update","delete","create", "patch"] - apiGroups: ["dp.wso2.com"] resources: ["apis/finalizers"] verbs: ["update"] diff --git a/helm-charts/values.yaml.template b/helm-charts/values.yaml.template index f69cdea5e..396c13718 100644 --- a/helm-charts/values.yaml.template +++ b/helm-charts/values.yaml.template @@ -68,6 +68,16 @@ wso2: secretName: "" # -- IDP jwt signing certificate file name fileName: "" + cp: + controlplane: + # -- Enable controlplane connection + enableApiPropagation: false + # -- Hostname of the APK agent service + host: "apim-apk-agent-service.apk.svc.cluster.local" + # -- Port of the APK agent service for events + restPort: 18001 + # -- Skip SSL verification + skipSSLVerification: false dp: # -- Enable the deployment of the Data Plane enabled: true