目录

K8S go-client 源码分析

K8S go-client 源码分析,k8s controller开发所需

本文参考了CSDN博主「进德」的原创文章,原文链接:https://blog.csdn.net/weixin_42663840/article/details/81699303, 并做了部分内容和注释说明修改:

Client-go

client-go 设计

可以看到client-go主要模块有:

  • client
    • restclient
    • clientset
    • dynamicclient
    • discoveryclient
  • informer
    • reflactor
    • deltafifo
    • indexer
  • workqueue

说明:client和workqueue源码分析不在本文中介绍。

设计图

由上可看出,进行k8s controller开发时,对以下几点进行定制开发:
  • informer的resource event handler处理
  • 对workqueue的入队和出队操作处理
  • 如果业务需要,对resource的本地存储操作
  • controller自身的业务层逻辑处理
informer类图

Informer

这里介绍常用的SharedInformer

不难看出Shared指的是多个listeners共享同一个cache,而且资源的变化会同时通知到cache和listeners。这个解释和上面图所展示的内容的是一致的,cache我们在Indexer的介绍中已经分析过了,listerners指的就是OnAdd、OnUpdate、OnDelete这些回调函数背后的对象,本文就要对Informer进行系统性的分析。我们先对上面的图做一些初步的认识:

  • List/Watch:List是列举apiserver中对象的接口,Watch是监控apiserver资源变化的接口;
  • Reflector:反射器,实现对apiserver指定类型对象的监控,其中反射实现的就是把监控的结果实例化成具体的对象;
  • DeltaIFIFO:将Reflector监控的变化的对象形成一个FIFO队列,此处的Delta就是变化
  • LocalStore:指的就是Indexer的实现cache,这里面缓存的就是apiserver中的对象(其中有一部分可能还在DeltaFIFO中),此时使用者再查询对象的时候就直接从cache中查找,减少了apiserver的压力;
  • Callbacks:通知回调函数,Infomer感知的所有对象变化都是通过回调函数通知使用者(Listener),即event_handler处理;

ListerWatcher

ListerWatcher是一个interface类型 client-go/tools/cache/listwatch.go

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
// ListerWatcher is any object that knows how to perform an initial list and start a watch on a resource.
type ListerWatcher interface {
    // List should return a list type object; the Items field will be extracted, and the
    // ResourceVersion field will be used to start the watch in the right place.
    List(options metav1.ListOptions) (runtime.Object, error)
    // Watch should begin a watch at the specified version.
    Watch(options metav1.ListOptions) (watch.Interface, error)
}

// ListFunc knows how to list resources
type ListFunc func(options metav1.ListOptions) (runtime.Object, error)

// WatchFunc knows how to watch resources
type WatchFunc func(options metav1.ListOptions) (watch.Interface, error)

// ListWatch knows how to list and watch a set of apiserver resources.  It satisfies the ListerWatcher interface.
// It is a convenience function for users of NewReflector, etc.
// ListFunc and WatchFunc must not be nil
type ListWatch struct {
    ListFunc  ListFunc
    WatchFunc WatchFunc
    // DisableChunking requests no chunking for this list watcher. It has no effect in Kubernetes 1.8, but in
    // 1.9 will allow a controller to opt out of chunking.
    DisableChunking bool
}

需要注意一点:ListerWatcher是针对某一类对象的,比如Pod,不是所有对象的,这个在构造ListerWatcher对象的时候由apiserver的client类型决定了。

Reflector

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
// 代码源自client-go/tools/cache/reflector.go
type Reflector struct {
    name string                                 // 名字
    metrics *reflectorMetrics                   // 但凡遇到metrics多半是用于做监控的,可以忽略
    expectedType reflect.Type                   // 反射的类型,也就是要监控的对象类型,比如Pod
    store Store                                 // 存储,就是DeltaFIFO,为什么,后面会有代码证明
    listerWatcher ListerWatcher                 // 这个是用来从apiserver获取资源用的
    period       time.Duration                  // 反射器在List和Watch的时候理论上是死循环,只有出现错误才会退出
                                                // 这个变量用在出错后多长时间再执行List和Watch,默认值是1秒钟
    resyncPeriod time.Duration                  // 重新同步的周期,很多人肯定认为这个同步周期指的是从apiserver的同步周期
                                                // 其实这里面同步指的是shared_informer使用者需要定期同步全量对象
    ShouldResync func() bool                    // 如果需要同步,调用这个函数问一下,当然前提是该函数指针不为空
    clock clock.Clock                           // 时钟
    lastSyncResourceVersion string              // 最后一次同步的资源版本
    lastSyncResourceVersionMutex sync.RWMutex   // 还专门为最后一次同步的资源版本弄了个锁
}

  1. listerWatcher用于获取和监控资源,lister可以获取对象的全量,watcher可以获取对象的增量(变化);
  2. 系统会周期性的执行list-watch的流程,一旦过程中失败就要重新执行流程,这个重新执行的周期就是period指定的;
  3. expectedType规定了监控对象的类型,非此类型的对象将会被忽略;
  4. 实例化后的expectedType类型的对象会被添加到store中;
  5. kubernetes资源在apiserver中都是有版本的,对象的任何除了修改(添加、删除、更新)都会造成资源版本更新,所以lastSyncResourceVersion就是指的这个版本;
  6. 如果使用者需要定期同步全量对象,那么Reflector就会定期产生全量对象的同步事件给DeltaFIFO;

Run

Reflector有一个Run()函数,这个是Reflector的核心功能流程

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
// 代码源自client-go/tools/cache/reflector.go
func (r *Reflector) Run(stopCh <-chan struct{}) {
    // func Until(f func(), period time.Duration, stopCh <-chan struct{})是下面函数的声明
    // 这里面我们不用关心wait.Until是如何实现的,只要知道他调用函数f会被每period周期执行一次
    // 意思就是f()函数执行完毕再等period时间后在执行一次,也就是r.ListAndWatch()会被周期性的调用
    wait.Until(func() {
        if err := r.ListAndWatch(stopCh); err != nil {
            utilruntime.HandleError(err)
        }
    }, r.period, stopCh)
}

ListAndWatch

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147

// 代码源自client-go/tools/cache/reflector.go
func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
    var resourceVersion string
    // 很多存储类的系统都是这样设计的,数据采用版本的方式记录,数据每变化(添加、删除、更新)都会触发版本更新,
    // 这样的做法可以避免全量数据访问。以apiserver资源监控为例,只要监控比缓存中资源版本大的对象就可以了,
    // 把变化的部分更新到缓存中就可以达到与apiserver一致的效果,一般资源的初始版本为0,从0版本开始列举就是全量的对象了
    // 1. 从版本号0 开始list,List()可以从缓存中提供,并且可能会相对于etcd内容延迟。
    // 最终,由watch进行监听更新
    options := metav1.ListOptions{ResourceVersion: "0"}
    // 与监控相关的内容不多解释
    r.metrics.numberOfLists.Inc()
    start := r.clock.Now()
    // 列举资源,这部分是apimachery相关的内容,读者感兴趣可以自己了解
    list, err := r.listerWatcher.List(options)
    if err != nil {
        return fmt.Errorf("%s: Failed to list %v: %v", r.name, r.expectedType, err)
    }
    // 还是监控相关的
    r.metrics.listDuration.Observe(time.Since(start).Seconds())
    // 下面的代码主要是利用apimachinery相关的函数实现,就是把列举返回的结果转换为对象数组
    // 下面的代码大部分来自apimachinery,此处不做过多说明,读者只要知道实现什么功能就行了
    listMetaInterface, err := meta.ListAccessor(list)
    if err != nil {
        return fmt.Errorf("%s: Unable to understand list result %#v: %v", r.name, list, err)
    }
    // 2. 根据listMeta ResourceVersion的版本号,决定从哪里开始watch
    resourceVersion = listMetaInterface.GetResourceVersion()
    // 3. 将资源数据转换成资源列表 runtime.Object -> []runtime.Object
    items, err := meta.ExtractList(list)
    if err != nil {
        return fmt.Errorf("%s: Unable to understand list result %#v (%v)", r.name, list, err)
    }
    // 和监控相关的内容
    r.metrics.numberOfItemsInList.Observe(float64(len(items)))
    // 以上部分都是对象实例化的过程,可以称之为反射,也是Reflector这个名字的主要来源,本文不是讲解反射原理的,
    // 而是作为SharedInformer的前端,所以我们重点介绍的是对象在SharedInformer中流转过程,所以反射原理部分不做为重点讲解
    // 这可是真正从apiserver同步过来的全量对象,所以要同步到DeltaFIFO中
    // 4. 将资源对象列表和资源版本号 以r.store.Replace方式存储到DeltaFIFO中
    if err := r.syncWith(items, resourceVersion); err != nil {
        return fmt.Errorf("%s: Unable to sync list result: %v", r.name, err)
    }
    // 设置最新的同步的对象版本
    // 5. 设置最新的资源版本号
    r.setLastSyncResourceVersion(resourceVersion)
    // 下面要启动一个后台协程实现定期的同步操作,这个同步就是将SharedInformer里面的对象全量以同步事件的方式通知使用者
    // 我们暂且称之为“后台同步协程”,Run()函数退出需要后台同步协程退出,所以下面的cancelCh就是干这个用的
    // 利用defer close(cancelCh)实现的,而resyncerrc是后台同步协程反向通知Run()函数的报错通道
    // 当后台同步协程出错,Run()函数接收到信号就可以退出了
    resyncerrc := make(chan error, 1)
    cancelCh := make(chan struct{})
    defer close(cancelCh)
    // 下面这个匿名函数就是后台同步协程的函数了
    // 6. 开启Resync协程: 根据ShouldResync标识,重新同步处理store.Resync()
    // 这个同步Resync,不是从apiserver中获取,而是从indexer cache的resync处理
    go func() {
        // resyncCh返回的就是一个定时器,如果resyncPeriod这个为0那么就会返回一个永久定时器,cleanup函数是用来清理定时器的
        resyncCh, cleanup := r.resyncChan()
        defer func() {
            cleanup() 
        }()
        // 死循环等待各种信号
        for {
            // 只有定时器有信号才继续处理,其他的都会退出
            // 注意case <-resyncCh 定时同步分支,没有return,会继续执行r.store.Resync()处理
            select {
            case <-resyncCh:
            case <-stopCh:
                return
            case <-cancelCh:
                return
            }
            // ShouldResync是个函数地址,创建反射器对象的时候传入,即便时间到了,也要通过函数问问是否需要同步
            if r.ShouldResync == nil || r.ShouldResync() {
                // 我们知道这个store是DeltaFIFO,DeltaFIFO.Resync(): 把knownObjects的objs同步更新到delta_fifo中
                // 就在这里实现了我们前面提到的同步,从这里看所谓的同步就是以全量对象同步事件的方式通知使用者
                if err := r.store.Resync(); err != nil {
                    resyncerrc <- err
                    return
                }
            }
            // 清理掉当前的计时器,获取下一个同步时间定时器
            cleanup()
            resyncCh, cleanup = r.resyncChan()
        }
    }()
 
    // 前面已经列举了全量对象,接下来就是watch的逻辑了
    // 7. 从 资源版本号resourceVersion 开始进行watch处理
    for {
        // 如果有退出信号就立刻返回,否则就会往下走,因为有default.
        select {
        case <-stopCh:
            return nil
        default:
        }
 
        // 计算watch的超时时间
        timeoutSeconds := int64(minWatchTimeout.Seconds() * (rand.Float64() + 1.0))
        // 设置watch的选项,因为前期列举了全量对象,从这里只要监听最新版本以后的资源就可以了
        // 如果没有资源变化总不能一直挂着吧?也不知道是卡死了还是怎么了,所以有一个超时会好一点
        options = metav1.ListOptions{
            ResourceVersion: resourceVersion,
            TimeoutSeconds: &timeoutSeconds,
        }
        // 监控相关
        r.metrics.numberOfWatches.Inc()
        // 开始监控对象
        // 7.1 watch函数阻塞处理,返回w,即监控事件流ResultChan
        w, err := r.listerWatcher.Watch(options)
        // watch产生错误了,大部分错误就要退出函数然后再重新来一遍流程
        if err != nil {
            switch err {
            case io.EOF:
            case io.ErrUnexpectedEOF:
            default:
                utilruntime.HandleError(fmt.Errorf("%s: Failed to watch %v: %v", r.name, r.expectedType, err))
            }
            // 类似于网络拒绝连接的错误要等一会儿再试,因为可能网络繁忙
            if urlError, ok := err.(*url.Error); ok {
                if opError, ok := urlError.Err.(*net.OpError); ok {
                    if errno, ok := opError.Err.(syscall.Errno); ok && errno == syscall.ECONNREFUSED {
                        time.Sleep(time.Second)
                        continue
                    }
                }
            }
            return nil
        }
 
        // watch返回是流,apiserver会将变化的资源通过这个流发送出来,client-go最终通过chan实现的
        // 所以watchHandler()是一个需要持续从chan读取数据的流程,所以需要传入resyncerrc和stopCh
        // 用于异步通知退出或者后台同步协程错误
        // 7.2 watch 事件处理
        //  Added    EventType = "ADDED"
        //  Modified EventType = "MODIFIED"
        //  Deleted  EventType = "DELETED"
        //  Error    EventType = "ERROR"
        if err := r.watchHandler(w, &resourceVersion, resyncerrc, stopCh); err != nil {
            if err != errorStopRequested {
                glog.Warningf("%s: watch of %v ended with: %v", r.name, r.expectedType, err)
            }
            return nil
        }
    }
}

