Support for Spark ETL data synchronization

With the Spark ETL feature, users can perform Spark data synchronization by providing a JSON configuration.

Currently supported data source types:

  1. jdbc, file, redis, kafka, elasticsearch, mongo, datalake (hudi, delta)

General configuration description:

  1. name: data source name
  2. type: one of `source`, `transformation`, `sink`, corresponding to input, transformation, and output respectively
  3. options: configuration parameters
  4. saveMode: save mode; currently supports `overwrite` and `append`
  5. path: file path; can be `file://` or `hdfs://` (default)
  6. the `resultTable` of a source or transformation must match the `sourceTable` referenced by the downstream plugin that reads it

To use a data source, the corresponding spark connector jar must first be uploaded to the spark/jars directory, i.e. $SPARK_HOME/jars.

The spark connector jars can be built with the following commands:

```bash
git clone https://github.com/apache/linkis.git
cd linkis
git checkout master
cd linkis-engineconn-plugins/spark/scala-2.12
mvn clean install -Dmaven.test.skip=true
```

The compiled spark connector jars are located in the following directory:

```
linkis/linkis-engineconn-plugins/spark/scala-2.12/target/out/spark/dist/3.2.1/lib
```
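
The jars your data sources need can then be copied from that build output into $SPARK_HOME/jars. A minimal sketch, assuming `$SPARK_HOME` points at your Spark installation and that the required jars (here, the redis connector jars listed later in this document) are present in the build output directory:

```bash
# Illustrative only: adjust the jar names to the data sources you actually use.
OUT=linkis/linkis-engineconn-plugins/spark/scala-2.12/target/out/spark/dist/3.2.1/lib
cp "$OUT/spark-redis_2.12-2.6.0.jar" \
   "$OUT/jedis-3.2.0.jar" \
   "$OUT/commons-pool2-2.8.1.jar" \
   "$SPARK_HOME/jars/"
```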

Pass the concrete JSON as the `code` parameter. Note the quote escaping: double quotes inside the JSON must be escaped when it is written inline on the command line.

```bash
sh /appcom/Install/linkis/bin/linkis-cli -engineType spark-3.2.1 -codeType data_calc -code "" -submitUser hadoop -proxyUser hadoop
```
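
If the configuration is kept in a file, a minimal sketch that avoids escaping every inner double quote by hand is to pass the file content through command substitution. The file path and plugin content below are illustrative, mirroring the redis example that follows:

```bash
# Hypothetical configuration file; any of the JSON configurations in this
# document can be stored this way.
cat > /tmp/etl_demo.json <<'EOF'
{"plugins":[{"name":"file","type":"source","config":{"resultTable":"test","path":"hdfs:///tmp/etltest.dolphin","serializer":"csv","columnNames":["name","age"]}},{"name":"redis","type":"sink","config":{"sourceTable":"test","host":"wds07","port":"6679","auth":"password","targetTable":"spark_etl_test","saveMode":"append"}}]}
EOF

# Command substitution passes the file content as a single argument, so the
# double quotes inside the JSON do not need to be escaped manually.
sh /appcom/Install/linkis/bin/linkis-cli -engineType spark-3.2.1 -codeType data_calc \
  -code "$(cat /tmp/etl_demo.json)" \
  -submitUser hadoop -proxyUser hadoop
```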

Example of submitting a redis data synchronization task with linkis-cli:

```bash
sh ./bin/linkis-cli -engineType spark-3.2.1 -codeType data_calc -code "{\"plugins\":[{\"name\":\"file\",\"type\":\"source\",\"config\":{\"resultTable\":\"test\",\"path\":\"hdfs://linkishdfs/tmp/linkis/spark_etl_test/etltest.dolphin\",\"serializer\":\"csv\",\"options\":{\"header\":\"true\",\"delimiter\":\";\"},\"columnNames\":[\"name\",\"age\"]}},{\"name\":\"redis\",\"type\":\"sink\",\"config\":{\"sourceTable\":\"test\",\"host\":\"wds07\",\"port\":\"6679\",\"auth\":\"password\",\"targetTable\":\"spark_etl_test\",\"saveMode\":\"append\"}}]}" -submitUser hadoop -proxyUser hadoop
```

jdbc configuration description

  1. url: jdbc connection string
  2. user: user name
  3. password: password
  4. query: SQL query statement

JSON code

```json
{
    "sources": [
        {
            "name": "jdbc",
            "type": "source",
            "config": {
                "resultTable": "test1",
                "url": "jdbc:mysql://127.0.0.1:3306/dip_linkis?characterEncoding=UTF-8",
                "driver": "com.mysql.jdbc.Driver",
                "user": "root",
                "password": "123456",
                "query": "select * from dip_linkis.linkis_ps_udf_baseinfo",
                "options": {
                }
            }
        }
    ],
    "transformations": [
        {
            "name": "sql",
            "type": "transformation",
            "config": {
                "resultTable": "T1654611700631",
                "sql": "select * from test1"
            }
        }
    ],
    "sinks": [
        {
            "name": "jdbc",
            "type": "sink",
            "config": {
                "sourceTable": "T1654611700631",
                "url": "jdbc:mysql://127.0.0.1:3306/dip_linkis?characterEncoding=UTF-8",
                "driver": "com.mysql.jdbc.Driver",
                "user": "root",
                "password": "123456",
                "targetTable": "linkis_ps_udf_baseinfo2",
                "options": {
                }
            }
        }
    ]
}
```

Jars that need to be added (choose the jar matching the data source you actually use):

  1. DmJdbcDriver18.jar
  2. kingbase8-8.6.0.jar
  3. postgresql-42.3.8.jar

file configuration description

  1. serializer: file format, such as `csv` or `parquet`
  2. columnNames: column names

JSON code

```json
{
    "sources": [
        {
            "name": "file",
            "type": "source",
            "config": {
                "resultTable": "test2",
                "path": "hdfs:///tmp/test_new_no_partition",
                "serializer": "csv",
                "columnNames": ["id", "create_user", "udf_name", "udf_type", "tree_id", "create_time", "update_time", "sys", "cluster_name", "is_expire", "is_shared"]
            }
        }
    ],
    "sinks": [
        {
            "name": "file",
            "config": {
                "sourceTable": "test2",
                "path": "hdfs:///tmp/test_new",
                "partitionBy": ["create_user"],
                "saveMode": "overwrite",
                "serializer": "csv"
            }
        }
    ]
}
```

Jars that need to be added:

  1. spark-excel-2.12.17-3.2.2_2.12-3.2.2_0.18.1.jar

redis configuration description

  1. sourceTable: source table
  2. host: redis ip address
  3. port: port
  4. auth: password
  5. targetTable: target table
  6. saveMode: currently supports `append`

JSON code

```json
{
    "plugins": [
        {
            "name": "file",
            "type": "source",
            "config": {
                "resultTable": "test",
                "path": "hdfs://linkishdfs/tmp/linkis/spark_etl_test/etltest.dolphin",
                "serializer": "csv",
                "options": {
                    "header": "true",
                    "delimiter": ";"
                },
                "columnNames": ["name", "age"]
            }
        },
        {
            "name": "redis",
            "type": "sink",
            "config": {
                "sourceTable": "test",
                "host": "wds07",
                "port": "6679",
                "auth": "password",
                "targetTable": "spark_etl_test",
                "saveMode": "append"
            }
        }
    ]
}
```

Jars that need to be added:

  1. jedis-3.2.0.jar
  2. commons-pool2-2.8.1.jar
  3. spark-redis_2.12-2.6.0.jar

kafka configuration description

  1. servers: kafka connection info
  2. mode: currently supports `batch` and `stream`
  3. topic: kafka topic name

JSON code for writing data

```json
{
    "sources": [
        {
            "name": "file",
            "type": "source",
            "config": {
                "resultTable": "T1654611700631",
                "path": "file://{filePath}/etltest.dolphin",
                "serializer": "csv",
                "options": {
                    "header": "true",
                    "delimiter": ";"
                },
                "columnNames": ["name", "age"]
            }
        }
    ],
    "sinks": [
        {
            "name": "kafka",
            "config": {
                "sourceTable": "T1654611700631",
                "servers": "localhost:9092",
                "mode": "batch",
                "topic": "test121212"
            }
        }
    ]
}
```

JSON code for reading data

```json
{
    "sources": [
        {
            "name": "kafka",
            "type": "source",
            "config": {
                "resultTable": "T1654611700631",
                "servers": "localhost:9092",
                "topic": "test121212"
            }
        }
    ],
    "sinks": [
        {
            "name": "kafka",
            "config": {
                "sourceTable": "T1654611700631",
                "servers": "localhost:9092",
                "mode": "stream",
                "topic": "test55555"
            }
        }
    ]
}
```

Jars that need to be added:

  1. kafka-clients-2.8.0.jar
  2. spark-sql-kafka-0-10_2.12-3.2.1.jar
  3. spark-token-provider-kafka-0-10_2.12-3.2.1.jar

elasticsearch configuration description

  1. node: elasticsearch ip
  2. port: elasticsearch port
  3. index: elasticsearch index name

JSON code for writing data

```json
{
    "sources": [
        {
            "name": "file",
            "type": "source",
            "config": {
                "resultTable": "T1654611700631",
                "path": "file://{filePath}/etltest.dolphin",
                "serializer": "csv",
                "options": {
                    "header": "true",
                    "delimiter": ";"
                },
                "columnNames": ["name", "age"]
            }
        }
    ],
    "sinks": [
        {
            "name": "elasticsearch",
            "config": {
                "sourceTable": "T1654611700631",
                "node": "localhost",
                "port": "9200",
                "index": "estest",
                "saveMode": "overwrite"
            }
        }
    ]
}
```

JSON code for reading data

```json
{
    "sources": [
        {
            "name": "elasticsearch",
            "type": "source",
            "config": {
                "resultTable": "T1654611700631",
                "node": "localhost",
                "port": "9200",
                "index": "estest"
            }
        }
    ],
    "sinks": [
        {
            "name": "file",
            "config": {
                "sourceTable": "T1654611700631",
                "path": "file://{filePath}/csv",
                "saveMode": "overwrite",
                "serializer": "csv"
            }
        }
    ]
}
```

Jars that need to be added:

  1. elasticsearch-spark-30_2.12-7.17.7.jar

mongo configuration description

  1. uri: mongo connection string
  2. database: mongo database
  3. collection: mongo collection

JSON code for writing data

```json
{
    "sources": [
        {
            "name": "file",
            "type": "source",
            "config": {
                "resultTable": "T1654611700631",
                "path": "file://{filePath}/etltest.dolphin",
                "serializer": "csv",
                "options": {
                    "header": "true",
                    "delimiter": ";"
                },
                "columnNames": ["name", "age"]
            }
        }
    ],
    "sinks": [
        {
            "name": "mongo",
            "config": {
                "sourceTable": "T1654611700631",
                "uri": "mongodb://localhost:27017/test",
                "database": "test",
                "collection": "test",
                "saveMode": "overwrite"
            }
        }
    ]
}
```

JSON code for reading data

```json
{
    "sources": [
        {
            "name": "mongo",
            "type": "source",
            "config": {
                "resultTable": "T1654611700631",
                "uri": "mongodb://localhost:27017/test",
                "database": "test",
                "collection": "test"
            }
        }
    ],
    "sinks": [
        {
            "name": "file",
            "config": {
                "sourceTable": "T1654611700631",
                "path": "file://{filePath}/json",
                "saveMode": "overwrite",
                "serializer": "json"
            }
        }
    ]
}
```

Jars that need to be added:

  1. bson-3.12.8.jar
  2. mongo-spark-connector_2.12-3.0.1.jar
  3. mongodb-driver-core-3.12.8.jar
  4. mongodb-driver-sync-3.12.8.jar

datalake (delta) configuration description

  1. tableFormat: currently supports `hudi` and `delta`

JSON code for writing data

```json
{
    "sources": [
        {
            "name": "file",
            "type": "source",
            "config": {
                "resultTable": "T1654611700631",
                "path": "file://{filePath}/etltest.dolphin",
                "serializer": "csv",
                "options": {
                    "header": "true",
                    "delimiter": ";"
                },
                "columnNames": ["name", "age"]
            }
        }
    ],
    "sinks": [
        {
            "name": "datalake",
            "config": {
                "sourceTable": "T1654611700631",
                "tableFormat": "delta",
                "path": "file://{filePath}/delta",
                "saveMode": "overwrite"
            }
        }
    ]
}
```

JSON code for reading data

```json
{
    "sources": [
        {
            "name": "datalake",
            "type": "source",
            "config": {
                "resultTable": "T1654611700631",
                "tableFormat": "delta",
                "path": "file://{filePath}/delta"
            }
        }
    ],
    "sinks": [
        {
            "name": "file",
            "config": {
                "sourceTable": "T1654611700631",
                "path": "file://{filePath}/csv",
                "saveMode": "overwrite",
                "options": {
                    "header": "true"
                },
                "serializer": "csv"
            }
        }
    ]
}
```

Jars that need to be added:

  1. delta-core_2.12-2.0.2.jar
  2. delta-storage-2.0.2.jar

datalake (hudi) configuration description

  1. tableFormat: currently supports `hudi` and `delta`

JSON code for writing data

```json
{
    "sources": [
        {
            "name": "file",
            "type": "source",
            "config": {
                "resultTable": "T1654611700631",
                "path": "file://{filePath}/etltest.dolphin",
                "serializer": "csv",
                "options": {
                    "header": "true",
                    "delimiter": ";"
                },
                "columnNames": ["name", "age"]
            }
        }
    ],
    "transformations": [
        {
            "name": "sql",
            "type": "transformation",
            "config": {
                "resultTable": "T111",
                "sql": "select * from T1654611700631"
            }
        }
    ],
    "sinks": [
        {
            "name": "datalake",
            "config": {
                "sourceTable": "T1654611700631",
                "tableFormat": "hudi",
                "options": {
                    "hoodie.table.name": "huditest",
                    "hoodie.datasource.write.recordkey.field": "age",
                    "hoodie.datasource.write.precombine.field": "age"
                },
                "path": "file://{filePath}/hudi",
                "saveMode": "append"
            }
        }
    ]
}
```

JSON code for reading data

```json
{
    "sources": [
        {
            "name": "datalake",
            "type": "source",
            "config": {
                "resultTable": "T1654611700631",
                "tableFormat": "hudi",
                "path": "file://{filePath}/hudi"
            }
        }
    ],
    "transformations": [
        {
            "name": "sql",
            "type": "transformation",
            "config": {
                "resultTable": "T111",
                "sql": "select * from T1654611700631"
            }
        }
    ],
    "sinks": [
        {
            "name": "file",
            "config": {
                "sourceTable": "T1654611700631",
                "path": "file://{filePath}/csv",
                "saveMode": "overwrite",
                "options": {
                    "header": "true"
                },
                "serializer": "csv"
            }
        }
    ]
}
```

Jars that need to be added:

  1. hudi-spark3.2-bundle_2.12-0.13.0.jar