KubernetesのCustom Resource Definition(CRD)とCustom Controller

kubernetesgcpgolang

K8sではDeploymentを作成したときにReplicaSetも作成されるようにしたり、 Load Balancer Serviceを作成したときにGCPなどその環境に応じたLoad Balancerも作成されるようにしたりするため、Controllerがそれらを監視してAPIを呼んでいる。

GKEでのService(ClusterIP/NodePort/LoadBalancer)とIngress - sambaiz-net

Controllerは単なるAPIを呼ぶアプリケーションなので自分でCustom Controllerを作成してDeploymentとしてデプロイすることもできる。 また、監視する対象もpodsやdeploymentsといった標準のAPIだけではなく、 Custom Resource で拡張したものを使うことができる。

特定のアプリケーションのためのControllerはOperatorとも呼ばれる。

CustomResourceDefinition(CRD)

Custom Resourceを定義する。

apiVersion: apiextensions.k8s.io/v1beta1
kind: CustomResourceDefinition
metadata:
  name: crontabs.stable.example.com
spec:
  # REST APIで使われる /apis/<group>/<version>
  group: stable.example.com
  version: v1
  # Namespaced か Cluster
  scope: Namespaced
  names:
    # 複数形 URLに使われる /apis/<group>/<version>/<plural>
    plural: crontabs
    # 単数形 CLIなどで使われる
    singular: crontab
    # manifestで使う
    kind: CronTab
    shortNames:
    - ct
$ kubectl create -f crd.yaml
$ kubectl get crd
NAME                          AGE
crontabs.stable.example.com   8s

標準のresourceと同様にKindに指定してcreateできる。

$ cat crontab.yaml
apiVersion: stable.example.com/v1
kind: CronTab
metadata:
  name: test

$ kubectl create -f crontab.yaml
$ kubectl get crontab
NAME      AGE
test      10s

Client

Controllerで使うCustom ResourceのClientを準備する。

生成

まず、以下のようにファイルを作成する。

$ tree pkg
pkg
└── apis
    └── mysamplecontroller
        ├── register.go
        └── v1
            ├── doc.go
            ├── register.go
            └── types.go

doc.goはpackage指定のみ、types.goにはCustom Resourceのstructを書いてcode-generator用のタグを付けている。register.goはsampleからほとんどコピー。

// +k8s:deepcopy-gen=package,register

// +groupName=example.com
package v1
package v1

import (
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// +genclient
// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object

type SampleResource struct {
	metav1.TypeMeta   `json:",inline"`
	metav1.ObjectMeta `json:"metadata,omitempty"`

	Spec SampleResourceSpec `json:"spec"`
}

type SampleResourceSpec struct {
	PodImage string `json:"podImage"`
}

code-generatorを持ってきてgenerate-groups.shを実行すると残りのファイルが生成される。

$ cat Makefile
.PHONY: codegen
codegen:
	${GOPATH}/src/k8s.io/code-generator/generate-groups.sh "deepcopy,client,informer,lister" \
	github.com/sambaiz/k8s-sample-crd-controller/pkg/client github.com/sambaiz/k8s-sample-crd-controller/pkg/apis \
	mysamplecontroller:v1

$ make codegen

zz_generated.deepcopy.goでCustom ResourceのstructにDeepCopyObject()関数を追加し、 runtime.Object interfaceを満たすようにしている。

$ tree pkg
pkg
├── apis
│   └── mysamplecontroller
│       ├── register.go
│       └── v1
│           ├── doc.go
│           ├── register.go
│           ├── types.go
│           └── zz_generated.deepcopy.go
└── client

Controller

公式のsample-controllerを見ていく。

main.go

Clientを作る。

import (
    "k8s.io/client-go/kubernetes"
    clientset "k8s.io/sample-controller/pkg/client/clientset/versioned"
)

cfg, err := clientcmd.BuildConfigFromFlags(masterURL, kubeconfig)
kubeClient, err := kubernetes.NewForConfig(cfg)
exampleClient, err := clientset.NewForConfig(cfg)

次にClientからdeploymentsとCustom ResourceであるfoosのInformerを作ってControllerに渡す。Informerは変更があったObjectが入るDeltaFifoQueueを監視して、Event Handlerを呼ぶもの。 ちなみにK8sのAPIを監視してDeltaFifoQueueに入れるのはReflectorがやる。

import (
    kubeinformers "k8s.io/client-go/informers"
    informers "k8s.io/sample-controller/pkg/client/informers/externalversions"
)

kubeInformerFactory := kubeinformers.NewSharedInformerFactory(kubeClient, time.Second*30)
exampleInformerFactory := informers.NewSharedInformerFactory(exampleClient, time.Second*30)

controller := NewController(kubeClient, exampleClient,
    kubeInformerFactory.Apps().V1().Deployments(),
    exampleInformerFactory.Samplecontroller().V1alpha1().Foos())

go kubeInformerFactory.Start(stopCh)
go exampleInformerFactory.Start(stopCh)

controller.go

func NewController(
	kubeclientset kubernetes.Interface,
	sampleclientset clientset.Interface,
	deploymentInformer appsinformers.DeploymentInformer,
	fooInformer informers.FooInformer) *Controller {
    ...
    controller := &Controller{
        kubeclientset:     kubeclientset,
        sampleclientset:   sampleclientset,
        deploymentsLister: deploymentInformer.Lister(),
        deploymentsSynced: deploymentInformer.Informer().HasSynced,
        foosLister:        fooInformer.Lister(),
        foosSynced:        fooInformer.Informer().HasSynced,
        workqueue:         workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "Foos"),
        recorder:          recorder,
    }
    ...
}

