[jboss-cvs] JBoss Messaging SVN: r6206 - trunk/tests/src/org/jboss/messaging/tests/integration/client.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Fri Mar 27 18:56:39 EDT 2009


Author: clebert.suconic at jboss.com
Date: 2009-03-27 18:56:39 -0400 (Fri, 27 Mar 2009)
New Revision: 6206

Added:
   trunk/tests/src/org/jboss/messaging/tests/integration/client/ConsumerWindowSizeTest.java
Removed:
   trunk/tests/src/org/jboss/messaging/tests/integration/client/ClientConsumerWindowSizeTest.java
Log:
Renaming test

Deleted: trunk/tests/src/org/jboss/messaging/tests/integration/client/ClientConsumerWindowSizeTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/client/ClientConsumerWindowSizeTest.java	2009-03-27 22:39:36 UTC (rev 6205)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/client/ClientConsumerWindowSizeTest.java	2009-03-27 22:56:39 UTC (rev 6206)
@@ -1,856 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-package org.jboss.messaging.tests.integration.client;
-
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-
-import org.jboss.messaging.core.client.ClientConsumer;
-import org.jboss.messaging.core.client.ClientMessage;
-import org.jboss.messaging.core.client.ClientProducer;
-import org.jboss.messaging.core.client.ClientSession;
-import org.jboss.messaging.core.client.ClientSessionFactory;
-import org.jboss.messaging.core.client.MessageHandler;
-import org.jboss.messaging.core.client.impl.ClientConsumerInternal;
-import org.jboss.messaging.core.server.MessagingService;
-import org.jboss.messaging.tests.util.ServiceTestBase;
-import org.jboss.messaging.utils.SimpleString;
-
-/**
- * @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
- * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
- */
-public class ClientConsumerWindowSizeTest extends ServiceTestBase
-{
-   private final SimpleString addressA = new SimpleString("addressA");
-
-   private final SimpleString queueA = new SimpleString("queueA");
-
-   private final int TIMEOUT = 5;
-
-   /*
-   * tests send window size. we do this by having 2 receivers on the q. since we roundrobin the consumer for delivery we
-   * know if consumer 1 has received n messages then consumer 2 must have also have received n messages or at least up
-   * to its window size
-   * */
-   public void testSendWindowSize() throws Exception
-   {
-      MessagingService messagingService = createService(false);
-      ClientSessionFactory cf = createInVMFactory();
-      try
-      {
-         messagingService.start();
-         cf.setBlockOnNonPersistentSend(false);
-         ClientSession sendSession = cf.createSession(false, true, true);
-         ClientSession receiveSession = cf.createSession(false, true, true);
-         sendSession.createQueue(addressA, queueA, false);
-         ClientConsumer receivingConsumer = receiveSession.createConsumer(queueA);
-         ClientMessage cm = sendSession.createClientMessage(false);
-         cm.setDestination(addressA);
-         int encodeSize = cm.getEncodeSize();
-         int numMessage = 100;
-         cf.setConsumerWindowSize(numMessage * encodeSize);
-         ClientSession session = cf.createSession(false, true, true);
-         ClientProducer cp = sendSession.createProducer(addressA);
-         ClientConsumer cc = session.createConsumer(queueA);
-         session.start();
-         receiveSession.start();
-         for (int i = 0; i < numMessage * 4; i++)
-         {
-            cp.send(sendSession.createClientMessage(false));
-         }
-
-         for (int i = 0; i < numMessage * 2; i++)
-         {
-            ClientMessage m = receivingConsumer.receive(5000);
-            assertNotNull(m);
-            m.acknowledge();
-         }
-         receiveSession.close();
-         
-         for (int i = 0; i < numMessage * 2; i++)
-         {
-            ClientMessage m = cc.receive(5000);
-            assertNotNull(m);
-            m.acknowledge();
-         }
-         
-
-         session.close();
-         sendSession.close();
-         
-
-         assertEquals(0, getMessageCount(messagingService, queueA.toString()));
-
-      }
-      finally
-      {
-         if (messagingService.isStarted())
-         {
-            messagingService.stop();
-         }
-      }
-   }
-
-   public void testSlowConsumerBufferingOne() throws Exception
-   {
-      MessagingService service = createService(false);
-
-      ClientSession sessionB = null;
-      ClientSession session = null;
-
-      try
-      {
-         final int numberOfMessages = 100;
-
-         service.start();
-
-         ClientSessionFactory sf = createInVMFactory();
-         sf.setConsumerWindowSize(1);
-
-         session = sf.createSession(false, true, true);
-
-         SimpleString ADDRESS = addressA;
-
-         session.createQueue(ADDRESS, ADDRESS, true);
-
-         sessionB = sf.createSession(false, true, true);
-         sessionB.start();
-
-         session.start();
-
-         ClientConsumer consNeverUsed = sessionB.createConsumer(ADDRESS);
-
-         ClientConsumer cons1 = session.createConsumer(ADDRESS);
-
-         ClientProducer prod = session.createProducer(ADDRESS);
-
-         for (int i = 0; i < numberOfMessages; i++)
-         {
-            prod.send(createTextMessage(session, "Msg" + i));
-         }
-
-         for (int i = 0; i < numberOfMessages - 1; i++)
-         {
-            ClientMessage msg = cons1.receive(1000);
-            assertNotNull("expected message at i = " + i, msg);
-            msg.acknowledge();
-         }
-
-         ClientMessage msg = consNeverUsed.receive(500);
-         assertNotNull(msg);
-         msg.acknowledge();
-
-         session.close();
-         session = null;
-
-         sessionB.close();
-         sessionB = null;
-
-         assertEquals(0, getMessageCount(service, ADDRESS.toString()));
-
-      }
-      finally
-      {
-         try
-         {
-            if (session != null)
-               session.close();
-            if (sessionB != null)
-               sessionB.close();
-         }
-         catch (Exception ignored)
-         {
-         }
-
-         if (service.isStarted())
-         {
-            service.stop();
-         }
-      }
-   }
-
-   public void testSlowConsumerNoBuffer() throws Exception
-   {
-      internalTestSlowConsumerNoBuffer(false);
-   }
-
-   public void testSlowConsumerNoBufferLargeMessages() throws Exception
-   {
-      internalTestSlowConsumerNoBuffer(true);
-   }
-
-   private void internalTestSlowConsumerNoBuffer(boolean largeMessages) throws Exception
-   {
-      MessagingService service = createService(false);
-
-      ClientSession sessionB = null;
-      ClientSession session = null;
-
-      try
-      {
-         final int numberOfMessages = 100;
-
-         service.start();
-
-         ClientSessionFactory sf = createInVMFactory();
-         sf.setConsumerWindowSize(0);
-
-         if (largeMessages)
-         {
-            sf.setMinLargeMessageSize(100);
-         }
-
-         session = sf.createSession(false, true, true);
-
-         SimpleString ADDRESS = addressA;
-
-         session.createQueue(ADDRESS, ADDRESS, true);
-
-         sessionB = sf.createSession(false, true, true);
-         sessionB.start();
-
-         session.start();
-
-         ClientConsumerInternal consNeverUsed = (ClientConsumerInternal)sessionB.createConsumer(ADDRESS);
-
-         ClientConsumer cons1 = session.createConsumer(ADDRESS);
-
-         ClientProducer prod = session.createProducer(ADDRESS);
-
-         for (int i = 0; i < numberOfMessages; i++)
-         {
-            ClientMessage msg = createTextMessage(session, "Msg" + i);
-
-            if (largeMessages)
-            {
-               msg.getBody().writeBytes(new byte[600]);
-            }
-
-            prod.send(msg);
-         }
-
-         for (int i = 0; i < numberOfMessages; i++)
-         {
-            ClientMessage msg = cons1.receive(1000);
-            assertNotNull("expected message at i = " + i, msg);
-            assertEquals("Msg" + i, getTextMessage(msg));
-            msg.acknowledge();
-         }
-
-         assertEquals(0, consNeverUsed.getBufferSize());
-
-         session.close();
-         session = null;
-
-         sessionB.close();
-         sessionB = null;
-
-         assertEquals(0, getMessageCount(service, ADDRESS.toString()));
-
-      }
-      finally
-      {
-         try
-         {
-            if (session != null)
-               session.close();
-            if (sessionB != null)
-               sessionB.close();
-         }
-         catch (Exception ignored)
-         {
-         }
-
-         if (service.isStarted())
-         {
-            service.stop();
-         }
-      }
-   }
-
-   public void testSlowConsumerNoBuffer2() throws Exception
-   {
-      internalTestSlowConsumerNoBuffer2(false);
-   }
-
-   public void testSlowConsumerNoBuffer2LargeMessages() throws Exception
-   {
-      internalTestSlowConsumerNoBuffer2(true);
-   }
-
-   private void internalTestSlowConsumerNoBuffer2(final boolean largeMessages) throws Exception
-   {
-      MessagingService service = createService(false);
-
-      ClientSession session1 = null;
-      ClientSession session2 = null;
-
-      try
-      {
-         final int numberOfMessages = 100;
-
-         service.start();
-
-         ClientSessionFactory sf = createInVMFactory();
-
-         sf.setConsumerWindowSize(0);
-
-         if (largeMessages)
-         {
-            sf.setMinLargeMessageSize(100);
-         }
-
-         session1 = sf.createSession(false, true, true);
-
-         session2 = sf.createSession(false, true, true);
-
-         session1.start();
-
-         session2.start();
-
-         SimpleString ADDRESS = new SimpleString("some-queue");
-
-         session1.createQueue(ADDRESS, ADDRESS, true);
-
-         ClientConsumerInternal cons1 = (ClientConsumerInternal)session1.createConsumer(ADDRESS);
-
-         // Note we make sure we send the messages *before* cons2 is created
-
-         ClientProducer prod = session1.createProducer(ADDRESS);
-
-         for (int i = 0; i < numberOfMessages; i++)
-         {
-            ClientMessage msg = createTextMessage(session1, "Msg" + i);
-            if (largeMessages)
-            {
-               msg.getBody().writeBytes(new byte[600]);
-            }
-            prod.send(msg);
-         }
-
-         ClientConsumerInternal cons2 = (ClientConsumerInternal)session2.createConsumer(ADDRESS);
-
-         for (int i = 0; i < numberOfMessages / 2; i++)
-         {
-            ClientMessage msg = cons1.receive(1000);
-            assertNotNull("expected message at i = " + i, msg);
-
-            String str = getTextMessage(msg);
-            assertEquals("Msg" + i, str);
-
-            msg.acknowledge();
-
-            assertEquals("A slow consumer shouldn't buffer anything on the client side!", 0, cons1.getBufferSize());
-         }
-
-         for (int i = numberOfMessages / 2; i < numberOfMessages; i++)
-         {
-            ClientMessage msg = cons2.receive(1000);
-
-            assertNotNull("expected message at i = " + i, msg);
-
-            assertEquals("Msg" + i, msg.getBody().readString());
-
-            msg.acknowledge();
-
-            assertEquals("A slow consumer shouldn't buffer anything on the client side!", 0, cons2.getBufferSize());
-         }
-
-         session1.commit(); // just to make sure everything is flushed and no pending packets on the sending buffer, or
-         // the getMessageCount would fail
-         session2.commit();
-
-         assertEquals(0, getMessageCount(service, ADDRESS.toString()));
-
-         // This should also work the other way around
-
-         cons1.close();
-
-         cons2.close();
-
-         cons1 = (ClientConsumerInternal)session1.createConsumer(ADDRESS);
-
-         // Note we make sure we send the messages *before* cons2 is created
-
-         for (int i = 0; i < numberOfMessages; i++)
-         {
-            ClientMessage msg = createTextMessage(session1, "Msg" + i);
-            if (largeMessages)
-            {
-               msg.getBody().writeBytes(new byte[600]);
-            }
-            prod.send(msg);
-         }
-
-         cons2 = (ClientConsumerInternal)session2.createConsumer(ADDRESS);
-
-         // Now we receive on cons2 first
-
-         for (int i = 0; i < numberOfMessages / 2; i++)
-         {
-            ClientMessage msg = cons2.receive(1000);
-            assertNotNull("expected message at i = " + i, msg);
-
-            assertEquals("Msg" + i, msg.getBody().readString());
-
-            msg.acknowledge();
-
-            assertEquals("A slow consumer shouldn't buffer anything on the client side!", 0, cons2.getBufferSize());
-
-         }
-
-         for (int i = numberOfMessages / 2; i < numberOfMessages; i++)
-         {
-            ClientMessage msg = cons1.receive(1000);
-
-            assertNotNull("expected message at i = " + i, msg);
-
-            assertEquals("Msg" + i, msg.getBody().readString());
-
-            msg.acknowledge();
-
-            assertEquals("A slow consumer shouldn't buffer anything on the client side!", 0, cons1.getBufferSize());
-         }
-
-         session1.commit(); // just to make sure everything is flushed and no pending packets on the sending buffer, or
-         // the getMessageCount would fail
-         session2.commit();
-
-         assertEquals(0, getMessageCount(service, ADDRESS.toString()));
-
-      }
-      finally
-      {
-         try
-         {
-            if (session1 != null)
-               session1.close();
-            if (session2 != null)
-               session2.close();
-         }
-         catch (Exception ignored)
-         {
-         }
-
-         if (service.isStarted())
-         {
-            service.stop();
-         }
-      }
-   }
-
-   public void testSlowConsumerOnMessageHandlerNoBuffers() throws Exception
-   {
-      MessagingService service = createService(false);
-
-      ClientSession sessionB = null;
-      ClientSession session = null;
-
-      try
-      {
-         final int numberOfMessages = 100;
-
-         service.start();
-
-         ClientSessionFactory sf = createInVMFactory();
-         sf.setConsumerWindowSize(0);
-
-         session = sf.createSession(false, true, true);
-
-         SimpleString ADDRESS = new SimpleString("some-queue");
-
-         session.createQueue(ADDRESS, ADDRESS, true);
-
-         sessionB = sf.createSession(false, true, true);
-         sessionB.start();
-
-         session.start();
-
-         ClientConsumerInternal consReceiveOneAndHold = (ClientConsumerInternal)sessionB.createConsumer(ADDRESS);
-
-         final CountDownLatch latchReceived = new CountDownLatch(2);
-
-         final CountDownLatch latchDone = new CountDownLatch(1);
-
-         // It should receive two messages and then give up
-         class LocalHandler implements MessageHandler
-         {
-            boolean failed = false;
-
-            int count = 0;
-
-            /* (non-Javadoc)
-             * @see org.jboss.messaging.core.client.MessageHandler#onMessage(org.jboss.messaging.core.client.ClientMessage)
-             */
-            public synchronized void onMessage(ClientMessage message)
-            {
-               try
-               {
-                  String str = getTextMessage(message);
-
-                  failed = failed || !str.equals("Msg" + count);
-
-                  message.acknowledge();
-                  latchReceived.countDown();
-
-                  if (count++ == 1)
-                  {
-                     // it will hold here for a while
-                     if (!latchDone.await(TIMEOUT, TimeUnit.SECONDS)) // a timed wait, so if the test fails, one less
-                                                                      // thread around
-                     {
-                        new Exception("ClientConsuemrWindowSizeTest Handler couldn't receive signal in less than 5 seconds").printStackTrace(); // hudson
-                                                                                                                                                // or
-                                                                                                                                                // junit
-                                                                                                                                                // report
-                        failed = true;
-                     }
-                  }
-               }
-               catch (Exception e)
-               {
-                  e.printStackTrace(); // Hudson / JUnit report
-                  failed = true;
-               }
-            }
-         }
-
-         LocalHandler handler = new LocalHandler();
-
-         ClientConsumer cons1 = session.createConsumer(ADDRESS);
-
-         ClientProducer prod = session.createProducer(ADDRESS);
-
-         for (int i = 0; i < numberOfMessages; i++)
-         {
-            prod.send(createTextMessage(session, "Msg" + i));
-         }
-
-         consReceiveOneAndHold.setMessageHandler(handler);
-
-         assertTrue(latchReceived.await(TIMEOUT, TimeUnit.SECONDS));
-
-         assertEquals(0, consReceiveOneAndHold.getBufferSize());
-
-         for (int i = 2; i < numberOfMessages; i++)
-         {
-            ClientMessage msg = cons1.receive(1000);
-            assertNotNull("expected message at i = " + i, msg);
-            assertEquals("Msg" + i, getTextMessage(msg));
-            msg.acknowledge();
-         }
-
-         assertEquals(0, consReceiveOneAndHold.getBufferSize());
-
-         latchDone.countDown();
-
-         session.close();
-         session = null;
-
-         sessionB.close();
-         sessionB = null;
-
-         assertEquals(0, getMessageCount(service, ADDRESS.toString()));
-
-         assertFalse("MessageHandler received a failure", handler.failed);
-
-      }
-      finally
-      {
-         try
-         {
-            if (session != null)
-               session.close();
-            if (sessionB != null)
-               sessionB.close();
-         }
-         catch (Exception ignored)
-         {
-         }
-
-         if (service.isStarted())
-         {
-            service.stop();
-         }
-      }
-   }
-
-   public void testSlowConsumerOnMessageHandlerBufferOne() throws Exception
-   {
-      internalTestSlowConsumerOnMessageHandlerBufferOne(false);
-   }
-
-   public void testSlowConsumerOnMessageHandlerBufferOneLargeMessages() throws Exception
-   {
-      internalTestSlowConsumerOnMessageHandlerBufferOne(true);
-   }
-   
-   
-   private void internalTestSlowConsumerOnMessageHandlerBufferOne(boolean largeMessage) throws Exception
-   {
-      MessagingService service = createService(false);
-
-      ClientSession sessionB = null;
-      ClientSession session = null;
-
-      try
-      {
-         final int numberOfMessages = 100;
-
-         service.start();
-
-         ClientSessionFactory sf = createInVMFactory();
-         sf.setConsumerWindowSize(1);
-
-         if (largeMessage)
-         {
-            sf.setMinLargeMessageSize(100);
-         }
-
-         session = sf.createSession(false, true, true);
-
-         SimpleString ADDRESS = new SimpleString("some-queue");
-
-         session.createQueue(ADDRESS, ADDRESS, true);
-
-         sessionB = sf.createSession(false, true, true);
-         sessionB.start();
-
-         session.start();
-
-         ClientConsumerInternal consReceiveOneAndHold = (ClientConsumerInternal)sessionB.createConsumer(ADDRESS);
-
-         final CountDownLatch latchReceived = new CountDownLatch(2);
-         final CountDownLatch latchReceivedBuffered = new CountDownLatch(3);
-
-         final CountDownLatch latchDone = new CountDownLatch(1);
-
-         // It should receive two messages and then give up
-         class LocalHandler implements MessageHandler
-         {
-            boolean failed = false;
-
-            int count = 0;
-
-            /* (non-Javadoc)
-             * @see org.jboss.messaging.core.client.MessageHandler#onMessage(org.jboss.messaging.core.client.ClientMessage)
-             */
-            public synchronized void onMessage(ClientMessage message)
-            {
-               try
-               {
-                  String str = getTextMessage(message);
-
-                  System.out.println("Received " + str);
-
-                  failed = failed || !str.equals("Msg" + count);
-
-                  message.acknowledge();
-                  latchReceived.countDown();
-                  latchReceivedBuffered.countDown();
-
-                  if (count++ == 1)
-                  {
-                     // it will hold here for a while
-                     if (!latchDone.await(TIMEOUT, TimeUnit.SECONDS))
-                     {
-                        new Exception("ClientConsuemrWindowSizeTest Handler couldn't receive signal in less than 5 seconds").printStackTrace(); // hudson
-                                                                                                                                                // or
-                                                                                                                                                // junit
-                                                                                                                                                // report
-                        failed = true;
-                     }
-                  }
-               }
-               catch (Exception e)
-               {
-                  e.printStackTrace(); // Hudson / JUnit report
-                  failed = true;
-               }
-            }
-         }
-
-         LocalHandler handler = new LocalHandler();
-
-         ClientProducer prod = session.createProducer(ADDRESS);
-
-         for (int i = 0; i < numberOfMessages; i++)
-         {
-            ClientMessage msg = createTextMessage(session, "Msg" + i);
-            if (largeMessage)
-            {
-               msg.getBody().writeBytes(new byte[600]);
-            }
-            prod.send(msg);
-         }
-
-         consReceiveOneAndHold.setMessageHandler(handler);
-
-         assertTrue(latchReceived.await(TIMEOUT, TimeUnit.SECONDS));
-
-         ClientConsumer cons1 = session.createConsumer(ADDRESS);
-
-         for (int i = 3; i < numberOfMessages; i++)
-         {
-            ClientMessage msg = cons1.receive(1000);
-            assertNotNull("expected message at i = " + i, msg);
-            assertEquals("Msg" + i, getTextMessage(msg));
-            msg.acknowledge();
-         }
-
-         latchDone.countDown();
-         
-         assertTrue(latchReceivedBuffered.await(TIMEOUT, TimeUnit.SECONDS));
-
-         session.close();
-         session = null;
-
-         sessionB.close();
-         sessionB = null;
-
-         assertEquals(0, getMessageCount(service, ADDRESS.toString()));
-
-         assertFalse("MessageHandler received a failure", handler.failed);
-
-      }
-      finally
-      {
-         try
-         {
-            if (session != null)
-               session.close();
-            if (sessionB != null)
-               sessionB.close();
-         }
-         catch (Exception ignored)
-         {
-         }
-
-         if (service.isStarted())
-         {
-            service.stop();
-         }
-      }
-   }
-   
-   public void testNoWindowRoundRobin() throws Exception
-   {
-      testNoWindowRoundRobin(false);
-   }
-   
-   
-   public void testNoWindowRoundRobinLargeMessage() throws Exception
-   {
-      testNoWindowRoundRobin(true);
-   }
-   
-   private void testNoWindowRoundRobin(final boolean largeMessages) throws Exception
-   {
-      
-      MessagingService service = createService(false);
-
-      ClientSession sessionA = null;
-      ClientSession sessionB = null;
-
-      try
-      {
-         final int numberOfMessages = 100;
-
-         service.start();
-
-         ClientSessionFactory sf = createInVMFactory();
-         sf.setConsumerWindowSize(-1);
-         
-         if (largeMessages)
-         {
-            sf.setMinLargeMessageSize(100);
-         }
-
-         sessionA = sf.createSession(false, true, true);
-
-         SimpleString ADDRESS = new SimpleString("some-queue");
-
-         sessionA.createQueue(ADDRESS, ADDRESS, true);
-
-         sessionB = sf.createSession(false, true, true);
-
-         sessionA.start();
-         sessionB.start();
-
-         ClientConsumerInternal consA = (ClientConsumerInternal)sessionA.createConsumer(ADDRESS);
-
-         ClientConsumerInternal consB = (ClientConsumerInternal)sessionB.createConsumer(ADDRESS);
-
-         ClientProducer prod = sessionA.createProducer(ADDRESS);
-
-         for (int i = 0; i < numberOfMessages; i++)
-         {
-            ClientMessage msg = createTextMessage(sessionA, "Msg" + i);
-            if (largeMessages)
-            {
-               msg.getBody().writeBytes(new byte[600]);
-            }
-            prod.send(msg);
-         }
-         
-         
-         long timeout = System.currentTimeMillis() + TIMEOUT * 1000;
-         
-         boolean foundA = false;
-         boolean foundB = false;
-         
-         do
-         {
-            foundA = consA.getBufferSize() == numberOfMessages / 2;
-            foundB = consB.getBufferSize() == numberOfMessages / 2;
-            
-            Thread.sleep(10);
-         } while ((!foundA || !foundB) && System.currentTimeMillis() < timeout);
-         
-         
-         assertTrue("ConsumerA didn't receive the expected number of messages on buffer (consA=" + consA.getBufferSize() + ", consB=" + consB.getBufferSize() + ") foundA = " + foundA + " foundB = " + foundB, foundA);
-         assertTrue("ConsumerB didn't receive the expected number of messages on buffer (consA=" + consA.getBufferSize() + ", consB=" + consB.getBufferSize() + ") foundA = " + foundA + " foundB = " + foundB, foundB);
-         
-
-      }
-      finally
-      {
-         try
-         {
-            if (sessionA != null)
-               sessionA.close();
-            if (sessionB != null)
-               sessionB.close();
-         }
-         catch (Exception ignored)
-         {
-         }
-
-         if (service.isStarted())
-         {
-            service.stop();
-         }
-      }
-   }
-
-}

