[jboss-svn-commits] JBL Code SVN: r37629 - in labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration: examples/classes/com/arjuna/jta/distributed/example/server and 4 other directories.
jboss-svn-commits at lists.jboss.org
jboss-svn-commits at lists.jboss.org
Thu Oct 20 05:51:24 EDT 2011
Author: tomjenkinson
Date: 2011-10-20 05:51:23 -0400 (Thu, 20 Oct 2011)
New Revision: 37629
Modified:
labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/examples/classes/com/arjuna/jta/distributed/example/ExampleDistributedJTATestCase.java
labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/examples/classes/com/arjuna/jta/distributed/example/server/LocalServer.java
labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/examples/classes/com/arjuna/jta/distributed/example/server/RemoteServer.java
labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/examples/classes/com/arjuna/jta/distributed/example/server/impl/ProxyXAResource.java
labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/examples/classes/com/arjuna/jta/distributed/example/server/impl/ProxyXAResourceRecovery.java
labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/examples/classes/com/arjuna/jta/distributed/example/server/impl/ServerImpl.java
labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/tests/classes/com/arjuna/ats/jta/distributed/SimpleIsolatedServers.java
labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/tests/classes/com/arjuna/ats/jta/distributed/server/LocalServer.java
labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/tests/classes/com/arjuna/ats/jta/distributed/server/impl/ServerImpl.java
Log:
JBTM-895: Make sure that when we create a proxy we have a record of the current transaction. This is important in case the remote side crashes mid-action, because we then need to be able to roll the transaction back.
Modified: labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/examples/classes/com/arjuna/jta/distributed/example/ExampleDistributedJTATestCase.java
===================================================================
--- labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/examples/classes/com/arjuna/jta/distributed/example/ExampleDistributedJTATestCase.java 2011-10-20 09:26:01 UTC (rev 37628)
+++ labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/examples/classes/com/arjuna/jta/distributed/example/ExampleDistributedJTATestCase.java 2011-10-20 09:51:23 UTC (rev 37629)
@@ -28,7 +28,6 @@
import javax.transaction.HeuristicMixedException;
import javax.transaction.HeuristicRollbackException;
-import javax.transaction.InvalidTransactionException;
import javax.transaction.NotSupportedException;
import javax.transaction.RollbackException;
import javax.transaction.SystemException;
@@ -68,8 +67,8 @@
}
@Test
- public void testMigrateTransaction() throws NotSupportedException, SystemException, IllegalStateException, RollbackException, InvalidTransactionException,
- XAException, SecurityException, HeuristicMixedException, HeuristicRollbackException {
+ public void testMigrateTransaction() throws NotSupportedException, SystemException, IllegalStateException, RollbackException, XAException,
+ SecurityException, HeuristicMixedException, HeuristicRollbackException, IOException {
int startingTimeout = 0;
List<Integer> nodesToFlowTo = new LinkedList<Integer>(Arrays.asList(new Integer[] { 1000, 2000, 3000, 2000, 1000, 2000, 3000, 1000, 3000 }));
@@ -96,30 +95,23 @@
// SUSPEND THE TRANSACTION
Xid currentXid = originalServer.getCurrentXid();
originalServer.storeRootTransaction();
+ XAResource proxyXAResource = originalServer.generateProxyXAResource(lookupProvider, nextServerNodeName);
transactionManager.suspend();
- boolean proxyRequired = performTransactionalWork(nodesToFlowTo, remainingTimeout, currentXid);
+ performTransactionalWork(nodesToFlowTo, remainingTimeout, currentXid);
transactionManager.resume(originalTransaction);
- // Create a proxy for the new server if necessary, this can orphan
- // the remote server but XA recovery will handle that on the remote
- // server
- // The alternative is to always create a proxy but this is a
- // performance drain and will result in multiple subordinate
- // transactions and performance issues
- if (proxyRequired) {
- XAResource proxyXAResource = originalServer.generateProxyXAResource(lookupProvider, originalServer.getNodeName(), nextServerNodeName);
- originalTransaction.enlistResource(proxyXAResource);
- originalTransaction.registerSynchronization(originalServer.generateProxySynchronization(lookupProvider, originalServer.getNodeName(),
- nextServerNodeName, currentXid));
- }
+ originalTransaction.enlistResource(proxyXAResource);
+ originalTransaction.registerSynchronization(originalServer.generateProxySynchronization(lookupProvider, originalServer.getNodeName(),
+ nextServerNodeName, currentXid));
+
originalServer.removeRootTransaction(currentXid);
}
transactionManager.commit();
Thread.currentThread().setContextClassLoader(classLoader);
}
- private boolean performTransactionalWork(List<Integer> nodesToFlowTo, int remainingTimeout, Xid toMigrate) throws RollbackException,
- InvalidTransactionException, IllegalStateException, XAException, SystemException, NotSupportedException {
+ private boolean performTransactionalWork(List<Integer> nodesToFlowTo, int remainingTimeout, Xid toMigrate) throws RollbackException, IllegalStateException,
+ XAException, SystemException, NotSupportedException, IOException {
Integer currentServerName = nodesToFlowTo.remove(0);
LocalServer currentServer = getLocalServer(currentServerName);
@@ -141,15 +133,17 @@
// SUSPEND THE TRANSACTION
Xid currentXid = currentServer.getCurrentXid();
+ XAResource proxyXAResource = currentServer.generateProxyXAResource(lookupProvider, nextServerNodeName);
transactionManager.suspend();
boolean proxyRequired = performTransactionalWork(nodesToFlowTo, remainingTimeout, currentXid);
transactionManager.resume(transaction);
if (proxyRequired) {
- XAResource proxyXAResource = currentServer.generateProxyXAResource(lookupProvider, currentServer.getNodeName(), nextServerNodeName);
transaction.enlistResource(proxyXAResource);
transaction.registerSynchronization(currentServer.generateProxySynchronization(lookupProvider, currentServer.getNodeName(), nextServerNodeName,
toMigrate));
+ } else {
+ currentServer.cleanupProxyXAResource(proxyXAResource);
}
}
Modified: labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/examples/classes/com/arjuna/jta/distributed/example/server/LocalServer.java
===================================================================
--- labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/examples/classes/com/arjuna/jta/distributed/example/server/LocalServer.java 2011-10-20 09:26:01 UTC (rev 37628)
+++ labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/examples/classes/com/arjuna/jta/distributed/example/server/LocalServer.java 2011-10-20 09:51:23 UTC (rev 37629)
@@ -21,6 +21,7 @@
*/
package com.arjuna.jta.distributed.example.server;
+import java.io.File;
import java.io.IOException;
import javax.transaction.InvalidTransactionException;
@@ -52,8 +53,10 @@
public RemoteServer connectTo();
- public XAResource generateProxyXAResource(LookupProvider lookupProvider, Integer localServerName, Integer remoteServerName);
+ public XAResource generateProxyXAResource(LookupProvider lookupProvider, Integer remoteServerName) throws IOException, SystemException;
+ public void cleanupProxyXAResource(XAResource proxyXAResource);
+
public Synchronization generateProxySynchronization(LookupProvider lookupProvider, Integer localServerName, Integer remoteServerName, Xid toRegisterAgainst);
public Xid getCurrentXid() throws SystemException;
Modified: labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/examples/classes/com/arjuna/jta/distributed/example/server/RemoteServer.java
===================================================================
--- labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/examples/classes/com/arjuna/jta/distributed/example/server/RemoteServer.java 2011-10-20 09:26:01 UTC (rev 37628)
+++ labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/examples/classes/com/arjuna/jta/distributed/example/server/RemoteServer.java 2011-10-20 09:51:23 UTC (rev 37629)
@@ -33,7 +33,7 @@
public void propagateRollback(Xid xid) throws XAException, DummyRemoteException;
- public Xid[] propagateRecover(int formatId, byte[] gtrid) throws XAException, DummyRemoteException;
+ public Xid[] propagateRecover(Integer nodeName) throws XAException, DummyRemoteException;
public void propagateForget(Xid xid) throws XAException, DummyRemoteException;
Modified: labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/examples/classes/com/arjuna/jta/distributed/example/server/impl/ProxyXAResource.java
===================================================================
--- labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/examples/classes/com/arjuna/jta/distributed/example/server/impl/ProxyXAResource.java 2011-10-20 09:26:01 UTC (rev 37628)
+++ labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/examples/classes/com/arjuna/jta/distributed/example/server/impl/ProxyXAResource.java 2011-10-20 09:51:23 UTC (rev 37629)
@@ -21,12 +21,16 @@
*/
package com.arjuna.jta.distributed.example.server.impl;
-import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.File;
-import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
import javax.transaction.xa.XAException;
import javax.transaction.xa.XAResource;
@@ -46,10 +50,10 @@
private int transactionTimeout;
private Integer remoteServerName = -1;
- private File file;
+ private Map<Xid, File> map;
private Integer localServerName;
private LookupProvider lookupProvider;
- private Xid xid;
+ private File file;
/**
* Create a new proxy to the remote server.
@@ -58,10 +62,12 @@
* @param localServerName
* @param remoteServerName
*/
- public ProxyXAResource(LookupProvider lookupProvider, Integer localServerName, Integer remoteServerName) {
+ public ProxyXAResource(LookupProvider lookupProvider, Integer localServerName, Integer remoteServerName, File file) {
this.lookupProvider = lookupProvider;
this.localServerName = localServerName;
this.remoteServerName = remoteServerName;
+ this.file = file;
+ map = new HashMap<Xid, File>();
}
/**
@@ -69,38 +75,20 @@
*
* @param lookupProvider
* @param localServerName
+ * @param map
+ * @param remoteServerName
* @param file
* @throws IOException
*/
- public ProxyXAResource(LookupProvider lookupProvider, Integer localServerName, File file) throws IOException {
+ public ProxyXAResource(LookupProvider lookupProvider, Integer localServerName, Integer remoteServerName, Map<Xid, File> map) throws IOException {
this.lookupProvider = lookupProvider;
this.localServerName = localServerName;
- this.file = file;
- DataInputStream fis = new DataInputStream(new FileInputStream(file));
- this.remoteServerName = fis.readInt();
- final int formatId = fis.readInt();
- int gtrid_length = fis.readInt();
- final byte[] gtrid = new byte[gtrid_length];
- fis.read(gtrid, 0, gtrid_length);
- int bqual_length = fis.readInt();
- final byte[] bqual = new byte[bqual_length];
- fis.read(bqual, 0, bqual_length);
- this.xid = new Xid() {
- @Override
- public byte[] getBranchQualifier() {
- return bqual;
- }
+ this.remoteServerName = remoteServerName;
+ this.map = map;
+ }
- @Override
- public int getFormatId() {
- return formatId;
- }
-
- @Override
- public byte[] getGlobalTransactionId() {
- return gtrid;
- }
- };
+ public void deleteTemporaryFile() {
+ this.file.delete();
}
/**
@@ -109,7 +97,6 @@
@Override
public void start(Xid xid, int flags) throws XAException {
System.out.println(" ProxyXAResource (" + localServerName + ":" + remoteServerName + ") XA_START [" + xid + "]");
- this.xid = xid;
}
/**
@@ -118,7 +105,6 @@
@Override
public void end(Xid xid, int flags) throws XAException {
System.out.println(" ProxyXAResource (" + localServerName + ":" + remoteServerName + ") XA_END [" + xid + "]");
- this.xid = null;
}
/**
@@ -130,9 +116,38 @@
public synchronized int prepare(Xid xid) throws XAException {
System.out.println(" ProxyXAResource (" + localServerName + ":" + remoteServerName + ") XA_PREPARE [" + xid + "]");
- persistProxy(xid);
+ // Persist a proxy for the remote server this can mean we try to recover
+ // a transaction at a remote server that did not get chance to
+ // prepare but the alternative is to orphan a prepared server
try {
+ File dir = new File(System.getProperty("user.dir") + "/distributedjta/ProxyXAResource/" + localServerName + "/");
+ dir.mkdirs();
+ File file = new File(dir, new Uid().fileStringForm());
+ file.createNewFile();
+ DataOutputStream fos = new DataOutputStream(new FileOutputStream(file));
+ fos.writeInt(remoteServerName);
+ fos.writeInt(xid.getFormatId());
+ fos.writeInt(xid.getGlobalTransactionId().length);
+ fos.write(xid.getGlobalTransactionId());
+ fos.writeInt(xid.getBranchQualifier().length);
+ fos.write(xid.getBranchQualifier());
+
+ if (map.containsKey(xid)) {
+ System.out.println(map.get(xid));
+ map.remove(xid).delete();
+ }
+ if (this.file != null) {
+ this.file.delete();
+ }
+
+ map.put(xid, file);
+ } catch (IOException e) {
+ e.printStackTrace();
+ throw new XAException(XAException.XAER_RMERR);
+ }
+
+ try {
int propagatePrepare = lookupProvider.lookup(remoteServerName).propagatePrepare(xid);
System.out.println(" ProxyXAResource (" + localServerName + ":" + remoteServerName + ") XA_PREPARED");
return propagatePrepare;
@@ -145,8 +160,6 @@
public synchronized void commit(Xid xid, boolean onePhase) throws XAException {
System.out.println(" ProxyXAResource (" + localServerName + ":" + remoteServerName + ") XA_COMMIT [" + xid + "]");
- persistProxy(xid);
-
try {
lookupProvider.lookup(remoteServerName).propagateCommit(xid, onePhase);
System.out.println(" ProxyXAResource (" + localServerName + ":" + remoteServerName + ") XA_COMMITED");
@@ -154,6 +167,12 @@
throw new XAException(XAException.XA_RETRY);
}
+ if (map.get(xid) != null) {
+ map.get(xid).delete();
+ map.remove(xid);
+ }
+
+ // THIS CAN ONLY HAPPEN IN 1PC OR ROLLBACK
if (file != null) {
file.delete();
}
@@ -163,8 +182,6 @@
public synchronized void rollback(Xid xid) throws XAException {
System.out.println(" ProxyXAResource (" + localServerName + ":" + remoteServerName + ") XA_ROLLBACK[" + xid + "]");
- persistProxy(xid);
-
try {
lookupProvider.lookup(remoteServerName).propagateRollback(xid);
System.out.println(" ProxyXAResource (" + localServerName + ":" + remoteServerName + ") XA_ROLLBACKED");
@@ -179,6 +196,12 @@
}
}
+ if (map.get(xid) != null) {
+ map.get(xid).delete();
+ map.remove(xid);
+ }
+
+ // THIS CAN ONLY HAPPEN IN 1PC OR ROLLBACK
if (file != null) {
file.delete();
}
@@ -196,7 +219,6 @@
*/
@Override
public Xid[] recover(int flag) throws XAException {
- Xid[] recovered = null;
if ((flag & XAResource.TMSTARTRSCAN) == XAResource.TMSTARTRSCAN) {
System.out.println(" ProxyXAResource (" + localServerName + ":" + remoteServerName + ") XA_RECOVER [XAResource.TMSTARTRSCAN]: "
+ remoteServerName);
@@ -206,29 +228,50 @@
+ remoteServerName);
}
- if (this.xid != null) {
- try {
- recovered = lookupProvider.lookup(remoteServerName).propagateRecover(xid.getFormatId(), xid.getGlobalTransactionId());
- } catch (DummyRemoteException ce) {
- throw new XAException(XAException.XA_RETRY);
+ List<Xid> toReturn = new ArrayList<Xid>();
+ Xid[] recovered = null;
+ try {
+ recovered = lookupProvider.lookup(remoteServerName).propagateRecover(localServerName);
+ } catch (DummyRemoteException ce) {
+ throw new XAException(XAException.XA_RETRY);
+ }
+
+ List<Xid> arrayList = new ArrayList<Xid>();
+ arrayList.addAll(map.keySet());
+ if (recovered != null) {
+ for (int i = 0; i < recovered.length; i++) {
+ System.out.println(" ProxyXAResource (" + localServerName + ":" + remoteServerName + ") recovered: " + recovered[i]);
+ Iterator<Xid> iterator = map.keySet().iterator();
+ while (iterator.hasNext()) {
+ Xid next = iterator.next();
+ if (Arrays.equals(next.getGlobalTransactionId(), recovered[i].getGlobalTransactionId())) {
+ toReturn.add(next);
+ } else if (!iterator.hasNext()) {
+ toReturn.add(recovered[i]);
+ }
+ arrayList.remove(next);
+ }
+ System.out.println(" ProxyXAResource (" + localServerName + ":" + remoteServerName + ") added: " + toReturn.get(toReturn.size() - 1));
}
}
- for (int i = 0; i < recovered.length; i++) {
- System.out.println(" ProxyXAResource (" + localServerName + ":" + remoteServerName + ") recovered: " + recovered[i]);
+ // We now know the remote server didn't know about these Xids
+ List<Xid> knownNoneKnownXids = new ArrayList<Xid>();
+ knownNoneKnownXids.addAll(arrayList);
+ Iterator<Xid> iterator = knownNoneKnownXids.iterator();
+ while (iterator.hasNext()) {
+ Xid next = iterator.next();
+ map.remove(next).delete();
}
-
- Xid[] toReturn = null;
if ((flag & XAResource.TMSTARTRSCAN) == XAResource.TMSTARTRSCAN) {
System.out.println(" ProxyXAResource (" + localServerName + ":" + remoteServerName + ") XA_RECOVERD[XAResource.TMSTARTRSCAN]: "
+ remoteServerName);
- toReturn = new Xid[] { xid };
}
if ((flag & XAResource.TMENDRSCAN) == XAResource.TMENDRSCAN) {
System.out.println(" ProxyXAResource (" + localServerName + ":" + remoteServerName + ") XA_RECOVERD[XAResource.TMENDRSCAN]: "
+ remoteServerName);
}
- return toReturn;
+ return toReturn.toArray(new Xid[0]);
}
@Override
@@ -296,30 +339,4 @@
public String getJndiName() {
return "ProxyXAResource: " + localServerName + " " + remoteServerName;
}
-
- private void persistProxy(Xid xid) throws XAException {
- // Persist a proxy for the remote server this can mean we try to recover
- // a transaction at a remote server that did not get chance to
- // prepare but the alternative is to orphan a prepared server
-
- if (this.file == null) {
- try {
- File dir = new File(System.getProperty("user.dir") + "/distributedjta/ProxyXAResource/" + localServerName + "/");
- dir.mkdirs();
- File file = new File(dir, new Uid().fileStringForm());
- file.createNewFile();
- DataOutputStream fos = new DataOutputStream(new FileOutputStream(file));
- fos.writeInt(remoteServerName);
- fos.writeInt(xid.getFormatId());
- fos.writeInt(xid.getGlobalTransactionId().length);
- fos.write(xid.getGlobalTransactionId());
- fos.writeInt(xid.getBranchQualifier().length);
- fos.write(xid.getBranchQualifier());
- this.file = file;
- } catch (IOException e) {
- e.printStackTrace();
- throw new XAException(XAException.XAER_RMERR);
- }
- }
- }
-}
+}
\ No newline at end of file
Modified: labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/examples/classes/com/arjuna/jta/distributed/example/server/impl/ProxyXAResourceRecovery.java
===================================================================
--- labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/examples/classes/com/arjuna/jta/distributed/example/server/impl/ProxyXAResourceRecovery.java 2011-10-20 09:26:01 UTC (rev 37628)
+++ labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/examples/classes/com/arjuna/jta/distributed/example/server/impl/ProxyXAResourceRecovery.java 2011-10-20 09:51:23 UTC (rev 37629)
@@ -21,30 +21,76 @@
*/
package com.arjuna.jta.distributed.example.server.impl;
+import java.io.DataInputStream;
import java.io.File;
+import java.io.FileInputStream;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
import java.util.List;
+import java.util.Map;
import javax.transaction.xa.XAResource;
+import javax.transaction.xa.Xid;
import org.jboss.tm.XAResourceRecovery;
+import com.arjuna.ats.jta.xa.XidImple;
import com.arjuna.jta.distributed.example.server.LookupProvider;
public class ProxyXAResourceRecovery implements XAResourceRecovery {
private List<ProxyXAResource> resources = new ArrayList<ProxyXAResource>();
- public ProxyXAResourceRecovery(LookupProvider lookupProvider, int id) throws IOException {
- File file = new File(System.getProperty("user.dir") + "/distributedjta-example/ProxyXAResource/" + id + "/");
- if (file.exists() && file.isDirectory()) {
- File[] listFiles = file.listFiles();
+ public ProxyXAResourceRecovery(LookupProvider lookupProvider, Integer id) throws IOException {
+ File directory = new File(System.getProperty("user.dir") + "/distributedjta/ProxyXAResource/" + id + "/");
+ Map<Integer, Map<Xid, File>> savedData = new HashMap<Integer, Map<Xid, File>>();
+ if (directory.exists() && directory.isDirectory()) {
+ File[] listFiles = directory.listFiles();
for (int i = 0; i < listFiles.length; i++) {
- File currentFile = listFiles[i];
- resources.add(new ProxyXAResource(lookupProvider, id, currentFile));
+ File file = listFiles[i];
+ DataInputStream fis = new DataInputStream(new FileInputStream(file));
+ int remoteServerName = fis.readInt();
+
+ Map<Xid, File> map = savedData.get(remoteServerName);
+ if (map == null) {
+ map = new HashMap<Xid, File>();
+ savedData.put(remoteServerName, map);
+ }
+ final int formatId = fis.readInt();
+ int gtrid_length = fis.readInt();
+ final byte[] gtrid = new byte[gtrid_length];
+ fis.read(gtrid, 0, gtrid_length);
+
+ int bqual_length = fis.readInt();
+ final byte[] bqual = new byte[bqual_length];
+ fis.read(bqual, 0, bqual_length);
+ Xid xid = new XidImple(new Xid() {
+ @Override
+ public byte[] getBranchQualifier() {
+ return bqual;
+ }
+
+ @Override
+ public int getFormatId() {
+ return formatId;
+ }
+
+ @Override
+ public byte[] getGlobalTransactionId() {
+ return gtrid;
+ }
+ });
+ map.put(xid, file);
}
}
+ Iterator<Integer> iterator = savedData.keySet().iterator();
+ while (iterator.hasNext()) {
+ Integer remoteServerName = iterator.next();
+ Map<Xid, File> map = savedData.get(remoteServerName);
+ resources.add(new ProxyXAResource(lookupProvider, id, remoteServerName, map));
+ }
}
@Override
Modified: labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/examples/classes/com/arjuna/jta/distributed/example/server/impl/ServerImpl.java
===================================================================
--- labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/examples/classes/com/arjuna/jta/distributed/example/server/impl/ServerImpl.java 2011-10-20 09:26:01 UTC (rev 37628)
+++ labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/examples/classes/com/arjuna/jta/distributed/example/server/impl/ServerImpl.java 2011-10-20 09:51:23 UTC (rev 37629)
@@ -21,10 +21,12 @@
*/
package com.arjuna.jta.distributed.example.server.impl;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
import java.io.IOException;
import java.net.InetAddress;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -43,6 +45,7 @@
import com.arjuna.ats.arjuna.common.CoreEnvironmentBeanException;
import com.arjuna.ats.arjuna.common.ObjectStoreEnvironmentBean;
import com.arjuna.ats.arjuna.common.RecoveryEnvironmentBean;
+import com.arjuna.ats.arjuna.common.Uid;
import com.arjuna.ats.arjuna.coordinator.TxControl;
import com.arjuna.ats.arjuna.recovery.RecoveryManager;
import com.arjuna.ats.arjuna.tools.osb.mbean.ObjStoreBrowser;
@@ -197,11 +200,33 @@
}
@Override
- public ProxyXAResource generateProxyXAResource(LookupProvider lookupProvider, Integer localServerName, Integer remoteServerName) {
- return new ProxyXAResource(lookupProvider, localServerName, remoteServerName);
+ public ProxyXAResource generateProxyXAResource(LookupProvider lookupProvider, Integer remoteServerName) throws IOException, SystemException {
+ // Persist a proxy for the remote server this can mean we try to recover
+ // transactions at a remote server that did not get chance to
+ // prepare but the alternative is to orphan a prepared server
+
+ Xid currentXid = getCurrentXid();
+ File dir = new File(System.getProperty("user.dir") + "/distributedjta/ProxyXAResource/" + getNodeName());
+ dir.mkdirs();
+ File file = new File(dir, new Uid().fileStringForm());
+ file.createNewFile();
+ DataOutputStream fos = new DataOutputStream(new FileOutputStream(file));
+ fos.writeInt(remoteServerName);
+ fos.writeInt(currentXid.getFormatId());
+ fos.writeInt(currentXid.getGlobalTransactionId().length);
+ fos.write(currentXid.getGlobalTransactionId());
+ fos.writeInt(currentXid.getBranchQualifier().length);
+ fos.write(currentXid.getBranchQualifier());
+
+ return new ProxyXAResource(lookupProvider, nodeName, remoteServerName, file);
}
@Override
+ public void cleanupProxyXAResource(XAResource proxyXAResource) {
+ ((ProxyXAResource) proxyXAResource).deleteTemporaryFile();
+ }
+
+ @Override
public Synchronization generateProxySynchronization(LookupProvider lookupProvider, Integer localServerName, Integer remoteServerName, Xid toRegisterAgainst) {
return new ProxySynchronization(lookupProvider, localServerName, remoteServerName, toRegisterAgainst);
}
@@ -246,22 +271,12 @@
}
@Override
- public Xid[] propagateRecover(int formatId, byte[] gtrid) throws XAException, DummyRemoteException {
+ public Xid[] propagateRecover(Integer parentNodeName) throws XAException, DummyRemoteException {
ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
try {
Thread.currentThread().setContextClassLoader(this.getClass().getClassLoader());
- List<Xid> toReturn = new ArrayList<Xid>();
- Xid[] recovered = ((XATerminatorImple) SubordinationManager.getXATerminator()).recover();
- if (recovered != null) {
- for (int i = 0; i < recovered.length; i++) {
- // Filter out the transactions that are not owned by this
- // parent
- if (recovered[i].getFormatId() == formatId && Arrays.equals(gtrid, recovered[i].getGlobalTransactionId())) {
- toReturn.add(recovered[i]);
- }
- }
- }
- return toReturn.toArray(new Xid[0]);
+ Xid[] recovered = ((XATerminatorImple) SubordinationManager.getXATerminator()).doRecover(parentNodeName);
+ return recovered;
} finally {
Thread.currentThread().setContextClassLoader(contextClassLoader);
}
Modified: labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/tests/classes/com/arjuna/ats/jta/distributed/SimpleIsolatedServers.java
===================================================================
--- labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/tests/classes/com/arjuna/ats/jta/distributed/SimpleIsolatedServers.java 2011-10-20 09:26:01 UTC (rev 37628)
+++ labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/tests/classes/com/arjuna/ats/jta/distributed/SimpleIsolatedServers.java 2011-10-20 09:51:23 UTC (rev 37629)
@@ -133,9 +133,9 @@
Transaction originalTransaction = transactionManager.getTransaction();
int remainingTimeout = (int) (originalServer.getTimeLeftBeforeTransactionTimeout() / 1000);
Xid currentXid = originalServer.getCurrentXid();
+ XAResource proxyXAResource = originalServer.generateProxyXAResource(lookupProvider, 2000);
originalServer.storeRootTransaction();
transactionManager.suspend();
- XAResource proxyXAResource = initializeProxy(originalServer, 2000, currentXid);
performTransactionalWork(null, new LinkedList<Integer>(Arrays.asList(new Integer[] { 2000 })), remainingTimeout, currentXid, 1, false);
transactionManager.resume(originalTransaction);
originalTransaction.enlistResource(proxyXAResource);
@@ -211,8 +211,8 @@
int remainingTimeout = (int) (originalServer.getTimeLeftBeforeTransactionTimeout() / 1000);
Xid currentXid = originalServer.getCurrentXid();
originalServer.storeRootTransaction();
+ XAResource proxyXAResource = originalServer.generateProxyXAResource(lookupProvider, 2000);
transactionManager.suspend();
- XAResource proxyXAResource = initializeProxy(originalServer, 2000, currentXid);
performTransactionalWork(null, new LinkedList<Integer>(Arrays.asList(new Integer[] { 2000 })), remainingTimeout, currentXid, 2, false);
transactionManager.resume(originalTransaction);
originalTransaction.enlistResource(proxyXAResource);
@@ -289,8 +289,8 @@
int remainingTimeout = (int) (originalServer.getTimeLeftBeforeTransactionTimeout() / 1000);
Xid currentXid = originalServer.getCurrentXid();
originalServer.storeRootTransaction();
+ XAResource proxyXAResource = originalServer.generateProxyXAResource(lookupProvider, 2000);
transactionManager.suspend();
- XAResource proxyXAResource = initializeProxy(originalServer, 2000, currentXid);
performTransactionalWork(null, new LinkedList<Integer>(Arrays.asList(new Integer[] { 2000 })), remainingTimeout, currentXid, 2, false);
transactionManager.resume(originalTransaction);
originalTransaction.enlistResource(proxyXAResource);
@@ -427,8 +427,8 @@
int remainingTimeout = (int) (originalServer.getTimeLeftBeforeTransactionTimeout() / 1000);
Xid currentXid = originalServer.getCurrentXid();
originalServer.storeRootTransaction();
+ XAResource proxyXAResource = originalServer.generateProxyXAResource(lookupProvider, 2000);
transactionManager.suspend();
- XAResource proxyXAResource = initializeProxy(originalServer, 2000, currentXid);
performTransactionalWork(null, new LinkedList<Integer>(Arrays.asList(new Integer[] { 2000 })), remainingTimeout, currentXid, 1, false);
transactionManager.resume(originalTransaction);
originalTransaction.enlistResource(proxyXAResource);
@@ -451,8 +451,8 @@
int remainingTimeout = (int) (originalServer.getTimeLeftBeforeTransactionTimeout() / 1000);
Xid currentXid = originalServer.getCurrentXid();
originalServer.storeRootTransaction();
+ XAResource proxyXAResource = originalServer.generateProxyXAResource(lookupProvider, 2000);
transactionManager.suspend();
- XAResource proxyXAResource = initializeProxy(originalServer, 2000, currentXid);
performTransactionalWork(null, new LinkedList<Integer>(Arrays.asList(new Integer[] { 2000 })), remainingTimeout, currentXid, 1, false);
transactionManager.resume(originalTransaction);
originalTransaction.enlistResource(proxyXAResource);
@@ -544,8 +544,8 @@
Xid currentXid = originalServer.getCurrentXid();
originalServer.storeRootTransaction();
originalTransaction.enlistResource(new TestResource(counter, originalServer.getNodeName(), false));
+ XAResource proxyXAResource = originalServer.generateProxyXAResource(lookupProvider, 2000);
transactionManager.suspend();
- XAResource proxyXAResource = initializeProxy(originalServer, 2000, currentXid);
// Migrate a transaction
LocalServer currentServer = getLocalServer(2000);
@@ -658,12 +658,11 @@
// FLOW THE TRANSACTION
remainingTimeout = (int) (currentServer.getTimeLeftBeforeTransactionTimeout() / 1000);
- // SUSPEND THE TRANSACTION
+ // STORE AND SUSPEND THE TRANSACTION
Xid currentXid = currentServer.getCurrentXid();
+ XAResource proxyXAResource = currentServer.generateProxyXAResource(lookupProvider, nodesToFlowTo.get(0));
transactionManager.suspend();
- XAResource proxyXAResource = initializeProxy(currentServer, nodesToFlowTo.get(0), currentXid);
-
boolean proxyRequired = performTransactionalWork(counter, nodesToFlowTo, remainingTimeout, currentXid, numberOfResourcesToRegister,
addSynchronization);
transactionManager.resume(transaction);
@@ -679,7 +678,7 @@
transaction.registerSynchronization(currentServer.generateProxySynchronization(lookupProvider, currentServer.getNodeName(), nextServerNodeName,
toMigrate));
} else {
- currentServer.cleanupProxy(proxyXAResource);
+ currentServer.cleanupProxyXAResource(proxyXAResource);
}
}
@@ -695,28 +694,6 @@
return localServers[index];
}
- private synchronized XAResource initializeProxy(LocalServer server, Integer remoteServerName, Xid xid) throws IOException {
- // Persist a proxy for the remote server this can mean we try to recover
- // transactions at a remote server that did not get chance to
- // prepare but the alternative is to orphan a prepared server
-
- File dir = new File(System.getProperty("user.dir") + "/distributedjta/ProxyXAResource/" + server.getNodeName());
- dir.mkdirs();
- File file = new File(dir, new Uid().fileStringForm());
- file.createNewFile();
- DataOutputStream fos = new DataOutputStream(new FileOutputStream(file));
- fos.writeInt(remoteServerName);
- fos.writeInt(xid.getFormatId());
- fos.writeInt(xid.getGlobalTransactionId().length);
- fos.write(xid.getGlobalTransactionId());
- fos.writeInt(xid.getBranchQualifier().length);
- fos.write(xid.getBranchQualifier());
-
- XAResource proxyXAResource = server.generateProxyXAResource(lookupProvider, server.getNodeName(), remoteServerName, file);
-
- return proxyXAResource;
- }
-
private static class MyLookupProvider implements LookupProvider {
@Override
Modified: labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/tests/classes/com/arjuna/ats/jta/distributed/server/LocalServer.java
===================================================================
--- labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/tests/classes/com/arjuna/ats/jta/distributed/server/LocalServer.java 2011-10-20 09:26:01 UTC (rev 37628)
+++ labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/tests/classes/com/arjuna/ats/jta/distributed/server/LocalServer.java 2011-10-20 09:51:23 UTC (rev 37629)
@@ -58,8 +58,10 @@
public RemoteServer connectTo();
- public XAResource generateProxyXAResource(LookupProvider lookupProvider, Integer localServerName, Integer remoteServerName, File file);
+ public XAResource generateProxyXAResource(LookupProvider lookupProvider, Integer remoteServerName) throws SystemException, IOException;
+ public void cleanupProxyXAResource(XAResource proxyXAResource);
+
public Synchronization generateProxySynchronization(LookupProvider lookupProvider, Integer localServerName, Integer remoteServerName, Xid toRegisterAgainst);
public Xid getCurrentXid() throws SystemException;
@@ -67,6 +69,4 @@
public CompletionCounter getCompletionCounter();
public void shutdown() throws Exception;
-
- public void cleanupProxy(XAResource proxyXAResource);
}
Modified: labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/tests/classes/com/arjuna/ats/jta/distributed/server/impl/ServerImpl.java
===================================================================
--- labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/tests/classes/com/arjuna/ats/jta/distributed/server/impl/ServerImpl.java 2011-10-20 09:26:01 UTC (rev 37628)
+++ labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/tests/classes/com/arjuna/ats/jta/distributed/server/impl/ServerImpl.java 2011-10-20 09:51:23 UTC (rev 37629)
@@ -21,7 +21,9 @@
*/
package com.arjuna.ats.jta.distributed.server.impl;
+import java.io.DataOutputStream;
import java.io.File;
+import java.io.FileOutputStream;
import java.io.IOException;
import java.lang.reflect.Field;
import java.net.InetAddress;
@@ -47,6 +49,7 @@
import com.arjuna.ats.arjuna.common.CoreEnvironmentBeanException;
import com.arjuna.ats.arjuna.common.ObjectStoreEnvironmentBean;
import com.arjuna.ats.arjuna.common.RecoveryEnvironmentBean;
+import com.arjuna.ats.arjuna.common.Uid;
import com.arjuna.ats.arjuna.coordinator.TransactionReaper;
import com.arjuna.ats.arjuna.coordinator.TxControl;
import com.arjuna.ats.arjuna.recovery.RecoveryManager;
@@ -296,13 +299,31 @@
}
@Override
- public ProxyXAResource generateProxyXAResource(LookupProvider lookupProvider, Integer localServerName, Integer remoteServerName, File file) {
- return new ProxyXAResource(counter, lookupProvider, localServerName, remoteServerName, file);
+ public ProxyXAResource generateProxyXAResource(LookupProvider lookupProvider, Integer remoteServerName) throws SystemException, IOException {
+
+ // Persist a proxy for the remote server. This can mean we try to recover
+ // transactions at a remote server that did not get a chance to
+ // prepare, but the alternative is to orphan a prepared server.
+
+ Xid currentXid = getCurrentXid();
+ File dir = new File(System.getProperty("user.dir") + "/distributedjta/ProxyXAResource/" + getNodeName());
+ dir.mkdirs();
+ File file = new File(dir, new Uid().fileStringForm());
+ file.createNewFile();
+ DataOutputStream fos = new DataOutputStream(new FileOutputStream(file));
+ fos.writeInt(remoteServerName);
+ fos.writeInt(currentXid.getFormatId());
+ fos.writeInt(currentXid.getGlobalTransactionId().length);
+ fos.write(currentXid.getGlobalTransactionId());
+ fos.writeInt(currentXid.getBranchQualifier().length);
+ fos.write(currentXid.getBranchQualifier());
+
+ return new ProxyXAResource(counter, lookupProvider, getNodeName(), remoteServerName, file);
}
@Override
- public void cleanupProxy(XAResource proxyXAResource) {
- ((ProxyXAResource)proxyXAResource).deleteTemporaryFile();
+ public void cleanupProxyXAResource(XAResource proxyXAResource) {
+ ((ProxyXAResource) proxyXAResource).deleteTemporaryFile();
}
@Override
More information about the jboss-svn-commits
mailing list