[jboss-cvs] JBoss Messaging SVN: r1485 - in trunk: src/main/org/jboss/messaging/core/plugin/postoffice/cluster tests tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Tue Oct 17 10:49:28 EDT 2006
Author: timfox
Date: 2006-10-17 10:49:19 -0400 (Tue, 17 Oct 2006)
New Revision: 1485
Added:
trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/RedistributionWithDefaultMessagePullPolicyTest.java
Removed:
trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/RedistributionTest.java
Modified:
trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOffice.java
trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/LocalClusteredQueue.java
trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/PullMessagesResultRequest.java
trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/TransactionRequest.java
trunk/tests/build.xml
trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/RecoveryTest.java
Log:
Some tests on message pull recovery
Modified: trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOffice.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOffice.java 2006-10-17 10:37:37 UTC (rev 1484)
+++ trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOffice.java 2006-10-17 14:49:19 UTC (rev 1485)
@@ -1031,13 +1031,9 @@
if (remoteQueue != null)
{
- localQueue.handlePullMessagesResult(remoteQueue, messages, holdingTxId);
+ localQueue.handlePullMessagesResult(remoteQueue, messages, holdingTxId,
+ failBeforeCommit, failAfterCommit);
}
- else
- {
- //TODO need to send a rollback to the remote queue otherwise will get leak on remote node
- //in holding area
- }
}
Modified: trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/LocalClusteredQueue.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/LocalClusteredQueue.java 2006-10-17 10:37:37 UTC (rev 1484)
+++ trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/LocalClusteredQueue.java 2006-10-17 14:49:19 UTC (rev 1485)
@@ -178,11 +178,13 @@
acknowledgeInternal(d, null, false, false);
}
- public void handlePullMessagesResult(RemoteQueueStub remoteQueue, List messages, long holdingTxId) throws Exception
+ public void handlePullMessagesResult(RemoteQueueStub remoteQueue, List messages,
+ long holdingTxId, boolean failBeforeCommit, boolean failAfterCommit) throws Exception
{
//This needs to be run on a different thread to the one used by JGroups to deliver the message
//to avoid deadlock
- Runnable runnable = new MessagePullResultRunnable(remoteQueue, messages, holdingTxId);
+ Runnable runnable = new MessagePullResultRunnable(remoteQueue, messages, holdingTxId,
+ failBeforeCommit, failAfterCommit);
executor.execute(runnable);
}
@@ -225,10 +227,9 @@
protected void deliverInternal() throws Throwable
{
super.deliverInternal();
-
+
//If the receivers are still ready to accept more refs then we might pull messages
- //from a remote queue
-
+ //from a remote queue
if (receiversReady && pullQueue != null)
{
//We send a message to the remote queue to pull a message - the remote queue will then send back
@@ -457,15 +458,23 @@
private List messages;
private long holdingTxId;
+
+ //for testing only
+ private boolean failBeforeCommit;
+ private boolean failAfterCommit;
private MessagePullResultRunnable(RemoteQueueStub remoteQueue,
- List messages, long holdingTxId)
+ List messages, long holdingTxId,
+ boolean failBeforeCommit, boolean failAfterCommit)
{
this.remoteQueue = remoteQueue;
this.messages = messages;
this.holdingTxId = holdingTxId;
+
+ this.failBeforeCommit = failBeforeCommit;
+ this.failAfterCommit = failAfterCommit;
}
public void run()
@@ -504,7 +513,7 @@
//Should be executed synchronously since we already in the event queue
Delivery delRet = handleInternal(null, ref, tx, true, true);
-
+
if (delRet == null || !delRet.isSelectorAccepted())
{
//This should never happen
@@ -524,9 +533,21 @@
del.acknowledge(tx);
}
+
+ //For testing to simulate failures
+ if (failBeforeCommit)
+ {
+ throw new Exception("Test failure before commit");
+ }
tx.commit();
-
+
+ //For testing to simulate failures
+ if (failAfterCommit)
+ {
+ throw new Exception("Test failure after commit");
+ }
+
//TODO what if commit throws an exception - this means the commit message doesn't hit the
//remote node so the holding transaction stays in the holding area
//Need to catch the exception and throw a check message
Modified: trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/PullMessagesResultRequest.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/PullMessagesResultRequest.java 2006-10-17 10:37:37 UTC (rev 1484)
+++ trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/PullMessagesResultRequest.java 2006-10-17 14:49:19 UTC (rev 1485)
@@ -27,6 +27,7 @@
import java.util.Iterator;
import java.util.List;
+import org.jboss.logging.Logger;
import org.jboss.messaging.core.Message;
import org.jboss.messaging.core.message.MessageFactory;
@@ -42,6 +43,8 @@
*/
public class PullMessagesResultRequest extends ClusterRequest
{
+ private static final Logger log = Logger.getLogger(PullMessagesResultRequest.class);
+
public static final int TYPE = 2;
private long holdingTxId;
Modified: trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/TransactionRequest.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/TransactionRequest.java 2006-10-17 10:37:37 UTC (rev 1484)
+++ trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/TransactionRequest.java 2006-10-17 14:49:19 UTC (rev 1485)
@@ -24,8 +24,10 @@
import java.io.DataInputStream;
import java.io.DataOutputStream;
+import org.jboss.logging.Logger;
+
/**
* A TransactionRequest
*
@@ -37,6 +39,8 @@
*/
abstract class TransactionRequest extends ClusterRequest implements ClusterTransaction
{
+ private static final Logger log = Logger.getLogger(TransactionRequest.class);
+
protected int nodeId;
protected long txId;
Modified: trunk/tests/build.xml
===================================================================
--- trunk/tests/build.xml 2006-10-17 10:37:37 UTC (rev 1484)
+++ trunk/tests/build.xml 2006-10-17 14:49:19 UTC (rev 1485)
@@ -338,7 +338,7 @@
haltonerror="${junit.batchtest.haltonerror}">
<formatter type="plain" usefile="${junit.formatter.usefile}"/>
<fileset dir="${build.tests.classes}">
- <!-- <include name="**/messaging/core/**/*Test.class"/> -->
+ <include name="**/messaging/core/**/*Test.class"/>
<include name="**/messaging/jms/**/*Test.class"/>
<exclude name="**/jms/stress/**"/>
<exclude name="**/jms/crash/*Test.class"/>
Modified: trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/RecoveryTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/RecoveryTest.java 2006-10-17 10:37:37 UTC (rev 1484)
+++ trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/RecoveryTest.java 2006-10-17 14:49:19 UTC (rev 1485)
@@ -165,8 +165,10 @@
msgs = receiver3.getMessages();
assertTrue(msgs.isEmpty());
- assertEquals(1, office1.getHoldingTransactions().size());
+ //Nodes 2 and 3 should have a held tx
+ assertTrue(office1.getHoldingTransactions().isEmpty());
+
assertEquals(1, office2.getHoldingTransactions().size());
assertEquals(1, office3.getHoldingTransactions().size());
@@ -176,6 +178,8 @@
Thread.sleep(1000);
+ //This should result in the held txs being rolled back
+
assertTrue(office1.getHoldingTransactions().isEmpty());
assertTrue(office2.getHoldingTransactions().isEmpty());
@@ -271,6 +275,9 @@
msgs = receiver2.getMessages();
assertTrue(msgs.isEmpty());
+ msgs = receiver3.getMessages();
+ assertTrue(msgs.isEmpty());
+
try
{
//An exception should be thrown
@@ -293,8 +300,10 @@
msgs = receiver3.getMessages();
assertTrue(msgs.isEmpty());
- assertEquals(1, office1.getHoldingTransactions().size());
+ //There should be held tx in 2 and 3 but not in 1
+ assertTrue(office1.getHoldingTransactions().isEmpty());
+
assertEquals(1, office2.getHoldingTransactions().size());
assertEquals(1, office3.getHoldingTransactions().size());
@@ -319,6 +328,7 @@
msgs = receiver3.getMessages();
assertEquals(NUM_MESSAGES, msgs.size());
+
}
finally
{
@@ -334,6 +344,9 @@
}
}
+
+
+
protected ClusteredPostOffice createClusteredPostOffice(int nodeId, String groupName) throws Exception
{
MessagePullPolicy redistPolicy = new NullMessagePullPolicy();
Deleted: trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/RedistributionTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/RedistributionTest.java 2006-10-17 10:37:37 UTC (rev 1484)
+++ trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/RedistributionTest.java 2006-10-17 14:49:19 UTC (rev 1485)
@@ -1,936 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2005, JBoss Inc., and individual contributors as indicated
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-package org.jboss.test.messaging.core.plugin.postoffice.cluster;
-
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-
-import org.jboss.messaging.core.Channel;
-import org.jboss.messaging.core.Delivery;
-import org.jboss.messaging.core.DeliveryObserver;
-import org.jboss.messaging.core.FilterFactory;
-import org.jboss.messaging.core.Message;
-import org.jboss.messaging.core.MessageReference;
-import org.jboss.messaging.core.Receiver;
-import org.jboss.messaging.core.SimpleDelivery;
-import org.jboss.messaging.core.plugin.contract.ClusteredPostOffice;
-import org.jboss.messaging.core.plugin.postoffice.Binding;
-import org.jboss.messaging.core.plugin.postoffice.cluster.ClusterRouterFactory;
-import org.jboss.messaging.core.plugin.postoffice.cluster.DefaultClusteredPostOffice;
-import org.jboss.messaging.core.plugin.postoffice.cluster.DefaultMessagePullPolicy;
-import org.jboss.messaging.core.plugin.postoffice.cluster.DefaultRouterFactory;
-import org.jboss.messaging.core.plugin.postoffice.cluster.LocalClusteredQueue;
-import org.jboss.messaging.core.plugin.postoffice.cluster.MessagePullPolicy;
-import org.jboss.messaging.core.tx.Transaction;
-import org.jboss.test.messaging.core.SimpleFilterFactory;
-import org.jboss.test.messaging.core.SimpleReceiver;
-import org.jboss.test.messaging.core.plugin.base.ClusteringTestBase;
-
-import EDU.oswego.cs.dl.util.concurrent.Executor;
-import EDU.oswego.cs.dl.util.concurrent.QueuedExecutor;
-
-
-public class RedistributionTest extends ClusteringTestBase
-{
- // Constants -----------------------------------------------------
-
- // Static --------------------------------------------------------
-
- // Attributes ----------------------------------------------------
-
- // Constructors --------------------------------------------------
-
- public RedistributionTest(String name)
- {
- super(name);
- }
-
- // Public --------------------------------------------------------
-
- public void setUp() throws Exception
- {
- super.setUp();
- }
-
- public void tearDown() throws Exception
- {
- super.tearDown();
- }
-
-
- public void testConsumeAllNonPersistentNonRecoverable() throws Throwable
- {
- consumeAll(false, false);
- }
-
- public void testConsumeAllPersistentNonRecoverable() throws Throwable
- {
- consumeAll(true, false);
- }
-
- public void testConsumeAllNonPersistentRecoverable() throws Throwable
- {
- consumeAll(false, true);
- }
-
- public void testConsumeAllPersistentRecoverable() throws Throwable
- {
- consumeAll(true, true);
- }
-
-
-
- public void testConsumeBitByBitNonPersistentNonRecoverable() throws Throwable
- {
- consumeBitByBit(false, false);
- }
-
- public void testConsumeBitByBitPersistentNonRecoverable() throws Throwable
- {
- consumeBitByBit(true, false);
- }
-
- public void testConsumeBitByBitNonPersistentRecoverable() throws Throwable
- {
- consumeBitByBit(false, true);
- }
-
- public void testConsumeBitByBitPersistentRecoverable() throws Throwable
- {
- consumeBitByBit(true, true);
- }
-
-
-
-
-//
-// public void testConsumeConcurrentlyNonPersistentNonRecoverable() throws Throwable
-// {
-// consumeConcurrently(false, false);
-// }
-//
-// public void testConsumeConsumeConcurrentlyPersistentNonRecoverable() throws Throwable
-// {
-// consumeConcurrently(true, false);
-// }
-//
-// public void testConsumeConsumeConcurrentlyNonPersistentRecoverable() throws Throwable
-// {
-// consumeConcurrently(false, true);
-// }
-//
-// public void testConsumeConsumeConcurrentlyPersistentRecoverable() throws Throwable
-// {
-// consumeConcurrently(true, true);
-// }
-
- protected void consumeAll(boolean persistent, boolean recoverable) throws Throwable
- {
- DefaultClusteredPostOffice office1 = null;
-
- DefaultClusteredPostOffice office2 = null;
-
- DefaultClusteredPostOffice office3 = null;
-
- DefaultClusteredPostOffice office4 = null;
-
- DefaultClusteredPostOffice office5 = null;
-
- try
- {
- office1 = (DefaultClusteredPostOffice)createClusteredPostOffice(1, "testgroup");
-
- office2 = (DefaultClusteredPostOffice)createClusteredPostOffice(2, "testgroup");
-
- office3 = (DefaultClusteredPostOffice)createClusteredPostOffice(3, "testgroup");
-
- office4 = (DefaultClusteredPostOffice)createClusteredPostOffice(4, "testgroup");
-
- office5 = (DefaultClusteredPostOffice)createClusteredPostOffice(5, "testgroup");
-
- LocalClusteredQueue queue1 = new LocalClusteredQueue(office1, 1, "queue1", channelIdManager.getId(), ms, pm, true, recoverable, (QueuedExecutor)pool.get(), null, tr);
- Binding binding1 = office1.bindClusteredQueue("queue1", queue1);
-
- LocalClusteredQueue queue2 = new LocalClusteredQueue(office2, 2, "queue1", channelIdManager.getId(), ms, pm, true, recoverable, (QueuedExecutor)pool.get(), null, tr);
- Binding binding2 = office2.bindClusteredQueue("queue1", queue2);
-
- LocalClusteredQueue queue3 = new LocalClusteredQueue(office3, 3, "queue1", channelIdManager.getId(), ms, pm, true, recoverable, (QueuedExecutor)pool.get(), null, tr);
- Binding binding3 = office3.bindClusteredQueue("queue1", queue3);
-
- LocalClusteredQueue queue4 = new LocalClusteredQueue(office4, 4, "queue1", channelIdManager.getId(), ms, pm, true, recoverable, (QueuedExecutor)pool.get(), null, tr);
- Binding binding4 = office4.bindClusteredQueue("queue1", queue4);
-
- LocalClusteredQueue queue5 = new LocalClusteredQueue(office5, 5, "queue1", channelIdManager.getId(), ms, pm, true, recoverable, (QueuedExecutor)pool.get(), null, tr);
- Binding binding5 = office5.bindClusteredQueue("queue1", queue5);
-
- final int NUM_MESSAGES = 100;
-
- this.sendMessages("queue1", persistent, office1, NUM_MESSAGES, null);
- this.sendMessages("queue1", persistent, office2, NUM_MESSAGES, null);
- this.sendMessages("queue1", persistent, office3, NUM_MESSAGES, null);
- this.sendMessages("queue1", persistent, office4, NUM_MESSAGES, null);
- this.sendMessages("queue1", persistent, office5, NUM_MESSAGES, null);
-
- Thread.sleep(2000);
-
- //Check the sizes
-
- log.info("Here are the sizes:");
- log.info("queue1, refs:" + queue1.memoryRefCount() + " dels:" + queue1.memoryDeliveryCount());
- log.info("queue2, refs:" + queue2.memoryRefCount() + " dels:" + queue2.memoryDeliveryCount());
- log.info("queue3, refs:" + queue3.memoryRefCount() + " dels:" + queue3.memoryDeliveryCount());
- log.info("queue4, refs:" + queue4.memoryRefCount() + " dels:" + queue4.memoryDeliveryCount());
- log.info("queue5, refs:" + queue5.memoryRefCount() + " dels:" + queue5.memoryDeliveryCount());
-
- assertEquals(NUM_MESSAGES, queue1.memoryRefCount());
- assertEquals(0, queue1.memoryDeliveryCount());
-
- assertEquals(NUM_MESSAGES, queue2.memoryRefCount());
- assertEquals(0, queue2.memoryDeliveryCount());
-
- assertEquals(NUM_MESSAGES, queue3.memoryRefCount());
- assertEquals(0, queue3.memoryDeliveryCount());
-
- assertEquals(NUM_MESSAGES, queue4.memoryRefCount());
- assertEquals(0, queue4.memoryDeliveryCount());
-
- assertEquals(NUM_MESSAGES, queue5.memoryRefCount());
- assertEquals(0, queue5.memoryDeliveryCount());
-
- SimpleReceiver receiver = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
-
- queue1.add(receiver);
-
- queue1.deliver(false);
-
- Thread.sleep(7000);
-
- log.info("Here are the sizes:");
- log.info("queue1, refs:" + queue1.memoryRefCount() + " dels:" + queue1.memoryDeliveryCount());
- log.info("queue2, refs:" + queue2.memoryRefCount() + " dels:" + queue2.memoryDeliveryCount());
- log.info("queue3, refs:" + queue3.memoryRefCount() + " dels:" + queue3.memoryDeliveryCount());
- log.info("queue4, refs:" + queue4.memoryRefCount() + " dels:" + queue4.memoryDeliveryCount());
- log.info("queue5, refs:" + queue5.memoryRefCount() + " dels:" + queue5.memoryDeliveryCount());
-
- assertEquals(0, queue1.memoryRefCount());
- assertEquals(NUM_MESSAGES * 5, queue1.memoryDeliveryCount());
-
- assertEquals(0, queue2.memoryRefCount());
- assertEquals(0, queue2.memoryDeliveryCount());
-
- assertEquals(0, queue3.memoryRefCount());
- assertEquals(0, queue3.memoryDeliveryCount());
-
- assertEquals(0, queue4.memoryRefCount());
- assertEquals(0, queue4.memoryDeliveryCount());
-
- assertEquals(0, queue5.memoryRefCount());
- assertEquals(0, queue5.memoryDeliveryCount());
-
- List messages = receiver.getMessages();
-
- assertNotNull(messages);
-
- assertEquals(NUM_MESSAGES * 5, messages.size());
-
- Iterator iter = messages.iterator();
-
- while (iter.hasNext())
- {
- Message msg = (Message)iter.next();
-
- receiver.acknowledge(msg, null);
- }
-
- receiver.clear();
-
- assertEquals(0, queue1.memoryRefCount());
- assertEquals(0, queue1.memoryDeliveryCount());
-
- assertTrue(office1.getHoldingTransactions().isEmpty());
- assertTrue(office2.getHoldingTransactions().isEmpty());
- assertTrue(office3.getHoldingTransactions().isEmpty());
- assertTrue(office4.getHoldingTransactions().isEmpty());
- assertTrue(office5.getHoldingTransactions().isEmpty());
-
- checkNoMessageData();
- }
- finally
- {
- if (office1 != null)
- {
- office1.stop();
- }
-
- if (office2 != null)
- {
- office2.stop();
- }
-
- if (office3 != null)
- {
- office3.stop();
- }
-
- if (office4 != null)
- {
- office4.stop();
- }
-
- if (office5 != null)
- {
- office5.stop();
- }
- }
- }
-
- protected void consumeBitByBit(boolean persistent, boolean recoverable) throws Throwable
- {
- DefaultClusteredPostOffice office1 = null;
-
- DefaultClusteredPostOffice office2 = null;
-
- DefaultClusteredPostOffice office3 = null;
-
- DefaultClusteredPostOffice office4 = null;
-
- DefaultClusteredPostOffice office5 = null;
-
- try
- {
- office1 = (DefaultClusteredPostOffice)createClusteredPostOffice(1, "testgroup");
-
- office2 = (DefaultClusteredPostOffice)createClusteredPostOffice(2, "testgroup");
-
- office3 = (DefaultClusteredPostOffice)createClusteredPostOffice(3, "testgroup");
-
- office4 = (DefaultClusteredPostOffice)createClusteredPostOffice(4, "testgroup");
-
- office5 = (DefaultClusteredPostOffice)createClusteredPostOffice(5, "testgroup");
-
- LocalClusteredQueue queue1 = new LocalClusteredQueue(office1, 1, "queue1", channelIdManager.getId(), ms, pm, true, recoverable, (QueuedExecutor)pool.get(), null, tr);
- Binding binding1 = office1.bindClusteredQueue("queue1", queue1);
-
- LocalClusteredQueue queue2 = new LocalClusteredQueue(office2, 2, "queue1", channelIdManager.getId(), ms, pm, true, recoverable, (QueuedExecutor)pool.get(), null, tr);
- Binding binding2 = office2.bindClusteredQueue("queue1", queue2);
-
- LocalClusteredQueue queue3 = new LocalClusteredQueue(office3, 3, "queue1", channelIdManager.getId(), ms, pm, true, recoverable, (QueuedExecutor)pool.get(), null, tr);
- Binding binding3 = office3.bindClusteredQueue("queue1", queue3);
-
- LocalClusteredQueue queue4 = new LocalClusteredQueue(office4, 4, "queue1", channelIdManager.getId(), ms, pm, true, recoverable, (QueuedExecutor)pool.get(), null, tr);
- Binding binding4 = office4.bindClusteredQueue("queue1", queue4);
-
- LocalClusteredQueue queue5 = new LocalClusteredQueue(office5, 5, "queue1", channelIdManager.getId(), ms, pm, true, recoverable, (QueuedExecutor)pool.get(), null, tr);
- Binding binding5 = office5.bindClusteredQueue("queue1", queue5);
-
- final int NUM_MESSAGES = 100;
-
- this.sendMessages("queue1", persistent, office1, NUM_MESSAGES, null);
- this.sendMessages("queue1", persistent, office2, NUM_MESSAGES, null);
- this.sendMessages("queue1", persistent, office3, NUM_MESSAGES, null);
- this.sendMessages("queue1", persistent, office4, NUM_MESSAGES, null);
- this.sendMessages("queue1", persistent, office5, NUM_MESSAGES, null);
-
- Thread.sleep(2000);
-
- //Check the sizes
-
- log.info("Here are the sizes 1:");
- log.info("queue1, refs:" + queue1.memoryRefCount() + " dels:" + queue1.memoryDeliveryCount());
- log.info("queue2, refs:" + queue2.memoryRefCount() + " dels:" + queue2.memoryDeliveryCount());
- log.info("queue3, refs:" + queue3.memoryRefCount() + " dels:" + queue3.memoryDeliveryCount());
- log.info("queue4, refs:" + queue4.memoryRefCount() + " dels:" + queue4.memoryDeliveryCount());
- log.info("queue5, refs:" + queue5.memoryRefCount() + " dels:" + queue5.memoryDeliveryCount());
-
- assertEquals(NUM_MESSAGES, queue1.memoryRefCount());
- assertEquals(0, queue1.memoryDeliveryCount());
-
- assertEquals(NUM_MESSAGES, queue2.memoryRefCount());
- assertEquals(0, queue2.memoryDeliveryCount());
-
- assertEquals(NUM_MESSAGES, queue3.memoryRefCount());
- assertEquals(0, queue3.memoryDeliveryCount());
-
- assertEquals(NUM_MESSAGES, queue4.memoryRefCount());
- assertEquals(0, queue4.memoryDeliveryCount());
-
- assertEquals(NUM_MESSAGES, queue5.memoryRefCount());
- assertEquals(0, queue5.memoryDeliveryCount());
-
- assertTrue(office1.getHoldingTransactions().isEmpty());
- assertTrue(office2.getHoldingTransactions().isEmpty());
- assertTrue(office3.getHoldingTransactions().isEmpty());
- assertTrue(office4.getHoldingTransactions().isEmpty());
- assertTrue(office5.getHoldingTransactions().isEmpty());
-
- SimpleReceiver receiver1 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING_TO_MAX);
- queue1.add(receiver1);
- SimpleReceiver receiver2 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING_TO_MAX);
- queue2.add(receiver2);
- SimpleReceiver receiver3 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING_TO_MAX);
- queue3.add(receiver3);
- SimpleReceiver receiver4 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING_TO_MAX);
- queue4.add(receiver4);
- SimpleReceiver receiver5 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING_TO_MAX);
- queue5.add(receiver5);
-
- receiver1.setMaxRefs(5);
- queue1.deliver(false);
- Thread.sleep(1000);
- assertEquals(NUM_MESSAGES - 5, queue1.memoryRefCount());
- assertEquals(5, queue1.memoryDeliveryCount());
-
- acknowledgeAll(receiver1);
- assertEquals(0, queue1.memoryDeliveryCount());
- receiver1.setMaxRefs(0);
-
- receiver2.setMaxRefs(10);
- queue2.deliver(false);
- Thread.sleep(1000);
- assertEquals(NUM_MESSAGES - 10, queue2.memoryRefCount());
- assertEquals(10, queue2.memoryDeliveryCount());
- acknowledgeAll(receiver2);
- receiver2.setMaxRefs(0);
-
- receiver3.setMaxRefs(15);
- queue3.deliver(false);
- Thread.sleep(1000);
- assertEquals(NUM_MESSAGES - 15, queue3.memoryRefCount());
- assertEquals(15, queue3.memoryDeliveryCount());
- acknowledgeAll(receiver3);
- receiver3.setMaxRefs(0);
-
- receiver4.setMaxRefs(20);
- queue4.deliver(false);
- Thread.sleep(1000);
- assertEquals(NUM_MESSAGES - 20, queue4.memoryRefCount());
- assertEquals(20, queue4.memoryDeliveryCount());
- acknowledgeAll(receiver4);
- receiver4.setMaxRefs(0);
-
- receiver5.setMaxRefs(25);
- queue5.deliver(false);
- Thread.sleep(1000);
- assertEquals(NUM_MESSAGES - 25, queue5.memoryRefCount());
- assertEquals(25, queue5.memoryDeliveryCount());
- acknowledgeAll(receiver5);
- receiver5.setMaxRefs(0);
-
- Thread.sleep(1000);
-
- assertTrue(office1.getHoldingTransactions().isEmpty());
- assertTrue(office2.getHoldingTransactions().isEmpty());
- assertTrue(office3.getHoldingTransactions().isEmpty());
- assertTrue(office4.getHoldingTransactions().isEmpty());
- assertTrue(office5.getHoldingTransactions().isEmpty());
-
- log.info("Here are the sizes 2:");
- log.info("queue1, refs:" + queue1.memoryRefCount() + " dels:" + queue1.memoryDeliveryCount());
- log.info("queue2, refs:" + queue2.memoryRefCount() + " dels:" + queue2.memoryDeliveryCount());
- log.info("queue3, refs:" + queue3.memoryRefCount() + " dels:" + queue3.memoryDeliveryCount());
- log.info("queue4, refs:" + queue4.memoryRefCount() + " dels:" + queue4.memoryDeliveryCount());
- log.info("queue5, refs:" + queue5.memoryRefCount() + " dels:" + queue5.memoryDeliveryCount());
-
- //Consume the rest from queue 5
- receiver5.setMaxRefs(NUM_MESSAGES - 25);
- queue5.deliver(false);
- Thread.sleep(5000);
-
- log.info("receiver5 msgs:" + receiver5.getMessages().size());
-
- log.info("Here are the sizes 3:");
- log.info("queue1, refs:" + queue1.memoryRefCount() + " dels:" + queue1.memoryDeliveryCount());
- log.info("queue2, refs:" + queue2.memoryRefCount() + " dels:" + queue2.memoryDeliveryCount());
- log.info("queue3, refs:" + queue3.memoryRefCount() + " dels:" + queue3.memoryDeliveryCount());
- log.info("queue4, refs:" + queue4.memoryRefCount() + " dels:" + queue4.memoryDeliveryCount());
- log.info("queue5, refs:" + queue5.memoryRefCount() + " dels:" + queue5.memoryDeliveryCount());
-
- //This will result in an extra one being pulled from queue1 - we cannot avoid this
- //This is because the channel does not know that the receiver is full unless it tries
- //with a ref so it needs to retrieve one
-
- assertEquals(NUM_MESSAGES - 6, queue1.memoryRefCount());
- assertEquals(0, queue1.memoryDeliveryCount());
-
- assertEquals(NUM_MESSAGES - 10, queue2.memoryRefCount());
- assertEquals(0, queue2.memoryDeliveryCount());
-
- assertEquals(NUM_MESSAGES - 15, queue3.memoryRefCount());
- assertEquals(0, queue3.memoryDeliveryCount());
-
- assertEquals(NUM_MESSAGES - 20, queue4.memoryRefCount());
- assertEquals(0, queue4.memoryDeliveryCount());
-
- assertEquals(1, queue5.memoryRefCount());
- assertEquals(NUM_MESSAGES - 25, queue5.memoryDeliveryCount());
-
- acknowledgeAll(receiver5);
-
- assertEquals(0, queue5.memoryDeliveryCount());
-
- receiver5.setMaxRefs(0);
-
- assertTrue(office1.getHoldingTransactions().isEmpty());
- assertTrue(office2.getHoldingTransactions().isEmpty());
- assertTrue(office3.getHoldingTransactions().isEmpty());
- assertTrue(office4.getHoldingTransactions().isEmpty());
- assertTrue(office5.getHoldingTransactions().isEmpty());
-
- //Now consume 5 more from queue5, they should come from queue1 which has the most messages
-
- log.info("Consume 5 more from queue 5");
-
- receiver5.setMaxRefs(5);
- queue5.deliver(false);
- Thread.sleep(3000);
-
- log.info("Here are the sizes 4:");
- log.info("queue1, refs:" + queue1.memoryRefCount() + " dels:" + queue1.memoryDeliveryCount());
- log.info("queue2, refs:" + queue2.memoryRefCount() + " dels:" + queue2.memoryDeliveryCount());
- log.info("queue3, refs:" + queue3.memoryRefCount() + " dels:" + queue3.memoryDeliveryCount());
- log.info("queue4, refs:" + queue4.memoryRefCount() + " dels:" + queue4.memoryDeliveryCount());
- log.info("queue5, refs:" + queue5.memoryRefCount() + " dels:" + queue5.memoryDeliveryCount());
-
- assertEquals(NUM_MESSAGES - 11, queue1.memoryRefCount());
-
- assertEquals(0, queue1.memoryDeliveryCount());
-
- assertEquals(NUM_MESSAGES - 10, queue2.memoryRefCount());
- assertEquals(0, queue2.memoryDeliveryCount());
-
- assertEquals(NUM_MESSAGES - 15, queue3.memoryRefCount());
- assertEquals(0, queue3.memoryDeliveryCount());
-
- assertEquals(NUM_MESSAGES - 20, queue4.memoryRefCount());
- assertEquals(0, queue4.memoryDeliveryCount());
-
- assertEquals(1, queue5.memoryRefCount());
- assertEquals(5, queue5.memoryDeliveryCount());
-
- acknowledgeAll(receiver5);
-
- assertEquals(0, queue5.memoryDeliveryCount());
-
- receiver1.setMaxRefs(0);
-
- assertTrue(office1.getHoldingTransactions().isEmpty());
- assertTrue(office2.getHoldingTransactions().isEmpty());
- assertTrue(office3.getHoldingTransactions().isEmpty());
- assertTrue(office4.getHoldingTransactions().isEmpty());
- assertTrue(office5.getHoldingTransactions().isEmpty());
-
- //Consume 1 more - should pull one from queue2
-
- receiver5.setMaxRefs(1);
- queue5.deliver(false);
- Thread.sleep(2000);
-
- log.info("Here are the sizes 5:");
- log.info("queue1, refs:" + queue1.memoryRefCount() + " dels:" + queue1.memoryDeliveryCount());
- log.info("queue2, refs:" + queue2.memoryRefCount() + " dels:" + queue2.memoryDeliveryCount());
- log.info("queue3, refs:" + queue3.memoryRefCount() + " dels:" + queue3.memoryDeliveryCount());
- log.info("queue4, refs:" + queue4.memoryRefCount() + " dels:" + queue4.memoryDeliveryCount());
- log.info("queue5, refs:" + queue5.memoryRefCount() + " dels:" + queue5.memoryDeliveryCount());
-
- assertEquals(NUM_MESSAGES - 11, queue1.memoryRefCount());
- assertEquals(0, queue1.memoryDeliveryCount());
-
- assertEquals(NUM_MESSAGES - 11, queue2.memoryRefCount());
- assertEquals(0, queue2.memoryDeliveryCount());
-
- assertEquals(NUM_MESSAGES - 15, queue3.memoryRefCount());
- assertEquals(0, queue3.memoryDeliveryCount());
-
- assertEquals(NUM_MESSAGES - 20, queue4.memoryRefCount());
- assertEquals(0, queue4.memoryDeliveryCount());
-
- assertEquals(1, queue5.memoryRefCount());
- assertEquals(1, queue5.memoryDeliveryCount());
-
- acknowledgeAll(receiver5);
-
- assertEquals(0, queue5.memoryDeliveryCount());
-
- receiver5.setMaxRefs(0);
-
- assertTrue(office1.getHoldingTransactions().isEmpty());
- assertTrue(office2.getHoldingTransactions().isEmpty());
- assertTrue(office3.getHoldingTransactions().isEmpty());
- assertTrue(office4.getHoldingTransactions().isEmpty());
- assertTrue(office5.getHoldingTransactions().isEmpty());
-
- //From queue 4 consume everything else
-
- receiver4.setMaxRefs(NUM_MESSAGES - 15 + NUM_MESSAGES - 20 + NUM_MESSAGES - 11 + NUM_MESSAGES - 11 + 1);
- queue4.deliver(false);
- Thread.sleep(7000);
-
- log.info("Here are the sizes 6:");
- log.info("queue1, refs:" + queue1.memoryRefCount() + " dels:" + queue1.memoryDeliveryCount());
- log.info("queue2, refs:" + queue2.memoryRefCount() + " dels:" + queue2.memoryDeliveryCount());
- log.info("queue3, refs:" + queue3.memoryRefCount() + " dels:" + queue3.memoryDeliveryCount());
- log.info("queue4, refs:" + queue4.memoryRefCount() + " dels:" + queue4.memoryDeliveryCount());
- log.info("queue5, refs:" + queue5.memoryRefCount() + " dels:" + queue5.memoryDeliveryCount());
-
- assertEquals(0, queue1.memoryRefCount());
- assertEquals(0, queue1.memoryDeliveryCount());
-
- assertEquals(0, queue2.memoryRefCount());
- assertEquals(0, queue2.memoryDeliveryCount());
-
- assertEquals(0, queue3.memoryRefCount());
- assertEquals(0, queue3.memoryDeliveryCount());
-
- assertEquals(0, queue4.memoryRefCount());
- assertEquals(NUM_MESSAGES - 15 + NUM_MESSAGES - 20 + NUM_MESSAGES - 11 + NUM_MESSAGES - 11 + 1, queue4.memoryDeliveryCount());
-
- assertEquals(0, queue5.memoryRefCount());
- assertEquals(0, queue5.memoryDeliveryCount());
-
- acknowledgeAll(receiver4);
-
- assertEquals(0, queue4.memoryDeliveryCount());
-
- assertTrue(office1.getHoldingTransactions().isEmpty());
- assertTrue(office2.getHoldingTransactions().isEmpty());
- assertTrue(office3.getHoldingTransactions().isEmpty());
- assertTrue(office4.getHoldingTransactions().isEmpty());
- assertTrue(office5.getHoldingTransactions().isEmpty());
-
- checkNoMessageData();
- }
- finally
- {
- if (office1 != null)
- {
- office1.stop();
- }
-
- if (office2 != null)
- {
- office2.stop();
- }
-
- if (office3 != null)
- {
- office3.stop();
- }
-
- if (office4 != null)
- {
- office4.stop();
- }
-
- if (office5 != null)
- {
- office5.stop();
- }
- }
- }
-
- protected void consumeConcurrently(boolean persistent, boolean recoverable) throws Throwable
- {
- DefaultClusteredPostOffice office1 = null;
-
- DefaultClusteredPostOffice office2 = null;
-
- DefaultClusteredPostOffice office3 = null;
-
- DefaultClusteredPostOffice office4 = null;
-
- DefaultClusteredPostOffice office5 = null;
-
- try
- {
- office1 = (DefaultClusteredPostOffice)createClusteredPostOffice(1, "testgroup");
-
- office2 = (DefaultClusteredPostOffice)createClusteredPostOffice(2, "testgroup");
-
- office3 = (DefaultClusteredPostOffice)createClusteredPostOffice(3, "testgroup");
-
- office4 = (DefaultClusteredPostOffice)createClusteredPostOffice(4, "testgroup");
-
- office5 = (DefaultClusteredPostOffice)createClusteredPostOffice(5, "testgroup");
-
- LocalClusteredQueue queue1 = new LocalClusteredQueue(office1, 1, "queue1", channelIdManager.getId(), ms, pm, true, recoverable, (QueuedExecutor)pool.get(), null, tr);
- Binding binding1 = office1.bindClusteredQueue("queue1", queue1);
-
- LocalClusteredQueue queue2 = new LocalClusteredQueue(office2, 2, "queue1", channelIdManager.getId(), ms, pm, true, recoverable, (QueuedExecutor)pool.get(), null, tr);
- Binding binding2 = office2.bindClusteredQueue("queue1", queue2);
-
- LocalClusteredQueue queue3 = new LocalClusteredQueue(office3, 3, "queue1", channelIdManager.getId(), ms, pm, true, recoverable, (QueuedExecutor)pool.get(), null, tr);
- Binding binding3 = office3.bindClusteredQueue("queue1", queue3);
-
- LocalClusteredQueue queue4 = new LocalClusteredQueue(office4, 4, "queue1", channelIdManager.getId(), ms, pm, true, recoverable, (QueuedExecutor)pool.get(), null, tr);
- Binding binding4 = office4.bindClusteredQueue("queue1", queue4);
-
- LocalClusteredQueue queue5 = new LocalClusteredQueue(office5, 5, "queue1", channelIdManager.getId(), ms, pm, true, recoverable, (QueuedExecutor)pool.get(), null, tr);
- Binding binding5 = office5.bindClusteredQueue("queue1", queue5);
-
- //Test with no consumers on queue1
-
- //Two equal consumers on queue2 and queue3
-
- //Add messages at queue 1
-
- final int NUM_MESSAGES = 10000;
-
- this.sendMessages("queue1", persistent, office1, NUM_MESSAGES, null);
-
- log.info("sent messages");
-
- Thread.sleep(4000);
-
- ThrottleReceiver receiver1 = new ThrottleReceiver(queue1, 0, 50);
- queue1.add(receiver1);
- queue1.deliver(false);
-
- ThrottleReceiver receiver2 = new ThrottleReceiver(queue2, 0, 50);
- queue2.add(receiver2);
- queue2.deliver(false);
-
- Thread.sleep(45000);
-
- log.info("receiver1: " + receiver1.getTotalCount());
-
- log.info("receiver2: " + receiver2.getTotalCount());
-
-
- //test1
-
-
- //No consumer on node 1
- //Very slow consumer on node 2
- //
-
- /*
- * Test with very fast, infinitely big consumer (i.e. is always ready) on node 1
- * Fast consumer on node2
- * Send messages on node 1
- * Verify all go to node1 consumer
- *
- * Test with very fast, not infinitely big consumer (i.e. is not always ready) on node 1
- * Fast consumer on node2
- * Send messages on node 1
- * Verify most go to node1 consumer, some go to node 2
- *
- * Test with slow consumer on node 1, Fast consumer on node 2
- *
- * Test with no consumer on node 1, consumers on other nodes
- *
- * Things up all the other permutations, then take a guess with error margin of
- * how many messages should be on each node.
- */
-
-
- checkNoMessageData();
- }
- finally
- {
- if (office1 != null)
- {
- office1.stop();
- }
-
- if (office2 != null)
- {
- office2.stop();
- }
-
- if (office3 != null)
- {
- office3.stop();
- }
-
- if (office4 != null)
- {
- office4.stop();
- }
-
- if (office5 != null)
- {
- office5.stop();
- }
- }
- }
-
- class ThrottleReceiver implements Receiver, Runnable
- {
- long pause;
-
- volatile int totalCount;
-
- int count;
-
- int maxSize;
-
- volatile boolean full;
-
- Executor executor;
-
- List dels;
-
- Channel queue;
-
- int getTotalCount()
- {
- return totalCount;
- }
-
- ThrottleReceiver(Channel queue, long pause, int maxSize)
- {
- this.queue = queue;
-
- this.pause = pause;
-
- this.maxSize = maxSize;
-
- this.executor = new QueuedExecutor();
-
- this.dels = new ArrayList();
- }
-
- public Delivery handle(DeliveryObserver observer, MessageReference reference, Transaction tx)
- {
- if (full)
- {
- return null;
- }
-
- //log.info(this + " got ref");
-
- //log.info("cnt:" + totalCount);
-
- SimpleDelivery del = new SimpleDelivery(observer, reference);
-
- dels.add(del);
-
- count++;
-
- totalCount++;
-
- if (count == maxSize)
- {
- full = true;
-
- count = 0;
-
- try
- {
- executor.execute(this);
- }
- catch (InterruptedException e)
- {
- //Ignore
- }
- }
-
- return del;
-
- }
-
- public void run()
- {
- //Simulate processing of messages
-
- try
- {
- Thread.sleep(pause);
- }
- catch (InterruptedException e)
- {
- //Ignore
- }
-
- Iterator iter = dels.iterator();
-
- while (iter.hasNext())
- {
- Delivery del = (Delivery)iter.next();
-
- try
- {
- del.acknowledge(null);
- }
- catch (Throwable t)
- {
- //Ignore
- }
- }
-
- dels.clear();
-
- full = false;
-
- queue.deliver(false);
- }
-
- }
-
- private void acknowledgeAll(SimpleReceiver receiver) throws Throwable
- {
- List messages = receiver.getMessages();
-
- Iterator iter = messages.iterator();
-
- while (iter.hasNext())
- {
- Message msg = (Message)iter.next();
-
- receiver.acknowledge(msg, null);
- }
-
- receiver.clear();
- }
-
-
- protected ClusteredPostOffice createClusteredPostOffice(int nodeId, String groupName) throws Exception
- {
- MessagePullPolicy pullPolicy = new DefaultMessagePullPolicy();
-
- FilterFactory ff = new SimpleFilterFactory();
-
- ClusterRouterFactory rf = new DefaultRouterFactory();
-
- DefaultClusteredPostOffice postOffice =
- new DefaultClusteredPostOffice(sc.getDataSource(), sc.getTransactionManager(),
- null, true, nodeId, "Clustered", ms, pm, tr, ff, pool,
- groupName,
- JGroupsUtil.getControlStackProperties(),
- JGroupsUtil.getDataStackProperties(),
- 10000, 10000, pullPolicy, rf, 1, 1000);
-
- postOffice.start();
-
- return postOffice;
- }
-
- // Private -------------------------------------------------------
-
- // Inner classes -------------------------------------------------
-
-}
-
-
-
-
Copied: trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/RedistributionWithDefaultMessagePullPolicyTest.java (from rev 1474, trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/RedistributionTest.java)
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/RedistributionTest.java 2006-10-16 22:03:03 UTC (rev 1474)
+++ trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/RedistributionWithDefaultMessagePullPolicyTest.java 2006-10-17 14:49:19 UTC (rev 1485)
@@ -0,0 +1,1236 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.test.messaging.core.plugin.postoffice.cluster;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+import org.jboss.messaging.core.Channel;
+import org.jboss.messaging.core.Delivery;
+import org.jboss.messaging.core.DeliveryObserver;
+import org.jboss.messaging.core.FilterFactory;
+import org.jboss.messaging.core.Message;
+import org.jboss.messaging.core.MessageReference;
+import org.jboss.messaging.core.Receiver;
+import org.jboss.messaging.core.SimpleDelivery;
+import org.jboss.messaging.core.plugin.contract.ClusteredPostOffice;
+import org.jboss.messaging.core.plugin.postoffice.Binding;
+import org.jboss.messaging.core.plugin.postoffice.cluster.ClusterRouterFactory;
+import org.jboss.messaging.core.plugin.postoffice.cluster.DefaultClusteredPostOffice;
+import org.jboss.messaging.core.plugin.postoffice.cluster.DefaultMessagePullPolicy;
+import org.jboss.messaging.core.plugin.postoffice.cluster.DefaultRouterFactory;
+import org.jboss.messaging.core.plugin.postoffice.cluster.LocalClusteredQueue;
+import org.jboss.messaging.core.plugin.postoffice.cluster.MessagePullPolicy;
+import org.jboss.messaging.core.tx.Transaction;
+import org.jboss.test.messaging.core.SimpleFilterFactory;
+import org.jboss.test.messaging.core.SimpleReceiver;
+import org.jboss.test.messaging.core.plugin.base.ClusteringTestBase;
+import org.jboss.test.messaging.util.CoreMessageFactory;
+
+import EDU.oswego.cs.dl.util.concurrent.Executor;
+import EDU.oswego.cs.dl.util.concurrent.QueuedExecutor;
+
+/**
+ *
+ * A RedistributionWithDefaultMessagePullPolicyTest
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * @version <tt>$Revision: 1.1 $</tt>
+ *
+ * $Id$
+ *
+ */
+public class RedistributionWithDefaultMessagePullPolicyTest extends ClusteringTestBase
+{
+ // Constants -----------------------------------------------------
+
+ // Static --------------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ public RedistributionWithDefaultMessagePullPolicyTest(String name)
+ {
+ super(name);
+ }
+
+ // Public --------------------------------------------------------
+
+ public void setUp() throws Exception
+ {
+ super.setUp();
+ }
+
+ public void tearDown() throws Exception
+ {
+ super.tearDown();
+ }
+
+
+ public void testConsumeAllNonPersistentNonRecoverable() throws Throwable
+ {
+ consumeAll(false, false);
+ }
+
+ public void testConsumeAllPersistentNonRecoverable() throws Throwable
+ {
+ consumeAll(true, false);
+ }
+
+ public void testConsumeAllNonPersistentRecoverable() throws Throwable
+ {
+ consumeAll(false, true);
+ }
+
+ public void testConsumeAllPersistentRecoverable() throws Throwable
+ {
+ consumeAll(true, true);
+ }
+
+
+
+ public void testConsumeBitByBitNonPersistentNonRecoverable() throws Throwable
+ {
+ consumeBitByBit(false, false);
+ }
+
+ public void testConsumeBitByBitPersistentNonRecoverable() throws Throwable
+ {
+ consumeBitByBit(true, false);
+ }
+
+ public void testConsumeBitByBitNonPersistentRecoverable() throws Throwable
+ {
+ consumeBitByBit(false, true);
+ }
+
+ public void testConsumeBitByBitPersistentRecoverable() throws Throwable
+ {
+ consumeBitByBit(true, true);
+ }
+
+
+
+
+//
+// public void testConsumeConcurrentlyNonPersistentNonRecoverable() throws Throwable
+// {
+// consumeConcurrently(false, false);
+// }
+//
+// public void testConsumeConsumeConcurrentlyPersistentNonRecoverable() throws Throwable
+// {
+// consumeConcurrently(true, false);
+// }
+//
+// public void testConsumeConsumeConcurrentlyNonPersistentRecoverable() throws Throwable
+// {
+// consumeConcurrently(false, true);
+// }
+//
+// public void testConsumeConsumeConcurrentlyPersistentRecoverable() throws Throwable
+// {
+// consumeConcurrently(true, true);
+// }
+
+ public void testSimpleMessagePull() throws Throwable
+ {
+ DefaultClusteredPostOffice office1 = null;
+
+ DefaultClusteredPostOffice office2 = null;
+
+ try
+ {
+ office1 = (DefaultClusteredPostOffice)createClusteredPostOffice(1, "testgroup");
+
+ office2 = (DefaultClusteredPostOffice)createClusteredPostOffice(2, "testgroup");
+
+ LocalClusteredQueue queue1 = new LocalClusteredQueue(office1, 1, "queue1", channelIdManager.getId(), ms, pm, true, true, (QueuedExecutor)pool.get(), null, tr);
+ Binding binding1 =
+ office1.bindClusteredQueue("queue1", queue1);
+
+ LocalClusteredQueue queue2 = new LocalClusteredQueue(office2, 2, "queue1", channelIdManager.getId(), ms, pm, true, true, (QueuedExecutor)pool.get(), null, tr);
+ Binding binding2 =
+ office2.bindClusteredQueue("queue1", queue2);
+
+ Message msg = CoreMessageFactory.createCoreMessage(1);
+ msg.setReliable(true);
+
+ MessageReference ref = ms.reference(msg);
+
+ office1.route(ref, "queue1", null);
+
+ Thread.sleep(2000);
+
+ //Messages should all be in queue1
+
+ List msgs = queue1.browse();
+ assertEquals(1, msgs.size());
+
+ msgs = queue2.browse();
+ assertTrue(msgs.isEmpty());
+
+ SimpleReceiver receiver1 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING_TO_MAX);
+ receiver1.setMaxRefs(0);
+ queue1.add(receiver1);
+ SimpleReceiver receiver2 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING_TO_MAX);
+ receiver2.setMaxRefs(0);
+ queue2.add(receiver2);
+
+ //Prompt delivery so the channels know if the receivers are ready
+ queue1.deliver(false);
+ Thread.sleep(2000);
+
+ //Pull from 1 to 2
+
+ receiver2.setMaxRefs(1);
+
+ log.info("delivering");
+ queue2.deliver(false);
+
+ Thread.sleep(3000);
+
+ assertTrue(office1.getHoldingTransactions().isEmpty());
+ assertTrue(office2.getHoldingTransactions().isEmpty());
+
+ log.info("r2 " + receiver2.getMessages().size());
+
+ log.info("queue1 refs:" + queue1.memoryRefCount() + " dels:" + queue1.memoryDeliveryCount());
+ log.info("queue2 refs:" + queue2.memoryRefCount() + " dels:" + queue2.memoryDeliveryCount());
+
+ assertEquals(0, queue1.memoryRefCount());
+ assertEquals(0, queue1.memoryDeliveryCount());
+
+ assertEquals(0, queue2.memoryRefCount());
+ assertEquals(1, queue2.memoryDeliveryCount());
+
+ this.acknowledgeAll(receiver2);
+
+ assertEquals(0, queue2.memoryRefCount());
+ assertEquals(0, queue2.memoryDeliveryCount());
+
+ assertTrue(office1.getHoldingTransactions().isEmpty());
+ assertTrue(office2.getHoldingTransactions().isEmpty());
+
+ }
+ finally
+ {
+ if (office1 != null)
+ {
+ office1.stop();
+ }
+
+ if (office2 != null)
+ {
+ office2.stop();
+ }
+ }
+ }
+
+ public void testSimpleMessagePullCrashBeforeCommit() throws Throwable
+ {
+ DefaultClusteredPostOffice office1 = null;
+
+ DefaultClusteredPostOffice office2 = null;
+
+ try
+ {
+ office1 = (DefaultClusteredPostOffice)createClusteredPostOffice(1, "testgroup");
+
+ office2 = (DefaultClusteredPostOffice)createClusteredPostOffice(2, "testgroup");
+
+ LocalClusteredQueue queue1 = new LocalClusteredQueue(office1, 1, "queue1", channelIdManager.getId(), ms, pm, true, true, (QueuedExecutor)pool.get(), null, tr);
+ Binding binding1 =
+ office1.bindClusteredQueue("queue1", queue1);
+
+ LocalClusteredQueue queue2 = new LocalClusteredQueue(office2, 2, "queue1", channelIdManager.getId(), ms, pm, true, true, (QueuedExecutor)pool.get(), null, tr);
+ Binding binding2 =
+ office2.bindClusteredQueue("queue1", queue2);
+
+ Message msg = CoreMessageFactory.createCoreMessage(1);
+ msg.setReliable(true);
+
+ MessageReference ref = ms.reference(msg);
+
+ office1.route(ref, "queue1", null);
+
+ Thread.sleep(2000);
+
+ //Messages should all be in queue1
+
+ List msgs = queue1.browse();
+ assertEquals(1, msgs.size());
+
+ msgs = queue2.browse();
+ assertTrue(msgs.isEmpty());
+
+ SimpleReceiver receiver1 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING_TO_MAX);
+ receiver1.setMaxRefs(0);
+ queue1.add(receiver1);
+ SimpleReceiver receiver2 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING_TO_MAX);
+ receiver2.setMaxRefs(0);
+ queue2.add(receiver2);
+
+ //Prompt delivery so the channels know if the receivers are ready
+ queue1.deliver(false);
+ Thread.sleep(2000);
+
+ //Pull from 1 to 2
+
+ receiver2.setMaxRefs(1);
+
+ //Force a failure before commit
+ office2.setFail(true, false);
+
+ log.info("delivering");
+ queue2.deliver(false);
+
+ Thread.sleep(3000);
+
+ assertEquals(1, office1.getHoldingTransactions().size());
+ assertTrue(office2.getHoldingTransactions().isEmpty());
+
+ log.info("queue1 refs:" + queue1.memoryRefCount() + " dels:" + queue1.memoryDeliveryCount());
+ log.info("queue2 refs:" + queue2.memoryRefCount() + " dels:" + queue2.memoryDeliveryCount());
+
+ assertEquals(0, queue1.memoryRefCount());
+ assertEquals(1, queue1.memoryDeliveryCount());
+
+ assertEquals(0, queue2.memoryRefCount());
+ assertEquals(0, queue2.memoryDeliveryCount());
+
+ //Now kill office 2 - this should cause office1 to remove the dead held transaction
+
+ office2.stop();
+ Thread.sleep(2000);
+
+ assertTrue(office1.getHoldingTransactions().isEmpty());
+
+ //The delivery should be cancelled back to the queue too
+
+ assertEquals(1, queue1.memoryRefCount());
+ assertEquals(0, queue1.memoryDeliveryCount());
+
+
+ }
+ finally
+ {
+ if (office1 != null)
+ {
+ office1.stop();
+ }
+
+ if (office2 != null)
+ {
+ office2.stop();
+ }
+ }
+ }
+
+ public void testSimpleMessagePullCrashAfterCommit() throws Throwable
+ {
+ DefaultClusteredPostOffice office1 = null;
+
+ DefaultClusteredPostOffice office2 = null;
+
+ try
+ {
+ office1 = (DefaultClusteredPostOffice)createClusteredPostOffice(1, "testgroup");
+
+ office2 = (DefaultClusteredPostOffice)createClusteredPostOffice(2, "testgroup");
+
+ LocalClusteredQueue queue1 = new LocalClusteredQueue(office1, 1, "queue1", channelIdManager.getId(), ms, pm, true, true, (QueuedExecutor)pool.get(), null, tr);
+ Binding binding1 =
+ office1.bindClusteredQueue("queue1", queue1);
+
+ LocalClusteredQueue queue2 = new LocalClusteredQueue(office2, 2, "queue1", channelIdManager.getId(), ms, pm, true, true, (QueuedExecutor)pool.get(), null, tr);
+ Binding binding2 =
+ office2.bindClusteredQueue("queue1", queue2);
+
+ Message msg = CoreMessageFactory.createCoreMessage(1);
+ msg.setReliable(true);
+
+ MessageReference ref = ms.reference(msg);
+
+ office1.route(ref, "queue1", null);
+
+ Thread.sleep(2000);
+
+ //Messages should all be in queue1
+
+ List msgs = queue1.browse();
+ assertEquals(1, msgs.size());
+
+ msgs = queue2.browse();
+ assertTrue(msgs.isEmpty());
+
+ SimpleReceiver receiver1 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING_TO_MAX);
+ receiver1.setMaxRefs(0);
+ queue1.add(receiver1);
+ SimpleReceiver receiver2 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING_TO_MAX);
+ receiver2.setMaxRefs(0);
+ queue2.add(receiver2);
+
+ //Prompt delivery so the channels know if the receivers are ready
+ queue1.deliver(false);
+ Thread.sleep(2000);
+
+ //Pull from 1 to 2
+
+ receiver2.setMaxRefs(1);
+
+ //Force a failure after commit the ack to storage
+ office2.setFail(false, true);
+
+ log.info("delivering");
+ queue2.deliver(false);
+
+ Thread.sleep(3000);
+
+ assertEquals(1, office1.getHoldingTransactions().size());
+ assertTrue(office2.getHoldingTransactions().isEmpty());
+
+ log.info("queue1 refs:" + queue1.memoryRefCount() + " dels:" + queue1.memoryDeliveryCount());
+ log.info("queue2 refs:" + queue2.memoryRefCount() + " dels:" + queue2.memoryDeliveryCount());
+
+ assertEquals(0, queue1.memoryRefCount());
+ assertEquals(1, queue1.memoryDeliveryCount());
+
+ //Now kill office 2 - this should cause office1 to remove the dead held transaction
+
+ office2.stop();
+ Thread.sleep(2000);
+
+ assertTrue(office1.getHoldingTransactions().isEmpty());
+
+ //The delivery should be committed
+
+ assertEquals(0, queue1.memoryRefCount());
+ assertEquals(0, queue1.memoryDeliveryCount());
+
+ }
+ finally
+ {
+ if (office1 != null)
+ {
+ office1.stop();
+ }
+
+ if (office2 != null)
+ {
+ office2.stop();
+ }
+ }
+ }
+
+ protected void consumeAll(boolean persistent, boolean recoverable) throws Throwable
+ {
+ DefaultClusteredPostOffice office1 = null;
+
+ DefaultClusteredPostOffice office2 = null;
+
+ DefaultClusteredPostOffice office3 = null;
+
+ DefaultClusteredPostOffice office4 = null;
+
+ DefaultClusteredPostOffice office5 = null;
+
+ try
+ {
+ office1 = (DefaultClusteredPostOffice)createClusteredPostOffice(1, "testgroup");
+
+ office2 = (DefaultClusteredPostOffice)createClusteredPostOffice(2, "testgroup");
+
+ office3 = (DefaultClusteredPostOffice)createClusteredPostOffice(3, "testgroup");
+
+ office4 = (DefaultClusteredPostOffice)createClusteredPostOffice(4, "testgroup");
+
+ office5 = (DefaultClusteredPostOffice)createClusteredPostOffice(5, "testgroup");
+
+ LocalClusteredQueue queue1 = new LocalClusteredQueue(office1, 1, "queue1", channelIdManager.getId(), ms, pm, true, recoverable, (QueuedExecutor)pool.get(), null, tr);
+ Binding binding1 = office1.bindClusteredQueue("queue1", queue1);
+
+ LocalClusteredQueue queue2 = new LocalClusteredQueue(office2, 2, "queue1", channelIdManager.getId(), ms, pm, true, recoverable, (QueuedExecutor)pool.get(), null, tr);
+ Binding binding2 = office2.bindClusteredQueue("queue1", queue2);
+
+ LocalClusteredQueue queue3 = new LocalClusteredQueue(office3, 3, "queue1", channelIdManager.getId(), ms, pm, true, recoverable, (QueuedExecutor)pool.get(), null, tr);
+ Binding binding3 = office3.bindClusteredQueue("queue1", queue3);
+
+ LocalClusteredQueue queue4 = new LocalClusteredQueue(office4, 4, "queue1", channelIdManager.getId(), ms, pm, true, recoverable, (QueuedExecutor)pool.get(), null, tr);
+ Binding binding4 = office4.bindClusteredQueue("queue1", queue4);
+
+ LocalClusteredQueue queue5 = new LocalClusteredQueue(office5, 5, "queue1", channelIdManager.getId(), ms, pm, true, recoverable, (QueuedExecutor)pool.get(), null, tr);
+ Binding binding5 = office5.bindClusteredQueue("queue1", queue5);
+
+ final int NUM_MESSAGES = 100;
+
+ this.sendMessages("queue1", persistent, office1, NUM_MESSAGES, null);
+ this.sendMessages("queue1", persistent, office2, NUM_MESSAGES, null);
+ this.sendMessages("queue1", persistent, office3, NUM_MESSAGES, null);
+ this.sendMessages("queue1", persistent, office4, NUM_MESSAGES, null);
+ this.sendMessages("queue1", persistent, office5, NUM_MESSAGES, null);
+
+ Thread.sleep(2000);
+
+ //Check the sizes
+
+ log.info("Here are the sizes:");
+ log.info("queue1, refs:" + queue1.memoryRefCount() + " dels:" + queue1.memoryDeliveryCount());
+ log.info("queue2, refs:" + queue2.memoryRefCount() + " dels:" + queue2.memoryDeliveryCount());
+ log.info("queue3, refs:" + queue3.memoryRefCount() + " dels:" + queue3.memoryDeliveryCount());
+ log.info("queue4, refs:" + queue4.memoryRefCount() + " dels:" + queue4.memoryDeliveryCount());
+ log.info("queue5, refs:" + queue5.memoryRefCount() + " dels:" + queue5.memoryDeliveryCount());
+
+ assertEquals(NUM_MESSAGES, queue1.memoryRefCount());
+ assertEquals(0, queue1.memoryDeliveryCount());
+
+ assertEquals(NUM_MESSAGES, queue2.memoryRefCount());
+ assertEquals(0, queue2.memoryDeliveryCount());
+
+ assertEquals(NUM_MESSAGES, queue3.memoryRefCount());
+ assertEquals(0, queue3.memoryDeliveryCount());
+
+ assertEquals(NUM_MESSAGES, queue4.memoryRefCount());
+ assertEquals(0, queue4.memoryDeliveryCount());
+
+ assertEquals(NUM_MESSAGES, queue5.memoryRefCount());
+ assertEquals(0, queue5.memoryDeliveryCount());
+
+ SimpleReceiver receiver = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
+
+ queue1.add(receiver);
+
+ queue1.deliver(false);
+
+ Thread.sleep(7000);
+
+ log.info("Here are the sizes:");
+ log.info("queue1, refs:" + queue1.memoryRefCount() + " dels:" + queue1.memoryDeliveryCount());
+ log.info("queue2, refs:" + queue2.memoryRefCount() + " dels:" + queue2.memoryDeliveryCount());
+ log.info("queue3, refs:" + queue3.memoryRefCount() + " dels:" + queue3.memoryDeliveryCount());
+ log.info("queue4, refs:" + queue4.memoryRefCount() + " dels:" + queue4.memoryDeliveryCount());
+ log.info("queue5, refs:" + queue5.memoryRefCount() + " dels:" + queue5.memoryDeliveryCount());
+
+ assertEquals(0, queue1.memoryRefCount());
+ assertEquals(NUM_MESSAGES * 5, queue1.memoryDeliveryCount());
+
+ assertEquals(0, queue2.memoryRefCount());
+ assertEquals(0, queue2.memoryDeliveryCount());
+
+ assertEquals(0, queue3.memoryRefCount());
+ assertEquals(0, queue3.memoryDeliveryCount());
+
+ assertEquals(0, queue4.memoryRefCount());
+ assertEquals(0, queue4.memoryDeliveryCount());
+
+ assertEquals(0, queue5.memoryRefCount());
+ assertEquals(0, queue5.memoryDeliveryCount());
+
+ List messages = receiver.getMessages();
+
+ assertNotNull(messages);
+
+ assertEquals(NUM_MESSAGES * 5, messages.size());
+
+ Iterator iter = messages.iterator();
+
+ while (iter.hasNext())
+ {
+ Message msg = (Message)iter.next();
+
+ receiver.acknowledge(msg, null);
+ }
+
+ receiver.clear();
+
+ assertEquals(0, queue1.memoryRefCount());
+ assertEquals(0, queue1.memoryDeliveryCount());
+
+ assertTrue(office1.getHoldingTransactions().isEmpty());
+ assertTrue(office2.getHoldingTransactions().isEmpty());
+ assertTrue(office3.getHoldingTransactions().isEmpty());
+ assertTrue(office4.getHoldingTransactions().isEmpty());
+ assertTrue(office5.getHoldingTransactions().isEmpty());
+
+ checkNoMessageData();
+ }
+ finally
+ {
+ if (office1 != null)
+ {
+ office1.stop();
+ }
+
+ if (office2 != null)
+ {
+ office2.stop();
+ }
+
+ if (office3 != null)
+ {
+ office3.stop();
+ }
+
+ if (office4 != null)
+ {
+ office4.stop();
+ }
+
+ if (office5 != null)
+ {
+ office5.stop();
+ }
+ }
+ }
+
+ protected void consumeBitByBit(boolean persistent, boolean recoverable) throws Throwable
+ {
+ DefaultClusteredPostOffice office1 = null;
+
+ DefaultClusteredPostOffice office2 = null;
+
+ DefaultClusteredPostOffice office3 = null;
+
+ DefaultClusteredPostOffice office4 = null;
+
+ DefaultClusteredPostOffice office5 = null;
+
+ try
+ {
+ office1 = (DefaultClusteredPostOffice)createClusteredPostOffice(1, "testgroup");
+
+ office2 = (DefaultClusteredPostOffice)createClusteredPostOffice(2, "testgroup");
+
+ office3 = (DefaultClusteredPostOffice)createClusteredPostOffice(3, "testgroup");
+
+ office4 = (DefaultClusteredPostOffice)createClusteredPostOffice(4, "testgroup");
+
+ office5 = (DefaultClusteredPostOffice)createClusteredPostOffice(5, "testgroup");
+
+ LocalClusteredQueue queue1 = new LocalClusteredQueue(office1, 1, "queue1", channelIdManager.getId(), ms, pm, true, recoverable, (QueuedExecutor)pool.get(), null, tr);
+ Binding binding1 = office1.bindClusteredQueue("queue1", queue1);
+
+ LocalClusteredQueue queue2 = new LocalClusteredQueue(office2, 2, "queue1", channelIdManager.getId(), ms, pm, true, recoverable, (QueuedExecutor)pool.get(), null, tr);
+ Binding binding2 = office2.bindClusteredQueue("queue1", queue2);
+
+ LocalClusteredQueue queue3 = new LocalClusteredQueue(office3, 3, "queue1", channelIdManager.getId(), ms, pm, true, recoverable, (QueuedExecutor)pool.get(), null, tr);
+ Binding binding3 = office3.bindClusteredQueue("queue1", queue3);
+
+ LocalClusteredQueue queue4 = new LocalClusteredQueue(office4, 4, "queue1", channelIdManager.getId(), ms, pm, true, recoverable, (QueuedExecutor)pool.get(), null, tr);
+ Binding binding4 = office4.bindClusteredQueue("queue1", queue4);
+
+ LocalClusteredQueue queue5 = new LocalClusteredQueue(office5, 5, "queue1", channelIdManager.getId(), ms, pm, true, recoverable, (QueuedExecutor)pool.get(), null, tr);
+ Binding binding5 = office5.bindClusteredQueue("queue1", queue5);
+
+ final int NUM_MESSAGES = 100;
+
+ this.sendMessages("queue1", persistent, office1, NUM_MESSAGES, null);
+ this.sendMessages("queue1", persistent, office2, NUM_MESSAGES, null);
+ this.sendMessages("queue1", persistent, office3, NUM_MESSAGES, null);
+ this.sendMessages("queue1", persistent, office4, NUM_MESSAGES, null);
+ this.sendMessages("queue1", persistent, office5, NUM_MESSAGES, null);
+
+ Thread.sleep(2000);
+
+ //Check the sizes
+
+ log.info("Here are the sizes 1:");
+ log.info("queue1, refs:" + queue1.memoryRefCount() + " dels:" + queue1.memoryDeliveryCount());
+ log.info("queue2, refs:" + queue2.memoryRefCount() + " dels:" + queue2.memoryDeliveryCount());
+ log.info("queue3, refs:" + queue3.memoryRefCount() + " dels:" + queue3.memoryDeliveryCount());
+ log.info("queue4, refs:" + queue4.memoryRefCount() + " dels:" + queue4.memoryDeliveryCount());
+ log.info("queue5, refs:" + queue5.memoryRefCount() + " dels:" + queue5.memoryDeliveryCount());
+
+ assertEquals(NUM_MESSAGES, queue1.memoryRefCount());
+ assertEquals(0, queue1.memoryDeliveryCount());
+
+ assertEquals(NUM_MESSAGES, queue2.memoryRefCount());
+ assertEquals(0, queue2.memoryDeliveryCount());
+
+ assertEquals(NUM_MESSAGES, queue3.memoryRefCount());
+ assertEquals(0, queue3.memoryDeliveryCount());
+
+ assertEquals(NUM_MESSAGES, queue4.memoryRefCount());
+ assertEquals(0, queue4.memoryDeliveryCount());
+
+ assertEquals(NUM_MESSAGES, queue5.memoryRefCount());
+ assertEquals(0, queue5.memoryDeliveryCount());
+
+ assertTrue(office1.getHoldingTransactions().isEmpty());
+ assertTrue(office2.getHoldingTransactions().isEmpty());
+ assertTrue(office3.getHoldingTransactions().isEmpty());
+ assertTrue(office4.getHoldingTransactions().isEmpty());
+ assertTrue(office5.getHoldingTransactions().isEmpty());
+
+ SimpleReceiver receiver1 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING_TO_MAX);
+ queue1.add(receiver1);
+ SimpleReceiver receiver2 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING_TO_MAX);
+ queue2.add(receiver2);
+ SimpleReceiver receiver3 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING_TO_MAX);
+ queue3.add(receiver3);
+ SimpleReceiver receiver4 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING_TO_MAX);
+ queue4.add(receiver4);
+ SimpleReceiver receiver5 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING_TO_MAX);
+ queue5.add(receiver5);
+
+ receiver1.setMaxRefs(5);
+ queue1.deliver(false);
+ Thread.sleep(1000);
+ assertEquals(NUM_MESSAGES - 5, queue1.memoryRefCount());
+ assertEquals(5, queue1.memoryDeliveryCount());
+
+ acknowledgeAll(receiver1);
+ assertEquals(0, queue1.memoryDeliveryCount());
+ receiver1.setMaxRefs(0);
+
+ receiver2.setMaxRefs(10);
+ queue2.deliver(false);
+ Thread.sleep(1000);
+ assertEquals(NUM_MESSAGES - 10, queue2.memoryRefCount());
+ assertEquals(10, queue2.memoryDeliveryCount());
+ acknowledgeAll(receiver2);
+ receiver2.setMaxRefs(0);
+
+ receiver3.setMaxRefs(15);
+ queue3.deliver(false);
+ Thread.sleep(1000);
+ assertEquals(NUM_MESSAGES - 15, queue3.memoryRefCount());
+ assertEquals(15, queue3.memoryDeliveryCount());
+ acknowledgeAll(receiver3);
+ receiver3.setMaxRefs(0);
+
+ receiver4.setMaxRefs(20);
+ queue4.deliver(false);
+ Thread.sleep(1000);
+ assertEquals(NUM_MESSAGES - 20, queue4.memoryRefCount());
+ assertEquals(20, queue4.memoryDeliveryCount());
+ acknowledgeAll(receiver4);
+ receiver4.setMaxRefs(0);
+
+ receiver5.setMaxRefs(25);
+ queue5.deliver(false);
+ Thread.sleep(1000);
+ assertEquals(NUM_MESSAGES - 25, queue5.memoryRefCount());
+ assertEquals(25, queue5.memoryDeliveryCount());
+ acknowledgeAll(receiver5);
+ receiver5.setMaxRefs(0);
+
+ Thread.sleep(1000);
+
+ assertTrue(office1.getHoldingTransactions().isEmpty());
+ assertTrue(office2.getHoldingTransactions().isEmpty());
+ assertTrue(office3.getHoldingTransactions().isEmpty());
+ assertTrue(office4.getHoldingTransactions().isEmpty());
+ assertTrue(office5.getHoldingTransactions().isEmpty());
+
+ log.info("Here are the sizes 2:");
+ log.info("queue1, refs:" + queue1.memoryRefCount() + " dels:" + queue1.memoryDeliveryCount());
+ log.info("queue2, refs:" + queue2.memoryRefCount() + " dels:" + queue2.memoryDeliveryCount());
+ log.info("queue3, refs:" + queue3.memoryRefCount() + " dels:" + queue3.memoryDeliveryCount());
+ log.info("queue4, refs:" + queue4.memoryRefCount() + " dels:" + queue4.memoryDeliveryCount());
+ log.info("queue5, refs:" + queue5.memoryRefCount() + " dels:" + queue5.memoryDeliveryCount());
+
+ //Consume the rest from queue 5
+ receiver5.setMaxRefs(NUM_MESSAGES - 25);
+ queue5.deliver(false);
+ Thread.sleep(5000);
+
+ log.info("receiver5 msgs:" + receiver5.getMessages().size());
+
+ log.info("Here are the sizes 3:");
+ log.info("queue1, refs:" + queue1.memoryRefCount() + " dels:" + queue1.memoryDeliveryCount());
+ log.info("queue2, refs:" + queue2.memoryRefCount() + " dels:" + queue2.memoryDeliveryCount());
+ log.info("queue3, refs:" + queue3.memoryRefCount() + " dels:" + queue3.memoryDeliveryCount());
+ log.info("queue4, refs:" + queue4.memoryRefCount() + " dels:" + queue4.memoryDeliveryCount());
+ log.info("queue5, refs:" + queue5.memoryRefCount() + " dels:" + queue5.memoryDeliveryCount());
+
+ //This will result in an extra one being pulled from queue1 - we cannot avoid this
+ //This is because the channel does not know that the receiver is full unless it tries
+ //with a ref so it needs to retrieve one
+
+ assertEquals(NUM_MESSAGES - 6, queue1.memoryRefCount());
+ assertEquals(0, queue1.memoryDeliveryCount());
+
+ assertEquals(NUM_MESSAGES - 10, queue2.memoryRefCount());
+ assertEquals(0, queue2.memoryDeliveryCount());
+
+ assertEquals(NUM_MESSAGES - 15, queue3.memoryRefCount());
+ assertEquals(0, queue3.memoryDeliveryCount());
+
+ assertEquals(NUM_MESSAGES - 20, queue4.memoryRefCount());
+ assertEquals(0, queue4.memoryDeliveryCount());
+
+ assertEquals(1, queue5.memoryRefCount());
+ assertEquals(NUM_MESSAGES - 25, queue5.memoryDeliveryCount());
+
+ acknowledgeAll(receiver5);
+
+ assertEquals(0, queue5.memoryDeliveryCount());
+
+ receiver5.setMaxRefs(0);
+
+ assertTrue(office1.getHoldingTransactions().isEmpty());
+ assertTrue(office2.getHoldingTransactions().isEmpty());
+ assertTrue(office3.getHoldingTransactions().isEmpty());
+ assertTrue(office4.getHoldingTransactions().isEmpty());
+ assertTrue(office5.getHoldingTransactions().isEmpty());
+
+ //Now consume 5 more from queue5, they should come from queue1 which has the most messages
+
+ log.info("Consume 5 more from queue 5");
+
+ receiver5.setMaxRefs(5);
+ queue5.deliver(false);
+ Thread.sleep(5000);
+
+ log.info("Here are the sizes 4:");
+ log.info("queue1, refs:" + queue1.memoryRefCount() + " dels:" + queue1.memoryDeliveryCount());
+ log.info("queue2, refs:" + queue2.memoryRefCount() + " dels:" + queue2.memoryDeliveryCount());
+ log.info("queue3, refs:" + queue3.memoryRefCount() + " dels:" + queue3.memoryDeliveryCount());
+ log.info("queue4, refs:" + queue4.memoryRefCount() + " dels:" + queue4.memoryDeliveryCount());
+ log.info("queue5, refs:" + queue5.memoryRefCount() + " dels:" + queue5.memoryDeliveryCount());
+
+ assertEquals(NUM_MESSAGES - 11, queue1.memoryRefCount());
+
+ assertEquals(0, queue1.memoryDeliveryCount());
+
+ assertEquals(NUM_MESSAGES - 10, queue2.memoryRefCount());
+ assertEquals(0, queue2.memoryDeliveryCount());
+
+ assertEquals(NUM_MESSAGES - 15, queue3.memoryRefCount());
+ assertEquals(0, queue3.memoryDeliveryCount());
+
+ assertEquals(NUM_MESSAGES - 20, queue4.memoryRefCount());
+ assertEquals(0, queue4.memoryDeliveryCount());
+
+ assertEquals(1, queue5.memoryRefCount());
+ assertEquals(5, queue5.memoryDeliveryCount());
+
+ acknowledgeAll(receiver5);
+
+ assertEquals(0, queue5.memoryDeliveryCount());
+
+ receiver1.setMaxRefs(0);
+
+ assertTrue(office1.getHoldingTransactions().isEmpty());
+ assertTrue(office2.getHoldingTransactions().isEmpty());
+ assertTrue(office3.getHoldingTransactions().isEmpty());
+ assertTrue(office4.getHoldingTransactions().isEmpty());
+ assertTrue(office5.getHoldingTransactions().isEmpty());
+
+ //Consume 1 more - should pull one from queue2
+
+ receiver5.setMaxRefs(1);
+ queue5.deliver(false);
+ Thread.sleep(2000);
+
+ log.info("Here are the sizes 5:");
+ log.info("queue1, refs:" + queue1.memoryRefCount() + " dels:" + queue1.memoryDeliveryCount());
+ log.info("queue2, refs:" + queue2.memoryRefCount() + " dels:" + queue2.memoryDeliveryCount());
+ log.info("queue3, refs:" + queue3.memoryRefCount() + " dels:" + queue3.memoryDeliveryCount());
+ log.info("queue4, refs:" + queue4.memoryRefCount() + " dels:" + queue4.memoryDeliveryCount());
+ log.info("queue5, refs:" + queue5.memoryRefCount() + " dels:" + queue5.memoryDeliveryCount());
+
+ assertEquals(NUM_MESSAGES - 11, queue1.memoryRefCount());
+ assertEquals(0, queue1.memoryDeliveryCount());
+
+ assertEquals(NUM_MESSAGES - 11, queue2.memoryRefCount());
+ assertEquals(0, queue2.memoryDeliveryCount());
+
+ assertEquals(NUM_MESSAGES - 15, queue3.memoryRefCount());
+ assertEquals(0, queue3.memoryDeliveryCount());
+
+ assertEquals(NUM_MESSAGES - 20, queue4.memoryRefCount());
+ assertEquals(0, queue4.memoryDeliveryCount());
+
+ assertEquals(1, queue5.memoryRefCount());
+ assertEquals(1, queue5.memoryDeliveryCount());
+
+ acknowledgeAll(receiver5);
+
+ assertEquals(0, queue5.memoryDeliveryCount());
+
+ receiver5.setMaxRefs(0);
+
+ assertTrue(office1.getHoldingTransactions().isEmpty());
+ assertTrue(office2.getHoldingTransactions().isEmpty());
+ assertTrue(office3.getHoldingTransactions().isEmpty());
+ assertTrue(office4.getHoldingTransactions().isEmpty());
+ assertTrue(office5.getHoldingTransactions().isEmpty());
+
+ //From queue 4 consume everything else
+
+ receiver4.setMaxRefs(NUM_MESSAGES - 15 + NUM_MESSAGES - 20 + NUM_MESSAGES - 11 + NUM_MESSAGES - 11 + 1);
+ queue4.deliver(false);
+ Thread.sleep(7000);
+
+ log.info("Here are the sizes 6:");
+ log.info("queue1, refs:" + queue1.memoryRefCount() + " dels:" + queue1.memoryDeliveryCount());
+ log.info("queue2, refs:" + queue2.memoryRefCount() + " dels:" + queue2.memoryDeliveryCount());
+ log.info("queue3, refs:" + queue3.memoryRefCount() + " dels:" + queue3.memoryDeliveryCount());
+ log.info("queue4, refs:" + queue4.memoryRefCount() + " dels:" + queue4.memoryDeliveryCount());
+ log.info("queue5, refs:" + queue5.memoryRefCount() + " dels:" + queue5.memoryDeliveryCount());
+
+ assertEquals(0, queue1.memoryRefCount());
+ assertEquals(0, queue1.memoryDeliveryCount());
+
+ assertEquals(0, queue2.memoryRefCount());
+ assertEquals(0, queue2.memoryDeliveryCount());
+
+ assertEquals(0, queue3.memoryRefCount());
+ assertEquals(0, queue3.memoryDeliveryCount());
+
+ assertEquals(0, queue4.memoryRefCount());
+ assertEquals(NUM_MESSAGES - 15 + NUM_MESSAGES - 20 + NUM_MESSAGES - 11 + NUM_MESSAGES - 11 + 1, queue4.memoryDeliveryCount());
+
+ assertEquals(0, queue5.memoryRefCount());
+ assertEquals(0, queue5.memoryDeliveryCount());
+
+ acknowledgeAll(receiver4);
+
+ assertEquals(0, queue4.memoryDeliveryCount());
+
+ assertTrue(office1.getHoldingTransactions().isEmpty());
+ assertTrue(office2.getHoldingTransactions().isEmpty());
+ assertTrue(office3.getHoldingTransactions().isEmpty());
+ assertTrue(office4.getHoldingTransactions().isEmpty());
+ assertTrue(office5.getHoldingTransactions().isEmpty());
+
+ checkNoMessageData();
+ }
+ finally
+ {
+ if (office1 != null)
+ {
+ office1.stop();
+ }
+
+ if (office2 != null)
+ {
+ office2.stop();
+ }
+
+ if (office3 != null)
+ {
+ office3.stop();
+ }
+
+ if (office4 != null)
+ {
+ office4.stop();
+ }
+
+ if (office5 != null)
+ {
+ office5.stop();
+ }
+ }
+ }
+
+ protected void consumeConcurrently(boolean persistent, boolean recoverable) throws Throwable
+ {
+ DefaultClusteredPostOffice office1 = null;
+
+ DefaultClusteredPostOffice office2 = null;
+
+ DefaultClusteredPostOffice office3 = null;
+
+ DefaultClusteredPostOffice office4 = null;
+
+ DefaultClusteredPostOffice office5 = null;
+
+ try
+ {
+ office1 = (DefaultClusteredPostOffice)createClusteredPostOffice(1, "testgroup");
+
+ office2 = (DefaultClusteredPostOffice)createClusteredPostOffice(2, "testgroup");
+
+ office3 = (DefaultClusteredPostOffice)createClusteredPostOffice(3, "testgroup");
+
+ office4 = (DefaultClusteredPostOffice)createClusteredPostOffice(4, "testgroup");
+
+ office5 = (DefaultClusteredPostOffice)createClusteredPostOffice(5, "testgroup");
+
+ LocalClusteredQueue queue1 = new LocalClusteredQueue(office1, 1, "queue1", channelIdManager.getId(), ms, pm, true, recoverable, (QueuedExecutor)pool.get(), null, tr);
+ Binding binding1 = office1.bindClusteredQueue("queue1", queue1);
+
+ LocalClusteredQueue queue2 = new LocalClusteredQueue(office2, 2, "queue1", channelIdManager.getId(), ms, pm, true, recoverable, (QueuedExecutor)pool.get(), null, tr);
+ Binding binding2 = office2.bindClusteredQueue("queue1", queue2);
+
+ LocalClusteredQueue queue3 = new LocalClusteredQueue(office3, 3, "queue1", channelIdManager.getId(), ms, pm, true, recoverable, (QueuedExecutor)pool.get(), null, tr);
+ Binding binding3 = office3.bindClusteredQueue("queue1", queue3);
+
+ LocalClusteredQueue queue4 = new LocalClusteredQueue(office4, 4, "queue1", channelIdManager.getId(), ms, pm, true, recoverable, (QueuedExecutor)pool.get(), null, tr);
+ Binding binding4 = office4.bindClusteredQueue("queue1", queue4);
+
+ LocalClusteredQueue queue5 = new LocalClusteredQueue(office5, 5, "queue1", channelIdManager.getId(), ms, pm, true, recoverable, (QueuedExecutor)pool.get(), null, tr);
+ Binding binding5 = office5.bindClusteredQueue("queue1", queue5);
+
+ //Test with no consumers on queue1
+
+ //Two equal consumers on queue2 and queue3
+
+ //Add messages at queue 1
+
+ final int NUM_MESSAGES = 10000;
+
+ this.sendMessages("queue1", persistent, office1, NUM_MESSAGES, null);
+
+ log.info("sent messages");
+
+ Thread.sleep(4000);
+
+ ThrottleReceiver receiver1 = new ThrottleReceiver(queue1, 0, 50);
+ queue1.add(receiver1);
+ queue1.deliver(false);
+
+ ThrottleReceiver receiver2 = new ThrottleReceiver(queue2, 0, 50);
+ queue2.add(receiver2);
+ queue2.deliver(false);
+
+ Thread.sleep(45000);
+
+ log.info("receiver1: " + receiver1.getTotalCount());
+
+ log.info("receiver2: " + receiver2.getTotalCount());
+
+
+ //test1
+
+
+ //No consumer on node 1
+ //Very slow consumer on node 2
+ //
+
+ /*
+ * Test with very fast, infinitely big consumer (i.e. is always ready) on node 1
+ * Fast consumer on node2
+ * Send messages on node 1
+ * Verify all go to node1 consumer
+ *
+ * Test with very fast, not infinitely big consumer (i.e. is not always ready) on node 1
+ * Fast consumer on node2
+ * Send messages on node 1
+ * Verify most go to node1 consumer, some go to node 2
+ *
+ * Test with slow consumer on node 1, Fast consumer on node 2
+ *
+ * Test with no consumer on node 1, consumers on other nodes
+ *
+ * Things up all the other permutations, then take a guess with error margin of
+ * how many messages should be on each node.
+ */
+
+
+ checkNoMessageData();
+ }
+ finally
+ {
+ if (office1 != null)
+ {
+ office1.stop();
+ }
+
+ if (office2 != null)
+ {
+ office2.stop();
+ }
+
+ if (office3 != null)
+ {
+ office3.stop();
+ }
+
+ if (office4 != null)
+ {
+ office4.stop();
+ }
+
+ if (office5 != null)
+ {
+ office5.stop();
+ }
+ }
+ }
+
+ class ThrottleReceiver implements Receiver, Runnable
+ {
+ long pause;
+
+ volatile int totalCount;
+
+ int count;
+
+ int maxSize;
+
+ volatile boolean full;
+
+ Executor executor;
+
+ List dels;
+
+ Channel queue;
+
+ int getTotalCount()
+ {
+ return totalCount;
+ }
+
+ ThrottleReceiver(Channel queue, long pause, int maxSize)
+ {
+ this.queue = queue;
+
+ this.pause = pause;
+
+ this.maxSize = maxSize;
+
+ this.executor = new QueuedExecutor();
+
+ this.dels = new ArrayList();
+ }
+
+ public Delivery handle(DeliveryObserver observer, MessageReference reference, Transaction tx)
+ {
+ if (full)
+ {
+ return null;
+ }
+
+ //log.info(this + " got ref");
+
+ //log.info("cnt:" + totalCount);
+
+ SimpleDelivery del = new SimpleDelivery(observer, reference);
+
+ dels.add(del);
+
+ count++;
+
+ totalCount++;
+
+ if (count == maxSize)
+ {
+ full = true;
+
+ count = 0;
+
+ try
+ {
+ executor.execute(this);
+ }
+ catch (InterruptedException e)
+ {
+ //Ignore
+ }
+ }
+
+ return del;
+
+ }
+
+ public void run()
+ {
+ //Simulate processing of messages
+
+ try
+ {
+ Thread.sleep(pause);
+ }
+ catch (InterruptedException e)
+ {
+ //Ignore
+ }
+
+ Iterator iter = dels.iterator();
+
+ while (iter.hasNext())
+ {
+ Delivery del = (Delivery)iter.next();
+
+ try
+ {
+ del.acknowledge(null);
+ }
+ catch (Throwable t)
+ {
+ //Ignore
+ }
+ }
+
+ dels.clear();
+
+ full = false;
+
+ queue.deliver(false);
+ }
+
+ }
+
+ private void acknowledgeAll(SimpleReceiver receiver) throws Throwable
+ {
+ List messages = receiver.getMessages();
+
+ Iterator iter = messages.iterator();
+
+ while (iter.hasNext())
+ {
+ Message msg = (Message)iter.next();
+
+ receiver.acknowledge(msg, null);
+ }
+
+ receiver.clear();
+ }
+
+
+ protected ClusteredPostOffice createClusteredPostOffice(int nodeId, String groupName) throws Exception
+ {
+ MessagePullPolicy pullPolicy = new DefaultMessagePullPolicy();
+
+ FilterFactory ff = new SimpleFilterFactory();
+
+ ClusterRouterFactory rf = new DefaultRouterFactory();
+
+ DefaultClusteredPostOffice postOffice =
+ new DefaultClusteredPostOffice(sc.getDataSource(), sc.getTransactionManager(),
+ null, true, nodeId, "Clustered", ms, pm, tr, ff, pool,
+ groupName,
+ JGroupsUtil.getControlStackProperties(),
+ JGroupsUtil.getDataStackProperties(),
+ 10000, 10000, pullPolicy, rf, 1, 1000);
+
+ postOffice.start();
+
+ return postOffice;
+ }
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+
+}
+
+
+
+
More information about the jboss-cvs-commits
mailing list