[jboss-svn-commits] JBL Code SVN: r37629 - in labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration: examples/classes/com/arjuna/jta/distributed/example/server and 4 other directories.

jboss-svn-commits at lists.jboss.org jboss-svn-commits at lists.jboss.org
Thu Oct 20 05:51:24 EDT 2011


Author: tomjenkinson
Date: 2011-10-20 05:51:23 -0400 (Thu, 20 Oct 2011)
New Revision: 37629

Modified:
   labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/examples/classes/com/arjuna/jta/distributed/example/ExampleDistributedJTATestCase.java
   labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/examples/classes/com/arjuna/jta/distributed/example/server/LocalServer.java
   labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/examples/classes/com/arjuna/jta/distributed/example/server/RemoteServer.java
   labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/examples/classes/com/arjuna/jta/distributed/example/server/impl/ProxyXAResource.java
   labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/examples/classes/com/arjuna/jta/distributed/example/server/impl/ProxyXAResourceRecovery.java
   labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/examples/classes/com/arjuna/jta/distributed/example/server/impl/ServerImpl.java
   labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/tests/classes/com/arjuna/ats/jta/distributed/SimpleIsolatedServers.java
   labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/tests/classes/com/arjuna/ats/jta/distributed/server/LocalServer.java
   labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/tests/classes/com/arjuna/ats/jta/distributed/server/impl/ServerImpl.java
Log:
JBTM-895 Make sure that when we create a proxy we have a record of the current transaction. This is important in case the remote side crashes mid-action, because we then need to be able to roll the transaction back.

Modified: labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/examples/classes/com/arjuna/jta/distributed/example/ExampleDistributedJTATestCase.java
===================================================================
--- labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/examples/classes/com/arjuna/jta/distributed/example/ExampleDistributedJTATestCase.java	2011-10-20 09:26:01 UTC (rev 37628)
+++ labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/examples/classes/com/arjuna/jta/distributed/example/ExampleDistributedJTATestCase.java	2011-10-20 09:51:23 UTC (rev 37629)
@@ -28,7 +28,6 @@
 
 import javax.transaction.HeuristicMixedException;
 import javax.transaction.HeuristicRollbackException;
-import javax.transaction.InvalidTransactionException;
 import javax.transaction.NotSupportedException;
 import javax.transaction.RollbackException;
 import javax.transaction.SystemException;
@@ -68,8 +67,8 @@
 	}
 
 	@Test
-	public void testMigrateTransaction() throws NotSupportedException, SystemException, IllegalStateException, RollbackException, InvalidTransactionException,
-			XAException, SecurityException, HeuristicMixedException, HeuristicRollbackException {
+	public void testMigrateTransaction() throws NotSupportedException, SystemException, IllegalStateException, RollbackException, XAException,
+			SecurityException, HeuristicMixedException, HeuristicRollbackException, IOException {
 
 		int startingTimeout = 0;
 		List<Integer> nodesToFlowTo = new LinkedList<Integer>(Arrays.asList(new Integer[] { 1000, 2000, 3000, 2000, 1000, 2000, 3000, 1000, 3000 }));
@@ -96,30 +95,23 @@
 			// SUSPEND THE TRANSACTION
 			Xid currentXid = originalServer.getCurrentXid();
 			originalServer.storeRootTransaction();
+			XAResource proxyXAResource = originalServer.generateProxyXAResource(lookupProvider, nextServerNodeName);
 			transactionManager.suspend();
-			boolean proxyRequired = performTransactionalWork(nodesToFlowTo, remainingTimeout, currentXid);
+			performTransactionalWork(nodesToFlowTo, remainingTimeout, currentXid);
 			transactionManager.resume(originalTransaction);
 
-			// Create a proxy for the new server if necessary, this can orphan
-			// the remote server but XA recovery will handle that on the remote
-			// server
-			// The alternative is to always create a proxy but this is a
-			// performance drain and will result in multiple subordinate
-			// transactions and performance issues
-			if (proxyRequired) {
-				XAResource proxyXAResource = originalServer.generateProxyXAResource(lookupProvider, originalServer.getNodeName(), nextServerNodeName);
-				originalTransaction.enlistResource(proxyXAResource);
-				originalTransaction.registerSynchronization(originalServer.generateProxySynchronization(lookupProvider, originalServer.getNodeName(),
-						nextServerNodeName, currentXid));
-			}
+			originalTransaction.enlistResource(proxyXAResource);
+			originalTransaction.registerSynchronization(originalServer.generateProxySynchronization(lookupProvider, originalServer.getNodeName(),
+					nextServerNodeName, currentXid));
+
 			originalServer.removeRootTransaction(currentXid);
 		}
 		transactionManager.commit();
 		Thread.currentThread().setContextClassLoader(classLoader);
 	}
 
