[Jboss-cvs] JBoss Messaging SVN: r1345 - in trunk: src/main/org/jboss/messaging/core src/main/org/jboss/messaging/core/local src/main/org/jboss/messaging/core/plugin src/main/org/jboss/messaging/core/plugin/postoffice/cluster tests/src/org/jboss/test/messaging/core/plugin/postoffice tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Fri Sep 22 08:29:55 EDT 2006


Author: timfox
Date: 2006-09-22 08:29:44 -0400 (Fri, 22 Sep 2006)
New Revision: 1345

Added:
   trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultRouter.java
   trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultRouterFactory.java
Removed:
   trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/FavourLocalRouter.java
   trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/FavourLocalRouterFactory.java
Modified:
   trunk/src/main/org/jboss/messaging/core/Queue.java
   trunk/src/main/org/jboss/messaging/core/local/PagingFilteredQueue.java
   trunk/src/main/org/jboss/messaging/core/plugin/ClusteredPostOfficeService.java
   trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOffice.java
   trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/LocalClusteredQueue.java
   trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/RemoteQueueStub.java
   trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/DefaultPostOfficeTest.java
   trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOfficeTest.java
   trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/FavourLocalRouterTest.java
Log:
Distributed destinations



Modified: trunk/src/main/org/jboss/messaging/core/Queue.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/Queue.java	2006-09-22 02:06:08 UTC (rev 1344)
+++ trunk/src/main/org/jboss/messaging/core/Queue.java	2006-09-22 12:29:44 UTC (rev 1345)
@@ -32,7 +32,9 @@
  */
 public interface Queue extends Channel
 {
-   public String getName();
+   String getName();
    
-   public Filter getFilter();
+   Filter getFilter();
+   
+   boolean isClustered();
 }

Modified: trunk/src/main/org/jboss/messaging/core/local/PagingFilteredQueue.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/local/PagingFilteredQueue.java	2006-09-22 02:06:08 UTC (rev 1344)
+++ trunk/src/main/org/jboss/messaging/core/local/PagingFilteredQueue.java	2006-09-22 12:29:44 UTC (rev 1345)
@@ -98,6 +98,24 @@
       
       this.filter = filter;
    }
+   
+   // Queue implementation
+   // ---------------------------------------------------------------
+   
+   public boolean isClustered()
+   {
+      return false;
+   }
+   
+   public String getName()
+   {
+      return name;
+   }
+      
+   public Filter getFilter()
+   {
+      return filter;
+   }
     
    // Channel implementation ----------------------------------------   
    
@@ -116,21 +134,11 @@
    }
    
    // Public --------------------------------------------------------
-
-   public String getName()
-   {
-      return name;
-   }
    
    public String toString()
    {
       return "Queue[" + getChannelID() + "]";
    }
-   
-   public Filter getFilter()
-   {
-      return filter;
-   }
 
    // Package protected ---------------------------------------------
    

Modified: trunk/src/main/org/jboss/messaging/core/plugin/ClusteredPostOfficeService.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/ClusteredPostOfficeService.java	2006-09-22 02:06:08 UTC (rev 1344)
+++ trunk/src/main/org/jboss/messaging/core/plugin/ClusteredPostOfficeService.java	2006-09-22 12:29:44 UTC (rev 1345)
@@ -34,7 +34,7 @@
 import org.jboss.messaging.core.plugin.contract.PersistenceManager;
 import org.jboss.messaging.core.plugin.postoffice.cluster.ClusterRouterFactory;
 import org.jboss.messaging.core.plugin.postoffice.cluster.DefaultClusteredPostOffice;
-import org.jboss.messaging.core.plugin.postoffice.cluster.FavourLocalRouterFactory;
+import org.jboss.messaging.core.plugin.postoffice.cluster.DefaultRouterFactory;
 import org.jboss.messaging.core.plugin.postoffice.cluster.MessagePullPolicy;
 import org.jboss.messaging.core.plugin.postoffice.cluster.NullMessagePullPolicy;
 import org.jboss.messaging.core.tx.TransactionRepository;
@@ -209,7 +209,7 @@
          
          FilterFactory ff = new SelectorFactory();
          
-         ClusterRouterFactory rf = new FavourLocalRouterFactory();
+         ClusterRouterFactory rf = new DefaultRouterFactory();
                   
          postOffice =  new DefaultClusteredPostOffice(ds, tm, sqlProperties, createTablesOnStartup,
                                                nodeId, officeName, ms,

Modified: trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOffice.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOffice.java	2006-09-22 02:06:08 UTC (rev 1344)
+++ trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOffice.java	2006-09-22 12:29:44 UTC (rev 1345)
@@ -388,31 +388,31 @@
                if (del != null && del.isSelectorAccepted())
                {
                   routed = true;
-               }
-                              
-               ClusteredQueue queue = (ClusteredQueue)del.getObserver();
                
-               if (del.isSelectorAccepted() && !queue.isLocal())
-               {
-                  //We need to send the message remotely
-                  numberRemote++;
+                  ClusteredQueue queue = (ClusteredQueue)del.getObserver();
                   
-                  lastNodeId = queue.getNodeId();
-                  
-                  if (router.numberOfReceivers() > 1 && queueNameNodeIdMap == null)
+                  if (!queue.isLocal())
                   {
-                     //If there are more than one queues with the same node on the remote nodes
-                     //We have now chosen which one will receive the message so we need to add this
-                     //information to a map which will get sent when casting - so the the queue
-                     //on the receiving node knows whether to receive the message
-                     queueNameNodeIdMap = new HashMap();
+                     //We need to send the message remotely
+                     numberRemote++;
                      
-                     //We add an entry to the map so that on the receiving node we can work out which
-                     //queue instance will receive the message
-                     queueNameNodeIdMap.put(queue.getName(), lastNodeId);
+                     lastNodeId = queue.getNodeId();
+                     
+                     if (router.numberOfReceivers() > 1 && queueNameNodeIdMap == null)
+                     {
+                        //If there are more than one queues with the same node on the remote nodes
+                        //We have now chosen which one will receive the message so we need to add this
+                        //information to a map which will get sent when casting - so the the queue
+                        //on the receiving node knows whether to receive the message
+                        queueNameNodeIdMap = new HashMap();
+                        
+                        //We add an entry to the map so that on the receiving node we can work out which
+                        //queue instance will receive the message
+                        queueNameNodeIdMap.put(queue.getName(), lastNodeId);
+                     }
+                     
+                     lastChannelId = queue.getChannelID();
                   }
-                  
-                  lastChannelId = queue.getChannelID();
                }
             }
             
@@ -434,7 +434,7 @@
                      
                      //FIXME - temporarily commented out until can get unicast to work
                      //asyncSendRequest(new MessageRequest(condition, ref.getMessage(), null), lastNodeId);
