[jboss-cvs] JBoss Messaging SVN: r6680 - in trunk: tests/src/org/jboss/messaging/tests/integration/client and 1 other directory.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Tue May 5 21:05:37 EDT 2009


Author: clebert.suconic at jboss.com
Date: 2009-05-05 21:05:37 -0400 (Tue, 05 May 2009)
New Revision: 6680

Modified:
   trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java
   trunk/tests/src/org/jboss/messaging/tests/integration/client/ConsumerWindowSizeTest.java
Log:
Fixing ConsumerWindowSizeTest with no credits on flow control

Modified: trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java	2009-05-05 23:58:08 UTC (rev 6679)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java	2009-05-06 01:05:37 UTC (rev 6680)
@@ -486,6 +486,12 @@
 
    // Public ---------------------------------------------------------------------------------------
 
+   
+   /** Only use this on tests */
+   public AtomicInteger getAvailableCredits()
+   {
+      return availableCredits;
+   }
    // Private --------------------------------------------------------------------------------------
 
    private void promptDelivery()

Modified: trunk/tests/src/org/jboss/messaging/tests/integration/client/ConsumerWindowSizeTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/client/ConsumerWindowSizeTest.java	2009-05-05 23:58:08 UTC (rev 6679)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/client/ConsumerWindowSizeTest.java	2009-05-06 01:05:37 UTC (rev 6680)
@@ -21,6 +21,7 @@
  */
 package org.jboss.messaging.tests.integration.client;
 
+import java.util.Set;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
@@ -32,7 +33,12 @@
 import org.jboss.messaging.core.client.MessageHandler;
 import org.jboss.messaging.core.client.impl.ClientConsumerInternal;
 import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.core.postoffice.Binding;
+import org.jboss.messaging.core.postoffice.Bindings;
+import org.jboss.messaging.core.postoffice.QueueBinding;
+import org.jboss.messaging.core.server.Consumer;
 import org.jboss.messaging.core.server.MessagingServer;
+import org.jboss.messaging.core.server.impl.ServerConsumerImpl;
 import org.jboss.messaging.tests.util.ServiceTestBase;
 import org.jboss.messaging.utils.SimpleString;
 
@@ -62,9 +68,9 @@
       int encodeSize = message.getEncodeSize();
       session.close();
       cf.close();
-      return encodeSize;      
+      return encodeSize;
    }
-   
+
    /*
    * tests send window size. we do this by having 2 receivers on the q. since we roundrobin the consumer for delivery we
    * know if consumer 1 has received n messages then consumer 2 must have also have received n messages or at least up
@@ -252,37 +258,36 @@
          ClientConsumerInternal consNeverUsed = (ClientConsumerInternal)sessionB.createConsumer(ADDRESS);
 
          ClientProducer prod = session.createProducer(ADDRESS);
-         
-         // This will force a credit to be sent, but if the message wasn't received we need to take out that credit from the server
+
+         // This will force a credit to be sent, but if the message wasn't received we need to take out that credit from
+         // the server
          // or the client will be buffering messages
          assertNull(consNeverUsed.receive(1));
-         
+
          ClientMessage msg = createTextMessage(session, "This one will expire");
          if (largeMessages)
          {
             msg.getBody().writeBytes(new byte[600]);
          }
-         
+
          msg.setExpiration(System.currentTimeMillis() + 100);
          prod.send(msg);
-         
+
          msg = createTextMessage(session, "First-on-non-buffered");
 
          prod.send(msg);
-         
+
          Thread.sleep(110);
-         
-         // It will be able to receive another message, but it shouldn't send a credit again, as the credit was already sent
+
+         // It will be able to receive another message, but it shouldn't send a credit again, as the credit was already
+         // sent
          msg = consNeverUsed.receive(TIMEOUT * 1000);
          assertNotNull(msg);
          assertEquals("First-on-non-buffered", getTextMessage(msg));
          msg.acknowledge();
-         
 
          ClientConsumer cons1 = session.createConsumer(ADDRESS);
 
-
-         
          for (int i = 0; i < numberOfMessages; i++)
          {
             msg = createTextMessage(session, "Msg" + i);
@@ -599,7 +604,7 @@
                      if (!latchDone.await(TIMEOUT, TimeUnit.SECONDS)) // a timed wait, so if the test fails, one less
                      // thread around
                      {
-                        new Exception("ClientConsuemrWindowSizeTest Handler couldn't receive signal in less than 5 seconds").printStackTrace(); 
+                        new Exception("ClientConsuemrWindowSizeTest Handler couldn't receive signal in less than 5 seconds").printStackTrace();
                         failed = true;
                      }
 
@@ -861,6 +866,11 @@
       testNoWindowRoundRobin(false);
    }
 
+   public void testNoWindowRoundRobinLargeMessage() throws Exception
+   {
+      testNoWindowRoundRobin(true);
+   }
+
    private void testNoWindowRoundRobin(final boolean largeMessages) throws Exception
    {
 
@@ -878,11 +888,6 @@
          ClientSessionFactory sf = createInVMFactory();
          sf.setConsumerWindowSize(-1);
 
-         if (largeMessages)
-         {
-            sf.setMinLargeMessageSize(100);
-         }
-
          sessionA = sf.createSession(false, true, true);
 
          SimpleString ADDRESS = new SimpleString("some-queue");
@@ -898,6 +903,31 @@
 
          ClientConsumerInternal consB = (ClientConsumerInternal)sessionB.createConsumer(ADDRESS);
 
+         {
+            // We can only guarantee round robing with WindowSize = -1, after the ServerConsumer object received SessionConsumerFlowCreditMessage(-1)
+            // Since that is done asynchronously we verify that the information was received before we proceed on sending messages or else the distribution won't be 
+            // even as expected by the test
+            Bindings bindings = server.getPostOffice().getBindingsForAddress(ADDRESS);
+
+            assertEquals(1, bindings.getBindings().size());
+
+            for (Binding binding : bindings.getBindings())
+            {
+               Set<Consumer> consumers = ((QueueBinding)binding).getQueue().getConsumers();
+
+               for (Consumer consumer : consumers)
+               {
+                  ServerConsumerImpl consumerImpl = (ServerConsumerImpl)consumer;
+                  long timeout = System.currentTimeMillis() + 5000;
+                  while (timeout > System.currentTimeMillis() && consumerImpl.getAvailableCredits() != null)
+                  {
+                     new Exception("Trace").printStackTrace();
+                     Thread.sleep(10);
+                  }
+               }
+            }
+         }
+
          ClientProducer prod = sessionA.createProducer(ADDRESS);
 
          for (int i = 0; i < numberOfMessages; i++)




More information about the jboss-cvs-commits mailing list