[hornetq-commits] JBoss hornetq SVN: r9723 - in trunk: src/main/org/hornetq/core/server/impl and 1 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Mon Sep 27 05:37:15 EDT 2010


Author: timfox
Date: 2010-09-27 05:37:14 -0400 (Mon, 27 Sep 2010)
New Revision: 9723

Modified:
   trunk/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java
   trunk/src/main/org/hornetq/core/server/impl/ServerMessageImpl.java
   trunk/tests/src/org/hornetq/tests/integration/client/ReceiveImmediateTest.java
Log:
https://jira.jboss.org/browse/HORNETQ-450

Modified: trunk/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java	2010-09-24 12:16:25 UTC (rev 9722)
+++ trunk/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java	2010-09-27 09:37:14 UTC (rev 9723)
@@ -113,7 +113,7 @@
 
    private boolean stopped = false;
 
-   private final AtomicLong forceDeliveryCount = new AtomicLong(0);
+   private long forceDeliveryCount;
 
    private final SessionQueueQueryResponseMessage queueInfo;
 
@@ -226,7 +226,7 @@
                      // we only force delivery once per call to receive
                      if (!deliveryForced)
                      {
-                        session.forceDelivery(id, forceDeliveryCount.incrementAndGet());
+                        session.forceDelivery(id, forceDeliveryCount++);
 
                         deliveryForced = true;
                      }
@@ -260,18 +260,17 @@
                if (m.containsProperty(ClientConsumerImpl.FORCED_DELIVERY_MESSAGE))
                {
                   long seq = m.getLongProperty(ClientConsumerImpl.FORCED_DELIVERY_MESSAGE);
-                  if (seq >= forceDeliveryCount.longValue())
+
+                  if (forcingDelivery && seq == forceDeliveryCount - 1)
                   {
                      // forced delivery messages are discarded, nothing has been delivered by the queue
-                     if (forcingDelivery)
-                     {
-                        resetIfSlowConsumer();
-                        return null;
-                     }
+                     resetIfSlowConsumer();
+
+                     return null;
                   }
                   else
                   {
-                     // ignore any previous forced delivery message
+                     // Ignore the message
                      continue;
                   }
                }
@@ -425,7 +424,7 @@
       lastAckedMessage = null;
 
       creditsToSend = 0;
-      
+
       ackIndividually = false;
    }
 
@@ -468,7 +467,7 @@
    {
       return browseOnly;
    }
-   
+
    public synchronized void handleMessage(final ClientMessageInternal message) throws Exception
    {
       if (closing)
@@ -571,7 +570,7 @@
          while (iter.hasNext())
          {
             ClientMessageInternal message = iter.next();
-            
+
             flowControlBeforeConsumption(message);
          }
 
@@ -603,7 +602,7 @@
          {
             flushAcks();
          }
-         
+
          session.individualAcknowledge(id, message.getMessageID());
       }
       else
@@ -708,7 +707,7 @@
 
    private void resetIfSlowConsumer()
    {
-      if(clientWindowSize == 0)
+      if (clientWindowSize == 0)
       {
          slowConsumerInitialCreditSent = false;
          sendCredits(0);

Modified: trunk/src/main/org/hornetq/core/server/impl/ServerMessageImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/ServerMessageImpl.java	2010-09-24 12:16:25 UTC (rev 9722)
+++ trunk/src/main/org/hornetq/core/server/impl/ServerMessageImpl.java	2010-09-27 09:37:14 UTC (rev 9723)
@@ -73,7 +73,7 @@
    }
 
    /*
-    * Construct a MessageImpl from storage, or notification
+    * Construct a MessageImpl from storage, or notification, or before routing
     */
    public ServerMessageImpl(final long messageID, final int initialMessageBufferSize)
    {
@@ -82,11 +82,6 @@
       this.messageID = messageID;
    }
 
-   protected ServerMessageImpl(final int initialMessageBufferSize)
-   {
-      super(initialMessageBufferSize);
-   }
-
    /*
     * Copy constructor
     */

Modified: trunk/tests/src/org/hornetq/tests/integration/client/ReceiveImmediateTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/client/ReceiveImmediateTest.java	2010-09-24 12:16:25 UTC (rev 9722)
+++ trunk/tests/src/org/hornetq/tests/integration/client/ReceiveImmediateTest.java	2010-09-27 09:37:14 UTC (rev 9723)
@@ -16,7 +16,12 @@
 
 import org.hornetq.api.core.SimpleString;
 import org.hornetq.api.core.TransportConfiguration;
-import org.hornetq.api.core.client.*;
+import org.hornetq.api.core.client.ClientConsumer;
+import org.hornetq.api.core.client.ClientMessage;
+import org.hornetq.api.core.client.ClientProducer;
+import org.hornetq.api.core.client.ClientSession;
+import org.hornetq.api.core.client.ClientSessionFactory;
+import org.hornetq.api.core.client.HornetQClient;
 import org.hornetq.core.config.Configuration;
 import org.hornetq.core.logging.Logger;
 import org.hornetq.core.server.HornetQServer;
@@ -113,7 +118,42 @@
       sf.close();
 
    }
+   
+   // https://jira.jboss.org/browse/HORNETQ-450
+   public void testReceivedImmediateFollowedByReceive() throws Exception
+   {
+      sf = HornetQClient.createClientSessionFactory(new TransportConfiguration("org.hornetq.core.remoting.impl.invm.InVMConnectorFactory"));
+      sf.setBlockOnNonDurableSend(true);
 
+      ClientSession session = sf.createSession(false, true, true);
+
+      session.createQueue(ADDRESS, QUEUE, null, false);
+      
+      ClientProducer producer = session.createProducer(ADDRESS);
+
+      ClientMessage message = session.createMessage(false);
+      
+      producer.send(message);
+
+      ClientConsumer consumer = session.createConsumer(QUEUE, null, false);
+      
+      session.start();
+      
+      ClientMessage received = consumer.receiveImmediate();
+
+      assertNotNull(received);
+      
+      received.acknowledge();
+      
+      received = consumer.receive(1);
+      
+      assertNull(received);      
+
+      session.close();
+
+      sf.close();
+   }
+
    private void doConsumerReceiveImmediateWithNoMessages(final boolean browser) throws Exception
    {
       sf = HornetQClient.createClientSessionFactory(new TransportConfiguration("org.hornetq.core.remoting.impl.invm.InVMConnectorFactory"));



More information about the hornetq-commits mailing list