DataX 官网支持绝大部分主流数据源的读写插件,并且有详细的使用文档。

CSV 文件的读写插件

csv 文件就是文本文件,用 txtreader 和 txtwriter 读写。配置文件详细语法请参见 DataX官网说明

txtreader配置示例:

  1. "reader":{
  2. "name":"txtfilereader",
  3. "parameter":{
  4. "path":["文件全路径"],
  5. "encoding":"UTF-8",
  6. "column":[
  7. { "index":0, "type":"long" }
  8. ,{ "index":1, "type":"long" }
  9. ,{ "index":2, "type":"string" }
  10. ,{ "index":3, "type":"double" }
  11. ,{ "index":4, "type":"string" }
  12. ],
  13. "fieldDelimiter":"||",
  14. "fileFormat":"text"
  15. }
  16. }

txtwriter 配置示例:

  1. "writer":{
  2. "name":"txtfilewriter",
  3. "parameter":{
  4. "path":"文件全路径",
  5. "fileName":"文件名",
  6. "writeMode":"truncate",
  7. "dateFormat":"yyyy-MM-dd",
  8. "charset":"UTF-8",
  9. "nullFormat":"",
  10. "fileDelimiter":"||"
  11. }
  12. }

MySQL 数据库的读写插件

针对 MySQL 数据库,用 mysqlreader 和 mysqlwriter 插件读写。

mysqlreader 配置示例:

  1. "reader": {
  2. "name": "mysqlreader",
  3. "parameter": {
  4. "username": "root",
  5. "password": "root",
  6. "column": [
  7. "id",
  8. "name"
  9. ],
  10. "splitPk": "db_id",
  11. "connection": [
  12. {
  13. "table": [
  14. "table"
  15. ],
  16. "jdbcUrl": [
  17. "jdbc:mysql://127.0.0.1:3306/database"
  18. ]
  19. }
  20. ]
  21. }
  22. }

mysqlwriter 配置示例:

  1. "writer": {
  2. "name": "mysqlwriter",
  3. "parameter": {
  4. "writeMode": "insert",
  5. "username": "root",
  6. "password": "root",
  7. "column": [
  8. "id",
  9. "name"
  10. ],
  11. "session": [
  12. "set session sql_mode='ANSI'"
  13. ],
  14. "preSql": [
  15. "delete from test"
  16. ],
  17. "connection": [
  18. {
  19. "jdbcUrl": "jdbc:mysql://127.0.0.1:3306/datax?useUnicode=true&characterEncoding=gbk",
  20. "table": [
  21. "test"
  22. ]
  23. }
  24. ]
  25. }
  26. }

Oracle 数据库的读写插件

针对 Oracle 数据库,用 oraclereader 和 oraclewriter 插件来读写。

oraclereader 配置示例:

  1. "reader": {
  2. "name": "oraclereader",
  3. "parameter": {
  4. // 数据库连接用户名
  5. "username": "root",
  6. // 数据库连接密码
  7. "password": "root",
  8. "column": [
  9. "id","name"
  10. ],
  11. //切分主键
  12. "splitPk": "db_id",
  13. "connection": [
  14. {
  15. "table": [
  16. "table"
  17. ],
  18. "jdbcUrl": [
  19. "jdbc:oracle:thin:@[HOST_NAME]:PORT:[DATABASE_NAME]"
  20. ]
  21. }
  22. ]
  23. }
  24. }

oraclewriter 配置示例:

  1. "writer": {
  2. "name": "oraclewriter",
  3. "parameter": {
  4. "username": "root",
  5. "password": "root",
  6. "column": [
  7. "id",
  8. "name"
  9. ],
  10. "preSql": [
  11. "delete from test"
  12. ],
  13. "connection": [
  14. {
  15. "jdbcUrl": "jdbc:oracle:thin:@[HOST_NAME]:PORT:[DATABASE_NAME]",
  16. "table": [
  17. "test"
  18. ]
  19. }
  20. ]
  21. }
  22. }

DB2 数据库的读写插件

db2reader 配置示例:

  1. "reader":{
  2. "name":"db2reader",
  3. "parameter":{
  4. "username":"SRC_DB_UESRNAME",
  5. "password":"SRC_DB_PASSWORD",
  6. "column":[
  7. "SRC_COLUMN_LIST"
  8. ],
  9. "connection":[
  10. {
  11. "table":[
  12. "SRC_TABLE_NAME"
  13. ],
  14. "jdbcUrl":[
  15. "jdbc:db2://SRC_DB_IP:SRC_DB_PORT/SRC_DB_NAME"
  16. ]
  17. }
  18. ]
  19. }
  20. }

db2writer 配置示例:

OceanBase 数据库的读写插件

OceanBase 数据库使用插件 oceanbasev10reader 和 oceanbasev10writer 来读写。该插件由 OceanBase 产品团队单独提供。

  • oceanbasev10reader 配置示例
  1. "reader":{
  2. "name":"oceanbasev10reader",
  3. "parameter":{
  4. "where":"",
  5. "timeout":10000,
  6. "readBatchSize":100000,
  7. "readByPartition":"true",
  8. "column": [
  9. “列名1”,”列名2
  10. ],
  11. "connection":[
  12. {
  13. "jdbcUrl":["||_dsc_ob10_dsc_||集群名:租户名||_dsc_ob10_dsc_||jdbc:oceanbase://连接IP:连接端口/模式名或数据库名"],
  14. "table":["表名"]
  15. }
  16. ],
  17. "username":"租户内用户名",
  18. "password":"密码"
  19. }
  20. }

