[jboss-cvs] JBoss Messaging SVN: r2491 - in trunk: tests/src/org/jboss/test/messaging/core and 1 other directory.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Wed Feb 28 01:54:10 EST 2007


Author: ovidiu.feodorov at jboss.com
Date: 2007-02-28 01:54:10 -0500 (Wed, 28 Feb 2007)
New Revision: 2491

Modified:
   trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultMessagePullPolicy.java
   trunk/tests/src/org/jboss/test/messaging/core/SimpleReceiver.java
   trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/RedistributionWithDefaultMessagePullPolicyTest.java
Log:
chasing http://jira.jboss.org/jira/browse/JBMESSAGING-901

Modified: trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultMessagePullPolicy.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultMessagePullPolicy.java	2007-02-28 06:14:08 UTC (rev 2490)
+++ trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultMessagePullPolicy.java	2007-02-28 06:54:10 UTC (rev 2491)
@@ -25,9 +25,7 @@
 import java.util.List;
 
 /**
- * A DefaultMessagePullPolicy
- * 
- * This chooses the remote queue with the most messages
+ * A MessagePullPolicy implementation that chooses the remote queue with the most messages.
  *
  * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
  * @version <tt>$Revision$</tt>
@@ -37,37 +35,53 @@
  */
 public class DefaultMessagePullPolicy implements MessagePullPolicy
 {
+   // Constants ------------------------------------------------------------------------------------
+
+   // Static ---------------------------------------------------------------------------------------
+
+   // Attributes -----------------------------------------------------------------------------------
+
+   // Constructors ---------------------------------------------------------------------------------
+
+   // MessagePullPolicy implementation -------------------------------------------------------------
+
    public ClusteredQueue chooseQueue(List queues)
    {
-      Iterator iter = queues.iterator();
-      
+      int maxMessages = 0;
       ClusteredQueue chosenQueue = null;
-      
-      int maxMessages = 0;
-       
-      while (iter.hasNext())
+
+      for(Iterator i = queues.iterator(); i.hasNext(); )
       {
-         ClusteredQueue queue = (ClusteredQueue)iter.next();
-         
+         ClusteredQueue queue = (ClusteredQueue)i.next();
+
          if (!queue.isLocal())
-         {  
+         {
             QueueStats stats = queue.getStats();
-            
+
             if (stats != null)
-            {               
+            {
                int cnt = stats.getMessageCount();
-               
+
                if (cnt > maxMessages)
                {
                   maxMessages = cnt;
-                  
                   chosenQueue = queue;
                }
             }
          }
       }
-      
+
       return chosenQueue;
    }
 
+   // Public ---------------------------------------------------------------------------------------
+
+   // Package protected ----------------------------------------------------------------------------
+
+   // Protected ------------------------------------------------------------------------------------
+
+   // Private --------------------------------------------------------------------------------------
+
+   // Inner classes --------------------------------------------------------------------------------
+
 }

Modified: trunk/tests/src/org/jboss/test/messaging/core/SimpleReceiver.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/core/SimpleReceiver.java	2007-02-28 06:14:08 UTC (rev 2490)
+++ trunk/tests/src/org/jboss/test/messaging/core/SimpleReceiver.java	2007-02-28 06:54:10 UTC (rev 2491)
@@ -258,6 +258,7 @@
          {      
             if (timeout < 0)
             {
+               log.trace(this + ".waitForHandleInvocations() current timeout is " + timeout);
                resetInvocationCount();
                return false;
             }
@@ -277,7 +278,6 @@
       }
       
       resetInvocationCount();
-      
       return true;
    }
 

Modified: trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/RedistributionWithDefaultMessagePullPolicyTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/RedistributionWithDefaultMessagePullPolicyTest.java	2007-02-28 06:14:08 UTC (rev 2490)
+++ trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/RedistributionWithDefaultMessagePullPolicyTest.java	2007-02-28 06:54:10 UTC (rev 2491)
@@ -154,7 +154,7 @@
 
          Thread.sleep(2000);
 
-         //Messages should all be in queue1
+         // Messages should all be in queue1
 
          List msgs = queue1.browse();
          assertEquals(1, msgs.size());
@@ -169,11 +169,11 @@
          receiver2.setMaxRefs(0);
          queue2.add(receiver2);
 
-         //Prompt delivery so the channels know if the receivers are ready
+         // Prompt delivery so the channels know if the receivers are ready
          queue1.deliver();
          Thread.sleep(2000);
 
-         //Pull from 1 to 2
+         // Pull from 1 to 2
 
          receiver2.setMaxRefs(1);
 
@@ -522,6 +522,8 @@
       DefaultClusteredPostOffice office4 = null;
       DefaultClusteredPostOffice office5 = null;
 
+      boolean readOK;
+
       try
       {
          log.trace("Creating post offices");
@@ -631,10 +633,12 @@
 
          log.trace("Waiting for handleInvocations");
          long start = System.currentTimeMillis();
-         receiver.waitForHandleInvocations(NUM_MESSAGES * 5, 60000);
+         readOK = receiver.waitForHandleInvocations(NUM_MESSAGES * 5, 60000);
          long end = System.currentTimeMillis();
          log.trace("I waited for " + (end - start) + " ms");
 
+         assertTrue(readOK);
+
          Thread.sleep(2000);
 
 
@@ -732,6 +736,8 @@
       DefaultClusteredPostOffice office4 = null;
       DefaultClusteredPostOffice office5 = null;
 
+      boolean readOK;
+
       try
       {
          office1 = (DefaultClusteredPostOffice)
@@ -789,7 +795,7 @@
 
          Thread.sleep(2000);
 
-         //Check the sizes
+         // Check the sizes
 
          log.trace("Here are the sizes 1:");
          log.trace("queue1, refs:" + queue1.memoryRefCount() + " dels:" + queue1.getDeliveringCount());
@@ -832,7 +838,8 @@
 
          receiver1.setMaxRefs(5);
          queue1.deliver();
-         receiver1.waitForHandleInvocations(5, 20000);
+         readOK = receiver1.waitForHandleInvocations(5, 20000);
+         assertTrue(readOK);
          Thread.sleep(1000);
          assertEquals(NUM_MESSAGES - 5, queue1.memoryRefCount());
          assertEquals(5, queue1.getDeliveringCount());
@@ -843,7 +850,8 @@
 
          receiver2.setMaxRefs(10);
          queue2.deliver();
-         receiver2.waitForHandleInvocations(10, 20000);
+         readOK = receiver2.waitForHandleInvocations(10, 20000);
+         assertTrue(readOK);
          Thread.sleep(1000);
          assertEquals(NUM_MESSAGES - 10, queue2.memoryRefCount());
          assertEquals(10, queue2.getDeliveringCount());
@@ -852,7 +860,7 @@
 
          receiver3.setMaxRefs(15);
          queue3.deliver();
-         receiver3.waitForHandleInvocations(15, 20000);
+         readOK = receiver3.waitForHandleInvocations(15, 20000);
          Thread.sleep(1000);
          assertEquals(NUM_MESSAGES - 15, queue3.memoryRefCount());
          assertEquals(15, queue3.getDeliveringCount());
@@ -861,7 +869,8 @@
 
          receiver4.setMaxRefs(20);
          queue4.deliver();
-         receiver4.waitForHandleInvocations(20, 20000);
+         readOK = receiver4.waitForHandleInvocations(20, 20000);
+         assertTrue(readOK);
          Thread.sleep(1000);
          assertEquals(NUM_MESSAGES - 20, queue4.memoryRefCount());
          assertEquals(20, queue4.getDeliveringCount());
@@ -870,7 +879,8 @@
 
          receiver5.setMaxRefs(25);
          queue5.deliver();
-         receiver5.waitForHandleInvocations(25, 20000);
+         readOK = receiver5.waitForHandleInvocations(25, 20000);
+         assertTrue(readOK);
          Thread.sleep(1000);
          assertEquals(NUM_MESSAGES - 25, queue5.memoryRefCount());
          assertEquals(25, queue5.getDeliveringCount());
@@ -890,10 +900,11 @@
          log.trace("queue4, refs:" + queue4.memoryRefCount() + " dels:" + queue4.getDeliveringCount());
          log.trace("queue5, refs:" + queue5.memoryRefCount() + " dels:" + queue5.getDeliveringCount());
 
-         //Consume the rest from queue 5
+         // Consume the rest from queue 5
          receiver5.setMaxRefs(NUM_MESSAGES - 25);
          queue5.deliver();
-         receiver5.waitForHandleInvocations(NUM_MESSAGES - 25, 20000);
+         readOK = receiver5.waitForHandleInvocations(NUM_MESSAGES - 25, 20000);
+         assertTrue(readOK);
 
          Thread.sleep(2000);
 
@@ -906,11 +917,12 @@
          log.trace("queue4, refs:" + queue4.memoryRefCount() + " dels:" + queue4.getDeliveringCount());
          log.trace("queue5, refs:" + queue5.memoryRefCount() + " dels:" + queue5.getDeliveringCount());
 
-         //This will result in an extra one being pulled from queue1 - we cannot avoid this
-         //This is because the channel does not know that the receiver is full unless it tries
-         //with a ref so it needs to retrieve one
+         // This will result in an extra one being pulled from queue1 - we cannot avoid this. This
+         // is because the channel does not know that the receiver is full unless it tries with a
+         // ref so it needs to retrieve one
 
-         assertEquals(NUM_MESSAGES - 6, queue1.memoryRefCount());
+                                                                   // http://jira.jboss.org/jira/browse/JBMESSAGING-901                                                                          
+         assertEquals(NUM_MESSAGES - 6, queue1.memoryRefCount());  // <-  expected:<94> but was:<93>
          assertEquals(0, queue1.getDeliveringCount());
 
          assertEquals(NUM_MESSAGES - 10, queue2.memoryRefCount());
@@ -937,13 +949,14 @@
          assertTrue(office4.getHoldingTransactions().isEmpty());
          assertTrue(office5.getHoldingTransactions().isEmpty());
 
-         //Now consume 5 more from queue5, they should come from queue1 which has the most messages
+         // Now consume 5 more from queue5, they should come from queue1 which has the most messages
 
          log.trace("Consume 5 more from queue 5");
 
          receiver5.setMaxRefs(5);
          queue5.deliver();
-         receiver5.waitForHandleInvocations(5, 20000);
+         readOK = receiver5.waitForHandleInvocations(5, 20000);
+         assertTrue(readOK);
 
          Thread.sleep(4000);
 
@@ -954,7 +967,7 @@
          log.trace("queue4, refs:" + queue4.memoryRefCount() + " dels:" + queue4.getDeliveringCount());
          log.trace("queue5, refs:" + queue5.memoryRefCount() + " dels:" + queue5.getDeliveringCount());
 
-         assertEquals(NUM_MESSAGES - 11, queue1.memoryRefCount());
+         assertEquals(NUM_MESSAGES - 11, queue1.memoryRefCount()); // <--  expected:<89> but was:<88>
 
          assertEquals(0, queue1.getDeliveringCount());
 
@@ -982,11 +995,12 @@
          assertTrue(office4.getHoldingTransactions().isEmpty());
          assertTrue(office5.getHoldingTransactions().isEmpty());
 
-         //Consume 1 more - should pull one from queue2
+         // Consume 1 more - should pull one from queue2
 
          receiver5.setMaxRefs(1);
          queue5.deliver();
-         receiver5.waitForHandleInvocations(1, 20000);
+         readOK = receiver5.waitForHandleInvocations(1, 20000);
+         assertTrue(readOK);
 
          Thread.sleep(2000);
 
@@ -1024,12 +1038,13 @@
          assertTrue(office4.getHoldingTransactions().isEmpty());
          assertTrue(office5.getHoldingTransactions().isEmpty());
 
-         //From queue 4 consume everything else
+         // From queue 4 consume everything else
 
          int num = NUM_MESSAGES - 15 + NUM_MESSAGES - 20 + NUM_MESSAGES - 11 + NUM_MESSAGES - 11 + 1;
          receiver4.setMaxRefs(num);
          queue4.deliver();
-         receiver4.waitForHandleInvocations(num, 20000);
+         readOK = receiver4.waitForHandleInvocations(num, 20000);
+         assertTrue(readOK);
 
          Thread.sleep(2000);
 
@@ -1046,7 +1061,7 @@
          assertEquals(0, queue2.memoryRefCount());
          assertEquals(0, queue2.getDeliveringCount());
 
-         assertEquals(0, queue3.memoryRefCount());
+         assertEquals(0, queue3.memoryRefCount()); // <- sometimes, there is still 1 message left in this queue
          assertEquals(0, queue3.getDeliveringCount());
 
          assertEquals(0, queue4.memoryRefCount());




More information about the jboss-cvs-commits mailing list