Skip to content

Commit

Permalink
Merging Cancel/Remove into DeleteJobs and making it clean up job fully (
Browse files Browse the repository at this point in the history
#286)

* Merging Cancel/Remove into DeleteJobs and making it clean up job fully

Cancel and Remove were doing the same thing with subtle differences

Now there is just one DeleteJobs, which deletes all information from the database about the job
 - If the job doesn't exist or nothing was deleted, it won't be in the returned map
 - If some information was deleted for the job, it will be returned with the value of nil
 - If an error occurred while deleting, it will be returned with a value of the err message

* Setting job object to expire after 7 days rather than delete immediately

As we currently don't have a good story for debugging previously run jobs (although all information is in events)

We will leave the job object around for a short while, so we can look at it if needed before it disappears forever

* Indicating the job was cancelled, on expiry being reset

* Returning error from ReportDone if any jobs failed to be deleted

* Not resetting expiry when deleting job, if expiry already set

* Removing JobIdentities and using Job instead

We are using JobIdentities as an optimization to load less data

However it is used in only 1 place and likely it doesn't bring a huge benefit for the complexity it brings
 - Added maintenance of multiple ways of loading

For now I am removing it to simplify the code and to avoid having to fix it to handle missing jobs
 - When jobs are deleted, they are deleted from the queue index, which breaks this way of loading data

* Removing JobQueue redis index as it is no longer used

* Changing GetJobsByIds to GetExistingJobsByIds

The function no longer errors when a job id is not present.

As we now delete jobs as they finish, the jobs can no longer be assumed to live forever:
 - So now we just return the jobs we find
 - Return error if we failed to load and it wasn't due to job missing

* Fixing accidental negation

* Fix ineffectual assignment

Co-authored-by: jankaspar <2270833+jankaspar@users.noreply.github.com>
  • Loading branch information
JamesMurkin and jankaspar committed Dec 20, 2019
1 parent 15a3a4d commit f574216
Show file tree
Hide file tree
Showing 5 changed files with 147 additions and 123 deletions.
188 changes: 91 additions & 97 deletions internal/armada/repository/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,21 +19,19 @@ const jobQueuePrefix = "Job:Queue:"
const jobSetPrefix = "Job:Set:"
const jobLeasedPrefix = "Job:Leased:"
const jobClusterMapKey = "Job:ClusterId"
const jobQueueMapKey = "Job:QueueName"

type JobRepository interface {
CreateJobs(request *api.JobSubmitRequest, principal authorization.Principal) []*api.Job
AddJobs(job []*api.Job) ([]*SubmitJobResult, error)
GetJobsByIds(ids []string) ([]*api.Job, error)
GetExistingJobsByIds(ids []string) ([]*api.Job, error)
PeekQueue(queue string, limit int64) ([]*api.Job, error)
FilterActiveQueues(queues []*api.Queue) ([]*api.Queue, error)
GetQueueSizes(queues []*api.Queue) (sizes []int64, e error)
TryLeaseJobs(clusterId string, queue string, jobs []*api.Job) ([]*api.Job, error)
RenewLease(clusterId string, jobIds []string) (renewed []string, e error)
ExpireLeases(queue string, deadline time.Time) (expired []*api.Job, e error)
Remove(jobIds []string) (cleanedJobs []string, e error)
ReturnLease(clusterId string, jobId string) (returnedJob *api.Job, err error)
Cancel(jobs []*api.Job) map[*api.Job]error
DeleteJobs(jobs []*api.Job) map[*api.Job]error
GetActiveJobIds(queue string, jobSetId string) ([]string, error)
}

Expand Down Expand Up @@ -81,7 +79,6 @@ type submitJobRedisResponse struct {
job *api.Job
queueJobResult *redis.IntCmd
saveJobResult *redis.StatusCmd
queueIndexResult *redis.BoolCmd
jobSetIndexResult *redis.IntCmd
}

Expand Down Expand Up @@ -110,7 +107,6 @@ func (repo *RedisJobRepository) AddJobs(jobs []*api.Job) ([]*SubmitJobResult, er
)

submitResult.saveJobResult = pipe.Set(jobObjectPrefix+job.Id, jobData, 0)
submitResult.queueIndexResult = pipe.HSet(jobQueueMapKey, job.Id, job.Queue)
submitResult.jobSetIndexResult = pipe.SAdd(jobSetPrefix+job.JobSetId, job.Id)
submitResults = append(submitResults, submitResult)
}
Expand All @@ -128,9 +124,6 @@ func (repo *RedisJobRepository) AddJobs(jobs []*api.Job) ([]*SubmitJobResult, er
response.Error = e
}

