Skip to content

Commit

Permalink
Add notifications for jobs
Browse files Browse the repository at this point in the history
Signed-off-by: Gabriel Adrian Samfira <gsamfira@cloudbasesolutions.com>
  • Loading branch information
gabriel-samfira committed Jun 19, 2024
1 parent 5f07bc2 commit b7d138d
Showing 1 changed file with 40 additions and 5 deletions.
45 changes: 40 additions & 5 deletions database/sql/jobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,14 @@ func (s *sqlDatabase) paramsJobToWorkflowJob(ctx context.Context, job params.Job
return workflofJob, nil
}

func (s *sqlDatabase) DeleteJob(_ context.Context, jobID int64) error {
func (s *sqlDatabase) DeleteJob(_ context.Context, jobID int64) (err error) {
defer func() {
if err == nil {
if notifyErr := s.sendNotify(common.JobEntityType, common.DeleteOperation, params.Job{ID: jobID}); notifyErr != nil {
slog.With(slog.Any("error", notifyErr)).Error("failed to send notify")
}
}
}()
q := s.conn.Delete(&WorkflowJob{}, jobID)
if q.Error != nil {
if errors.Is(q.Error, gorm.ErrRecordNotFound) {
Expand Down Expand Up @@ -134,10 +141,17 @@ func (s *sqlDatabase) LockJob(_ context.Context, jobID int64, entityID string) e
return errors.Wrap(err, "saving job")
}

asParams, err := sqlWorkflowJobToParamsJob(workflowJob)
if err == nil {
s.sendNotify(common.JobEntityType, common.UpdateOperation, asParams)
} else {
slog.With(slog.Any("error", err)).Error("failed to convert job to params")
}

return nil
}

func (s *sqlDatabase) BreakLockJobIsQueued(_ context.Context, jobID int64) error {
func (s *sqlDatabase) BreakLockJobIsQueued(_ context.Context, jobID int64) (err error) {
var workflowJob WorkflowJob
q := s.conn.Clauses(clause.Locking{Strength: "UPDATE"}).Preload("Instance").Where("id = ? and status = ?", jobID, params.JobStatusQueued).First(&workflowJob)

Expand All @@ -157,7 +171,12 @@ func (s *sqlDatabase) BreakLockJobIsQueued(_ context.Context, jobID int64) error
if err := s.conn.Save(&workflowJob).Error; err != nil {
return errors.Wrap(err, "saving job")
}

asParams, err := sqlWorkflowJobToParamsJob(workflowJob)
if err == nil {
s.sendNotify(common.JobEntityType, common.UpdateOperation, asParams)
} else {
slog.With(slog.Any("error", err)).Error("failed to convert job to params")
}
return nil
}

Expand Down Expand Up @@ -186,6 +205,12 @@ func (s *sqlDatabase) UnlockJob(_ context.Context, jobID int64, entityID string)
return errors.Wrap(err, "saving job")
}

asParams, err := sqlWorkflowJobToParamsJob(workflowJob)
if err == nil {
s.sendNotify(common.JobEntityType, common.UpdateOperation, asParams)
} else {
slog.With(slog.Any("error", err)).Error("failed to convert job to params")
}
return nil
}

Expand All @@ -198,9 +223,11 @@ func (s *sqlDatabase) CreateOrUpdateJob(ctx context.Context, job params.Job) (pa
return params.Job{}, errors.Wrap(q.Error, "fetching job")
}
}

var operation common.OperationType
if workflowJob.ID != 0 {
// Update workflowJob with values from job.
operation = common.UpdateOperation

workflowJob.Status = job.Status
workflowJob.Action = job.Action
workflowJob.Conclusion = job.Conclusion
Expand Down Expand Up @@ -238,6 +265,8 @@ func (s *sqlDatabase) CreateOrUpdateJob(ctx context.Context, job params.Job) (pa
return params.Job{}, errors.Wrap(err, "saving job")
}
} else {
operation = common.CreateOperation

workflowJob, err := s.paramsJobToWorkflowJob(ctx, job)
if err != nil {
return params.Job{}, errors.Wrap(err, "converting job")
Expand All @@ -247,7 +276,13 @@ func (s *sqlDatabase) CreateOrUpdateJob(ctx context.Context, job params.Job) (pa
}
}

return sqlWorkflowJobToParamsJob(workflowJob)
asParams, err := sqlWorkflowJobToParamsJob(workflowJob)
if err != nil {
return params.Job{}, errors.Wrap(err, "converting job")
}
s.sendNotify(common.JobEntityType, operation, asParams)

return asParams, nil
}

// ListJobsByStatus lists all jobs for a given status.
Expand Down

0 comments on commit b7d138d

Please sign in to comment.