本站总访问量 kubernetes中如何进行调度 - Jerry的小站

Jerry Gao

上帝就是真理,真理就是上帝

概述

  • K8s中是如何调度的?
  • K8s中调度的具体实现?

K8s如何调度Pod

Pod的调度主要分为两个阶段,主要是调度周期和绑定周期,调度周期决定Pod被调度到哪一个节点,绑定周期将决策运用到集群。调度周期和绑定周期成为“调度上下文”。

调度周期

  • 发现未被调度的Pod:调度器通过K8s的watch机制来发现集群中新创建且尚未被调度到节点的Pod。kube-scheduler位于集群控制面,是k8s的默认调度器,主要负责将一个pod调度到一个节点上。
  • 选择节点:在选择节点时包含过滤和打分两个步骤来选择合适的节点进行调度。
    • 在过滤阶段,调度器会选择一系列符合pod调度需求的节点
    • 在打分阶段,调度器会对可调度节点队列中的每一个节点进行打分
    • 最终pod会被调度到分数最高的节点上面

调度框架-调度器插件

调度框架是k8s调度框架的一种插件架构,提供了一系列API插件来实现大部分的调度功能,这些调度插件被注册后在一个或多个拓展点被调用。

内置调度器插件可以启用或禁用,大部分默认调度器都默认启用。如果要启用调度器,可以修改配置文件:

1
2
3
4
5
6
apiVersion: kubescheduler.config.k8s.io/v1alpha1
kind: KubeSchedulerConfiguration
enablePlugins:
PodTopologySpread: true
disablePlugins:
NodeAffinity: true

默认配置文件地址为在/etc/kubernetes/scheduler.conf

除了默认的插件,可以实现自己的插件,并和默认插件一起配置。你可以访问 scheduler-plugins 了解更多信息。

也可以通过KubeSchedulerConfiguration进行配置

源码分析

Scheduler结构体

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
// Scheduler监听新的Pod,试图找到适合的节点并且绑定写回api server
type Scheduler struct {
// 调度器缓存,通过对其修改可以观察到NodeLister和Algorithm的变化
Cache internalcache.Cache
// 调度器使用的拓展器列表
Extenders []framework.Extender
// 阻塞并等待下一个待调度的Pod,使用函数而不是通道的原因是,调度一个 Pod 可能需要一定的时间,而我们不希望 Pod 在通道中等待时变得过期
NextPod func() *framework.QueuedPodInfo
// 调度失败时调用的处理函数
FailureHandler FailureHandlerFn
// 尝试将给定的 Pod 调度到节点列表中的一个节点。如果成功,返回一个包含建议的主机名称的 ScheduleResult 结构体;否则,返回带有失败原因的 FitError
SchedulePod func(ctx context.Context, fwk framework.Framework, state *framework.CycleState, pod *v1.Pod) (ScheduleResult, error)
// 用于关闭调度器的通道
StopEverything <-chan struct{}
// 用于保存待调度的 Pod 的调度队列
SchedulingQueue internalqueue.SchedulingQueue
// 调度配置文件(scheduling profiles)的映射
Profiles profile.Map
// 用于与 Kubernetes API 服务器通信的客户端
client clientset.Interface
// 节点信息的快照,用于存储节点的状态信息
nodeInfoSnapshot *internalcache.Snapshot
// 评分节点的百分比
percentageOfNodesToScore int32
// 下一个开始节点的索引,用于决定从节点列表中的哪个节点开始调度
nextStartNodeIndex int
}

Scheduler启动主循环

Scheduler结构体定义了一个方法Run,它用来启动结构体的主循环,即启动调度器:

1
2
3
4
5
6
7
8
9
10
11
// 开始监听和调度Unschedule状态的Pod,阻塞式的,直到上下文关闭
func (sched *Scheduler) Run(ctx context.Context) {
sched.SchedulingQueue.Run()

// 我们需要在专用的Goroutine中启动专用的schedulerOne循环,该方法会在函数获取下一项时被SchedulingQueue挂起。
// 如果没有新的Pod可以调度,将会被挂起,如果这个Goroutine中完成,将阻塞SchedulingQueue的关闭
go wait.UntilWithContext(ctx, sched.scheduleOne, 0)

<-ctx.Done()
sched.SchedulingQueue.Close()
}

