[jboss-svn-commits] JBL Code SVN: r28459 - in labs/jbosstm/workspace/whitingjr/trunk/MVCCSampleSTM/src: main/java/uk/ac/ncl/sdia/a8905943/persistence/xa and 1 other directories.

jboss-svn-commits at lists.jboss.org jboss-svn-commits at lists.jboss.org
Sun Jul 26 15:19:01 EDT 2009


Author: whitingjr
Date: 2009-07-26 15:19:00 -0400 (Sun, 26 Jul 2009)
New Revision: 28459

Added:
   labs/jbosstm/workspace/whitingjr/trunk/MVCCSampleSTM/src/test/java/uk/ac/ncl/sdia/a8905943/persistence/xa/JUnitTestSTMXAConnectionImpl.java
   labs/jbosstm/workspace/whitingjr/trunk/MVCCSampleSTM/src/test/java/uk/ac/ncl/sdia/a8905943/persistence/xa/JUnitTestSTMXAResource.java
Modified:
   labs/jbosstm/workspace/whitingjr/trunk/MVCCSampleSTM/src/main/java/uk/ac/ncl/sdia/a8905943/persistence/jdbc/STMConnection.java
   labs/jbosstm/workspace/whitingjr/trunk/MVCCSampleSTM/src/main/java/uk/ac/ncl/sdia/a8905943/persistence/jdbc/STMDataSource.java
   labs/jbosstm/workspace/whitingjr/trunk/MVCCSampleSTM/src/main/java/uk/ac/ncl/sdia/a8905943/persistence/xa/STMXAConnectionImpl.java
   labs/jbosstm/workspace/whitingjr/trunk/MVCCSampleSTM/src/main/java/uk/ac/ncl/sdia/a8905943/persistence/xa/STMXADatasourceImpl.java
   labs/jbosstm/workspace/whitingjr/trunk/MVCCSampleSTM/src/main/java/uk/ac/ncl/sdia/a8905943/persistence/xa/STMXAResource.java
Log:
Added implementation and test cases.

Modified: labs/jbosstm/workspace/whitingjr/trunk/MVCCSampleSTM/src/main/java/uk/ac/ncl/sdia/a8905943/persistence/jdbc/STMConnection.java
===================================================================
--- labs/jbosstm/workspace/whitingjr/trunk/MVCCSampleSTM/src/main/java/uk/ac/ncl/sdia/a8905943/persistence/jdbc/STMConnection.java	2009-07-26 19:18:24 UTC (rev 28458)
+++ labs/jbosstm/workspace/whitingjr/trunk/MVCCSampleSTM/src/main/java/uk/ac/ncl/sdia/a8905943/persistence/jdbc/STMConnection.java	2009-07-26 19:19:00 UTC (rev 28459)
@@ -24,17 +24,23 @@
 import java.util.Map;
 import java.util.Properties;
 
+import org.apache.log4j.Logger;
+
+import uk.ac.ncl.sdia.a8905943.persistence.xa.STMXAConnectionImpl;
+
 /**
  * This object represents a logical connection with database. In this system the 
- * database is STM memory. 
+ * database is STM memory.
  * 
  * @author <a href="whitingjr at hotmail.com">Jeremy Whiting</a>
  * @version $Revision: 1.1 $
  */
 public class STMConnection implements Connection
 {
+   private static final Logger logger = Logger.getLogger(STMConnection.class);
    private final boolean autoCommit = false;
    private boolean isActive = false;
+   private final STMXAConnectionImpl xaConnection ;
    
    @Override
    public void clearWarnings() throws SQLException
@@ -377,5 +383,12 @@
       return null;
    }
    
-
+   public STMConnection(STMXAConnectionImpl xaConn)
+   {
+      this.xaConnection = xaConn;
+      if (logger.isDebugEnabled())
+      {
+         logger.debug("Created an instance of STMConnection associtated with parent STMXAConnectionImpl.");
+      }
+   }
 }

Modified: labs/jbosstm/workspace/whitingjr/trunk/MVCCSampleSTM/src/main/java/uk/ac/ncl/sdia/a8905943/persistence/jdbc/STMDataSource.java
===================================================================
--- labs/jbosstm/workspace/whitingjr/trunk/MVCCSampleSTM/src/main/java/uk/ac/ncl/sdia/a8905943/persistence/jdbc/STMDataSource.java	2009-07-26 19:18:24 UTC (rev 28458)
+++ labs/jbosstm/workspace/whitingjr/trunk/MVCCSampleSTM/src/main/java/uk/ac/ncl/sdia/a8905943/persistence/jdbc/STMDataSource.java	2009-07-26 19:19:00 UTC (rev 28459)
@@ -28,7 +28,7 @@
    public Connection getConnection(String username, String password) throws SQLException
    {
       logger.warn("Call to getConnection() for Connection that is non-XA compliant.");
-      return new STMConnection();
+      throw new UnsupportedOperationException("Unsupported operation.");
    }
 
    @Override

