Skip to content

Commit

Permalink
Implementing cancel/preempt rpcs on queue service
Browse files Browse the repository at this point in the history
  • Loading branch information
mustafai-gr committed Nov 7, 2024
1 parent 0f87b66 commit 4edccef
Show file tree
Hide file tree
Showing 11 changed files with 2,252 additions and 242 deletions.
120 changes: 120 additions & 0 deletions internal/scheduler/database/query.sql.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

69 changes: 69 additions & 0 deletions internal/scheduleringester/dbops.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"golang.org/x/exp/maps"

schedulerdb "github.com/armadaproject/armada/internal/scheduler/database"
"github.com/armadaproject/armada/pkg/controlplaneevents"
)

type Operation int
Expand Down Expand Up @@ -87,6 +88,18 @@ type CancelOnExecutor struct {
PriorityClasses []string
}

type PreemptOnQueue struct {
Name string
PriorityClasses []string
JobStates []controlplaneevents.ActiveJobState
}

type CancelOnQueue struct {
Name string
PriorityClasses []string
JobStates []controlplaneevents.ActiveJobState
}

// DbOperation captures a generic batch database operation.
//
// There are 5 types of operations:
Expand Down Expand Up @@ -185,6 +198,8 @@ type (
DeleteExecutorSettings map[string]*ExecutorSettingsDelete
PreemptExecutor map[string]*PreemptOnExecutor
CancelExecutor map[string]*CancelOnExecutor
PreemptQueue map[string]*PreemptOnQueue
CancelQueue map[string]*CancelOnQueue
)

type jobSetOperation interface {
Expand Down Expand Up @@ -337,6 +352,14 @@ func (ce CancelExecutor) Merge(_ DbOperation) bool {
return false
}

func (pq PreemptQueue) Merge(_ DbOperation) bool {
return false
}

func (cq CancelQueue) Merge(_ DbOperation) bool {
return false
}

// MergeInMap merges an op b into a, provided that b is of the same type as a.
// For example, if a is of type MarkJobSetsCancelRequested, b is only merged if also of type MarkJobSetsCancelRequested.
// Returns true if the ops were merged and false otherwise.
Expand Down Expand Up @@ -536,6 +559,30 @@ func (ce CancelExecutor) CanBeAppliedBefore(b DbOperation) bool {
return true
}

func (pq PreemptQueue) CanBeAppliedBefore(b DbOperation) bool {
switch op := b.(type) {
case queueOperation:
for queue := range pq {
if affectsQueue := op.affectsQueue(queue); affectsQueue {
return false
}
}
}
return true
}

func (cq CancelQueue) CanBeAppliedBefore(b DbOperation) bool {
switch op := b.(type) {
case queueOperation:
for queue := range cq {
if affectsQueue := op.affectsQueue(queue); affectsQueue {
return false
}
}
}
return true
}

// definesJobInSet returns true if b is an InsertJobs operation
// that inserts at least one job in any of the job sets that make
// up the keys of a.
Expand Down Expand Up @@ -684,6 +731,14 @@ func (ce CancelExecutor) GetOperation() Operation {
return ControlPlaneOperation
}

func (pq PreemptQueue) GetOperation() Operation {
return ControlPlaneOperation
}

func (cq CancelQueue) GetOperation() Operation {
return ControlPlaneOperation
}

type executorOperation interface {
affectsExecutor(string) bool
}
Expand All @@ -707,3 +762,17 @@ func (ce CancelExecutor) affectsExecutor(executor string) bool {
_, ok := ce[executor]
return ok
}

type queueOperation interface {
affectsQueue(string) bool
}

func (pq PreemptQueue) affectsQueue(queue string) bool {
_, ok := pq[queue]
return ok
}

func (cq CancelQueue) affectsQueue(queue string) bool {
_, ok := cq[queue]
return ok
}
28 changes: 28 additions & 0 deletions internal/scheduleringester/instructions.go
Original file line number Diff line number Diff line change
Expand Up @@ -414,6 +414,10 @@ func (c *ControlPlaneEventsInstructionConverter) dbOperationFromControlPlaneEven
operations, err = c.handlePreemptOnExecutor(event.GetPreemptOnExecutor())
case *controlplaneevents.Event_CancelOnExecutor:
operations, err = c.handleCancelOnExecutor(event.GetCancelOnExecutor())
case *controlplaneevents.Event_PreemptOnQueue:
operations, err = c.handlePreemptOnQueue(event.GetPreemptOnQueue())
case *controlplaneevents.Event_CancelOnQueue:
operations, err = c.handleCancelOnQueue(event.GetCancelOnQueue())
default:
log.Errorf("Unknown event of type %T", ev)
}
Expand Down Expand Up @@ -473,6 +477,30 @@ func (c *ControlPlaneEventsInstructionConverter) handleCancelOnExecutor(cancel *
}, nil
}