EventHandlerを登録する。fooInformerの方は"namespace/name"の文字列をControllerのWorkququeに入れる。 deploymentInformerの方はGetControllerOf()OwnerReferenceを見て、それがFooならInformerでObjectを取得し同様の処理を行う。

fooInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
    AddFunc: controller.enqueueFoo,
    UpdateFunc: func(old, new interface{}) {
        controller.enqueueFoo(new)
    },
})

deploymentInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
    AddFunc: controller.handleObject,
    UpdateFunc: func(old, new interface{}) {
        newDepl := new.(*appsv1.Deployment)
        oldDepl := old.(*appsv1.Deployment)
        if newDepl.ResourceVersion == oldDepl.ResourceVersion {
            return
        }
        controller.handleObject(new)
    },
    DeleteFunc: controller.handleObject,
})

func (c *Controller) enqueueFoo(obj interface{}) {
	var key string
	var err error
	if key, err = cache.MetaNamespaceKeyFunc(obj); err != nil {
		runtime.HandleError(err)
		return
	}
	c.workqueue.AddRateLimited(key)
}

func (c *Controller) handleObject(obj interface{}) {
	var object metav1.Object
	var ok bool
	if object, ok = obj.(metav1.Object); !ok {
		tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
		if !ok {
			runtime.HandleError(fmt.Errorf("error decoding object, invalid type"))
			return
		}
		object, ok = tombstone.Obj.(metav1.Object)
		if !ok {
			runtime.HandleError(fmt.Errorf("error decoding object tombstone, invalid type"))
			return
		}
		glog.V(4).Infof("Recovered deleted object '%s' from tombstone", object.GetName())
	}
	glog.V(4).Infof("Processing object: %s", object.GetName())
	if ownerRef := metav1.GetControllerOf(object); ownerRef != nil {
		if ownerRef.Kind != "Foo" {
			return
		}

		foo, err := c.foosLister.Foos(object.GetNamespace()).Get(ownerRef.Name)
		if err != nil {
			glog.V(4).Infof("ignoring orphaned object '%s' of foo '%s'", object.GetSelfLink(), ownerRef.Name)
			return
		}

		c.enqueueFoo(foo)
		return
	}
}

Workqueueから取り出して処理を行う。

obj, shutdown := c.workqueue.Get()
err := func(obj interface{}) error {
    defer c.workqueue.Done(obj)
	if key, ok = obj.(string); !ok {
        c.workqueue.Forget(obj)
        runtime.HandleError(fmt.Errorf("expected string in workqueue but got %#v", obj))
        return nil
    }
    if err := c.syncHandler(key); err != nil {
        return fmt.Errorf("error syncing '%s': %s", key, err.Error())
    }
    c.workqueue.Forget(obj)
    glog.Infof("Successfully synced '%s'", key)
    return nil
}(obj)

fooを取得し、specのdeploymentNameのdeploymentをclientで生成する。OwnerReferenceを含んでいる。 ここでは省略しているが、既に存在してコントロール下にない場合などのチェックもしている。

namespace, name, err := cache.SplitMetaNamespaceKey(key)
foo, err := c.foosLister.Foos(namespace).Get(name)
deploymentName := foo.Spec.DeploymentName
deployment, err := c.deploymentsLister.Deployments(foo.Namespace).Get(deploymentName)
if errors.IsNotFound(err) {
    deployment, err = c.kubeclientset.AppsV1().Deployments(foo.Namespace).Create(newDeployment(foo))
}

