HIVE-18192: Introduce WriteID per table rather than using global transaction ID ...
[hive.git] / hcatalog / streaming / src / java / org / apache / hive / hcatalog / streaming / mutate / client / MutatorClient.java
1 /*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 */
18 package org.apache.hive.hcatalog.streaming.mutate.client;
19
20 import java.io.Closeable;
21 import java.io.IOException;
22 import java.util.ArrayList;
23 import java.util.Collection;
24 import java.util.Collections;
25 import java.util.List;
26 import java.util.Map;
27
28 import org.apache.hadoop.hive.conf.HiveConf;
29 import org.apache.hadoop.hive.metastore.IMetaStoreClient;
30 import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
31 import org.apache.hadoop.hive.metastore.api.Table;
32 import org.apache.hadoop.hive.ql.io.AcidUtils;
33 import org.apache.hive.hcatalog.streaming.mutate.client.lock.Lock;
34 import org.apache.hive.hcatalog.streaming.mutate.client.lock.LockFailureListener;
35 import org.apache.thrift.TException;
36 import org.slf4j.Logger;
37 import org.slf4j.LoggerFactory;
38
39 /**
40 * Responsible for orchestrating {@link Transaction Transactions} within which ACID table mutation events can occur.
41 * Typically this will be a large batch of delta operations.
42 */
43 public class MutatorClient implements Closeable {
44
45 private static final Logger LOG = LoggerFactory.getLogger(MutatorClient.class);
46 private static final String TRANSACTIONAL_PARAM_KEY = "transactional";
47
48 private final IMetaStoreClient metaStoreClient;
49 private final Lock.Options lockOptions;
50 private final List<AcidTable> tables;
51 private boolean connected;
52
53 MutatorClient(IMetaStoreClient metaStoreClient, HiveConf configuration, LockFailureListener lockFailureListener,
54 String user, Collection<AcidTable> tables) {
55 this.metaStoreClient = metaStoreClient;
56 this.tables = Collections.unmodifiableList(new ArrayList<>(tables));
57
58 lockOptions = new Lock.Options()
59 .configuration(configuration)
60 .lockFailureListener(lockFailureListener == null ? LockFailureListener.NULL_LISTENER : lockFailureListener)
61 .user(user);
62 for (AcidTable table : tables) {
63 switch (table.getTableType()) {
64 case SOURCE:
65 lockOptions.addSourceTable(table.getDatabaseName(), table.getTableName());
66 break;
67 case SINK:
68 lockOptions.addSinkTable(table.getDatabaseName(), table.getTableName());
69 break;
70 default:
71 throw new IllegalArgumentException("Unknown TableType: " + table.getTableType());
72 }
73 }
74 }
75
76 /**
77 * Connects to the {@link IMetaStoreClient meta store} that will be used to manage {@link Transaction} life-cycles.
78 * Also checks that the tables destined to receive mutation events are able to do so. The client should only hold one
79 * open transaction at any given time (TODO: enforce this).
80 */
81 public void connect() throws ConnectionException {
82 if (connected) {
83 throw new ConnectionException("Already connected.");
84 }
85 for (AcidTable table : tables) {
86 checkTable(metaStoreClient, table);
87 }
88 LOG.debug("Connected to end point {}", metaStoreClient);
89 connected = true;
90 }
91
92 /** Creates a new {@link Transaction} by opening a transaction with the {@link IMetaStoreClient meta store}. */
93 public Transaction newTransaction() throws TransactionException {
94 if (!connected) {
95 throw new TransactionException("Not connected - cannot create transaction.");
96 }
97 Transaction transaction = new Transaction(metaStoreClient, lockOptions);
98 long txnId = transaction.getTransactionId();
99 for (AcidTable table : tables) {
100 try {
101 table.setWriteId(metaStoreClient.allocateTableWriteId(txnId,
102 table.getDatabaseName(), table.getTableName()));
103 } catch (TException ex) {
104 try {
105 metaStoreClient.rollbackTxn(txnId);
106 } catch (TException e) {
107 LOG.warn("Allocation of write id failed for table {} and rollback transaction {} failed due to {}",
108 AcidUtils.getFullTableName(table.getDatabaseName(), table.getTableName()), txnId, e.getMessage());
109 }
110 throw new TransactionException("Unable to allocate table write ID for table "
111 + AcidUtils.getFullTableName(table.getDatabaseName(), table.getTableName())
112 + " under txn " + txnId, ex);
113 }
114 }
115 LOG.debug("Created transaction {}", transaction);
116 return transaction;
117 }
118
119 /** Did the client connect successfully. Note the the client may have since become disconnected. */
120 public boolean isConnected() {
121 return connected;
122 }
123
124 /**
125 * Closes the client releasing any {@link IMetaStoreClient meta store} connections held. Does not notify any open
126 * transactions (TODO: perhaps it should?)
127 */
128 @Override
129 public void close() throws IOException {
130 metaStoreClient.close();
131 LOG.debug("Closed client.");
132 connected = false;
133 }
134
135 /**
136 * Returns the list of managed {@link AcidTable AcidTables} that can receive mutation events under the control of this
137 * client.
138 */
139 public List<AcidTable> getTables() throws ConnectionException {
140 if (!connected) {
141 throw new ConnectionException("Not connected - cannot interrogate tables.");
142 }
143 return Collections.<AcidTable> unmodifiableList(tables);
144 }
145
146 @Override
147 public String toString() {
148 return "MutatorClient [metaStoreClient=" + metaStoreClient + ", connected=" + connected + "]";
149 }
150
151 private void checkTable(IMetaStoreClient metaStoreClient, AcidTable acidTable) throws ConnectionException {
152 try {
153 LOG.debug("Checking table {}.", acidTable.getQualifiedName());
154 Table metaStoreTable = metaStoreClient.getTable(acidTable.getDatabaseName(), acidTable.getTableName());
155
156 if (acidTable.getTableType() == TableType.SINK) {
157 Map<String, String> parameters = metaStoreTable.getParameters();
158 if (!Boolean.parseBoolean(parameters.get(TRANSACTIONAL_PARAM_KEY))) {
159 throw new ConnectionException("Cannot stream to table that is not transactional: '"
160 + acidTable.getQualifiedName() + "'.");
161 }
162 int totalBuckets = metaStoreTable.getSd().getNumBuckets();
163 LOG.debug("Table {} has {} buckets.", acidTable.getQualifiedName(), totalBuckets);
164 if (totalBuckets <= 0) {
165 throw new ConnectionException("Cannot stream to table that has not been bucketed: '"
166 + acidTable.getQualifiedName() + "'.");
167 }
168
169 String outputFormat = metaStoreTable.getSd().getOutputFormat();
170 LOG.debug("Table {} has {} OutputFormat.", acidTable.getQualifiedName(), outputFormat);
171 acidTable.setTable(metaStoreTable);
172 }
173 } catch (NoSuchObjectException e) {
174 throw new ConnectionException("Invalid table '" + acidTable.getQualifiedName() + "'", e);
175 } catch (TException e) {
176 throw new ConnectionException("Error communicating with the meta store", e);
177 }
178 LOG.debug("Table {} OK.", acidTable.getQualifiedName());
179 }
180
181 }