Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

2.6_HBase Writer

elevenqq edited this page Sep 30, 2018 · 3 revisions

一、使用场景

HBase-Writer插件实现了关系型数据库Rdbms向HBase数据同步的功能,主要使用场景为增量数据实时归档,解决DBA方归档数据表加字段困难的问题。

二、设计原理

HBase-Writer插件自定义的RdbEventRecordHandler继承自com.ucar.datalink.worker.api.handle.AbstractHandler,使用了系统提供的默认流程(详见深入领域),并根据需要自行扩展了一些功能,执行最终的HBase写入操作。具体的数据写入流程如下图所示:

HBase-Writer

  • 【Records分组】
    > 首先将Records按目标端库名、表名进行分组,不同分组并发写入。
    > 然后将每张表的Records根据mapping-id再次进行聚合,因为多表合一的场景下,会存在同一个TableGroup下mapping-id不相同的情况。
  • 【HTable加载】
    > 初始化org.apache.hadoop.conf.Configuration
    要向某个HBase写数据,首先要通过配置HConstants中的参数,来建立与该HBase的zk连接,包括zk的IP地址、端口、父节点。
    > 创建HTable
    由configuration和tableName创建相应的HTable,客户端通过HTable对象与HBase服务端进行CRUD操作(增删改查)。
  • 【Records转换】
    > 将batchRecords中的数据操作转换为HBase相应的数据操作,准备向HBase执行写入。
    > 转换原则:
    1)Rdb的Insert和Update操作转换为HBase的put操作,或者说,HBase的Put操作即相当于关系型数据库的replace操作;
    2)Rdb的Delete操作通过Mapping的扩展参数控制,默认不同步,需要同步时进行特殊配置。当设置Mapping参数syncDelete = true时,将Rdb的Delete转换为HBase的delete操作
    > 转换模型:
    1)HBase表的列族:默认为default;
    2)事件类型为Insert和Update时,Rdb的主键作为row key(不支持联合主键),主键和非主键均作为非row key同步,即主键在非row key中再存一遍。事件类型为delete时,只将Rdb的主键作为row key,在HBase端执行删除操作,非主键列无需同步
  • 【HTable写入】
    > 单表支持按BatchSize拆分写入。即将每张表按mapping-id聚合后的Records根据batchSize进行拆分,分批进行写入。每批次的batchRecords转换为对应的row key和columns之后,通过调用HTable的Put/Delete写入HBase,最后调用flushCommits显示发起请求提交。

三、功能介绍

  • 【并发写入】
    > 一批次的RdbEventRecord支持按表分组并发写入。HBase-Writer开启了多个HTable写线程,每个写线程负责一个HTable对象的flush操作,提高写数据的吞吐量。
    注:HTable对象对于客户端读写数据来说不是线程安全的,因此这里的多线程,是为目标端不同的表开启,每个线程单独创建复用自己的HTable对象,不同线程间不会共享HTable对象使用,能够保证数据的一致性。
  • 【批量写入】
    > HTable.put(Put)可以将一个指定的row key记录写入HBase,为了提高性能,HBase-Writer使用HTable.put(List<Put>)可以将指定的row key列表,批量写入多行记录。批量执行的好处是,只需要一次网络I/O开销,这对于对数据实时性要求高,网络传输RTT高的情景下来明显地提升性能。
    > HBase-Writer初始化HTable时,设置了HTable.setAutoFlush(false),将HTable的自动flush关闭,可以启动Write Buffer模式,批量写入数据到HBase。否则的话,默认情况下auto flush是开启的,每有一个put就要向服务端发一个请求,执行一次更新。
    autoFlush = false这种模式下,向服务端提交的时机分为显式和隐式两种情况:
    1) 显式提交:客户端调用flushCommits()进行提交;
    2) 隐式提交:当Put填满Write Buffer时,客户端才实际向HBase服务端发起写请求,自动执行提交;或者调用了HTable的close()方法时无条件执行提交操作。
    HBase-Writer中,在HTable put每批次数据时进行一次显示提交,防止其运行出现问题时,会导致在Write Buffer中未提交的数据丢失。
    注:默认配置下,Write Buffer大小为2MB,可以根据应用实际情况,通过HTable.setWriteBufferSize(writeBufferSize)设置HTable客户端的写buffer大小。
  • 【缓存资源】
    > 对于同一个HBase,可能会有多次写入,但是不能每次访问都创建一个Configuration,为了提高性能,HBase-Writer使用CacheBuilder为每个HBase缓存一份配置,实现HTable对象之间共享Configuration对象,从而减少后续的网络传输开销,加快查找过程速度。
    > 对于HBase中的同一张表HTable,可能会有多次写入,所以HBase-Writer同样使用CacheBuilder进行缓存,可以规避HTable对象的重复创建开销。
  • 【删除事件同步】
    > 对于删除事件,由于HBase-Writer的主要使用场景为数据归档,所以默认不同步。其他场景可以通过配置Mapping映射的扩展参数,设置同步删除事件。

四、插件参数说明

在继承Writer插件通用参数基类(PluginWriterParameter,详见深入Task)的基础上,HBaseWriterParameter没有扩展自己的参数类,只是实现了插件参数类PluginParameter的基本方法。

五、Mapping参数说明

在Mapping通用配置参数基础上,HBase-Writer插件还定义了自己的参数类HBaseSyncParameter,用来设置是否同步Delete事件。相关Mapping配置参数如下:

