Refactor: Pass client.Object types by reference #4040

Merged: 5 commits, Nov 29, 2024
@@ -20,10 +20,11 @@ import (
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/handler"

"github.com/crunchydata/postgres-operator/internal/bridge"
"github.com/crunchydata/postgres-operator/internal/controller/runtime"
pgoRuntime "github.com/crunchydata/postgres-operator/internal/controller/runtime"
"github.com/crunchydata/postgres-operator/internal/initialize"
"github.com/crunchydata/postgres-operator/internal/naming"
"github.com/crunchydata/postgres-operator/pkg/apis/postgres-operator.crunchydata.com/v1beta1"
)
@@ -54,15 +55,23 @@ func (r *CrunchyBridgeClusterReconciler) SetupWithManager(
For(&v1beta1.CrunchyBridgeCluster{}).
Owns(&corev1.Secret{}).
// Wake periodically to check Bridge API for all CrunchyBridgeClusters.
-// Potentially replace with different requeue times, remove the Watch function
-// Smarter: retry after a certain time for each cluster: https://gist.github.com/cbandy/a5a604e3026630c5b08cfbcdfffd2a13
+// Potentially replace with different requeue times
+// Smarter: retry after a certain time for each cluster
WatchesRawSource(
-pgoRuntime.NewTickerImmediate(5*time.Minute, event.GenericEvent{}, r.Watch()),
+runtime.NewTickerImmediate(5*time.Minute, event.GenericEvent{},
+handler.EnqueueRequestsFromMapFunc(func(ctx context.Context, _ client.Object) []ctrl.Request {
+var list v1beta1.CrunchyBridgeClusterList
+_ = r.List(ctx, &list)
+return runtime.Requests(initialize.Pointers(list.Items...)...)
+}),
+),
).
// Watch secrets and filter for secrets mentioned by CrunchyBridgeClusters
Watches(
&corev1.Secret{},
-r.watchForRelatedSecret(),
+handler.EnqueueRequestsFromMapFunc(func(ctx context.Context, secret client.Object) []ctrl.Request {
+return runtime.Requests(r.findCrunchyBridgeClustersForSecret(ctx, client.ObjectKeyFromObject(secret))...)
+}),
).
Complete(r)
}
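Aside for reviewers: the inlined map functions above lean on two small generic helpers. A minimal sketch of what they presumably look like, with signatures inferred from the call sites; the real implementations live in internal/initialize and internal/controller/runtime (combined into one package here for brevity) and may differ:

```go
package runtime

import (
	"sigs.k8s.io/controller-runtime/pkg/client"
	"sigs.k8s.io/controller-runtime/pkg/reconcile"
)

// Pointers returns pointers to each of its arguments. Taking &elements[i]
// rather than the address of a loop variable yields pointers to distinct
// elements of the backing slice.
func Pointers[T any](elements ...T) []*T {
	result := make([]*T, len(elements))
	for i := range elements {
		result[i] = &elements[i]
	}
	return result
}

// Requests builds one reconcile request per object, keyed by its
// namespace and name.
func Requests[T client.Object](objects ...T) []reconcile.Request {
	result := make([]reconcile.Request, len(objects))
	for i, object := range objects {
		result[i] = reconcile.Request{
			NamespacedName: client.ObjectKeyFromObject(object),
		}
	}
	return result
}
```

With these, `runtime.Requests(initialize.Pointers(list.Items...)...)` turns a listed slice of cluster values into one reconcile request per cluster.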
66 changes: 1 addition & 65 deletions internal/bridge/crunchybridgecluster/watches.go
@@ -7,48 +7,11 @@ package crunchybridgecluster
import (
"context"

"k8s.io/client-go/util/workqueue"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/reconcile"

"github.com/crunchydata/postgres-operator/pkg/apis/postgres-operator.crunchydata.com/v1beta1"
)

-// watchForRelatedSecret handles create/update/delete events for secrets,
-// passing the Secret ObjectKey to findCrunchyBridgeClustersForSecret
-func (r *CrunchyBridgeClusterReconciler) watchForRelatedSecret() handler.EventHandler {
-handle := func(ctx context.Context, secret client.Object, q workqueue.RateLimitingInterface) {
-key := client.ObjectKeyFromObject(secret)
-
-for _, cluster := range r.findCrunchyBridgeClustersForSecret(ctx, key) {
-q.Add(ctrl.Request{
-NamespacedName: client.ObjectKeyFromObject(cluster),
-})
-}
-}
-
-return handler.Funcs{
-CreateFunc: func(ctx context.Context, e event.CreateEvent, q workqueue.RateLimitingInterface) {
-handle(ctx, e.Object, q)
-},
-UpdateFunc: func(ctx context.Context, e event.UpdateEvent, q workqueue.RateLimitingInterface) {
-handle(ctx, e.ObjectNew, q)
-},
-// If the secret is deleted, we want to reconcile
-// in order to emit an event/status about this problem.
-// We will also emit a matching event/status about this problem
-// when we reconcile the cluster and can't find the secret.
-// That way, users will get two alerts: one when the secret is deleted
-// and another when the cluster is being reconciled.
-DeleteFunc: func(ctx context.Context, e event.DeleteEvent, q workqueue.RateLimitingInterface) {
-handle(ctx, e.Object, q)
-},
-}
-}

//+kubebuilder:rbac:groups="postgres-operator.crunchydata.com",resources="crunchybridgeclusters",verbs={list}

// findCrunchyBridgeClustersForSecret returns CrunchyBridgeClusters
@@ -60,7 +23,7 @@ func (r *CrunchyBridgeClusterReconciler) findCrunchyBridgeClustersForSecret(
var clusters v1beta1.CrunchyBridgeClusterList

// NOTE: If this becomes slow due to a large number of CrunchyBridgeClusters in a single
-// namespace, we can configure the [ctrl.Manager] field indexer and pass a
+// namespace, we can configure the [manager.Manager] field indexer and pass a
// [fields.Selector] here.
// - https://book.kubebuilder.io/reference/watching-resources/externally-managed.html
if err := r.List(ctx, &clusters, &client.ListOptions{
@@ -74,30 +37,3 @@ func (r *CrunchyBridgeClusterReconciler) findCrunchyBridgeClustersForSecret(
}
return matching
}

-//+kubebuilder:rbac:groups="postgres-operator.crunchydata.com",resources="crunchybridgeclusters",verbs={list}
-
-// Watch enqueues all existing CrunchyBridgeClusters for reconciles.
-func (r *CrunchyBridgeClusterReconciler) Watch() handler.EventHandler {
-return handler.EnqueueRequestsFromMapFunc(func(ctx context.Context, _ client.Object) []reconcile.Request {
-log := ctrl.LoggerFrom(ctx)
-
-crunchyBridgeClusterList := &v1beta1.CrunchyBridgeClusterList{}
-if err := r.List(ctx, crunchyBridgeClusterList); err != nil {
-log.Error(err, "Error listing CrunchyBridgeClusters.")
-}
-
-reconcileRequests := []reconcile.Request{}
-for index := range crunchyBridgeClusterList.Items {
-reconcileRequests = append(reconcileRequests,
-reconcile.Request{
-NamespacedName: client.ObjectKeyFromObject(
-&crunchyBridgeClusterList.Items[index],
-),
-},
-)
-}
-
-return reconcileRequests
-})
-}
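The periodic wake-up that used to go through the deleted Watch() now flows through NewTickerImmediate with the handler attached at construction. A rough sketch of the idea, assuming controller-runtime v0.18-era interfaces (newer releases use typed work queues, so the actual implementation in internal/controller/runtime almost certainly differs in detail):

```go
package runtime

import (
	"context"
	"time"

	"k8s.io/client-go/util/workqueue"
	"sigs.k8s.io/controller-runtime/pkg/event"
	"sigs.k8s.io/controller-runtime/pkg/handler"
	"sigs.k8s.io/controller-runtime/pkg/source"
)

type ticker struct {
	time.Duration
	event.GenericEvent
	handler.EventHandler
	immediate bool
}

// NewTickerImmediate returns a source that delivers e to h once right away,
// then again every d, until the controller's context is cancelled.
func NewTickerImmediate(
	d time.Duration, e event.GenericEvent, h handler.EventHandler,
) source.Source {
	return &ticker{Duration: d, GenericEvent: e, EventHandler: h, immediate: true}
}

// Start implements source.Source by emitting generic events on a schedule.
func (t *ticker) Start(ctx context.Context, q workqueue.RateLimitingInterface) error {
	go func() {
		if t.immediate {
			t.Generic(ctx, t.GenericEvent, q)
		}
		tick := time.NewTicker(t.Duration)
		defer tick.Stop()
		for {
			select {
			case <-ctx.Done():
				return
			case <-tick.C:
				t.Generic(ctx, t.GenericEvent, q)
			}
		}
	}()
	return nil
}
```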
31 changes: 3 additions & 28 deletions internal/controller/pgupgrade/pgupgrade_controller.go
@@ -14,10 +14,8 @@ import (
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/workqueue"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/handler"

"github.com/crunchydata/postgres-operator/internal/config"
@@ -50,7 +48,9 @@ func (r *PGUpgradeReconciler) SetupWithManager(mgr ctrl.Manager) error {
Owns(&batchv1.Job{}).
Watches(
v1beta1.NewPostgresCluster(),
-r.watchPostgresClusters(),
+handler.EnqueueRequestsFromMapFunc(func(ctx context.Context, cluster client.Object) []ctrl.Request {
+return runtime.Requests(r.findUpgradesForPostgresCluster(ctx, client.ObjectKeyFromObject(cluster))...)
+}),
).
Complete(r)
}
@@ -80,31 +80,6 @@ func (r *PGUpgradeReconciler) findUpgradesForPostgresCluster(
return matching
}

-// watchPostgresClusters returns a [handler.EventHandler] for PostgresClusters.
-func (r *PGUpgradeReconciler) watchPostgresClusters() handler.Funcs {
-handle := func(ctx context.Context, cluster client.Object, q workqueue.RateLimitingInterface) {
-key := client.ObjectKeyFromObject(cluster)
-
-for _, upgrade := range r.findUpgradesForPostgresCluster(ctx, key) {
-q.Add(ctrl.Request{
-NamespacedName: client.ObjectKeyFromObject(upgrade),
-})
-}
-}
-
-return handler.Funcs{
-CreateFunc: func(ctx context.Context, e event.CreateEvent, q workqueue.RateLimitingInterface) {
-handle(ctx, e.Object, q)
-},
-UpdateFunc: func(ctx context.Context, e event.UpdateEvent, q workqueue.RateLimitingInterface) {
-handle(ctx, e.ObjectNew, q)
-},
-DeleteFunc: func(ctx context.Context, e event.DeleteEvent, q workqueue.RateLimitingInterface) {
-handle(ctx, e.Object, q)
-},
-}
-}

//+kubebuilder:rbac:groups="postgres-operator.crunchydata.com",resources="pgupgrades",verbs={get}
//+kubebuilder:rbac:groups="postgres-operator.crunchydata.com",resources="pgupgrades/status",verbs={patch}
//+kubebuilder:rbac:groups="batch",resources="jobs",verbs={delete}
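Worth spelling out why the three-callback handler.Funcs values can be deleted wholesale: handler.EnqueueRequestsFromMapFunc already invokes its map function for create, update (mapping both the old and new objects), delete, and generic events, so one function covers everything the removed plumbing did. A sketch of the pattern, assuming controller-runtime v0.15+ handler signatures; findRelated here is a hypothetical stand-in for lookups like findUpgradesForPostgresCluster:

```go
package sketch

import (
	"context"

	"sigs.k8s.io/controller-runtime/pkg/client"
	"sigs.k8s.io/controller-runtime/pkg/handler"
	"sigs.k8s.io/controller-runtime/pkg/reconcile"
)

// relatedHandler enqueues whatever findRelated returns, for every event
// type, replacing separate CreateFunc/UpdateFunc/DeleteFunc bodies.
func relatedHandler(
	findRelated func(context.Context, client.ObjectKey) []reconcile.Request,
) handler.EventHandler {
	return handler.EnqueueRequestsFromMapFunc(
		func(ctx context.Context, object client.Object) []reconcile.Request {
			// controller-runtime calls this for create, update, delete,
			// and generic events alike, so no per-event plumbing is needed.
			return findRelated(ctx, client.ObjectKeyFromObject(object))
		})
}
```

One behavioral nuance: the DeleteFunc removed from watches.go above carried a comment about reconciling on secret deletion to surface an event/status problem; the map function preserves that behavior, since deletions are mapped like any other event.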
2 changes: 1 addition & 1 deletion internal/controller/postgrescluster/cluster.go
@@ -281,7 +281,7 @@ func (r *Reconciler) reconcileClusterReplicaService(
// `dataSource.pgbackrest` fields
func (r *Reconciler) reconcileDataSource(ctx context.Context,
cluster *v1beta1.PostgresCluster, observed *observedInstances,
-clusterVolumes []corev1.PersistentVolumeClaim,
+clusterVolumes []*corev1.PersistentVolumeClaim,
rootCA *pki.RootCertificateAuthority,
backupsSpecFound bool,
) (bool, error) {
2 changes: 1 addition & 1 deletion internal/controller/postgrescluster/controller.go
@@ -162,7 +162,7 @@ func (r *Reconciler) Reconcile(
clusterConfigMap *corev1.ConfigMap
clusterReplicationSecret *corev1.Secret
clusterPodService *corev1.Service
-clusterVolumes []corev1.PersistentVolumeClaim
+clusterVolumes []*corev1.PersistentVolumeClaim
instanceServiceAccount *corev1.ServiceAccount
instances *observedInstances
patroniLeaderService *corev1.Service
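The remaining hunks are the mechanical half of the PR title: clusterVolumes changes from []corev1.PersistentVolumeClaim to []*corev1.PersistentVolumeClaim everywhere it travels. A minimal standalone sketch of the two practical effects: only the pointer type satisfies client.Object, and callees share elements instead of receiving copies of a fairly large struct:

```go
package main

import (
	"fmt"

	corev1 "k8s.io/api/core/v1"
	"sigs.k8s.io/controller-runtime/pkg/client"
)

// Compile-time check: *PersistentVolumeClaim implements client.Object;
// the value type does not, because the metadata accessors and
// DeepCopyObject are defined on the pointer receiver.
var _ client.Object = (*corev1.PersistentVolumeClaim)(nil)

func main() {
	// As returned by a typical List call.
	items := []corev1.PersistentVolumeClaim{{}, {}}

	// One address-of per element: later loops and calls share the same
	// claims rather than copying each struct at every hop, and helpers
	// that want client.Object values can take these pointers directly.
	claims := make([]*corev1.PersistentVolumeClaim, 0, len(items))
	for i := range items {
		claims = append(claims, &items[i])
	}

	fmt.Println(len(claims)) // 2
}
```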
12 changes: 6 additions & 6 deletions internal/controller/postgrescluster/instance.go
@@ -588,7 +588,7 @@ func (r *Reconciler) reconcileInstanceSets(
instances *observedInstances,
patroniLeaderService *corev1.Service,
primaryCertificate *corev1.SecretProjection,
-clusterVolumes []corev1.PersistentVolumeClaim,
+clusterVolumes []*corev1.PersistentVolumeClaim,
exporterQueriesConfig, exporterWebConfig *corev1.ConfigMap,
backupsSpecFound bool,
) error {
@@ -706,12 +706,12 @@ func (r *Reconciler) cleanupPodDisruptionBudgets(
// for the instance set specified that are not currently associated with an instance, and then
// returning the instance names associated with those PVC's.
func findAvailableInstanceNames(set v1beta1.PostgresInstanceSetSpec,
-observedInstances *observedInstances, clusterVolumes []corev1.PersistentVolumeClaim) []string {
+observedInstances *observedInstances, clusterVolumes []*corev1.PersistentVolumeClaim) []string {

availableInstanceNames := []string{}

// first identify any PGDATA volumes for the instance set specified
-setVolumes := []corev1.PersistentVolumeClaim{}
+setVolumes := []*corev1.PersistentVolumeClaim{}
for _, pvc := range clusterVolumes {
// ignore PGDATA PVCs that are terminating
if pvc.GetDeletionTimestamp() != nil {
@@ -729,7 +729,7 @@ func findAvailableInstanceNames(set v1beta1.PostgresInstanceSetSpec,
// any available PGDATA volumes for the instance set that have no corresponding WAL
// volumes (which means new PVCs will simply be reconciled instead).
if set.WALVolumeClaimSpec != nil {
-setVolumesWithWAL := []corev1.PersistentVolumeClaim{}
+setVolumesWithWAL := []*corev1.PersistentVolumeClaim{}
for _, setVol := range setVolumes {
setVolInstance := setVol.GetLabels()[naming.LabelInstance]
for _, pvc := range clusterVolumes {
Expand Down Expand Up @@ -1066,7 +1066,7 @@ func (r *Reconciler) scaleUpInstances(
primaryCertificate *corev1.SecretProjection,
availableInstanceNames []string,
numInstancePods int,
-clusterVolumes []corev1.PersistentVolumeClaim,
+clusterVolumes []*corev1.PersistentVolumeClaim,
exporterQueriesConfig, exporterWebConfig *corev1.ConfigMap,
backupsSpecFound bool,
) ([]*appsv1.StatefulSet, error) {
@@ -1141,7 +1141,7 @@ func (r *Reconciler) reconcileInstance(
primaryCertificate *corev1.SecretProjection,
instance *appsv1.StatefulSet,
numInstancePods int,
-clusterVolumes []corev1.PersistentVolumeClaim,
+clusterVolumes []*corev1.PersistentVolumeClaim,
exporterQueriesConfig, exporterWebConfig *corev1.ConfigMap,
backupsSpecFound bool,
) error {
18 changes: 9 additions & 9 deletions internal/controller/postgrescluster/instance_test.go
@@ -1758,7 +1758,7 @@ func TestFindAvailableInstanceNames(t *testing.T) {
testCases := []struct {
set v1beta1.PostgresInstanceSetSpec
fakeObservedInstances *observedInstances
-fakeClusterVolumes []corev1.PersistentVolumeClaim
+fakeClusterVolumes []*corev1.PersistentVolumeClaim
expectedInstanceNames []string
}{{
set: v1beta1.PostgresInstanceSetSpec{Name: "instance1"},
@@ -1769,7 +1769,7 @@ func TestFindAvailableInstanceNames(t *testing.T) {
[]appsv1.StatefulSet{{}},
[]corev1.Pod{},
),
-fakeClusterVolumes: []corev1.PersistentVolumeClaim{{}},
+fakeClusterVolumes: []*corev1.PersistentVolumeClaim{{}},
expectedInstanceNames: []string{},
}, {
set: v1beta1.PostgresInstanceSetSpec{Name: "instance1"},
@@ -1783,7 +1783,7 @@ func TestFindAvailableInstanceNames(t *testing.T) {
naming.LabelInstanceSet: "instance1"}}}},
[]corev1.Pod{},
),
-fakeClusterVolumes: []corev1.PersistentVolumeClaim{{ObjectMeta: metav1.ObjectMeta{
+fakeClusterVolumes: []*corev1.PersistentVolumeClaim{{ObjectMeta: metav1.ObjectMeta{
Name: "instance1-abc-def",
Labels: map[string]string{
naming.LabelRole: naming.RolePostgresData,
@@ -1802,7 +1802,7 @@ func TestFindAvailableInstanceNames(t *testing.T) {
naming.LabelInstanceSet: "instance1"}}}},
[]corev1.Pod{},
),
-fakeClusterVolumes: []corev1.PersistentVolumeClaim{},
+fakeClusterVolumes: []*corev1.PersistentVolumeClaim{},
expectedInstanceNames: []string{},
}, {
set: v1beta1.PostgresInstanceSetSpec{Name: "instance1"},
@@ -1816,7 +1816,7 @@ func TestFindAvailableInstanceNames(t *testing.T) {
naming.LabelInstanceSet: "instance1"}}}},
[]corev1.Pod{},
),
-fakeClusterVolumes: []corev1.PersistentVolumeClaim{
+fakeClusterVolumes: []*corev1.PersistentVolumeClaim{
{ObjectMeta: metav1.ObjectMeta{
Name: "instance1-abc-def",
Labels: map[string]string{
@@ -1843,7 +1843,7 @@ func TestFindAvailableInstanceNames(t *testing.T) {
naming.LabelInstanceSet: "instance1"}}}},
[]corev1.Pod{},
),
-fakeClusterVolumes: []corev1.PersistentVolumeClaim{{ObjectMeta: metav1.ObjectMeta{
+fakeClusterVolumes: []*corev1.PersistentVolumeClaim{{ObjectMeta: metav1.ObjectMeta{
Name: "instance1-abc-def",
Labels: map[string]string{
naming.LabelRole: naming.RolePostgresData,
@@ -1863,7 +1863,7 @@ func TestFindAvailableInstanceNames(t *testing.T) {
naming.LabelInstanceSet: "instance1"}}}},
[]corev1.Pod{},
),
-fakeClusterVolumes: []corev1.PersistentVolumeClaim{
+fakeClusterVolumes: []*corev1.PersistentVolumeClaim{
{ObjectMeta: metav1.ObjectMeta{
Name: "instance1-abc-def",
Labels: map[string]string{
@@ -1887,7 +1887,7 @@ func TestFindAvailableInstanceNames(t *testing.T) {
[]appsv1.StatefulSet{},
[]corev1.Pod{},
),
-fakeClusterVolumes: []corev1.PersistentVolumeClaim{
+fakeClusterVolumes: []*corev1.PersistentVolumeClaim{
{ObjectMeta: metav1.ObjectMeta{
Name: "instance1-def-ghi",
Labels: map[string]string{
@@ -1911,7 +1911,7 @@ func TestFindAvailableInstanceNames(t *testing.T) {
[]appsv1.StatefulSet{},
[]corev1.Pod{},
),
-fakeClusterVolumes: []corev1.PersistentVolumeClaim{{ObjectMeta: metav1.ObjectMeta{
+fakeClusterVolumes: []*corev1.PersistentVolumeClaim{{ObjectMeta: metav1.ObjectMeta{
Name: "instance1-def-ghi",
Labels: map[string]string{
naming.LabelRole: naming.RolePostgresData,
4 changes: 2 additions & 2 deletions internal/controller/postgrescluster/pgbackrest.go
@@ -1499,7 +1499,7 @@ func (r *Reconciler) reconcilePGBackRest(ctx context.Context,
// for the PostgresCluster being reconciled using the backups of another PostgresCluster.
func (r *Reconciler) reconcilePostgresClusterDataSource(ctx context.Context,
cluster *v1beta1.PostgresCluster, dataSource *v1beta1.PostgresClusterDataSource,
-configHash string, clusterVolumes []corev1.PersistentVolumeClaim,
+configHash string, clusterVolumes []*corev1.PersistentVolumeClaim,
rootCA *pki.RootCertificateAuthority,
backupsSpecFound bool,
) error {
@@ -1663,7 +1663,7 @@ func (r *Reconciler) reconcilePostgresClusterDataSource(ctx context.Context,
// data source, i.e., S3, etc.
func (r *Reconciler) reconcileCloudBasedDataSource(ctx context.Context,
cluster *v1beta1.PostgresCluster, dataSource *v1beta1.PGBackRestDataSource,
-configHash string, clusterVolumes []corev1.PersistentVolumeClaim) error {
+configHash string, clusterVolumes []*corev1.PersistentVolumeClaim) error {

// Ensure the proper instance and instance set can be identified via the status. The
// StartupInstance and StartupInstanceSet values should be populated when the cluster