From f574216469fe1068ac9183834f4b704c9dcbc1ee Mon Sep 17 00:00:00 2001 From: JamesMurkin Date: Fri, 20 Dec 2019 15:02:31 +0000 Subject: [PATCH] Merging Cancel/Remove into DeleteJobs and making it clean up job fully (#286) * Merging Cancel/Remove into DeleteJobs and making it clean up job fully Cancel and Remove were doing the same thing with subtle differences Now there is just one RemoveJobs, which deletes all information from the database about the job - If the job doesn't exist or nothing was deleted, it won't be in the returned - If some information was deleted for the job, it will be returned with the value of nil - If an error occurred while deleting, it will be returned with a value of the err message * Setting job object to expire after 7 days rather than delete immediately As we currently don't have a good story for debugging previously jobs (although all information is in events) We will leave the job object around for a short while, so we can look at it if needed before it disappears forever * Indicating the job was cancelled, on expiry being reset * Returning error from ReportDone if any jobs failed to be deleted * Not resetting expiry when deleting job, if expiry already set * Removing JobIdentities and using Job instead We are using JobIdentities as an optimization to load less data However it is used in only 1 place and likely it doesn't bring a huge benefit for the complexity it brings - Added maintenance of multiple ways of loading For now I am removing it to simplify the code and not need to fix it to handle missing jobs - When jobs are deleted, they are deleted from the queue index, which breaks this way of loading data * Removing JobQueue redis index as it is no longer used * Changing GetJobsByIds to GetExistingJobsByIds The function no longer errors when a job id is not present. As we now delete jobs as they finish, the jobs can no longer be assumed to live forever: - So now we just return the jobs we find - Return error if we failed to load and it wasn't due to job missing * Fixing accidental negation * Fix ineffectual assignment Co-authored-by: jankaspar <2270833+jankaspar@users.noreply.github.com> --- internal/armada/repository/job.go | 188 ++++++++++++------------- internal/armada/repository/job_test.go | 48 ++++--- internal/armada/server/lease.go | 20 ++- internal/armada/server/submit.go | 12 +- internal/armada/server/submit_test.go | 2 +- 5 files changed, 147 insertions(+), 123 deletions(-) diff --git a/internal/armada/repository/job.go b/internal/armada/repository/job.go index 979bcb7053e..b6517c50680 100644 --- a/internal/armada/repository/job.go +++ b/internal/armada/repository/job.go @@ -19,21 +19,19 @@ const jobQueuePrefix = "Job:Queue:" const jobSetPrefix = "Job:Set:" const jobLeasedPrefix = "Job:Leased:" const jobClusterMapKey = "Job:ClusterId" -const jobQueueMapKey = "Job:QueueName" type JobRepository interface { CreateJobs(request *api.JobSubmitRequest, principal authorization.Principal) []*api.Job AddJobs(job []*api.Job) ([]*SubmitJobResult, error) - GetJobsByIds(ids []string) ([]*api.Job, error) + GetExistingJobsByIds(ids []string) ([]*api.Job, error) PeekQueue(queue string, limit int64) ([]*api.Job, error) FilterActiveQueues(queues []*api.Queue) ([]*api.Queue, error) GetQueueSizes(queues []*api.Queue) (sizes []int64, e error) TryLeaseJobs(clusterId string, queue string, jobs []*api.Job) ([]*api.Job, error) RenewLease(clusterId string, jobIds []string) (renewed []string, e error) ExpireLeases(queue string, deadline time.Time) (expired []*api.Job, e error) - Remove(jobIds []string) (cleanedJobs []string, e error) ReturnLease(clusterId string, jobId string) (returnedJob *api.Job, err error) - Cancel(jobs []*api.Job) map[*api.Job]error + DeleteJobs(jobs []*api.Job) map[*api.Job]error GetActiveJobIds(queue string, jobSetId string) ([]string, error) } @@ -81,7 +79,6 @@ type submitJobRedisResponse struct { job *api.Job queueJobResult *redis.IntCmd saveJobResult *redis.StatusCmd - queueIndexResult *redis.BoolCmd jobSetIndexResult *redis.IntCmd } @@ -110,7 +107,6 @@ func (repo *RedisJobRepository) AddJobs(jobs []*api.Job) ([]*SubmitJobResult, er ) submitResult.saveJobResult = pipe.Set(jobObjectPrefix+job.Id, jobData, 0) - submitResult.queueIndexResult = pipe.HSet(jobQueueMapKey, job.Id, job.Queue) submitResult.jobSetIndexResult = pipe.SAdd(jobSetPrefix+job.JobSetId, job.Id) submitResults = append(submitResults, submitResult) } @@ -128,9 +124,6 @@ func (repo *RedisJobRepository) AddJobs(jobs []*api.Job) ([]*SubmitJobResult, er response.Error = e } - if _, e := submitResult.queueIndexResult.Result(); e != nil { - response.Error = e - } if _, e := submitResult.jobSetIndexResult.Result(); e != nil { response.Error = e } @@ -142,7 +135,7 @@ func (repo *RedisJobRepository) AddJobs(jobs []*api.Job) ([]*SubmitJobResult, er } func (repo *RedisJobRepository) RenewLease(clusterId string, jobIds []string) (renewedJobIds []string, e error) { - jobs, e := repo.getJobIdentities(jobIds) + jobs, e := repo.GetExistingJobsByIds(jobIds) if e != nil { return nil, e } @@ -150,7 +143,7 @@ func (repo *RedisJobRepository) RenewLease(clusterId string, jobIds []string) (r } func (repo *RedisJobRepository) ReturnLease(clusterId string, jobId string) (returnedJob *api.Job, err error) { - jobs, e := repo.GetJobsByIds([]string{jobId}) + jobs, e := repo.GetExistingJobsByIds([]string{jobId}) if e != nil { return nil, e } @@ -169,88 +162,105 @@ func (repo *RedisJobRepository) ReturnLease(clusterId string, jobId string) (ret return nil, nil } -func (repo *RedisJobRepository) Cancel(jobs []*api.Job) map[*api.Job]error { +type deleteJobRedisResponse struct { + job *api.Job + expiryAlreadySet bool + removeFromLeasedResult *redis.IntCmd + removeFromQueueResult *redis.IntCmd + setJobExpiryResult *redis.BoolCmd + deleteJobSetIndexResult *redis.IntCmd +} +func (repo *RedisJobRepository) DeleteJobs(jobs []*api.Job) map[*api.Job]error { + expiryStatus := repo.getExpiryStatus(jobs) pipe := repo.db.Pipeline() - queueCmds := []*redis.IntCmd{} + deletionResults := make([]*deleteJobRedisResponse, 0, len(jobs)) for _, job := range jobs { - queueCmds = append(queueCmds, pipe.ZRem(jobQueuePrefix+job.Queue, job.Id)) - } - leaseCmds := []*redis.IntCmd{} - for _, job := range jobs { - leaseCmds = append(leaseCmds, pipe.ZRem(jobLeasedPrefix+job.Queue, job.Id)) - } + deletionResult := &deleteJobRedisResponse{job: job, expiryAlreadySet: expiryStatus[job]} + deletionResult.removeFromQueueResult = pipe.ZRem(jobQueuePrefix+job.Queue, job.Id) + deletionResult.removeFromLeasedResult = pipe.ZRem(jobLeasedPrefix+job.Queue, job.Id) + deletionResult.deleteJobSetIndexResult = pipe.SRem(jobSetPrefix+job.JobSetId, job.Id) + if !deletionResult.expiryAlreadySet { + deletionResult.setJobExpiryResult = pipe.Expire(jobObjectPrefix+job.Id, time.Hour*24*7) + } + deletionResults = append(deletionResults, deletionResult) + } _, _ = pipe.Exec() // ignoring error here as it will be part of individual commands cancelledJobs := map[*api.Job]error{} - for i, job := range jobs { - result, e := queueCmds[i].Result() - if e != nil { - cancelledJobs[job] = e - } - if result > 0 { - cancelledJobs[job] = nil - } - } - for i, job := range jobs { - result, e := leaseCmds[i].Result() - if e != nil { - cancelledJobs[job] = e + for _, deletionResult := range deletionResults { + numberOfUpdates, err := processDeletionResponse(deletionResult) + + if numberOfUpdates > 0 { + cancelledJobs[deletionResult.job] = nil } - if result > 0 { - cancelledJobs[job] = nil + + if err != nil { + cancelledJobs[deletionResult.job] = err } } - // TODO clean up job completely?? - return cancelledJobs } -func (repo *RedisJobRepository) Remove(jobIds []string) (cleanedJobIds []string, e error) { +// Returns details on if the expiry for each job is already set or not +func (repo *RedisJobRepository) getExpiryStatus(jobs []*api.Job) map[*api.Job]bool { + pipe := repo.db.Pipeline() - jobs, e := repo.getJobIdentities(jobIds) - if e != nil { - return nil, e + var cmds []*redis.DurationCmd + for _, job := range jobs { + cmds = append(cmds, pipe.TTL(jobObjectPrefix+job.Id)) } + _, _ = pipe.Exec() // ignoring error here as it will be part of individual commands - cleanedJobs, e := repo.zRemoveJobIds(jobs, func(j *jobIdentity) string { return jobLeasedPrefix + j.queueName }) - if e != nil { - return nil, e - } + expiryStatus := make(map[*api.Job]bool, len(jobs)) + for index, response := range cmds { + expiry, err := response.Result() + job := jobs[index] - cleanedQueueJobs, e := repo.zRemoveJobIds(jobs, func(j *jobIdentity) string { return jobQueuePrefix + j.queueName }) - if e != nil { - return nil, e + expiryStatus[job] = false + if err == nil && expiry > 0 { + expiryStatus[job] = true + } } - // TODO removing only leases for now, cleanup everything else - return append(cleanedJobs, cleanedQueueJobs...), nil + return expiryStatus } -func (repo *RedisJobRepository) zRemoveJobIds(jobIdentities []jobIdentity, getRedisKey func(*jobIdentity) string) (ids []string, err error) { +func processDeletionResponse(deletionResponse *deleteJobRedisResponse) (int64, error) { + var totalUpdates int64 = 0 + var errorMessage error = nil - pipe := repo.db.Pipeline() - cmds := make(map[string]*redis.IntCmd) - for _, job := range jobIdentities { - cmds[job.id] = pipe.ZRem(getRedisKey(&job), job.id) + modified, e := deletionResponse.removeFromLeasedResult.Result() + totalUpdates += modified + if e != nil { + errorMessage = e } - _, e := pipe.Exec() + modified, e = deletionResponse.removeFromQueueResult.Result() + totalUpdates += modified if e != nil { - return nil, e + errorMessage = e } - cleanedIds := []string{} + modified, e = deletionResponse.deleteJobSetIndexResult.Result() + totalUpdates += modified + if e != nil { + errorMessage = e + } - for jobId, cmd := range cmds { - modified, e := cmd.Result() - if e == nil && modified > 0 { - cleanedIds = append(cleanedIds, jobId) + if !deletionResponse.expiryAlreadySet { + expirySet, e := deletionResponse.setJobExpiryResult.Result() + if expirySet { + totalUpdates++ + } + if e != nil { + errorMessage = e } } - return cleanedIds, nil + + return totalUpdates, errorMessage } func (repo *RedisJobRepository) PeekQueue(queue string, limit int64) ([]*api.Job, error) { @@ -258,19 +268,17 @@ func (repo *RedisJobRepository) PeekQueue(queue string, limit int64) ([]*api.Job if e != nil { return nil, e } - return repo.GetJobsByIds(ids) + return repo.GetExistingJobsByIds(ids) } // returns list of jobs which are successfully leased func (repo *RedisJobRepository) TryLeaseJobs(clusterId string, queue string, jobs []*api.Job) ([]*api.Job, error) { - jobIds := []jobIdentity{} jobById := map[string]*api.Job{} for _, job := range jobs { - jobIds = append(jobIds, jobIdentity{job.Id, queue}) jobById[job.Id] = job } - leasedIds, e := repo.leaseJobs(clusterId, jobIds) + leasedIds, e := repo.leaseJobs(clusterId, jobs) if e != nil { return nil, e } @@ -282,19 +290,27 @@ func (repo *RedisJobRepository) TryLeaseJobs(clusterId string, queue string, job return leasedJobs, nil } -func (repo *RedisJobRepository) GetJobsByIds(ids []string) ([]*api.Job, error) { +// Returns existing jobs by Id +// If an Id is supplied that no longer exists, that job will simply be omitted from the result. +// No error will be thrown for missing jobs +func (repo *RedisJobRepository) GetExistingJobsByIds(ids []string) ([]*api.Job, error) { pipe := repo.db.Pipeline() var cmds []*redis.StringCmd for _, id := range ids { cmds = append(cmds, pipe.Get(jobObjectPrefix+id)) } - _, e := pipe.Exec() - if e != nil { - return nil, e - } + _, _ = pipe.Exec() // ignoring error here as it will be part of individual commands var jobs []*api.Job - for _, cmd := range cmds { + for index, cmd := range cmds { + _, e := cmd.Result() + if e != nil { + if e == redis.Nil { + log.Warnf("No job found with with job id %s", ids[index]) + } else { + return nil, e + } + } d, _ := cmd.Bytes() job := &api.Job{} e = proto.Unmarshal(d, job) @@ -378,7 +394,7 @@ func (repo *RedisJobRepository) ExpireLeases(queue string, deadline time.Time) ( if e != nil { return nil, e } - expiringJobs, e := repo.GetJobsByIds(ids) + expiringJobs, e := repo.GetExistingJobsByIds(ids) if e != nil { return nil, e } @@ -412,29 +428,7 @@ func (repo *RedisJobRepository) ExpireLeases(queue string, deadline time.Time) ( return expired, nil } -type jobIdentity struct { - id string - queueName string -} - -func (repo *RedisJobRepository) getJobIdentities(jobIds []string) ([]jobIdentity, error) { - queues, e := repo.db.HMGet(jobQueueMapKey, jobIds...).Result() - if e != nil { - return nil, e - } - - jobIdentities := []jobIdentity{} - for i, queue := range queues { - if queue != nil { - jobIdentities = append(jobIdentities, jobIdentity{jobIds[i], queue.(string)}) - } else { - log.Errorf("Missing queue for job %s", jobIds[i]) - } - } - return jobIdentities, nil -} - -func (repo *RedisJobRepository) leaseJobs(clusterId string, jobs []jobIdentity) ([]string, error) { +func (repo *RedisJobRepository) leaseJobs(clusterId string, jobs []*api.Job) ([]string, error) { now := time.Now() pipe := repo.db.Pipeline() @@ -443,7 +437,7 @@ func (repo *RedisJobRepository) leaseJobs(clusterId string, jobs []jobIdentity) cmds := make(map[string]*redis.Cmd) for _, job := range jobs { - cmds[job.id] = leaseJob(pipe, job.queueName, clusterId, job.id, now) + cmds[job.Id] = leaseJob(pipe, job.Queue, clusterId, job.Id, now) } _, e := pipe.Exec() if e != nil { diff --git a/internal/armada/repository/job_test.go b/internal/armada/repository/job_test.go index a0b5fd272e2..84cac127edc 100644 --- a/internal/armada/repository/job_test.go +++ b/internal/armada/repository/job_test.go @@ -83,7 +83,7 @@ func TestRenewingNonExistentLease(t *testing.T) { }) } -func TestExpiredJobRemoveShouldRemoveJobFromQueue(t *testing.T) { +func TestDeletingExpiredJobShouldDeleteJobFromQueue(t *testing.T) { withRepository(func(r *RedisJobRepository) { job := addLeasedJob(t, r, "queue1", "cluster1") deadline := time.Now() @@ -91,11 +91,13 @@ func TestExpiredJobRemoveShouldRemoveJobFromQueue(t *testing.T) { _, e := r.ExpireLeases("queue1", deadline) assert.Nil(t, e) - removed, e := r.Remove([]string{job.Id}) - assert.Nil(t, e) + deletionResult := r.DeleteJobs([]*api.Job{job}) + + err, deleted := deletionResult[job] - assert.Equal(t, 1, len(removed)) - assert.Equal(t, job.Id, removed[0]) + assert.Equal(t, 1, len(deletionResult)) + assert.True(t, deleted) + assert.Nil(t, err) queue, e := r.PeekQueue("queue1", 100) assert.Nil(t, e) @@ -142,38 +144,50 @@ func TestReturnLeaseForJobInQueueIsNoop(t *testing.T) { }) } -func TestCancelRunningJob(t *testing.T) { +func TestDeleteRunningJob(t *testing.T) { withRepository(func(r *RedisJobRepository) { job := addLeasedJob(t, r, "queue1", "cluster1") - result := r.Cancel([]*api.Job{job}) - assert.Nil(t, result[job]) + result := r.DeleteJobs([]*api.Job{job}) + err, deletionOccurred := result[job] + assert.Nil(t, err) + assert.True(t, deletionOccurred) }) } -func TestCancelQueuedJob(t *testing.T) { +func TestDeleteQueuedJob(t *testing.T) { withRepository(func(r *RedisJobRepository) { job := addTestJob(t, r, "queue1") - result := r.Cancel([]*api.Job{job}) - assert.Nil(t, result[job]) + result := r.DeleteJobs([]*api.Job{job}) + err, deletionOccurred := result[job] + assert.Nil(t, err) + assert.True(t, deletionOccurred) }) } -func TestCancelMissingJob(t *testing.T) { +func TestDeleteWithSomeMissingJobs(t *testing.T) { withRepository(func(r *RedisJobRepository) { - job := &api.Job{Id: "jobId"} - result := r.Cancel([]*api.Job{job}) - assert.Nil(t, result[job]) + missingJob := &api.Job{Id: "jobId"} + runningJob := addLeasedJob(t, r, "queue1", "cluster1") + result := r.DeleteJobs([]*api.Job{missingJob, runningJob}) + + err, deletionOccurred := result[missingJob] + assert.Nil(t, err) + assert.False(t, deletionOccurred) + + err, deletionOccurred = result[runningJob] + assert.Nil(t, err) + assert.True(t, deletionOccurred) }) } -func TestReturnLeaseForCancelledJobShouldKeepJobCancelled(t *testing.T) { +func TestReturnLeaseForDeletedJobShouldKeepJobDeleted(t *testing.T) { withRepository(func(r *RedisJobRepository) { job := addLeasedJob(t, r, "cancel-test-queue", "cluster") - result := r.Cancel([]*api.Job{job}) + result := r.DeleteJobs([]*api.Job{job}) assert.Nil(t, result[job]) returned, err := r.ReturnLease("cluster", job.Id) diff --git a/internal/armada/server/lease.go b/internal/armada/server/lease.go index 59fcd5981ad..a20a4feff60 100644 --- a/internal/armada/server/lease.go +++ b/internal/armada/server/lease.go @@ -8,6 +8,8 @@ import ( "github.com/gogo/protobuf/types" log "github.com/sirupsen/logrus" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" "github.com/G-Research/armada/internal/armada/api" "github.com/G-Research/armada/internal/armada/authorization" @@ -142,8 +144,22 @@ func (q *AggregatedQueueServer) ReportDone(ctx context.Context, idList *api.IdLi if e := checkPermission(q.permissions, ctx, permissions.ExecuteJobs); e != nil { return nil, e } - cleaned, e := q.jobRepository.Remove(idList.Ids) - return &api.IdList{cleaned}, e + jobs, e := q.jobRepository.GetExistingJobsByIds(idList.Ids) + if e != nil { + return nil, status.Errorf(codes.Internal, e.Error()) + } + deletionResult := q.jobRepository.DeleteJobs(jobs) + + cleanedIds := make([]string, 0, len(deletionResult)) + var returnedError error = nil + for job, err := range deletionResult { + if err != nil { + returnedError = err + } else { + cleanedIds = append(cleanedIds, job.Id) + } + } + return &api.IdList{cleanedIds}, returnedError } func (q *AggregatedQueueServer) assignJobs(request *api.LeaseRequest, slices map[*api.Queue]common.ComputeResourcesFloat) ([]*api.Job, error) { diff --git a/internal/armada/server/submit.go b/internal/armada/server/submit.go index ee881ec3639..d4ca508e205 100644 --- a/internal/armada/server/submit.go +++ b/internal/armada/server/submit.go @@ -96,7 +96,7 @@ func (server *SubmitServer) SubmitJobs(ctx context.Context, req *api.JobSubmitRe func (server *SubmitServer) CancelJobs(ctx context.Context, request *api.JobCancelRequest) (*api.CancellationResult, error) { if request.JobId != "" { - jobs, e := server.jobRepository.GetJobsByIds([]string{request.JobId}) + jobs, e := server.jobRepository.GetExistingJobsByIds([]string{request.JobId}) if e != nil { return nil, status.Errorf(codes.Internal, e.Error()) } @@ -108,7 +108,7 @@ func (server *SubmitServer) CancelJobs(ctx context.Context, request *api.JobCanc if e != nil { return nil, status.Errorf(codes.Aborted, e.Error()) } - jobs, e := server.jobRepository.GetJobsByIds(ids) + jobs, e := server.jobRepository.GetExistingJobsByIds(ids) if e != nil { return nil, status.Errorf(codes.Internal, e.Error()) } @@ -127,12 +127,12 @@ func (server *SubmitServer) cancelJobs(ctx context.Context, queue string, jobs [ return nil, status.Errorf(codes.Unknown, e.Error()) } - cancellationResult := server.jobRepository.Cancel(jobs) + deletionResult := server.jobRepository.DeleteJobs(jobs) cancelled := []*api.Job{} cancelledIds := []string{} - for job, error := range cancellationResult { - if error != nil { - log.Errorf("Error when cancelling job id %s: %s", job.Id, error.Error()) + for job, err := range deletionResult { + if err != nil { + log.Errorf("Error when cancelling job id %s: %s", job.Id, err.Error()) } else { cancelled = append(cancelled, job) cancelledIds = append(cancelledIds, job.Id) diff --git a/internal/armada/server/submit_test.go b/internal/armada/server/submit_test.go index 73d78f713ba..53dc579a6a1 100644 --- a/internal/armada/server/submit_test.go +++ b/internal/armada/server/submit_test.go @@ -72,7 +72,7 @@ func TestSubmitServer_SubmitJob_ReturnsJobItemsInTheSameOrderTheyWereSubmitted(t } //Get jobs for jobIds returned - jobs, _ := s.jobRepository.GetJobsByIds(jobIds) + jobs, _ := s.jobRepository.GetExistingJobsByIds(jobIds) jobSet := make(map[string]*api.Job, 5) for _, job := range jobs { jobSet[job.Id] = job