[jboss-svn-commits] JBL Code SVN: r37552 - in labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/tests/classes/com/arjuna/ats/jta/distributed: impl and 1 other directory.

jboss-svn-commits at lists.jboss.org jboss-svn-commits at lists.jboss.org
Sat Oct 15 07:06:45 EDT 2011


Author: tomjenkinson
Date: 2011-10-15 07:06:44 -0400 (Sat, 15 Oct 2011)
New Revision: 37552

Added:
   labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/tests/classes/com/arjuna/ats/jta/distributed/LocalServer.java
   labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/tests/classes/com/arjuna/ats/jta/distributed/RemoteServer.java
Removed:
   labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/tests/classes/com/arjuna/ats/jta/distributed/Server.java
Modified:
   labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/tests/classes/com/arjuna/ats/jta/distributed/SimpleIsolatedServers.java
   labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/tests/classes/com/arjuna/ats/jta/distributed/impl/ProxySynchronization.java
   labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/tests/classes/com/arjuna/ats/jta/distributed/impl/ProxyXAResource.java
   labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/tests/classes/com/arjuna/ats/jta/distributed/impl/ServerImpl.java
Log:
JBTM-916 updated to provide clearer guidance on what constitutes calls locally vs remote

Copied: labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/tests/classes/com/arjuna/ats/jta/distributed/LocalServer.java (from rev 37545, labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/tests/classes/com/arjuna/ats/jta/distributed/Server.java)
===================================================================
--- labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/tests/classes/com/arjuna/ats/jta/distributed/LocalServer.java	                        (rev 0)
+++ labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/tests/classes/com/arjuna/ats/jta/distributed/LocalServer.java	2011-10-15 11:06:44 UTC (rev 37552)
@@ -0,0 +1,45 @@
+package com.arjuna.ats.jta.distributed;
+
+import java.io.IOException;
+import java.net.ConnectException;
+import java.util.List;
+
+import javax.transaction.HeuristicCommitException;
+import javax.transaction.HeuristicMixedException;
+import javax.transaction.HeuristicRollbackException;
+import javax.transaction.InvalidTransactionException;
+import javax.transaction.NotSupportedException;
+import javax.transaction.RollbackException;
+import javax.transaction.Synchronization;
+import javax.transaction.SystemException;
+import javax.transaction.TransactionManager;
+import javax.transaction.xa.XAException;
+import javax.transaction.xa.XAResource;
+import javax.transaction.xa.Xid;
+
+import com.arjuna.ats.arjuna.common.CoreEnvironmentBeanException;
+
+public interface LocalServer {
+
+	public void initialise(Integer nodeName) throws CoreEnvironmentBeanException, IOException;
+
+	public Integer getNodeName();
+
+	public TransactionManager getTransactionManager() throws NotSupportedException, SystemException;
+
+	public void doRecoveryManagerScan();
+
+	public long getTimeLeftBeforeTransactionTimeout() throws RollbackException;
+
+	public Xid getCurrentXid() throws SystemException;
+	
+	public void setOffline(boolean offline);
+
+	public XAResource generateProxyXAResource(Integer localServerName, Integer remoteServerName);
+
+	public Synchronization generateProxySynchronization(Integer localServerName, Integer remoteServerName, Xid toRegisterAgainst);
+
+	public boolean importTransaction(int remainingTimeout, Xid toImport) throws XAException, InvalidTransactionException, IllegalStateException,
+			SystemException;
+
+}

Added: labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/tests/classes/com/arjuna/ats/jta/distributed/RemoteServer.java
===================================================================
--- labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/tests/classes/com/arjuna/ats/jta/distributed/RemoteServer.java	                        (rev 0)
+++ labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/tests/classes/com/arjuna/ats/jta/distributed/RemoteServer.java	2011-10-15 11:06:44 UTC (rev 37552)
@@ -0,0 +1,29 @@
+package com.arjuna.ats.jta.distributed;
+
+import java.net.ConnectException;
+import java.util.List;
+
+import javax.transaction.HeuristicCommitException;
+import javax.transaction.HeuristicMixedException;
+import javax.transaction.HeuristicRollbackException;
+import javax.transaction.SystemException;
+import javax.transaction.xa.XAException;
+import javax.transaction.xa.Xid;
+
+public interface RemoteServer {
+
+	public int propagatePrepare(Xid xid) throws XAException, ConnectException;
+
+	public void propagateCommit(Xid xid, boolean onePhase) throws IllegalStateException, HeuristicMixedException, HeuristicRollbackException,
+			HeuristicCommitException, SystemException, XAException, ConnectException;
+
+	public void propagateRollback(Xid xid) throws IllegalStateException, HeuristicMixedException, HeuristicCommitException, HeuristicRollbackException,
+			SystemException, XAException, ConnectException;
+
+	public Xid[] propagateRecover(List<Integer> startScanned, int flag) throws XAException, ConnectException;
+
+	public void propagateForget(Xid xid) throws XAException, ConnectException;
+
+	public void propagateBeforeCompletion(Xid xid) throws XAException, SystemException, ConnectException;
+
+}

