Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

1.3_深入Task

elevenqq edited this page Sep 30, 2018 · 6 revisions

Task概述

DataLink真正执行数据同步任务的是每一个具体的Task,由Task从某一个固定类型的数据源读取数据,并同步到若干个目标端数据源,即为一对多的关系。我们将源端数据源类型规定为Task的类型,系统目前支持的Task类型有:MYSQL, FLEXIBLEQ, HBASE,支持同步到的目标端数据源类型有:Rdbms、ElasticSearch、Hdfs、HBase、FlexibleQ、SDDL。

Task生命周期

Task

上图展示了Task进行一次数据同步的流程。

  • 【类拥有关系】
    > WorkerTaskContainer为Task的执行入口类,负责管理WorkerTaskReader和WorkerCombinedTaskWriter的生命周期;WorkerCombinedTaskWriter负责管理多个WorkerTaskWriter;WorkerTaskReader和WorkerTaskWriter分别管理TaskReader和TaskWriter的生命周期。
  • 【Task同步流程】
    > 同步流程从WorkerTaskReader发起,由TaskReader从某个类型的数据源拉取数据,或者收到数据源推送的数据,然后将数据put到队列中,进行callback等待;WorkerCombinedTaskWriter负责从队列take数据,再分别存储到与它所管理的各个WorkerTaskWriter所共享的队列中,进行callback等待;然后,每个WorkerTaskWriter从队列中take数据,由各个TaskWriter根据数据的Record类型加载对应的Handler,等所有Handler处理完数据后,不论同步成功或失败都进行callback通知,结束所有队列的等待;最后,TaskReader进行判断,如果成功则执行commit,然后发起下一轮同步,如果失败则执行rollback,然后重试。
    【注】在Task的Reader和Writer为一对多的情况下,除了第一个Writer用Reader直接传过来的RecordChunk,其他Writer要copy一份RecordChunk来用,而copyRecordChunk使用反射实现,比较耗费系统资源,所以,如果是数据量较大且性能要求较高的同步场景,不建议使用一对多。
  • 【Task生命周期】
    > TargetState
    Task的目标状态,包括STARTED和PAUSED两种,用于在在Manager端控制Task的运行状态。
    > TaskStatus

    Task的实际运行状态类,包括UNASSIGNED、PREPARING、RUNNING、PAUSED、FAILED五种状态以及executionId、generation、startTime、workerId等运行属性,会随Task的启动注册到ZK,用于监控Task的当前运行状态信息。
    例,id=10的Task在ZK上的Status节点路径为"/datalink/tasks/10/status",节点内容如下图所示:

    zk-status
    > Task生命周期
    Task的实际运行状态默认值设置为UNASSIGNED。Task启动时,首先进入Prepare阶段,即在ZK添加Task的status节点,并将运行状态更新为PREPARING。Prepare成功之后,判断Task的目标状态,若目标状态为PAUSED,则status节点的运行状态更新为PAUSED,同时Task线程阻塞;若目标状态为STARTED,则status节点的运行状态更新为RUNNING,同时Task正常运行。如果Task在运行过程中出现异常,且Task未被cancelled,则status节点的运行状态更新为FAILED,同时将异常堆栈发送到ZK,阻塞等待直到触发stopTask。stopTask完成之后,便会remove掉Task的status节点。在上述过程中,由TaskStatusListener负责监听并通知系统对ZK的status节点做相应的状态变更

  • 【Task状态监控报警】
    在Task生命周期中,对其运行状态的监控与报警有如下几种情况:
    > 在Task启动之前的Prepare阶段,在添加status节点时,若发现ZK上已存在该节点,说明该Task正在其他地方运行,则抛出TaskConflictException异常,并每秒进行一次重试,如果重试时间已经超过了ZK的会话超时时间,则触发报警,并继续重试,直到Task重复运行的情况消失。
    > 除了Task运行冲突异常监控报警,TaskStatusMonitor还对Task的实际运行状态进行监控,当出现以下三种情况之一时,触发报警:
    (1)Task的目标状态运行状态不一致(正常情况下,目标状态STARTED对应运行状态RUNNING,目标状态PAUSED对应运行状态PAUSED);
    (2)Task的status节点消失,运行状态变为UNASSIGNED;
    (3)Task出现异常,运行状态变为FAILED。
    并且,如果Task的目标状态为STARTED而运行状态为FAILED时,会重启Task,即结束线程,重新启动

