
- [SharedInformer](#sharedinformer)
- [processorListener](#processorlistener)
  - [add() 方法](#add-方法)
  - [pop() 方法](#pop-方法)
  - [run() 方法](#run-方法)
- [sharedProcessor](#sharedprocessor)
  - [addListener() 和 addListenerLocked() 方法](#addlistener-和-addlistenerlocked-方法)
  - [distribute() 方法](#distribute-方法)
  - [run() 方法](#run-方法-1)
  - [shouldResync() 方法](#shouldresync-方法)
- [SharedInformer 和 SharedIndexInformer](#sharedinformer-和-sharedindexinformer)
- [实现 SharedIndexInformer 接口的 sharedIndexInformer 类型](#实现-sharedindexinformer-接口的-sharedindexinformer-类型)
- [Run() 方法](#run-方法)
SharedInformer 和 SharedIndexInformer 一般和 workqueue 同时使用,具体参考:[8.customize-controller.md](8.customize-controller.md)

## processorListener

processorListener 封装了用户注册的监听器处理函数 ResourceEventHandler 以及一个 RingGrowing 类型的循环队列。

它通过 addCh Channel 接收对象,先保存到循环队列 pendingNotifications 中缓存(初始容量 1024,满时自动扩容),再由 pop() 方法发送到 nextCh,最后 run() 方法从 nextCh 读取该对象,调用监听器处理函数。

``` go
// 来源于:k8s.io/client-go/tools/cache/shared_informer.go
type processorListener struct {
    // nextCh 保存从 pendingNotifications.ReadOne() 读取的对象,供 run() 方法消费;
    nextCh chan interface{}
    // addCh 用于接收 add() 方法发送的对象,pop() 方法读取后调用 pendingNotifications.WriteOne() 缓存该对象(或直接发送到 nextCh);
    addCh chan interface{}

    // 用户实际配置的回调函数
    handler ResourceEventHandler

    // 循环队列,默认缓存 1024 个对象,从而提供了事件缓冲的能力
    pendingNotifications buffer.RingGrowing

    // 创建 listener 时,用户指定的 Resync 周期
    requestedResyncPeriod time.Duration

    // 该 listener 实际使用的 Resync 周期,由 requestedResyncPeriod 和 informer 的 resyncCheckPeriod 共同决定,所以可能与 requestedResyncPeriod 不同
    resyncPeriod time.Duration

    // nextResync is the earliest time the listener should get a full resync
    nextResync time.Time
    // resyncLock guards access to resyncPeriod and nextResync
    resyncLock sync.Mutex
}
```
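
其中 buffer.RingGrowing 是一个可自动扩容的环形缓冲区,WriteOne() 不会因为队列满而失败,这正是 processorListener 能缓冲事件的原因。下面是一个独立的小示例(示意,假设导入路径为 k8s.io/client-go/util/buffer,具体以所用 client-go 版本为准):

``` go
package main

import (
    "fmt"

    "k8s.io/client-go/util/buffer"
)

func main() {
    // 创建初始容量为 2 的环形缓冲区,写满后会自动扩容
    ring := buffer.NewRingGrowing(2)
    for i := 0; i < 5; i++ {
        ring.WriteOne(i) // 不会因为容量不足而失败
    }

    // ReadOne 按写入顺序读出,缓冲区为空时第二个返回值为 false
    for {
        v, ok := ring.ReadOne()
        if !ok {
            break
        }
        fmt.Println(v) // 依次输出 0 1 2 3 4
    }
}
```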

### add() 方法

add() 方法将通知对象写入 p.addCh Channel。由于 pop() 方法会及时从 addCh 读取对象并缓存到可自动扩容的 pendingNotifications 中,所以 add() 基本不会被阻塞,**不需要等待** ResourceEventHandler 处理完毕。

``` go
// 来源于:k8s.io/client-go/tools/cache/shared_informer.go
func (p *processorListener) add(notification interface{}) {
    p.addCh <- notification
}
```

### pop() 方法

pop() 方法是 processorListener 的核心方法,它实现了:

1. 从 p.addCh Channel 读取数据,存入循环队列 p.pendingNotifications;
2. 从循环队列 p.pendingNotifications 取数据,写入 p.nextCh,供后续 run() 方法读取;

run() 从 p.nextCh 读取对象后调用用户的处理函数,执行时间可能较长,而 pop() 方法通过 channel select 机制以及循环队列,巧妙地实现了**异步非阻塞**的写入和读取对象。

``` go
// 来源于:k8s.io/client-go/tools/cache/shared_informer.go
func (p *processorListener) pop() {
    defer utilruntime.HandleCrash()
    defer close(p.nextCh) // Tell .run() to stop

    var nextCh chan<- interface{}
    var notification interface{}
    for {
        select {
        // 将待分发对象 notification 写入 p.nextCh 后,从循环队列中读取下一个待分发对象;若队列为空则禁用该 case
        case nextCh <- notification:
            // Notification dispatched
            var ok bool
            notification, ok = p.pendingNotifications.ReadOne()
            if !ok { // Nothing to pop
                nextCh = nil // Disable this select case
            }
        // 从 p.addCh 读取一个新对象:如果当前没有待分发对象,则直接将其作为待分发对象并启用上面的 case;否则写入循环队列缓存
        case notificationToAdd, ok := <-p.addCh:
            if !ok {
                return
            }
            if notification == nil { // No notification to pop (and pendingNotifications is empty)
                // Optimize the case - skip adding to pendingNotifications
                notification = notificationToAdd
                nextCh = p.nextCh
            } else { // There is already a notification waiting to be dispatched
                p.pendingNotifications.WriteOne(notificationToAdd)
            }
        }
    }
}
```
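
这种"两个 channel + 可增长缓冲"的写法是 Go 中实现无界缓冲 channel 的常见模式。下面是一个独立的简化示例(示意,用 slice 代替 RingGrowing,用发送 channel 是否为 nil 代替 notification == nil 的判断),便于理解 pop() 中 select 的技巧:

``` go
package main

import "fmt"

// buffer 从 in 接收数据,按顺序从 out 发出;
// 中间用一个可增长的 slice 缓冲,保证向 in 写入不需要等待消费者读取。
func buffer(in <-chan int, out chan<- int) {
    defer close(out)

    var pending []int     // 对应 pop() 中的 pendingNotifications
    var next int          // 对应 pop() 中的 notification
    var sendCh chan<- int // 为 nil 时禁用发送 case,对应 pop() 中的局部变量 nextCh

    for {
        select {
        case sendCh <- next:
            // 发送成功,从缓冲中取下一个待发送元素;缓冲为空则禁用该 case
            if len(pending) == 0 {
                sendCh = nil
            } else {
                next = pending[0]
                pending = pending[1:]
            }
        case v, ok := <-in:
            if !ok {
                return
            }
            if sendCh == nil {
                // 当前没有待发送元素,直接作为下一个待发送元素
                next = v
                sendCh = out
            } else {
                // 已有待发送元素,先缓冲起来
                pending = append(pending, v)
            }
        }
    }
}

func main() {
    in := make(chan int)
    out := make(chan int)
    go buffer(in, out)

    // 此时还没有任何消费者在读 out,但写入不需要等待消费者
    for i := 0; i < 5; i++ {
        in <- i
    }

    // 按写入顺序读出:0 1 2 3 4
    for i := 0; i < 5; i++ {
        fmt.Println(<-out)
    }
    close(in)
}
```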

### run() 方法

run() 方法循环从 p.nextCh Channel 中获取通知对象,根据通知的类型(add/update/delete)调用用户注册的处理函数(OnAdd/OnUpdate/OnDelete)。

``` go
// 来源于:k8s.io/client-go/tools/cache/shared_informer.go
func (p *processorListener) run() {
    // this call blocks until the channel is closed. When a panic happens during the notification
    // we will catch it, **the offending item will be skipped!**, and after a short delay (one second)
    // the next notification will be attempted. This is usually better than the alternative of never
    // delivering again.
    stopCh := make(chan struct{})
    wait.Until(func() {
        // this gives us a few quick retries before a long pause and then a few more quick retries
        err := wait.ExponentialBackoff(retry.DefaultRetry, func() (bool, error) {
            for next := range p.nextCh {
                switch notification := next.(type) {
                case updateNotification:
                    p.handler.OnUpdate(notification.oldObj, notification.newObj)
                case addNotification:
                    p.handler.OnAdd(notification.newObj)
                case deleteNotification:
                    p.handler.OnDelete(notification.oldObj)
                default:
                    utilruntime.HandleError(fmt.Errorf("unrecognized notification: %#v", next))
                }
            }
            // the only way to get here is if the p.nextCh is empty and closed
            return true, nil
        })

        // the only way to get here is if the p.nextCh is empty and closed
        if err == nil {
            close(stopCh)
        }
    }, 1*time.Minute, stopCh)
}
```
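
这里的 p.handler 就是用户注册的 ResourceEventHandler。实际使用中通常传入 client-go 提供的 cache.ResourceEventHandlerFuncs 适配器,它把三类通知分发到用户提供的回调函数,大致如下(示意,省略了源码中的注释,具体以所用版本为准):

``` go
// 示意:cache.ResourceEventHandlerFuncs 适配器,实现了 ResourceEventHandler 接口
type ResourceEventHandlerFuncs struct {
    AddFunc    func(obj interface{})
    UpdateFunc func(oldObj, newObj interface{})
    DeleteFunc func(obj interface{})
}

// OnAdd 仅在用户设置了 AddFunc 时调用它
func (r ResourceEventHandlerFuncs) OnAdd(obj interface{}) {
    if r.AddFunc != nil {
        r.AddFunc(obj)
    }
}

// OnUpdate 仅在用户设置了 UpdateFunc 时调用它
func (r ResourceEventHandlerFuncs) OnUpdate(oldObj, newObj interface{}) {
    if r.UpdateFunc != nil {
        r.UpdateFunc(oldObj, newObj)
    }
}

// OnDelete 仅在用户设置了 DeleteFunc 时调用它
func (r ResourceEventHandlerFuncs) OnDelete(obj interface{}) {
    if r.DeleteFunc != nil {
        r.DeleteFunc(obj)
    }
}
```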

## sharedProcessor

sharedProcessor 类型封装了多个 processorListener,对应多组用户注册的处理函数,并负责将通知分发给它们。

``` go
// 来源于:k8s.io/client-go/tools/cache/shared_informer.go
type sharedProcessor struct {
    // 标记 processor 是否已启动(run() 方法是否已执行)
    listenersStarted bool
    listenersLock    sync.RWMutex
    // 所有注册的 listener
    listeners []*processorListener
    // 本轮需要 Resync 的 listener,由 shouldResync() 方法维护
    syncingListeners []*processorListener
    clock            clock.Clock
    wg               wait.Group
}
```

### addListener() 和 addListenerLocked() 方法

addListener() 方法用于向 processor 添加新的、封装了用户处理函数的 listener;如果 processor 已经启动(listenersStarted 为 true),则同时启动该 listener 的 run() 和 pop() goroutine。

``` go
// 来源于:k8s.io/client-go/tools/cache/shared_informer.go
func (p *sharedProcessor) addListener(listener *processorListener) {
    p.listenersLock.Lock()
    defer p.listenersLock.Unlock()

    p.addListenerLocked(listener)
    if p.listenersStarted {
        p.wg.Start(listener.run)
        p.wg.Start(listener.pop)
    }
}

func (p *sharedProcessor) addListenerLocked(listener *processorListener) {
    p.listeners = append(p.listeners, listener)
    p.syncingListeners = append(p.syncingListeners, listener)
}
```

### distribute() 方法

distribute() 方法遍历 listeners,调用它们的 add() 方法分发通知对象,它由 sharedIndexInformer 的 HandleDeltas() 方法调用。如果对象的事件类型是 Sync,则 sync 为 true,此时只分发给 syncingListeners。

``` go
// 来源于:k8s.io/client-go/tools/cache/shared_informer.go
func (p *sharedProcessor) distribute(obj interface{}, sync bool) {
    p.listenersLock.RLock()
    defer p.listenersLock.RUnlock()

    if sync {
        for _, listener := range p.syncingListeners {
            listener.add(obj)
        }
    } else {
        for _, listener := range p.listeners {
            listener.add(obj)
        }
    }
}
```

### run() 方法

run() 方法为每个已注册的 listener 启动 run() 和 pop() goroutine,并将 listenersStarted 置为 true;当 stopCh 关闭时,关闭各 listener 的 addCh,从而依次通知 pop() 和 run() 退出,最后等待所有 goroutine 结束。

``` go
// 来源于:k8s.io/client-go/tools/cache/shared_informer.go
func (p *sharedProcessor) run(stopCh <-chan struct{}) {
    func() {
        p.listenersLock.RLock()
        defer p.listenersLock.RUnlock()
        for _, listener := range p.listeners {
            p.wg.Start(listener.run)
            p.wg.Start(listener.pop)
        }
        p.listenersStarted = true
    }()
    <-stopCh
    p.listenersLock.RLock()
    defer p.listenersLock.RUnlock()
    for _, listener := range p.listeners {
        close(listener.addCh) // Tell .pop() to stop. .pop() will tell .run() to stop
    }
    p.wg.Wait() // Wait for all .pop() and .run() to stop
}
```

### shouldResync() 方法

shouldResync() 遍历所有已注册的 listeners,将到达 Resync 时间的 listener 放入 syncingListeners 列表,只要有一个 listener 需要 Resync 就返回 true。下面的代码块中还包含 resyncCheckPeriodChanged() 方法,它在 informer 的 resyncCheckPeriod 变化时重新计算各 listener 实际使用的 resyncPeriod。

``` go
// 来源于:k8s.io/client-go/tools/cache/shared_informer.go
func (p *sharedProcessor) shouldResync() bool {
    p.listenersLock.Lock()
    defer p.listenersLock.Unlock()

    p.syncingListeners = []*processorListener{}

    resyncNeeded := false
    now := p.clock.Now()
    for _, listener := range p.listeners {
        // need to loop through all the listeners to see if they need to resync so we can prepare any
        // listeners that are going to be resyncing.
        if listener.shouldResync(now) {
            resyncNeeded = true
            p.syncingListeners = append(p.syncingListeners, listener)
            listener.determineNextResync(now)
        }
    }
    return resyncNeeded
}

func (p *sharedProcessor) resyncCheckPeriodChanged(resyncCheckPeriod time.Duration) {
    p.listenersLock.RLock()
    defer p.listenersLock.RUnlock()

    for _, listener := range p.listeners {
        resyncPeriod := determineResyncPeriod(listener.requestedResyncPeriod, resyncCheckPeriod)
        listener.setResyncPeriod(resyncPeriod)
    }
}
```
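
其中 listener.shouldResync() 和 listener.determineNextResync() 是 processorListener 的方法,分别根据 nextResync 判断是否到期,以及计算下一次 Resync 的时间,大致逻辑如下(示意,非完整源码):

``` go
// 示意:processorListener 中与 Resync 相关的两个方法(非完整源码)

// shouldResync 返回该 listener 是否到了需要 Resync 的时间;
// resyncPeriod 为 0 表示该 listener 不需要 Resync,始终返回 false
func (p *processorListener) shouldResync(now time.Time) bool {
    p.resyncLock.Lock()
    defer p.resyncLock.Unlock()

    if p.resyncPeriod == 0 {
        return false
    }
    return now.After(p.nextResync) || now.Equal(p.nextResync)
}

// determineNextResync 基于当前时间和 resyncPeriod 计算下一次 Resync 的时间
func (p *processorListener) determineNextResync(now time.Time) {
    p.resyncLock.Lock()
    defer p.resyncLock.Unlock()

    p.nextResync = now.Add(p.resyncPeriod)
}
```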

## SharedInformer 和 SharedIndexInformer

``` go
    // 从 apiserver List/Watch 对象的 Controller
    controller Controller

    // 可以注册多组监听器的共享 processor
    processor *sharedProcessor

    cacheMutationDetector CacheMutationDetector

    // 创建 controller 使用的 ListerWatcher 和对象类型
```

222 | 482 | ``` go |
223 | 483 | // 来源于:k8s.io/client-go/tools/cache/shared_informer.go |
224 | 484 | func (s *sharedIndexInformer) AddEventHandler(handler ResourceEventHandler) { |
| 485 | + // 参考后文对 AddEventHandlerWithResyncPeriod 的分析 |
225 | 486 | s.AddEventHandlerWithResyncPeriod(handler, s.defaultEventHandlerResyncPeriod) |
226 | 487 | } |
227 | 488 | ``` |