AMBARI-21581 - Replace Hard Coded conf-select Structures (jonathanhurley)
[ambari.git] / ambari-server / src / main / resources / common-services / ATLAS / 0.1.0.2.3 / package / scripts / metadata_server.py
1 """
2 Licensed to the Apache Software Foundation (ASF) under one
3 or more contributor license agreements. See the NOTICE file
4 distributed with this work for additional information
5 regarding copyright ownership. The ASF licenses this file
6 to you under the Apache License, Version 2.0 (the
7 "License"); you may not use this file except in compliance
8 with the License. You may obtain a copy of the License at
9
10 http://www.apache.org/licenses/LICENSE-2.0
11
12 Unless required by applicable law or agreed to in writing, software
13 distributed under the License is distributed on an "AS IS" BASIS,
14 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 See the License for the specific language governing permissions and
16 limitations under the License.
17
18 """
19 # Python Imports
20 import os
21
22 # Local Imports
23 from metadata import metadata
24 from resource_management import Fail
25 from resource_management.libraries.functions import stack_select
26 from resource_management.core.resources.system import Execute, File
27 from resource_management.libraries.script.script import Script
28 from resource_management.libraries.functions.version import format_stack_version
29 from resource_management.libraries.functions.check_process_status import check_process_status
30 from resource_management.libraries.functions.format import format
31 from resource_management.libraries.functions.security_commons import build_expectations, \
32 get_params_from_filesystem, validate_security_config_properties, \
33 FILE_TYPE_PROPERTIES
34 from resource_management.libraries.functions.show_logs import show_logs
35 from resource_management.libraries.functions.stack_features import check_stack_feature, get_stack_feature_version
36 from resource_management.libraries.functions.constants import StackFeature
37 from resource_management.core.resources.system import Directory
38 from resource_management.core.logger import Logger
39 from setup_ranger_atlas import setup_ranger_atlas
40 from resource_management.core.resources.zkmigrator import ZkMigrator
41
class MetadataServer(Script):
  """Ambari lifecycle script for the Apache Atlas Metadata Server component.

  Each public method is invoked by the Ambari agent in response to a
  server-issued command (INSTALL, CONFIGURE, START, STOP, STATUS, upgrade
  hooks, ...).  Configuration values come from the sibling ``params`` /
  ``status_params`` modules, which are re-imported inside each method so
  they are evaluated against the command's current configuration.
  """

  def install(self, env):
    """Install the Atlas packages.

    The expanded Atlas web-application directory is deleted first —
    presumably so a stale exploded WAR cannot shadow the one shipped by
    the freshly installed package (TODO confirm against Atlas startup).
    """
    import params
    env.set_params(params)

    Directory(format("{expanded_war_dir}/atlas"),
              action = "delete",
    )

    self.install_packages(env)

  def configure(self, env, upgrade_type=None, config_dir=None):
    """Write out the Atlas configuration files (delegates to metadata())."""
    import params
    env.set_params(params)
    metadata()

  def pre_upgrade_restart(self, env, upgrade_type=None):
    """Point the stack-select tool at the target version's Atlas packages
    before an upgrade restart, on stacks that support Atlas upgrade."""
    import params
    env.set_params(params)

    if check_stack_feature(StackFeature.ATLAS_UPGRADE_SUPPORT, params.version):
      stack_select.select_packages(params.version)

  def start(self, env, upgrade_type=None):
    """Start the Atlas Metadata Server daemon.

    Before launching the daemon this also runs the prerequisite setup
    steps gated by stack features and security settings: Ranger plugin
    configuration, the Atlas HBase setup script, and the Atlas Kafka
    setup script.  On any failure the daemon logs are surfaced to the
    operator and the exception is re-raised.
    """
    import params

    env.set_params(params)
    self.configure(env)

    # Launch command, run inside the Atlas environment.
    daemon_cmd = format('source {params.conf_dir}/atlas-env.sh ; {params.metadata_start_script}')
    # Succeeds (so the start Execute becomes a no-op) when a live process
    # matches the recorded pid file.
    no_op_test = format('ls {params.pid_file} >/dev/null 2>&1 && ps -p `cat {params.pid_file}` >/dev/null 2>&1')
    atlas_hbase_setup_command = format("cat {atlas_hbase_setup} | hbase shell -n")
    atlas_kafka_setup_command = format("bash {atlas_kafka_setup}")
    secure_atlas_hbase_setup_command = format("kinit -kt {hbase_user_keytab} {hbase_principal_name}; ") + atlas_hbase_setup_command
    # The principal may have been distributed across several hosts, so the
    # _HOST placeholder must be replaced with this host's actual name.
    secure_atlas_kafka_setup_command = format("kinit -kt {kafka_keytab} {kafka_principal_name}; ").replace("_HOST", params.hostname) + atlas_kafka_setup_command

    if params.stack_supports_atlas_ranger_plugin:
      Logger.info('Atlas plugin is enabled, configuring Atlas plugin.')
      setup_ranger_atlas(upgrade_type=upgrade_type)
    else:
      Logger.info('Atlas plugin is not supported or enabled.')

    try:
      effective_version = get_stack_feature_version(params.config)

      if check_stack_feature(StackFeature.ATLAS_HBASE_SETUP, effective_version):
        # Secure clusters must kinit as the HBase user first; non-secure
        # clusters only need the setup when the Ranger HBase plugin is on.
        if params.security_enabled and params.has_hbase_master:
          Execute(secure_atlas_hbase_setup_command,
                  tries = 5,
                  try_sleep = 10,
                  user=params.hbase_user
                  )
        elif params.enable_ranger_hbase and not params.security_enabled:
          Execute(atlas_hbase_setup_command,
                  tries = 5,
                  try_sleep = 10,
                  user=params.hbase_user
                  )

      if check_stack_feature(StackFeature.ATLAS_UPGRADE_SUPPORT, effective_version) and params.security_enabled:
        try:
          Execute(secure_atlas_kafka_setup_command,
                  user=params.kafka_user,
                  tries=5,
                  try_sleep=10
                  )
        except Fail:
          # Best-effort: do nothing and do not block the Atlas start; the
          # failure logs are already available via Execute internals.
          pass

      Execute(daemon_cmd,
              user=params.metadata_user,
              not_if=no_op_test
              )
    except:
      # Surface the daemon logs to the operator, then re-raise.
      show_logs(params.log_dir, params.metadata_user)
      raise

  def stop(self, env, upgrade_type=None):
    """Stop the Atlas Metadata Server daemon and remove its pid file."""
    import params

    env.set_params(params)
    daemon_cmd = format('source {params.conf_dir}/atlas-env.sh; {params.metadata_stop_script}')

    # If the pid dir doesn't exist, this means either
    # 1. The user just added Atlas service and issued a restart command (stop+start). So stop should be a no-op
    # since there's nothing to stop.
    # OR
    # 2. The user changed the value of the pid dir config and incorrectly issued a restart command.
    # In which case the stop command cannot do anything since Ambari doesn't know which process to kill.
    # The start command will spawn another instance.
    # The user should have issued a stop, changed the config, and then started it.
    if not os.path.isdir(params.pid_dir):
      Logger.info("*******************************************************************")
      Logger.info("Will skip the stop command since this is the first time stopping/restarting Atlas "
                  "and the pid dir does not exist, %s\n" % params.pid_dir)
      return

    try:
      Execute(daemon_cmd,
              user=params.metadata_user,
              )
    except:
      # Surface the daemon logs to the operator, then re-raise.
      show_logs(params.log_dir, params.metadata_user)
      raise

    File(params.pid_file, action="delete")

  def disable_security(self, env):
    """Revert Atlas znode ACLs to open (world:anyone:crdwa) when cluster
    security is being disabled; no-op without a ZooKeeper quorum."""
    import params
    if not params.zookeeper_quorum:
      Logger.info("No zookeeper connection string. Skipping reverting ACL")
      return
    zkmigrator = ZkMigrator(params.zookeeper_quorum, params.java_exec, params.java64_home, params.atlas_jaas_file, params.metadata_user)
    # Normalize the root znode to an absolute path before setting the ACL.
    zkmigrator.set_acls(params.zk_root if params.zk_root.startswith('/') else '/' + params.zk_root, 'world:anyone:crdwa')
    if params.atlas_kafka_group_id:
      zkmigrator.set_acls(format('/consumers/{params.atlas_kafka_group_id}'), 'world:anyone:crdwa')

  def status(self, env):
    """Raise ComponentIsNotRunning (via check_process_status) when the
    process recorded in the pid file is not alive."""
    import status_params

    env.set_params(status_params)
    check_process_status(status_params.pid_file)

  def get_log_folder(self):
    """Return the Atlas log directory (used by Ambari log aggregation)."""
    import params

    return params.log_dir

  def get_user(self):
    """Return the OS user the Atlas daemon runs as."""
    import params

    return params.metadata_user


  def get_pid_files(self):
    """Return the list of pid files owned by this component."""
    import status_params
    return [status_params.pid_file]
180
if __name__ == "__main__":
  # Entry point: the Ambari agent runs this script directly and
  # Script.execute() dispatches to the requested lifecycle method.
  server = MetadataServer()
  server.execute()