-                     asyncSendRequest(new MessageRequest(condition, ref.getMessage(), null));
+                     asyncSendRequest(new MessageRequest(condition, ref.getMessage(), queueNameNodeIdMap));
                   }
                   else
                   {
@@ -632,8 +632,12 @@
          // We route on the condition
          DefaultClusteredBindings cb = (DefaultClusteredBindings)conditionMap.get(routingKey);
          
+        // log.info("cb is " + cb);
+         
          if (cb != null)
-         {                                
+         {                      
+          //  log.info("cb size is " + cb.getAllBindings().size());
+            
             Collection bindings = cb.getAllBindings();
             
             Iterator iter = bindings.iterator();
@@ -648,6 +652,7 @@
                   
                   if (queueNameNodeIdMap != null)
                   {
+                    // log.info("I have a queue map");
                      String desiredNodeId = (String)queueNameNodeIdMap.get(binding.getQueue().getName());
                      
                      //When there are more than one queues with the same name across the cluster we only
@@ -661,13 +666,25 @@
                   
                   if (handle)
                   {
+                     log.info(this.nodeId + " is handling it");
                      //It's a local binding so we pass the message on to the subscription
                      
                      LocalClusteredQueue queue = (LocalClusteredQueue)binding.getQueue();
                      
-                     Delivery del = queue.handleFromCluster(ref);                                          
+                     Delivery del = queue.handleFromCluster(ref);         
+                     
+                     log.info("Handled it: " + del);
+                     log.info("accepted: " +del.isSelectorAccepted());
                   }
-               }                                              
+                  else
+                  {
+                     log.info(this.nodeId + " not handling it");
+                  }
+               }
+               else
+               {
+                  //log.info("wrong node");
+               }
             }                          
          }
       }

Copied: trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultRouter.java (from rev 1336, trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/FavourLocalRouter.java)
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/FavourLocalRouter.java	2006-09-21 09:41:24 UTC (rev 1336)
+++ trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultRouter.java	2006-09-22 12:29:44 UTC (rev 1345)
@@ -0,0 +1,195 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.messaging.core.plugin.postoffice.cluster;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+import org.jboss.logging.Logger;
+import org.jboss.messaging.core.Delivery;
+import org.jboss.messaging.core.DeliveryObserver;
+import org.jboss.messaging.core.MessageReference;
+import org.jboss.messaging.core.Receiver;
+import org.jboss.messaging.core.tx.Transaction;
+
+/**
+ * 
+ * A DefaultRouter
+ * 
+ * This router always favours the local queue.
+ * 
+ * If there is no local queue it will round robin between the others.
+ * 
+ * In the case of a distributed point to point queue deployed at each node in the cluster
+ * there will always be a local queue.
+ * 
+ * In this case, with the assumption that producers and consumers are distributed evenly across the cluster
+ * then sending the message to the local queue is the most efficient policy.
+ * 
+ * In the case of a durable subscription, there may well be no local queue since the durable subscription lives
+ * only on the number of nodes that it is looked up at.
+ * 
+ * In this case the round robin routing will kick in
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * @version <tt>$Revision: 1.1 $</tt>
+ *
+ * $Id$
+ *
+ */
+public class DefaultRouter implements ClusterRouter
+{
+   private static final Logger log = Logger.getLogger(DefaultRouter.class);
+      
+   //MUST be an arraylist for fast index access
+   private ArrayList queues;
+   
+   private LocalClusteredQueue localQueue;
+   
+   private int target;
+   
+   public DefaultRouter()
+   {
+      queues = new ArrayList();
+   }
+   
+   public int size()
+   {
+      return queues.size();
+   }
+   
+   public LocalClusteredQueue getLocalQueue()
+   {
+      return localQueue;
+   }
+
+   public boolean add(Receiver receiver)
+   {
+      ClusteredQueue queue = (ClusteredQueue)receiver;
+      
+      if (queue.isLocal())
+      {
+         if (localQueue != null)
+         {
+            throw new IllegalStateException("Already has local queue");
+         }
+         localQueue = (LocalClusteredQueue)queue;
+      }
+      
+      queues.add(queue); 
+      
+      target = 0;
+      
+      return true;
+   }
+
+   public void clear()
+   {
+      queues.clear();
+      
+      localQueue = null;
+      
+      target = 0;
+   }
+
+   public boolean contains(Receiver queue)
+   {
+      return queues.contains(queue);
+   }
+
+   public Iterator iterator()
+   {
+      return queues.iterator();
+   }
+
+   public boolean remove(Receiver queue)
+   {      
+      if (queues.remove(queue))
+      {
+         if (localQueue == queue)
+         {
+            localQueue = null;
+         }
+         
+         target = 0;
+         
+         return true;
+      }
+      else
+      {
+         return false;
+      }
+   }
+
+   public Delivery handle(DeliveryObserver observer, MessageReference reference, Transaction tx)
+   {
+      //Favour the local queue
+           
+      if (localQueue != null)
+      {
+         log.info("There is a local queue");
+         
+         //The only time the local queue won't accept is if the selector doesn't
+         //match - in which case it won't match at any other nodes too so no point
+         //in trying them
+         
+         Delivery del = localQueue.handle(observer, reference, tx);
+         
+         return del;
+      }
+      else
+      {
+         log.info("No local queue!");
+         //There is no local shared queue
+         
+         //We round robin among the rest
+         if (!queues.isEmpty())
+         {
+            ClusteredQueue queue = (ClusteredQueue)queues.get(target);
+            
+            Delivery del = queue.handle(observer, reference, tx);
+                        
+            target++;
+            
+            if (target == queues.size())
+            {
+               target = 0;
+            }
+            
+            //Again, if the selector doesn't match then it won't on any others so no point trying them
+            return del;
+         }                  
+      }
+      return null;
+   }
+   
+   public List getQueues()
+   {
+      return queues;
+   }
+
+   public int numberOfReceivers()
+   {
+      return queues.size();
+   }
+}

Copied: trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultRouterFactory.java (from rev 1336, trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/FavourLocalRouterFactory.java)
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/FavourLocalRouterFactory.java	2006-09-21 09:41:24 UTC (rev 1336)
+++ trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultRouterFactory.java	2006-09-22 12:29:44 UTC (rev 1345)
@@ -0,0 +1,42 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.messaging.core.plugin.postoffice.cluster;
+
+import org.jboss.messaging.core.Router;
+
+
+/**
+ * A FavourLocalRouterFactory
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * @version <tt>$Revision: 1.1 $</tt>
+ *
+ * $Id$
+ *
+ */
+public class DefaultRouterFactory implements ClusterRouterFactory
+{
+   public ClusterRouter createRouter()
+   {
+      return new DefaultRouter();
+   }
+}

