Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

1.4_深入(Re)Balance

elevenqq edited this page Sep 30, 2018 · 2 revisions

概述

Datalink的HA和Reblance机制深入参考了kafka和kafka-connect(版本<0.10.1.0>)的相关设计思想,结合自身的特点和场景进行了重写和改造,相关概念介绍如下:

  • Central Co-Ordination
    Datalink-Manager提供了GroupCoordinator,整个集群的 (Re-)Balance都必须经由中心协调器统一处理,这样可以保证整个集群的状态统一,尽量避免 "脑裂" 的发生
  • Worker
    Datalink-Worker(类似Kafka-Connect的Worker)提供了了WorkerCoordinator,负责和GroupCoordinator进行通信和协调,共同实现Failure-Detection、Balancing、HA等功能
  • Group
    进行(Re-)Balance Processing的基本单位是Group,不同Group之间完全隔离,同一Group中的Task和Worker根据不同的策略进行动态分配和组合

详解

Protocol Introduction

(Re-)Balance流程分为两个阶段

  • 第一阶段(Group Membership)
    该阶段主要用来进行【成员发现】和【Leader选举】
  • 第二阶段(State Synchronization)
    该阶段主要用来进行【状态收集】和【资源分配】

下面对两个阶段做进一步介绍

每个Worker启动完成之后,会拿到两个重要信息:自己所属的Group和GroupCoordinator的地址,这两个信息是Worker完成(Re-)Balance的必要条件

  • Phase 1 : Joining the Group,主要流程如下所示:
    1)触发(Re-)Balance
    常见的触发条件有:Worker加入Group、Worker退出Group、新增Task、删除Task、Manager发生主备切换等
    2)Worker发送JoinGroupRequest给Manager
    JoinGroupRequest携带的主要信息有:GroupId,WorkerId,协议类型和相关元数据
    3)Manager端进行Collect并Await
    Manager以Group为单位对Worker进行管理,等待Group下所有worker的JoinGroupRequest请求(依据last generation的worker列表做判断),直至触发等待超时
    4)Manager发送Response
    当所有Worker已加入Group或触发了等待超时,Manager便随机选中一个Worker为Leader,并给所有worker发送JoinGroupResponse(kafka中还有协议选择的相关操作,我们暂未支持)

  • Phase 2 : Synchronizing Group State
    Join阶段完成后,每个Worker都会收到JoinGroupResponse,然后开始Sync,Sync阶段主要流程如下所示:
    1)Worker判断自己是否是Leader,是的话进行Assign操作并把分配信息放到SyncGroupRequest中发送給Manager,不是的话直接发送一个空的SyncGroupRequest给Manager
    2)Manager等待Leader-Worker的SyncGroupRequest,收到之后解析出每个worker对应的Assignment,然后构造SyncGroupResponse将Assignment信息分别发给每个worker
    3)Worker收到SyncGroupResponse,解析Assignment信息,然后执行分配给自己的Task

  • 补充
    JoinGroupRequest、JoinGroupResponse、SyncGroupRequest、SyncGroupResponse的详情可参见 : org.apache.kafka.common.requests.*下的相关实现

Group State Machine

介绍完Coordinating的主流程,接下来详细介绍一下Manager端Goup的状态机

