From ac1f002765c6c664a9c31eba968577ef8c682cc1 Mon Sep 17 00:00:00 2001
From: Chris Martin
Date: Mon, 23 Sep 2024 10:04:34 +0100
Subject: [PATCH 1/3] Better Interface To JobRepository (#3958)

* wip

Signed-off-by: Chris Martin

* wip

Signed-off-by: Chris Martin

---------

Signed-off-by: Chris Martin
---
 internal/scheduler/jobdb/jobdb.go             |  7 ++-
 internal/scheduler/jobiteration.go            | 30 ++++-----
 internal/scheduler/jobiteration_test.go       | 62 +++++++++----------
 .../scheduler/preempting_queue_scheduler.go   | 11 ++--
 .../preempting_queue_scheduler_test.go        |  8 +--
 internal/scheduler/queue_scheduler.go         |  6 +-
 internal/scheduler/queue_scheduler_test.go    |  2 +-
 internal/scheduler/scheduling_algo.go         | 37 +----------
 internal/scheduler/simulator/simulator.go     |  2 +-
 9 files changed, 66 insertions(+), 99 deletions(-)

diff --git a/internal/scheduler/jobdb/jobdb.go b/internal/scheduler/jobdb/jobdb.go
index 8a65691390a..6b26902c64d 100644
--- a/internal/scheduler/jobdb/jobdb.go
+++ b/internal/scheduler/jobdb/jobdb.go
@@ -20,6 +20,11 @@ import (
 	"github.com/armadaproject/armada/internal/scheduler/schedulerobjects"
 )
 
+type JobIterator interface {
+	Next() (*Job, bool)
+	Done() bool
+}
+
 var emptyList = immutable.NewSortedSet[*Job](JobPriorityComparer{})
 
 type JobDb struct {
@@ -496,7 +501,7 @@ func (txn *Txn) HasQueuedJobs(queue string) bool {
 }
 
 // QueuedJobs returns an iterator over the jobs queued for the given queue.
-func (txn *Txn) QueuedJobs(queue string) *immutable.SortedSetIterator[*Job] {
+func (txn *Txn) QueuedJobs(queue string) JobIterator {
 	jobQueue, ok := txn.jobsByQueue[queue]
 	if ok {
 		return jobQueue.Iterator()
diff --git a/internal/scheduler/jobiteration.go b/internal/scheduler/jobiteration.go
index b814e9144a9..ac21d2b6496 100644
--- a/internal/scheduler/jobiteration.go
+++ b/internal/scheduler/jobiteration.go
@@ -11,13 +11,13 @@ import (
 	"github.com/armadaproject/armada/internal/scheduler/jobdb"
 )
 
-type JobIterator interface {
+type JobContextIterator interface {
 	Next() (*schedulercontext.JobSchedulingContext, error)
 }
 
 type JobRepository interface {
-	GetQueueJobIds(queueName string) []string
-	GetExistingJobsByIds(ids []string) []*jobdb.Job
+	QueuedJobs(queueName string) jobdb.JobIterator
+	GetById(id string) *jobdb.Job
 }
 
 type InMemoryJobIterator struct {
@@ -97,7 +97,7 @@ func (repo *InMemoryJobRepository) GetExistingJobsByIds(jobIds []string) []*jobd
 	return rv
 }
 
-func (repo *InMemoryJobRepository) GetJobIterator(queue string) JobIterator {
+func (repo *InMemoryJobRepository) GetJobIterator(queue string) JobContextIterator {
 	repo.mu.Lock()
 	defer repo.mu.Unlock()
 	return NewInMemoryJobIterator(slices.Clone(repo.jctxsByQueue[queue]))
@@ -105,17 +105,14 @@ func (repo *InMemoryJobRepository) GetJobIterator(queue string) JobIterator {
 
 // QueuedJobsIterator is an iterator over all jobs in a queue.
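// It now wraps the new jobdb.JobIterator directly instead of fetching job ids
// and then looking each job up by id. A minimal sketch of draining it
// (hypothetical caller; the queue name and repo are illustrative only):
//
//	it := NewQueuedJobsIterator(ctx, "queue-a", repo)
//	for {
//		jctx, err := it.Next()
//		if err != nil {
//			return err // e.g. ctx.Err() if the context was cancelled
//		}
//		if jctx == nil {
//			break // Next returns (nil, nil) once the underlying iterator is exhausted
//		}
//		// ... schedule jctx ...
//	}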
type QueuedJobsIterator struct { - repo JobRepository - jobIds []string - idx int - ctx *armadacontext.Context + jobIter jobdb.JobIterator + ctx *armadacontext.Context } func NewQueuedJobsIterator(ctx *armadacontext.Context, queue string, repo JobRepository) *QueuedJobsIterator { return &QueuedJobsIterator{ - jobIds: repo.GetQueueJobIds(queue), - repo: repo, - ctx: ctx, + jobIter: repo.QueuedJobs(queue), + ctx: ctx, } } @@ -124,22 +121,21 @@ func (it *QueuedJobsIterator) Next() (*schedulercontext.JobSchedulingContext, er case <-it.ctx.Done(): return nil, it.ctx.Err() default: - if it.idx >= len(it.jobIds) { + job, _ := it.jobIter.Next() + if job == nil { return nil, nil } - job := it.repo.GetExistingJobsByIds([]string{it.jobIds[it.idx]}) - it.idx++ - return schedulercontext.JobSchedulingContextFromJob(job[0]), nil + return schedulercontext.JobSchedulingContextFromJob(job), nil } } // MultiJobsIterator chains several JobIterators together in the order provided. type MultiJobsIterator struct { i int - its []JobIterator + its []JobContextIterator } -func NewMultiJobsIterator(its ...JobIterator) *MultiJobsIterator { +func NewMultiJobsIterator(its ...JobContextIterator) *MultiJobsIterator { return &MultiJobsIterator{ its: its, } diff --git a/internal/scheduler/jobiteration_test.go b/internal/scheduler/jobiteration_test.go index c4f10a00bfa..a10866535b1 100644 --- a/internal/scheduler/jobiteration_test.go +++ b/internal/scheduler/jobiteration_test.go @@ -62,7 +62,7 @@ func TestMultiJobsIterator_TwoQueues(t *testing.T) { } ctx := armadacontext.Background() - its := make([]JobIterator, 3) + its := make([]JobContextIterator, 3) for i, queue := range []string{"A", "B", "C"} { it := NewQueuedJobsIterator(ctx, queue, repo) its[i] = it @@ -214,20 +214,43 @@ func TestCreateQueuedJobsIterator_NilOnEmpty(t *testing.T) { assert.NoError(t, err) } -// TODO: Deprecate in favour of InMemoryRepo. +type mockJobIterator struct { + jobs []*jobdb.Job + i int +} + +func (iter *mockJobIterator) Done() bool { + return iter.i >= len(iter.jobs) +} + +func (iter *mockJobIterator) Next() (*jobdb.Job, bool) { + if iter.Done() { + return nil, false + } + job := iter.jobs[iter.i] + iter.i++ + return job, true +} + type mockJobRepository struct { jobsByQueue map[string][]*jobdb.Job jobsById map[string]*jobdb.Job - // Ids of all jobs hat were leased to an executor. 
-	leasedJobs          map[string]bool
-	getQueueJobIdsDelay time.Duration
+}
+
+func (repo *mockJobRepository) QueuedJobs(queueName string) jobdb.JobIterator {
+	q := repo.jobsByQueue[queueName]
+	return &mockJobIterator{jobs: q}
+}
+
+func (repo *mockJobRepository) GetById(id string) *jobdb.Job {
+	j, _ := repo.jobsById[id]
+	return j
 }
 
 func newMockJobRepository() *mockJobRepository {
 	return &mockJobRepository{
 		jobsByQueue: make(map[string][]*jobdb.Job),
 		jobsById:    make(map[string]*jobdb.Job),
-		leasedJobs:  make(map[string]bool),
 	}
 }
 
@@ -242,35 +265,10 @@ func (repo *mockJobRepository) Enqueue(job *jobdb.Job) {
 	repo.jobsById[job.Id()] = job
 }
 
-func (repo *mockJobRepository) GetJobIterator(ctx *armadacontext.Context, queue string) JobIterator {
+func (repo *mockJobRepository) GetJobIterator(ctx *armadacontext.Context, queue string) JobContextIterator {
 	return NewQueuedJobsIterator(ctx, queue, repo)
 }
 
-func (repo *mockJobRepository) GetQueueJobIds(queue string) []string {
-	time.Sleep(repo.getQueueJobIdsDelay)
-	if jobs, ok := repo.jobsByQueue[queue]; ok {
-		rv := make([]string, 0, len(jobs))
-		for _, job := range jobs {
-			if !repo.leasedJobs[job.Id()] {
-				rv = append(rv, job.Id())
-			}
-		}
-		return rv
-	} else {
-		return make([]string, 0)
-	}
-}
-
-func (repo *mockJobRepository) GetExistingJobsByIds(jobIds []string) []*jobdb.Job {
-	rv := make([]*jobdb.Job, len(jobIds))
-	for i, jobId := range jobIds {
-		if job, ok := repo.jobsById[jobId]; ok {
-			rv[i] = job
-		}
-	}
-	return rv
-}
-
 func jobFromPodSpec(queue string, req *schedulerobjects.PodRequirements) *jobdb.Job {
 	return testfixtures.TestJob(queue, util.ULID(), "armada-default", req)
 }
diff --git a/internal/scheduler/preempting_queue_scheduler.go b/internal/scheduler/preempting_queue_scheduler.go
index 8b4fed9225e..695e728957b 100644
--- a/internal/scheduler/preempting_queue_scheduler.go
+++ b/internal/scheduler/preempting_queue_scheduler.go
@@ -525,7 +525,7 @@ func addEvictedJobsToNodeDb(_ *armadacontext.Context, sctx *schedulercontext.Sch
 }
 
 func (sch *PreemptingQueueScheduler) schedule(ctx *armadacontext.Context, inMemoryJobRepo *InMemoryJobRepository, jobRepo JobRepository) (*schedulerresult.SchedulerResult, error) {
-	jobIteratorByQueue := make(map[string]JobIterator)
+	jobIteratorByQueue := make(map[string]JobContextIterator)
 	for _, qctx := range sch.schedulingContext.QueueSchedulingContexts {
 		evictedIt := inMemoryJobRepo.GetJobIterator(qctx.Queue)
 		if jobRepo == nil || reflect.ValueOf(jobRepo).IsNil() {
@@ -821,13 +821,16 @@ func (evi *Evictor) Evict(ctx *armadacontext.Context, nodeDbTxn *memdb.Txn) (*Ev
 		if evi.nodeFilter != nil && !evi.nodeFilter(ctx, node) {
 			continue
 		}
-		jobIds := make([]string, 0, len(node.AllocatedByJobId))
+		jobs := make([]*jobdb.Job, 0, len(node.AllocatedByJobId))
 		for jobId := range node.AllocatedByJobId {
 			if _, ok := node.EvictedJobRunIds[jobId]; !ok {
-				jobIds = append(jobIds, jobId)
+				job := evi.jobRepo.GetById(jobId)
+				if job != nil {
+					jobs = append(jobs, job)
+				}
+
 			}
 		}
-		jobs := evi.jobRepo.GetExistingJobsByIds(jobIds)
 		evictedJobs, node, err := evi.nodeDb.EvictJobsFromNode(jobFilter, jobs, node)
 		if err != nil {
 			return nil, err
diff --git a/internal/scheduler/preempting_queue_scheduler_test.go b/internal/scheduler/preempting_queue_scheduler_test.go
index 849b3262440..c738b0c1512 100644
--- a/internal/scheduler/preempting_queue_scheduler_test.go
+++ b/internal/scheduler/preempting_queue_scheduler_test.go
@@ -57,7 +57,7 @@ func TestEvictOversubscribed(t *testing.T) {
 	require.NoError(t, err)
 
 	evictor :=
NewOversubscribedEvictor( - NewSchedulerJobRepositoryAdapter(jobDbTxn), + jobDbTxn, nodeDb) result, err := evictor.Evict(armadacontext.Background(), nodeDbTxn) require.NoError(t, err) @@ -1862,7 +1862,7 @@ func TestPreemptingQueueScheduler(t *testing.T) { constraints, testfixtures.TestEmptyFloatingResources, tc.SchedulingConfig.ProtectedFractionOfFairShare, - NewSchedulerJobRepositoryAdapter(jobDbTxn), + jobDbTxn, nodeDb, nodeIdByJobId, jobIdsByGangId, @@ -2209,7 +2209,7 @@ func BenchmarkPreemptingQueueScheduler(b *testing.B) { constraints, testfixtures.TestEmptyFloatingResources, tc.SchedulingConfig.ProtectedFractionOfFairShare, - NewSchedulerJobRepositoryAdapter(jobDbTxn), + jobDbTxn, nodeDb, nil, nil, @@ -2268,7 +2268,7 @@ func BenchmarkPreemptingQueueScheduler(b *testing.B) { constraints, testfixtures.TestEmptyFloatingResources, tc.SchedulingConfig.ProtectedFractionOfFairShare, - NewSchedulerJobRepositoryAdapter(jobDbTxn), + jobDbTxn, nodeDb, nil, nil, diff --git a/internal/scheduler/queue_scheduler.go b/internal/scheduler/queue_scheduler.go index b2e9d2f8916..9cac41665bf 100644 --- a/internal/scheduler/queue_scheduler.go +++ b/internal/scheduler/queue_scheduler.go @@ -32,7 +32,7 @@ func NewQueueScheduler( constraints schedulerconstraints.SchedulingConstraints, floatingResourceTypes *floatingresources.FloatingResourceTypes, nodeDb *nodedb.NodeDb, - jobIteratorByQueue map[string]JobIterator, + jobIteratorByQueue map[string]JobContextIterator, ) (*QueueScheduler, error) { for queue := range jobIteratorByQueue { if _, ok := sctx.QueueSchedulingContexts[queue]; !ok { @@ -219,7 +219,7 @@ func (sch *QueueScheduler) Schedule(ctx *armadacontext.Context) (*schedulerresul // Jobs without gangIdAnnotation are considered gangs of cardinality 1. type QueuedGangIterator struct { schedulingContext *schedulercontext.SchedulingContext - queuedJobsIterator JobIterator + queuedJobsIterator JobContextIterator // Groups jctxs by the gang they belong to. jctxsByGangId map[string][]*schedulercontext.JobSchedulingContext // Maximum number of jobs to look at before giving up. 
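The renames above are mechanical, but this is where the two iterator layers meet: a QueuedGangIterator drains a JobContextIterator and buffers job scheduling contexts by gang id, treating jobs without a gangIdAnnotation as gangs of cardinality one. A simplified sketch of that buffering idea, written against the interfaces in this patch (collectGangs and gangInfo are illustrative names, not the actual implementation, which additionally enforces maxLookback and schedulability checks):

// collectGangs pulls job contexts one at a time and only yields a gang once
// all of its members have been seen. Incomplete gangs stay buffered.
func collectGangs(it JobContextIterator, gangInfo func(*schedulercontext.JobSchedulingContext) (string, int)) ([][]*schedulercontext.JobSchedulingContext, error) {
	byGangId := make(map[string][]*schedulercontext.JobSchedulingContext)
	var gangs [][]*schedulercontext.JobSchedulingContext
	for {
		jctx, err := it.Next()
		if err != nil {
			return nil, err
		}
		if jctx == nil {
			return gangs, nil // iterator exhausted
		}
		gangId, cardinality := gangInfo(jctx)
		if cardinality <= 1 {
			// Jobs without a gang annotation are gangs of cardinality 1.
			gangs = append(gangs, []*schedulercontext.JobSchedulingContext{jctx})
			continue
		}
		byGangId[gangId] = append(byGangId[gangId], jctx)
		if len(byGangId[gangId]) == cardinality {
			gangs = append(gangs, byGangId[gangId])
			delete(byGangId, gangId)
		}
	}
}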
@@ -231,7 +231,7 @@ type QueuedGangIterator struct { next *schedulercontext.GangSchedulingContext } -func NewQueuedGangIterator(sctx *schedulercontext.SchedulingContext, it JobIterator, maxLookback uint, skipKnownUnschedulableJobs bool) *QueuedGangIterator { +func NewQueuedGangIterator(sctx *schedulercontext.SchedulingContext, it JobContextIterator, maxLookback uint, skipKnownUnschedulableJobs bool) *QueuedGangIterator { return &QueuedGangIterator{ schedulingContext: sctx, queuedJobsIterator: it, diff --git a/internal/scheduler/queue_scheduler_test.go b/internal/scheduler/queue_scheduler_test.go index 050fd2a97ee..17189fd0900 100644 --- a/internal/scheduler/queue_scheduler_test.go +++ b/internal/scheduler/queue_scheduler_test.go @@ -535,7 +535,7 @@ func TestQueueScheduler(t *testing.T) { require.NoError(t, err) } constraints := schedulerconstraints.NewSchedulingConstraints("pool", tc.TotalResources, tc.SchedulingConfig, tc.Queues, map[string]bool{}) - jobIteratorByQueue := make(map[string]JobIterator) + jobIteratorByQueue := make(map[string]JobContextIterator) for _, q := range tc.Queues { it := jobRepo.GetJobIterator(q.Name) jobIteratorByQueue[q.Name] = it diff --git a/internal/scheduler/scheduling_algo.go b/internal/scheduler/scheduling_algo.go index 176b3ec733f..3457af313f9 100644 --- a/internal/scheduler/scheduling_algo.go +++ b/internal/scheduler/scheduling_algo.go @@ -478,7 +478,7 @@ func (l *FairSchedulingAlgo) schedulePool( constraints, l.floatingResourceTypes, l.schedulingConfig.ProtectedFractionOfFairShare, - NewSchedulerJobRepositoryAdapter(fsctx.txn), + fsctx.txn, nodeDb, fsctx.nodeIdByJobId, fsctx.jobIdsByGangId, @@ -528,41 +528,6 @@ func (l *FairSchedulingAlgo) schedulePool( return result, sctx, nil } -// SchedulerJobRepositoryAdapter allows jobDb implement the JobRepository interface. -// TODO: Pass JobDb into the scheduler instead of using this shim to convert to a JobRepo. -type SchedulerJobRepositoryAdapter struct { - txn *jobdb.Txn -} - -func NewSchedulerJobRepositoryAdapter(txn *jobdb.Txn) *SchedulerJobRepositoryAdapter { - return &SchedulerJobRepositoryAdapter{ - txn: txn, - } -} - -// GetQueueJobIds is necessary to implement the JobRepository interface, which we need while transitioning from the old -// to new scheduler. -func (repo *SchedulerJobRepositoryAdapter) GetQueueJobIds(queue string) []string { - rv := make([]string, 0) - it := repo.txn.QueuedJobs(queue) - for v, _ := it.Next(); v != nil; v, _ = it.Next() { - rv = append(rv, v.Id()) - } - return rv -} - -// GetExistingJobsByIds is necessary to implement the JobRepository interface which we need while transitioning from the -// old to new scheduler. -func (repo *SchedulerJobRepositoryAdapter) GetExistingJobsByIds(ids []string) []*jobdb.Job { - rv := make([]*jobdb.Job, 0, len(ids)) - for _, id := range ids { - if job := repo.txn.GetById(id); job != nil { - rv = append(rv, job) - } - } - return rv -} - // populateNodeDb adds all the nodes and jobs associated with a particular pool to the nodeDb. 
 func (l *FairSchedulingAlgo) populateNodeDb(nodeDb *nodedb.NodeDb, jobs []*jobdb.Job, nodes []*schedulerobjects.Node) error {
 	txn := nodeDb.Txn(true)
diff --git a/internal/scheduler/simulator/simulator.go b/internal/scheduler/simulator/simulator.go
index 9f8c2e52584..93669e45186 100644
--- a/internal/scheduler/simulator/simulator.go
+++ b/internal/scheduler/simulator/simulator.go
@@ -513,7 +513,7 @@ func (s *Simulator) handleScheduleEvent(ctx *armadacontext.Context) error {
 			constraints,
 			s.floatingResourceTypes,
 			s.schedulingConfig.ProtectedFractionOfFairShare,
-			scheduler.NewSchedulerJobRepositoryAdapter(txn),
+			txn,
 			nodeDb,
 			// TODO: Necessary to support partial eviction.
 			nil,

From 977628ccd2e72efca4af7e1096451b09c544b892 Mon Sep 17 00:00:00 2001
From: Eleanor Pratt <101330560+eleanorpratt@users.noreply.github.com>
Date: Mon, 23 Sep 2024 11:46:09 +0100
Subject: [PATCH 2/3] Separate Job Spec to own table to improve performance
 (#3961)

Co-authored-by: Eleanor Pratt
---
 .../lookoutingesterv2/lookoutdb/insertion.go  | 114 ++++++++++++++++--
 .../lookoutdb/insertion_test.go               |  30 ++++-
 internal/lookoutv2/pruner/pruner.go           |   1 +
 internal/lookoutv2/pruner/pruner_test.go      |   1 +
 internal/lookoutv2/repository/getjobspec.go   |  11 +-
 .../lookoutv2/repository/getjobspec_test.go   |  79 ++++++++++++
 .../migrations/013_add_job_spec_table.sql     |   6 +
 7 files changed, 228 insertions(+), 14 deletions(-)
 create mode 100644 internal/lookoutv2/schema/migrations/013_add_job_spec_table.sql

diff --git a/internal/lookoutingesterv2/lookoutdb/insertion.go b/internal/lookoutingesterv2/lookoutdb/insertion.go
index 481c888bf45..f65bf774180 100644
--- a/internal/lookoutingesterv2/lookoutdb/insertion.go
+++ b/internal/lookoutingesterv2/lookoutdb/insertion.go
@@ -52,7 +52,18 @@ func (l *LookoutDb) Store(ctx *armadacontext.Context, instructions *model.Instru
 	start := time.Now()
 
 	// Jobs need to be ingested first as other updates may reference these
-	l.CreateJobs(ctx, instructions.JobsToCreate)
+	wgJobIngestion := sync.WaitGroup{}
+	wgJobIngestion.Add(2)
+	go func() {
+		defer wgJobIngestion.Done()
+		l.CreateJobs(ctx, instructions.JobsToCreate)
+	}()
+	go func() {
+		defer wgJobIngestion.Done()
+		l.CreateJobSpecs(ctx, instructions.JobsToCreate)
+	}()
+
+	wgJobIngestion.Wait()
 
 	// Now we can insert job updates, annotations and new job runs
 	wg := sync.WaitGroup{}
@@ -98,6 +109,22 @@ func (l *LookoutDb) CreateJobs(ctx *armadacontext.Context, instructions []*model
 	log.Infof("Inserted %d jobs in %s", len(instructions), taken)
 }
 
+func (l *LookoutDb) CreateJobSpecs(ctx *armadacontext.Context, instructions []*model.CreateJobInstruction) {
+	if len(instructions) == 0 {
+		return
+	}
+	start := time.Now()
+	err := l.CreateJobSpecsBatch(ctx, instructions)
+	if err != nil {
+		log.WithError(err).Warn("Creating job specs via batch failed, will attempt to insert serially (this might be slow).")
+		l.CreateJobSpecsScalar(ctx, instructions)
+	}
+	taken := time.Since(start)
+	l.metrics.RecordAvRowChangeTimeByOperation("job_spec", commonmetrics.DBOperationInsert, len(instructions), taken)
+	l.metrics.RecordRowsChange("job_spec", commonmetrics.DBOperationInsert, len(instructions))
+	log.Infof("Inserted %d job specs in %s", len(instructions), taken)
+}
+
 func (l *LookoutDb) UpdateJobs(ctx *armadacontext.Context, instructions []*model.UpdateJobInstruction) {
 	if len(instructions) == 0 {
 		return
@@ -185,7 +212,6 @@ func (l *LookoutDb) CreateJobsBatch(ctx *armadacontext.Context, instructions []*
 			state                         smallint,
 			last_transition_time          timestamp,
 			last_transition_time_seconds
bigint, - job_spec bytea, priority_class varchar(63), annotations jsonb ) ON COMMIT DROP;`, tmpTable)) @@ -213,7 +239,6 @@ func (l *LookoutDb) CreateJobsBatch(ctx *armadacontext.Context, instructions []* "state", "last_transition_time", "last_transition_time_seconds", - "job_spec", "priority_class", "annotations", }, @@ -233,7 +258,6 @@ func (l *LookoutDb) CreateJobsBatch(ctx *armadacontext.Context, instructions []* instructions[i].State, instructions[i].LastTransitionTime, instructions[i].LastTransitionTimeSeconds, - instructions[i].JobProto, instructions[i].PriorityClass, instructions[i].Annotations, }, nil @@ -261,7 +285,6 @@ func (l *LookoutDb) CreateJobsBatch(ctx *armadacontext.Context, instructions []* state, last_transition_time, last_transition_time_seconds, - job_spec, priority_class, annotations ) SELECT * from %s @@ -294,11 +317,10 @@ func (l *LookoutDb) CreateJobsScalar(ctx *armadacontext.Context, instructions [] state, last_transition_time, last_transition_time_seconds, - job_spec, priority_class, annotations ) - VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16) ON CONFLICT DO NOTHING` for _, i := range instructions { err := l.withDatabaseRetryInsert(func() error { @@ -317,7 +339,6 @@ func (l *LookoutDb) CreateJobsScalar(ctx *armadacontext.Context, instructions [] i.State, i.LastTransitionTime, i.LastTransitionTimeSeconds, - i.JobProto, i.PriorityClass, i.Annotations, ) @@ -446,6 +467,83 @@ func (l *LookoutDb) UpdateJobsScalar(ctx *armadacontext.Context, instructions [] } } +func (l *LookoutDb) CreateJobSpecsBatch(ctx *armadacontext.Context, instructions []*model.CreateJobInstruction) error { + return l.withDatabaseRetryInsert(func() error { + tmpTable := "job_spec_create_tmp" + + createTmp := func(tx pgx.Tx) error { + _, err := tx.Exec(ctx, fmt.Sprintf(` + CREATE TEMPORARY TABLE %s ( + job_id varchar(32), + job_spec bytea + ) ON COMMIT DROP;`, tmpTable)) + if err != nil { + l.metrics.RecordDBError(commonmetrics.DBOperationCreateTempTable) + } + return err + } + + insertTmp := func(tx pgx.Tx) error { + _, err := tx.CopyFrom(ctx, + pgx.Identifier{tmpTable}, + []string{ + "job_id", + "job_spec", + }, + pgx.CopyFromSlice(len(instructions), func(i int) ([]interface{}, error) { + return []interface{}{ + instructions[i].JobId, + instructions[i].JobProto, + }, nil + }), + ) + return err + } + + copyToDest := func(tx pgx.Tx) error { + _, err := tx.Exec( + ctx, + fmt.Sprintf(` + INSERT INTO job_spec ( + job_id, + job_spec + ) SELECT * from %s + ON CONFLICT DO NOTHING`, tmpTable), + ) + if err != nil { + l.metrics.RecordDBError(commonmetrics.DBOperationInsert) + } + return err + } + + return batchInsert(ctx, l.db, createTmp, insertTmp, copyToDest) + }) +} + +func (l *LookoutDb) CreateJobSpecsScalar(ctx *armadacontext.Context, instructions []*model.CreateJobInstruction) { + sqlStatement := `INSERT INTO job_spec ( + job_id, + job_spec + ) + VALUES ($1, $2) + ON CONFLICT DO NOTHING` + for _, i := range instructions { + err := l.withDatabaseRetryInsert(func() error { + _, err := l.db.Exec(ctx, sqlStatement, + i.JobId, + i.JobProto, + ) + if err != nil { + l.metrics.RecordDBError(commonmetrics.DBOperationInsert) + } + return err + }) + if err != nil { + log.WithError(err).Warnf("Create job spec for job %s, jobset %s failed", i.JobId, i.JobSet) + } + } +} + func (l *LookoutDb) CreateJobRunsBatch(ctx *armadacontext.Context, instructions []*model.CreateJobRunInstruction) error { 
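	// Like CreateJobSpecsBatch above, this uses the temp-table pattern:
	// COPY the rows into a session-local temporary table, then
	// INSERT ... SELECT ... ON CONFLICT DO NOTHING into job_run, so that a
	// retried batch cannot insert duplicate rows.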
return l.withDatabaseRetryInsert(func() error { tmpTable := "job_run_create_tmp" diff --git a/internal/lookoutingesterv2/lookoutdb/insertion_test.go b/internal/lookoutingesterv2/lookoutdb/insertion_test.go index 0806c841c83..5f20ad259cf 100644 --- a/internal/lookoutingesterv2/lookoutdb/insertion_test.go +++ b/internal/lookoutingesterv2/lookoutdb/insertion_test.go @@ -79,6 +79,11 @@ type JobRow struct { Annotations map[string]string } +type JobSpecRow struct { + JobId string + JobProto []byte +} + type JobRunRow struct { RunId string JobId string @@ -147,7 +152,7 @@ var expectedJobAfterSubmit = JobRow{ State: lookout.JobQueuedOrdinal, LastTransitionTime: baseTime, LastTransitionTimeSeconds: baseTime.Unix(), - JobProto: []byte(jobProto), + JobProto: []byte(nil), Duplicate: false, PriorityClass: priorityClass, Annotations: annotations, @@ -167,7 +172,7 @@ var expectedJobAfterUpdate = JobRow{ State: lookout.JobFailedOrdinal, LastTransitionTime: updateTime, LastTransitionTimeSeconds: updateTime.Unix(), - JobProto: []byte(jobProto), + JobProto: []byte(nil), Duplicate: false, PriorityClass: priorityClass, Annotations: annotations, @@ -841,10 +846,10 @@ func TestStoreNullValue(t *testing.T) { err := ldb.Store(armadacontext.Background(), instructions) assert.NoError(t, err) - job := getJob(t, ldb.db, jobIdString) + jobSpec := getJobSpec(t, ldb.db, jobIdString) jobRun := getJobRun(t, ldb.db, runIdString) - assert.Equal(t, jobProto, job.JobProto) + assert.Equal(t, jobProto, jobSpec.JobProto) assert.Equal(t, errorMsg, jobRun.Error) assert.Equal(t, debugMsg, jobRun.Debug) return nil @@ -988,6 +993,23 @@ func getJob(t *testing.T, db *pgxpool.Pool, jobId string) JobRow { return job } +func getJobSpec(t *testing.T, db *pgxpool.Pool, jobId string) JobSpecRow { + jobSpec := JobSpecRow{} + r := db.QueryRow( + armadacontext.Background(), + `SELECT + job_id, + job_spec + FROM job_spec WHERE job_id = $1`, + jobId) + err := r.Scan( + &jobSpec.JobId, + &jobSpec.JobProto, + ) + assert.Nil(t, err) + return jobSpec +} + func getJobRun(t *testing.T, db *pgxpool.Pool, runId string) JobRunRow { run := JobRunRow{} r := db.QueryRow( diff --git a/internal/lookoutv2/pruner/pruner.go b/internal/lookoutv2/pruner/pruner.go index 6ece3dd3962..2b87e90d5b6 100644 --- a/internal/lookoutv2/pruner/pruner.go +++ b/internal/lookoutv2/pruner/pruner.go @@ -132,6 +132,7 @@ func deleteBatch(ctx *armadacontext.Context, tx pgx.Tx, batchLimit int) (int, er } _, err = tx.Exec(ctx, ` DELETE FROM job WHERE job_id in (SELECT job_id from batch); + DELETE FROM job_spec WHERE job_id in (SELECT job_id from batch); DELETE FROM job_run WHERE job_id in (SELECT job_id from batch); DELETE FROM job_ids_to_delete WHERE job_id in (SELECT job_id from batch); TRUNCATE TABLE batch;`) diff --git a/internal/lookoutv2/pruner/pruner_test.go b/internal/lookoutv2/pruner/pruner_test.go index 3fae15ac9b2..bf34c998b2b 100644 --- a/internal/lookoutv2/pruner/pruner_test.go +++ b/internal/lookoutv2/pruner/pruner_test.go @@ -145,6 +145,7 @@ func TestPruneDb(t *testing.T) { queriedJobIdsPerTable := []map[string]bool{ selectStringSet(t, db, "SELECT job_id FROM job"), + selectStringSet(t, db, "SELECT job_id FROM job_spec"), selectStringSet(t, db, "SELECT DISTINCT job_id FROM job_run"), } for _, queriedJobs := range queriedJobIdsPerTable { diff --git a/internal/lookoutv2/repository/getjobspec.go b/internal/lookoutv2/repository/getjobspec.go index 55799249f35..8e51fe89871 100644 --- a/internal/lookoutv2/repository/getjobspec.go +++ b/internal/lookoutv2/repository/getjobspec.go 
@@ -30,10 +30,17 @@ func NewSqlGetJobSpecRepository(db *pgxpool.Pool, decompressor compress.Decompre func (r *SqlGetJobSpecRepository) GetJobSpec(ctx *armadacontext.Context, jobId string) (*api.Job, error) { var rawBytes []byte - err := r.db.QueryRow(ctx, "SELECT job_spec FROM job WHERE job_id = $1", jobId).Scan(&rawBytes) + + err := r.db.QueryRow( + ctx, ` + SELECT + COALESCE(job_spec.job_spec, job.job_spec) + FROM job LEFT JOIN job_spec + ON job.job_id = job_spec.job_id + WHERE job.job_id = $1`, jobId).Scan(&rawBytes) if err != nil { if err == pgx.ErrNoRows { - return nil, errors.Errorf("job with id %s not found", jobId) + return nil, errors.Errorf("job_spec with job id %s not found", jobId) } return nil, err } diff --git a/internal/lookoutv2/repository/getjobspec_test.go b/internal/lookoutv2/repository/getjobspec_test.go index 62e619dbf41..7eeb439905b 100644 --- a/internal/lookoutv2/repository/getjobspec_test.go +++ b/internal/lookoutv2/repository/getjobspec_test.go @@ -3,6 +3,7 @@ package repository import ( "testing" + "github.com/gogo/protobuf/proto" "github.com/jackc/pgx/v5/pgxpool" "github.com/stretchr/testify/assert" @@ -50,6 +51,84 @@ func TestGetJobSpec(t *testing.T) { assert.NoError(t, err) } +func TestMIGRATEDGetJobSpec(t *testing.T) { + var migratedResult *api.Job + err := lookout.WithLookoutDb(func(db *pgxpool.Pool) error { + converter := instructions.NewInstructionConverter(metrics.Get().Metrics, userAnnotationPrefix, &compress.NoOpCompressor{}) + store := lookoutdb.NewLookoutDb(db, nil, metrics.Get(), 10) + + _ = NewJobSimulator(converter, store). + Submit(queue, jobSet, owner, namespace, baseTime, &JobOptions{ + JobId: jobId, + Priority: priority, + PriorityClass: "other-default", + Cpu: cpu, + Memory: memory, + EphemeralStorage: ephemeralStorage, + Gpu: gpu, + Annotations: map[string]string{ + "step_path": "/1/2/3", + "hello": "world", + }, + }). + Pending(runId, cluster, baseTime). + Running(runId, node, baseTime). + RunSucceeded(runId, baseTime). + Succeeded(baseTime). + Build(). 
+ ApiJob() + + repo := NewSqlGetJobSpecRepository(db, &compress.NoOpDecompressor{}) + var err error + migratedResult, err = repo.GetJobSpec(armadacontext.TODO(), jobId) + assert.NoError(t, err) + return nil + }) + assert.NoError(t, err) + + var result *api.Job + err = lookout.WithLookoutDb(func(db *pgxpool.Pool) error { + bytes, err := proto.Marshal(migratedResult) + assert.NoError(t, err) + + _, err = db.Exec(armadacontext.Background(), + `INSERT INTO job ( + job_id, queue, owner, namespace, jobset, + cpu, + memory, + ephemeral_storage, + gpu, + priority, + submitted, + state, + last_transition_time, + last_transition_time_seconds, + job_spec, + priority_class, + annotations + ) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17) + ON CONFLICT DO NOTHING`, + jobId, queue, owner, namespace, jobSet, + int64(15), int64(48*1024*1024*1024), int64(100*1024*1024*1024), 8, + priority, baseTime, 1, baseTime, baseTime.Unix(), bytes, "other-default", + map[string]string{ + "step_path": "/1/2/3", + "hello": "world", + }) + assert.NoError(t, err) + + repo := NewSqlGetJobSpecRepository(db, &compress.NoOpDecompressor{}) + result, err = repo.GetJobSpec(armadacontext.TODO(), jobId) + assert.NoError(t, err) + + return nil + }) + assert.NoError(t, err) + + assertApiJobsEquivalent(t, migratedResult, result) +} + func TestGetJobSpecError(t *testing.T) { err := lookout.WithLookoutDb(func(db *pgxpool.Pool) error { repo := NewSqlGetJobSpecRepository(db, &compress.NoOpDecompressor{}) diff --git a/internal/lookoutv2/schema/migrations/013_add_job_spec_table.sql b/internal/lookoutv2/schema/migrations/013_add_job_spec_table.sql new file mode 100644 index 00000000000..36f3fef6fd7 --- /dev/null +++ b/internal/lookoutv2/schema/migrations/013_add_job_spec_table.sql @@ -0,0 +1,6 @@ +CREATE TABLE IF NOT EXISTS job_spec ( + job_id varchar(32) NOT NULL PRIMARY KEY, + job_spec bytea NOT NULL + ); +ALTER TABLE job_spec ALTER COLUMN job_spec SET STORAGE EXTERNAL; +ALTER TABLE job ALTER COLUMN job_spec DROP NOT NULL; From c3d5faf5b3a9e09e214c41d51cf342c809353f23 Mon Sep 17 00:00:00 2001 From: Chris Martin Date: Mon, 23 Sep 2024 12:23:24 +0100 Subject: [PATCH 3/3] Remove assertions from preempting_queue_scheduler.go (#3959) * Remove assertions from preempting_queue_scheduler.go Signed-off-by: Chris Martin * Remove NodeJobDiff + iterators that are now unused Signed-off-by: JamesMurkin --------- Signed-off-by: Chris Martin Signed-off-by: JamesMurkin Co-authored-by: JamesMurkin --- internal/scheduler/nodedb/nodedb.go | 41 -------- internal/scheduler/nodedb/nodeiteration.go | 78 --------------- .../scheduler/nodedb/nodeiteration_test.go | 49 ---------- .../scheduler/preempting_queue_scheduler.go | 95 ------------------- .../preempting_queue_scheduler_test.go | 1 - internal/scheduler/scheduling_algo.go | 3 - 6 files changed, 267 deletions(-) diff --git a/internal/scheduler/nodedb/nodedb.go b/internal/scheduler/nodedb/nodedb.go index d0d8a9b90d3..ff7d12fcfa7 100644 --- a/internal/scheduler/nodedb/nodedb.go +++ b/internal/scheduler/nodedb/nodedb.go @@ -401,47 +401,6 @@ func (nodeDb *NodeDb) GetNodeWithTxn(txn *memdb.Txn, id string) (*internaltypes. return obj.(*internaltypes.Node), nil } -// NodeJobDiff compares two snapshots of the NodeDb memdb and returns -// - a map from job ids of all preempted jobs to the node they used to be on -// - a map from job ids of all scheduled jobs to the node they were scheduled on -// that happened between the two snapshots. 
-func NodeJobDiff(txnA, txnB *memdb.Txn) (map[string]*internaltypes.Node, map[string]*internaltypes.Node, error) { - preempted := make(map[string]*internaltypes.Node) - scheduled := make(map[string]*internaltypes.Node) - nodePairIterator, err := NewNodePairIterator(txnA, txnB) - if err != nil { - return nil, nil, err - } - for item := nodePairIterator.NextItem(); item != nil; item = nodePairIterator.NextItem() { - if item.NodeA != nil && item.NodeB == nil { - // NodeA was removed. All jobs on NodeA are preempted. - for jobId := range item.NodeA.AllocatedByJobId { - preempted[jobId] = item.NodeA - } - } else if item.NodeA == nil && item.NodeB != nil { - // NodeB was added. All jobs on NodeB are scheduled. - for jobId := range item.NodeB.AllocatedByJobId { - scheduled[jobId] = item.NodeB - } - } else if item.NodeA != nil && item.NodeB != nil { - // NodeA is the same as NodeB. - // Jobs on NodeA that are not on NodeB are preempted. - // Jobs on NodeB that are not on NodeA are scheduled. - for jobId := range item.NodeA.AllocatedByJobId { - if _, ok := item.NodeB.AllocatedByJobId[jobId]; !ok { - preempted[jobId] = item.NodeA - } - } - for jobId := range item.NodeB.AllocatedByJobId { - if _, ok := item.NodeA.AllocatedByJobId[jobId]; !ok { - scheduled[jobId] = item.NodeB - } - } - } - } - return preempted, scheduled, nil -} - func (nodeDb *NodeDb) ScheduleManyWithTxn(txn *memdb.Txn, gctx *schedulercontext.GangSchedulingContext) (bool, error) { // Attempt to schedule pods one by one in a transaction. for _, jctx := range gctx.JobSchedulingContexts { diff --git a/internal/scheduler/nodedb/nodeiteration.go b/internal/scheduler/nodedb/nodeiteration.go index 77a0cae80dc..3e171007139 100644 --- a/internal/scheduler/nodedb/nodeiteration.go +++ b/internal/scheduler/nodedb/nodeiteration.go @@ -47,84 +47,6 @@ func (it *NodesIterator) Next() interface{} { return it.NextNode() } -type NodePairIterator struct { - itA *NodesIterator - itB *NodesIterator - nodeA *internaltypes.Node - nodeB *internaltypes.Node -} - -type NodePairIteratorItem struct { - NodeA *internaltypes.Node - NodeB *internaltypes.Node -} - -func NewNodePairIterator(txnA, txnB *memdb.Txn) (*NodePairIterator, error) { - itA, err := NewNodesIterator(txnA) - if err != nil { - return nil, errors.WithStack(err) - } - itB, err := NewNodesIterator(txnB) - if err != nil { - return nil, errors.WithStack(err) - } - return &NodePairIterator{ - itA: itA, - itB: itB, - }, nil -} - -func (it *NodePairIterator) WatchCh() <-chan struct{} { - panic("not implemented") -} - -func (it *NodePairIterator) NextItem() (rv *NodePairIteratorItem) { - defer func() { - if rv == nil { - return - } - if rv.NodeA != nil { - it.nodeA = nil - } - if rv.NodeB != nil { - it.nodeB = nil - } - }() - if it.nodeA == nil { - it.nodeA = it.itA.NextNode() - } - if it.nodeB == nil { - it.nodeB = it.itB.NextNode() - } - if it.nodeA == nil && it.nodeB == nil { - return nil - } else if it.nodeA == nil || it.nodeB == nil { - return &NodePairIteratorItem{ - NodeA: it.nodeA, - NodeB: it.nodeB, - } - } - cmp := bytes.Compare([]byte(it.nodeA.GetId()), []byte(it.nodeB.GetId())) - if cmp == 0 { - return &NodePairIteratorItem{ - NodeA: it.nodeA, - NodeB: it.nodeB, - } - } else if cmp == -1 { - return &NodePairIteratorItem{ - NodeA: it.nodeA, - } - } else { - return &NodePairIteratorItem{ - NodeB: it.nodeB, - } - } -} - -func (it *NodePairIterator) Next() interface{} { - return it.NextItem() -} - // NodeIndex is an index for internaltypes.Node that returns node.NodeDbKeys[KeyIndex]. 
type NodeIndex struct { KeyIndex int diff --git a/internal/scheduler/nodedb/nodeiteration_test.go b/internal/scheduler/nodedb/nodeiteration_test.go index 2355a81b563..53580e8489f 100644 --- a/internal/scheduler/nodedb/nodeiteration_test.go +++ b/internal/scheduler/nodedb/nodeiteration_test.go @@ -71,55 +71,6 @@ func TestNodesIterator(t *testing.T) { } } -func TestNodePairIterator(t *testing.T) { - nodes := testfixtures.TestCluster() - for i, nodeId := range []string{"A", "B", "C"} { - nodes[i].Id = nodeId - } - nodeDb, err := newNodeDbWithNodes(nodes) - require.NoError(t, err) - entries := make([]*internaltypes.Node, len(nodes)) - for i, node := range nodes { - entry, err := nodeDb.GetNode(node.Id) - require.NoError(t, err) - entries[i] = entry - } - - txn := nodeDb.Txn(true) - require.NoError(t, txn.Delete("nodes", entries[2])) - txn.Commit() - txnA := nodeDb.Txn(false) - - txn = nodeDb.Txn(true) - require.NoError(t, txn.Delete("nodes", entries[0])) - require.NoError(t, txn.Insert("nodes", entries[2])) - txn.Commit() - txnB := nodeDb.Txn(false) - - it, err := NewNodePairIterator(txnA, txnB) - require.NoError(t, err) - - actual := make([]*NodePairIteratorItem, 0) - for item := it.NextItem(); item != nil; item = it.NextItem() { - actual = append(actual, item) - } - expected := []*NodePairIteratorItem{ - { - NodeA: entries[0], - NodeB: nil, - }, - { - NodeA: entries[1], - NodeB: entries[1], - }, - { - NodeA: nil, - NodeB: entries[2], - }, - } - assert.Equal(t, expected, actual) -} - func TestNodeTypeIterator(t *testing.T) { const nodeTypeALabel = "a" const nodeTypeBLabel = "b" diff --git a/internal/scheduler/preempting_queue_scheduler.go b/internal/scheduler/preempting_queue_scheduler.go index 695e728957b..fc234397fa4 100644 --- a/internal/scheduler/preempting_queue_scheduler.go +++ b/internal/scheduler/preempting_queue_scheduler.go @@ -44,8 +44,6 @@ type PreemptingQueueScheduler struct { gangIdByJobId map[string]string // If true, the unsuccessfulSchedulingKeys check of gangScheduler is omitted. skipUnsuccessfulSchedulingKeyCheck bool - // If true, asserts that the nodeDb state is consistent with expected changes. - enableAssertions bool } func NewPreemptingQueueScheduler( @@ -85,10 +83,6 @@ func NewPreemptingQueueScheduler( } } -func (sch *PreemptingQueueScheduler) EnableAssertions() { - sch.enableAssertions = true -} - func (sch *PreemptingQueueScheduler) SkipUnsuccessfulSchedulingKeyCheck() { sch.skipUnsuccessfulSchedulingKeyCheck = true } @@ -104,10 +98,6 @@ func (sch *PreemptingQueueScheduler) Schedule(ctx *armadacontext.Context) (*sche preemptedJobsById := make(map[string]*schedulercontext.JobSchedulingContext) scheduledJobsById := make(map[string]*schedulercontext.JobSchedulingContext) - // NodeDb snapshot prior to making any changes. - // We compare against this snapshot after scheduling to detect changes. - snapshot := sch.nodeDb.Txn(false) - // Evict preemptible jobs. ctx.WithField("stage", "scheduling-algo").Infof("Evicting preemptible jobs") evictorResult, inMemoryJobRepo, err := sch.evict( @@ -240,19 +230,6 @@ func (sch *PreemptingQueueScheduler) Schedule(ctx *armadacontext.Context) (*sche schedulercontext.PrintJobSummary(ctx, "Scheduling new jobs;", scheduledJobs) // TODO: Show failed jobs. 
- if sch.enableAssertions { - ctx.WithField("stage", "scheduling-algo").Infof("Performing assertions after scheduling round") - err := sch.assertions( - snapshot, - preemptedJobsById, - scheduledJobsById, - sch.nodeIdByJobId, - ) - if err != nil { - return nil, err - } - ctx.WithField("stage", "scheduling-algo").Infof("Finished running assertions after scheduling round") - } return &schedulerresult.SchedulerResult{ PreemptedJobs: preemptedJobs, ScheduledJobs: scheduledJobs, @@ -612,78 +589,6 @@ func (sch *PreemptingQueueScheduler) updateGangAccounting(preempted []*scheduler return nil } -// For each node in the NodeDb, compare assigned jobs relative to the initial snapshot. -// Jobs no longer assigned to a node are preemtped. -// Jobs assigned to a node that weren't earlier are scheduled. -// -// Compare the nodedb.NodeJobDiff with expected preempted/scheduled jobs to ensure NodeDb is consistent. -// This is only to validate that nothing unexpected happened during scheduling. -func (sch *PreemptingQueueScheduler) assertions( - snapshot *memdb.Txn, - preemptedJobsById map[string]*schedulercontext.JobSchedulingContext, - scheduledJobsById map[string]*schedulercontext.JobSchedulingContext, - nodeIdByJobId map[string]string, -) error { - // Compare two snapshots of the nodeDb to find jobs that - // were preempted/scheduled between creating the snapshots. - preempted, scheduled, err := nodedb.NodeJobDiff(snapshot, sch.nodeDb.Txn(false)) - if err != nil { - return err - } - - // Assert that jobs we expect to be preempted/scheduled are marked as such in the nodeDb. - for jobId := range preemptedJobsById { - if _, ok := preempted[jobId]; !ok { - return errors.Errorf("inconsistent NodeDb: expected job %s to be preempted in nodeDb", jobId) - } - } - for jobId := range scheduledJobsById { - if _, ok := scheduled[jobId]; !ok { - return errors.Errorf("inconsistent NodeDb: expected job %s to be scheduled in nodeDb", jobId) - } - } - - // Assert that jobs marked as preempted (scheduled) in the nodeDb are expected to be preempted (scheduled), - // and that jobs are preempted/scheduled on the nodes we expect them to. 
- for jobId, node := range preempted { - if expectedNodeId, ok := nodeIdByJobId[jobId]; ok { - if expectedNodeId != node.GetId() { - return errors.Errorf( - "inconsistent NodeDb: expected job %s to be preempted from node %s, but got %s", - jobId, expectedNodeId, node.GetId(), - ) - } - } else { - return errors.Errorf( - "inconsistent NodeDb: expected job %s to be mapped to node %s, but found none", - jobId, node.GetId(), - ) - } - if _, ok := preemptedJobsById[jobId]; !ok { - return errors.Errorf("inconsistent NodeDb: didn't expect job %s to be preempted (job marked as preempted in NodeDb)", jobId) - } - } - for jobId, node := range scheduled { - if expectedNodeId, ok := nodeIdByJobId[jobId]; ok { - if expectedNodeId != node.GetId() { - return errors.Errorf( - "inconsistent NodeDb: expected job %s to be on node %s, but got %s", - jobId, expectedNodeId, node.GetId(), - ) - } - } else { - return errors.Errorf( - "inconsistent NodeDb: expected job %s to be mapped to node %s, but found none", - jobId, node.GetId(), - ) - } - if _, ok := scheduledJobsById[jobId]; !ok { - return errors.Errorf("inconsistent NodeDb: didn't expect job %s to be scheduled (job marked as scheduled in NodeDb)", jobId) - } - } - return nil -} - type Evictor struct { jobRepo JobRepository nodeDb *nodedb.NodeDb diff --git a/internal/scheduler/preempting_queue_scheduler_test.go b/internal/scheduler/preempting_queue_scheduler_test.go index c738b0c1512..f7acf329ca3 100644 --- a/internal/scheduler/preempting_queue_scheduler_test.go +++ b/internal/scheduler/preempting_queue_scheduler_test.go @@ -1868,7 +1868,6 @@ func TestPreemptingQueueScheduler(t *testing.T) { jobIdsByGangId, gangIdByJobId, ) - sch.EnableAssertions() result, err := sch.Schedule(ctx) require.NoError(t, err) diff --git a/internal/scheduler/scheduling_algo.go b/internal/scheduler/scheduling_algo.go index 3457af313f9..4b9941a7e18 100644 --- a/internal/scheduler/scheduling_algo.go +++ b/internal/scheduler/scheduling_algo.go @@ -487,9 +487,6 @@ func (l *FairSchedulingAlgo) schedulePool( if l.schedulingConfig.AlwaysAttemptScheduling { scheduler.SkipUnsuccessfulSchedulingKeyCheck() } - if l.schedulingConfig.EnableAssertions { - scheduler.EnableAssertions() - } result, err := scheduler.Schedule(ctx) if err != nil {
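Taken together, the series leaves call sites looking like the sketch below: patch 1 lets a *jobdb.Txn be passed wherever a JobRepository is needed (it now satisfies the interface directly via QueuedJobs and GetById, replacing NewSchedulerJobRepositoryAdapter), and patch 3 drops the EnableAssertions escape hatch along with the NodeJobDiff machinery behind it. Variable names are illustrative, taken from the tests above, and the argument list is abbreviated to the parameters visible in these diffs:

// Before this series: wrap the jobDb transaction and opt in to nodeDb assertions.
//
//	sch := NewPreemptingQueueScheduler(sctx, constraints, floatingResourceTypes,
//		protectedFractionOfFairShare, NewSchedulerJobRepositoryAdapter(jobDbTxn),
//		nodeDb, nodeIdByJobId, jobIdsByGangId, gangIdByJobId)
//	sch.EnableAssertions()

// After: pass the transaction straight through; EnableAssertions no longer exists.
sch := NewPreemptingQueueScheduler(sctx, constraints, floatingResourceTypes,
	protectedFractionOfFairShare, jobDbTxn,
	nodeDb, nodeIdByJobId, jobIdsByGangId, gangIdByJobId)
result, err := sch.Schedule(ctx)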