Skip to content
This repository has been archived by the owner on Jun 17, 2022. It is now read-only.

Commit

Permalink
add wait for object deletion
Browse files Browse the repository at this point in the history
  • Loading branch information
imsky committed Mar 16, 2018
1 parent da71cbd commit 3f0edd6
Showing 1 changed file with 36 additions and 8 deletions.
44 changes: 36 additions & 8 deletions plan.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,13 @@
package main

import (
"time"

"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/kubernetes"
batchv1 "k8s.io/client-go/pkg/apis/batch/v1"
batchv2alpha1 "k8s.io/client-go/pkg/apis/batch/v2alpha1"
Expand Down Expand Up @@ -65,6 +70,28 @@ func getObjectNamespaces(objects []runtime.Object) []string {
return namespaces
}

func waitForObjectDeletion(object runtime.Object, clientset *kubernetes.Clientset) error {
return wait.PollImmediate(time.Millisecond*100, time.Second*60, func() (bool, error) {
var err error
switch t := object.(type) {
case *batchv1.Job:
metadata, _ := meta.Accessor(t)
_, err = clientset.BatchV1().Jobs(metadata.GetNamespace()).Get(metadata.GetName(), metav1.GetOptions{})
case *batchv2alpha1.CronJob:
metadata, _ := meta.Accessor(t)
_, err = clientset.BatchV2alpha1().CronJobs(metadata.GetNamespace()).Get(metadata.GetName(), metav1.GetOptions{})
}

if err == nil {
return false, nil
} else if errors.IsNotFound(err) {
return true, nil
} else {
return false, err
}
})
}

func generatePlan(filenames []string, label *string, clientset *kubernetes.Clientset) []Step {
files := readFiles(filenames)
objects := make([]runtime.Object, 0, 1)
Expand Down Expand Up @@ -162,28 +189,29 @@ func executePlan(plan []Step, clientset *kubernetes.Clientset) {

if dstGVK.Kind == "Job" {
//todo: set propagation policy?
err := clientset.BatchV1().Jobs(dstMetadata.GetNamespace()).Delete(srcMetadata.GetName(), nil)
err := clientset.BatchV1().Jobs(dstMetadata.GetNamespace()).Delete(dstMetadata.GetName(), nil)

if err != nil {
panic(err)
}

//todo: wait until deleted
//todo: wait/retry if object is being deleted
waitForObjectDeletion(dst, clientset)

_, err = clientset.BatchV1().Jobs(srcMetadata.GetNamespace()).Create(src.(*batchv1.Job))

if err != nil {
panic(err)
}
} else if dstGVK.Kind == "CronJob" {
//todo: set propagation policy?
//todo: delete current CronJob child Jobs
err := clientset.BatchV2alpha1().CronJobs(dstMetadata.GetNamespace()).Delete(dstMetadata.GetName(), nil)
if err != nil {
panic(err)
}

//todo: wait until deleted
//todo: wait/retry if object is being deleted
waitForObjectDeletion(dst, clientset)

_, err = clientset.BatchV1().Jobs(srcMetadata.GetNamespace()).Create(src.(*batchv1.Job))

if err != nil {
Expand All @@ -195,14 +223,14 @@ func executePlan(plan []Step, clientset *kubernetes.Clientset) {

if dstGVK.Kind == "Job" {
//todo: set propagation policy?
err := clientset.BatchV1().Jobs(dstMetadata.GetNamespace()).Delete(srcMetadata.GetName(), nil)
err := clientset.BatchV1().Jobs(dstMetadata.GetNamespace()).Delete(dstMetadata.GetName(), nil)

if err != nil {
panic(err)
}

//todo: wait until deleted
//todo: wait/retry if object is being deleted
waitForObjectDeletion(dst, clientset)

_, err = clientset.BatchV2alpha1().CronJobs(srcMetadata.GetNamespace()).Create(src.(*batchv2alpha1.CronJob))

if err != nil {
Expand Down

0 comments on commit 3f0edd6

Please sign in to comment.