Author: timfox
Date: 2010-06-08 08:46:40 -0400 (Tue, 08 Jun 2010)
New Revision: 9301
Modified:
trunk/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java
trunk/tests/src/org/hornetq/tests/integration/client/ConsumerTest.java
Log:
https://jira.jboss.org/browse/HORNETQ-410
Modified: trunk/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java 2010-06-05 07:02:37 UTC (rev 9300)
+++ trunk/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java 2010-06-08 12:46:40 UTC (rev 9301)
@@ -453,13 +453,16 @@
}
public void receiveCredits(final int credits) throws Exception
- {
+ {
if (credits == -1)
{
// No flow control
availableCredits = null;
+
+ //There may be messages already in the queue
+ promptDelivery();
}
- else if(credits == 0)
+ else if (credits == 0)
{
//reset, used on slow consumers
availableCredits.set(0);
Modified: trunk/tests/src/org/hornetq/tests/integration/client/ConsumerTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/client/ConsumerTest.java 2010-06-05 07:02:37 UTC (rev 9300)
+++ trunk/tests/src/org/hornetq/tests/integration/client/ConsumerTest.java 2010-06-08 12:46:40 UTC (rev 9301)
@@ -66,41 +66,7 @@
super.tearDown();
}
-// public void testQueueSpin() throws Exception
-// {
-// ClientSessionFactory sf = createInVMFactory();
-//
-// ClientSession session1 = sf.createSession();
-//
-// ClientSession session2 = sf.createSession();
-//
-// session1.createQueue(QUEUE, QUEUE, null, false);
-//
-// ClientProducer producer = session1.createProducer(QUEUE);
-//
-// final int numMessages = 100;
-//
-// for (int i = 0; i < numMessages; i++)
-// {
-// ClientMessage message = createTextMessage("m" + i, session1);
-// producer.send(message);
-// }
-//
-// ClientConsumer consumer1 = session1.createConsumer(QUEUE);
-//
-// ClientConsumer consumer2 = session2.createConsumer(QUEUE, new SimpleString("foo=wibble"));
-//
-// session2.start();
-//
-// consumer2.receive();
-//
-// Thread.sleep(30000);
-//
-// session1.close();
-//
-// session2.close();
-// }
-
+
public void testConsumerAckImmediateAutoCommitTrue() throws Exception
{
ClientSessionFactory sf = createInVMFactory();
@@ -301,7 +267,44 @@
Assert.assertTrue(latch.await(5, TimeUnit.SECONDS));
sessionRec.close();
}
+
+ // https://jira.jboss.org/browse/HORNETQ-410
+ public void testConsumeWithNoConsumerFlowControl() throws Exception
+ {
+ ClientSessionFactory sf = createInVMFactory();
+
+ sf.setConsumerWindowSize(-1);
+ ClientSession session = sf.createSession(false, true, true);
+
+ session.createQueue(QUEUE, QUEUE, null, false);
+
+ session.start();
+
+ ClientProducer producer = session.createProducer(QUEUE);
+
+ final int numMessages = 100;
+
+ for (int i = 0; i < numMessages; i++)
+ {
+ ClientMessage message = createTextMessage("m" + i, session);
+ producer.send(message);
+ }
+
+ ClientConsumer consumer = session.createConsumer(QUEUE);
+
+ for (int i = 0; i < numMessages; i++)
+ {
+ ClientMessage message = consumer.receive(10000);
+ assertNotNull(message);
+ message.acknowledge();
+ }
+
+ session.close();
+ sf.close();
+
+ }
+
public void testClearListener() throws Exception
{
ClientSessionFactory sf = createInVMFactory();