本站总访问量 Kubernetes-Scheduler的插件 - Jerry的小站

Jerry Gao

上帝就是真理,真理就是上帝

概述

  • 插件是如何注册的?
  • Scheduler中有哪些调度器?
  • 如何写一个调度器?

源码解析

注册过程

根据上一篇文章的代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
// Register adds a plugin factory to the registry under the given name.
// Registering a name that is already present fails and leaves the
// registry unchanged.
func (r Registry) Register(name string, factory PluginFactory) error {
	if _, exists := r[name]; exists {
		return fmt.Errorf("a plugin named %v already exists", name)
	}
	r[name] = factory
	return nil
}

// Merge copies every factory from in into r, rejecting duplicates.
// The first name collision aborts the merge and is returned as an error;
// entries merged before the collision remain registered in r.
func (r Registry) Merge(in Registry) error {
	for pluginName, pluginFactory := range in {
		err := r.Register(pluginName, pluginFactory)
		if err != nil {
			return err
		}
	}
	return nil
}

// New builds a completed scheduler-server config and a Scheduler instance.
// It assembles the plugin registry (in-tree plugins first, then any
// out-of-tree plugins supplied via options) and materializes one framework
// per configured profile. (Parts elided in the original are kept as "...".)
func New(ctx context.Context, opts *options.Options, outOfTreeRegistryOptions ...Option) (*schedulerserverconfig.CompletedConfig, *scheduler.Scheduler, error) {
	...
	// In-tree plugin registry; factories are gated by feature switches.
	registry := frameworkplugins.NewInTreeRegistry()
	// Merge user-supplied (out-of-tree) plugins; duplicate names fail fast.
	if err := registry.Merge(options.frameworkOutOfTreeRegistry); err != nil {
		// BUG FIX: this function returns three values, so the error path
		// must return three values as well (the quote had `return nil, err`).
		return nil, nil, err
	}
	...
	profiles, err := profile.NewMap(options.profiles, registry, recorderFactory, stopCh,
		frameworkruntime.WithComponentConfigVersion(options.componentConfigVersion),
		frameworkruntime.WithClientSet(client),
		frameworkruntime.WithKubeConfig(options.kubeConfig),
		frameworkruntime.WithInformerFactory(informerFactory),
		frameworkruntime.WithSnapshotSharedLister(snapshot),
		frameworkruntime.WithCaptureProfile(frameworkruntime.CaptureProfile(options.frameworkCapturer)),
		// BUG FIX: WithClusterEventMap was listed twice in the quote; once is enough.
		frameworkruntime.WithClusterEventMap(clusterEventMap),
		frameworkruntime.WithParallelism(int(options.parallelism)),
		frameworkruntime.WithExtenders(extenders),
		frameworkruntime.WithMetricsRecorder(metricsRecorder),
	)
	...
}

上述代码是Scheduler的实例化过程,我们可以看到它注册了内置插件,然后合并了自定义插件,然后放到profile中。

frameworkplugins.NewInTreeRegistry

我们先来看看Register:

1
2
3
4
5
// Registry is a collection of all available plugins. The framework uses a
// registry to enable and initialize configured plugins.
// All plugins must be in the registry before initializing the framework.
// (Note: the original comment said "Register", but the type is Registry.)
type Registry map[string]PluginFactory

// PluginFactory is a function that builds a plugin from its configuration
// object and a framework handle.
type PluginFactory = func(configuration runtime.Object, f framework.Handle) (framework.Plugin, error)

可以看到,Registry是一个map,key为插件的名称(Name),value为插件的工厂方法。

