Configuration
By default, the Table & SQL API is preconfigured for producing accurate results with acceptable performance.
Depending on the requirements of a table program, it might be necessary to adjust certain parameters for optimization. For example, unbounded streaming programs may need to ensure that the required state size is capped (see streaming concepts).
Overview
When instantiating a TableEnvironment, EnvironmentSettings can be used to pass the desired configuration for the current session, by passing a Configuration object to the EnvironmentSettings.
Additionally, in every table environment, the TableConfig offers options for configuring the current session.
For common or important configuration options, the TableConfig provides getter and setter methods with detailed inline documentation.
For more advanced configuration, users can directly access the underlying key-value map. The following sections list all available options that can be used to adjust Flink Table & SQL API programs.
Attention: Because options are read at different points in time when performing operations, it is recommended to set configuration options early, right after instantiating a table environment.
Java
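import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableConfig;
import org.apache.flink.table.api.TableEnvironment;

// instantiate table environment
Configuration configuration = new Configuration();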
// set low-level key-value options
configuration.setString("table.exec.mini-batch.enabled", "true");
configuration.setString("table.exec.mini-batch.allow-latency", "5 s");
configuration.setString("table.exec.mini-batch.size", "5000");
EnvironmentSettings settings = EnvironmentSettings.newInstance()
    .inStreamingMode().withConfiguration(configuration).build();
TableEnvironment tEnv = TableEnvironment.create(settings);
// access Flink configuration after table environment instantiation
TableConfig tableConfig = tEnv.getConfig();
// set low-level key-value options
tableConfig.set("table.exec.mini-batch.enabled", "true");
tableConfig.set("table.exec.mini-batch.allow-latency", "5 s");
tableConfig.set("table.exec.mini-batch.size", "5000");
Scala
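import org.apache.flink.configuration.Configuration
import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment}

// instantiate table environment
val configuration = new Configuration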
// set low-level key-value options
configuration.setString("table.exec.mini-batch.enabled", "true")
configuration.setString("table.exec.mini-batch.allow-latency", "5 s")
configuration.setString("table.exec.mini-batch.size", "5000")
val settings = EnvironmentSettings.newInstance
  .inStreamingMode.withConfiguration(configuration).build
val tEnv: TableEnvironment = TableEnvironment.create(settings)
// access Flink configuration after table environment instantiation
val tableConfig = tEnv.getConfig()
// set low-level key-value options
tableConfig.set("table.exec.mini-batch.enabled", "true")
tableConfig.set("table.exec.mini-batch.allow-latency", "5 s")
tableConfig.set("table.exec.mini-batch.size", "5000")
Python
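from pyflink.common import Configuration
from pyflink.table import EnvironmentSettings, TableEnvironment

# instantiate table environment
configuration = Configuration()
# set low-level key-value options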
configuration.set("table.exec.mini-batch.enabled", "true")
configuration.set("table.exec.mini-batch.allow-latency", "5 s")
configuration.set("table.exec.mini-batch.size", "5000")
settings = EnvironmentSettings.new_instance() \
    .in_streaming_mode() \
    .with_configuration(configuration) \
    .build()
t_env = TableEnvironment.create(settings)
# access Flink configuration after table environment instantiation
table_config = t_env.get_config()
# set low-level key-value options
table_config.set("table.exec.mini-batch.enabled", "true")
table_config.set("table.exec.mini-batch.allow-latency", "5 s")
table_config.set("table.exec.mini-batch.size", "5000")
SQL CLI
Flink SQL> SET 'table.exec.mini-batch.enabled' = 'true';
Flink SQL> SET 'table.exec.mini-batch.allow-latency' = '5s';
Flink SQL> SET 'table.exec.mini-batch.size' = '5000';
Note: All of the following configuration options can also be set globally in conf/flink-conf.yaml (see configuration) and can later be overridden in the application, either through EnvironmentSettings before instantiating the TableEnvironment, or through the TableConfig of the TableEnvironment.
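The override order described in the note can be pictured as a layered lookup. The following is a minimal sketch in plain Python, not Flink code: the three dictionaries and the `resolve` helper are hypothetical stand-ins for the global file, the EnvironmentSettings configuration, and the TableConfig, and the option values are invented for illustration.

```python
from collections import ChainMap

# Hypothetical values standing in for each configuration layer.
flink_conf_yaml = {
    "table.exec.mini-batch.enabled": "false",
    "table.exec.state.ttl": "0 ms",
}
environment_settings = {
    "table.exec.mini-batch.enabled": "true",
    "table.exec.mini-batch.allow-latency": "5 s",
}
table_config = {
    "table.exec.mini-batch.allow-latency": "2 s",
}

# ChainMap searches its maps left to right, so the most specific layer
# (the session's TableConfig) is listed first and the global file last.
effective = ChainMap(table_config, environment_settings, flink_conf_yaml)


def resolve(key, default=None):
    """Return the value from the most specific layer that defines the key."""
    return effective.get(key, default)
```

Under these assumptions, `resolve("table.exec.mini-batch.allow-latency")` yields the TableConfig value, while keys set only in the global file fall through to it.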
Execution Options
The following options can be used to tune the performance of the query execution.
Key | Default | Type | Description |
---|---|---|---|
table.exec.async-lookup.buffer-capacity (Batch, Streaming) | 100 | Integer | The maximum number of async I/O operations that the async lookup join can trigger. |
table.exec.async-lookup.timeout (Batch, Streaming) | 3 min | Duration | The timeout for an asynchronous operation to complete. |
table.exec.deduplicate.insert-update-after-sensitive-enabled (Streaming) | true | Boolean | Sets whether the job (especially the sinks) is sensitive to the difference between INSERT and UPDATE_AFTER messages. If false, Flink may sometimes (e.g. in deduplication for the last row) send UPDATE_AFTER instead of INSERT for the first row. If true, Flink guarantees to send INSERT for the first row, at the cost of additional overhead. |
table.exec.deduplicate.mini-batch.compact-changes-enabled (Streaming) | false | Boolean | Sets whether to compact the changes sent downstream in row-time mini-batch. If true, Flink compacts changes and sends only the latest change downstream. Note that this optimization cannot be applied if the downstream needs the details of versioned data. If false, Flink sends all changes downstream, just as when mini-batch is not enabled. |
table.exec.disabled-operators (Batch) | (none) | String | Mainly for testing. A comma-separated list of operator names, where each name represents a kind of disabled operator. Operators that can be disabled include “NestedLoopJoin”, “ShuffleHashJoin”, “BroadcastHashJoin”, “SortMergeJoin”, “HashAgg”, “SortAgg”. By default, no operator is disabled. |
table.exec.legacy-cast-behaviour (Batch, Streaming) | DISABLED | Enum | Determines whether CAST follows the legacy behaviour or the new one, which introduces various fixes and improvements. Possible values: ENABLED, DISABLED |
table.exec.mini-batch.allow-latency (Streaming) | 0 ms | Duration | The maximum latency that MiniBatch may use to buffer input records. MiniBatch is an optimization to buffer input records to reduce state access. MiniBatch is triggered at the allowed latency interval and when the maximum number of buffered records is reached. NOTE: If table.exec.mini-batch.enabled is set to true, this value must be greater than zero. |
table.exec.mini-batch.enabled (Streaming) | false | Boolean | Specifies whether to enable the MiniBatch optimization. MiniBatch is an optimization to buffer input records to reduce state access. It is disabled by default; to enable it, set this option to true. NOTE: If mini-batch is enabled, ‘table.exec.mini-batch.allow-latency’ and ‘table.exec.mini-batch.size’ must be set. |
table.exec.mini-batch.size (Streaming) | -1 | Long | The maximum number of input records that can be buffered for MiniBatch. MiniBatch is an optimization to buffer input records to reduce state access. MiniBatch is triggered at the allowed latency interval and when the maximum number of buffered records is reached. NOTE: MiniBatch currently only works for non-windowed aggregations. If table.exec.mini-batch.enabled is set to true, this value must be positive. |
table.exec.rank.topn-cache-size (Streaming) | 10000 | Long | Rank operators have a cache that holds partial state contents to reduce state access. The cache size is the number of records in each ranking task. |
table.exec.resource.default-parallelism (Batch, Streaming) | -1 | Integer | Sets the default parallelism for all operators (such as aggregate, join, filter) to run with parallel instances. This option takes priority over, and therefore overrides, the parallelism of StreamExecutionEnvironment. A value of -1 indicates that no default parallelism is set, in which case the parallelism of StreamExecutionEnvironment is used as a fallback. |
table.exec.simplify-operator-name-enabled (Batch, Streaming) | true | Boolean | When true, the optimizer simplifies the operator name to the id and type of the ExecNode and keeps the details in the description. |
table.exec.sink.keyed-shuffle (Streaming) | AUTO | Enum | Minimizes the distributed disorder problem that many users run into when writing data into a table with primary keys. By default (AUTO), Flink adds a keyed shuffle when the sink’s parallelism differs from that of the upstream operator and the upstream is append-only. This works only when the upstream preserves the order of multiple records on the primary key; if it does not, the added shuffle cannot solve the problem (in that situation, consider deduplicating the source first, or use an upsert source with a primary key definition that truly reflects how the records evolve). You can also disable the shuffle (NONE) or force it (FORCE). Possible values: NONE, AUTO, FORCE |
table.exec.sink.not-null-enforcer (Batch, Streaming) | ERROR | Enum | Determines how Flink enforces NOT NULL column constraints when inserting null values. Possible values: |
table.exec.sink.type-length-enforcer (Batch, Streaming) | IGNORE | Enum | Determines whether values for columns with CHAR(<length>)/VARCHAR(<length>)/BINARY(<length>)/VARBINARY(<length>) types will be trimmed or padded (padding applies only to CHAR(<length>)/BINARY(<length>)), so that their length matches the length defined by their respective CHAR/VARCHAR/BINARY/VARBINARY column type. Possible values: |
table.exec.sink.upsert-materialize (Streaming) | AUTO | Enum | Because shuffles in a distributed system can disorder changelog data, the data received by the sink may not be in global upsert order. An upsert materialize operator is therefore added before the upsert sink. It receives the upstream changelog records and generates an upsert view for the downstream. By default (AUTO), the materialize operator is added when distributed disorder occurs on unique keys. You can also choose no materialization (NONE) or force materialization (FORCE). Possible values: NONE, AUTO, FORCE |
table.exec.sort.async-merge-enabled (Batch) | true | Boolean | Whether to asynchronously merge sorted spill files. |
table.exec.sort.default-limit (Batch) | -1 | Integer | The default limit applied when the user does not set a limit after ORDER BY. -1 indicates that this configuration is ignored. |
table.exec.sort.max-num-file-handles (Batch) | 128 | Integer | The maximal fan-in for external merge sort. It limits the number of file handles per operator. If it is too small, intermediate merging may be needed. If it is too large, too many files are open at the same time, which consumes memory and leads to random reads. |
table.exec.source.cdc-events-duplicate (Streaming) | false | Boolean | Indicates whether the CDC (Change Data Capture) sources in the job produce duplicate change events that require the framework to deduplicate them to get a consistent result. A CDC source is a source that produces full change events, including INSERT/UPDATE_BEFORE/UPDATE_AFTER/DELETE, for example a Kafka source with the Debezium format. This option is false by default. However, duplicate change events are common, because CDC tools (e.g. Debezium) usually provide at-least-once delivery when a failover happens. In such abnormal situations, Debezium may deliver duplicate change events to Kafka and Flink will receive the duplicates, which may cause Flink queries to return wrong results or throw unexpected exceptions. It is therefore recommended to turn on this option if your CDC tool provides at-least-once delivery. Enabling this option requires a PRIMARY KEY to be defined on the CDC sources. The primary key is used to deduplicate change events and generate a normalized changelog stream, at the cost of an additional stateful operator. |
table.exec.source.idle-timeout (Streaming) | 0 ms | Duration | When a source does not receive any elements for the duration of the timeout, it is marked as temporarily idle. This allows downstream tasks to advance their watermarks without waiting for watermarks from this source while it is idle. The default value is 0, which means that source idleness detection is not enabled. |
table.exec.spill-compression.block-size (Batch) | 64 kb | MemorySize | The memory size used for compression when spilling data. The larger the memory, the higher the compression ratio, but the more memory the job consumes. |
table.exec.spill-compression.enabled (Batch) | true | Boolean | Whether to compress spilled data. Currently, compression of spilled data is only supported for sort, hash-agg, and hash-join operators. |
table.exec.state.ttl (Streaming) | 0 ms | Duration | Specifies a minimum time interval for how long idle state (i.e. state which was not updated) will be retained. State is never cleared before it has been idle for at least the minimum time, and is cleared at some time after that. NOTE: Cleaning up state requires additional overhead for bookkeeping. The default value is 0, which means that state is never cleaned up. |
table.exec.window-agg.buffer-size-limit (Batch) | 100000 | Integer | Sets the window elements buffer size limit used in the group window agg operator. |
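
To make the interplay of the three mini-batch options more concrete, here is a minimal sketch in plain Python. It is not Flink's implementation: `MiniBatchBuffer` is a hypothetical class, it assumes that either trigger (size or latency) flushes the buffer, and it only checks the latency when a record arrives, which is a simplification.

```python
import time


class MiniBatchBuffer:
    """Hypothetical buffer that flushes on size or allowed latency."""

    def __init__(self, allow_latency_s, size, clock=time.monotonic):
        # Mirrors the documented constraints for an enabled mini-batch.
        if allow_latency_s <= 0:
            raise ValueError("allow-latency must be greater than zero")
        if size <= 0:
            raise ValueError("size must be positive")
        self.allow_latency_s = allow_latency_s
        self.size = size
        self.clock = clock
        self.records = []
        self.batch_started_at = None

    def add(self, record):
        """Buffer a record; return the flushed batch if a trigger fired, else None."""
        if not self.records:
            # The latency window starts with the first record of a batch.
            self.batch_started_at = self.clock()
        self.records.append(record)
        size_reached = len(self.records) >= self.size
        latency_elapsed = (
            self.clock() - self.batch_started_at >= self.allow_latency_s
        )
        if size_reached or latency_elapsed:
            return self.flush()
        return None

    def flush(self):
        """Hand the buffered records downstream and start a new batch."""
        batch, self.records = self.records, []
        self.batch_started_at = None
        return batch
```

Passing a fake `clock` callable makes the latency trigger easy to exercise without sleeping. Larger batches mean fewer state accesses per record, at the price of higher result latency.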
Optimizer Options
The following options can be used to adjust the behavior of the query optimizer to get a better execution plan.
Key | Default | Type | Description |
---|---|---|---|
table.optimizer.agg-phase-strategy (Batch, Streaming) | “AUTO” | String | Strategy for the aggregate phase. Only AUTO, TWO_PHASE or ONE_PHASE can be set. AUTO: No special enforcer for the aggregate stage; whether a two-stage or a one-stage aggregate is chosen depends on cost. TWO_PHASE: Enforces a two-stage aggregate, which has a localAggregate and a globalAggregate. Note that if an aggregate call cannot be optimized into two phases, a one-stage aggregate is still used. ONE_PHASE: Enforces a one-stage aggregate, which only has a CompleteGlobalAggregate. |
table.optimizer.distinct-agg.split.bucket-num (Streaming) | 1024 | Integer | Configures the number of buckets used when splitting distinct aggregation. The number is used in the first-level aggregation to calculate a bucket key ‘hash_code(distinct_key) % BUCKET_NUM’, which is used as an additional group key after splitting. |
table.optimizer.distinct-agg.split.enabled (Streaming) | false | Boolean | Tells the optimizer whether to split distinct aggregation (e.g. COUNT(DISTINCT col), SUM(DISTINCT col)) into two levels. The first aggregation is shuffled by an additional key, which is calculated from the hash code of distinct_key and the number of buckets. This optimization is very useful when there is data skew in distinct aggregation, and it makes it possible to scale up the job. |
table.optimizer.join-reorder-enabled (Batch, Streaming) | false | Boolean | Enables join reordering in the optimizer. Disabled by default. |
table.optimizer.join.broadcast-threshold (Batch) | 1048576 | Long | Configures the maximum size in bytes for a table that will be broadcast to all worker nodes when performing a join. Setting this value to -1 disables broadcasting. |
table.optimizer.multiple-input-enabled (Batch) | true | Boolean | When true, the optimizer merges operators with pipelined shuffling into a multiple-input operator to reduce shuffling and improve performance. |
table.optimizer.reuse-source-enabled (Batch, Streaming) | true | Boolean | When true, the optimizer tries to find duplicated table sources and reuse them. This works only when table.optimizer.reuse-sub-plan-enabled is true. |
table.optimizer.reuse-sub-plan-enabled (Batch, Streaming) | true | Boolean | When true, the optimizer tries to find duplicated sub-plans and reuse them. |
table.optimizer.source.aggregate-pushdown-enabled (Batch) | true | Boolean | When true, the optimizer pushes the local aggregates down into a TableSource that implements SupportsAggregatePushDown. |
table.optimizer.source.predicate-pushdown-enabled (Batch, Streaming) | true | Boolean | When true, the optimizer pushes predicates down into the FilterableTableSource. |
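
The bucket key used by the split distinct aggregation can be illustrated with a small sketch in plain Python. This is not the optimizer's rewrite: `bucket_of` and `count_distinct_split` are hypothetical helpers, and Python's built-in `hash` merely stands in for the `hash_code` function named in the option description.

```python
from collections import defaultdict

# Mirrors the default of table.optimizer.distinct-agg.split.bucket-num.
BUCKET_NUM = 1024


def bucket_of(distinct_key):
    """Compute the additional group key: hash_code(distinct_key) % BUCKET_NUM."""
    # Python's % always returns a non-negative result for a positive modulus.
    return hash(distinct_key) % BUCKET_NUM


def count_distinct_split(rows):
    """Two-level COUNT(DISTINCT) over (group_key, distinct_key) pairs."""
    # Level 1: group by (group_key, bucket) and collect the distinct keys.
    # A hot group_key is spread over up to BUCKET_NUM partial aggregates.
    partial = defaultdict(set)
    for group_key, distinct_key in rows:
        partial[(group_key, bucket_of(distinct_key))].add(distinct_key)

    # Level 2: sum the per-bucket counts. Each distinct key lands in exactly
    # one bucket, so the buckets are disjoint and the sum is exact.
    totals = defaultdict(int)
    for (group_key, _bucket), keys in partial.items():
        totals[group_key] += len(keys)
    return dict(totals)
```

The result matches a plain distinct count per group; the benefit of the split lies only in how the first-level work is distributed when one group key dominates the input.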
Table Options
The following options can be used to adjust the behavior of the table planner.
Key | Default | Type | Description |
---|---|---|---|
table.builtin-catalog-name (Batch, Streaming) | “default_catalog” | String | The name of the initial catalog to be created when instantiating a TableEnvironment. |
table.builtin-database-name (Batch, Streaming) | “default_database” | String | The name of the default database in the initial catalog to be created when instantiating a TableEnvironment. |
table.dml-sync (Batch, Streaming) | false | Boolean | Specifies whether the DML job (i.e. the insert operation) is executed asynchronously or synchronously. By default, execution is asynchronous, so you can submit multiple DML jobs at the same time. If this option is set to true, the insert operation waits for the job to finish. |
table.dynamic-table-options.enabled (Batch, Streaming) | true | Boolean | Enables or disables the OPTIONS hint used to specify table options dynamically. If disabled, an exception is thrown when any OPTIONS hint is specified. |
table.generated-code.max-length (Batch, Streaming) | 4000 | Integer | Specifies a threshold above which generated code is split into sub-function calls. Java has a maximum method length of 64 KB. This setting allows for finer granularity if necessary. The default is 4000 rather than 64 KB because, by default, the JIT compiler refuses to work on methods with more than 8 KB of bytecode. |
table.local-time-zone (Batch, Streaming) | “default” | String | The local time zone defines the current session time zone id. It is used when converting to/from TIMESTAMP WITH LOCAL TIME ZONE. Internally, timestamps with local time zone are always represented in the UTC time zone. However, when converting to data types that don’t include a time zone (e.g. TIMESTAMP, TIME, or simply STRING), the session time zone is used during conversion. The value of this option is either a full name such as “America/Los_Angeles”, or a custom time zone id such as “GMT-08:00”. |
table.plan.compile.catalog-objects (Batch, Streaming) | ALL | Enum | Strategy for persisting catalog objects such as tables, functions, or data types into a plan during compilation. It influences the need for catalog metadata to be present during a restore operation and affects the plan size. This configuration option does not affect anonymous/inline or temporary objects. Anonymous/inline objects will be persisted entirely (including schema and options) if possible, or fail the compilation otherwise. Temporary objects will be persisted only by their identifier, and the object needs to be present in the session context during a restore. Possible values: |
table.plan.force-recompile (Streaming) | false | Boolean | When false, the COMPILE PLAN statement fails if the output plan file already exists, unless the IF NOT EXISTS clause is used. When true, COMPILE PLAN overwrites the existing output plan file. We strongly suggest enabling this flag only for debugging purposes. |
table.plan.restore.catalog-objects (Batch, Streaming) | ALL | Enum | Strategy for restoring catalog objects such as tables, functions, or data types using a given plan, performing catalog lookups if necessary. It influences the need for catalog metadata to be present and enables partial enrichment of plan information. Possible values: |
table.sql-dialect (Batch, Streaming) | “default” | String | The SQL dialect defines how to parse a SQL query. A different SQL dialect may support different SQL grammar. Currently supported dialects are: default and hive. |
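
The effect of table.local-time-zone on conversions to zone-less types can be sketched with Python's standard library. This is an illustration of the conversion rule only, not Flink code: `render_local` is a hypothetical helper, and a fixed UTC-8 offset is used to stand in for the custom id “GMT-08:00”.

```python
from datetime import datetime, timedelta, timezone


def render_local(epoch_millis, session_zone):
    """Format a UTC-based instant as a zone-less string in the session zone."""
    # The instant itself is always held relative to UTC.
    instant = datetime.fromtimestamp(epoch_millis / 1000, tz=timezone.utc)
    # Only the rendering depends on the session time zone.
    return instant.astimezone(session_zone).strftime("%Y-%m-%d %H:%M:%S")


utc = timezone.utc
# Fixed offset corresponding to the custom time zone id "GMT-08:00".
gmt_minus_8 = timezone(timedelta(hours=-8))

same_instant_in_utc = render_local(0, utc)
same_instant_in_gmt_minus_8 = render_local(0, gmt_minus_8)
```

The same stored instant produces two different strings, which is why changing the session time zone changes query output for such conversions without changing the underlying data.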
SQL Client Options
The following options can be used to adjust the behavior of the SQL Client.
Key | Default | Type | Description |
---|---|---|---|
sql-client.display.max-column-width (Streaming) | 30 | Integer | When printing the query results, this parameter determines the number of characters shown on screen before truncating. This only applies to columns with variable-length types (e.g. STRING) in streaming mode. Fixed-length types and all types in batch mode are printed using a deterministic column width. |
sql-client.execution.max-table-result.rows (Batch, Streaming) | 1000000 | Integer | The number of rows to cache when in table mode. If the number of rows exceeds the specified value, the oldest rows are retired in FIFO order. |
sql-client.execution.result-mode (Batch, Streaming) | TABLE | Enum | Determines how the query result should be displayed. Possible values: |
sql-client.verbose (Batch, Streaming) | false | Boolean | Determines whether to print verbose output to the console. If set to true, the exception stack trace is printed; otherwise, only the cause is printed. |
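
The FIFO behaviour described for sql-client.execution.max-table-result.rows can be pictured with a bounded queue. The following is a rough sketch in plain Python, assuming that the oldest cached rows are dropped first once the limit is reached; `ResultCache` is a hypothetical class, not part of the SQL Client.

```python
from collections import deque


class ResultCache:
    """Hypothetical bounded row cache: the oldest rows are retired first."""

    def __init__(self, max_rows):
        # A deque with maxlen discards items from the opposite end on append.
        self.rows = deque(maxlen=max_rows)

    def add(self, row):
        """Cache a row, dropping the oldest one if the cache is full."""
        self.rows.append(row)

    def snapshot(self):
        """Return the currently cached rows, oldest first."""
        return list(self.rows)


cache = ResultCache(max_rows=3)
for row_id in range(5):
    cache.add(row_id)
```

After the loop, only the three most recently added rows remain in the cache.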