Skip to content

Commit

Permalink
Merge pull request #1256 from vincepri/backport-metadata-only-05
Browse files Browse the repository at this point in the history
✨ Backport release-0.5: Support metadata-only watches
  • Loading branch information
k8s-ci-robot authored Nov 19, 2020
2 parents 9dc7370 + 8da4f34 commit e1a1cae
Show file tree
Hide file tree
Showing 12 changed files with 1,637 additions and 73 deletions.
48 changes: 41 additions & 7 deletions pkg/builder/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"fmt"
"strings"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/rest"
"sigs.k8s.io/controller-runtime/pkg/client/apiutil"
Expand Down Expand Up @@ -155,20 +156,42 @@ func (blder *Builder) Build(r reconcile.Reconciler) (controller.Controller, erro
return blder.ctrl, nil
}

func (blder *Builder) project(obj runtime.Object) (runtime.Object, error) {
switch o := obj.(type) {
case *onlyMetadataWrapper:
metaObj := &metav1.PartialObjectMetadata{}
gvk, err := getGvk(o.Object, blder.mgr.GetScheme())
if err != nil {
return nil, fmt.Errorf("unable to determine GVK of %T for a metadata-only watch: %w", obj, err)
}
metaObj.SetGroupVersionKind(gvk)
return metaObj, nil
default:
return obj, nil
}
}

func (blder *Builder) doWatch() error {
// Reconcile type
src := &source.Kind{Type: blder.apiType}
hdler := &handler.EnqueueRequestForObject{}
err := blder.ctrl.Watch(src, hdler, blder.predicates...)
apiType, err := blder.project(blder.apiType)
if err != nil {
return err
}
src := &source.Kind{Type: apiType}
hdler := &handler.EnqueueRequestForObject{}
if err := blder.ctrl.Watch(src, hdler, blder.predicates...); err != nil {
return err
}

// Watches the managed types
for _, obj := range blder.managedObjects {
src := &source.Kind{Type: obj}
typeForSrc, err := blder.project(obj)
if err != nil {
return err
}
src := &source.Kind{Type: typeForSrc}
hdler := &handler.EnqueueRequestForOwner{
OwnerType: blder.apiType,
OwnerType: apiType,
IsController: true,
}
if err := blder.ctrl.Watch(src, hdler, blder.predicates...); err != nil {
Expand All @@ -178,10 +201,17 @@ func (blder *Builder) doWatch() error {

// Do the watch requests
for _, w := range blder.watchRequest {
// If the source of this watch is of type *source.Kind, project it.
if srckind, ok := w.src.(*source.Kind); ok {
typeForSrc, err := blder.project(srckind.Type)
if err != nil {
return err
}
srckind.Type = typeForSrc
}
if err := blder.ctrl.Watch(w.src, w.eventhandler, blder.predicates...); err != nil {
return err
}

}
return nil
}
Expand All @@ -196,7 +226,11 @@ func (blder *Builder) getControllerName() (string, error) {
if blder.name != "" {
return blder.name, nil
}
gvk, err := getGvk(blder.apiType, blder.mgr.GetScheme())
obj, err := blder.project(blder.apiType)
if err != nil {
return "", err
}
gvk, err := getGvk(obj, blder.mgr.GetScheme())
if err != nil {
return "", err
}
Expand Down
109 changes: 105 additions & 4 deletions pkg/builder/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ import (

. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
"k8s.io/client-go/rest"
"sigs.k8s.io/controller-runtime/pkg/cache"

appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
Expand Down Expand Up @@ -203,8 +205,107 @@ var _ = Describe("application", func() {
close(done)
}, 10)
})

Describe("watching with projections", func() {
var mgr manager.Manager
BeforeEach(func() {
// use a cache that intercepts requests for fully typed objects to
// ensure we use the projected versions
var err error
mgr, err = manager.New(cfg, manager.Options{NewCache: newNonTypedOnlyCache})
Expect(err).NotTo(HaveOccurred())
})

It("should support watching For, Owns, and Watch as metadata", func() {
statefulSetMaps := make(chan *metav1.PartialObjectMetadata)

bldr := ControllerManagedBy(mgr).
For(OnlyMetadata(&appsv1.Deployment{})).
Owns(OnlyMetadata(&appsv1.ReplicaSet{})).
Watches(&source.Kind{Type: OnlyMetadata(&appsv1.StatefulSet{})},
&handler.EnqueueRequestsFromMapFunc{
ToRequests: handler.ToRequestsFunc(func(o handler.MapObject) []reconcile.Request {
ometa := o.Object.(*metav1.PartialObjectMetadata)
statefulSetMaps <- ometa
return nil
}),
})

doReconcileTest("8", stop, bldr, mgr, true)

By("Creating a new stateful set")
set := &appsv1.StatefulSet{
ObjectMeta: metav1.ObjectMeta{
Namespace: "default",
Name: "test1",
Labels: map[string]string{
"foo": "bar",
},
},
Spec: appsv1.StatefulSetSpec{
Selector: &metav1.LabelSelector{
MatchLabels: map[string]string{"foo": "bar"},
},
Template: corev1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{"foo": "bar"}},
Spec: corev1.PodSpec{
Containers: []corev1.Container{
{
Name: "nginx",
Image: "nginx",
},
},
},
},
},
}
err := mgr.GetClient().Create(context.TODO(), set)
Expect(err).NotTo(HaveOccurred())

By("Checking that the mapping function has been called")
Eventually(func() bool {
metaSet := <-statefulSetMaps
Expect(metaSet.Name).To(Equal(set.Name))
Expect(metaSet.Namespace).To(Equal(set.Namespace))
Expect(metaSet.Labels).To(Equal(set.Labels))
return true
}).Should(BeTrue())
})
})
})

