[jboss-svn-commits] JBL Code SVN: r37622 - in labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration: examples/classes/com/arjuna/jta/distributed/example/server/impl and 4 other directories.

jboss-svn-commits at lists.jboss.org jboss-svn-commits at lists.jboss.org
Wed Oct 19 19:08:16 EDT 2011


Author: tomjenkinson
Date: 2011-10-19 19:08:15 -0400 (Wed, 19 Oct 2011)
New Revision: 37622

Added:
   labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/tests/byteman-scripts/leave-subordinate-orphan.txt
Modified:
   labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/examples/classes/com/arjuna/jta/distributed/example/server/LocalServer.java
   labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/examples/classes/com/arjuna/jta/distributed/example/server/impl/ProxyXAResource.java
   labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/examples/classes/com/arjuna/jta/distributed/example/server/impl/ServerImpl.java
   labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/tests/classes/com/arjuna/ats/jta/distributed/SimpleIsolatedServers.java
   labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/tests/classes/com/arjuna/ats/jta/distributed/server/LocalServer.java
   labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/tests/classes/com/arjuna/ats/jta/distributed/server/RemoteServer.java
   labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/tests/classes/com/arjuna/ats/jta/distributed/server/impl/ProxyXAResource.java
   labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/tests/classes/com/arjuna/ats/jta/distributed/server/impl/ProxyXAResourceRecovery.java
   labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/tests/classes/com/arjuna/ats/jta/distributed/server/impl/ServerImpl.java
Log:
JBTM-895 ensure that we can recover orphans that are fully prepared on the remote node - by a call to one phase commit

Modified: labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/examples/classes/com/arjuna/jta/distributed/example/server/LocalServer.java
===================================================================
--- labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/examples/classes/com/arjuna/jta/distributed/example/server/LocalServer.java	2011-10-19 21:25:20 UTC (rev 37621)
+++ labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/examples/classes/com/arjuna/jta/distributed/example/server/LocalServer.java	2011-10-19 23:08:15 UTC (rev 37622)
@@ -56,7 +56,5 @@
 
 	public Synchronization generateProxySynchronization(LookupProvider lookupProvider, Integer localServerName, Integer remoteServerName, Xid toRegisterAgainst);
 
-	public Xid extractXid(XAResource proxyXAResource);
-
 	public Xid getCurrentXid() throws SystemException;
 }

