我们可以使用code-generator 以及controller-tools来进行代码自动生成,通过代码自动生成可以帮我们自动生成 CRD 资源对象,以及客户端访问的 ClientSet、Informer、Lister 等工具包,接下来我们就来了解下如何编写一个自定义的控制器。
CRD定义首先初始化项目:
$ mkdir operator-crd && cd operator-crd
$ go mod init operator-crd
$ mkdir -p pkg/apis/example.com/v1
在该文件夹下新建doc.go
文件,内容如下所示:
// +k8s:deepcopy-gen=package
// +groupName=example.com
package v1
根据 CRD 的规范定义,这里我们定义的 group 为 example.com
,版本为 v1
,在顶部添加了一个代码自动生成的 deepcopy-gen
的 tag,为整个包中的类型生成深拷贝方法。
然后就是非常重要的资源对象的结构体定义,新建 types.go
文件,types.go内容可以使用type-scaffpld
自动生成,具体文件内容如下:
package v1
import metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
// BarSpec defines the desired state of Bar
type BarSpec struct {
// INSERT ADDITIONAL SPEC FIELDS -- desired state of cluster
DeploymentName string `json:"deploymentName"`
Image string `json:"image"`
Replicas *int32 `json:"replicas"`
}
// BarStatus defines the observed state of Bar.
// It should always be reconstructable from the state of the cluster and/or outside world.
type BarStatus struct {
// INSERT ADDITIONAL STATUS FIELDS -- observed state of cluster
}
// 下面这个一定不能少,少了的话不能生成 lister 和 informer
// +genclient
// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
// Bar is the Schema for the bars API
// +k8s:openapi-gen=true
type Bar struct {
metav1.TypeMeta `json:",inline"`
metav1.ObjectMeta `json:"metadata,omitempty"`
Spec BarSpec `json:"spec,omitempty"`
Status BarStatus `json:"status,omitempty"`
}
// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
// BarList contains a list of Bar
type BarList struct {
metav1.TypeMeta `json:",inline"`
metav1.ListMeta `json:"metadata,omitempty"`
Items []Bar `json:"items"`
}
然后可以参考系统内置的资源对象,还需要提供 AddToScheme 与 Resource 两个变量供 client 注册,新建 register.go 文件,内容如下所示:
package v1
import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
)
// SchemeGroupVersion 注册自己的自定义资源
var SchemeGroupVersion = schema.GroupVersion{Group: "example.com", Version: "v1"}
// Kind takes an unqualified kind and returns back a Group qualified GroupKind
func Kind(kind string) schema.GroupKind {
return SchemeGroupVersion.WithKind(kind).GroupKind()
}
// Resource takes an unqualified resource and returns a Group qualified GroupResource
func Resource(resource string) schema.GroupResource {
return SchemeGroupVersion.WithResource(resource).GroupResource()
}
var (
// SchemeBuilder initializes a scheme builder
SchemeBuilder = runtime.NewSchemeBuilder(addKnownTypes)
// AddToScheme is a global function that registers this API group & version to a scheme
AddToScheme = SchemeBuilder.AddToScheme
)
// Adds the list of known types to Scheme.
func addKnownTypes(scheme *runtime.Scheme) error {
// 添加 Bar 与 BarList这两个资源到 scheme
scheme.AddKnownTypes(SchemeGroupVersion,
&Bar{},
&BarList{},
)
metav1.AddToGroupVersion(scheme, SchemeGroupVersion)
return nil
}
使用controller-gen
生成crd:
$ controller-gen crd paths=./... output:crd:dir=crd
生成example.com_bars.yaml文件如下所示:
---
apiVersion: apiextensions.k8s.io/v1
kind: CustomResourceDefinition
metadata:
annotations:
controller-gen.kubebuilder.io/version: (devel)
creationTimestamp: null
name: bars.example.com
spec:
group: example.com
names:
kind: Bar
listKind: BarList
plural: bars
singular: bar
scope: Namespaced
versions:
- name: v1
schema:
openAPIV3Schema:
description: Bar is the Schema for the bars API
properties:
apiVersion:
description: 'APIVersion defines the versioned schema of this representation
of an object. Servers should convert recognized schemas to the latest
internal value, and may reject unrecognized values. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#resources'
type: string
kind:
description: 'Kind is a string value representing the REST resource this
object represents. Servers may infer this from the endpoint the client
submits requests to. Cannot be updated. In CamelCase. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#types-kinds'
type: string
metadata:
type: object
spec:
description: BarSpec defines the desired state of Bar
properties:
deploymentName:
description: INSERT ADDITIONAL SPEC FIELDS -- desired state of cluster
type: string
image:
type: string
replicas:
format: int32
type: integer
required:
- deploymentName
- image
- replicas
type: object
status:
description: BarStatus defines the observed state of Bar. It should always
be reconstructable from the state of the cluster and/or outside world.
type: object
type: object
served: true
storage: true
最终项目结构如下所示:
$ tree
.
├── crd
│ └── example.com_bars.yaml
├── go.mod
├── go.sum
└── pkg
└── apis
└── example.com
└── v1
├── doc.go
├── register.go
└── types.go
5 directories, 6 files
生成客户端相关代码
上面我们准备好资源的 API 资源类型后,就可以使用开始生成 CRD 资源的客户端使用的相关代码了。
首先创建生成代码的脚本,下面这些脚本均来源于 sample-controller 提供的示例:
$ mkdir hack && cd hack
在该目录下面新建 tools.go 文件,添加 code-generator 依赖,因为在没有代码使用 code-generator 时,go module 默认不会为我们依赖此包。文件内容如下所示:
// +build tools
// 建立 tools.go 来依赖 code-generator
// 因为在没有代码使用 code-generator 时,go module 默认不会为我们依赖此包.
package tools
import _ "k8s.io/code-generator"
然后新建 update-codegen.sh 脚本,用来配置代码生成的脚本:
#!/usr/bin/env bash
set -o errexit
set -o nounset
set -o pipefail
SCRIPT_ROOT=$(dirname "${BASH_SOURCE[0]}")/..
CODEGEN_PKG=${CODEGEN_PKG:-$(cd "${SCRIPT_ROOT}"; ls -d -1 ./vendor/k8s.io/code-generator 2>/dev/null || echo ../code-generator)}
bash "${CODEGEN_PKG}"/generate-groups.sh "deepcopy,client,informer,lister" \
operator-crd/pkg/client operator-crd/pkg/apis example.com:v1 \
--output-base "${SCRIPT_ROOT}"/../ \
--go-header-file "${SCRIPT_ROOT}"/hack/boilerplate.go.txt
# To use your own boilerplate text append:
# --go-header-file "${SCRIPT_ROOT}"/hack/custom-boilerplate.go.txt
同样还有 verify-codegen.sh 脚本,用来校验生成的代码是否是最新的:
#!/usr/bin/env bash
set -o errexit
set -o nounset
set -o pipefail
SCRIPT_ROOT=$(dirname "${BASH_SOURCE[0]}")/..
DIFFROOT="${SCRIPT_ROOT}/pkg"
TMP_DIFFROOT="${SCRIPT_ROOT}/_tmp/pkg"
_tmp="${SCRIPT_ROOT}/_tmp"
cleanup() {
rm -rf "${_tmp}"
}
trap "cleanup" EXIT SIGINT
cleanup
mkdir -p "${TMP_DIFFROOT}"
cp -a "${DIFFROOT}"/* "${TMP_DIFFROOT}"
"${SCRIPT_ROOT}/hack/update-codegen.sh"
echo "diffing ${DIFFROOT} against freshly generated codegen"
ret=0
diff -Naupr "${DIFFROOT}" "${TMP_DIFFROOT}" || ret=$?
cp -a "${TMP_DIFFROOT}"/* "${DIFFROOT}"
if [[ $ret -eq 0 ]]
then
echo "${DIFFROOT} up to date."
else
echo "${DIFFROOT} is out of date. Please run hack/update-codegen.sh"
exit 1
fi
还有一个为生成的代码文件添加头部内容的 boilerplate.go.txt 文件,内容如下所示,其实就是为每个生成的代码文件头部添加上下面的开源协议声明:
/*
Copyright The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
接下来我们就可以来执行代码生成的脚本了,首先将依赖包放置到 vendor 目录中去:
$ go mod vendor
然后执行脚本生成代码:
$ chmod +x ./hack/update-codegen.sh
$./hack/update-codegen.sh
Generating deepcopy funcs
Generating clientset for example.com:v1 at operator-crd/pkg/client/clientset
Generating listers for example.com:v1 at operator-crd/pkg/client/listers
Generating informers for example.com:v1 at operator-crd/pkg/client/informers
代码生成后,整个项目的 pkg 包变成了下面的样子:
$ tree pkg
pkg
├── apis
│ └── example.com
│ └── v1
│ ├── doc.go
│ ├── register.go
│ ├── types.go
│ └── zz_generated.deepcopy.go
└── client
├── clientset
│ └── versioned
│ ├── clientset.go
│ ├── doc.go
│ ├── fake
│ │ ├── clientset_generated.go
│ │ ├── doc.go
│ │ └── register.go
│ ├── scheme
│ │ ├── doc.go
│ │ └── register.go
│ └── typed
│ └── example.com
│ └── v1
│ ├── bar.go
│ ├── doc.go
│ ├── example.com_client.go
│ ├── fake
│ │ ├── doc.go
│ │ ├── fake_bar.go
│ │ └── fake_example.com_client.go
│ └── generated_expansion.go
├── informers
│ └── externalversions
│ ├── example.com
│ │ ├── interface.go
│ │ └── v1
│ │ ├── bar.go
│ │ └── interface.go
│ ├── factory.go
│ ├── generic.go
│ └── internalinterfaces
│ └── factory_interfaces.go
└── listers
└── example.com
└── v1
├── bar.go
└── expansion_generated.go
20 directories, 26 files
仔细观察可以发现 pkg/apis/example.com/v1
目录下面多了一个zz_generated.deepcopy.go
文件,在 pkg/client
文件夹下生成了 clientset和 informers 和 listers 三个目录,有了这几个自动生成的客户端相关操作包,我们就可以去访问 CRD 资源了,可以和使用内置的资源对象一样去对 Bar 进行 List 和 Watch 操作了。
首先要先获取访问资源对象的 ClientSet,在项目根目录下面新建 main.go 文件。
package main
import (
"k8s.io/client-go/tools/clientcmd"
"k8s.io/klog/v2"
clientset "operator-crd/pkg/client/clientset/versioned"
"operator-crd/pkg/client/informers/externalversions"
"time"
"os"
"os/signal"
"syscall"
)
var (
onlyOneSignalHandler = make(chan struct{})
shutdownSignals = []os.Signal{os.Interrupt, syscall.SIGTERM}
)
// 注册 SIGTERM 和 SIGINT 信号
// 返回一个 stop channel, 该通道在捕获到第一个信号时被关闭
// 如果捕获到第二个信号,程序直接退出
func setupSignalHandler() (stopCh <-chan struct{}) {
// 当调用两次的时候 panics
close(onlyOneSignalHandler)
stop := make(chan struct{})
c := make(chan os.Signal, 2)
// Notify 函数让 signal 包将输入信号转发到c
// 如果没有列出要传递的信号,会将所有输入信号传递到 c; 否则只会传递列出的输入信号
signal.Notify(c, shutdownSignals...)
go func() {
<-c
close(stop)
<-c
os.Exit(1) // 第二个信号直接退出
}()
return stop
}
func main() {
stopCh := setupSignalHandler()
// 获取config
config, err := clientcmd.BuildConfigFromFlags("", clientcmd.RecommendedHomeFile)
if err != nil {
klog.Fatalln(err)
}
// 通过config构建clientSet
// 这里的clientSet 是 Bar 的
clientSet, err := clientset.NewForConfig(config)
if err != nil {
klog.Fatalln(err)
}
// informerFactory 工厂类, 这里注入我们通过代码生成的 client
// client 主要用于和 API Server 进行通信,实现 ListAndWatch
factory := externalversions.NewSharedInformerFactory(clientSet, time.Second*30)
// 实例化自定义控制器
controller := NewController(factory.Example().V1().Bars())
// 启动 informer,开始list 和 watch
go factory.Start(stopCh)
// 启动控制器
if err = controller.Run(2, stopCh); err != nil {
klog.Fatalf("Error running controller: %s", err.Error())
}
}
首先初始化一个用于访问 Bar 资源的 ClientSet 对象,然后同样新建一个 Bar 的 InformerFactory 实例,通过这个工厂实例可以去启动 Informer 开始对 Bar 的 List 和 Watch 操作,然后同样我们要自己去封装一个自定义的控制器,在这个控制器里面去实现一个控制循环,不断对 Bar 的状态进行调谐。
在项目根目录下新建 controller.go
文件,内容如下所示:
package main
import (
"fmt"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
"k8s.io/klog/v2"
v1 "operator-crd/pkg/apis/example.com/v1"
"time"
informers "operator-crd/pkg/client/informers/externalversions/example.com/v1"
)
type Controller struct {
informer informers.BarInformer
workqueue workqueue.RateLimitingInterface
}
func NewController(informer informers.BarInformer) *Controller {
controller := &Controller{
informer: informer,
// WorkQueue 的实现,负责同步 Informer 和控制循环之间的数据
workqueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "bar"),
}
klog.Info("Setting up Bar event handlers")
// informer 注册了三个 Handler(AddFunc、UpdateFunc 和 DeleteFunc)
// 分别对应 API 对象的“添加”“更新”和“删除”事件。
// 而具体的处理操作,都是将该事件对应的 API 对象加入到工作队列中
informer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: controller.addBar,
UpdateFunc: controller.updateBar,
DeleteFunc: controller.deleteBar,
})
return controller
}
func (c *Controller) Run(thread int, stopCh <-chan struct{}) error {
defer runtime.HandleCrash()
defer c.workqueue.ShuttingDown()
// 记录开始日志
klog.Info("Starting Bar control loop")
klog.Info("Waiting for informer caches to sync")
// 等待缓存同步数据
if ok := cache.WaitForCacheSync(stopCh, c.informer.Informer().HasSynced); !ok {
return fmt.Errorf("failed to wati for caches to sync")
}
klog.Info("Starting workers")
for i := 0; i < thread; i++ {
go wait.Until(c.runWorker, time.Second, stopCh)
}
klog.Info("Started workers")
<-stopCh
klog.Info("Shutting down workers")
return nil
}
// runWorker 是一个不断运行的方法,并且一直会调用 c.processNextWorkItem 从 workqueue读取消息
func (c *Controller) runWorker() {
for c.processNExtWorkItem() {
}
}
// 从workqueue读取和读取消息
func (c *Controller) processNExtWorkItem() bool {
// 获取 item
item, shutdown := c.workqueue.Get()
if shutdown {
return false
}
if err := func(item interface{}) error {
// 标记以及处理
defer c.workqueue.Done(item)
var key string
var ok bool
if key, ok = item.(string); !ok {
// 判读key的类型不是字符串,则直接丢弃
c.workqueue.Forget(item)
runtime.HandleError(fmt.Errorf("expected string in workqueue but got %#v", item))
return nil
}
if err := c.syncHandler(key); err != nil {
return fmt.Errorf("error syncing '%s':%s", item, err.Error())
}
c.workqueue.Forget(item)
return nil
}(item); err != nil {
runtime.HandleError(err)
return false
}
return true
}
// 尝试从 Informer 维护的缓存中拿到了它所对应的 Bar 对象
func (c *Controller) syncHandler(key string) error {
namespace, name, err := cache.SplitMetaNamespaceKey(key)
if err != nil {
runtime.HandleError(fmt.Errorf("invalid respirce key:%s", key))
return err
}
bar, err := c.informer.Lister().Bars(namespace).Get(name)
if err != nil {
if errors.IsNotFound(err) {
// 说明是在删除事件中添加进来的
return nil
}
runtime.HandleError(fmt.Errorf("failed to get bar by: %s/%s", namespace, name))
return err
}
fmt.Printf("[BarCRD] try to process bar:%#v ...", bar)
// 可以根据bar来做其他的事。
// todo
return nil
}
func (c *Controller) addBar(item interface{}) {
var key string
var err error
if key, err = cache.MetaNamespaceKeyFunc(item); err != nil {
runtime.HandleError(err)
return
}
c.workqueue.AddRateLimited(key)
}
func (c *Controller) deleteBar(item interface{}) {
var key string
var err error
if key, err = cache.DeletionHandlingMetaNamespaceKeyFunc(item); err != nil {
runtime.HandleError(err)
return
}
fmt.Println("delete crd")
c.workqueue.AddRateLimited(key)
}
func (c *Controller) updateBar(old, new interface{}) {
oldItem := old.(*v1.Bar)
newItem := new.(*v1.Bar)
// 比较两个资源版本,如果相同,则不处理
if oldItem.ResourceVersion == newItem.ResourceVersion {
return
}
c.workqueue.AddRateLimited(new)
}
我们这里自定义的控制器只封装了一个 Informer 和一个限速队列,我们当然也可以在里面添加一个用于访问本地缓存的 Indexer,但实际上 Informer 中已经包含了 Lister,对于 List 和 Get 操作都会去通过 Indexer 从本地缓存中获取数据,所以只用一个 Informer 也是完全可行的。
同样在 Informer 中注册了3个事件处理器,将监听的事件获取到后送入 workqueue 队列,然后通过控制器的控制循环不断从队列中消费数据,根据获取的 key 来获取数据判断对象是需要删除还是需要进行其他业务处理,这里我们同样也只是打印出了对应的操作日志,对于实际的项目则进行相应的业务逻辑处理即可。
到这里一个完整的自定义 API 对象和它所对应的自定义控制器就编写完毕了。
测试接下来我们直接运行我们的main函数:
I0512 16:51:33.922138 39032 controller.go:29] Setting up Bar event handlers
I0512 16:51:33.922255 39032 controller.go:47] Starting Bar control loop
I0512 16:51:33.922258 39032 controller.go:48] Waiting for informer caches to sync
I0512 16:51:34.023108 39032 controller.go:55] Starting workers
I0512 16:51:34.023153 39032 controller.go:60] Started workers
现在我们创建一个Bar资源对象:
# bar.yaml
apiVersion: example.com/v1
kind: Bar
metadata:
name: bar-demo
namespace: default
spec:
image: "nginx:1.17.1"
deploymentName: example-bar
replicas: 2
直接创建上面的对象,注意观察控制器的日志:
I0512 16:51:33.922138 39032 controller.go:29] Setting up Bar event handlers
I0512 16:51:33.922255 39032 controller.go:47] Starting Bar control loop
I0512 16:51:33.922258 39032 controller.go:48] Waiting for informer caches to sync
I0512 16:51:34.023108 39032 controller.go:55] Starting workers
I0512 16:51:34.023153 39032 controller.go:60] Started workers
[BarCRD] try to process bar:"bar-demo" ...
可以看到,我们上面创建 bar.yaml 的操作,触发了 EventHandler 的添加事件,从而被放进了工作队列。然后控制器的控制循环从队列里拿到这个对象,并且打印出了正在处理这个 bar 对象的日志信息。
同样我们删除这个资源的时候,也会有对应的提示。
这就是开发自定义 CRD 控制器的基本流程,当然我们还可以在事件处理的业务逻辑中去记录一些 Events 信息,这样我们就可以通过 Event 去了解我们资源的状态了。