LENS-1527 : Fix session restore issue
[lens.git] / lens-server / src / main / java / org / apache / lens / server / session / HiveSessionService.java
1 /**
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
17 * under the License.
18 */
19 package org.apache.lens.server.session;
20
21 import java.io.IOException;
22 import java.io.ObjectInput;
23 import java.io.ObjectOutput;
24 import java.util.*;
25 import java.util.concurrent.*;
26
27 import javax.ws.rs.BadRequestException;
28 import javax.ws.rs.ClientErrorException;
29 import javax.ws.rs.NotFoundException;
30 import javax.ws.rs.WebApplicationException;
31
32 import org.apache.lens.api.LensSessionHandle;
33 import org.apache.lens.server.BaseLensService;
34 import org.apache.lens.server.api.LensConfConstants;
35 import org.apache.lens.server.api.error.LensException;
36 import org.apache.lens.server.api.health.HealthStatus;
37 import org.apache.lens.server.api.session.*;
38 import org.apache.lens.server.session.LensSessionImpl.ResourceEntry;
39
40 import org.apache.commons.lang3.StringUtils;
41 import org.apache.hadoop.conf.Configuration;
42 import org.apache.hadoop.hive.conf.HiveConf;
43 import org.apache.hadoop.hive.conf.SystemVariables;
44 import org.apache.hadoop.hive.ql.metadata.Hive;
45 import org.apache.hadoop.hive.ql.session.SessionState;
46 import org.apache.hive.service.cli.CLIService;
47 import org.apache.hive.service.cli.HiveSQLException;
48 import org.apache.hive.service.cli.OperationHandle;
49
50 import com.google.common.collect.Maps;
51 import lombok.AccessLevel;
52 import lombok.Getter;
53 import lombok.extern.slf4j.Slf4j;
54
55 /**
56 * The Class HiveSessionService.
57 */
58 @Slf4j
59 public class HiveSessionService extends BaseLensService implements SessionService {
60
61
62 /** The restorable sessions. */
63 private List<LensSessionImpl.LensSessionPersistInfo> restorableSessions;
64
65 /** The session expiry thread. */
66 private ScheduledExecutorService sessionExpiryThread;
67
68 /** The session expiry runnable. */
69 private Runnable sessionExpiryRunnable = new SessionExpiryRunnable();
70
71 /** Service to manage database specific resources */
72 @Getter(AccessLevel.PROTECTED)
73 private DatabaseResourceService databaseResourceService;
74
75 /**
76 * The conf.
77 */
78 private Configuration conf;
79
80 /**
81 * Instantiates a new hive session service.
82 *
83 * @param cliService the cli service
84 */
85 public HiveSessionService(CLIService cliService) {
86 super(NAME, cliService);
87 }
88
89 @Override
90 public List<String> listAllResources(LensSessionHandle sessionHandle, String type) {
91 if (!isValidResouceType(type)) {
92 throw new BadRequestException("Bad resource type is passed. Please pass jar or file as source type");
93 }
94 List<ResourceEntry> resources = getSession(sessionHandle).getResources();
95 List<String> allResources = new ArrayList<String>();
96 for (ResourceEntry resource : resources) {
97 if (type == null || resource.getType().equalsIgnoreCase(type)) {
98 allResources.add(resource.toString());
99 }
100 }
101 return allResources;
102 }
103
104 private boolean isValidResouceType(String type) {
105 return (type == null || type.equalsIgnoreCase("jar") || type.equalsIgnoreCase("file"));
106 }
107
108 /**
109 * {@inheritDoc}
110 */
111 @Override
112 public void addResource(LensSessionHandle sessionid, String type, String path) {
113 try {
114 acquire(sessionid);
115 SessionState ss = getSession(sessionid).getSessionState();
116 String finalLocation = ss.add_resource(SessionState.ResourceType.valueOf(type.toUpperCase()), path);
117 getSession(sessionid).addResource(type, path, finalLocation);
118 } catch (RuntimeException e) {
119 log.error("Failed to add resource type:" + type + " path:" + path + " in session", e);
120 throw new WebApplicationException(e);
121 } finally {
122 release(sessionid);
123 }
124 }
125
126 private void addResourceUponRestart(LensSessionHandle sessionid, ResourceEntry resourceEntry) {
127 try {
128 acquire(sessionid);
129 SessionState ss = getSession(sessionid).getSessionState();
130 resourceEntry.location = ss.add_resource(SessionState.ResourceType.valueOf(resourceEntry.getType()),
131 resourceEntry.getUri());
132 if (resourceEntry.location == null) {
133 throw new NullPointerException("Resource's final location cannot be null");
134 }
135 } finally {
136 release(sessionid);
137 }
138 }
139 /**
140 * {@inheritDoc}
141 */
142 @Override
143 public void deleteResource(LensSessionHandle sessionid, String type, String path) {
144 String command = "delete " + type.toLowerCase() + " " + path;
145 try {
146 acquire(sessionid);
147 closeCliServiceOp(getCliService().executeStatement(getHiveSessionHandle(sessionid), command, null));
148 getSession(sessionid).removeResource(type, path);
149 } catch (HiveSQLException e) {
150 throw new WebApplicationException(e);
151 } finally {
152 release(sessionid);
153 }
154 }
155
156 /**
157 * Gets the session param.
158 *
159 * @param sessionConf the session conf
160 * @param ss the ss
161 * @param varname the varname
162 * @return the session param
163 */
164 private String getSessionParam(Configuration sessionConf, SessionState ss, String varname) {
165 if (varname.indexOf(SystemVariables.HIVEVAR_PREFIX) == 0) {
166 String var = varname.substring(SystemVariables.HIVEVAR_PREFIX.length());
167 if (ss.getHiveVariables().get(var) != null) {
168 return SystemVariables.HIVEVAR_PREFIX + var + "=" + ss.getHiveVariables().get(var);
169 } else {
170 throw new NotFoundException(varname + " is undefined as a hive variable");
171 }
172 } else {
173 String var;
174 if (varname.indexOf(SystemVariables.HIVECONF_PREFIX) == 0) {
175 var = varname.substring(SystemVariables.HIVECONF_PREFIX.length());
176 } else {
177 var = varname;
178 }
179 if (sessionConf.get(var) != null) {
180 return varname + "=" + sessionConf.get(var);
181 } else {
182 throw new NotFoundException(varname + " is undefined");
183 }
184 }
185 }
186
187 /**
188 * {@inheritDoc}
189 */
190 @Override
191 public LensSessionHandle openSession(String username, String password, String database,
192 Map<String, String> configuration)
193 throws LensException {
194 LensSessionHandle sessionid = super.openSession(username, password, configuration);
195 log.info("Opened session " + sessionid + " for user " + username);
196 notifyEvent(new SessionOpened(System.currentTimeMillis(), sessionid, username));
197
198 // Set current database
199 if (StringUtils.isNotBlank(database)) {
200 try {
201 if (!Hive.get(getSession(sessionid).getHiveConf()).databaseExists(database)) {
202 closeSession(sessionid);
203 log.info("Closed session " + sessionid.getPublicId().toString() + " as db " + database + " does not exist");
204 throw new NotFoundException("Database " + database + " does not exist");
205 }
206 } catch (Exception e) {
207 if (!(e instanceof NotFoundException)) {
208 try {
209 closeSession(sessionid);
210 } catch (LensException e2) {
211 log.error("Error closing session " + sessionid.getPublicId().toString(), e2);
212 }
213
214 log.error("Error in checking if database exists " + database, e);
215 throw new LensException("Error in checking if database exists" + database, e);
216 } else {
217 throw (NotFoundException) e;
218 }
219 }
220
221 getSession(sessionid).setCurrentDatabase(database);
222 log.info("Set database to " + database + " for session " + sessionid.getPublicId());
223 }
224
225 // add auxuiliary jars
226 String[] auxJars = getSession(sessionid).getSessionConf().getStrings(LensConfConstants.AUX_JARS);
227
228 if (auxJars != null) {
229 for (String jar : auxJars) {
230 log.info("Adding aux jar:" + jar);
231 addResource(sessionid, "jar", jar);
232 }
233 }
234 return sessionid;
235 }
236
237 @Override
238 public boolean isOpen(LensSessionHandle sessionHandle) {
239 return SESSION_MAP.containsKey(sessionHandle.getPublicId().toString());
240 }
241
242 /**
243 * {@inheritDoc}
244 */
245 @Override
246 public List<String> getAllSessionParameters(LensSessionHandle sessionid, boolean verbose, String key)
247 throws LensException {
248 List<String> result = new ArrayList<String>();
249 acquire(sessionid);
250 try {
251 SessionState ss = getSession(sessionid).getSessionState();
252 if (!StringUtils.isBlank(key)) {
253 result.add(getSessionParam(getSession(sessionid).getSessionConf(), ss, key));
254 } else {
255 SortedMap<String, String> sortedMap = new TreeMap<String, String>();
256 sortedMap.put("silent", (ss.getIsSilent() ? "on" : "off"));
257 for (String s : ss.getHiveVariables().keySet()) {
258 sortedMap.put(SystemVariables.HIVEVAR_PREFIX + s, ss.getHiveVariables().get(s));
259 }
260 for (Map.Entry<String, String> entry : getSession(sessionid).getSessionConf()) {
261 sortedMap.put(entry.getKey(), entry.getValue());
262 }
263
264 for (Map.Entry<String, String> entry : sortedMap.entrySet()) {
265 result.add(entry.toString());
266 }
267 }
268 } finally {
269 release(sessionid);
270 }
271 return result;
272 }
273
274 /**
275 * {@inheritDoc}
276 */
277 @Override
278 public void setSessionParameter(LensSessionHandle sessionid, String key, String value) {
279 HashMap<String, String> config = Maps.newHashMap();
280 config.put(key, value);
281 setSessionParameters(sessionid, config);
282 }
283
284 /**
285 * Sets the session parameter.
286 *
287 * @param sessionid the sessionid
288 * @param config map of string-string. each entry represents key and the value to be set for that key
289 */
290
291 protected void setSessionParameters(LensSessionHandle sessionid, Map<String, String> config) {
292 log.info("Request to Set params:" + config);
293 try {
294 acquire(sessionid);
295 // set in session conf
296 for(Map.Entry<String, String> entry: config.entrySet()) {
297 String var = entry.getKey();
298 if (var.indexOf(SystemVariables.HIVECONF_PREFIX) == 0) {
299 var = var.substring(SystemVariables.HIVECONF_PREFIX.length());
300 }
301 getSession(sessionid).getSessionConf().set(var, entry.getValue());
302 String command = "set" + " " + entry.getKey() + "= " + entry.getValue();
303 closeCliServiceOp(getCliService().executeStatement(getHiveSessionHandle(sessionid), command, null));
304 }
305 // add to persist
306 getSession(sessionid).setConfig(config);
307 log.info("Set params:" + config);
308 } catch (HiveSQLException e) {
309 throw new WebApplicationException(e);
310 } finally {
311 release(sessionid);
312 }
313 }
314
315 private void setSessionParametersOnRestore(LensSessionHandle sessionid, Map<String, String> config) {
316 // set in session conf
317 for(Map.Entry<String, String> entry: config.entrySet()) {
318 String var = entry.getKey();
319 if (var.indexOf(SystemVariables.HIVECONF_PREFIX) == 0) {
320 var = var.substring(SystemVariables.HIVECONF_PREFIX.length());
321 }
322 getSession(sessionid).getSessionConf().set(var, entry.getValue());
323 getSession(sessionid).getHiveConf().set(entry.getKey(), entry.getValue());
324 }
325 log.info("Set params on restart:" + config);
326 }
327
328 /*
329 * (non-Javadoc)
330 *
331 * @see org.apache.hive.service.CompositeService#init()
332 */
333 @Override
334 public synchronized void init(HiveConf hiveConf) {
335 this.databaseResourceService = new DatabaseResourceService(DatabaseResourceService.NAME);
336 addService(this.databaseResourceService);
337 this.conf = hiveConf;
338
339 super.init(hiveConf);
340 }
341
342 /*
343 * (non-Javadoc)
344 *
345 * @see org.apache.hive.service.CompositeService#start()
346 */
347 @Override
348 public synchronized void start() {
349 super.start();
350
351 sessionExpiryThread = Executors.newSingleThreadScheduledExecutor(runnable
352 -> new Thread(runnable, "Session-expiry-thread"));
353
354 int sessionExpiryInterval = getSessionExpiryInterval();
355 sessionExpiryThread.scheduleWithFixedDelay(sessionExpiryRunnable, sessionExpiryInterval,
356 sessionExpiryInterval, TimeUnit.SECONDS);
357
358 // Restore sessions if any
359 if (restorableSessions == null || restorableSessions.size() <= 0) {
360 log.info("No sessions to restore");
361 return;
362 }
363
364 for (LensSessionImpl.LensSessionPersistInfo persistInfo : restorableSessions) {
365 try {
366 LensSessionHandle sessionHandle = persistInfo.getSessionHandle();
367 restoreSession(sessionHandle, persistInfo.getUsername(), persistInfo.getPassword(), persistInfo.getConfig());
368 LensSessionImpl session = getSession(sessionHandle);
369 session.getLensSessionPersistInfo().setLastAccessTime(persistInfo.getLastAccessTime());
370 session.getLensSessionPersistInfo().setConfig(persistInfo.getConfig());
371 session.getLensSessionPersistInfo().setResources(persistInfo.getResources());
372 session.setCurrentDatabase(persistInfo.getDatabase());
373 session.getLensSessionPersistInfo().setMarkedForClose(persistInfo.isMarkedForClose());
374
375 // Add resources for restored sessions
376 for (LensSessionImpl.ResourceEntry resourceEntry : session.getResources()) {
377 try {
378 addResourceUponRestart(sessionHandle, resourceEntry);
379 } catch (Exception e) {
380 log.error("Failed to restore resource for session: " + session + " resource: " + resourceEntry, e);
381 }
382 }
383
384 // Add config for restored sessions
385 try{
386 setSessionParametersOnRestore(sessionHandle, session.getConfig());
387 } catch (Exception e) {
388 log.error("Error setting parameters " + session.getConfig()
389 + " for session: " + session, e);
390 }
391 log.info("Restored session " + persistInfo.getSessionHandle().getPublicId());
392 notifyEvent(new SessionRestored(System.currentTimeMillis(), sessionHandle));
393 } catch (LensException e) {
394 throw new RuntimeException(e);
395 }
396 }
397 log.info("Session service restored " + restorableSessions.size() + " sessions");
398 }
399
400 private int getSessionExpiryInterval() {
401 return conf.getInt(LensConfConstants.SESSION_EXPIRY_SERVICE_INTERVAL_IN_SECS,
402 LensConfConstants.DEFAULT_SESSION_EXPIRY_SERVICE_INTERVAL_IN_SECS);
403 }
404
405 /*
406 * (non-Javadoc)
407 *
408 * @see org.apache.hive.service.CompositeService#stop()
409 */
410 @Override
411 public synchronized void stop() {
412 super.stop();
413 if (sessionExpiryThread != null) {
414 sessionExpiryThread.shutdownNow();
415 }
416 }
417
418 /*
419 * (non-Javadoc)
420 *
421 * @see org.apache.lens.server.LensService#writeExternal(java.io.ObjectOutput)
422 */
423 @Override
424 public void writeExternal(ObjectOutput out) throws IOException {
425 // Write out all the sessions
426 List<LensSessionImpl> sessions = new ArrayList<>();
427 for (LensSessionHandle sessionHandle : SESSION_MAP.values()) {
428 try {
429 sessions.add(getSession(sessionHandle));
430 } catch (ClientErrorException e) {
431 // warn for invalid/null session and continue.
432 log.warn("Cannot persist " + (sessionHandle != null ? sessionHandle.getPublicId() : "null ")
433 + " session. {}", e);
434 }
435 }
436
437 out.writeInt(sessions.size());
438 for (LensSessionImpl session : sessions) {
439 session.getLensSessionPersistInfo().writeExternal(out);
440 }
441
442 log.info("Session service pesristed " + sessions.size() + " sessions out of " + SESSION_MAP.size());
443 }
444
445 /**
446 * {@inheritDoc}
447 */
448 @Override
449 public HealthStatus getHealthStatus() {
450 return this.getServiceState().equals(STATE.STARTED)
451 ? new HealthStatus(true, "Hive session service is healthy.")
452 : new HealthStatus(false, "Hive session service is down.");
453 }
454
455 /*
456 * (non-Javadoc)
457 *
458 * @see org.apache.lens.server.LensService#readExternal(java.io.ObjectInput)
459 */
460 @Override
461 public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
462 int numSessions = in.readInt();
463 restorableSessions = new ArrayList<LensSessionImpl.LensSessionPersistInfo>();
464
465 for (int i = 0; i < numSessions; i++) {
466 LensSessionImpl.LensSessionPersistInfo persistInfo = new LensSessionImpl.LensSessionPersistInfo();
467 persistInfo.readExternal(in);
468 restorableSessions.add(persistInfo);
469 SESSION_MAP.put(persistInfo.getSessionHandle().getPublicId().toString(), persistInfo.getSessionHandle());
470 }
471 log.info("Session service recovered " + SESSION_MAP.size() + " sessions");
472 }
473
474 /**
475 * {@inheritDoc}
476 */
477 @Override
478 public void closeSession(LensSessionHandle sessionHandle) throws LensException {
479 closeInternal(sessionHandle);
480 notifyEvent(new SessionClosed(System.currentTimeMillis(), sessionHandle));
481 }
482
483 @Override
484 public void cleanupIdleSessions() throws LensException {
485 ScheduledFuture<?> schedule = sessionExpiryThread.schedule(sessionExpiryRunnable, 0, TimeUnit.MILLISECONDS);
486 // wait till completion
487 try {
488 schedule.get();
489 } catch (InterruptedException | ExecutionException e) {
490 throw new LensException(e);
491 }
492 }
493
494 /**
495 * Close a Lens server session
496 * @param sessionHandle session handle
497 * @throws LensException
498 */
499 private void closeInternal(LensSessionHandle sessionHandle) throws LensException {
500 super.closeSession(sessionHandle);
501 }
502
503 /**
504 * Close operation created for underlying CLI service
505 * @param op operation handle
506 */
507 private void closeCliServiceOp(OperationHandle op) {
508 if (op != null) {
509 try {
510 getCliService().closeOperation(op);
511 } catch (HiveSQLException e) {
512 log.error("Error closing operation " + op.getHandleIdentifier(), e);
513 }
514 }
515 }
516
517 public Runnable getSessionExpiryRunnable() {
518 return sessionExpiryRunnable;
519 }
520
521 /**
522 * The Class SessionExpiryRunnable.
523 */
524 public class SessionExpiryRunnable implements Runnable {
525
526 /**
527 * Run internal.
528 */
529 public void runInternal() {
530 List<LensSessionHandle> sessionsToRemove = new ArrayList<>(SESSION_MAP.values());
531 Iterator<LensSessionHandle> itr = sessionsToRemove.iterator();
532 while (itr.hasNext()) {
533 LensSessionHandle sessionHandle = itr.next();
534 try {
535 LensSessionImpl session = getSession(sessionHandle);
536 if (session.isActive()) {
537 itr.remove();
538 }
539 } catch (ClientErrorException nfe) {
540 log.error("Error getting session " + sessionHandle.getPublicId(), nfe);
541 itr.remove();
542 }
543 }
544
545 log.info("Sessions to remove : {} out of {} all sessions", sessionsToRemove.size(), SESSION_MAP.size());
546 // Now close all inactive sessions
547 for (LensSessionHandle sessionHandle : sessionsToRemove) {
548 try {
549 long lastAccessTime = getSession(sessionHandle).getLastAccessTime();
550 closeInternal(sessionHandle);
551 log.info("Closed inactive session " + sessionHandle.getPublicId() + " last accessed at "
552 + new Date(lastAccessTime));
553 notifyEvent(new SessionExpired(System.currentTimeMillis(), sessionHandle));
554 } catch (ClientErrorException nfe) {
555 log.error("Error getting session " + sessionHandle.getPublicId(), nfe);
556 // Do nothing
557 } catch (LensException e) {
558 log.error("Error closing session " + sessionHandle.getPublicId() + " reason " + e.getMessage(), e);
559 }
560 }
561 }
562
563 /*
564 * (non-Javadoc)
565 *
566 * @see java.lang.Runnable#run()
567 */
568 @Override
569 public void run() {
570 try {
571 log.info("Running session expiry run");
572 runInternal();
573 } catch (Exception e) {
574 log.warn("Unknown error while checking for inactive sessions - ", e);
575 }
576 }
577 }
578
579 }