[jboss-cvs] JBossAS SVN: r90937 - branches/Branch_5_x/connector/src/main/org/jboss/resource/adapter/jms/inflow.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Wed Jul 8 10:44:19 EDT 2009


Author: jesper.pedersen
Date: 2009-07-08 10:44:19 -0400 (Wed, 08 Jul 2009)
New Revision: 90937

Modified:
   branches/Branch_5_x/connector/src/main/org/jboss/resource/adapter/jms/inflow/JmsActivationSpec.java
   branches/Branch_5_x/connector/src/main/org/jboss/resource/adapter/jms/inflow/JmsServerSession.java
Log:
[JBAS-7084] Add support for a traditional XA strategy in the JMS resource adapter

Modified: branches/Branch_5_x/connector/src/main/org/jboss/resource/adapter/jms/inflow/JmsActivationSpec.java
===================================================================
--- branches/Branch_5_x/connector/src/main/org/jboss/resource/adapter/jms/inflow/JmsActivationSpec.java	2009-07-08 14:43:10 UTC (rev 90936)
+++ branches/Branch_5_x/connector/src/main/org/jboss/resource/adapter/jms/inflow/JmsActivationSpec.java	2009-07-08 14:44:19 UTC (rev 90937)
@@ -129,6 +129,8 @@
    
    private int forceClearAttempts = 0;
    
+   private Boolean forceTransacted = Boolean.FALSE;
+   
    public void setForceClearOnShutdown(boolean forceClear)
    {
       this.forceClearOnShutdown = forceClear;
@@ -715,4 +717,17 @@
    {
       this.isSameRMOverrideValue = isSameRMOverrideValue;
    }
+
+   public Boolean isForceTransacted()
+   {
+      if (forceTransacted != null)
+         return forceTransacted;
+
+      return Boolean.FALSE;
+   }
+
+   public void setForceTransacted(Boolean forceTransacted)
+   {
+      this.forceTransacted = forceTransacted;
+   }
 }