Deleted: trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/FavourLocalRouter.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/FavourLocalRouter.java	2006-09-22 02:06:08 UTC (rev 1344)
+++ trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/FavourLocalRouter.java	2006-09-22 12:29:44 UTC (rev 1345)
@@ -1,190 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2005, JBoss Inc., and individual contributors as indicated
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-package org.jboss.messaging.core.plugin.postoffice.cluster;
-
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-
-import org.jboss.messaging.core.Delivery;
-import org.jboss.messaging.core.DeliveryObserver;
-import org.jboss.messaging.core.MessageReference;
-import org.jboss.messaging.core.Receiver;
-import org.jboss.messaging.core.Router;
-import org.jboss.messaging.core.tx.Transaction;
-
-/**
- * 
- * A FavourLocalRouter
- * 
- * This router always favours the local queue.
- * 
- * If there is no local queue it will round robin between the others.
- * 
- * In the case of a distributed point to point queue deployed at each node in the cluster
- * there will always be a local queue.
- * 
- * In this case, with the assumption that producers and consumers are distributed evenly across the cluster
- * then sending the message to the local queue is the most efficient policy.
- * 
- * In the case of a durable subscription, there may well be no local queue since the durable subscription lives
- * only on the number of nodes that it is looked up at.
- * 
- * In this case the round robin routing will kick in
- *
- * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
- * @version <tt>$Revision: 1.1 $</tt>
- *
- * $Id$
- *
- */
-public class FavourLocalRouter implements ClusterRouter
-{
-   //MUST be an arraylist for fast index access
-   private ArrayList queues;
-   
-   private LocalClusteredQueue localQueue;
-   
-   private int target;
-   
-   public FavourLocalRouter()
-   {
-      queues = new ArrayList();
-   }
-   
-   public int size()
-   {
-      return queues.size();
-   }
-   
-   public LocalClusteredQueue getLocalQueue()
-   {
-      return localQueue;
-   }
-
-   public boolean add(Receiver receiver)
-   {
-      ClusteredQueue queue = (ClusteredQueue)receiver;
-      
-      if (queue.isLocal())
-      {
-         if (localQueue != null)
-         {
-            throw new IllegalStateException("Already has local queue");
-         }
-         localQueue = (LocalClusteredQueue)queue;
-      }
-      
-      queues.add(queue); 
-      
-      target = 0;
-      
-      return true;
-   }
-
-   public void clear()
-   {
-      queues.clear();
-      
-      localQueue = null;
-      
-      target = 0;
-   }
-
-   public boolean contains(Receiver queue)
-   {
-      return queues.contains(queue);
-   }
-
-   public Iterator iterator()
-   {
-      return queues.iterator();
-   }
-
-   public boolean remove(Receiver queue)
-   {      
-      if (queues.remove(queue))
-      {
-         if (localQueue == queue)
-         {
-            localQueue = null;
-         }
-         
-         target = 0;
-         
-         return true;
-      }
-      else
-      {
-         return false;
-      }
-   }
-
-   public Delivery handle(DeliveryObserver observer, MessageReference reference, Transaction tx)
-   {
-      //Favour the local queue
-           
-      if (localQueue != null)
-      {
-         //The only time the local queue won't accept is if the selector doesn't
-         //match - in which case it won't match at any other nodes too so no point
-         //in trying them
-         
-         Delivery del = localQueue.handle(observer, reference, tx);
-         
-         return del;
-      }
-      else
-      {
-         //There is no local shared queue
-         
-         //We round robin among the rest
-         if (!queues.isEmpty())
-         {
-            ClusteredQueue queue = (ClusteredQueue)queues.get(target);
-            
-            Delivery del = queue.handle(observer, reference, tx);
-                        
-            target++;
-            
-            if (target == queues.size())
-            {
-               target = 0;
-            }
-            
-            //Again, if the selector doesn't match then it won't on any others so no point trying them
-            return del;
-         }                  
-      }
-      return null;
-   }
-   
-   public List getQueues()
-   {
-      return queues;
-   }
-
-   public int numberOfReceivers()
-   {
-      return queues.size();
-   }
-}

Deleted: trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/FavourLocalRouterFactory.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/FavourLocalRouterFactory.java	2006-09-22 02:06:08 UTC (rev 1344)
+++ trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/FavourLocalRouterFactory.java	2006-09-22 12:29:44 UTC (rev 1345)
@@ -1,42 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2005, JBoss Inc., and individual contributors as indicated
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-package org.jboss.messaging.core.plugin.postoffice.cluster;
-
-import org.jboss.messaging.core.Router;
-
-
-/**
- * A FavourLocalRouterFactory
- *
- * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
- * @version <tt>$Revision: 1.1 $</tt>
- *
- * $Id$
- *
- */
-public class FavourLocalRouterFactory implements ClusterRouterFactory
-{
-   public ClusterRouter createRouter()
-   {
-      return new FavourLocalRouter();
-   }
-}

Modified: trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/LocalClusteredQueue.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/LocalClusteredQueue.java	2006-09-22 02:06:08 UTC (rev 1344)
+++ trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/LocalClusteredQueue.java	2006-09-22 12:29:44 UTC (rev 1345)
@@ -209,5 +209,8 @@
       }
    }
    
-   
+   public boolean isClustered()
+   {
+      return true;
+   }
 }

Modified: trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/RemoteQueueStub.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/RemoteQueueStub.java	2006-09-22 02:06:08 UTC (rev 1344)
+++ trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/RemoteQueueStub.java	2006-09-22 12:29:44 UTC (rev 1345)
@@ -263,4 +263,9 @@
       throw new UnsupportedOperationException();
    }
    
+   public boolean isClustered()
+   {
+      return true;
+   }
+   
 }

Modified: trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/DefaultPostOfficeTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/DefaultPostOfficeTest.java	2006-09-22 02:06:08 UTC (rev 1344)
+++ trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/DefaultPostOfficeTest.java	2006-09-22 12:29:44 UTC (rev 1345)
@@ -21,10 +21,18 @@
   */
 package org.jboss.test.messaging.core.plugin.postoffice;
 
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Iterator;
 import java.util.List;
 
+import javax.naming.InitialContext;
+import javax.sql.DataSource;
+import javax.transaction.TransactionManager;
+
 import org.jboss.jms.selector.Selector;
 import org.jboss.jms.server.QueuedExecutorPool;
 import org.jboss.messaging.core.Filter;
@@ -49,6 +57,7 @@
 import org.jboss.test.messaging.tools.ServerManagement;
 import org.jboss.test.messaging.tools.jmx.ServiceContainer;
 import org.jboss.test.messaging.util.CoreMessageFactory;
+import org.jboss.tm.TransactionManagerService;
 
 import EDU.oswego.cs.dl.util.concurrent.QueuedExecutor;
 
@@ -224,7 +233,7 @@
          Binding binding9 = office3.getBindingForQueueName("durableQueue");
          assertNull(binding9);
          
