[hornetq-commits] JBoss hornetq SVN: r9598 - in trunk/src/main/org/hornetq: ra/inflow and 1 other directory.

do-not-reply at jboss.org do-not-reply at jboss.org
Thu Aug 26 13:48:38 EDT 2010


Author: timfox
Date: 2010-08-26 13:48:38 -0400 (Thu, 26 Aug 2010)
New Revision: 9598

Modified:
   trunk/src/main/org/hornetq/jms/client/HornetQSession.java
   trunk/src/main/org/hornetq/jms/client/JMSMessageListenerWrapper.java
   trunk/src/main/org/hornetq/ra/inflow/HornetQMessageHandler.java
Log:
https://jira.jboss.org/browse/HORNETQ-495

Modified: trunk/src/main/org/hornetq/jms/client/HornetQSession.java
===================================================================
--- trunk/src/main/org/hornetq/jms/client/HornetQSession.java	2010-08-25 15:34:03 UTC (rev 9597)
+++ trunk/src/main/org/hornetq/jms/client/HornetQSession.java	2010-08-26 17:48:38 UTC (rev 9598)
@@ -213,6 +213,11 @@
 
       return ackMode;
    }
+   
+   public boolean isXA()
+   {
+      return xa;
+   }
 
    public void commit() throws JMSException
    {

Modified: trunk/src/main/org/hornetq/jms/client/JMSMessageListenerWrapper.java
===================================================================
--- trunk/src/main/org/hornetq/jms/client/JMSMessageListenerWrapper.java	2010-08-25 15:34:03 UTC (rev 9597)
+++ trunk/src/main/org/hornetq/jms/client/JMSMessageListenerWrapper.java	2010-08-26 17:48:38 UTC (rev 9598)
@@ -42,9 +42,9 @@
    private final boolean transactedOrClientAck;
 
    protected JMSMessageListenerWrapper(final HornetQSession session,
-                                    final ClientConsumer consumer,
-                                    final MessageListener listener,
-                                    final int ackMode)
+                                       final ClientConsumer consumer,
+                                       final MessageListener listener,
+                                       final int ackMode)
    {
       this.session = session;
 
@@ -52,7 +52,7 @@
 
       this.listener = listener;
 
-      transactedOrClientAck = ackMode == Session.SESSION_TRANSACTED || ackMode == Session.CLIENT_ACKNOWLEDGE;
+      transactedOrClientAck = (ackMode == Session.SESSION_TRANSACTED || ackMode == Session.CLIENT_ACKNOWLEDGE) || session.isXA();
    }
 
    /**

Modified: trunk/src/main/org/hornetq/ra/inflow/HornetQMessageHandler.java
===================================================================
--- trunk/src/main/org/hornetq/ra/inflow/HornetQMessageHandler.java	2010-08-25 15:34:03 UTC (rev 9597)
+++ trunk/src/main/org/hornetq/ra/inflow/HornetQMessageHandler.java	2010-08-26 17:48:38 UTC (rev 9598)
@@ -68,11 +68,16 @@
 
    private boolean useLocalTx;
    
+   private boolean transacted;
+
    private final int sessionNr;
 
    private final TransactionManager tm;
 
-   public HornetQMessageHandler(final HornetQActivation activation, final TransactionManager tm, final ClientSession session, final int sessionNr)
+   public HornetQMessageHandler(final HornetQActivation activation,
+                                final TransactionManager tm,
+                                final ClientSession session,
+                                final int sessionNr)
    {
       this.activation = activation;
       this.session = session;
@@ -96,14 +101,16 @@
       {
          String subscriptionName = spec.getSubscriptionName();
          String clientID = spec.getClientID();
-         
+
          // Durable sub
          if (clientID == null)
          {
-            throw new InvalidClientIDException("Cannot create durable subscription for " + subscriptionName + " - client ID has not been set");
+            throw new InvalidClientIDException("Cannot create durable subscription for " + subscriptionName +
+                                               " - client ID has not been set");
          }
 
-         SimpleString queueName = new SimpleString(HornetQDestination.createQueueNameForDurableSubscription(clientID, subscriptionName));
+         SimpleString queueName = new SimpleString(HornetQDestination.createQueueNameForDurableSubscription(clientID,
+                                                                                                            subscriptionName));
 
          QueueQuery subResponse = session.queueQuery(queueName);
 
@@ -123,9 +130,10 @@
             SimpleString oldFilterString = subResponse.getFilterString();
 
             boolean selectorChanged = selector == null && oldFilterString != null ||
-                                      oldFilterString == null && selector != null ||
-                                      (oldFilterString != null && selector != null &&
-                                      !oldFilterString.toString().equals(selector));
+                                      oldFilterString == null &&
+                                      selector != null ||
+                                      (oldFilterString != null && selector != null && !oldFilterString.toString()
+                                                                                                      .equals(selector));
 
             SimpleString oldTopicName = subResponse.getAddress();
 
@@ -155,7 +163,7 @@
             }
             else
             {
-               queueName = activation.getTopicTemporaryQueue(); 
+               queueName = activation.getTopicTemporaryQueue();
             }
          }
          else
@@ -168,6 +176,7 @@
       // Create the endpoint, if we are transacted pass the sesion so it is enlisted, unless using Local TX
       MessageEndpointFactory endpointFactory = activation.getMessageEndpointFactory();
       useLocalTx = !activation.isDeliveryTransacted() && activation.getActivationSpec().isUseLocalTx();
+      transacted = activation.isDeliveryTransacted();
       if (activation.isDeliveryTransacted() && !activation.getActivationSpec().isUseLocalTx())
       {
          endpoint = endpointFactory.createEndpoint(session);
@@ -201,7 +210,7 @@
       {
          HornetQMessageHandler.log.debug("Error releasing endpoint " + endpoint, t);
       }
-      
+
       try
       {
          consumer.close();
@@ -246,15 +255,28 @@
 
       try
       {
-         if(activation.getActivationSpec().getTransactionTimeout() > 0 && tm != null)
+         if (activation.getActivationSpec().getTransactionTimeout() > 0 && tm != null)
          {
             tm.setTransactionTimeout(activation.getActivationSpec().getTransactionTimeout());
          }
          endpoint.beforeDelivery(HornetQActivation.ONMESSAGE);
          beforeDelivery = true;
          msg.doBeforeReceive();
+         
+         //In the transacted case the message must be acked *before* onMessage is called
+         
+         if (transacted)
+         {
+            message.acknowledge();
+         }
+         
          ((MessageListener)endpoint).onMessage(msg);
-         message.acknowledge();
+         
+         if (!transacted)
+         {
+            message.acknowledge();
+         }
+         
          try
          {
             endpoint.afterDelivery();



More information about the hornetq-commits mailing list