diff --git a/internal/cosmos/cache_controller.go b/internal/cosmos/cache_controller.go index a4b71d92..243c6363 100644 --- a/internal/cosmos/cache_controller.go +++ b/internal/cosmos/cache_controller.go @@ -168,6 +168,11 @@ func (c *CacheController) SyncedPods(ctx context.Context, controller client.Obje return kube.AvailablePods(c.Collect(ctx, controller).SyncedPods(), 5*time.Second, time.Now()) } +// ReadyPods returns the pods that are ready to be upgraded or in sync (i.e. caught up with chain tip). +func (c *CacheController) ReadyPods(ctx context.Context, crd *cosmosv1.CosmosFullNode) []*corev1.Pod { + return c.Collect(ctx, client.ObjectKeyFromObject(crd)).ReadyPods(crd) +} + func (c *CacheController) listPods(ctx context.Context, controller client.ObjectKey) ([]corev1.Pod, error) { var pods corev1.PodList if err := c.client.List(ctx, &pods, diff --git a/internal/cosmos/status_collection.go b/internal/cosmos/status_collection.go index aefa018f..959d7862 100644 --- a/internal/cosmos/status_collection.go +++ b/internal/cosmos/status_collection.go @@ -6,6 +6,7 @@ import ( "time" "github.com/samber/lo" + cosmosv1 "github.com/strangelove-ventures/cosmos-operator/api/v1" "github.com/strangelove-ventures/cosmos-operator/internal/kube" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/types" @@ -113,3 +114,34 @@ func (coll StatusCollection) Synced() StatusCollection { func (coll StatusCollection) SyncedPods() []*corev1.Pod { return lo.Map(coll.Synced(), func(status StatusItem, _ int) *corev1.Pod { return status.GetPod() }) } + +// ReadyPods returns the pods that are caught up with the chain tip or ready to be upgraded. +func (coll StatusCollection) ReadyPods(crd *cosmosv1.CosmosFullNode) (out []*corev1.Pod) { + for _, status := range coll { + if crd.Spec.ChainSpec.Versions != nil { + instanceHeight := uint64(0) + if height, ok := crd.Status.Height[status.Pod.Name]; ok { + instanceHeight = height + } + var image string + for _, version := range crd.Spec.ChainSpec.Versions { + if instanceHeight < version.UpgradeHeight { + break + } + image = version.Image + } + if image != "" && status.Pod.Spec.Containers[0].Image != image { + out = append(out, status.GetPod()) + continue + } + } + if status.Err != nil { + continue + } + if status.Status.Result.SyncInfo.CatchingUp { + continue + } + out = append(out, status.GetPod()) + } + return out +} diff --git a/internal/fullnode/pod_control.go b/internal/fullnode/pod_control.go index d3fb3fe7..e3c546c0 100644 --- a/internal/fullnode/pod_control.go +++ b/internal/fullnode/pod_control.go @@ -4,7 +4,6 @@ import ( "context" "fmt" - "github.com/samber/lo" cosmosv1 "github.com/strangelove-ventures/cosmos-operator/api/v1" "github.com/strangelove-ventures/cosmos-operator/internal/diff" "github.com/strangelove-ventures/cosmos-operator/internal/kube" @@ -17,7 +16,7 @@ import ( ) type PodFilter interface { - SyncedPods(ctx context.Context, controller client.ObjectKey) []*corev1.Pod + ReadyPods(ctx context.Context, crd *cosmosv1.CosmosFullNode) []*corev1.Pod } // Client is a controller client. It is a subset of client.Client. @@ -83,37 +82,27 @@ func (pc PodControl) Reconcile(ctx context.Context, reporter kube.Reporter, crd return true, nil } - if len(diffed.Updates()) > 0 { - versionUpdated := false - for _, update := range diffed.Updates() { - for _, p := range pods.Items { - if p.Name == update.Name { - if p.Spec.Containers[0].Image != update.Spec.Containers[0].Image { - // version update, delete pod now. - reporter.Info("Deleting pod for version update", "podName", p.Name) - if err := pc.client.Delete(ctx, &p, client.PropagationPolicy(metav1.DeletePropagationForeground)); client.IgnoreNotFound(err) != nil { - return true, kube.TransientError(fmt.Errorf("update pod %q: %w", p.Name, err)) - } - versionUpdated = true - } - } - } - } - - if versionUpdated { - // Signal requeue. - return true, nil - } - + diffedUpdates := diffed.Updates() + if len(diffedUpdates) > 0 { var ( - // This may be a source of confusion by passing currentPods vs. pods from diff.Updates(). - // This is a leaky abstraction (which may be fixed in the future) because diff.Updates() pods are built - // from the operator and do not match what's returned by listing pods. - avail = pc.podFilter.SyncedPods(ctx, client.ObjectKeyFromObject(crd)) + avail = pc.podFilter.ReadyPods(ctx, crd) numUpdates = pc.computeRollout(crd.Spec.RolloutStrategy.MaxUnavailable, int(crd.Spec.Replicas), len(avail)) ) - for _, pod := range lo.Slice(diffed.Updates(), 0, numUpdates) { + for i, pod := range avail { + if i >= numUpdates { + break + } + var diffedPod *corev1.Pod + for _, update := range diffedUpdates { + if update.Name == pod.Name { + diffedPod = update + break + } + } + if diffedPod == nil { + return true, kube.UnrecoverableError(fmt.Errorf("pod %q not found in diffed updates", pod.Name)) + } reporter.Info("Deleting pod for update", "podName", pod.Name) // Because we should watch for deletes, we get a re-queued request, detect pod is missing, and re-create it. if err := pc.client.Delete(ctx, pod, client.PropagationPolicy(metav1.DeletePropagationForeground)); client.IgnoreNotFound(err) != nil { @@ -121,6 +110,11 @@ func (pc PodControl) Reconcile(ctx context.Context, reporter kube.Reporter, crd } } + if len(avail) == numUpdates && len(diffedUpdates) == numUpdates { + // All pods are updated. + return false, nil + } + // Signal requeue. return true, nil } diff --git a/internal/fullnode/pod_control_test.go b/internal/fullnode/pod_control_test.go index a21cc003..6c2d8640 100644 --- a/internal/fullnode/pod_control_test.go +++ b/internal/fullnode/pod_control_test.go @@ -4,6 +4,7 @@ import ( "context" "testing" + cosmosv1 "github.com/strangelove-ventures/cosmos-operator/api/v1" "github.com/strangelove-ventures/cosmos-operator/internal/diff" "github.com/stretchr/testify/require" corev1 "k8s.io/api/core/v1" @@ -12,16 +13,16 @@ import ( "sigs.k8s.io/controller-runtime/pkg/client" ) -type mockPodFilter func(ctx context.Context, controller client.ObjectKey) []*corev1.Pod +type mockPodFilter func(ctx context.Context, crd *cosmosv1.CosmosFullNode) []*corev1.Pod -func (fn mockPodFilter) SyncedPods(ctx context.Context, controller client.ObjectKey) []*corev1.Pod { +func (fn mockPodFilter) ReadyPods(ctx context.Context, crd *cosmosv1.CosmosFullNode) []*corev1.Pod { if ctx == nil { panic("nil context") } - return fn(ctx, controller) + return fn(ctx, crd) } -var panicPodFilter = mockPodFilter(func(context.Context, client.ObjectKey) []*corev1.Pod { +var panicPodFilter = mockPodFilter(func(context.Context, *cosmosv1.CosmosFullNode) []*corev1.Pod { panic("SyncedPods should not be called") }) @@ -107,9 +108,9 @@ func TestPodControl_Reconcile(t *testing.T) { } var didFilter bool - podFilter := mockPodFilter(func(_ context.Context, controller client.ObjectKey) []*corev1.Pod { - require.Equal(t, namespace, controller.Namespace) - require.Equal(t, "hub", controller.Name) + podFilter := mockPodFilter(func(_ context.Context, crd *cosmosv1.CosmosFullNode) []*corev1.Pod { + require.Equal(t, namespace, crd.Namespace) + require.Equal(t, "hub", crd.Name) didFilter = true return existing[:1] }) @@ -132,6 +133,6 @@ func TestPodControl_Reconcile(t *testing.T) { require.True(t, didFilter) require.Zero(t, mClient.CreateCount) - require.Equal(t, stubRollout, mClient.DeleteCount) + require.Equal(t, 1, mClient.DeleteCount) }) }