DataX Doriswriter
The DataX Doriswriter plugin supports synchronizing data from various data sources, such as MySQL, Oracle, and SQL Server, into Doris using the Stream Load method.
This plugin needs to be used together with the DataX service. DataX supports multiple data sources. For more details, see here.
Usage
Directly Download the DataX Installation Package
DataX provides an official installation package that already includes DataX, which can be downloaded and used directly. For more details, refer to here.
Compile the DorisWriter Plugin Manually
Download the source code for the DorisWriter plugin.
-
Run
init-env.sh -
Modify code of doriswriter in
DataX/doriswriterif you need. -
Build doriswriter
Build doriswriter along:
`mvn clean install -pl plugin-rdbms-util,doriswriter -DskipTests`If you need to compile the entire DataX project, please refer to here
Compilation error
If you encounter the following compilation errors:
```
Could not find artifact com.alibaba.datax:datax-all:pom:0.0.1-SNAPSHOT ...
```
You can try the following solutions:
1. Download [alibaba-datax-maven-m2-20210928.tar.gz](https://doris-thirdparty-repo.bj.bcebos.com/thirdparty/alibaba-datax-maven-m2-20210928.tar.gz)
2. After decompression, copy the resulting `alibaba/datax/` directory to `.m2/repository/com/alibaba/` corresponding to the maven used, and try to compile again.
Datax DorisWriter parameter introduction:
-
jdbcUrl
- Description: Doris's JDBC connection string, the user executes preSql or postSQL.
- Mandatory: Yes
- Default: None
-
loadUrl
- Description: As a connection target for Stream Load. The format is "ip:port". Where IP is the FE node IP, port is the http_port of the FE node. You can fill in more than one, separated by commas in English:
,, doriswriter will visit in a polling manner. - Mandatory: Yes
- Default: None
- Description: As a connection target for Stream Load. The format is "ip:port". Where IP is the FE node IP, port is the http_port of the FE node. You can fill in more than one, separated by commas in English:
-
username
- Description: The username to access the Doris database
- Mandatory: Yes
- Default: None
-
password
- Description: Password to access Doris database
- Mandatory: No
- Default: empty
-
connection.selectedDatabase
- Description: The name of the Doris database that needs to be written.
- Mandatory: Yes
- Default: None
-
connection. table
- Description: The name of the Doris table that needs to be written.
- Mandatory: Yes
- Default: None
- Description: The name of the Doris table that needs to be written.
-
flushInterval
- Description: The time interval at which data is written in batches. If this time interval is set too small, it will cause Doris write blocking problem, error code -235, and if you set this time interval too small,
maxBatchRowsandbatchSizeparameters are set too large, then it may not be able to reach you The data size set by this will also be imported. - Mandatory: No
- Default: 30000 (ms)
- Description: The time interval at which data is written in batches. If this time interval is set too small, it will cause Doris write blocking problem, error code -235, and if you set this time interval too small,
-
column
- Description: The fields that the destination table needs to write data into, these fields will be used as the field names of the generated Json data. Fields are separated by commas. For example: "column": ["id","name","age"].
- Mandatory: Yes
- Default: No
-
preSql
- Description: Before writing data to the destination table, the standard statement here will be executed first.
- Mandatory: No
- Default: None
-
postSql
- Description: After writing data to the destination table, the standard statement here will be executed.
- Mandatory: No
- Default: None
-
maxBatchRows
- Description: The maximum number of rows for each batch of imported data. Together with batchSize, it controls the number of imported record rows per batch. When each batch of data reaches one of the two thresholds, the data of this batch will start to be imported.
- Mandatory: No
- Default: 500000
-
batchSize
- Description: The maximum amount of data imported in each batch. Works with maxBatchRows to control the number of imports per batch. When each batch of data reaches one of the two thresholds, the data of this batch will start to be imported.
- Mandatory: No
- Default: 94371840
-
maxRetries
- Description: The number of retries after each batch of failed data imports.
- Mandatory: No
- Default: 3
-
labelPrefix
- Description: The label prefix for each batch of imported tasks. The final label will have
labelPrefix + UUIDto form a globally unique label to ensure that data will not be imported repeatedly - Mandatory: No
- Default:
datax_doris_writer_
- Description: The label prefix for each batch of imported tasks. The final label will have
-
loadProps
-
Description: The request parameter of StreamLoad. For details, refer to the StreamLoad introduction page. Stream load - Apache Doris
This includes the imported data format: format, etc. The imported data format defaults to csv, which supports JSON. For details, please refer to the type conversion section below, or refer to the official information of Stream load above.
-
Mandatory: No
-
Default: None
-
Example
1. Stream reads the data and imports it to Doris
For instructions on using the doriswriter plug-in, please refer to here.
2.Mysql reads the data and imports it to Doris
1.Mysql table structure
CREATETABLE`t_test`(
`id`bigint(30)NOTNULL,
`order_code`varchar(30)DEFAULTNULLCOMMENT'',
`line_code`varchar(30)DEFAULTNULLCOMMENT'',
`remark`varchar(30)DEFAULTNULLCOMMENT'',
`unit_no`varchar(30)DEFAULTNULLCOMMENT'',
`unit_name`varchar(30)DEFAULTNULLCOMMENT'',
`price`decimal(12,2)DEFAULTNULLCOMMENT'',
PRIMARYKEY(`id`)USINGBTREE
)ENGINE=InnoDBDEFAULTCHARSET=utf8mb4 ROW_FORMAT=DYNAMIC COMMENT='';
2.Doris table structure
CREATETABLE`ods_t_test`(
`id`bigint(30)NOTNULL,
`order_code`varchar(30)DEFAULTNULLCOMMENT'',
`line_code`varchar(30)DEFAULTNULLCOMMENT'',
`remark`varchar(30)DEFAULTNULLCOMMENT'',
`unit_no`varchar(30)DEFAULTNULLCOMMENT'',
`unit_name`varchar(30)DEFAULTNULLCOMMENT'',
`price`decimal(12,2)DEFAULTNULLCOMMENT''
)ENGINE=OLAP
UNIQUEKEY(id`, `order_code`)
DISTRIBUTED BY HASH(`order_code`) BUCKETS 1
PROPERTIES (
"replication_allocation"="tag.location.default: 3",
"in_memory"="false",
"storage_format"="V2"
);
3.Create datax script
my_import.json
{
"job":{
"content":[
{
"reader":{
"name":"mysqlreader",
"parameter":{
"column":["id","order_code","line_code","remark","unit_no","unit_name","price"],
"connection":[
{
"jdbcUrl":["jdbc:mysql://localhost:3306/demo"],
"table":["employees_1"]
}
],
"username":"root",
"password":"xxxxx",
"where":""
}
},
"writer":{
"name":"doriswriter",
"parameter":{
"loadUrl":["127.0.0.1:8030"],
"column":["id","order_code","line_code","remark","unit_no","unit_name","price"],
"username":"root",
"password":"xxxxxx",
"postSql":["select count(1) from all_employees_info"],
"preSql":[],
"flushInterval":30000,
"connection":[
{
"jdbcUrl":"jdbc:mysql://127.0.0.1:9030/demo",
"selectedDatabase":"demo",
"table":["all_employees_info"]
}
],
"loadProps":{
"format":"json",
"strip_outer_array":"true",
"line_delimiter":"\\x02"
}
}
}
}
],
"setting":{
"speed":{
"channel":"1"
}
}
}
}
Remark:
"loadProps":{
"format":"json",
"strip_outer_array":"true",
"line_delimiter":"\\x02"
}
- Here we use JSON format to import data
line_delimiterdefaults to a newline character, which may conflict with the value in the data, we can use some special characters or invisible characters to avoid import errors- strip_outer_array : Represents multiple rows of data in a batch of imported data. Doris will expand the array when parsing, and then parse each Object in it as a row of data in turn.
- For more parameters of Stream load, please refer to Stream load - Apache Doris
- If it is in CSV format, we can use it like this
"loadProps":{
"format":"csv",
"column_separator":"\\x01",
"line_delimiter":"\\x02"
}CSV format should pay special attention to row and column separators to avoid conflicts with special characters in the data. Hidden characters are recommended here. The default column separator is: \t, row separator: \n
4.Execute the datax task, refer to the specific datax official website
python bin/datax.py my_import.json
After execution, we can see the following information
2022年11月16日 14:28:54.012 [job-0] INFO JobContainer - jobContainer starts to do prepare ...
2022年11月16日 14:28:54.012 [job-0] INFO JobContainer - DataX Reader.Job [mysqlreader] do prepare work .
2022年11月16日 14:28:54.013 [job-0] INFO JobContainer - DataX Writer.Job [doriswriter] do prepare work .
2022年11月16日 14:28:54.020 [job-0] INFO JobContainer - jobContainer starts to do split ...
2022年11月16日 14:28:54.020 [job-0] INFO JobContainer - Job set Channel-Number to 1 channels.
2022年11月16日 14:28:54.023 [job-0] INFO JobContainer - DataX Reader.Job [mysqlreader] splits to [1] tasks.
2022年11月16日 14:28:54.023 [job-0] INFO JobContainer - DataX Writer.Job [doriswriter] splits to [1] tasks.
2022年11月16日 14:28:54.033 [job-0] INFO JobContainer - jobContainer starts to do schedule ...
2022年11月16日 14:28:54.036 [job-0] INFO JobContainer - Scheduler starts [1] taskGroups.
2022年11月16日 14:28:54.037 [job-0] INFO JobContainer - Running by standalone Mode.
2022年11月16日 14:28:54.041 [taskGroup-0] INFO TaskGroupContainer - taskGroupId=[0] start [1] channels for [1] tasks.
2022年11月16日 14:28:54.043 [taskGroup-0] INFO Channel - Channel set byte_speed_limit to -1, No bps activated.
2022年11月16日 14:28:54.043 [taskGroup-0] INFO Channel - Channel set record_speed_limit to -1, No tps activated.
2022年11月16日 14:28:54.049 [taskGroup-0] INFO TaskGroupContainer - taskGroup[0] taskId[0] attemptCount[1] is started
2022年11月16日 14:28:54.052 [0-0-0-reader] INFO CommonRdbmsReader$Task - Begin to read record by Sql: [select taskid,projectid,taskflowid,templateid,template_name,status_task from dwd_universal_tb_task
] jdbcUrl:[jdbc:mysql://localhost:3306/demo?yearIsDateType=false&zeroDateTimeBehavior=convertToNull&tinyInt1isBit=false&rewriteBatchedStatements=true].
Wed Nov 16 14:28:54 GMT+08:00 2022 WARN: Establishing SSL connection without server's identity verification is not recommended. According to MySQL 5.5.45+, 5.6.26+ and 5.7.6+ requirements SSL connection must be established by default if explicit option isn't set. For compliance with existing applications not using SSL the verifyServerCertificate property is set to 'false'. You need either to explicitly disable SSL by setting useSSL=false, or set useSSL=true and provide truststore for server certificate verification.
2022年11月16日 14:28:54.071 [0-0-0-reader] INFO CommonRdbmsReader$Task - Finished read record by Sql: [select taskid,projectid,taskflowid,templateid,template_name,status_task from dwd_universal_tb_task
] jdbcUrl:[jdbc:mysql://localhost:3306/demo?yearIsDateType=false&zeroDateTimeBehavior=convertToNull&tinyInt1isBit=false&rewriteBatchedStatements=true].
2022年11月16日 14:28:54.104 [Thread-1] INFO DorisStreamLoadObserver - Start to join batch data: rows[2] bytes[438] label[datax_doris_writer_c4e08cb9-c157-4689-932f-db34acc45b6f].
2022年11月16日 14:28:54.104 [Thread-1] INFO DorisStreamLoadObserver - Executing stream load to: 'http://127.0.0.1:8030/api/demo/dwd_universal_tb_task/_stream_load', size: '441'
2022年11月16日 14:28:54.224 [Thread-1] INFO DorisStreamLoadObserver - StreamLoad response :{"Status":"Success","BeginTxnTimeMs":0,"Message":"OK","NumberUnselectedRows":0,"CommitAndPublishTimeMs":17,"Label":"datax_doris_writer_c4e08cb9-c157-4689-932f-db34acc45b6f","LoadBytes":441,"StreamLoadPutTimeMs":1,"NumberTotalRows":2,"WriteDataTimeMs":11,"TxnId":217056,"LoadTimeMs":31,"TwoPhaseCommit":"false","ReadDataTimeMs":0,"NumberLoadedRows":2,"NumberFilteredRows":0}
2022年11月16日 14:28:54.225 [Thread-1] INFO DorisWriterManager - Async stream load finished: label[datax_doris_writer_c4e08cb9-c157-4689-932f-db34acc45b6f].
2022年11月16日 14:28:54.249 [taskGroup-0] INFO TaskGroupContainer - taskGroup[0] taskId[0] is successed, used[201]ms
2022年11月16日 14:28:54.250 [taskGroup-0] INFO TaskGroupContainer - taskGroup[0] completed it's tasks.
2022年11月16日 14:29:04.048 [job-0] INFO StandAloneJobContainerCommunicator - Total 2 records, 214 bytes | Speed 21B/s, 0 records/s | Error 0 records, 0 bytes | All Task WaitWriterTime 0.000s | All Task WaitReaderTime 0.000s | Percentage 100.00%
2022年11月16日 14:29:04.049 [job-0] INFO AbstractScheduler - Scheduler accomplished all tasks.
2022年11月16日 14:29:04.049 [job-0] INFO JobContainer - DataX Writer.Job [doriswriter] do post work.
Wed Nov 16 14:29:04 GMT+08:00 2022 WARN: Establishing SSL connection without server's identity verification is not recommended. According to MySQL 5.5.45+, 5.6.26+ and 5.7.6+ requirements SSL connection must be established by default if explicit option isn't set. For compliance with existing applications not using SSL the verifyServerCertificate property is set to 'false'. You need either to explicitly disable SSL by setting useSSL=false, or set useSSL=true and provide truststore for server certificate verification.
2022年11月16日 14:29:04.187 [job-0] INFO DorisWriter$Job - Start to execute preSqls:[select count(1) from dwd_universal_tb_task]. context info:jdbc:mysql://172.16.0.13:9030/demo.
2022年11月16日 14:29:04.204 [job-0] INFO JobContainer - DataX Reader.Job [mysqlreader] do post work.
2022年11月16日 14:29:04.204 [job-0] INFO JobContainer - DataX jobId [0] completed successfully.
2022年11月16日 14:29:04.204 [job-0] INFO HookInvoker - No hook invoked, because base dir not exists or is a file: /data/datax/hook
2022年11月16日 14:29:04.205 [job-0] INFO JobContainer -
[total cpu info] =>
averageCpu | maxDeltaCpu | minDeltaCpu
-1.00% | -1.00% | -1.00%
[total gc info] =>
NAME | totalGCCount | maxDeltaGCCount | minDeltaGCCount | totalGCTime | maxDeltaGCTime | minDeltaGCTime
PS MarkSweep | 1 | 1 | 1 | 0.017s | 0.017s | 0.017s
PS Scavenge | 1 | 1 | 1 | 0.007s | 0.007s | 0.007s
2022年11月16日 14:29:04.205 [job-0] INFO JobContainer - PerfTrace not enable!
2022年11月16日 14:29:04.206 [job-0] INFO StandAloneJobContainerCommunicator - Total 2 records, 214 bytes | Speed 21B/s, 0 records/s | Error 0 records, 0 bytes | All Task WaitWriterTime 0.000s | All Task WaitReaderTime 0.000s | Percentage 100.00%
2022年11月16日 14:29:04.206 [job-0] INFO JobContainer -
Task Start Time : 2022年11月16日 14:28:53
Task End Time : 2022年11月16日 14:29:04
Total Task Duration : 10s
Average Task Throughput : 21B/s
Record Write Speed : 0rec/s
Total Records Read : 2
Total Read/Write Failures : 0