if _, e := submitResult.queueIndexResult.Result(); e != nil {
response.Error = e
}
if _, e := submitResult.jobSetIndexResult.Result(); e != nil {
response.Error = e
}
Expand All @@ -142,15 +135,15 @@ func (repo *RedisJobRepository) AddJobs(jobs []*api.Job) ([]*SubmitJobResult, er
}

func (repo *RedisJobRepository) RenewLease(clusterId string, jobIds []string) (renewedJobIds []string, e error) {
jobs, e := repo.getJobIdentities(jobIds)
jobs, e := repo.GetExistingJobsByIds(jobIds)
if e != nil {
return nil, e
}
return repo.leaseJobs(clusterId, jobs)
}

func (repo *RedisJobRepository) ReturnLease(clusterId string, jobId string) (returnedJob *api.Job, err error) {
jobs, e := repo.GetJobsByIds([]string{jobId})
jobs, e := repo.GetExistingJobsByIds([]string{jobId})
if e != nil {
return nil, e
}
Expand All @@ -169,108 +162,123 @@ func (repo *RedisJobRepository) ReturnLease(clusterId string, jobId string) (ret
return nil, nil
}

func (repo *RedisJobRepository) Cancel(jobs []*api.Job) map[*api.Job]error {
// deleteJobRedisResponse groups, for a single job, the pending results of the
// Redis commands that DeleteJobs queues on its pipeline, so they can be read
// back per job once the pipeline has been executed.
type deleteJobRedisResponse struct {
// job is the job these command results belong to.
job *api.Job
// expiryAlreadySet records whether the job object already had a TTL before
// deletion; when true no Expire command is queued and setJobExpiryResult is nil.
expiryAlreadySet bool
// removeFromLeasedResult is the ZRem of the job id from the queue's leased set.
removeFromLeasedResult *redis.IntCmd
// removeFromQueueResult is the ZRem of the job id from the queue's pending set.
removeFromQueueResult *redis.IntCmd
// setJobExpiryResult is the Expire placed on the job object; only set when
// expiryAlreadySet is false.
setJobExpiryResult *redis.BoolCmd
// deleteJobSetIndexResult is the SRem of the job id from its job set index.
deleteJobSetIndexResult *redis.IntCmd
}

func (repo *RedisJobRepository) DeleteJobs(jobs []*api.Job) map[*api.Job]error {
expiryStatus := repo.getExpiryStatus(jobs)
pipe := repo.db.Pipeline()
queueCmds := []*redis.IntCmd{}
deletionResults := make([]*deleteJobRedisResponse, 0, len(jobs))
for _, job := range jobs {
queueCmds = append(queueCmds, pipe.ZRem(jobQueuePrefix+job.Queue, job.Id))
}
leaseCmds := []*redis.IntCmd{}
for _, job := range jobs {
leaseCmds = append(leaseCmds, pipe.ZRem(jobLeasedPrefix+job.Queue, job.Id))
}
deletionResult := &deleteJobRedisResponse{job: job, expiryAlreadySet: expiryStatus[job]}
deletionResult.removeFromQueueResult = pipe.ZRem(jobQueuePrefix+job.Queue, job.Id)
deletionResult.removeFromLeasedResult = pipe.ZRem(jobLeasedPrefix+job.Queue, job.Id)
deletionResult.deleteJobSetIndexResult = pipe.SRem(jobSetPrefix+job.JobSetId, job.Id)

if !deletionResult.expiryAlreadySet {
deletionResult.setJobExpiryResult = pipe.Expire(jobObjectPrefix+job.Id, time.Hour*24*7)
}
deletionResults = append(deletionResults, deletionResult)
}
_, _ = pipe.Exec() // ignoring error here as it will be part of individual commands

cancelledJobs := map[*api.Job]error{}
for i, job := range jobs {
result, e := queueCmds[i].Result()
if e != nil {
cancelledJobs[job] = e
}
if result > 0 {
cancelledJobs[job] = nil
}
}
for i, job := range jobs {
result, e := leaseCmds[i].Result()
if e != nil {
cancelledJobs[job] = e
for _, deletionResult := range deletionResults {
numberOfUpdates, err := processDeletionResponse(deletionResult)

if numberOfUpdates > 0 {
cancelledJobs[deletionResult.job] = nil
}
if result > 0 {
cancelledJobs[job] = nil

if err != nil {
cancelledJobs[deletionResult.job] = err
}
}

// TODO clean up job completely??

return cancelledJobs
}

func (repo *RedisJobRepository) Remove(jobIds []string) (cleanedJobIds []string, e error) {
// Returns details on if the expiry for each job is already set or not
func (repo *RedisJobRepository) getExpiryStatus(jobs []*api.Job) map[*api.Job]bool {
pipe := repo.db.Pipeline()

jobs, e := repo.getJobIdentities(jobIds)
if e != nil {
return nil, e
var cmds []*redis.DurationCmd
for _, job := range jobs {
cmds = append(cmds, pipe.TTL(jobObjectPrefix+job.Id))
}
_, _ = pipe.Exec() // ignoring error here as it will be part of individual commands

cleanedJobs, e := repo.zRemoveJobIds(jobs, func(j *jobIdentity) string { return jobLeasedPrefix + j.queueName })
if e != nil {
return nil, e
}
expiryStatus := make(map[*api.Job]bool, len(jobs))
for index, response := range cmds {
expiry, err := response.Result()
job := jobs[index]

cleanedQueueJobs, e := repo.zRemoveJobIds(jobs, func(j *jobIdentity) string { return jobQueuePrefix + j.queueName })
if e != nil {
return nil, e
expiryStatus[job] = false
if err == nil && expiry > 0 {
expiryStatus[job] = true
}
}

// TODO removing only leases for now, cleanup everything else
return append(cleanedJobs, cleanedQueueJobs...), nil
return expiryStatus
}

func (repo *RedisJobRepository) zRemoveJobIds(jobIdentities []jobIdentity, getRedisKey func(*jobIdentity) string) (ids []string, err error) {
func processDeletionResponse(deletionResponse *deleteJobRedisResponse) (int64, error) {
var totalUpdates int64 = 0
var errorMessage error = nil

pipe := repo.db.Pipeline()
cmds := make(map[string]*redis.IntCmd)
for _, job := range jobIdentities {
cmds[job.id] = pipe.ZRem(getRedisKey(&job), job.id)
modified, e := deletionResponse.removeFromLeasedResult.Result()
totalUpdates += modified
if e != nil {
errorMessage = e
}

_, e := pipe.Exec()
modified, e = deletionResponse.removeFromQueueResult.Result()
totalUpdates += modified
if e != nil {
return nil, e
errorMessage = e
}

cleanedIds := []string{}
modified, e = deletionResponse.deleteJobSetIndexResult.Result()
totalUpdates += modified
if e != nil {
errorMessage = e
}

for jobId, cmd := range cmds {
modified, e := cmd.Result()
if e == nil && modified > 0 {
cleanedIds = append(cleanedIds, jobId)
if !deletionResponse.expiryAlreadySet {
expirySet, e := deletionResponse.setJobExpiryResult.Result()
if expirySet {
totalUpdates++
}
if e != nil {
errorMessage = e
}
}
return cleanedIds, nil

return totalUpdates, errorMessage
}

func (repo *RedisJobRepository) PeekQueue(queue string, limit int64) ([]*api.Job, error) {
ids, e := repo.db.ZRange(jobQueuePrefix+queue, 0, limit-1).Result()
if e != nil {
return nil, e
}
return repo.GetJobsByIds(ids)
return repo.GetExistingJobsByIds(ids)
}

// returns list of jobs which are successfully leased
func (repo *RedisJobRepository) TryLeaseJobs(clusterId string, queue string, jobs []*api.Job) ([]*api.Job, error) {
jobIds := []jobIdentity{}
jobById := map[string]*api.Job{}
for _, job := range jobs {
jobIds = append(jobIds, jobIdentity{job.Id, queue})
jobById[job.Id] = job
}

leasedIds, e := repo.leaseJobs(clusterId, jobIds)
leasedIds, e := repo.leaseJobs(clusterId, jobs)
if e != nil {
return nil, e
}
Expand All @@ -282,19 +290,27 @@ func (repo *RedisJobRepository) TryLeaseJobs(clusterId string, queue string, job
return leasedJobs, nil
}

func (repo *RedisJobRepository) GetJobsByIds(ids []string) ([]*api.Job, error) {
// Returns existing jobs by Id
// If an Id is supplied that no longer exists, that job will simply be omitted from the result.
// No error will be thrown for missing jobs
func (repo *RedisJobRepository) GetExistingJobsByIds(ids []string) ([]*api.Job, error) {
pipe := repo.db.Pipeline()
var cmds []*redis.StringCmd
for _, id := range ids {
cmds = append(cmds, pipe.Get(jobObjectPrefix+id))
}
_, e := pipe.Exec()
if e != nil {
return nil, e
}
_, _ = pipe.Exec() // ignoring error here as it will be part of individual commands

var jobs []*api.Job
for _, cmd := range cmds {
for index, cmd := range cmds {
_, e := cmd.Result()
if e != nil {
if e == redis.Nil {
log.Warnf("No job found with with job id %s", ids[index])
} else {
return nil, e
}
}
d, _ := cmd.Bytes()
job := &api.Job{}
e = proto.Unmarshal(d, job)
Expand Down Expand Up @@ -378,7 +394,7 @@ func (repo *RedisJobRepository) ExpireLeases(queue string, deadline time.Time) (
if e != nil {
return nil, e
}
expiringJobs, e := repo.GetJobsByIds(ids)
expiringJobs, e := repo.GetExistingJobsByIds(ids)
if e != nil {
return nil, e
}
Expand Down Expand Up @@ -412,29 +428,7 @@ func (repo *RedisJobRepository) ExpireLeases(queue string, deadline time.Time) (
return expired, nil
}

// jobIdentity is the minimal pair of values used to address a job in the
// per-queue Redis keys without loading the full job object.
type jobIdentity struct {
// id is the job id.
id string
// queueName is the name of the queue recorded for the job.
queueName string
}

// getJobIdentities looks up the queue name of each given job id in the
// jobQueueMapKey hash and pairs every id with its queue.
//
// Ids that have no entry in the hash are logged as errors and left out, so the
// returned slice may be shorter than jobIds. An error is returned only when the
// HMGet call itself fails.
func (repo *RedisJobRepository) getJobIdentities(jobIds []string) ([]jobIdentity, error) {
queues, e := repo.db.HMGet(jobQueueMapKey, jobIds...).Result()
if e != nil {
return nil, e
}

jobIdentities := []jobIdentity{}
// Results are positional: queues[i] corresponds to jobIds[i].
for i, queue := range queues {
// A nil entry means the hash holds no queue for this job id.
if queue != nil {
jobIdentities = append(jobIdentities, jobIdentity{jobIds[i], queue.(string)})
} else {
log.Errorf("Missing queue for job %s", jobIds[i])
}
}
return jobIdentities, nil
}

func (repo *RedisJobRepository) leaseJobs(clusterId string, jobs []jobIdentity) ([]string, error) {
func (repo *RedisJobRepository) leaseJobs(clusterId string, jobs []*api.Job) ([]string, error) {

now := time.Now()
pipe := repo.db.Pipeline()
Expand All @@ -443,7 +437,7 @@ func (repo *RedisJobRepository) leaseJobs(clusterId string, jobs []jobIdentity)

cmds := make(map[string]*redis.Cmd)
for _, job := range jobs {
cmds[job.id] = leaseJob(pipe, job.queueName, clusterId, job.id, now)
cmds[job.Id] = leaseJob(pipe, job.Queue, clusterId, job.Id, now)
}
_, e := pipe.Exec()
if e != nil {
Expand Down
Loading

0 comments on commit f574216

Please sign in to comment.