Deleted: labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/tests/classes/com/arjuna/ats/jta/distributed/Server.java
===================================================================
--- labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/tests/classes/com/arjuna/ats/jta/distributed/Server.java	2011-10-15 08:34:44 UTC (rev 37551)
+++ labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/tests/classes/com/arjuna/ats/jta/distributed/Server.java	2011-10-15 11:06:44 UTC (rev 37552)
@@ -1,55 +0,0 @@
-package com.arjuna.ats.jta.distributed;
-
-import java.io.IOException;
-import java.util.List;
-
-import javax.transaction.HeuristicCommitException;
-import javax.transaction.HeuristicMixedException;
-import javax.transaction.HeuristicRollbackException;
-import javax.transaction.InvalidTransactionException;
-import javax.transaction.NotSupportedException;
-import javax.transaction.RollbackException;
-import javax.transaction.Synchronization;
-import javax.transaction.SystemException;
-import javax.transaction.TransactionManager;
-import javax.transaction.xa.XAException;
-import javax.transaction.xa.XAResource;
-import javax.transaction.xa.Xid;
-
-import com.arjuna.ats.arjuna.common.CoreEnvironmentBeanException;
-
-public interface Server {
-
-	public void initialise(int nodeName) throws CoreEnvironmentBeanException, IOException;
-
-	public int getNodeName();
-
-	public TransactionManager getTransactionManager() throws NotSupportedException, SystemException;
-
-	public boolean importTransaction(int remainingTimeout, Xid toMigrate) throws XAException, InvalidTransactionException, IllegalStateException,
-			SystemException;
-
-	public void propagateCommit(Xid xid, boolean onePhase) throws IllegalStateException, HeuristicMixedException, HeuristicRollbackException,
-			HeuristicCommitException, SystemException, XAException;
-
-	public int propagatePrepare(Xid xid) throws XAException;
-
-	public void propagateRollback(Xid xid) throws IllegalStateException, HeuristicMixedException, HeuristicCommitException, HeuristicRollbackException,
-			SystemException, XAException;
-
-	public void propagateForget(Xid xid) throws XAException;
-
-	public void doRecoveryManagerScan();
-
-	public Xid[] propagateRecover(List<Integer> startScanned, int flag) throws XAException;
-
-	public long getTimeLeftBeforeTransactionTimeout() throws RollbackException;
-
-	public void propagateBeforeCompletion(Xid xid) throws XAException, SystemException;
-
-	public Xid getCurrentXid() throws SystemException;
-
-	public XAResource generateProxyXAResource(int currentNodeName, int nextNodeName);
-
-	public Synchronization generateProxySynchronization(int serverId, int serverIdToProxyTo, Xid toRegisterAgainst);
-}

Modified: labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/tests/classes/com/arjuna/ats/jta/distributed/SimpleIsolatedServers.java
===================================================================
--- labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/tests/classes/com/arjuna/ats/jta/distributed/SimpleIsolatedServers.java	2011-10-15 08:34:44 UTC (rev 37551)
+++ labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/tests/classes/com/arjuna/ats/jta/distributed/SimpleIsolatedServers.java	2011-10-15 11:06:44 UTC (rev 37552)
@@ -4,6 +4,9 @@
 import java.io.IOException;
 import java.lang.reflect.InvocationHandler;
 import java.lang.reflect.Method;
+import java.util.Arrays;
+import java.util.LinkedList;
+import java.util.List;
 
 import javax.transaction.HeuristicMixedException;
 import javax.transaction.HeuristicRollbackException;
@@ -23,7 +26,8 @@
 import com.arjuna.ats.arjuna.common.CoreEnvironmentBeanException;
 
 public class SimpleIsolatedServers {
-	private static Server[] servers = new Server[3];
+	private static LocalServer[] localServers = new LocalServer[3];
+	private static RemoteServer[] remoteServers = new RemoteServer[3];
 
 	@BeforeClass
 	public static void setup() throws SecurityException, NoSuchMethodException, InstantiationException, IllegalAccessException, ClassNotFoundException,
@@ -31,7 +35,7 @@
 
 		// Get the Server interface loaded, only way I found to do this was
 		// instantiate one
-		java.lang.reflect.Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(), new Class[] { Server.class },
+		java.lang.reflect.Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(), new Class[] { LocalServer.class, RemoteServer.class },
 				new InvocationHandler() {
 
 					@Override
@@ -42,16 +46,17 @@
 				});
 
 		ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
-		for (int i = 0; i < getServers().length; i++) {
+		for (int i = 0; i < localServers.length; i++) {
 			IsolatableServersClassLoader classLoader = new IsolatableServersClassLoader(contextClassLoader);
-			getServers()[i] = (Server) classLoader.loadClass("com.arjuna.ats.jta.distributed.impl.ServerImpl").newInstance();
-			getServers()[i].initialise((i + 1) * 1000);
+			localServers[i] = (LocalServer) classLoader.loadClass("com.arjuna.ats.jta.distributed.impl.ServerImpl").newInstance();
+			localServers[i].initialise((i + 1) * 1000);
+			remoteServers[i] = (RemoteServer) localServers[i];
 		}
 	}
 
 	@Test
 	public void testRecovery() {
-		getServers()[0].doRecoveryManagerScan();
+		localServers[0].doRecoveryManagerScan();
 	}
 
 	@Test
@@ -65,8 +70,8 @@
 		int startingTimeout = 0;
 
 		// Start out at the first server
-		Server originalServer = getServers()[0];
-		TransactionManager transactionManager = getServers()[0].getTransactionManager();
+		LocalServer originalServer = getLocalServer(1000);
+		TransactionManager transactionManager = originalServer.getTransactionManager();
 		transactionManager.setTransactionTimeout(startingTimeout);
 		transactionManager.begin();
 		Transaction originalTransaction = transactionManager.getTransaction();
@@ -77,27 +82,28 @@
 		// Loop through the rest of the servers passing the transaction up and
 		// down
 		Transaction suspendedTransaction = originalServer.getTransactionManager().suspend();
