
Step 3: Carrying Out the Preemption Action

The final step of preemption is to actually carry out the preemption action.

	if err := PrepareCandidate(bestCandidate, pl.fh, cs, pod); err != nil {
		return "", err
	}

This, in turn, breaks down into three sub-steps.

func PrepareCandidate(c Candidate, fh framework.Handle, cs kubernetes.Interface, pod *v1.Pod) error {
	for _, victim := range c.Victims().Pods {
		if err := util.DeletePod(cs, victim); err != nil {
			klog.Errorf("Error preempting pod %v/%v: %v", victim.Namespace, victim.Name, err)
			return err
		}
		...
}

func DeletePod(cs kubernetes.Interface, pod *v1.Pod) error {
	return cs.CoreV1().Pods(pod.Namespace).Delete(context.TODO(), pod.Name, metav1.DeleteOptions{})
}

First, every victim Pod on the target node is evicted. As the code shows, preemption is carried out by sending a Delete request for each victim Pod directly to the Kubernetes API Server.
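As an aside, the effect of this call is easy to observe against client-go's testing fake. The sketch below is not part of the scheduler source; it simply wires the DeletePod helper from above to a fake clientset to show that the victim disappears from the API server's point of view.

package main

import (
	"context"
	"fmt"

	v1 "k8s.io/api/core/v1"
	apierrors "k8s.io/apimachinery/pkg/api/errors"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/kubernetes/fake"
)

// DeletePod mirrors the helper shown above: a plain Delete call to the API server.
func DeletePod(cs kubernetes.Interface, pod *v1.Pod) error {
	return cs.CoreV1().Pods(pod.Namespace).Delete(context.TODO(), pod.Name, metav1.DeleteOptions{})
}

func main() {
	victim := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Namespace: "default", Name: "victim"}}
	cs := fake.NewSimpleClientset(victim)

	if err := DeletePod(cs, victim); err != nil {
		panic(err)
	}

	// The victim no longer exists as far as the (fake) API server is concerned.
	_, err := cs.CoreV1().Pods("default").Get(context.TODO(), "victim", metav1.GetOptions{})
	fmt.Println(apierrors.IsNotFound(err)) // true
}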

func PrepareCandidate(c Candidate, fh framework.Handle, cs kubernetes.Interface, pod *v1.Pod) error {
	for _, victim := range c.Victims().Pods {
		...

		// If the victim is a WaitingPod, send a reject message to the PermitPlugin
		if waitingPod := fh.GetWaitingPod(victim.UID); waitingPod != nil {
			waitingPod.Reject("preempted")
		}
		fh.EventRecorder().Eventf(victim, pod, v1.EventTypeNormal, "Preempted", "Preempting", "Preempted by %v/%v on node %v",
			pod.Namespace, pod.Name, c.Name())
	}

Next, if a victim Pod is still a WaitingPod, a Reject action is issued, which terminates the Pod's waiting state and removes it from the WaitingPod list. For more on WaitingPod, see Delayed Binding (延时绑定).
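For context, a Pod becomes a WaitingPod when some PermitPlugin returns Wait for it at the Permit extension point. The sketch below is illustrative and not taken from the scheduler source (the plugin name and timeout are made up); it shows where the WaitingPod that GetWaitingPod() later retrieves comes from.

// An illustrative PermitPlugin: returning Wait parks the Pod as a WaitingPod,
// which framework.Handle.GetWaitingPod(uid) can later retrieve and which
// waitingPod.Reject("preempted") would fail.
type examplePermit struct{}

func (pl *examplePermit) Name() string { return "ExamplePermit" }

func (pl *examplePermit) Permit(ctx context.Context, state *framework.CycleState,
	p *v1.Pod, nodeName string) (*framework.Status, time.Duration) {
	// The 10-second timeout is arbitrary; a rejected WaitingPod fails before it expires.
	return framework.NewStatus(framework.Wait, ""), 10 * time.Second
}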

	nominatedPods := getLowerPriorityNominatedPods(fh.PreemptHandle(), pod, c.Name())
	if err := util.ClearNominatedNodeName(cs, nominatedPods...); err != nil {
		klog.Errorf("Cannot clear 'NominatedNodeName' field: %v", err)
		// We do not return as this error is not critical.
	}

Finally, the Pods that previously won a preemption targeting this node are handled. Before the current Pod started its preemption, other Pods may already have been nominated to run on this node through preemptions of their own; their Status.NominatedNodeName field is set to this node's name. These are the Pods processed next.

This is done in two steps:

func getLowerPriorityNominatedPods(pn framework.PodNominator, pod *v1.Pod, nodeName string) []*v1.Pod {
	pods := pn.NominatedPodsForNode(nodeName)

	if len(pods) == 0 {
		return nil
	}

	var lowerPriorityPods []*v1.Pod
	podPriority := corev1helpers.PodPriority(pod)
	for _, p := range pods {
		if corev1helpers.PodPriority(p) < podPriority {
			lowerPriorityPods = append(lowerPriorityPods, p)
		}
	}
	return lowerPriorityPods
}

Step one: getLowerPriorityNominatedPods() picks out the Pods that were nominated to this node through preemption and whose priority is lower than the current Pod's.
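corev1helpers.PodPriority is a small helper; to the best of my knowledge it simply dereferences Spec.Priority, treating an unset field as priority 0, roughly:

// Roughly what corev1helpers.PodPriority does: return the Pod's priority,
// falling back to 0 when Spec.Priority was never set.
func PodPriority(pod *v1.Pod) int32 {
	if pod.Spec.Priority != nil {
		return *pod.Spec.Priority
	}
	return 0
}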

func ClearNominatedNodeName(cs kubernetes.Interface, pods ...*v1.Pod) utilerrors.Aggregate {
	var errs []error
	for _, p := range pods {
		if len(p.Status.NominatedNodeName) == 0 {
			continue
		}
		podCopy := p.DeepCopy()
		podCopy.Status.NominatedNodeName = ""
		if err := PatchPod(cs, p, podCopy); err != nil {
			errs = append(errs, err)
		}
	}
	return utilerrors.NewAggregate(errs)
}

Step two: the Status.NominatedNodeName field of those Pods is cleared. Because the current Pod's preemption may claim resources these lower-priority Pods were counting on, clearing the field moves them back into the Kubernetes Scheduler's scheduling queue so that a new place can be found for them.
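PatchPod itself is not shown above. Here is a sketch of how such a helper can be written, assuming encoding/json plus the k8s.io/apimachinery types and strategicpatch packages; the scheduler's own util package does something along these lines, though details vary between versions.

// A sketch of a PatchPod helper: marshal the old and new Pod, compute a
// strategic-merge patch between them, and send it to the API server.
func PatchPod(cs kubernetes.Interface, old *v1.Pod, new *v1.Pod) error {
	oldData, err := json.Marshal(old)
	if err != nil {
		return err
	}
	newData, err := json.Marshal(new)
	if err != nil {
		return err
	}
	patchBytes, err := strategicpatch.CreateTwoWayMergePatch(oldData, newData, &v1.Pod{})
	if err != nil {
		return err
	}
	_, err = cs.CoreV1().Pods(old.Namespace).Patch(context.TODO(), old.Name,
		types.StrategicMergePatchType, patchBytes, metav1.PatchOptions{})
	return err
}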

At this point, every action of the preemption plugin has been carried out.

Since the current Pod is being scheduled onto the node via preemption, its own Status.NominatedNodeName field also needs to be updated. This happens after the PostFilter extension point has finished running.

	scheduleResult, err := sched.Algorithm.Schedule(schedulingCycleCtx, fwk, state, pod)
	if err != nil {
		nominatedNode := ""
		if fitError, ok := err.(*core.FitError); ok {
			if !fwk.HasPostFilterPlugins() {
				klog.V(3).Infof("No PostFilter plugins are registered, so no preemption will be performed.")
			} else {
				// Run PostFilter plugins to try to make the pod schedulable in a future scheduling cycle.
				result, status := fwk.RunPostFilterPlugins(ctx, state, pod, fitError.FilteredNodesStatuses)
				...
				if status.IsSuccess() && result != nil {
					nominatedNode = result.NominatedNodeName
				}
			}
			...
		}
		sched.recordSchedulingFailure(fwk, podInfo, err, v1.PodReasonUnschedulable, nominatedNode)
		return
	}

The recordSchedulingFailure() call at the end is what performs this update.

func (sched *Scheduler) recordSchedulingFailure(fwk framework.Framework, podInfo *framework.QueuedPodInfo, err error, reason string, nominatedNode string) {
	sched.Error(podInfo, err)

	// Update the scheduling queue with the nominated pod information. Without
	// this, there would be a race condition between the next scheduling cycle
	// and the time the scheduler receives a Pod Update for the nominated pod.
	// Here we check for nil only for tests.
	if sched.SchedulingQueue != nil {
		sched.SchedulingQueue.AddNominatedPod(podInfo.Pod, nominatedNode)
	}

	pod := podInfo.Pod
	fwk.EventRecorder().Eventf(pod, nil, v1.EventTypeWarning, "FailedScheduling", "Scheduling", err.Error())
	if err := updatePod(sched.client, pod, &v1.PodCondition{
		Type:    v1.PodScheduled,
		Status:  v1.ConditionFalse,
		Reason:  reason,
		Message: err.Error(),
	}, nominatedNode); err != nil {
		klog.Errorf("Error updating pod %s/%s: %v", pod.Namespace, pod.Name, err)
	}
}

First, AddNominatedPod() is called to record the current Pod in the map of nominated Pods. Then updatePod() sets the current Pod's Status.NominatedNodeName field to the name of the nominated node.

func updatePod(client clientset.Interface, pod *v1.Pod, condition *v1.PodCondition, nominatedNode string) error {
	klog.V(3).Infof("Updating pod condition for %s/%s to (%s==%s, Reason=%s)", pod.Namespace, pod.Name, condition.Type, condition.Status, condition.Reason)
	podCopy := pod.DeepCopy()
	// NominatedNodeName is updated only if we are trying to set it, and the value is
	// different from the existing one.
	if !podutil.UpdatePodCondition(&podCopy.Status, condition) &&
		(len(nominatedNode) == 0 || pod.Status.NominatedNodeName == nominatedNode) {
		return nil
	}
	if nominatedNode != "" {
		podCopy.Status.NominatedNodeName = nominatedNode
	}
	return util.PatchPod(client, pod, podCopy)
}
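For completeness, here is a rough sketch of the bookkeeping behind AddNominatedPod(). The real PodNominator inside the scheduling queue keeps equivalent maps guarded by a lock (the exact fields differ between versions; ktypes stands for k8s.io/apimachinery/pkg/types); this only illustrates the shape of the state.

// A sketch of the nominator state: which Pods are nominated onto which node.
type nominator struct {
	sync.RWMutex
	nominatedPods      map[string][]*v1.Pod  // nodeName -> Pods nominated onto that node
	nominatedPodToNode map[ktypes.UID]string // pod UID -> nodeName, for updates and deletes
}

func (n *nominator) AddNominatedPod(pod *v1.Pod, nodeName string) {
	n.Lock()
	defer n.Unlock()
	n.nominatedPodToNode[pod.UID] = nodeName
	n.nominatedPods[nodeName] = append(n.nominatedPods[nodeName], pod)
}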

With that, the entire execution flow of the PostFilter extension point and of the core preemption plugin has been analyzed.