Skip to content

Commit

Permalink
harden rollout
Browse files Browse the repository at this point in the history
  • Loading branch information
agouin committed Oct 19, 2023
1 parent 75a517c commit 3181c09
Show file tree
Hide file tree
Showing 4 changed files with 69 additions and 37 deletions.
5 changes: 5 additions & 0 deletions internal/cosmos/cache_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,11 @@ func (c *CacheController) SyncedPods(ctx context.Context, controller client.Obje
return kube.AvailablePods(c.Collect(ctx, controller).SyncedPods(), 5*time.Second, time.Now())
}

// ReadyPods collects the statuses recorded under crd's object key and returns the pods
// that collection reports as ready for crd, i.e. pods that are either due for an upgrade
// or caught up with the chain tip.
func (c *CacheController) ReadyPods(ctx context.Context, crd *cosmosv1.CosmosFullNode) []*corev1.Pod {
	key := client.ObjectKeyFromObject(crd)
	statuses := c.Collect(ctx, key)
	return statuses.ReadyPods(crd)
}

func (c *CacheController) listPods(ctx context.Context, controller client.ObjectKey) ([]corev1.Pod, error) {
var pods corev1.PodList
if err := c.client.List(ctx, &pods,
Expand Down
32 changes: 32 additions & 0 deletions internal/cosmos/status_collection.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"time"

"github.com/samber/lo"
cosmosv1 "github.com/strangelove-ventures/cosmos-operator/api/v1"
"github.com/strangelove-ventures/cosmos-operator/internal/kube"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
Expand Down Expand Up @@ -113,3 +114,34 @@ func (coll StatusCollection) Synced() StatusCollection {
func (coll StatusCollection) SyncedPods() []*corev1.Pod {
	// Project each synced status item onto the pod it describes; the index is unused.
	toPod := func(item StatusItem, _ int) *corev1.Pod {
		return item.GetPod()
	}
	return lo.Map(coll.Synced(), toPod)
}

// ReadyPods returns, in collection order, every pod that is either:
//   - running an image different from the one scheduled for its recorded height in
//     crd.Spec.ChainSpec.Versions (such a pod is included regardless of its sync status), or
//   - free of a status error and no longer catching up.
func (coll StatusCollection) ReadyPods(crd *cosmosv1.CosmosFullNode) (out []*corev1.Pod) {
	for _, item := range coll {
		if versions := crd.Spec.ChainSpec.Versions; versions != nil {
			// A pod with no recorded height reads as zero from the map.
			height := crd.Status.Height[item.Pod.Name]

			// Walk the versions in their listed order and keep the image of the last one
			// whose upgrade height has been reached; stop at the first that has not.
			wantImage := ""
			for _, v := range versions {
				if v.UpgradeHeight > height {
					break
				}
				wantImage = v.Image
			}

			// Only the first container's image is compared against the scheduled image.
			if wantImage != "" && item.Pod.Spec.Containers[0].Image != wantImage {
				out = append(out, item.GetPod())
				continue
			}
		}

		// Otherwise the pod qualifies only when its status was fetched without error and
		// it reports that it is not catching up.
		if item.Err == nil && !item.Status.Result.SyncInfo.CatchingUp {
			out = append(out, item.GetPod())
		}
	}
	return out
}
52 changes: 23 additions & 29 deletions internal/fullnode/pod_control.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"context"
"fmt"

"github.com/samber/lo"
cosmosv1 "github.com/strangelove-ventures/cosmos-operator/api/v1"
"github.com/strangelove-ventures/cosmos-operator/internal/diff"
"github.com/strangelove-ventures/cosmos-operator/internal/kube"
Expand All @@ -17,7 +16,7 @@ import (
)

// PodFilter is the pod-selection dependency of PodControl. Reconcile uses the pods it
// returns as the set of available pods when computing how many updates to roll out.
type PodFilter interface {
SyncedPods(ctx context.Context, controller client.ObjectKey) []*corev1.Pod
ReadyPods(ctx context.Context, crd *cosmosv1.CosmosFullNode) []*corev1.Pod
}

// Client is a controller client. It is a subset of client.Client.
Expand Down Expand Up @@ -83,44 +82,39 @@ func (pc PodControl) Reconcile(ctx context.Context, reporter kube.Reporter, crd
return true, nil
}

if len(diffed.Updates()) > 0 {
versionUpdated := false
for _, update := range diffed.Updates() {
for _, p := range pods.Items {
if p.Name == update.Name {
if p.Spec.Containers[0].Image != update.Spec.Containers[0].Image {
// version update, delete pod now.
reporter.Info("Deleting pod for version update", "podName", p.Name)
if err := pc.client.Delete(ctx, &p, client.PropagationPolicy(metav1.DeletePropagationForeground)); client.IgnoreNotFound(err) != nil {
return true, kube.TransientError(fmt.Errorf("update pod %q: %w", p.Name, err))
}
versionUpdated = true
}
}
}
}

if versionUpdated {
// Signal requeue.
return true, nil
}

diffedUpdates := diffed.Updates()
if len(diffedUpdates) > 0 {
var (
// This may be a source of confusion by passing currentPods vs. pods from diff.Updates().
// This is a leaky abstraction (which may be fixed in the future) because diff.Updates() pods are built
// from the operator and do not match what's returned by listing pods.
avail = pc.podFilter.SyncedPods(ctx, client.ObjectKeyFromObject(crd))
avail = pc.podFilter.ReadyPods(ctx, crd)
numUpdates = pc.computeRollout(crd.Spec.RolloutStrategy.MaxUnavailable, int(crd.Spec.Replicas), len(avail))
)

for _, pod := range lo.Slice(diffed.Updates(), 0, numUpdates) {
for i, pod := range avail {
if i >= numUpdates {
break
}
var diffedPod *corev1.Pod
for _, update := range diffedUpdates {
if update.Name == pod.Name {
diffedPod = update
break
}
}
if diffedPod == nil {
return true, kube.UnrecoverableError(fmt.Errorf("pod %q not found in diffed updates", pod.Name))
}
reporter.Info("Deleting pod for update", "podName", pod.Name)
// Because we should watch for deletes, we get a re-queued request, detect pod is missing, and re-create it.
if err := pc.client.Delete(ctx, pod, client.PropagationPolicy(metav1.DeletePropagationForeground)); client.IgnoreNotFound(err) != nil {
return true, kube.TransientError(fmt.Errorf("update pod %q: %w", pod.Name, err))
}
}

if len(avail) == numUpdates && len(diffedUpdates) == numUpdates {
// All pods are updated.
return false, nil
}

// Signal requeue.
return true, nil
}
Expand Down
17 changes: 9 additions & 8 deletions internal/fullnode/pod_control_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"testing"

cosmosv1 "github.com/strangelove-ventures/cosmos-operator/api/v1"
"github.com/strangelove-ventures/cosmos-operator/internal/diff"
"github.com/stretchr/testify/require"
corev1 "k8s.io/api/core/v1"
Expand All @@ -12,16 +13,16 @@ import (
"sigs.k8s.io/controller-runtime/pkg/client"
)

// mockPodFilter is a function type used as a pod-filter test double: a test supplies the
// function, and the method declared on this type forwards calls to it.
type mockPodFilter func(ctx context.Context, controller client.ObjectKey) []*corev1.Pod
type mockPodFilter func(ctx context.Context, crd *cosmosv1.CosmosFullNode) []*corev1.Pod

// The filter method panics when given a nil context; otherwise it returns whatever the
// wrapped function returns for the same arguments.
func (fn mockPodFilter) SyncedPods(ctx context.Context, controller client.ObjectKey) []*corev1.Pod {
func (fn mockPodFilter) ReadyPods(ctx context.Context, crd *cosmosv1.CosmosFullNode) []*corev1.Pod {
if ctx == nil {
panic("nil context")
}
return fn(ctx, controller)
return fn(ctx, crd)
}

var panicPodFilter = mockPodFilter(func(context.Context, client.ObjectKey) []*corev1.Pod {
var panicPodFilter = mockPodFilter(func(context.Context, *cosmosv1.CosmosFullNode) []*corev1.Pod {
panic("SyncedPods should not be called")
})

Expand Down Expand Up @@ -107,9 +108,9 @@ func TestPodControl_Reconcile(t *testing.T) {
}

var didFilter bool
podFilter := mockPodFilter(func(_ context.Context, controller client.ObjectKey) []*corev1.Pod {
require.Equal(t, namespace, controller.Namespace)
require.Equal(t, "hub", controller.Name)
podFilter := mockPodFilter(func(_ context.Context, crd *cosmosv1.CosmosFullNode) []*corev1.Pod {
require.Equal(t, namespace, crd.Namespace)
require.Equal(t, "hub", crd.Name)
didFilter = true
return existing[:1]
})
Expand All @@ -132,6 +133,6 @@ func TestPodControl_Reconcile(t *testing.T) {
require.True(t, didFilter)

require.Zero(t, mClient.CreateCount)
require.Equal(t, stubRollout, mClient.DeleteCount)
require.Equal(t, 1, mClient.DeleteCount)
})
}

0 comments on commit 3181c09

Please sign in to comment.