Skip to content

Commit

Permalink
Update ratelimit polies
Browse files Browse the repository at this point in the history
  • Loading branch information
Krishanx92 committed Sep 26, 2024
1 parent 6ee7ed0 commit 434aca2
Show file tree
Hide file tree
Showing 4 changed files with 70 additions and 31 deletions.
2 changes: 1 addition & 1 deletion apim-apk-agent/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ require (
github.com/pelletier/go-toml v1.9.5
github.com/sirupsen/logrus v1.9.3
github.com/stretchr/testify v1.9.0
github.com/wso2/apk/common-go-libs v0.0.0-20240923143402-ff7fdb0366f9
github.com/wso2/apk/common-go-libs v0.0.0-20240926095654-65776db416b4
google.golang.org/grpc v1.62.0
gopkg.in/natefinch/lumberjack.v2 v2.2.1
gopkg.in/yaml.v2 v2.4.0
Expand Down
4 changes: 2 additions & 2 deletions apim-apk-agent/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -189,8 +189,8 @@ github.com/vektah/gqlparser v1.3.1 h1:8b0IcD3qZKWJQHSzynbDlrtP3IxVydZ2DZepCGofqf
github.com/vektah/gqlparser v1.3.1/go.mod h1:bkVf0FX+Stjg/MHnm8mEyubuaArhNEqfQhF+OTiAL74=
github.com/wso2/apk/adapter v0.0.0-20240408123538-86a74d977eee h1:g0ivVkzybfcEkB0vBGTAXTUuMZpsF3zOTVtAgmW851s=
github.com/wso2/apk/adapter v0.0.0-20240408123538-86a74d977eee/go.mod h1:xYS5auF/YxnyRykw7NBSn/YR2FHD4hTeyav4Nhec8d0=
github.com/wso2/apk/common-go-libs v0.0.0-20240923143402-ff7fdb0366f9 h1:MwQqG+/ODDIfLfc3xNMYk6jM+hB2ttjwZnaDBeiMOJI=
github.com/wso2/apk/common-go-libs v0.0.0-20240923143402-ff7fdb0366f9/go.mod h1:SbZVA1jeiVG9dqk9fGcY/bB0JgEaQgtXqFAlxAfN0Lk=
github.com/wso2/apk/common-go-libs v0.0.0-20240926095654-65776db416b4 h1:TFuNwE6NWvJtFJeZjtBCIU/wCxZHpA7HpoWR0loC6LA=
github.com/wso2/apk/common-go-libs v0.0.0-20240926095654-65776db416b4/go.mod h1:SbZVA1jeiVG9dqk9fGcY/bB0JgEaQgtXqFAlxAfN0Lk=
github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY=
Expand Down
65 changes: 45 additions & 20 deletions apim-apk-agent/internal/k8sClient/k8s_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -424,30 +424,41 @@ func UpdateRateLimitPolicyCR(policy eventhubTypes.RateLimitPolicy, k8sClient cli
// DeploySubscriptionRateLimitPolicyCR applies the given RateLimitPolicies struct to the Kubernetes cluster.
func DeploySubscriptionRateLimitPolicyCR(policy eventhubTypes.SubscriptionPolicy, k8sClient client.Client) {
conf, _ := config.ReadConfigs()

crRateLimitPolicies := dpv1alpha3.RateLimitPolicy{
ObjectMeta: metav1.ObjectMeta{Name: policy.Name,
Namespace: conf.DataPlane.Namespace,
},
Spec: dpv1alpha3.RateLimitPolicySpec{
Override: &dpv1alpha3.RateLimitAPIPolicy{
Subscription: &dpv1alpha3.SubscriptionRateLimitPolicy{
StopOnQuotaReach: policy.StopOnQuotaReach,
Organization: policy.TenantDomain,
RequestCount: &dpv1alpha3.RequestCount{
RequestsPerUnit: uint32(policy.DefaultLimit.RequestCount.RequestCount),
Unit: policy.DefaultLimit.RequestCount.TimeUnit,
crRateLimitPolicy := dpv1alpha3.RateLimitPolicy{}
if err := k8sClient.Get(context.Background(), client.ObjectKey{Namespace: conf.DataPlane.Namespace, Name: policy.Name}, &crRateLimitPolicy); err != nil {
crRateLimitPolicy = dpv1alpha3.RateLimitPolicy{
ObjectMeta: metav1.ObjectMeta{Name: policy.Name,
Namespace: conf.DataPlane.Namespace,
},
Spec: dpv1alpha3.RateLimitPolicySpec{
Override: &dpv1alpha3.RateLimitAPIPolicy{
Subscription: &dpv1alpha3.SubscriptionRateLimitPolicy{
StopOnQuotaReach: policy.StopOnQuotaReach,
Organization: policy.TenantDomain,
RequestCount: &dpv1alpha3.RequestCount{
RequestsPerUnit: uint32(policy.DefaultLimit.RequestCount.RequestCount),
Unit: policy.DefaultLimit.RequestCount.TimeUnit,
},
},
},
TargetRef: gwapiv1b1.PolicyTargetReference{Group: constants.GatewayGroup, Kind: "Subscription", Name: "default"},
},
TargetRef: gwapiv1b1.PolicyTargetReference{Group: constants.GatewayGroup, Kind: "Subscription", Name: "default"},
},
}

if err := k8sClient.Create(context.Background(), &crRateLimitPolicies); err != nil {
loggers.LoggerK8sClient.Error("Unable to create RateLimitPolicies CR: " + err.Error())
}
if err := k8sClient.Create(context.Background(), &crRateLimitPolicy); err != nil {
loggers.LoggerK8sClient.Error("Unable to create RateLimitPolicies CR: " + err.Error())
} else {
loggers.LoggerK8sClient.Info("RateLimitPolicies CR created: " + crRateLimitPolicy.Name)
}
} else {
loggers.LoggerK8sClient.Info("RateLimitPolicies CR created: " + crRateLimitPolicies.Name)
crRateLimitPolicy.Spec.Override.Subscription.StopOnQuotaReach = policy.StopOnQuotaReach
crRateLimitPolicy.Spec.Override.Subscription.Organization = policy.TenantDomain
crRateLimitPolicy.Spec.Override.Subscription.RequestCount.RequestsPerUnit = uint32(policy.DefaultLimit.RequestCount.RequestCount)
crRateLimitPolicy.Spec.Override.Subscription.RequestCount.Unit = policy.DefaultLimit.RequestCount.TimeUnit
if err := k8sClient.Update(context.Background(), &crRateLimitPolicy); err != nil {
loggers.LoggerK8sClient.Error("Unable to update RateLimitPolicies CR: " + err.Error())
} else {
loggers.LoggerK8sClient.Info("RateLimitPolicies CR updated: " + crRateLimitPolicy.Name)
}
}

}
Expand Down Expand Up @@ -512,6 +523,20 @@ func DeployAIRateLimitPolicyFromCPPolicy(policy eventhubTypes.SubscriptionPolicy
}
}

// UnDeploySubscriptionRateLimitPolicyCR applies the given RateLimitPolicies struct to the Kubernetes cluster.
func UnDeploySubscriptionRateLimitPolicyCR(policyName string, k8sClient client.Client) {
conf, _ := config.ReadConfigs()
crRateLimitPolicies := &dpv1alpha1.RateLimitPolicy{}
if err := k8sClient.Get(context.Background(), client.ObjectKey{Namespace: conf.DataPlane.Namespace, Name: policyName}, crRateLimitPolicies); err != nil {
loggers.LoggerK8sClient.Error("Unable to get RateLimitPolicies CR: " + err.Error())
}
err := k8sClient.Delete(context.Background(), crRateLimitPolicies, &client.DeleteOptions{})
if err != nil {
loggers.LoggerK8sClient.Error("Unable to delete RateLimitPolicies CR: " + err.Error())
}
loggers.LoggerK8sClient.Debug("RateLimitPolicies CR deleted: " + crRateLimitPolicies.Name)
}

// DeployBackendCR applies the given Backends struct to the Kubernetes cluster.
func DeployBackendCR(backends *dpv1alpha2.Backend, k8sClient client.Client) {
crBackends := &dpv1alpha2.Backend{}
Expand Down
30 changes: 22 additions & 8 deletions apim-apk-agent/internal/messaging/notification_listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -467,15 +467,29 @@ func handlePolicyEvents(data []byte, eventType string, c client.Client) {
logger.LoggerMessaging.Infof("Rate Limit Policies Internal Map: %v", ratelimitPolicies)
}
} else if strings.EqualFold(eventType, policyUpdate) {
logger.LoggerMessaging.Infof("Policy: %s for policy type: %s for tenant: %s", policyEvent.PolicyName, policyEvent.PolicyType, policyEvent.TenantDomain)
synchronizer.FetchRateLimitPoliciesOnEvent(policyEvent.PolicyName, policyEvent.TenantDomain, c)
ratelimitPolicies := managementserver.GetAllRateLimitPolicies()
logger.LoggerMessaging.Infof("Rate Limit Policies Internal Map: %v", ratelimitPolicies)
if strings.EqualFold(policyEvent.PolicyType, "API") {
logger.LoggerMessaging.Infof("Policy: %s for policy type: %s for tenant: %s", policyEvent.PolicyName, policyEvent.PolicyType, policyEvent.TenantDomain)
synchronizer.FetchRateLimitPoliciesOnEvent(policyEvent.PolicyName, policyEvent.TenantDomain, c)
ratelimitPolicies := managementserver.GetAllRateLimitPolicies()
logger.LoggerMessaging.Infof("Rate Limit Policies Internal Map: %v", ratelimitPolicies)
} else if strings.EqualFold(policyEvent.PolicyType, "SUBSCRIPTION") {
logger.LoggerMessaging.Infof("Policy: %s for policy type: %s", policyEvent.PolicyName, policyEvent.PolicyType)
synchronizer.FetchSubscriptionRateLimitPoliciesOnEvent(policyEvent.PolicyName, policyEvent.TenantDomain, c)
ratelimitPolicies := managementserver.GetAllRateLimitPolicies()
logger.LoggerMessaging.Infof("Rate Limit Policies Internal Map: %v", ratelimitPolicies)
}
} else if strings.EqualFold(eventType, policyDelete) {
logger.LoggerMessaging.Infof("Policy: %s for policy type: %s", policyEvent.PolicyName, policyEvent.PolicyType)
managementserver.DeleteRateLimitPolicy(policyEvent.PolicyName, policyEvent.TenantDomain)
ratelimitPolicies := managementserver.GetAllRateLimitPolicies()
logger.LoggerMessaging.Infof("Rate Limit Policies Internal Map: %v", ratelimitPolicies)
if strings.EqualFold(policyEvent.PolicyType, "API") {
logger.LoggerMessaging.Infof("Policy: %s for policy type: %s", policyEvent.PolicyName, policyEvent.PolicyType)
managementserver.DeleteRateLimitPolicy(policyEvent.PolicyName, policyEvent.TenantDomain)
ratelimitPolicies := managementserver.GetAllRateLimitPolicies()
logger.LoggerMessaging.Infof("Rate Limit Policies Internal Map: %v", ratelimitPolicies)
} else if strings.EqualFold(policyEvent.PolicyType, "SUBSCRIPTION") {
logger.LoggerMessaging.Infof("Policy: %s for policy type: %s", policyEvent.PolicyName, policyEvent.PolicyType)
k8sclient.UnDeploySubscriptionRateLimitPolicyCR(policyEvent.PolicyName, c)
ratelimitPolicies := managementserver.GetAllRateLimitPolicies()
logger.LoggerMessaging.Infof("Rate Limit Policies Internal Map: %v", ratelimitPolicies)
}
}

if strings.EqualFold(applicationEventType, policyEvent.PolicyType) {
Expand Down

0 comments on commit 434aca2

Please sign in to comment.