[FLINK-7301] [docs] Rework state documentation
[flink.git] / docs / ops / upgrading.md
1 ---
2 title: "Upgrading Applications and Flink Versions"
3 nav-parent_id: ops
4 nav-pos: 15
5 ---
6 <!--
7 Licensed to the Apache Software Foundation (ASF) under one
8 or more contributor license agreements.  See the NOTICE file
9 distributed with this work for additional information
10 regarding copyright ownership.  The ASF licenses this file
11 to you under the Apache License, Version 2.0 (the
12 "License"); you may not use this file except in compliance
13 with the License.  You may obtain a copy of the License at
14
15   http://www.apache.org/licenses/LICENSE-2.0
16
17 Unless required by applicable law or agreed to in writing,
18 software distributed under the License is distributed on an
19 "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
20 KIND, either express or implied.  See the License for the
21 specific language governing permissions and limitations
22 under the License.
23 -->
24
25 * ToC
26 {:toc}
27
28 Flink DataStream programs are typically designed to run for long periods of time such as weeks, months, or even years. As with all long-running services, Flink streaming applications need to be maintained, which includes fixing bugs, implementing improvements, or migrating an application to a Flink cluster of a later version.
29
30 This document describes how to update a Flink streaming application and how to migrate a running streaming application to a different Flink cluster.
31
32 ## Restarting Streaming Applications
33
34 The line of action for upgrading a streaming application or migrating an application to a different cluster is based on Flink's [Savepoint]({{ site.baseurl }}/ops/state/savepoints.html) feature. A savepoint is a consistent snapshot of the state of an application at a specific point in time. 
35
36 There are two ways of taking a savepoint from a running streaming application.
37
38 * Taking a savepoint and continue processing.
39 ```
40 > ./bin/flink savepoint <jobID> [pathToSavepoint]
41 ```
42 It is recommended to periodically take savepoints in order to be able to restart an application from a previous point in time.
43
44 * Taking a savepoint and stopping the application as a single action. 
45 ```
46 > ./bin/flink cancel -s [pathToSavepoint] <jobID>
47 ```
48 This means that the application is canceled immediately after the savepoint completed, i.e., no other checkpoints are taken after the savepoint.
49
50 Given a savepoint taken from an application, the same or a compatible application (see [Application State Compatibility](#application-state-compatibility) section below) can be started from that savepoint. Starting an application from a savepoint means that the state of its operators is initialized with the operator state persisted in the savepoint. This is done by starting an application using a savepoint.
51 ```
52 > ./bin/flink run -d -s [pathToSavepoint] ~/application.jar
53 ```
54
55 The operators of the started application are initialized with the operator state of the original application (i.e., the application the savepoint was taken from) at the time when the savepoint was taken. The started application continues processing from exactly this point on. 
56
57 **Note**: Even though Flink consistently restores the state of an application, it cannot revert writes to external systems. This can be an issue if you resume from a savepoint that was taken without stopping the application. In this case, the application has probably emitted data after the savepoint was taken. The restarted application might (depending on whether you changed the application logic or not) emit the same data again. The exact effect of this behavior can be very different depending on the `SinkFunction` and storage system. Data that is emitted twice might be OK in case of idempotent writes to a key-value store like Cassandra but problematic in case of appends to a durable log such as Kafka. In any case, you should carefully check and test the behavior of a restarted application.
58
59 ## Application State Compatibility
60
61 When upgrading an application in order to fix a bug or to improve the application, usually the goal is to replace the application logic of the running application while preserving its state. We do this by starting the upgraded application from a savepoint which was taken from the original application. However, this does only work if both applications are *state compatible*, meaning that the operators of upgraded application are able to initialize their state with the state of the operators of original application. 
62
63 In this section, we discuss how applications can be modified to remain state compatible.
64
65 ### Matching Operator State
66
67 When an application is restarted from a savepoint, Flink matches the operator state stored in the savepoint to stateful operators of the started application. The matching is done based on operator IDs, which are also stored in the savepoint. Each operator has a default ID that is derived from the operator's position in the application's operator topology. Hence, an unmodified application can always be restarted from one of its own savepoints. However, the default IDs of operators are likely to change if an application is modified. Therefore, modified applications can only be started from a savepoint if the operator IDs have been explicitly specified. Assigning IDs to operators is very simple and done using the `uid(String)` method as follows:
68
69 ```
70 val mappedEvents: DataStream[(Int, Long)] = events
71   .map(new MyStatefulMapFunc()).uid(“mapper-1”)
72 ```
73
74 **Note:** Since the operator IDs stored in a savepoint and IDs of operators in the application to start must be equal, it is highly recommended to assign unique IDs to all operators of an application that might be upgraded in the future. This advice applies to all operators, i.e., operators with and without explicitly declared operator state, because some operators have internal state that is not visible to the user. Upgrading an application without assigned operator IDs is significantly more difficult and may only be possible via a low-level workaround using the `setUidHash()` method.
75
76 **Important:** As of 1.3.x this also applies to operators that are part of a chain.
77
78 By default all state stored in a savepoint must be matched to the operators of a starting application. However, users can explicitly agree to skip (and thereby discard) state that cannot be matched to an operator when starting a application from a savepoint. Stateful operators for which no state is found in the savepoint are initialized with their default state.
79
80 ### Stateful Operators and User Functions
81
82 When upgrading an application, user functions and operators can be freely modified with one restriction. It is not possible to change the data type of the state of an operator. This is important because, state from a savepoint can (currently) not be converted into a different data type before it is loaded into an operator. Hence, changing the data type of operator state when upgrading an application breaks application state consistency and prevents the upgraded application from being restarted from the savepoint. 
83
84 Operator state can be either user-defined or internal. 
85
86 * **User-defined operator state:** In functions with user-defined operator state the type of the state is explicitly defined by the user. Although it is not possible to change the data type of operator state, a workaround to overcome this limitation can be to define a second state with a different data type and to implement logic to migrate the state from the original state into the new state. This approach requires a good migration strategy and a solid understanding of the behavior of [key-partitioned state]({{ site.baseurl }}/dev/stream/state/state.html).
87
88 * **Internal operator state:** Operators such as window or join operators hold internal operator state which is not exposed to the user. For these operators the data type of the internal state depends on the input or output type of the operator. Consequently, changing the respective input or output type breaks application state consistency and prevents an upgrade. The following table lists operators with internal state and shows how the state data type relates to their input and output types. For operators which are applied on a keyed stream, the key type (KEY) is always part of the state data type as well.
89
90 | Operator                                            | Data Type of Internal Operator State |
91 |:----------------------------------------------------|:-------------------------------------|
92 | ReduceFunction[IOT]                                 | IOT (Input and output type) [, KEY]  |
93 | FoldFunction[IT, OT]                                | OT (Output type) [, KEY]             |
94 | WindowFunction[IT, OT, KEY, WINDOW]                 | IT (Input type), KEY                 |
95 | AllWindowFunction[IT, OT, WINDOW]                   | IT (Input type)                      |
96 | JoinFunction[IT1, IT2, OT]                          | IT1, IT2 (Type of 1. and 2. input), KEY |
97 | CoGroupFunction[IT1, IT2, OT]                       | IT1, IT2 (Type of 1. and 2. input), KEY |
98 | Built-in Aggregations (sum, min, max, minBy, maxBy) | Input Type [, KEY]                   |
99
100 ### Application Topology
101
102 Besides changing the logic of one or more existing operators, applications can be upgraded by changing the topology of the application, i.e., by adding or removing operators, changing the parallelism of an operator, or modifying the operator chaining behavior.
103
104 When upgrading an application by changing its topology, a few things need to be considered in order to preserve application state consistency.
105
106 * **Adding or removing a stateless operator:** This is no problem unless one of the cases below applies.
107 * **Adding a stateful operator:** The state of the operator will be initialized with the default state unless it takes over the state of another operator.
108 * **Removing a stateful operator:** The state of the removed operator is lost unless another operator takes it over. When starting the upgraded application, you have to explicitly agree to discard the state.
109 * **Changing of input and output types of operators:** When adding a new operator before or behind an operator with internal state, you have to ensure that the input or output type of the stateful operator is not modified to preserve the data type of the internal operator state (see above for details).
110 * **Changing operator chaining:** Operators can be chained together for improved performance. When restoring from a savepoint taken since 1.3.x it is possible to modify chains while preversing state consistency. It is possible a break the chain such that a stateful operator is moved out of the chain. It is also possible to append or inject a new or existing stateful operator into a chain, or to modify the operator order within a chain. However, when upgrading a savepoint to 1.3.x it is paramount that the topology did not change in regards to chaining. All operators that are part of a chain should be assigned an ID as described in the [Matching Operator State](#Matching Operator State) section above.
111
112 ## Upgrading the Flink Framework Version
113
114 This section describes the general way of upgrading Flink across versions and migrating your jobs between the versions.
115
116 In a nutshell, this procedure consists of 2 fundamental steps:
117
118 1. Take a savepoint in the previous, old Flink version for the jobs you want to migrate.
119 2. Resume your jobs under the new Flink version from the previously taken savepoints.
120
121 Besides those two fundamental steps, some additional steps can be required that depend on the way you want to change the
122 Flink version. In this guide we differentiate two approaches to upgrade across Flink versions: **in-place** upgrade and 
123 **shadow copy** upgrade.
124
125 For **in-place** update, after taking savepoints, you need to:
126
127   1. Stop/cancel all running jobs.
128   2. Shutdown the cluster that runs the old Flink version.
129   3. Upgrade Flink to the newer version on the cluster.
130   4. Restart the cluster under the new version.
131
132 For **shadow copy**, you need to:
133
134   1. Before resuming from the savepoint, setup a new installation of the new Flink version besides your old Flink installation.
135   2. Resume from the savepoints with the new Flink installation.
136   3. If everything runs ok, stop and shutdown the old Flink cluster.
137
138 In the following, we will first present the preconditions for successful job migration and then go into more detail 
139 about the steps that we outlined before.
140
141 ### Preconditions
142
143 Before starting the migration, please check that the jobs you are trying to migrate are following the
144 best practises for [savepoints]({{ site.baseurl }}/ops/state/savepoints.html). Also, check out the 
145 [API Migration Guides]({{ site.baseurl }}/dev/migration.html) to see if there is any API changes related to migrating
146 savepoints to newer versions.
147
148 In particular, we advise you to check that explicit `uid`s were set for operators in your job. 
149
150 This is a *soft* precondition, and restore *should* still work in case you forgot about assigning `uid`s. 
151 If you run into a case where this is not working, you can *manually* add the generated legacy vertex ids from previous
152 Flink versions to your job using the `setUidHash(String hash)` call. For each operator (in operator chains: only the
153 head operator) you must assign the 32 character hex string representing the hash that you can see in the web ui or logs
154 for the operator.
155
156 Besides operator uids, there are currently two *hard* preconditions for job migration that will make migration fail: 
157
158 1. We do not support migration for state in RocksDB that was checkpointed using 
159 `semi-asynchronous` mode. In case your old job was using this mode, you can still change your job to use 
160 `fully-asynchronous` mode before taking the savepoint that is used as the basis for the migration.
161
162 2. Another **important** precondition is that for savepoints taken before Flink 1.3.x, all the savepoint data must be
163 accessible from the new installation and reside under the same absolute path. Before Flink 1.3.x, the savepoint data is
164 typically not self-contained in just the created savepoint file. Additional files can be referenced from inside the
165 savepoint file (e.g. the output from state backend snapshots). Since Flink 1.3.x, this is no longer a limitation;
166 savepoints can be relocated using typical filesystem operations..
167
168
169 ### STEP 1: Take a savepoint in the old Flink version.
170
171 First major step in job migration is taking a savepoint of your job running in the older Flink version.
172 You can do this with the command:
173
174 ```sh
175 $ bin/flink savepoint :jobId [:targetDirectory]
176 ```
177
178 For more details, please read the [savepoint documentation]({{ site.baseurl }}/ops/state/savepoints.html).
179
180 ### STEP 2: Update your cluster to the new Flink version.
181
182 In this step, we update the framework version of the cluster. What this basically means is replacing the content of
183 the Flink installation with the new version. This step can depend on how you are running Flink in your cluster (e.g. 
184 standalone, on Mesos, ...).
185
186 If you are unfamiliar with installing Flink in your cluster, please read the [deployment and cluster setup documentation]({{ site.baseurl }}/ops/deployment/cluster_setup.html).
187
188 ### STEP 3: Resume the job under the new Flink version from savepoint.
189
190 As the last step of job migration, you resume from the savepoint taken above on the updated cluster. You can do
191 this with the command:
192
193 ```sh
194 $ bin/flink run -s :savepointPath [:runArgs]
195 ```
196
197 Again, for more details, please take a look at the [savepoint documentation]({{ site.baseurl }}/ops/state/savepoints.html).
198
199 ## Compatibility Table
200
201 Savepoints are compatible across Flink versions as indicated by the table below:
202
203 <br />
204
205 <table class="table table-bordered">
206   <thead>
207     <tr>
208       <th class="text-left" style="width: 25%">Created with \ Resumed with</th>
209       <th class="text-center">1.1.x</th>
210       <th class="text-center">1.2.x</th>
211       <th class="text-center">1.3.x</th>
212       <th class="text-center">Limitations</th>
213     </tr>
214   </thead>
215   <tbody>
216     <tr>
217           <td class="text-center"><strong>1.1.x</strong></td>
218           <td class="text-center">O</td>
219           <td class="text-center">O</td>
220           <td class="text-center">O</td>
221           <td class="text-left">The maximum parallelism of a job that was migrated from Flink 1.1.x to 1.2.x is
222           currently fixed as the parallelism of the job. This means that the parallelism can not be increased after
223           migration. This limitation might be removed in a future bugfix release.</td>
224     </tr>
225     <tr>
226           <td class="text-center"><strong>1.2.x</strong></td>
227           <td class="text-center"></td>
228           <td class="text-center">O</td>
229           <td class="text-center">O</td>
230           <td class="text-left">When migrating from Flink 1.2.x to Flink 1.3.x, changing parallelism at the same
231           time is not supported. Users have to first take a savepoint after migrating to Flink 1.3.x, and then change
232           parallelism.</td>
233     </tr>
234     <tr>
235           <td class="text-center"><strong>1.3.x</strong></td>
236           <td class="text-center"></td>
237           <td class="text-center"></td>
238           <td class="text-center">O</td>
239           <td class="text-left"></td>
240     </tr>
241   </tbody>
242 </table>