Spark

本文主要介绍在 Linkis 中, Spark 引擎插件的安装、使用和配置。

如果您希望在您的服务器上使用 Spark 引擎,您需要保证以下的环境变量已经设置正确并且引擎的启动用户是有这些环境变量的。

强烈建议您在执行 Spark 任务之前,检查下执行用户的这些环境变量。

环境变量名环境变量内容备注
JAVA_HOMEJDK安装路径必须
HADOOP_HOMEHadoop安装路径必须
HADOOP_CONF_DIRHadoop配置路径必须
HIVE_CONF_DIRHive配置路径必须
SPARK_HOMESpark安装路径必须
SPARK_CONF_DIRSpark配置路径必须
pythonpython建议使用anaconda的python作为默认python

通过 pyspark 验证 Spark 是否安装成功

  1. pyspark
  2. #进入pyspark虚拟环境后,出现spark的logo则说明环境安装成功
  3. Welcome to
  4. ____ __
  5. / __/__ ___ _____/ /__
  6. _\ \/ _ \/ _ `/ __/ '_/
  7. /__ / .__/\_,_/_/ /_/\_\ version 3.2.1
  8. /_/
  9. Using Python version 2.7.13 (default, Sep 30 2017 18:12:43)
  10. SparkSession available as 'spark'.

Linkis 发布的二进制安装包中默认包含了 Spark 引擎插件,用户无需额外安装。

理论上 Linkis 支持的 Spark2.x 以上的所有版本。默认支持的版本为 Spark3.2.1 。如果您想使用其他的 Spark 版本,如 Spark2.1.0 ,则您仅仅需要将插件 Spark 的版本进行修改,然后进行编译即可。具体的,您可以找到 linkis-engineplugin-spark 模块,将 maven 依赖中 <spark.version> 标签的值改成 2.1.0 ,然后单独编译此模块即可。

EngineConnPlugin引擎插件安装

  1. # codeType对应关系 py-->pyspark sql-->sparkSQL scala-->Spark scala
  2. sh ./bin/linkis-cli -engineType spark-3.2.1 -codeType sql -code "show databases" -submitUser hadoop -proxyUser hadoop
  3. # 可以在提交参数通过-confMap wds.linkis.yarnqueue=dws 来指定yarn 队列
  4. sh ./bin/linkis-cli -engineType spark-3.2.1 -codeType sql -confMap wds.linkis.yarnqueue=dws -code "show databases" -submitUser hadoop -proxyUser hadoop

更多 Linkis-Cli 命令参数参考: Linkis-Cli 使用

Linkis 提供了 JavaScalaSDKLinkis 服务端提交任务。具体可以参考 JAVA SDK Manual。对于 Spark 任务你只需要修改 Demo 中的 EngineConnTypeCodeType 参数即可:

  1. Map<String, Object> labels = new HashMap<String, Object>();
  2. labels.put(LabelKeyConstant.ENGINE_TYPE_KEY, "spark-3.2.1"); // required engineType Label
  3. labels.put(LabelKeyConstant.USER_CREATOR_TYPE_KEY, "hadoop-IDE");// required execute user and creator
  4. labels.put(LabelKeyConstant.CODE_TYPE_KEY, "sql"); // required codeType py,sql,scala

Spark还支持提交Scala代码和Pyspark代码:

  1. //scala
  2. labels.put(LabelKeyConstant.CODE_TYPE_KEY, "scala");
  3. code:
  4. val df=spark.sql("show tables")
  5. show(df)
  6. //pyspark
  7. /labels.put(LabelKeyConstant.CODE_TYPE_KEY, "py");
  8. code:
  9. df=spark.sql("show tables")
  10. show(df)

通过 OnceEngineConn 提交任务(通过 spark-submit 提交 jar 包执行任务),提交方式参考 org.apache.linkis.computation.client.SparkOnceJobTest

  1. public class SparkOnceJobTest {
  2. public static void main(String[] args) {
  3. LinkisJobClient.config().setDefaultServerUrl("http://127.0.0.1:9001");
  4. String submitUser = "linkis";
  5. String engineType = "spark";
  6. SubmittableSimpleOnceJob onceJob =
  7. // region
  8. LinkisJobClient.once().simple().builder()
  9. .setCreateService("Spark-Test")
  10. .setMaxSubmitTime(300000)
  11. .setDescription("SparkTestDescription")
  12. .addExecuteUser(submitUser)
  13. .addJobContent("runType", "jar")
  14. .addJobContent("spark.app.main.class", "org.apache.spark.examples.JavaWordCount")
  15. // 提交的jar包获取的参数
  16. .addJobContent("spark.app.args", "hdfs:///tmp/test_word_count.txt") // WordCount 测试文件
  17. .addLabel("engineType", engineType + "-2.4.7")
  18. .addLabel("userCreator", submitUser + "-IDE")
  19. .addLabel("engineConnMode", "once")
  20. .addStartupParam("spark.app.name", "spark-submit-jar-test-linkis") // yarn上展示的 Application Name
  21. .addStartupParam("spark.executor.memory", "1g")
  22. .addStartupParam("spark.driver.memory", "1g")
  23. .addStartupParam("spark.executor.cores", "1")
  24. .addStartupParam("spark.executor.instance", "1")
  25. .addStartupParam("spark.app.resource", "hdfs:///tmp/spark/spark-examples_2.11-2.3.0.2.6.5.0-292.jar")
  26. .addSource("jobName", "OnceJobTest")
  27. .build();
  28. // endregion
  29. onceJob.submit();
  30. onceJob.waitForCompleted(); // 网络临时不通会导致异常,建议后期修改 SDK,现阶段使用,需要做异常处理
  31. // 网络临时不通会导致异常,建议后期修改 SDK,现阶段使用,需要做异常处理
  32. onceJob.waitForCompleted();
  33. }
  34. }

运行脚本类型包括 sqlscalapythondata_calc(格式为json)

任务提交执行Restful API文档

  1. POST /api/rest_j/v1/entrance/submit
  2. Content-Type: application/json
  3. Token-Code: dss-AUTH
  4. Token-User: linkis
  5. {
  6. "executionContent": {
  7. // 脚本内容,可以是sql,python,scala,json
  8. "code": "show databases",
  9. // 运行的脚本类型 sql, py(pyspark), scala, data_calc
  10. "runType": "sql"
  11. },
  12. "params": {
  13. "variable": {
  14. },
  15. "configuration": {
  16. // spark 启动参数,非必填
  17. "startup": {
  18. "spark.executor.memory": "1g",
  19. "spark.driver.memory": "1g",
  20. "spark.executor.cores": "1",
  21. "spark.executor.instances": 1
  22. }
  23. }
  24. },
  25. "source": {
  26. // 非必填,file:/// 或者 hdfs:///
  27. "scriptPath": "file:///tmp/hadoop/test.sql"
  28. },
  29. "labels": {
  30. // 格式为:引擎类型-版本
  31. "engineType": "spark-3.2.1",
  32. // userCreator: linkis 为用户名。IDE 是系统名,在 Linkis 后台管理。
  33. "userCreator": "linkis-IDE"
  34. }
  35. }

上传jar包和配置

  1. # 上传linkis spark引擎的lib下的jar包 (根据您的实际安装目录修改以下参数)
  2. cd /appcom/Install/linkis/lib/linkis-engineconn-plugins/spark/dist/3.2.1/lib
  3. hdfs dfs -put *.jar hdfs:///spark/cluster
  4. # 上传linkis 配置文件 (根据您的实际安装目录修改以下参数)
  5. cd /appcom/Install/linkis/conf
  6. hdfs dfs -put * hdfs:///spark/cluster
  7. # 上传hive-site.xml (根据您的实际安装目录修改以下参数)
  8. cd $HIVE_CONF_DIR
  9. hdfs dfs -put hive-site.xml hdfs:///spark/cluster

可以通过linkis.spark.yarn.cluster.jars参数来修改hdfs:///spark/cluster

执行测试用例

  1. # 使用 `engingeConnRuntimeMode=yarnCluster` 来指定yarn cluster模式
  2. sh ./bin/linkis-cli -engineType spark-3.2.1 -codeType sql -labelMap engingeConnRuntimeMode=yarnCluster -submitUser hadoop -proxyUser hadoop -code "select 123"
配置默认值是否必须说明
wds.linkis.rm.instance10引擎最大并发数
spark.executor.cores1spark执行器核心个数
spark.driver.memory1gspark执行器实例最大并发数
spark.executor.memory1gspark执行器内存大小
wds.linkis.engineconn.max.free.time1h引擎空闲退出时间
spark.python.versionpython2python版本

因为 Spark 任务的执行需要队列的资源,须要设置自己能够执行的队列。

yarn

如果默认参数不满足时,有如下几中方式可以进行一些基础参数配置

用户可以进行自定义的设置,比如 Spark 会话 executor 个数和 executor 的内存。这些参数是为了用户能够更加自由地设置自己的 spark 的参数,另外 Spark 其他参数也可以进行修改,比如的 pysparkpython 版本等。 spark

注意: 修改 IDE 标签下的配置后需要指定 -creator IDE 才会生效(其它标签类似),如:

  1. sh ./bin/linkis-cli -creator IDE \
  2. -engineType spark-3.2.1 -codeType sql \
  3. -code "show databases" \
  4. -submitUser hadoop -proxyUser hadoop

提交任务接口,通过参数 params.configuration.runtime 进行配置

  1. http 请求参数示例
  2. {
  3. "executionContent": {"code": "show databases;", "runType": "sql"},
  4. "params": {
  5. "variable": {},
  6. "configuration": {
  7. "runtime": {
  8. "wds.linkis.rm.instance":"10"
  9. }
  10. }
  11. },
  12. "labels": {
  13. "engineType": "spark-3.2.1",
  14. "userCreator": "hadoop-IDE"
  15. }
  16. }

Linkis 是通过引擎标签来进行管理的,所涉及的数据表信息如下所示。

  1. linkis_ps_configuration_config_key: 插入引擎的配置参数的key和默认values
  2. linkis_cg_manager_label:插入引擎label如:spark-3.2.1
  3. linkis_ps_configuration_category 插入引擎的目录关联关系
  4. linkis_ps_configuration_config_value 插入引擎需要展示的配置
  5. linkis_ps_configuration_key_engine_relation:配置项和引擎的关联关系

表中与引擎相关的初始数据如下

  1. -- set variable
  2. SET @SPARK_LABEL="spark-3.2.1";
  3. SET @SPARK_ALL=CONCAT('*-*,',@SPARK_LABEL);
  4. SET @SPARK_IDE=CONCAT('*-IDE,',@SPARK_LABEL);
  5. -- engine label
  6. insert into `linkis_cg_manager_label` (`label_key`, `label_value`, `label_feature`, `label_value_size`, `update_time`, `create_time`) VALUES ('combined_userCreator_engineType', @SPARK_ALL, 'OPTIONAL', 2, now(), now());
  7. insert into `linkis_cg_manager_label` (`label_key`, `label_value`, `label_feature`, `label_value_size`, `update_time`, `create_time`) VALUES ('combined_userCreator_engineType', @SPARK_IDE, 'OPTIONAL', 2, now(), now());
  8. select @label_id := id from linkis_cg_manager_label where `label_value` = @SPARK_IDE;
  9. insert into linkis_ps_configuration_category (`label_id`, `level`) VALUES (@label_id, 2);
  10. -- configuration key
  11. INSERT INTO `linkis_ps_configuration_config_key` (`key`, `description`, `name`, `default_value`, `validate_type`, `validate_range`, `is_hidden`, `is_advanced`, `level`, `treeName`, `engine_conn_type`) VALUES ('wds.linkis.rm.instance', '范围:1-20,单位:个', 'spark引擎最大并发数', '10', 'NumInterval', '[1,20]', '0', '0', '1', '队列资源', 'spark');
  12. INSERT INTO `linkis_ps_configuration_config_key` (`key`, `description`, `name`, `default_value`, `validate_type`, `validate_range`, `is_hidden`, `is_advanced`, `level`, `treeName`, `engine_conn_type`) VALUES ('spark.executor.instances', '取值范围:1-40,单位:个', 'spark执行器实例最大并发数', '1', 'NumInterval', '[1,40]', '0', '0', '2', 'spark资源设置', 'spark');
  13. INSERT INTO `linkis_ps_configuration_config_key` (`key`, `description`, `name`, `default_value`, `validate_type`, `validate_range`, `is_hidden`, `is_advanced`, `level`, `treeName`, `engine_conn_type`) VALUES ('spark.executor.cores', '取值范围:1-8,单位:个', 'spark执行器核心个数', '1', 'NumInterval', '[1,8]', '0', '0', '1','spark资源设置', 'spark');
  14. INSERT INTO `linkis_ps_configuration_config_key` (`key`, `description`, `name`, `default_value`, `validate_type`, `validate_range`, `is_hidden`, `is_advanced`, `level`, `treeName`, `engine_conn_type`) VALUES ('spark.executor.memory', '取值范围:1-15,单位:G', 'spark执行器内存大小', '1g', 'Regex', '^([1-9]|1[0-5])(G|g)$', '0', '0', '3', 'spark资源设置', 'spark');
  15. INSERT INTO `linkis_ps_configuration_config_key` (`key`, `description`, `name`, `default_value`, `validate_type`, `validate_range`, `is_hidden`, `is_advanced`, `level`, `treeName`, `engine_conn_type`) VALUES ('spark.driver.cores', '取值范围:只能取1,单位:个', 'spark驱动器核心个数', '1', 'NumInterval', '[1,1]', '0', '1', '1', 'spark资源设置','spark');
  16. INSERT INTO `linkis_ps_configuration_config_key` (`key`, `description`, `name`, `default_value`, `validate_type`, `validate_range`, `is_hidden`, `is_advanced`, `level`, `treeName`, `engine_conn_type`) VALUES ('spark.driver.memory', '取值范围:1-15,单位:G', 'spark驱动器内存大小','1g', 'Regex', '^([1-9]|1[0-5])(G|g)$', '0', '0', '1', 'spark资源设置', 'spark');
  17. INSERT INTO `linkis_ps_configuration_config_key` (`key`, `description`, `name`, `default_value`, `validate_type`, `validate_range`, `is_hidden`, `is_advanced`, `level`, `treeName`, `engine_conn_type`) VALUES ('wds.linkis.engineconn.max.free.time', '取值范围:3m,15m,30m,1h,2h', '引擎空闲退出时间','1h', 'OFT', '[\"1h\",\"2h\",\"30m\",\"15m\",\"3m\"]', '0', '0', '1', 'spark引擎设置', 'spark');
  18. INSERT INTO `linkis_ps_configuration_config_key` (`key`, `description`, `name`, `default_value`, `validate_type`, `validate_range`, `is_hidden`, `is_advanced`, `level`, `treeName`, `engine_conn_type`) VALUES ('spark.tispark.pd.addresses', NULL, NULL, 'pd0:2379', 'None', NULL, '0', '0', '1', 'tidb设置', 'spark');
  19. INSERT INTO `linkis_ps_configuration_config_key` (`key`, `description`, `name`, `default_value`, `validate_type`, `validate_range`, `is_hidden`, `is_advanced`, `level`, `treeName`, `engine_conn_type`) VALUES ('spark.tispark.tidb.addr', NULL, NULL, 'tidb', 'None', NULL, '0', '0', '1', 'tidb设置', 'spark');
  20. INSERT INTO `linkis_ps_configuration_config_key` (`key`, `description`, `name`, `default_value`, `validate_type`, `validate_range`, `is_hidden`, `is_advanced`, `level`, `treeName`, `engine_conn_type`) VALUES ('spark.tispark.tidb.password', NULL, NULL, NULL, 'None', NULL, '0', '0', '1', 'tidb设置', 'spark');
  21. INSERT INTO `linkis_ps_configuration_config_key` (`key`, `description`, `name`, `default_value`, `validate_type`, `validate_range`, `is_hidden`, `is_advanced`, `level`, `treeName`, `engine_conn_type`) VALUES ('spark.tispark.tidb.port', NULL, NULL, '4000', 'None', NULL, '0', '0', '1', 'tidb设置', 'spark');
  22. INSERT INTO `linkis_ps_configuration_config_key` (`key`, `description`, `name`, `default_value`, `validate_type`, `validate_range`, `is_hidden`, `is_advanced`, `level`, `treeName`, `engine_conn_type`) VALUES ('spark.tispark.tidb.user', NULL, NULL, 'root', 'None', NULL, '0', '0', '1', 'tidb设置', 'spark');
  23. INSERT INTO `linkis_ps_configuration_config_key` (`key`, `description`, `name`, `default_value`, `validate_type`, `validate_range`, `is_hidden`, `is_advanced`, `level`, `treeName`, `engine_conn_type`) VALUES ('spark.python.version', '取值范围:python2,python3', 'python版本','python2', 'OFT', '[\"python3\",\"python2\"]', '0', '0', '1', 'spark引擎设置', 'spark');
  24. -- key engine relation
  25. insert into `linkis_ps_configuration_key_engine_relation` (`config_key_id`, `engine_type_label_id`)
  26. (select config.id as `config_key_id`, label.id AS `engine_type_label_id` FROM linkis_ps_configuration_config_key config
  27. INNER JOIN linkis_cg_manager_label label ON config.engine_conn_type = 'spark' and label.label_value = @SPARK_ALL);
  28. -- engine default configuration
  29. insert into `linkis_ps_configuration_config_value` (`config_key_id`, `config_value`, `config_label_id`)
  30. (select `relation`.`config_key_id` AS `config_key_id`, '' AS `config_value`, `relation`.`engine_type_label_id` AS `config_label_id` FROM linkis_ps_configuration_key_engine_relation relation
  31. INNER JOIN linkis_cg_manager_label label ON relation.engine_type_label_id = label.id AND label.label_value = @SPARK_ALL);