[jboss-svn-commits] JBL Code SVN: r28662 - labs/jbosstm/workspace/whitingjr/trunk/MVCCSampleSTM/src/main/java/uk/ac/ncl/sdia/a8905943/persistence/xa.

jboss-svn-commits at lists.jboss.org jboss-svn-commits at lists.jboss.org
Fri Jul 31 12:41:52 EDT 2009


Author: whitingjr
Date: 2009-07-31 12:41:51 -0400 (Fri, 31 Jul 2009)
New Revision: 28662

Modified:
   labs/jbosstm/workspace/whitingjr/trunk/MVCCSampleSTM/src/main/java/uk/ac/ncl/sdia/a8905943/persistence/xa/STMXAResource.java
Log:
Added implementation to commit. Fixed OnePhase HeuristicExceptions reported by BasicAction.


Modified: labs/jbosstm/workspace/whitingjr/trunk/MVCCSampleSTM/src/main/java/uk/ac/ncl/sdia/a8905943/persistence/xa/STMXAResource.java
===================================================================
--- labs/jbosstm/workspace/whitingjr/trunk/MVCCSampleSTM/src/main/java/uk/ac/ncl/sdia/a8905943/persistence/xa/STMXAResource.java	2009-07-31 16:41:01 UTC (rev 28661)
+++ labs/jbosstm/workspace/whitingjr/trunk/MVCCSampleSTM/src/main/java/uk/ac/ncl/sdia/a8905943/persistence/xa/STMXAResource.java	2009-07-31 16:41:51 UTC (rev 28662)
@@ -6,6 +6,7 @@
  */
 package uk.ac.ncl.sdia.a8905943.persistence.xa;
 
