diff --git a/internal/armada/repository/job.go b/internal/armada/repository/job.go index 979bcb7053e..b6517c50680 100644 --- a/internal/armada/repository/job.go +++ b/internal/armada/repository/job.go @@ -19,21 +19,19 @@ const jobQueuePrefix = "Job:Queue:" const jobSetPrefix = "Job:Set:" const jobLeasedPrefix = "Job:Leased:" const jobClusterMapKey = "Job:ClusterId" -const jobQueueMapKey = "Job:QueueName" type JobRepository interface { CreateJobs(request *api.JobSubmitRequest, principal authorization.Principal) []*api.Job AddJobs(job []*api.Job) ([]*SubmitJobResult, error) - GetJobsByIds(ids []string) ([]*api.Job, error) + GetExistingJobsByIds(ids []string) ([]*api.Job, error) PeekQueue(queue string, limit int64) ([]*api.Job, error) FilterActiveQueues(queues []*api.Queue) ([]*api.Queue, error) GetQueueSizes(queues []*api.Queue) (sizes []int64, e error) TryLeaseJobs(clusterId string, queue string, jobs []*api.Job) ([]*api.Job, error) RenewLease(clusterId string, jobIds []string) (renewed []string, e error) ExpireLeases(queue string, deadline time.Time) (expired []*api.Job, e error) - Remove(jobIds []string) (cleanedJobs []string, e error) ReturnLease(clusterId string, jobId string) (returnedJob *api.Job, err error) - Cancel(jobs []*api.Job) map[*api.Job]error + DeleteJobs(jobs []*api.Job) map[*api.Job]error GetActiveJobIds(queue string, jobSetId string) ([]string, error) } @@ -81,7 +79,6 @@ type submitJobRedisResponse struct { job *api.Job queueJobResult *redis.IntCmd saveJobResult *redis.StatusCmd - queueIndexResult *redis.BoolCmd jobSetIndexResult *redis.IntCmd } @@ -110,7 +107,6 @@ func (repo *RedisJobRepository) AddJobs(jobs []*api.Job) ([]*SubmitJobResult, er ) submitResult.saveJobResult = pipe.Set(jobObjectPrefix+job.Id, jobData, 0) - submitResult.queueIndexResult = pipe.HSet(jobQueueMapKey, job.Id, job.Queue) submitResult.jobSetIndexResult = pipe.SAdd(jobSetPrefix+job.JobSetId, job.Id) submitResults = append(submitResults, submitResult) } @@ -128,9 +124,6 @@ func (repo *RedisJobRepository) AddJobs(jobs []*api.Job) ([]*SubmitJobResult, er response.Error = e } - if _, e := submitResult.queueIndexResult.Result(); e != nil { - response.Error = e - } if _, e := submitResult.jobSetIndexResult.Result(); e != nil { response.Error = e } @@ -142,7 +135,7 @@ func (repo *RedisJobRepository) AddJobs(jobs []*api.Job) ([]*SubmitJobResult, er } func (repo *RedisJobRepository) RenewLease(clusterId string, jobIds []string) (renewedJobIds []string, e error) { - jobs, e := repo.getJobIdentities(jobIds) + jobs, e := repo.GetExistingJobsByIds(jobIds) if e != nil { return nil, e } @@ -150,7 +143,7 @@ func (repo *RedisJobRepository) RenewLease(clusterId string, jobIds []string) (r } func (repo *RedisJobRepository) ReturnLease(clusterId string, jobId string) (returnedJob *api.Job, err error) { - jobs, e := repo.GetJobsByIds([]string{jobId}) + jobs, e := repo.GetExistingJobsByIds([]string{jobId}) if e != nil { return nil, e } @@ -169,88 +162,105 @@ func (repo *RedisJobRepository) ReturnLease(clusterId string, jobId string) (ret return nil, nil } -func (repo *RedisJobRepository) Cancel(jobs []*api.Job) map[*api.Job]error { +type deleteJobRedisResponse struct { + job *api.Job + expiryAlreadySet bool + removeFromLeasedResult *redis.IntCmd + removeFromQueueResult *redis.IntCmd + setJobExpiryResult *redis.BoolCmd + deleteJobSetIndexResult *redis.IntCmd +} +func (repo *RedisJobRepository) DeleteJobs(jobs []*api.Job) map[*api.Job]error { + expiryStatus := repo.getExpiryStatus(jobs) pipe := repo.db.Pipeline() - queueCmds := []*redis.IntCmd{} + deletionResults := make([]*deleteJobRedisResponse, 0, len(jobs)) for _, job := range jobs { - queueCmds = append(queueCmds, pipe.ZRem(jobQueuePrefix+job.Queue, job.Id)) - } - leaseCmds := []*redis.IntCmd{} - for _, job := range jobs { - leaseCmds = append(leaseCmds, pipe.ZRem(jobLeasedPrefix+job.Queue, job.Id)) - } + deletionResult := &deleteJobRedisResponse{job: job, expiryAlreadySet: expiryStatus[job]} + deletionResult.removeFromQueueResult = pipe.ZRem(jobQueuePrefix+job.Queue, job.Id) + deletionResult.removeFromLeasedResult = pipe.ZRem(jobLeasedPrefix+job.Queue, job.Id) + deletionResult.deleteJobSetIndexResult = pipe.SRem(jobSetPrefix+job.JobSetId, job.Id) + if !deletionResult.expiryAlreadySet { + deletionResult.setJobExpiryResult = pipe.Expire(jobObjectPrefix+job.Id, time.Hour*24*7) + } + deletionResults = append(deletionResults, deletionResult) + } _, _ = pipe.Exec() // ignoring error here as it will be part of individual commands cancelledJobs := map[*api.Job]error{} - for i, job := range jobs { - result, e := queueCmds[i].Result() - if e != nil { - cancelledJobs[job] = e - } - if result > 0 { - cancelledJobs[job] = nil - } - } - for i, job := range jobs { - result, e := leaseCmds[i].Result() - if e != nil { - cancelledJobs[job] = e + for _, deletionResult := range deletionResults { + numberOfUpdates, err := processDeletionResponse(deletionResult) + + if numberOfUpdates > 0 { + cancelledJobs[deletionResult.job] = nil } - if result > 0 { - cancelledJobs[job] = nil + + if err != nil { + cancelledJobs[deletionResult.job] = err } } - // TODO clean up job completely?? - return cancelledJobs } -func (repo *RedisJobRepository) Remove(jobIds []string) (cleanedJobIds []string, e error) { +// Returns details on if the expiry for each job is already set or not +func (repo *RedisJobRepository) getExpiryStatus(jobs []*api.Job) map[*api.Job]bool { + pipe := repo.db.Pipeline() - jobs, e := repo.getJobIdentities(jobIds) - if e != nil { - return nil, e + var cmds []*redis.DurationCmd + for _, job := range jobs { + cmds = append(cmds, pipe.TTL(jobObjectPrefix+job.Id)) } + _, _ = pipe.Exec() // ignoring error here as it will be part of individual commands - cleanedJobs, e := repo.zRemoveJobIds(jobs, func(j *jobIdentity) string { return jobLeasedPrefix + j.queueName }) - if e != nil { - return nil, e - } + expiryStatus := make(map[*api.Job]bool, len(jobs)) + for index, response := range cmds { + expiry, err := response.Result() + job := jobs[index] - cleanedQueueJobs, e := repo.zRemoveJobIds(jobs, func(j *jobIdentity) string { return jobQueuePrefix + j.queueName }) - if e != nil { - return nil, e + expiryStatus[job] = false + if err == nil && expiry > 0 { + expiryStatus[job] = true + } } - // TODO removing only leases for now, cleanup everything else - return append(cleanedJobs, cleanedQueueJobs...), nil + return expiryStatus } -func (repo *RedisJobRepository) zRemoveJobIds(jobIdentities []jobIdentity, getRedisKey func(*jobIdentity) string) (ids []string, err error) { +func processDeletionResponse(deletionResponse *deleteJobRedisResponse) (int64, error) { + var totalUpdates int64 = 0 + var errorMessage error = nil - pipe := repo.db.Pipeline() - cmds := make(map[string]*redis.IntCmd) - for _, job := range jobIdentities { - cmds[job.id] = pipe.ZRem(getRedisKey(&job), job.id) + modified, e := deletionResponse.removeFromLeasedResult.Result() + totalUpdates += modified + if e != nil { + errorMessage = e } - _, e := pipe.Exec() + modified, e = deletionResponse.removeFromQueueResult.Result() + totalUpdates += modified if e != nil { - return nil, e + errorMessage = e } - cleanedIds := []string{} + modified, e = deletionResponse.deleteJobSetIndexResult.Result() + totalUpdates += modified + if e != nil { + errorMessage = e + } - for jobId, cmd := range cmds { - modified, e := cmd.Result() - if e == nil && modified > 0 { - cleanedIds = append(cleanedIds, jobId) + if !deletionResponse.expiryAlreadySet { + expirySet, e := deletionResponse.setJobExpiryResult.Result() + if expirySet { + totalUpdates++ + } + if e != nil { + errorMessage = e } } - return cleanedIds, nil + + return totalUpdates, errorMessage } func (repo *RedisJobRepository) PeekQueue(queue string, limit int64) ([]*api.Job, error) { @@ -258,19 +268,17 @@ func (repo *RedisJobRepository) PeekQueue(queue string, limit int64) ([]*api.Job if e != nil { return nil, e } - return repo.GetJobsByIds(ids) + return repo.GetExistingJobsByIds(ids) } // returns list of jobs which are successfully leased func (repo *RedisJobRepository) TryLeaseJobs(clusterId string, queue string, jobs []*api.Job) ([]*api.Job, error) { - jobIds := []jobIdentity{} jobById := map[string]*api.Job{} for _, job := range jobs { - jobIds = append(jobIds, jobIdentity{job.Id, queue}) jobById[job.Id] = job } - leasedIds, e := repo.leaseJobs(clusterId, jobIds) + leasedIds, e := repo.leaseJobs(clusterId, jobs) if e != nil { return nil, e } @@ -282,19 +290,27 @@ func (repo *RedisJobRepository) TryLeaseJobs(clusterId string, queue string, job return leasedJobs, nil } -func (repo *RedisJobRepository) GetJobsByIds(ids []string) ([]*api.Job, error) { +// Returns existing jobs by Id +// If an Id is supplied that no longer exists, that job will simply be omitted from the result. +// No error will be thrown for missing jobs +func (repo *RedisJobRepository) GetExistingJobsByIds(ids []string) ([]*api.Job, error) { pipe := repo.db.Pipeline() var cmds []*redis.StringCmd for _, id := range ids { cmds = append(cmds, pipe.Get(jobObjectPrefix+id)) } - _, e := pipe.Exec() - if e != nil { - return nil, e - } + _, _ = pipe.Exec() // ignoring error here as it will be part of individual commands var jobs []*api.Job - for _, cmd := range cmds { + for index, cmd := range cmds { + _, e := cmd.Result() + if e != nil { + if e == redis.Nil { + log.Warnf("No job found with with job id %s", ids[index]) + } else { + return nil, e + } + } d, _ := cmd.Bytes() job := &api.Job{} e = proto.Unmarshal(d, job) @@ -378,7 +394,7 @@ func (repo *RedisJobRepository) ExpireLeases(queue string, deadline time.Time) ( if e != nil { return nil, e } - expiringJobs, e := repo.GetJobsByIds(ids) + expiringJobs, e := repo.GetExistingJobsByIds(ids) if e != nil { return nil, e } @@ -412,29 +428,7 @@ func (repo *RedisJobRepository) ExpireLeases(queue string, deadline time.Time) ( return expired, nil } -type jobIdentity struct { - id string - queueName string -} - -func (repo *RedisJobRepository) getJobIdentities(jobIds []string) ([]jobIdentity, error) { - queues, e := repo.db.HMGet(jobQueueMapKey, jobIds...).Result() - if e != nil { - return nil, e - } - - jobIdentities := []jobIdentity{} - for i, queue := range queues { - if queue != nil { - jobIdentities = append(jobIdentities, jobIdentity{jobIds[i], queue.(string)}) - } else { - log.Errorf("Missing queue for job %s", jobIds[i]) - } - } - return jobIdentities, nil -} - -func (repo *RedisJobRepository) leaseJobs(clusterId string, jobs []jobIdentity) ([]string, error) { +func (repo *RedisJobRepository) leaseJobs(clusterId string, jobs []*api.Job) ([]string, error) { now := time.Now() pipe := repo.db.Pipeline() @@ -443,7 +437,7 @@ func (repo *RedisJobRepository) leaseJobs(clusterId string, jobs []jobIdentity) cmds := make(map[string]*redis.Cmd) for _, job := range jobs { - cmds[job.id] = leaseJob(pipe, job.queueName, clusterId, job.id, now) + cmds[job.Id] = leaseJob(pipe, job.Queue, clusterId, job.Id, now) } _, e := pipe.Exec() if e != nil { diff --git a/internal/armada/repository/job_test.go b/internal/armada/repository/job_test.go index a0b5fd272e2..84cac127edc 100644 --- a/internal/armada/repository/job_test.go +++ b/internal/armada/repository/job_test.go @@ -83,7 +83,7 @@ func TestRenewingNonExistentLease(t *testing.T) { }) } -func TestExpiredJobRemoveShouldRemoveJobFromQueue(t *testing.T) { +func TestDeletingExpiredJobShouldDeleteJobFromQueue(t *testing.T) { withRepository(func(r *RedisJobRepository) { job := addLeasedJob(t, r, "queue1", "cluster1") deadline := time.Now() @@ -91,11 +91,13 @@ func TestExpiredJobRemoveShouldRemoveJobFromQueue(t *testing.T) { _, e := r.ExpireLeases("queue1", deadline) assert.Nil(t, e) - removed, e := r.Remove([]string{job.Id}) - assert.Nil(t, e) + deletionResult := r.DeleteJobs([]*api.Job{job}) + + err, deleted := deletionResult[job] - assert.Equal(t, 1, len(removed)) - assert.Equal(t, job.Id, removed[0]) + assert.Equal(t, 1, len(deletionResult)) + assert.True(t, deleted) + assert.Nil(t, err) queue, e := r.PeekQueue("queue1", 100) assert.Nil(t, e) @@ -142,38 +144,50 @@ func TestReturnLeaseForJobInQueueIsNoop(t *testing.T) { }) } -func TestCancelRunningJob(t *testing.T) { +func TestDeleteRunningJob(t *testing.T) { withRepository(func(r *RedisJobRepository) { job := addLeasedJob(t, r, "queue1", "cluster1") - result := r.Cancel([]*api.Job{job}) - assert.Nil(t, result[job]) + result := r.DeleteJobs([]*api.Job{job}) + err, deletionOccurred := result[job] + assert.Nil(t, err) + assert.True(t, deletionOccurred) }) } -func TestCancelQueuedJob(t *testing.T) { +func TestDeleteQueuedJob(t *testing.T) { withRepository(func(r *RedisJobRepository) { job := addTestJob(t, r, "queue1") - result := r.Cancel([]*api.Job{job}) - assert.Nil(t, result[job]) + result := r.DeleteJobs([]*api.Job{job}) + err, deletionOccurred := result[job] + assert.Nil(t, err) + assert.True(t, deletionOccurred) }) } -func TestCancelMissingJob(t *testing.T) { +func TestDeleteWithSomeMissingJobs(t *testing.T) { withRepository(func(r *RedisJobRepository) { - job := &api.Job{Id: "jobId"} - result := r.Cancel([]*api.Job{job}) - assert.Nil(t, result[job]) + missingJob := &api.Job{Id: "jobId"} + runningJob := addLeasedJob(t, r, "queue1", "cluster1") + result := r.DeleteJobs([]*api.Job{missingJob, runningJob}) + + err, deletionOccurred := result[missingJob] + assert.Nil(t, err) + assert.False(t, deletionOccurred) + + err, deletionOccurred = result[runningJob] + assert.Nil(t, err) + assert.True(t, deletionOccurred) }) } -func TestReturnLeaseForCancelledJobShouldKeepJobCancelled(t *testing.T) { +func TestReturnLeaseForDeletedJobShouldKeepJobDeleted(t *testing.T) { withRepository(func(r *RedisJobRepository) { job := addLeasedJob(t, r, "cancel-test-queue", "cluster") - result := r.Cancel([]*api.Job{job}) + result := r.DeleteJobs([]*api.Job{job}) assert.Nil(t, result[job]) returned, err := r.ReturnLease("cluster", job.Id) diff --git a/internal/armada/server/lease.go b/internal/armada/server/lease.go index 59fcd5981ad..a20a4feff60 100644 --- a/internal/armada/server/lease.go +++ b/internal/armada/server/lease.go @@ -8,6 +8,8 @@ import ( "github.com/gogo/protobuf/types" log "github.com/sirupsen/logrus" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" "github.com/G-Research/armada/internal/armada/api" "github.com/G-Research/armada/internal/armada/authorization" @@ -142,8 +144,22 @@ func (q *AggregatedQueueServer) ReportDone(ctx context.Context, idList *api.IdLi if e := checkPermission(q.permissions, ctx, permissions.ExecuteJobs); e != nil { return nil, e } - cleaned, e := q.jobRepository.Remove(idList.Ids) - return &api.IdList{cleaned}, e + jobs, e := q.jobRepository.GetExistingJobsByIds(idList.Ids) + if e != nil { + return nil, status.Errorf(codes.Internal, e.Error()) + } + deletionResult := q.jobRepository.DeleteJobs(jobs) + + cleanedIds := make([]string, 0, len(deletionResult)) + var returnedError error = nil + for job, err := range deletionResult { + if err != nil { + returnedError = err + } else { + cleanedIds = append(cleanedIds, job.Id) + } + } + return &api.IdList{cleanedIds}, returnedError } func (q *AggregatedQueueServer) assignJobs(request *api.LeaseRequest, slices map[*api.Queue]common.ComputeResourcesFloat) ([]*api.Job, error) { diff --git a/internal/armada/server/submit.go b/internal/armada/server/submit.go index ee881ec3639..d4ca508e205 100644 --- a/internal/armada/server/submit.go +++ b/internal/armada/server/submit.go @@ -96,7 +96,7 @@ func (server *SubmitServer) SubmitJobs(ctx context.Context, req *api.JobSubmitRe func (server *SubmitServer) CancelJobs(ctx context.Context, request *api.JobCancelRequest) (*api.CancellationResult, error) { if request.JobId != "" { - jobs, e := server.jobRepository.GetJobsByIds([]string{request.JobId}) + jobs, e := server.jobRepository.GetExistingJobsByIds([]string{request.JobId}) if e != nil { return nil, status.Errorf(codes.Internal, e.Error()) } @@ -108,7 +108,7 @@ func (server *SubmitServer) CancelJobs(ctx context.Context, request *api.JobCanc if e != nil { return nil, status.Errorf(codes.Aborted, e.Error()) } - jobs, e := server.jobRepository.GetJobsByIds(ids) + jobs, e := server.jobRepository.GetExistingJobsByIds(ids) if e != nil { return nil, status.Errorf(codes.Internal, e.Error()) } @@ -127,12 +127,12 @@ func (server *SubmitServer) cancelJobs(ctx context.Context, queue string, jobs [ return nil, status.Errorf(codes.Unknown, e.Error()) } - cancellationResult := server.jobRepository.Cancel(jobs) + deletionResult := server.jobRepository.DeleteJobs(jobs) cancelled := []*api.Job{} cancelledIds := []string{} - for job, error := range cancellationResult { - if error != nil { - log.Errorf("Error when cancelling job id %s: %s", job.Id, error.Error()) + for job, err := range deletionResult { + if err != nil { + log.Errorf("Error when cancelling job id %s: %s", job.Id, err.Error()) } else { cancelled = append(cancelled, job) cancelledIds = append(cancelledIds, job.Id) diff --git a/internal/armada/server/submit_test.go b/internal/armada/server/submit_test.go index 73d78f713ba..53dc579a6a1 100644 --- a/internal/armada/server/submit_test.go +++ b/internal/armada/server/submit_test.go @@ -72,7 +72,7 @@ func TestSubmitServer_SubmitJob_ReturnsJobItemsInTheSameOrderTheyWereSubmitted(t } //Get jobs for jobIds returned - jobs, _ := s.jobRepository.GetJobsByIds(jobIds) + jobs, _ := s.jobRepository.GetExistingJobsByIds(jobIds) jobSet := make(map[string]*api.Job, 5) for _, job := range jobs { jobSet[job.Id] = job