IMPALA-4167: Support insert plan hints for CREATE TABLE AS SELECT
fe/src/main/java/org/apache/impala/analysis/CreateTableAsSelectStmt.java
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.impala.analysis;

import java.util.Collections;
import java.util.EnumSet;
import java.util.List;

import org.apache.impala.authorization.Privilege;
import org.apache.impala.catalog.Db;
import org.apache.impala.catalog.HdfsTable;
import org.apache.impala.catalog.KuduTable;
import org.apache.impala.catalog.MetaStoreClientPool.MetaStoreClient;
import org.apache.impala.catalog.Table;
import org.apache.impala.catalog.Type;
import org.apache.impala.common.AnalysisException;
import org.apache.impala.rewrite.ExprRewriter;
import org.apache.impala.service.CatalogOpExecutor;
import org.apache.impala.thrift.THdfsFileFormat;

import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;

/**
 * Represents a CREATE TABLE AS SELECT (CTAS) statement.
 *
 * The statement supports an optional PARTITIONED BY clause. Its syntax and semantics
 * follow the PARTITION feature of INSERT FROM SELECT statements: inside the PARTITIONED
 * BY (...) column list the user must specify the names of the columns to partition by.
 * These column names must appear in the specified order at the end of the select
 * statement. A remapping between columns of the source and destination tables is not
 * possible, because the destination table does not yet exist. Specifying static values
 * for the partition columns is also not possible, as their types need to be deduced
 * from the columns in the select statement.
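 *
 * For example (illustrative only; the table and column names are hypothetical):
 *
 *   CREATE TABLE part_tbl PARTITIONED BY (year) AS
 *   SELECT name, id, year FROM src_tbl;
 *
 * Here 'year' must be the last column of the select list; its type becomes the type
 * of the partition column.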
 */
public class CreateTableAsSelectStmt extends StatementBase {
  // List of partition columns from the PARTITIONED BY (...) clause. Set to null if
  // no PARTITIONED BY clause was given.
  private final List<String> partitionKeys_;

  /////////////////////////////////////////
  // BEGIN: Members that need to be reset()

  private final CreateTableStmt createStmt_;
  private final InsertStmt insertStmt_;

  // END: Members that need to be reset()
  /////////////////////////////////////////

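  // File formats that a CTAS statement is allowed to write to; enforced in analyze().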
  private final static EnumSet<THdfsFileFormat> SUPPORTED_INSERT_FORMATS =
      EnumSet.of(THdfsFileFormat.PARQUET, THdfsFileFormat.TEXT, THdfsFileFormat.KUDU);

  /**
   * Helper class for parsing.
   * Contains every parameter of the constructor with the exception of hints. This is
   * needed to keep the production rules that check for optional hints separate from the
   * rules that check for optional partition info. Merging these independent rules would
   * make it necessary to create rules for every combination of them.
   */
  public static class CtasParams {
    public CreateTableStmt createStmt;
    public QueryStmt queryStmt;
    public List<String> partitionKeys;

    public CtasParams(CreateTableStmt createStmt, QueryStmt queryStmt,
        List<String> partitionKeys) {
      this.createStmt = Preconditions.checkNotNull(createStmt);
      this.queryStmt = Preconditions.checkNotNull(queryStmt);
      this.partitionKeys = partitionKeys;
    }
  }

  /**
   * Builds a CREATE TABLE AS SELECT statement.
   */
  public CreateTableAsSelectStmt(CtasParams params, List<PlanHint> planHints) {
    createStmt_ = params.createStmt;
    partitionKeys_ = params.partitionKeys;
    List<PartitionKeyValue> pkvs = null;
    if (partitionKeys_ != null) {
      pkvs = Lists.newArrayList();
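      // Static partition values are not allowed in CTAS (see class comment), so each
      // partition key is added with a null value, marking it as a dynamic partition
      // key just like INSERT ... PARTITION (col) without a value.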
      for (String key: partitionKeys_) {
        pkvs.add(new PartitionKeyValue(key, null));
      }
    }
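    // The plan hints are forwarded to the INSERT portion of the CTAS (IMPALA-4167),
    // so insert hints such as 'shuffle' or 'clustered' behave as they do in a plain
    // INSERT statement.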
    insertStmt_ = InsertStmt.createInsert(null, createStmt_.getTblName(), false, pkvs,
        planHints, null, params.queryStmt, null);
  }

  public QueryStmt getQueryStmt() { return insertStmt_.getQueryStmt(); }
  public InsertStmt getInsertStmt() { return insertStmt_; }
  public CreateTableStmt getCreateStmt() { return createStmt_; }
  @Override
  public String toSql() { return ToSqlUtils.getCreateTableSql(this); }

  @Override
  public void collectTableRefs(List<TableRef> tblRefs) {
    createStmt_.collectTableRefs(tblRefs);
    insertStmt_.collectTableRefs(tblRefs);
  }

  @Override
  public void analyze(Analyzer analyzer) throws AnalysisException {
    if (isAnalyzed()) return;
    super.analyze(analyzer);

    if (!SUPPORTED_INSERT_FORMATS.contains(createStmt_.getFileFormat())) {
      throw new AnalysisException(String.format("CREATE TABLE AS SELECT " +
          "does not support the (%s) file format. Supported formats are: (%s)",
          createStmt_.getFileFormat().toString().replace("_", ""),
          "PARQUET, TEXTFILE, KUDU"));
    }
    if (createStmt_.getFileFormat() == THdfsFileFormat.KUDU && createStmt_.isExternal()) {
      // TODO: Add support for CTAS on external Kudu tables (see IMPALA-4318)
      throw new AnalysisException("CREATE TABLE AS SELECT is not " +
          "supported for external Kudu tables.");
    }

    // The analysis for CTAS happens in two phases. The first phase runs before the
    // target table exists: it validates the CREATE statement and the query portion of
    // the INSERT statement. If that passes, analysis is run over the full INSERT
    // statement. To avoid duplicate registrations of table/colRefs, create a new root
    // analyzer and clone the query statement for this initial pass.
    Analyzer dummyRootAnalyzer = new Analyzer(analyzer.getStmtTableCache(),
        analyzer.getQueryCtx(), analyzer.getAuthzConfig());
    QueryStmt tmpQueryStmt = insertStmt_.getQueryStmt().clone();
    Analyzer tmpAnalyzer = new Analyzer(dummyRootAnalyzer);
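    // Use Hive-style auto-generated column labels (e.g. '_c0') for unaliased select
    // list expressions, since those labels are valid column names for the new table.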
    tmpAnalyzer.setUseHiveColLabels(true);
    tmpQueryStmt.analyze(tmpAnalyzer);
    // Subqueries need to be rewritten by the StmtRewriter first.
    if (analyzer.containsSubquery()) return;

    // Add the columns from the partition clause to the create statement.
    if (partitionKeys_ != null) {
      int colCnt = tmpQueryStmt.getColLabels().size();
      int partColCnt = partitionKeys_.size();
      if (partColCnt >= colCnt) {
        throw new AnalysisException(String.format("Number of partition columns (%s) " +
            "must be smaller than the number of columns in the select statement (%s).",
            partColCnt, colCnt));
      }
      int firstCol = colCnt - partColCnt;
      for (int i = firstCol, j = 0; i < colCnt; ++i, ++j) {
        String partitionLabel = partitionKeys_.get(j);
        String colLabel = tmpQueryStmt.getColLabels().get(i);

        // Ensure that partition columns are named and positioned at end of
        // input column list.
        if (!partitionLabel.equals(colLabel)) {
          throw new AnalysisException(String.format("Partition column name " +
              "mismatch: %s != %s", partitionLabel, colLabel));
        }

        ColumnDef colDef = new ColumnDef(colLabel, null);
        colDef.setType(tmpQueryStmt.getBaseTblResultExprs().get(i).getType());
        createStmt_.getPartitionColumnDefs().add(colDef);
      }
      // Remove partition columns from table column list.
      tmpQueryStmt.getColLabels().subList(firstCol, colCnt).clear();
    }

    // Add the columns from the select statement to the create statement.
    int colCnt = tmpQueryStmt.getColLabels().size();
    for (int i = 0; i < colCnt; ++i) {
      ColumnDef colDef = new ColumnDef(tmpQueryStmt.getColLabels().get(i), null,
          Collections.<ColumnDef.Option, Object>emptyMap());
      colDef.setType(tmpQueryStmt.getBaseTblResultExprs().get(i).getType());
      createStmt_.getColumnDefs().add(colDef);
    }
    createStmt_.analyze(analyzer);

    // The full privilege check for the database will be done as part of the INSERT
    // analysis.
    Db db = analyzer.getDb(createStmt_.getDb(), Privilege.ANY);
    if (db == null) {
      throw new AnalysisException(
          Analyzer.DB_DOES_NOT_EXIST_ERROR_MSG + createStmt_.getDb());
    }

    // Running analysis on the INSERT portion of the CTAS requires the target INSERT
    // table to "exist". For CTAS the table does not exist yet, so create a "temp"
    // table to run analysis against. The schema of this temp table should exactly
    // match the schema of the table that will be created by running the CREATE
    // statement.
    org.apache.hadoop.hive.metastore.api.Table msTbl =
        CatalogOpExecutor.createMetaStoreTable(createStmt_.toThrift());

    try (MetaStoreClient client = analyzer.getCatalog().getMetaStoreClient()) {
      // Set a valid location of this table using the same rules as the metastore. If
      // the user specified a location for the table this will be a no-op.
      msTbl.getSd().setLocation(analyzer.getCatalog().getTablePath(msTbl).toString());

      Table tmpTable = null;
      if (KuduTable.isKuduTable(msTbl)) {
        tmpTable = KuduTable.createCtasTarget(db, msTbl, createStmt_.getColumnDefs(),
            createStmt_.getPrimaryKeyColumnDefs(),
            createStmt_.getKuduPartitionParams());
      } else {
        // TODO: Creating a tmp table using load() is confusing.
        // Refactor it to use a 'createCtasTarget()' function similar to Kudu table.
        tmpTable = Table.fromMetastoreTable(db, msTbl);
        tmpTable.load(true, client.getHiveClient(), msTbl);
      }
      Preconditions.checkState(tmpTable != null &&
          (tmpTable instanceof HdfsTable || tmpTable instanceof KuduTable));

      insertStmt_.setTargetTable(tmpTable);
    } catch (Exception e) {
      throw new AnalysisException(e.getMessage(), e);
    }

    // Finally, run analysis on the insert statement.
    insertStmt_.analyze(analyzer);
  }

  @Override
  public List<Expr> getResultExprs() { return insertStmt_.getResultExprs(); }

  @Override
  public void castResultExprs(List<Type> types) throws AnalysisException {
    super.castResultExprs(types);
    // Set types of column definitions.
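    // 'types' is ordered like the result exprs: the table's non-partition columns
    // first, followed by the partition columns (cf. the Preconditions check below).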
    List<ColumnDef> colDefs = createStmt_.getColumnDefs();
    List<ColumnDef> partitionColDefs = createStmt_.getPartitionColumnDefs();
    Preconditions.checkState(colDefs.size() + partitionColDefs.size() == types.size());
    for (int i = 0; i < colDefs.size(); ++i) colDefs.get(i).setType(types.get(i));
    for (int i = 0; i < partitionColDefs.size(); ++i) {
      partitionColDefs.get(i).setType(types.get(i + colDefs.size()));
    }
  }

  @Override
  public void rewriteExprs(ExprRewriter rewriter) throws AnalysisException {
    Preconditions.checkState(isAnalyzed());
    insertStmt_.rewriteExprs(rewriter);
  }

  @Override
  public void reset() {
    super.reset();
    createStmt_.reset();
    insertStmt_.reset();
  }
}