statemachine

  • Empty
    当Group内没有任何Worker时,状态为Empty,实际场景中,Group的初始状态为Empty(即:未发生任何Balance之前),(Re-)Balance之后如果分组内没有任何worker了分组状态也会被重置为Empty
  • PreparingRebalance
    当GroupCoordinator检测到需要进行(Re-)Balance时,会把Group状态置为PreparingRebalance,检测条件主要有两个:GroupCoordinator收到了Worker主动发送的JoinGroup-Request请求,GroupCoordinator通过heartbeat机制检测到发生了Member-Failure。Group一旦进入PreparingRebalance状态,立即启动定时器,等待Group下所有剩余Worker的JoinGroupRequest请求,直至所有worker都加入或触发定时器超时,然后把状态转换AwaitingSync。当Group处于PreparingRebalance状态时,如果收到了Heartbeats或者SyncGroupRequest请求,GroupCoordinator会返回错误给worker,提示"A rebalance is in progress",worker基于此信息得知自己需要进行Re-join操作,随后发送Join请求。
  • AwaitingSync
    AwaitingSync的上一状态只能是PreparingRebalance。Group处于AwaitingSync状态时,主要任务是等待Leader-Worker的SyncGroupRequest请求,一旦等到Leader的请求,便从请求信息中取出Assingments,将其放到GroupMetadata中,然后把Group状态设置为Stable。当然,Group处于AwaitingSync状态时,也会收到非Leader-Worker的Sync请求,其处理策略是把请求暂存,继续等待Leader,拿到Assingments结果后再对请求进行响应。当Group处于AwaitingSync状态时,如果收到了Heartbeats请求,会返回错误给worker,提示"A rebalance is in progress",worker基于此信息进行Re-join操作;如果收到了JoinGroupRequest请求,会把Group状态置为PreparingRebalance,然后给正在等待SyncGroupResponse响应的worker发送信息,提示"A rebalance is in progress",然后workers发起新一轮的Re-join请求。
  • Stable
    Stable的上一状态只能是AwaitingSync。因为AwaitingSync阶段一旦收到Leader的请求,Group状态便立即转为Stable,所以,处于Stable状态下的Group,仍然会收到SyncGroupRequest请求,此时很简单,从GroupMetadata中取出对应worker的分配结果,直接发送SyncGroupResponse即可。Stable状态下关于心跳的说明,引用kafka文档中的一句话:Heartbeats are accepted from members in this state and are used to keep group members active or to indicate that they need to join the group.
  • Dead
    可以从任何一个状态转换为Dead状态,关于此状态的解释:Group has no more members and its metadata is being removed。目前Manager的高可用采用的是主备模式,并且暂未提供主备动态切换功能,所以Group状态转换为Dead的处理逻辑也还暂未实现。

Worker State Diagram

本小节对Worker端的coordination逻辑做一个深入介绍,因为细节非常多,此处重在介绍主流程

worker-state-diagram

说明:和"group state machine"不同的是,上图展示的并不是一个状态机,图中标识的各种状态也并没有在worker中进行定义,此图是对worker的coordinating逻辑做的一个抽象和总结

  • Down
    表示worker还未启动
  • PreparingJoin
    该状态表示worker需要进行(Re-)join操作,worker的tick线程检测到该状态之后,会触发(Re-)join,worker的状态也随之转换为Join-ing。Worker进入PreparingJoin状态的触发条件有很多,分别是: 23 incomplete 进程启动后的初始状态为PreparingJoin 7 incomplete JoinResponse返回了[特定异常],状态由Join-ing转换为PreparingJoin 8 incomplete SyncResponse返回了[特定异常],状态由Sync-ing转换为PreparingJoin 9 incomplete HeartbeatResponse返回了[特定异常],状态由Stable转换为PreparingJoin 10 incomplete WorkerKeeper检测到新增加了Task或删除了Task,触发PreparingJoin 关于[特定异常],做一下说明,此处表述的[特定异常]主要有: 24 incomplete GROUP_COORDINATOR_NOT_AVAILABLE 17 incomplete NOT_COORDINATOR_FOR_GROUP 18 incomplete REBALANCE_IN_PROGRESS 19 incomplete ILLEGAL_GENERATION 20 incomplete UNKNOWN_MEMBER_ID 26 incomplete 超时异常

当worker收到这些异常时,会触发PreparingJoin,此外,如果是GROUP_COORDINATOR_NOT_AVAILABLE或NOT_COORDINATOR_FOR_GROUP,还会触发"Re Discover Coordinator"操作,如果是ILLEGAL_GENERATION或UNKNOWN_MEMBER_ID,还会触发"Reset Generation"操作

  • Join-ing
    该状态表示worker正在进行"Joining Group"操作,对应于Manager端的PreparingBalance状态。Join-ing状态开始于发送JoinRequest请求,结束于收到JoinResponse响应,如果响应信息正常则进入Sync-ing状态,否则进入PreparingJoin状态。Worker发送JoinRequest之前会检测本地是否有Task正在运行,如果有,则关闭Task之后才发送请求。
  • Sync-ing
    该状态表示worker正在进行"Sync state"操作,对应于Manager端的AwaitingSync状态。Sync-ing状态开始于发送SyncRequest请求,结束于收到SyncResponse响应,如果响应信息正常则进入Stable状态,否则进入PreparingJoin状态。Worker发送SyncRequest之前会判断自己是否是leader,如果是,则进行资源分配,将分配结果放到SyncRequest中发送。Worker收到SyncResponse响应之后,从响应信息中取出自己需要运行的Task,然后运行这些Task。
  • Stable
    该状态表示Worker正在稳定运行,也是worker绝大部分时间所处的状态,此状态下的worker依靠心跳维持和manager的会话,当会话超时或Manager通过心跳通知worker需要进行(Re-)Balance时,worker进入PreparingJoin状态。注:heartbeat在Join-ing和Sync-ing状态下是处于禁用状态的。

