[jboss-svn-commits] JBL Code SVN: r37552 - in labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/tests/classes/com/arjuna/ats/jta/distributed: impl and 1 other directory.
jboss-svn-commits at lists.jboss.org
jboss-svn-commits at lists.jboss.org
Sat Oct 15 07:06:45 EDT 2011
Author: tomjenkinson
Date: 2011-10-15 07:06:44 -0400 (Sat, 15 Oct 2011)
New Revision: 37552
Added:
labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/tests/classes/com/arjuna/ats/jta/distributed/LocalServer.java
labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/tests/classes/com/arjuna/ats/jta/distributed/RemoteServer.java
Removed:
labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/tests/classes/com/arjuna/ats/jta/distributed/Server.java
Modified:
labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/tests/classes/com/arjuna/ats/jta/distributed/SimpleIsolatedServers.java
labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/tests/classes/com/arjuna/ats/jta/distributed/impl/ProxySynchronization.java
labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/tests/classes/com/arjuna/ats/jta/distributed/impl/ProxyXAResource.java
labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/tests/classes/com/arjuna/ats/jta/distributed/impl/ServerImpl.java
Log:
JBTM-916 updated to provide clearer guidance on what constitutes calls locally vs remote
Copied: labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/tests/classes/com/arjuna/ats/jta/distributed/LocalServer.java (from rev 37545, labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/tests/classes/com/arjuna/ats/jta/distributed/Server.java)
===================================================================
--- labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/tests/classes/com/arjuna/ats/jta/distributed/LocalServer.java (rev 0)
+++ labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/tests/classes/com/arjuna/ats/jta/distributed/LocalServer.java 2011-10-15 11:06:44 UTC (rev 37552)
@@ -0,0 +1,45 @@
+package com.arjuna.ats.jta.distributed;
+
+import java.io.IOException;
+import java.net.ConnectException;
+import java.util.List;
+
+import javax.transaction.HeuristicCommitException;
+import javax.transaction.HeuristicMixedException;
+import javax.transaction.HeuristicRollbackException;
+import javax.transaction.InvalidTransactionException;
+import javax.transaction.NotSupportedException;
+import javax.transaction.RollbackException;
+import javax.transaction.Synchronization;
+import javax.transaction.SystemException;
+import javax.transaction.TransactionManager;
+import javax.transaction.xa.XAException;
+import javax.transaction.xa.XAResource;
+import javax.transaction.xa.Xid;
+
+import com.arjuna.ats.arjuna.common.CoreEnvironmentBeanException;
+
+public interface LocalServer {
+
+ public void initialise(Integer nodeName) throws CoreEnvironmentBeanException, IOException;
+
+ public Integer getNodeName();
+
+ public TransactionManager getTransactionManager() throws NotSupportedException, SystemException;
+
+ public void doRecoveryManagerScan();
+
+ public long getTimeLeftBeforeTransactionTimeout() throws RollbackException;
+
+ public Xid getCurrentXid() throws SystemException;
+
+ public void setOffline(boolean offline);
+
+ public XAResource generateProxyXAResource(Integer localServerName, Integer remoteServerName);
+
+ public Synchronization generateProxySynchronization(Integer localServerName, Integer remoteServerName, Xid toRegisterAgainst);
+
+ public boolean importTransaction(int remainingTimeout, Xid toImport) throws XAException, InvalidTransactionException, IllegalStateException,
+ SystemException;
+
+}
Added: labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/tests/classes/com/arjuna/ats/jta/distributed/RemoteServer.java
===================================================================
--- labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/tests/classes/com/arjuna/ats/jta/distributed/RemoteServer.java (rev 0)
+++ labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/tests/classes/com/arjuna/ats/jta/distributed/RemoteServer.java 2011-10-15 11:06:44 UTC (rev 37552)
@@ -0,0 +1,29 @@
+package com.arjuna.ats.jta.distributed;
+
+import java.net.ConnectException;
+import java.util.List;
+
+import javax.transaction.HeuristicCommitException;
+import javax.transaction.HeuristicMixedException;
+import javax.transaction.HeuristicRollbackException;
+import javax.transaction.SystemException;
+import javax.transaction.xa.XAException;
+import javax.transaction.xa.Xid;
+
+public interface RemoteServer {
+
+ public int propagatePrepare(Xid xid) throws XAException, ConnectException;
+
+ public void propagateCommit(Xid xid, boolean onePhase) throws IllegalStateException, HeuristicMixedException, HeuristicRollbackException,
+ HeuristicCommitException, SystemException, XAException, ConnectException;
+
+ public void propagateRollback(Xid xid) throws IllegalStateException, HeuristicMixedException, HeuristicCommitException, HeuristicRollbackException,
+ SystemException, XAException, ConnectException;
+
+ public Xid[] propagateRecover(List<Integer> startScanned, int flag) throws XAException, ConnectException;
+
+ public void propagateForget(Xid xid) throws XAException, ConnectException;
+
+ public void propagateBeforeCompletion(Xid xid) throws XAException, SystemException, ConnectException;
+
+}
Deleted: labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/tests/classes/com/arjuna/ats/jta/distributed/Server.java
===================================================================
--- labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/tests/classes/com/arjuna/ats/jta/distributed/Server.java 2011-10-15 08:34:44 UTC (rev 37551)
+++ labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/tests/classes/com/arjuna/ats/jta/distributed/Server.java 2011-10-15 11:06:44 UTC (rev 37552)
@@ -1,55 +0,0 @@
-package com.arjuna.ats.jta.distributed;
-
-import java.io.IOException;
-import java.util.List;
-
-import javax.transaction.HeuristicCommitException;
-import javax.transaction.HeuristicMixedException;
-import javax.transaction.HeuristicRollbackException;
-import javax.transaction.InvalidTransactionException;
-import javax.transaction.NotSupportedException;
-import javax.transaction.RollbackException;
-import javax.transaction.Synchronization;
-import javax.transaction.SystemException;
-import javax.transaction.TransactionManager;
-import javax.transaction.xa.XAException;
-import javax.transaction.xa.XAResource;
-import javax.transaction.xa.Xid;
-
-import com.arjuna.ats.arjuna.common.CoreEnvironmentBeanException;
-
-public interface Server {
-
- public void initialise(int nodeName) throws CoreEnvironmentBeanException, IOException;
-
- public int getNodeName();
-
- public TransactionManager getTransactionManager() throws NotSupportedException, SystemException;
-
- public boolean importTransaction(int remainingTimeout, Xid toMigrate) throws XAException, InvalidTransactionException, IllegalStateException,
- SystemException;
-
- public void propagateCommit(Xid xid, boolean onePhase) throws IllegalStateException, HeuristicMixedException, HeuristicRollbackException,
- HeuristicCommitException, SystemException, XAException;
-
- public int propagatePrepare(Xid xid) throws XAException;
-
- public void propagateRollback(Xid xid) throws IllegalStateException, HeuristicMixedException, HeuristicCommitException, HeuristicRollbackException,
- SystemException, XAException;
-
- public void propagateForget(Xid xid) throws XAException;
-
- public void doRecoveryManagerScan();
-
- public Xid[] propagateRecover(List<Integer> startScanned, int flag) throws XAException;
-
- public long getTimeLeftBeforeTransactionTimeout() throws RollbackException;
-
- public void propagateBeforeCompletion(Xid xid) throws XAException, SystemException;
-
- public Xid getCurrentXid() throws SystemException;
-
- public XAResource generateProxyXAResource(int currentNodeName, int nextNodeName);
-
- public Synchronization generateProxySynchronization(int serverId, int serverIdToProxyTo, Xid toRegisterAgainst);
-}
Modified: labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/tests/classes/com/arjuna/ats/jta/distributed/SimpleIsolatedServers.java
===================================================================
--- labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/tests/classes/com/arjuna/ats/jta/distributed/SimpleIsolatedServers.java 2011-10-15 08:34:44 UTC (rev 37551)
+++ labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/tests/classes/com/arjuna/ats/jta/distributed/SimpleIsolatedServers.java 2011-10-15 11:06:44 UTC (rev 37552)
@@ -4,6 +4,9 @@
import java.io.IOException;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
+import java.util.Arrays;
+import java.util.LinkedList;
+import java.util.List;
import javax.transaction.HeuristicMixedException;
import javax.transaction.HeuristicRollbackException;
@@ -23,7 +26,8 @@
import com.arjuna.ats.arjuna.common.CoreEnvironmentBeanException;
public class SimpleIsolatedServers {
- private static Server[] servers = new Server[3];
+ private static LocalServer[] localServers = new LocalServer[3];
+ private static RemoteServer[] remoteServers = new RemoteServer[3];
@BeforeClass
public static void setup() throws SecurityException, NoSuchMethodException, InstantiationException, IllegalAccessException, ClassNotFoundException,
@@ -31,7 +35,7 @@
// Get the Server interface loaded, only way I found to do this was
// instantiate one
- java.lang.reflect.Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(), new Class[] { Server.class },
+ java.lang.reflect.Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(), new Class[] { LocalServer.class, RemoteServer.class },
new InvocationHandler() {
@Override
@@ -42,16 +46,17 @@
});
ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
- for (int i = 0; i < getServers().length; i++) {
+ for (int i = 0; i < localServers.length; i++) {
IsolatableServersClassLoader classLoader = new IsolatableServersClassLoader(contextClassLoader);
- getServers()[i] = (Server) classLoader.loadClass("com.arjuna.ats.jta.distributed.impl.ServerImpl").newInstance();
- getServers()[i].initialise((i + 1) * 1000);
+ localServers[i] = (LocalServer) classLoader.loadClass("com.arjuna.ats.jta.distributed.impl.ServerImpl").newInstance();
+ localServers[i].initialise((i + 1) * 1000);
+ remoteServers[i] = (RemoteServer) localServers[i];
}
}
@Test
public void testRecovery() {
- getServers()[0].doRecoveryManagerScan();
+ localServers[0].doRecoveryManagerScan();
}
@Test
@@ -65,8 +70,8 @@
int startingTimeout = 0;
// Start out at the first server
- Server originalServer = getServers()[0];
- TransactionManager transactionManager = getServers()[0].getTransactionManager();
+ LocalServer originalServer = getLocalServer(1000);
+ TransactionManager transactionManager = originalServer.getTransactionManager();
transactionManager.setTransactionTimeout(startingTimeout);
transactionManager.begin();
Transaction originalTransaction = transactionManager.getTransaction();
@@ -77,27 +82,28 @@
// Loop through the rest of the servers passing the transaction up and
// down
Transaction suspendedTransaction = originalServer.getTransactionManager().suspend();
- boolean proxyRequired = recursivelyFlowTransaction(0, 1, toMigrate);
+ long timeLeftBeforeTransactionTimeout = originalServer.getTimeLeftBeforeTransactionTimeout();
+ List<Integer> nodesToFlowTo = new LinkedList<Integer>(Arrays.asList(new Integer[] { 2000, 3000, 2000, 1000, 2000, 3000, 1000, 3000 }));
+ boolean proxyRequired = recursivelyFlowTransaction(nodesToFlowTo, timeLeftBeforeTransactionTimeout, toMigrate);
originalServer.getTransactionManager().resume(suspendedTransaction);
if (proxyRequired) {
- XAResource proxyXAResource = originalServer.generateProxyXAResource(originalServer.getNodeName(), getServers()[1].getNodeName());
+ XAResource proxyXAResource = originalServer.generateProxyXAResource(originalServer.getNodeName(), 2000);
originalTransaction.enlistResource(proxyXAResource);
- originalTransaction.registerSynchronization(originalServer.generateProxySynchronization(originalServer.getNodeName(),
- getServers()[1].getNodeName(), toMigrate));
+ originalTransaction.registerSynchronization(originalServer.generateProxySynchronization(originalServer.getNodeName(), 2000, toMigrate));
}
Transaction transaction = transactionManager.getTransaction();
transaction.commit();
}
- private boolean recursivelyFlowTransaction(int previousServerIndex, int currentServerIndex, Xid toMigrate) throws RollbackException,
+ private boolean recursivelyFlowTransaction(List<Integer> nodesToFlowTo, long timeLeftBeforeTransactionTimeout, Xid toMigrate) throws RollbackException,
InvalidTransactionException, IllegalStateException, XAException, SystemException, NotSupportedException {
- Server previousServer = getServers()[previousServerIndex];
- Server currentServer = getServers()[currentServerIndex];
+ Integer currentServerName = nodesToFlowTo.remove(0);
+ LocalServer currentServer = getLocalServer(currentServerName);
// Migrate the transaction to the next server
- int remainingTimeout = (int) (previousServer.getTimeLeftBeforeTransactionTimeout() / 1000);
+ int remainingTimeout = (int) (timeLeftBeforeTransactionTimeout / 1000);
boolean requiresProxyAtPreviousServer = !currentServer.importTransaction(remainingTimeout, toMigrate);
// Perform work on the migrated transaction
@@ -106,38 +112,44 @@
transaction.registerSynchronization(new TestSynchronization(currentServer.getNodeName()));
transaction.enlistResource(new TestResource(currentServer.getNodeName(), false));
- int nextNextServerIndex = -1;
- if (currentServerIndex > previousServerIndex && currentServerIndex + 1 != getServers().length) {
- // Ascending
- nextNextServerIndex = currentServerIndex + 1;
- } else {
- // Descending
- nextNextServerIndex = currentServerIndex - 1;
- }
+ if (!nodesToFlowTo.isEmpty()) {
+ Integer nextServerNodeName = nodesToFlowTo.get(0);
- // THE WORKHORSE OF FLOWING A TRANSACTION
- // SUSPEND THE TRANSACTION
- Transaction suspendedTransaction = currentServer.getTransactionManager().suspend();
- boolean proxyRequired = false;
- if (nextNextServerIndex != -1) {
+ // SUSPEND THE TRANSACTION
+ Transaction suspendedTransaction = currentServer.getTransactionManager().suspend();
// FLOW THE TRANSACTION
- proxyRequired = recursivelyFlowTransaction(currentServerIndex, nextNextServerIndex, toMigrate);
+ timeLeftBeforeTransactionTimeout = currentServer.getTimeLeftBeforeTransactionTimeout();
+ boolean proxyRequired = recursivelyFlowTransaction(nodesToFlowTo, timeLeftBeforeTransactionTimeout, toMigrate);
+ // Create a proxy for the new server if necessary, this can orphan
+ // the
+ // remote server but XA recovery will handle that on the remote
+ // server
+ // The alternative is to always create a proxy but this is a
+ // performance
+ // drain and will result in multiple subordinate transactions and
+ // performance
+ // issues
+ // RESUME THE TRANSACTION IN CASE THERE IS MORE WORK
+ currentServer.getTransactionManager().resume(suspendedTransaction);
+ if (proxyRequired) {
+ XAResource proxyXAResource = currentServer.generateProxyXAResource(currentServer.getNodeName(), nextServerNodeName);
+ suspendedTransaction.enlistResource(proxyXAResource);
+ suspendedTransaction.registerSynchronization(currentServer.generateProxySynchronization(currentServer.getNodeName(), nextServerNodeName,
+ toMigrate));
+ }
}
- // RESUME THE TRANSACTION IN CASE THERE IS MORE WORK
- currentServer.getTransactionManager().resume(suspendedTransaction);
- // Create a proxy for the new server if necessary
- if (proxyRequired) {
- XAResource proxyXAResource = currentServer.generateProxyXAResource(currentServer.getNodeName(), getServers()[nextNextServerIndex].getNodeName());
- suspendedTransaction.enlistResource(proxyXAResource);
- suspendedTransaction.registerSynchronization(currentServer.generateProxySynchronization(currentServer.getNodeName(),
- getServers()[nextNextServerIndex].getNodeName(), toMigrate));
- }
// SUSPEND THE TRANSACTION WHEN YOU ARE READY TO RETURN TO YOUR CALLER
- suspendedTransaction = currentServer.getTransactionManager().suspend();
+ currentServer.getTransactionManager().suspend();
return requiresProxyAtPreviousServer;
}
- public static Server[] getServers() {
- return servers;
+ public static RemoteServer lookup(Integer jndiName) {
+ int index = (jndiName / 1000) - 1;
+ return remoteServers[index];
}
+
+ private static LocalServer getLocalServer(Integer jndiName) {
+ int index = (jndiName / 1000) - 1;
+ return localServers[index];
+ }
}
Modified: labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/tests/classes/com/arjuna/ats/jta/distributed/impl/ProxySynchronization.java
===================================================================
--- labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/tests/classes/com/arjuna/ats/jta/distributed/impl/ProxySynchronization.java 2011-10-15 08:34:44 UTC (rev 37551)
+++ labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/tests/classes/com/arjuna/ats/jta/distributed/impl/ProxySynchronization.java 2011-10-15 11:06:44 UTC (rev 37552)
@@ -1,5 +1,7 @@
package com.arjuna.ats.jta.distributed.impl;
+import java.net.ConnectException;
+
import javax.transaction.Synchronization;
import javax.transaction.SystemException;
import javax.transaction.xa.XAException;
@@ -9,26 +11,27 @@
public class ProxySynchronization implements Synchronization {
- private int serverId;
- private int serverIdToProxyTo;
+ private int localServerName;
+ private int remoteServerName;
private Xid toRegisterAgainst;
- public ProxySynchronization(int serverId, int serverIdToProxyTo, Xid toRegisterAgainst) {
- this.serverId = serverId;
- this.serverIdToProxyTo = serverIdToProxyTo;
+ public ProxySynchronization(int localServerName, int remoteServerName, Xid toRegisterAgainst) {
+ this.localServerName = localServerName;
+ this.remoteServerName = remoteServerName;
this.toRegisterAgainst = toRegisterAgainst;
}
@Override
public void beforeCompletion() {
- System.out.println("ProxySynchronization (" + serverId + ":" + serverIdToProxyTo + ") beforeCompletion");
- int index = (serverIdToProxyTo / 1000) - 1;
+ System.out.println("ProxySynchronization (" + localServerName + ":" + remoteServerName + ") beforeCompletion");
try {
- SimpleIsolatedServers.getServers()[index].propagateBeforeCompletion(toRegisterAgainst);
+ SimpleIsolatedServers.lookup(remoteServerName).propagateBeforeCompletion(toRegisterAgainst);
} catch (XAException e) {
e.printStackTrace();
} catch (SystemException e) {
e.printStackTrace();
+ } catch (ConnectException e) {
+ e.printStackTrace();
}
}
Modified: labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/tests/classes/com/arjuna/ats/jta/distributed/impl/ProxyXAResource.java
===================================================================
--- labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/tests/classes/com/arjuna/ats/jta/distributed/impl/ProxyXAResource.java 2011-10-15 08:34:44 UTC (rev 37551)
+++ labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/tests/classes/com/arjuna/ats/jta/distributed/impl/ProxyXAResource.java 2011-10-15 11:06:44 UTC (rev 37552)
@@ -6,6 +6,7 @@
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
+import java.net.ConnectException;
import java.util.ArrayList;
import java.util.List;
@@ -20,7 +21,6 @@
import org.jboss.tm.XAResourceWrapper;
import com.arjuna.ats.arjuna.common.Uid;
-import com.arjuna.ats.jta.distributed.Server;
import com.arjuna.ats.jta.distributed.SimpleIsolatedServers;
public class ProxyXAResource implements XAResource, XAResourceWrapper {
@@ -29,17 +29,17 @@
private int transactionTimeout;
private Xid xid;
- private int serverIdToProxyTo = -1;
+ private Integer remoteServerName = -1;
private File file;
- private Integer serverId;
+ private Integer localServerName;
- public ProxyXAResource(int serverId, int serverIdToProxyTo) {
- this.serverId = serverId;
- this.serverIdToProxyTo = serverIdToProxyTo;
+ public ProxyXAResource(Integer localServerName, Integer remoteServerName) {
+ this.localServerName = localServerName;
+ this.remoteServerName = remoteServerName;
}
- public ProxyXAResource(int recoverFor, File file) throws IOException {
- this.serverId = recoverFor;
+ public ProxyXAResource(Integer localServerName, File file) throws IOException {
+ this.localServerName = localServerName;
this.file = file;
DataInputStream fis = new DataInputStream(new FileInputStream(file));
final int formatId = fis.readInt();
@@ -49,8 +49,8 @@
final int bqual_length = fis.readInt();
final byte[] bqual = new byte[bqual_length];
fis.read(bqual, 0, bqual_length);
- int serverIdToProxyTo = fis.readInt();
- this.serverIdToProxyTo = serverIdToProxyTo;
+ int remoteServerName = fis.readInt();
+ this.remoteServerName = remoteServerName;
this.xid = new Xid() {
@Override
public byte[] getGlobalTransactionId() {
@@ -111,22 +111,22 @@
@Override
public void start(Xid xid, int flags) throws XAException {
- System.out.println(" ProxyXAResource (" + serverId + ":" + serverIdToProxyTo + ") XA_START [" + xid + "]");
+ System.out.println(" ProxyXAResource (" + localServerName + ":" + remoteServerName + ") XA_START [" + xid + "]");
this.xid = xid;
}
@Override
public void end(Xid xid, int flags) throws XAException {
- System.out.println(" ProxyXAResource (" + serverId + ":" + serverIdToProxyTo + ") XA_END [" + xid + "]");
+ System.out.println(" ProxyXAResource (" + localServerName + ":" + remoteServerName + ") XA_END [" + xid + "]");
this.xid = null;
}
@Override
public synchronized int prepare(Xid xid) throws XAException {
- System.out.println(" ProxyXAResource (" + serverId + ":" + serverIdToProxyTo + ") XA_PREPARE [" + xid + "]");
+ System.out.println(" ProxyXAResource (" + localServerName + ":" + remoteServerName + ") XA_PREPARE [" + xid + "]");
try {
- File dir = new File(System.getProperty("user.dir") + "/tmp/ProxyXAResource/" + serverId + "/");
+ File dir = new File(System.getProperty("user.dir") + "/tmp/ProxyXAResource/" + localServerName + "/");
dir.mkdirs();
file = new File(dir, new Uid().fileStringForm());
@@ -144,24 +144,28 @@
fos.write(gtrid, 0, gtrid_length);
fos.writeInt(bqual_length);
fos.write(bqual, 0, bqual_length);
- fos.writeInt(serverIdToProxyTo);
+ fos.writeInt(remoteServerName);
} catch (IOException e) {
e.printStackTrace();
throw new XAException(XAException.XAER_RMERR);
}
- int propagatePrepare = getServerToProxyTo().propagatePrepare(xid);
- System.out.println(" ProxyXAResource (" + serverId + ":" + serverIdToProxyTo + ") XA_PREPARED");
- return propagatePrepare;
+ try {
+ int propagatePrepare = SimpleIsolatedServers.lookup(remoteServerName).propagatePrepare(xid);
+ System.out.println(" ProxyXAResource (" + localServerName + ":" + remoteServerName + ") XA_PREPARED");
+ return propagatePrepare;
+ } catch (ConnectException ce) {
+ throw new XAException(XAException.XA_RETRY);
+ }
}
@Override
public synchronized void commit(Xid xid, boolean onePhase) throws XAException {
- System.out.println(" ProxyXAResource (" + serverId + ":" + serverIdToProxyTo + ") XA_COMMIT [" + xid + "]");
+ System.out.println(" ProxyXAResource (" + localServerName + ":" + remoteServerName + ") XA_COMMIT [" + xid + "]");
try {
- getServerToProxyTo().propagateCommit(xid, onePhase);
- System.out.println(" ProxyXAResource (" + serverId + ":" + serverIdToProxyTo + ") XA_COMMITED");
+ SimpleIsolatedServers.lookup(remoteServerName).propagateCommit(xid, onePhase);
+ System.out.println(" ProxyXAResource (" + localServerName + ":" + remoteServerName + ") XA_COMMITED");
} catch (IllegalStateException e) {
throw new XAException(XAException.XAER_INVAL);
} catch (HeuristicMixedException e) {
@@ -172,6 +176,8 @@
throw new XAException(XAException.XA_HEURCOM);
} catch (SystemException e) {
throw new XAException(XAException.XAER_PROTO);
+ } catch (ConnectException ce) {
+ throw new XAException(XAException.XA_RETRY);
}
if (file != null) {
@@ -181,10 +187,10 @@
@Override
public synchronized void rollback(Xid xid) throws XAException {
- System.out.println(" ProxyXAResource (" + serverId + ":" + serverIdToProxyTo + ") XA_ROLLBACK[" + xid + "]");
+ System.out.println(" ProxyXAResource (" + localServerName + ":" + remoteServerName + ") XA_ROLLBACK[" + xid + "]");
try {
- getServerToProxyTo().propagateRollback(xid);
- System.out.println(" ProxyXAResource (" + serverId + ":" + serverIdToProxyTo + ") XA_ROLLBACKED");
+ SimpleIsolatedServers.lookup(remoteServerName).propagateRollback(xid);
+ System.out.println(" ProxyXAResource (" + localServerName + ":" + remoteServerName + ") XA_ROLLBACKED");
} catch (IllegalStateException e) {
throw new XAException(XAException.XAER_INVAL);
} catch (HeuristicMixedException e) {
@@ -195,6 +201,8 @@
throw new XAException(XAException.XA_HEURRB);
} catch (SystemException e) {
throw new XAException(XAException.XAER_PROTO);
+ } catch (ConnectException ce) {
+ throw new XAException(XAException.XA_RETRY);
}
if (file != null) {
@@ -212,28 +220,43 @@
int tocheck = (flag & XAResource.TMSTARTRSCAN);
if (tocheck == XAResource.TMSTARTRSCAN) {
- System.out.println(" ProxyXAResource (" + serverId + ":" + serverIdToProxyTo + ") XA_RECOVER [XAResource.TMSTARTRSCAN]: " + serverIdToProxyTo);
+ System.out.println(" ProxyXAResource (" + localServerName + ":" + remoteServerName + ") XA_RECOVER [XAResource.TMSTARTRSCAN]: "
+ + remoteServerName);
- if (!startScanned.contains(serverIdToProxyTo)) {
- startScanned.add(serverIdToProxyTo);
+ if (!startScanned.contains(remoteServerName)) {
+ startScanned.add(remoteServerName);
// Make sure that the remote server has recovered all
// transactions
- getServerToProxyTo().propagateRecover(startScanned, flag);
- startScanned.remove((Integer) serverIdToProxyTo);
+ try {
+ SimpleIsolatedServers.lookup(remoteServerName).propagateRecover(startScanned, flag);
+ } catch (ConnectException ce) {
+ throw new XAException(XAException.XA_RETRY);
+ } finally {
+ startScanned.remove((Integer) remoteServerName);
+ }
}
- System.out.println(" ProxyXAResource (" + serverId + ":" + serverIdToProxyTo + ") XA_RECOVERD[XAResource.TMSTARTRSCAN]: " + serverIdToProxyTo);
+ System.out.println(" ProxyXAResource (" + localServerName + ":" + remoteServerName + ") XA_RECOVERD[XAResource.TMSTARTRSCAN]: "
+ + remoteServerName);
}
tocheck = (flag & XAResource.TMENDRSCAN);
if (tocheck == XAResource.TMENDRSCAN) {
- System.out.println(" ProxyXAResource (" + serverId + ":" + serverIdToProxyTo + ") XA_RECOVER [XAResource.TMENDRSCAN]: " + serverIdToProxyTo);
+ System.out.println(" ProxyXAResource (" + localServerName + ":" + remoteServerName + ") XA_RECOVER [XAResource.TMENDRSCAN]: "
+ + remoteServerName);
- if (!startScanned.contains(serverIdToProxyTo)) {
- getServerToProxyTo().propagateRecover(startScanned, flag);
+ if (!startScanned.contains(remoteServerName)) {
+ try {
+ SimpleIsolatedServers.lookup(remoteServerName).propagateRecover(startScanned, flag);
+ } catch (ConnectException ce) {
+ throw new XAException(XAException.XA_RETRY);
+ } finally {
+ startScanned.remove((Integer) remoteServerName);
+ }
}
- System.out.println(" ProxyXAResource (" + serverId + ":" + serverIdToProxyTo + ") XA_RECOVERD[XAResource.TMENDRSCAN]: " + serverIdToProxyTo);
+ System.out.println(" ProxyXAResource (" + localServerName + ":" + remoteServerName + ") XA_RECOVERD[XAResource.TMENDRSCAN]: "
+ + remoteServerName);
}
return new Xid[] { xid };
@@ -241,9 +264,13 @@
@Override
public void forget(Xid xid) throws XAException {
- System.out.println(" ProxyXAResource (" + serverId + ":" + serverIdToProxyTo + ") XA_FORGET [" + xid + "]");
- getServerToProxyTo().propagateForget(xid);
- System.out.println(" ProxyXAResource (" + serverId + ":" + serverIdToProxyTo + ") XA_FORGETED[" + xid + "]");
+ System.out.println(" ProxyXAResource (" + localServerName + ":" + remoteServerName + ") XA_FORGET [" + xid + "]");
+ try {
+ SimpleIsolatedServers.lookup(remoteServerName).propagateForget(xid);
+ System.out.println(" ProxyXAResource (" + localServerName + ":" + remoteServerName + ") XA_FORGETED[" + xid + "]");
+ } catch (ConnectException ce) {
+ throw new XAException(XAException.XA_RETRY);
+ }
}
@Override
@@ -290,9 +317,4 @@
public String getJndiName() {
return "ProxyXAResource";
}
-
- private Server getServerToProxyTo() {
- int index = (serverIdToProxyTo / 1000) - 1;
- return SimpleIsolatedServers.getServers()[index];
- }
}
Modified: labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/tests/classes/com/arjuna/ats/jta/distributed/impl/ServerImpl.java
===================================================================
--- labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/tests/classes/com/arjuna/ats/jta/distributed/impl/ServerImpl.java 2011-10-15 08:34:44 UTC (rev 37551)
+++ labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/tests/classes/com/arjuna/ats/jta/distributed/impl/ServerImpl.java 2011-10-15 11:06:44 UTC (rev 37552)
@@ -1,6 +1,7 @@
package com.arjuna.ats.jta.distributed.impl;
import java.io.IOException;
+import java.net.ConnectException;
import java.net.InetAddress;
import java.util.ArrayList;
import java.util.HashMap;
@@ -26,7 +27,6 @@
import com.arjuna.ats.arjuna.common.CoreEnvironmentBeanException;
import com.arjuna.ats.arjuna.common.ObjectStoreEnvironmentBean;
import com.arjuna.ats.arjuna.common.RecoveryEnvironmentBean;
-import com.arjuna.ats.arjuna.coordinator.TransactionReaper;
import com.arjuna.ats.arjuna.coordinator.TxControl;
import com.arjuna.ats.arjuna.recovery.RecoveryManager;
import com.arjuna.ats.arjuna.tools.osb.mbean.ObjStoreBrowser;
@@ -37,25 +37,27 @@
import com.arjuna.ats.jbossatx.jta.RecoveryManagerService;
import com.arjuna.ats.jbossatx.jta.TransactionManagerService;
import com.arjuna.ats.jta.common.JTAEnvironmentBean;
-import com.arjuna.ats.jta.distributed.Server;
+import com.arjuna.ats.jta.distributed.RemoteServer;
+import com.arjuna.ats.jta.distributed.LocalServer;
import com.arjuna.ats.jta.distributed.TestResourceRecovery;
-public class ServerImpl implements Server {
+public class ServerImpl implements LocalServer, RemoteServer {
- private int id;
+ private int nodeName;
private RecoveryManagerService recoveryManagerService;
private TransactionManagerService transactionManagerService;
+ private boolean offline;
- public void initialise(int id) throws CoreEnvironmentBeanException, IOException {
- this.id = id;
+ public void initialise(Integer serverName) throws CoreEnvironmentBeanException, IOException {
+ this.nodeName = serverName;
RecoveryEnvironmentBean recoveryEnvironmentBean = com.arjuna.ats.arjuna.common.recoveryPropertyManager.getRecoveryEnvironmentBean();
recoveryEnvironmentBean.setRecoveryBackoffPeriod(1);
recoveryEnvironmentBean.setRecoveryInetAddress(InetAddress.getByName("localhost"));
- recoveryEnvironmentBean.setRecoveryPort(4712 + id);
+ recoveryEnvironmentBean.setRecoveryPort(4712 + serverName);
recoveryEnvironmentBean.setTransactionStatusManagerInetAddress(InetAddress.getByName("localhost"));
- recoveryEnvironmentBean.setTransactionStatusManagerPort(4713 + id);
+ recoveryEnvironmentBean.setTransactionStatusManagerPort(4713 + serverName);
List<String> recoveryModuleClassNames = new ArrayList<String>();
recoveryModuleClassNames.add("com.arjuna.ats.internal.arjuna.recovery.AtomicActionRecoveryModule");
@@ -68,26 +70,27 @@
recoveryEnvironmentBean.setRecoveryActivators(null);
CoreEnvironmentBean coreEnvironmentBean = com.arjuna.ats.arjuna.common.arjPropertyManager.getCoreEnvironmentBean();
- coreEnvironmentBean.setSocketProcessIdPort(4714 + id);
- coreEnvironmentBean.setNodeIdentifier(id);
+ coreEnvironmentBean.setSocketProcessIdPort(4714 + serverName);
+ coreEnvironmentBean.setNodeIdentifier(serverName);
coreEnvironmentBean.setSocketProcessIdMaxPorts(1);
CoordinatorEnvironmentBean coordinatorEnvironmentBean = com.arjuna.ats.arjuna.common.arjPropertyManager.getCoordinatorEnvironmentBean();
coordinatorEnvironmentBean.setEnableStatistics(false);
coordinatorEnvironmentBean.setDefaultTimeout(300);
coordinatorEnvironmentBean.setTransactionStatusManagerEnable(false);
+ coordinatorEnvironmentBean.setDefaultTimeout(0);
ObjectStoreEnvironmentBean actionStoreObjectStoreEnvironmentBean = com.arjuna.common.internal.util.propertyservice.BeanPopulator.getNamedInstance(
com.arjuna.ats.arjuna.common.ObjectStoreEnvironmentBean.class, "default");
- actionStoreObjectStoreEnvironmentBean.setObjectStoreDir(System.getProperty("user.dir") + "/tmp/tx-object-store/" + id);
+ actionStoreObjectStoreEnvironmentBean.setObjectStoreDir(System.getProperty("user.dir") + "/tmp/tx-object-store/" + serverName);
ObjectStoreEnvironmentBean stateStoreObjectStoreEnvironmentBean = com.arjuna.common.internal.util.propertyservice.BeanPopulator.getNamedInstance(
com.arjuna.ats.arjuna.common.ObjectStoreEnvironmentBean.class, "stateStore");
- stateStoreObjectStoreEnvironmentBean.setObjectStoreDir(System.getProperty("user.dir") + "/tmp/tx-object-store/" + id);
+ stateStoreObjectStoreEnvironmentBean.setObjectStoreDir(System.getProperty("user.dir") + "/tmp/tx-object-store/" + serverName);
ObjectStoreEnvironmentBean communicationStoreObjectStoreEnvironmentBean = com.arjuna.common.internal.util.propertyservice.BeanPopulator
.getNamedInstance(com.arjuna.ats.arjuna.common.ObjectStoreEnvironmentBean.class, "communicationStore");
- communicationStoreObjectStoreEnvironmentBean.setObjectStoreDir(System.getProperty("user.dir") + "/tmp/tx-object-store/" + id);
+ communicationStoreObjectStoreEnvironmentBean.setObjectStoreDir(System.getProperty("user.dir") + "/tmp/tx-object-store/" + serverName);
ObjStoreBrowser objStoreBrowser = new ObjStoreBrowser();
Map<String, String> types = new HashMap<String, String>();
@@ -101,7 +104,7 @@
jTAEnvironmentBean
.setTransactionSynchronizationRegistryClassName("com.arjuna.ats.internal.jta.transaction.arjunacore.TransactionSynchronizationRegistryImple");
List<Integer> xaRecoveryNodes = new ArrayList<Integer>();
- xaRecoveryNodes.add(id);
+ xaRecoveryNodes.add(serverName);
jTAEnvironmentBean.setXaRecoveryNodes(xaRecoveryNodes);
List<String> xaResourceOrphanFilterClassNames = new ArrayList<String>();
@@ -114,29 +117,26 @@
recoveryManagerService = new RecoveryManagerService();
recoveryManagerService.create();
- recoveryManagerService.addXAResourceRecovery(new ProxyXAResourceRecovery(id));
- recoveryManagerService.addXAResourceRecovery(new TestResourceRecovery(id));
+ recoveryManagerService.addXAResourceRecovery(new ProxyXAResourceRecovery(serverName));
+ recoveryManagerService.addXAResourceRecovery(new TestResourceRecovery(serverName));
// recoveryManagerService.start();
RecoveryManager.manager().initialize();
transactionManagerService = new TransactionManagerService();
TxControl txControl = new com.arjuna.ats.arjuna.coordinator.TxControl();
- txControl.setDefaultTimeout(0);
transactionManagerService.setJbossXATerminator(new com.arjuna.ats.internal.jbossatx.jta.jca.XATerminator());
transactionManagerService
.setTransactionSynchronizationRegistry(new com.arjuna.ats.internal.jta.transaction.arjunacore.TransactionSynchronizationRegistryImple());
- // starts the transaction reaper transactionManagerService.create();
+ transactionManagerService.create();
}
+ @Override
public void doRecoveryManagerScan() {
RecoveryManager.manager().scan();
}
- public void startTransactionReaper() {
- TransactionReaper.transactionReaper();
- }
-
+ @Override
public TransactionManager getTransactionManager() {
return transactionManagerService.getTransactionManager();
}
@@ -155,64 +155,87 @@
}
@Override
- public int propagatePrepare(Xid xid) throws XAException {
- return SubordinationManager.getTransactionImporter().getImportedTransaction(xid).doPrepare();
+ public Integer getNodeName() {
+ return nodeName;
}
@Override
- public void propagateCommit(Xid xid, boolean onePhase) throws IllegalStateException, HeuristicMixedException, HeuristicRollbackException,
- HeuristicCommitException, SystemException, XAException {
- SubordinationManager.getTransactionImporter().getImportedTransaction(xid).doCommit();
+ public long getTimeLeftBeforeTransactionTimeout() throws RollbackException {
+ return ((TransactionTimeoutConfiguration) transactionManagerService.getTransactionManager()).getTimeLeftBeforeTransactionTimeout(false);
}
@Override
- public void propagateRollback(Xid xid) throws IllegalStateException, HeuristicMixedException, HeuristicCommitException, HeuristicRollbackException,
- SystemException, XAException {
- SubordinationManager.getTransactionImporter().getImportedTransaction(xid).doRollback();
+ public Xid getCurrentXid() throws SystemException {
+ TransactionImple transaction = ((TransactionImple) transactionManagerService.getTransactionManager().getTransaction());
+ return transaction.getTxId();
}
@Override
- public Xid[] propagateRecover(List<Integer> recoveryScanStarted, int flag) throws XAException {
- // Assumes that this thread is used by the recovery thread
- ProxyXAResource.RECOVERY_SCAN_STARTED.set(recoveryScanStarted);
- return SubordinationManager.getXATerminator().recover(flag);
+ public XAResource generateProxyXAResource(Integer localServerName, Integer remoteServerName) {
+ return new ProxyXAResource(localServerName, remoteServerName);
}
@Override
- public void propagateForget(Xid xid) throws XAException {
- SubordinationManager.getXATerminator().forget(xid);
+ public Synchronization generateProxySynchronization(Integer localServerName, Integer remoteServerName, Xid toRegisterAgainst) {
+ return new ProxySynchronization(localServerName, remoteServerName, toRegisterAgainst);
+ }
+ @Override
+ public int propagatePrepare(Xid xid) throws XAException, ConnectException {
+ if (offline) {
+ throw new ConnectException("Connection refused to: " + nodeName);
+ }
+ return SubordinationManager.getTransactionImporter().getImportedTransaction(xid).doPrepare();
}
@Override
- public int getNodeName() {
- return TxControl.getXANodeName();
+ public void propagateCommit(Xid xid, boolean onePhase) throws IllegalStateException, HeuristicMixedException, HeuristicRollbackException,
+ HeuristicCommitException, SystemException, XAException, ConnectException {
+ if (offline) {
+ throw new ConnectException("Connection refused to: " + nodeName);
+ }
+ SubordinationManager.getTransactionImporter().getImportedTransaction(xid).doCommit();
}
@Override
- public long getTimeLeftBeforeTransactionTimeout() throws RollbackException {
- return ((TransactionTimeoutConfiguration) transactionManagerService.getTransactionManager()).getTimeLeftBeforeTransactionTimeout(false);
+ public void propagateRollback(Xid xid) throws IllegalStateException, HeuristicMixedException, HeuristicCommitException, HeuristicRollbackException,
+ SystemException, XAException, ConnectException {
+ if (offline) {
+ throw new ConnectException("Connection refused to: " + nodeName);
+ }
+ SubordinationManager.getTransactionImporter().getImportedTransaction(xid).doRollback();
}
@Override
- public void propagateBeforeCompletion(Xid xid) throws XAException, SystemException {
- SubordinateTransaction tx = SubordinationManager.getTransactionImporter().getImportedTransaction(xid);
- tx.doBeforeCompletion();
+ public Xid[] propagateRecover(List<Integer> recoveryScanStarted, int flag) throws XAException, ConnectException {
+ if (offline) {
+ throw new ConnectException("Connection refused to: " + nodeName);
+ }
+ // Assumes that this thread is used by the recovery thread
+ ProxyXAResource.RECOVERY_SCAN_STARTED.set(recoveryScanStarted);
+ return SubordinationManager.getXATerminator().recover(flag);
}
@Override
- public Xid getCurrentXid() throws SystemException {
- TransactionImple transaction = ((TransactionImple) transactionManagerService.getTransactionManager().getTransaction());
- return transaction.getTxId();
+ public void propagateForget(Xid xid) throws XAException, ConnectException {
+ if (offline) {
+ throw new ConnectException("Connection refused to: " + nodeName);
+ }
+ SubordinationManager.getXATerminator().forget(xid);
+
}
@Override
- public XAResource generateProxyXAResource(int currentNodeName, int nextNodeName) {
- return new ProxyXAResource(currentNodeName, nextNodeName);
+ public void propagateBeforeCompletion(Xid xid) throws XAException, SystemException, ConnectException {
+ if (offline) {
+ throw new ConnectException("Connection refused to: " + nodeName);
+ }
+ SubordinateTransaction tx = SubordinationManager.getTransactionImporter().getImportedTransaction(xid);
+ tx.doBeforeCompletion();
}
@Override
- public Synchronization generateProxySynchronization(int serverId, int serverIdToProxyTo, Xid toRegisterAgainst) {
- return new ProxySynchronization(serverId, serverIdToProxyTo, toRegisterAgainst);
+ public void setOffline(boolean offline) {
+ this.offline = offline;
}
}
More information about the jboss-svn-commits
mailing list