-	private boolean performTransactionalWork(List<Integer> nodesToFlowTo, int remainingTimeout, Xid toMigrate) throws RollbackException,
-			InvalidTransactionException, IllegalStateException, XAException, SystemException, NotSupportedException {
+	private boolean performTransactionalWork(List<Integer> nodesToFlowTo, int remainingTimeout, Xid toMigrate) throws RollbackException, IllegalStateException,
+			XAException, SystemException, NotSupportedException, IOException {
 		Integer currentServerName = nodesToFlowTo.remove(0);
 		LocalServer currentServer = getLocalServer(currentServerName);
 
@@ -141,15 +133,17 @@
 
 			// SUSPEND THE TRANSACTION
 			Xid currentXid = currentServer.getCurrentXid();
+			XAResource proxyXAResource = currentServer.generateProxyXAResource(lookupProvider, nextServerNodeName);
 			transactionManager.suspend();
 			boolean proxyRequired = performTransactionalWork(nodesToFlowTo, remainingTimeout, currentXid);
 			transactionManager.resume(transaction);
 
 			if (proxyRequired) {
-				XAResource proxyXAResource = currentServer.generateProxyXAResource(lookupProvider, currentServer.getNodeName(), nextServerNodeName);
 				transaction.enlistResource(proxyXAResource);
 				transaction.registerSynchronization(currentServer.generateProxySynchronization(lookupProvider, currentServer.getNodeName(), nextServerNodeName,
 						toMigrate));
+			} else {
+				currentServer.cleanupProxyXAResource(proxyXAResource);
 			}
 		}
 

Modified: labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/examples/classes/com/arjuna/jta/distributed/example/server/LocalServer.java
===================================================================
--- labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/examples/classes/com/arjuna/jta/distributed/example/server/LocalServer.java	2011-10-20 09:26:01 UTC (rev 37628)
+++ labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/examples/classes/com/arjuna/jta/distributed/example/server/LocalServer.java	2011-10-20 09:51:23 UTC (rev 37629)
@@ -21,6 +21,7 @@
  */
 package com.arjuna.jta.distributed.example.server;
 
+import java.io.File;
 import java.io.IOException;
 
 import javax.transaction.InvalidTransactionException;
@@ -52,8 +53,10 @@
 
 	public RemoteServer connectTo();
 
-	public XAResource generateProxyXAResource(LookupProvider lookupProvider, Integer localServerName, Integer remoteServerName);
+	public XAResource generateProxyXAResource(LookupProvider lookupProvider, Integer remoteServerName) throws IOException, SystemException;
 
+	public void cleanupProxyXAResource(XAResource proxyXAResource);
+
 	public Synchronization generateProxySynchronization(LookupProvider lookupProvider, Integer localServerName, Integer remoteServerName, Xid toRegisterAgainst);
 
 	public Xid getCurrentXid() throws SystemException;

Modified: labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/examples/classes/com/arjuna/jta/distributed/example/server/RemoteServer.java
===================================================================
--- labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/examples/classes/com/arjuna/jta/distributed/example/server/RemoteServer.java	2011-10-20 09:26:01 UTC (rev 37628)
+++ labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/examples/classes/com/arjuna/jta/distributed/example/server/RemoteServer.java	2011-10-20 09:51:23 UTC (rev 37629)
@@ -33,7 +33,7 @@
 
 	public void propagateRollback(Xid xid) throws XAException, DummyRemoteException;
 
-	public Xid[] propagateRecover(int formatId, byte[] gtrid) throws XAException, DummyRemoteException;
+	public Xid[] propagateRecover(Integer nodeName) throws XAException, DummyRemoteException;
 
 	public void propagateForget(Xid xid) throws XAException, DummyRemoteException;
 

Modified: labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/examples/classes/com/arjuna/jta/distributed/example/server/impl/ProxyXAResource.java
===================================================================
--- labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/examples/classes/com/arjuna/jta/distributed/example/server/impl/ProxyXAResource.java	2011-10-20 09:26:01 UTC (rev 37628)
+++ labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/examples/classes/com/arjuna/jta/distributed/example/server/impl/ProxyXAResource.java	2011-10-20 09:51:23 UTC (rev 37629)
@@ -21,12 +21,16 @@
  */
 package com.arjuna.jta.distributed.example.server.impl;
 
-import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.File;
-import java.io.FileInputStream;
 import java.io.FileOutputStream;
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
 
 import javax.transaction.xa.XAException;
 import javax.transaction.xa.XAResource;
@@ -46,10 +50,10 @@
 
 	private int transactionTimeout;
 	private Integer remoteServerName = -1;
-	private File file;
+	private Map<Xid, File> map;
 	private Integer localServerName;
 	private LookupProvider lookupProvider;
-	private Xid xid;
+	private File file;
 
 	/**
 	 * Create a new proxy to the remote server.
@@ -58,10 +62,12 @@
 	 * @param localServerName
 	 * @param remoteServerName
 	 */
-	public ProxyXAResource(LookupProvider lookupProvider, Integer localServerName, Integer remoteServerName) {
+	public ProxyXAResource(LookupProvider lookupProvider, Integer localServerName, Integer remoteServerName, File file) {
 		this.lookupProvider = lookupProvider;
 		this.localServerName = localServerName;
 		this.remoteServerName = remoteServerName;
+		this.file = file;
+		map = new HashMap<Xid, File>();
 	}
 
 	/**
@@ -69,38 +75,20 @@
 	 * 
 	 * @param lookupProvider
 	 * @param localServerName
+	 * @param map
+	 * @param remoteServerName
 	 * @param file
 	 * @throws IOException
 	 */
-	public ProxyXAResource(LookupProvider lookupProvider, Integer localServerName, File file) throws IOException {
+	public ProxyXAResource(LookupProvider lookupProvider, Integer localServerName, Integer remoteServerName, Map<Xid, File> map) throws IOException {
 		this.lookupProvider = lookupProvider;
 		this.localServerName = localServerName;
-		this.file = file;
-		DataInputStream fis = new DataInputStream(new FileInputStream(file));
-		this.remoteServerName = fis.readInt();
-		final int formatId = fis.readInt();
-		int gtrid_length = fis.readInt();
-		final byte[] gtrid = new byte[gtrid_length];
-		fis.read(gtrid, 0, gtrid_length);
-		int bqual_length = fis.readInt();
-		final byte[] bqual = new byte[bqual_length];
-		fis.read(bqual, 0, bqual_length);
-		this.xid = new Xid() {
-			@Override
-			public byte[] getBranchQualifier() {
-				return bqual;
-			}
+		this.remoteServerName = remoteServerName;
+		this.map = map;
+	}
 
-			@Override
-			public int getFormatId() {
-				return formatId;
-			}
-
-			@Override
-			public byte[] getGlobalTransactionId() {
-				return gtrid;
-			}
-		};
+	public void deleteTemporaryFile() {
+		this.file.delete();
 	}
 
 	/**
@@ -109,7 +97,6 @@
 	@Override
 	public void start(Xid xid, int flags) throws XAException {
 		System.out.println("     ProxyXAResource (" + localServerName + ":" + remoteServerName + ") XA_START   [" + xid + "]");
-		this.xid = xid;
 	}
 
 	/**
@@ -118,7 +105,6 @@
 	@Override
 	public void end(Xid xid, int flags) throws XAException {
 		System.out.println("     ProxyXAResource (" + localServerName + ":" + remoteServerName + ") XA_END     [" + xid + "]");
-		this.xid = null;
 	}
 
 	/**
@@ -130,9 +116,38 @@
 	public synchronized int prepare(Xid xid) throws XAException {
 		System.out.println("     ProxyXAResource (" + localServerName + ":" + remoteServerName + ") XA_PREPARE [" + xid + "]");
 
-		persistProxy(xid);
+		// Persist a proxy for the remote server this can mean we try to recover
+		// a transaction at a remote server that did not get chance to
+		// prepare but the alternative is to orphan a prepared server
 
 		try {
+			File dir = new File(System.getProperty("user.dir") + "/distributedjta/ProxyXAResource/" + localServerName + "/");
+			dir.mkdirs();
+			File file = new File(dir, new Uid().fileStringForm());
+			file.createNewFile();
+			DataOutputStream fos = new DataOutputStream(new FileOutputStream(file));
+			fos.writeInt(remoteServerName);
+			fos.writeInt(xid.getFormatId());
+			fos.writeInt(xid.getGlobalTransactionId().length);
+			fos.write(xid.getGlobalTransactionId());
+			fos.writeInt(xid.getBranchQualifier().length);
+			fos.write(xid.getBranchQualifier());
+
+			if (map.containsKey(xid)) {
+				System.out.println(map.get(xid));
+				map.remove(xid).delete();
+			}
+			if (this.file != null) {
+				this.file.delete();
+			}
+
+			map.put(xid, file);
+		} catch (IOException e) {
+			e.printStackTrace();
+			throw new XAException(XAException.XAER_RMERR);
+		}
+
+		try {
 			int propagatePrepare = lookupProvider.lookup(remoteServerName).propagatePrepare(xid);
 			System.out.println("     ProxyXAResource (" + localServerName + ":" + remoteServerName + ") XA_PREPARED");
 			return propagatePrepare;
@@ -145,8 +160,6 @@
 	public synchronized void commit(Xid xid, boolean onePhase) throws XAException {
 		System.out.println("     ProxyXAResource (" + localServerName + ":" + remoteServerName + ") XA_COMMIT  [" + xid + "]");
 
-		persistProxy(xid);
-
 		try {
 			lookupProvider.lookup(remoteServerName).propagateCommit(xid, onePhase);
 			System.out.println("     ProxyXAResource (" + localServerName + ":" + remoteServerName + ") XA_COMMITED");
@@ -154,6 +167,12 @@
 			throw new XAException(XAException.XA_RETRY);
 		}
 
+		if (map.get(xid) != null) {
+			map.get(xid).delete();
+			map.remove(xid);
+		}
+
+		// THIS CAN ONLY HAPPEN IN 1PC OR ROLLBACK
 		if (file != null) {
 			file.delete();
 		}
@@ -163,8 +182,6 @@
 	public synchronized void rollback(Xid xid) throws XAException {
 		System.out.println("     ProxyXAResource (" + localServerName + ":" + remoteServerName + ") XA_ROLLBACK[" + xid + "]");
 
-		persistProxy(xid);
-
 		try {
 			lookupProvider.lookup(remoteServerName).propagateRollback(xid);
 			System.out.println("     ProxyXAResource (" + localServerName + ":" + remoteServerName + ") XA_ROLLBACKED");
@@ -179,6 +196,12 @@
 			}
 		}
 
+		if (map.get(xid) != null) {
+			map.get(xid).delete();
+			map.remove(xid);
+		}
+
+		// THIS CAN ONLY HAPPEN IN 1PC OR ROLLBACK
 		if (file != null) {
 			file.delete();
 		}
@@ -196,7 +219,6 @@
 	 */
 	@Override
 	public Xid[] recover(int flag) throws XAException {
-		Xid[] recovered = null;
 		if ((flag & XAResource.TMSTARTRSCAN) == XAResource.TMSTARTRSCAN) {
 			System.out.println("     ProxyXAResource (" + localServerName + ":" + remoteServerName + ") XA_RECOVER [XAResource.TMSTARTRSCAN]: "
 					+ remoteServerName);
@@ -206,29 +228,50 @@
 					+ remoteServerName);
 		}
 
-		if (this.xid != null) {
-			try {
-				recovered = lookupProvider.lookup(remoteServerName).propagateRecover(xid.getFormatId(), xid.getGlobalTransactionId());
-			} catch (DummyRemoteException ce) {
-				throw new XAException(XAException.XA_RETRY);
+		List<Xid> toReturn = new ArrayList<Xid>();
+		Xid[] recovered = null;
+		try {
+			recovered = lookupProvider.lookup(remoteServerName).propagateRecover(localServerName);
+		} catch (DummyRemoteException ce) {
+			throw new XAException(XAException.XA_RETRY);
+		}
+
+		List<Xid> arrayList = new ArrayList<Xid>();
+		arrayList.addAll(map.keySet());
+		if (recovered != null) {
+			for (int i = 0; i < recovered.length; i++) {
+				System.out.println("     ProxyXAResource (" + localServerName + ":" + remoteServerName + ") recovered: " + recovered[i]);
+				Iterator<Xid> iterator = map.keySet().iterator();
+				while (iterator.hasNext()) {
+					Xid next = iterator.next();
+					if (Arrays.equals(next.getGlobalTransactionId(), recovered[i].getGlobalTransactionId())) {
+						toReturn.add(next);
+					} else if (!iterator.hasNext()) {
+						toReturn.add(recovered[i]);
+					}
+					arrayList.remove(next);
+				}
+				System.out.println("     ProxyXAResource (" + localServerName + ":" + remoteServerName + ") added: " + toReturn.get(toReturn.size() - 1));
 			}
 		}
 
-		for (int i = 0; i < recovered.length; i++) {
-			System.out.println("     ProxyXAResource (" + localServerName + ":" + remoteServerName + ") recovered: " + recovered[i]);
+		// We now know the remote server didn't know about these Xids
+		List<Xid> knownNoneKnownXids = new ArrayList<Xid>();
+		knownNoneKnownXids.addAll(arrayList);
+		Iterator<Xid> iterator = knownNoneKnownXids.iterator();
+		while (iterator.hasNext()) {
+			Xid next = iterator.next();
+			map.remove(next).delete();
 		}
-
-		Xid[] toReturn = null;
 		if ((flag & XAResource.TMSTARTRSCAN) == XAResource.TMSTARTRSCAN) {
 			System.out.println("     ProxyXAResource (" + localServerName + ":" + remoteServerName + ") XA_RECOVERD[XAResource.TMSTARTRSCAN]: "
 					+ remoteServerName);
-			toReturn = new Xid[] { xid };
 		}
 		if ((flag & XAResource.TMENDRSCAN) == XAResource.TMENDRSCAN) {
 			System.out.println("     ProxyXAResource (" + localServerName + ":" + remoteServerName + ") XA_RECOVERD[XAResource.TMENDRSCAN]: "
 					+ remoteServerName);
 		}
-		return toReturn;
+		return toReturn.toArray(new Xid[0]);
 	}
 
 	@Override
@@ -296,30 +339,4 @@
 	public String getJndiName() {
 		return "ProxyXAResource: " + localServerName + " " + remoteServerName;
 	}
-
-	private void persistProxy(Xid xid) throws XAException {
-		// Persist a proxy for the remote server this can mean we try to recover
-		// a transaction at a remote server that did not get chance to
-		// prepare but the alternative is to orphan a prepared server
-
-		if (this.file == null) {
-			try {
-				File dir = new File(System.getProperty("user.dir") + "/distributedjta/ProxyXAResource/" + localServerName + "/");
-				dir.mkdirs();
-				File file = new File(dir, new Uid().fileStringForm());
-				file.createNewFile();
-				DataOutputStream fos = new DataOutputStream(new FileOutputStream(file));
-				fos.writeInt(remoteServerName);
-				fos.writeInt(xid.getFormatId());
-				fos.writeInt(xid.getGlobalTransactionId().length);
-				fos.write(xid.getGlobalTransactionId());
-				fos.writeInt(xid.getBranchQualifier().length);
-				fos.write(xid.getBranchQualifier());
-				this.file = file;
-			} catch (IOException e) {
-				e.printStackTrace();
-				throw new XAException(XAException.XAER_RMERR);
-			}
-		}
-	}
-}
+}
\ No newline at end of file

Modified: labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/examples/classes/com/arjuna/jta/distributed/example/server/impl/ProxyXAResourceRecovery.java
===================================================================
--- labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/examples/classes/com/arjuna/jta/distributed/example/server/impl/ProxyXAResourceRecovery.java	2011-10-20 09:26:01 UTC (rev 37628)
+++ labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/examples/classes/com/arjuna/jta/distributed/example/server/impl/ProxyXAResourceRecovery.java	2011-10-20 09:51:23 UTC (rev 37629)
@@ -21,30 +21,76 @@
  */
 package com.arjuna.jta.distributed.example.server.impl;
 
+import java.io.DataInputStream;
 import java.io.File;
+import java.io.FileInputStream;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 
 import javax.transaction.xa.XAResource;
+import javax.transaction.xa.Xid;
 
 import org.jboss.tm.XAResourceRecovery;
 
+import com.arjuna.ats.jta.xa.XidImple;
 import com.arjuna.jta.distributed.example.server.LookupProvider;
 
 public class ProxyXAResourceRecovery implements XAResourceRecovery {
 
 	private List<ProxyXAResource> resources = new ArrayList<ProxyXAResource>();
 
-	public ProxyXAResourceRecovery(LookupProvider lookupProvider, int id) throws IOException {
-		File file = new File(System.getProperty("user.dir") + "/distributedjta-example/ProxyXAResource/" + id + "/");
-		if (file.exists() && file.isDirectory()) {
-			File[] listFiles = file.listFiles();
+	public ProxyXAResourceRecovery(LookupProvider lookupProvider, Integer id) throws IOException {
+		File directory = new File(System.getProperty("user.dir") + "/distributedjta/ProxyXAResource/" + id + "/");
+		Map<Integer, Map<Xid, File>> savedData = new HashMap<Integer, Map<Xid, File>>();
+		if (directory.exists() && directory.isDirectory()) {
+			File[] listFiles = directory.listFiles();
 			for (int i = 0; i < listFiles.length; i++) {
-				File currentFile = listFiles[i];
-				resources.add(new ProxyXAResource(lookupProvider, id, currentFile));
+				File file = listFiles[i];
+				DataInputStream fis = new DataInputStream(new FileInputStream(file));
+				int remoteServerName = fis.readInt();
+
+				Map<Xid, File> map = savedData.get(remoteServerName);
+				if (map == null) {
+					map = new HashMap<Xid, File>();
+					savedData.put(remoteServerName, map);
+				}
+				final int formatId = fis.readInt();
+				int gtrid_length = fis.readInt();
+				final byte[] gtrid = new byte[gtrid_length];
+				fis.read(gtrid, 0, gtrid_length);
+
+				int bqual_length = fis.readInt();
+				final byte[] bqual = new byte[bqual_length];
+				fis.read(bqual, 0, bqual_length);
+				Xid xid = new XidImple(new Xid() {
+					@Override
+					public byte[] getBranchQualifier() {
+						return bqual;
+					}
+
+					@Override
+					public int getFormatId() {
+						return formatId;
+					}
+
+					@Override
+					public byte[] getGlobalTransactionId() {
+						return gtrid;
+					}
+				});
+				map.put(xid, file);
 			}
 		}
+		Iterator<Integer> iterator = savedData.keySet().iterator();
+		while (iterator.hasNext()) {
+			Integer remoteServerName = iterator.next();
+			Map<Xid, File> map = savedData.get(remoteServerName);
+			resources.add(new ProxyXAResource(lookupProvider, id, remoteServerName, map));
+		}
 	}
 
 	@Override

Modified: labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/examples/classes/com/arjuna/jta/distributed/example/server/impl/ServerImpl.java
===================================================================
--- labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/examples/classes/com/arjuna/jta/distributed/example/server/impl/ServerImpl.java	2011-10-20 09:26:01 UTC (rev 37628)
+++ labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/examples/classes/com/arjuna/jta/distributed/example/server/impl/ServerImpl.java	2011-10-20 09:51:23 UTC (rev 37629)
@@ -21,10 +21,12 @@
  */
 package com.arjuna.jta.distributed.example.server.impl;
 
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
 import java.io.IOException;
 import java.net.InetAddress;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -43,6 +45,7 @@
 import com.arjuna.ats.arjuna.common.CoreEnvironmentBeanException;
 import com.arjuna.ats.arjuna.common.ObjectStoreEnvironmentBean;
 import com.arjuna.ats.arjuna.common.RecoveryEnvironmentBean;
+import com.arjuna.ats.arjuna.common.Uid;
 import com.arjuna.ats.arjuna.coordinator.TxControl;
 import com.arjuna.ats.arjuna.recovery.RecoveryManager;
 import com.arjuna.ats.arjuna.tools.osb.mbean.ObjStoreBrowser;
@@ -197,11 +200,33 @@
 	}
 
 	@Override
-	public ProxyXAResource generateProxyXAResource(LookupProvider lookupProvider, Integer localServerName, Integer remoteServerName) {
-		return new ProxyXAResource(lookupProvider, localServerName, remoteServerName);
+	public ProxyXAResource generateProxyXAResource(LookupProvider lookupProvider, Integer remoteServerName) throws IOException, SystemException {
+		// Persist a proxy for the remote server this can mean we try to recover
+		// transactions at a remote server that did not get chance to
+		// prepare but the alternative is to orphan a prepared server
+
+		Xid currentXid = getCurrentXid();
+		File dir = new File(System.getProperty("user.dir") + "/distributedjta/ProxyXAResource/" + getNodeName());
+		dir.mkdirs();
+		File file = new File(dir, new Uid().fileStringForm());
+		file.createNewFile();
+		DataOutputStream fos = new DataOutputStream(new FileOutputStream(file));
+		fos.writeInt(remoteServerName);
+		fos.writeInt(currentXid.getFormatId());
+		fos.writeInt(currentXid.getGlobalTransactionId().length);
+		fos.write(currentXid.getGlobalTransactionId());
+		fos.writeInt(currentXid.getBranchQualifier().length);
+		fos.write(currentXid.getBranchQualifier());
+		
+		return new ProxyXAResource(lookupProvider, nodeName, remoteServerName, file);
 	}
 
 	@Override
+	public void cleanupProxyXAResource(XAResource proxyXAResource) {
+		((ProxyXAResource) proxyXAResource).deleteTemporaryFile();
+	}
+
+	@Override
 	public Synchronization generateProxySynchronization(LookupProvider lookupProvider, Integer localServerName, Integer remoteServerName, Xid toRegisterAgainst) {
 		return new ProxySynchronization(lookupProvider, localServerName, remoteServerName, toRegisterAgainst);
 	}
@@ -246,22 +271,12 @@
 	}
 
 	@Override
-	public Xid[] propagateRecover(int formatId, byte[] gtrid) throws XAException, DummyRemoteException {
+	public Xid[] propagateRecover(Integer parentNodeName) throws XAException, DummyRemoteException {
 		ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
 		try {
 			Thread.currentThread().setContextClassLoader(this.getClass().getClassLoader());
-			List<Xid> toReturn = new ArrayList<Xid>();
-			Xid[] recovered = ((XATerminatorImple) SubordinationManager.getXATerminator()).recover();
-			if (recovered != null) {
-				for (int i = 0; i < recovered.length; i++) {
-					// Filter out the transactions that are not owned by this
-					// parent
-					if (recovered[i].getFormatId() == formatId && Arrays.equals(gtrid, recovered[i].getGlobalTransactionId())) {
-						toReturn.add(recovered[i]);
-					}
-				}
-			}
-			return toReturn.toArray(new Xid[0]);
+			Xid[] recovered = ((XATerminatorImple) SubordinationManager.getXATerminator()).doRecover(parentNodeName);
+			return recovered;
 		} finally {
 			Thread.currentThread().setContextClassLoader(contextClassLoader);
 		}

Modified: labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/tests/classes/com/arjuna/ats/jta/distributed/SimpleIsolatedServers.java
===================================================================
--- labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/tests/classes/com/arjuna/ats/jta/distributed/SimpleIsolatedServers.java	2011-10-20 09:26:01 UTC (rev 37628)
+++ labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/tests/classes/com/arjuna/ats/jta/distributed/SimpleIsolatedServers.java	2011-10-20 09:51:23 UTC (rev 37629)
@@ -133,9 +133,9 @@
 					Transaction originalTransaction = transactionManager.getTransaction();
 					int remainingTimeout = (int) (originalServer.getTimeLeftBeforeTransactionTimeout() / 1000);
 					Xid currentXid = originalServer.getCurrentXid();
+					XAResource proxyXAResource = originalServer.generateProxyXAResource(lookupProvider, 2000);
 					originalServer.storeRootTransaction();
 					transactionManager.suspend();
-					XAResource proxyXAResource = initializeProxy(originalServer, 2000, currentXid);
 					performTransactionalWork(null, new LinkedList<Integer>(Arrays.asList(new Integer[] { 2000 })), remainingTimeout, currentXid, 1, false);
 					transactionManager.resume(originalTransaction);
 					originalTransaction.enlistResource(proxyXAResource);
@@ -211,8 +211,8 @@
 					int remainingTimeout = (int) (originalServer.getTimeLeftBeforeTransactionTimeout() / 1000);
 					Xid currentXid = originalServer.getCurrentXid();
 					originalServer.storeRootTransaction();
+					XAResource proxyXAResource = originalServer.generateProxyXAResource(lookupProvider, 2000);
 					transactionManager.suspend();
-					XAResource proxyXAResource = initializeProxy(originalServer, 2000, currentXid);
 					performTransactionalWork(null, new LinkedList<Integer>(Arrays.asList(new Integer[] { 2000 })), remainingTimeout, currentXid, 2, false);
 					transactionManager.resume(originalTransaction);
 					originalTransaction.enlistResource(proxyXAResource);
@@ -289,8 +289,8 @@
 					int remainingTimeout = (int) (originalServer.getTimeLeftBeforeTransactionTimeout() / 1000);
 					Xid currentXid = originalServer.getCurrentXid();
 					originalServer.storeRootTransaction();
+					XAResource proxyXAResource = originalServer.generateProxyXAResource(lookupProvider, 2000);
 					transactionManager.suspend();
-					XAResource proxyXAResource = initializeProxy(originalServer, 2000, currentXid);
 					performTransactionalWork(null, new LinkedList<Integer>(Arrays.asList(new Integer[] { 2000 })), remainingTimeout, currentXid, 2, false);
 					transactionManager.resume(originalTransaction);
 					originalTransaction.enlistResource(proxyXAResource);
@@ -427,8 +427,8 @@
 		int remainingTimeout = (int) (originalServer.getTimeLeftBeforeTransactionTimeout() / 1000);
 		Xid currentXid = originalServer.getCurrentXid();
 		originalServer.storeRootTransaction();
+		XAResource proxyXAResource = originalServer.generateProxyXAResource(lookupProvider, 2000);
 		transactionManager.suspend();
-		XAResource proxyXAResource = initializeProxy(originalServer, 2000, currentXid);
 		performTransactionalWork(null, new LinkedList<Integer>(Arrays.asList(new Integer[] { 2000 })), remainingTimeout, currentXid, 1, false);
 		transactionManager.resume(originalTransaction);
 		originalTransaction.enlistResource(proxyXAResource);
@@ -451,8 +451,8 @@
 		int remainingTimeout = (int) (originalServer.getTimeLeftBeforeTransactionTimeout() / 1000);
 		Xid currentXid = originalServer.getCurrentXid();
 		originalServer.storeRootTransaction();
+		XAResource proxyXAResource = originalServer.generateProxyXAResource(lookupProvider, 2000);
 		transactionManager.suspend();
-		XAResource proxyXAResource = initializeProxy(originalServer, 2000, currentXid);
 		performTransactionalWork(null, new LinkedList<Integer>(Arrays.asList(new Integer[] { 2000 })), remainingTimeout, currentXid, 1, false);
 		transactionManager.resume(originalTransaction);
 		originalTransaction.enlistResource(proxyXAResource);
@@ -544,8 +544,8 @@
 		Xid currentXid = originalServer.getCurrentXid();
 		originalServer.storeRootTransaction();
 		originalTransaction.enlistResource(new TestResource(counter, originalServer.getNodeName(), false));
+		XAResource proxyXAResource = originalServer.generateProxyXAResource(lookupProvider, 2000);
 		transactionManager.suspend();
-		XAResource proxyXAResource = initializeProxy(originalServer, 2000, currentXid);
 
 		// Migrate a transaction
 		LocalServer currentServer = getLocalServer(2000);
@@ -658,12 +658,11 @@
 			// FLOW THE TRANSACTION
 			remainingTimeout = (int) (currentServer.getTimeLeftBeforeTransactionTimeout() / 1000);
 
-			// SUSPEND THE TRANSACTION
+			// STORE AND SUSPEND THE TRANSACTION
 			Xid currentXid = currentServer.getCurrentXid();
+			XAResource proxyXAResource = currentServer.generateProxyXAResource(lookupProvider, nodesToFlowTo.get(0));
 			transactionManager.suspend();
 
-			XAResource proxyXAResource = initializeProxy(currentServer, nodesToFlowTo.get(0), currentXid);
-
 			boolean proxyRequired = performTransactionalWork(counter, nodesToFlowTo, remainingTimeout, currentXid, numberOfResourcesToRegister,
 					addSynchronization);
 			transactionManager.resume(transaction);
@@ -679,7 +678,7 @@
 				transaction.registerSynchronization(currentServer.generateProxySynchronization(lookupProvider, currentServer.getNodeName(), nextServerNodeName,
 						toMigrate));
 			} else {
-				currentServer.cleanupProxy(proxyXAResource);
+				currentServer.cleanupProxyXAResource(proxyXAResource);
 			}
 		}
 
@@ -695,28 +694,6 @@
 		return localServers[index];
 	}
 
-	private synchronized XAResource initializeProxy(LocalServer server, Integer remoteServerName, Xid xid) throws IOException {
-		// Persist a proxy for the remote server this can mean we try to recover
-		// transactions at a remote server that did not get chance to
-		// prepare but the alternative is to orphan a prepared server
-
-		File dir = new File(System.getProperty("user.dir") + "/distributedjta/ProxyXAResource/" + server.getNodeName());
-		dir.mkdirs();
-		File file = new File(dir, new Uid().fileStringForm());
-		file.createNewFile();
-		DataOutputStream fos = new DataOutputStream(new FileOutputStream(file));
-		fos.writeInt(remoteServerName);
-		fos.writeInt(xid.getFormatId());
-		fos.writeInt(xid.getGlobalTransactionId().length);
-		fos.write(xid.getGlobalTransactionId());
-		fos.writeInt(xid.getBranchQualifier().length);
-		fos.write(xid.getBranchQualifier());
-
-		XAResource proxyXAResource = server.generateProxyXAResource(lookupProvider, server.getNodeName(), remoteServerName, file);
-
-		return proxyXAResource;
-	}
-
 	private static class MyLookupProvider implements LookupProvider {
 
 		@Override

Modified: labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/tests/classes/com/arjuna/ats/jta/distributed/server/LocalServer.java
===================================================================
--- labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/tests/classes/com/arjuna/ats/jta/distributed/server/LocalServer.java	2011-10-20 09:26:01 UTC (rev 37628)
+++ labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/tests/classes/com/arjuna/ats/jta/distributed/server/LocalServer.java	2011-10-20 09:51:23 UTC (rev 37629)
@@ -58,8 +58,10 @@
 
 	public RemoteServer connectTo();
 
-	public XAResource generateProxyXAResource(LookupProvider lookupProvider, Integer localServerName, Integer remoteServerName, File file);
+	public XAResource generateProxyXAResource(LookupProvider lookupProvider, Integer remoteServerName) throws SystemException, IOException;
 
+	public void cleanupProxyXAResource(XAResource proxyXAResource);
+
 	public Synchronization generateProxySynchronization(LookupProvider lookupProvider, Integer localServerName, Integer remoteServerName, Xid toRegisterAgainst);
 
 	public Xid getCurrentXid() throws SystemException;
@@ -67,6 +69,4 @@
 	public CompletionCounter getCompletionCounter();
 
 	public void shutdown() throws Exception;
-
-	public void cleanupProxy(XAResource proxyXAResource);
 }

Modified: labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/tests/classes/com/arjuna/ats/jta/distributed/server/impl/ServerImpl.java
===================================================================
--- labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/tests/classes/com/arjuna/ats/jta/distributed/server/impl/ServerImpl.java	2011-10-20 09:26:01 UTC (rev 37628)
+++ labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/tests/classes/com/arjuna/ats/jta/distributed/server/impl/ServerImpl.java	2011-10-20 09:51:23 UTC (rev 37629)
@@ -21,7 +21,9 @@
  */
 package com.arjuna.ats.jta.distributed.server.impl;
 
+import java.io.DataOutputStream;
 import java.io.File;
+import java.io.FileOutputStream;
 import java.io.IOException;
 import java.lang.reflect.Field;
 import java.net.InetAddress;
@@ -47,6 +49,7 @@
 import com.arjuna.ats.arjuna.common.CoreEnvironmentBeanException;
 import com.arjuna.ats.arjuna.common.ObjectStoreEnvironmentBean;
 import com.arjuna.ats.arjuna.common.RecoveryEnvironmentBean;
+import com.arjuna.ats.arjuna.common.Uid;
 import com.arjuna.ats.arjuna.coordinator.TransactionReaper;
 import com.arjuna.ats.arjuna.coordinator.TxControl;
 import com.arjuna.ats.arjuna.recovery.RecoveryManager;
@@ -296,13 +299,31 @@
 	}
 
 	@Override
-	public ProxyXAResource generateProxyXAResource(LookupProvider lookupProvider, Integer localServerName, Integer remoteServerName, File file) {
-		return new ProxyXAResource(counter, lookupProvider, localServerName, remoteServerName, file);
+	public ProxyXAResource generateProxyXAResource(LookupProvider lookupProvider, Integer remoteServerName) throws SystemException, IOException {
+
+		// Persist a proxy for the remote server. This can mean we try to recover
+		// transactions at a remote server that did not get a chance to
+		// prepare, but the alternative is to orphan a prepared server.
+
+		Xid currentXid = getCurrentXid();
+		File dir = new File(System.getProperty("user.dir") + "/distributedjta/ProxyXAResource/" + getNodeName());
+		dir.mkdirs();
+		File file = new File(dir, new Uid().fileStringForm());
+		file.createNewFile();
+		DataOutputStream fos = new DataOutputStream(new FileOutputStream(file));
+		fos.writeInt(remoteServerName);
+		fos.writeInt(currentXid.getFormatId());
+		fos.writeInt(currentXid.getGlobalTransactionId().length);
+		fos.write(currentXid.getGlobalTransactionId());
+		fos.writeInt(currentXid.getBranchQualifier().length);
+		fos.write(currentXid.getBranchQualifier());
+
+		return new ProxyXAResource(counter, lookupProvider, getNodeName(), remoteServerName, file);
 	}
 
 	@Override
-	public void cleanupProxy(XAResource proxyXAResource) {
-		((ProxyXAResource)proxyXAResource).deleteTemporaryFile();
+	public void cleanupProxyXAResource(XAResource proxyXAResource) {
+		((ProxyXAResource) proxyXAResource).deleteTemporaryFile();
 	}
 
 	@Override



More information about the jboss-svn-commits mailing list