如何写一个 K8s Operator


使用过 k8s 的同学可能执行过以下命令:

kubectl edit sts myapp # 编辑一个名称为 myapp 的 StatefulSet
kubectl describe sts myapp # 查看一个名称为 myapp 的 StatefulSet

StatefulSet 是 k8s 定义的一种资源,类似的还有 Deployment、Job、ConfigMap 等。当你执行 edit 命令编辑这些资源后,k8s 会通过不停轮询的方式(核心概念:control loop),将目标资源调整(核心概念:reconcile)到你期望的状态。

例如,你按了空调的遥控器,希望将房间的温度下调到 20℃。空调的压缩机开始工作,并且同时不停的检测当前实际的温度与你期望的温度之间的差异,直到温度达到20℃,这就是一个 control loop 的例子。

很简单,对吧?

设想一下如果不是这样,你将一手拿着温度计,然后不停的告诉空调温度仍然很高,或者已经变得过低了。

这正是声明式 API 的精妙所在——开发者只需声明目标状态,剩下的协调工作(即控制回路)就如同自动驾驶系统般持续运转,通过不断比对期望状态与实际状态差异,自主完成收敛过程。

Operator 是什么

试想我们不再满足于 k8s 提供的默认的资源,我们想利用这种省心省力的方式,来管理我们自己的资源,如:数据库的一个用户。

你可能想说,数据库的用户存在于数据库内,我知道数据库的集群可以定义为 StatefulSet 然后由 k8s 管理,用户又怎么使用 k8s 管理呢?为什么要用 k8s 来管理呢?

为什么要用 k8s 管理用户资源?

以 MySql 为例,通常我们创建用户,是使用 root 用户登录到数据库,执行 sql 语句创建用户。但是设想以下几种场景:

  1. 你不知道 root 用户的密码,或者因为安全要求,不能提供给你
  2. 你不知道 MySql 的 IP
  3. 你知道以上信息,但是因为没有开启相应的节点权限,你无法登录数据库
  4. 你完成了以上所有步骤,结果其中某些登录或者创建步骤失败了,你和数据库运维人员开始扯皮

看到了吧?这些都是生产环境中,真实会遇到的事情。而使用以下步骤,我们就可以一举解决这些问题。

怎么做到?

把大象关进冰箱需要三步,而我们要使用 Operator 完成在数据库中创建用户只需要两步:


  1. 告诉数据库,我需要创建的用户信息


    apiVersion: handsomeguy.cn/v1alpha1
    kind: DatabaseUser
    metadata:
    name: cnhandsomeguy
    spec:
    user: cnhandsomeguy
    password: changeit

    这个 YAML 文件定义了一个标准的 Kubernetes 自定义资源(Custom Resource),其结构包含:



    • apiVersion:遵循 semver 规范的版本标识(注意:非兼容性变更必须升级主版本号)

    • kind:资源类型标识符(类比 Kubernetes 原生的 StatefulSet/Deployment)

    • spec:声明式配置区,包含用户名及安全凭证引用(后续将演示如何通过 Secret 实现密钥安全存储)


    聪明的同学肯定能看到,这个资源还缺少了一些信息,如需要在哪个数据库创建?密码怎么明文写在这里了呢?我们将在后面的章节完善这些部分。



  2. 数据库来创建用户


    实际上此时并不是数据库来执行创建用户的动作,而是我们的 Operator。Operator 一直在待命(持续的监控 Kind 为 DatabaseUser 的资源),在我们提交上面的请求后,它就可以连接到数据库,执行创建用户的动作,当然,这部分逻辑需要由我们自己来编写。


简单吧!

Operator 本质上是一个可编程的 Kubernetes 控制器,遵循控制回路的设计范式。如同工厂里的自动化机械臂,这个智能控制器会持续监听自定义资源的变更事件,开发者只需编写业务逻辑来决定如何响应这些事件:

  1. 如何实现这些创建、更新、删除的逻辑
  2. 检测是否达到了期望,如果返回了错误,则认为需要进入下一次循环,再来一遍!

看看一个案例

还有一个重要的概念没有介绍:自定义资源的定义(custom resource definition,简称 crd)。有点绕口,但是试着这么理解:

  1. Operator 是一个进程,一直运行在 k8s 集群内部,监控着某种 Kind 的资源的事件
  2. 它到底在监控什么呢?我们需要一个名字!(Kind)
  3. 如果它监控到了 DatabaseUser,该如何去 spec 中找到用户名、密码这些信息呢?

    我们需要一个定义,一个描述文件,来事先告诉 Operator 数据库用户的类型、细节,以便于 Operator 来监控、按照流程执行。