-         office3.stop();
+         
       }
       finally
       {
@@ -242,6 +251,8 @@
          {
             office2.stop();
          }
+         
+         checkNoBindingData();
       }
             
    }
@@ -518,6 +529,9 @@
          {
             postOffice.stop();
          }
+         
+         checkNoMessageData();
+         checkNoBindingData();
       }
    
    }
@@ -558,6 +572,9 @@
          {
             postOffice.stop();
          }
+         
+         checkNoMessageData();
+         checkNoBindingData();
       }
    }
    
@@ -663,6 +680,9 @@
          {
             postOffice.stop();
          }
+         
+         checkNoMessageData();
+         checkNoBindingData();
       }
    }
    
@@ -832,6 +852,9 @@
          {
             postOffice.stop();
          }
+         
+         checkNoMessageData();
+         checkNoBindingData();
       }
    }
    
@@ -1100,6 +1123,9 @@
          {
             postOffice.stop();
          }
+         
+         checkNoMessageData();
+         checkNoBindingData();
       }
    }
    
@@ -1126,6 +1152,110 @@
       
       return postOffice;
    }
+   
+   protected boolean checkNoBindingData() throws Exception
+   {
+      InitialContext ctx = new InitialContext();
+
+      TransactionManager mgr = (TransactionManager)ctx.lookup(TransactionManagerService.JNDI_NAME);
+      DataSource ds = (DataSource)ctx.lookup("java:/DefaultDS");
+      
+      javax.transaction.Transaction txOld = mgr.suspend();
+      mgr.begin();
+      
+      Connection conn = null;
+      
+      PreparedStatement ps = null;
+      
+      ResultSet rs = null;
+
+      try
+      {
+         conn = ds.getConnection();
+         String sql = "SELECT * FROM JMS_POSTOFFICE";
+         ps = conn.prepareStatement(sql);
+         
+         rs = ps.executeQuery();
+         
+         return rs.next();
+      }
+      finally
+      {
+         if (rs != null) rs.close();
+         
+         if (ps != null) ps.close();
+         
+         if (conn != null) conn.close();
+         
+         mgr.commit();
+
+         if (txOld != null)
+         {
+            mgr.resume(txOld);
+         }
+                  
+      } 
+   }
+   
+   protected boolean checkNoMessageData() throws Exception
+   {
+      InitialContext ctx = new InitialContext();
+
+      TransactionManager mgr = (TransactionManager)ctx.lookup(TransactionManagerService.JNDI_NAME);
+      DataSource ds = (DataSource)ctx.lookup("java:/DefaultDS");
+      
+      javax.transaction.Transaction txOld = mgr.suspend();
+      mgr.begin();
+      
+      Connection conn = null;
+      
+      PreparedStatement ps = null;
+      
+      ResultSet rs = null;
+
+      try
+      {
+         conn = ds.getConnection();
+         String sql = "SELECT * FROM JMS_MESSAGE_REFERENCE";
+         ps = conn.prepareStatement(sql);
+         
+         rs = ps.executeQuery();
+         
+         boolean exists = rs.next();
+         
+         if (!exists)
+         {
+            rs.close();
+            
+            ps.close();
+            
+            ps = conn.prepareStatement("SELECT * FROM JMS_MESSAGE");
+            
+            rs = ps.executeQuery();
+           
+            exists = rs.next();
+         }
+         
+         return exists;
+      }
+      finally
+      {
+         if (rs != null) rs.close();
+         
+         if (ps != null) ps.close();
+         
+         if (conn != null) conn.close();
+         
+         mgr.commit();
+
+         if (txOld != null)
+         {
+            mgr.resume(txOld);
+         }
+                  
+      } 
+   }
+   
    // Private -------------------------------------------------------
 
    // Inner classes -------------------------------------------------

Modified: trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOfficeTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOfficeTest.java	2006-09-22 02:06:08 UTC (rev 1344)
+++ trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOfficeTest.java	2006-09-22 12:29:44 UTC (rev 1345)
@@ -21,6 +21,7 @@
   */
 package org.jboss.test.messaging.core.plugin.postoffice.cluster;
 
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Iterator;
 import java.util.List;
@@ -28,12 +29,14 @@
 import org.jboss.messaging.core.FilterFactory;
 import org.jboss.messaging.core.Message;
 import org.jboss.messaging.core.MessageReference;
+import org.jboss.messaging.core.Queue;
 import org.jboss.messaging.core.local.PagingFilteredQueue;
 import org.jboss.messaging.core.plugin.contract.ClusteredPostOffice;
+import org.jboss.messaging.core.plugin.contract.PostOffice;
 import org.jboss.messaging.core.plugin.postoffice.Binding;
 import org.jboss.messaging.core.plugin.postoffice.cluster.ClusterRouterFactory;
 import org.jboss.messaging.core.plugin.postoffice.cluster.DefaultClusteredPostOffice;
-import org.jboss.messaging.core.plugin.postoffice.cluster.FavourLocalRouterFactory;
+import org.jboss.messaging.core.plugin.postoffice.cluster.DefaultRouterFactory;
 import org.jboss.messaging.core.plugin.postoffice.cluster.LocalClusteredQueue;
 import org.jboss.messaging.core.plugin.postoffice.cluster.MessagePullPolicy;
 import org.jboss.messaging.core.plugin.postoffice.cluster.NullMessagePullPolicy;
@@ -425,6 +428,8 @@
          {
             office2.stop();
          }
+         
+         checkNoBindingData();
       }
       
    }
@@ -459,7 +464,27 @@
       this.clusteredRouteWithFilter(true);
    }
    
+   public void testRouteSharedPointToPointQueuePersistent() throws Throwable
+   {
+      this.routeSharedQueue(true);
+   }
    