wait.UntilWithContext函数在指定的上下文中循环执行给定的函数,ctx为传入的context,用来当ctx.Cancle或ctx.Done之后,会执行第10行的代码,即关闭SchedulingQueue。sched.scheduleOne为循环执行的函数,0即为执行一次。

我们来看下Scheduler.scheduleOne函数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
// scheduleOne 对单个 pod 进行完整的调度工作流程,它在调度算法的主机拟合(host fitting)上进行串行化处理
func (sched *Scheduler) scheduleOne(ctx context.Context) {
podInfo := sched.NextPod()
// 当 schedulerQueue 被关闭时,pod 可能为 nil
if podInfo == nil || podInfo.Pod == nil {
return
}
pod := podInfo.Pod
fwk, err := sched.frameworkForPod(pod)
if err != nil {
// 这种情况不应该发生,因为我们只接受指定与配置文件中的调度器名称匹配的 pod 进行调度
klog.ErrorS(err, "Error occurred")
return
}
if sched.skipPodSchedule(fwk, pod) {
return
}

klog.V(3).InfoS("Attempting to schedule pod", "pod", klog.KObj(pod))

// 同步地尝试为 pod 找到合适的节点。
start := time.Now()
state := framework.NewCycleState()
state.SetRecordPluginMetrics(rand.Intn(100) < pluginMetricsSamplePercent)

// 初始化一个空的 podsToActivate 结构,它将由插件填充或保持为空。
podsToActivate := framework.NewPodsToActivate()
state.Write(framework.PodsToActivateKey, podsToActivate)

schedulingCycleCtx, cancel := context.WithCancel(ctx)
defer cancel()

scheduleResult, assumedPodInfo, status := sched.schedulingCycle(schedulingCycleCtx, state, fwk, podInfo, start, podsToActivate)
if !status.IsSuccess() {
sched.FailureHandler(schedulingCycleCtx, fwk, assumedPodInfo, status, scheduleResult.nominatingInfo, start)
return
}

// 异步地将 pod 绑定到其节点(我们可以这样做是因为上面的假设步骤)。
go func() {
bindingCycleCtx, cancel := context.WithCancel(ctx)
defer cancel()

metrics.SchedulerGoroutines.WithLabelValues(metrics.Binding).Inc()
defer metrics.SchedulerGoroutines.WithLabelValues(metrics.Binding).Dec()
metrics.Goroutines.WithLabelValues(metrics.Binding).Inc()
defer metrics.Goroutines.WithLabelValues(metrics.Binding).Dec()

status := sched.bindingCycle(bindingCycleCtx, state, fwk, scheduleResult, assumedPodInfo, start, podsToActivate)
if !status.IsSuccess() {
sched.handleBindingCycleError(bindingCycleCtx, state, fwk, assumedPodInfo, start, scheduleResult, status)
}
}()
}

host fitting,即主机拟合,是指将一个容器化的应用程序(通常是一个 Pod)调度到集群中的一个合适的节点(即主机)上运行的过程。

