Network memory tuning guide

Overview

Each record in Flink is sent to the next subtask together with other records in a network buffer, the smallest unit for communication between subtasks. In order to maintain consistently high throughput, Flink uses network buffer queues (also known as in-flight data) on the input and output side of the transmission process.

Each subtask has an input queue waiting to consume data and an output queue waiting to send data to the next subtask. Having a larger amount of in-flight data means that Flink can provide higher and more resilient throughput in the pipeline. This will, however, cause longer checkpoint times.

Checkpoints in Flink can only finish once all the subtasks receive all of the injected checkpoint barriers. In aligned checkpoints, those checkpoint barriers are traveling throughout the job graph along with the network buffers. The larger the amount of in-flight data, the longer the checkpoint barrier propagation time. In unaligned checkpoints, the larger the amount of in-flight data, the larger the checkpoint size will be because all of the captured in-flight data has to be persisted as part of the checkpoint.

The Buffer Debloating Mechanism

Previously, the only way to configure the amount of in-flight data was to specify both the buffer amount and the buffer size. However, ideal values can be difficult to choose since they are different for every deployment. The buffer debloating mechanism added in Flink 1.14 attempts to address this issue by automatically adjusting the amount of in-flight data to reasonable values.

The buffer debloating feature calculates the maximum possible throughput for the subtask (in the scenario that it is always busy) and adjusts the amount of in-flight data such that the time needed to consume that in-flight data equals the configured target value.

The buffer debloat mechanism can be enabled by setting the property taskmanager.network.memory.buffer-debloat.enabled to true. The targeted time to consume the in-flight data can be configured by setting taskmanager.network.memory.buffer-debloat.target to a duration. The default value of the debloat target should be good enough for most cases.
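The sizing idea described above can be sketched in a few lines. This is a simplified illustration, not Flink's actual implementation: the function name, the `buffers_in_use` parameter, and the `min_size`/`max_size` bounds (256 bytes and 32 KiB here) are assumptions made for the example.

```python
def debloated_buffer_size(throughput_bytes_per_s, target_ms, buffers_in_use,
                          min_size=256, max_size=32 * 1024):
    """Per-buffer size so that all buffers in use drain in roughly target_ms.

    min_size and max_size are illustrative bounds (assumptions), not values
    read from a real Flink configuration.
    """
    if buffers_in_use <= 0:
        raise ValueError("buffers_in_use must be positive")
    # Bytes the subtask can consume within the target time at the measured throughput.
    desired_in_flight = throughput_bytes_per_s * target_ms // 1000
    # Spread those bytes over the buffers in use, then clamp to the allowed range.
    per_buffer = desired_in_flight // buffers_in_use
    return max(min_size, min(max_size, per_buffer))


if __name__ == "__main__":
    # 10 MB/s measured throughput, 1 s target, 1000 buffers in use.
    print(debloated_buffer_size(10_000_000, 1000, 1000))  # 10000
```

A faster subtask ends up with larger buffers (up to the cap), while a slow one ends up with smaller buffers, which is what keeps the consumption time near the target.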

This feature uses past throughput data to predict the time required to consume the remaining in-flight data. If the predictions are incorrect, the debloating mechanism can fail in one of two ways:

  • There will not be enough buffered data to provide full throughput.
  • There will be too much buffered in-flight data, which will negatively affect the aligned checkpoint barrier propagation time or the unaligned checkpoint size.

If you have a varying load in your job (e.g. sudden spikes of incoming records, periodically firing windowed aggregations or joins), you might need to adjust the following settings:

  • taskmanager.network.memory.buffer-debloat.period - This is the minimum time period between buffer size recalculation. The shorter the period, the faster the reaction time of the debloating mechanism but the higher the CPU overhead for the necessary calculations.

  • taskmanager.network.memory.buffer-debloat.samples - This adjusts the number of samples over which throughput measurements are averaged out. The frequency of the collected samples can be adjusted via taskmanager.network.memory.buffer-debloat.period. The fewer the samples, the faster the reaction time of the debloating mechanism, but the higher the chance that a sudden spike or drop in throughput causes the buffer debloating mechanism to miscalculate the optimal amount of in-flight data.

  • taskmanager.network.memory.buffer-debloat.threshold-percentages - An optimization for preventing frequent buffer size changes: a newly calculated buffer size is not applied if it differs too little from the old size.
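To make the interplay of the samples and threshold settings more concrete, here is a small sketch. It is an illustration under stated assumptions: a plain moving average stands in for whatever smoothing Flink applies internally, and `ThroughputAverage` and `should_apply` are hypothetical names, not Flink APIs.

```python
from collections import deque


class ThroughputAverage:
    """Plain moving average over the last `samples` measurements (assumed smoothing)."""

    def __init__(self, samples):
        self._window = deque(maxlen=samples)

    def add(self, bytes_per_s):
        # Record one measurement and return the average over the current window.
        self._window.append(bytes_per_s)
        return sum(self._window) / len(self._window)


def should_apply(old_size, new_size, threshold_percent):
    """True when new_size differs from old_size by at least threshold_percent."""
    if old_size <= 0:
        raise ValueError("old_size must be positive")
    change_percent = abs(new_size - old_size) * 100 / old_size
    return change_percent >= threshold_percent


if __name__ == "__main__":
    avg = ThroughputAverage(samples=3)
    for measurement in (100, 100, 400):
        smoothed = avg.add(measurement)
    print(smoothed)                            # 200.0
    print(should_apply(32768, 30000, 25))      # False: change is below 25%
    print(should_apply(32768, 16384, 25))      # True: change is 50%
```

With fewer samples, a single spike dominates the average and the computed size moves quickly; a higher threshold suppresses small adjustments.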

Consult the configuration documentation for more details and additional parameters.

Here are metrics you can use to monitor the current buffer size:

  • estimatedTimeToConsumeBuffersMs - total time to consume data from all input channels
  • debloatedBufferSize - current buffer size

Limitations

Currently, there are a few cases that are not handled automatically by the buffer debloating mechanism.

Multiple inputs and unions

Currently, the throughput calculation and buffer debloating happen on the subtask level.

If your subtask has multiple different inputs, or a single but unioned input, buffer debloating can cause the low-throughput input to have too much buffered in-flight data, while the high-throughput input might have buffers that are too small to sustain that throughput. This might be particularly visible if the different inputs have vastly different throughputs. We recommend paying special attention to such subtasks when testing this feature.

Buffer size and number of buffers

Currently, buffer debloating only caps the maximum used buffer size. The actual buffer size and the number of buffers remain unchanged. This means that the debloating mechanism cannot reduce the memory usage of your job. You would have to manually reduce either the number or the size of the buffers.

Furthermore, if you want to reduce the amount of buffered in-flight data below what buffer debloating currently allows, you might want to manually configure the number of buffers.

High parallelism

Currently, the buffer debloating mechanism might not perform correctly with high parallelism (above ~200) using the default configuration. If you observe reduced throughput or higher than expected checkpointing times, we suggest increasing the number of floating buffers (taskmanager.network.memory.floating-buffers-per-gate) from the default value to at least the parallelism.

The actual parallelism at which the problem occurs varies from job to job, but it is normally more than a couple of hundred.

Network buffer lifecycle

Flink has several local buffer pools - one for the output stream and one for each input gate. The target size of each buffer pool is calculated by the following formula.

#channels * taskmanager.network.memory.buffers-per-channel + taskmanager.network.memory.floating-buffers-per-gate

The size of the buffer can be configured by setting taskmanager.memory.segment-size.
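The formula above is simple enough to evaluate by hand, but a small helper makes it easy to see how the pool grows with the number of channels. This is a sketch: the function names are made up for illustration, and the defaults (2 exclusive buffers per channel, 8 floating buffers per gate, 32 KiB segments) are the defaults this guide mentions elsewhere.

```python
def target_pool_size(channels, buffers_per_channel=2, floating_buffers_per_gate=8):
    """Target number of buffers in one local buffer pool."""
    return channels * buffers_per_channel + floating_buffers_per_gate


def max_buffered_bytes(channels, segment_size=32 * 1024,
                       buffers_per_channel=2, floating_buffers_per_gate=8):
    """Upper bound on bytes held by the pool if every buffer is completely full."""
    pool = target_pool_size(channels, buffers_per_channel, floating_buffers_per_gate)
    return pool * segment_size


if __name__ == "__main__":
    print(target_pool_size(100))    # 208
    print(max_buffered_bytes(100))  # 6815744
```

For a gate with 100 channels, the defaults give a target of 208 buffers, or about 6.5 MiB if all of them are full.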

Input network buffers

The target buffer pool size is not always reached. There’s a threshold controlling whether Flink should fail upon not obtaining buffers. The part of the target number of buffers that is below this threshold is considered required. The remainder, if any, is optional. Not obtaining required buffers will lead to task failures. A task will not fail if it cannot obtain optional buffers, but may suffer a performance reduction.

The default value for this threshold is Integer.MAX_VALUE for streaming workloads, and 1000 for batch workloads. We do not recommend changing this threshold unless you have good reasons and know what you are doing. The relevant configuration option is taskmanager.network.memory.read-buffer.required-per-gate.max. In general, a smaller threshold reduces the chance of the “insufficient number of network buffers” exception, but workloads may silently suffer reduced performance, and vice versa.
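The split between required and optional buffers can be written down as a one-line rule. The sketch below only restates the description above; `split_required_optional` is a hypothetical helper, not a Flink function.

```python
INTEGER_MAX_VALUE = 2**31 - 1  # Java's Integer.MAX_VALUE, the streaming default


def split_required_optional(target, threshold):
    """Return (required, optional) buffer counts for one input gate."""
    # Everything up to the threshold is required; the rest is optional.
    required = min(target, threshold)
    return required, target - required


if __name__ == "__main__":
    # Batch default threshold of 1000 with a target of 2008 buffers.
    print(split_required_optional(2008, 1000))               # (1000, 1008)
    # Streaming default: the whole target is required.
    print(split_required_optional(2008, INTEGER_MAX_VALUE))  # (2008, 0)
```

With the streaming default, every buffer in the target is required, so a shortage fails the task instead of silently slowing it down.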

Output network buffers

Unlike the input buffer pool, the output buffer pool has only one type of buffer which it shares among all subpartitions.

In order to avoid excessive data skew, the number of buffers for each subpartition is limited by the taskmanager.network.memory.max-buffers-per-channel setting.

Unlike the input buffer pool, the configured amount of exclusive buffers and floating buffers is only treated as recommended values. If there are not enough buffers available, Flink can make progress with only a single exclusive buffer per output subpartition and zero floating buffers.

Overdraft buffers

Each output subtask can also request up to taskmanager.network.memory.max-overdraft-buffers-per-gate (by default 5) extra overdraft buffers. Those buffers are only used if the subtask is backpressured by downstream subtasks and the subtask requires more than a single network buffer to finish what it is currently doing. This can happen in situations like:

  • Serializing very large records that do not fit into a single network buffer.
  • FlatMap-like operators that produce many output records per single input record.
  • Operators that output many records either periodically or in reaction to some events (for example WindowOperator’s triggers).

Without overdraft buffers, the Flink subtask thread would block on backpressure in such situations, preventing, for example, unaligned checkpoints from completing. To mitigate this, the overdraft buffers concept has been added. Those overdraft buffers are strictly optional and Flink can gradually make progress using only regular buffers, which means 0 is an acceptable configuration for taskmanager.network.memory.max-overdraft-buffers-per-gate.

This feature only takes effect for Pipelined Shuffle.
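For the large-record case, a rough back-of-the-envelope check shows how many extra buffers a single record would need. This is only a sketch under simplifying assumptions: the record starts at the beginning of a fresh buffer, serialization framing overhead is ignored, and the helper names are hypothetical.

```python
def buffers_spanned(record_bytes, segment_size=32 * 1024):
    """Number of network buffers a serialized record occupies (at least one)."""
    # Ceiling division with integers; assumes the record starts in an empty buffer.
    return max(1, -(-record_bytes // segment_size))


def fits_in_overdraft(record_bytes, max_overdraft=5, segment_size=32 * 1024):
    """True if the buffers needed beyond the first fit in the overdraft allowance."""
    extra = buffers_spanned(record_bytes, segment_size) - 1
    return extra <= max_overdraft


if __name__ == "__main__":
    print(buffers_spanned(100 * 1024))       # 4
    print(fits_in_overdraft(100 * 1024))     # True: 3 extra buffers
    print(fits_in_overdraft(1024 * 1024))    # False: 31 extra buffers
```

A record that does not fit is still processed; the subtask simply has to wait for regular buffers to become available again.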

The number of in-flight buffers

The default settings for exclusive buffers and floating buffers should be sufficient for the maximum throughput. If the amount of in-flight data needs to be minimized, the exclusive buffers can be set to 0 and the memory segment size can be decreased.

Selecting the buffer size

The buffer collects records in order to optimize network overhead when sending the data portion to the next subtask. The next subtask should receive all parts of the record before consuming it.

If the buffer size is too small, or the buffers are flushed too frequently (execution.buffer-timeout configuration parameter), this can lead to decreased throughput, since the per-buffer overhead is significantly higher than the per-record overhead in Flink’s runtime.

As a rule of thumb, we don’t recommend increasing the buffer size or the buffer timeout unless you can observe a network bottleneck in your real-life workload (downstream operator idling, upstream backpressured, output buffer queue full, downstream input queue empty).

If the buffer size is too large, this can lead to:

  • high memory usage
  • huge checkpoint data (for unaligned checkpoints)
  • long checkpoint time (for aligned checkpoints)
  • inefficient use of allocated memory with a small execution.buffer-timeout because flushed buffers would only be sent partially filled

Selecting the buffer count

The number of buffers is configured by the taskmanager.network.memory.buffers-per-channel and taskmanager.network.memory.floating-buffers-per-gate settings.

For best throughput, we recommend using the default values for the number of exclusive and floating buffers (unless you have one of the limit cases). If the amount of in-flight data is causing issues, enabling buffer debloating is recommended.

You can tune the number of network buffers manually, but consider the following:

  1. You should adjust the number of buffers according to your expected throughput (in bytes/second). Assigning credits and sending buffers takes some time (around two roundtrip messages between two nodes). The latency also depends on your network.

     Using the buffer roundtrip time (around 1ms in a healthy local network), the buffer size, and the expected throughput, you can calculate the number of buffers required to sustain the throughput by using this formula:

     number_of_buffers = expected_throughput * buffer_roundtrip / buffer_size

     For example, with an expected throughput of 320MB/s, roundtrip latency of 1ms, and the default memory segment size, 10 is the number of actively used buffers needed to achieve the expected throughput:

     number_of_buffers = 320MB/s * 1ms / 32KB = 10

  2. The purpose of floating buffers is to handle data skew scenarios. Ideally, the number of floating buffers (default: 8) and the exclusive buffers (default: 2) that belong to that channel should be able to saturate the network throughput. But this is not always feasible or necessary. It is very rare that only a single channel among all the subtasks in the task manager is being used.

  3. The purpose of exclusive buffers is to provide a fluent throughput. While one buffer is in transit, the other is being filled up. With high throughput setups, the number of exclusive buffers is the main factor that defines the amount of in-flight data Flink uses.

In the case of backpressure in low throughput setups, you should consider reducing the number of exclusive buffers.
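The number_of_buffers formula from the list above can be turned into a small helper for trying out different throughputs and latencies. This is a sketch: the function name and parameter units are choices made for the example, and, like the worked example in the text, it treats 1MB as 1000KB. Integer arithmetic is used so that the result is rounded up without floating-point surprises.

```python
def buffers_for_throughput(throughput_kb_per_s, roundtrip_ms, buffer_size_kb):
    """Buffers needed to sustain a throughput, rounded up to a whole buffer.

    number_of_buffers = expected_throughput * buffer_roundtrip / buffer_size
    """
    if buffer_size_kb <= 0:
        raise ValueError("buffer_size_kb must be positive")
    # KB/s * ms gives KB * 1000, so scale the denominator by 1000 as well.
    numerator = throughput_kb_per_s * roundtrip_ms
    denominator = buffer_size_kb * 1000
    return -(-numerator // denominator)  # integer ceiling division


if __name__ == "__main__":
    # 320MB/s, 1ms roundtrip, 32KB buffers: the example from the text.
    print(buffers_for_throughput(320_000, 1, 32))  # 10
    # Doubling the roundtrip time doubles the buffers needed.
    print(buffers_for_throughput(320_000, 2, 32))  # 20
```

The result is the number of actively used buffers for the connection as a whole, so it should be compared against exclusive plus floating buffers rather than against either setting alone.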

Summary

Memory configuration tuning for the network in Flink can be simplified by enabling the buffer debloating mechanism. You may have to tune it.

If this does not work, you can disable the buffer debloating mechanism and manually configure the memory segment size and the number of buffers. For this second scenario, we recommend:

  • using the default values for max throughput
  • reducing the memory segment size and/or number of exclusive buffers to speed up checkpointing and reduce the memory consumption of the network stack