上面的函数中,调用了两个私有函数,分别为syncWith()和watchHandler()。syncWith()用于实现一次从apiserver全量对象的同步,这里的同步和我们上面提到的同步不是一回事,这里指的是从apiserver的同步。watchHandler是实现监控apiserver资源变化的处理过程,主要就是把apiserver的资源变化转换为DeltaFIFO调用。我们接下来就看这两个函数的具体实现

接下来我们就要看看watchHandler做了什么?

syncWith和watchHandler

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96

// 代码源自client-go/tools/cache/reflector.go
// 实现apiserver全量对象的同步
func (r *Reflector) syncWith(items []runtime.Object, resourceVersion string) error {
    // 做一次slice类型转换
    found := make([]interface{}, 0, len(items))
    for _, item := range items {
        found = append(found, item)
    }
    // 直接调用了DeltaFIFO的Replace()接口,这个接口就是用于同步全量对象的
    return r.store.Replace(found, resourceVersion)
}
// 实现从watch返回的chan中持续读取变化的资源,并转换为DeltaFIFO相应的调用
func (r *Reflector) watchHandler(w watch.Interface, resourceVersion *string, errc chan error, stopCh <-chan struct{}) error {
    start := r.clock.Now()
    eventCount := 0
    // 监控相关
    defer func() {
        r.metrics.numberOfItemsInWatch.Observe(float64(eventCount))
        r.metrics.watchDuration.Observe(time.Since(start).Seconds())
    }()
 
    // 这里就开始无限循环的从chan中读取资源的变化,也可以理解为资源的增量变化,同时还要监控各种信号
loop:
    for {
        select {
        // 系统退出信号
        case <-stopCh:
            return errorStopRequested
        // 后台同步协程出错信号
        case err := <-errc:
            return err
        // watch函数返回的是一个chan,通过这个chan持续的读取对象
        case event, ok := <-w.ResultChan():
            // 如果不OK,说明chan关闭了,就要重新获取,这里面我们可以推测这个chan可能会运行过程中重新创建
            // 否则就应该退出而不是继续循环
            if !ok {
                break loop
            }
            // 看来event可以作为错误的返回值,挺有意思,而不是通过关闭chan,这种方式可以传递错误信息,关闭chan做不到
            if event.Type == watch.Error {
                return apierrs.FromObject(event.Object)
            }
            // 这里面就是利用反射实例化对象了,而且判断了对象类型是我们设定的类型
            if e, a := r.expectedType, reflect.TypeOf(event.Object); e != nil && e != a {
                utilruntime.HandleError(fmt.Errorf("%s: expected type %v, but watch event object had type %v", r.name, e, a))
                continue
            }
            // 和list操作相似,也要获取对象的版本,要更新缓存中的版本,下次watch就可以忽略这些资源了
            meta, err := meta.Accessor(event.Object)
            if err != nil {
                utilruntime.HandleError(fmt.Errorf("%s: unable to understand watch event %#v", r.name, event))
                continue
            }
            newResourceVersion := meta.GetResourceVersion()
            // 根据事件的类型做不同的DeltaFIFO的操作
            switch event.Type {
            // 向DeltaFIFO添加一个添加的Delta
            case watch.Added:
                err := r.store.Add(event.Object)
                if err != nil {
                    utilruntime.HandleError(fmt.Errorf("%s: unable to add watch event object (%#v) to store: %v", r.name, event.Object, err))
                }
            // 更新对象,向DeltaFIFO添加一个更新的Delta
            case watch.Modified:
                err := r.store.Update(event.Object)
                if err != nil {
                    utilruntime.HandleError(fmt.Errorf("%s: unable to update watch event object (%#v) to store: %v", r.name, event.Object, err))
            }
            // 删除对象,向DeltaFIFO添加一个删除的Delta
            case watch.Deleted:
                err := r.store.Delete(event.Object)
                if err != nil {
                    utilruntime.HandleError(fmt.Errorf("%s: unable to delete watch event object (%#v) from store: %v", r.name, event.Object, err))
            }
            // 其他类型就不知道干什么了,只能报错
            default:
            utilruntime.HandleError(fmt.Errorf("%s: unable to understand watch event %#v", r.name, event))
            }
            // 更新最新资源版本
            *resourceVersion = newResourceVersion
            r.setLastSyncResourceVersion(newResourceVersion)
            eventCount++
        }
    }
    // watch返回时间非常短而且没有任何事件要处理,这个属于异常现象,因为我们watch是设置了超时的
    watchDuration := r.clock.Now().Sub(start)
    if watchDuration < 1*time.Second && eventCount == 0 {
        r.metrics.numberOfShortWatches.Inc()
        return fmt.Errorf("very short watch: %s: Unexpected watch close - watch lasted less than a second and no items received", r.name)
    }
 
    return nil
}


resyncChan

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
// 代码源自client-go/tools/cache/reflector.go
func (r *Reflector) setLastSyncResourceVersion(v string) {
    // 设置已经获取到资源的最新版本
    r.lastSyncResourceVersionMutex.Lock()
    defer r.lastSyncResourceVersionMutex.Unlock()
    r.lastSyncResourceVersion = v
 
    rv, err := strconv.Atoi(v)
    if err == nil {
        r.metrics.lastResourceVersion.Set(float64(rv))
    }
}
 
// 获取resync定时器,叫定时器比较好理解,叫chan很难和定时关联起来
func (r *Reflector) resyncChan() (<-chan time.Time, func() bool) {
    // 如果resyncPeriod说明就不用定时同步,返回的是永久超时的定时器
    if r.resyncPeriod == 0 {
        return neverExitWatch, func() bool { return false }
    }
    // 构建定时起
    t := r.clock.NewTimer(r.resyncPeriod)
    return t.C(), t.Stop
}

说明

  1. Reflector利用apiserver的client列举全量对象(版本为0以后的对象全部列举出来)
  2. 将全量对象采用Replace()接口同步到DeltaFIFO中,并且更新资源的版本号,这个版本号后续会用到;
  3. 开启一个协程定时执行resync,如果没有设置定时同步则不会执行,同步就是把全量对象以同步事件的方式通知出去;注意:这个resync操作不是跟apiserver的交互操作
  4. 通过apiserver的client监控(watch)资源,监控的当前资源版本号以后的对象,因为之前的都已经获取到了;
  5. 一旦有对象发生变化,那么就会根据变化的类型(新增、更新、删除)调用DeltaFIFO的相应接口,产生一个相应的对象Delta,同时更新当前资源的版本;

Controller

这里的controller定义在client-go/tools/cache/controller.go中,目的是用来把Reflector、DeltaFIFO组合起来形成一个相对固定的、标准的处理流程。理解了Controller,基本就算把SharedInfomer差不多搞懂了。

实际上informer本质上就是个controller

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21

// 代码源自client-go/tools/cache/controller.go
// 这是一个Controller的抽象
type Controller interface {
    Run(stopCh <-chan struct{})      // 核心流程函数
    HasSynced() bool                 // apiserver中的对象是否已经同步到了Store中
    LastSyncResourceVersion() string // 最新的资源版本号
}


type Config struct {
    Queue                          // SharedInformer使用DeltaFIFO
    ListerWatcher                  // 这个用来构造Reflector
    Process ProcessFunc            // 这个在调用DeltaFIFO.Pop()使用,弹出对象要如何处理
    ObjectType runtime.Object      // 对象类型,这个肯定是Reflector使用
    FullResyncPeriod time.Duration // 全量同步周期,这个在Reflector使用
    ShouldResync ShouldResyncFunc  // Reflector在全量更新的时候会调用该函数询问
    RetryOnError bool              // 错误是否需要尝试
}


从上面的定义来看,HasSynced()可调用DeltaFIFO. HasSynced()实现,LastSyncResourceVersion()可以通过Reflector实现。因为Controller把多个模块整合起来实现了一套业务逻辑,所以在创建Controller需要提供一些配置

从上面两个类型的定义我们可以猜测:Controller自己构造Reflector获取对象,Reflector作为DeltaFIFO生产者持续监控apiserver的资源变化并推送到队列中。Controller的Run()应该是队列的消费者,从队列中弹出对象并调用Process()处理。所以Controller相比于Reflector因为队列的加持表现为每次有资源变化就会调用一次使用者定义的处理函数。

流程处理

Run

  • NewReflector,Reflector生产者,进行obj入队操作
  • processLoop,是消费者,进行obj出队和存储操作
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
// 代码源自client-go/tools/cache/controller.go
// controller是Controller的实现类型
type controller struct {
    config         Config       // 配置,上面有讲解
    reflector      *Reflector   // 反射器
    reflectorMutex sync.RWMutex // 反射器的锁
    clock          clock.Clock  // 时钟
}
// 核心业务逻辑实现
func (c *controller) Run(stopCh <-chan struct{}) {
    defer utilruntime.HandleCrash()
    // 创建一个协程,如果收到系统退出的信号就关闭队列,相当于在这里析构的队列
    go func() {
        <-stopCh
        c.config.Queue.Close()
    }()
    // 创建Reflector
    r := NewReflector(
        c.config.ListerWatcher,
        c.config.ObjectType,
        c.config.Queue,
        c.config.FullResyncPeriod,
    )
    // r.ShouldResync的存在就是为了以后使用少些一点代码?否则直接使用c.config.ShouldResync不就完了么?不明白用意
    r.ShouldResync = c.config.ShouldResync
    r.clock = c.clock
    // 记录反射器
    c.reflectorMutex.Lock()
    c.reflector = r
    c.reflectorMutex.Unlock()
    // wait.Group不是本章的讲解内容,只要把它理解为类似barrier就行了
    // 被他管理的所有的协程都退出后调用Wait()才会退出,否则就会被阻塞
    var wg wait.Group
    defer wg.Wait()
    // StartWithChannel()会启动协程执行Reflector.Run(),同时接收到stopCh信号就会退出协程
    wg.StartWithChannel(stopCh, r.Run)
    // wait.Until()在前面的章节讲过了,周期性的调用c.processLoop(),这里来看是1秒
    // 不用担心调用频率太高,正常情况下c.processLoop是不会返回的,除非遇到了解决不了的错误,因为他是个循环
    wait.Until(c.processLoop, time.Second, stopCh)
}

processLoop

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
// 代码源自client-go/tools/cache/controller.go
func (c *controller) processLoop() {
    for {
        // 从队列中弹出一个对象,然后处理它,这才是最主要的部分,这个c.config.Process是构造Controller的时候通过Config传进来的
        // 所以这个读者要特别注意了,这个函数其实是ShareInformer传进来的,所以在分析SharedInformer的时候要重点分析的
        // 核心处理逻辑实现在了Process函数中了
        obj, err := c.config.Queue.Pop(PopProcessFunc(c.config.Process))
        if err != nil {
            // 如果FIFO关闭了那就退出
            if err == FIFOClosedError {
                return
            }
            // 如果错误可以再试试
            if c.config.RetryOnError {
                c.config.Queue.AddIfNotPresent(obj)
            }
        }
    }
}

HasSynced

HasSynced 表示队列中的全量对象都已同步完成

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
// 代码源自client-go/tools/cache/controller.go

// Returns true once this controller has completed an initial resource listing
func (c *controller) HasSynced() bool {
    return c.config.Queue.HasSynced()
}


// Return true if an Add/Update/Delete/AddIfNotPresent are called first,
// or an Update called first but the first batch of items inserted by Replace() has been popped
func (f *DeltaFIFO) HasSynced() bool {
    f.lock.Lock()
    defer f.lock.Unlock()
    // 这里就比较明白了,一次同步全量对象后,并且全部Pop()出去才能算是同步完成
    // 其实这里所谓的同步就是全量内容已经进入Indexer,Indexer已经是系统中对象的全量快照了
    return f.populated && f.initialPopulationCount == 0
}


SharedInformer

SharedInformer接口

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41

// 代码源自client-go/tools/cache/shared_informer.go
type SharedInformer interface {
    // 添加资源事件处理器,关于ResourceEventHandler的定义在下面
    // 相当于注册回调函数,当有资源变化就会通过回调通知使用者,是不是能和上面介绍的Controller可以联系上了?
    // 为什么是Add不是Reg,说明可以支持多个handler
    AddEventHandler(handler ResourceEventHandler)
    // 上面添加的是不需要周期同步的处理器,下面的接口添加的是需要周期同步的处理器,周期同步上面提了好多遍了,不赘述
    AddEventHandlerWithResyncPeriod(handler ResourceEventHandler, resyncPeriod time.Duration)
    // Store这个有专门的文章介绍,这个函数就是获取Store的接口,说明SharedInformer内有Store对象
    GetStore() Store
    // Controller在上面的章节介绍了,说明SharedInformer内有Controller对象
    GetController() Controller
    // 这个应该是SharedInformer的核心逻辑实现的地方
    Run(stopCh <-chan struct{})
    // 因为有Store,这个函数就是告知使用者Store里面是否已经同步了apiserver的资源,这个接口很有用
    // 当创建完SharedInformer后,通过Reflector从apiserver同步全量对象,然后在通过DeltaFIFO一个一个的同志到cache
    // 这个接口就是告知使用者,全量的对象是不是已经同步到了cache,这样就可以从cache列举或者查询了
    HasSynced() bool
    // 最新同步资源的版本,这个就不多说了,通过Controller(Controller通过Reflector)实现
    LastSyncResourceVersion() string
}
// 扩展了SharedInformer类型,从类型名字上看共享的是Indexer,Indexer也是一种Store的实现
type SharedIndexInformer interface {
    // 继承了SharedInformer
    SharedInformer
    // 扩展了Indexer相关的接口
    AddIndexers(indexers Indexers) error
    GetIndexer() Indexer
}
// 代码源自client-go/tools/cache/controller.go,SharedInformer使用者如果需要处理资源的事件
// 那么就要自己实现相应的回调函数
type ResourceEventHandler interface {
    // 添加对象回调函数
    OnAdd(obj interface{})
    // 更新对象回调函数
    OnUpdate(oldObj, newObj interface{})
    // 删除对象回调函数
    OnDelete(obj interface{})
}