上文调用了一个frameworkplugins.NewInTreeRegistry()方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
// Features carries feature-gate values used by the different in-tree plugins.
// This struct allows us to break the dependency of the plugins on the
// internal k8s features pkg.
type Features struct {
// Enables dynamic resource allocation (DRA) support.
EnableDynamicResourceAllocation bool
// Enables the ReadWriteOncePod volume access mode, which restricts a
// volume to a single pod on a single node. (The original comment described
// this differently; this matches the upstream gate name — verify upstream.)
EnableReadWriteOncePod bool
// Enables taking available storage capacity into account when
// prioritizing nodes for volume binding.
EnableVolumeCapacityPriority bool
// Enables the minDomains field of pod topology spread constraints,
// which sets a lower bound on the number of topology domains to spread
// over. Spreading pods across domains (racks, zones, node groups, or
// custom label-defined partitions) reduces single-point-of-failure risk.
EnableMinDomainsInPodTopologySpread bool
// Enables the node inclusion policy fields of pod topology spread
// (controls which nodes are counted when computing the spread).
EnableNodeInclusionPolicyInPodTopologySpread bool
// Enables the matchLabelKeys field of pod topology spread constraints,
// which selects the set of pods to spread against by label key.
EnableMatchLabelKeysInPodTopologySpread bool
// Enables pod scheduling readiness (schedulingGates): a pod is held back
// from scheduling until all of its gates are removed.
EnablePodSchedulingReadiness bool
// Enables adding pod disruption conditions (e.g. when a pod is preempted).
EnablePodDisruptionConditions bool
// Enables in-place pod vertical scaling (resizing container resources
// without recreating the pod).
EnableInPlacePodVerticalScaling bool
}

// FeatureGate indicates whether a given feature is enabled or not.
type FeatureGate interface {
// Enabled returns true if the key is enabled.
Enabled(key Feature) bool
// KnownFeatures returns a slice of strings describing the FeatureGate's known features.
KnownFeatures() []string
// DeepCopy returns a deep copy of the FeatureGate object, such that gates can be
// set on the copy without mutating the original. This is useful for validating
// config against potential feature gate changes before committing those changes.
DeepCopy() MutableFeatureGate
}

// featureGate is the default implementation of the FeatureGate interface
// (i.e. the DefaultFeatureGate below) and also implements pflag.Value for
// flag parsing.
type featureGate struct {
featureGateName string

// special contains per-feature hooks invoked when certain features are set
// (presumably group gates such as AllAlpha — confirm against upstream).
special map[Feature]func(map[Feature]FeatureSpec, map[Feature]bool, bool)

// lock guards writes to known, enabled, and reads/writes of closed
lock sync.Mutex
// known holds a map[Feature]FeatureSpec
known *atomic.Value
// enabled holds a map[Feature]bool
enabled *atomic.Value
// closed is set to true when AddFlag is called, and prevents subsequent calls to Add
closed bool
}

// Enabled reports whether key is switched on. An explicitly set value in
// the enabled map wins; otherwise the feature's registered default from
// the known map is used. Looking up a feature that was never registered is
// a programmer error and panics.
func (f *featureGate) Enabled(key Feature) bool {
	enabledMap := f.enabled.Load().(map[Feature]bool)
	if value, set := enabledMap[key]; set {
		return value
	}
	knownMap := f.known.Load().(map[Feature]FeatureSpec)
	if spec, registered := knownMap[key]; registered {
		return spec.Default
	}

	panic(fmt.Errorf("feature %q is not registered in FeatureGate %q", key, f.featureGateName))
}

// FactoryAdapter wraps a feature-gate-aware factory (PluginFactoryWithFts)
// so it can be used wherever a plain PluginFactory is expected; the captured
// feature set fts is forwarded on every invocation.
func FactoryAdapter(fts plfeature.Features, withFts PluginFactoryWithFts) PluginFactory {
return func(plArgs runtime.Object, fh framework.Handle) (framework.Plugin, error) {
return withFts(plArgs, fh, fts)
}
}

