HIVE-19154: Poll notification events to invalidate the results cache (Jason Dere...
[hive.git] / ql / src / java / org / apache / hadoop / hive / ql / metadata / events / NotificationEventPoll.java
1 /*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 */
18
19 package org.apache.hadoop.hive.ql.metadata.events;
20
21 import java.util.ArrayList;
22 import java.util.List;
23 import java.util.concurrent.Executors;
24 import java.util.concurrent.ScheduledExecutorService;
25 import java.util.concurrent.ScheduledFuture;
26 import java.util.concurrent.ThreadFactory;
27 import java.util.concurrent.TimeUnit;
28 import java.util.concurrent.atomic.AtomicBoolean;
29 import java.util.function.Consumer;
30
31 import org.apache.hadoop.conf.Configuration;
32 import org.apache.hadoop.hive.common.JavaUtils;
33 import org.apache.hadoop.hive.conf.HiveConf;
34 import org.apache.hadoop.hive.metastore.api.NotificationEvent;
35 import org.apache.hadoop.hive.metastore.messaging.EventUtils;
36 import org.apache.hadoop.hive.ql.metadata.Hive;
37 import org.apache.hadoop.hive.ql.parse.repl.dump.events.EventHandler;
38 import org.apache.hadoop.util.ReflectionUtils;
39
40 import com.google.common.util.concurrent.ThreadFactoryBuilder;
41
42 import org.slf4j.Logger;
43 import org.slf4j.LoggerFactory;
44
45 public class NotificationEventPoll {
46 private static final Logger LOG = LoggerFactory.getLogger(NotificationEventPoll.class);
47 private static final AtomicBoolean inited = new AtomicBoolean(false);
48 private static NotificationEventPoll instance;
49
50 Configuration conf;
51 ScheduledExecutorService executorService;
52 List<EventConsumer> eventConsumers = new ArrayList<>();
53 ScheduledFuture<?> pollFuture;
54 long lastCheckedEventId;
55
56 public static void initialize(Configuration conf) throws Exception {
57 if (!inited.getAndSet(true)) {
58 try {
59 instance = new NotificationEventPoll(conf);
60 } catch (Exception err) {
61 inited.set(false);
62 throw err;
63 }
64 }
65 }
66
67 public static void shutdown() {
68 // Should only be called for testing.
69 if (inited.get()) {
70 instance.stop();
71 instance = null;
72 inited.set(false);
73 }
74 }
75
76 private NotificationEventPoll(Configuration conf) throws Exception {
77 this.conf = conf;
78
79 long pollInterval = HiveConf.getTimeVar(conf,
80 HiveConf.ConfVars.HIVE_NOTFICATION_EVENT_POLL_INTERVAL, TimeUnit.MILLISECONDS);
81 if (pollInterval <= 0) {
82 LOG.debug("Non-positive poll interval configured, notification event polling disabled");
83 return;
84 }
85
86 // Initialize the list of event handlers
87 String[] consumerClassNames =
88 conf.getStrings(HiveConf.ConfVars.HIVE_NOTFICATION_EVENT_CONSUMERS.varname);
89 if (consumerClassNames != null && consumerClassNames.length > 0) {
90 for (String consumerClassName : consumerClassNames) {
91 Class<?> consumerClass = JavaUtils.loadClass(consumerClassName);
92 EventConsumer consumer =
93 (EventConsumer) ReflectionUtils.newInstance(consumerClass, conf);
94 eventConsumers.add(consumer);
95 }
96 } else {
97 LOG.debug("No event consumers configured, notification event polling disabled");
98 return;
99 }
100
101 EventUtils.MSClientNotificationFetcher evFetcher
102 = new EventUtils.MSClientNotificationFetcher(Hive.get().getMSC());
103 lastCheckedEventId = evFetcher.getCurrentNotificationEventId();
104 LOG.info("Initializing lastCheckedEventId to {}", lastCheckedEventId);
105
106 // Start the scheduled poll task
107 ThreadFactory threadFactory =
108 new ThreadFactoryBuilder()
109 .setDaemon(true)
110 .setNameFormat("NotificationEventPoll %d")
111 .build();
112 executorService = Executors.newSingleThreadScheduledExecutor(threadFactory);
113 pollFuture = executorService.scheduleAtFixedRate(new Poller(),
114 pollInterval, pollInterval, TimeUnit.MILLISECONDS);
115 }
116
117 private void stop() {
118 if (pollFuture != null) {
119 pollFuture.cancel(true);
120 pollFuture = null;
121 }
122 if (executorService != null) {
123 executorService.shutdown();
124 executorService = null;
125 }
126 }
127
128 class Poller implements Runnable {
129 @Override
130 public void run() {
131 LOG.debug("Polling for notification events");
132
133 int eventsProcessed = 0;
134 try {
135 // Get any new notification events that have been since the last time we checked,
136 // And pass them on to the event handlers.
137 EventUtils.MSClientNotificationFetcher evFetcher
138 = new EventUtils.MSClientNotificationFetcher(Hive.get().getMSC());
139 EventUtils.NotificationEventIterator evIter =
140 new EventUtils.NotificationEventIterator(evFetcher, lastCheckedEventId, 0, "*", null);
141
142 while (evIter.hasNext()) {
143 NotificationEvent event = evIter.next();
144 LOG.debug("Event: " + event);
145 for (EventConsumer eventConsumer : eventConsumers) {
146 try {
147 eventConsumer.accept(event);
148 } catch (Exception err) {
149 LOG.error("Error processing notification event " + event, err);
150 }
151 }
152 eventsProcessed++;
153 lastCheckedEventId = event.getEventId();
154 }
155 } catch (Exception err) {
156 LOG.error("Error polling for notification events", err);
157 }
158
159 LOG.debug("Processed {} notification events", eventsProcessed);
160 }
161 }
162 }