如何实现新引擎

实现一个新的引擎其实就是实现一个新的EngineConnPlugin(ECP)引擎插件。具体步骤如下:

maven依赖

  1. <dependency>
  2. <groupId>org.apache.linkis</groupId>
  3. <artifactId>linkis-engineconn-plugin-core</artifactId>
  4. <version>${linkis.version}</version>
  5. </dependency>
  6. <!-- 以及一些其他所需依赖的maven配置 -->
  • EngineConnPlugin: 启动EngineConn时,先找到对应的EngineConnPlugin类,以此为入口,获取其它核心接口的实现,是必须实现的主要接口。

  • EngineConnFactory: 实现如何启动一个引擎连接器,和如何启动一个引擎执行器的逻辑,是必须实现的接口。

    • 实现createEngineConn方法:返回一个EngineConn对象,其中,getEngine返回一个封装了与底层引擎连接信息的对象,同时包含Engine类型信息。
    • 对于只支持单一计算场景的引擎,继承SingleExecutorEngineConnFactory,实现createExecutor,返回对应的Executor。
    • 对于支持多计算场景的引擎,需要继承MultiExecutorEngineConnFactory,并为每种计算类型实现一个ExecutorFactory。EngineConnPlugin会通过反射获取所有的ExecutorFactory,根据实际情况返回对应的Executor。
  • EngineConnResourceFactory: 用于限定启动一个引擎所需要的资源,引擎启动前,将以此为依 据 向 Linkis Manager 申 请 资 源。非必须,默认可以使用GenericEngineResourceFactory。

  • EngineLaunchBuilder: 用于封装EngineConnManager可以解析成启动命令的必要信息。非必须,可以直接继承JavaProcessEngineConnLaunchBuilder。

Executor为执行器,作为真正的计算场景执行器,是实际的计算逻辑执行单元,也是对引擎各种具体能力的抽象,提供加锁、访问状态、获取日志等多种不同的服务。并根据实际的使用需要,Linkis默认提供以下的派生Executor基类,其类名和主要作用如下:

  • SensibleExecutor:
    • Executor存在多种状态,允许Executor切换状态
    • Executor切换状态后,允许做通知等操作
  • YarnExecutor: 指Yarn类型的引擎,能够获取得到applicationId和applicationURL和队列。
  • ResourceExecutor: 指引擎具备资源动态变化的能力,配合提供requestExpectedResource方法,用于每次希望更改资源时,先向RM申请新的资源;而resourceUpdate方法,用于每次引擎实际使用资源发生变化时,向RM汇报资源情况。
  • AccessibleExecutor: 是一个非常重要的Executor基类。如果用户的Executor继承了该基类,则表示该Engine是可以被访问的。这里需区分SensibleExecutor的state()和 AccessibleExecutor 的 getEngineStatus()方法:state()用于获取引擎状态,getEngineStatus()会获取引擎的状态、负载、并发等基础指标Metric数据。
  • 同时,如果继承了 AccessibleExecutor,会触发Engine 进程实例化多个EngineReceiver方法。EngineReceiver用于处理Entrance、EM和LinkisMaster的RPC请求,使得该引擎变成了一个可被访问的引擎,用户如果有特殊的RPC需求,可以通过实现RPCService接口,进而实现与AccessibleExecutor通信。
  • ExecutableExecutor: 是一个常驻型的Executor基类,常驻型的Executor包含:生产中心的Streaming应用、提交给Schedulis后指定要以独立模式运行的脚步、业务用户的业务应用等。
  • StreamingExecutor: Streaming为流式应用,继承自ExecutableExecutor,需具备诊断、do checkpoint、采集作业信息、监控告警的能力。
  • ComputationExecutor: 是常用的交互式引擎Executor,处理交互式执行任务,并且具备状态查询、任务kill等交互式能力。
  • ConcurrentComputationExecutor: 用户并发引擎Executor,常用于JDBC类型引擎,执行脚本时,由管理员账户拉起引擎实例,所有用户共享引擎实例。

本章节以JDBC引擎举例,详解新引擎的实现过程,包括引擎代码的编译、安装、数据库配置,管理台引擎标签适配,以及Scripts中新引擎脚本类型扩展和工作流新引擎的任务节点扩展等。

