K8sではDeploymentを作成したときにReplicaSetも作成されるようにしたり、 Load Balancer Serviceを作成したときにGCPなどその環境に応じたLoad Balancerも作成されるようにしたりするため、Controllerがそれらを監視してAPIを呼んでいる。
GKEでのService(ClusterIP/NodePort/LoadBalancer)とIngress - sambaiz-net
Controllerは単なるAPIを呼ぶアプリケーションなので自分でCustom Controllerを作成してDeploymentとしてデプロイすることもできる。 また、監視する対象もpodsやdeploymentsといった標準のAPIだけではなく、 Custom Resource で拡張したものを使うことができる。
特定のアプリケーションのためのControllerはOperatorとも呼ばれる。
CustomResourceDefinition(CRD)
Custom Resourceを定義する。
apiVersion: apiextensions.k8s.io/v1beta1
kind: CustomResourceDefinition
metadata:
name: crontabs.stable.example.com
spec:
# REST APIで使われる /apis/<group>/<version>
group: stable.example.com
version: v1
# Namespaced か Cluster
scope: Namespaced
names:
# 複数形 URLに使われる /apis/<group>/<version>/<plural>
plural: crontabs
# 単数形 CLIなどで使われる
singular: crontab
# manifestで使う
kind: CronTab
shortNames:
- ct
$ kubectl create -f crd.yaml
$ kubectl get crd
NAME AGE
crontabs.stable.example.com 8s
標準のresourceと同様にKindに指定してcreateできる。
$ cat crontab.yaml
apiVersion: stable.example.com/v1
kind: CronTab
metadata:
name: test
$ kubectl create -f crontab.yaml
$ kubectl get crontab
NAME AGE
test 10s
Client
Controllerで使うCustom ResourceのClientを準備する。
生成
まず、以下のようにファイルを作成する。
$ tree pkg
pkg
└── apis
└── mysamplecontroller
├── register.go
└── v1
├── doc.go
├── register.go
└── types.go
doc.go
はpackage指定のみ、types.go
にはCustom Resourceのstructを書いてcode-generator用のタグを付けている。register.goはsampleからほとんどコピー。
// +k8s:deepcopy-gen=package,register
// +groupName=example.com
package v1
package v1
import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
// +genclient
// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
type SampleResource struct {
metav1.TypeMeta `json:",inline"`
metav1.ObjectMeta `json:"metadata,omitempty"`
Spec SampleResourceSpec `json:"spec"`
}
type SampleResourceSpec struct {
PodImage string `json:"podImage"`
}
code-generatorを持ってきてgenerate-groups.sh
を実行すると残りのファイルが生成される。
$ cat Makefile
.PHONY: codegen
codegen:
${GOPATH}/src/k8s.io/code-generator/generate-groups.sh "deepcopy,client,informer,lister" \
github.com/sambaiz/k8s-sample-crd-controller/pkg/client github.com/sambaiz/k8s-sample-crd-controller/pkg/apis \
mysamplecontroller:v1
$ make codegen
zz_generated.deepcopy.go
でCustom ResourceのstructにDeepCopyObject()
関数を追加し、
runtime.Object interfaceを満たすようにしている。
$ tree pkg
pkg
├── apis
│ └── mysamplecontroller
│ ├── register.go
│ └── v1
│ ├── doc.go
│ ├── register.go
│ ├── types.go
│ └── zz_generated.deepcopy.go
└── client
Controller
公式のsample-controllerを見ていく。
main.go
Clientを作る。
import (
"k8s.io/client-go/kubernetes"
clientset "k8s.io/sample-controller/pkg/client/clientset/versioned"
)
cfg, err := clientcmd.BuildConfigFromFlags(masterURL, kubeconfig)
kubeClient, err := kubernetes.NewForConfig(cfg)
exampleClient, err := clientset.NewForConfig(cfg)
次にClientからdeploymentsとCustom ResourceであるfoosのInformerを作ってControllerに渡す。Informerは変更があったObjectが入るDeltaFifoQueueを監視して、Event Handlerを呼ぶもの。 ちなみにK8sのAPIを監視してDeltaFifoQueueに入れるのはReflectorがやる。
import (
kubeinformers "k8s.io/client-go/informers"
informers "k8s.io/sample-controller/pkg/client/informers/externalversions"
)
kubeInformerFactory := kubeinformers.NewSharedInformerFactory(kubeClient, time.Second*30)
exampleInformerFactory := informers.NewSharedInformerFactory(exampleClient, time.Second*30)
controller := NewController(kubeClient, exampleClient,
kubeInformerFactory.Apps().V1().Deployments(),
exampleInformerFactory.Samplecontroller().V1alpha1().Foos())
go kubeInformerFactory.Start(stopCh)
go exampleInformerFactory.Start(stopCh)
controller.go
func NewController(
kubeclientset kubernetes.Interface,
sampleclientset clientset.Interface,
deploymentInformer appsinformers.DeploymentInformer,
fooInformer informers.FooInformer) *Controller {
...
controller := &Controller{
kubeclientset: kubeclientset,
sampleclientset: sampleclientset,
deploymentsLister: deploymentInformer.Lister(),
deploymentsSynced: deploymentInformer.Informer().HasSynced,
foosLister: fooInformer.Lister(),
foosSynced: fooInformer.Informer().HasSynced,
workqueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "Foos"),
recorder: recorder,
}
...
}
EventHandlerを登録する。fooInformerの方は"namespace/name"の文字列をControllerのWorkququeに入れる。 deploymentInformerの方はGetControllerOf()でOwnerReferenceを見て、それがFooならInformerでObjectを取得し同様の処理を行う。
fooInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: controller.enqueueFoo,
UpdateFunc: func(old, new interface{}) {
controller.enqueueFoo(new)
},
})
deploymentInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: controller.handleObject,
UpdateFunc: func(old, new interface{}) {
newDepl := new.(*appsv1.Deployment)
oldDepl := old.(*appsv1.Deployment)
if newDepl.ResourceVersion == oldDepl.ResourceVersion {
return
}
controller.handleObject(new)
},
DeleteFunc: controller.handleObject,
})
func (c *Controller) enqueueFoo(obj interface{}) {
var key string
var err error
if key, err = cache.MetaNamespaceKeyFunc(obj); err != nil {
runtime.HandleError(err)
return
}
c.workqueue.AddRateLimited(key)
}
func (c *Controller) handleObject(obj interface{}) {
var object metav1.Object
var ok bool
if object, ok = obj.(metav1.Object); !ok {
tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
if !ok {
runtime.HandleError(fmt.Errorf("error decoding object, invalid type"))
return
}
object, ok = tombstone.Obj.(metav1.Object)
if !ok {
runtime.HandleError(fmt.Errorf("error decoding object tombstone, invalid type"))
return
}
glog.V(4).Infof("Recovered deleted object '%s' from tombstone", object.GetName())
}
glog.V(4).Infof("Processing object: %s", object.GetName())
if ownerRef := metav1.GetControllerOf(object); ownerRef != nil {
if ownerRef.Kind != "Foo" {
return
}
foo, err := c.foosLister.Foos(object.GetNamespace()).Get(ownerRef.Name)
if err != nil {
glog.V(4).Infof("ignoring orphaned object '%s' of foo '%s'", object.GetSelfLink(), ownerRef.Name)
return
}
c.enqueueFoo(foo)
return
}
}
Workqueueから取り出して処理を行う。
obj, shutdown := c.workqueue.Get()
err := func(obj interface{}) error {
defer c.workqueue.Done(obj)
if key, ok = obj.(string); !ok {
c.workqueue.Forget(obj)
runtime.HandleError(fmt.Errorf("expected string in workqueue but got %#v", obj))
return nil
}
if err := c.syncHandler(key); err != nil {
return fmt.Errorf("error syncing '%s': %s", key, err.Error())
}
c.workqueue.Forget(obj)
glog.Infof("Successfully synced '%s'", key)
return nil
}(obj)
fooを取得し、specのdeploymentNameのdeploymentをclientで生成する。OwnerReferenceを含んでいる。 ここでは省略しているが、既に存在してコントロール下にない場合などのチェックもしている。
namespace, name, err := cache.SplitMetaNamespaceKey(key)
foo, err := c.foosLister.Foos(namespace).Get(name)
deploymentName := foo.Spec.DeploymentName
deployment, err := c.deploymentsLister.Deployments(foo.Namespace).Get(deploymentName)
if errors.IsNotFound(err) {
deployment, err = c.kubeclientset.AppsV1().Deployments(foo.Namespace).Create(newDeployment(foo))
}
func newDeployment(foo *samplev1alpha1.Foo) *appsv1.Deployment {
labels := map[string]string{
"app": "nginx",
"controller": foo.Name,
}
return &appsv1.Deployment{
ObjectMeta: metav1.ObjectMeta{
Name: foo.Spec.DeploymentName,
Namespace: foo.Namespace,
OwnerReferences: []metav1.OwnerReference{
*metav1.NewControllerRef(foo, schema.GroupVersionKind{
Group: samplev1alpha1.SchemeGroupVersion.Group,
Version: samplev1alpha1.SchemeGroupVersion.Version,
Kind: "Foo",
}),
},
},
Spec: appsv1.DeploymentSpec{
Replicas: foo.Spec.Replicas,
Selector: &metav1.LabelSelector{
MatchLabels: labels,
},
Template: corev1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Labels: labels,
},
Spec: corev1.PodSpec{
Containers: []corev1.Container{
{
Name: "nginx",
Image: "nginx:latest",
},
},
},
},
},
}
}
デプロイ
ローカルで動かすこともできるが、せっかくなのでクラスタにデプロイする。環境はGKE。
$ cat Dockerfile
FROM golang:1.10 AS builder
ADD . /go/src/k8s.io/sample-controller
WORKDIR /go/src/k8s.io/sample-controller
RUN CGO_ENABLED=0 go build -o sample-controller .
FROM alpine
WORKDIR /
COPY --from=builder /go/src/k8s.io/sample-controller/sample-controller .
CMD ["/sample-controller"]
$ docker build -t asia.gcr.io/*****/sample-controller .
$ gcloud auth configure-docker
$ docker push asia.gcr.io/*****/sample-controller
RBACが有効になっている場合はAPIを呼ぶため必要なClusterRoleをControllerが動くPodに与える必要がある。
$ kubectl create clusterrolebinding cluster-admin-binding --clusterrole=cluster-admin --user=<email>
$ cat role.yaml
kind: ClusterRole
apiVersion: rbac.authorization.k8s.io/v1beta1
metadata:
name: sample-controller
rules:
- apiGroups: ["apps", "samplecontroller.k8s.io"]
resources: ["*"]
verbs: ["*"]
---
apiVersion: v1
kind: ServiceAccount
metadata:
name: sample-controller
---
kind: ClusterRoleBinding
apiVersion: rbac.authorization.k8s.io/v1beta1
metadata:
name: sample-controller
subjects:
- kind: ServiceAccount
name: sample-controller
namespace: default
roleRef:
kind: ClusterRole
name: sample-controller
apiGroup: rbac.authorization.k8s.io
$ kubectl apply -f role.yaml
初めClusterRoleではなくRoleにしたせいで以下のようなエラーが出て少し悩んだ。
k8s.io/sample-controller/pkg/client/informers/externalversions/factory.go:117: Failed to list *v1alpha1.Foo: foos.samplecontroller.k8s.io is forbidden: User "system:serviceaccount:default:sample-controller" cannot list foos.samplecontroller.k8s.io at the cluster scope: Unknown user "system:serviceaccount:default:sample-controller"
作ったServiceAccountをDeploymentのPodSpecに含める。
$ cat deployment.yaml
kind: Deployment
metadata:
name: sample-controller
spec:
template:
metadata:
labels:
app: sample-controller
spec:
serviceAccount: sample-controller
containers:
- name: sample-controller
image: asia.gcr.io/*****/sample-controller
$ kubectl apply -f deployment.yaml
動作
FooのCRDを作成してcreateするとたしかにdeploymentが作成された。 また、deploymentを削除すると新しいdeploymentが作成され、Fooを削除するとdeploymentも削除されることが確認できる。
$ kubectl create -f artifacts/examples/crd.yaml
$ kubectl create -f artifacts/examples/example-foo.yaml
$ kubectl get foo
NAME AGE
example-foo 2s
$ kubectl get deployment
NAME DESIRED CURRENT UP-TO-DATE AVAILABLE AGE
example-foo 1 1 1 1 5s
sample-controller 1 1 1 1 1h
$ kubectl delete deployment example-foo
deployment "example-foo" deleted
$ kubectl get deployment
NAME DESIRED CURRENT UP-TO-DATE AVAILABLE AGE
example-foo 1 1 1 1 1s
sample-controller 1 1 1 1 1h
$ kubectl delete foo example-foo
$ kubectl get deployment
No resources found.
参考
Kubernetes Deep Dive: Code Generation for CustomResources – OpenShift Blog