示例:OceanBase 表 ware 导出到 csv 文件

  1. [admin@*** /home/admin/datax3]
  2. $cat job/ob_tpcc_ware_2_csv.json
  3. {
  4. "job":{
  5. "setting":{
  6. "speed":{
  7. "channel":10
  8. },
  9. "errorLimit":{
  10. "record":0, "percentage": 0.02
  11. }
  12. },
  13. "content":[
  14. {
  15. "reader":{
  16. "name":"oceanbasev10reader",
  17. "parameter":{
  18. "where":"",
  19. "timeout":10000,
  20. "readBatchSize":100000,
  21. "readByPartition":"true",
  22. "column": [
  23. "W_ID","W_YTD","W_TAX","W_NAME","W_STREET_1","W_STREET_2","W_CITY","W_STATE","W_ZIP"
  24. ],
  25. "connection":[
  26. {
  27. "jdbcUrl":["||_dsc_ob10_dsc_||obdemo:obbmsql||_dsc_ob10_dsc_||jdbc:oceanbase://127.1:2883/tpcc"],
  28. "table":["ware"]
  29. }
  30. ],
  31. "username":"tpcc",
  32. "password":"123456"
  33. }
  34. },
  35. "writer":{
  36. "name":"txtfilewriter",
  37. "parameter":{
  38. "path":"/home/admin/csvdata/",
  39. "fileName":"ware",
  40. "writeMode":"truncate",
  41. "dateFormat":"yyyy-MM-dd",
  42. "charset":"UTF-8",
  43. "nullFormat":"",
  44. "fileDelimiter":"||"
  45. }
  46. }
  47. }
  48. ]
  49. }
  50. }
  51. [admin@*** /home/admin/datax3]
  52. $bin/datax.py job/ob_tpcc_ware_2_csv.json

image.png

  • oceanbasev10writer**配置示例**

使用 DataX 向 OceanBase 里写入时,要避免写入速度过快导致 OceanBase 的增量内存耗尽。通常建议 DataX 配置文件里针对写入做一个写入限速设置。关键字是 memstoreThreshold :

  1. "writer": {
  2. "name": "oceanbasev10writer",
  3. "parameter": {
  4. "username": "租户内的用户名",
  5. "password": "密码",
  6. "writeMode": "insert",
  7. "column": [
  8. "列名1",“列名2
  9. ],
  10. "preSql": [
  11. ""
  12. ],
  13. "connection": [
  14. {
  15. "jdbcUrl": "||_dsc_ob10_dsc_||集群名:租户名||_dsc_ob10_dsc_||jdbc:oceanbase://连接IP:连接端口(默认2883)/模式名或数据库名",
  16. "table": [
  17. "表名"
  18. ]
  19. }
  20. ],
  21. "batchSize": 1024,
  22. "memstoreThreshold": "90"
  23. }
  24. }

示例:从 csv 文件导入到 OceanBase 表中

  1. [admin@*** /home/admin/datax3]
  2. $cat job/csv_2_ob_tpcc_ware2.json
  3. {
  4. "job":{
  5. "setting":{
  6. "speed":{
  7. "channel":32
  8. },
  9. "errorLimit":{
  10. "record":0, "percentage": 0.02
  11. }
  12. },
  13. "content":[
  14. {
  15. "reader":{
  16. "name":"txtfilereader",
  17. "parameter":{
  18. "path":["/home/admin/csvdata/ware*"],
  19. "encoding":"UTF-8",
  20. "column":[
  21. { "index":0, "type":"long" }
  22. ,{ "index":1, "type":"long" }
  23. ,{ "index":2, "type":"long" }
  24. ,{ "index":3, "type":"string" }
  25. ,{ "index":4, "type":"string" }
  26. ,{ "index":5, "type":"string" }
  27. ,{ "index":6, "type":"string" }
  28. ,{ "index":7, "type":"string" }
  29. ,{ "index":8, "type":"string" }
  30. ],
  31. "fieldDelimiter":",",
  32. "fileFormat":"text"
  33. }
  34. },
  35. "writer":{
  36. "name":"oceanbasev10writer",
  37. "parameter":{
  38. "writeMode":"insert",
  39. "column":[
  40. "W_ID","W_YTD","W_TAX","W_NAME","W_STREET_1","W_STREET_2","W_CITY","W_STATE","W_ZIP"
  41. ],
  42. "connection":[
  43. {
  44. "jdbcUrl":"||_dsc_ob10_dsc_||obdemo:obbmsql||_dsc_ob10_dsc_||jdbc:oceanbase://127.1:2883/tpcc",
  45. "table":["WARE2"]
  46. }
  47. ],
  48. "username":"tpcc",
  49. "password":"123456",
  50. "batchSize":256,
  51. " memstoreThreshold":"90"
  52. }
  53. }
  54. }
  55. ]
  56. }
  57. }
  58. [admin@*** /home/admin/datax3]
  59. $bin/datax.py job/csv_2_ob_tpcc_ware2.json

image.png

image.png