[hornetq-commits] JBoss hornetq SVN: r7966 - trunk/src/main/org/hornetq/ra/inflow.

do-not-reply at jboss.org do-not-reply at jboss.org
Thu Sep 17 05:54:17 EDT 2009


Author: ataylor
Date: 2009-09-17 05:54:17 -0400 (Thu, 17 Sep 2009)
New Revision: 7966

Modified:
   trunk/src/main/org/hornetq/ra/inflow/HornetQMessageHandler.java
Log:
https://jira.jboss.org/jira/browse/HORNETQ-134 - reverted to option B, which is now followed exactly, but left in the local tx optimisation, which is off by default

Modified: trunk/src/main/org/hornetq/ra/inflow/HornetQMessageHandler.java
===================================================================
--- trunk/src/main/org/hornetq/ra/inflow/HornetQMessageHandler.java	2009-09-17 08:54:21 UTC (rev 7965)
+++ trunk/src/main/org/hornetq/ra/inflow/HornetQMessageHandler.java	2009-09-17 09:54:17 UTC (rev 7966)
@@ -13,12 +13,15 @@
 package org.hornetq.ra.inflow;
 
 import java.util.UUID;
+import java.security.AccessController;
+import java.security.PrivilegedAction;
 
 import javax.jms.InvalidClientIDException;
 import javax.jms.JMSException;
 import javax.jms.MessageListener;
 import javax.resource.spi.endpoint.MessageEndpoint;
 import javax.resource.spi.endpoint.MessageEndpointFactory;
+import javax.resource.ResourceException;
 import javax.transaction.Status;
 import javax.transaction.Transaction;
 import javax.transaction.TransactionManager;
@@ -66,10 +69,7 @@
 
    private final HornetQActivation activation;
 
-   /**
-    * The transaction demarcation strategy factory
-    */
-   private final DemarcationStrategyFactory strategyFactory = new DemarcationStrategyFactory();
+   private boolean useLocalTx;
 
    public HornetQMessageHandler(final HornetQActivation activation, final ClientSession session)
    {
@@ -103,7 +103,7 @@
 
          SimpleString queueName = new SimpleString(HornetQTopic.createQueueNameForDurableSubscription(activation.getActivationSpec()
                .getClientID(),
-                                                                                                    subscriptionName));
+                                                                                                      subscriptionName));
 
          SessionQueueQueryResponseMessage subResponse = session.queueQuery(queueName);
 
@@ -160,6 +160,7 @@
 
       // Create the endpoint, if we are transacted pass the sesion so it is enlisted, unless using Local TX
       MessageEndpointFactory endpointFactory = activation.getMessageEndpointFactory();
+      useLocalTx = !activation.isDeliveryTransacted() && activation.getActivationSpec().isUseLocalTx();
       if (activation.isDeliveryTransacted() && !activation.getActivationSpec().isUseLocalTx())
       {
          endpoint = endpointFactory.createEndpoint(session);
@@ -213,321 +214,49 @@
          log.trace("onMessage(" + message + ")");
       }
 
-      TransactionDemarcationStrategy txnStrategy = strategyFactory.getStrategy();
-      try
-      {
-         txnStrategy.start();
-      }
-      catch (Throwable throwable)
-      {
-         log.warn("Unable to create transaction: " + throwable.getMessage());
-         txnStrategy = new NoTXTransactionDemarcationStrategy();
-      }
-
       HornetQMessage msg = HornetQMessage.createMessage(message, session);
