Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

2.4_HDFS Writer

elevenqq edited this page Sep 30, 2018 · 2 revisions

一、使用场景

Hdfs-Writer插件主要用于将其他类型数据源(如RDBMS和HBase)的增量数据进行处理和转换,写入HDFS相应的文件路径下。数据处理平台每天凌晨对T-1的数据进行清洗、去重处理,把同步过去的增量数据更新到spark-hive表中,供大数据分析和查询使用。

二、设计原理

Hdfs-Writer插件自定义的BaseRecordHandler继承自com.ucar.datalink.worker.api.handle.AbstractHandler,使用了系统提供的默认流程(详见深入领域),并根据源端数据源类型扩展了两个子类RdbEventRecordHandler和HRecordHandler,分别负责将RDBMS和HBase类型的数据进行转换,最终写入HDFS相应路径的文件中。以源端为mysql数据库为例,具体的数据写入流程如下图所示(源端为HBase时,Hdfs-Writer的数据处理流程相同,只是写入Hadoop的文件路径不同):

Hdfs-Writer

  • 【Records分组】
    > 首先将Records按目标端库名、表名进行分组,不同分组并发写入。
    > 然后将每张表的Records根据mapping-id再次进行聚合,因为多表合一的场景下,会存在同一个TableGroup下mapping-id不相同的情况。
    > 聚合之后的mappingRecords中,所有Record的namespace和name都相同,并且所有Record对应的MediaMappingInfo对象都相同。
  • 【Records转换】
    > 将mappingRecords按batchSize进行拆分,分批对数据进行转换。首先根据目标端HDFS数据源信息和Hdfs-Writer参数构建HdfsConfig,包括HDFS地址、ZK配置、hadoopUser、写数据包大小、高可用设置等。接着将batchRecords构造为一个Map对象,key为Hdfs文件路径,value为构造好的待写入数据,准备向HDFS执行写入。
    > 构建Hdfs文件路径:RDBMS的数据类型为delete时,为其设置单独的写入文件路径;HBase的schemaName为"default"时,Hdfs文件路径中省略schemaName。
    > 构造写入数据:将每个RdbEventRecord/HRecord中的各列转换为map并序列化,其中key为columnName,value为columnValue,并为RDBMS类型的数据另外新增三个字段:binlog_ts和binlog_seq组合来标识数据写入的先后顺序,binlog_eventtime标识数据的实际产生时间。
  • 【数据写入】
    > 单表支持按BatchSize拆分写入。即将每张表按mapping-id聚合后的Records根据batchSize进行拆分,分批进行数据转换和写入。将每批次的batchRecords构造好对应的数据之后,通过调用写入对应的HDFS文件路径。
    > 向HDFS的某个文件路径写入数据之前,首先要从FileStreamHolder管理的文件流tokens中获取其文件流,若不存在,则由HDFS的DistributedFileSystem实例来创建该路径的文件流,然后通过文件流写入数据(数据大小不能大于Hdfs-Writer设置的写入HDFS的数据包大小),并根据HDFS的提交方式进行hflush&hsync,二者可动态切换,前者保证把数据刷到OS级缓存,后者保证数据落盘。若写入过程中出现异常,则进行关流操作。
  • 【文件流管理】
    > FileStreamHolder是HDFS文件流管理类,由TaskWriter主线程调用。一个文件对应了一个文件流,创建文件流时FSNamesystem会把流对应的path放到Lease中,关闭文件流时FSNamesystem会把流对应的path从Lease中移除。
    > 对文件流的管理需要保证以下几个原则:
    1)流的生命周期应该和Task保持一致,Task运行过程中流随用随创建,Task关闭时把其占有的所有流也关闭,这样才能保证在发生Reblance后,不会出现租约被其它DFSClient占用的问题。
    2)超时不用的流要及时清理,保证其它使用者有机会获取权限,比如发生日切之后,所有的数据都写到新文件中了,前一天的文件不会再有写入操作,那么应该及时关闭前一天的文件流。

