[Jboss-cvs] JBoss Messaging SVN: r1298 - in trunk: src/main/org/jboss/messaging/core/plugin src/main/org/jboss/messaging/core/plugin/postoffice/cluster tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Sun Sep 17 13:58:17 EDT 2006


Author: timfox
Date: 2006-09-17 13:58:08 -0400 (Sun, 17 Sep 2006)
New Revision: 1298

Added:
   trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/FavourLocalRouter.java
   trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/FavourLocalRouterFactory.java
   trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/RouterFactory.java
   trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/FavourLocalRouterTest.java
   trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/NullRedistributionPolicy.java
Removed:
   trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/ClusterRouter.java
Modified:
   trunk/src/main/org/jboss/messaging/core/plugin/ClusteredPostOfficeService.java
   trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/ClusteredBindingsImpl.java
   trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/ClusteredPostOfficeImpl.java
   trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/ClusteredPostOfficeTest.java
Log:
More cluster routing stuff



Modified: trunk/src/main/org/jboss/messaging/core/plugin/ClusteredPostOfficeService.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/ClusteredPostOfficeService.java	2006-09-17 15:47:58 UTC (rev 1297)
+++ trunk/src/main/org/jboss/messaging/core/plugin/ClusteredPostOfficeService.java	2006-09-17 17:58:08 UTC (rev 1298)
@@ -34,7 +34,9 @@
 import org.jboss.messaging.core.plugin.contract.PersistenceManager;
 import org.jboss.messaging.core.plugin.postoffice.cluster.BasicRedistributionPolicy;
 import org.jboss.messaging.core.plugin.postoffice.cluster.ClusteredPostOfficeImpl;
+import org.jboss.messaging.core.plugin.postoffice.cluster.FavourLocalRouterFactory;
 import org.jboss.messaging.core.plugin.postoffice.cluster.RedistributionPolicy;
+import org.jboss.messaging.core.plugin.postoffice.cluster.RouterFactory;
 import org.jboss.messaging.core.tx.TransactionRepository;
 import org.w3c.dom.Element;
 
@@ -206,6 +208,8 @@
          RedistributionPolicy redistPolicy = new BasicRedistributionPolicy(nodeId);
          
          FilterFactory ff = new SelectorFactory();
+         
+         RouterFactory rf = new FavourLocalRouterFactory();
                   
          postOffice =  new ClusteredPostOfficeImpl(ds, tm, sqlProperties, createTablesOnStartup,
                                                nodeId, officeName, ms,
@@ -213,7 +217,7 @@
                                                groupName,
                                                syncChannelConfig, asyncChannelConfig,
                                                stateTimeout, castTimeout,
-                                               redistPolicy, redistPeriod);
+                                               redistPolicy, redistPeriod, rf);
          
          postOffice.start();
          

Deleted: trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/ClusterRouter.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/ClusterRouter.java	2006-09-17 15:47:58 UTC (rev 1297)
+++ trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/ClusterRouter.java	2006-09-17 17:58:08 UTC (rev 1298)
@@ -1,145 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2005, JBoss Inc., and individual contributors as indicated
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-package org.jboss.messaging.core.plugin.postoffice.cluster;
-
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-
-import org.jboss.messaging.core.Delivery;
-import org.jboss.messaging.core.DeliveryObserver;
-import org.jboss.messaging.core.MessageReference;
-import org.jboss.messaging.core.Receiver;
-import org.jboss.messaging.core.Router;
-import org.jboss.messaging.core.tx.Transaction;
-
-/**
- * A ClusterRouter
- *
- * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
- * @version <tt>$Revision: 1.1 $</tt>
- *
- * $Id$
- *
- */
-public class ClusterRouter implements Router
-{
-   private List queues;
-   
-   private ClusteredQueue localQueue;
-   
-   public ClusterRouter()
-   {
-      queues = new ArrayList();
-   }
-
-   public boolean add(Receiver receiver)
-   {
-      ClusteredQueue queue = (ClusteredQueue)receiver;
-      
-      if (queue.isLocal())
-      {
-         if (localQueue != null)
-         {
-            throw new IllegalStateException("Already has local queue");
-         }
-         localQueue = queue;
-      }
-      
-      queues.add(queue);      
-      
-      return true;
-   }
-
-   public void clear()
-   {
-      queues.clear();
-      localQueue = null;
-   }
-
-   public boolean contains(Receiver queue)
-   {
-      return queues.contains(queue);
-   }
-
-   public Iterator iterator()
-   {
-      return queues.iterator();
-   }
-
-   public boolean remove(Receiver queue)
-   {
-      if (queues.remove(queue))
-      {
-         if (localQueue == queue)
-         {
-            localQueue = null;
-         }
-         return true;
-      }
-      else
-      {
-         return false;
-      }
-   }
-
-   public Delivery handle(DeliveryObserver observer, MessageReference reference, Transaction tx)
-   {
-      //Favour the local queue
-      
-      if (localQueue != null)
-      {
-         Delivery del = localQueue.handle(observer, reference, tx);
-         
-         if (del != null && del.isSelectorAccepted())
-         {
-            return del;
-         }
-      }
-      
-      //TODO make this round robin
-      
-      Iterator iter = queues.iterator();
-      
-      while (iter.hasNext())
-      {
-         ClusteredQueue queue = (ClusteredQueue)iter.next();
-         
-         if (!queue.isLocal())
-         {
-            Delivery del = queue.handle(observer, reference, tx);
-            
-            if (del != null && del.isSelectorAccepted())
-            {
-               return del;
-            }
-         }
-      }
-      
-      return null;      
-   }
-   
-   public List getQueues()
-   {
-      return queues;
-   }
-}