+   public void testRouteSharedPointToPointQueueNonPersistent() throws Throwable
+   {
+      this.routeSharedQueue(false);
+   }
+   
+   public void testRouteComplexTopicPersistent() throws Throwable
+   {
+      this.routeComplexTopic(true);
+   }
+   
+   public void testRouteComplexTopicNonPersistent() throws Throwable
+   {
+      this.routeComplexTopic(false);
+   }
+   
+   
    /*
     * We should allow the clustered bind of queues with the same queue name on different nodes of the
     * cluster
@@ -673,6 +698,8 @@
          {
             office2.stop();
          }
+         
+         checkNoMessageData();
       }
    }
    
@@ -841,10 +868,635 @@
          {
             office2.stop();
          }
+         
+         checkNoMessageData();
       }
    }
    
+   protected void routeSharedQueue(boolean persistentMessage) throws Throwable
+   {
+      ClusteredPostOffice office1 = null;
+      
+      ClusteredPostOffice office2 = null;
+      
+      ClusteredPostOffice office3 = null;
+      
+      ClusteredPostOffice office4 = null;
+      
+      ClusteredPostOffice office5 = null;
+      
+      ClusteredPostOffice office6 = null;
+        
+      try
+      {   
+         office1 = createClusteredPostOffice("node1", "testgroup");
+         office2 = createClusteredPostOffice("node2", "testgroup");
+         office3 = createClusteredPostOffice("node3", "testgroup");
+         office4 = createClusteredPostOffice("node4", "testgroup");
+         office5 = createClusteredPostOffice("node5", "testgroup");
+         office6 = createClusteredPostOffice("node6", "testgroup");
+    
+         //We deploy the queue on nodes 1, 2, 3, 4 and 5
+         //We don't deploy on node 6
+         
+         LocalClusteredQueue queue1 = new LocalClusteredQueue(office1, "node1", "queue1", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null);
+         Binding binding1 = office1.bindClusteredQueue("queue1", queue1);
+         SimpleReceiver receiver1 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
+         queue1.add(receiver1);
+         
+         LocalClusteredQueue queue2 = new LocalClusteredQueue(office2, "node2", "queue1", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null);
+         Binding binding2 = office2.bindClusteredQueue("queue1", queue2); 
+         SimpleReceiver receiver2 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
+         queue2.add(receiver2);
+         
+         LocalClusteredQueue queue3 = new LocalClusteredQueue(office3, "node3", "queue1", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null);
+         Binding binding3 = office3.bindClusteredQueue("queue1", queue3);
+         SimpleReceiver receiver3 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
+         queue3.add(receiver3);
+         
+         LocalClusteredQueue queue4 = new LocalClusteredQueue(office4, "node4", "queue1", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null);
+         Binding binding4 = office4.bindClusteredQueue("queue1", queue4); 
+         SimpleReceiver receiver4 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
+         queue4.add(receiver4);
+         
+         LocalClusteredQueue queue5 = new LocalClusteredQueue(office5, "node5", "queue1", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null);
+         Binding binding5 = office5.bindClusteredQueue("queue1", queue5);
+         SimpleReceiver receiver5 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
+         queue5.add(receiver5);
+        
+         //We are using a AlwaysLocalRoutingPolicy so only the local queue should ever get the message if the filter matches
+                          
+         Message msg = CoreMessageFactory.createCoreMessage(1, persistentMessage, null);      
+         MessageReference ref = ms.reference(msg);         
+         boolean routed = office1.route(ref, "queue1", null);         
+         assertTrue(routed);
+         checkContainsAndAcknowledge(msg, receiver1, queue1);
+         this.checkEmpty(receiver2);
+         this.checkEmpty(receiver3);
+         this.checkEmpty(receiver4);
+         this.checkEmpty(receiver5);
+         
+         msg = CoreMessageFactory.createCoreMessage(1, persistentMessage, null);      
+         ref = ms.reference(msg);         
+         routed = office2.route(ref, "queue1", null);         
+         assertTrue(routed);
+         this.checkEmpty(receiver1);
+         checkContainsAndAcknowledge(msg, receiver2, queue2);
+         this.checkEmpty(receiver3);
+         this.checkEmpty(receiver4);
+         this.checkEmpty(receiver5);
+         
+         msg = CoreMessageFactory.createCoreMessage(1, persistentMessage, null);      
+         ref = ms.reference(msg);         
+         routed = office3.route(ref, "queue1", null);         
+         assertTrue(routed);
+         this.checkEmpty(receiver1);
+         this.checkEmpty(receiver2);
+         checkContainsAndAcknowledge(msg, receiver3, queue3);
+         this.checkEmpty(receiver4);
+         this.checkEmpty(receiver5);
+         
+         msg = CoreMessageFactory.createCoreMessage(1, persistentMessage, null);      
+         ref = ms.reference(msg);         
+         routed = office4.route(ref, "queue1", null);         
+         assertTrue(routed);
+         this.checkEmpty(receiver1);
+         this.checkEmpty(receiver2);
+         this.checkEmpty(receiver3);
+         checkContainsAndAcknowledge(msg, receiver4, queue3);
+         this.checkEmpty(receiver5);
+         
+         msg = CoreMessageFactory.createCoreMessage(1, persistentMessage, null);      
+         ref = ms.reference(msg);         
+         routed = office5.route(ref, "queue1", null);         
+         assertTrue(routed);
+         this.checkEmpty(receiver1);
+         this.checkEmpty(receiver2);         
+         this.checkEmpty(receiver3);
+         this.checkEmpty(receiver4);
+         checkContainsAndAcknowledge(msg, receiver5, queue5);
+         
+         log.info("************* ROOTING");
+         
+         msg = CoreMessageFactory.createCoreMessage(1, persistentMessage, null);      
+         ref = ms.reference(msg);         
+         routed = office6.route(ref, "queue1", null);         
+         assertTrue(routed);
+         
+         //The actual queue that receives the mesage is determined by the routing policy
+         //The default uses round robin for the nodes (this is tested more thoroughly in
+         //its own test)
+         
+         Thread.sleep(1000);
+         
+         log.info("checking");
+         checkContainsAndAcknowledge(msg, receiver1, queue1);
+         this.checkEmpty(receiver1);
+         this.checkEmpty(receiver2);         
+         this.checkEmpty(receiver3);
+         this.checkEmpty(receiver4);
+         this.checkEmpty(receiver5);
+                 
+      }
+      finally
+      {
+         if (office1 != null)
+         {            
+            office1.stop();
+         }
+         
+         if (office2 != null)
+         {
+            office2.stop();
+         }
+         
+         if (office3 != null)
+         {            
+            office3.stop();
+         }
+         
+         if (office4 != null)
+         {
+            office4.stop();
+         }
+         
+         if (office5 != null)
+         {            
+            office5.stop();
+         }
+         
+         if (office6 != null)
+         {            
+            office6.stop();
+         }
+         
+         checkNoMessageData();
+      }
+   }
    
+
+   
+   /*
+    * We set up a complex scenario with multiple subscriptions, shared and unshared on different nodes
+    * 
+    * node1: no subscriptions
+    * node2: 2 non durable
+    * node3: 1 non shared durable, 1 non durable
+    * node4: 1 shared durable (shared1), 1 non shared durable, 3 non durable
+    * node5: 2 shared durable (shared1 and shared2)
+    * node6: 1 shared durable (shared2), 1 non durable
+    * node7: 1 shared durable (shared2)
+    * 
+    * Then we send mess
+    * 
+    * 
+    */
+   protected void routeComplexTopic(boolean persistent) throws Throwable
+   {
+      ClusteredPostOffice office1 = null;
+      
+      ClusteredPostOffice office2 = null;
+      
+      ClusteredPostOffice office3 = null;
+      
+      ClusteredPostOffice office4 = null;
+      
+      ClusteredPostOffice office5 = null;
+      
+      ClusteredPostOffice office6 = null;
+      
+      ClusteredPostOffice office7 = null;
+        
+      try
+      {   
+         office1 = createClusteredPostOffice("node1", "testgroup");
+         office2 = createClusteredPostOffice("node2", "testgroup");
+         office3 = createClusteredPostOffice("node3", "testgroup");
+         office4 = createClusteredPostOffice("node4", "testgroup");
+         office5 = createClusteredPostOffice("node5", "testgroup");
+         office6 = createClusteredPostOffice("node6", "testgroup");
+         office7 = createClusteredPostOffice("node7", "testgroup");
+         
+         //Node 2
+         //======
+         
+         //Non durable 1 on node 2
+         LocalClusteredQueue nonDurable1 = new LocalClusteredQueue(office2, "node2", "nondurable1", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null);
+         Binding binding1 = office2.bindClusteredQueue("topic", nonDurable1);
+         SimpleReceiver receiver1 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
+         nonDurable1.add(receiver1);
+         
+         //Non durable 2 on node 2
+         LocalClusteredQueue nonDurable2 = new LocalClusteredQueue(office2, "node2", "nondurable2", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null);
+         Binding binding2 = office2.bindClusteredQueue("topic", nonDurable2);
+         SimpleReceiver receiver2 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
+         nonDurable2.add(receiver2);
+         
+         //Node 3
+         //======
+         
+         //Non shared durable
+         LocalClusteredQueue nonSharedDurable1 = new LocalClusteredQueue(office3, "node3", "nonshareddurable1", im.getId(), ms, pm, true, true, (QueuedExecutor)pool.get(), null);
+         Binding binding3 = office3.bindClusteredQueue("topic", nonSharedDurable1);
+         SimpleReceiver receiver3 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
+         nonSharedDurable1.add(receiver3);
+         
+         //Non durable
+         LocalClusteredQueue nonDurable3 = new LocalClusteredQueue(office3, "node3", "nondurable3", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null);
+         Binding binding4 = office3.bindClusteredQueue("topic", nonDurable3);
+         SimpleReceiver receiver4 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
+         nonDurable3.add(receiver4);
+         
+         //Node 4
+         //======
+         
+         //Shared durable
+         LocalClusteredQueue sharedDurable1 = new LocalClusteredQueue(office4, "node4", "shareddurable1", im.getId(), ms, pm, true, true, (QueuedExecutor)pool.get(), null);
+         Binding binding5 = office4.bindClusteredQueue("topic", sharedDurable1);
+         SimpleReceiver receiver5 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
+         sharedDurable1.add(receiver5);
+         
+         //Non shared durable
+         LocalClusteredQueue nonSharedDurable2 = new LocalClusteredQueue(office4, "node4", "nonshareddurable2", im.getId(), ms, pm, true, true, (QueuedExecutor)pool.get(), null);
+         Binding binding6 = office4.bindClusteredQueue("topic", nonSharedDurable2);
+         SimpleReceiver receiver6 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
+         nonSharedDurable2.add(receiver6);
+         
+         //Non durable
+         LocalClusteredQueue nonDurable4 = new LocalClusteredQueue(office4, "node4", "nondurable4", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null);
+         Binding binding7 = office4.bindClusteredQueue("topic", nonDurable4);
+         SimpleReceiver receiver7 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
+         nonDurable4.add(receiver7);
+         
+         // Non durable
+         LocalClusteredQueue nonDurable5 = new LocalClusteredQueue(office4, "node4", "nondurable5", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null);
+         Binding binding8 = office4.bindClusteredQueue("topic", nonDurable5);
+         SimpleReceiver receiver8 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
+         nonDurable5.add(receiver8);
+         
+         //Non durable
+         LocalClusteredQueue nonDurable6 = new LocalClusteredQueue(office4, "node4", "nondurable6", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null);
+         Binding binding9 = office4.bindClusteredQueue("topic", nonDurable6);
+         SimpleReceiver receiver9 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
+         nonDurable6.add(receiver9);
+         
+         // Node 5
+         //=======
+         //Shared durable
+         LocalClusteredQueue sharedDurable2 = new LocalClusteredQueue(office5, "node5", "shareddurable1", im.getId(), ms, pm, true, true, (QueuedExecutor)pool.get(), null);
+         Binding binding10 = office5.bindClusteredQueue("topic", sharedDurable2);
+         SimpleReceiver receiver10 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
+         sharedDurable2.add(receiver10);
+         
+         //Shared durable
+         LocalClusteredQueue sharedDurable3 = new LocalClusteredQueue(office5, "node5", "shareddurable2", im.getId(), ms, pm, true, true, (QueuedExecutor)pool.get(), null);
+         Binding binding11 = office5.bindClusteredQueue("topic", sharedDurable3);
+         SimpleReceiver receiver11 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
+         sharedDurable3.add(receiver11);
+         
+         // Node 6
+         //=========
+         LocalClusteredQueue sharedDurable4 = new LocalClusteredQueue(office6, "node6", "shareddurable2", im.getId(), ms, pm, true, true, (QueuedExecutor)pool.get(), null);
+         Binding binding12 = office6.bindClusteredQueue("topic", sharedDurable4);
+         SimpleReceiver receiver12 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
+         sharedDurable4.add(receiver12);
+         
+         LocalClusteredQueue nonDurable7 = new LocalClusteredQueue(office6, "node6", "nondurable7", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null);
+         Binding binding13 = office6.bindClusteredQueue("topic", nonDurable7);
+         SimpleReceiver receiver13 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
+         nonDurable7.add(receiver13);
+         
+         //Node 7
+         //=======
+         LocalClusteredQueue sharedDurable5 = new LocalClusteredQueue(office7, "node7", "shareddurable2", im.getId(), ms, pm, true, true, (QueuedExecutor)pool.get(), null);
+         Binding binding14 = office7.bindClusteredQueue("topic", sharedDurable5);
+         SimpleReceiver receiver14 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
+         sharedDurable5.add(receiver14);
+         
+         
+         //Send 3 messages at node1
+         //========================
+         
+         List msgs = sendMessages(persistent, office1, 3, null);
+         
+         //n2
+         checkContainsAndAcknowledge(msgs, receiver1, nonDurable1);
+         checkContainsAndAcknowledge(msgs, receiver2, nonDurable2);
+         
+         //n3
+         checkContainsAndAcknowledge(msgs, receiver3, nonSharedDurable1);
+         checkContainsAndAcknowledge(msgs, receiver4, nonDurable3);
+         
+         //n4
+         checkContainsAndAcknowledge(msgs, receiver5, sharedDurable1);
+         checkContainsAndAcknowledge(msgs, receiver6, nonSharedDurable2);
+         checkContainsAndAcknowledge(msgs, receiver7, nonDurable4);
+         checkContainsAndAcknowledge(msgs, receiver8, nonDurable5);
+         checkContainsAndAcknowledge(msgs, receiver9, nonDurable6);
+         
+         //n5
+         checkEmpty(receiver10);
+         checkContainsAndAcknowledge(msgs, receiver11, sharedDurable3);
+         
+         //n6
+         checkEmpty(receiver12);
+         checkContainsAndAcknowledge(msgs, receiver13, nonDurable7);
+         
+         //n7
+         checkEmpty(receiver12);
+         
+         
+         //Send 3 messages at node2
+         //========================
+         
+         msgs = sendMessages(persistent, office2, 3, null);
+         
+         //n2
+         checkContainsAndAcknowledge(msgs, receiver1, nonDurable1);
+         checkContainsAndAcknowledge(msgs, receiver2, nonDurable2);
+         
+         //n3
+         checkContainsAndAcknowledge(msgs, receiver3, nonSharedDurable1);
+         checkContainsAndAcknowledge(msgs, receiver4, nonDurable3);
+         
+         //n4
+         checkContainsAndAcknowledge(msgs, receiver5, sharedDurable1);
+         checkContainsAndAcknowledge(msgs, receiver6, nonSharedDurable2);
+         checkContainsAndAcknowledge(msgs, receiver7, nonDurable4);
+         checkContainsAndAcknowledge(msgs, receiver8, nonDurable5);
+         checkContainsAndAcknowledge(msgs, receiver9, nonDurable6);
+         
+         //n5
+         checkEmpty(receiver10);
+         checkContainsAndAcknowledge(msgs, receiver11, sharedDurable3);
+         
+         //n6
+         checkEmpty(receiver12);
+         checkContainsAndAcknowledge(msgs, receiver13, nonDurable7);
+         
+         //n7
+         checkEmpty(receiver12);
+         
+         //Send 3 messages at node3
+         //========================
+         
+         msgs = sendMessages(persistent, office3, 3, null);
+         
+         //n2
+         checkContainsAndAcknowledge(msgs, receiver1, nonDurable1);
+         checkContainsAndAcknowledge(msgs, receiver2, nonDurable2);
+         
+         //n3
+         checkContainsAndAcknowledge(msgs, receiver3, nonSharedDurable1);
+         checkContainsAndAcknowledge(msgs, receiver4, nonDurable3);
+         
+         //n4
+         checkContainsAndAcknowledge(msgs, receiver5, sharedDurable1);
+         checkContainsAndAcknowledge(msgs, receiver6, nonSharedDurable2);
+         checkContainsAndAcknowledge(msgs, receiver7, nonDurable4);
+         checkContainsAndAcknowledge(msgs, receiver8, nonDurable5);
+         checkContainsAndAcknowledge(msgs, receiver9, nonDurable6);
+         
+         //n5
+         checkEmpty(receiver10);
+         checkContainsAndAcknowledge(msgs, receiver11, sharedDurable3);
+         
+         //n6
+         checkEmpty(receiver12);
+         checkContainsAndAcknowledge(msgs, receiver13, nonDurable7);
+         
+         //n7
+         checkEmpty(receiver12);     
+         
+         //Send 3 messages at node4
+         //========================
+         
+         msgs = sendMessages(persistent, office4, 3, null);
+               
+         //n2
+         checkContainsAndAcknowledge(msgs, receiver1, nonDurable1);
+         checkContainsAndAcknowledge(msgs, receiver2, nonDurable2);
+         
+         //n3
+         checkContainsAndAcknowledge(msgs, receiver3, nonSharedDurable1);
+         checkContainsAndAcknowledge(msgs, receiver4, nonDurable3);
+         
+         //n4
+         checkContainsAndAcknowledge(msgs, receiver5, sharedDurable1);
+         checkContainsAndAcknowledge(msgs, receiver6, nonSharedDurable2);
+         checkContainsAndAcknowledge(msgs, receiver7, nonDurable4);
+         checkContainsAndAcknowledge(msgs, receiver8, nonDurable5);
+         checkContainsAndAcknowledge(msgs, receiver9, nonDurable6);
+         
+         //n5
+         checkEmpty(receiver10);
+         checkContainsAndAcknowledge(msgs, receiver11, sharedDurable3);
+         
+         //n6
+         checkEmpty(receiver12);
+         checkContainsAndAcknowledge(msgs, receiver13, nonDurable7);
+         
+         //n7
+         checkEmpty(receiver12);
+         
+         //Send 3 messages at node5
+         //========================
+         
+         msgs = sendMessages(persistent, office5, 3, null);
+             
+         //n2
+         checkContainsAndAcknowledge(msgs, receiver1, nonDurable1);
+         checkContainsAndAcknowledge(msgs, receiver2, nonDurable2);
+         
+         //n3
+         checkContainsAndAcknowledge(msgs, receiver3, nonSharedDurable1);
+         checkContainsAndAcknowledge(msgs, receiver4, nonDurable3);
+         
+         //n4
+         checkEmpty(receiver5);
+         checkContainsAndAcknowledge(msgs, receiver6, nonSharedDurable2);
+         checkContainsAndAcknowledge(msgs, receiver7, nonDurable4);
+         checkContainsAndAcknowledge(msgs, receiver8, nonDurable5);
+         checkContainsAndAcknowledge(msgs, receiver9, nonDurable6);
+         
+         //n5
+         checkContainsAndAcknowledge(msgs, receiver10, sharedDurable2);
+         checkContainsAndAcknowledge(msgs, receiver11, sharedDurable3);
+         
+         //n6
+         checkEmpty(receiver12);
+         checkContainsAndAcknowledge(msgs, receiver13, nonDurable7);
+         
+         //n7
+         checkEmpty(receiver12);
+         
+         //Send 3 messages at node6
+         //========================
+         
+         msgs = sendMessages(persistent, office6, 3, null);
+             
+         //n2
+         checkContainsAndAcknowledge(msgs, receiver1, nonDurable1);
+         checkContainsAndAcknowledge(msgs, receiver2, nonDurable2);
+         
+         //n3
+         checkContainsAndAcknowledge(msgs, receiver3, nonSharedDurable1);
+         checkContainsAndAcknowledge(msgs, receiver4, nonDurable3);
+         
+         //n4
+         checkContainsAndAcknowledge(msgs, receiver5, sharedDurable1);
+         checkContainsAndAcknowledge(msgs, receiver6, nonSharedDurable2);
+         checkContainsAndAcknowledge(msgs, receiver7, nonDurable4);
+         checkContainsAndAcknowledge(msgs, receiver8, nonDurable5);
+         checkContainsAndAcknowledge(msgs, receiver9, nonDurable6);
+         
+         //n5
+         checkEmpty(receiver10);
+        
+         checkEmpty(receiver11);
+         
+         //n6
+         checkContainsAndAcknowledge(msgs, receiver12, sharedDurable4);         
+         checkContainsAndAcknowledge(msgs, receiver13, nonDurable7);
+         
+         //n7
+         checkEmpty(receiver12);
+         
+         //Send 3 messages at node7
+         //========================
+         
+         msgs = sendMessages(persistent, office7, 3, null);
+
+         //n2
+         checkContainsAndAcknowledge(msgs, receiver1, nonDurable1);
+         checkContainsAndAcknowledge(msgs, receiver2, nonDurable2);
+         
+         //n3
+         checkContainsAndAcknowledge(msgs, receiver3, nonSharedDurable1);
+         checkContainsAndAcknowledge(msgs, receiver4, nonDurable3);
+         
+         //n4
+         checkContainsAndAcknowledge(msgs, receiver5, sharedDurable1);
+         checkContainsAndAcknowledge(msgs, receiver6, nonSharedDurable2);
+         checkContainsAndAcknowledge(msgs, receiver7, nonDurable4);
+         checkContainsAndAcknowledge(msgs, receiver8, nonDurable5);
+         checkContainsAndAcknowledge(msgs, receiver9, nonDurable6);
+         
+         //n5
+         checkEmpty(receiver10);
+         checkEmpty(receiver11);
+         
+         //n6
+         checkEmpty(receiver12);
+         checkContainsAndAcknowledge(msgs, receiver13, nonDurable7);
+         
+         //n7
+         checkContainsAndAcknowledge(msgs, receiver14, sharedDurable5);
+        
+      }
+      finally
+      {
+         if (office1 != null)
+         {            
+            office1.stop();
+         }
+         
+         if (office2 != null)
+         {
+            office2.stop();
+         }
+         
+         if (office3 != null)
+         {            
+            office3.stop();
+         }
+         
+         if (office4 != null)
+         {
+            office4.stop();
+         }
+         
+         if (office5 != null)
+         {            
+            office5.stop();
+         }
+         
+         if (office6 != null)
+         {            
+            office6.stop();
+         }
+         
+         if (office7 != null)
+         {            
+            office7.stop();
+         }
+         
+         checkNoMessageData();
+      }
+   }
+   
+   
+   private List sendMessages(boolean persistent, PostOffice office, int num, Transaction tx) throws Exception
+   {
+      List list = new ArrayList();
+      
+      Message msg = CoreMessageFactory.createCoreMessage(1, persistent, null);      
+      
+      MessageReference ref = ms.reference(msg);         
+      
+      boolean routed = office.route(ref, "topic", null);         
+      
+      assertTrue(routed);
+      
+      list.add(msg);
+      
+      Thread.sleep(1000);
+      
+      return list;
+   }
+   
+   private void checkContainsAndAcknowledge(Message msg, SimpleReceiver receiver, Queue queue) throws Throwable
+   {
+      List msgs = receiver.getMessages();
+      assertNotNull(msgs);
+      assertEquals(1, msgs.size());
+      Message msgRec = (Message)msgs.get(0);
+      assertEquals(msg.getMessageID(), msgRec.getMessageID());
+      receiver.acknowledge(msgRec, null);
+      msgs = queue.browse();
+      assertNotNull(msgs);
+      assertTrue(msgs.isEmpty()); 
+      receiver.clear();
+   }
+   
+   private void checkContainsAndAcknowledge(List msgList, SimpleReceiver receiver, Queue queue) throws Throwable
+   {
+      List msgs = receiver.getMessages();
+      assertNotNull(msgs);
+      assertEquals(msgList.size(), msgs.size());
+      
+      for (int i = 0; i < msgList.size(); i++)
+      {
+         Message msgRec = (Message)msgs.get(i);
+         Message msgCheck = (Message)msgList.get(i);
+         assertEquals(msgCheck.getMessageID(), msgRec.getMessageID());
+         receiver.acknowledge(msgRec, null);
+      }
+      
+      msgs = queue.browse();
+      assertNotNull(msgs);
+      assertTrue(msgs.isEmpty()); 
+      receiver.clear();
+   }
+   
+   private void checkEmpty(SimpleReceiver receiver) throws Throwable
+   {
+      List msgs = receiver.getMessages();
+      assertNotNull(msgs);
+      assertTrue(msgs.isEmpty());
+   }
+   
+   
    protected void clusteredTransactionalRoute(boolean persistent) throws Throwable
    {
       ClusteredPostOffice office1 = null;
@@ -1441,16 +2093,18 @@
          {
             office2.stop();
          }
+         
+         checkNoMessageData();
       }
    }
    
    protected ClusteredPostOffice createClusteredPostOffice(String nodeId, String groupName) throws Exception
    {
-      MessagePullPolicy redistPolicy = new NullMessagePullPolicy();
+      MessagePullPolicy pullPolicy = new NullMessagePullPolicy();
       
       FilterFactory ff = new SimpleFilterFactory();
       
-      ClusterRouterFactory rf = new FavourLocalRouterFactory();
+      ClusterRouterFactory rf = new DefaultRouterFactory();
       
       DefaultClusteredPostOffice postOffice = 
          new DefaultClusteredPostOffice(sc.getDataSource(), sc.getTransactionManager(),
@@ -1458,7 +2112,7 @@
                                  groupName,
                                  JGroupsUtil.getControlStackProperties(),
                                  JGroupsUtil.getDataStackProperties(),
-                                 5000, 5000, redistPolicy, rf, 1);
+                                 5000, 5000, pullPolicy, rf, 1);
       
       postOffice.start();      
       

