diff --git a/database/sql/jobs.go b/database/sql/jobs.go index 0201428f..f3c8ae72 100644 --- a/database/sql/jobs.go +++ b/database/sql/jobs.go @@ -93,7 +93,14 @@ func (s *sqlDatabase) paramsJobToWorkflowJob(ctx context.Context, job params.Job return workflofJob, nil } -func (s *sqlDatabase) DeleteJob(_ context.Context, jobID int64) error { +func (s *sqlDatabase) DeleteJob(_ context.Context, jobID int64) (err error) { + defer func() { + if err == nil { + if notifyErr := s.sendNotify(common.JobEntityType, common.DeleteOperation, params.Job{ID: jobID}); notifyErr != nil { + slog.With(slog.Any("error", notifyErr)).Error("failed to send notify") + } + } + }() q := s.conn.Delete(&WorkflowJob{}, jobID) if q.Error != nil { if errors.Is(q.Error, gorm.ErrRecordNotFound) { @@ -134,10 +141,17 @@ func (s *sqlDatabase) LockJob(_ context.Context, jobID int64, entityID string) e return errors.Wrap(err, "saving job") } + asParams, err := sqlWorkflowJobToParamsJob(workflowJob) + if err == nil { + s.sendNotify(common.JobEntityType, common.UpdateOperation, asParams) + } else { + slog.With(slog.Any("error", err)).Error("failed to convert job to params") + } + return nil } -func (s *sqlDatabase) BreakLockJobIsQueued(_ context.Context, jobID int64) error { +func (s *sqlDatabase) BreakLockJobIsQueued(_ context.Context, jobID int64) (err error) { var workflowJob WorkflowJob q := s.conn.Clauses(clause.Locking{Strength: "UPDATE"}).Preload("Instance").Where("id = ? and status = ?", jobID, params.JobStatusQueued).First(&workflowJob) @@ -157,7 +171,12 @@ func (s *sqlDatabase) BreakLockJobIsQueued(_ context.Context, jobID int64) error if err := s.conn.Save(&workflowJob).Error; err != nil { return errors.Wrap(err, "saving job") } - + asParams, err := sqlWorkflowJobToParamsJob(workflowJob) + if err == nil { + s.sendNotify(common.JobEntityType, common.UpdateOperation, asParams) + } else { + slog.With(slog.Any("error", err)).Error("failed to convert job to params") + } return nil } @@ -186,6 +205,12 @@ func (s *sqlDatabase) UnlockJob(_ context.Context, jobID int64, entityID string) return errors.Wrap(err, "saving job") } + asParams, err := sqlWorkflowJobToParamsJob(workflowJob) + if err == nil { + s.sendNotify(common.JobEntityType, common.UpdateOperation, asParams) + } else { + slog.With(slog.Any("error", err)).Error("failed to convert job to params") + } return nil } @@ -198,9 +223,11 @@ func (s *sqlDatabase) CreateOrUpdateJob(ctx context.Context, job params.Job) (pa return params.Job{}, errors.Wrap(q.Error, "fetching job") } } - + var operation common.OperationType if workflowJob.ID != 0 { // Update workflowJob with values from job. + operation = common.UpdateOperation + workflowJob.Status = job.Status workflowJob.Action = job.Action workflowJob.Conclusion = job.Conclusion @@ -238,6 +265,8 @@ func (s *sqlDatabase) CreateOrUpdateJob(ctx context.Context, job params.Job) (pa return params.Job{}, errors.Wrap(err, "saving job") } } else { + operation = common.CreateOperation + workflowJob, err := s.paramsJobToWorkflowJob(ctx, job) if err != nil { return params.Job{}, errors.Wrap(err, "converting job") @@ -247,7 +276,13 @@ func (s *sqlDatabase) CreateOrUpdateJob(ctx context.Context, job params.Job) (pa } } - return sqlWorkflowJobToParamsJob(workflowJob) + asParams, err := sqlWorkflowJobToParamsJob(workflowJob) + if err != nil { + return params.Job{}, errors.Wrap(err, "converting job") + } + s.sendNotify(common.JobEntityType, operation, asParams) + + return asParams, nil } // ListJobsByStatus lists all jobs for a given status.