diff --git a/plan.go b/plan.go index f26043c..6a2b0f5 100644 --- a/plan.go +++ b/plan.go @@ -1,8 +1,13 @@ package main import ( + "time" + + "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/kubernetes" batchv1 "k8s.io/client-go/pkg/apis/batch/v1" batchv2alpha1 "k8s.io/client-go/pkg/apis/batch/v2alpha1" @@ -65,6 +70,28 @@ func getObjectNamespaces(objects []runtime.Object) []string { return namespaces } +func waitForObjectDeletion(object runtime.Object, clientset *kubernetes.Clientset) error { + return wait.PollImmediate(time.Millisecond*100, time.Second*60, func() (bool, error) { + var err error + switch t := object.(type) { + case *batchv1.Job: + metadata, _ := meta.Accessor(t) + _, err = clientset.BatchV1().Jobs(metadata.GetNamespace()).Get(metadata.GetName(), metav1.GetOptions{}) + case *batchv2alpha1.CronJob: + metadata, _ := meta.Accessor(t) + _, err = clientset.BatchV2alpha1().CronJobs(metadata.GetNamespace()).Get(metadata.GetName(), metav1.GetOptions{}) + } + + if err == nil { + return false, nil + } else if errors.IsNotFound(err) { + return true, nil + } else { + return false, err + } + }) +} + func generatePlan(filenames []string, label *string, clientset *kubernetes.Clientset) []Step { files := readFiles(filenames) objects := make([]runtime.Object, 0, 1) @@ -162,14 +189,14 @@ func executePlan(plan []Step, clientset *kubernetes.Clientset) { if dstGVK.Kind == "Job" { //todo: set propagation policy? - err := clientset.BatchV1().Jobs(dstMetadata.GetNamespace()).Delete(srcMetadata.GetName(), nil) + err := clientset.BatchV1().Jobs(dstMetadata.GetNamespace()).Delete(dstMetadata.GetName(), nil) if err != nil { panic(err) } - //todo: wait until deleted - //todo: wait/retry if object is being deleted + waitForObjectDeletion(dst, clientset) + _, err = clientset.BatchV1().Jobs(srcMetadata.GetNamespace()).Create(src.(*batchv1.Job)) if err != nil { @@ -177,13 +204,14 @@ func executePlan(plan []Step, clientset *kubernetes.Clientset) { } } else if dstGVK.Kind == "CronJob" { //todo: set propagation policy? + //todo: delete current CronJob child Jobs err := clientset.BatchV2alpha1().CronJobs(dstMetadata.GetNamespace()).Delete(dstMetadata.GetName(), nil) if err != nil { panic(err) } - //todo: wait until deleted - //todo: wait/retry if object is being deleted + waitForObjectDeletion(dst, clientset) + _, err = clientset.BatchV1().Jobs(srcMetadata.GetNamespace()).Create(src.(*batchv1.Job)) if err != nil { @@ -195,14 +223,14 @@ func executePlan(plan []Step, clientset *kubernetes.Clientset) { if dstGVK.Kind == "Job" { //todo: set propagation policy? - err := clientset.BatchV1().Jobs(dstMetadata.GetNamespace()).Delete(srcMetadata.GetName(), nil) + err := clientset.BatchV1().Jobs(dstMetadata.GetNamespace()).Delete(dstMetadata.GetName(), nil) if err != nil { panic(err) } - //todo: wait until deleted - //todo: wait/retry if object is being deleted + waitForObjectDeletion(dst, clientset) + _, err = clientset.BatchV2alpha1().CronJobs(srcMetadata.GetNamespace()).Create(src.(*batchv2alpha1.CronJob)) if err != nil {