Modified: labs/jbosstm/workspace/whitingjr/trunk/MVCCSampleSTM/src/main/java/uk/ac/ncl/sdia/a8905943/persistence/xa/STMXAConnectionImpl.java
===================================================================
--- labs/jbosstm/workspace/whitingjr/trunk/MVCCSampleSTM/src/main/java/uk/ac/ncl/sdia/a8905943/persistence/xa/STMXAConnectionImpl.java	2009-07-26 19:18:24 UTC (rev 28458)
+++ labs/jbosstm/workspace/whitingjr/trunk/MVCCSampleSTM/src/main/java/uk/ac/ncl/sdia/a8905943/persistence/xa/STMXAConnectionImpl.java	2009-07-26 19:19:00 UTC (rev 28459)
@@ -10,6 +10,7 @@
 import java.sql.SQLException;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.concurrent.atomic.AtomicReference;
 
 import javax.sql.ConnectionEventListener;
 import javax.sql.StatementEventListener;
@@ -22,9 +23,10 @@
 import uk.ac.ncl.sdia.a8905943.stm.STM;
 
 /**
- * This object is responsible for interfacing with the transaction manager
- * to provide a XAResource reference to enable a transaction (using this connection)
+ * This object is responsible for interfacing with the server pool
+ * to provide a XAResource reference to enable a transaction (using this physical connection)
  * to operate in a distributed transaction.
+ * This is the physical connection.
  * When used in an application server a number of connection objects are obtained
  * and pooled by server. The server then allocates a connection to an EntityManager
  * for unit of work performed in a transaction.
@@ -34,15 +36,15 @@
  * @author <a href="whitingjr at hotmail.com">Jeremy Whiting</a>
  * @version $Revision: 1.1 $
  */
