Elasticsearch

Elasticsearch (ES) Catalogs in Doris support auto-mapping of ES metadata. Users can utilize the full-text search capability of ES in combination of the distributed query planning capability of Doris to provide a full-fledged OLAP solution that is able to perform:

  1. Multi-index distributed Join queries in ES;
  2. Join queries across Doris and ES as well as full-text search and filter.

Usage

  1. Doris supports Elasticsearch 5.x and newer versions.

Create Catalog

  1. CREATE CATALOG es PROPERTIES (
  2. "type"="es",
  3. "hosts"="http://127.0.0.1:9200"
  4. );

Since there is no concept of “database” in ES, after connecting to ES, Doris will automatically generate a unique database: default_db.

After switching to the ES Catalog, you will be in the dafault_db so you don’t need to execute the USE default_db command.

Parameter Description

ParameterRequired or NotDefault ValueDescription
hostsYesES address, can be one or multiple addresses, or the load balancer address of ES
userNoEmptyES username
passwordNoEmptyPassword of the corresponding user
doc_value_scanNotrueWhether to obtain value of the target field by ES/Lucene columnar storage
keyword_sniffNotrueWhether to sniff the text.fields in ES based on keyword; If this is set to false, the system will perform matching after tokenization.
nodes_discoveryNotrueWhether to enable ES node discovery, set to true by default; set to false in network isolation environments and only connected to specified nodes
sslNofalseWhether to enable HTTPS access mode for ES, currently follows a “Trust All” method in FE/BE
mapping_es_idNofalseWhether to map the _id field in the ES index
like_push_downNotrueWhether to transform like to wildcard push down to es, this increases the cpu consumption of the es.
include_hidden_indexNofalseWhether to include hidden index, default to false.
  1. In terms of authentication, only HTTP Basic authentication is supported and it requires the user to have read privilege for the index and paths including /_cluster/state/ and _nodes/http ; if you have not enabled security authentication for the cluster, you don’t need to set the user and password.

  2. If there are multiple types in the index in 5.x and 6.x, the first type is taken by default.

Column Type Mapping

ES TypeDoris TypeComment
nullnull
booleanboolean
bytetinyint
shortsmallint
integerint
longbigint
unsigned_longlargeint
floatfloat
half_floatfloat
doubledouble
scaled_floatdouble
datedateOnly support default/yyyy-MM-dd HH:mm:ss/yyyy-MM-dd/epoch_millis format
keywordstring
textstring
ipstring
constant_keywordstring
wildcardstring
nestedjson
objectjson
otherunsupported

Array Type

Elasticsearch does not have an explicit array type, but one of its fields can contain 0 or more values. To indicate that a field is an array type, a specific doris structural annotation can be added to the _meta section of the index mapping. For Elasticsearch 6.x and before release, please refer _meta.

For example, suppose there is an index doc containing the following data structure.

  1. {
  2. "array_int_field": [1, 2, 3, 4],
  3. "array_string_field": ["doris", "is", "the", "best"],
  4. "id_field": "id-xxx-xxx",
  5. "timestamp_field": "2022-11-12T12:08:56Z",
  6. "array_object_field": [
  7. {
  8. "name": "xxx",
  9. "age": 18
  10. }
  11. ]
  12. }

The array fields of this structure can be defined by using the following command to add the field property definition to the _meta.doris property of the target index mapping.

  1. # ES 7.x and above
  2. curl -X PUT "localhost:9200/doc/_mapping?pretty" -H 'Content-Type:application/json' -d '
  3. {
  4. "_meta": {
  5. "doris":{
  6. "array_fields":[
  7. "array_int_field",
  8. "array_string_field",
  9. "array_object_field"
  10. ]
  11. }
  12. }
  13. }'
  14. # ES 6.x and before
  15. curl -X PUT "localhost:9200/doc/_mapping?pretty" -H 'Content-Type: application/json' -d '
  16. {
  17. "_doc": {
  18. "_meta": {
  19. "doris":{
  20. "array_fields":[
  21. "array_int_field",
  22. "array_string_field",
  23. "array_object_field"
  24. ]
  25. }
  26. }
  27. }
  28. }
  29. '

array_fields:Used to indicate a field that is an array type.

Best Practice

Predicate Pushdown

ES Catalogs support predicate pushdown to ES, which means only the filtered data will be returned. This can markedly improve query performance and reduce usage of CPU, memory, and IO in both Doris and ES.

For the sake of optimization, operators will be converted into the following ES queries:

SQL syntaxES 5.x+ syntax
=term query
interms query
> , < , >= , ⇐range query
andbool.filter
orbool.should
notbool.must_not
not inbool.must_not + terms query
is_not_nullexists query
is_nullbool.must_not + exists query
esqueryES-native JSON QueryDSL

Columnar Scan for Faster Queries (enable_docvalue_scan=true)

Set "enable_docvalue_scan" = "true".

After this, when obtaining data from ES, Doris will follow these rules:

  • Try and see: Doris will automatically check if columnar storage is enabled for the target fields (doc_value: true), if it is, Doris will obtain all values in the fields from the columnar storage.
  • Auto-downgrading: If any one of the target fields is not available in columnar storage, Doris will parse and obtain all target data from row storage (_source).

