In-depth Analysis of Kubernetes Scheduler's NominatedPods


Author: xidianwangtao@gmail.com

In the earlier post, Kubernetes 1.8 preemptive scheduling (Preemption) source code analysis, we mentioned NominatedPods in several places but did not analyze them in depth, so today we focus on the significance and inner workings of NominatedPods.

What are NominatedPods?

When the PodPriority feature gate is enabled and cluster resources run low, the scheduler preempts the resources of lower-priority Pods (the victims) on behalf of the preemptor. The preemptor then goes back into the scheduling queue and waits for the victims to terminate gracefully and for its next scheduling attempt.

During the window between the preemption and the preemptor's next scheduling attempt, the scheduler should be aware that those resources have already been claimed, and treat them as taken when scheduling other, higher-priority Pods. To achieve this, the preemption phase sets the preemptor's pod.Status.NominatedNodeName, indicating that a preemption has occurred on NominatedNodeName and that the preemptor expects to be scheduled onto that node.

The PriorityQueue caches the NominatedPods of each node: the Pods that have been nominated to run on that node and are expected to be scheduled there, but have not actually been scheduled yet.
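
Conceptually, this cache is just a map keyed by the nominated node name. The snippet below is a simplified illustration, not the scheduler source: the nominatedPods map and the example pod are made up for demonstration, and it only shows how a pod with .Status.NominatedNodeName set would be recorded.

package main

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func main() {
	// The cache maps a node name to the Pods that carry that name in
	// .Status.NominatedNodeName but have not been scheduled onto the node yet.
	nominatedPods := map[string][]*v1.Pod{}

	// A preemptor that just finished the preemption phase: the scheduler has
	// set its NominatedNodeName to the node on which the preemption happened.
	preemptor := &v1.Pod{
		ObjectMeta: metav1.ObjectMeta{Name: "high-prio", Namespace: "default"},
		Status:     v1.PodStatus{NominatedNodeName: "node-1"},
	}

	// Pods are recorded under their nominated node only when the field is set.
	if nnn := preemptor.Status.NominatedNodeName; nnn != "" {
		nominatedPods[nnn] = append(nominatedPods[nnn], preemptor)
	}

	fmt.Println(len(nominatedPods["node-1"])) // 1
}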

What happens during preemption scheduling?

Let's focus on what the scheduler does when it performs preemption.

func (sched *Scheduler) preempt(preemptor *v1.Pod, scheduleErr error) (string, error) {
	...
	node, victims, nominatedPodsToClear, err := sched.config.Algorithm.Preempt(preemptor, sched.config.NodeLister, scheduleErr)
	...
	var nodeName = ""
	if node != nil {
		nodeName = node.Name
		err = sched.config.PodPreemptor.SetNominatedNodeName(preemptor, nodeName)
		if err != nil {
			glog.Errorf("Error in preemption process. Cannot update pod %v annotations: %v", preemptor.Name, err)
			return "", err
		}
		...
	}
	// Clearing nominated pods should happen outside of "if node != nil". Node could
	// be nil when a pod with nominated node name is eligible to preempt again,
	// but preemption logic does not find any node for it. In that case Preempt()
	// function of generic_scheduler.go returns the pod itself for removal of the annotation.
	for _, p := range nominatedPodsToClear {
		rErr := sched.config.PodPreemptor.RemoveNominatedNodeName(p)
		if rErr != nil {
			glog.Errorf("Cannot remove nominated node annotation of pod: %v", rErr)
			// We do not return as this error is not critical.
		}
	}
	return nodeName, err
}
  • Call ScheduleAlgorithm.Preempt to perform the preemption. It returns the node on which the preemption occurred, the victims, and nominatedPodsToClear.
    • node: the best node on which to perform the preemption.
    • victims: the Pods that will be deleted to release resources for the preemptor.
    • nominatedPodsToClear: the list of Pods whose .Status.NominatedNodeName should be cleared. These Pods already belong to the nominatedPods cache in the PriorityQueue and have a lower Pod Priority than the preemptor, which means they are no longer suitable for the node that was selected for them during an earlier preemption.

    func (g *genericScheduler) Preempt(pod *v1.Pod, nodeLister algorithm.NodeLister, scheduleErr error) (*v1.Node, []*v1.Pod, []*v1.Pod, error) {
    	...
    	candidateNode := pickOneNodeForPreemption(nodeToVictims)
    	if candidateNode == nil {
    		return nil, nil, nil, err
    	}

    	nominatedPods := g.getLowerPriorityNominatedPods(pod, candidateNode.Name)
    	if nodeInfo, ok := g.cachedNodeInfoMap[candidateNode.Name]; ok {
    		return nodeInfo.Node(), nodeToVictims[candidateNode].Pods, nominatedPods, err
    	}

    	return nil, nil, nil, fmt.Errorf(
    		"preemption failed: the target node %s has been deleted from scheduler cache",
    		candidateNode.Name)
    }

    func (g *genericScheduler) getLowerPriorityNominatedPods(pod *v1.Pod, nodeName string) []*v1.Pod {
    	pods := g.schedulingQueue.WaitingPodsForNode(nodeName)
    	if len(pods) == 0 {
    		return nil
    	}

    	var lowerPriorityPods []*v1.Pod
    	podPriority := util.GetPodPriority(pod)
    	for _, p := range pods {
    		if util.GetPodPriority(p) < podPriority {
    			lowerPriorityPods = append(lowerPriorityPods, p)
    		}
    	}
    	return lowerPriorityPods
    }

  • If the preemption succeeds (node is non-nil), call podPreemptor.SetNominatedNodeName to set the preemptor's .Status.NominatedNodeName to that node's name, indicating that the preemptor expects to be scheduled onto that node as a result of the preemption.

    func (p *podPreemptor) SetNominatedNodeName(pod *v1.Pod, nominatedNodeName string) error {
    	podCopy := pod.DeepCopy()
    	podCopy.Status.NominatedNodeName = nominatedNodeName
    	_, err := p.Client.CoreV1().Pods(pod.Namespace).UpdateStatus(podCopy)
    	return err
    }
  • Whether or not the preemption succeeds (i.e. whether or not node is nil), nominatedPodsToClear may be non-empty. All Pods in nominatedPodsToClear are traversed, and podPreemptor.RemoveNominatedNodeName is called on each of them to reset .Status.NominatedNodeName to the empty string.

    func (p *podPreemptor) RemoveNominatedNodeName(pod *v1.Pod) error {
    	if len(pod.Status.NominatedNodeName) == 0 {
    		return nil
    	}
    	return p.SetNominatedNodeName(pod, "")
    }

What happens after the preemptor's preemption succeeds?

After a successful preemption, the preemptor Pod is added back into the unschedulable sub-queue of the PriorityQueue and waits there until the conditions are met for it to leave and be scheduled again. For a more in-depth explanation of this part, please refer to my blog post In-depth analysis of Kubernetes Scheduler's priority queue. When the preemptor is scheduled again, the predicate logic for each node runs through podFitsOnNode.

func podFitsOnNode(
	pod *v1.Pod,
	meta algorithm.PredicateMetadata,
	info *schedulercache.NodeInfo,
	predicateFuncs map[string]algorithm.FitPredicate,
	ecache *EquivalenceCache,
	queue SchedulingQueue,
	alwaysCheckAllPredicates bool,
	equivCacheInfo *equivalenceClassInfo,
) (bool, []algorithm.PredicateFailureReason, error) {
	var (
		eCacheAvailable  bool
		failedPredicates []algorithm.PredicateFailureReason
	)
	predicateResults := make(map[string]HostPredicate)

	podsAdded := false
	
	for i := 0; i < 2; i++ {
		metaToUse := meta
		nodeInfoToUse := info
		if i == 0 {
			podsAdded, metaToUse, nodeInfoToUse = addNominatedPods(util.GetPodPriority(pod), meta, info, queue)
		} else if !podsAdded || len(failedPredicates) != 0 { // if nothing was added in the first pass, or the first pass already failed, a second pass cannot change the result
			break
		}
		// Bypass eCache if node has any nominated pods.
		// TODO(bsalamat): consider using eCache and adding proper eCache invalidations
		// when pods are nominated or their nominations change.
		eCacheAvailable = equivCacheInfo != nil && !podsAdded
		for _, predicateKey := range predicates.Ordering() {
			var (
				fit     bool
				reasons []algorithm.PredicateFailureReason
				err     error
			)
			
			if predicate, exist := predicateFuncs[predicateKey]; exist {
				func() {
					var invalid bool
					if eCacheAvailable {
						...
					}

					if !eCacheAvailable || invalid {
						// we need to execute predicate functions since equivalence cache does not work
						fit, reasons, err = predicate(pod, metaToUse, nodeInfoToUse)
						if err != nil {
							return
						}

						...
					}
				}()

				...
			}
		}
	}

	return len(failedPredicates) == 0, failedPredicates, nil
}

Up to two predicate passes are made.

  • In the first pass, addNominatedPods is called: it iterates over the nominatedPods that the PriorityQueue caches for this node and adds every nominatedPod whose PodPriority is greater than or equal to the priority of the Pod being scheduled to a copy of the node's NodeInfo from the SchedulerCache. In other words, these higher-priority nominatedPods must be taken into account during the predicate checks (for example, their resource requests are subtracted from the node's capacity), and the PredicateMetadata is updated accordingly; then the normal predicate logic runs.
  • The second pass is skipped immediately, without running any real predicate logic, if any predicate failed in the first pass, or if podsAdded is false (addNominatedPods returns podsAdded == false when the node's nominatedPods cache is empty).
  • The second pass runs the real predicate logic only if all predicates succeeded in the first pass and podsAdded is true. It runs without the nominated pods added, because the presence of the nominatedPods can change the predicate result (for example through Inter-Pod Affinity), and the nominated pods are not guaranteed to actually end up on the node. The sketch after this list distills this control flow.
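
The following is a minimal sketch of the decision described above, a distilled illustration rather than the scheduler source: twoPassFits and runPredicates are hypothetical names, with runPredicates standing in for the real predicate loop and its argument saying whether the nominated pods are treated as already on the node.

// twoPassFits mirrors the control flow of podFitsOnNode in a distilled form:
// pass 0 runs with the nominated pods added, and pass 1 runs only when pass 0
// succeeded and pods were actually added; pass 1 runs without them because
// they are not guaranteed to end up on this node.
func twoPassFits(podsWouldBeAdded bool, runPredicates func(withNominated bool) bool) bool {
	// First pass: nominated pods (if any) are treated as if already on the node.
	if !runPredicates(podsWouldBeAdded) {
		return false
	}
	if !podsWouldBeAdded {
		// Nothing was added, so a second pass would see the same state; skip it.
		return true
	}
	// Second pass: re-run the predicates without the nominated pods.
	return runPredicates(false)
}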

Here is the code for addNominatedPods. It is responsible for producing temporary schedulercache.NodeInfo and algorithm.PredicateMetadata objects, which are then handed to the individual predicate functions for the predicate checks.

// addNominatedPods adds pods with equal or greater priority which are nominated
// to run on the node given in nodeInfo to meta and nodeInfo. It returns 1) whether
// any pod was found, 2) augmented meta data, 3) augmented nodeInfo.
func addNominatedPods(podPriority int32, meta algorithm.PredicateMetadata,
	nodeInfo *schedulercache.NodeInfo, queue SchedulingQueue) (bool, algorithm.PredicateMetadata,
	*schedulercache.NodeInfo) {
	if queue == nil || nodeInfo == nil || nodeInfo.Node() == nil {
		// This may happen only in tests.
		return false, meta, nodeInfo
	}
	nominatedPods := queue.WaitingPodsForNode(nodeInfo.Node().Name)
	if nominatedPods == nil || len(nominatedPods) == 0 {
		return false, meta, nodeInfo
	}
	var metaOut algorithm.PredicateMetadata
	if meta != nil {
		metaOut = meta.ShallowCopy()
	}
	nodeInfoOut := nodeInfo.Clone()
	for _, p := range nominatedPods {
		if util.GetPodPriority(p) >= podPriority {
			nodeInfoOut.AddPod(p)
			if metaOut != nil {
				metaOut.AddPod(p, nodeInfoOut)
			}
		}
	}
	return true, metaOut, nodeInfoOut
}

// WaitingPodsForNode returns pods that are nominated to run on the given node,
// but they are waiting for other pods to be removed from the node before they
// can be actually scheduled.
func (p *PriorityQueue) WaitingPodsForNode(nodeName string) []*v1.Pod {
	p.lock.RLock()
	defer p.lock.RUnlock()
	if list, ok := p.nominatedPods[nodeName]; ok {
		return list
	}
	return nil
}

The logic of addNominatedPods is as follows.

  • Call WaitingPodsForNode to get the nominatedPods that the PriorityQueue caches for that node. If nominatedPods is empty, return podsAdded == false and end addNominatedPods there.
  • Otherwise, clone the PredicateMetadata and NodeInfo objects, iterate over the nominatedPods, add each nominated pod whose priority is not lower than that of the Pod being scheduled to the cloned NodeInfo object, and update the cloned PredicateMetadata accordingly. These cloned NodeInfo and PredicateMetadata objects are what ultimately get passed into the predicate functions. When the iteration finishes, podsAdded == true is returned along with the cloned NodeInfo and PredicateMetadata objects.

How to Maintain PriorityQueue NominatedPods Cache

In the post In-depth analysis of Kubernetes Scheduler's priority queue, I analyzed the PriorityQueue operations performed inside the EventHandlers that the scheduler registers with podInformer, nodeInformer, serviceInformer, pvcInformer, and so on. The following looks at the EventHandler paths that touch NominatedPods.

Add Pod to PriorityQueue

  • When a Pod is added to the active queue of the PriorityQueue, addNominatedPodIfNeeded is called to add it to the PriorityQueue's nominatedPods cache. If the Pod unexpectedly still exists in the unschedulable queue, it is first removed from both the unschedulable queue and the nominatedPods cache before being added.
// Add adds a pod to the active queue. It should be called only when a new pod
// is added so there is no chance the pod is already in either queue.
func (p *PriorityQueue) Add(pod *v1.Pod) error {
	p.lock.Lock()
	defer p.lock.Unlock()
	err := p.activeQ.Add(pod)
	if err != nil {
		glog.Errorf("Error adding pod %v to the scheduling queue: %v", pod.Name, err)
	} else {
		if p.unschedulableQ.get(pod) != nil {
			glog.Errorf("Error: pod %v is already in the unschedulable queue.", pod.Name)
			p.deleteNominatedPodIfExists(pod)
			p.unschedulableQ.delete(pod)
		}
		p.addNominatedPodIfNeeded(pod)
		p.cond.Broadcast()
	}
	return err
}

func (p *PriorityQueue) addNominatedPodIfNeeded(pod *v1.Pod) {
	nnn := NominatedNodeName(pod)
	if len(nnn) > 0 {
		for _, np := range p.nominatedPods[nnn] {
			if np.UID == pod.UID {
				glog.Errorf("Pod %v/%v already exists in the nominated map!", pod.Namespace, pod.Name)
				return
			}
		}
		p.nominatedPods[nnn] = append(p.nominatedPods[nnn], pod)
	}
}
  • When a Pod is added to the unschedulableQ of the PriorityQueue, addNominatedPodIfNeeded is likewise called to add the Pod to the PriorityQueue's nominatedPods cache.
func (p *PriorityQueue) AddUnschedulableIfNotPresent(pod *v1.Pod) error {
	p.lock.Lock()
	defer p.lock.Unlock()
	if p.unschedulableQ.get(pod) != nil {
		return fmt.Errorf("pod is already present in unschedulableQ")
	}
	if _, exists, _ := p.activeQ.Get(pod); exists {
		return fmt.Errorf("pod is already present in the activeQ")
	}
	if !p.receivedMoveRequest && isPodUnschedulable(pod) {
		p.unschedulableQ.addOrUpdate(pod)
		p.addNominatedPodIfNeeded(pod)
		return nil
	}
	err := p.activeQ.Add(pod)
	if err == nil {
		p.addNominatedPodIfNeeded(pod)
		p.cond.Broadcast()
	}
	return err
}

Note that a pod is added to the nominatedPods cache only if its .Status.NominatedNodeName is not empty.
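
For reference, the NominatedNodeName helper used by addNominatedPodIfNeeded and deleteNominatedPodIfExists simply reads this status field. A minimal sketch of it is shown below (the exact package location may differ between scheduler versions):

// NominatedNodeName returns the nominated node name of a Pod, i.e. the node
// on which a preemption was performed on behalf of this Pod. An empty string
// means the Pod carries no nomination and is not tracked in the cache.
func NominatedNodeName(pod *v1.Pod) string {
	return pod.Status.NominatedNodeName
}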

Update Pod in PriorityQueue

When a Pod in the PriorityQueue is updated, updateNominatedPod is called to update the nominatedPods cache in the PriorityQueue.

// Update updates a pod in the active queue if present. Otherwise, it removes
// the item from the unschedulable queue and adds the updated one to the active
// queue.
func (p *PriorityQueue) Update(oldPod, newPod *v1.Pod) error {
	p.lock.Lock()
	defer p.lock.Unlock()
	// If the pod is already in the active queue, just update it there.
	if _, exists, _ := p.activeQ.Get(newPod); exists {
		p.updateNominatedPod(oldPod, newPod)
		err := p.activeQ.Update(newPod)
		return err
	}
	// If the pod is in the unschedulable queue, updating it may make it schedulable.
	if usPod := p.unschedulableQ.get(newPod); usPod != nil {
		p.updateNominatedPod(oldPod, newPod)
		if isPodUpdated(oldPod, newPod) {
			p.unschedulableQ.delete(usPod)
			err := p.activeQ.Add(newPod)
			if err == nil {
				p.cond.Broadcast()
			}
			return err
		}
		p.unschedulableQ.addOrUpdate(newPod)
		return nil
	}
	// If pod is not in any of the two queue, we put it in the active queue.
	err := p.activeQ.Add(newPod)
	if err == nil {
		p.addNominatedPodIfNeeded(newPod)
		p.cond.Broadcast()
	}
	return err
}

The logic of updateNominatedPod for updating the PriorityQueue's nominatedPods cache is: delete oldPod first, then add newPod.

// updateNominatedPod updates a pod in the nominatedPods.
func (p *PriorityQueue) updateNominatedPod(oldPod, newPod *v1.Pod) {
	// Even if the nominated node name of the Pod is not changed, we must delete and add it again
	// to ensure that its pointer is updated.
	p.deleteNominatedPodIfExists(oldPod)
	p.addNominatedPodIfNeeded(newPod)
}

Delete Pod from PriorityQueue

Before a Pod is deleted from the PriorityQueue, deleteNominatedPodIfExists is called to remove it from the PriorityQueue's nominatedPods cache.

// Delete deletes the item from either of the two queues. It assumes the pod is
// only in one queue.
func (p *PriorityQueue) Delete(pod *v1.Pod) error {
	p.lock.Lock()
	defer p.lock.Unlock()
	p.deleteNominatedPodIfExists(pod)
	err := p.activeQ.Delete(pod)
	if err != nil { // The item was probably not found in the activeQ.
		p.unschedulableQ.delete(pod)
	}
	return nil
}

deleteNominatedPodIfExists first checks whether the pod's .Status.NominatedNodeName is empty.

  • If it is empty, nothing is done and the function returns immediately.
  • If it is not empty, the nominatedPods cache entry for that node is traversed; as soon as a pod with a matching UID is found, it is removed from the cache. If, after the removal, no nominatedPods remain for that NominatedNode, the node's entire entry is removed from the map.
func (p *PriorityQueue) deleteNominatedPodIfExists(pod *v1.Pod) {
	nnn := NominatedNodeName(pod)
	if len(nnn) > 0 {
		for i, np := range p.nominatedPods[nnn] {
			if np.UID == pod.UID {
				p.nominatedPods[nnn] = append(p.nominatedPods[nnn][:i], p.nominatedPods[nnn][i+1:]...)
				if len(p.nominatedPods[nnn]) == 0 {
					delete(p.nominatedPods, nnn)
				}
				break
			}
		}
	}
}

Conclusion

This post explained the role of NominatedPods and the NominatedNode, analyzed from the source code how NominatedPods change during and after preemption scheduling, and finally examined how the Add/Update/Delete operations on Pods in the PriorityQueue affect the PriorityQueue's nominatedPods cache. I hope it helps readers deepen their understanding of the scheduler's preemption scheduling and its priority queue.

