Elasticsearch Connector

This connector provides sinks that can request document actions to an Elasticsearch index. To use this connector, add one of the following dependencies to your project, depending on the version of the Elasticsearch installation:

Elasticsearch version    Maven dependency
6.x                      There is no connector (yet) available for Flink version 1.20.
7.x                      There is no connector (yet) available for Flink version 1.20.

To use this connector in PyFlink jobs, the following dependencies are required:

Version                           PyFlink JAR
flink-connector-elasticsearch6    There is no SQL jar (yet) available for Flink version 1.20.
flink-connector-elasticsearch7    There is no SQL jar (yet) available for Flink version 1.20.

See Python Dependency Management for details on how to add JAR package dependencies in PyFlink.

Note that the streaming connectors are currently not part of the binary distribution. See this documentation for how to package the program together with the libraries for cluster execution.

Installing Elasticsearch

Instructions for setting up an Elasticsearch cluster can be found in this documentation.

Elasticsearch Sink

The example below shows how to configure and create a sink:

Java

Elasticsearch 6:

    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.connector.elasticsearch.sink.Elasticsearch6SinkBuilder;
    import org.apache.flink.streaming.api.datastream.DataStream;

    import org.apache.http.HttpHost;
    import org.elasticsearch.action.index.IndexRequest;
    import org.elasticsearch.client.Requests;

    import java.util.HashMap;
    import java.util.Map;

    DataStream<String> input = ...;

    input.sinkTo(
        new Elasticsearch6SinkBuilder<String>()
            // The following settings make the sink emit after every element; otherwise the elements would be buffered
            .setBulkFlushMaxActions(1)
            .setHosts(new HttpHost("127.0.0.1", 9200, "http"))
            .setEmitter(
                (element, context, indexer) ->
                    indexer.add(createIndexRequest(element)))
            .build());

    private static IndexRequest createIndexRequest(String element) {
        Map<String, Object> json = new HashMap<>();
        json.put("data", element);

        return Requests.indexRequest()
            .index("my-index")
            .type("my-type")
            .id(element)
            .source(json);
    }

Elasticsearch 7:

    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.connector.elasticsearch.sink.Elasticsearch7SinkBuilder;
    import org.apache.flink.streaming.api.datastream.DataStream;

    import org.apache.http.HttpHost;
    import org.elasticsearch.action.index.IndexRequest;
    import org.elasticsearch.client.Requests;

    import java.util.HashMap;
    import java.util.Map;

    DataStream<String> input = ...;

    input.sinkTo(
        new Elasticsearch7SinkBuilder<String>()
            // The following settings make the sink emit after every element; otherwise the elements would be buffered
            .setBulkFlushMaxActions(1)
            .setHosts(new HttpHost("127.0.0.1", 9200, "http"))
            .setEmitter(
                (element, context, indexer) ->
                    indexer.add(createIndexRequest(element)))
            .build());

    private static IndexRequest createIndexRequest(String element) {
        Map<String, Object> json = new HashMap<>();
        json.put("data", element);

        return Requests.indexRequest()
            .index("my-index")
            .id(element)
            .source(json);
    }

Scala

Elasticsearch 6:

    import org.apache.flink.api.connector.sink.SinkWriter
    import org.apache.flink.connector.elasticsearch.sink.{Elasticsearch6SinkBuilder, RequestIndexer}
    import org.apache.flink.streaming.api.datastream.DataStream

    import org.apache.http.HttpHost
    import org.elasticsearch.action.index.IndexRequest
    import org.elasticsearch.client.Requests

    import scala.collection.JavaConversions.mapAsJavaMap

    val input: DataStream[String] = ...

    input.sinkTo(
      new Elasticsearch6SinkBuilder[String]
        // The following settings make the sink emit after every element; otherwise the elements would be buffered
        .setBulkFlushMaxActions(1)
        .setHosts(new HttpHost("127.0.0.1", 9200, "http"))
        .setEmitter((element: String, context: SinkWriter.Context, indexer: RequestIndexer) =>
          indexer.add(createIndexRequest(element)))
        .build())

    def createIndexRequest(element: String): IndexRequest = {
      val json = Map(
        "data" -> element.asInstanceOf[AnyRef]
      )

      Requests.indexRequest.index("my-index").`type`("my-type").source(mapAsJavaMap(json))
    }

Elasticsearch 7:

    import org.apache.flink.api.connector.sink.SinkWriter
    import org.apache.flink.connector.elasticsearch.sink.{Elasticsearch7SinkBuilder, RequestIndexer}
    import org.apache.flink.streaming.api.datastream.DataStream

    import org.apache.http.HttpHost
    import org.elasticsearch.action.index.IndexRequest
    import org.elasticsearch.client.Requests

    import scala.collection.JavaConversions.mapAsJavaMap

    val input: DataStream[String] = ...

    input.sinkTo(
      new Elasticsearch7SinkBuilder[String]
        // The following settings make the sink emit after every element; otherwise the elements would be buffered
        .setBulkFlushMaxActions(1)
        .setHosts(new HttpHost("127.0.0.1", 9200, "http"))
        .setEmitter((element: String, context: SinkWriter.Context, indexer: RequestIndexer) =>
          indexer.add(createIndexRequest(element)))
        .build())

    def createIndexRequest(element: String): IndexRequest = {
      val json = Map(
        "data" -> element.asInstanceOf[AnyRef]
      )

      Requests.indexRequest.index("my-index").source(mapAsJavaMap(json))
    }

Python

Elasticsearch 6 static index:

    from pyflink.datastream import StreamExecutionEnvironment
    from pyflink.datastream.connectors.elasticsearch import Elasticsearch6SinkBuilder, ElasticsearchEmitter

    env = StreamExecutionEnvironment.get_execution_environment()
    env.add_jars(ELASTICSEARCH_SQL_CONNECTOR_PATH)

    input = ...

    # The set_bulk_flush_max_actions call below makes the sink emit after every element; otherwise the elements would be buffered
    es6_sink = Elasticsearch6SinkBuilder() \
        .set_bulk_flush_max_actions(1) \
        .set_emitter(ElasticsearchEmitter.static_index('foo', 'id', 'bar')) \
        .set_hosts(['localhost:9200']) \
        .build()

    input.sink_to(es6_sink).name('es6 sink')

Elasticsearch 6 dynamic index:

    from pyflink.datastream import StreamExecutionEnvironment
    from pyflink.datastream.connectors.elasticsearch import Elasticsearch6SinkBuilder, ElasticsearchEmitter

    env = StreamExecutionEnvironment.get_execution_environment()
    env.add_jars(ELASTICSEARCH_SQL_CONNECTOR_PATH)

    input = ...

    es6_sink = Elasticsearch6SinkBuilder() \
        .set_emitter(ElasticsearchEmitter.dynamic_index('name', 'id', 'bar')) \
        .set_hosts(['localhost:9200']) \
        .build()

    input.sink_to(es6_sink).name('es6 dynamic index sink')

Elasticsearch 7 static index:

    from pyflink.datastream import StreamExecutionEnvironment
    from pyflink.datastream.connectors.elasticsearch import Elasticsearch7SinkBuilder, ElasticsearchEmitter

    env = StreamExecutionEnvironment.get_execution_environment()
    env.add_jars(ELASTICSEARCH_SQL_CONNECTOR_PATH)

    input = ...

    # The set_bulk_flush_max_actions call below makes the sink emit after every element; otherwise the elements would be buffered
    es7_sink = Elasticsearch7SinkBuilder() \
        .set_bulk_flush_max_actions(1) \
        .set_emitter(ElasticsearchEmitter.static_index('foo', 'id')) \
        .set_hosts(['localhost:9200']) \
        .build()

    input.sink_to(es7_sink).name('es7 sink')

Elasticsearch 7 dynamic index:

    from pyflink.datastream import StreamExecutionEnvironment
    from pyflink.datastream.connectors.elasticsearch import Elasticsearch7SinkBuilder, ElasticsearchEmitter

    env = StreamExecutionEnvironment.get_execution_environment()
    env.add_jars(ELASTICSEARCH_SQL_CONNECTOR_PATH)

    input = ...

    es7_sink = Elasticsearch7SinkBuilder() \
        .set_emitter(ElasticsearchEmitter.dynamic_index('name', 'id')) \
        .set_hosts(['localhost:9200']) \
        .build()

    input.sink_to(es7_sink).name('es7 dynamic index sink')

Note that the example only demonstrates performing a single index request for each incoming element. Generally, the emitter can be used to perform requests of different types (e.g. DeleteRequest, UpdateRequest, etc.).
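For instance, a minimal sketch of such helpers, following the createIndexRequest pattern above (assuming the Elasticsearch 7 request classes; the helper names and the use of the element as the document id are illustrative assumptions, not part of the connector API):

    import java.util.HashMap;
    import java.util.Map;

    import org.elasticsearch.action.delete.DeleteRequest;
    import org.elasticsearch.action.update.UpdateRequest;

    // Hypothetical helper: partially update the document whose id equals the element.
    private static UpdateRequest createUpdateRequest(String element) {
        Map<String, Object> json = new HashMap<>();
        json.put("data", element);

        return new UpdateRequest("my-index", element).doc(json);
    }

    // Hypothetical helper: remove the document whose id equals the element.
    private static DeleteRequest createDeleteRequest(String element) {
        return new DeleteRequest("my-index").id(element);
    }

The emitter may then decide per element which request to add, e.g. indexer.add(createDeleteRequest(element)) for elements that represent deletions.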

Internally, each parallel instance of the Flink Elasticsearch Sink uses a BulkProcessor to send action requests to the cluster. This buffers elements before they are sent to the cluster in bulk. The BulkProcessor executes one bulk request at a time, i.e. there will be no two concurrent flushes of the buffered actions.

Elasticsearch Sinks and Fault Tolerance

With Flink's checkpointing enabled, the Flink Elasticsearch Sink guarantees at-least-once delivery of action requests to the Elasticsearch cluster. It does so by waiting for all pending action requests in the BulkProcessor at the time of a checkpoint. This effectively assures that all requests before the checkpoint was triggered have been successfully acknowledged by Elasticsearch, before proceeding to process more records sent to the sink.

More details on checkpoints and fault tolerance can be found in the fault tolerance docs.

To use fault-tolerant Elasticsearch Sinks, checkpointing of the topology needs to be enabled in the execution environment:

Java

Elasticsearch 6:

    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.enableCheckpointing(5000); // checkpoint every 5000 msecs

    Elasticsearch6SinkBuilder sinkBuilder = new Elasticsearch6SinkBuilder<String>()
        .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
        .setHosts(new HttpHost("127.0.0.1", 9200, "http"))
        .setEmitter(
            (element, context, indexer) ->
                indexer.add(createIndexRequest(element)));

Elasticsearch 7:

    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.enableCheckpointing(5000); // checkpoint every 5000 msecs

    Elasticsearch7SinkBuilder sinkBuilder = new Elasticsearch7SinkBuilder<String>()
        .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
        .setHosts(new HttpHost("127.0.0.1", 9200, "http"))
        .setEmitter(
            (element, context, indexer) ->
                indexer.add(createIndexRequest(element)));

Scala

Elasticsearch 6:

    val env = StreamExecutionEnvironment.getExecutionEnvironment()
    env.enableCheckpointing(5000) // checkpoint every 5000 msecs

    val sinkBuilder = new Elasticsearch6SinkBuilder[String]
      .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
      .setHosts(new HttpHost("127.0.0.1", 9200, "http"))
      .setEmitter((element: String, context: SinkWriter.Context, indexer: RequestIndexer) =>
        indexer.add(createIndexRequest(element)))

Elasticsearch 7:

    val env = StreamExecutionEnvironment.getExecutionEnvironment()
    env.enableCheckpointing(5000) // checkpoint every 5000 msecs

    val sinkBuilder = new Elasticsearch7SinkBuilder[String]
      .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
      .setHosts(new HttpHost("127.0.0.1", 9200, "http"))
      .setEmitter((element: String, context: SinkWriter.Context, indexer: RequestIndexer) =>
        indexer.add(createIndexRequest(element)))

Python

Elasticsearch 6:

    env = StreamExecutionEnvironment.get_execution_environment()
    # checkpoint every 5000 msecs
    env.enable_checkpointing(5000)

    sink_builder = Elasticsearch6SinkBuilder() \
        .set_delivery_guarantee(DeliveryGuarantee.AT_LEAST_ONCE) \
        .set_emitter(ElasticsearchEmitter.static_index('foo', 'id', 'bar')) \
        .set_hosts(['localhost:9200'])

Elasticsearch 7:

    env = StreamExecutionEnvironment.get_execution_environment()
    # checkpoint every 5000 msecs
    env.enable_checkpointing(5000)

    sink_builder = Elasticsearch7SinkBuilder() \
        .set_delivery_guarantee(DeliveryGuarantee.AT_LEAST_ONCE) \
        .set_emitter(ElasticsearchEmitter.static_index('foo', 'id')) \
        .set_hosts(['localhost:9200'])

By using UpdateRequests with deterministic ids and the upsert method, it is possible to achieve exactly-once semantics in Elasticsearch when AT_LEAST_ONCE delivery is configured for the connector.
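As an illustration, a minimal sketch of such an idempotent upsert (assuming the Elasticsearch 7 client; treating the element itself as a stable, unique key is an assumption made only for this example):

    import java.util.HashMap;
    import java.util.Map;

    import org.elasticsearch.action.update.UpdateRequest;

    // Hypothetical helper: the document id is derived deterministically from the element,
    // so a replayed element after a failure overwrites the same document instead of
    // creating a duplicate, which turns at-least-once delivery into exactly-once results.
    private static UpdateRequest createUpsertRequest(String element) {
        Map<String, Object> json = new HashMap<>();
        json.put("data", element);

        String deterministicId = element; // assumption: elements are stable, unique keys
        return new UpdateRequest("my-index", deterministicId)
            .doc(json)
            .docAsUpsert(true); // insert the document if it does not exist yet, update it otherwise
    }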

Handling Failing Elasticsearch Requests

Elasticsearch action requests may fail for a variety of reasons, including temporarily saturated node queue capacity or malformed documents to be indexed. The Flink Elasticsearch Sink allows the user to retry requests by specifying a backoff strategy.

Below is an example:

Java

Elasticsearch 6:

    DataStream<String> input = ...;

    input.sinkTo(
        new Elasticsearch6SinkBuilder<String>()
            .setHosts(new HttpHost("127.0.0.1", 9200, "http"))
            .setEmitter(
                (element, context, indexer) ->
                    indexer.add(createIndexRequest(element)))
            // This enables an exponential backoff retry strategy, with an initial delay of 1000 ms and a maximum of 5 retries
            .setBulkFlushBackoffStrategy(FlushBackoffType.EXPONENTIAL, 5, 1000)
            .build());

Elasticsearch 7:

    DataStream<String> input = ...;

    input.sinkTo(
        new Elasticsearch7SinkBuilder<String>()
            .setHosts(new HttpHost("127.0.0.1", 9200, "http"))
            .setEmitter(
                (element, context, indexer) ->
                    indexer.add(createIndexRequest(element)))
            // This enables an exponential backoff retry strategy, with an initial delay of 1000 ms and a maximum of 5 retries
            .setBulkFlushBackoffStrategy(FlushBackoffType.EXPONENTIAL, 5, 1000)
            .build());

Scala

Elasticsearch 6:

    val input: DataStream[String] = ...

    input.sinkTo(
      new Elasticsearch6SinkBuilder[String]
        .setHosts(new HttpHost("127.0.0.1", 9200, "http"))
        .setEmitter((element: String, context: SinkWriter.Context, indexer: RequestIndexer) =>
          indexer.add(createIndexRequest(element)))
        // This enables an exponential backoff retry strategy, with an initial delay of 1000 ms and a maximum of 5 retries
        .setBulkFlushBackoffStrategy(FlushBackoffType.EXPONENTIAL, 5, 1000)
        .build())

Elasticsearch 7:

    val input: DataStream[String] = ...

    input.sinkTo(
      new Elasticsearch7SinkBuilder[String]
        .setHosts(new HttpHost("127.0.0.1", 9200, "http"))
        .setEmitter((element: String, context: SinkWriter.Context, indexer: RequestIndexer) =>
          indexer.add(createIndexRequest(element)))
        // This enables an exponential backoff retry strategy, with an initial delay of 1000 ms and a maximum of 5 retries
        .setBulkFlushBackoffStrategy(FlushBackoffType.EXPONENTIAL, 5, 1000)
        .build())

Python

Elasticsearch 6:

    input = ...

    # This enables a constant backoff retry strategy, with a delay of 1000 ms between retries and a maximum of 5 retries
    es6_sink = Elasticsearch6SinkBuilder() \
        .set_bulk_flush_backoff_strategy(FlushBackoffType.CONSTANT, 5, 1000) \
        .set_emitter(ElasticsearchEmitter.static_index('foo', 'id', 'bar')) \
        .set_hosts(['localhost:9200']) \
        .build()

    input.sink_to(es_sink).name('es6 sink')

Elasticsearch 7:

    input = ...

    # This enables an exponential backoff retry strategy, with an initial delay of 1000 ms and a maximum of 5 retries
    es7_sink = Elasticsearch7SinkBuilder() \
        .set_bulk_flush_backoff_strategy(FlushBackoffType.EXPONENTIAL, 5, 1000) \
        .set_emitter(ElasticsearchEmitter.static_index('foo', 'id')) \
        .set_hosts(['localhost:9200']) \
        .build()

    input.sink_to(es7_sink).name('es7 sink')

The above example will let the sink re-add requests that failed due to resource constraints (e.g. saturated queue capacity). For all other failures, such as malformed documents, the sink will fail. If no BulkFlushBackoffStrategy is configured (or FlushBackoffType.NONE is used), the sink will fail for any kind of error.

Important: Re-adding requests back to the internal BulkProcessor on failures leads to longer checkpoints, as the sink will also need to wait for the re-added requests to be flushed when checkpointing. For example, when using FlushBackoffType.EXPONENTIAL, checkpoints will need to wait until the Elasticsearch node queues have enough capacity for all pending requests, or until the maximum number of retries has been reached.

Configuring the Internal Bulk Processor

The internal BulkProcessor can be further configured for its behaviour on how buffered action requests are flushed, by using the following methods provided by the Elasticsearch6SinkBuilder:

  • setBulkFlushMaxActions(int numMaxActions): Maximum number of actions to buffer before flushing.
  • setBulkFlushMaxSizeMb(int maxSizeMb): Maximum amount of buffered data (in megabytes) before flushing.
  • setBulkFlushInterval(long intervalMillis): Interval at which to flush, regardless of the number or size of buffered actions.

Configuring how temporary request errors are retried is also supported; a combined configuration sketch follows the list below:

  • setBulkFlushBackoffStrategy(FlushBackoffType flushBackoffType, int maxRetries, long delayMillis): The type of backoff delay, either CONSTANT or EXPONENTIAL, the maximum number of backoff retries to attempt, and the amount of delay for backoff. For constant backoff, this is simply the delay between each retry; for exponential backoff, this is the initial delay.
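Putting these options together, a minimal sketch of a sink with tuned flush and retry behaviour might look as follows (the concrete values are purely illustrative, not recommendations, and createIndexRequest refers to the helper defined earlier):

    input.sinkTo(
        new Elasticsearch6SinkBuilder<String>()
            .setHosts(new HttpHost("127.0.0.1", 9200, "http"))
            // Illustrative flush tuning: flush after 1000 buffered actions, 5 MB of
            // buffered data, or 10 seconds, whichever comes first, and retry transient
            // bulk failures with constant backoff (3 retries, 500 ms apart).
            .setBulkFlushMaxActions(1000)
            .setBulkFlushMaxSizeMb(5)
            .setBulkFlushInterval(10000)
            .setBulkFlushBackoffStrategy(FlushBackoffType.CONSTANT, 3, 500)
            .setEmitter(
                (element, context, indexer) ->
                    indexer.add(createIndexRequest(element)))
            .build());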

More information about Elasticsearch can be found in this documentation.

Packaging the Elasticsearch Connector into an Uber-Jar

For the execution of your Flink program, it is recommended to build a so-called uber-jar (executable jar) containing all of its dependencies (see this documentation for further information).

Alternatively, you can put the connector's jar file into Flink's lib/ folder to make it available system-wide, i.e. for all jobs being run.