diff --git a/backend/src/apiserver/resource/resource_manager.go b/backend/src/apiserver/resource/resource_manager.go index 2f4460e2c46..b25ec48af39 100644 --- a/backend/src/apiserver/resource/resource_manager.go +++ b/backend/src/apiserver/resource/resource_manager.go @@ -18,6 +18,7 @@ import ( "context" "encoding/json" "fmt" + scheduledworkflow "github.com/kubeflow/pipelines/backend/src/crd/pkg/apis/scheduledworkflow/v1beta1" "io" "net" "reflect" @@ -582,30 +583,39 @@ func (r *ResourceManager) ReconcileSwfCrs(ctx context.Context) error { return util.Wrap(err, "Failed to reconcile ScheduledWorkflow Kubernetes resources") } +nextJob: for i := range jobs { - tmpl, _, err := r.fetchTemplateFromPipelineSpec(&jobs[i].PipelineSpec) - if err != nil { - return failedToReconcileSwfCrsError(err) - } - - newScheduledWorkflow, err := tmpl.ScheduledWorkflow(jobs[i]) - if err != nil { - return failedToReconcileSwfCrsError(err) - } + retryJob: + for { + tmpl, _, err := r.fetchTemplateFromPipelineSpec(&jobs[i].PipelineSpec) + if err != nil { + return failedToReconcileSwfCrsError(err) + } - currentScheduledWorkflow, err := r.getScheduledWorkflowClient(jobs[i].Namespace).Get(ctx, jobs[i].K8SName, v1.GetOptions{}) - if err != nil { - return failedToReconcileSwfCrsError(err) - } + newScheduledWorkflow, err := tmpl.ScheduledWorkflow(jobs[i]) + if err != nil { + return failedToReconcileSwfCrsError(err) + } - if !reflect.DeepEqual(currentScheduledWorkflow.Spec, newScheduledWorkflow.Spec) { - err = r.patchSwfCrSpec(ctx, jobs[i].Namespace, jobs[i].K8SName, newScheduledWorkflow.Spec) + currentScheduledWorkflow, err := r.getScheduledWorkflowClient(jobs[i].Namespace).Get(ctx, jobs[i].K8SName, v1.GetOptions{}) if err != nil { - if util.IsNotFound(errors.Cause(err)) { - continue - } return failedToReconcileSwfCrsError(err) } + + if !reflect.DeepEqual(currentScheduledWorkflow.Spec, newScheduledWorkflow.Spec) { + newScheduledWorkflow.Name = currentScheduledWorkflow.Name + newScheduledWorkflow.ResourceVersion = currentScheduledWorkflow.ResourceVersion + err = r.updateSwfCrSpec(ctx, jobs[i].Namespace, newScheduledWorkflow) + if err != nil { + if apierrors.IsConflict(errors.Unwrap(err)) { + continue retryJob + } else if util.IsNotFound(errors.Cause(err)) { + continue nextJob + } + return failedToReconcileSwfCrsError(err) + } + } + continue nextJob } } @@ -616,28 +626,11 @@ func failedToReconcileSwfCrsError(err error) error { return util.Wrap(err, "Failed to reconcile ScheduledWorkflow Kubernetes resources") } -func (r *ResourceManager) patchSwfCrSpec(ctx context.Context, k8sNamespace string, crdName string, newSpec interface{}) error { - patchPayload := map[string]interface{}{ - "spec": newSpec, - } - - patchBytes, err := json.Marshal(patchPayload) - if err != nil { - return util.NewInternalServerError(err, - "Failed to marshal patch spec") - } - - _, err = r.getScheduledWorkflowClient(k8sNamespace).Patch( - ctx, - crdName, - types.MergePatchType, - patchBytes, - ) +func (r *ResourceManager) updateSwfCrSpec(ctx context.Context, k8sNamespace string, scheduledWorkflow *scheduledworkflow.ScheduledWorkflow) error { + _, err := r.getScheduledWorkflowClient(k8sNamespace).Update(ctx, scheduledWorkflow) if err != nil { - return util.NewInternalServerError(err, - "Failed to patch ScheduledWorkflow") + return util.Wrap(err, "Failed to update ScheduledWorkflow") } - return nil }