[FLINK-7301] [docs] Rework state documentation
[flink.git] / docs / ops / state / large_state_tuning.md
1 ---
2 title: "Debugging and Tuning Checkpoints and Large State"
3 nav-parent_id: ops_state
4 nav-pos: 12
5 ---
6 <!--
7 Licensed to the Apache Software Foundation (ASF) under one
8 or more contributor license agreements.  See the NOTICE file
9 distributed with this work for additional information
10 regarding copyright ownership.  The ASF licenses this file
11 to you under the Apache License, Version 2.0 (the
12 "License"); you may not use this file except in compliance
13 with the License.  You may obtain a copy of the License at
14
15   http://www.apache.org/licenses/LICENSE-2.0
16
17 Unless required by applicable law or agreed to in writing,
18 software distributed under the License is distributed on an
19 "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
20 KIND, either express or implied.  See the License for the
21 specific language governing permissions and limitations
22 under the License.
23 -->
24
25 This page gives a guide how to configure and tune applications that use large state.
26
27 * ToC
28 {:toc}
29
30 ## Overview
31
32 For Flink applications to run reliably at large scale, two conditions must be fulfilled:
33
34   - The application needs to be able to take checkpoints reliably
35
36   - The resources need to be sufficient catch up with the input data streams after a failure
37
38 The first sections discuss how to get well performing checkpoints at scale.
39 The last section explains some best practices concerning planning how many resources to use.
40
41
42 ## Monitoring State and Checkpoints
43
44 The easiest way to monitor checkpoint behavior is via the UI's checkpoint section. The documentation
45 for [checkpoint monitoring](../../monitoring/checkpoint_monitoring.html) shows how to access the available checkpoint
46 metrics.
47
48 The two numbers that are of particular interest when scaling up checkpoints are:
49
50   - The time until operators start their checkpoint: This time is currently not exposed directly, but corresponds
51     to:
52     
53     `checkpoint_start_delay = end_to_end_duration - synchronous_duration - asynchronous_duration`
54
55     When the time to trigger the checkpoint is constantly very high, it means that the *checkpoint barriers* need a long
56     time to travel from the source to the operators. That typically indicates that the system is operating under a
57     constant backpressure.
58
59   - The amount of data buffered during alignments. For exactly-once semantics, Flink *aligns* the streams at
60     operators that receive multiple input streams, buffering some data for that alignment.
61     The buffered data volume is ideally low - higher amounts means that checkpoint barriers are reveived at
62     very different times from the different input streams.
63
64 Note that when the here indicated numbers can be occasionally high in the presence of transient backpressure, data skew,
65 or network issues. However, if the numbers are constantly very high, it means that Flink puts many resources into checkpointing.
66
67
68 ## Tuning Checkpointing
69
70 Checkpoints are triggered at regular intervals that applications can configure. When a checkpoint takes longer
71 to complete than the checkpoint interval, the next checkpoint is not triggered before the in-progress checkpoint
72 completes. By default the next checkpoint will then be triggered immediately once the ongoing checkpoint completes.
73
74 When checkpoints end up frequently taking longer than the base interval (for example because state
75 grew larger than planned, or the storage where checkpoints are stored is temporarily slow),
76 the system is constantly taking checkpoints (new ones are started immediately once ongoing once finish).
77 That can mean that too many resources are constantly tied up in checkpointing and that the operators make too
78 little progress. This behavior has less impact on streaming applications that use asynchronously checkpointed state,
79 but may still have an impact on overall application performance.
80
81 To prevent such a situation, applications can define a *minimum duration between checkpoints*:
82
83 `StreamExecutionEnvironment.getCheckpointConfig().setMinPauseBetweenCheckpoints(milliseconds)`
84
85 This duration is the minimum time interval that must pass between the end of the latest checkpoint and the beginning
86 of the next. The figure below illustrates how this impacts checkpointing.
87
88 <img src="../../fig/checkpoint_tuning.svg" class="center" width="80%" alt="Illustration how the minimum-time-between-checkpoints parameter affects checkpointing behavior."/>
89
90 *Note:* Applications can be configured (via the `CheckpointConfig`) to allow multiple checkpoints to be in progress at
91 the same time. For applications with large state in Flink, this often ties up too many resources into the checkpointing.
92 When a savepoint is manually triggered, it may be in process concurrently with an ongoing checkpoint.
93
94
95 ## Tuning Network Buffers
96
97 Before Flink 1.3, an increased number of network buffers also caused increased checkpointing times since
98 keeping more in-flight data meant that checkpoint barriers got delayed. Since Flink 1.3, the
99 number of network buffers used per outgoing/incoming channel is limited and thus network buffers
100 may be configured without affecting checkpoint times
101 (see [network buffer configuration](../config.html#configuring-the-network-buffers)).
102
103 ## Make state checkpointing Asynchronous where possible
104
105 When state is *asynchronously* snapshotted, the checkpoints scale better than when the state is *synchronously* snapshotted.
106 Especially in more complex streaming applications with multiple joins, Co-functions, or windows, this may have a profound
107 impact.
108
109 To get state to be snapshotted asynchronously, applications have to do two things:
110
111   1. Use state that is [managed by Flink](../../dev/stream/state/state.html): Managed state means that Flink provides the data
112      structure in which the state is stored. Currently, this is true for *keyed state*, which is abstracted behind the
113      interfaces like `ValueState`, `ListState`, `ReducingState`, ...
114
115   2. Use a state backend that supports asynchronous snapshots. In Flink 1.2, only the RocksDB state backend uses
116      fully asynchronous snapshots.
117
118 The above two points imply that (in Flink 1.2) large state should generally be kept as keyed state, not as operator state.
119 This is subject to change with the planned introduction of *managed operator state*.
120
121
122 ## Tuning RocksDB
123
124 The state storage workhorse of many large scale Flink streaming applications is the *RocksDB State Backend*.
125 The backend scales well beyond main memory and reliably stores large [keyed state](../../dev/stream/state/state.html).
126
127 Unfortunately, RocksDB's performance can vary with configuration, and there is little documentation on how to tune
128 RocksDB properly. For example, the default configuration is tailored towards SSDs and performs suboptimal
129 on spinning disks.
130
131 **Incremental Checkpoints**
132
133 Incremental checkpoints can dramatically reduce the checkpointing time in comparison to full checkpoints, at the cost of a (potentially) longer
134 recovery time. The core idea is that incremental checkpoints only record all changes to the previous completed checkpoint, instead of
135 producing a full, self-contained backup of the state backend. Like this, incremental checkpoints build upon previous checkpoints. Flink leverages
136 RocksDB's internal backup mechanism in a way that is self-consolidating over time. As a result, the incremental checkpoint history in Flink
137 does not grow indefinitely, and old checkpoints are eventually subsumed and pruned automatically. `
138
139 While we strongly encourage the use of incremental checkpoints for large state, please note that this is a new feature and currently not enabled 
140 by default. To enable this feature, users can instantiate a `RocksDBStateBackend` with the corresponding boolean flag in the constructor set to `true`, e.g.:
141
142 {% highlight java %}
143     RocksDBStateBackend backend =
144         new RocksDBStateBackend(filebackend, true);
145 {% endhighlight %}
146
147 **Passing Options to RocksDB**
148
149 {% highlight java %}
150 RocksDBStateBackend.setOptions(new MyOptions());
151
152 public class MyOptions implements OptionsFactory {
153
154     @Override
155     public DBOptions createDBOptions() {
156         return new DBOptions()
157             .setIncreaseParallelism(4)
158             .setUseFsync(false)
159             .setDisableDataSync(true);
160     }
161
162     @Override
163     public ColumnFamilyOptions createColumnOptions() {
164
165         return new ColumnFamilyOptions()
166             .setTableFormatConfig(
167                 new BlockBasedTableConfig()
168                     .setBlockCacheSize(256 * 1024 * 1024)  // 256 MB
169                     .setBlockSize(128 * 1024));            // 128 KB
170     }
171 }
172 {% endhighlight %}
173
174 **Predefined Options**
175
176 Flink provides some predefined collections of option for RocksDB for different settings, which can be set for example via
177 `RocksDBStateBacked.setPredefinedOptions(PredefinedOptions.SPINNING_DISK_OPTIMIZED_HIGH_MEM)`.
178
179 We expect to accumulate more such profiles over time. Feel free to contribute such predefined option profiles when you
180 found a set of options that work well and seem representative for certain workloads.
181
182 **Important:** RocksDB is a native library, whose allocated memory not from the JVM, but directly from the process'
183 native memory. Any memory you assign to RocksDB will have to be accounted for, typically by decreasing the JVM heap size
184 of the TaskManagers by the same amount. Not doing that may result in YARN/Mesos/etc terminating the JVM processes for
185 allocating more memory than configures.
186
187
188 ## Capacity Planning
189
190 This section discusses how to decide how many resources should be used for a Flink job to run reliably.
191 The basic rules of thumb for capacity planning are:
192
193   - Normal operation should have enough capacity to not operate under constant *back pressure*.
194     See [back pressure monitoring](../../monitoring/back_pressure.html) for details on how to check whether the application runs under back pressure.
195
196   - Provision some extra resources on top of the resources needed to run the program back-pressure-free during failure-free time.
197     These resources are needed to "catch up" with the input data that accumulated during the time the application
198     was recovering.
199     How much that should be depends on how long recovery operations usually take (which depends on the size of the state
200     that needs to be loaded into the new TaskManagers on a failover) and how fast the scenario requires failures to recover.
201
202     *Important*: The base line should to be established with checkpointing activated, because checkpointing ties up
203     some amount of resources (such as network bandwidth).
204
205   - Temporary back pressure is usually okay, and an essential part of execution flow control during load spikes,
206     during catch-up phases, or when external systems (that are written to in a sink) exhibit temporary slowdown.
207
208   - Certain operations (like large windows) result in a spiky load for their downstream operators: 
209     In the case of windows, the downstream operators may have little to do while the window is being built,
210     and have a load to do when the windows are emitted.
211     The planning for the downstream parallelism needs to take into account how much the windows emit and how
212     fast such a spike needs to be processed.
213
214 **Important:** In order to allow for adding resources later, make sure to set the *maximum parallelism* of the
215 data stream program to a reasonable number. The maximum parallelism defines how high you can set the programs
216 parallelism when re-scaling the program (via a savepoint).
217
218 Flink's internal bookkeeping tracks parallel state in the granularity of max-parallelism-many *key groups*.
219 Flink's design strives to make it efficient to have a very high value for the maximum parallelism, even if
220 executing the program with a low parallelism.
221
222 ## Compression
223
224 Flink offers optional compression (default: off) for all checkpoints and savepoints. Currently, compression always uses 
225 the [snappy compression algorithm (version 1.1.4)](https://github.com/xerial/snappy-java) but we are planning to support
226 custom compression algorithms in the future. Compression works on the granularity of key-groups in keyed state, i.e.
227 each key-group can be decompressed individually, which is important for rescaling. 
228
229 Compression can be activated through the `ExecutionConfig`:
230
231 {% highlight java %}
232                 ExecutionConfig executionConfig = new ExecutionConfig();
233                 executionConfig.setUseSnapshotCompression(true);
234 {% endhighlight %}
235
236 **Notice:** The compression option has no impact on incremental snapshots, because they are using RocksDB's internal
237 format which is always using snappy compression out of the box.