Benefits

By default, Doris On ES obtains all target columns from _source, which is in row storage and JSON format. Compared to columnar storage, _source is slow in batch read. In particular, when the system only needs to read small number of columns, the performance of docvalue can be about a dozen times faster than that of _source.

Note

  1. Columnar storage is not available for text fields in ES. Thus, if you need to obtain fields containing text values, you will need to obtain them from _source.
  2. When obtaining large numbers of fields (>= 25), the performances of docvalue and _source are basically equivalent.
  3. The keyword type field, due to the ignore_above parameter’s limit, long text fields exceeding this limit will be ignored, so the result may be empty. In this case, you need to turn off enable_docvalue_scan and get the result from _source.

Sniff Keyword Fields

Set "enable_keyword_sniff" = "true".

ES allows direct data ingestion without an index since it will automatically create an index after ingestion. For string fields, ES will create a field with both text and keyword types. This is how the Multi-Field feature of ES works. The mapping is as follows:

  1. "k4": {
  2. "type": "text",
  3. "fields": {
  4. "keyword": {
  5. "type": "keyword",
  6. "ignore_above": 256
  7. }
  8. }
  9. }

For example, to conduct “=” filtering on k4, Doris on ES will convert the filtering operation into an ES TermQuery.

The original SQL filter:

  1. k4 = "Doris On ES"

The converted ES query DSL:

  1. "term" : {
  2. "k4": "Doris On ES"
  3. }

Since the first field of k4 is text, it will be tokenized by the analyzer set for k4 (or by the standard analyzer if no analyzer has been set for k4) after data ingestion. As a result, it will be tokenized into three terms: “Doris”, “on”, and “ES”.

The details are as follows:

  1. POST /_analyze
  2. {
  3. "analyzer": "standard",
  4. "text": "Doris On ES"
  5. }

The tokenization results:

  1. {
  2. "tokens": [
  3. {
  4. "token": "doris",
  5. "start_offset": 0,
  6. "end_offset": 5,
  7. "type": "<ALPHANUM>",
  8. "position": 0
  9. },
  10. {
  11. "token": "on",
  12. "start_offset": 6,
  13. "end_offset": 8,
  14. "type": "<ALPHANUM>",
  15. "position": 1
  16. },
  17. {
  18. "token": "es",
  19. "start_offset": 9,
  20. "end_offset": 11,
  21. "type": "<ALPHANUM>",
  22. "position": 2
  23. }
  24. ]
  25. }

If you conduct a query as follows:

  1. "term" : {
  2. "k4": "Doris On ES"
  3. }

Since there is no term in the dictionary that matches the term Doris On ES, no result will be returned.

However, if you have set enable_keyword_sniff: true, the system will convert k4 = "Doris On ES" to k4.keyword = "Doris On ES" to match the SQL semantics. The converted ES query DSL will be:

  1. "term" : {
  2. "k4.keyword": "Doris On ES"
  3. }

k4.keyword is of keyword type, so the data is written in ES as a complete term, allowing for successful matching.

Auto Node Discovery, Set to True by Default (nodes_discovery=true)

Set "nodes_discovery" = "true".

Then, Doris will discover all available data nodes (the allocated shards) in ES. If Doris BE hasn’t accessed the ES data node addresses, then set "nodes_discovery" = "false" . ES clusters are deployed in private networks that are isolated from public Internet, so users will need proxy access.

HTTPS Access Mode for ES Clusters

Set "ssl" = "true".

A temporary solution is to implement a “Trust All” method in FE/BE. In the future, the real user configuration certificates will be used.

Query Usage

You can use the ES external tables in Doris the same way as using Doris internal tables, except that the Doris data models (Rollup, Pre-Aggregation, and Materialized Views) are unavailable.

Basic Query

  1. select * from es_table where k1 > 1000 and k3 ='term' or k4 like 'fu*z_'

Extended esquery(field, QueryDSL)

The esquery(field, QueryDSL) function can be used to push queries that cannot be expressed in SQL, such as match_phrase and geoshape , to ES for filtering.

In esquery, the first parameter (the column name) is used to associate with index, while the second parameter is the JSON expression of basic Query DSL in ES, which is surrounded by {}. The root key in JSON is unique, which can be match_phrase, geo_shape or bool , etc.

A match_phrase query:

  1. select * from es_table where esquery(k4, '{
  2. "match_phrase": {
  3. "k4": "doris on es"
  4. }
  5. }');

A geo query:

  1. select * from es_table where esquery(k4, '{
  2. "geo_shape": {
  3. "location": {
  4. "shape": {
  5. "type": "envelope",
  6. "coordinates": [
  7. [
  8. 13,
  9. 53
  10. ],
  11. [
  12. 14,
  13. 52
  14. ]
  15. ]
  16. },
  17. "relation": "within"
  18. }
  19. }
  20. }');

A bool query:

  1. select * from es_table where esquery(k4, ' {
  2. "bool": {
  3. "must": [
  4. {
  5. "terms": {
  6. "k1": [
  7. 11,
  8. 12
  9. ]
  10. }
  11. },
  12. {
  13. "terms": {
  14. "k2": [
  15. 100
  16. ]
  17. }
  18. }
  19. ]
  20. }
  21. }');

