[jboss-cvs] JBoss Messaging SVN: r6205 - in trunk: src/main/org/jboss/messaging/core/server/impl and 1 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Fri Mar 27 18:39:37 EDT 2009


Author: clebert.suconic at jboss.com
Date: 2009-03-27 18:39:36 -0400 (Fri, 27 Mar 2009)
New Revision: 6205

Modified:
   trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java
   trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java
   trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java
   trunk/tests/src/org/jboss/messaging/tests/integration/client/ClientConsumerWindowSizeTest.java
   trunk/tests/src/org/jboss/messaging/tests/integration/client/ClientSessionStopStartTest.java
Log:
Adding SlowConsumerTests (both windowSize=1 and windowSize=0) and fixing slowConsumers for largeMessages and regularMessages

Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java	2009-03-27 18:57:23 UTC (rev 6204)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java	2009-03-27 22:39:36 UTC (rev 6205)
@@ -135,6 +135,11 @@
                                       "Cannot call receive(...) - a MessageHandler is set");
       }
 
+      if (clientWindowSize == 0 && buffer.isEmpty())
+      {
+         sendCredits(1);
+      }
+
       receiverThread = Thread.currentThread();
 
       if (timeout == 0)
@@ -152,7 +157,7 @@
          while (true)
          {
             ClientMessageInternal m = null;
-
+            
             synchronized (this)
             {
                while ((stopped || (m = buffer.poll()) == null) &&
@@ -248,6 +253,11 @@
       }
 
       boolean noPreviousHandler = handler == null;
+      
+      if (handler != theHandler && clientWindowSize == 0)
+      {
+         sendCredits(1);
+      }
 
       handler = theHandler;
 
@@ -492,16 +502,16 @@
       sessionExecutor.execute(runner);
    }
 
-   private void flowControl(final int messageBytes, final boolean useExecutor) throws MessagingException
+   private void flowControl(final int messageBytes, final boolean isLargeMessage) throws MessagingException
    {
-      if (clientWindowSize > 0)
+      if (clientWindowSize >= 0)
       {
          creditsToSend += messageBytes;
 
          if (creditsToSend >= clientWindowSize)
          {
 
-            if (useExecutor)
+            if (isLargeMessage)
             {
                // Flowcontrol on largeMessages continuations needs to be done in a separate thread or failover would block
                final int credits = creditsToSend;
@@ -509,23 +519,39 @@
                creditsToSend = 0;
                sessionExecutor.execute(new Runnable()
                {
-
                   public void run()
                   {
-                     channel.send(new SessionConsumerFlowCreditMessage(id, credits));
+                     sendCredits(credits);
                   }
 
+
                });
             }
             else
             {
-               channel.send(new SessionConsumerFlowCreditMessage(id, creditsToSend));
+               if (clientWindowSize == 0)
+               {
+                  // sending the credits - 1 initially send to fire the slow consumer, or the slow consumer would be aways buffering one after received the first message
+                  sendCredits(creditsToSend - 1);
+               }
+               else
+               {
+                  sendCredits(creditsToSend);
+               }
                creditsToSend = 0;
             }
          }
       }
    }
 
+   /**
+    * @param credits
+    */
+   private void sendCredits(final int credits)
+   {
+      channel.send(new SessionConsumerFlowCreditMessage(id, credits));
+   }
+
    private void waitForOnMessageToComplete()
    {
       if (handler == null)
@@ -598,6 +624,11 @@
             {
                session.expire(id, message.getMessageID());
             }
+            
+            if (clientWindowSize == 0)
+            {
+               sendCredits(1);
+            }
          }
       }
    }

Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java	2009-03-27 18:57:23 UTC (rev 6204)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java	2009-03-27 22:39:36 UTC (rev 6205)
@@ -1218,9 +1218,14 @@
          // caution for very fast consumers
          clientWindowSize = -1;
       }
+      else if (windowSize == 0)
+      {
+         // Slow consumer - no buffering
+         clientWindowSize = 0;
+      }
       else if (windowSize == 1)
       {
-         // Slow consumer - no buffering
+         // Slow consumer = buffer 1
          clientWindowSize = 1;
       }
       else if (windowSize > 1)
