Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

2.5_ElasticSearch Writer

elevenqq edited this page Sep 30, 2018 · 2 revisions

一、使用场景

ES-Writer插件主要用于将业务系统其他类型数据源的增量数据同步到ES,应用场景主要有两种:

1)由于订单的海量数据(Mysql)而使查询的速度降低到无法满足业务需要时(当然慢查询,也有可能拖垮整个数据库),我们将使用ElasticSearch作为海量数据的存储介质,作为二级系统来解决查询的性能问题。

2)由于HBase的索引局限性(比较单一,不好扩展),需要将HBase的数据同步到ElasticSearch,来解决该类问题。(建设中)

二、设计原理

目前,Hdfs-Writer插件只实现了将Mysql类型的增量数据同步到ES。自定义的RdbEventRecordHandler继承自com.ucar.datalink.worker.api.handle.AbstractHandler,使用了系统提供的默认流程(详见深入领域),并根据需要自行扩展了一些功能,将RDBMS类型的数据进行转换、加工等处理,最终调用REST_client写入ES相应的索引中。具体的数据写入流程如下图所示:

ES-Writer

  • 【Records分组】
    > 将Records按表名进行分组,不同分组并发写入,保证单表内的局部有序。
    > 若ES-Writer开启了批量写入,则将每张表的Records按照设定的batchSize进行拆分,分批进行数据转换与写入。
  • 【Records转换】
    > Hdfs-Writer暂时只支持insert和update类型的Records数据同步。并且,由于涉及到多表聚合操作,所以不管原始操作是insert还是update,Action都设置为Update。(内置实现自动创建index功能??)
    > 根据Records构建批量提交ES的动作描述信息。首先设置转化json串时,保留值为null的字段,使其能将值为null的字段传递到服务端。并且设置采用前缀处理EsUsePrefix = true,提前根据原始表名构造列名前缀,用于构建列内容。构造ES动作描述具体步骤如下:
    1)填充Index和Type:在Transform阶段tableName已经转化成了"index.type"形式的别名,所以从映射表别名中获取相应的字符串填充ES的Index和Type。
    2)填充主键:若指定了joinColumn,即多表聚合的情况,主表和子表均用joinColumn的值填充主键;若未指定joinColumn,则认为主键就是id,用id的值填充主键。
    3)填充列内容:将RdbEventRecord数据转化为ES数据,其中,主键信息和joinColumn列必须要同步,insert类型的Record所有列全部同步,update类型的Record只同步发生变更的列。
    4)合并地理位置信息:取出映射中配置的地理位置合并信息,将经纬度的值格式化后放到一个对象中,并赋值给合并之后的列。
  • 【ES批量更新】
    > 批量提交:将转换好的数据批量发送到ES,并对ES批量操作返回结果进行校验和解析。
    > 处理结果:若出现写入失败或者更新失败,则进行重试,超过最大重试次数还有错则直接抛异常(默认MAX_TRIES = 3)。其中,当出现更新失败且状态码为404时,将update转为index操作,以写入模式重试。

三、功能介绍

  • 【按表并发】
    > 一批次的Records支持按表分组并发写入。ES-Writer按表开启了多个线程,每个线程负责一张表的ES写入操作,单表内同步是有序的,保证了数据的一致性,同时提高写数据的吞吐量。
  • 【多表聚合】
    > 在同步Mysql数据过程中,可以实现多表聚合功能,即通过设置聚合列名称,将Mysql中多张有外键关系的表,在同步过程中进行聚合,到ES端,多张表的数据合并成一条。实现了将多个相关业务表的数据聚合到ES的同一个索引里,提高了查询性能。
  • 【字段合并】
    > 地理位置信息合并:为了方便查询和展示,将业务数据中的经纬度信息合并成ES中的一个字段,来表示各个场景的地理位置信息。
    > 扩展列:指的是源端并不存在该列,而是在特定业务场景下,凭空创造出的列,如A和B两个Column合并成一个新的Column。例如,自定义拦截器YccOrder2EsInterceptor,用于将源表的若干个字段整合成ES中的一个字段。
    注:当我们既用到了[白名单]又用到了[扩展列]时,需要在Record的元数据中,把扩展列的名字放进去,否则扩展列不在白名单中,将被忽略。
  • 【ddl拦截器】
    > ES-Writer在AbstractHandler内置的基础拦截器前面增加自定义拦截器DdlEventInterceptor,对ddl类型的Record进行sql解析,实现自动同步加字段功能。
    首先将sql中新增的字段名称和类型转换为ES中的字段名称和对应类型,并序列化为columnJson(格式实例:{"properties":{"t_dl_test_source|yy":{"type":"integer"}}}),然后通过映射信息获取要同步到的ES的index和type,以及ES的配置信息,最后调用EsClient的updateMappingIndex方法连接ES集群进行更新。
  • 异常重试
    > ES的批量写入,利用返回结果,采用了异常重试机制,最大限度的保证了写入数据的一致性。

