Client-Go informer机制
在之前的文章中我们了解了client-go的使用,接下来我们深入的了解一下client-go中的informer机制.
client-go介绍
client-go包下主要有以下几个子包:
discovery:通过Kubernetes API 进行服务发现
dynamic:对任意Kubernetes对象执行通用操作的动态client
informers:一个非常牛逼的交互方式,通过reflector watch资源的事件放入队列(DeltaFIFO)中,通过sharedProcessor的pendingNotifications(buffer.RingGrowing)来分发事件到具体的ResourceEventHandler中的OnAdd/OnUpdate/OnDelete进行处理.
kubernetes:一组内置的clientset集合,在上一篇文章中我们都有用过
tools/clientcmd:提供了创建客户端的一些基础的工具
还有一些其他的包需要自行去了解.. 毕竟这个包里面内容还是比较多.
通过这些包,client-go为我们提供了4种方式来处理资源:
REST Client
提供rest的方式与kubernetes交互,提供的方法大多是基于rest协议的方法,例如 get,post,put,delete.
其特点如下:
1,支持json和protobuf
2,支持原生的资源和crd资源
ClientSet
client-go调用kubernetes的资源最基础也最实用的方式就是clientSet,并且在上一篇文章中也是使用此方式来调用.
clientset主要根据group,kind和version来获取资源,并提供了可选的namespace字段,其特点如下:
1,十分优雅
2,支持所有原生资源
3,提供了deepCopy,保证了序列化反序列化性能.
DynamicClient
dynamicClient是一种动态的client,实际为RestClient的包装,主要在garbage collector
和namespace controller
中有应用,主要关注通用的属性的值,特点如下:
1,支持所有资源
2,只支持json
3,返回值为map
informer
client-go中区别其他语言的client的一个较为牛逼的设计就在于此,其他语言很多都直接通过rest api同kubernetes交互,虽然在少量的情况下是没有问题的,但是在大量的连接下会出现各种问题,例如连接太多,状态不同步等等问题,client-go作为在kubernetes源码中都有很多应用的库,在解决这些问题上提供了一个高端的做法.
informer在初始化的时先通过reflector List去从Kubernetes API中取出资源的全部object对象,并同时缓存,然后开启Watch的机制去监控资源,并放入DeltaFIFO中.这样每次获取资源不需要再去kubernetes实时获取,而是通过本地的indexer缓存得到对象,整体连接资源的使用率都降低了不少. Informer还提供了handler机制,需要提供ResourceEventHandler接口的实现来响应OnAdd/OnUpdate/OnDelete事件,当然这其中也少不了功劳,不过这里使用了更为高级的DeltaFIFO+RingGrowing(环形队列)来实现.
informer提供了内置的原生的资源的支持,不过对于其他crd资源,需要使用kubernetes的另外一个项目 code-generator 来进行生成(其实kubernetes中的代码有很多生成的)
整体架构如下图:
informer使用
informer的使用方式也比较简单,主要通过NewSharedInformerFactory来获取一个实例.
//https://github.com/kubernetes/client-go/blob/master/examples/workqueue/main.go#L174
//https://github.com/kubernetes/sample-controller/blob/master/controller.go#L87:6
func TestInformer(t *testing.T) {
config, e := clientcmd.BuildConfigFromFlags("10.30.21.238:6443", "/home/tangxu/.kube/config")
if e != nil {
println(e.Error())
return
}
client, e := kubernetes.NewForConfig(config)
if e != nil {
println(e.Error())
return
}
stopChan := make(chan struct{})
factory := informers.NewSharedInformerFactory(client, 30*time.Second)
podInformer := factory.Core().V1().Pods()
factory.Apps().V1().Deployments()
//添加eventHandler
podInformer.Informer().AddEventHandler(&EventHandler{})
//使用lister方式获取pod资源
ret1, e := podInformer.Lister().List(labels.Nothing())
fmt.Println(ret1, e)
//也可以通过此方式获取informer
informer, e := factory.ForResource(schema.GroupVersionResource{
Group: "apps",
Version: "v1",
Resource: "deployments",
})
if e != nil {
println(e.Error())
}
//具体某个资源的group,version,resource的获取如下:
fmt.Println(appsv1.SchemeGroupVersion.WithResource("deployments"))
//获取列表
ret, e := informer.Lister().ByNamespace("kube-system").List(labels.Nothing())
fmt.Println(ret, e)
factory.Start(stopChan)
time.Sleep(5 * time.Minute)
stopChan <- EventHandler{}
}
type EventHandler struct {
}
func (*EventHandler) OnAdd(obj interface{}) {
fmt.Println("OnAdd")
bytes, _ := json.Marshal(obj)
fmt.Println(string(bytes))
}
func (*EventHandler) OnUpdate(oldObj, newObj interface{}) {
fmt.Println("OnUpdate")
bytes1, _ := json.Marshal(oldObj)
bytes2, _ := json.Marshal(newObj)
fmt.Println(string(bytes1))
fmt.Println(string(bytes2))
}
func (*EventHandler) OnDelete(obj interface{}) {
fmt.Println("OnDelete")
bytes, _ := json.Marshal(obj)
fmt.Println(string(bytes))
}
通过以上的示例大概能表达出informer的使用方法,如果需要深入的了解 还需要自己在编辑器中写代码来体会.
参照
Kubernetes Deep Dive: Code Generation for CustomResources (非常好的文章)