TaskContext&TaskSession

  • 【TaskContext】
    > 每个Task都会绑定一个TaskContext,通过context可以实现Task和Runtime之间的交互,可以供TaskReader和TaskWriter随时获取Task的Id、ExecutionId、Service、Global-Session、Global-Attributes与开启Session功能。TaskContext的生命周期和Task的运行生命周期保持一致,当Task启动时TaskContext随之创建,当Task关闭时TaskContext也随之销毁。TaskAttributes和TaskContext的生命周期是一致的,主要用来支持Task整个生命周期内的数据共享。而TaskSession是会话级的数据共享机制,完成一次RecordChunk数据同步的过程定义为一次会话,TaskSession的生命周期即为一次数据同步。
  • 【TaskSession】
    > 如上图所示,TaskSession主要用来支持一次数据同步过程中的数据共享,即TaskReader和TaskWriter之间可以通过TaskSession共享数据,而TaskReaderSession和TaskWriterSession则分别供TaskReader和TaskWriter用于其内部共享数据。一次会话的起始和结束均为TaskReader,所以,在此触发TaskSession的begin操作(首先进行reset)。
  • 【TaskSession应用示例】
    > MySQL-Reader插件dump获取到的原始数据:存入TaskReaderSession中的key为MESSAGE_KEY,value为本次同步fetch到的原始数据,并根据需要进行dump。
    > Reader端和Writer端各自的数据同步性能统计:存入TaskReaderSession中的key为ReaderStatistic,TaskWriterSession中的key为WriterStatistic,value分别为各自的统计指标,通过在程序中按需存取,计算到本次同步的Record数量、延迟时间、load时间、TPS等,为Task的性能监控提供基础数据。
    > 映射拦截器OrderEntRecordInterceptor:该拦截器的作用是拦截非企业订单,只同步企业订单。存入TaskWriterSession中的key为PREFIX+id,value为企业订单id的值,当同步订单子表数据时,只有子表的订单id在session中存在,才进行同步。这样利用TaskSession,并依赖主表子表关系,可以实现企业订单的快速过滤,提高了同步效率。

Task消费位点管理机制

  • 【TaskPositionManager】
    PositionManager是Datalink提供的一个公用的数据同步位点管理组件,负责查询、更新消费位点。如果TaskReader有自己的位点管理机制,用自己的机制即可。TaskPositionManager是PositionManager的默认实现,启动和停止跟随Worker主线程,其主要功能如下:
    > 查询消费位点主要用于实时监控Task的同步情况(当前消费位点);
    > 更新消费位点有定时更新和实时更新两种。正常同步过程中采用定时更新,首先将每个Task的位点变更信息存入内存,然后每隔1秒将内存中Task的最新位点信息持久化到ZK节点(多次变更只刷一次),表示当前已经消费到的位置。在重启Task时可以重置消费位点,因此采用实时更新,使其启动之后能够从正确的位点开始消费。
    > 例,id=10的Task在ZK上的Position节点路径为"/datalink/tasks/10/position",节点内容如下图所示:

    zk-position

  • 【位点管理机制应用示例】
    > Task的消费位点变更发生在一次同步成功之后。Reader端进行commit时,会判断是否自带commit机制,若有,则Reader插件自己进行commit和刷新位点;若没有,则使用DataLink公用的位点管理机制刷新ZK的位点信息。一般情况下,具体的Reader插件不需要实现commit功能,由DataLink框架自动记录records消费的偏移量;若需要在自己系统内部存储偏移量,则可以选择实现commit功能。
    > DataLink的MySQL-Reader插件选择使用canal自带的位点更新机制,同时自定义CanalTaskMetaManager,重写了canal自带的MetaManager,将canal的消费位点纳入统一管理。
    > DataLink的HBase-Reader插件和Fq-Reader插件均使用自己的位点管理机制。

Task配置信息管理机制

  • 【TaskConfigManager】
    > TaskConfigManager是Task配置信息管理器,负责发现Task配置变更并进行事件通知,并以组为单位进行监控管理,不属于本组的变更不予关注。

    > TaskConfigManager随Worker线程启动后,首先进行强制refresh,初始化本组的taskConfigList和version,获取最新配置。然后开始每隔500ms循环刷新Task配置信息,若version发生变化,则通过TaskConfigUpdateListener通知系统做相应的处理:若有Task新增或删除,则触发分组进行ReBalance;若有Task参数配置更新(包括Task参数、Reader参数或者Writer参数的变化),则重启Task;若有Task目标状态更新,则更新Task的目标状态,并对mustRestartWhenPause == true类型的Task进行重启(具体原因请见Task常用参数)。

LeaderTask机制

MYSQL类型数据同步,DataLink的Task和Reader端的数据源是一对一的关系,一个同步任务对应一个Task的配置信息。但是分布式集群的同步,例如HBASE,DataLink的Task模拟是HBASE的从集群,与Reader端的HBASE集群是多对一的关系,若为每个Task进行一次配置,同步将过于繁琐。因此针对集群对集群、需要多个Task的数据同步模型,引入LeaderTask机制,简化Task配置。

  • 【LeaderTask机制应用示例】
    > 每个要同步的HBASE集群对应Reader端一个Repl-ZNode-Parent,同一ZNode下的所有Task模拟的是一个HBASE从集群,每个Task模拟的是一个RegionServer,其中创建的第一个Task是LeaderTask。
    > 所有Task的相关配置,包括同步映射和Task监控等,只需要在LeaderTask上配置即可,其它FollowerTask会复用LeaderTask的配置信息。