func (c *ControlPlaneEventsInstructionConverter) handlePreemptOnQueue(preempt *controlplaneevents.PreemptOnQueue) ([]DbOperation, error) {
return []DbOperation{
PreemptQueue{
preempt.Name: {
Name: preempt.Name,
PriorityClasses: preempt.PriorityClasses,
JobStates: preempt.JobStates,
},
},
}, nil
}

func (c *ControlPlaneEventsInstructionConverter) handleCancelOnQueue(cancel *controlplaneevents.CancelOnQueue) ([]DbOperation, error) {
return []DbOperation{
CancelQueue{
cancel.Name: {
Name: cancel.Name,
PriorityClasses: cancel.PriorityClasses,
JobStates: cancel.JobStates,
},
},
}, nil
}

// schedulingInfoFromSubmitJob returns a minimal representation of a job containing only the info needed by the scheduler.
func (c *JobSetEventsInstructionConverter) schedulingInfoFromSubmitJob(submitJob *armadaevents.SubmitJob, submitTime time.Time) (*schedulerobjects.JobSchedulingInfo, error) {
return SchedulingInfoFromSubmitJob(submitJob, submitTime)
Expand Down
48 changes: 48 additions & 0 deletions internal/scheduleringester/schedulerdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -428,6 +428,54 @@ func (s *SchedulerDb) WriteDbOp(ctx *armadacontext.Context, tx pgx.Tx, op DbOper
}
}
}
case CancelQueue:
for _, cancelRequest := range o {
jobs, err := queries.SelectAllJobsByQueueAndJobState(ctx, cancelRequest.Name, cancelRequest.JobStates)
if err != nil {
return errors.Wrapf(err, "error cancelling jobs by queue, job state and priority class")
}
inPriorityClasses := jobInPriorityClasses(cancelRequest.PriorityClasses)
jobsToCancel := make([]schedulerdb.Job, 0)
for _, job := range jobs {
ok, err := inPriorityClasses(job)
if err != nil {
return errors.Wrapf(err, "error cancelling jobs by queue, job state and priority class")
}
if ok {
jobsToCancel = append(jobsToCancel, job)
}
}
for _, requestCancelParams := range createMarkJobsCancelRequestedByIdParams(jobsToCancel) {
err = queries.MarkJobsCancelRequestedById(ctx, *requestCancelParams)
if err != nil {
return errors.Wrapf(err, "error cancelling jobs by queue, job state and priority class")
}
}
}
case PreemptQueue:
for _, preemptRequest := range o {
jobs, err := queries.SelectAllJobsByQueueAndJobState(ctx, preemptRequest.Name, preemptRequest.JobStates)
if err != nil {
return errors.Wrapf(err, "error preempting jobs by queue, job state and priority class")
}
inPriorityClasses := jobInPriorityClasses(preemptRequest.PriorityClasses)
jobsToPreempt := make([]schedulerdb.Job, 0)
for _, job := range jobs {
ok, err := inPriorityClasses(job)
if err != nil {
return errors.Wrapf(err, "error preempting jobs by queue, job state and priority class")
}
if ok {
jobsToPreempt = append(jobsToPreempt, job)
}
}
for _, requestPreemptParams := range createMarkJobRunsPreemptRequestedByJobIdParams(jobsToPreempt) {
err = queries.MarkJobRunsPreemptRequestedByJobId(ctx, *requestPreemptParams)
if err != nil {
return errors.Wrapf(err, "error preempting jobs by queue, job state and priority class")
}
}
}
default:
return errors.Errorf("received unexpected op %+v", op)
}
Expand Down
Loading

0 comments on commit 4edccef

Please sign in to comment.