[jboss-cvs] JBoss Messaging SVN: r7637 - trunk/src/main/org/jboss/messaging/ra/inflow.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Wed Jul 29 06:17:35 EDT 2009


Author: ataylor
Date: 2009-07-29 06:17:35 -0400 (Wed, 29 Jul 2009)
New Revision: 7637

Modified:
   trunk/src/main/org/jboss/messaging/ra/inflow/JBMMessageHandler.java
Log:
changed JCA to use transactions directly instead of endpoints

Modified: trunk/src/main/org/jboss/messaging/ra/inflow/JBMMessageHandler.java
===================================================================
--- trunk/src/main/org/jboss/messaging/ra/inflow/JBMMessageHandler.java	2009-07-29 08:53:42 UTC (rev 7636)
+++ trunk/src/main/org/jboss/messaging/ra/inflow/JBMMessageHandler.java	2009-07-29 10:17:35 UTC (rev 7637)
@@ -33,11 +33,12 @@
 import org.jboss.messaging.utils.SimpleString;
 
 import javax.jms.InvalidClientIDException;
+import javax.jms.JMSException;
 import javax.jms.MessageListener;
-import javax.resource.ResourceException;
 import javax.resource.spi.endpoint.MessageEndpoint;
 import javax.resource.spi.endpoint.MessageEndpointFactory;
-import javax.transaction.SystemException;
+import javax.transaction.Status;
+import javax.transaction.Transaction;
 import javax.transaction.TransactionManager;
 import java.util.UUID;
 
@@ -109,7 +110,7 @@
          }
 
          SimpleString queueName = new SimpleString(JBossTopic.createQueueNameForDurableSubscription(activation.getActivationSpec()
-                                                                                                              .getClientID(),
+               .getClientID(),
                                                                                                     subscriptionName));
 
          SessionQueueQueryResponseMessage subResponse = session.queueQuery(queueName);
@@ -247,7 +248,7 @@
 
       try
       {
-         ((MessageListener)endpoint).onMessage(jbm);
+         ((MessageListener) endpoint).onMessage(jbm);
       }
       catch (Throwable t)
       {
@@ -284,11 +285,11 @@
                try
                {
                   return new XATransactionDemarcationStrategy();
-                  }
-                  catch (Throwable t)
-                  {
-                     log.error(this + " error creating transaction demarcation ", t);
-                  }
+               }
+               catch (Throwable t)
+               {
+                  log.error(this + " error creating transaction demarcation ", t);
+               }
             }
             else
             {
@@ -405,6 +406,8 @@
    {
       private final TransactionManager tm = activation.getTransactionManager();
 
+      private Transaction trans;
+
       public void start() throws Throwable
       {
          final int timeout = activation.getActivationSpec().getTransactionTimeout();
@@ -418,38 +421,105 @@
 
             tm.setTransactionTimeout(timeout);
          }
-         endpoint.beforeDelivery(JBMActivation.ONMESSAGE);
-      }
 
-      public void error()
-      {
+         tm.begin();
+
          try
          {
+            trans = tm.getTransaction();
+
+            if (trace)
+            {
+               log.trace(this + " using tx=" + trans);
+            }
+
+            if (!trans.enlistResource(session))
+            {
+               throw new JMSException("could not enlist resource");
+            }
+            if (trace)
+            {
+               log.trace(this + " XAResource '" + session + " enlisted.");
+            }
+
+         }
+         catch (Throwable t)
+         {
             try
             {
-               tm.getTransaction().setRollbackOnly();
+               tm.rollback();
             }
-            catch (SystemException e)
+            catch (Throwable ignored)
             {
-               log.error("Unable to mark transaction as rollback only", e);
+               log.trace(this + " ignored error rolling back after failed enlist", ignored);
             }
-            endpoint.afterDelivery();
+            throw t;
          }
-         catch (ResourceException e)
+      }
+
+      public void error()
+      {
+         // Mark TX for rollback via TM
+         try
          {
-            log.error("Error calling after delivery on endpoint", e);
+            if (trace)
+            {
+               log.trace(this + " using TM to mark TX for rollback tx=" + trans);
+            }
+
+            trans.setRollbackOnly();
          }
+         catch (Throwable t)
+         {
+            log.error(this + " failed to set rollback only", t);
+         }
       }
 
       public void end()
       {
          try
          {
-            endpoint.afterDelivery();
+            // Use the TM to commit the Tx (assert the correct association)
+            Transaction currentTx = tm.getTransaction();
+            if (!trans.equals(currentTx))
+            {
+               throw new IllegalStateException("Wrong tx association: expected " + trans + " was " + currentTx);
+            }
+
+            // Marked rollback
+            if (trans.getStatus() == Status.STATUS_MARKED_ROLLBACK)
+            {
+               if (trace)
+               {
+                  log.trace(this + " rolling back JMS transaction tx=" + trans);
+               }
+
+               // Actually roll it back
+               tm.rollback();
+
+            }
+            else if (trans.getStatus() == Status.STATUS_ACTIVE)
+            {
+               // Commit tx
+               // This will happen if
+               // a) everything goes well
+               // b) app. exception was thrown
+               if (trace)
+               {
+                  log.trace(this + " commiting the JMS transaction tx=" + trans);
+               }
+
+               tm.commit();
+
+            }
+            else
+            {
+               tm.suspend();
+            }
          }
-         catch (ResourceException e)
+         catch (Throwable t)
          {
-            log.error("Error calling after delivery on endpoint", e);
+            log.error(this + " failed to commit/rollback", t);
          }
       }
    }




More information about the jboss-cvs-commits mailing list