-public class STMXAConnectionImpl extends STMConnection implements XAConnection 
+public class STMXAConnectionImpl implements XAConnection 
 {
    //TODO:jrw decide where exactly this STM reference is going to be held.
    private final List<ConnectionEventListener> listeners = new ArrayList<ConnectionEventListener>();
    private final STM stm;
-   private STMXAResource xaResource ;
-   private volatile boolean isActive = false;
+   private final STMXAResource xaResource ;
    private STMXADatasourceImpl dataSource ;
    private static final Logger logger = Logger.getLogger(STMXAConnectionImpl.class);
+   private AtomicReference<STMConnection> logicalConnection = new AtomicReference<STMConnection>();
 
    /**
     * This method returns a XAResource for this particular connection. This method is called
@@ -64,15 +66,22 @@
    @Override
    public void close() throws SQLException
    {
-      this.isActive = false;
-
+      STMConnection conn = this.logicalConnection.get();
+      conn.close();
+      this.logicalConnection.compareAndSet(conn, null);
    }
 
    @Override
    public Connection getConnection() throws SQLException
    {
-      this.isActive = true;
-      return new STMConnection();
+      STMConnection returnValue = null;
+      if (null == this.logicalConnection.get())
+      {
+         this.logicalConnection.compareAndSet(null, new STMConnection(this));
+      }
+      returnValue = this.logicalConnection.get();
+      
+      return returnValue;
    }
 
    @Override
@@ -111,7 +120,7 @@
 
    public boolean isActive()
    {
-      return isActive;
+      return (null != this.logicalConnection.get());
    }
    
    public STM getSTM()

Modified: labs/jbosstm/workspace/whitingjr/trunk/MVCCSampleSTM/src/main/java/uk/ac/ncl/sdia/a8905943/persistence/xa/STMXADatasourceImpl.java
===================================================================
--- labs/jbosstm/workspace/whitingjr/trunk/MVCCSampleSTM/src/main/java/uk/ac/ncl/sdia/a8905943/persistence/xa/STMXADatasourceImpl.java	2009-07-26 19:18:24 UTC (rev 28458)
+++ labs/jbosstm/workspace/whitingjr/trunk/MVCCSampleSTM/src/main/java/uk/ac/ncl/sdia/a8905943/persistence/xa/STMXADatasourceImpl.java	2009-07-26 19:19:00 UTC (rev 28459)
@@ -8,7 +8,6 @@
 
 import java.io.PrintWriter;
 import java.io.Serializable;
-import java.sql.Connection;
 import java.sql.SQLException;
 import java.util.concurrent.ConcurrentHashMap;
 
@@ -24,7 +23,6 @@
 import org.apache.commons.lang.builder.HashCodeBuilder;
 import org.apache.log4j.Logger;
 
-import uk.ac.ncl.sdia.a8905943.persistence.jdbc.STMConnection;
 import uk.ac.ncl.sdia.a8905943.persistence.jdbc.STMDataSource;
 import uk.ac.ncl.sdia.a8905943.stm.STM;
 
@@ -55,7 +53,7 @@
    {
       if (logger.isDebugEnabled())
       {
-         logger.info("The XA Datasource is providing a XAConnection ");
+         logger.debug("The XA Datasource is providing a XAConnection ");
       }
       STMXAConnectionImpl returnValue = null;
       if (stmDatabases.contains(getDatabaseName()))

Modified: labs/jbosstm/workspace/whitingjr/trunk/MVCCSampleSTM/src/main/java/uk/ac/ncl/sdia/a8905943/persistence/xa/STMXAResource.java
===================================================================
--- labs/jbosstm/workspace/whitingjr/trunk/MVCCSampleSTM/src/main/java/uk/ac/ncl/sdia/a8905943/persistence/xa/STMXAResource.java	2009-07-26 19:18:24 UTC (rev 28458)
+++ labs/jbosstm/workspace/whitingjr/trunk/MVCCSampleSTM/src/main/java/uk/ac/ncl/sdia/a8905943/persistence/xa/STMXAResource.java	2009-07-26 19:19:00 UTC (rev 28459)
@@ -6,18 +6,23 @@
  */
 package uk.ac.ncl.sdia.a8905943.persistence.xa;
 
-import java.util.concurrent.ConcurrentHashMap;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicReference;
 
 import javax.transaction.xa.XAException;
 import javax.transaction.xa.XAResource;
 import javax.transaction.xa.Xid;
 
+import org.apache.commons.lang.builder.EqualsBuilder;
+import org.apache.commons.lang.builder.HashCodeBuilder;
 import org.apache.log4j.Logger;
 
 /**
  * This object delegates work and then maps application specific exceptions
  * to XA Spec specific exceptions.
  * This object also checks requested operations are valid.
+ * Migration of Xids between threads is not supported.
  * 
  * @author <a href="whitingjr at hotmail.com">Jeremy Whiting</a>
  * @version $Revision: 1.1 $
@@ -25,20 +30,49 @@
 public class STMXAResource implements XAResource
 {
    // access to the shared resource manager is synchronized
-   private static final ConcurrentHashMap<String, XAResourceManager> resourceManagers = new ConcurrentHashMap<String, XAResourceManager>();
    private final STMXAConnectionImpl xaConnection;
    private static final Logger logger = Logger.getLogger(STMXAResource.class);
+   /* Branch mappings are to associate the Xid branch and the thread of control
+    * which this connection is allocated to. Only one active branch is allowed per
+    * connection, others may be associated. Non active associations allowed are
+    * suspended or ended.*/
+   private final Map<Xid, BranchMapping> branches = new HashMap<Xid, BranchMapping> ();
+   private AtomicReference<BranchMapping> currentBranch = new AtomicReference<BranchMapping>();
+   //TODO: jrw decide what is a valid measure of unique identity in the system, the value below is clumsy
+   private final long rmid = System.currentTimeMillis();
    
    @Override
-   public void commit(Xid arg0, boolean arg1) throws XAException
+   public void commit(Xid xid, boolean xFlag) throws XAException
    {
-      //TODO: implement method
+      if (!this.xaConnection.isActive())
+      {
+         throw new XAException("Cannot commit using an inactive connection.");
+      }
+      /*
+      switch (xFlag)
+      {
+         case XAResource. :
+            
+            break;
+
+         default :
+            break;
+      }
+      */
    }
 
    @Override
-   public void end(Xid arg0, int arg1) throws XAException
+   public void end(Xid xid, int xFlag) throws XAException
    {
-      // FIXME end
+      if (!this.xaConnection.isActive())
+      {
+         throw new XAException(XAException.XA_RBPROTO);
+      }
+      if (!this.currentBranch.get().getXidentity().equals(xid))
+      {
+         throw new XAException("Cannot end a branch of work. The resource manager does not recognize this branch.");
+      }
+      //TODO:jrw pass through to the STM system 
       
    }
 
@@ -103,7 +137,66 @@
       {
          if (this.xaConnection.isActive())
          {
-            this.resourceManagers.get(this.xaConnection.getDataSource().getDatabaseName()).start(this.xaConnection, this, xId, xFlag);
+            if (logger.isTraceEnabled())
+            {
+               logger.trace("ResourceManager.start called with xid["+xId+"] and flag["+xFlag+"].");
+            }
+            BranchMapping mapping = this.branches.get(xId);
+            switch (xFlag)
+            {
+               case XAResource.TMJOIN:
+               case XAResource.TMRESUME:
+                  if (null == mapping)
+                  {
+                     logger.error("An attempt to resume a branch that is not associated with the XAResourceManager.");
+                     throw new XAException(XAException.XA_RBPROTO);
+                  }
+                  else if (ResourceStatus.SUSPENDED ==  mapping.getStatus())
+                  {
+                     logger.debug("Resuming transaction branch.");
+                     mapping.changeStatus(ResourceStatus.ACTIVE);  
+                  }
+                  else if (ResourceStatus.ACTIVE == mapping.getStatus())
+                  {
+                     logger.debug("Joining a transaction branch.");
+                     //TODO:jrw complete implementation
+                  }
+                  else
+                  {
+                     logger.error("Attemt to resume a branch that was not suspended.");
+                     throw new XAException(XAException.XA_RBPROTO);
+                  }
+                  break;
+               case XAResource.TMNOFLAGS:
+                  /* this is a request to associate a transaction branch with resource manager.
+                   *  check for existing association and create one if not found  */
+                  if (null == currentBranch.get())
+                  {// no branch is currently running
+                     if (null == mapping)
+                     {// create the association
+                        synchronized(this)
+                        {
+                           mapping  = new BranchMapping(xId, this.xaConnection, ResourceStatus.ACTIVE);
+                           this.currentBranch.set(mapping);
+                           this.branches.put(xId, mapping);
+                        }
+                     }
+                     else
+                     {// existing branch association, should this be resumed not started?
+                        throw new XAException(XAException.XA_RBPROTO);
+                     }
+                  }
+                  else
+                  {/* there is an active branch running already, trying to start another
+                  branch is a protocol error. The current branch should have been closed or suspended.
+                  */ 
+                     throw new XAException(XAException.XA_RBPROTO);
+                  }
+                  break;
+               default :
+                  logger.error("Unexpected xFlag received for XAResource.start");
+                  break;
+            }
          }
          else
          {
@@ -119,21 +212,38 @@
          logger.info("Instance of STMXAResource(STMXAConnectionImpl) created.");
       }
       this.xaConnection = connection;
-      resourceManagers.putIfAbsent(connection.getDataSource().getDatabaseName(), new XAResourceManager()) ;
    }
-   
+
    @Override
    public boolean equals(Object obj)
    {
-      // TODO: jrw complete implementation
-      return super.equals(obj);
+      boolean returnValue = false;
+      if (null != obj)
+      {
+         if (this == obj)
+         {
+            returnValue = true;
+         }
+         else
+         {
+            if (this.getClass().equals(obj.getClass()))
+            {
+               STMXAResource target = (STMXAResource)obj;
+               returnValue = new EqualsBuilder().append(this.hashCode() , target.hashCode() ).isEquals();
+            }
+         }
+      }
+      return returnValue;
    }
    
    @Override
    public int hashCode()
    {
-   // TODO: jrw complete implementation
-      return super.hashCode();
+      return new HashCodeBuilder(17, 37).append(this.rmid).toHashCode();
    }
-
+   
+   public STMXAConnectionImpl getXAConnection()
+   {
+      return this.xaConnection;
+   }
 }

