管理状态的自定义序列化
如果您的应用程序使用Flink的托管状态,则可能需要为特殊用例实现自定义序列化逻辑。
此页面的目标是需要对其状态使用自定义序列化的用户,包括如何提供自定义序列化程序以及如何处理序列化程序的升级以实现兼容性。如果您只是使用Flink自己的序列化程序,则此页面无关紧要,可以跳过。
使用自定义序列化器
如上例所示,当注册托管算子或被Keys化状态时,StateDescriptor
需要指定状态名称以及有关状态类型的信息。Flink的类型序列化框架使用类型信息为状态创建适当的序列化程序。
也可以完全绕过这个并让Flink使用您自己的自定义序列化程序来序列化托管状态,只需直接StateDescriptor
使用您自己的TypeSerializer
实现实例化:
public class CustomTypeSerializer extends TypeSerializer<Tuple2<String, Integer>> {...};
ListStateDescriptor<Tuple2<String, Integer>> descriptor =
new ListStateDescriptor<>(
"state-name",
new CustomTypeSerializer());
checkpointedState = getRuntimeContext().getListState(descriptor);
class CustomTypeSerializer extends TypeSerializer[(String, Integer)] {...}
val descriptor = new ListStateDescriptor[(String, Integer)](
"state-name",
new CustomTypeSerializer)
)
checkpointedState = getRuntimeContext.getListState(descriptor)
请注意,Flink将状态序列化程序与状态一起写为元数据。在某些还原的情况下(请参阅以下小节),需要对已编写的序列化程序进行反序列化和使用。因此,建议避免使用匿名类作为状态序列化程序。匿名类对生成的类名没有保证,它在编译器之间有所不同,并且取决于它们在封闭类中实例化的顺序,这很容易导致先前编写的序列化程序不可读(因为原始类不再是在classpath中找到)。
处理序列化程序升级和兼容性
Flink允许更改用于读取和写入托管状态的序列化程序,以便用户不会被锁定到任何特定的序列化。恢复状态后,将检查为状态注册的新序列化程序(即,StateDescriptor
用于访问还原作业中的状态的序列化程序)的兼容性,并将其替换为状态的新序列化程序。
兼容的串行器意味着串行器能够读取状态的先前序列化字节,并且状态的写入二进制格式也保持相同。通过以下两种TypeSerializer
接口方法提供了检查新序列化程序兼容性的方法:
public abstract TypeSerializerConfigSnapshot snapshotConfiguration();
public abstract CompatibilityResult ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot);
简而言之,每次执行检查点时,都会snapshotConfiguration
调用该方法来创建状态序列化程序配置的时间点视图。返回的配置SNAPSHOT与检查点一起存储为状态的元数据。使用检查点还原作业时,将通过对应方法将该序列化程序配置SNAPSHOT提供给相同状态的新序列化程序ensureCompatibility
,以验证新序列化程序的兼容性。此方法用于检查新序列化程序是否兼容,以及在不兼容的情况下可能重新配置新序列化程序的钩子。
请注意,Flink自己的序列化程序实现时至少与它们自身兼容,即当同一个序列化程序用于恢复作业中的状态时,序列化程序将重新配置它们以与其先前的配置兼容。
以下小节说明了使用自定义序列化程序时实现这两种方法的准则。
实现snapshotConfiguration方法
序列化程序的配置SNAPSHOT应该捕获足够的信息,以便在恢复时,传输到状态的新串行器的信息足以确定它是否兼容。这通常可以包含有关序列化程序参数的信息或序列化数据的二进制格式; 通常,允许新的串行器决定是否可以用它来读取先前的序列化字节,以及它以相同的二进制格式写入的任何内容。
如何将串行器的配置SNAPSHOT写入检查点并从检查点读取,这是完全可自定义的。以下是所有序列化程序配置SNAPSHOT实现的基类,TypeSerializerConfigSnapshot
。
public abstract TypeSerializerConfigSnapshot extends VersionedIOReadableWritable {
public abstract int getVersion();
public void read(DataInputView in) {...}
public void write(DataOutputView out) {...}
}
该read
和write
方法定义如何配置从读取和写入检查点。基本实现包含用于读取和写入配置SNAPSHOT版本的逻辑,因此应该对其进行扩展而不是完全覆盖。
配置SNAPSHOT的版本通过该getVersion
方法确定。串行器配置SNAPSHOT的版本控制是维护兼容配置的方法,因为配置中包含的信息可能会随时间而变化。默认情况下,配置SNAPSHOT仅与当前版本兼容(由返回getVersion
)。要指示配置与其他版本兼容,请覆盖该getCompatibleVersions
方法以返回更多版本值。从检查点读取时,可以使用该getReadVersion
方法确定写入配置的版本,并使读取逻辑适应特定版本。
注意串行的配置SNAPSHOT的版本是不相关的升级序列化。完全相同的序列化程序可以具有其配置SNAPSHOT的不同实现,例如,当将更多信息添加到配置中以允许将来进行更全面的兼容性检查时。
实现a的一个限制TypeSerializerConfigSnapshot
是必须存在空构造函数。从检查点读取配置SNAPSHOT时,需要空构造函数。
实现ensureCompatibility方法
该ensureCompatibility
方法应包含逻辑,该逻辑执行检查通过提供的有关前一个序列化程序的信息TypeSerializerConfigSnapshot
,基本上执行以下 算子操作之一:
检查串行器是否兼容,同时可能重新配置(如果需要)以使其兼容。然后,与Flink确认串行器兼容。
确认序列化程序不兼容,并且在Flink继续使用新的序列化程序之前需要进行状态迁移。
通过从ensureCompatibility
方法中返回以下其中一项,可以将上述情况转换为代码:
CompatibilityResult.compatible()
:这确认新的串行器兼容,或者已经重新配置为兼容,并且Flink可以继续使用串行器的工作。CompatibilityResult.requiresMigration()
:这确认串行器不兼容,或者无法重新配置为兼容,并且在使用新的串行器之前需要状态迁移。通过使用先前的串行器将读取的状态字节读取到对象,然后使用新的序列化器再次序列化来执行状态迁移。CompatibilityResult.requiresMigration(TypeDeserializer deserializer)
:此确认具有与之相同的语义CompatibilityResult.requiresMigration()
,但是如果无法找到或加载先前的序列化程序以读取迁移的已还原状态字节,则提供的TypeDeserializer
可用作回退手段。
注意目前,从Flink 1.3开始,如果兼容性检查的结果确认需要执行状态迁移,则作业无法从检查点恢复,因为状态迁移当前不可用。将来的版本中将引入迁移状态的能力。
管理TypeSerializer和TypeSerializerConfigSnapshot用户代码中的类
由于TypeSerializer
s和TypeSerializerConfigSnapshot
s与状态值一起作为检查点的一部分写入,因此类路径中类的可用性可能会影响还原行为。
TypeSerializer
使用Java Object Serialization将s直接写入检查点。在新的串行器确认它不兼容并且需要状态迁移的情况下,将需要存在以便能够读取恢复的状态字节。因此,如果原始序列化程序类serialVersionUID
由于状态的序列化程序升级而不再存在或已被修改(导致不同),则还原将无法继续。此要求的替代方法是TypeDeserializer
在确认需要使用状态迁移时提供回退CompatibilityResult.requiresMigration(TypeDeserializer deserializer)
。
TypeSerializerConfigSnapshot
恢复的检查点中的s 类必须存在于类路径中,因为它们是对升级的序列化程序进行兼容性检查的基本组件,如果该类不存在则无法恢复。由于使用自定义序列化将配置SNAPSHOT写入检查点,因此只要使用版本控制机制处理配置更改的兼容性,就可以自由更改类的实现TypeSerializerConfigSnapshot
。