k8s first commit 源码分析之 API Server

穿越回 2014 年,分析下 k8s 第一个提交的源码。

获取 first commit 源码

git clone https://github.com/kubernetes/kubernetes.git
cd kubernetes
git checkout `git rev-list --max-parents=0 HEAD`

简介

api-server 是 k8s 的核心组件之一,用于接收 kubelet 的请求,并将请求信息保存到后端存储 etcd 中。核心功能是提供 k8s 各类资源对象的 CURD 等操作。

源码分析

从 api-server 的命令行入口开始分析,命令行代码位于 cmd/apiserver/apiserver.go

var (
	port                        = flag.Uint("port", 8080, "The port to listen on.  Default 8080.")
	address                     = flag.String("address", "127.0.0.1", "The address on the local server to listen to. Default 127.0.0.1")
	apiPrefix                   = flag.String("api_prefix", "/api/v1beta1", "The prefix for API requests on the server. Default '/api/v1beta1'")
	etcdServerList, machineList util.StringList
)

func init() {
	flag.Var(&etcdServerList, "etcd_servers", "Servers for the etcd (http://ip:port), comma separated")
	flag.Var(&machineList, "machines", "List of machines to schedule onto, comma separated.")
}

最开始定义了 api-server 启动所需要的相关参数,上古版本的 k8s 使用了标准库自带的 flag 库,其中 util.StringList实现了flag.Value接口。

type StringList []string

func (sl *StringList) String() string {
	return fmt.Sprint(*sl)
}

func (sl *StringList) Set(value string) error {
	for _, s := range strings.Split(value, ",") {
		if len(s) == 0 {
			return fmt.Errorf("value should not be an empty string")
		}
		*sl = append(*sl, s)
	}
	return nil
}

可以看到util.StringList用于将以逗号分割的字符串转为[]string类型。各个命令行参数含义如下:

  • port: api-server 监听的 port
  • address: api-server 监听的 ip
  • apiPrefix: 访问 api-server 的 URL 前缀
  • etcdServerList: 后端存储的 etcd 节点列表
  • machineList: 工作节点的列表

从 main 函数开始分析 api-server 的具体实现

var (
    taskRegistry       registry.TaskRegistry
    controllerRegistry registry.ControllerRegistry
    serviceRegistry    registry.ServiceRegistry
)

if len(etcdServerList) > 0 {
    log.Printf("Creating etcd client pointing to %v", etcdServerList)
    etcdClient := etcd.NewClient(etcdServerList)
    taskRegistry = registry.MakeEtcdRegistry(etcdClient, machineList)
    controllerRegistry = registry.MakeEtcdRegistry(etcdClient, machineList)
    serviceRegistry = registry.MakeEtcdRegistry(etcdClient, machineList)
} else {
    taskRegistry = registry.MakeMemoryRegistry()
    controllerRegistry = registry.MakeMemoryRegistry()
    serviceRegistry = registry.MakeMemoryRegistry()
}

registry 是对具体资源对象的后端存储的抽象,这里定义了三个 registry,并根据命令行参数判断是使用 etcd 还是内存作为存储后端。

// 代码路径:pkg/registry/interfaces.go
// TaskRegistry is an interface implemented by things that know how to store Task objects
type TaskRegistry interface {
	// ListTasks obtains a list of tasks that match query.
	// Query may be nil in which case all tasks are returned.
	ListTasks(query *map[string]string) ([]api.Task, error)
	// Get a specific task
	GetTask(taskId string) (*api.Task, error)
	// Create a task based on a specification, schedule it onto a specific machine.
	CreateTask(machine string, task api.Task) error
	// Update an existing task
	UpdateTask(task api.Task) error
	// Delete an existing task
	DeleteTask(taskId string) error
}

其中taskRegistry是对 task 的存储抽象,task 可以当作 pod 的前身看待,实现了对 task 的 list,get,create, update, delete 的操作。

// 代码路径:pkg/registry/interfaces.go
// ControllerRegistry is an interface for things that know how to store Controllers
type ControllerRegistry interface {
	ListControllers() ([]api.ReplicationController, error)
	GetController(controllerId string) (*api.ReplicationController, error)
	CreateController(controller api.ReplicationController) error
	UpdateController(controller api.ReplicationController) error
	DeleteController(controllerId string) error
}

