Refactor Avro writer to take a CompressionKind enum instead of ad-hoc codec strings
author Owen O'Malley <omalley@apache.org>
Mon, 17 Oct 2016 17:40:36 +0000 (10:40 -0700)
committer Owen O'Malley <omalley@apache.org>
Mon, 17 Oct 2016 17:40:36 +0000 (10:40 -0700)
java/bench/src/java/org/apache/orc/bench/GithubToAvro.java
java/bench/src/java/org/apache/orc/bench/SalesToAvro.java
java/bench/src/java/org/apache/orc/bench/TaxiToAvro.java
java/bench/src/java/org/apache/orc/bench/avro/AvroWriter.java

index eb94ff2..d31f1b6 100644 (file)
@@ -31,7 +31,7 @@ public class GithubToAvro {
     TypeDescription schema = Utilities.loadSchema("github.schema");
     Configuration conf = new Configuration();
     AvroWriter writer = new AvroWriter(new Path(args[0]), schema, conf,
-        TaxiToAvro.getCodec(args[1]));
+        CompressionKind.valueOf(args[1]));
     VectorizedRowBatch batch = schema.createRowBatch();
     for(String inFile: Utilities.sliceArray(args, 2)) {
       JsonReader reader = new JsonReader(new Path(inFile), conf, schema);
index 900be66..fcfe434 100644 (file)
@@ -31,7 +31,7 @@ public class SalesToAvro {
     SalesGenerator sales = new SalesGenerator(Long.parseLong(args[2]));
     TypeDescription schema = sales.getSchema();
     AvroWriter writer = new AvroWriter(new Path(args[0]), schema, conf,
-        TaxiToAvro.getCodec(args[1]));
+        CompressionKind.valueOf(args[1]));
     VectorizedRowBatch batch = schema.createRowBatch();
     while (sales.nextBatch(batch)) {
       writer.writeBatch(batch);
index 9fd2f23..d5eb822 100644 (file)
@@ -27,22 +27,11 @@ import org.apache.orc.bench.csv.CsvReader;
 
 public class TaxiToAvro {
 
-  public static String getCodec(String compression) {
-    if ("none".equals(compression)) {
-      return "null";
-    } else if ("zlib".equals(compression)) {
-      return "deflate";
-    } else if ("snappy".equals(compression)) {
-      return "snappy";
-    }
-    throw new IllegalArgumentException("Unknown compression " + compression);
-  }
-
   public static void main(String[] args) throws Exception {
     TypeDescription schema = Utilities.loadSchema("nyc-taxi.schema");
     Configuration conf = new Configuration();
     AvroWriter writer = new AvroWriter(new Path(args[0]), schema, conf,
-        getCodec(args[1]));
+        CompressionKind.valueOf(args[1]));
     VectorizedRowBatch batch = schema.createRowBatch();
     for(String inFile: Utilities.sliceArray(args, 2)) {
       CsvReader reader = new CsvReader(new Path(inFile), conf, schema);
index 8cc9d06..eeb2fee 100644 (file)
@@ -37,12 +37,12 @@ import org.apache.hadoop.hive.ql.exec.vector.StructColumnVector;
 import org.apache.hadoop.hive.ql.exec.vector.TimestampColumnVector;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
 import org.apache.orc.TypeDescription;
+import org.apache.orc.bench.CompressionKind;
 
 import java.io.IOException;
 import java.nio.Buffer;
 import java.nio.ByteBuffer;
 import java.util.List;
-import java.util.Properties;
 
 public class AvroWriter {
 
@@ -305,7 +305,7 @@ public class AvroWriter {
 
   public AvroWriter(Path path, TypeDescription schema,
                     Configuration conf,
-                    String compression) throws IOException {
+                    CompressionKind compression) throws IOException {
     List<TypeDescription> childTypes = schema.getChildren();
     Schema avroSchema = AvroSchemaUtils.createAvroSchema(schema);
     System.out.println("Hive schema " + schema);
@@ -318,8 +318,17 @@ public class AvroWriter {
     }
     GenericDatumWriter gdw = new GenericDatumWriter(avroSchema);
     writer = new DataFileWriter(gdw);
-    if (compression != null & !"".equals(compression)) {
-      writer.setCodec(CodecFactory.fromString(compression));
+    switch (compression) {
+      case NONE:
+        break;
+      case ZLIB:
+        writer.setCodec(CodecFactory.deflateCodec(-1));
+        break;
+      case SNAPPY:
+        writer.setCodec(CodecFactory.snappyCodec());
+        break;
+      default:
+        throw new IllegalArgumentException("Compression unsupported " + compression);
     }
     writer.create(avroSchema, path.getFileSystem(conf).create(path));
     record = new GenericData.Record(avroSchema);