HIVE-18192: Introduce WriteID per table rather than using global transaction ID ...
hive.git: hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/StrictRegexWriter.java
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hive.hcatalog.streaming;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
import org.apache.hadoop.hive.serde.serdeConstants;
import org.apache.hadoop.hive.serde2.AbstractSerDe;
import org.apache.hadoop.hive.serde2.RegexSerDe;
import org.apache.hadoop.hive.serde2.SerDeException;
import org.apache.hadoop.hive.serde2.SerDeUtils;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.StructField;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.io.Text;

/**
 * Streaming writer that handles text input data parsed with a regular expression,
 * using org.apache.hadoop.hive.serde2.RegexSerDe.
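 * <p>
 * A minimal usage sketch; exception handling is omitted, and the metastore URI,
 * database/table names, agent string, and regex below are illustrative
 * placeholders, not values required by this class:
 * </p>
 * <pre>{@code
 * HiveEndPoint endPt =
 *     new HiveEndPoint("thrift://localhost:9083", "default", "alerts", null);
 * StreamingConnection conn = endPt.newConnection(false, "example-agent");
 * // one capturing group per table column, in column order
 * StrictRegexWriter writer = new StrictRegexWriter("([^,]*),([^,]*)", endPt, null, conn);
 * TransactionBatch txnBatch = conn.fetchTransactionBatch(10, writer);
 * txnBatch.beginNextTransaction();
 * txnBatch.write("foo,bar".getBytes(java.nio.charset.StandardCharsets.UTF_8));
 * txnBatch.commit();
 * txnBatch.close();
 * conn.close();
 * }</pre>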
 */
public class StrictRegexWriter extends AbstractRecordWriter {
  private RegexSerDe serde;
  private final StructObjectInspector recordObjInspector;
  private final ObjectInspector[] bucketObjInspectors;
  private final StructField[] bucketStructFields;

  /**
   * @param endPoint the end point to write to
   * @param conn connection this Writer is to be used with
   * @throws ConnectionError
   * @throws SerializationError
   * @throws StreamingException
   */
  public StrictRegexWriter(HiveEndPoint endPoint, StreamingConnection conn)
      throws ConnectionError, SerializationError, StreamingException {
    this(null, endPoint, null, conn);
  }

  /**
   * @param endPoint the end point to write to
   * @param conf a Hive conf object; may be null if no advanced Hive settings are needed
   * @param conn connection this Writer is to be used with
   * @throws ConnectionError
   * @throws SerializationError
   * @throws StreamingException
   */
  public StrictRegexWriter(HiveEndPoint endPoint, HiveConf conf, StreamingConnection conn)
      throws ConnectionError, SerializationError, StreamingException {
    this(null, endPoint, conf, conn);
  }

  /**
   * @param regex regular expression used to parse each input record
   * @param endPoint the end point to write to
   * @param conf a Hive conf object; may be null if no advanced Hive settings are needed
   * @param conn connection this Writer is to be used with
   * @throws ConnectionError
   * @throws SerializationError
   * @throws StreamingException
   */
  public StrictRegexWriter(String regex, HiveEndPoint endPoint, HiveConf conf, StreamingConnection conn)
      throws ConnectionError, SerializationError, StreamingException {
    super(endPoint, conf, conn);
    this.serde = createSerde(tbl, conf, regex);
    // get ObjectInspectors for the entire record and the bucketed columns
    try {
      recordObjInspector = (StructObjectInspector) serde.getObjectInspector();
      this.bucketObjInspectors = getObjectInspectorsForBucketedCols(bucketIds, recordObjInspector);
    } catch (SerDeException e) {
      throw new SerializationError("Unable to get ObjectInspector for bucket columns", e);
    }

    // get StructFields for the bucketed columns
    bucketStructFields = new StructField[bucketIds.size()];
    List<? extends StructField> allFields = recordObjInspector.getAllStructFieldRefs();
    for (int i = 0; i < bucketIds.size(); i++) {
      bucketStructFields[i] = allFields.get(bucketIds.get(i));
    }
  }

  @Override
  public AbstractSerDe getSerde() {
    return serde;
  }

  @Override
  protected StructObjectInspector getRecordObjectInspector() {
    return recordObjInspector;
  }

  @Override
  protected StructField[] getBucketStructFields() {
    return bucketStructFields;
  }

  @Override
  protected ObjectInspector[] getBucketObjectInspectors() {
    return bucketObjInspectors;
  }

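  /**
   * Deserializes the record with the configured regex and writes it, as an insert
   * under the given write id, via the record updater for the record's bucket.
   */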
  @Override
  public void write(long writeId, byte[] record)
      throws StreamingIOFailure, SerializationError {
    try {
      Object encodedRow = encode(record);
      int bucket = getBucket(encodedRow);
      getRecordUpdater(bucket).insert(writeId, encodedRow);
    } catch (IOException e) {
      throw new StreamingIOFailure("Error writing record in transaction write id("
          + writeId + ")", e);
    }
  }

  /**
   * Creates and initializes a RegexSerDe from the table's metadata and the given regex.
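   * <p>
   * For illustration (hypothetical column names, not required by this class): for a
   * table declared as {@code (msg string, sev string)}, a regex such as
   * {@code "([^,]*),([^,]*)"} maps capturing group 1 to {@code msg} and group 2 to
   * {@code sev}; RegexSerDe pairs capturing groups with columns in declaration order.
   * </p>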
   * @param tbl table whose metadata configures the serde
   * @param conf Hive configuration used to initialize the serde
   * @param regex regular expression used to parse each input record
   * @return an initialized RegexSerDe
   * @throws SerializationError if the serde could not be initialized
   */
  private static RegexSerDe createSerde(Table tbl, HiveConf conf, String regex)
      throws SerializationError {
    try {
      Properties tableProps = MetaStoreUtils.getTableMetadata(tbl);
      tableProps.setProperty(RegexSerDe.INPUT_REGEX, regex);
      ArrayList<String> tableColumns = getCols(tbl);
      tableProps.setProperty(serdeConstants.LIST_COLUMNS, StringUtils.join(tableColumns, ","));
      RegexSerDe serde = new RegexSerDe();
      SerDeUtils.initializeSerDe(serde, conf, tableProps, null);
      return serde;
    } catch (SerDeException e) {
      throw new SerializationError("Error initializing serde " + RegexSerDe.class.getName(), e);
    }
  }

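  /**
   * Returns the table's column names, lower-cased, in declaration order.
   */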
  private static ArrayList<String> getCols(Table table) {
    List<FieldSchema> cols = table.getSd().getCols();
    ArrayList<String> colNames = new ArrayList<String>(cols.size());
    for (FieldSchema col : cols) {
      colNames.add(col.getName().toLowerCase());
    }
    return colNames;
  }

  /**
   * Encodes a UTF-8 encoded string record into a Hive row object using RegexSerDe.
   *
   * @param utf8StrRecord UTF-8 encoded bytes of a single text record
   * @return the encoded row object
   * @throws SerializationError if the record could not be deserialized
   */
  @Override
  public Object encode(byte[] utf8StrRecord) throws SerializationError {
    try {
      Text blob = new Text(utf8StrRecord);
      return serde.deserialize(blob);
    } catch (SerDeException e) {
      throw new SerializationError("Unable to convert byte[] record into Object", e);
    }
  }

}