这个函数比较长,执行过程主要是:

  • 1.阻塞式获取下一个Pod,当schedulerQueue被关闭时退出
  • 2.sched.frameworkForPod方法用来获取Pod中指定的调度器
    Pod.Spec.SchedulerName用来指定Pod使用的调度器名称,默认是default。Kubernetes可以支持从头开始编写Scheduler,如果是自己实现的Scheduler,就可以在这个字段为Pod指定调度器。
  • 3.调度是否能被跳过:
    • 如果Pod正在被删除即跳过
    • 如果某个 Pod 在调度队列中被取出进行调度尝试之前,收到了更新事件(如 Pod 的 Spec 发生了变化),那么该 Pod 将会被标记为假设状态,并不会进行实际的调度尝试,以避免重复调度。
  • 4.概率性开启调度器插件统计指标的flag标志位
    • 插件指标是指在 Kubernetes Scheduler 中使用的调度插件(例如调度算法、优先级函数等)生成的性能指标或统计信息。这些指标通常用于监控和度量调度器的性能、效果和健康状态,以便在需要时进行调优和故障排除。
    • 在调度循环中,调度器可能会调用多个调度插件来评估节点的可用性、计算权重、比较优先级等。这些插件可能会在执行时生成各种指标,例如调度时间、节点利用率、调度决策等。记录这些插件指标可以帮助管理员和开发人员深入了解调度器的行为和性能,并进行优化和故障排除。
    • rand.Intn是 Go 语言标准库 math/rand 包中的一个函数,用于生成一个指定范围内的随机整数
  • 5.尝试调度单个Pod,如果调度失败,则调用FailureHandler函数进行处理
  • 6.调度成功,开启一个新的Goroutine
    • 使用Prometheus记录监控指标,metrics.SchedulerGoroutines 和 metrics。Goroutines,用于度量调度器的 Goroutine 数量
    • 调用sched.bindingCycle将Pod绑定到节点上,绑定失败,则使用sched.handleBindingCycleError处理错误

Scheduler调度

我们看到上述代码中sched.schedulingCycle用来调度Pod,我们看看代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
// schedulingCycle尝试调度单个Pod
func (sched *Scheduler) schedulingCycle(
ctx context.Context,
state *framework.CycleState,
fwk framework.Framework,
podInfo *framework.QueuedPodInfo,
start time.Time,
podsToActivate *framework.PodsToActivate,
) (ScheduleResult, *framework.QueuedPodInfo, *framework.Status) {
pod := podInfo.Pod
scheduleResult, err := sched.SchedulePod(ctx, fwk, state, pod)
if err != nil {
if err == ErrNoNodesAvailable {
status := framework.NewStatus(framework.UnschedulableAndUnresolvable).WithError(err)
return ScheduleResult{nominatingInfo: clearNominatedNode}, podInfo, status
}

fitError, ok := err.(*framework.FitError)
if !ok {
klog.ErrorS(err, "Error selecting node for pod", "pod", klog.KObj(pod))
return ScheduleResult{nominatingInfo: clearNominatedNode}, podInfo, framework.AsStatus(err)
}

// SchedulePod() may have failed because the pod would not fit on any host, so we try to
// preempt, with the expectation that the next time the pod is tried for scheduling it
// will fit due to the preemption. It is also possible that a different pod will schedule
// into the resources that were preempted, but this is harmless.

if !fwk.HasPostFilterPlugins() {
klog.V(3).InfoS("No PostFilter plugins are registered, so no preemption will be performed")
return ScheduleResult{}, podInfo, framework.NewStatus(framework.Unschedulable).WithError(err)
}

// Run PostFilter plugins to attempt to make the pod schedulable in a future scheduling cycle.
result, status := fwk.RunPostFilterPlugins(ctx, state, pod, fitError.Diagnosis.NodeToStatusMap)
msg := status.Message()
fitError.Diagnosis.PostFilterMsg = msg
if status.Code() == framework.Error {
klog.ErrorS(nil, "Status after running PostFilter plugins for pod", "pod", klog.KObj(pod), "status", msg)
} else {
klog.V(5).InfoS("Status after running PostFilter plugins for pod", "pod", klog.KObj(pod), "status", msg)
}

var nominatingInfo *framework.NominatingInfo
if result != nil {
nominatingInfo = result.NominatingInfo
}
return ScheduleResult{nominatingInfo: nominatingInfo}, podInfo, framework.NewStatus(framework.Unschedulable).WithError(err)
}

metrics.SchedulingAlgorithmLatency.Observe(metrics.SinceInSeconds(start))
// Tell the cache to assume that a pod now is running on a given node, even though it hasn't been bound yet.
// This allows us to keep scheduling without waiting on binding to occur.
assumedPodInfo := podInfo.DeepCopy()
assumedPod := assumedPodInfo.Pod
// assume modifies `assumedPod` by setting NodeName=scheduleResult.SuggestedHost
err = sched.assume(assumedPod, scheduleResult.SuggestedHost)
if err != nil {
// This is most probably result of a BUG in retrying logic.
// We report an error here so that pod scheduling can be retried.
// This relies on the fact that Error will check if the pod has been bound
// to a node and if so will not add it back to the unscheduled pods queue
// (otherwise this would cause an infinite loop).
return ScheduleResult{nominatingInfo: clearNominatedNode},
assumedPodInfo,
framework.AsStatus(err)
}

// Run the Reserve method of reserve plugins.
if sts := fwk.RunReservePluginsReserve(ctx, state, assumedPod, scheduleResult.SuggestedHost); !sts.IsSuccess() {
// trigger un-reserve to clean up state associated with the reserved Pod
fwk.RunReservePluginsUnreserve(ctx, state, assumedPod, scheduleResult.SuggestedHost)
if forgetErr := sched.Cache.ForgetPod(assumedPod); forgetErr != nil {
klog.ErrorS(forgetErr, "Scheduler cache ForgetPod failed")
}

return ScheduleResult{nominatingInfo: clearNominatedNode},
assumedPodInfo,
sts
}

// Run "permit" plugins.
runPermitStatus := fwk.RunPermitPlugins(ctx, state, assumedPod, scheduleResult.SuggestedHost)
if !runPermitStatus.IsWait() && !runPermitStatus.IsSuccess() {
// trigger un-reserve to clean up state associated with the reserved Pod
fwk.RunReservePluginsUnreserve(ctx, state, assumedPod, scheduleResult.SuggestedHost)
if forgetErr := sched.Cache.ForgetPod(assumedPod); forgetErr != nil {
klog.ErrorS(forgetErr, "Scheduler cache ForgetPod failed")
}

return ScheduleResult{nominatingInfo: clearNominatedNode},
assumedPodInfo,
runPermitStatus
}

// At the end of a successful scheduling cycle, pop and move up Pods if needed.
if len(podsToActivate.Map) != 0 {
sched.SchedulingQueue.Activate(podsToActivate.Map)
// Clear the entries after activation.
podsToActivate.Map = make(map[string]*v1.Pod)
}

return scheduleResult, assumedPodInfo, nil
}