// newNonTypedOnlyCache returns a new cache that wraps the normal cache,
// returning an error if normal, typed objects have informers requested.
func newNonTypedOnlyCache(config *rest.Config, opts cache.Options) (cache.Cache, error) {
normalCache, err := cache.New(config, opts)
if err != nil {
return nil, err
}
return &nonTypedOnlyCache{
Cache: normalCache,
}, nil
}

// nonTypedOnlyCache is a cache.Cache that only provides metadata &
// unstructured informers.
type nonTypedOnlyCache struct {
cache.Cache
}

func (c *nonTypedOnlyCache) GetInformer(obj runtime.Object) (cache.Informer, error) {
switch obj.(type) {
case (*metav1.PartialObjectMetadata):
return c.Cache.GetInformer(obj)
default:
return nil, fmt.Errorf("did not want to provide an informer for normal type %T", obj)
}
}
func (c *nonTypedOnlyCache) GetInformerForKind(gvk schema.GroupVersionKind) (cache.Informer, error) {
return nil, fmt.Errorf("don't try to sidestep the restriction on informer types by calling GetInformerForKind")
}

// TODO(directxman12): this function has too many arguments, and the whole
// "nameSuffix" think is a bit of a hack It should be cleaned up significantly by someone with a bit of time
func doReconcileTest(nameSuffix string, stop chan struct{}, blder *Builder, mgr manager.Manager, complete bool) {
deployName := "deploy-name-" + nameSuffix
rsName := "rs-name-" + nameSuffix
Expand Down Expand Up @@ -267,8 +368,8 @@ func doReconcileTest(nameSuffix string, stop chan struct{}, blder *Builder, mgr
Expect(err).NotTo(HaveOccurred())

By("Waiting for the Deployment Reconcile")
Expect(<-ch).To(Equal(reconcile.Request{
NamespacedName: types.NamespacedName{Namespace: "default", Name: deployName}}))
Eventually(ch).Should(Receive(Equal(reconcile.Request{
NamespacedName: types.NamespacedName{Namespace: "default", Name: deployName}})))

By("Creating a ReplicaSet")
// Expect a Reconcile when an Owned object is managedObjects.
Expand Down Expand Up @@ -297,8 +398,8 @@ func doReconcileTest(nameSuffix string, stop chan struct{}, blder *Builder, mgr
Expect(err).NotTo(HaveOccurred())

By("Waiting for the ReplicaSet Reconcile")
Expect(<-ch).To(Equal(reconcile.Request{
NamespacedName: types.NamespacedName{Namespace: "default", Name: deployName}}))
Eventually(ch).Should(Receive(Equal(reconcile.Request{
NamespacedName: types.NamespacedName{Namespace: "default", Name: deployName}})))

}

Expand Down
55 changes: 55 additions & 0 deletions pkg/builder/example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"fmt"
"os"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
logf "sigs.k8s.io/controller-runtime/pkg/log"

appsv1 "k8s.io/api/apps/v1"
Expand All @@ -34,6 +35,60 @@ import (
"sigs.k8s.io/controller-runtime/pkg/reconcile"
)

func ExampleBuilder_metadata_only() {
logf.SetLogger(zap.New())

var log = logf.Log.WithName("builder-examples")

mgr, err := manager.New(config.GetConfigOrDie(), manager.Options{})
if err != nil {
log.Error(err, "could not create manager")
os.Exit(1)
}

cl := mgr.GetClient()
err = builder.
ControllerManagedBy(mgr). // Create the ControllerManagedBy
For(&appsv1.ReplicaSet{}). // ReplicaSet is the Application API
Owns(builder.OnlyMetadata(&corev1.Pod{})). // ReplicaSet owns Pods created by it, and caches them as metadata only
Complete(reconcile.Func(func(req reconcile.Request) (reconcile.Result, error) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

// Read the ReplicaSet
rs := &appsv1.ReplicaSet{}
err := cl.Get(ctx, req.NamespacedName, rs)
if err != nil {
return reconcile.Result{}, client.IgnoreNotFound(err)
}

// List the Pods matching the PodTemplate Labels, but only their metadata
var podsMeta metav1.PartialObjectMetadataList
err = cl.List(ctx, &podsMeta, client.InNamespace(req.Namespace), client.MatchingLabels(rs.Spec.Template.Labels))
if err != nil {
return reconcile.Result{}, client.IgnoreNotFound(err)
}

// Update the ReplicaSet
rs.Labels["pod-count"] = fmt.Sprintf("%v", len(podsMeta.Items))
err = cl.Update(ctx, rs)
if err != nil {
return reconcile.Result{}, err
}

return reconcile.Result{}, nil
}))
if err != nil {
log.Error(err, "could not create controller")
os.Exit(1)
}

if err := mgr.Start(signals.SetupSignalHandler()); err != nil {
log.Error(err, "could not start manager")
os.Exit(1)
}
}

// This example creates a simple application ControllerManagedBy that is configured for ReplicaSets and Pods.
//
// * Create a new application for ReplicaSets that manages Pods owned by the ReplicaSet and calls into
Expand Down
36 changes: 36 additions & 0 deletions pkg/builder/options.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package builder

import "k8s.io/apimachinery/pkg/runtime"

// OnlyMetadata tells the controller to *only* cache metadata, and to watch
// the the API server in metadata-only form. This is useful when watching
// lots of objects, really big objects, or objects for which you only know
// the the GVK, but not the structure. You'll need to pass
// metav1.PartialObjectMetadata to the client when fetching objects in your
// reconciler, otherwise you'll end up with a duplicate structured or
// unstructured cache.
func OnlyMetadata(obj runtime.Object) runtime.Object {
return &onlyMetadataWrapper{obj}
}

type onlyMetadataWrapper struct {
runtime.Object
}

// }}}
Loading

0 comments on commit e1a1cae

Please sign in to comment.