sharedIndexInformer类

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34

// 代码源自client-go/tools/cache/shared_informer.go
type sharedIndexInformer struct {
    // Indexer也是一种Store,这个我们知道的,Controller负责把Reflector和FIFO逻辑串联起来
    // 所以这两个变量就涵盖了开篇那张图里面的Reflector、DeltaFIFO和LocalStore(cache)
    indexer    Indexer
    controller Controller
    // sharedIndexInformer把上面提到的ResourceEventHandler进行了在层封装,并统一由sharedProcessor管理,后面章节专门介绍
    processor             *sharedProcessor
    // CacheMutationDetector其实没啥用,我理解是开发者自己实现的一个调试工具,用来发现对象突变的
    // 实现方法也比较简单,DeltaFIFO弹出的对象在处理前先备份(深度拷贝)一份,然后定期比对两个对象是否相同
    // 如果不同那就报警,说明处理过程中有人修改过对象,这个功能默认是关闭,所以我说没啥用
    cacheMutationDetector CacheMutationDetector
    // 这两个变量是给Reflector用的,我们知道Reflector是在Controller创建的
    listerWatcher ListerWatcher
    objectType    runtime.Object
    // 定期同步的周期,因为可能存在多个ResourceEventHandler,就有可能存在多个同步周期,sharedIndexInformer采用最小的周期
    // 这个周期值就存储在resyncCheckPeriod中,通过AddEventHandler()添加的处理器都采用defaultEventHandlerResyncPeriod
    resyncCheckPeriod time.Duration
    defaultEventHandlerResyncPeriod time.Duration
    // 时钟
    clock clock.Clock
    // 启动、停止标记,肯定有人会问为啥用两个变量,一个变量不就可以实现启动和停止了么?
    // 其实此处是三个状态,启动前,已启动和已停止,start表示了两个状态,而且为启动标记专门做了个锁
    // 说明启动前和启动后有互斥的资源操作
    started, stopped bool
    startedLock      sync.Mutex
 
    // 这个名字起的也是够了,因为DeltaFIFO每次Pop()的时候需要传入一个函数用来处理Deltas
    // 处理Deltas也就意味着要把消息通知给处理器,如果此时调用了AddEventHandler()
    // 就会存在崩溃的问题,所以要有这个锁,阻塞Deltas....细想名字也没毛病~
    blockDeltas sync.Mutex
}

CacheMutationDetector 【非重要,可忽略】

CacheMutationDetector这个就是检测对象在过程中突变的,何所谓突变呢?突变就是莫名其妙的修改了,如何实现突变检测,也是比较简单的。CacheMutationDetector对所有的对象做了一次深度拷贝(DeepCopy),然后定期比较两个对象是否一致,当发现有不同时说明对象突变了,然后就panic。我认为CacheMutationDetector是用来调试的,因为代码默认是关闭的:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
// 代码源自client-go/tools/cache/mutation_detector.go
// 默认关闭突变检测
var mutationDetectionEnabled = false
// 但是可以通过环境变量的KUBE_CACHE_MUTATION_DETECTOR开启
func init() {
    mutationDetectionEnabled, _ = strconv.ParseBool(os.Getenv("KUBE_CACHE_MUTATION_DETECTOR"))
}
 
// 这个是突变检测的类型抽象
type CacheMutationDetector interface {
    AddObject(obj interface{})  // 用于记录所有的对象
    Run(stopCh <-chan struct{}) // 开启协程定期比对
}
// 创建CacheMutationDetector对象
func NewCacheMutationDetector(name string) CacheMutationDetector {
    // 如果没有开启选项就构造一个什么都不做的对象
    if !mutationDetectionEnabled {
        return dummyMutationDetector{}
    }
    // 如果开启了选项,那么就构造一个默认的突变检测器
    glog.Warningln("Mutation detector is enabled, this will result in memory leakage.")
    return &defaultCacheMutationDetector{name: name, period: 1 * time.Second}
}
// 这就是什么都不做的突变检测器
type dummyMutationDetector struct{}
func (dummyMutationDetector) Run(stopCh <-chan struct{}) {
}
func (dummyMutationDetector) AddObject(obj interface{}) {
}

sharedProcessor

有没有感觉shared这个词被kubernetes玩儿坏了(继controller之后有一个背玩儿坏的单词),sharedProcessor这又shared啥了?首先需要知道Processor的定义,这里定义的Processor就是处理事件的东西。什么事件,就是SharedInformer向外部通知的事件。因为官方代码没有注释,我猜是shared是同一个SharedInformer,有没有很绕嘴?还有更绕的在后面呢,我们还要了解一个新的类型,那就是processorListener,processor刚说完,又来了个Listener!

通过SharedInformer.AddEventHandler()添加的处理器最终就会封装成processorListener,然后通过sharedProcessor管理起来,通过processorListener的封装就可以达到所谓的有事处理,没事挂起。

processorListener

rocessorListener可以理解为两个核心功能,一个是processor,一个是listener,用一句话概括,有事做事没事挂起。先看看processorListener的定义:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
// 代码源自clien-go/tools/cache/shared_informer.go
type processorListener struct {
    // nextCh、addCh、handler、pendingNotifications的用法请参看我的《golang的chan有趣用法》里面有相关的例子
    // 总结这四个变量实现了事件的输入、缓冲、处理,事件就是apiserver资源的变化
    nextCh chan interface{}
    addCh  chan interface{}
    handler ResourceEventHandler
    pendingNotifications buffer.RingGrowing
    // 下面四个变量就是跟定时同步相关的了,requestedResyncPeriod是处理器设定的定时同步周期
    // resyncPeriod是跟sharedIndexInformer对齐的同步时间,因为sharedIndexInformer管理了多个处理器
    // 最终所有的处理器都会对齐到一个周期上,nextResync就是下一次同步的时间点
    requestedResyncPeriod time.Duration
    resyncPeriod time.Duration
    nextResync time.Time
    resyncLock sync.Mutex
}

我们需要知道就是processor如何接收事件(此处事件就是apiserver的资源变化,也就是DeltaFIFO输出的Deltas)?如何通知事件处理器?如何缓冲处理器?如何阻塞处理器进而形成listener的?一系列的问题我们需要沿着处理逻辑的流程逐一解释。第一个问题,事件是如何传入的:

1
2
3
4
5
// 代码源自client-go/tools/cache/shared_informer.go
// 对,就这么简单,通过addCh传入,这里面的notification就是我们所谓的事件
func (p *processorListener) add(notification interface{}) {
    p.addCh <- notification
}

因为addCh是无缓冲chan,调用add()函数的人是事件分发器processor.distribute。意思就是从DeltaFIFO弹出的Deltas要要逐一送到多个处理器,此时如果处理器没有及时处理会造成addCh把分发器阻塞,那别的处理器也就同样无法收到新的事件了。这一点,processorListener利用一个后台协程处理这个问题(相应的原理参看《golang的chan有趣用法》):

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43

// 代码源自client-go/tools/cache/shared_informer.go
// 这个函数是通过sharedProcessor利用wait.Group启动的,读者可以自行查看wait.Group
func (p *processorListener) pop() {
    defer utilruntime.HandleCrash()
    // nextCh是在这里,函数退出前析构的
    defer close(p.nextCh)
    // 临时变量,下面会用到
    var nextCh chan<- interface{}
    var notification interface{}
    // 进入死循环啦
    for {
        select {
        // 有两种情况,nextCh还没有初始化,这个语句就会被阻塞,这个我在《深入浅出golang之chan》说过
        // nextChan后面会赋值为p.nextCh,因为p.nextCh也是无缓冲的chan,数据不发送成功就阻塞                        
        case nextCh <- notification:
            // 如果发送成功了,那就从缓冲中再取一个事件出来
            var ok bool
            notification, ok = p.pendingNotifications.ReadOne()
            if !ok {
                // 如果没有事件,那就把nextCh再次设置为nil,接下来对于nextCh操作还会被阻塞
                nextCh = nil
            }
        // 从p.addCh读取一个事件出来,这回看到消费p.addCh的地方了
        case notificationToAdd, ok := <-p.addCh:
            // 说明p.addCh关闭了,只能退出
            if !ok {
                return
            }
            // notification为空说明当前还没发送任何事件给处理器
            if notification == nil {
                // 那就把刚刚获取的事件通过p.nextCh发送个处理器
                notification = notificationToAdd
                nextCh = p.nextCh
            } else {
                // 上一个事件还没有发送成功,那就先放到缓存中
                // pendingNotifications可以想象为一个slice,这样方便理解,是一个动态的缓存,
                p.pendingNotifications.WriteOne(notificationToAdd)
            }
        }
    }
}

pop()函数实现的非常巧妙,利用一个协程就把接收、缓冲、发送全部解决了。它充分的利用了golang的select可以同时操作多个chan的特性,同时从addChd读取数据从nextCh发送数据,这两个chan任何一个完成都可以激活协程。

  • notification 待通知的事件,来源有2个:
    • p.addCh 直接接收到的新事件
    • pendingNotifications已阻塞缓存的未通知发送的事件
  • pendingNotifications 为缓存阻塞中的事件
  • pendingNotifications.WriteOne表示缓存事件
  • p.pendingNotifications.ReadOne表示从缓存读取事件
  • 通过把notification 发送到 p.nextCh 表示发送
  • notificationToAdd 接收消费最新的p.addCh

接下来,我们看看从nextCh读取事件后是如何处理的:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37

// 代码源自client-go/tools/cache/shared_informer.go
// 这个也是sharedProcessor通过wait.Group启动的
func (p *processorListener) run() {
    // 因为wait.Until需要传入退出信号的chan
    stopCh := make(chan struct{})
    // wait.Until不多说了,我在前期不点的文章中说过了,只要没有收到退出信号就会周期的执行传入的函数
    wait.Until(func() {
        // wait.ExponentialBackoff()和wait.Until()类似,wait.Until()是无限循环
        // wait.ExponentialBackoff()是尝试几次,每次等待时间会以指数上涨
        err := wait.ExponentialBackoff(retry.DefaultRetry, func() (bool, error) {
            // 这也是chan的range用法,可以参看我的《深入浅出golang的chan》了解细节
            for next := range p.nextCh {
                // 判断事件类型,这里面的handler就是调用SharedInfomer.AddEventHandler()传入的
                // 理论上处理的不是Deltas么?怎么变成了其他类型,这是SharedInformer做的二次封装,后面会看到
                switch notification := next.(type) {
                case updateNotification:
                    p.handler.OnUpdate(notification.oldObj, notification.newObj)
                case addNotification:
                    p.handler.OnAdd(notification.newObj)
                case deleteNotification:
                    p.handler.OnDelete(notification.oldObj)
                default:
                    utilruntime.HandleError(fmt.Errorf("unrecognized notification: %#v", next))
                }
            }
 
            return true, nil
        })
 
        // 执行到这里只能是nextCh已经被关闭了,所以关闭stopCh,通知wait.Until()退出
        if err == nil {
            close(stopCh)
        }
    }, 1*time.Minute, stopCh)
}

因为processorListener其他函数没啥大用,上面两个函数就就已经把核心功能都实现了。processorListener就是实现了事件的缓冲和处理,此处的处理就是使用者传入的函数。在没有事件的时候可以阻塞处理器,当事件较多是可以把事件缓冲起来,实现了事件分发器与处理器的异步处理。

processorListener的run()和pop()函数是sharedProcessor启动的协程调用的,所以下面就要对sharedProcessor进行分析了。

总结: 通过p.nextCh方式,processorListener.pop取出事件,然后提供给processorListener.run接收消费事件。这些处理都在sharedProcessor.run中实现,见下面内容。

sharedProcessor

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
// client-go/tools/cache/shared_informer.go
// sharedProcessor是通过数组组织处理器的,只是分了需要定时同步和不需要要同步两类
type sharedProcessor struct {
    listenersStarted bool                 // 所有处理器是否已经启动的标识
    listenersLock    sync.RWMutex         // 读写锁
    listeners        []*processorListener // 通用的处理器
    syncingListeners []*processorListener // 需要定时同步的处理器
    clock            clock.Clock          // 时钟
    wg               wait.Group           // 前面讲过了processorListener每个需要两个协程,
                                          // 用wait.Group来管理所有处理器的携程,保证他们都能退出
}

// 代码源自client-go/tools/cache/shared_informer.go
// 添加处理器,sharedIndexInformer.AddEventHandler()就会调用这个函数实现处理器的添加
func (p *sharedProcessor) addListener(listener *processorListener) {
    // 加锁,这个很好理解
    p.listenersLock.Lock()
    defer p.listenersLock.Unlock()
    // 把处理器添加到数组中
    p.addListenerLocked(listener)
    // 通过wait.Group启动两个协程,做的事情我们在processorListener说过了,这里就是我们上面提到的启动两个协程的地方
    // 这个地方判断了listenersStarted,这说明sharedProcessor在启动前、后都可以添加处理器
    if p.listenersStarted {
        p.wg.Start(listener.run)
        p.wg.Start(listener.pop)
    }
}
// 把处理器添加到数组中
func (p *sharedProcessor) addListenerLocked(listener *processorListener) {
    // 两类(定时同步和不同步)的处理器数组都添加了,这是因为没有定时同步的也会用默认的时间,后面我们会看到
    // 那么问题来了,那还用两个数组干什么呢?
    p.listeners = append(p.listeners, listener)
    p.syncingListeners = append(p.syncingListeners, listener)
}