三、功能介绍

  • 【按表并发写入】
    > 一批次的Records支持按表分组并发写入。Hdfs-Writer按表开启了多个线程,每个线程负责一张表的HDFS写入操作,提高写数据的吞吐量。
  • 【解决租约问题】
    > HDFS的读写模式为 "write-once-read-many",为了实现write-once,需要设计一种互斥机制,即某个文件在同一时刻只能被一个客户端写入,租约应运而生,保证了数据的一致性。
    > DataLink的应用场景为:一个Worker进程中运行着一批Task,每个Task的Hdfs-Writer负责管理Hdfs中的N个文件,当发生(Re-)balance的时候,Task关闭之前会先进行关流操作,及时释放租约,保证其它Worker可以接管文件写入。关流不成功的情况下,Task重新分配可能会触发Other-Create或者Re-Create问题。
    > 解决方法:
    1)Other-Create异常中包含了other-Dfsclient的IP信息,我们可以调用other-worker提供的接口,远程关闭出问题的流,如果关闭失败或者访问出现超时(宕机的时候会超时),再进行force recovery操作。
    2)流关闭时可能会出现Re-Create异常,如果出现异常,需要进行force recovery操作,否则的话租约将一直不可释放,一直报Re-Create异常。
    3)FileStreamHolder在关闭的时候有些流可能会关闭失败,此时token不能被remove,而是需要把tokenSize大于0的Holder放到leakHolders队列中,后台线程定时清理这些holder,避免引发Lease的ReCreate或OtherCreate问题。
    4)后台定时线程进行的另一项工作是,文件流超时自动关闭。定时对文件流进行空闲时间进行检测,某个流的空闲时间超过指定最大空闲时间(默认60秒),会自动关闭。
    注:DataLink租约问题详解和源码分析参见文章链接:https://www.cnblogs.com/ucarinc/p/8064447.html
  • 【流的并发处理】
    > FileStreamHolder的close方法和getStreamToken方法会被不同的线程调用,tokens存在并发问题,引入读写锁,进行并发处理。
    > 具体的并发场景(问题)为:
    close方法被TaskWriter主线程调用,getStreamToken方法被AbstractHandler的executorService线程调用,如果不进行并发控制,可能会导致close执行结束后,仍然有新的FileStreamToken(ps:假设其对应的pathString名字为p1)加入进来,那么新加入的token对应的FileStream没有机会被关闭。待Task重新启动之后,会实例化一个新的FileStreamHolder对象,新对象的tokens里面并没有p1对应的FileStreamToken,那么会尝试去创建,因为之前的那个FileStream没有被关闭,本次创建会导致hdfs的租约异常:because current leaseholder is trying to recreate file,并且无法自动恢复。
    > 注:close方法被调用的时候TaskWriter主线程都已经接近尾声了,AbstractHandler的executorService为什么还有线程在运行??
    参见AbstractHandler的submitAndWait方法,我们用到了ExecutorCompletionService,当出现异常的时候,会尝试执行Future.cancel,但这并不会导致线程的立即结束(参见Thread.interrupt()方法),所以当主线程接近尾声的时候,线程池中的操作是有可能还未结束的。
  • 【访问多个Hadoop集群】
    > 在一个Worker进程中,可能运行着多个Task的Hdfs-Writer,若要一个进程内同时访问多个hadoop集群,那就需要针对每个集群分别创建各自的FileSystem实例,需要做的有两点:
    1)保证针对这多个集群的Configuration实例中的 "fs.defaultFS" 的配置是不同的;
    2)HADOOP_USER_NAME属性不能通过System.setProperty方法设置,应该调用FileSystem的get方法时动态传入。
  • 【缓存资源】
    > 对于同一个HDFS,可能会有多次写入,但是不能每次访问都创建一个Configuration,为了提高性能,Hdfs-Writer使用CacheBuilder为每个HDFS缓存一份配置,从而减少后续的网络传输开销,加快查找连接速度。
    > 对于同一个Hadoop集群,不能每次写入都要创建其FileSystem实例,因此同样使用CacheBuilder进行缓存,减少重复创建开销。
    > 对于同一个HDFS文件,可能会有不同的客户端写入,为了实现互斥机制,每个客户端进行写入时都会生成一个ReentrantLock,同样使用CacheBuilder进行缓存,减少内存开销。
    > 对于同一个HDFS文件,可能会有多次写入,即要多次用到该文件的文件流,所以将各个HDFS文件的文件流在内存中缓存起来,如果不进行缓存,而是频繁的创建和关闭文件流,存在两个问题:
    1)性能问题:打开和关闭流都是耗费资源的操作,每接收到一次请求都创建和关闭一次文件流,不合理。
    2)租约释放问题:HDFS文件流的关闭是一个较复杂的过程,针对同一文件,如果刚刚执行完关闭操作,立刻又开启一个文件流,频繁如此操作容易出现租约冲突,最终会积压一堆冲突,甚至无法恢复。
  • 【文件拆分方式】
    > HDFS的文件拆分方式支持DAY, HOUR, HALFHOUR三种(同一张表每天/每小时/每半小时生成一个文件)。可以在映射中配置HdfsFileParameter参数,设置多种FileSplitStrategy,每种文件拆分方式有不同的生效日期,自动取距离当前时间最近的生效时间对应的文件拆分方式生成文件。没有明确配置的话,文件以天为单位进行管理,同一张表每天对应一个文件。

