AMBARI-21581 - Replace Hard Coded conf-select Structures (jonathanhurley)
[ambari.git] / ambari-server / src / main / resources / common-services / ATLAS / 0.7.0.3.0 / package / scripts / metadata_server.py
1 """
2 Licensed to the Apache Software Foundation (ASF) under one
3 or more contributor license agreements. See the NOTICE file
4 distributed with this work for additional information
5 regarding copyright ownership. The ASF licenses this file
6 to you under the Apache License, Version 2.0 (the
7 "License"); you may not use this file except in compliance
8 with the License. You may obtain a copy of the License at
9
10 http://www.apache.org/licenses/LICENSE-2.0
11
12 Unless required by applicable law or agreed to in writing, software
13 distributed under the License is distributed on an "AS IS" BASIS,
14 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 See the License for the specific language governing permissions and
16 limitations under the License.
17
18 """
19 # Python Imports
20 import os
21
22 # Local Imports
23 from metadata import metadata
24 from resource_management import Fail
25 from resource_management.libraries.functions import stack_select
26 from resource_management.core.resources.system import Execute, File
27 from resource_management.libraries.script.script import Script
28 from resource_management.libraries.functions.version import format_stack_version
29 from resource_management.libraries.functions.check_process_status import check_process_status
30 from resource_management.libraries.functions.format import format
31 from resource_management.libraries.functions.security_commons import build_expectations, \
32 get_params_from_filesystem, validate_security_config_properties, \
33 FILE_TYPE_PROPERTIES
34 from resource_management.libraries.functions.show_logs import show_logs
35 from resource_management.libraries.functions.stack_features import check_stack_feature, get_stack_feature_version
36 from resource_management.libraries.functions.constants import StackFeature
37 from resource_management.core.resources.system import Directory
38 from resource_management.core.logger import Logger
39 from setup_ranger_atlas import setup_ranger_atlas
40 from resource_management.core.resources.zkmigrator import ZkMigrator
41
class MetadataServer(Script):
  """Ambari control script for the Atlas Metadata Server component.

  The Ambari agent instantiates this class and invokes one lifecycle
  command (install/configure/start/stop/status/...) per run.  ``params``
  and ``status_params`` are imported lazily inside each command because
  they are regenerated from the cluster configuration on every invocation.

  NOTE(review): Ambari's ``format()`` interpolates ``{...}`` placeholders
  from the caller's scope / params module, so string templates below pull
  values that are not visible as explicit arguments.
  """

  def install(self, env):
    """Install Atlas packages.

    Deletes the previously expanded Atlas webapp directory first so the
    (re)install unpacks a fresh copy of the WAR contents — presumably to
    avoid stale files surviving an upgrade/reinstall (TODO confirm).
    """
    import params
    env.set_params(params)

    # Declaring the Directory resource with action "delete" removes the
    # expanded webapp before packages are (re)installed.
    Directory(format("{expanded_war_dir}/atlas"),
              action = "delete",
    )

    self.install_packages(env)

  def configure(self, env, upgrade_type=None, config_dir=None):
    """Write the Atlas configuration files (delegates to ``metadata()``)."""
    import params
    env.set_params(params)
    metadata()

  def pre_upgrade_restart(self, env, upgrade_type=None):
    """Before a restart that is part of a stack upgrade, repoint the
    stack-select tool at the target version — only when this stack
    version supports upgrading Atlas.
    """
    import params
    env.set_params(params)

    if check_stack_feature(StackFeature.ATLAS_UPGRADE_SUPPORT, params.version):
      stack_select.select_packages(params.version)

  def start(self, env, upgrade_type=None):
    """Start the Atlas Metadata Server daemon.

    Order matters:
      1. (re)generate configuration,
      2. configure the Ranger plugin if the stack supports it,
      3. run one-time HBase/Kafka setup scripts where applicable,
      4. launch the daemon (skipped if the pid file points at a live process).

    Raises whatever ``Execute`` raises on failure, after dumping the Atlas
    logs via ``show_logs`` to aid diagnosis.
    """
    import params

    env.set_params(params)
    self.configure(env)

    # Shell command templates; placeholders are resolved by Ambari's format().
    daemon_cmd = format('source {params.conf_dir}/atlas-env.sh ; {params.metadata_start_script}')
    # not_if guard: daemon start is a no-op when the pid file exists and the
    # recorded pid is a running process.
    no_op_test = format('ls {params.pid_file} >/dev/null 2>&1 && ps -p `cat {params.pid_file}` >/dev/null 2>&1')
    atlas_hbase_setup_command = format("cat {atlas_hbase_setup} | hbase shell -n")
    atlas_kafka_setup_command = format("bash {atlas_kafka_setup}")
    # Secure variants kinit first with the service keytabs.
    secure_atlas_hbase_setup_command = format("kinit -kt {hbase_user_keytab} {hbase_principal_name}; ") + atlas_hbase_setup_command
    # In case the principal was distributed across several hosts, the _HOST
    # pattern needs to be replaced with this host's name.
    secure_atlas_kafka_setup_command = format("kinit -kt {kafka_keytab} {kafka_principal_name}; ").replace("_HOST", params.hostname) + atlas_kafka_setup_command

    if params.stack_supports_atlas_ranger_plugin:
      Logger.info('Atlas plugin is enabled, configuring Atlas plugin.')
      setup_ranger_atlas(upgrade_type=upgrade_type)
    else:
      Logger.info('Atlas plugin is not supported or enabled.')

    try:
      effective_version = get_stack_feature_version(params.config)

      if check_stack_feature(StackFeature.ATLAS_HBASE_SETUP, effective_version):
        # Run the HBase bootstrap either kerberized (on a host with an HBase
        # master) or unsecured with the Ranger HBase plugin enabled.
        if params.security_enabled and params.has_hbase_master:
          Execute(secure_atlas_hbase_setup_command,
                  tries = 5,
                  try_sleep = 10,
                  user=params.hbase_user
          )
        elif params.enable_ranger_hbase and not params.security_enabled:
          Execute(atlas_hbase_setup_command,
                  tries = 5,
                  try_sleep = 10,
                  user=params.hbase_user
          )

      if check_stack_feature(StackFeature.ATLAS_UPGRADE_SUPPORT, effective_version) and params.security_enabled:
        try:
          Execute(secure_atlas_kafka_setup_command,
                  user=params.kafka_user,
                  tries=5,
                  try_sleep=10
          )
        except Fail:
          # Deliberate best-effort: do not block Atlas start; failure logs
          # are already surfaced by Execute internals.
          pass

      Execute(daemon_cmd,
              user=params.metadata_user,
              not_if=no_op_test
      )
    except:
      # Bare except is intentional: surface the Atlas logs for any failure,
      # then re-raise unchanged so Ambari reports the original error.
      show_logs(params.log_dir, params.metadata_user)
      raise

  def stop(self, env, upgrade_type=None):
    """Stop the Atlas Metadata Server daemon and remove its pid file."""
    import params

    env.set_params(params)
    daemon_cmd = format('source {params.conf_dir}/atlas-env.sh; {params.metadata_stop_script}')

    # If the pid dir doesn't exist, this means either
    # 1. The user just added the Atlas service and issued a restart command
    #    (stop+start). So stop should be a no-op since there's nothing to stop.
    # OR
    # 2. The user changed the value of the pid dir config and incorrectly
    #    issued a restart command. In which case the stop command cannot do
    #    anything since Ambari doesn't know which process to kill.
    #    The start command will spawn another instance.
    #    The user should have issued a stop, changed the config, and then
    #    started it.
    if not os.path.isdir(params.pid_dir):
      Logger.info("*******************************************************************")
      Logger.info("Will skip the stop command since this is the first time stopping/restarting Atlas "
                  "and the pid dir does not exist, %s\n" % params.pid_dir)
      return

    try:
      Execute(daemon_cmd,
              user=params.metadata_user,
      )
    except:
      # Same pattern as start(): show logs for diagnosis, then re-raise.
      show_logs(params.log_dir, params.metadata_user)
      raise

    # Remove the stale pid file so status() reports the component as stopped.
    File(params.pid_file, action="delete")

  def disable_security(self, env):
    """Revert Atlas ZooKeeper znode ACLs to world-readable when Kerberos
    is being disabled for the cluster.

    No-op (with an informational log) when no ZooKeeper quorum is configured.
    """
    import params
    if not params.zookeeper_quorum:
      Logger.info("No zookeeper connection string. Skipping reverting ACL")
      return
    zkmigrator = ZkMigrator(params.zookeeper_quorum, params.java_exec, params.java64_home, params.atlas_jaas_file, params.metadata_user)
    # Normalize the znode root to an absolute path before setting the ACL.
    zkmigrator.set_acls(params.zk_root if params.zk_root.startswith('/') else '/' + params.zk_root, 'world:anyone:crdwa')
    # Atlas's Kafka consumer-group znode also carries restricted ACLs; open
    # it up too when a group id is configured.
    if params.atlas_kafka_group_id:
      zkmigrator.set_acls(format('/consumers/{params.atlas_kafka_group_id}'), 'world:anyone:crdwa')

  def status(self, env):
    """Report component status by checking the recorded pid (raises
    ComponentIsNotRunning via check_process_status when dead)."""
    import status_params

    env.set_params(status_params)
    check_process_status(status_params.pid_file)

  def get_log_folder(self):
    # Used by Ambari to fetch component logs.
    import params

    return params.log_dir

  def get_user(self):
    # OS user the Atlas daemon runs as.
    import params

    return params.metadata_user


  def get_pid_files(self):
    # Pid file(s) Ambari monitors for this component.
    import status_params
    return [status_params.pid_file]
181
# Entry point: the Ambari agent runs this script directly, passing the
# lifecycle command name; Script.execute() dispatches to the matching method.
if __name__ == "__main__":
  MetadataServer().execute()