IGNITE-7409: Exception before assert in tx.resume - Fixes #3374.
[ignite.git] / modules / core / src / main / java / org / apache / ignite / internal / processors / cache / transactions / IgniteTxManager.java
1 /*
2 * Licensed to the Apache Software Foundation (ASF) under one or more
3 * contributor license agreements. See the NOTICE file distributed with
4 * this work for additional information regarding copyright ownership.
5 * The ASF licenses this file to You under the Apache License, Version 2.0
6 * (the "License"); you may not use this file except in compliance with
7 * the License. You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17
18 package org.apache.ignite.internal.processors.cache.transactions;
19
20 import java.io.Externalizable;
21 import java.util.ArrayList;
22 import java.util.Collection;
23 import java.util.Collections;
24 import java.util.HashSet;
25 import java.util.Iterator;
26 import java.util.LinkedHashSet;
27 import java.util.List;
28 import java.util.Map;
29 import java.util.Set;
30 import java.util.UUID;
31 import java.util.concurrent.ConcurrentHashMap;
32 import java.util.concurrent.ConcurrentMap;
33 import org.apache.ignite.IgniteCheckedException;
34 import org.apache.ignite.IgniteClientDisconnectedException;
35 import org.apache.ignite.IgniteSystemProperties;
36 import org.apache.ignite.binary.BinaryObjectException;
37 import org.apache.ignite.cluster.ClusterNode;
38 import org.apache.ignite.events.DiscoveryEvent;
39 import org.apache.ignite.events.Event;
40 import org.apache.ignite.internal.IgniteInternalFuture;
41 import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
42 import org.apache.ignite.internal.managers.communication.GridIoPolicy;
43 import org.apache.ignite.internal.managers.communication.GridMessageListener;
44 import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
45 import org.apache.ignite.internal.pagemem.wal.record.TxRecord;
46 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
47 import org.apache.ignite.internal.processors.cache.CacheObjectsReleaseFuture;
48 import org.apache.ignite.internal.processors.cache.GridCacheContext;
49 import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
50 import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException;
51 import org.apache.ignite.internal.processors.cache.GridCacheMapEntry;
52 import org.apache.ignite.internal.processors.cache.GridCacheMessage;
53 import org.apache.ignite.internal.processors.cache.GridCacheMvccCandidate;
54 import org.apache.ignite.internal.processors.cache.GridCacheVersionedFuture;
55 import org.apache.ignite.internal.processors.cache.GridCacheReturnCompletableWrapper;
56 import org.apache.ignite.internal.processors.cache.GridCacheSharedManagerAdapter;
57 import org.apache.ignite.internal.processors.cache.GridDeferredAckMessageSender;
58 import org.apache.ignite.internal.processors.cache.distributed.GridCacheMappedVersion;
59 import org.apache.ignite.internal.processors.cache.distributed.GridCacheTxFinishSync;
60 import org.apache.ignite.internal.processors.cache.distributed.GridCacheTxRecoveryFuture;
61 import org.apache.ignite.internal.processors.cache.distributed.GridDistributedLockCancelledException;
62 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtInvalidPartitionException;
63 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxLocal;
64 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxOnePhaseCommitAckRequest;
65 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxRemote;
66 import org.apache.ignite.internal.processors.cache.distributed.dht.colocated.GridDhtColocatedLockFuture;
67 import org.apache.ignite.internal.processors.cache.distributed.near.GridNearCacheAdapter;
68 import org.apache.ignite.internal.processors.cache.distributed.near.GridNearCacheEntry;
69 import org.apache.ignite.internal.processors.cache.distributed.near.GridNearLockFuture;
70 import org.apache.ignite.internal.processors.cache.distributed.near.GridNearOptimisticTxPrepareFuture;
71 import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal;
72 import org.apache.ignite.internal.processors.cache.transactions.TxDeadlockDetection.TxDeadlockFuture;
73 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
74 import org.apache.ignite.internal.processors.timeout.GridTimeoutObjectAdapter;
75 import org.apache.ignite.internal.transactions.IgniteTxOptimisticCheckedException;
76 import org.apache.ignite.internal.transactions.IgniteTxTimeoutCheckedException;
77 import org.apache.ignite.internal.util.GridBoundedConcurrentOrderedMap;
78 import org.apache.ignite.internal.util.future.GridCompoundFuture;
79 import org.apache.ignite.internal.util.future.GridFutureAdapter;
80 import org.apache.ignite.internal.util.lang.IgnitePair;
81 import org.apache.ignite.internal.util.typedef.CI1;
82 import org.apache.ignite.internal.util.typedef.F;
83 import org.apache.ignite.internal.util.typedef.X;
84 import org.apache.ignite.internal.util.typedef.internal.CU;
85 import org.apache.ignite.internal.util.typedef.internal.U;
86 import org.apache.ignite.lang.IgniteFuture;
87 import org.apache.ignite.lang.IgniteReducer;
88 import org.apache.ignite.lang.IgniteUuid;
89 import org.apache.ignite.transactions.TransactionConcurrency;
90 import org.apache.ignite.transactions.TransactionIsolation;
91 import org.apache.ignite.transactions.TransactionState;
92 import org.jetbrains.annotations.Nullable;
93 import org.jsr166.ConcurrentLinkedDeque8;
94 import org.jsr166.ConcurrentLinkedHashMap;
95
96 import static org.apache.ignite.IgniteSystemProperties.IGNITE_DEFERRED_ONE_PHASE_COMMIT_ACK_REQUEST_BUFFER_SIZE;
97 import static org.apache.ignite.IgniteSystemProperties.IGNITE_DEFERRED_ONE_PHASE_COMMIT_ACK_REQUEST_TIMEOUT;
98 import static org.apache.ignite.IgniteSystemProperties.IGNITE_MAX_COMPLETED_TX_COUNT;
99 import static org.apache.ignite.IgniteSystemProperties.IGNITE_SLOW_TX_WARN_TIMEOUT;
100 import static org.apache.ignite.IgniteSystemProperties.IGNITE_TX_DEADLOCK_DETECTION_MAX_ITERS;
101 import static org.apache.ignite.IgniteSystemProperties.IGNITE_TX_SALVAGE_TIMEOUT;
102 import static org.apache.ignite.IgniteSystemProperties.IGNITE_WAL_LOG_TX_RECORDS;
103 import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
104 import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
105 import static org.apache.ignite.internal.GridTopic.TOPIC_TX;
106 import static org.apache.ignite.internal.managers.communication.GridIoPolicy.SYSTEM_POOL;
107 import static org.apache.ignite.internal.processors.cache.GridCacheOperation.READ;
108 import static org.apache.ignite.internal.processors.cache.GridCacheUtils.isNearEnabled;
109 import static org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx.FinalizationStatus.RECOVERY_FINISH;
110 import static org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx.FinalizationStatus.USER_FINISH;
111 import static org.apache.ignite.internal.util.GridConcurrentFactory.newMap;
112 import static org.apache.ignite.transactions.TransactionState.ACTIVE;
113 import static org.apache.ignite.transactions.TransactionState.COMMITTED;
114 import static org.apache.ignite.transactions.TransactionState.COMMITTING;
115 import static org.apache.ignite.transactions.TransactionState.MARKED_ROLLBACK;
116 import static org.apache.ignite.transactions.TransactionState.PREPARED;
117 import static org.apache.ignite.transactions.TransactionState.PREPARING;
118 import static org.apache.ignite.transactions.TransactionState.SUSPENDED;
119 import static org.apache.ignite.transactions.TransactionState.UNKNOWN;
120 import static org.jsr166.ConcurrentLinkedHashMap.QueuePolicy.PER_SEGMENT_Q;
121
122 /**
123 * Cache transaction manager.
124 */
125 public class IgniteTxManager extends GridCacheSharedManagerAdapter {
126 /** Default maximum number of transactions that have completed. */
127 private static final int DFLT_MAX_COMPLETED_TX_CNT = 262144; // 2^18
128
129 /** Slow tx warn timeout (initialized to 0). */
130 private static final int SLOW_TX_WARN_TIMEOUT = Integer.getInteger(IGNITE_SLOW_TX_WARN_TIMEOUT, 0);
131
132 /** Tx salvage timeout. */
133 private static final int TX_SALVAGE_TIMEOUT = Integer.getInteger(IGNITE_TX_SALVAGE_TIMEOUT, 100);
134
135 /** One phase commit deferred ack request timeout. */
136 public static final int DEFERRED_ONE_PHASE_COMMIT_ACK_REQUEST_TIMEOUT =
137 Integer.getInteger(IGNITE_DEFERRED_ONE_PHASE_COMMIT_ACK_REQUEST_TIMEOUT, 500);
138
139 /** One phase commit deferred ack request buffer size. */
140 private static final int DEFERRED_ONE_PHASE_COMMIT_ACK_REQUEST_BUFFER_SIZE =
141 Integer.getInteger(IGNITE_DEFERRED_ONE_PHASE_COMMIT_ACK_REQUEST_BUFFER_SIZE, 256);
142
143 /** Deadlock detection maximum iterations. */
144 static int DEADLOCK_MAX_ITERS =
145 IgniteSystemProperties.getInteger(IGNITE_TX_DEADLOCK_DETECTION_MAX_ITERS, 1000);
146
147 /** Committing transactions. */
148 private final ThreadLocal<IgniteInternalTx> threadCtx = new ThreadLocal<>();
149
150 /** Topology version should be used when mapping internal tx. */
151 private final ThreadLocal<AffinityTopologyVersion> txTop = new ThreadLocal<>();
152
153 /** Per-thread transaction map. */
154 private final ConcurrentMap<Long, IgniteInternalTx> threadMap = newMap();
155
156 /** Per-thread system transaction map. */
157 private final ConcurrentMap<TxThreadKey, IgniteInternalTx> sysThreadMap = newMap();
158
159 /** Per-ID map. */
160 private final ConcurrentMap<GridCacheVersion, IgniteInternalTx> idMap = newMap();
161
162 /** Per-ID map for near transactions. */
163 private final ConcurrentMap<GridCacheVersion, IgniteInternalTx> nearIdMap = newMap();
164
165 /** Deadlock detection futures. */
166 private final ConcurrentMap<Long, TxDeadlockFuture> deadlockDetectFuts = new ConcurrentHashMap<>();
167
168 /** TX handler. */
169 private IgniteTxHandler txHnd;
170
171 /** Committed local transactions. */
172 private final GridBoundedConcurrentOrderedMap<GridCacheVersion, Boolean> completedVersSorted =
173 new GridBoundedConcurrentOrderedMap<>(
174 Integer.getInteger(IGNITE_MAX_COMPLETED_TX_COUNT, DFLT_MAX_COMPLETED_TX_CNT));
175
176 /** Committed local transactions. */
177 private final ConcurrentLinkedHashMap<GridCacheVersion, Object> completedVersHashMap =
178 new ConcurrentLinkedHashMap<>(
179 Integer.getInteger(IGNITE_MAX_COMPLETED_TX_COUNT, DFLT_MAX_COMPLETED_TX_CNT),
180 0.75f,
181 Runtime.getRuntime().availableProcessors() * 2,
182 Integer.getInteger(IGNITE_MAX_COMPLETED_TX_COUNT, DFLT_MAX_COMPLETED_TX_CNT),
183 PER_SEGMENT_Q);
184
185 /** Pending one phase commit ack requests sender. */
186 private GridDeferredAckMessageSender deferredAckMsgSnd;
187
188 /** Transaction finish synchronizer. */
189 private GridCacheTxFinishSync txFinishSync;
190
191 /** For test purposes only. */
192 private boolean finishSyncDisabled;
193
194 /** Slow tx warn timeout. */
195 private int slowTxWarnTimeout = SLOW_TX_WARN_TIMEOUT;
196
197 /**
198 * Near version to DHT version map. Note that we initialize to 5K size from get go,
199 * to avoid future map resizings.
200 */
201 private final ConcurrentMap<GridCacheVersion, GridCacheVersion> mappedVers =
202 new ConcurrentHashMap<>(5120);
203
204 /** TxDeadlock detection. */
205 private TxDeadlockDetection txDeadlockDetection;
206
207 /** Flag indicates that {@link TxRecord} records will be logged to WAL. */
208 private boolean logTxRecords;
209
210 /** {@inheritDoc} */
211 @Override protected void onKernalStop0(boolean cancel) {
212 cctx.gridIO().removeMessageListener(TOPIC_TX);
213 }
214
215 /** {@inheritDoc} */
216 @Override protected void start0() throws IgniteCheckedException {
217 txFinishSync = new GridCacheTxFinishSync<>(cctx);
218
219 txHnd = new IgniteTxHandler(cctx);
220
221 deferredAckMsgSnd = new GridDeferredAckMessageSender<GridCacheVersion>(cctx.time(), cctx.kernalContext().closure()) {
222 @Override public int getTimeout() {
223 return DEFERRED_ONE_PHASE_COMMIT_ACK_REQUEST_TIMEOUT;
224 }
225
226 @Override public int getBufferSize() {
227 return DEFERRED_ONE_PHASE_COMMIT_ACK_REQUEST_BUFFER_SIZE;
228 }
229
230 @Override public void finish(UUID nodeId, ConcurrentLinkedDeque8<GridCacheVersion> vers) {
231 GridDhtTxOnePhaseCommitAckRequest ackReq = new GridDhtTxOnePhaseCommitAckRequest(vers);
232
233 cctx.kernalContext().gateway().readLock();
234
235 try {
236 cctx.io().send(nodeId, ackReq, GridIoPolicy.SYSTEM_POOL);
237 }
238 catch (ClusterTopologyCheckedException ignored) {
239 if (log.isDebugEnabled())
240 log.debug("Failed to send one phase commit ack to backup node because it left grid: " + nodeId);
241 }
242 catch (IgniteCheckedException e) {
243 log.error("Failed to send one phase commit ack to backup node [backup=" + nodeId + ']', e);
244 }
245 finally {
246 cctx.kernalContext().gateway().readUnlock();
247 }
248 }
249 };
250
251 cctx.gridEvents().addLocalEventListener(
252 new GridLocalEventListener() {
253 @Override public void onEvent(Event evt) {
254 assert evt instanceof DiscoveryEvent;
255 assert evt.type() == EVT_NODE_FAILED || evt.type() == EVT_NODE_LEFT;
256
257 DiscoveryEvent discoEvt = (DiscoveryEvent)evt;
258
259 UUID nodeId = discoEvt.eventNode().id();
260
261 // Wait some time in case there are some unprocessed messages from failed node.
262 cctx.time().addTimeoutObject(new NodeFailureTimeoutObject(nodeId));
263
264 if (txFinishSync != null)
265 txFinishSync.onNodeLeft(nodeId);
266
267 for (TxDeadlockFuture fut : deadlockDetectFuts.values())
268 fut.onNodeLeft(nodeId);
269
270 for (Map.Entry<GridCacheVersion, Object> entry : completedVersHashMap.entrySet()) {
271 Object obj = entry.getValue();
272
273 if (obj instanceof GridCacheReturnCompletableWrapper &&
274 nodeId.equals(((GridCacheReturnCompletableWrapper)obj).nodeId()))
275 removeTxReturn(entry.getKey());
276 }
277 }
278 },
279 EVT_NODE_FAILED, EVT_NODE_LEFT);
280
281 this.txDeadlockDetection = new TxDeadlockDetection(cctx);
282
283 cctx.gridIO().addMessageListener(TOPIC_TX, new DeadlockDetectionListener());
284
285 this.logTxRecords = IgniteSystemProperties.getBoolean(IGNITE_WAL_LOG_TX_RECORDS, false);
286 }
287
288 /**
289 * @param cacheId Cache ID.
290 */
291 public void rollbackTransactionsForCache(int cacheId) {
292 rollbackTransactionsForCache(cacheId, nearIdMap);
293
294 rollbackTransactionsForCache(cacheId, idMap);
295 }
296
297 /**
298 * @param cacheId Cache ID.
299 * @param txMap Transactions map.
300 */
301 private void rollbackTransactionsForCache(int cacheId, ConcurrentMap<?, IgniteInternalTx> txMap) {
302 for (Map.Entry<?, IgniteInternalTx> e : txMap.entrySet()) {
303 IgniteInternalTx tx = e.getValue();
304
305 for (IgniteTxEntry entry : tx.allEntries()) {
306 if (entry.cacheId() == cacheId) {
307 rollbackTx(tx, false);
308
309 break;
310 }
311 }
312 }
313 }
314
315 /** {@inheritDoc} */
316 @Override public void onDisconnected(IgniteFuture reconnectFut) {
317 txFinishSync.onDisconnected(reconnectFut);
318
319 for (IgniteInternalTx tx : idMap.values())
320 rollbackTx(tx, true);
321 for (IgniteInternalTx tx : nearIdMap.values())
322 rollbackTx(tx, true);
323
324 IgniteClientDisconnectedException err =
325 new IgniteClientDisconnectedException(reconnectFut, "Client node disconnected.");
326
327 for (TxDeadlockFuture fut : deadlockDetectFuts.values())
328 fut.onDone(err);
329 }
330
331 /**
332 * @return TX handler.
333 */
334 public IgniteTxHandler txHandler() {
335 return txHnd;
336 }
337
338 /**
339 * Invalidates transaction.
340 *
341 * @param tx Transaction.
342 */
343 public void salvageTx(IgniteInternalTx tx) {
344 salvageTx(tx, USER_FINISH);
345 }
346
347 /**
348 * Invalidates transaction.
349 *
350 * @param tx Transaction.
351 * @param status Finalization status.
352 */
353 private void salvageTx(IgniteInternalTx tx, IgniteInternalTx.FinalizationStatus status) {
354 assert tx != null;
355
356 TransactionState state = tx.state();
357
358 if (state == ACTIVE || state == PREPARING || state == PREPARED || state == MARKED_ROLLBACK) {
359 if (!tx.markFinalizing(status)) {
360 if (log.isDebugEnabled())
361 log.debug("Will not try to commit invalidate transaction (could not mark finalized): " + tx);
362
363 return;
364 }
365
366 tx.salvageTx();
367
368 if (log.isDebugEnabled())
369 log.debug("Invalidated transaction because originating node left grid: " + CU.txString(tx));
370 }
371 }
372
373 /**
374 * Prints out memory stats to standard out.
375 * <p>
376 * USE ONLY FOR MEMORY PROFILING DURING TESTS.
377 */
378 @Override public void printMemoryStats() {
379 X.println(">>> ");
380 X.println(">>> Transaction manager memory stats [igniteInstanceName=" + cctx.igniteInstanceName() + ']');
381 X.println(">>> threadMapSize: " + threadMap.size());
382 X.println(">>> idMap [size=" + idMap.size() + ']');
383 X.println(">>> nearIdMap [size=" + nearIdMap.size() + ']');
384 X.println(">>> completedVersSortedSize: " + completedVersSorted.size());
385 X.println(">>> completedVersHashMapSize: " + completedVersHashMap.sizex());
386 }
387
388 /**
389 * @return Thread map size.
390 */
391 public int threadMapSize() {
392 return threadMap.size();
393 }
394
395 /**
396 * @return ID map size.
397 */
398 public int idMapSize() {
399 return idMap.size();
400 }
401
402 /**
403 * @return Committed versions size.
404 */
405 public int completedVersionsSize() {
406 return completedVersHashMap.size();
407 }
408
409 /**
410 *
411 * @param tx Transaction to check.
412 * @return {@code True} if transaction has been committed or rolled back,
413 * {@code false} otherwise.
414 */
415 private boolean isCompleted(IgniteInternalTx tx) {
416 boolean completed = completedVersHashMap.containsKey(tx.xidVersion());
417
418 // Need check that for tx with timeout rollback message was not received before lock.
419 if (!completed && tx.local() && tx.dht() && tx.timeout() > 0)
420 return completedVersHashMap.containsKey(tx.nearXidVersion());
421
422 return completed;
423 }
424
425 /**
426 * @param implicit {@code True} if transaction is implicit.
427 * @param implicitSingle Implicit-with-single-key flag.
428 * @param concurrency Concurrency.
429 * @param isolation Isolation.
430 * @param timeout transaction timeout.
431 * @param txSize Expected transaction size.
432 * @return New transaction.
433 */
434 public GridNearTxLocal newTx(
435 boolean implicit,
436 boolean implicitSingle,
437 @Nullable GridCacheContext sysCacheCtx,
438 TransactionConcurrency concurrency,
439 TransactionIsolation isolation,
440 long timeout,
441 boolean storeEnabled,
442 int txSize
443 ) {
444 assert sysCacheCtx == null || sysCacheCtx.systemTx();
445
446 UUID subjId = null; // TODO GG-9141 how to get subj ID?
447
448 int taskNameHash = cctx.kernalContext().job().currentTaskNameHash();
449
450 GridNearTxLocal tx = new GridNearTxLocal(
451 cctx,
452 implicit,
453 implicitSingle,
454 sysCacheCtx != null,
455 sysCacheCtx != null ? sysCacheCtx.ioPolicy() : SYSTEM_POOL,
456 concurrency,
457 isolation,
458 timeout,
459 storeEnabled,
460 txSize,
461 subjId,
462 taskNameHash);
463
464 if (tx.system()) {
465 AffinityTopologyVersion topVer = cctx.tm().lockedTopologyVersion(Thread.currentThread().getId(), tx);
466
467 // If there is another system transaction in progress, use it's topology version to prevent deadlock.
468 if (topVer != null)
469 tx.topologyVersion(topVer);
470 }
471
472 return onCreated(sysCacheCtx, tx);
473 }
474
475 /**
476 * @param cacheCtx Cache context.
477 * @param tx Created transaction.
478 * @return Started transaction.
479 */
480 @Nullable public <T extends IgniteInternalTx> T onCreated(@Nullable GridCacheContext cacheCtx, T tx) {
481 ConcurrentMap<GridCacheVersion, IgniteInternalTx> txIdMap = transactionMap(tx);
482
483 // Start clean.
484 resetContext();
485
486 if (isCompleted(tx)) {
487 if (log.isDebugEnabled())
488 log.debug("Attempt to create a completed transaction (will ignore): " + tx);
489
490 return null;
491 }
492
493 IgniteInternalTx t;
494
495 if ((t = txIdMap.putIfAbsent(tx.xidVersion(), tx)) == null) {
496 if (tx.local() && !tx.dht()) {
497 assert tx instanceof GridNearTxLocal : tx;
498
499 if (!tx.implicit()) {
500 if (cacheCtx == null || !cacheCtx.systemTx())
501 threadMap.put(tx.threadId(), tx);
502 else
503 sysThreadMap.put(new TxThreadKey(tx.threadId(), cacheCtx.cacheId()), tx);
504 }
505 }
506
507 // Handle mapped versions.
508 if (tx instanceof GridCacheMappedVersion) {
509 GridCacheMappedVersion mapped = (GridCacheMappedVersion)tx;
510
511 GridCacheVersion from = mapped.mappedVersion();
512
513 if (from != null)
514 mappedVers.put(from, tx.xidVersion());
515
516 if (log.isDebugEnabled())
517 log.debug("Added transaction version mapping [from=" + from + ", to=" + tx.xidVersion() +
518 ", tx=" + tx + ']');
519 }
520 }
521 else {
522 if (log.isDebugEnabled())
523 log.debug("Attempt to create an existing transaction (will ignore) [newTx=" + tx + ", existingTx=" +
524 t + ']');
525
526 return null;
527 }
528
529 if (log.isDebugEnabled())
530 log.debug("Transaction created: " + tx);
531
532 return tx;
533 }
534
535 /**
536 * Creates a future that will wait for all ongoing transactions that maybe affected by topology update
537 * to be finished. This set of transactions include
538 * <ul>
539 * <li/> All {@link TransactionConcurrency#PESSIMISTIC} transactions with topology version
540 * less or equal to {@code topVer}.
541 * <li/> {@link TransactionConcurrency#OPTIMISTIC} transactions in PREPARING state with topology
542 * version less or equal to {@code topVer} and having transaction key with entry that belongs to
543 * one of partitions in {@code parts}.
544 * </ul>
545 *
546 * @param topVer Topology version.
547 * @return Future that will be completed when all ongoing transactions are finished.
548 */
549 public IgniteInternalFuture<Boolean> finishTxs(AffinityTopologyVersion topVer) {
550 GridCompoundFuture<IgniteInternalTx, Boolean> res =
551 new CacheObjectsReleaseFuture<>(
552 "Tx",
553 topVer,
554 new IgniteReducer<IgniteInternalTx, Boolean>() {
555 @Override public boolean collect(IgniteInternalTx e) {
556 return true;
557 }
558
559 @Override public Boolean reduce() {
560 return true;
561 }
562 });
563
564 for (IgniteInternalTx tx : txs()) {
565 if (needWaitTransaction(tx, topVer))
566 res.add(tx.finishFuture());
567 }
568
569 res.markInitialized();
570
571 return res;
572 }
573
574 /**
575 * @param tx Transaction.
576 * @param topVer Exchange version.
577 * @return {@code True} if need wait transaction for exchange.
578 */
579 public boolean needWaitTransaction(IgniteInternalTx tx, AffinityTopologyVersion topVer) {
580 AffinityTopologyVersion txTopVer = tx.topologyVersionSnapshot();
581
582 return txTopVer != null && txTopVer.compareTo(topVer) < 0;
583 }
584
585 /**
586 * Transaction start callback (has to do with when any operation was
587 * performed on this transaction).
588 *
589 * @param tx Started transaction.
590 * @return {@code True} if transaction is not in completed set.
591 */
592 public boolean onStarted(IgniteInternalTx tx) {
593 assert tx.state() == ACTIVE || tx.isRollbackOnly() : "Invalid transaction state [locId=" + cctx.localNodeId() +
594 ", tx=" + tx + ']';
595
596 if (isCompleted(tx)) {
597 ConcurrentMap<GridCacheVersion, IgniteInternalTx> txIdMap = transactionMap(tx);
598
599 txIdMap.remove(tx.xidVersion(), tx);
600
601 if (log.isDebugEnabled())
602 log.debug("Attempt to start a completed transaction (will ignore): " + tx);
603
604 return false;
605 }
606
607 if (log.isDebugEnabled())
608 log.debug("Transaction started: " + tx);
609
610 return true;
611 }
612
613 /**
614 * @param from Near version.
615 * @return DHT version for a near version.
616 */
617 public GridCacheVersion mappedVersion(GridCacheVersion from) {
618 GridCacheVersion to = mappedVers.get(from);
619
620 if (log.isDebugEnabled())
621 log.debug("Found mapped version [from=" + from + ", to=" + to);
622
623 return to;
624 }
625
626 /**
627 *
628 * @param ver Alternate version.
629 * @param tx Transaction.
630 */
631 public void addAlternateVersion(GridCacheVersion ver, IgniteInternalTx tx) {
632 if (idMap.putIfAbsent(ver, tx) == null)
633 if (log.isDebugEnabled())
634 log.debug("Registered alternate transaction version [ver=" + ver + ", tx=" + tx + ']');
635 }
636
637 /**
638 * @return Local transaction.
639 */
640 @Nullable public IgniteTxLocalAdapter localTx() {
641 IgniteTxLocalAdapter tx = tx();
642
643 return tx != null && tx.local() ? tx : null;
644 }
645
646 /**
647 * @param cctx Cache context.
648 * @return Transaction for current thread.
649 */
650 public GridNearTxLocal threadLocalTx(GridCacheContext cctx) {
651 IgniteInternalTx tx = tx(cctx, Thread.currentThread().getId());
652
653 if (tx != null && tx.local() && (!tx.dht() || tx.colocated()) && !tx.implicit()) {
654 assert tx instanceof GridNearTxLocal : tx;
655
656 return (GridNearTxLocal)tx;
657 }
658
659 return null;
660 }
661
662 /**
663 * @return Transaction for current thread.
664 */
665 @SuppressWarnings({"unchecked", "RedundantCast"})
666 public <T> T tx() {
667 IgniteInternalTx tx = txContext();
668
669 return tx != null ? (T)tx : (T)tx(null, Thread.currentThread().getId());
670 }
671
672 /**
673 * @param threadId Thread ID.
674 * @param ignore Transaction to ignore.
675 * @return Not null topology version if current thread holds lock preventing topology change.
676 */
677 @Nullable public AffinityTopologyVersion lockedTopologyVersion(long threadId, IgniteInternalTx ignore) {
678 IgniteInternalTx tx = threadMap.get(threadId);
679
680 if (tx != null) {
681 AffinityTopologyVersion topVer = tx.topologyVersionSnapshot();
682
683 if (topVer != null)
684 return topVer;
685 }
686
687 if (!sysThreadMap.isEmpty()) {
688 for (GridCacheContext cacheCtx : cctx.cache().context().cacheContexts()) {
689 if (!cacheCtx.systemTx())
690 continue;
691
692 tx = sysThreadMap.get(new TxThreadKey(threadId, cacheCtx.cacheId()));
693
694 if (tx != null && tx != ignore) {
695 AffinityTopologyVersion topVer = tx.topologyVersionSnapshot();
696
697 if (topVer != null)
698 return topVer;
699 }
700 }
701 }
702
703 return txTop.get();
704 }
705
706 /**
707 * @param topVer Locked topology version.
708 * @return {@code True} if topology hint was set.
709 */
710 public boolean setTxTopologyHint(@Nullable AffinityTopologyVersion topVer) {
711 if (topVer == null)
712 txTop.set(null);
713 else {
714 if (txTop.get() == null) {
715 txTop.set(topVer);
716
717 return true;
718 }
719 }
720
721 return false;
722 }
723
724 /**
725 * @return User transaction for current thread.
726 */
727 @Nullable public GridNearTxLocal userTx() {
728 IgniteInternalTx tx = txContext();
729
730 if (activeUserTx(tx))
731 return (GridNearTxLocal)tx;
732
733 tx = tx(null, Thread.currentThread().getId());
734
735 if (activeUserTx(tx))
736 return (GridNearTxLocal)tx;
737
738 return null;
739 }
740
741 /**
742 * @param cctx Cache context.
743 * @return User transaction for current thread.
744 */
745 @Nullable GridNearTxLocal userTx(GridCacheContext cctx) {
746 IgniteInternalTx tx = tx(cctx, Thread.currentThread().getId());
747
748 if (activeUserTx(tx))
749 return (GridNearTxLocal)tx;
750
751 return null;
752 }
753
754 /**
755 * @param tx Transaction.
756 * @return {@code True} if given transaction is explicitly started user transaction.
757 */
758 private boolean activeUserTx(@Nullable IgniteInternalTx tx) {
759 if (tx != null && tx.user() && tx.state() == ACTIVE) {
760 assert tx instanceof GridNearTxLocal : tx;
761
762 return true;
763 }
764
765 return false;
766 }
767
768 /**
769 * @param cctx Cache context.
770 * @param threadId Id of thread for transaction.
771 * @return Transaction for thread with given ID.
772 */
773 @SuppressWarnings({"unchecked"})
774 private <T> T tx(GridCacheContext cctx, long threadId) {
775 if (cctx == null || !cctx.systemTx())
776 return (T)threadMap.get(threadId);
777
778 TxThreadKey key = new TxThreadKey(threadId, cctx.cacheId());
779
780 return (T)sysThreadMap.get(key);
781 }
782
783 /**
784 * @return {@code True} if current thread is currently within transaction.
785 */
786 public boolean inUserTx() {
787 return userTx() != null;
788 }
789
790 /**
791 * @param txId Transaction ID.
792 * @return Transaction with given ID.
793 */
794 @SuppressWarnings({"unchecked"})
795 @Nullable public <T extends IgniteInternalTx> T tx(GridCacheVersion txId) {
796 return (T)idMap.get(txId);
797 }
798
799 /**
800 * @param txId Transaction ID.
801 * @return Transaction with given ID.
802 */
803 @SuppressWarnings({"unchecked"})
804 @Nullable public <T extends IgniteInternalTx> T nearTx(GridCacheVersion txId) {
805 return (T)nearIdMap.get(txId);
806 }
807
808 /**
809 * Handles prepare stage.
810 *
811 * @param tx Transaction to prepare.
812 * @param entries Entries to lock or {@code null} if use default {@link IgniteInternalTx#optimisticLockEntries()}.
813 * @throws IgniteCheckedException If preparation failed.
814 */
815 public void prepareTx(IgniteInternalTx tx, @Nullable Collection<IgniteTxEntry> entries) throws IgniteCheckedException {
816 if (tx.state() == MARKED_ROLLBACK) {
817 if (tx.remainingTime() == -1)
818 throw new IgniteTxTimeoutCheckedException("Transaction timed out: " + this);
819
820 throw new IgniteCheckedException("Transaction is marked for rollback: " + tx);
821 }
822
823 if (tx.remainingTime() == -1) {
824 tx.setRollbackOnly();
825
826 throw new IgniteTxTimeoutCheckedException("Transaction timed out: " + this);
827 }
828
829 if (tx.pessimistic() && tx.local())
830 return; // Nothing else to do in pessimistic mode.
831
832 // Optimistic.
833 assert tx.optimistic() || !tx.local();
834
835 if (!lockMultiple(tx, entries != null ? entries : tx.optimisticLockEntries())) {
836 tx.setRollbackOnly();
837
838 throw new IgniteTxOptimisticCheckedException("Failed to prepare transaction (lock conflict): " + tx);
839 }
840 }
841
842 /**
843 * @param tx Transaction.
844 */
845 private void removeObsolete(IgniteInternalTx tx) {
846 Collection<IgniteTxEntry> entries = tx.local() ? tx.allEntries() : tx.writeEntries();
847
848 for (IgniteTxEntry entry : entries) {
849 GridCacheEntryEx cached = entry.cached();
850
851 GridCacheContext cacheCtx = entry.context();
852
853 if (cached == null)
854 cached = cacheCtx.cache().peekEx(entry.key());
855
856 if (cached.detached())
857 continue;
858
859 try {
860 if (cached.obsolete() || cached.markObsoleteIfEmpty(tx.xidVersion()))
861 cacheCtx.cache().removeEntry(cached);
862
863 if (!tx.near() && isNearEnabled(cacheCtx)) {
864 GridNearCacheAdapter near = cacheCtx.isNear() ? cacheCtx.near() : cacheCtx.dht().near();
865
866 GridNearCacheEntry e = near.peekExx(entry.key());
867
868 if (e != null && e.markObsoleteIfEmpty(null))
869 near.removeEntry(e);
870 }
871 }
872 catch (IgniteCheckedException e) {
873 U.error(log, "Failed to remove obsolete entry from cache: " + cached, e);
874 }
875 }
876 }
877
878 /**
879 * @param min Minimum version.
880 * @return Pair [committed, rolledback] - never {@code null}, elements potentially empty,
881 * but also never {@code null}.
882 */
883 public IgnitePair<Collection<GridCacheVersion>> versions(GridCacheVersion min) {
884 Collection<GridCacheVersion> committed = null;
885 Collection<GridCacheVersion> rolledback = null;
886
887 for (Map.Entry<GridCacheVersion, Boolean> e : completedVersSorted.tailMap(min, true).entrySet()) {
888 if (e.getValue()) {
889 if (committed == null)
890 committed = new ArrayList<>();
891
892 committed.add(e.getKey());
893 }
894 else {
895 if (rolledback == null)
896 rolledback = new ArrayList<>();
897
898 rolledback.add(e.getKey());
899 }
900 }
901
902 return new IgnitePair<>(
903 committed == null ? Collections.<GridCacheVersion>emptyList() : committed,
904 rolledback == null ? Collections.<GridCacheVersion>emptyList() : rolledback);
905 }
906
907 /**
908 * @return Collection of active transactions.
909 */
910 public Collection<IgniteInternalTx> activeTransactions() {
911 return F.concat(false, idMap.values(), nearIdMap.values());
912 }
913
914 /**
915 * @param tx Tx to remove.
916 */
917 public void removeCommittedTx(IgniteInternalTx tx) {
918 completedVersHashMap.remove(tx.xidVersion(), true);
919
920 if (tx.needsCompletedVersions())
921 completedVersSorted.remove(tx.xidVersion(), true);
922 }
923
924 /**
925 * @param tx Committed transaction.
926 */
927 public void addCommittedTx(IgniteInternalTx tx) {
928 addCommittedTx(tx, tx.xidVersion(), tx.nearXidVersion());
929 }
930
931 /**
932 * @param tx Committed transaction.
933 */
934 public void addCommittedTxReturn(IgniteInternalTx tx, GridCacheReturnCompletableWrapper ret) {
935 addCommittedTxReturn(tx.nearXidVersion(), null, ret);
936 }
937
938 /**
939 * @param tx Committed transaction.
940 * @return If transaction was not already present in committed set.
941 */
942 public boolean addRolledbackTx(IgniteInternalTx tx) {
943 return addRolledbackTx(tx, tx.xidVersion());
944 }
945
946 /**
947 * @param tx Tx.
948 * @param xidVer Completed transaction version.
949 * @param nearXidVer Optional near transaction ID.
950 * @return If transaction was not already present in completed set.
951 */
952 public boolean addCommittedTx(
953 IgniteInternalTx tx,
954 GridCacheVersion xidVer,
955 @Nullable GridCacheVersion nearXidVer
956 ) {
957 if (nearXidVer != null)
958 xidVer = new CommittedVersion(xidVer, nearXidVer);
959
960 Object committed0 = completedVersHashMap.putIfAbsent(xidVer, true);
961
962 if (committed0 == null && (tx == null || tx.needsCompletedVersions())) {
963 Boolean b = completedVersSorted.putIfAbsent(xidVer, true);
964
965 assert b == null;
966 }
967
968 Boolean committed = committed0 != null && !committed0.equals(Boolean.FALSE);
969
970 return committed0 == null || committed;
971 }
972
973 /**
974 * @param xidVer Completed transaction version.
975 * @param nearXidVer Optional near transaction ID.
976 * @param retVal Invoke result.
977 */
978 private void addCommittedTxReturn(
979 GridCacheVersion xidVer,
980 @Nullable GridCacheVersion nearXidVer,
981 GridCacheReturnCompletableWrapper retVal
982 ) {
983 assert retVal != null;
984
985 if (nearXidVer != null)
986 xidVer = new CommittedVersion(xidVer, nearXidVer);
987
988 Object prev = completedVersHashMap.putIfAbsent(xidVer, retVal);
989
990 assert prev == null || Boolean.FALSE.equals(prev) : prev; // Can be rolled back.
991 }
992
993 /**
994 * @param tx Tx.
995 * @param xidVer Completed transaction version.
996 * @return If transaction was not already present in completed set.
997 */
998 public boolean addRolledbackTx(
999 IgniteInternalTx tx,
1000 GridCacheVersion xidVer
1001 ) {
1002 Object committed0 = completedVersHashMap.putIfAbsent(xidVer, false);
1003
1004 if (committed0 == null && (tx == null || tx.needsCompletedVersions())) {
1005 Boolean b = completedVersSorted.putIfAbsent(xidVer, false);
1006
1007 assert b == null;
1008 }
1009
1010 Boolean committed = committed0 != null && !committed0.equals(Boolean.FALSE);
1011
1012 return committed0 == null || !committed;
1013 }
1014
1015 /**
1016 * @param xidVer xidVer Completed transaction version.
1017 * @return Tx result.
1018 */
1019 public GridCacheReturnCompletableWrapper getCommittedTxReturn(GridCacheVersion xidVer) {
1020 Object retVal = completedVersHashMap.get(xidVer);
1021
1022 // Will gain true in regular case or GridCacheReturn in onePhaseCommit case.
1023 if (!Boolean.TRUE.equals(retVal)) {
1024 assert !Boolean.FALSE.equals(retVal); // Method should be used only after 'committed' checked.
1025
1026 GridCacheReturnCompletableWrapper res = (GridCacheReturnCompletableWrapper)retVal;
1027
1028 removeTxReturn(xidVer);
1029
1030 return res;
1031 }
1032 else
1033 return null;
1034 }
1035
1036 /**
1037 * @param xidVer xidVer Completed transaction version.
1038 */
1039 public void removeTxReturn(GridCacheVersion xidVer) {
1040 Object prev = completedVersHashMap.get(xidVer);
1041
1042 if (Boolean.FALSE.equals(prev)) // Tx can be rolled back.
1043 return;
1044
1045 assert prev instanceof GridCacheReturnCompletableWrapper:
1046 prev + " instead of GridCacheReturnCompletableWrapper";
1047
1048 boolean res = completedVersHashMap.replace(xidVer, prev, true);
1049
1050 assert res;
1051 }
1052
1053 /**
1054 * @param tx Transaction.
1055 */
1056 private void processCompletedEntries(IgniteInternalTx tx) {
1057 if (tx.needsCompletedVersions()) {
1058 GridCacheVersion min = minVersion(tx.readEntries(), tx.xidVersion(), tx);
1059
1060 min = minVersion(tx.writeEntries(), min, tx);
1061
1062 assert min != null;
1063
1064 IgnitePair<Collection<GridCacheVersion>> versPair = versions(min);
1065
1066 tx.completedVersions(min, versPair.get1(), versPair.get2());
1067 }
1068 }
1069
1070 /**
1071 * Collects versions for all pending locks for all entries within transaction
1072 *
1073 * @param dhtTxLoc Transaction being committed.
1074 */
1075 private void collectPendingVersions(GridDhtTxLocal dhtTxLoc) {
1076 if (dhtTxLoc.needsCompletedVersions()) {
1077 if (log.isDebugEnabled())
1078 log.debug("Checking for pending locks with version less then tx version: " + dhtTxLoc);
1079
1080 Set<GridCacheVersion> vers = new LinkedHashSet<>();
1081
1082 collectPendingVersions(dhtTxLoc.readEntries(), dhtTxLoc.xidVersion(), vers);
1083 collectPendingVersions(dhtTxLoc.writeEntries(), dhtTxLoc.xidVersion(), vers);
1084
1085 if (!vers.isEmpty())
1086 dhtTxLoc.pendingVersions(vers);
1087 }
1088 }
1089
1090 /**
1091 * Gets versions of all not acquired locks for collection of tx entries that are less then base version.
1092 *
1093 * @param entries Tx entries to process.
1094 * @param baseVer Base version to compare with.
1095 * @param vers Collection of versions that will be populated.
1096 */
1097 @SuppressWarnings("TypeMayBeWeakened")
1098 private void collectPendingVersions(Iterable<IgniteTxEntry> entries,
1099 GridCacheVersion baseVer, Set<GridCacheVersion> vers) {
1100
1101 // The locks are not released yet, so we can safely list pending candidates versions.
1102 for (IgniteTxEntry txEntry : entries) {
1103 GridCacheEntryEx cached = txEntry.cached();
1104
1105 try {
1106 // If check should be faster then exception handling.
1107 if (!cached.obsolete()) {
1108 for (GridCacheMvccCandidate cand : cached.localCandidates()) {
1109 if (!cand.owner() && cand.version().compareTo(baseVer) < 0) {
1110 if (log.isDebugEnabled())
1111 log.debug("Adding candidate version to pending set: " + cand);
1112
1113 vers.add(cand.version());
1114 }
1115 }
1116 }
1117 }
1118 catch (GridCacheEntryRemovedException ignored) {
1119 if (log.isDebugEnabled())
1120 log.debug("There are no pending locks for entry (entry was deleted in transaction): " + txEntry);
1121 }
1122 }
1123 }
1124
1125 /**
1126 * Go through all candidates for entries involved in transaction and find their min
1127 * version. We know that these candidates will commit after this transaction, and
1128 * therefore we can grab the min version so we can send all committed and rolled
1129 * back versions from min to current to remote nodes for re-ordering.
1130 *
1131 * @param entries Entries.
1132 * @param min Min version so far.
1133 * @param tx Transaction.
1134 * @return Minimal available version.
1135 */
1136 private GridCacheVersion minVersion(Iterable<IgniteTxEntry> entries, GridCacheVersion min,
1137 IgniteInternalTx tx) {
1138 for (IgniteTxEntry txEntry : entries) {
1139 GridCacheEntryEx cached = txEntry.cached();
1140
1141 // We are assuming that this method is only called on commit. In that
1142 // case, if lock is held, entry can never be removed.
1143 assert txEntry.isRead() || !cached.obsolete(tx.xidVersion()) :
1144 "Invalid obsolete version for transaction [entry=" + cached + ", tx=" + tx + ']';
1145
1146 for (GridCacheMvccCandidate cand : cached.remoteMvccSnapshot())
1147 if (min == null || cand.version().isLess(min))
1148 min = cand.version();
1149 }
1150
1151 return min;
1152 }
1153
1154 /**
1155 * @param tx Transaction.
1156 * @return {@code True} if transaction read entries should be unlocked.
1157 */
1158 private boolean unlockReadEntries(IgniteInternalTx tx) {
1159 if (tx.pessimistic())
1160 return !tx.readCommitted();
1161 else
1162 return tx.serializable();
1163 }
1164
1165 /**
1166 * Commits a transaction.
1167 *
1168 * @param tx Transaction to commit.
1169 * @throws IgniteCheckedException If failed.
1170 */
1171 public void commitTx(IgniteInternalTx tx) throws IgniteCheckedException {
1172 assert tx != null;
1173 assert tx.state() == COMMITTING : "Invalid transaction state for commit from tm [state=" + tx.state() +
1174 ", expected=COMMITTING, tx=" + tx + ']';
1175
1176 if (log.isDebugEnabled())
1177 log.debug("Committing from TM [locNodeId=" + cctx.localNodeId() + ", tx=" + tx + ']');
1178
1179 /*
1180 * Note that write phase is handled by transaction adapter itself,
1181 * so we don't do it here.
1182 */
1183
1184 Object committed0 = completedVersHashMap.get(tx.xidVersion());
1185
1186 Boolean committed = committed0 != null && !committed0.equals(Boolean.FALSE);
1187
1188 // 1. Make sure that committed version has been recorded.
1189 if (!(committed || tx.writeSet().isEmpty() || tx.isSystemInvalidate())) {
1190 uncommitTx(tx);
1191
1192 tx.errorWhenCommitting();
1193
1194 throw new IgniteCheckedException("Missing commit version (consider increasing " +
1195 IGNITE_MAX_COMPLETED_TX_COUNT + " system property) [ver=" + tx.xidVersion() +
1196 ", committed0=" + committed0 +
1197 ", tx=" + tx.getClass().getSimpleName() + ']');
1198 }
1199
1200 ConcurrentMap<GridCacheVersion, IgniteInternalTx> txIdMap = transactionMap(tx);
1201
1202 if (txIdMap.remove(tx.xidVersion(), tx)) {
1203 // 2. Must process completed entries before unlocking!
1204 processCompletedEntries(tx);
1205
1206 if (tx instanceof GridDhtTxLocal) {
1207 GridDhtTxLocal dhtTxLoc = (GridDhtTxLocal)tx;
1208
1209 collectPendingVersions(dhtTxLoc);
1210 }
1211
1212 // 3. Unlock write resources.
1213 unlockMultiple(tx, tx.writeEntries());
1214
1215 // 4. Unlock read resources if required.
1216 if (unlockReadEntries(tx))
1217 unlockMultiple(tx, tx.readEntries());
1218
1219 // 5. Notify evictions.
1220 notifyEvictions(tx);
1221
1222 // 6. Remove obsolete entries from cache.
1223 removeObsolete(tx);
1224
1225 // 7. Assign transaction number at the end of transaction.
1226 tx.endVersion(cctx.versions().next(tx.topologyVersion()));
1227
1228 // 8. Remove from per-thread storage.
1229 clearThreadMap(tx);
1230
1231 // 9. Unregister explicit locks.
1232 if (!tx.alternateVersions().isEmpty()) {
1233 for (GridCacheVersion ver : tx.alternateVersions())
1234 idMap.remove(ver);
1235 }
1236
1237 // 10. Remove Near-2-DHT mappings.
1238 if (tx instanceof GridCacheMappedVersion) {
1239 GridCacheVersion mapped = ((GridCacheMappedVersion)tx).mappedVersion();
1240
1241 if (mapped != null)
1242 mappedVers.remove(mapped);
1243 }
1244
1245 // 11. Clear context.
1246 resetContext();
1247
1248 // 12. Update metrics.
1249 if (!tx.dht() && tx.local()) {
1250 if (!tx.system())
1251 cctx.txMetrics().onTxCommit();
1252
1253 tx.txState().onTxEnd(cctx, tx, true);
1254 }
1255
1256 if (slowTxWarnTimeout > 0 && tx.local() &&
1257 U.currentTimeMillis() - tx.startTime() > slowTxWarnTimeout)
1258 U.warn(log, "Slow transaction detected [tx=" + tx +
1259 ", slowTxWarnTimeout=" + slowTxWarnTimeout + ']') ;
1260
1261 if (log.isDebugEnabled())
1262 log.debug("Committed from TM [locNodeId=" + cctx.localNodeId() + ", tx=" + tx + ']');
1263 }
1264 else if (log.isDebugEnabled())
1265 log.debug("Did not commit from TM (was already committed): " + tx);
1266 }
1267
1268 /**
1269 * Rolls back a transaction.
1270 *
1271 * @param tx Transaction to rollback.
1272 * @param clearThreadMap {@code True} if need remove tx from thread map.
1273 */
1274 public void rollbackTx(IgniteInternalTx tx, boolean clearThreadMap) {
1275 assert tx != null;
1276
1277 if (log.isDebugEnabled())
1278 log.debug("Rolling back from TM [locNodeId=" + cctx.localNodeId() + ", tx=" + tx + ']');
1279
1280 // 1. Record transaction version to avoid duplicates.
1281 addRolledbackTx(tx);
1282
1283 ConcurrentMap<GridCacheVersion, IgniteInternalTx> txIdMap = transactionMap(tx);
1284
1285 if (txIdMap.remove(tx.xidVersion(), tx)) {
1286 // 2. Unlock write resources.
1287 unlockMultiple(tx, tx.writeEntries());
1288
1289 // 3. Unlock read resources if required.
1290 if (unlockReadEntries(tx))
1291 unlockMultiple(tx, tx.readEntries());
1292
1293 // 4. Notify evictions.
1294 notifyEvictions(tx);
1295
1296 // 5. Remove obsolete entries.
1297 removeObsolete(tx);
1298
1299 // 6. Remove from per-thread storage.
1300 if (clearThreadMap)
1301 clearThreadMap(tx);
1302
1303 // 7. Unregister explicit locks.
1304 if (!tx.alternateVersions().isEmpty())
1305 for (GridCacheVersion ver : tx.alternateVersions())
1306 idMap.remove(ver);
1307
1308 // 8. Remove Near-2-DHT mappings.
1309 if (tx instanceof GridCacheMappedVersion)
1310 mappedVers.remove(((GridCacheMappedVersion)tx).mappedVersion());
1311
1312 // 9. Clear context.
1313 resetContext();
1314
1315 // 10. Update metrics.
1316 if (!tx.dht() && tx.local()) {
1317 if (!tx.system())
1318 cctx.txMetrics().onTxRollback();
1319
1320 tx.txState().onTxEnd(cctx, tx, false);
1321 }
1322
1323 if (log.isDebugEnabled())
1324 log.debug("Rolled back from TM: " + tx);
1325 }
1326 else if (log.isDebugEnabled())
1327 log.debug("Did not rollback from TM (was already rolled back): " + tx);
1328 }
1329
1330 /**
1331 * Fast finish transaction. Can be used only if no locks were acquired.
1332 *
1333 * @param tx Transaction to finish.
1334 * @param commit {@code True} if transaction is committed, {@code false} if rolled back.
1335 */
1336 public void fastFinishTx(GridNearTxLocal tx, boolean commit) {
1337 assert tx != null;
1338 assert tx.writeMap().isEmpty();
1339 assert tx.optimistic() || tx.readMap().isEmpty();
1340
1341 ConcurrentMap<GridCacheVersion, IgniteInternalTx> txIdMap = transactionMap(tx);
1342
1343 if (txIdMap.remove(tx.xidVersion(), tx)) {
1344 // 1. Notify evictions.
1345 notifyEvictions(tx);
1346
1347 // 2. Evict near entries.
1348 if (!tx.readMap().isEmpty()) {
1349 for (IgniteTxEntry entry : tx.readMap().values())
1350 tx.evictNearEntry(entry, false);
1351 }
1352
1353 // 3. Remove obsolete entries.
1354 removeObsolete(tx);
1355
1356 // 4. Remove from per-thread storage.
1357 clearThreadMap(tx);
1358
1359 // 5. Clear context.
1360 resetContext();
1361
1362 // 6. Update metrics.
1363 if (!tx.dht() && tx.local()) {
1364 if (!tx.system()) {
1365 if (commit)
1366 cctx.txMetrics().onTxCommit();
1367 else
1368 cctx.txMetrics().onTxRollback();
1369 }
1370
1371 tx.txState().onTxEnd(cctx, tx, commit);
1372 }
1373 }
1374 }
1375
1376 /**
1377 * Tries to minimize damage from partially-committed transaction.
1378 *
1379 * @param tx Tx to uncommit.
1380 */
1381 void uncommitTx(IgniteInternalTx tx) {
1382 assert tx != null;
1383
1384 if (log.isDebugEnabled())
1385 log.debug("Uncommiting from TM: " + tx);
1386
1387 ConcurrentMap<GridCacheVersion, IgniteInternalTx> txIdMap = transactionMap(tx);
1388
1389 if (txIdMap.remove(tx.xidVersion(), tx)) {
1390 // 1. Unlock write resources.
1391 unlockMultiple(tx, tx.writeEntries());
1392
1393 // 2. Unlock read resources if required.
1394 if (unlockReadEntries(tx))
1395 unlockMultiple(tx, tx.readEntries());
1396
1397 // 3. Notify evictions.
1398 notifyEvictions(tx);
1399
1400 // 4. Remove from per-thread storage.
1401 clearThreadMap(tx);
1402
1403 // 5. Unregister explicit locks.
1404 if (!tx.alternateVersions().isEmpty()) {
1405 for (GridCacheVersion ver : tx.alternateVersions())
1406 idMap.remove(ver);
1407 }
1408
1409 // 6. Remove Near-2-DHT mappings.
1410 if (tx instanceof GridCacheMappedVersion)
1411 mappedVers.remove(((GridCacheMappedVersion)tx).mappedVersion());
1412
1413 // 7. Clear context.
1414 resetContext();
1415
1416 if (log.isDebugEnabled())
1417 log.debug("Uncommitted from TM: " + tx);
1418 }
1419 else if (log.isDebugEnabled())
1420 log.debug("Did not uncommit from TM (was already committed or rolled back): " + tx);
1421 }
1422
1423 /**
1424 * @param tx Transaction to clear.
1425 */
1426 public void clearThreadMap(IgniteInternalTx tx) {
1427 if (tx.local() && !tx.dht()) {
1428 assert tx instanceof GridNearTxLocal : tx;
1429
1430 if (!tx.system())
1431 threadMap.remove(tx.threadId(), tx);
1432 else {
1433 Integer cacheId = tx.txState().firstCacheId();
1434
1435 if (cacheId != null)
1436 sysThreadMap.remove(new TxThreadKey(tx.threadId(), cacheId), tx);
1437 else {
1438 for (Iterator<IgniteInternalTx> it = sysThreadMap.values().iterator(); it.hasNext(); ) {
1439 IgniteInternalTx txx = it.next();
1440
1441 if (tx == txx) {
1442 it.remove();
1443
1444 break;
1445 }
1446 }
1447 }
1448 }
1449 }
1450 }
1451
1452 /**
1453 * Gets transaction ID map depending on transaction type.
1454 *
1455 * @param tx Transaction.
1456 * @return Transaction map.
1457 */
1458 private ConcurrentMap<GridCacheVersion, IgniteInternalTx> transactionMap(IgniteInternalTx tx) {
1459 return (tx.near() && !tx.local()) ? nearIdMap : idMap;
1460 }
1461
1462 /**
1463 * @param tx Transaction to notify evictions for.
1464 */
1465 private void notifyEvictions(IgniteInternalTx tx) {
1466 if (tx.internal())
1467 return;
1468
1469 for (IgniteTxEntry txEntry : tx.allEntries())
1470 txEntry.cached().context().evicts().touch(txEntry, tx.local());
1471 }
1472
1473 /**
1474 * Callback invoked whenever a member of a transaction acquires
1475 * lock ownership.
1476 *
1477 * @param entry Cache entry.
1478 * @param owner Candidate that won ownership.
1479 * @return {@code True} if transaction was notified, {@code false} otherwise.
1480 */
1481 public boolean onOwnerChanged(GridCacheEntryEx entry, GridCacheMvccCandidate owner) {
1482 // We only care about acquired locks.
1483 if (owner != null) {
1484 IgniteTxAdapter tx = tx(owner.version());
1485
1486 if (tx == null)
1487 tx = nearTx(owner.version());
1488
1489 if (tx != null) {
1490 if (!tx.local()) {
1491 if (log.isDebugEnabled())
1492 log.debug("Found transaction for owner changed event [owner=" + owner + ", entry=" + entry +
1493 ", tx=" + tx + ']');
1494
1495 tx.onOwnerChanged(entry, owner);
1496
1497 return true;
1498 }
1499 else if (log.isDebugEnabled())
1500 log.debug("Ignoring local transaction for owner change event: " + tx);
1501 }
1502 else if (log.isDebugEnabled())
1503 log.debug("Transaction not found for owner changed event [owner=" + owner + ", entry=" + entry + ']');
1504 }
1505
1506 return false;
1507 }
1508
1509 /**
1510 * Callback called by near finish future before sending near finish request to remote node. Will increment
1511 * per-thread counter so that further awaitAck call will wait for finish response.
1512 *
1513 * @param rmtNodeId Remote node ID for which finish request is being sent.
1514 * @param threadId Near tx thread ID.
1515 */
1516 public void beforeFinishRemote(UUID rmtNodeId, long threadId) {
1517 if (finishSyncDisabled)
1518 return;
1519
1520 assert txFinishSync != null;
1521
1522 txFinishSync.onFinishSend(rmtNodeId, threadId);
1523 }
1524
1525 /**
1526 * Callback invoked when near finish response is received from remote node.
1527 *
1528 * @param rmtNodeId Remote node ID from which response is received.
1529 * @param threadId Near tx thread ID.
1530 */
1531 public void onFinishedRemote(UUID rmtNodeId, long threadId) {
1532 if (finishSyncDisabled)
1533 return;
1534
1535 assert txFinishSync != null;
1536
1537 txFinishSync.onAckReceived(rmtNodeId, threadId);
1538 }
1539
1540 /**
1541 * Asynchronously waits for last finish request ack.
1542 *
1543 * @param rmtNodeId Remote node ID.
1544 * @param threadId Near tx thread ID.
1545 * @return {@code null} if ack was received or future that will be completed when ack is received.
1546 */
1547 @Nullable public IgniteInternalFuture<?> awaitFinishAckAsync(UUID rmtNodeId, long threadId) {
1548 if (finishSyncDisabled)
1549 return null;
1550
1551 assert txFinishSync != null;
1552
1553 return txFinishSync.awaitAckAsync(rmtNodeId, threadId);
1554 }
1555
/**
 * For test purposes only. Switches finish synchronization off or on; while it is disabled the
 * finish callbacks ({@code beforeFinishRemote}, {@code onFinishedRemote}, {@code awaitFinishAckAsync})
 * return immediately.
 *
 * @param finishSyncDisabled {@code True} if finish sync should be disabled.
 */
public void finishSyncDisabled(boolean finishSyncDisabled) {
    this.finishSyncDisabled = finishSyncDisabled;
}
1564
/**
 * Acquires transaction locks on the given entries one by one.
 * <p>
 * Entries that were prepared before or that have an explicit lock version are skipped. If a lock
 * cannot be acquired for some entry, entries preceding it in iteration order are unlocked and
 * {@code false} is returned. Entries that got removed concurrently are renewed and retried.
 *
 * @param tx Transaction.
 * @param entries Entries to lock.
 * @return {@code True} if all keys were locked.
 * @throws IgniteCheckedException If lock has been cancelled.
 */
private boolean lockMultiple(IgniteInternalTx tx, Iterable<IgniteTxEntry> entries)
    throws IgniteCheckedException {
    assert tx.optimistic() || !tx.local();

    long remainingTime = tx.remainingTime();

    // For serializable transactions, failure to acquire lock means
    // that there is a serializable conflict. For all other isolation levels,
    // we wait for the lock.
    // Negative remaining time is clamped to zero before being passed as lock timeout.
    long timeout = remainingTime < 0 ? 0 : remainingTime;

    // Serializable order version is used only for optimistic serializable transactions.
    GridCacheVersion serOrder = (tx.serializable() && tx.optimistic()) ? tx.nearXidVersion() : null;

    for (IgniteTxEntry txEntry1 : entries) {
        // Check if this entry was prepared before.
        if (!txEntry1.markPrepared() || txEntry1.explicitVersion() != null)
            continue;

        GridCacheContext cacheCtx = txEntry1.context();

        // Retry loop: repeats while the cached entry turns out to be removed.
        while (true) {
            // Checkpoint read lock is held for each attempt and released in finally.
            cctx.database().checkpointReadLock();

            try {
                GridCacheEntryEx entry1 = txEntry1.cached();

                assert entry1 != null : txEntry1;
                assert !entry1.detached() : "Expected non-detached entry for near transaction " +
                    "[locNodeId=" + cctx.localNodeId() + ", entry=" + entry1 + ']';

                GridCacheVersion serReadVer = txEntry1.entryReadVersion();

                assert serReadVer == null || (tx.optimistic() && tx.serializable()) : txEntry1;

                // Read flag is passed only when serializable order is in effect.
                boolean read = serOrder != null && txEntry1.op() == READ;

                entry1.unswap();

                if (!entry1.tmLock(tx, timeout, serOrder, serReadVer, read)) {
                    // Unlock locks locked so far.
                    for (IgniteTxEntry txEntry2 : entries) {
                        // Stop at the entry that failed to lock: it and the following ones hold no lock.
                        if (txEntry2 == txEntry1)
                            break;

                        txUnlock(tx, txEntry2);
                    }

                    return false;
                }

                break;
            }
            catch (GridCacheEntryRemovedException ignored) {
                if (log.isDebugEnabled())
                    log.debug("Got removed entry in TM lockMultiple(..) method (will retry): " + txEntry1);

                try {
                    // Renew cache entry.
                    txEntry1.cached(cacheCtx.cache().entryEx(txEntry1.key(), tx.topologyVersion()));
                }
                catch (GridDhtInvalidPartitionException e) {
                    assert tx.dht() : "Received invalid partition for non DHT transaction [tx=" +
                        tx + ", invalidPart=" + e.partition() + ']';

                    // If partition is invalid, we ignore this entry.
                    tx.addInvalidPartition(cacheCtx, e.partition());

                    break;
                }
            }
            catch (GridDistributedLockCancelledException ignore) {
                // Cancelled lock cannot be recovered from: mark tx for rollback and fail.
                tx.setRollbackOnly();

                throw new IgniteCheckedException("Entry lock has been cancelled for transaction: " + tx);
            }
            finally {
                cctx.database().checkpointReadUnlock();
            }
        }
    }

    return true;
}
1654
1655 /**
1656 * @param tx Transaction.
1657 * @param txEntry Entry to unlock.
1658 */
1659 private void txUnlock(IgniteInternalTx tx, IgniteTxEntry txEntry) {
1660 while (true) {
1661 try {
1662 txEntry.cached().txUnlock(tx);
1663
1664 break;
1665 }
1666 catch (GridCacheEntryRemovedException ignored) {
1667 if (log.isDebugEnabled())
1668 log.debug("Got removed entry in TM txUnlock(..) method (will retry): " + txEntry);
1669
1670 try {
1671 txEntry.cached(txEntry.context().cache().entryEx(txEntry.key(), tx.topologyVersion()));
1672 }
1673 catch (GridDhtInvalidPartitionException e) {
1674 tx.addInvalidPartition(txEntry.context(), e.partition());
1675
1676 break;
1677 }
1678 }
1679 }
1680 }
1681
1682 /**
1683 * @param tx Owning transaction.
1684 * @param entries Entries to unlock.
1685 */
1686 private void unlockMultiple(IgniteInternalTx tx, Iterable<IgniteTxEntry> entries) {
1687 for (IgniteTxEntry txEntry : entries) {
1688 GridCacheContext cacheCtx = txEntry.context();
1689
1690 while (true) {
1691 try {
1692 GridCacheEntryEx entry = txEntry.cached();
1693
1694 assert entry != null;
1695
1696 if (entry.detached())
1697 break;
1698
1699 entry.txUnlock(tx);
1700
1701 break;
1702 }
1703 catch (GridCacheEntryRemovedException ignored) {
1704 if (log.isDebugEnabled())
1705 log.debug("Got removed entry in TM unlockMultiple(..) method (will retry): " + txEntry);
1706
1707 // Renew cache entry.
1708 txEntry.cached(cacheCtx.cache().entryEx(txEntry.key(), tx.topologyVersion()));
1709 }
1710 }
1711 }
1712 }
1713
/**
 * Stores the given transaction in the transaction context ({@code threadCtx}).
 *
 * @param tx Committing transaction.
 */
public void txContext(IgniteInternalTx tx) {
    threadCtx.set(tx);
}
1720
/**
 * Reads the transaction currently stored in the transaction context ({@code threadCtx}).
 *
 * @return Currently committing transaction, or {@code null} if the context was reset.
 */
@SuppressWarnings({"unchecked"})
private IgniteInternalTx txContext() {
    return threadCtx.get();
}
1728
1729 /**
1730 * Gets version of transaction in tx context or {@code null}
1731 * if tx context is empty.
1732 * <p>
1733 * This is a convenience method provided mostly for debugging.
1734 *
1735 * @return Transaction version from transaction context.
1736 */
1737 @Nullable public GridCacheVersion txContextVersion() {
1738 IgniteInternalTx tx = txContext();
1739
1740 return tx == null ? null : tx.xidVersion();
1741 }
1742
/**
 * Commit ended: clears the transaction context by setting {@code threadCtx} to {@code null}.
 */
public void resetContext() {
    threadCtx.set(null);
}
1749
1750 /**
1751 * @return All transactions.
1752 */
1753 public Collection<IgniteInternalTx> txs() {
1754 return F.concat(false, idMap.values(), nearIdMap.values());
1755 }
1756
/**
 * Gets the threshold above which a committed local transaction is reported as slow.
 *
 * @return Slow tx warn timeout.
 */
public int slowTxWarnTimeout() {
    return slowTxWarnTimeout;
}
1763
/**
 * Sets the threshold above which a committed local transaction is reported as slow.
 * The warning is issued only for values greater than zero.
 *
 * @param slowTxWarnTimeout Slow tx warn timeout.
 */
public void slowTxWarnTimeout(int slowTxWarnTimeout) {
    this.slowTxWarnTimeout = slowTxWarnTimeout;
}
1770
/**
 * Checks if transactions with given near version ID were prepared or committed.
 * Starts the check with no result future and no processed versions.
 *
 * @param nearVer Near version ID.
 * @param txNum Number of transactions.
 * @return Future for flag indicating if transactions were prepared or committed or {@code null} for success future.
 */
@Nullable public IgniteInternalFuture<Boolean> txsPreparedOrCommitted(GridCacheVersion nearVer, int txNum) {
    return txsPreparedOrCommitted(nearVer, txNum, null, null);
}
1781
1782 /**
1783 * @param xidVer Version.
1784 * @return Future for flag indicating if transactions was committed.
1785 */
1786 public IgniteInternalFuture<Boolean> txCommitted(GridCacheVersion xidVer) {
1787 final GridFutureAdapter<Boolean> resFut = new GridFutureAdapter<>();
1788
1789 final IgniteInternalTx tx = cctx.tm().tx(xidVer);
1790
1791 if (tx != null) {
1792 assert tx.near() && tx.local() : tx;
1793
1794 if (log.isDebugEnabled())
1795 log.debug("Found near transaction, will wait for completion: " + tx);
1796
1797 tx.finishFuture().listen(new CI1<IgniteInternalFuture<IgniteInternalTx>>() {
1798 @Override public void apply(IgniteInternalFuture<IgniteInternalTx> fut) {
1799 TransactionState state = tx.state();
1800
1801 if (log.isDebugEnabled())
1802 log.debug("Near transaction finished with state: " + state);
1803
1804 resFut.onDone(state == COMMITTED);
1805 }
1806 });
1807
1808 return resFut;
1809 }
1810
1811 boolean committed = false;
1812
1813 for (Map.Entry<GridCacheVersion, Object> entry : completedVersHashMap.entrySet()) {
1814 if (entry.getKey() instanceof CommittedVersion) {
1815 CommittedVersion comm = (CommittedVersion)entry.getKey();
1816
1817 if (comm.nearVer.equals(xidVer)) {
1818 committed = !entry.getValue().equals(Boolean.FALSE);
1819
1820 break;
1821 }
1822 }
1823 }
1824
1825 if (log.isDebugEnabled())
1826 log.debug("Near transaction committed: " + committed);
1827
1828 resFut.onDone(committed);
1829
1830 return resFut;
1831 }
1832
1833 /**
1834 * @param nearVer Near version.
1835 * @return Finish future for related remote transactions.
1836 */
1837 @SuppressWarnings("unchecked")
1838 public IgniteInternalFuture<?> remoteTxFinishFuture(GridCacheVersion nearVer) {
1839 GridCompoundFuture<Void, Void> fut = new GridCompoundFuture<>();
1840
1841 for (final IgniteInternalTx tx : txs()) {
1842 if (!tx.local() && nearVer.equals(tx.nearXidVersion()))
1843 fut.add((IgniteInternalFuture) tx.finishFuture());
1844 }
1845
1846 fut.markInitialized();
1847
1848 return fut;
1849 }
1850
/**
 * Checks whether {@code txNum} transactions with the given near version are prepared or committed.
 * <p>
 * Registered transactions are inspected first. If one of them is still preparing, the check is
 * re-run from a listener once its prepare future completes. Remaining transactions are then
 * looked up among completed versions.
 *
 * @param nearVer Near version ID.
 * @param txNum Number of transactions.
 * @param fut Result future.
 * @param processedVers Processed versions.
 * @return Future for flag indicating if transactions were prepared or committed or {@code null} for success future.
 */
@Nullable private IgniteInternalFuture<Boolean> txsPreparedOrCommitted(final GridCacheVersion nearVer,
    int txNum,
    @Nullable GridFutureAdapter<Boolean> fut,
    @Nullable Collection<GridCacheVersion> processedVers)
{
    for (final IgniteInternalTx tx : txs()) {
        if (nearVer.equals(tx.nearXidVersion())) {
            IgniteInternalFuture<?> prepFut = tx.currentPrepareFuture();

            if (prepFut != null && !prepFut.isDone()) {
                if (log.isDebugEnabled())
                    log.debug("Transaction is preparing (will wait): " + tx);

                // Reuse the caller's future if there is one so that the same future is completed in the end.
                final GridFutureAdapter<Boolean> fut0 = fut != null ? fut : new GridFutureAdapter<Boolean>();

                // Final copies for use inside the anonymous listener.
                final int txNum0 = txNum;

                final Collection<GridCacheVersion> processedVers0 = processedVers;

                prepFut.listen(new CI1<IgniteInternalFuture<?>>() {
                    @Override public void apply(IgniteInternalFuture<?> prepFut) {
                        if (log.isDebugEnabled())
                            log.debug("Transaction prepare future finished: " + tx);

                        // Re-run the check with the state captured at the moment of waiting.
                        IgniteInternalFuture<Boolean> fut = txsPreparedOrCommitted(nearVer,
                            txNum0,
                            fut0,
                            processedVers0);

                        assert fut == fut0;
                    }
                });

                return fut0;
            }

            TransactionState state = tx.state();

            if (state == PREPARED || state == COMMITTING || state == COMMITTED) {
                // All expected transactions are accounted for.
                if (--txNum == 0) {
                    if (fut != null)
                        fut.onDone(true);

                    return fut;
                }
            }
            else {
                // Transaction is not prepared: try to mark it for rollback.
                if (tx.state(MARKED_ROLLBACK) || tx.state() == UNKNOWN) {
                    tx.rollbackAsync();

                    if (log.isDebugEnabled())
                        log.debug("Transaction was not prepared (rolled back): " + tx);

                    if (fut == null)
                        fut = new GridFutureAdapter<>();

                    fut.onDone(false);

                    return fut;
                }
                else {
                    // State is re-read here, so it may differ from the value captured above.
                    if (tx.state() == COMMITTED) {
                        if (--txNum == 0) {
                            if (fut != null)
                                fut.onDone(true);

                            return fut;
                        }
                    }
                    else {
                        if (log.isDebugEnabled())
                            log.debug("Transaction is not prepared: " + tx);

                        if (fut == null)
                            fut = new GridFutureAdapter<>();

                        fut.onDone(false);

                        return fut;
                    }
                }
            }

            // Remember the version so that it is not counted again in the completed versions scan below.
            if (processedVers == null)
                processedVers = new HashSet<>(txNum, 1.0f);

            processedVers.add(tx.xidVersion());
        }
    }

    // Not all transactions were found. Need to scan committed versions to check
    // if transaction was already committed.
    for (Map.Entry<GridCacheVersion, Object> e : completedVersHashMap.entrySet()) {
        // Skip versions recorded as rolled back.
        if (e.getValue().equals(Boolean.FALSE))
            continue;

        GridCacheVersion ver = e.getKey();

        if (processedVers != null && processedVers.contains(ver))
            continue;

        if (ver instanceof CommittedVersion) {
            CommittedVersion commitVer = (CommittedVersion)ver;

            if (commitVer.nearVer.equals(nearVer)) {
                if (--txNum == 0) {
                    if (fut != null)
                        fut.onDone(true);

                    return fut;
                }
            }
        }
    }

    // Some of the expected transactions were not found at all.
    if (fut == null)
        fut = new GridFutureAdapter<>();

    fut.onDone(false);

    return fut;
}
1980
1981 /**
1982 * Commits or rolls back prepared transaction.
1983 *
1984 * @param tx Transaction.
1985 * @param commit Whether transaction should be committed or rolled back.
1986 */
1987 public void finishTxOnRecovery(final IgniteInternalTx tx, boolean commit) {
1988 if (log.isDebugEnabled())
1989 log.debug("Finishing prepared transaction [tx=" + tx + ", commit=" + commit + ']');
1990
1991 if (!tx.markFinalizing(RECOVERY_FINISH)) {
1992 if (log.isDebugEnabled())
1993 log.debug("Will not try to commit prepared transaction (could not mark finalized): " + tx);
1994
1995 return;
1996 }
1997
1998 if (tx instanceof IgniteTxRemoteEx) {
1999 IgniteTxRemoteEx rmtTx = (IgniteTxRemoteEx)tx;
2000
2001 rmtTx.doneRemote(tx.xidVersion(),
2002 Collections.<GridCacheVersion>emptyList(),
2003 Collections.<GridCacheVersion>emptyList(),
2004 Collections.<GridCacheVersion>emptyList());
2005 }
2006
2007 if (commit)
2008 tx.commitAsync().listen(new CommitListener(tx));
2009 else
2010 tx.rollbackAsync();
2011 }
2012
2013 /**
2014 * Commits transaction in case when node started transaction failed, but all related
2015 * transactions were prepared (invalidates transaction if it is not fully prepared).
2016 *
2017 * @param tx Transaction.
2018 * @param failedNodeIds Failed nodes IDs.
2019 */
2020 public void commitIfPrepared(IgniteInternalTx tx, Set<UUID> failedNodeIds) {
2021 assert tx instanceof GridDhtTxLocal || tx instanceof GridDhtTxRemote : tx;
2022 assert !F.isEmpty(tx.transactionNodes()) : tx;
2023 assert tx.nearXidVersion() != null : tx;
2024
2025 GridCacheTxRecoveryFuture fut = new GridCacheTxRecoveryFuture(
2026 cctx,
2027 tx,
2028 failedNodeIds,
2029 tx.transactionNodes());
2030
2031 cctx.mvcc().addFuture(fut, fut.futureId());
2032
2033 if (log.isDebugEnabled())
2034 log.debug("Checking optimistic transaction state on remote nodes [tx=" + tx + ", fut=" + fut + ']');
2035
2036 fut.prepare();
2037 }
2038
2039 /**
2040 * @return {@code True} if deadlock detection is enabled.
2041 */
2042 public boolean deadlockDetectionEnabled() {
2043 return DEADLOCK_MAX_ITERS > 0;
2044 }
2045
2046 /**
2047 * Performs deadlock detection for given keys.
2048 *
2049 * @param tx Target tx.
2050 * @param keys Keys.
2051 * @return Detection result.
2052 */
2053 public IgniteInternalFuture<TxDeadlock> detectDeadlock(
2054 IgniteInternalTx tx,
2055 Set<IgniteTxKey> keys
2056 ) {
2057 return txDeadlockDetection.detectDeadlock(tx, keys);
2058 }
2059
2060 /**
2061 * @param nodeId Node ID.
2062 * @param fut Future.
2063 * @param txKeys Tx keys.
2064 */
2065 void txLocksInfo(UUID nodeId, TxDeadlockFuture fut, Set<IgniteTxKey> txKeys) {
2066 ClusterNode node = cctx.node(nodeId);
2067
2068 if (node == null) {
2069 if (log.isDebugEnabled())
2070 log.debug("Failed to finish deadlock detection, node left: " + nodeId);
2071
2072 fut.onDone();
2073
2074 return;
2075 }
2076
2077 TxLocksRequest req = new TxLocksRequest(fut.futureId(), txKeys);
2078
2079 try {
2080 if (!cctx.localNodeId().equals(nodeId))
2081 req.prepareMarshal(cctx);
2082
2083 cctx.gridIO().sendToGridTopic(node, TOPIC_TX, req, SYSTEM_POOL);
2084 }
2085 catch (IgniteCheckedException e) {
2086 if (e instanceof ClusterTopologyCheckedException) {
2087 if (log.isDebugEnabled())
2088 log.debug("Failed to finish deadlock detection, node left: " + nodeId);
2089 }
2090 else
2091 U.warn(log, "Failed to finish deadlock detection: " + e, e);
2092
2093 fut.onDone();
2094 }
2095 }
2096
2097 /**
2098 * @param tx Tx.
2099 * @param txKeys Tx keys.
2100 * @return {@code True} if key is involved into tx.
2101 */
2102 private boolean hasKeys(IgniteInternalTx tx, Collection<IgniteTxKey> txKeys) {
2103 for (IgniteTxKey key : txKeys) {
2104 if (tx.txState().entry(key) != null)
2105 return true;
2106 }
2107
2108 return false;
2109 }
2110
2111 /**
2112 * @param txKeys Tx keys.
2113 * @return Transactions locks and nodes.
2114 */
2115 private TxLocksResponse txLocksInfo(Collection<IgniteTxKey> txKeys) {
2116 TxLocksResponse res = new TxLocksResponse();
2117
2118 Collection<IgniteInternalTx> txs = activeTransactions();
2119
2120 for (IgniteInternalTx tx : txs) {
2121 boolean nearTxLoc = tx instanceof GridNearTxLocal;
2122
2123 if (!(nearTxLoc || tx instanceof GridDhtTxLocal) || !hasKeys(tx, txKeys))
2124 continue;
2125
2126 IgniteTxState state = tx.txState();
2127
2128 assert state instanceof IgniteTxStateImpl || state instanceof IgniteTxImplicitSingleStateImpl;
2129
2130 Collection<IgniteTxEntry> txEntries =
2131 state instanceof IgniteTxStateImpl ? ((IgniteTxStateImpl)state).allEntriesCopy() : state.allEntries();
2132
2133 Set<IgniteTxKey> requestedKeys = null;
2134
2135 // Try to get info about requested keys for detached entries in case of GridNearTxLocal transaction
2136 // in order to reduce amount of requests to remote nodes.
2137 if (nearTxLoc) {
2138 if (tx.pessimistic()) {
2139 GridDhtColocatedLockFuture fut =
2140 (GridDhtColocatedLockFuture)mvccFuture(tx, GridDhtColocatedLockFuture.class);
2141
2142 if (fut != null)
2143 requestedKeys = fut.requestedKeys();
2144
2145 GridNearLockFuture nearFut = (GridNearLockFuture)mvccFuture(tx, GridNearLockFuture.class);
2146
2147 if (nearFut != null) {
2148 Set<IgniteTxKey> nearRequestedKeys = nearFut.requestedKeys();
2149
2150 if (nearRequestedKeys != null) {
2151 if (requestedKeys == null)
2152 requestedKeys = nearRequestedKeys;
2153 else
2154 requestedKeys = nearRequestedKeys;
2155 }
2156 }
2157 }
2158 else {
2159 GridNearOptimisticTxPrepareFuture fut =
2160 (GridNearOptimisticTxPrepareFuture)mvccFuture(tx, GridNearOptimisticTxPrepareFuture.class);
2161
2162 if (fut != null)
2163 requestedKeys = fut.requestedKeys();
2164 }
2165 }
2166
2167 for (IgniteTxEntry txEntry : txEntries) {
2168 IgniteTxKey txKey = txEntry.txKey();
2169
2170 if (res.txLocks(txKey) == null) {
2171 GridCacheMapEntry e = (GridCacheMapEntry)txEntry.cached();
2172
2173 List<GridCacheMvccCandidate> locs = e.mvccAllLocal();
2174
2175 if (locs != null) {
2176 boolean owner = false;
2177
2178 for (GridCacheMvccCandidate loc : locs) {
2179 if (!owner && loc.owner() && loc.tx())
2180 owner = true;
2181
2182 if (!owner) // Skip all candidates in case when no tx that owns lock.
2183 break;
2184
2185 if (loc.tx()) {
2186 UUID nearNodeId = loc.otherNodeId();
2187
2188 GridCacheVersion txId = loc.otherVersion();
2189
2190 TxLock txLock = new TxLock(
2191 txId == null ? loc.version() : txId,
2192 nearNodeId == null ? loc.nodeId() : nearNodeId,
2193 loc.threadId(),
2194 loc.owner() ? TxLock.OWNERSHIP_OWNER : TxLock.OWNERSHIP_CANDIDATE);
2195
2196 res.addTxLock(txKey, txLock);
2197 }
2198 }
2199 }
2200 // Special case for optimal sequence of nodes processing.
2201 else if (nearTxLoc && requestedKeys != null && requestedKeys.contains(txKey)) {
2202 TxLock txLock = new TxLock(
2203 tx.nearXidVersion(),
2204 tx.nodeId(),
2205 tx.threadId(),
2206 TxLock.OWNERSHIP_REQUESTED);
2207
2208 res.addTxLock(txKey, txLock);
2209 }
2210 else
2211 res.addKey(txKey);
2212 }
2213 }
2214 }
2215
2216 return res;
2217 }
2218
2219 /**
2220 * @param tx Tx. Must be instance of {@link GridNearTxLocal}.
2221 * @param cls Future class.
2222 * @return Cache future.
2223 */
2224 private IgniteInternalFuture mvccFuture(IgniteInternalTx tx, Class<? extends IgniteInternalFuture> cls) {
2225 assert tx instanceof GridNearTxLocal : tx;
2226
2227 Collection<GridCacheVersionedFuture<?>> futs = cctx.mvcc().futuresForVersion(tx.nearXidVersion());
2228
2229 if (futs != null) {
2230 for (GridCacheVersionedFuture<?> fut : futs) {
2231 if (fut.getClass().equals(cls))
2232 return fut;
2233 }
2234 }
2235
2236 return null;
2237 }
2238
2239 /**
2240 * @param fut Future.
2241 */
2242 public void addFuture(TxDeadlockFuture fut) {
2243 TxDeadlockFuture old = deadlockDetectFuts.put(fut.futureId(), fut);
2244
2245 assert old == null : old;
2246 }
2247
2248 /**
2249 * @param futId Future ID.
2250 * @return Found future.
2251 */
2252 @Nullable public TxDeadlockFuture future(long futId) {
2253 return deadlockDetectFuts.get(futId);
2254 }
2255
2256 /**
2257 * @param futId Future ID.
2258 */
2259 public void removeFuture(long futId) {
2260 deadlockDetectFuts.remove(futId);
2261 }
2262
2263 /**
2264 * @param nodeId Node ID to send message to.
2265 * @param ver Version to ack.
2266 */
2267 public void sendDeferredAckResponse(UUID nodeId, GridCacheVersion ver) {
2268 deferredAckMsgSnd.sendDeferredAckMessage(nodeId, ver);
2269 }
2270
2271 /**
2272 * @return Collection of active transaction deadlock detection futures.
2273 */
2274 @SuppressWarnings("unchecked")
2275 public Collection<IgniteInternalFuture<?>> deadlockDetectionFutures() {
2276 Collection<? extends IgniteInternalFuture<?>> values = deadlockDetectFuts.values();
2277
2278 return (Collection<IgniteInternalFuture<?>>)values;
2279 }
2280
2281 /**
2282 * Suspends transaction.
2283 * Should not be used directly. Use tx.suspend() instead.
2284 *
2285 * @param tx Transaction to be suspended.
2286 *
2287 * @see #resumeTx(GridNearTxLocal)
2288 * @see GridNearTxLocal#suspend()
2289 * @see GridNearTxLocal#resume()
2290 * @throws IgniteCheckedException If failed to suspend transaction.
2291 */
2292 public void suspendTx(final GridNearTxLocal tx) throws IgniteCheckedException {
2293 assert tx != null && !tx.system() : tx;
2294
2295 if (!tx.state(SUSPENDED)) {
2296 throw new IgniteCheckedException("Trying to suspend transaction with incorrect state "
2297 + "[expected=" + ACTIVE + ", actual=" + tx.state() + ']');
2298 }
2299
2300 clearThreadMap(tx);
2301
2302 transactionMap(tx).remove(tx.xidVersion(), tx);
2303 }
2304
2305 /**
2306 * Resume transaction in current thread.
2307 * Please don't use directly. Use tx.resume() instead.
2308 *
2309 * @param tx Transaction to be resumed.
2310 *
2311 * @see #suspendTx(GridNearTxLocal)
2312 * @see GridNearTxLocal#suspend()
2313 * @see GridNearTxLocal#resume()
2314 * @throws IgniteCheckedException If failed to resume tx.
2315 */
2316 public void resumeTx(GridNearTxLocal tx) throws IgniteCheckedException {
2317 assert tx != null && !tx.system() : tx;
2318
2319 if (!tx.state(ACTIVE)) {
2320 throw new IgniteCheckedException("Trying to resume transaction with incorrect state "
2321 + "[expected=" + SUSPENDED + ", actual=" + tx.state() + ']');
2322 }
2323
2324 assert !threadMap.containsValue(tx) : tx;
2325 assert !transactionMap(tx).containsValue(tx) : tx;
2326 assert !haveSystemTxForThread(Thread.currentThread().getId());
2327
2328 long threadId = Thread.currentThread().getId();
2329
2330 if (threadMap.putIfAbsent(threadId, tx) != null)
2331 throw new IgniteCheckedException("Thread already has started a transaction.");
2332
2333 if (transactionMap(tx).putIfAbsent(tx.xidVersion(), tx) != null)
2334 throw new IgniteCheckedException("Thread already has started a transaction.");
2335
2336 tx.threadId(threadId);
2337 }
2338
2339 /**
2340 * @param threadId Thread id.
2341 * @return True if thread have system transaction. False otherwise.
2342 */
2343 private boolean haveSystemTxForThread(long threadId) {
2344 if (!sysThreadMap.isEmpty()) {
2345 for (GridCacheContext cacheCtx : cctx.cache().context().cacheContexts()) {
2346 if (!cacheCtx.systemTx())
2347 continue;
2348
2349 if (sysThreadMap.containsKey(new TxThreadKey(threadId, cacheCtx.cacheId())))
2350 return true;
2351 }
2352 }
2353
2354 return false;
2355 }
2356
2357 /**
2358 * @return True if {@link TxRecord} records should be logged to WAL.
2359 */
2360 public boolean logTxRecords() {
2361 return logTxRecords;
2362 }
2363
2364 /**
2365 * Timeout object for node failure handler.
2366 */
2367 private final class NodeFailureTimeoutObject extends GridTimeoutObjectAdapter {
2368 /** Left or failed node. */
2369 private final UUID evtNodeId;
2370
2371 /**
2372 * @param evtNodeId Event node ID.
2373 */
2374 private NodeFailureTimeoutObject(UUID evtNodeId) {
2375 super(IgniteUuid.fromUuid(cctx.localNodeId()), TX_SALVAGE_TIMEOUT);
2376
2377 this.evtNodeId = evtNodeId;
2378 }
2379
2380 /**
2381 *
2382 */
2383 private void onTimeout0() {
2384 try {
2385 cctx.kernalContext().gateway().readLock();
2386 }
2387 catch (IllegalStateException | IgniteClientDisconnectedException e) {
2388 if (log.isDebugEnabled())
2389 log.debug("Failed to acquire kernal gateway [err=" + e + ']');
2390
2391 return;
2392 }
2393
2394 try {
2395 if (log.isDebugEnabled())
2396 log.debug("Processing node failed event [locNodeId=" + cctx.localNodeId() +
2397 ", failedNodeId=" + evtNodeId + ']');
2398
2399 for (final IgniteInternalTx tx : txs()) {
2400 if ((tx.near() && !tx.local()) || (tx.storeWriteThrough() && tx.masterNodeIds().contains(evtNodeId))) {
2401 // Invalidate transactions.
2402 salvageTx(tx, RECOVERY_FINISH);
2403 }
2404 else {
2405 // Check prepare only if originating node ID failed. Otherwise parent node will finish this tx.
2406 if (tx.originatingNodeId().equals(evtNodeId)) {
2407 if (tx.state() == PREPARED)
2408 commitIfPrepared(tx, Collections.singleton(evtNodeId));
2409 else {
2410 IgniteInternalFuture<?> prepFut = tx.currentPrepareFuture();
2411
2412 if (prepFut != null) {
2413 prepFut.listen(new CI1<IgniteInternalFuture<?>>() {
2414 @Override public void apply(IgniteInternalFuture<?> fut) {
2415 if (tx.state() == PREPARED)
2416 commitIfPrepared(tx, Collections.singleton(evtNodeId));
2417 else if (tx.setRollbackOnly())
2418 tx.rollbackAsync();
2419 }
2420 });
2421 }
2422 else {
2423 // If we could not mark tx as rollback, it means that transaction is being committed.
2424 if (tx.setRollbackOnly())
2425 tx.rollbackAsync();
2426 }
2427 }
2428 }
2429 }
2430 }
2431 }
2432 finally {
2433 cctx.kernalContext().gateway().readUnlock();
2434 }
2435 }
2436
2437 /** {@inheritDoc} */
2438 @Override public void onTimeout() {
2439 // Should not block timeout thread.
2440 cctx.kernalContext().closure().runLocalSafe(new Runnable() {
2441 @Override public void run() {
2442 onTimeout0();
2443 }
2444 });
2445 }
2446 }
2447
2448 /**
2449 * Per-thread key for system transactions.
2450 */
2451 private static class TxThreadKey {
2452 /** Thread ID. */
2453 private long threadId;
2454
2455 /** Cache ID. */
2456 private int cacheId;
2457
2458 /**
2459 * @param threadId Thread ID.
2460 * @param cacheId Cache ID.
2461 */
2462 private TxThreadKey(long threadId, int cacheId) {
2463 this.threadId = threadId;
2464 this.cacheId = cacheId;
2465 }
2466
2467 /** {@inheritDoc} */
2468 @Override public boolean equals(Object o) {
2469 if (this == o)
2470 return true;
2471
2472 if (!(o instanceof TxThreadKey))
2473 return false;
2474
2475 TxThreadKey that = (TxThreadKey)o;
2476
2477 return cacheId == that.cacheId && threadId == that.threadId;
2478 }
2479
2480 /** {@inheritDoc} */
2481 @Override public int hashCode() {
2482 int res = (int)(threadId ^ (threadId >>> 32));
2483
2484 res = 31 * res + cacheId;
2485
2486 return res;
2487 }
2488 }
2489
2490 /**
2491 *
2492 */
2493 private static class CommittedVersion extends GridCacheVersion {
2494 /** */
2495 private static final long serialVersionUID = 0L;
2496
2497 /** Corresponding near version. Transient. */
2498 private GridCacheVersion nearVer;
2499
2500 /**
2501 * Empty constructor required by {@link Externalizable}.
2502 */
2503 public CommittedVersion() {
2504 // No-op.
2505 }
2506
2507 /**
2508 * @param ver Committed version.
2509 * @param nearVer Near transaction version.
2510 */
2511 private CommittedVersion(GridCacheVersion ver, GridCacheVersion nearVer) {
2512 super(ver.topologyVersion(), ver.order(), ver.nodeOrder(), ver.dataCenterId());
2513
2514 assert nearVer != null;
2515
2516 this.nearVer = nearVer;
2517 }
2518 }
2519
2520 /**
2521 * Commit listener. Checks if commit succeeded and rollbacks if case of error.
2522 */
2523 private class CommitListener implements CI1<IgniteInternalFuture<IgniteInternalTx>> {
2524 /** */
2525 private static final long serialVersionUID = 0L;
2526
2527 /** Transaction. */
2528 private final IgniteInternalTx tx;
2529
2530 /**
2531 * @param tx Transaction.
2532 */
2533 private CommitListener(IgniteInternalTx tx) {
2534 this.tx = tx;
2535 }
2536
2537 /** {@inheritDoc} */
2538 @Override public void apply(IgniteInternalFuture<IgniteInternalTx> t) {
2539 try {
2540 t.get();
2541 }
2542 catch (IgniteTxOptimisticCheckedException ignore) {
2543 if (log.isDebugEnabled())
2544 log.debug("Optimistic failure while committing prepared transaction (will rollback): " +
2545 tx);
2546
2547 try {
2548 tx.rollbackAsync();
2549 }
2550 catch (Throwable e) {
2551 U.error(log, "Failed to automatically rollback transaction: " + tx, e);
2552 }
2553 }
2554 catch (IgniteCheckedException e) {
2555 U.error(log, "Failed to commit transaction during failover: " + tx, e);
2556 }
2557 }
2558 }
2559
2560 /**
2561 * Transactions deadlock detection process message listener.
2562 */
2563 private class DeadlockDetectionListener implements GridMessageListener {
2564 /** {@inheritDoc} */
2565 @SuppressWarnings("unchecked")
2566 @Override public void onMessage(UUID nodeId, Object msg, byte plc) {
2567 GridCacheMessage cacheMsg = (GridCacheMessage)msg;
2568
2569 Throwable err = null;
2570
2571 try {
2572 unmarshall(nodeId, cacheMsg);
2573 }
2574 catch (Exception e) {
2575 err = e;
2576 }
2577
2578 if (err != null || cacheMsg.classError() != null) {
2579 try {
2580 processFailedMessage(nodeId, cacheMsg, err);
2581 }
2582 catch(Throwable e){
2583 U.error(log, "Failed to process message [senderId=" + nodeId +
2584 ", messageType=" + cacheMsg.getClass() + ']', e);
2585
2586 if (e instanceof Error)
2587 throw (Error)e;
2588 }
2589 }
2590 else {
2591 if (log.isDebugEnabled())
2592 log.debug("Message received [locNodeId=" + cctx.localNodeId() +
2593 ", rmtNodeId=" + nodeId + ", msg=" + msg + ']');
2594
2595 if (msg instanceof TxLocksRequest) {
2596 TxLocksRequest req = (TxLocksRequest)msg;
2597
2598 TxLocksResponse res = txLocksInfo(req.txKeys());
2599
2600 res.futureId(req.futureId());
2601
2602 try {
2603 if (!cctx.localNodeId().equals(nodeId))
2604 res.prepareMarshal(cctx);
2605
2606 cctx.gridIO().sendToGridTopic(nodeId, TOPIC_TX, res, SYSTEM_POOL);
2607 }
2608 catch (ClusterTopologyCheckedException e) {
2609 if (log.isDebugEnabled())
2610 log.debug("Failed to send response, node failed: " + nodeId);
2611 }
2612 catch (IgniteCheckedException e) {
2613 U.error(log, "Failed to send response to node [node=" + nodeId + ", res=" + res + ']', e);
2614 }
2615 }
2616 else if (msg instanceof TxLocksResponse) {
2617 TxLocksResponse res = (TxLocksResponse)msg;
2618
2619 long futId = res.futureId();
2620
2621 TxDeadlockFuture fut = future(futId);
2622
2623 if (fut != null)
2624 fut.onResult(nodeId, res);
2625 else
2626 U.warn(log, "Unexpected response received " + res);
2627 }
2628 else
2629 throw new IllegalArgumentException("Unknown message [msg=" + msg + ']');
2630 }
2631 }
2632
2633 /**
2634 * @param nodeId Node ID.
2635 * @param msg Message.
2636 */
2637 private void processFailedMessage(UUID nodeId, GridCacheMessage msg, Throwable err) throws IgniteCheckedException {
2638 switch (msg.directType()) {
2639 case -24: {
2640 TxLocksRequest req = (TxLocksRequest)msg;
2641
2642 TxLocksResponse res = new TxLocksResponse();
2643
2644 res.futureId(req.futureId());
2645
2646 try {
2647 cctx.gridIO().sendToGridTopic(nodeId, TOPIC_TX, res, SYSTEM_POOL);
2648 }
2649 catch (ClusterTopologyCheckedException e) {
2650 if (log.isDebugEnabled())
2651 log.debug("Failed to send response, node failed: " + nodeId);
2652 }
2653 catch (IgniteCheckedException e) {
2654 U.error(log, "Failed to send response to node (is node still alive?) [nodeId=" + nodeId +
2655 ", res=" + res + ']', e);
2656 }
2657 }
2658
2659 break;
2660
2661 case -23: {
2662 TxLocksResponse res = (TxLocksResponse)msg;
2663
2664 TxDeadlockFuture fut = future(res.futureId());
2665
2666 if (fut == null) {
2667 if (log.isDebugEnabled())
2668 log.debug("Failed to find future for response [sender=" + nodeId + ", res=" + res + ']');
2669
2670 return;
2671 }
2672
2673 if (err == null)
2674 fut.onResult(nodeId, res);
2675 else
2676 fut.onDone(null, err);
2677 }
2678
2679 break;
2680
2681 default:
2682 throw new IgniteCheckedException("Failed to process message. Unsupported direct type [msg=" +
2683 msg + ']', msg.classError());
2684 }
2685
2686 }
2687
2688 /**
2689 * @param nodeId Sender node ID.
2690 * @param cacheMsg Message.
2691 */
2692 private void unmarshall(UUID nodeId, GridCacheMessage cacheMsg) {
2693 if (cctx.localNodeId().equals(nodeId))
2694 return;
2695
2696 try {
2697 cacheMsg.finishUnmarshal(cctx, cctx.deploy().globalLoader());
2698 }
2699 catch (IgniteCheckedException e) {
2700 cacheMsg.onClassError(e);
2701 }
2702 catch (BinaryObjectException e) {
2703 cacheMsg.onClassError(new IgniteCheckedException(e));
2704 }
2705 catch (Error e) {
2706 if (cacheMsg.ignoreClassErrors() &&
2707 X.hasCause(e, NoClassDefFoundError.class, UnsupportedClassVersionError.class)) {
2708 cacheMsg.onClassError(
2709 new IgniteCheckedException("Failed to load class during unmarshalling: " + e, e)
2710 );
2711 }
2712 else
2713 throw e;
2714 }
2715 }
2716 }
2717 }