Spark Engine

This article mainly introduces the installation, use and configuration of the Spark engine plugin in Linkis.

If you wish to use the spark engine on your server, you need to ensure that the following environment variables are set correctly and that the engine’s starting user has these environment variables.

It is strongly recommended that you check these environment variables for the executing user before executing a spark job.

Environment variable nameEnvironment variable contentRemarks
JAVA_HOMEJDK installation pathRequired
HADOOP_HOMEHadoop installation pathRequired
HADOOP_CONF_DIRHadoop configuration pathrequired
HIVE_CONF_DIRHive configuration pathrequired
SPARK_HOMESpark installation pathRequired
SPARK_CONF_DIRSpark configuration pathRequired
pythonpythonIt is recommended to use anaconda’s python as the default python

Verify that Spark is successfully installed by pyspark

  1. pyspark
  2. #After entering the pyspark virtual environment, the spark logo appears, indicating that the environment is successfully installed
  3. Welcome to
  4. ______
  5. /__/__ ___ _____/ /__
  6. _\ \/ _ \/ _ `/ __/ '_/
  7. /__ / .__/\_,_/_/ /_/\_\ version 3.2.1
  8. /_/
  9. Using Python version 2.7.13 (default, Sep 30 2017 18:12:43)
  10. SparkSession available as 'spark'.

The Spark engine plugin is included in the binary installation package released by linkis by default, and users do not need to install it additionally.

In theory Linkis supports all versions of spark2.x and above. The default supported version is Spark3.2.1. If you want to use another version of spark, such as spark2.1.0, you just need to modify the version of the plugin spark and compile it. Specifically, you can find the linkis-engineplugin-spark module, change the value of the <spark.version> tag in the maven dependency to 2.1.0, and then compile this module separately.

EngineConnPlugin engine plugin installation

  1. # codeType correspondence py-->pyspark sql-->sparkSQL scala-->Spark scala
  2. sh ./bin/linkis-cli -engineType spark-3.2.1 -codeType sql -code "show databases" -submitUser hadoop -proxyUser hadoop
  3. # You can specify the yarn queue in the submission parameter by -confMap wds.linkis.yarnqueue=dws
  4. sh ./bin/linkis-cli -engineType spark-3.2.1 -codeType sql -confMap wds.linkis.yarnqueue=dws -code "show databases" -submitUser hadoop -proxyUser hadoop

More Linkis-Cli command parameter reference: Linkis-Cli usage

Linkis provides SDK of Java and Scala to submit tasks to Linkis server. For details, please refer to JAVA SDK Manual. For Spark tasks you only need to modify the EngineConnType and CodeType parameters in Demo:

  1. Map<String, Object> labels = new HashMap<String, Object>();
  2. labels.put(LabelKeyConstant.ENGINE_TYPE_KEY, "spark-3.2.1"); // required engineType Label
  3. labels.put(LabelKeyConstant.USER_CREATOR_TYPE_KEY, "hadoop-IDE");// required execute user and creator
  4. labels.put(LabelKeyConstant.CODE_TYPE_KEY, "sql"); // required codeType py,sql,scala

You can also submit scala and python code:

  1. //scala
  2. labels.put(LabelKeyConstant.CODE_TYPE_KEY, "scala");
  3. code:
  4. val df=spark.sql("show tables")
  5. show(df)
  6. //pyspark
  7. /labels.put(LabelKeyConstant.CODE_TYPE_KEY, "py");
  8. code:
  9. df=spark.sql("show tables")
  10. show(df)

Through OnceEngineConn submit tasks (through the spark-submit submit jar package mission), submission for reference org.apache.linkis.com putation.Client.SparkOnceJobTest.

  1. public class SparkOnceJobTest {
  2. public static void main(String[] args) {
  3. LinkisJobClient.config().setDefaultServerUrl("http://127.0.0.1:9001");
  4. String submitUser = "linkis";
  5. String engineType = "spark";
  6. SubmittableSimpleOnceJob onceJob =
  7. // region
  8. LinkisJobClient.once().simple().builder()
  9. .setCreateService("Spark-Test")
  10. .setMaxSubmitTime(300000)
  11. .setDescription("SparkTestDescription")
  12. .addExecuteUser(submitUser)
  13. .addJobContent("runType", "jar")
  14. .addJobContent("spark.app.main.class", "org.apache.spark.examples.JavaWordCount")
  15. // Parameters obtained from the submitted jar package
  16. .addJobContent("spark.app.args", "hdfs:///tmp/test_word_count.txt") // WordCount test file
  17. .addLabel("engineType", engineType + "-2.4.7")
  18. .addLabel("userCreator", submitUser + "-IDE")
  19. .addLabel("engineConnMode", "once")
  20. .addStartupParam("spark.app.name", "spark-submit-jar-test-linkis") // Application Name on yarn
  21. .addStartupParam("spark.executor.memory", "1g")
  22. .addStartupParam("spark.driver.memory", "1g")
  23. .addStartupParam("spark.executor.cores", "1")
  24. .addStartupParam("spark.executor.instance", "1")
  25. .addStartupParam("spark.app.resource", "hdfs:///tmp/spark/spark-examples_2.11-2.3.0.2.6.5.0-292.jar")
  26. .addSource("jobName", "OnceJobTest")
  27. .build();
  28. // endregion
  29. onceJob.submit();
  30. onceJob.waitForCompleted(); //A temporary network interruption may cause an exception. It is recommended to modify the SDK later. If the SDK is in use at this stage, exception handling is required.
  31. // Temporary network failure will cause exceptions. It is recommended to modify the SDK later. For use at this stage, exception handling is required
  32. onceJob.waitForCompleted();
  33. }
  34. }

Scripts type includes sqlscalapythondata_calc(content type is json).

Restful API Usage

  1. POST /api/rest_j/v1/entrance/submit
  2. Content-Type: application/json
  3. Token-Code: dss-AUTH
  4. Token-User: linkis
  5. {
  6. "executionContent": {
  7. // script content, type: sql, python, scala, json
  8. "code": "show databases",
  9. // script type: sql, py(pyspark), scala, data_calc(json)
  10. "runType": "sql"
  11. },
  12. "params": {
  13. "variable": {
  14. },
  15. "configuration": {
  16. // spark startup parameters, not required
  17. "startup": {
  18. "spark.executor.memory": "1g",
  19. "spark.driver.memory": "1g",
  20. "spark.executor.cores": "1",
  21. "spark.executor.instances": 1
  22. }
  23. }
  24. },
  25. "source": {
  26. // not required, file:/// or hdfs:///
  27. "scriptPath": "file:///tmp/hadoop/test.sql"
  28. },
  29. "labels": {
  30. // pattern:engineType-version
  31. "engineType": "spark-3.2.1",
  32. // userCreator: linkis is username。IDE is system that be configed in Linkis。
  33. "userCreator": "linkis-IDE"
  34. }
  35. }

Upload the jar package and configuration

  1. # Upload the jar package under the lib of the linkis spark engine (modify the following parameters according to your actual installation directory)
  2. cd /appcom/Install/linkis/lib/linkis-engineconn-plugins/spark/dist/3.2.1/lib
  3. hdfs dfs -put *.jar hdfs:///spark/cluster
  4. # Upload the linkis configuration file (modify the following parameters according to your actual installation directory)
  5. cd /appcom/Install/linkis/conf
  6. hdfs dfs -put * hdfs:///spark/cluster
  7. # Upload hive-site.xml (modify the following parameters according to your actual installation directory)
  8. cd $HIVE_CONF_DIR
  9. hdfs dfs -put hive-site.xml hdfs:///spark/cluster

Can pass linkis.spark.yarn.cluster.jarsparameters to modifyhdfs:///spark/cluster

Execute the test case

  1. # Use `engingeConnRuntimeMode=yarnCluster` to specify the yarn cluster mode
  2. sh ./bin/linkis-cli -engineType spark-3.2.1 -codeType sql -labelMap engingeConnRuntimeMode=yarnCluster -submitUser hadoop -proxyUser hadoop -code "select 123"

Before submitting the task, please install the metric server on Kubernetes, as relevant APIs will be invoked during the resource validation process.

To submit task to kubernetes cluster, you need to add cluster configuration on Linkis Control Panel->Basic Data Management->External Resource Provider Manage as show in the figure. The Resource Type must be set to Kubernetes while the Name can be customized.

k8s

The parameters to be set in the Config are shown in the following table:

ConfDesc
k8sMasterUrlFull URL of the API Server such ashttps://xxx.xxx.xxx.xxx:6443. This parameter must be configured.
k8sConfigLocation of the kubeconfig file such as/home/hadoop/.kube/config. If this parameter is configured, the following three parameters do not need to be configured.
k8sCaCertDataCA certificate for clusters in kubeconfig corresponding to certificate-authority-data. If k8sConfig is not configured, you need to configure this parameter
k8sClientCertDataClient certificate in kubeconfig corresponding to client-certificate-data,If k8sConfig is not configured, you need to configure this parameter
k8sClientKeyDataClient private key in kubeconfig corresponding to client-key-data,If k8sConfig is not configured, you need to configure this parameter

After external provider configuration, you need to configure corresponding cluster label information on ECM Managerment as shown in the figure. You need to selete yarnClusterfor label type and K8S-cluster name for label value where the cluster name is the name specified in External Resource Provider Configuration such as K8S-default if the name is set to default in the previous step.

Due to compatibility issues with ClusterLabel, the Key value has not been changed yet(yarnCluster).

k8s-ecm-label

When usinglinkis-cli to submit task, the parameters that need to be set are as follows:

  • Specify the cluster to execute the task. If the cluster name is default when configuring the external provider, you need to specify the value of the k8sCluster as 'K8S-default' when submitting the task;
  • To distinguish it from the k8s-operator submitting method, you need to specify the spark.master parameter as k8s-native;
  • Currently spark once job tasks on k8s only support cluster deploy mode, you need to set spark.submit.deployMode to cluster.

The corresponding Spark parameter of Linkis parameters as follows:

Linkis ParametersSpark ParametersDefault Value
linkis.spark.k8s.master.url—masterempty string
linkis.spark.k8s.serviceAccountspark.kubernetes.authenticate.driver.serviceAccountNameempty string
linkis.spark.k8s.imagespark.kubernetes.container.imageapache/spark:v3.2.1
linkis.spark.k8s.imagePullPolicyspark.kubernetes.container.image.pullPolicyAlways
linkis.spark.k8s.namespacespark.kubernetes.namespacedefault
linkis.spark.k8s.ui.portspark.ui.port4040
linkis.spark.k8s.executor.request.coresspark.kubernetes.executor.request.cores1
linkis.spark.k8s.driver.request.coresspark.kubernetes.driver.request.cores1

submitting task with jar

  1. linkis-cli --mode once \
  2. -engineType spark-3.2.1 \
  3. -labelMap engineConnMode=once \
  4. -k8sCluster 'K8S-default' \
  5. -jobContentMap runType='jar' \
  6. -jobContentMap spark.app.main.class='org.apache.spark.examples.SparkPi' \
  7. -confMap spark.master='k8s-native' \
  8. -confMap spark.app.name='spark-submit-jar-k8s' \
  9. -confMap spark.app.resource='local:///opt/spark/examples/jars/spark-examples_2.12-3.2.1.jar' \
  10. -confMap spark.submit.deployMode='cluster' \
  11. -confMap linkis.spark.k8s.serviceAccount='spark' \
  12. -confMap linkis.spark.k8s.master.url='k8s://https://xxx.xxx.xxx.xxx:6443' \
  13. -confMap linkis.spark.k8s.config.file='/home/hadoop/.kube/config' \
  14. -confMap linkis.spark.k8s.imagePullPolicy='IfNotPresent' \
  15. -confMap linkis.spark.k8s.namespace='default'

submitting task with py

  1. linkis-cli --mode once \
  2. -engineType spark-3.2.1 \
  3. -labelMap engineConnMode=once \
  4. -k8sCluster 'K8S-default' \
  5. -jobContentMap runType='py' \
  6. -confMap spark.master='k8s-native' \
  7. -confMap spark.app.name='spark-submit-py-k8s' \
  8. -confMap spark.app.resource='local:///opt/spark/examples/src/main/python/pi.py' \
  9. -confMap spark.submit.deployMode='cluster' \
  10. -confMap spark.submit.pyFiles='local:///opt/spark/examples/src/main/python/wordcount.py' \
  11. -confMap linkis.spark.k8s.serviceAccount='spark' \
  12. -confMap linkis.spark.k8s.master.url='k8s://https://xxx.xxx.xxx.xxx:6443' \
  13. -confMap linkis.spark.k8s.config.file='/home/hadoop/.kube/config' \
  14. -confMap linkis.spark.k8s.imagePullPolicy='IfNotPresent' \
  15. -confMap linkis.spark.k8s.namespace='default' \
  16. -confMap linkis.spark.k8s.image="apache/spark-py:v3.2.1"

Upgrade instructions for old version

  • You need to use linkis-dist/package/db/upgrade/1.5.0_schema/mysql/linkis_ddl.sql to upgrade the database fields. Specifically, the label_key field of linkis_cg_manager_label needs to be increased from 32 to 50 in length.

    1. ALTER TABLE `linkis_cg_manager_label` MODIFY COLUMN label_key varchar(50);
  • Prior to version 1.5.0, when building CombineLabel, ClusterLabel was not included. To maintain compatibility with older versions, when the submitted ClusterLabel value is ‘Yarn-default’, ClusterLabel is still not included when building CombineLabel. You can disable this feature by setting linkis.combined.without.yarn.default to false (default is true).

    The specific reason is that if tasks related to that ClusterLabel were submitted in old versions, corresponding resource records would exist in the database. After upgrading to the new version, since CombineLabel includes ClusterLabel, conflicts would occur in the database’s resource records when submitting tasks of this type. Therefore, to maintain compatibility with older versions, the construction of CombineLabel for Yarn-default (the default value of ClusterLabel) still does not include ClusterLabel. If the latest version is installed directly, this issue does not need to be considered because there are no conflicting records in the database. You can set linkis.combined.without.yarn.default to false to improve readability.

Validation of submitting tasks

Submitting a Spark Once Job to K8S involves two levels of resource validation, and the task will only be submitted to the K8S cluster after passing both levels of validation:

  1. First, user resource quota validation will be performed. For detailed process, please refer to ResourceManager Architecture.
  2. Next, resource validation for the K8S cluster will be performed. If a resourceQuota is configured under the current namespace, it will be prioritized for validation. Otherwise, the available resources of the cluster will be calculated directly through the metric server for validation.
ConfigurationDefaultRequiredDescription
wds.linkis.rm.instance10NoMaximum number of concurrent engines
spark.executor.cores1NoNumber of spark executor cores
spark.driver.memory1gnomaximum concurrent number of spark executor instances
spark.executor.memory1gNospark executor memory size
wds.linkis.engineconn.max.free.time1hNoEngine idle exit time
spark.python.versionpython2nopython version

Because the execution of spark requires queue resources, you need to set up a queue that you can execute.

yarn

If the default parameters are not satisfied, there are the following ways to configure some basic parameters

Users can customize settings, such as the number of spark sessions executor and executor memory. These parameters are for users to set their own spark parameters more freely, and other spark parameters can also be modified, such as the python version of pyspark, etc. spark

Note: After modifying the configuration under the IDE tag, you need to specify -creator IDE to take effect (other tags are similar), such as:

  1. sh ./bin/linkis-cli -creator IDE \
  2. -engineType spark-3.2.1 -codeType sql \
  3. -code "show databases" \
  4. -submitUser hadoop -proxyUser hadoop

Submit the task interface, configure it through the parameter params.configuration.runtime

  1. Example of http request parameters
  2. {
  3. "executionContent": {"code": "show databases;", "runType": "sql"},
  4. "params": {
  5. "variable": {},
  6. "configuration": {
  7. "runtime": {
  8. "wds.linkis.rm.instance":"10"
  9. }
  10. }
  11. },
  12. "labels": {
  13. "engineType": "spark-3.2.1",
  14. "userCreator": "hadoop-IDE"
  15. }
  16. }

Linkis is managed through the engine tag, and the data table information involved is shown below.

  1. linkis_ps_configuration_config_key: Insert the key and default values ​​​​of the configuration parameters of the engine
  2. linkis_cg_manager_label: insert engine label such as: spark-3.2.1
  3. linkis_ps_configuration_category: The directory association relationship of the insertion engine
  4. linkis_ps_configuration_config_value: The configuration that the insertion engine needs to display
  5. linkis_ps_configuration_key_engine_relation: The relationship between the configuration item and the engine

The initial data in the table related to the spark engine is as follows

  1. -- set variable
  2. SET @SPARK_LABEL="spark-3.2.1";
  3. SET @SPARK_ALL=CONCAT('*-*,',@SPARK_LABEL);
  4. SET @SPARK_IDE=CONCAT('*-IDE,',@SPARK_LABEL);
  5. -- engine label
  6. insert into `linkis_cg_manager_label` (`label_key`, `label_value`, `label_feature`, `label_value_size`, `update_time`, `create_time`) VALUES ('combined_userCreator_engineType', @SPARK_ALL, 'OPTIONAL', 2, now(), now());
  7. insert into `linkis_cg_manager_label` (`label_key`, `label_value`, `label_feature`, `label_value_size`, `update_time`, `create_time`) VALUES ('combined_userCreator_engineType', @SPARK_IDE, 'OPTIONAL', 2, now(), now());
  8. select @label_id := id from linkis_cg_manager_label where `label_value` = @SPARK_IDE;
  9. insert into linkis_ps_configuration_category (`label_id`, `level`) VALUES (@label_id, 2);
  10. -- configuration key
  11. INSERT INTO `linkis_ps_configuration_config_key` (`key`, `description`, `name`, `default_value`, `validate_type`, `validate_range`, `is_hidden`, `is_advanced`, `level`, `treeName`, `engine_conn_type`) VALUES ('wds.linkis.rm.instance', 'Range: 1-20, unit: each', 'Maximum concurrent number of spark engine', '10', 'NumInterval', '[1,20]', '0 ', '0', '1', 'queue resources', 'spark');
  12. INSERT INTO `linkis_ps_configuration_config_key` (`key`, `description`, `name`, `default_value`, `validate_type`, `validate_range`, `is_hidden`, `is_advanced`, `level`, `treeName`, `engine_conn_type`) VALUES ('spark.executor.instances', 'value range: 1-40, unit: individual', 'maximum concurrent number of spark executor instances', '1', 'NumInterval', '[1,40]', '0', '0', '2', 'spark resource settings', 'spark');
  13. INSERT INTO `linkis_ps_configuration_config_key` (`key`, `description`, `name`, `default_value`, `validate_type`, `validate_range`, `is_hidden`, `is_advanced`, `level`, `treeName`, `engine_conn_type`) VALUES ('spark.executor.cores', 'Value range: 1-8, unit: number', 'Number of spark executor cores', '1', 'NumInterval', '[1,8]', ' 0', '0', '1','spark resource settings', 'spark');
  14. INSERT INTO `linkis_ps_configuration_config_key` (`key`, `description`, `name`, `default_value`, `validate_type`, `validate_range`, `is_hidden`, `is_advanced`, `level`, `treeName`, `engine_conn_type`) VALUES ('spark.executor.memory', 'value range: 1-15, unit: G', 'spark executor memory size', '1g', 'Regex', '^([1-9]|1 [0-5])(G|g)$', '0', '0', '3', 'spark resource settings', 'spark');
  15. INSERT INTO `linkis_ps_configuration_config_key` (`key`, `description`, `name`, `default_value`, `validate_type`, `validate_range`, `is_hidden`, `is_advanced`, `level`, `treeName`, `engine_conn_type`) VALUES ('spark.driver.cores', 'Value range: only 1, unit: number', 'Number of spark driver cores', '1', 'NumInterval', '[1,1]', '0 ', '1', '1', 'spark resource settings', 'spark');
  16. INSERT INTO `linkis_ps_configuration_config_key` (`key`, `description`, `name`, `default_value`, `validate_type`, `validate_range`, `is_hidden`, `is_advanced`, `level`, `treeName`, `engine_conn_type`) VALUES ('spark.driver.memory', 'value range: 1-15, unit: G', 'spark driver memory size','1g', 'Regex', '^([1-9]|1[ 0-5])(G|g)$', '0', '0', '1', 'spark resource settings', 'spark');
  17. INSERT INTO `linkis_ps_configuration_config_key` (`key`, `description`, `name`, `default_value`, `validate_type`, `validate_range`, `is_hidden`, `is_advanced`, `level`, `treeName`, `engine_conn_type`) VALUES ('wds.linkis.engineconn.max.free.time', 'Value range: 3m,15m,30m,1h,2h', 'Engine idle exit time','1h', 'OFT', '[\ "1h\",\"2h\",\"30m\",\"15m\",\"3m\"]', '0', '0', '1', 'spark engine settings', ' spark');
  18. INSERT INTO `linkis_ps_configuration_config_key` (`key`, `description`, `name`, `default_value`, `validate_type`, `validate_range`, `is_hidden`, `is_advanced`, `level`, `treeName`, `engine_conn_type`) VALUES ('spark.tispark.pd.addresses', NULL, NULL, 'pd0:2379', 'None', NULL, '0', '0', '1', 'tidb设置', 'spark');
  19. INSERT INTO `linkis_ps_configuration_config_key` (`key`, `description`, `name`, `default_value`, `validate_type`, `validate_range`, `is_hidden`, `is_advanced`, `level`, `treeName`, `engine_conn_type`) VALUES ('spark.tispark.tidb.addr', NULL, NULL, 'tidb', 'None', NULL, '0', '0', '1', 'tidb设置', 'spark');
  20. INSERT INTO `linkis_ps_configuration_config_key` (`key`, `description`, `name`, `default_value`, `validate_type`, `validate_range`, `is_hidden`, `is_advanced`, `level`, `treeName`, `engine_conn_type`) VALUES ('spark.tispark.tidb.password', NULL, NULL, NULL, 'None', NULL, '0', '0', '1', 'tidb设置', 'spark');
  21. INSERT INTO `linkis_ps_configuration_config_key` (`key`, `description`, `name`, `default_value`, `validate_type`, `validate_range`, `is_hidden`, `is_advanced`, `level`, `treeName`, `engine_conn_type`) VALUES ('spark.tispark.tidb.port', NULL, NULL, '4000', 'None', NULL, '0', '0', '1', 'tidb设置', 'spark');
  22. INSERT INTO `linkis_ps_configuration_config_key` (`key`, `description`, `name`, `default_value`, `validate_type`, `validate_range`, `is_hidden`, `is_advanced`, `level`, `treeName`, `engine_conn_type`) VALUES ('spark.tispark.tidb.user', NULL, NULL, 'root', 'None', NULL, '0', '0', '1', 'tidb设置', 'spark');
  23. INSERT INTO `linkis_ps_configuration_config_key` (`key`, `description`, `name`, `default_value`, `validate_type`, `validate_range`, `is_hidden`, `is_advanced`, `level`, `treeName`, `engine_conn_type`) VALUES ('spark.python.version', 'Value range: python2,python3', 'python version','python2', 'OFT', '[\"python3\",\"python2\"]', ' 0', '0', '1', 'spark engine settings', 'spark');
  24. -- key engine relation
  25. insert into `linkis_ps_configuration_key_engine_relation` (`config_key_id`, `engine_type_label_id`)
  26. (select config.id as `config_key_id`, label.id AS `engine_type_label_id` FROM linkis_ps_configuration_config_key config
  27. INNER JOIN linkis_cg_manager_label label ON config.engine_conn_type = 'spark' and label.label_value = @SPARK_ALL);
  28. -- engine default configuration
  29. insert into `linkis_ps_configuration_config_value` (`config_key_id`, `config_value`, `config_label_id`)
  30. (select `relation`.`config_key_id` AS `config_key_id`, '' AS `config_value`, `relation`.`engine_type_label_id` AS `config_label_id` FROM linkis_ps_configuration_key_engine_relation relation
  31. INNER JOIN linkis_cg_manager_label label ON relation.engine_type_label_id = label.id AND label.label_value = @SPARK_ALL);