在SharedInformer的接口中有一个与之对应的接口,就是SharedInformer.AddEventHandler()。因为SharedInformer没有删除处理器的接口,sharedProcessor也没有相应接口。接下来就是sharedProcessor的分发事件的接口:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21

// 代码源自client-go/tools/cache/shared_informer.go
// 通过函数名称也能感觉到分发的感觉~sync表示obj对象是否为同步事件对象
func (p *sharedProcessor) distribute(obj interface{}, sync bool) {
    // 加锁没毛病
    p.listenersLock.RLock()
    defer p.listenersLock.RUnlock()
 
    // 无论是否为sync,添加处理器的代码中我们知道两个数组都会被添加,所以判断不判断没啥区别~
    // 所以我的猜测是代码以前实现的是明显区分两类的,但随着代码的更新二者的界限已经没那么明显了
    if sync {
        for _, listener := range p.syncingListeners {
            listener.add(obj)
        }
    } else {
        for _, listener := range p.listeners {
            listener.add(obj)
        }
    }
}

sharedProcessor运行起来后,唯一需要做的就是等待退出信号然后关闭所有的处理器,来看看具体实现代码:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
// 代码源自client-go/tools/cache/shared_informer.go
func (p *sharedProcessor) run(stopCh <-chan struct{}) {
    // 启动前、后对于添加处理器的逻辑是不同,启动前的处理器是不会立刻启动连个协程执行处理器的pop()和run()函数的
    // 而是在这里统一的启动
    func() {
        p.listenersLock.RLock()
        defer p.listenersLock.RUnlock()
        // 遍历所有的处理器,然后为处理器启动两个后台协程
        for _, listener := range p.listeners {
            p.wg.Start(listener.run)
            p.wg.Start(listener.pop)
        }
        p.listenersStarted = true
    }()
    // 等待退出信号
    <-stopCh
    p.listenersLock.RLock()
    defer p.listenersLock.RUnlock()
    // 关闭addCh,processorListener.pop()这个协程就会退出,不明白的可以再次回顾代码
    // 因为processorListener.pop()会关闭processorListener.nextCh,processorListener.run()就会退出
    // 所以这里只要关闭processorListener.addCh就可以自动实现两个协程的退出,不得不说设计的还是挺巧妙的
    for _, listener := range p.listeners {
        close(listener.addCh) 
    }
    // 等待所有的协程退出,这里指的所有协程就是所有处理器的那两个协程
    p.wg.Wait()
}

SharedInformer实现

client-go实现了两个创建SharedInformer的接口,如下所示:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
// 代码源自client-go/tools/cache/shared_informer.go
// lw:这个是apiserver客户端相关的,用于Reflector从apiserver获取资源,所以需要外部提供
// objType:这个SharedInformer监控的对象类型
// resyncPeriod:同步周期,SharedInformer需要多长时间给使用者发送一次全量对象的同步时间
func NewSharedInformer(lw ListerWatcher, objType runtime.Object, resyncPeriod time.Duration) SharedInformer {
    // 还是用SharedIndexInformer实现的
    return NewSharedIndexInformer(lw, objType, resyncPeriod, Indexers{})
}
// 创建SharedIndexInformer对象,其中大部分参数再上面的函数已经介绍了
// indexers:需要外部提供计算对象索引键的函数,也就是这里面的对象需要通过什么方式创建索引
func NewSharedIndexInformer(lw ListerWatcher, objType runtime.Object, defaultEventHandlerResyncPeriod time.Duration, indexers Indexers) SharedIndexInformer {
    realClock := &clock.RealClock{}
    sharedIndexInformer := &sharedIndexInformer{
        // 管理所有处理器用的,这个上面的章节解释了
        processor:                       &sharedProcessor{clock: realClock},
        // 其实就是在构造cache,读者可以自行查看NewIndexer()的实现,
        // 在cache中的对象用DeletionHandlingMetaNamespaceKeyFunc计算对象键,用indexers计算索引键
        // 可以想象成每个对象键是Namespace/Name,每个索引键是Namespace,即按照Namesapce分类
        // 因为objType决定了只有一种类型对象,所以Namesapce是最大的分类
        indexer:                         NewIndexer(DeletionHandlingMetaNamespaceKeyFunc, indexers),
        // 下面这两主要就是给Controller用,确切的说是给Reflector用的
        listerWatcher:                   lw,
        objectType:                      objType,
        // 无论是否需要定时同步,SharedInformer都提供了一个默认的同步时间,当然这个是外部设置的
        resyncCheckPeriod:               defaultEventHandlerResyncPeriod,
        defaultEventHandlerResyncPeriod: defaultEventHandlerResyncPeriod,
        // 默认没有开启的对象突变检测器,没啥用,也不多介绍
        cacheMutationDetector:           NewCacheMutationDetector(fmt.Sprintf("%T", objType)),
        clock: realClock,
    }
    return sharedIndexInformer
}

创建完ShareInformer对象,就要添加事件处理器了:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
// 代码源自client-go/tools/cache/shared_informer.go
// 添加没有指定同步周期的事件处理器
func (s *sharedIndexInformer) AddEventHandler(handler ResourceEventHandler) {
    // defaultEventHandlerResyncPeriod是默认的同步周期,在创建SharedInformer的时候设置的
    s.AddEventHandlerWithResyncPeriod(handler, s.defaultEventHandlerResyncPeriod)
}
// 添加需要定期同步的事件处理器
func (s *sharedIndexInformer) AddEventHandlerWithResyncPeriod(handler ResourceEventHandler, resyncPeriod time.Duration) {
    // 因为是否已经开始对于添加事件处理器的方式不同,后面会有介绍,所以此处加了锁
    s.startedLock.Lock()
    defer s.startedLock.Unlock()
 
    // 如果已经结束了,那就可以直接返回了
    if s.stopped {
        return
    }
    // 如果有同步周期,==0就是永远不用同步
    if resyncPeriod > 0 {
        // 同步周期不能太短,太短对于系统来说反而是个负担,大量的无效计算浪费在这上面
        if resyncPeriod < minimumResyncPeriod {
            resyncPeriod = minimumResyncPeriod
        }
        // SharedInformer管理了很多处理器,每个处理器都有自己的同步周期,所以此处要统一成一个,称之为对齐
        // SharedInformer会选择所有处理器中最小的那个作为所有处理器的同步周期,称为对齐后的同步周期
        // 此处就要判断是不是比当前对齐后的同步周期还要小
        if resyncPeriod < s.resyncCheckPeriod {
            // 如果已经启动了,那么只能用和大家一样的周期
            if s.started {
                resyncPeriod = s.resyncCheckPeriod
            // 如果没启动,那就让大家都用最新的对齐同步周期
            } else {
                s.resyncCheckPeriod = resyncPeriod
                s.processor.resyncCheckPeriodChanged(resyncPeriod)
            }
        }
    }
    // 创建处理器,代码一直用listener,可能想强调没事件就挂起把,我反而想用处理器这个名词
    // determineResyncPeriod()这个函数读者自己分析把,非常简单,这里面只要知道创建了处理器就行了
    listener := newProcessListener(handler, resyncPeriod, determineResyncPeriod(resyncPeriod, s.resyncCheckPeriod), s.clock.Now(), initialBufferSize)
    // 如果没有启动,那么直接添加处理器就可以了
    if !s.started {
        s.processor.addListener(listener)
        return
    }
 
    // 这个锁就是暂停再想所有的处理器分发事件用的,因为这样会遍历所有的处理器,此时添加会有风险
    s.blockDeltas.Lock()
    defer s.blockDeltas.Unlock()
    // 添加处理器
    s.processor.addListener(listener)
    // 这里有意思啦,遍历缓冲中的所有对象,通知处理器,因为SharedInformer已经启动了,可能很多对象已经让其他的处理器处理过了,
    // 所以这些对象就不会再通知新添加的处理器,此处就是解决这个问题的
    for _, item := range s.indexer.List() {
        listener.add(addNotification{newObj: item})
    }
}

事件处理器添加完了,就要看SharedInformer如何把事件分发给每个处理器的了:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
// 代码源自client-go/tools/cache/shared_informer.go
// sharedIndexInformer的核心逻辑函数
func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) {
    defer utilruntime.HandleCrash()
    // 在此处构造的DeltaFIFO
    fifo := NewDeltaFIFO(MetaNamespaceKeyFunc, s.indexer)
    // 这里的Config是我们介绍Reflector时介绍的那个Config
    cfg := &Config{
        // 我前面一直在说Reflector输入到DeltaFIFO,这里算是直接证明了
        Queue:            fifo,            
        // 下面这些变量我们在Reflector都说了,这里赘述
        ListerWatcher:    s.listerWatcher, 
        ObjectType:       s.objectType,
        FullResyncPeriod: s.resyncCheckPeriod,
        RetryOnError:     false,
        ShouldResync:     s.processor.shouldResync,
        // 这个才是重点,Controller调用DeltaFIFO.Pop()接口传入的就是这个回调函数,也是我们后续重点介绍的
        Process: s.HandleDeltas,
    }
    // 创建Controller,这个不用多说了
    func() {
        s.startedLock.Lock()
        defer s.startedLock.Unlock()
 
        s.controller = New(cfg)
        s.controller.(*controller).clock = s.clock
        s.started = true
    }()
    // 这个processorStopCh 是给sharedProcessor和cacheMutationDetector传递退出信号的
    // 因为这里要创建两个协程运行sharedProcessor和cacheMutationDetector的核心函数
    processorStopCh := make(chan struct{})
    var wg wait.Group
    defer wg.Wait()              // Wait for Processor to stop
    defer close(processorStopCh) // Tell Processor to stop
    wg.StartWithChannel(processorStopCh, s.cacheMutationDetector.Run)
    wg.StartWithChannel(processorStopCh, s.processor.run)
 
    // Run()函数都退出了,也就应该设置结束的标识了
    defer func() {
        s.startedLock.Lock()
        defer s.startedLock.Unlock()
        s.stopped = true 
    }()
    // 启动Controller,Controller一旦运行,整个流程就开始启动了,所以叫Controller也不为过
    // 毕竟Controller是SharedInformer的发动机嘛
    s.controller.Run(stopCh)
}

sharedIndexInformer通过Run()函数启动了Controller和sharedProcess(),Controller通过DeltaFIFO.Pop()函数弹出Deltas,并调用函数处理,这个处理函数就是sharedIndexInformer.HandleDeltas(),这个函数是衔接Controller和sharedProcess的关键点,他把Deltas转换为sharedProcess需要的各种Notification类型。下面我们就对这个函数进行代码分析:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
// 代码源自client-go/tools/cache/shared_informer.go
func (s *sharedIndexInformer) HandleDeltas(obj interface{}) error {
    // 看到这里就知道为啥起名为blockDeltas了,这是阻塞处理器Deltas啊~因为分发事件到处理器,所以要加锁
    s.blockDeltas.Lock()
    defer s.blockDeltas.Unlock()
 
    // Deltas里面包含了一个对象的多个增量操作,所以要从最老的Delta到最先的Delta遍历处理
    for _, d := range obj.(Deltas) {
        // 根据不同的Delta做不同的操作,但是大致分为对象添加、删除两大类操作
        // 所有的操作都要先同步到cache在通知处理器,这样保持处理器和cache的状态是一致的
        switch d.Type {
        // 同步、添加、更新都是对象添加类的造作,至于是否是更新还要看cache是否有这个对象
        case Sync, Added, Updated:
            // 看看对象是不是有定时同步产生的事件
            isSync := d.Type == Sync
            // 检测突变,没啥用
            s.cacheMutationDetector.AddObject(d.Object)
            // 如果cache中有的对象,一律看做是更新事件
            if old, exists, err := s.indexer.Get(d.Object); err == nil && exists {
                // 把对象更新到cache中
                if err := s.indexer.Update(d.Object); err != nil {
                    return err
                }
                // 通知处理器处理事件
                s.processor.distribute(updateNotification{oldObj: old, newObj: d.Object}, isSync)
            // cache中没有的对象,一律看做是新增事件
            } else {
                // 把对象添加到cache中
                if err := s.indexer.Add(d.Object); err != nil {
                    return err
                }
                // 通知处理器处理器事件
                s.processor.distribute(addNotification{newObj: d.Object}, isSync)
            }
        // 对象被删除
        case Deleted:
            // 从cache中删除对象
            if err := s.indexer.Delete(d.Object); err != nil {
                return err
            }
            // 通知所有的处理器对象被删除了
            s.processor.distribute(deleteNotification{oldObj: d.Object}, false)
        }
    }
    return nil
}

说明

  1. 利用apiserver的api实现资源的列举和监控(Reflector实现);

  2. 利用cache存储apiserver中的部分对象,通过对象类型进行制定,并在cache中采用Namespace做对象的索引

  3. 先通过apiserver的api将对象的全量列举出来存储在cache中,然后再watch资源,一旦有变化就更新cache中;

  4. 更新到cache中的过程通过DeltaFIFO实现的有顺序的更新,因为资源状态是通过全量+增量方式实现同步的,所以顺序错误会造成状态不一致;

  5. 使用者可以注册回调函数(类似挂钩子),在更新到cache的同时通知使用者处理,为了保证回调处理不被某一个处理器阻塞,SharedInformer实现了processorListener异步缓冲处理;

  6. 整个过程是Controller是发动机,驱动整个流程运转;

