From fd01bb826629ba0803c363f95c278ffa199d48d5 Mon Sep 17 00:00:00 2001 From: Susana Garcia Date: Thu, 8 Dec 2022 17:25:19 +0100 Subject: [PATCH 1/2] extend object storage collector filtering by bucket by namespace label and list of namespaces --- pkg/clients/cluster/kubernetes.go | 8 +++- pkg/service/sos/objectstorage.go | 60 ++++++++++++++++++++------- pkg/service/sos/objectstorage_test.go | 14 +++++-- 3 files changed, 61 insertions(+), 21 deletions(-) diff --git a/pkg/clients/cluster/kubernetes.go b/pkg/clients/cluster/kubernetes.go index f7b4373..f7bafed 100644 --- a/pkg/clients/cluster/kubernetes.go +++ b/pkg/clients/cluster/kubernetes.go @@ -4,6 +4,7 @@ import ( "fmt" "github.com/vshn/provider-exoscale/apis" + corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/client-go/dynamic" "k8s.io/client-go/rest" @@ -11,8 +12,11 @@ import ( ) // InitK8sClient creates a k8s client from the server url and token url -func InitK8sClient(url, token string) (*client.Client, error) { +func InitK8sClient(url, token string) (client.Client, error) { scheme := runtime.NewScheme() + if err := corev1.AddToScheme(scheme); err != nil { + return nil, fmt.Errorf("core scheme: %w", err) + } err := apis.AddToScheme(scheme) if err != nil { return nil, fmt.Errorf("cannot add k8s exoscale scheme: %w", err) @@ -24,7 +28,7 @@ func InitK8sClient(url, token string) (*client.Client, error) { if err != nil { return nil, fmt.Errorf("cannot initialize k8s client: %w", err) } - return &k8sClient, nil + return k8sClient, nil } // InitK8sClientDynamic creates a dynamic k8s client from the server url and token url diff --git a/pkg/service/sos/objectstorage.go b/pkg/service/sos/objectstorage.go index ede2792..2a0c96b 100644 --- a/pkg/service/sos/objectstorage.go +++ b/pkg/service/sos/objectstorage.go @@ -13,7 +13,9 @@ import ( egoscale "github.com/exoscale/egoscale/v2" db "github.com/vshn/exoscale-metrics-collector/pkg/database" exoscalev1 "github.com/vshn/provider-exoscale/apis/exoscale/v1" + corev1 "k8s.io/api/core/v1" ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/client" k8s "sigs.k8s.io/controller-runtime/pkg/client" ) @@ -32,10 +34,10 @@ type BucketDetail struct { } // NewObjectStorage creates an ObjectStorage with the initial setup -func NewObjectStorage(exoscaleClient *egoscale.Client, k8sClient *k8s.Client, databaseURL string) ObjectStorage { +func NewObjectStorage(exoscaleClient *egoscale.Client, k8sClient k8s.Client, databaseURL string) ObjectStorage { return ObjectStorage{ exoscaleClient: exoscaleClient, - k8sClient: *k8sClient, + k8sClient: k8sClient, database: &db.SosDatabase{ Database: db.Database{ URL: databaseURL, @@ -51,7 +53,7 @@ func (o *ObjectStorage) Execute(ctx context.Context) error { p := pipeline.NewPipeline[context.Context]() p.WithSteps( - p.NewStep("Fetch managed buckets", o.fetchManagedBuckets), + p.NewStep("Fetch managed buckets and namespaces", o.fetchManagedBucketsAndNamespaces), p.NewStep("Get bucket usage", o.getBucketUsage), p.NewStep("Get billing date", o.getBillingDate), p.NewStep("Save to database", o.saveToDatabase), @@ -79,9 +81,9 @@ func (o *ObjectStorage) getBucketUsage(ctx context.Context) error { return nil } -func (o *ObjectStorage) fetchManagedBuckets(ctx context.Context) error { +func (o *ObjectStorage) fetchManagedBucketsAndNamespaces(ctx context.Context) error { log := ctrl.LoggerFrom(ctx) - log.Info("Fetching buckets from cluster") + log.Info("Fetching buckets and namespaces from cluster") buckets := exoscalev1.BucketList{} log.V(1).Info("Listing buckets from cluster") @@ -89,7 +91,14 @@ func (o *ObjectStorage) fetchManagedBuckets(ctx context.Context) error { if err != nil { return fmt.Errorf("cannot list buckets: %w", err) } - o.bucketDetails = addOrgAndNamespaceToBucket(ctx, buckets) + + log.V(1).Info("Listing namespaces from cluster") + namespaces, err := fetchNamespaceWithOrganizationMap(ctx, o.k8sClient) + if err != nil { + return fmt.Errorf("cannot list namespaces: %w", err) + } + + o.bucketDetails = addOrgAndNamespaceToBucket(ctx, buckets, namespaces) return nil } @@ -148,7 +157,7 @@ func getAggregatedBuckets(ctx context.Context, sosBucketsUsage []oapi.SosBucketU return aggregatedBuckets } -func addOrgAndNamespaceToBucket(ctx context.Context, buckets exoscalev1.BucketList) []BucketDetail { +func addOrgAndNamespaceToBucket(ctx context.Context, buckets exoscalev1.BucketList, namespaces map[string]string) []BucketDetail { log := ctrl.LoggerFrom(ctx) log.V(1).Info("Gathering org and namespace from buckets") @@ -157,17 +166,17 @@ func addOrgAndNamespaceToBucket(ctx context.Context, buckets exoscalev1.BucketLi bucketDetail := BucketDetail{ BucketName: bucket.Spec.ForProvider.BucketName, } - if organization, exist := bucket.ObjectMeta.Labels[service.OrganizationLabel]; exist { - bucketDetail.Organization = organization - } else { - // cannot get organization from bucket - log.Info("Organization label is missing in bucket, skipping...", - "label", service.OrganizationLabel, - "bucket", bucket.Name) - continue - } if namespace, exist := bucket.ObjectMeta.Labels[service.NamespaceLabel]; exist { + organization, ok := namespaces[namespace] + if !ok { + // cannot find namespace in namespace list + log.Info("Namespace not found in namespace list, skipping...", + "namespace", namespace, + "bucket", bucket.Name) + continue + } bucketDetail.Namespace = namespace + bucketDetail.Organization = organization } else { // cannot get namespace from bucket log.Info("Namespace label is missing in bucket, skipping...", @@ -183,3 +192,22 @@ func addOrgAndNamespaceToBucket(ctx context.Context, buckets exoscalev1.BucketLi } return bucketDetails } + +func fetchNamespaceWithOrganizationMap(ctx context.Context, k8sclient client.Client) (map[string]string, error) { + log := ctrl.LoggerFrom(ctx) + list := &corev1.NamespaceList{} + if err := k8sclient.List(ctx, list); err != nil { + return nil, fmt.Errorf("cannot get namespace list: %w", err) + } + + namespaces := map[string]string{} + for _, ns := range list.Items { + org, ok := ns.Labels[service.OrganizationLabel] + if !ok { + log.Info("Organization label not found in namespace", "namespace", ns.Name) + continue + } + namespaces[ns.Name] = org + } + return namespaces, nil +} diff --git a/pkg/service/sos/objectstorage_test.go b/pkg/service/sos/objectstorage_test.go index 931e2a7..fe7ace8 100644 --- a/pkg/service/sos/objectstorage_test.go +++ b/pkg/service/sos/objectstorage_test.go @@ -108,9 +108,10 @@ func TestObjectStorage_GetAggregated(t *testing.T) { } } -func TestObjectStorage_AadOrgAndNamespaceToBucket(t *testing.T) { +func TestObjectStorage_addOrgAndNamespaceToBucket(t *testing.T) { tests := map[string]struct { givenBucketList exoscalev1.BucketList + givenNamespaces map[string]string expectedBucketDetails []BucketDetail }{ "GivenBucketListFromExoscale_WhenOrgAndNamespaces_ThenExpectBucketDetailObjects": { @@ -123,6 +124,12 @@ func TestObjectStorage_AadOrgAndNamespaceToBucket(t *testing.T) { createBucket("bucket-5", "theta", "orgC"), }, }, + givenNamespaces: map[string]string{ + "alpha": "orgA", + "beta": "orgB", + "omega": "orgB", + "theta": "orgC", + }, expectedBucketDetails: []BucketDetail{ createBucketDetail("bucket-1", "alpha", "orgA"), createBucketDetail("bucket-2", "beta", "orgB"), @@ -139,6 +146,7 @@ func TestObjectStorage_AadOrgAndNamespaceToBucket(t *testing.T) { createBucket("bucket-3", "", ""), }, }, + givenNamespaces: map[string]string{}, expectedBucketDetails: []BucketDetail{}, }, } @@ -148,10 +156,10 @@ func TestObjectStorage_AadOrgAndNamespaceToBucket(t *testing.T) { ctx := context.Background() // When - bucketDetails := addOrgAndNamespaceToBucket(ctx, tc.givenBucketList) + bucketDetails := addOrgAndNamespaceToBucket(ctx, tc.givenBucketList, tc.givenNamespaces) // Then - assert.Equal(t, tc.expectedBucketDetails, bucketDetails) + assert.ElementsMatch(t, tc.expectedBucketDetails, bucketDetails) }) } } From 5ace4d18748b3210400587f9dc369bf9ffa255ef Mon Sep 17 00:00:00 2001 From: Susana Garcia Date: Mon, 12 Dec 2022 13:34:52 +0100 Subject: [PATCH 2/2] extend object storage collector filtering by DBaaS by namespace label and list of namespaces --- pkg/service/dbaas/service.go | 99 +++++++++++++++++++++++++----------- 1 file changed, 70 insertions(+), 29 deletions(-) diff --git a/pkg/service/dbaas/service.go b/pkg/service/dbaas/service.go index fbbf006..9a95105 100644 --- a/pkg/service/dbaas/service.go +++ b/pkg/service/dbaas/service.go @@ -3,13 +3,16 @@ package dbaas import ( "context" "fmt" - "k8s.io/apimachinery/pkg/api/errors" "time" + "k8s.io/apimachinery/pkg/api/errors" + pipeline "github.com/ccremer/go-command-pipeline" + "github.com/go-logr/logr" "github.com/vshn/exoscale-metrics-collector/pkg/clients/exoscale" "github.com/vshn/exoscale-metrics-collector/pkg/database" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/client-go/dynamic" @@ -89,7 +92,7 @@ func (s *Service) Execute(ctx context.Context) error { p := pipeline.NewPipeline[*Context]() p.WithSteps( - p.NewStep("Fetch cluster managed DBaaS", s.fetchManagedDBaaS), + p.NewStep("Fetch cluster managed DBaaS and namespaces", s.fetchManagedDBaaSAndNamespaces), p.NewStep("Fetch exoscale DBaaS usage", s.fetchDBaaSUsage), p.NewStep("Aggregate DBaaS services by namespace and plan", aggregateDBaaS), p.WithNestedSteps("Save to billing database", hasAggregatedInstances, @@ -101,10 +104,16 @@ func (s *Service) Execute(ctx context.Context) error { return p.RunWithContext(&Context{Context: ctx}) } -// fetchManagedDBaaS fetches instances from kubernetes cluster -func (s *Service) fetchManagedDBaaS(ctx *Context) error { +// fetchManagedDBaaSAndNamespaces fetches instances and namespaces from kubernetes cluster +func (s *Service) fetchManagedDBaaSAndNamespaces(ctx *Context) error { log := ctrl.LoggerFrom(ctx) + log.V(1).Info("Listing namespaces from cluster") + namespaces, err := fetchNamespaceWithOrganizationMap(ctx, s.k8sClient) + if err != nil { + return fmt.Errorf("cannot list namespaces: %w", err) + } + var dbaasDetails []Detail for _, groupVersionResource := range groupVersionResources { managedResources, err := s.k8sClient.Resource(groupVersionResource).List(ctx, metav1.ListOptions{}) @@ -116,33 +125,11 @@ func (s *Service) fetchManagedDBaaS(ctx *Context) error { } for _, managedResource := range managedResources.Items { - dbaasDetail := Detail{ - DBName: managedResource.GetName(), - Type: groupVersionResource.Resource, - } - if organization, exist := managedResource.GetLabels()[service.OrganizationLabel]; exist { - dbaasDetail.Organization = organization - } else { - // cannot get organization from DBaaS - log.Info("Organization label is missing in DBaaS service, skipping...", - "label", service.OrganizationLabel, - "dbaas", managedResource.GetName()) - continue - } - if namespace, exist := managedResource.GetLabels()[service.NamespaceLabel]; exist { - dbaasDetail.Namespace = namespace - } else { - // cannot get namespace from DBaaS - log.Info("Namespace label is missing in DBaaS, skipping...", - "label", service.NamespaceLabel, - "dbaas", managedResource.GetName()) + dbaasDetail := findDBaaSDetailInNamespacesMap(managedResource, groupVersionResource, namespaces, log) + if dbaasDetail == nil { continue } - log.V(1).Info("Added namespace and organization to DBaaS", - "dbaas", managedResource.GetName(), - "namespace", dbaasDetail.Namespace, - "organization", dbaasDetail.Organization) - dbaasDetails = append(dbaasDetails, dbaasDetail) + dbaasDetails = append(dbaasDetails, *dbaasDetail) } } @@ -150,6 +137,36 @@ func (s *Service) fetchManagedDBaaS(ctx *Context) error { return nil } +func findDBaaSDetailInNamespacesMap(managedResource unstructured.Unstructured, groupVersionResource schema.GroupVersionResource, namespaces map[string]string, log logr.Logger) *Detail { + dbaasDetail := Detail{ + DBName: managedResource.GetName(), + Type: groupVersionResource.Resource, + } + if namespace, exist := managedResource.GetLabels()[service.NamespaceLabel]; exist { + organization, ok := namespaces[namespace] + if !ok { + // cannot find namespace in namespace list + log.Info("Namespace not found in namespace list, skipping...", + "namespace", namespace, + "dbaas", managedResource.GetName()) + return nil + } + dbaasDetail.Namespace = namespace + dbaasDetail.Organization = organization + } else { + // cannot get namespace from DBaaS + log.Info("Namespace label is missing in DBaaS, skipping...", + "label", service.NamespaceLabel, + "dbaas", managedResource.GetName()) + return nil + } + log.V(1).Info("Added namespace and organization to DBaaS", + "dbaas", managedResource.GetName(), + "namespace", dbaasDetail.Namespace, + "organization", dbaasDetail.Organization) + return &dbaasDetail +} + // fetchDBaaSUsage gets DBaaS service usage from Exoscale func (s *Service) fetchDBaaSUsage(ctx *Context) error { log := ctrl.LoggerFrom(ctx) @@ -237,3 +254,27 @@ func hasAggregatedInstances(ctx *Context) bool { } return true } + +func fetchNamespaceWithOrganizationMap(ctx context.Context, k8sClient dynamic.Interface) (map[string]string, error) { + log := ctrl.LoggerFrom(ctx) + nsGroupVersionResource := schema.GroupVersionResource{ + Group: "", + Version: "v1", + Resource: "namespaces", + } + list, err := k8sClient.Resource(nsGroupVersionResource).List(ctx, metav1.ListOptions{}) + if err != nil { + return nil, fmt.Errorf("cannot get namespace list: %w", err) + } + + namespaces := map[string]string{} + for _, ns := range list.Items { + org, ok := ns.GetLabels()[service.OrganizationLabel] + if !ok { + log.Info("Organization label not found in namespace", "namespace", ns.GetName()) + continue + } + namespaces[ns.GetName()] = org + } + return namespaces, nil +}