Kubernetes中自定义Controller,


大家好,我是乔克。

在Kubernetes中,Pod是最小的调度单元,它由各种各样的Controller管理,比如ReplicaSet Controller,Deployment Controller等。

Kubernetes内置了许多Controller,这些Controller能满足80%的业务需求,但是企业里也难免需要自定义Controller来适配自己的业务需求。

网上自定义Controller的文章很多,基本都差不多。俗话说:光说不练假把式,本篇文章主要是自己的一个实践归档总结,如果对你有帮助,可以一键三连!

本文主要从以下几个方面进行介绍,其中包括理论部分和具体实践部分。

Controller的实现逻辑

当我们向kube-apiserver提出创建一个Deployment需求的时候,首先是会把这个需求存储到Etcd中,如果这时候没有Controller的话,这条数据仅仅是存在Etcd中,并没有产生实际的作用。

所以就有了Deployment Controller,它实时监听kube-apiserver中的Deployment对象,如果对象有增加、删除、修改等变化,它就会做出相应的相应处理,如下

  1. // pkg/controller/deployment/deployment_controller.go 121行 
  2. ..... 
  3.     dInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ 
  4.   AddFunc:    dc.addDeployment, 
  5.   UpdateFunc: dc.updateDeployment, 
  6.   // This will enter the sync loop and no-op, because the deployment has been deleted from the store. 
  7.   DeleteFunc: dc.deleteDeployment, 
  8.  }) 
  9. ...... 

其实现的逻辑图如下(图片来自网络):

可以看到图的上半部分都由client-go实现了,下半部分才是我们具体需要去处理的。

client-go主要包含Reflector、Informer、Indexer三个组件。

  • Reflector会List&Watch kube-apiserver中的特定资源,然后会把变化的资源放入Delta FIFO队列中。
  • Informer会从Delta FIFO队列中拿取对象交给相应的HandleDeltas。
  • Indexer会将对象存储到缓存中。

上面部分不需要我们去开发,我们主要关注下半部分。

当把数据交给Informer的回调函数HandleDeltas后,Distribute会将资源对象分发到具体的处理函数,这些处理函数通过一系列判断过后,把满足需求的对象放入Workqueue中,然后再进行后续的处理。

code-generator介绍

上一节说到我们只需要去实现具体的业务需求,这是为什么呢?主要是因为kubernetes为我们提供了code-generator这样的代码生成器工具,可以通过它自动生成客户端访问的一些代码,比如Informer、ClientSet等。

code-generator提供了以下工具为Kubernetes中的资源生成代码:

  • deepcopy-gen:生成深度拷贝方法,为每个 T 类型生成 func (t* T) DeepCopy() *T 方法,API 类型都需要实现深拷贝
  • client-gen:为资源生成标准的 clientset
  • informer-gen:生成 informer,提供事件机制来响应资源的事件
  • lister-gen:生成 Lister**,**为 get 和 list 请求提供只读缓存层(通过 indexer 获取)

如果需要自动生成,就需要在代码中加入对应格式的配置,如

其中:

  • // +genclient表示需要创建client
  • // +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object表示在需要实现k8s.io/apimachinery/pkg/runtime.Object这个接口

除此还有更多的用法,可以参考Kubernetes Deep Dive: Code Generation for CustomResources进行学习。

CRD介绍

CRD全称CustomResourceDefinition,中文简称自定义资源,上面说的Controller主要就是用来管理自定义的资源。

我们可以通过下面命令来查看当前集群中使用了哪些CRD,如下:

  1. # kubectl get crd 
  2. NAME                                                 CREATED AT 
  3. ackalertrules.alert.alibabacloud.com                 2021-06-15T02:19:59Z 
  4. alertmanagers.monitoring.coreos.com                  2019-12-12T12:50:00Z 
  5. aliyunlogconfigs.log.alibabacloud.com                2019-12-02T10:15:02Z 
  6. apmservers.apm.k8s.elastic.co                        2020-09-14T01:52:53Z 
  7. batchreleases.alicloud.com                           2019-12-02T10:15:53Z 
  8. beats.beat.k8s.elastic.co                            2020-09-14T01:52:53Z 
  9. chaosblades.chaosblade.io                            2021-06-15T02:30:54Z 
  10. elasticsearches.elasticsearch.k8s.elastic.co         2020-09-14T01:52:53Z 
  11. enterprisesearches.enterprisesearch.k8s.elastic.co   2020-09-14T01:52:53Z 
  12. globaljobs.jobs.aliyun.com                           2020-04-26T14:40:53Z 
  13. kibanas.kibana.k8s.elastic.co                        2020-09-14T01:52:54Z 
  14. prometheuses.monitoring.coreos.com                   2019-12-12T12:50:01Z 
  15. prometheusrules.monitoring.coreos.com                2019-12-12T12:50:02Z 
  16. servicemonitors.monitoring.coreos.com                2019-12-12T12:50:03Z 

但是仅仅是创建一个CRD对象是不够的,因为它是静态的,创建过后仅仅是保存在Etcd中,如果需要其有意义,就需要Controller配合。

