Elasticsearch
概览
Elasticsearch Load 节点允许将数据写入到 Elasticsearch 引擎的索引中。本文档描述运行 SQL 查询时如何设置 Elasticsearch Load 节点。
连接器可以工作在 Upsert 模式,使用 DDL 中定义的主键与外部系统交换 UPDATE/DELETE 消息。
如果 DDL 中没有定义主键,那么连接器只能工作在 Append 模式,只能与外部系统交换 INSERT 消息。
支持的版本
Load 节点 | 版本 |
---|---|
elasticsearch | Elasticsearch: 6.x, 7.x |
依赖
为了设置 Elasticsearch Load 节点,下表提供了使用构建自动化工具(例如 Maven 或 SBT)和带有 Sort Connectors JAR 包的 SQL 客户端的两个项目的依赖关系信息。
- Elasticsearch 6
<dependency>
<groupId>org.apache.inlong</groupId>
<artifactId>sort-connector-elasticsearch6</artifactId>
<version>1.2.0-incubating</version>
</dependency>
- Elasticsearch 7
<dependency>
<groupId>org.apache.inlong</groupId>
<artifactId>sort-connector-elasticsearch7</artifactId>
<version>1.2.0-incubating</version>
</dependency>
如何创建一个 Elasticsearch Load 节点
SQL API 用法
下面列子展示了如何利用Flink SQL
创建一个 Elasticsearch Load 节点:
CREATE TABLE myUserTable (
user_id STRING,
user_name STRING,
uv BIGINT,
pv BIGINT,
PRIMARY KEY (user_id) NOT ENFORCED
) WITH (
'connector' = 'elasticsearch-7',
'hosts' = 'http://localhost:9200',
'index' = 'users'
);
InLong Dashboard 用法
TODO: 将在未来支持这个特性。
InLong Manager Client 用法
TODO: 将在未来支持这个特性。
Elasticsearch Load 节点参数
参数 | 是否必选 | 默认值 | 数据类型 | 描述 |
---|---|---|---|---|
connector | 必选 | (none) | String | 指定要使用的连接器,有效值为:
|
hosts | 必选 | (none) | String | 要连接到的一台或多台 Elasticsearch 主机,例如 ‘http://host_name:9092;http://host_name:9093‘ 。 |
index | 必选 | (none) | String | Elasticsearch 中每条记录的索引。可以是一个静态索引(例如 ‘myIndex’ )或一个动态索引(例如 ‘index-{logts|yyyy-MM-dd}’ )。 更多详细信息,请参见下面的动态索引部分。 |
document-type | 6.x 版本中必选 | (none) | String | Elasticsearch 文档类型。在 elasticsearch-7 中不再需要。 |
document-id.key-delimiter | 可选 | String | 复合键的分隔符(默认为”_”),例如,指定为”$”将导致文档 ID 为”KEY1$KEY2$KEY3”。 | |
username | 可选 | (none) | String | 用于连接 Elasticsearch 实例的用户名。请注意,Elasticsearch 没有预绑定安全特性,但你可以通过如下指南启用它来保护 Elasticsearch 集群。 |
password | 可选 | (none) | String | 用于连接 Elasticsearch 实例的密码。如果配置了username ,则此选项也必须配置为非空字符串。 |
failure-handler | 可选 | fail | String | 对 Elasticsearch 请求失败情况下的失败处理策略。有效策略为:
|
sink.flush-on-checkpoint | 可选 | true | Boolean | 是否在 checkpoint 时执行 flush。禁用后,在 checkpoint 时 sink 将不会等待所有的 pending 请求被 Elasticsearch 确认。因此,sink 不会为请求的 at-least-once 交付提供任何有力保证。 |
sink.bulk-flush.max-actions | 可选 | 1000 | Integer | 每个批量请求的最大缓冲操作数。 可以设置为‘0’ 来禁用它。 |
sink.bulk-flush.max-size | 可选 | 2mb | MemorySize | 每个批量请求的缓冲操作在内存中的最大值。单位必须为 MB。 可以设置为‘0’ 来禁用它。 |
sink.bulk-flush.interval | 可选 | 1s | Duration | flush 缓冲操作的间隔。 可以设置为‘0’ 来禁用它。注意,‘sink.bulk-flush.max-size’ 和‘sink.bulk-flush.max-actions’ 都设置为‘0’ 的这种 flush 间隔设置允许对缓冲操作进行完全异步处理。 |
sink.bulk-flush.backoff.strategy | 可选 | DISABLED | String | 指定在由于临时请求错误导致任何 flush 操作失败时如何执行重试。有效策略为:
|
sink.bulk-flush.backoff.max-retries | 可选 | 8 | Integer | 最大回退重试次数。 |
sink.bulk-flush.backoff.delay | 可选 | 50ms | Duration | 每次回退尝试之间的延迟。对于 CONSTANT 回退策略,该值是每次重试之间的延迟。对于 EXPONENTIAL 回退策略,该值是初始的延迟。 |
connection.max-retry-timeout | 可选 | (none) | Duration | 最大重试超时时间。 |
connection.path-prefix | 可选 | (none) | String | 添加到每个 REST 通信中的前缀字符串,例如,‘/v1’ 。 |
format | 可选 | json | String | Elasticsearch 连接器支持指定格式。该格式必须生成一个有效的 json 文档。 默认使用内置的 ‘json’ 格式。更多详细信息,请参阅 JSON Format 页面。 |
特性
Key 处理
Elasticsearch Load 节点可以根据是否定义了主键来确定是在 Upsert 模式还是 Append 模式下工作。 如果定义了主键,Elasticsearch Load 节点将以 Upsert 模式工作,该模式可以消费包含 UPDATE/DELETE 消息的查询。 如果未定义主键,Elasticsearch Load 节点将以 Append 模式工作,该模式只能消费包含 INSERT 消息的查询。
在 Elasticsearch Load 节点中,主键用于计算 Elasticsearch 的文档 Id,文档 Id 为最多 512 字节且不包含空格的字符串。 Elasticsearch Load 节点通过使用 document-id.key-delimiter
指定的键分隔符按照 DDL 中定义的顺序连接所有主键字段,为每一行记录生成一个文档 Id 字符串。 某些类型不允许作为主键字段,因为它们没有对应的字符串表示形式,例如,BYTES
,ROW
,ARRAY
,MAP
等。 如果未指定主键,Elasticsearch 将自动生成文档 Id。
有关 PRIMARY KEY 语法的更多详细信息,请参见 CREATE TABLE DDL。
动态索引
Elasticsearch Load 节点同时支持静态索引和动态索引。
如果你想使用静态索引,则 index
选项值应为纯字符串,例如 'myusers'
,所有记录都将被写入到 “myusers” 索引中。
如果你想使用动态索引,你可以使用 {field_name}
来引用记录中的字段值来动态生成目标索引。 你也可以使用 '{field_name|date_format_string}'
将 TIMESTAMP/DATE/TIME
类型的字段值转换为 date_format_string
指定的格式。 date_format_string
与 Java 的 DateTimeFormatter 兼容。 例如,如果选项值设置为 'myusers-{'{log_ts|yyyy-MM-dd}'}'
,则 log_ts
字段值为 2020-03-27 12:25:55
的记录将被写入到 “myusers-2020-03-27” 索引中。
数据类型映射
JSON type | Flink SQL type |
---|---|
string | CHAR / VARCHAR / STRING |
boolean | BOOLEAN |
string with encoding: base64 | BINARY / VARBINARY |
number | DECIMAL |
number | TINYINT |
number | SMALLINT |
number | INT |
number | BIGINT |
number | FLOAT |
number | DOUBLE |
string with format: date | DATE |
string with format: time | TIME |
string with format: date-time | TIMESTAMP |
string with format: date-time (with UTC time zone) | TIMESTAMP_WITH_LOCAL_TIME_ZONE |
number | INTERVAL |
array | ARRAY |
object | MAP / MULTISET |
object | ROW |