Quickly Implement a New Engine

Implementing a new engine means implementing a new EngineConnPlugin (ECP), that is, a new engine plugin. The specific steps are as follows:

Maven dependency

  <dependency>
    <groupId>org.apache.linkis</groupId>
    <artifactId>linkis-engineconn-plugin-core</artifactId>
    <version>${linkis.version}</version>
  </dependency>
  <!-- and some other required maven configurations -->

The main interfaces to implement are as follows:

  • EngineConnPlugin: When an EngineConn is started, Linkis first locates the corresponding EngineConnPlugin class and uses it as the entry point for obtaining the implementations of the other core interfaces. This is the main interface and must be implemented.

  • EngineConnFactory: Implements the logic for starting an engine connector and an engine executor. This interface must be implemented.

    • Implement the createEngineConn method: it returns an EngineConn object, whose getEngine method returns an object that encapsulates the connection information for the underlying engine and also carries the engine type information.
    • For engines that support only a single computing scenario, inherit SingleExecutorEngineConnFactory, implement createExecutor, and return the corresponding Executor.
    • For engines that support multiple computing scenarios, inherit MultiExecutorEngineConnFactory and implement an ExecutorFactory for each computation type. EngineConnPlugin obtains all ExecutorFactory implementations through reflection and returns the corresponding Executor according to the actual situation.
  • EngineConnResourceFactory: Limits the resources required to start an engine. Before the engine starts, resources are requested from Linkis Manager on this basis. Optional; GenericEngineResourceFactory can be used by default.

  • EngineLaunchBuilder: Encapsulates the information that EngineConnManager needs in order to build the startup command. Optional; you can directly inherit JavaProcessEngineConnLaunchBuilder.
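
The multi-executor idea in the list above can be pictured with a small toy model. This is only a sketch: SketchExecutor, SketchExecutorFactory, and SketchMultiExecutorFactory are hypothetical stand-ins written for this illustration, not the real Linkis interfaces, and explicit registration is used in place of the reflection-based discovery.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical stand-ins for illustration; the real Linkis interfaces have different signatures.
interface SketchExecutor {
    String execute(String code);
}

interface SketchExecutorFactory {
    String runType();

    SketchExecutor createExecutor();
}

// Models the multi-executor idea: one executor factory per computation type.
class SketchMultiExecutorFactory {
    private final Map<String, SketchExecutorFactory> factories = new LinkedHashMap<>();

    // Explicit registration replaces the reflection-based discovery mentioned above.
    void register(SketchExecutorFactory factory) {
        factories.put(factory.runType(), factory);
    }

    SketchExecutor createExecutor(String runType) {
        SketchExecutorFactory factory = factories.get(runType);
        if (factory == null) {
            throw new IllegalArgumentException("No executor factory for run type: " + runType);
        }
        return factory.createExecutor();
    }
}

class FactorySketchDemo {
    // Builds a toy factory whose executor only echoes what it was asked to run.
    static SketchExecutorFactory factoryFor(String type) {
        return new SketchExecutorFactory() {
            @Override
            public String runType() {
                return type;
            }

            @Override
            public SketchExecutor createExecutor() {
                return code -> type + " executed: " + code;
            }
        };
    }

    public static void main(String[] args) {
        SketchMultiExecutorFactory multi = new SketchMultiExecutorFactory();
        multi.register(factoryFor("sql"));
        multi.register(factoryFor("python"));
        System.out.println(multi.createExecutor("sql").execute("select 1"));
    }
}
```

A single-scenario engine corresponds to the degenerate case with exactly one registered factory, which is why a separate single-executor base class is enough there.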

Executor is the actual execution unit for a computing scenario and an abstraction of the engine's specific capabilities. It provides services such as locking, status access, and log retrieval. To cover common needs, Linkis provides the following derived Executor base classes by default. Their class names and main functions are as follows:

  • SensibleExecutor:
    • The Executor has multiple states and is allowed to switch between them
    • After the Executor switches states, operations such as notifications are allowed
  • YarnExecutor: Refers to a Yarn-type engine, from which the applicationId, applicationURL, and queue can be obtained.
  • ResourceExecutor: Indicates that the engine can change its resources dynamically. It provides the requestExpectedResource method, which applies to the RM for new resources each time the engine wants to change its resources, and the resourceUpdate method, which reports the resource situation to the RM each time the resources actually used by the engine change.
  • AccessibleExecutor: A very important Executor base class. If the user’s Executor inherits this base class, the Engine can be accessed. It is necessary to distinguish between the state() method of SensibleExecutor and the getEngineStatus() method of AccessibleExecutor: state() obtains the engine status, whereas getEngineStatus() obtains Metric data for basic indicators such as the status, load, and concurrency of the engine.
  • In addition, if AccessibleExecutor is inherited, the Engine process is triggered to instantiate multiple EngineReceiver methods. EngineReceiver processes RPC requests from Entrance, EM, and LinkisMaster, which makes the engine accessible. Users with special RPC requirements can communicate with AccessibleExecutor by implementing the RPCService interface.
  • ExecutableExecutor: A resident Executor base class. Resident Executors include streaming applications in the production center, steps that are specified to run in independent mode after being submitted to Schedulis, business applications for business users, and so on.
  • StreamingExecutor: A streaming application Executor that inherits from ExecutableExecutor and needs to be able to run diagnostics, take checkpoints, collect job information, and monitor alarms.
  • ComputationExecutor: The commonly used interactive engine Executor. It handles interactive execution tasks and has interactive capabilities such as status query and task kill.
  • ConcurrentComputationExecutor: The user-concurrent engine Executor, commonly used in JDBC-type engines. When scripts are executed, the administrator account starts the engine instance, and all users share it.

This chapter takes the JDBC engine as an example to explain in detail how a new engine is implemented, including engine code compilation, installation, database configuration, engine label adaptation in the management console, the new engine script type extension in Scripts, and the task node extension for the new engine in workflows.

The core class JDBCEngineConnExecutor of the JDBC engine inherits the abstract class ConcurrentComputationExecutor, whereas the core class XXXEngineConnExecutor of a computing engine inherits the abstract class ComputationExecutor. This leads to the biggest difference between the two: a JDBC engine instance is started by the administrator user and shared by all users to improve the utilization of machine resources, whereas when a script of a computing engine type is submitted, an engine instance is started under each user, and the engine instances of different users are isolated from each other. This is not elaborated further here, because the additional modifications described below are the same for concurrent engines and computing engines.

Correspondingly, if your new engine is a concurrent engine, you need to pay attention to the class AMConfiguration.scala. If your new engine is a computing engine, you can ignore it.

  object AMConfiguration {
    // If your engine is a multi-user concurrent engine, pay attention to this configuration item
    val MULTI_USER_ENGINE_TYPES = CommonVars("wds.linkis.multi.user.engine.types", "jdbc,ck,es,io_file,appconn")

    private def getDefaultMultiEngineUser(): String = {
      // This should set the startup user used when a concurrent engine is pulled up.
      // The default jvmUser is the user that started the engine service Java process.
      val jvmUser = Utils.getJvmUser
      s"""{jdbc:"$jvmUser", presto: "$jvmUser", es: "$jvmUser", ck:"$jvmUser", appconn:"$jvmUser", io_file:"root"}"""
    }
  }
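
As a rough illustration of how such a list separates concurrent engines from per-user engines, the sketch below parses the default value of wds.linkis.multi.user.engine.types and picks a startup user. The class MultiUserEngineSketch and its startupUser helper are hypothetical, the user names are made up, and the per-engine override visible above (io_file mapped to root) is deliberately ignored.

```java
import java.util.Arrays;
import java.util.Locale;
import java.util.Set;
import java.util.stream.Collectors;

class MultiUserEngineSketch {
    // Default value copied from wds.linkis.multi.user.engine.types above.
    static final String MULTI_USER_ENGINE_TYPES = "jdbc,ck,es,io_file,appconn";

    // Splits the comma-separated configuration value into a normalized set.
    static Set<String> parseTypes(String csv) {
        return Arrays.stream(csv.split(","))
                .map(s -> s.trim().toLowerCase(Locale.ROOT))
                .filter(s -> !s.isEmpty())
                .collect(Collectors.toSet());
    }

    static boolean isMultiUserEngine(String engineType) {
        return parseTypes(MULTI_USER_ENGINE_TYPES).contains(engineType.toLowerCase(Locale.ROOT));
    }

    // Hypothetical helper: multi-user engines run as the service user, others as the submitting user.
    // Simplification: per-engine overrides such as io_file -> root are ignored here.
    static String startupUser(String engineType, String submitUser, String jvmUser) {
        return isMultiUserEngine(engineType) ? jvmUser : submitUser;
    }

    public static void main(String[] args) {
        System.out.println(startupUser("jdbc", "alice", "hadoop"));
        System.out.println(startupUser("spark", "alice", "hadoop"));
    }
}
```

With these made-up names, a jdbc script submitted by alice resolves to the shared service user hadoop, while a spark script stays with alice.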

In the class JDBCEngineConnFactory, which implements the ComputationSingleExecutorEngineConnFactory interface, the following two methods need to be implemented:

  override protected def getEngineConnType: EngineType = EngineType.JDBC
  override protected def getRunType: RunType = RunType.JDBC

Therefore, it is necessary to add variables corresponding to JDBC in EngineType and RunType.

  // In EngineType.scala, add the JDBC-related variables or code, following the definitions of the existing engines
  object EngineType extends Enumeration with Logging {
    val JDBC = Value("jdbc")

    def mapStringToEngineType(str: String): EngineType = str match {
      case _ if JDBC.toString.equalsIgnoreCase(str) => JDBC
    }
  }

  // In RunType.scala
  object RunType extends Enumeration {
    val JDBC = Value("jdbc")
  }

  // Add the version configuration of JDBC in LabelCommonConfig
  public class LabelCommonConfig {
    public final static CommonVars<String> JDBC_ENGINE_VERSION = CommonVars.apply("wds.linkis.jdbc.engine.version", "4");
  }

  // Supplement the matching logic of jdbc in the init method of EngineTypeLabelCreator
  // If this step is not done, the version number will be missing from the engine label information when the code is submitted to the engine
  public class EngineTypeLabelCreator {
    private static void init() {
      defaultVersion.put(EngineType.JDBC().toString(), LabelCommonConfig.JDBC_ENGINE_VERSION.getValue());
    }
  }
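
To see why the default version entry matters, the following sketch combines a case-insensitive engine type lookup with a default version map. It assumes that the engine label has the form type-version, as the jdbc-4 label in the SQL further below suggests. The class EngineLabelSketch, its Kind enum, and the helper names are hypothetical and only mimic the real EngineType and EngineTypeLabelCreator.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

class EngineLabelSketch {
    // Illustrative subset of engine types, not the real EngineType enumeration.
    enum Kind {
        SPARK("spark"), JDBC("jdbc");

        final String value;

        Kind(String value) {
            this.value = value;
        }
    }

    // Case-insensitive lookup, in the spirit of mapStringToEngineType.
    static Optional<Kind> mapStringToKind(String str) {
        for (Kind kind : Kind.values()) {
            if (kind.value.equalsIgnoreCase(str)) {
                return Optional.of(kind);
            }
        }
        return Optional.empty();
    }

    // Only JDBC gets a default version, so SPARK shows what a missing entry looks like.
    static final Map<String, String> DEFAULT_VERSION = new HashMap<>();

    static {
        DEFAULT_VERSION.put(Kind.JDBC.value, "4");
    }

    // Assumed label format "<type>-<version>"; without a registered version only the type remains.
    static String engineTypeLabel(String engineType) {
        Kind kind = mapStringToKind(engineType)
                .orElseThrow(() -> new IllegalArgumentException("Unknown engine type: " + engineType));
        String version = DEFAULT_VERSION.get(kind.value);
        return version == null ? kind.value : kind.value + "-" + version;
    }

    public static void main(String[] args) {
        System.out.println(engineTypeLabel("JDBC"));
        System.out.println(engineTypeLabel("spark"));
    }
}
```

In this toy model the jdbc label carries its version, while the type without a registered default ends up with a bare label, which mirrors the missing version number described in the comment above.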

Pay attention to the configuration item wds.linkis.storage.file.type:

  object LinkisStorageConf {
    val FILE_TYPE = CommonVars("wds.linkis.storage.file.type", "dolphin,sql,scala,py,hql,python,out,log,text,sh,jdbc,ngql,psql,fql").getValue
  }
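
How the storage layer consumes this list is not shown in this document. As an assumed illustration only, the sketch below treats the value as a whitelist of script suffixes and checks a file name against it. FileTypeSketch and isSupported are hypothetical names.

```java
import java.util.Arrays;
import java.util.Locale;

class FileTypeSketch {
    // Default value copied from wds.linkis.storage.file.type above.
    static final String FILE_TYPE =
            "dolphin,sql,scala,py,hql,python,out,log,text,sh,jdbc,ngql,psql,fql";

    // Assumption: a file is accepted when its suffix appears in the comma-separated list.
    static boolean isSupported(String fileName) {
        int dot = fileName.lastIndexOf('.');
        if (dot < 0 || dot == fileName.length() - 1) {
            return false; // no suffix at all
        }
        String suffix = fileName.substring(dot + 1).toLowerCase(Locale.ROOT);
        return Arrays.asList(FILE_TYPE.split(",")).contains(suffix);
    }

    public static void main(String[] args) {
        System.out.println(isSupported("report.jdbc"));
        System.out.println(isSupported("report.xyz"));
    }
}
```

Under this reading, a new engine whose script suffix is missing from the list would be rejected until the suffix is appended to the configuration value.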

If this operation is not done, the variables in the JDBC script cannot be stored and parsed normally, and the code execution will fail when ${variable} is directly used in the script!

[Figure: variable parsing]

  // Maintain the variable relationship between codeType and runType through CODE_TYPE_AND_RUN_TYPE_RELATION in the CodeAndRunTypeUtils tool class
  val CODE_TYPE_AND_RUN_TYPE_RELATION = CommonVars("wds.linkis.codeType.runType.relation", "sql=>sql|hql|jdbc|hive|psql|fql,python=>python|py|pyspark,java=>java,scala=>scala,shell=>sh|shell")
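
The relation string can be read as a comma-separated list of codeType=>runType|runType entries. That reading is an assumption based on the configuration key name. Under it, the sketch below builds a lookup from each run type back to its code type, so that a jdbc script resolves to sql. CodeRunTypeSketch is a hypothetical class and not the real CodeAndRunTypeUtils.

```java
import java.util.LinkedHashMap;
import java.util.Map;

class CodeRunTypeSketch {
    // Default value copied from wds.linkis.codeType.runType.relation above.
    static final String RELATION =
            "sql=>sql|hql|jdbc|hive|psql|fql,python=>python|py|pyspark,java=>java,scala=>scala,shell=>sh|shell";

    // Inverts "codeType=>runTypeA|runTypeB" entries into a runType -> codeType lookup.
    static Map<String, String> runTypeToCodeType(String relation) {
        Map<String, String> result = new LinkedHashMap<>();
        for (String entry : relation.split(",")) {
            String[] parts = entry.split("=>");
            if (parts.length != 2) {
                continue; // skip malformed entries
            }
            String codeType = parts[0].trim();
            for (String runType : parts[1].split("\\|")) {
                result.put(runType.trim(), codeType);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, String> lookup = runTypeToCodeType(RELATION);
        System.out.println(lookup.get("jdbc"));
        System.out.println(lookup.get("py"));
    }
}
```

A new SQL-like engine would therefore be added to the right-hand side of the sql entry, which is where jdbc appears in the default value.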

Refer to PR: https://github.com/apache/linkis/pull/2047

web/src/dss/module/resourceSimple/engine.vue

  methods: {
    calssifyName(params) {
      switch (params) {
        case 'jdbc':
          return 'JDBC';
        ......
      }
    },
    // Icon filtering
    supportIcon(item) {
      const supportTypes = [
        ......
        { rule: 'jdbc', logo: 'fi-jdbc' },
      ];
    }
  }

The final effect presented to the user:

[Figure: JDBC type engine]

An example command for JDBC engine module compilation is as follows:

  cd /linkis-project/linkis-engineconn-plugins/jdbc
  mvn clean install -DskipTests

When the complete project is compiled, the new engine is not added to the final tar.gz archive by default. If necessary, modify the following file:

linkis-dist/package/src/main/assembly/assembly.xml

  <!--jdbc-->
  <fileSets>
    ......
    <fileSet>
      <directory>
        ../../linkis-engineconn-plugins/jdbc/target/out/
      </directory>
      <outputDirectory>lib/linkis-engineconn-plugins/</outputDirectory>
      <includes>
        <include>**/*</include>
      </includes>
    </fileSet>
  </fileSets>

Then run the compile command in the project root directory:

  mvn clean install -DskipTests

After successful compilation, find out.zip either inside linkis-dist/target/apache-linkis-1.x.x-incubating-bin.tar.gz or in the linkis-engineconn-plugins/jdbc/target/ directory.

Upload the out.zip file to the Linkis deployment node and extract it into /lib/linkis-engineconn-plugins/ under the Linkis installation directory:

[Figure: engine installation]

Don’t forget to delete out.zip after decompression. At this point, the engine compilation and installation are complete.

Select Add Engine in the console

[Figure: add engine]

If you want to support engine parameter configuration on the management console, you can modify the database according to the JDBC engine SQL example.

After the engine is installed, you need to configure the engine's database entries before the new engine code can be run. The JDBC engine is used as the example here; adapt the SQL to the new engine you have implemented.

The SQL reference is as follows:

  SET @JDBC_LABEL="jdbc-4";
  SET @JDBC_ALL=CONCAT('*-*,',@JDBC_LABEL);
  SET @JDBC_IDE=CONCAT('*-IDE,',@JDBC_LABEL);
  SET @JDBC_NODE=CONCAT('*-nodeexecution,',@JDBC_LABEL);

  -- JDBC
  INSERT INTO `linkis_ps_configuration_config_key` (`key`, `description`, `name`, `default_value`, `validate_type`, `validate_range`, `is_hidden`, `is_advanced`, `level`, `treeName`, `engine_conn_type`) VALUES ('wds.linkis.rm.instance', '范围:1-20,单位:个', 'jdbc引擎最大并发数', '2', 'NumInterval', '[1,20]', '0', '0', '1', '队列资源', 'jdbc');
  insert into `linkis_ps_configuration_config_key` (`key`, `description`, `name`, `default_value`, `validate_type`, `validate_range`, `is_hidden`, `is_advanced`, `level`, `treeName`, `engine_conn_type`) VALUES ('wds.linkis.jdbc.driver', '取值范围:对应JDBC驱动名称', 'jdbc驱动名称','', 'None', '', '0', '0', '1', '数据源配置', 'jdbc');
  insert into `linkis_ps_configuration_config_key` (`key`, `description`, `name`, `default_value`, `validate_type`, `validate_range`, `is_hidden`, `is_advanced`, `level`, `treeName`, `engine_conn_type`) VALUES ('wds.linkis.jdbc.connect.url', '例如:jdbc:hive2://127.0.0.1:10000', 'jdbc连接地址', 'jdbc:hive2://127.0.0.1:10000', 'None', '', '0', '0', '1', '数据源配置', 'jdbc');
  insert into `linkis_ps_configuration_config_key` (`key`, `description`, `name`, `default_value`, `validate_type`, `validate_range`, `is_hidden`, `is_advanced`, `level`, `treeName`, `engine_conn_type`) VALUES ('wds.linkis.jdbc.version', '取值范围:jdbc3,jdbc4', 'jdbc版本','jdbc4', 'OFT', '[\"jdbc3\",\"jdbc4\"]', '0', '0', '1', '数据源配置', 'jdbc');
  insert into `linkis_ps_configuration_config_key` (`key`, `description`, `name`, `default_value`, `validate_type`, `validate_range`, `is_hidden`, `is_advanced`, `level`, `treeName`, `engine_conn_type`) VALUES ('wds.linkis.jdbc.connect.max', '范围:1-20,单位:个', 'jdbc引擎最大连接数', '10', 'NumInterval', '[1,20]', '0', '0', '1', '数据源配置', 'jdbc');
  insert into `linkis_ps_configuration_config_key` (`key`, `description`, `name`, `default_value`, `validate_type`, `validate_range`, `is_hidden`, `is_advanced`, `level`, `treeName`, `engine_conn_type`) VALUES ('wds.linkis.jdbc.auth.type', '取值范围:SIMPLE,USERNAME,KERBEROS', 'jdbc认证方式', 'USERNAME', 'OFT', '[\"SIMPLE\",\"USERNAME\",\"KERBEROS\"]', '0', '0', '1', '用户配置', 'jdbc');
  insert into `linkis_ps_configuration_config_key` (`key`, `description`, `name`, `default_value`, `validate_type`, `validate_range`, `is_hidden`, `is_advanced`, `level`, `treeName`, `engine_conn_type`) VALUES ('wds.linkis.jdbc.username', 'username', '数据库连接用户名', '', 'None', '', '0', '0', '1', '用户配置', 'jdbc');
  insert into `linkis_ps_configuration_config_key` (`key`, `description`, `name`, `default_value`, `validate_type`, `validate_range`, `is_hidden`, `is_advanced`, `level`, `treeName`, `engine_conn_type`) VALUES ('wds.linkis.jdbc.password', 'password', '数据库连接密码', '', 'None', '', '0', '0', '1', '用户配置', 'jdbc');
  insert into `linkis_ps_configuration_config_key` (`key`, `description`, `name`, `default_value`, `validate_type`, `validate_range`, `is_hidden`, `is_advanced`, `level`, `treeName`, `engine_conn_type`) VALUES ('wds.linkis.jdbc.principal', '例如:hadoop/host@KDC.COM', '用户principal', '', 'None', '', '0', '0', '1', '用户配置', 'jdbc');
  insert into `linkis_ps_configuration_config_key` (`key`, `description`, `name`, `default_value`, `validate_type`, `validate_range`, `is_hidden`, `is_advanced`, `level`, `treeName`, `engine_conn_type`) VALUES ('wds.linkis.jdbc.keytab.location', '例如:/data/keytab/hadoop.keytab', '用户keytab文件路径', '', 'None', '', '0', '0', '1', '用户配置', 'jdbc');
  insert into `linkis_ps_configuration_config_key` (`key`, `description`, `name`, `default_value`, `validate_type`, `validate_range`, `is_hidden`, `is_advanced`, `level`, `treeName`, `engine_conn_type`) VALUES ('wds.linkis.jdbc.proxy.user.property', '例如:hive.server2.proxy.user', '用户代理配置', '', 'None', '', '0', '0', '1', '用户配置', 'jdbc');
  INSERT INTO `linkis_ps_configuration_config_key` (`key`, `description`, `name`, `default_value`, `validate_type`, `validate_range`, `is_hidden`, `is_advanced`, `level`, `treeName`, `engine_conn_type`) VALUES ('wds.linkis.engineconn.java.driver.cores', '取值范围:1-8,单位:个', 'jdbc引擎初始化核心个数', '1', 'NumInterval', '[1,8]', '0', '0', '1', 'jdbc引擎设置', 'jdbc');
  INSERT INTO `linkis_ps_configuration_config_key` (`key`, `description`, `name`, `default_value`, `validate_type`, `validate_range`, `is_hidden`, `is_advanced`, `level`, `treeName`, `engine_conn_type`) VALUES ('wds.linkis.engineconn.java.driver.memory', '取值范围:1-8,单位:G', 'jdbc引擎初始化内存大小', '1g', 'Regex', '^([1-8])(G|g)$', '0', '0', '1', 'jdbc引擎设置', 'jdbc');

  insert into `linkis_cg_manager_label` (`label_key`, `label_value`, `label_feature`, `label_value_size`, `update_time`, `create_time`) VALUES ('combined_userCreator_engineType',@JDBC_ALL, 'OPTIONAL', 2, now(), now());

  insert into `linkis_ps_configuration_key_engine_relation` (`config_key_id`, `engine_type_label_id`)
  (select config.id as `config_key_id`, label.id AS `engine_type_label_id` FROM linkis_ps_configuration_config_key config INNER JOIN linkis_cg_manager_label label ON config.engine_conn_type = 'jdbc' and label_value = @JDBC_ALL);

  insert into `linkis_cg_manager_label` (`label_key`, `label_value`, `label_feature`, `label_value_size`, `update_time`, `create_time`) VALUES ('combined_userCreator_engineType',@JDBC_IDE, 'OPTIONAL', 2, now(), now());
  insert into `linkis_cg_manager_label` (`label_key`, `label_value`, `label_feature`, `label_value_size`, `update_time`, `create_time`) VALUES ('combined_userCreator_engineType',@JDBC_NODE, 'OPTIONAL', 2, now(), now());

  select @label_id := id from linkis_cg_manager_label where `label_value` = @JDBC_IDE;
  insert into linkis_ps_configuration_category (`label_id`, `level`) VALUES (@label_id, 2);

  select @label_id := id from linkis_cg_manager_label where `label_value` = @JDBC_NODE;
  insert into linkis_ps_configuration_category (`label_id`, `level`) VALUES (@label_id, 2);

  -- jdbc default configuration
  insert into `linkis_ps_configuration_config_value` (`config_key_id`, `config_value`, `config_label_id`)
  (select `relation`.`config_key_id` AS `config_key_id`, '' AS `config_value`, `relation`.`engine_type_label_id` AS `config_label_id` FROM linkis_ps_configuration_key_engine_relation relation INNER JOIN linkis_cg_manager_label label ON relation.engine_type_label_id = label.id AND label.label_value = @JDBC_ALL);
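
The three label values built with CONCAT in the SQL above follow one pattern, user-creator,engineType-version, which matches the label key combined_userCreator_engineType. A minimal sketch of that composition, with the hypothetical helper combinedLabel, looks like this:

```java
class CombinedLabelSketch {
    // Composes "<user>-<creator>,<engineType>-<version>", the pattern used by the SQL variables above.
    static String combinedLabel(String user, String creator, String engineType, String version) {
        return user + "-" + creator + "," + engineType + "-" + version;
    }

    public static void main(String[] args) {
        // Same values as @JDBC_ALL, @JDBC_IDE and @JDBC_NODE in the SQL above.
        System.out.println(combinedLabel("*", "*", "jdbc", "4"));
        System.out.println(combinedLabel("*", "IDE", "jdbc", "4"));
        System.out.println(combinedLabel("*", "nodeexecution", "jdbc", "4"));
    }
}
```

For a different engine, only the engine type and version parts change, which is why the SQL keeps them in the single variable @JDBC_LABEL.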

If you want to reset the engine's database configuration data, the reference SQL is as follows. Modify and use it as needed:

  -- Clear the initialization data of the jdbc engine
  SET @JDBC_LABEL="jdbc-4";
  SET @JDBC_ALL=CONCAT('*-*,',@JDBC_LABEL);
  SET @JDBC_IDE=CONCAT('*-IDE,',@JDBC_LABEL);
  SET @JDBC_NODE=CONCAT('*-nodeexecution,',@JDBC_LABEL);

  delete from `linkis_ps_configuration_config_value` where `config_label_id` in
  (select `relation`.`engine_type_label_id` AS `config_label_id` FROM `linkis_ps_configuration_key_engine_relation` relation INNER JOIN `linkis_cg_manager_label` label ON relation.engine_type_label_id = label.id AND label.label_value = @JDBC_ALL);

  delete from `linkis_ps_configuration_key_engine_relation`
  where `engine_type_label_id` in
  (select label.id FROM `linkis_ps_configuration_config_key` config
  INNER JOIN `linkis_cg_manager_label` label
  ON config.engine_conn_type = 'jdbc' and label_value = @JDBC_ALL);

  delete from `linkis_ps_configuration_category`
  where `label_id` in (select id from `linkis_cg_manager_label` where `label_value` in(@JDBC_IDE, @JDBC_NODE));

  delete from `linkis_ps_configuration_config_key` where `engine_conn_type` = 'jdbc';

  delete from `linkis_cg_manager_label` where `label_value` in (@JDBC_ALL, @JDBC_IDE, @JDBC_NODE);

Final effect:

[Figure: JDBC engine]

After this configuration, when linkis-cli and Scripts submit an engine script, the engine label information and the data source connection information can be matched correctly, and the newly added engine can then be pulled up.

If you use the Scripts function of DSS, you also need to make some small changes to the front-end files under web in the dss project. The purpose of the changes is to support creating, opening, and executing JDBC engine script types in Scripts, as well as to add the corresponding engine icons, fonts, etc.

web/src/common/config/scriptis.js

  {
    rule: /\.jdbc$/i,
    lang: 'hql',
    executable: true,
    application: 'jdbc',
    runType: 'jdbc',
    ext: '.jdbc',
    scriptType: 'jdbc',
    abbr: 'jdbc',
    logo: 'fi-jdbc',
    color: '#444444',
    isCanBeNew: true,
    label: 'JDBC',
    isCanBeOpen: true
  },
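
The rule field is a case-insensitive suffix pattern. Assuming the front end selects a script type by testing the file name against each rule (that usage is not shown in this document), the matching behaviour can be checked with the equivalent Java pattern. ScriptRuleSketch is a hypothetical class used only for this illustration.

```java
import java.util.regex.Pattern;

class ScriptRuleSketch {
    // Java equivalent of the JavaScript rule /\.jdbc$/i shown above.
    static final Pattern JDBC_RULE = Pattern.compile("\\.jdbc$", Pattern.CASE_INSENSITIVE);

    // True when the file name ends with ".jdbc" in any letter case.
    static boolean isJdbcScript(String fileName) {
        return JDBC_RULE.matcher(fileName).find();
    }

    public static void main(String[] args) {
        System.out.println(isJdbcScript("report.jdbc"));
        System.out.println(isJdbcScript("report.jdbc.bak"));
    }
}
```

The escaped dot and the end anchor are what keep names such as report.jdbc.bak or reportjdbc from being treated as JDBC scripts.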

web/src/apps/scriptis/module/workSidebar/workSidebar.vue

  copyName() {
    let typeArr = ['......', 'jdbc']
  }

web/src/apps/scriptis/module/workbench/title.vue

  data() {
    return {
      isHover: false,
      iconColor: {
        'fi-jdbc': '#444444',
      },
    };
  },

web/src/apps/scriptis/module/workbench/modal.js

  let logoList = [
    { rule: /\.jdbc$/i, logo: 'fi-jdbc' },
  ];

web/src/components/tree/support.js

  export const supportTypes = [
    // Probably useless here
    { rule: /\.jdbc$/i, logo: 'fi-jdbc' },
  ]

Engine icon display

web/src/dss/module/resourceSimple/engine.vue

  methods: {
    calssifyName(params) {
      switch (params) {
        case 'jdbc':
          return 'JDBC';
        ......
      }
    },
    // Icon filtering
    supportIcon(item) {
      const supportTypes = [
        ......
        { rule: 'jdbc', logo: 'fi-jdbc' },
      ];
    }
  }

web/src/dss/assets/projectIconFont/iconfont.css

  .fi-jdbc:before {
    content: "\e75e";
  }

The control here should be:

[Figure: engine icon]

Find an svg file of the engine icon

web/src/components/svgIcon/svg/fi-jdbc.svg

If the new engine is to be contributed to the community in the future, confirm the open source license of the svg icons, fonts, etc. used for the new engine, or obtain a copyright license for them.

The final result:

[Figure: workflow adaptation]

Save the definition data of the newly added JDBC engine in the dss_workflow_node table, refer to SQL:

  -- Engine task node basic information definition
  insert into `dss_workflow_node` (`id`, `name`, `appconn_name`, `node_type`, `jump_url`, `support_jump`, `submit_to_scheduler`, `enable_copy`, `should_creation_before_node`, `icon`) values('18','jdbc','-1','linkis.jdbc.jdbc',NULL,'1','1','1','0','svg file');
  -- The svg file corresponds to the new engine task node icon

  -- Classification and division of engine task nodes
  insert into `dss_workflow_node_to_group`(`node_id`,`group_id`) values (18, 2);

  -- Basic information (parameter attribute) binding of the engine task node
  INSERT INTO `dss_workflow_node_to_ui`(`workflow_node_id`,`ui_id`) VALUES (18,45);

  -- The basic information related to the engine task node is defined in the dss_workflow_node_ui table and is then displayed as a form on the right side of the figure above. You can add other basic information for the new engine, and it will be rendered automatically by the form on the right.

web/src/apps/workflows/service/nodeType.js

  import jdbc from '../module/process/images/newIcon/jdbc.svg';
  const NODETYPE = {
    ......
    JDBC: 'linkis.jdbc.jdbc',
  }
  const ext = {
    ......
    [NODETYPE.JDBC]: 'jdbc',
  }
  const NODEICON = {
    [NODETYPE.JDBC]: {
      icon: jdbc,
      class: {'jdbc': true}
    },
  }

Add the icon of the new engine in the web/src/apps/workflows/module/process/images/newIcon/ directory

web/src/apps/workflows/module/process/images/newIcon/jdbc

Likewise, when contributing to the community, please consider the license or copyright of the svg file.

The above content records the implementation process of the new engine, as well as some additional engine configurations that need to be done. At present, the expansion process of a new engine is still relatively cumbersome, and it is hoped that the expansion and installation of the new engine can be optimized in subsequent versions.