创建CRD的例子如下:

  1. apiVersion: apiextensions.k8s.io/v1 
  2. kind: CustomResourceDefinition 
  3. metadata: 
  4.   # name 必须匹配下面的spec字段:<plural>.<group> 
  5.   name: students.coolops.io 
  6. spec: 
  7.   # group 名用于 REST API 中的定义:/apis/<group>/<version> 
  8.   group: coolops.io 
  9.    # 列出自定义资源的所有 API 版本 
  10.   versions: 
  11.   - name: v1    # 版本名称,比如 v1、v1beta1 
  12.     served: true    # 是否开启通过 REST APIs 访问 `/apis/<group>/<version>/...` 
  13.     storage: true   # 必须将一个且只有一个版本标记为存储版本 
  14.     schema:         # 定义自定义对象的声明规范 
  15.       openAPIV3Schema: 
  16.         type: object 
  17.         properties: 
  18.           spec: 
  19.             type: object 
  20.             properties: 
  21.               name: 
  22.                 type: string 
  23.               school: 
  24.                 type: string 
  25.   scope: Namespaced    # 定义作用范围:Namespaced(命名空间级别)或者 Cluster(整个集群) 
  26.   names: 
  27.     plural: students   # plural 名字用于 REST API 中的定义:/apis/<group>/<version>/<plural> 
  28.     shortNames:        # shortNames 相当于缩写形式 
  29.     - stu 
  30.     kind: Student      # kind 是 sigular 的一个驼峰形式定义,在资源清单中会使用  
  31.     singular: student  # singular 名称用于 CLI 操作或显示的一个别名 

具体演示

本来准备根据官方的demo进行讲解,但是感觉有点敷衍,而且这类教程网上一大堆,所以就准备自己实现一个数据库管理的一个Controller。

因为是演示怎么开发Controller,所以功能不会复杂,主要的功能是:

  • 创建数据库实例
  • 删除数据库实例
  • 更新数据库实例

开发环境说明

本次实验环境如下:

创建CRD

CRD是基础,Controller主要是为CRD服务的,所以我们要先定义好CRD资源,便于开发。

  1. apiVersion: apiextensions.k8s.io/v1 
  2. kind: CustomResourceDefinition 
  3. metadata: 
  4.   name: databasemanagers.coolops.cn 
  5. spec: 
  6.   group: coolops.cn 
  7.   versions: 
  8.     - name: v1alpha1 
  9.       served: true 
  10.       storage: true 
  11.       schema: 
  12.         openAPIV3Schema: 
  13.           type: object 
  14.           properties: 
  15.             spec: 
  16.               type: object 
  17.               properties: 
  18.                 deploymentName: 
  19.                   type: strin 
  20.                 replicas: 
  21.                   type: integer 
  22.                   minimum: 1 
  23.                   maximum: 10 
  24.                 dbtype: 
  25.                   type: string 
  26.             status: 
  27.               type: object 
  28.               properties: 
  29.                 availableReplicas: 
  30.                   type: integer 
  31.   names: 
  32.     kind: DatabaseManager 
  33.     plural: databasemanagers 
  34.     singular: databasemanager 
  35.     shortNames: 
  36.       - dm 
  37.   scope: Namespaced 

创建CRD,检验是否能创建成功。

  1. # kubectl apply -f crd.yaml  
  2. customresourcedefinition.apiextensions.k8s.io/databasemanagers.coolops.cn created 
  3. # kubectl get crd | grep databasemanagers 
  4. databasemanagers.coolops.cn                           2021-11-22T02:31:29Z 

自定义一个测试用例,如下:

  1. apiVersion: coolops.cn/v1alpha1 
  2. kind: DatabaseManager 
  3. metadata: 
  4.   name: example-mysql 
  5. spec: 
  6.   dbtype: "mysql" 
  7.   deploymentName: "example-mysql" 
  8.   replicas: 1 

创建后进行查看:

  1. # kubectl apply -f example-mysql.yaml  
  2. databasemanager.coolops.cn/example-mysql created 
  3. # kubectl get dm 
  4. NAME            AGE 
  5. example-mysql   9s 

不过现在仅仅是创建了一个静态数据,并没有任何实际的应用,下面来编写Controller来管理这个CRD。

开发Controller

项目地址:https://gitee.com/coolops/database-manager-controller

自动生成代码

1、创建项目目录database-manager-controller,并进行go mod 初始化

  1. # mkdir database-manager-controller 
  2. # cd database-manager-controller 
  3. # go mod init 

2、创建源码包目录pkg/apis/databasemanager

  1. # mkdir pkg/apis/databasemanager -p 
  2. # cd pkg/apis/databasemanager 