@@ -1249,7 +1254,10 @@
       // We even send it if windowSize == -1, since we need to start the
       // consumer
 
-      channel.send(new SessionConsumerFlowCreditMessage(consumerID, windowSize));
+      if (windowSize != 0)
+      {
+         channel.send(new SessionConsumerFlowCreditMessage(consumerID, windowSize));
+      }
 
       return consumer;
    }

Modified: trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java	2009-03-27 18:57:23 UTC (rev 6204)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java	2009-03-27 22:39:36 UTC (rev 6205)
@@ -801,7 +801,7 @@
 
             while (positionPendingLargeMessage < sizePendingLargeMessage)
             {
-               if (precalculateAvailableCredits <= 0)
+               if (precalculateAvailableCredits <= 0 && availableCredits != null)
                {
                   if (trace)
                   {

Modified: trunk/tests/src/org/jboss/messaging/tests/integration/client/ClientConsumerWindowSizeTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/client/ClientConsumerWindowSizeTest.java	2009-03-27 18:57:23 UTC (rev 6204)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/client/ClientConsumerWindowSizeTest.java	2009-03-27 22:39:36 UTC (rev 6205)
@@ -21,29 +21,32 @@
  */
 package org.jboss.messaging.tests.integration.client;
 
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
 import org.jboss.messaging.core.client.ClientConsumer;
 import org.jboss.messaging.core.client.ClientMessage;
 import org.jboss.messaging.core.client.ClientProducer;
 import org.jboss.messaging.core.client.ClientSession;
 import org.jboss.messaging.core.client.ClientSessionFactory;
+import org.jboss.messaging.core.client.MessageHandler;
+import org.jboss.messaging.core.client.impl.ClientConsumerInternal;
 import org.jboss.messaging.core.server.MessagingService;
-import org.jboss.messaging.core.server.Queue;
 import org.jboss.messaging.tests.util.ServiceTestBase;
 import org.jboss.messaging.utils.SimpleString;
 
 /**
  * @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
+ * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
  */
 public class ClientConsumerWindowSizeTest extends ServiceTestBase
 {
-   public final SimpleString addressA = new SimpleString("addressA");
+   private final SimpleString addressA = new SimpleString("addressA");
 
-   public final SimpleString queueA = new SimpleString("queueA");
+   private final SimpleString queueA = new SimpleString("queueA");
 
-   public final SimpleString queueB = new SimpleString("queueB");
+   private final int TIMEOUT = 5;
 
-   public final SimpleString queueC = new SimpleString("queueC");
-
    /*
    * tests send window size. we do this by having 2 receivers on the q. since we roundrobin the consumer for delivery we
    * know if consumer 1 has received n messages then consumer 2 must have also have received n messages or at least up
@@ -56,7 +59,7 @@
       try
       {
          messagingService.start();
-         cf.setBlockOnNonPersistentSend(true);
+         cf.setBlockOnNonPersistentSend(false);
          ClientSession sendSession = cf.createSession(false, true, true);
          ClientSession receiveSession = cf.createSession(false, true, true);
          sendSession.createQueue(addressA, queueA, false);
@@ -83,11 +86,21 @@
             m.acknowledge();
          }
          receiveSession.close();
-         Queue q = (Queue)messagingService.getServer().getPostOffice().getBinding(queueA).getBindable();
-         assertEquals(numMessage, q.getDeliveringCount());
+         
+         for (int i = 0; i < numMessage * 2; i++)
+         {
+            ClientMessage m = cc.receive(5000);
+            assertNotNull(m);
+            m.acknowledge();
+         }
+         
 
          session.close();
          sendSession.close();
+         
+
+         assertEquals(0, getMessageCount(messagingService, queueA.toString()));
+
       }
       finally
       {
@@ -98,11 +111,11 @@
       }
    }
 
-   public void testSlowConsumer() throws Exception
+   public void testSlowConsumerBufferingOne() throws Exception
    {
       MessagingService service = createService(false);
 
-      ClientSession sessionNotUsed = null;
+      ClientSession sessionB = null;
       ClientSession session = null;
 
       try
@@ -116,16 +129,16 @@
 
          session = sf.createSession(false, true, true);
 
-         SimpleString ADDRESS = new SimpleString("some-queue");
+         SimpleString ADDRESS = addressA;
 
          session.createQueue(ADDRESS, ADDRESS, true);
 
-         sessionNotUsed = sf.createSession(false, true, true);
-         sessionNotUsed.start();
+         sessionB = sf.createSession(false, true, true);
+         sessionB.start();
 
          session.start();
 
-         ClientConsumer consNeverUsed = sessionNotUsed.createConsumer(ADDRESS);
+         ClientConsumer consNeverUsed = sessionB.createConsumer(ADDRESS);
 
          ClientConsumer cons1 = session.createConsumer(ADDRESS);
 
@@ -146,13 +159,13 @@
          ClientMessage msg = consNeverUsed.receive(500);
          assertNotNull(msg);
          msg.acknowledge();
-         
+
          session.close();
          session = null;
-         
-         sessionNotUsed.close();
-         sessionNotUsed = null;
 
+         sessionB.close();
+         sessionB = null;
+
          assertEquals(0, getMessageCount(service, ADDRESS.toString()));
 
       }
@@ -162,8 +175,8 @@
          {
             if (session != null)
                session.close();
-            if (sessionNotUsed != null)
-               sessionNotUsed.close();
+            if (sessionB != null)
+               sessionB.close();
          }
          catch (Exception ignored)
          {
@@ -176,136 +189,668 @@
       }
    }
 
-   // A better slow consumer test
+   public void testSlowConsumerNoBuffer() throws Exception
+   {
+      internalTestSlowConsumerNoBuffer(false);
+   }
+
+   public void testSlowConsumerNoBufferLargeMessages() throws Exception
+   {
+      internalTestSlowConsumerNoBuffer(true);
+   }
+
+   private void internalTestSlowConsumerNoBuffer(boolean largeMessages) throws Exception
+   {
+      MessagingService service = createService(false);
+
+      ClientSession sessionB = null;
+      ClientSession session = null;
+
+      try
+      {
+         final int numberOfMessages = 100;
+
+         service.start();
+
+         ClientSessionFactory sf = createInVMFactory();
+         sf.setConsumerWindowSize(0);
+
+         if (largeMessages)
+         {
+            sf.setMinLargeMessageSize(100);
+         }
+
+         session = sf.createSession(false, true, true);
+
+         SimpleString ADDRESS = addressA;
+
+         session.createQueue(ADDRESS, ADDRESS, true);
+
+         sessionB = sf.createSession(false, true, true);
+         sessionB.start();
+
+         session.start();
+
+         ClientConsumerInternal consNeverUsed = (ClientConsumerInternal)sessionB.createConsumer(ADDRESS);
+
+         ClientConsumer cons1 = session.createConsumer(ADDRESS);
+
+         ClientProducer prod = session.createProducer(ADDRESS);
+
+         for (int i = 0; i < numberOfMessages; i++)
+         {
+            ClientMessage msg = createTextMessage(session, "Msg" + i);
+
+            if (largeMessages)
+            {
+               msg.getBody().writeBytes(new byte[600]);
+            }
+
+            prod.send(msg);
+         }
+
+         for (int i = 0; i < numberOfMessages; i++)
+         {
+            ClientMessage msg = cons1.receive(1000);
+            assertNotNull("expected message at i = " + i, msg);
+            assertEquals("Msg" + i, getTextMessage(msg));
+            msg.acknowledge();
+         }
+
+         assertEquals(0, consNeverUsed.getBufferSize());
+
+         session.close();
+         session = null;
+
+         sessionB.close();
+         sessionB = null;
+
+         assertEquals(0, getMessageCount(service, ADDRESS.toString()));
+
+      }
+      finally
+      {
+         try
+         {
+            if (session != null)
+               session.close();
+            if (sessionB != null)
+               sessionB.close();
+         }
+         catch (Exception ignored)
+         {
+         }
+
+         if (service.isStarted())
+         {
+            service.stop();
+         }
+      }
+   }
+
+   public void testSlowConsumerNoBuffer2() throws Exception
+   {
+      internalTestSlowConsumerNoBuffer2(false);
+   }
+
+   public void testSlowConsumerNoBuffer2LargeMessages() throws Exception
+   {
+      internalTestSlowConsumerNoBuffer2(true);
+   }
+
+   private void internalTestSlowConsumerNoBuffer2(final boolean largeMessages) throws Exception
+   {
+      MessagingService service = createService(false);
+
+      ClientSession session1 = null;
+      ClientSession session2 = null;
+
+      try
+      {
+         final int numberOfMessages = 100;
+
+         service.start();
+
+         ClientSessionFactory sf = createInVMFactory();
+
+         sf.setConsumerWindowSize(0);
+
+         if (largeMessages)
+         {
+            sf.setMinLargeMessageSize(100);
+         }
+
+         session1 = sf.createSession(false, true, true);
+
+         session2 = sf.createSession(false, true, true);
+
+         session1.start();
+
+         session2.start();
+
+         SimpleString ADDRESS = new SimpleString("some-queue");
+
+         session1.createQueue(ADDRESS, ADDRESS, true);
+
+         ClientConsumerInternal cons1 = (ClientConsumerInternal)session1.createConsumer(ADDRESS);
+
+         // Note we make sure we send the messages *before* cons2 is created
+
+         ClientProducer prod = session1.createProducer(ADDRESS);
+
+         for (int i = 0; i < numberOfMessages; i++)
+         {
+            ClientMessage msg = createTextMessage(session1, "Msg" + i);
+            if (largeMessages)
+            {
+               msg.getBody().writeBytes(new byte[600]);
+            }
+            prod.send(msg);
+         }
+
+         ClientConsumerInternal cons2 = (ClientConsumerInternal)session2.createConsumer(ADDRESS);
+
+         for (int i = 0; i < numberOfMessages / 2; i++)
+         {
+            ClientMessage msg = cons1.receive(1000);
+            assertNotNull("expected message at i = " + i, msg);
+
+            String str = getTextMessage(msg);
+            assertEquals("Msg" + i, str);
+
+            msg.acknowledge();
+
+            assertEquals("A slow consumer shouldn't buffer anything on the client side!", 0, cons1.getBufferSize());
+         }
+
+         for (int i = numberOfMessages / 2; i < numberOfMessages; i++)
+         {
+            ClientMessage msg = cons2.receive(1000);
+
+            assertNotNull("expected message at i = " + i, msg);
+
+            assertEquals("Msg" + i, msg.getBody().readString());
+
+            msg.acknowledge();
+
+            assertEquals("A slow consumer shouldn't buffer anything on the client side!", 0, cons2.getBufferSize());
+         }
+
+         session1.commit(); // just to make sure everything is flushed and no pending packets on the sending buffer, or
+         // the getMessageCount would fail
+         session2.commit();
+
+         assertEquals(0, getMessageCount(service, ADDRESS.toString()));
+
+         // This should also work the other way around
+
+         cons1.close();
+
+         cons2.close();
+
+         cons1 = (ClientConsumerInternal)session1.createConsumer(ADDRESS);
+
+         // Note we make sure we send the messages *before* cons2 is created
+
+         for (int i = 0; i < numberOfMessages; i++)
+         {
+            ClientMessage msg = createTextMessage(session1, "Msg" + i);
+            if (largeMessages)
+            {
+               msg.getBody().writeBytes(new byte[600]);
+            }
+            prod.send(msg);
+         }
+
+         cons2 = (ClientConsumerInternal)session2.createConsumer(ADDRESS);
+
+         // Now we receive on cons2 first
+
+         for (int i = 0; i < numberOfMessages / 2; i++)
+         {
+            ClientMessage msg = cons2.receive(1000);
+            assertNotNull("expected message at i = " + i, msg);
+
+            assertEquals("Msg" + i, msg.getBody().readString());
+
+            msg.acknowledge();
+
+            assertEquals("A slow consumer shouldn't buffer anything on the client side!", 0, cons2.getBufferSize());
+
+         }
+
+         for (int i = numberOfMessages / 2; i < numberOfMessages; i++)
+         {
+            ClientMessage msg = cons1.receive(1000);
+
+            assertNotNull("expected message at i = " + i, msg);
+
+            assertEquals("Msg" + i, msg.getBody().readString());
+
+            msg.acknowledge();
+
+            assertEquals("A slow consumer shouldn't buffer anything on the client side!", 0, cons1.getBufferSize());
+         }
+
+         session1.commit(); // just to make sure everything is flushed and no pending packets on the sending buffer, or
+         // the getMessageCount would fail
+         session2.commit();
+
+         assertEquals(0, getMessageCount(service, ADDRESS.toString()));
+
+      }
+      finally
+      {
+         try
+         {
+            if (session1 != null)
+               session1.close();
+            if (session2 != null)
+               session2.close();
+         }
+         catch (Exception ignored)
+         {
+         }
+
+         if (service.isStarted())
+         {
+            service.stop();
+         }
+      }
+   }
+
+   public void testSlowConsumerOnMessageHandlerNoBuffers() throws Exception
+   {
+      MessagingService service = createService(false);
+
+      ClientSession sessionB = null;
+      ClientSession session = null;
+
+      try
+      {
+         final int numberOfMessages = 100;
+
+         service.start();
+
+         ClientSessionFactory sf = createInVMFactory();
+         sf.setConsumerWindowSize(0);
+
+         session = sf.createSession(false, true, true);
+
+         SimpleString ADDRESS = new SimpleString("some-queue");
+
+         session.createQueue(ADDRESS, ADDRESS, true);
+
+         sessionB = sf.createSession(false, true, true);
+         sessionB.start();
+
+         session.start();
+
+         ClientConsumerInternal consReceiveOneAndHold = (ClientConsumerInternal)sessionB.createConsumer(ADDRESS);
+
+         final CountDownLatch latchReceived = new CountDownLatch(2);
+
+         final CountDownLatch latchDone = new CountDownLatch(1);
+
+         // It should receive two messages and then give up
+         class LocalHandler implements MessageHandler
+         {
+            boolean failed = false;
+
+            int count = 0;
+
+            /* (non-Javadoc)
+             * @see org.jboss.messaging.core.client.MessageHandler#onMessage(org.jboss.messaging.core.client.ClientMessage)
+             */
+            public synchronized void onMessage(ClientMessage message)
+            {
+               try
+               {
+                  String str = getTextMessage(message);
+
+                  failed = failed || !str.equals("Msg" + count);
+
+                  message.acknowledge();
+                  latchReceived.countDown();
+
+                  if (count++ == 1)
+                  {
+                     // it will hold here for a while
+                     if (!latchDone.await(TIMEOUT, TimeUnit.SECONDS)) // a timed wait, so if the test fails, one less
+                                                                      // thread around
+                     {
+                        new Exception("ClientConsuemrWindowSizeTest Handler couldn't receive signal in less than 5 seconds").printStackTrace(); // hudson
+                                                                                                                                                // or
+                                                                                                                                                // junit
+                                                                                                                                                // report
+                        failed = true;
+                     }
+                  }
+               }
+               catch (Exception e)
+               {
+                  e.printStackTrace(); // Hudson / JUnit report
+                  failed = true;
+               }
+            }
+         }
+
+         LocalHandler handler = new LocalHandler();
+
+         ClientConsumer cons1 = session.createConsumer(ADDRESS);
+
+         ClientProducer prod = session.createProducer(ADDRESS);
+
+         for (int i = 0; i < numberOfMessages; i++)
+         {
+            prod.send(createTextMessage(session, "Msg" + i));
+         }
+
+         consReceiveOneAndHold.setMessageHandler(handler);
+
+         assertTrue(latchReceived.await(TIMEOUT, TimeUnit.SECONDS));
+
+         assertEquals(0, consReceiveOneAndHold.getBufferSize());
+
+         for (int i = 2; i < numberOfMessages; i++)
+         {
+            ClientMessage msg = cons1.receive(1000);
+            assertNotNull("expected message at i = " + i, msg);
+            assertEquals("Msg" + i, getTextMessage(msg));
+            msg.acknowledge();
+         }
+
+         assertEquals(0, consReceiveOneAndHold.getBufferSize());
+
+         latchDone.countDown();
+
+         session.close();
+         session = null;
+
+         sessionB.close();
+         sessionB = null;
+
+         assertEquals(0, getMessageCount(service, ADDRESS.toString()));
+
+         assertFalse("MessageHandler received a failure", handler.failed);
+
+      }
+      finally
+      {
+         try
+         {
+            if (session != null)
+               session.close();
+            if (sessionB != null)
+               sessionB.close();
+         }
+         catch (Exception ignored)
+         {
+         }
+
+         if (service.isStarted())
+         {
+            service.stop();
+         }
+      }
+   }
+
+   public void testSlowConsumerOnMessageHandlerBufferOne() throws Exception
+   {
+      internalTestSlowConsumerOnMessageHandlerBufferOne(false);
+   }
+
+   public void testSlowConsumerOnMessageHandlerBufferOneLargeMessages() throws Exception
+   {
+      internalTestSlowConsumerOnMessageHandlerBufferOne(true);
+   }
    
-   //Commented out until behaviour is fixed
-//   public void testSlowConsumer2() throws Exception
-//   {
-//      MessagingService service = createService(false);
-//
-//      ClientSession session1 = null;
-//      ClientSession session2 = null;
-//
-//      try
-//      {
-//         final int numberOfMessages = 100;
-//
-//         service.start();
-//
-//         ClientSessionFactory sf = createInVMFactory();
-//
-//         sf.setConsumerWindowSize(1);
-//
-//         session1 = sf.createSession(false, true, true);
-//
-//         session2 = sf.createSession(false, true, true);
-//         
-//         session1.start();
-//         
-//         session2.start();
-//
-//         SimpleString ADDRESS = new SimpleString("some-queue");
-//
-//         session1.createQueue(ADDRESS, ADDRESS, true);
-//
-//         ClientConsumer cons1 = session1.createConsumer(ADDRESS);
-//         
-//         //Note we make sure we send the messages *before* cons2 is created
-//         
-//         ClientProducer prod = session1.createProducer(ADDRESS);
-//
-//         for (int i = 0; i < numberOfMessages; i++)
-//         {
-//            prod.send(createTextMessage(session1, "Msg" + i));
-//         }
-//
-//         ClientConsumer cons2 = session2.createConsumer(ADDRESS);
-//         
-//         for (int i = 0; i < numberOfMessages; i += 2)
-//         {
-//            ClientMessage msg = cons1.receive(1000);
-//            assertNotNull("expected message at i = " + i, msg);
-//
-//            //assertEquals("Msg" + i, msg.getBody().readString());
-//
-//            msg.acknowledge();
-//         }
-//
-//         for (int i = 1; i < numberOfMessages; i += 2)
-//         {
-//            ClientMessage msg = cons2.receive(1000);
-//
-//            assertNotNull("expected message at i = " + i, msg);
-//
-//            assertEquals("Msg" + i, msg.getBody().readString());
-//
-//            msg.acknowledge();
-//         }
-//
-//         assertEquals(0, getMessageCount(service, ADDRESS.toString()));
-//         
-//         //This should also work the other way around
-//         
-//         cons1.close();
-//         
-//         cons2.close();
-//         
-//         cons1 = session1.createConsumer(ADDRESS);
-//         
-//         //Note we make sure we send the messages *before* cons2 is created
-//         
-//         for (int i = 0; i < numberOfMessages; i++)
-//         {
-//            prod.send(createTextMessage(session1, "Msg" + i));
-//         }
-//
-//         cons2 = session2.createConsumer(ADDRESS);
-//         
-//         //Now we receive on cons2 first
-//         
-//         for (int i = 0; i < numberOfMessages; i += 2)
-//         {
-//            ClientMessage msg = cons2.receive(1000);
-//            assertNotNull("expected message at i = " + i, msg);
-//
-//            assertEquals("Msg" + i, msg.getBody().readString());
-//
-//            msg.acknowledge();
-//         }
-//
-//         for (int i = 1; i < numberOfMessages; i += 2)
-//         {
-//            ClientMessage msg = cons1.receive(1000);
-//
-//            assertNotNull("expected message at i = " + i, msg);
-//
-//            assertEquals("Msg" + i, msg.getBody().readString());
-//
-//            msg.acknowledge();
-//         }
-//
-//         assertEquals(0, getMessageCount(service, ADDRESS.toString()));
-//         
-//         
-//      }
-//      finally
-//      {
-//         try
-//         {
-//            if (session1 != null)
-//               session1.close();
-//            if (session2 != null)
-//               session2.close();
-//         }
-//         catch (Exception ignored)
-//         {
-//         }
-//
-//         if (service.isStarted())
-//         {
-//            service.stop();
-//         }
-//      }
-//   }
+   
+   private void internalTestSlowConsumerOnMessageHandlerBufferOne(boolean largeMessage) throws Exception
+   {
+      MessagingService service = createService(false);
 
+      ClientSession sessionB = null;
+      ClientSession session = null;
+
+      try
+      {
+         final int numberOfMessages = 100;
+
+         service.start();
+
+         ClientSessionFactory sf = createInVMFactory();
+         sf.setConsumerWindowSize(1);
+
+         if (largeMessage)
+         {
+            sf.setMinLargeMessageSize(100);
+         }
+
+         session = sf.createSession(false, true, true);
+
+         SimpleString ADDRESS = new SimpleString("some-queue");
+
+         session.createQueue(ADDRESS, ADDRESS, true);
+
+         sessionB = sf.createSession(false, true, true);
+         sessionB.start();
+
+         session.start();
+
+         ClientConsumerInternal consReceiveOneAndHold = (ClientConsumerInternal)sessionB.createConsumer(ADDRESS);
+
+         final CountDownLatch latchReceived = new CountDownLatch(2);
+         final CountDownLatch latchReceivedBuffered = new CountDownLatch(3);
+
+         final CountDownLatch latchDone = new CountDownLatch(1);
+
+         // It should receive two messages and then give up
+         class LocalHandler implements MessageHandler
+         {
+            boolean failed = false;
+
+            int count = 0;
+
+            /* (non-Javadoc)
+             * @see org.jboss.messaging.core.client.MessageHandler#onMessage(org.jboss.messaging.core.client.ClientMessage)
+             */
+            public synchronized void onMessage(ClientMessage message)
+            {
+               try
+               {
+                  String str = getTextMessage(message);
+
+                  System.out.println("Received " + str);
+
+                  failed = failed || !str.equals("Msg" + count);
+
+                  message.acknowledge();
+                  latchReceived.countDown();
+                  latchReceivedBuffered.countDown();
+
+                  if (count++ == 1)
+                  {
+                     // it will hold here for a while
+                     if (!latchDone.await(TIMEOUT, TimeUnit.SECONDS))
+                     {
+                        new Exception("ClientConsuemrWindowSizeTest Handler couldn't receive signal in less than 5 seconds").printStackTrace(); // hudson
+                                                                                                                                                // or
+                                                                                                                                                // junit
+                                                                                                                                                // report
+                        failed = true;
+                     }
+                  }
+               }
+               catch (Exception e)
+               {
+                  e.printStackTrace(); // Hudson / JUnit report
+                  failed = true;
+               }
+            }
+         }
+
+         LocalHandler handler = new LocalHandler();
+
+         ClientProducer prod = session.createProducer(ADDRESS);
+
+         for (int i = 0; i < numberOfMessages; i++)
+         {
+            ClientMessage msg = createTextMessage(session, "Msg" + i);
+            if (largeMessage)
+            {
+               msg.getBody().writeBytes(new byte[600]);
+            }
+            prod.send(msg);
+         }
+
+         consReceiveOneAndHold.setMessageHandler(handler);
+
+         assertTrue(latchReceived.await(TIMEOUT, TimeUnit.SECONDS));
+
+         ClientConsumer cons1 = session.createConsumer(ADDRESS);
+
+         for (int i = 3; i < numberOfMessages; i++)
+         {
+            ClientMessage msg = cons1.receive(1000);
+            assertNotNull("expected message at i = " + i, msg);
+            assertEquals("Msg" + i, getTextMessage(msg));
+            msg.acknowledge();
+         }
+
+         latchDone.countDown();
+         
+         assertTrue(latchReceivedBuffered.await(TIMEOUT, TimeUnit.SECONDS));
+
+         session.close();
+         session = null;
+
+         sessionB.close();
+         sessionB = null;
+
+         assertEquals(0, getMessageCount(service, ADDRESS.toString()));
+
+         assertFalse("MessageHandler received a failure", handler.failed);
+
+      }
+      finally
+      {
+         try
+         {
+            if (session != null)
+               session.close();
+            if (sessionB != null)
+               sessionB.close();
+         }
+         catch (Exception ignored)
+         {
+         }
+
+         if (service.isStarted())
+         {
+            service.stop();
+         }
+      }
+   }
+   
+   public void testNoWindowRoundRobin() throws Exception
+   {
+      testNoWindowRoundRobin(false);
+   }
+   
+   
+   public void testNoWindowRoundRobinLargeMessage() throws Exception
+   {
+      testNoWindowRoundRobin(true);
+   }
+   
+   private void testNoWindowRoundRobin(final boolean largeMessages) throws Exception
+   {
+      
+      MessagingService service = createService(false);
+
+      ClientSession sessionA = null;
+      ClientSession sessionB = null;
+
+      try
+      {
+         final int numberOfMessages = 100;
+
+         service.start();
+
+         ClientSessionFactory sf = createInVMFactory();
+         sf.setConsumerWindowSize(-1);
+         
+         if (largeMessages)
+         {
+            sf.setMinLargeMessageSize(100);
+         }
+
+         sessionA = sf.createSession(false, true, true);
+
+         SimpleString ADDRESS = new SimpleString("some-queue");
+
+         sessionA.createQueue(ADDRESS, ADDRESS, true);
+
+         sessionB = sf.createSession(false, true, true);
+
+         sessionA.start();
+         sessionB.start();
+
+         ClientConsumerInternal consA = (ClientConsumerInternal)sessionA.createConsumer(ADDRESS);
+
+         ClientConsumerInternal consB = (ClientConsumerInternal)sessionB.createConsumer(ADDRESS);
+
+         ClientProducer prod = sessionA.createProducer(ADDRESS);
+
+         for (int i = 0; i < numberOfMessages; i++)
+         {
+            ClientMessage msg = createTextMessage(sessionA, "Msg" + i);
+            if (largeMessages)
+            {
+               msg.getBody().writeBytes(new byte[600]);
+            }
+            prod.send(msg);
+         }
+         
+         
+         long timeout = System.currentTimeMillis() + TIMEOUT * 1000;
+         
+         boolean foundA = false;
+         boolean foundB = false;
+         
+         do
+         {
+            foundA = consA.getBufferSize() == numberOfMessages / 2;
+            foundB = consB.getBufferSize() == numberOfMessages / 2;
+            
+            Thread.sleep(10);
+         } while ((!foundA || !foundB) && System.currentTimeMillis() < timeout);
+         
+         
+         assertTrue("ConsumerA didn't receive the expected number of messages on buffer (consA=" + consA.getBufferSize() + ", consB=" + consB.getBufferSize() + ") foundA = " + foundA + " foundB = " + foundB, foundA);
+         assertTrue("ConsumerB didn't receive the expected number of messages on buffer (consA=" + consA.getBufferSize() + ", consB=" + consB.getBufferSize() + ") foundA = " + foundA + " foundB = " + foundB, foundB);
+         
+
+      }
+      finally
+      {
+         try
+         {
+            if (sessionA != null)
+               sessionA.close();
+            if (sessionB != null)
+               sessionB.close();
+         }
+         catch (Exception ignored)
+         {
+         }
+
+         if (service.isStarted())
+         {
+            service.stop();
+         }
+      }
+   }
+
 }

Modified: trunk/tests/src/org/jboss/messaging/tests/integration/client/ClientSessionStopStartTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/client/ClientSessionStopStartTest.java	2009-03-27 18:57:23 UTC (rev 6204)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/client/ClientSessionStopStartTest.java	2009-03-27 22:39:36 UTC (rev 6205)
@@ -314,7 +314,15 @@
 
       latch.await();
 
-      session.stop();
+      try
+      {
+         session.stop();
+      }
+      catch (Exception e)
+      {
+         log.warn(e.getMessage(), e);
+         throw e;
+      }
 
       assertFalse(handler.failed);
 




More information about the jboss-cvs-commits mailing list