[hornetq-commits] JBoss hornetq SVN: r9734 - in trunk: tests/src/org/hornetq/tests/integration/cluster/failover and 1 other directory.

do-not-reply at jboss.org do-not-reply at jboss.org
Wed Sep 29 21:06:27 EDT 2010


Author: gaohoward
Date: 2010-09-29 21:06:26 -0400 (Wed, 29 Sep 2010)
New Revision: 9734

Modified:
   trunk/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java
   trunk/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTest.java
Log:
https://jira.jboss.org/browse/HORNETQ-522


Modified: trunk/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java	2010-09-29 08:47:43 UTC (rev 9733)
+++ trunk/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java	2010-09-30 01:06:26 UTC (rev 9734)
@@ -974,6 +974,13 @@
 
                      sendPacketWithoutLock(packet);
                   }
+                  else
+                  {
+                     //https://jira.jboss.org/browse/HORNETQ-522
+                     SessionConsumerFlowCreditMessage packet = new SessionConsumerFlowCreditMessage(entry.getKey(),
+                                                                                                    1);
+                     sendPacketWithoutLock(packet);
+                  }
                }
 
                if ((!autoCommitAcks || !autoCommitSends) && workDone)

Modified: trunk/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTest.java	2010-09-29 08:47:43 UTC (rev 9733)
+++ trunk/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTest.java	2010-09-30 01:06:26 UTC (rev 9734)
@@ -37,6 +37,7 @@
 import org.hornetq.api.core.client.ClientSession;
 import org.hornetq.api.core.client.ClientSessionFactory;
 import org.hornetq.api.core.client.HornetQClient;
+import org.hornetq.api.core.client.MessageHandler;
 import org.hornetq.api.core.client.SessionFailureListener;
 import org.hornetq.core.client.impl.ClientSessionFactoryInternal;
 import org.hornetq.core.client.impl.ClientSessionInternal;
@@ -87,6 +88,92 @@
       }
    }
 
+   //https://jira.jboss.org/browse/HORNETQ-522
+   public void testNonTransactedWithZeroConsumerWindowSize() throws Exception
+   {
+      ClientSessionFactoryInternal sf = getSessionFactory();
+
+      sf.setBlockOnNonDurableSend(true);
+      sf.setBlockOnDurableSend(true);
+
+      ClientSession session = sf.createSession(true, true);
+
+      session.createQueue(FailoverTestBase.ADDRESS, FailoverTestBase.ADDRESS, null, true);
+
+      final CountDownLatch latch = new CountDownLatch(1);
+
+      class MyListener extends BaseListener
+      {
+         public void connectionFailed(final HornetQException me)
+         {
+            latch.countDown();
+         }
+      }
+
+      session.addFailureListener(new MyListener());
+
+      ClientProducer producer = session.createProducer(FailoverTestBase.ADDRESS);
+
+      final int numMessages = 100;
+
+      for (int i = 0; i < numMessages; i++)
+      {
+         ClientMessage message = session.createMessage(true);
+
+         setBody(i, message);
+
+         message.putIntProperty("counter", i);
+
+         producer.send(message);
+      }
+
+      int winSize = 0;
+      ClientConsumer consumer = session.createConsumer(FailoverTestBase.ADDRESS, null, winSize, 100, false);
+      
+      final List<ClientMessage> received = new ArrayList<ClientMessage>();
+      
+      consumer.setMessageHandler(new MessageHandler() {
+
+         public void onMessage(ClientMessage message)
+         {
+            received.add(message);
+            try
+            {
+               Thread.sleep(20);
+            }
+            catch (InterruptedException e)
+            {
+               // TODO Auto-generated catch block
+               e.printStackTrace();
+            }
+         }
+         
+      });
+
+      session.start();
+      
+      fail(session, latch);
+      
+      int retry = 0;
+      while (received.size() != numMessages)
+      {
+         Thread.sleep(1000);
+         retry++;
+         if (retry > 5)
+         {
+            break;
+         }
+      }
+
+      session.close();
+      
+      Assert.assertTrue(retry <= 5);
+
+      Assert.assertEquals(0, sf.numSessions());
+
+      Assert.assertEquals(0, sf.numConnections());
+   }
+
    public void testNonTransacted() throws Exception
    {
       ClientSessionFactoryInternal sf = getSessionFactory();



More information about the hornetq-commits mailing list