Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

2.1_MySql Reader

elevenqq edited this page Sep 30, 2018 · 2 revisions

一、使用场景

Mysql-Reader插件用于Mysql数据库增量日志解析和数据模型转换,适用于需要实时同步mysql数据的场景,例如:EDA、CQRS、BigData、数据库升级、迁移与备份,以及业务缓存刷新等。

二、设计原理

Mysql-Reader

如上图所示,Mysql-Reader内部处理流程主要分为两个阶段:

  1. 获取Binlog。MysqlReader集成了阿里开源的Canal组件来采集binlog,CanalServer的生命周期和Reader保持一致,CanalServer启动之后会模拟mysql slave向mysql master发送dump请求,mysql master收到dump请求之后会向canal推送Binlog,Canal内部将binlog事件解析为内部的Entry,MysqlReader通过CanalServer提供的api获取这些Entries。
  2. 数据转换。该阶段主要对Entries进行模型转换,由MessageParser将canal组装好的Entry对象转换为自定义的RdbEventRecord对象。

Mysql-Reader的具体工作原理分如下三个阶段介绍:

  • 【启动canal】
    > build canalFilter
    根据源端要同步的库名和表名生成canal的过滤表达式,过滤不需要同步的库和表。
    > generate CanalInstance
    首先进行组装CanalConfig、创建AlarmHandler、动态生成slaveId等准备工作。其中,组装CanalConfig主要从MysqlReaderParameter的参数配置中获取,包括组装源端数据库、配置起始位点、内存缓冲区大小、HA模式(限定为HEARTBEAT)、心跳检测SQL和频率、黑名单等。
    然后,创建具体的CanalInstance,主要完成以下工作:
    (1)初始化MetaManager,设置位点管理器和canal过滤器;
    (2)启动EventParser,设置支持的Mysql的Binlog类型和格式;
    (3)初始化EventSink,设置数据处理的模式。根据源端master数据库的个数设置EventSink:当master数据库个数>1且GroupSinkMode为Coordinate时,将EventSink初始化为FixedGroupEventSink,支持事务保留,其他情况下将EventSink初始化为EntryEventSink。
    > start canalServer
    启动canal服务器,并从相应位点发起客户端消费订阅,同时订阅filter的变化。这里的MysqlReader相当于一个canal的消费端。
    注: 默认使用MetaManager中记录的最后一次消费位点,若为空,则使用canal实例存储的第一个位点。
  • 【获取数据】
    > get Message withoutAck
    canal服务器在检查确认消费端(此处即为Task)已启动并发起订阅之后,才会拿到该客户端的canal实例进行数据获取。首先判断该实例的上批次数据是否仍存在,若存在,则继续使用上批次events,不再获取新数据,若不存在,则根据上批次的batchId来获取本批次events。然后将获取到的events转换为entrys,并把该批次的batchId和entrys封装为Message返回给Mysql-Reader。随后,Mysql-Reader再对Message数据做具体的处理,包括计算该批次数据对应的binlog日志大小、dump数据详情、将entrys解析为RdbEventRecord等。
    > batchTimeout
    Mysql-Reader利用canal服务器获取数据之前,会首先判断Task是否启动了获取批量数据的超时时间控制(batchTimeout),默认不进行超时控制,当无数据时,进行轮询处理。若进行超时控制,当无数据时,则按照超时时间处理。
    注:轮询处理时,为避免空循环机器挂死,允许最多重试3次,超过3次,最多sleep10毫秒。
  • 【解析数据】
    > do filter
    canal返回的entrys首先需要处理数据过滤,主要包括Transaction Begin/End过滤、回环表以及回环数据过滤、canal心跳表数据过滤。
    > parse Entry
    首先将Entry中的数据转换成RowChange,得到其事件类型EventType,然后根据不同的EventType对Entry进行处理。对于Query事件类型的Entry,直接过滤;对于ddl事件类型的Entry,若Mysql-Reader开启了同步ddl开关,则将其转换为RdbEventRecord对象,若未开启则直接忽略;对于dml事件类型的Entry,可以从rowData的beforeColumns和afterColumns中得到所有主键和非主键列变更前和变更后的数据,并按照EventColumn的index进行排序,最后将其转换为RdbEventRecord对象。