四、插件参数说明

在继承Writer插件通用参数基类(PluginWriterParameter,详见深入Task)的基础上,EsWriterParameter没有扩展自己的参数类,只是实现了插件参数类PluginParameter的基本方法。

五、Mapping参数说明

在Mapping通用配置参数基础上,ES-Writer插件还有自己的专用参数:聚合列名称和列前缀配置参数,用来实现多表合一,地理位置合并配置参数,用来将经度和纬度合并成一个字段。相关Mapping配置参数如下:

Mapping参数 参数描述 默认值 备注

targetMediaSourceId

目标端数据源的id

目标数据源,这里指要同步到的ES集群

targetMediaName

ES中的表别名

支持目标端有表别名,在ES-Writer插件中表示为其自定义的"index"和"type",即表别名格式为:"index.type"

targetMediaNamespace

目标端数据源的namespace

目标端数据源schema,对ES来说默认为空

ColumnMappingMode

列映射模式

NONE

支持列名黑白名单与列别名:

NONE,//所有列均同步到目标端

INCLUDE,//只同步白名单中的列,可以设置列别名

EXCLUDE;//黑名单中的列不同步

writePriority

同步优先级

5

数值越小优先级越高

interceptorId

拦截器id

拦截器可以对Records进行特殊处理,满足少数特定功能的需求。例如YccOrder2EsInterceptor,用于将源表的若干个字段整合成ES中的一个字段

skipIds

主键黑名单

可以通过指定主键id来过滤源端的某些异常Records

valid

是否有效

同步映射有效时,才进行同步

joinColumn

聚合列名称 Mysql多表聚合成ES的一张表时,关联各表的列,注:Mysql主表和子表必须是一对一的关系

esUsePrefix

ES中的列前缀配置 true 当使用前缀时,ES中的每个列使用 "表名|" 做前缀

geoPositionConf

地理位置合并配置

将业务中的经纬度两个字段表示的地理位置信息合并为ES的一个字段,类型为List<GeoPositionMapping>,可以配置多个

注:地理位置合并配置信息举例如下所示,将"latColumnName"和"lonColumnName"合并成为ES中的"columnName"。如专车订单详情中的"预计上/下车经纬度"、"实际上/下车经纬度"等位置信息各为两个字段,合并到ES中的字段为"预计上/下车地理位置"、"实际上/下车地理位置"。

[
{
"columnName": "estimate_board",
"latColumnName": "estimate_board_lat",
"lonColumnName": "estimate_board_lon"
},
{
"columnName": "estimate_off",
"latColumnName": "estimate_off_lat",
"lonColumnName": "estimate_off_lon"
},
{
"columnName": "actual_board",
"latColumnName": "actual_board_lat",
"lonColumnName": "actual_board_lon"
},
{
"columnName": "actual_off",
"latColumnName": "actual_off_lat",
"lonColumnName": "actual_off_lon"
},
{
"columnName": "origin_estimate_off",
"latColumnName": "origin_estimate_off_lat",
"lonColumnName": "origin_estimate_off_lon"
}
]

六、版本说明

关联技术 稳定版本 待测版本
ElasticSearch 2.X > .2X
esClient REST
reader mysql

七、注意事项:

> 配置的ES的用户如果没有index、type、列等的操作权限,需要在目标端手动添加。

八、FAQ

Clone this wiki locally

AltStyle によって変換されたページ (->オリジナル) /