[hornetq-commits] JBoss hornetq SVN: r9301 - in trunk: tests/src/org/hornetq/tests/integration/client and 1 other directory.

do-not-reply at jboss.org do-not-reply at jboss.org
Tue Jun 8 08:46:41 EDT 2010


Author: timfox
Date: 2010-06-08 08:46:40 -0400 (Tue, 08 Jun 2010)
New Revision: 9301

Modified:
   trunk/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java
   trunk/tests/src/org/hornetq/tests/integration/client/ConsumerTest.java
Log:
https://jira.jboss.org/browse/HORNETQ-410

Modified: trunk/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java	2010-06-05 07:02:37 UTC (rev 9300)
+++ trunk/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java	2010-06-08 12:46:40 UTC (rev 9301)
@@ -453,13 +453,16 @@
    }
 
    public void receiveCredits(final int credits) throws Exception
-   {
+   {      
       if (credits == -1)
       {
          // No flow control
          availableCredits = null;
+         
+         //There may be messages already in the queue
+         promptDelivery();
       }
-      else if(credits == 0)
+      else if (credits == 0)
       {
          //reset, used on slow consumers
          availableCredits.set(0);

Modified: trunk/tests/src/org/hornetq/tests/integration/client/ConsumerTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/client/ConsumerTest.java	2010-06-05 07:02:37 UTC (rev 9300)
+++ trunk/tests/src/org/hornetq/tests/integration/client/ConsumerTest.java	2010-06-08 12:46:40 UTC (rev 9301)
@@ -66,41 +66,7 @@
       super.tearDown();
    }
    
-//   public void testQueueSpin() throws Exception
-//   {
-//      ClientSessionFactory sf = createInVMFactory();
-//
-//      ClientSession session1 = sf.createSession();
-//      
-//      ClientSession session2 = sf.createSession();
-//
-//      session1.createQueue(QUEUE, QUEUE, null, false);
-//
-//      ClientProducer producer = session1.createProducer(QUEUE);
-//
-//      final int numMessages = 100;
-//
-//      for (int i = 0; i < numMessages; i++)
-//      {
-//         ClientMessage message = createTextMessage("m" + i, session1);
-//         producer.send(message);
-//      }
-//
-//      ClientConsumer consumer1 = session1.createConsumer(QUEUE);
-//      
-//      ClientConsumer consumer2 = session2.createConsumer(QUEUE, new SimpleString("foo=wibble"));
-//      
-//      session2.start();
-//      
-//      consumer2.receive();
-//      
-//      Thread.sleep(30000);
-//      
-//      session1.close();
-//      
-//      session2.close();
-//   }
-
+   
    public void testConsumerAckImmediateAutoCommitTrue() throws Exception
    {
       ClientSessionFactory sf = createInVMFactory();
@@ -301,7 +267,44 @@
       Assert.assertTrue(latch.await(5, TimeUnit.SECONDS));
       sessionRec.close();
    }
+   
+   // https://jira.jboss.org/browse/HORNETQ-410
+   public void testConsumeWithNoConsumerFlowControl() throws Exception
+   {
+      ClientSessionFactory sf = createInVMFactory();
+      
+      sf.setConsumerWindowSize(-1);
 
+      ClientSession session = sf.createSession(false, true, true);
+
+      session.createQueue(QUEUE, QUEUE, null, false);
+      
+      session.start();
+
+      ClientProducer producer = session.createProducer(QUEUE);
+
+      final int numMessages = 100;
+
+      for (int i = 0; i < numMessages; i++)
+      {
+         ClientMessage message = createTextMessage("m" + i, session);
+         producer.send(message);
+      }
+            
+      ClientConsumer consumer = session.createConsumer(QUEUE);
+      
+      for (int i = 0; i < numMessages; i++)
+      {
+         ClientMessage message = consumer.receive(10000);
+         assertNotNull(message);
+         message.acknowledge();
+      }
+      
+      session.close();
+      sf.close();
+      
+   }
+
    public void testClearListener() throws Exception
    {
       ClientSessionFactory sf = createInVMFactory();



More information about the hornetq-commits mailing list