HIVE-18192: Introduce WriteID per table rather than using global transaction ID ...
[hive.git] / hcatalog / streaming / src / java / org / apache / hive / hcatalog / streaming / DelimitedInputWriter.java
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hive.hcatalog.streaming;


import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.serde2.AbstractSerDe;
import org.apache.hadoop.hive.serde2.SerDeException;
import org.apache.hadoop.hive.serde2.SerDeUtils;
import org.apache.hadoop.hive.serde2.lazy.LazySerDeParameters;
import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe;
import org.apache.hadoop.hive.serde2.lazy.objectinspector.LazySimpleStructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.StructField;
import org.apache.hadoop.io.BytesWritable;

import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

/**
 * Streaming Writer that handles delimited input (e.g. CSV).
 * Delimited input is parsed and reordered to match the column order of the table.
 * Uses LazySimpleSerDe to process the delimited input.
 *
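 * <p>A minimal usage sketch of this writer with the streaming API of this module.
 * The metastore URI, database, table, agent string and field names below are
 * illustrative placeholders, and the exact {@code newConnection} overload may
 * differ across Hive versions:
 * <pre>{@code
 * // values below are illustrative placeholders
 * HiveEndPoint endPt = new HiveEndPoint("thrift://localhost:9083", "mydb", "alerts", null);
 * StreamingConnection conn = endPt.newConnection(true, "example-agent");
 * String[] fieldNames = {"id", "msg"};
 * DelimitedInputWriter writer = new DelimitedInputWriter(fieldNames, ",", endPt, null, conn);
 * TransactionBatch txnBatch = conn.fetchTransactionBatch(10, writer);
 * txnBatch.beginNextTransaction();
 * txnBatch.write("1,hello".getBytes());
 * txnBatch.commit();
 * txnBatch.close();
 * conn.close();
 * }</pre>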
 */
public class DelimitedInputWriter extends AbstractRecordWriter {
  private final boolean reorderingNeeded;
  private String delimiter;
  private char serdeSeparator;
  private int[] fieldToColMapping;
  private final ArrayList<String> tableColumns;
  private LazySimpleSerDe serde = null;

  private final LazySimpleStructObjectInspector recordObjInspector;
  private final ObjectInspector[] bucketObjInspectors;
  private final StructField[] bucketStructFields;

  private static final Logger LOG = LoggerFactory.getLogger(DelimitedInputWriter.class.getName());

  /** Constructor. Uses the default separator of the LazySimpleSerDe.
   * @param colNamesForFields Column name assignment for input fields. Nulls or empty
   *                          strings in the array indicate fields to be skipped
   * @param delimiter input field delimiter
   * @param endPoint Hive endpoint
   * @param conn connection this Writer is to be used with
   * @throws ConnectionError Problem talking to Hive
   * @throws ClassNotFoundException Serde class not found
   * @throws SerializationError Serde initialization/interaction failed
   * @throws StreamingException Problem acquiring file system path for partition
   * @throws InvalidColumn any element in colNamesForFields refers to a non-existing column
   */
  public DelimitedInputWriter(String[] colNamesForFields, String delimiter,
                              HiveEndPoint endPoint, StreamingConnection conn)
      throws ClassNotFoundException, ConnectionError, SerializationError,
             InvalidColumn, StreamingException {
    this(colNamesForFields, delimiter, endPoint, null, conn);
  }

  /** Constructor. Uses the default separator of the LazySimpleSerDe.
   * @param colNamesForFields Column name assignment for input fields. Nulls or empty
   *                          strings in the array indicate fields to be skipped
   * @param delimiter input field delimiter
   * @param endPoint Hive endpoint
   * @param conf a Hive conf object. Can be null if not using advanced Hive settings.
   * @param conn connection this Writer is to be used with
   * @throws ConnectionError Problem talking to Hive
   * @throws ClassNotFoundException Serde class not found
   * @throws SerializationError Serde initialization/interaction failed
   * @throws StreamingException Problem acquiring file system path for partition
   * @throws InvalidColumn any element in colNamesForFields refers to a non-existing column
   */
  public DelimitedInputWriter(String[] colNamesForFields, String delimiter,
                              HiveEndPoint endPoint, HiveConf conf, StreamingConnection conn)
      throws ClassNotFoundException, ConnectionError, SerializationError,
             InvalidColumn, StreamingException {
    this(colNamesForFields, delimiter, endPoint, conf,
        (char) LazySerDeParameters.DefaultSeparators[0], conn);
  }

  /**
   * Constructor. Allows overriding the separator of the LazySimpleSerDe.
   * @param colNamesForFields Column name assignment for input fields
   * @param delimiter input field delimiter
   * @param endPoint Hive endpoint
   * @param conf a Hive conf object. Set to null if not using advanced Hive settings.
   * @param serdeSeparator separator used when encoding data that is fed into the
   *                       LazySimpleSerDe. Ensure this separator does not occur
   *                       in the field data
   * @param conn connection this Writer is to be used with
   * @throws ConnectionError Problem talking to Hive
   * @throws ClassNotFoundException Serde class not found
   * @throws SerializationError Serde initialization/interaction failed
   * @throws StreamingException Problem acquiring file system path for partition
   * @throws InvalidColumn any element in colNamesForFields refers to a non-existing column
   */
  public DelimitedInputWriter(String[] colNamesForFields, String delimiter,
                              HiveEndPoint endPoint, HiveConf conf, char serdeSeparator,
                              StreamingConnection conn)
      throws ClassNotFoundException, ConnectionError, SerializationError,
             InvalidColumn, StreamingException {
    super(endPoint, conf, conn);
    this.tableColumns = getCols(tbl);
    this.serdeSeparator = serdeSeparator;
    this.delimiter = delimiter;
    this.fieldToColMapping = getFieldReordering(colNamesForFields, getTableColumns());
    this.reorderingNeeded = isReorderingNeeded(delimiter, getTableColumns());
    LOG.debug("Field reordering needed = " + this.reorderingNeeded + ", for endpoint " + endPoint);
    this.serde = createSerde(tbl, conf, serdeSeparator);

    // get ObjInspectors for entire record and bucketed cols
    try {
      this.recordObjInspector = (LazySimpleStructObjectInspector) serde.getObjectInspector();
      this.bucketObjInspectors = getObjectInspectorsForBucketedCols(bucketIds, recordObjInspector);
    } catch (SerDeException e) {
      throw new SerializationError("Unable to get ObjectInspector for bucket columns", e);
    }

    // get StructFields for bucketed cols
    bucketStructFields = new StructField[bucketIds.size()];
    List<? extends StructField> allFields = recordObjInspector.getAllStructFieldRefs();
    for (int i = 0; i < bucketIds.size(); i++) {
      bucketStructFields[i] = allFields.get(bucketIds.get(i));
    }
  }

  /**
   * @deprecated As of release 1.3/2.1. Replaced by {@link #DelimitedInputWriter(String[], String, HiveEndPoint, StreamingConnection)}
   */
  public DelimitedInputWriter(String[] colNamesForFields, String delimiter,
                              HiveEndPoint endPoint)
      throws ClassNotFoundException, ConnectionError, SerializationError,
             InvalidColumn, StreamingException {
    this(colNamesForFields, delimiter, endPoint, null, null);
  }

  /**
   * @deprecated As of release 1.3/2.1. Replaced by {@link #DelimitedInputWriter(String[], String, HiveEndPoint, HiveConf, StreamingConnection)}
   */
  public DelimitedInputWriter(String[] colNamesForFields, String delimiter,
                              HiveEndPoint endPoint, HiveConf conf)
      throws ClassNotFoundException, ConnectionError, SerializationError,
             InvalidColumn, StreamingException {
    this(colNamesForFields, delimiter, endPoint, conf,
        (char) LazySerDeParameters.DefaultSeparators[0], null);
  }

  /**
   * @deprecated As of release 1.3/2.1. Replaced by {@link #DelimitedInputWriter(String[], String, HiveEndPoint, HiveConf, char, StreamingConnection)}
   */
  public DelimitedInputWriter(String[] colNamesForFields, String delimiter,
                              HiveEndPoint endPoint, HiveConf conf, char serdeSeparator)
      throws ClassNotFoundException, StreamingException {
    this(colNamesForFields, delimiter, endPoint, conf, serdeSeparator, null);
  }

  private boolean isReorderingNeeded(String delimiter, ArrayList<String> tableColumns) {
    return !(delimiter.equals(String.valueOf(getSerdeSeparator()))
        && areFieldsInColOrder(fieldToColMapping)
        && tableColumns.size() >= fieldToColMapping.length);
  }

  private static boolean areFieldsInColOrder(int[] fieldToColMapping) {
    for (int i = 0; i < fieldToColMapping.length; ++i) {
      if (fieldToColMapping[i] != i) {
        return false;
      }
    }
    return true;
  }

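  /**
   * Computes, for each input field, the index of the table column it maps to,
   * or -1 if the field is to be skipped. For example, colNamesForFields = {"b", null, "a"}
   * against table columns [a, b] yields {1, -1, 0}.
   */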
  @VisibleForTesting
  static int[] getFieldReordering(String[] colNamesForFields, List<String> tableColNames)
      throws InvalidColumn {
    int[] result = new int[colNamesForFields.length];
    for (int i = 0; i < colNamesForFields.length; ++i) {
      result[i] = -1;
    }
    int i = -1, fieldLabelCount = 0;
    for (String col : colNamesForFields) {
      ++i;
      if (col == null) {
        continue;
      }
      if (col.trim().isEmpty()) {
        continue;
      }
      ++fieldLabelCount;
      int loc = tableColNames.indexOf(col);
      if (loc == -1) {
        throw new InvalidColumn("Column '" + col + "' not found in table for input field " + (i + 1));
      }
      result[i] = loc;
    }
    if (fieldLabelCount > tableColNames.size()) {
      throw new InvalidColumn("Number of field names exceeds the number of columns in table");
    }
    return result;
  }

  // Reorder fields in record based on the order of columns in the table.
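  // For example, with fieldToColMapping = {1, -1, 0} and delimiter ",", the record "x,y,z"
  // becomes "z<serdeSeparator>x": field 0 moves to column 1, field 1 is dropped, and
  // field 2 moves to column 0.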
  protected byte[] reorderFields(byte[] record) throws UnsupportedEncodingException {
    if (!reorderingNeeded) {
      return record;
    }
    String[] reorderedFields = new String[getTableColumns().size()];
    String decoded = new String(record);
    String[] fields = decoded.split(delimiter, -1);
    for (int i = 0; i < fieldToColMapping.length; ++i) {
      int newIndex = fieldToColMapping[i];
      if (newIndex != -1) {
        reorderedFields[newIndex] = fields[i];
      }
    }
    return join(reorderedFields, getSerdeSeparator());
  }

  // handles nulls in items[]
  // TODO: perhaps can be made more efficient by creating a byte[] directly
  private static byte[] join(String[] items, char separator) {
    StringBuilder buff = new StringBuilder(100);
    if (items.length == 0) {
      return "".getBytes();
    }
    int i = 0;
    for (; i < items.length - 1; ++i) {
      if (items[i] != null) {
        buff.append(items[i]);
      }
      buff.append(separator);
    }
    if (items[i] != null) {
      buff.append(items[i]);
    }
    return buff.toString().getBytes();
  }

  protected ArrayList<String> getTableColumns() {
    return tableColumns;
  }

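  /**
   * Writes a single delimited record: reorders its fields into table column order,
   * deserializes them through the LazySimpleSerDe, and hands the resulting row to the
   * RecordUpdater of the record's bucket under the given write id.
   */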
  @Override
  public void write(long writeId, byte[] record)
      throws SerializationError, StreamingIOFailure {
    try {
      byte[] orderedFields = reorderFields(record);
      Object encodedRow = encode(orderedFields);
      int bucket = getBucket(encodedRow);
      getRecordUpdater(bucket).insert(writeId, encodedRow);
    } catch (IOException e) {
      throw new StreamingIOFailure("Error writing record in transaction write id ("
          + writeId + ")", e);
    }
  }

  @Override
  public AbstractSerDe getSerde() {
    return serde;
  }

  protected LazySimpleStructObjectInspector getRecordObjectInspector() {
    return recordObjInspector;
  }

  @Override
  protected StructField[] getBucketStructFields() {
    return bucketStructFields;
  }

  protected ObjectInspector[] getBucketObjectInspectors() {
    return bucketObjInspectors;
  }

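  /**
   * Deserializes a delimited record (already separated by the serde separator and in
   * table column order) into a row Object via the LazySimpleSerDe.
   */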
  @Override
  public Object encode(byte[] record) throws SerializationError {
    try {
      BytesWritable blob = new BytesWritable();
      blob.set(record, 0, record.length);
      return serde.deserialize(blob);
    } catch (SerDeException e) {
      throw new SerializationError("Unable to convert byte[] record into Object", e);
    }
  }

  /**
   * Creates the LazySimpleSerDe used to deserialize incoming records.
   * @param tbl the table this writer streams into
   * @param conf Hive configuration, may be null
   * @param serdeSeparator field separator the serde should use
   * @return the initialized LazySimpleSerDe
   * @throws SerializationError if the serde could not be initialized
   */
  protected static LazySimpleSerDe createSerde(Table tbl, HiveConf conf, char serdeSeparator)
      throws SerializationError {
    try {
      Properties tableProps = MetaStoreUtils.getTableMetadata(tbl);
      tableProps.setProperty("field.delim", String.valueOf(serdeSeparator));
      LazySimpleSerDe serde = new LazySimpleSerDe();
      SerDeUtils.initializeSerDe(serde, conf, tableProps, null);
      return serde;
    } catch (SerDeException e) {
      throw new SerializationError("Error initializing serde", e);
    }
  }

  private ArrayList<String> getCols(Table table) {
    List<FieldSchema> cols = table.getSd().getCols();
    ArrayList<String> colNames = new ArrayList<String>(cols.size());
    for (FieldSchema col : cols) {
      colNames.add(col.getName().toLowerCase());
    }
    return colNames;
  }

  public char getSerdeSeparator() {
    return serdeSeparator;
  }
}