Doris On ES

Doris-On-ES将Doris的分布式查询规划能力和ES(Elasticsearch)的全文检索能力相结合,提供更完善的OLAP分析场景解决方案:

  1. ES中的多index分布式Join查询
  2. Doris和ES中的表联合查询,更复杂的全文检索过滤

本文档主要介绍该功能的实现原理、使用方式等。

名词解释

Doris相关

  • FE:Frontend,Doris 的前端节点,负责元数据管理和请求接入
  • BE:Backend,Doris 的后端节点,负责查询执行和数据存储

ES相关

  • DataNode:ES的数据存储与计算节点
  • MasterNode:ES的Master节点,管理元数据、节点、数据分布等
  • scroll:ES内置的数据集游标特性,用来对数据进行流式扫描和过滤
  • _source: 导入时传入的原始JSON格式文档内容
  • doc_values: ES/Lucene 中字段的列式存储定义
  • keyword: 字符串类型字段,ES/Lucene不会对文本内容进行分词处理
  • text: 字符串类型字段,ES/Lucene会对文本内容进行分词处理,分词器需要用户指定,默认为standard英文分词器

使用方法

创建ES索引

  1. PUT test
  2. {
  3. "settings": {
  4. "index": {
  5. "number_of_shards": "1",
  6. "number_of_replicas": "0"
  7. }
  8. },
  9. "mappings": {
  10. "doc": { // ES 7.x版本之后创建索引时不需要指定type,会有一个默认且唯一的`_doc` type
  11. "properties": {
  12. "k1": {
  13. "type": "long"
  14. },
  15. "k2": {
  16. "type": "date"
  17. },
  18. "k3": {
  19. "type": "keyword"
  20. },
  21. "k4": {
  22. "type": "text",
  23. "analyzer": "standard"
  24. },
  25. "k5": {
  26. "type": "float"
  27. }
  28. }
  29. }
  30. }
  31. }

ES索引导入数据

  1. POST /_bulk
  2. {"index":{"_index":"test","_type":"doc"}}
  3. { "k1" : 100, "k2": "2020-01-01", "k3": "Trying out Elasticsearch", "k4": "Trying out Elasticsearch", "k5": 10.0}
  4. {"index":{"_index":"test","_type":"doc"}}
  5. { "k1" : 100, "k2": "2020-01-01", "k3": "Trying out Doris", "k4": "Trying out Doris", "k5": 10.0}
  6. {"index":{"_index":"test","_type":"doc"}}
  7. { "k1" : 100, "k2": "2020-01-01", "k3": "Doris On ES", "k4": "Doris On ES", "k5": 10.0}
  8. {"index":{"_index":"test","_type":"doc"}}
  9. { "k1" : 100, "k2": "2020-01-01", "k3": "Doris", "k4": "Doris", "k5": 10.0}
  10. {"index":{"_index":"test","_type":"doc"}}
  11. { "k1" : 100, "k2": "2020-01-01", "k3": "ES", "k4": "ES", "k5": 10.0}

Doris中创建ES外表

具体建表语法参照:CREATE TABLE

  1. CREATE EXTERNAL TABLE `test` (
  2. `k1` bigint(20) COMMENT "",
  3. `k2` datetime COMMENT "",
  4. `k3` varchar(20) COMMENT "",
  5. `k4` varchar(100) COMMENT "",
  6. `k5` float COMMENT ""
  7. ) ENGINE=ELASTICSEARCH // ENGINE必须是Elasticsearch
  8. PROPERTIES (
  9. "hosts" = "http://192.168.0.1:8200,http://192.168.0.2:8200",
  10. "index" = "test",
  11. "type" = "doc",
  12. "user" = "root",
  13. "password" = "root"
  14. );

参数说明:

参数说明
hostsES集群地址,可以是一个或多个,也可以是ES前端的负载均衡地址
index对应的ES的index名字,支持alias,如果使用doc_value,需要使用真实的名称
typeindex的type,ES 7.x及以后的版本不传此参数
userES集群用户名
password对应用户的密码信息
  • ES 7.x之前的集群请注意在建表的时候选择正确的索引类型type
  • 认证方式目前仅支持Http Basic认证,并且需要确保该用户有访问: /_cluster/state/、_nodes/http等路径和index的读权限; 集群未开启安全认证,用户名和密码不需要设置
  • Doris表中的列名需要和ES中的字段名完全匹配,字段类型应该保持一致
  • ENGINE必须是 Elasticsearch
过滤条件下推

