执行配置
该StreamExecutionEnvironment
包含ExecutionConfig
允许为运行时设置工作的具体配置值。要更改影响所有作业的默认值,请参阅配置。
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
ExecutionConfig executionConfig = env.getConfig();
val env = StreamExecutionEnvironment.getExecutionEnvironment
var executionConfig = env.getConfig
可以使用以下配置选项:(默认为粗体)
enableClosureCleaner()
/disableClosureCleaner()
。默认情况下启用闭包清理器。闭包清理器删除Flink程序中对周围类匿名函数的不需要的引用。禁用闭包清除程序后,可能会发生匿名用户函数引用周围的类(通常不是Serializable)。这将导致序列化程序出现异常。getParallelism()
/setParallelism(int parallelism)
设置作业的默认并行度。getMaxParallelism()
/setMaxParallelism(int parallelism)
设置作业的默认最大并行度。此设置确定最大并行度并指定动态缩放的上限。getNumberOfExecutionRetries()
/setNumberOfExecutionRetries(int numberOfExecutionRetries)
设置重新执行失败任务的次数。值为零可有效禁用容错。值为-1
表示应使用系统默认值(在配置中定义)。这已弃用,请改用重启策略。getExecutionRetryDelay()
/setExecutionRetryDelay(long executionRetryDelay)
设置在重新执行作业之前系统在作业失败后等待的延迟(以毫秒为单位)。在TaskManagers上成功停止所有任务后,延迟开始,一旦延迟过去,任务就会重新启动。此参数对于延迟重新执行非常有用,以便在尝试重新执行之前让某些超时相关故障完全浮出水面(例如尚未完全超时的断开连接),并且由于同样的问题而再次立即失败。仅当执行重试次数为一次或多次时,此参数才有效。这已弃用,请改用重启策略。getExecutionMode()
/setExecutionMode()
。默认执行模式为PIPELINED。设置执行模式以执行程序。执行模式定义数据交换是以批处理还是以流水线方式执行。enableForceKryo()
/disableForceKryo
。Kryo默认不会被迫。强制GenericTypeInformation将Pryo序列化程序用于POJO,即使我们可以将它们分析为POJO。在某些情况下,这可能更可取。例如,当Flink的内部序列化程序无法正确处理POJO时。enableForceAvro()
/disableForceAvro()
。默认情况下不会强制使用Avro。强制Flink AvroTypeInformation使用Avro序列化程序而不是Kryo来序列化Avro POJO。enableObjectReuse()
/disableObjectReuse()
默认情况下,对象不会在Flink中重复使用。启用对象重用模式将指示运行时重用用户对象以获得更好的性能。请记住,当 算子操作的用户代码函数不知道此行为时,这可能会导致错误。enableSysoutLogging()
/disableSysoutLogging()
JobManager状态更新System.out
默认打印到。此设置允许禁用此行为。getGlobalJobParameters()
/setGlobalJobParameters()
此方法允许用户将自定义对象设置为作业的全局配置。由于ExecutionConfig
可以在所有用户定义的函数中访问,因此这是一种在作业中全局可用的配置的简单方法。addDefaultKryoSerializer(Class<?> type, Serializer<?> serializer)
为给定的注册Kryo序列化程序实例type
。addDefaultKryoSerializer(Class<?> type, Class<? extends Serializer<?>> serializerClass)
为给定的注册Kryo序列化程序类type
。registerTypeWithKryoSerializer(Class<?> type, Serializer<?> serializer)
使用Kryo注册给定类型并为其指定序列化程序。通过使用Kryo注册类型,类型的序列化将更加高效。registerKryoType(Class<?> type)
如果类型最终被Kryo序列化,那么它将在Kryo注册以确保只写入标签(整数ID)。如果某个类型未在Kryo中注册,则其整个类名将与每个实例序列化,从而导致更高的I / O成本。registerPojoType(Class<?> type)
使用序列化堆栈注册给定类型。如果类型最终被序列化为POJO,则该类型将在POJO序列化程序中注册。如果类型最终被Kryo序列化,那么它将在Kryo注册以确保只写入标签。如果某个类型未在Kryo中注册,则其整个类名将与每个实例序列化,从而导致更高的I / O成本。
请注意,注册的类型registerKryoType()
不适用于Flink的Kryo序列化程序实例。
disableAutoTypeRegistration()
默认情况下启用自动类型注册。自动类型注册是使用Kryo和POJO序列化器注册用户代码使用的所有类型(包括子类型)。setTaskCancellationInterval(long interval)
设置在连续尝试取消正在运行的任务之间等待的间隔(以毫秒为单位)。取消interrupt()
任务时,如果任务线程未在特定时间内终止,则创建新线程,该线程定期调用任务线程。此参数是指连续呼叫之间的时间interrupt()
,默认设置为30000毫秒或30秒。
的RuntimeContext
是在可访问的Rich*
通过的函数的getRuntimeContext()
方法还允许访问ExecutionConfig
中的所有用户定义的函数。