Modified: trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/FavourLocalRouterTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/FavourLocalRouterTest.java	2006-09-22 02:06:08 UTC (rev 1344)
+++ trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/FavourLocalRouterTest.java	2006-09-22 12:29:44 UTC (rev 1345)
@@ -30,7 +30,7 @@
 import org.jboss.messaging.core.plugin.postoffice.Binding;
 import org.jboss.messaging.core.plugin.postoffice.cluster.ClusterRouterFactory;
 import org.jboss.messaging.core.plugin.postoffice.cluster.DefaultClusteredPostOffice;
-import org.jboss.messaging.core.plugin.postoffice.cluster.FavourLocalRouterFactory;
+import org.jboss.messaging.core.plugin.postoffice.cluster.DefaultRouterFactory;
 import org.jboss.messaging.core.plugin.postoffice.cluster.LocalClusteredQueue;
 import org.jboss.messaging.core.plugin.postoffice.cluster.MessagePullPolicy;
 import org.jboss.messaging.core.plugin.postoffice.cluster.NullMessagePullPolicy;
@@ -146,7 +146,7 @@
       
       FilterFactory ff = new SimpleFilterFactory();
       
-      ClusterRouterFactory rf = new FavourLocalRouterFactory();
+      ClusterRouterFactory rf = new DefaultRouterFactory();
       
       DefaultClusteredPostOffice postOffice = 
          new DefaultClusteredPostOffice(sc.getDataSource(), sc.getTransactionManager(),




More information about the jboss-cvs-commits mailing list