最后我们还是用一幅图来总结SharedInformer,绝对的干货(其中Reflector.resync()因为是个匿名函数,所以用斜体,其实是不存在这个函数的)~

DeltaFIFO

Delta其实就是kubernetes系统中对象的变化(增、删、改、同步),FIFO比较好理解,是一个先入先出的队列,那么DeltaFIFO就是一个按序的(先入先出)kubernetes对象变化的队列

client-go/tools/cache/delta_fifo.go

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18

// A KeyListerGetter is anything that knows how to list its keys and look up by key.
type KeyListerGetter interface {
    KeyLister
    KeyGetter
}

// A KeyLister is anything that knows how to list its keys.
type KeyLister interface {
    ListKeys() []string
}

// A KeyGetter is anything that knows how to get the value stored under a given key.
type KeyGetter interface {
    GetByKey(key string) (interface{}, bool, error)
}


上面两个接口在client-go.tools.cache.Store这个接口类型中也存在,也就是说实现了Store接口的类型同时也实现了上面三个接口。上面三个接口基本上就是kv的标准接口,但凡是通过kv方式访问的对象(存储、队列、索引等)多半具备以上接口。肯定有人会问直接使用具体的类型不就完了么,定义这些有什么用?答案很简单,当你需要对kv的对象只读但是不关心具体实现时就用上了

Queue

client-go/tools/cache/fifo.go

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
// Queue is exactly like a Store, but has a Pop() method too.
type Queue interface {
    Store

    // Pop blocks until it has something to process.
    // It returns the object that was process and the result of processing.
    // The PopProcessFunc may return an ErrRequeue{...} to indicate the item
    // should be requeued before releasing the lock on the queue.
    Pop(PopProcessFunc) (interface{}, error)

    // AddIfNotPresent adds a value previously
    // returned by Pop back into the queue as long
    // as nothing else (presumably more recent)
    // has since been added.
    AddIfNotPresent(interface{}) error

    // Return true if the first batch of items has been popped
    HasSynced() bool

    // Close queue
    Close()
}

Queue是在Store基础上扩展了Pop接口可以让对象有序的弹出,Indexer是在Store基础上建立了索引,可以快速检索对象。

DeltaFIFO实现

首先我们想想为什么每个对象一个Deltas而不是Delta?对一个对象的多个操作,什么操作可以合并?

DeltaFIFO生产者和消费者是异步的,如果同一个目标的频繁操作,前面操作还缓存在队列中的时候,那么队列就要缓冲对象的所有操作,那可以将多个操作合并么?这是下面讨论的了;

对于更新这种类型的操作在没有全量基础的情况下是没法合并的,同时我们还不知道具体是什么类型的对象,所以能合并的也就是有添加/删除,两个添加/删除操作其实可以视为一个;

因为系统对于删除的对象有DeletedFinalStateUnknown这个状态,所以会存在两次删除的情况,但是两次添加同一个对象由于apiserver可以保证对象的唯一性,所以处理中就没有考虑合并两次添加操作。 这就是为什么合并操作处理只考虑了Delete

client-go/tools/cache/delta_fifo.go

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796