三、功能介绍

  • 【HA机制】

    > canal的ha分为两部分,canal server和canal client分别有对应的ha实现:
    canal server:为了减少对mysql dump的请求,不同server上的instance要求同一时间只能有一个处于running,其他的处于standby状态。
    canal client:为了保证有序性,一份instance同一时间只能由一个canal client进行get/ack/rollback操作,否则客户端接收无法保证有序。

    > mysql的ha通过配置多个读IP实现,当源端数据库出现异常时,能够自动切换到健康的DB

  • 【过滤机制】

    > 过滤Binlog:过滤不需要同步的数据库、数据表、事务头、事务尾、回环表、心跳表、以及Query类型的数据等,只保留需要同步的Binlog。

    > 过滤事件:可以设置需要过滤的特定事件类型的Record(INSERT/UPDATE/DELETE,可多选)。

  • 【dump数据详情】

    > Mysql-Reader拿到canal返回的Message之后,可以设置其进行dump Records的详细信息。由于数据量比较大,默认不进行dunp。

  • 【消费起始时间】

    > Mysql-Reader可以配置消费Binlog的起始时间戳,当Task第一次启动时,若指定了起始时间戳,则从基于指定时间戳最近的时间开始寻找消费位点;若不指定起始时间戳,则默认从源端数据库的当前最新时间开始寻找消费位点

    > Task同步过程中,可以重置消费位点,即将消费时间改为过去的某个时间,则Mysql-Reader将从新的时间戳开始寻找消费位点,实现当在数据出现异常时的追数据、补数据的功能。

  • 【支持ddl同步】

    > Mysql-Reader可以设置启用自动同步ddl类型的Events,将其转换为RdbEventRecord对象,主要用于往关系型数据库的同步。系统默认开启该功能。

  • 【其他类型数据源支持】
    > 基于canal的设计机制,Mysql-Reader源端支持的数据源类型除了Mysql,还支持MariaDB 5.5.35和10.0.7(理论上可支持以下版本),以及部分oracle版本。

四、插件参数说明

  • 【MysqlReaderParameter】
    在继承Reader插件通用参数基类(PluginReaderParameter,详见深入Task)的基础上,MysqlReaderParameter还根据canal的特点扩展了自己的参数类,用户可以根据需求在页面更改其参数配置。

    MysqlReaderParameter扩展参数 参数描述 默认值 备注
    dumpDetail

    是否需要dump records详情

    false 可以设置dump records详情
    startTimeStamps 起始时间戳 源端数据库当前最新时间 Task第一次启动时,通过该时间戳寻找位点
    fallbackIntervalInSeconds 数据库发生切换查找时回退的时间 60(单位:s) 当源端数据库出现问题发生DB切换时,默认canal回退60s查找位点

    batchTimeout

    获取批量数据的超时时间

    -1L(单位:ms)

    -1代表不进行超时控制,0代表永久,>0则表示按照指定的时间进行控制

    messageBatchSize

    Message订阅批次大小

    2000

    参数调大了可以增大系统的吞吐率,调小了增加同步的实时性
    memoryStorageBufferSize 缓存记录数 32 * 1024(单位:byte) 内存存储的buffer大小
    memoryStorageBufferMemUnit 缓存记录单元大小 1024(单位:byte) 内存存储的buffer单元大小
    detectingSQL 心跳检测sql select 1 Reader插件与源端数据库之间有心跳检测线程
    detectingIntervalInSeconds 心跳检测频率 3(单位:s)
    detectingTimeoutThresholdInSeconds 心跳超时时间 30(单位:s)
    detectingRetryTimes 心跳检测失败重试次数 3
    defaultConnectionTimeoutInSeconds 连接超时时间(sotimeout) 30(单位:s)
    receiveBufferSize mysql连接接收到的BufferSize 64 * 1024(单位:byte)
    sendBufferSize mysql连接发送的BufferSize 64 * 1024(单位:byte)
    GroupSinkMode 分组同步模式下的Event-Sink模式 Coordinate

    Coordinate,//所有eventparser必须相互协同,保证event在时间序列上全局有序,保证所有分库必须都同时正常才进行数据同步(生产环境需用该模式,保证数据全局有序,把所有分库看成一个整体)

    Separate;//eventparser各自进行数据同步,不需要相互协同,event只是局部有序,一个分库出现问题不会影响其它分库数据同步(测试和预生产推荐使用该模式,因为测试和预生产数据量比较小,很容易出现某个分库长时间没数据的情况,这种情况下会出现协调等待,其它分库的数据也无法同步了)

    filteredEventTypes 需要过滤的某些特定类型的事件 INSERT,UPDATE,DELETE(可多选)
    blackFilter 正则表达式匹配表黑名单,忽略解析 .*\\._.* 默认过滤所有以"_"开头的表


  • 【RdbEventRecord】
    Mysql-Reader插件将每条Mysql的变更数据抽象为RdbEventRecord。其主要参数如下:

    RdbEventRecord参数 参数描述 备注

    tableName

    数据表名


    schemaName

    数据库(实例)名称


    EventType

    变更数据的业务类型(I/U/D/C/A/E)


    executeTime

    发生数据变更的业务时间


    oldKeys(List<EventColumn>)

    变更前的主键值

    和oldColumns不同的是,只有主键发生变化时,才需要给oldKeys设置值;

    而oldColumns,不管前后是否发生更新变化,都会赋值

    keys(List<EventColumn>)

    变更后的主键值

    如果是insert/delete,变更前和变更后的主键值是一样的

    oldColumns(List<EventColumn>)

    变更前非主键的其他字段


    columns(List<EventColumn>)

    变更后非主键的其他字段


    sql

    对应的sql语句

    当eventType = CREATE/ALTER/ERASE时,就是对应的sql语句,其他情况为动态生成的INSERT/UPDATE/DELETE sql

    ddlSchemaName

    ddl/query的schemaName

    会存在跨库ddl,需要保留执行ddl的当前schemaName

    hint

    生成对应的hint内容


    RSI Record资源标识符


