diff --git a/core/services/feeds/service.go b/core/services/feeds/service.go index f986d1753af..6c527408883 100644 --- a/core/services/feeds/service.go +++ b/core/services/feeds/service.go @@ -510,6 +510,27 @@ func (s *service) DeleteJob(ctx context.Context, args *DeleteJobArgs) (int64, er logger.Errorw("Failed to push metrics for job proposal deletion", "err", err) } + // auto-cancellation for Workflow specs + if !proposal.ExternalJobID.Valid { + logger.Infow("ExternalJobID is null", "id", proposal.ID, "name", proposal.Name) + return proposal.ID, nil + } + job, err := s.jobORM.FindJobByExternalJobID(ctx, proposal.ExternalJobID.UUID) + if err != nil { + // NOTE: at this stage, we don't know if this job is of Workflow type + // so we don't want to return an error + logger.Infow("FindJobByExternalJobID failed", "id", proposal.ID, "externalJobID", proposal.ExternalJobID.UUID, "name", proposal.Name) + return proposal.ID, nil + } + if job.WorkflowSpecID != nil { // this is a Workflow job + specID := int64(*job.WorkflowSpecID) + if err := s.CancelSpec(ctx, proposal.ID); err != nil { + logger.Errorw("Failed to auto-cancel workflow spec", "id", specID, "err", err, "name", job.Name) + return 0, fmt.Errorf("failed to auto-cancel workflow spec %d: %w", specID, err) + } + logger.Infow("Successfully auto-cancelled a workflow spec", "id", specID) + } + return proposal.ID, nil } diff --git a/core/services/feeds/service_test.go b/core/services/feeds/service_test.go index 115695d8514..09a3ac4d705 100644 --- a/core/services/feeds/service_test.go +++ b/core/services/feeds/service_test.go @@ -1269,12 +1269,25 @@ func Test_Service_DeleteJob(t *testing.T) { } approved = feeds.JobProposal{ - ID: 1, + ID: 321, FeedsManagerID: 1, RemoteUUID: remoteUUID, + ExternalJobID: uuid.NullUUID{UUID: uuid.New(), Valid: true}, Status: feeds.JobProposalStatusApproved, } + wfSpecID = int32(4321) + workflowJob = job.Job{ + ID: 1, + WorkflowSpecID: &wfSpecID, + } + spec = &feeds.JobProposalSpec{ + ID: 20, + Status: feeds.SpecStatusApproved, + JobProposalID: approved.ID, + Version: 1, + } + httpTimeout = *commonconfig.MustNewDuration(1 * time.Second) ) @@ -1291,6 +1304,7 @@ func Test_Service_DeleteJob(t *testing.T) { svc.orm.On("GetJobProposalByRemoteUUID", mock.Anything, approved.RemoteUUID).Return(&approved, nil) svc.orm.On("DeleteProposal", mock.Anything, approved.ID).Return(nil) svc.orm.On("CountJobProposalsByStatus", mock.Anything).Return(&feeds.JobProposalCounts{}, nil) + svc.jobORM.On("FindJobByExternalJobID", mock.Anything, approved.ExternalJobID.UUID).Return(job.Job{}, sql.ErrNoRows) }, args: args, wantID: approved.ID, @@ -1334,6 +1348,37 @@ func Test_Service_DeleteJob(t *testing.T) { args: args, wantErr: "DeleteProposal failed", }, + { + name: "Delete workflow-spec with auto-cancellation", + before: func(svc *TestService) { + svc.orm.On("GetJobProposalByRemoteUUID", mock.Anything, approved.RemoteUUID).Return(&approved, nil) + svc.orm.On("DeleteProposal", mock.Anything, approved.ID).Return(nil) + svc.orm.On("CountJobProposalsByStatus", mock.Anything).Return(&feeds.JobProposalCounts{}, nil) + svc.jobORM.On("FindJobByExternalJobID", mock.Anything, approved.ExternalJobID.UUID).Return(workflowJob, nil) + + // mocks for CancelSpec() + svc.orm.On("GetSpec", mock.Anything, approved.ID).Return(spec, nil) + svc.orm.On("GetJobProposal", mock.Anything, approved.ID).Return(&approved, nil) + svc.connMgr.On("GetClient", mock.Anything).Return(svc.fmsClient, nil) + + svc.orm.On("CancelSpec", mock.Anything, approved.ID).Return(nil) + svc.jobORM.On("FindJobByExternalJobID", mock.Anything, approved.ExternalJobID.UUID).Return(workflowJob, nil) + svc.spawner.On("DeleteJob", mock.Anything, mock.Anything, workflowJob.ID).Return(nil) + + svc.fmsClient.On("CancelledJob", + mock.MatchedBy(func(ctx context.Context) bool { return true }), + &proto.CancelledJobRequest{ + Uuid: approved.RemoteUUID.String(), + Version: int64(spec.Version), + }, + ).Return(&proto.CancelledJobResponse{}, nil) + svc.orm.On("CountJobProposalsByStatus", mock.Anything).Return(&feeds.JobProposalCounts{}, nil) + svc.orm.On("WithDataSource", mock.Anything).Return(feeds.ORM(svc.orm)) + svc.jobORM.On("WithDataSource", mock.Anything).Return(job.ORM(svc.jobORM)) + }, + args: args, + wantID: approved.ID, + }, } for _, tc := range testCases {