-		boolean proxyRequired = recursivelyFlowTransaction(0, 1, toMigrate);
+		long timeLeftBeforeTransactionTimeout = originalServer.getTimeLeftBeforeTransactionTimeout();
+		List<Integer> nodesToFlowTo = new LinkedList<Integer>(Arrays.asList(new Integer[] { 2000, 3000, 2000, 1000, 2000, 3000, 1000, 3000 }));
+		boolean proxyRequired = recursivelyFlowTransaction(nodesToFlowTo, timeLeftBeforeTransactionTimeout, toMigrate);
 		originalServer.getTransactionManager().resume(suspendedTransaction);
 		if (proxyRequired) {
-			XAResource proxyXAResource = originalServer.generateProxyXAResource(originalServer.getNodeName(), getServers()[1].getNodeName());
+			XAResource proxyXAResource = originalServer.generateProxyXAResource(originalServer.getNodeName(), 2000);
 			originalTransaction.enlistResource(proxyXAResource);
-			originalTransaction.registerSynchronization(originalServer.generateProxySynchronization(originalServer.getNodeName(),
-					getServers()[1].getNodeName(), toMigrate));
+			originalTransaction.registerSynchronization(originalServer.generateProxySynchronization(originalServer.getNodeName(), 2000, toMigrate));
 		}
 
 		Transaction transaction = transactionManager.getTransaction();
 		transaction.commit();
 	}
 
