[jboss-svn-commits] JBL Code SVN: r37647 - in labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration: examples/classes/com/arjuna/jta/distributed/example/server/impl and 2 other directories.
jboss-svn-commits at lists.jboss.org
jboss-svn-commits at lists.jboss.org
Fri Oct 21 04:16:49 EDT 2011
Author: tomjenkinson
Date: 2011-10-21 04:16:49 -0400 (Fri, 21 Oct 2011)
New Revision: 37647
Modified:
labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/examples/classes/com/arjuna/jta/distributed/example/ExampleDistributedJTATestCase.java
labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/examples/classes/com/arjuna/jta/distributed/example/server/impl/ProxyXAResource.java
labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/examples/classes/com/arjuna/jta/distributed/example/server/impl/ProxyXAResourceRecovery.java
labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/examples/classes/com/arjuna/jta/distributed/example/server/impl/ServerImpl.java
labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/tests/classes/com/arjuna/ats/jta/distributed/SimpleIsolatedServers.java
labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/tests/classes/com/arjuna/ats/jta/distributed/server/impl/ServerImpl.java
Log:
JBTM-895 indicated how to handle rollback only
Modified: labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/examples/classes/com/arjuna/jta/distributed/example/ExampleDistributedJTATestCase.java
===================================================================
--- labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/examples/classes/com/arjuna/jta/distributed/example/ExampleDistributedJTATestCase.java 2011-10-21 07:27:41 UTC (rev 37646)
+++ labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/examples/classes/com/arjuna/jta/distributed/example/ExampleDistributedJTATestCase.java 2011-10-21 08:16:49 UTC (rev 37647)
@@ -30,6 +30,7 @@
import javax.transaction.HeuristicRollbackException;
import javax.transaction.NotSupportedException;
import javax.transaction.RollbackException;
+import javax.transaction.Status;
import javax.transaction.SystemException;
import javax.transaction.Transaction;
import javax.transaction.TransactionManager;
@@ -62,6 +63,11 @@
* Note the use of LocalServer and RemoteServer is just an example, the
* transport is responsible for creating objects that perform similar
* capabilities to these.
+ *
+ * Note that calls to get the remaining time of a transaction can be
+ * configured programmatically to trigger a rollback exception, which is good
+ * for certain situations. The example, though, guards "migrations" by checking
+ * their state before propagation - I recommend all transports do the same.
*/
public class ExampleDistributedJTATestCase {
/**
@@ -153,63 +159,106 @@
ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
Thread.currentThread().setContextClassLoader(originalServer.getClass().getClassLoader());
- // Get a reference on the transaction manager and create a transaction
- TransactionManager transactionManager = originalServer.getTransactionManager();
- transactionManager.setTransactionTimeout(startingTimeout);
- transactionManager.begin();
+ // THIS SIMULATES NORMAL BUSINESS LOGIC IN BMT/CMT (interceptors?)
+ {
+ // Get a reference on the transaction manager and create a
+ // transaction
+ TransactionManager transactionManager = originalServer.getTransactionManager();
+ transactionManager.setTransactionTimeout(startingTimeout);
+ transactionManager.begin();
- // Do some simulated local work on the TestResource and register a
- // TestSynchronization
- Transaction originalTransaction = transactionManager.getTransaction();
- originalTransaction.registerSynchronization(new TestSynchronization(originalServer.getNodeName()));
- originalTransaction.enlistResource(new TestResource(originalServer.getNodeName()));
+ // Do some simulated local work on the TestResource and register a
+ // TestSynchronization
+ Transaction transaction = transactionManager.getTransaction();
+ transaction.registerSynchronization(new TestSynchronization(originalServer.getNodeName()));
+ transaction.enlistResource(new TestResource(originalServer.getNodeName()));
+ }
- // This is where we start to propagate the transaction - watch closely
- // ;)
+ // This is where we start to propagate the transaction - it is all
+ // transport related code - watch closely ;)
if (!nodesToFlowTo.isEmpty()) {
- // Stash away the root transaction, this is needed in case a
- // subordinate naughtily comes back to this server part way through
- // so we can return the original transaction to them
- originalServer.storeRootTransaction(originalTransaction);
- // Peek at the next node - this is just a test abstraction to
- // simulate where business logic might decide to access an EJB at a
- // server with a remoting name
- String nextServerNodeName = nodesToFlowTo.get(0);
+ TransactionManager transactionManager = originalServer.getTransactionManager();
+ Transaction transaction = transactionManager.getTransaction();
+ int status = transaction.getStatus();
- // Check the remaining timeout
- int remainingTimeout = (int) (((TransactionTimeoutConfiguration) transactionManager).getTimeLeftBeforeTransactionTimeout(false) / 1000);
- // Get the Xid to propagate
- Xid currentXid = originalServer.getCurrentXid();
- // Generate a ProxyXAresource, this is transport specific but it
- // should at least have stored the currentXid in a temporary
- // location or the name of the remote server so that we can recover
- // orphan subordinate transactions
- XAResource proxyXAResource = originalServer.generateProxyXAResource(lookupProvider, nextServerNodeName);
- // Suspend the transaction locally
- transactionManager.suspend();
+ // Only propagate active transactions - this may be inactive through
+ // user code (rollback/setRollbackOnly) or it may be inactive due to
+ // the transaction reaper
+ if (status == Status.STATUS_ACTIVE) {
+ // Stash away the root transaction, this is needed in case a
+ // subordinate naughtily comes back to this server part way
+ // through
+ // so we can return the original transaction to them
+ originalServer.storeRootTransaction(transaction);
- // WE CAN NOW PROPAGATE THE TRANSACTION
- propagateTransaction(nodesToFlowTo, remainingTimeout, currentXid);
+ // Peek at the next node - this is just a test abstraction to
+ // simulate where business logic might decide to access an EJB
+ // at a
+ // server with a remoting name
+ String nextServerNodeName = nodesToFlowTo.get(0);
- // After the call retuns, resume the local transaction
- transactionManager.resume(originalTransaction);
- // Enlist the proxy XA resource with the local transaction so that
- // it can propagate the transaction completion events to the
- // subordinate
- originalTransaction.enlistResource(proxyXAResource);
- // Register a synchronization that can proxy the beforeCompletion
- // event to the remote side, after completion events are the
- // responsibility of the remote server to initiate
- originalTransaction.registerSynchronization(originalServer.generateProxySynchronization(lookupProvider, nextServerNodeName, currentXid));
+ // Check the remaining timeout - false is passed in so the call
+ // doesn't raise a rollback exception
+ int remainingTimeout = (int) (((TransactionTimeoutConfiguration) transactionManager).getTimeLeftBeforeTransactionTimeout(false) / 1000);
+ // Get the Xid to propagate
+ Xid currentXid = originalServer.getCurrentXid();
+ // Generate a ProxyXAresource, this is transport specific but it
+ // should at least have stored the currentXid in a temporary
+ // location or the name of the remote server so that we can
+ // recover
+ // orphan subordinate transactions
+ XAResource proxyXAResource = originalServer.generateProxyXAResource(lookupProvider, nextServerNodeName);
+ // Suspend the transaction locally
+ transactionManager.suspend();
- // Deference the local copy of the current transaction so the GC can
- // free it
- originalServer.removeRootTransaction(currentXid);
+ // WE CAN NOW PROPAGATE THE TRANSACTION
+ DataReturnedFromRemoteServer dataReturnedFromRemoteServer = propagateTransaction(nodesToFlowTo, remainingTimeout, currentXid);
+
+ // After the call returns, resume the local transaction
+ transactionManager.resume(transaction);
+ // Enlist the proxy XA resource with the local transaction so
+ // that
+ // it can propagate the transaction completion events to the
+ // subordinate
+ transaction.enlistResource(proxyXAResource);
+ // Register a synchronization that can proxy the
+ // beforeCompletion
+ // event to the remote side, after completion events are the
+ // responsibility of the remote server to initiate
+ transaction.registerSynchronization(originalServer.generateProxySynchronization(lookupProvider, nextServerNodeName, currentXid));
+
+ // Dereference the local copy of the current transaction so the GC
+ // can
+ // free it
+ originalServer.removeRootTransaction(currentXid);
+
+ // Align the local state with the returning state of the
+ // transaction
+ // from the subordinate
+ switch (dataReturnedFromRemoteServer.getTransactionState()) {
+ case Status.STATUS_MARKED_ROLLBACK:
+ case Status.STATUS_ROLLEDBACK:
+ case Status.STATUS_ROLLING_BACK:
+ switch (transaction.getStatus()) {
+ case Status.STATUS_MARKED_ROLLBACK:
+ case Status.STATUS_ROLLEDBACK:
+ case Status.STATUS_ROLLING_BACK:
+ transaction.setRollbackOnly();
+ }
+ break;
+ default:
+ break;
+ }
+ }
}
- // Commit the local transaction!
- // This should propagate to the nodes required!
- transactionManager.commit();
+ // Again - this is business logic in BMT/CMT (interceptors?)
+ {
+ TransactionManager transactionManager = originalServer.getTransactionManager();
+ // Commit the local transaction!
+ // This should propagate to the nodes required!
+ transactionManager.commit();
+ }
// Reset the test classloader
Thread.currentThread().setContextClassLoader(classLoader);
}
@@ -228,8 +277,8 @@
* @throws NotSupportedException
* @throws IOException
*/
- private boolean propagateTransaction(List<String> nodesToFlowTo, int remainingTimeout, Xid toMigrate) throws RollbackException, IllegalStateException,
- XAException, SystemException, NotSupportedException, IOException {
+ private DataReturnedFromRemoteServer propagateTransaction(List<String> nodesToFlowTo, int remainingTimeout, Xid toMigrate) throws RollbackException,
+ IllegalStateException, XAException, SystemException, NotSupportedException, IOException {
// Do some test setup to initialize this method as it if was being
// invoked in a remote server
String currentServerName = nodesToFlowTo.remove(0);
@@ -245,57 +294,99 @@
// crucial to ensure that calling servers will only lay down a proxy if
// they are the first visitor to this server.
boolean requiresProxyAtPreviousServer = !currentServer.getAndResumeTransaction(remainingTimeout, toMigrate);
- // Perform work on the migrated transaction
- TransactionManager transactionManager = currentServer.getTransactionManager();
- Transaction transaction = transactionManager.getTransaction();
- // Do some simple work on local dummy resources and synchronizations
- transaction.registerSynchronization(new TestSynchronization(currentServer.getNodeName()));
- transaction.enlistResource(new TestResource(currentServer.getNodeName()));
+ {
+ // Perform work on the migrated transaction
+ TransactionManager transactionManager = currentServer.getTransactionManager();
+ Transaction transaction = transactionManager.getTransaction();
+ // Do some simple work on local dummy resources and synchronizations
+ transaction.registerSynchronization(new TestSynchronization(currentServer.getNodeName()));
+ transaction.enlistResource(new TestResource(currentServer.getNodeName()));
+ }
+
// If there are any more nodes to simulate a flow to
if (!nodesToFlowTo.isEmpty()) {
- // Get the transport specific representation of the remote server
- // name
- String nextServerNodeName = nodesToFlowTo.get(0);
- // Determine the remaining timeout to propagate
- remainingTimeout = (int) (((TransactionTimeoutConfiguration) transactionManager).getTimeLeftBeforeTransactionTimeout(false) / 1000);
- // Get the XID to propagate
- Xid currentXid = currentServer.getCurrentXid();
- // Generate the proxy (which saves a temporary copy of the XID in
- // case the remote server was to be orphaned, this could just save a
- // proxy that knows to contact the remote server but then it will
- // force rollbacks on recovery - not ideal)
- XAResource proxyXAResource = currentServer.generateProxyXAResource(lookupProvider, nextServerNodeName);
- // Suspend the transaction ready for propagation
- transactionManager.suspend();
- // Propagate the transaction - in the example I return a boolean to
- // indicate whether this caller is the first client to establish the
- // subordinate transaction at the remote node
- boolean proxyRequired = propagateTransaction(nodesToFlowTo, remainingTimeout, currentXid);
- // Resume the transaction locally, ready for any more local work and
- // to add the proxy resource and sync if needed
- transactionManager.resume(transaction);
- // If this caller was the first entity to propagate the transaction
- // to the remote server
- if (proxyRequired) {
- // Formally enlist the resource
- transaction.enlistResource(proxyXAResource);
- // Register a sync
- transaction.registerSynchronization(currentServer.generateProxySynchronization(lookupProvider, nextServerNodeName, toMigrate));
- } else {
- // This will discard the state of this resource, i.e. the file
- // containing the temporary unprepared XID
- currentServer.cleanupProxyXAResource(proxyXAResource);
+ TransactionManager transactionManager = currentServer.getTransactionManager();
+ Transaction transaction = transactionManager.getTransaction();
+ int status = transaction.getStatus();
+
+ // Only propagate active transactions - this may be inactive through
+ // user code (rollback/setRollbackOnly) or it may be inactive due to
+ // the transaction reaper
+ if (status == Status.STATUS_ACTIVE) {
+ // Get the transport specific representation of the remote
+ // server
+ // name
+ String nextServerNodeName = nodesToFlowTo.get(0);
+
+ // Determine the remaining timeout to propagate
+ remainingTimeout = (int) (((TransactionTimeoutConfiguration) transactionManager).getTimeLeftBeforeTransactionTimeout(false) / 1000);
+ // Get the XID to propagate
+ Xid currentXid = currentServer.getCurrentXid();
+ // Generate the proxy (which saves a temporary copy of the XID
+ // in
+ // case the remote server was to be orphaned, this could just
+ // save a
+ // proxy that knows to contact the remote server but then it
+ // will
+ // force rollbacks on recovery - not ideal)
+ XAResource proxyXAResource = currentServer.generateProxyXAResource(lookupProvider, nextServerNodeName);
+ // Suspend the transaction ready for propagation
+ transactionManager.suspend();
+ // Propagate the transaction - in the example the remote call
+ // returns
+ // a flag indicating whether this caller is the first client to
+ // establish the subordinate transaction at the remote node, plus
+ // the transaction state at the remote node
+ DataReturnedFromRemoteServer dataReturnedFromRemoteServer = propagateTransaction(nodesToFlowTo, remainingTimeout, currentXid);
+ // Resume the transaction locally, ready for any more local work
+ // and
+ // to add the proxy resource and sync if needed
+ transactionManager.resume(transaction);
+ // If this caller was the first entity to propagate the
+ // transaction
+ // to the remote server
+ if (dataReturnedFromRemoteServer.isProxyRequired()) {
+ // Formally enlist the resource
+ transaction.enlistResource(proxyXAResource);
+ // Register a sync
+ transaction.registerSynchronization(currentServer.generateProxySynchronization(lookupProvider, nextServerNodeName, toMigrate));
+ } else {
+ // This will discard the state of this resource, i.e. the
+ // file
+ // containing the temporary unprepared XID
+ currentServer.cleanupProxyXAResource(proxyXAResource);
+ }
+
+ // Align the local state with the returning state of the
+ // transaction
+ // from the subordinate
+ switch (dataReturnedFromRemoteServer.getTransactionState()) {
+ case Status.STATUS_MARKED_ROLLBACK:
+ case Status.STATUS_ROLLEDBACK:
+ case Status.STATUS_ROLLING_BACK:
+ switch (transaction.getStatus()) {
+ case Status.STATUS_MARKED_ROLLBACK:
+ case Status.STATUS_ROLLEDBACK:
+ case Status.STATUS_ROLLING_BACK:
+ transaction.setRollbackOnly();
+ }
+ break;
+ default:
+ break;
+ }
}
}
+ TransactionManager transactionManager = currentServer.getTransactionManager();
+ int transactionState = transactionManager.getStatus();
// SUSPEND THE TRANSACTION WHEN YOU ARE READY TO RETURN TO YOUR CALLER
transactionManager.suspend();
// Return to the previous caller back over the transport/classloader
// boundary in this case
Thread.currentThread().setContextClassLoader(classLoader);
- return requiresProxyAtPreviousServer;
+ return new DataReturnedFromRemoteServer(requiresProxyAtPreviousServer, transactionState);
}
/**
@@ -311,4 +402,27 @@
}
}
+
+ /**
+ * This is the transactional data the transport needs to return from remote
+ * instances.
+ */
+ private class DataReturnedFromRemoteServer {
+ private boolean proxyRequired;
+
+ private int transactionState;
+
+ public DataReturnedFromRemoteServer(boolean proxyRequired, int transactionState) {
+ this.proxyRequired = proxyRequired;
+ this.transactionState = transactionState;
+ }
+
+ public boolean isProxyRequired() {
+ return proxyRequired;
+ }
+
+ public int getTransactionState() {
+ return transactionState;
+ }
+ }
}
Modified: labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/examples/classes/com/arjuna/jta/distributed/example/server/impl/ProxyXAResource.java
===================================================================
--- labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/examples/classes/com/arjuna/jta/distributed/example/server/impl/ProxyXAResource.java 2011-10-21 07:27:41 UTC (rev 37646)
+++ labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/examples/classes/com/arjuna/jta/distributed/example/server/impl/ProxyXAResource.java 2011-10-21 08:16:49 UTC (rev 37647)
@@ -156,7 +156,7 @@
// prepare but the alternative is to orphan a prepared server
try {
- File dir = new File(System.getProperty("user.dir") + "/distributedjta/ProxyXAResource/" + localServerName + "/");
+ File dir = new File(System.getProperty("user.dir") + "/distributedjta-example/ProxyXAResource/" + localServerName + "/");
dir.mkdirs();
File file = new File(dir, new Uid().fileStringForm());
file.createNewFile();
Modified: labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/examples/classes/com/arjuna/jta/distributed/example/server/impl/ProxyXAResourceRecovery.java
===================================================================
--- labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/examples/classes/com/arjuna/jta/distributed/example/server/impl/ProxyXAResourceRecovery.java 2011-10-21 07:27:41 UTC (rev 37646)
+++ labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/examples/classes/com/arjuna/jta/distributed/example/server/impl/ProxyXAResourceRecovery.java 2011-10-21 08:16:49 UTC (rev 37647)
@@ -44,7 +44,7 @@
private List<ProxyXAResource> resources = new ArrayList<ProxyXAResource>();
public ProxyXAResourceRecovery(LookupProvider lookupProvider, Integer id) throws IOException {
- File directory = new File(System.getProperty("user.dir") + "/distributedjta/ProxyXAResource/" + id + "/");
+ File directory = new File(System.getProperty("user.dir") + "/distributedjta-example/ProxyXAResource/" + id + "/");
Map<String, Map<Xid, File>> savedData = new HashMap<String, Map<Xid, File>>();
if (directory.exists() && directory.isDirectory()) {
File[] listFiles = directory.listFiles();
Modified: labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/examples/classes/com/arjuna/jta/distributed/example/server/impl/ServerImpl.java
===================================================================
--- labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/examples/classes/com/arjuna/jta/distributed/example/server/impl/ServerImpl.java 2011-10-21 07:27:41 UTC (rev 37646)
+++ labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/examples/classes/com/arjuna/jta/distributed/example/server/impl/ServerImpl.java 2011-10-21 08:16:49 UTC (rev 37647)
@@ -206,7 +206,7 @@
// prepare but the alternative is to orphan a prepared server
Xid currentXid = getCurrentXid();
- File dir = new File(System.getProperty("user.dir") + "/distributedjta/ProxyXAResource/" + nodeName);
+ File dir = new File(System.getProperty("user.dir") + "/distributedjta-example/ProxyXAResource/" + nodeName);
dir.mkdirs();
File file = new File(dir, new Uid().fileStringForm());
file.createNewFile();
Modified: labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/tests/classes/com/arjuna/ats/jta/distributed/SimpleIsolatedServers.java
===================================================================
--- labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/tests/classes/com/arjuna/ats/jta/distributed/SimpleIsolatedServers.java 2011-10-21 07:27:41 UTC (rev 37646)
+++ labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/tests/classes/com/arjuna/ats/jta/distributed/SimpleIsolatedServers.java 2011-10-21 08:16:49 UTC (rev 37647)
@@ -29,11 +29,9 @@
import java.util.LinkedList;
import java.util.List;
-import javax.transaction.HeuristicMixedException;
-import javax.transaction.HeuristicRollbackException;
-import javax.transaction.InvalidTransactionException;
import javax.transaction.NotSupportedException;
import javax.transaction.RollbackException;
+import javax.transaction.Status;
import javax.transaction.SystemException;
import javax.transaction.Transaction;
import javax.transaction.TransactionManager;
@@ -161,7 +159,8 @@
XAResource proxyXAResource = originalServer.generateProxyXAResource(lookupProvider, 2000);
originalServer.storeRootTransaction();
transactionManager.suspend();
- performTransactionalWork(null, new LinkedList<Integer>(Arrays.asList(new Integer[] { 2000 })), remainingTimeout, currentXid, 1, false, false);
+ performTransactionalWork(null, new LinkedList<Integer>(Arrays.asList(new Integer[] { 2000 })), remainingTimeout, currentXid, 1, false,
+ false);
transactionManager.resume(originalTransaction);
originalTransaction.enlistResource(proxyXAResource);
// Needs a second resource to make sure we dont get the one
@@ -238,7 +237,8 @@
originalServer.storeRootTransaction();
XAResource proxyXAResource = originalServer.generateProxyXAResource(lookupProvider, 2000);
transactionManager.suspend();
- performTransactionalWork(null, new LinkedList<Integer>(Arrays.asList(new Integer[] { 2000 })), remainingTimeout, currentXid, 2, false, false);
+ performTransactionalWork(null, new LinkedList<Integer>(Arrays.asList(new Integer[] { 2000 })), remainingTimeout, currentXid, 2, false,
+ false);
transactionManager.resume(originalTransaction);
originalTransaction.enlistResource(proxyXAResource);
originalServer.removeRootTransaction(currentXid);
@@ -316,7 +316,8 @@
originalServer.storeRootTransaction();
XAResource proxyXAResource = originalServer.generateProxyXAResource(lookupProvider, 2000);
transactionManager.suspend();
- performTransactionalWork(null, new LinkedList<Integer>(Arrays.asList(new Integer[] { 2000 })), remainingTimeout, currentXid, 2, false, false);
+ performTransactionalWork(null, new LinkedList<Integer>(Arrays.asList(new Integer[] { 2000 })), remainingTimeout, currentXid, 2, false,
+ false);
transactionManager.resume(originalTransaction);
originalTransaction.enlistResource(proxyXAResource);
originalServer.removeRootTransaction(currentXid);
@@ -430,7 +431,8 @@
originalServer.storeRootTransaction();
XAResource proxyXAResource = originalServer.generateProxyXAResource(lookupProvider, 2000);
transactionManager.suspend();
- performTransactionalWork(counter, new LinkedList<Integer>(Arrays.asList(new Integer[] { 2000 })), remainingTimeout, currentXid, 2, false, false);
+ performTransactionalWork(counter, new LinkedList<Integer>(Arrays.asList(new Integer[] { 2000 })), remainingTimeout, currentXid, 2, false,
+ false);
transactionManager.resume(originalTransaction);
originalTransaction.enlistResource(proxyXAResource);
originalServer.removeRootTransaction(currentXid);
@@ -493,40 +495,13 @@
List<Integer> nodesToFlowTo = new LinkedList<Integer>(Arrays.asList(new Integer[] { 1000, 2000, 3000, 2000, 1000, 2000, 3000, 1000, 3000 }));
try {
doRecursiveTransactionalWork(startingTimeout, nodesToFlowTo, true, false);
- } catch (InvalidTransactionException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- } catch (IllegalStateException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- } catch (SecurityException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- } catch (NotSupportedException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- } catch (SystemException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- } catch (RollbackException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- } catch (XAException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- } catch (HeuristicMixedException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- } catch (HeuristicRollbackException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
} catch (ExecuteException e) {
System.err.println("Should be a thread death but cest la vie");
synchronized (phase2CommitAborted) {
phase2CommitAborted.setPhase2CommitAborted(true);
phase2CommitAborted.notify();
}
- } catch (IOException e) {
+ } catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
@@ -615,32 +590,28 @@
}
@Test
- public void testMigrateTransactionRollbackOnlyCommit() throws NotSupportedException, SystemException, IllegalStateException, RollbackException, XAException,
- SecurityException, HeuristicMixedException, HeuristicRollbackException, IOException {
+ public void testMigrateTransactionRollbackOnlyCommit() throws Exception {
int startingTimeout = 0;
List<Integer> nodesToFlowTo = new LinkedList<Integer>(Arrays.asList(new Integer[] { 1000, 2000, 3000, 2000, 1000, 2000, 3000, 1000, 3000 }));
doRecursiveTransactionalWork(startingTimeout, nodesToFlowTo, true, true);
}
@Test
- public void testMigrateTransactionRollbackOnlyRollback() throws NotSupportedException, SystemException, IllegalStateException, RollbackException, XAException,
- SecurityException, HeuristicMixedException, HeuristicRollbackException, IOException {
+ public void testMigrateTransactionRollbackOnlyRollback() throws Exception {
int startingTimeout = 0;
List<Integer> nodesToFlowTo = new LinkedList<Integer>(Arrays.asList(new Integer[] { 1000, 2000, 3000, 2000, 1000, 2000, 3000, 1000, 3000 }));
doRecursiveTransactionalWork(startingTimeout, nodesToFlowTo, false, true);
}
@Test
- public void testMigrateTransactionCommit() throws NotSupportedException, SystemException, IllegalStateException, RollbackException, XAException,
- SecurityException, HeuristicMixedException, HeuristicRollbackException, IOException {
+ public void testMigrateTransactionCommit() throws Exception {
int startingTimeout = 0;
List<Integer> nodesToFlowTo = new LinkedList<Integer>(Arrays.asList(new Integer[] { 1000, 2000, 3000, 2000, 1000, 2000, 3000, 1000, 3000 }));
doRecursiveTransactionalWork(startingTimeout, nodesToFlowTo, true, false);
}
@Test
- public void testMigrateTransactionCommitDiamond() throws NotSupportedException, SystemException, IllegalStateException, RollbackException, XAException,
- SecurityException, HeuristicMixedException, HeuristicRollbackException, IOException {
+ public void testMigrateTransactionCommitDiamond() throws Exception {
int startingTimeout = 0;
List<Integer> nodesToFlowTo = new LinkedList<Integer>(Arrays.asList(new Integer[] { 1000, 2000, 1000, 3000, 1000, 2000, 3000 }));
@@ -648,16 +619,14 @@
}
@Test
- public void testMigrateTransactionRollback() throws NotSupportedException, SystemException, IllegalStateException, RollbackException, XAException,
- SecurityException, HeuristicMixedException, HeuristicRollbackException, IOException {
+ public void testMigrateTransactionRollback() throws Exception {
int startingTimeout = 0;
List<Integer> nodesToFlowTo = new LinkedList<Integer>(Arrays.asList(new Integer[] { 1000, 2000, 3000, 2000, 1000, 2000, 3000, 1000, 3000 }));
doRecursiveTransactionalWork(startingTimeout, nodesToFlowTo, false, false);
}
@Test
- public void testMigrateTransactionRollbackDiamond() throws NotSupportedException, SystemException, IllegalStateException, RollbackException, XAException,
- SecurityException, HeuristicMixedException, HeuristicRollbackException, IOException {
+ public void testMigrateTransactionRollbackDiamond() throws Exception {
int startingTimeout = 0;
List<Integer> nodesToFlowTo = new LinkedList<Integer>(Arrays.asList(new Integer[] { 1000, 2000, 1000, 3000, 1000, 2000, 3000 }));
doRecursiveTransactionalWork(startingTimeout, nodesToFlowTo, false, false);
@@ -709,8 +678,10 @@
assertTrue(getLocalServer(1000).getCompletionCounter().getRollbackCount() == 2);
}
- private void doRecursiveTransactionalWork(int startingTimeout, List<Integer> nodesToFlowTo, boolean commit, boolean rollbackOnlyOnLastNode) throws NotSupportedException, SystemException,
- RollbackException, IllegalStateException, XAException, SecurityException, HeuristicMixedException, HeuristicRollbackException, IOException {
+ private void doRecursiveTransactionalWork(int startingTimeout, List<Integer> nodesToFlowTo, boolean commit, boolean rollbackOnlyOnLastNode)
+ throws Exception {
+ tearDown();
+ setup();
// Start out at the first server
CompletionCounter counter = new CompletionCounter() {
@@ -752,14 +723,34 @@
TransactionManager transactionManager = originalServer.getTransactionManager();
transactionManager.setTransactionTimeout(startingTimeout);
transactionManager.begin();
- Transaction originalTransaction = transactionManager.getTransaction();
+ Transaction transaction = transactionManager.getTransaction();
int remainingTimeout = (int) (originalServer.getTimeLeftBeforeTransactionTimeout() / 1000);
Xid currentXid = originalServer.getCurrentXid();
originalServer.storeRootTransaction();
transactionManager.suspend();
- performTransactionalWork(counter, nodesToFlowTo, remainingTimeout, currentXid, 1, true, rollbackOnlyOnLastNode);
- transactionManager.resume(originalTransaction);
+ DataReturnedFromRemoteServer dataReturnedFromRemoteServer = performTransactionalWork(counter, nodesToFlowTo, remainingTimeout, currentXid, 1, true,
+ rollbackOnlyOnLastNode);
+ transactionManager.resume(transaction);
originalServer.removeRootTransaction(currentXid);
+
+ // Align the local state with the returning state of the
+ // transaction
+ // from the subordinate
+ switch (dataReturnedFromRemoteServer.getTransactionState()) {
+ case Status.STATUS_MARKED_ROLLBACK:
+ case Status.STATUS_ROLLEDBACK:
+ case Status.STATUS_ROLLING_BACK:
+ switch (transaction.getStatus()) {
+ case Status.STATUS_MARKED_ROLLBACK:
+ case Status.STATUS_ROLLEDBACK:
+ case Status.STATUS_ROLLING_BACK:
+ transaction.setRollbackOnly();
+ }
+ break;
+ default:
+ break;
+ }
+
if (commit) {
try {
transactionManager.commit();
@@ -776,9 +767,9 @@
Thread.currentThread().setContextClassLoader(classLoader);
}
- private boolean performTransactionalWork(CompletionCounter counter, List<Integer> nodesToFlowTo, int remainingTimeout, Xid toMigrate,
- int numberOfResourcesToRegister, boolean addSynchronization, boolean rollbackOnlyOnLastNode) throws RollbackException, IllegalStateException, XAException, SystemException,
- NotSupportedException, IOException {
+ private DataReturnedFromRemoteServer performTransactionalWork(CompletionCounter counter, List<Integer> nodesToFlowTo, int remainingTimeout, Xid toMigrate,
+ int numberOfResourcesToRegister, boolean addSynchronization, boolean rollbackOnlyOnLastNode) throws RollbackException, IllegalStateException,
+ XAException, SystemException, NotSupportedException, IOException {
Integer currentServerName = nodesToFlowTo.remove(0);
LocalServer currentServer = getLocalServer(currentServerName);
@@ -786,55 +777,90 @@
Thread.currentThread().setContextClassLoader(currentServer.getClass().getClassLoader());
boolean requiresProxyAtPreviousServer = !currentServer.getAndResumeTransaction(remainingTimeout, toMigrate);
+
// Perform work on the migrated transaction
- TransactionManager transactionManager = currentServer.getTransactionManager();
- Transaction transaction = transactionManager.getTransaction();
- if (addSynchronization) {
- transaction.registerSynchronization(new TestSynchronization(currentServer.getNodeName()));
- }
- for (int i = 0; i < numberOfResourcesToRegister; i++) {
- transaction.enlistResource(new TestResource(counter, currentServer.getNodeName(), false));
- }
+ {
+ TransactionManager transactionManager = currentServer.getTransactionManager();
+ Transaction transaction = transactionManager.getTransaction();
+ if (addSynchronization) {
+ transaction.registerSynchronization(new TestSynchronization(currentServer.getNodeName()));
+ }
+ for (int i = 0; i < numberOfResourcesToRegister; i++) {
+ transaction.enlistResource(new TestResource(counter, currentServer.getNodeName(), false));
+ }
- if (nodesToFlowTo.isEmpty()) {
- if (rollbackOnlyOnLastNode) {
+ if (rollbackOnlyOnLastNode && nodesToFlowTo.isEmpty()) {
transaction.setRollbackOnly();
}
- } else {
- Integer nextServerNodeName = nodesToFlowTo.get(0);
+ }
- // FLOW THE TRANSACTION
- remainingTimeout = (int) (currentServer.getTimeLeftBeforeTransactionTimeout() / 1000);
+ if (!nodesToFlowTo.isEmpty()) {
- // STORE AND SUSPEND THE TRANSACTION
- Xid currentXid = currentServer.getCurrentXid();
- XAResource proxyXAResource = currentServer.generateProxyXAResource(lookupProvider, nodesToFlowTo.get(0));
- transactionManager.suspend();
+ TransactionManager transactionManager = currentServer.getTransactionManager();
+ Transaction transaction = transactionManager.getTransaction();
+ int status = transaction.getStatus();
- boolean proxyRequired = performTransactionalWork(counter, nodesToFlowTo, remainingTimeout, currentXid, numberOfResourcesToRegister,
- addSynchronization, rollbackOnlyOnLastNode);
- transactionManager.resume(transaction);
+ // Only propagate active transactions - this may be inactive through
+ // user code (rollback/setRollbackOnly) or it may be inactive due to
+ // the transaction reaper
+ if (status == Status.STATUS_ACTIVE) {
+ Integer nextServerNodeName = nodesToFlowTo.get(0);
- // Create a proxy for the new server if necessary, this can orphan
- // the remote server but XA recovery will handle that on the remote
- // server
- // The alternative is to always create a proxy but this is a
- // performance drain and will result in multiple subordinate
- // transactions and performance issues
- if (proxyRequired) {
- transaction.enlistResource(proxyXAResource);
- transaction.registerSynchronization(currentServer.generateProxySynchronization(lookupProvider, currentServer.getNodeName(), nextServerNodeName,
- toMigrate));
- } else {
- currentServer.cleanupProxyXAResource(proxyXAResource);
+ // FLOW THE TRANSACTION
+ remainingTimeout = (int) (currentServer.getTimeLeftBeforeTransactionTimeout() / 1000);
+
+ // STORE AND SUSPEND THE TRANSACTION
+ Xid currentXid = currentServer.getCurrentXid();
+ XAResource proxyXAResource = currentServer.generateProxyXAResource(lookupProvider, nodesToFlowTo.get(0));
+ transactionManager.suspend();
+
+ DataReturnedFromRemoteServer dataReturnedFromRemoteServer = performTransactionalWork(counter, nodesToFlowTo, remainingTimeout, currentXid,
+ numberOfResourcesToRegister, addSynchronization, rollbackOnlyOnLastNode);
+ transactionManager.resume(transaction);
+
+ // Create a proxy for the new server if necessary. This can
+ // orphan the remote server, but XA recovery will handle
+ // that on the remote server.
+ //
+ // The alternative is to always create a proxy, but this
+ // is a performance drain and will result in
+ // multiple subordinate transactions and performance
+ // issues.
+ if (dataReturnedFromRemoteServer.isProxyRequired()) {
+ transaction.enlistResource(proxyXAResource);
+ transaction.registerSynchronization(currentServer.generateProxySynchronization(lookupProvider, currentServer.getNodeName(),
+ nextServerNodeName, toMigrate));
+ } else {
+ currentServer.cleanupProxyXAResource(proxyXAResource);
+ }
+
+ // Align the local transaction state
+ // with the state returned
+ // from the subordinate
+ switch (dataReturnedFromRemoteServer.getTransactionState()) {
+ case Status.STATUS_MARKED_ROLLBACK:
+ case Status.STATUS_ROLLEDBACK:
+ case Status.STATUS_ROLLING_BACK:
+ switch (transaction.getStatus()) {
+ case Status.STATUS_MARKED_ROLLBACK:
+ case Status.STATUS_ROLLEDBACK:
+ case Status.STATUS_ROLLING_BACK:
+ transaction.setRollbackOnly();
+ }
+ break;
+ default:
+ break;
+ }
}
}
-
+ TransactionManager transactionManager = currentServer.getTransactionManager();
+ int transactionState = transactionManager.getStatus();
// SUSPEND THE TRANSACTION WHEN YOU ARE READY TO RETURN TO YOUR CALLER
transactionManager.suspend();
-
+ // Return to the previous caller back over the transport/classloader
+ // boundary in this case
Thread.currentThread().setContextClassLoader(classLoader);
- return requiresProxyAtPreviousServer;
+ return new DataReturnedFromRemoteServer(requiresProxyAtPreviousServer, transactionState);
}
private static LocalServer getLocalServer(Integer jndiName) {
@@ -863,4 +889,24 @@
this.phase2CommitAborted = phase2CommitAborted;
}
}
+
+ private class DataReturnedFromRemoteServer {
+ private boolean proxyRequired;
+
+ private int transactionState;
+
+ public DataReturnedFromRemoteServer(boolean proxyRequired, int transactionState) {
+ this.proxyRequired = proxyRequired;
+ this.transactionState = transactionState;
+ }
+
+ public boolean isProxyRequired() {
+ return proxyRequired;
+ }
+
+ public int getTransactionState() {
+ return transactionState;
+ }
+ }
+
}
Modified: labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/tests/classes/com/arjuna/ats/jta/distributed/server/impl/ServerImpl.java
===================================================================
--- labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/tests/classes/com/arjuna/ats/jta/distributed/server/impl/ServerImpl.java 2011-10-21 07:27:41 UTC (rev 37646)
+++ labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/tests/classes/com/arjuna/ats/jta/distributed/server/impl/ServerImpl.java 2011-10-21 08:16:49 UTC (rev 37647)
@@ -272,7 +272,7 @@
@Override
public long getTimeLeftBeforeTransactionTimeout() throws RollbackException {
- return ((TransactionTimeoutConfiguration) transactionManagerService.getTransactionManager()).getTimeLeftBeforeTransactionTimeout(false);
+ return ((TransactionTimeoutConfiguration) transactionManagerService.getTransactionManager()).getTimeLeftBeforeTransactionTimeout(true);
}
@Override
More information about the jboss-svn-commits
mailing list