cleanup tests and add cache invalidation
agouin committed Oct 25, 2023
1 parent 9f5ee71 commit f81358f
Showing 9 changed files with 295 additions and 326 deletions.
9 changes: 1 addition & 8 deletions api/v1/cosmosfullnode_types.go
@@ -138,21 +138,14 @@ type FullNodeStatus struct {
 
 	// Current sync information. Collected every 60s.
 	// +optional
-	SyncInfo *SyncInfoStatus `json:"syncInfo,omitempty"`
+	SyncInfo map[string]*SyncInfoPodStatus `json:"sync,omitempty"`
 
 	// Latest Height information. collected when node starts up and when RPC is successfully queried.
 	// +optional
 	Height map[string]uint64 `json:"height,omitempty"`
 }
 
-type SyncInfoStatus struct {
-	// The latest consensus state of pods.
-	Pods []SyncInfoPodStatus `json:"pods"`
-}
-
 type SyncInfoPodStatus struct {
-	// Pod's name.
-	Pod string `json:"pod"`
 	// When consensus information was fetched.
 	Timestamp metav1.Time `json:"timestamp"`
 	// Latest height if no error encountered.
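The change above replaces the SyncInfoStatus wrapper and its pods array with a map keyed by pod name, so consumers index a pod's status directly instead of scanning a slice for a matching Pod field. A minimal sketch of reading the new field, assuming crd is a *cosmosv1.CosmosFullNode fetched elsewhere; only the field names come from the diff, everything else is illustrative:

	// Sketch only: iterate the map-keyed sync status added in this commit.
	for podName, ps := range crd.Status.SyncInfo {
		if ps.Error != nil {
			fmt.Printf("%s: status unavailable: %s\n", podName, *ps.Error)
			continue
		}
		if ps.Height != nil {
			fmt.Printf("%s: height %d, inSync=%v\n", podName, *ps.Height, ps.InSync != nil && *ps.InSync)
		}
	}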
36 changes: 12 additions & 24 deletions api/v1/zz_generated.deepcopy.go

Some generated files are not rendered by default.

53 changes: 21 additions & 32 deletions config/crd/bases/cosmos.strange.love_cosmosfullnodes.yaml
@@ -6009,39 +6009,28 @@ spec:
             status:
               description: A generic message for the user. May contain errors.
               type: string
-            syncInfo:
+            sync:
+              additionalProperties:
+                properties:
+                  error:
+                    description: Error message if unable to fetch consensus state.
+                    type: string
+                  height:
+                    description: Latest height if no error encountered.
+                    format: int64
+                    type: integer
+                  inSync:
+                    description: If the pod reports itself as in sync with chain
+                      tip.
+                    type: boolean
+                  timestamp:
+                    description: When consensus information was fetched.
+                    format: date-time
+                    type: string
+                required:
+                - timestamp
+                type: object
               description: Current sync information. Collected every 60s.
-              properties:
-                pods:
-                  description: The latest consensus state of pods.
-                  items:
-                    properties:
-                      error:
-                        description: Error message if unable to fetch consensus
-                          state.
-                        type: string
-                      height:
-                        description: Latest height if no error encountered.
-                        format: int64
-                        type: integer
-                      inSync:
-                        description: If the pod reports itself as in sync with chain
-                          tip.
-                        type: boolean
-                      pod:
-                        description: Pod's name.
-                        type: string
-                      timestamp:
-                        description: When consensus information was fetched.
-                        format: date-time
-                        type: string
-                    required:
-                    - pod
-                    - timestamp
-                    type: object
-                  type: array
-              required:
-              - pods
               type: object
           required:
           - observedGeneration
12 changes: 6 additions & 6 deletions controllers/cosmosfullnode_controller.go
@@ -71,7 +71,7 @@ func NewFullNode(
 		configMapControl: fullnode.NewConfigMapControl(client),
 		nodeKeyControl:   fullnode.NewNodeKeyControl(client),
 		peerCollector:    fullnode.NewPeerCollector(client),
-		podControl:       fullnode.NewPodControl(client),
+		podControl:       fullnode.NewPodControl(client, cacheController),
 		pvcControl:       fullnode.NewPVCControl(client),
 		recorder:         recorder,
 		serviceControl:   fullnode.NewServiceControl(client),
@@ -121,7 +121,7 @@ func (r *CosmosFullNodeReconciler) Reconcile(ctx context.Context, req ctrl.Reque
 
 	syncInfo := fullnode.SyncInfoStatus(ctx, crd, r.cacheController)
 
-	defer r.updateStatus(ctx, crd, &syncInfo)
+	defer r.updateStatus(ctx, crd, syncInfo)
 
 	errs := &kube.ReconcileErrors{}
 
@@ -172,7 +172,7 @@ func (r *CosmosFullNodeReconciler) Reconcile(ctx context.Context, req ctrl.Reque
 	}
 
 	// Reconcile pods.
-	podRequeue, err := r.podControl.Reconcile(ctx, reporter, crd, configCksums, &syncInfo)
+	podRequeue, err := r.podControl.Reconcile(ctx, reporter, crd, configCksums, syncInfo)
 	if err != nil {
 		errs.Append(err)
 	}
@@ -221,19 +221,19 @@ func (r *CosmosFullNodeReconciler) resultWithErr(crd *cosmosv1.CosmosFullNode, e
 	return stopResult, err
 }
 
-func (r *CosmosFullNodeReconciler) updateStatus(ctx context.Context, crd *cosmosv1.CosmosFullNode, syncInfo *cosmosv1.SyncInfoStatus) {
+func (r *CosmosFullNodeReconciler) updateStatus(ctx context.Context, crd *cosmosv1.CosmosFullNode, syncInfo map[string]*cosmosv1.SyncInfoPodStatus) {
 	if err := r.statusClient.SyncUpdate(ctx, client.ObjectKeyFromObject(crd), func(status *cosmosv1.FullNodeStatus) {
 		status.ObservedGeneration = crd.Status.ObservedGeneration
 		status.Phase = crd.Status.Phase
 		status.StatusMessage = crd.Status.StatusMessage
 		status.Peers = crd.Status.Peers
 		status.SyncInfo = syncInfo
-		for _, v := range syncInfo.Pods {
+		for k, v := range syncInfo {
 			if v.Height != nil && *v.Height > 0 {
 				if status.Height == nil {
 					status.Height = make(map[string]uint64)
 				}
-				status.Height[v.Pod] = *v.Height
+				status.Height[k] = *v.Height
 			}
 		}
 	}); err != nil {
16 changes: 16 additions & 0 deletions internal/cosmos/cache_controller.go
@@ -149,6 +149,22 @@ func (c *CacheController) Reconcile(ctx context.Context, req reconcile.Request)
 	return finishResult, nil
 }
 
+// Invalidate removes the given pods status from the cache.
+func (c *CacheController) Invalidate(controller client.ObjectKey, pods []string) {
+	v, _ := c.cache.Get(controller)
+	now := time.Now()
+	for _, s := range v {
+		for _, pod := range pods {
+			if s.Pod.Name == pod {
+				s.Status = CometStatus{}
+				s.Err = fmt.Errorf("invalidated")
+				s.TS = now
+			}
+		}
+	}
+	c.cache.Update(controller, v)
+}
+
 // Collect returns a StatusCollection for the given controller. Only returns cached CometStatus.
 func (c *CacheController) Collect(ctx context.Context, controller client.ObjectKey) StatusCollection {
 	pods, err := c.listPods(ctx, controller)
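Note the invalidation strategy: rather than evicting entries, Invalidate zeroes the cached CometStatus and stamps the entry with an error, so the pod stays visible in the collection while its stale consensus data is marked unusable until the next poll. A sketch of a call site, where the ObjectKey construction and variable names are illustrative assumptions, not code from this commit:

	// Sketch only: drop cached consensus state for pods that were just deleted,
	// so the next Collect does not serve stale data. Names are placeholders.
	key := client.ObjectKey{Namespace: crd.Namespace, Name: crd.Name}
	cacheController.Invalidate(key, []string{deletedPod.Name})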
112 changes: 64 additions & 48 deletions internal/fullnode/pod_control.go
@@ -23,17 +23,23 @@ type Client interface {
 	Scheme() *runtime.Scheme
 }
 
+type CacheInvalidator interface {
+	Invalidate(controller client.ObjectKey, pods []string)
+}
+
 // PodControl reconciles pods for a CosmosFullNode.
 type PodControl struct {
-	client         Client
-	computeRollout func(maxUnavail *intstr.IntOrString, desired, ready int) int
+	client           Client
+	cacheInvalidator CacheInvalidator
+	computeRollout   func(maxUnavail *intstr.IntOrString, desired, ready int) int
 }
 
 // NewPodControl returns a valid PodControl.
-func NewPodControl(client Client) PodControl {
+func NewPodControl(client Client, cacheInvalidator CacheInvalidator) PodControl {
 	return PodControl{
-		client:         client,
-		computeRollout: kube.ComputeRollout,
+		client:           client,
+		cacheInvalidator: cacheInvalidator,
+		computeRollout:   kube.ComputeRollout,
 	}
 }
@@ -44,7 +50,7 @@ func (pc PodControl) Reconcile(
 	reporter kube.Reporter,
 	crd *cosmosv1.CosmosFullNode,
 	cksums ConfigChecksums,
-	syncInfo *cosmosv1.SyncInfoStatus,
+	syncInfo map[string]*cosmosv1.SyncInfoPodStatus,
 ) (bool, kube.ReconcileError) {
 	var pods corev1.PodList
 	if err := pc.client.List(ctx, &pods,
@@ -70,11 +76,24 @@
 		}
 	}
 
+	var invalidateCache []string
+
+	defer func() {
+		if pc.cacheInvalidator == nil {
+			return
+		}
+		if len(invalidateCache) > 0 {
+			pc.cacheInvalidator.Invalidate(client.ObjectKeyFromObject(crd), invalidateCache)
+		}
+	}()
+
 	for _, pod := range diffed.Deletes() {
 		reporter.Info("Deleting pod", "name", pod.Name)
 		if err := pc.client.Delete(ctx, pod, client.PropagationPolicy(metav1.DeletePropagationForeground)); kube.IgnoreNotFound(err) != nil {
 			return true, kube.TransientError(fmt.Errorf("delete pod %q: %w", pod.Name, err))
 		}
+		delete(syncInfo, pod.Name)
+		invalidateCache = append(invalidateCache, pod.Name)
 	}
 
 	if len(diffed.Creates())+len(diffed.Deletes()) > 0 {
@@ -85,54 +104,49 @@
 	diffedUpdates := diffed.Updates()
 	if len(diffedUpdates) > 0 {
 		var (
-			upgradePods      = make(map[string]bool)
-			otherUpdates     = make(map[string]*corev1.Pod)
+			updatedPods      = 0
 			rpcReachablePods = 0
 			inSyncPods       = 0
+			otherUpdates     = []*corev1.Pod{}
 		)
 
-	PodsLoop:
-		for _, ps := range syncInfo.Pods {
-			for _, existing := range pods.Items {
-				if existing.Name == ps.Pod {
-					if existing.DeletionTimestamp != nil {
-						continue PodsLoop
-					}
-					break
-				}
-			}
-
-			if ps.InSync != nil && *ps.InSync {
-				inSyncPods++
-			}
-			rpcReachable := ps.Error == nil
-			if rpcReachable {
-				rpcReachablePods++
+		for _, existing := range pods.Items {
+			podName := existing.Name
+
+			if existing.DeletionTimestamp != nil {
+				// Pod is being deleted, so we skip it.
+				continue
+			}
+
+			var rpcReachable bool
+			if ps, ok := syncInfo[podName]; ok {
+				if ps.InSync != nil && *ps.InSync {
+					inSyncPods++
+				}
+				rpcReachable = ps.Error == nil
+				if rpcReachable {
+					rpcReachablePods++
+				}
 			}
 			for _, update := range diffedUpdates {
-				if ps.Pod == update.Name {
-					awaitingUpgrade := false
-					for _, existing := range pods.Items {
-						if existing.Name == ps.Pod {
-							if existing.Spec.Containers[0].Image != update.Spec.Containers[0].Image {
-								awaitingUpgrade = true
-							}
-							break
-						}
-					}
-					if awaitingUpgrade {
+				if podName == update.Name {
+					if existing.Spec.Containers[0].Image != update.Spec.Containers[0].Image {
+						// awaiting upgrade
 						if !rpcReachable {
-							upgradePods[ps.Pod] = true
-							reporter.Info("Deleting pod for version upgrade", "name", ps.Pod)
+							updatedPods++
+							reporter.Info("Deleting pod for version upgrade", "name", podName)
 							// Because we should watch for deletes, we get a re-queued request, detect pod is missing, and re-create it.
 							if err := pc.client.Delete(ctx, update, client.PropagationPolicy(metav1.DeletePropagationForeground)); client.IgnoreNotFound(err) != nil {
-								return true, kube.TransientError(fmt.Errorf("upgrade pod version %q: %w", ps.Pod, err))
+								return true, kube.TransientError(fmt.Errorf("upgrade pod version %q: %w", podName, err))
 							}
+							syncInfo[podName].InSync = nil
+							syncInfo[podName].Error = ptr("version upgrade in progress")
+							invalidateCache = append(invalidateCache, podName)
 						} else {
-							otherUpdates[ps.Pod] = update
+							otherUpdates = append(otherUpdates, update)
 						}
 					} else {
-						otherUpdates[ps.Pod] = update
+						otherUpdates = append(otherUpdates, update)
 					}
 					break
 				}
Expand All @@ -148,32 +162,34 @@ func (pc PodControl) Reconcile(

numUpdates := pc.computeRollout(crd.Spec.RolloutStrategy.MaxUnavailable, int(crd.Spec.Replicas), ready)

updated := len(upgradePods)

if updated == len(diffedUpdates) {
if updatedPods == len(diffedUpdates) {
// All pods are updated.
return false, nil
}

if updated >= numUpdates {
if updatedPods >= numUpdates {
// Signal requeue.
return true, nil
}

for podName, pod := range otherUpdates {
reporter.Info("Deleting pod for update", "podName", podName)
for _, pod := range otherUpdates {
podName := pod.Name
reporter.Info("Deleting pod for update", "name", podName)
// Because we should watch for deletes, we get a re-queued request, detect pod is missing, and re-create it.
if err := pc.client.Delete(ctx, pod, client.PropagationPolicy(metav1.DeletePropagationForeground)); client.IgnoreNotFound(err) != nil {
return true, kube.TransientError(fmt.Errorf("update pod %q: %w", podName, err))
}
updated++
if updated >= numUpdates {
syncInfo[podName].InSync = nil
syncInfo[podName].Error = ptr("update in progress")
invalidateCache = append(invalidateCache, podName)
updatedPods++
if updatedPods >= numUpdates {
// done for this round
break
}
}

if len(diffedUpdates) == updated {
if len(diffedUpdates) == updatedPods {
// All pods are updated.
return false, nil
}
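Because the deferred invalidation above guards against a nil cacheInvalidator, PodControl still works when no cache is wired in. The new CacheInvalidator interface also gives tests a seam; a sketch of a stub, which is illustrative and not taken from this commit's test changes:

	// Sketch only: a test double satisfying the CacheInvalidator interface.
	type stubInvalidator struct {
		invalidated []string
	}

	func (s *stubInvalidator) Invalidate(_ client.ObjectKey, pods []string) {
		s.invalidated = append(s.invalidated, pods...)
	}

	// A test could then construct the control with the stub and assert on
	// s.invalidated after Reconcile:
	//   pc := NewPodControl(fakeClient, &stubInvalidator{})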