Doris On ES一个重要的功能就是过滤条件的下推: 过滤条件下推给ES,这样只有真正满足条件的数据才会被返回,能够显著的提高查询性能和降低Doris和Elasticsearch的CPU、memory、IO使用量

下面的操作符(Operators)会被优化成如下ES Query:

SQL syntaxES 5.x+ syntax
=term query
interms query
> , < , >= , ⇐range query
andbool.filter
orbool.should
notbool.must_not
not inbool.must_not + terms query
is_not_nullexists query
is_nullbool.must_not + exists query
esqueryES原生json形式的QueryDSL
数据类型映射
Doris\ESbyteshortintegerlongfloatdoublekeywordtextdate
tinyint
smallint
int
bigint
float
double
char
varchar
date
datetime

启用列式扫描优化查询速度(enable_docvalue_scan=true)

  1. CREATE EXTERNAL TABLE `test` (
  2. `k1` bigint(20) COMMENT "",
  3. `k2` datetime COMMENT "",
  4. `k3` varchar(20) COMMENT "",
  5. `k4` varchar(100) COMMENT "",
  6. `k5` float COMMENT ""
  7. ) ENGINE=ELASTICSEARCH
  8. PROPERTIES (
  9. "hosts" = "http://192.168.0.1:8200,http://192.168.0.2:8200",
  10. "index" = "test",
  11. "user" = "root",
  12. "password" = "root",
  13. "enable_docvalue_scan" = "true"
  14. );

参数说明:

参数说明
enable_docvalue_scan是否开启通过ES/Lucene列式存储获取查询字段的值,默认为false

开启后Doris从ES中获取数据会遵循以下两个原则:

  • 尽力而为: 自动探测要读取的字段是否开启列式存储(doc_value: true),如果获取的字段全部有列存,Doris会从列式存储中获取所有字段的值
  • 自动降级: 如果要获取的字段只要有一个字段没有列存,所有字段的值都会从行存_source中解析获取
优势:

默认情况下,Doris On ES会从行存也就是_source中获取所需的所有列,_source的存储采用的行式+json的形式存储,在批量读取性能上要劣于列式存储,尤其在只需要少数列的情况下尤为明显,只获取少数列的情况下,docvalue的性能大约是_source性能的十几倍

注意
  1. text类型的字段在ES中是没有列式存储,因此如果要获取的字段值有text类型字段会自动降级为从_source中获取
  2. 在获取的字段数量过多的情况下(>= 25),从docvalue中获取字段值的性能会和从_source中获取字段值基本一样

探测keyword类型字段(enable_keyword_sniff=true)

  1. CREATE EXTERNAL TABLE `test` (
  2. `k1` bigint(20) COMMENT "",
  3. `k2` datetime COMMENT "",
  4. `k3` varchar(20) COMMENT "",
  5. `k4` varchar(100) COMMENT "",
  6. `k5` float COMMENT ""
  7. ) ENGINE=ELASTICSEARCH
  8. PROPERTIES (
  9. "hosts" = "http://192.168.0.1:8200,http://192.168.0.2:8200",
  10. "index" = "test",
  11. "user" = "root",
  12. "password" = "root",
  13. "enable_keyword_sniff" = "true"
  14. );

参数说明:

参数说明
enable_keyword_sniff是否对ES中字符串类型分词类型(text) fields 进行探测,获取额外的未分词(keyword)字段名(multi-fields机制)

在ES中可以不建立index直接进行数据导入,这时候ES会自动创建一个新的索引,针对字符串类型的字段ES会创建一个既有text类型的字段又有keyword类型的字段,这就是ES的multi fields特性,mapping如下:

  1. "k4": {
  2. "type": "text",
  3. "fields": {
  4. "keyword": {
  5. "type": "keyword",
  6. "ignore_above": 256
  7. }
  8. }
  9. }

对k4进行条件过滤时比如=,Doris On ES会将查询转换为ES的TermQuery

SQL过滤条件:

  1. k4 = "Doris On ES"

转换成ES的query DSL为:

  1. "term" : {
  2. "k4": "Doris On ES"
  3. }

因为k4的第一字段类型为text,在数据导入的时候就会根据k4设置的分词器(如果没有设置,就是standard分词器)进行分词处理得到doris、on、es三个Term,如下ES analyze API分析:

  1. POST /_analyze
  2. {
  3. "analyzer": "standard",
  4. "text": "Doris On ES"
  5. }