四、插件参数说明

在继承Writer插件通用参数基类(PluginWriterParameter,详见深入Task)的基础上,HdfsWriterParameter还针对HDFS类型数据库的特点扩展了自己的参数类,用户可以根据需求在页面更改其参数配置。

HdfsWriterParameter扩展参数 参数描述 默认值 备注

CommitMode

HDFS的提交方式

Hflush

CommitMode分为Hflush, Hsync两种

streamLeisureLimit

流的空闲时间最大值

600000

单位:毫秒,用于流的超时关闭

hdfsPacketSize

客户端写入HDFS的数据包大小

20971520

20*1024*1024,packet-size设置为20M,保证数据一次性发送到hdfs-server端,避免被截断

hsyncInterval

当CommitMode为Hflush的时候,进行hsync的时间间隔

30000

单位:毫秒,用于控制当CommitMode为Hflush的时候,多长时间进行一次hsync操作

hbasePath

HBase增量数据在HDFS的路径前缀

"user/hbase"

根据文件拆分方式写入具体的路径

mysqlBinlogPath

Mysql增量数据在HDFS的路径前缀

"user/mysql/binlog"

根据文件拆分方式写入具体的路径

五、Mapping参数说明

在Mapping通用配置参数基础上,Hdfs-Writer插件还定义了自己的参数类HdfsFileParameter,用来设置写入HDFS路径的文件拆分策略。相关Mapping配置参数如下:

Mapping参数

参数描述

默认值

备注

targetMediaSourceId

目标端数据源的id

目标数据源,这里指要同步到的HDFS集群

targetMediaName

HDFS中表的名字

支持目标端有表别名,用于源端和目标端表名不一致的情况

targetMediaNamespace

目标端数据源的namespace

目标端数据源schema,对HDFS来说默认为空

ColumnMappingMode

列映射模式

NONE

支持列名黑白名单与列别名:

NONE,//所有列均同步到目标端

INCLUDE,//只同步白名单中的列,可以设置列别名

EXCLUDE;//黑名单中的列不同步

writePriority

同步优先级

5

数值越小优先级越高

interceptorId

拦截器id

拦截器可以对Records进行特殊处理,满足少数特定功能的需求

skipIds

主键黑名单

可以通过指定主键id来过滤源端的某些异常Records

valid

是否有效

同步映射有效时,才进行同步

HdfsFileParameter

设置写入HDFS路径的文件拆分策略
每天一个文件

FileSplitMode支持DAY, HOUR, HALFHOUR三种文件拆分方式,默认按天生成文件。用户可以根据需要在映射中进行参数配置,参数配置格式举例:

{"@type":"com.ucar.datalink.domain.plugin.writer.hdfs.HdfsFileParameter","fileSplitStrategieList":[{"effectiveDate":"2016-11-30","fileSplitMode":"HALFHOUR"}]}


六、版本说明

关联技术 稳定版本 待测版本

hadoop-hdfs

2.6.3


七、注意事项


八、FAQ

参考资料:

https://www.cnblogs.com/ucarinc/p/8064447.html

https://stackoverflow.com/questions/44349970/hive-doesnt-read-latest-hdfs-file-data-until-i-close-outputstream

Clone this wiki locally

AltStyle によって変換されたページ (->オリジナル) /