+import java.sql.SQLException;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicReference;
@@ -31,40 +32,68 @@
 {
    // access to the shared resource manager is synchronized
    private final STMXAConnectionImpl xaConnection;
+
    private static final Logger logger = Logger.getLogger(STMXAResource.class);
+
    /* Branch mappings are to associate the Xid branch and the thread of control
     * which this connection is allocated to. Only one active branch is allowed per
     * connection, others may be associated. Non active associations allowed are
     * suspended or ended.*/
-   private final Map<Xid, BranchMapping> branches = new HashMap<Xid, BranchMapping> ();
+   private final Map<Xid, BranchMapping> branches = new HashMap<Xid, BranchMapping>();
+
    private AtomicReference<Xid> currentBranch = new AtomicReference<Xid>();
+
    private final Map<Xid, BranchMapping> prepared = new HashMap<Xid, BranchMapping>();
+
    //TODO: jrw decide what is a valid measure of unique identity in the system, the value below is clumsy
    private final long rmid = System.currentTimeMillis();
-   
+
    @Override
-   public void commit(Xid xid, boolean xFlag) throws XAException
+   public void commit(Xid xid, boolean isOnePhase) throws XAException
    {
+      if (logger.isDebugEnabled())
+      {
+         logger.debug("XAResource.commit called.");
+      }
       if (!this.xaConnection.isActive())
       {
          throw new XAException("Cannot commit using an inactive connection.");
       }
-      if (!isPrepared (xid))
+      
+      if (isOnePhase)
       {
-         throw new XAException(XAException.XAER_NOTA);
+         /* inform the transactional system the changes are to be committed. There should
+         not be any blocking by the STM. */
+         try
+         {
+            this.getXAConnection().getConnection().commit();
+         }
+         catch (SQLException sqle)
+         {
+            //TODO: perform more detailed mapping of the SQLException to XAException.
+            throw new XAException(XAException.XA_RBCOMMFAIL); 
+         }
       }
-      /* inform the transactional system the prepared changes are to be committed. There should
-      not be any blocking by the STM. */
-      getXAConnection().getSTM().commit();
-      //TODO: complete the implementation to handle errors and mapping them to XAException types
+      else
+      {
+         if (!isPrepared(xid))
+         {
+            throw new XAException(XAException.XAER_NOTA);
+         }
+         //TODO: perform two phase commit of prepared changes.
+      }
    }
 
    @Override
    public void end(Xid xid, int xFlag) throws XAException
    {
+      if (logger.isDebugEnabled())
+      {
+         logger.debug("XAResource.end called.");
+      }
       if (logger.isTraceEnabled())
       {
-         logger.trace("XAResource end called for branch ["+xid.toString()+"] and flag ["+xFlag+"]");
+         logger.trace("XAResource end called for branch [" + xid.toString() + "] and flag [" + xFlag + "]");
       }
       if (!this.xaConnection.isActive())
       {
@@ -77,15 +106,15 @@
       switch (xFlag)
       {
          case XAResource.TMSUSPEND :
-            
+
             if (!isActive(xid))
             {
                throw new XAException("Attempted to XAResource.end suspend an inactive branch rejected.");
             }
-            synchronized(this)
+            synchronized (this)
             {
                BranchMapping active = this.branches.get(xid);
-               boolean transitionSuccess = active.changeStatus(ResourceStatus.ACTIVE, ResourceStatus.SUSPENDED); 
+               boolean transitionSuccess = active.changeStatus(ResourceStatus.ACTIVE, ResourceStatus.SUSPENDED);
                if (!transitionSuccess)
                {
                   throw new XAException("Failed to transition an active branch to suspended status. ");
@@ -96,18 +125,20 @@
                   throw new XAException("Failed to nullify currentActive branch id, this invariant has been corrupted.");
                }
             }
-            
+
             break;
 
-         case XAResource.TMSUCCESS:
-            
+         case XAResource.TMSUCCESS :
+
             if (!isActive(xid))
             {
-               throw new XAException("Attempted to XAResource.end successful completion of an inactive branch rejected.");
+               throw new XAException(
+                     "Attempted to XAResource.end successful completion of an inactive branch rejected.");
             }
-            synchronized(this)
+            synchronized (this)
             {
-               boolean transitionSuccess = this.branches.get(xid).changeStatus(ResourceStatus.ACTIVE, ResourceStatus.COMPLETE);
+               boolean transitionSuccess = this.branches.get(xid).changeStatus(ResourceStatus.ACTIVE,
+                     ResourceStatus.COMPLETE);
                if (!transitionSuccess)
                {
                   throw new XAException("Failed to transition an active branch to completed status. ");
@@ -119,52 +150,73 @@
                }
             }
             
+            
             break;
-            
+
+         case XAResource.TMONEPHASE :
+            logger.debug("One phase optimization is being used by TransactionManager.");
+
          default :
             throw new XAException(XAException.XA_RBOTHER);
       }
-      
+
       //TODO:jrw pass through to the STM system 
-      
+
    }
 
    @Override
    public void forget(Xid arg0) throws XAException
    {
-      // FIXME forget
-      
+      if (logger.isDebugEnabled())
+      {
+         logger.debug("XAResource.forget called.");
+      }
    }
 
    @Override
    public int getTransactionTimeout() throws XAException
    {
-      // FIXME getTransactionTimeout
+      if (logger.isDebugEnabled())
+      {
+         logger.debug("XAResource.getTransactionTimeout called.");
+      }
       return 0;
    }
 
    @Override
    public boolean isSameRM(XAResource stranger) throws XAException
    {
+      if (logger.isDebugEnabled())
+      {
+         logger.debug("XAResource.isSameRM called.");
+      }
       return this.equals(stranger);
    }
 
    @Override
    public int prepare(Xid xId) throws XAException
    {
+      if (logger.isDebugEnabled())
+      {
+         logger.debug("XAResource.prepare called.");
+      }
       if (logger.isTraceEnabled())
       {
-         logger.trace("XAResource.prepare called for branch ["+xId+"].");
+         logger.trace("XAResource.prepare called for branch [" + xId + "].");
       }
       isXidRecognised(xId);
-      
-      
+
       return 0;
    }
 
    @Override
    public Xid[] recover(int arg0) throws XAException
    {
+      if (logger.isDebugEnabled())
+      {
+         logger.debug("XAResource.recover called.");
+      }
+
       if (true)
       {
          throw new UnsupportedOperationException("STMXAResource does not support recover(int).");
@@ -175,13 +227,20 @@
    @Override
    public void rollback(Xid arg0) throws XAException
    {
+      if (logger.isDebugEnabled())
+      {
+         logger.debug("XAResource.rollback called.");
+      }
       // pass instruction through to STM system to rollback any prepared resources
    }
 
    @Override
    public boolean setTransactionTimeout(int arg0) throws XAException
    {
-      // FIXME setTransactionTimeout
+      if (logger.isDebugEnabled())
+      {
+         logger.debug("XAResource.setTransactionTimeout called.");
+      }
       return false;
    }
 
@@ -191,28 +250,32 @@
    @Override
    public void start(Xid xId, int xFlag) throws XAException
    {
+      if (logger.isDebugEnabled())
+      {
+         logger.debug("XAResource.start called.");
+      }
       if (null != xId)
       {
          if (this.xaConnection.isActive())
          {
             if (logger.isTraceEnabled())
             {
-               logger.trace("ResourceManager.start called with xid["+xId+"] and flag["+xFlag+"].");
+               logger.trace("XAResource.start called with xid[" + xId + "] and flag[" + xFlag + "].");
             }
             BranchMapping mapping = this.branches.get(xId);
             switch (xFlag)
             {
-               case XAResource.TMJOIN:
-               case XAResource.TMRESUME:
+               case XAResource.TMJOIN :
+               case XAResource.TMRESUME :
                   if (null == mapping)
                   {
                      logger.error("An attempt to resume a branch that is not associated with the XAResourceManager.");
                      throw new XAException(XAException.XA_RBPROTO);
                   }
-                  else if (ResourceStatus.SUSPENDED ==  mapping.getStatus())
+                  else if (ResourceStatus.SUSPENDED == mapping.getStatus())
                   {
                      logger.debug("Resuming transaction branch.");
-                     boolean transitionSuccess = mapping.changeStatus(ResourceStatus.SUSPENDED,  ResourceStatus.ACTIVE); 
+                     boolean transitionSuccess = mapping.changeStatus(ResourceStatus.SUSPENDED, ResourceStatus.ACTIVE);
                      if (!transitionSuccess)
                      {
                         throw new XAException(XAException.XAER_RMERR);
@@ -229,16 +292,16 @@
                      throw new XAException(XAException.XA_RBPROTO);
                   }
                   break;
-               case XAResource.TMNOFLAGS:
+               case XAResource.TMNOFLAGS :
                   /* this is a request to associate a transaction branch with resource manager.
                    *  check for existing association and create one if not found  */
                   if (null == currentBranch.get())
                   {// no branch is currently running
                      if (null == mapping)
                      {// create the association
-                        synchronized(this)
+                        synchronized (this)
                         {
-                           mapping  = new BranchMapping(xId, this.xaConnection, ResourceStatus.ACTIVE);
+                           mapping = new BranchMapping(xId, this.xaConnection, ResourceStatus.ACTIVE);
                            this.currentBranch.set(xId);
                            this.branches.put(xId, mapping);
                         }
@@ -250,8 +313,8 @@
                   }
                   else
                   {/* there is an active branch running already, trying to start another
-                  branch is a protocol error. The current branch should have been closed or suspended.
-                  */ 
+                                                      branch is a protocol error. The current branch should have been closed or suspended.
+                                                      */
                      throw new XAException(XAException.XA_RBPROTO);
                   }
                   break;
@@ -266,7 +329,7 @@
          }
       }
    }
-   
+
    public STMXAResource(STMXAConnectionImpl connection)
    {
       if (logger.isInfoEnabled())
@@ -290,26 +353,25 @@
          {
             if (this.getClass().equals(obj.getClass()))
             {
-               STMXAResource target = (STMXAResource)obj;
-               returnValue = new EqualsBuilder().append(this.hashCode() , target.hashCode() ).isEquals();
+               STMXAResource target = (STMXAResource) obj;
+               returnValue = new EqualsBuilder().append(this.hashCode(), target.hashCode()).isEquals();
             }
          }
       }
       return returnValue;
    }
-   
+
    @Override
    public int hashCode()
    {
       return new HashCodeBuilder(17, 37).append(this.rmid).toHashCode();
    }
-   
+
    public STMXAConnectionImpl getXAConnection()
    {
       return this.xaConnection;
    }
-   
-   //
+
    private boolean isActive(Xid xaIdentity)
    {
       boolean returnValue = false;
@@ -317,16 +379,17 @@
       {
          returnValue = true;
       }
-      return  returnValue;
+      return returnValue;
    }
-   
+
    private boolean isXidRecognised(Xid xid)
    {
       return this.branches.containsKey(xid);
    }
-   
+
    private boolean isPrepared(Xid xid)
    {
       return this.prepared.containsKey(xid);
    }
+
 }



More information about the jboss-svn-commits mailing list