分词的结果是:

  1. {
  2. "tokens": [
  3. {
  4. "token": "doris",
  5. "start_offset": 0,
  6. "end_offset": 5,
  7. "type": "<ALPHANUM>",
  8. "position": 0
  9. },
  10. {
  11. "token": "on",
  12. "start_offset": 6,
  13. "end_offset": 8,
  14. "type": "<ALPHANUM>",
  15. "position": 1
  16. },
  17. {
  18. "token": "es",
  19. "start_offset": 9,
  20. "end_offset": 11,
  21. "type": "<ALPHANUM>",
  22. "position": 2
  23. }
  24. ]
  25. }

查询时使用的是:

  1. "term" : {
  2. "k4": "Doris On ES"
  3. }

Doris On ES这个term匹配不到词典中的任何term,不会返回任何结果,而启用enable_keyword_sniff: true会自动将k4 = "Doris On ES"转换成k4.keyword = "Doris On ES"来完全匹配SQL语义,转换后的ES query DSL为:

  1. "term" : {
  2. "k4.keyword": "Doris On ES"
  3. }

k4.keyword 的类型是keyword,数据写入ES中是一个完整的term,所以可以匹配

开启节点自动发现, 默认为true(nodes_discovery=true)

  1. CREATE EXTERNAL TABLE `test` (
  2. `k1` bigint(20) COMMENT "",
  3. `k2` datetime COMMENT "",
  4. `k3` varchar(20) COMMENT "",
  5. `k4` varchar(100) COMMENT "",
  6. `k5` float COMMENT ""
  7. ) ENGINE=ELASTICSEARCH
  8. PROPERTIES (
  9. "hosts" = "http://192.168.0.1:8200,http://192.168.0.2:8200",
  10. "index" = "test",
  11. "user" = "root",
  12. "password" = "root",
  13. "nodes_discovery" = "true"
  14. );

参数说明:

参数说明
nodes_discovery是否开启es节点发现,默认为true

当配置为true时,Doris将从ES找到所有可用的相关数据节点(在上面分配的分片)。如果ES数据节点的地址没有被Doris BE访问,则设置为false。ES集群部署在与公共Internet隔离的内网,用户通过代理访问

ES集群是否开启https访问模式,如果开启应设置为true,默认为false(http_ssl_enabled=true)

  1. CREATE EXTERNAL TABLE `test` (
  2. `k1` bigint(20) COMMENT "",
  3. `k2` datetime COMMENT "",
  4. `k3` varchar(20) COMMENT "",
  5. `k4` varchar(100) COMMENT "",
  6. `k5` float COMMENT ""
  7. ) ENGINE=ELASTICSEARCH
  8. PROPERTIES (
  9. "hosts" = "http://192.168.0.1:8200,http://192.168.0.2:8200",
  10. "index" = "test",
  11. "user" = "root",
  12. "password" = "root",
  13. "http_ssl_enabled" = "true"
  14. );

参数说明:

参数说明
http_ssl_enabledES集群是否开启https访问模式

目前会fe/be实现方式为信任所有,这是临时解决方案,后续会使用真实的用户配置证书

查询用法

完成在Doris中建立ES外表后,除了无法使用Doris中的数据模型(rollup、预聚合、物化视图等)外并无区别

基本查询

  1. select * from es_table where k1 > 1000 and k3 ='term' or k4 like 'fu*z_'

扩展的esquery(field, QueryDSL)

通过esquery(field, QueryDSL)函数将一些无法用sql表述的query如match_phrase、geoshape等下推给ES进行过滤处理,esquery的第一个列名参数用于关联index,第二个参数是ES的基本Query DSL的json表述,使用花括号{}包含,json的root key有且只能有一个,如match_phrase、geo_shape、bool等

match_phrase查询:

  1. select * from es_table where esquery(k4, '{
  2. "match_phrase": {
  3. "k4": "doris on es"
  4. }
  5. }');

geo相关查询:

  1. select * from es_table where esquery(k4, '{
  2. "geo_shape": {
  3. "location": {
  4. "shape": {
  5. "type": "envelope",
  6. "coordinates": [
  7. [
  8. 13,
  9. 53
  10. ],
  11. [
  12. 14,
  13. 52
  14. ]
  15. ]
  16. },
  17. "relation": "within"
  18. }
  19. }
  20. }');

bool查询:

  1. select * from es_table where esquery(k4, ' {
  2. "bool": {
  3. "must": [
  4. {
  5. "terms": {
  6. "k1": [
  7. 11,
  8. 12
  9. ]
  10. }
  11. },
  12. {
  13. "terms": {
  14. "k2": [
  15. 100
  16. ]
  17. }
  18. }
  19. ]
  20. }
  21. }');