Modified: branches/Branch_5_x/connector/src/main/org/jboss/resource/adapter/jms/inflow/JmsServerSession.java
===================================================================
--- branches/Branch_5_x/connector/src/main/org/jboss/resource/adapter/jms/inflow/JmsServerSession.java	2009-07-08 14:43:10 UTC (rev 90936)
+++ branches/Branch_5_x/connector/src/main/org/jboss/resource/adapter/jms/inflow/JmsServerSession.java	2009-07-08 14:44:19 UTC (rev 90937)
@@ -277,49 +277,54 @@
 
    private class DemarcationStrategyFactory
    {
-
       TransactionDemarcationStrategy getStrategy()
       {
          TransactionDemarcationStrategy current = null;
-			final JmsActivationSpec spec = pool.getActivation()
-					.getActivationSpec();
-			final JmsActivation activation = pool.getActivation();
-			try 
-			{
-				//If we have a transacted delivery
-        	    if (activation.isDeliveryTransacted() )
-				{
-					//if we have an XASession
-        	    	if(xaSession != null)
-					{
-						current = new XATransactionDemarcationStrategy();
-					}
-					else  //if we don't have an XASession, simulate it with a transacted session
-					{
-						current = new SimulatedXATransactionDemarcationStrategy();
-					}
-					
-				}
-        	    else
-				{
-					current = new LocalDemarcationStrategy();
-				}
-			} 
-			catch (Throwable t) 
-			{
-				log.error(this + " error creating transaction demarcation ", t);
-			}
-			return current;
-		}
+         final JmsActivationSpec spec = pool.getActivation().getActivationSpec();
+         final JmsActivation activation = pool.getActivation();
+         try 
+         {
+            //If we have a transacted delivery
+            if (activation.isDeliveryTransacted())
+            {
+               //if we have an XASession
+               if (xaSession != null)
+               {
+                  if (spec.isForceTransacted())
+                  {
+                     current = new XATransactionDemarcationStrategy();
+                  }
+                  else
+                  {
+                     current = new TraditionalXATransactionDemarcationStrategy();
+                  }
+               }
+               else  //if we don't have an XASession, simulate it with a transacted session
+               {
+                  current = new SimulatedXATransactionDemarcationStrategy();
+               }
+            }
+            else
+            {
+               current = new LocalDemarcationStrategy();
+            }
+         } 
+         catch (Throwable t) 
+         {
+            log.error(this + " error creating transaction demarcation ", t);
+         }
 
+         if (current != null && log.isTraceEnabled())
+            log.trace("Using strategy: " + current.getClass().getName());
+
+         return current;
+      }
    }
 
    private interface TransactionDemarcationStrategy
    {
       void error();
-
       void end();
-
    }
 
    /**
@@ -528,40 +533,166 @@
 
 	}
 
-	/**
-	 * This class is used for XATransactions(ie. CMT) for the mdb message delivery.  It creates a transaction for the message delivery,
-	 * enlists the XASession object in the transaction and then after the on message is called, it will commit/rollback the transaction.
-	 * @author jhowell
-	 *
-	 */
-    private class XATransactionDemarcationStrategy implements
-			TransactionDemarcationStrategy
-	{
+   /**
+    * This class is used for XATransactions(ie. CMT) for the mdb message delivery.  It creates a transaction for the message delivery,
+    * enlists the XASession object in the transaction and then after the on message is called, it will commit/rollback the transaction.
+    * @author jhowell
+    *
+    */
+   private class XATransactionDemarcationStrategy implements TransactionDemarcationStrategy
+   {
+      boolean trace = log.isTraceEnabled();
+      TransactionManager tm = pool.getActivation().getTransactionManager();;
+      Transaction trans = null;
 
-		boolean trace = log.isTraceEnabled();
+      public XATransactionDemarcationStrategy() throws Throwable
+      {
+         final int timeout = pool.getActivation().getActivationSpec().getTransactionTimeout();
+         
+         if (timeout > 0)
+         {
+            log.trace("Setting transactionTimeout for JMSSessionPool to " + timeout);
+            tm.setTransactionTimeout(timeout);
+         }
 
-		TransactionManager tm = pool.getActivation().getTransactionManager();;
+         tm.begin();
 
-		public XATransactionDemarcationStrategy() throws Throwable
-		{
-			final int timeout = pool.getActivation().getActivationSpec()
-					.getTransactionTimeout();
+         try
+         {
+            trans = tm.getTransaction();
 
-			if (timeout > 0)
-			{
-				log.trace("Setting transactionTimeout for JMSSessionPool to "
-						+ timeout);
-				tm.setTransactionTimeout(timeout);
+            if (trace)
+               log.trace(JmsServerSession.this + " using tx=" + trans);
 
-			}
-		}
+            if (xaSession != null)
+            {
+               XAResource res = xaSession.getXAResource();
 
-		public void error()
-		{
-		}
+               if (!trans.enlistResource(res))
+               {
+                  throw new JMSException("could not enlist resource");
+               }
+               if (trace)
+                  log.trace(JmsServerSession.this + " XAResource '" + res + "' enlisted.");
+            }
+         } 
+         catch (Throwable t)
+         {
+            try
+            {
+               tm.rollback();
+            } 
+            catch (Throwable ignored)
+            {
+               log.trace(JmsServerSession.this + " ignored error rolling back after failed enlist", ignored);
+            }
+            throw t;
+         }
+      }
 
-		public void end()
-		{
-		}
-	}
-}
\ No newline at end of file
+      public void error()
+      {
+         // Mark TX for rollback via TM
+         try
+         {
+            if (trace)
+               log.trace(JmsServerSession.this + " using TM to mark TX for rollback tx=" + trans);
+
+            trans.setRollbackOnly();
+         } 
+         catch (Throwable t)
+         {
+            log.error(JmsServerSession.this + " failed to set rollback only", t);
+         }
+      }
+
+      public void end()
+      {
+         try
+         {
+            // Use the TM to commit the Tx (assert the correct association)
+            Transaction currentTx = tm.getTransaction();
+            if (trans.equals(currentTx) == false)
+               throw new IllegalStateException("Wrong tx association: expected " + trans + " was " + currentTx);
+
+            // Marked rollback
+            if (trans.getStatus() == Status.STATUS_MARKED_ROLLBACK)
+            {
+               if (trace)
+                  log.trace(JmsServerSession.this + " rolling back JMS transaction tx=" + trans);
+
+               // actually roll it back
+               tm.rollback();
+
+               // NO XASession? then manually rollback.
+               // This is not so good but
+               // it's the best we can do if we have no XASession.
+               if (xaSession == null && pool.getActivation().isDeliveryTransacted())
+               {
+                  session.rollback();
+               }
+            }
+            else if (trans.getStatus() == Status.STATUS_ACTIVE)
+            {
+               // Commit tx
+               // This will happen if
+               // a) everything goes well
+               // b) app. exception was thrown
+               if (trace)
+                  log.trace(JmsServerSession.this + " commiting the JMS transaction tx=" + trans);
+
+               tm.commit();
+
+               // NO XASession? then manually commit. This is not so good
+               // but
+               // it's the best we can do if we have no XASession.
+               if (xaSession == null && pool.getActivation().isDeliveryTransacted())
+               {
+                  session.commit();
+               }
+            } 
+            else
+            {
+               tm.suspend();
+
+               if (xaSession == null && pool.getActivation().isDeliveryTransacted())
+               {
+                  session.rollback();
+               }
+            }
+         } 
+         catch (Throwable t)
+         {
+            log.error(JmsServerSession.this + " failed to commit/rollback", t);
+         }
+      }
+   }
+
+   /**
+    * This class is used for traditional XATransaction interaction as described in JCA 1.5 12.5.6
+    */
+   private class TraditionalXATransactionDemarcationStrategy implements TransactionDemarcationStrategy
+   {
+      boolean trace = log.isTraceEnabled();
+      TransactionManager tm = pool.getActivation().getTransactionManager();;
+
+      public TraditionalXATransactionDemarcationStrategy() throws Throwable
+      {
+         final int timeout = pool.getActivation().getActivationSpec().getTransactionTimeout();
+         
+         if (timeout > 0)
+         {
+            log.trace("Setting transactionTimeout for JMSSessionPool to " + timeout);
+            tm.setTransactionTimeout(timeout);
+         }
+      }
+
+      public void error()
+      {
+      }
+
+      public void end()
+      {
+      }
+   }
+}




More information about the jboss-cvs-commits mailing list