ElasticSearch 引擎
本文主要介绍在 Linkis1.X 中,ElasticSearch 引擎的配置、部署和使用。
如果您希望在您的服务器上使用 ElasticSearch 引擎,您需要准备 ElasticSearch 服务并提供连接信息,如 ElasticSearch 集群的连接地址、用户名和密码等
注意: 编译 ElasticSearch 引擎之前需要进行 Linkis 项目全量编译
发布的安装部署包中默认不包含此引擎插件, 你可以按Linkis引擎安装指引部署安装 ,或者按以下流程,手动编译部署
单独编译 ElasticSearch 引擎
${linkis_code_dir}/linkis-engineconn-plugins/elasticsearch/
mvn clean install
将 2.1 步编译出来的引擎包,位于
${linkis_code_dir}/linkis-engineconn-plugins/jdbc/target/out/elasticsearch
上传到服务器的引擎目录下
${LINKIS_HOME}/lib/linkis-engineplugins
并重启linkis-engineplugin(或者通过引擎接口进行刷新)
cd ${LINKIS_HOME}/sbin
sh linkis-daemon.sh restart cg-engineplugin
Linkis1.X是通过标签来进行的,所以需要在我们数据库中插入数据,插入的方式如下文所示。
管理台的配置是按照引擎标签来进行管理的,如果新增的引擎,有配置参数需要配置的话,需要修改对应的表的元数据
linkis_ps_configuration_config_key: 插入引擎的配置参数的key和默认values
linkis_cg_manager_label:插入引擎label如:hive-2.3.3
linkis_ps_configuration_category: 插入引擎的目录关联关系
linkis_ps_configuration_config_value: 插入引擎需要展示的配置
linkis_ps_configuration_key_engine_relation:配置项和引擎的关联关系
-- set variable
SET @ENGINE_LABEL="elasticsearch-7.6.2";
SET @ENGINE_IDE=CONCAT('*-IDE,',@ENGINE_LABEL);
SET @ENGINE_NAME="elasticsearch";
-- add es engine to IDE
insert into `linkis_cg_manager_label` (`label_key`, `label_value`, `label_feature`, `label_value_size`, `update_time`, `create_time`) VALUES ('combined_userCreator_engineType', @ENGINE_IDE, 'OPTIONAL', 2, now(), now());
select @label_id := id from `linkis_cg_manager_label` where label_value = @ENGINE_IDE;
insert into `linkis_ps_configuration_category` (`label_id`, `level`) VALUES (@label_id, 2);
-- insert configuration key
INSERT INTO `linkis_ps_configuration_config_key` (`key`, `description`, `name`, `default_value`, `validate_type`, `validate_range`, `engine_conn_type`, `is_hidden`, `is_advanced`, `level`, `treeName`) VALUES ('linkis.es.cluster', '例如:http://127.0.0.1:9200', '连接地址', 'http://127.0.0.1:9200', 'None', '', @ENGINE_NAME, 0, 0, 1, '数据源配置');
INSERT INTO `linkis_ps_configuration_config_key` (`key`, `description`, `name`, `default_value`, `validate_type`, `validate_range`, `engine_conn_type`, `is_hidden`, `is_advanced`, `level`, `treeName`) VALUES ('linkis.es.datasource', '连接别名', '连接别名', 'hadoop', 'None', '', @ENGINE_NAME, 0, 0, 1, '数据源配置');
INSERT INTO `linkis_ps_configuration_config_key` (`key`, `description`, `name`, `default_value`, `validate_type`, `validate_range`, `engine_conn_type`, `is_hidden`, `is_advanced`, `level`, `treeName`) VALUES ('linkis.es.username', 'username', 'ES集群用户名', '无', 'None', '', @ENGINE_NAME, 0, 0, 1, '数据源配置');
INSERT INTO `linkis_ps_configuration_config_key` (`key`, `description`, `name`, `default_value`, `validate_type`, `validate_range`, `engine_conn_type`, `is_hidden`, `is_advanced`, `level`, `treeName`) VALUES ('linkis.es.password', 'password', 'ES集群密码', '无', 'None', '', @ENGINE_NAME, 0, 0, 1, '数据源配置');
INSERT INTO `linkis_ps_configuration_config_key` (`key`, `description`, `name`, `default_value`, `validate_type`, `validate_range`, `engine_conn_type`, `is_hidden`, `is_advanced`, `level`, `treeName`) VALUES ('linkis.es.auth.cache', '客户端是否缓存认证', '客户端是否缓存认证', 'false', 'None', '', @ENGINE_NAME, 0, 0, 1, '数据源配置');
INSERT INTO `linkis_ps_configuration_config_key` (`key`, `description`, `name`, `default_value`, `validate_type`, `validate_range`, `engine_conn_type`, `is_hidden`, `is_advanced`, `level`, `treeName`) VALUES ('linkis.es.sniffer.enable', '客户端是否开启 sniffer', '客户端是否开启 sniffer', 'false', 'None', '', @ENGINE_NAME, 0, 0, 1, '数据源配置');
INSERT INTO `linkis_ps_configuration_config_key` (`key`, `description`, `name`, `default_value`, `validate_type`, `validate_range`, `engine_conn_type`, `is_hidden`, `is_advanced`, `level`, `treeName`) VALUES ('linkis.es.http.method', '调用方式', 'HTTP请求方式', 'GET', 'None', '', @ENGINE_NAME, 0, 0, 1, '数据源配置');
INSERT INTO `linkis_ps_configuration_config_key` (`key`, `description`, `name`, `default_value`, `validate_type`, `validate_range`, `engine_conn_type`, `is_hidden`, `is_advanced`, `level`, `treeName`) VALUES ('linkis.es.http.endpoint', '/_search', 'JSON 脚本调用的 Endpoint', '/_search', 'None', '', @ENGINE_NAME, 0, 0, 1, '数据源配置');
INSERT INTO `linkis_ps_configuration_config_key` (`key`, `description`, `name`, `default_value`, `validate_type`, `validate_range`, `engine_conn_type`, `is_hidden`, `is_advanced`, `level`, `treeName`) VALUES ('linkis.es.sql.endpoint', '/_sql', 'SQL 脚本调用的 Endpoint', '/_sql', 'None', '', @ENGINE_NAME, 0, 0, 1, '数据源配置');
INSERT INTO `linkis_ps_configuration_config_key` (`key`, `description`, `name`, `default_value`, `validate_type`, `validate_range`, `engine_conn_type`, `is_hidden`, `is_advanced`, `level`, `treeName`) VALUES ('linkis.es.sql.format', 'SQL 脚本调用的模板,%s 替换成 SQL 作为请求体请求Es 集群', '请求体', '{"query":"%s"}', 'None', '', @ENGINE_NAME, 0, 0, 1, '数据源配置');
INSERT INTO `linkis_ps_configuration_config_key` (`key`, `description`, `name`, `default_value`, `validate_type`, `validate_range`, `engine_conn_type`, `is_hidden`, `is_advanced`, `level`, `treeName`) VALUES ('linkis.es.headers.*', '客户端 Headers 配置', '客户端 Headers 配置', '无', 'None', '', @ENGINE_NAME, 0, 0, 1, '数据源配置');
INSERT INTO `linkis_ps_configuration_config_key` (`key`, `description`, `name`, `default_value`, `validate_type`, `validate_range`, `engine_conn_type`, `is_hidden`, `is_advanced`, `level`, `treeName`) VALUES ('linkis.engineconn.concurrent.limit', '引擎最大并发', '引擎最大并发', '100', 'None', '', @ENGINE_NAME, 0, 0, 1, '数据源配置');
-- elasticsearch engine -*
insert into `linkis_ps_configuration_key_engine_relation` (`config_key_id`, `engine_type_label_id`)
(select config.id as config_key_id, label.id AS engine_type_label_id FROM `linkis_ps_configuration_config_key` config
INNER JOIN `linkis_cg_manager_label` label ON config.engine_conn_type = @ENGINE_NAME and label_value = @ENGINE_IDE);
-- elasticsearch engine default configuration
insert into `linkis_ps_configuration_config_value` (`config_key_id`, `config_value`, `config_label_id`)
(select relation.config_key_id AS config_key_id, '' AS config_value, relation.engine_type_label_id AS config_label_id FROM `linkis_ps_configuration_key_engine_relation` relation
INNER JOIN `linkis_cg_manager_label` label ON relation.engine_type_label_id = label.id AND label.label_value = @ENGINE_IDE);
配置 | 默认值 | 是否必须 | 说明 |
---|---|---|---|
linkis.es.cluster | 127.0.0.1:9200 | 是 | ElasticSearch 集群,多个节点使用逗号分隔 |
linkis.es.datasource | hadoop | 是 | ElasticSearch datasource |
linkis.es.username | 无 | 否 | ElasticSearch 集群用户名 |
linkis.es.password | 无 | 否 | ElasticSearch 集群密码 |
linkis.es.auth.cache | false | 否 | 客户端是否缓存认证 |
linkis.es.sniffer.enable | false | 否 | 客户端是否开启 sniffer |
linkis.es.http.method | GET | 否 | 调用方式 |
linkis.es.http.endpoint | /_search | 否 | JSON 脚本调用的 Endpoint |
linkis.es.sql.endpoint | /_sql | 否 | SQL 脚本调用的 Endpoint |
linkis.es.sql.format | {“query”:”%s”} | 否 | SQL 脚本调用的模板,%s 替换成 SQL 作为请求体请求Es 集群 |
linkis.es.headers.* | 无 | 否 | 客户端 Headers 配置 |
linkis.engineconn.concurrent.limit | 100 | 否 | 引擎最大并发 |
您需要配置ElasticSearch的连接信息,包括连接地址信息或用户名密码(如果启用)等信息。
图3-1 ElasticSearch配置信息
您也可以再提交任务接口中的params.configuration.runtime进行修改即可
linkis.es.cluster
linkis.es.datasource
linkis.es.username
linkis.es.password
-codeType 参数说明
- essql:通过SQL脚本的方式执行ElasticSearch引擎任务
- esjson:通过JSON脚本的方式执行ElasticSearch引擎任务
使用示例
Linkis 1.0后提供了cli的方式提交任务,我们只需要指定对应的EngineConn和CodeType标签类型即可,ElasticSearch的使用如下:
essql方式示例
注意: 使用这种形式,ElasticSearch服务必须安装SQL插件,安装方式参考:https://github.com/NLPchina/elasticsearch-sql#elasticsearch-762
sh ./bin/linkis-cli -submitUser hadoop -engineType elasticsearch-7.6.2 -codeType essql -code '{"sql": "select * from kibana_sample_data_ecommerce limit 10' -runtimeMap linkis.es.http.method=GET -runtimeMap linkis.es.http.endpoint=/_sql -runtimeMap linkis.es.datasource=hadoop -runtimeMap linkis.es.cluster=127.0.0.1:9200
esjson方式示例
sh ./bin/linkis-cli -submitUser hadoop -engineType elasticsearch-7.6.2 -codeType esjson -code '{"query": {"match": {"order_id": "584677"}}}' -runtimeMap linkis.es.http.method=GET -runtimeMap linkis.es.http.endpoint=/kibana_sample_data_ecommerce/_search -runtimeMap linkis.es.datasource=hadoop -runtimeMap linkis.es.cluster=127.0.0.1:9200
具体使用可以参考: Linkis CLI Manual.
ElasticSearch的用户设置主要是设置ElasticSearch的连接信息,但是建议用户将此密码等信息进行加密管理。