Kubelet源码分析(二): DockerClient – Docker.Ren

摘要

[db:摘要]

源码版本kubernetes version: v1.3.0DockerClient初始化DockerClient是KubeletConfig的成员之一。KubeletConfig结构介绍:type KubeletConfig struct {Address                        net.IPAllowPrivileged                bool...DockerClient                   dockertools.DockerInterfaceRuntimeCgroups                 stringDockerExecHandler              dockertools.ExecHandler...}而kubeletConfig的初始化是在UnsecuredKubeletConfig()接口中进行的,需要依赖最开始组建的kubeletServer配置结构,该kubeletServer结构中有DockerEndpoint字符串成员:type KubeletServer struct {componentconfig.KubeletConfigurationAuthPath      util.StringFlag // Deprecated -- use KubeConfig insteadKubeConfig    util.StringFlagAPIServerList []stringRunOnce bool// Insert a probability of random errors during calls to the master.ChaosChance float64// Crash immediately, rather than eating panics.ReallyCrashForTesting boolSystemReserved        config.ConfigurationMapKubeReserved          config.ConfigurationMap}type KubeletConfiguration struct {// config is the path to the config file or directory of filesConfig string `json:"config"`...DockerEndpoint string `json:"dockerEndpoint"`... 实际上如果没有指定该参数的话,会默认使用端点"unix:///var/run/docker.sock"做为DockerEndpoint。可以查看NewEnvClient()接口。回到kubeletConfig的初始化接口UnsecuredKubeletConfig():func UnsecuredKubeletConfig(s *options.KubeletServer) (*KubeletConfig, error) {hostNetworkSources, err := kubetypes.GetValidatedSources(strings.Split(s.HostNetworkSources, ","))if err != nil {    return nil, err}...return &KubeletConfig{    Address:                      net.ParseIP(s.Address),    AllowPrivileged:              s.AllowPrivileged,    ...    DockerClient:                 dockertools.ConnectToDockerOrDie(s.DockerEndpoint, s.RuntimeRequestTimeout.Duration), // TODO(random-liu): Set RuntimeRequestTimeout for rkt....}接着继续查看dockertools.ConnectToDockerOrDie(s.DockerEndpoint, s.RuntimeRequestTimeout.Duration)。func ConnectToDockerOrDie(dockerEndpoint string, requestTimeout time.Duration) DockerInterface {if dockerEndpoint == "fake://" {    return NewFakeDockerClient()}client, err := getDockerClient(dockerEndpoint)if err != nil {    glog.Fatalf("Couldn't connect to docker: %v", err)}glog.Infof("Start docker client with request timeout=%v", requestTimeout)return newKubeDockerClient(client, requestTimeout)}先前我们了解了如果在kubelet启动时没有传入"docker-endpoint"参数的话,s.DockerEndpoint即为空。s.RuntimeRequestTimeout.Duration值可以查看NewKubeletServer()函数的初始化,是2min。getDockerClient()接口比较简单:getDockerClient --> dockerapi.NewEnvClient() --> NewClient().NewClient()接口如下:func NewClient(host string, version string, client *http.Client, httpHeaders map[string]string) (*Client, error) {proto, addr, basePath, err := ParseHost(host)if err != nil {    return nil, err}transport, err := transport.NewTransportWithHTTP(proto, addr, client)if err != nil {    return nil, err}return &Client{    proto:             proto,    addr:              addr,    basePath:          basePath,    transport:         transport,    version:           version,    customHTTPHeaders: httpHeaders,}, nil}之前讲了如果没有传入"docker-endpoint"参数的话,默认值就是"unix:///var/run/docker.sock".即host参数为该值。ParseHost()先根据host进行解析,然后创建transport-->Client。Client结构如下:type Client struct {// proto holds the client protocol i.e. unix.proto string// addr holds the client address.addr string// basePath holds the path to prepend to the requests.basePath string// transport is the interface to send request with, it implements transport.Client.transport transport.Client// version of the server to talk to.version string// custom http headers configured by users.customHTTPHeaders map[string]string}创建Client成功之后,最终开始提到的ConnectToDockerOrDie()接口会调用newKubeDockerClient()生成pkg/kubelet/dockertools/kube_docker_client.go里的kubeDockerClient结构:type kubeDockerClient struct {// timeout is the timeout of short running docker operations.timeout time.Durationclient  *dockerapi.Client}初始化到这里就结束了,那我们回到最初,介绍下DockerClient定义:dockertools.DockerInterface如下:type DockerInterface interface {ListContainers(options dockertypes.ContainerListOptions) ([]dockertypes.Container, error)InspectContainer(id string) (*dockertypes.ContainerJSON, error)CreateContainer(dockertypes.ContainerCreateConfig) (*dockertypes.ContainerCreateResponse, error)StartContainer(id string) errorStopContainer(id string, timeout int) errorRemoveContainer(id string, opts dockertypes.ContainerRemoveOptions) errorInspectImage(image string) (*dockertypes.ImageInspect, error)ListImages(opts dockertypes.ImageListOptions) ([]dockertypes.Image, error)PullImage(image string, auth dockertypes.AuthConfig, opts dockertypes.ImagePullOptions) errorRemoveImage(image string, opts dockertypes.ImageRemoveOptions) ([]dockertypes.ImageDelete, error)ImageHistory(id string) ([]dockertypes.ImageHistory, error)Logs(string, dockertypes.ContainerLogsOptions, StreamOptions) errorVersion() (*dockertypes.Version, error)Info() (*dockertypes.Info, error)CreateExec(string, dockertypes.ExecConfig) (*dockertypes.ContainerExecCreateResponse, error)StartExec(string, dockertypes.ExecStartCheck, StreamOptions) errorInspectExec(id string) (*dockertypes.ContainerExecInspect, error)AttachToContainer(string, dockertypes.ContainerAttachOptions, StreamOptions) error}而我们最终初始化返回了结构体kubeDockerClient,所以DockerInterface接口的实现,我们可以回到kubeDockerClient结构体所在文件pkg/kubelet/dockertools/kube_docker_client.go查看接口实现。DockeClient接口分析源码目录: pkg/kubelet/dockertools/kube_docker_client.go实现的接口如下:可以看到kubeDockerClient结构体实现了所有的DockerInterface接口。这些接口其实是对docker的操作接口进行了封装,下面取一个接口进行分析:func (d *kubeDockerClient) ListContainers(options dockertypes.ContainerListOptions) ([]dockertypes.Container, error) {ctx, cancel := d.getTimeoutContext()defer cancel()containers, err := d.client.ContainerList(ctx, options)if ctxErr := contextError(ctx); ctxErr != nil {    return nil, ctxErr}if err != nil {    return nil, err}return containers, nil}该ListContainers()接口的关键就是调用了d.client.ContainerList(ctx, options).所以关键对象还是client,继续回到上面讲初始化时介绍到的Client结构体。Client结构所在文件: vendor/github.com/docker/engine-api/client/client.goClient package结构:操作docker API的接口都封装在这些文件中,有空可以深入了解下,这里就不一一介绍了,我们继续回到d.client.ContainerList(ctx, options),实现如下:func (cli *Client) ContainerList(ctx context.Context, options types.ContainerListOptions) ([]types.Container, error) {query := url.Values{}if options.All {    query.Set("all", "1")}if options.Limit != -1 {    query.Set("limit", strconv.Itoa(options.Limit))}if options.Since != "" {    query.Set("since", options.Since)}if options.Before != "" {    query.Set("before", options.Before)}if options.Size {    query.Set("size", "1")}if options.Filter.Len() > 0 {    filterJSON, err := filters.ToParamWithVersion(cli.version, options.Filter)    if err != nil {        return nil, err    }    query.Set("filters", filterJSON)}resp, err := cli.get(ctx, "/containers/json", query, nil)if err != nil {    return nil, err}var containers []types.Containererr = json.NewDecoder(resp.body).Decode(&containers)ensureReaderClosed(resp)return containers, err}前面都是一些参数初始化,其实就是构建一个GET请求,然后调用cli.get(),该get就是一个httpRequest:func (cli *Client) get(ctx context.Context, path string, query url.Values, headers map[string][]string) (*serverResponse, error) {return cli.sendRequest(ctx, "GET", path, query, nil, headers)}func (cli *Client) sendRequest(ctx context.Context, method, path string, query url.Values, obj interface{}, headers map[string][]string) (*serverResponse, error) {var body io.Readerif obj != nil {    var err error    body, err = encodeData(obj)    if err != nil {        return nil, err    }    if headers == nil {        headers = make(map[string][]string)    }    headers["Content-Type"] = []string{"application/json"}}return cli.sendClientRequest(ctx, method, path, query, body, headers)}func (cli *Client) sendClientRequest(ctx context.Context, method, path string, query url.Values, body io.Reader, headers map[string][]string) (*serverResponse, error) {serverResp := &serverResponse{    body:       nil,    statusCode: -1,}...req, err := cli.newRequest(method, path, query, body, headers)if cli.proto == "unix" || cli.proto == "npipe" {    // For local communications, it doesn't matter what the host is. We just    // need a valid and meaningful host name. (See #189)    req.Host = "docker"}req.URL.Host = cli.addrreq.URL.Scheme = cli.transport.Scheme()if expectedPayload && req.Header.Get("Content-Type") == "" {    req.Header.Set("Content-Type", "text/plain")}resp, err := cancellable.Do(ctx, cli.transport, req)if resp != nil {    serverResp.statusCode = resp.StatusCode}...if serverResp.statusCode < 200 || serverResp.statusCode >= 400 {    body, err := ioutil.ReadAll(resp.Body)    if err != nil {        return serverResp, err    }    if len(body) == 0 {        return serverResp, fmt.Errorf("Error: request returned %s for API route and version %s, check if the server supports the requested API version", http.StatusText(serverResp.statusCode), req.URL)    }    return serverResp, fmt.Errorf("Error response from daemon: %s", bytes.TrimSpace(body))}serverResp.body = resp.BodyserverResp.header = resp.Headerreturn serverResp, nil}func Do(ctx context.Context, client transport.Sender, req *http.Request) (*http.Response, error) {...result := make(chan responseAndError, 1)go func() {    resp, err := client.Do(req)    testHookDoReturned()    result <- responseAndError{resp, err}}()var resp *http.Responseselect {case <-ctx.Done():    testHookContextDoneBeforeHeaders()    cancel()    // Clean up after the goroutine calling client.Do:    go func() {        if r := <-result; r.resp != nil && r.resp.Body != nil {            testHookDidBodyClose()            r.resp.Body.Close()        }    }()    return nil, ctx.Err()case r := <-result:    var err error    resp, err = r.resp, r.err    if err != nil {        return resp, err    }}...return resp, nil}上面列出了httpRequest的整个调用过程,最终调用client.Do(),该client对象需要回到之前的初始化过程中去,实际就是调用vemdor/github.com/docker/engine-api/client/client.go中的Client.transport,而该对象初始化时设置为apiTransport对象:type apiTransport struct {*http.Client*tlsInfotransport *http.Transport}所以client.Do()实际就是调用http.Client.Do()。OK,到此算是分析结束,具体的各个接口实现,还是需要花时间查看源码,但也都是大同小异。学习源码的过程中,可以看到很多经典的实现,比如上面介绍的cancellable.Do()接口实现,golang非常推崇的"协程+channel"的方式,通过select case的方式循环等待协程处理的结果,确实很方便。

docker

发表评论

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen: