本站总访问量 Kubernetes中的SchedulingQueue - Jerry的小站

Jerry Gao

上帝就是真理,真理就是上帝

概述

Scheduler启动主循环的函数,我们可以看到,使用了sched.SchedulingQueue.Run(),调用了SchedulingQueue的Run方法,我们来看看SchedulingQueue做了什么

SchedulingQueue

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
//SchedulingQueue 是队列的接口,用于存储等待调度的 pod。
//该接口遵循类似于 cache.FIFO 和 cache.Heap 的模式和
//使得将这些数据结构用作 SchedulingQueue 变得容易。
type SchedulingQueue interface {
framework.PodNominator
Add(pod *v1.Pod) error
//Activate 将给定的 pod 移动到 activeQ,前提是它们在 unschedulablePods 或 backoffQ 中
//传入的 Pod 最初是由想要激活 Pod 的插件编译而来的,
//通过保留的 CycleState 结构 (PodsToActivate) 注入 pod
Activate(pods map[string]*v1.Pod)
//AddUnschedulableIfNotPresent 将不可调度的 pod 添加回调度队列
//podSchedulingCycle 表示当前的调度周期数,可以是
//通过调用 SchedulingCycle() 返回
AddUnschedulableIfNotPresent(pod *framework.QueuedPodInfo, podSchedulingCycle int64) error
//SchedulingCycle 返回当前的调度周期数,即
//通过调度队列缓存。通常,每当
//弹出一个 pod(例如调用 Pop())就足够了
SchedulingCycle() int64
//Pop 移除队列的头部并将其返回。如果
//队列为空,等待直到有新项目添加到队列中
Pop() (*framework.QueuedPodInfo, error)
Update(oldPod, newPod *v1.Pod) error
Delete(pod *v1.Pod) error
MoveAllToActiveOrBackoffQueue(event framework.ClusterEvent, preCheck PreEnqueueCheck)
AssignedPodAdded(pod *v1.Pod)
AssignedPodUpdated(pod *v1.Pod)
PendingPods() ([]*v1.Pod, string)
//Close 关闭 SchedulingQueue 以便等待弹出项目的goroutine可以优雅地退出
Close()
// Run启动管理队列的 goroutines
Run()
}

我们可以看到,SchedulingQueue这个接口定义了一些列的方法,在k8s,默认的实现是PriorityQueue。

1
2
3
4
5
6
func NewSchedulingQueue(
lessFn framework.LessFunc,
informerFactory informers.SharedInformerFactory,
opts ...Option) SchedulingQueue {
return NewPriorityQueue(lessFn, informerFactory, opts...)
}

PriorityQueue

先来看看PriorityQueue的结构:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
// PriorityQueue实现了一个调度队列
// PriorityQueue 的头部是最高优先级的pending pod。
// 这个结构有两个子队列和一个可选的数据结构,即activeQ,backoffQ和unschedulablePods。
// - activeQ 持有正在考虑调度的Pod
// - backoffQ 持有从unschedulablePods移动到的pod,并将在他们的backoffQ退避期结束时移动到activeQ
// - unschedulablePods 持有那些已经被尝试过调度并且当前被确定不可调度的pod
type PriorityQueue struct {
*nominator

// 关闭队列的通道
stop chan struct{}
// 用于获取当前时间的时钟
clock clock.Clock

// Pod 初始的回退时长,用于实现 backoff 机制
podInitialBackoffDuration time.Duration
// Pod 最大的回退时长,用于实现 backoff 机制
podMaxBackoffDuration time.Duration
// Pod 在 unschedulablePods 中最大的停留时长
podMaxInUnschedulablePodsDuration time.Duration
// 用于实现条件变量的 sync.Cond
cond sync.Cond

// 一个基于堆的结构,用于存储调度器正在主动查看以找到要调度的 Pod
// 堆的头部是最高优先级的 Pod
activeQ *heap.Heap
// 一个基于堆的结构,按照回退过期时间排序的 Pod 队列
// 用于存储完成回退的 Pod,在调度器查看 activeQ 之前,从该堆中弹出 Pod
podBackoffQ *heap.Heap
// 存储已经尝试过但无法调度的 Pod 的结构
unschedulablePods *UnschedulablePods
// 调度周期的序列号,每次从队列中弹出一个 Pod 时会递增
schedulingCycle int64
// 缓存在接收到移动请求时的调度周期的序列号
// 如果在接收到移动请求时正在尝试调度的 Pod 则会被放回 activeQ
moveRequestCycle int64
// 用于存储集群事件和对应的 Pod 集合的映射
clusterEventMap map[framework.ClusterEvent]sets.Set[string]
// 存储注册的preEnqueue plugins插件,以配置文件名为键
preEnqueuePluginMap map[string][]framework.PreEnqueuePlugin

// 表示队列是否已关闭
closed bool

// 用于从缓存中获取 Namespace 列表的 Listers 接口
nsLister listersv1.NamespaceLister
// 用于记录调度器指标的异步记录器
metricsRecorder metrics.MetricAsyncRecorder
// 插件指标抽样百分比,用于控制插件指标的采样率
pluginMetricsSamplePercent int
}

