Seatunnel

This article mainly introduces the installation, usage, and configuration of the Seatunnel engine plugin in Linkis.

If you want to use the Seatunnel engine on your Linkis service, you need to install Seatunnel first. In addition, Seatunnel depends on a Spark or Flink environment, so before using the linkis-seatunnel engine it is strongly recommended to get Seatunnel running locally.

Seatunnel 2.1.2 download address: https://dlcdn.apache.org/incubator/seatunnel/2.1.2/apache-seatunnel-incubating-2.1.2-bin.tar.gz

| Environment variable name | Environment variable content | Required |
| --- | --- | --- |
| JAVA_HOME | JDK installation path | Required |
| SEATUNNEL_HOME | Seatunnel installation path | Required |
| SPARK_HOME | Spark installation path | Required when Seatunnel runs on Spark |
| FLINK_HOME | Flink installation path | Required when Seatunnel runs on Flink |

Table 1-1 Environment configuration list
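For example, a minimal sketch of exporting these variables for the deploy user; the installation paths below are placeholders and must be adapted to your own environment:

```shell
# Hypothetical installation paths -- adjust to your own environment
export JAVA_HOME=/opt/jdk1.8.0_301
export SEATUNNEL_HOME=/opt/apache-seatunnel-incubating-2.1.2
export SPARK_HOME=/opt/spark   # only needed when Seatunnel runs on Spark
export FLINK_HOME=/opt/flink   # only needed when Seatunnel runs on Flink
export PATH=$SEATUNNEL_HOME/bin:$PATH
```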

| Linkis variable name | Variable content | Required |
| --- | --- | --- |
| wds.linkis.engine.seatunnel.plugin.home | Seatunnel installation path | Yes |

Take executing a Spark task as an example:

```shell
cd $SEATUNNEL_HOME
./bin/start-seatunnel-spark.sh --master local[4] --deploy-mode client --config ./config/spark.batch.conf.template
```

The output is as follows:

(Figure 1: output of the Seatunnel Spark example task)
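If you plan to run Seatunnel on Flink instead, you can verify the local Flink setup in the same way. A hedged sketch, assuming the Flink startup script and streaming template config shipped with the Seatunnel 2.1.2 distribution:

```shell
cd $SEATUNNEL_HOME
# start-seatunnel-flink.sh and config/flink.streaming.conf.template are assumed
# to be present in the apache-seatunnel-incubating-2.1.2 package
./bin/start-seatunnel-flink.sh --config ./config/flink.streaming.conf.template
```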

Method 1: Download the engine plugin package directly

Linkis engine plugin download

Method 2: Compile the engine plugin separately (requires a Maven environment)

```shell
# Compile
cd ${linkis_code_dir}/linkis-engineconn-plugins/seatunnel/
mvn clean install
# The compiled engine plugin package is located in the following directory
${linkis_code_dir}/linkis-engineconn-plugins/seatunnel/target/out/
```

EngineConnPlugin engine plugin installation

Upload the engine package obtained above to the engine directory on the server

```
${LINKIS_HOME}/lib/linkis-engineconn-plugins
```
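A rough sketch of the upload step, assuming the plugin was built with method 2 above and that the build output directory contains the seatunnel plugin folder; the host name and paths are placeholders:

```shell
# Hypothetical example: copy the compiled plugin directory from the build machine
# to the Linkis engine plugin directory on the server; adjust host and paths.
scp -r ${linkis_code_dir}/linkis-engineconn-plugins/seatunnel/target/out/* \
    hadoop@linkis-server:${LINKIS_HOME}/lib/linkis-engineconn-plugins/
```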

After uploading, the directory structure looks like this:

```
linkis-engineconn-plugins/
├── seatunnel
│   ├── dist
│   │   └── 2.1.2
│   │       ├── conf
│   │       └── lib
│   └── plugin
│       └── 2.1.2
```

Refresh the engine by restarting the linkis-cg-linkismanager service

```shell
cd ${LINKIS_HOME}/sbin
sh linkis-daemon.sh restart cg-linkismanager
```

You can check whether the last_update_time in the linkis_cg_engine_conn_plugin_bml_resources table of the database is the time at which the refresh was triggered.

```sql
# Log in to the `linkis` database
select * from linkis_cg_engine_conn_plugin_bml_resources;
```
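A hedged sketch of running this check from the command line, assuming a MySQL-backed Linkis metadata database; host, user, and database name are placeholders:

```shell
# Hypothetical connection parameters -- replace host, user and database name
mysql -h 127.0.0.1 -u linkis -p linkis \
  -e "select * from linkis_cg_engine_conn_plugin_bml_resources;"
```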
Submit a task through linkis-cli, for example:

```shell
sh ./bin/linkis-cli --mode once -code 'test' -engineType seatunnel-2.1.2 -codeType sspark -labelMap userCreator=hadoop-seatunnel -labelMap engineConnMode=once -jobContentMap code='env {
  spark.app.name = "SeaTunnel"
  spark.executor.instances = 2
  spark.executor.cores = 1
  spark.executor.memory = "1g"
}
source {
  Fake {
    result_table_name = "my_dataset"
  }
}
transform {}
sink {Console {}}' -jobContentMap master=local[4] -jobContentMap deploy-mode=client -sourceMap jobName=OnceJobTest -submitUser hadoop -proxyUser hadoop
```

OnceEngineConn calls the createEngineConn interface of LinkisManager through LinkisManagerClient and sends the code to the created Seatunnel engine, which then starts executing it. Using the client is also very simple: first create a new Maven project, or introduce the following dependency into your project

```xml
<dependency>
    <groupId>org.apache.linkis</groupId>
    <artifactId>linkis-computation-client</artifactId>
    <version>${linkis.version}</version>
</dependency>
```

Example code

```java
package org.apache.linkis.computation.client;

import org.apache.linkis.common.conf.Configuration;
import org.apache.linkis.computation.client.once.simple.SubmittableSimpleOnceJob;
import org.apache.linkis.computation.client.utils.LabelKeyUtils;

public class SeatunnelOnceJobTest {
    public static void main(String[] args) {
        LinkisJobClient.config().setDefaultServerUrl("http://ip:9001");
        String code =
                "\n"
                        + "env {\n"
                        + "  spark.app.name = \"SeaTunnel\"\n"
                        + "  spark.executor.instances = 2\n"
                        + "  spark.executor.cores = 1\n"
                        + "  spark.executor.memory = \"1g\"\n"
                        + "}\n"
                        + "\n"
                        + "source {\n"
                        + "  Fake {\n"
                        + "    result_table_name = \"my_dataset\"\n"
                        + "  }\n"
                        + "\n"
                        + "}\n"
                        + "\n"
                        + "transform {\n"
                        + "}\n"
                        + "\n"
                        + "sink {\n"
                        + "  Console {}\n"
                        + "}";
        SubmittableSimpleOnceJob onceJob =
                LinkisJobClient.once()
                        .simple()
                        .builder()
                        .setCreateService("seatunnel-Test")
                        .setMaxSubmitTime(300000)
                        .addLabel(LabelKeyUtils.ENGINE_TYPE_LABEL_KEY(), "seatunnel-2.1.2")
                        .addLabel(LabelKeyUtils.USER_CREATOR_LABEL_KEY(), "hadoop-seatunnel")
                        .addLabel(LabelKeyUtils.ENGINE_CONN_MODE_LABEL_KEY(), "once")
                        .addStartupParam(Configuration.IS_TEST_MODE().key(), true)
                        .addExecuteUser("hadoop")
                        .addJobContent("runType", "sspark")
                        .addJobContent("code", code)
                        .addJobContent("master", "local[4]")
                        .addJobContent("deploy-mode", "client")
                        .addSource("jobName", "OnceJobTest")
                        .build();
        onceJob.submit();
        System.out.println(onceJob.getId());
        onceJob.waitForCompleted();
        System.out.println(onceJob.getStatus());
        LinkisJobMetrics jobMetrics = onceJob.getJobMetrics();
        System.out.println(jobMetrics.getMetrics());
    }
}
```
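To try the example, place the class in the Maven project created above, change the server URL in the code to your own Linkis gateway address, and run it. A sketch using the exec-maven-plugin:

```shell
# Compile the project and run the example class via the exec-maven-plugin
mvn -q compile exec:java \
  -Dexec.mainClass="org.apache.linkis.computation.client.SeatunnelOnceJobTest"
```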
Engine configuration description

| Configuration | Default value | Description | Required |
| --- | --- | --- | --- |
| wds.linkis.engine.seatunnel.plugin.home | /opt/linkis/seatunnel | Seatunnel installation path | true |

If the default parameters are not sufficient, basic parameters can be configured in the following ways

Specify the parameter via -runtimeMap when submitting a task through linkis-cli:

```shell
sh ./bin/linkis-cli --mode once -code 'test' \
    -engineType seatunnel-2.1.2 -codeType sspark \
    -labelMap userCreator=hadoop-seatunnel -labelMap engineConnMode=once \
    -jobContentMap code='env {
  spark.app.name = "SeaTunnel"
  spark.executor.instances = 2
  spark.executor.cores = 1
  spark.executor.memory = "1g"
}
source {
  Fake {
    result_table_name = "my_dataset"
  }
}
transform {}
sink {Console {}}' -jobContentMap master=local[4] \
    -jobContentMap deploy-mode=client \
    -sourceMap jobName=OnceJobTest \
    -runtimeMap wds.linkis.engine.seatunnel.plugin.home=/opt/linkis/seatunnel \
    -submitUser hadoop -proxyUser hadoop
```

When submitting a task through the task submission interface, configure it via the params.configuration.runtime parameter

Example of HTTP request parameters:

```json
{
    "executionContent": {"code": 'env {
        spark.app.name = "SeaTunnel"
        spark.executor.instances = 2
        spark.executor.cores = 1
        spark.executor.memory = "1g"
        }
        source {
            Fake {
                result_table_name = "my_dataset"
            }
        }
        transform {}
        sink {Console {}}',
        "runType": "sql"},
    "params": {
        "variable": {},
        "configuration": {
            "runtime": {
                "wds.linkis.engine.seatunnel.plugin.home": "/opt/linkis/seatunnel"
            }
        }
    },
    "labels": {
        "engineType": "seatunnel-2.1.2",
        "userCreator": "hadoop-IDE"
    }
}
```
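A rough sketch of calling this interface with curl, assuming token authentication is enabled on the Linkis gateway; the gateway address, token, and request file are placeholders:

```shell
# Hypothetical gateway address and token -- replace with your own values.
# request.json is assumed to contain a valid JSON version of the request body shown above.
curl -X POST "http://127.0.0.1:9001/api/rest_j/v1/entrance/submit" \
  -H "Content-Type: application/json" \
  -H "Token-Code: YOUR-TOKEN" \
  -H "Token-User: hadoop" \
  -d @request.json
```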