[Jboss-cvs] JBoss Messaging SVN: r1287 - in trunk: src/etc/server/default/deploy src/etc/xmdesc src/main/org/jboss/messaging/core/plugin src/main/org/jboss/messaging/core/plugin/contract src/main/org/jboss/messaging/core/plugin/postoffice src/main/org/jboss/messaging/core/plugin/postoffice/cluster src/main/org/jboss/messaging/core/tx tests/src/org/jboss/test/messaging/core/plugin
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Wed Sep 13 14:23:44 EDT 2006
Author: timfox
Date: 2006-09-13 14:23:31 -0400 (Wed, 13 Sep 2006)
New Revision: 1287
Added:
trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/BalancedBindingImpl.java
trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/BasicRedistributionPolicy.java
trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/MessageRedistributor.java
trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/QueueStats.java
trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/QueueStatsRequest.java
Removed:
trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/MessageMover.java
Modified:
trunk/src/etc/server/default/deploy/hsqldb-persistence-service.xml
trunk/src/etc/xmdesc/ClusteredPostOffice-xmbean.xml
trunk/src/main/org/jboss/messaging/core/plugin/ClusteredPostOfficeService.java
trunk/src/main/org/jboss/messaging/core/plugin/contract/ClusteredPostOffice.java
trunk/src/main/org/jboss/messaging/core/plugin/postoffice/PostOfficeImpl.java
trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/BalancedBinding.java
trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/ClusteredPostOfficeImpl.java
trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/MoveMessagesCallback.java
trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/PostOfficeInternal.java
trunk/src/main/org/jboss/messaging/core/tx/TransactionRepository.java
trunk/tests/src/org/jboss/test/messaging/core/plugin/ClusteredPostOfficeTest.java
Log:
Clustering - message redistribution
Modified: trunk/src/etc/server/default/deploy/hsqldb-persistence-service.xml
===================================================================
--- trunk/src/etc/server/default/deploy/hsqldb-persistence-service.xml 2006-09-13 14:37:47 UTC (rev 1286)
+++ trunk/src/etc/server/default/deploy/hsqldb-persistence-service.xml 2006-09-13 18:23:31 UTC (rev 1287)
@@ -59,6 +59,7 @@
<attribute name="GroupName">cluster1</attribute>
<attribute name="StateTimeout">5000</attribute>
<attribute name="CastTimeout">5000</attribute>
+ <attribute name="RedistributionPeriod">5000</attribute>
<attribute name="SyncChannelConfig">
<UDP mcast_addr="228.8.8.8" mcast_port="45568"
ip_ttl="8" ip_mcast="true"
Modified: trunk/src/etc/xmdesc/ClusteredPostOffice-xmbean.xml
===================================================================
--- trunk/src/etc/xmdesc/ClusteredPostOffice-xmbean.xml 2006-09-13 14:37:47 UTC (rev 1286)
+++ trunk/src/etc/xmdesc/ClusteredPostOffice-xmbean.xml 2006-09-13 18:23:31 UTC (rev 1287)
@@ -55,10 +55,16 @@
<attribute access="read-write" getMethod="getCastTimeout" setMethod="setCastTimeout">
<description>Timeout for getState()</description>
- <name>Timeout when waiting for synchronous responses when sending synchronous requests</name>
+ <name>CastTimeout</name>
<type>long</type>
</attribute>
+ <attribute access="read-write" getMethod="getRedistributionPeriod" setMethod="setRedistributionPeriod">
+ <description>The period between which successive message redistribution calculations will be performed</description>
+ <name>RedistributionPeriod</name>
+ <type>long</type>
+ </attribute>
+
<attribute access="read-write" getMethod="getSyncChannelConfig" setMethod="setSyncChannelConfig">
<description>The JGroups stack configuration for the synchronous channel</description>
<name>SyncChannelConfig</name>
Modified: trunk/src/main/org/jboss/messaging/core/plugin/ClusteredPostOfficeService.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/ClusteredPostOfficeService.java 2006-09-13 14:37:47 UTC (rev 1286)
+++ trunk/src/main/org/jboss/messaging/core/plugin/ClusteredPostOfficeService.java 2006-09-13 18:23:31 UTC (rev 1287)
@@ -29,8 +29,10 @@
import org.jboss.messaging.core.plugin.contract.MessageStore;
import org.jboss.messaging.core.plugin.contract.MessagingComponent;
import org.jboss.messaging.core.plugin.contract.PersistenceManager;
+import org.jboss.messaging.core.plugin.postoffice.cluster.BasicRedistributionPolicy;
import org.jboss.messaging.core.plugin.postoffice.cluster.ClusteredPostOfficeImpl;
import org.jboss.messaging.core.plugin.postoffice.cluster.FavourLocalRoutingPolicy;
+import org.jboss.messaging.core.plugin.postoffice.cluster.RedistributionPolicy;
import org.jboss.messaging.core.plugin.postoffice.cluster.RoutingPolicy;
import org.jboss.messaging.core.tx.TransactionRepository;
import org.w3c.dom.Element;
@@ -64,6 +66,8 @@
private long castTimeout = 5000;
+ private long redistPeriod = 5000;
+
private String groupName;
// Constructors --------------------------------------------------------
@@ -151,6 +155,16 @@
return castTimeout;
}
+ public void setRedistributionPeriod(long period)
+ {
+ this.redistPeriod = period;
+ }
+
+ public long getRedistributionPeriod()
+ {
+ return redistPeriod;
+ }
+
public void setGroupName(String groupName)
{
this.groupName = groupName;
@@ -175,8 +189,7 @@
try
{
TransactionManager tm = getTransactionManagerReference();
-
-
+
ServerPeer serverPeer = (ServerPeer)server.getAttribute(serverPeerObjectName, "Instance");
MessageStore ms = serverPeer.getMessageStore();
@@ -187,14 +200,16 @@
String nodeId = serverPeer.getServerPeerID();
- RoutingPolicy policy = new FavourLocalRoutingPolicy(nodeId);
+ RoutingPolicy routingPolicy = new FavourLocalRoutingPolicy(nodeId);
+
+ RedistributionPolicy redistPolicy = new BasicRedistributionPolicy(nodeId);
postOffice = new ClusteredPostOfficeImpl(ds, tm, sqlProperties, createTablesOnStartup,
nodeId, officeName, ms,
groupName,
syncChannelConfig, asyncChannelConfig,
tr, pm, stateTimeout, castTimeout,
- policy);
+ routingPolicy, redistPolicy, redistPeriod);
postOffice.start();
Modified: trunk/src/main/org/jboss/messaging/core/plugin/contract/ClusteredPostOffice.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/contract/ClusteredPostOffice.java 2006-09-13 14:37:47 UTC (rev 1286)
+++ trunk/src/main/org/jboss/messaging/core/plugin/contract/ClusteredPostOffice.java 2006-09-13 18:23:31 UTC (rev 1287)
@@ -54,4 +54,5 @@
* @throws Throwable
*/
Binding unbindClusteredQueue(String queueName) throws Throwable;
+
}
Modified: trunk/src/main/org/jboss/messaging/core/plugin/postoffice/PostOfficeImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/postoffice/PostOfficeImpl.java 2006-09-13 14:37:47 UTC (rev 1286)
+++ trunk/src/main/org/jboss/messaging/core/plugin/postoffice/PostOfficeImpl.java 2006-09-13 18:23:31 UTC (rev 1287)
@@ -157,7 +157,7 @@
String filter = queue.getFilter() == null ? null : queue.getFilter().getFilterString();
- binding = new BindingImpl(nodeId, queueName, condition, filter,
+ binding = createBinding(nodeId, queueName, condition, filter,
queue.getChannelID(), durable);
binding.setQueue(queue);
@@ -178,7 +178,7 @@
{
lock.writeLock().release();
}
- }
+ }
public Binding unbindQueue(String queueName) throws Throwable
{
@@ -220,15 +220,24 @@
throw new IllegalArgumentException("Condition is null");
}
- lock.writeLock().acquire();
+ lock.readLock().acquire();
try
{
- return listMatchingBindings(condition);
+ List list = (List)conditionMap.get(condition);
+
+ if (list == null)
+ {
+ return Collections.EMPTY_LIST;
+ }
+ else
+ {
+ return list;
+ }
}
finally
{
- lock.writeLock().release();
+ lock.readLock().release();
}
}
@@ -239,7 +248,7 @@
throw new IllegalArgumentException("Queue name is null");
}
- lock.writeLock().acquire();
+ lock.readLock().acquire();
try
{
@@ -256,7 +265,7 @@
}
finally
{
- lock.writeLock().release();
+ lock.readLock().release();
}
}
@@ -345,6 +354,13 @@
// Protected -----------------------------------------------------
+ protected Binding createBinding(String nodeId, String queueName, String condition, String filter,
+ long channelId, boolean durable)
+ {
+ return new BindingImpl(nodeId, queueName, condition, filter,
+ channelId, durable);
+ }
+
protected void loadBindings() throws Exception
{
lock.writeLock().acquire();
@@ -476,7 +492,7 @@
//We don't load the actual queue - this is because we don't know the paging params until
//activation time
- Binding binding = new BindingImpl(nodeId, queueName, condition, selector, channelId, true);
+ Binding binding = createBinding(nodeId, queueName, condition, selector, channelId, true);
list.add(binding);
}
@@ -606,26 +622,6 @@
}
// Private -------------------------------------------------------
-
- /*
- * List all bindings whose condition matches the wildcard
- * Initially we just do an exact match - when we support topic hierarchies this
- * will change
- */
- private List listMatchingBindings(String wildcard)
- {
- List list = (List)conditionMap.get(wildcard);
-
- if (list == null)
- {
- return Collections.EMPTY_LIST;
- }
- else
- {
- return list;
- }
- }
-
-
+
// Inner classes -------------------------------------------------
}
Modified: trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/BalancedBinding.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/BalancedBinding.java 2006-09-13 14:37:47 UTC (rev 1286)
+++ trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/BalancedBinding.java 2006-09-13 18:23:31 UTC (rev 1287)
@@ -37,4 +37,8 @@
double getConsumptionRate();
int getMessageCount();
+
+ void setConsumptionRate(double rate);
+
+ void setMessageCount(int count);
}
Added: trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/BalancedBindingImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/BalancedBindingImpl.java 2006-09-13 14:37:47 UTC (rev 1286)
+++ trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/BalancedBindingImpl.java 2006-09-13 18:23:31 UTC (rev 1287)
@@ -0,0 +1,72 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.messaging.core.plugin.postoffice.cluster;
+
+import org.jboss.messaging.core.plugin.postoffice.BindingImpl;
+
+/**
+ * A BalancedBindingImpl
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * @version <tt>$Revision: 1.1 $</tt>
+ *
+ * $Id$
+ *
+ */
+public class BalancedBindingImpl extends BindingImpl implements BalancedBinding
+{
+ private double consumptionRate;
+
+ private int messageCount;
+
+ public BalancedBindingImpl()
+ {
+ }
+
+ public BalancedBindingImpl(String nodeId, String queueName, String condition, String selector, long channelId, boolean durable)
+ {
+ super(nodeId, queueName, condition, selector, channelId, durable);
+ }
+
+ public double getConsumptionRate()
+ {
+ return consumptionRate;
+ }
+
+ public int getMessageCount()
+ {
+ return messageCount;
+ }
+
+ public void setConsumptionRate(double consumptionRate)
+ {
+ this.consumptionRate = consumptionRate;
+ }
+
+ public void setMessageCount(int messageCount)
+ {
+ this.messageCount = messageCount;
+ }
+
+
+
+}
Added: trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/BasicRedistributionPolicy.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/BasicRedistributionPolicy.java 2006-09-13 14:37:47 UTC (rev 1286)
+++ trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/BasicRedistributionPolicy.java 2006-09-13 18:23:31 UTC (rev 1287)
@@ -0,0 +1,111 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.messaging.core.plugin.postoffice.cluster;
+
+import java.util.Iterator;
+import java.util.List;
+
+/**
+ * A BasicRedistributionPolicy
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * @version <tt>$Revision: 1.1 $</tt>
+ *
+ * $Id$
+ *
+ */
+public class BasicRedistributionPolicy implements RedistributionPolicy
+{
+ private String localNodeId;
+
+ private int MAX_MESSAGES_TO_MOVE = 100;
+
+ public BasicRedistributionPolicy(String localNodeId)
+ {
+ this.localNodeId = localNodeId;
+ }
+
+ public RedistributionOrder calculate(List bindings)
+ {
+ Iterator iter = bindings.iterator();
+
+ BalancedBinding localBinding = null;
+
+ while (iter.hasNext())
+ {
+ BalancedBinding binding = (BalancedBinding)iter.next();
+
+ if (binding.getNodeId().equals(localNodeId))
+ {
+ localBinding = binding;
+
+ break;
+ }
+ }
+
+ if (localBinding == null)
+ {
+ return null;
+ }
+
+ if (localBinding.getConsumptionRate() == 0 && localBinding.getMessageCount() > 0)
+ {
+ //No consumers on the queue - the messages are stranded
+ //We should consider moving them somewhere else
+
+ //We move messages to the node with the highest consumption rate
+
+ iter = bindings.iterator();
+
+ double maxRate = 0;
+
+ BalancedBinding maxRateBinding = null;
+
+ while (iter.hasNext())
+ {
+ BalancedBinding binding = (BalancedBinding)iter.next();
+
+ if (!binding.getNodeId().equals(localNodeId))
+ {
+ if (binding.getConsumptionRate() > maxRate)
+ {
+ maxRate = binding.getConsumptionRate();
+
+ maxRateBinding = binding;
+ }
+ }
+ }
+
+ if (maxRate > 0)
+ {
+ //Move messages to this node
+
+ //How many should we move?
+ int numberToMove = Math.min(MAX_MESSAGES_TO_MOVE, localBinding.getMessageCount());
+
+ return new RedistributionOrder(numberToMove, localBinding.getQueueName(), maxRateBinding.getNodeId());
+ }
+ }
+
+ return null;
+ }
+}
Modified: trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/ClusteredPostOfficeImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/ClusteredPostOfficeImpl.java 2006-09-13 14:37:47 UTC (rev 1286)
+++ trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/ClusteredPostOfficeImpl.java 2006-09-13 18:23:31 UTC (rev 1287)
@@ -42,7 +42,6 @@
import org.jboss.messaging.core.plugin.contract.PersistenceManager;
import org.jboss.messaging.core.plugin.postoffice.ConditionBindings;
import org.jboss.messaging.core.plugin.postoffice.PostOfficeImpl;
-import org.jboss.messaging.core.plugin.postoffice.BindingImpl;
import org.jboss.messaging.core.tx.Transaction;
import org.jboss.messaging.core.tx.TransactionRepository;
import org.jgroups.Address;
@@ -115,6 +114,12 @@
private long castTimeout;
private RoutingPolicy routingPolicy;
+
+ private RedistributionPolicy redistributionPolicy;
+
+ private MessageRedistributor redistributor;
+
+ private long redistributePeriod;
public ClusteredPostOfficeImpl()
{
@@ -140,10 +145,13 @@
TransactionRepository tr,
PersistenceManager pm,
long stateTimeout, long castTimeout,
- RoutingPolicy routingPolicy) throws Exception
+ RoutingPolicy routingPolicy,
+ RedistributionPolicy redistributionPolicy,
+ long redistributePeriod) throws Exception
{
this(ds, tm, sqlProperties, createTablesOnStartup, nodeId, officeName, ms,
- groupName, tr, pm, stateTimeout, castTimeout, routingPolicy);
+ groupName, tr, pm, stateTimeout, castTimeout, routingPolicy, redistributionPolicy,
+ redistributePeriod);
this.syncChannelConfigE = syncChannelConfig;
this.asyncChannelConfigE = asyncChannelConfig;
@@ -161,10 +169,12 @@
TransactionRepository tr,
PersistenceManager pm,
long stateTimeout, long castTimeout,
- RoutingPolicy routingPolicy) throws Exception
+ RoutingPolicy routingPolicy,
+ RedistributionPolicy redistributionPolicy,
+ long redistributePeriod) throws Exception
{
this(ds, tm, sqlProperties, createTablesOnStartup, nodeId, officeName, ms,
- groupName, tr, pm, stateTimeout, castTimeout, routingPolicy);
+ groupName, tr, pm, stateTimeout, castTimeout, routingPolicy, redistributionPolicy, redistributePeriod);
this.syncChannelConfigS = syncChannelConfig;
this.asyncChannelConfigS = asyncChannelConfig;
@@ -177,7 +187,9 @@
TransactionRepository tr,
PersistenceManager pm,
long stateTimeout, long castTimeout,
- RoutingPolicy routingPolicy)
+ RoutingPolicy routingPolicy,
+ RedistributionPolicy redistributionPolicy,
+ long redistributePeriod)
{
super (ds, tm, sqlProperties, createTablesOnStartup, nodeId, officeName, ms, tr);
@@ -191,9 +203,15 @@
this.routingPolicy = routingPolicy;
+ this.redistributionPolicy = redistributionPolicy;
+
+ this.redistributePeriod = redistributePeriod;
+
init();
}
+ // MessagingComponent overrides
+ // --------------------------------------------------------------
public void start() throws Exception
{
@@ -236,12 +254,18 @@
handleAddressNodeMapping(currentAddress, nodeId);
syncSendRequest(new SendNodeIdRequest(currentAddress, nodeId));
+
+ redistributor = new MessageRedistributor(this, redistributePeriod);
+
+ redistributor.start();
}
public void stop() throws Exception
{
super.stop();
+ redistributor.stop();
+
syncChannel.close();
asyncChannel.close();
@@ -463,7 +487,7 @@
* Called when another node adds a binding
*/
public void addBindingFromCluster(String nodeId, String queueName, String condition,
- String filterString, long channelID, boolean durable)
+ String filterString, long channelID, boolean durable)
throws Exception
{
lock.writeLock().acquire();
@@ -491,8 +515,8 @@
throw new IllegalArgumentException(this.nodeId + "Binding already exists for node Id " + nodeId + " queue name " + queueName);
}
- binding = new BindingImpl(nodeId, queueName, condition, filterString,
- channelID, durable);
+ binding = new BalancedBindingImpl(nodeId, queueName, condition, filterString,
+ channelID, durable);
binding.activate();
@@ -533,7 +557,7 @@
try
{
- nodeIdAddressMap.put(nodeId, address.toString());
+ nodeIdAddressMap.put(nodeId, address);
}
finally
{
@@ -653,6 +677,9 @@
}
}
+ /*
+ * Multicast a message to all members of the group
+ */
public void asyncSendRequest(ClusterRequest request) throws Exception
{
//TODO - handle serialization more efficiently
@@ -660,6 +687,19 @@
}
/*
+ * Unicast a message to one member of the group
+ */
+ public void asyncSendRequest(ClusterRequest request, String nodeId) throws Exception
+ {
+ Address address = (Address)nodeIdAddressMap.get(nodeId);
+
+ Message m = new Message(address, null, request);
+
+ //TODO - handle serialization more efficiently
+ asyncChannel.send(m);
+ }
+
+ /*
* We put the transaction in the holding area
*/
public void holdTransaction(TransactionId id, ClusterTransaction tx) throws Exception
@@ -742,53 +782,179 @@
}
}
- private boolean checkTransaction(List messageHolders) throws Exception
+ public void calculateRedistribution() throws Throwable
{
- Iterator iter = messageHolders.iterator();
+ lock.readLock().acquire();
- //We only need to check that one of the refs made it to the database - the refs would have
- //been inserted into the db transactionally, so either they're all there or none are
- MessageHolder holder = (MessageHolder)iter.next();
-
- List bindings = listBindingsForCondition(holder.getRoutingKey());
-
- if (bindings == null)
+ try
{
- throw new IllegalStateException("Cannot find bindings for key: " + holder.getRoutingKey());
+ Iterator iter = conditionMap.values().iterator();
+
+ while (iter.hasNext())
+ {
+ ConditionBindings cb = (ConditionBindings)iter.next();
+
+ Collection nameLists = cb.getBindingsByName();
+
+ Iterator iter2 = nameLists.iterator();
+
+ while (iter2.hasNext())
+ {
+ List bindings = (List)iter2.next();
+
+ if (bindings.size() > 1)
+ {
+ RedistributionOrder order = redistributionPolicy.calculate(bindings);
+
+ if (order != null)
+ {
+ moveMessages(order.getQueueName(), order.getDestinationNodeId(), order.getNumberOfMessages());
+ }
+ }
+ }
+ }
}
+ finally
+ {
+ lock.readLock().release();
+ }
+ }
+
+ public void sendStats() throws Exception
+ {
+ lock.writeLock().acquire();
- Iterator iter2 = bindings.iterator();
+ List stats = null;
- long channelID = -1;
- boolean found = false;
-
- while (iter2.hasNext())
+ try
{
- Binding binding = (Binding)iter2.next();
- if (binding.isDurable())
+ Map nameMap = (Map)nameMaps.get(nodeId);
+
+ Iterator iter = nameMap.values().iterator();
+
+ while (iter.hasNext())
{
- found = true;
+ BalancedBinding bb = (BalancedBinding)iter.next();
- channelID = binding.getChannelId();
+ MeasuredQueue q = (MeasuredQueue)bb.getQueue();
+
+ //We don't bother sending the stat if there is less than STATS_DIFFERENCE_MARGIN_PERCENT % difference
+
+ double newRate = q.getGrowthRate();
+
+ int newMessageCount = q.messageCount();
+
+ boolean sendStats = decideToSendStats(bb.getConsumptionRate(), newRate);
+
+ if (!sendStats)
+ {
+ sendStats = decideToSendStats(bb.getMessageCount(), newMessageCount);
+ }
+
+ if (sendStats)
+ {
+ bb.setConsumptionRate(newRate);
+ bb.setMessageCount(newMessageCount);
+
+ if (stats == null)
+ {
+ stats = new ArrayList();
+ }
+ QueueStats qs = new QueueStats(bb.getQueueName(), newRate, newMessageCount);
+
+ stats.add(qs);
+ }
}
}
-
- if (!found)
+ finally
{
- throw new IllegalStateException("Cannot find bindings");
+ lock.writeLock().release();
}
- if (pm.referenceExists(channelID, holder.getMessage().getMessageID()))
+ if (stats != null)
{
- return true;
+ ClusterRequest req = new QueueStatsRequest(nodeId, stats);
+
+ asyncSendRequest(req);
}
+ }
+
+ private boolean decideToSendStats(double oldValue, double newValue)
+ {
+ boolean sendStats = false;
+
+ if (oldValue != 0)
+ {
+ int percentChange = (int)(100 * (oldValue - newValue) / oldValue);
+
+ if (Math.abs(percentChange) >= STATS_DIFFERENCE_MARGIN_PERCENT)
+ {
+ sendStats = true;
+ }
+ }
else
{
- return false;
+ if (newValue != 0)
+ {
+ sendStats = true;
+ }
}
+ return sendStats;
}
-
+
+ private static final int STATS_DIFFERENCE_MARGIN_PERCENT = 10;
+
+ public void updateQueueStats(String nodeId, List stats) throws Exception
+ {
+ lock.writeLock().acquire();
+
+ Map nameMap = (Map)nameMaps.get(nodeId);
+
+ if (nameMap == null)
+ {
+ throw new IllegalStateException("Cannot find name map for node id " + nodeId);
+ }
+
+ try
+ {
+ Iterator iter = stats.iterator();
+
+ while (iter.hasNext())
+ {
+ QueueStats st = (QueueStats)iter.next();
+
+ BalancedBinding bb = (BalancedBinding)nameMap.get(st.getQueueName());
+
+ if (bb == null)
+ {
+ throw new IllegalStateException("Cannot find binding for queue name: " + st.getQueueName());
+ }
+
+ bb.setConsumptionRate(st.getConsumptionRate());
+
+ bb.setMessageCount(st.getMessageCount());
+ }
+ }
+ finally
+ {
+ lock.writeLock().release();
+ }
+ }
+
+
+
+ // Public ------------------------------------------------------------------------------------------
+
+ // Protected ---------------------------------------------------------------------------------------
+
+ protected Binding createBinding(String nodeId, String queueName, String condition, String filter,
+ long channelId, boolean durable)
+ {
+ return new BalancedBindingImpl(nodeId, queueName, condition, filter,
+ channelId, durable);
+ }
+
protected void loadBindings() throws Exception
{
// TODO I need to know whether this call times out - how do I know this??
@@ -814,13 +980,9 @@
}
}
- // Public ------------------------------------------------------------------------------------------
-
-
- // Protected ---------------------------------------------------------------------------------------
-
-
- protected void syncSendRequest(ClusterRequest request) throws Exception
+ // Private ------------------------------------------------------------------------------------------
+
+ private void syncSendRequest(ClusterRequest request) throws Exception
{
//TODO - handle serialization more efficiently
@@ -829,54 +991,55 @@
controlMessageDispatcher.castMessage(null, message, GroupRequest.GET_ALL, castTimeout);
}
- /*
- * We have received a reference cast from another node - and we need to route it to our local
- * subscriptions
- */
-// protected void routeFromCluster(MessageReference ref, String routingKey) throws Exception
-// {
-// lock.readLock().acquire();
-//
-// try
-// {
-// // We route on the condition
-// List bindings = (List)conditionMap.get(routingKey);
-//
-// if (bindings != null)
-// {
-// Iterator iter = bindings.iterator();
-//
-// while (iter.hasNext())
-// {
-// Binding binding = (Binding)iter.next();
-//
-// if (binding.isActive())
-// {
-// if (binding.getNodeId().equals(this.nodeId))
-// {
-// //It's a local binding so we pass the message on to the subscription
-// Queue subscription = binding.getQueue();
-//
-// //TODO instead of adding a new method on the channel
-// //we should set a header and use the same method
-// subscription.handleDontPersist(null, ref, null);
-// }
-// }
-// }
-// }
-// }
-// finally
-// {
-// lock.readLock().release();
-// }
-// }
-//
-
-
- // Private ------------------------------------------------------------------------------------------
-
- private void removeBindingsForAddress(String address) throws Exception
+ private boolean checkTransaction(List messageHolders) throws Exception
{
+ Iterator iter = messageHolders.iterator();
+
+ //We only need to check that one of the refs made it to the database - the refs would have
+ //been inserted into the db transactionally, so either they're all there or none are
+ MessageHolder holder = (MessageHolder)iter.next();
+
+ List bindings = listBindingsForCondition(holder.getRoutingKey());
+
+ if (bindings == null)
+ {
+ throw new IllegalStateException("Cannot find bindings for key: " + holder.getRoutingKey());
+ }
+
+ Iterator iter2 = bindings.iterator();
+
+ long channelID = -1;
+ boolean found = false;
+
+ while (iter2.hasNext())
+ {
+ Binding binding = (Binding)iter2.next();
+
+ if (binding.isDurable())
+ {
+ found = true;
+
+ channelID = binding.getChannelId();
+ }
+ }
+
+ if (!found)
+ {
+ throw new IllegalStateException("Cannot find bindings");
+ }
+
+ if (pm.referenceExists(channelID, holder.getMessage().getMessageID()))
+ {
+ return true;
+ }
+ else
+ {
+ return false;
+ }
+ }
+
+ private void removeBindingsForAddress(Address address) throws Exception
+ {
lock.writeLock().acquire();
try
@@ -888,9 +1051,9 @@
{
Map.Entry entry = (Map.Entry)iter.next();
- String str = (String)entry.getValue();
+ Address adr = (Address)entry.getValue();
- if (str.equals(address))
+ if (adr.equals(address))
{
nodeId = (String)entry.getKey();
}
@@ -989,6 +1152,41 @@
this.nodeIdAddressMap.putAll(state.getNodeIdAddressMap());
}
+ /*
+ * Move messages from queue on one node to queue on another node
+ */
+ private void moveMessages(String queueName, String toNodeId, int num) throws Throwable
+ {
+ Binding binding = getBindingForQueueName(queueName);
+
+ if (binding == null)
+ {
+ throw new IllegalStateException("Cannot find binding for queue name: " + queueName);
+ }
+
+ Queue fromQueue = binding.getQueue();
+
+ Transaction tx = tr.createTransaction();
+
+ List dels = ((MeasuredQueue)fromQueue).getDeliveries(num);
+
+ Iterator iter = dels.iterator();
+
+ MoveMessagesCallback cb = new MoveMessagesCallback(nodeId, toNodeId, queueName,
+ tx.getId(), this);
+ while (iter.hasNext())
+ {
+ Delivery del = (Delivery)iter.next();
+
+ del.acknowledge(tx);
+
+ cb.addMessage(del.getReference().getMessage());
+ }
+
+ tx.commit();
+
+ }
+
// Inner classes -------------------------------------------------------------------
/*
@@ -1100,7 +1298,7 @@
{
try
{
- removeBindingsForAddress(address.toString());
+ removeBindingsForAddress(address);
}
catch (Exception e)
{
Deleted: trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/MessageMover.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/MessageMover.java 2006-09-13 14:37:47 UTC (rev 1286)
+++ trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/MessageMover.java 2006-09-13 18:23:31 UTC (rev 1287)
@@ -1,102 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2005, JBoss Inc., and individual contributors as indicated
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-package org.jboss.messaging.core.plugin.postoffice.cluster;
-
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.List;
-
-import org.jboss.messaging.core.Delivery;
-import org.jboss.messaging.core.local.Queue;
-import org.jboss.messaging.core.plugin.contract.Binding;
-import org.jboss.messaging.core.tx.Transaction;
-import org.jboss.messaging.core.tx.TransactionRepository;
-
-/**
- * A MessageMover
- *
- * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
- * @version <tt>$Revision: 1.1 $</tt>
- *
- * $Id$
- *
- */
-public class MessageMover
-{
- private TransactionRepository tr;
-
- private String nodeId;
-
- private RedistributionPolicy redistributionPolicy;
-
- public void moveMessages(Binding from, Binding to, int num) throws Throwable
- {
- if (!from.getNodeId().equals(this.nodeId))
- {
- throw new IllegalArgumentException("From binding must be on local node!");
- }
-
- if (to.getNodeId().equals(this.nodeId))
- {
- throw new IllegalArgumentException("To binding cannot be on local node");
- }
-
- //Consume the messages in a transaction - don't commit
- Queue fromQueue = from.getQueue();
-
- Transaction tx = tr.createTransaction();
-
- List dels = ((MeasuredQueue)fromQueue).getDeliveries(num);
-
- Iterator iter = dels.iterator();
-
- while (iter.hasNext())
- {
- Delivery del = (Delivery)iter.next();
-
- del.acknowledge(tx);
- }
-
-
-
- }
-
- public void calculateMovements(Collection nameLists)
- {
- Iterator iter = nameLists.iterator();
-
- while (iter.hasNext())
- {
- List bindings = (List)iter.next();
-
- if (bindings.size() > 1)
- {
- RedistributionOrder order = redistributionPolicy.calculate(bindings);
-
- if (order != null)
- {
-
- }
- }
- }
- }
-}
Added: trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/MessageRedistributor.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/MessageRedistributor.java 2006-09-13 14:37:47 UTC (rev 1286)
+++ trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/MessageRedistributor.java 2006-09-13 18:23:31 UTC (rev 1287)
@@ -0,0 +1,100 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.messaging.core.plugin.postoffice.cluster;
+
+import java.util.Timer;
+import java.util.TimerTask;
+
+import org.jboss.logging.Logger;
+import org.jboss.messaging.core.plugin.contract.MessagingComponent;
+
+/**
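+ * A MessageRedistributor - on a timer, asks the post office to calculate message redistribution and then to send its queue statistics.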
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * @version <tt>$Revision: 1.1 $</tt>
+ *
+ * $Id$
+ *
+ */
+public class MessageRedistributor implements MessagingComponent
+{
+ private static final Logger log = Logger.getLogger(MessageRedistributor.class);
+
+ private PostOfficeInternal office;
+
+ private Timer timer;
+
+ private long period;
+
+ public MessageRedistributor(PostOfficeInternal office,
+ long period)
+ {
+ this.office = office;
+
+ this.period = period;
+ }
+
+
+ // MessagingComponent overrides
+ // ---------------------------------------------------
+
+ public void start() throws Exception
+ {
+ timer = new Timer(true);
+
+ //Add a random delay to start, to prevent multiple exchanges all calculating at the same time
+
+ long delay = (long)(Math.random() * period);
+
+ timer.schedule(new RedistributeTimerTask(), delay, period);
+ }
+
+ public void stop() throws Exception
+ {
+ timer.cancel();
+ }
+
+ class RedistributeTimerTask extends TimerTask
+ {
+ public void run()
+ {
+ try
+ {
+ office.calculateRedistribution();
+ }
+ catch (Throwable t)
+ {
+ log.error("Caught Throwable in calculating message redistribution", t);
+ }
+ try
+ {
+ office.sendStats();
+ }
+ catch (Exception e)
+ {
+ log.error("Caught Exception in calculating/sending queue statistics", e);
+ }
+ }
+
+ }
+}
Modified: trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/MoveMessagesCallback.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/MoveMessagesCallback.java 2006-09-13 14:37:47 UTC (rev 1286)
+++ trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/MoveMessagesCallback.java 2006-09-13 18:23:31 UTC (rev 1287)
@@ -40,15 +40,31 @@
{
private List messages;
+ private String destinationNodeId;
+
+ private String currentNodeId;
+
private String queueName;
- MoveMessagesCallback(String queueName)
+ private long txId;
+
+ private PostOfficeInternal office;
+
+ MoveMessagesCallback(String currentNodeId, String destNodeId, String queueName, long txId, PostOfficeInternal office)
{
+ this.currentNodeId = currentNodeId;
+
+ this.destinationNodeId = destNodeId;
+
this.queueName = queueName;
+ this.txId = txId;
+
+ this.office = office;
+
messages = new ArrayList();
}
-
+
void addMessage(Message msg)
{
messages.add(msg);
@@ -56,32 +72,38 @@
public void afterCommit(boolean onePhase) throws Exception
{
-
+ ClusterRequest req = new MoveTransactionRequest(currentNodeId, txId);
+
+ //We unicast the message to the node
+ office.asyncSendRequest(req, destinationNodeId);
}
public void afterPrepare() throws Exception
{
-
+ //NOOP
}
public void afterRollback(boolean onePhase) throws Exception
{
-
+ //NOOP
}
public void beforeCommit(boolean onePhase) throws Exception
{
-
+ ClusterRequest req = new MoveTransactionRequest(currentNodeId, txId, messages, queueName);
+
+ //We unicast
+ office.asyncSendRequest(req, destinationNodeId);
}
public void beforePrepare() throws Exception
{
-
+ //NOOP
}
public void beforeRollback(boolean onePhase) throws Exception
{
-
+ //NOOP
}
}
Modified: trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/PostOfficeInternal.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/PostOfficeInternal.java 2006-09-13 14:37:47 UTC (rev 1286)
+++ trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/PostOfficeInternal.java 2006-09-13 18:23:31 UTC (rev 1287)
@@ -25,6 +25,7 @@
import java.util.Map;
import org.jboss.messaging.core.Message;
+import org.jboss.messaging.core.plugin.contract.ClusteredPostOffice;
import org.jgroups.Address;
/**
@@ -40,7 +41,7 @@
* $Id$
*
*/
-interface PostOfficeInternal
+interface PostOfficeInternal extends ClusteredPostOffice
{
void addBindingFromCluster(String nodeId, String queueName, String condition,
String filterString, long channelId, boolean durable)
@@ -58,9 +59,17 @@
void asyncSendRequest(ClusterRequest request) throws Exception;
+ void asyncSendRequest(ClusterRequest request, String nodeId) throws Exception;
+
void holdTransaction(TransactionId id, ClusterTransaction tx) throws Exception;
void commitTransaction(TransactionId id) throws Exception;
void check(String nodeId) throws Exception;
+
+ void calculateRedistribution() throws Throwable;
+
+ void updateQueueStats(String nodeId, List stats) throws Exception;
+
+ void sendStats() throws Exception;
}
Added: trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/QueueStats.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/QueueStats.java 2006-09-13 14:37:47 UTC (rev 1286)
+++ trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/QueueStats.java 2006-09-13 18:23:31 UTC (rev 1287)
@@ -0,0 +1,66 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.messaging.core.plugin.postoffice.cluster;
+
+import java.io.Serializable;
+
+/**
+ * A QueueStats - serializable holder for a queue's name, consumption rate and message count.
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * @version <tt>$Revision: 1.1 $</tt>
+ *
+ * $Id$
+ *
+ */
+public class QueueStats implements Serializable
+{
+ private String queueName;
+
+ private double consumptionRate;
+
+ private int messageCount;
+
+ public QueueStats(String queueName, double consumptionRate, int messageCount)
+ {
+ this.queueName = queueName;
+
+ this.consumptionRate = consumptionRate;
+
+ this.messageCount = messageCount;
+ }
+
+ public double getConsumptionRate()
+ {
+ return consumptionRate;
+ }
+
+ public int getMessageCount()
+ {
+ return messageCount;
+ }
+
+ public String getQueueName()
+ {
+ return queueName;
+ }
+}
Added: trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/QueueStatsRequest.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/QueueStatsRequest.java 2006-09-13 14:37:47 UTC (rev 1286)
+++ trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/QueueStatsRequest.java 2006-09-13 18:23:31 UTC (rev 1287)
@@ -0,0 +1,52 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.messaging.core.plugin.postoffice.cluster;
+
+import java.util.List;
+
+/**
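+ * A QueueStatsRequest - a ClusterRequest that, when executed, passes a node's queue statistics to PostOfficeInternal.updateQueueStats.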
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * @version <tt>$Revision: 1.1 $</tt>
+ *
+ * $Id$
+ *
+ */
+public class QueueStatsRequest implements ClusterRequest
+{
+ private String nodeId;
+
+ private List queueStats;
+
+ public QueueStatsRequest(String nodeId, List stats)
+ {
+ this.nodeId = nodeId;
+
+ this.queueStats = stats;
+ }
+
+ public void execute(PostOfficeInternal office) throws Exception
+ {
+ office.updateQueueStats(nodeId, queueStats);
+ }
+}
Modified: trunk/src/main/org/jboss/messaging/core/tx/TransactionRepository.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/tx/TransactionRepository.java 2006-09-13 14:37:47 UTC (rev 1286)
+++ trunk/src/main/org/jboss/messaging/core/tx/TransactionRepository.java 2006-09-13 18:23:31 UTC (rev 1287)
@@ -54,11 +54,11 @@
private boolean trace = log.isTraceEnabled();
- protected Map globalToLocalMap;
+ private Map globalToLocalMap;
- protected PersistenceManager persistenceManager;
+ private PersistenceManager persistenceManager;
- protected IdManager idManager;
+ private IdManager idManager;
// Static --------------------------------------------------------
@@ -151,6 +151,7 @@
if (trace) { log.trace("created transaction " + tx); }
globalToLocalMap.put(xid, tx);
+
return tx;
}
@@ -163,6 +164,11 @@
return tx;
}
+ public boolean removeTransaction(Xid xid)
+ {
+ return globalToLocalMap.remove(xid) != null;
+ }
+
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
Modified: trunk/tests/src/org/jboss/test/messaging/core/plugin/ClusteredPostOfficeTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/core/plugin/ClusteredPostOfficeTest.java 2006-09-13 14:37:47 UTC (rev 1286)
+++ trunk/tests/src/org/jboss/test/messaging/core/plugin/ClusteredPostOfficeTest.java 2006-09-13 18:23:31 UTC (rev 1287)
@@ -29,7 +29,11 @@
import org.jboss.messaging.core.message.MessageFactory;
import org.jboss.messaging.core.plugin.contract.Binding;
import org.jboss.messaging.core.plugin.contract.ClusteredPostOffice;
+import org.jboss.messaging.core.plugin.postoffice.cluster.BasicRedistributionPolicy;
import org.jboss.messaging.core.plugin.postoffice.cluster.ClusteredPostOfficeImpl;
+import org.jboss.messaging.core.plugin.postoffice.cluster.FavourLocalRoutingPolicy;
+import org.jboss.messaging.core.plugin.postoffice.cluster.RedistributionPolicy;
+import org.jboss.messaging.core.plugin.postoffice.cluster.RoutingPolicy;
import org.jboss.messaging.core.tx.Transaction;
import org.jboss.test.messaging.core.SimpleReceiver;
@@ -1170,12 +1174,16 @@
protected ClusteredPostOffice createClusteredPostOffice(String nodeId, String groupName) throws Exception
{
+ RoutingPolicy routingPolicy = new FavourLocalRoutingPolicy(nodeId);
+
+ RedistributionPolicy redistPolicy = new BasicRedistributionPolicy(nodeId);
+
ClusteredPostOfficeImpl postOffice =
new ClusteredPostOfficeImpl(sc.getDataSource(), sc.getTransactionManager(),
null, true, nodeId, "Clustered", ms, groupName,
JGroupsUtil.getControlStackProperties(50, 1),
JGroupsUtil.getDataStackProperties(50, 1),
- tr, pm, 5000, 5000);
+ tr, pm, 5000, 5000, routingPolicy, redistPolicy, 5000);
postOffice.start();
More information about the jboss-cvs-commits
mailing list