我们来看看PriorityQueue.Run()方法:

1
2
3
4
5
// Run starts the goroutine to pump from podBackoffQ to activeQ
func (p *PriorityQueue) Run() {
go wait.Until(p.flushBackoffQCompleted, 1.0*time.Second, p.stop)
go wait.Until(p.flushUnschedulablePodsLeftover, 30*time.Second, p.stop)
}

我们可以看到,Run()方法开启了两个goroutine,分别是 flushBackoffQCompletedflushUnschedulablePodsLeftover

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
// flushBackoffQCompleted 将所有已完成退避的 pod 从 backoffQ 移动到 activeQ
func (p *PriorityQueue) flushBackoffQCompleted() {
p.lock.Lock()
defer p.lock.Unlock()
activated := false
for {
rawPodInfo := p.podBackoffQ.Peek()
if rawPodInfo == nil {
break
}
pInfo := rawPodInfo.(*framework.QueuedPodInfo)
pod := pInfo.Pod
if p.isPodBackingoff(pInfo) {
break
}
_, err := p.podBackoffQ.Pop()
if err != nil {
klog.ErrorS(err, "Unable to pop pod from backoff queue despite backoff completion", "pod", klog.KObj(pod))
break
}
if added, _ := p.addToActiveQ(pInfo); added {
klog.V(5).InfoS("Pod moved to an internal scheduling queue", "pod", klog.KObj(pod), "event", BackoffComplete, "queue", activeQName)
metrics.SchedulerQueueIncomingPods.WithLabelValues("active", BackoffComplete).Inc()
activated = true
}
}

if activated {
p.cond.Broadcast()
}
}

// flushUnschedulablePodsLeftover移动留在unschedulablePods中且超过podMaxInUnschedulablePodsDuration时间的Pod到backoffQ或activeQ
func (p *PriorityQueue) flushUnschedulablePodsLeftover() {
p.lock.Lock()
defer p.lock.Unlock()

var podsToMove []*framework.QueuedPodInfo
currentTime := p.clock.Now()
for _, pInfo := range p.unschedulablePods.podInfoMap {
lastScheduleTime := pInfo.Timestamp
if currentTime.Sub(lastScheduleTime) > p.podMaxInUnschedulablePodsDuration {
podsToMove = append(podsToMove, pInfo)
}
}

if len(podsToMove) > 0 {
p.movePodsToActiveOrBackoffQueue(podsToMove, UnschedulableTimeout)
}
}

