Skip to content

Commit

Permalink
Unit test for lightweight informer (#59)
Browse files Browse the repository at this point in the history
* created data for fake dynamic client

* added informer for fake client

* added stoppers for informers

* removed unused codes

* adding event functions

* added fake watcher for informer testing

* add condition for break - for linting

* added retry logic for testing

* listAndWatch test

* Add test for sync from previous state

* Small change so this file shows on sonar report

* Refactor to simplify test

* Add Run() tests

* Refactor tests

* Update Makefile

* added watch() test

* Reduce wait times

* Update informer.go

* Fix lint

Co-authored-by: Sherin Varughese <shvarugh@redhat.com>
Co-authored-by: Jorge Padilla <jpadilla@redhat.com>
  • Loading branch information
3 people authored Sep 11, 2020
1 parent b4da2ad commit b14a1c7
Show file tree
Hide file tree
Showing 3 changed files with 197 additions and 19 deletions.
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ require (
github.com/open-cluster-management/multicloud-operators-placementrule v1.0.0
github.com/open-cluster-management/multicloud-operators-subscription v1.0.0
github.com/open-cluster-management/multicloud-operators-subscription-release v1.0.1-0.20200603160156-4d66bd136ba3 //Use 2.0 when available
github.com/stretchr/testify v1.6.0
github.com/tkanos/gonfig v0.0.0-20181112185242-896f3d81fadf
gopkg.in/yaml.v2 v2.3.0
k8s.io/api v0.18.2
Expand Down
36 changes: 17 additions & 19 deletions pkg/informer/informer.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (

// GenericInformer ...
type GenericInformer struct {
client dynamic.Interface
gvr schema.GroupVersionResource
AddFunc func(interface{})
DeleteFunc func(interface{})
Expand All @@ -28,16 +29,10 @@ type GenericInformer struct {
// InformerForResource initialize a Generic Informer for a resource (GVR).
func InformerForResource(resource schema.GroupVersionResource) (GenericInformer, error) {
i := GenericInformer{
gvr: resource,
AddFunc: (func(interface{}) {
glog.Info("Add function not initialized.")
}),
UpdateFunc: (func(interface{}, interface{}) {
glog.Info("Update function not initialized.")
}),
DeleteFunc: (func(interface{}) {
glog.Info("Delete function not initialized.")
}),
gvr: resource,
AddFunc: (func(interface{}) { glog.Warning("AddFunc not initialized. For ", resource.String()) }),
DeleteFunc: (func(interface{}) { glog.Warning("DeleteFunc not initialized. For ", resource.String()) }),
UpdateFunc: (func(interface{}, interface{}) { glog.Warning("UpdateFunc not initialized. For ", resource.String()) }),
retries: 0,
resourceIndex: make(map[string]string),
}
Expand All @@ -54,9 +49,12 @@ func (inform *GenericInformer) Run(stopper chan struct{}) {
time.Sleep(wait)
}
glog.V(2).Info("(Re)starting informer: ", inform.gvr.String())
client := config.GetDynamicClient()
listAndResync(inform, client)
watch(inform, client, stopper)
if inform.client == nil {
inform.client = config.GetDynamicClient()
}

inform.listAndResync()
inform.watch(stopper)

if inform.stopped {
break
Expand Down Expand Up @@ -87,10 +85,10 @@ func newUnstructured(kind, uid string) *unstructured.Unstructured {

// List current resources and fires ADDED events. Then sync the current state with the previous
// state and delete any resources that are still in our cache, but no longer exist in the cluster.
func listAndResync(inform *GenericInformer, client dynamic.Interface) {
func (inform *GenericInformer) listAndResync() {

// List resources.
resources, listError := client.Resource(inform.gvr).List(metav1.ListOptions{})
resources, listError := inform.client.Resource(inform.gvr).List(metav1.ListOptions{})
if listError != nil {
glog.Warningf("Error listing resources for %s. Error: %s", inform.gvr.String(), listError)
inform.retries++
Expand Down Expand Up @@ -126,9 +124,9 @@ func listAndResync(inform *GenericInformer, client dynamic.Interface) {
}

// Watch resources and process events.
func watch(inform *GenericInformer, client dynamic.Interface, stopper chan struct{}) {
func (inform *GenericInformer) watch(stopper chan struct{}) {

watch, watchError := client.Resource(inform.gvr).Watch(metav1.ListOptions{})
watch, watchError := inform.client.Resource(inform.gvr).Watch(metav1.ListOptions{})
if watchError != nil {
glog.Warningf("Error watching resources for %s. Error: %s", inform.gvr.String(), watchError)
inform.retries++
Expand Down Expand Up @@ -184,12 +182,12 @@ func watch(inform *GenericInformer, client dynamic.Interface, stopper chan struc
delete(inform.resourceIndex, string(obj.GetUID()))

case "ERROR":
glog.V(2).Infof("Received ERROR event. Ending listAndWatch() for %s ", inform.gvr.String())
glog.V(2).Infof("Received ERROR event. Ending listAndWatch() for %s", inform.gvr.String())
watch.Stop()
return

default:
glog.V(2).Infof("Received unexpected event. Ending listAndWatch() for %s ", inform.gvr.String())
glog.V(2).Infof("Received unexpected event. Ending listAndWatch() for %s", inform.gvr.String())
watch.Stop()
return
}
Expand Down
179 changes: 179 additions & 0 deletions pkg/informer/informer_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,179 @@
// Copyright (c) 2020 Red Hat, Inc.

package informer

import (
"testing"
"time"

v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/dynamic/fake"
)

func fakeDynamicClient() *fake.FakeDynamicClient {

scheme := runtime.NewScheme()
return fake.NewSimpleDynamicClient(scheme,
newTestUnstructured("open-cluster-management.io/v1", "TheKind", "ns-foo", "name-foo", "id-001"),
newTestUnstructured("open-cluster-management.io/v1", "TheKind", "ns-foo", "name-foo2", "id-002"),
newTestUnstructured("open-cluster-management.io/v1", "TheKind", "ns-foo", "name-bar", "id-003"),
newTestUnstructured("open-cluster-management.io/v1", "TheKind", "ns-foo", "name-bar2", "id-004"),
newTestUnstructured("open-cluster-management.io/v1", "TheKind", "ns-foo", "name-bar3", "id-005"),
)
}

func generateSimpleEvent(informer GenericInformer, t *testing.T) {
// Add resource. Generates ADDED event.
gvr := schema.GroupVersionResource{Group: "open-cluster-management.io", Version: "v1", Resource: "thekinds"}
newResource := newTestUnstructured("open-cluster-management.io/v1", "TheKind", "ns-foo", "name-new", "id-999")
_, err1 := informer.client.Resource(gvr).Namespace("ns-foo").Create(newResource, v1.CreateOptions{})

// Update resource. Generates MODIFIED event.
_, err2 := informer.client.Resource(gvr).Namespace("ns-foo").Update(newResource, v1.UpdateOptions{})

// Delete resource. Generated DELETED event.
err3 := informer.client.Resource(gvr).Namespace("ns-foo").Delete("name-bar2", &v1.DeleteOptions{})

if err1 != nil || err2 != nil || err3 != nil {
t.Error("Error generating mocked events.")
}
}

func newTestUnstructured(apiVersion, kind, namespace, name, uid string) *unstructured.Unstructured {
return &unstructured.Unstructured{
Object: map[string]interface{}{
"apiVersion": apiVersion,
"kind": kind,
"metadata": map[string]interface{}{
"namespace": namespace,
"name": name,
"uid": uid,
},
},
}
}

func initInformer() (informer GenericInformer, _ *int, _ *int, _ *int) {
// Create informer instance to test.
gvr := schema.GroupVersionResource{Group: "open-cluster-management.io", Version: "v1", Resource: "thekinds"}
informer, _ = InformerForResource(gvr)

// Add the fake client to be used by informer.
informer.client = fakeDynamicClient()

// Add mock functions
var addFuncCount, updateFuncCount, deleteFuncCount int
informer.AddFunc = func(interface{}) { addFuncCount++ }
informer.DeleteFunc = func(interface{}) { deleteFuncCount++ }
informer.UpdateFunc = func(interface{}, interface{}) { updateFuncCount++ }

return informer, &addFuncCount, &deleteFuncCount, &updateFuncCount
}

// Verify that AddFunc is called for each mocked resource.
func Test_listAndResync(t *testing.T) {

// Create informer instance to test.
informer, addFuncCount, _, _ := initInformer()

// Execute function
informer.listAndResync()

// Verify that informer.AddFunc is called for each of the mocked resources (5 times).
if *addFuncCount != 5 {
t.Errorf("Expected informer.AddFunc to be called 5 times, but got %d.", *addFuncCount)
}
}

// Verify that DeleteFunc is called for indexed resources that no longer exist.
func Test_listAndResync_syncWithPrevState(t *testing.T) {
// Create informer instance to test.
informer, _, deleteFuncCount, _ := initInformer()

// Add existing state to the informer
informer.resourceIndex["fake-uid"] = "fake-resource-version" // This resource should get deleted.
informer.resourceIndex["id-001"] = "some-resource-version" // This resource won't get deleted.

// Execute function
informer.listAndResync()

// Verify that informer.DeleteFunc is called once for resource with "fake-uid"
if *deleteFuncCount != 1 {
t.Errorf("Expected informer.DeleteFunc to be called 1 time, but got %d.", *deleteFuncCount)
}
}

// Verify the informer's Run function.
func Test_Run(t *testing.T) {
// Create informer instance to test.
informer, addFuncCount, deleteFuncCount, updateFuncCount := initInformer()

// Start informer routine
stopper := make(chan struct{})
go informer.Run(stopper)
time.Sleep(10 * time.Millisecond)

generateSimpleEvent(informer, t)
time.Sleep(10 * time.Millisecond)

stopper <- struct{}{}

// Verify that informer.AddFunc is called for each of the mocked resources (6 times).
if *addFuncCount != 6 {
t.Errorf("Expected informer.AddFunc to be called 6 times, but got %d.", *addFuncCount)
}
// Verify informer.UpdateFunc is called once.
if *updateFuncCount != 1 {
t.Errorf("Expected informer.UpdateFunc to be called 1 times, but got %d.", *updateFuncCount)
}
// Verify informer.DeleteFunc is called once.
if *deleteFuncCount != 1 {
t.Errorf("Expected informer.DeleteFunc to be called 1 times, but got %d.", *deleteFuncCount)
}
}

// Verify that backoff logic waits after retry.
func Test_Run_retryBackoff(t *testing.T) {
// Create informer instance to test.
informer, _, _, _ := initInformer()

informer.retries = 1
startTime := time.Now()
retryTime := time.Now() // Initializing to now ensures that the test fail if AddFunc is not called in the expected time.
informer.AddFunc = func(interface{}) { retryTime = time.Now() }

// Execute function
go informer.Run(make(chan struct{}))
time.Sleep(2010 * time.Millisecond)

// Verify backoff logic waits 2 seconds before retrying.
if startTime.Add(2 * time.Second).After(retryTime) {
t.Errorf("Backoff logic failed to wait for 2 seconds.")
}
}

// Verify that the informer is able to watch resources and process the events.
func Test_watch(t *testing.T) {
// Create informer instance to test.
informer, _, _, _ := initInformer()

// Create a stopper for the watch function
stopper := make(chan struct{})

go informer.watch(stopper)
time.Sleep(10 * time.Millisecond)

generateSimpleEvent(informer, t)
time.Sleep(10 * time.Millisecond)

// Simulate that the informer has been stopped successfully.
stopper <- struct{}{}
time.Sleep(10 * time.Millisecond)

if !informer.stopped {
t.Errorf("Expected informer.stopped to be true, but got %t", informer.stopped)
}
}

0 comments on commit b14a1c7

Please sign in to comment.