JDBC引擎中的核心类JDBCEngineConnExecutor继承的抽象类是ConcurrentComputationExecutor,计算类引擎中的核心类XXXEngineConnExecutor继承的抽象类是ComputationExecutor。这导致两者最大的一个区别是:JDBC引擎实例由管理员用户启动,被所有用户共享,以提高机器资源利用率;而计算引擎类型的脚本在提交时,每个用户下会启动一个引擎实例,用户间引擎实例互相隔离。这个在此处暂不细说,因为无论是并发引擎还是计算引擎,下文提到的额外修改流程应是一致的。

相应的,如果你新增的引擎是并发引擎,那么你需要关注下这个类:AMConfiguration.scala,如果你新增的引擎是计算类引擎,则可忽略。

  1. object AMConfiguration {
  2. // 如果你的引擎是多用户并发引擎,那么这个配置项需要关注下
  3. val MULTI_USER_ENGINE_TYPES = CommonVars("wds.linkis.multi.user.engine.types", "jdbc,ck,es,io_file,appconn")
  4. private def getDefaultMultiEngineUser(): String = {
  5. // 此处应该是为了设置并发引擎拉起时的启动用户,默认jvmUser即是引擎服务Java进程的启动用户
  6. val jvmUser = Utils.getJvmUser
  7. s"""{jdbc:"$jvmUser", presto: "$jvmUser" es: "$jvmUser", ck:"$jvmUser", appconn:"$jvmUser", io_file:"root"}"""
  8. }
  9. }

实现ComputationSingleExecutorEngineConnFactory接口的类JDBCEngineConnFactory中,下面两个方法需要实现:

  1. override protected def getEngineConnType: EngineType = EngineType.JDBC
  2. override protected def getRunType: RunType = RunType.JDBC

因此需要在EngineType和RunType中增加JDBC对应的变量。

  1. // EngineType.scala中类似已存在引擎的变量定义,增加JDBC相关变量或代码
  2. object EngineType extends Enumeration with Logging {
  3. val JDBC = Value("jdbc")
  4. }
  5. def mapStringToEngineType(str: String): EngineType = str match {
  6. case _ if JDBC.toString.equalsIgnoreCase(str) => JDBC
  7. }
  8. // RunType.scla中
  9. object RunType extends Enumeration {
  10. val JDBC = Value("jdbc")
  11. }
  1. // 在LabelCommonConfig中增加JDBC的version配置
  2. public class LabelCommonConfig {
  3. public final static CommonVars<String> JDBC_ENGINE_VERSION = CommonVars.apply("wds.linkis.jdbc.engine.version", "4");
  4. }
  5. // 在EngineTypeLabelCreator的init方法中补充jdbc的匹配逻辑
  6. // 如果这一步不做,代码提交到引擎上时,引擎标签信息中会缺少版本号
  7. public class EngineTypeLabelCreator {
  8. private static void init() {
  9. defaultVersion.put(EngineType.JDBC().toString(), LabelCommonConfig.JDBC_ENGINE_VERSION.getValue());
  10. }
  11. }

关注配置项:wds.linkis.storage.file.type

  1. object LinkisStorageConf{
  2. val FILE_TYPE = CommonVars("wds.linkis.storage.file.type", "dolphin,sql,scala,py,hql,python,out,log,text,sh,jdbc,ngql,psql,fql").getValue
  3. }

如果这个操作不做,JDBC的脚本中变量不能被正常存储和解析,脚本中直接使用${变量}时代码会执行失败!

变量解析

  1. // 通过CodeAndRunTypeUtils工具类中的CODE_TYPE_AND_RUN_TYPE_RELATION变量来维护codeType和runType间的对应关系
  2. val CODE_TYPE_AND_RUN_TYPE_RELATION = CommonVars("wds.linkis.codeType.runType.relation", "sql=>sql|hql|jdbc|hive|psql|fql,python=>python|py|pyspark,java=>java,scala=>scala,shell=>sh|shell")

参考PR:https://github.com/apache/linkis/pull/2047