Modified: trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/ClusteredBindingsImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/ClusteredBindingsImpl.java	2006-09-17 15:47:58 UTC (rev 1297)
+++ trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/ClusteredBindingsImpl.java	2006-09-17 17:58:08 UTC (rev 1298)
@@ -25,6 +25,7 @@
 import java.util.HashMap;
 import java.util.Map;
 
+import org.jboss.messaging.core.Router;
 import org.jboss.messaging.core.plugin.postoffice.Binding;
 import org.jboss.messaging.core.plugin.postoffice.BindingsImpl;
 
@@ -48,24 +49,28 @@
    
    private int localDurableCount;
    
-   public ClusteredBindingsImpl(String thisNode)
+   private RouterFactory rf;
+   
+   public ClusteredBindingsImpl(String thisNode, RouterFactory rf)
    {
       super();
       
       nameMap = new HashMap();
       
       this.thisNode = thisNode;
+      
+      this.rf = rf;
    }
    
    public void addBinding(Binding binding)
    {
       super.addBinding(binding);
                
-      ClusterRouter router = (ClusterRouter)nameMap.get(binding.getQueue().getName());
+      Router router = (Router)nameMap.get(binding.getQueue().getName());
       
       if (router == null)
       {
-         router = new ClusterRouter();
+         router = rf.createRouter();
          
          nameMap.put(binding.getQueue().getName(), router);
       }
@@ -87,7 +92,7 @@
          return false;
       }
            
-      ClusterRouter router = (ClusterRouter)nameMap.get(binding.getQueue().getName());
+      FavourLocalRouter router = (FavourLocalRouter)nameMap.get(binding.getQueue().getName());
       
       if (router == null)
       {

Modified: trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/ClusteredPostOfficeImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/ClusteredPostOfficeImpl.java	2006-09-17 15:47:58 UTC (rev 1297)
+++ trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/ClusteredPostOfficeImpl.java	2006-09-17 17:58:08 UTC (rev 1298)
@@ -129,6 +129,8 @@
    private MessageRedistributor redistributor;
    
    private long redistributePeriod;
+   
+   private RouterFactory routerFactory;
       
    public ClusteredPostOfficeImpl()
    {        
@@ -157,10 +159,11 @@
             Element asyncChannelConfig,
             long stateTimeout, long castTimeout,
             RedistributionPolicy redistributionPolicy,
-            long redistributePeriod) throws Exception
+            long redistributePeriod,
+            RouterFactory rf) throws Exception
    {            
       this(ds, tm, sqlProperties, createTablesOnStartup, nodeId, officeName, ms,
-           pm, tr, filterFactory, pool, groupName, stateTimeout, castTimeout, redistributionPolicy, redistributePeriod);
+           pm, tr, filterFactory, pool, groupName, stateTimeout, castTimeout, redistributionPolicy, redistributePeriod, rf);
       
       this.syncChannelConfigE = syncChannelConfig;      
       this.asyncChannelConfigE = asyncChannelConfig;     
@@ -181,10 +184,11 @@
                               String asyncChannelConfig,
                               long stateTimeout, long castTimeout,
                               RedistributionPolicy redistributionPolicy,
