可查询状态Beta
注意:可查询状态的客户端API当前处于不断发展的状态,并且不保证所提供接口的稳定性。在即将推出的Flink版本中,客户端可能会出现API更改。
简而言之,此函数将Flink的托管Keys(分区)状态(请参阅使用状态)暴露给外部世界,并允许用户从Flink外部查询作业的状态。对于某些情况,可查询状态消除了对外部系统(例如键值存储)的分布式 算子操作/事务的需要,这通常是实践中的瓶颈。此外,此函数对于调试目的可能特别有用。
注意:查询状态对象时,无需任何同步或复制即可从并发线程访问该对象。这是一种设计选择,因为上述任何一种都会导致增加的作业延迟,我们希望避免这种情况。由于使用Java堆空间的任何状态后台(例如 MemoryStateBackend
或)FsStateBackend
在检索值时不能与副本一起使用,而是直接引用存储的值,因此读取 - 修改 - 写入模式是不安全的,并且可能导致可查询状态服务器由于并发修改而失败。该RocksDBStateBackend
从这些问题的安全。
建筑
在展示如何使用可查询状态之前,简要描述组成它的实体是很有用的。可查询状态函数包含三个主要实体:
- 的
QueryableStateClient
,所述Flink群集外,其(可能)运行并提交用户查询, - 的
QueryableStateClientProxy
,它运行在每个TaskManager
(即在Flink集群内),并负责接收客户的疑问,代表他取出由负责TaskManager所请求的状态,并返回给客户端, - 在
QueryableStateServer
它运行在每个TaskManager
并负责提供本地存储的状态。客户端连接到其中一个代理,并发送与特定Keys关联的状态的请求k
。如“ 使用状态”中所述,被Keys化状态按键组进行组织,每个键组TaskManager
都分配了许多这些键组。要发现哪个TaskManager
负责Keys组持有k
,代理将询问JobManager
。根据答案,代理将在QueryableStateServer
运行时查询TaskManager
与之关联的状态k
,并将响应转发回客户端。
激活可查询状态
要在Flink群集上启用可查询状态,只需将Flink分发flink-queryable-state-runtime_2.11-1.7-SNAPSHOT.jar
的opt/
文件夹复制到该文件夹即可。否则,未启用可查询状态函数。lib/
要验证群集是否在启用了可查询状态的情况下运行,请检查该行的任何TaskManager的日志:"Started the Queryable State Proxy Server @ …"
。
使状态可查询
现在您已在群集上激活了可查询状态,现在是时候看看如何使用它了。为了使状态对外界可见,需要使用以下方法明确查询状态:
- a
QueryableStateStream
,作为接收器的便利对象,并将其传入值作为可查询状态提供,或者 - 该
stateDescriptor.setQueryable(String queryableStateName)
方法使得由状态描述符表示的被Keys化状态可查询。以下部分解释了这两种方法的用法。
可查询状态流
调用.asQueryableState(stateName, stateDescriptor)
一个KeyedStream
回报QueryableStateStream
它提供其值可查询状态。根据状态的类型,该asQueryableState()
方法有以下变体:
// ValueState
QueryableStateStream asQueryableState(
String queryableStateName,
ValueStateDescriptor stateDescriptor)
// Shortcut for explicit ValueStateDescriptor variant
QueryableStateStream asQueryableState(String queryableStateName)
// FoldingState
QueryableStateStream asQueryableState(
String queryableStateName,
FoldingStateDescriptor stateDescriptor)
// ReducingState
QueryableStateStream asQueryableState(
String queryableStateName,
ReducingStateDescriptor stateDescriptor)
注意:没有可查询的接收ListState
器,因为它会导致不断增长的列表,这些列表可能无法清理,因此最终会消耗太多内存。
返回QueryableStateStream
可以看作是一个接收器,不能进一步转换。在内部,aQueryableStateStream
被转换为 算子,该 算子使用所有传入记录来更新可查询状态实例。更新逻辑由调用中StateDescriptor
提供的类型隐式asQueryableState
。在如下所示的程序中,被Key化的数据流的所有记录将用于通过以下方式更新状态实例ValueState.update(value)
:
stream.keyBy(0).asQueryableState("query-name")
这就像Scala API一样flatMapWithState
。
管理Keys状态
通过使相应的状态描述符可查询,可以使算子的管理被Keys化状态(请参阅使用托管被Keys化状态)可查询StateDescriptor.setQueryable(String queryableStateName)
,如下例所示:
ValueStateDescriptor<Tuple2<Long, Long>> descriptor =
new ValueStateDescriptor<>(
"average", // the state name
TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {})); // type information
descriptor.setQueryable("query-name"); // queryable state name
注意:该queryableStateName
参数可以任意选择并且仅用于查询。它不必与状态自己的名字相同。
该变体对于哪种类型的状态可以被查询没有限制。这意味着,这可以用于任何ValueState
,ReduceState
,ListState
,MapState
,AggregatingState
,和当前已过时FoldingState
。
查询状态
到目前为止,您已将集群设置为以可查询状态运行,并且已将(某些)状态声明为可查询状态。现在是时候看看如何查询这个状态了。
为此,您可以使用QueryableStateClient
帮助程序类。这是可用的flink-queryable-state-client
,必须被明确列入作为一个依赖罐子pom.xml
项目与沿flink-core
,如下图所示:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-core</artifactId>
<version>1.7-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-queryable-state-client-java_2.11</artifactId>
<version>1.7-SNAPSHOT</version>
</dependency>
有关详细信息,您可以查看如何设置Flink程序。
在QueryableStateClient
将提交查询到内部代理,然后将处理您的查询并返回最终结果。初始化客户端的唯一要求是提供有效的TaskManager
主机名(请记住,每个TaskManager上都运行可查询的状态代理)以及代理侦听的端口。有关如何在“ 配置”部分中配置代理和状态服务器端口的更多信息。
QueryableStateClient client = new QueryableStateClient(tmHostname, proxyPort);
在客户端准备好的情况下,要查询与类型V
的键相关联的类型状态K
,您可以使用以下方法:
CompletableFuture<S> getKvState(
JobID jobId,
String queryableStateName,
K key,
TypeInformation<K> keyTypeInfo,
StateDescriptor<S, V> stateDescriptor)
以上内容返回CompletableFuture
最终保存由queryableStateName
具有ID的作业标识的可查询状态实例的状态值jobID
。这key
是您感兴趣的状态的Keys,keyTypeInfo
它将告诉Flink如何序列化/反序列化它。最后,stateDescriptor
包含了请求的状态,即它的类型(必要的信息Value
,Reduce
等等),并就如何序列化/反序列化的必要信息。
细心的读者会注意到返回的future包含一个type值S
,即一个State
包含实际值的对象。这可以通过任何支持Flink状态类型:ValueState
,ReduceState
,ListState
,MapState
,AggregatingState
,和当前已过时FoldingState
。
注意:这些状态对象不允许修改包含的状态。您可以使用它们来获取状态的实际值,例如,使用valueState.get()
或迭代所包含的<K, V>
条目,例如使用mapState.entries()
,但您无法修改它们。例如,add()
在返回的列表状态上调用该方法将抛出一个UnsupportedOperationException
。
注意:客户端是异步的,可以由多个线程共享。它需要QueryableStateClient.shutdown()
在未使用时关闭才能释放资源。
例
以下示例通过使CountWindowAverage
示例可查询来扩展示例(请参阅使用托管被Keys化状态),并显示如何查询此值:
public class CountWindowAverage extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {
private transient ValueState<Tuple2<Long, Long>> sum; // a tuple containing the count and the sum
@Override
public void flatMap(Tuple2<Long, Long> input, Collector<Tuple2<Long, Long>> out) throws Exception {
Tuple2<Long, Long> currentSum = sum.value();
currentSum.f0 += 1;
currentSum.f1 += input.f1;
sum.update(currentSum);
if (currentSum.f0 >= 2) {
out.collect(new Tuple2<>(input.f0, currentSum.f1 / currentSum.f0));
sum.clear();
}
}
@Override
public void open(Configuration config) {
ValueStateDescriptor<Tuple2<Long, Long>> descriptor =
new ValueStateDescriptor<>(
"average", // the state name
TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {})); // type information
descriptor.setQueryable("query-name");
sum = getRuntimeContext().getState(descriptor);
}
}
在作业中使用后,您可以检索作业ID,然后从该 算子查询任何键的当前状态:
QueryableStateClient client = new QueryableStateClient(tmHostname, proxyPort);
// the state descriptor of the state to be fetched.
ValueStateDescriptor<Tuple2<Long, Long>> descriptor =
new ValueStateDescriptor<>(
"average",
TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {}));
CompletableFuture<ValueState<Tuple2<Long, Long>>> resultFuture =
client.getKvState(jobId, "query-name", key, BasicTypeInfo.LONG_TYPE_INFO, descriptor);
// now handle the returned value
resultFuture.thenAccept(response -> {
try {
Tuple2<Long, Long> res = response.get();
} catch (Exception e) {
e.printStackTrace();
}
});
配置
以下配置参数会影响可查询状态服务器和客户端的行为。它们的定义是QueryableStateOptions
。
状态服务器
query.server.ports
:可查询状态服务器的服务器端口范围。如果有多个TaskManager在同一台机器上运行,这对于避免端口冲突很有用。指定的范围可以是:port:“9123”,一系列端口:“50100-50200”,或范围和/或点列表:“50100-50200,50300-50400,51234”。默认端口为9067。query.server.network-threads
:接收状态服务器传入请求的网络(事件循环)线程数(0 => #slots)query.server.query-threads
:处理/服务状态服务器的传入请求的线程数(0 => #slots)。
代理
query.proxy.ports
:可查询状态代理的服务器端口范围。如果有多个TaskManager在同一台机器上运行,这对于避免端口冲突很有用。指定的范围可以是:port:“9123”,一系列端口:“50100-50200”,或范围和/或点列表:“50100-50200,50300-50400,51234”。默认端口为9069。query.proxy.network-threads
:接收客户端代理的传入请求的网络(事件循环)线程数(0 => #slots)query.proxy.query-threads
:处理/提供客户端代理的传入请求的线程数(0 => #slots)。