依然这段代码比较长,但是总体内容比较简单:

  • 1.使用sched.SchedulePod对Pod进行模拟调度,返回一个scheduleResult,记录了调度结果Node的信息。
    接下来是一系列错误判断:
    • 没有可用的节点
    • 断言为FitError类型的错误,即选择节点时出现了不符合条件的错误,没有可用节点
    • 如果没有可用节点,且设置了PostFilterPlugins调度器插件,将实行抢占策略
    • 运行RunPostFilter插件尝试使pod在未来可以被调度
  • 2.记录metric指标,计算从开始时间 start 到当前时间的运行时延,并将其观测(Observe)到 metrics.SchedulingAlgorithmLatency 指标中
  • 3.调用sched.assume告诉缓存假定 pod 现在正在给定节点上运行,即使它尚未绑定。这使我们可以继续进行调度,而无需等待绑定发生。
  • 4.使用reserve插件的RunReservePluginsReserve方法,在pod实际绑定到节点之前,为Pod预留资源、标记节点或其他预留操作
    预留失败则调用RunReservePluginsUnreserve清理预留状态,然后用sched.Cache.ForgetPod(assumedPod) 方法将 assumedPod 从调度器的缓存中删除,以便后续的调度可以重新考虑这个Pod
  • 5.接下来,调用了调度器中的许可插件的 RunPermitPlugins 方法。许可插件是调度器的一部分,用于检查是否允许将 Pod 绑定到某个节点
  • 6.如果存在待激活的Pod,将其放入调度队列中,进行进一步的调度处理,并清空待激活的Pod列表
  • 最后返回调度结果,Pod信息

“待激活”(Pending Activation)是指已经被调度器选择为将要在某个节点上运行的 Pod,但由于某些原因还没有被激活,即还没有被添加到调度队列中进行实际的调度处理。通常情况下,Pod 需要在被激活后才能被真正调度到节点上运行。

