diff --git a/api/v1/cosmosfullnode_types.go b/api/v1/cosmosfullnode_types.go
index d536390a..062c4313 100644
--- a/api/v1/cosmosfullnode_types.go
+++ b/api/v1/cosmosfullnode_types.go
@@ -138,21 +138,14 @@ type FullNodeStatus struct {
 	// Current sync information. Collected every 60s.
 	// +optional
-	SyncInfo *SyncInfoStatus `json:"syncInfo,omitempty"`
+	SyncInfo map[string]*SyncInfoPodStatus `json:"sync,omitempty"`

 	// Latest Height information. collected when node starts up and when RPC is successfully queried.
 	// +optional
 	Height map[string]uint64 `json:"height,omitempty"`
 }

-type SyncInfoStatus struct {
-	// The latest consensus state of pods.
-	Pods []SyncInfoPodStatus `json:"pods"`
-}
-
 type SyncInfoPodStatus struct {
-	// Pod's name.
-	Pod string `json:"pod"`
 	// When consensus information was fetched.
 	Timestamp metav1.Time `json:"timestamp"`
 	// Latest height if no error encountered.
diff --git a/api/v1/zz_generated.deepcopy.go b/api/v1/zz_generated.deepcopy.go
index b528590a..b7e88e0a 100644
--- a/api/v1/zz_generated.deepcopy.go
+++ b/api/v1/zz_generated.deepcopy.go
@@ -320,8 +320,18 @@ func (in *FullNodeStatus) DeepCopyInto(out *FullNodeStatus) {
 	}
 	if in.SyncInfo != nil {
 		in, out := &in.SyncInfo, &out.SyncInfo
-		*out = new(SyncInfoStatus)
-		(*in).DeepCopyInto(*out)
+		*out = make(map[string]*SyncInfoPodStatus, len(*in))
+		for key, val := range *in {
+			var outVal *SyncInfoPodStatus
+			if val == nil {
+				(*out)[key] = nil
+			} else {
+				in, out := &val, &outVal
+				*out = new(SyncInfoPodStatus)
+				(*in).DeepCopyInto(*out)
+			}
+			(*out)[key] = outVal
+		}
 	}
 	if in.Height != nil {
 		in, out := &in.Height, &out.Height
@@ -762,25 +772,3 @@ func (in *SyncInfoPodStatus) DeepCopy() *SyncInfoPodStatus {
 	in.DeepCopyInto(out)
 	return out
 }
-
-// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
-func (in *SyncInfoStatus) DeepCopyInto(out *SyncInfoStatus) {
-	*out = *in
-	if in.Pods != nil {
-		in, out := &in.Pods, &out.Pods
-		*out = make([]SyncInfoPodStatus, len(*in))
-		for i := range *in {
-			(*in)[i].DeepCopyInto(&(*out)[i])
-		}
-	}
-}
-
-// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new SyncInfoStatus.
-func (in *SyncInfoStatus) DeepCopy() *SyncInfoStatus {
-	if in == nil {
-		return nil
-	}
-	out := new(SyncInfoStatus)
-	in.DeepCopyInto(out)
-	return out
-}
diff --git a/config/crd/bases/cosmos.strange.love_cosmosfullnodes.yaml b/config/crd/bases/cosmos.strange.love_cosmosfullnodes.yaml
index 1b340d34..a76cf9d2 100644
--- a/config/crd/bases/cosmos.strange.love_cosmosfullnodes.yaml
+++ b/config/crd/bases/cosmos.strange.love_cosmosfullnodes.yaml
@@ -6009,39 +6009,28 @@ spec:
             status:
               description: A generic message for the user. May contain errors.
               type: string
-            syncInfo:
+            sync:
+              additionalProperties:
+                properties:
+                  error:
+                    description: Error message if unable to fetch consensus state.
+                    type: string
+                  height:
+                    description: Latest height if no error encountered.
+                    format: int64
+                    type: integer
+                  inSync:
+                    description: If the pod reports itself as in sync with chain
+                      tip.
+                    type: boolean
+                  timestamp:
+                    description: When consensus information was fetched.
+                    format: date-time
+                    type: string
+                required:
+                - timestamp
+                type: object
               description: Current sync information. Collected every 60s.
-              properties:
-                pods:
-                  description: The latest consensus state of pods.
-                  items:
-                    properties:
-                      error:
-                        description: Error message if unable to fetch consensus
-                          state.
-                        type: string
-                      height:
-                        description: Latest height if no error encountered.
-                        format: int64
-                        type: integer
-                      inSync:
-                        description: If the pod reports itself as in sync with chain
-                          tip.
-                        type: boolean
-                      pod:
-                        description: Pod's name.
-                        type: string
-                      timestamp:
-                        description: When consensus information was fetched.
-                        format: date-time
-                        type: string
-                    required:
-                    - pod
-                    - timestamp
-                    type: object
-                  type: array
-              required:
-              - pods
               type: object
             required:
            - observedGeneration
diff --git a/controllers/cosmosfullnode_controller.go b/controllers/cosmosfullnode_controller.go
index aec64940..f60d370a 100644
--- a/controllers/cosmosfullnode_controller.go
+++ b/controllers/cosmosfullnode_controller.go
@@ -71,7 +71,7 @@ func NewFullNode(
 		configMapControl: fullnode.NewConfigMapControl(client),
 		nodeKeyControl:   fullnode.NewNodeKeyControl(client),
 		peerCollector:    fullnode.NewPeerCollector(client),
-		podControl:       fullnode.NewPodControl(client),
+		podControl:       fullnode.NewPodControl(client, cacheController),
 		pvcControl:       fullnode.NewPVCControl(client),
 		recorder:         recorder,
 		serviceControl:   fullnode.NewServiceControl(client),
@@ -121,7 +121,7 @@ func (r *CosmosFullNodeReconciler) Reconcile(ctx context.Context, req ctrl.Reque

 	syncInfo := fullnode.SyncInfoStatus(ctx, crd, r.cacheController)

-	defer r.updateStatus(ctx, crd, &syncInfo)
+	defer r.updateStatus(ctx, crd, syncInfo)

 	errs := &kube.ReconcileErrors{}

@@ -172,7 +172,7 @@ func (r *CosmosFullNodeReconciler) Reconcile(ctx context.Context, req ctrl.Reque
 	}

 	// Reconcile pods.
-	podRequeue, err := r.podControl.Reconcile(ctx, reporter, crd, configCksums, &syncInfo)
+	podRequeue, err := r.podControl.Reconcile(ctx, reporter, crd, configCksums, syncInfo)
 	if err != nil {
 		errs.Append(err)
 	}
@@ -221,19 +221,19 @@ func (r *CosmosFullNodeReconciler) resultWithErr(crd *cosmosv1.CosmosFullNode, e
 	return stopResult, err
 }

-func (r *CosmosFullNodeReconciler) updateStatus(ctx context.Context, crd *cosmosv1.CosmosFullNode, syncInfo *cosmosv1.SyncInfoStatus) {
+func (r *CosmosFullNodeReconciler) updateStatus(ctx context.Context, crd *cosmosv1.CosmosFullNode, syncInfo map[string]*cosmosv1.SyncInfoPodStatus) {
 	if err := r.statusClient.SyncUpdate(ctx, client.ObjectKeyFromObject(crd), func(status *cosmosv1.FullNodeStatus) {
 		status.ObservedGeneration = crd.Status.ObservedGeneration
 		status.Phase = crd.Status.Phase
 		status.StatusMessage = crd.Status.StatusMessage
 		status.Peers = crd.Status.Peers
 		status.SyncInfo = syncInfo
-		for _, v := range syncInfo.Pods {
+		for k, v := range syncInfo {
 			if v.Height != nil && *v.Height > 0 {
 				if status.Height == nil {
 					status.Height = make(map[string]uint64)
 				}
-				status.Height[v.Pod] = *v.Height
+				status.Height[k] = *v.Height
 			}
 		}
 	}); err != nil {
diff --git a/internal/cosmos/cache_controller.go b/internal/cosmos/cache_controller.go
index a4b71d92..9db49969 100644
--- a/internal/cosmos/cache_controller.go
+++ b/internal/cosmos/cache_controller.go
@@ -149,6 +149,22 @@ func (c *CacheController) Reconcile(ctx context.Context, req reconcile.Request)
 	return finishResult, nil
 }

+// Invalidate removes the given pods' status from the cache.
+func (c *CacheController) Invalidate(controller client.ObjectKey, pods []string) {
+	v, _ := c.cache.Get(controller)
+	now := time.Now()
+	for _, s := range v {
+		for _, pod := range pods {
+			if s.Pod.Name == pod {
+				s.Status = CometStatus{}
+				s.Err = fmt.Errorf("invalidated")
+				s.TS = now
+			}
+		}
+	}
+	c.cache.Update(controller, v)
+}
+
 // Collect returns a StatusCollection for the given controller. Only returns cached CometStatus.
 func (c *CacheController) Collect(ctx context.Context, controller client.ObjectKey) StatusCollection {
 	pods, err := c.listPods(ctx, controller)
diff --git a/internal/fullnode/pod_control.go b/internal/fullnode/pod_control.go
index 28a81bd9..8fc8a0d0 100644
--- a/internal/fullnode/pod_control.go
+++ b/internal/fullnode/pod_control.go
@@ -23,17 +23,23 @@ type Client interface {
 	Scheme() *runtime.Scheme
 }

+type CacheInvalidator interface {
+	Invalidate(controller client.ObjectKey, pods []string)
+}
+
 // PodControl reconciles pods for a CosmosFullNode.
 type PodControl struct {
-	client         Client
-	computeRollout func(maxUnavail *intstr.IntOrString, desired, ready int) int
+	client           Client
+	cacheInvalidator CacheInvalidator
+	computeRollout   func(maxUnavail *intstr.IntOrString, desired, ready int) int
 }

 // NewPodControl returns a valid PodControl.
-func NewPodControl(client Client) PodControl {
+func NewPodControl(client Client, cacheInvalidator CacheInvalidator) PodControl {
 	return PodControl{
-		client:         client,
-		computeRollout: kube.ComputeRollout,
+		client:           client,
+		cacheInvalidator: cacheInvalidator,
+		computeRollout:   kube.ComputeRollout,
 	}
 }

@@ -44,7 +50,7 @@ func (pc PodControl) Reconcile(
 	reporter kube.Reporter,
 	crd *cosmosv1.CosmosFullNode,
 	cksums ConfigChecksums,
-	syncInfo *cosmosv1.SyncInfoStatus,
+	syncInfo map[string]*cosmosv1.SyncInfoPodStatus,
 ) (bool, kube.ReconcileError) {
 	var pods corev1.PodList
 	if err := pc.client.List(ctx, &pods,
@@ -70,11 +76,24 @@ func (pc PodControl) Reconcile(
 		}
 	}

+	var invalidateCache []string
+
+	defer func() {
+		if pc.cacheInvalidator == nil {
+			return
+		}
+		if len(invalidateCache) > 0 {
+			pc.cacheInvalidator.Invalidate(client.ObjectKeyFromObject(crd), invalidateCache)
+		}
+	}()
+
 	for _, pod := range diffed.Deletes() {
 		reporter.Info("Deleting pod", "name", pod.Name)
 		if err := pc.client.Delete(ctx, pod, client.PropagationPolicy(metav1.DeletePropagationForeground)); kube.IgnoreNotFound(err) != nil {
 			return true, kube.TransientError(fmt.Errorf("delete pod %q: %w", pod.Name, err))
 		}
+		delete(syncInfo, pod.Name)
+		invalidateCache = append(invalidateCache, pod.Name)
 	}

 	if len(diffed.Creates())+len(diffed.Deletes()) > 0 {
@@ -85,54 +104,49 @@ func (pc PodControl) Reconcile(
 	diffedUpdates := diffed.Updates()
 	if len(diffedUpdates) > 0 {
 		var (
-			upgradePods      = make(map[string]bool)
-			otherUpdates     = make(map[string]*corev1.Pod)
+			updatedPods      = 0
 			rpcReachablePods = 0
 			inSyncPods       = 0
+			otherUpdates     = []*corev1.Pod{}
 		)

-	PodsLoop:
-		for _, ps := range syncInfo.Pods {
-			for _, existing := range pods.Items {
-				if existing.Name == ps.Pod {
-					if existing.DeletionTimestamp != nil {
-						continue PodsLoop
-					}
-					break
-				}
-			}
+		for _, existing := range pods.Items {
+			podName := existing.Name

-			if ps.InSync != nil && *ps.InSync {
-				inSyncPods++
+			if existing.DeletionTimestamp != nil {
+				// Pod is being deleted, so we skip it.
+				continue
 			}
-			rpcReachable := ps.Error == nil
-			if rpcReachable {
-				rpcReachablePods++
+
+			var rpcReachable bool
+			if ps, ok := syncInfo[podName]; ok {
+				if ps.InSync != nil && *ps.InSync {
+					inSyncPods++
+				}
+				rpcReachable = ps.Error == nil
+				if rpcReachable {
+					rpcReachablePods++
+				}
 			}
 			for _, update := range diffedUpdates {
-				if ps.Pod == update.Name {
-					awaitingUpgrade := false
-					for _, existing := range pods.Items {
-						if existing.Name == ps.Pod {
-							if existing.Spec.Containers[0].Image != update.Spec.Containers[0].Image {
-								awaitingUpgrade = true
-							}
-							break
-						}
-					}
-					if awaitingUpgrade {
+				if podName == update.Name {
+					if existing.Spec.Containers[0].Image != update.Spec.Containers[0].Image {
+						// awaiting upgrade
 						if !rpcReachable {
-							upgradePods[ps.Pod] = true
-							reporter.Info("Deleting pod for version upgrade", "name", ps.Pod)
+							updatedPods++
+							reporter.Info("Deleting pod for version upgrade", "name", podName)
 							// Because we should watch for deletes, we get a re-queued request, detect pod is missing, and re-create it.
 							if err := pc.client.Delete(ctx, update, client.PropagationPolicy(metav1.DeletePropagationForeground)); client.IgnoreNotFound(err) != nil {
-								return true, kube.TransientError(fmt.Errorf("upgrade pod version %q: %w", ps.Pod, err))
+								return true, kube.TransientError(fmt.Errorf("upgrade pod version %q: %w", podName, err))
 							}
+							syncInfo[podName].InSync = nil
+							syncInfo[podName].Error = ptr("version upgrade in progress")
+							invalidateCache = append(invalidateCache, podName)
 						} else {
-							otherUpdates[ps.Pod] = update
+							otherUpdates = append(otherUpdates, update)
 						}
 					} else {
-						otherUpdates[ps.Pod] = update
+						otherUpdates = append(otherUpdates, update)
 					}
 					break
 				}
@@ -148,32 +162,34 @@
 		numUpdates := pc.computeRollout(crd.Spec.RolloutStrategy.MaxUnavailable, int(crd.Spec.Replicas), ready)

-		updated := len(upgradePods)
-
-		if updated == len(diffedUpdates) {
+		if updatedPods == len(diffedUpdates) {
 			// All pods are updated.
 			return false, nil
 		}

-		if updated >= numUpdates {
+		if updatedPods >= numUpdates {
 			// Signal requeue.
 			return true, nil
 		}

-		for podName, pod := range otherUpdates {
-			reporter.Info("Deleting pod for update", "podName", podName)
+		for _, pod := range otherUpdates {
+			podName := pod.Name
+			reporter.Info("Deleting pod for update", "name", podName)
 			// Because we should watch for deletes, we get a re-queued request, detect pod is missing, and re-create it.
 			if err := pc.client.Delete(ctx, pod, client.PropagationPolicy(metav1.DeletePropagationForeground)); client.IgnoreNotFound(err) != nil {
 				return true, kube.TransientError(fmt.Errorf("update pod %q: %w", podName, err))
 			}
-			updated++
-			if updated >= numUpdates {
+			syncInfo[podName].InSync = nil
+			syncInfo[podName].Error = ptr("update in progress")
+			invalidateCache = append(invalidateCache, podName)
+			updatedPods++
+			if updatedPods >= numUpdates {
 				// done for this round
 				break
 			}
 		}

-		if len(diffedUpdates) == updated {
+		if len(diffedUpdates) == updatedPods {
 			// All pods are updated.
 			return false, nil
 		}
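Editor's note (not part of the diff): `PodControl` now batches the names of every pod it deletes and, via the deferred closure above, invalidates their cached CometBFT status in one call, so the next reconcile cannot act on stale height or sync data. A minimal sketch of a `CacheInvalidator` test double — the `recordingInvalidator` name and shape are assumptions for illustration, not code from this change:

```go
package fullnode

import "sigs.k8s.io/controller-runtime/pkg/client"

// recordingInvalidator satisfies the CacheInvalidator interface introduced
// above and records which pods were invalidated per controller, letting a
// test assert that PodControl invalidates exactly the pods it deleted.
type recordingInvalidator struct {
	calls map[client.ObjectKey][]string
}

func (r *recordingInvalidator) Invalidate(controller client.ObjectKey, pods []string) {
	if r.calls == nil {
		r.calls = make(map[client.ObjectKey][]string)
	}
	r.calls[controller] = append(r.calls[controller], pods...)
}
```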
diff --git a/internal/fullnode/pod_control_test.go b/internal/fullnode/pod_control_test.go
index bafa123d..023857c2 100644
--- a/internal/fullnode/pod_control_test.go
+++ b/internal/fullnode/pod_control_test.go
@@ -15,11 +15,51 @@ import (
 	"sigs.k8s.io/controller-runtime/pkg/client"
 )

+type mockPodClient struct{ mockClient[*corev1.Pod] }
+
+func newMockPodClient(pods []*corev1.Pod) *mockPodClient {
+	return &mockPodClient{
+		mockClient: mockClient[*corev1.Pod]{
+			ObjectList: corev1.PodList{
+				Items: valueSlice(pods),
+			},
+		},
+	}
+}
+
+func (c *mockPodClient) setPods(pods []*corev1.Pod) {
+	c.ObjectList = corev1.PodList{
+		Items: valueSlice(pods),
+	}
+}
+
+func (c *mockPodClient) upgradePods(
+	t *testing.T,
+	crdName string,
+	ordinals ...int,
+) {
+	existing := ptrSlice(c.ObjectList.(corev1.PodList).Items)
+	for _, ordinal := range ordinals {
+		updatePod(t, crdName, ordinal, existing, newPodWithNewImage, true)
+	}
+	c.setPods(existing)
+}
+
+func (c *mockPodClient) deletePods(
+	t *testing.T,
+	crdName string,
+	ordinals ...int,
+) {
+	existing := ptrSlice(c.ObjectList.(corev1.PodList).Items)
+	for _, ordinal := range ordinals {
+		updatePod(t, crdName, ordinal, existing, deletedPod, false)
+	}
+	c.setPods(existing)
+}
+
 func TestPodControl_Reconcile(t *testing.T) {
 	t.Parallel()

-	type mockPodClient = mockClient[*corev1.Pod]
-
 	ctx := context.Background()

 	const namespace = "test"
 		pods, err := BuildPods(&crd, nil)
 		require.NoError(t, err)
-		existing := diff.New(nil, pods).Creates()[0]
+		existing := diff.New(nil, pods).Creates()

-		var mClient mockPodClient
-		mClient.ObjectList = corev1.PodList{
-			Items: []corev1.Pod{*existing},
-		}
+		require.Len(t, existing, 1)

-		syncInfo := &cosmosv1.SyncInfoStatus{
-			Pods: []cosmosv1.SyncInfoPodStatus{
-				{
-					Pod:    "hub-0",
-					InSync: ptr(true),
-				},
-			},
+		mClient := newMockPodClient(existing)
+
+		syncInfo := map[string]*cosmosv1.SyncInfoPodStatus{
+			"hub-0": {InSync: ptr(true)},
 		}

-		control := NewPodControl(&mClient)
+		control := NewPodControl(mClient, nil)
 		requeue, err := control.Reconcile(ctx, nopReporter, &crd, nil, syncInfo)
 		require.NoError(t, err)
 		require.False(t, requeue)
@@ -68,15 +102,12 @@
 		crd.Namespace = namespace
 		crd.Spec.Replicas = 3

-		var mClient mockPodClient
-		mClient.ObjectList = corev1.PodList{
-			Items: []corev1.Pod{
-				{ObjectMeta: metav1.ObjectMeta{Name: "hub-98"}},
-				{ObjectMeta: metav1.ObjectMeta{Name: "hub-99"}},
-			},
-		}
+		mClient := newMockPodClient([]*corev1.Pod{
+			{ObjectMeta: metav1.ObjectMeta{Name: "hub-98"}},
+			{ObjectMeta: metav1.ObjectMeta{Name: "hub-99"}},
+		})

-		control := NewPodControl(&mClient)
+		control := NewPodControl(mClient, nil)
 		requeue, err := control.Reconcile(ctx, nopReporter, &crd, nil, nil)
 		require.NoError(t, err)
 		require.True(t, requeue)
@@ -101,40 +132,18 @@
 		pods, err := BuildPods(&crd, nil)
 		require.NoError(t, err)

-		existing := diff.New(nil, pods).Creates()
-		mClient := mockPodClient{
-			ObjectList: corev1.PodList{
-				Items: valueSlice(existing),
-			},
-		}
+		mClient := newMockPodClient(diff.New(nil, pods).Creates())

-		syncInfo := &cosmosv1.SyncInfoStatus{
-			Pods: []cosmosv1.SyncInfoPodStatus{
-				{
-					Pod:    "hub-0",
-					InSync: ptr(true),
-				},
-				{
-					Pod:    "hub-1",
-					InSync: ptr(true),
-				},
-				{
-					Pod:    "hub-2",
-					InSync: ptr(true),
-				},
-				{
-					Pod:    "hub-3",
-					InSync: ptr(true),
-				},
-				{
-					Pod:    "hub-4",
-					InSync: ptr(true),
-				},
-			},
+		syncInfo := map[string]*cosmosv1.SyncInfoPodStatus{
+			"hub-0": {InSync: ptr(true)},
+			"hub-1": {InSync: ptr(true)},
+			"hub-2": {InSync: ptr(true)},
+			"hub-3": {InSync: ptr(true)},
+			"hub-4": {InSync: ptr(true)},
 		}

-		control := NewPodControl(&mClient)
+		control := NewPodControl(mClient, nil)

 		control.computeRollout = func(maxUnavail *intstr.IntOrString, desired, ready int) int {
 			require.EqualValues(t, crd.Spec.Replicas, desired)
@@ -150,13 +159,7 @@

 		require.Zero(t, mClient.CreateCount)

-		now := metav1.Now()
-		existing[0].DeletionTimestamp = ptr(now)
-		existing[1].DeletionTimestamp = ptr(now)
-
-		mClient.ObjectList = corev1.PodList{
-			Items: valueSlice(existing),
-		}
+		mClient.deletePods(t, crd.Name, 0, 1)

 		control.computeRollout = func(maxUnavail *intstr.IntOrString, desired, ready int) int {
 			require.EqualValues(t, crd.Spec.Replicas, desired)
@@ -174,22 +177,13 @@
 		require.Equal(t, 2, mClient.DeleteCount)

 		// once pod deletion is complete, new pods are created with new image.
-		existing[0].Spec.Containers[0].Image = "new-image"
-		existing[1].Spec.Containers[0].Image = "new-image"
-		existing[0].DeletionTimestamp = nil
-		existing[1].DeletionTimestamp = nil
-
-		recalculatePodRevision(existing[0], 0)
-		recalculatePodRevision(existing[1], 1)
-		mClient.ObjectList = corev1.PodList{
-			Items: valueSlice(existing),
-		}
+		mClient.upgradePods(t, crd.Name, 0, 1)

-		syncInfo.Pods[0].InSync = nil
-		syncInfo.Pods[0].Error = ptr("upgrade in progress")
+		syncInfo["hub-0"].InSync = nil
+		syncInfo["hub-0"].Error = ptr("upgrade in progress")

-		syncInfo.Pods[1].InSync = nil
-		syncInfo.Pods[1].Error = ptr("upgrade in progress")
+		syncInfo["hub-1"].InSync = nil
+		syncInfo["hub-1"].Error = ptr("upgrade in progress")

 		control.computeRollout = func(maxUnavail *intstr.IntOrString, desired, ready int) int {
 			require.EqualValues(t, crd.Spec.Replicas, desired)
@@ -232,44 +226,33 @@
 		require.NoError(t, err)
 		existing := diff.New(nil, pods).Creates()

-		mClient := mockPodClient{
-			ObjectList: corev1.PodList{
-				Items: valueSlice(existing),
-			},
-		}
+		mClient := newMockPodClient(existing)

 		// pods are at upgrade height and reachable
-		syncInfo := &cosmosv1.SyncInfoStatus{
-			Pods: []cosmosv1.SyncInfoPodStatus{
-				{
-					Pod:    "hub-0",
-					Height: ptr(uint64(100)),
-					InSync: ptr(true),
-				},
-				{
-					Pod:    "hub-1",
-					Height: ptr(uint64(100)),
-					InSync: ptr(true),
-				},
-				{
-					Pod:    "hub-2",
-					Height: ptr(uint64(100)),
-					InSync: ptr(true),
-				},
-				{
-					Pod:    "hub-3",
-					Height: ptr(uint64(100)),
-					InSync: ptr(true),
-				},
-				{
-					Pod:    "hub-4",
-					Height: ptr(uint64(100)),
-					InSync: ptr(true),
-				},
+		syncInfo := map[string]*cosmosv1.SyncInfoPodStatus{
+			"hub-0": {
+				Height: ptr(uint64(100)),
+				InSync: ptr(true),
+			},
+			"hub-1": {
+				Height: ptr(uint64(100)),
+				InSync: ptr(true),
+			},
+			"hub-2": {
+				Height: ptr(uint64(100)),
+				InSync: ptr(true),
+			},
+			"hub-3": {
+				Height: ptr(uint64(100)),
+				InSync: ptr(true),
+			},
+			"hub-4": {
+				Height: ptr(uint64(100)),
+				InSync: ptr(true),
 			},
 		}

-		control := NewPodControl(&mClient)
+		control := NewPodControl(mClient, nil)

 		control.computeRollout = func(maxUnavail *intstr.IntOrString, desired, ready int) int {
 			require.EqualValues(t, crd.Spec.Replicas, desired)
@@ -293,13 +276,7 @@
 		require.Zero(t, mClient.CreateCount)
 		require.Equal(t, 2, mClient.DeleteCount)

-		now := metav1.Now()
-		existing[0].DeletionTimestamp = ptr(now)
-		existing[1].DeletionTimestamp = ptr(now)
-
-		mClient.ObjectList = corev1.PodList{
-			Items: valueSlice(existing),
-		}
+		mClient.deletePods(t, crd.Name, 0, 1)

 		control.computeRollout = func(maxUnavail *intstr.IntOrString, desired, ready int) int {
 			require.EqualValues(t, crd.Spec.Replicas, desired)
@@ -316,23 +293,14 @@
 		// should not delete any more.
 		require.Equal(t, 2, mClient.DeleteCount)

-		existing[0].Spec.Containers[0].Image = "new-image"
-		existing[1].Spec.Containers[0].Image = "new-image"
-		existing[0].DeletionTimestamp = nil
-		existing[1].DeletionTimestamp = nil
-
-		recalculatePodRevision(existing[0], 0)
-		recalculatePodRevision(existing[1], 1)
-		mClient.ObjectList = corev1.PodList{
-			Items: valueSlice(existing),
-		}
+		mClient.upgradePods(t, crd.Name, 0, 1)

 		// 0 and 1 are now unavailable, working on upgrade
-		syncInfo.Pods[0].InSync = nil
-		syncInfo.Pods[0].Error = ptr("upgrade in progress")
+		syncInfo["hub-0"].InSync = nil
+		syncInfo["hub-0"].Error = ptr("upgrade in progress")

-		syncInfo.Pods[1].InSync = nil
-		syncInfo.Pods[1].Error = ptr("upgrade in progress")
+		syncInfo["hub-1"].InSync = nil
+		syncInfo["hub-1"].Error = ptr("upgrade in progress")

 		// Reconcile 2, should not update anything because 0 and 1 are still in progress.
@@ -354,9 +322,9 @@

 		require.Equal(t, 2, mClient.DeleteCount)

 		// mock out that one of the pods completed the upgrade. should begin upgrading one more
-		syncInfo.Pods[0].InSync = ptr(true)
-		syncInfo.Pods[0].Height = ptr(uint64(101))
-		syncInfo.Pods[0].Error = nil
+		syncInfo["hub-0"].InSync = ptr(true)
+		syncInfo["hub-0"].Height = ptr(uint64(101))
+		syncInfo["hub-0"].Error = nil

 		// Reconcile 3, should update pod 2 (only one) because 1 is still in progress, but 0 is done.
@@ -377,12 +345,7 @@
 		// should delete one more
 		require.Equal(t, 3, mClient.DeleteCount)

-		now = metav1.Now()
-		existing[2].DeletionTimestamp = ptr(now)
-
-		mClient.ObjectList = corev1.PodList{
-			Items: valueSlice(existing),
-		}
+		mClient.deletePods(t, crd.Name, 2)

 		control.computeRollout = func(maxUnavail *intstr.IntOrString, desired, ready int) int {
 			require.EqualValues(t, crd.Spec.Replicas, desired)
@@ -399,21 +362,16 @@
 		// should not delete any more.
 		require.Equal(t, 3, mClient.DeleteCount)

-		existing[2].Spec.Containers[0].Image = "new-image"
-		existing[2].DeletionTimestamp = nil
-		recalculatePodRevision(existing[2], 2)
-		mClient.ObjectList = corev1.PodList{
-			Items: valueSlice(existing),
-		}
+		mClient.upgradePods(t, crd.Name, 2)

 		// mock out that both pods completed the upgrade. should begin upgrading the last 2
-		syncInfo.Pods[1].InSync = ptr(true)
-		syncInfo.Pods[1].Height = ptr(uint64(101))
-		syncInfo.Pods[1].Error = nil
+		syncInfo["hub-1"].InSync = ptr(true)
+		syncInfo["hub-1"].Height = ptr(uint64(101))
+		syncInfo["hub-1"].Error = nil

-		syncInfo.Pods[2].InSync = ptr(true)
-		syncInfo.Pods[2].Height = ptr(uint64(101))
-		syncInfo.Pods[2].Error = nil
+		syncInfo["hub-2"].InSync = ptr(true)
+		syncInfo["hub-2"].Height = ptr(uint64(101))
+		syncInfo["hub-2"].Error = nil

 		// Reconcile 4, should update 3 and 4 because the rest are done.
@@ -461,44 +419,33 @@
 		require.NoError(t, err)
 		existing := diff.New(nil, pods).Creates()

-		mClient := mockPodClient{
-			ObjectList: corev1.PodList{
-				Items: valueSlice(existing),
-			},
-		}
+		mClient := newMockPodClient(existing)

 		// pods are at upgrade height and reachable
-		syncInfo := &cosmosv1.SyncInfoStatus{
-			Pods: []cosmosv1.SyncInfoPodStatus{
-				{
-					Pod:    "hub-0",
-					Height: ptr(uint64(100)),
-					Error:  ptr("panic at upgrade height"),
-				},
-				{
-					Pod:    "hub-1",
-					Height: ptr(uint64(100)),
-					Error:  ptr("panic at upgrade height"),
-				},
-				{
-					Pod:    "hub-2",
-					Height: ptr(uint64(100)),
-					Error:  ptr("panic at upgrade height"),
-				},
-				{
-					Pod:    "hub-3",
-					Height: ptr(uint64(100)),
-					Error:  ptr("panic at upgrade height"),
-				},
-				{
-					Pod:    "hub-4",
-					Height: ptr(uint64(100)),
-					Error:  ptr("panic at upgrade height"),
-				},
+		syncInfo := map[string]*cosmosv1.SyncInfoPodStatus{
+			"hub-0": {
+				Height: ptr(uint64(100)),
+				Error:  ptr("panic at upgrade height"),
+			},
+			"hub-1": {
+				Height: ptr(uint64(100)),
+				Error:  ptr("panic at upgrade height"),
+			},
+			"hub-2": {
+				Height: ptr(uint64(100)),
+				Error:  ptr("panic at upgrade height"),
+			},
+			"hub-3": {
+				Height: ptr(uint64(100)),
+				Error:  ptr("panic at upgrade height"),
+			},
+			"hub-4": {
+				Height: ptr(uint64(100)),
+				Error:  ptr("panic at upgrade height"),
 			},
 		}

-		control := NewPodControl(&mClient)
+		control := NewPodControl(mClient, nil)

 		control.computeRollout = func(maxUnavail *intstr.IntOrString, desired, ready int) int {
 			require.EqualValues(t, crd.Spec.Replicas, desired)
@@ -530,3 +477,27 @@
 	pod.Labels["app.kubernetes.io/revision"] = rev1
 	pod.Annotations["app.kubernetes.io/ordinal"] = fmt.Sprintf("%d", ordinal)
 }
+
+func newPodWithNewImage(pod *corev1.Pod) {
+	pod.DeletionTimestamp = nil
+	pod.Spec.Containers[0].Image = "new-image"
+}
+
+func deletedPod(pod *corev1.Pod) {
+	pod.DeletionTimestamp = ptr(metav1.Now())
+}
+
+func updatePod(t *testing.T, crdName string, ordinal int, pods []*corev1.Pod, updateFn func(pod *corev1.Pod), recalc bool) {
+	podName := fmt.Sprintf("%s-%d", crdName, ordinal)
+	for _, pod := range pods {
+		if pod.Name == podName {
+			updateFn(pod)
+			if recalc {
+				recalculatePodRevision(pod, ordinal)
+			}
+			return
+		}
+	}
+
+	require.FailNow(t, "pod not found", podName)
+}
diff --git a/internal/fullnode/status.go b/internal/fullnode/status.go
index 02842c05..4ca46360 100644
--- a/internal/fullnode/status.go
+++ b/internal/fullnode/status.go
@@ -3,7 +3,6 @@ package fullnode
 import (
 	"context"

-	"github.com/samber/lo"
 	cosmosv1 "github.com/strangelove-ventures/cosmos-operator/api/v1"
 	"github.com/strangelove-ventures/cosmos-operator/internal/cosmos"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -27,24 +26,25 @@ func SyncInfoStatus(
 	ctx context.Context,
 	crd *cosmosv1.CosmosFullNode,
 	collector StatusCollector,
-) cosmosv1.SyncInfoStatus {
-	var status cosmosv1.SyncInfoStatus
+) map[string]*cosmosv1.SyncInfoPodStatus {
+	status := make(map[string]*cosmosv1.SyncInfoPodStatus, crd.Spec.Replicas)

 	coll := collector.Collect(ctx, client.ObjectKeyFromObject(crd))
-	status.Pods = lo.Map(coll, func(item cosmos.StatusItem, _ int) cosmosv1.SyncInfoPodStatus {
+	for _, item := range coll {
 		var stat cosmosv1.SyncInfoPodStatus
-		stat.Pod = item.GetPod().Name
+		podName := item.GetPod().Name
 		stat.Timestamp = metav1.NewTime(item.Timestamp())
 		comet, err := item.GetStatus()
 		if err != nil {
 			stat.Error = ptr(err.Error())
-			return stat
+			status[podName] = &stat
+			continue
 		}
 		stat.Height = ptr(comet.LatestBlockHeight())
 		stat.InSync = ptr(!comet.Result.SyncInfo.CatchingUp)
-		return stat
-	})
+		status[podName] = &stat
+	}

 	return status
 }
diff --git a/internal/fullnode/status_test.go b/internal/fullnode/status_test.go
index b3763169..03a52ee0 100644
--- a/internal/fullnode/status_test.go
+++ b/internal/fullnode/status_test.go
@@ -72,24 +72,20 @@
 	}

 	wantTS := metav1.NewTime(ts)
-	want := cosmosv1.SyncInfoStatus{
-		Pods: []cosmosv1.SyncInfoPodStatus{
-			{
-				Pod:       "pod-0",
-				Timestamp: wantTS,
-				Height:    ptr(uint64(9999)),
-				InSync:    ptr(false),
-			},
-			{Pod: "pod-1",
-				Timestamp: wantTS,
-				Height:    ptr(uint64(10000)),
-				InSync:    ptr(true),
-			},
-			{
-				Pod:       "pod-2",
-				Timestamp: wantTS,
-				Error:     ptr("some error"),
-			},
+	want := map[string]*cosmosv1.SyncInfoPodStatus{
+		"pod-0": {
+			Timestamp: wantTS,
+			Height:    ptr(uint64(9999)),
+			InSync:    ptr(false),
+		},
+		"pod-1": {
+			Timestamp: wantTS,
+			Height:    ptr(uint64(10000)),
+			InSync:    ptr(true),
+		},
+		"pod-2": {
+			Timestamp: wantTS,
+			Error:     ptr("some error"),
 		},
 	}
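Editor's note (not part of the diff): `status.syncInfo.pods` (an array carrying each pod's name) becomes `status.sync`, a map keyed by pod name, which is what lets callers index status by pod instead of scanning a slice. A hedged consumer-side sketch — the `reportSync` helper and its package are illustrative only, assuming a `CosmosFullNode` already fetched with a controller-runtime client:

```go
package inspect

import (
	"fmt"

	cosmosv1 "github.com/strangelove-ventures/cosmos-operator/api/v1"
)

// reportSync prints a one-line summary per pod from the reshaped status map.
func reportSync(crd *cosmosv1.CosmosFullNode) {
	for podName, ps := range crd.Status.SyncInfo {
		switch {
		case ps == nil:
			continue
		case ps.Error != nil:
			fmt.Printf("%s: status unavailable: %s\n", podName, *ps.Error)
		case ps.InSync != nil && *ps.InSync && ps.Height != nil:
			fmt.Printf("%s: in sync at height %d\n", podName, *ps.Height)
		default:
			fmt.Printf("%s: catching up\n", podName)
		}
	}
}
```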