// NewInTreeRegistry builds the registry with all the in-tree plugins.
// A scheduler that runs out-of-tree (custom) plugins can register them
// through the WithFrameworkOutOfTreeRegistry option.
func NewInTreeRegistry() runtime.Registry {
// Snapshot the relevant feature gates once; plugin factories capture this
// value instead of depending on the internal k8s features package directly.
fts := plfeature.Features{
EnableDynamicResourceAllocation: feature.DefaultFeatureGate.Enabled(features.DynamicResourceAllocation),
EnableReadWriteOncePod: feature.DefaultFeatureGate.Enabled(features.ReadWriteOncePod),
EnableVolumeCapacityPriority: feature.DefaultFeatureGate.Enabled(features.VolumeCapacityPriority),
EnableMinDomainsInPodTopologySpread: feature.DefaultFeatureGate.Enabled(features.MinDomainsInPodTopologySpread),
EnableNodeInclusionPolicyInPodTopologySpread: feature.DefaultFeatureGate.Enabled(features.NodeInclusionPolicyInPodTopologySpread),
EnableMatchLabelKeysInPodTopologySpread: feature.DefaultFeatureGate.Enabled(features.MatchLabelKeysInPodTopologySpread),
EnablePodSchedulingReadiness: feature.DefaultFeatureGate.Enabled(features.PodSchedulingReadiness),
EnablePodDisruptionConditions: feature.DefaultFeatureGate.Enabled(features.PodDisruptionConditions),
EnableInPlacePodVerticalScaling: feature.DefaultFeatureGate.Enabled(features.InPlacePodVerticalScaling),
}

// Map of plugin name -> factory. FactoryAdapter bridges factories that
// need the feature set to the plain PluginFactory signature.
registry := runtime.Registry{
dynamicresources.Name: runtime.FactoryAdapter(fts, dynamicresources.New),
selectorspread.Name: selectorspread.New,
imagelocality.Name: imagelocality.New,
tainttoleration.Name: tainttoleration.New,
nodename.Name: nodename.New,
nodeports.Name: nodeports.New,
nodeaffinity.Name: nodeaffinity.New,
podtopologyspread.Name: runtime.FactoryAdapter(fts, podtopologyspread.New),
nodeunschedulable.Name: nodeunschedulable.New,
noderesources.Name: runtime.FactoryAdapter(fts, noderesources.NewFit),
noderesources.BalancedAllocationName: runtime.FactoryAdapter(fts, noderesources.NewBalancedAllocation),
volumebinding.Name: runtime.FactoryAdapter(fts, volumebinding.New),
volumerestrictions.Name: runtime.FactoryAdapter(fts, volumerestrictions.New),
volumezone.Name: volumezone.New,
nodevolumelimits.CSIName: runtime.FactoryAdapter(fts, nodevolumelimits.NewCSI),
nodevolumelimits.EBSName: runtime.FactoryAdapter(fts, nodevolumelimits.NewEBS),
nodevolumelimits.GCEPDName: runtime.FactoryAdapter(fts, nodevolumelimits.NewGCEPD),
nodevolumelimits.AzureDiskName: runtime.FactoryAdapter(fts, nodevolumelimits.NewAzureDisk),
nodevolumelimits.CinderName: runtime.FactoryAdapter(fts, nodevolumelimits.NewCinder),
interpodaffinity.Name: interpodaffinity.New,
queuesort.Name: queuesort.New,
defaultbinder.Name: defaultbinder.New,
defaultpreemption.Name: runtime.FactoryAdapter(fts, defaultpreemption.New),
schedulinggates.Name: runtime.FactoryAdapter(fts, schedulinggates.New),
}

return registry
}
  • Features结构体是Kubernetes调度器中的一些特性开关。每个字段对应一个开关,代表该特性是否启用。关于每个字段的说明已经在上述代码中标注。
  • featureGate结构体内有一个enabled属性,类型为*atomic.Value。*atomic.Value主要是用来对其中的值进行原子化写入操作,具体可以看看这篇文章,其值为一个map类型,因为map不是线程安全的。
  • registry := runtime.Registry实例化了一个Registry结构体,runtime.FactoryAdapter主要是为了兼容旧版本插件的New方法

profile.NewMap()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
// cfgValidator validates each profile as it is added to the Map. The
// queueSort/queueSortArgs fields track the queue-sort plugin seen so far —
// presumably to enforce that it is identical across all profiles (see
// validate(); confirm against upstream).
type cfgValidator struct {
m Map
queueSort string
queueSortArgs runtime.Object
}

// Map maps a scheduler name to its initialized framework.
type Map map[string]framework.Framework

