HIVE-18192: Introduce WriteID per table rather than using global transaction ID ...
[hive.git] hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/StrictJsonWriter.java
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hive.hcatalog.streaming;

import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
import org.apache.hadoop.hive.serde2.AbstractSerDe;
import org.apache.hadoop.hive.serde2.SerDeException;
import org.apache.hadoop.hive.serde2.SerDeUtils;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.StructField;
import org.apache.hadoop.io.Text;
import org.apache.hive.hcatalog.data.HCatRecordObjectInspector;
import org.apache.hive.hcatalog.data.JsonSerDe;

import java.io.IOException;
import java.util.List;
import java.util.Properties;

/**
 * Streaming writer that handles UTF-8 encoded JSON (strict syntax).
 * Uses {@link org.apache.hive.hcatalog.data.JsonSerDe} to process JSON input.
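 *
 * <p>A minimal usage sketch (illustrative only: the metastore URI, database,
 * table, and JSON payload below are hypothetical, and checked exceptions and
 * error handling are elided):
 * <pre>{@code
 * HiveEndPoint endPoint = new HiveEndPoint("thrift://localhost:9083",
 *     "mydb", "mytable", null);
 * StreamingConnection connection = endPoint.newConnection(true);
 * StrictJsonWriter writer = new StrictJsonWriter(endPoint, connection);
 * TransactionBatch txnBatch = connection.fetchTransactionBatch(10, writer);
 * txnBatch.beginNextTransaction();
 * txnBatch.write("{\"id\": 1, \"msg\": \"hello\"}".getBytes(StandardCharsets.UTF_8));
 * txnBatch.commit();
 * txnBatch.close();
 * connection.close();
 * }</pre>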
 */
public class StrictJsonWriter extends AbstractRecordWriter {
  private JsonSerDe serde;

  private final HCatRecordObjectInspector recordObjInspector;
  private final ObjectInspector[] bucketObjInspectors;
  private final StructField[] bucketStructFields;

  /**
   * @deprecated As of release 1.3/2.1. Replaced by {@link #StrictJsonWriter(HiveEndPoint, HiveConf, StreamingConnection)}
   */
  @Deprecated
  public StrictJsonWriter(HiveEndPoint endPoint)
      throws ConnectionError, SerializationError, StreamingException {
    this(endPoint, null, null);
  }

  /**
   * @deprecated As of release 1.3/2.1. Replaced by {@link #StrictJsonWriter(HiveEndPoint, HiveConf, StreamingConnection)}
   */
  @Deprecated
  public StrictJsonWriter(HiveEndPoint endPoint, HiveConf conf) throws StreamingException {
    this(endPoint, conf, null);
  }

  /**
   * @param endPoint the end point to write to
   * @param conn connection this Writer is to be used with
   * @throws ConnectionError
   * @throws SerializationError
   * @throws StreamingException
   */
  public StrictJsonWriter(HiveEndPoint endPoint, StreamingConnection conn)
      throws ConnectionError, SerializationError, StreamingException {
    this(endPoint, null, conn);
  }

  /**
   * @param endPoint the end point to write to
   * @param conf a Hive conf object. Should be null if not using advanced Hive settings.
   * @param conn connection this Writer is to be used with
   * @throws ConnectionError
   * @throws SerializationError
   * @throws StreamingException
   */
  public StrictJsonWriter(HiveEndPoint endPoint, HiveConf conf, StreamingConnection conn)
      throws ConnectionError, SerializationError, StreamingException {
    super(endPoint, conf, conn);
    this.serde = createSerde(tbl, conf);
    // get ObjInspectors for entire record and bucketed cols
    try {
      recordObjInspector = (HCatRecordObjectInspector) serde.getObjectInspector();
      this.bucketObjInspectors = getObjectInspectorsForBucketedCols(bucketIds, recordObjInspector);
    } catch (SerDeException e) {
      throw new SerializationError("Unable to get ObjectInspector for bucket columns", e);
    }

    // get StructFields for bucketed cols
    bucketStructFields = new StructField[bucketIds.size()];
    List<? extends StructField> allFields = recordObjInspector.getAllStructFieldRefs();
    for (int i = 0; i < bucketIds.size(); i++) {
      bucketStructFields[i] = allFields.get(bucketIds.get(i));
    }
  }

  @Override
  public AbstractSerDe getSerde() {
    return serde;
  }

  @Override
  protected HCatRecordObjectInspector getRecordObjectInspector() {
    return recordObjInspector;
  }

  @Override
  protected StructField[] getBucketStructFields() {
    return bucketStructFields;
  }

  @Override
  protected ObjectInspector[] getBucketObjectInspectors() {
    return bucketObjInspectors;
  }

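  /**
   * Writes one UTF-8 encoded JSON record under the given write id: the record
   * is deserialized via {@link JsonSerDe}, the target bucket is derived from
   * the record's bucketed-column values, and the row is handed to that
   * bucket's RecordUpdater.
   */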
  @Override
  public void write(long writeId, byte[] record)
      throws StreamingIOFailure, SerializationError {
    try {
      Object encodedRow = encode(record);
      int bucket = getBucket(encodedRow);
      getRecordUpdater(bucket).insert(writeId, encodedRow);
    } catch (IOException e) {
      throw new StreamingIOFailure("Error writing record for write id ("
          + writeId + ")", e);
    }
  }

  /**
   * Creates the JsonSerDe used to deserialize incoming JSON records.
   * @param tbl used to create serde
   * @param conf used to create serde
   * @return the initialized JsonSerDe
   * @throws SerializationError if serde could not be initialized
   */
  private static JsonSerDe createSerde(Table tbl, HiveConf conf)
      throws SerializationError {
    try {
      Properties tableProps = MetaStoreUtils.getTableMetadata(tbl);
      JsonSerDe serde = new JsonSerDe();
      SerDeUtils.initializeSerDe(serde, conf, tableProps, null);
      return serde;
    } catch (SerDeException e) {
      throw new SerializationError("Error initializing serde " + JsonSerDe.class.getName(), e);
    }
  }

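  /**
   * Deserializes a UTF-8 encoded JSON record into an HCatRecord using the
   * table's {@link JsonSerDe}, mapping JSON field names to table columns.
   */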
  @Override
  public Object encode(byte[] utf8StrRecord) throws SerializationError {
    try {
      Text blob = new Text(utf8StrRecord);
      return serde.deserialize(blob);
    } catch (SerDeException e) {
      throw new SerializationError("Unable to convert byte[] record into Object", e);
    }
  }

}