Author: gaohoward
Date: 2010-09-29 21:06:26 -0400 (Wed, 29 Sep 2010)
New Revision: 9734
Modified:
trunk/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java
trunk/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTest.java
Log:
https://jira.jboss.org/browse/HORNETQ-522
Modified: trunk/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java 2010-09-29 08:47:43
UTC (rev 9733)
+++ trunk/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java 2010-09-30 01:06:26
UTC (rev 9734)
@@ -974,6 +974,13 @@
sendPacketWithoutLock(packet);
}
+ else
+ {
+ //https://jira.jboss.org/browse/HORNETQ-522
+ SessionConsumerFlowCreditMessage packet = new
SessionConsumerFlowCreditMessage(entry.getKey(),
+
1);
+ sendPacketWithoutLock(packet);
+ }
}
if ((!autoCommitAcks || !autoCommitSends) && workDone)
Modified:
trunk/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTest.java
===================================================================
---
trunk/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTest.java 2010-09-29
08:47:43 UTC (rev 9733)
+++
trunk/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTest.java 2010-09-30
01:06:26 UTC (rev 9734)
@@ -37,6 +37,7 @@
import org.hornetq.api.core.client.ClientSession;
import org.hornetq.api.core.client.ClientSessionFactory;
import org.hornetq.api.core.client.HornetQClient;
+import org.hornetq.api.core.client.MessageHandler;
import org.hornetq.api.core.client.SessionFailureListener;
import org.hornetq.core.client.impl.ClientSessionFactoryInternal;
import org.hornetq.core.client.impl.ClientSessionInternal;
@@ -87,6 +88,92 @@
}
}
+ //https://jira.jboss.org/browse/HORNETQ-522
+ public void testNonTransactedWithZeroConsumerWindowSize() throws Exception
+ {
+ ClientSessionFactoryInternal sf = getSessionFactory();
+
+ sf.setBlockOnNonDurableSend(true);
+ sf.setBlockOnDurableSend(true);
+
+ ClientSession session = sf.createSession(true, true);
+
+ session.createQueue(FailoverTestBase.ADDRESS, FailoverTestBase.ADDRESS, null,
true);
+
+ final CountDownLatch latch = new CountDownLatch(1);
+
+ class MyListener extends BaseListener
+ {
+ public void connectionFailed(final HornetQException me)
+ {
+ latch.countDown();
+ }
+ }
+
+ session.addFailureListener(new MyListener());
+
+ ClientProducer producer = session.createProducer(FailoverTestBase.ADDRESS);
+
+ final int numMessages = 100;
+
+ for (int i = 0; i < numMessages; i++)
+ {
+ ClientMessage message = session.createMessage(true);
+
+ setBody(i, message);
+
+ message.putIntProperty("counter", i);
+
+ producer.send(message);
+ }
+
+ int winSize = 0;
+ ClientConsumer consumer = session.createConsumer(FailoverTestBase.ADDRESS, null,
winSize, 100, false);
+
+ final List<ClientMessage> received = new ArrayList<ClientMessage>();
+
+ consumer.setMessageHandler(new MessageHandler() {
+
+ public void onMessage(ClientMessage message)
+ {
+ received.add(message);
+ try
+ {
+ Thread.sleep(20);
+ }
+ catch (InterruptedException e)
+ {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
+ }
+
+ });
+
+ session.start();
+
+ fail(session, latch);
+
+ int retry = 0;
+ while (received.size() != numMessages)
+ {
+ Thread.sleep(1000);
+ retry++;
+ if (retry > 5)
+ {
+ break;
+ }
+ }
+
+ session.close();
+
+ Assert.assertTrue(retry <= 5);
+
+ Assert.assertEquals(0, sf.numSessions());
+
+ Assert.assertEquals(0, sf.numConnections());
+ }
+
public void testNonTransacted() throws Exception
{
ClientSessionFactoryInternal sf = getSessionFactory();