上一次分析了api server的源码,这一次来分析 cloudcfg 的源码。
简介
cloudcfg 可以看做是 kubectl 的前身,负责与 API server 的交互,只存在于上古时代的 k8s 中,我们现在接触到的都是叫做 kubectl 的命令行工具了。该组件做的事情非常简单,就是将用户的命令行操作转化为对 API server 的 HTTP 请求。
命令行主体逻辑
从 cloudcfg 的命令行入口开始分析,命令行代码位于 cmd/apiserver/cloudcfg.go
// The flag package provides a default help printer via -h switch
var versionFlag *bool = flag.Bool("v", false, "Print the version number.")
var httpServer *string = flag.String("h", "", "The host to connect to.")
var config *string = flag.String("c", "", "Path to the config file.")
var labelQuery *string = flag.String("l", "", "Label query to use for listing")
var updatePeriod *time.Duration = flag.Duration("u", 60*time.Second, "Update interarrival in seconds")
var portSpec *string = flag.String("p", "", "The port spec, comma-separated list of <external>:<internal>,...")
var servicePort *int = flag.Int("s", -1, "If positive, create and run a corresponding service on this port, only used with 'run'")
var authConfig *string = flag.String("auth", os.Getenv("HOME")+"/.kubernetes_auth", "Path to the auth info file. If missing, prompt the user")
最开始定义了 cloudcfg 命令行工具提供的所有参数。
main 函数中先从命令行中获取 method, api server 的 host,资源对象的类型以及鉴权参数。
method := flag.Arg(0)
url := *httpServer + "/api/v1beta1" + flag.Arg(1)
auth, err := cloudcfg.LoadAuthInfo(*authConfig)
if err != nil {
log.Fatalf("Error loading auth: %#v", err)
}
再根据 method 判断是对资源的何种操作方式。
对资源的 CURD 是通过拼接 method 和 labelQuery 作为 api server 的请求 URL 去发送 HTTP 请求,没有其他多余的逻辑。
if method == "get" || method == "list" {
if len(*labelQuery) > 0 && method == "list" {
url = url + "?labels=" + *labelQuery
}
request, err = http.NewRequest("GET", url, nil)
} else if method == "delete" {
request, err = http.NewRequest("DELETE", url, nil)
} else if method == "create" {
request, err = cloudcfg.RequestWithBody(*config, url, "POST")
} else if method == "update" {
request, err = cloudcfg.RequestWithBody(*config, url, "PUT")
}
var body string
body, err = cloudcfg.DoRequest(request, auth.User, auth.Password)
if err != nil {
log.Fatalf("Error: %#v", err)
}
fmt.Println(body)
还有对 rollingupdate 的操作和 controller 的操作,后面分别展开分析
rollingupdate操作
判断 method 为 rollingupdate 后通过 client 执行 update
else if method == "rollingupdate" {
client := &kube_client.Client{
Host: *httpServer,
Auth: &auth,
}
cloudcfg.Update(flag.Arg(1), client, *updatePeriod)
}
具体操作是通过pkg/client/client.go
中实现的 client 进行操作, client 实现了以下接口
// ClientInterface holds the methods for clients of Kubenetes, an interface to allow mock testing
type ClientInterface interface {
ListTasks(labelQuery map[string]string) (api.TaskList, error)
GetTask(name string) (api.Task, error)
DeleteTask(name string) error
CreateTask(api.Task) (api.Task, error)
UpdateTask(api.Task) (api.Task, error)
GetReplicationController(name string) (api.ReplicationController, error)
CreateReplicationController(api.ReplicationController) (api.ReplicationController, error)
UpdateReplicationController(api.ReplicationController) (api.ReplicationController, error)
DeleteReplicationController(string) error
GetService(name string) (api.Service, error)
CreateService(api.Service) (api.Service, error)
UpdateService(api.Service) (api.Service, error)
DeleteService(string) error
}
可以看到就是对 tasks,RC 和 serveice 的操作。以 create tasks 为例
// CreateTask takes the representation of a task. Returns the server's representation of the task, and an error, if it occurs
func (client Client) CreateTask(task api.Task) (api.Task, error) {
var result api.Task
body, err := json.Marshal(task)
if err == nil {
_, err = client.rawRequest("POST", "tasks", bytes.NewBuffer(body), &result)
}
return result, err
}
可以看到同样是通过请求 api server 来完成操作的。
回到命令行的 main 函数,创建 client 后调用cloudcfg.Update(flag.Arg(1), client, *updatePeriod)
// 代码路径:pkg/cloudcfg/cloudcfg.go
// Perform a rolling update of a collection of tasks.
// 'name' points to a replication controller.
// 'client' is used for updating tasks.
// 'updatePeriod' is the time between task updates.
func Update(name string, client client.ClientInterface, updatePeriod time.Duration) error {
controller, err := client.GetReplicationController(name)
if err != nil {
return err
}
labels := controller.DesiredState.ReplicasInSet
taskList, err := client.ListTasks(labels)
if err != nil {
return err
}
for _, task := range taskList.Items {
_, err = client.UpdateTask(task)
if err != nil {
return err
}
time.Sleep(updatePeriod)
}
return nil
}
大致逻辑是通过 name 获取 RC 对象,再通过 RC 对象的期望状态获取 tasks label,再通过 task label 来 list 所有 task 后更新 task。不过笔者没太看懂这里的更新逻辑,看上去把遍历 task 的时候由原封不动传回去了,或许第一个版本还不支持滚动更新?待笔者后续完整深入看完所有组件逻辑再来补充。
controller 操作
判断 method 为 run 时,拿到 image,replicas, name 后执行 RunController
if method == "run" {
args := flag.Args()
if len(args) < 4 {
log.Fatal("usage: cloudcfg -h <host> run <image> <replicas> <name>")
}
image := args[1]
replicas, err := strconv.Atoi(args[2])
name := args[3]
if err != nil {
log.Fatalf("Error parsing replicas: %#v", err)
}
err = cloudcfg.RunController(image, name, replicas, kube_client.Client{Host: *httpServer, Auth: &auth}, *portSpec, *servicePort)
if err != nil {
log.Fatalf("Error: %#v", err)
}
return
}
进入到 cloudcfg.RunController 函数内部分析
controller := api.ReplicationController{
JSONBase: api.JSONBase{
ID: name,
},
DesiredState: api.ReplicationControllerState{
Replicas: replicas,
ReplicasInSet: map[string]string{
"name": name,
},
TaskTemplate: api.TaskTemplate{
DesiredState: api.TaskState{
Manifest: api.ContainerManifest{
Containers: []api.Container{
api.Container{
Image: image,
Ports: makePorts(portSpec),
},
},
},
},
Labels: map[string]string{
"name": name,
},
},
},
Labels: map[string]string{
"name": name,
},
}
controllerOut, err := client.CreateReplicationController(controller)
if err != nil {
return err
}
根据外部传入的参数构造 RC 对象,然后调用 client 的 CreateReplicationController 函数,本质还是向 API Server 发起请求
data, err := yaml.Marshal(controllerOut)
if err != nil {
return err
}
fmt.Print(string(data))
if servicePort > 0 {
svc, err := createService(name, servicePort, client)
if err != nil {
return err
}
data, err = yaml.Marshal(svc)
if err != nil {
return err
}
fmt.Printf(string(data))
}
再根据掺入的 servicePort 创建 service
func createService(name string, port int, client client.ClientInterface) (api.Service, error) {
svc := api.Service{
JSONBase: api.JSONBase{ID: name},
Port: port,
Labels: map[string]string{
"name": name,
},
}
svc, err := client.CreateService(svc)
return svc, err
}
同样是向 API server 发起请求。
回到 cloudcfg 命令行中,判断 method 为 stop 时执行 StopController
else if method == "stop" {
err = cloudcfg.StopController(flag.Arg(1), kube_client.Client{Host: *httpServer, Auth: &auth})
if err != nil {
log.Fatalf("Error: %#v", err)
}
return
}
分析 cloudcfg.StopController
// StopController stops a controller named 'name' by setting replicas to zero
func StopController(name string, client client.ClientInterface) error {
controller, err := client.GetReplicationController(name)
if err != nil {
return err
}
controller.DesiredState.Replicas = 0
controllerOut, err := client.UpdateReplicationController(controller)
if err != nil {
return err
}
data, err := yaml.Marshal(controllerOut)
if err != nil {
return err
}
fmt.Print(string(data))
return nil
}
可以看到将 RC 的 Replicas 置为 0 后发送给了 API server。
Controller 的操作还有一个 rm
lse if method == "rm" {
err = cloudcfg.DeleteController(flag.Arg(1), kube_client.Client{Host: *httpServer, Auth: &auth})
if err != nil {
log.Fatalf("Error: %#v", err)
}
return
}
cloudcfg.DeleteController 也很简单,获取当前 RC 的副本数,如果副本数不为 0 则报错退出,否则请求 API server 删除
// DeleteController deletes a replication controller named 'name', requires that the controller
// already be stopped
func DeleteController(name string, client client.ClientInterface) error {
controller, err := client.GetReplicationController(name)
if err != nil {
return err
}
if controller.DesiredState.Replicas != 0 {
return fmt.Errorf("controller has non-zero replicas (%d)", controller.DesiredState.Replicas)
}
return client.DeleteReplicationController(name)
}
我的博客即将同步至腾讯云开发者社区,邀请大家一同入驻:https://cloud.tencent.com/developer/support-plan?invite_code=2td5ld5xvow0o
本文作者: Ifan Tsai (菜菜)
本文链接: https://www.caiyifan.cn/p/a35ad04f.html
版权声明: 本文采用 知识共享署名-非商业性使用-相同方式共享 4.0 国际许可协议 进行许可。转载请注明出处!
未经允许不得转载:木盒主机 » k8s first commit 源码分析之 Cloudcfg