原理

  1. +----------------------------------------------+
  2. | |
  3. | Doris +------------------+ |
  4. | | FE +--------------+-------+
  5. | | | Request Shard Location
  6. | +--+-------------+-+ | |
  7. | ^ ^ | |
  8. | | | | |
  9. | +-------------------+ +------------------+ | |
  10. | | | | | | | | |
  11. | | +----------+----+ | | +--+-----------+ | | |
  12. | | | BE | | | | BE | | | |
  13. | | +---------------+ | | +--------------+ | | |
  14. +----------------------------------------------+ |
  15. | | | | | | |
  16. | | | | | | |
  17. | HTTP SCROLL | | HTTP SCROLL | |
  18. +-----------+---------------------+------------+ |
  19. | | v | | v | | |
  20. | | +------+--------+ | | +------+-------+ | | |
  21. | | | | | | | | | | |
  22. | | | DataNode | | | | DataNode +<-----------+
  23. | | | | | | | | | | |
  24. | | | +<--------------------------------+
  25. | | +---------------+ | | |--------------| | | |
  26. | +-------------------+ +------------------+ | |
  27. | Same Physical Node | |
  28. | | |
  29. | +-----------------------+ | |
  30. | | | | |
  31. | | MasterNode +<-----------------+
  32. | ES | | |
  33. | +-----------------------+ |
  34. +----------------------------------------------+
  1. 创建ES外表后,FE会请求建表指定的主机,获取所有节点的HTTP端口信息以及index的shard分布信息等,如果请求失败会顺序遍历host列表直至成功或完全失败

  2. 查询时会根据FE得到的一些节点信息和index的元数据信息,生成查询计划并发给对应的BE节点

  3. BE节点会根据就近原则即优先请求本地部署的ES节点,BE通过HTTP Scroll方式流式的从ES index的每个分片中并发的从_sourcedocvalue中获取数据

  4. Doris计算完结果后,返回给用户

最佳实践

时间类型字段使用建议

在ES中,时间类型的字段使用十分灵活,但是在Doris On ES中如果对时间类型字段的类型设置不当,则会造成过滤条件无法下推

创建索引时对时间类型格式的设置做最大程度的格式兼容:

  1. "dt": {
  2. "type": "date",
  3. "format": "yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||epoch_millis"
  4. }

在Doris中建立该字段时建议设置为datedatetime,也可以设置为varchar类型, 使用如下SQL语句都可以直接将过滤条件下推至ES:

  1. select * from doe where k2 > '2020-06-21';
  2. select * from doe where k2 < '2020-06-21 12:00:00';
  3. select * from doe where k2 < 1593497011;
  4. select * from doe where k2 < now();
  5. select * from doe where k2 < date_format(now(), '%Y-%m-%d');

注意:

  • 在ES中如果不对时间类型的字段设置format, 默认的时间类型字段格式为
  1. strict_date_optional_time||epoch_millis
  • 导入到ES的日期字段如果是时间戳需要转换成ms, ES内部处理时间戳都是按照ms进行处理的, 否则Doris On ES会出现显示错误

获取ES元数据字段_id

导入文档在不指定_id的情况下ES会给每个文档分配一个全局唯一的_id即主键, 用户也可以在导入时为文档指定一个含有特殊业务意义的_id; 如果需要在Doris On ES中获取该字段值,建表时可以增加类型为varchar_id字段:

  1. CREATE EXTERNAL TABLE `doe` (
  2. `_id` varchar COMMENT "",
  3. `city` varchar COMMENT ""
  4. ) ENGINE=ELASTICSEARCH
  5. PROPERTIES (
  6. "hosts" = "http://127.0.0.1:8200",
  7. "user" = "root",
  8. "password" = "root",
  9. "index" = "doe"
  10. }

注意:

  1. _id字段的过滤条件仅支持=in两种
  2. _id字段只能是varchar类型

Q&A

  1. Doris On ES对ES的版本要求

    ES主版本大于5,ES在2.x之前和5.x之后数据的扫描方式不同,目前支持仅5.x之后的

  2. 是否支持X-Pack认证的ES集群

    支持所有使用HTTP Basic认证方式的ES集群

  3. 一些查询比请求ES慢很多

    是,比如_count相关的query等,ES内部会直接读取满足条件的文档个数相关的元数据,不需要对真实的数据进行过滤

  4. 聚合操作是否可以下推

    目前Doris On ES不支持聚合操作如sum, avg, min/max 等下推,计算方式是批量流式的从ES获取所有满足条件的文档,然后在Doris中进行计算