Copied: trunk/tests/src/org/jboss/messaging/tests/integration/client/ConsumerWindowSizeTest.java (from rev 6205, trunk/tests/src/org/jboss/messaging/tests/integration/client/ClientConsumerWindowSizeTest.java)
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/client/ConsumerWindowSizeTest.java	                        (rev 0)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/client/ConsumerWindowSizeTest.java	2009-03-27 22:56:39 UTC (rev 6206)
@@ -0,0 +1,888 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.messaging.tests.integration.client;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.jboss.messaging.core.client.ClientConsumer;
+import org.jboss.messaging.core.client.ClientMessage;
+import org.jboss.messaging.core.client.ClientProducer;
+import org.jboss.messaging.core.client.ClientSession;
+import org.jboss.messaging.core.client.ClientSessionFactory;
+import org.jboss.messaging.core.client.MessageHandler;
+import org.jboss.messaging.core.client.impl.ClientConsumerInternal;
+import org.jboss.messaging.core.server.MessagingService;
+import org.jboss.messaging.tests.util.ServiceTestBase;
+import org.jboss.messaging.utils.SimpleString;
+
+/**
+ * @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
+ * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ */
+public class ConsumerWindowSizeTest extends ServiceTestBase
+{
+   private final SimpleString addressA = new SimpleString("addressA");
+
+   private final SimpleString queueA = new SimpleString("queueA");
+
+   private final int TIMEOUT = 5;
+
+   /*
+   * tests send window size. we do this by having 2 receivers on the q. since we roundrobin the consumer for delivery we
+   * know if consumer 1 has received n messages then consumer 2 must have also have received n messages or at least up
+   * to its window size
+   * */
+   public void testSendWindowSize() throws Exception
+   {
+      MessagingService messagingService = createService(false);
+      ClientSessionFactory cf = createInVMFactory();
+      try
+      {
+         messagingService.start();
+         cf.setBlockOnNonPersistentSend(false);
+         ClientSession sendSession = cf.createSession(false, true, true);
+         ClientSession receiveSession = cf.createSession(false, true, true);
+         sendSession.createQueue(addressA, queueA, false);
+         ClientConsumer receivingConsumer = receiveSession.createConsumer(queueA);
+         ClientMessage cm = sendSession.createClientMessage(false);
+         cm.setDestination(addressA);
+         int encodeSize = cm.getEncodeSize();
+         int numMessage = 100;
+         cf.setConsumerWindowSize(numMessage * encodeSize);
+         ClientSession session = cf.createSession(false, true, true);
+         ClientProducer cp = sendSession.createProducer(addressA);
+         ClientConsumer cc = session.createConsumer(queueA);
+         session.start();
+         receiveSession.start();
+         for (int i = 0; i < numMessage * 4; i++)
+         {
+            cp.send(sendSession.createClientMessage(false));
+         }
+
+         for (int i = 0; i < numMessage * 2; i++)
+         {
+            ClientMessage m = receivingConsumer.receive(5000);
+            assertNotNull(m);
+            m.acknowledge();
+         }
+         receiveSession.close();
+
+         for (int i = 0; i < numMessage * 2; i++)
+         {
+            ClientMessage m = cc.receive(5000);
+            assertNotNull(m);
+            m.acknowledge();
+         }
+
+         session.close();
+         sendSession.close();
+
+         assertEquals(0, getMessageCount(messagingService, queueA.toString()));
+
+      }
+      finally
+      {
+         if (messagingService.isStarted())
+         {
+            messagingService.stop();
+         }
+      }
+   }
+
+   public void testSlowConsumerBufferingOne() throws Exception
+   {
+      MessagingService service = createService(false);
+
+      ClientSession sessionB = null;
+      ClientSession session = null;
+
+      try
+      {
+         final int numberOfMessages = 100;
+
+         service.start();
+
+         ClientSessionFactory sf = createInVMFactory();
+         sf.setConsumerWindowSize(1);
+
+         session = sf.createSession(false, true, true);
+
+         SimpleString ADDRESS = addressA;
+
+         session.createQueue(ADDRESS, ADDRESS, true);
+
+         sessionB = sf.createSession(false, true, true);
+         sessionB.start();
+
+         session.start();
+
+         ClientConsumer consNeverUsed = sessionB.createConsumer(ADDRESS);
+
+         ClientConsumer cons1 = session.createConsumer(ADDRESS);
+
+         ClientProducer prod = session.createProducer(ADDRESS);
+
+         for (int i = 0; i < numberOfMessages; i++)
+         {
+            prod.send(createTextMessage(session, "Msg" + i));
+         }
+
+         for (int i = 0; i < numberOfMessages - 1; i++)
+         {
+            ClientMessage msg = cons1.receive(1000);
+            assertNotNull("expected message at i = " + i, msg);
+            msg.acknowledge();
+         }
+
+         ClientMessage msg = consNeverUsed.receive(500);
+         assertNotNull(msg);
+         msg.acknowledge();
+
+         session.close();
+         session = null;
+
+         sessionB.close();
+         sessionB = null;
+
+         assertEquals(0, getMessageCount(service, ADDRESS.toString()));
+
+      }
+      finally
+      {
+         try
+         {
+            if (session != null)
+            {
+               session.close();
+            }
+            if (sessionB != null)
+            {
+               sessionB.close();
+            }
+         }
+         catch (Exception ignored)
+         {
+         }
+
+         if (service.isStarted())
+         {
+            service.stop();
+         }
+      }
+   }
+
+   public void testSlowConsumerNoBuffer() throws Exception
+   {
+      internalTestSlowConsumerNoBuffer(false);
+   }
+
+   public void testSlowConsumerNoBufferLargeMessages() throws Exception
+   {
+      internalTestSlowConsumerNoBuffer(true);
+   }
+
+   private void internalTestSlowConsumerNoBuffer(final boolean largeMessages) throws Exception
+   {
+      MessagingService service = createService(false);
+
+      ClientSession sessionB = null;
+      ClientSession session = null;
+
+      try
+      {
+         final int numberOfMessages = 100;
+
+         service.start();
+
+         ClientSessionFactory sf = createInVMFactory();
+         sf.setConsumerWindowSize(0);
+
+         if (largeMessages)
+         {
+            sf.setMinLargeMessageSize(100);
+         }
+
+         session = sf.createSession(false, true, true);
+
+         SimpleString ADDRESS = addressA;
+
+         session.createQueue(ADDRESS, ADDRESS, true);
+
+         sessionB = sf.createSession(false, true, true);
+         sessionB.start();
+
+         session.start();
+
+         ClientConsumerInternal consNeverUsed = (ClientConsumerInternal)sessionB.createConsumer(ADDRESS);
+
+         ClientConsumer cons1 = session.createConsumer(ADDRESS);
+
+         ClientProducer prod = session.createProducer(ADDRESS);
+
+         for (int i = 0; i < numberOfMessages; i++)
+         {
+            ClientMessage msg = createTextMessage(session, "Msg" + i);
+
+            if (largeMessages)
+            {
+               msg.getBody().writeBytes(new byte[600]);
+            }
+
+            prod.send(msg);
+         }
+
+         for (int i = 0; i < numberOfMessages; i++)
+         {
+            ClientMessage msg = cons1.receive(1000);
+            assertNotNull("expected message at i = " + i, msg);
+            assertEquals("Msg" + i, getTextMessage(msg));
+            msg.acknowledge();
+         }
+
+         assertEquals(0, consNeverUsed.getBufferSize());
+
+         session.close();
+         session = null;
+
+         sessionB.close();
+         sessionB = null;
+
+         assertEquals(0, getMessageCount(service, ADDRESS.toString()));
+
+      }
+      finally
+      {
+         try
+         {
+            if (session != null)
+            {
+               session.close();
+            }
+            if (sessionB != null)
+            {
+               sessionB.close();
+            }
+         }
+         catch (Exception ignored)
+         {
+         }
+
+         if (service.isStarted())
+         {
+            service.stop();
+         }
+      }
+   }
+
+   public void testSlowConsumerNoBuffer2() throws Exception
+   {
+      internalTestSlowConsumerNoBuffer2(false);
+   }
+
+   public void testSlowConsumerNoBuffer2LargeMessages() throws Exception
+   {
+      internalTestSlowConsumerNoBuffer2(true);
+   }
+
+   private void internalTestSlowConsumerNoBuffer2(final boolean largeMessages) throws Exception
+   {
+      MessagingService service = createService(false);
+
+      ClientSession session1 = null;
+      ClientSession session2 = null;
+
+      try
+      {
+         final int numberOfMessages = 100;
+
+         service.start();
+
+         ClientSessionFactory sf = createInVMFactory();
+
+         sf.setConsumerWindowSize(0);
+
+         if (largeMessages)
+         {
+            sf.setMinLargeMessageSize(100);
+         }
+
+         session1 = sf.createSession(false, true, true);
+
+         session2 = sf.createSession(false, true, true);
+
+         session1.start();
+
+         session2.start();
+
+         SimpleString ADDRESS = new SimpleString("some-queue");
+
+         session1.createQueue(ADDRESS, ADDRESS, true);
+
+         ClientConsumerInternal cons1 = (ClientConsumerInternal)session1.createConsumer(ADDRESS);
+
+         // Note we make sure we send the messages *before* cons2 is created
+
+         ClientProducer prod = session1.createProducer(ADDRESS);
+
+         for (int i = 0; i < numberOfMessages; i++)
+         {
+            ClientMessage msg = createTextMessage(session1, "Msg" + i);
+            if (largeMessages)
+            {
+               msg.getBody().writeBytes(new byte[600]);
+            }
+            prod.send(msg);
+         }
+
+         ClientConsumerInternal cons2 = (ClientConsumerInternal)session2.createConsumer(ADDRESS);
+
+         for (int i = 0; i < numberOfMessages / 2; i++)
+         {
+            ClientMessage msg = cons1.receive(1000);
+            assertNotNull("expected message at i = " + i, msg);
+
+            String str = getTextMessage(msg);
+            assertEquals("Msg" + i, str);
+
+            msg.acknowledge();
+
+            assertEquals("A slow consumer shouldn't buffer anything on the client side!", 0, cons1.getBufferSize());
+         }
+
+         for (int i = numberOfMessages / 2; i < numberOfMessages; i++)
+         {
+            ClientMessage msg = cons2.receive(1000);
+
+            assertNotNull("expected message at i = " + i, msg);
+
+            assertEquals("Msg" + i, msg.getBody().readString());
+
+            msg.acknowledge();
+
+            assertEquals("A slow consumer shouldn't buffer anything on the client side!", 0, cons2.getBufferSize());
+         }
+
+         session1.commit(); // just to make sure everything is flushed and no pending packets on the sending buffer, or
+         // the getMessageCount would fail
+         session2.commit();
+
+         assertEquals(0, getMessageCount(service, ADDRESS.toString()));
+
+         // This should also work the other way around
+
+         cons1.close();
+
+         cons2.close();
+
+         cons1 = (ClientConsumerInternal)session1.createConsumer(ADDRESS);
+
+         // Note we make sure we send the messages *before* cons2 is created
+
+         for (int i = 0; i < numberOfMessages; i++)
+         {
+            ClientMessage msg = createTextMessage(session1, "Msg" + i);
+            if (largeMessages)
+            {
+               msg.getBody().writeBytes(new byte[600]);
+            }
+            prod.send(msg);
+         }
+
+         cons2 = (ClientConsumerInternal)session2.createConsumer(ADDRESS);
+
+         // Now we receive on cons2 first
+
+         for (int i = 0; i < numberOfMessages / 2; i++)
+         {
+            ClientMessage msg = cons2.receive(1000);
+            assertNotNull("expected message at i = " + i, msg);
+
+            assertEquals("Msg" + i, msg.getBody().readString());
+
+            msg.acknowledge();
+
+            assertEquals("A slow consumer shouldn't buffer anything on the client side!", 0, cons2.getBufferSize());
+
+         }
+
+         for (int i = numberOfMessages / 2; i < numberOfMessages; i++)
+         {
+            ClientMessage msg = cons1.receive(1000);
+
+            assertNotNull("expected message at i = " + i, msg);
+
+            assertEquals("Msg" + i, msg.getBody().readString());
+
+            msg.acknowledge();
+
+            assertEquals("A slow consumer shouldn't buffer anything on the client side!", 0, cons1.getBufferSize());
+         }
+
+         session1.commit(); // just to make sure everything is flushed and no pending packets on the sending buffer, or
+         // the getMessageCount would fail
+         session2.commit();
+
+         assertEquals(0, getMessageCount(service, ADDRESS.toString()));
+
+      }
+      finally
+      {
+         try
+         {
+            if (session1 != null)
+            {
+               session1.close();
+            }
+            if (session2 != null)
+            {
+               session2.close();
+            }
+         }
+         catch (Exception ignored)
+         {
+         }
+
+         if (service.isStarted())
+         {
+            service.stop();
+         }
+      }
+   }
+
+   public void testSlowConsumerOnMessageHandlerNoBuffers() throws Exception
+   {
+      MessagingService service = createService(false);
+
+      ClientSession sessionB = null;
+      ClientSession session = null;
+
+      try
+      {
+         final int numberOfMessages = 100;
+
+         service.start();
+
+         ClientSessionFactory sf = createInVMFactory();
+         sf.setConsumerWindowSize(0);
+
+         session = sf.createSession(false, true, true);
+
+         SimpleString ADDRESS = new SimpleString("some-queue");
+
+         session.createQueue(ADDRESS, ADDRESS, true);
+
+         sessionB = sf.createSession(false, true, true);
+         sessionB.start();
+
+         session.start();
+
+         ClientConsumerInternal consReceiveOneAndHold = (ClientConsumerInternal)sessionB.createConsumer(ADDRESS);
+
+         final CountDownLatch latchReceived = new CountDownLatch(2);
+
+         final CountDownLatch latchDone = new CountDownLatch(1);
+
+         // It should receive two messages and then give up
+         class LocalHandler implements MessageHandler
+         {
+            boolean failed = false;
+
+            int count = 0;
+
+            /* (non-Javadoc)
+             * @see org.jboss.messaging.core.client.MessageHandler#onMessage(org.jboss.messaging.core.client.ClientMessage)
+             */
+            public synchronized void onMessage(final ClientMessage message)
+            {
+               try
+               {
+                  String str = getTextMessage(message);
+
+                  failed = failed || !str.equals("Msg" + count);
+
+                  message.acknowledge();
+                  latchReceived.countDown();
+
+                  if (count++ == 1)
+                  {
+                     // it will hold here for a while
+                     if (!latchDone.await(TIMEOUT, TimeUnit.SECONDS)) // a timed wait, so if the test fails, one less
+                     // thread around
+                     {
+                        new Exception("ClientConsuemrWindowSizeTest Handler couldn't receive signal in less than 5 seconds").printStackTrace(); // hudson
+                        // or
+                        // junit
+                        // report
+                        failed = true;
+                     }
+                  }
+               }
+               catch (Exception e)
+               {
+                  e.printStackTrace(); // Hudson / JUnit report
+                  failed = true;
+               }
+            }
+         }
+
+         LocalHandler handler = new LocalHandler();
+
+         ClientConsumer cons1 = session.createConsumer(ADDRESS);
+
+         ClientProducer prod = session.createProducer(ADDRESS);
+
+         for (int i = 0; i < numberOfMessages; i++)
+         {
+            prod.send(createTextMessage(session, "Msg" + i));
+         }
+
+         consReceiveOneAndHold.setMessageHandler(handler);
+
+         assertTrue(latchReceived.await(TIMEOUT, TimeUnit.SECONDS));
+
+         assertEquals(0, consReceiveOneAndHold.getBufferSize());
+
+         for (int i = 2; i < numberOfMessages; i++)
+         {
+            ClientMessage msg = cons1.receive(1000);
+            assertNotNull("expected message at i = " + i, msg);
+            assertEquals("Msg" + i, getTextMessage(msg));
+            msg.acknowledge();
+         }
+
+         assertEquals(0, consReceiveOneAndHold.getBufferSize());
+
+         latchDone.countDown();
+
+         session.close();
+         session = null;
+
+         sessionB.close();
+         sessionB = null;
+
+         assertEquals(0, getMessageCount(service, ADDRESS.toString()));
+
+         assertFalse("MessageHandler received a failure", handler.failed);
+
+      }
+      finally
+      {
+         try
+         {
+            if (session != null)
+            {
+               session.close();
+            }
+            if (sessionB != null)
+            {
+               sessionB.close();
+            }
+         }
+         catch (Exception ignored)
+         {
+         }
+
+         if (service.isStarted())
+         {
+            service.stop();
+         }
+      }
+   }
+
+   public void testSlowConsumerOnMessageHandlerBufferOne() throws Exception
+   {
+      internalTestSlowConsumerOnMessageHandlerBufferOne(false);
+   }
+
+   public void testSlowConsumerOnMessageHandlerBufferOneLargeMessages() throws Exception
+   {
+      internalTestSlowConsumerOnMessageHandlerBufferOne(true);
+   }
+
+   private void internalTestSlowConsumerOnMessageHandlerBufferOne(final boolean largeMessage) throws Exception
+   {
+      MessagingService service = createService(false);
+
+      ClientSession sessionB = null;
+      ClientSession session = null;
+
+      try
+      {
+         final int numberOfMessages = 100;
+
+         service.start();
+
+         ClientSessionFactory sf = createInVMFactory();
+         sf.setConsumerWindowSize(1);
+
+         if (largeMessage)
+         {
+            sf.setMinLargeMessageSize(100);
+         }
+
+         session = sf.createSession(false, true, true);
+
+         SimpleString ADDRESS = new SimpleString("some-queue");
+
+         session.createQueue(ADDRESS, ADDRESS, true);
+
+         sessionB = sf.createSession(false, true, true);
+         sessionB.start();
+
+         session.start();
+
+         ClientConsumerInternal consReceiveOneAndHold = (ClientConsumerInternal)sessionB.createConsumer(ADDRESS);
+
+         final CountDownLatch latchReceived = new CountDownLatch(2);
+         final CountDownLatch latchReceivedBuffered = new CountDownLatch(3);
+
+         final CountDownLatch latchDone = new CountDownLatch(1);
+
+         // It should receive two messages and then give up
+         class LocalHandler implements MessageHandler
+         {
+            boolean failed = false;
+
+            int count = 0;
+
+            /* (non-Javadoc)
+             * @see org.jboss.messaging.core.client.MessageHandler#onMessage(org.jboss.messaging.core.client.ClientMessage)
+             */
+            public synchronized void onMessage(final ClientMessage message)
+            {
+               try
+               {
+                  String str = getTextMessage(message);
+
+                  System.out.println("Received " + str);
+
+                  failed = failed || !str.equals("Msg" + count);
+
+                  message.acknowledge();
+                  latchReceived.countDown();
+                  latchReceivedBuffered.countDown();
+
+                  if (count++ == 1)
+                  {
+                     // it will hold here for a while
+                     if (!latchDone.await(TIMEOUT, TimeUnit.SECONDS))
+                     {
+                        new Exception("ClientConsuemrWindowSizeTest Handler couldn't receive signal in less than 5 seconds").printStackTrace(); // hudson
+                        // or
+                        // junit
+                        // report
+                        failed = true;
+                     }
+                  }
+               }
+               catch (Exception e)
+               {
+                  e.printStackTrace(); // Hudson / JUnit report
+                  failed = true;
+               }
+            }
+         }
+
+         LocalHandler handler = new LocalHandler();
+
+         ClientProducer prod = session.createProducer(ADDRESS);
+
+         for (int i = 0; i < numberOfMessages; i++)
+         {
+            ClientMessage msg = createTextMessage(session, "Msg" + i);
+            if (largeMessage)
+            {
+               msg.getBody().writeBytes(new byte[600]);
+            }
+            prod.send(msg);
+         }
+
+         consReceiveOneAndHold.setMessageHandler(handler);
+
+         assertTrue(latchReceived.await(TIMEOUT, TimeUnit.SECONDS));
+
+         ClientConsumer cons1 = session.createConsumer(ADDRESS);
+
+         for (int i = 3; i < numberOfMessages; i++)
+         {
+            ClientMessage msg = cons1.receive(1000);
+            assertNotNull("expected message at i = " + i, msg);
+            assertEquals("Msg" + i, getTextMessage(msg));
+            msg.acknowledge();
+         }
+
+         latchDone.countDown();
+
+         assertTrue(latchReceivedBuffered.await(TIMEOUT, TimeUnit.SECONDS));
+
+         session.close();
+         session = null;
+
+         sessionB.close();
+         sessionB = null;
+
+         assertEquals(0, getMessageCount(service, ADDRESS.toString()));
+
+         assertFalse("MessageHandler received a failure", handler.failed);
+
+      }
+      finally
+      {
+         try
+         {
+            if (session != null)
+            {
+               session.close();
+            }
+            if (sessionB != null)
+            {
+               sessionB.close();
+            }
+         }
+         catch (Exception ignored)
+         {
+         }
+
+         if (service.isStarted())
+         {
+            service.stop();
+         }
+      }
+   }
+
+   public void testNoWindowRoundRobin() throws Exception
+   {
+      testNoWindowRoundRobin(false);
+   }
+
+   public void testNoWindowRoundRobinLargeMessage() throws Exception
+   {
+      testNoWindowRoundRobin(true);
+   }
+
+   private void testNoWindowRoundRobin(final boolean largeMessages) throws Exception
+   {
+
+      MessagingService service = createService(false);
+
+      ClientSession sessionA = null;
+      ClientSession sessionB = null;
+
+      try
+      {
+         final int numberOfMessages = 100;
+
+         service.start();
+
+         ClientSessionFactory sf = createInVMFactory();
+         sf.setConsumerWindowSize(-1);
+
+         if (largeMessages)
+         {
+            sf.setMinLargeMessageSize(100);
+         }
+
+         sessionA = sf.createSession(false, true, true);
+
+         SimpleString ADDRESS = new SimpleString("some-queue");
+
+         sessionA.createQueue(ADDRESS, ADDRESS, true);
+
+         sessionB = sf.createSession(false, true, true);
+
+         sessionA.start();
+         sessionB.start();
+
+         ClientConsumerInternal consA = (ClientConsumerInternal)sessionA.createConsumer(ADDRESS);
+
+         ClientConsumerInternal consB = (ClientConsumerInternal)sessionB.createConsumer(ADDRESS);
+
+         ClientProducer prod = sessionA.createProducer(ADDRESS);
+
+         for (int i = 0; i < numberOfMessages; i++)
+         {
+            ClientMessage msg = createTextMessage(sessionA, "Msg" + i);
+            if (largeMessages)
+            {
+               msg.getBody().writeBytes(new byte[600]);
+            }
+            prod.send(msg);
+         }
+
+         long timeout = System.currentTimeMillis() + TIMEOUT * 1000;
+
+         boolean foundA = false;
+         boolean foundB = false;
+
+         do
+         {
+            foundA = consA.getBufferSize() == numberOfMessages / 2;
+            foundB = consB.getBufferSize() == numberOfMessages / 2;
+
+            Thread.sleep(10);
+         }
+         while ((!foundA || !foundB) && System.currentTimeMillis() < timeout);
+
+         assertTrue("ConsumerA didn't receive the expected number of messages on buffer (consA=" + consA.getBufferSize() +
+                             ", consB=" +
+                             consB.getBufferSize() +
+                             ") foundA = " +
+                             foundA +
+                             " foundB = " +
+                             foundB,
+                    foundA);
+         assertTrue("ConsumerB didn't receive the expected number of messages on buffer (consA=" + consA.getBufferSize() +
+                             ", consB=" +
+                             consB.getBufferSize() +
+                             ") foundA = " +
+                             foundA +
+                             " foundB = " +
+                             foundB,
+                    foundB);
+
+      }
+      finally
+      {
+         try
+         {
+            if (sessionA != null)
+            {
+               sessionA.close();
+            }
+            if (sessionB != null)
+            {
+               sessionB.close();
+            }
+         }
+         catch (Exception ignored)
+         {
+         }
+
+         if (service.isStarted())
+         {
+            service.stop();
+         }
+      }
+   }
+
+}


Property changes on: trunk/tests/src/org/jboss/messaging/tests/integration/client/ConsumerWindowSizeTest.java
___________________________________________________________________
Name: svn:mergeinfo
   + 




More information about the jboss-cvs-commits mailing list