我们可以看到两个方法都是加锁的:

  • flushBackoffQCompleted:
    • 对Pod进行PreEnqueue检查,如果Gated字段为 true,表示该 Pod 没有通过预调度插件的检查,被禁止调度,将该Pod添加到unschedulablePods中
    • 如果 Gated 字段为 false,表示该Pod通过了预调度插件的检查,可以继续被调度,将调用activeQ的Add方法将QueuedPodInfo对象添加到activeQ中,作为一个优先级队列(heap)
  • flushUnschedulablePodsLeftover:
    • 检查是否有Pod在unschedulablePods中且超过podMaxInUnschedulablePodsDuration时间
    • 如果有,则根据Pod的状态是否为podBackoff,如果是则放入activeQ,如不是则放入podBackoffQ

当Pod调度失败

当Filter阶段调度失败时,会调用AddUnschedulableIfNotPresent方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
func (p *PriorityQueue) AddUnschedulableIfNotPresent(pInfo *framework.QueuedPodInfo, podSchedulingCycle int64) error {
p.lock.Lock()
defer p.lock.Unlock()
pod := pInfo.Pod
if p.unschedulablePods.get(pod) != nil {
return fmt.Errorf("Pod %v is already present in unschedulable queue", klog.KObj(pod))
}

if _, exists, _ := p.activeQ.Get(pInfo); exists {
return fmt.Errorf("Pod %v is already present in the active queue", klog.KObj(pod))
}
if _, exists, _ := p.podBackoffQ.Get(pInfo); exists {
return fmt.Errorf("Pod %v is already present in the backoff queue", klog.KObj(pod))
}

// Refresh the timestamp since the pod is re-added.
pInfo.Timestamp = p.clock.Now()

// If a move request has been received, move it to the BackoffQ, otherwise move
// it to unschedulablePods.
for plugin := range pInfo.UnschedulablePlugins {
metrics.UnschedulableReason(plugin, pInfo.Pod.Spec.SchedulerName).Inc()
}
if p.moveRequestCycle >= podSchedulingCycle {
if err := p.podBackoffQ.Add(pInfo); err != nil {
return fmt.Errorf("error adding pod %v to the backoff queue: %v", klog.KObj(pod), err)
}
klog.V(5).InfoS("Pod moved to an internal scheduling queue", "pod", klog.KObj(pod), "event", ScheduleAttemptFailure, "queue", backoffQName)
metrics.SchedulerQueueIncomingPods.WithLabelValues("backoff", ScheduleAttemptFailure).Inc()
} else {
p.unschedulablePods.addOrUpdate(pInfo)
klog.V(5).InfoS("Pod moved to an internal scheduling queue", "pod", klog.KObj(pod), "event", ScheduleAttemptFailure, "queue", unschedulablePods)
metrics.SchedulerQueueIncomingPods.WithLabelValues("unschedulable", ScheduleAttemptFailure).Inc()

}

p.addNominatedPodUnlocked(pInfo.PodInfo, nil)
return nil
}

具体逻辑如下:

  • 获取待添加的 Pod 信息:从传入的 QueuedPodInfo 对象中获取 Pod 对象。
  • 检查 Pod 是否已经存在于不可调度队列、活跃队列或者后退队列中,如果是则返回错误,表示 Pod 已经存在于相应的队列中。
  • 更新 Pod 的时间戳:将 QueuedPodInfo 对象中的时间戳更新为当前时间,表示 Pod 被重新添加到队列中。
  • 根据 Pod 的 move request 判断放入哪个队列:根据 Pod 的 move request 时间与当前调度轮次(podSchedulingCycle)的比较,决定将 Pod 放入后退队列(backoffQ)还是不可调度队列(unschedulablePods)。如果 Pod 的 move request 时间早于或等于当前调度轮次,则将 Pod 放入后退队列(backoffQ);否则,将 Pod 放入不可调度队列(unschedulablePods)。
  • 更新队列信息和计数器:根据放入的队列类型,更新相应队列的信息和计数器。同时,调用 addNominatedPodUnlocked 方法,将 Pod 加入到优先队列的 nominatedPods 中。
  • 返回错误或 nil:根据操作的结果,返回相应的错误(如果有)或者 nil,表示成功添加 Pod 到队列中。

总结

根据三篇文章,可以画出此流程图:

评论