// NewMap initializes one framework per KubeSchedulerProfile and returns the
// resulting scheduler-name -> framework map. Each profile is validated as it
// is created; the first failure aborts construction.
func NewMap(cfgs []config.KubeSchedulerProfile, r frameworkruntime.Registry, recorderFact RecorderFactory,
	stopCh <-chan struct{}, opts ...frameworkruntime.Option) (Map, error) {
	profiles := make(Map)
	checker := cfgValidator{m: profiles}

	for _, profileCfg := range cfgs {
		fw, err := newProfile(profileCfg, r, recorderFact, stopCh, opts...)
		if err != nil {
			return nil, fmt.Errorf("creating profile for scheduler name %s: %v", profileCfg.SchedulerName, err)
		}
		if err := checker.validate(profileCfg, fw); err != nil {
			return nil, err
		}
		profiles[profileCfg.SchedulerName] = fw
	}
	return profiles, nil
}

newProfile解析配置,然后将KubeSchedulerConfiguration的配置加载到新建的Framework结构体(可以视为一个Scheduler,包含了拓展点及其插件列表),最后返回一个key为SchedulerName,value为Scheduler的结构Map。在newProfile内,实例化了一个Framework结构体,并调用了Registry中的工厂方法,实例化插件。

一些调度器

内置的代码可以在kubernetes的源代码库找到,其他的一些外置的插件,可以在这里找到。

NodeName

它是一个非常常见的插件,它的作用是指定Pod所要调度的节点的名称,即spec.NodeName。它的代码很短,我们来看看:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
// NodeName is a stateless filter plugin that checks whether a pod's
// spec.nodeName (if set) matches the node being considered.
type NodeName struct{}

...

const (
// Name is the name of the plugin used in the plugin registry and configurations.
Name = names.NodeName

// ErrReason is the status message returned when the pod's requested
// spec.nodeName doesn't match the candidate node's name.
ErrReason = "node(s) didn't match the requested node name"
)

...

// Name returns the name of the plugin. It is used in logs, configuration, etc.
func (pl *NodeName) Name() string {
return Name
}

// Filter rejects any node whose name differs from the pod's requested
// spec.nodeName. Invoked at the filter extension point.
func (pl *NodeName) Filter(ctx context.Context, _ *framework.CycleState, pod *v1.Pod, nodeInfo *framework.NodeInfo) *framework.Status {
	if nodeInfo.Node() == nil {
		return framework.NewStatus(framework.Error, "node not found")
	}
	if Fits(pod, nodeInfo) {
		// A nil status means "schedulable" to the framework.
		return nil
	}
	// UnschedulableAndUnresolvable: preemption cannot change a node's name,
	// so this node is permanently out for this pod.
	return framework.NewStatus(framework.UnschedulableAndUnresolvable, ErrReason)
}

// Fits reports whether the pod either requests no particular node or
// requests exactly this node by name.
func Fits(pod *v1.Pod, nodeInfo *framework.NodeInfo) bool {
	requested := pod.Spec.NodeName
	return requested == "" || requested == nodeInfo.Node().Name
}

// New initializes a new plugin and returns it. NodeName needs neither
// configuration nor the framework handle, so both arguments are ignored.
func New(_ runtime.Object, _ framework.Handle) (framework.Plugin, error) {
return &NodeName{}, nil
}
  • 首先,NodeName是一个空的结构体,它主要是在Filter拓展点实现了Filter方法。
  • Filter方法调用了Fits方法,该方法用pod.Spec.NodeName与nodeInfo.Node().Name相比较,如果相等,则返回true
  • 当pod.Spec.NodeName与nodeInfo.Node().Name不相等时,则返回一个Status状态,说明Node不匹配的原因
  • 当Pod匹配时,返回nil

NodeAffinity

NodeAffinity是比较常见的一个插件,我们知道,Affinity有两种类型,一种表示偏好,即preferredDuringSchedulingIgnoredDuringExecution,一种表示必须满足,即requiredDuringSchedulingIgnoredDuringExecution

所以我们来看NodeAffinity的源码,我们可以看到:

1
2
3
4
5
// NodeAffinity filters and scores nodes against a pod's node-affinity
// rules, plus any extra rules added through the scheduler configuration.
type NodeAffinity struct {
handle framework.Handle
// addedNodeSelector holds required (hard) affinity added via scheduler
// configuration (AddedAffinity), enforced on top of the pod's own rules.
addedNodeSelector *nodeaffinity.NodeSelector
// addedPrefSchedTerms holds preferred (soft, weighted) scheduling terms
// added via scheduler configuration.
addedPrefSchedTerms *nodeaffinity.PreferredSchedulingTerms
}