Task常用参数

  • 【Task基本参数】

    Task基本参数是DataLink所有类型Task的通用参数。

    Task基本参数 参数描述 默认值 备注

    id

    Task在数据库中的唯一标识


    groupId

    Task所属分组的id

    每个Task必须且只能属于一个分组

    TaskType

    Task的类型

    按照Reader端类型划分为三种:MYSQL, FLEXIBLEQ, HBASE;
    mustRestartWhenPause mustRestartWhenPause==true时,
    若Task的目标状态被置为PAUSED,则必须重启Task
    HBASE TaskType中的一个公用方法属性

    TargetState

    Task的目标状态

    STARTED,PAUSED;

    TaskStatus

    Task的当前状态信息 UNASSIGNED,PREPARING,RUNNING,PAUSED,FAILED;
    当前状态与运行信息均在ZK上注册,显示Task的当前运行状况
    readerMediaSourceId Task所关联的Reader的数据源id

    taskParameter

    Task的参数 目前只有taskId

    taskReaderParameter

    Task的Reader插件的基本参数 PluginReaderParameter类型,

    不同的Reader插件可以定义各自的扩展参数

    taskWriterParameter

    Task的Writer插件的基本参数

    List<PluginWriterParameter>类型,

    不同的Writer插件可以定义各自的扩展参数

    isLeaderTask

    Task是否为LeaderTask 适用于集群类型的Task

    leaderTaskId

    是FollowerTask时,其LeaderTask的id 适用于集群类型的Task
    version Task配置的版本 即整个分组配置的版本

    【注】HBASE类型的Task被设为mustRestartWhenPause==true的原因:

    > Task被置为PAUSE之后,其ZK节点还存在,因此HBase-Master会继续往Task推送Log,但是Task收到Log之后会阻塞住,HBase-Master等待超时,继续推,再等待超时。从实际测试结果来看,此Task会导致所有Task的同步出现严重延迟。所以必须重启改Task,WorkerTaskReader检测到PAUSE状态,会阻塞在HBase-Reader启动之前,这样Task便不会在ZK注册,避免HBase-Master往Task推送Log,同时降低系统资源的占用。


  • 【插件基本参数】

    PluginParameter是DataLink所有插件的通用参数,PluginReaderParameter和PluginWriterParameter均继承了PluginParameter。

    PluginParameter参数 参数描述 默认值 备注

    pluginName

    插件的名字 插件自定义

    pluginClass

    插件的实现类 插件实现类的路径

    pluginListenerClass

    插件的Listener实现类 插件Listener实现类的路径

    supportedSourceTypes

    插件支持的数据源类型 不同的插件支持不同的数据源类型 可以为多个,例如Rdbms-Writer支持的类型有
    MYSQL、SQLSERVER、POSTGRESQL、SDDL

    perfStatistic

    是否开启性能统计

    false

    用于监控Reader端和Writer端的数据同步性能


  • 【Reader插件基本参数】

    PluginReaderParameter是DataLink所有Reader插件的通用参数,具体类型的Reader插件可以自定义参数,但是必须继承PluginReaderParameter。

    PluginReaderParameter参数 参数描述 默认值 备注

    mediaSourceId

    Reader关联的数据源的id

    冗余存储

    dump

    是否需要dump fetch到的数据

    false

    用于确认Reader端获取的Binlog数据,有助于排查问题

    ddlSync

    是否同步ddl操作

    true

    主要针对关系型数据库


  • 【Writer插件基本参数】

    PluginWriterParameter是DataLink所有Writer插件的通用参数,具体类型的Writer插件可以自定义参数,但是必须继承PluginWriterParameter。

    PluginWriterParameter参数 参数描述 默认值 备注

    poolSize

    Writer的线程池大小

    5

    根据MediaMapping的配置情况设置一个合理的值

    dryRun

    dryRun==true时不会进行实际写入操作

    false


    useBatch

    是否开启批量写入

    true

    不同的Writer对批量写入会有不同的定义

    batchSize

    批量写入时每个批次的大小

    50


    merging

    是否可以对数据进行合并

    false


    maxRetryTimes

    最大重试次数

    3


    retryMode

    重试模式

    Always

    当Task写入出现异常时,有四种处理方式:

    Always,//一直重试
    TimesOutDiscard,//超过重试次数后丢弃数据
    TimesOutError,//超过重试次数后抛异常,终止任务
    NoAndError;//不重试,直接抛异常,终止Task

    syncAutoAddColumn

    目标端缺少列时,是否自动加列

    true

    用于源端加字段时,在目标端自动补全相应字段

    【注】异常处理方式默认为一直重试,直到恢复成功为止,否则一直抛异常和报警。在一直重试的情况下,多数异常可以自动恢复正常,如mysql到mysql的同步目标表缺字段,或者目标库因自身原因导致连接失败等问题,一些特殊异常需要人工介入处理,比如暂不支持的异构数据源之间的ddl同步


Clone this wiki locally

AltStyle によって変換されたページ (->オリジナル) /