AMBARI-21581 - Replace Hard Coded conf-select Structures (jonathanhurley)
ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/namenode.py
1 """
2 Licensed to the Apache Software Foundation (ASF) under one
3 or more contributor license agreements. See the NOTICE file
4 distributed with this work for additional information
5 regarding copyright ownership. The ASF licenses this file
6 to you under the Apache License, Version 2.0 (the
7 "License"); you may not use this file except in compliance
8 with the License. You may obtain a copy of the License at
9
10 http://www.apache.org/licenses/LICENSE-2.0
11
12 Unless required by applicable law or agreed to in writing, software
13 distributed under the License is distributed on an "AS IS" BASIS,
14 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 See the License for the specific language governing permissions and
16 limitations under the License.
17
18 """

import sys
import os
import json
import tempfile
from datetime import datetime
import ambari_simplejson as json # simplejson is much faster compared to the Python 2.6 json module and has the same function set.

from ambari_commons import constants

from resource_management.libraries.script.script import Script
from resource_management.core.resources.system import Execute, File
from resource_management.core import shell
from resource_management.libraries.functions import stack_select
from resource_management.libraries.functions.constants import Direction
from resource_management.libraries.functions.format import format
from resource_management.libraries.functions.security_commons import build_expectations, \
  cached_kinit_executor, get_params_from_filesystem, validate_security_config_properties, \
  FILE_TYPE_XML

from resource_management.core.exceptions import Fail
from resource_management.core.shell import as_user
from resource_management.core.logger import Logger


from ambari_commons.os_family_impl import OsFamilyImpl
from ambari_commons import OSConst


import namenode_upgrade
from hdfs_namenode import namenode, wait_for_safemode_off
from hdfs import hdfs
import hdfs_rebalance
from utils import initiate_safe_zkfc_failover, get_hdfs_binary, get_dfsadmin_base_command


# hashlib is supplied as of Python 2.5 as the replacement interface for md5
# and other secure hashes. In 2.6, md5 is deprecated. Import hashlib if
# available, avoiding a deprecation warning under 2.6. Import md5 otherwise,
# preserving 2.4 compatibility.
try:
  import hashlib
  _md5 = hashlib.md5
except ImportError:
  import md5
  _md5 = md5.new

class NameNode(Script):

  def get_hdfs_binary(self):
    """
    Get the name or path to the hdfs binary depending on the component name.
    """
    return get_hdfs_binary("hadoop-hdfs-namenode")

  def install(self, env):
    import params
    env.set_params(params)
    self.install_packages(env)
    #TODO we need this for HA because of manual steps
    self.configure(env)

  def configure(self, env):
    import params
    env.set_params(params)
    hdfs("namenode")
    hdfs_binary = self.get_hdfs_binary()
    namenode(action="configure", hdfs_binary=hdfs_binary, env=env)

  def start(self, env, upgrade_type=None):
    import params
    env.set_params(params)
    self.configure(env)
    hdfs_binary = self.get_hdfs_binary()

    if not params.hdfs_tmp_dir or params.hdfs_tmp_dir.lower() == 'null':
      Logger.error("WARNING: The HDFS tmp dir property (hdfs_tmp_dir) is empty or invalid. Ambari will change permissions for the folder on a regular basis.")

    namenode(action="start", hdfs_binary=hdfs_binary, upgrade_type=upgrade_type,
      upgrade_suspended=params.upgrade_suspended, env=env)

    # after starting NN in an upgrade, touch the marker file - but only do this for certain
    # upgrade types - not all upgrades actually tell NN about the upgrade (like HOU)
    if upgrade_type in (constants.UPGRADE_TYPE_ROLLING, constants.UPGRADE_TYPE_NON_ROLLING):
      # place a file on the system indicating that we've submitted the command that
      # instructs NN that it is now part of an upgrade
      namenode_upgrade.create_upgrade_marker()

  def stop(self, env, upgrade_type=None):
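    """
    Stop the NameNode. During a rolling upgrade of an HA cluster with automatic
    failover enabled, a safe ZKFC failover is initiated first so the standby
    NameNode can take over before this one is stopped.
    """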
    import params
    env.set_params(params)
    hdfs_binary = self.get_hdfs_binary()
    if upgrade_type == constants.UPGRADE_TYPE_ROLLING and params.dfs_ha_enabled:
      if params.dfs_ha_automatic_failover_enabled:
        initiate_safe_zkfc_failover()
      else:
        raise Fail("Rolling Upgrade - dfs.ha.automatic-failover.enabled must be enabled to perform a rolling restart")
    namenode(action="stop", hdfs_binary=hdfs_binary, upgrade_type=upgrade_type, env=env)

  def status(self, env):
    import status_params
    env.set_params(status_params)
    namenode(action="status", env=env)

  def decommission(self, env):
    import params
    env.set_params(params)
    hdfs_binary = self.get_hdfs_binary()
    namenode(action="decommission", hdfs_binary=hdfs_binary)

@OsFamilyImpl(os_family=OsFamilyImpl.DEFAULT)
class NameNodeDefault(NameNode):

  def restore_snapshot(self, env):
    """
    Restore the snapshot during a Downgrade.
    """
    print "TODO AMBARI-12698"
    pass

  def prepare_express_upgrade(self, env):
    """
    During an Express Upgrade.
    If in HA, on the Active NameNode only, examine the directory dfs.namenode.name.dir and
    make sure that there is no "/previous" directory.

    Create a list of all the DataNodes in the cluster.
    hdfs dfsadmin -report > dfs-old-report-1.log

    hdfs dfsadmin -safemode enter
    hdfs dfsadmin -saveNamespace

    Copy the checkpoint files located in ${dfs.namenode.name.dir}/current into a backup directory.

    Finalize any prior HDFS upgrade,
    hdfs dfsadmin -finalizeUpgrade

    Prepare for a NameNode rolling upgrade in order to not lose any data.
    hdfs dfsadmin -rollingUpgrade prepare
    """
    import params
    Logger.info("Preparing the NameNodes for a NonRolling (aka Express) Upgrade.")

    if params.security_enabled:
      kinit_command = format("{params.kinit_path_local} -kt {params.hdfs_user_keytab} {params.hdfs_principal_name}")
      Execute(kinit_command, user=params.hdfs_user, logoutput=True)

    hdfs_binary = self.get_hdfs_binary()
    namenode_upgrade.prepare_upgrade_check_for_previous_dir()
    namenode_upgrade.prepare_upgrade_enter_safe_mode(hdfs_binary)
    namenode_upgrade.prepare_upgrade_save_namespace(hdfs_binary)
    namenode_upgrade.prepare_upgrade_backup_namenode_dir()
    namenode_upgrade.prepare_upgrade_finalize_previous_upgrades(hdfs_binary)

    # Call -rollingUpgrade prepare
    namenode_upgrade.prepare_rolling_upgrade(hdfs_binary)

  def prepare_rolling_upgrade(self, env):
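    # Puts the NameNode into rolling upgrade mode ("hdfs dfsadmin -rollingUpgrade prepare")
    # ahead of a Rolling Upgrade.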
    hdfs_binary = self.get_hdfs_binary()
    namenode_upgrade.prepare_rolling_upgrade(hdfs_binary)

  def wait_for_safemode_off(self, env):
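    # Blocks until the NameNode has left safemode; afterwait_sleep=30 adds a 30 second
    # pause after safemode is exited, and execute_kinit=True performs a kinit first on
    # secure clusters.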
    wait_for_safemode_off(self.get_hdfs_binary(), afterwait_sleep=30, execute_kinit=True)

  def finalize_non_rolling_upgrade(self, env):
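    # Finalizes a NonRolling (aka Express) HDFS upgrade.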
    hdfs_binary = self.get_hdfs_binary()
    namenode_upgrade.finalize_upgrade(constants.UPGRADE_TYPE_NON_ROLLING, hdfs_binary)

  def finalize_rolling_upgrade(self, env):
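    # Finalizes a Rolling HDFS upgrade.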
    hdfs_binary = self.get_hdfs_binary()
    namenode_upgrade.finalize_upgrade(constants.UPGRADE_TYPE_ROLLING, hdfs_binary)

  def pre_upgrade_restart(self, env, upgrade_type=None):
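    # Select the target stack version's packages (stack_select) before the component is
    # restarted as part of the upgrade.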
    Logger.info("Executing Stack Upgrade pre-restart")
    import params
    env.set_params(params)

    stack_select.select_packages(params.version)

  def post_upgrade_restart(self, env, upgrade_type=None):
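    # After the upgrade restart, verify that the NameNode is responsive by running
    # "hdfs dfsadmin -report -live", retrying up to 60 times with 10 seconds between tries.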
    Logger.info("Executing Stack Upgrade post-restart")
    import params
    env.set_params(params)

    hdfs_binary = self.get_hdfs_binary()
    dfsadmin_base_command = get_dfsadmin_base_command(hdfs_binary)
    dfsadmin_cmd = dfsadmin_base_command + " -report -live"
    Execute(dfsadmin_cmd,
            user=params.hdfs_user,
            tries=60,
            try_sleep=10
    )

  def rebalancehdfs(self, env):
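    # Custom command handler for the "Rebalance HDFS" action: kicks off the HDFS balancer
    # with the threshold supplied by the request.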
    import params
    env.set_params(params)

    name_node_parameters = json.loads(params.name_node_params)
    threshold = name_node_parameters['threshold']
    _print("Starting balancer with threshold = %s\n" % threshold)

    rebalance_env = {'PATH': params.hadoop_bin_dir}

    if params.security_enabled:
      # Create the kerberos credentials cache (ccache) file and set it in the environment to use
      # when executing the HDFS rebalance command. Use the md5 hash of the combination of the principal
      # and keytab file to generate a (relatively) unique cache filename so that we can use it as needed.
      # TODO: params.tmp_dir=/var/lib/ambari-agent/tmp. However, the hdfs user doesn't have access to this path.
      # TODO: Hence using /tmp
      ccache_file_name = "hdfs_rebalance_cc_" + _md5(format("{hdfs_principal_name}|{hdfs_user_keytab}")).hexdigest()
      ccache_file_path = os.path.join(tempfile.gettempdir(), ccache_file_name)
      rebalance_env['KRB5CCNAME'] = ccache_file_path

      # If there are no tickets in the cache or they are expired, perform a kinit, else use what
      # is in the cache
      klist_cmd = format("{klist_path_local} -s {ccache_file_path}")
      kinit_cmd = format("{kinit_path_local} -c {ccache_file_path} -kt {hdfs_user_keytab} {hdfs_principal_name}")
      if shell.call(klist_cmd, user=params.hdfs_user)[0] != 0:
        Execute(kinit_cmd, user=params.hdfs_user)

    def calculateCompletePercent(first, current):
      # avoid division by zero
      try:
        division_result = current.bytesLeftToMove / first.bytesLeftToMove
      except ZeroDivisionError:
        Logger.warning("Division by zero. Bytes Left To Move = {0}. Return 1.0".format(first.bytesLeftToMove))
        return 1.0
      return 1.0 - division_result


    def startRebalancingProcess(threshold, rebalance_env):
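      # Builds the balancer command line and wraps it so that it runs as the hdfs user
      # with the rebalance environment (PATH and, on secure clusters, KRB5CCNAME).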
      rebalanceCommand = format('hdfs --config {hadoop_conf_dir} balancer -threshold {threshold}')
      return as_user(rebalanceCommand, params.hdfs_user, env=rebalance_env)

    command = startRebalancingProcess(threshold, rebalance_env)

    basedir = os.path.join(env.config.basedir, 'scripts')
    if threshold == 'DEBUG':  # FIXME TODO remove this on PROD
      basedir = os.path.join(env.config.basedir, 'scripts', 'balancer-emulator')
      command = ['ambari-python-wrap', 'hdfs-command.py']

    _print("Executing command %s\n" % command)

    parser = hdfs_rebalance.HdfsParser()

    def handle_new_line(line, is_stderr):
      if is_stderr:
        return

      _print('[balancer] %s' % (line))
      pl = parser.parseLine(line)
      if pl:
        res = pl.toJson()
        res['completePercent'] = calculateCompletePercent(parser.initialLine, pl)

        self.put_structured_out(res)
      elif parser.state == 'PROCESS_FINISED':
        _print('[balancer] %s' % ('Process is finished'))
        self.put_structured_out({'completePercent': 1})
      return

    if not hdfs_rebalance.is_balancer_running():
      # As the rebalance may take a long time (hours, even days), the process is only triggered here.
      # Tracking the progress based on the command output is no longer supported because of this.
      Execute(command, wait_for_finish=False)

      _print("The rebalance process has been triggered")
    else:
      _print("There is another balancer running. This means you or another Ambari user may have triggered the "
             "operation earlier. The process may take a long time to finish (hours, even days). If the problem persists, "
             "please check with the HDFS administrators whether they have triggered or killed the operation.")

    if params.security_enabled:
      # Delete the kerberos credentials cache (ccache) file
      File(ccache_file_path,
           action = "delete",
      )

  def get_log_folder(self):
    import params
    return params.hdfs_log_dir

  def get_user(self):
    import params
    return params.hdfs_user

  def get_pid_files(self):
    import status_params
    return [status_params.namenode_pid_file]

@OsFamilyImpl(os_family=OSConst.WINSRV_FAMILY)
class NameNodeWindows(NameNode):
  def install(self, env):
    import install_params
    self.install_packages(env)
    #TODO we need this for HA because of manual steps
    self.configure(env)

  def rebalancehdfs(self, env):
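    # Windows variant of the rebalance command: runs the balancer while impersonating
    # the hdfs user and parses its stdout to publish progress as structured output.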
    from ambari_commons.os_windows import UserHelper, run_os_command_impersonated
    import params
    env.set_params(params)

    hdfs_username, hdfs_domain = UserHelper.parse_user_name(params.hdfs_user, ".")

    name_node_parameters = json.loads(params.name_node_params)
    threshold = name_node_parameters['threshold']
    _print("Starting balancer with threshold = %s\n" % threshold)

    def calculateCompletePercent(first, current):
      return 1.0 - current.bytesLeftToMove / first.bytesLeftToMove

    def startRebalancingProcess(threshold):
      rebalanceCommand = 'hdfs balancer -threshold %s' % threshold
      return ['cmd', '/C', rebalanceCommand]

    command = startRebalancingProcess(threshold)
    basedir = os.path.join(env.config.basedir, 'scripts')

    _print("Executing command %s\n" % command)

    parser = hdfs_rebalance.HdfsParser()
    returncode, stdout, err = run_os_command_impersonated(' '.join(command), hdfs_username, Script.get_password(params.hdfs_user), hdfs_domain)

    for line in stdout.split('\n'):
      _print('[balancer] %s %s' % (str(datetime.now()), line))
      pl = parser.parseLine(line)
      if pl:
        res = pl.toJson()
        res['completePercent'] = calculateCompletePercent(parser.initialLine, pl)

        self.put_structured_out(res)
      elif parser.state == 'PROCESS_FINISED':
        _print('[balancer] %s %s' % (str(datetime.now()), 'Process is finished'))
        self.put_structured_out({'completePercent': 1})
        break

    if returncode is not None and returncode != 0:
      raise Fail('Hdfs rebalance process exited with error. See the log output')

def _print(line):
  sys.stdout.write(line)
  sys.stdout.flush()

if __name__ == "__main__":
  NameNode().execute()