Set up TaskManager Memory

The TaskManager runs user code in Flink. Configuring memory usage for your needs can greatly reduce Flink’s resource footprint and improve job stability.

The memory configuration described below applies to release 1.10 and later. If you upgrade Flink from an earlier version, check the migration guide, because many changes were introduced with the 1.10 release.

This memory setup guide is relevant only for TaskManagers! The TaskManager memory components have a similar but more sophisticated structure compared to the memory model of the JobManager process.

Configure Total Memory

The total process memory of Flink JVM processes consists of memory consumed by the Flink application (total Flink memory) and by the JVM to run the process. The total Flink memory consumption includes usage of JVM Heap, managed memory (managed by Flink) and other direct (or native) memory.

(Figure: Simple TaskManager Memory Model)

If you run Flink locally (e.g. from your IDE) without creating a cluster, then only a subset of the memory configuration options are relevant, see also local execution for more details.

Otherwise, the simplest way to set up memory for TaskManagers is to configure the total memory. More fine-grained approaches are described further below.

The rest of the memory components will be adjusted automatically, based on default values or additionally configured options. See the following sections for more details about the other memory components.
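As a sketch, configuring only the total memory in flink-conf.yaml could look like the following. The option keys taskmanager.memory.flink.size and taskmanager.memory.process.size are not listed above and are assumed from the standard Flink configuration; the values are purely illustrative:

  # flink-conf.yaml (sketch, illustrative values)
  # Either fix the total Flink memory ...
  taskmanager.memory.flink.size: 1280m
  # ... or, alternatively, the total process memory (typically only one of the two is set):
  # taskmanager.memory.process.size: 1728m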

Configure Heap and Managed Memory

As mentioned in the description of the total memory, another way to set up memory in Flink is to specify both the task heap and managed memory explicitly. This gives more control over the JVM Heap available to Flink’s tasks and over its managed memory.

The rest of the memory components will be adjusted automatically, based on default values or additionally configured options, and are described in more detail below.

If you have configured the task heap and managed memory explicitly, it is recommended to set neither total process memory nor total Flink memory. Otherwise, it may easily lead to memory configuration conflicts.
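A minimal flink-conf.yaml sketch of this fine-grained approach, using the two options described in the following sections and purely illustrative values:

  # flink-conf.yaml (sketch, illustrative values)
  # Explicit task heap and managed memory; per the recommendation above,
  # total process memory and total Flink memory are left unset.
  taskmanager.memory.task.heap.size: 2048m
  taskmanager.memory.managed.size: 1024m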

Task (Operator) Heap Memory

If you want to guarantee that a certain amount of JVM Heap is available for your user code, you can set the task heap memory explicitly (taskmanager.memory.task.heap.size). It will be added to the JVM Heap size and will be dedicated to Flink’s operators running the user code.

Managed Memory

Managed memory is managed by Flink and is allocated as native memory (off-heap). The following workloads use managed memory:

  • Streaming jobs can use it for the RocksDB state backend.
  • Streaming and batch jobs can use it for sorting, hash tables and caching of intermediate results.
  • Streaming and batch jobs can use it for executing Python processes of Python user-defined functions.

The size of managed memory can be either configured explicitly via taskmanager.memory.managed.size or computed as a fraction of the total Flink memory via taskmanager.memory.managed.fraction.

Size will override fraction if both are set. If neither size nor fraction is explicitly configured, the default fraction will be used.
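For example, either of the following flink-conf.yaml entries could be used (illustrative values); if both were set, the explicit size would take precedence:

  # flink-conf.yaml (sketch, illustrative values)
  # Option A: explicit size (overrides the fraction if both are set)
  taskmanager.memory.managed.size: 512m
  # Option B: fraction of the total Flink memory
  # taskmanager.memory.managed.fraction: 0.4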

See also how to configure memory for state backends and batch jobs.

Consumer Weights

If your job contains multiple types of managed memory consumers, you can also control how managed memory should be shared across these types. The configuration option taskmanager.memory.managed.consumer-weights allows you to set a weight for each type; Flink reserves managed memory for each type in proportion to its weight. Valid consumer types are:

  • OPERATOR: for built-in algorithms.
  • STATE_BACKEND: for the RocksDB state backend in streaming.
  • PYTHON: for Python processes.

For example, if a streaming job uses both the RocksDB state backend and Python UDFs, and the consumer weights are configured as STATE_BACKEND:70,PYTHON:30, Flink will reserve 70% of the total managed memory for the RocksDB state backend and 30% for Python processes.
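In flink-conf.yaml, the weights from this example would be written as:

  # flink-conf.yaml: consumer weights from the example above
  taskmanager.memory.managed.consumer-weights: STATE_BACKEND:70,PYTHON:30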

For each type, Flink reserves managed memory only if the job contains managed memory consumers of that type. For example, if a streaming job uses the heap state backend and Python UDFs, and the consumer weights are configured as STATE_BACKEND:70,PYTHON:30, Flink will use all of its managed memory for Python processes, because the heap state backend does not use managed memory.

Flink will not reserve managed memory for consumer types that are not included in the consumer weights. If a missing type is actually needed by the job, this can lead to memory allocation failures. By default, all consumer types are included; a type can only be missing when the weights are explicitly configured/overwritten.

Configure Off-heap Memory (direct or native)

The off-heap memory which is allocated by user code should be accounted for in task off-heap memory (taskmanager.memory.task.off-heap.size).

You can also adjust the framework off-heap memory. You should only change this value if you are sure that the Flink framework needs more memory.

Flink includes the framework off-heap memory and task off-heap memory into the direct memory limit of the JVM, see also JVM parameters.
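A flink-conf.yaml sketch for these two options, with purely illustrative values:

  # flink-conf.yaml (sketch, illustrative values)
  # Off-heap memory allocated by user code:
  taskmanager.memory.task.off-heap.size: 256m
  # Only raise the framework off-heap memory if you are sure the framework needs it:
  # taskmanager.memory.framework.off-heap.size: 128m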

Note Although native (non-direct) memory usage can be accounted for as part of the framework off-heap memory or task off-heap memory, doing so results in a higher JVM direct memory limit.

Note The network memory is also part of JVM direct memory, but it is managed by Flink and guaranteed to never exceed its configured size. Therefore, resizing the network memory will not help in this situation.

See also the detailed memory model.

Detailed Memory Model

(Figure: Simple memory model)

The following table lists all memory components, depicted above, and references Flink configuration options which affect the size of the respective components:

Component | Configuration options | Description
--- | --- | ---
Framework Heap Memory | taskmanager.memory.framework.heap.size | JVM Heap memory dedicated to the Flink framework (advanced option)
Task Heap Memory | taskmanager.memory.task.heap.size | JVM Heap memory dedicated to the Flink application to run operators and user code
Managed Memory | taskmanager.memory.managed.size, taskmanager.memory.managed.fraction | Native memory managed by Flink, reserved for sorting, hash tables, caching of intermediate results and the RocksDB state backend
Framework Off-heap Memory | taskmanager.memory.framework.off-heap.size | Off-heap direct (or native) memory dedicated to the Flink framework (advanced option)
Task Off-heap Memory | taskmanager.memory.task.off-heap.size | Off-heap direct (or native) memory dedicated to the Flink application to run operators
Network Memory | taskmanager.memory.network.min, taskmanager.memory.network.max, taskmanager.memory.network.fraction | Direct memory reserved for data record exchange between tasks (e.g. buffering for transfer over the network); a capped fractionated component of the total Flink memory, used to allocate network buffers
JVM Metaspace | taskmanager.memory.jvm-metaspace.size | Metaspace size of the Flink JVM process
JVM Overhead | taskmanager.memory.jvm-overhead.min, taskmanager.memory.jvm-overhead.max, taskmanager.memory.jvm-overhead.fraction | Native memory reserved for other JVM overhead, e.g. thread stacks, code cache, garbage collection space; a capped fractionated component of the total process memory

As you can see, the size of some memory components can simply be set via a single option, while other components are tuned using multiple options.
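Network memory is an example of such a multi-option component: it is derived as a fraction of the total Flink memory and then capped between a minimum and a maximum. A sketch with illustrative values:

  # flink-conf.yaml (sketch, illustrative values)
  taskmanager.memory.network.fraction: 0.1
  taskmanager.memory.network.min: 64mb
  taskmanager.memory.network.max: 1gb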

Framework Memory

You should not change the framework heap memory and framework off-heap memory without a good reason. Adjust them only if you are sure that Flink needs more memory for some internal data structures or operations. This can be related to a particular deployment environment or job structure, like high parallelism. In addition, Flink dependencies, such as Hadoop, may consume more direct or native memory in certain setups.
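If you do conclude that the framework needs more memory, these advanced options could be raised as in the following sketch (illustrative values):

  # flink-conf.yaml (sketch, illustrative values; advanced options)
  taskmanager.memory.framework.heap.size: 256m
  taskmanager.memory.framework.off-heap.size: 256m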

Note Flink neither isolates heap nor off-heap versions of framework and task memory at the moment. The separation of framework and task memory can be used in future releases for further optimizations.

Local Execution

If you start Flink locally on your machine as a single Java program without creating a cluster (e.g. from your IDE), then all components are ignored except for the following:

Memory component | Relevant options | Default value for local execution
--- | --- | ---
Task heap | taskmanager.memory.task.heap.size | infinite
Task off-heap | taskmanager.memory.task.off-heap.size | infinite
Managed memory | taskmanager.memory.managed.size | 128 MB
Network memory | taskmanager.memory.network.min, taskmanager.memory.network.max | 64 MB

All of the components listed above can be, but do not have to be, explicitly configured for local execution. If they are not configured, they are set to their default values. Task heap memory and task off-heap memory are considered to be infinite (Long.MAX_VALUE bytes), and managed memory has a default value of 128 MB only in local execution mode.

Note The task heap size is not related in any way to the real heap size in this case. It may become relevant for future optimizations in upcoming releases. The actual JVM Heap size of the started local process is not controlled by Flink and depends on how you start the process. If you want to control the JVM Heap size, you have to explicitly pass the corresponding JVM arguments, e.g. -Xmx, -Xms.