Mapping参数

参数描述

默认值

备注

targetMediaSourceId

目标端数据源的id

目标数据源,这里指要同步到的HBase集群

targetMediaName

HBase中表的名字

支持目标端有表别名,用于源端和目标端表名不一致的情况

targetMediaNamespace

目标端数据源的namespace

目标端数据源schema,对HBase来说默认为空

ColumnMappingMode

列映射模式

NONE

支持列名黑白名单与列别名:

NONE,//所有列均同步到目标端

INCLUDE,//只同步白名单中的列,可以设置列别名

EXCLUDE;//黑名单中的列不同步

writePriority

同步优先级

5

数值越小优先级越高

interceptorId

拦截器id

拦截器可以对Records进行特殊处理,满足少数特定功能的需求

skipIds

主键黑名单

可以通过指定主键id来过滤源端的某些异常Records

valid

是否有效

同步映射有效时,才进行同步

syncDelete

HBase-Writer是否同步Delete事件 false

需要向HBase同步Delete事件时,在映射中配置参数,将syncDelete置为true:

{"@type":"com.ucar.datalink.domain.plugin.writer.hbase.HBaseSyncParameter","syncDelete":true}

六、版本说明

关联技术 稳定版本 待测版本
hbase 0.98.16.1-hadoop2

七、注意事项

  • 【HBase shell 常用命令】

    名称

    命令表达式

    创建表

    create '表名称', '列名称1','列名称2','列名称N'

    添加记录

    put '表名称', '行rowkey值', '列名称:', '值'

    查看记录

    get '表名称', '行rowkey值'

    查看表中的记录总数

    count '表名称'

    删除记录

    delete '表名' ,'行rowkey值' , '列名称'

    删除一张表

    先要屏蔽该表,才能对该表进行删除,第一步 disable '表名称' 第二步 drop '表名称'

    查看所有记录

    scan "表名称"

    查看某个表某个列中所有数据

    scan "表名称" , ['列名称:']

    更新记录

    就是重写一遍进行覆盖

    清空表中所有记录 truncate '表名称'


  • 【HBase-Writer同步功能测试】
    > 配置同步
    在DataLink测试环境部署代码,新建ucar_datalink_2_hbase的任务,配置映射表t_dl_test_source同步到表tsqq。
    > HBase建表
    登陆测试环境的hbase-namenode-ip
    进入hbase shell命令:sh hbase shell
    hbase(main):001:0> create ‘tsqq','default'
    > 源端产生Binlog测试
    1)INSERT
    在源端进行插入一条记录,INSERT INTO `t_dl_test_source` (`name`, `create_time`, `modify_time`, `yy`, `cc`) VALUES ('gggg', NOW(), NOW(),222,111);
    查看目标端:
    hbase(main):006:0> scan 'tsqq'
    ROW COLUMN+CELL
    36 column=default:cc, timestamp=1512534970035, value=1
    36 column=default:create_time, timestamp=1512534970035, value=2017年12月06日 11:43:13
    36 column=default:id, timestamp=1512534970035, value=36
    36 column=default:modify_time, timestamp=1512534970035, value=2017年12月06日 11:43:13
    36 column=default:name, timestamp=1512534970035, value=gggg
    36 column=default:yy, timestamp=1512534970035, value=222
    1 row(s) in 0.0710 seconds
    2)UPDATE
    在源端更新刚插入的这条记录:UPDATE `t_dl_test_source` SET name = 'hhhh' WHERE id = 36;
    查看目标端:
    hbase(main):007:0> scan 'tsqq'
    ROW COLUMN+CELL
    36 column=default:cc, timestamp=1512534970035, value=1
    36 column=default:create_time, timestamp=1512534970035, value=2017年12月06日 11:43:13
    36 column=default:id, timestamp=1512534970035, value=36
    36 column=default:modify_time, timestamp=1512534970035, value=2017年12月06日 11:43:13
    36 column=default:name, timestamp=1512534970035, value=hhhh
    36 column=default:yy, timestamp=1512534970035, value=222
    1 row(s) in 0.0710 seconds
    3)DELETE
    在源端删除刚插入的这条记录:DELETE FROM `t_dl_test_source` WHERE id = 36;
    查看目标端,记录仍存在:
    hbase(main):008:0> scan 'tsqq'
    ROW COLUMN+CELL
    36 column=default:cc, timestamp=1512534970035, value=1
    36 column=default:create_time, timestamp=1512534970035, value=2017年12月06日 11:43:13
    36 column=default:id, timestamp=1512534970035, value=36
    36 column=default:modify_time, timestamp=1512534970035, value=2017年12月06日 11:43:13
    36 column=default:name, timestamp=1512534970035, value=hhhh
    36 column=default:yy, timestamp=1512534970035, value=222
    1 row(s) in 0.0710 seconds
    在映射中配置参数,将syncDelete置为true:{"@type":"com.ucar.datalink.domain.plugin.writer.hbase.HBaseSyncParameter","syncDelete":true}
    在源端再次删除这条记录:DELETE FROM `t_dl_test_source` WHERE id = 36;
    查看目标端,相应的删除了rowkey为36的所有记录:
    hbase(main):009:0> scan 'tsqq'
    ROW COLUMN+CELL
    0 row(s) in 0.0100 seconds

八、FAQ



Clone this wiki locally

AltStyle によって変換されたページ (->オリジナル) /