TINKERPOP-1862 Fix Messenger implementations for Spark/Giraph handling BOTH
[tinkerpop.git] / spark-gremlin / src / test / java / org / apache / tinkerpop / gremlin / spark / structure / io / ToyGraphInputRDD.java
1 /*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
17 * under the License.
18 */
19
20 package org.apache.tinkerpop.gremlin.spark.structure.io;
21
22 import org.apache.commons.configuration.Configuration;
23 import org.apache.spark.api.java.JavaPairRDD;
24 import org.apache.spark.api.java.JavaSparkContext;
25 import org.apache.tinkerpop.gremlin.hadoop.Constants;
26 import org.apache.tinkerpop.gremlin.hadoop.structure.io.HadoopPools;
27 import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
28 import org.apache.tinkerpop.gremlin.structure.Graph;
29 import org.apache.tinkerpop.gremlin.structure.io.GraphReader;
30 import org.apache.tinkerpop.gremlin.structure.io.gryo.GryoIo;
31 import org.apache.tinkerpop.gremlin.structure.io.gryo.GryoReader;
32 import org.apache.tinkerpop.gremlin.structure.io.gryo.GryoResourceAccess;
33 import org.apache.tinkerpop.gremlin.structure.io.gryo.kryoshim.KryoShimServiceLoader;
34 import org.apache.tinkerpop.gremlin.tinkergraph.structure.TinkerFactory;
35 import org.apache.tinkerpop.gremlin.tinkergraph.structure.TinkerGraph;
36 import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils;
37 import scala.Tuple2;
38
39 import java.io.IOException;
40 import java.io.InputStream;
41 import java.util.List;
42
43 /**
44 * @author Marko A. Rodriguez (http://markorodriguez.com)
45 */
46 public final class ToyGraphInputRDD implements InputRDD {
47
48 @Override
49 public JavaPairRDD<Object, VertexWritable> readGraphRDD(final Configuration configuration, final JavaSparkContext sparkContext) {
50 KryoShimServiceLoader.applyConfiguration(TinkerGraph.open().configuration());
51 final List<VertexWritable> vertices;
52 if (configuration.getString(Constants.GREMLIN_HADOOP_INPUT_LOCATION).contains("modern"))
53 vertices = IteratorUtils.list(IteratorUtils.map(TinkerFactory.createModern().vertices(), VertexWritable::new));
54 else if (configuration.getString(Constants.GREMLIN_HADOOP_INPUT_LOCATION).contains("classic"))
55 vertices = IteratorUtils.list(IteratorUtils.map(TinkerFactory.createClassic().vertices(), VertexWritable::new));
56 else if (configuration.getString(Constants.GREMLIN_HADOOP_INPUT_LOCATION).contains("crew"))
57 vertices = IteratorUtils.list(IteratorUtils.map(TinkerFactory.createTheCrew().vertices(), VertexWritable::new));
58 else if (configuration.getString(Constants.GREMLIN_HADOOP_INPUT_LOCATION).contains("sink"))
59 vertices = IteratorUtils.list(IteratorUtils.map(TinkerFactory.createKitchenSink().vertices(), VertexWritable::new));
60 else if (configuration.getString(Constants.GREMLIN_HADOOP_INPUT_LOCATION).contains("grateful")) {
61 try {
62 final Graph graph = TinkerGraph.open();
63 final GraphReader reader = GryoReader.build().mapper(graph.io(GryoIo.build()).mapper().create()).create();
64 try (final InputStream stream = GryoResourceAccess.class.getResourceAsStream("grateful-dead.kryo")) {
65 reader.readGraph(stream, graph);
66 }
67 vertices = IteratorUtils.list(IteratorUtils.map(graph.vertices(), VertexWritable::new));
68 } catch (final IOException e) {
69 throw new IllegalStateException(e.getMessage(), e);
70 }
71 } else
72 throw new IllegalArgumentException("No legal toy graph was provided to load: " + configuration.getProperty(Constants.GREMLIN_HADOOP_INPUT_LOCATION));
73
74 return sparkContext.parallelize(vertices).mapToPair(vertex -> new Tuple2<>(vertex.get().id(), vertex));
75 }
76 }