Skip to content

Commit

Permalink
Better Interface To JobRepository (#3958)
Browse files Browse the repository at this point in the history
* wip

Signed-off-by: Chris Martin <chris@cmartinit.co.uk>

* wip

Signed-off-by: Chris Martin <chris@cmartinit.co.uk>

---------

Signed-off-by: Chris Martin <chris@cmartinit.co.uk>
  • Loading branch information
d80tb7 authored Sep 23, 2024
1 parent 72f71dc commit ac1f002
Show file tree
Hide file tree
Showing 9 changed files with 66 additions and 99 deletions.
7 changes: 6 additions & 1 deletion internal/scheduler/jobdb/jobdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,11 @@ import (
"github.com/armadaproject/armada/internal/scheduler/schedulerobjects"
)

type JobIterator interface {
Next() (*Job, bool)
Done() bool
}

var emptyList = immutable.NewSortedSet[*Job](JobPriorityComparer{})

type JobDb struct {
Expand Down Expand Up @@ -496,7 +501,7 @@ func (txn *Txn) HasQueuedJobs(queue string) bool {
}

// QueuedJobs returns true if the queue has any jobs in the running state or false otherwise
func (txn *Txn) QueuedJobs(queue string) *immutable.SortedSetIterator[*Job] {
func (txn *Txn) QueuedJobs(queue string) JobIterator {
jobQueue, ok := txn.jobsByQueue[queue]
if ok {
return jobQueue.Iterator()
Expand Down
30 changes: 13 additions & 17 deletions internal/scheduler/jobiteration.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,13 @@ import (
"github.com/armadaproject/armada/internal/scheduler/jobdb"
)

type JobIterator interface {
type JobContextIterator interface {
Next() (*schedulercontext.JobSchedulingContext, error)
}

type JobRepository interface {
GetQueueJobIds(queueName string) []string
GetExistingJobsByIds(ids []string) []*jobdb.Job
QueuedJobs(queueName string) jobdb.JobIterator
GetById(id string) *jobdb.Job
}

type InMemoryJobIterator struct {
Expand Down Expand Up @@ -97,25 +97,22 @@ func (repo *InMemoryJobRepository) GetExistingJobsByIds(jobIds []string) []*jobd
return rv
}

func (repo *InMemoryJobRepository) GetJobIterator(queue string) JobIterator {
func (repo *InMemoryJobRepository) GetJobIterator(queue string) JobContextIterator {
repo.mu.Lock()
defer repo.mu.Unlock()
return NewInMemoryJobIterator(slices.Clone(repo.jctxsByQueue[queue]))
}

// QueuedJobsIterator is an iterator over all jobs in a queue.
type QueuedJobsIterator struct {
repo JobRepository
jobIds []string
idx int
ctx *armadacontext.Context
jobIter jobdb.JobIterator
ctx *armadacontext.Context
}

func NewQueuedJobsIterator(ctx *armadacontext.Context, queue string, repo JobRepository) *QueuedJobsIterator {
return &QueuedJobsIterator{
jobIds: repo.GetQueueJobIds(queue),
repo: repo,
ctx: ctx,
jobIter: repo.QueuedJobs(queue),
ctx: ctx,
}
}

Expand All @@ -124,22 +121,21 @@ func (it *QueuedJobsIterator) Next() (*schedulercontext.JobSchedulingContext, er
case <-it.ctx.Done():
return nil, it.ctx.Err()
default:
if it.idx >= len(it.jobIds) {
job, _ := it.jobIter.Next()
if job == nil {
return nil, nil
}
job := it.repo.GetExistingJobsByIds([]string{it.jobIds[it.idx]})
it.idx++
return schedulercontext.JobSchedulingContextFromJob(job[0]), nil
return schedulercontext.JobSchedulingContextFromJob(job), nil
}
}

// MultiJobsIterator chains several JobIterators together in the order provided.
type MultiJobsIterator struct {
i int
its []JobIterator
its []JobContextIterator
}

func NewMultiJobsIterator(its ...JobIterator) *MultiJobsIterator {
func NewMultiJobsIterator(its ...JobContextIterator) *MultiJobsIterator {
return &MultiJobsIterator{
its: its,
}
Expand Down
62 changes: 30 additions & 32 deletions internal/scheduler/jobiteration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ func TestMultiJobsIterator_TwoQueues(t *testing.T) {
}

ctx := armadacontext.Background()
its := make([]JobIterator, 3)
its := make([]JobContextIterator, 3)
for i, queue := range []string{"A", "B", "C"} {
it := NewQueuedJobsIterator(ctx, queue, repo)
its[i] = it
Expand Down Expand Up @@ -214,20 +214,43 @@ func TestCreateQueuedJobsIterator_NilOnEmpty(t *testing.T) {
assert.NoError(t, err)
}

// TODO: Deprecate in favour of InMemoryRepo.
type mockJobIterator struct {
jobs []*jobdb.Job
i int
}

func (iter *mockJobIterator) Done() bool {
return iter.i >= len(iter.jobs)
}

func (iter *mockJobIterator) Next() (*jobdb.Job, bool) {
if iter.Done() {
return nil, false
}
job := iter.jobs[iter.i]
iter.i++
return job, true
}

type mockJobRepository struct {
jobsByQueue map[string][]*jobdb.Job
jobsById map[string]*jobdb.Job
// Ids of all jobs hat were leased to an executor.
leasedJobs map[string]bool
getQueueJobIdsDelay time.Duration
}

func (repo *mockJobRepository) QueuedJobs(queueName string) jobdb.JobIterator {
q := repo.jobsByQueue[queueName]
return &mockJobIterator{jobs: q}
}

func (repo *mockJobRepository) GetById(id string) *jobdb.Job {
j, _ := repo.jobsById[id]
return j
}

func newMockJobRepository() *mockJobRepository {
return &mockJobRepository{
jobsByQueue: make(map[string][]*jobdb.Job),
jobsById: make(map[string]*jobdb.Job),
leasedJobs: make(map[string]bool),
}
}

Expand All @@ -242,35 +265,10 @@ func (repo *mockJobRepository) Enqueue(job *jobdb.Job) {
repo.jobsById[job.Id()] = job
}

func (repo *mockJobRepository) GetJobIterator(ctx *armadacontext.Context, queue string) JobIterator {
func (repo *mockJobRepository) GetJobIterator(ctx *armadacontext.Context, queue string) JobContextIterator {
return NewQueuedJobsIterator(ctx, queue, repo)
}

func (repo *mockJobRepository) GetQueueJobIds(queue string) []string {
time.Sleep(repo.getQueueJobIdsDelay)
if jobs, ok := repo.jobsByQueue[queue]; ok {
rv := make([]string, 0, len(jobs))
for _, job := range jobs {
if !repo.leasedJobs[job.Id()] {
rv = append(rv, job.Id())
}
}
return rv
} else {
return make([]string, 0)
}
}

func (repo *mockJobRepository) GetExistingJobsByIds(jobIds []string) []*jobdb.Job {
rv := make([]*jobdb.Job, len(jobIds))
for i, jobId := range jobIds {
if job, ok := repo.jobsById[jobId]; ok {
rv[i] = job
}
}
return rv
}

func jobFromPodSpec(queue string, req *schedulerobjects.PodRequirements) *jobdb.Job {
return testfixtures.TestJob(queue, util.ULID(), "armada-default", req)
}
11 changes: 7 additions & 4 deletions internal/scheduler/preempting_queue_scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -525,7 +525,7 @@ func addEvictedJobsToNodeDb(_ *armadacontext.Context, sctx *schedulercontext.Sch
}

func (sch *PreemptingQueueScheduler) schedule(ctx *armadacontext.Context, inMemoryJobRepo *InMemoryJobRepository, jobRepo JobRepository) (*schedulerresult.SchedulerResult, error) {
jobIteratorByQueue := make(map[string]JobIterator)
jobIteratorByQueue := make(map[string]JobContextIterator)
for _, qctx := range sch.schedulingContext.QueueSchedulingContexts {
evictedIt := inMemoryJobRepo.GetJobIterator(qctx.Queue)
if jobRepo == nil || reflect.ValueOf(jobRepo).IsNil() {
Expand Down Expand Up @@ -821,13 +821,16 @@ func (evi *Evictor) Evict(ctx *armadacontext.Context, nodeDbTxn *memdb.Txn) (*Ev
if evi.nodeFilter != nil && !evi.nodeFilter(ctx, node) {
continue
}
jobIds := make([]string, 0, len(node.AllocatedByJobId))
jobs := make([]*jobdb.Job, 0, len(node.AllocatedByJobId))
for jobId := range node.AllocatedByJobId {
if _, ok := node.EvictedJobRunIds[jobId]; !ok {
jobIds = append(jobIds, jobId)
job := evi.jobRepo.GetById(jobId)
if job != nil {
jobs = append(jobs, job)
}

}
}
jobs := evi.jobRepo.GetExistingJobsByIds(jobIds)
evictedJobs, node, err := evi.nodeDb.EvictJobsFromNode(jobFilter, jobs, node)
if err != nil {
return nil, err
Expand Down
8 changes: 4 additions & 4 deletions internal/scheduler/preempting_queue_scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func TestEvictOversubscribed(t *testing.T) {
require.NoError(t, err)

evictor := NewOversubscribedEvictor(
NewSchedulerJobRepositoryAdapter(jobDbTxn),
jobDbTxn,
nodeDb)
result, err := evictor.Evict(armadacontext.Background(), nodeDbTxn)
require.NoError(t, err)
Expand Down Expand Up @@ -1862,7 +1862,7 @@ func TestPreemptingQueueScheduler(t *testing.T) {
constraints,
testfixtures.TestEmptyFloatingResources,
tc.SchedulingConfig.ProtectedFractionOfFairShare,
NewSchedulerJobRepositoryAdapter(jobDbTxn),
jobDbTxn,
nodeDb,
nodeIdByJobId,
jobIdsByGangId,
Expand Down Expand Up @@ -2209,7 +2209,7 @@ func BenchmarkPreemptingQueueScheduler(b *testing.B) {
constraints,
testfixtures.TestEmptyFloatingResources,
tc.SchedulingConfig.ProtectedFractionOfFairShare,
NewSchedulerJobRepositoryAdapter(jobDbTxn),
jobDbTxn,
nodeDb,
nil,
nil,
Expand Down Expand Up @@ -2268,7 +2268,7 @@ func BenchmarkPreemptingQueueScheduler(b *testing.B) {
constraints,
testfixtures.TestEmptyFloatingResources,
tc.SchedulingConfig.ProtectedFractionOfFairShare,
NewSchedulerJobRepositoryAdapter(jobDbTxn),
jobDbTxn,
nodeDb,
nil,
nil,
Expand Down
6 changes: 3 additions & 3 deletions internal/scheduler/queue_scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ func NewQueueScheduler(
constraints schedulerconstraints.SchedulingConstraints,
floatingResourceTypes *floatingresources.FloatingResourceTypes,
nodeDb *nodedb.NodeDb,
jobIteratorByQueue map[string]JobIterator,
jobIteratorByQueue map[string]JobContextIterator,
) (*QueueScheduler, error) {
for queue := range jobIteratorByQueue {
if _, ok := sctx.QueueSchedulingContexts[queue]; !ok {
Expand Down Expand Up @@ -219,7 +219,7 @@ func (sch *QueueScheduler) Schedule(ctx *armadacontext.Context) (*schedulerresul
// Jobs without gangIdAnnotation are considered gangs of cardinality 1.
type QueuedGangIterator struct {
schedulingContext *schedulercontext.SchedulingContext
queuedJobsIterator JobIterator
queuedJobsIterator JobContextIterator
// Groups jctxs by the gang they belong to.
jctxsByGangId map[string][]*schedulercontext.JobSchedulingContext
// Maximum number of jobs to look at before giving up.
Expand All @@ -231,7 +231,7 @@ type QueuedGangIterator struct {
next *schedulercontext.GangSchedulingContext
}

func NewQueuedGangIterator(sctx *schedulercontext.SchedulingContext, it JobIterator, maxLookback uint, skipKnownUnschedulableJobs bool) *QueuedGangIterator {
func NewQueuedGangIterator(sctx *schedulercontext.SchedulingContext, it JobContextIterator, maxLookback uint, skipKnownUnschedulableJobs bool) *QueuedGangIterator {
return &QueuedGangIterator{
schedulingContext: sctx,
queuedJobsIterator: it,
Expand Down
2 changes: 1 addition & 1 deletion internal/scheduler/queue_scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -535,7 +535,7 @@ func TestQueueScheduler(t *testing.T) {
require.NoError(t, err)
}
constraints := schedulerconstraints.NewSchedulingConstraints("pool", tc.TotalResources, tc.SchedulingConfig, tc.Queues, map[string]bool{})
jobIteratorByQueue := make(map[string]JobIterator)
jobIteratorByQueue := make(map[string]JobContextIterator)
for _, q := range tc.Queues {
it := jobRepo.GetJobIterator(q.Name)
jobIteratorByQueue[q.Name] = it
Expand Down
37 changes: 1 addition & 36 deletions internal/scheduler/scheduling_algo.go
Original file line number Diff line number Diff line change
Expand Up @@ -478,7 +478,7 @@ func (l *FairSchedulingAlgo) schedulePool(
constraints,
l.floatingResourceTypes,
l.schedulingConfig.ProtectedFractionOfFairShare,
NewSchedulerJobRepositoryAdapter(fsctx.txn),
fsctx.txn,
nodeDb,
fsctx.nodeIdByJobId,
fsctx.jobIdsByGangId,
Expand Down Expand Up @@ -528,41 +528,6 @@ func (l *FairSchedulingAlgo) schedulePool(
return result, sctx, nil
}

// SchedulerJobRepositoryAdapter allows jobDb implement the JobRepository interface.
// TODO: Pass JobDb into the scheduler instead of using this shim to convert to a JobRepo.
type SchedulerJobRepositoryAdapter struct {
txn *jobdb.Txn
}

func NewSchedulerJobRepositoryAdapter(txn *jobdb.Txn) *SchedulerJobRepositoryAdapter {
return &SchedulerJobRepositoryAdapter{
txn: txn,
}
}

// GetQueueJobIds is necessary to implement the JobRepository interface, which we need while transitioning from the old
// to new scheduler.
func (repo *SchedulerJobRepositoryAdapter) GetQueueJobIds(queue string) []string {
rv := make([]string, 0)
it := repo.txn.QueuedJobs(queue)
for v, _ := it.Next(); v != nil; v, _ = it.Next() {
rv = append(rv, v.Id())
}
return rv
}

// GetExistingJobsByIds is necessary to implement the JobRepository interface which we need while transitioning from the
// old to new scheduler.
func (repo *SchedulerJobRepositoryAdapter) GetExistingJobsByIds(ids []string) []*jobdb.Job {
rv := make([]*jobdb.Job, 0, len(ids))
for _, id := range ids {
if job := repo.txn.GetById(id); job != nil {
rv = append(rv, job)
}
}
return rv
}

// populateNodeDb adds all the nodes and jobs associated with a particular pool to the nodeDb.
func (l *FairSchedulingAlgo) populateNodeDb(nodeDb *nodedb.NodeDb, jobs []*jobdb.Job, nodes []*schedulerobjects.Node) error {
txn := nodeDb.Txn(true)
Expand Down
2 changes: 1 addition & 1 deletion internal/scheduler/simulator/simulator.go
Original file line number Diff line number Diff line change
Expand Up @@ -513,7 +513,7 @@ func (s *Simulator) handleScheduleEvent(ctx *armadacontext.Context) error {
constraints,
nloatingResourceTypes,
s.schedulingConfig.ProtectedFractionOfFairShare,
scheduler.NewSchedulerJobRepositoryAdapter(txn),
txn,
nodeDb,
// TODO: Necessary to support partial eviction.
nil,
Expand Down

0 comments on commit ac1f002

Please sign in to comment.