In-depth Analysis of Kubernetes Scheduler's NominatedPods
Author: xidianwangtao@gmail.com
In Kubernetes 1.8 preemptive scheduling: Preemption source code analysis, we mentioned NominatedPods in several places but did not analyze them sufficiently at the time. Today we will focus on the significance and rationale of NominatedPods.
When the PodPriority feature gate is enabled and cluster resources run low, the scheduler preempts the resources of lower-priority Pods (which become the victims) on behalf of the preemptor. The preemptor then goes back into the scheduling queue, waiting for the victims to terminate gracefully and for its own next scheduling attempt.
In the window between the preemptor seizing resources and actually being scheduled again, the scheduler keeps scheduling other Pods, including higher-priority ones, and it should be able to sense that those resources are already spoken for. To that end, the preemption phase sets the preemptor's pod.Status.NominatedNodeName, indicating that a preemption has occurred on NominatedNodeName and that the preemptor expects to be scheduled on that node.
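To make this concrete, here is a minimal sketch of what setting that field amounts to. This is not the scheduler's actual PodPreemptor implementation, just the equivalent status update against the API server (using the context-free client-go signatures of that era):

import (
	v1 "k8s.io/api/core/v1"
	"k8s.io/client-go/kubernetes"
)

// setNominatedNodeName is an illustrative sketch, not the scheduler's real
// PodPreemptor: it records on the preemptor's status which node it was
// nominated for. Note that it does not bind the pod to that node.
func setNominatedNodeName(client kubernetes.Interface, preemptor *v1.Pod, nodeName string) error {
	podCopy := preemptor.DeepCopy()
	podCopy.Status.NominatedNodeName = nodeName
	_, err := client.CoreV1().Pods(podCopy.Namespace).UpdateStatus(podCopy)
	return err
}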
The PriorityQueue caches the NominatedPods of each node: the Pods that have been nominated for that node and are expected to run there, but have not yet been scheduled successfully.
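The shape of that cache is simple. Conceptually it is just the following map, which mirrors the nominatedPods field of the PriorityQueue we will see in the code below:

// Conceptual shape of the PriorityQueue's nominated-pod cache:
//   key:   node name (the pods' Status.NominatedNodeName)
//   value: Pods nominated to run on that node but not yet scheduled there
var nominatedPods map[string][]*v1.Pod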
Let's focus on the scheduler's flow when it performs preemption.
func (sched *Scheduler) preempt(preemptor *v1.Pod, scheduleErr error) (string, error) {
	...
	node, victims, nominatedPodsToClear, err := sched.config.Algorithm.Preempt(preemptor, sched.config.NodeLister, scheduleErr)
	...
	var nodeName = ""
	if node != nil {
		nodeName = node.Name
		err = sched.config.PodPreemptor.SetNominatedNodeName(preemptor, nodeName)
		if err != nil {
			glog.Errorf("Error in preemption process. Cannot update pod %v annotations: %v", preemptor.Name, err)
			return "", err
		}
		...
	}
	// Clearing nominated pods should happen outside of "if node != nil". Node could
	// be nil when a pod with nominated node name is eligible to preempt again,
	// but preemption logic does not find any node for it. In that case Preempt()
	// function of generic_scheduler.go returns the pod itself for removal of the annotation.
	for _, p := range nominatedPodsToClear {
		rErr := sched.config.PodPreemptor.RemoveNominatedNodeName(p)
		if rErr != nil {
			glog.Errorf("Cannot remove nominated node annotation of pod: %v", rErr)
			// We do not return as this error is not critical.
		}
	}
	return nodeName, err
}
func (g *genericScheduler) Preempt(pod *v1.Pod, nodeLister algorithm.NodeLister, scheduleErr error) (*v1.Node, []*v1.Pod, []*v1.Pod, error) {
	...
	candidateNode := pickOneNodeForPreemption(nodeToVictims)
	if candidateNode == nil {
		return nil, nil, nil, err
	}

	nominatedPods := g.getLowerPriorityNominatedPods(pod, candidateNode.Name)
	if nodeInfo, ok := g.cachedNodeInfoMap[candidateNode.Name]; ok {
		return nodeInfo.Node(), nodeToVictims[candidateNode].Pods, nominatedPods, err
	}

	return nil, nil, nil, fmt.Errorf(
		"preemption failed: the target node %s has been deleted from scheduler cache",
		candidateNode.Name)
}

func (g *genericScheduler) getLowerPriorityNominatedPods(pod *v1.Pod, nodeName string) []*v1.Pod {
	pods := g.schedulingQueue.WaitingPodsForNode(nodeName)

	if len(pods) == 0 {
		return nil
	}

	var lowerPriorityPods []*v1.Pod
	podPriority := util.GetPodPriority(pod)
	for _, p := range pods {
		if util.GetPodPriority(p) < podPriority {
			lowerPriorityPods = append(lowerPriorityPods, p)
		}
	}
	return lowerPriorityPods
}
After a successful preemption, the preemptor is added back to the unschedulable sub-queue of the PriorityQueue and waits there until some condition sends it back for scheduling. For a more in-depth explanation of this part, please refer to my blog In-depth analysis of Kubernetes Scheduler's priority queue. When the preemptor is scheduled again, it goes through the predicate logic on each node via podFitsOnNode.
func podFitsOnNode(
	pod *v1.Pod,
	meta algorithm.PredicateMetadata,
	info *schedulercache.NodeInfo,
	predicateFuncs map[string]algorithm.FitPredicate,
	ecache *EquivalenceCache,
	queue SchedulingQueue,
	alwaysCheckAllPredicates bool,
	equivCacheInfo *equivalenceClassInfo,
) (bool, []algorithm.PredicateFailureReason, error) {
	var (
		eCacheAvailable  bool
		failedPredicates []algorithm.PredicateFailureReason
	)
	predicateResults := make(map[string]HostPredicate)

	podsAdded := false
	for i := 0; i < 2; i++ {
		metaToUse := meta
		nodeInfoToUse := info
		if i == 0 {
			podsAdded, metaToUse, nodeInfoToUse = addNominatedPods(util.GetPodPriority(pod), meta, info, queue)
		} else if !podsAdded || len(failedPredicates) != 0 {
			// Author's note: at first glance this looks wrong -- shouldn't it be
			// podsAdded rather than !podsAdded? It is in fact correct: the second
			// pass is only useful when nominated pods were added in the first pass
			// and that pass passed all predicates; otherwise we can stop here.
			break
		}
		// Bypass eCache if node has any nominated pods.
		// TODO(bsalamat): consider using eCache and adding proper eCache invalidations
		// when pods are nominated or their nominations change.
		eCacheAvailable = equivCacheInfo != nil && !podsAdded
		for _, predicateKey := range predicates.Ordering() {
			var (
				fit     bool
				reasons []algorithm.PredicateFailureReason
				err     error
			)
			func() {
				var invalid bool
				if eCacheAvailable {
					...
				}
				if !eCacheAvailable || invalid {
					// we need to execute predicate functions since equivalence cache does not work
					fit, reasons, err = predicate(pod, metaToUse, nodeInfoToUse)
					if err != nil {
						return
					}
					...
				}
			}()
			...
		}
	}

	return len(failedPredicates) == 0, failedPredicates, nil
}
podFitsOnNode makes up to two predicate passes. The first pass runs with the node's NominatedPods of equal or higher priority added to copies of the node state (via addNominatedPods); the second pass runs without them. Requiring the pod to fit in both situations is a conservative decision, because the nominated pods are not guaranteed to actually land on the node: predicates like resources and inter-pod anti-affinity are more likely to fail when the nominated pods are treated as running, while inter-pod affinity is more likely to fail when they are treated as not running.
Here is the code for addNominatedPods, which is responsible for generating a temporary schedulercache.NodeInfo and algorithm.PredicateMetadata that are then handed to the individual predicate functions.
// addNominatedPods adds pods with equal or greater priority which are nominated
// to run on the node given in nodeInfo to meta and nodeInfo. It returns 1) whether
// any pod was found, 2) augmented meta data, 3) augmented nodeInfo.
func addNominatedPods(podPriority int32, meta algorithm.PredicateMetadata,
	nodeInfo *schedulercache.NodeInfo, queue SchedulingQueue) (bool, algorithm.PredicateMetadata,
	*schedulercache.NodeInfo) {
	if queue == nil || nodeInfo == nil || nodeInfo.Node() == nil {
		// This may happen only in tests.
		return false, meta, nodeInfo
	}
	nominatedPods := queue.WaitingPodsForNode(nodeInfo.Node().Name)
	if nominatedPods == nil || len(nominatedPods) == 0 {
		return false, meta, nodeInfo
	}
	var metaOut algorithm.PredicateMetadata
	if meta != nil {
		metaOut = meta.ShallowCopy()
	}
	nodeInfoOut := nodeInfo.Clone()
	for _, p := range nominatedPods {
		if util.GetPodPriority(p) >= podPriority {
			nodeInfoOut.AddPod(p)
			if metaOut != nil {
				metaOut.AddPod(p, nodeInfoOut)
			}
		}
	}
	return true, metaOut, nodeInfoOut
}

// WaitingPodsForNode returns pods that are nominated to run on the given node,
// but they are waiting for other pods to be removed from the node before they
// can be actually scheduled.
func (p *PriorityQueue) WaitingPodsForNode(nodeName string) []*v1.Pod {
	p.lock.RLock()
	defer p.lock.RUnlock()
	if list, ok := p.nominatedPods[nodeName]; ok {
		return list
	}
	return nil
}
The logic of addNominatedPods is as follows:

1. If the queue or nodeInfo is nil, return (false, meta, nodeInfo) unchanged; this should only happen in tests.
2. Call WaitingPodsForNode to fetch the NominatedPods cached in the PriorityQueue for this node; if there are none, return (false, meta, nodeInfo) unchanged.
3. Otherwise, make a ShallowCopy of the PredicateMetadata and a Clone of the NodeInfo.
4. Add every NominatedPod whose priority is greater than or equal to that of the pod being scheduled into the copied NodeInfo (and into the copied PredicateMetadata, if present).
5. Return true together with the augmented copies, which the first predicate pass then uses.
In In-depth analysis of Kubernetes Scheduler's priority queue, I analyzed how the PriorityQueue is driven by the EventHandlers that the scheduler registers with podInformer, nodeInformer, serviceInformer, pvcInformer, and so on. The following analyzes the EventHandler paths that are related to NominatedPods.
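As a refresher, the wiring from the pod informer into the queue looks roughly like the sketch below. This is a simplified illustration (the real factory code also filters for unscheduled pods and unwraps deletion tombstones), not the verbatim registration:

podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
	// Each handler lands in one of the PriorityQueue methods analyzed below.
	AddFunc:    func(obj interface{}) { queue.Add(obj.(*v1.Pod)) },
	UpdateFunc: func(oldObj, newObj interface{}) { queue.Update(oldObj.(*v1.Pod), newObj.(*v1.Pod)) },
	DeleteFunc: func(obj interface{}) { queue.Delete(obj.(*v1.Pod)) },
})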
// Add adds a pod to the active queue. It should be called only when a new pod
// is added so there is no chance the pod is already in either queue.
func (p *PriorityQueue) Add(pod *v1.Pod) error {
	p.lock.Lock()
	defer p.lock.Unlock()
	err := p.activeQ.Add(pod)
	if err != nil {
		glog.Errorf("Error adding pod %v to the scheduling queue: %v", pod.Name, err)
	} else {
		if p.unschedulableQ.get(pod) != nil {
			glog.Errorf("Error: pod %v is already in the unschedulable queue.", pod.Name)
			p.deleteNominatedPodIfExists(pod)
			p.unschedulableQ.delete(pod)
		}
		p.addNominatedPodIfNeeded(pod)
		p.cond.Broadcast()
	}
	return err
}

func (p *PriorityQueue) addNominatedPodIfNeeded(pod *v1.Pod) {
	nnn := NominatedNodeName(pod)
	if len(nnn) > 0 {
		for _, np := range p.nominatedPods[nnn] {
			if np.UID == pod.UID {
				glog.Errorf("Pod %v/%v already exists in the nominated map!", pod.Namespace, pod.Name)
				return
			}
		}
		p.nominatedPods[nnn] = append(p.nominatedPods[nnn], pod)
	}
}
func (p *PriorityQueue) AddUnschedulableIfNotPresent(pod *v1.Pod) error {
	p.lock.Lock()
	defer p.lock.Unlock()
	if p.unschedulableQ.get(pod) != nil {
		return fmt.Errorf("pod is already present in unschedulableQ")
	}
	if _, exists, _ := p.activeQ.Get(pod); exists {
		return fmt.Errorf("pod is already present in the activeQ")
	}
	if !p.receivedMoveRequest && isPodUnschedulable(pod) {
		p.unschedulableQ.addOrUpdate(pod)
		p.addNominatedPodIfNeeded(pod)
		return nil
	}
	err := p.activeQ.Add(pod)
	if err == nil {
		p.addNominatedPodIfNeeded(pod)
		p.cond.Broadcast()
	}
	return err
}
Note that a pod is added to the nominatedPods cache only when its pod.Status.NominatedNodeName is not empty.
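That check goes through the NominatedNodeName helper, which is essentially a small accessor over the pod status (sketched here):

// NominatedNodeName returns the nominated node name of a pod,
// or "" if the pod has not been nominated to any node.
func NominatedNodeName(pod *v1.Pod) string {
	return pod.Status.NominatedNodeName
}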
When a Pod in the PriorityQueue is updated, updateNominatedPod is called to update the nominatedPods cache in the PriorityQueue.
// Update updates a pod in the active queue if present. Otherwise, it removes
// the item from the unschedulable queue and adds the updated one to the active
// queue.
func (p *PriorityQueue) Update(oldPod, newPod *v1.Pod) error {
	p.lock.Lock()
	defer p.lock.Unlock()
	// If the pod is already in the active queue, just update it there.
	if _, exists, _ := p.activeQ.Get(newPod); exists {
		p.updateNominatedPod(oldPod, newPod)
		err := p.activeQ.Update(newPod)
		return err
	}
	// If the pod is in the unschedulable queue, updating it may make it schedulable.
	if usPod := p.unschedulableQ.get(newPod); usPod != nil {
		p.updateNominatedPod(oldPod, newPod)
		if isPodUpdated(oldPod, newPod) {
			p.unschedulableQ.delete(usPod)
			err := p.activeQ.Add(newPod)
			if err == nil {
				p.cond.Broadcast()
			}
			return err
		}
		p.unschedulableQ.addOrUpdate(newPod)
		return nil
	}
	// If pod is not in any of the two queues, we put it in the active queue.
	err := p.activeQ.Add(newPod)
	if err == nil {
		p.addNominatedPodIfNeeded(newPod)
		p.cond.Broadcast()
	}
	return err
}
The logic of updateNominatedPod for updating the PriorityQueue's nominatedPods cache is: delete the oldPod first, then add the newPod.
// updateNominatedPod updates a pod in the nominatedPods.
func (p *PriorityQueue) updateNominatedPod(oldPod, newPod *v1.Pod) {
	// Even if the nominated node name of the Pod is not changed, we must delete and add it again
	// to ensure that its pointer is updated.
	p.deleteNominatedPodIfExists(oldPod)
	p.addNominatedPodIfNeeded(newPod)
}
When a Pod is deleted from the PriorityQueue, deleteNominatedPodIfExists is called first to remove the pod from the PriorityQueue's nominatedPods cache.
// Delete deletes the item from either of the two queues. It assumes the pod is
// only in one queue.
func (p *PriorityQueue) Delete(pod *v1.Pod) error {
	p.lock.Lock()
	defer p.lock.Unlock()
	p.deleteNominatedPodIfExists(pod)
	err := p.activeQ.Delete(pod)
	if err != nil {
		// The item was probably not found in the activeQ.
		p.unschedulableQ.delete(pod)
	}
	return nil
}
deleteNominatedPodIfExists first checks whether the pod's pod.Status.NominatedNodeName is empty; only if it is non-empty does it look the pod up in the cache and remove it.
func (p *PriorityQueue) deleteNominatedPodIfExists(pod *v1.Pod) {
	nnn := NominatedNodeName(pod)
	if len(nnn) > 0 {
		for i, np := range p.nominatedPods[nnn] {
			if np.UID == pod.UID {
				p.nominatedPods[nnn] = append(p.nominatedPods[nnn][:i], p.nominatedPods[nnn][i+1:]...)
				if len(p.nominatedPods[nnn]) == 0 {
					delete(p.nominatedPods, nnn)
				}
				break
			}
		}
	}
}
In this post, we explained the role of NominatedPods and NominatedNodeName, analyzed from the source code how NominatedPods change during and after preemption scheduling, and finally examined how the Add/Update/Delete operations on Pods in the PriorityQueue affect its nominatedPods cache. I hope this helps readers deepen their understanding of the scheduler's preemption scheduling and priority queue.