-                              long redistributePeriod) throws Exception
+                              long redistributePeriod,
+                              RouterFactory rf) throws Exception
    {            
       this(ds, tm, sqlProperties, createTablesOnStartup, nodeId, officeName, ms,
-           pm, tr, filterFactory, pool, groupName, stateTimeout, castTimeout, redistributionPolicy, redistributePeriod);
+           pm, tr, filterFactory, pool, groupName, stateTimeout, castTimeout, redistributionPolicy, redistributePeriod, rf);
 
       this.syncChannelConfigS = syncChannelConfig;      
       this.asyncChannelConfigS = asyncChannelConfig;     
@@ -200,7 +204,8 @@
                                String groupName,
                                long stateTimeout, long castTimeout,                             
                                RedistributionPolicy redistributionPolicy,
-                               long redistributePeriod)
+                               long redistributePeriod,
+                               RouterFactory rf)
    {
       super (ds, tm, sqlProperties, createTablesOnStartup, nodeId, officeName, ms, pm, tr, filterFactory,
              pool);
@@ -217,6 +222,8 @@
       
       this.redistributePeriod = redistributePeriod;
       
+      this.routerFactory = rf;
+      
       init();
    }
 
@@ -776,7 +783,7 @@
             
             while (iter2.hasNext())
             {
-               ClusterRouter router = (ClusterRouter)iter2.next();        
+               FavourLocalRouter router = (FavourLocalRouter)iter2.next();        
             
                RedistributionOrder order = redistributionPolicy.calculate(router.getQueues());
                
@@ -922,7 +929,7 @@
         
    protected Bindings createBindings()
    {
-      return new ClusteredBindingsImpl(this.nodeId);
+      return new ClusteredBindingsImpl(this.nodeId, this.routerFactory);
    }
    
    protected void loadBindings() throws Exception

Added: trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/FavourLocalRouter.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/FavourLocalRouter.java	2006-09-17 15:47:58 UTC (rev 1297)
+++ trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/FavourLocalRouter.java	2006-09-17 17:58:08 UTC (rev 1298)
@@ -0,0 +1,148 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.messaging.core.plugin.postoffice.cluster;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+import org.jboss.messaging.core.Delivery;
+import org.jboss.messaging.core.DeliveryObserver;
+import org.jboss.messaging.core.MessageReference;
+import org.jboss.messaging.core.Receiver;
+import org.jboss.messaging.core.Router;
+import org.jboss.messaging.core.tx.Transaction;
+
+/**
+ * 
+ * A FavourLocalRouter: a Router that tries the local queue first, then each non-local queue in order.
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * @version <tt>$Revision: 1.1 $</tt>
+ *
+ * $Id$
+ *
+ */
+public class FavourLocalRouter implements Router
+{
+   private List queues;
+   
+   private ClusteredQueue localQueue;
+   
+   public FavourLocalRouter()
+   {
+      queues = new ArrayList();
+   }
+
+   public boolean add(Receiver receiver)
+   {
+      ClusteredQueue queue = (ClusteredQueue)receiver;
+      
+      if (queue.isLocal())
+      {
+         if (localQueue != null)
+         {
+            throw new IllegalStateException("Already has local queue");
+         }
+         localQueue = queue;
+      }
+      
+      queues.add(queue);      
+      
+      return true;
+   }
+
+   public void clear()
+   {
+      queues.clear();
+      localQueue = null;
+   }
+
+   public boolean contains(Receiver queue)
+   {
+      return queues.contains(queue);
+   }
+
+   public Iterator iterator()
+   {
+      return queues.iterator();
+   }
+
+   public boolean remove(Receiver queue)
+   {
+      if (queues.remove(queue))
+      {
+         if (localQueue == queue)
+         {
+            localQueue = null;
+         }
+         return true;
+      }
+      else
+      {
+         return false;
+      }
+   }
+
+   public Delivery handle(DeliveryObserver observer, MessageReference reference, Transaction tx)
+   {
+      //Favour the local queue
+           
+      if (localQueue != null)
+      {
+         //But only if it has consumers
+         
+         Delivery del = localQueue.handle(observer, reference, tx);
+         
+         if (del != null && del.isSelectorAccepted())
+         {
+            return del;
+         }
+      }
+      
+      //TODO make this round robin
+      
+      Iterator iter = queues.iterator();
+      
+      while (iter.hasNext())
+      {
+         ClusteredQueue queue = (ClusteredQueue)iter.next();
+         
+         if (!queue.isLocal())
+         {
+            Delivery del = queue.handle(observer, reference, tx);
+            
+            if (del != null && del.isSelectorAccepted())
+            {
+               return del;
+            }
+         }
+      }
+      
+      return null;      
+   }
+   
+   public List getQueues()
+   {
+      return queues;
+   }
+}

Added: trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/FavourLocalRouterFactory.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/FavourLocalRouterFactory.java	2006-09-17 15:47:58 UTC (rev 1297)
+++ trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/FavourLocalRouterFactory.java	2006-09-17 17:58:08 UTC (rev 1298)
@@ -0,0 +1,43 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.messaging.core.plugin.postoffice.cluster;
+
+import org.jboss.messaging.core.Router;
+
+/**
+ * A FavourLocalRouterFactory: a RouterFactory whose createRouter() returns a new FavourLocalRouter.
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * @version <tt>$Revision: 1.1 $</tt>
+ *
+ * $Id$
+ *
+ */
+public class FavourLocalRouterFactory implements RouterFactory
+{
+
+   public Router createRouter()
+   {
+      return new FavourLocalRouter();
+   }
+
+}

Added: trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/RouterFactory.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/RouterFactory.java	2006-09-17 15:47:58 UTC (rev 1297)
+++ trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/RouterFactory.java	2006-09-17 17:58:08 UTC (rev 1298)
@@ -0,0 +1,38 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.messaging.core.plugin.postoffice.cluster;
+
+import org.jboss.messaging.core.Router;
+
+/**
+ * A RouterFactory: a factory for Router instances.
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * @version <tt>$Revision: 1.1 $</tt>
+ *
+ * $Id$
+ *
+ */
+public interface RouterFactory
+{
+   Router createRouter();
+}

Modified: trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/ClusteredPostOfficeTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/ClusteredPostOfficeTest.java	2006-09-17 15:47:58 UTC (rev 1297)
+++ trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/ClusteredPostOfficeTest.java	2006-09-17 17:58:08 UTC (rev 1298)
@@ -33,8 +33,10 @@
 import org.jboss.messaging.core.plugin.postoffice.Binding;
 import org.jboss.messaging.core.plugin.postoffice.cluster.BasicRedistributionPolicy;
 import org.jboss.messaging.core.plugin.postoffice.cluster.ClusteredPostOfficeImpl;
+import org.jboss.messaging.core.plugin.postoffice.cluster.FavourLocalRouterFactory;
 import org.jboss.messaging.core.plugin.postoffice.cluster.LocalClusteredQueue;
 import org.jboss.messaging.core.plugin.postoffice.cluster.RedistributionPolicy;
+import org.jboss.messaging.core.plugin.postoffice.cluster.RouterFactory;
 import org.jboss.messaging.core.tx.Transaction;
 import org.jboss.test.messaging.core.SimpleReceiver;
 import org.jboss.test.messaging.core.plugin.postoffice.SimpleFilter;
@@ -450,54 +452,6 @@
       this.clusteredRouteWithFilter(true);
    }
    