func newDeployment(foo *samplev1alpha1.Foo) *appsv1.Deployment {
	labels := map[string]string{
		"app":        "nginx",
		"controller": foo.Name,
	}
	return &appsv1.Deployment{
		ObjectMeta: metav1.ObjectMeta{
			Name:      foo.Spec.DeploymentName,
			Namespace: foo.Namespace,
			OwnerReferences: []metav1.OwnerReference{
				*metav1.NewControllerRef(foo, schema.GroupVersionKind{
					Group:   samplev1alpha1.SchemeGroupVersion.Group,
					Version: samplev1alpha1.SchemeGroupVersion.Version,
					Kind:    "Foo",
				}),
			},
		},
		Spec: appsv1.DeploymentSpec{
			Replicas: foo.Spec.Replicas,
			Selector: &metav1.LabelSelector{
				MatchLabels: labels,
			},
			Template: corev1.PodTemplateSpec{
				ObjectMeta: metav1.ObjectMeta{
					Labels: labels,
				},
				Spec: corev1.PodSpec{
					Containers: []corev1.Container{
						{
							Name:  "nginx",
							Image: "nginx:latest",
						},
					},
				},
			},
		},
	}
}

デプロイ

ローカルで動かすこともできるが、せっかくなのでクラスタにデプロイする。環境はGKE。

$ cat Dockerfile
FROM golang:1.10 AS builder
ADD . /go/src/k8s.io/sample-controller
WORKDIR /go/src/k8s.io/sample-controller
RUN CGO_ENABLED=0 go build -o sample-controller .

FROM alpine
WORKDIR /
COPY --from=builder /go/src/k8s.io/sample-controller/sample-controller .
CMD ["/sample-controller"]
$ docker build -t asia.gcr.io/*****/sample-controller .
$ gcloud auth configure-docker
$ docker push asia.gcr.io/*****/sample-controller

RBACが有効になっている場合はAPIを呼ぶため必要なClusterRoleをControllerが動くPodに与える必要がある。

$ kubectl create clusterrolebinding cluster-admin-binding --clusterrole=cluster-admin --user=<email>
$ cat role.yaml 
kind: ClusterRole
apiVersion: rbac.authorization.k8s.io/v1beta1
metadata:
  name: sample-controller
rules:
- apiGroups: ["apps", "samplecontroller.k8s.io"]
  resources: ["*"]
  verbs: ["*"]
---
apiVersion: v1
kind: ServiceAccount
metadata:
  name: sample-controller
---
kind: ClusterRoleBinding
apiVersion: rbac.authorization.k8s.io/v1beta1
metadata:
  name: sample-controller
subjects:
- kind: ServiceAccount
  name: sample-controller
  namespace: default
roleRef:
  kind: ClusterRole
  name: sample-controller
  apiGroup: rbac.authorization.k8s.io

$ kubectl apply -f role.yaml 

初めClusterRoleではなくRoleにしたせいで以下のようなエラーが出て少し悩んだ。

k8s.io/sample-controller/pkg/client/informers/externalversions/factory.go:117: Failed to list *v1alpha1.Foo: foos.samplecontroller.k8s.io is forbidden: User "system:serviceaccount:default:sample-controller" cannot list foos.samplecontroller.k8s.io at the cluster scope: Unknown user "system:serviceaccount:default:sample-controller"

作ったServiceAccountをDeploymentのPodSpecに含める。

$ cat deployment.yaml
kind: Deployment
metadata:
  name: sample-controller
spec:
  template:
    metadata:
      labels:
        app: sample-controller
    spec:
      serviceAccount: sample-controller
      containers:
      - name: sample-controller
        image: asia.gcr.io/*****/sample-controller

$ kubectl apply -f deployment.yaml

動作

FooのCRDを作成してcreateするとたしかにdeploymentが作成された。 また、deploymentを削除すると新しいdeploymentが作成され、Fooを削除するとdeploymentも削除されることが確認できる。

$ kubectl create -f artifacts/examples/crd.yaml
$ kubectl create -f artifacts/examples/example-foo.yaml
$ kubectl get foo
NAME          AGE
example-foo   2s

$ kubectl get deployment
NAME          DESIRED   CURRENT   UP-TO-DATE   AVAILABLE   AGE
example-foo         1         1         1            1           5s
sample-controller   1         1         1            1           1h

$ kubectl delete deployment example-foo
deployment "example-foo" deleted

$ kubectl get deployment
NAME          DESIRED   CURRENT   UP-TO-DATE   AVAILABLE   AGE
example-foo         1         1         1            1           1s
sample-controller   1         1         1            1           1h

$ kubectl delete foo example-foo
$ kubectl get deployment
No resources found.

参考

Kubernetes Deep Dive: Code Generation for CustomResources – OpenShift Blog