[Jboss-cvs] JBoss Messaging SVN: r1346 - in trunk: src/main/org/jboss/messaging/core/plugin/postoffice/cluster tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Fri Sep 22 11:53:37 EDT 2006
Author: timfox
Date: 2006-09-22 11:53:27 -0400 (Fri, 22 Sep 2006)
New Revision: 1346
Added:
trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/DefaultMessagePullPolicyTest.java
trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/DefaultRouterTest.java
Removed:
trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/BasicRedistributionPolicyTest.java
trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/FavourLocalRouterTest.java
Modified:
trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/ClusteredQueue.java
trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOffice.java
trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultMessagePullPolicy.java
trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/MessagePullPolicy.java
trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/NullMessagePullPolicy.java
trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/QueueStats.java
trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOfficeTest.java
Log:
More clustering work
Modified: trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/ClusteredQueue.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/ClusteredQueue.java 2006-09-22 12:29:44 UTC (rev 1345)
+++ trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/ClusteredQueue.java 2006-09-22 15:53:27 UTC (rev 1346)
@@ -33,7 +33,7 @@
* $Id$
*
*/
-interface ClusteredQueue extends Queue
+public interface ClusteredQueue extends Queue
{
QueueStats getStats();
Modified: trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOffice.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOffice.java 2006-09-22 12:29:44 UTC (rev 1345)
+++ trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOffice.java 2006-09-22 15:53:27 UTC (rev 1346)
@@ -391,26 +391,28 @@
ClusteredQueue queue = (ClusteredQueue)del.getObserver();
+ log.info("Routing message to queue:" + queue.getName() + " on node " + queue.getNodeId());
+
+ if (router.numberOfReceivers() > 1)
+ {
+ //We have now chosen which one will receive the message so we need to add this
+ //information to a map which will get sent when casting - so that the queue
+ //on the receiving node knows whether to receive the message
+ if (queueNameNodeIdMap == null)
+ {
+ queueNameNodeIdMap = new HashMap();
+ }
+
+ queueNameNodeIdMap.put(queue.getName(), queue.getNodeId());
+ }
+
if (!queue.isLocal())
{
//We need to send the message remotely
numberRemote++;
- lastNodeId = queue.getNodeId();
-
- if (router.numberOfReceivers() > 1 && queueNameNodeIdMap == null)
- {
- //If there are more than one queues with the same node on the remote nodes
- //We have now chosen which one will receive the message so we need to add this
- //information to a map which will get sent when casting - so the the queue
- //on the receiving node knows whether to receive the message
- queueNameNodeIdMap = new HashMap();
-
- //We add an entry to the map so that on the receiving node we can work out which
- //queue instance will receive the message
- queueNameNodeIdMap.put(queue.getName(), lastNodeId);
- }
-
+ lastNodeId = queue.getNodeId();
+
lastChannelId = queue.getChannelID();
}
}
@@ -650,9 +652,9 @@
{
boolean handle = true;
+ //log.info("Queue map is: " + queueNameNodeIdMap);
if (queueNameNodeIdMap != null)
- {
- // log.info("I have a queue map");
+ {
String desiredNodeId = (String)queueNameNodeIdMap.get(binding.getQueue().getName());
//When there are more than one queues with the same name across the cluster we only
@@ -665,16 +667,17 @@
}
if (handle)
- {
- log.info(this.nodeId + " is handling it");
+ {
//It's a local binding so we pass the message on to the subscription
LocalClusteredQueue queue = (LocalClusteredQueue)binding.getQueue();
+ log.info(queue.getName() + " is handling it on node " + queue.getNodeId());
+
Delivery del = queue.handleFromCluster(ref);
- log.info("Handled it: " + del);
- log.info("accepted: " +del.isSelectorAccepted());
+ //log.info("Handled it: " + del);
+ //log.info("accepted: " +del.isSelectorAccepted());
}
else
{
@@ -894,7 +897,7 @@
if (localQueue != null)
{
- RemoteQueueStub toQueue = messagePullPolicy.chooseQueue(router.getQueues());
+ RemoteQueueStub toQueue = (RemoteQueueStub)messagePullPolicy.chooseQueue(router.getQueues());
if (toQueue != null)
{
Modified: trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultMessagePullPolicy.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultMessagePullPolicy.java 2006-09-22 12:29:44 UTC (rev 1345)
+++ trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultMessagePullPolicy.java 2006-09-22 15:53:27 UTC (rev 1346)
@@ -38,11 +38,11 @@
public class DefaultMessagePullPolicy implements MessagePullPolicy
{
- public RemoteQueueStub chooseQueue(List queues)
+ public ClusteredQueue chooseQueue(List queues)
{
Iterator iter = queues.iterator();
- RemoteQueueStub chosenQueue = null;
+ ClusteredQueue chosenQueue = null;
int maxMessages = 0;
@@ -60,7 +60,7 @@
{
maxMessages = cnt;
- chosenQueue = (RemoteQueueStub)queue;
+ chosenQueue = queue;
}
}
}
Modified: trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/MessagePullPolicy.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/MessagePullPolicy.java 2006-09-22 12:29:44 UTC (rev 1345)
+++ trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/MessagePullPolicy.java 2006-09-22 15:53:27 UTC (rev 1346)
@@ -35,5 +35,5 @@
*/
public interface MessagePullPolicy
{
- RemoteQueueStub chooseQueue(List queues);
+ ClusteredQueue chooseQueue(List queues);
}
Modified: trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/NullMessagePullPolicy.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/NullMessagePullPolicy.java 2006-09-22 12:29:44 UTC (rev 1345)
+++ trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/NullMessagePullPolicy.java 2006-09-22 15:53:27 UTC (rev 1346)
@@ -40,7 +40,7 @@
{
}
- public RemoteQueueStub chooseQueue(List queues)
+ public ClusteredQueue chooseQueue(List queues)
{
return null;
}
Modified: trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/QueueStats.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/QueueStats.java 2006-09-22 12:29:44 UTC (rev 1345)
+++ trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/QueueStats.java 2006-09-22 15:53:27 UTC (rev 1346)
@@ -35,7 +35,7 @@
* $Id$
*
*/
-class QueueStats implements Streamable
+public class QueueStats implements Streamable
{
private String queueName;
@@ -50,7 +50,7 @@
}
//QueueStats(String queueName, float addRate, float consumeRate, int messageCount)
- QueueStats(String queueName, int messageCount)
+ public QueueStats(String queueName, int messageCount)
{
this.queueName = queueName;
Deleted: trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/BasicRedistributionPolicyTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/BasicRedistributionPolicyTest.java 2006-09-22 12:29:44 UTC (rev 1345)
+++ trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/BasicRedistributionPolicyTest.java 2006-09-22 15:53:27 UTC (rev 1346)
@@ -1,76 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2005, JBoss Inc., and individual contributors as indicated
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-package org.jboss.test.messaging.core.plugin.postoffice.cluster;
-
-import org.jboss.test.messaging.MessagingTestCase;
-import org.jboss.test.messaging.tools.ServerManagement;
-import org.jboss.test.messaging.tools.jmx.ServiceContainer;
-
-
-public class BasicRedistributionPolicyTest extends MessagingTestCase
-{
- // Constants -----------------------------------------------------
-
- // Static --------------------------------------------------------
-
- // Attributes ----------------------------------------------------
-
- protected ServiceContainer sc;
-
- // Constructors --------------------------------------------------
-
- public BasicRedistributionPolicyTest(String name)
- {
- super(name);
- }
-
- // Public --------------------------------------------------------
-
- public void setUp() throws Exception
- {
- super.setUp();
-
- sc = new ServiceContainer("all");
-
- sc.start();
-
- log.debug("setup done");
- }
-
- public void tearDown() throws Exception
- {
- if (!ServerManagement.isRemote())
- {
- sc.stop();
- sc = null;
- }
-
- super.tearDown();
- }
-
- // Private -------------------------------------------------------
-
- // Inner classes -------------------------------------------------
-
-}
-
-
Modified: trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOfficeTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOfficeTest.java 2006-09-22 12:29:44 UTC (rev 1345)
+++ trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOfficeTest.java 2006-09-22 15:53:27 UTC (rev 1346)
@@ -1177,7 +1177,9 @@
//Send 3 messages at node1
//========================
+ log.info("******** sending");
List msgs = sendMessages(persistent, office1, 3, null);
+ log.info("********** sent");
//n2
checkContainsAndAcknowledge(msgs, receiver1, nonDurable1);
@@ -1271,6 +1273,14 @@
//Send 3 messages at node4
//========================
+// * node1: no subscriptions
+// * node2: 2 non durable
+// * node3: 1 non shared durable, 1 non durable
+// * node4: 1 shared durable (shared1), 1 non shared durable, 3 non durable
+// * node5: 2 shared durable (shared1 and shared2)
+// * node6: 1 shared durable (shared2), 1 non durable
+// * node7: 1 shared durable (shared2)
+
msgs = sendMessages(persistent, office4, 3, null);
//n2
@@ -1289,7 +1299,7 @@
checkContainsAndAcknowledge(msgs, receiver9, nonDurable6);
//n5
- checkEmpty(receiver10);
+ checkEmpty(receiver10);
checkContainsAndAcknowledge(msgs, receiver11, sharedDurable3);
//n6
@@ -1408,26 +1418,63 @@
if (office3 != null)
{
+ try
+ {
+ office3.unbindClusteredQueue("nonshareddurable1");
+ }
+ catch (Exception ignore)
+ {
+ }
office3.stop();
}
if (office4 != null)
{
+ try
+ {
+ office4.unbindClusteredQueue("shareddurable1");
+ office4.unbindClusteredQueue("nonshareddurable2");
+ }
+ catch (Exception ignore)
+ {
+ }
office4.stop();
}
if (office5 != null)
- {
+ {
+ try
+ {
+ office5.unbindClusteredQueue("shareddurable1");
+ office5.unbindClusteredQueue("shareddurable2");
+ }
+ catch (Exception ignore)
+ {
+ }
office5.stop();
}
if (office6 != null)
- {
+ {
+ try
+ {
+ office6.unbindClusteredQueue("shareddurable2");
+ }
+ catch (Exception ignore)
+ {
+ }
office6.stop();
}
if (office7 != null)
- {
+ {
+ try
+ {
+ office7.unbindClusteredQueue("shareddurable2"); //unbind on node7 itself; node6 is handled in its own block
+ }
+ catch (Exception ignore) //best-effort cleanup: a failed unbind must not keep office7.stop() from running
+ {
+ }
office7.stop();
}
@@ -1436,67 +1483,9 @@
}
- private List sendMessages(boolean persistent, PostOffice office, int num, Transaction tx) throws Exception
- {
- List list = new ArrayList();
-
- Message msg = CoreMessageFactory.createCoreMessage(1, persistent, null);
-
- MessageReference ref = ms.reference(msg);
-
- boolean routed = office.route(ref, "topic", null);
-
- assertTrue(routed);
-
- list.add(msg);
-
- Thread.sleep(1000);
-
- return list;
- }
+
- private void checkContainsAndAcknowledge(Message msg, SimpleReceiver receiver, Queue queue) throws Throwable
- {
- List msgs = receiver.getMessages();
- assertNotNull(msgs);
- assertEquals(1, msgs.size());
- Message msgRec = (Message)msgs.get(0);
- assertEquals(msg.getMessageID(), msgRec.getMessageID());
- receiver.acknowledge(msgRec, null);
- msgs = queue.browse();
- assertNotNull(msgs);
- assertTrue(msgs.isEmpty());
- receiver.clear();
- }
- private void checkContainsAndAcknowledge(List msgList, SimpleReceiver receiver, Queue queue) throws Throwable
- {
- List msgs = receiver.getMessages();
- assertNotNull(msgs);
- assertEquals(msgList.size(), msgs.size());
-
- for (int i = 0; i < msgList.size(); i++)
- {
- Message msgRec = (Message)msgs.get(i);
- Message msgCheck = (Message)msgList.get(i);
- assertEquals(msgCheck.getMessageID(), msgRec.getMessageID());
- receiver.acknowledge(msgRec, null);
- }
-
- msgs = queue.browse();
- assertNotNull(msgs);
- assertTrue(msgs.isEmpty());
- receiver.clear();
- }
-
- private void checkEmpty(SimpleReceiver receiver) throws Throwable
- {
- List msgs = receiver.getMessages();
- assertNotNull(msgs);
- assertTrue(msgs.isEmpty());
- }
-
-
protected void clusteredTransactionalRoute(boolean persistent) throws Throwable
{
ClusteredPostOffice office1 = null;
@@ -2120,6 +2109,66 @@
}
// Private -------------------------------------------------------
+
+ private List sendMessages(boolean persistent, PostOffice office, int num, Transaction tx) throws Exception
+ {
+ List list = new ArrayList();
+
+ Message msg = CoreMessageFactory.createCoreMessage(1, persistent, null);
+
+ MessageReference ref = ms.reference(msg);
+
+ boolean routed = office.route(ref, "topic", null);
+
+ assertTrue(routed);
+
+ list.add(msg);
+
+ Thread.sleep(1000);
+
+ return list;
+ }
+
+ private void checkContainsAndAcknowledge(Message msg, SimpleReceiver receiver, Queue queue) throws Throwable
+ {
+ List msgs = receiver.getMessages();
+ assertNotNull(msgs);
+ assertEquals(1, msgs.size());
+ Message msgRec = (Message)msgs.get(0);
+ assertEquals(msg.getMessageID(), msgRec.getMessageID());
+ receiver.acknowledge(msgRec, null);
+ msgs = queue.browse();
+ assertNotNull(msgs);
+ assertTrue(msgs.isEmpty());
+ receiver.clear();
+ }
+
+ private void checkContainsAndAcknowledge(List msgList, SimpleReceiver receiver, Queue queue) throws Throwable
+ {
+ List msgs = receiver.getMessages();
+ assertNotNull(msgs);
+ assertEquals(msgList.size(), msgs.size());
+
+ for (int i = 0; i < msgList.size(); i++)
+ {
+ Message msgRec = (Message)msgs.get(i);
+ Message msgCheck = (Message)msgList.get(i);
+ assertEquals(msgCheck.getMessageID(), msgRec.getMessageID());
+ receiver.acknowledge(msgRec, null);
+ }
+
+ msgs = queue.browse();
+ assertNotNull(msgs);
+ assertTrue(msgs.isEmpty());
+ receiver.clear();
+ }
+
+ private void checkEmpty(SimpleReceiver receiver) throws Throwable
+ {
+ List msgs = receiver.getMessages();
+ assertNotNull(msgs);
+ assertTrue(msgs.isEmpty());
+ }
// Inner classes -------------------------------------------------
Copied: trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/DefaultMessagePullPolicyTest.java (from rev 1298, trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/BasicRedistributionPolicyTest.java)
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/BasicRedistributionPolicyTest.java 2006-09-17 17:58:08 UTC (rev 1298)
+++ trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/DefaultMessagePullPolicyTest.java 2006-09-22 15:53:27 UTC (rev 1346)
@@ -0,0 +1,377 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.test.messaging.core.plugin.postoffice.cluster;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+import org.jboss.messaging.core.Delivery;
+import org.jboss.messaging.core.DeliveryObserver;
+import org.jboss.messaging.core.Filter;
+import org.jboss.messaging.core.MessageReference;
+import org.jboss.messaging.core.Receiver;
+import org.jboss.messaging.core.plugin.postoffice.cluster.ClusteredQueue;
+import org.jboss.messaging.core.plugin.postoffice.cluster.DefaultMessagePullPolicy;
+import org.jboss.messaging.core.plugin.postoffice.cluster.MessagePullPolicy;
+import org.jboss.messaging.core.plugin.postoffice.cluster.QueueStats;
+import org.jboss.messaging.core.tx.Transaction;
+import org.jboss.test.messaging.MessagingTestCase;
+import org.jboss.test.messaging.tools.ServerManagement;
+import org.jboss.test.messaging.tools.jmx.ServiceContainer;
+
+
+public class DefaultMessagePullPolicyTest extends MessagingTestCase
+{
+ // Constants -----------------------------------------------------
+
+ // Static --------------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ protected ServiceContainer sc;
+
+ // Constructors --------------------------------------------------
+
+ public DefaultMessagePullPolicyTest(String name)
+ {
+ super(name);
+ }
+
+ // Public --------------------------------------------------------
+
+ public void setUp() throws Exception
+ {
+ super.setUp();
+
+ sc = new ServiceContainer("all");
+
+ sc.start();
+
+ log.debug("setup done");
+ }
+
+ public void tearDown() throws Exception
+ {
+ if (!ServerManagement.isRemote())
+ {
+ sc.stop();
+ sc = null;
+ }
+
+ super.tearDown();
+ }
+
+ public void test1() throws Exception
+ {
+ MessagePullPolicy policy = new DefaultMessagePullPolicy();
+
+ List queues = new ArrayList();
+
+ ClusteredQueue queue1 = new DummyClusteredQueue(true, "queue1", 1000);
+
+ queues.add(queue1);
+
+ ClusteredQueue queue2 = new DummyClusteredQueue(false, "queue2", 435);
+
+ queues.add(queue2);
+
+ ClusteredQueue queue3 = new DummyClusteredQueue(false, "queue3", 12);
+
+ queues.add(queue3);
+
+ ClusteredQueue queue4 = new DummyClusteredQueue(false, "queue4", 900);
+
+ queues.add(queue4);
+
+ ClusteredQueue queue5 = new DummyClusteredQueue(false, "queue5", 0);
+
+ queues.add(queue5);
+
+ ClusteredQueue chosen = policy.chooseQueue(queues);
+
+ assertTrue(chosen == queue4);
+ }
+
+ public void test2() throws Exception
+ {
+ MessagePullPolicy policy = new DefaultMessagePullPolicy();
+
+ List queues = new ArrayList();
+
+ ClusteredQueue queue1 = new DummyClusteredQueue(true, "queue1", 0);
+
+ queues.add(queue1);
+
+ ClusteredQueue queue2 = new DummyClusteredQueue(false, "queue2", 0);
+
+ queues.add(queue2);
+
+ ClusteredQueue queue3 = new DummyClusteredQueue(false, "queue3", 0);
+
+ queues.add(queue3);
+
+ ClusteredQueue queue4 = new DummyClusteredQueue(false, "queue4", 0);
+
+ queues.add(queue4);
+
+ ClusteredQueue queue5 = new DummyClusteredQueue(false, "queue5", 0);
+
+ queues.add(queue5);
+
+ ClusteredQueue chosen = policy.chooseQueue(queues);
+
+ assertNull(chosen);
+ }
+
+ public void test3() throws Exception
+ {
+ MessagePullPolicy policy = new DefaultMessagePullPolicy();
+
+ List queues = new ArrayList();
+
+ ClusteredQueue queue1 = new DummyClusteredQueue(true, "queue1", 0);
+
+ queues.add(queue1);
+
+ ClusteredQueue chosen = policy.chooseQueue(queues);
+
+ assertNull(chosen);
+ }
+
+ public void test4() throws Exception
+ {
+ MessagePullPolicy policy = new DefaultMessagePullPolicy();
+
+ List queues = new ArrayList();
+
+ ClusteredQueue chosen = policy.chooseQueue(queues);
+
+ assertNull(chosen);
+ }
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+
+ class DummyClusteredQueue implements ClusteredQueue
+ {
+ private boolean local;
+
+ private String queueName;
+
+ private int msgCount;
+
+ DummyClusteredQueue(boolean local, String queueName, int msgCount)
+ {
+ this.local = local;
+ this.queueName = queueName;
+ this.msgCount = msgCount;
+ }
+
+ public String getNodeId()
+ {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ public QueueStats getStats()
+ {
+ return new QueueStats(queueName, msgCount);
+ }
+
+ public boolean isLocal()
+ {
+ return local;
+ }
+
+ public Filter getFilter()
+ {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ public String getName()
+ {
+ return queueName;
+ }
+
+ public boolean isClustered()
+ {
+ return true;
+ }
+
+ public void acknowledge(Delivery d, Transaction tx) throws Throwable
+ {
+ // TODO Auto-generated method stub
+
+ }
+
+ public void cancel(Delivery d) throws Throwable
+ {
+ // TODO Auto-generated method stub
+
+ }
+
+ public boolean acceptReliableMessages()
+ {
+ // TODO Auto-generated method stub
+ return false;
+ }
+
+ public void activate()
+ {
+ // TODO Auto-generated method stub
+
+ }
+
+ public List browse()
+ {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ public List browse(Filter filter)
+ {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ public void clear()
+ {
+ // TODO Auto-generated method stub
+
+ }
+
+ public void close()
+ {
+ // TODO Auto-generated method stub
+
+ }
+
+ public void deactivate()
+ {
+ // TODO Auto-generated method stub
+
+ }
+
+ public void deliver(boolean synchronous)
+ {
+ // TODO Auto-generated method stub
+
+ }
+
+ public List delivering(Filter filter)
+ {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ public long getChannelID()
+ {
+ // TODO Auto-generated method stub
+ return 0;
+ }
+
+ public boolean isActive()
+ {
+ // TODO Auto-generated method stub
+ return false;
+ }
+
+ public boolean isRecoverable()
+ {
+ // TODO Auto-generated method stub
+ return false;
+ }
+
+ public void load() throws Exception
+ {
+ // TODO Auto-generated method stub
+
+ }
+
+ public int messageCount()
+ {
+ // TODO Auto-generated method stub
+ return 0;
+ }
+
+ public void removeAllReferences() throws Throwable
+ {
+ // TODO Auto-generated method stub
+
+ }
+
+ public List undelivered(Filter filter)
+ {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ public void unload() throws Exception
+ {
+ // TODO Auto-generated method stub
+
+ }
+
+ public Delivery handle(DeliveryObserver observer, MessageReference reference, Transaction tx)
+ {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ public boolean add(Receiver receiver)
+ {
+ // TODO Auto-generated method stub
+ return false;
+ }
+
+ public boolean contains(Receiver receiver)
+ {
+ // TODO Auto-generated method stub
+ return false;
+ }
+
+ public Iterator iterator()
+ {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ public int numberOfReceivers()
+ {
+ // TODO Auto-generated method stub
+ return 0;
+ }
+
+ public boolean remove(Receiver receiver)
+ {
+ // TODO Auto-generated method stub
+ return false;
+ }
+
+ }
+
+}
+
+
Copied: trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/DefaultRouterTest.java (from rev 1345, trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/FavourLocalRouterTest.java)
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/FavourLocalRouterTest.java 2006-09-22 12:29:44 UTC (rev 1345)
+++ trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/DefaultRouterTest.java 2006-09-22 15:53:27 UTC (rev 1346)
@@ -0,0 +1,522 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.test.messaging.core.plugin.postoffice.cluster;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.jboss.jms.server.QueuedExecutorPool;
+import org.jboss.messaging.core.FilterFactory;
+import org.jboss.messaging.core.Message;
+import org.jboss.messaging.core.MessageReference;
+import org.jboss.messaging.core.Queue;
+import org.jboss.messaging.core.plugin.IdManager;
+import org.jboss.messaging.core.plugin.JDBCPersistenceManager;
+import org.jboss.messaging.core.plugin.SimpleMessageStore;
+import org.jboss.messaging.core.plugin.contract.ClusteredPostOffice;
+import org.jboss.messaging.core.plugin.contract.MessageStore;
+import org.jboss.messaging.core.plugin.contract.PersistenceManager;
+import org.jboss.messaging.core.plugin.contract.PostOffice;
+import org.jboss.messaging.core.plugin.postoffice.Binding;
+import org.jboss.messaging.core.plugin.postoffice.cluster.ClusterRouterFactory;
+import org.jboss.messaging.core.plugin.postoffice.cluster.DefaultClusteredPostOffice;
+import org.jboss.messaging.core.plugin.postoffice.cluster.DefaultRouterFactory;
+import org.jboss.messaging.core.plugin.postoffice.cluster.LocalClusteredQueue;
+import org.jboss.messaging.core.plugin.postoffice.cluster.MessagePullPolicy;
+import org.jboss.messaging.core.plugin.postoffice.cluster.NullMessagePullPolicy;
+import org.jboss.messaging.core.tx.Transaction;
+import org.jboss.messaging.core.tx.TransactionRepository;
+import org.jboss.test.messaging.MessagingTestCase;
+import org.jboss.test.messaging.core.SimpleFilterFactory;
+import org.jboss.test.messaging.core.SimpleReceiver;
+import org.jboss.test.messaging.tools.ServerManagement;
+import org.jboss.test.messaging.tools.jmx.ServiceContainer;
+import org.jboss.test.messaging.util.CoreMessageFactory;
+
+import EDU.oswego.cs.dl.util.concurrent.QueuedExecutor;
+
+/**
+ *
+ * A DefaultRouterTest
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * @version <tt>$Revision: 1.1 $</tt>
+ *
+ * $Id$
+ *
+ */
+public class DefaultRouterTest extends MessagingTestCase
+{
+ // Constants -----------------------------------------------------
+
+ // Static --------------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ protected ServiceContainer sc;
+
+ protected IdManager im;
+
+ protected PersistenceManager pm;
+
+ protected MessageStore ms;
+
+ protected TransactionRepository tr;
+
+ protected QueuedExecutorPool pool;
+
+ // Constructors --------------------------------------------------
+
+ public DefaultRouterTest(String name)
+ {
+ super(name);
+ }
+
+ // Public --------------------------------------------------------
+
+ /**
+ * Starts the service container, then builds the persistence manager, transaction
+ * repository, message store, executor pool and channel id manager used by the tests.
+ */
+ public void setUp() throws Exception
+ {
+ super.setUp();
+ sc = new ServiceContainer("all");
+ sc.start();
+
+ pm =
+ new JDBCPersistenceManager(sc.getDataSource(), sc.getTransactionManager(), null,
+ true, true, true, 100);
+ pm.start();
+
+ tr = new TransactionRepository(pm, new IdManager("TRANSACTION_ID", 10, pm));
+ tr.start();
+
+ ms = new SimpleMessageStore();
+ ms.start();
+ // Unlike sc, pm, tr and ms, the pool and the id manager are only constructed here, not started.
+ pool = new QueuedExecutorPool(10);
+ im = new IdManager("CHANNEL_ID", 10, pm);
+ log.debug("setup done");
+ }
+
+ // Stops sc (unless the server is remote), then pm, tr and ms. NOTE(review): pool is not touched here - confirm.
+ public void tearDown() throws Exception
+ {
+ if (!ServerManagement.isRemote())
+ {
+ sc.stop();
+ sc = null;
+ }
+ pm.stop();
+ tr.stop();
+ ms.stop();
+ super.tearDown();
+ }
+
+ public void testNotLocalPersistent() throws Throwable
+ {
+ notLocal(true);
+ }
+
+ public void testNotLocalNonPersistent() throws Throwable
+ {
+ notLocal(false);
+ }
+
+ public void testLocalPersistent() throws Throwable
+ {
+ local(true);
+ }
+
+ public void testLocalNonPersistent() throws Throwable
+ {
+ local(false);
+ }
+
+ protected void notLocal(boolean persistent) throws Throwable
+ {
+ ClusteredPostOffice office1 = null;
+
+ ClusteredPostOffice office2 = null;
+
+ ClusteredPostOffice office3 = null;
+
+ ClusteredPostOffice office4 = null;
+
+ ClusteredPostOffice office5 = null;
+
+ ClusteredPostOffice office6 = null;
+
+ try
+ {
+ office1 = createClusteredPostOffice("node1", "testgroup");
+
+ office2 = createClusteredPostOffice("node2", "testgroup");
+
+ office3 = createClusteredPostOffice("node3", "testgroup");
+
+ office4 = createClusteredPostOffice("node4", "testgroup");
+
+ office5 = createClusteredPostOffice("node5", "testgroup");
+
+ office6 = createClusteredPostOffice("node6", "testgroup");
+
+ LocalClusteredQueue queue1 = new LocalClusteredQueue(office2, "node2", "queue1", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null);
+ Binding binding1 = office2.bindClusteredQueue("topic", queue1);
+ SimpleReceiver receiver1 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
+ queue1.add(receiver1);
+
+ LocalClusteredQueue queue2 = new LocalClusteredQueue(office3, "node3", "queue1", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null);
+ Binding binding2 = office3.bindClusteredQueue("topic", queue2);
+ SimpleReceiver receiver2 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
+ queue2.add(receiver2);
+
+ LocalClusteredQueue queue3 = new LocalClusteredQueue(office4, "node4", "queue1", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null);
+ Binding binding3 = office4.bindClusteredQueue("topic", queue3);
+ SimpleReceiver receiver3 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
+ queue3.add(receiver3);
+
+ LocalClusteredQueue queue4 = new LocalClusteredQueue(office5, "node5", "queue1", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null);
+ Binding binding4 = office5.bindClusteredQueue("topic", queue4);
+ SimpleReceiver receiver4 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
+ queue4.add(receiver4);
+
+ LocalClusteredQueue queue5 = new LocalClusteredQueue(office6, "node6", "queue1", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null);
+ Binding binding5 = office6.bindClusteredQueue("topic", queue5);
+ SimpleReceiver receiver5 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
+ queue5.add(receiver5);
+
+ List msgs = sendMessages(persistent, office1, 3, null);
+ checkContainsAndAcknowledge(msgs, receiver1, queue1);
+ checkEmpty(receiver2);
+ checkEmpty(receiver3);
+ checkEmpty(receiver4);
+ checkEmpty(receiver5);
+
+ msgs = sendMessages(persistent, office1, 3, null);
+ checkEmpty(receiver1);
+ checkContainsAndAcknowledge(msgs, receiver2, queue1);
+ checkEmpty(receiver3);
+ checkEmpty(receiver4);
+ checkEmpty(receiver5);
+
+ msgs = sendMessages(persistent, office1, 3, null);
+ checkEmpty(receiver1);
+ checkEmpty(receiver2);
+ checkContainsAndAcknowledge(msgs, receiver3, queue1);
+ checkEmpty(receiver4);
+ checkEmpty(receiver5);
+
+ msgs = sendMessages(persistent, office1, 3, null);
+ checkEmpty(receiver1);
+ checkEmpty(receiver2);
+ checkEmpty(receiver3);
+ checkContainsAndAcknowledge(msgs, receiver4, queue1);
+ checkEmpty(receiver5);
+
+ msgs = sendMessages(persistent, office1, 3, null);
+ checkEmpty(receiver1);
+ checkEmpty(receiver2);
+ checkEmpty(receiver3);
+ checkEmpty(receiver4);
+ checkContainsAndAcknowledge(msgs, receiver5, queue1);
+
+ msgs = sendMessages(persistent, office1, 3, null);
+ checkContainsAndAcknowledge(msgs, receiver1, queue1);
+ checkEmpty(receiver2);
+ checkEmpty(receiver3);
+ checkEmpty(receiver4);
+ checkEmpty(receiver5);
+
+ msgs = sendMessages(persistent, office1, 3, null);
+ checkEmpty(receiver1);
+ checkContainsAndAcknowledge(msgs, receiver2, queue1);
+ checkEmpty(receiver3);
+ checkEmpty(receiver4);
+ checkEmpty(receiver5);
+
+
+ }
+ finally
+ {
+ if (office1 != null)
+ {
+ office1.stop();
+ }
+
+ if (office2 != null)
+ {
+ office2.stop();
+ }
+
+ if (office3 != null)
+ {
+ office3.stop();
+ }
+
+ if (office4 != null)
+ {
+ office4.stop();
+ }
+
+ if (office5 != null)
+ {
+ office5.stop();
+ }
+
+ if (office6 != null)
+ {
+ office6.stop();
+ }
+ }
+ }
+
+
+ /**
+ * Binds a clustered queue named "queue1" on each of node2..node6, then routes three times from
+ * node2 and three times from node3. The assertions expect every message to reach only the
+ * receiver on the sending node's own queue; each delivery is checked against that same queue.
+ * @param persistent forwarded to sendMessages() for every message sent
+ */
+ protected void local(boolean persistent) throws Throwable
+ {
+ ClusteredPostOffice office1 = null;
+ ClusteredPostOffice office2 = null;
+ ClusteredPostOffice office3 = null;
+ ClusteredPostOffice office4 = null;
+ ClusteredPostOffice office5 = null;
+ ClusteredPostOffice office6 = null;
+
+ try
+ {
+ office1 = createClusteredPostOffice("node1", "testgroup");
+
+ office2 = createClusteredPostOffice("node2", "testgroup");
+
+ office3 = createClusteredPostOffice("node3", "testgroup");
+
+ office4 = createClusteredPostOffice("node4", "testgroup");
+
+ office5 = createClusteredPostOffice("node5", "testgroup");
+
+ office6 = createClusteredPostOffice("node6", "testgroup");
+
+ LocalClusteredQueue queue1 = new LocalClusteredQueue(office2, "node2", "queue1", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null);
+ Binding binding1 = office2.bindClusteredQueue("topic", queue1);
+ SimpleReceiver receiver1 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
+ queue1.add(receiver1);
+
+ LocalClusteredQueue queue2 = new LocalClusteredQueue(office3, "node3", "queue1", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null);
+ Binding binding2 = office3.bindClusteredQueue("topic", queue2);
+ SimpleReceiver receiver2 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
+ queue2.add(receiver2);
+
+ LocalClusteredQueue queue3 = new LocalClusteredQueue(office4, "node4", "queue1", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null);
+ Binding binding3 = office4.bindClusteredQueue("topic", queue3);
+ SimpleReceiver receiver3 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
+ queue3.add(receiver3);
+
+ LocalClusteredQueue queue4 = new LocalClusteredQueue(office5, "node5", "queue1", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null);
+ Binding binding4 = office5.bindClusteredQueue("topic", queue4);
+ SimpleReceiver receiver4 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
+ queue4.add(receiver4);
+
+ LocalClusteredQueue queue5 = new LocalClusteredQueue(office6, "node6", "queue1", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null);
+ Binding binding5 = office6.bindClusteredQueue("topic", queue5);
+ SimpleReceiver receiver5 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
+ queue5.add(receiver5);
+ // Routed from node2: only its own receiver (receiver1 on queue1) is expected to see the messages
+ List msgs = sendMessages(persistent, office2, 3, null);
+ checkContainsAndAcknowledge(msgs, receiver1, queue1);
+ checkEmpty(receiver2);
+ checkEmpty(receiver3);
+ checkEmpty(receiver4);
+ checkEmpty(receiver5);
+
+ msgs = sendMessages(persistent, office2, 3, null);
+ checkContainsAndAcknowledge(msgs, receiver1, queue1);
+ checkEmpty(receiver2);
+ checkEmpty(receiver3);
+ checkEmpty(receiver4);
+ checkEmpty(receiver5);
+
+ msgs = sendMessages(persistent, office2, 3, null);
+ checkContainsAndAcknowledge(msgs, receiver1, queue1);
+ checkEmpty(receiver2);
+ checkEmpty(receiver3);
+ checkEmpty(receiver4);
+ checkEmpty(receiver5);
+
+ // Routed from node3: only its own receiver (receiver2 on queue2) is expected to see the messages
+ msgs = sendMessages(persistent, office3, 3, null);
+ checkEmpty(receiver1);
+ checkContainsAndAcknowledge(msgs, receiver2, queue2);
+ checkEmpty(receiver3);
+ checkEmpty(receiver4);
+ checkEmpty(receiver5);
+
+ msgs = sendMessages(persistent, office3, 3, null);
+ checkEmpty(receiver1);
+ checkContainsAndAcknowledge(msgs, receiver2, queue2);
+ checkEmpty(receiver3);
+ checkEmpty(receiver4);
+ checkEmpty(receiver5);
+
+ msgs = sendMessages(persistent, office3, 3, null);
+ checkEmpty(receiver1);
+ checkContainsAndAcknowledge(msgs, receiver2, queue2);
+ checkEmpty(receiver3);
+ checkEmpty(receiver4);
+ checkEmpty(receiver5);
+
+ }
+ finally
+ {
+ if (office1 != null)
+ {
+ office1.stop();
+ }
+
+ if (office2 != null)
+ {
+ office2.stop();
+ }
+
+ if (office3 != null)
+ {
+ office3.stop();
+ }
+
+ if (office4 != null)
+ {
+ office4.stop();
+ }
+
+ if (office5 != null)
+ {
+ office5.stop();
+ }
+
+ if (office6 != null)
+ {
+ office6.stop();
+ }
+ }
+ }
+
+
+
+ /**
+ * Creates and starts a DefaultClusteredPostOffice for the given node id and group name, wired
+ * with a NullMessagePullPolicy, a SimpleFilterFactory and a DefaultRouterFactory.
+ */
+ protected ClusteredPostOffice createClusteredPostOffice(String nodeId, String groupName) throws Exception
+ {
+ MessagePullPolicy pullPolicy = new NullMessagePullPolicy();
+ FilterFactory filterFactory = new SimpleFilterFactory();
+ ClusterRouterFactory routerFactory = new DefaultRouterFactory();
+
+ DefaultClusteredPostOffice office =
+ new DefaultClusteredPostOffice(sc.getDataSource(), sc.getTransactionManager(),
+ null, true, nodeId, "Clustered", ms, pm, tr, filterFactory, pool, groupName,
+ JGroupsUtil.getControlStackProperties(),
+ JGroupsUtil.getDataStackProperties(),
+ 5000, 5000, pullPolicy, routerFactory, 1);
+ office.start();
+
+ return office;
+ }
+
+ // Private -------------------------------------------------------
+
+ //TODO these methods are duplicated from DefaultClusteredPostOfficeTest - put in common super class or somewhere
+ //else
+
+ /**
+ * Creates one message via CoreMessageFactory.createCoreMessage(1, persistent, null), routes it to
+ * "topic" through the given office, asserts it was routed and returns a list holding just it.
+ * NOTE(review): num and tx are never read - exactly one message is routed per call and route()
+ * always gets null as its third argument; callers pass num=3 but assert on this 1-element list.
+ */
+ private List sendMessages(boolean persistent, PostOffice office, int num, Transaction tx) throws Exception
+ {
+ Message message = CoreMessageFactory.createCoreMessage(1, persistent, null);
+ MessageReference reference = ms.reference(message);
+ assertTrue(office.route(reference, "topic", null));
+
+ List sent = new ArrayList();
+ sent.add(message);
+ // presumably gives the cluster time to deliver to the receivers before the caller checks - TODO confirm
+ Thread.sleep(1000);
+ return sent;
+ }
+
+
+ private void checkContainsAndAcknowledge(Message msg, SimpleReceiver receiver, Queue queue) throws Throwable
+ {
+ List msgs = receiver.getMessages();
+ assertNotNull(msgs);
+ assertEquals(1, msgs.size());
+ Message msgRec = (Message)msgs.get(0);
+ assertEquals(msg.getMessageID(), msgRec.getMessageID());
+ receiver.acknowledge(msgRec, null);
+ msgs = queue.browse();
+ assertNotNull(msgs);
+ assertTrue(msgs.isEmpty());
+ receiver.clear();
+ }
+
+ // Expects receiver to hold the messages of msgList (same count, same ids, same order) and acknowledges each one.
+ private void checkContainsAndAcknowledge(List msgList, SimpleReceiver receiver, Queue queue) throws Throwable
+ {
+ List received = receiver.getMessages();
+ assertNotNull(received);
+ assertEquals(msgList.size(), received.size());
+ int idx = 0;
+ while (idx < msgList.size())
+ {
+ Message actual = (Message)received.get(idx);
+ Message expected = (Message)msgList.get(idx);
+ assertEquals(expected.getMessageID(), actual.getMessageID());
+ receiver.acknowledge(actual, null);
+ idx++;
+ }
+ List remaining = queue.browse();
+ assertNotNull(remaining);
+ assertTrue(remaining.isEmpty());
+ receiver.clear();
+ }
+
+ private void checkEmpty(SimpleReceiver receiver) throws Throwable
+ {
+ List msgs = receiver.getMessages();
+ assertNotNull(msgs);
+ assertTrue(msgs.isEmpty());
+ }
+
+ // Inner classes -------------------------------------------------
+
+
+}
+
+
+
Deleted: trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/FavourLocalRouterTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/FavourLocalRouterTest.java 2006-09-22 12:29:44 UTC (rev 1345)
+++ trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/FavourLocalRouterTest.java 2006-09-22 15:53:27 UTC (rev 1346)
@@ -1,172 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2005, JBoss Inc., and individual contributors as indicated
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-package org.jboss.test.messaging.core.plugin.postoffice.cluster;
-
-import java.util.List;
-
-import org.jboss.messaging.core.FilterFactory;
-import org.jboss.messaging.core.Message;
-import org.jboss.messaging.core.MessageReference;
-import org.jboss.messaging.core.plugin.contract.ClusteredPostOffice;
-import org.jboss.messaging.core.plugin.postoffice.Binding;
-import org.jboss.messaging.core.plugin.postoffice.cluster.ClusterRouterFactory;
-import org.jboss.messaging.core.plugin.postoffice.cluster.DefaultClusteredPostOffice;
-import org.jboss.messaging.core.plugin.postoffice.cluster.DefaultRouterFactory;
-import org.jboss.messaging.core.plugin.postoffice.cluster.LocalClusteredQueue;
-import org.jboss.messaging.core.plugin.postoffice.cluster.MessagePullPolicy;
-import org.jboss.messaging.core.plugin.postoffice.cluster.NullMessagePullPolicy;
-import org.jboss.test.messaging.core.SimpleFilterFactory;
-import org.jboss.test.messaging.core.plugin.postoffice.DefaultPostOfficeTest;
-import org.jboss.test.messaging.util.CoreMessageFactory;
-
-import EDU.oswego.cs.dl.util.concurrent.QueuedExecutor;
-
-public class FavourLocalRouterTest extends DefaultPostOfficeTest
-{
- // Constants -----------------------------------------------------
-
- // Static --------------------------------------------------------
-
- // Attributes ----------------------------------------------------
-
- // Constructors --------------------------------------------------
-
- public FavourLocalRouterTest(String name)
- {
- super(name);
- }
-
- // Public --------------------------------------------------------
-
- public void setUp() throws Exception
- {
- super.setUp();
-
- }
-
- public void tearDown() throws Exception
- {
- super.tearDown();
- }
-
- public void testNoLocalQueue() throws Throwable
- {
- ClusteredPostOffice office1 = null;
-
- ClusteredPostOffice office2 = null;
-
- try
- {
- office1 = createClusteredPostOffice("node1", "testgroup");
-
- office2 = createClusteredPostOffice("node2", "testgroup");
-
- office1 = createClusteredPostOffice("node1", "testgroup");
-
- office2 = createClusteredPostOffice("node2", "testgroup");
-
- LocalClusteredQueue queue1 = new LocalClusteredQueue(office1, "node1", "queue1", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null);
-
- Binding binding1 = office1.bindClusteredQueue("queue1", queue1);
-
- LocalClusteredQueue queue2 = new LocalClusteredQueue(office2, "node2", "queue1", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null);
-
- Binding binding2 = office2.bindClusteredQueue("queue1", queue1);
-
- final int NUM_MESSAGES = 10;
- for (int i = 0; i < NUM_MESSAGES; i++)
- {
- Message msg = CoreMessageFactory.createCoreMessage(i, false, null);
- MessageReference ref = ms.reference(msg);
- boolean routed = office1.route(ref, "queue1", null);
- }
-
- //We have a favour local routing policy so all messages should be in queue1
- List msgs = queue1.browse();
- assertEquals(NUM_MESSAGES, msgs.size());
-
- msgs = queue2.browse();
- assertEquals(0, msgs.size());
-
- office1.unbindClusteredQueue("queue1");
-
- //Send some more messages
-
- for (int i = 0; i < NUM_MESSAGES; i++)
- {
- Message msg = CoreMessageFactory.createCoreMessage(i + 10, false, null);
- MessageReference ref = ms.reference(msg);
- boolean routed = office1.route(ref, "queue1", null);
- }
-
- //There is no queue1 on node1 any more so the messages should be on node2
-
- msgs = queue2.browse();
- assertEquals(NUM_MESSAGES, msgs.size());
-
- }
- finally
- {
- if (office1 != null)
- {
- office1.stop();
- }
-
- if (office2 != null)
- {
- office2.stop();
- }
- }
- }
-
-
-
- protected ClusteredPostOffice createClusteredPostOffice(String nodeId, String groupName) throws Exception
- {
- MessagePullPolicy redistPolicy = new NullMessagePullPolicy();
-
- FilterFactory ff = new SimpleFilterFactory();
-
- ClusterRouterFactory rf = new DefaultRouterFactory();
-
- DefaultClusteredPostOffice postOffice =
- new DefaultClusteredPostOffice(sc.getDataSource(), sc.getTransactionManager(),
- null, true, nodeId, "Clustered", ms, pm, tr, ff, pool,
- groupName,
- JGroupsUtil.getControlStackProperties(),
- JGroupsUtil.getDataStackProperties(),
- 5000, 5000, redistPolicy, rf, 1);
-
- postOffice.start();
-
- return postOffice;
- }
-
- // Private -------------------------------------------------------
-
- // Inner classes -------------------------------------------------
-
-
-}
-
-
-
More information about the jboss-cvs-commits
mailing list