Modified: labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/examples/classes/com/arjuna/jta/distributed/example/server/impl/ProxyXAResource.java
===================================================================
--- labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/examples/classes/com/arjuna/jta/distributed/example/server/impl/ProxyXAResource.java	2011-10-19 21:25:20 UTC (rev 37621)
+++ labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/examples/classes/com/arjuna/jta/distributed/example/server/impl/ProxyXAResource.java	2011-10-19 23:08:15 UTC (rev 37622)
@@ -130,28 +130,9 @@
 	public synchronized int prepare(Xid xid) throws XAException {
 		System.out.println("     ProxyXAResource (" + localServerName + ":" + remoteServerName + ") XA_PREPARE [" + xid + "]");
 
-		// Persist a proxy for the remote server this can mean we try to recover
-		// a transaction at a remote server that did not get chance to
-		// prepare but the alternative is to orphan a prepared server
+		persistProxy(xid);
 
 		try {
-			File dir = new File(System.getProperty("user.dir") + "/distributedjta-example/ProxyXAResource/" + localServerName + "/");
-			dir.mkdirs();
-			file = new File(dir, new Uid().fileStringForm());
-			file.createNewFile();
-			DataOutputStream fos = new DataOutputStream(new FileOutputStream(file));
-			fos.writeInt(remoteServerName);
-			fos.writeInt(xid.getFormatId());
-			fos.writeInt(xid.getGlobalTransactionId().length);
-			fos.write(xid.getGlobalTransactionId());
-			fos.writeInt(xid.getBranchQualifier().length);
-			fos.write(xid.getBranchQualifier());
-		} catch (IOException e) {
-			e.printStackTrace();
-			throw new XAException(XAException.XAER_RMERR);
-		}
-
-		try {
 			int propagatePrepare = lookupProvider.lookup(remoteServerName).propagatePrepare(xid);
 			System.out.println("     ProxyXAResource (" + localServerName + ":" + remoteServerName + ") XA_PREPARED");
 			return propagatePrepare;
@@ -164,6 +145,8 @@
 	public synchronized void commit(Xid xid, boolean onePhase) throws XAException {
 		System.out.println("     ProxyXAResource (" + localServerName + ":" + remoteServerName + ") XA_COMMIT  [" + xid + "]");
 
+		persistProxy(xid);
+
 		try {
 			lookupProvider.lookup(remoteServerName).propagateCommit(xid, onePhase);
 			System.out.println("     ProxyXAResource (" + localServerName + ":" + remoteServerName + ") XA_COMMITED");
@@ -179,6 +162,9 @@
 	@Override
 	public synchronized void rollback(Xid xid) throws XAException {
 		System.out.println("     ProxyXAResource (" + localServerName + ":" + remoteServerName + ") XA_ROLLBACK[" + xid + "]");
+
+		persistProxy(xid);
+
 		try {
 			lookupProvider.lookup(remoteServerName).propagateRollback(xid);
 			System.out.println("     ProxyXAResource (" + localServerName + ":" + remoteServerName + ") XA_ROLLBACKED");
@@ -311,7 +297,29 @@
 		return "ProxyXAResource: " + localServerName + " " + remoteServerName;
 	}
 
-	public Xid getXid() {
-		return xid;
+	private void persistProxy(Xid xid) throws XAException {
+		// Persist a proxy for the remote server this can mean we try to recover
+		// a transaction at a remote server that did not get chance to
+		// prepare but the alternative is to orphan a prepared server
+
+		if (this.file == null) {
+			try {
+				File dir = new File(System.getProperty("user.dir") + "/distributedjta/ProxyXAResource/" + localServerName + "/");
+				dir.mkdirs();
+				File file = new File(dir, new Uid().fileStringForm());
+				file.createNewFile();
+				DataOutputStream fos = new DataOutputStream(new FileOutputStream(file));
+				fos.writeInt(remoteServerName);
+				fos.writeInt(xid.getFormatId());
+				fos.writeInt(xid.getGlobalTransactionId().length);
+				fos.write(xid.getGlobalTransactionId());
+				fos.writeInt(xid.getBranchQualifier().length);
+				fos.write(xid.getBranchQualifier());
+				this.file = file;
+			} catch (IOException e) {
+				e.printStackTrace();
+				throw new XAException(XAException.XAER_RMERR);
+			}
+		}
 	}
 }

Modified: labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/examples/classes/com/arjuna/jta/distributed/example/server/impl/ServerImpl.java
===================================================================
--- labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/examples/classes/com/arjuna/jta/distributed/example/server/impl/ServerImpl.java	2011-10-19 21:25:20 UTC (rev 37621)
+++ labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/examples/classes/com/arjuna/jta/distributed/example/server/impl/ServerImpl.java	2011-10-19 23:08:15 UTC (rev 37622)
@@ -289,10 +289,4 @@
 			Thread.currentThread().setContextClassLoader(contextClassLoader);
 		}
 	}
-
-	@Override
-	public Xid extractXid(XAResource xaResource) {
-		ProxyXAResource proxyXAResource = (ProxyXAResource) xaResource;
-		return proxyXAResource.getXid();
-	}
 }

Added: labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/tests/byteman-scripts/leave-subordinate-orphan.txt
===================================================================
--- labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/tests/byteman-scripts/leave-subordinate-orphan.txt	                        (rev 0)
+++ labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/tests/byteman-scripts/leave-subordinate-orphan.txt	2011-10-19 23:08:15 UTC (rev 37622)
@@ -0,0 +1,16 @@
+########################################################################
+#
+# byteman script used to ensure that tests can synchronize with various
+# actions performed by the recovery code
+
+
+
+#########################################################################
+RULE Fail phase2Commit
+CLASS  com.arjuna.ats.arjuna.coordinator.BasicAction
+METHOD phase2Commit
+AT ENTRY
+BIND NOTHING
+IF TRUE
+	DO throw new Error()
+ENDRULE
\ No newline at end of file

Modified: labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/tests/classes/com/arjuna/ats/jta/distributed/SimpleIsolatedServers.java
===================================================================
--- labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/tests/classes/com/arjuna/ats/jta/distributed/SimpleIsolatedServers.java	2011-10-19 21:25:20 UTC (rev 37621)
+++ labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/tests/classes/com/arjuna/ats/jta/distributed/SimpleIsolatedServers.java	2011-10-19 23:08:15 UTC (rev 37622)
@@ -100,11 +100,11 @@
 		// same server
 		{
 			RemoteServer server = lookupProvider.lookup(2000);
-			server.propagateRecover(0, null);
+			server.propagateRecover(1000);
 		}
 		{
 			RemoteServer server = lookupProvider.lookup(2000);
-			server.propagateRecover(0, null);
+			server.propagateRecover(3000);
 		}
 	}
 
@@ -259,11 +259,82 @@
 			assertTrue(server.getCompletionCounter().getRollbackCount() == 0);
 			server.doRecoveryManagerScan(true);
 			assertTrue(server.getCompletionCounter().getCommitCount() == 0);
-			assertTrue(server.getCompletionCounter().getRollbackCount() == 0);
+			assertTrue(server.getCompletionCounter().getRollbackCount() == 1);
 		}
 	}
 
 	@Test