linkis-web/src/dss/module/resourceSimple/engine.vue

  1. methods: {
  2. calssifyName(params) {
  3. switch (params) {
  4. case 'jdbc':
  5. return 'JDBC';
  6. ......
  7. }
  8. }
  9. // 图标过滤
  10. supportIcon(item) {
  11. const supportTypes = [
  12. ......
  13. { rule: 'jdbc', logo: 'fi-jdbc' },
  14. ];
  15. }
  16. }

最终呈现给用户的效果:

JDBC类型引擎

JDBC引擎模块编译的示例命令如下:

  1. cd /linkis-project/linkis-engineconn-pluginsengineconn-plugins/jdbc
  2. mvn clean install -DskipTests

编译完整项目时,新增引擎默认不会加到最终的tar.gz压缩包中,如果需要,请修改如下文件:

linkis-dist/src/main/assembly/distribution.xml

  1. <!--jdbc-->
  2. <fileSets>
  3. ......
  4. <fileSet>
  5. <directory>
  6. ../linkis-engineconn-plugins/jdbc/target/out/
  7. </directory>
  8. <outputDirectory>linkis-package/lib/linkis-engineconn-plugins/</outputDirectory>
  9. <includes>
  10. <include>**/*</include>
  11. </includes>
  12. </fileSet>
  13. </fileSets>

然后对在项目根目录运行编译命令:

  1. mvn clean install -DskipTests

编译成功后在linkis-dist/target/apache-linkis-1.x.x-bin.tar.gz和linkis-engineconn-plugins/jdbc/target/目录下找到out.zip。

上传out.zip文件到Linkis的部署节点,解压缩到Linkis安装目录/lib/linkis-engineconn-plugins/下面:

引擎安装

解压后别忘记删除out.zip,至此引擎编译和安装完成。

在管理台选择添加引擎

添加引擎

如果您希望在管理台支持引擎参数配置,可以按照JDBC引擎SQL示例修改数据库。

此处以JDBC引擎为例,引擎安装完之后,要想运行新的引擎代码,还需对引擎进行数据库配置,以JDBC引擎为例,按照你自己实现的新引擎的情况,请按需修改。

SQL参考如下:

  1. SET @JDBC_LABEL="jdbc-4";
  2. SET @JDBC_ALL=CONCAT('*-*,',@JDBC_LABEL);
  3. SET @JDBC_IDE=CONCAT('*-IDE,',@JDBC_LABEL);
  4. SET @JDBC_NODE=CONCAT('*-nodeexecution,',@JDBC_LABEL);
  5. -- JDBC
  6. INSERT INTO `linkis_ps_configuration_config_key` (`key`, `description`, `name`, `default_value`, `validate_type`, `validate_range`, `is_hidden`, `is_advanced`, `level`, `treeName`, `engine_conn_type`) VALUES ('wds.linkis.rm.instance', '范围:1-20,单位:个', 'jdbc引擎最大并发数', '2', 'NumInterval', '[1,20]', '0', '0', '1', '队列资源', 'jdbc');
  7. insert into `linkis_ps_configuration_config_key` (`key`, `description`, `name`, `default_value`, `validate_type`, `validate_range`, `is_hidden`, `is_advanced`, `level`, `treeName`, `engine_conn_type`) VALUES ('wds.linkis.jdbc.driver', '取值范围:对应JDBC驱动名称', 'jdbc驱动名称','', 'None', '', '0', '0', '1', '数据源配置', 'jdbc');
  8. insert into `linkis_ps_configuration_config_key` (`key`, `description`, `name`, `default_value`, `validate_type`, `validate_range`, `is_hidden`, `is_advanced`, `level`, `treeName`, `engine_conn_type`) VALUES ('wds.linkis.jdbc.connect.url', '例如:jdbc:hive2://127.0.0.1:10000', 'jdbc连接地址', 'jdbc:hive2://127.0.0.1:10000', 'None', '', '0', '0', '1', '数据源配置', 'jdbc');
  9. insert into `linkis_ps_configuration_config_key` (`key`, `description`, `name`, `default_value`, `validate_type`, `validate_range`, `is_hidden`, `is_advanced`, `level`, `treeName`, `engine_conn_type`) VALUES ('wds.linkis.jdbc.version', '取值范围:jdbc3,jdbc4', 'jdbc版本','jdbc4', 'OFT', '[\"jdbc3\",\"jdbc4\"]', '0', '0', '1', '数据源配置', 'jdbc');
  10. insert into `linkis_ps_configuration_config_key` (`key`, `description`, `name`, `default_value`, `validate_type`, `validate_range`, `is_hidden`, `is_advanced`, `level`, `treeName`, `engine_conn_type`) VALUES ('wds.linkis.jdbc.connect.max', '范围:1-20,单位:个', 'jdbc引擎最大连接数', '10', 'NumInterval', '[1,20]', '0', '0', '1', '数据源配置', 'jdbc');
  11. insert into `linkis_ps_configuration_config_key` (`key`, `description`, `name`, `default_value`, `validate_type`, `validate_range`, `is_hidden`, `is_advanced`, `level`, `treeName`, `engine_conn_type`) VALUES ('wds.linkis.jdbc.auth.type', '取值范围:SIMPLE,USERNAME,KERBEROS', 'jdbc认证方式', 'USERNAME', 'OFT', '[\"SIMPLE\",\"USERNAME\",\"KERBEROS\"]', '0', '0', '1', '用户配置', 'jdbc');
  12. insert into `linkis_ps_configuration_config_key` (`key`, `description`, `name`, `default_value`, `validate_type`, `validate_range`, `is_hidden`, `is_advanced`, `level`, `treeName`, `engine_conn_type`) VALUES ('wds.linkis.jdbc.username', 'username', '数据库连接用户名', '', 'None', '', '0', '0', '1', '用户配置', 'jdbc');
  13. insert into `linkis_ps_configuration_config_key` (`key`, `description`, `name`, `default_value`, `validate_type`, `validate_range`, `is_hidden`, `is_advanced`, `level`, `treeName`, `engine_conn_type`) VALUES ('wds.linkis.jdbc.password', 'password', '数据库连接密码', '', 'None', '', '0', '0', '1', '用户配置', 'jdbc');
  14. insert into `linkis_ps_configuration_config_key` (`key`, `description`, `name`, `default_value`, `validate_type`, `validate_range`, `is_hidden`, `is_advanced`, `level`, `treeName`, `engine_conn_type`) VALUES ('wds.linkis.jdbc.principal', '例如:hadoop/host@KDC.COM', '用户principal', '', 'None', '', '0', '0', '1', '用户配置', 'jdbc');
  15. insert into `linkis_ps_configuration_config_key` (`key`, `description`, `name`, `default_value`, `validate_type`, `validate_range`, `is_hidden`, `is_advanced`, `level`, `treeName`, `engine_conn_type`) VALUES ('wds.linkis.jdbc.keytab.location', '例如:/data/keytab/hadoop.keytab', '用户keytab文件路径', '', 'None', '', '0', '0', '1', '用户配置', 'jdbc');
  16. insert into `linkis_ps_configuration_config_key` (`key`, `description`, `name`, `default_value`, `validate_type`, `validate_range`, `is_hidden`, `is_advanced`, `level`, `treeName`, `engine_conn_type`) VALUES ('wds.linkis.jdbc.proxy.user.property', '例如:hive.server2.proxy.user', '用户代理配置', '', 'None', '', '0', '0', '1', '用户配置', 'jdbc');
  17. INSERT INTO `linkis_ps_configuration_config_key` (`key`, `description`, `name`, `default_value`, `validate_type`, `validate_range`, `is_hidden`, `is_advanced`, `level`, `treeName`, `engine_conn_type`) VALUES ('wds.linkis.engineconn.java.driver.cores', '取值范围:1-8,单位:个', 'jdbc引擎初始化核心个数', '1', 'NumInterval', '[1,8]', '0', '0', '1', 'jdbc引擎设置', 'jdbc');
  18. INSERT INTO `linkis_ps_configuration_config_key` (`key`, `description`, `name`, `default_value`, `validate_type`, `validate_range`, `is_hidden`, `is_advanced`, `level`, `treeName`, `engine_conn_type`) VALUES ('wds.linkis.engineconn.java.driver.memory', '取值范围:1-8,单位:G', 'jdbc引擎初始化内存大小', '1g', 'Regex', '^([1-8])(G|g)$', '0', '0', '1', 'jdbc引擎设置', 'jdbc');
  19. insert into `linkis_cg_manager_label` (`label_key`, `label_value`, `label_feature`, `label_value_size`, `update_time`, `create_time`) VALUES ('combined_userCreator_engineType',@JDBC_ALL, 'OPTIONAL', 2, now(), now());
  20. insert into `linkis_ps_configuration_key_engine_relation` (`config_key_id`, `engine_type_label_id`)
  21. (select config.id as `config_key_id`, label.id AS `engine_type_label_id` FROM linkis_ps_configuration_config_key config INNER JOIN linkis_cg_manager_label label ON config.engine_conn_type = 'jdbc' and label_value = @JDBC_ALL);
  22. insert into `linkis_cg_manager_label` (`label_key`, `label_value`, `label_feature`, `label_value_size`, `update_time`, `create_time`) VALUES ('combined_userCreator_engineType',@JDBC_IDE, 'OPTIONAL', 2, now(), now());
  23. insert into `linkis_cg_manager_label` (`label_key`, `label_value`, `label_feature`, `label_value_size`, `update_time`, `create_time`) VALUES ('combined_userCreator_engineType',@JDBC_NODE, 'OPTIONAL', 2, now(), now());
  24. select @label_id := id from linkis_cg_manager_label where `label_value` = @JDBC_IDE;
  25. insert into linkis_ps_configuration_category (`label_id`, `level`) VALUES (@label_id, 2);
  26. select @label_id := id from linkis_cg_manager_label where `label_value` = @JDBC_NODE;
  27. insert into linkis_ps_configuration_category (`label_id`, `level`) VALUES (@label_id, 2);
  28. -- jdbc default configuration
  29. insert into `linkis_ps_configuration_config_value` (`config_key_id`, `config_value`, `config_label_id`)
  30. (select `relation`.`config_key_id` AS `config_key_id`, '' AS `config_value`, `relation`.`engine_type_label_id` AS `config_label_id` FROM linkis_ps_configuration_key_engine_relation relation INNER JOIN linkis_cg_manager_label label ON relation.engine_type_label_id = label.id AND label.label_value = @JDBC_ALL);

如果想重置引擎的数据库配置数据,参考文件如下,请按需进行修改使用:

  1. -- 清除jdbc引擎的初始化数据
  2. SET @JDBC_LABEL="jdbc-4";
  3. SET @JDBC_ALL=CONCAT('*-*,',@JDBC_LABEL);
  4. SET @JDBC_IDE=CONCAT('*-IDE,',@JDBC_LABEL);
  5. SET @JDBC_NODE=CONCAT('*-nodeexecution,',@JDBC_LABEL);
  6. delete from `linkis_ps_configuration_config_value` where `config_label_id` in
  7. (select `relation`.`engine_type_label_id` AS `config_label_id` FROM `linkis_ps_configuration_key_engine_relation` relation INNER JOIN `linkis_cg_manager_label` label ON relation.engine_type_label_id = label.id AND label.label_value = @JDBC_ALL);
  8. delete from `linkis_ps_configuration_key_engine_relation`
  9. where `engine_type_label_id` in
  10. (select label.id FROM `linkis_ps_configuration_config_key` config
  11. INNER JOIN `linkis_cg_manager_label` label
  12. ON config.engine_conn_type = 'jdbc' and label_value = @JDBC_ALL);
  13. delete from `linkis_ps_configuration_category`
  14. where `label_id` in (select id from `linkis_cg_manager_label` where `label_value` in(@JDBC_IDE, @JDBC_NODE));
  15. delete from `linkis_ps_configuration_config_key` where `engine_conn_type` = 'jdbc';
  16. delete from `linkis_cg_manager_label` where `label_value` in (@JDBC_ALL, @JDBC_IDE, @JDBC_NODE);

最终的效果:

JDBC引擎

这样配置完之后,linkis-cli以及Scripts提交引擎脚本时,才能正确匹配到引擎的标签信息和数据源的连接信息,然后才能拉起你新加的引擎。

如果你使用到了DSS的Scripts功能,还需要对dss项目中web的前端文件进行一些小小的改动,改动的目的是为了在Scripts中支持新建、打开、执行JDBC引擎脚本类型,以及实现引擎对应的图标、字体等。

web/src/common/config/scriptis.js

  1. {
  2. rule: /\.jdbc$/i,
  3. lang: 'hql',
  4. executable: true,
  5. application: 'jdbc',
  6. runType: 'jdbc',
  7. ext: '.jdbc',
  8. scriptType: 'jdbc',
  9. abbr: 'jdbc',
  10. logo: 'fi-jdbc',
  11. color: '#444444',
  12. isCanBeNew: true,
  13. label: 'JDBC',
  14. isCanBeOpen: true
  15. },

web/src/apps/scriptis/module/workSidebar/workSidebar.vue

  1. copyName() {
  2. let typeArr = ['......', 'jdbc']
  3. }

web/src/apps/scriptis/module/workbench/title.vue

  1. data() {
  2. return {
  3. isHover: false,
  4. iconColor: {
  5. 'fi-jdbc': '#444444',
  6. },
  7. };
  8. },

web/src/apps/scriptis/module/workbench/modal.js

  1. let logoList = [
  2. { rule: /\.jdbc$/i, logo: 'fi-jdbc' },
  3. ];

web/src/components/tree/support.js

  1. export const supportTypes = [
  2. // 这里大概没用到
  3. { rule: /\.jdbc$/i, logo: 'fi-jdbc' },
  4. ]

引擎图标展示

web/src/dss/module/resourceSimple/engine.vue

  1. methods: {
  2. calssifyName(params) {
  3. switch (params) {
  4. case 'jdbc':
  5. return 'JDBC';
  6. ......
  7. }
  8. }
  9. // 图标过滤
  10. supportIcon(item) {
  11. const supportTypes = [
  12. ......
  13. { rule: 'jdbc', logo: 'fi-jdbc' },
  14. ];
  15. }
  16. }

web/src/dss/assets/projectIconFont/iconfont.css

  1. .fi-jdbc:before {
  2. content: "\e75e";
  3. }

此处控制的应该是:

引擎图标

找一个引擎图标的svg文件

web/src/components/svgIcon/svg/fi-jdbc.svg

如果新引擎后续需要贡献社区,那么新引擎对应的svg图标、字体等需要确认其所属的开源协议,或获取其版权许可。

最终达成的效果:

工作流适配

在dss_workflow_node表中保存新加JDBC引擎的定义数据,参考SQL:

  1. # 引擎任务节点基本信息定义
  2. insert into `dss_workflow_node` (`id`, `name`, `appconn_name`, `node_type`, `jump_url`, `support_jump`, `submit_to_scheduler`, `enable_copy`, `should_creation_before_node`, `icon`) values('18','jdbc','-1','linkis.jdbc.jdbc',NULL,'1','1','1','0','svg文件');
  3. # svg文件对应新引擎任务节点图标
  4. # 引擎任务节点分类划分
  5. insert into `dss_workflow_node_to_group`(`node_id`,`group_id`) values (18, 2);
  6. # 引擎任务节点的基本信息(参数属性)绑定
  7. INSERT INTO `dss_workflow_node_to_ui`(`workflow_node_id`,`ui_id`) VALUES (18,45);
  8. # 在dss_workflow_node_ui表中定义了引擎任务节点相关的基本信息,然后以表单的形式在上图右侧中展示,你可以为新引擎扩充其他基础信息,然后自动被右侧表单渲染。

web/src/apps/workflows/service/nodeType.js

  1. import jdbc from '../module/process/images/newIcon/jdbc.svg';
  2. const NODETYPE = {
  3. ......
  4. JDBC: 'linkis.jdbc.jdbc',
  5. }
  6. const ext = {
  7. ......
  8. [NODETYPE.JDBC]: 'jdbc',
  9. }
  10. const NODEICON = {
  11. [NODETYPE.JDBC]: {
  12. icon: jdbc,
  13. class: {'jdbc': true}
  14. },
  15. }

在web/src/apps/workflows/module/process/images/newIcon/目录下增加新引擎的图标

web/src/apps/workflows/module/process/images/newIcon/jdbc

同样贡献社区时,请考虑svg文件的lincese或版权。

上述内容记录了新引擎的实现流程,以及额外需要做的一些引擎配置。目前,一个新引擎的扩展流程还是比较繁琐的,希望能在后续版本中,优化新引擎的扩展、以及安装等过程。