五、Mapping参数说明

  • 【Mapping相关参数】
    一个Mapping代表了一个同步方向,即要将哪个库的哪张表的哪些字段和数据同步到哪个库的哪张表中。映射关系为一对多,即一张表可以同步到多个目标端数据源。Mysql-Reader插件的相关Mapping配置参数如下:

    Mapping参数 参数描述 备注

    taskId

    所属Task的id

    每个映射均属于一个Task

    sourceMediaId

    源端表的id

    源表名称的模式有:

    SINGLE,//正常单表名称

    MULTI,//支持类似"offer[0000-0031]"的分库分表模式

    WILDCARD,//支持全库同步的通配符

    YEARLY,//支持按年分表的表名模式

    MONTHLY;//支持按月分表的表名模式


六、版本说明

关联技术 稳定版本 待测版本
canal 1.0.24
mysql 5.7及以下

七、注意事项

  • 【变更表结构的一些限制】
    需要明确一点:
    * Binlog对每条日志事件,并没有记录列名,只是按顺序记录了每个列的值
    * com.alibaba.otter.canal.protocol.CanalEntry中的列名、列类型等信息,是canal拿到binlog数据,通过反查元数据表构造出来的
    所以,参与数据同步的表不能进行如下类型的表结构变更,否则Binlog回溯时可能会出现错误

    变更类型 描述
    Table-Rename 表重命名之后,如果进行binlog回溯,重命名之前binlog中保存的表名在数据库当前元数据中查不到
    canal会抛异常
    Column-Rename 列重命名没有binlog回溯的问题
    Column-Rename不支持,是因为脚本检测的时候,涉及到黑白名单的检查,实现起来比较麻烦,暂未实现
    Column-Drop 列删除之后,如果进行binlog回溯,删除前binlog中列的数量和数据库当前元数据中列的数量不一致,
    canal会抛异常,即使采取不抛错的策略,构造出来的【列名和列值映射】也是错误的
    Add-After-Column 增加列的时候,只能增加到末尾
    否则回溯binlog的时候,Add操作之前binlog中列的顺序和数据库当前元数据中列的顺序不一致,
    构造出来的【列名和列值映射】是错误的
    Modify-After-Column 原因同Add-After-Column

    ps:binlog回溯的相关问题可参考canal源码,com.alibaba.otter.canal.parse.inbound.mysql.dbsync.LogEventConvert

  • 【canal说明】

    Mysql-Reader插件依赖于canal提供数据库日志,针对mysql数据有一些要求,具体请查看: https://github.com/alibaba/canal/wiki/QuickStart

    注意:目前canal支持mixed,row,statement多种日志协议的解析,但配合DataLink进行数据库同步,目前仅支持row协议的同步,使用时需要注意。

八、FAQ

canal参考资料:https://github.com/alibaba/canal

Clone this wiki locally

AltStyle によって変換されたページ (->オリジナル) /