-   public final void testRedistribute() throws Exception
-   {
-      ClusteredPostOffice office1 = null;
-      
-      ClusteredPostOffice office2 = null;
-          
-      try
-      {   
-         office1 = createClusteredPostOffice("node1", "testgroup");
-         office2 = createClusteredPostOffice("node2", "testgroup");
-         
-         LocalClusteredQueue queue1 = new LocalClusteredQueue("node1", "queue1", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null);
-         
-         Binding binding1 = office1.bindClusteredQueue("queue1", queue1);
-         
-         LocalClusteredQueue queue2 = new LocalClusteredQueue("node2", "queue1", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null);
-         
-         Binding binding2 = office2.bindClusteredQueue("queue1", queue1);
-         
-         final int NUM_MESSAGES = 10;
-         for (int i = 0; i < NUM_MESSAGES; i++)
-         {
-            Message msg = CoreMessageFactory.createCoreMessage(i, false, null);      
-            MessageReference ref = ms.reference(msg);         
-            boolean routed = office1.route(ref, "queue1", null);
-         }
-         
-         List msgs = queue1.browse();
-         assertEquals(NUM_MESSAGES, msgs.size());
-         msgs = queue2.browse();
-         assertEquals(0, msgs.size());
-         
-         
-      
-      }
-      finally
-      {
-         if (office1 != null)
-         {            
-            office1.stop();
-         }
-         
-         if (office2 != null)
-         {
-            office2.stop();
-         }
-      }
-   }
    
    /*
     * We should allow the clustered bind of queues with the same queue name on different nodes of the
@@ -609,6 +563,8 @@
       }
    }
    
+   
+   
    // Package protected ---------------------------------------------
 
    // Protected -----------------------------------------------------
@@ -1474,17 +1430,19 @@
    
    protected ClusteredPostOffice createClusteredPostOffice(String nodeId, String groupName) throws Exception
    {
-      RedistributionPolicy redistPolicy = new BasicRedistributionPolicy(nodeId);
+      RedistributionPolicy redistPolicy = new NullRedistributionPolicy();
       
       FilterFactory ff = new SimpleFilterFactory();
       
+      RouterFactory rf = new FavourLocalRouterFactory();
+      
       ClusteredPostOfficeImpl postOffice = 
          new ClusteredPostOfficeImpl(sc.getDataSource(), sc.getTransactionManager(),
                                  null, true, nodeId, "Clustered", ms, pm, tr, ff, pool,
                                  groupName,
                                  JGroupsUtil.getControlStackProperties(50, 1),
                                  JGroupsUtil.getDataStackProperties(50, 1),
-                                 5000, 5000, redistPolicy, 1000);
+                                 5000, 5000, redistPolicy, 1000, rf);
       
       postOffice.start();      
       

Added: trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/FavourLocalRouterTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/FavourLocalRouterTest.java	2006-09-17 15:47:58 UTC (rev 1297)
+++ trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/FavourLocalRouterTest.java	2006-09-17 17:58:08 UTC (rev 1298)
@@ -0,0 +1,178 @@
+/*
+  * JBoss, Home of Professional Open Source
+  * Copyright 2005, JBoss Inc., and individual contributors as indicated
+  * by the @authors tag. See the copyright.txt in the distribution for a
+  * full listing of individual contributors.
+  *
+  * This is free software; you can redistribute it and/or modify it
+  * under the terms of the GNU Lesser General Public License as
+  * published by the Free Software Foundation; either version 2.1 of
+  * the License, or (at your option) any later version.
+  *
+  * This software is distributed in the hope that it will be useful,
+  * but WITHOUT ANY WARRANTY; without even the implied warranty of
+  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+  * Lesser General Public License for more details.
+  *
+  * You should have received a copy of the GNU Lesser General Public
+  * License along with this software; if not, write to the Free
+  * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+  * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+  */
+package org.jboss.test.messaging.core.plugin.postoffice.cluster;
+
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+
+import org.jboss.messaging.core.FilterFactory;
+import org.jboss.messaging.core.Message;
+import org.jboss.messaging.core.MessageReference;
+import org.jboss.messaging.core.local.PagingFilteredQueue;
+import org.jboss.messaging.core.plugin.contract.ClusteredPostOffice;
+import org.jboss.messaging.core.plugin.postoffice.Binding;
+import org.jboss.messaging.core.plugin.postoffice.cluster.BasicRedistributionPolicy;
+import org.jboss.messaging.core.plugin.postoffice.cluster.ClusteredPostOfficeImpl;
+import org.jboss.messaging.core.plugin.postoffice.cluster.FavourLocalRouterFactory;
+import org.jboss.messaging.core.plugin.postoffice.cluster.LocalClusteredQueue;
+import org.jboss.messaging.core.plugin.postoffice.cluster.RedistributionPolicy;
+import org.jboss.messaging.core.plugin.postoffice.cluster.RouterFactory;
+import org.jboss.messaging.core.tx.Transaction;
+import org.jboss.test.messaging.core.SimpleReceiver;
+import org.jboss.test.messaging.core.plugin.postoffice.SimpleFilter;
+import org.jboss.test.messaging.core.plugin.postoffice.SimpleFilterFactory;
+import org.jboss.test.messaging.core.plugin.postoffice.SimplePostOfficeTest;
+import org.jboss.test.messaging.util.CoreMessageFactory;
+
+import EDU.oswego.cs.dl.util.concurrent.QueuedExecutor;
+
+public class FavourLocalRouterTest extends SimplePostOfficeTest
+{
+   // Constants -----------------------------------------------------
+
+   // Static --------------------------------------------------------
+   
+   // Attributes ----------------------------------------------------
+   
+   // Constructors --------------------------------------------------
+
+   public FavourLocalRouterTest(String name)
+   {
+      super(name);
+   }
+
+   // Public --------------------------------------------------------
+
+   public void setUp() throws Exception
+   {
+      super.setUp();     
+            
+   }
+
+   public void tearDown() throws Exception
+   {           
+      super.tearDown();
+   }
+   
+   public void testNoLocalQueue() throws Throwable
+   {
+      ClusteredPostOffice office1 = null;
+      
+      ClusteredPostOffice office2 = null;
+          
+      try
+      {   
+         office1 = createClusteredPostOffice("node1", "testgroup");
+         
+         office2 = createClusteredPostOffice("node2", "testgroup");
+         
+         office1 = createClusteredPostOffice("node1", "testgroup");
+         
+         office2 = createClusteredPostOffice("node2", "testgroup");
+         
+         LocalClusteredQueue queue1 = new LocalClusteredQueue("node1", "queue1", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null);
+         
+         Binding binding1 = office1.bindClusteredQueue("queue1", queue1);
+         
+         LocalClusteredQueue queue2 = new LocalClusteredQueue("node2", "queue1", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null);
+         
+         Binding binding2 = office2.bindClusteredQueue("queue1", queue1);
+      
+         final int NUM_MESSAGES = 10;
+         for (int i = 0; i < NUM_MESSAGES; i++)
+         {
+            Message msg = CoreMessageFactory.createCoreMessage(i, false, null);      
+            MessageReference ref = ms.reference(msg);         
+            boolean routed = office1.route(ref, "queue1", null);
+         }
+         
+         //We have a favour local routing policy so all messages should be in queue1
+         List msgs = queue1.browse();
+         assertEquals(NUM_MESSAGES, msgs.size());
+         
+         msgs = queue2.browse();
+         assertEquals(0, msgs.size());
+         
+         office1.unbindClusteredQueue("queue1");
+         
+         //Send some more messages
+         
+         for (int i = 0; i < NUM_MESSAGES; i++)
+         {
+            Message msg = CoreMessageFactory.createCoreMessage(i + 10, false, null);      
+            MessageReference ref = ms.reference(msg);         
+            boolean routed = office1.route(ref, "queue1", null);
+         }
+         
+         //There is no queue1 on node1 any more so the messages should be on node2
+         
+         msgs = queue2.browse();
+         assertEquals(NUM_MESSAGES, msgs.size());
+                           
+      }
+      finally
+      {
+         if (office1 != null)
+         {            
+            office1.stop();
+         }
+         
+         if (office2 != null)
+         {
+            office2.stop();
+         }
+      }
+   }
+   
+   
+   
+   protected ClusteredPostOffice createClusteredPostOffice(String nodeId, String groupName) throws Exception
+   {
+      RedistributionPolicy redistPolicy = new NullRedistributionPolicy();
+      
+      FilterFactory ff = new SimpleFilterFactory();
+      
+      RouterFactory rf = new FavourLocalRouterFactory();
+      
+      ClusteredPostOfficeImpl postOffice = 
+         new ClusteredPostOfficeImpl(sc.getDataSource(), sc.getTransactionManager(),
+                                 null, true, nodeId, "Clustered", ms, pm, tr, ff, pool,
+                                 groupName,
+                                 JGroupsUtil.getControlStackProperties(50, 1),
+                                 JGroupsUtil.getDataStackProperties(50, 1),
+                                 5000, 5000, redistPolicy, 1000, rf);
+      
+      postOffice.start();      
+      
+      return postOffice;
+   }
+
+   // Private -------------------------------------------------------
+
+   // Inner classes -------------------------------------------------
+   
+
+}
+
+
+

Added: trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/NullRedistributionPolicy.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/NullRedistributionPolicy.java	2006-09-17 15:47:58 UTC (rev 1297)
+++ trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/NullRedistributionPolicy.java	2006-09-17 17:58:08 UTC (rev 1298)
@@ -0,0 +1,46 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.test.messaging.core.plugin.postoffice.cluster;
+
+import java.util.List;
+
+import org.jboss.messaging.core.plugin.postoffice.cluster.RedistributionOrder;
+import org.jboss.messaging.core.plugin.postoffice.cluster.RedistributionPolicy;
+
+/**
+ * A NullRedistributionPolicy: a RedistributionPolicy whose calculate() always returns null.
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * @version <tt>$Revision: 1.1 $</tt>
+ *
+ * $Id$
+ *
+ */
+public class NullRedistributionPolicy implements RedistributionPolicy
+{
+
+   public RedistributionOrder calculate(List bindings)
+   {
+      return null;
+   }
+
+}




More information about the jboss-cvs-commits mailing list