其中addedNodeSelector代表requiredDuringSchedulingIgnoredDuringExecution,addedPrefSchedTerms代表preferredDuringSchedulingIgnoredDuringExecution

NodeAffinity实现了PreFilter,Filter,PreScore,Score拓展点,我们按顺序来看PreFilter

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
// PreFilter computes the pod's required node affinity once per scheduling
// cycle and stores it in cycleState for Filter. When every required selector
// term pins the pod to explicit node names (MatchFields on metadata.name
// with operator In), the resulting name set is returned so the framework
// can restrict the search to just those nodes.
func (pl *NodeAffinity) PreFilter(ctx context.Context, cycleState *framework.CycleState, pod *v1.Pod) (*framework.PreFilterResult, *framework.Status) {
affinity := pod.Spec.Affinity
noNodeAffinity := (affinity == nil ||
affinity.NodeAffinity == nil ||
affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution == nil)
if noNodeAffinity && pl.addedNodeSelector == nil && pod.Spec.NodeSelector == nil {
// NodeAffinity Filter has nothing to do with the Pod.
return nil, framework.NewStatus(framework.Skip)
}

// Cache the combined nodeSelector + required node affinity for Filter.
state := &preFilterState{requiredNodeSelectorAndAffinity: nodeaffinity.GetRequiredNodeAffinity(pod)}
cycleState.Write(preFilterStateKey, state)

if noNodeAffinity || len(affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution.NodeSelectorTerms) == 0 {
return nil, nil
}

// Check if there is affinity to a specific node and return it.
terms := affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution.NodeSelectorTerms
var nodeNames sets.Set[string]
for _, t := range terms {
var termNodeNames sets.Set[string]
for _, r := range t.MatchFields {
if r.Key == metav1.ObjectNameField && r.Operator == v1.NodeSelectorOpIn {
// The requirements represent ANDed constraints, and so we need to
// find the intersection of nodes.
s := sets.New(r.Values...)
if termNodeNames == nil {
termNodeNames = s
} else {
termNodeNames = termNodeNames.Intersection(s)
}
}
}
if termNodeNames == nil {
// If this term has no node.Name field affinity,
// then all nodes are eligible because the terms are ORed.
return nil, nil
}
nodeNames = nodeNames.Union(termNodeNames)
}
// If nodeNames is not nil, but length is 0, it means each term have conflicting affinity to node.Name;
// therefore, pod will not match any node.
if nodeNames != nil && len(nodeNames) == 0 {
return nil, framework.NewStatus(framework.UnschedulableAndUnresolvable, errReasonConflict)
} else if len(nodeNames) > 0 {
return &framework.PreFilterResult{NodeNames: nodeNames}, nil
}
// No term narrowed the node set; fall through to normal filtering.
return nil, nil

}
  • 这个方法的实现类似于NodeSelector,首先是判断是否定义了RequiredDuringSchedulingIgnoredDuringExecution字段,如果没有则进入下一个拓展点
  • 接着,将标签选择和节点选择的信息写到cycleState方便后续的拓展点处理
  • 最后,判断是否存在节点选择的信息,即Key为metadata.name(metav1.ObjectNameField),匹配模式为In,如果有则返回选择的节点列表,如果没有则直接进入下一步处理

