[Jboss-cvs] JBoss Messaging SVN: r1345 - in trunk: src/main/org/jboss/messaging/core src/main/org/jboss/messaging/core/local src/main/org/jboss/messaging/core/plugin src/main/org/jboss/messaging/core/plugin/postoffice/cluster tests/src/org/jboss/test/messaging/core/plugin/postoffice tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Fri Sep 22 08:29:55 EDT 2006
Author: timfox
Date: 2006-09-22 08:29:44 -0400 (Fri, 22 Sep 2006)
New Revision: 1345
Added:
trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultRouter.java
trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultRouterFactory.java
Removed:
trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/FavourLocalRouter.java
trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/FavourLocalRouterFactory.java
Modified:
trunk/src/main/org/jboss/messaging/core/Queue.java
trunk/src/main/org/jboss/messaging/core/local/PagingFilteredQueue.java
trunk/src/main/org/jboss/messaging/core/plugin/ClusteredPostOfficeService.java
trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOffice.java
trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/LocalClusteredQueue.java
trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/RemoteQueueStub.java
trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/DefaultPostOfficeTest.java
trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOfficeTest.java
trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/FavourLocalRouterTest.java
Log:
Distributed destinations
Modified: trunk/src/main/org/jboss/messaging/core/Queue.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/Queue.java 2006-09-22 02:06:08 UTC (rev 1344)
+++ trunk/src/main/org/jboss/messaging/core/Queue.java 2006-09-22 12:29:44 UTC (rev 1345)
@@ -32,7 +32,9 @@
*/
public interface Queue extends Channel
{
- public String getName();
+ String getName();
- public Filter getFilter();
+ Filter getFilter();
+
+ boolean isClustered();
}
Modified: trunk/src/main/org/jboss/messaging/core/local/PagingFilteredQueue.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/local/PagingFilteredQueue.java 2006-09-22 02:06:08 UTC (rev 1344)
+++ trunk/src/main/org/jboss/messaging/core/local/PagingFilteredQueue.java 2006-09-22 12:29:44 UTC (rev 1345)
@@ -98,6 +98,24 @@
this.filter = filter;
}
+
+ // Queue implementation
+ // ---------------------------------------------------------------
+
+ public boolean isClustered()
+ {
+ return false;
+ }
+
+ public String getName()
+ {
+ return name;
+ }
+
+ public Filter getFilter()
+ {
+ return filter;
+ }
// Channel implementation ----------------------------------------
@@ -116,21 +134,11 @@
}
// Public --------------------------------------------------------
-
- public String getName()
- {
- return name;
- }
public String toString()
{
return "Queue[" + getChannelID() + "]";
}
-
- public Filter getFilter()
- {
- return filter;
- }
// Package protected ---------------------------------------------
Modified: trunk/src/main/org/jboss/messaging/core/plugin/ClusteredPostOfficeService.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/ClusteredPostOfficeService.java 2006-09-22 02:06:08 UTC (rev 1344)
+++ trunk/src/main/org/jboss/messaging/core/plugin/ClusteredPostOfficeService.java 2006-09-22 12:29:44 UTC (rev 1345)
@@ -34,7 +34,7 @@
import org.jboss.messaging.core.plugin.contract.PersistenceManager;
import org.jboss.messaging.core.plugin.postoffice.cluster.ClusterRouterFactory;
import org.jboss.messaging.core.plugin.postoffice.cluster.DefaultClusteredPostOffice;
-import org.jboss.messaging.core.plugin.postoffice.cluster.FavourLocalRouterFactory;
+import org.jboss.messaging.core.plugin.postoffice.cluster.DefaultRouterFactory;
import org.jboss.messaging.core.plugin.postoffice.cluster.MessagePullPolicy;
import org.jboss.messaging.core.plugin.postoffice.cluster.NullMessagePullPolicy;
import org.jboss.messaging.core.tx.TransactionRepository;
@@ -209,7 +209,7 @@
FilterFactory ff = new SelectorFactory();
- ClusterRouterFactory rf = new FavourLocalRouterFactory();
+ ClusterRouterFactory rf = new DefaultRouterFactory();
postOffice = new DefaultClusteredPostOffice(ds, tm, sqlProperties, createTablesOnStartup,
nodeId, officeName, ms,
Modified: trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOffice.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOffice.java 2006-09-22 02:06:08 UTC (rev 1344)
+++ trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOffice.java 2006-09-22 12:29:44 UTC (rev 1345)
@@ -388,31 +388,31 @@
if (del != null && del.isSelectorAccepted())
{
routed = true;
- }
-
- ClusteredQueue queue = (ClusteredQueue)del.getObserver();
- if (del.isSelectorAccepted() && !queue.isLocal())
- {
- //We need to send the message remotely
- numberRemote++;
+ ClusteredQueue queue = (ClusteredQueue)del.getObserver();
- lastNodeId = queue.getNodeId();
-
- if (router.numberOfReceivers() > 1 && queueNameNodeIdMap == null)
+ if (!queue.isLocal())
{
- //If there are more than one queues with the same node on the remote nodes
- //We have now chosen which one will receive the message so we need to add this
- //information to a map which will get sent when casting - so the the queue
- //on the receiving node knows whether to receive the message
- queueNameNodeIdMap = new HashMap();
+ //We need to send the message remotely
+ numberRemote++;
- //We add an entry to the map so that on the receiving node we can work out which
- //queue instance will receive the message
- queueNameNodeIdMap.put(queue.getName(), lastNodeId);
+ lastNodeId = queue.getNodeId();
+
+ if (router.numberOfReceivers() > 1 && queueNameNodeIdMap == null)
+ {
+ //If there is more than one queue with the same name on the remote nodes,
+ //we have now chosen which one will receive the message, so we need to add this
+ //information to a map which will get sent when casting - so that the queue
+ //on the receiving node knows whether to receive the message
+ queueNameNodeIdMap = new HashMap();
+
+ //We add an entry to the map so that on the receiving node we can work out which
+ //queue instance will receive the message
+ queueNameNodeIdMap.put(queue.getName(), lastNodeId);
+ }
+
+ lastChannelId = queue.getChannelID();
}
-
- lastChannelId = queue.getChannelID();
}
}
@@ -434,7 +434,7 @@
//FIXME - temporarily commented out until can get unicast to work
//asyncSendRequest(new MessageRequest(condition, ref.getMessage(), null), lastNodeId);
- asyncSendRequest(new MessageRequest(condition, ref.getMessage(), null));
+ asyncSendRequest(new MessageRequest(condition, ref.getMessage(), queueNameNodeIdMap));
}
else
{
@@ -632,8 +632,12 @@
// We route on the condition
DefaultClusteredBindings cb = (DefaultClusteredBindings)conditionMap.get(routingKey);
+ // log.info("cb is " + cb);
+
if (cb != null)
- {
+ {
+ // log.info("cb size is " + cb.getAllBindings().size());
+
Collection bindings = cb.getAllBindings();
Iterator iter = bindings.iterator();
@@ -648,6 +652,7 @@
if (queueNameNodeIdMap != null)
{
+ // log.info("I have a queue map");
String desiredNodeId = (String)queueNameNodeIdMap.get(binding.getQueue().getName());
//When there are more than one queues with the same name across the cluster we only
@@ -661,13 +666,25 @@
if (handle)
{
+ log.info(this.nodeId + " is handling it");
//It's a local binding so we pass the message on to the subscription
LocalClusteredQueue queue = (LocalClusteredQueue)binding.getQueue();
- Delivery del = queue.handleFromCluster(ref);
+ Delivery del = queue.handleFromCluster(ref);
+
+ log.info("Handled it: " + del);
+ log.info("accepted: " +del.isSelectorAccepted());
}
- }
+ else
+ {
+ log.info(this.nodeId + " not handling it");
+ }
+ }
+ else
+ {
+ //log.info("wrong node");
+ }
}
}
}
Copied: trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultRouter.java (from rev 1336, trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/FavourLocalRouter.java)
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/FavourLocalRouter.java 2006-09-21 09:41:24 UTC (rev 1336)
+++ trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultRouter.java 2006-09-22 12:29:44 UTC (rev 1345)
@@ -0,0 +1,195 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.messaging.core.plugin.postoffice.cluster;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+import org.jboss.logging.Logger;
+import org.jboss.messaging.core.Delivery;
+import org.jboss.messaging.core.DeliveryObserver;
+import org.jboss.messaging.core.MessageReference;
+import org.jboss.messaging.core.Receiver;
+import org.jboss.messaging.core.tx.Transaction;
+
+/**
+ *
+ * A DefaultRouter
+ *
+ * This router always favours the local queue.
+ *
+ * If there is no local queue it will round robin between the others.
+ *
+ * In the case of a distributed point to point queue deployed at each node in the cluster
+ * there will always be a local queue.
+ *
+ * In this case, with the assumption that producers and consumers are distributed evenly across the cluster
+ * then sending the message to the local queue is the most efficient policy.
+ *
+ * In the case of a durable subscription, there may well be no local queue since the durable subscription lives
+ * only on the number of nodes that it is looked up at.
+ *
+ * In this case the round robin routing will kick in
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * @version <tt>$Revision: 1.1 $</tt>
+ *
+ * $Id$
+ *
+ */
+public class DefaultRouter implements ClusterRouter
+{
+ private static final Logger log = Logger.getLogger(DefaultRouter.class);
+
+ //MUST be an arraylist for fast index access
+ private ArrayList queues;
+
+ private LocalClusteredQueue localQueue;
+
+ private int target;
+
+ public DefaultRouter()
+ {
+ queues = new ArrayList();
+ }
+
+ public int size()
+ {
+ return queues.size();
+ }
+
+ public LocalClusteredQueue getLocalQueue()
+ {
+ return localQueue;
+ }
+
+ public boolean add(Receiver receiver)
+ {
+ ClusteredQueue queue = (ClusteredQueue)receiver;
+
+ if (queue.isLocal())
+ {
+ if (localQueue != null)
+ {
+ throw new IllegalStateException("Already has local queue");
+ }
+ localQueue = (LocalClusteredQueue)queue;
+ }
+
+ queues.add(queue);
+
+ target = 0;
+
+ return true;
+ }
+
+ public void clear()
+ {
+ queues.clear();
+
+ localQueue = null;
+
+ target = 0;
+ }
+
+ public boolean contains(Receiver queue)
+ {
+ return queues.contains(queue);
+ }
+
+ public Iterator iterator()
+ {
+ return queues.iterator();
+ }
+
+ public boolean remove(Receiver queue)
+ {
+ if (queues.remove(queue))
+ {
+ if (localQueue == queue)
+ {
+ localQueue = null;
+ }
+
+ target = 0;
+
+ return true;
+ }
+ else
+ {
+ return false;
+ }
+ }
+
+ public Delivery handle(DeliveryObserver observer, MessageReference reference, Transaction tx)
+ {
+ //Favour the local queue
+
+ if (localQueue != null)
+ {
+ log.info("There is a local queue");
+
+ //The only time the local queue won't accept is if the selector doesn't
+ //match - in which case it won't match at any other nodes either, so no point
+ //in trying them
+
+ Delivery del = localQueue.handle(observer, reference, tx);
+
+ return del;
+ }
+ else
+ {
+ log.info("No local queue!");
+ //There is no local shared queue
+
+ //We round robin among the rest
+ if (!queues.isEmpty())
+ {
+ ClusteredQueue queue = (ClusteredQueue)queues.get(target);
+
+ Delivery del = queue.handle(observer, reference, tx);
+
+ target++;
+
+ if (target == queues.size())
+ {
+ target = 0;
+ }
+
+ //Again, if the selector doesn't match then it won't on any others so no point trying them
+ return del;
+ }
+ }
+ return null;
+ }
+
+ public List getQueues()
+ {
+ return queues;
+ }
+
+ public int numberOfReceivers()
+ {
+ return queues.size();
+ }
+}
Copied: trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultRouterFactory.java (from rev 1336, trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/FavourLocalRouterFactory.java)
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/FavourLocalRouterFactory.java 2006-09-21 09:41:24 UTC (rev 1336)
+++ trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultRouterFactory.java 2006-09-22 12:29:44 UTC (rev 1345)
@@ -0,0 +1,42 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.messaging.core.plugin.postoffice.cluster;
+
+import org.jboss.messaging.core.Router;
+
+
+/**
+ * A DefaultRouterFactory
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * @version <tt>$Revision: 1.1 $</tt>
+ *
+ * $Id$
+ *
+ */
+public class DefaultRouterFactory implements ClusterRouterFactory
+{
+ public ClusterRouter createRouter()
+ {
+ return new DefaultRouter();
+ }
+}
Deleted: trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/FavourLocalRouter.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/FavourLocalRouter.java 2006-09-22 02:06:08 UTC (rev 1344)
+++ trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/FavourLocalRouter.java 2006-09-22 12:29:44 UTC (rev 1345)
@@ -1,190 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2005, JBoss Inc., and individual contributors as indicated
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-package org.jboss.messaging.core.plugin.postoffice.cluster;
-
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-
-import org.jboss.messaging.core.Delivery;
-import org.jboss.messaging.core.DeliveryObserver;
-import org.jboss.messaging.core.MessageReference;
-import org.jboss.messaging.core.Receiver;
-import org.jboss.messaging.core.Router;
-import org.jboss.messaging.core.tx.Transaction;
-
-/**
- *
- * A FavourLocalRouter
- *
- * This router always favours the local queue.
- *
- * If there is no local queue it will round robin between the others.
- *
- * In the case of a distributed point to point queue deployed at each node in the cluster
- * there will always be a local queue.
- *
- * In this case, with the assumption that producers and consumers are distributed evenly across the cluster
- * then sending the message to the local queue is the most efficient policy.
- *
- * In the case of a durable subscription, there may well be no local queue since the durable subscription lives
- * only on the number of nodes that it is looked up at.
- *
- * In this case the round robin routing will kick in
- *
- * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
- * @version <tt>$Revision: 1.1 $</tt>
- *
- * $Id$
- *
- */
-public class FavourLocalRouter implements ClusterRouter
-{
- //MUST be an arraylist for fast index access
- private ArrayList queues;
-
- private LocalClusteredQueue localQueue;
-
- private int target;
-
- public FavourLocalRouter()
- {
- queues = new ArrayList();
- }
-
- public int size()
- {
- return queues.size();
- }
-
- public LocalClusteredQueue getLocalQueue()
- {
- return localQueue;
- }
-
- public boolean add(Receiver receiver)
- {
- ClusteredQueue queue = (ClusteredQueue)receiver;
-
- if (queue.isLocal())
- {
- if (localQueue != null)
- {
- throw new IllegalStateException("Already has local queue");
- }
- localQueue = (LocalClusteredQueue)queue;
- }
-
- queues.add(queue);
-
- target = 0;
-
- return true;
- }
-
- public void clear()
- {
- queues.clear();
-
- localQueue = null;
-
- target = 0;
- }
-
- public boolean contains(Receiver queue)
- {
- return queues.contains(queue);
- }
-
- public Iterator iterator()
- {
- return queues.iterator();
- }
-
- public boolean remove(Receiver queue)
- {
- if (queues.remove(queue))
- {
- if (localQueue == queue)
- {
- localQueue = null;
- }
-
- target = 0;
-
- return true;
- }
- else
- {
- return false;
- }
- }
-
- public Delivery handle(DeliveryObserver observer, MessageReference reference, Transaction tx)
- {
- //Favour the local queue
-
- if (localQueue != null)
- {
- //The only time the local queue won't accept is if the selector doesn't
- //match - in which case it won't match at any other nodes too so no point
- //in trying them
-
- Delivery del = localQueue.handle(observer, reference, tx);
-
- return del;
- }
- else
- {
- //There is no local shared queue
-
- //We round robin among the rest
- if (!queues.isEmpty())
- {
- ClusteredQueue queue = (ClusteredQueue)queues.get(target);
-
- Delivery del = queue.handle(observer, reference, tx);
-
- target++;
-
- if (target == queues.size())
- {
- target = 0;
- }
-
- //Again, if the selector doesn't match then it won't on any others so no point trying them
- return del;
- }
- }
- return null;
- }
-
- public List getQueues()
- {
- return queues;
- }
-
- public int numberOfReceivers()
- {
- return queues.size();
- }
-}
Deleted: trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/FavourLocalRouterFactory.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/FavourLocalRouterFactory.java 2006-09-22 02:06:08 UTC (rev 1344)
+++ trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/FavourLocalRouterFactory.java 2006-09-22 12:29:44 UTC (rev 1345)
@@ -1,42 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2005, JBoss Inc., and individual contributors as indicated
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-package org.jboss.messaging.core.plugin.postoffice.cluster;
-
-import org.jboss.messaging.core.Router;
-
-
-/**
- * A FavourLocalRouterFactory
- *
- * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
- * @version <tt>$Revision: 1.1 $</tt>
- *
- * $Id$
- *
- */
-public class FavourLocalRouterFactory implements ClusterRouterFactory
-{
- public ClusterRouter createRouter()
- {
- return new FavourLocalRouter();
- }
-}
Modified: trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/LocalClusteredQueue.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/LocalClusteredQueue.java 2006-09-22 02:06:08 UTC (rev 1344)
+++ trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/LocalClusteredQueue.java 2006-09-22 12:29:44 UTC (rev 1345)
@@ -209,5 +209,8 @@
}
}
-
+ public boolean isClustered()
+ {
+ return true;
+ }
}
Modified: trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/RemoteQueueStub.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/RemoteQueueStub.java 2006-09-22 02:06:08 UTC (rev 1344)
+++ trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/RemoteQueueStub.java 2006-09-22 12:29:44 UTC (rev 1345)
@@ -263,4 +263,9 @@
throw new UnsupportedOperationException();
}
+ public boolean isClustered()
+ {
+ return true;
+ }
+
}
Modified: trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/DefaultPostOfficeTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/DefaultPostOfficeTest.java 2006-09-22 02:06:08 UTC (rev 1344)
+++ trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/DefaultPostOfficeTest.java 2006-09-22 12:29:44 UTC (rev 1345)
@@ -21,10 +21,18 @@
*/
package org.jboss.test.messaging.core.plugin.postoffice;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
+import javax.naming.InitialContext;
+import javax.sql.DataSource;
+import javax.transaction.TransactionManager;
+
import org.jboss.jms.selector.Selector;
import org.jboss.jms.server.QueuedExecutorPool;
import org.jboss.messaging.core.Filter;
@@ -49,6 +57,7 @@
import org.jboss.test.messaging.tools.ServerManagement;
import org.jboss.test.messaging.tools.jmx.ServiceContainer;
import org.jboss.test.messaging.util.CoreMessageFactory;
+import org.jboss.tm.TransactionManagerService;
import EDU.oswego.cs.dl.util.concurrent.QueuedExecutor;
@@ -224,7 +233,7 @@
Binding binding9 = office3.getBindingForQueueName("durableQueue");
assertNull(binding9);
- office3.stop();
+
}
finally
{
@@ -242,6 +251,8 @@
{
office2.stop();
}
+
+ checkNoBindingData();
}
}
@@ -518,6 +529,9 @@
{
postOffice.stop();
}
+
+ checkNoMessageData();
+ checkNoBindingData();
}
}
@@ -558,6 +572,9 @@
{
postOffice.stop();
}
+
+ checkNoMessageData();
+ checkNoBindingData();
}
}
@@ -663,6 +680,9 @@
{
postOffice.stop();
}
+
+ checkNoMessageData();
+ checkNoBindingData();
}
}
@@ -832,6 +852,9 @@
{
postOffice.stop();
}
+
+ checkNoMessageData();
+ checkNoBindingData();
}
}
@@ -1100,6 +1123,9 @@
{
postOffice.stop();
}
+
+ checkNoMessageData();
+ checkNoBindingData();
}
}
@@ -1126,6 +1152,110 @@
return postOffice;
}
+
+ protected boolean checkNoBindingData() throws Exception
+ {
+ InitialContext ctx = new InitialContext();
+
+ TransactionManager mgr = (TransactionManager)ctx.lookup(TransactionManagerService.JNDI_NAME);
+ DataSource ds = (DataSource)ctx.lookup("java:/DefaultDS");
+
+ javax.transaction.Transaction txOld = mgr.suspend();
+ mgr.begin();
+
+ Connection conn = null;
+
+ PreparedStatement ps = null;
+
+ ResultSet rs = null;
+
+ try
+ {
+ conn = ds.getConnection();
+ String sql = "SELECT * FROM JMS_POSTOFFICE";
+ ps = conn.prepareStatement(sql);
+
+ rs = ps.executeQuery();
+
+ return rs.next();
+ }
+ finally
+ {
+ if (rs != null) rs.close();
+
+ if (ps != null) ps.close();
+
+ if (conn != null) conn.close();
+
+ mgr.commit();
+
+ if (txOld != null)
+ {
+ mgr.resume(txOld);
+ }
+
+ }
+ }
+
+ protected boolean checkNoMessageData() throws Exception
+ {
+ InitialContext ctx = new InitialContext();
+
+ TransactionManager mgr = (TransactionManager)ctx.lookup(TransactionManagerService.JNDI_NAME);
+ DataSource ds = (DataSource)ctx.lookup("java:/DefaultDS");
+
+ javax.transaction.Transaction txOld = mgr.suspend();
+ mgr.begin();
+
+ Connection conn = null;
+
+ PreparedStatement ps = null;
+
+ ResultSet rs = null;
+
+ try
+ {
+ conn = ds.getConnection();
+ String sql = "SELECT * FROM JMS_MESSAGE_REFERENCE";
+ ps = conn.prepareStatement(sql);
+
+ rs = ps.executeQuery();
+
+ boolean exists = rs.next();
+
+ if (!exists)
+ {
+ rs.close();
+
+ ps.close();
+
+ ps = conn.prepareStatement("SELECT * FROM JMS_MESSAGE");
+
+ rs = ps.executeQuery();
+
+ exists = rs.next();
+ }
+
+ return exists;
+ }
+ finally
+ {
+ if (rs != null) rs.close();
+
+ if (ps != null) ps.close();
+
+ if (conn != null) conn.close();
+
+ mgr.commit();
+
+ if (txOld != null)
+ {
+ mgr.resume(txOld);
+ }
+
+ }
+ }
+
// Private -------------------------------------------------------
// Inner classes -------------------------------------------------
Modified: trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOfficeTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOfficeTest.java 2006-09-22 02:06:08 UTC (rev 1344)
+++ trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOfficeTest.java 2006-09-22 12:29:44 UTC (rev 1345)
@@ -21,6 +21,7 @@
*/
package org.jboss.test.messaging.core.plugin.postoffice.cluster;
+import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
@@ -28,12 +29,14 @@
import org.jboss.messaging.core.FilterFactory;
import org.jboss.messaging.core.Message;
import org.jboss.messaging.core.MessageReference;
+import org.jboss.messaging.core.Queue;
import org.jboss.messaging.core.local.PagingFilteredQueue;
import org.jboss.messaging.core.plugin.contract.ClusteredPostOffice;
+import org.jboss.messaging.core.plugin.contract.PostOffice;
import org.jboss.messaging.core.plugin.postoffice.Binding;
import org.jboss.messaging.core.plugin.postoffice.cluster.ClusterRouterFactory;
import org.jboss.messaging.core.plugin.postoffice.cluster.DefaultClusteredPostOffice;
-import org.jboss.messaging.core.plugin.postoffice.cluster.FavourLocalRouterFactory;
+import org.jboss.messaging.core.plugin.postoffice.cluster.DefaultRouterFactory;
import org.jboss.messaging.core.plugin.postoffice.cluster.LocalClusteredQueue;
import org.jboss.messaging.core.plugin.postoffice.cluster.MessagePullPolicy;
import org.jboss.messaging.core.plugin.postoffice.cluster.NullMessagePullPolicy;
@@ -425,6 +428,8 @@
{
office2.stop();
}
+
+ checkNoBindingData();
}
}
@@ -459,7 +464,27 @@
this.clusteredRouteWithFilter(true);
}
+ public void testRouteSharedPointToPointQueuePersistent() throws Throwable
+ {
+ this.routeSharedQueue(true);
+ }
+ public void testRouteSharedPointToPointQueueNonPersistent() throws Throwable
+ {
+ this.routeSharedQueue(false);
+ }
+
+ public void testRouteComplexTopicPersistent() throws Throwable
+ {
+ this.routeComplexTopic(true);
+ }
+
+ public void testRouteComplexTopicNonPersistent() throws Throwable
+ {
+ this.routeComplexTopic(false);
+ }
+
+
/*
* We should allow the clustered bind of queues with the same queue name on different nodes of the
* cluster
@@ -673,6 +698,8 @@
{
office2.stop();
}
+
+ checkNoMessageData();
}
}
@@ -841,10 +868,635 @@
{
office2.stop();
}
+
+ checkNoMessageData();
}
}
+ /*
+  * A queue named "queue1" is bound on nodes 1 to 5 of a six node cluster; node 6 has no local
+  * binding. One message is then routed from each node in turn:
+  *
+  * - from a node that has a local binding, only the receiver on that node is expected to get
+  *   the message and every other receiver must stay empty
+  * - from node 6, the message is expected to arrive at the receiver on node 1
+  *
+  * persistentMessage - whether the routed messages are created as persistent
+  *
+  * All post offices that were started are stopped again, and checkNoMessageData() is called,
+  * whether or not the assertions pass.
+  */
+ protected void routeSharedQueue(boolean persistentMessage) throws Throwable
+ {
+    final int nodeCount = 6;
+
+    //We deploy the queue on nodes 1, 2, 3, 4 and 5
+    //We don't deploy on node 6
+    final int boundCount = 5;
+
+    ClusteredPostOffice[] offices = new ClusteredPostOffice[nodeCount];
+
+    try
+    {
+       for (int i = 0; i < nodeCount; i++)
+       {
+          offices[i] = createClusteredPostOffice("node" + (i + 1), "testgroup");
+       }
+
+       //queues[i] and receivers[i] both belong to offices[i], so a receiver can never be
+       //checked against another node's queue by mistake
+       LocalClusteredQueue[] queues = new LocalClusteredQueue[boundCount];
+       SimpleReceiver[] receivers = new SimpleReceiver[boundCount];
+
+       for (int i = 0; i < boundCount; i++)
+       {
+          queues[i] = new LocalClusteredQueue(offices[i], "node" + (i + 1), "queue1", im.getId(), ms, pm,
+                                              true, false, (QueuedExecutor)pool.get(), null);
+          offices[i].bindClusteredQueue("queue1", queues[i]);
+          receivers[i] = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
+          queues[i].add(receivers[i]);
+       }
+
+       //The sending node has a local queue in each of these cases, and the test expects
+       //only that local queue to get the message
+       for (int sender = 0; sender < boundCount; sender++)
+       {
+          Message msg = CoreMessageFactory.createCoreMessage(1, persistentMessage, null);
+          MessageReference ref = ms.reference(msg);
+          boolean routed = offices[sender].route(ref, "queue1", null);
+          assertTrue(routed);
+
+          for (int j = 0; j < boundCount; j++)
+          {
+             if (j == sender)
+             {
+                checkContainsAndAcknowledge(msg, receivers[j], queues[j]);
+             }
+             else
+             {
+                checkEmpty(receivers[j]);
+             }
+          }
+       }
+
+       log.info("************* ROOTING");
+
+       Message msg = CoreMessageFactory.createCoreMessage(1, persistentMessage, null);
+       MessageReference ref = ms.reference(msg);
+       boolean routed = offices[nodeCount - 1].route(ref, "queue1", null);
+       assertTrue(routed);
+
+       //The actual queue that receives the message is determined by the routing policy
+       //The default uses round robin for the nodes (this is tested more thoroughly in
+       //its own test)
+
+       //NOTE(review): presumably this wait lets the message reach the remote node - confirm
+       Thread.sleep(1000);
+
+       log.info("checking");
+       checkContainsAndAcknowledge(msg, receivers[0], queues[0]);
+
+       for (int j = 0; j < boundCount; j++)
+       {
+          checkEmpty(receivers[j]);
+       }
+    }
+    finally
+    {
+       for (int i = 0; i < nodeCount; i++)
+       {
+          if (offices[i] != null)
+          {
+             offices[i].stop();
+          }
+       }
+
+       checkNoMessageData();
+    }
+ }
+
+
+ /*
+ * We set up a complex scenario with multiple subscriptions, shared and unshared on different nodes
+ *
+ * node1: no subscriptions
+ * node2: 2 non durable
+ * node3: 1 non shared durable, 1 non durable
+ * node4: 1 shared durable (shared1), 1 non shared durable, 3 non durable
+ * node5: 2 shared durable (shared1 and shared2)
+ * node6: 1 shared durable (shared2), 1 non durable
+ * node7: 1 shared durable (shared2)
+ *
+ * Then we send messages from each node in turn and check which subscriptions receive them
+ *
+ *
+ */
+ /*
+  * Binds 14 subscriptions for the condition "topic" across a seven node cluster (node 1 has
+  * none), then sends from every node in turn and checks each subscription's receiver.
+  *
+  * Expectations encoded below:
+  * - every subscription whose queue name is unique always receives the messages
+  * - of the subscriptions sharing a queue name across nodes ("shareddurable1" on nodes 4 and 5,
+  *   "shareddurable2" on nodes 5, 6 and 7) exactly one member receives them and the other
+  *   members must stay empty
+  *
+  * persistent - whether the sent messages are created as persistent
+  *
+  * All post offices that were started are stopped again, and checkNoMessageData() is called,
+  * whether or not the assertions pass.
+  */
+ protected void routeComplexTopic(boolean persistent) throws Throwable
+ {
+    final int nodeCount = 7;
+
+    //One entry per subscription, in binding order.
+    //owner is the 1-based number of the node the queue is bound on.
+    final int[] owner = {
+       2, 2,             //node2: 2 non durable
+       3, 3,             //node3: 1 non shared durable, 1 non durable
+       4, 4, 4, 4, 4,    //node4: 1 shared durable, 1 non shared durable, 3 non durable
+       5, 5,             //node5: 2 shared durable
+       6, 6,             //node6: 1 shared durable, 1 non durable
+       7};               //node7: 1 shared durable
+
+    final String[] names = {
+       "nondurable1", "nondurable2",
+       "nonshareddurable1", "nondurable3",
+       "shareddurable1", "nonshareddurable2", "nondurable4", "nondurable5", "nondurable6",
+       "shareddurable1", "shareddurable2",
+       "shareddurable2", "nondurable7",
+       "shareddurable2"};
+
+    //Passed as the second boolean constructor argument of LocalClusteredQueue
+    //NOTE(review): presumably the "recoverable" flag - confirm against the constructor
+    final boolean[] durable = {
+       false, false,
+       true, false,
+       true, true, false, false, false,
+       true, true,
+       true, false,
+       true};
+
+    //For each sending node (index 0 is node1): the index, into the arrays above, of the one
+    //member of each shared subscription that is expected to receive the messages. That is
+    //the member on the sending node when there is one, otherwise the member bound first.
+    final int[] shared1Winner = {4, 4, 4, 4, 9, 4, 4};
+    final int[] shared2Winner = {10, 10, 10, 10, 10, 11, 13};
+
+    final int subCount = owner.length;
+
+    ClusteredPostOffice[] offices = new ClusteredPostOffice[nodeCount];
+
+    try
+    {
+       for (int i = 0; i < nodeCount; i++)
+       {
+          offices[i] = createClusteredPostOffice("node" + (i + 1), "testgroup");
+       }
+
+       LocalClusteredQueue[] queues = new LocalClusteredQueue[subCount];
+       SimpleReceiver[] receivers = new SimpleReceiver[subCount];
+
+       for (int i = 0; i < subCount; i++)
+       {
+          ClusteredPostOffice office = offices[owner[i] - 1];
+
+          queues[i] = new LocalClusteredQueue(office, "node" + owner[i], names[i], im.getId(), ms, pm,
+                                              true, durable[i], (QueuedExecutor)pool.get(), null);
+          office.bindClusteredQueue("topic", queues[i]);
+          receivers[i] = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
+          queues[i].add(receivers[i]);
+       }
+
+       //Send from node1, node2, ... node7 and verify every subscription after each send.
+       //This includes the receiver on node 7, which must be empty unless node 7 is the sender.
+       for (int s = 0; s < nodeCount; s++)
+       {
+          List msgs = sendMessages(persistent, offices[s], 3, null);
+
+          for (int r = 0; r < subCount; r++)
+          {
+             boolean expected = true;
+
+             if ("shareddurable1".equals(names[r]))
+             {
+                expected = (r == shared1Winner[s]);
+             }
+             else if ("shareddurable2".equals(names[r]))
+             {
+                expected = (r == shared2Winner[s]);
+             }
+
+             if (expected)
+             {
+                checkContainsAndAcknowledge(msgs, receivers[r], queues[r]);
+             }
+             else
+             {
+                checkEmpty(receivers[r]);
+             }
+          }
+       }
+    }
+    finally
+    {
+       for (int i = 0; i < nodeCount; i++)
+       {
+          if (offices[i] != null)
+          {
+             offices[i].stop();
+          }
+       }
+
+       checkNoMessageData();
+    }
+ }
+
+
+ /*
+  * Routes a single message with the condition "topic" through the given post office, asserts
+  * that it was routed, waits one second and returns a list holding that message.
+  *
+  * NOTE(review): num and tx are accepted but not used - exactly one message is routed and no
+  * transaction is passed to route(). Callers asking for several messages still get one.
+  */
+ private List sendMessages(boolean persistent, PostOffice office, int num, Transaction tx) throws Exception
+ {
+    Message message = CoreMessageFactory.createCoreMessage(1, persistent, null);
+
+    assertTrue(office.route(ms.reference(message), "topic", null));
+
+    List sent = new ArrayList();
+
+    sent.add(message);
+
+    //NOTE(review): presumably gives the message time to reach the other nodes - confirm
+    Thread.sleep(1000);
+
+    return sent;
+ }
+
+ /*
+  * Asserts that the receiver holds exactly one message with the same id as msg, acknowledges
+  * it, asserts that browsing the queue then returns an empty list, and clears the receiver.
+  */
+ private void checkContainsAndAcknowledge(Message msg, SimpleReceiver receiver, Queue queue) throws Throwable
+ {
+    List received = receiver.getMessages();
+    assertNotNull(received);
+    assertEquals(1, received.size());
+
+    Message actual = (Message)received.get(0);
+    assertEquals(msg.getMessageID(), actual.getMessageID());
+
+    receiver.acknowledge(actual, null);
+
+    //Once acknowledged nothing should be left in the queue
+    List remaining = queue.browse();
+    assertNotNull(remaining);
+    assertTrue(remaining.isEmpty());
+
+    receiver.clear();
+ }
+
+ /*
+  * Asserts that the receiver holds as many messages as msgList, with the same ids in the same
+  * order, acknowledging each one as it is checked. Then asserts that browsing the queue
+  * returns an empty list, and clears the receiver.
+  */
+ private void checkContainsAndAcknowledge(List msgList, SimpleReceiver receiver, Queue queue) throws Throwable
+ {
+    List received = receiver.getMessages();
+    assertNotNull(received);
+    assertEquals(msgList.size(), received.size());
+
+    //Indexed access on purpose: acknowledge() is called while walking the received list
+    int index = 0;
+    while (index < msgList.size())
+    {
+       Message actual = (Message)received.get(index);
+       Message expected = (Message)msgList.get(index);
+       assertEquals(expected.getMessageID(), actual.getMessageID());
+       receiver.acknowledge(actual, null);
+       index++;
+    }
+
+    //Once everything is acknowledged nothing should be left in the queue
+    List remaining = queue.browse();
+    assertNotNull(remaining);
+    assertTrue(remaining.isEmpty());
+
+    receiver.clear();
+ }
+
+ /*
+  * Asserts that the receiver's message list is non null and holds no messages
+  */
+ private void checkEmpty(SimpleReceiver receiver) throws Throwable
+ {
+    List received = receiver.getMessages();
+
+    assertNotNull(received);
+
+    assertTrue(received.isEmpty());
+ }
+
+
protected void clusteredTransactionalRoute(boolean persistent) throws Throwable
{
ClusteredPostOffice office1 = null;
@@ -1441,16 +2093,18 @@
{
office2.stop();
}
+
+ checkNoMessageData();
}
}
protected ClusteredPostOffice createClusteredPostOffice(String nodeId, String groupName) throws Exception
{
- MessagePullPolicy redistPolicy = new NullMessagePullPolicy();
+ MessagePullPolicy pullPolicy = new NullMessagePullPolicy();
FilterFactory ff = new SimpleFilterFactory();
- ClusterRouterFactory rf = new FavourLocalRouterFactory();
+ ClusterRouterFactory rf = new DefaultRouterFactory();
DefaultClusteredPostOffice postOffice =
new DefaultClusteredPostOffice(sc.getDataSource(), sc.getTransactionManager(),
@@ -1458,7 +2112,7 @@
groupName,
JGroupsUtil.getControlStackProperties(),
JGroupsUtil.getDataStackProperties(),
- 5000, 5000, redistPolicy, rf, 1);
+ 5000, 5000, pullPolicy, rf, 1);
postOffice.start();
Modified: trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/FavourLocalRouterTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/FavourLocalRouterTest.java 2006-09-22 02:06:08 UTC (rev 1344)
+++ trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/FavourLocalRouterTest.java 2006-09-22 12:29:44 UTC (rev 1345)
@@ -30,7 +30,7 @@
import org.jboss.messaging.core.plugin.postoffice.Binding;
import org.jboss.messaging.core.plugin.postoffice.cluster.ClusterRouterFactory;
import org.jboss.messaging.core.plugin.postoffice.cluster.DefaultClusteredPostOffice;
-import org.jboss.messaging.core.plugin.postoffice.cluster.FavourLocalRouterFactory;
+import org.jboss.messaging.core.plugin.postoffice.cluster.DefaultRouterFactory;
import org.jboss.messaging.core.plugin.postoffice.cluster.LocalClusteredQueue;
import org.jboss.messaging.core.plugin.postoffice.cluster.MessagePullPolicy;
import org.jboss.messaging.core.plugin.postoffice.cluster.NullMessagePullPolicy;
@@ -146,7 +146,7 @@
FilterFactory ff = new SimpleFilterFactory();
- ClusterRouterFactory rf = new FavourLocalRouterFactory();
+ ClusterRouterFactory rf = new DefaultRouterFactory();
DefaultClusteredPostOffice postOffice =
new DefaultClusteredPostOffice(sc.getDataSource(), sc.getTransactionManager(),
More information about the jboss-cvs-commits
mailing list