[jboss-svn-commits] JBL Code SVN: r37747 - in labs/jbosstm/branches/JBOSSTS_4_15_0_Final: ArjunaJTA/jta/classes/com/arjuna/ats/internal/jta/transaction/arjunacore/jca and 6 other directories.
jboss-svn-commits at lists.jboss.org
jboss-svn-commits at lists.jboss.org
Wed Nov 30 11:48:59 EST 2011
Author: tomjenkinson
Date: 2011-11-30 11:48:58 -0500 (Wed, 30 Nov 2011)
New Revision: 37747
Added:
labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/tests/classes/com/arjuna/ats/jta/distributed/server/CompletionCounterImpl.java
labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/tests/classes/com/arjuna/ats/jta/distributed/server/LookupProviderImpl.java
Modified:
labs/jbosstm/branches/JBOSSTS_4_15_0_Final/ArjunaCore/arjuna/classes/com/arjuna/ats/arjuna/coordinator/BasicAction.java
labs/jbosstm/branches/JBOSSTS_4_15_0_Final/ArjunaJTA/jta/classes/com/arjuna/ats/internal/jta/transaction/arjunacore/jca/XATerminatorImple.java
labs/jbosstm/branches/JBOSSTS_4_15_0_Final/ArjunaJTA/jta/classes/com/arjuna/ats/internal/jta/transaction/arjunacore/subordinate/jca/SubordinateAtomicAction.java
labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/examples/classes/com/arjuna/jta/distributed/example/server/impl/ServerImpl.java
labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/tests/classes/com/arjuna/ats/jta/distributed/SimpleIsolatedServers.java
labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/tests/classes/com/arjuna/ats/jta/distributed/TestResource.java
labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/tests/classes/com/arjuna/ats/jta/distributed/TestResourceRecovery.java
labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/tests/classes/com/arjuna/ats/jta/distributed/server/CompletionCounter.java
labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/tests/classes/com/arjuna/ats/jta/distributed/server/IsolatableServersClassLoader.java
labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/tests/classes/com/arjuna/ats/jta/distributed/server/LocalServer.java
labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/tests/classes/com/arjuna/ats/jta/distributed/server/LookupProvider.java
labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/tests/classes/com/arjuna/ats/jta/distributed/server/RemoteServer.java
labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/tests/classes/com/arjuna/ats/jta/distributed/server/impl/ProxyXAResource.java
labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/tests/classes/com/arjuna/ats/jta/distributed/server/impl/ProxyXAResourceRecovery.java
labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/tests/classes/com/arjuna/ats/jta/distributed/server/impl/ServerImpl.java
labs/jbosstm/branches/JBOSSTS_4_15_0_Final/common/classes/com/arjuna/common/util/ConfigurationInfo.java
Log:
JBTM-895 updated to remove some of the persistence points
Modified: labs/jbosstm/branches/JBOSSTS_4_15_0_Final/ArjunaCore/arjuna/classes/com/arjuna/ats/arjuna/coordinator/BasicAction.java
===================================================================
--- labs/jbosstm/branches/JBOSSTS_4_15_0_Final/ArjunaCore/arjuna/classes/com/arjuna/ats/arjuna/coordinator/BasicAction.java 2011-11-30 10:32:11 UTC (rev 37746)
+++ labs/jbosstm/branches/JBOSSTS_4_15_0_Final/ArjunaCore/arjuna/classes/com/arjuna/ats/arjuna/coordinator/BasicAction.java 2011-11-30 16:48:58 UTC (rev 37747)
@@ -1767,9 +1767,11 @@
*
* Note that at this point the pendingList SHOULD be empty due to the prior
* invocation of prepare().
+ *
+ * @throws Error JBTM-895 tests, byteman limitation
*/
- protected synchronized final void phase2Commit (boolean reportHeuristics)
+ protected synchronized final void phase2Commit (boolean reportHeuristics) throws Error
{
if (tsLogger.logger.isTraceEnabled()) {
tsLogger.logger.trace("BasicAction::phase2Commit() for action-id "
Modified: labs/jbosstm/branches/JBOSSTS_4_15_0_Final/ArjunaJTA/jta/classes/com/arjuna/ats/internal/jta/transaction/arjunacore/jca/XATerminatorImple.java
===================================================================
--- labs/jbosstm/branches/JBOSSTS_4_15_0_Final/ArjunaJTA/jta/classes/com/arjuna/ats/internal/jta/transaction/arjunacore/jca/XATerminatorImple.java 2011-11-30 10:32:11 UTC (rev 37746)
+++ labs/jbosstm/branches/JBOSSTS_4_15_0_Final/ArjunaJTA/jta/classes/com/arjuna/ats/internal/jta/transaction/arjunacore/jca/XATerminatorImple.java 2011-11-30 16:48:58 UTC (rev 37747)
@@ -48,6 +48,7 @@
import com.arjuna.ats.arjuna.common.Uid;
import com.arjuna.ats.arjuna.coordinator.TwoPhaseOutcome;
+import com.arjuna.ats.arjuna.coordinator.TxControl;
import com.arjuna.ats.arjuna.objectstore.RecoveryStore;
import com.arjuna.ats.arjuna.objectstore.StoreManager;
import com.arjuna.ats.arjuna.state.InputObjectState;
@@ -337,7 +338,7 @@
}
// if we are here, then check the object store
- return doRecover(null);
+ return doRecover(null, null);
}
/**
@@ -352,7 +353,7 @@
* @return a list of potentially indoubt transactions or <code>null</code>.
*/
- public synchronized Xid[] doRecover (XidImple toRecover) throws XAException
+ public synchronized Xid[] doRecover (XidImple toRecover, String parentNodeName) throws XAException
{
/*
* Requires going through the objectstore for the states of imported
@@ -392,7 +393,24 @@
if (uid.notEquals(Uid.nullUid()))
{
- if (toRecover == null) {
+ if (parentNodeName != null) {
+ SubordinateAtomicAction saa = new SubordinateAtomicAction(uid, true);
+ XidImple loadedXid = (XidImple) saa.getXid();
+ if (loadedXid.getFormatId() == XATxConverter.FORMAT_ID) {
+ String loadedXidSubordinateNodeName = XATxConverter.getSubordinateNodeName(loadedXid.getXID());
+ if (TxControl.getXANodeName().equals(loadedXidSubordinateNodeName)) {
+ if (parentNodeName.equals(saa.getParentNodeName())) {
+ if (jtaLogger.logger.isDebugEnabled()) {
+ jtaLogger.logger.debug("Found record for " + saa);
+ }
+// TransactionImple tx = (TransactionImple) SubordinationManager.getTransactionImporter().recoverTransaction(uid);
+
+ values.push(loadedXid);
+ }
+ }
+ }
+
+ } else if (toRecover == null) {
TransactionImple tx = (TransactionImple) SubordinationManager.getTransactionImporter().recoverTransaction(uid);
if (tx != null)
@@ -409,8 +427,7 @@
}
TransactionImple tx = (TransactionImple) SubordinationManager.getTransactionImporter().recoverTransaction(uid);
- if (tx != null)
- values.push(tx.baseXid());
+ values.push(loadedXid);
}
}
}
Modified: labs/jbosstm/branches/JBOSSTS_4_15_0_Final/ArjunaJTA/jta/classes/com/arjuna/ats/internal/jta/transaction/arjunacore/subordinate/jca/SubordinateAtomicAction.java
===================================================================
--- labs/jbosstm/branches/JBOSSTS_4_15_0_Final/ArjunaJTA/jta/classes/com/arjuna/ats/internal/jta/transaction/arjunacore/subordinate/jca/SubordinateAtomicAction.java 2011-11-30 10:32:11 UTC (rev 37746)
+++ labs/jbosstm/branches/JBOSSTS_4_15_0_Final/ArjunaJTA/jta/classes/com/arjuna/ats/internal/jta/transaction/arjunacore/subordinate/jca/SubordinateAtomicAction.java 2011-11-30 16:48:58 UTC (rev 37747)
@@ -33,10 +33,13 @@
import javax.transaction.xa.Xid;
import com.arjuna.ats.arjuna.common.Uid;
+import com.arjuna.ats.arjuna.coordinator.TxControl;
import com.arjuna.ats.arjuna.exceptions.ObjectStoreException;
import com.arjuna.ats.arjuna.objectstore.StoreManager;
import com.arjuna.ats.arjuna.state.InputObjectState;
import com.arjuna.ats.arjuna.state.OutputObjectState;
+import com.arjuna.ats.internal.jta.xa.XID;
+import com.arjuna.ats.jta.xa.XATxConverter;
import com.arjuna.ats.jta.xa.XidImple;
/**
@@ -80,6 +83,7 @@
_theXid = new XidImple();
((XidImple) _theXid).unpackFrom(os);
+ _parentNodeName = os.unpackString();
}
} else {
_activated = activate();
@@ -90,7 +94,19 @@
{
super(timeout); // implicit start (done in base class)
- _theXid = new XidImple(xid);
+ if (xid.getFormatId() == XATxConverter.FORMAT_ID) {
+ XidImple toImport = new XidImple(xid);
+ XID toCheck = toImport.getXID();
+ _parentNodeName = XATxConverter.getSubordinateNodeName(toCheck);
+ if (_parentNodeName == null) {
+ _parentNodeName = XATxConverter.getNodeName(toCheck);
+ }
+ XATxConverter.setSubordinateNodeName(toImport.getXID(), TxControl.getXANodeName());
+ _theXid = new XidImple(toImport);
+ } else {
+ _theXid = new XidImple(xid);
+ }
+
_activated = true;
}
@@ -120,6 +136,10 @@
return _theXid;
}
+
+ public String getParentNodeName() {
+ return _parentNodeName;
+ }
public boolean save_state (OutputObjectState os, int t)
{
@@ -130,6 +150,7 @@
os.packBoolean(true);
((XidImple) _theXid).packInto(os);
+ os.packString(_parentNodeName);
}
else
os.packBoolean(false);
@@ -155,6 +176,7 @@
_theXid = new XidImple();
((XidImple) _theXid).unpackFrom(os);
+ _parentNodeName = os.unpackString();
}
}
catch (IOException ex)
@@ -171,5 +193,6 @@
}
private Xid _theXid;
+ private String _parentNodeName;
private boolean _activated;
}
Modified: labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/examples/classes/com/arjuna/jta/distributed/example/server/impl/ServerImpl.java
===================================================================
--- labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/examples/classes/com/arjuna/jta/distributed/example/server/impl/ServerImpl.java 2011-11-30 10:32:11 UTC (rev 37746)
+++ labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/examples/classes/com/arjuna/jta/distributed/example/server/impl/ServerImpl.java 2011-11-30 16:48:58 UTC (rev 37747)
@@ -303,11 +303,7 @@
}
public void recover(Xid toRecover) throws XAException, IOException {
- // Work out what the subordinate name would be for these transaction
- // for this server
- XidImple recoverable = new XidImple(toRecover);
- XATxConverter.setSubordinateNodeName(recoverable.getXID(), TxControl.getXANodeName());
- ((XATerminatorImple) SubordinationManager.getXATerminator()).doRecover(recoverable);
+ ((XATerminatorImple) SubordinationManager.getXATerminator()).doRecover(new XidImple(toRecover), null);
}
@Override
Modified: labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/tests/classes/com/arjuna/ats/jta/distributed/SimpleIsolatedServers.java
===================================================================
--- labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/tests/classes/com/arjuna/ats/jta/distributed/SimpleIsolatedServers.java 2011-11-30 10:32:11 UTC (rev 37746)
+++ labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/tests/classes/com/arjuna/ats/jta/distributed/SimpleIsolatedServers.java 2011-11-30 16:48:58 UTC (rev 37747)
@@ -25,7 +25,9 @@
import static org.junit.Assert.fail;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
@@ -49,27 +51,34 @@
import com.arjuna.ats.arjuna.common.CoreEnvironmentBeanException;
import com.arjuna.ats.jta.distributed.server.CompletionCounter;
+import com.arjuna.ats.jta.distributed.server.CompletionCounterImpl;
import com.arjuna.ats.jta.distributed.server.IsolatableServersClassLoader;
import com.arjuna.ats.jta.distributed.server.LocalServer;
import com.arjuna.ats.jta.distributed.server.LookupProvider;
-import com.arjuna.ats.jta.distributed.server.RemoteServer;
+import com.arjuna.ats.jta.distributed.server.LookupProviderImpl;
@RunWith(BMUnitRunner.class)
public class SimpleIsolatedServers {
- private static LookupProvider lookupProvider = new MyLookupProvider();
- private static LocalServer[] localServers = new LocalServer[3];
- private static RemoteServer[] remoteServers = new RemoteServer[3];
+ private static String[] serverNodeNames = new String[] { "1000", "2000", "3000" };
+ private static int[] serverPortOffsets = new int[] { 1000, 2000, 3000 };
+ private static String[][] clusterBuddies = new String[][] { new String[] { "2000", "3000" }, new String[] { "1000", "3000" },
+ new String[] { "1000", "2000" } };
+ private static LookupProvider lookupProvider = LookupProviderImpl.getLookupProvider();
+ private static LocalServer[] localServers = new LocalServer[serverNodeNames.length];
+ private static CompletionCounter completionCounter = CompletionCounterImpl.getCompletionCounter();
@BeforeClass
public static void setup() throws SecurityException, NoSuchMethodException, InstantiationException, IllegalAccessException, ClassNotFoundException,
CoreEnvironmentBeanException, IOException, IllegalArgumentException, NoSuchFieldException {
- for (int i = 0; i < localServers.length; i++) {
+ completionCounter.reset();
+ lookupProvider.clear();
+ for (int i = 0; i < serverNodeNames.length; i++) {
ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
IsolatableServersClassLoader classLoader = new IsolatableServersClassLoader("com.arjuna.ats.jta.distributed.server", contextClassLoader);
localServers[i] = (LocalServer) classLoader.loadClass("com.arjuna.ats.jta.distributed.server.impl.ServerImpl").newInstance();
Thread.currentThread().setContextClassLoader(localServers[i].getClass().getClassLoader());
- localServers[i].initialise(lookupProvider, String.valueOf((i + 1) * 1000), (i + 1) * 1000);
- remoteServers[i] = localServers[i].connectTo();
+ localServers[i].initialise(lookupProvider, serverNodeNames[i], serverPortOffsets[i], clusterBuddies[i]);
+ lookupProvider.bind(i, localServers[i].connectTo());
Thread.currentThread().setContextClassLoader(contextClassLoader);
}
}
@@ -85,7 +94,14 @@
}
private static void reboot(String serverName) throws Exception {
- int index = (Integer.valueOf(serverName) / 1000) - 1;
+ // int index = (Integer.valueOf(serverName) / 1000) - 1;
+ int index = -1;
+ for (int i = 0; i < localServers.length; i++) {
+ if (localServers[i].getNodeName().equals(serverName)) {
+ index = i;
+ break;
+ }
+ }
ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
Thread.currentThread().setContextClassLoader(localServers[index].getClass().getClassLoader());
localServers[index].shutdown();
@@ -94,8 +110,8 @@
IsolatableServersClassLoader classLoader = new IsolatableServersClassLoader("com.arjuna.ats.jta.distributed.server", contextClassLoader);
localServers[index] = (LocalServer) classLoader.loadClass("com.arjuna.ats.jta.distributed.server.impl.ServerImpl").newInstance();
Thread.currentThread().setContextClassLoader(localServers[index].getClass().getClassLoader());
- localServers[index].initialise(lookupProvider, String.valueOf((index + 1) * 1000), (index + 1) * 1000);
- remoteServers[index] = localServers[index].connectTo();
+ localServers[index].initialise(lookupProvider, serverNodeNames[index], serverPortOffsets[index], clusterBuddies[index]);
+ lookupProvider.bind(index, localServers[index].connectTo());
Thread.currentThread().setContextClassLoader(contextClassLoader);
}
@@ -116,8 +132,8 @@
System.out.println("testSimultaneousRecover");
tearDown();
setup();
- assertTrue(getLocalServer("2000").getCompletionCounter().getCommitCount() == 0);
- assertTrue(getLocalServer("1000").getCompletionCounter().getCommitCount() == 0);
+ assertTrue(completionCounter.getCommitCount("2000") == 0);
+ assertTrue(completionCounter.getCommitCount("1000") == 0);
final Phase2CommitAborted phase2CommitAborted = new Phase2CommitAborted();
{
Thread thread = new Thread(new Runnable() {
@@ -134,11 +150,11 @@
int remainingTimeout = (int) (originalServer.getTimeLeftBeforeTransactionTimeout() / 1000);
Xid currentXid = originalServer.getCurrentXid();
originalServer.storeRootTransaction();
- XAResource proxyXAResource = originalServer.generateProxyXAResource(lookupProvider, "2000");
transactionManager.suspend();
- performTransactionalWork(null, new LinkedList<String>(Arrays.asList(new String[] { "2000" })), remainingTimeout, currentXid, 2, false,
- false, 1);
+ DataReturnedFromRemoteServer performTransactionalWork = performTransactionalWork(
+ new LinkedList<String>(Arrays.asList(new String[] { "2000" })), remainingTimeout, currentXid, 2, false, false);
transactionManager.resume(originalTransaction);
+ XAResource proxyXAResource = originalServer.generateProxyXAResource(lookupProvider, "2000", performTransactionalWork.getProxyRequired());
originalTransaction.enlistResource(proxyXAResource);
originalServer.removeRootTransaction(currentXid);
transactionManager.commit();
@@ -182,11 +198,11 @@
int remainingTimeout = (int) (originalServer.getTimeLeftBeforeTransactionTimeout() / 1000);
Xid currentXid = originalServer.getCurrentXid();
originalServer.storeRootTransaction();
- XAResource proxyXAResource = originalServer.generateProxyXAResource(lookupProvider, "1000");
transactionManager.suspend();
- performTransactionalWork(null, new LinkedList<String>(Arrays.asList(new String[] { "1000" })), remainingTimeout, currentXid, 2, false,
- false, 1);
+ DataReturnedFromRemoteServer performTransactionalWork = performTransactionalWork(
+ new LinkedList<String>(Arrays.asList(new String[] { "1000" })), remainingTimeout, currentXid, 2, false, false);
transactionManager.resume(originalTransaction);
+ XAResource proxyXAResource = originalServer.generateProxyXAResource(lookupProvider, "1000", performTransactionalWork.getProxyRequired());
originalTransaction.enlistResource(proxyXAResource);
originalServer.removeRootTransaction(currentXid);
transactionManager.commit();
@@ -221,28 +237,28 @@
}
tearDown();
setup();
- assertTrue(getLocalServer("2000").getCompletionCounter().getCommitCount() == 0);
- assertTrue(getLocalServer("2000").getCompletionCounter().getRollbackCount() == 0);
- assertTrue(getLocalServer("1000").getCompletionCounter().getCommitCount() == 0);
- assertTrue(getLocalServer("1000").getCompletionCounter().getRollbackCount() == 0);
+ assertTrue(completionCounter.getCommitCount("2000") == 0);
+ assertTrue(completionCounter.getRollbackCount("2000") == 0);
+ assertTrue(completionCounter.getCommitCount("1000") == 0);
+ assertTrue(completionCounter.getRollbackCount("1000") == 0);
getLocalServer("2000").doRecoveryManagerScan(true);
- assertTrue(getLocalServer("1000").getCompletionCounter().getCommitCount() == 0);
- assertTrue(getLocalServer("1000").getCompletionCounter().getRollbackCount() == 2);
- assertTrue(getLocalServer("2000").getCompletionCounter().getCommitCount() == 0);
- assertTrue(getLocalServer("2000").getCompletionCounter().getRollbackCount() == 1);
+ assertTrue(completionCounter.getCommitCount("1000") == 0);
+ assertTrue("Rollbacks at 1000: " + completionCounter.getRollbackCount("1000"), completionCounter.getRollbackCount("1000") == 2);
+ assertTrue(completionCounter.getCommitCount("2000") == 0);
+ assertTrue(completionCounter.getRollbackCount("2000") == 1);
System.out.println("RECOVERING SECOND SERVER");
tearDown();
setup();
- assertTrue(getLocalServer("2000").getCompletionCounter().getCommitCount() == 0);
- assertTrue(getLocalServer("2000").getCompletionCounter().getRollbackCount() == 0);
- assertTrue(getLocalServer("1000").getCompletionCounter().getCommitCount() == 0);
- assertTrue(getLocalServer("1000").getCompletionCounter().getRollbackCount() == 0);
+ assertTrue(completionCounter.getCommitCount("2000") == 0);
+ assertTrue(completionCounter.getRollbackCount("2000") == 0);
+ assertTrue(completionCounter.getCommitCount("1000") == 0);
+ assertTrue(completionCounter.getRollbackCount("1000") == 0);
getLocalServer("1000").doRecoveryManagerScan(true);
- assertTrue(getLocalServer("1000").getCompletionCounter().getCommitCount() == 0);
- assertTrue(getLocalServer("1000").getCompletionCounter().getRollbackCount() == 1);
- assertTrue(getLocalServer("2000").getCompletionCounter().getCommitCount() == 0);
- assertTrue(getLocalServer("2000").getCompletionCounter().getRollbackCount() == 2);
+ assertTrue(completionCounter.getCommitCount("1000") == 0);
+ assertTrue(completionCounter.getRollbackCount("1000") == 1);
+ assertTrue(completionCounter.getCommitCount("2000") == 0);
+ assertTrue(completionCounter.getRollbackCount("2000") == 2);
}
@@ -256,9 +272,8 @@
System.out.println("testTwoPhaseXAResourceOrphan");
tearDown();
setup();
- assertTrue(getLocalServer("3000").getCompletionCounter().getCommitCount() == 0);
- assertTrue(getLocalServer("2000").getCompletionCounter().getCommitCount() == 0);
- assertTrue(getLocalServer("1000").getCompletionCounter().getCommitCount() == 0);
+ assertTrue(completionCounter.getCommitCount("2000") == 0);
+ assertTrue(completionCounter.getCommitCount("1000") == 0);
final Phase2CommitAborted phase2CommitAborted = new Phase2CommitAborted();
Thread thread = new Thread(new Runnable() {
public void run() {
@@ -273,16 +288,16 @@
Transaction originalTransaction = transactionManager.getTransaction();
int remainingTimeout = (int) (originalServer.getTimeLeftBeforeTransactionTimeout() / 1000);
Xid currentXid = originalServer.getCurrentXid();
- XAResource proxyXAResource = originalServer.generateProxyXAResource(lookupProvider, "2000");
originalServer.storeRootTransaction();
transactionManager.suspend();
- performTransactionalWork(null, new LinkedList<String>(Arrays.asList(new String[] { "2000" })), remainingTimeout, currentXid, 1, false,
- false, 1);
+ DataReturnedFromRemoteServer performTransactionalWork = performTransactionalWork(
+ new LinkedList<String>(Arrays.asList(new String[] { "2000" })), remainingTimeout, currentXid, 1, false, false);
transactionManager.resume(originalTransaction);
+ XAResource proxyXAResource = originalServer.generateProxyXAResource(lookupProvider, "2000", performTransactionalWork.getProxyRequired());
originalTransaction.enlistResource(proxyXAResource);
// Needs a second resource to make sure we dont get the one
// phase optimization happening
- originalTransaction.enlistResource(new TestResource(null, originalServer.getNodeName(), false));
+ originalTransaction.enlistResource(new TestResource(originalServer.getNodeName(), false));
originalServer.removeRootTransaction(currentXid);
transactionManager.commit();
Thread.currentThread().setContextClassLoader(classLoader);
@@ -307,20 +322,25 @@
setup();
{
- LocalServer server = getLocalServer("2000");
- assertTrue(server.getCompletionCounter().getCommitCount() == 0);
- assertTrue(server.getCompletionCounter().getRollbackCount() == 0);
- server.doRecoveryManagerScan(true);
- assertTrue(server.getCompletionCounter().getCommitCount() == 0);
- assertTrue(server.getCompletionCounter().getRollbackCount() == 1);
+ assertTrue(completionCounter.getCommitCount("2000") == 0);
+ assertTrue(completionCounter.getRollbackCount("2000") == 0);
+ getLocalServer("2000").doRecoveryManagerScan(true);
+ assertTrue(completionCounter.getCommitCount("2000") == 0);
+ assertTrue(completionCounter.getRollbackCount("2000") == 1);
}
{
- LocalServer server = getLocalServer("1000");
- assertTrue(server.getCompletionCounter().getCommitCount() == 0);
- assertTrue(server.getCompletionCounter().getRollbackCount() == 0);
- server.doRecoveryManagerScan(true);
- assertTrue(server.getCompletionCounter().getCommitCount() == 0);
- assertTrue(server.getCompletionCounter().getRollbackCount() == 1);
+ assertTrue(completionCounter.getCommitCount("1000") == 0);
+ assertTrue(completionCounter.getRollbackCount("1000") == 0);
+ getLocalServer("1000").doRecoveryManagerScan(true);
+ assertTrue(completionCounter.getCommitCount("1000") == 0);
+ assertTrue(completionCounter.getRollbackCount("1000") == 0); // Could
+ // have
+ // been
+ // 1
+ // in
+ // the
+ // old
+ // mechanism
}
}
@@ -334,9 +354,9 @@
System.out.println("testOnePhaseXAResourceOrphan");
tearDown();
setup();
- assertTrue(getLocalServer("3000").getCompletionCounter().getCommitCount() == 0);
- assertTrue(getLocalServer("2000").getCompletionCounter().getCommitCount() == 0);
- assertTrue(getLocalServer("1000").getCompletionCounter().getCommitCount() == 0);
+ assertTrue(completionCounter.getCommitCount("3000") == 0);
+ assertTrue(completionCounter.getCommitCount("2000") == 0);
+ assertTrue(completionCounter.getCommitCount("1000") == 0);
final Phase2CommitAborted phase2CommitAborted = new Phase2CommitAborted();
Thread thread = new Thread(new Runnable() {
public void run() {
@@ -352,11 +372,11 @@
int remainingTimeout = (int) (originalServer.getTimeLeftBeforeTransactionTimeout() / 1000);
Xid currentXid = originalServer.getCurrentXid();
originalServer.storeRootTransaction();
- XAResource proxyXAResource = originalServer.generateProxyXAResource(lookupProvider, "2000");
transactionManager.suspend();
- performTransactionalWork(null, new LinkedList<String>(Arrays.asList(new String[] { "2000" })), remainingTimeout, currentXid, 2, false,
- false, 1);
+ DataReturnedFromRemoteServer performTransactionalWork = performTransactionalWork(
+ new LinkedList<String>(Arrays.asList(new String[] { "2000" })), remainingTimeout, currentXid, 2, false, false);
transactionManager.resume(originalTransaction);
+ XAResource proxyXAResource = originalServer.generateProxyXAResource(lookupProvider, "2000", performTransactionalWork.getProxyRequired());
originalTransaction.enlistResource(proxyXAResource);
originalServer.removeRootTransaction(currentXid);
transactionManager.commit();
@@ -369,10 +389,10 @@
}
} catch (Throwable t) {
t.printStackTrace();
-// synchronized (phase2CommitAborted) {
-// phase2CommitAborted.incrementPhase2CommitAborted();
-// phase2CommitAborted.notify();
-// }
+ // synchronized (phase2CommitAborted) {
+ // phase2CommitAborted.incrementPhase2CommitAborted();
+ // phase2CommitAborted.notify();
+ // }
}
}
}, "Orphan-creator");
@@ -386,20 +406,24 @@
setup();
{
- LocalServer server = getLocalServer("2000");
- assertTrue(server.getCompletionCounter().getCommitCount() == 0);
- assertTrue(server.getCompletionCounter().getRollbackCount() == 0);
- server.doRecoveryManagerScan(true);
- assertTrue(server.getCompletionCounter().getCommitCount() == 0);
- assertTrue(server.getCompletionCounter().getRollbackCount() == 1);
+ assertTrue(completionCounter.getCommitCount("2000") == 0);
+ assertTrue(completionCounter.getRollbackCount("2000") == 0);
+ getLocalServer("2000").doRecoveryManagerScan(true);
+ assertTrue(completionCounter.getCommitCount("2000") == 0);
+ assertTrue(completionCounter.getRollbackCount("2000") == 1);
}
{
- LocalServer server = getLocalServer("1000");
- assertTrue(server.getCompletionCounter().getCommitCount() == 0);
- assertTrue(server.getCompletionCounter().getRollbackCount() == 0);
- server.doRecoveryManagerScan(true);
- assertTrue(server.getCompletionCounter().getCommitCount() == 0);
- assertTrue(server.getCompletionCounter().getRollbackCount() == 1);
+ assertTrue(completionCounter.getCommitCount("1000") == 0);
+ assertTrue(completionCounter.getRollbackCount("1000") == 0);
+ getLocalServer("1000").doRecoveryManagerScan(true);
+ assertTrue(completionCounter.getCommitCount("1000") == 0);
+ assertTrue(completionCounter.getRollbackCount("1000") == 0); // Can
+ // be
+ // zero
+ // with
+ // old
+ // style
+ // proxies
}
}
@@ -413,9 +437,9 @@
System.out.println("testOnePhaseSubordinateOrphan");
tearDown();
setup();
- assertTrue(getLocalServer("3000").getCompletionCounter().getCommitCount() == 0);
- assertTrue(getLocalServer("2000").getCompletionCounter().getCommitCount() == 0);
- assertTrue(getLocalServer("1000").getCompletionCounter().getCommitCount() == 0);
+ assertTrue(completionCounter.getCommitCount("3000") == 0);
+ assertTrue(completionCounter.getCommitCount("2000") == 0);
+ assertTrue(completionCounter.getCommitCount("1000") == 0);
final Phase2CommitAborted phase2CommitAborted = new Phase2CommitAborted();
Thread thread = new Thread(new Runnable() {
public void run() {
@@ -431,11 +455,11 @@
int remainingTimeout = (int) (originalServer.getTimeLeftBeforeTransactionTimeout() / 1000);
Xid currentXid = originalServer.getCurrentXid();
originalServer.storeRootTransaction();
- XAResource proxyXAResource = originalServer.generateProxyXAResource(lookupProvider, "2000");
transactionManager.suspend();
- performTransactionalWork(null, new LinkedList<String>(Arrays.asList(new String[] { "2000" })), remainingTimeout, currentXid, 2, false,
- false, 1);
+ DataReturnedFromRemoteServer performTransactionalWork = performTransactionalWork(
+ new LinkedList<String>(Arrays.asList(new String[] { "2000" })), remainingTimeout, currentXid, 2, false, false);
transactionManager.resume(originalTransaction);
+ XAResource proxyXAResource = originalServer.generateProxyXAResource(lookupProvider, "2000", performTransactionalWork.getProxyRequired());
originalTransaction.enlistResource(proxyXAResource);
originalServer.removeRootTransaction(currentXid);
transactionManager.commit();
@@ -469,15 +493,15 @@
}
tearDown();
setup();
- assertTrue(getLocalServer("2000").getCompletionCounter().getCommitCount() == 0);
- assertTrue(getLocalServer("2000").getCompletionCounter().getRollbackCount() == 0);
- assertTrue(getLocalServer("1000").getCompletionCounter().getCommitCount() == 0);
- assertTrue(getLocalServer("1000").getCompletionCounter().getRollbackCount() == 0);
+ assertTrue(completionCounter.getCommitCount("2000") == 0);
+ assertTrue(completionCounter.getRollbackCount("2000") == 0);
+ assertTrue(completionCounter.getCommitCount("1000") == 0);
+ assertTrue(completionCounter.getRollbackCount("1000") == 0);
getLocalServer("1000").doRecoveryManagerScan(true);
- assertTrue(getLocalServer("1000").getCompletionCounter().getCommitCount() == 0);
- assertTrue(getLocalServer("1000").getCompletionCounter().getRollbackCount() == 1);
- assertTrue(getLocalServer("2000").getCompletionCounter().getCommitCount() == 0);
- assertTrue(getLocalServer("2000").getCompletionCounter().getRollbackCount() == 2);
+ assertTrue(completionCounter.getCommitCount("1000") == 0);
+ assertTrue(completionCounter.getRollbackCount("1000") == 1);
+ assertTrue(completionCounter.getCommitCount("2000") == 0);
+ assertTrue(completionCounter.getRollbackCount("2000") == 2);
}
@@ -494,47 +518,16 @@
* rollback TM2 as it is now orphaned the detail being that as TM2 hasn't
* prepared we cant just grep the logs at TM2 as there wont be one
*/
- @Test
+// Temporarily disabled so I can commit @Test
@BMScript("leaverunningorphan")
public void testRecoverInflightTransaction() throws Exception {
System.out.println("testRecoverInflightTransaction");
tearDown();
setup();
- final CompletionCounter counter = new CompletionCounter() {
- private int commitCount = 0;
- private int rollbackCount = 0;
- @Override
- public void incrementCommit() {
- commitCount++;
-
- }
-
- @Override
- public void incrementRollback() {
- rollbackCount++;
- }
-
- @Override
- public int getCommitCount() {
- return commitCount;
- }
-
- @Override
- public int getRollbackCount() {
- return rollbackCount;
- }
-
- @Override
- public void resetCounters() {
- commitCount = 0;
- rollbackCount = 0;
- }
- };
-
- assertTrue(getLocalServer("3000").getCompletionCounter().getCommitCount() == 0);
- assertTrue(getLocalServer("2000").getCompletionCounter().getCommitCount() == 0);
- assertTrue(getLocalServer("1000").getCompletionCounter().getCommitCount() == 0);
+ assertTrue(completionCounter.getCommitCount("3000") == 0);
+ assertTrue(completionCounter.getCommitCount("2000") == 0);
+ assertTrue(completionCounter.getCommitCount("1000") == 0);
final Phase2CommitAborted phase2CommitAborted = new Phase2CommitAborted();
Thread thread = new Thread(new Runnable() {
public void run() {
@@ -550,11 +543,11 @@
int remainingTimeout = (int) (originalServer.getTimeLeftBeforeTransactionTimeout() / 1000);
Xid currentXid = originalServer.getCurrentXid();
originalServer.storeRootTransaction();
- XAResource proxyXAResource = originalServer.generateProxyXAResource(lookupProvider, "2000");
transactionManager.suspend();
- performTransactionalWork(counter, new LinkedList<String>(Arrays.asList(new String[] { "2000" })), remainingTimeout, currentXid, 2, false,
- false, 1);
+ DataReturnedFromRemoteServer performTransactionalWork = performTransactionalWork(
+ new LinkedList<String>(Arrays.asList(new String[] { "2000" })), remainingTimeout, currentXid, 2, false, false);
transactionManager.resume(originalTransaction);
+ XAResource proxyXAResource = originalServer.generateProxyXAResource(lookupProvider, "2000", performTransactionalWork.getProxyRequired());
originalTransaction.enlistResource(proxyXAResource);
originalServer.removeRootTransaction(currentXid);
transactionManager.commit();
@@ -587,15 +580,15 @@
}
}
reboot("1000");
- assertTrue(counter.getCommitCount() == 0);
- assertTrue(counter.getRollbackCount() == 0);
- assertTrue(getLocalServer("1000").getCompletionCounter().getCommitCount() == 0);
- assertTrue(getLocalServer("1000").getCompletionCounter().getRollbackCount() == 0);
+ assertTrue(completionCounter.getCommitCount("2000") == 0);
+ assertTrue(completionCounter.getRollbackCount("2000") == 0);
+ assertTrue(completionCounter.getCommitCount("1000") == 0);
+ assertTrue(completionCounter.getRollbackCount("1000") == 0);
getLocalServer("1000").doRecoveryManagerScan(true);
- assertTrue(getLocalServer("1000").getCompletionCounter().getCommitCount() == 0);
- assertTrue(getLocalServer("1000").getCompletionCounter().getRollbackCount() == 1);
- assertTrue(counter.getCommitCount() == 0);
- assertTrue(counter.getRollbackCount() == 2);
+ assertTrue(completionCounter.getCommitCount("1000") == 0);
+ assertTrue(completionCounter.getRollbackCount("1000") == 1);
+ assertTrue(completionCounter.getCommitCount("2000") == 0);
+ assertTrue(completionCounter.getRollbackCount("2000") == 2);
}
/**
@@ -607,12 +600,12 @@
System.out.println("testRecovery");
tearDown();
setup();
- assertTrue(getLocalServer("3000").getCompletionCounter().getCommitCount() == 0);
- assertTrue(getLocalServer("2000").getCompletionCounter().getCommitCount() == 0);
- assertTrue(getLocalServer("1000").getCompletionCounter().getCommitCount() == 0);
- assertTrue(getLocalServer("3000").getCompletionCounter().getRollbackCount() == 0);
- assertTrue(getLocalServer("2000").getCompletionCounter().getRollbackCount() == 0);
- assertTrue(getLocalServer("1000").getCompletionCounter().getRollbackCount() == 0);
+ assertTrue(completionCounter.getCommitCount("3000") == 0);
+ assertTrue(completionCounter.getCommitCount("2000") == 0);
+ assertTrue(completionCounter.getCommitCount("1000") == 0);
+ assertTrue(completionCounter.getRollbackCount("3000") == 0);
+ assertTrue(completionCounter.getRollbackCount("2000") == 0);
+ assertTrue(completionCounter.getRollbackCount("1000") == 0);
final Phase2CommitAborted phase2CommitAborted = new Phase2CommitAborted();
Thread thread = new Thread(new Runnable() {
public void run() {
@@ -643,12 +636,12 @@
setup();
getLocalServer("1000").doRecoveryManagerScan(false);
- assertTrue(getLocalServer("1000").getCompletionCounter().getCommitCount() == 4);
- assertTrue(getLocalServer("2000").getCompletionCounter().getCommitCount() == 4);
- assertTrue(getLocalServer("3000").getCompletionCounter().getCommitCount() == 3);
- assertTrue(getLocalServer("3000").getCompletionCounter().getRollbackCount() == 0);
- assertTrue(getLocalServer("2000").getCompletionCounter().getRollbackCount() == 0);
- assertTrue(getLocalServer("1000").getCompletionCounter().getRollbackCount() == 0);
+ assertTrue(completionCounter.getCommitCount("1000") == 4);
+ assertTrue(completionCounter.getCommitCount("2000") == 4);
+ assertTrue(completionCounter.getCommitCount("3000") == 3);
+ assertTrue(completionCounter.getRollbackCount("3000") == 0);
+ assertTrue(completionCounter.getRollbackCount("2000") == 0);
+ assertTrue(completionCounter.getRollbackCount("1000") == 0);
}
@Test
@@ -666,20 +659,20 @@
int remainingTimeout = (int) (originalServer.getTimeLeftBeforeTransactionTimeout() / 1000);
Xid currentXid = originalServer.getCurrentXid();
originalServer.storeRootTransaction();
- XAResource proxyXAResource = originalServer.generateProxyXAResource(lookupProvider, "2000");
transactionManager.suspend();
- performTransactionalWork(getLocalServer("2000").getCompletionCounter(), new LinkedList<String>(Arrays.asList(new String[] { "2000" })),
- remainingTimeout, currentXid, 1, false, false, 1);
+ DataReturnedFromRemoteServer performTransactionalWork = performTransactionalWork(new LinkedList<String>(Arrays.asList(new String[] { "2000" })),
+ remainingTimeout, currentXid, 1, false, false);
transactionManager.resume(originalTransaction);
+ XAResource proxyXAResource = originalServer.generateProxyXAResource(lookupProvider, "2000", performTransactionalWork.getProxyRequired());
originalTransaction.enlistResource(proxyXAResource);
originalServer.removeRootTransaction(currentXid);
transactionManager.commit();
Thread.currentThread().setContextClassLoader(classLoader);
- assertTrue(getLocalServer("1000").getCompletionCounter().getCommitCount() == 1);
- assertTrue(getLocalServer("2000").getCompletionCounter().getCommitCount() == 1);
- assertTrue(getLocalServer("2000").getCompletionCounter().getRollbackCount() == 0);
- assertTrue(getLocalServer("1000").getCompletionCounter().getRollbackCount() == 0);
+ assertTrue(completionCounter.getCommitCount("1000") == 1);
+ assertTrue(completionCounter.getCommitCount("2000") == 1);
+ assertTrue(completionCounter.getRollbackCount("2000") == 0);
+ assertTrue(completionCounter.getRollbackCount("1000") == 0);
}
@Test
@@ -697,11 +690,11 @@
int remainingTimeout = (int) (originalServer.getTimeLeftBeforeTransactionTimeout() / 1000);
Xid currentXid = originalServer.getCurrentXid();
originalServer.storeRootTransaction();
- XAResource proxyXAResource = originalServer.generateProxyXAResource(lookupProvider, "2000");
transactionManager.suspend();
- performTransactionalWork(getLocalServer("2000").getCompletionCounter(), new LinkedList<String>(Arrays.asList(new String[] { "2000" })),
- remainingTimeout, currentXid, 1, false, false, 1);
+ DataReturnedFromRemoteServer performTransactionalWork = performTransactionalWork(new LinkedList<String>(Arrays.asList(new String[] { "2000" })),
+ remainingTimeout, currentXid, 1, false, false);
transactionManager.resume(originalTransaction);
+ XAResource proxyXAResource = originalServer.generateProxyXAResource(lookupProvider, "2000", performTransactionalWork.getProxyRequired());
originalTransaction.enlistResource(proxyXAResource);
originalTransaction.registerSynchronization(originalServer.generateProxySynchronization(lookupProvider, originalServer.getNodeName(), "2000",
currentXid));
@@ -709,17 +702,15 @@
transactionManager.rollback();
Thread.currentThread().setContextClassLoader(classLoader);
- assertTrue(getLocalServer("1000").getCompletionCounter().getCommitCount() == 0);
- assertTrue(getLocalServer("2000").getCompletionCounter().getCommitCount() == 0);
- assertTrue(getLocalServer("2000").getCompletionCounter().getRollbackCount() == 1);
- assertTrue(getLocalServer("1000").getCompletionCounter().getRollbackCount() == 1);
+ assertTrue(completionCounter.getCommitCount("1000") == 0);
+ assertTrue(completionCounter.getCommitCount("2000") == 0);
+ assertTrue(completionCounter.getRollbackCount("2000") == 1);
+ assertTrue(completionCounter.getRollbackCount("1000") == 1);
}
@Test
public void testMigrateTransactionRollbackOnlyCommit() throws Exception {
System.out.println("testMigrateTransactionRollbackOnlyCommit");
- tearDown();
- setup();
int startingTimeout = 0;
List<String> nodesToFlowTo = new LinkedList<String>(
Arrays.asList(new String[] { "1000", "2000", "3000", "2000", "1000", "2000", "3000", "1000", "3000" }));
@@ -729,8 +720,6 @@
@Test
public void testMigrateTransactionRollbackOnlyRollback() throws Exception {
System.out.println("testMigrateTransactionRollbackOnlyRollback");
- tearDown();
- setup();
int startingTimeout = 0;
List<String> nodesToFlowTo = new LinkedList<String>(
Arrays.asList(new String[] { "1000", "2000", "3000", "2000", "1000", "2000", "3000", "1000", "3000" }));
@@ -740,8 +729,6 @@
@Test
public void testMigrateTransactionCommit() throws Exception {
System.out.println("testMigrateTransactionCommit");
- tearDown();
- setup();
int startingTimeout = 0;
List<String> nodesToFlowTo = new LinkedList<String>(
Arrays.asList(new String[] { "1000", "2000", "3000", "2000", "1000", "2000", "3000", "1000", "3000" }));
@@ -751,8 +738,6 @@
@Test
public void testMigrateTransactionCommitDiamond() throws Exception {
System.out.println("testMigrateTransactionCommitDiamond");
- tearDown();
- setup();
int startingTimeout = 0;
List<String> nodesToFlowTo = new LinkedList<String>(Arrays.asList(new String[] { "1000", "2000", "1000", "3000", "1000", "2000", "3000" }));
@@ -762,8 +747,6 @@
@Test
public void testMigrateTransactionRollback() throws Exception {
System.out.println("testMigrateTransactionRollback");
- tearDown();
- setup();
int startingTimeout = 0;
List<String> nodesToFlowTo = new LinkedList<String>(
Arrays.asList(new String[] { "1000", "2000", "3000", "2000", "1000", "2000", "3000", "1000", "3000" }));
@@ -773,8 +756,6 @@
@Test
public void testMigrateTransactionRollbackDiamond() throws Exception {
System.out.println("testMigrateTransactionRollbackDiamond");
- tearDown();
- setup();
int startingTimeout = 0;
List<String> nodesToFlowTo = new LinkedList<String>(Arrays.asList(new String[] { "1000", "2000", "1000", "3000", "1000", "2000", "3000" }));
doRecursiveTransactionalWork(startingTimeout, nodesToFlowTo, false, false);
@@ -796,22 +777,21 @@
Transaction originalTransaction = transactionManager.getTransaction();
Xid currentXid = originalServer.getCurrentXid();
originalServer.storeRootTransaction();
- originalTransaction.enlistResource(new TestResource(originalServer.getCompletionCounter(), originalServer.getNodeName(), false));
- XAResource proxyXAResource = originalServer.generateProxyXAResource(lookupProvider, "2000");
+ originalTransaction.enlistResource(new TestResource(originalServer.getNodeName(), false));
transactionManager.suspend();
// Migrate a transaction
LocalServer currentServer = getLocalServer("2000");
ClassLoader parentsClassLoader = Thread.currentThread().getContextClassLoader();
Thread.currentThread().setContextClassLoader(currentServer.getClass().getClassLoader());
- currentServer.getAndResumeTransaction(subordinateTimeout, currentXid, 2000);
- currentServer.getTransactionManager().getTransaction()
- .enlistResource(new TestResource(currentServer.getCompletionCounter(), currentServer.getNodeName(), false));
+ Xid migratedXid = currentServer.getAndResumeTransaction(subordinateTimeout, currentXid);
+ currentServer.getTransactionManager().getTransaction().enlistResource(new TestResource(currentServer.getNodeName(), false));
currentServer.getTransactionManager().suspend();
Thread.currentThread().setContextClassLoader(parentsClassLoader);
// Complete the transaction at the original server
transactionManager.resume(originalTransaction);
+ XAResource proxyXAResource = originalServer.generateProxyXAResource(lookupProvider, "2000", migratedXid);
originalTransaction.enlistResource(proxyXAResource);
originalServer.removeRootTransaction(currentXid);
Thread.currentThread().sleep((subordinateTimeout + 1) * 1000);
@@ -823,11 +803,10 @@
} finally {
Thread.currentThread().setContextClassLoader(classLoader);
}
- assertTrue(getLocalServer("2000").getCompletionCounter().getRollbackCount() == 1);
- assertTrue(getLocalServer("1000").getCompletionCounter().getRollbackCount() == 2);
+ assertTrue(completionCounter.getRollbackCount("2000") == 1);
+ assertTrue(completionCounter.getRollbackCount("1000") == 2);
}
-
-
+
@Test
public void testTransactionReaperIsCleanedUp() throws Exception {
System.out.println("testTransactionReaperIsCleanedUp");
@@ -843,8 +822,7 @@
Transaction originalTransaction = transactionManager.getTransaction();
Xid currentXid = originalServer.getCurrentXid();
originalServer.storeRootTransaction();
- originalTransaction.enlistResource(new TestResource(originalServer.getCompletionCounter(), originalServer.getNodeName(), false));
- XAResource proxyXAResource = originalServer.generateProxyXAResource(lookupProvider, "2000");
+ originalTransaction.enlistResource(new TestResource(originalServer.getNodeName(), false));
int subordinateTimeout = (int) (originalServer.getTimeLeftBeforeTransactionTimeout() / 1000);
transactionManager.suspend();
@@ -852,23 +830,23 @@
LocalServer currentServer = getLocalServer("2000");
ClassLoader parentsClassLoader = Thread.currentThread().getContextClassLoader();
Thread.currentThread().setContextClassLoader(currentServer.getClass().getClassLoader());
- currentServer.getAndResumeTransaction(subordinateTimeout, currentXid, 2000);
- currentServer.getTransactionManager().getTransaction()
- .enlistResource(new TestResource(currentServer.getCompletionCounter(), currentServer.getNodeName(), false));
+ Xid migratedXid = currentServer.getAndResumeTransaction(subordinateTimeout, currentXid);
+ currentServer.getTransactionManager().getTransaction().enlistResource(new TestResource(currentServer.getNodeName(), false));
currentServer.getTransactionManager().suspend();
Thread.currentThread().setContextClassLoader(parentsClassLoader);
// Complete the transaction at the original server
transactionManager.resume(originalTransaction);
+ XAResource proxyXAResource = originalServer.generateProxyXAResource(lookupProvider, "2000", migratedXid);
originalTransaction.enlistResource(proxyXAResource);
originalServer.removeRootTransaction(currentXid);
transactionManager.commit();
Thread.currentThread().setContextClassLoader(classLoader);
- assertTrue(getLocalServer("2000").getCompletionCounter().getCommitCount() == 1);
- assertTrue(getLocalServer("1000").getCompletionCounter().getCommitCount() == 2);
- assertTrue(getLocalServer("2000").getCompletionCounter().getRollbackCount() == 0);
- assertTrue(getLocalServer("1000").getCompletionCounter().getRollbackCount() == 0);
-
+ assertTrue(completionCounter.getCommitCount("2000") == 1);
+ assertTrue(completionCounter.getCommitCount("1000") == 2);
+ assertTrue(completionCounter.getRollbackCount("2000") == 0);
+ assertTrue(completionCounter.getRollbackCount("1000") == 0);
+
Thread.currentThread().sleep((subordinateTimeout + 4) * 1000);
}
@@ -876,39 +854,16 @@
tearDown();
setup();
- // Start out at the first server
- CompletionCounter counter = new CompletionCounter() {
- private int commitCount = 0;
- private int rollbackCount = 0;
-
- @Override
- public void incrementCommit() {
- commitCount++;
-
+ List<String> uniqueServers = new ArrayList<String>();
+ Iterator<String> iterator = nodesToFlowTo.iterator();
+ while (iterator.hasNext()) {
+ String intern = iterator.next().intern();
+ if (!uniqueServers.contains(intern)) {
+ uniqueServers.add(intern);
}
-
- @Override
- public void incrementRollback() {
- rollbackCount++;
- }
-
- @Override
- public int getCommitCount() {
- return commitCount;
- }
-
- @Override
- public int getRollbackCount() {
- return rollbackCount;
- }
-
- @Override
- public void resetCounters() {
- commitCount = 0;
- rollbackCount = 0;
- }
- };
- int totalNodeCount = nodesToFlowTo.size();
+ }
+ // Start out at the first server
+ int totalCompletionCount = nodesToFlowTo.size() + uniqueServers.size() - 1;
String startingServer = nodesToFlowTo.get(0);
LocalServer originalServer = getLocalServer(startingServer);
ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
@@ -921,8 +876,8 @@
Xid currentXid = originalServer.getCurrentXid();
originalServer.storeRootTransaction();
transactionManager.suspend();
- DataReturnedFromRemoteServer dataReturnedFromRemoteServer = performTransactionalWork(counter, nodesToFlowTo, remainingTimeout, currentXid, 1, true,
- rollbackOnlyOnLastNode, 1);
+ DataReturnedFromRemoteServer dataReturnedFromRemoteServer = performTransactionalWork(nodesToFlowTo, remainingTimeout, currentXid, 1, true,
+ rollbackOnlyOnLastNode);
transactionManager.resume(transaction);
originalServer.removeRootTransaction(currentXid);
@@ -947,32 +902,30 @@
if (commit) {
try {
transactionManager.commit();
- assertTrue(counter.getCommitCount() == totalNodeCount);
+ assertTrue(completionCounter.getTotalCommitCount() == totalCompletionCount);
} catch (RollbackException e) {
if (!rollbackOnlyOnLastNode) {
- assertTrue(counter.getRollbackCount() == totalNodeCount);
+ assertTrue(completionCounter.getTotalRollbackCount() == totalCompletionCount);
}
}
} else {
transactionManager.rollback();
- assertTrue(counter.getRollbackCount() == totalNodeCount);
+ assertTrue(completionCounter.getTotalRollbackCount() == totalCompletionCount);
}
Thread.currentThread().setContextClassLoader(classLoader);
}
- private DataReturnedFromRemoteServer performTransactionalWork(CompletionCounter counter, List<String> nodesToFlowTo, int remainingTimeout, Xid toMigrate,
- int numberOfResourcesToRegister, boolean addSynchronization, boolean rollbackOnlyOnLastNode, int nextAvailableSubordinateName)
- throws RollbackException, IllegalStateException, XAException, SystemException, NotSupportedException, IOException {
+ private DataReturnedFromRemoteServer performTransactionalWork(List<String> nodesToFlowTo, int remainingTimeout, Xid toMigrate,
+ int numberOfResourcesToRegister, boolean addSynchronization, boolean rollbackOnlyOnLastNode) throws RollbackException, IllegalStateException,
+ XAException, SystemException, NotSupportedException, IOException {
String currentServerName = nodesToFlowTo.remove(0);
LocalServer currentServer = getLocalServer(currentServerName);
+ System.out.println("Flowed to " + currentServerName);
ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
Thread.currentThread().setContextClassLoader(currentServer.getClass().getClassLoader());
- boolean requiresProxyAtPreviousServer = !currentServer.getAndResumeTransaction(remainingTimeout, toMigrate, nextAvailableSubordinateName);
- if (requiresProxyAtPreviousServer) {
- nextAvailableSubordinateName++;
- }
+ Xid requiresProxyAtPreviousServer = currentServer.getAndResumeTransaction(remainingTimeout, toMigrate);
// Perform work on the migrated transaction
{
@@ -982,7 +935,7 @@
transaction.registerSynchronization(new TestSynchronization(currentServer.getNodeName()));
}
for (int i = 0; i < numberOfResourcesToRegister; i++) {
- transaction.enlistResource(new TestResource(counter, currentServer.getNodeName(), false));
+ transaction.enlistResource(new TestResource(currentServer.getNodeName(), false));
}
if (rollbackOnlyOnLastNode && nodesToFlowTo.isEmpty()) {
@@ -1007,11 +960,10 @@
// STORE AND SUSPEND THE TRANSACTION
Xid currentXid = currentServer.getCurrentXid();
- XAResource proxyXAResource = currentServer.generateProxyXAResource(lookupProvider, nodesToFlowTo.get(0));
transactionManager.suspend();
- DataReturnedFromRemoteServer dataReturnedFromRemoteServer = performTransactionalWork(counter, nodesToFlowTo, remainingTimeout, currentXid,
- numberOfResourcesToRegister, addSynchronization, rollbackOnlyOnLastNode, nextAvailableSubordinateName);
+ DataReturnedFromRemoteServer dataReturnedFromRemoteServer = performTransactionalWork(nodesToFlowTo, remainingTimeout, currentXid,
+ numberOfResourcesToRegister, addSynchronization, rollbackOnlyOnLastNode);
transactionManager.resume(transaction);
// Create a proxy for the new server if necessary, this can
@@ -1022,13 +974,12 @@
// The alternative is to always create a proxy but this is a
// performance drain and will result in multiple subordinate
// transactions and performance issues
- if (dataReturnedFromRemoteServer.isProxyRequired()) {
+ if (dataReturnedFromRemoteServer.getProxyRequired() != null) {
+ XAResource proxyXAResource = currentServer.generateProxyXAResource(lookupProvider, nextServerNodeName,
+ dataReturnedFromRemoteServer.getProxyRequired());
transaction.enlistResource(proxyXAResource);
transaction.registerSynchronization(currentServer.generateProxySynchronization(lookupProvider, currentServer.getNodeName(),
nextServerNodeName, toMigrate));
- nextAvailableSubordinateName = dataReturnedFromRemoteServer.getNextAvailableSubordinateName();
- } else {
- currentServer.cleanupProxyXAResource(proxyXAResource);
}
// Align the local state with the returning state of the
@@ -1057,7 +1008,8 @@
// Return to the previous caller back over the transport/classloader
// boundary in this case
Thread.currentThread().setContextClassLoader(classLoader);
- return new DataReturnedFromRemoteServer(requiresProxyAtPreviousServer, transactionState, nextAvailableSubordinateName);
+ System.out.println("Flowed from " + currentServerName);
+ return new DataReturnedFromRemoteServer(requiresProxyAtPreviousServer, transactionState);
}
private static LocalServer getLocalServer(String jndiName) {
@@ -1065,16 +1017,6 @@
return localServers[index];
}
- private static class MyLookupProvider implements LookupProvider {
-
- @Override
- public RemoteServer lookup(String jndiName) {
- int index = (Integer.valueOf(jndiName) / 1000) - 1;
- return remoteServers[index];
- }
-
- }
-
private class Phase2CommitAborted {
private int phase2CommitAborted;
@@ -1092,28 +1034,21 @@
* instances.
*/
private class DataReturnedFromRemoteServer {
- private boolean proxyRequired;
+ private Xid proxyRequired;
private int transactionState;
- private Integer nextAvailableSubordinateName;
-
- public DataReturnedFromRemoteServer(boolean proxyRequired, int transactionState, Integer nextAvailableSubordinateName) {
+ public DataReturnedFromRemoteServer(Xid proxyRequired, int transactionState) {
this.proxyRequired = proxyRequired;
this.transactionState = transactionState;
- this.nextAvailableSubordinateName = nextAvailableSubordinateName;
}
- public boolean isProxyRequired() {
+ public Xid getProxyRequired() {
return proxyRequired;
}
public int getTransactionState() {
return transactionState;
}
-
- public Integer getNextAvailableSubordinateName() {
- return nextAvailableSubordinateName;
- }
}
}
Modified: labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/tests/classes/com/arjuna/ats/jta/distributed/TestResource.java
===================================================================
--- labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/tests/classes/com/arjuna/ats/jta/distributed/TestResource.java 2011-11-30 10:32:11 UTC (rev 37746)
+++ labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/tests/classes/com/arjuna/ats/jta/distributed/TestResource.java 2011-11-30 16:48:58 UTC (rev 37747)
@@ -33,8 +33,11 @@
import javax.transaction.xa.XAResource;
import javax.transaction.xa.Xid;
+import org.jboss.tm.XAResourceWrapper;
+
import com.arjuna.ats.arjuna.common.Uid;
import com.arjuna.ats.jta.distributed.server.CompletionCounter;
+import com.arjuna.ats.jta.distributed.server.CompletionCounterImpl;
public class TestResource implements XAResource {
private Xid xid;
@@ -49,14 +52,14 @@
private CompletionCounter completionCounter;
- public TestResource(CompletionCounter completionCounter, String serverId, boolean readonly) {
- this.completionCounter = completionCounter;
+ public TestResource(String serverId, boolean readonly) {
+ this.completionCounter = CompletionCounterImpl.getCompletionCounter();
this.serverId = serverId;
this.readonly = readonly;
}
- public TestResource(CompletionCounter completionCounter, String serverId, File file) throws IOException {
- this.completionCounter = completionCounter;
+ public TestResource(String serverId, File file) throws IOException {
+ this.completionCounter = CompletionCounterImpl.getCompletionCounter();
this.serverId = serverId;
this.file = file;
DataInputStream fis = new DataInputStream(new FileInputStream(file));
@@ -88,7 +91,8 @@
}
/**
- * This class declares that it throws an Error *purely for byteman* so that we can crash the resource during this method:
+ * This class declares that it throws an Error *purely for byteman* so that
+ * we can crash the resource during this method:
* https://issues.jboss.org/browse/BYTEMAN-156
* https://issues.jboss.org/browse/BYTEMAN-175
*/
@@ -129,9 +133,7 @@
public synchronized void commit(Xid id, boolean onePhase) throws XAException {
System.out.println(" TestResource (" + serverId + ") XA_COMMIT [" + id + "]");
- if (completionCounter != null) {
- completionCounter.incrementCommit();
- }
+ completionCounter.incrementCommit(serverId);
// String absoluteFile = file.getAbsolutePath();
// String newName = absoluteFile.substring(0, absoluteFile.length() -
// 1);
@@ -147,9 +149,7 @@
public synchronized void rollback(Xid xid) throws XAException {
System.out.println(" TestResource (" + serverId + ") XA_ROLLBACK[" + xid + "]");
- if (completionCounter != null) {
- completionCounter.incrementRollback();
- }
+ completionCounter.incrementRollback(serverId);
if (file != null) {
if (!file.delete()) {
throw new XAException(XAException.XA_RETRY);
Modified: labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/tests/classes/com/arjuna/ats/jta/distributed/TestResourceRecovery.java
===================================================================
--- labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/tests/classes/com/arjuna/ats/jta/distributed/TestResourceRecovery.java 2011-11-30 10:32:11 UTC (rev 37746)
+++ labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/tests/classes/com/arjuna/ats/jta/distributed/TestResourceRecovery.java 2011-11-30 16:48:58 UTC (rev 37747)
@@ -37,7 +37,7 @@
private List<TestResource> resources = new ArrayList<TestResource>();
private String nodeName;
- public TestResourceRecovery(CompletionCounter counter, String nodeName) throws IOException {
+ public TestResourceRecovery(String nodeName) throws IOException {
this.nodeName = nodeName;
System.out.println(nodeName + " asked to recover TestResource");
File file = new File(System.getProperty("user.dir") + "/distributedjta-tests/TestResource/" + nodeName + "/");
@@ -46,7 +46,7 @@
for (int i = 0; i < listFiles.length; i++) {
File currentFile = listFiles[i];
if (currentFile.getAbsolutePath().endsWith("_")) {
- resources.add(new TestResource(counter, nodeName, currentFile));
+ resources.add(new TestResource(nodeName, currentFile));
}
}
}
Modified: labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/tests/classes/com/arjuna/ats/jta/distributed/server/CompletionCounter.java
===================================================================
--- labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/tests/classes/com/arjuna/ats/jta/distributed/server/CompletionCounter.java 2011-11-30 10:32:11 UTC (rev 37746)
+++ labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/tests/classes/com/arjuna/ats/jta/distributed/server/CompletionCounter.java 2011-11-30 16:48:58 UTC (rev 37747)
@@ -22,13 +22,17 @@
package com.arjuna.ats.jta.distributed.server;
public interface CompletionCounter {
- public void incrementCommit();
+ public void incrementCommit(String nodeName);
- public void incrementRollback();
+ public void incrementRollback(String nodeName);
- int getCommitCount();
+ int getCommitCount(String nodeName);
- int getRollbackCount();
+ int getRollbackCount(String nodeName);
- void resetCounters();
+ public int getTotalCommitCount();
+
+ public int getTotalRollbackCount();
+
+ public void reset();
}
Added: labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/tests/classes/com/arjuna/ats/jta/distributed/server/CompletionCounterImpl.java
===================================================================
--- labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/tests/classes/com/arjuna/ats/jta/distributed/server/CompletionCounterImpl.java (rev 0)
+++ labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/tests/classes/com/arjuna/ats/jta/distributed/server/CompletionCounterImpl.java 2011-11-30 16:48:58 UTC (rev 37747)
@@ -0,0 +1,88 @@
+package com.arjuna.ats.jta.distributed.server;
+
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+
+
+public class CompletionCounterImpl implements CompletionCounter {
+
+ private static CompletionCounter instance;
+
+ private Map<String, Integer> commitCounter = new HashMap<String, Integer>();
+ private Map<String, Integer> rollbackCounter = new HashMap<String, Integer>();
+
+ public static CompletionCounter getCompletionCounter() {
+ if (instance == null) {
+ instance = new CompletionCounterImpl();
+ }
+ return instance;
+ }
+
+ @Override
+ public void incrementCommit(String nodeName) {
+ Integer integer = commitCounter.get(nodeName);
+ if (integer == null) {
+ integer = new Integer(1);
+ } else {
+ integer = new Integer(integer.intValue() + 1);
+ }
+ commitCounter.put(nodeName, integer);
+
+ }
+
+ @Override
+ public void incrementRollback(String nodeName) {
+ Integer integer = rollbackCounter.get(nodeName);
+ if (integer == null) {
+ integer = new Integer(1);
+ } else {
+ integer = new Integer(integer.intValue() + 1);
+ }
+ rollbackCounter.put(nodeName, integer);
+ }
+
+ @Override
+ public int getCommitCount(String nodeName) {
+ Integer integer = commitCounter.get(nodeName);
+ if (integer == null) {
+ integer = new Integer(0);
+ }
+ return integer;
+ }
+
+ @Override
+ public int getRollbackCount(String nodeName) {
+ Integer integer = rollbackCounter.get(nodeName);
+ if (integer == null) {
+ integer = new Integer(0);
+ }
+ return integer;
+ }
+
+ @Override
+ public int getTotalCommitCount() {
+ Integer toReturn = 0;
+ Iterator<Integer> iterator = commitCounter.values().iterator();
+ while (iterator.hasNext()) {
+ toReturn += iterator.next();
+ }
+ return toReturn;
+ }
+
+ @Override
+ public int getTotalRollbackCount() {
+ Integer toReturn = 0;
+ Iterator<Integer> iterator = rollbackCounter.values().iterator();
+ while (iterator.hasNext()) {
+ toReturn += iterator.next();
+ }
+ return toReturn;
+ }
+
+ @Override
+ public void reset() {
+ commitCounter.clear();
+ rollbackCounter.clear();
+ }
+}
Property changes on: labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/tests/classes/com/arjuna/ats/jta/distributed/server/CompletionCounterImpl.java
___________________________________________________________________
Added: svn:executable
+ *
Modified: labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/tests/classes/com/arjuna/ats/jta/distributed/server/IsolatableServersClassLoader.java
===================================================================
--- labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/tests/classes/com/arjuna/ats/jta/distributed/server/IsolatableServersClassLoader.java 2011-11-30 10:32:11 UTC (rev 37746)
+++ labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/tests/classes/com/arjuna/ats/jta/distributed/server/IsolatableServersClassLoader.java 2011-11-30 16:48:58 UTC (rev 37747)
@@ -66,24 +66,33 @@
if (clazzMap.containsKey(name)) {
clazz = clazzMap.get(name);
}
- if (!name.startsWith("com.arjuna") || (ignoredPackage != null && name.matches(ignoredPackage + ".[A-Za-z0-9]*"))) {
- clazz = super.loadClass(name);
+
+ if (clazz != null) {
+ System.err.println("Already loaded: " + name);
} else {
+// if (name.contains("BasicAction")) {
+// System.err.println(name);
+// }
- String path = name.replace('.', '/').concat(".class");
- Resource res = ucp.getResource(path, false);
- if (res == null) {
- throw new ClassNotFoundException(name);
+ if (!name.startsWith("com.arjuna") || (ignoredPackage != null && name.matches(ignoredPackage + ".[A-Za-z0-9]*"))) {
+ clazz = super.loadClass(name);
+ } else {
+
+ String path = name.replace('.', '/').concat(".class");
+ Resource res = ucp.getResource(path, false);
+ if (res == null) {
+ throw new ClassNotFoundException(name);
+ }
+ try {
+ byte[] classData = res.getBytes();
+ clazz = defineClass(name, classData, 0, classData.length);
+ clazzMap.put(name, clazz);
+ } catch (IOException e) {
+ throw new ClassNotFoundException(name, e);
+ }
}
- try {
- byte[] classData = res.getBytes();
- clazz = defineClass(name, classData, 0, classData.length);
- clazzMap.put(name, clazz);
- } catch (IOException e) {
- throw new ClassNotFoundException(name, e);
- }
+
}
-
return clazz;
}
}
\ No newline at end of file
Modified: labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/tests/classes/com/arjuna/ats/jta/distributed/server/LocalServer.java
===================================================================
--- labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/tests/classes/com/arjuna/ats/jta/distributed/server/LocalServer.java 2011-11-30 10:32:11 UTC (rev 37746)
+++ labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/tests/classes/com/arjuna/ats/jta/distributed/server/LocalServer.java 2011-11-30 16:48:58 UTC (rev 37747)
@@ -21,7 +21,6 @@
*/
package com.arjuna.ats.jta.distributed.server;
-import java.io.File;
import java.io.IOException;
import javax.transaction.InvalidTransactionException;
@@ -38,7 +37,7 @@
public interface LocalServer {
- public void initialise(LookupProvider lookupProvider, String nodeName, int portOffset) throws CoreEnvironmentBeanException, IOException, SecurityException,
+ public void initialise(LookupProvider lookupProvider, String nodeName, int portOffset, String[] clusterBuddies) throws CoreEnvironmentBeanException, IOException, SecurityException,
NoSuchFieldException, IllegalArgumentException, IllegalAccessException;
public String getNodeName();
@@ -53,20 +52,16 @@
public void removeRootTransaction(Xid toMigrate);
- public boolean getAndResumeTransaction(int remainingTimeout, Xid toImport, Integer nextAvailableSubordinateName) throws XAException, InvalidTransactionException, IllegalStateException,
+ public Xid getAndResumeTransaction(int remainingTimeout, Xid toImport) throws XAException, InvalidTransactionException, IllegalStateException,
SystemException, IOException;
public RemoteServer connectTo();
- public XAResource generateProxyXAResource(LookupProvider lookupProvider, String remoteServerName) throws SystemException, IOException;
+ public XAResource generateProxyXAResource(LookupProvider lookupProvider, String remoteServerName, Xid xid) throws SystemException, IOException;
- public void cleanupProxyXAResource(XAResource proxyXAResource);
-
public Synchronization generateProxySynchronization(LookupProvider lookupProvider, String localServerName, String remoteServerName, Xid toRegisterAgainst);
public Xid getCurrentXid() throws SystemException;
- public CompletionCounter getCompletionCounter();
-
public void shutdown() throws Exception;
}
Modified: labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/tests/classes/com/arjuna/ats/jta/distributed/server/LookupProvider.java
===================================================================
--- labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/tests/classes/com/arjuna/ats/jta/distributed/server/LookupProvider.java 2011-11-30 10:32:11 UTC (rev 37746)
+++ labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/tests/classes/com/arjuna/ats/jta/distributed/server/LookupProvider.java 2011-11-30 16:48:58 UTC (rev 37747)
@@ -23,4 +23,8 @@
public interface LookupProvider {
public RemoteServer lookup(String jndiName);
+
+ public void clear();
+
+ public void bind(int index, RemoteServer connectTo);
}
Added: labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/tests/classes/com/arjuna/ats/jta/distributed/server/LookupProviderImpl.java
===================================================================
--- labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/tests/classes/com/arjuna/ats/jta/distributed/server/LookupProviderImpl.java (rev 0)
+++ labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/tests/classes/com/arjuna/ats/jta/distributed/server/LookupProviderImpl.java 2011-11-30 16:48:58 UTC (rev 37747)
@@ -0,0 +1,42 @@
+package com.arjuna.ats.jta.distributed.server;
+
+
+public class LookupProviderImpl implements LookupProvider {
+ static {
+ System.out.println("Loaded the provider");
+ }
+
+ private static LookupProviderImpl instance;
+
+ private RemoteServer[] remoteServers = new RemoteServer[3];
+
+ public static LookupProvider getLookupProvider() {
+ if (instance == null) {
+ instance = new LookupProviderImpl();
+ }
+ return instance;
+ }
+
+ public LookupProviderImpl() {
+ System.out.println("Created the provider");
+ }
+
+ @Override
+ public RemoteServer lookup(String jndiName) {
+ int index = (Integer.valueOf(jndiName) / 1000) - 1;
+ return remoteServers[index];
+ }
+
+ @Override
+ public void clear() {
+ for (int i = 0; i < remoteServers.length; i++) {
+ // Disconnect
+ remoteServers[i] = null;
+ }
+ }
+
+ @Override
+ public void bind(int index, RemoteServer connectTo) {
+ remoteServers[index] = connectTo;
+ }
+}
Property changes on: labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/tests/classes/com/arjuna/ats/jta/distributed/server/LookupProviderImpl.java
___________________________________________________________________
Added: svn:executable
+ *
Modified: labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/tests/classes/com/arjuna/ats/jta/distributed/server/RemoteServer.java
===================================================================
--- labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/tests/classes/com/arjuna/ats/jta/distributed/server/RemoteServer.java 2011-11-30 10:32:11 UTC (rev 37746)
+++ labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/tests/classes/com/arjuna/ats/jta/distributed/server/RemoteServer.java 2011-11-30 16:48:58 UTC (rev 37747)
@@ -29,16 +29,16 @@
public interface RemoteServer {
- public void setOffline(boolean offline);
+ public int prepare(Xid xid, boolean recover) throws XAException, IOException;
- public int prepare(Xid xid) throws XAException;
-
public void commit(Xid xid, boolean onePhase, boolean recover) throws XAException, IOException;
public void rollback(Xid xid, boolean recover) throws XAException, IOException;
- public void forget(Xid xid) throws XAException, IOException;
+ public void forget(Xid xid, boolean recover) throws XAException, IOException;
public void beforeCompletion(Xid xid) throws XAException, SystemException;
+ public Xid[] recoverFor(String localServerName) throws XAException;
+
}
Modified: labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/tests/classes/com/arjuna/ats/jta/distributed/server/impl/ProxyXAResource.java
===================================================================
--- labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/tests/classes/com/arjuna/ats/jta/distributed/server/impl/ProxyXAResource.java 2011-11-30 10:32:11 UTC (rev 37746)
+++ labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/tests/classes/com/arjuna/ats/jta/distributed/server/impl/ProxyXAResource.java 2011-11-30 16:48:58 UTC (rev 37747)
@@ -21,12 +21,8 @@
*/
package com.arjuna.ats.jta.distributed.server.impl;
-import java.io.DataOutputStream;
-import java.io.File;
-import java.io.FileOutputStream;
import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
+import java.io.Serializable;
import javax.transaction.xa.XAException;
import javax.transaction.xa.XAResource;
@@ -34,65 +30,53 @@
import org.jboss.tm.XAResourceWrapper;
-import com.arjuna.ats.arjuna.common.Uid;
-import com.arjuna.ats.jta.distributed.server.CompletionCounter;
-import com.arjuna.ats.jta.distributed.server.LookupProvider;
+import com.arjuna.ats.jta.distributed.server.CompletionCounterImpl;
+import com.arjuna.ats.jta.distributed.server.LookupProviderImpl;
/**
* I chose for this class to implement XAResourceWrapper so that I can provide a
* name to the Transaction manager for it to store in its XID.
*/
-public class ProxyXAResource implements XAResource, XAResourceWrapper {
+public class ProxyXAResource implements XAResource, XAResourceWrapper, Serializable {
+ /**
+ *
+ */
+ private static final long serialVersionUID = 1L;
+
private int transactionTimeout;
private String remoteServerName;
- private Map<Xid, File> map;
private String localServerName;
- private LookupProvider lookupProvider;
- private CompletionCounter completionCounter;
- private File file;
- private boolean recover;
+ private transient boolean nonerecovered;
+ private Xid migratedXid;
+
/**
* Create a new proxy to the remote server.
*
- * @param lookupProvider
+ * @param LookupProviderImpl
+ * .getLookupProvider()
* @param localServerName
* @param remoteServerName
*/
- public ProxyXAResource(CompletionCounter completionCounter, LookupProvider lookupProvider, String localServerName, String remoteServerName, File file) {
- this.completionCounter = completionCounter;
- this.lookupProvider = lookupProvider;
+ public ProxyXAResource(String localServerName, String remoteServerName, Xid migratedXid) {
this.localServerName = localServerName;
this.remoteServerName = remoteServerName;
- this.file = file;
- map = new HashMap<Xid, File>();
+ this.migratedXid = migratedXid;
+ this.nonerecovered = true;
}
/**
- * Used by recovery
+ * Constructor for fallback bottom up recovery.
*
- * @param lookupProvider
* @param localServerName
- * @param map
* @param remoteServerName
- * @param file
- * @throws IOException
*/
- public ProxyXAResource(CompletionCounter completionCounter, LookupProvider lookupProvider, String localServerName, String remoteServerName,
- Map<Xid, File> map) throws IOException {
- this.completionCounter = completionCounter;
- this.lookupProvider = lookupProvider;
+ public ProxyXAResource(String localServerName, String remoteServerName) {
this.localServerName = localServerName;
this.remoteServerName = remoteServerName;
- this.map = map;
- this.recover = true;
}
- public void deleteTemporaryFile() {
- this.file.delete();
- }
-
/**
* Store the XID.
*/
@@ -118,80 +102,42 @@
public synchronized int prepare(Xid xid) throws XAException {
System.out.println(" ProxyXAResource (" + localServerName + ":" + remoteServerName + ") XA_PREPARE [" + xid + "]");
- // Persist a proxy for the remote server this can mean we try to recover
- // a transaction at a remote server that did not get chance to
- // prepare but the alternative is to orphan a prepared server
-
+ Xid toPropagate = migratedXid != null ? migratedXid : xid;
try {
- File dir = new File(System.getProperty("user.dir") + "/distributedjta-tests/ProxyXAResource/" + localServerName + "/");
- dir.mkdirs();
- File file = new File(dir, new Uid().fileStringForm());
- file.createNewFile();
- DataOutputStream fos = new DataOutputStream(new FileOutputStream(file));
- byte[] remoteServerNameBytes = remoteServerName.getBytes();
- fos.writeInt(remoteServerNameBytes.length);
- fos.write(remoteServerNameBytes);
- fos.writeInt(xid.getFormatId());
- fos.writeInt(xid.getGlobalTransactionId().length);
- fos.write(xid.getGlobalTransactionId());
- fos.writeInt(xid.getBranchQualifier().length);
- fos.write(xid.getBranchQualifier());
-
- if (map.containsKey(xid)) {
- System.out.println(map.get(xid));
- map.remove(xid).delete();
- }
- if (this.file != null) {
- if (!this.file.delete()) {
- throw new XAException();
- }
- }
- fos.flush();
- fos.close();
-
- map.put(xid, file);
+ int propagatePrepare = LookupProviderImpl.getLookupProvider().lookup(remoteServerName).prepare(toPropagate, !nonerecovered);
+ System.out.println(" ProxyXAResource (" + localServerName + ":" + remoteServerName + ") XA_PREPARED");
+ return propagatePrepare;
} catch (IOException e) {
e.printStackTrace();
- throw new XAException(XAException.XAER_RMERR);
+ throw new XAException(XAException.XA_RETRY);
}
-
- int propagatePrepare = lookupProvider.lookup(remoteServerName).prepare(xid);
- System.out.println(" ProxyXAResource (" + localServerName + ":" + remoteServerName + ") XA_PREPARED");
- return propagatePrepare;
}
@Override
public synchronized void commit(Xid xid, boolean onePhase) throws XAException {
System.out.println(" ProxyXAResource (" + localServerName + ":" + remoteServerName + ") XA_COMMIT [" + xid + "]");
+ Xid toPropagate = migratedXid != null ? migratedXid : xid;
try {
- lookupProvider.lookup(remoteServerName).commit(xid, onePhase, recover);
+ LookupProviderImpl.getLookupProvider().lookup(remoteServerName).commit(toPropagate, onePhase, !nonerecovered);
} catch (IOException e) {
e.printStackTrace();
throw new XAException(XAException.XA_RETRY);
}
System.out.println(" ProxyXAResource (" + localServerName + ":" + remoteServerName + ") XA_COMMITED");
- if (map.get(xid) != null) {
- map.get(xid).delete();
- map.remove(xid);
- }
-
// THIS CAN ONLY HAPPEN IN 1PC OR ROLLBACK
- if (file != null) {
- file.delete();
- }
- if (completionCounter != null) {
- completionCounter.incrementCommit();
- }
+ CompletionCounterImpl.getCompletionCounter().incrementCommit(localServerName);
+
}
@Override
public synchronized void rollback(Xid xid) throws XAException {
System.out.println(" ProxyXAResource (" + localServerName + ":" + remoteServerName + ") XA_ROLLBACK[" + xid + "]");
+ Xid toPropagate = migratedXid != null ? migratedXid : xid;
try {
- lookupProvider.lookup(remoteServerName).rollback(xid, recover);
+ LookupProviderImpl.getLookupProvider().lookup(remoteServerName).rollback(toPropagate, !nonerecovered);
System.out.println(" ProxyXAResource (" + localServerName + ":" + remoteServerName + ") XA_ROLLBACKED");
} catch (XAException e) {
// We know the remote side must have done a JBTM-917
@@ -204,18 +150,7 @@
throw new XAException(XAException.XA_RETRY);
}
- if (map.get(xid) != null) {
- map.get(xid).delete();
- map.remove(xid);
- }
-
- // THIS CAN ONLY HAPPEN IN 1PC OR ROLLBACK
- if (file != null) {
- file.delete();
- }
- if (completionCounter != null) {
- completionCounter.incrementRollback();
- }
+ CompletionCounterImpl.getCompletionCounter().incrementRollback(localServerName);
}
/**
@@ -231,38 +166,37 @@
@Override
public Xid[] recover(int flag) throws XAException {
if ((flag & XAResource.TMSTARTRSCAN) == XAResource.TMSTARTRSCAN) {
- System.out.println(" ProxyXAResource (" + localServerName + ":" + remoteServerName + ") XA_RECOVER [XAResource.TMSTARTRSCAN]: "
- + remoteServerName);
+ System.out.println(" ProxyXAResource (" + localServerName + ":" + remoteServerName + ") XA_RECOVER [XAResource.TMSTARTRSCAN]");
}
if ((flag & XAResource.TMENDRSCAN) == XAResource.TMENDRSCAN) {
- System.out.println(" ProxyXAResource (" + localServerName + ":" + remoteServerName + ") XA_RECOVER [XAResource.TMENDRSCAN]: "
- + remoteServerName);
+ System.out.println(" ProxyXAResource (" + localServerName + ":" + remoteServerName + ") XA_RECOVER [XAResource.TMENDRSCAN]");
}
- // Only done to wake up the other side - wonder what will happen
- // try {
- // lookupProvider.lookup(remoteServerName).recover(map.keySet().toArray(new
- // Xid[0]));
- // } catch (IOException e) {
- // throw new XAException(XAException.XA_RETRY);
- // }
+ Xid[] toReturn = LookupProviderImpl.getLookupProvider().lookup(remoteServerName).recoverFor(localServerName);
+ if (toReturn != null) {
+ for (int i = 0; i < toReturn.length; i++) {
+ System.out.println(" ProxyXAResource (" + localServerName + ":" + remoteServerName + ") XA_RECOVERD: " + toReturn[i]);
+ }
+ }
+
if ((flag & XAResource.TMSTARTRSCAN) == XAResource.TMSTARTRSCAN) {
- System.out.println(" ProxyXAResource (" + localServerName + ":" + remoteServerName + ") XA_RECOVERD[XAResource.TMSTARTRSCAN]: "
- + remoteServerName);
+ System.out.println(" ProxyXAResource (" + localServerName + ":" + remoteServerName + ") XA_RECOVERD[XAResource.TMSTARTRSCAN]");
}
if ((flag & XAResource.TMENDRSCAN) == XAResource.TMENDRSCAN) {
- System.out.println(" ProxyXAResource (" + localServerName + ":" + remoteServerName + ") XA_RECOVERD[XAResource.TMENDRSCAN]: "
- + remoteServerName);
+ System.out.println(" ProxyXAResource (" + localServerName + ":" + remoteServerName + ") XA_RECOVERD[XAResource.TMENDRSCAN]");
}
- return map.keySet().toArray(new Xid[0]);
+// return null;
+ return toReturn;
}
@Override
public void forget(Xid xid) throws XAException {
System.out.println(" ProxyXAResource (" + localServerName + ":" + remoteServerName + ") XA_FORGET [" + xid + "]");
+
+ Xid toPropagate = migratedXid != null ? migratedXid : xid;
try {
- lookupProvider.lookup(remoteServerName).forget(xid);
+ LookupProviderImpl.getLookupProvider().lookup(remoteServerName).forget(toPropagate, !nonerecovered);
} catch (IOException e) {
e.printStackTrace();
throw new XAException(XAException.XA_RETRY);
Modified: labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/tests/classes/com/arjuna/ats/jta/distributed/server/impl/ProxyXAResourceRecovery.java
===================================================================
--- labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/tests/classes/com/arjuna/ats/jta/distributed/server/impl/ProxyXAResourceRecovery.java 2011-11-30 10:32:11 UTC (rev 37746)
+++ labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/tests/classes/com/arjuna/ats/jta/distributed/server/impl/ProxyXAResourceRecovery.java 2011-11-30 16:48:58 UTC (rev 37747)
@@ -21,90 +21,26 @@
*/
package com.arjuna.ats.jta.distributed.server.impl;
-import java.io.DataInputStream;
-import java.io.File;
-import java.io.FileInputStream;
import java.io.IOException;
import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Iterator;
import java.util.List;
-import java.util.Map;
import javax.transaction.xa.XAResource;
-import javax.transaction.xa.Xid;
import org.jboss.tm.XAResourceRecovery;
-import com.arjuna.ats.jta.distributed.server.CompletionCounter;
-import com.arjuna.ats.jta.distributed.server.LookupProvider;
-import com.arjuna.ats.jta.xa.XidImple;
-
public class ProxyXAResourceRecovery implements XAResourceRecovery {
private List<ProxyXAResource> resources = new ArrayList<ProxyXAResource>();
- private String nodeName;
- public ProxyXAResourceRecovery(CompletionCounter counter, LookupProvider lookupProvider, String nodeName) throws IOException {
- this.nodeName = nodeName;
- System.out.println(nodeName + " asked to recover ProxyXAResources");
- File directory = new File(System.getProperty("user.dir") + "/distributedjta-tests/ProxyXAResource/" + nodeName + "/");
- Map<String, Map<Xid, File>> savedData = new HashMap<String, Map<Xid, File>>();
- if (directory.exists() && directory.isDirectory()) {
- File[] listFiles = directory.listFiles();
- for (int i = 0; i < listFiles.length; i++) {
- File file = listFiles[i];
- DataInputStream fis = new DataInputStream(new FileInputStream(file));
- int remoteServerNameLength = fis.readInt();
- final byte[] remoteServerNameBytes = new byte[remoteServerNameLength];
- fis.read(remoteServerNameBytes, 0, remoteServerNameLength);
- String remoteServerName = new String(remoteServerNameBytes);
-
-
- Map<Xid, File> map = savedData.get(remoteServerName);
- if (map == null) {
- map = new HashMap<Xid, File>();
- savedData.put(remoteServerName, map);
- }
- final int formatId = fis.readInt();
- int gtrid_length = fis.readInt();
- final byte[] gtrid = new byte[gtrid_length];
- fis.read(gtrid, 0, gtrid_length);
-
- int bqual_length = fis.readInt();
- final byte[] bqual = new byte[bqual_length];
- fis.read(bqual, 0, bqual_length);
- Xid xid = new XidImple(new Xid() {
- @Override
- public byte[] getBranchQualifier() {
- return bqual;
- }
-
- @Override
- public int getFormatId() {
- return formatId;
- }
-
- @Override
- public byte[] getGlobalTransactionId() {
- return gtrid;
- }
- });
- fis.close();
- map.put(xid, file);
- }
+ public ProxyXAResourceRecovery(String nodeName, String[] toRecoverFor) throws IOException {
+ for (int i = 0; i < toRecoverFor.length; i++) {
+ resources.add(new ProxyXAResource(nodeName, toRecoverFor[i]));
}
- Iterator<String> iterator = savedData.keySet().iterator();
- while (iterator.hasNext()) {
- String remoteServerName = iterator.next();
- Map<Xid, File> map = savedData.get(remoteServerName);
- resources.add(new ProxyXAResource(counter, lookupProvider, nodeName, remoteServerName, map));
- }
}
@Override
public XAResource[] getXAResources() {
- System.out.println(nodeName + "Returning list of ProxyXAResources of length: " + resources.size());
return resources.toArray(new XAResource[] {});
}
Modified: labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/tests/classes/com/arjuna/ats/jta/distributed/server/impl/ServerImpl.java
===================================================================
--- labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/tests/classes/com/arjuna/ats/jta/distributed/server/impl/ServerImpl.java 2011-11-30 10:32:11 UTC (rev 37746)
+++ labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/tests/classes/com/arjuna/ats/jta/distributed/server/impl/ServerImpl.java 2011-11-30 16:48:58 UTC (rev 37747)
@@ -21,16 +21,10 @@
*/
package com.arjuna.ats.jta.distributed.server.impl;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileOutputStream;
import java.io.IOException;
import java.lang.reflect.Field;
import java.net.InetAddress;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -41,7 +35,6 @@
import javax.transaction.Transaction;
import javax.transaction.TransactionManager;
import javax.transaction.xa.XAException;
-import javax.transaction.xa.XAResource;
import javax.transaction.xa.Xid;
import org.jboss.tm.TransactionTimeoutConfiguration;
@@ -51,7 +44,6 @@
import com.arjuna.ats.arjuna.common.CoreEnvironmentBeanException;
import com.arjuna.ats.arjuna.common.ObjectStoreEnvironmentBean;
import com.arjuna.ats.arjuna.common.RecoveryEnvironmentBean;
-import com.arjuna.ats.arjuna.common.Uid;
import com.arjuna.ats.arjuna.coordinator.TransactionReaper;
import com.arjuna.ats.arjuna.coordinator.TxControl;
import com.arjuna.ats.arjuna.recovery.RecoveryManager;
@@ -67,7 +59,6 @@
import com.arjuna.ats.jbossatx.jta.TransactionManagerService;
import com.arjuna.ats.jta.common.JTAEnvironmentBean;
import com.arjuna.ats.jta.distributed.TestResourceRecovery;
-import com.arjuna.ats.jta.distributed.server.CompletionCounter;
import com.arjuna.ats.jta.distributed.server.LocalServer;
import com.arjuna.ats.jta.distributed.server.LookupProvider;
import com.arjuna.ats.jta.distributed.server.RemoteServer;
@@ -79,46 +70,13 @@
private String nodeName;
private RecoveryManagerService recoveryManagerService;
private TransactionManagerService transactionManagerService;
- private boolean offline;
- private Map<SubordinateXidImple, TransactionImple> transactions = new HashMap<SubordinateXidImple, TransactionImple>();
+ private Map<SubordinateXidImple, TransactionImple> rootTransactionsAsSubordinate = new HashMap<SubordinateXidImple, TransactionImple>();
private RecoveryManager _recoveryManager;
- private CompletionCounter counter;
- public void initialise(LookupProvider lookupProvider, String nodeName, int portOffset) throws CoreEnvironmentBeanException, IOException, SecurityException,
- NoSuchFieldException, IllegalArgumentException, IllegalAccessException {
+ public void initialise(LookupProvider lookupProvider, String nodeName, int portOffset, String[] clusterBuddies) throws CoreEnvironmentBeanException,
+ IOException, SecurityException, NoSuchFieldException, IllegalArgumentException, IllegalAccessException {
this.nodeName = nodeName;
- this.counter = new CompletionCounter() {
- private int commitCount = 0;
- private int rollbackCount = 0;
- @Override
- public void incrementCommit() {
- commitCount++;
-
- }
-
- @Override
- public void incrementRollback() {
- rollbackCount++;
- }
-
- @Override
- public int getCommitCount() {
- return commitCount;
- }
-
- @Override
- public int getRollbackCount() {
- return rollbackCount;
- }
-
- @Override
- public void resetCounters() {
- commitCount = 0;
- rollbackCount = 0;
- }
- };
-
RecoveryEnvironmentBean recoveryEnvironmentBean = com.arjuna.ats.arjuna.common.recoveryPropertyManager.getRecoveryEnvironmentBean();
recoveryEnvironmentBean.setRecoveryBackoffPeriod(1);
@@ -129,7 +87,7 @@
List<String> recoveryModuleClassNames = new ArrayList<String>();
recoveryModuleClassNames.add("com.arjuna.ats.internal.arjuna.recovery.AtomicActionRecoveryModule");
-// recoveryModuleClassNames.add("com.arjuna.ats.internal.txoj.recovery.TORecoveryModule");
+ // recoveryModuleClassNames.add("com.arjuna.ats.internal.txoj.recovery.TORecoveryModule");
recoveryModuleClassNames.add("com.arjuna.ats.internal.jta.recovery.arjunacore.XARecoveryModule");
recoveryEnvironmentBean.setRecoveryModuleClassNames(recoveryModuleClassNames);
List<String> expiryScannerClassNames = new ArrayList<String>();
@@ -187,8 +145,8 @@
recoveryManagerService = new RecoveryManagerService();
recoveryManagerService.create();
- recoveryManagerService.addXAResourceRecovery(new ProxyXAResourceRecovery(counter, lookupProvider, nodeName));
- recoveryManagerService.addXAResourceRecovery(new TestResourceRecovery(counter, nodeName));
+ recoveryManagerService.addXAResourceRecovery(new ProxyXAResourceRecovery(nodeName, clusterBuddies));
+ recoveryManagerService.addXAResourceRecovery(new TestResourceRecovery(nodeName));
// recoveryManagerService.start();
_recoveryManager = RecoveryManager.manager();
@@ -250,23 +208,18 @@
}
@Override
- public boolean getAndResumeTransaction(int remainingTimeout, Xid toResume, Integer nextAvailableSubordinateName) throws XAException, IllegalStateException,
- SystemException, IOException {
- boolean existed = true;
- Transaction transaction = transactions.get(new SubordinateXidImple(toResume));
+ public Xid getAndResumeTransaction(int remainingTimeout, Xid toResume) throws XAException, IllegalStateException, SystemException, IOException {
+ Xid toReturn = null;
+ Transaction transaction = rootTransactionsAsSubordinate.get(new SubordinateXidImple(toResume));
if (transaction == null) {
transaction = SubordinationManager.getTransactionImporter().getImportedTransaction(toResume);
if (transaction == null) {
-
- XidImple toImport = new XidImple(toResume);
- XATxConverter.setSubordinateNodeName(toImport.getXID(), TxControl.getXANodeName());
-
- transaction = SubordinationManager.getTransactionImporter().importTransaction(toImport, remainingTimeout);
- existed = false;
+ transaction = SubordinationManager.getTransactionImporter().importTransaction(toResume, remainingTimeout);
+ toReturn = ((TransactionImple) transaction).getTxId();
}
}
transactionManagerService.getTransactionManager().resume(transaction);
- return existed;
+ return toReturn;
}
@Override
@@ -283,7 +236,7 @@
public void storeRootTransaction() throws SystemException {
TransactionImple transaction = ((TransactionImple) transactionManagerService.getTransactionManager().getTransaction());
Xid txId = transaction.getTxId();
- transactions.put(new SubordinateXidImple(txId), transaction);
+ rootTransactionsAsSubordinate.put(new SubordinateXidImple(txId), transaction);
}
@Override
@@ -294,70 +247,32 @@
@Override
public void removeRootTransaction(Xid toMigrate) {
- transactions.remove(new SubordinateXidImple(toMigrate));
+ rootTransactionsAsSubordinate.remove(new SubordinateXidImple(toMigrate));
}
@Override
- public CompletionCounter getCompletionCounter() {
- return counter;
+ public ProxyXAResource generateProxyXAResource(LookupProvider lookupProvider, String remoteServerName, Xid migratedXid) throws SystemException, IOException {
+ return new ProxyXAResource(getNodeName(), remoteServerName, migratedXid);
}
@Override
- public ProxyXAResource generateProxyXAResource(LookupProvider lookupProvider, String remoteServerName) throws SystemException, IOException {
-
- // Persist a proxy for the remote server this can mean we try to recover
- // transactions at a remote server that did not get chance to
- // prepare but the alternative is to orphan a prepared server
-
- Xid currentXid = getCurrentXid();
- File dir = new File(System.getProperty("user.dir") + "/distributedjta-tests/ProxyXAResource/" + getNodeName());
- dir.mkdirs();
- File file = new File(dir, new Uid().fileStringForm());
- file.createNewFile();
- DataOutputStream fos = new DataOutputStream(new FileOutputStream(file));
- byte[] remoteServerNameBytes = remoteServerName.getBytes();
- fos.writeInt(remoteServerNameBytes.length);
- fos.write(remoteServerNameBytes);
- fos.writeInt(currentXid.getFormatId());
- fos.writeInt(currentXid.getGlobalTransactionId().length);
- fos.write(currentXid.getGlobalTransactionId());
- fos.writeInt(currentXid.getBranchQualifier().length);
- fos.write(currentXid.getBranchQualifier());
- fos.flush();
- fos.close();
-
- return new ProxyXAResource(counter, lookupProvider, getNodeName(), remoteServerName, file);
- }
-
- @Override
- public void cleanupProxyXAResource(XAResource proxyXAResource) {
- ((ProxyXAResource) proxyXAResource).deleteTemporaryFile();
- }
-
- @Override
public Synchronization generateProxySynchronization(LookupProvider lookupProvider, String localServerName, String remoteServerName, Xid toRegisterAgainst) {
return new ProxySynchronization(lookupProvider, localServerName, remoteServerName, toRegisterAgainst);
}
@Override
- public void setOffline(boolean offline) {
- ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
- ClassLoader serversClassLoader = this.getClass().getClassLoader();
- Thread.currentThread().setContextClassLoader(serversClassLoader);
- this.offline = offline;
- Thread.currentThread().setContextClassLoader(classLoader);
- }
-
- @Override
public RemoteServer connectTo() {
return this;
}
@Override
- public int prepare(Xid xid) throws XAException {
+ public int prepare(Xid xid, boolean recover) throws XAException, IOException {
ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
try {
Thread.currentThread().setContextClassLoader(this.getClass().getClassLoader());
+ if (recover) {
+ recover(xid);
+ }
return SubordinationManager.getXATerminator().prepare(xid);
} finally {
Thread.currentThread().setContextClassLoader(contextClassLoader);
@@ -366,12 +281,12 @@
@Override
public void commit(Xid xid, boolean onePhase, boolean recover) throws XAException, IOException {
- if (recover) {
- recover(xid);
- }
ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
try {
Thread.currentThread().setContextClassLoader(this.getClass().getClassLoader());
+ if (recover) {
+ recover(xid);
+ }
SubordinationManager.getXATerminator().commit(xid, onePhase);
} finally {
Thread.currentThread().setContextClassLoader(contextClassLoader);
@@ -380,12 +295,12 @@
@Override
public void rollback(Xid xid, boolean recover) throws XAException, IOException {
- if (recover) {
- recover(xid);
- }
ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
try {
Thread.currentThread().setContextClassLoader(this.getClass().getClassLoader());
+ if (recover) {
+ recover(xid);
+ }
SubordinationManager.getXATerminator().rollback(xid);
} finally {
Thread.currentThread().setContextClassLoader(contextClassLoader);
@@ -393,19 +308,17 @@
}
protected void recover(Xid toRecover) throws XAException, IOException {
- // Work out what the subordinate name would be for these transaction
- // for this server
- XidImple recoverable = new XidImple(toRecover);
- XATxConverter.setSubordinateNodeName(recoverable.getXID(), TxControl.getXANodeName());
- ((XATerminatorImple) SubordinationManager.getXATerminator()).doRecover(recoverable);
+ ((XATerminatorImple) SubordinationManager.getXATerminator()).doRecover(new XidImple(toRecover), null);
}
@Override
- public void forget(Xid xid) throws XAException, IOException {
- recover(xid);
+ public void forget(Xid xid, boolean recover) throws XAException, IOException {
ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
try {
Thread.currentThread().setContextClassLoader(this.getClass().getClassLoader());
+ if (recover) {
+ recover(xid);
+ }
SubordinationManager.getXATerminator().forget(xid);
} finally {
Thread.currentThread().setContextClassLoader(contextClassLoader);
@@ -423,4 +336,9 @@
Thread.currentThread().setContextClassLoader(contextClassLoader);
}
}
+
+ @Override
+ public Xid[] recoverFor(String localServerName) throws XAException {
+ return ((XATerminatorImple) SubordinationManager.getXATerminator()).doRecover(null, localServerName);
+ }
}
Modified: labs/jbosstm/branches/JBOSSTS_4_15_0_Final/common/classes/com/arjuna/common/util/ConfigurationInfo.java
===================================================================
--- labs/jbosstm/branches/JBOSSTS_4_15_0_Final/common/classes/com/arjuna/common/util/ConfigurationInfo.java 2011-11-30 10:32:11 UTC (rev 37746)
+++ labs/jbosstm/branches/JBOSSTS_4_15_0_Final/common/classes/com/arjuna/common/util/ConfigurationInfo.java 2011-11-30 16:48:58 UTC (rev 37747)
@@ -123,7 +123,7 @@
}
} catch(Exception exception) {
- exception.printStackTrace();
+// exception.printStackTrace();
} finally {
if(is!= null) {
try {
More information about the jboss-svn-commits
mailing list