-
+      boolean beforeDelivery = false;
       try
       {
+         endpoint.beforeDelivery(HornetQActivation.ONMESSAGE);
+         beforeDelivery = true;
          msg.doBeforeReceive();
          message.acknowledge();
-      }
-      catch (Exception e)
-      {
-         log.error("Failed to prepare message for receipt", e);
-
-         return;
-      }
-
-      try
-      {
          ((MessageListener) endpoint).onMessage(msg);
-      }
-      catch (Throwable t)
-      {
-         log.error("Unexpected error delivering message " + message, t);
-         txnStrategy.error();
-      }
-      finally
-      {
-         txnStrategy.end();
-      }
-   }
-
-   /**
-    * Demarcation strategy factory
-    */
-   private class DemarcationStrategyFactory
-   {
-      /**
-       * Get the transaction demarcation strategy
-       *
-       * @return The strategy
-       */
-      TransactionDemarcationStrategy getStrategy()
-      {
-         if (trace)
+         endpoint.afterDelivery();
+         if(useLocalTx)
          {
-            log.trace("getStrategy()");
+            session.commit();
          }
-
-         if (activation.isDeliveryTransacted())
-         {
-            if (!activation.getActivationSpec().isUseLocalTx())
-            {
-               try
-               {
-                  return new XATransactionDemarcationStrategy();
-               }
-               catch (Throwable t)
-               {
-                  log.error(this + " error creating transaction demarcation ", t);
-               }
-            }
-            else
-            {
-               return new LocalDemarcationStrategy();
-            }
-
-         }
-         else
-         {
-            if (!activation.getActivationSpec().isUseLocalTx())
-            {
-               return new NoTXTransactionDemarcationStrategy();
-            }
-            else
-            {
-               return new LocalDemarcationStrategy();
-            }
-         }
-
-         return null;
       }
-   }
-
-   /**
-    * Transaction demarcation strategy
-    */
-   private interface TransactionDemarcationStrategy
-   {
-      /*
-      * Start
-      */
-      void start() throws Throwable;
-
-      /**
-       * Error
-       */
-      void error();
-
-      /**
-       * End
-       */
-      void end();
-   }
-
-   /**
-    * Local demarcation strategy
-    */
-   private class LocalDemarcationStrategy implements TransactionDemarcationStrategy
-   {
-      private boolean rolledBack = false;
-      /*
-      * Start
-      */
-
-      public void start()
+      catch (Throwable e)
       {
-      }
-
-      /**
-       * Error
-       */
-      public void error()
-      {
-         if (trace)
+         log.error("Failed to deliver message", e);
+         //we need to call before/afterDelivery as a pair
+         if(beforeDelivery)
          {
-            log.trace("error()");
-         }
-
-         if (session != null)
-         {
             try
             {
-               session.rollback();
-               rolledBack = true;
+               endpoint.afterDelivery();
             }
-            catch (HornetQException e)
+            catch (ResourceException e1)
             {
-               log.error("Failed to rollback session transaction", e);
+               log.warn("Unable to call after delivery");
             }
          }
-      }
-
-      /**
-       * End
-       */
-      public void end()
-      {
-         if (trace)
+         if(useLocalTx)
          {
-            log.trace("end()");
-         }
-
-         if (!rolledBack)
-         {
-            if (session != null)
-            {
-               try
-               {
-                  session.commit();
-               }
-               catch (HornetQException e)
-               {
-                  log.error("Failed to commit session transaction", e);
-               }
-            }
-         }
-      }
-   }
-
-   /**
-    * XA demarcation strategy
-    */
-   private class XATransactionDemarcationStrategy implements TransactionDemarcationStrategy
-   {
-      private final TransactionManager tm = activation.getTransactionManager();
-
-      private Transaction trans;
-
-      public void start() throws Throwable
-      {
-         final int timeout = activation.getActivationSpec().getTransactionTimeout();
-
-         if (timeout > 0)
-         {
-            if (trace)
-            {
-               log.trace("Setting transactionTimeout for JMSSessionPool to " + timeout);
-            }
-
-            tm.setTransactionTimeout(timeout);
-         }
-
-         tm.begin();
-
-         try
-         {
-            trans = tm.getTransaction();
-
-            if (trace)
-            {
-               log.trace(this + " using tx=" + trans);
-            }
-
-            if (!trans.enlistResource(session))
-            {
-               throw new JMSException("could not enlist resource");
-            }
-            if (trace)
-            {
-               log.trace(this + " XAResource '" + session + " enlisted.");
-            }
-
-         }
-         catch (Throwable t)
-         {
             try
             {
-               tm.rollback();
+               session.rollback();
             }
-            catch (Throwable ignored)
+            catch (HornetQException e1)
             {
-               log.trace(this + " ignored error rolling back after failed enlist", ignored);
+               log.warn("Unable to roll local transaction back");
             }
-            throw t;
          }
       }
 
-      public void error()
-      {
-         // Mark for tollback TX via TM
-         try
-         {
-            if (trace)
-            {
-               log.trace(this + " using TM to mark TX for rollback tx=" + trans);
-            }
-
-            trans.setRollbackOnly();
-         }
-         catch (Throwable t)
-         {
-            log.error(this + " failed to set rollback only", t);
-         }
-      }
-
-      public void end()
-      {
-         try
-         {
-            // Use the TM to commit the Tx (assert the correct association)
-            Transaction currentTx = tm.getTransaction();
-            if (!trans.equals(currentTx))
-            {
-               throw new IllegalStateException("Wrong tx association: expected " + trans + " was " + currentTx);
-            }
-
-            // Marked rollback
-            if (trans.getStatus() == Status.STATUS_MARKED_ROLLBACK)
-            {
-               if (trace)
-               {
-                  log.trace(this + " rolling back JMS transaction tx=" + trans);
-               }
-
-               // Actually roll it back
-               tm.rollback();
-
-            }
-            else if (trans.getStatus() == Status.STATUS_ACTIVE)
-            {
-               // Commit tx
-               // This will happen if
-               // a) everything goes well
-               // b) app. exception was thrown
-               if (trace)
-               {
-                  log.trace(this + " commiting the JMS transaction tx=" + trans);
-               }
-
-               tm.commit();
-
-            }
-            else
-            {
-               tm.suspend();
-            }
-         }
-         catch (Throwable t)
-         {
-            log.error(this + " failed to commit/rollback", t);
-         }
-      }
    }
 
-   private class NoTXTransactionDemarcationStrategy implements TransactionDemarcationStrategy
-   {
-      public void start() throws Throwable
-      {
-      }
-
-      public void error()
-      {
-      }
-
-      public void end()
-      {
-      }
-   }
 }



More information about the hornetq-commits mailing list