DataX doriswriter

The DataX doriswriter plug-in synchronizes data from other data sources to Doris through DataX.

The plug-in uses Doris's Stream Load feature to import data, and must be used together with the DataX service.
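Stream Load is an HTTP-based import: the plug-in sends each batch of rows to a `_stream_load` endpoint on an FE node. A minimal sketch of the endpoint the writer targets; the FE host/port, database, and table below are placeholders, not defaults:

```shell
# Placeholders: substitute your own FE address, database, and table.
FE_HOST="127.0.0.1:8030"
LOAD_URL="http://${FE_HOST}/api/demo/ods_t_test/_stream_load"
echo "${LOAD_URL}"
# A manual import roughly equivalent to what the plug-in does internally:
#   curl --location-trusted -u root: -H "format: json" \
#        -H "strip_outer_array: true" -T data.json "${LOAD_URL}"
```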

About DataX

DataX is the open-source version of Alibaba Cloud DataWorks Data Integration, an offline data synchronization tool/platform widely used within Alibaba Group. DataX implements efficient data synchronization between heterogeneous data sources, including MySQL, Oracle, SQL Server, PostgreSQL, HDFS, Hive, ADS, HBase, TableStore (OTS), MaxCompute (ODPS), Hologres, DRDS, and more.

More details can be found at: https://github.com/alibaba/DataX/

Usage

The code of DataX doriswriter plug-in can be found here.

This directory is the doriswriter plug-in development environment of Alibaba DataX.

The doriswriter plug-in depends on modules in the DataX code base that are not published to the official Maven repository. To develop and compile the doriswriter plug-in, you therefore need to download the complete DataX code base first.

Directory structure

  1. doriswriter/

    This directory is the code directory of doriswriter, and this part of the code should be in the Doris code base.

    The help doc can be found in doriswriter/doc

  2. init-env.sh

    The script mainly performs the following steps:

    1. Clone the DataX code base locally via git.

    2. Softlink the doriswriter/ directory to DataX/doriswriter.

    3. Add <module>doriswriter</module> to the original DataX/pom.xml

    4. Change the httpclient version from 4.5 to 4.5.13 in DataX/core/pom.xml.

      httpclient 4.5 cannot correctly handle HTTP 307 redirects.

    After that, developers can work inside DataX/. Because of the softlink, changes made in the DataX/doriswriter directory are reflected in the doriswriter/ directory, which makes it convenient for developers to commit code.
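The pom edits in steps 3 and 4 can be sketched against a throwaway stub layout (the real init-env.sh operates on the actual cloned checkout; the sed patterns here are simplified assumptions about the pom contents):

```shell
set -e
# Stub layout standing in for the cloned DataX checkout (step 1, the git
# clone, is omitted here).
mkdir -p demo/doriswriter demo/DataX/core
printf '<modules>\n</modules>\n' > demo/DataX/pom.xml
printf '<httpclient.version>4.5</httpclient.version>\n' > demo/DataX/core/pom.xml

# Step 2: softlink the plug-in directory into the DataX tree.
ln -s ../doriswriter demo/DataX/doriswriter
# Step 3: register the module in the root pom.
sed -i 's|</modules>|  <module>doriswriter</module>\n</modules>|' demo/DataX/pom.xml
# Step 4: pin httpclient to 4.5.13.
sed -i 's|>4.5<|>4.5.13<|' demo/DataX/core/pom.xml

grep doriswriter demo/DataX/pom.xml
grep 4.5.13 demo/DataX/core/pom.xml
```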

How to build

  1. Run init-env.sh

  2. Modify the doriswriter code in DataX/doriswriter if needed.

  3. Build doriswriter

    1. Build doriswriter alone:

      mvn clean install -pl plugin-rdbms-util,doriswriter -DskipTests

    2. Build DataX:

      mvn package assembly:assembly -Dmaven.test.skip=true

      The output will be in target/datax/datax/.

      hdfsreader, hdfswriter and oscarwriter need some extra jar packages. If you don't need these components, you can comment out the corresponding modules in DataX/pom.xml.

    3. Compilation errors

      If you encounter the following compilation errors:

      1. Could not find artifact com.alibaba.datax:datax-all:pom:0.0.1-SNAPSHOT ...

      You can try the following solutions:

      1. Download alibaba-datax-maven-m2-20210928.tar.gz
      2. After decompression, copy the resulting alibaba/datax/ directory into the com/alibaba/ directory of the local Maven repository (.m2/repository) used for the build.
      3. Try to compile again.
  4. Commit the doriswriter code changes in doriswriter/ if needed.

Example

1. Use streamreader to read data and import it into Doris

For instructions on using the doriswriter plug-in, see here.
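For reference, a minimal streamreader job might look like the following. This is a sketch, not the official example: the FE address, query port, database, table, and column names are all placeholders, and streamreader simply emits `sliceRecordCount` constant rows per channel from the configured column values.

```json
{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "streamreader",
                    "parameter": {
                        "column": [
                            {"value": "1", "type": "long"},
                            {"value": "hello", "type": "string"}
                        ],
                        "sliceRecordCount": 10
                    }
                },
                "writer": {
                    "name": "doriswriter",
                    "parameter": {
                        "loadUrl": ["127.0.0.1:8030"],
                        "username": "root",
                        "password": "",
                        "column": ["id", "name"],
                        "connection": [
                            {
                                "jdbcUrl": "jdbc:mysql://127.0.0.1:9030/demo",
                                "selectedDatabase": "demo",
                                "table": ["demo_table"]
                            }
                        ],
                        "loadProps": {
                            "format": "json",
                            "strip_outer_array": true
                        }
                    }
                }
            }
        ],
        "setting": {
            "speed": {
                "channel": "1"
            }
        }
    }
}
```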

2. Read data from MySQL and import it into Doris

1. MySQL table structure

```sql
CREATE TABLE `t_test` (
    `id` bigint(30) NOT NULL,
    `order_code` varchar(30) DEFAULT NULL COMMENT '',
    `line_code` varchar(30) DEFAULT NULL COMMENT '',
    `remark` varchar(30) DEFAULT NULL COMMENT '',
    `unit_no` varchar(30) DEFAULT NULL COMMENT '',
    `unit_name` varchar(30) DEFAULT NULL COMMENT '',
    `price` decimal(12,2) DEFAULT NULL COMMENT '',
    PRIMARY KEY (`id`) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 ROW_FORMAT=DYNAMIC COMMENT='';
```

2. Doris table structure

```sql
CREATE TABLE `ods_t_test` (
    `id` bigint(30) NOT NULL,
    `order_code` varchar(30) DEFAULT NULL COMMENT '',
    `line_code` varchar(30) DEFAULT NULL COMMENT '',
    `remark` varchar(30) DEFAULT NULL COMMENT '',
    `unit_no` varchar(30) DEFAULT NULL COMMENT '',
    `unit_name` varchar(30) DEFAULT NULL COMMENT '',
    `price` decimal(12,2) DEFAULT NULL COMMENT ''
) ENGINE=OLAP
UNIQUE KEY(`id`, `order_code`)
DISTRIBUTED BY HASH(`order_code`) BUCKETS 1
PROPERTIES (
    "replication_allocation" = "tag.location.default: 3",
    "in_memory" = "false",
    "storage_format" = "V2"
);
```

3. Create the DataX job script

```json
{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "mysqlreader",
                    "parameter": {
                        "column": ["emp_no", "birth_date", "first_name", "last_name", "gender", "hire_date"],
                        "connection": [
                            {
                                "jdbcUrl": ["jdbc:mysql://localhost:3306/demo"],
                                "table": ["employees_1"]
                            }
                        ],
                        "username": "root",
                        "password": "xxxxx",
                        "where": ""
                    }
                },
                "writer": {
                    "name": "doriswriter",
                    "parameter": {
                        "loadUrl": ["172.16.0.13:8030"],
                        "column": ["emp_no", "birth_date", "first_name", "last_name", "gender", "hire_date"],
                        "username": "root",
                        "password": "xxxxxx",
                        "postSql": ["select count(1) from all_employees_info"],
                        "preSql": [],
                        "flushInterval": 30000,
                        "connection": [
                            {
                                "jdbcUrl": "jdbc:mysql://172.16.0.13:9030/demo",
                                "selectedDatabase": "demo",
                                "table": ["all_employees_info"]
                            }
                        ],
                        "loadProps": {
                            "format": "json",
                            "strip_outer_array": true
                        }
                    }
                }
            }
        ],
        "setting": {
            "speed": {
                "channel": "1"
            }
        }
    }
}
```

4. Execute the DataX task. For detailed usage, refer to the DataX official website.
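With the paths assumed here (DataX built into target/datax/datax and the job above saved as mysql2doris.json), the launch command, printed below for inspection, would be:

```shell
# Assumed paths; adjust to where you unpacked the DataX build and saved the job.
DATAX_HOME="target/datax/datax"
JOB="mysql2doris.json"
echo "python ${DATAX_HOME}/bin/datax.py ${JOB}"
```

Note that datax.py in older DataX releases targets Python 2; check which Python your DataX version expects before running.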