CRD

A CRD (CustomResourceDefinition) is a built-in API that provides a simple way to create custom resources. Deploying a CRD into the cluster makes the Kubernetes API server start serving the custom resource you specified.

This frees you from writing your own API server to handle the custom resource, but the generic nature of the implementation means you get less flexibility than with API server aggregation.

If you just want to add a resource to your cluster, consider using a CustomResourceDefinition (CRD): it requires less coding and reuses the existing API machinery. Read more about the differences between custom resources and extension API servers here.

Creating the CRD

cat <<EOF | kubectl create -f -
apiVersion: apiextensions.k8s.io/v1beta1
kind: CustomResourceDefinition
metadata:
  name: podtoips.example.com
spec:
  group: example.com
  version: v1
  scope: Namespaced
  names:
    plural: podtoips
    singular: podtoip
    kind: PodToIp
    shortNames:
    - ptp
EOF
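
Once the CRD is registered, the API server starts serving the new endpoint. As a quick sanity check you can look the definition up and create an instance by hand. The object below is only an illustrative sample (the name and IP are made up); the controller described next creates these objects automatically from running pods:

kubectl get crd podtoips.example.com

cat <<EOF | kubectl create -f -
apiVersion: example.com/v1
kind: PodToIp
metadata:
  name: example-pod        # made-up name, for illustration only
  namespace: default
podName: example-pod       # made-up value
podAddress: 10.0.0.1       # made-up value
EOF

kubectl get ptp -n default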

Controller logic

A controller typically has one or more informers that track a particular resource, keeping in touch with the API server and reflecting the latest state into a local cache. Whenever one of these resources changes, the informer invokes callbacks. The callbacks only do very simple pre-processing: they filter out changes we do not care about and put the objects we do care about into a workqueue. The real business logic lives in the workers; a controller usually starts many goroutines running workers to process the items in the workqueue. A worker computes how far the current state is from the state the user wants, and then sends requests to the API server through its clients to drive the cluster toward the desired state. In the diagram below, the blue parts are client-go components, and the red parts are the code you fill in when writing your own controller.

(controller workflow diagram: client-go components in blue, user code in red)

Below is a simple podtoip controller that I implemented.

package main

import (
	"flag"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/rest"
	"k8s.io/client-go/tools/clientcmd"
	"k8s.io/apimachinery/pkg/runtime/schema"
	"k8s.io/apimachinery/pkg/runtime/serializer"
	"k8s.io/apimachinery/pkg/runtime"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/tools/cache"
	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
	"k8s.io/client-go/util/workqueue"
	"time"
	v1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/fields"
	"fmt"
	"k8s.io/apimachinery/pkg/util/wait"
	"log"
	"k8s.io/client-go/kubernetes/scheme"
)
func init(){
	log.SetFlags(log.Lshortfile)
}
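// main parses the kubeconfig flag, builds the controller and runs it with two worker goroutines.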
func main(){
	kubeconfig := flag.String("kubeconfig", "./kubeconfig", "Path to a kube config. Only required if out-of-cluster.")
	flag.Parse()
	pic:=newPodipcontroller(*kubeconfig)
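	// A nil stop channel never closes, so the controller below runs until the process exits.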
	var stopCh <-chan struct{}
	pic.Run(2, stopCh)
}

type Podipcontroller struct {
	kubeClient *kubernetes.Clientset
	crdclient  *Podipclient

	podStore cache.Store
	//cache.AppendFunc
	podController cache.Controller
	podtoip        *PodToIp
	podsQueue      workqueue.RateLimitingInterface
}

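// Run starts the pod informer, waits for its cache to sync, then launches the worker goroutines and blocks until stopCh is closed.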
func (p2p *Podipcontroller) Run(workers int, stopCh <-chan struct{}) {
	defer utilruntime.HandleCrash()

	fmt.Println("Starting Controller")
	go p2p.podController.Run(stopCh)
	// Wait for the informer cache to sync before starting workers; this helps avoid churn during start-up.
	if !cache.WaitForCacheSync(stopCh, p2p.podController.HasSynced) {
		return
	}
	for i := 0; i < workers; i++ {
		go wait.Until(p2p.podWorker, time.Second, stopCh)
	}

	<-stopCh
	fmt.Printf("Shutting down Controller")
	p2p.podsQueue.ShutDown()
}

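// podWorker drains the workqueue: for each pod key it creates a matching PodToIp object, or deletes it when the pod is terminating.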
func (p2p *Podipcontroller) podWorker() {
	workFunc := func() bool {
		key, quit := p2p.podsQueue.Get()
		if quit {
			return true
		}
		defer p2p.podsQueue.Done(key)
		log.Println(key)
		obj, exists, err := p2p.podStore.GetByKey(key.(string))
		if err != nil {
			fmt.Printf("cannot get pod: %v\n", key)
			return false
		}
		if !exists {
			fmt.Printf("Pod has been deleted %v\n", key)
			return false
		}
		pod := obj.(*v1.Pod)
		if pod.DeletionTimestamp!=nil{
			log.Println(p2p.crdclient.Delete(pod.Name,pod.Namespace))
			return false
		}
		log.Println(p2p.crdclient.Create(&PodToIp{
			Metadata: metav1.ObjectMeta{
				Name: pod.ObjectMeta.Name,
				Namespace: pod.Namespace,
			},
			PodName:     pod.ObjectMeta.Name,
			PodAddress:  pod.Status.PodIP,
		}))
		return false
	}
	for {
		if quit := workFunc(); quit {
			fmt.Printf("pod worker shutting down")
			return
		}
	}
}
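// newPodipcontroller wires up the clients, the workqueue and a pod informer that enqueues pods on add and whenever their IP changes.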
func newPodipcontroller(kubeconfig string) *Podipcontroller{
	p2p:=&Podipcontroller{
		kubeClient:getClientsetOrDie(kubeconfig),
		crdclient: getCRDClientOrDie(kubeconfig),
		podsQueue:  workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "pods"),
	}
	watchList:=cache.NewListWatchFromClient(p2p.kubeClient.CoreV1().RESTClient(),"pods",v1.NamespaceAll,fields.Everything())
	p2p.podStore,p2p.podController=cache.NewInformer(
		watchList,
		&v1.Pod{},
		time.Second*30,
		cache.ResourceEventHandlerFuncs{
			AddFunc: p2p.enqueuePod,
			UpdateFunc: p2p.updatePod,
		},
	)
	return p2p
}

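// enqueuePod converts the object into a cache key and adds it to the workqueue.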
func (p2p *Podipcontroller) enqueuePod(obj interface{}) {
	key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
	if err != nil {
		fmt.Printf("Couldn't get key for object %+v: %v", obj, err)
		return
	}
	p2p.podsQueue.Add(key)
}

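// updatePod re-enqueues a pod only when its IP has changed, filtering out updates we do not care about.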
func (p2p *Podipcontroller) updatePod(oldObj, newObj interface{}) {
	oldPod := oldObj.(*v1.Pod)
	newPod := newObj.(*v1.Pod)

	if newPod.Status.PodIP == oldPod.Status.PodIP {
		return
	}
	p2p.enqueuePod(newObj)
}

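// Podipclient is a thin CRUD client for the podtoips custom resource, built on the generic RESTClient.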
type Podipclient struct {
	rest *rest.RESTClient
}

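// PodToIp is the custom resource served by the podtoips CRD; it records a pod's name and IP address.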
type PodToIp struct {
	metav1.TypeMeta `json:",inline"`
	Metadata        metav1.ObjectMeta `json:"metadata"`

	PodName     string    `json:"podName"`
	PodAddress  string    `json:"podAddress"`
}
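// getClientsetOrDie builds a standard Kubernetes clientset from the given kubeconfig, panicking on failure.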
func getClientsetOrDie(kubeconfig string) *kubernetes.Clientset {
	// Create the client config. Use kubeconfig if given, otherwise assume in-cluster.
	config, err := clientcmd.BuildConfigFromFlags("", kubeconfig)
	if err != nil {
		panic(err)
	}

	clientset, err := kubernetes.NewForConfig(config)
	if err != nil {
		panic(err)
	}
	return clientset
}


func (c *Podipclient) Create(body *PodToIp) (*PodToIp, error) {
	var ret PodToIp
	err := c.rest.Post().
		Resource("podtoips").
		Namespace(body.Metadata.Namespace).
		Body(body).
		Do().Into(&ret)
	return &ret, err
}

func (c *Podipclient) Update(body *PodToIp) (*PodToIp, error) {
	var ret PodToIp
	err := c.rest.Put().
		Resource("podtoips").
		Namespace(body.Metadata.Namespace).
		Name(body.Metadata.Name).
		Body(body).
		Do().Into(&ret)
	return &ret, err
}

func (c *Podipclient) Get(name string,namespace string) (*PodToIp, error) {
	var ret PodToIp
	err := c.rest.Get().
		Resource("podtoips").
		Namespace(namespace).
		Name(name).
		Do().Into(&ret)
	return &ret, err
}

func (c *Podipclient) Delete(name string,namespace string) (*PodToIp, error) {
	var ret PodToIp
	err := c.rest.Delete().
		Resource("podtoips").
		Namespace(namespace).
		Name(name).
		Do().Into(&ret)
	return &ret, err
}

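// getCRDClientOrDie builds a RESTClient for the example.com/v1 API group, panicking on failure.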
func getCRDClientOrDie(kubeconfig string) *Podipclient {
	config, err := clientcmd.BuildConfigFromFlags("", kubeconfig)
	if err != nil {
		panic(err)
	}
	configureClient(config)
	restClient, err := rest.RESTClientFor(config)
	if err != nil {
		panic(err)
	}
	return &Podipclient{restClient}
}

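// configureClient points the rest.Config at the example.com/v1 group under /apis and registers the PodToIp types with the client-go scheme.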
func configureClient(config *rest.Config) {
	groupversion := schema.GroupVersion{
		Group:   "example.com",
		Version: "v1",
	}

	config.GroupVersion = &groupversion
	config.APIPath = "/apis"
	// Custom resources do not support protobuf, so use the JSON content type.
	config.ContentType = runtime.ContentTypeJSON
	config.NegotiatedSerializer = serializer.DirectCodecFactory{CodecFactory: scheme.Codecs}

	schemeBuilder := runtime.NewSchemeBuilder(
		func(scheme *runtime.Scheme) error {
			scheme.AddKnownTypes(
				groupversion,
				&PodToIp{},
				&PodToIpList{},
				&metav1.ListOptions{},
				&metav1.DeleteOptions{},
			)
			return nil
		})
	if err := schemeBuilder.AddToScheme(scheme.Scheme); err != nil {
		panic(err)
	}
}

type PodToIpList struct {
	metav1.TypeMeta `json:",inline"`
	Metadata        metav1.ListMeta `json:"metadata"`

	Items []PodToIp `json:"items"`
}


func (in *PodToIp) DeepCopy() *PodToIp {
	if in == nil {
		return nil
	}
	out := new(PodToIp)
	in.DeepCopyInto(out)
	return out
}

// DeepCopyObject copies the receiver, creating a new runtime.Object; it makes PodToIp satisfy the runtime.Object interface.
func (in *PodToIp) DeepCopyObject() runtime.Object {
	if c := in.DeepCopy(); c != nil {
		return c
	} else {
		return nil
	}
}

// DeepCopyInto copies the receiver into out. This hand-written version is a shallow copy, which is enough for this example. in must be non-nil.
func (in *PodToIp) DeepCopyInto(out *PodToIp) {
	*out = *in
	return
}


func (in *PodToIpList) DeepCopy() *PodToIpList {
	if in == nil {
		return nil
	}
	out := new(PodToIpList)
	in.DeepCopyInto(out)
	return out
}

// DeepCopyObject copies the receiver, creating a new runtime.Object; it makes PodToIpList satisfy the runtime.Object interface.
func (in *PodToIpList) DeepCopyObject() runtime.Object {
	if c := in.DeepCopy(); c != nil {
		return c
	} else {
		return nil
	}
}

// DeepCopyInto copies the receiver into out. This hand-written version is a shallow copy, which is enough for this example. in must be non-nil.
func (in *PodToIpList) DeepCopyInto(out *PodToIpList) {
	*out = *in
	return
}
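
To try the controller out, a rough way to run it (assuming the code above is saved as main.go and you have a kubeconfig for the target cluster; the paths here are illustrative) is:

go run main.go -kubeconfig ~/.kube/config

# In another terminal, every running pod should get a matching object:
kubectl get podtoips --all-namespaces
kubectl get ptp -o yaml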