Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit 860173c

Browse files
author
Zhang Jun
committed
update delta_fifo chapter
1 parent b6f4a71 commit 860173c

File tree

1 file changed

+108
-102
lines changed

1 file changed

+108
-102
lines changed

‎client-go/2.queue-fifo-delta_fifo.md‎

Lines changed: 108 additions & 102 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ type Queue interface {
1818

1919
AddIfNotPresent(interface{}) error
2020

21-
// 当队列中第一批元素都 Pop 后,返回 true。
21+
// 当队列中第一批对象都 Pop 后,返回 true。
2222
HasSynced() bool
2323
Close()
2424
}
@@ -56,10 +56,10 @@ type FIFO struct {
5656
// 对象弹出(Pop)顺序队列,队列中各对象 Key 是**唯一**的
5757
queue []string
5858

59-
// 首先调用 Delete/Add/Update 或 Replace() 添加的第一批元素都 Pop 后为 true
59+
// 首先调用 Delete/Add/Update 或 Replace() 添加的第一批对象都 Pop 后为 true
6060
populated bool
6161

62-
// Replace() 添加的第一批元素的数目
62+
// Replace() 添加的第一批对象的数目
6363
initialPopulationCount int
6464

6565
// 添加或减少 obj 用到的,生成 obj 标识 key 的函数
@@ -89,7 +89,7 @@ func NewFIFO(keyFunc KeyFunc) *FIFO {
8989

9090
## `Add()` 方法
9191

92-
只有缓存中没有该对象时,才将它加入弹出(Pop)队列,这样可以保证该对象在未被 Pop 前只会由一个 worker 处理:
92+
只有缓存中没有该对象时,才将它加入弹出(f.queue)队列,这样可以保证该对象在未被弹出前只会被一个 worker 处理:
9393

9494
``` go
9595
// 来源于 k8s.io/client-go/tools/cache/fifo.go
@@ -281,10 +281,11 @@ func (f *FIFO) Resync() error {
281281
4. DeltaFIFO 的 Pop/Get() 方法,返回的不是对象最新值,而是 Deltas 类型的对象事件列表。
282282

283283
DeltaFIFO 适用的情况:
284+
284285
1. 你希望最多一个 worker 处理某个对象的事件(与 FIFO 类似,对象在 Queue 中是唯一的);
285286
2. 当处理该对象时,可以获得自上次以来该对象的所有事件,如 Add/Updat/Delete(FIFO 只缓存和弹出该对象的最新值);
286287
3. 你可以处理删除对象的事件(FIFO 不支持该功能,它不会弹出被删除的对象);
287-
4. 你想周期处理所有的对象(调用 Resync 方法,将 knownObjects 中的对象同步到 DeltaFIFO 中);
288+
4. 你想周期处理所有的对象( Reflector 周期调用 Resync() 方法,将 knownObjects 中的对象同步到 DeltaFIFO 中);
288289

289290
DeltaFIFO 是一个生产者-消费者队列,生产者是 [Reflector](3.reflector.md),消费者主要是 [controller/sharedInformer/sharedIndexInformer](4.controller-informer.md)
290291

@@ -294,136 +295,106 @@ DeltaFIFO 是一个生产者-消费者队列,生产者是 [Reflector](3.reflec
294295
// 来源于 k8s.io/client-go/tools/cache/delta_fifo.go
295296
func NewDeltaFIFO(keyFunc KeyFunc, knownObjects KeyListerGetter) *DeltaFIFO {
296297
f := &DeltaFIFO{
298+
// 对象事件缓存,对象 Key 为 map key,map value 为事件列表类型 Deltas;
297299
items: map[string]Deltas{},
300+
// 对象弹出队列,缓存的是对象 Key,后续 Pop 方法按续弹出;
298301
queue: []string{},
299302
keyFunc: keyFunc,
303+
// 对象缓存,DeltaFIFO 的消费者 controller 根据从 DeltaFIFO 弹出的 Deltas 对象进行更新;
300304
knownObjects: knownObjects,
301305
}
302306
f.cond.L = &f.lock
303307
return f
304308
}
305309
```
306310

307-
传入的 `knownObjects` 是实现了 `KeyListerGetter` 接口的 `Store``Index`(一般是 [cache](1.store-indexer-cache.md)),它缓存 DeltaFIFO 已知的所有对象。
308-
309-
DelaFIFO 只调用 `knownObjects` 缓存的两个**只读**方法,**不会对他进行更新**:
310-
+ `f.knownObjects.GetByKey(id)`:按照 id 获取对象;
311-
+ `f.knownObjects.ListKeys()`:列出所有对象的 Key;
312-
313-
各种 `Informer` 的创建函数,如 `NewInformer、NewIndexInformer、NewSharedInformer、NewSharedIndexInformer`,会依次创建 `knownObjects` 缓存(`struct cache` 类型)、`FIFO/DeltaFIO`[`controller`](4.controller-informer.md)
311+
传入的 `knownObjects` 缓存了 DeltaFIFO 已知的所有对象,DelaFIFO **不对它进行更新**,只是用它来查找对象。
314312

315-
`controller` 使用 `DeltaFIFO` 作为队列,再将它传给创建的 [Reflector](3.reflector.md),**Reflecter 是 FIFO/DeltaFIO 的生产者**
313+
### DeltaFIFO 的生产者和消费者
316314

317-
**`controller``FIFO/DeltaFIFO` 的消费者,它同时负责更新 `knownObjects` 缓存**:
318-
1. 从 FIFO 弹出对象;
319-
2. 更新 knownObjects 缓存;
320-
3. 再调用注册的 OnUpdate/OnAdd/OnDelete 回调函数;
315+
后续文章会介绍,创建各种 `Informer`(如 `Informer、IndexInformer、SharedInformer、SharedIndexInformer`)时,初始化函数会依次创建 `knownObjects` 缓存、`DeltaFIO`[`controller`](4.controller-informer.md)`controller` 再将 `DeltaFIFO` 传给 [Reflector](3.reflector.md),**Reflector 的 `ListAndWatch()` 方法是 DeltaFIFO 的生产者**:
321316

322-
所以,controller 用从 DetalFIFO 弹出的 Deltas 更新(取决于 Delta 中记录的事件类型) `knownObjects` 缓存,缓存的更新时间**晚于** FIFO。
317+
1. List etcd 中(通过 kube-apiserver,下同)特定类型的所有对象,然后调用 DeltaFIFO 的 `Replace()` 方法,将他们同步到 DeltaFIFO;
318+
2. 根据配置的 Resync 时间,**周期调用** DeltaFIFO 的 `Resync()` 方法(见后文),将 knownObjects 中的对象更新到 DeltaFIFO 中;
319+
3. 阻塞 Watch etcd,根据事件的类型分别调用 DeltaFIFO 的 Add/Update/Delete 方法,将对象更新到 DeltaFIFO;
323320

324-
各种 `Informer``controller` 使用 DeltaFIFO 的细节可以参考 [Reflector](3.reflector.md)[`controller`](4.controller-informer.md) 文档。
325-
326-
## Add() 和 Update() 方法
321+
Watch etcd 会**周期性的**超时(因为设置了 timeout),当 `ListAndWatch()` 出错返回时,Reflector 会等待一段时间再执行它,从而周期的将 `etcd` 中的特定类型对象**的全部对象**同步到 `DeltaFIFO`
327322

328-
``` go
329-
// 来源于 k8s.io/client-go/tools/cache/delta_fifo.go
330-
func (f *DeltaFIFO) Add(obj interface{}) error {
331-
f.lock.Lock()
332-
defer f.lock.Unlock()
333-
f.populated = true
334-
return f.queueActionLocked(Added, obj)
335-
}
336-
```
323+
**`controller``DeltaFIFO` 的消费者,它用 DeltaFIFO 弹出的对象更新 `knownObjects` 缓存,然后调用注册的 OnUpdate/OnAdd/OnDelete 回调函数**。详情参考 [Reflector](3.reflector.md)[controller 和 Informer](4.controller-informer.md) 文档。
337324

338-
`Update()` 方法和 `Add()` 方法类似,它最终调用 `f.queueActionLocked(Updated, obj)` 方法。
339-
340-
## queueActionLocked() 方法
341-
342-
queueActionLocked(actionType DeltaType, obj interface{}) ->
343-
344-
1. 如果事件类型为 Sync,而且 obj 对应的事件列表中最后一个事件为 Delete,则直接返回。
345-
2. 合并 obj 连续重复的 Delete 事件为一个;
346-
3. 如果 f.items[id] 中没有该元素,则将它的 id append 到 f.queue; <-- 唯一更新 f.queue 的时机。
347-
4. 将 obj 合并后的事件保存到 f.items[id] <-- 将 obj 当前事件与历史事件合并;
348-
349-
什么时候 `f.items[id]` 不存在呢?
350-
351-
1. DeltaFIFO 刚创建,元素第一次产生 Add/Update/Delete/Sync 事件;
352-
2. 后续调用 f.Pop() 方法,弹出 id 对应的事件列表;
353-
354-
## Delete() 方法
325+
### 记录对象事件的 Delta、Deltas 和 DeletedFinalStateUnknown 类型
355326

356-
Delete() 方法可能**直接返回**,也可能调用方法 `f.queueActionLocked(Deleted, obj)` 为对象生成 `Deleted` 事件。取决于 `f.knownObjects.GetByKey(id)``f.items[id]` 是否均均找不到对象。
357-
358-
注意:Delete() 方法**不将对象从事件缓存 f.items 和弹出队列 f.queue 删除**(FIFO 的 Deleta() 方法会将对象从缓存中删除),只是可能生成 Deleted 事件。DeltaFIFO 的消费者 `controller` 根据弹出的事件更新 f.knownObjects 缓存。
327+
DeltaFIFO 使用 Delta 类型记录对象的事件类型和发生**事件后**的对象值:
359328

360-
## Get/GetByKey/List/ListKeys() 方法
329+
``` go
330+
type Delta struct {
331+
// DeltaType 可能是:Added、Deleted、Updated、Sync
332+
Type DeltaType
333+
Object interface{}
334+
}
335+
```
361336

362-
都是从内部缓存 f.items 中(非外部缓存 f.knownObjects)返回对象的 Deltas 或 Key 列表;
337+
有一种特殊情况:当 Reflecter 重复执行 `ListAndWatch()` 方法时,List etcd 获取的对象集合 set1 可能与 knownObjects 缓存中的对象集合 set2 **不一致**,调用的 Replace() 方法会发现这种情况,将位于 set1 但不在 set2 中的对象生成 `DeletedFinalStateUnknown` 类型事件:
363338

364-
## Replace() 方法
339+
``` go
340+
type DeletedFinalStateUnknown struct {
341+
// 对象的 Key
342+
Key string
343+
// knownObjects 缓存中的对象值
344+
Obj interface{}
345+
}
346+
```
365347

366-
Replace(list []interface{}, resourceVersion string)
348+
之所以不用 Type 为 Deleted 的 Delta 来表示,是因为:etcd 中没有该对象,Reflector **不知道**这个对象的当前值,所以只能用一个有别与 Deleted Delta 类型的特殊类型来表示这种情况。
367349

368-
1. 遍历 list 中的元素,为每个元素生成 Sync 事件,f.queueActionLocked(Sync, item)
369-
2. 遍历 f.knownObjects.ListKeys() 中的元素,对于不在传入的 list 中的元素,生成 Delete 事件,添加的元素类型为 DeletedFinalStateUnknown(非 Delta 类型):
350+
**有且只有** `Replace()` 方法才会产生 `DeletedFinalStateUnknown` 类型事件。
370351

371-
f.queueActionLocked(Deleted, DeletedFinalStateUnknown{k, deletedObj})
352+
## Add() 和 Update() 方法
372353

373-
3. 如果 f.Replace() 方法是在创建 DeltaFIFO 后第一次调用的方法(Add/Update/Delete 之前,此时 f.populated 为 false),则设置:
374-
设置 f.populated 为 true;
354+
``` go
355+
// 来源于 k8s.io/client-go/tools/cache/delta_fifo.go
356+
func (f *DeltaFIFO) Add(obj interface{}) error {
357+
f.lock.Lock()
358+
defer f.lock.Unlock()
359+
f.populated = true
360+
// Added 类型事件;
361+
return f.queueActionLocked(Added, obj)
362+
}
363+
```
375364

376-
f.initialPopulationCount = len(list) + queuedDeletions,其中 queuedDeletions 为生成 DeletedFinalStateUnknown 的对象数目;
365+
`Update()` 方法和 `Add()` 方法类似,差别在于产生的是 `Updated` Delta 类型事件;
377366

378-
## Delta 和 DeletedFinalStateUnknown 事件
367+
## queueActionLocked() 方法
379368

380-
前面介绍过,**Reflecter 是 FIFO/DeltaFIO 的生产者**,**`controller``FIFO/DeltaFIFO` 的消费者,它同时负责更新 `knownObjects` 缓存**:
369+
该方法将对象的事件存入事件队列 f.items,如果事件队列中没有该对象则还将对象(Key)加入弹出队列(f.queue),另外它还做如下操作:
381370

382-
Reflecter 的 ListAndWatch() 方法负责更新内部的 DeltaFIO(具体参考:[3.reflector.md](3.reflector.md)):
371+
1. 如果事件类型为 Sync,且对象的事件列表中最后一个事件类型为 Deleted,则直接返回(没有必要再同步一个已删除的对象);
372+
2. 连续重复的 Deleted 事件为一个;
383373

384-
1. 从 kube-apiserver List 特定类型的所有对象,然后调用 DeltaFIFO 的 Replace() 方法;
385-
2. 根据配置的 Resync 时间,周期调用 DeltaFIFO 的 Resync() 方法;
386-
3. 后续的 Watch 阶段,会根据事件的类型,调用 DeltaFIFO 的 Add/Update/Delete 方法;
374+
## Delete() 方法
387375

388-
Reflecter 的 `Run()` 方法调用 `ListAndWatch()` 方法,如果没有出错则一直阻塞,当出错时,会等待 r.period 时间(**期间没有 Watch,可能会丢事件**),再次执行 ListAndWatch() 方法获取特定类型所有对象并 Watch。
376+
如果 `f.knownObjects` 对象缓存和事件队列 `f.items` 中均没有待删除的对象,则**直接返回**,否则为对象生成 `Deleted` Delta 事件(非`DeletedFinalStateUnknown` 类型)。
377+
378+
不同于 FIFO 的 Deleta() 方法会将对象从缓存中删除,DeltaFIFO 的 Delete() 方法**不将对象从事件缓存 f.items 和弹出队列 f.queue 删除**。而是由 DeltaFIFO 的消费者 `controller` 根据弹出的 Deltas 对象,将对象从对象缓存 f.knownObjects 中删除。
389379

390-
``` go
391-
func (r *Reflector) Run(stopCh <-chan struct{}) {
392-
klog.V(3).Infof("Starting reflector %v (%s) from %s", r.expectedType, r.resyncPeriod, r.name)
393-
wait.Until(func() {
394-
if err := r.ListAndWatch(stopCh); err != nil {
395-
utilruntime.HandleError(err)
396-
}
397-
}, r.period, stopCh)
398-
}
399-
```
380+
## Get/GetByKey/List/ListKeys() 方法
400381

401-
所以,Reflecter 可能会**多次调用** DeltaFIFO 的 Replace() 方法。
382+
都是从内部事件缓存 f.items 中返回对象的 Deltas 或 Key 列表;
402383

403-
有且只有 Replace() 方法可能生成 DeletedFinalStateUnknown 事件,如:
384+
## Replace() 方法
404385

405-
1. Reflecter 重新执行 ListAndWatch() 方法的 Sleep r.period 期间丢了该对象的 Delete 事件(所以对象还在 f.knownObjects 中);
406-
2. Reflecter 执行 ListAndWatch() 方法从 kube-apiserver LIST 该类型资源对象的结果中没有该对象;
386+
`Replace(list []interface{}, resourceVersion string)`
407387

408-
``` go
409-
type Delta struct {
410-
Type DeltaType
411-
Object interface{}
412-
}
413-
type DeletedFinalStateUnknown struct {
414-
Key string
415-
Obj interface{}
416-
}
417-
```
388+
1. 遍历 list 中的对象,为每个对象生成 `Sync` 事件;
389+
2. 遍历 f.knownObjects.ListKeys() 中的对象,对于不在传入的 list 中的对象,生成 `Deleted` 事件,对象类型为 `DeletedFinalStateUnknown`(非 Delta 类型);
418390

419-
+ Delta 代表正常的事件,包含事件类型和元素,Object 是事件产生后的状态;
420-
+ DeletedFinalStateUnknown 代表删除事件,Key 为对象的 id;
391+
Reflecter 周期调用 Replace() 方法将 etcd 中的特定类型的所有对象同步到 DeltaFIFO 中。`controller` 用 DeltaFIFO 弹出的对象更新 `knownObjects` 缓存,详情参考 [Reflector](3.reflector.md)[controller 和 Informer](4.controller-informer.md) 文档。
421392

422393
## Resync() 方法
423394

424-
遍历 f.knownObjects.ListKeys() 中的元素:
425-
对于某个元素 id,如果 f.items[id] 有值(长度 > 0),则跳过;
426-
否则,生成 Sync 事件:f.queueActionLocked(Sync, obj)
395+
遍历 f.knownObjects.ListKeys() 中的对象:
396+
对于某个对象 id,如果 f.items[id] 有值(长度 > 0),则跳过;
397+
否则,生成 Sync 事件:f.queueActionLocked(Sync, obj)
427398

428399
综上,**Replace() 和 Rsync() 方法会会生成 Sync 事件**
429400

@@ -437,7 +408,7 @@ Pop(process PopProcessFunc)
437408

438409
## HasSyncd() 方法
439410

440-
创建 FIFO/DealtaFIFO 后,如果首先调用的是 `Replace()` 方法,则 `f.populated` 被设置为 `true`,`f.initialPopulationCount` 被设置为传入的对象数量。当这一批对象都被弹出完毕时( 包含弹出前被删除的对象),`HasSynced()` 方法返回 `true`:
411+
创建 FIFO/DealtaFIFO 后,如果首先调用的是 `Replace()` 方法,则 `f.populated` 被设置为 `true`,`f.initialPopulationCount` 被设置为传入的对象数量。当这一批对象都被弹出完毕时( 包含弹出前被删除的对象),`HasSynced()` 方法返回 `true`:
441412

442413
``` go
443414
// 来源于 k8s.io/client-go/tools/cache/fifo.go
@@ -450,10 +421,45 @@ func (f *DeltaFIFO) HasSynced() bool {
450421

451422
另外,如果在调用 `Replace()` 方法前,**首先**调用了 `Add/Update/Delete/AddIfNotPresent()` 方法,则 `HasSynced()` 方法也会返回 `true`
452423

453-
后面会介绍,FIFO 的主要使用者是 `Refector`,它在启动后:
424+
## DeltaFIFO 和 knownObjects 对象缓存的同步
425+
426+
1. Reflector 从 etcd List 出对象后,调用 DeltaFIFO 的 Replace() 方法为传入的对象生成 Sync 事件,此时 knownObjects 为空;
427+
2. controller 从 DeltaFIFO 弹出对象的事件列表 Deltas,遍历 Deltas,根据 Delta 中的事件类型更新 knownObjects,从而实现 DeltaFIFO 和 knownObjects 缓存中的对象一致:
428+
429+
controller 每次**启动**时,因为 knownObjects 为空且事件类型为 Sync,所以对从 etcd 同步来的所有对象,都先调用一次 clientState 的 **Add() 方法和注册的 OnAdd() 回调函数**
430+
431+
``` go
432+
// 来源于:k8s.io/client-go/tools/cache/controller.go
433+
for _, d := range obj.(Deltas) {
434+
switch d.Type {
435+
// Replace() 方法生成的 Sync 事件涉及到的对象,
436+
case Sync, Added, Updated:
437+
if old, exists, err := clientState.Get(d.Object); err == nil && exists {
438+
if err := clientState.Update(d.Object); err != nil {
439+
return err
440+
}
441+
h.OnUpdate(old, d.Object)
442+
} else {
443+
if err := clientState.Add(d.Object); err != nil {
444+
return err
445+
}
446+
h.OnAdd(d.Object)
447+
}
448+
case Deleted:
449+
if err := clientState.Delete(d.Object); err != nil {
450+
return err
451+
}
452+
h.OnDelete(d.Object)
453+
}
454+
}
455+
```
456+
457+
3. 但是,Reflector 的 Watch 可能会出现**丢失事件**的情况(如 ListAndWatch 出错返回后,Reflector 会 Sleep 一段时间再执行它,期间 etcd 的对象变化事件丢失),这样再次 List 到的对象集合与 knownObjects 缓存中的对象集合不一致。
458+
459+
Replace() 方法会为 knownObjects 缓存中多余的对象生成 Deleted 类型的 `DeletedFinalStateUnknown` 事件,这样后续 controller 弹出该事件后,将对象从 knownObjects 缓存删除,从而达到两个缓存一致的目的;
460+
461+
4. ListAndWatch() 方法起一个 goroutine,周期的调用 Resync() 方法将 knownObjects 中的对象更新到 DeltaFIFO,为何要这么做呢?
454462

455-
1. 首先从 kube-apiserver LIST 某类型对象,然后调用 FIFO 的 Replace() 方法,传入对象列表,实现将**某类型的所有对象**同步到缓存中;
456-
2. 再从 kube-apiserver Watch 该类型对象的变化事件,如 Update/Delete,按需调用 FIFO 的 Add/Delete/Update 方法;
457-
3. 周期从 kube-apiserver 中 Rsync 该类型的所有对象,传给 FIFO/DeltaFIFO 的 Replace() 方法;
463+
在前文我们提到 DeltaFIFO 的使用场景之一是:**"你想周期处理所有的对象"**,但对象一旦从 DeltaFIFO 中弹出,如果没有产生新的 Watch 事件,就不会对它再调用注册的回调函数。而周期执行 Resync() 方法的目的就是**为对象产生新的 Sync 事件**,从而有机会再次调用处理函数。基于此,Resync 时,如果对象已经在 f.items 中,则有机会被弹出,所以不需要为它生成 Sync 事件。
458464

459-
所以,对于 `Refector` 的调用方而言,`HasSynce` 结果为 `true` 表示对象都已经**完整同步过一次和且被处理完毕**
465+
但是这里会有一个问题:如果当前对象的最后一个

0 commit comments

Comments
(0)

AltStyle によって変換されたページ (->オリジナル) /