IGNITE-8256 Fixed simulated node failure in the test
[ignite.git] / modules / core / src / test / java / org / apache / ignite / internal / processors / cache / distributed / dht / TxRecoveryStoreEnabledTest.java
1 /*
2 * Licensed to the Apache Software Foundation (ASF) under one or more
3 * contributor license agreements. See the NOTICE file distributed with
4 * this work for additional information regarding copyright ownership.
5 * The ASF licenses this file to You under the Apache License, Version 2.0
6 * (the "License"); you may not use this file except in compliance with
7 * the License. You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17
18 package org.apache.ignite.internal.processors.cache.distributed.dht;
19
20 import java.util.concurrent.CountDownLatch;
21 import javax.cache.Cache;
22 import javax.cache.configuration.Factory;
23 import javax.cache.integration.CacheLoaderException;
24 import javax.cache.integration.CacheWriterException;
25 import org.apache.ignite.Ignite;
26 import org.apache.ignite.IgniteCache;
27 import org.apache.ignite.cache.CacheAtomicityMode;
28 import org.apache.ignite.cache.CacheMode;
29 import org.apache.ignite.cache.CacheWriteSynchronizationMode;
30 import org.apache.ignite.cache.store.CacheStore;
31 import org.apache.ignite.cache.store.CacheStoreAdapter;
32 import org.apache.ignite.cluster.ClusterNode;
33 import org.apache.ignite.configuration.CacheConfiguration;
34 import org.apache.ignite.configuration.IgniteConfiguration;
35 import org.apache.ignite.failure.StopNodeFailureHandler;
36 import org.apache.ignite.internal.IgniteInternalFuture;
37 import org.apache.ignite.internal.IgniteInterruptedCheckedException;
38 import org.apache.ignite.internal.managers.discovery.IgniteDiscoverySpi;
39 import org.apache.ignite.internal.util.typedef.internal.U;
40 import org.apache.ignite.plugin.extensions.communication.Message;
41 import org.apache.ignite.spi.IgniteSpiException;
42 import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
43 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
44 import org.apache.ignite.transactions.Transaction;
45 import org.apache.ignite.transactions.TransactionConcurrency;
46
47 import static org.apache.ignite.transactions.TransactionConcurrency.OPTIMISTIC;
48 import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
49 import static org.apache.ignite.transactions.TransactionIsolation.READ_COMMITTED;
50
51 /**
52 *
53 */
54 public class TxRecoveryStoreEnabledTest extends GridCommonAbstractTest {
55 /** Nodes count. */
56 private static final int NODES_CNT = 2;
57
58 /** CAche name. */
59 public static final String CACHE_NAME = "cache";
60
61 /** Latch. */
62 private static CountDownLatch latch;
63
64 /** {@inheritDoc} */
65 @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
66 IgniteConfiguration cfg = super.getConfiguration(gridName);
67
68 cfg.setCommunicationSpi(new TestCommunicationSpi());
69
70 CacheConfiguration ccfg = defaultCacheConfiguration();
71
72 ccfg.setName(CACHE_NAME);
73 ccfg.setNearConfiguration(null);
74 ccfg.setCacheMode(CacheMode.PARTITIONED);
75 ccfg.setBackups(1);
76 ccfg.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL);
77 ccfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC);
78 ccfg.setCacheStoreFactory(new TestCacheStoreFactory());
79 ccfg.setReadThrough(true);
80 ccfg.setWriteThrough(true);
81 ccfg.setWriteBehindEnabled(false);
82
83 cfg.setCacheConfiguration(ccfg);
84
85 cfg.setFailureHandler(new StopNodeFailureHandler());
86
87 return cfg;
88 }
89
90 /** {@inheritDoc} */
91 @Override protected void beforeTest() throws Exception {
92 latch = new CountDownLatch(1);
93
94 startGrids(NODES_CNT);
95 }
96
97 /** {@inheritDoc} */
98 @Override protected void afterTest() throws Exception {
99 stopAllGrids();
100 }
101
102 /**
103 * @throws Exception If failed.
104 */
105 public void testOptimistic() throws Exception {
106 checkTxRecovery(OPTIMISTIC);
107 }
108
109 /**
110 * @throws Exception If failed.
111 */
112 public void testPessimistic() throws Exception {
113 checkTxRecovery(PESSIMISTIC);
114 }
115
116 /**
117 * @throws Exception If failed.
118 */
119 private void checkTxRecovery(TransactionConcurrency concurrency) throws Exception {
120 final Ignite node0 = ignite(0);
121 Ignite node1 = ignite(1);
122
123 IgniteInternalFuture<?> fut = multithreadedAsync(new Runnable() {
124 @Override public void run() {
125 try {
126 latch.await();
127
128 IgniteConfiguration cfg = node0.configuration();
129
130 ((TestCommunicationSpi)cfg.getCommunicationSpi()).block();
131 ((IgniteDiscoverySpi)cfg.getDiscoverySpi()).simulateNodeFailure();
132 }
133 catch (InterruptedException e) {
134 Thread.currentThread().interrupt();
135 }
136 }
137 }, 1);
138
139 IgniteCache<Object, Object> cache0 = node0.cache(CACHE_NAME);
140
141 Integer key = primaryKey(cache0);
142
143 try (Transaction tx = node0.transactions().txStart(concurrency, READ_COMMITTED)) {
144 cache0.put(key, key);
145
146 tx.commit();
147 }
148 catch (Exception e) {
149 // No-op.
150 }
151
152 fut.get();
153
154 IgniteCache<Object, Object> cache1 = node1.cache(CACHE_NAME);
155
156 assertNull(cache1.get(key));
157 }
158
159 /**
160 *
161 */
162 private static class TestCacheStoreFactory implements Factory<CacheStore> {
163 /** {@inheritDoc} */
164 @Override public CacheStore create() {
165 return new TestCacheStore();
166 }
167 }
168
169 /**
170 *
171 */
172 private static class TestCacheStore extends CacheStoreAdapter<Integer, Integer> {
173 /** {@inheritDoc} */
174 @Override public void sessionEnd(boolean commit) {
175 if (latch.getCount() > 0) { // Need wait only on primary node.
176 latch.countDown();
177
178 try {
179 U.sleep(3000);
180 }
181 catch (IgniteInterruptedCheckedException e) {
182 Thread.currentThread().interrupt();
183 }
184 }
185 }
186
187 /** {@inheritDoc} */
188 @Override public Integer load(Integer key) throws CacheLoaderException {
189 return null;
190 }
191
192 /** {@inheritDoc} */
193 @Override public void write(Cache.Entry<? extends Integer, ? extends Integer> entry) throws CacheWriterException {
194 // No-op.
195 }
196
197 /** {@inheritDoc} */
198 @Override public void delete(Object key) throws CacheWriterException {
199 // no-op.
200 }
201 }
202
203 /**
204 *
205 */
206 private static class TestCommunicationSpi extends TcpCommunicationSpi {
207 /** Block. */
208 private volatile boolean block;
209
210 /** {@inheritDoc} */
211 @Override public void sendMessage(ClusterNode node, Message msg) throws IgniteSpiException {
212 if (!block)
213 super.sendMessage(node, msg);
214 }
215
216 /**
217 *
218 */
219 private void block() {
220 block = true;
221 }
222 }
223 }