Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve subscription based airl cp to dp #1201

Merged
merged 1 commit into from
Sep 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion apim-apk-agent/internal/agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ func Run(conf *config.Config) {
synchronizer.FetchRateLimitPoliciesOnEvent("", "", mgr.GetClient())
}
// Load initial Subscription Rate Limit data from control plane
synchronizer.FetchSubscriptionRateLimitPoliciesOnEvent("", "", mgr.GetClient())
synchronizer.FetchSubscriptionRateLimitPoliciesOnEvent("", "", mgr.GetClient(), true)
// Load initial AI Provider data from control plane
synchronizer.FetchAIProvidersOnEvent("", "", "", mgr.GetClient())

Expand Down
19 changes: 17 additions & 2 deletions apim-apk-agent/internal/k8sClient/k8s_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -490,7 +490,8 @@ func DeployAIRateLimitPolicyFromCPPolicy(policy eventhubTypes.SubscriptionPolicy
}

crRateLimitPolicies := dpv1alpha3.AIRateLimitPolicy{
ObjectMeta: metav1.ObjectMeta{Name: policy.Name,
ObjectMeta: metav1.ObjectMeta{
Name: getSha1Value(policy.Name),
Namespace: conf.DataPlane.Namespace,
},
Spec: dpv1alpha3.AIRateLimitPolicySpec{
Expand Down Expand Up @@ -527,7 +528,7 @@ func DeployAIRateLimitPolicyFromCPPolicy(policy eventhubTypes.SubscriptionPolicy
func UnDeploySubscriptionRateLimitPolicyCR(policyName string, k8sClient client.Client) {
conf, _ := config.ReadConfigs()
crRateLimitPolicies := &dpv1alpha1.RateLimitPolicy{}
if err := k8sClient.Get(context.Background(), client.ObjectKey{Namespace: conf.DataPlane.Namespace, Name: policyName}, crRateLimitPolicies); err != nil {
if err := k8sClient.Get(context.Background(), client.ObjectKey{Namespace: conf.DataPlane.Namespace, Name: getSha1Value(policyName)}, crRateLimitPolicies); err != nil {
loggers.LoggerK8sClient.Error("Unable to get RateLimitPolicies CR: " + err.Error())
}
err := k8sClient.Delete(context.Background(), crRateLimitPolicies, &client.DeleteOptions{})
Expand All @@ -537,6 +538,20 @@ func UnDeploySubscriptionRateLimitPolicyCR(policyName string, k8sClient client.C
loggers.LoggerK8sClient.Debug("RateLimitPolicies CR deleted: " + crRateLimitPolicies.Name)
}

// UndeploySubscriptionAIRateLimitPolicyCR applies the given AIRateLimitPolicies struct to the Kubernetes cluster.
func UndeploySubscriptionAIRateLimitPolicyCR(policyName string, k8sClient client.Client) {
conf, _ := config.ReadConfigs()
crAIRateLimitPolicies := &dpv1alpha3.AIRateLimitPolicy{}
if err := k8sClient.Get(context.Background(), client.ObjectKey{Namespace: conf.DataPlane.Namespace, Name: getSha1Value(policyName)}, crAIRateLimitPolicies); err != nil {
loggers.LoggerK8sClient.Error("Unable to get AIRateLimitPolicies CR: " + err.Error())
}
err := k8sClient.Delete(context.Background(), crAIRateLimitPolicies, &client.DeleteOptions{})
if err != nil {
loggers.LoggerK8sClient.Error("Unable to delete AIRateLimitPolicies CR: " + err.Error())
}
loggers.LoggerK8sClient.Debug("AIRateLimitPolicies CR deleted: " + crAIRateLimitPolicies.Name)
}

// DeployBackendCR applies the given Backends struct to the Kubernetes cluster.
func DeployBackendCR(backends *dpv1alpha2.Backend, k8sClient client.Client) {
crBackends := &dpv1alpha2.Backend{}
Expand Down
13 changes: 10 additions & 3 deletions apim-apk-agent/internal/messaging/notification_listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,12 @@ func processNotificationEvent(conf *config.Config, notification *msg.EventNotifi
} else if strings.Contains(eventType, subscriptionEventType) {
handleSubscriptionEvents(decodedByte, eventType)
} else if strings.Contains(eventType, policyEventType) {
if AgentMode == "CPtoDP" {
var policyEvent msg.PolicyInfo
policyEventErr := json.Unmarshal([]byte(string(decodedByte)), &policyEvent)
if policyEventErr != nil {
logger.LoggerMessaging.Errorf("Error occurred while unmarshalling Throttling Policy event data %v", policyEventErr)
}
if AgentMode == "CPtoDP" || strings.EqualFold(policyEvent.PolicyType, "SUBSCRIPTION") {
handlePolicyEvents(decodedByte, eventType, c)
}
} else if strings.Contains(eventType, aiProviderEventType) {
Expand Down Expand Up @@ -460,7 +465,7 @@ func handlePolicyEvents(data []byte, eventType string, c client.Client) {
logger.LoggerMessaging.Infof("Rate Limit Policies Internal Map: %v", ratelimitPolicies)
} else if strings.EqualFold(policyEvent.PolicyType, "SUBSCRIPTION") {
logger.LoggerMessaging.Infof("Policy: %s for policy type: %s", policyEvent.PolicyName, policyEvent.PolicyType)
synchronizer.FetchSubscriptionRateLimitPoliciesOnEvent(policyEvent.PolicyName, policyEvent.TenantDomain, c)
synchronizer.FetchSubscriptionRateLimitPoliciesOnEvent(policyEvent.PolicyName, policyEvent.TenantDomain, c, false)
ratelimitPolicies := managementserver.GetAllRateLimitPolicies()
logger.LoggerMessaging.Infof("Rate Limit Policies Internal Map: %v", ratelimitPolicies)
}
Expand All @@ -472,7 +477,7 @@ func handlePolicyEvents(data []byte, eventType string, c client.Client) {
logger.LoggerMessaging.Infof("Rate Limit Policies Internal Map: %v", ratelimitPolicies)
} else if strings.EqualFold(policyEvent.PolicyType, "SUBSCRIPTION") {
logger.LoggerMessaging.Infof("Policy: %s for policy type: %s", policyEvent.PolicyName, policyEvent.PolicyType)
synchronizer.FetchSubscriptionRateLimitPoliciesOnEvent(policyEvent.PolicyName, policyEvent.TenantDomain, c)
synchronizer.FetchSubscriptionRateLimitPoliciesOnEvent(policyEvent.PolicyName, policyEvent.TenantDomain, c, false)
ratelimitPolicies := managementserver.GetAllRateLimitPolicies()
logger.LoggerMessaging.Infof("Rate Limit Policies Internal Map: %v", ratelimitPolicies)
}
Expand All @@ -484,7 +489,9 @@ func handlePolicyEvents(data []byte, eventType string, c client.Client) {
logger.LoggerMessaging.Infof("Rate Limit Policies Internal Map: %v", ratelimitPolicies)
} else if strings.EqualFold(policyEvent.PolicyType, "SUBSCRIPTION") {
logger.LoggerMessaging.Infof("Policy: %s for policy type: %s", policyEvent.PolicyName, policyEvent.PolicyType)
managementserver.DeleteSubscriptionPolicy(policyEvent.PolicyName, policyEvent.TenantDomain)
k8sclient.UnDeploySubscriptionRateLimitPolicyCR(policyEvent.PolicyName, c)
k8sclient.UndeploySubscriptionAIRateLimitPolicyCR(policyEvent.PolicyName, c)
ratelimitPolicies := managementserver.GetAllRateLimitPolicies()
logger.LoggerMessaging.Infof("Rate Limit Policies Internal Map: %v", ratelimitPolicies)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ func FetchRateLimitPoliciesOnEvent(ratelimitName string, organization string, c
}

// FetchSubscriptionRateLimitPoliciesOnEvent fetches the policies from the control plane on the start up and notification event updates
func FetchSubscriptionRateLimitPoliciesOnEvent(ratelimitName string, organization string, c client.Client) {
func FetchSubscriptionRateLimitPoliciesOnEvent(ratelimitName string, organization string, c client.Client, cleanupDeletedPolicies bool) {
logger.LoggerSynchronizer.Info("Fetching Subscription RateLimit Policies from Control Plane.")

// Read configurations and derive the eventHub details
Expand Down Expand Up @@ -308,7 +308,7 @@ func retrySubscriptionRLPFetchData(conf *config.Config, errorMessage string, err
logger.LoggerSynchronizer.Debugf("Time Duration for retrying: %v",
conf.ControlPlane.RetryInterval*time.Second)
time.Sleep(conf.ControlPlane.RetryInterval * time.Second)
FetchSubscriptionRateLimitPoliciesOnEvent("", "", c)
FetchSubscriptionRateLimitPoliciesOnEvent("", "", c, false)
retryAttempt++
if retryAttempt >= retryCount {
logger.LoggerSynchronizer.Errorf(errorMessage, err)
Expand Down
10 changes: 10 additions & 0 deletions apim-apk-agent/pkg/managementserver/event_holder.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,11 @@ func AddSubscriptionPolicy(rateLimitPolicy eventHub.SubscriptionPolicy) {
subscriptionPolicyMap[rateLimitPolicy.Name+rateLimitPolicy.TenantDomain] = rateLimitPolicy
}

// GetSubscriptionPolicies return the subscription policy map
func GetSubscriptionPolicies() map[string]eventHub.SubscriptionPolicy {
return subscriptionPolicyMap
}

// GetRateLimitPolicy returns a rate limit policy from the rateLimitPolicyMap
func GetRateLimitPolicy(name string, tenantDomain string) eventHub.RateLimitPolicy {
return rateLimitPolicyMap[name+tenantDomain]
Expand All @@ -95,6 +100,11 @@ func DeleteRateLimitPolicy(name string, tenantDomain string) {
delete(rateLimitPolicyMap, name+tenantDomain)
}

// DeleteSubscriptionPolicy deletes a subscription policy from the subscriptionPolicyMap
func DeleteSubscriptionPolicy(name string, tenantDomain string){
delete(subscriptionPolicyMap, name+tenantDomain)
}

// UpdateRateLimitPolicy updates a rate limit policy in the rateLimitPolicyMap
func UpdateRateLimitPolicy(name string, tenantDomain string, rateLimitPolicy eventHub.RateLimitPolicy) {
rateLimitPolicyMap[name+tenantDomain] = rateLimitPolicy
Expand Down
Loading