[jboss-cvs] JBoss Messaging SVN: r2491 - in trunk: tests/src/org/jboss/test/messaging/core and 1 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Wed Feb 28 01:54:10 EST 2007
Author: ovidiu.feodorov at jboss.com
Date: 2007-02-28 01:54:10 -0500 (Wed, 28 Feb 2007)
New Revision: 2491
Modified:
trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultMessagePullPolicy.java
trunk/tests/src/org/jboss/test/messaging/core/SimpleReceiver.java
trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/RedistributionWithDefaultMessagePullPolicyTest.java
Log:
chasing http://jira.jboss.org/jira/browse/JBMESSAGING-901
Modified: trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultMessagePullPolicy.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultMessagePullPolicy.java 2007-02-28 06:14:08 UTC (rev 2490)
+++ trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultMessagePullPolicy.java 2007-02-28 06:54:10 UTC (rev 2491)
@@ -25,9 +25,7 @@
import java.util.List;
/**
- * A DefaultMessagePullPolicy
- *
- * This chooses the remote queue with the most messages
+ * A MessagePullPolicy implementation that chooses the remote queue with the most messages.
*
* @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
* @version <tt>$Revision$</tt>
@@ -37,37 +35,53 @@
*/
public class DefaultMessagePullPolicy implements MessagePullPolicy
{
+ // Constants ------------------------------------------------------------------------------------
+
+ // Static ---------------------------------------------------------------------------------------
+
+ // Attributes -----------------------------------------------------------------------------------
+
+ // Constructors ---------------------------------------------------------------------------------
+
+ // MessagePullPolicy implementation -------------------------------------------------------------
+
public ClusteredQueue chooseQueue(List queues)
{
- Iterator iter = queues.iterator();
-
+ int maxMessages = 0;
ClusteredQueue chosenQueue = null;
-
- int maxMessages = 0;
-
- while (iter.hasNext())
+
+ for(Iterator i = queues.iterator(); i.hasNext(); )
{
- ClusteredQueue queue = (ClusteredQueue)iter.next();
-
+ ClusteredQueue queue = (ClusteredQueue)i.next();
+
if (!queue.isLocal())
- {
+ {
QueueStats stats = queue.getStats();
-
+
if (stats != null)
- {
+ {
int cnt = stats.getMessageCount();
-
+
if (cnt > maxMessages)
{
maxMessages = cnt;
-
chosenQueue = queue;
}
}
}
}
-
+
return chosenQueue;
}
+ // Public ---------------------------------------------------------------------------------------
+
+ // Package protected ----------------------------------------------------------------------------
+
+ // Protected ------------------------------------------------------------------------------------
+
+ // Private --------------------------------------------------------------------------------------
+
+ // Inner classes --------------------------------------------------------------------------------
+
}
Modified: trunk/tests/src/org/jboss/test/messaging/core/SimpleReceiver.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/core/SimpleReceiver.java 2007-02-28 06:14:08 UTC (rev 2490)
+++ trunk/tests/src/org/jboss/test/messaging/core/SimpleReceiver.java 2007-02-28 06:54:10 UTC (rev 2491)
@@ -258,6 +258,7 @@
{
if (timeout < 0)
{
+ log.trace(this + ".waitForHandleInvocations() current timeout is " + timeout);
resetInvocationCount();
return false;
}
@@ -277,7 +278,6 @@
}
resetInvocationCount();
-
return true;
}
Modified: trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/RedistributionWithDefaultMessagePullPolicyTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/RedistributionWithDefaultMessagePullPolicyTest.java 2007-02-28 06:14:08 UTC (rev 2490)
+++ trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/RedistributionWithDefaultMessagePullPolicyTest.java 2007-02-28 06:54:10 UTC (rev 2491)
@@ -154,7 +154,7 @@
Thread.sleep(2000);
- //Messages should all be in queue1
+ // Messages should all be in queue1
List msgs = queue1.browse();
assertEquals(1, msgs.size());
@@ -169,11 +169,11 @@
receiver2.setMaxRefs(0);
queue2.add(receiver2);
- //Prompt delivery so the channels know if the receivers are ready
+ // Prompt delivery so the channels know if the receivers are ready
queue1.deliver();
Thread.sleep(2000);
- //Pull from 1 to 2
+ // Pull from 1 to 2
receiver2.setMaxRefs(1);
@@ -522,6 +522,8 @@
DefaultClusteredPostOffice office4 = null;
DefaultClusteredPostOffice office5 = null;
+ boolean readOK;
+
try
{
log.trace("Creating post offices");
@@ -631,10 +633,12 @@
log.trace("Waiting for handleInvocations");
long start = System.currentTimeMillis();
- receiver.waitForHandleInvocations(NUM_MESSAGES * 5, 60000);
+ readOK = receiver.waitForHandleInvocations(NUM_MESSAGES * 5, 60000);
long end = System.currentTimeMillis();
log.trace("I waited for " + (end - start) + " ms");
+ assertTrue(readOK);
+
Thread.sleep(2000);
@@ -732,6 +736,8 @@
DefaultClusteredPostOffice office4 = null;
DefaultClusteredPostOffice office5 = null;
+ boolean readOK;
+
try
{
office1 = (DefaultClusteredPostOffice)
@@ -789,7 +795,7 @@
Thread.sleep(2000);
- //Check the sizes
+ // Check the sizes
log.trace("Here are the sizes 1:");
log.trace("queue1, refs:" + queue1.memoryRefCount() + " dels:" + queue1.getDeliveringCount());
@@ -832,7 +838,8 @@
receiver1.setMaxRefs(5);
queue1.deliver();
- receiver1.waitForHandleInvocations(5, 20000);
+ readOK = receiver1.waitForHandleInvocations(5, 20000);
+ assertTrue(readOK);
Thread.sleep(1000);
assertEquals(NUM_MESSAGES - 5, queue1.memoryRefCount());
assertEquals(5, queue1.getDeliveringCount());
@@ -843,7 +850,8 @@
receiver2.setMaxRefs(10);
queue2.deliver();
- receiver2.waitForHandleInvocations(10, 20000);
+ readOK = receiver2.waitForHandleInvocations(10, 20000);
+ assertTrue(readOK);
Thread.sleep(1000);
assertEquals(NUM_MESSAGES - 10, queue2.memoryRefCount());
assertEquals(10, queue2.getDeliveringCount());
@@ -852,7 +860,7 @@
receiver3.setMaxRefs(15);
queue3.deliver();
- receiver3.waitForHandleInvocations(15, 20000);
+ readOK = receiver3.waitForHandleInvocations(15, 20000);
Thread.sleep(1000);
assertEquals(NUM_MESSAGES - 15, queue3.memoryRefCount());
assertEquals(15, queue3.getDeliveringCount());
@@ -861,7 +869,8 @@
receiver4.setMaxRefs(20);
queue4.deliver();
- receiver4.waitForHandleInvocations(20, 20000);
+ readOK = receiver4.waitForHandleInvocations(20, 20000);
+ assertTrue(readOK);
Thread.sleep(1000);
assertEquals(NUM_MESSAGES - 20, queue4.memoryRefCount());
assertEquals(20, queue4.getDeliveringCount());
@@ -870,7 +879,8 @@
receiver5.setMaxRefs(25);
queue5.deliver();
- receiver5.waitForHandleInvocations(25, 20000);
+ readOK = receiver5.waitForHandleInvocations(25, 20000);
+ assertTrue(readOK);
Thread.sleep(1000);
assertEquals(NUM_MESSAGES - 25, queue5.memoryRefCount());
assertEquals(25, queue5.getDeliveringCount());
@@ -890,10 +900,11 @@
log.trace("queue4, refs:" + queue4.memoryRefCount() + " dels:" + queue4.getDeliveringCount());
log.trace("queue5, refs:" + queue5.memoryRefCount() + " dels:" + queue5.getDeliveringCount());
- //Consume the rest from queue 5
+ // Consume the rest from queue 5
receiver5.setMaxRefs(NUM_MESSAGES - 25);
queue5.deliver();
- receiver5.waitForHandleInvocations(NUM_MESSAGES - 25, 20000);
+ readOK = receiver5.waitForHandleInvocations(NUM_MESSAGES - 25, 20000);
+ assertTrue(readOK);
Thread.sleep(2000);
@@ -906,11 +917,12 @@
log.trace("queue4, refs:" + queue4.memoryRefCount() + " dels:" + queue4.getDeliveringCount());
log.trace("queue5, refs:" + queue5.memoryRefCount() + " dels:" + queue5.getDeliveringCount());
- //This will result in an extra one being pulled from queue1 - we cannot avoid this
- //This is because the channel does not know that the receiver is full unless it tries
- //with a ref so it needs to retrieve one
+ // This will result in an extra one being pulled from queue1 - we cannot avoid this. This
+ // is because the channel does not know that the receiver is full unless it tries with a
+ // ref so it needs to retrieve one
- assertEquals(NUM_MESSAGES - 6, queue1.memoryRefCount());
+ // http://jira.jboss.org/jira/browse/JBMESSAGING-901
+ assertEquals(NUM_MESSAGES - 6, queue1.memoryRefCount()); // <- expected:<94> but was:<93>
assertEquals(0, queue1.getDeliveringCount());
assertEquals(NUM_MESSAGES - 10, queue2.memoryRefCount());
@@ -937,13 +949,14 @@
assertTrue(office4.getHoldingTransactions().isEmpty());
assertTrue(office5.getHoldingTransactions().isEmpty());
- //Now consume 5 more from queue5, they should come from queue1 which has the most messages
+ // Now consume 5 more from queue5, they should come from queue1 which has the most messages
log.trace("Consume 5 more from queue 5");
receiver5.setMaxRefs(5);
queue5.deliver();
- receiver5.waitForHandleInvocations(5, 20000);
+ readOK = receiver5.waitForHandleInvocations(5, 20000);
+ assertTrue(readOK);
Thread.sleep(4000);
@@ -954,7 +967,7 @@
log.trace("queue4, refs:" + queue4.memoryRefCount() + " dels:" + queue4.getDeliveringCount());
log.trace("queue5, refs:" + queue5.memoryRefCount() + " dels:" + queue5.getDeliveringCount());
- assertEquals(NUM_MESSAGES - 11, queue1.memoryRefCount());
+ assertEquals(NUM_MESSAGES - 11, queue1.memoryRefCount()); // <-- expected:<89> but was:<88>
assertEquals(0, queue1.getDeliveringCount());
@@ -982,11 +995,12 @@
assertTrue(office4.getHoldingTransactions().isEmpty());
assertTrue(office5.getHoldingTransactions().isEmpty());
- //Consume 1 more - should pull one from queue2
+ // Consume 1 more - should pull one from queue2
receiver5.setMaxRefs(1);
queue5.deliver();
- receiver5.waitForHandleInvocations(1, 20000);
+ readOK = receiver5.waitForHandleInvocations(1, 20000);
+ assertTrue(readOK);
Thread.sleep(2000);
@@ -1024,12 +1038,13 @@
assertTrue(office4.getHoldingTransactions().isEmpty());
assertTrue(office5.getHoldingTransactions().isEmpty());
- //From queue 4 consume everything else
+ // From queue 4 consume everything else
int num = NUM_MESSAGES - 15 + NUM_MESSAGES - 20 + NUM_MESSAGES - 11 + NUM_MESSAGES - 11 + 1;
receiver4.setMaxRefs(num);
queue4.deliver();
- receiver4.waitForHandleInvocations(num, 20000);
+ readOK = receiver4.waitForHandleInvocations(num, 20000);
+ assertTrue(readOK);
Thread.sleep(2000);
@@ -1046,7 +1061,7 @@
assertEquals(0, queue2.memoryRefCount());
assertEquals(0, queue2.getDeliveringCount());
- assertEquals(0, queue3.memoryRefCount());
+ assertEquals(0, queue3.memoryRefCount()); // <- sometimes, there is still 1 message left in this queue
assertEquals(0, queue3.getDeliveringCount());
assertEquals(0, queue4.memoryRefCount());
More information about the jboss-cvs-commits
mailing list