Simplify controller watches using EnqueueRequestsFromMapFunc
cbandy committed Nov 29, 2024
1 parent ed8ca88 commit 58351d3
Showing 9 changed files with 100 additions and 209 deletions.
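
The change applies one controller-runtime pattern across several controllers: instead of a handler.Funcs value with separate Create, Update, and Delete callbacks that all enqueue the same requests, each watch now passes a single handler.EnqueueRequestsFromMapFunc that maps an observed object to the requests it should trigger, and controller-runtime calls that function for every event type. A minimal sketch of the shape, using placeholder types rather than code from this repository:

package example

import (
	"context"

	corev1 "k8s.io/api/core/v1"
	ctrl "sigs.k8s.io/controller-runtime"
	"sigs.k8s.io/controller-runtime/pkg/client"
	"sigs.k8s.io/controller-runtime/pkg/handler"
)

// ExampleReconciler is a placeholder; it stands in for the reconcilers below.
type ExampleReconciler struct{ client.Client }

func (r *ExampleReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
	return ctrl.Result{}, nil
}

func (r *ExampleReconciler) SetupWithManager(mgr ctrl.Manager) error {
	return ctrl.NewControllerManagedBy(mgr).
		For(&corev1.ConfigMap{}). // placeholder primary resource
		Watches(
			&corev1.Secret{},
			// One mapping function covers create, update, and delete events.
			handler.EnqueueRequestsFromMapFunc(func(ctx context.Context, secret client.Object) []ctrl.Request {
				// Decide which objects to reconcile when this Secret changes;
				// here, simply a request with the Secret's own key.
				return []ctrl.Request{{NamespacedName: client.ObjectKeyFromObject(secret)}}
			}),
		).
		Complete(r)
}

This assumes a controller-runtime version whose Watches accepts a client.Object directly, which is the form the commit itself uses.
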
@@ -20,10 +20,11 @@ import (
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/handler"

"github.com/crunchydata/postgres-operator/internal/bridge"
"github.com/crunchydata/postgres-operator/internal/controller/runtime"
pgoRuntime "github.com/crunchydata/postgres-operator/internal/controller/runtime"
"github.com/crunchydata/postgres-operator/internal/initialize"
"github.com/crunchydata/postgres-operator/internal/naming"
"github.com/crunchydata/postgres-operator/pkg/apis/postgres-operator.crunchydata.com/v1beta1"
)
@@ -54,15 +55,23 @@ func (r *CrunchyBridgeClusterReconciler) SetupWithManager(
For(&v1beta1.CrunchyBridgeCluster{}).
Owns(&corev1.Secret{}).
// Wake periodically to check Bridge API for all CrunchyBridgeClusters.
// Potentially replace with different requeue times, remove the Watch function
// Smarter: retry after a certain time for each cluster: https://gist.github.com/cbandy/a5a604e3026630c5b08cfbcdfffd2a13
// Potentially replace with different requeue times
// Smarter: retry after a certain time for each cluster
WatchesRawSource(
pgoRuntime.NewTickerImmediate(5*time.Minute, event.GenericEvent{}, r.Watch()),
runtime.NewTickerImmediate(5*time.Minute, event.GenericEvent{},
handler.EnqueueRequestsFromMapFunc(func(ctx context.Context, _ client.Object) []ctrl.Request {
var list v1beta1.CrunchyBridgeClusterList
_ = r.List(ctx, &list)
return runtime.Requests(initialize.Pointers(list.Items...)...)
}),
),
).
// Watch secrets and filter for secrets mentioned by CrunchyBridgeClusters
Watches(
&corev1.Secret{},
r.watchForRelatedSecret(),
handler.EnqueueRequestsFromMapFunc(func(ctx context.Context, secret client.Object) []ctrl.Request {
return runtime.Requests(r.findCrunchyBridgeClustersForSecret(ctx, client.ObjectKeyFromObject(secret))...)
}),
).
Complete(r)
}
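
The "smarter" retry mentioned in the comment above would drop the shared ticker entirely and let each cluster schedule its own follow-up from Reconcile. A hedged sketch of that standard controller-runtime pattern, not code from this commit (the type name and interval are placeholders):

package example

import (
	"context"
	"time"

	ctrl "sigs.k8s.io/controller-runtime"
	"sigs.k8s.io/controller-runtime/pkg/client"
)

// BridgeClusterReconciler is a placeholder type for this sketch.
type BridgeClusterReconciler struct{ client.Client }

func (r *BridgeClusterReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
	// ... fetch the cluster, call the Bridge API, update status ...

	// Ask controller-runtime to re-reconcile only this object after its own
	// interval, rather than waking every cluster from one shared ticker.
	return ctrl.Result{RequeueAfter: 5 * time.Minute}, nil
}
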
66 changes: 1 addition & 65 deletions internal/bridge/crunchybridgecluster/watches.go
@@ -7,48 +7,11 @@ package crunchybridgecluster
import (
"context"

"k8s.io/client-go/util/workqueue"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/reconcile"

"github.com/crunchydata/postgres-operator/pkg/apis/postgres-operator.crunchydata.com/v1beta1"
)

// watchForRelatedSecret handles create/update/delete events for secrets,
// passing the Secret ObjectKey to findCrunchyBridgeClustersForSecret
func (r *CrunchyBridgeClusterReconciler) watchForRelatedSecret() handler.EventHandler {
handle := func(ctx context.Context, secret client.Object, q workqueue.RateLimitingInterface) {
key := client.ObjectKeyFromObject(secret)

for _, cluster := range r.findCrunchyBridgeClustersForSecret(ctx, key) {
q.Add(ctrl.Request{
NamespacedName: client.ObjectKeyFromObject(cluster),
})
}
}

return handler.Funcs{
CreateFunc: func(ctx context.Context, e event.CreateEvent, q workqueue.RateLimitingInterface) {
handle(ctx, e.Object, q)
},
UpdateFunc: func(ctx context.Context, e event.UpdateEvent, q workqueue.RateLimitingInterface) {
handle(ctx, e.ObjectNew, q)
},
// If the secret is deleted, we want to reconcile
// in order to emit an event/status about this problem.
// We will also emit a matching event/status about this problem
// when we reconcile the cluster and can't find the secret.
// That way, users will get two alerts: one when the secret is deleted
// and another when the cluster is being reconciled.
DeleteFunc: func(ctx context.Context, e event.DeleteEvent, q workqueue.RateLimitingInterface) {
handle(ctx, e.Object, q)
},
}
}

//+kubebuilder:rbac:groups="postgres-operator.crunchydata.com",resources="crunchybridgeclusters",verbs={list}

// findCrunchyBridgeClustersForSecret returns CrunchyBridgeClusters
@@ -60,7 +23,7 @@ func (r *CrunchyBridgeClusterReconciler) findCrunchyBridgeClustersForSecret(
var clusters v1beta1.CrunchyBridgeClusterList

// NOTE: If this becomes slow due to a large number of CrunchyBridgeClusters in a single
// namespace, we can configure the [ctrl.Manager] field indexer and pass a
// namespace, we can configure the [manager.Manager] field indexer and pass a
// [fields.Selector] here.
// - https://book.kubebuilder.io/reference/watching-resources/externally-managed.html
if err := r.List(ctx, &clusters, &client.ListOptions{
@@ -74,30 +37,3 @@
}
return matching
}

//+kubebuilder:rbac:groups="postgres-operator.crunchydata.com",resources="crunchybridgeclusters",verbs={list}

// Watch enqueues all existing CrunchyBridgeClusters for reconciles.
func (r *CrunchyBridgeClusterReconciler) Watch() handler.EventHandler {
return handler.EnqueueRequestsFromMapFunc(func(ctx context.Context, _ client.Object) []reconcile.Request {
log := ctrl.LoggerFrom(ctx)

crunchyBridgeClusterList := &v1beta1.CrunchyBridgeClusterList{}
if err := r.List(ctx, crunchyBridgeClusterList); err != nil {
log.Error(err, "Error listing CrunchyBridgeClusters.")
}

reconcileRequests := []reconcile.Request{}
for index := range crunchyBridgeClusterList.Items {
reconcileRequests = append(reconcileRequests,
reconcile.Request{
NamespacedName: client.ObjectKeyFromObject(
&crunchyBridgeClusterList.Items[index],
),
},
)
}

return reconcileRequests
})
}
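
The NOTE above about a field indexer refers to controller-runtime's cache indexing: register an index with the manager, then List with a field selector so only matching objects come back instead of a whole namespace. A sketch under assumed names, using a neutral built-in type rather than the CRDs in this commit:

package example

import (
	"context"

	corev1 "k8s.io/api/core/v1"
	ctrl "sigs.k8s.io/controller-runtime"
	"sigs.k8s.io/controller-runtime/pkg/client"
)

// ownerIndexKey is an illustrative index name; the field a real index would
// extract from these CRDs is an assumption, not something shown in this diff.
const ownerIndexKey = "spec.ownerName"

// setupIndex registers the index with the manager's cache.
func setupIndex(mgr ctrl.Manager) error {
	return mgr.GetFieldIndexer().IndexField(context.Background(),
		&corev1.ConfigMap{}, ownerIndexKey,
		func(obj client.Object) []string {
			// Return the value each object should be looked up by.
			return []string{obj.GetLabels()["owner"]}
		})
}

// findByOwner lists only indexed matches instead of filtering every item.
func findByOwner(ctx context.Context, c client.Client, namespace, owner string) (corev1.ConfigMapList, error) {
	var list corev1.ConfigMapList
	err := c.List(ctx, &list,
		client.InNamespace(namespace),
		client.MatchingFields{ownerIndexKey: owner},
	)
	return list, err
}
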
31 changes: 3 additions & 28 deletions internal/controller/pgupgrade/pgupgrade_controller.go
@@ -14,10 +14,8 @@ import (
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/workqueue"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/handler"

"github.com/crunchydata/postgres-operator/internal/config"
@@ -50,7 +48,9 @@ func (r *PGUpgradeReconciler) SetupWithManager(mgr ctrl.Manager) error {
Owns(&batchv1.Job{}).
Watches(
v1beta1.NewPostgresCluster(),
r.watchPostgresClusters(),
handler.EnqueueRequestsFromMapFunc(func(ctx context.Context, cluster client.Object) []ctrl.Request {
return runtime.Requests(r.findUpgradesForPostgresCluster(ctx, client.ObjectKeyFromObject(cluster))...)
}),
).
Complete(r)
}
@@ -80,31 +80,6 @@ func (r *PGUpgradeReconciler) findUpgradesForPostgresCluster(
return matching
}

// watchPostgresClusters returns a [handler.EventHandler] for PostgresClusters.
func (r *PGUpgradeReconciler) watchPostgresClusters() handler.Funcs {
handle := func(ctx context.Context, cluster client.Object, q workqueue.RateLimitingInterface) {
key := client.ObjectKeyFromObject(cluster)

for _, upgrade := range r.findUpgradesForPostgresCluster(ctx, key) {
q.Add(ctrl.Request{
NamespacedName: client.ObjectKeyFromObject(upgrade),
})
}
}

return handler.Funcs{
CreateFunc: func(ctx context.Context, e event.CreateEvent, q workqueue.RateLimitingInterface) {
handle(ctx, e.Object, q)
},
UpdateFunc: func(ctx context.Context, e event.UpdateEvent, q workqueue.RateLimitingInterface) {
handle(ctx, e.ObjectNew, q)
},
DeleteFunc: func(ctx context.Context, e event.DeleteEvent, q workqueue.RateLimitingInterface) {
handle(ctx, e.Object, q)
},
}
}

//+kubebuilder:rbac:groups="postgres-operator.crunchydata.com",resources="pgupgrades",verbs={get}
//+kubebuilder:rbac:groups="postgres-operator.crunchydata.com",resources="pgupgrades/status",verbs={patch}
//+kubebuilder:rbac:groups="batch",resources="jobs",verbs={delete}
12 changes: 12 additions & 0 deletions internal/controller/runtime/reconcile.go
@@ -7,9 +7,21 @@ package runtime
import (
"time"

"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
)

// Requests converts objects to a slice of [reconcile.Request].
func Requests[T client.Object](objects ...T) []reconcile.Request {
result := make([]reconcile.Request, len(objects))
for i := range objects {
result[i] = reconcile.Request{
NamespacedName: client.ObjectKeyFromObject(objects[i]),
}
}
return result
}

// ErrorWithBackoff returns a Result and error that indicate a non-nil err
// should be logged and measured and its [reconcile.Request] should be retried
// later. When err is nil, nothing is logged and the Request is not retried.
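
A brief usage sketch for the new Requests helper; the pointer-conversion loop below mirrors what initialize.Pointers appears to do in the controllers above, which is an inference rather than something shown in this diff:

package example

import (
	"context"

	corev1 "k8s.io/api/core/v1"
	"sigs.k8s.io/controller-runtime/pkg/client"
	"sigs.k8s.io/controller-runtime/pkg/reconcile"

	"github.com/crunchydata/postgres-operator/internal/controller/runtime"
)

// secretRequests lists Secrets in a namespace and converts them to requests.
func secretRequests(ctx context.Context, c client.Client, namespace string) []reconcile.Request {
	var secrets corev1.SecretList
	if err := c.List(ctx, &secrets, client.InNamespace(namespace)); err != nil {
		return nil
	}

	// List items are values while Requests is constrained to client.Object,
	// so take pointers first.
	pointers := make([]*corev1.Secret, len(secrets.Items))
	for i := range secrets.Items {
		pointers[i] = &secrets.Items[i]
	}
	return runtime.Requests(pointers...)
}
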
27 changes: 27 additions & 0 deletions internal/controller/runtime/reconcile_test.go
@@ -10,9 +10,36 @@ import (
"time"

"gotest.tools/v3/assert"
"gotest.tools/v3/assert/cmp"
corev1 "k8s.io/api/core/v1"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
)

func TestRequests(t *testing.T) {
none := Requests[client.Object]()
assert.Assert(t, none != nil, "does not return nil slice")
assert.DeepEqual(t, none, []reconcile.Request{})

assert.Assert(t, cmp.Panics(func() {
Requests[client.Object](nil)
}), "expected nil pointer dereference")

// Empty request when no metadata.
assert.DeepEqual(t, Requests(new(corev1.Secret)), []reconcile.Request{{}})

secret := new(corev1.Secret)
secret.Namespace = "asdf"

expected := reconcile.Request{}
expected.Namespace = "asdf"
assert.DeepEqual(t, Requests(secret), []reconcile.Request{expected})

secret.Name = "123"
expected.Name = "123"
assert.DeepEqual(t, Requests(secret), []reconcile.Request{expected})
}

func TestErrorWithBackoff(t *testing.T) {
result, err := ErrorWithBackoff(nil)
assert.Assert(t, result.IsZero())
13 changes: 9 additions & 4 deletions internal/controller/standalone_pgadmin/controller.go
@@ -16,8 +16,9 @@ import (
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"sigs.k8s.io/controller-runtime/pkg/handler"

controllerruntime "github.com/crunchydata/postgres-operator/internal/controller/runtime"
"github.com/crunchydata/postgres-operator/internal/controller/runtime"
"github.com/crunchydata/postgres-operator/internal/logging"
"github.com/crunchydata/postgres-operator/pkg/apis/postgres-operator.crunchydata.com/v1beta1"
)
@@ -46,7 +47,7 @@ type PGAdminReconciler struct {
func (r *PGAdminReconciler) SetupWithManager(mgr ctrl.Manager) error {
if r.PodExec == nil {
var err error
r.PodExec, err = controllerruntime.NewPodExecutor(mgr.GetConfig())
r.PodExec, err = runtime.NewPodExecutor(mgr.GetConfig())
if err != nil {
return err
}
@@ -61,11 +62,15 @@ func (r *PGAdminReconciler) SetupWithManager(mgr ctrl.Manager) error {
Owns(&corev1.Service{}).
Watches(
v1beta1.NewPostgresCluster(),
r.watchPostgresClusters(),
handler.EnqueueRequestsFromMapFunc(func(ctx context.Context, cluster client.Object) []ctrl.Request {
return runtime.Requests(r.findPGAdminsForPostgresCluster(ctx, cluster)...)
}),
).
Watches(
&corev1.Secret{},
r.watchForRelatedSecret(),
handler.EnqueueRequestsFromMapFunc(func(ctx context.Context, secret client.Object) []ctrl.Request {
return runtime.Requests(r.findPGAdminsForSecret(ctx, client.ObjectKeyFromObject(secret))...)
}),
).
Complete(r)
}
@@ -27,10 +27,10 @@ func (r *PGAdminReconciler) findPGAdminsForPostgresCluster(
)

// NOTE: If this becomes slow due to a large number of pgadmins in a single
// namespace, we can configure the [ctrl.Manager] field indexer and pass a
// namespace, we can configure the [manager.Manager] field indexer and pass a
// [fields.Selector] here.
// - https://book.kubebuilder.io/reference/watching-resources/externally-managed.html
if r.List(ctx, &pgadmins, &client.ListOptions{
if r.Client.List(ctx, &pgadmins, &client.ListOptions{
Namespace: cluster.GetNamespace(),
}) == nil {
for i := range pgadmins.Items {
@@ -50,7 +50,36 @@
return matching
}

//+kubebuilder:rbac:groups="postgres-operator.crunchydata.com",resources="postgresclusters",verbs={list,watch}
//+kubebuilder:rbac:groups="postgres-operator.crunchydata.com",resources="pgadmins",verbs={list}

// findPGAdminsForSecret returns PGAdmins that have a user or users that have their password
// stored in the Secret
func (r *PGAdminReconciler) findPGAdminsForSecret(
ctx context.Context, secret client.ObjectKey,
) []*v1beta1.PGAdmin {
var matching []*v1beta1.PGAdmin
var pgadmins v1beta1.PGAdminList

// NOTE: If this becomes slow due to a large number of PGAdmins in a single
// namespace, we can configure the [manager.Manager] field indexer and pass a
// [fields.Selector] here.
// - https://book.kubebuilder.io/reference/watching-resources/externally-managed.html
if err := r.Client.List(ctx, &pgadmins, &client.ListOptions{
Namespace: secret.Namespace,
}); err == nil {
for i := range pgadmins.Items {
for j := range pgadmins.Items[i].Spec.Users {
if pgadmins.Items[i].Spec.Users[j].PasswordRef.Name == secret.Name {
matching = append(matching, &pgadmins.Items[i])
break
}
}
}
}
return matching
}

//+kubebuilder:rbac:groups="postgres-operator.crunchydata.com",resources="postgresclusters",verbs={get,list}

// getClustersForPGAdmin returns clusters managed by the given pgAdmin
func (r *PGAdminReconciler) getClustersForPGAdmin(
@@ -64,7 +93,7 @@ func (r *PGAdminReconciler) getClustersForPGAdmin(
for _, serverGroup := range pgAdmin.Spec.ServerGroups {
var cluster v1beta1.PostgresCluster
if serverGroup.PostgresClusterName != "" {
err = r.Get(ctx, client.ObjectKey{
err = r.Client.Get(ctx, client.ObjectKey{
Name: serverGroup.PostgresClusterName,
Namespace: pgAdmin.GetNamespace(),
}, &cluster)
Expand All @@ -75,7 +104,7 @@ func (r *PGAdminReconciler) getClustersForPGAdmin(
}
if selector, err = naming.AsSelector(serverGroup.PostgresClusterSelector); err == nil {
var list v1beta1.PostgresClusterList
err = r.List(ctx, &list,
err = r.Client.List(ctx, &list,
client.InNamespace(pgAdmin.Namespace),
client.MatchingLabelsSelector{Selector: selector},
)