The Kubelet Component in Kubernetes - Jerry's Little Site

Jerry Gao


Overview

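The kubelet is the primary node agent of a Kubernetes cluster. It runs on every node and manages that node's containers through the container runtime. Its main responsibilities are: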

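  • Node registration: at startup the kubelet registers its node with the control plane, so that the control plane knows the node exists.
  • Container startup and monitoring: the kubelet starts and monitors the containers running on the node. It creates and manages containers according to the Pod definitions it receives from the control plane, watches their state, and restarts containers that stop or crash (subject to the Pod's restartPolicy) so that they keep running.
  • Resource management: the kubelet monitors the node's resource usage and manages each container's resource allocation according to its configured requests and limits. It works with the container runtime (such as containerd or CRI-O; since v1.24, Docker Engine only through cri-dockerd) to make sure containers use only the resources allocated to them.
  • Volume management: the kubelet mounts and manages the volumes defined in a Pod. It works with storage plugins to mount volumes into containers and to keep them available and consistent.
  • Security and admission control: the kubelet enforces container security settings and runs its own Pod admission checks. It works with the container runtime so that containers run with the required security features, and applies policies such as restricting container privileges and network access.
  • Networking: the kubelet manages container network setup. Together with the network plugin (with CRI runtimes, the runtime invokes the CNI plugin), containers are assigned IP addresses and communication between containers and with external networks is configured.
  • Node status reporting: the kubelet periodically reports the node's status to the control plane, including node conditions, capacity and allocatable resources; Pod and container states are reported separately through Pod status. This lets the control plane know the health of the node and of the workloads on it.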

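Here we focus on three questions:

  • How is a node registered?
  • How are containers started, and how are they monitored?
  • How does the kubelet manage resources?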

Source Code Analysis

cmd

Let's start with the cmd module to see how the kubelet is initialized and which command-line interface it exposes.

func main() {
	command := app.NewKubeletCommand()
	code := cli.Run(command)
	os.Exit(code)
}

app.NewKubeletCommand() mainly creates a cobra.Command (for background on cobra.Command, see the introductions to cobra and pflag). Let's look at what it does.

// NewKubeletCommand creates a *cobra.Command object with default parameters
func NewKubeletCommand() *cobra.Command {
	// Flag set for the kubelet command
	cleanFlagSet := pflag.NewFlagSet(componentKubelet, pflag.ContinueOnError)
	// Normalize flag names: WordSepNormalizeFunc treats "_" as "-",
	// so --node_ip and --node-ip refer to the same flag
	cleanFlagSet.SetNormalizeFunc(cliflag.WordSepNormalizeFunc)
	// Create the flags with their default values:
	/*
		func NewKubeletFlags() *KubeletFlags {
			return &KubeletFlags{
				ContainerRuntimeOptions: *NewContainerRuntimeOptions(),   // options expected by the container runtime
				CertDirectory:           "/var/lib/kubelet/pki",          // PKI certificate directory, see https://kubernetes.io/zh-cn/docs/setup/best-practices/certificates/
				RootDirectory:           filepath.Clean(defaultRootDir), // directory for kubelet files, /var/lib/kubelet by default
				MaxContainerCount:       -1,
				MaxPerPodContainerCount: 1,
				MinimumGCAge:            metav1.Duration{Duration: 0},
				RegisterSchedulable:     true,                    // register the node as schedulable
				NodeLabels:              make(map[string]string), // node labels
			}
		}
	*/
	kubeletFlags := options.NewKubeletFlags()
	kubeletConfig, err := options.NewKubeletConfiguration()
	// programmer error
	if err != nil {
		klog.ErrorS(err, "Failed to create a new kubelet configuration")
		os.Exit(1)
	}

	cmd := &cobra.Command{
		Use: componentKubelet, // the literal "kubelet"
		Long: `The kubelet is the primary "node agent" that runs on each
node. It can register the node with the apiserver using one of: the hostname; a flag to
override the hostname; or specific logic for a cloud provider.

The kubelet works in terms of a PodSpec. A PodSpec is a YAML or JSON object
that describes a pod. The kubelet takes a set of PodSpecs that are provided through
various mechanisms (primarily through the apiserver) and ensures that the containers
described in those PodSpecs are running and healthy. The kubelet doesn't manage
containers which were not created by Kubernetes.

Other than from an PodSpec from the apiserver, there are two ways that a container
manifest can be provided to the Kubelet.

File: Path passed as a flag on the command line. Files under this path will be monitored
periodically for updates. The monitoring period is 20s by default and is configurable
via a flag.

HTTP endpoint: HTTP endpoint passed as a parameter on the command line. This endpoint
is checked every 20 seconds (also configurable with a flag).`,
		// The Kubelet has special flag parsing requirements to enforce flag precedence rules,
		// so we do all our parsing manually in Run, below.
		// DisableFlagParsing=true provides the full set of flags passed to the kubelet in the
		// `args` arg to Run, without Cobra's interference.
		DisableFlagParsing: true,
		SilenceUsage:       true,
		RunE: func(cmd *cobra.Command, args []string) error {
			// initial flag parse, since we disable cobra's flag parsing
			if err := cleanFlagSet.Parse(args); err != nil {
				return fmt.Errorf("failed to parse kubelet flag: %w", err)
			}

			// check if there are non-flag arguments in the command line
			cmds := cleanFlagSet.Args()
			if len(cmds) > 0 {
				return fmt.Errorf("unknown command %+s", cmds[0])
			}

			// short-circuit on help
			help, err := cleanFlagSet.GetBool("help")
			if err != nil {
				return errors.New(`"help" flag is non-bool, programmer error, please correct`)
			}
			if help {
				return cmd.Help()
			}

			// short-circuit on verflag
			verflag.PrintAndExitIfRequested()

			// Set the feature gates from the initial flags-based config
			if err := utilfeature.DefaultMutableFeatureGate.SetFromMap(kubeletConfig.FeatureGates); err != nil {
				return fmt.Errorf("failed to set feature gates from initial flags-based config: %w", err)
			}

			// validate the initial KubeletFlags
			// ValidateKubeletFlags validates the kubelet's flags and returns an error if they are
			// invalid. Among other things it iterates over the node labels: a label in a
			// Kubernetes-reserved namespace that the kubelet may not set is reported as unknown,
			// and every label must be valid (names follow the DNS-1123 subdomain rules, see
			// https://datatracker.ietf.org/doc/html/rfc1123 and
			// https://kuboard.cn/learning/k8s-intermediate/obj/names.html#names; values are at
			// most 63 characters).
			if err := options.ValidateKubeletFlags(kubeletFlags); err != nil {
				return fmt.Errorf("failed to validate kubelet flags: %w", err)
			}
			// The image garbage collector removes images that are no longer used. The image set by
			// --pod-infra-container-image is not pruned by it, because it is always in use as the
			// Pod infrastructure container.
			if cleanFlagSet.Changed("pod-infra-container-image") {
				klog.InfoS("--pod-infra-container-image will not be pruned by the image garbage collector in kubelet and should also be set in the remote runtime")
			}

			// load kubelet config file, if provided
			if configFile := kubeletFlags.KubeletConfigFile; len(configFile) > 0 {
				kubeletConfig, err = loadConfigFile(configFile)
				if err != nil {
					return fmt.Errorf("failed to load kubelet config file, error: %w, path: %s", err, configFile)
				}
				// We must enforce flag precedence by re-parsing the command line into the new object.
				// This is necessary to preserve backwards-compatibility across binary upgrades.
				// See issue #56171 for more details.
				if err := kubeletConfigFlagPrecedence(kubeletConfig, args); err != nil {
					return fmt.Errorf("failed to precedence kubeletConfigFlag: %w", err)
				}
				// update feature gates based on new config
				if err := utilfeature.DefaultMutableFeatureGate.SetFromMap(kubeletConfig.FeatureGates); err != nil {
					return fmt.Errorf("failed to set feature gates from initial flags-based config: %w", err)
				}
			}

			// Config and flags parsed, now we can initialize logging.
			logs.InitLogs()
			if err := logsapi.ValidateAndApplyAsField(&kubeletConfig.Logging, utilfeature.DefaultFeatureGate, field.NewPath("logging")); err != nil {
				return fmt.Errorf("initialize logging: %v", err)
			}
			cliflag.PrintFlags(cleanFlagSet)

			// We always validate the local configuration (command line + config file).
			// This is the default "last-known-good" config for dynamic config, and must always remain valid.
			if err := kubeletconfigvalidation.ValidateKubeletConfiguration(kubeletConfig, utilfeature.DefaultFeatureGate); err != nil {
				return fmt.Errorf("failed to validate kubelet configuration, error: %w, path: %s", err, kubeletConfig)
			}

			if (kubeletConfig.KubeletCgroups != "" && kubeletConfig.KubeReservedCgroup != "") && (strings.Index(kubeletConfig.KubeletCgroups, kubeletConfig.KubeReservedCgroup) != 0) {
				klog.InfoS("unsupported configuration:KubeletCgroups is not within KubeReservedCgroup")
			}

			// construct a KubeletServer from kubeletFlags and kubeletConfig
			kubeletServer := &options.KubeletServer{
				KubeletFlags:         *kubeletFlags,
				KubeletConfiguration: *kubeletConfig,
			}

			// use kubeletServer to construct the default KubeletDeps
			kubeletDeps, err := UnsecuredDependencies(kubeletServer, utilfeature.DefaultFeatureGate)
			if err != nil {
				return fmt.Errorf("failed to construct kubelet dependencies: %w", err)
			}

			if err := checkPermissions(); err != nil {
				klog.ErrorS(err, "kubelet running with insufficient permissions")
			}

			// make the kubelet's config safe for logging
			config := kubeletServer.KubeletConfiguration.DeepCopy()
			for k := range config.StaticPodURLHeader {
				config.StaticPodURLHeader[k] = []string{"<masked>"}
			}
			// log the kubelet's config for inspection
			klog.V(5).InfoS("KubeletConfiguration", "configuration", config)

			// set up signal context for kubelet shutdown
			ctx := genericapiserver.SetupSignalContext()

			utilfeature.DefaultMutableFeatureGate.AddMetrics()
			// run the kubelet
			return Run(ctx, kubeletServer, kubeletDeps, utilfeature.DefaultFeatureGate)
		},
	}

	// keep cleanFlagSet separate, so Cobra doesn't pollute it with the global flags
	kubeletFlags.AddFlags(cleanFlagSet)
	options.AddKubeletConfigFlags(cleanFlagSet, kubeletConfig)
	options.AddGlobalFlags(cleanFlagSet)
	cleanFlagSet.BoolP("help", "h", false, fmt.Sprintf("help for %s", cmd.Name()))

	// ugly, but necessary, because Cobra's default UsageFunc and HelpFunc pollute the flagset with global flags
	const usageFmt = "Usage:\n  %s\n\nFlags:\n%s"
	cmd.SetUsageFunc(func(cmd *cobra.Command) error {
		fmt.Fprintf(cmd.OutOrStderr(), usageFmt, cmd.UseLine(), cleanFlagSet.FlagUsagesWrapped(2))
		return nil
	})
	cmd.SetHelpFunc(func(cmd *cobra.Command, args []string) {
		fmt.Fprintf(cmd.OutOrStdout(), "%s\n\n"+usageFmt, cmd.Long, cmd.UseLine(), cleanFlagSet.FlagUsagesWrapped(2))
	})

	return cmd
}
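    1. The cobra and pflag libraries provide the scaffolding, so the kubelet's command line follows Unix conventions. The command's name, description, error handling and similar properties are defined here.
    2. The RunE field of cobra.Command: cobra calls this function to execute the command and returns its error. It is the kubelet's main entry point.
    • (1). If help or version was requested, print the kubelet's help (or version) and stop immediately.
    • (2). Set the feature gates from the flag-based configuration and validate the flags; then, if a config file is given, load it into kubeletConfig, re-apply the command-line flags so that they take precedence, and update the feature gates again.
    • (3). Initialize logging.
    • (4). Validate the resulting local configuration (command line + config file).
    • (5). Build a kubeletServer from kubeletFlags and kubeletConfig, then build kubeletDeps from it.
    • (6). Check the process permissions: the kubelet is expected to run as uid 0 (root). If it does not, an error is logged, but startup continues.
    • (7). Run the kubeletServer (see below).
    3. Handle some of cobra's global flags (usage and help output). We skip this here, as it is not our focus.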
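Step (2) is the subtle one: values from the config file must not override flags that the user typed explicitly. The sketch below illustrates that precedence rule (defaults < config file < command line) with only the standard library's flag package. The Config type, its fields and default values, and the resolve helper are hypothetical simplifications; the kubelet itself does this with pflag and kubeletConfigFlagPrecedence, which re-parses the full command line into the newly loaded configuration object.

```go
package main

import (
	"flag"
	"fmt"
)

// Config is a hypothetical, heavily simplified stand-in for KubeletConfiguration.
type Config struct {
	MaxPods      int
	CgroupDriver string
}

// bindFlags registers flags that write into cfg, using cfg's current values as defaults.
func bindFlags(fs *flag.FlagSet, cfg *Config) {
	fs.IntVar(&cfg.MaxPods, "max-pods", cfg.MaxPods, "maximum number of pods")
	fs.StringVar(&cfg.CgroupDriver, "cgroup-driver", cfg.CgroupDriver, "cgroup driver")
}

// resolve applies the precedence rule: built-in defaults < config file < command-line flags.
// fileCfg simulates the result of decoding a config file (nil means "no --config given").
func resolve(args []string, fileCfg *Config) (Config, error) {
	// First pass: parse the flags over the built-in defaults.
	defaults := Config{MaxPods: 110, CgroupDriver: "cgroupfs"}
	first := flag.NewFlagSet("kubelet", flag.ContinueOnError)
	bindFlags(first, &defaults)
	if err := first.Parse(args); err != nil {
		return Config{}, err
	}
	if fileCfg == nil {
		return defaults, nil
	}
	// Second pass: re-parse the same args on top of the file values, so that
	// only the flags the user actually typed overwrite values from the file.
	merged := *fileCfg
	second := flag.NewFlagSet("kubelet", flag.ContinueOnError)
	bindFlags(second, &merged)
	if err := second.Parse(args); err != nil {
		return Config{}, err
	}
	return merged, nil
}

func main() {
	file := &Config{MaxPods: 250, CgroupDriver: "systemd"}
	cfg, err := resolve([]string{"--max-pods=50"}, file)
	if err != nil {
		panic(err)
	}
	fmt.Printf("%+v\n", cfg) // {MaxPods:50 CgroupDriver:systemd}
}
```

Re-parsing the original arguments, rather than diffing flag values against defaults, is what makes "flag explicitly set" distinguishable from "flag left at its default".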

Starting the kubeletServer

Let's look at the Run method called above:

func Run(ctx context.Context, s *options.KubeletServer, kubeDeps *kubelet.Dependencies, featureGate featuregate.FeatureGate) error {
	// ...
	// logging and Windows-specific startup handling omitted
	// ...

	if err := run(ctx, s, kubeDeps, featureGate); err != nil {
		return fmt.Errorf("failed to run Kubelet: %w", err)
	}
	return nil
}

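Apart from some logging and Windows compatibility handling, Run simply calls run, which does the actual startup work. Run takes four parameters:

  • context.Context: the context whose cancellation shuts the kubelet down (set up by SetupSignalContext)
  • *options.KubeletServer: the kubeletServer's configuration
  • *kubelet.Dependencies: the injected dependencies, built from the configuration
  • featuregate.FeatureGate: the enabled feature gates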

Let's look at KubeletServer:

type KubeletServer struct {
	KubeletFlags
	kubeletconfig.KubeletConfiguration
}
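Both fields of KubeletServer are embedded rather than named, so Go promotes their fields. That is why the kubelet code can write s.HostnameOverride (from KubeletFlags) or s.EventRecordQPS (from KubeletConfiguration) directly on a KubeletServer, while still passing &s.KubeletConfiguration as a whole. A minimal illustration with hypothetical, trimmed-down types:

```go
package main

import "fmt"

// Hypothetical, trimmed-down versions of the real types.
type KubeletFlags struct {
	HostnameOverride string
	NodeIP           string
}

type KubeletConfiguration struct {
	EventRecordQPS int32
}

// KubeletServer embeds both structs, so their fields are promoted.
type KubeletServer struct {
	KubeletFlags
	KubeletConfiguration
}

func main() {
	s := &KubeletServer{
		KubeletFlags:         KubeletFlags{HostnameOverride: "node-1"},
		KubeletConfiguration: KubeletConfiguration{EventRecordQPS: 50},
	}
	// Promoted fields: no need to write s.KubeletFlags.HostnameOverride.
	fmt.Println(s.HostnameOverride, s.EventRecordQPS) // node-1 50

	// The embedded value is still addressable as a whole.
	cfg := &s.KubeletConfiguration
	cfg.EventRecordQPS = 5
	fmt.Println(s.EventRecordQPS) // 5
}
```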

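This is the configuration the kubeletServer starts with. KubeletFlags holds settings that cannot be changed once the node (kubelet server) has started, or that are specific to one node and must not be shared, for example: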

type KubeletFlags struct {
	HostnameOverride string
	NodeIP           string
}

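HostnameOverride and NodeIP are the node's hostname and IP address, which identify the node.
kubeletconfig.KubeletConfiguration is the set of settings that can be shared between nodes. It is a versioned API type (it implements runtime.Object), so it can be decoded from the config file passed with --config.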
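KubeletFlags also carries the node labels given by --node-labels, which ValidateKubeletFlags checks, as noted in NewKubeletCommand above. The sketch below reproduces only the documented Kubernetes syntax rule for a label value: at most 63 characters, and either empty or starting and ending with an alphanumeric character with '-', '_' or '.' allowed in between. validLabelValue is a hypothetical helper, not the kubelet's implementation, which also validates label keys and rejects labels in reserved namespaces.

```go
package main

import (
	"fmt"
	"regexp"
)

// labelValueRE follows the documented Kubernetes label-value syntax:
// empty, or alphanumeric at both ends with [-_.A-Za-z0-9] in between.
var labelValueRE = regexp.MustCompile(`^(([A-Za-z0-9][-A-Za-z0-9_.]*)?[A-Za-z0-9])?$`)

const maxLabelValueLen = 63

// validLabelValue is a hypothetical helper, not the kubelet's own function.
func validLabelValue(v string) bool {
	return len(v) <= maxLabelValueLen && labelValueRE.MatchString(v)
}

func main() {
	labels := map[string]string{"disktype": "ssd", "tier": "-frontend"}
	for k, v := range labels {
		fmt.Printf("%s=%q valid=%v\n", k, v, validLabelValue(v))
	}
}
```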

Now let's look at the run method:

func run(ctx context.Context, s *options.KubeletServer, kubeDeps *kubelet.Dependencies, featureGate featuregate.FeatureGate) (err error) {
	// 1. Set the initial feature gates with utilfeature.DefaultMutableFeatureGate.SetFromMap.
	// 2. Validate the KubeletServer (flags and configuration) with options.ValidateKubeletServer(s).
	// 3. Check utilfeature.DefaultFeatureGate.Enabled(features.MemoryQoS): if MemoryQoS is enabled
	//    but the node does not use cgroup v2, log a warning, since MemoryQoS only works with cgroup v2.
	//    a. MemoryQoS is a feature gate for memory quality of service, introduced as an alpha
	//       feature in Kubernetes v1.22. It improves how Linux nodes enforce memory requests and limits.
	//    b. With cgroup v1, a Pod that approaches its CPU limit is throttled but not terminated.
	//       Memory, however, cannot be compressed: a container that exceeds its memory limit
	//       is OOM-killed.
	//    c. cgroup v2 offers richer controls for memory reservation and throttling. MemoryQoS uses
	//       memory.min and memory.high to reserve and throttle memory, reducing the risk of
	//       Burstable Pods being killed and of sudden memory pressure spikes on the node.
	// 4. Acquire the lock file so that two kubelets cannot run at the same time.
	// ...

	// 5. Expose the kubelet's configuration on the /configz endpoint; it can be fetched via
	//    /api/v1/nodes/[node_name]/proxy/configz.
	// initConfigz initializes the available API versions and stores the configuration in the
	// global configs registry (see the implementation details section).
	err = initConfigz(&s.KubeletConfiguration)
	if err != nil {
		klog.ErrorS(err, "Failed to register kubelet configuration with configz")
	}
	// ...

	// 6. Set up the cloud provider, an abstract, pluggable interface to the cloud.
	if kubeDeps.Cloud == nil {
		// s.CloudProvider is a string naming the cloud provider.
		// Only in-tree (non-external) providers are initialized here.
		if !cloudprovider.IsExternal(s.CloudProvider) {
			// For an in-tree provider, DeprecationWarningForProvider logs that in-tree cloud
			// providers are deprecated and that an external cloud provider should be used.
			cloudprovider.DeprecationWarningForProvider(s.CloudProvider)
			// Initialize the cloud provider instance: s.CloudProvider is the provider's name,
			// s.CloudConfigFile its configuration file.
			cloud, err := cloudprovider.InitCloudProvider(s.CloudProvider, s.CloudConfigFile)
			if err != nil {
				return err
			}
			if cloud != nil {
				klog.V(2).InfoS("Successfully initialized cloud provider", "cloudProvider", s.CloudProvider, "cloudConfigFile", s.CloudConfigFile)
			}
			kubeDeps.Cloud = cloud
		}
	}
	// Get the local hostname: s.HostnameOverride if set, otherwise the OS hostname,
	// converted to lower case.
	hostName, err := nodeutil.GetHostname(s.HostnameOverride)
	// ...
	// Get the current node's name from the cloud provider's Instances().CurrentNodeName
	// (usually the hostname); without a cloud provider the hostname is used as is.
	nodeName, err := getNodeName(kubeDeps.Cloud, hostName)

	// 7. A long stretch of authentication logic: start the certificate manager and the
	//    authenticator, and use the kubeconfig, the bootstrap kubeconfig and the certificate
	//    files to obtain a new kubelet client certificate from the apiserver or rotate the
	//    existing one.
	switch {
	case standaloneMode:
		kubeDeps.KubeClient = nil
		kubeDeps.EventClient = nil
		kubeDeps.HeartbeatClient = nil
		klog.InfoS("Standalone mode, no API client")

	case kubeDeps.KubeClient == nil, kubeDeps.EventClient == nil, kubeDeps.HeartbeatClient == nil:
		// Build the client config (obtaining or rotating the client certificate via the
		// kubeconfig, the bootstrap kubeconfig and the certificate files).
		clientConfig, onHeartbeatFailure, err := buildKubeletClientConfig(ctx, s, kubeDeps.TracerProvider, nodeName)
		if err != nil {
			return err
		}
		if onHeartbeatFailure == nil {
			return errors.New("onHeartbeatFailure must be a valid function other than nil")
		}
		kubeDeps.OnHeartbeatFailure = onHeartbeatFailure

		kubeDeps.KubeClient, err = clientset.NewForConfig(clientConfig)
		if err != nil {
			return fmt.Errorf("failed to initialize kubelet client: %w", err)
		}

		// make a separate client for events
		eventClientConfig := *clientConfig
		eventClientConfig.QPS = float32(s.EventRecordQPS)
		eventClientConfig.Burst = int(s.EventBurst)
		kubeDeps.EventClient, err = v1core.NewForConfig(&eventClientConfig)
		if err != nil {
			return fmt.Errorf("failed to initialize kubelet event client: %w", err)
		}

		// make a separate client for heartbeat with throttling disabled and a timeout attached
		heartbeatClientConfig := *clientConfig
		heartbeatClientConfig.Timeout = s.KubeletConfiguration.NodeStatusUpdateFrequency.Duration
		// The timeout is the minimum of the lease duration and status update frequency
		leaseTimeout := time.Duration(s.KubeletConfiguration.NodeLeaseDurationSeconds) * time.Second
		if heartbeatClientConfig.Timeout > leaseTimeout {
			heartbeatClientConfig.Timeout = leaseTimeout
		}

		heartbeatClientConfig.QPS = float32(-1)
		kubeDeps.HeartbeatClient, err = clientset.NewForConfig(&heartbeatClientConfig)
		if err != nil {
			return fmt.Errorf("failed to initialize kubelet heartbeat client: %w", err)
		}
	}

	if kubeDeps.Auth == nil {
		auth, runAuthenticatorCAReload, err := BuildAuth(nodeName, kubeDeps.KubeClient, s.KubeletConfiguration)
		if err != nil {
			return err
		}
		kubeDeps.Auth = auth
		runAuthenticatorCAReload(ctx.Done())
	}

	// ...
}

Selected Implementation Details

cloud-provider/plugins.go

This file contains the functions for registering and initializing cloud providers.

// Factory is a function that returns a cloudprovider.Interface. The config argument is
// used to load provider-specific configuration; if there is no configuration, it is nil.
type Factory func(config io.Reader) (Interface, error)

// All registered cloud providers.
var (
	// Mutex that makes access to the providers map thread-safe
	providersMutex sync.Mutex
	// All registered cloud providers, keyed by name
	providers = make(map[string]Factory)
	// Deprecated in-tree cloud providers
	deprecatedCloudProviders = []struct {
		name     string
		external bool
		detail   string
	}{
		{"azure", false, "The Azure provider is deprecated and will be removed in a future release. Please use https://github.com/kubernetes-sigs/cloud-provider-azure"},
		{"gce", false, "The GCE provider is deprecated and will be removed in a future release. Please use https://github.com/kubernetes/cloud-provider-gcp"},
		{"vsphere", false, "The vSphere provider is deprecated and will be removed in a future release. Please use https://github.com/kubernetes/cloud-provider-vsphere"},
	}
)

// RegisterCloudProvider registers a cloudprovider.Factory by name. It is expected to be
// called during program startup.
func RegisterCloudProvider(name string, cloud Factory) {
	providersMutex.Lock()
	defer providersMutex.Unlock()
	if _, found := providers[name]; found {
		klog.Fatalf("Cloud provider %q was registered twice", name)
	}
	klog.V(1).Infof("Registered cloud provider %q", name)
	providers[name] = cloud
}

// IsCloudProvider reports whether a cloud provider with this name has been registered.
func IsCloudProvider(name string) bool {
	providersMutex.Lock()
	defer providersMutex.Unlock()
	_, found := providers[name]
	return found
}

// GetCloudProvider calls the registered factory to create a cloudprovider.Interface
// instance. It returns nil, nil if no provider with that name is registered.
func GetCloudProvider(name string, config io.Reader) (Interface, error) {
	providersMutex.Lock()
	defer providersMutex.Unlock()
	f, found := providers[name]
	if !found {
		return nil, nil
	}
	return f(config)
}

// InitCloudProvider creates an instance of the named cloud provider.
func InitCloudProvider(name string, configFilePath string) (Interface, error) {
	var cloud Interface
	var err error

	if name == "" {
		return nil, nil
	}

	if IsExternal(name) {
		klog.Info("External cloud provider specified")
		return nil, nil
	}

	if configFilePath != "" {
		var config *os.File
		config, err = os.Open(configFilePath)
		if err != nil {
			klog.Fatalf("Couldn't open cloud provider configuration %s: %#v",
				configFilePath, err)
		}

		defer config.Close()
		// Create the provider instance: name is the provider's name, and config (the opened
		// configuration file) is passed as an io.Reader.
		cloud, err = GetCloudProvider(name, config)
	} else {
		// Pass explicit nil so plugins can actually check for nil. See
		// "Why is my nil error value not equal to nil?" in golang.org/doc/faq.
		// An interface value holds a (type T, value V) pair and equals nil only when both are
		// unset. A nil *os.File stored in an io.Reader has T = *os.File and V = nil, so that
		// interface is not == nil. Passing a literal nil keeps config == nil checks in the
		// plugins working as expected.
		cloud, err = GetCloudProvider(name, nil)
	}

	if err != nil {
		return nil, fmt.Errorf("could not init cloud provider %q: %v", name, err)
	}
	if cloud == nil {
		return nil, fmt.Errorf("unknown cloud provider %q", name)
	}

	return cloud, nil
}

Finally, let's look at the cloudprovider.Interface interface:

// Interface is an abstract, pluggable interface for cloud providers.
type Interface interface {
	// Initialize provides the cloud with a kubernetes client builder and may spawn goroutines
	// to perform housekeeping or run custom controllers specific to the cloud provider.
	// Any tasks started here should be cleaned up when the stop channel closes.
	Initialize(clientBuilder ControllerClientBuilder, stop <-chan struct{})
	// LoadBalancer returns a balancer interface. Also returns true if the interface is supported, false otherwise.
	LoadBalancer() (LoadBalancer, bool)
	// Instances returns an instances interface. Also returns true if the interface is supported, false otherwise.
	Instances() (Instances, bool)
	// InstancesV2 is an implementation for instances and should only be implemented by external cloud providers.
	// Implementing InstancesV2 is behaviorally identical to Instances but is optimized to significantly reduce
	// API calls to the cloud provider when registering and syncing nodes. Implementation of this interface will
	// disable calls to the Zones interface. Also returns true if the interface is supported, false otherwise.
	InstancesV2() (InstancesV2, bool)
	// Zones returns a zones interface. Also returns true if the interface is supported, false otherwise.
	// DEPRECATED: Zones is deprecated in favor of retrieving zone/region information from InstancesV2.
	// This interface will not be called if InstancesV2 is enabled.
	Zones() (Zones, bool)
	// Clusters returns a clusters interface. Also returns true if the interface is supported, false otherwise.
	Clusters() (Clusters, bool)
	// Routes returns a routes interface along with whether the interface is supported.
	Routes() (Routes, bool)
	// ProviderName returns the cloud provider ID.
	ProviderName() string
	// HasClusterID returns true if a ClusterID is required and set
	HasClusterID() bool
}

Next, the Instances interface:
// Instances is an abstract, pluggable interface for sets of instances.
type Instances interface {
	// NodeAddresses returns the addresses of the specified instance.
	NodeAddresses(ctx context.Context, name types.NodeName) ([]v1.NodeAddress, error)
	// NodeAddressesByProviderID returns the addresses of the specified instance.
	// The instance is specified using the providerID of the node. The
	// ProviderID is a unique identifier of the node. This will not be called
	// from the node whose nodeaddresses are being queried. i.e. local metadata
	// services cannot be used in this method to obtain nodeaddresses
	NodeAddressesByProviderID(ctx context.Context, providerID string) ([]v1.NodeAddress, error)
	// InstanceID returns the cloud provider ID of the node with the specified NodeName.
	// Note that if the instance does not exist, we must return ("", cloudprovider.InstanceNotFound)
	// cloudprovider.InstanceNotFound should NOT be returned for instances that exist but are stopped/sleeping
	InstanceID(ctx context.Context, nodeName types.NodeName) (string, error)
	// InstanceType returns the type of the specified instance.
	InstanceType(ctx context.Context, name types.NodeName) (string, error)
	// InstanceTypeByProviderID returns the type of the specified instance.
	InstanceTypeByProviderID(ctx context.Context, providerID string) (string, error)
	// AddSSHKeyToAllInstances adds an SSH public key as a legal identity for all instances
	// expected format for the key is standard ssh-keygen format: <protocol> <blob>
	AddSSHKeyToAllInstances(ctx context.Context, user string, keyData []byte) error
	// CurrentNodeName returns the name of the node we are currently running on
	// On most clouds (e.g. GCE) this is the hostname, so we provide the hostname
	CurrentNodeName(ctx context.Context, hostname string) (types.NodeName, error)
	// InstanceExistsByProviderID returns true if the instance for the given provider exists.
	// If false is returned with no error, the instance will be immediately deleted by the cloud controller manager.
	// This method should still return true for instances that exist but are stopped/sleeping.
	InstanceExistsByProviderID(ctx context.Context, providerID string) (bool, error)
	// InstanceShutdownByProviderID returns true if the instance is shutdown in cloudprovider
	InstanceShutdownByProviderID(ctx context.Context, providerID string) (bool, error)
}
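Putting these pieces together, run derives the node name from the hostname and, when a cloud provider is configured, from Instances().CurrentNodeName, checking the bool that reports whether Instances is supported. The sketch below reconstructs that flow under simplifying assumptions: getHostname trims and lower-cases the override or the OS hostname, and the cloud and instances interfaces are hypothetical subsets cut down to the one method used. It is an illustration, not the upstream nodeutil.GetHostname or getNodeName.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"os"
	"strings"
)

// instances is a hypothetical subset of cloudprovider.Instances.
type instances interface {
	CurrentNodeName(ctx context.Context, hostname string) (string, error)
}

// cloud is a hypothetical subset of cloudprovider.Interface; the bool reports support.
type cloud interface {
	Instances() (instances, bool)
}

// getHostname uses the override if set, otherwise the OS hostname,
// trimmed and lower-cased.
func getHostname(override string) (string, error) {
	name := override
	if name == "" {
		h, err := os.Hostname()
		if err != nil {
			return "", fmt.Errorf("couldn't determine hostname: %w", err)
		}
		name = h
	}
	name = strings.TrimSpace(name)
	if name == "" {
		return "", errors.New("empty hostname is invalid")
	}
	return strings.ToLower(name), nil
}

// getNodeName falls back to the hostname when no cloud provider is configured.
func getNodeName(c cloud, hostname string) (string, error) {
	if c == nil {
		return hostname, nil
	}
	inst, ok := c.Instances()
	if !ok {
		return "", errors.New("failed to get instances from cloud provider")
	}
	return inst.CurrentNodeName(context.Background(), hostname)
}

// staticCloud is a hypothetical provider that maps every host to one fixed name.
type staticCloud struct{ name string }

func (s staticCloud) Instances() (instances, bool) { return s, true }

func (s staticCloud) CurrentNodeName(context.Context, string) (string, error) { return s.name, nil }

func main() {
	host, err := getHostname("  Worker-01 ")
	if err != nil {
		panic(err)
	}
	n1, _ := getNodeName(nil, host)
	n2, _ := getNodeName(staticCloud{name: "i-0abc"}, host)
	fmt.Println(host, n1, n2) // worker-01 worker-01 i-0abc
}
```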


UnsecuredDependencies(kubeletServer, utilfeature.DefaultFeatureGate)

// Constructing kubeletDeps
kubeletDeps, err := UnsecuredDependencies(kubeletServer, utilfeature.DefaultFeatureGate)

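It takes two arguments: kubeletServer, a struct whose fields are the command-line flags (kubeletFlags) and the kubelet configuration (kubeletConfig), and utilfeature.DefaultFeatureGate, the default feature gate used to switch certain kubelet features on or off.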

kubeletServer := &options.KubeletServer{
	KubeletFlags:         *kubeletFlags,
	KubeletConfiguration: *kubeletConfig,
}
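A feature gate is essentially a set of known feature names with default values. SetFromMap overrides them from the featureGates map in the configuration, and Enabled queries them, as in the MemoryQoS and KubeletTracing checks. The toy version below makes those two calls concrete. The gate type and the defaults chosen for the feature names are hypothetical; the real k8s.io/component-base/featuregate package additionally tracks pre-release stages, locked-to-default features and more.

```go
package main

import (
	"fmt"
	"sort"
	"strings"
)

// gate is a hypothetical, minimal feature gate.
type gate struct {
	known   map[string]bool // feature name -> default value
	enabled map[string]bool // explicit overrides
}

func newGate(defaults map[string]bool) *gate {
	return &gate{known: defaults, enabled: map[string]bool{}}
}

// SetFromMap applies overrides, rejecting the whole map if any name is unknown.
func (g *gate) SetFromMap(m map[string]bool) error {
	var unknown []string
	for k := range m {
		if _, ok := g.known[k]; !ok {
			unknown = append(unknown, k)
		}
	}
	if len(unknown) > 0 {
		sort.Strings(unknown)
		return fmt.Errorf("unrecognized feature gate(s): %s", strings.Join(unknown, ", "))
	}
	for k, v := range m {
		g.enabled[k] = v
	}
	return nil
}

// Enabled returns the override if present, otherwise the default.
func (g *gate) Enabled(name string) bool {
	if v, ok := g.enabled[name]; ok {
		return v
	}
	return g.known[name]
}

func main() {
	g := newGate(map[string]bool{"MemoryQoS": false, "KubeletTracing": false})
	if err := g.SetFromMap(map[string]bool{"MemoryQoS": true}); err != nil {
		panic(err)
	}
	fmt.Println(g.Enabled("MemoryQoS"), g.Enabled("KubeletTracing")) // true false
	fmt.Println(g.SetFromMap(map[string]bool{"NoSuchFeature": true}))
}
```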

// UnsecuredDependencies returns a Dependencies suitable for being run, or an error if the
// server setup is not valid. It does not start any background processes and does not
// include authentication/authorization.
func UnsecuredDependencies(s *options.KubeletServer, featureGate featuregate.FeatureGate) (*kubelet.Dependencies, error) {
	// Initialize the TLS options.
	// InitializeTLS checks the configured TLSCertFile and TLSPrivateKeyFile: if they are not
	// specified, a new self-signed certificate and key file are generated. It returns the
	// configured server.TLSOptions object.
	tlsOptions, err := InitializeTLS(&s.KubeletFlags, &s.KubeletConfiguration)
	if err != nil {
		return nil, err
	}
	// s.ExperimentalMounterPath is the path to the mounter binary; if empty, the default is used.
	mounter := mount.New(s.ExperimentalMounterPath)
	// subpath.New returns a subpath.Interface for the current system.
	subpather := subpath.New(mounter)
	// NewHostUtil returns a struct implementing the HostUtils interface for the current platform.
	hu := hostutil.NewHostUtil()
	// exec.New returns a new Interface that runs commands via os/exec.
	var pluginRunner = exec.New()
	// ProbeVolumePlugins collects all volume plugins into an easy-to-use list.
	plugins, err := ProbeVolumePlugins(featureGate)
	if err != nil {
		return nil, err
	}
	// NewNoopTracerProvider returns an implementation of TracerProvider that performs
	// no operations. The Tracer and Spans created from the returned TracerProvider
	// also perform no operations.
	tp := oteltrace.NewNoopTracerProvider()
	// features.KubeletTracing adds support for distributed tracing in the kubelet.
	if utilfeature.DefaultFeatureGate.Enabled(features.KubeletTracing) {
		tp, err = newTracerProvider(s)
		if err != nil {
			return nil, err
		}
	}
	return &kubelet.Dependencies{
		Auth:                nil, // default does not enforce auth[nz]
		CAdvisorInterface:   nil, // cadvisor.New launches background processes (bg http.ListenAndServe, and some bg cleaners), not set here
		Cloud:               nil, // cloud provider might start background processes
		ContainerManager:    nil,
		KubeClient:          nil,
		HeartbeatClient:     nil,
		EventClient:         nil,
		TracerProvider:      tp,
		HostUtil:            hu,
		Mounter:             mounter,
		Subpather:           subpather,
		OOMAdjuster:         oom.NewOOMAdjuster(),
		OSInterface:         kubecontainer.RealOS{},
		VolumePlugins:       plugins,
		DynamicPluginProber: GetDynamicPluginProber(s.VolumePluginDir, pluginRunner),
		TLSOptions:          tlsOptions}, nil
}
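UnsecuredDependencies deliberately leaves fields such as Cloud, KubeClient and Auth nil, and run later fills in only the ones that are still nil (if kubeDeps.Cloud == nil { ... }). That is what lets a caller, a test for instance, inject its own implementations up front. A minimal sketch of the pattern, with hypothetical types standing in for kubelet.Dependencies and its clients:

```go
package main

import "fmt"

// Client is a hypothetical dependency, e.g. an API client.
type Client interface{ Name() string }

type realClient struct{}

func (realClient) Name() string { return "real" }

type fakeClient struct{}

func (fakeClient) Name() string { return "fake" }

// Dependencies is a hypothetical, trimmed-down analogue of kubelet.Dependencies.
type Dependencies struct {
	KubeClient Client // nil means "build the default later"
}

// unsecuredDependencies builds only side-effect-free defaults.
func unsecuredDependencies() *Dependencies {
	return &Dependencies{KubeClient: nil}
}

// run fills in any dependency the caller did not inject.
func run(deps *Dependencies) string {
	if deps.KubeClient == nil {
		deps.KubeClient = realClient{}
	}
	return deps.KubeClient.Name()
}

func main() {
	fmt.Println(run(unsecuredDependencies())) // real

	injected := unsecuredDependencies()
	injected.KubeClient = fakeClient{}
	fmt.Println(run(injected)) // fake
}
```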

func InitializeTLS(kf *options.KubeletFlags, kc *kubeletconfiginternal.KubeletConfiguration) (*server.TLSOptions, error) {
// 1. kc.ServerTLSBootstrap:ServerTLSBootstrap是Kubeconfiguration中的一个字段,用来指示启用服务器证书引导
// 系统不再使用自签名的服务证书, kubelet 会调用certificates.k8s.io API 来请求证书,需要有一个批复人来批准证书签名请求(CSR)
// 设置此字段时,RotateKubeletServerCertificate特性必须被启用。
//
// 2. kc.TLSCertFile && kc.TLSPrivateKeyFile:TLSCertFile是包含 HTTPS 所需要的 x509 证书的文件 (如果有 CA 证书,会串接到服务器证书之后)。X509证书也就是HTTPS证书,比如说SSL/TLS证书。
// 如果tlsCertFile 和tlsPrivateKeyFile都没有设置,则系统会为节点的公开地址生成自签名的证书和私钥,并将其保存到 kubelet --cert-dir参数所指定的目录下。
if !kc.ServerTLSBootstrap && kc.TLSCertFile == "" && kc.TLSPrivateKeyFile == "" {
// --cert-dir即为CertDirectory,命令行指定的证书目录
// 这里拿到两个证书的路径
kc.TLSCertFile = path.Join(kf.CertDirectory, "kubelet.crt")
kc.TLSPrivateKeyFile = path.Join(kf.CertDirectory, "kubelet.key")
// CanReadCertAndKey用来判断证书文件是否存在并且可以读取
canReadCertAndKey, err := certutil.CanReadCertAndKey(kc.TLSCertFile, kc.TLSPrivateKeyFile)
if err != nil {
return nil, err
}
// 如果证书文件不存在或者不可以读取,则生成自签名证书并写入路径
if !canReadCertAndKey {
// nodeutil.GetHostname命令行设置的主机名,如果不存在,则使用os.Hostname获取node的hostname,最终转换为小写
hostName, err := nodeutil.GetHostname(kf.HostnameOverride)
if err != nil {
return nil, err
}
// 证书不存在,生成自签名证书
cert, key, err := certutil.GenerateSelfSignedCertKey(hostName, nil, nil)
if err != nil {
return nil, fmt.Errorf("unable to generate self signed cert: %w", err)
}

// 将cert和key写到kc.TLSCertFile和kc.TLSPrivateKeyFile路径
...
}
}
// kc.TLSCipherSuites是一个字符串数组,用来指定TLS连接时所使用的加密套件,见https://golang.org/pkg/crypto/tls/#pkg-constants
// 这个方法会将TLSCipherSuites字符串数组转换为加密套件数组
tlsCipherSuites, err := cliflag.TLSCipherSuites(kc.TLSCipherSuites)
if err != nil {
return nil, err
}
// 如果加密套件数组不为空,判断使用的加密套件是否有安全问题,如果有,打印出来
if len(tlsCipherSuites) > 0 {
// 返回tls/ssl实现的加密套件中具有安全问题的那部分
insecureCiphers := cliflag.InsecureTLSCiphers()
for i := 0; i < len(tlsCipherSuites); i++ {
for cipherName, cipherID := range insecureCiphers {
if tlsCipherSuites[i] == cipherID {
klog.InfoS("Use of insecure cipher detected.", "cipher", cipherName)
}
}
}
}
// TLSMinVersion is the minimum supported TLS version
minTLSVersion, err := cliflag.TLSVersion(kc.TLSMinVersion)
if err != nil {
return nil, err
}
// With a minimum version of TLS 1.3, cipher suites cannot be configured, so warn that --tls-cipher-suites is ignored
if minTLSVersion == tls.VersionTLS13 {
if len(tlsCipherSuites) != 0 {
klog.InfoS("Warning: TLS 1.3 cipher suites are not configurable, ignoring --tls-cipher-suites")
}
}
// Build a server.TLSOptions object holding the TLS configuration
tlsOptions := &server.TLSOptions{
Config: &tls.Config{
MinVersion: minTLSVersion,
CipherSuites: tlsCipherSuites,
},
CertFile: kc.TLSCertFile,
KeyFile: kc.TLSPrivateKeyFile,
}
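// kc.Authentication specifies how requests to the kubelet server are authenticated.
// It has three fields:
// x509: settings for x509 client certificate authentication.
//   clientCAFile: path to a PEM-encoded certificate bundle.
//   If set, any request presenting a client certificate signed by one of the authorities in the bundle
//   is authenticated with a username taken from the certificate's CommonName and groups taken from its Organization.
// webhook: settings for webhook bearer token authentication.
//   enabled: allows bearer token authentication backed by the tokenreviews.authentication.k8s.io API.
//   cacheTTL: how long authentication results are cached.
// anonymous: settings for anonymous authentication.
//   enabled: allows anonymous requests to the kubelet server. Requests not rejected by another authentication method are treated as anonymous.
//   Anonymous requests have the username system:anonymous and the group system:unauthenticated.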
if len(kc.Authentication.X509.ClientCAFile) > 0 {
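// certutil.NewPool reads the file and returns an x509.CertPool containing the PEM-encoded certificates in it.
// It returns an error if the file cannot be read, a certificate cannot be parsed, or the file contains no certificates.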
clientCAs, err := certutil.NewPool(kc.Authentication.X509.ClientCAFile)
if err != nil {
return nil, fmt.Errorf("unable to load client CA file %s: %w", kc.Authentication.X509.ClientCAFile, err)
}
// Specify allowed CAs for client certificates
tlsOptions.Config.ClientCAs = clientCAs
// Populate PeerCertificates in requests, but don't reject connections without verified certificates
tlsOptions.Config.ClientAuth = tls.RequestClientCert
}
return tlsOptions, nil
}

func GenerateSelfSignedCertKeyWithFixtures(host string, alternateIPs []net.IP, alternateDNS []string, fixtureDirectory string) ([]byte, []byte, error) {
// Valid from one hour before now, to tolerate clock skew
// Valid for one year
validFrom := time.Now().Add(-time.Hour) // valid an hour earlier to avoid flakes due to clock skew
maxAge := time.Hour * 24 * 365 // one year self-signed certs
// Fixture base name: hostname_ip_dns
baseName := fmt.Sprintf("%s_%s_%s", host, strings.Join(ipsToStrings(alternateIPs), "-"), strings.Join(alternateDNS, "-"))
certFixturePath := filepath.Join(fixtureDirectory, baseName+".crt")
keyFixturePath := filepath.Join(fixtureDirectory, baseName+".key")
...
// Generate the CA's key pair; the CA certificate built below signs the serving certificate
caKey, err := rsa.GenerateKey(cryptorand.Reader, 2048)
if err != nil {
return nil, nil, err
}
// Build an x509.Certificate template for the self-signed CA certificate
caTemplate := x509.Certificate{
SerialNumber: big.NewInt(1), // serial number assigned by the issuer; a fixed value is used here
Subject: pkix.Name{
CommonName: fmt.Sprintf("%s-ca@%d", host, time.Now().Unix()),
},
NotBefore: validFrom,
NotAfter: validFrom.Add(maxAge),
// Key Encipherment (x509.KeyUsageKeyEncipherment): the key may be used to encipher other keys
// Digital Signature (x509.KeyUsageDigitalSignature): the key may be used for digital signatures
// Certificate Signing (x509.KeyUsageCertSign): the key may sign other certificates, as a root or intermediate CA does
KeyUsage: x509.KeyUsageKeyEncipherment | x509.KeyUsageDigitalSignature | x509.KeyUsageCertSign,
BasicConstraintsValid: true,
IsCA: true,
}
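// CreateCertificate creates a new X.509 v3 certificate from the template and returns it DER-encoded; passing caTemplate as both template and parent makes it self-signed
// See https://pkg.go.dev/crypto/x509@go1.21.0#CreateCertificate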
caDERBytes, err := x509.CreateCertificate(cryptorand.Reader, &caTemplate, &caTemplate, &caKey.PublicKey, caKey)
if err != nil {
return nil, nil, err
}
// Parse the DER bytes back into an x509.Certificate (the CA certificate), used as the parent below
caCertificate, err := x509.ParseCertificate(caDERBytes)
if err != nil {
return nil, nil, err
}
// Generate the serving certificate's key pair
priv, err := rsa.GenerateKey(cryptorand.Reader, 2048)
if err != nil {
return nil, nil, err
}
template := x509.Certificate{
SerialNumber: big.NewInt(2), // serial number assigned by the issuing CA; a fixed value is used here
Subject: pkix.Name{
CommonName: fmt.Sprintf("%s@%d", host, time.Now().Unix()),
},
NotBefore: validFrom,
NotAfter: validFrom.Add(maxAge),

KeyUsage: x509.KeyUsageKeyEncipherment | x509.KeyUsageDigitalSignature,
ExtKeyUsage: []x509.ExtKeyUsage{x509.ExtKeyUsageServerAuth},
BasicConstraintsValid: true,
}
// If host parses as an IP address, add it to the IP SANs; otherwise treat it as a DNS name
if ip := netutils.ParseIPSloppy(host); ip != nil {
template.IPAddresses = append(template.IPAddresses, ip)
} else {
template.DNSNames = append(template.DNSNames, host)
}
template.IPAddresses = append(template.IPAddresses, alternateIPs...)
template.DNSNames = append(template.DNSNames, alternateDNS...)
// Create the serving certificate, signed by the CA key with the CA certificate as parent, DER-encoded
derBytes, err := x509.CreateCertificate(cryptorand.Reader, &template, caCertificate, &priv.PublicKey, caKey)
if err != nil {
return nil, nil, err
}
// PEM-encode the serving certificate followed by the CA certificate, forming the chain
certBuffer := bytes.Buffer{}
if err := pem.Encode(&certBuffer, &pem.Block{Type: CertificateBlockType, Bytes: derBytes}); err != nil {
return nil, nil, err
}
if err := pem.Encode(&certBuffer, &pem.Block{Type: CertificateBlockType, Bytes: caDERBytes}); err != nil {
return nil, nil, err
}
// PEM-encode the serving certificate's private key in PKCS #1 form
keyBuffer := bytes.Buffer{}
if err := pem.Encode(&keyBuffer, &pem.Block{Type: keyutil.RSAPrivateKeyBlockType, Bytes: x509.MarshalPKCS1PrivateKey(priv)}); err != nil {
return nil, nil, err
}
...
return certBuffer.Bytes(), keyBuffer.Bytes(), nil
}
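To see the two-level structure (a throwaway CA that signs the serving certificate) in isolation, here is a minimal sketch that builds the same kind of chain and then verifies the serving certificate against the CA. It swaps RSA-2048 for ECDSA P-256 only to keep key generation fast, leaves out fixture caching and PEM encoding, and uses the hypothetical names `selfSignedChain` and `verifyServing`.

```go
package main

import (
	"crypto/ecdsa"
	"crypto/elliptic"
	"crypto/rand"
	"crypto/x509"
	"crypto/x509/pkix"
	"fmt"
	"math/big"
	"time"
)

// selfSignedChain creates a CA certificate and a serving certificate for host signed by it.
func selfSignedChain(host string) (leaf, ca *x509.Certificate, err error) {
	validFrom := time.Now().Add(-time.Hour) // tolerate clock skew
	maxAge := 365 * 24 * time.Hour          // one year

	caKey, err := ecdsa.GenerateKey(elliptic.P256(), rand.Reader)
	if err != nil {
		return nil, nil, err
	}
	caTmpl := &x509.Certificate{
		SerialNumber:          big.NewInt(1),
		Subject:               pkix.Name{CommonName: host + "-ca"},
		NotBefore:             validFrom,
		NotAfter:              validFrom.Add(maxAge),
		KeyUsage:              x509.KeyUsageDigitalSignature | x509.KeyUsageCertSign,
		BasicConstraintsValid: true,
		IsCA:                  true,
	}
	// Template and parent are the same, so the CA certificate is self-signed.
	caDER, err := x509.CreateCertificate(rand.Reader, caTmpl, caTmpl, &caKey.PublicKey, caKey)
	if err != nil {
		return nil, nil, err
	}
	if ca, err = x509.ParseCertificate(caDER); err != nil {
		return nil, nil, err
	}

	leafKey, err := ecdsa.GenerateKey(elliptic.P256(), rand.Reader)
	if err != nil {
		return nil, nil, err
	}
	leafTmpl := &x509.Certificate{
		SerialNumber:          big.NewInt(2),
		Subject:               pkix.Name{CommonName: host},
		NotBefore:             validFrom,
		NotAfter:              validFrom.Add(maxAge),
		KeyUsage:              x509.KeyUsageDigitalSignature,
		ExtKeyUsage:           []x509.ExtKeyUsage{x509.ExtKeyUsageServerAuth},
		BasicConstraintsValid: true,
		DNSNames:              []string{host},
	}
	// The leaf is signed by the CA key, with the parsed CA certificate as parent.
	leafDER, err := x509.CreateCertificate(rand.Reader, leafTmpl, ca, &leafKey.PublicKey, caKey)
	if err != nil {
		return nil, nil, err
	}
	leaf, err = x509.ParseCertificate(leafDER)
	return leaf, ca, err
}

// verifyServing checks that leaf chains to ca and is valid as a server certificate for host.
func verifyServing(leaf, ca *x509.Certificate, host string) error {
	roots := x509.NewCertPool()
	roots.AddCert(ca)
	_, err := leaf.Verify(x509.VerifyOptions{
		Roots:     roots,
		DNSName:   host,
		KeyUsages: []x509.ExtKeyUsage{x509.ExtKeyUsageServerAuth},
	})
	return err
}

func main() {
	leaf, ca, err := selfSignedChain("node-1.example.internal")
	if err != nil {
		panic(err)
	}
	fmt.Println(verifyServing(leaf, ca, "node-1.example.internal") == nil)
	fmt.Println(verifyServing(leaf, ca, "other.example.internal") == nil)
}
```

A client that trusts only the CA accepts the serving certificate for the host it was issued to and rejects it for any other name, which is why the kubelet writes the CA certificate after the serving certificate in the same PEM file.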

initConfigz(&s.KubeletConfiguration)

func initConfigz(kc *kubeletconfiginternal.KubeletConfiguration) error {
cz, err := configz.New("kubeletconfig")
if err != nil {
klog.ErrorS(err, "Failed to register configz")
return err
}
if err := setConfigz(cz, kc); err != nil {
klog.ErrorS(err, "Failed to register config")
return err
}
return nil
}

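// How configz.New works
// 1. It takes the lock first. configsGuard is a sync.RWMutex, which allows either many readers or one writer and suits read-heavy, write-light workloads; New takes the write lock.
// Go maps are not safe for concurrent use: the runtime checks a "writing" flag on lookup, assignment, iteration, and deletion, and aborts the program with a fatal error when it detects a concurrent write.
// 2. It guarantees each name is registered only once, initially storing an empty Config struct:
// type Config struct {
// val interface{}
// }
// configs is a global map from a name to its *Config handle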
func New(name string) (*Config, error) {
configsGuard.Lock() // configsGuard sync.RWMutex
defer configsGuard.Unlock()
if _, found := configs[name]; found {
return nil, fmt.Errorf("register config %q twice", name)
}
newConfig := Config{}
configs[name] = &newConfig
return &newConfig, nil
}
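The register-once pattern of configz.New can be reproduced in a few lines. This is a minimal sketch under the assumption that a package-level map guarded by one `sync.RWMutex` is all that matters; `Get` is a hypothetical reader added for illustration, and the locking inside `Set` is an assumption of this sketch rather than something shown in the source above.

```go
package main

import (
	"fmt"
	"sync"
)

// Config holds one registered value, as in the configz package.
type Config struct {
	val interface{}
}

var (
	configsGuard sync.RWMutex
	configs      = map[string]*Config{}
)

// New registers name exactly once, under the write lock.
func New(name string) (*Config, error) {
	configsGuard.Lock()
	defer configsGuard.Unlock()
	if _, found := configs[name]; found {
		return nil, fmt.Errorf("register config %q twice", name)
	}
	c := &Config{}
	configs[name] = c
	return c, nil
}

// Set stores the value under the write lock.
func (c *Config) Set(val interface{}) {
	configsGuard.Lock()
	defer configsGuard.Unlock()
	c.val = val
}

// Get is a hypothetical reader; many goroutines may hold the read lock at once.
func Get(name string) (interface{}, bool) {
	configsGuard.RLock()
	defer configsGuard.RUnlock()
	c, ok := configs[name]
	if !ok {
		return nil, false
	}
	return c.val, true
}

func main() {
	cz, err := New("kubeletconfig")
	if err != nil {
		panic(err)
	}
	cz.Set(map[string]int{"maxPods": 110})
	_, err = New("kubeletconfig")
	fmt.Println(err)
	v, _ := Get("kubeletconfig")
	fmt.Println(v)
}
```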

// Some background types first

// Identifies a kind of resource by its group, version, and kind
type GroupVersionKind struct {
Group string
Version string
Kind string
}
// Converts a field selector's label and value into their internal representation
type FieldLabelConversionFunc func(label, value string) (internalLabel, internalValue string, err error)

// setConfigz converts kc to the versioned v1beta1 KubeletConfiguration and stores it in cz
func setConfigz(cz *configz.Config, kc *kubeletconfiginternal.KubeletConfiguration) error {
scheme, _, err := kubeletscheme.NewSchemeAndCodecs()
if err != nil {
return err
}
versioned := kubeletconfigv1beta1.KubeletConfiguration{}
if err := scheme.Convert(kc, &versioned, nil); err != nil {
return err
}
cz.Set(versioned)
return nil
}


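// A Scheme registers how API objects are serialized and deserialized. It maps group, version, and kind information to and from Go types, and holds the mappings between different versions of those types. It is the foundation of versioned APIs and versioned configuration.
// In a Scheme, a Type is a specific Go struct, a Version is a point-in-time identifier for a representation of that Type (usually backward compatible), and a Kind is the unique name of the Type within a Version.
// A Group identifies a set of Versions, Kinds, and Types that evolve over time. An unversioned Type is one not yet formally bound to a version that promises to stay backward compatible (effectively a "v1" that is not expected to break).
// In short, this is how Kubernetes manages different API versions; the Scheme stores the API's metadata. For example:
// apiVersion: v1
// kind: Pod
// Kind is Pod, Version is v1, and Group is the core group (represented by the empty string "")
// A Scheme is not expected to change at runtime and is thread-safe only once registration is complete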
type Scheme struct {
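// Finds the Go type for a given API version and kind (the GroupVersionKind struct is described above)
gvkToType map[schema.GroupVersionKind]reflect.Type

// Finds the GroupVersionKinds registered for a Go type
typeToGVK map[reflect.Type][]schema.GroupVersionKind

// unversionedTypes are used without version conversion
unversionedTypes map[reflect.Type]schema.GroupVersionKind

// unversionedKinds are the names of kinds that can be created in the context of any group or version
unversionedKinds map[string]reflect.Type

// Maps a version and resource to the function that converts that resource's field labels to the internal version
fieldLabelConversionFuncs map[schema.GroupVersionKind]FieldLabelConversionFunc

// defaulterFuncs maps a type to the function that is called with an object of that type to apply defaults
defaulterFuncs map[reflect.Type]func(interface{})

// converter stores all registered conversion functions
converter *conversion.Converter

// versionPriority maps each group to an ordered list of its versions, giving their default priority
versionPriority map[string][]string

// observedVersions records the order in which versions were seen during type registration
observedVersions []schema.GroupVersion

// schemeName is the name of the scheme; if none is given, the call stack of the NewScheme caller is used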
schemeName string
}

func NewSchemeAndCodecs(mutators ...serializer.CodecFactoryOptionsMutator) (*runtime.Scheme, *serializer.CodecFactory, error) {
scheme := runtime.NewScheme() // create a new, empty Scheme
// AddToScheme runs the registration functions from kubeletconfig's register.go to register its types in the Scheme; see register.go and scheme_builder.go below
if err := kubeletconfig.AddToScheme(scheme); err != nil {
return nil, nil, err
}
if err := kubeletconfigv1beta1.AddToScheme(scheme); err != nil {
return nil, nil, err
}
if err := kubeletconfigv1.AddToScheme(scheme); err != nil {
return nil, nil, err
}
codecs := serializer.NewCodecFactory(scheme, mutators...)
return scheme, &codecs, nil
}

Let's first look at kubeletconfig.AddToScheme(scheme) to see how kubeletconfig is registered in the Scheme.

/**
kubelet's registration file, register.go
*/
// The kubelet API group name
const GroupName = "kubelet.config.k8s.io"
// APIVersionInternal is "__internal", the in-memory internal version, which is not a stable API
var SchemeGroupVersion = schema.GroupVersion{Group: GroupName, Version: runtime.APIVersionInternal}

var (
// SchemeBuilder is the scheme builder with scheme init functions to run for this API package
SchemeBuilder = runtime.NewSchemeBuilder(addKnownTypes)
// AddToScheme is a global function that registers this API group & version to a scheme
AddToScheme = SchemeBuilder.AddToScheme
)

// addKnownTypes registers known types to the given scheme
func addKnownTypes(scheme *runtime.Scheme) error {
// See vendor/k8s.io/apimachinery/pkg/runtime/scheme.go below
scheme.AddKnownTypes(SchemeGroupVersion,
&KubeletConfiguration{},
&SerializedNodeConfigSource{},
&CredentialProviderConfig{},
)
return nil
}

/*
scheme_builder.go
*/

// AddToScheme applies all the stored functions to the scheme. A non-nil error
// indicates that one function failed and the attempt was abandoned.
func (sb *SchemeBuilder) AddToScheme(s *Scheme) error {
for _, f := range *sb {
if err := f(s); err != nil {
return err
}
}
return nil
}

// Register adds a scheme setup function to the list.
func (sb *SchemeBuilder) Register(funcs ...func(*Scheme) error) {
for _, f := range funcs {
*sb = append(*sb, f)
}
}

// NewSchemeBuilder calls Register for you.
func NewSchemeBuilder(funcs ...func(*Scheme) error) SchemeBuilder {
var sb SchemeBuilder
sb.Register(funcs...)
return sb
}
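The SchemeBuilder is simply an ordered slice of registration functions, applied in order and stopping at the first error. A minimal sketch of that pattern follows; `registry` is a hypothetical stand-in for `*runtime.Scheme`, and the two registration functions only record that they ran.

```go
package main

import (
	"errors"
	"fmt"
)

// registry is a hypothetical stand-in for *runtime.Scheme; it just records what ran.
type registry struct {
	steps []string
}

// SchemeBuilder mirrors runtime.SchemeBuilder: an ordered list of registration functions.
type SchemeBuilder []func(*registry) error

// Register appends the functions; nothing runs yet.
func (sb *SchemeBuilder) Register(funcs ...func(*registry) error) {
	*sb = append(*sb, funcs...)
}

// AddToScheme runs the functions in registration order and stops at the first error.
func (sb *SchemeBuilder) AddToScheme(r *registry) error {
	for _, f := range *sb {
		if err := f(r); err != nil {
			return err
		}
	}
	return nil
}

// NewSchemeBuilder registers the given functions on a new builder.
func NewSchemeBuilder(funcs ...func(*registry) error) SchemeBuilder {
	var sb SchemeBuilder
	sb.Register(funcs...)
	return sb
}

func addKnownTypes(r *registry) error {
	r.steps = append(r.steps, "addKnownTypes")
	return nil
}

func addDefaultingFuncs(r *registry) error {
	r.steps = append(r.steps, "addDefaultingFuncs")
	return nil
}

func alwaysFails(*registry) error { return errors.New("registration failed") }

func main() {
	sb := NewSchemeBuilder(addKnownTypes)
	// Method value bound to &sb, like `AddToScheme = SchemeBuilder.AddToScheme`.
	addToScheme := sb.AddToScheme
	// Registered afterwards, as an init() function would do.
	sb.Register(addDefaultingFuncs)

	r := &registry{}
	if err := addToScheme(r); err != nil {
		panic(err)
	}
	fmt.Println(r.steps)

	failing := NewSchemeBuilder(alwaysFails, addKnownTypes)
	r2 := &registry{}
	fmt.Println(failing.AddToScheme(r2), len(r2.steps))
}
```

Because `AddToScheme` has a pointer receiver, taking it as a method value binds the builder's address, so functions registered later (for example in a package `init()`) still run when the stored `AddToScheme` is finally called.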

/*
vendor/k8s.io/apimachinery/pkg/runtime/scheme.go
*/

// Appends version to observedVersions (the registration order) unless it is empty, the internal version, or already recorded
func (s *Scheme) addObservedVersion(version schema.GroupVersion) {
if len(version.Version) == 0 || version.Version == APIVersionInternal {
return
}
for _, observedVersion := range s.observedVersions {
if observedVersion == version {
return
}
}

s.observedVersions = append(s.observedVersions, version)
}

// Builds a GroupVersionKind from a GroupVersion and a kind
func (gv GroupVersion) WithKind(kind string) GroupVersionKind {
return GroupVersionKind{Group: gv.Group, Version: gv.Version, Kind: kind}
}
// Extracts the GroupVersion from a GroupVersionKind
func (gvk GroupVersionKind) GroupVersion() GroupVersion {
return GroupVersion{Group: gvk.Group, Version: gvk.Version}
}

func (s *Scheme) AddKnownTypes(gv schema.GroupVersion, types ...Object) {
s.addObservedVersion(gv) // record the version in s.observedVersions
for _, obj := range types {
// Sanity check: every entry in types must be a pointer to a struct
t := reflect.TypeOf(obj)
if t.Kind() != reflect.Pointer {
panic("All types must be pointers to structs.")
}
t = t.Elem() // get the struct type the pointer points to
s.AddKnownTypeWithName(gv.WithKind(t.Name()), obj)
}
}

func (s *Scheme) AddKnownTypeWithName(gvk schema.GroupVersionKind, obj Object) {
s.addObservedVersion(gvk.GroupVersion()) // record the version (a no-op if it is internal or already recorded)
t := reflect.TypeOf(obj)
// Same checks as above: the version must not be empty and obj must be a pointer to a struct
...
t = t.Elem()
// Uniqueness check: one gvk must not be registered to two different types; t here is the struct type, not a pointer
if oldT, found := s.gvkToType[gvk]; found && oldT != t {
panic(fmt.Sprintf("Double registration of different types for %v: old=%v.%v, new=%v.%v in scheme %q", gvk, oldT.PkgPath(), oldT.Name(), t.PkgPath(), t.Name(), s.schemeName))
}
// Add the mapping to the Scheme's gvkToType
s.gvkToType[gvk] = t

for _, existingGvk := range s.typeToGVK[t] {
if existingGvk == gvk {
return
}
}
// Append gvk to typeToGVK[t]; t is the registered struct type (such as KubeletConfiguration above) and gvk its GroupVersionKind
s.typeToGVK[t] = append(s.typeToGVK[t], gvk)
}

// Now back to addKnownTypes:

func addKnownTypes(scheme *runtime.Scheme) error {
scheme.AddKnownTypes(SchemeGroupVersion,
&KubeletConfiguration{},
&SerializedNodeConfigSource{},
&CredentialProviderConfig{},
)
return nil
}

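// scheme.AddKnownTypes registers KubeletConfiguration, SerializedNodeConfigSource, and CredentialProviderConfig in the Scheme under SchemeGroupVersion.
// Because SchemeGroupVersion here is the internal version (__internal), addObservedVersion skips it; only external versions such as v1beta1 and v1 are appended to Scheme.observedVersions, in registration order.
// Where:
// KubeletConfiguration holds the kubelet's configuration
// SerializedNodeConfigSource is used internally by the kubelet to track checkpointed dynamic configuration; deprecated since 1.22
// CredentialProviderConfig configures the credential provider plugins; the kubelet reads it from disk and enables the providers it lists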

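Back to kubeletconfigv1beta1.AddToScheme(scheme): it likewise registers the kubeletconfig types, this time for version v1beta1.
The difference is that this package's init function also registers addDefaultingFuncs, so the defaulting function is added to the Scheme when kubeletconfigv1beta1.AddToScheme(scheme) is called.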

// GroupName is the group name used in this package
const GroupName = "kubelet.config.k8s.io"

// SchemeGroupVersion is group version used to register these objects
var SchemeGroupVersion = schema.GroupVersion{Group: GroupName, Version: "v1beta1"}

var (
// localSchemeBuilder extends the SchemeBuilder instance with the external types. In this package,
// defaulting and conversion init funcs are registered as well.
localSchemeBuilder = &kubeletconfigv1beta1.SchemeBuilder
// AddToScheme is a global function that registers this API group & version to a scheme
AddToScheme = localSchemeBuilder.AddToScheme
)

func init() {
// We only register manually written functions here. The registration of the
// generated functions takes place in the generated files. The separation
// makes the code compile even when the generated files are missing.
localSchemeBuilder.Register(addDefaultingFuncs)
}

---

// Adds a defaulting function to Scheme.defaulterFuncs
func (s *Scheme) AddTypeDefaultingFunc(srcType Object, fn func(interface{})) {
s.defaulterFuncs[reflect.TypeOf(srcType)] = fn
}

func addDefaultingFuncs(scheme *kruntime.Scheme) error {
return RegisterDefaults(scheme)
}

// Registers a defaulting function for *v1beta1.KubeletConfiguration;
// the function casts its argument to *v1beta1.KubeletConfiguration and passes it to SetObjectDefaults_KubeletConfiguration
func RegisterDefaults(scheme *runtime.Scheme) error {
scheme.AddTypeDefaultingFunc(&v1beta1.KubeletConfiguration{}, func(obj interface{}) { SetObjectDefaults_KubeletConfiguration(obj.(*v1beta1.KubeletConfiguration)) })
return nil
}

func SetObjectDefaults_KubeletConfiguration(in *v1beta1.KubeletConfiguration) {
// Straightforward: fills in default values for the fields of v1beta1.KubeletConfiguration that are left unset
SetDefaults_KubeletConfiguration(in)
for i := range in.ReservedMemory {
a := &in.ReservedMemory[i]
v1.SetDefaults_ResourceList(&a.Limits)
}
}
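Defaulting works through a map from `reflect.Type` to a function, which the scheme looks up by the object's dynamic type. A minimal sketch of that lookup follows; `defaulters` and the two-field `KubeletConfiguration` are hypothetical, and the default values are illustrative only (the real SetDefaults_KubeletConfiguration sets many more fields).

```go
package main

import (
	"fmt"
	"reflect"
)

// KubeletConfiguration is a hypothetical two-field stand-in for v1beta1.KubeletConfiguration.
type KubeletConfiguration struct {
	MaxPods      int32
	CgroupDriver string
}

// defaulters plays the role of Scheme.defaulterFuncs.
type defaulters map[reflect.Type]func(interface{})

// AddTypeDefaultingFunc keys fn by the dynamic type of srcType, which is a pointer type here.
func (d defaulters) AddTypeDefaultingFunc(srcType interface{}, fn func(interface{})) {
	d[reflect.TypeOf(srcType)] = fn
}

// Default runs the function registered for obj's dynamic type, if there is one.
func (d defaulters) Default(obj interface{}) {
	if fn, ok := d[reflect.TypeOf(obj)]; ok {
		fn(obj)
	}
}

// setDefaults fills only fields that are still at their zero value.
func setDefaults(in *KubeletConfiguration) {
	if in.MaxPods == 0 {
		in.MaxPods = 110 // illustrative value
	}
	if in.CgroupDriver == "" {
		in.CgroupDriver = "cgroupfs" // illustrative value
	}
}

func main() {
	d := defaulters{}
	// Same shape as RegisterDefaults: a closure that type-asserts and delegates.
	d.AddTypeDefaultingFunc(&KubeletConfiguration{}, func(obj interface{}) {
		setDefaults(obj.(*KubeletConfiguration))
	})

	cfg := &KubeletConfiguration{MaxPods: 50}
	d.Default(cfg)
	fmt.Println(cfg.MaxPods, cfg.CgroupDriver)

	// A non-pointer value has a different reflect.Type, so no defaulter is found and nothing happens.
	d.Default(KubeletConfiguration{})
}
```

Since the key is the pointer type, only values passed as `*KubeletConfiguration` are defaulted, and explicitly set fields such as `MaxPods: 50` are preserved.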

// v1beta1.KubeletConfiguration
type KubeletConfiguration struct {
...
// Memory reserved per NUMA node
// For details see https://kubernetes.io/docs/tasks/administer-cluster/reserve-compute-resources/#node-allocatable
ReservedMemory []MemoryReservation `json:"reservedMemory,omitempty"`
...
}

Likewise, kubeletconfigv1.AddToScheme(scheme) registers the v1 kubeletconfig types.

The next line initializes a set of serializers (codecs) that encode and decode the configuration as JSON, YAML, and other formats:

codecs := serializer.NewCodecFactory(scheme, mutators...)

Back to setConfigz:

func setConfigz(cz *configz.Config, kc *kubeletconfiginternal.KubeletConfiguration) error {
scheme, _, err := kubeletscheme.NewSchemeAndCodecs()
if err != nil {
return err
}
versioned := kubeletconfigv1beta1.KubeletConfiguration{}
if err := scheme.Convert(kc, &versioned, nil); err != nil {
return err
}
cz.Set(versioned)
return nil
}

type Config struct {
val interface{}
}

First, kubeletscheme.NewSchemeAndCodecs() registers the kubeletconfig (internal), kubeletconfigv1beta1, and kubeletconfigv1 types;
next, an empty kubeletconfigv1beta1.KubeletConfiguration struct is instantiated;
then, scheme.Convert converts the kubeletconfiginternal.KubeletConfiguration into that kubeletconfigv1beta1.KubeletConfiguration;
finally, cz.Set stores the converted kubeletconfigv1beta1.KubeletConfiguration in Config.val.

Looking at this code again:

func initConfigz(kc *kubeletconfiginternal.KubeletConfiguration) error {
cz, err := configz.New("kubeletconfig")
if err != nil {
klog.ErrorS(err, "Failed to register configz")
return err
}
if err := setConfigz(cz, kc); err != nil {
klog.ErrorS(err, "Failed to register config")
return err
}
return nil
}

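In short, this code registers the available API versions, converts kc to the kubeletconfigv1beta1.KubeletConfiguration type,
and stores it in the global map configs under the key "kubeletconfig", whose value is a *Config handle holding the converted kubeletconfigv1beta1.KubeletConfiguration.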
