[jboss-svn-commits] JBL Code SVN: r37622 - in labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration: examples/classes/com/arjuna/jta/distributed/example/server/impl and 4 other directories.
jboss-svn-commits at lists.jboss.org
jboss-svn-commits at lists.jboss.org
Wed Oct 19 19:08:16 EDT 2011
Author: tomjenkinson
Date: 2011-10-19 19:08:15 -0400 (Wed, 19 Oct 2011)
New Revision: 37622
Added:
labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/tests/byteman-scripts/leave-subordinate-orphan.txt
Modified:
labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/examples/classes/com/arjuna/jta/distributed/example/server/LocalServer.java
labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/examples/classes/com/arjuna/jta/distributed/example/server/impl/ProxyXAResource.java
labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/examples/classes/com/arjuna/jta/distributed/example/server/impl/ServerImpl.java
labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/tests/classes/com/arjuna/ats/jta/distributed/SimpleIsolatedServers.java
labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/tests/classes/com/arjuna/ats/jta/distributed/server/LocalServer.java
labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/tests/classes/com/arjuna/ats/jta/distributed/server/RemoteServer.java
labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/tests/classes/com/arjuna/ats/jta/distributed/server/impl/ProxyXAResource.java
labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/tests/classes/com/arjuna/ats/jta/distributed/server/impl/ProxyXAResourceRecovery.java
labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/tests/classes/com/arjuna/ats/jta/distributed/server/impl/ServerImpl.java
Log:
JBTM-895: ensure that we can recover orphans that are fully prepared on the remote node by a call to one-phase commit
Modified: labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/examples/classes/com/arjuna/jta/distributed/example/server/LocalServer.java
===================================================================
--- labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/examples/classes/com/arjuna/jta/distributed/example/server/LocalServer.java 2011-10-19 21:25:20 UTC (rev 37621)
+++ labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/examples/classes/com/arjuna/jta/distributed/example/server/LocalServer.java 2011-10-19 23:08:15 UTC (rev 37622)
@@ -56,7 +56,5 @@
public Synchronization generateProxySynchronization(LookupProvider lookupProvider, Integer localServerName, Integer remoteServerName, Xid toRegisterAgainst);
- public Xid extractXid(XAResource proxyXAResource);
-
public Xid getCurrentXid() throws SystemException;
}
Modified: labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/examples/classes/com/arjuna/jta/distributed/example/server/impl/ProxyXAResource.java
===================================================================
--- labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/examples/classes/com/arjuna/jta/distributed/example/server/impl/ProxyXAResource.java 2011-10-19 21:25:20 UTC (rev 37621)
+++ labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/examples/classes/com/arjuna/jta/distributed/example/server/impl/ProxyXAResource.java 2011-10-19 23:08:15 UTC (rev 37622)
@@ -130,28 +130,9 @@
public synchronized int prepare(Xid xid) throws XAException {
System.out.println(" ProxyXAResource (" + localServerName + ":" + remoteServerName + ") XA_PREPARE [" + xid + "]");
- // Persist a proxy for the remote server this can mean we try to recover
- // a transaction at a remote server that did not get chance to
- // prepare but the alternative is to orphan a prepared server
+ persistProxy(xid);
try {
- File dir = new File(System.getProperty("user.dir") + "/distributedjta-example/ProxyXAResource/" + localServerName + "/");
- dir.mkdirs();
- file = new File(dir, new Uid().fileStringForm());
- file.createNewFile();
- DataOutputStream fos = new DataOutputStream(new FileOutputStream(file));
- fos.writeInt(remoteServerName);
- fos.writeInt(xid.getFormatId());
- fos.writeInt(xid.getGlobalTransactionId().length);
- fos.write(xid.getGlobalTransactionId());
- fos.writeInt(xid.getBranchQualifier().length);
- fos.write(xid.getBranchQualifier());
- } catch (IOException e) {
- e.printStackTrace();
- throw new XAException(XAException.XAER_RMERR);
- }
-
- try {
int propagatePrepare = lookupProvider.lookup(remoteServerName).propagatePrepare(xid);
System.out.println(" ProxyXAResource (" + localServerName + ":" + remoteServerName + ") XA_PREPARED");
return propagatePrepare;
@@ -164,6 +145,8 @@
public synchronized void commit(Xid xid, boolean onePhase) throws XAException {
System.out.println(" ProxyXAResource (" + localServerName + ":" + remoteServerName + ") XA_COMMIT [" + xid + "]");
+ persistProxy(xid);
+
try {
lookupProvider.lookup(remoteServerName).propagateCommit(xid, onePhase);
System.out.println(" ProxyXAResource (" + localServerName + ":" + remoteServerName + ") XA_COMMITED");
@@ -179,6 +162,9 @@
@Override
public synchronized void rollback(Xid xid) throws XAException {
System.out.println(" ProxyXAResource (" + localServerName + ":" + remoteServerName + ") XA_ROLLBACK[" + xid + "]");
+
+ persistProxy(xid);
+
try {
lookupProvider.lookup(remoteServerName).propagateRollback(xid);
System.out.println(" ProxyXAResource (" + localServerName + ":" + remoteServerName + ") XA_ROLLBACKED");
@@ -311,7 +297,29 @@
return "ProxyXAResource: " + localServerName + " " + remoteServerName;
}
- public Xid getXid() {
- return xid;
+ private void persistProxy(Xid xid) throws XAException {
+ // Persist a proxy for the remote server this can mean we try to recover
+ // a transaction at a remote server that did not get chance to
+ // prepare but the alternative is to orphan a prepared server
+
+ if (this.file == null) {
+ try {
+ File dir = new File(System.getProperty("user.dir") + "/distributedjta/ProxyXAResource/" + localServerName + "/");
+ dir.mkdirs();
+ File file = new File(dir, new Uid().fileStringForm());
+ file.createNewFile();
+ DataOutputStream fos = new DataOutputStream(new FileOutputStream(file));
+ fos.writeInt(remoteServerName);
+ fos.writeInt(xid.getFormatId());
+ fos.writeInt(xid.getGlobalTransactionId().length);
+ fos.write(xid.getGlobalTransactionId());
+ fos.writeInt(xid.getBranchQualifier().length);
+ fos.write(xid.getBranchQualifier());
+ this.file = file;
+ } catch (IOException e) {
+ e.printStackTrace();
+ throw new XAException(XAException.XAER_RMERR);
+ }
+ }
}
}
Modified: labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/examples/classes/com/arjuna/jta/distributed/example/server/impl/ServerImpl.java
===================================================================
--- labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/examples/classes/com/arjuna/jta/distributed/example/server/impl/ServerImpl.java 2011-10-19 21:25:20 UTC (rev 37621)
+++ labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/examples/classes/com/arjuna/jta/distributed/example/server/impl/ServerImpl.java 2011-10-19 23:08:15 UTC (rev 37622)
@@ -289,10 +289,4 @@
Thread.currentThread().setContextClassLoader(contextClassLoader);
}
}
-
- @Override
- public Xid extractXid(XAResource xaResource) {
- ProxyXAResource proxyXAResource = (ProxyXAResource) xaResource;
- return proxyXAResource.getXid();
- }
}
Added: labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/tests/byteman-scripts/leave-subordinate-orphan.txt
===================================================================
--- labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/tests/byteman-scripts/leave-subordinate-orphan.txt (rev 0)
+++ labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/tests/byteman-scripts/leave-subordinate-orphan.txt 2011-10-19 23:08:15 UTC (rev 37622)
@@ -0,0 +1,16 @@
+########################################################################
+#
+# byteman script used to ensure that tests can synchronize with various
+# actions performed by the recovery code
+
+
+
+#########################################################################
+RULE Fail phase2Commit
+CLASS com.arjuna.ats.arjuna.coordinator.BasicAction
+METHOD phase2Commit
+AT ENTRY
+BIND NOTHING
+IF TRUE
+ DO throw new Error()
+ENDRULE
\ No newline at end of file
Modified: labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/tests/classes/com/arjuna/ats/jta/distributed/SimpleIsolatedServers.java
===================================================================
--- labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/tests/classes/com/arjuna/ats/jta/distributed/SimpleIsolatedServers.java 2011-10-19 21:25:20 UTC (rev 37621)
+++ labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/tests/classes/com/arjuna/ats/jta/distributed/SimpleIsolatedServers.java 2011-10-19 23:08:15 UTC (rev 37622)
@@ -100,11 +100,11 @@
// same server
{
RemoteServer server = lookupProvider.lookup(2000);
- server.propagateRecover(0, null);
+ server.propagateRecover(1000);
}
{
RemoteServer server = lookupProvider.lookup(2000);
- server.propagateRecover(0, null);
+ server.propagateRecover(3000);
}
}
@@ -259,11 +259,82 @@
assertTrue(server.getCompletionCounter().getRollbackCount() == 0);
server.doRecoveryManagerScan(true);
assertTrue(server.getCompletionCounter().getCommitCount() == 0);
- assertTrue(server.getCompletionCounter().getRollbackCount() == 0);
+ assertTrue(server.getCompletionCounter().getRollbackCount() == 1);
}
}
@Test
+ @BMScript("leave-subordinate-orphan")
+ public void testOnePhaseSubordinateOrphan() throws Exception {
+ assertTrue(getLocalServer(3000).getCompletionCounter().getCommitCount() == 0);
+ assertTrue(getLocalServer(2000).getCompletionCounter().getCommitCount() == 0);
+ assertTrue(getLocalServer(1000).getCompletionCounter().getCommitCount() == 0);
+ final Phase2CommitAborted phase2CommitAborted = new Phase2CommitAborted();
+ Thread thread = new Thread(new Runnable() {
+ public void run() {
+ int startingTimeout = 0;
+ try {
+ int startingServer = 1000;
+ LocalServer originalServer = getLocalServer(startingServer);
+ ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
+ Thread.currentThread().setContextClassLoader(originalServer.getClass().getClassLoader());
+ TransactionManager transactionManager = originalServer.getTransactionManager();
+ transactionManager.setTransactionTimeout(startingTimeout);
+ transactionManager.begin();
+ Transaction originalTransaction = transactionManager.getTransaction();
+ int remainingTimeout = (int) (originalServer.getTimeLeftBeforeTransactionTimeout() / 1000);
+ Xid currentXid = originalServer.getCurrentXid();
+ originalServer.storeRootTransaction();
+ transactionManager.suspend();
+ performTransactionalWork(null, new LinkedList<Integer>(Arrays.asList(new Integer[] { 2000 })), remainingTimeout, currentXid, 2, false);
+ transactionManager.resume(originalTransaction);
+ XAResource proxyXAResource = originalServer.generateProxyXAResource(lookupProvider, originalServer.getNodeName(), 2000);
+ originalTransaction.enlistResource(proxyXAResource);
+ originalServer.removeRootTransaction(currentXid);
+ transactionManager.commit();
+ Thread.currentThread().setContextClassLoader(classLoader);
+ } catch (ExecuteException e) {
+ System.err.println("Should be a thread death but cest la vie");
+ synchronized (phase2CommitAborted) {
+ phase2CommitAborted.setPhase2CommitAborted(true);
+ phase2CommitAborted.notify();
+ }
+ } catch (LinkageError t) {
+ System.err.println("Should be a thread death but cest la vie");
+ synchronized (phase2CommitAborted) {
+ phase2CommitAborted.setPhase2CommitAborted(true);
+ phase2CommitAborted.notify();
+ }
+ } catch (Throwable t) {
+ System.err.println("Should be a thread death but cest la vie");
+ synchronized (phase2CommitAborted) {
+ phase2CommitAborted.setPhase2CommitAborted(true);
+ phase2CommitAborted.notify();
+ }
+ }
+ }
+ }, "Orphan-creator");
+ thread.start();
+ synchronized (phase2CommitAborted) {
+ if (!phase2CommitAborted.isPhase2CommitAborted()) {
+ phase2CommitAborted.wait();
+ }
+ }
+ tearDown();
+ setup();
+ assertTrue(getLocalServer(2000).getCompletionCounter().getCommitCount() == 0);
+ assertTrue(getLocalServer(2000).getCompletionCounter().getRollbackCount() == 0);
+ assertTrue(getLocalServer(1000).getCompletionCounter().getCommitCount() == 0);
+ assertTrue(getLocalServer(1000).getCompletionCounter().getRollbackCount() == 0);
+ getLocalServer(1000).doRecoveryManagerScan(true);
+ assertTrue(getLocalServer(1000).getCompletionCounter().getCommitCount() == 0);
+ assertTrue(getLocalServer(1000).getCompletionCounter().getRollbackCount() == 1);
+ assertTrue(getLocalServer(2000).getCompletionCounter().getCommitCount() == 0);
+ assertTrue(getLocalServer(2000).getCompletionCounter().getRollbackCount() == 2);
+
+ }
+
+ @Test
@BMScript("fail2pc")
public void testRecovery() throws Exception {
tearDown();
Modified: labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/tests/classes/com/arjuna/ats/jta/distributed/server/LocalServer.java
===================================================================
--- labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/tests/classes/com/arjuna/ats/jta/distributed/server/LocalServer.java 2011-10-19 21:25:20 UTC (rev 37621)
+++ labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/tests/classes/com/arjuna/ats/jta/distributed/server/LocalServer.java 2011-10-19 23:08:15 UTC (rev 37622)
@@ -61,8 +61,6 @@
public Synchronization generateProxySynchronization(LookupProvider lookupProvider, Integer localServerName, Integer remoteServerName, Xid toRegisterAgainst);
- public Xid extractXid(XAResource proxyXAResource);
-
public Xid getCurrentXid() throws SystemException;
public CompletionCounter getCompletionCounter();
Modified: labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/tests/classes/com/arjuna/ats/jta/distributed/server/RemoteServer.java
===================================================================
--- labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/tests/classes/com/arjuna/ats/jta/distributed/server/RemoteServer.java 2011-10-19 21:25:20 UTC (rev 37621)
+++ labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/tests/classes/com/arjuna/ats/jta/distributed/server/RemoteServer.java 2011-10-19 23:08:15 UTC (rev 37622)
@@ -35,7 +35,7 @@
public void propagateRollback(Xid xid) throws XAException, DummyRemoteException;
- public Xid[] propagateRecover(int formatId, byte[] gtrid) throws XAException, DummyRemoteException;
+ public Xid[] propagateRecover(Integer parentNodeName) throws XAException, DummyRemoteException;
public void propagateForget(Xid xid) throws XAException, DummyRemoteException;
Modified: labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/tests/classes/com/arjuna/ats/jta/distributed/server/impl/ProxyXAResource.java
===================================================================
--- labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/tests/classes/com/arjuna/ats/jta/distributed/server/impl/ProxyXAResource.java 2011-10-19 21:25:20 UTC (rev 37621)
+++ labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/tests/classes/com/arjuna/ats/jta/distributed/server/impl/ProxyXAResource.java 2011-10-19 23:08:15 UTC (rev 37622)
@@ -21,12 +21,16 @@
*/
package com.arjuna.ats.jta.distributed.server.impl;
-import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.File;
-import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
import javax.transaction.xa.XAException;
import javax.transaction.xa.XAResource;
@@ -47,10 +51,9 @@
private int transactionTimeout;
private Integer remoteServerName = -1;
- private File file;
+ private Map<Xid, File> map;
private Integer localServerName;
private LookupProvider lookupProvider;
- private Xid xid;
private CompletionCounter completionCounter;
/**
@@ -65,6 +68,7 @@
this.lookupProvider = lookupProvider;
this.localServerName = localServerName;
this.remoteServerName = remoteServerName;
+ map = new HashMap<Xid, File>();
}
/**
@@ -72,39 +76,18 @@
*
* @param lookupProvider
* @param localServerName
+ * @param map
+ * @param remoteServerName
* @param file
* @throws IOException
*/
- public ProxyXAResource(CompletionCounter completionCounter, LookupProvider lookupProvider, Integer localServerName, File file) throws IOException {
+ public ProxyXAResource(CompletionCounter completionCounter, LookupProvider lookupProvider, Integer localServerName, Integer remoteServerName,
+ Map<Xid, File> map) throws IOException {
this.completionCounter = completionCounter;
this.lookupProvider = lookupProvider;
this.localServerName = localServerName;
- this.file = file;
- DataInputStream fis = new DataInputStream(new FileInputStream(file));
- this.remoteServerName = fis.readInt();
- final int formatId = fis.readInt();
- int gtrid_length = fis.readInt();
- final byte[] gtrid = new byte[gtrid_length];
- fis.read(gtrid, 0, gtrid_length);
- int bqual_length = fis.readInt();
- final byte[] bqual = new byte[bqual_length];
- fis.read(bqual, 0, bqual_length);
- this.xid = new Xid() {
- @Override
- public byte[] getBranchQualifier() {
- return bqual;
- }
-
- @Override
- public int getFormatId() {
- return formatId;
- }
-
- @Override
- public byte[] getGlobalTransactionId() {
- return gtrid;
- }
- };
+ this.remoteServerName = remoteServerName;
+ this.map = map;
}
/**
@@ -113,7 +96,6 @@
@Override
public void start(Xid xid, int flags) throws XAException {
System.out.println(" ProxyXAResource (" + localServerName + ":" + remoteServerName + ") XA_START [" + xid + "]");
- this.xid = xid;
}
/**
@@ -122,7 +104,6 @@
@Override
public void end(Xid xid, int flags) throws XAException {
System.out.println(" ProxyXAResource (" + localServerName + ":" + remoteServerName + ") XA_END [" + xid + "]");
- this.xid = null;
}
/**
@@ -134,28 +115,9 @@
public synchronized int prepare(Xid xid) throws XAException {
System.out.println(" ProxyXAResource (" + localServerName + ":" + remoteServerName + ") XA_PREPARE [" + xid + "]");
- // Persist a proxy for the remote server this can mean we try to recover
- // a transaction at a remote server that did not get chance to
- // prepare but the alternative is to orphan a prepared server
+ persistProxy(xid);
try {
- File dir = new File(System.getProperty("user.dir") + "/distributedjta/ProxyXAResource/" + localServerName + "/");
- dir.mkdirs();
- file = new File(dir, new Uid().fileStringForm());
- file.createNewFile();
- DataOutputStream fos = new DataOutputStream(new FileOutputStream(file));
- fos.writeInt(remoteServerName);
- fos.writeInt(xid.getFormatId());
- fos.writeInt(xid.getGlobalTransactionId().length);
- fos.write(xid.getGlobalTransactionId());
- fos.writeInt(xid.getBranchQualifier().length);
- fos.write(xid.getBranchQualifier());
- } catch (IOException e) {
- e.printStackTrace();
- throw new XAException(XAException.XAER_RMERR);
- }
-
- try {
int propagatePrepare = lookupProvider.lookup(remoteServerName).propagatePrepare(xid);
System.out.println(" ProxyXAResource (" + localServerName + ":" + remoteServerName + ") XA_PREPARED");
return propagatePrepare;
@@ -168,6 +130,8 @@
public synchronized void commit(Xid xid, boolean onePhase) throws XAException {
System.out.println(" ProxyXAResource (" + localServerName + ":" + remoteServerName + ") XA_COMMIT [" + xid + "]");
+ persistProxy(xid);
+
try {
lookupProvider.lookup(remoteServerName).propagateCommit(xid, onePhase);
System.out.println(" ProxyXAResource (" + localServerName + ":" + remoteServerName + ") XA_COMMITED");
@@ -175,8 +139,9 @@
throw new XAException(XAException.XA_RETRY);
}
- if (file != null) {
- file.delete();
+ if (map.get(xid) != null) {
+ map.get(xid).delete();
+ map.remove(xid);
}
if (completionCounter != null) {
completionCounter.incrementCommit();
@@ -187,6 +152,8 @@
public synchronized void rollback(Xid xid) throws XAException {
System.out.println(" ProxyXAResource (" + localServerName + ":" + remoteServerName + ") XA_ROLLBACK[" + xid + "]");
+ persistProxy(xid);
+
try {
lookupProvider.lookup(remoteServerName).propagateRollback(xid);
System.out.println(" ProxyXAResource (" + localServerName + ":" + remoteServerName + ") XA_ROLLBACKED");
@@ -201,8 +168,9 @@
}
}
- if (file != null) {
- file.delete();
+ if (map.get(xid) != null) {
+ map.get(xid).delete();
+ map.remove(xid);
}
if (completionCounter != null) {
completionCounter.incrementRollback();
@@ -221,7 +189,6 @@
*/
@Override
public Xid[] recover(int flag) throws XAException {
- Xid[] recovered = null;
if ((flag & XAResource.TMSTARTRSCAN) == XAResource.TMSTARTRSCAN) {
System.out.println(" ProxyXAResource (" + localServerName + ":" + remoteServerName + ") XA_RECOVER [XAResource.TMSTARTRSCAN]: "
+ remoteServerName);
@@ -231,29 +198,45 @@
+ remoteServerName);
}
- if (this.xid != null) {
- try {
- recovered = lookupProvider.lookup(remoteServerName).propagateRecover(xid.getFormatId(), xid.getGlobalTransactionId());
- } catch (DummyRemoteException ce) {
- throw new XAException(XAException.XA_RETRY);
- }
+ List<Xid> toReturn = new ArrayList<Xid>();
+ Xid[] recovered = null;
+ try {
+ recovered = lookupProvider.lookup(remoteServerName).propagateRecover(localServerName);
+ } catch (DummyRemoteException ce) {
+ throw new XAException(XAException.XA_RETRY);
}
+ List<Xid> arrayList = new ArrayList<Xid>();
+ arrayList.addAll(map.keySet());
for (int i = 0; i < recovered.length; i++) {
System.out.println(" ProxyXAResource (" + localServerName + ":" + remoteServerName + ") recovered: " + recovered[i]);
+ Iterator<Xid> iterator = map.keySet().iterator();
+ while (iterator.hasNext()) {
+ Xid next = iterator.next();
+ if (Arrays.equals(next.getGlobalTransactionId(), recovered[i].getGlobalTransactionId())) {
+ toReturn.add(next);
+ } else if (!iterator.hasNext()) {
+ toReturn.add(recovered[i]);
+ }
+ arrayList.remove(next);
+ }
+ System.out.println(" ProxyXAResource (" + localServerName + ":" + remoteServerName + ") added: " + toReturn.get(toReturn.size() - 1));
}
- Xid[] toReturn = null;
+ Iterator<Xid> iterator = arrayList.iterator();
+ while (iterator.hasNext()) {
+ Xid next = iterator.next();
+ toReturn.add(next);
+ }
if ((flag & XAResource.TMSTARTRSCAN) == XAResource.TMSTARTRSCAN) {
System.out.println(" ProxyXAResource (" + localServerName + ":" + remoteServerName + ") XA_RECOVERD[XAResource.TMSTARTRSCAN]: "
+ remoteServerName);
- toReturn = new Xid[] { xid };
}
if ((flag & XAResource.TMENDRSCAN) == XAResource.TMENDRSCAN) {
System.out.println(" ProxyXAResource (" + localServerName + ":" + remoteServerName + ") XA_RECOVERD[XAResource.TMENDRSCAN]: "
+ remoteServerName);
}
- return toReturn;
+ return toReturn.toArray(new Xid[0]);
}
@Override
@@ -322,7 +305,29 @@
return "ProxyXAResource: " + localServerName + " " + remoteServerName;
}
- public Xid getXid() {
- return xid;
+ private void persistProxy(Xid xid) throws XAException {
+ // Persist a proxy for the remote server this can mean we try to recover
+ // a transaction at a remote server that did not get chance to
+ // prepare but the alternative is to orphan a prepared server
+
+ if (!map.containsKey(xid)) {
+ try {
+ File dir = new File(System.getProperty("user.dir") + "/distributedjta/ProxyXAResource/" + localServerName + "/");
+ dir.mkdirs();
+ File file = new File(dir, new Uid().fileStringForm());
+ file.createNewFile();
+ DataOutputStream fos = new DataOutputStream(new FileOutputStream(file));
+ fos.writeInt(remoteServerName);
+ fos.writeInt(xid.getFormatId());
+ fos.writeInt(xid.getGlobalTransactionId().length);
+ fos.write(xid.getGlobalTransactionId());
+ fos.writeInt(xid.getBranchQualifier().length);
+ fos.write(xid.getBranchQualifier());
+ map.put(xid, file);
+ } catch (IOException e) {
+ e.printStackTrace();
+ throw new XAException(XAException.XAER_RMERR);
+ }
+ }
}
}
Modified: labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/tests/classes/com/arjuna/ats/jta/distributed/server/impl/ProxyXAResourceRecovery.java
===================================================================
--- labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/tests/classes/com/arjuna/ats/jta/distributed/server/impl/ProxyXAResourceRecovery.java 2011-10-19 21:25:20 UTC (rev 37621)
+++ labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/tests/classes/com/arjuna/ats/jta/distributed/server/impl/ProxyXAResourceRecovery.java 2011-10-19 23:08:15 UTC (rev 37622)
@@ -21,31 +21,76 @@
*/
package com.arjuna.ats.jta.distributed.server.impl;
+import java.io.DataInputStream;
import java.io.File;
+import java.io.FileInputStream;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
import java.util.List;
+import java.util.Map;
import javax.transaction.xa.XAResource;
+import javax.transaction.xa.Xid;
import org.jboss.tm.XAResourceRecovery;
import com.arjuna.ats.jta.distributed.server.CompletionCounter;
import com.arjuna.ats.jta.distributed.server.LookupProvider;
+import com.arjuna.ats.jta.xa.XidImple;
public class ProxyXAResourceRecovery implements XAResourceRecovery {
private List<ProxyXAResource> resources = new ArrayList<ProxyXAResource>();
public ProxyXAResourceRecovery(CompletionCounter counter, LookupProvider lookupProvider, Integer id) throws IOException {
- File file = new File(System.getProperty("user.dir") + "/distributedjta/ProxyXAResource/" + id + "/");
- if (file.exists() && file.isDirectory()) {
- File[] listFiles = file.listFiles();
+ File directory = new File(System.getProperty("user.dir") + "/distributedjta/ProxyXAResource/" + id + "/");
+ Map<Integer, Map<Xid, File>> savedData = new HashMap<Integer, Map<Xid, File>>();
+ if (directory.exists() && directory.isDirectory()) {
+ File[] listFiles = directory.listFiles();
for (int i = 0; i < listFiles.length; i++) {
- File currentFile = listFiles[i];
- resources.add(new ProxyXAResource(counter, lookupProvider, id, currentFile));
+ File file = listFiles[i];
+ DataInputStream fis = new DataInputStream(new FileInputStream(file));
+ int remoteServerName = fis.readInt();
+
+ Map<Xid, File> map = savedData.get(remoteServerName);
+ if (map == null) {
+ map = new HashMap<Xid, File>();
+ savedData.put(remoteServerName, map);
+ }
+ final int formatId = fis.readInt();
+ int gtrid_length = fis.readInt();
+ final byte[] gtrid = new byte[gtrid_length];
+ fis.read(gtrid, 0, gtrid_length);
+ int bqual_length = fis.readInt();
+ final byte[] bqual = new byte[bqual_length];
+ fis.read(bqual, 0, bqual_length);
+ Xid xid = new XidImple(new Xid() {
+ @Override
+ public byte[] getBranchQualifier() {
+ return bqual;
+ }
+
+ @Override
+ public int getFormatId() {
+ return formatId;
+ }
+
+ @Override
+ public byte[] getGlobalTransactionId() {
+ return gtrid;
+ }
+ });
+ map.put(xid, file);
}
}
+ Iterator<Integer> iterator = savedData.keySet().iterator();
+ while (iterator.hasNext()) {
+ Integer remoteServerName = iterator.next();
+ Map<Xid, File> map = savedData.get(remoteServerName);
+ resources.add(new ProxyXAResource(counter, lookupProvider, id, remoteServerName, map));
+ }
}
@Override
Modified: labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/tests/classes/com/arjuna/ats/jta/distributed/server/impl/ServerImpl.java
===================================================================
--- labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/tests/classes/com/arjuna/ats/jta/distributed/server/impl/ServerImpl.java 2011-10-19 21:25:20 UTC (rev 37621)
+++ labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/tests/classes/com/arjuna/ats/jta/distributed/server/impl/ServerImpl.java 2011-10-19 23:08:15 UTC (rev 37622)
@@ -25,7 +25,6 @@
import java.lang.reflect.Field;
import java.net.InetAddress;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -37,7 +36,6 @@
import javax.transaction.Transaction;
import javax.transaction.TransactionManager;
import javax.transaction.xa.XAException;
-import javax.transaction.xa.XAResource;
import javax.transaction.xa.Xid;
import org.jboss.tm.TransactionTimeoutConfiguration;
@@ -68,6 +66,8 @@
import com.arjuna.ats.jta.distributed.server.LocalServer;
import com.arjuna.ats.jta.distributed.server.LookupProvider;
import com.arjuna.ats.jta.distributed.server.RemoteServer;
+import com.arjuna.ats.jta.xa.XATxConverter;
+import com.arjuna.ats.jta.xa.XidImple;
public class ServerImpl implements LocalServer, RemoteServer {
@@ -349,7 +349,7 @@
}
@Override
- public Xid[] propagateRecover(int formatId, byte[] gtrid) throws XAException, DummyRemoteException {
+ public Xid[] propagateRecover(Integer parentNodeName) throws XAException, DummyRemoteException {
ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
try {
Thread.currentThread().setContextClassLoader(this.getClass().getClassLoader());
@@ -359,7 +359,8 @@
for (int i = 0; i < recovered.length; i++) {
// Filter out the transactions that are not owned by this
// parent
- if (recovered[i].getFormatId() == formatId && Arrays.equals(gtrid, recovered[i].getGlobalTransactionId())) {
+ if ((recovered[i].getFormatId() == XATxConverter.FORMAT_ID && parentNodeName == XATxConverter.getParentNodeName(((XidImple) recovered[i])
+ .getXID()))) {
toReturn.add(recovered[i]);
}
}
@@ -394,12 +395,6 @@
}
@Override
- public Xid extractXid(XAResource xaResource) {
- ProxyXAResource proxyXAResource = (ProxyXAResource) xaResource;
- return proxyXAResource.getXid();
- }
-
- @Override
public CompletionCounter getCompletionCounter() {
return counter;
}
More information about the jboss-svn-commits
mailing list