Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

remove unused queues #605

Merged
merged 5 commits into from
Aug 25, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 28 additions & 39 deletions pkg/controller/queuejob/queuejob_controller_ex.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,10 +93,10 @@ type XController struct {

// QueueJobs that need to be initialized
// Add labels and selectors to AppWrapper
initQueue *cache.FIFO
//initQueue *cache.FIFO

// QueueJobs that need to sync up after initialization
updateQueue *cache.FIFO
//updateQueue *cache.FIFO

// eventQueue that need to sync up
eventQueue *cache.FIFO
Expand Down Expand Up @@ -241,9 +241,9 @@ func NewJobController(config *rest.Config, serverOption *options.ServerOption) *
arbclients: clientset.NewForConfigOrDie(config),
eventQueue: cache.NewFIFO(GetQueueJobKey),
agentEventQueue: cache.NewFIFO(GetQueueJobKey),
initQueue: cache.NewFIFO(GetQueueJobKey),
updateQueue: cache.NewFIFO(GetQueueJobKey),
qjqueue: NewSchedulingQueue(),
//initQueue: cache.NewFIFO(GetQueueJobKey),
//updateQueue: cache.NewFIFO(GetQueueJobKey),
qjqueue: NewSchedulingQueue(),
//cache is turned-off, issue: https://github.com/project-codeflare/multi-cluster-app-dispatcher/issues/588
//cache: clusterstatecache.New(config),
schedulingAW: nil,
Expand Down Expand Up @@ -1262,7 +1262,7 @@ func (qjm *XController) ScheduleNext(qj *arbv1.AppWrapper) {
klog.Infof("[ScheduleNext] [Agent Mode] Blocking dispatch for app wrapper '%s/%s' due to quota limits, activeQ=%t Unsched=%t &qj=%p Version=%s Status=%+v msg=%s",
qj.Namespace, qj.Name, time.Now().Sub(HOLStartTime), qjm.qjqueue.IfExistActiveQ(qj), qjm.qjqueue.IfExistUnschedulableQ(qj), qj, qj.ResourceVersion, qj.Status, msg)
//call update etcd here to retrigger AW execution for failed quota

//TODO: quota management tests fail if this is converted into go-routine, need to inspect why?
qjm.backoff(context.Background(), qj, dispatchFailedReason, dispatchFailedMessage)

}
Expand Down Expand Up @@ -1702,31 +1702,12 @@ func (cc *XController) deleteQueueJob(obj interface{}) {
klog.Errorf("[Informer-deleteQJ] obj is not AppWrapper. obj=%+v", obj)
return
}
current_ts := metav1.NewTime(time.Now())
klog.V(10).Infof("[Informer-deleteQJ] %s *Delay=%.6f seconds before enqueue &qj=%p Version=%s Status=%+v Deletion Timestame=%+v", qj.Name, time.Now().Sub(qj.Status.ControllerFirstTimestamp.Time).Seconds(), qj, qj.ResourceVersion, qj.Status, qj.GetDeletionTimestamp())
accessor, err := meta.Accessor(qj)
if err != nil {
klog.V(10).Infof("[Informer-deleteQJ] Error obtaining the accessor for AW job: %s", qj.Name)
qj.SetDeletionTimestamp(&current_ts)
} else {
accessor.SetDeletionTimestamp(&current_ts)
}
// validate that app wraper has not been marked for deletion by the infomer's delete handler
if qj.DeletionTimestamp != nil {
klog.V(3).Infof("[Informer-deleteQJ] AW job=%s/%s set for deletion.", qj.Namespace, qj.Name)
// cleanup resources for running job, ignoring errors
if err00 := cc.Cleanup(context.Background(), qj); err00 != nil {
klog.Warningf("Failed to cleanup resources for app wrapper '%s/%s', err = %v", qj.Namespace, qj.Name, err00)
}
// empty finalizers and delete the queuejob again
if accessor, err00 := meta.Accessor(qj); err00 == nil {
accessor.SetFinalizers(nil)
}
// we delete the job from the queue if it is there, ignoring errors
cc.qjqueue.Delete(qj)
cc.eventQueue.Delete(qj)
klog.V(3).Infof("[Informer-deleteQJ] AW job=%s/%s deleted.", qj.Namespace, qj.Name)
// we delete the job from the queue if it is there, ignoring errors
if cc.serverOption.QuotaEnabled && cc.quotaManager != nil {
cc.quotaManager.Release(qj)
}
cc.qjqueue.Delete(qj)
cc.eventQueue.Delete(qj)
}

func (cc *XController) enqueue(obj interface{}) error {
Expand Down Expand Up @@ -1888,19 +1869,27 @@ func (cc *XController) worker() {
//if everything passes then CanRun is set to true and AW is ready for dispatch
if !queuejob.Status.CanRun && (queuejob.Status.State != arbv1.AppWrapperStateActive) {
cc.ScheduleNext(queuejob)
return nil
//When an AW passes ScheduleNext gate then we want to progress AW to Running to begin with
//Sync queuejob will not unwrap an AW to spawn genericItems
if queuejob.Status.CanRun {
metalcycling marked this conversation as resolved.
Show resolved Hide resolved

// errs := make(chan error, 1)
// go func() {
// errs <- cc.syncQueueJob(ctx, queuejob)
// }()

// // later:
// if err := <-errs; err != nil {
// return err
// }
if err := cc.syncQueueJob(ctx, queuejob); err != nil {
// If any error, requeue it.
return err
}

}
//When an AW passes ScheduleNext gate then we want to progress AW to Running to begin with
//Sync queuejob will not unwrap an AW to spawn genericItems
if queuejob.Status.CanRun {
if err := cc.syncQueueJob(ctx, queuejob); err != nil {
// If any error, requeue it.
return err
}

}

//asmalvan- ends

klog.V(10).Infof("[worker] Ending %s Delay=%.6f seconds &newQJ=%p Version=%s Status=%+v", queuejob.Name, time.Now().Sub(queuejob.Status.ControllerFirstTimestamp.Time).Seconds(), queuejob, queuejob.ResourceVersion, queuejob.Status)
Expand Down