然后是Filter

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
// Filter checks whether the candidate node satisfies both the
// scheduler-configured AddedAffinity (if any) and the pod's own
// nodeSelector / required node affinity.
func (pl *NodeAffinity) Filter(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeInfo *framework.NodeInfo) *framework.Status {
node := nodeInfo.Node()
if node == nil {
return framework.NewStatus(framework.Error, "node not found")
}
// Hard affinity added through scheduler configuration must match first;
// a miss cannot be resolved by preemption.
if pl.addedNodeSelector != nil && !pl.addedNodeSelector.Match(node) {
return framework.NewStatus(framework.UnschedulableAndUnresolvable, errReasonEnforced)
}

// Fetch the state computed by PreFilter (the pod's nodeSelector and
// required node affinity).
s, err := getPreFilterState(state)
if err != nil {
// Fallback to calculate requiredNodeSelector and requiredNodeAffinity
// here when PreFilter is disabled.
s = &preFilterState{requiredNodeSelectorAndAffinity: nodeaffinity.GetRequiredNodeAffinity(pod)}
}

// Does the node satisfy the pod's nodeSelector and required node affinity?
// (The returned error is intentionally ignored here — presumably parse
// errors are surfaced elsewhere; confirm against upstream.)
match, _ := s.requiredNodeSelectorAndAffinity.Match(node)
if !match {
return framework.NewStatus(framework.UnschedulableAndUnresolvable, ErrReasonPod)
}

return nil
}
  • 首先查找RequiredDuringSchedulingIgnoredDuringExecution是否能找到相应的节点
  • 然后获取PreFilter的cycleState结构体,这个是已经在PreFilter计算过并保存的结构体,这个结构体中保存了NodeSelectorNodeAffinity的信息
  • 最后判断NodeSelectorNodeAffinity的信息是否冲突

同样的,PreScore也是对Score进行预处理,将preScoreState存入cycleState结构中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// PreScore parses the pod's preferred node affinity once and caches it in
// cycleState so Score does not have to re-parse it for every node.
func (pl *NodeAffinity) PreScore(ctx context.Context, cycleState *framework.CycleState, pod *v1.Pod, nodes []*v1.Node) *framework.Status {
	if len(nodes) == 0 {
		// Nothing to score.
		return nil
	}
	affinity, err := getPodPreferredNodeAffinity(pod)
	if err != nil {
		return framework.AsStatus(err)
	}
	cycleState.Write(preScoreStateKey, &preScoreState{
		preferredNodeAffinity: affinity,
	})
	return nil
}

Score

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
// Score sums the weights of all preferred scheduling terms whose
// matchExpressions and matchFields match the given node.
func (t *PreferredSchedulingTerms) Score(node *v1.Node) int64 {
	labelSet := labels.Set(node.Labels)
	fieldSet := extractNodeFields(node)

	var total int64
	for _, term := range t.terms {
		// parse errors are reported in NewPreferredSchedulingTerms.
		matched, _ := term.match(labelSet, fieldSet)
		if matched {
			total += int64(term.weight)
		}
	}
	return total
}

// Score returns the sum of the weights of all preferred scheduling terms —
// both scheduler-configured AddedAffinity terms and the pod's own preferred
// node affinity — that match the given node.
func (pl *NodeAffinity) Score(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeName string) (int64, *framework.Status) {
nodeInfo, err := pl.handle.SnapshotSharedLister().NodeInfos().Get(nodeName)
if err != nil {
return 0, framework.AsStatus(fmt.Errorf("getting node %q from Snapshot: %w", nodeName, err))
}

node := nodeInfo.Node()

// Score contributed by scheduler-configured AddedAffinity rules.
var count int64
if pl.addedPrefSchedTerms != nil {
count += pl.addedPrefSchedTerms.Score(node)
}

// Reuse the preScoreState computed in PreScore; recompute it here only
// when PreScore was disabled.
s, err := getPreScoreState(state)
if err != nil {
// Fallback to calculate preferredNodeAffinity here when PreScore is disabled.
preferredNodeAffinity, err := getPodPreferredNodeAffinity(pod)
if err != nil {
return 0, framework.AsStatus(err)
}
s = &preScoreState{
preferredNodeAffinity: preferredNodeAffinity,
}
}

// Score contributed by the pod's own preferred node affinity.
if s.preferredNodeAffinity != nil {
count += s.preferredNodeAffinity.Score(node)
}

return count, nil
}
  • 该方法对匹配的节点规则进行weight的累加
  • AddedAffinity是在调度规则中指定的,对指定的Pod生效(通过schedulerName),可以看这个文章
  • preferredNodeAffinity规则在Pod的yaml中指定

总结

本文从源码角度概述了调度器插件的注册过程,介绍了NodeName插件和NodeAffinity插件的实现方法。

评论