-	private boolean recursivelyFlowTransaction(int previousServerIndex, int currentServerIndex, Xid toMigrate) throws RollbackException,
+	private boolean recursivelyFlowTransaction(List<Integer> nodesToFlowTo, long timeLeftBeforeTransactionTimeout, Xid toMigrate) throws RollbackException,
 			InvalidTransactionException, IllegalStateException, XAException, SystemException, NotSupportedException {
 
-		Server previousServer = getServers()[previousServerIndex];
-		Server currentServer = getServers()[currentServerIndex];
+		Integer currentServerName = nodesToFlowTo.remove(0);
+		LocalServer currentServer = getLocalServer(currentServerName);
 
 		// Migrate the transaction to the next server
-		int remainingTimeout = (int) (previousServer.getTimeLeftBeforeTransactionTimeout() / 1000);
+		int remainingTimeout = (int) (timeLeftBeforeTransactionTimeout / 1000);
 
 		boolean requiresProxyAtPreviousServer = !currentServer.importTransaction(remainingTimeout, toMigrate);
 		// Perform work on the migrated transaction
@@ -106,38 +112,44 @@
 		transaction.registerSynchronization(new TestSynchronization(currentServer.getNodeName()));
 		transaction.enlistResource(new TestResource(currentServer.getNodeName(), false));
 
-		int nextNextServerIndex = -1;
-		if (currentServerIndex > previousServerIndex && currentServerIndex + 1 != getServers().length) {
-			// Ascending
-			nextNextServerIndex = currentServerIndex + 1;
-		} else {
-			// Descending
-			nextNextServerIndex = currentServerIndex - 1;
-		}
+		if (!nodesToFlowTo.isEmpty()) {
+			Integer nextServerNodeName = nodesToFlowTo.get(0);
 
-		// THE WORKHORSE OF FLOWING A TRANSACTION
-		// SUSPEND THE TRANSACTION
-		Transaction suspendedTransaction = currentServer.getTransactionManager().suspend();
-		boolean proxyRequired = false;
-		if (nextNextServerIndex != -1) {
+			// SUSPEND THE TRANSACTION
+			Transaction suspendedTransaction = currentServer.getTransactionManager().suspend();
 			// FLOW THE TRANSACTION
-			proxyRequired = recursivelyFlowTransaction(currentServerIndex, nextNextServerIndex, toMigrate);
+			timeLeftBeforeTransactionTimeout = currentServer.getTimeLeftBeforeTransactionTimeout();
+			boolean proxyRequired = recursivelyFlowTransaction(nodesToFlowTo, timeLeftBeforeTransactionTimeout, toMigrate);
+			// Create a proxy for the new server if necessary, this can orphan
+			// the
+			// remote server but XA recovery will handle that on the remote
+			// server
+			// The alternative is to always create a proxy but this is a
+			// performance
+			// drain and will result in multiple subordinate transactions and
+			// performance
+			// issues
+			// RESUME THE TRANSACTION IN CASE THERE IS MORE WORK
+			currentServer.getTransactionManager().resume(suspendedTransaction);
+			if (proxyRequired) {
+				XAResource proxyXAResource = currentServer.generateProxyXAResource(currentServer.getNodeName(), nextServerNodeName);
+				suspendedTransaction.enlistResource(proxyXAResource);
+				suspendedTransaction.registerSynchronization(currentServer.generateProxySynchronization(currentServer.getNodeName(), nextServerNodeName,
+						toMigrate));
+			}
 		}
-		// RESUME THE TRANSACTION IN CASE THERE IS MORE WORK
-		currentServer.getTransactionManager().resume(suspendedTransaction);
-		// Create a proxy for the new server if necessary
-		if (proxyRequired) {
-			XAResource proxyXAResource = currentServer.generateProxyXAResource(currentServer.getNodeName(), getServers()[nextNextServerIndex].getNodeName());
-			suspendedTransaction.enlistResource(proxyXAResource);
-			suspendedTransaction.registerSynchronization(currentServer.generateProxySynchronization(currentServer.getNodeName(),
-					getServers()[nextNextServerIndex].getNodeName(), toMigrate));
-		}
 		// SUSPEND THE TRANSACTION WHEN YOU ARE READY TO RETURN TO YOUR CALLER
-		suspendedTransaction = currentServer.getTransactionManager().suspend();
+		currentServer.getTransactionManager().suspend();
 		return requiresProxyAtPreviousServer;
 	}
 
-	public static Server[] getServers() {
-		return servers;
+	public static RemoteServer lookup(Integer jndiName) {
+		int index = (jndiName / 1000) - 1;
+		return remoteServers[index];
 	}
+
+	private static LocalServer getLocalServer(Integer jndiName) {
+		int index = (jndiName / 1000) - 1;
+		return localServers[index];
+	}
 }

Modified: labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/tests/classes/com/arjuna/ats/jta/distributed/impl/ProxySynchronization.java
===================================================================
--- labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/tests/classes/com/arjuna/ats/jta/distributed/impl/ProxySynchronization.java	2011-10-15 08:34:44 UTC (rev 37551)
+++ labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/tests/classes/com/arjuna/ats/jta/distributed/impl/ProxySynchronization.java	2011-10-15 11:06:44 UTC (rev 37552)
@@ -1,5 +1,7 @@
 package com.arjuna.ats.jta.distributed.impl;
 
+import java.net.ConnectException;
+
 import javax.transaction.Synchronization;
 import javax.transaction.SystemException;
 import javax.transaction.xa.XAException;
@@ -9,26 +11,27 @@
 
 public class ProxySynchronization implements Synchronization {
 
-	private int serverId;
-	private int serverIdToProxyTo;
+	private int localServerName;
+	private int remoteServerName;
 	private Xid toRegisterAgainst;
 
-	public ProxySynchronization(int serverId, int serverIdToProxyTo, Xid toRegisterAgainst) {
-		this.serverId = serverId;
-		this.serverIdToProxyTo = serverIdToProxyTo;
+	public ProxySynchronization(int localServerName, int remoteServerName, Xid toRegisterAgainst) {
+		this.localServerName = localServerName;
+		this.remoteServerName = remoteServerName;
 		this.toRegisterAgainst = toRegisterAgainst;
 	}
 
 	@Override
 	public void beforeCompletion() {
-		System.out.println("ProxySynchronization (" + serverId + ":" + serverIdToProxyTo + ") beforeCompletion");
-		int index = (serverIdToProxyTo / 1000) - 1;
+		System.out.println("ProxySynchronization (" + localServerName + ":" + remoteServerName + ") beforeCompletion");
 		try {
-			SimpleIsolatedServers.getServers()[index].propagateBeforeCompletion(toRegisterAgainst);
+			SimpleIsolatedServers.lookup(remoteServerName).propagateBeforeCompletion(toRegisterAgainst);
 		} catch (XAException e) {
 			e.printStackTrace();
 		} catch (SystemException e) {
 			e.printStackTrace();
+		} catch (ConnectException e) {
+			e.printStackTrace();
 		}
 	}
 

Modified: labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/tests/classes/com/arjuna/ats/jta/distributed/impl/ProxyXAResource.java
===================================================================
--- labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/tests/classes/com/arjuna/ats/jta/distributed/impl/ProxyXAResource.java	2011-10-15 08:34:44 UTC (rev 37551)
+++ labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/tests/classes/com/arjuna/ats/jta/distributed/impl/ProxyXAResource.java	2011-10-15 11:06:44 UTC (rev 37552)
@@ -6,6 +6,7 @@
 import java.io.FileInputStream;
 import java.io.FileOutputStream;
 import java.io.IOException;
+import java.net.ConnectException;
 import java.util.ArrayList;
 import java.util.List;
 
@@ -20,7 +21,6 @@
 import org.jboss.tm.XAResourceWrapper;
 
 import com.arjuna.ats.arjuna.common.Uid;
-import com.arjuna.ats.jta.distributed.Server;
 import com.arjuna.ats.jta.distributed.SimpleIsolatedServers;
 
 public class ProxyXAResource implements XAResource, XAResourceWrapper {
@@ -29,17 +29,17 @@
 
 	private int transactionTimeout;
 	private Xid xid;
-	private int serverIdToProxyTo = -1;
+	private Integer remoteServerName = -1;
 	private File file;
-	private Integer serverId;
+	private Integer localServerName;
 
-	public ProxyXAResource(int serverId, int serverIdToProxyTo) {
-		this.serverId = serverId;
-		this.serverIdToProxyTo = serverIdToProxyTo;
+	public ProxyXAResource(Integer localServerName, Integer remoteServerName) {
+		this.localServerName = localServerName;
+		this.remoteServerName = remoteServerName;
 	}
 
-	public ProxyXAResource(int recoverFor, File file) throws IOException {
-		this.serverId = recoverFor;
+	public ProxyXAResource(Integer localServerName, File file) throws IOException {
+		this.localServerName = localServerName;
 		this.file = file;
 		DataInputStream fis = new DataInputStream(new FileInputStream(file));
 		final int formatId = fis.readInt();
@@ -49,8 +49,8 @@
 		final int bqual_length = fis.readInt();
 		final byte[] bqual = new byte[bqual_length];
 		fis.read(bqual, 0, bqual_length);
-		int serverIdToProxyTo = fis.readInt();
-		this.serverIdToProxyTo = serverIdToProxyTo;
+		int remoteServerName = fis.readInt();
+		this.remoteServerName = remoteServerName;
 		this.xid = new Xid() {
 			@Override
 			public byte[] getGlobalTransactionId() {
@@ -111,22 +111,22 @@
 
 	@Override
 	public void start(Xid xid, int flags) throws XAException {
-		System.out.println("     ProxyXAResource (" + serverId + ":" + serverIdToProxyTo + ") XA_START   [" + xid + "]");
+		System.out.println("     ProxyXAResource (" + localServerName + ":" + remoteServerName + ") XA_START   [" + xid + "]");
 		this.xid = xid;
 	}
 
 	@Override
 	public void end(Xid xid, int flags) throws XAException {
-		System.out.println("     ProxyXAResource (" + serverId + ":" + serverIdToProxyTo + ") XA_END     [" + xid + "]");
+		System.out.println("     ProxyXAResource (" + localServerName + ":" + remoteServerName + ") XA_END     [" + xid + "]");
 		this.xid = null;
 	}
 
 	@Override
 	public synchronized int prepare(Xid xid) throws XAException {
-		System.out.println("     ProxyXAResource (" + serverId + ":" + serverIdToProxyTo + ") XA_PREPARE [" + xid + "]");
+		System.out.println("     ProxyXAResource (" + localServerName + ":" + remoteServerName + ") XA_PREPARE [" + xid + "]");
 
 		try {
-			File dir = new File(System.getProperty("user.dir") + "/tmp/ProxyXAResource/" + serverId + "/");
+			File dir = new File(System.getProperty("user.dir") + "/tmp/ProxyXAResource/" + localServerName + "/");
 			dir.mkdirs();
 			file = new File(dir, new Uid().fileStringForm());
 
@@ -144,24 +144,28 @@
 			fos.write(gtrid, 0, gtrid_length);
 			fos.writeInt(bqual_length);
 			fos.write(bqual, 0, bqual_length);
-			fos.writeInt(serverIdToProxyTo);
+			fos.writeInt(remoteServerName);
 		} catch (IOException e) {
 			e.printStackTrace();
 			throw new XAException(XAException.XAER_RMERR);
 		}
 
-		int propagatePrepare = getServerToProxyTo().propagatePrepare(xid);
-		System.out.println("     ProxyXAResource (" + serverId + ":" + serverIdToProxyTo + ") XA_PREPARED");
-		return propagatePrepare;
+		try {
+			int propagatePrepare = SimpleIsolatedServers.lookup(remoteServerName).propagatePrepare(xid);
+			System.out.println("     ProxyXAResource (" + localServerName + ":" + remoteServerName + ") XA_PREPARED");
+			return propagatePrepare;
+		} catch (ConnectException ce) {
+			throw new XAException(XAException.XA_RETRY);
+		}
 	}
 
 	@Override
 	public synchronized void commit(Xid xid, boolean onePhase) throws XAException {
-		System.out.println("     ProxyXAResource (" + serverId + ":" + serverIdToProxyTo + ") XA_COMMIT  [" + xid + "]");
+		System.out.println("     ProxyXAResource (" + localServerName + ":" + remoteServerName + ") XA_COMMIT  [" + xid + "]");
 
 		try {
-			getServerToProxyTo().propagateCommit(xid, onePhase);
-			System.out.println("     ProxyXAResource (" + serverId + ":" + serverIdToProxyTo + ") XA_COMMITED");
+			SimpleIsolatedServers.lookup(remoteServerName).propagateCommit(xid, onePhase);
+			System.out.println("     ProxyXAResource (" + localServerName + ":" + remoteServerName + ") XA_COMMITED");
 		} catch (IllegalStateException e) {
 			throw new XAException(XAException.XAER_INVAL);
 		} catch (HeuristicMixedException e) {
@@ -172,6 +176,8 @@
 			throw new XAException(XAException.XA_HEURCOM);
 		} catch (SystemException e) {
 			throw new XAException(XAException.XAER_PROTO);
+		} catch (ConnectException ce) {
+			throw new XAException(XAException.XA_RETRY);
 		}
 
 		if (file != null) {
@@ -181,10 +187,10 @@
 
 	@Override
 	public synchronized void rollback(Xid xid) throws XAException {
-		System.out.println("     ProxyXAResource (" + serverId + ":" + serverIdToProxyTo + ") XA_ROLLBACK[" + xid + "]");
+		System.out.println("     ProxyXAResource (" + localServerName + ":" + remoteServerName + ") XA_ROLLBACK[" + xid + "]");
 		try {
-			getServerToProxyTo().propagateRollback(xid);
-			System.out.println("     ProxyXAResource (" + serverId + ":" + serverIdToProxyTo + ") XA_ROLLBACKED");
+			SimpleIsolatedServers.lookup(remoteServerName).propagateRollback(xid);
+			System.out.println("     ProxyXAResource (" + localServerName + ":" + remoteServerName + ") XA_ROLLBACKED");
 		} catch (IllegalStateException e) {
 			throw new XAException(XAException.XAER_INVAL);
 		} catch (HeuristicMixedException e) {
@@ -195,6 +201,8 @@
 			throw new XAException(XAException.XA_HEURRB);
 		} catch (SystemException e) {
 			throw new XAException(XAException.XAER_PROTO);
+		} catch (ConnectException ce) {
+			throw new XAException(XAException.XA_RETRY);
 		}
 
 		if (file != null) {
@@ -212,28 +220,43 @@
 
 		int tocheck = (flag & XAResource.TMSTARTRSCAN);
 		if (tocheck == XAResource.TMSTARTRSCAN) {
-			System.out.println("     ProxyXAResource (" + serverId + ":" + serverIdToProxyTo + ") XA_RECOVER [XAResource.TMSTARTRSCAN]: " + serverIdToProxyTo);
+			System.out.println("     ProxyXAResource (" + localServerName + ":" + remoteServerName + ") XA_RECOVER [XAResource.TMSTARTRSCAN]: "
+					+ remoteServerName);
 
-			if (!startScanned.contains(serverIdToProxyTo)) {
-				startScanned.add(serverIdToProxyTo);
+			if (!startScanned.contains(remoteServerName)) {
+				startScanned.add(remoteServerName);
 
 				// Make sure that the remote server has recovered all
 				// transactions
-				getServerToProxyTo().propagateRecover(startScanned, flag);
-				startScanned.remove((Integer) serverIdToProxyTo);
+				try {
+					SimpleIsolatedServers.lookup(remoteServerName).propagateRecover(startScanned, flag);
+				} catch (ConnectException ce) {
+					throw new XAException(XAException.XA_RETRY);
+				} finally {
+					startScanned.remove((Integer) remoteServerName);
+				}
 			}
 
-			System.out.println("     ProxyXAResource (" + serverId + ":" + serverIdToProxyTo + ") XA_RECOVERD[XAResource.TMSTARTRSCAN]: " + serverIdToProxyTo);
+			System.out.println("     ProxyXAResource (" + localServerName + ":" + remoteServerName + ") XA_RECOVERD[XAResource.TMSTARTRSCAN]: "
+					+ remoteServerName);
 		}
 		tocheck = (flag & XAResource.TMENDRSCAN);
 		if (tocheck == XAResource.TMENDRSCAN) {
-			System.out.println("     ProxyXAResource (" + serverId + ":" + serverIdToProxyTo + ") XA_RECOVER [XAResource.TMENDRSCAN]: " + serverIdToProxyTo);
+			System.out.println("     ProxyXAResource (" + localServerName + ":" + remoteServerName + ") XA_RECOVER [XAResource.TMENDRSCAN]: "
+					+ remoteServerName);
 
-			if (!startScanned.contains(serverIdToProxyTo)) {
-				getServerToProxyTo().propagateRecover(startScanned, flag);
+			if (!startScanned.contains(remoteServerName)) {
+				try {
+					SimpleIsolatedServers.lookup(remoteServerName).propagateRecover(startScanned, flag);
+				} catch (ConnectException ce) {
+					throw new XAException(XAException.XA_RETRY);
+				} finally {
+					startScanned.remove((Integer) remoteServerName);
+				}
 			}
 
-			System.out.println("     ProxyXAResource (" + serverId + ":" + serverIdToProxyTo + ") XA_RECOVERD[XAResource.TMENDRSCAN]: " + serverIdToProxyTo);
+			System.out.println("     ProxyXAResource (" + localServerName + ":" + remoteServerName + ") XA_RECOVERD[XAResource.TMENDRSCAN]: "
+					+ remoteServerName);
 		}
 
 		return new Xid[] { xid };
@@ -241,9 +264,13 @@
 
 	@Override
 	public void forget(Xid xid) throws XAException {
-		System.out.println("     ProxyXAResource (" + serverId + ":" + serverIdToProxyTo + ") XA_FORGET  [" + xid + "]");
-		getServerToProxyTo().propagateForget(xid);
-		System.out.println("     ProxyXAResource (" + serverId + ":" + serverIdToProxyTo + ") XA_FORGETED[" + xid + "]");
+		System.out.println("     ProxyXAResource (" + localServerName + ":" + remoteServerName + ") XA_FORGET  [" + xid + "]");
+		try {
+			SimpleIsolatedServers.lookup(remoteServerName).propagateForget(xid);
+			System.out.println("     ProxyXAResource (" + localServerName + ":" + remoteServerName + ") XA_FORGETED[" + xid + "]");
+		} catch (ConnectException ce) {
+			throw new XAException(XAException.XA_RETRY);
+		}
 	}
 
 	@Override
@@ -290,9 +317,4 @@
 	public String getJndiName() {
 		return "ProxyXAResource";
 	}
-
-	private Server getServerToProxyTo() {
-		int index = (serverIdToProxyTo / 1000) - 1;
-		return SimpleIsolatedServers.getServers()[index];
-	}
 }

Modified: labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/tests/classes/com/arjuna/ats/jta/distributed/impl/ServerImpl.java
===================================================================
--- labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/tests/classes/com/arjuna/ats/jta/distributed/impl/ServerImpl.java	2011-10-15 08:34:44 UTC (rev 37551)
+++ labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/tests/classes/com/arjuna/ats/jta/distributed/impl/ServerImpl.java	2011-10-15 11:06:44 UTC (rev 37552)
@@ -1,6 +1,7 @@
 package com.arjuna.ats.jta.distributed.impl;
 
 import java.io.IOException;
+import java.net.ConnectException;
 import java.net.InetAddress;
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -26,7 +27,6 @@
 import com.arjuna.ats.arjuna.common.CoreEnvironmentBeanException;
 import com.arjuna.ats.arjuna.common.ObjectStoreEnvironmentBean;
 import com.arjuna.ats.arjuna.common.RecoveryEnvironmentBean;
-import com.arjuna.ats.arjuna.coordinator.TransactionReaper;
 import com.arjuna.ats.arjuna.coordinator.TxControl;
 import com.arjuna.ats.arjuna.recovery.RecoveryManager;
 import com.arjuna.ats.arjuna.tools.osb.mbean.ObjStoreBrowser;
@@ -37,25 +37,27 @@
 import com.arjuna.ats.jbossatx.jta.RecoveryManagerService;
 import com.arjuna.ats.jbossatx.jta.TransactionManagerService;
 import com.arjuna.ats.jta.common.JTAEnvironmentBean;
-import com.arjuna.ats.jta.distributed.Server;
+import com.arjuna.ats.jta.distributed.RemoteServer;
+import com.arjuna.ats.jta.distributed.LocalServer;
 import com.arjuna.ats.jta.distributed.TestResourceRecovery;
 
-public class ServerImpl implements Server {
+public class ServerImpl implements LocalServer, RemoteServer {
 
-	private int id;
+	private int nodeName;
 	private RecoveryManagerService recoveryManagerService;
 	private TransactionManagerService transactionManagerService;
+	private boolean offline;
 
-	public void initialise(int id) throws CoreEnvironmentBeanException, IOException {
-		this.id = id;
+	public void initialise(Integer serverName) throws CoreEnvironmentBeanException, IOException {
+		this.nodeName = serverName;
 
 		RecoveryEnvironmentBean recoveryEnvironmentBean = com.arjuna.ats.arjuna.common.recoveryPropertyManager.getRecoveryEnvironmentBean();
 		recoveryEnvironmentBean.setRecoveryBackoffPeriod(1);
 
 		recoveryEnvironmentBean.setRecoveryInetAddress(InetAddress.getByName("localhost"));
-		recoveryEnvironmentBean.setRecoveryPort(4712 + id);
+		recoveryEnvironmentBean.setRecoveryPort(4712 + serverName);
 		recoveryEnvironmentBean.setTransactionStatusManagerInetAddress(InetAddress.getByName("localhost"));
-		recoveryEnvironmentBean.setTransactionStatusManagerPort(4713 + id);
+		recoveryEnvironmentBean.setTransactionStatusManagerPort(4713 + serverName);
 		List<String> recoveryModuleClassNames = new ArrayList<String>();
 
 		recoveryModuleClassNames.add("com.arjuna.ats.internal.arjuna.recovery.AtomicActionRecoveryModule");
@@ -68,26 +70,27 @@
 		recoveryEnvironmentBean.setRecoveryActivators(null);
 
 		CoreEnvironmentBean coreEnvironmentBean = com.arjuna.ats.arjuna.common.arjPropertyManager.getCoreEnvironmentBean();
-		coreEnvironmentBean.setSocketProcessIdPort(4714 + id);
-		coreEnvironmentBean.setNodeIdentifier(id);
+		coreEnvironmentBean.setSocketProcessIdPort(4714 + serverName);
+		coreEnvironmentBean.setNodeIdentifier(serverName);
 		coreEnvironmentBean.setSocketProcessIdMaxPorts(1);
 
 		CoordinatorEnvironmentBean coordinatorEnvironmentBean = com.arjuna.ats.arjuna.common.arjPropertyManager.getCoordinatorEnvironmentBean();
 		coordinatorEnvironmentBean.setEnableStatistics(false);
 		coordinatorEnvironmentBean.setDefaultTimeout(300);
 		coordinatorEnvironmentBean.setTransactionStatusManagerEnable(false);
+		coordinatorEnvironmentBean.setDefaultTimeout(0);
 
 		ObjectStoreEnvironmentBean actionStoreObjectStoreEnvironmentBean = com.arjuna.common.internal.util.propertyservice.BeanPopulator.getNamedInstance(
 				com.arjuna.ats.arjuna.common.ObjectStoreEnvironmentBean.class, "default");
-		actionStoreObjectStoreEnvironmentBean.setObjectStoreDir(System.getProperty("user.dir") + "/tmp/tx-object-store/" + id);
+		actionStoreObjectStoreEnvironmentBean.setObjectStoreDir(System.getProperty("user.dir") + "/tmp/tx-object-store/" + serverName);
 
 		ObjectStoreEnvironmentBean stateStoreObjectStoreEnvironmentBean = com.arjuna.common.internal.util.propertyservice.BeanPopulator.getNamedInstance(
 				com.arjuna.ats.arjuna.common.ObjectStoreEnvironmentBean.class, "stateStore");
-		stateStoreObjectStoreEnvironmentBean.setObjectStoreDir(System.getProperty("user.dir") + "/tmp/tx-object-store/" + id);
+		stateStoreObjectStoreEnvironmentBean.setObjectStoreDir(System.getProperty("user.dir") + "/tmp/tx-object-store/" + serverName);
 
 		ObjectStoreEnvironmentBean communicationStoreObjectStoreEnvironmentBean = com.arjuna.common.internal.util.propertyservice.BeanPopulator
 				.getNamedInstance(com.arjuna.ats.arjuna.common.ObjectStoreEnvironmentBean.class, "communicationStore");
-		communicationStoreObjectStoreEnvironmentBean.setObjectStoreDir(System.getProperty("user.dir") + "/tmp/tx-object-store/" + id);
+		communicationStoreObjectStoreEnvironmentBean.setObjectStoreDir(System.getProperty("user.dir") + "/tmp/tx-object-store/" + serverName);
 
 		ObjStoreBrowser objStoreBrowser = new ObjStoreBrowser();
 		Map<String, String> types = new HashMap<String, String>();
@@ -101,7 +104,7 @@
 		jTAEnvironmentBean
 				.setTransactionSynchronizationRegistryClassName("com.arjuna.ats.internal.jta.transaction.arjunacore.TransactionSynchronizationRegistryImple");
 		List<Integer> xaRecoveryNodes = new ArrayList<Integer>();
-		xaRecoveryNodes.add(id);
+		xaRecoveryNodes.add(serverName);
 		jTAEnvironmentBean.setXaRecoveryNodes(xaRecoveryNodes);
 
 		List<String> xaResourceOrphanFilterClassNames = new ArrayList<String>();
@@ -114,29 +117,26 @@
 
 		recoveryManagerService = new RecoveryManagerService();
 		recoveryManagerService.create();
-		recoveryManagerService.addXAResourceRecovery(new ProxyXAResourceRecovery(id));
-		recoveryManagerService.addXAResourceRecovery(new TestResourceRecovery(id));
+		recoveryManagerService.addXAResourceRecovery(new ProxyXAResourceRecovery(serverName));
+		recoveryManagerService.addXAResourceRecovery(new TestResourceRecovery(serverName));
 		// recoveryManagerService.start();
 		RecoveryManager.manager().initialize();
 
 		transactionManagerService = new TransactionManagerService();
 		TxControl txControl = new com.arjuna.ats.arjuna.coordinator.TxControl();
-		txControl.setDefaultTimeout(0);
 		transactionManagerService.setJbossXATerminator(new com.arjuna.ats.internal.jbossatx.jta.jca.XATerminator());
 		transactionManagerService
 				.setTransactionSynchronizationRegistry(new com.arjuna.ats.internal.jta.transaction.arjunacore.TransactionSynchronizationRegistryImple());
-		// starts the transaction reaper transactionManagerService.create();
+		transactionManagerService.create();
 
 	}
 
+	@Override
 	public void doRecoveryManagerScan() {
 		RecoveryManager.manager().scan();
 	}
 
-	public void startTransactionReaper() {
-		TransactionReaper.transactionReaper();
-	}
-
+	@Override
 	public TransactionManager getTransactionManager() {
 		return transactionManagerService.getTransactionManager();
 	}
@@ -155,64 +155,87 @@
 	}
 
 	@Override
-	public int propagatePrepare(Xid xid) throws XAException {
-		return SubordinationManager.getTransactionImporter().getImportedTransaction(xid).doPrepare();
+	public Integer getNodeName() {
+		return nodeName;
 	}
 
 	@Override
-	public void propagateCommit(Xid xid, boolean onePhase) throws IllegalStateException, HeuristicMixedException, HeuristicRollbackException,
-			HeuristicCommitException, SystemException, XAException {
-		SubordinationManager.getTransactionImporter().getImportedTransaction(xid).doCommit();
+	public long getTimeLeftBeforeTransactionTimeout() throws RollbackException {
+		return ((TransactionTimeoutConfiguration) transactionManagerService.getTransactionManager()).getTimeLeftBeforeTransactionTimeout(false);
 	}
 
 	@Override
-	public void propagateRollback(Xid xid) throws IllegalStateException, HeuristicMixedException, HeuristicCommitException, HeuristicRollbackException,
-			SystemException, XAException {
-		SubordinationManager.getTransactionImporter().getImportedTransaction(xid).doRollback();
+	public Xid getCurrentXid() throws SystemException {
+		TransactionImple transaction = ((TransactionImple) transactionManagerService.getTransactionManager().getTransaction());
+		return transaction.getTxId();
 	}
 
 	@Override
-	public Xid[] propagateRecover(List<Integer> recoveryScanStarted, int flag) throws XAException {
-		// Assumes that this thread is used by the recovery thread
-		ProxyXAResource.RECOVERY_SCAN_STARTED.set(recoveryScanStarted);
-		return SubordinationManager.getXATerminator().recover(flag);
+	public XAResource generateProxyXAResource(Integer localServerName, Integer remoteServerName) {
+		return new ProxyXAResource(localServerName, remoteServerName);
 	}
 
 	@Override
-	public void propagateForget(Xid xid) throws XAException {
-		SubordinationManager.getXATerminator().forget(xid);
+	public Synchronization generateProxySynchronization(Integer localServerName, Integer remoteServerName, Xid toRegisterAgainst) {
+		return new ProxySynchronization(localServerName, remoteServerName, toRegisterAgainst);
+	}
 
+	@Override
+	public int propagatePrepare(Xid xid) throws XAException, ConnectException {
+		if (offline) {
+			throw new ConnectException("Connection refused to: " + nodeName);
+		}
+		return SubordinationManager.getTransactionImporter().getImportedTransaction(xid).doPrepare();
 	}
 
 	@Override
-	public int getNodeName() {
-		return TxControl.getXANodeName();
+	public void propagateCommit(Xid xid, boolean onePhase) throws IllegalStateException, HeuristicMixedException, HeuristicRollbackException,
+			HeuristicCommitException, SystemException, XAException, ConnectException {
+		if (offline) {
+			throw new ConnectException("Connection refused to: " + nodeName);
+		}
+		SubordinationManager.getTransactionImporter().getImportedTransaction(xid).doCommit();
 	}
 
 	@Override
-	public long getTimeLeftBeforeTransactionTimeout() throws RollbackException {
-		return ((TransactionTimeoutConfiguration) transactionManagerService.getTransactionManager()).getTimeLeftBeforeTransactionTimeout(false);
+	public void propagateRollback(Xid xid) throws IllegalStateException, HeuristicMixedException, HeuristicCommitException, HeuristicRollbackException,
+			SystemException, XAException, ConnectException {
+		if (offline) {
+			throw new ConnectException("Connection refused to: " + nodeName);
+		}
+		SubordinationManager.getTransactionImporter().getImportedTransaction(xid).doRollback();
 	}
 
 	@Override
-	public void propagateBeforeCompletion(Xid xid) throws XAException, SystemException {
-		SubordinateTransaction tx = SubordinationManager.getTransactionImporter().getImportedTransaction(xid);
-		tx.doBeforeCompletion();
+	public Xid[] propagateRecover(List<Integer> recoveryScanStarted, int flag) throws XAException, ConnectException {
+		if (offline) {
+			throw new ConnectException("Connection refused to: " + nodeName);
+		}
+		// Assumes that this thread is used by the recovery thread
+		ProxyXAResource.RECOVERY_SCAN_STARTED.set(recoveryScanStarted);
+		return SubordinationManager.getXATerminator().recover(flag);
 	}
 
 	@Override
-	public Xid getCurrentXid() throws SystemException {
-		TransactionImple transaction = ((TransactionImple) transactionManagerService.getTransactionManager().getTransaction());
-		return transaction.getTxId();
+	public void propagateForget(Xid xid) throws XAException, ConnectException {
+		if (offline) {
+			throw new ConnectException("Connection refused to: " + nodeName);
+		}
+		SubordinationManager.getXATerminator().forget(xid);
+
 	}
 
 	@Override
-	public XAResource generateProxyXAResource(int currentNodeName, int nextNodeName) {
-		return new ProxyXAResource(currentNodeName, nextNodeName);
+	public void propagateBeforeCompletion(Xid xid) throws XAException, SystemException, ConnectException {
+		if (offline) {
+			throw new ConnectException("Connection refused to: " + nodeName);
+		}
+		SubordinateTransaction tx = SubordinationManager.getTransactionImporter().getImportedTransaction(xid);
+		tx.doBeforeCompletion();
 	}
 
 	@Override
-	public Synchronization generateProxySynchronization(int serverId, int serverIdToProxyTo, Xid toRegisterAgainst) {
-		return new ProxySynchronization(serverId, serverIdToProxyTo, toRegisterAgainst);
+	public void setOffline(boolean offline) {
+		this.offline = offline;
 	}
 }



More information about the jboss-svn-commits mailing list