Suggestions for Time Fields

These are only applicable for ES external tables. Time fields will be automatically mapped to Date or Datetime type in ES Catalogs.

ES boasts flexible usage of time fields, but in ES external tables, improper type setting of time fields will result in predicate pushdown failures.

It is recommended to allow the highest level of format compatibility for time fields when creating an index:

  1. "dt": {
  2. "type": "date",
  3. "format": "yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||epoch_millis"
  4. }

When creating this field in Doris, it is recommended to set its type to date or datetime (or varchar ) . You can use the following SQL statements to push the filters down to ES.

  1. select * from doe where k2 > '2020-06-21';
  2. select * from doe where k2 < '2020-06-21 12:00:00';
  3. select * from doe where k2 < 1593497011;
  4. select * from doe where k2 < now();
  5. select * from doe where k2 < date_format(now(), '%Y-%m-%d');

Note:

  • The default format of time fields in ES is:
  1. strict_date_optional_time||epoch_millis
  • Timestamps ingested into ES need to be converted into ms, which is the internal processing format in ES; otherwise errors will occur in ES external tables.

Obtain ES Metadata Field _id

Each ingested files, if not specified with an _id , will be given a globally unique _id, which is the primary key. Users can assign an _id with unique business meanings to the files during ingestion.

To obtain such field values from ES external tables, you can add an _id field of varchar type when creating tables.

  1. CREATE EXTERNAL TABLE `doe` (
  2. `_id` varchar COMMENT "",
  3. `city` varchar COMMENT ""
  4. ) ENGINE=ELASTICSEARCH
  5. PROPERTIES (
  6. "hosts" = "http://127.0.0.1:8200",
  7. "user" = "root",
  8. "password" = "root",
  9. "index" = "doe"
  10. }

To obtain such field values from ES Catalogs, please set "mapping_es_id" = "true".

Note:

  1. The _id field only supports = and in filtering.
  2. The_id field must be of varchar type.

Getting globally ordered query results

ES query results sorted by scores are useful in scenarios such as relevance sorting, prioritizing important content, etc. Doris querying ES pulls data according to the distribution of shards in the ES index in order to take full advantage of the MPP architecture.
In order to get globally ordered sorting results, you need to do a single point query on ES. This can be controlled by the session variable enable_es_parallel_scroll (default true).
When enable_es_parallel_scroll=false is set, Doris will send a scroll query without shard_preference and sort information to the ES cluster to get globally ordered results.
Note: Use with caution when the query result set is large.

Modify the batch size for scroll requests.

The batch of a scroll request is 4064 by default, and can be changed with the session variable batch_size.

FAQ

  1. Are X-Pack authenticated ES clusters supported?

    All ES clusters with HTTP Basic authentications are supported.

  2. Why are some queries require longer response time than those in ES?

    For _count queries, ES can directly read the metadata regarding the number of the specified files instead of filtering the original data. This is a huge time saver.

  3. Can aggregation operations be pushed down?

    Currently, Doris On ES does not support pushdown for aggregations such as sum, avg, and min/max. In such operations, Doris obtains all files that met the specified conditions from ES and then conducts computing internally.

Appendix

How Doris Conducts Queries in ES

  1. +----------------------------------------------+
  2. | |
  3. | Doris +------------------+ |
  4. | | FE +--------------+-------+
  5. | | | Request Shard Location
  6. | +--+-------------+-+ | |
  7. | ^ ^ | |
  8. | | | | |
  9. | +-------------------+ +------------------+ | |
  10. | | | | | | | | |
  11. | | +----------+----+ | | +--+-----------+ | | |
  12. | | | BE | | | | BE | | | |
  13. | | +---------------+ | | +--------------+ | | |
  14. +----------------------------------------------+ |
  15. | | | | | | |
  16. | | | | | | |
  17. | HTTP SCROLL | | HTTP SCROLL | |
  18. +-----------+---------------------+------------+ |
  19. | | v | | v | | |
  20. | | +------+--------+ | | +------+-------+ | | |
  21. | | | | | | | | | | |
  22. | | | DataNode | | | | DataNode +<-----------+
  23. | | | | | | | | | | |
  24. | | | +<--------------------------------+
  25. | | +---------------+ | | |--------------| | | |
  26. | +-------------------+ +------------------+ | |
  27. | Same Physical Node | |
  28. | | |
  29. | +-----------------------+ | |
  30. | | | | |
  31. | | MasterNode +<-----------------+
  32. | ES | | |
  33. | +-----------------------+ |
  34. +----------------------------------------------+
  1. Doris FE sends a request to the specified host for table creation in order to obtain information about the HTTP port and the index shard allocation.

  2. Based on the information about node and index metadata from FE, Doris generates a query plan and send it to the corresponding BE node.

  3. Following the principle of proximity, the BE node sends request to the locally deployed ES node, and obtain data from _source or docvalue from each shard of ES index concurrently by way of HTTP Scroll.

  4. Doris returns the computing results to the user.