Installing Spark Lineage

This article mainly introduces the lineage collection scheme for the `Spark` engine in `Linkis`.

The Spline Agent for Apache Spark is a complementary module to the Spline project that captures runtime lineage information from Apache Spark jobs.

GitHub address:

    https://github.com/AbsaOSS/spline-spark-agent

Download the agent bundle into Spark's jars directory:

    cd $SPARK_HOME/jars
    wget https://repo1.maven.org/maven2/za/co/absa/spline/agent/spark/spark-3.2-spline-agent-bundle_2.12/2.0.0/spark-3.2-spline-agent-bundle_2.12-2.0.0.jar

When the download completes, spark-3.2-spline-agent-bundle_2.12-2.0.0.jar will appear under $SPARK_HOME/jars.
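You can quickly confirm the bundle is in place:

    ls $SPARK_HOME/jars | grep spline-agent-bundle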

Edit the Spark configuration file:

    vim $SPARK_HOME/conf/spark-defaults.conf

Add the following configuration, which registers the Spline listener and writes lineage to the engine log through the `log` dispatcher:

    spark.sql.queryExecutionListeners=za.co.absa.spline.harvester.listener.SplineQueryExecutionListener
    spark.spline.lineageDispatcher=log
    spark.spline.lineageDispatcher.log.level=INFO
    spark.spline.lineageDispatcher.log.className=za.co.absa.spline.harvester.dispatcher.LoggingLineageDispatcher
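If you only want to test on a plain Spark installation without touching the global configuration, the same properties can be passed per job with `--conf` flags on `spark-submit` (a minimal sketch; `your_job.py` is a hypothetical placeholder for any Spark SQL job):

    # Same Spline settings as in spark-defaults.conf, but scoped to one job
    spark-submit \
      --conf spark.sql.queryExecutionListeners=za.co.absa.spline.harvester.listener.SplineQueryExecutionListener \
      --conf spark.spline.lineageDispatcher=log \
      --conf spark.spline.lineageDispatcher.log.level=INFO \
      --conf spark.spline.lineageDispatcher.log.className=za.co.absa.spline.harvester.dispatcher.LoggingLineageDispatcher \
      your_job.py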
Create an input file and upload it to HDFS:

    vim read.json
    # read.json content:
    {"name":"linkis","age":"5"}
    hadoop fs -put read.json /tmp
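Verify the upload:

    hadoop fs -cat /tmp/read.json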
Create the output directory:

    hadoop fs -mkdir /tmp/jsonWrite
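Optionally confirm the directory exists:

    hadoop fs -test -d /tmp/jsonWrite && echo "output dir ready"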
Submit a test SQL job through linkis-cli:

    sh ./bin/linkis-cli -engineType spark-3.2.1 -codeType sql -code \
    "CREATE TEMPORARY VIEW jsonReadTable
    USING org.apache.spark.sql.json
    OPTIONS (
      path '/tmp/read.json'
    );
    INSERT OVERWRITE DIRECTORY '/tmp/jsonWrite' SELECT * FROM jsonReadTable;" \
    -submitUser hadoop -proxyUser hadoop
View the lineage in the Spark engine log:

    cat /appcom/tmp/hadoop/20230829/spark/117ca887-f9d6-4923-8ca1-cef7155ee0e7/logs/stdout
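The date and task UUID in the path differ per run; if you are not sure which engine directory holds the lineage, one option is to search the engine logs for a field that every Spline event contains (assuming the same log root as above):

    grep -l '"outputSource"' /appcom/tmp/hadoop/*/spark/*/logs/stdout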

The output is as follows:

[screenshot: spark-lineage-log]

Details are as follows:

    {
      "id": "a5b273b3-a87f-5a30-8ced-c8eeff2d1458",
      "name": "Linkis-EngineConn-Spark_LINKISCLI",
      "operations": {
        "write": {
          "outputSource": "/tmp/jsonWrite",
          "append": false,
          "id": "op-0",
          "name": "InsertIntoHiveDirCommand",
          "childIds": ["op-1"],
          "extra": {"destinationType": "hive"}
        },
        "reads": [
          {
            "inputSources": ["hdfs://linkishdfs/tmp/read.json"],
            "id": "op-4",
            "name": "LogicalRelation",
            "output": ["attr-0", "attr-1"],
            "params": {"path": "/tmp/read.json"},
            "extra": {"sourceType": "json"}
          }
        ],
        "other": [
          {
            "id": "op-3",
            "name": "View",
            "childIds": ["op-4"],
            "output": ["attr-0", "attr-1"],
            "params": {
              "desc": "CatalogTable(\nTable: jsonReadTable\nCreated Time: Tue Aug 29 11:52:10 CST 2023\nLast Access: UNKNOWN\nCreated By: Spark \nType: VIEW\nTable Properties: []\nSchema: root\n |-- age: string (nullable = true)\n |-- name: string (nullable = true)\n)",
              "isTempView": true
            }
          },
          {
            "id": "op-2",
            "name": "SubqueryAlias",
            "childIds": ["op-3"],
            "output": ["attr-0", "attr-1"],
            "params": {"identifier": "jsonreadtable"}
          },
          {
            "id": "op-1",
            "name": "Project",
            "childIds": ["op-2"],
            "output": ["attr-0", "attr-1"],
            "params": {
              "projectList": [
                {"__attrId": "attr-0"},
                {"__attrId": "attr-1"}
              ]
            }
          }
        ]
      },
      "attributes": [
        {"id": "attr-0", "dataType": "e63adadc-648a-56a0-9424-3289858cf0bb", "name": "age"},
        {"id": "attr-1", "dataType": "e63adadc-648a-56a0-9424-3289858cf0bb", "name": "name"}
      ],
      "expressions": {},
      "systemInfo": {"name": "spark", "version": "3.2.1"},
      "agentInfo": {"name": "spline", "version": "2.0.0"},
      "extraInfo": {
        "appName": "Linkis-EngineConn-Spark_LINKISCLI",
        "dataTypes": [
          {
            "id": "e63adadc-648a-56a0-9424-3289858cf0bb",
            "name": "string",
            "nullable": true,
            "_typeHint": "dt.Simple"
          }
        ]
      }
    }
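Reading the operation graph bottom-up: `op-4` (`LogicalRelation`) reads `/tmp/read.json`, `op-3` and `op-2` wrap the temporary view, `op-1` (`Project`) selects the columns, and `op-0` (`InsertIntoHiveDirCommand`) writes to `/tmp/jsonWrite`. The core facts can be pulled out with a quick filter; a minimal sketch using `jq`, assuming the event has been saved to a local file named `lineage.json` (hypothetical name):

    # Summarize a Spline lineage event: read sources, write target, column names
    jq '{inputs: [.operations.reads[].inputSources[]],
         output: .operations.write.outputSource,
         columns: [.attributes[].name]}' lineage.json

For this run it should print `hdfs://linkishdfs/tmp/read.json` as the only input, `/tmp/jsonWrite` as the output, and the columns `age` and `name`.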
To send lineage events to Kafka instead, edit the Spark configuration file:

    vim $SPARK_HOME/conf/spark-defaults.conf

Add the following configuration, switching the dispatcher to `kafka`:

    spark.sql.queryExecutionListeners=za.co.absa.spline.harvester.listener.SplineQueryExecutionListener
    spark.spline.lineageDispatcher=kafka
    spark.spline.lineageDispatcher.kafka.topic=linkis_spark_lineage_test
    spark.spline.lineageDispatcher.kafka.producer.bootstrap.servers=localhost:9092
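If automatic topic creation is disabled on your broker, create the topic before submitting the job (a sketch; tune partitions and replication for your cluster):

    kafka/bin/kafka-topics.sh --create --topic linkis_spark_lineage_test \
      --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1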
Submit the same test job again:

    sh ./bin/linkis-cli -engineType spark-3.2.1 -codeType sql -code \
    "CREATE TEMPORARY VIEW jsonReadTable
    USING org.apache.spark.sql.json
    OPTIONS (
      path '/tmp/read.json'
    );
    INSERT OVERWRITE DIRECTORY '/tmp/jsonWrite' SELECT * FROM jsonReadTable;" \
    -submitUser hadoop -proxyUser hadoop
Consume the lineage events from the Kafka topic:

    kafka/bin/kafka-console-consumer.sh --topic linkis_spark_lineage_test --from-beginning --bootstrap-server localhost:9092

The output is as follows:

[screenshot: spark-lineage-kafka]

Details are as follows:

    {
      "id": "3a0e2b8e-11dc-5bd1-9bbc-cfba2fa469e9",
      "name": "Linkis-EngineConn-Spark_LINKISCLI",
      "operations": {
        "write": {
          "outputSource": "/tmp/jsonWrite",
          "append": false,
          "id": "op-0",
          "name": "InsertIntoHiveDirCommand",
          "childIds": ["op-1"],
          "extra": {"destinationType": "hive"}
        },
        "reads": [
          {
            "inputSources": ["hdfs://linkishdfs/tmp/read.json"],
            "id": "op-4",
            "name": "LogicalRelation",
            "output": ["attr-0", "attr-1"],
            "params": {"path": "/tmp/read.json"},
            "extra": {"sourceType": "json"}
          }
        ],
        "other": [
          {
            "id": "op-3",
            "name": "View",
            "childIds": ["op-4"],
            "output": ["attr-0", "attr-1"],
            "params": {
              "desc": "CatalogTable(\nTable: jsonReadTable\nCreated Time: Tue Aug 29 14:48:06 CST 2023\nLast Access: UNKNOWN\nCreated By: Spark \nType: VIEW\nTable Properties: []\nSchema: root\n |-- age: string (nullable = true)\n |-- name: string (nullable = true)\n)",
              "isTempView": true
            }
          },
          {
            "id": "op-2",
            "name": "SubqueryAlias",
            "childIds": ["op-3"],
            "output": ["attr-0", "attr-1"],
            "params": {"identifier": "jsonreadtable"}
          },
          {
            "id": "op-1",
            "name": "Project",
            "childIds": ["op-2"],
            "output": ["attr-0", "attr-1"],
            "params": {
              "projectList": [
                {"__attrId": "attr-0"},
                {"__attrId": "attr-1"}
              ]
            }
          }
        ]
      },
      "attributes": [
        {"id": "attr-0", "dataType": "e63adadc-648a-56a0-9424-3289858cf0bb", "name": "age"},
        {"id": "attr-1", "dataType": "e63adadc-648a-56a0-9424-3289858cf0bb", "name": "name"}
      ],
      "expressions": {},
      "systemInfo": {"name": "spark", "version": "3.2.1"},
      "agentInfo": {"name": "spline", "version": "2.0.0"},
      "extraInfo": {
        "appName": "Linkis-EngineConn-Spark_LINKISCLI",
        "dataTypes": [
          {
            "id": "e63adadc-648a-56a0-9424-3289858cf0bb",
            "name": "string",
            "nullable": true,
            "_typeHint": "dt.Simple"
          }
        ]
      }
    }
`spline-spark-agent` also supports more collection modes, such as HTTP and Console. For details, see the official documentation:

    https://github.com/AbsaOSS/spline-spark-agent/#configuration
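For example, switching to the HTTP dispatcher so that events are posted to a Spline Producer API looks roughly like this (a sketch based on the Spline agent README; verify the property names against the documentation above, and replace the URL with your own Spline server):

    spark.sql.queryExecutionListeners=za.co.absa.spline.harvester.listener.SplineQueryExecutionListener
    spark.spline.lineageDispatcher=http
    spark.spline.lineageDispatcher.http.producer.url=http://localhost:8080/producer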