概述
从Scheduler启动主循环的函数,我们可以看到,使用了sched.SchedulingQueue.Run(),调用了SchedulingQueue的Run方法,我们来看看SchedulingQueue做了什么
SchedulingQueue
1 | //SchedulingQueue 是队列的接口,用于存储等待调度的 pod。 |
我们可以看到,SchedulingQueue这个接口定义了一些列的方法,在k8s,默认的实现是PriorityQueue。
1 | func NewSchedulingQueue( |
PriorityQueue
先来看看PriorityQueue的结构:
1 | // PriorityQueue实现了一个调度队列 |
我们来看看PriorityQueue.Run()方法:
1 | // Run starts the goroutine to pump from podBackoffQ to activeQ |
我们可以看到,Run()方法开启了两个goroutine,分别是 flushBackoffQCompleted 和 flushUnschedulablePodsLeftover。
1 | // flushBackoffQCompleted 将所有已完成退避的 pod 从 backoffQ 移动到 activeQ |
我们可以看到两个方法都是加锁的:
- flushBackoffQCompleted:
- 对Pod进行PreEnqueue检查,如果Gated字段为 true,表示该 Pod 没有通过预调度插件的检查,被禁止调度,将该Pod添加到unschedulablePods中
- 如果 Gated 字段为 false,表示该Pod通过了预调度插件的检查,可以继续被调度,将调用activeQ的Add方法将QueuedPodInfo对象添加到activeQ中,作为一个优先级队列(heap)
- flushUnschedulablePodsLeftover:
- 检查是否有Pod在unschedulablePods中且超过podMaxInUnschedulablePodsDuration时间
- 如果有,则根据Pod的状态是否为podBackoff,如果是则放入activeQ,如不是则放入podBackoffQ
当Pod调度失败
当Filter阶段调度失败时,会调用AddUnschedulableIfNotPresent方法:
1 | func (p *PriorityQueue) AddUnschedulableIfNotPresent(pInfo *framework.QueuedPodInfo, podSchedulingCycle int64) error { |
具体逻辑如下:
- 获取待添加的 Pod 信息:从传入的 QueuedPodInfo 对象中获取 Pod 对象。
- 检查 Pod 是否已经存在于不可调度队列、活跃队列或者后退队列中,如果是则返回错误,表示 Pod 已经存在于相应的队列中。
- 更新 Pod 的时间戳:将 QueuedPodInfo 对象中的时间戳更新为当前时间,表示 Pod 被重新添加到队列中。
- 根据 Pod 的 move request 判断放入哪个队列:根据 Pod 的 move request 时间与当前调度轮次(podSchedulingCycle)的比较,决定将 Pod 放入后退队列(backoffQ)还是不可调度队列(unschedulablePods)。如果 Pod 的 move request 时间早于或等于当前调度轮次,则将 Pod 放入后退队列(backoffQ);否则,将 Pod 放入不可调度队列(unschedulablePods)。
- 更新队列信息和计数器:根据放入的队列类型,更新相应队列的信息和计数器。同时,调用 addNominatedPodUnlocked 方法,将 Pod 加入到优先队列的 nominatedPods 中。
- 返回错误或 nil:根据操作的结果,返回相应的错误(如果有)或者 nil,表示成功添加 Pod 到队列中。
