[Jboss-cvs] JBoss Messaging SVN: r1298 - in trunk: src/main/org/jboss/messaging/core/plugin src/main/org/jboss/messaging/core/plugin/postoffice/cluster tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Sun Sep 17 13:58:17 EDT 2006
Author: timfox
Date: 2006-09-17 13:58:08 -0400 (Sun, 17 Sep 2006)
New Revision: 1298
Added:
trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/FavourLocalRouter.java
trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/FavourLocalRouterFactory.java
trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/RouterFactory.java
trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/FavourLocalRouterTest.java
trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/NullRedistributionPolicy.java
Removed:
trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/ClusterRouter.java
Modified:
trunk/src/main/org/jboss/messaging/core/plugin/ClusteredPostOfficeService.java
trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/ClusteredBindingsImpl.java
trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/ClusteredPostOfficeImpl.java
trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/ClusteredPostOfficeTest.java
Log:
More cluster routing stuff
Modified: trunk/src/main/org/jboss/messaging/core/plugin/ClusteredPostOfficeService.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/ClusteredPostOfficeService.java 2006-09-17 15:47:58 UTC (rev 1297)
+++ trunk/src/main/org/jboss/messaging/core/plugin/ClusteredPostOfficeService.java 2006-09-17 17:58:08 UTC (rev 1298)
@@ -34,7 +34,9 @@
import org.jboss.messaging.core.plugin.contract.PersistenceManager;
import org.jboss.messaging.core.plugin.postoffice.cluster.BasicRedistributionPolicy;
import org.jboss.messaging.core.plugin.postoffice.cluster.ClusteredPostOfficeImpl;
+import org.jboss.messaging.core.plugin.postoffice.cluster.FavourLocalRouterFactory;
import org.jboss.messaging.core.plugin.postoffice.cluster.RedistributionPolicy;
+import org.jboss.messaging.core.plugin.postoffice.cluster.RouterFactory;
import org.jboss.messaging.core.tx.TransactionRepository;
import org.w3c.dom.Element;
@@ -206,6 +208,8 @@
RedistributionPolicy redistPolicy = new BasicRedistributionPolicy(nodeId);
FilterFactory ff = new SelectorFactory();
+
+ RouterFactory rf = new FavourLocalRouterFactory();
postOffice = new ClusteredPostOfficeImpl(ds, tm, sqlProperties, createTablesOnStartup,
nodeId, officeName, ms,
@@ -213,7 +217,7 @@
groupName,
syncChannelConfig, asyncChannelConfig,
stateTimeout, castTimeout,
- redistPolicy, redistPeriod);
+ redistPolicy, redistPeriod, rf);
postOffice.start();
Deleted: trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/ClusterRouter.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/ClusterRouter.java 2006-09-17 15:47:58 UTC (rev 1297)
+++ trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/ClusterRouter.java 2006-09-17 17:58:08 UTC (rev 1298)
@@ -1,145 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2005, JBoss Inc., and individual contributors as indicated
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-package org.jboss.messaging.core.plugin.postoffice.cluster;
-
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-
-import org.jboss.messaging.core.Delivery;
-import org.jboss.messaging.core.DeliveryObserver;
-import org.jboss.messaging.core.MessageReference;
-import org.jboss.messaging.core.Receiver;
-import org.jboss.messaging.core.Router;
-import org.jboss.messaging.core.tx.Transaction;
-
-/**
- * A ClusterRouter
- *
- * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
- * @version <tt>$Revision: 1.1 $</tt>
- *
- * $Id$
- *
- */
-public class ClusterRouter implements Router
-{
- private List queues;
-
- private ClusteredQueue localQueue;
-
- public ClusterRouter()
- {
- queues = new ArrayList();
- }
-
- public boolean add(Receiver receiver)
- {
- ClusteredQueue queue = (ClusteredQueue)receiver;
-
- if (queue.isLocal())
- {
- if (localQueue != null)
- {
- throw new IllegalStateException("Already has local queue");
- }
- localQueue = queue;
- }
-
- queues.add(queue);
-
- return true;
- }
-
- public void clear()
- {
- queues.clear();
- localQueue = null;
- }
-
- public boolean contains(Receiver queue)
- {
- return queues.contains(queue);
- }
-
- public Iterator iterator()
- {
- return queues.iterator();
- }
-
- public boolean remove(Receiver queue)
- {
- if (queues.remove(queue))
- {
- if (localQueue == queue)
- {
- localQueue = null;
- }
- return true;
- }
- else
- {
- return false;
- }
- }
-
- public Delivery handle(DeliveryObserver observer, MessageReference reference, Transaction tx)
- {
- //Favour the local queue
-
- if (localQueue != null)
- {
- Delivery del = localQueue.handle(observer, reference, tx);
-
- if (del != null && del.isSelectorAccepted())
- {
- return del;
- }
- }
-
- //TODO make this round robin
-
- Iterator iter = queues.iterator();
-
- while (iter.hasNext())
- {
- ClusteredQueue queue = (ClusteredQueue)iter.next();
-
- if (!queue.isLocal())
- {
- Delivery del = queue.handle(observer, reference, tx);
-
- if (del != null && del.isSelectorAccepted())
- {
- return del;
- }
- }
- }
-
- return null;
- }
-
- public List getQueues()
- {
- return queues;
- }
-}
Modified: trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/ClusteredBindingsImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/ClusteredBindingsImpl.java 2006-09-17 15:47:58 UTC (rev 1297)
+++ trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/ClusteredBindingsImpl.java 2006-09-17 17:58:08 UTC (rev 1298)
@@ -25,6 +25,7 @@
import java.util.HashMap;
import java.util.Map;
+import org.jboss.messaging.core.Router;
import org.jboss.messaging.core.plugin.postoffice.Binding;
import org.jboss.messaging.core.plugin.postoffice.BindingsImpl;
@@ -48,24 +49,28 @@
private int localDurableCount;
- public ClusteredBindingsImpl(String thisNode)
+ private RouterFactory rf;
+
+ public ClusteredBindingsImpl(String thisNode, RouterFactory rf)
{
super();
nameMap = new HashMap();
this.thisNode = thisNode;
+
+ this.rf = rf;
}
public void addBinding(Binding binding)
{
super.addBinding(binding);
- ClusterRouter router = (ClusterRouter)nameMap.get(binding.getQueue().getName());
+ Router router = (Router)nameMap.get(binding.getQueue().getName());
if (router == null)
{
- router = new ClusterRouter();
+ router = rf.createRouter();
nameMap.put(binding.getQueue().getName(), router);
}
@@ -87,7 +92,7 @@
return false;
}
- ClusterRouter router = (ClusterRouter)nameMap.get(binding.getQueue().getName());
+ FavourLocalRouter router = (FavourLocalRouter)nameMap.get(binding.getQueue().getName());
if (router == null)
{
Modified: trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/ClusteredPostOfficeImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/ClusteredPostOfficeImpl.java 2006-09-17 15:47:58 UTC (rev 1297)
+++ trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/ClusteredPostOfficeImpl.java 2006-09-17 17:58:08 UTC (rev 1298)
@@ -129,6 +129,8 @@
private MessageRedistributor redistributor;
private long redistributePeriod;
+
+ private RouterFactory routerFactory;
public ClusteredPostOfficeImpl()
{
@@ -157,10 +159,11 @@
Element asyncChannelConfig,
long stateTimeout, long castTimeout,
RedistributionPolicy redistributionPolicy,
- long redistributePeriod) throws Exception
+ long redistributePeriod,
+ RouterFactory rf) throws Exception
{
this(ds, tm, sqlProperties, createTablesOnStartup, nodeId, officeName, ms,
- pm, tr, filterFactory, pool, groupName, stateTimeout, castTimeout, redistributionPolicy, redistributePeriod);
+ pm, tr, filterFactory, pool, groupName, stateTimeout, castTimeout, redistributionPolicy, redistributePeriod, rf);
this.syncChannelConfigE = syncChannelConfig;
this.asyncChannelConfigE = asyncChannelConfig;
@@ -181,10 +184,11 @@
String asyncChannelConfig,
long stateTimeout, long castTimeout,
RedistributionPolicy redistributionPolicy,
- long redistributePeriod) throws Exception
+ long redistributePeriod,
+ RouterFactory rf) throws Exception
{
this(ds, tm, sqlProperties, createTablesOnStartup, nodeId, officeName, ms,
- pm, tr, filterFactory, pool, groupName, stateTimeout, castTimeout, redistributionPolicy, redistributePeriod);
+ pm, tr, filterFactory, pool, groupName, stateTimeout, castTimeout, redistributionPolicy, redistributePeriod, rf);
this.syncChannelConfigS = syncChannelConfig;
this.asyncChannelConfigS = asyncChannelConfig;
@@ -200,7 +204,8 @@
String groupName,
long stateTimeout, long castTimeout,
RedistributionPolicy redistributionPolicy,
- long redistributePeriod)
+ long redistributePeriod,
+ RouterFactory rf)
{
super (ds, tm, sqlProperties, createTablesOnStartup, nodeId, officeName, ms, pm, tr, filterFactory,
pool);
@@ -217,6 +222,8 @@
this.redistributePeriod = redistributePeriod;
+ this.routerFactory = rf;
+
init();
}
@@ -776,7 +783,7 @@
while (iter2.hasNext())
{
- ClusterRouter router = (ClusterRouter)iter2.next();
+ FavourLocalRouter router = (FavourLocalRouter)iter2.next();
RedistributionOrder order = redistributionPolicy.calculate(router.getQueues());
@@ -922,7 +929,7 @@
protected Bindings createBindings()
{
- return new ClusteredBindingsImpl(this.nodeId);
+ return new ClusteredBindingsImpl(this.nodeId, this.routerFactory);
}
protected void loadBindings() throws Exception
Added: trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/FavourLocalRouter.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/FavourLocalRouter.java 2006-09-17 15:47:58 UTC (rev 1297)
+++ trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/FavourLocalRouter.java 2006-09-17 17:58:08 UTC (rev 1298)
@@ -0,0 +1,148 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.messaging.core.plugin.postoffice.cluster;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+import org.jboss.messaging.core.Delivery;
+import org.jboss.messaging.core.DeliveryObserver;
+import org.jboss.messaging.core.MessageReference;
+import org.jboss.messaging.core.Receiver;
+import org.jboss.messaging.core.Router;
+import org.jboss.messaging.core.tx.Transaction;
+
+/**
+ *
+ * A FavourLocalRouter
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * @version <tt>$Revision: 1.1 $</tt>
+ *
+ * $Id$
+ *
+ */
+public class FavourLocalRouter implements Router
+{
+ private List queues;
+
+ private ClusteredQueue localQueue;
+
+ public FavourLocalRouter()
+ {
+ queues = new ArrayList();
+ }
+
+ public boolean add(Receiver receiver)
+ {
+ ClusteredQueue queue = (ClusteredQueue)receiver;
+
+ if (queue.isLocal())
+ {
+ if (localQueue != null)
+ {
+ throw new IllegalStateException("Already has local queue");
+ }
+ localQueue = queue;
+ }
+
+ queues.add(queue);
+
+ return true;
+ }
+
+ public void clear()
+ {
+ queues.clear();
+ localQueue = null;
+ }
+
+ public boolean contains(Receiver queue)
+ {
+ return queues.contains(queue);
+ }
+
+ public Iterator iterator()
+ {
+ return queues.iterator();
+ }
+
+ public boolean remove(Receiver queue)
+ {
+ if (queues.remove(queue))
+ {
+ if (localQueue == queue)
+ {
+ localQueue = null;
+ }
+ return true;
+ }
+ else
+ {
+ return false;
+ }
+ }
+
+ public Delivery handle(DeliveryObserver observer, MessageReference reference, Transaction tx)
+ {
+ //Favour the local queue
+
+ if (localQueue != null)
+ {
+ //But only if it has consumers
+
+ Delivery del = localQueue.handle(observer, reference, tx);
+
+ if (del != null && del.isSelectorAccepted())
+ {
+ return del;
+ }
+ }
+
+ //TODO make this round robin
+
+ Iterator iter = queues.iterator();
+
+ while (iter.hasNext())
+ {
+ ClusteredQueue queue = (ClusteredQueue)iter.next();
+
+ if (!queue.isLocal())
+ {
+ Delivery del = queue.handle(observer, reference, tx);
+
+ if (del != null && del.isSelectorAccepted())
+ {
+ return del;
+ }
+ }
+ }
+
+ return null;
+ }
+
+ public List getQueues()
+ {
+ return queues;
+ }
+}
Added: trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/FavourLocalRouterFactory.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/FavourLocalRouterFactory.java 2006-09-17 15:47:58 UTC (rev 1297)
+++ trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/FavourLocalRouterFactory.java 2006-09-17 17:58:08 UTC (rev 1298)
@@ -0,0 +1,43 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.messaging.core.plugin.postoffice.cluster;
+
+import org.jboss.messaging.core.Router;
+
+/**
+ * A FavourLocalRouterFactory
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * @version <tt>$Revision: 1.1 $</tt>
+ *
+ * $Id$
+ *
+ */
+public class FavourLocalRouterFactory implements RouterFactory
+{
+
+ public Router createRouter()
+ {
+ return new FavourLocalRouter();
+ }
+
+}
Added: trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/RouterFactory.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/RouterFactory.java 2006-09-17 15:47:58 UTC (rev 1297)
+++ trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/RouterFactory.java 2006-09-17 17:58:08 UTC (rev 1298)
@@ -0,0 +1,38 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.messaging.core.plugin.postoffice.cluster;
+
+import org.jboss.messaging.core.Router;
+
+/**
+ * A RouterFactory
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * @version <tt>$Revision: 1.1 $</tt>
+ *
+ * $Id$
+ *
+ */
+public interface RouterFactory
+{
+ Router createRouter();
+}
Modified: trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/ClusteredPostOfficeTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/ClusteredPostOfficeTest.java 2006-09-17 15:47:58 UTC (rev 1297)
+++ trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/ClusteredPostOfficeTest.java 2006-09-17 17:58:08 UTC (rev 1298)
@@ -33,8 +33,10 @@
import org.jboss.messaging.core.plugin.postoffice.Binding;
import org.jboss.messaging.core.plugin.postoffice.cluster.BasicRedistributionPolicy;
import org.jboss.messaging.core.plugin.postoffice.cluster.ClusteredPostOfficeImpl;
+import org.jboss.messaging.core.plugin.postoffice.cluster.FavourLocalRouterFactory;
import org.jboss.messaging.core.plugin.postoffice.cluster.LocalClusteredQueue;
import org.jboss.messaging.core.plugin.postoffice.cluster.RedistributionPolicy;
+import org.jboss.messaging.core.plugin.postoffice.cluster.RouterFactory;
import org.jboss.messaging.core.tx.Transaction;
import org.jboss.test.messaging.core.SimpleReceiver;
import org.jboss.test.messaging.core.plugin.postoffice.SimpleFilter;
@@ -450,54 +452,6 @@
this.clusteredRouteWithFilter(true);
}
- public final void testRedistribute() throws Exception
- {
- ClusteredPostOffice office1 = null;
-
- ClusteredPostOffice office2 = null;
-
- try
- {
- office1 = createClusteredPostOffice("node1", "testgroup");
- office2 = createClusteredPostOffice("node2", "testgroup");
-
- LocalClusteredQueue queue1 = new LocalClusteredQueue("node1", "queue1", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null);
-
- Binding binding1 = office1.bindClusteredQueue("queue1", queue1);
-
- LocalClusteredQueue queue2 = new LocalClusteredQueue("node2", "queue1", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null);
-
- Binding binding2 = office2.bindClusteredQueue("queue1", queue1);
-
- final int NUM_MESSAGES = 10;
- for (int i = 0; i < NUM_MESSAGES; i++)
- {
- Message msg = CoreMessageFactory.createCoreMessage(i, false, null);
- MessageReference ref = ms.reference(msg);
- boolean routed = office1.route(ref, "queue1", null);
- }
-
- List msgs = queue1.browse();
- assertEquals(NUM_MESSAGES, msgs.size());
- msgs = queue2.browse();
- assertEquals(0, msgs.size());
-
-
-
- }
- finally
- {
- if (office1 != null)
- {
- office1.stop();
- }
-
- if (office2 != null)
- {
- office2.stop();
- }
- }
- }
/*
* We should allow the clustered bind of queues with the same queue name on different nodes of the
@@ -609,6 +563,8 @@
}
}
+
+
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
@@ -1474,17 +1430,19 @@
protected ClusteredPostOffice createClusteredPostOffice(String nodeId, String groupName) throws Exception
{
- RedistributionPolicy redistPolicy = new BasicRedistributionPolicy(nodeId);
+ RedistributionPolicy redistPolicy = new NullRedistributionPolicy();
FilterFactory ff = new SimpleFilterFactory();
+ RouterFactory rf = new FavourLocalRouterFactory();
+
ClusteredPostOfficeImpl postOffice =
new ClusteredPostOfficeImpl(sc.getDataSource(), sc.getTransactionManager(),
null, true, nodeId, "Clustered", ms, pm, tr, ff, pool,
groupName,
JGroupsUtil.getControlStackProperties(50, 1),
JGroupsUtil.getDataStackProperties(50, 1),
- 5000, 5000, redistPolicy, 1000);
+ 5000, 5000, redistPolicy, 1000, rf);
postOffice.start();
Added: trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/FavourLocalRouterTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/FavourLocalRouterTest.java 2006-09-17 15:47:58 UTC (rev 1297)
+++ trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/FavourLocalRouterTest.java 2006-09-17 17:58:08 UTC (rev 1298)
@@ -0,0 +1,178 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.test.messaging.core.plugin.postoffice.cluster;
+
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+
+import org.jboss.messaging.core.FilterFactory;
+import org.jboss.messaging.core.Message;
+import org.jboss.messaging.core.MessageReference;
+import org.jboss.messaging.core.local.PagingFilteredQueue;
+import org.jboss.messaging.core.plugin.contract.ClusteredPostOffice;
+import org.jboss.messaging.core.plugin.postoffice.Binding;
+import org.jboss.messaging.core.plugin.postoffice.cluster.BasicRedistributionPolicy;
+import org.jboss.messaging.core.plugin.postoffice.cluster.ClusteredPostOfficeImpl;
+import org.jboss.messaging.core.plugin.postoffice.cluster.FavourLocalRouterFactory;
+import org.jboss.messaging.core.plugin.postoffice.cluster.LocalClusteredQueue;
+import org.jboss.messaging.core.plugin.postoffice.cluster.RedistributionPolicy;
+import org.jboss.messaging.core.plugin.postoffice.cluster.RouterFactory;
+import org.jboss.messaging.core.tx.Transaction;
+import org.jboss.test.messaging.core.SimpleReceiver;
+import org.jboss.test.messaging.core.plugin.postoffice.SimpleFilter;
+import org.jboss.test.messaging.core.plugin.postoffice.SimpleFilterFactory;
+import org.jboss.test.messaging.core.plugin.postoffice.SimplePostOfficeTest;
+import org.jboss.test.messaging.util.CoreMessageFactory;
+
+import EDU.oswego.cs.dl.util.concurrent.QueuedExecutor;
+
+public class FavourLocalRouterTest extends SimplePostOfficeTest
+{
+ // Constants -----------------------------------------------------
+
+ // Static --------------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ public FavourLocalRouterTest(String name)
+ {
+ super(name);
+ }
+
+ // Public --------------------------------------------------------
+
+ public void setUp() throws Exception
+ {
+ super.setUp();
+
+ }
+
+ public void tearDown() throws Exception
+ {
+ super.tearDown();
+ }
+
+ public void testNoLocalQueue() throws Throwable
+ {
+ ClusteredPostOffice office1 = null;
+
+ ClusteredPostOffice office2 = null;
+
+ try
+ {
+ office1 = createClusteredPostOffice("node1", "testgroup");
+
+ office2 = createClusteredPostOffice("node2", "testgroup");
+
+ office1 = createClusteredPostOffice("node1", "testgroup");
+
+ office2 = createClusteredPostOffice("node2", "testgroup");
+
+ LocalClusteredQueue queue1 = new LocalClusteredQueue("node1", "queue1", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null);
+
+ Binding binding1 = office1.bindClusteredQueue("queue1", queue1);
+
+ LocalClusteredQueue queue2 = new LocalClusteredQueue("node2", "queue1", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null);
+
+ Binding binding2 = office2.bindClusteredQueue("queue1", queue1);
+
+ final int NUM_MESSAGES = 10;
+ for (int i = 0; i < NUM_MESSAGES; i++)
+ {
+ Message msg = CoreMessageFactory.createCoreMessage(i, false, null);
+ MessageReference ref = ms.reference(msg);
+ boolean routed = office1.route(ref, "queue1", null);
+ }
+
+ //We have a favour local routing policy so all messages should be in queue1
+ List msgs = queue1.browse();
+ assertEquals(NUM_MESSAGES, msgs.size());
+
+ msgs = queue2.browse();
+ assertEquals(0, msgs.size());
+
+ office1.unbindClusteredQueue("queue1");
+
+ //Send some more messages
+
+ for (int i = 0; i < NUM_MESSAGES; i++)
+ {
+ Message msg = CoreMessageFactory.createCoreMessage(i + 10, false, null);
+ MessageReference ref = ms.reference(msg);
+ boolean routed = office1.route(ref, "queue1", null);
+ }
+
+ //There is no queue1 on node1 any more so the messages should be on node2
+
+ msgs = queue2.browse();
+ assertEquals(NUM_MESSAGES, msgs.size());
+
+ }
+ finally
+ {
+ if (office1 != null)
+ {
+ office1.stop();
+ }
+
+ if (office2 != null)
+ {
+ office2.stop();
+ }
+ }
+ }
+
+
+
+ protected ClusteredPostOffice createClusteredPostOffice(String nodeId, String groupName) throws Exception
+ {
+ RedistributionPolicy redistPolicy = new NullRedistributionPolicy();
+
+ FilterFactory ff = new SimpleFilterFactory();
+
+ RouterFactory rf = new FavourLocalRouterFactory();
+
+ ClusteredPostOfficeImpl postOffice =
+ new ClusteredPostOfficeImpl(sc.getDataSource(), sc.getTransactionManager(),
+ null, true, nodeId, "Clustered", ms, pm, tr, ff, pool,
+ groupName,
+ JGroupsUtil.getControlStackProperties(50, 1),
+ JGroupsUtil.getDataStackProperties(50, 1),
+ 5000, 5000, redistPolicy, 1000, rf);
+
+ postOffice.start();
+
+ return postOffice;
+ }
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+
+
+}
+
+
+
Added: trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/NullRedistributionPolicy.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/NullRedistributionPolicy.java 2006-09-17 15:47:58 UTC (rev 1297)
+++ trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/NullRedistributionPolicy.java 2006-09-17 17:58:08 UTC (rev 1298)
@@ -0,0 +1,46 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.test.messaging.core.plugin.postoffice.cluster;
+
+import java.util.List;
+
+import org.jboss.messaging.core.plugin.postoffice.cluster.RedistributionOrder;
+import org.jboss.messaging.core.plugin.postoffice.cluster.RedistributionPolicy;
+
+/**
+ * A NullRedistrubtionPolicy
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * @version <tt>$Revision: 1.1 $</tt>
+ *
+ * $Id$
+ *
+ */
+public class NullRedistributionPolicy implements RedistributionPolicy
+{
+
+ public RedistributionOrder calculate(List bindings)
+ {
+ return null;
+ }
+
+}
More information about the jboss-cvs-commits
mailing list