ControllerRegistry是对 RC(Replication Controller) 的存储抽象,而我们现在使用的较多的是 RS(RepicateSet), RS 正是 RC 的升级,同样是实现了对 RC 的 list,get,create,update,delete 操作。

// 代码路径:pkg/registry/service_registry.go
type ServiceRegistry interface {
	ListServices() (ServiceList, error)
	CreateService(svc Service) error
	GetService(name string) (*Service, error)
	DeleteService(name string) error
	UpdateService(svc Service) error
	UpdateEndpoints(e Endpoints) error
}

ServiceRegistry是对 service 的存储抽象

containerInfo := &kube_client.HTTPContainerInfo{
    Client: http.DefaultClient,
    Port:   10250,
}

storage := map[string]apiserver.RESTStorage{
    "tasks":                  registry.MakeTaskRegistryStorage(taskRegistry, containerInfo, registry.MakeFirstFitScheduler(machineList, taskRegistry)),
    "replicationControllers": registry.MakeControllerRegistryStorage(controllerRegistry),
    "services":               registry.MakeServiceRegistryStorage(serviceRegistry),
}

storge 是对所有资源的 registry 的统一抽象,被定义为 REST 风格的资源操作接口。

// 代码路径: pkg/apiserver/api_server.go
// RESTStorage is a generic interface for RESTful storage services
type RESTStorage interface {
	List(*url.URL) (interface{}, error)
	Get(id string) (interface{}, error)
	Delete(id string) error
	Extract(body string) (interface{}, error)
	Create(interface{}) error
	Update(interface{}) error
}

实例化所有资源的 storage 后放在 map 中维护,用于后面 handler 的处理。

s := &http.Server{
    Addr:           fmt.Sprintf("%s:%d", *address, *port),
    Handler:        apiserver.New(storage, *apiPrefix),  // 使用 REST storage 创建请求的 handler
    ReadTimeout:    10 * time.Second,
    WriteTimeout:   10 * time.Second,
    MaxHeaderBytes: 1 << 20,
}
log.Fatal(s.ListenAndServe())

使用前面的 REST Storage map 和 api prefix 创建 handler,启动 HTTP 服务器等待接收请求。接下来转到 handler 分析源码

// 代码路径:pkg/apiserver/api_server.go
// New creates a new ApiServer object.
// 'storage' contains a map of handlers.
// 'prefix' is the hosting path prefix.
func New(storage map[string]RESTStorage, prefix string) *ApiServer {
	return &ApiServer{
		storage: storage,
		prefix:  prefix,
	}
}

// HTTP Handler interface
func (server *ApiServer) ServeHTTP(w http.ResponseWriter, req *http.Request) {
	log.Printf("%s %s", req.Method, req.RequestURI)
	url, err := url.ParseRequestURI(req.RequestURI)
	if err != nil {
		server.error(err, w)
		return
	}
	if url.Path == "/index.html" || url.Path == "/" || url.Path == "" {
		server.handleIndex(w)
		return
	}
	if !strings.HasPrefix(url.Path, server.prefix) {
		server.notFound(req, w)
		return
	}
	requestParts := strings.Split(url.Path[len(server.prefix):], "/")[1:]
	if len(requestParts) < 1 {
		server.notFound(req, w)
		return
	}
	storage := server.storage[requestParts[0]]
	if storage == nil {
		server.notFound(req, w)
		return
	} else {
		server.handleREST(requestParts, url, req, w, storage)
	}
}

Golang HTTP 的标准库是通过实现 Handler 接口的 ServeHTTP 函数来实现处理请求,通过代码可以看出先对请求的 URL 进行解析获取具体的资源对象,再通过 REST storage map 拿到对应资源对象的 REST storage,最后调用 server.handleREST来处理具体的请求。

