[jboss-svn-commits] JBL Code SVN: r37545 - in labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration: tests and 7 other directories.
jboss-svn-commits at lists.jboss.org
jboss-svn-commits at lists.jboss.org
Fri Oct 14 19:48:41 EDT 2011
Author: tomjenkinson
Date: 2011-10-14 19:48:41 -0400 (Fri, 14 Oct 2011)
New Revision: 37545
Added:
labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/tests/
labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/tests/classes/
labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/tests/classes/com/
labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/tests/classes/com/arjuna/
labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/tests/classes/com/arjuna/ats/
labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/tests/classes/com/arjuna/ats/jta/
labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/tests/classes/com/arjuna/ats/jta/distributed/
labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/tests/classes/com/arjuna/ats/jta/distributed/IsolatableServersClassLoader.java
labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/tests/classes/com/arjuna/ats/jta/distributed/Server.java
labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/tests/classes/com/arjuna/ats/jta/distributed/SimpleIsolatedServers.java
labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/tests/classes/com/arjuna/ats/jta/distributed/TestResource.java
labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/tests/classes/com/arjuna/ats/jta/distributed/TestResourceRecovery.java
labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/tests/classes/com/arjuna/ats/jta/distributed/TestSynchronization.java
labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/tests/classes/com/arjuna/ats/jta/distributed/impl/
labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/tests/classes/com/arjuna/ats/jta/distributed/impl/ProxySynchronization.java
labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/tests/classes/com/arjuna/ats/jta/distributed/impl/ProxyXAResource.java
labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/tests/classes/com/arjuna/ats/jta/distributed/impl/ProxyXAResourceRecovery.java
labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/tests/classes/com/arjuna/ats/jta/distributed/impl/ServerImpl.java
Log:
JBTM-916 added a test bed for the distributed JTA work
Added: labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/tests/classes/com/arjuna/ats/jta/distributed/IsolatableServersClassLoader.java
===================================================================
--- labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/tests/classes/com/arjuna/ats/jta/distributed/IsolatableServersClassLoader.java (rev 0)
+++ labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/tests/classes/com/arjuna/ats/jta/distributed/IsolatableServersClassLoader.java 2011-10-14 23:48:41 UTC (rev 37545)
@@ -0,0 +1,91 @@
+package com.arjuna.ats.jta.distributed;
+
+import java.io.ByteArrayOutputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.net.URLConnection;
+import java.util.HashMap;
+import java.util.Map;
+
+public class IsolatableServersClassLoader extends ClassLoader {
+
+ private Map<String, Class<?>> clazzMap = new HashMap<String, Class<?>>();
+ private Method m;
+
+ public IsolatableServersClassLoader(ClassLoader parent) throws SecurityException, NoSuchMethodException {
+ super(parent);
+ m = ClassLoader.class.getDeclaredMethod("findLoadedClass", new Class[] { String.class });
+ m.setAccessible(true);
+ }
+
+ @Override
+ protected Class<?> findClass(String name) throws ClassNotFoundException {
+ if (clazzMap.containsKey(name)) {
+ return clazzMap.get(name);
+ }
+ return super.findClass(name);
+ }
+
+ public Class<?> loadClass(String name) throws ClassNotFoundException {
+ Class<?> clazz = null;
+ if (clazzMap.containsKey(name)) {
+ clazz = clazzMap.get(name);
+ }
+
+ try {
+ ClassLoader parent2 = getParent();
+ Object test1 = m.invoke(parent2, name);
+ if (test1 != null) {
+
+ if (!name.equals("")) {
+ clazz = super.loadClass(name);
+ }
+ } else {
+ try {
+ String url = "file:" + System.getProperty("user.dir") + "/bin/" + name.replace('.', '/') + ".class";
+ URL myUrl = new URL(url);
+ try {
+ URLConnection connection = myUrl.openConnection();
+ InputStream input = connection.getInputStream();
+ ByteArrayOutputStream buffer = new ByteArrayOutputStream();
+ int data = input.read();
+
+ while (data != -1) {
+ buffer.write(data);
+ data = input.read();
+ }
+
+ input.close();
+
+ byte[] classData = buffer.toByteArray();
+
+ clazz = defineClass(name, classData, 0, classData.length);
+ clazzMap.put(name, clazz);
+ } catch (FileNotFoundException fnfe) {
+ return super.loadClass(name);
+ }
+ } catch (MalformedURLException e) {
+ e.printStackTrace();
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ }
+ } catch (IllegalArgumentException e1) {
+ // TODO Auto-generated catch block
+ e1.printStackTrace();
+ } catch (IllegalAccessException e1) {
+ // TODO Auto-generated catch block
+ e1.printStackTrace();
+ } catch (InvocationTargetException e1) {
+ // TODO Auto-generated catch block
+ e1.printStackTrace();
+ }
+
+ return clazz;
+ }
+}
\ No newline at end of file
Added: labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/tests/classes/com/arjuna/ats/jta/distributed/Server.java
===================================================================
--- labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/tests/classes/com/arjuna/ats/jta/distributed/Server.java (rev 0)
+++ labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/tests/classes/com/arjuna/ats/jta/distributed/Server.java 2011-10-14 23:48:41 UTC (rev 37545)
@@ -0,0 +1,55 @@
+package com.arjuna.ats.jta.distributed;
+
+import java.io.IOException;
+import java.util.List;
+
+import javax.transaction.HeuristicCommitException;
+import javax.transaction.HeuristicMixedException;
+import javax.transaction.HeuristicRollbackException;
+import javax.transaction.InvalidTransactionException;
+import javax.transaction.NotSupportedException;
+import javax.transaction.RollbackException;
+import javax.transaction.Synchronization;
+import javax.transaction.SystemException;
+import javax.transaction.TransactionManager;
+import javax.transaction.xa.XAException;
+import javax.transaction.xa.XAResource;
+import javax.transaction.xa.Xid;
+
+import com.arjuna.ats.arjuna.common.CoreEnvironmentBeanException;
+
+public interface Server {
+
+ public void initialise(int nodeName) throws CoreEnvironmentBeanException, IOException;
+
+ public int getNodeName();
+
+ public TransactionManager getTransactionManager() throws NotSupportedException, SystemException;
+
+ public boolean importTransaction(int remainingTimeout, Xid toMigrate) throws XAException, InvalidTransactionException, IllegalStateException,
+ SystemException;
+
+ public void propagateCommit(Xid xid, boolean onePhase) throws IllegalStateException, HeuristicMixedException, HeuristicRollbackException,
+ HeuristicCommitException, SystemException, XAException;
+
+ public int propagatePrepare(Xid xid) throws XAException;
+
+ public void propagateRollback(Xid xid) throws IllegalStateException, HeuristicMixedException, HeuristicCommitException, HeuristicRollbackException,
+ SystemException, XAException;
+
+ public void propagateForget(Xid xid) throws XAException;
+
+ public void doRecoveryManagerScan();
+
+ public Xid[] propagateRecover(List<Integer> startScanned, int flag) throws XAException;
+
+ public long getTimeLeftBeforeTransactionTimeout() throws RollbackException;
+
+ public void propagateBeforeCompletion(Xid xid) throws XAException, SystemException;
+
+ public Xid getCurrentXid() throws SystemException;
+
+ public XAResource generateProxyXAResource(int currentNodeName, int nextNodeName);
+
+ public Synchronization generateProxySynchronization(int serverId, int serverIdToProxyTo, Xid toRegisterAgainst);
+}
Added: labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/tests/classes/com/arjuna/ats/jta/distributed/SimpleIsolatedServers.java
===================================================================
--- labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/tests/classes/com/arjuna/ats/jta/distributed/SimpleIsolatedServers.java (rev 0)
+++ labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/tests/classes/com/arjuna/ats/jta/distributed/SimpleIsolatedServers.java 2011-10-14 23:48:41 UTC (rev 37545)
@@ -0,0 +1,143 @@
+package com.arjuna.ats.jta.distributed;
+
+import java.io.File;
+import java.io.IOException;
+import java.lang.reflect.InvocationHandler;
+import java.lang.reflect.Method;
+
+import javax.transaction.HeuristicMixedException;
+import javax.transaction.HeuristicRollbackException;
+import javax.transaction.InvalidTransactionException;
+import javax.transaction.NotSupportedException;
+import javax.transaction.RollbackException;
+import javax.transaction.SystemException;
+import javax.transaction.Transaction;
+import javax.transaction.TransactionManager;
+import javax.transaction.xa.XAException;
+import javax.transaction.xa.XAResource;
+import javax.transaction.xa.Xid;
+
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import com.arjuna.ats.arjuna.common.CoreEnvironmentBeanException;
+
+public class SimpleIsolatedServers {
+ private static Server[] servers = new Server[3];
+
+ @BeforeClass
+ public static void setup() throws SecurityException, NoSuchMethodException, InstantiationException, IllegalAccessException, ClassNotFoundException,
+ CoreEnvironmentBeanException, IOException {
+
+ // Get the Server interface loaded, only way I found to do this was
+ // instantiate one
+ Server server = (Server) java.lang.reflect.Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(), new Class[] { Server.class },
+ new InvocationHandler() {
+
+ @Override
+ public Object invoke(Object arg0, Method arg1, Object[] arg2) throws Throwable {
+ // TODO Auto-generated method stub
+ return null;
+ }
+ });
+
+ ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
+ for (int i = 0; i < getServers().length; i++) {
+ IsolatableServersClassLoader classLoader = new IsolatableServersClassLoader(contextClassLoader);
+ getServers()[i] = (Server) classLoader.loadClass("com.arjuna.ats.jta.distributed.impl.ServerImpl").newInstance();
+ getServers()[i].initialise((i + 1) * 1000);
+ }
+ }
+
+ @Test
+ public void testRecovery() {
+ getServers()[0].doRecoveryManagerScan();
+ }
+
+ @Test
+ public void testMigrateTransaction() throws NotSupportedException, SystemException, IllegalStateException, RollbackException, InvalidTransactionException,
+ XAException, SecurityException, HeuristicMixedException, HeuristicRollbackException {
+
+ File file = new File(System.getProperty("user.dir") + "/tmp/");
+ if (file.exists()) {
+ file.delete();
+ }
+ int startingTimeout = 0;
+
+ // Start out at the first server
+ Server originalServer = getServers()[0];
+ TransactionManager transactionManager = getServers()[0].getTransactionManager();
+ transactionManager.setTransactionTimeout(startingTimeout);
+ transactionManager.begin();
+ Transaction originalTransaction = transactionManager.getTransaction();
+ originalTransaction.registerSynchronization(new TestSynchronization(originalServer.getNodeName()));
+ originalTransaction.enlistResource(new TestResource(originalServer.getNodeName(), false));
+ Xid toMigrate = originalServer.getCurrentXid();
+
+ // Loop through the rest of the servers passing the transaction up and
+ // down
+ Transaction suspendedTransaction = originalServer.getTransactionManager().suspend();
+ boolean proxyRequired = recursivelyFlowTransaction(0, 1, toMigrate);
+ originalServer.getTransactionManager().resume(suspendedTransaction);
+ if (proxyRequired) {
+ XAResource proxyXAResource = originalServer.generateProxyXAResource(originalServer.getNodeName(), getServers()[1].getNodeName());
+ originalTransaction.enlistResource(proxyXAResource);
+ originalTransaction.registerSynchronization(originalServer.generateProxySynchronization(originalServer.getNodeName(),
+ getServers()[1].getNodeName(), toMigrate));
+ }
+
+ Transaction transaction = transactionManager.getTransaction();
+ transaction.commit();
+ }
+
+ private boolean recursivelyFlowTransaction(int previousServerIndex, int currentServerIndex, Xid toMigrate) throws RollbackException,
+ InvalidTransactionException, IllegalStateException, XAException, SystemException, NotSupportedException {
+
+ Server previousServer = getServers()[previousServerIndex];
+ Server currentServer = getServers()[currentServerIndex];
+
+ // Migrate the transaction to the next server
+ int remainingTimeout = (int) (previousServer.getTimeLeftBeforeTransactionTimeout() / 1000);
+
+ boolean requiresProxyAtPreviousServer = !currentServer.importTransaction(remainingTimeout, toMigrate);
+ // Perform work on the migrated transaction
+ TransactionManager transactionManager = currentServer.getTransactionManager();
+ Transaction transaction = transactionManager.getTransaction();
+ transaction.registerSynchronization(new TestSynchronization(currentServer.getNodeName()));
+ transaction.enlistResource(new TestResource(currentServer.getNodeName(), false));
+
+ int nextNextServerIndex = -1;
+ if (currentServerIndex > previousServerIndex && currentServerIndex + 1 != getServers().length) {
+ // Ascending
+ nextNextServerIndex = currentServerIndex + 1;
+ } else {
+ // Descending
+ nextNextServerIndex = currentServerIndex - 1;
+ }
+
+ // THE WORKHORSE OF FLOWING A TRANSACTION
+ // SUSPEND THE TRANSACTION
+ Transaction suspendedTransaction = currentServer.getTransactionManager().suspend();
+ boolean proxyRequired = false;
+ if (nextNextServerIndex != -1) {
+ // FLOW THE TRANSACTION
+ proxyRequired = recursivelyFlowTransaction(currentServerIndex, nextNextServerIndex, toMigrate);
+ }
+ // RESUME THE TRANSACTION IN CASE THERE IS MORE WORK
+ currentServer.getTransactionManager().resume(suspendedTransaction);
+ // Create a proxy for the new server if necessary
+ if (proxyRequired) {
+ XAResource proxyXAResource = currentServer.generateProxyXAResource(currentServer.getNodeName(), getServers()[nextNextServerIndex].getNodeName());
+ suspendedTransaction.enlistResource(proxyXAResource);
+ suspendedTransaction.registerSynchronization(currentServer.generateProxySynchronization(currentServer.getNodeName(),
+ getServers()[nextNextServerIndex].getNodeName(), toMigrate));
+ }
+ // SUSPEND THE TRANSACTION WHEN YOU ARE READY TO RETURN TO YOUR CALLER
+ suspendedTransaction = currentServer.getTransactionManager().suspend();
+ return requiresProxyAtPreviousServer;
+ }
+
+ public static Server[] getServers() {
+ return servers;
+ }
+}
Added: labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/tests/classes/com/arjuna/ats/jta/distributed/TestResource.java
===================================================================
--- labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/tests/classes/com/arjuna/ats/jta/distributed/TestResource.java (rev 0)
+++ labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/tests/classes/com/arjuna/ats/jta/distributed/TestResource.java 2011-10-14 23:48:41 UTC (rev 37545)
@@ -0,0 +1,185 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2006, Red Hat Middleware LLC, and individual contributors
+ * as indicated by the @author tags.
+ * See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ * This copyrighted material is made available to anyone wishing to use,
+ * modify, copy, or redistribute it subject to the terms and conditions
+ * of the GNU Lesser General Public License, v. 2.1.
+ * This program is distributed in the hope that it will be useful, but WITHOUT A
+ * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
+ * PARTICULAR PURPOSE. See the GNU Lesser General Public License for more details.
+ * You should have received a copy of the GNU Lesser General Public License,
+ * v.2.1 along with this distribution; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
+ * MA 02110-1301, USA.
+ *
+ * (C) 2005-2006,
+ * @author JBoss Inc.
+ */
+
+package com.arjuna.ats.jta.distributed;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.util.Arrays;
+
+import javax.transaction.xa.XAException;
+import javax.transaction.xa.XAResource;
+import javax.transaction.xa.Xid;
+
+import com.arjuna.ats.arjuna.common.Uid;
+
+public class TestResource implements XAResource {
+ private Xid xid;
+
+ protected int timeout = 0;
+
+ private boolean readonly = false;
+
+ private File file;
+
+ private int serverId;
+
+ public TestResource(int serverId, boolean readonly) {
+ this.serverId = serverId;
+ this.readonly = readonly;
+ }
+
+ public TestResource(int serverId, File file) throws IOException {
+ this.serverId = serverId;
+ this.file = file;
+ DataInputStream fis = new DataInputStream(new FileInputStream(file));
+ final int formatId = fis.readInt();
+ final int gtrid_length = fis.readInt();
+ final byte[] gtrid = new byte[gtrid_length];
+ fis.read(gtrid, 0, gtrid_length);
+ final int bqual_length = fis.readInt();
+ final byte[] bqual = new byte[bqual_length];
+ fis.read(bqual, 0, bqual_length);
+ this.xid = new Xid() {
+
+ @Override
+ public byte[] getGlobalTransactionId() {
+ return gtrid;
+ }
+
+ @Override
+ public int getFormatId() {
+ return formatId;
+ }
+
+ @Override
+ public byte[] getBranchQualifier() {
+ return bqual;
+ }
+ };
+ }
+
+ public synchronized int prepare(Xid xid) throws XAException {
+ System.out.println(" TestResource (" + serverId + ") XA_PREPARE [" + xid + "]");
+
+ if (readonly)
+ return XA_RDONLY;
+ else {
+ File dir = new File(System.getProperty("user.dir") + "/tmp/TestResource/" + serverId + "/");
+ dir.mkdirs();
+ file = new File(dir, new Uid().fileStringForm() + "_");
+ try {
+ file.createNewFile();
+ final int formatId = xid.getFormatId();
+ final byte[] gtrid = xid.getGlobalTransactionId();
+ final int gtrid_length = gtrid.length;
+ final byte[] bqual = xid.getBranchQualifier();
+ final int bqual_length = bqual.length;
+
+ DataOutputStream fos = new DataOutputStream(new FileOutputStream(file));
+ fos.writeInt(formatId);
+ fos.writeInt(gtrid_length);
+ fos.write(gtrid, 0, gtrid_length);
+ fos.writeInt(bqual_length);
+ fos.write(bqual, 0, bqual_length);
+ } catch (IOException e) {
+ e.printStackTrace();
+ throw new XAException(XAException.XAER_RMERR);
+ }
+ return XA_OK;
+ }
+
+ // throw new XAException();
+ }
+
+ public synchronized void commit(Xid id, boolean onePhase) throws XAException {
+ System.out.println(" TestResource (" + serverId + ") XA_COMMIT [" + id + "]");
+ // String absoluteFile = file.getAbsolutePath();
+ // String newName = absoluteFile.substring(0, absoluteFile.length() -
+ // 1);
+ // File file2 = new File(newName);
+ // file.renameTo(file2);
+ if (file != null) {
+ file.delete();
+ }
+ }
+
+ public synchronized void rollback(Xid xid) throws XAException {
+ System.out.println(" TestResource (" + serverId + ") XA_ROLLBACK[" + xid + "]");
+ if (file != null) {
+ file.delete();
+ }
+ }
+
+ public void end(Xid xid, int flags) throws XAException {
+ System.out.println(" TestResource (" + serverId + ") XA_END [" + xid + "] Flags=" + flags);
+ }
+
+ public void forget(Xid xid) throws XAException {
+ System.out.println(" TestResource (" + serverId + ") XA_FORGET[" + xid + "]");
+ }
+
+ public int getTransactionTimeout() throws XAException {
+ return (timeout);
+ }
+
+ public boolean isSameRM(XAResource xares) throws XAException {
+ if (xares instanceof TestResource) {
+ TestResource other = (TestResource) xares;
+ if ((this.xid != null && other.xid != null)) {
+ if (this.xid.getFormatId() == other.xid.getFormatId()) {
+ if (Arrays.equals(this.xid.getGlobalTransactionId(), other.xid.getGlobalTransactionId())) {
+ if (Arrays.equals(this.xid.getBranchQualifier(), other.xid.getBranchQualifier())) {
+ return true;
+ }
+ }
+ }
+ }
+ }
+ return false;
+ }
+
+ public Xid[] recover(int flag) throws XAException {
+ if ((flag & XAResource.TMSTARTRSCAN) == XAResource.TMSTARTRSCAN) {
+ System.out.println(" TestResource (" + serverId + ") RECOVER[XAResource.TMSTARTRSCAN]: " + serverId);
+ }
+ if ((flag & XAResource.TMENDRSCAN) == XAResource.TMENDRSCAN) {
+ System.out.println(" TestResource (" + serverId + ") RECOVER[XAResource.TMENDRSCAN]: " + serverId);
+ }
+ if (flag == XAResource.TMNOFLAGS) {
+ System.out.println(" TestResource (" + serverId + ") RECOVER[XAResource.TMENDRSCAN]: " + serverId);
+ }
+ return new Xid[] { xid };
+ }
+
+ public boolean setTransactionTimeout(int seconds) throws XAException {
+ timeout = seconds;
+ return (true);
+ }
+
+ public void start(Xid xid, int flags) throws XAException {
+ System.out.println(" TestResource (" + serverId + ") XA_START [" + xid + "] Flags=" + flags);
+ }
+}
Added: labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/tests/classes/com/arjuna/ats/jta/distributed/TestResourceRecovery.java
===================================================================
--- labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/tests/classes/com/arjuna/ats/jta/distributed/TestResourceRecovery.java (rev 0)
+++ labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/tests/classes/com/arjuna/ats/jta/distributed/TestResourceRecovery.java 2011-10-14 23:48:41 UTC (rev 37545)
@@ -0,0 +1,34 @@
+package com.arjuna.ats.jta.distributed;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import javax.transaction.xa.XAResource;
+
+import org.jboss.tm.XAResourceRecovery;
+
+public class TestResourceRecovery implements XAResourceRecovery {
+
+ private List<TestResource> resources = new ArrayList<TestResource>();
+
+ public TestResourceRecovery(int serverId) throws IOException {
+ File file = new File(System.getProperty("user.dir") + "/tmp/TestResource/" + serverId + "/");
+ if (file.exists() && file.isDirectory()) {
+ File[] listFiles = file.listFiles();
+ for (int i = 0; i < listFiles.length; i++) {
+ File currentFile = listFiles[i];
+ if (currentFile.getAbsolutePath().endsWith("_")) {
+ resources.add(new TestResource(serverId, currentFile));
+ }
+ }
+ }
+ }
+
+ @Override
+ public XAResource[] getXAResources() {
+ return resources.toArray(new XAResource[] {});
+ }
+
+}
Added: labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/tests/classes/com/arjuna/ats/jta/distributed/TestSynchronization.java
===================================================================
--- labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/tests/classes/com/arjuna/ats/jta/distributed/TestSynchronization.java (rev 0)
+++ labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/tests/classes/com/arjuna/ats/jta/distributed/TestSynchronization.java 2011-10-14 23:48:41 UTC (rev 37545)
@@ -0,0 +1,21 @@
+package com.arjuna.ats.jta.distributed;
+
+import javax.transaction.Synchronization;
+
+public class TestSynchronization implements Synchronization {
+ private int serverId;
+
+ public TestSynchronization(int serverId) {
+ this.serverId = serverId;
+ }
+
+ @Override
+ public void beforeCompletion() {
+ System.out.println(" TestSynchronization (" + serverId + ") beforeCompletion");
+ }
+
+ @Override
+ public void afterCompletion(int status) {
+ System.out.println(" TestSynchronization (" + serverId + ") afterCompletion");
+ }
+}
Added: labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/tests/classes/com/arjuna/ats/jta/distributed/impl/ProxySynchronization.java
===================================================================
--- labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/tests/classes/com/arjuna/ats/jta/distributed/impl/ProxySynchronization.java (rev 0)
+++ labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/tests/classes/com/arjuna/ats/jta/distributed/impl/ProxySynchronization.java 2011-10-14 23:48:41 UTC (rev 37545)
@@ -0,0 +1,39 @@
+package com.arjuna.ats.jta.distributed.impl;
+
+import javax.transaction.Synchronization;
+import javax.transaction.SystemException;
+import javax.transaction.xa.XAException;
+import javax.transaction.xa.Xid;
+
+import com.arjuna.ats.jta.distributed.SimpleIsolatedServers;
+
+public class ProxySynchronization implements Synchronization {
+
+ private int serverId;
+ private int serverIdToProxyTo;
+ private Xid toRegisterAgainst;
+
+ public ProxySynchronization(int serverId, int serverIdToProxyTo, Xid toRegisterAgainst) {
+ this.serverId = serverId;
+ this.serverIdToProxyTo = serverIdToProxyTo;
+ this.toRegisterAgainst = toRegisterAgainst;
+ }
+
+ @Override
+ public void beforeCompletion() {
+ System.out.println("ProxySynchronization (" + serverId + ":" + serverIdToProxyTo + ") beforeCompletion");
+ int index = (serverIdToProxyTo / 1000) - 1;
+ try {
+ SimpleIsolatedServers.getServers()[index].propagateBeforeCompletion(toRegisterAgainst);
+ } catch (XAException e) {
+ e.printStackTrace();
+ } catch (SystemException e) {
+ e.printStackTrace();
+ }
+ }
+
+ @Override
+ public void afterCompletion(int status) {
+ // These are not proxied but are handled during local commits
+ }
+}
Added: labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/tests/classes/com/arjuna/ats/jta/distributed/impl/ProxyXAResource.java
===================================================================
--- labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/tests/classes/com/arjuna/ats/jta/distributed/impl/ProxyXAResource.java (rev 0)
+++ labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/tests/classes/com/arjuna/ats/jta/distributed/impl/ProxyXAResource.java 2011-10-14 23:48:41 UTC (rev 37545)
@@ -0,0 +1,298 @@
+package com.arjuna.ats.jta.distributed.impl;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import javax.transaction.HeuristicCommitException;
+import javax.transaction.HeuristicMixedException;
+import javax.transaction.HeuristicRollbackException;
+import javax.transaction.SystemException;
+import javax.transaction.xa.XAException;
+import javax.transaction.xa.XAResource;
+import javax.transaction.xa.Xid;
+
+import org.jboss.tm.XAResourceWrapper;
+
+import com.arjuna.ats.arjuna.common.Uid;
+import com.arjuna.ats.jta.distributed.Server;
+import com.arjuna.ats.jta.distributed.SimpleIsolatedServers;
+
+public class ProxyXAResource implements XAResource, XAResourceWrapper {
+
+ public static final ThreadLocal<List<Integer>> RECOVERY_SCAN_STARTED = new ThreadLocal<List<Integer>>();
+
+ private int transactionTimeout;
+ private Xid xid;
+ private int serverIdToProxyTo = -1;
+ private File file;
+ private Integer serverId;
+
+ public ProxyXAResource(int serverId, int serverIdToProxyTo) {
+ this.serverId = serverId;
+ this.serverIdToProxyTo = serverIdToProxyTo;
+ }
+
+ public ProxyXAResource(int recoverFor, File file) throws IOException {
+ this.serverId = recoverFor;
+ this.file = file;
+ DataInputStream fis = new DataInputStream(new FileInputStream(file));
+ final int formatId = fis.readInt();
+ final int gtrid_length = fis.readInt();
+ final byte[] gtrid = new byte[gtrid_length];
+ fis.read(gtrid, 0, gtrid_length);
+ final int bqual_length = fis.readInt();
+ final byte[] bqual = new byte[bqual_length];
+ fis.read(bqual, 0, bqual_length);
+ int serverIdToProxyTo = fis.readInt();
+ this.serverIdToProxyTo = serverIdToProxyTo;
+ this.xid = new Xid() {
+ @Override
+ public byte[] getGlobalTransactionId() {
+ return gtrid;
+ }
+
+ @Override
+ public int getFormatId() {
+ return formatId;
+ }
+
+ @Override
+ public byte[] getBranchQualifier() {
+ return bqual;
+ }
+
+ @Override
+ public boolean equals(Object object) {
+ Xid xid = (Xid) object;
+ if (xid == null)
+ return false;
+
+ if (xid == this)
+ return true;
+ else {
+
+ if (xid.getFormatId() == formatId) {
+ byte[] gtx = xid.getGlobalTransactionId();
+ byte[] bql = xid.getBranchQualifier();
+ final int bqlength = (bql == null ? 0 : bql.length);
+
+ if ((gtrid.length == gtx.length) && (bqual.length == bqlength)) {
+ int i;
+
+ for (i = 0; i < gtrid.length; i++) {
+ if (gtrid[i] != gtx[i])
+ return false;
+ }
+
+ for (i = 0; i < bqual.length; i++) {
+ if (bqual[i] != bql[i])
+ return false;
+ }
+
+ return true;
+ }
+ }
+ }
+
+ return false;
+ }
+ };
+ }
+
+ public Xid getXid() {
+ return xid;
+ }
+
+ @Override
+ public void start(Xid xid, int flags) throws XAException {
+ System.out.println(" ProxyXAResource (" + serverId + ":" + serverIdToProxyTo + ") XA_START [" + xid + "]");
+ this.xid = xid;
+ }
+
+ @Override
+ public void end(Xid xid, int flags) throws XAException {
+ System.out.println(" ProxyXAResource (" + serverId + ":" + serverIdToProxyTo + ") XA_END [" + xid + "]");
+ this.xid = null;
+ }
+
+ @Override
+ public synchronized int prepare(Xid xid) throws XAException {
+ System.out.println(" ProxyXAResource (" + serverId + ":" + serverIdToProxyTo + ") XA_PREPARE [" + xid + "]");
+
+ try {
+ File dir = new File(System.getProperty("user.dir") + "/tmp/ProxyXAResource/" + serverId + "/");
+ dir.mkdirs();
+ file = new File(dir, new Uid().fileStringForm());
+
+ file.createNewFile();
+
+ final int formatId = xid.getFormatId();
+ final byte[] gtrid = xid.getGlobalTransactionId();
+ final int gtrid_length = gtrid.length;
+ final byte[] bqual = xid.getBranchQualifier();
+ final int bqual_length = bqual.length;
+
+ DataOutputStream fos = new DataOutputStream(new FileOutputStream(file));
+ fos.writeInt(formatId);
+ fos.writeInt(gtrid_length);
+ fos.write(gtrid, 0, gtrid_length);
+ fos.writeInt(bqual_length);
+ fos.write(bqual, 0, bqual_length);
+ fos.writeInt(serverIdToProxyTo);
+ } catch (IOException e) {
+ e.printStackTrace();
+ throw new XAException(XAException.XAER_RMERR);
+ }
+
+ int propagatePrepare = getServerToProxyTo().propagatePrepare(xid);
+ System.out.println(" ProxyXAResource (" + serverId + ":" + serverIdToProxyTo + ") XA_PREPARED");
+ return propagatePrepare;
+ }
+
+ @Override
+ public synchronized void commit(Xid xid, boolean onePhase) throws XAException {
+ System.out.println(" ProxyXAResource (" + serverId + ":" + serverIdToProxyTo + ") XA_COMMIT [" + xid + "]");
+
+ try {
+ getServerToProxyTo().propagateCommit(xid, onePhase);
+ System.out.println(" ProxyXAResource (" + serverId + ":" + serverIdToProxyTo + ") XA_COMMITED");
+ } catch (IllegalStateException e) {
+ throw new XAException(XAException.XAER_INVAL);
+ } catch (HeuristicMixedException e) {
+ throw new XAException(XAException.XA_HEURMIX);
+ } catch (HeuristicRollbackException e) {
+ throw new XAException(XAException.XA_HEURRB);
+ } catch (HeuristicCommitException e) {
+ throw new XAException(XAException.XA_HEURCOM);
+ } catch (SystemException e) {
+ throw new XAException(XAException.XAER_PROTO);
+ }
+
+ if (file != null) {
+ file.delete();
+ }
+ }
+
+ @Override
+ public synchronized void rollback(Xid xid) throws XAException {
+ System.out.println(" ProxyXAResource (" + serverId + ":" + serverIdToProxyTo + ") XA_ROLLBACK[" + xid + "]");
+ try {
+ getServerToProxyTo().propagateRollback(xid);
+ System.out.println(" ProxyXAResource (" + serverId + ":" + serverIdToProxyTo + ") XA_ROLLBACKED");
+ } catch (IllegalStateException e) {
+ throw new XAException(XAException.XAER_INVAL);
+ } catch (HeuristicMixedException e) {
+ throw new XAException(XAException.XA_HEURMIX);
+ } catch (HeuristicCommitException e) {
+ throw new XAException(XAException.XA_HEURCOM);
+ } catch (HeuristicRollbackException e) {
+ throw new XAException(XAException.XA_HEURRB);
+ } catch (SystemException e) {
+ throw new XAException(XAException.XAER_PROTO);
+ }
+
+ if (file != null) {
+ file.delete();
+ }
+ }
+
+ @Override
+ public Xid[] recover(int flag) throws XAException {
+ List<Integer> startScanned = RECOVERY_SCAN_STARTED.get();
+ if (startScanned == null) {
+ startScanned = new ArrayList<Integer>();
+ RECOVERY_SCAN_STARTED.set(startScanned);
+ }
+
+ int tocheck = (flag & XAResource.TMSTARTRSCAN);
+ if (tocheck == XAResource.TMSTARTRSCAN) {
+ System.out.println(" ProxyXAResource (" + serverId + ":" + serverIdToProxyTo + ") XA_RECOVER [XAResource.TMSTARTRSCAN]: " + serverIdToProxyTo);
+
+ if (!startScanned.contains(serverIdToProxyTo)) {
+ startScanned.add(serverIdToProxyTo);
+
+ // Make sure that the remote server has recovered all
+ // transactions
+ getServerToProxyTo().propagateRecover(startScanned, flag);
+ startScanned.remove((Integer) serverIdToProxyTo);
+ }
+
+ System.out.println(" ProxyXAResource (" + serverId + ":" + serverIdToProxyTo + ") XA_RECOVERD[XAResource.TMSTARTRSCAN]: " + serverIdToProxyTo);
+ }
+ tocheck = (flag & XAResource.TMENDRSCAN);
+ if (tocheck == XAResource.TMENDRSCAN) {
+ System.out.println(" ProxyXAResource (" + serverId + ":" + serverIdToProxyTo + ") XA_RECOVER [XAResource.TMENDRSCAN]: " + serverIdToProxyTo);
+
+ if (!startScanned.contains(serverIdToProxyTo)) {
+ getServerToProxyTo().propagateRecover(startScanned, flag);
+ }
+
+ System.out.println(" ProxyXAResource (" + serverId + ":" + serverIdToProxyTo + ") XA_RECOVERD[XAResource.TMENDRSCAN]: " + serverIdToProxyTo);
+ }
+
+ return new Xid[] { xid };
+ }
+
+ @Override
+ public void forget(Xid xid) throws XAException {
+ System.out.println(" ProxyXAResource (" + serverId + ":" + serverIdToProxyTo + ") XA_FORGET [" + xid + "]");
+ getServerToProxyTo().propagateForget(xid);
+ System.out.println(" ProxyXAResource (" + serverId + ":" + serverIdToProxyTo + ") XA_FORGETED[" + xid + "]");
+ }
+
+ @Override
+ public int getTransactionTimeout() throws XAException {
+ return transactionTimeout;
+ }
+
+ @Override
+ public boolean setTransactionTimeout(int seconds) throws XAException {
+ this.transactionTimeout = seconds;
+ return true;
+ }
+
+ @Override
+ public boolean isSameRM(XAResource xares) throws XAException {
+ return xares.equals(this);
+ }
+
+ /**
+ * I don't think this is used by TM.
+ */
+ @Override
+ public XAResource getResource() {
+ return null;
+ }
+
+ /**
+ * I don't think this is used by TM.
+ */
+ @Override
+ public String getProductName() {
+ return null;
+ }
+
+ /**
+ * I don't think this is used by TM.
+ */
+ @Override
+ public String getProductVersion() {
+ return null;
+ }
+
+ @Override
+ public String getJndiName() {
+ return "ProxyXAResource";
+ }
+
+ private Server getServerToProxyTo() {
+ int index = (serverIdToProxyTo / 1000) - 1;
+ return SimpleIsolatedServers.getServers()[index];
+ }
+}
Added: labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/tests/classes/com/arjuna/ats/jta/distributed/impl/ProxyXAResourceRecovery.java
===================================================================
--- labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/tests/classes/com/arjuna/ats/jta/distributed/impl/ProxyXAResourceRecovery.java (rev 0)
+++ labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/tests/classes/com/arjuna/ats/jta/distributed/impl/ProxyXAResourceRecovery.java 2011-10-14 23:48:41 UTC (rev 37545)
@@ -0,0 +1,33 @@
+package com.arjuna.ats.jta.distributed.impl;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import javax.transaction.xa.XAResource;
+
+import org.jboss.tm.XAResourceRecovery;
+
+
+public class ProxyXAResourceRecovery implements XAResourceRecovery {
+
+ private List<ProxyXAResource> resources = new ArrayList<ProxyXAResource>();
+
+ public ProxyXAResourceRecovery(int id) throws IOException {
+ File file = new File(System.getProperty("user.dir") + "/tmp/ProxyXAResource/" + id + "/");
+ if (file.exists() && file.isDirectory()) {
+ File[] listFiles = file.listFiles();
+ for (int i = 0; i < listFiles.length; i++) {
+ File currentFile = listFiles[i];
+ resources.add(new ProxyXAResource(id, currentFile));
+ }
+ }
+ }
+
+ @Override
+ public XAResource[] getXAResources() {
+ return resources.toArray(new XAResource[] {});
+ }
+
+}
Added: labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/tests/classes/com/arjuna/ats/jta/distributed/impl/ServerImpl.java
===================================================================
--- labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/tests/classes/com/arjuna/ats/jta/distributed/impl/ServerImpl.java (rev 0)
+++ labs/jbosstm/branches/JBOSSTS_4_15_0_Final/atsintegration/tests/classes/com/arjuna/ats/jta/distributed/impl/ServerImpl.java 2011-10-14 23:48:41 UTC (rev 37545)
@@ -0,0 +1,220 @@
+package com.arjuna.ats.jta.distributed.impl;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import javax.transaction.HeuristicCommitException;
+import javax.transaction.HeuristicMixedException;
+import javax.transaction.HeuristicRollbackException;
+import javax.transaction.InvalidTransactionException;
+import javax.transaction.RollbackException;
+import javax.transaction.Synchronization;
+import javax.transaction.SystemException;
+import javax.transaction.TransactionManager;
+import javax.transaction.xa.XAException;
+import javax.transaction.xa.XAResource;
+import javax.transaction.xa.Xid;
+
+import org.jboss.tm.TransactionTimeoutConfiguration;
+
+import com.arjuna.ats.arjuna.common.CoordinatorEnvironmentBean;
+import com.arjuna.ats.arjuna.common.CoreEnvironmentBean;
+import com.arjuna.ats.arjuna.common.CoreEnvironmentBeanException;
+import com.arjuna.ats.arjuna.common.ObjectStoreEnvironmentBean;
+import com.arjuna.ats.arjuna.common.RecoveryEnvironmentBean;
+import com.arjuna.ats.arjuna.coordinator.TransactionReaper;
+import com.arjuna.ats.arjuna.coordinator.TxControl;
+import com.arjuna.ats.arjuna.recovery.RecoveryManager;
+import com.arjuna.ats.arjuna.tools.osb.mbean.ObjStoreBrowser;
+import com.arjuna.ats.internal.jbossatx.jta.XAResourceRecordWrappingPluginImpl;
+import com.arjuna.ats.internal.jta.transaction.arjunacore.TransactionImple;
+import com.arjuna.ats.internal.jta.transaction.arjunacore.jca.SubordinateTransaction;
+import com.arjuna.ats.internal.jta.transaction.arjunacore.jca.SubordinationManager;
+import com.arjuna.ats.jbossatx.jta.RecoveryManagerService;
+import com.arjuna.ats.jbossatx.jta.TransactionManagerService;
+import com.arjuna.ats.jta.common.JTAEnvironmentBean;
+import com.arjuna.ats.jta.distributed.Server;
+import com.arjuna.ats.jta.distributed.TestResourceRecovery;
+
+public class ServerImpl implements Server {
+
+ private int id;
+ private RecoveryManagerService recoveryManagerService;
+ private TransactionManagerService transactionManagerService;
+
+ public void initialise(int id) throws CoreEnvironmentBeanException, IOException {
+ this.id = id;
+
+ RecoveryEnvironmentBean recoveryEnvironmentBean = com.arjuna.ats.arjuna.common.recoveryPropertyManager.getRecoveryEnvironmentBean();
+ recoveryEnvironmentBean.setRecoveryBackoffPeriod(1);
+
+ recoveryEnvironmentBean.setRecoveryInetAddress(InetAddress.getByName("localhost"));
+ recoveryEnvironmentBean.setRecoveryPort(4712 + id);
+ recoveryEnvironmentBean.setTransactionStatusManagerInetAddress(InetAddress.getByName("localhost"));
+ recoveryEnvironmentBean.setTransactionStatusManagerPort(4713 + id);
+ List<String> recoveryModuleClassNames = new ArrayList<String>();
+
+ recoveryModuleClassNames.add("com.arjuna.ats.internal.arjuna.recovery.AtomicActionRecoveryModule");
+ recoveryModuleClassNames.add("com.arjuna.ats.internal.txoj.recovery.TORecoveryModule");
+ recoveryModuleClassNames.add("com.arjuna.ats.internal.jta.recovery.arjunacore.XARecoveryModule");
+ recoveryEnvironmentBean.setRecoveryModuleClassNames(recoveryModuleClassNames);
+ List<String> expiryScannerClassNames = new ArrayList<String>();
+ expiryScannerClassNames.add("com.arjuna.ats.internal.arjuna.recovery.ExpiredTransactionStatusManagerScanner");
+ recoveryEnvironmentBean.setExpiryScannerClassNames(expiryScannerClassNames);
+ recoveryEnvironmentBean.setRecoveryActivators(null);
+
+ CoreEnvironmentBean coreEnvironmentBean = com.arjuna.ats.arjuna.common.arjPropertyManager.getCoreEnvironmentBean();
+ coreEnvironmentBean.setSocketProcessIdPort(4714 + id);
+ coreEnvironmentBean.setNodeIdentifier(id);
+ coreEnvironmentBean.setSocketProcessIdMaxPorts(1);
+
+ CoordinatorEnvironmentBean coordinatorEnvironmentBean = com.arjuna.ats.arjuna.common.arjPropertyManager.getCoordinatorEnvironmentBean();
+ coordinatorEnvironmentBean.setEnableStatistics(false);
+ coordinatorEnvironmentBean.setDefaultTimeout(300);
+ coordinatorEnvironmentBean.setTransactionStatusManagerEnable(false);
+
+ ObjectStoreEnvironmentBean actionStoreObjectStoreEnvironmentBean = com.arjuna.common.internal.util.propertyservice.BeanPopulator.getNamedInstance(
+ com.arjuna.ats.arjuna.common.ObjectStoreEnvironmentBean.class, "default");
+ actionStoreObjectStoreEnvironmentBean.setObjectStoreDir(System.getProperty("user.dir") + "/tmp/tx-object-store/" + id);
+
+ ObjectStoreEnvironmentBean stateStoreObjectStoreEnvironmentBean = com.arjuna.common.internal.util.propertyservice.BeanPopulator.getNamedInstance(
+ com.arjuna.ats.arjuna.common.ObjectStoreEnvironmentBean.class, "stateStore");
+ stateStoreObjectStoreEnvironmentBean.setObjectStoreDir(System.getProperty("user.dir") + "/tmp/tx-object-store/" + id);
+
+ ObjectStoreEnvironmentBean communicationStoreObjectStoreEnvironmentBean = com.arjuna.common.internal.util.propertyservice.BeanPopulator
+ .getNamedInstance(com.arjuna.ats.arjuna.common.ObjectStoreEnvironmentBean.class, "communicationStore");
+ communicationStoreObjectStoreEnvironmentBean.setObjectStoreDir(System.getProperty("user.dir") + "/tmp/tx-object-store/" + id);
+
+ ObjStoreBrowser objStoreBrowser = new ObjStoreBrowser();
+ Map<String, String> types = new HashMap<String, String>();
+ types.put("StateManager/BasicAction/TwoPhaseCoordinator/AtomicAction", "com.arjuna.ats.internal.jta.tools.osb.mbean.jta.JTAActionBean");
+ objStoreBrowser.setTypes(types);
+
+ JTAEnvironmentBean jTAEnvironmentBean = com.arjuna.ats.jta.common.jtaPropertyManager.getJTAEnvironmentBean();
+ jTAEnvironmentBean.setLastResourceOptimisationInterface(org.jboss.tm.LastResource.class);
+ jTAEnvironmentBean.setTransactionManagerClassName("com.arjuna.ats.jbossatx.jta.TransactionManagerDelegate");
+ jTAEnvironmentBean.setUserTransactionClassName("com.arjuna.ats.internal.jta.transaction.arjunacore.UserTransactionImple");
+ jTAEnvironmentBean
+ .setTransactionSynchronizationRegistryClassName("com.arjuna.ats.internal.jta.transaction.arjunacore.TransactionSynchronizationRegistryImple");
+ List<Integer> xaRecoveryNodes = new ArrayList<Integer>();
+ xaRecoveryNodes.add(id);
+ jTAEnvironmentBean.setXaRecoveryNodes(xaRecoveryNodes);
+
+ List<String> xaResourceOrphanFilterClassNames = new ArrayList<String>();
+
+ xaResourceOrphanFilterClassNames.add("com.arjuna.ats.internal.jta.recovery.arjunacore.JTATransactionLogXAResourceOrphanFilter");
+ xaResourceOrphanFilterClassNames.add("com.arjuna.ats.internal.jta.recovery.arjunacore.JTANodeNameXAResourceOrphanFilter");
+ // xaResourceOrphanFilterClassNames.add("com.arjuna.ats.internal.jta.recovery.arjunacore.ParentNodeNameXAResourceOrphanFilter");
+ jTAEnvironmentBean.setXaResourceOrphanFilterClassNames(xaResourceOrphanFilterClassNames);
+ jTAEnvironmentBean.setXAResourceRecordWrappingPlugin(new XAResourceRecordWrappingPluginImpl());
+
+ recoveryManagerService = new RecoveryManagerService();
+// recoveryManagerService.create();
+ RecoveryManager.delayRecoveryManagerThread();
+ RecoveryManager.manager();
+ recoveryManagerService.addXAResourceRecovery(new ProxyXAResourceRecovery(id));
+ recoveryManagerService.addXAResourceRecovery(new TestResourceRecovery(id));
+ // recoveryManagerService.start();
+ RecoveryManager.manager().initialize();
+
+ transactionManagerService = new TransactionManagerService();
+ TxControl txControl = new com.arjuna.ats.arjuna.coordinator.TxControl();
+ txControl.setDefaultTimeout(0);
+ transactionManagerService.setJbossXATerminator(new com.arjuna.ats.internal.jbossatx.jta.jca.XATerminator());
+ transactionManagerService
+ .setTransactionSynchronizationRegistry(new com.arjuna.ats.internal.jta.transaction.arjunacore.TransactionSynchronizationRegistryImple());
+ // starts the transaction reaper transactionManagerService.create();
+
+ }
+
+ public void doRecoveryManagerScan() {
+ RecoveryManager.manager().scan();
+ }
+
+ public void startTransactionReaper() {
+ TransactionReaper.transactionReaper();
+ }
+
+ public TransactionManager getTransactionManager() {
+ return transactionManagerService.getTransactionManager();
+ }
+
+ @Override
+ public boolean importTransaction(int remainingTimeout, Xid toResume) throws XAException, InvalidTransactionException, IllegalStateException,
+ SystemException {
+ boolean existed = true;
+ SubordinateTransaction importTransaction = SubordinationManager.getTransactionImporter().getImportedTransaction(toResume);
+ if (importTransaction == null) {
+ importTransaction = SubordinationManager.getTransactionImporter().importTransaction(toResume, remainingTimeout);
+ existed = false;
+ }
+ getTransactionManager().resume(importTransaction);
+ return existed;
+ }
+
+ @Override
+ public int propagatePrepare(Xid xid) throws XAException {
+ return SubordinationManager.getTransactionImporter().getImportedTransaction(xid).doPrepare();
+ }
+
+ @Override
+ public void propagateCommit(Xid xid, boolean onePhase) throws IllegalStateException, HeuristicMixedException, HeuristicRollbackException,
+ HeuristicCommitException, SystemException, XAException {
+ SubordinationManager.getTransactionImporter().getImportedTransaction(xid).doCommit();
+ }
+
+ @Override
+ public void propagateRollback(Xid xid) throws IllegalStateException, HeuristicMixedException, HeuristicCommitException, HeuristicRollbackException,
+ SystemException, XAException {
+ SubordinationManager.getTransactionImporter().getImportedTransaction(xid).doRollback();
+ }
+
+ @Override
+ public Xid[] propagateRecover(List<Integer> recoveryScanStarted, int flag) throws XAException {
+ // Assumes that this thread is used by the recovery thread
+ ProxyXAResource.RECOVERY_SCAN_STARTED.set(recoveryScanStarted);
+ return SubordinationManager.getXATerminator().recover(flag);
+ }
+
+ @Override
+ public void propagateForget(Xid xid) throws XAException {
+ SubordinationManager.getXATerminator().forget(xid);
+
+ }
+
+ @Override
+ public int getNodeName() {
+ return TxControl.getXANodeName();
+ }
+
+ @Override
+ public long getTimeLeftBeforeTransactionTimeout() throws RollbackException {
+ return ((TransactionTimeoutConfiguration) transactionManagerService.getTransactionManager()).getTimeLeftBeforeTransactionTimeout(false);
+ }
+
+ @Override
+ public void propagateBeforeCompletion(Xid xid) throws XAException, SystemException {
+ SubordinateTransaction tx = SubordinationManager.getTransactionImporter().getImportedTransaction(xid);
+ tx.doBeforeCompletion();
+ }
+
+ @Override
+ public Xid getCurrentXid() throws SystemException {
+ TransactionImple transaction = ((TransactionImple) transactionManagerService.getTransactionManager().getTransaction());
+ return transaction.getTxId();
+ }
+
+ @Override
+ public XAResource generateProxyXAResource(int currentNodeName, int nextNodeName) {
+ return new ProxyXAResource(currentNodeName, nextNodeName);
+ }
+
+ @Override
+ public Synchronization generateProxySynchronization(int serverId, int serverIdToProxyTo, Xid toRegisterAgainst) {
+ return new ProxySynchronization(serverId, serverIdToProxyTo, toRegisterAgainst);
+ }
+}
More information about the jboss-svn-commits
mailing list