Added: labs/jbosstm/workspace/whitingjr/trunk/MVCCSampleSTM/src/test/java/uk/ac/ncl/sdia/a8905943/persistence/xa/JUnitTestSTMXAConnectionImpl.java
===================================================================
--- labs/jbosstm/workspace/whitingjr/trunk/MVCCSampleSTM/src/test/java/uk/ac/ncl/sdia/a8905943/persistence/xa/JUnitTestSTMXAConnectionImpl.java	                        (rev 0)
+++ labs/jbosstm/workspace/whitingjr/trunk/MVCCSampleSTM/src/test/java/uk/ac/ncl/sdia/a8905943/persistence/xa/JUnitTestSTMXAConnectionImpl.java	2009-07-26 19:19:00 UTC (rev 28459)
@@ -0,0 +1,12 @@
+/*
+ * JBoss, the OpenSource J2EE webOS
+ * 
+ * Distributable under LGPL license.
+ * See terms of license at gnu.org.
+ */
+package uk.ac.ncl.sdia.a8905943.persistence.xa;
+
+public class JUnitTestSTMXAConnectionImpl
+{
+
+}

Added: labs/jbosstm/workspace/whitingjr/trunk/MVCCSampleSTM/src/test/java/uk/ac/ncl/sdia/a8905943/persistence/xa/JUnitTestSTMXAResource.java
===================================================================
--- labs/jbosstm/workspace/whitingjr/trunk/MVCCSampleSTM/src/test/java/uk/ac/ncl/sdia/a8905943/persistence/xa/JUnitTestSTMXAResource.java	                        (rev 0)
+++ labs/jbosstm/workspace/whitingjr/trunk/MVCCSampleSTM/src/test/java/uk/ac/ncl/sdia/a8905943/persistence/xa/JUnitTestSTMXAResource.java	2009-07-26 19:19:00 UTC (rev 28459)
@@ -0,0 +1,98 @@
+/*
+ * JBoss, the OpenSource J2EE webOS
+ * 
+ * Distributable under LGPL license.
+ * See terms of license at gnu.org.
+ */
+package uk.ac.ncl.sdia.a8905943.persistence.xa;
+
+import java.sql.SQLException;
+
+import javax.transaction.xa.XAException;
+import javax.transaction.xa.XAResource;
+import javax.transaction.xa.Xid;
+
+import org.apache.log4j.Logger;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.arjuna.ats.arjuna.xa.XID;
+import com.arjuna.ats.jta.xa.XidImple;
+
+public class JUnitTestSTMXAResource
+{
+   private static final Logger logger = Logger.getLogger(JUnitTestSTMXAResource.class);
+   private STMXAResource resourceManager ;
+   @Before
+   public void setUp()
+   {
+      try
+      {
+         STMXADatasourceImpl dataSource = new STMXADatasourceImpl();
+         dataSource.setDatabaseName("media");
+         STMXAConnectionImpl connection = (STMXAConnectionImpl) dataSource.getXAConnection();
+         this.resourceManager = (STMXAResource)connection.getXAResource();
+      }
+      catch (SQLException sqle)
+      {
+         Assert.fail();
+      }
+   }
+   @Test
+   /**
+    * This test checks the resource manager allows a new branch to be started
+    * for the associated connection.
+    */
+   public void testCheckStartWorks()
+   {
+      // starting off with a clean resource manager
+      Assert.assertNotNull(resourceManager);
+      Xid xid = new XidImple();
+      try
+      {
+         resourceManager.getXAConnection().getConnection();
+         resourceManager.start(xid, XAResource.TMNOFLAGS);
+      }
+      catch (XAException xae)
+      {// not expecting an exception here
+         Assert.fail("Not expecting an exception when XAResource.start called. message["+xae.getMessage()+"] code ["+xae.errorCode+"].");
+      }
+      catch (SQLException sqle)
+      {
+         Assert.fail("Failed to create a Connection object to get the resource manager to accept work.");
+      }
+   }
+   
+   
+   /**
+    * Test to check an exception is thrown when an active branch is
+    * running and a second branch attempts to start.
+    */
+   @Test (expected=XAException.class)
+   public void testCheckExceptionThrownForActiveBranch()
+      throws XAException
+   {// starting off with a clean resource manager
+      Assert.assertNotNull(resourceManager);
+      Xid xid = new XidImple();
+      resourceManager.start(xid, XAResource.TMNOFLAGS);
+      XID xidaj = new XID();// set dummy data
+      xidaj.data = "sample".getBytes();
+      xidaj.formatID = 44;
+      xidaj.gtrid_length = 55;
+      // attempt to start another branch when one is already active
+      try
+      {
+         resourceManager.start(new XidImple(xidaj), XAResource.TMNOFLAGS);
+         Assert.fail("REsource manager failed to throw an exception when starting a branch");
+      }
+      catch (XAException xae)
+      {
+         Assert.assertNotNull(xae);
+         Assert.assertEquals(XAException.XA_RBPROTO, xae.errorCode);//check protocol error is reported
+         throw xae;
+      }
+      
+   }
+   
+}



More information about the jboss-svn-commits mailing list