DataX 官网支持绝大部分主流数据源的读写插件,并且有详细的使用文档。
CSV 文件的读写插件
csv 文件就是文本文件,用 txtreader 和 txtwriter 读写。配置文件详细语法请参见 DataX官网说明。
txtreader配置示例:
"reader":{
"name":"txtfilereader",
"parameter":{
"path":["文件全路径"],
"encoding":"UTF-8",
"column":[
{ "index":0, "type":"long" }
,{ "index":1, "type":"long" }
,{ "index":2, "type":"string" }
,{ "index":3, "type":"double" }
,{ "index":4, "type":"string" }
],
"fieldDelimiter":"||",
"fileFormat":"text"
}
}
txtwriter 配置示例:
"writer":{
"name":"txtfilewriter",
"parameter":{
"path":"文件全路径",
"fileName":"文件名",
"writeMode":"truncate",
"dateFormat":"yyyy-MM-dd",
"charset":"UTF-8",
"nullFormat":"",
"fileDelimiter":"||"
}
}
MySQL 数据库的读写插件
针对 MySQL 数据库,用 mysqlreader 和 mysqlwriter 插件读写。
mysqlreader 配置示例:
"reader": {
"name": "mysqlreader",
"parameter": {
"username": "root",
"password": "root",
"column": [
"id",
"name"
],
"splitPk": "db_id",
"connection": [
{
"table": [
"table"
],
"jdbcUrl": [
"jdbc:mysql://127.0.0.1:3306/database"
]
}
]
}
}
mysqlwriter 配置示例:
"writer": {
"name": "mysqlwriter",
"parameter": {
"writeMode": "insert",
"username": "root",
"password": "root",
"column": [
"id",
"name"
],
"session": [
"set session sql_mode='ANSI'"
],
"preSql": [
"delete from test"
],
"connection": [
{
"jdbcUrl": "jdbc:mysql://127.0.0.1:3306/datax?useUnicode=true&characterEncoding=gbk",
"table": [
"test"
]
}
]
}
}
Oracle 数据库的读写插件
针对 Oracle 数据库,用 oraclereader 和 oraclewriter 插件来读写。
oraclereader 配置示例:
"reader": {
"name": "oraclereader",
"parameter": {
// 数据库连接用户名
"username": "root",
// 数据库连接密码
"password": "root",
"column": [
"id","name"
],
//切分主键
"splitPk": "db_id",
"connection": [
{
"table": [
"table"
],
"jdbcUrl": [
"jdbc:oracle:thin:@[HOST_NAME]:PORT:[DATABASE_NAME]"
]
}
]
}
}
oraclewriter 配置示例:
"writer": {
"name": "oraclewriter",
"parameter": {
"username": "root",
"password": "root",
"column": [
"id",
"name"
],
"preSql": [
"delete from test"
],
"connection": [
{
"jdbcUrl": "jdbc:oracle:thin:@[HOST_NAME]:PORT:[DATABASE_NAME]",
"table": [
"test"
]
}
]
}
}
DB2 数据库的读写插件
db2reader 配置示例:
"reader":{
"name":"db2reader",
"parameter":{
"username":"SRC_DB_UESRNAME",
"password":"SRC_DB_PASSWORD",
"column":[
"SRC_COLUMN_LIST"
],
"connection":[
{
"table":[
"SRC_TABLE_NAME"
],
"jdbcUrl":[
"jdbc:db2://SRC_DB_IP:SRC_DB_PORT/SRC_DB_NAME"
]
}
]
}
}
db2writer 配置示例:
OceanBase 数据库的读写插件
OceanBase 数据库使用插件 oceanbasev10reader 和 oceanbasev10writer 来读写。该插件由 OceanBase 产品团队单独提供。
- oceanbasev10reader 配置示例
"reader":{
"name":"oceanbasev10reader",
"parameter":{
"where":"",
"timeout":10000,
"readBatchSize":100000,
"readByPartition":"true",
"column": [
“列名1”,”列名2”
],
"connection":[
{
"jdbcUrl":["||_dsc_ob10_dsc_||集群名:租户名||_dsc_ob10_dsc_||jdbc:oceanbase://连接IP:连接端口/模式名或数据库名"],
"table":["表名"]
}
],
"username":"租户内用户名",
"password":"密码"
}
}
示例:OceanBase 表 ware 导出到 csv 文件
[admin@*** /home/admin/datax3]
$cat job/ob_tpcc_ware_2_csv.json
{
"job":{
"setting":{
"speed":{
"channel":10
},
"errorLimit":{
"record":0, "percentage": 0.02
}
},
"content":[
{
"reader":{
"name":"oceanbasev10reader",
"parameter":{
"where":"",
"timeout":10000,
"readBatchSize":100000,
"readByPartition":"true",
"column": [
"W_ID","W_YTD","W_TAX","W_NAME","W_STREET_1","W_STREET_2","W_CITY","W_STATE","W_ZIP"
],
"connection":[
{
"jdbcUrl":["||_dsc_ob10_dsc_||obdemo:obbmsql||_dsc_ob10_dsc_||jdbc:oceanbase://127.1:2883/tpcc"],
"table":["ware"]
}
],
"username":"tpcc",
"password":"123456"
}
},
"writer":{
"name":"txtfilewriter",
"parameter":{
"path":"/home/admin/csvdata/",
"fileName":"ware",
"writeMode":"truncate",
"dateFormat":"yyyy-MM-dd",
"charset":"UTF-8",
"nullFormat":"",
"fileDelimiter":"||"
}
}
}
]
}
}
[admin@*** /home/admin/datax3]
$bin/datax.py job/ob_tpcc_ware_2_csv.json
- oceanbasev10writer**配置示例**
使用 DataX 向 OceanBase 里写入时,要避免写入速度过快导致 OceanBase 的增量内存耗尽。通常建议 DataX 配置文件里针对写入做一个写入限速设置。关键字是 memstoreThreshold :
"writer": {
"name": "oceanbasev10writer",
"parameter": {
"username": "租户内的用户名",
"password": "密码",
"writeMode": "insert",
"column": [
"列名1",“列名2”
],
"preSql": [
""
],
"connection": [
{
"jdbcUrl": "||_dsc_ob10_dsc_||集群名:租户名||_dsc_ob10_dsc_||jdbc:oceanbase://连接IP:连接端口(默认2883)/模式名或数据库名",
"table": [
"表名"
]
}
],
"batchSize": 1024,
"memstoreThreshold": "90"
}
}
示例:从 csv 文件导入到 OceanBase 表中
[admin@*** /home/admin/datax3]
$cat job/csv_2_ob_tpcc_ware2.json
{
"job":{
"setting":{
"speed":{
"channel":32
},
"errorLimit":{
"record":0, "percentage": 0.02
}
},
"content":[
{
"reader":{
"name":"txtfilereader",
"parameter":{
"path":["/home/admin/csvdata/ware*"],
"encoding":"UTF-8",
"column":[
{ "index":0, "type":"long" }
,{ "index":1, "type":"long" }
,{ "index":2, "type":"long" }
,{ "index":3, "type":"string" }
,{ "index":4, "type":"string" }
,{ "index":5, "type":"string" }
,{ "index":6, "type":"string" }
,{ "index":7, "type":"string" }
,{ "index":8, "type":"string" }
],
"fieldDelimiter":",",
"fileFormat":"text"
}
},
"writer":{
"name":"oceanbasev10writer",
"parameter":{
"writeMode":"insert",
"column":[
"W_ID","W_YTD","W_TAX","W_NAME","W_STREET_1","W_STREET_2","W_CITY","W_STATE","W_ZIP"
],
"connection":[
{
"jdbcUrl":"||_dsc_ob10_dsc_||obdemo:obbmsql||_dsc_ob10_dsc_||jdbc:oceanbase://127.1:2883/tpcc",
"table":["WARE2"]
}
],
"username":"tpcc",
"password":"123456",
"batchSize":256,
" memstoreThreshold":"90"
}
}
}
]
}
}
[admin@*** /home/admin/datax3]
$bin/datax.py job/csv_2_ob_tpcc_ware2.json