3、在pkg/apis/databasemanager目录下创建register.go文件,并写入一下内容

  1. package databasemanager 
  2.  
  3. // GroupName is the group for database manager 
  4. const ( 
  5.  GroupName = "coolops.cn" 

4、在pkg/apis/databasemanager目录下创建v1alpha1目录,进行版本管理

  1. # mkdir v1alpha1 
  2. # cd v1alpha1 

5、在v1alpha1目录下创建doc.go文件,并写入以下内容

  1. // +k8s:deepcopy-gen=package 
  2. // +groupName=coolops.cn 
  3.  
  4. // Package v1alpha1 is the v1alpha1 version of the API 
  5. package v1alpha1 

其中// +k8s:deepcopy-gen=package和// +groupName=coolops.cn都是为了自动生成代码而写的配置。

6、在v1alpha1目录下创建type.go文件,并写入以下内容

  1. package v1alpha1 
  2.  
  3. import metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" 
  4.  
  5. // +genclient 
  6. // +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object 
  7.  
  8. type DatabaseManager struct { 
  9.  metav1.TypeMeta   `json:",inline"` 
  10.  metav1.ObjectMeta `json:"metadata,omitempty"` 
  11.  Spec              DatabaseManagerSpec   `json:"spec"` 
  12.  Status            DatabaseManagerStatus `json:"status"` 
  13.  
  14. // DatabaseManagerSpec 期望状态 
  15. type DatabaseManagerSpec struct { 
  16.  DeploymentName string `json:"deploymentName"` 
  17.  Replicas       *int32 `json:"replicas"` 
  18.  Dbtype         string `json:"dbtype"` 
  19.  
  20. // DatabaseManagerStatus 当前状态 
  21. type DatabaseManagerStatus struct { 
  22.  AvailableReplicas int32 `json:"availableReplicas"` 
  23.  
  24. // +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object 
  25.  
  26. // DatabaseManagerList is a list of DatabaseManagerList resources 
  27. type DatabaseManagerList struct { 
  28.  metav1.TypeMeta `json:",inline"` 
  29.  metav1.ListMeta `json:"metadata"` 
  30.  
  31.  Items []DatabaseManager `json:"items"` 

type.go主要定义我们的资源类型。

7、在v1alpha1目录下创建register.go文件,并写入以下内容

  1. package v1alpha1 
  2.  
  3. import ( 
  4.  dbcontroller "database-manager-controller/pkg/apis/databasemanager" 
  5.  metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" 
  6.  "k8s.io/apimachinery/pkg/runtime" 
  7.  "k8s.io/apimachinery/pkg/runtime/schema" 
  8.  
  9. // SchemeGroupVersion is group version used to register these objects 
  10. var SchemeGroupVersion = schema.GroupVersion{Group: dbcontroller.GroupName, Version: dbcontroller.Version} 
  11.  
  12. // Kind takes an unqualified kind and returns back a Group qualified GroupKind 
  13. func Kind(kind string) schema.GroupKind { 
  14.  return SchemeGroupVersion.WithKind(kind).GroupKind() 
  15.  
  16. // Resource takes an unqualified resource and returns a Group qualified GroupResource 
  17. func Resource(resource string) schema.GroupResource { 
  18.  return SchemeGroupVersion.WithResource(resource).GroupResource() 
  19.  
  20. var ( 
  21.  // SchemeBuilder initializes a scheme builder 
  22.  SchemeBuilder = runtime.NewSchemeBuilder(addKnownTypes) 
  23.  // AddToScheme is a global function that registers this API group & version to a scheme 
  24.  AddToScheme = SchemeBuilder.AddToScheme 
  25.  
  26. // Adds the list of known types to Scheme. 
  27. func addKnownTypes(scheme *runtime.Scheme) error { 
  28.  scheme.AddKnownTypes(SchemeGroupVersion, 
  29.   &DatabaseManager{}, 
  30.   &DatabaseManagerList{}, 
  31.  ) 
  32.  metav1.AddToGroupVersion(scheme, SchemeGroupVersion) 
  33.  return nil 

register.go的作用是通过addKnownTypes方法使得client可以知道DatabaseManager类型的API对象。

至此,自动生成代码的准备工作完成了,目前的代码目录结构如下:

  1. # tree . 
  2. ├── artifacts 
  3. │   └── database-manager 
  4. │       ├── crd.yaml 
  5. │       └── example-mysql.yaml 
  6. ├── go.mod 
  7. ├── go.sum 
  8. ├── LICENSE 
  9. ├── pkg 
  10. │   └── apis 
  11. │       └── databasemanager 
  12. │           ├── register.go 
  13. │           └── v1alpha1 
  14. │               ├── doc.go 
  15. │               ├── register.go 
  16. │               └── type.go 

接下里就使用code-generator进行代码自动生成了。

8、创建生成代码的脚本

以下代码主要参考sample-controller

(1)在项目根目录下,创建hack目录,代码生成的脚本配置在该目录下

  1. # mkdir hack && cd hack 

(2)创建tools.go文件,添加 code-generator 依赖

  1. //go:build tools 
  2. // +build tools 
  3.  
  4. // This package imports things required by build scripts, to force `go mod` to see them as dependencies 
  5. package tools 
  6.  
  7. import _ "k8s.io/code-generator" 

(3)创建update-codegen.sh文件,用来生成代码

  1. #!/usr/bin/env bash 
  2.  
  3. set -o errexit 
  4. set -o nounset 
  5. set -o pipefail 
  6.  
  7. SCRIPT_ROOT=$(dirname "${BASH_SOURCE[0]}")/.. 
  8. CODEGEN_PKG=${CODEGEN_PKG:-$(cd "${SCRIPT_ROOT}"; ls -d -1 ./vendor/k8s.io/code-generator 2>/dev/null || echo ../code-generator)} 
  9.  
  10. # generate the code with: 
  11. # --output-base    because this script should also be able to run inside the vendor dir of 
  12. #                  k8s.io/kubernetes. The output-base is needed for the generators to output into the vendor dir 
  13. #                  instead of the $GOPATH directly. For normal projects this can be dropped. 
  14. bash "${CODEGEN_PKG}"/generate-groups.sh "deepcopy,client,informer,lister" \ 
  15.   database-manager-controller/pkg/client  database-manager-controller/pkg/apis \ 
  16.   databasemanager:v1alpha1 \ 
  17.   --output-base "$(dirname "${BASH_SOURCE[0]}")/../.." \ 
  18.   --go-header-file "${SCRIPT_ROOT}"/hack/boilerplate.go.txt 
  19.  
  20. # To use your own boilerplate text append: 
  21. #   --go-header-file "${SCRIPT_ROOT}"/hack/custom-boilerplate.go.txt 

其中以下代码段根据实际情况进行修改。

  1. bash "${CODEGEN_PKG}"/generate-groups.sh "deepcopy,client,informer,lister" \ 
  2.   database-manager-controller/pkg/client  database-manager-controller/pkg/apis \ 
  3.   databasemanager:v1alpha1 \ 
  4.   --output-base "$(dirname "${BASH_SOURCE[0]}")/../.." \ 
  5.   --go-header-file "${SCRIPT_ROOT}"/hack/boilerplate.go.txt 

(4)创建verify-codegen.sh文件,主要用于校验生成的代码是否为最新的

  1. #!/usr/bin/env bash 
  2.  
  3. set -o errexit 
  4. set -o nounset 
  5. set -o pipefail 
  6.  
  7. SCRIPT_ROOT=$(dirname "${BASH_SOURCE[0]}")/.. 
  8.  
  9. DIFFROOT="${SCRIPT_ROOT}/pkg" 
  10. TMP_DIFFROOT="${SCRIPT_ROOT}/_tmp/pkg" 
  11. _tmp="${SCRIPT_ROOT}/_tmp" 
  12.  
  13. cleanup() { 
  14.   rm -rf "${_tmp}" 
  15. trap "cleanup" EXIT SIGINT 
  16.  
  17. cleanup 
  18.  
  19. mkdir -p "${TMP_DIFFROOT}" 
  20. cp -a "${DIFFROOT}"/* "${TMP_DIFFROOT}" 
  21.  
  22. "${SCRIPT_ROOT}/hack/update-codegen.sh" 
  23. echo "diffing ${DIFFROOT} against freshly generated codegen" 
  24. ret=0 
  25. diff -Naupr "${DIFFROOT}" "${TMP_DIFFROOT}" || ret=$? 
  26. cp -a "${TMP_DIFFROOT}"/* "${DIFFROOT}" 
  27. if [[ $ret -eq 0 ]] 
  28. then 
  29.   echo "${DIFFROOT} up to date." 
  30. else 
  31.   echo "${DIFFROOT} is out of date. Please run hack/update-codegen.sh" 
  32.   exit 1 
  33. fi 

(5)创建boilerplate.go.txt,主要用于为代码添加开源协议

  1. /* 
  2. Copyright The Kubernetes Authors. 
  3.  
  4. Licensed under the Apache License, Version 2.0 (the "License"); 
  5. you may not use this file except in compliance with the License. 
  6. You may obtain a copy of the License at 
  7.  
  8.     http://www.apache.org/licenses/LICENSE-2.0 
  9.  
  10. Unless required by applicable law or agreed to in writing, software 
  11. distributed under the License is distributed on an "AS IS" BASIS, 
  12. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
  13. See the License for the specific language governing permissions and 
  14. limitations under the License. 
  15. */ 

(6)配置go vendor依赖目录

从update-codegen.sh脚本可以看到该代码生成脚本是利用vendor目录下的依赖进行的,我们项目本身没有配置,执行以下命令进行创建。

  1. # go mod vendor 

(7)在项目根目录下执行脚本生成代码

  1. # chmod +x hack/update-codegen.sh 
  2. # ./hack/update-codegen.sh  
  3. Generating deepcopy funcs 
  4. Generating clientset for databasemanager:v1alpha1 at database-manager-controller/pkg/client/clientset 
  5. Generating listers for databasemanager:v1alpha1 at database-manager-controller/pkg/client/listers 
  6. Generating informers for databasemanager:v1alpha1 at database-manager-controller/pkg/client/informers 

然后新的目录结构如下:

  1. # tree pkg/ 
  2. pkg/ 
  3. ├── apis 
  4. │   └── databasemanager 
  5. │       ├── register.go 
  6. │       └── v1alpha1 
  7. │           ├── doc.go 
  8. │           ├── register.go 
  9. │           ├── type.go 
  10. │           └── zz_generated.deepcopy.go 
  11. └── client 
  12.     ├── clientset 
  13.     │   └── versioned 
  14.     │       ├── clientset.go 
  15.     │       ├── doc.go 
  16.     │       ├── fake 
  17.     │       │   ├── clientset_generated.go 
  18.     │       │   ├── doc.go 
  19.     │       │   └── register.go 
  20.     │       ├── scheme 
  21.     │       │   ├── doc.go 
  22.     │       │   └── register.go 
  23.     │       └── typed 
  24.     │           └── databasemanager 
  25.     │               └── v1alpha1 
  26.     │                   ├── databasemanager_client.go 
  27.     │                   ├── databasemanager.go 
  28.     │                   ├── doc.go 
  29.     │                   ├── fake 
  30.     │                   │   ├── doc.go 
  31.     │                   │   ├── fake_databasemanager_client.go 
  32.     │                   │   └── fake_databasemanager.go 
  33.     │                   └── generated_expansion.go 
  34.     ├── informers 
  35.     │   └── externalversions 
  36.     │       ├── databasemanager 
  37.     │       │   ├── interface.go 
  38.     │       │   └── v1alpha1 
  39.     │       │       ├── databasemanager.go 
  40.     │       │       └── interface.go 
  41.     │       ├── factory.go 
  42.     │       ├── generic.go 
  43.     │       └── internalinterfaces 
  44.     │           └── factory_interfaces.go 
  45.     └── listers 
  46.         └── databasemanager 
  47.             └── v1alpha1 
  48.                 ├── databasemanager.go 
  49.                 └── expansion_generated.go 

Controller开发

上面已经完成了自动代码的生成,生成了informer、lister、clientset的代码,下面就开始编写真正的Controller功能了。

我们需要实现的功能是:

  • 创建数据库实例
  • 更新数据库实例
  • 删除数据库实例

(1)在代码根目录创建controller.go文件,编写如下内容

  1. package main 
  2.  
  3. import ( 
  4.  "context" 
  5.  dbmanagerv1 "database-manager-controller/pkg/apis/databasemanager/v1alpha1" 
  6.  clientset "database-manager-controller/pkg/client/clientset/versioned" 
  7.  dbmanagerscheme "database-manager-controller/pkg/client/clientset/versioned/scheme" 
  8.  informers "database-manager-controller/pkg/client/informers/externalversions/databasemanager/v1alpha1" 
  9.  listers "database-manager-controller/pkg/client/listers/databasemanager/v1alpha1" 
  10.  "fmt" 
  11.  "github.com/golang/glog" 
  12.  appsv1 "k8s.io/api/apps/v1" 
  13.  corev1 "k8s.io/api/core/v1" 
  14.  "k8s.io/apimachinery/pkg/api/errors" 
  15.  metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" 
  16.  "k8s.io/apimachinery/pkg/util/runtime" 
  17.  utilruntime "k8s.io/apimachinery/pkg/util/runtime" 
  18.  "k8s.io/apimachinery/pkg/util/wait" 
  19.  appsinformers "k8s.io/client-go/informers/apps/v1" 
  20.  "k8s.io/client-go/kubernetes" 
  21.  "k8s.io/client-go/kubernetes/scheme" 
  22.  typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1" 
  23.  appslisters "k8s.io/client-go/listers/apps/v1" 
  24.  "k8s.io/client-go/tools/cache" 
  25.  "k8s.io/client-go/tools/record" 
  26.  "k8s.io/client-go/util/workqueue" 
  27.  "k8s.io/klog/v2" 
  28.  "time" 
  29.  
  30. const controllerAgentName = "database-manager-controller" 
  31.  
  32. const ( 
  33.  // SuccessSynced 用来表示事件被成功同步 
  34.  SuccessSynced = "Synced" 
  35.  // MessageResourceSynced 表示事件被触发时的消息信息 
  36.  MessageResourceSynced = "database manager synced successfully" 
  37.  MessageResourceExists = "Resource %q already exists and is not managed by DatabaseManager" 
  38.  ErrResourceExists     = "ErrResourceExists" 
  39.  
  40. type Controller struct { 
  41.  // kubeclientset 是kubernetes的clientset 
  42.  kubeclientset kubernetes.Interface 
  43.  // dbmanagerclientset 是自己定义的API Group的clientset 
  44.  dbmanagerclientset clientset.Interface 
  45.  
  46.  // deploymentsLister list deployment 对象 
  47.  deploymentsLister appslisters.DeploymentLister 
  48.  // deploymentsSynced 同步deployment对象 
  49.  deploymentsSynced cache.InformerSynced 
  50.  
  51.  // dbmanagerLister list databasemanager 对象 
  52.  dbmanagerLister listers.DatabaseManagerLister 
  53.  // dbmanagerSynced 同步DatabaseManager对象 
  54.  dbmanagerSynced cache.InformerSynced 
  55.  
  56.  // workqueue 限速的队列 
  57.  workqueue workqueue.RateLimitingInterface 
  58.  // recorder 事件记录器 
  59.  recorder record.EventRecorder 
  60.  
  61. // NewController 初始化Controller 
  62. func NewController(kubeclientset kubernetes.Interface, dbmanagerclientset clientset.Interface, 
  63.  dbmanagerinformer informers.DatabaseManagerInformer, deploymentInformer appsinformers.DeploymentInformer) *Controller { 
  64.  
  65.  utilruntime.Must(dbmanagerscheme.AddToScheme(scheme.Scheme)) 
  66.  glog.V(4).Info("Create event broadcaster") 
  67.  // 创建eventBroadcaster 
  68.  eventBroadcaster := record.NewBroadcaster() 
  69.  // 保存events到日志 
  70.  eventBroadcaster.StartLogging(glog.Infof) 
  71.  // 上报events到APIServer 
  72.  eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: kubeclientset.CoreV1().Events("")}) 
  73.  recorder := eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: controllerAgentName}) 
  74.  
  75.  // 初始化Controller 
  76.  controller := &Controller{ 
  77.   kubeclientset:      kubeclientset, 
  78.   dbmanagerclientset: dbmanagerclientset, 
  79.   deploymentsLister:  deploymentInformer.Lister(), 
  80.   deploymentsSynced:  deploymentInformer.Informer().HasSynced, 
  81.   dbmanagerLister:    dbmanagerinformer.Lister(), 
  82.   dbmanagerSynced:    dbmanagerinformer.Informer().HasSynced, 
  83.   workqueue:          workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "DatabaseManagers"), 
  84.   recorder:           recorder, 
  85.  } 
  86.  
  87.  glog.Info("Start up event handlers") 
  88.  
  89.  // 注册Event Handler,分别对于添加、更新、删除事件,具体的操作由事件对应的API将其加入队列中 
  90.  dbmanagerinformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ 
  91.   AddFunc: controller.enqueueDatabaseManager, 
  92.   UpdateFunc: func(oldObj, newObj interface{}) { 
  93.    oldDBManager := oldObj.(*dbmanagerv1.DatabaseManager) 
  94.    newDBManager := newObj.(*dbmanagerv1.DatabaseManager) 
  95.    if oldDBManager.ResourceVersion == newDBManager.ResourceVersion { 
  96.     return 
  97.    } 
  98.    controller.enqueueDatabaseManager(newObj) 
  99.   }, 
  100.   DeleteFunc: controller.enqueueDatabaseManagerForDelete, 
  101.  }) 
  102.  
  103.  // 注册Deployment Event Handler 
  104.  deploymentInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ 
  105.   AddFunc: controller.handleObject, 
  106.   UpdateFunc: func(old, new interface{}) { 
  107.    newDepl := new.(*appsv1.Deployment) 
  108.    oldDepl := old.(*appsv1.Deployment) 
  109.    if newDepl.ResourceVersion == oldDepl.ResourceVersion { 
  110.     // 如果没有改变,就返回 
  111.     return 
  112.    } 
  113.    controller.handleObject(new) 
  114.   }, 
  115.   DeleteFunc: controller.handleObject, 
  116.  }) 
  117.  
  118.  return controller 
  119.  
  120. // Run 启动入口 
  121. func (c *Controller) Run(threadiness int, stopCh <-chan struct{}) error { 
  122.  defer utilruntime.HandleCrash() 
  123.  defer c.workqueue.ShuttingDown() 
  124.  
  125.  glog.Info("start controller, cache sync") 
  126.  // 同步缓存数据 
  127.  if ok := cache.WaitForCacheSync(stopCh, c.dbmanagerSynced); !ok { 
  128.   return fmt.Errorf("failed to wait for caches to sync") 
  129.  } 
  130.  
  131.  glog.Info("begin start worker thread") 
  132.  // 开启work线程 
  133.  for i := 0; i < threadiness; i++ { 
  134.   go wait.Until(c.runWorker, time.Second, stopCh) 
  135.  } 
  136.  
  137.  glog.Info("worker thread started!!!!!!") 
  138.  <-stopCh 
  139.  glog.Info("worker thread stopped!!!!!!") 
  140.  return nil 
  141.  
  142. // runWorker 是一个死循环,会一直调用processNextWorkItem从workqueue中取出数据 
  143. func (c *Controller) runWorker() { 
  144.  for c.processNextWorkItem() { 
  145.  
  146.  } 
  147.  
  148. // processNextWorkItem 从workqueue中取出数据进行处理 
  149. func (c *Controller) processNextWorkItem() bool { 
  150.  obj, shutdown := c.workqueue.Get() 
  151.  
  152.  if shutdown { 
  153.   return false 
  154.  } 
  155.  
  156.  // We wrap this block in a func so we can defer c.workqueue.Done. 
  157.  err := func(obj interface{}) error { 
  158.   defer c.workqueue.Done(obj) 
  159.   var key string 
  160.   var ok bool 
  161.  
  162.   if key, ok = obj.(string); !ok { 
  163.    c.workqueue.Forget(obj) 
  164.    runtime.HandleError(fmt.Errorf("expected string in workqueue but got %#v", obj)) 
  165.    return nil 
  166.   } 
  167.   // 在syncHandler中处理业务 
  168.   if err := c.syncHandler(key); err != nil { 
  169.    return fmt.Errorf("error syncing '%s': %s", key, err.Error()) 
  170.   } 
  171.  
  172.   c.workqueue.Forget(obj) 
  173.   glog.Infof("Successfully synced '%s'", key) 
  174.   return nil 
  175.  }(obj) 
  176.  
  177.  if err != nil { 
  178.   runtime.HandleError(err) 
  179.   return true 
  180.  } 
  181.  
  182.  return true 
  183.  
  184. // syncHandler 处理业务Handler 
  185. func (c *Controller) syncHandler(key string) error { 
  186.  // 通过split得到namespace和name 
  187.  namespace, name, err := cache.SplitMetaNamespaceKey(key) 
  188.  if err != nil { 
  189.   runtime.HandleError(fmt.Errorf("invalid resource key: %s", key)) 
  190.   return nil 
  191.  } 
  192.  
  193.  // 从缓存中取对象 
  194.  dbManager, err := c.dbmanagerLister.DatabaseManagers(namespace).Get(name) 
  195.  if err != nil { 
  196.   // 如果DatabaseManager对象被删除了,就会走到这里 
  197.   if errors.IsNotFound(err) { 
  198.    glog.Infof("DatabaseManager对象被删除,请在这里执行实际的删除业务: %s/%s ...", namespace, name) 
  199.    return nil 
  200.   } 
  201.  
  202.   runtime.HandleError(fmt.Errorf("failed to list DatabaseManager by: %s/%s", namespace, name)) 
  203.  
  204.   return err 
  205.  } 
  206.  
  207.  glog.Infof("这里是databasemanager对象的期望状态: %#v ...", dbManager) 
  208.  
  209.  // 获取是否有deploymentName 
  210.  deploymentName := dbManager.Spec.DeploymentName 
  211.  
  212.  if deploymentName == "" { 
  213.   utilruntime.HandleError(fmt.Errorf("%s: deploymentName 不能为空", key)) 
  214.   return nil 
  215.  } 
  216.  // 判断deployment是否在集群中存在 
  217.  deployment, err := c.deploymentsLister.Deployments(dbManager.Namespace).Get(deploymentName) 
  218.  if errors.IsNotFound(err) { 
  219.   // 如果没有找到,就创建 
  220.   deployment, err = c.kubeclientset.AppsV1().Deployments(dbManager.Namespace).Create( 
  221.    context.TODO(), newDeployment(dbManager), metav1.CreateOptions{}) 
  222.  } 
  223.  
  224.  // 如果Create 或者 Get 都出错,则返回 
  225.  if err != nil { 
  226.   return err 
  227.  } 
  228.  
  229.  // 如果这个deployment不是由DatabaseManager控制,应该报告这个事件 
  230.  if !metav1.IsControlledBy(deployment, dbManager) { 
  231.   msg := fmt.Sprintf(MessageResourceExists, deployment.Name) 
  232.   c.recorder.Event(dbManager, corev1.EventTypeWarning, ErrResourceExists, msg) 
  233.   return fmt.Errorf("%s", msg) 
  234.  } 
  235.  
  236.  // 如果replicas和期望的不等,则更新deployment 
  237.  if dbManager.Spec.Replicas != nil && *dbManager.Spec.Replicas != *deployment.Spec.Replicas { 
  238.   klog.V(4).Infof("DatabaseManager %s replicas: %d, deployment replicas: %d", name, *dbManager.Spec.Replicas, *deployment.Spec.Replicas) 
  239.   deployment, err = c.kubeclientset.AppsV1().Deployments(dbManager.Namespace).Update(context.TODO(), newDeployment(dbManager), metav1.UpdateOptions{}) 
  240.  } 
  241.  
  242.  if err != nil { 
  243.   return err 
  244.  } 
  245.  
  246.  // 更新状态 
  247.  err = c.updateDatabaseManagerStatus(dbManager, deployment) 
  248.  if err != nil { 
  249.   return err 
  250.  } 
  251.  
  252.  glog.Infof("实际状态是从业务层面得到的,此处应该去的实际状态,与期望状态做对比,并根据差异做出响应(新增或者删除)") 
  253.  
  254.  c.recorder.Event(dbManager, corev1.EventTypeNormal, SuccessSynced, MessageResourceSynced) 
  255.  return nil 
  256.  
  257. // updateDatabaseManagerStatus 更新DatabaseManager状态 
  258. func (c *Controller) updateDatabaseManagerStatus(dbmanager *dbmanagerv1.DatabaseManager, deployment *appsv1.Deployment) error { 
  259.  dbmanagerCopy := dbmanager.DeepCopy() 
  260.  dbmanagerCopy.Status.AvailableReplicas = deployment.Status.AvailableReplicas 
  261.  _, err := c.dbmanagerclientset.CoolopsV1alpha1().DatabaseManagers(dbmanager.Namespace).Update(context.TODO(), dbmanagerCopy, metav1.UpdateOptions{}) 
  262.  return err 
  263.  
  264. func (c *Controller) handleObject(obj interface{}) { 
  265.  var object metav1.Object 
  266.  var ok bool 
  267.  if object, ok = obj.(metav1.Object); !ok { 
  268.   tombstone, ok := obj.(cache.DeletedFinalStateUnknown) 
  269.   if !ok { 
  270.    utilruntime.HandleError(fmt.Errorf("error decoding object, invalid type")) 
  271.    return 
  272.   } 
  273.   object, ok = tombstone.Obj.(metav1.Object) 
  274.   if !ok { 
  275.    utilruntime.HandleError(fmt.Errorf("error decoding object tombstone, invalid type")) 
  276.    return 
  277.   } 
  278.   klog.V(4).Infof("Recovered deleted object '%s' from tombstone", object.GetName()) 
  279.  } 
  280.  klog.V(4).Infof("Processing object: %s", object.GetName()) 
  281.  if ownerRef := metav1.GetControllerOf(object); ownerRef != nil { 
  282.   // 检查对象是否和DatabaseManager对象关联,如果不是就退出 
  283.   if ownerRef.Kind != "DatabaseManager" { 
  284.    return 
  285.   } 
  286.  
  287.   dbmanage, err := c.dbmanagerLister.DatabaseManagers(object.GetNamespace()).Get(ownerRef.Name) 
  288.   if err != nil { 
  289.    klog.V(4).Infof("ignoring orphaned object '%s' of databaseManager '%s'", object.GetSelfLink(), ownerRef.Name) 
  290.    return 
  291.   } 
  292.  
  293.   c.enqueueDatabaseManager(dbmanage) 
  294.   return 
  295.  } 
  296.  
  297. func newDeployment(dbmanager *dbmanagerv1.DatabaseManager) *appsv1.Deployment { 
  298.  var image string 
  299.  var name string 
  300.  switch dbmanager.Spec.Dbtype { 
  301.  case "mysql": 
  302.   image = "mysql:5.7" 
  303.   name = "mysql" 
  304.  case "mariadb": 
  305.   image = "mariadb:10.7.1" 
  306.   name = "mariadb" 
  307.  default: 
  308.   image = "mysql:5.7" 
  309.   name = "mysql" 
  310.  } 
  311.  
  312.  labels := map[string]string{ 
  313.   "app": dbmanager.Spec.Dbtype, 
  314.  } 
  315.  return &appsv1.Deployment{ 
  316.   ObjectMeta: metav1.ObjectMeta{ 
  317.    Namespace: dbmanager.Namespace, 
  318.    Name:      dbmanager.Name, 
  319.    OwnerReferences: []metav1.OwnerReference{ 
  320.     *metav1.NewControllerRef(dbmanager, dbmanagerv1.SchemeGroupVersion.WithKind("DatabaseManager")), 
  321.    }, 
  322.   }, 
  323.   Spec: appsv1.DeploymentSpec{ 
  324.    Replicas: dbmanager.Spec.Replicas, 
  325.    Selector: &metav1.LabelSelector{MatchLabels: labels}, 
  326.    Template: corev1.PodTemplateSpec{ 
  327.     ObjectMeta: metav1.ObjectMeta{Labels: labels}, 
  328.     Spec: corev1.PodSpec{ 
  329.      Containers: []corev1.Container{ 
  330.       { 
  331.        Name:  name, 
  332.        Image: image, 
  333.       }, 
  334.      }, 
  335.     }, 
  336.    }, 
  337.   }, 
  338.  } 
  339.  
  340. // 数据先放入缓存,再入队列 
  341. func (c *Controller) enqueueDatabaseManager(obj interface{}) { 
  342.  var key string 
  343.  var err error 
  344.  // 将对象放入缓存 
  345.  if key, err = cache.MetaNamespaceKeyFunc(obj); err != nil { 
  346.   runtime.HandleError(err) 
  347.   return 
  348.  } 
  349.  
  350.  // 将key放入队列 
  351.  c.workqueue.AddRateLimited(key) 
  352.  
  353. // 删除操作 
  354. func (c *Controller) enqueueDatabaseManagerForDelete(obj interface{}) { 
  355.  var key string 
  356.  var err error 
  357.  // 从缓存中删除指定对象 
  358.  key, err = cache.DeletionHandlingMetaNamespaceKeyFunc(obj) 
  359.  if err != nil { 
  360.   runtime.HandleError(err) 
  361.   return 
  362.  } 
  363.  //再将key放入队列 
  364.  c.workqueue.AddRateLimited(key) 

其主要逻辑和文章开头介绍的Controller实现逻辑一样,其中关键点在于:

  • 在NewController方法中,定义了DatabaseManager和Deployment对象的Event Handler,除了同步缓存外,还将对应的Key放入queue中。
  • 实际处理业务的方法是syncHandler,可以根据实际请求来编写代码以达到业务需求。

2、在项目根目录下创建main.go,编写入口函数

(1)编写处理系统信号量的Handler

这部分直接使用的demo中的代码

(2)编写入口main函数

  1. package main 
  2.  
  3. import ( 
  4.  "flag" 
  5.  "time" 
  6.  
  7.  kubeinformers "k8s.io/client-go/informers" 
  8.  "k8s.io/client-go/kubernetes" 
  9.  "k8s.io/client-go/tools/clientcmd" 
  10.  "k8s.io/klog/v2" 
  11.  
  12.  clientset "database-manager-controller/pkg/client/clientset/versioned" 
  13.  informers "database-manager-controller/pkg/client/informers/externalversions" 
  14.  "database-manager-controller/pkg/signals" 
  15.  
  16. var ( 
  17.  masterURL  string 
  18.  kubeconfig string 
  19.  
  20. func main() { 
  21.  // klog.InitFlags(nil) 
  22.  flag.Parse() 
  23.  
  24.  // 设置处理系统信号的Channel 
  25.  stopCh := signals.SetupSignalHandler() 
  26.  
  27.  // 处理入参 
  28.  cfg, err := clientcmd.BuildConfigFromFlags(masterURL, kubeconfig) 
  29.  if err != nil { 
  30.   klog.Fatalf("Error building kubeconfig: %s", err.Error()) 
  31.  } 
  32.  
  33.  // 初始化kubeClient 
  34.  kubeClient, err := kubernetes.NewForConfig(cfg) 
  35.  if err != nil { 
  36.   klog.Fatalf("Error building kubernetes clientset: %s", err.Error()) 
  37.  } 
  38.  
  39.  // 初始化dbmanagerClient 
  40.  dbmanagerClient, err := clientset.NewForConfig(cfg) 
  41.  if err != nil { 
  42.   klog.Fatalf("Error building example clientset: %s", err.Error()) 
  43.  } 
  44.  
  45.  kubeInformerFactory := kubeinformers.NewSharedInformerFactory(kubeClient, time.Second*30) 
  46.  dbmanagerInformerFactory := informers.NewSharedInformerFactory(dbmanagerClient, time.Second*30) 
  47.  
  48.  // 初始化controller 
  49.  controller := NewController(kubeClient, dbmanagerClient, 
  50.   dbmanagerInformerFactory.Coolops().V1alpha1().DatabaseManagers(), kubeInformerFactory.Apps().V1().Deployments()) 
  51.  
  52.  // notice that there is no need to run Start methods in a separate goroutine. (i.e. go kubeInformerFactory.Start(stopCh) 
  53.  // Start method is non-blocking and runs all registered informers in a dedicated goroutine. 
  54.  kubeInformerFactory.Start(stopCh) 
  55.  dbmanagerInformerFactory.Start(stopCh) 
  56.  
  57.  if err = controller.Run(2, stopCh); err != nil { 
  58.   klog.Fatalf("Error running controller: %s", err.Error()) 
  59.  } 
  60.  
  61. func init() { 
  62.  flag.StringVar(&kubeconfig, "kubeconfig", "", "Path to a kubeconfig. Only required if out-of-cluster.") 
  63.  flag.StringVar(&masterURL, "master", "", "The address of the Kubernetes API server. Overrides any value in kubeconfig. Only required if out-of-cluster.") 

测试Controller

1、在项目目录下添加一个Makefile

  1. build: 
  2.  echo "build database manager controller" 
  3.  CGO_ENABLED=0 GOOS=linux GOARCH=amd64 go build . 

2、执行make build进行编译

  1. # make build 
  2. echo "build database manager controller" 
  3. build database manager controller 
  4. CGO_ENABLED=0 GOOS=linux GOARCH=amd64 go build . 

然后会输出database-manager-controller一个二进制文件。

3、运行controller

  1. # chmod +x database-manager-controller 
  2. # ./database-manager-controller -kubeconfig=$HOME/.kube/config -alsologtostderr=true 
  3. I1123 09:52:41.595726   29173 controller.go:81] Start up event handlers 
  4. I1123 09:52:41.597448   29173 controller.go:120] start controller, cache sync 
  5. I1123 09:52:41.699716   29173 controller.go:125] begin start worker thread 
  6. I1123 09:52:41.699737   29173 controller.go:130] worker thread started!!!!!! 

4、创建一个CRD测试用例,观察日志以及是否创建deployment

(1)测试样例如下

  1. # cat example-mysql.yaml  
  2. apiVersion: coolops.cn/v1alpha1 
  3. kind: DatabaseManager 
  4. metadata: 
  5.   name: example-mysql 
  6. spec: 
  7.   dbtype: "mysql" 
  8.   deploymentName: "mysql" 
  9.   replicas: 1 

(2)执行以下命令进行创建,观察日志

  1. # kubectl apply -f example-mysql.yaml  
  2. databasemanager.coolops.cn/example-mysql created 

可以看到对于的deployment和pod已经创建,不过由于Deployment的配置没有配置完全,mysql没有正常启动。

我们其实是可以看到Controller获取到了事件。

如果我们删除对象,也可以从日志里正常看到响应。

总结

上面就是自定义Controller的整个开发过程,相对来说还是比较简单,大部分东西社区都做好了,我们只需要套模子,然后实现自己的逻辑就行。

整个过程主要是参考sample-controller ,现在简单整理如下:

  • 确定好目的,然后创建CRD,定义需要的对象
  • 按规定编写代码,定义好CRD所需要的type,然后使用code-generator进行代码自动生成,生成需要的informer、lister、clientset。
  • 编写Controller,实现具体的业务逻辑
  • 编写完成后就是验证,看看是否符合预期,根据具体情况再做进一步的调整

相关内容