HDFS-13330. ShortCircuitCache#fetchOrCreate never retries. Contributed by Gabor Bota.
[hadoop.git] / hadoop-hdfs-project / hadoop-hdfs / src / test / java / org / apache / hadoop / hdfs / shortcircuit / TestShortCircuitCache.java
1 /**
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 */
18 package org.apache.hadoop.hdfs.shortcircuit;
19
20 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SIZE_KEY;
21 import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_CONTEXT;
22 import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC;
23 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_KEY;
24 import static org.hamcrest.CoreMatchers.equalTo;
25
26 import java.io.DataOutputStream;
27 import java.io.File;
28 import java.io.FileInputStream;
29 import java.io.FileOutputStream;
30 import java.io.IOException;
31 import java.util.Arrays;
32 import java.util.HashMap;
33 import java.util.Iterator;
34 import java.util.Map;
35 import java.util.concurrent.TimeoutException;
36
37 import net.jcip.annotations.NotThreadSafe;
38 import org.apache.commons.collections.map.LinkedMap;
39 import org.apache.commons.lang.mutable.MutableBoolean;
40 import org.apache.commons.logging.Log;
41 import org.apache.commons.logging.LogFactory;
42 import org.apache.hadoop.conf.Configuration;
43 import org.apache.hadoop.fs.FSDataInputStream;
44 import org.apache.hadoop.fs.Path;
45 import org.apache.hadoop.hdfs.client.impl.BlockReaderFactory;
46 import org.apache.hadoop.hdfs.client.impl.BlockReaderTestUtil;
47 import org.apache.hadoop.hdfs.DFSInputStream;
48 import org.apache.hadoop.hdfs.DFSTestUtil;
49 import org.apache.hadoop.hdfs.DistributedFileSystem;
50 import org.apache.hadoop.hdfs.ExtendedBlockId;
51 import org.apache.hadoop.hdfs.MiniDFSCluster;
52 import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
53 import org.apache.hadoop.hdfs.net.DomainPeer;
54 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
55 import org.apache.hadoop.hdfs.protocol.DatanodeInfo.DatanodeInfoBuilder;
56 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
57 import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;
58 import org.apache.hadoop.hdfs.server.datanode.DataNodeFaultInjector;
59 import org.apache.hadoop.hdfs.server.datanode.ShortCircuitRegistry;
60 import org.apache.hadoop.hdfs.server.datanode.ShortCircuitRegistry.RegisteredShm;
61 import org.apache.hadoop.hdfs.shortcircuit.DfsClientShmManager.PerDatanodeVisitorInfo;
62 import org.apache.hadoop.hdfs.shortcircuit.DfsClientShmManager.Visitor;
63 import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitCache.CacheVisitor;
64 import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitCache.ShortCircuitReplicaCreator;
65 import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitShm.ShmId;
66 import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitShm.Slot;
67 import org.apache.hadoop.io.IOUtils;
68 import org.apache.hadoop.ipc.RetriableException;
69 import org.apache.hadoop.net.unix.DomainSocket;
70 import org.apache.hadoop.net.unix.TemporarySocketDirectory;
71 import org.apache.hadoop.security.token.SecretManager.InvalidToken;
72 import org.apache.hadoop.test.GenericTestUtils;
73 import org.apache.hadoop.util.DataChecksum;
74 import org.apache.hadoop.util.Time;
75 import org.junit.Assert;
76 import org.junit.Assume;
77 import org.junit.Test;
78 import org.mockito.Mockito;
79 import org.mockito.invocation.InvocationOnMock;
80 import org.mockito.stubbing.Answer;
81
82 import com.google.common.base.Preconditions;
83 import com.google.common.base.Supplier;
84 import com.google.common.collect.HashMultimap;
85
86 @NotThreadSafe
87 public class TestShortCircuitCache {
88 static final Log LOG = LogFactory.getLog(TestShortCircuitCache.class);
89
90 private static class TestFileDescriptorPair {
91 final TemporarySocketDirectory dir = new TemporarySocketDirectory();
92 final FileInputStream[] fis;
93
94 public TestFileDescriptorPair() throws IOException {
95 fis = new FileInputStream[2];
96 for (int i = 0; i < 2; i++) {
97 String name = dir.getDir() + "/file" + i;
98 FileOutputStream fos = new FileOutputStream(name);
99 if (i == 0) {
100 // write 'data' file
101 fos.write(1);
102 } else {
103 // write 'metadata' file
104 BlockMetadataHeader header =
105 new BlockMetadataHeader((short)1,
106 DataChecksum.newDataChecksum(DataChecksum.Type.NULL, 4));
107 DataOutputStream dos = new DataOutputStream(fos);
108 BlockMetadataHeader.writeHeader(dos, header);
109 dos.close();
110 }
111 fos.close();
112 fis[i] = new FileInputStream(name);
113 }
114 }
115
116 public FileInputStream[] getFileInputStreams() {
117 return fis;
118 }
119
120 public void close() throws IOException {
121 IOUtils.cleanup(LOG, fis);
122 dir.close();
123 }
124
125 public boolean compareWith(FileInputStream data, FileInputStream meta) {
126 return ((data == fis[0]) && (meta == fis[1]));
127 }
128 }
129
130 private static class SimpleReplicaCreator
131 implements ShortCircuitReplicaCreator {
132 private final int blockId;
133 private final ShortCircuitCache cache;
134 private final TestFileDescriptorPair pair;
135
136 SimpleReplicaCreator(int blockId, ShortCircuitCache cache,
137 TestFileDescriptorPair pair) {
138 this.blockId = blockId;
139 this.cache = cache;
140 this.pair = pair;
141 }
142
143 @Override
144 public ShortCircuitReplicaInfo createShortCircuitReplicaInfo() {
145 try {
146 ExtendedBlockId key = new ExtendedBlockId(blockId, "test_bp1");
147 return new ShortCircuitReplicaInfo(
148 new ShortCircuitReplica(key,
149 pair.getFileInputStreams()[0], pair.getFileInputStreams()[1],
150 cache, Time.monotonicNow(), null));
151 } catch (IOException e) {
152 throw new RuntimeException(e);
153 }
154 }
155 }
156
157 @Test(timeout=60000)
158 public void testCreateAndDestroy() throws Exception {
159 ShortCircuitCache cache =
160 new ShortCircuitCache(10, 1, 10, 1, 1, 10000, 0);
161 cache.close();
162 }
163
164 @Test(timeout=60000)
165 public void testAddAndRetrieve() throws Exception {
166 final ShortCircuitCache cache =
167 new ShortCircuitCache(10, 10000000, 10, 10000000, 1, 10000, 0);
168 final TestFileDescriptorPair pair = new TestFileDescriptorPair();
169 ShortCircuitReplicaInfo replicaInfo1 =
170 cache.fetchOrCreate(new ExtendedBlockId(123, "test_bp1"),
171 new SimpleReplicaCreator(123, cache, pair));
172 Preconditions.checkNotNull(replicaInfo1.getReplica());
173 Preconditions.checkState(replicaInfo1.getInvalidTokenException() == null);
174 pair.compareWith(replicaInfo1.getReplica().getDataStream(),
175 replicaInfo1.getReplica().getMetaStream());
176 ShortCircuitReplicaInfo replicaInfo2 =
177 cache.fetchOrCreate(new ExtendedBlockId(123, "test_bp1"),
178 new ShortCircuitReplicaCreator() {
179 @Override
180 public ShortCircuitReplicaInfo createShortCircuitReplicaInfo() {
181 Assert.fail("expected to use existing entry.");
182 return null;
183 }
184 });
185 Preconditions.checkNotNull(replicaInfo2.getReplica());
186 Preconditions.checkState(replicaInfo2.getInvalidTokenException() == null);
187 Preconditions.checkState(replicaInfo1 == replicaInfo2);
188 pair.compareWith(replicaInfo2.getReplica().getDataStream(),
189 replicaInfo2.getReplica().getMetaStream());
190 replicaInfo1.getReplica().unref();
191 replicaInfo2.getReplica().unref();
192
193 // Even after the reference count falls to 0, we still keep the replica
194 // around for a while (we have configured the expiry period to be really,
195 // really long here)
196 ShortCircuitReplicaInfo replicaInfo3 =
197 cache.fetchOrCreate(
198 new ExtendedBlockId(123, "test_bp1"), new ShortCircuitReplicaCreator() {
199 @Override
200 public ShortCircuitReplicaInfo createShortCircuitReplicaInfo() {
201 Assert.fail("expected to use existing entry.");
202 return null;
203 }
204 });
205 Preconditions.checkNotNull(replicaInfo3.getReplica());
206 Preconditions.checkState(replicaInfo3.getInvalidTokenException() == null);
207 replicaInfo3.getReplica().unref();
208
209 pair.close();
210 cache.close();
211 }
212
213 @Test(timeout=100000)
214 public void testExpiry() throws Exception {
215 final ShortCircuitCache cache =
216 new ShortCircuitCache(2, 1, 1, 10000000, 1, 10000000, 0);
217 final TestFileDescriptorPair pair = new TestFileDescriptorPair();
218 ShortCircuitReplicaInfo replicaInfo1 =
219 cache.fetchOrCreate(
220 new ExtendedBlockId(123, "test_bp1"),
221 new SimpleReplicaCreator(123, cache, pair));
222 Preconditions.checkNotNull(replicaInfo1.getReplica());
223 Preconditions.checkState(replicaInfo1.getInvalidTokenException() == null);
224 pair.compareWith(replicaInfo1.getReplica().getDataStream(),
225 replicaInfo1.getReplica().getMetaStream());
226 replicaInfo1.getReplica().unref();
227 final MutableBoolean triedToCreate = new MutableBoolean(false);
228 do {
229 Thread.sleep(10);
230 ShortCircuitReplicaInfo replicaInfo2 =
231 cache.fetchOrCreate(
232 new ExtendedBlockId(123, "test_bp1"), new ShortCircuitReplicaCreator() {
233 @Override
234 public ShortCircuitReplicaInfo createShortCircuitReplicaInfo() {
235 triedToCreate.setValue(true);
236 return null;
237 }
238 });
239 if ((replicaInfo2 != null) && (replicaInfo2.getReplica() != null)) {
240 replicaInfo2.getReplica().unref();
241 }
242 } while (triedToCreate.isFalse());
243 cache.close();
244 }
245
246
247 @Test(timeout=60000)
248 public void testEviction() throws Exception {
249 final ShortCircuitCache cache =
250 new ShortCircuitCache(2, 10000000, 1, 10000000, 1, 10000, 0);
251 final TestFileDescriptorPair pairs[] = new TestFileDescriptorPair[] {
252 new TestFileDescriptorPair(),
253 new TestFileDescriptorPair(),
254 new TestFileDescriptorPair(),
255 };
256 ShortCircuitReplicaInfo replicaInfos[] = new ShortCircuitReplicaInfo[] {
257 null,
258 null,
259 null
260 };
261 for (int i = 0; i < pairs.length; i++) {
262 replicaInfos[i] = cache.fetchOrCreate(
263 new ExtendedBlockId(i, "test_bp1"),
264 new SimpleReplicaCreator(i, cache, pairs[i]));
265 Preconditions.checkNotNull(replicaInfos[i].getReplica());
266 Preconditions.checkState(replicaInfos[i].getInvalidTokenException() == null);
267 pairs[i].compareWith(replicaInfos[i].getReplica().getDataStream(),
268 replicaInfos[i].getReplica().getMetaStream());
269 }
270 // At this point, we have 3 replicas in use.
271 // Let's close them all.
272 for (int i = 0; i < pairs.length; i++) {
273 replicaInfos[i].getReplica().unref();
274 }
275 // The last two replicas should still be cached.
276 for (int i = 1; i < pairs.length; i++) {
277 final Integer iVal = i;
278 replicaInfos[i] = cache.fetchOrCreate(
279 new ExtendedBlockId(i, "test_bp1"),
280 new ShortCircuitReplicaCreator() {
281 @Override
282 public ShortCircuitReplicaInfo createShortCircuitReplicaInfo() {
283 Assert.fail("expected to use existing entry for " + iVal);
284 return null;
285 }
286 });
287 Preconditions.checkNotNull(replicaInfos[i].getReplica());
288 Preconditions.checkState(replicaInfos[i].getInvalidTokenException() == null);
289 pairs[i].compareWith(replicaInfos[i].getReplica().getDataStream(),
290 replicaInfos[i].getReplica().getMetaStream());
291 }
292 // The first (oldest) replica should not be cached.
293 final MutableBoolean calledCreate = new MutableBoolean(false);
294 replicaInfos[0] = cache.fetchOrCreate(
295 new ExtendedBlockId(0, "test_bp1"),
296 new ShortCircuitReplicaCreator() {
297 @Override
298 public ShortCircuitReplicaInfo createShortCircuitReplicaInfo() {
299 calledCreate.setValue(true);
300 return null;
301 }
302 });
303 Preconditions.checkState(replicaInfos[0].getReplica() == null);
304 Assert.assertTrue(calledCreate.isTrue());
305 // Clean up
306 for (int i = 1; i < pairs.length; i++) {
307 replicaInfos[i].getReplica().unref();
308 }
309 for (int i = 0; i < pairs.length; i++) {
310 pairs[i].close();
311 }
312 cache.close();
313 }
314
315 @Test(timeout=60000)
316 public void testTimeBasedStaleness() throws Exception {
317 // Set up the cache with a short staleness time.
318 final ShortCircuitCache cache =
319 new ShortCircuitCache(2, 10000000, 1, 10000000, 1, 10, 0);
320 final TestFileDescriptorPair pairs[] = new TestFileDescriptorPair[] {
321 new TestFileDescriptorPair(),
322 new TestFileDescriptorPair(),
323 };
324 ShortCircuitReplicaInfo replicaInfos[] = new ShortCircuitReplicaInfo[] {
325 null,
326 null
327 };
328 final long HOUR_IN_MS = 60 * 60 * 1000;
329 for (int i = 0; i < pairs.length; i++) {
330 final Integer iVal = i;
331 final ExtendedBlockId key = new ExtendedBlockId(i, "test_bp1");
332 replicaInfos[i] = cache.fetchOrCreate(key,
333 new ShortCircuitReplicaCreator() {
334 @Override
335 public ShortCircuitReplicaInfo createShortCircuitReplicaInfo() {
336 try {
337 return new ShortCircuitReplicaInfo(
338 new ShortCircuitReplica(key,
339 pairs[iVal].getFileInputStreams()[0],
340 pairs[iVal].getFileInputStreams()[1],
341 cache, Time.monotonicNow() + (iVal * HOUR_IN_MS), null));
342 } catch (IOException e) {
343 throw new RuntimeException(e);
344 }
345 }
346 });
347 Preconditions.checkNotNull(replicaInfos[i].getReplica());
348 Preconditions.checkState(replicaInfos[i].getInvalidTokenException() == null);
349 pairs[i].compareWith(replicaInfos[i].getReplica().getDataStream(),
350 replicaInfos[i].getReplica().getMetaStream());
351 }
352
353 // Keep trying to getOrCreate block 0 until it goes stale (and we must re-create.)
354 GenericTestUtils.waitFor(new Supplier<Boolean>() {
355 @Override
356 public Boolean get() {
357 ShortCircuitReplicaInfo info = cache.fetchOrCreate(
358 new ExtendedBlockId(0, "test_bp1"), new ShortCircuitReplicaCreator() {
359 @Override
360 public ShortCircuitReplicaInfo createShortCircuitReplicaInfo() {
361 return null;
362 }
363 });
364 if (info.getReplica() != null) {
365 info.getReplica().unref();
366 return false;
367 }
368 return true;
369 }
370 }, 500, 60000);
371
372 // Make sure that second replica did not go stale.
373 ShortCircuitReplicaInfo info = cache.fetchOrCreate(
374 new ExtendedBlockId(1, "test_bp1"), new ShortCircuitReplicaCreator() {
375 @Override
376 public ShortCircuitReplicaInfo createShortCircuitReplicaInfo() {
377 Assert.fail("second replica went stale, despite 1 " +
378 "hour staleness time.");
379 return null;
380 }
381 });
382 info.getReplica().unref();
383
384 // Clean up
385 for (int i = 1; i < pairs.length; i++) {
386 replicaInfos[i].getReplica().unref();
387 }
388 cache.close();
389 }
390
391 private static Configuration createShortCircuitConf(String testName,
392 TemporarySocketDirectory sockDir) {
393 Configuration conf = new Configuration();
394 conf.set(DFS_CLIENT_CONTEXT, testName);
395 conf.setLong(DFS_BLOCK_SIZE_KEY, 4096);
396 conf.set(DFS_DOMAIN_SOCKET_PATH_KEY, new File(sockDir.getDir(),
397 testName).getAbsolutePath());
398 conf.setBoolean(HdfsClientConfigKeys.Read.ShortCircuit.KEY, true);
399 conf.setBoolean(HdfsClientConfigKeys.Read.ShortCircuit.SKIP_CHECKSUM_KEY,
400 false);
401 conf.setBoolean(DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC, false);
402 DFSInputStream.tcpReadsDisabledForTesting = true;
403 DomainSocket.disableBindPathValidation();
404 Assume.assumeThat(DomainSocket.getLoadingFailureReason(), equalTo(null));
405 return conf;
406 }
407
408 private static DomainPeer getDomainPeerToDn(Configuration conf)
409 throws IOException {
410 DomainSocket sock =
411 DomainSocket.connect(conf.get(DFS_DOMAIN_SOCKET_PATH_KEY));
412 return new DomainPeer(sock);
413 }
414
415 @Test(timeout=60000)
416 public void testAllocShm() throws Exception {
417 BlockReaderTestUtil.enableShortCircuitShmTracing();
418 TemporarySocketDirectory sockDir = new TemporarySocketDirectory();
419 Configuration conf = createShortCircuitConf("testAllocShm", sockDir);
420 MiniDFSCluster cluster =
421 new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
422 cluster.waitActive();
423 DistributedFileSystem fs = cluster.getFileSystem();
424 final ShortCircuitCache cache =
425 fs.getClient().getClientContext().getShortCircuitCache();
426 cache.getDfsClientShmManager().visit(new Visitor() {
427 @Override
428 public void visit(HashMap<DatanodeInfo, PerDatanodeVisitorInfo> info)
429 throws IOException {
430 // The ClientShmManager starts off empty
431 Assert.assertEquals(0, info.size());
432 }
433 });
434 DomainPeer peer = getDomainPeerToDn(conf);
435 MutableBoolean usedPeer = new MutableBoolean(false);
436 ExtendedBlockId blockId = new ExtendedBlockId(123, "xyz");
437 final DatanodeInfo datanode = new DatanodeInfoBuilder()
438 .setNodeID(cluster.getDataNodes().get(0).getDatanodeId())
439 .build();
440 // Allocating the first shm slot requires using up a peer.
441 Slot slot = cache.allocShmSlot(datanode, peer, usedPeer,
442 blockId, "testAllocShm_client");
443 Assert.assertNotNull(slot);
444 Assert.assertTrue(usedPeer.booleanValue());
445 cache.getDfsClientShmManager().visit(new Visitor() {
446 @Override
447 public void visit(HashMap<DatanodeInfo, PerDatanodeVisitorInfo> info)
448 throws IOException {
449 // The ClientShmManager starts off empty
450 Assert.assertEquals(1, info.size());
451 PerDatanodeVisitorInfo vinfo = info.get(datanode);
452 Assert.assertFalse(vinfo.disabled);
453 Assert.assertEquals(0, vinfo.full.size());
454 Assert.assertEquals(1, vinfo.notFull.size());
455 }
456 });
457 cache.scheduleSlotReleaser(slot);
458 // Wait for the slot to be released, and the shared memory area to be
459 // closed. Since we didn't register this shared memory segment on the
460 // server, it will also be a test of how well the server deals with
461 // bogus client behavior.
462 GenericTestUtils.waitFor(new Supplier<Boolean>() {
463 @Override
464 public Boolean get() {
465 final MutableBoolean done = new MutableBoolean(false);
466 try {
467 cache.getDfsClientShmManager().visit(new Visitor() {
468 @Override
469 public void visit(HashMap<DatanodeInfo, PerDatanodeVisitorInfo> info)
470 throws IOException {
471 done.setValue(info.get(datanode).full.isEmpty() &&
472 info.get(datanode).notFull.isEmpty());
473 }
474 });
475 } catch (IOException e) {
476 LOG.error("error running visitor", e);
477 }
478 return done.booleanValue();
479 }
480 }, 10, 60000);
481 cluster.shutdown();
482 sockDir.close();
483 }
484
485 @Test(timeout=60000)
486 public void testShmBasedStaleness() throws Exception {
487 BlockReaderTestUtil.enableShortCircuitShmTracing();
488 TemporarySocketDirectory sockDir = new TemporarySocketDirectory();
489 Configuration conf = createShortCircuitConf("testShmBasedStaleness", sockDir);
490 MiniDFSCluster cluster =
491 new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
492 cluster.waitActive();
493 DistributedFileSystem fs = cluster.getFileSystem();
494 final ShortCircuitCache cache =
495 fs.getClient().getClientContext().getShortCircuitCache();
496 String TEST_FILE = "/test_file";
497 final int TEST_FILE_LEN = 8193;
498 final int SEED = 0xFADED;
499 DFSTestUtil.createFile(fs, new Path(TEST_FILE), TEST_FILE_LEN,
500 (short)1, SEED);
501 FSDataInputStream fis = fs.open(new Path(TEST_FILE));
502 int first = fis.read();
503 final ExtendedBlock block =
504 DFSTestUtil.getFirstBlock(fs, new Path(TEST_FILE));
505 Assert.assertTrue(first != -1);
506 cache.accept(new CacheVisitor() {
507 @Override
508 public void visit(int numOutstandingMmaps,
509 Map<ExtendedBlockId, ShortCircuitReplica> replicas,
510 Map<ExtendedBlockId, InvalidToken> failedLoads,
511 LinkedMap evictable,
512 LinkedMap evictableMmapped) {
513 ShortCircuitReplica replica = replicas.get(
514 ExtendedBlockId.fromExtendedBlock(block));
515 Assert.assertNotNull(replica);
516 Assert.assertTrue(replica.getSlot().isValid());
517 }
518 });
519 // Stop the Namenode. This will close the socket keeping the client's
520 // shared memory segment alive, and make it stale.
521 cluster.getDataNodes().get(0).shutdown();
522 cache.accept(new CacheVisitor() {
523 @Override
524 public void visit(int numOutstandingMmaps,
525 Map<ExtendedBlockId, ShortCircuitReplica> replicas,
526 Map<ExtendedBlockId, InvalidToken> failedLoads,
527 LinkedMap evictable,
528 LinkedMap evictableMmapped) {
529 ShortCircuitReplica replica = replicas.get(
530 ExtendedBlockId.fromExtendedBlock(block));
531 Assert.assertNotNull(replica);
532 Assert.assertFalse(replica.getSlot().isValid());
533 }
534 });
535 cluster.shutdown();
536 sockDir.close();
537 }
538
539 /**
540 * Test unlinking a file whose blocks we are caching in the DFSClient.
541 * The DataNode will notify the DFSClient that the replica is stale via the
542 * ShortCircuitShm.
543 */
544 @Test(timeout=60000)
545 public void testUnlinkingReplicasInFileDescriptorCache() throws Exception {
546 BlockReaderTestUtil.enableShortCircuitShmTracing();
547 TemporarySocketDirectory sockDir = new TemporarySocketDirectory();
548 Configuration conf = createShortCircuitConf(
549 "testUnlinkingReplicasInFileDescriptorCache", sockDir);
550 // We don't want the CacheCleaner to time out short-circuit shared memory
551 // segments during the test, so set the timeout really high.
552 conf.setLong(HdfsClientConfigKeys.Read.ShortCircuit.STREAMS_CACHE_EXPIRY_MS_KEY,
553 1000000000L);
554 MiniDFSCluster cluster =
555 new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
556 cluster.waitActive();
557 DistributedFileSystem fs = cluster.getFileSystem();
558 final ShortCircuitCache cache =
559 fs.getClient().getClientContext().getShortCircuitCache();
560 cache.getDfsClientShmManager().visit(new Visitor() {
561 @Override
562 public void visit(HashMap<DatanodeInfo, PerDatanodeVisitorInfo> info)
563 throws IOException {
564 // The ClientShmManager starts off empty.
565 Assert.assertEquals(0, info.size());
566 }
567 });
568 final Path TEST_PATH = new Path("/test_file");
569 final int TEST_FILE_LEN = 8193;
570 final int SEED = 0xFADE0;
571 DFSTestUtil.createFile(fs, TEST_PATH, TEST_FILE_LEN,
572 (short)1, SEED);
573 byte contents[] = DFSTestUtil.readFileBuffer(fs, TEST_PATH);
574 byte expected[] = DFSTestUtil.
575 calculateFileContentsFromSeed(SEED, TEST_FILE_LEN);
576 Assert.assertTrue(Arrays.equals(contents, expected));
577 // Loading this file brought the ShortCircuitReplica into our local
578 // replica cache.
579 final DatanodeInfo datanode = new DatanodeInfoBuilder()
580 .setNodeID(cluster.getDataNodes().get(0).getDatanodeId())
581 .build();
582 cache.getDfsClientShmManager().visit(new Visitor() {
583 @Override
584 public void visit(HashMap<DatanodeInfo, PerDatanodeVisitorInfo> info)
585 throws IOException {
586 Assert.assertTrue(info.get(datanode).full.isEmpty());
587 Assert.assertFalse(info.get(datanode).disabled);
588 Assert.assertEquals(1, info.get(datanode).notFull.values().size());
589 DfsClientShm shm =
590 info.get(datanode).notFull.values().iterator().next();
591 Assert.assertFalse(shm.isDisconnected());
592 }
593 });
594 // Remove the file whose blocks we just read.
595 fs.delete(TEST_PATH, false);
596
597 // Wait for the replica to be purged from the DFSClient's cache.
598 GenericTestUtils.waitFor(new Supplier<Boolean>() {
599 MutableBoolean done = new MutableBoolean(true);
600 @Override
601 public Boolean get() {
602 try {
603 done.setValue(true);
604 cache.getDfsClientShmManager().visit(new Visitor() {
605 @Override
606 public void visit(HashMap<DatanodeInfo,
607 PerDatanodeVisitorInfo> info) throws IOException {
608 Assert.assertTrue(info.get(datanode).full.isEmpty());
609 Assert.assertFalse(info.get(datanode).disabled);
610 Assert.assertEquals(1,
611 info.get(datanode).notFull.values().size());
612 DfsClientShm shm = info.get(datanode).notFull.values().
613 iterator().next();
614 // Check that all slots have been invalidated.
615 for (Iterator<Slot> iter = shm.slotIterator();
616 iter.hasNext(); ) {
617 Slot slot = iter.next();
618 if (slot.isValid()) {
619 done.setValue(false);
620 }
621 }
622 }
623 });
624 } catch (IOException e) {
625 LOG.error("error running visitor", e);
626 }
627 return done.booleanValue();
628 }
629 }, 10, 60000);
630 cluster.shutdown();
631 sockDir.close();
632 }
633
634 static private void checkNumberOfSegmentsAndSlots(final int expectedSegments,
635 final int expectedSlots, final ShortCircuitRegistry registry)
636 throws InterruptedException, TimeoutException {
637 GenericTestUtils.waitFor(new Supplier<Boolean>() {
638 @Override
639 public Boolean get() {
640 return registry.visit(new ShortCircuitRegistry.Visitor() {
641 @Override
642 public boolean accept(HashMap<ShmId, RegisteredShm> segments,
643 HashMultimap<ExtendedBlockId, Slot> slots) {
644 return (expectedSegments == segments.size()) &&
645 (expectedSlots == slots.size());
646 }
647 });
648 }
649 }, 100, 10000);
650
651 }
652
653 public static class TestCleanupFailureInjector
654 extends BlockReaderFactory.FailureInjector {
655 @Override
656 public void injectRequestFileDescriptorsFailure() throws IOException {
657 throw new IOException("injected I/O error");
658 }
659 }
660
/**
 * Regression test for HDFS-7915: after a short-circuit read that fails on
 * the file descriptor request, the DataNode registry must still hold only
 * the segment and slot allocated by the earlier successful read.
 */
@Test(timeout=60000)
public void testDataXceiverCleansUpSlotsOnFailure() throws Exception {
  BlockReaderTestUtil.enableShortCircuitShmTracing();
  final TemporarySocketDirectory sockDir = new TemporarySocketDirectory();
  try {
    Configuration conf = createShortCircuitConf(
        "testDataXceiverCleansUpSlotsOnFailure", sockDir);
    conf.setLong(
        HdfsClientConfigKeys.Read.ShortCircuit.STREAMS_CACHE_EXPIRY_MS_KEY,
        1000000000L);
    final MiniDFSCluster cluster =
        new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
    // Shut the cluster down even when an assertion below fails.
    try {
      cluster.waitActive();
      DistributedFileSystem fs = cluster.getFileSystem();
      final Path TEST_PATH1 = new Path("/test_file1");
      final Path TEST_PATH2 = new Path("/test_file2");
      final int TEST_FILE_LEN = 4096;
      final int SEED = 0xFADE1;
      DFSTestUtil.createFile(fs, TEST_PATH1, TEST_FILE_LEN,
          (short)1, SEED);
      DFSTestUtil.createFile(fs, TEST_PATH2, TEST_FILE_LEN,
          (short)1, SEED);

      // The first read should allocate one shared memory segment and slot.
      DFSTestUtil.readFileBuffer(fs, TEST_PATH1);

      // The second read should fail, and we should only have 1 segment and
      // 1 slot left.
      // NOTE(review): the injector is never reset in this test, so it
      // presumably stays installed for later tests in the same JVM - confirm.
      BlockReaderFactory.setFailureInjectorForTesting(
          new TestCleanupFailureInjector());
      try {
        DFSTestUtil.readFileBuffer(fs, TEST_PATH2);
        // Reaching this line means the injected failure had no effect. The
        // AssertionError is caught below and rejected by the message check.
        Assert.fail("expected readFileBuffer to fail, but it succeeded.");
      } catch (Throwable t) {
        GenericTestUtils.assertExceptionContains("TCP reads were disabled for " +
            "testing, but we failed to do a non-TCP read.", t);
      }
      checkNumberOfSegmentsAndSlots(1, 1,
          cluster.getDataNodes().get(0).getShortCircuitRegistry());
    } finally {
      cluster.shutdown();
    }
  } finally {
    sockDir.close();
  }
}
702
703 // Regression test for HADOOP-11802
704 @Test(timeout=60000)
705 public void testDataXceiverHandlesRequestShortCircuitShmFailure()
706 throws Exception {
707 BlockReaderTestUtil.enableShortCircuitShmTracing();
708 TemporarySocketDirectory sockDir = new TemporarySocketDirectory();
709 Configuration conf = createShortCircuitConf(
710 "testDataXceiverHandlesRequestShortCircuitShmFailure", sockDir);
711 conf.setLong(HdfsClientConfigKeys.Read.ShortCircuit.STREAMS_CACHE_EXPIRY_MS_KEY,
712 1000000000L);
713 MiniDFSCluster cluster =
714 new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
715 cluster.waitActive();
716 DistributedFileSystem fs = cluster.getFileSystem();
717 final Path TEST_PATH1 = new Path("/test_file1");
718 DFSTestUtil.createFile(fs, TEST_PATH1, 4096,
719 (short)1, 0xFADE1);
720 LOG.info("Setting failure injector and performing a read which " +
721 "should fail...");
722 DataNodeFaultInjector failureInjector = Mockito.mock(DataNodeFaultInjector.class);
723 Mockito.doAnswer(new Answer<Void>() {
724 @Override
725 public Void answer(InvocationOnMock invocation) throws Throwable {
726 throw new IOException("injected error into sendShmResponse");
727 }
728 }).when(failureInjector).sendShortCircuitShmResponse();
729 DataNodeFaultInjector prevInjector = DataNodeFaultInjector.get();
730 DataNodeFaultInjector.set(failureInjector);
731
732 try {
733 // The first read will try to allocate a shared memory segment and slot.
734 // The shared memory segment allocation will fail because of the failure
735 // injector.
736 DFSTestUtil.readFileBuffer(fs, TEST_PATH1);
737 Assert.fail("expected readFileBuffer to fail, but it succeeded.");
738 } catch (Throwable t) {
739 GenericTestUtils.assertExceptionContains("TCP reads were disabled for " +
740 "testing, but we failed to do a non-TCP read.", t);
741 }
742
743 checkNumberOfSegmentsAndSlots(0, 0,
744 cluster.getDataNodes().get(0).getShortCircuitRegistry());
745
746 LOG.info("Clearing failure injector and performing another read...");
747 DataNodeFaultInjector.set(prevInjector);
748
749 fs.getClient().getClientContext().getDomainSocketFactory().clearPathMap();
750
751 // The second read should succeed.
752 DFSTestUtil.readFileBuffer(fs, TEST_PATH1);
753
754 // We should have added a new short-circuit shared memory segment and slot.
755 checkNumberOfSegmentsAndSlots(1, 1,
756 cluster.getDataNodes().get(0).getShortCircuitRegistry());
757
758 cluster.shutdown();
759 sockDir.close();
760 }
761
762 public static class TestPreReceiptVerificationFailureInjector
763 extends BlockReaderFactory.FailureInjector {
764 @Override
765 public boolean getSupportsReceiptVerification() {
766 return false;
767 }
768 }
769
770 // Regression test for HDFS-8070
771 @Test(timeout=60000)
772 public void testPreReceiptVerificationDfsClientCanDoScr() throws Exception {
773 BlockReaderTestUtil.enableShortCircuitShmTracing();
774 TemporarySocketDirectory sockDir = new TemporarySocketDirectory();
775 Configuration conf = createShortCircuitConf(
776 "testPreReceiptVerificationDfsClientCanDoScr", sockDir);
777 conf.setLong(
778 HdfsClientConfigKeys.Read.ShortCircuit.STREAMS_CACHE_EXPIRY_MS_KEY,
779 1000000000L);
780 MiniDFSCluster cluster =
781 new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
782 cluster.waitActive();
783 DistributedFileSystem fs = cluster.getFileSystem();
784 BlockReaderFactory.setFailureInjectorForTesting(
785 new TestPreReceiptVerificationFailureInjector());
786 final Path TEST_PATH1 = new Path("/test_file1");
787 DFSTestUtil.createFile(fs, TEST_PATH1, 4096, (short)1, 0xFADE2);
788 final Path TEST_PATH2 = new Path("/test_file2");
789 DFSTestUtil.createFile(fs, TEST_PATH2, 4096, (short)1, 0xFADE2);
790 DFSTestUtil.readFileBuffer(fs, TEST_PATH1);
791 DFSTestUtil.readFileBuffer(fs, TEST_PATH2);
792 checkNumberOfSegmentsAndSlots(1, 2,
793 cluster.getDataNodes().get(0).getShortCircuitRegistry());
794 cluster.shutdown();
795 sockDir.close();
796 }
797
798 @Test
799 public void testFetchOrCreateRetries() throws Exception {
800 try(ShortCircuitCache cache = Mockito
801 .spy(new ShortCircuitCache(10, 10000000, 10, 10000000, 1, 10000, 0))) {
802 final TestFileDescriptorPair pair = new TestFileDescriptorPair();
803 ExtendedBlockId extendedBlockId = new ExtendedBlockId(123, "test_bp1");
804 SimpleReplicaCreator sRC = new SimpleReplicaCreator(123, cache, pair);
805
806 // Arrange that fetch will throw RetriableException for any call
807 Mockito.doThrow(new RetriableException("Retry")).when(cache)
808 .fetch(Mockito.eq(extendedBlockId), Mockito.any());
809
810 // Act: calling fetchOrCreate two times
811 // first call: it will create and put entry to replicaInfoMap
812 // second call: it will call fetch to get info for entry, and should
813 // retry 3 times because RetriableException thrown
814 cache.fetchOrCreate(extendedBlockId, sRC);
815 cache.fetchOrCreate(extendedBlockId, sRC);
816
817 // Assert that fetchOrCreate retried to fetch at least 3 times
818 Mockito.verify(cache, Mockito.atLeast(3))
819 .fetch(Mockito.eq(extendedBlockId), Mockito.any());
820 }
821 }
822 }