使用以下命令可以查看当前 k8s 中已经有哪些 crd:

kubectl get crd

所以现在,我们需要以下几种东西:

  1. 资源定义(crd)
  2. Operator 程序的编码和部署
  3. 资源(cr)

吓到我了,我需要从零开始编码,写一个 Operator 吗?可以用 Java 吗?部署在哪?怎么监控?怎么对接 k8s?

不必担心!我们将使用 Kubebuilder 框架——这个 Kubernetes 官方维护的 Operator SDK,就像给你的代码装配了涡轮增压引擎,能自动生成 80% 的脚手架代码。准备好你的代码编辑器,我们要进入加速开发模式了!

使用 Kubebuilder 开发 Operator

有点快了,Kubebuilder 是什么?为什么选它?

还有个选择是 operator-sdk,大同小异,都是生成代码的工具罢了。当然你想手撸也不是不行。

我建议通篇阅读一下 https://book.kubebuilder.io/,但是时间有限的同学,看本文熟悉下脉络就行。本文有个作用是,帮助你避免一些坑,否则你生成的代码很可能是在某些 k8s 版本上跑不起来的。

准备


  1. 准备一台 linux 虚拟机,并且安装好 gcc 和 make 命令。


    按照 https://book.kubebuilder.io/quick-start.html#installation 执行命令,下载 kubebuilder。



  2. 初始化仓库


    按照 https://book.kubebuilder.io/quick-start.html#create-a-project 执行命令,执行初始化


    kubebuilder init --domain handsomeguy.cn --repo handsomeguy.cn/databaseuser


  3. 创建一个 API


    一个 Operator 可以管理多个资源,这些资源可以理解为就是一个 API。


    kubebuilder create api --crd-version v1 --group mygroup --version v1alpha1 --kind DatabaseUser

    这里注意一下, –crd-version 从 k8s 1.16 的版本后就不支持 v1beta1 了。

    这一步骤会生成一些 go 文件。


  4. 编辑这些 go 文件。

    假设我们要增加用户名和密码:

    type DatabaseUser struct {
    metav1.TypeMeta `json:",inline"`
    metav1.ObjectMeta `json:"metadata"`
    Spec DatabaseUserSpec `json:"spec,omitempty"`
    }
    type DatabaseUserSpec struct {
    User string `json:"user,omitempty"`
    Password corev1.SecretKeySelector `json:"password,omitempty"`
    }

    编写好了,假设先写这么多。有两点要说明一下:

    1. 上面我们说密码是明文存储的,不安全,这里我们使用了一个 corev1.SecretKeySelector。假设你的密码存储在了某一个名为 my-secret 的卷中的 my-key 字段,我们就可以在后面使用如下方式来取到它的明文。
      apiVersion: handsomeguy.cn/v1alpha1
      kind: DatabaseUser
      metadata:
      name: cnhandsomeguy
      spec:
      user: cnhandsomeguy
      password:
      name: my-secret
      key: my-key

    2. 用户的定义还可以增加 status 这样的 sub-resource,这样在用户创建失败的时候,将失败的信息刷回到 status 中,就可以使用 kubectl describe 命令来查看失败信息,后文会给案例。

  5. 生成 crd 文件


    make manifests #在项目的根目录执行 

    重大提醒


    根目录中有MakeFile文件,有必要仔细阅读一下。因为 make manifests 步骤会调用一个 controller-gen 的工具,来生成 crd 文件。但是 controller-gen 和 k8s 是有严格的配套关系的。如果你想生成老版本的(k8s 1.12)的 v1beta1 版本的 crd 文件,就必须使用 0.6.2 版本之前的 controller-gen。可以查看其 release 页面 https://github.com/kubernetes-sigs/controller-tools/releases 来确定使用什么版本。

    controller-gen 是由 MakeFile 中指定并在 make 过程中自动下载的(你也可以下载好后放到指定位置),如果想要修改 controller-gen 的版本,可以在 MakeFile 的以下位置修改:


    controller-gen: ## Download controller-gen locally if necessary.
    $(call go-get-tool,$(CONTROLLER_GEN),sigs.k8s.io/controller-tools/cmd/controller-gen@v0.6.2) #将0.6.2改为你需要的版本


  6. 同时生成多个版本的 crd 文件

    修改 MakeFile 的以下几行,以同时生成支持新老版本 k8s 的 crd 文件。


    CRD_OPTIONS ?= "crd:crdVersions={v1beta1,v1},trivialVersions=true,preserveUnknownFields=false"
    manifests: controller-gen ## Generate WebhookConfiguration, ClusterRole and CustomResourceDefinition objects.
    $(CONTROLLER_GEN) $(CRD_OPTIONS) rbac:roleName=my-crd-manager webhook paths=./... output:crd:artifacts:config=config/crd/bases

    实际执行的命令其实是,


    bin/controller-gen rbac:roleName=my-crd-manager crd:crdVersions={v1,v1beta1} webhook paths=./... output:crd:artifacts:config=config/crd/ bases

    因此你也可以手动执行。



  7. 在 k8s cluster 中创建这个 crd

    在进行这一步之前,你可以微调你的 crd,比如调整它的缩写为 dbu,这样就可以执行 kubectl get dbu 来查看你的资源。


    ---
    apiVersion: apiextensions.k8s.io/v1
    kind: CustomResourceDefinition
    metadata:
    annotations:
    controller-gen.kubebuilder.io/version: v0.6.2
    "helm.sh/resource-policy": keep
    creationTimestamp: null
    name: databaseuserss.handsomeguy.cn
    spec:
    group: mygroup.handsomeguy.cn
    names:
    kind: DatabaseUser
    listKind: DatabaseUserList
    plural: databaseusers
    singular: databaseuser
    shortNames: [dbu]
    preserveUnknownFields: false
    scope: Namespaced
    versions:
    // ... 省略

    使用以下命令创建并查看 crd。


    kubectl apply -f my-crd.yaml
    kubectl get crd

    到这里,准备工作就做完了(不出意外,你会在 make manifests 的时候遇到很多报错,请耐心查看报错,一一思考解决,都是有迹可循的。)。

    我们现在有了



  8. crd



  9. 代码

    此时可以观察一下生成的代码,如 DatabaseUserController 的 Reconcile 方法,这里将是你编码的主要阵地。

    还差亿点点小细节,就可以编写 cr 文件并部署测试了。