+	@BMScript("leave-subordinate-orphan")
+	public void testOnePhaseSubordinateOrphan() throws Exception {
+		assertTrue(getLocalServer(3000).getCompletionCounter().getCommitCount() == 0);
+		assertTrue(getLocalServer(2000).getCompletionCounter().getCommitCount() == 0);
+		assertTrue(getLocalServer(1000).getCompletionCounter().getCommitCount() == 0);
+		final Phase2CommitAborted phase2CommitAborted = new Phase2CommitAborted();
+		Thread thread = new Thread(new Runnable() {
+			public void run() {
+				int startingTimeout = 0;
+				try {
+					int startingServer = 1000;
+					LocalServer originalServer = getLocalServer(startingServer);
+					ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
+					Thread.currentThread().setContextClassLoader(originalServer.getClass().getClassLoader());
+					TransactionManager transactionManager = originalServer.getTransactionManager();
+					transactionManager.setTransactionTimeout(startingTimeout);
+					transactionManager.begin();
+					Transaction originalTransaction = transactionManager.getTransaction();
+					int remainingTimeout = (int) (originalServer.getTimeLeftBeforeTransactionTimeout() / 1000);
+					Xid currentXid = originalServer.getCurrentXid();
+					originalServer.storeRootTransaction();
+					transactionManager.suspend();
+					performTransactionalWork(null, new LinkedList<Integer>(Arrays.asList(new Integer[] { 2000 })), remainingTimeout, currentXid, 2, false);
+					transactionManager.resume(originalTransaction);
+					XAResource proxyXAResource = originalServer.generateProxyXAResource(lookupProvider, originalServer.getNodeName(), 2000);
+					originalTransaction.enlistResource(proxyXAResource);
+					originalServer.removeRootTransaction(currentXid);
+					transactionManager.commit();
+					Thread.currentThread().setContextClassLoader(classLoader);
+				} catch (ExecuteException e) {
+					System.err.println("Should be a thread death but cest la vie");
+					synchronized (phase2CommitAborted) {
+						phase2CommitAborted.setPhase2CommitAborted(true);
+						phase2CommitAborted.notify();
+					}
+				} catch (LinkageError t) {
+					System.err.println("Should be a thread death but cest la vie");
+					synchronized (phase2CommitAborted) {
+						phase2CommitAborted.setPhase2CommitAborted(true);
+						phase2CommitAborted.notify();
+					}
+				} catch (Throwable t) {
+					System.err.println("Should be a thread death but cest la vie");
+					synchronized (phase2CommitAborted) {
+						phase2CommitAborted.setPhase2CommitAborted(true);
+						phase2CommitAborted.notify();
+					}
+				}
+			}
+		}, "Orphan-creator");
+		thread.start();
+		synchronized (phase2CommitAborted) {
+			if (!phase2CommitAborted.isPhase2CommitAborted()) {
+				phase2CommitAborted.wait();
+			}
+		}
+		tearDown();
+		setup();
+		assertTrue(getLocalServer(2000).getCompletionCounter().getCommitCount() == 0);
+		assertTrue(getLocalServer(2000).getCompletionCounter().getRollbackCount() == 0);
+		assertTrue(getLocalServer(1000).getCompletionCounter().getCommitCount() == 0);
+		assertTrue(getLocalServer(1000).getCompletionCounter().getRollbackCount() == 0);
+		getLocalServer(1000).doRecoveryManagerScan(true);
+		assertTrue(getLocalServer(1000).getCompletionCounter().getCommitCount() == 0);
+		assertTrue(getLocalServer(1000).getCompletionCounter().getRollbackCount() == 1);
+		assertTrue(getLocalServer(2000).getCompletionCounter().getCommitCount() == 0);
+		assertTrue(getLocalServer(2000).getCompletionCounter().getRollbackCount() == 2);
+
+	}
+
+	@Test
 	@BMScript("fail2pc")
 	public void testRecovery() throws Exception {
 		tearDown();

Modified: labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/tests/classes/com/arjuna/ats/jta/distributed/server/LocalServer.java
===================================================================
--- labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/tests/classes/com/arjuna/ats/jta/distributed/server/LocalServer.java	2011-10-19 21:25:20 UTC (rev 37621)
+++ labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/tests/classes/com/arjuna/ats/jta/distributed/server/LocalServer.java	2011-10-19 23:08:15 UTC (rev 37622)
@@ -61,8 +61,6 @@
 
 	public Synchronization generateProxySynchronization(LookupProvider lookupProvider, Integer localServerName, Integer remoteServerName, Xid toRegisterAgainst);
 
-	public Xid extractXid(XAResource proxyXAResource);
-
 	public Xid getCurrentXid() throws SystemException;
 
 	public CompletionCounter getCompletionCounter();

Modified: labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/tests/classes/com/arjuna/ats/jta/distributed/server/RemoteServer.java
===================================================================
--- labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/tests/classes/com/arjuna/ats/jta/distributed/server/RemoteServer.java	2011-10-19 21:25:20 UTC (rev 37621)
+++ labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/tests/classes/com/arjuna/ats/jta/distributed/server/RemoteServer.java	2011-10-19 23:08:15 UTC (rev 37622)
@@ -35,7 +35,7 @@
 
 	public void propagateRollback(Xid xid) throws XAException, DummyRemoteException;
 
-	public Xid[] propagateRecover(int formatId, byte[] gtrid) throws XAException, DummyRemoteException;
+	public Xid[] propagateRecover(Integer parentNodeName) throws XAException, DummyRemoteException;
 
 	public void propagateForget(Xid xid) throws XAException, DummyRemoteException;
 

Modified: labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/tests/classes/com/arjuna/ats/jta/distributed/server/impl/ProxyXAResource.java
===================================================================
--- labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/tests/classes/com/arjuna/ats/jta/distributed/server/impl/ProxyXAResource.java	2011-10-19 21:25:20 UTC (rev 37621)
+++ labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/tests/classes/com/arjuna/ats/jta/distributed/server/impl/ProxyXAResource.java	2011-10-19 23:08:15 UTC (rev 37622)
@@ -21,12 +21,16 @@
  */
 package com.arjuna.ats.jta.distributed.server.impl;
 
-import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.File;
-import java.io.FileInputStream;
 import java.io.FileOutputStream;
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
 
 import javax.transaction.xa.XAException;
 import javax.transaction.xa.XAResource;
@@ -47,10 +51,9 @@
 
 	private int transactionTimeout;
 	private Integer remoteServerName = -1;
-	private File file;
+	private Map<Xid, File> map;
 	private Integer localServerName;
 	private LookupProvider lookupProvider;
-	private Xid xid;
 	private CompletionCounter completionCounter;
 
 	/**
@@ -65,6 +68,7 @@
 		this.lookupProvider = lookupProvider;
 		this.localServerName = localServerName;
 		this.remoteServerName = remoteServerName;
+		map = new HashMap<Xid, File>();
 	}
 
 	/**
@@ -72,39 +76,18 @@
 	 * 
 	 * @param lookupProvider
 	 * @param localServerName
+	 * @param map
+	 * @param remoteServerName
 	 * @param file
 	 * @throws IOException
 	 */
-	public ProxyXAResource(CompletionCounter completionCounter, LookupProvider lookupProvider, Integer localServerName, File file) throws IOException {
+	public ProxyXAResource(CompletionCounter completionCounter, LookupProvider lookupProvider, Integer localServerName, Integer remoteServerName,
+			Map<Xid, File> map) throws IOException {
 		this.completionCounter = completionCounter;
 		this.lookupProvider = lookupProvider;
 		this.localServerName = localServerName;
-		this.file = file;
-		DataInputStream fis = new DataInputStream(new FileInputStream(file));
-		this.remoteServerName = fis.readInt();
-		final int formatId = fis.readInt();
-		int gtrid_length = fis.readInt();
-		final byte[] gtrid = new byte[gtrid_length];
-		fis.read(gtrid, 0, gtrid_length);
-		int bqual_length = fis.readInt();
-		final byte[] bqual = new byte[bqual_length];
-		fis.read(bqual, 0, bqual_length);
-		this.xid = new Xid() {
-			@Override
-			public byte[] getBranchQualifier() {
-				return bqual;
-			}
-
-			@Override
-			public int getFormatId() {
-				return formatId;
-			}
-
-			@Override
-			public byte[] getGlobalTransactionId() {
-				return gtrid;
-			}
-		};
+		this.remoteServerName = remoteServerName;
+		this.map = map;
 	}
 
 	/**
@@ -113,7 +96,6 @@
 	@Override
 	public void start(Xid xid, int flags) throws XAException {
 		System.out.println("     ProxyXAResource (" + localServerName + ":" + remoteServerName + ") XA_START   [" + xid + "]");
-		this.xid = xid;
 	}
 
 	/**
@@ -122,7 +104,6 @@
 	@Override
 	public void end(Xid xid, int flags) throws XAException {
 		System.out.println("     ProxyXAResource (" + localServerName + ":" + remoteServerName + ") XA_END     [" + xid + "]");
-		this.xid = null;
 	}
 
 	/**
@@ -134,28 +115,9 @@
 	public synchronized int prepare(Xid xid) throws XAException {
 		System.out.println("     ProxyXAResource (" + localServerName + ":" + remoteServerName + ") XA_PREPARE [" + xid + "]");
 
-		// Persist a proxy for the remote server this can mean we try to recover
-		// a transaction at a remote server that did not get chance to
-		// prepare but the alternative is to orphan a prepared server
+		persistProxy(xid);
 
 		try {
-			File dir = new File(System.getProperty("user.dir") + "/distributedjta/ProxyXAResource/" + localServerName + "/");
-			dir.mkdirs();
-			file = new File(dir, new Uid().fileStringForm());
-			file.createNewFile();
-			DataOutputStream fos = new DataOutputStream(new FileOutputStream(file));
-			fos.writeInt(remoteServerName);
-			fos.writeInt(xid.getFormatId());
-			fos.writeInt(xid.getGlobalTransactionId().length);
-			fos.write(xid.getGlobalTransactionId());
-			fos.writeInt(xid.getBranchQualifier().length);
-			fos.write(xid.getBranchQualifier());
-		} catch (IOException e) {
-			e.printStackTrace();
-			throw new XAException(XAException.XAER_RMERR);
-		}
-
-		try {
 			int propagatePrepare = lookupProvider.lookup(remoteServerName).propagatePrepare(xid);
 			System.out.println("     ProxyXAResource (" + localServerName + ":" + remoteServerName + ") XA_PREPARED");
 			return propagatePrepare;
@@ -168,6 +130,8 @@
 	public synchronized void commit(Xid xid, boolean onePhase) throws XAException {
 		System.out.println("     ProxyXAResource (" + localServerName + ":" + remoteServerName + ") XA_COMMIT  [" + xid + "]");
 
+		persistProxy(xid);
+
 		try {
 			lookupProvider.lookup(remoteServerName).propagateCommit(xid, onePhase);
 			System.out.println("     ProxyXAResource (" + localServerName + ":" + remoteServerName + ") XA_COMMITED");
@@ -175,8 +139,9 @@
 			throw new XAException(XAException.XA_RETRY);
 		}
 
-		if (file != null) {
-			file.delete();
+		if (map.get(xid) != null) {
+			map.get(xid).delete();
+			map.remove(xid);
 		}
 		if (completionCounter != null) {
 			completionCounter.incrementCommit();
@@ -187,6 +152,8 @@
 	public synchronized void rollback(Xid xid) throws XAException {
 		System.out.println("     ProxyXAResource (" + localServerName + ":" + remoteServerName + ") XA_ROLLBACK[" + xid + "]");
 
+		persistProxy(xid);
+
 		try {
 			lookupProvider.lookup(remoteServerName).propagateRollback(xid);
 			System.out.println("     ProxyXAResource (" + localServerName + ":" + remoteServerName + ") XA_ROLLBACKED");
@@ -201,8 +168,9 @@
 			}
 		}
 
-		if (file != null) {
-			file.delete();
+		if (map.get(xid) != null) {
+			map.get(xid).delete();
+			map.remove(xid);
 		}
 		if (completionCounter != null) {
 			completionCounter.incrementRollback();
@@ -221,7 +189,6 @@
 	 */
 	@Override
 	public Xid[] recover(int flag) throws XAException {
-		Xid[] recovered = null;
 		if ((flag & XAResource.TMSTARTRSCAN) == XAResource.TMSTARTRSCAN) {
 			System.out.println("     ProxyXAResource (" + localServerName + ":" + remoteServerName + ") XA_RECOVER [XAResource.TMSTARTRSCAN]: "
 					+ remoteServerName);
@@ -231,29 +198,45 @@
 					+ remoteServerName);
 		}
 
-		if (this.xid != null) {
-			try {
-				recovered = lookupProvider.lookup(remoteServerName).propagateRecover(xid.getFormatId(), xid.getGlobalTransactionId());
-			} catch (DummyRemoteException ce) {
-				throw new XAException(XAException.XA_RETRY);
-			}
+		List<Xid> toReturn = new ArrayList<Xid>();
+		Xid[] recovered = null;
+		try {
+			recovered = lookupProvider.lookup(remoteServerName).propagateRecover(localServerName);
+		} catch (DummyRemoteException ce) {
+			throw new XAException(XAException.XA_RETRY);
 		}
 
+		List<Xid> arrayList = new ArrayList<Xid>();
+		arrayList.addAll(map.keySet());
 		for (int i = 0; i < recovered.length; i++) {
 			System.out.println("     ProxyXAResource (" + localServerName + ":" + remoteServerName + ") recovered: " + recovered[i]);
+			Iterator<Xid> iterator = map.keySet().iterator();
+			while (iterator.hasNext()) {
+				Xid next = iterator.next();
+				if (Arrays.equals(next.getGlobalTransactionId(), recovered[i].getGlobalTransactionId())) {
+					toReturn.add(next);
+				} else if (!iterator.hasNext()) {
+					toReturn.add(recovered[i]);
+				}
+				arrayList.remove(next);
+			}
+			System.out.println("     ProxyXAResource (" + localServerName + ":" + remoteServerName + ") added: " + toReturn.get(toReturn.size() - 1));
 		}
 
-		Xid[] toReturn = null;
+		Iterator<Xid> iterator = arrayList.iterator();
+		while (iterator.hasNext()) {
+			Xid next = iterator.next();
+			toReturn.add(next);
+		}
 		if ((flag & XAResource.TMSTARTRSCAN) == XAResource.TMSTARTRSCAN) {
 			System.out.println("     ProxyXAResource (" + localServerName + ":" + remoteServerName + ") XA_RECOVERD[XAResource.TMSTARTRSCAN]: "
 					+ remoteServerName);
-			toReturn = new Xid[] { xid };
 		}
 		if ((flag & XAResource.TMENDRSCAN) == XAResource.TMENDRSCAN) {
 			System.out.println("     ProxyXAResource (" + localServerName + ":" + remoteServerName + ") XA_RECOVERD[XAResource.TMENDRSCAN]: "
 					+ remoteServerName);
 		}
-		return toReturn;
+		return toReturn.toArray(new Xid[0]);
 	}
 
 	@Override
@@ -322,7 +305,29 @@
 		return "ProxyXAResource: " + localServerName + " " + remoteServerName;
 	}
 
-	public Xid getXid() {
-		return xid;
+	private void persistProxy(Xid xid) throws XAException {
+		// Persist a proxy for the remote server this can mean we try to recover
+		// a transaction at a remote server that did not get chance to
+		// prepare but the alternative is to orphan a prepared server
+
+		if (!map.containsKey(xid)) {
+			try {
+				File dir = new File(System.getProperty("user.dir") + "/distributedjta/ProxyXAResource/" + localServerName + "/");
+				dir.mkdirs();
+				File file = new File(dir, new Uid().fileStringForm());
+				file.createNewFile();
+				DataOutputStream fos = new DataOutputStream(new FileOutputStream(file));
+				fos.writeInt(remoteServerName);
+				fos.writeInt(xid.getFormatId());
+				fos.writeInt(xid.getGlobalTransactionId().length);
+				fos.write(xid.getGlobalTransactionId());
+				fos.writeInt(xid.getBranchQualifier().length);
+				fos.write(xid.getBranchQualifier());
+				map.put(xid, file);
+			} catch (IOException e) {
+				e.printStackTrace();
+				throw new XAException(XAException.XAER_RMERR);
+			}
+		}
 	}
 }

Modified: labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/tests/classes/com/arjuna/ats/jta/distributed/server/impl/ProxyXAResourceRecovery.java
===================================================================
--- labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/tests/classes/com/arjuna/ats/jta/distributed/server/impl/ProxyXAResourceRecovery.java	2011-10-19 21:25:20 UTC (rev 37621)
+++ labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/tests/classes/com/arjuna/ats/jta/distributed/server/impl/ProxyXAResourceRecovery.java	2011-10-19 23:08:15 UTC (rev 37622)
@@ -21,31 +21,76 @@
  */
 package com.arjuna.ats.jta.distributed.server.impl;
 
+import java.io.DataInputStream;
 import java.io.File;
+import java.io.FileInputStream;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 
 import javax.transaction.xa.XAResource;
+import javax.transaction.xa.Xid;
 
 import org.jboss.tm.XAResourceRecovery;
 
 import com.arjuna.ats.jta.distributed.server.CompletionCounter;
 import com.arjuna.ats.jta.distributed.server.LookupProvider;
+import com.arjuna.ats.jta.xa.XidImple;
 
 public class ProxyXAResourceRecovery implements XAResourceRecovery {
 
 	private List<ProxyXAResource> resources = new ArrayList<ProxyXAResource>();
 
 	public ProxyXAResourceRecovery(CompletionCounter counter, LookupProvider lookupProvider, Integer id) throws IOException {
-		File file = new File(System.getProperty("user.dir") + "/distributedjta/ProxyXAResource/" + id + "/");
-		if (file.exists() && file.isDirectory()) {
-			File[] listFiles = file.listFiles();
+		File directory = new File(System.getProperty("user.dir") + "/distributedjta/ProxyXAResource/" + id + "/");
+		Map<Integer, Map<Xid, File>> savedData = new HashMap<Integer, Map<Xid, File>>();
+		if (directory.exists() && directory.isDirectory()) {
+			File[] listFiles = directory.listFiles();
 			for (int i = 0; i < listFiles.length; i++) {
-				File currentFile = listFiles[i];
-				resources.add(new ProxyXAResource(counter, lookupProvider, id, currentFile));
+				File file = listFiles[i];
+				DataInputStream fis = new DataInputStream(new FileInputStream(file));
+				int remoteServerName = fis.readInt();
+
+				Map<Xid, File> map = savedData.get(remoteServerName);
+				if (map == null) {
+					map = new HashMap<Xid, File>();
+					savedData.put(remoteServerName, map);
+				}
+				final int formatId = fis.readInt();
+				int gtrid_length = fis.readInt();
+				final byte[] gtrid = new byte[gtrid_length];
+				fis.read(gtrid, 0, gtrid_length);
+				int bqual_length = fis.readInt();
+				final byte[] bqual = new byte[bqual_length];
+				fis.read(bqual, 0, bqual_length);
+				Xid xid = new XidImple(new Xid() {
+					@Override
+					public byte[] getBranchQualifier() {
+						return bqual;
+					}
+
+					@Override
+					public int getFormatId() {
+						return formatId;
+					}
+
+					@Override
+					public byte[] getGlobalTransactionId() {
+						return gtrid;
+					}
+				});
+				map.put(xid, file);
 			}
 		}
+		Iterator<Integer> iterator = savedData.keySet().iterator();
+		while (iterator.hasNext()) {
+			Integer remoteServerName = iterator.next();
+			Map<Xid, File> map = savedData.get(remoteServerName);
+			resources.add(new ProxyXAResource(counter, lookupProvider, id, remoteServerName, map));
+		}
 	}
 
 	@Override

Modified: labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/tests/classes/com/arjuna/ats/jta/distributed/server/impl/ServerImpl.java
===================================================================
--- labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/tests/classes/com/arjuna/ats/jta/distributed/server/impl/ServerImpl.java	2011-10-19 21:25:20 UTC (rev 37621)
+++ labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/tests/classes/com/arjuna/ats/jta/distributed/server/impl/ServerImpl.java	2011-10-19 23:08:15 UTC (rev 37622)
@@ -25,7 +25,6 @@
 import java.lang.reflect.Field;
 import java.net.InetAddress;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -37,7 +36,6 @@
 import javax.transaction.Transaction;
 import javax.transaction.TransactionManager;
 import javax.transaction.xa.XAException;
-import javax.transaction.xa.XAResource;
 import javax.transaction.xa.Xid;
 
 import org.jboss.tm.TransactionTimeoutConfiguration;
@@ -68,6 +66,8 @@
 import com.arjuna.ats.jta.distributed.server.LocalServer;
 import com.arjuna.ats.jta.distributed.server.LookupProvider;
 import com.arjuna.ats.jta.distributed.server.RemoteServer;
+import com.arjuna.ats.jta.xa.XATxConverter;
+import com.arjuna.ats.jta.xa.XidImple;
 
 public class ServerImpl implements LocalServer, RemoteServer {
 
@@ -349,7 +349,7 @@
 	}
 
 	@Override
-	public Xid[] propagateRecover(int formatId, byte[] gtrid) throws XAException, DummyRemoteException {
+	public Xid[] propagateRecover(Integer parentNodeName) throws XAException, DummyRemoteException {
 		ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
 		try {
 			Thread.currentThread().setContextClassLoader(this.getClass().getClassLoader());
@@ -359,7 +359,8 @@
 				for (int i = 0; i < recovered.length; i++) {
 					// Filter out the transactions that are not owned by this
 					// parent
-					if (recovered[i].getFormatId() == formatId && Arrays.equals(gtrid, recovered[i].getGlobalTransactionId())) {
+					if ((recovered[i].getFormatId() == XATxConverter.FORMAT_ID && parentNodeName == XATxConverter.getParentNodeName(((XidImple) recovered[i])
+							.getXID()))) {
 						toReturn.add(recovered[i]);
 					}
 				}
@@ -394,12 +395,6 @@
 	}
 
 	@Override
-	public Xid extractXid(XAResource xaResource) {
-		ProxyXAResource proxyXAResource = (ProxyXAResource) xaResource;
-		return proxyXAResource.getXid();
-	}
-
-	@Override
 	public CompletionCounter getCompletionCounter() {
 		return counter;
 	}



More information about the jboss-svn-commits mailing list