/*
Copyright 2014 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package cache

import (
    "errors"
    "fmt"
    "sync"

    "k8s.io/apimachinery/pkg/util/sets"

    "github.com/golang/glog"
)

// NewDeltaFIFO returns a Store which can be used process changes to items.
//
// keyFunc is used to figure out what key an object should have. (It's
// exposed in the returned DeltaFIFO's KeyOf() method, with bonus features.)
//
// 'compressor' may compress as many or as few items as it wants
// (including returning an empty slice), but it should do what it
// does quickly since it is called while the queue is locked.
// 'compressor' may be nil if you don't want any delta compression.
//
// 'keyLister' is expected to return a list of keys that the consumer of
// this queue "knows about". It is used to decide which items are missing
// when Replace() is called; 'Deleted' deltas are produced for these items.
// It may be nil if you don't need to detect all deletions.
// TODO: consider merging keyLister with this object, tracking a list of
//       "known" keys when Pop() is called. Have to think about how that
//       affects error retrying.
// TODO(lavalamp): I believe there is a possible race only when using an
//                 external known object source that the above TODO would
//                 fix.
//
// Also see the comment on DeltaFIFO.
func NewDeltaFIFO(keyFunc KeyFunc, compressor DeltaCompressor, knownObjects KeyListerGetter) *DeltaFIFO {
    f := &DeltaFIFO{
        items:           map[string]Deltas{},
        queue:           []string{},
        keyFunc:         keyFunc,
        deltaCompressor: compressor,
        knownObjects:    knownObjects,
    }
    f.cond.L = &f.lock
    return f
}

// DeltaFIFO is like FIFO, but allows you to process deletes.
//
// DeltaFIFO is a producer-consumer queue, where a Reflector is
// intended to be the producer, and the consumer is whatever calls
// the Pop() method.
//
// DeltaFIFO solves this use case:
//  * You want to process every object change (delta) at most once.
//  * When you process an object, you want to see everything
//    that's happened to it since you last processed it.
//  * You want to process the deletion of objects.
//  * You might want to periodically reprocess objects.
//
// DeltaFIFO's Pop(), Get(), and GetByKey() methods return
// interface{} to satisfy the Store/Queue interfaces, but it
// will always return an object of type Deltas.
//
// A note on threading: If you call Pop() in parallel from multiple
// threads, you could end up with multiple threads processing slightly
// different versions of the same object.
//
// A note on the KeyLister used by the DeltaFIFO: It's main purpose is
// to list keys that are "known", for the purpose of figuring out which
// items have been deleted when Replace() or Delete() are called. The deleted
// object will be included in the DeleteFinalStateUnknown markers. These objects
// could be stale.
//
// You may provide a function to compress deltas (e.g., represent a
// series of Updates as a single Update).
type DeltaFIFO struct {
    // lock/cond protects access to 'items' and 'queue'.
    // 读写锁,因为涉及到同时读写,读写锁性能要高
    lock sync.RWMutex
    // 给Pop()接口使用,在没有对象的时候可以阻塞,内部锁复用读写锁
    cond sync.Cond

    // We depend on the property that items in the set are in
    // the queue and vice versa, and that all Deltas in this
    // map have at least one Delta.
    // 存储对象
    // 这个应该是Store的本质了,按照kv的方式存储对象,但是存储的是对象的Deltas数组
    items map[string]Deltas
    // 队列
    //这个是为先入先出实现的,存储的就是对象的键
    queue []string

    // populated is true if the first batch of items inserted by Replace() has been populated
    // or Delete/Add/Update was called first.
    // 通过Replace()接口将第一批对象放入队列,或者第一次调用增、删、改接口时标记为true
    populated bool
    // initialPopulationCount is the number of items inserted by the first call of Replace()
    // 通过Replace()接口将第一批对象放入队列的对象数量
    initialPopulationCount int

    // keyFunc is used to make the key used for queued item
    // insertion and retrieval, and should be deterministic.
    // 对象键计算函数
    //MetaNamespaceKeyFunc
    //meta.GetNamespace() + "/" + meta.GetName()
    keyFunc KeyFunc

    // deltaCompressor tells us how to combine two or more
    // deltas. It may be nil.
    deltaCompressor DeltaCompressor

    // knownObjects list keys that are "known", for the
    // purpose of figuring out which items have been deleted
    // when Replace() or Delete() is called.
    // 前面介绍就是为了这是用,该对象指向的就是Indexer,
    knownObjects KeyListerGetter

    // Indication the queue is closed.
    // Used to indicate a queue is closed so a control loop can exit when a queue is empty.
    // Currently, not used to gate any of CRED operations.
    // 是否已经关闭的标记
    closed     bool
    // 专为关闭设计的锁,这里非读写锁,可能不是读多写少场景
    closedLock sync.Mutex
}

var (
    _ = Queue(&DeltaFIFO{}) // DeltaFIFO is a Queue
)

var (
    // ErrZeroLengthDeltasObject is returned in a KeyError if a Deltas
    // object with zero length is encountered (should be impossible,
    // even if such an object is accidentally produced by a DeltaCompressor--
    // but included for completeness).
    ErrZeroLengthDeltasObject = errors.New("0 length Deltas object; can't get key")
)

// Close the queue.
func (f *DeltaFIFO) Close() {
    f.closedLock.Lock()
    defer f.closedLock.Unlock()
    f.closed = true
    f.cond.Broadcast()
}

// KeyOf exposes f's keyFunc, but also detects the key of a Deltas object or
// DeletedFinalStateUnknown objects.
func (f *DeltaFIFO) KeyOf(obj interface{}) (string, error) {
    // 先用Deltas做一次强行转换
    //DeltaFIFO的计算对象键的方式为什么要先做一次Deltas的类型转换呢?
    //原因很简单,那就是从DeltaFIFO.Pop()出去的对象很可能还要再添加进来(比如处理失败需要再放进来),
    //此时添加的对象就是已经封装好的Deltas。
    if d, ok := obj.(Deltas); ok {
        if len(d) == 0 {
            return "", KeyError{obj, ErrZeroLengthDeltasObject}
        }
        // 只用最新版本的对象就可以了
        // 即该Deltas数组中最后一个元素
        obj = d.Newest().Object
    }
    if d, ok := obj.(DeletedFinalStateUnknown); ok {
        return d.Key, nil
    }
    //MetaNamespaceKeyFunc
    //meta.GetNamespace() + "/" + meta.GetName()
    return f.keyFunc(obj)
}

// Return true if an Add/Update/Delete/AddIfNotPresent are called first,
// or an Update called first but the first batch of items inserted by Replace() has been popped
func (f *DeltaFIFO) HasSynced() bool {
    f.lock.Lock()
    defer f.lock.Unlock()
    // 这里就比较明白了,一次同步全量对象后,并且全部Pop()出去才能算是同步完成
    // 其实这里所谓的同步就是全量内容已经进入Indexer,Indexer已经是系统中对象的全量快照了
    return f.populated && f.initialPopulationCount == 0
}

// Add inserts an item, and puts it in the queue. The item is only enqueued
// if it doesn't already exist in the set.
func (f *DeltaFIFO) Add(obj interface{}) error {
    f.lock.Lock()
    defer f.lock.Unlock()
    // 队列第一次写入操作都要设置标记
    f.populated = true
    return f.queueActionLocked(Added, obj)
}

// Update is just like Add, but makes an Updated Delta.
func (f *DeltaFIFO) Update(obj interface{}) error {
    f.lock.Lock()
    defer f.lock.Unlock()
    // 队列第一次写入操作都要设置标记
    f.populated = true
    return f.queueActionLocked(Updated, obj)
}

// Delete is just like Add, but makes an Deleted Delta. If the item does not
// already exist, it will be ignored. (It may have already been deleted by a
// Replace (re-list), for example.
func (f *DeltaFIFO) Delete(obj interface{}) error {
    id, err := f.KeyOf(obj)
    if err != nil {
        return KeyError{obj, err}
    }
    f.lock.Lock()
    defer f.lock.Unlock()
    // 队列第一次写入操作都要设置标记
    f.populated = true
    // 此处是需要注意的,knownObjects就是Indexer,里面存有已知全部的对象
    if f.knownObjects == nil {
        // 在没有Indexer的条件下只能通过自己存储的对象查一下
        if _, exists := f.items[id]; !exists {
            // Presumably, this was deleted when a relist happened.
            // Don't provide a second report of the same deletion.
            return nil
        }
    } else {
        // We only want to skip the "deletion" action if the object doesn't
        // exist in knownObjects and it doesn't have corresponding item in items.
        // Note that even if there is a "deletion" action in items, we can ignore it,
        // because it will be deduped automatically in "queueActionLocked"
        // 自己(itemsExist)和Indexer里面(exists)有任何一个有这个对象就算存在
        _, exists, err := f.knownObjects.GetByKey(id)
        _, itemsExist := f.items[id]
        if err == nil && !exists && !itemsExist {
            // Presumably, this was deleted when a relist happened.
            // Don't provide a second report of the same deletion.
            // TODO(lavalamp): This may be racy-- we aren't properly locked
            // with knownObjects.
            return nil
        }
    }

    return f.queueActionLocked(Deleted, obj)
}

// AddIfNotPresent inserts an item, and puts it in the queue. If the item is already
// present in the set, it is neither enqueued nor added to the set.
//
// This is useful in a single producer/consumer scenario so that the consumer can
// safely retry items without contending with the producer and potentially enqueueing
// stale items.
//
// Important: obj must be a Deltas (the output of the Pop() function). Yes, this is
// different from the Add/Update/Delete functions.
// 添加不存在的对象
func (f *DeltaFIFO) AddIfNotPresent(obj interface{}) error {
    // 这个要求放入的必须是Deltas数组,就是通过Pop()弹出的对象
    deltas, ok := obj.(Deltas)
    if !ok {
        return fmt.Errorf("object must be of type deltas, but got: %#v", obj)
    }
    // 多个Delta都是一个对象,所以用最新的就可以了
    id, err := f.KeyOf(deltas.Newest().Object)
    if err != nil {
        return KeyError{obj, err}
    }
    f.lock.Lock()
    defer f.lock.Unlock()
    f.addIfNotPresent(id, deltas)
    return nil
}

// addIfNotPresent inserts deltas under id if it does not exist, and assumes the caller
// already holds the fifo lock.
// 这个是添加不存在对象的实现
func (f *DeltaFIFO) addIfNotPresent(id string, deltas Deltas) {
    f.populated = true
    // 这里判断的对象是否存在
    if _, exists := f.items[id]; exists {
        return
    }
    // 放入队列中
    f.queue = append(f.queue, id)
    f.items[id] = deltas
    f.cond.Broadcast()
}

// re-listing and watching can deliver the same update multiple times in any
// order. This will combine the most recent two deltas if they are the same.
func dedupDeltas(deltas Deltas) Deltas {
    // 小于2个delta,那就是1个呗,没啥好合并的
    n := len(deltas)
    if n < 2 {
        return deltas
    }
    // 取出最后两个
    a := &deltas[n-1]
    b := &deltas[n-2]
    // 判断如果是重复的,那就删除这两个delta把合并后的追加到Deltas数组尾部
    if out := isDup(a, b); out != nil {
        // 使用deltas 前n-2 元素构造d,然后再把合并后的 out追加
        d := append(Deltas{}, deltas[:n-2]...)
        return append(d, *out)
    }
    return deltas
}

// If a & b represent the same event, returns the delta that ought to be kept.
// Otherwise, returns nil.
// TODO: is there anything other than deletions that need deduping?
// 判断两个Delta是否是重复的
func isDup(a, b *Delta) *Delta {
    // 只有一个判断,只能判断是否为删除类操作,和我们上面的判断相同
    // 这个函数的本意应该还可以判断多种类型的重复,当前来看只能有删除这一种能够合并
    // 只合并obj前后操作类型都是delete的(即为重复)
    if out := isDeletionDup(a, b); out != nil {
        return out
    }
    // TODO: Detect other duplicate situations? Are there any?
    return nil
}

// keep the one with the most information if both are deletions.
// 判断是否为删除类的重复
func isDeletionDup(a, b *Delta) *Delta {
    // 二者都是删除那肯定有一个是重复的
    if b.Type != Deleted || a.Type != Deleted {
        return nil
    }
    // Do more sophisticated checks, or is this sufficient?
    // 理论上返回最后一个比较好,但是对象已经不再系统监控范围,前一个删除状态是好的
    if _, ok := b.Object.(DeletedFinalStateUnknown); ok {
        return a
    }
    return b
}

// willObjectBeDeletedLocked returns true only if the last delta for the
// given object is Delete. Caller must lock first.
func (f *DeltaFIFO) willObjectBeDeletedLocked(id string) bool {
    deltas := f.items[id]
    return len(deltas) > 0 && deltas[len(deltas)-1].Type == Deleted
}

// queueActionLocked appends to the delta list for the object, calling
// f.deltaCompressor if needed. Caller must lock first.
// 从函数名称来看把“动作”放入队列中,这个动作就是DeltaType,而且已经加锁了
func (f *DeltaFIFO) queueActionLocked(actionType DeltaType, obj interface{}) error {
    // 1 计算资源对象的key
    id, err := f.KeyOf(obj)
    if err != nil {
        return KeyError{obj, err}
    }

    // If object is supposed to be deleted (last event is Deleted),
    // then we should ignore Sync events, because it would result in
    // recreation of this object.
    // 2 如果操作类型是Sync,而对象被删除,则忽略Sync事件,直接返回
    // 如果是同步,并且对象未来会被删除,那么就直接返回,没必要记录这个动作了
    // 肯定有人会问为什么Add/Delete/Update这些动作可以,因为同步对于已经删除的对象是没有意义的
    // 已经删除的对象后续跟添加、更新有可能,因为同名的对象又被添加了,删除也是有可能
    // 删除有些复杂
    if actionType == Sync && f.willObjectBeDeletedLocked(id) {
        return nil
    }
    // 3 构造Delta{actionType, obj},添加到newDeltas 的items[id]
    // 同一个对象的多次操作,所以要追加到Deltas数组f.items[id]中
    newDeltas := append(f.items[id], Delta{actionType, obj})
    // 4 去重操作
    // 合并操作,去掉冗余的delta
    newDeltas = dedupDeltas(newDeltas)
    if f.deltaCompressor != nil {
        newDeltas = f.deltaCompressor.Compress(newDeltas)
    }
    // 5. 如果有newDeltas,则通知所有消费者,解除阻塞
    // 判断对象是否已经存在
    _, exists := f.items[id]
    // 合并后操作有可能变成没有Delta么?后面的代码分析来看应该不会,所以暂时不知道这个判断目的
    if len(newDeltas) > 0 {
        // 如果对象没有存在过,那就放入队列中,如果存在说明已经在queue中了,也就没必要再添加了
        // f.queue 只保存 obj的id,即key
        if !exists {
            f.queue = append(f.queue, id)
        }
        // 更新Deltas数组,通知所有调用Pop()的消费者模块
        f.items[id] = newDeltas
        f.cond.Broadcast()
    } else if exists {
        // The compression step removed all deltas, so
        // we need to remove this from our map (extra items
        // in the queue are ignored if they are not in the
        // map).
        // 直接把对象删除,这段代码不知道什么条件会进来,因为dedupDeltas()肯定有返回结果的
        // 这个分支不是消费者模块处理的,直接在delta_fifo处理了
        delete(f.items, id)
    }
    return nil
}

// List returns a list of all the items; it returns the object
// from the most recent Delta.
// You should treat the items returned inside the deltas as immutable.
func (f *DeltaFIFO) List() []interface{} {
    f.lock.RLock()
    defer f.lock.RUnlock()
    return f.listLocked()
}

func (f *DeltaFIFO) listLocked() []interface{} {
    list := make([]interface{}, 0, len(f.items))
    for _, item := range f.items {
        // Copy item's slice so operations on this slice (delta
        // compression) won't interfere with the object we return.
        item = copyDeltas(item)
        list = append(list, item.Newest().Object)
    }
    return list
}

// ListKeys returns a list of all the keys of the objects currently
// in the FIFO.
func (f *DeltaFIFO) ListKeys() []string {
    f.lock.RLock()
    defer f.lock.RUnlock()
    list := make([]string, 0, len(f.items))
    for key := range f.items {
        list = append(list, key)
    }
    return list
}

// Get returns the complete list of deltas for the requested item,
// or sets exists=false.
// You should treat the items returned inside the deltas as immutable.
//Get返回请求项的完整deltas列表,或不存在,则将集合exists=false。
//您应该将delta中返回的项视为不可变的。
// 获取对象接口,这个有意思哈,用对象获取对象?如果说用Service对象获取Pod对象是不是就能接受了?
// 因为他们的对象键是相同的
func (f *DeltaFIFO) Get(obj interface{}) (item interface{}, exists bool, err error) {
    key, err := f.KeyOf(obj)
    if err != nil {
        return nil, false, KeyError{obj, err}
    }
    return f.GetByKey(key)
}

// GetByKey returns the complete list of deltas for the requested item,
// setting exists=false if that list is empty.
// You should treat the items returned inside the deltas as immutable.
// 通过对象键获取对象
func (f *DeltaFIFO) GetByKey(key string) (item interface{}, exists bool, err error) {
    f.lock.RLock()
    defer f.lock.RUnlock()
    d, exists := f.items[key]
    if exists {
        // Copy item's slice so operations on this slice (delta
        // compression) won't interfere with the object we return.
        d = copyDeltas(d)
    }
    return d, exists, nil
}

// Checks if the queue is closed
// 判断是否关闭
func (f *DeltaFIFO) IsClosed() bool {
    f.closedLock.Lock()
    defer f.closedLock.Unlock()
    if f.closed {
        return true
    }
    return false
}

// Pop blocks until an item is added to the queue, and then returns it.  If
// multiple items are ready, they are returned in the order in which they were
// added/updated. The item is removed from the queue (and the store) before it
// is returned, so if you don't successfully process it, you need to add it back
// with AddIfNotPresent().
// process function is called under lock, so it is safe update data structures
// in it that need to be in sync with the queue (e.g. knownKeys). The PopProcessFunc
// may return an instance of ErrRequeue with a nested error to indicate the current
// item should be requeued (equivalent to calling AddIfNotPresent under the lock).
//
// Pop returns a 'Deltas', which has a complete list of all the things
// that happened to the object (deltas) while it was sitting in the queue.
// 对queue中的资源对象数据进行process处理
func (f *DeltaFIFO) Pop(process PopProcessFunc) (interface{}, error) {
    // DeltaFIFO 处理,加锁
    f.lock.Lock()
    defer f.lock.Unlock()
    for {
        // 队列中有数据么?
        for len(f.queue) == 0 {
            // When the queue is empty, invocation of Pop() is blocked until new item is enqueued.
            // When Close() is called, the f.closed is set and the condition is broadcasted.
            // Which causes this loop to continue and return from the Pop().
            // 看来是先判断的是否有数据,后判断是否关闭,这个和chan像
            if f.IsClosed() {
                return nil, FIFOClosedError
            }
            // 没数据那就等待
            // 阻塞处理,直到队列有数据添加,此时len(f.queue) > 0, 跳出该for{},执行后续pop处理流程
            f.cond.Wait()
        }
        // 取出第一个对象
        // 取出id 即object的key
        id := f.queue[0]
        // 数组缩小,相当于把数组中的第一个元素弹出去了
        // 去掉一个pod的id,更新queue
        f.queue = f.queue[1:]
        // 取出对象,因为queue中存的是对象键
        // 根据id 检索item 即某object所有的事件Deltas
        item, ok := f.items[id]
        // 同步对象计数减一,当减到0就说明外部已经全部同步完毕了
        if f.initialPopulationCount > 0 {
            f.initialPopulationCount--
        }
        // 对象不存在,这个是什么情况?貌似我们在合并对象的时候代码上有这个逻辑,估计永远不会执行
        if !ok {
            // Item may have been deleted subsequently.
            continue
        }
        // 把对象删除
        // 删除要处理的item的Deltas
        delete(f.items, id)
        // Pop()需要传入一个回调函数,用于处理对象
        // 处理该id-object的所有事件Deltas
        err := process(item)
        // 如果需要重新入队列,那就重新入队列
        if e, ok := err.(ErrRequeue); ok {
            f.addIfNotPresent(id, item)
            err = e.Err
        }
        // Don't need to copyDeltas here, because we're transferring
        // ownership to the caller.
        return item, err
    }
}

// Replace will delete the contents of 'f', using instead the given map.
// 'f' takes ownership of the map, you should not reference the map again
// after calling this function. f's queue is reset, too; upon return, it
// will contain the items in the map, in no particular order.
func (f *DeltaFIFO) Replace(list []interface{}, resourceVersion string) error {
    f.lock.Lock()
    defer f.lock.Unlock()
    keys := make(sets.String, len(list))
    // 遍历所有的输入目标
    for _, item := range list {
        // 计算目标键
        key, err := f.KeyOf(item)
        if err != nil {
            return KeyError{item, err}
        }
        // 记录处理过的目标键,采用set存储,是为了后续快速查找
        keys.Insert(key)
        // 因为输入是目标全量,所以每个目标相当于重新同步了一次
        // 每个obj进行同步
        if err := f.queueActionLocked(Sync, item); err != nil {
            return fmt.Errorf("couldn't enqueue object: %v", err)
        }
    }
    // 如果没有存储的话,自己存储的就是所有的老对象,目的要看看哪些老对象不在全量集合中,那么就是删除的对象了
    if f.knownObjects == nil {
        // Do deletion detection against our own list.
        // 遍历所有的元素
        for k, oldItem := range f.items {
            // 这个目标在输入的对象中存在就可以忽略
            // 因为输入对象keys是要更新替换(replace)到存储(indexer)中的obj
            if keys.Has(k) {
                continue
            }
            // 输入对象中没有,说明对象已经被删除了
            var deletedObj interface{}
            if n := oldItem.Newest(); n != nil {
                deletedObj = n.Object
            }
            // 终于看到哪里用到DeletedFinalStateUnknown了,队列中存储对象的Deltas数组中
            // 可能已经存在Delete了,避免重复,采用DeletedFinalStateUnknown这种类型
            // DeletedFinalStateUnknown 状态:是delat_fifo本地检查出该obj要删除,但是没有从list-watch到这个ojb的删除事件(可能丢失或延时)
            if err := f.queueActionLocked(Deleted, DeletedFinalStateUnknown{k, deletedObj}); err != nil {
                return err
            }
        }
        // 如果populated还没有设置,说明是第一次并且还没有任何修改操作执行过
        // 队列中第1次输入数据,设置标志符号和initialPopulationCount
        if !f.populated {
            f.populated = true
            f.initialPopulationCount = len(list)
        }

        return nil
    }

    // Detect deletions not already in the queue.
    // TODO(lavalamp): This may be racy-- we aren't properly locked
    // with knownObjects. Unproven.
    // 下面处理的就是检测某些目标删除但是Delta没有在队列中
    // 从存储中获取所有对象键
    knownKeys := f.knownObjects.ListKeys()
    queuedDeletions := 0
    for _, k := range knownKeys {
        // knownKeys存储的对象还在目标对象keys中,那就忽略
        if keys.Has(k) {
            continue
        }
        // 获取待deletedObj对象
        deletedObj, exists, err := f.knownObjects.GetByKey(k)
        if err != nil {
            deletedObj = nil
            glog.Errorf("Unexpected error %v during lookup of key %v, placing DeleteFinalStateUnknown marker without object", err, k)
        } else if !exists {
            deletedObj = nil
            glog.Infof("Key %v does not exist in known objects store, placing DeleteFinalStateUnknown marker without object", k)
        }
        // 累积删除的对象数量
        queuedDeletions++
        // 把对象删除的Delta放入队列
        if err := f.queueActionLocked(Deleted, DeletedFinalStateUnknown{k, deletedObj}); err != nil {
            return err
        }
    }
    // 和上面的代码差不多,只是计算initialPopulationCount值的时候增加了删除对象的数量
    if !f.populated {
        f.populated = true
        f.initialPopulationCount = len(list) + queuedDeletions
    }

    return nil
}

// Resync will send a sync event for each item
// 重新同步,这个在cache实现是空的,这里面有具体实现
// 把knownObjects的objs同步更新到delta_fifo中
func (f *DeltaFIFO) Resync() error {
    f.lock.Lock()
    defer f.lock.Unlock()
    // 如果没有Indexer那么重新同步是没有意义的,因为连同步了哪些对象都不知道
    // 列举Indexer里面所有的对象键
    keys := f.knownObjects.ListKeys()
    // 遍历对象键,为每个对象产生一个同步的Delta
    for _, k := range keys {
        // 具体对象同步实现接口
        if err := f.syncKeyLocked(k); err != nil {
            return err
        }
    }
    return nil
}

func (f *DeltaFIFO) syncKey(key string) error {
    f.lock.Lock()
    defer f.lock.Unlock()

    return f.syncKeyLocked(key)
}
// 具体对象同步实现接口
func (f *DeltaFIFO) syncKeyLocked(key string) error {
    // 获取对象
    obj, exists, err := f.knownObjects.GetByKey(key)
    if err != nil {
        glog.Errorf("Unexpected error %v during lookup of key %v, unable to queue object for sync", err, key)
        return nil
    } else if !exists {
        glog.Infof("Key %v does not exist in known objects store, unable to queue object for sync", key)
        return nil
    }

    // If we are doing Resync() and there is already an event queued for that object,
    // we ignore the Resync for it. This is to avoid the race, in which the resync
    // comes with the previous value of object (since queueing an event for the object
    // doesn't trigger changing the underlying store <knownObjects>.
    // 计算对象的键值,有人会问对象键不是已经传入了么?那个是存在Indexer里面的对象键,可能与这里的计算方式不同
    id, err := f.KeyOf(obj)
    if err != nil {
        return KeyError{obj, err}
    }
    // 对象已经在存在,说明后续会通知对象的新变化,所以再加更新也没意义
    if len(f.items[id]) > 0 {
        return nil
    }
    // 添加对象同步的这个Delta
    if err := f.queueActionLocked(Sync, obj); err != nil {
        return fmt.Errorf("couldn't queue object: %v", err)
    }
    return nil
}

// A KeyListerGetter is anything that knows how to list its keys and look up by key.
type KeyListerGetter interface {
    KeyLister
    KeyGetter
}

// A KeyLister is anything that knows how to list its keys.
type KeyLister interface {
    ListKeys() []string
}

// A KeyGetter is anything that knows how to get the value stored under a given key.
type KeyGetter interface {
    GetByKey(key string) (interface{}, bool, error)
}

// DeltaCompressor is an algorithm that removes redundant changes.
type DeltaCompressor interface {
    Compress(Deltas) Deltas
}

// DeltaCompressorFunc should remove redundant changes; but changes that
// are redundant depend on one's desired semantics, so this is an
// injectable function.
//
// DeltaCompressorFunc adapts a raw function to be a DeltaCompressor.
type DeltaCompressorFunc func(Deltas) Deltas

// Compress just calls dc.
func (dc DeltaCompressorFunc) Compress(d Deltas) Deltas {
    return dc(d)
}

// DeltaType is the type of a change (addition, deletion, etc)
type DeltaType string

const (
    Added   DeltaType = "Added"
    Updated DeltaType = "Updated"
    Deleted DeltaType = "Deleted"
    // The other types are obvious. You'll get Sync deltas when:
    //  * A watch expires/errors out and a new list/watch cycle is started.
    //  * You've turned on periodic syncs.
    // (Anything that trigger's DeltaFIFO's Replace() method.)
    Sync DeltaType = "Sync"
)

// Delta is the type stored by a DeltaFIFO. It tells you what change
// happened, and the object's state after* that change.
//
// [*] Unless the change is a deletion, and then you'll get the final
//     state of the object before it was deleted.
type Delta struct {
    Type   DeltaType
    Object interface{}
}

// Deltas is a list of one or more 'Delta's to an individual object.
// The oldest delta is at index 0, the newest delta is the last one.
type Deltas []Delta

// Oldest is a convenience function that returns the oldest delta, or
// nil if there are no deltas.
func (d Deltas) Oldest() *Delta {
    if len(d) > 0 {
        return &d[0]
    }
    return nil
}

// Newest is a convenience function that returns the newest delta, or
// nil if there are no deltas.
func (d Deltas) Newest() *Delta {
    if n := len(d); n > 0 {
        return &d[n-1]
    }
    return nil
}

// copyDeltas returns a shallow copy of d; that is, it copies the slice but not
// the objects in the slice. This allows Get/List to return an object that we
// know won't be clobbered by a subsequent call to a delta compressor.
func copyDeltas(d Deltas) Deltas {
    d2 := make(Deltas, len(d))
    copy(d2, d)
    return d2
}

// DeletedFinalStateUnknown is placed into a DeltaFIFO in the case where
// an object was deleted but the watch deletion event was missed. In this
// case we don't know the final "resting" state of the object, so there's
// a chance the included `Obj` is stale.
type DeletedFinalStateUnknown struct {
    Key string
    Obj interface{}
}


主要处理函数

  • queueActionLocked:生产者方法,relactor调用,主要处理obj的的入队和保存

  • pop:消费者方法,conroller通过DeltaFIFO的process回调函数HandleDeltas处理

  • 从Replace()的实现来看,主要用于实现对象的全量更新。这个可以理解为DeltaFIFO在必要的时刻做一次全量更新,这个时刻可以是定期的,也可以是事件触发的。由于DeltaFIFO对外输出的就是所有目标的增量变化,所以每次全量更新都要判断对象是否已经删除,因为在全量更新前可能没有收到目标删除的请求。这一点与cache不同,cache的Replace()相当于重建,因为cache就是对象全量的一种内存映射,所以Replace()就等于重建。

  • 那我来问题一个非常有水平的问题,为什么knownObjects为nil时需要对比队列和对象全量来判断对象是否删除,而knownObjects不为空的时候就不需要了?如果读者想判断自己是否已经全部理解可以不看下面自己想想。

    • knownObjects就是Indexer(具体实现是cache),而开篇的那副图已经非常明确的描述了二者以及使用之间的关系。也就是说knownObjects有的对象就是使用者知道的所有对象,此时即便队列(DeltaFIFO)中有相应的对象,在更新的全量对象中又被删除了,那就没必要通知使用者对象删除了,这种情况可以假想为系统短时间添加并删除了对象,对使用者来说等同于没有这个对象。

说明

  • 判断是否已同步populated和initialPopulationCount这两个变量存在的目的是什么?我的理解是否已同步指的是第一次从apiserver获取全量对象是否已经全部通知到外部,也就是通过Pop()被取走。所谓的同步就是指apiserver的状态已经同步到缓存中了,也就是Indexer中;
  • 接口AddIfNotPresent()存在的目的是什么,只有在Pop()函数中使用了一次,但是在调用这个接口的时候已经从map中删除了,所以肯定不存在。这个接口在我看来主要用来保险的,因为Pop()本身就存在重入队列的可能,外部如果判断返回错误重入队列就可能会重复; 最后,我们还是用一幅图来总结一下

Index

index 就是带索引器的本地缓存

index

tools/cache/index.go

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97


package cache

import (
    "fmt"

    "k8s.io/apimachinery/pkg/api/meta"
    "k8s.io/apimachinery/pkg/util/sets"
)

// Indexer is a storage interface that lets you list objects using multiple indexing functions
type Indexer interface {
    Store
    // Retrieve list of objects that match on the named indexing function
    // indexName索引类,obj是对象,计算obj在indexName索引类中的索引键,通过索引键把所有的对象取出来
    // 基本就是获取符合obj特征的所有对象,所谓的特征就是对象在索引类中的索引键
    Index(indexName string, obj interface{}) ([]interface{}, error)
    // IndexKeys returns the set of keys that match on the named indexing function.
    // indexKey是indexName索引类中一个索引键,函数返回indexKey指定的所有对象键
    // 这个对象键是Indexer内唯一的,在添加的时候会计算
    IndexKeys(indexName, indexKey string) ([]string, error)
    // ListIndexFuncValues returns the list of generated values of an Index func
    // 获取indexName索引类中的所有索引键
    ListIndexFuncValues(indexName string) []string
    // ByIndex lists object that match on the named indexing function with the exact key
    // 这个函数和Index类似,只是返回值不是对象键,而是所有对象
    ByIndex(indexName, indexKey string) ([]interface{}, error)
    // GetIndexer return the indexers
    // 返回Indexers
    GetIndexers() Indexers

    // AddIndexers adds more indexers to this store.  If you call this after you already have data
    // in the store, the results are undefined.
    // 添加Indexers,就是增加更多的索引分类
    AddIndexers(newIndexers Indexers) error
}

// IndexFunc knows how to provide an indexed value for an object.
type IndexFunc func(obj interface{}) ([]string, error)

// IndexFuncToKeyFuncAdapter adapts an indexFunc to a keyFunc.  This is only useful if your index function returns
// unique values for every object.  This is conversion can create errors when more than one key is found.  You
// should prefer to make proper key and index functions.
func IndexFuncToKeyFuncAdapter(indexFunc IndexFunc) KeyFunc {
    return func(obj interface{}) (string, error) {
        indexKeys, err := indexFunc(obj)
        if err != nil {
            return "", err
        }
        if len(indexKeys) > 1 {
            return "", fmt.Errorf("too many keys: %v", indexKeys)
        }
        if len(indexKeys) == 0 {
            return "", fmt.Errorf("unexpected empty indexKeys")
        }
        return indexKeys[0], nil
    }
}

const (
    NamespaceIndex string = "namespace"
)

// MetaNamespaceIndexFunc is a default index function that indexes based on an object's namespace
func MetaNamespaceIndexFunc(obj interface{}) ([]string, error) {
    meta, err := meta.Accessor(obj)
    if err != nil {
        return []string{""}, fmt.Errorf("object has no meta: %v", err)
    }
    return []string{meta.GetNamespace()}, nil
}

// Index maps the indexed value to a set of keys in the store that match on that value
// sets.String 保存的是 本地缓存的key set,
// 这个map的key 即为索引值keyvalue,比如usernmae1
// map[string]sets.String map[username1]{default/pod1, default/pod2}
// Index 索引器,包含不同keyvalue的map
type Index map[string]sets.String

// sets.String is a set of strings, implemented via map[string]struct{} for minimal memory consumption.
// sets.String 保存的是 本地缓存的key set,即cache items存储obj的key
//type String map[string]Empty

// Indexers maps a name to a IndexFunc
// 计算索引的函数有很多, 采用map,用名字分类
type Indexers map[string]IndexFunc
// 计算索引的函数,传入对象,输出字符串索引,注意是数组哦!
//type IndexFunc func(obj interface{}) ([]string, error)

// Indices maps a name to an Index
// 由于有多种计算索引的方式,那就又要按照计算索引的方式组织索引
// Indices的 key 即为 Indexers的key
// Indices 索引器map,包含不同命名的索引器
type Indices map[string]Index


  • Indexer->Store就是缓存接口
  • cache实现了store接口
  • 数据对象缓存到了cache.cacheStorage ThreadSafeStore
  • ThreadSafeStore 是线程安全的

所谓索引,索引目的就是为了快速查找。比如,我们需要查找某个节点上的所有Pod,那就要Pod按照节点名称排序,对应上面的Index类型就是map[nodename]sets.podname。我们可能有很多种查找方式,这就是Indexers这个类型作用了。

  • IndexFunc1…..这些都是索引函数的名称,我们称之为索引类,大概意思就是把索引分类了;
  • IndexKey1….这些是同一个对象在同一个索引类中的多个索引键值,我们称为索引键,切记索引键有多个;
  • ObjKey1…..这些是对象键,每个对象都有唯一的名称;

Indexers和Indices都是按照IndexFunc(名字)分组, 每个IndexFunc输出多个IndexKey,产生相同IndexKey的多个对象存储在一个集合中。

store

tools/cache/store.go

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
package cache

import (
    "fmt"
    "strings"

    "k8s.io/apimachinery/pkg/api/meta"
)

// Store is a generic object storage interface. Reflector knows how to watch a server
// and update a store. A generic store is provided, which allows Reflector to be used
// as a local caching system, and an LRU store, which allows Reflector to work like a
// queue of items yet to be processed.
//
// Store makes no assumptions about stored object identity; it is the responsibility
// of a Store implementation to provide a mechanism to correctly key objects and to
// define the contract for obtaining objects by some arbitrary key type.
type Store interface {
    Add(obj interface{}) error
    Update(obj interface{}) error
    Delete(obj interface{}) error
    List() []interface{}
    // 列举对象键
    ListKeys() []string
    // 返回obj相同对象键的对象,对象键是通过对象计算出来的字符串
    Get(obj interface{}) (item interface{}, exists bool, err error)
    // 通过对象键获取对象
    GetByKey(key string) (item interface{}, exists bool, err error)

    // Replace will delete the contents of the store, using instead the
    // given list. Store takes ownership of the list, you should not reference
    // it after calling this function.
    // 用[]interface{}替换Store存储的所有对象,等同于删除全部原有对象在逐一添加新的对象
    Replace([]interface{}, string) error
    // 重新同步
    Resync() error
}

// KeyFunc knows how to make a key from an object. Implementations should be deterministic.
type KeyFunc func(obj interface{}) (string, error)



// cache responsibilities are limited to:
//  1. Computing keys for objects via keyFunc
//  2. Invoking methods of a ThreadSafeStorage interface
type cache struct {
    // cacheStorage bears the burden of thread safety for the cache
    cacheStorage ThreadSafeStore
    // keyFunc is used to make the key for objects stored in and retrieved from items, and
    // should be deterministic.
    keyFunc KeyFunc
}

var _ Store = &cache{}

thread_safe_store

tools/cache/thread_safe_store.go

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
/*
Copyright 2014 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package cache

import (
    "fmt"
    "sync"

    "k8s.io/apimachinery/pkg/util/sets"
)

// ThreadSafeStore is an interface that allows concurrent access to a storage backend.
// TL;DR caveats: you must not modify anything returned by Get or List as it will break
// the indexing feature in addition to not being thread safe.
//
// The guarantees of thread safety provided by List/Get are only valid if the caller
// treats returned items as read-only. For example, a pointer inserted in the store
// through `Add` will be returned as is by `Get`. Multiple clients might invoke `Get`
// on the same key and modify the pointer in a non-thread-safe way. Also note that
// modifying objects stored by the indexers (if any) will *not* automatically lead
// to a re-index. So it's not a good idea to directly modify the objects returned by
// Get/List, in general.
type ThreadSafeStore interface {
    Add(key string, obj interface{})
    Update(key string, obj interface{})
    Delete(key string)
    Get(key string) (item interface{}, exists bool)
    List() []interface{}
    ListKeys() []string
    Replace(map[string]interface{}, string)
    Index(indexName string, obj interface{}) ([]interface{}, error)
    IndexKeys(indexName, indexKey string) ([]string, error)
    ListIndexFuncValues(name string) []string
    ByIndex(indexName, indexKey string) ([]interface{}, error)
    GetIndexers() Indexers

    // AddIndexers adds more indexers to this store.  If you call this after you already have data
    // in the store, the results are undefined.
    AddIndexers(newIndexers Indexers) error
    Resync() error
}

// threadSafeMap implements ThreadSafeStore
type threadSafeMap struct {
    lock  sync.RWMutex
    // key为keyfunc计算得出,namespace/name
    items map[string]interface{}

    // indexers maps a name to an IndexFunc
    // 索引器函数,用于计算索引key,如按注解byuser,构建索引
    indexers Indexers
    // indices maps a name to an Index
    indices Indices
}

func (c *threadSafeMap) Add(key string, obj interface{}) {
    c.lock.Lock()
    defer c.lock.Unlock()
    oldObject := c.items[key]
    c.items[key] = obj
    c.updateIndices(oldObject, obj, key)
}

func (c *threadSafeMap) Update(key string, obj interface{}) {
    c.lock.Lock()
    defer c.lock.Unlock()
    oldObject := c.items[key]
    c.items[key] = obj
    c.updateIndices(oldObject, obj, key)
}

func (c *threadSafeMap) Delete(key string) {
    c.lock.Lock()
    defer c.lock.Unlock()
    if obj, exists := c.items[key]; exists {
        c.deleteFromIndices(obj, key)
        delete(c.items, key)
    }
}

func (c *threadSafeMap) Get(key string) (item interface{}, exists bool) {
    c.lock.RLock()
    defer c.lock.RUnlock()
    item, exists = c.items[key]
    return item, exists
}

func (c *threadSafeMap) List() []interface{} {
    c.lock.RLock()
    defer c.lock.RUnlock()
    list := make([]interface{}, 0, len(c.items))
    for _, item := range c.items {
        list = append(list, item)
    }
    return list
}

// ListKeys returns a list of all the keys of the objects currently
// in the threadSafeMap.
func (c *threadSafeMap) ListKeys() []string {
    c.lock.RLock()
    defer c.lock.RUnlock()
    list := make([]string, 0, len(c.items))
    for key := range c.items {
        list = append(list, key)
    }
    return list
}

func (c *threadSafeMap) Replace(items map[string]interface{}, resourceVersion string) {
    c.lock.Lock()
    defer c.lock.Unlock()
    c.items = items

    // rebuild any index
    c.indices = Indices{}
    for key, item := range c.items {
        c.updateIndices(nil, item, key)
    }
}

// Index returns a list of items that match on the index function
// Index is thread-safe so long as you treat all items as immutable
// 这个函数就是通过指定的索引函数计算对象的索引键,然后把索引键的对象全部取出来
// 利用一个对象计算出来的索引键,然后把所有具备这些索引键的对象全部取出来,
// 比如取出一个Pod所在节点上的所有Pod,这样理解就会非常方便,但是kubernetes可能就不这么用。
// 再比如取出满足某个标签或者注解的所有Pod
// 如果更抽象一点,就是符合对象某些特征的所有对象,而这个特征就是我们指定的索引函数计算出来的
func (c *threadSafeMap) Index(indexName string, obj interface{}) ([]interface{}, error) {
    c.lock.RLock()
    defer c.lock.RUnlock()
    // 取出indexName这个分类索引函数
    indexFunc := c.indexers[indexName]
    if indexFunc == nil {
        return nil, fmt.Errorf("Index with name %s does not exist", indexName)
    }
    // 计算对象的索引键
    indexKeys, err := indexFunc(obj)
    if err != nil {
        return nil, err
    }
    // 取出indexName这个分类所有索引
    index := c.indices[indexName]

    // need to de-dupe the return list.  Since multiple keys are allowed, this can happen.
    // 返回对象的对象键的集合
    returnKeySet := sets.String{}
    // 遍历刚刚计算出来的所有索引键
    for _, indexKey := range indexKeys {
        // 取出索引键的所有对象键,数组形式
        set := index[indexKey]
        for _, key := range set.UnsortedList() {
            // 这里的每个key 代表一个obj的key,标识一个obj
            // 把所有的对象键输出到对象键的集合中
            returnKeySet.Insert(key)
        }
    }
    // 通过对象键逐一的把对象取出
    // 根据obj key,从items取出obj,放入list中
    list := make([]interface{}, 0, returnKeySet.Len())
    for absoluteKey := range returnKeySet {
        list = append(list, c.items[absoluteKey])
    }
    return list, nil
}

// ByIndex returns a list of items that match an exact value on the index function
// 这个函数和上面的函数基本一样,只是索引键不用再计算了,使用者提供
func (c *threadSafeMap) ByIndex(indexName, indexKey string) ([]interface{}, error) {
    c.lock.RLock()
    defer c.lock.RUnlock()

    indexFunc := c.indexers[indexName]
    if indexFunc == nil {
        return nil, fmt.Errorf("Index with name %s does not exist", indexName)
    }

    index := c.indices[indexName]
    // 根据indexkey索引key,找到object items的key,然后再从c.items中取出
    set := index[indexKey]
    list := make([]interface{}, 0, set.Len())
    for _, key := range set.List() {
        list = append(list, c.items[key])
    }

    return list, nil
}

// IndexKeys returns a list of keys that match on the index function.
// IndexKeys is thread-safe so long as you treat all items as immutable.
func (c *threadSafeMap) IndexKeys(indexName, indexKey string) ([]string, error) {
    c.lock.RLock()
    defer c.lock.RUnlock()

    indexFunc := c.indexers[indexName]
    if indexFunc == nil {
        return nil, fmt.Errorf("Index with name %s does not exist", indexName)
    }

    index := c.indices[indexName]

    set := index[indexKey]
    return set.List(), nil
}
// 这个函数用来获取索引分类内的所有索引键的
func (c *threadSafeMap) ListIndexFuncValues(indexName string) []string {
    c.lock.RLock()
    defer c.lock.RUnlock()
    // 获取索引分类的所有索引
    index := c.indices[indexName]
    names := make([]string, 0, len(index))
    // 直接遍历后输出索引键
    for key := range index {
        names = append(names, key)
    }
    return names
}

func (c *threadSafeMap) GetIndexers() Indexers {
    return c.indexers
}

func (c *threadSafeMap) AddIndexers(newIndexers Indexers) error {
    c.lock.Lock()
    defer c.lock.Unlock()

    if len(c.items) > 0 {
        return fmt.Errorf("cannot add indexers to running index")
    }

    oldKeys := sets.StringKeySet(c.indexers)
    newKeys := sets.StringKeySet(newIndexers)

    if oldKeys.HasAny(newKeys.List()...) {
        return fmt.Errorf("indexer conflict: %v", oldKeys.Intersection(newKeys))
    }

    for k, v := range newIndexers {
        c.indexers[k] = v
    }
    return nil
}

// updateIndices modifies the objects location in the managed indexes, if this is an update, you must provide an oldObj
// updateIndices must be called from a function that already has a lock on the cache
// 当有对象添加或者更新时,需要更新索引,因为调用该函数的函数已经加锁了,所以这个函数没有加锁操作
func (c *threadSafeMap) updateIndices(oldObj interface{}, newObj interface{}, key string) error {
    // 在添加和更新的时候都会获取老对象,如果存在老对象,那么就要删除老对象的索引
    // key 就是本地缓存caech item的object的key
    // 如果已有old object 则删除,后面进行更新
    // if we got an old object, we need to remove it before we add it again
    if oldObj != nil {
        c.deleteFromIndices(oldObj, key)
    }
    // 遍历所有的索引函数,因为要为对象在所有的索引分类中创建索引键
    // 索引器函数indexFunc, 如name为byuser,fnc为 UserIndexFunc
    for name, indexFunc := range c.indexers {
        // 计算索引键
        // 根据索引器 检索出的索引值,比如根据pod object 的user注解,检索出username 列表
        indexValues, err := indexFunc(newObj)
        if err != nil {
            return err
        }
        // 获取索引分类的所有索引
        index := c.indices[name]
        // 为空说明这个索引分类还没有任何索引
        if index == nil {
            index = Index{}
            // 按name 构建索引器,比如按user注解构建的name=byuser 的索引器
            c.indices[name] = index
        }
        // 遍历对象的索引键,上面刚刚用索引函数计算出来的,如注解users的各个值,即indexvalue
        for _, indexValue := range indexValues {
            // 找到索引键的对象集合
            set := index[indexValue]
            // 为空说明这个索引键下还没有对象
            if set == nil {
                // 创建对象键集合
                set = sets.String{}
                index[indexValue] = set
            }
            // 把这个obj的对象键添加到集合中,即这个obj的name类型索引已创建完成
            set.Insert(key)
        }
    }
    return nil
}

// deleteFromIndices removes the object from each of the managed indexes
// it is intended to be called from a function that already has a lock on the cache
func (c *threadSafeMap) deleteFromIndices(obj interface{}, key string) error {
    // 遍历索引函数,也就是把所有索引分类
    for name, indexFunc := range c.indexers {
        indexValues, err := indexFunc(obj)
        if err != nil {
            return err
        }
        // 获取索引分类的所有索引
        index := c.indices[name]
        if index == nil {
            continue
        }
        // 遍历对象的索引键
        for _, indexValue := range indexValues {
            //把对象从索引键指定对对象集合删除
            set := index[indexValue]
            if set != nil {
                set.Delete(key)
            }
        }
    }
    return nil
}

func (c *threadSafeMap) Resync() error {
    // Nothing to do
    return nil
}

func NewThreadSafeStore(indexers Indexers, indices Indices) ThreadSafeStore {
    return &threadSafeMap{
        items:    map[string]interface{}{},
        indexers: indexers,
        indices:  indices,
    }
}

-store -> threadSafeMap 通过 updateIndices、deleteFromIndices方法来更新或删除本地存储的obj和索引器

说明

索引键和对象键是两个重要概念

  • indexkey: 索引键是用于对象快速查找的,经过索引建在map中排序查找会更快;
  • objkey: 对象键是为对象在存储中的唯一命名的,对象是通过名字+对象的方式存储的。默认格式为:namespace/name

kubernetes中主要的索引函数,最主要的索引的函数大概就下面几种:

  • MetaNamespaceIndexFunc,定义在client-go/tools/cache/index.go中,从名字看就是获取对象元数据的namesapce字段,也就是所有对象以namespace作为索引键,这个就很好理解了;
  • indexByPodNodeName,定义在kubernetes/pkg/controller/daemon/deamon_controller.go,该索引函数计算的是Pod对象所在节点的名字; 为了方便理解,我们可以假设kubernetes主要就是一种索引函数(MetaNamespaceIndexFunc),也就是在索引中大部分就一个分类,这个分类的索引键就是namesapce。那么有人肯定会问,如果这样的话,所有的对象都存在一个namesapce索引键下面,这样的效率岂不是太低了?其实client-go为每类对象都创建了Informer(Informer内有Indexer),所以即便存储在相同namesapce下的对象都是同一类,这个问题自然也就没有了,详情可以看我针对Informer写的文章。

注意:一定要区分MetaNamespaceIndexFunc和MetaNamespaceKeyFunc的区分,第一个索引键计算函数,第二个是对象键计算函数,第一个返回的是namespace,第二个返回的是对象包含namespace在内的对象全称。
所有的对象(Pod、Node、Service等等)都是有属性/标签的,如果属性/标签就是索引键,Indexer就会把相同属性/标签的所有对象放在一个集合中,如果在对属性/标签分一下类,也就就是我们本文的将的Indexer的核心内容了。甚至你可以简单的理解为Indexer就是简单的把相同namesapce对象放在一个集合中,kubernetes就是基于属性/标签/注解来检索的

参考资料