对接 k8s

首先我们应该对接 k8s,不然怎么知道我们的 operator 是否能正常监听到资源呢?
查看 sigs.k8s.io/controller-runtime/pkg/client/config/config.go 的源码,应该是有很多中配置的方式,我们选择最简单的一种:指定 KUBECONFIG 变量。

kind: Config
apiVersion: v1
clusters:
- cluster:
    insecure-skip-tls-verify: false
    certificate-authority: {{CA_DIR}}/ca.crt
    server: https://{{KUBERNETES_MASTER}}
  name: cluster
users:
- user:
    client-certificate: {{CA_DIR}}/kubecfg.crt
    client-key-data: {{CLIENT_KEY}}
  name: user
contexts:
- context:
    cluster: cluster
    user: user
  name: defaultContext
current-context: defaultContext

这个配置不是开箱即用的,多想一想怎么获取到这些证书吧,我写本文的时候手头没有 k8s 集群,暂不能提供方法了。

配置好后,在你的 go 程序运行的时候指定或在开发过程中在 goland 配置都可以,具体方式不再赘述。

开始编码


  1. 监控指定 namespace 的资源


    operator 可以监控一个或多个 k8s namespace 下的资源。在 main.go 中找到如下位置,修改即可:


    mgrOptions := ctrl.Options{
    Scheme: scheme,
    MetricsBindAddress: setup.MetricsAddr,
    HealthProbeBindAddress: setup.ProbeAddr,
    NewCache: cache.MultiNamespacedCacheBuilder(your_name_spaces), // 在此处指定需要监控的 namespace,实际生产过程中这些都是要做成可配置的,或通过启动参数指定
    }
    mgr, err := ctrl.NewManager(cfg, mgrOptions)


  2. 监控要创建到某个数据库的资源


    可以利用 k8s 资源的 label。如:


    apiVersion: handsomeguy.cn/v1alpha1
    kind: DatabaseUser
    metadata:
    label:
    target: that-database
    name: cnhandsomeguy
    spec:
    user: cnhandsomeguy
    password:
    name: my-secret
    key: my-key

    这样我们就可以在 DatabaseUserController 中过滤出要创建到指定数据库的资源,其它数据库的资源都不管。


    func GetLabelEventFilter(label string, value string) predicate.Predicate {
    return predicate.Funcs{
    UpdateFunc: func(event event.UpdateEvent) bool {
    return strings.ToLower(event.ObjectOld.GetLabels()['target']) == strings.ToLower(value)
    },
    DeleteFunc: func(deleteEvent event.DeleteEvent) bool {
    return strings.ToLower(deleteEvent.Object.GetLabels()['target']) == strings.ToLower(value)
    },
    CreateFunc: func(createEvent event.CreateEvent) bool {
    return strings.ToLower(createEvent.Object.GetLabels()['target']) == strings.ToLower(value)
    },
    GenericFunc: func(genericEvent event.GenericEvent) bool {
    return strings.ToLower(genericEvent.Object.GetLabels()['target']) == strings.ToLower(value)
    },
    }
    }
    // SetupWithManager 方法由 kubebuilder 生成
    func (r *DatabaseUserReconciler) SetupWithManager(mgr ctrl.Manager) error {
    if err := mgr.GetFieldIndexer().IndexField(context.Background(),
    // ... 省略
    }

    logger.Infof("setup with manager, %s, %s", constants.CrLabelKey, r.Opts.Target)
    return ctrl.NewControllerManagedBy(mgr).
    For(&v1.DatabaseUser{}).
    WithEventFilter(GetLabelEventFilter(constants.CrLabelKey, r.Opts.Target)).
    Owns(&v1.DatabaseUser{}).
    Complete(r)
    // ... 省略


  3. Reconcile 方法的编写


    func (r *DatabaseUserReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
    // get user from cluster
    var DatabaseUser v1.DatabaseUser
    if err := r.Get(ctx, req.NamespacedName, &DatabaseUser); err != nil {
    logger.Errorf("unable to fetch DatabaseUser: %v.", err)
    return ctrl.Result{}, client.IgnoreNotFound(err)
    }

    // 处理用户的删除事件
    if !DatabaseUser.ObjectMeta.DeletionTimestamp.IsZero() {
    return ctrl.Result{}, r.delete(ctx, &DatabaseUser)
    }

    logger.Infof("%s before update %s.", DatabaseUser.Name, DatabaseUser.ResourceVersion)
    oldStatus := DatabaseUser.Status.DeepCopy()
    // 用户创建、更新等逻辑编写。可以调用 job 或者直接在 go 中连接本地数据库进行用户操作。
    reconcileError := r.do(ctx, &DatabaseUser)
    // 将实际状态刷新到资源中
    updateStatusError := r.updateStatus(ctx, &DatabaseUser, oldStatus)
    // 在以下方法中判断成功还是失败,并决策是否进行下一轮循环
    return againOrDone(&DatabaseUser, reconcileError, updateStatusError)
    }

    Reconcile 返回两个结果:


    return reconcile.Result{
    Requeue: true, // 重新开始下次循环
    RequeueAfter: requeueAfter,
    }, err // err 不为 nil 的时候也重新开始下次循环


  4. 如何从 secret 中获取密码


    示例代码,可以参考如下,实际实现还需要考虑很多健壮性和扩展性。


    func (r *DatabaseUserReconciler) GetPasswordInCluster(ctx context.Context, user *v1alpha1.DatabaseUser) (string, error) {
    secret := &corev1.Secret{}
    secretKey := client.ObjectKey{Name: user.Spec.Password.Name, Namespace: user.Namespace}

    if err := r.Get(ctx, secretKey, secret); err != nil {
    return "", err
    }
    var path = &user.Spec.Password.Key
    return string(sec.Data[path])
    }


  5. finalizer 实现同步删除资源


    当你执行 kubectl delete dbu my-user 的时候,我们希望 k8s 等待 operatoror 执行完成并返回删除用户成功后,才真的删除这个 cr 文件。这样就需要用到 finalizer的能力。见官方文档:https://kubernetes.io/zh-cn/docs/concepts/overview/working-with-objects/finalizers/


    代码中,可以这么实现:


    // finalizer is pre-delete hook
    myFinalizer = "mygroup.handsomeguy.cn/databaseuser"
    // 在新增用户的时候,打上 finalizer 标记,使用 kubectl get dbu 可以看到 finalizer 的信息
    if !controllerutil.ContainsFinalizer(user, myFinalizer) {
    controllerutil.AddFinalizer(user, myFinalizer)
    if err = r.Update(ctx, user); err != nil {
    return
    }
    }
    // 在删除用户的时候,如果删除成功就去除这个 finalizer
    if controllerutil.ContainsFinalizer(user, myFinalizer) && !r.Opts.SkipUpdateStatus {
    // 先删除用户
    if err := DeleteUser(user); err != nil {
    return err
    }
    // 再去除阻塞器 finalizer
    controllerutil.RemoveFinalizer(user, myFinalizer)
    // update to delete finalizer
    if err := r.Update(ctx, user); err != nil {
    return nil
    }
    return nil
    }

利用 status 提升可维护性

区分一个程序员水平的一个方法,是看他的代码非功能性指标如何,如可维护性高不高,出现问题定位问题快不快。我们这个 operator 说实话还是蛮复杂的,出现问题没有经验的人还真的不好定位。我们需要一种便捷的手段,一条命令就可以定位大部分问题。对于用户来说,熟悉的可能只有 kubectl get dbu 的命令,我们可不可以将创建用户过程中的报错放到这个结果里面呢?答案是可以。

文档在这里。https://book-v1.book.kubebuilder.io/basics/status_subresource.html,但是不如直接看代码:


  1. 在 xxxxtypes.go 中增加 status

    当然你也可以按照官方的文档去增加生成代码的注解。


    //+kubebuilder:object:root=true
    //+kubebuilder:subresource:status

    // DatabaseUser is the Schema for the DatabaseUsers API
    type DatabaseUser struct {
    metav1.TypeMeta `json:",inline"`
    metav1.ObjectMeta `json:"metadata"`

    Spec DatabaseUserSpec `json:"spec,omitempty"`
    Status DatabaseUserStatus `json:"status,omitempty"` // 新增部分
    }
    // DatabaseUserStatus defines the observed state of DatabaseUser
    type DatabaseUserStatus struct {
    Conditions []DatabaseUserCondition `json:"conditions,omitempty"`
    // 其它想放的字段,省略
    }
    // DatabaseUserConditionType custom type
    type DatabaseUserConditionType string
    // DatabaseUserCondition v3 condition
    type DatabaseUserCondition struct {
    // Type of user condition.
    Type DatabaseUserConditionType `json:"type,omitempty"`
    // Status of the condition, one of True, False, Unknown.
    Status corev1.ConditionStatus `json:"status,omitempty"`
    // The last time this condition was updated.
    LastUpdateTime metav1.Time `json:"lastUpdateTime,omitempty"`
    // Last time the condition transitioned from one status to another.
    LastTransitionTime metav1.Time `json:"lastTransitionTime,omitempty"`
    // The reason for the condition's last transition.
    Reason string `json:"reason,omitempty"`
    // A human readable message indicating details about the transition.
    Message string `json:"message,omitempty"`
    }

    // UpdateStatusCondition update status condition
    func (u *DatabaseUser) UpdateStatusCondition(condType DatabaseUserConditionType,
    status corev1.ConditionStatus, reason, message string) (cond *DatabaseUserCondition, changed bool) {
    t := metav1.NewTime(time.Now())
    existedCondition, exists := u.ConditionExists(condType)
    if !exists {
    newCondition := DatabaseUserCondition{
    Type: condType, Status: status, Reason: reason, Message: message,
    LastTransitionTime: t, LastUpdateTime: t,
    }
    u.Status.Conditions = append(u.Status.Conditions, newCondition)

    return &newCondition, true
    }

    if status != existedCondition.Status {
    existedCondition.LastTransitionTime = t
    changed = true
    }

    if message != existedCondition.Message || reason != existedCondition.Reason {
    existedCondition.LastUpdateTime = t
    changed = true
    }

    existedCondition.Status = status
    existedCondition.Message = message
    existedCondition.Reason = reason

    return existedCondition, changed
    }

    加的代码有点多,类爆炸了,但是是值得的。


  2. 在成功或失败的地方(DatabaseUserController中),调用 UpdateStatusCondition
    user.UpdateStatusCondition(
    "Ready", corev1.ConditionTrue,
    "Provision Succeeded", "The user provisioning has succeeded.",
    )
    if *err != nil {
    user.UpdateStatusCondition(
    "NotReady", corev1.ConditionFalse,
    "Provision Failed", fmt.Sprintf("The user provisioning has failed: %s", *err),
    )
    }

    最后在 Reconcile 方法中刷回状态即可。


    func (r *DatabaseUserReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
    // ... 省略
    reconcileError := r.do(ctx, &DatabaseUser)
    updateStatusError := r.updateStatus(ctx, &DatabaseUser, oldStatus) // 刷回状态到集群
    return againOrDone(&DatabaseUser, reconcileError, updateStatusError)
    // ... 省略

    如此,只有你在 controller 的全阶段,将 err 信息写入到 cr 中,用户就可以使用 kubectl describe dbu 来看到这些报错信息,而不用麻烦你了。


好了,以上就是一些实现一个 Operator 的步骤了。这些内容大部分靠回忆,代码部分都是网上找的伪代码,还有很多实战中的坑需要注意,但是限于时间太久,已经想不起来了,以后想起来再补充吧。大家有什么想法可以在评论区交流哦!

发表评论