[hornetq-commits] JBoss hornetq SVN: r9724 - in trunk: tests/src/org/hornetq/tests/integration/client and 1 other directory.

do-not-reply at jboss.org do-not-reply at jboss.org
Mon Sep 27 08:27:03 EDT 2010


Author: timfox
Date: 2010-09-27 08:27:02 -0400 (Mon, 27 Sep 2010)
New Revision: 9724

Modified:
   trunk/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java
   trunk/tests/src/org/hornetq/tests/integration/client/ReceiveImmediateTest.java
Log:
https://jira.jboss.org/browse/HORNETQ-450

Modified: trunk/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java	2010-09-27 09:37:14 UTC (rev 9723)
+++ trunk/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java	2010-09-27 12:27:02 UTC (rev 9724)
@@ -16,7 +16,6 @@
 import java.io.File;
 import java.util.Iterator;
 import java.util.concurrent.Executor;
-import java.util.concurrent.atomic.AtomicLong;
 
 import org.hornetq.api.core.HornetQBuffers;
 import org.hornetq.api.core.HornetQException;
@@ -728,6 +727,7 @@
       {
          ClientConsumerImpl.log.trace("Adding Runner on Executor for delivery");
       }
+      
       sessionExecutor.execute(runner);
    }
 
@@ -805,6 +805,12 @@
 
          if (message != null)
          {
+            if (message.containsProperty(ClientConsumerImpl.FORCED_DELIVERY_MESSAGE))
+            {
+               // Ignore: this may be a leftover forced-delivery message from an earlier receiveImmediate() call
+               return;
+            }
+            
             boolean expired = message.isExpired();
 
             flowControlBeforeConsumption(message);
@@ -932,7 +938,6 @@
    {
       public void run()
       {
-
          try
          {
             callOnMessage();

Modified: trunk/tests/src/org/hornetq/tests/integration/client/ReceiveImmediateTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/client/ReceiveImmediateTest.java	2010-09-27 09:37:14 UTC (rev 9723)
+++ trunk/tests/src/org/hornetq/tests/integration/client/ReceiveImmediateTest.java	2010-09-27 12:27:02 UTC (rev 9724)
@@ -12,6 +12,8 @@
  */
 package org.hornetq.tests.integration.client;
 
+import java.util.concurrent.atomic.AtomicBoolean;
+
 import junit.framework.Assert;
 
 import org.hornetq.api.core.SimpleString;
@@ -22,6 +24,7 @@
 import org.hornetq.api.core.client.ClientSession;
 import org.hornetq.api.core.client.ClientSessionFactory;
 import org.hornetq.api.core.client.HornetQClient;
+import org.hornetq.api.core.client.MessageHandler;
 import org.hornetq.core.config.Configuration;
 import org.hornetq.core.logging.Logger;
 import org.hornetq.core.server.HornetQServer;
@@ -153,7 +156,52 @@
 
       sf.close();
    }
+   
+   // https://jira.jboss.org/browse/HORNETQ-450
+   public void testReceivedImmediateFollowedByAsyncConsume() throws Exception
+   {
+      sf = HornetQClient.createClientSessionFactory(new TransportConfiguration("org.hornetq.core.remoting.impl.invm.InVMConnectorFactory"));
+      sf.setBlockOnNonDurableSend(true);
 
+      ClientSession session = sf.createSession(false, true, true);
+
+      session.createQueue(ADDRESS, QUEUE, null, false);
+      
+      ClientProducer producer = session.createProducer(ADDRESS);
+
+      ClientMessage message = session.createMessage(false);
+      
+      producer.send(message);
+
+      ClientConsumer consumer = session.createConsumer(QUEUE, null, false);
+      
+      session.start();
+      
+      ClientMessage received = consumer.receiveImmediate();
+
+      assertNotNull(received);
+      
+      received.acknowledge();
+      
+      final AtomicBoolean receivedAsync = new AtomicBoolean(false);
+      
+      consumer.setMessageHandler(new MessageHandler()
+      {
+         public void onMessage(ClientMessage message)
+         {
+            receivedAsync.set(true);
+         }
+      });
+      
+      Thread.sleep(1000);
+                  
+      assertFalse(receivedAsync.get());      
+
+      session.close();
+
+      sf.close();
+   }
+
    private void doConsumerReceiveImmediateWithNoMessages(final boolean browser) throws Exception
    {
       sf = HornetQClient.createClientSessionFactory(new TransportConfiguration("org.hornetq.core.remoting.impl.invm.InVMConnectorFactory"));



More information about the hornetq-commits mailing list