Pipeline Engine

The Pipeline engine is mainly used to import and export files. This article describes how to install, use, and configure the Pipeline engine plugin in Linkis.

Method 1: Download the engine plug-in package directly

Linkis Engine Plugin Download

Method 2: Compile the engine plug-in separately (a Maven environment is required)

  # compile
  cd ${linkis_code_dir}/linkis-engineconn-plugins/pipeline/
  mvn clean install
  # The compiled engine plug-in package is located in the following directory:
  # ${linkis_code_dir}/linkis-engineconn-plugins/pipeline/target/out/

EngineConnPlugin engine plugin installation

Upload the engine plug-in package obtained in the previous step to the engine directory on the server:

  ${LINKIS_HOME}/lib/linkis-engineconn-plugins

The directory structure after uploading is as follows:

  linkis-engineconn-plugins/
  ├── pipeline
  │   ├── dist
  │   │   └── 1
  │   │       ├── conf
  │   │       └── lib
  │   └── plugin
  │       └── 1
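
The layout can be checked with a short script before restarting any service. This is a minimal sketch, assuming the pipeline plugin contains dist/1/conf, dist/1/lib, and plugin/1 as listed above; check_pipeline_plugin_layout is a hypothetical helper name, not something shipped with Linkis.

```shell
# Sketch only: check_pipeline_plugin_layout is a hypothetical helper, not part of Linkis.
# It reports every expected sub-directory of the pipeline plugin that is missing.
check_pipeline_plugin_layout() {
  local plugins_dir="$1"    # e.g. ${LINKIS_HOME}/lib/linkis-engineconn-plugins
  local version="${2:-1}"   # version directory, "1" in the tree above
  local rel
  local missing=0
  for rel in "dist/${version}/conf" "dist/${version}/lib" "plugin/${version}"; do
    if [ ! -d "${plugins_dir}/pipeline/${rel}" ]; then
      echo "missing: pipeline/${rel}"
      missing=1
    fi
  done
  # Exit status 0 means the layout is complete, 1 means something is missing.
  return "${missing}"
}
```

It could be called as check_pipeline_plugin_layout "${LINKIS_HOME}/lib/linkis-engineconn-plugins"; a non-zero exit status means at least one directory was not found.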

Refresh the engine by restarting the linkis-cg-linkismanager service

  cd ${LINKIS_HOME}/sbin
  sh linkis-daemon.sh restart cg-linkismanager

To confirm that the refresh succeeded, check whether last_update_time in the linkis_cg_engine_conn_plugin_bml_resources table matches the time at which the refresh was triggered.

  # Log in to the linkis database
  select * from linkis_cg_engine_conn_plugin_bml_resources;

Because the pipeline engine is mainly used to import and export files, the following introductory example exports a file from A to B:

  sh bin/linkis-cli -submitUser hadoop \
  -engineType pipeline-1 -codeType pipeline \
  -code "from hdfs:///000/000/000/A.dolphin to file:///000/000/000/B.csv"

The statement from hdfs:///000/000/000/A.dolphin to file:///000/000/000/B.csv is explained in section 2.3.

For more Linkis-Cli command parameters, see: Linkis-Cli usage
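
When several files have to be moved, it can help to generate the command from a source and a target path and review it before running it. The following is a minimal sketch; pipeline_cli_command is a hypothetical helper, and it only reuses the flags that appear in the example above.

```shell
# Sketch only: pipeline_cli_command is a hypothetical helper, not part of Linkis.
# It prints the linkis-cli command for a "from <source> to <target>" statement
# so that the command can be reviewed before it is executed.
pipeline_cli_command() {
  local src="$1"
  local dst="$2"
  local user="${3:-hadoop}"   # submit user, defaults to the one used in this article
  printf 'sh bin/linkis-cli -submitUser %s -engineType pipeline-1 -codeType pipeline -code "from %s to %s"\n' \
    "$user" "$src" "$dst"
}

# Prints the same command as the introductory example, on a single line.
pipeline_cli_command hdfs:///000/000/000/A.dolphin file:///000/000/000/B.csv
```

The helper does not escape quotes inside the paths, so it is only suitable for paths without double quotes.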

| Configuration | Default | Required | Description |
|---|---|---|---|
| pipeline.output.mold | csv | No | Result set export type |
| pipeline.field.split | , | No | CSV separator |
| pipeline.output.charset | gbk | No | Result set export character set |
| pipeline.output.isoverwrite | true | No | Whether to overwrite |
| wds.linkis.rm.instance | 3 | No | Maximum concurrent number of pipeline engines |
| pipeline.output.shuffle.null.type | NULL | No | Null value replacement |
| wds.linkis.engineconn.java.driver.memory | 2g | No | Pipeline engine initialization memory size |

If the default parameters do not meet your needs, some basic parameters can be configured in the following ways.

[Figure 1: Pipeline Engine]

Note: After modifying the configuration under the IDE tag, you need to specify -creator IDE for it to take effect (other tags work the same way), for example:

  sh bin/linkis-cli -creator IDE \
  -submitUser hadoop \
  -engineType pipeline-1 \
  -codeType pipeline \
  -code "from hdfs:///000/000/000/A.dolphin to file:///000/000/000/B.csv"

When submitting a task through the task submission interface, configure the engine through the parameter params.configuration.runtime.

Example of HTTP request parameters:

  {
    "executionContent": {"code": "from hdfs:///000/000/000/A.dolphin to file:///000/000/000/B.csv", "runType": "pipeline"},
    "params": {
      "variable": {},
      "configuration": {
        "runtime": {
          "pipeline.output.mold": "csv",
          "pipeline.output.charset": "gbk"
        }
      }
    },
    "labels": {
      "engineType": "pipeline-1",
      "userCreator": "hadoop-IDE"
    }
  }
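
The request body above can be generated from a script so that the runtime parameters vary per task. This is a minimal sketch: build_pipeline_payload is a hypothetical helper, and the gateway address and the /api/rest_j/v1/entrance/submit path in the commented curl call are assumptions that should be checked against your Linkis deployment. Authentication is omitted.

```shell
# Sketch only: build_pipeline_payload is a hypothetical helper, not part of Linkis.
# It prints a request body with the same structure as the example above.
build_pipeline_payload() {
  local src="$1"
  local dst="$2"
  local mold="${3:-csv}"      # pipeline.output.mold
  local charset="${4:-gbk}"   # pipeline.output.charset
  cat <<EOF
{
  "executionContent": {"code": "from ${src} to ${dst}", "runType": "pipeline"},
  "params": {
    "variable": {},
    "configuration": {
      "runtime": {
        "pipeline.output.mold": "${mold}",
        "pipeline.output.charset": "${charset}"
      }
    }
  },
  "labels": {
    "engineType": "pipeline-1",
    "userCreator": "hadoop-IDE"
  }
}
EOF
}

# Assumed usage (hypothetical gateway address, endpoint path not taken from this article):
# curl -X POST -H 'Content-Type: application/json' \
#   -d "$(build_pipeline_payload hdfs:///000/000/000/A.dolphin file:///000/000/000/B.csv)" \
#   "http://${LINKIS_GATEWAY}/api/rest_j/v1/entrance/submit"
```

The values are inserted without JSON escaping, so paths containing double quotes or backslashes would need to be escaped first.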

Linkis manages engines through labels. The data tables involved are as follows.

  linkis_ps_configuration_config_key: keys and default values of the engine's configuration parameters
  linkis_cg_manager_label: engine labels, such as pipeline-1
  linkis_ps_configuration_category: the category (directory) association of the engine
  linkis_ps_configuration_config_value: the configuration values that the engine needs to display
  linkis_ps_configuration_key_engine_relation: the relationship between configuration items and the engine

The initial engine-related data in these tables is as follows:

  -- set variable
  SET @PIPELINE_LABEL="pipeline-1";
  SET @PIPELINE_ALL=CONCAT('*-*,',@PIPELINE_LABEL);
  SET @PIPELINE_IDE=CONCAT('*-IDE,',@PIPELINE_LABEL);

  -- engine label
  insert into `linkis_cg_manager_label` (`label_key`, `label_value`, `label_feature`, `label_value_size`, `update_time`, `create_time`) VALUES ('combined_userCreator_engineType', @PIPELINE_ALL, 'OPTIONAL', 2, now(), now());
  insert into `linkis_cg_manager_label` (`label_key`, `label_value`, `label_feature`, `label_value_size`, `update_time`, `create_time`) VALUES ('combined_userCreator_engineType', @PIPELINE_IDE, 'OPTIONAL', 2, now(), now());
  select @label_id := id from linkis_cg_manager_label where `label_value` = @PIPELINE_IDE;
  insert into linkis_ps_configuration_category (`label_id`, `level`) VALUES (@label_id, 2);

  -- configuration key
  INSERT INTO `linkis_ps_configuration_config_key` (`key`, `description`, `name`, `default_value`, `validate_type`, `validate_range`, `is_hidden`, `is_advanced`, `level`, `treeName`, `engine_conn_type`) VALUES ('pipeline.output.mold', 'Value range: csv or excel', 'Result set export type', 'csv', 'OFT', '[\"csv\",\"excel\"]', '0', '0', '1', 'pipeline engine settings', 'pipeline');
  INSERT INTO `linkis_ps_configuration_config_key` (`key`, `description`, `name`, `default_value`, `validate_type`, `validate_range`, `is_hidden`, `is_advanced`, `level`, `treeName`, `engine_conn_type`) VALUES ('pipeline.field.split', 'Value range: , or \\t', 'csv delimiter', ',', 'OFT', '[\",\",\"\\\\t\"]', '0', '0', '1', 'pipeline engine settings', 'pipeline');
  INSERT INTO `linkis_ps_configuration_config_key` (`key`, `description`, `name`, `default_value`, `validate_type`, `validate_range`, `is_hidden`, `is_advanced`, `level`, `treeName`, `engine_conn_type`) VALUES ('pipeline.output.charset', 'Value range: utf-8 or gbk', 'Result set export character set', 'gbk', 'OFT', '[\"utf-8\",\"gbk\"]', '0', '0', '1', 'pipeline engine settings', 'pipeline');
  INSERT INTO `linkis_ps_configuration_config_key` (`key`, `description`, `name`, `default_value`, `validate_type`, `validate_range`, `is_hidden`, `is_advanced`, `level`, `treeName`, `engine_conn_type`) VALUES ('pipeline.output.isoverwrite', 'Value range: true or false', 'Whether to overwrite', 'true', 'OFT', '[\"true\",\"false\"]', '0', '0', '1', 'pipeline engine settings', 'pipeline');
  INSERT INTO `linkis_ps_configuration_config_key` (`key`, `description`, `name`, `default_value`, `validate_type`, `validate_range`, `is_hidden`, `is_advanced`, `level`, `treeName`, `engine_conn_type`) VALUES ('wds.linkis.rm.instance', 'Range: 1-3, Unit: Piece', 'Maximum concurrent number of pipeline engines', '3', 'NumInterval', '[1,3]', '0', '0', '1', 'pipeline engine settings', 'pipeline');
  INSERT INTO `linkis_ps_configuration_config_key` (`key`, `description`, `name`, `default_value`, `validate_type`, `validate_range`, `is_hidden`, `is_advanced`, `level`, `treeName`, `engine_conn_type`) VALUES ('wds.linkis.engineconn.java.driver.memory', 'Value range: 1-10, unit: G', 'Pipeline engine initialization memory size', '2g', 'Regex', '^([1-9]|10)(G|g)$', '0', '0', '1', 'pipeline resource settings', 'pipeline');
  INSERT INTO `linkis_ps_configuration_config_key` (`key`, `description`, `name`, `default_value`, `validate_type`, `validate_range`, `is_hidden`, `is_advanced`, `level`, `treeName`, `engine_conn_type`) VALUES ('pipeline.output.shuffle.null.type', 'Value range: NULL or BLANK', 'Null value replacement', 'NULL', 'OFT', '[\"NULL\",\"BLANK\"]', '0', '0', '1', 'pipeline engine settings', 'pipeline');

  -- key engine relation
  insert into `linkis_ps_configuration_key_engine_relation` (`config_key_id`, `engine_type_label_id`)
  (select config.id as `config_key_id`, label.id AS `engine_type_label_id` FROM linkis_ps_configuration_config_key config
    INNER JOIN linkis_cg_manager_label label ON config.engine_conn_type = 'pipeline' and label_value = @PIPELINE_ALL);

  -- engine default configuration
  insert into `linkis_ps_configuration_config_value` (`config_key_id`, `config_value`, `config_label_id`)
  (select `relation`.`config_key_id` AS `config_key_id`, '' AS `config_value`, `relation`.`engine_type_label_id` AS `config_label_id` FROM linkis_ps_configuration_key_engine_relation relation
    INNER JOIN linkis_cg_manager_label label ON relation.engine_type_label_id = label.id AND label.label_value = @PIPELINE_ALL);
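
The validate_range values in the configuration key inserts above describe which values each parameter accepts. A client-side check along the same lines can catch a bad value before a task is submitted. This is a minimal sketch; validate_pipeline_conf is a hypothetical helper, and it assumes that the allowed tab delimiter is stored as the literal two-character string \t.

```shell
# Sketch only: validate_pipeline_conf is a hypothetical helper, not part of Linkis.
# It returns 0 when the value is within the range listed in validate_range
# for that key, and 1 for out-of-range values or unknown keys.
validate_pipeline_conf() {
  local key="$1"
  local value="$2"
  case "$key" in
    pipeline.output.mold)
      case "$value" in csv|excel) return 0 ;; esac ;;
    pipeline.field.split)
      # Assumption: the tab option is the literal two characters backslash and t.
      case "$value" in ","|'\t') return 0 ;; esac ;;
    pipeline.output.charset)
      case "$value" in utf-8|gbk) return 0 ;; esac ;;
    pipeline.output.isoverwrite)
      case "$value" in true|false) return 0 ;; esac ;;
    pipeline.output.shuffle.null.type)
      case "$value" in NULL|BLANK) return 0 ;; esac ;;
    wds.linkis.rm.instance)
      case "$value" in 1|2|3) return 0 ;; esac ;;
    wds.linkis.engineconn.java.driver.memory)
      # Same regular expression as the Regex validate_range above.
      printf '%s\n' "$value" | grep -Eq '^([1-9]|10)(G|g)$' && return 0 ;;
  esac
  return 1
}
```

For example, validate_pipeline_conf wds.linkis.engineconn.java.driver.memory 2g succeeds, while the same key with 11g fails because the regular expression only accepts 1 to 10 followed by G or g.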