Scheduler绑定到节点

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
// bindingCycle tries to bind an assumed Pod.
func (sched *Scheduler) bindingCycle(
ctx context.Context,
state *framework.CycleState,
fwk framework.Framework,
scheduleResult ScheduleResult,
assumedPodInfo *framework.QueuedPodInfo,
start time.Time,
podsToActivate *framework.PodsToActivate) *framework.Status {

assumedPod := assumedPodInfo.Pod

// Run "permit" plugins.
if status := fwk.WaitOnPermit(ctx, assumedPod); !status.IsSuccess() {
return status
}

// Run "prebind" plugins.
if status := fwk.RunPreBindPlugins(ctx, state, assumedPod, scheduleResult.SuggestedHost); !status.IsSuccess() {
return status
}

// Run "bind" plugins.
if status := sched.bind(ctx, fwk, assumedPod, scheduleResult.SuggestedHost, state); !status.IsSuccess() {
return status
}

// Calculating nodeResourceString can be heavy. Avoid it if klog verbosity is below 2.
klog.V(2).InfoS("Successfully bound pod to node", "pod", klog.KObj(assumedPod), "node", scheduleResult.SuggestedHost, "evaluatedNodes", scheduleResult.EvaluatedNodes, "feasibleNodes", scheduleResult.FeasibleNodes)
metrics.PodScheduled(fwk.ProfileName(), metrics.SinceInSeconds(start))
metrics.PodSchedulingAttempts.Observe(float64(assumedPodInfo.Attempts))
metrics.PodSchedulingDuration.WithLabelValues(getAttemptsLabel(assumedPodInfo)).Observe(metrics.SinceInSeconds(assumedPodInfo.InitialAttemptTimestamp))

// Run "postbind" plugins.
fwk.RunPostBindPlugins(ctx, state, assumedPod, scheduleResult.SuggestedHost)

// At the end of a successful binding cycle, move up Pods if needed.
if len(podsToActivate.Map) != 0 {
sched.SchedulingQueue.Activate(podsToActivate.Map)
// Unlike the logic in schedulingCycle(), we don't bother deleting the entries
// as `podsToActivate.Map` is no longer consumed.
}

return nil
}
  • 1.首先通过调用 fwk.WaitOnPermit(ctx, assumedPod) 方法等待 “permit” 插件的完成状态。
    • “permit” 插件通常用于在实际进行 Pod 调度之前进行进一步的验证和确认,例如检查节点资源是否满足要求、检查节点的亲和性和反亲和性等条件是否满足等。这些插件可以对 Pod 进行更细粒度的筛选和过滤,以确保 Pod 能够在合适的节点上运行。
  • 2.执行 “prebind” 插件的逻辑,通过调用 fwk.RunPreBindPlugins(ctx, state, assumedPod, scheduleResult.SuggestedHost) 方法运行 “prebind” 插件。
    • “prebind” 插件通常用于在进行 Pod 节点绑定(binding)之前进行一些预处理操作,例如修改 Pod 的标签、注解、亲和性和反亲和性等信息,以及进行其他一些验证和准备工作。这些插件可以在节点绑定之前对 Pod 进行进一步的处理和调整,以确保 Pod 在节点上的调度和运行是合理和符合预期的。
  • 3.调用 sched.bind(ctx, fwk, assumedPod, scheduleResult.SuggestedHost, state) 方法运行 “bind” 插件。
    • 根据拓展器插件 > 框架插件的优先级,调用extender.Bind方法,将Pod绑定到节点
    • “bind” 插件用于在进行 Pod 节点绑定(binding)时进行实际的节点绑定操作,将 Pod 绑定到指定的节点上,使其在该节点上运行。这些插件通常会处理节点资源的分配、Pod 节点状态的更新、调度事件的记录和报告等操作,以确保节点绑定的正确执行和状态的一致性。
  • 4.执行 “postbind” 插件的逻辑,通过调用 fwk.RunPostBindPlugins(ctx, state, assumedPod, scheduleResult.SuggestedHost) 方法运行 “postbind” 插件。
    • “postbind” 插件用于在进行 Pod 节点绑定(binding)后进行一些后续的操作,例如更新节点状态、记录事件、发送通知等。这些插件通常会处理节点状态的更新、调度事件的记录和报告、通知相关组件等操作,以确保节点绑定后的一致性和完整性

Scheduler总结

参考文档

评论