// 代码路径:pkg/apiserver/api_server.go
func (server *ApiServer) handleREST(parts []string, url *url.URL, req *http.Request, w http.ResponseWriter, storage RESTStorage) {
	switch req.Method {
	case "GET":
		switch len(parts) {
		case 1:
			controllers, err := storage.List(url)
			if err != nil {
				server.error(err, w)
				return
			}
			server.write(200, controllers, w)
		case 2:
			task, err := storage.Get(parts[1])
			if err != nil {
				server.error(err, w)
				return
			}
			if task == nil {
				server.notFound(req, w)
				return
			}
			server.write(200, task, w)
		default:
			server.notFound(req, w)
		}
		return
	case "POST":
		if len(parts) != 1 {
			server.notFound(req, w)
			return
		}
		body, err := server.readBody(req)
		if err != nil {
			server.error(err, w)
			return
		}
		obj, err := storage.Extract(body)
		if err != nil {
			server.error(err, w)
			return
		}
		storage.Create(obj)
		server.write(200, obj, w)
		return
	case "DELETE":
		if len(parts) != 2 {
			server.notFound(req, w)
			return
		}
		err := storage.Delete(parts[1])
		if err != nil {
			server.error(err, w)
			return
		}
		server.write(200, Status{success: true}, w)
		return
	case "PUT":
		if len(parts) != 2 {
			server.notFound(req, w)
			return
		}
		body, err := server.readBody(req)
		if err != nil {
			server.error(err, w)
		}
		obj, err := storage.Extract(body)
		if err != nil {
			server.error(err, w)
			return
		}
		err = storage.Update(obj)
		if err != nil {
			server.error(err, w)
			return
		}
		server.write(200, obj, w)
		return
	default:
		server.notFound(req, w)
	}
}

可以很清晰的看出,这段逻辑是根据请求方法和请求参数对实际的资源对象进行特定的 REST 的操作。

回到 main 函数,在启动 HTTP server 之前还启动了一个 goroutine 做定时任务

endpoints := registry.MakeEndpointController(serviceRegistry, taskRegistry)
go util.Forever(func() { endpoints.SyncServiceEndpoints() }, time.Second*10)

其中 util.Forever就是周期性任务的封装

// 代码路径: pkg/util/util.go
// Loops forever running f every d.  Catches any panics, and keeps going.
func Forever(f func(), period time.Duration) {
	for {
		func() {
			defer HandleCrash()
			f()
		}()
		time.Sleep(period)
	}
}

任务实体endpoints.SyncServiceEndpoints逻辑如下

// 代码路径: pkg/registry/endpoint.go
func (e *EndpointController) SyncServiceEndpoints() error {
	services, err := e.serviceRegistry.ListServices()
	if err != nil {
		return err
	}
	var resultErr error
	for _, service := range services.Items {
		tasks, err := e.taskRegistry.ListTasks(&service.Labels)
		if err != nil {
			log.Printf("Error syncing service: %#v, skipping.", service)
			resultErr = err
			continue
		}
		endpoints := make([]string, len(tasks))
		for ix, task := range tasks {
			// TODO: Use port names in the service object, don't just use port #0
			endpoints[ix] = fmt.Sprintf("%s:%d", task.CurrentState.Host, task.DesiredState.Manifest.Containers[0].Ports[0].HostPort)
		}
		err = e.serviceRegistry.UpdateEndpoints(Endpoints{
			Name:      service.ID,
			Endpoints: endpoints,
		})
		if err != nil {
			log.Printf("Error updating endpoints: %#v", err)
			continue
		}
	}
	return resultErr
}

可以看到主要逻辑就是定时获取所有 service 列表,再遍历 service 列表查询 service 下所有 task,最后根据 task 的 endpoint 来更新 service 的 endpoints。这一段逻辑其实就是为 kubeproxy 做负载均衡用的,让 kubeproxy 知道需要代理的 endpoint 有哪些。这一块逻辑在现在的 k8s 架构中已经从 api-server 中移除了。

笔者只分析了 api-server 主体的逻辑,后续会分析具体 registry 的逻辑。

本文作者: Ifan Tsai  (菜菜)
本文链接: https://www.caiyifan.cn/p/c0ce3a49.html
版权声明: 本文采用 知识共享署名-非商业性使用-相同方式共享 4.0 国际许可协议 进行许可。转载请注明出处!

未经允许不得转载:木盒主机 » k8s first commit 源码分析之 API Server

赞 (0)

相关推荐

    暂无内容!