TINKERPOP-1862 Fix Messenger implementations for Spark/Giraph handling BOTH
[tinkerpop.git] / gremlin-test / src / main / java / org / apache / tinkerpop / gremlin / process / computer / GraphComputerTest.java
1 /*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
17 * under the License.
18 */
19 package org.apache.tinkerpop.gremlin.process.computer;
20
21 import org.apache.commons.configuration.BaseConfiguration;
22 import org.apache.commons.configuration.Configuration;
23 import org.apache.commons.configuration.ConfigurationUtils;
24 import org.apache.tinkerpop.gremlin.ExceptionCoverage;
25 import org.apache.tinkerpop.gremlin.LoadGraphWith;
26 import org.apache.tinkerpop.gremlin.process.AbstractGremlinProcessTest;
27 import org.apache.tinkerpop.gremlin.process.computer.clustering.peerpressure.PeerPressureVertexProgram;
28 import org.apache.tinkerpop.gremlin.process.computer.ranking.pagerank.PageRankVertexProgram;
29 import org.apache.tinkerpop.gremlin.process.computer.traversal.TraversalVertexProgram;
30 import org.apache.tinkerpop.gremlin.process.computer.util.AbstractVertexProgramBuilder;
31 import org.apache.tinkerpop.gremlin.process.computer.util.StaticMapReduce;
32 import org.apache.tinkerpop.gremlin.process.computer.util.StaticVertexProgram;
33 import org.apache.tinkerpop.gremlin.process.traversal.Operator;
34 import org.apache.tinkerpop.gremlin.process.traversal.P;
35 import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
36 import org.apache.tinkerpop.gremlin.process.traversal.Traverser;
37 import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversal;
38 import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__;
39 import org.apache.tinkerpop.gremlin.process.traversal.step.util.EmptyPath;
40 import org.apache.tinkerpop.gremlin.process.traversal.strategy.verification.VerificationException;
41 import org.apache.tinkerpop.gremlin.process.traversal.traverser.TraverserRequirement;
42 import org.apache.tinkerpop.gremlin.process.traversal.traverser.util.TraverserSet;
43 import org.apache.tinkerpop.gremlin.structure.Direction;
44 import org.apache.tinkerpop.gremlin.structure.Edge;
45 import org.apache.tinkerpop.gremlin.structure.Graph;
46 import org.apache.tinkerpop.gremlin.structure.Property;
47 import org.apache.tinkerpop.gremlin.structure.Vertex;
48 import org.apache.tinkerpop.gremlin.structure.VertexProperty;
49 import org.apache.tinkerpop.gremlin.structure.util.StringFactory;
50 import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils;
51 import org.javatuples.Pair;
52 import org.junit.Test;
53
54 import java.util.ArrayList;
55 import java.util.Arrays;
56 import java.util.Collections;
57 import java.util.Comparator;
58 import java.util.HashMap;
59 import java.util.HashSet;
60 import java.util.Iterator;
61 import java.util.LinkedList;
62 import java.util.List;
63 import java.util.Map;
64 import java.util.Optional;
65 import java.util.Set;
66 import java.util.concurrent.ConcurrentSkipListSet;
67 import java.util.concurrent.ExecutionException;
68 import java.util.concurrent.Future;
69
70 import static org.apache.tinkerpop.gremlin.LoadGraphWith.GraphData.GRATEFUL;
71 import static org.apache.tinkerpop.gremlin.LoadGraphWith.GraphData.MODERN;
72 import static org.apache.tinkerpop.gremlin.LoadGraphWith.GraphData.SINK;
73 import static org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__.outE;
74 import static org.junit.Assert.assertEquals;
75 import static org.junit.Assert.assertFalse;
76 import static org.junit.Assert.assertTrue;
77 import static org.junit.Assert.fail;
78 import static org.junit.Assume.assumeNoException;
79
80 /**
81 * @author Marko A. Rodriguez (http://markorodriguez.com)
82 */
83 @ExceptionCoverage(exceptionClass = GraphComputer.Exceptions.class, methods = {
84 "providedKeyIsNotAMemoryComputeKey",
85 "computerHasNoVertexProgramNorMapReducers",
86 "computerHasAlreadyBeenSubmittedAVertexProgram",
87 "providedKeyIsNotAnElementComputeKey",
88 "incidentAndAdjacentElementsCanNotBeAccessedInMapReduce",
89 "adjacentVertexLabelsCanNotBeRead",
90 "adjacentVertexPropertiesCanNotBeReadOrUpdated",
91 "adjacentVertexEdgesAndVerticesCanNotBeReadOrUpdated",
92 "resultGraphPersistCombinationNotSupported",
93 "vertexPropertiesCanNotBeUpdatedInMapReduce",
94 "computerRequiresMoreWorkersThanSupported",
95 "vertexFilterAccessesIncidentEdges",
96 "edgeFilterAccessesAdjacentVertices",
97 "graphFilterNotSupported"
98 })
99 @ExceptionCoverage(exceptionClass = Memory.Exceptions.class, methods = {
100 "memoryKeyCanNotBeEmpty",
101 "memoryKeyCanNotBeNull",
102 "memoryValueCanNotBeNull",
103 "memoryIsCurrentlyImmutable",
104 "memoryDoesNotExist",
105 "memorySetOnlyDuringVertexProgramSetUpAndTerminate",
106 "memoryAddOnlyDuringVertexProgramExecute",
107 "adjacentVertexEdgesAndVerticesCanNotBeReadOrUpdated"
108 })
109 @ExceptionCoverage(exceptionClass = Graph.Exceptions.class, methods = {
110 "graphDoesNotSupportProvidedGraphComputer"
111 })
112 @SuppressWarnings("ThrowableResultOfMethodCallIgnored")
113 public class GraphComputerTest extends AbstractGremlinProcessTest {
114
115 @Test
116 @LoadGraphWith(MODERN)
117 public void shouldHaveStandardStringRepresentation() {
118 final GraphComputer computer = graphProvider.getGraphComputer(graph);
119 assertEquals(StringFactory.graphComputerString(computer), computer.toString());
120 }
121
122 @Test
123 @LoadGraphWith(MODERN)
124 public void shouldNotAllowWithNoVertexProgramNorMapReducers() throws Exception {
125 try {
126 graphProvider.getGraphComputer(graph).submit().get();
127 fail("Should throw an IllegalStateException when there is no vertex program nor map reducers");
128 } catch (Exception ex) {
129 validateException(GraphComputer.Exceptions.computerHasNoVertexProgramNorMapReducers(), ex);
130 }
131 }
132
133
134 /////////////////////////////////////////////
135 @Test
136 @LoadGraphWith(MODERN)
137 public void shouldNotAllowBadGraphComputers() {
138 try {
139 graph.compute(BadGraphComputer.class);
140 fail("Providing a bad graph computer class should fail");
141 } catch (Exception ex) {
142 validateException(Graph.Exceptions.graphDoesNotSupportProvidedGraphComputer(BadGraphComputer.class), ex);
143 }
144 if (!new BadGraphComputer().features().supportsGraphFilter()) {
145 try {
146 new BadGraphComputer().vertices(__.hasLabel("software"));
147 fail("Should throw an unsupported operation exception");
148 } catch (final UnsupportedOperationException e) {
149 assertEquals(GraphComputer.Exceptions.graphFilterNotSupported().getMessage(), e.getMessage());
150 }
151 try {
152 new BadGraphComputer().edges(__.bothE());
153 fail("Should throw an unsupported operation exception");
154 } catch (final UnsupportedOperationException e) {
155 assertEquals(GraphComputer.Exceptions.graphFilterNotSupported().getMessage(), e.getMessage());
156 }
157 } else {
158 fail("Should not support graph filter: " + BadGraphComputer.class);
159 }
160 }
161
162 public static class BadGraphComputer implements GraphComputer {
163
164 @Override
165 public GraphComputer result(final ResultGraph resultGraph) {
166 return null;
167 }
168
169 @Override
170 public GraphComputer persist(final Persist persist) {
171 return null;
172 }
173
174 @Override
175 public GraphComputer program(final VertexProgram vertexProgram) {
176 return null;
177 }
178
179 @Override
180 public GraphComputer mapReduce(final MapReduce mapReduce) {
181 return null;
182 }
183
184 @Override
185 public GraphComputer workers(final int workers) {
186 return null;
187 }
188
189 @Override
190 public GraphComputer vertices(final Traversal<Vertex, Vertex> vertexFilter) {
191 throw GraphComputer.Exceptions.graphFilterNotSupported();
192 }
193
194 @Override
195 public GraphComputer edges(final Traversal<Vertex, Edge> edgeFilter) {
196 throw GraphComputer.Exceptions.graphFilterNotSupported();
197 }
198
199 @Override
200 public GraphComputer configure(final String key, final Object value) {
201 return null;
202 }
203
204 @Override
205 public Future<ComputerResult> submit() {
206 return null;
207 }
208
209 public GraphComputer.Features features() {
210 return new Features() {
211 @Override
212 public boolean supportsGraphFilter() {
213 return false;
214 }
215 };
216 }
217 }
218 /////////////////////////////////////////////
219
220 /////////////////////////////////////////////
221 @Test
222 @LoadGraphWith(MODERN)
223 public void shouldHaveImmutableComputeResultMemory() throws Exception {
224 final ComputerResult results = graphProvider.getGraphComputer(graph).program(new VertexProgramB()).submit().get();
225
226 try {
227 results.memory().set("set", "test");
228 } catch (Exception ex) {
229 validateException(Memory.Exceptions.memoryIsCurrentlyImmutable(), ex);
230 }
231
232 try {
233 results.memory().add("incr", 1);
234 } catch (Exception ex) {
235 validateException(Memory.Exceptions.memoryIsCurrentlyImmutable(), ex);
236 }
237
238 try {
239 results.memory().add("and", true);
240 } catch (Exception ex) {
241 validateException(Memory.Exceptions.memoryIsCurrentlyImmutable(), ex);
242 }
243
244 try {
245 results.memory().add("or", false);
246 } catch (Exception ex) {
247 validateException(Memory.Exceptions.memoryIsCurrentlyImmutable(), ex);
248 }
249 }
250
251 public static class VertexProgramB extends StaticVertexProgram {
252 @Override
253 public void setup(final Memory memory) {
254 assertEquals(0, memory.getIteration());
255 assertTrue(memory.isInitialIteration());
256 }
257
258 @Override
259 public void execute(final Vertex vertex, final Messenger messenger, final Memory memory) {
260 assertEquals(0, memory.getIteration());
261 assertTrue(memory.isInitialIteration());
262 }
263
264 @Override
265 public boolean terminate(final Memory memory) {
266 assertEquals(0, memory.getIteration());
267 assertTrue(memory.isInitialIteration());
268 return true;
269 }
270
271 @Override
272 public Set<MemoryComputeKey> getMemoryComputeKeys() {
273 return new HashSet<>(Arrays.asList(
274 MemoryComputeKey.of("set", Operator.assign, true, false),
275 MemoryComputeKey.of("incr", Operator.sum, true, false),
276 MemoryComputeKey.of("and", Operator.and, true, false),
277 MemoryComputeKey.of("or", Operator.or, true, false)));
278 }
279
280 @Override
281 public Set<MessageScope> getMessageScopes(final Memory memory) {
282 return Collections.emptySet();
283 }
284
285 @Override
286 public GraphComputer.ResultGraph getPreferredResultGraph() {
287 return GraphComputer.ResultGraph.ORIGINAL;
288 }
289
290 @Override
291 public GraphComputer.Persist getPreferredPersist() {
292 return GraphComputer.Persist.NOTHING;
293 }
294 }
295 /////////////////////////////////////////////
296
297 @Test
298 @LoadGraphWith(MODERN)
299 public void shouldNotAllowNullMemoryKeys() throws Exception {
300 try {
301 graphProvider.getGraphComputer(graph).program(new VertexProgramC()).submit().get();
302 fail("Providing null memory key should fail");
303 } catch (Exception ex) {
304 // validateException(Memory.Exceptions.memoryKeyCanNotBeNull(), ex);
305 }
306 }
307
308 public static class VertexProgramC extends StaticVertexProgram {
309 @Override
310 public void setup(final Memory memory) {
311
312 }
313
314 @Override
315 public void execute(final Vertex vertex, final Messenger messenger, final Memory memory) {
316
317 }
318
319 @Override
320 public boolean terminate(final Memory memory) {
321 return true;
322 }
323
324 @Override
325 public Set<MemoryComputeKey> getMemoryComputeKeys() {
326 return Collections.singleton(MemoryComputeKey.of(null, Operator.or, true, false));
327 }
328
329 @Override
330 public Set<MessageScope> getMessageScopes(final Memory memory) {
331 return Collections.emptySet();
332 }
333
334 @Override
335 public GraphComputer.ResultGraph getPreferredResultGraph() {
336 return GraphComputer.ResultGraph.ORIGINAL;
337 }
338
339 @Override
340 public GraphComputer.Persist getPreferredPersist() {
341 return GraphComputer.Persist.NOTHING;
342 }
343 }
344 /////////////////////////////////////////////
345
346 /////////////////////////////////////////////
347 @Test
348 @LoadGraphWith(MODERN)
349 public void shouldNotAllowEmptyMemoryKeys() throws Exception {
350 try {
351 graphProvider.getGraphComputer(graph).program(new VertexProgramD()).submit().get();
352 fail("Providing empty memory key should fail");
353 } catch (Exception ex) {
354 validateException(Memory.Exceptions.memoryKeyCanNotBeEmpty(), ex);
355 }
356 }
357
358 public static class VertexProgramD extends StaticVertexProgram {
359 @Override
360 public void setup(final Memory memory) {
361
362 }
363
364 @Override
365 public void execute(final Vertex vertex, final Messenger messenger, final Memory memory) {
366
367 }
368
369 @Override
370 public boolean terminate(final Memory memory) {
371 return true;
372 }
373
374 @Override
375 public Set<MemoryComputeKey> getMemoryComputeKeys() {
376 return Collections.singleton(MemoryComputeKey.of("", Operator.or, true, false));
377 }
378
379 @Override
380 public Set<MessageScope> getMessageScopes(final Memory memory) {
381 return Collections.emptySet();
382 }
383
384 @Override
385 public GraphComputer.ResultGraph getPreferredResultGraph() {
386 return GraphComputer.ResultGraph.ORIGINAL;
387 }
388
389 @Override
390 public GraphComputer.Persist getPreferredPersist() {
391 return GraphComputer.Persist.NOTHING;
392 }
393 }
394 ////////////////////////////////////////////
395
396 ////////////////////////////////////////////
397 @Test
398 @LoadGraphWith(MODERN)
399 public void shouldHandleUndeclaredMemoryKeysCorrectly() throws Exception {
400 graphProvider.getGraphComputer(graph).program(new VertexProgramE()).submit().get();
401 }
402
403 public static class VertexProgramE extends StaticVertexProgram {
404 @Override
405 public void setup(final Memory memory) {
406 try {
407 memory.get("a");
408 fail("The memory key does not exist and should fail");
409 } catch (Exception e) {
410 validateException(Memory.Exceptions.memoryDoesNotExist("a"), e);
411 }
412 try {
413 memory.set("a", true);
414 fail("Setting a memory key that wasn't declared should fail");
415 } catch (Exception e) {
416 validateException(GraphComputer.Exceptions.providedKeyIsNotAMemoryComputeKey("a"), e);
417 }
418 }
419
420 @Override
421 public void execute(final Vertex vertex, final Messenger messenger, final Memory memory) {
422 try {
423 memory.get("a");
424 fail("The memory key does not exist and should fail");
425 } catch (Exception e) {
426 validateException(Memory.Exceptions.memoryDoesNotExist("a"), e);
427 }
428 try {
429 memory.add("a", true);
430 fail("Setting a memory key that wasn't declared should fail");
431 } catch (Exception e) {
432 validateException(GraphComputer.Exceptions.providedKeyIsNotAMemoryComputeKey("a"), e);
433 }
434 }
435
436 @Override
437 public boolean terminate(final Memory memory) {
438 try {
439 memory.get("a");
440 fail("The memory key does not exist and should fail");
441 } catch (Exception e) {
442 validateException(Memory.Exceptions.memoryDoesNotExist("a"), e);
443 }
444 try {
445 memory.set("a", true);
446 // fail("Setting a memory key that wasn't declared should fail");
447 } catch (Exception e) {
448 validateException(GraphComputer.Exceptions.providedKeyIsNotAMemoryComputeKey("a"), e);
449 }
450 return true;
451 }
452
453 @Override
454 public Set<MessageScope> getMessageScopes(final Memory memory) {
455 return Collections.emptySet();
456 }
457
458 @Override
459 public GraphComputer.ResultGraph getPreferredResultGraph() {
460 return GraphComputer.ResultGraph.ORIGINAL;
461 }
462
463 @Override
464 public GraphComputer.Persist getPreferredPersist() {
465 return GraphComputer.Persist.NOTHING;
466 }
467 }
468 ////////////////////////////////////////////
469
470 /////////////////////////////////////////////
471 @Test
472 @LoadGraphWith(MODERN)
473 public void shouldNotAllowTheSameComputerToExecutedTwice() throws Exception {
474 final GraphComputer computer = graphProvider.getGraphComputer(graph).program(new VertexProgramA());
475 computer.submit().get(); // this should work as its the first run of the graph computer
476
477 try {
478 computer.submit(); // this should fail as the computer has already been executed
479 fail("Using the same graph computer to compute again should not be possible");
480 } catch (IllegalStateException e) {
481
482 } catch (Exception e) {
483 fail("Should yield an illegal state exception for graph computer being executed twice");
484 }
485
486 // test no rerun of graph computer
487 try {
488 computer.submit(); // this should fail as the computer has already been executed even through new program submitted
489 fail("Using the same graph computer to compute again should not be possible");
490 } catch (IllegalStateException e) {
491
492 } catch (Exception e) {
493 fail("Should yield an illegal state exception for graph computer being executed twice");
494 }
495 }
496
497 public static class VertexProgramA extends StaticVertexProgram {
498
499 @Override
500 public void setup(final Memory memory) {
501
502 }
503
504 @Override
505 public void execute(final Vertex vertex, final Messenger messenger, final Memory memory) {
506
507 }
508
509 @Override
510 public boolean terminate(final Memory memory) {
511 return true;
512 }
513
514 @Override
515 public Set<MessageScope> getMessageScopes(final Memory memory) {
516 return Collections.emptySet();
517 }
518
519 @Override
520 public GraphComputer.ResultGraph getPreferredResultGraph() {
521 return GraphComputer.ResultGraph.ORIGINAL;
522 }
523
524 @Override
525 public GraphComputer.Persist getPreferredPersist() {
526 return GraphComputer.Persist.NOTHING;
527 }
528 }
529 /////////////////////////////////////////////
530
531 /////////////////////////////////////////////
532 @Test
533 @LoadGraphWith(MODERN)
534 public void shouldHaveConsistentMemoryVertexPropertiesAndExceptions() throws Exception {
535 ComputerResult results = graphProvider.getGraphComputer(graph).program(new VertexProgramF()).submit().get();
536 assertEquals(1, results.memory().getIteration());
537 assertEquals(2, results.memory().asMap().size());
538 assertEquals(2, results.memory().keys().size());
539 assertTrue(results.memory().keys().contains("a"));
540 assertTrue(results.memory().keys().contains("b"));
541 assertTrue(results.memory().getRuntime() >= 0);
542
543 assertEquals(12, results.memory().<Integer>get("a").intValue()); // 2 iterations
544 assertEquals(28, results.memory().<Integer>get("b").intValue());
545 try {
546 results.memory().get("BAD");
547 fail("Should throw an IllegalArgumentException");
548 } catch (IllegalArgumentException e) {
549 assertEquals(Memory.Exceptions.memoryDoesNotExist("BAD").getMessage(), e.getMessage());
550 }
551 assertEquals(Long.valueOf(0), results.graph().traversal().V().count().next()); // persist new/nothing.
552
553 results.graph().traversal().V().forEachRemaining(v -> {
554 assertTrue(v.property("nameLengthCounter").isPresent());
555 assertEquals(Integer.valueOf(v.<String>value("name").length() * 2), Integer.valueOf(v.<Integer>value("nameLengthCounter")));
556 });
557 }
558
559 public static class VertexProgramF extends StaticVertexProgram<Object> {
560
561 @Override
562 public void setup(final Memory memory) {
563
564 }
565
566 @Override
567 public void execute(final Vertex vertex, final Messenger<Object> messenger, final Memory memory) {
568 try {
569 vertex.property(VertexProperty.Cardinality.single, "blah", "blah");
570 fail("Should throw an IllegalArgumentException");
571 } catch (final IllegalArgumentException e) {
572 assertEquals(GraphComputer.Exceptions.providedKeyIsNotAnElementComputeKey("blah").getMessage(), e.getMessage());
573 } catch (final Exception e) {
574 fail("Should throw an IllegalArgumentException: " + e);
575 }
576
577 memory.add("a", 1);
578 if (memory.isInitialIteration()) {
579 vertex.property(VertexProperty.Cardinality.single, "nameLengthCounter", vertex.<String>value("name").length());
580 memory.add("b", vertex.<String>value("name").length());
581 } else {
582 vertex.property(VertexProperty.Cardinality.single, "nameLengthCounter", vertex.<String>value("name").length() + vertex.<Integer>value("nameLengthCounter"));
583 }
584 }
585
586 @Override
587 public boolean terminate(final Memory memory) {
588 return memory.getIteration() == 1;
589 }
590
591 @Override
592 public Set<VertexComputeKey> getVertexComputeKeys() {
593 return Collections.singleton(VertexComputeKey.of("nameLengthCounter", false));
594 }
595
596 @Override
597 public Set<MemoryComputeKey> getMemoryComputeKeys() {
598 return new HashSet<>(Arrays.asList(
599 MemoryComputeKey.of("a", Operator.sum, true, false),
600 MemoryComputeKey.of("b", Operator.sum, true, false)));
601 }
602
603 @Override
604 public Set<MessageScope> getMessageScopes(Memory memory) {
605 return Collections.emptySet();
606 }
607
608 @Override
609 public GraphComputer.ResultGraph getPreferredResultGraph() {
610 return GraphComputer.ResultGraph.NEW;
611 }
612
613 @Override
614 public GraphComputer.Persist getPreferredPersist() {
615 return GraphComputer.Persist.NOTHING;
616 }
617 }
618 /////////////////////////////////////////////
619
620 /////////////////////////////////////////////
621 @Test
622 @LoadGraphWith(MODERN)
623 public void shouldAndOrIncrCorrectlyThroughSubStages() throws Exception {
624 ComputerResult results = graphProvider.getGraphComputer(graph).program(new VertexProgramG()).submit().get();
625 assertEquals(2, results.memory().getIteration());
626 assertEquals(6, results.memory().asMap().size());
627 assertEquals(6, results.memory().keys().size());
628 assertTrue(results.memory().keys().contains("a"));
629 assertTrue(results.memory().keys().contains("b"));
630 assertTrue(results.memory().keys().contains("c"));
631 assertTrue(results.memory().keys().contains("d"));
632 assertTrue(results.memory().keys().contains("e"));
633 assertTrue(results.memory().keys().contains("f"));
634
635 assertEquals(Long.valueOf(18), results.memory().get("a"));
636 assertEquals(Long.valueOf(0), results.memory().get("b"));
637 assertFalse(results.memory().get("c"));
638 assertTrue(results.memory().get("d"));
639 assertTrue(results.memory().get("e"));
640 assertEquals(3, results.memory().<Integer>get("f").intValue());
641 }
642
643 public static class VertexProgramG extends StaticVertexProgram {
644
645 @Override
646 public void setup(final Memory memory) {
647 memory.set("a", 0l);
648 memory.set("b", 0l);
649 memory.set("c", true);
650 memory.set("d", false);
651 memory.set("e", true);
652 memory.set("f", memory.getIteration());
653 try {
654 memory.add("a", 0l);
655 fail("Should only allow Memory.set() during VertexProgram.setup()");
656 } catch (final Exception e) {
657 validateException(Memory.Exceptions.memoryAddOnlyDuringVertexProgramExecute("a"), e);
658 }
659 }
660
661 @Override
662 public void execute(final Vertex vertex, final Messenger messenger, final Memory memory) {
663 // test current step values
664 assertEquals(Long.valueOf(6 * memory.getIteration()), memory.get("a"));
665 assertEquals(Long.valueOf(0), memory.get("b"));
666 if (memory.isInitialIteration()) {
667 assertTrue(memory.get("c"));
668 assertFalse(memory.get("d"));
669 } else {
670 assertFalse(memory.get("c"));
671 assertTrue(memory.get("d"));
672 }
673 assertTrue(memory.get("e"));
674 assertEquals(memory.getIteration(), memory.<Integer>get("f").intValue());
675
676 // update current step values
677 memory.add("a", 1l);
678 memory.add("b", 1l);
679 memory.add("c", false);
680 memory.add("d", true);
681 memory.add("e", false);
682 memory.add("f", memory.getIteration() + 1);
683
684 // test current step values, should be the same as previous prior to update
685 assertEquals(Long.valueOf(6 * memory.getIteration()), memory.get("a"));
686 assertEquals(Long.valueOf(0), memory.get("b"));
687 if (memory.isInitialIteration()) {
688 assertTrue(memory.get("c"));
689 assertFalse(memory.get("d"));
690 } else {
691 assertFalse(memory.get("c"));
692 assertTrue(memory.get("d"));
693 }
694 assertTrue(memory.get("e"));
695 assertEquals(memory.getIteration(), memory.<Integer>get("f").intValue());
696 try {
697 memory.set("a", 0l);
698 fail("Should only allow Memory.add() during VertexProgram.execute()");
699 } catch (final Exception e) {
700 validateException(Memory.Exceptions.memorySetOnlyDuringVertexProgramSetUpAndTerminate("a"), e);
701 }
702 }
703
704 @Override
705 public boolean terminate(Memory memory) {
706 assertEquals(Long.valueOf(6 * (memory.getIteration() + 1)), memory.get("a"));
707 assertEquals(Long.valueOf(6), memory.get("b"));
708 assertFalse(memory.get("c"));
709 assertTrue(memory.get("d"));
710 assertFalse(memory.get("e"));
711 assertEquals(memory.getIteration() + 1, memory.<Integer>get("f").intValue());
712 memory.set("b", 0l);
713 memory.set("e", true);
714 try {
715 memory.add("a", 0l);
716 fail("Should only allow Memory.set() during VertexProgram.terminate()");
717 } catch (final Exception e) {
718 validateException(Memory.Exceptions.memoryAddOnlyDuringVertexProgramExecute("a"), e);
719 }
720 return memory.getIteration() > 1;
721 }
722
723 @Override
724 public Set<MemoryComputeKey> getMemoryComputeKeys() {
725 return new HashSet<>(Arrays.asList(
726 MemoryComputeKey.of("a", Operator.sum, true, false),
727 MemoryComputeKey.of("b", Operator.sum, true, false),
728 MemoryComputeKey.of("c", Operator.and, true, false),
729 MemoryComputeKey.of("d", Operator.or, true, false),
730 MemoryComputeKey.of("e", Operator.and, true, false),
731 MemoryComputeKey.of("f", Operator.assign, true, false)));
732 }
733
734 @Override
735 public Set<MessageScope> getMessageScopes(Memory memory) {
736 return Collections.emptySet();
737 }
738
739 @Override
740 public GraphComputer.ResultGraph getPreferredResultGraph() {
741 return GraphComputer.ResultGraph.NEW;
742 }
743
744 @Override
745 public GraphComputer.Persist getPreferredPersist() {
746 return GraphComputer.Persist.NOTHING;
747 }
748 }
749 /////////////////////////////////////////////
750
751 /////////////////////////////////////////////
752 @Test
753 @LoadGraphWith(MODERN)
754 public void shouldAllowMapReduceWithNoVertexProgram() throws Exception {
755 final ComputerResult results = graphProvider.getGraphComputer(graph).mapReduce(new MapReduceA()).submit().get();
756 assertEquals(123, results.memory().<Integer>get("ageSum").intValue());
757 }
758
759 private static class MapReduceA extends StaticMapReduce<MapReduce.NullObject, Integer, MapReduce.NullObject, Integer, Integer> {
760
761 @Override
762 public boolean doStage(final Stage stage) {
763 return stage.equals(Stage.MAP) || stage.equals(Stage.REDUCE);
764 }
765
766 @Override
767 public void map(final Vertex vertex, final MapEmitter<NullObject, Integer> emitter) {
768 vertex.<Integer>property("age").ifPresent(emitter::emit);
769 }
770
771 @Override
772 public void reduce(NullObject key, Iterator<Integer> values, ReduceEmitter<NullObject, Integer> emitter) {
773 int sum = 0;
774 while (values.hasNext()) {
775 sum = sum + values.next();
776 }
777 emitter.emit(sum);
778 }
779
780 @Override
781 public Integer generateFinalResult(Iterator<KeyValue<NullObject, Integer>> keyValues) {
782 return keyValues.next().getValue();
783 }
784
785 @Override
786 public String getMemoryKey() {
787 return "ageSum";
788 }
789 }
790 /////////////////////////////////////////////
791
792 @Test
793 @LoadGraphWith(MODERN)
794 public void shouldSupportMultipleMapReduceJobs() throws Exception {
795 final ComputerResult results = graphProvider.getGraphComputer(graph)
796 .program(new VertexProgramH())
797 .mapReduce(new MapReduceH1())
798 .mapReduce(new MapReduceH2()).submit().get();
799 assertEquals(60, results.memory().<Integer>get("a").intValue());
800 assertEquals(1, results.memory().<Integer>get("b").intValue());
801 }
802
803 public static class VertexProgramH extends StaticVertexProgram {
804
805 @Override
806 public void setup(final Memory memory) {
807
808 }
809
810 @Override
811 public void execute(Vertex vertex, Messenger messenger, Memory memory) {
812 vertex.property(VertexProperty.Cardinality.single, "counter", memory.isInitialIteration() ? 1 : vertex.<Integer>value("counter") + 1);
813 }
814
815 @Override
816 public boolean terminate(final Memory memory) {
817 return memory.getIteration() > 8;
818 }
819
820 @Override
821 public Set<VertexComputeKey> getVertexComputeKeys() {
822 return Collections.singleton(VertexComputeKey.of("counter", false));
823 }
824
825 @Override
826 public Set<MessageScope> getMessageScopes(Memory memory) {
827 return Collections.emptySet();
828 }
829
830 @Override
831 public GraphComputer.ResultGraph getPreferredResultGraph() {
832 return GraphComputer.ResultGraph.NEW;
833 }
834
835 @Override
836 public GraphComputer.Persist getPreferredPersist() {
837 return GraphComputer.Persist.NOTHING;
838 }
839 }
840
841 private static class MapReduceH1 extends StaticMapReduce<MapReduce.NullObject, Integer, MapReduce.NullObject, Integer, Integer> {
842
843 @Override
844 public boolean doStage(final Stage stage) {
845 return stage.equals(Stage.MAP) || stage.equals(Stage.REDUCE);
846 }
847
848 @Override
849 public void map(final Vertex vertex, final MapEmitter<NullObject, Integer> emitter) {
850 vertex.<Integer>property("counter").ifPresent(emitter::emit);
851 }
852
853 @Override
854 public void reduce(final NullObject key, final Iterator<Integer> values, final ReduceEmitter<NullObject, Integer> emitter) {
855 int sum = 0;
856 while (values.hasNext()) {
857 sum = sum + values.next();
858 }
859 emitter.emit(sum);
860 }
861
862 @Override
863 public Integer generateFinalResult(final Iterator<KeyValue<NullObject, Integer>> keyValues) {
864 return keyValues.next().getValue();
865 }
866
867 @Override
868 public String getMemoryKey() {
869 return "a";
870 }
871 }
872
873 private static class MapReduceH2 extends StaticMapReduce<Integer, Integer, Integer, Integer, Integer> {
874
875 @Override
876 public boolean doStage(final Stage stage) {
877 return true;
878 }
879
880 @Override
881 public void map(final Vertex vertex, final MapEmitter<Integer, Integer> emitter) {
882 vertex.<Integer>property("age").ifPresent(age -> emitter.emit(age, age));
883 }
884
885 @Override
886 public void combine(Integer key, Iterator<Integer> values, ReduceEmitter<Integer, Integer> emitter) {
887 values.forEachRemaining(i -> emitter.emit(i, 1));
888 }
889
890 @Override
891 public void reduce(Integer key, Iterator<Integer> values, ReduceEmitter<Integer, Integer> emitter) {
892 values.forEachRemaining(i -> emitter.emit(i, 1));
893 }
894
895 @Override
896 public Integer generateFinalResult(Iterator<KeyValue<Integer, Integer>> keyValues) {
897 return keyValues.next().getValue();
898 }
899
900 @Override
901 public String getMemoryKey() {
902 return "b";
903 }
904 }
905 /////////////////////////////////////////////
906
907 /////////////////////////////////////////////
908 @Test
909 @LoadGraphWith(MODERN)
910 public void shouldSortReduceOutput() throws Exception {
911 final ComputerResult results = graphProvider.getGraphComputer(graph).mapReduce(new MapReduceB()).submit().get();
912 final List<Integer> nameLengths = results.memory().get("nameLengths");
913 assertEquals(6, nameLengths.size());
914 for (int i = 1; i < nameLengths.size(); i++) {
915 assertTrue(nameLengths.get(i) <= nameLengths.get(i - 1));
916 }
917 }
918
919 public static class MapReduceB extends StaticMapReduce<Integer, Integer, Integer, Integer, List<Integer>> {
920
921 @Override
922 public boolean doStage(final Stage stage) {
923 return stage.equals(Stage.REDUCE) || stage.equals(Stage.MAP);
924 }
925
926 @Override
927 public void map(final Vertex vertex, final MapEmitter<Integer, Integer> emitter) {
928 emitter.emit(vertex.<String>value("name").length(), vertex.<String>value("name").length());
929 }
930
931 @Override
932 public void reduce(Integer key, Iterator<Integer> values, ReduceEmitter<Integer, Integer> emitter) {
933 values.forEachRemaining(id -> emitter.emit(id, id));
934 }
935
936 @Override
937 public Optional<Comparator<Integer>> getReduceKeySort() {
938 return Optional.of(Comparator.<Integer>reverseOrder());
939 }
940
941 @Override
942 public String getMemoryKey() {
943 return "nameLengths";
944 }
945
946 @Override
947 public List<Integer> generateFinalResult(final Iterator<KeyValue<Integer, Integer>> keyValues) {
948 final List<Integer> list = new ArrayList<>();
949 keyValues.forEachRemaining(nameLength -> list.add(nameLength.getKey()));
950 return list;
951 }
952 }
953
954 /////////////////////////////////////////////
955 @Test
956 @LoadGraphWith(MODERN)
957 public void shouldSortMapOutput() throws Exception {
958 final ComputerResult results = graphProvider.getGraphComputer(graph).mapReduce(new MapReduceBB()).submit().get();
959 final List<Integer> nameLengths = results.memory().get("nameLengths");
960 assertEquals(6, nameLengths.size());
961 for (int i = 1; i < nameLengths.size(); i++) {
962 assertTrue(nameLengths.get(i) <= nameLengths.get(i - 1));
963 }
964 }
965
966 public static class MapReduceBB extends StaticMapReduce<Integer, Integer, Integer, Integer, List<Integer>> {
967
968 @Override
969 public boolean doStage(final Stage stage) {
970 return stage.equals(Stage.MAP);
971 }
972
973 @Override
974 public void map(final Vertex vertex, final MapEmitter<Integer, Integer> emitter) {
975 emitter.emit(vertex.<String>value("name").length(), vertex.<String>value("name").length());
976 }
977
978 @Override
979 public Optional<Comparator<Integer>> getMapKeySort() {
980 return Optional.of(Comparator.<Integer>reverseOrder());
981 }
982
983 @Override
984 public String getMemoryKey() {
985 return "nameLengths";
986 }
987
988 @Override
989 public List<Integer> generateFinalResult(final Iterator<KeyValue<Integer, Integer>> keyValues) {
990 final List<Integer> list = new ArrayList<>();
991 keyValues.forEachRemaining(nameLength -> list.add(nameLength.getKey()));
992 return list;
993 }
994 }
995
996
997 /////////////////////////////////////////////
998 @Test
999 @LoadGraphWith(MODERN)
1000 public void shouldOnlyAllowReadingVertexPropertiesInMapReduce() throws Exception {
1001 graphProvider.getGraphComputer(graph).mapReduce(new MapReduceC()).submit().get();
1002 }
1003
1004 public static class MapReduceC extends StaticMapReduce<MapReduce.NullObject, MapReduce.NullObject, MapReduce.NullObject, MapReduce.NullObject, MapReduce.NullObject> {
1005
1006 @Override
1007 public boolean doStage(final Stage stage) {
1008 return stage.equals(Stage.MAP);
1009 }
1010
1011 @Override
1012 public void map(final Vertex vertex, final MapEmitter<MapReduce.NullObject, MapReduce.NullObject> emitter) {
1013 try {
1014 vertex.edges(Direction.OUT);
1015 fail("Edges should not be accessible in MapReduce.map()");
1016 } catch (final UnsupportedOperationException e) {
1017 assertEquals(GraphComputer.Exceptions.incidentAndAdjacentElementsCanNotBeAccessedInMapReduce().getMessage(), e.getMessage());
1018 }
1019 try {
1020 vertex.edges(Direction.IN);
1021 fail("Edges should not be accessible in MapReduce.map()");
1022 } catch (final UnsupportedOperationException e) {
1023 assertEquals(GraphComputer.Exceptions.incidentAndAdjacentElementsCanNotBeAccessedInMapReduce().getMessage(), e.getMessage());
1024 }
1025 try {
1026 vertex.edges(Direction.BOTH);
1027 fail("Edges should not be accessible in MapReduce.map()");
1028 } catch (final UnsupportedOperationException e) {
1029 assertEquals(GraphComputer.Exceptions.incidentAndAdjacentElementsCanNotBeAccessedInMapReduce().getMessage(), e.getMessage());
1030 }
1031 ////
1032 try {
1033 vertex.property("name", "bob");
1034 fail("Vertex properties should be immutable in MapReduce.map()");
1035 } catch (final UnsupportedOperationException e) {
1036 assertEquals(GraphComputer.Exceptions.vertexPropertiesCanNotBeUpdatedInMapReduce().getMessage(), e.getMessage());
1037 }
1038 try {
1039 vertex.property("name").property("test", 1);
1040 fail("Vertex properties should be immutable in MapReduce.map()");
1041 } catch (final UnsupportedOperationException e) {
1042 assertEquals(GraphComputer.Exceptions.vertexPropertiesCanNotBeUpdatedInMapReduce().getMessage(), e.getMessage());
1043 }
1044
1045 }
1046
1047 @Override
1048 public String getMemoryKey() {
1049 return "nothing";
1050 }
1051
1052 @Override
1053 public MapReduce.NullObject generateFinalResult(final Iterator<KeyValue<MapReduce.NullObject, MapReduce.NullObject>> keyValues) {
1054 return MapReduce.NullObject.instance();
1055 }
1056 }
1057 /////////////////////////////////////////////
1058
1059 /////////////////////////////////////////////
1060 @Test
1061 @LoadGraphWith(MODERN)
1062 public void shouldOnlyAllowIDAccessOfAdjacentVertices() throws Exception {
1063 graphProvider.getGraphComputer(graph).program(new VertexProgramI()).submit().get();
1064 }
1065
1066 public static class VertexProgramI extends StaticVertexProgram<MapReduce.NullObject> {
1067
1068 @Override
1069 public void setup(final Memory memory) {
1070
1071 }
1072
1073 @Override
1074 public void execute(Vertex vertex, Messenger messenger, Memory memory) {
1075 vertex.vertices(Direction.OUT).forEachRemaining(Vertex::id);
1076 vertex.vertices(Direction.IN).forEachRemaining(Vertex::id);
1077 vertex.vertices(Direction.BOTH).forEachRemaining(Vertex::id);
1078 if (vertex.vertices(Direction.OUT).hasNext()) {
1079 try {
1080 vertex.vertices(Direction.OUT).forEachRemaining(Vertex::label);
1081 fail("Adjacent vertex labels should not be accessible in VertexProgram.execute()");
1082 } catch (UnsupportedOperationException e) {
1083 assertEquals(GraphComputer.Exceptions.adjacentVertexLabelsCanNotBeRead().getMessage(), e.getMessage());
1084 }
1085 }
1086 if (vertex.vertices(Direction.IN).hasNext()) {
1087 try {
1088 vertex.vertices(Direction.IN).forEachRemaining(Vertex::label);
1089 fail("Adjacent vertex labels should not be accessible in VertexProgram.execute()");
1090 } catch (UnsupportedOperationException e) {
1091 assertEquals(GraphComputer.Exceptions.adjacentVertexLabelsCanNotBeRead().getMessage(), e.getMessage());
1092 }
1093 }
1094 if (vertex.vertices(Direction.BOTH).hasNext()) {
1095 try {
1096 vertex.vertices(Direction.BOTH).forEachRemaining(Vertex::label);
1097 fail("Adjacent vertex labels should not be accessible in VertexProgram.execute()");
1098 } catch (UnsupportedOperationException e) {
1099 assertEquals(GraphComputer.Exceptions.adjacentVertexLabelsCanNotBeRead().getMessage(), e.getMessage());
1100 }
1101 }
1102 ////////////////////
1103 if (vertex.vertices(Direction.OUT).hasNext()) {
1104 try {
1105 vertex.vertices(Direction.OUT).forEachRemaining(v -> v.property("name"));
1106 fail("Adjacent vertex properties should not be accessible in VertexProgram.execute()");
1107 } catch (UnsupportedOperationException e) {
1108 assertEquals(GraphComputer.Exceptions.adjacentVertexPropertiesCanNotBeReadOrUpdated().getMessage(), e.getMessage());
1109 }
1110 }
1111 if (vertex.vertices(Direction.IN).hasNext()) {
1112 try {
1113 vertex.vertices(Direction.IN).forEachRemaining(v -> v.property("name"));
1114 fail("Adjacent vertex properties should not be accessible in VertexProgram.execute()");
1115 } catch (UnsupportedOperationException e) {
1116 assertEquals(GraphComputer.Exceptions.adjacentVertexPropertiesCanNotBeReadOrUpdated().getMessage(), e.getMessage());
1117 }
1118 }
1119 if (vertex.vertices(Direction.BOTH).hasNext()) {
1120 try {
1121 vertex.vertices(Direction.BOTH).forEachRemaining(v -> v.property("name"));
1122 fail("Adjacent vertex properties should not be accessible in VertexProgram.execute()");
1123 } catch (UnsupportedOperationException e) {
1124 assertEquals(GraphComputer.Exceptions.adjacentVertexPropertiesCanNotBeReadOrUpdated().getMessage(), e.getMessage());
1125 }
1126 }
1127 ////////////////////
1128 if (vertex.vertices(Direction.BOTH).hasNext()) {
1129 try {
1130 vertex.vertices(Direction.BOTH).forEachRemaining(v -> v.edges(Direction.BOTH));
1131 fail("Adjacent vertex edges should not be accessible in VertexProgram.execute()");
1132 } catch (UnsupportedOperationException e) {
1133 assertEquals(GraphComputer.Exceptions.adjacentVertexEdgesAndVerticesCanNotBeReadOrUpdated().getMessage(), e.getMessage());
1134 }
1135 }
1136 if (vertex.vertices(Direction.BOTH).hasNext()) {
1137 try {
1138 vertex.vertices(Direction.BOTH).forEachRemaining(v -> v.vertices(Direction.BOTH));
1139 fail("Adjacent vertex vertices should not be accessible in VertexProgram.execute()");
1140 } catch (UnsupportedOperationException e) {
1141 assertEquals(GraphComputer.Exceptions.adjacentVertexEdgesAndVerticesCanNotBeReadOrUpdated().getMessage(), e.getMessage());
1142 }
1143 }
1144
1145 }
1146
1147 @Override
1148 public boolean terminate(final Memory memory) {
1149 return memory.getIteration() > 1;
1150 }
1151
1152 @Override
1153 public Set<MessageScope> getMessageScopes(Memory memory) {
1154 return Collections.emptySet();
1155 }
1156
1157 @Override
1158 public GraphComputer.ResultGraph getPreferredResultGraph() {
1159 return GraphComputer.ResultGraph.NEW;
1160 }
1161
1162 @Override
1163 public GraphComputer.Persist getPreferredPersist() {
1164 return GraphComputer.Persist.NOTHING;
1165 }
1166 }
1167 /////////////////////////////////////////////
1168
1169 /////////////////////////////////////////////
1170 @Test
1171 @LoadGraphWith(MODERN)
1172 public void shouldStartAndEndWorkersForVertexProgramAndMapReduce() throws Exception {
1173 MapReduceI.WORKER_START.clear();
1174 MapReduceI.WORKER_END.clear();
1175 assertEquals(3, graphProvider.getGraphComputer(graph).program(new VertexProgramJ()).mapReduce(new MapReduceI()).submit().get().memory().<Integer>get("a").intValue());
1176 if (MapReduceI.WORKER_START.size() == 2) {
1177 assertEquals(2, MapReduceI.WORKER_START.size());
1178 assertTrue(MapReduceI.WORKER_START.contains(MapReduce.Stage.MAP) && MapReduceI.WORKER_START.contains(MapReduce.Stage.REDUCE));
1179 } else {
1180 assertEquals(3, MapReduceI.WORKER_START.size());
1181 assertTrue(MapReduceI.WORKER_START.contains(MapReduce.Stage.MAP) && MapReduceI.WORKER_START.contains(MapReduce.Stage.COMBINE) && MapReduceI.WORKER_START.contains(MapReduce.Stage.REDUCE));
1182 }
1183 if (MapReduceI.WORKER_END.size() == 2) {
1184 assertEquals(2, MapReduceI.WORKER_END.size());
1185 assertTrue(MapReduceI.WORKER_END.contains(MapReduce.Stage.MAP) && MapReduceI.WORKER_END.contains(MapReduce.Stage.REDUCE));
1186 } else {
1187 assertEquals(3, MapReduceI.WORKER_END.size());
1188 assertTrue(MapReduceI.WORKER_END.contains(MapReduce.Stage.MAP) && MapReduceI.WORKER_END.contains(MapReduce.Stage.COMBINE) && MapReduceI.WORKER_END.contains(MapReduce.Stage.REDUCE));
1189 }
1190 }
1191
1192 public static class VertexProgramJ extends StaticVertexProgram {
1193
1194
1195 @Override
1196 public void setup(final Memory memory) {
1197 memory.set("test", memory.getIteration());
1198 }
1199
1200 @Override
1201 public void workerIterationStart(final Memory memory) {
1202 assertEquals(memory.getIteration() * 6, memory.<Integer>get("test").intValue());
1203 try {
1204 memory.add("test", memory.getIteration());
1205 fail("Should throw an immutable memory exception");
1206 } catch (IllegalStateException e) {
1207 assertEquals(Memory.Exceptions.memoryIsCurrentlyImmutable().getMessage(), e.getMessage());
1208 }
1209 }
1210
1211 @Override
1212 public void execute(Vertex vertex, Messenger messenger, Memory memory) {
1213 assertEquals(memory.getIteration() * 6, memory.<Integer>get("test").intValue());
1214 memory.add("test", 1);
1215 }
1216
1217 @Override
1218 public boolean terminate(final Memory memory) {
1219 return memory.getIteration() > 3;
1220 }
1221
1222 @Override
1223 public void workerIterationEnd(final Memory memory) {
1224 assertEquals(memory.getIteration() * 6, memory.<Integer>get("test").intValue());
1225 try {
1226 memory.set("test", memory.getIteration());
1227 fail("Should throw an immutable memory exception");
1228 } catch (IllegalStateException e) {
1229 assertEquals(Memory.Exceptions.memoryIsCurrentlyImmutable().getMessage(), e.getMessage());
1230 }
1231 }
1232
1233 @Override
1234 public Set<MemoryComputeKey> getMemoryComputeKeys() {
1235 return Collections.singleton(MemoryComputeKey.of("test", Operator.sum, true, false));
1236 }
1237
1238 @Override
1239 public Set<MessageScope> getMessageScopes(Memory memory) {
1240 return Collections.emptySet();
1241 }
1242
1243 @Override
1244 public GraphComputer.ResultGraph getPreferredResultGraph() {
1245 return GraphComputer.ResultGraph.NEW;
1246 }
1247
1248 @Override
1249 public GraphComputer.Persist getPreferredPersist() {
1250 return GraphComputer.Persist.NOTHING;
1251 }
1252 }
1253
1254 private static class MapReduceI extends StaticMapReduce<MapReduce.NullObject, Integer, MapReduce.NullObject, Integer, Integer> {
1255
1256 private static final Set<Stage> WORKER_START = new ConcurrentSkipListSet<>();
1257 private static final Set<Stage> WORKER_END = new ConcurrentSkipListSet<>();
1258
1259 @Override
1260 public boolean doStage(final Stage stage) {
1261 return true;
1262 }
1263
1264 @Override
1265 public void workerStart(final Stage stage) {
1266 WORKER_START.add(stage);
1267 if (!stage.equals(Stage.MAP))
1268 assertFalse(WORKER_END.isEmpty());
1269 }
1270
1271 @Override
1272 public void map(final Vertex vertex, final MapEmitter<NullObject, Integer> emitter) {
1273 emitter.emit(1);
1274 assertEquals(1, WORKER_START.size());
1275 assertTrue(WORKER_START.contains(Stage.MAP));
1276 }
1277
1278 @Override
1279 public void combine(final NullObject key, final Iterator<Integer> values, final ReduceEmitter<NullObject, Integer> emitter) {
1280 emitter.emit(2);
1281 assertEquals(2, WORKER_START.size());
1282 assertTrue(WORKER_START.contains(Stage.MAP) && WORKER_START.contains(Stage.COMBINE));
1283 assertFalse(WORKER_END.isEmpty());
1284 }
1285
1286 @Override
1287 public void reduce(final NullObject key, final Iterator<Integer> values, final ReduceEmitter<NullObject, Integer> emitter) {
1288 emitter.emit(3);
1289 if (WORKER_START.size() == 2) {
1290 assertEquals(2, WORKER_START.size());
1291 assertTrue(WORKER_START.contains(Stage.MAP) && WORKER_START.contains(Stage.REDUCE));
1292 } else {
1293 assertEquals(3, WORKER_START.size());
1294 assertTrue(WORKER_START.contains(Stage.MAP) && WORKER_START.contains(Stage.COMBINE) && WORKER_START.contains(Stage.REDUCE));
1295 }
1296 assertFalse(WORKER_END.isEmpty());
1297 }
1298
1299 @Override
1300 public void workerEnd(final Stage stage) {
1301 assertFalse(WORKER_START.isEmpty());
1302 if (!stage.equals(Stage.MAP))
1303 assertFalse(WORKER_END.isEmpty());
1304 WORKER_END.add(stage);
1305 }
1306
1307 @Override
1308 public Integer generateFinalResult(final Iterator<KeyValue<NullObject, Integer>> keyValues) {
1309 assertEquals(3, keyValues.next().getValue().intValue());
1310 return 3;
1311 }
1312
1313 @Override
1314 public String getMemoryKey() {
1315 return "a";
1316 }
1317 }
1318
1319 /////////////////////////////////////////////
1320
1321 /////////////////////////////////////////////
1322 @Test
1323 @LoadGraphWith
1324 public void shouldSupportPersistResultGraphPairsStatedInFeatures() throws Exception {
1325 for (final GraphComputer.ResultGraph resultGraph : GraphComputer.ResultGraph.values()) {
1326 for (final GraphComputer.Persist persist : GraphComputer.Persist.values()) {
1327 final GraphComputer computer = graphProvider.getGraphComputer(graph);
1328 if (computer.features().supportsResultGraphPersistCombination(resultGraph, persist)) {
1329 computer.program(new VertexProgramK()).result(resultGraph).persist(persist).submit().get();
1330 } else {
1331 try {
1332 computer.program(new VertexProgramK()).result(resultGraph).persist(persist).submit().get();
1333 fail("The GraphComputer " + computer + " states that it does support the following resultGraph/persist pair: " + resultGraph + ":" + persist);
1334 } catch (final IllegalArgumentException e) {
1335 assertEquals(GraphComputer.Exceptions.resultGraphPersistCombinationNotSupported(resultGraph, persist).getMessage(), e.getMessage());
1336 }
1337 }
1338 }
1339 }
1340 }
1341
1342 @Test
1343 @LoadGraphWith(MODERN)
1344 public void shouldProcessResultGraphNewWithPersistNothing() throws Exception {
1345 final GraphComputer computer = graphProvider.getGraphComputer(graph);
1346 if (computer.features().supportsResultGraphPersistCombination(GraphComputer.ResultGraph.NEW, GraphComputer.Persist.NOTHING)) {
1347 final ComputerResult result = computer.program(new VertexProgramK()).result(GraphComputer.ResultGraph.NEW).persist(GraphComputer.Persist.NOTHING).submit().get();
1348 assertEquals(Long.valueOf(0l), result.graph().traversal().V().count().next());
1349 assertEquals(Long.valueOf(0l), result.graph().traversal().E().count().next());
1350 assertEquals(Long.valueOf(0l), result.graph().traversal().V().values().count().next());
1351 assertEquals(Long.valueOf(0l), result.graph().traversal().E().values().count().next());
1352 assertEquals(0, result.graph().traversal().V().values("money").sum().next());
1353 ///
1354 assertEquals(Long.valueOf(6l), graph.traversal().V().count().next());
1355 assertEquals(Long.valueOf(6l), graph.traversal().E().count().next());
1356 assertEquals(Long.valueOf(12l), graph.traversal().V().values().count().next());
1357 assertEquals(Long.valueOf(6l), graph.traversal().E().values().count().next());
1358 assertEquals(0, graph.traversal().V().values("money").sum().next());
1359 }
1360 }
1361
1362 @Test
1363 @LoadGraphWith(MODERN)
1364 public void shouldProcessResultGraphNewWithPersistVertexProperties() throws Exception {
1365 final GraphComputer computer = graphProvider.getGraphComputer(graph);
1366 if (computer.features().supportsResultGraphPersistCombination(GraphComputer.ResultGraph.NEW, GraphComputer.Persist.VERTEX_PROPERTIES)) {
1367 final ComputerResult result = computer.program(new VertexProgramK()).result(GraphComputer.ResultGraph.NEW).persist(GraphComputer.Persist.VERTEX_PROPERTIES).submit().get();
1368 assertEquals(Long.valueOf(6l), result.graph().traversal().V().count().next());
1369 assertEquals(Long.valueOf(0l), result.graph().traversal().E().count().next());
1370 assertEquals(Long.valueOf(18l), result.graph().traversal().V().values().count().next());
1371 assertEquals(Long.valueOf(0l), result.graph().traversal().E().values().count().next());
1372 assertEquals(28l, result.graph().traversal().V().values("money").sum().next());
1373 ///
1374 assertEquals(Long.valueOf(6l), graph.traversal().V().count().next());
1375 assertEquals(Long.valueOf(6l), graph.traversal().E().count().next());
1376 assertEquals(Long.valueOf(12l), graph.traversal().V().values().count().next());
1377 assertEquals(Long.valueOf(6l), graph.traversal().E().values().count().next());
1378 assertEquals(0, graph.traversal().V().values("money").sum().next());
1379 }
1380 }
1381
1382 @Test
1383 @LoadGraphWith(MODERN)
1384 public void shouldProcessResultGraphNewWithPersistEdges() throws Exception {
1385 final GraphComputer computer = graphProvider.getGraphComputer(graph);
1386 if (computer.features().supportsResultGraphPersistCombination(GraphComputer.ResultGraph.NEW, GraphComputer.Persist.EDGES)) {
1387 final ComputerResult result = computer.program(new VertexProgramK()).result(GraphComputer.ResultGraph.NEW).persist(GraphComputer.Persist.EDGES).submit().get();
1388 assertEquals(Long.valueOf(6l), result.graph().traversal().V().count().next());
1389 assertEquals(Long.valueOf(6l), result.graph().traversal().E().count().next());
1390 assertEquals(Long.valueOf(18l), result.graph().traversal().V().values().count().next());
1391 assertEquals(Long.valueOf(6l), result.graph().traversal().E().values().count().next());
1392 assertEquals(28l, result.graph().traversal().V().values("money").sum().next());
1393 ///
1394 assertEquals(Long.valueOf(6l), graph.traversal().V().count().next());
1395 assertEquals(Long.valueOf(6l), graph.traversal().E().count().next());
1396 assertEquals(Long.valueOf(12l), graph.traversal().V().values().count().next());
1397 assertEquals(Long.valueOf(6l), graph.traversal().E().values().count().next());
1398 assertEquals(0, graph.traversal().V().values("money").sum().next());
1399 }
1400 }
1401
1402 @Test
1403 @LoadGraphWith(MODERN)
1404 public void shouldProcessResultGraphOriginalWithPersistNothing() throws Exception {
1405 final GraphComputer computer = graphProvider.getGraphComputer(graph);
1406 if (computer.features().supportsResultGraphPersistCombination(GraphComputer.ResultGraph.ORIGINAL, GraphComputer.Persist.NOTHING)) {
1407 final ComputerResult result = computer.program(new VertexProgramK()).result(GraphComputer.ResultGraph.ORIGINAL).persist(GraphComputer.Persist.NOTHING).submit().get();
1408 assertEquals(Long.valueOf(6l), result.graph().traversal().V().count().next());
1409 assertEquals(Long.valueOf(6l), result.graph().traversal().E().count().next());
1410 assertEquals(Long.valueOf(12l), result.graph().traversal().V().values().count().next());
1411 assertEquals(Long.valueOf(6l), result.graph().traversal().E().values().count().next());
1412 assertEquals(0, result.graph().traversal().V().values("money").sum().next());
1413 ///
1414 assertEquals(Long.valueOf(6l), graph.traversal().V().count().next());
1415 assertEquals(Long.valueOf(6l), graph.traversal().E().count().next());
1416 assertEquals(Long.valueOf(12l), graph.traversal().V().values().count().next());
1417 assertEquals(Long.valueOf(6l), graph.traversal().E().values().count().next());
1418 assertEquals(0, graph.traversal().V().values("money").sum().next());
1419 }
1420 }
1421
1422 @Test
1423 @LoadGraphWith(MODERN)
1424 public void shouldProcessResultGraphOriginalWithPersistVertexProperties() throws Exception {
1425 final GraphComputer computer = graphProvider.getGraphComputer(graph);
1426 if (computer.features().supportsResultGraphPersistCombination(GraphComputer.ResultGraph.ORIGINAL, GraphComputer.Persist.VERTEX_PROPERTIES)) {
1427 final ComputerResult result = computer.program(new VertexProgramK()).result(GraphComputer.ResultGraph.ORIGINAL).persist(GraphComputer.Persist.VERTEX_PROPERTIES).submit().get();
1428 assertEquals(Long.valueOf(6l), result.graph().traversal().V().count().next());
1429 assertEquals(Long.valueOf(6l), result.graph().traversal().E().count().next());
1430 assertEquals(Long.valueOf(18l), result.graph().traversal().V().values().count().next());
1431 assertEquals(Long.valueOf(6l), result.graph().traversal().E().values().count().next());
1432 assertEquals(28l, result.graph().traversal().V().values("money").sum().next());
1433 ///
1434 assertEquals(Long.valueOf(6l), graph.traversal().V().count().next());
1435 assertEquals(Long.valueOf(6l), graph.traversal().E().count().next());
1436 assertEquals(Long.valueOf(18l), graph.traversal().V().values().count().next());
1437 assertEquals(Long.valueOf(6l), graph.traversal().E().values().count().next());
1438 assertEquals(28l, graph.traversal().V().values("money").sum().next());
1439 }
1440 }
1441
1442 @Test
1443 @LoadGraphWith(MODERN)
1444 public void shouldProcessResultGraphOriginalWithPersistEdges() throws Exception {
1445 final GraphComputer computer = graphProvider.getGraphComputer(graph);
1446 if (computer.features().supportsResultGraphPersistCombination(GraphComputer.ResultGraph.ORIGINAL, GraphComputer.Persist.EDGES)) {
1447 final ComputerResult result = computer.program(new VertexProgramK()).result(GraphComputer.ResultGraph.ORIGINAL).persist(GraphComputer.Persist.EDGES).submit().get();
1448 assertEquals(Long.valueOf(6l), result.graph().traversal().V().count().next());
1449 assertEquals(Long.valueOf(6l), result.graph().traversal().E().count().next());
1450 assertEquals(Long.valueOf(18l), result.graph().traversal().V().values().count().next());
1451 assertEquals(Long.valueOf(6l), result.graph().traversal().E().values().count().next());
1452 assertEquals(28l, result.graph().traversal().V().values("money").sum().next());
1453 ///
1454 assertEquals(Long.valueOf(6l), graph.traversal().V().count().next());
1455 assertEquals(Long.valueOf(6l), graph.traversal().E().count().next());
1456 assertEquals(Long.valueOf(18l), graph.traversal().V().values().count().next());
1457 assertEquals(Long.valueOf(6l), graph.traversal().E().values().count().next());
1458 assertEquals(28l, graph.traversal().V().values("money").sum().next());
1459 }
1460 }
1461
1462 public static class VertexProgramK extends StaticVertexProgram {
1463
1464
1465 @Override
1466 public void setup(final Memory memory) {
1467
1468 }
1469
1470 @Override
1471 public void execute(final Vertex vertex, final Messenger messenger, final Memory memory) {
1472 vertex.property("money", vertex.<String>value("name").length());
1473 }
1474
1475 @Override
1476 public boolean terminate(final Memory memory) {
1477 return true;
1478 }
1479
1480 @Override
1481 public Set<VertexComputeKey> getVertexComputeKeys() {
1482 return Collections.singleton(VertexComputeKey.of("money", false));
1483 }
1484
1485 @Override
1486 public Set<MessageScope> getMessageScopes(Memory memory) {
1487 return Collections.emptySet();
1488 }
1489
1490 @Override
1491 public GraphComputer.ResultGraph getPreferredResultGraph() {
1492 return GraphComputer.ResultGraph.NEW;
1493 }
1494
1495 @Override
1496 public GraphComputer.Persist getPreferredPersist() {
1497 return GraphComputer.Persist.EDGES;
1498 }
1499 }
1500
1501 /////////////////////////////////////////////
1502
1503 @Test
1504 @LoadGraphWith(GRATEFUL)
1505 public void shouldSupportWorkerCount() throws Exception {
1506 int maxWorkers = graphProvider.getGraphComputer(graph).features().getMaxWorkers();
1507 if (maxWorkers != Integer.MAX_VALUE) {
1508 for (int i = maxWorkers + 1; i < maxWorkers + 10; i++) {
1509 try {
1510 graphProvider.getGraphComputer(graph).program(new VertexProgramL()).workers(i).submit().get();
1511 fail("Should throw a GraphComputer.Exceptions.computerRequiresMoreWorkersThanSupported() exception");
1512 } catch (final IllegalArgumentException e) {
1513 assertTrue(e.getMessage().contains("computer requires more workers"));
1514 }
1515 }
1516 }
1517 if (maxWorkers > 25) maxWorkers = 25;
1518 for (int i = 1; i <= maxWorkers; i++) {
1519 ComputerResult result = graphProvider.getGraphComputer(graph).program(new VertexProgramL()).workers(i).submit().get();
1520 assertEquals(Integer.valueOf(i).longValue(), (long) result.memory().get("workerCount"));
1521 }
1522 }
1523
1524 public static class VertexProgramL implements VertexProgram {
1525
1526 boolean announced = false;
1527
1528 @Override
1529 public void setup(final Memory memory) {
1530 memory.set("workerCount", 0l);
1531 }
1532
1533 @Override
1534 public void execute(final Vertex vertex, final Messenger messenger, final Memory memory) {
1535 try {
1536 Thread.sleep(1);
1537 } catch (Exception e) {
1538 throw new IllegalStateException(e.getMessage(), e);
1539 }
1540 if (!this.announced) {
1541 memory.add("workerCount", 1l);
1542 this.announced = true;
1543 }
1544 }
1545
1546 @Override
1547 public boolean terminate(final Memory memory) {
1548 return true;
1549 }
1550
1551 @Override
1552 public Set<MemoryComputeKey> getMemoryComputeKeys() {
1553 return Collections.singleton(MemoryComputeKey.of("workerCount", Operator.sum, true, false));
1554 }
1555
1556 /*public void workerIterationStart(final Memory memory) {
1557 assertEquals(0l, (long) memory.get("workerCount"));
1558 }
1559
1560 public void workerIterationEnd(final Memory memory) {
1561 assertEquals(1l, (long) memory.get("workerCount"));
1562 }*/
1563
1564 @Override
1565 public Set<MessageScope> getMessageScopes(Memory memory) {
1566 return Collections.emptySet();
1567 }
1568
1569 @Override
1570 public GraphComputer.ResultGraph getPreferredResultGraph() {
1571 return GraphComputer.ResultGraph.NEW;
1572 }
1573
1574 @Override
1575 public GraphComputer.Persist getPreferredPersist() {
1576 return GraphComputer.Persist.NOTHING;
1577 }
1578
1579 @Override
1580 @SuppressWarnings("CloneDoesntCallSuperClone,CloneDoesntDeclareCloneNotSupportedException")
1581 public VertexProgramL clone() {
1582 return new VertexProgramL();
1583 }
1584
1585 @Override
1586 public void storeState(final Configuration configuration) {
1587 VertexProgram.super.storeState(configuration);
1588 }
1589 }
1590
1591 /////////////////////////////////////////////
1592 @Test
1593 @LoadGraphWith(MODERN)
1594 public void shouldSupportMultipleScopes() throws ExecutionException, InterruptedException {
1595 final ComputerResult result = graphProvider.getGraphComputer(graph).program(new MultiScopeVertexProgram()).submit().get();
1596 assertEquals(result.graph().traversal().V().has("name", "josh").next().property(MultiScopeVertexProgram.MEMORY_KEY).value(), 0L);
1597 assertEquals(result.graph().traversal().V().has("name", "lop").next().property(MultiScopeVertexProgram.MEMORY_KEY).value(), 1L);
1598 assertEquals(result.graph().traversal().V().has("name", "ripple").next().property(MultiScopeVertexProgram.MEMORY_KEY).value(), 1L);
1599 assertEquals(result.graph().traversal().V().has("name", "marko").next().property(MultiScopeVertexProgram.MEMORY_KEY).value(), 2L);
1600 }
1601
1602 public static class MultiScopeVertexProgram extends StaticVertexProgram<Long> {
1603
1604 private final MessageScope.Local<Long> countMessageScopeIn = MessageScope.Local.of(__::inE);
1605 private final MessageScope.Local<Long> countMessageScopeOut = MessageScope.Local.of(__::outE);
1606
1607 private static final String MEMORY_KEY = "count";
1608
1609
1610 @Override
1611 public void setup(final Memory memory) {
1612 }
1613
1614 @Override
1615 public GraphComputer.Persist getPreferredPersist() {
1616 return GraphComputer.Persist.VERTEX_PROPERTIES;
1617 }
1618
1619 @Override
1620 public Set<VertexComputeKey> getVertexComputeKeys() {
1621 return Collections.singleton(VertexComputeKey.of(MEMORY_KEY, false));
1622 }
1623
1624 @Override
1625 public Set<MessageScope> getMessageScopes(final Memory memory) {
1626 HashSet<MessageScope> scopes = new HashSet<>();
1627 scopes.add(countMessageScopeIn);
1628 scopes.add(countMessageScopeOut);
1629 return scopes;
1630 }
1631
1632 @Override
1633 public void execute(Vertex vertex, Messenger<Long> messenger, Memory memory) {
1634 switch (memory.getIteration()) {
1635 case 0:
1636 if (vertex.value("name").equals("josh")) {
1637 messenger.sendMessage(this.countMessageScopeIn, 2L);
1638 messenger.sendMessage(this.countMessageScopeOut, 1L);
1639 }
1640 break;
1641 case 1:
1642 long edgeCount = IteratorUtils.reduce(messenger.receiveMessages(), 0L, (a, b) -> a + b);
1643 vertex.property(MEMORY_KEY, edgeCount);
1644 break;
1645 }
1646 }
1647
1648 @Override
1649 public boolean terminate(final Memory memory) {
1650 return memory.getIteration() == 1;
1651 }
1652
1653 @Override
1654 public GraphComputer.ResultGraph getPreferredResultGraph() {
1655 return GraphComputer.ResultGraph.NEW;
1656 }
1657 }
1658
1659 /////////////////////////////////////////////
1660
1661 /////////////////////////////////////////////
1662 @Test
1663 @LoadGraphWith(MODERN)
1664 public void shouldSupportMultipleScopesWithEdgeFunction() throws ExecutionException, InterruptedException {
1665 final ComputerResult result = graphProvider.getGraphComputer(graph).program(new MultiScopeVertexWithEdgeFunctionProgram()).submit().get();
1666 assertEquals(result.graph().traversal().V().has("name", "josh").next().property(MultiScopeVertexProgram.MEMORY_KEY).value(), 0L);
1667 assertEquals(result.graph().traversal().V().has("name", "lop").next().property(MultiScopeVertexProgram.MEMORY_KEY).value(), 4L);
1668 assertEquals(result.graph().traversal().V().has("name", "ripple").next().property(MultiScopeVertexProgram.MEMORY_KEY).value(), 10L);
1669 assertEquals(result.graph().traversal().V().has("name", "marko").next().property(MultiScopeVertexProgram.MEMORY_KEY).value(), 20L);
1670 }
1671
1672 public static class MultiScopeVertexWithEdgeFunctionProgram extends StaticVertexProgram<Long> {
1673
1674 private final MessageScope.Local<Long> countMessageScopeIn = MessageScope.Local.of(__::inE, (m,e) -> m * Math.round(((Double) e.values("weight").next()) * 10));
1675 private final MessageScope.Local<Long> countMessageScopeOut = MessageScope.Local.of(__::outE, (m,e) -> m * Math.round(((Double) e.values("weight").next()) * 10));
1676
1677 private static final String MEMORY_KEY = "count";
1678
1679
1680 @Override
1681 public void setup(final Memory memory) {
1682 }
1683
1684 @Override
1685 public GraphComputer.Persist getPreferredPersist() {
1686 return GraphComputer.Persist.VERTEX_PROPERTIES;
1687 }
1688
1689 @Override
1690 public Set<VertexComputeKey> getVertexComputeKeys() {
1691 return Collections.singleton(VertexComputeKey.of(MEMORY_KEY, false));
1692 }
1693
1694 @Override
1695 public Set<MessageScope> getMessageScopes(final Memory memory) {
1696 HashSet<MessageScope> scopes = new HashSet<>();
1697 scopes.add(countMessageScopeIn);
1698 scopes.add(countMessageScopeOut);
1699 return scopes;
1700 }
1701
1702 @Override
1703 public void execute(Vertex vertex, Messenger<Long> messenger, Memory memory) {
1704 switch (memory.getIteration()) {
1705 case 0:
1706 if (vertex.value("name").equals("josh")) {
1707 messenger.sendMessage(this.countMessageScopeIn, 2L);
1708 messenger.sendMessage(this.countMessageScopeOut, 1L);
1709 }
1710 break;
1711 case 1:
1712 long edgeCount = IteratorUtils.reduce(messenger.receiveMessages(), 0L, (a, b) -> a + b);
1713 vertex.property(MEMORY_KEY, edgeCount);
1714 break;
1715 }
1716 }
1717
1718 @Override
1719 public boolean terminate(final Memory memory) {
1720 return memory.getIteration() == 1;
1721 }
1722
1723 @Override
1724 public GraphComputer.ResultGraph getPreferredResultGraph() {
1725 return GraphComputer.ResultGraph.NEW;
1726 }
1727 }
1728
1729 /////////////////////////////////////////////
1730
1731 @Test
1732 @LoadGraphWith(MODERN)
1733 public void shouldSupportGraphFilter() throws Exception {
1734 // if the graph computer does not support graph filter, then make sure its exception handling is correct
1735 if (!graphProvider.getGraphComputer(graph).features().supportsGraphFilter()) {
1736 try {
1737 graphProvider.getGraphComputer(graph).vertices(__.hasLabel("software"));
1738 fail("Should throw an unsupported operation exception");
1739 } catch (final UnsupportedOperationException e) {
1740 assertEquals(GraphComputer.Exceptions.graphFilterNotSupported().getMessage(), e.getMessage());
1741 }
1742 try {
1743 graphProvider.getGraphComputer(graph).edges(__.<Vertex>outE().limit(10));
1744 fail("Should throw an unsupported operation exception");
1745 } catch (final UnsupportedOperationException e) {
1746 assertEquals(GraphComputer.Exceptions.graphFilterNotSupported().getMessage(), e.getMessage());
1747 }
1748 return;
1749 }
1750 /// VERTEX PROGRAM
1751 graphProvider.getGraphComputer(graph).vertices(__.hasLabel("software")).program(new VertexProgramM(VertexProgramM.SOFTWARE_ONLY)).submit().get();
1752 graphProvider.getGraphComputer(graph).vertices(__.hasLabel("person")).program(new VertexProgramM(VertexProgramM.PEOPLE_ONLY)).submit().get();
1753 graphProvider.getGraphComputer(graph).edges(__.bothE("knows")).program(new VertexProgramM(VertexProgramM.KNOWS_ONLY)).submit().get();
1754 graphProvider.getGraphComputer(graph).vertices(__.hasLabel("person")).edges(__.bothE("knows")).program(new VertexProgramM(VertexProgramM.PEOPLE_KNOWS_ONLY)).submit().get();
1755 graphProvider.getGraphComputer(graph).vertices(__.hasLabel("person")).edges(__.<Vertex>bothE("knows").has("weight", P.gt(0.5f))).program(new VertexProgramM(VertexProgramM.PEOPLE_KNOWS_WELL_ONLY)).submit().get();
1756 graphProvider.getGraphComputer(graph).edges(__.<Vertex>bothE().limit(0)).program(new VertexProgramM(VertexProgramM.VERTICES_ONLY)).submit().get();
1757 graphProvider.getGraphComputer(graph).edges(__.<Vertex>outE().limit(1)).program(new VertexProgramM(VertexProgramM.ONE_OUT_EDGE_ONLY)).submit().get();
1758 graphProvider.getGraphComputer(graph).edges(outE()).program(new VertexProgramM(VertexProgramM.OUT_EDGES_ONLY)).submit().get();
1759
1760 /// VERTEX PROGRAM + MAP REDUCE
1761 graphProvider.getGraphComputer(graph).vertices(__.hasLabel("software")).program(new VertexProgramM(VertexProgramM.SOFTWARE_ONLY)).mapReduce(new MapReduceJ(VertexProgramM.SOFTWARE_ONLY)).submit().get();
1762 graphProvider.getGraphComputer(graph).vertices(__.hasLabel("person")).program(new VertexProgramM(VertexProgramM.PEOPLE_ONLY)).mapReduce(new MapReduceJ(VertexProgramM.PEOPLE_ONLY)).submit().get();
1763 graphProvider.getGraphComputer(graph).edges(__.bothE("knows")).program(new VertexProgramM(VertexProgramM.KNOWS_ONLY)).mapReduce(new MapReduceJ(VertexProgramM.KNOWS_ONLY)).submit().get();
1764 graphProvider.getGraphComputer(graph).vertices(__.hasLabel("person")).edges(__.bothE("knows")).program(new VertexProgramM(VertexProgramM.PEOPLE_KNOWS_ONLY)).mapReduce(new MapReduceJ(VertexProgramM.PEOPLE_KNOWS_ONLY)).submit().get();
1765 graphProvider.getGraphComputer(graph).vertices(__.hasLabel("person")).edges(__.<Vertex>bothE("knows").has("weight", P.gt(0.5f))).program(new VertexProgramM(VertexProgramM.PEOPLE_KNOWS_WELL_ONLY)).mapReduce(new MapReduceJ(VertexProgramM.PEOPLE_KNOWS_WELL_ONLY)).submit().get();
1766 graphProvider.getGraphComputer(graph).edges(__.<Vertex>bothE().limit(0)).program(new VertexProgramM(VertexProgramM.VERTICES_ONLY)).mapReduce(new MapReduceJ(VertexProgramM.VERTICES_ONLY)).submit().get();
1767 graphProvider.getGraphComputer(graph).edges(__.<Vertex>outE().limit(1)).program(new VertexProgramM(VertexProgramM.ONE_OUT_EDGE_ONLY)).mapReduce(new MapReduceJ(VertexProgramM.ONE_OUT_EDGE_ONLY)).submit().get();
1768 graphProvider.getGraphComputer(graph).edges(outE()).program(new VertexProgramM(VertexProgramM.OUT_EDGES_ONLY)).mapReduce(new MapReduceJ(VertexProgramM.OUT_EDGES_ONLY)).submit().get();
1769
1770 /// MAP REDUCE ONLY
1771 graphProvider.getGraphComputer(graph).vertices(__.hasLabel("software")).mapReduce(new MapReduceJ(VertexProgramM.SOFTWARE_ONLY)).submit().get();
1772 graphProvider.getGraphComputer(graph).vertices(__.hasLabel("person")).mapReduce(new MapReduceJ(VertexProgramM.PEOPLE_ONLY)).submit().get();
1773 graphProvider.getGraphComputer(graph).edges(__.bothE("knows")).mapReduce(new MapReduceJ(VertexProgramM.KNOWS_ONLY)).submit().get();
1774 graphProvider.getGraphComputer(graph).vertices(__.hasLabel("person")).edges(__.bothE("knows")).mapReduce(new MapReduceJ(VertexProgramM.PEOPLE_KNOWS_ONLY)).submit().get();
1775 graphProvider.getGraphComputer(graph).vertices(__.hasLabel("person")).edges(__.<Vertex>bothE("knows").has("weight", P.gt(0.5f))).mapReduce(new MapReduceJ(VertexProgramM.PEOPLE_KNOWS_WELL_ONLY)).submit().get();
1776 graphProvider.getGraphComputer(graph).edges(__.<Vertex>bothE().limit(0)).mapReduce(new MapReduceJ(VertexProgramM.VERTICES_ONLY)).submit().get();
1777 graphProvider.getGraphComputer(graph).edges(__.<Vertex>outE().limit(1)).mapReduce(new MapReduceJ(VertexProgramM.ONE_OUT_EDGE_ONLY)).submit().get();
1778 graphProvider.getGraphComputer(graph).edges(outE()).mapReduce(new MapReduceJ(VertexProgramM.OUT_EDGES_ONLY)).submit().get();
1779
1780 // EXCEPTION HANDLING
1781 try {
1782 graphProvider.getGraphComputer(graph).vertices(__.out());
1783 fail();
1784 } catch (final IllegalArgumentException e) {
1785 assertEquals(e.getMessage(), GraphComputer.Exceptions.vertexFilterAccessesIncidentEdges(__.out()).getMessage());
1786 }
1787 try {
1788 graphProvider.getGraphComputer(graph).edges(__.<Vertex>out().outE());
1789 fail();
1790 } catch (final IllegalArgumentException e) {
1791 assertEquals(e.getMessage(), GraphComputer.Exceptions.edgeFilterAccessesAdjacentVertices(__.<Vertex>out().outE()).getMessage());
1792 }
1793 }
1794
1795 public static class VertexProgramM implements VertexProgram {
1796
1797 public static final String SOFTWARE_ONLY = "softwareOnly";
1798 public static final String PEOPLE_ONLY = "peopleOnly";
1799 public static final String KNOWS_ONLY = "knowsOnly";
1800 public static final String PEOPLE_KNOWS_ONLY = "peopleKnowsOnly";
1801 public static final String PEOPLE_KNOWS_WELL_ONLY = "peopleKnowsWellOnly";
1802 public static final String VERTICES_ONLY = "verticesOnly";
1803 public static final String ONE_OUT_EDGE_ONLY = "oneOutEdgeOnly";
1804 public static final String OUT_EDGES_ONLY = "outEdgesOnly";
1805
1806 private String state;
1807
1808 public VertexProgramM() {
1809
1810 }
1811
1812 public VertexProgramM(final String state) {
1813 this.state = state;
1814 }
1815
1816 @Override
1817 public void setup(final Memory memory) {
1818
1819 }
1820
1821 @Override
1822 public void execute(final Vertex vertex, final Messenger messenger, final Memory memory) {
1823 switch (this.state) {
1824 case SOFTWARE_ONLY: {
1825 assertEquals("software", vertex.label());
1826 assertFalse(vertex.edges(Direction.OUT).hasNext());
1827 assertTrue(vertex.edges(Direction.IN).hasNext());
1828 assertTrue(vertex.edges(Direction.IN, "created").hasNext());
1829 assertFalse(vertex.edges(Direction.IN, "knows").hasNext());
1830 break;
1831 }
1832 case PEOPLE_ONLY: {
1833 assertEquals("person", vertex.label());
1834 assertFalse(vertex.edges(Direction.IN, "created").hasNext());
1835 assertTrue(IteratorUtils.count(vertex.edges(Direction.BOTH)) > 0);
1836 break;
1837 }
1838 case KNOWS_ONLY: {
1839 assertEquals(0, IteratorUtils.count(vertex.edges(Direction.BOTH, "created")));
1840 if (vertex.value("name").equals("marko"))
1841 assertEquals(2, IteratorUtils.count(vertex.edges(Direction.BOTH, "knows")));
1842 else if (vertex.value("name").equals("vadas"))
1843 assertEquals(1, IteratorUtils.count(vertex.edges(Direction.IN, "knows")));
1844 else if (vertex.value("name").equals("josh"))
1845 assertEquals(1, IteratorUtils.count(vertex.edges(Direction.IN, "knows")));
1846 else {
1847 assertEquals(0, IteratorUtils.count(vertex.edges(Direction.BOTH, "knows")));
1848 assertEquals(0, IteratorUtils.count(vertex.edges(Direction.BOTH)));
1849 }
1850 break;
1851 }
1852 case PEOPLE_KNOWS_ONLY: {
1853 assertEquals("person", vertex.label());
1854 assertEquals(0, IteratorUtils.count(vertex.edges(Direction.BOTH, "created")));
1855 if (vertex.value("name").equals("marko"))
1856 assertEquals(2, IteratorUtils.count(vertex.edges(Direction.BOTH, "knows")));
1857 else if (vertex.value("name").equals("vadas"))
1858 assertEquals(1, IteratorUtils.count(vertex.edges(Direction.IN, "knows")));
1859 else if (vertex.value("name").equals("josh"))
1860 assertEquals(1, IteratorUtils.count(vertex.edges(Direction.IN, "knows")));
1861 else {
1862 assertEquals(0, IteratorUtils.count(vertex.edges(Direction.BOTH, "knows")));
1863 assertEquals(0, IteratorUtils.count(vertex.edges(Direction.BOTH)));
1864 }
1865 break;
1866 }
1867 case PEOPLE_KNOWS_WELL_ONLY: {
1868 assertEquals("person", vertex.label());
1869 assertEquals(0, IteratorUtils.count(vertex.edges(Direction.BOTH, "created")));
1870 if (vertex.value("name").equals("marko")) {
1871 assertEquals(1, IteratorUtils.count(vertex.edges(Direction.BOTH, "knows")));
1872 assertEquals(1.0, vertex.edges(Direction.OUT, "knows").next().value("weight"), 0.001);
1873 } else if (vertex.value("name").equals("vadas"))
1874 assertEquals(0, IteratorUtils.count(vertex.edges(Direction.IN, "knows")));
1875 else if (vertex.value("name").equals("josh")) {
1876 assertEquals(1, IteratorUtils.count(vertex.edges(Direction.IN, "knows")));
1877 assertEquals(1.0, vertex.edges(Direction.IN, "knows").next().value("weight"), 0.001);
1878 } else {
1879 assertEquals(0, IteratorUtils.count(vertex.edges(Direction.BOTH, "knows")));
1880 assertEquals(0, IteratorUtils.count(vertex.edges(Direction.BOTH)));
1881 }
1882 break;
1883 }
1884 case VERTICES_ONLY: {
1885 assertEquals(0, IteratorUtils.count(vertex.edges(Direction.BOTH)));
1886 break;
1887 }
1888 case ONE_OUT_EDGE_ONLY: {
1889 if (vertex.label().equals("software") || vertex.value("name").equals("vadas"))
1890 assertEquals(0, IteratorUtils.count(vertex.edges(Direction.BOTH)));
1891 else {
1892 assertEquals(1, IteratorUtils.count(vertex.edges(Direction.OUT)));
1893 assertEquals(0, IteratorUtils.count(vertex.edges(Direction.IN)));
1894 assertEquals(1, IteratorUtils.count(vertex.edges(Direction.BOTH)));
1895 }
1896 break;
1897 }
1898 case OUT_EDGES_ONLY: {
1899 if (vertex.label().equals("software") || vertex.value("name").equals("vadas"))
1900 assertEquals(0, IteratorUtils.count(vertex.edges(Direction.BOTH)));
1901 else {
1902 assertTrue(IteratorUtils.count(vertex.edges(Direction.OUT)) > 0);
1903 assertEquals(0, IteratorUtils.count(vertex.edges(Direction.IN)));
1904 assertEquals(IteratorUtils.count(vertex.edges(Direction.OUT)), IteratorUtils.count(vertex.edges(Direction.BOTH)));
1905 }
1906 break;
1907 }
1908 default:
1909 throw new IllegalStateException("This is an illegal state for this test case: " + this.state);
1910 }
1911 }
1912
1913 @Override
1914 public boolean terminate(final Memory memory) {
1915 return true;
1916 }
1917
1918 @Override
1919 public Set<MessageScope> getMessageScopes(Memory memory) {
1920 return Collections.emptySet();
1921 }
1922
1923 @Override
1924 public GraphComputer.ResultGraph getPreferredResultGraph() {
1925 return GraphComputer.ResultGraph.NEW;
1926 }
1927
1928 @Override
1929 public GraphComputer.Persist getPreferredPersist() {
1930 return GraphComputer.Persist.NOTHING;
1931 }
1932
1933 @Override
1934 @SuppressWarnings("CloneDoesntCallSuperClone,CloneDoesntDeclareCloneNotSupportedException")
1935 public VertexProgramM clone() {
1936 return new VertexProgramM(this.state);
1937 }
1938
1939 @Override
1940 public void loadState(final Graph graph, final Configuration configuration) {
1941 this.state = configuration.getString("state");
1942 }
1943
1944 @Override
1945 public void storeState(final Configuration configuration) {
1946 configuration.setProperty("state", this.state);
1947 VertexProgram.super.storeState(configuration);
1948 }
1949
1950 }
1951
1952 private static class MapReduceJ implements MapReduce<MapReduce.NullObject, Integer, MapReduce.NullObject, Integer, Integer> {
1953
1954 private String state;
1955
1956 public MapReduceJ() {
1957 }
1958
1959 public MapReduceJ(final String state) {
1960 this.state = state;
1961 }
1962
1963 @Override
1964 public void loadState(final Graph graph, final Configuration configuration) {
1965 this.state = configuration.getString("state");
1966 }
1967
1968 @Override
1969 public void storeState(final Configuration configuration) {
1970 configuration.setProperty("state", this.state);
1971 MapReduce.super.storeState(configuration);
1972 }
1973
1974 @Override
1975 @SuppressWarnings("CloneDoesntCallSuperClone,CloneDoesntDeclareCloneNotSupportedException")
1976 public MapReduceJ clone() {
1977 return new MapReduceJ(this.state);
1978 }
1979
1980 @Override
1981 public boolean doStage(final Stage stage) {
1982 return true;
1983 }
1984
1985 @Override
1986 public void map(final Vertex vertex, final MapEmitter<NullObject, Integer> emitter) {
1987 emitter.emit(1);
1988 switch (this.state) {
1989 case VertexProgramM.SOFTWARE_ONLY: {
1990 assertEquals("software", vertex.label());
1991 break;
1992 }
1993 case VertexProgramM.PEOPLE_ONLY: {
1994 assertEquals("person", vertex.label());
1995 break;
1996 }
1997 case VertexProgramM.KNOWS_ONLY: {
1998 assertTrue(vertex.label().equals("person") || vertex.label().equals("software"));
1999 break;
2000 }
2001 case VertexProgramM.PEOPLE_KNOWS_ONLY: {
2002 assertEquals("person", vertex.label());
2003 break;
2004 }
2005 case VertexProgramM.PEOPLE_KNOWS_WELL_ONLY: {
2006 assertEquals("person", vertex.label());
2007 break;
2008 }
2009 case VertexProgramM.VERTICES_ONLY: {
2010 assertTrue(vertex.label().equals("person") || vertex.label().equals("software"));
2011 break;
2012 }
2013 case VertexProgramM.ONE_OUT_EDGE_ONLY: {
2014 assertTrue(vertex.label().equals("person") || vertex.label().equals("software"));
2015 break;
2016 }
2017 case VertexProgramM.OUT_EDGES_ONLY: {
2018 assertTrue(vertex.label().equals("person") || vertex.label().equals("software"));
2019 break;
2020 }
2021 default:
2022 throw new IllegalStateException("This is an illegal state for this test case: " + this.state);
2023 }
2024 }
2025
2026 @Override
2027 public void combine(final NullObject key, final Iterator<Integer> values, final ReduceEmitter<NullObject, Integer> emitter) {
2028 this.reduce(key, values, emitter);
2029 }
2030
2031 @Override
2032 public void reduce(final NullObject key, final Iterator<Integer> values, final ReduceEmitter<NullObject, Integer> emitter) {
2033 int count = 0;
2034 while (values.hasNext()) {
2035 count = count + values.next();
2036 }
2037 emitter.emit(count);
2038 }
2039
2040 @Override
2041 public Integer generateFinalResult(final Iterator<KeyValue<NullObject, Integer>> keyValues) {
2042 int counter = keyValues.next().getValue();
2043 assertFalse(keyValues.hasNext());
2044
2045 switch (this.state) {
2046 case VertexProgramM.SOFTWARE_ONLY: {
2047 assertEquals(2, counter);
2048 break;
2049 }
2050 case VertexProgramM.PEOPLE_ONLY: {
2051 assertEquals(4, counter);
2052 break;
2053 }
2054 case VertexProgramM.KNOWS_ONLY: {
2055 assertEquals(6, counter);
2056 break;
2057 }
2058 case VertexProgramM.PEOPLE_KNOWS_ONLY: {
2059 assertEquals(4, counter);
2060 break;
2061 }
2062 case VertexProgramM.PEOPLE_KNOWS_WELL_ONLY: {
2063 assertEquals(4, counter);
2064 break;
2065 }
2066 case VertexProgramM.VERTICES_ONLY: {
2067 assertEquals(6, counter);
2068 break;
2069 }
2070 case VertexProgramM.ONE_OUT_EDGE_ONLY: {
2071 assertEquals(6, counter);
2072 break;
2073 }
2074 case VertexProgramM.OUT_EDGES_ONLY: {
2075 assertEquals(6, counter);
2076 break;
2077 }
2078 default:
2079 throw new IllegalStateException("This is an illegal state for this test case: " + this.state);
2080 }
2081 return counter;
2082 }
2083
2084 @Override
2085 public String getMemoryKey() {
2086 return "a";
2087 }
2088 }
2089
2090 @Test
2091 @LoadGraphWith(MODERN)
2092 public void shouldSupportJobChaining() throws Exception {
2093 final ComputerResult result1 = graphProvider.getGraphComputer(graph)
2094 .program(PageRankVertexProgram.build().iterations(5).create(graph)).persist(GraphComputer.Persist.EDGES).result(GraphComputer.ResultGraph.NEW).submit().get();
2095 final Graph graph1 = result1.graph();
2096 final Memory memory1 = result1.memory();
2097 assertEquals(5, memory1.getIteration());
2098 assertEquals(6, graph1.traversal().V().count().next().intValue());
2099 assertEquals(6, graph1.traversal().E().count().next().intValue());
2100 assertEquals(6, graph1.traversal().V().values(PageRankVertexProgram.PAGE_RANK).count().next().intValue());
2101 assertEquals(18, graph1.traversal().V().values().count().next().intValue());
2102 //
2103 final ComputerResult result2 = graph1.compute(graphProvider.getGraphComputer(graph1).getClass())
2104 .program(PeerPressureVertexProgram.build().maxIterations(4).create(graph1)).persist(GraphComputer.Persist.EDGES).result(GraphComputer.ResultGraph.NEW).submit().get();
2105 final Graph graph2 = result2.graph();
2106 final Memory memory2 = result2.memory();
2107 assertTrue(memory2.getIteration() <= 4);
2108 assertEquals(6, graph2.traversal().V().count().next().intValue());
2109 assertEquals(6, graph2.traversal().E().count().next().intValue());
2110 assertEquals(6, graph2.traversal().V().values(PeerPressureVertexProgram.CLUSTER).count().next().intValue());
2111 assertEquals(6, graph2.traversal().V().values(PageRankVertexProgram.PAGE_RANK).count().next().intValue());
2112 assertEquals(24, graph2.traversal().V().values().count().next().intValue());
2113 //
2114 final ComputerResult result3 = graph2.compute(graphProvider.getGraphComputer(graph2).getClass())
2115 .program(TraversalVertexProgram.build().traversal(g.V().groupCount("m").by(__.values(PageRankVertexProgram.PAGE_RANK).count()).label().asAdmin()).create(graph2)).persist(GraphComputer.Persist.EDGES).result(GraphComputer.ResultGraph.NEW).submit().get();
2116 final Graph graph3 = result3.graph();
2117 final Memory memory3 = result3.memory();
2118 assertTrue(memory3.keys().contains("m"));
2119 assertTrue(memory3.keys().contains(TraversalVertexProgram.HALTED_TRAVERSERS));
2120 assertEquals(1, memory3.<Map<Long, Long>>get("m").size());
2121 assertEquals(6, memory3.<Map<Long, Long>>get("m").get(1l).intValue());
2122 List<Traverser<String>> traversers = IteratorUtils.list(memory3.<TraverserSet>get(TraversalVertexProgram.HALTED_TRAVERSERS).iterator());
2123 assertEquals(6l, traversers.stream().map(Traverser::bulk).reduce((a, b) -> a + b).get().longValue());
2124 assertEquals(4l, traversers.stream().filter(s -> s.get().equals("person")).map(Traverser::bulk).reduce((a, b) -> a + b).get().longValue());
2125 assertEquals(2l, traversers.stream().filter(s -> s.get().equals("software")).map(Traverser::bulk).reduce((a, b) -> a + b).get().longValue());
2126 assertEquals(6, graph3.traversal().V().count().next().intValue());
2127 assertEquals(6, graph3.traversal().E().count().next().intValue());
2128 assertEquals(0, graph3.traversal().V().values(TraversalVertexProgram.HALTED_TRAVERSERS).count().next().intValue());
2129 assertEquals(6, graph3.traversal().V().values(PeerPressureVertexProgram.CLUSTER).count().next().intValue());
2130 assertEquals(6, graph3.traversal().V().values(PageRankVertexProgram.PAGE_RANK).count().next().intValue());
2131 assertEquals(24, graph3.traversal().V().values().count().next().intValue()); // no halted traversers
2132
2133 // TODO: add a test the shows DAG behavior -- splitting another TraversalVertexProgram off of the PeerPressureVertexProgram job.
2134 }
2135
2136 ///////////////////////////////////
2137
2138 @Test
2139 @LoadGraphWith(MODERN)
2140 public void shouldSupportPreExistingComputeKeys() throws Exception {
2141 final ComputerResult result = graphProvider.getGraphComputer(graph).program(new VertexProgramN()).submit().get();
2142 result.graph().vertices().forEachRemaining(vertex -> {
2143 if (vertex.label().equals("person")) {
2144 if (vertex.value("name").equals("marko"))
2145 assertEquals(32, vertex.<Integer>value("age").intValue());
2146 else if (vertex.value("name").equals("peter"))
2147 assertEquals(38, vertex.<Integer>value("age").intValue());
2148 else if (vertex.value("name").equals("vadas"))
2149 assertEquals(30, vertex.<Integer>value("age").intValue());
2150 else if (vertex.value("name").equals("josh"))
2151 assertEquals(35, vertex.<Integer>value("age").intValue());
2152 else
2153 throw new IllegalStateException("This vertex should not have been accessed: " + vertex);
2154 }
2155 });
2156 }
2157
2158 private static class VertexProgramN extends StaticVertexProgram {
2159
2160 @Override
2161 public void setup(final Memory memory) {
2162
2163 }
2164
2165 @Override
2166 public void execute(final Vertex vertex, final Messenger messenger, final Memory memory) {
2167 if (vertex.label().equals("person"))
2168 vertex.property(VertexProperty.Cardinality.single, "age", vertex.<Integer>value("age") + 1);
2169 }
2170
2171 @Override
2172 public boolean terminate(final Memory memory) {
2173 return memory.getIteration() > 1;
2174 }
2175
2176 @Override
2177 public Set<MessageScope> getMessageScopes(final Memory memory) {
2178 return Collections.emptySet();
2179 }
2180
2181 @Override
2182 public Set<VertexComputeKey> getVertexComputeKeys() {
2183 return Collections.singleton(VertexComputeKey.of("age", false));
2184 }
2185
2186 @Override
2187 public GraphComputer.ResultGraph getPreferredResultGraph() {
2188 return GraphComputer.ResultGraph.NEW;
2189 }
2190
2191 @Override
2192 public GraphComputer.Persist getPreferredPersist() {
2193 return GraphComputer.Persist.VERTEX_PROPERTIES;
2194 }
2195 }
2196
2197 ///////////////////////////////////
2198
2199 @Test
2200 @LoadGraphWith(MODERN)
2201 public void shouldSupportTransientKeys() throws Exception {
2202 final ComputerResult result = graphProvider.getGraphComputer(graph).program(new VertexProgramO()).mapReduce(new MapReduceK()).submit().get();
2203 result.graph().vertices().forEachRemaining(vertex -> {
2204 assertFalse(vertex.property("v1").isPresent());
2205 assertFalse(vertex.property("v2").isPresent());
2206 assertTrue(vertex.property("v3").isPresent());
2207 assertEquals("shouldExist", vertex.value("v3"));
2208 assertTrue(vertex.property("name").isPresent());
2209 if (vertex.label().equals("software"))
2210 assertTrue(vertex.property("lang").isPresent());
2211 else
2212 assertTrue(vertex.property("age").isPresent());
2213 assertEquals(3, IteratorUtils.count(vertex.properties()));
2214 assertEquals(0, IteratorUtils.count(vertex.properties("v1")));
2215 assertEquals(0, IteratorUtils.count(vertex.properties("v2")));
2216 assertEquals(1, IteratorUtils.count(vertex.properties("v3")));
2217 assertEquals(1, IteratorUtils.count(vertex.properties("name")));
2218 });
2219 assertEquals(6l, result.graph().traversal().V().properties("name").count().next().longValue());
2220 assertEquals(0l, result.graph().traversal().V().properties("v1").count().next().longValue());
2221 assertEquals(0l, result.graph().traversal().V().properties("v2").count().next().longValue());
2222 assertEquals(6l, result.graph().traversal().V().properties("v3").count().next().longValue());
2223 assertEquals(6l, result.graph().traversal().V().<String>values("name").dedup().count().next().longValue());
2224 assertEquals(1l, result.graph().traversal().V().<String>values("v3").dedup().count().next().longValue());
2225 assertEquals("shouldExist", result.graph().traversal().V().<String>values("v3").dedup().next());
2226 ///
2227 assertFalse(result.memory().exists("m1"));
2228 assertFalse(result.memory().exists("m2"));
2229 assertTrue(result.memory().exists("m3"));
2230 assertEquals(24l, result.memory().<Long>get("m3").longValue());
2231 assertEquals(2, result.memory().keys().size()); // mapReduceK
2232 }
2233
2234 private static class VertexProgramO extends StaticVertexProgram {
2235
2236 @Override
2237 public void setup(final Memory memory) {
2238 assertFalse(memory.exists("m1"));
2239 assertFalse(memory.exists("m2"));
2240 assertFalse(memory.exists("m3"));
2241 }
2242
2243 @Override
2244 public void execute(final Vertex vertex, final Messenger messenger, final Memory memory) {
2245 if (memory.isInitialIteration()) {
2246 assertFalse(vertex.property("v1").isPresent());
2247 assertFalse(vertex.property("v2").isPresent());
2248 assertFalse(vertex.property("v3").isPresent());
2249 vertex.property("v1", "shouldNotExist");
2250 vertex.property("v2", "shouldNotExist");
2251 vertex.property("v3", "shouldExist");
2252 assertTrue(vertex.property("v1").isPresent());
2253 assertTrue(vertex.property("v2").isPresent());
2254 assertTrue(vertex.property("v3").isPresent());
2255 assertEquals("shouldNotExist", vertex.value("v1"));
2256 assertEquals("shouldNotExist", vertex.value("v2"));
2257 assertEquals("shouldExist", vertex.value("v3"));
2258 //
2259 assertFalse(memory.exists("m1"));
2260 assertFalse(memory.exists("m2"));
2261 assertFalse(memory.exists("m3"));
2262 memory.add("m1", false);
2263 memory.add("m2", true);
2264 memory.add("m3", 2l);
2265 // should still not exist as this pulls from the master memory
2266 assertFalse(memory.exists("m1"));
2267 assertFalse(memory.exists("m2"));
2268 assertFalse(memory.exists("m3"));
2269
2270 } else {
2271 assertTrue(vertex.property("v1").isPresent());
2272 assertTrue(vertex.property("v2").isPresent());
2273 assertTrue(vertex.property("v3").isPresent());
2274 assertEquals("shouldNotExist", vertex.value("v1"));
2275 assertEquals("shouldNotExist", vertex.value("v2"));
2276 assertEquals("shouldExist", vertex.value("v3"));
2277 //
2278 assertTrue(memory.exists("m1"));
2279 assertTrue(memory.exists("m2"));
2280 assertTrue(memory.exists("m3"));
2281 assertFalse(memory.get("m1"));
2282 assertTrue(memory.get("m2"));
2283 assertEquals(12l, memory.<Long>get("m3").longValue());
2284 memory.add("m1", true);
2285 memory.add("m2", true);
2286 memory.add("m3", 2l);
2287 }
2288 }
2289
2290 @Override
2291 public boolean terminate(final Memory memory) {
2292 assertTrue(memory.exists("m1"));
2293 assertTrue(memory.exists("m2"));
2294 assertTrue(memory.exists("m3"));
2295 if (memory.isInitialIteration()) {
2296 assertFalse(memory.get("m1"));
2297 assertTrue(memory.get("m2"));
2298 assertEquals(12l, memory.<Long>get("m3").longValue());
2299 return false;
2300 } else {
2301 assertTrue(memory.get("m1"));
2302 assertTrue(memory.get("m2"));
2303 assertEquals(24l, memory.<Long>get("m3").longValue());
2304 return true;
2305 }
2306 }
2307
2308 @Override
2309 public Set<MessageScope> getMessageScopes(final Memory memory) {
2310 return Collections.emptySet();
2311 }
2312
2313 @Override
2314 public Set<MemoryComputeKey> getMemoryComputeKeys() {
2315 return new HashSet<>(Arrays.asList(
2316 MemoryComputeKey.of("m1", Operator.or, true, true),
2317 MemoryComputeKey.of("m2", Operator.and, true, true),
2318 MemoryComputeKey.of("m3", Operator.sum, true, false)));
2319 }
2320
2321 @Override
2322 public Set<VertexComputeKey> getVertexComputeKeys() {
2323 return new HashSet<>(Arrays.asList(
2324 VertexComputeKey.of("v1", true),
2325 VertexComputeKey.of("v2", true),
2326 VertexComputeKey.of("v3", false)));
2327 }
2328
2329 @Override
2330 public GraphComputer.ResultGraph getPreferredResultGraph() {
2331 return GraphComputer.ResultGraph.NEW;
2332 }
2333
2334 @Override
2335 public GraphComputer.Persist getPreferredPersist() {
2336 return GraphComputer.Persist.VERTEX_PROPERTIES;
2337 }
2338 }
2339
2340 public static class MapReduceK extends StaticMapReduce {
2341
2342 @Override
2343 public boolean doStage(final Stage stage) {
2344 return stage.equals(Stage.MAP);
2345 }
2346
2347 @Override
2348 public void map(final Vertex vertex, final MapEmitter emitter) {
2349 assertFalse(vertex.property("v1").isPresent());
2350 assertFalse(vertex.property("v2").isPresent());
2351 assertTrue(vertex.property("v3").isPresent());
2352 assertTrue(vertex.property("name").isPresent());
2353 assertEquals(3, IteratorUtils.count(vertex.properties()));
2354 assertEquals(3, IteratorUtils.count(vertex.values()));
2355 }
2356
2357 @Override
2358 public String getMemoryKey() {
2359 return "mapReduceK";
2360 }
2361
2362 @Override
2363 public Object generateFinalResult(final Iterator keyValues) {
2364 return "anObject";
2365 }
2366 }
2367
2368 ///////////////////////////////////
2369
2370 @Test
2371 @LoadGraphWith(MODERN)
2372 public void shouldSupportBroadcastKeys() throws Exception {
2373 final ComputerResult result = graphProvider.getGraphComputer(graph).program(new VertexProgramP()).submit().get();
2374 assertTrue(result.memory().exists("m1"));
2375 assertFalse(result.memory().exists("m2"));
2376 assertFalse(result.memory().exists("m3"));
2377 assertTrue(result.memory().exists("m4"));
2378 assertTrue(result.memory().get("m1"));
2379 assertEquals(-18, result.memory().<Integer>get("m4").intValue());
2380 assertEquals(2, result.memory().keys().size());
2381 }
2382
2383 private static class VertexProgramP extends StaticVertexProgram {
2384
2385 @Override
2386 public void setup(final Memory memory) {
2387 assertFalse(memory.exists("m1")); // or
2388 assertFalse(memory.exists("m2")); // and
2389 assertFalse(memory.exists("m3")); // long
2390 assertFalse(memory.exists("m4")); // int
2391 memory.set("m1", false);
2392 memory.set("m2", true);
2393 memory.set("m3", 0l);
2394 memory.set("m4", 0);
2395 }
2396
2397 @Override
2398 public void execute(final Vertex vertex, final Messenger messenger, final Memory memory) {
2399 if (memory.isInitialIteration()) {
2400 assertFalse(memory.exists("m1"));
2401 assertTrue(memory.exists("m2"));
2402 assertTrue(memory.get("m2"));
2403 assertFalse(memory.exists("m3"));
2404 assertTrue(memory.exists("m4"));
2405 assertEquals(0, memory.<Integer>get("m4").intValue());
2406 memory.add("m1", false);
2407 memory.add("m2", true);
2408 memory.add("m3", 1l);
2409 memory.add("m4", -1);
2410 } else {
2411 assertFalse(memory.exists("m1")); // no broadcast
2412 assertTrue(memory.exists("m2"));
2413 assertFalse(memory.exists("m3")); // no broadcast
2414 assertTrue(memory.exists("m4"));
2415 try {
2416 assertFalse(memory.get("m1"));
2417 fail();
2418 } catch (final Exception e) {
2419 validateException(Memory.Exceptions.memoryDoesNotExist("m1"), e);
2420 }
2421 assertTrue(memory.get("m2"));
2422 try {
2423 assertEquals(6l, memory.<Long>get("m3").longValue());
2424 fail();
2425 } catch (final Exception e) {
2426 validateException(Memory.Exceptions.memoryDoesNotExist("m3"), e);
2427 }
2428 assertEquals(-6l, memory.<Integer>get("m4").intValue());
2429 ///
2430 memory.add("m1", true);
2431 memory.add("m2", true);
2432 memory.add("m3", 2l);
2433 memory.add("m4", -2);
2434 }
2435 }
2436
2437 @Override
2438 public boolean terminate(final Memory memory) {
2439 assertTrue(memory.exists("m1"));
2440 assertTrue(memory.exists("m2"));
2441 assertTrue(memory.exists("m3"));
2442 assertTrue(memory.exists("m4"));
2443 if (memory.isInitialIteration()) {
2444 assertFalse(memory.get("m1"));
2445 assertTrue(memory.get("m2"));
2446 assertEquals(6l, memory.<Long>get("m3").longValue());
2447 assertEquals(-6, memory.<Integer>get("m4").intValue());
2448 return false;
2449 } else {
2450 assertTrue(memory.get("m1"));
2451 assertTrue(memory.get("m2"));
2452 assertEquals(18l, memory.<Long>get("m3").longValue());
2453 assertEquals(-18, memory.<Integer>get("m4").intValue());
2454 return true;
2455 }
2456 }
2457
2458 @Override
2459 public Set<MessageScope> getMessageScopes(final Memory memory) {
2460 return Collections.emptySet();
2461 }
2462
2463 @Override
2464 public Set<MemoryComputeKey> getMemoryComputeKeys() {
2465 return new HashSet<>(Arrays.asList(
2466 MemoryComputeKey.of("m1", Operator.or, false, false),
2467 MemoryComputeKey.of("m2", Operator.and, true, true),
2468 MemoryComputeKey.of("m3", Operator.sum, false, true),
2469 MemoryComputeKey.of("m4", Operator.sum, true, false)));
2470 }
2471
2472 @Override
2473 public GraphComputer.ResultGraph getPreferredResultGraph() {
2474 return GraphComputer.ResultGraph.NEW;
2475 }
2476
2477 @Override
2478 public GraphComputer.Persist getPreferredPersist() {
2479 return GraphComputer.Persist.VERTEX_PROPERTIES;
2480 }
2481 }
2482
2483 ///////////////////////////////////
2484
2485 @Test
2486 @LoadGraphWith(MODERN)
2487 public void shouldSucceedWithProperTraverserRequirements() throws Exception {
2488
2489 final VertexProgramQ vp = VertexProgramQ.build().property("pl").create();
2490 final Map<String, List<Integer>> expected = new HashMap<>();
2491 expected.put("vadas", Collections.singletonList(2));
2492 expected.put("lop", Arrays.asList(2, 2, 2, 3));
2493 expected.put("josh", Collections.singletonList(2));
2494 expected.put("ripple", Arrays.asList(2, 3));
2495
2496 try {
2497 g.V().repeat(__.out()).emit().program(vp).dedup()
2498 .valueMap("name", "pl").forEachRemaining((Map<String, Object> map) -> {
2499
2500 final String name = (String) ((List) map.get("name")).get(0);
2501 final List<Integer> pathLengths = (List<Integer>) map.get("pl");
2502 assertTrue(expected.containsKey(name));
2503 final List<Integer> expectedPathLengths = expected.remove(name);
2504 assertTrue(expectedPathLengths.containsAll(pathLengths));
2505 assertTrue(pathLengths.containsAll(expectedPathLengths));
2506 });
2507
2508 assertTrue(expected.isEmpty());
2509 } catch (VerificationException ex) {
2510 assumeNoException(ex);
2511 }
2512 }
2513
2514 @Test
2515 @LoadGraphWith(MODERN)
2516 public void shouldFailWithImproperTraverserRequirements() throws Exception {
2517 final VertexProgramQ vp = VertexProgramQ.build().property("pl").useTraverserRequirements(false).create();
2518 try {
2519 g.V().repeat(__.out()).emit().program(vp).dedup()
2520 .forEachRemaining((Vertex v) -> assertFalse(v.property("pl").isPresent()));
2521 } catch (VerificationException ex) {
2522 assumeNoException(ex);
2523 }
2524 }
2525
2526 private static class VertexProgramQ implements VertexProgram<Object> {
2527
2528 private static final String VERTEX_PROGRAM_Q_CFG_PREFIX = "gremlin.vertexProgramQ";
2529 private static final String PROPERTY_CFG_KEY = VERTEX_PROGRAM_Q_CFG_PREFIX + ".property";
2530 private static final String LENGTHS_KEY = VERTEX_PROGRAM_Q_CFG_PREFIX + ".lengths";
2531 private static final String USE_TRAVERSER_REQUIREMENTS_CFG_KEY = VERTEX_PROGRAM_Q_CFG_PREFIX + ".useTraverserRequirements";
2532
2533 private final static Set<MemoryComputeKey> MEMORY_COMPUTE_KEYS = Collections.singleton(
2534 MemoryComputeKey.of(LENGTHS_KEY, Operator.addAll, true, true)
2535 );
2536
2537 private final Set<VertexComputeKey> elementComputeKeys;
2538 private Configuration configuration;
2539 private String propertyKey;
2540 private Set<TraverserRequirement> traverserRequirements;
2541
2542 private VertexProgramQ() {
2543 elementComputeKeys = new HashSet<>();
2544 }
2545
2546 public static Builder build() {
2547 return new Builder();
2548 }
2549
2550 static class Builder extends AbstractVertexProgramBuilder<Builder> {
2551
2552 private Builder() {
2553 super(VertexProgramQ.class);
2554 }
2555
2556 @SuppressWarnings("unchecked")
2557 @Override
2558 public VertexProgramQ create(final Graph graph) {
2559 if (graph != null) {
2560 ConfigurationUtils.append(graph.configuration().subset(VERTEX_PROGRAM_Q_CFG_PREFIX), configuration);
2561 }
2562 return (VertexProgramQ) VertexProgram.createVertexProgram(graph, configuration);
2563 }
2564
2565 public VertexProgramQ create() {
2566 return create(null);
2567 }
2568
2569 public Builder property(final String name) {
2570 configuration.setProperty(PROPERTY_CFG_KEY, name);
2571 return this;
2572 }
2573
2574 /**
2575 * This is only configurable for the purpose of testing. In a real-world VP this would be a bad pattern.
2576 */
2577 public Builder useTraverserRequirements(final boolean value) {
2578 configuration.setProperty(USE_TRAVERSER_REQUIREMENTS_CFG_KEY, value);
2579 return this;
2580 }
2581 }
2582
2583 @Override
2584 public</