Support Spark ETL data synchronization

Using the Spark ETL function, users can synchronize Spark data by configuring json.

Currently supported types:

  1. jdbc, file, redis, kafka, elasticsearch, mongo, datalake (hudi, delta)

General configuration instructions (a minimal skeleton follows this list):

  1. name: data source name
  2. type: contains `source`, `transformation`, `sink`, corresponding to input, transformation, and output respectively
  3. options: configuration parameters
  4. saveMode: save mode, currently supports `overwrite` and `append`
  5. path: file path, can be `file://` or `hdfs://` (default)
  6. `resultTable` needs to correspond to `sourceTable`
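
For illustration, a minimal skeleton of such a configuration might look as follows (all values are placeholders; the sections below give complete, working examples, some of which use a single `plugins` array instead of separate `sources`/`transformations`/`sinks`):

```json
{
  "sources": [
    {
      "name": "file",
      "type": "source",
      "config": { "resultTable": "t1", "path": "hdfs:///tmp/input", "serializer": "csv", "options": {} }
    }
  ],
  "transformations": [
    {
      "name": "sql",
      "type": "transformation",
      "config": { "resultTable": "t2", "sql": "select * from t1" }
    }
  ],
  "sinks": [
    {
      "name": "file",
      "type": "sink",
      "config": { "sourceTable": "t2", "path": "hdfs:///tmp/output", "saveMode": "overwrite", "options": {} }
    }
  ]
}
```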

When using a data source, you need to upload the corresponding Spark connector jar to the spark/jars directory, i.e. `$SPARK_HOME/jars`.

The Spark connector jars can be built with the following commands:

```bash
git clone https://github.com/apache/linkis.git
cd linkis
git checkout master
cd linkis-engineconn-plugins/spark/scala-2.12
mvn clean install -Dmaven.test.skip=true
```

The compiled Spark connector jars are located in the following directory:

```text
linkis/linkis-engineconn-plugins/spark/scala-2.12/target/out/spark/dist/3.2.1/lib
```

Copy the jars you need from there into `$SPARK_HOME/jars`, as sketched below.
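
A minimal sketch of installing the jars (paths follow the build output above; `$SPARK_HOME` must point at your Spark installation):

```bash
# Sketch: copy the compiled connector jars into Spark's jars directory.
# This copies everything; alternatively copy only the connector jars
# required by the data sources you actually use.
cp linkis/linkis-engineconn-plugins/spark/scala-2.12/target/out/spark/dist/3.2.1/lib/*.jar "$SPARK_HOME/jars/"
```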

Pass the specific json configuration through the `-code` parameter; note that the quotation marks inside the json must be escaped (see the redis example below, or keep the json in a file as sketched after it):

```bash
sh /appcom/Install/linkis/bin/linkis-cli -engineType spark-3.2.1 -codeType data_calc -code "" -submitUser hadoop -proxyUser hadoop
```

Example of submitting a redis data synchronization task with linkis-cli:

```bash
sh ./bin/linkis-cli -engineType spark-3.2.1 -codeType data_calc -code "{\"plugins\":[{\"name\":\"file\",\"type\":\"source\",\"config\":{\"resultTable\":\"test\",\"path\":\"hdfs://linkishdfs/tmp/linkis/spark_etl_test/etltest.dolphin\",\"serializer\":\"csv\",\"options\":{\"header\":\"true\",\"delimiter\":\";\"},\"columnNames\":[\"name\",\"age\"]}},{\"name\":\"redis\",\"type\":\"sink\",\"config\":{\"sourceTable\":\"test\",\"host\":\"wds07\",\"port\":\"6679\",\"auth\":\"password\",\"targetTable\":\"spark_etl_test\",\"saveMode\":\"append\"}}]}" -submitUser hadoop -proxyUser hadoop
```
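
To avoid escaping the quotes by hand, the json can also be kept in a file and passed to `-code` via command substitution. A minimal sketch, assuming the configuration has been saved to a file named `etl.json` (hypothetical name):

```bash
# Sketch: read the ETL json from a file so its quotes need no manual escaping.
# etl.json is an assumed file name holding a configuration like the one above.
sh ./bin/linkis-cli -engineType spark-3.2.1 -codeType data_calc \
   -code "$(cat etl.json)" -submitUser hadoop -proxyUser hadoop
```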

jdbc configuration instructions

  1. url: jdbc connection information
  2. user: username
  3. password: password
  4. query: sql query statement

json code

```json
{
    "sources": [
        {
            "name": "jdbc",
            "type": "source",
            "config": {
                "resultTable": "test1",
                "url": "jdbc:mysql://127.0.0.1:3306/dip_linkis?characterEncoding=UTF-8",
                "driver": "com.mysql.jdbc.Driver",
                "user": "root",
                "password": "123456",
                "query": "select * from dip_linkis.linkis_ps_udf_baseinfo",
                "options": {
                }
            }
        }
    ],
    "transformations": [
        {
            "name": "sql",
            "type": "transformation",
            "config": {
                "resultTable": "T1654611700631",
                "sql": "select * from test1"
            }
        }
    ],
    "sinks": [
        {
            "name": "jdbc",
            "type": "sink",
            "config": {
                "sourceTable": "T1654611700631",
                "url": "jdbc:mysql://127.0.0.1:3306/dip_linkis?characterEncoding=UTF-8",
                "driver": "com.mysql.jdbc.Driver",
                "user": "root",
                "password": "123456",
                "targetTable": "linkis_ps_udf_baseinfo2",
                "options": {
                }
            }
        }
    ]
}
```

New jars need to be added; select the corresponding driver jar according to the specific database used:

  1. DmJdbcDriver18.jar
  2. kingbase8-8.6.0.jar
  3. postgresql-42.3.8.jar

file configuration instructions

  1. serializer: file format, can be `csv`, `parquet`, etc.
  2. columnNames: column names

json code

```json
{
    "sources": [
        {
            "name": "file",
            "type": "source",
            "config": {
                "resultTable": "test2",
                "path": "hdfs:///tmp/test_new_no_partition",
                "serializer": "csv",
                "columnNames": ["id", "create_user", "udf_name", "udf_type", "tree_id", "create_time", "update_time", "sys", "cluster_name", "is_expire", "is_shared"]
            }
        }
    ],
    "sinks": [
        {
            "name": "file",
            "config": {
                "sourceTable": "test2",
                "path": "hdfs:///tmp/test_new",
                "partitionBy": ["create_user"],
                "saveMode": "overwrite",
                "serializer": "csv"
            }
        }
    ]
}
```

New jar that needs to be added (required when excel files are used):

  1. spark-excel-2.12.17-3.2.2_2.12-3.2.2_0.18.1.jar

redis configuration instructions

  1. sourceTable: source table
  2. host: ip address
  3. port: port
  4. auth: password
  5. targetTable: target table
  6. saveMode: currently supports `append`

json code

```json
{
    "plugins": [
        {
            "name": "file",
            "type": "source",
            "config": {
                "resultTable": "test",
                "path": "hdfs://linkishdfs/tmp/linkis/spark_etl_test/etltest.dolphin",
                "serializer": "csv",
                "options": {
                    "header": "true",
                    "delimiter": ";"
                },
                "columnNames": ["name", "age"]
            }
        },
        {
            "name": "redis",
            "type": "sink",
            "config": {
                "sourceTable": "test",
                "host": "wds07",
                "port": "6679",
                "auth": "password",
                "targetTable": "spark_etl_test",
                "saveMode": "append"
            }
        }
    ]
}
```

New jars that need to be added:

  1. jedis-3.2.0.jar
  2. commons-pool2-2.8.1.jar
  3. spark-redis_2.12-2.6.0.jar

kafka configuration instructions

  1. servers: kafka connection information
  2. mode: currently supports `batch` and `stream`
  3. topic: kafka topic name

json code for writing data

```json
{
    "sources": [
        {
            "name": "file",
            "type": "source",
            "config": {
                "resultTable": "T1654611700631",
                "path": "file://{filePath}/etltest.dolphin",
                "serializer": "csv",
                "options": {
                    "header": "true",
                    "delimiter": ";"
                },
                "columnNames": ["name", "age"]
            }
        }
    ],
    "sinks": [
        {
            "name": "kafka",
            "config": {
                "sourceTable": "T1654611700631",
                "servers": "localhost:9092",
                "mode": "batch",
                "topic": "test121212"
            }
        }
    ]
}
```

json code for reading data

```json
{
    "sources": [
        {
            "name": "kafka",
            "type": "source",
            "config": {
                "resultTable": "T1654611700631",
                "servers": "localhost:9092",
                "topic": "test121212"
            }
        }
    ],
    "sinks": [
        {
            "name": "kafka",
            "config": {
                "sourceTable": "T1654611700631",
                "servers": "localhost:9092",
                "mode": "stream",
                "topic": "test55555"
            }
        }
    ]
}
```

New jars that need to be added:

  1. kafka-clients-2.8.0.jar
  2. spark-sql-kafka-0-10_2.12-3.2.1.jar
  3. spark-token-provider-kafka-0-10_2.12-3.2.1.jar

elasticsearch configuration instructions

  1. node: elasticsearch ip
  2. port: elasticsearch port
  3. index: elasticsearch index name

json code for writing data

```json
{
    "sources": [
        {
            "name": "file",
            "type": "source",
            "config": {
                "resultTable": "T1654611700631",
                "path": "file://{filePath}/etltest.dolphin",
                "serializer": "csv",
                "options": {
                    "header": "true",
                    "delimiter": ";"
                },
                "columnNames": ["name", "age"]
            }
        }
    ],
    "sinks": [
        {
            "name": "elasticsearch",
            "config": {
                "sourceTable": "T1654611700631",
                "node": "localhost",
                "port": "9200",
                "index": "estest",
                "saveMode": "overwrite"
            }
        }
    ]
}
```

json code for reading data

```json
{
    "sources": [
        {
            "name": "elasticsearch",
            "type": "source",
            "config": {
                "resultTable": "T1654611700631",
                "node": "localhost",
                "port": "9200",
                "index": "estest"
            }
        }
    ],
    "sinks": [
        {
            "name": "file",
            "config": {
                "sourceTable": "T1654611700631",
                "path": "file://{filePath}/csv",
                "saveMode": "overwrite",
                "serializer": "csv"
            }
        }
    ]
}
```

New jar that needs to be added:

  1. elasticsearch-spark-30_2.12-7.17.7.jar

mongo configuration instructions

  1. uri: mongo connection information
  2. database: mongo database
  3. collection: mongo collection

json code for writing data

```json
{
    "sources": [
        {
            "name": "file",
            "type": "source",
            "config": {
                "resultTable": "T1654611700631",
                "path": "file://{filePath}/etltest.dolphin",
                "serializer": "csv",
                "options": {
                    "header": "true",
                    "delimiter": ";"
                },
                "columnNames": ["name", "age"]
            }
        }
    ],
    "sinks": [
        {
            "name": "mongo",
            "config": {
                "sourceTable": "T1654611700631",
                "uri": "mongodb://localhost:27017/test",
                "database": "test",
                "collection": "test",
                "saveMode": "overwrite"
            }
        }
    ]
}
```

json code for reading data

```json
{
    "sources": [
        {
            "name": "mongo",
            "type": "source",
            "config": {
                "resultTable": "T1654611700631",
                "uri": "mongodb://localhost:27017/test",
                "database": "test",
                "collection": "test"
            }
        }
    ],
    "sinks": [
        {
            "name": "file",
            "config": {
                "sourceTable": "T1654611700631",
                "path": "file://{filePath}/json",
                "saveMode": "overwrite",
                "serializer": "json"
            }
        }
    ]
}
```

New jars that need to be added:

  1. bson-3.12.8.jar
  2. mongo-spark-connector_2.12-3.0.1.jar
  3. mongodb-driver-core-3.12.8.jar
  4. mongodb-driver-sync-3.12.8.jar

delta configuration instructions

  1. tableFormat: currently supports `hudi` and `delta`

json code for writing data

```json
{
    "sources": [
        {
            "name": "file",
            "type": "source",
            "config": {
                "resultTable": "T1654611700631",
                "path": "file://{filePath}/etltest.dolphin",
                "serializer": "csv",
                "options": {
                    "header": "true",
                    "delimiter": ";"
                },
                "columnNames": ["name", "age"]
            }
        }
    ],
    "sinks": [
        {
            "name": "datalake",
            "config": {
                "sourceTable": "T1654611700631",
                "tableFormat": "delta",
                "path": "file://{filePath}/delta",
                "saveMode": "overwrite"
            }
        }
    ]
}
```

json code for reading data

```json
{
    "sources": [
        {
            "name": "datalake",
            "type": "source",
            "config": {
                "resultTable": "T1654611700631",
                "tableFormat": "delta",
                "path": "file://{filePath}/delta"
            }
        }
    ],
    "sinks": [
        {
            "name": "file",
            "config": {
                "sourceTable": "T1654611700631",
                "path": "file://{filePath}/csv",
                "saveMode": "overwrite",
                "options": {
                    "header": "true"
                },
                "serializer": "csv"
            }
        }
    ]
}
```

New jars that need to be added:

  1. delta-core_2.12-2.0.2.jar
  2. delta-storage-2.0.2.jar

hudi configuration instructions

  1. tableFormat: currently supports `hudi` and `delta`

json code for writing data

```json
{
    "sources": [
        {
            "name": "file",
            "type": "source",
            "config": {
                "resultTable": "T1654611700631",
                "path": "file://{filePath}/etltest.dolphin",
                "serializer": "csv",
                "options": {
                    "header": "true",
                    "delimiter": ";"
                },
                "columnNames": ["name", "age"]
            }
        }
    ],
    "transformations": [
        {
            "name": "sql",
            "type": "transformation",
            "config": {
                "resultTable": "T111",
                "sql": "select * from T1654611700631"
            }
        }
    ],
    "sinks": [
        {
            "name": "datalake",
            "config": {
                "sourceTable": "T1654611700631",
                "tableFormat": "hudi",
                "options": {
                    "hoodie.table.name": "huditest",
                    "hoodie.datasource.write.recordkey.field": "age",
                    "hoodie.datasource.write.precombine.field": "age"
                },
                "path": "file://{filePath}/hudi",
                "saveMode": "append"
            }
        }
    ]
}
```

json code for reading data

```json
{
    "sources": [
        {
            "name": "datalake",
            "type": "source",
            "config": {
                "resultTable": "T1654611700631",
                "tableFormat": "hudi",
                "path": "file://{filePath}/hudi"
            }
        }
    ],
    "transformations": [
        {
            "name": "sql",
            "type": "transformation",
            "config": {
                "resultTable": "T111",
                "sql": "select * from T1654611700631"
            }
        }
    ],
    "sinks": [
        {
            "name": "file",
            "config": {
                "sourceTable": "T1654611700631",
                "path": "file://{filePath}/csv",
                "saveMode": "overwrite",
                "options": {
                    "header": "true"
                },
                "serializer": "csv"
            }
        }
    ]
}
```

New jar that needs to be added:

  1. hudi-spark3.2-bundle_2.12-0.13.0.jar