Task Assignment

Why Woker Assignment

如前文所述,Join阶段完成之后,Manager会从Group中选中一个Worker为Leader,在Sync阶段,由这个Leader负责Task的分配,然后将分配结果回传给Manager,Manager再将结果下发给该分组下的所有worker。那么,为何我们不在Manager进行任务分配呢,直接在Join-Response中将分配结果下发给Worker,这样还可以省去Sync阶段,缩短Reblance的时间,其原因如下所示:

因为我们深度参考了kafka的设计,先来看一下kafka不在broker进行assign的原因:https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Client-side+Assignment+Proposal,
总结来看其原因主要有:

  • 部署运维不方便
    每次新增或修改Assignments策略,都需要对broker进行重新部署,当集群规模大的时候费时又费力,运维成本非常高
  • 扩展性差
    kafka的生态体系越来越丰富,衍生出了越来越多的子产品,这些子产品对HA和(Re-)Balance的设计要求基本相同,最大的不同点就在资源的分配上,如kafka-connect,分配的资源是job和task,而不是partition。如果在broker端进行"resource assignment",那么每新增一个子产品,都要为broker引入新的领域概念,broker将变的越来越重、越来越复杂,极大的限制了系统的扩展性;如果在客户端进行"resource assignment",broker只提供"group management",那么不仅能最大限度的让所有产品复用HA基础设置,还能实现对broker的零侵入,系统的扩展性大大增强。
  • 耦合性高
    即使没有基于kafka扩充新产品的需求,但broker和consumer也都分别有自己的【领域模型】,如果在broker端进行assign,一些简单的分配策略也许还没什么问题(比如:Round Robin),但进行一些特殊的或高级的资源分配策略时,可能需要用到consumer自己领域内的一些【指标】或【元数据】,显然这些都不合适在broker端实现

基于以上的描述,datalink中倒没有部署运维不方便的问题,因为Manager只有两个节点做HA互备,但从【扩展性】和【耦合性】两个维度看的话,在Worker端进行Assign还是上上策,虽然多了一个Sync阶段,但是这个对性能的影响并不大,完全在可接受范围内,so,Woker Assignment First

Assignment Policy

目前我们的Task分配策略还比较简单,只支持RoundRobin一种方式,新的策略还在规划设计中,简要介绍如下:

  • 基于资源的分配
    通过MetaData获取到Worker的CPU、内存、网卡等的配置参数,通过运行统计信息获取到Task的流量排行,基于这些信息实现Task和Worker的最优分配,解决分配倾斜的问题
  • 降低Task的不可用时间
    在现有的方案中,Worker每次Re-join之前都会关闭其运行的Task,但Re-balance之后,这些Task很可能仍然在此worker上运行,需要进行优化,尽量避免Task的重启
  • Sticky Assign
    增加对【粘性分配】的支持,可以显示指定Task和Worker之间的绑定关系,Task优先分配给绑定关系中配置的worke,除非worker已不在分组中

参考资料

说明:如下链接的相关描述都是基于kafka0.9,同【Kafka-0.10】以及【datalink】相比,很多细节是有不少差异的,仅供参考

https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Client+Re-Design
https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Client-side+Assignment+Proposal
https://cwiki.apache.org/confluence/display/KAFKA/Kafka+0.9+Consumer+Rewrite+Design
https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Detailed+Consumer+Coordinator+Design


Clone this wiki locally

AltStyle によって変換されたページ (->オリジナル) /