How Kubernetes Starts the Scheduler - Jerry的小站

Jerry Gao


Overview

  • How Kubernetes starts the Scheduler

Source Code Walkthrough

Scheduler

In the previous article we walked through the Scheduler's main loop; this one looks at how kube-scheduler starts up.

Setup

// Setup creates a completed config and a scheduler based on the command args and options
func Setup(ctx context.Context, opts *options.Options, outOfTreeRegistryOptions ...Option) (*schedulerserverconfig.CompletedConfig, *scheduler.Scheduler, error) {
if cfg, err := latest.Default(); err != nil {
return nil, nil, err
} else {
opts.ComponentConfig = cfg
}

if errs := opts.Validate(); len(errs) > 0 {
return nil, nil, utilerrors.NewAggregate(errs)
}

c, err := opts.Config(ctx)
if err != nil {
return nil, nil, err
}

// Get the completed config
cc := c.Complete()

outOfTreeRegistry := make(runtime.Registry)
for _, option := range outOfTreeRegistryOptions {
if err := option(outOfTreeRegistry); err != nil {
return nil, nil, err
}
}

recorderFactory := getRecorderFactory(&cc)
completedProfiles := make([]kubeschedulerconfig.KubeSchedulerProfile, 0)
// Create the scheduler.
sched, err := scheduler.New(cc.Client,
cc.InformerFactory,
cc.DynInformerFactory,
recorderFactory,
ctx.Done(),
scheduler.WithComponentConfigVersion(cc.ComponentConfig.TypeMeta.APIVersion),
scheduler.WithKubeConfig(cc.KubeConfig),
scheduler.WithProfiles(cc.ComponentConfig.Profiles...),
scheduler.WithPercentageOfNodesToScore(cc.ComponentConfig.PercentageOfNodesToScore),
scheduler.WithFrameworkOutOfTreeRegistry(outOfTreeRegistry),
scheduler.WithPodMaxBackoffSeconds(cc.ComponentConfig.PodMaxBackoffSeconds),
scheduler.WithPodInitialBackoffSeconds(cc.ComponentConfig.PodInitialBackoffSeconds),
scheduler.WithPodMaxInUnschedulablePodsDuration(cc.PodMaxInUnschedulablePodsDuration),
scheduler.WithExtenders(cc.ComponentConfig.Extenders...),
scheduler.WithParallelism(cc.ComponentConfig.Parallelism),
scheduler.WithBuildFrameworkCapturer(func(profile kubeschedulerconfig.KubeSchedulerProfile) {
// Profiles are processed during Framework instantiation to set default plugins and configurations. Capturing them for logging
completedProfiles = append(completedProfiles, profile)
}),
)
if err != nil {
return nil, nil, err
}
if err := options.LogOrWriteConfig(klog.FromContext(ctx), opts.WriteConfigTo, &cc.ComponentConfig, completedProfiles); err != nil {
return nil, nil, err
}

return &cc, sched, nil
}

As the comment says, this function builds a completed config and a Scheduler from the command-line arguments and options.

  • 1. latest.Default() returns a default configuration built with the defaults of the latest API version. The defaults include:
    • metav1.TypeMeta, i.e. Kind and APIVersion
    • Parallelism, the degree of parallelism of the scheduling algorithm; must be greater than 0 and defaults to 16
    • LeaderElection, the leader-election settings; see the LeaderElectionConfiguration struct for details
    • ClientConnection, the settings used when talking to the apiserver through a kubeconfig
    • componentbaseconfigv1alpha1.DebuggingConfiguration, debugging-related features and settings
    • PercentageOfNodesToScore, the percentage of nodes that must be found feasible before the scheduler stops searching for more
    • PodInitialBackoffSeconds, the initial backoff for unschedulable Pods
    • PodMaxBackoffSeconds, the maximum backoff for unschedulable Pods
    • Profiles, the scheduling profiles; a Pod selects a profile via its scheduler name
    • Extenders, the list of extenders
  • 2. opts.Validate validates the user-supplied configuration
  • 3. c.Complete returns the completed configuration
  • 4. scheduler.New creates the Scheduler instance
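The variadic outOfTreeRegistryOptions parameter of Setup is how custom plugins reach the framework registry: each Option mutates the out-of-tree registry, which is then merged with the in-tree registry inside scheduler.New. Below is a minimal sketch of how this is usually wired from a custom scheduler binary; app.NewSchedulerCommand, app.WithPlugin and cli.Run are the real entry points, while nodeLabelScore/newNodeLabelScore are made-up names for illustration, and the factory signature shown matches the scheduler version discussed in this post (newer releases add a context.Context parameter).

package main

import (
	"os"

	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/component-base/cli"
	"k8s.io/kubernetes/cmd/kube-scheduler/app"
	"k8s.io/kubernetes/pkg/scheduler/framework"
)

// nodeLabelScore is a hypothetical no-op plugin used only to show the wiring.
type nodeLabelScore struct{}

func (p *nodeLabelScore) Name() string { return "NodeLabelScore" }

// newNodeLabelScore is the plugin factory that ends up in the out-of-tree registry.
func newNodeLabelScore(_ runtime.Object, _ framework.Handle) (framework.Plugin, error) {
	return &nodeLabelScore{}, nil
}

func main() {
	// app.WithPlugin produces an Option; Setup later merges it into the
	// in-tree registry via registry.Merge.
	command := app.NewSchedulerCommand(
		app.WithPlugin("NodeLabelScore", newNodeLabelScore),
	)
	os.Exit(cli.Run(command))
}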

scheduler.New()

scheduler.New() creates a Scheduler instance:

func New(client clientset.Interface,
informerFactory informers.SharedInformerFactory,
dynInformerFactory dynamicinformer.DynamicSharedInformerFactory,
recorderFactory profile.RecorderFactory,
stopCh <-chan struct{},
opts ...Option) (*Scheduler, error) {

// Default for the stop channel
stopEverything := stopCh
if stopEverything == nil {
stopEverything = wait.NeverStop
}

// Start from the default scheduler options; each opt is a functional option that overrides fields
options := defaultSchedulerOptions
for _, opt := range opts {
opt(&options)
}

if options.applyDefaultProfile {
var versionedCfg configv1.KubeSchedulerConfiguration
scheme.Scheme.Default(&versionedCfg)
cfg := schedulerapi.KubeSchedulerConfiguration{}
if err := scheme.Scheme.Convert(&versionedCfg, &cfg, nil); err != nil {
return nil, err
}
options.profiles = cfg.Profiles
}

// In-tree plugin registry; it also wires in feature-gated plugins
registry := frameworkplugins.NewInTreeRegistry()
// Merge in the out-of-tree plugin registry
if err := registry.Merge(options.frameworkOutOfTreeRegistry); err != nil {
return nil, err
}

// Register scheduler metrics
metrics.Register()

// Build the extenders
extenders, err := buildExtenders(options.extenders, options.profiles)
if err != nil {
return nil, fmt.Errorf("couldn't build extenders: %w", err)
}

// Create Pod and Node listers for reading the informer caches
podLister := informerFactory.Core().V1().Pods().Lister()
nodeLister := informerFactory.Core().V1().Nodes().Lister()

// Create the snapshot, cluster event map and async metrics recorder used by the scheduler
snapshot := internalcache.NewEmptySnapshot()
clusterEventMap := make(map[framework.ClusterEvent]sets.Set[string])
metricsRecorder := metrics.NewMetricsAsyncRecorder(1000, time.Second, stopCh)

// Initialize the scheduling profiles and build the profile map (one framework per profile)
profiles, err := profile.NewMap(options.profiles, registry, recorderFactory, stopCh,
frameworkruntime.WithComponentConfigVersion(options.componentConfigVersion),
frameworkruntime.WithClientSet(client),
frameworkruntime.WithKubeConfig(options.kubeConfig),
frameworkruntime.WithInformerFactory(informerFactory),
frameworkruntime.WithSnapshotSharedLister(snapshot),
frameworkruntime.WithCaptureProfile(frameworkruntime.CaptureProfile(options.frameworkCapturer)),
frameworkruntime.WithClusterEventMap(clusterEventMap),
frameworkruntime.WithParallelism(int(options.parallelism)),
frameworkruntime.WithExtenders(extenders),
frameworkruntime.WithMetricsRecorder(metricsRecorder),
)
if err != nil {
return nil, fmt.Errorf("initializing profiles: %v", err)
}

if len(profiles) == 0 {
return nil, errors.New("at least one profile is required")
}

// Build the map of PreEnqueue plugins per profile, run before a Pod enters the active queue
preEnqueuePluginMap := make(map[string][]framework.PreEnqueuePlugin)
for profileName, profile := range profiles {
preEnqueuePluginMap[profileName] = profile.PreEnqueuePlugins()
}
// Create the Pod scheduling queue, configuring its sort function, initial/max backoff, listers, cluster event map, max time in the unschedulable set, etc.
podQueue := internalqueue.NewSchedulingQueue(
// The queue sort function comes from the framework of the first profile
profiles[options.profiles[0].SchedulerName].QueueSortFunc(),
// The informer factory used to watch resource changes from the API server so the queue sees up-to-date state
informerFactory,
// Initial and maximum backoff applied when retrying Pods whose scheduling attempt failed
internalqueue.WithPodInitialBackoffDuration(time.Duration(options.podInitialBackoffSeconds)*time.Second),
internalqueue.WithPodMaxBackoffDuration(time.Duration(options.podMaxBackoffSeconds)*time.Second),
// Pod lister used to read the latest Pod state from the local informer cache
internalqueue.WithPodLister(podLister),
// Map of cluster events (node added, node updated, ...) used to decide which unschedulable Pods to move back
internalqueue.WithClusterEventMap(clusterEventMap),
// Maximum time a Pod may stay in the unschedulable set before being moved back for another attempt
internalqueue.WithPodMaxInUnschedulablePodsDuration(options.podMaxInUnschedulablePodsDuration),
// PreEnqueue plugin map, used to gate Pods before they are added to the active queue
internalqueue.WithPreEnqueuePluginMap(preEnqueuePluginMap),
// Sampling percentage for plugin metrics, to limit the performance overhead
internalqueue.WithPluginMetricsSamplePercent(pluginMetricsSamplePercent),
// MetricsRecorder used to record scheduling queue metrics
internalqueue.WithMetricsRecorder(*metricsRecorder),
)

// Set the scheduling queue as the Pod nominator of every profile's framework
for _, fwk := range profiles {
fwk.SetPodNominator(podQueue)
}

// Create the scheduler cache that holds the scheduler's runtime state (assumed/bound Pods, node info)
schedulerCache := internalcache.New(durationToExpireAssumedPod, stopEverything)

// Create a cache debugger that dumps cache and queue state when it receives a signal
debugger := cachedebugger.New(nodeLister, podLister, schedulerCache, podQueue)
debugger.ListenForSignal(stopEverything)

// Assemble the Scheduler instance and apply the default handlers
sched := &Scheduler{
Cache: schedulerCache,
client: client,
nodeInfoSnapshot: snapshot,
percentageOfNodesToScore: options.percentageOfNodesToScore,
Extenders: extenders,
NextPod: internalqueue.MakeNextPodFunc(podQueue),
StopEverything: stopEverything,
SchedulingQueue: podQueue,
Profiles: profiles,
}
sched.applyDefaultHandlers()
// Register all event handlers (for both the typed informers and the dynamic informers) with the scheduler
addAllEventHandlers(sched, informerFactory, dynInformerFactory, unionedGVKs(clusterEventMap))

return sched, nil
}

The input parameters are:

  • client: a clientset.Interface, the client used to talk to the Kubernetes API
  • informerFactory: an informers.SharedInformerFactory, the factory that creates Informers which receive events from the API server and maintain a local cache
  • dynInformerFactory: a dynamicinformer.DynamicSharedInformerFactory, the factory for dynamic Informers that watch dynamic resources and maintain a local cache
  • recorderFactory: a profile.RecorderFactory, the factory that creates the event recorders used to record events produced by the scheduler
  • stopCh: a <-chan struct{}, the stop channel that tells the scheduler to shut down
  • opts ...Option: optional functional options that configure the scheduler's behavior
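As a rough sketch of how these parameters fit together (assuming the signature shown above, which changes between Kubernetes versions), a Scheduler can be constructed against a fake clientset, similar to what the scheduler's own tests do:

package main

import (
	"fmt"

	"k8s.io/client-go/informers"
	"k8s.io/client-go/kubernetes/fake"
	"k8s.io/client-go/tools/events"
	"k8s.io/kubernetes/pkg/scheduler"
	"k8s.io/kubernetes/pkg/scheduler/profile"
)

func main() {
	client := fake.NewSimpleClientset()
	informerFactory := informers.NewSharedInformerFactory(client, 0)

	// The recorder factory turns an event broadcaster into per-profile event recorders.
	broadcaster := events.NewBroadcaster(&events.EventSinkImpl{Interface: client.EventsV1()})
	recorderFactory := profile.NewRecorderFactory(broadcaster)

	stopCh := make(chan struct{})
	defer close(stopCh)

	sched, err := scheduler.New(
		client,
		informerFactory,
		nil, // dynInformerFactory may be nil, as it is in many tests
		recorderFactory,
		stopCh,
		scheduler.WithPercentageOfNodesToScore(50),
	)
	if err != nil {
		panic(err)
	}
	fmt.Printf("created scheduler with %d profile(s)\n", len(sched.Profiles))
}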

sched.applyDefaultHandlers()

When New creates a Scheduler, the default handlers are attached to the struct via sched.applyDefaultHandlers():

func (s *Scheduler) applyDefaultHandlers() {
s.SchedulePod = s.schedulePod
s.FailureHandler = s.handleSchedulingFailure
}

We can see that two methods are wired in: schedulePod and handleSchedulingFailure.
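SchedulePod and FailureHandler are stored as function-typed fields rather than plain methods, so callers and tests can swap implementations without touching the main scheduling loop. A self-contained illustration of the pattern (this is not scheduler code):

package main

import "fmt"

// scheduler mimics the field layout: behaviour is injected as function values.
type scheduler struct {
	SchedulePod    func(pod string) (host string, err error)
	FailureHandler func(pod string, err error)
}

func (s *scheduler) applyDefaultHandlers() {
	s.SchedulePod = func(pod string) (string, error) { return "node-1", nil }
	s.FailureHandler = func(pod string, err error) { fmt.Println("requeue", pod, "because", err) }
}

func main() {
	s := &scheduler{}
	s.applyDefaultHandlers()

	// A test could replace s.SchedulePod with a stub before exercising the loop.
	host, _ := s.SchedulePod("nginx")
	fmt.Println("scheduled to", host)
}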

schedulePod
// schedulePod tries to schedule the given Pod onto one of the nodes in the node list.
// On success it returns the name of the chosen node; on failure it returns a FitError.
func (sched *Scheduler) schedulePod(ctx context.Context, fwk framework.Framework, state *framework.CycleState, pod *v1.Pod) (result ScheduleResult, err error) {
// Create a "Scheduling" trace tagged with the Pod's namespace and name,
// so the scheduling steps for this Pod can be followed in the logs
trace := utiltrace.New("Scheduling", utiltrace.Field{Key: "namespace", Value: pod.Namespace}, utiltrace.Field{Key: "name", Value: pod.Name})
// Log the trace if the whole scheduling attempt takes longer than 100 milliseconds
defer trace.LogIfLong(100 * time.Millisecond)

// Update the scheduler's cached snapshot of node infos
if err := sched.Cache.UpdateSnapshot(sched.nodeInfoSnapshot); err != nil {
return result, err
}
trace.Step("Snapshotting scheduler cache and node infos done")

// If the node info snapshot contains no nodes, there is nothing to schedule onto
if sched.nodeInfoSnapshot.NumNodes() == 0 {
return result, ErrNoNodesAvailable
}
// Find the nodes that fit the Pod, together with diagnosis information for later decisions and error reporting
feasibleNodes, diagnosis, err := sched.findNodesThatFitPod(ctx, fwk, state, pod)
if err != nil {
return result, err
}
trace.Step("Computing predicates done")

// No node fits the Pod
if len(feasibleNodes) == 0 {
return result, &framework.FitError{
Pod: pod,
NumAllNodes: sched.nodeInfoSnapshot.NumNodes(),
Diagnosis: diagnosis,
}
}

// Exactly one feasible node: use it directly and skip scoring
if len(feasibleNodes) == 1 {
return ScheduleResult{
SuggestedHost: feasibleNodes[0].Name,
EvaluatedNodes: 1 + len(diagnosis.NodeToStatusMap),
FeasibleNodes: 1,
}, nil
}

// Score the feasible nodes to pick the best one
priorityList, err := prioritizeNodes(ctx, sched.Extenders, fwk, state, pod, feasibleNodes)
if err != nil {
return result, err
}

// selectHost picks the node with the highest score from priorityList; ties are broken by choosing randomly among the tied nodes (sketched after this function)
host, err := selectHost(priorityList)
trace.Step("Prioritizing done")

return ScheduleResult{
SuggestedHost: host,
EvaluatedNodes: len(feasibleNodes) + len(diagnosis.NodeToStatusMap),
FeasibleNodes: len(feasibleNodes),
}, err
}
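The last comment above notes that selectHost returns the node with the highest total score and breaks ties randomly among nodes with the same score. A self-contained sketch of that behavior (not the scheduler's actual implementation):

package main

import (
	"fmt"
	"math/rand"
)

// selectHostSketch returns the highest-scoring node, picking uniformly at random among ties.
func selectHostSketch(scores map[string]int64) string {
	var best []string
	max := int64(-1)
	for node, s := range scores {
		switch {
		case s > max:
			max, best = s, []string{node}
		case s == max:
			best = append(best, node)
		}
	}
	if len(best) == 0 {
		return ""
	}
	return best[rand.Intn(len(best))]
}

func main() {
	// node-a and node-b are tied at 90, so either may be returned.
	fmt.Println(selectHostSketch(map[string]int64{"node-a": 90, "node-b": 90, "node-c": 40}))
}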

From this code, the two key functions are sched.findNodesThatFitPod and prioritizeNodes.

  1. sched.findNodesThatFitPod:

It uses the Filter plugins and the Filter extenders to find the feasible nodes.

func (sched *Scheduler) findNodesThatFitPod(ctx context.Context, fwk framework.Framework, state *framework.CycleState, pod *v1.Pod) ([]*v1.Node, framework.Diagnosis, error) {
// diagnosis records details used to diagnose a scheduling failure
diagnosis := framework.Diagnosis{
NodeToStatusMap: make(framework.NodeToStatusMap),
UnschedulablePlugins: sets.New[string](),
}

// Get all node infos from the node info snapshot
allNodes, err := sched.nodeInfoSnapshot.NodeInfos().List()
if err != nil {
return nil, diagnosis, err
}
// Run the PreFilter plugins
preRes, s := fwk.RunPreFilterPlugins(ctx, state, pod)
// PreFilter did not succeed
if !s.IsSuccess() {
// Not an "unschedulable" status, so treat it as an error
if !s.IsUnschedulable() {
return nil, diagnosis, s.AsError()
}
// Record the messages from PreFilter in Diagnosis.PreFilterMsg.
msg := s.Message()
diagnosis.PreFilterMsg = msg
klog.V(5).InfoS("Status after running PreFilter plugins for pod", "pod", klog.KObj(pod), "status", msg)
// Status satisfying IsUnschedulable() gets injected into diagnosis.UnschedulablePlugins.
if s.FailedPlugin() != "" {
diagnosis.UnschedulablePlugins.Insert(s.FailedPlugin())
}
return nil, diagnosis, nil
}

// The Pod already has a nominated node (set by an earlier preemption); evaluate that node first
if len(pod.Status.NominatedNodeName) > 0 {
feasibleNodes, err := sched.evaluateNominatedNode(ctx, pod, fwk, state, diagnosis)
if err != nil {
klog.ErrorS(err, "Evaluation failed on nominated node", "pod", klog.KObj(pod), "node", pod.Status.NominatedNodeName)
}
// Nominated node passes all the filters, scheduler is good to assign this node to the pod.
if len(feasibleNodes) != 0 {
return feasibleNodes, diagnosis, nil
}
}

nodes := allNodes
if !preRes.AllNodes() {
nodes = make([]*framework.NodeInfo, 0, len(preRes.NodeNames))
for n := range preRes.NodeNames {
nInfo, err := sched.nodeInfoSnapshot.NodeInfos().Get(n)
if err != nil {
return nil, diagnosis, err
}
nodes = append(nodes, nInfo)
}
}
// Run the Filter plugins to find the nodes that pass them
feasibleNodes, err := sched.findNodesThatPassFilters(ctx, fwk, state, pod, diagnosis, nodes)
// nextStartNodeIndex records the node index at which the next scheduling cycle starts its search.
// Instead of always starting from the same node, the scheduler advances the start index by the
// number of nodes processed in this cycle, giving a round-robin over the node list
// (see the small illustration after this function)
processedNodes := len(feasibleNodes) + len(diagnosis.NodeToStatusMap)
sched.nextStartNodeIndex = (sched.nextStartNodeIndex + processedNodes) % len(nodes)
if err != nil {
return nil, diagnosis, err
}

// Run the Filter extenders over the remaining feasible nodes
feasibleNodes, err = findNodesThatPassExtenders(sched.Extenders, pod, feasibleNodes, diagnosis.NodeToStatusMap)
if err != nil {
return nil, diagnosis, err
}
return feasibleNodes, diagnosis, nil
}
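The nextStartNodeIndex bookkeeping above gives the round-robin behavior described in the comments: each cycle advances the starting node by the number of nodes just processed, wrapping around with a modulo. A tiny self-contained illustration:

package main

import "fmt"

func main() {
	numNodes := 5
	nextStart := 0
	for cycle := 1; cycle <= 3; cycle++ {
		processed := 2 // pretend two nodes were evaluated in this cycle
		nextStart = (nextStart + processed) % numNodes
		fmt.Printf("after cycle %d the search starts at node index %d\n", cycle, nextStart)
	}
	// Prints indices 2, 4 and 1: evaluation does not always begin at node 0.
}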
  2. prioritizeNodes

Runs the Score plugins to rank the nodes: RunScorePlugins() returns the per-plugin scores for every node, and the scores from each plugin are summed into the node's score. Any extenders then run, and their weighted, rescaled scores are added in, giving the final weighted total score for every node.

func prioritizeNodes(
ctx context.Context,
extenders []framework.Extender,
fwk framework.Framework,
state *framework.CycleState,
pod *v1.Pod,
nodes []*v1.Node,
) ([]framework.NodePluginScores, error) {
// If no priority configs are provided (no extenders and no Score plugins), every node gets a score of 1.
// This is required to produce the priority list in the expected format
if len(extenders) == 0 && !fwk.HasScorePlugins() {
result := make([]framework.NodePluginScores, 0, len(nodes))
for i := range nodes {
result = append(result, framework.NodePluginScores{
Name: nodes[i].Name,
TotalScore: 1,
})
}
return result, nil
}

// Run PreScore plugins.
preScoreStatus := fwk.RunPreScorePlugins(ctx, state, pod, nodes)
if !preScoreStatus.IsSuccess() {
return nil, preScoreStatus.AsError()
}

// Run the Score plugins.
nodesScores, scoreStatus := fwk.RunScorePlugins(ctx, state, pod, nodes)
if !scoreStatus.IsSuccess() {
return nil, scoreStatus.AsError()
}

// Additional details logged at level 10 if enabled.
klogV := klog.V(10)
if klogV.Enabled() {
for _, nodeScore := range nodesScores {
for _, pluginScore := range nodeScore.Scores {
klogV.InfoS("Plugin scored node for pod", "pod", klog.KObj(pod), "plugin", pluginScore.Name, "node", nodeScore.Name, "score", pluginScore.Score)
}
}
}

if len(extenders) != 0 && nodes != nil {
// allNodeExtendersScores has all extenders scores for all nodes.
// It is keyed with node name.
allNodeExtendersScores := make(map[string]*framework.NodePluginScores, len(nodes))
var mu sync.Mutex
var wg sync.WaitGroup
for i := range extenders {
if !extenders[i].IsInterested(pod) {
continue
}
wg.Add(1)
go func(extIndex int) {
metrics.SchedulerGoroutines.WithLabelValues(metrics.PrioritizingExtender).Inc()
metrics.Goroutines.WithLabelValues(metrics.PrioritizingExtender).Inc()
defer func() {
metrics.SchedulerGoroutines.WithLabelValues(metrics.PrioritizingExtender).Dec()
metrics.Goroutines.WithLabelValues(metrics.PrioritizingExtender).Dec()
wg.Done()
}()
prioritizedList, weight, err := extenders[extIndex].Prioritize(pod, nodes)
if err != nil {
// Prioritization errors from extender can be ignored, let k8s/other extenders determine the priorities
klog.V(5).InfoS("Failed to run extender's priority function. No score given by this extender.", "error", err, "pod", klog.KObj(pod), "extender", extenders[extIndex].Name())
return
}
mu.Lock()
defer mu.Unlock()
for i := range *prioritizedList {
nodename := (*prioritizedList)[i].Host
score := (*prioritizedList)[i].Score
if klogV.Enabled() {
klogV.InfoS("Extender scored node for pod", "pod", klog.KObj(pod), "extender", extenders[extIndex].Name(), "node", nodename, "score", score)
}

// MaxExtenderPriority may diverge from the max priority used in the scheduler and defined by MaxNodeScore,
// therefore we need to scale the score returned by extenders to the score range used by the scheduler.
finalscore := score * weight * (framework.MaxNodeScore / extenderv1.MaxExtenderPriority)

if allNodeExtendersScores[nodename] == nil {
allNodeExtendersScores[nodename] = &framework.NodePluginScores{
Name: nodename,
Scores: make([]framework.PluginScore, 0, len(extenders)),
}
}
allNodeExtendersScores[nodename].Scores = append(allNodeExtendersScores[nodename].Scores, framework.PluginScore{
Name: extenders[extIndex].Name(),
Score: finalscore,
})
allNodeExtendersScores[nodename].TotalScore += finalscore
}
}(i)
}
// wait for all go routines to finish
wg.Wait()
for i := range nodesScores {
if score, ok := allNodeExtendersScores[nodes[i].Name]; ok {
nodesScores[i].Scores = append(nodesScores[i].Scores, score.Scores...)
nodesScores[i].TotalScore += score.TotalScore
}
}
}

if klogV.Enabled() {
for i := range nodesScores {
klogV.InfoS("Calculated node's final score for pod", "pod", klog.KObj(pod), "node", nodesScores[i].Name, "score", nodesScores[i].TotalScore)
}
}
return nodesScores, nil
}
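The finalscore line above is where extender scores get rescaled: an extender scores in the range 0..MaxExtenderPriority (10), while plugins score in 0..MaxNodeScore (100), so each extender score is multiplied by its weight and by 100/10 before being added to the node's total. A self-contained numeric illustration (not scheduler code):

package main

import "fmt"

const (
	maxNodeScore        = 100 // framework.MaxNodeScore
	maxExtenderPriority = 10  // extenderv1.MaxExtenderPriority
)

func main() {
	pluginScores := []int64{40, 25}             // Score plugin results for one node
	extenderScore, weight := int64(7), int64(2) // one extender's raw score and its weight

	total := int64(0)
	for _, s := range pluginScores {
		total += s
	}

	scaled := extenderScore * weight * (maxNodeScore / maxExtenderPriority)
	total += scaled

	fmt.Println("scaled extender score:", scaled) // 7 * 2 * 10 = 140
	fmt.Println("node total score:", total)       // 40 + 25 + 140 = 205
}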
handleSchedulingFailure
func (sched *Scheduler) handleSchedulingFailure(ctx context.Context, fwk framework.Framework, podInfo *framework.QueuedPodInfo, status *framework.Status, nominatingInfo *framework.NominatingInfo, start time.Time) {
...

pod := podInfo.Pod
err := status.AsError()
errMsg := status.Message()

if err == ErrNoNodesAvailable {
klog.V(2).InfoS("Unable to schedule pod; no nodes are registered to the cluster; waiting", "pod", klog.KObj(pod), "err", err)
} else if fitError, ok := err.(*framework.FitError); ok {
// Inject UnschedulablePlugins to PodInfo, which will be used later for moving Pods between queues efficiently.
podInfo.UnschedulablePlugins = fitError.Diagnosis.UnschedulablePlugins
klog.V(2).InfoS("Unable to schedule pod; no fit; waiting", "pod", klog.KObj(pod), "err", errMsg)
} else if apierrors.IsNotFound(err) {
// The error is a NotFound error, most likely because a node was not found: log and wait, and try to remove the node from the scheduler cache
klog.V(2).InfoS("Unable to schedule pod, possibly due to node not found; waiting", "pod", klog.KObj(pod), "err", errMsg)
if errStatus, ok := err.(apierrors.APIStatus); ok && errStatus.Status().Details.Kind == "node" {
nodeName := errStatus.Status().Details.Name
// when node is not found, We do not remove the node right away. Trying again to get
// the node and if the node is still not found, then remove it from the scheduler cache.
_, err := fwk.ClientSet().CoreV1().Nodes().Get(context.TODO(), nodeName, metav1.GetOptions{})
if err != nil && apierrors.IsNotFound(err) {
node := v1.Node{ObjectMeta: metav1.ObjectMeta{Name: nodeName}}
if err := sched.Cache.RemoveNode(&node); err != nil {
klog.V(4).InfoS("Node is not found; failed to remove it from the cache", "node", node.Name)
}
}
}
} else {
klog.ErrorS(err, "Error scheduling pod; retrying", "pod", klog.KObj(pod))
}

// Check whether the Pod still exists in the informer cache; if not, just log it
podLister := fwk.SharedInformerFactory().Core().V1().Pods().Lister()
cachedPod, e := podLister.Pods(pod.Namespace).Get(pod.Name)
if e != nil {
klog.InfoS("Pod doesn't exist in informer cache", "pod", klog.KObj(pod), "err", e)
} else {
// The Pod exists in the cache. If it has already been assigned to a node, log and do not re-queue it; otherwise deep-copy the cached Pod and add it back to the scheduling queue
if len(cachedPod.Spec.NodeName) != 0 {
klog.InfoS("Pod has been assigned to node. Abort adding it back to queue.", "pod", klog.KObj(pod), "node", cachedPod.Spec.NodeName)
} else {
// <cachedPod> comes from the SharedInformer, so it must be DeepCopy()'d here before use.
// The error from NewPodInfo is ignored because the apiserver does not properly validate
// affinity terms, and that validation cannot be fixed without breaking backward compatibility.
podInfo.PodInfo, _ = framework.NewPodInfo(cachedPod.DeepCopy())
if err := sched.SchedulingQueue.AddUnschedulableIfNotPresent(podInfo, sched.SchedulingQueue.SchedulingCycle()); err != nil {
klog.ErrorS(err, "Error occurred")
}
}
}

...

// Update the Pod's condition to record that scheduling failed
if err := updatePod(ctx, sched.client, pod, &v1.PodCondition{
Type: v1.PodScheduled,
Status: v1.ConditionFalse,
Reason: reason,
Message: errMsg,
}, nominatingInfo); err != nil {
klog.ErrorS(err, "Error updating pod", "pod", klog.KObj(pod))
}
}

Run()

The code lives in cmd/kube-scheduler/app/server.go:

// Run runs the scheduler with the given completed configuration. It only returns on error or when the context is done.
func Run(ctx context.Context, cc *schedulerserverconfig.CompletedConfig, sched *scheduler.Scheduler) error {
logger := klog.FromContext(ctx)

// Log version information early to help with debugging
logger.Info("Starting Kubernetes Scheduler", "version", version.Get())

logger.Info("Golang settings", "GOGC", os.Getenv("GOGC"), "GOMAXPROCS", os.Getenv("GOMAXPROCS"), "GOTRACEBACK", os.Getenv("GOTRACEBACK"))

// Create a configz instance; configz is used to expose the component's configuration
if cz, err := configz.New("componentconfig"); err == nil {
cz.Set(cc.ComponentConfig)
} else {
return fmt.Errorf("unable to register configz: %s", err)
}

// cc.EventBroadcaster.StartRecordingToSink(ctx.Done()) starts the event broadcaster and records events to a sink whose lifetime is tied to ctx.Done()
// The broadcaster delivers events produced by the scheduler to external consumers such as loggers and monitoring systems
cc.EventBroadcaster.StartRecordingToSink(ctx.Done())
defer cc.EventBroadcaster.Shutdown()

// Set up the health checkers
var checks []healthz.HealthChecker
if cc.ComponentConfig.LeaderElection.LeaderElect {
checks = append(checks, cc.LeaderElection.WatchDog)
}

// isLeader() reports whether this scheduler instance is currently the leader.
// An unbuffered channel waitingForLeader is created and closed once leadership is acquired;
// the select below distinguishes the two cases
waitingForLeader := make(chan struct{})
isLeader := func() bool {
select {
case _, ok := <-waitingForLeader:
// Receiving from waitingForLeader only succeeds once the channel is closed (ok == false), which means this instance is the leader, so isLeader() returns true
return !ok
default:
// Nothing can be received from waitingForLeader yet, so leadership has not been acquired; isLeader() returns false
return false
}
}

// Start the HTTPS server for secure serving and install the handler chain (healthz/metrics wrapped with authentication and authorization)
if cc.SecureServing != nil {
handler := buildHandlerChain(newHealthzAndMetricsHandler(&cc.ComponentConfig, cc.InformerFactory, isLeader, checks...), cc.Authentication.Authenticator, cc.Authorization.Authorizer)
// TODO: handle stoppedCh and listenerStoppedCh returned by c.SecureServing.Serve
if _, _, err := cc.SecureServing.Serve(handler, 0, ctx.Done()); err != nil {
// fail early for secure handlers, removing the old error loop from above
return fmt.Errorf("failed to start secure server: %v", err)
}
}

// Start the informers of cc.InformerFactory so the scheduler watches Kubernetes API resources and receives their events
cc.InformerFactory.Start(ctx.Done())
// DynInformerFactory can be nil in tests.
if cc.DynInformerFactory != nil {
cc.DynInformerFactory.Start(ctx.Done())
}

// Wait for the InformerFactory and DynInformerFactory caches to sync
cc.InformerFactory.WaitForCacheSync(ctx.Done())
// DynInformerFactory can be nil in tests.
if cc.DynInformerFactory != nil {
cc.DynInformerFactory.WaitForCacheSync(ctx.Done())
}

// If leader election is enabled, set up the leader-election callbacks, create the LeaderElector and run the election
if cc.LeaderElection != nil {
cc.LeaderElection.Callbacks = leaderelection.LeaderCallbacks{
// Called when this instance acquires the lock and starts leading: close waitingForLeader and run the scheduler sched
OnStartedLeading: func(ctx context.Context) {
close(waitingForLeader)
sched.Run(ctx)
},
// Called when leadership is lost: if ctx.Done() was signalled this is a normal shutdown and we exit 0, otherwise log an error and exit with a non-zero code
OnStoppedLeading: func() {
select {
case <-ctx.Done():
// We were asked to terminate. Exit 0.
logger.Info("Requested to terminate, exiting")
os.Exit(0)
default:
// We lost the lock.
logger.Error(nil, "Leaderelection lost")
klog.FlushAndExit(klog.ExitFlushTimeout, 1)
}
},
}
// Create the leader elector; leaderElector.Run blocks, campaigning for and holding the lease until the context is cancelled or the lease is lost
leaderElector, err := leaderelection.NewLeaderElector(*cc.LeaderElection)
if err != nil {
return fmt.Errorf("couldn't create leader elector: %v", err)
}

leaderElector.Run(ctx)

return fmt.Errorf("lost lease")
}

// waitingForLeader is a chan struct{} used for signalling in the leader-election callbacks; closing it tells every goroutine waiting on it (see isLeader above) that leadership has been decided.
// This branch runs when leader election is disabled: close the channel and run the scheduler directly
close(waitingForLeader)
sched.Run(ctx)
return fmt.Errorf("finished without leader elect")
}

As the comments show, Run first starts the event broadcaster and registers the health checks, then starts the secure HTTP server for healthz/metrics, starts the informers and waits for their caches to sync, runs leader election when it is enabled, and finally runs the Scheduler's main loop to process scheduling.
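For completeness, Setup and Run are invoked from the cobra command built by app.NewSchedulerCommand (the same command used in the out-of-tree plugin sketch earlier). The kube-scheduler binary's entry point in cmd/kube-scheduler/scheduler.go is roughly:

package main

import (
	"os"

	"k8s.io/component-base/cli"
	_ "k8s.io/component-base/metrics/prometheus/clientgo" // register client-go metrics
	_ "k8s.io/component-base/metrics/prometheus/version"  // register the version metric
	"k8s.io/kubernetes/cmd/kube-scheduler/app"
)

func main() {
	command := app.NewSchedulerCommand()
	code := cli.Run(command)
	os.Exit(code)
}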

Summary

References
