SeaTunnel Engine

This article mainly introduces the installation, usage and configuration of the SeaTunnel engine plugin in Linkis.

If you want to use the SeaTunnel engine on your Linkis service, you need to install SeaTunnel. Moreover, SeaTunnel depends on a Spark or Flink environment. Before using the linkis-seatunnel engine, it is strongly recommended to first run a SeaTunnel job successfully in your local environment.

SeaTunnel 2.1.2 download address: https://dlcdn.apache.org/seatunnel/2.1.2/apache-seatunnel-incubating-2.1.2-bin.tar.gz
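For example, the release can be downloaded and unpacked as follows (a minimal sketch; the /opt/linkis/seatunnel install path matches the default used later in this article, and the unpacked directory name is assumed to match the tarball name):

    wget https://dlcdn.apache.org/seatunnel/2.1.2/apache-seatunnel-incubating-2.1.2-bin.tar.gz
    tar -zxvf apache-seatunnel-incubating-2.1.2-bin.tar.gz
    mv apache-seatunnel-incubating-2.1.2 /opt/linkis/seatunnel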

Environment variable name | Environment variable content | Required
JAVA_HOME                 | JDK installation path        | Required
SEATUNNEL_HOME            | SeaTunnel installation path  | Required
SPARK_HOME                | Spark installation path      | Required when SeaTunnel runs on Spark
FLINK_HOME                | Flink installation path      | Required when SeaTunnel runs on Flink

Table 1-1 Environment configuration list
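These variables are typically exported in the shell profile of the deploy user, for example (a minimal sketch; all paths except SEATUNNEL_HOME are placeholders to adjust for your environment):

    # e.g. appended to /etc/profile or ~/.bashrc of the deploy user
    export JAVA_HOME=/usr/local/java
    export SEATUNNEL_HOME=/opt/linkis/seatunnel
    export SPARK_HOME=/opt/spark    # only when running SeaTunnel on Spark
    export FLINK_HOME=/opt/flink    # only when running SeaTunnel on Flink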

Linkis variable name                    | Variable content            | Required
wds.linkis.engine.seatunnel.plugin.home | SeaTunnel installation path | Yes
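One common place to set this variable is the Linkis configuration file, for example (a sketch; the value is a placeholder matching the install path used in this article):

    echo "wds.linkis.engine.seatunnel.plugin.home=/opt/linkis/seatunnel" >> ${LINKIS_HOME}/conf/linkis.properties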

Take the execution of a Spark task as an example:

    cd $SEATUNNEL_HOME
    ./bin/start-seatunnel-spark.sh --master local[4] --deploy-mode client --config ./config/spark.batch.conf.template

The output is as follows:

[Figure 1: console output of the SeaTunnel Spark example job]

Method 1: Download the engine plug-in package directly

Linkis Engine Plugin Download

Method 2: Compile the engine plug-in separately (requires a Maven environment)

    # compile
    cd ${linkis_code_dir}/linkis-engineconn-plugins/seatunnel/
    mvn clean install
    # The compiled engine plug-in package is located in the following directory
    ${linkis_code_dir}/linkis-engineconn-plugins/seatunnel/target/out/

EngineConnPlugin Engine Plugin Installation

Upload the engine plug-in package obtained above to the engine directory of the server:

    ${LINKIS_HOME}/lib/linkis-engineconn-plugins
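For example (a minimal sketch; it assumes the build output in target/out/ contains a seatunnel directory and that Linkis runs on the same machine, otherwise copy with scp):

    cp -r ${linkis_code_dir}/linkis-engineconn-plugins/seatunnel/target/out/seatunnel ${LINKIS_HOME}/lib/linkis-engineconn-plugins/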

The directory structure after uploading is as follows

    linkis-engineconn-plugins/
    ├── seatunnel
    │   ├── dist
    │   │   └── 2.1.2
    │   │       ├── conf
    │   │       └── lib
    │   └── plugin
    │       └── 2.1.2

Refresh the engine by restarting the linkis-cg-linkismanager service

    cd ${LINKIS_HOME}/sbin
    sh linkis-daemon.sh restart cg-linkismanager

You can check whether the last_update_time of the linkis_cg_engine_conn_plugin_bml_resources table in the database is the time when the refresh was triggered, to confirm that the plugin was refreshed successfully.

    # log in to the `linkis` database
    select * from linkis_cg_engine_conn_plugin_bml_resources;
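To inspect only the SeaTunnel plugin, the query can be narrowed down (a sketch; it assumes the standard Linkis schema, in which this table has engine_conn_type and version columns):

    select engine_conn_type, version, last_update_time
    from linkis_cg_engine_conn_plugin_bml_resources
    where engine_conn_type = 'seatunnel';
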
Submit a task through the shell using linkis-cli:

    sh ./bin/linkis-cli --mode once -code 'test' -engineType seatunnel-2.1.2 -codeType sspark -labelMap userCreator=hadoop-seatunnel -labelMap engineConnMode=once -jobContentMap code='env {
        spark.app.name = "SeaTunnel"
        spark.executor.instances = 2
        spark.executor.cores = 1
        spark.executor.memory = "1g"
    }
    source {
        Fake {
            result_table_name = "my_dataset"
        }
    }
    transform {}
    sink {Console {}}' -jobContentMap master=local[4] -jobContentMap deploy-mode=client -sourceMap jobName=OnceJobTest -submitUser hadoop -proxyUser hadoop

OnceEngineConn calls LinkisManager's createEngineConn interface through LinkisManagerClient and sends the code to the created SeaTunnel engine, which then starts executing. Using the client is also very simple: first create a new Maven project, or introduce the following dependency into your project:

    <dependency>
        <groupId>org.apache.linkis</groupId>
        <artifactId>linkis-computation-client</artifactId>
        <version>${linkis.version}</version>
    </dependency>

Example Code

    package org.apache.linkis.computation.client;

    import org.apache.linkis.common.conf.Configuration;
    import org.apache.linkis.computation.client.once.simple.SubmittableSimpleOnceJob;
    import org.apache.linkis.computation.client.utils.LabelKeyUtils;

    public class SeatunnelOnceJobTest {
        public static void main(String[] args) {
            LinkisJobClient.config().setDefaultServerUrl("http://ip:9001");
            String code =
                "\n"
                    + "env {\n"
                    + "  spark.app.name = \"SeaTunnel\"\n"
                    + "  spark.executor.instances = 2\n"
                    + "  spark.executor.cores = 1\n"
                    + "  spark.executor.memory = \"1g\"\n"
                    + "}\n"
                    + "\n"
                    + "source {\n"
                    + "  Fake {\n"
                    + "    result_table_name = \"my_dataset\"\n"
                    + "  }\n"
                    + "}\n"
                    + "\n"
                    + "transform {\n"
                    + "}\n"
                    + "\n"
                    + "sink {\n"
                    + "  Console {}\n"
                    + "}";
            SubmittableSimpleOnceJob onceJob =
                LinkisJobClient.once()
                    .simple()
                    .builder()
                    .setCreateService("seatunnel-Test")
                    .setMaxSubmitTime(300000)
                    .addLabel(LabelKeyUtils.ENGINE_TYPE_LABEL_KEY(), "seatunnel-2.1.2")
                    .addLabel(LabelKeyUtils.USER_CREATOR_LABEL_KEY(), "hadoop-seatunnel")
                    .addLabel(LabelKeyUtils.ENGINE_CONN_MODE_LABEL_KEY(), "once")
                    .addStartupParam(Configuration.IS_TEST_MODE().key(), true)
                    .addExecuteUser("hadoop")
                    .addJobContent("runType", "sspark")
                    .addJobContent("code", code)
                    .addJobContent("master", "local[4]")
                    .addJobContent("deploy-mode", "client")
                    .addSource("jobName", "OnceJobTest")
                    .build();
            onceJob.submit();
            System.out.println(onceJob.getId());
            onceJob.waitForCompleted();
            System.out.println(onceJob.getStatus());
            LinkisJobMetrics jobMetrics = onceJob.getJobMetrics();
            System.out.println(jobMetrics.getMetrics());
        }
    }

The engine supports the following configuration:

Configuration                           | Default               | Description                 | Required
wds.linkis.engine.seatunnel.plugin.home | /opt/linkis/seatunnel | SeaTunnel installation path | true

If the default parameters do not meet your needs, you can configure the basic parameters in the following ways.

Specify the configuration with -runtimeMap when submitting a task through linkis-cli:

    sh ./bin/linkis-cli --mode once -code 'test' \
        -engineType seatunnel-2.1.2 -codeType sspark \
        -labelMap userCreator=hadoop-seatunnel -labelMap engineConnMode=once \
        -jobContentMap code='env {
        spark.app.name = "SeaTunnel"
        spark.executor.instances = 2
        spark.executor.cores = 1
        spark.executor.memory = "1g"
    }
    source {
        Fake {
            result_table_name = "my_dataset"
        }
    }
    transform {}
    sink {Console {}}' -jobContentMap master=local[4] \
        -jobContentMap deploy-mode=client \
        -sourceMap jobName=OnceJobTest \
        -runtimeMap wds.linkis.engine.seatunnel.plugin.home=/opt/linkis/seatunnel \
        -submitUser hadoop -proxyUser hadoop

Alternatively, when submitting the task through the task interface, configure it through the parameter params.configuration.runtime:

    Example of http request parameters
    {
        "executionContent": {"code": 'env {
            spark.app.name = "SeaTunnel"
            spark.executor.instances = 2
            spark.executor.cores = 1
            spark.executor.memory = "1g"
        }
        source {
            Fake {
                result_table_name = "my_dataset"
            }
        }
        transform {}
        sink {Console {}}',
        "runType": "sspark"},
        "params": {
            "variable": {},
            "configuration": {
                "runtime": {
                    "wds.linkis.engine.seatunnel.plugin.home": "/opt/linkis/seatunnel"
                }
            }
        },
        "labels": {
            "engineType": "seatunnel-2.1.2",
            "userCreator": "hadoop-IDE"
        }
    }