[Jboss-cvs] JBoss Messaging SVN: r1279 - in trunk: src/main/org/jboss/messaging/core src/main/org/jboss/messaging/core/plugin src/main/org/jboss/messaging/core/plugin/contract src/main/org/jboss/messaging/core/plugin/postoffice src/main/org/jboss/messaging/core/plugin/postoffice/cluster src/main/org/jboss/messaging/util tests/src/org/jboss/test/messaging/core/plugin
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Tue Sep 12 14:14:04 EDT 2006
Author: timfox
Date: 2006-09-12 14:13:47 -0400 (Tue, 12 Sep 2006)
New Revision: 1279
Added:
trunk/src/main/org/jboss/messaging/core/plugin/postoffice/ConditionBindings.java
trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/BalancedBinding.java
trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/ClusterTransaction.java
trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/FavourLocalRoutingPolicy.java
trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/MeasuredQueue.java
trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/MessageMover.java
trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/MoveMessagesCallback.java
trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/MoveTransactionRequest.java
trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/RedistributionOrder.java
trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/RedistributionPolicy.java
trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/RoutingPolicy.java
trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/SendTransactionRequest.java
Removed:
trunk/src/main/org/jboss/messaging/core/plugin/contract/ib/
Modified:
trunk/src/main/org/jboss/messaging/core/ChannelSupport.java
trunk/src/main/org/jboss/messaging/core/PagingChannel.java
trunk/src/main/org/jboss/messaging/core/plugin/ClusteredPostOfficeService.java
trunk/src/main/org/jboss/messaging/core/plugin/IdBlock.java
trunk/src/main/org/jboss/messaging/core/plugin/SimplePostOfficeService.java
trunk/src/main/org/jboss/messaging/core/plugin/postoffice/PostOfficeImpl.java
trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/CastMessagesCallback.java
trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/ClusteredPostOfficeImpl.java
trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/MessageHolder.java
trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/MessageRequest.java
trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/MessagesRequest.java
trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/PostOfficeInternal.java
trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/TransactionRequest.java
trunk/src/main/org/jboss/messaging/util/StreamUtils.java
trunk/tests/src/org/jboss/test/messaging/core/plugin/SimplePostOfficeTest.java
Log:
More clustering work
Modified: trunk/src/main/org/jboss/messaging/core/ChannelSupport.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/ChannelSupport.java 2006-09-12 00:33:57 UTC (rev 1278)
+++ trunk/src/main/org/jboss/messaging/core/ChannelSupport.java 2006-09-12 18:13:47 UTC (rev 1279)
@@ -137,7 +137,7 @@
deliveryLock = new Object();
}
-
+
// Receiver implementation ---------------------------------------
public Delivery handle(DeliveryObserver sender, MessageReference ref, Transaction tx)
Modified: trunk/src/main/org/jboss/messaging/core/PagingChannel.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/PagingChannel.java 2006-09-12 00:33:57 UTC (rev 1278)
+++ trunk/src/main/org/jboss/messaging/core/PagingChannel.java 2006-09-12 18:13:47 UTC (rev 1279)
@@ -128,7 +128,7 @@
this.downCacheSize = downCacheSize;
}
-
+
// Channel implementation
// ---------------------------------------------------------------
Modified: trunk/src/main/org/jboss/messaging/core/plugin/ClusteredPostOfficeService.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/ClusteredPostOfficeService.java 2006-09-12 00:33:57 UTC (rev 1278)
+++ trunk/src/main/org/jboss/messaging/core/plugin/ClusteredPostOfficeService.java 2006-09-12 18:13:47 UTC (rev 1279)
@@ -30,6 +30,8 @@
import org.jboss.messaging.core.plugin.contract.MessagingComponent;
import org.jboss.messaging.core.plugin.contract.PersistenceManager;
import org.jboss.messaging.core.plugin.postoffice.cluster.ClusteredPostOfficeImpl;
+import org.jboss.messaging.core.plugin.postoffice.cluster.FavourLocalRoutingPolicy;
+import org.jboss.messaging.core.plugin.postoffice.cluster.RoutingPolicy;
import org.jboss.messaging.core.tx.TransactionRepository;
import org.w3c.dom.Element;
@@ -184,12 +186,15 @@
PersistenceManager pm = serverPeer.getPersistenceManagerInstance();
String nodeId = serverPeer.getServerPeerID();
+
+ RoutingPolicy policy = new FavourLocalRoutingPolicy(nodeId);
postOffice = new ClusteredPostOfficeImpl(ds, tm, sqlProperties, createTablesOnStartup,
nodeId, officeName, ms,
groupName,
syncChannelConfig, asyncChannelConfig,
- tr, pm, stateTimeout, castTimeout);
+ tr, pm, stateTimeout, castTimeout,
+ policy);
postOffice.start();
Modified: trunk/src/main/org/jboss/messaging/core/plugin/IdBlock.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/IdBlock.java 2006-09-12 00:33:57 UTC (rev 1278)
+++ trunk/src/main/org/jboss/messaging/core/plugin/IdBlock.java 2006-09-12 18:13:47 UTC (rev 1279)
@@ -44,8 +44,7 @@
protected long high;
public IdBlock()
- {
-
+ {
}
public IdBlock(long low, long high)
Modified: trunk/src/main/org/jboss/messaging/core/plugin/SimplePostOfficeService.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/SimplePostOfficeService.java 2006-09-12 00:33:57 UTC (rev 1278)
+++ trunk/src/main/org/jboss/messaging/core/plugin/SimplePostOfficeService.java 2006-09-12 18:13:47 UTC (rev 1279)
@@ -29,6 +29,7 @@
import org.jboss.messaging.core.plugin.contract.MessageStore;
import org.jboss.messaging.core.plugin.contract.MessagingComponent;
import org.jboss.messaging.core.plugin.postoffice.PostOfficeImpl;
+import org.jboss.messaging.core.tx.TransactionRepository;
/**
* A SimplePostOfficeService
@@ -115,11 +116,13 @@
MessageStore ms = serverPeer.getMessageStore();
+ TransactionRepository tr = serverPeer.getTxRepository();
+
String nodeId = serverPeer.getServerPeerID();
postOffice = new PostOfficeImpl(ds, tm, sqlProperties,
createTablesOnStartup,
- nodeId, officeName, ms);
+ nodeId, officeName, ms, tr);
postOffice.start();
Added: trunk/src/main/org/jboss/messaging/core/plugin/postoffice/ConditionBindings.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/postoffice/ConditionBindings.java 2006-09-12 00:33:57 UTC (rev 1278)
+++ trunk/src/main/org/jboss/messaging/core/plugin/postoffice/ConditionBindings.java 2006-09-12 18:13:47 UTC (rev 1279)
@@ -0,0 +1,177 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.messaging.core.plugin.postoffice;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+
+import org.jboss.messaging.core.plugin.contract.Binding;
+
+/**
+ * A ConditionBindings
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * @version <tt>$Revision: 1.1 $</tt>
+ *
+ * $Id$
+ *
+ */
+public class ConditionBindings
+{
+ private List allBindings;
+
+ private List durableBindings;
+
+ private List nonDurableBindings;
+
+ // Map <name, binding or list of bindings>
+ private Map nameMap;
+
+ private String thisNode;
+
+ private int localDurableCount;
+
+ public ConditionBindings(String thisNode)
+ {
+ allBindings = new ArrayList();
+
+ durableBindings = new ArrayList();
+
+ nonDurableBindings = new ArrayList();
+
+ this.thisNode = thisNode;
+ }
+
+ public void addBinding(Binding binding)
+ {
+ if (allBindings.contains(binding))
+ {
+ throw new IllegalArgumentException("Bindings already contains binding: " + binding);
+ }
+
+ allBindings.add(binding);
+
+ if (binding.isDurable())
+ {
+ durableBindings.add(binding);
+ }
+ else
+ {
+ nonDurableBindings.add(binding);
+ }
+
+ List bindings = (List)nameMap.get(binding.getQueueName());
+
+ if (bindings == null)
+ {
+ bindings = new ArrayList();
+
+ nameMap.put(binding.getQueueName(), bindings);
+ }
+
+ bindings.add(binding);
+
+ if (binding.isDurable() && binding.getNodeId().equals(thisNode))
+ {
+ localDurableCount++;
+ }
+ }
+
+ public boolean removeBinding(Binding binding)
+ {
+ boolean removed = allBindings.remove(binding);
+
+ if (!removed)
+ {
+ return false;
+ }
+
+ if (binding.isDurable())
+ {
+ durableBindings.remove(binding);
+ }
+ else
+ {
+ nonDurableBindings.remove(binding);
+ }
+
+ List bindings = (List)nameMap.get(binding.getQueueName());
+
+ if (bindings == null)
+ {
+ throw new IllegalStateException("Cannot find bindins in name map");
+ }
+
+ removed = bindings.remove(binding);
+
+ if (!removed)
+ {
+ throw new IllegalStateException("Cannot find binding in list");
+ }
+
+ if (bindings.isEmpty())
+ {
+ nameMap.remove(binding.getQueueName());
+ }
+
+ if (binding.isDurable() && binding.getNodeId().equals(thisNode))
+ {
+ localDurableCount--;
+ }
+
+ return true;
+ }
+
+ public Collection getBindingsByName()
+ {
+ return nameMap.values();
+ }
+
+ public boolean isEmpty()
+ {
+ return nameMap.isEmpty();
+ }
+
+ public List getAllBindings()
+ {
+ return allBindings;
+ }
+
+ public int getLocalDurableCount()
+ {
+ return this.localDurableCount;
+ }
+
+ public int getDurableCount()
+ {
+ return this.durableBindings.size();
+ }
+
+ public int getRemoteDurableCount()
+ {
+ return getDurableCount() - getLocalDurableCount();
+ }
+
+
+}
Modified: trunk/src/main/org/jboss/messaging/core/plugin/postoffice/PostOfficeImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/postoffice/PostOfficeImpl.java 2006-09-12 00:33:57 UTC (rev 1278)
+++ trunk/src/main/org/jboss/messaging/core/plugin/postoffice/PostOfficeImpl.java 2006-09-12 18:13:47 UTC (rev 1279)
@@ -45,6 +45,7 @@
import org.jboss.messaging.core.plugin.contract.MessageStore;
import org.jboss.messaging.core.plugin.contract.PostOffice;
import org.jboss.messaging.core.tx.Transaction;
+import org.jboss.messaging.core.tx.TransactionRepository;
import EDU.oswego.cs.dl.util.concurrent.ReadWriteLock;
import EDU.oswego.cs.dl.util.concurrent.WriterPreferenceReadWriteLock;
@@ -69,6 +70,8 @@
protected MessageStore ms;
+ protected TransactionRepository tr;
+
protected String nodeId;
//Map <node id, Map < queue name, binding > >
@@ -82,8 +85,9 @@
}
public PostOfficeImpl(DataSource ds, TransactionManager tm, Properties sqlProperties,
- boolean createTablesOnStartup,
- String nodeId, String officeName, MessageStore ms)
+ boolean createTablesOnStartup,
+ String nodeId, String officeName, MessageStore ms,
+ TransactionRepository tr)
{
super (ds, tm, sqlProperties, createTablesOnStartup);
@@ -98,6 +102,8 @@
this.officeName = officeName;
this.ms = ms;
+
+ this.tr = tr;
}
// MessagingComponent implementation --------------------------------
@@ -277,11 +283,31 @@
try
{
- //We route on the condition
- List bindings = (List)conditionMap.get(condition);
-
- if (bindings != null)
+ ConditionBindings cb = (ConditionBindings)conditionMap.get(condition);
+
+ if (cb != null)
{
+ boolean startInternalTx = false;
+
+ if (tx == null && ref.isReliable())
+ {
+ if (cb.getDurableCount() > 1)
+ {
+ // When routing a persistent message without a transaction then we may need to start an
+ // internal transaction in order to route it.
+ // This is so we can guarantee the message is delivered to all or none of the subscriptions.
+ // We need to do this if there is more than one durable sub
+ startInternalTx = true;
+ }
+ }
+
+ if (startInternalTx)
+ {
+ tx = tr.createTransaction();
+ }
+
+ List bindings = cb.getAllBindings();
+
Iterator iter = bindings.iterator();
while (iter.hasNext())
@@ -300,7 +326,13 @@
routed = true;
}
}
- }
+ }
+
+ if (startInternalTx)
+ {
+ //TODO - do we need to rollback if an exception is thrown??
+ tx.commit();
+ }
}
return routed;
@@ -484,16 +516,16 @@
String condition = binding.getCondition();
- List bindings = (List)conditionMap.get(condition);
+ ConditionBindings bindings = (ConditionBindings)conditionMap.get(condition);
if (bindings == null)
{
- bindings = new ArrayList();
+ bindings = new ConditionBindings(this.nodeId);
conditionMap.put(condition, bindings);
}
- bindings.add(binding);
+ bindings.addBinding(binding);
}
protected Binding removeBinding(String nodeId, String queueName)
@@ -527,30 +559,20 @@
nameMaps.remove(nodeId);
}
- List bindings = (List)conditionMap.get(binding.getCondition());
+ ConditionBindings bindings = (ConditionBindings)conditionMap.get(binding.getCondition());
if (bindings == null)
{
throw new IllegalStateException("Cannot find condition bindings for " + binding.getCondition());
}
- boolean removed = bindings.remove(binding);
+ boolean removed = bindings.removeBinding(binding);
if (!removed)
{
throw new IllegalStateException("Cannot find binding in condition binding list");
- }
+ }
- if (binding == null)
- {
- throw new IllegalStateException("Channel id map does not contain binding for " + binding.getChannelId());
- }
-
- if (!removed)
- {
- throw new IllegalStateException("Cannot find binding in condition binding list");
- }
-
if (bindings.isEmpty())
{
conditionMap.remove(binding.getCondition());
Added: trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/BalancedBinding.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/BalancedBinding.java 2006-09-12 00:33:57 UTC (rev 1278)
+++ trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/BalancedBinding.java 2006-09-12 18:13:47 UTC (rev 1279)
@@ -0,0 +1,40 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.messaging.core.plugin.postoffice.cluster;
+
+import org.jboss.messaging.core.plugin.contract.Binding;
+
+/**
+ * A BalancedBinding
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * @version <tt>$Revision: 1.1 $</tt>
+ *
+ * $Id$
+ *
+ */
+public interface BalancedBinding extends Binding
+{
+ double getConsumptionRate();
+
+ int getMessageCount();
+}
Modified: trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/CastMessagesCallback.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/CastMessagesCallback.java 2006-09-12 00:33:57 UTC (rev 1278)
+++ trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/CastMessagesCallback.java 2006-09-12 18:13:47 UTC (rev 1279)
@@ -23,6 +23,7 @@
import java.util.ArrayList;
import java.util.List;
+import java.util.Map;
import org.jboss.messaging.core.Message;
import org.jboss.messaging.core.tx.TxCallback;
@@ -69,9 +70,9 @@
private PostOfficeInternal office;
- void addMessage(String routingKey, Message message)
+ void addMessage(String routingKey, Message message, Map queueNameToNodeIdMap)
{
- MessageHolder holder = new MessageHolder(routingKey, message);
+ MessageHolder holder = new MessageHolder(routingKey, message, queueNameToNodeIdMap);
if (message.isReliable())
{
@@ -113,7 +114,7 @@
if (persistent != null)
{
// Cast a commit message
- ClusterRequest req = new TransactionRequest(nodeId, txId);
+ ClusterRequest req = new SendTransactionRequest(nodeId, txId);
// Stack must be FIFO
office.asyncSendRequest(req);
@@ -136,7 +137,7 @@
{
//We send the persistent messages which go into the "holding area" on
//the receiving nodes
- ClusterRequest req = new TransactionRequest(nodeId, txId, persistent);
+ ClusterRequest req = new SendTransactionRequest(nodeId, txId, persistent);
//Stack must be FIFO
office.asyncSendRequest(req);
Added: trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/ClusterTransaction.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/ClusterTransaction.java 2006-09-12 00:33:57 UTC (rev 1278)
+++ trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/ClusterTransaction.java 2006-09-12 18:13:47 UTC (rev 1279)
@@ -0,0 +1,36 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.messaging.core.plugin.postoffice.cluster;
+
+/**
+ * A ClusterTransaction
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * @version <tt>$Revision: 1.1 $</tt>
+ *
+ * $Id$
+ *
+ */
+public interface ClusterTransaction
+{
+ void commit(PostOfficeInternal office) throws Exception;
+}
Modified: trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/ClusteredPostOfficeImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/ClusteredPostOfficeImpl.java 2006-09-12 00:33:57 UTC (rev 1278)
+++ trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/ClusteredPostOfficeImpl.java 2006-09-12 18:13:47 UTC (rev 1279)
@@ -22,6 +22,7 @@
package org.jboss.messaging.core.plugin.postoffice.cluster;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
@@ -32,12 +33,14 @@
import javax.transaction.TransactionManager;
import org.jboss.logging.Logger;
+import org.jboss.messaging.core.Delivery;
import org.jboss.messaging.core.MessageReference;
import org.jboss.messaging.core.local.Queue;
import org.jboss.messaging.core.plugin.contract.Binding;
import org.jboss.messaging.core.plugin.contract.ClusteredPostOffice;
import org.jboss.messaging.core.plugin.contract.MessageStore;
import org.jboss.messaging.core.plugin.contract.PersistenceManager;
+import org.jboss.messaging.core.plugin.postoffice.ConditionBindings;
import org.jboss.messaging.core.plugin.postoffice.PostOfficeImpl;
import org.jboss.messaging.core.plugin.postoffice.BindingImpl;
import org.jboss.messaging.core.tx.Transaction;
@@ -99,8 +102,6 @@
private PersistenceManager pm;
- private TransactionRepository tr;
-
private Element syncChannelConfigE;
private Element asyncChannelConfigE;
@@ -112,6 +113,8 @@
private long stateTimeout;
private long castTimeout;
+
+ private RoutingPolicy routingPolicy;
public ClusteredPostOfficeImpl()
{
@@ -136,10 +139,11 @@
Element asyncChannelConfig,
TransactionRepository tr,
PersistenceManager pm,
- long stateTimeout, long castTimeout) throws Exception
+ long stateTimeout, long castTimeout,
+ RoutingPolicy routingPolicy) throws Exception
{
this(ds, tm, sqlProperties, createTablesOnStartup, nodeId, officeName, ms,
- groupName, tr, pm, stateTimeout, castTimeout);
+ groupName, tr, pm, stateTimeout, castTimeout, routingPolicy);
this.syncChannelConfigE = syncChannelConfig;
this.asyncChannelConfigE = asyncChannelConfig;
@@ -156,10 +160,11 @@
String asyncChannelConfig,
TransactionRepository tr,
PersistenceManager pm,
- long stateTimeout, long castTimeout) throws Exception
+ long stateTimeout, long castTimeout,
+ RoutingPolicy routingPolicy) throws Exception
{
this(ds, tm, sqlProperties, createTablesOnStartup, nodeId, officeName, ms,
- groupName, tr, pm, stateTimeout, castTimeout);
+ groupName, tr, pm, stateTimeout, castTimeout, routingPolicy);
this.syncChannelConfigS = syncChannelConfig;
this.asyncChannelConfigS = asyncChannelConfig;
@@ -171,12 +176,11 @@
String groupName,
TransactionRepository tr,
PersistenceManager pm,
- long stateTimeout, long castTimeout)
+ long stateTimeout, long castTimeout,
+ RoutingPolicy routingPolicy)
{
- super (ds, tm, sqlProperties, createTablesOnStartup, nodeId, officeName, ms);
+ super (ds, tm, sqlProperties, createTablesOnStartup, nodeId, officeName, ms, tr);
- this.tr = tr;
-
this.pm = pm;
this.groupName = groupName;
@@ -185,6 +189,8 @@
this.castTimeout = castTimeout;
+ this.routingPolicy = routingPolicy;
+
init();
}
@@ -277,7 +283,7 @@
public void recover() throws Exception
{
//We send a "check" message to all nodes of the cluster
- this.asyncSendRequest(new CheckMessage(nodeId));
+ asyncSendRequest(new CheckMessage(nodeId));
}
public boolean route(MessageReference ref, String condition, Transaction tx) throws Exception
@@ -292,103 +298,133 @@
throw new IllegalArgumentException("Condition is null");
}
+ boolean routed = false;
+
lock.readLock().acquire();
try
{
- // We route on the condition
- List bindings = (List)conditionMap.get(condition);
-
- if (bindings != null)
- {
- // When routing a persistent message without a transaction then we may need to start an
- // internal transaction in order to route it.
- // We do this if the message is reliable AND:
- // (
- // a) The message needs to be routed to more than one durable subscription. This is so we
- // can guarantee the message is persisted on all the durable subscriptions or none if failure
- // occurs - i.e. the persistence is transactional
- // OR
- // b) There is at least one durable subscription on a different node.
- // In this case we need to start a transaction since we want to add a callback on the transaction
- // to cast the message to other nodes
- // )
-
- //TODO we can optimise this out by storing this as a flag somewhere
- boolean startInternalTx = false;
-
- if (tx == null)
+ ConditionBindings cb = (ConditionBindings)conditionMap.get(condition);
+
+ boolean startInternalTx = false;
+
+ if (cb != null)
+ {
+ if (tx == null && ref.isReliable())
{
- if (ref.isReliable())
+ if (!(cb.getDurableCount() == 1 && cb.getLocalDurableCount() == 1))
{
- Iterator iter = bindings.iterator();
-
- int count = 0;
-
- while (iter.hasNext())
- {
- Binding binding = (Binding)iter.next();
-
- if (binding.isDurable())
- {
- count++;
-
- if (count == 2 || !binding.getNodeId().equals(this.nodeId))
- {
- startInternalTx = true;
-
- break;
- }
- }
- }
+ // When routing a persistent message without a transaction then we may need to start an
+ // internal transaction in order to route it.
+ // This is so we can guarantee the message is delivered to all or none of the subscriptions.
+ // We need to do this if there is any other than a single local durable subscription
+ startInternalTx = true;
}
-
- if (startInternalTx)
- {
- tx = tr.createTransaction();
- }
}
-
- Iterator iter = bindings.iterator();
- boolean sendRemotely = false;
+ if (startInternalTx)
+ {
+ tx = tr.createTransaction();
+ }
+
+ //There may be no transaction in the following cases
+ //1) No transaction specified in params and reference is unreliable
+ //2) No transaction specified in params and reference is reliable and there is only one
+ // or less local durable subscription
+
+ Collection bindingLists = cb.getBindingsByName();
+ Iterator iter = bindingLists.iterator();
+
+ int numberRemote = 0;
+
+ Map queueNameNodeIdMap = null;
+
while (iter.hasNext())
{
- Binding binding = (Binding)iter.next();
+ //Each list is the list of bindings which have the same queue name
+ List bindings = (List)iter.next();
- if (binding.isActive())
- {
- if (binding.getNodeId().equals(this.nodeId))
+ //We may have more than one binding with the same queue name on different nodes in the
+ //following situations:
+ //1) When a point to point queue has been deployed across the cluster
+ //2) When a durable subscription has been created on multiple nodes to implement
+ // shared durable subscriptions.
+ //In both of these cases we only want one of the queues to receive the message, we choose which
+ //one by asking the routing policy
+ Binding binding;
+
+ if (bindings.size() == 1)
+ {
+ binding = (Binding)bindings.get(0);
+ }
+ else if (bindings.size() > 1)
+ {
+ binding = routingPolicy.choose(bindings);
+
+ if (queueNameNodeIdMap == null)
{
- //It's a local binding so we pass the message on to the subscription
- Queue subscription = binding.getQueue();
+ queueNameNodeIdMap = new HashMap();
+ }
- subscription.handle(null, ref, tx);
+ if (!binding.getNodeId().equals(this.nodeId))
+ {
+ //Chose a remote node
+ //We add the node id to the map against the name
+ //This is used on receipt to work out if a particular queue should
+ //accept the message, when multicasted
+ queueNameNodeIdMap.put(binding.getQueueName(), binding.getNodeId());
}
- else
+ }
+ else
+ {
+ throw new IllegalStateException("No bindings in list!");
+ }
+
+ if (binding.getNodeId().equals(this.nodeId))
+ {
+ //It's a local binding so we pass the message on to the queue
+ Queue queue = binding.getQueue();
+
+ Delivery del = queue.handle(null, ref, tx);
+
+ if (del != null && del.isSelectorAccepted())
{
- //It's a binding on a different office instance on the cluster
- sendRemotely = true;
-
- if (ref.isReliable() && binding.isDurable())
- {
- //Insert the reference into the database
- pm.addReference(binding.getChannelId(), ref, tx);
- }
- }
+ routed = true;
+ }
}
- }
+ else
+ {
+ //It's a binding on a different office instance on the cluster
+ numberRemote++;
+
+ if (ref.isReliable() && binding.isDurable())
+ {
+ //Insert the reference into the database
+
+ //TODO perhaps we should do this via a stub queue class
+ pm.addReference(binding.getChannelId(), ref, tx);
+ }
+
+ routed = true;
+ }
+ }
+
+ //Now we've sent the message to any local queues, we might also need
+ //to send the message to the other office instances on the cluster if there are
+ //queues on those nodes that need to receive the message
- //Now we've sent the message to all the local subscriptions, we might also need
- //to multicast the message to the other office instances on the cluster if there are
- //subscriptions on those nodes that need to receive the message
- if (sendRemotely)
+ if (numberRemote > 0)
{
+ //TODO - If numberRemote == 1, we could do unicast rather than multicast
+ //This would avoid putting strain on nodes that don't need to receive the reference
+ //This would be the case for load balancing queues where the routing policy
+ //sometimes allows a remote queue to get the reference
+
if (tx == null)
{
- //We just throw the message on the network - no need to wait for any reply
- asyncSendRequest(new MessageRequest(condition, ref.getMessage()));
+ //We just throw the message on the network - no need to wait for any reply
+ asyncSendRequest(new MessageRequest(condition, ref.getMessage(), queueNameNodeIdMap));
}
else
{
@@ -402,12 +438,13 @@
tx.addFirstCallback(callback, this);
}
- callback.addMessage(condition, ref.getMessage());
+ callback.addMessage(condition, ref.getMessage(), queueNameNodeIdMap);
}
}
if (startInternalTx)
{
+ // TODO - do we need to rollback if an exception is thrown??
tx.commit();
}
}
@@ -417,9 +454,7 @@
lock.readLock().release();
}
- // We don't care if the individual subscriptions accepted the reference
- // We always return true
- return true;
+ return routed;
}
// PostOfficeInternal implementation ------------------------------------------------------------------
@@ -506,9 +541,55 @@
}
}
- public void routeFromCluster(org.jboss.messaging.core.Message message, String routingKey) throws Exception
+ public void addToQueue(String queueName, List messages) throws Exception
{
lock.readLock().acquire();
+
+ try
+ {
+ Binding binding = this.getBindingForQueueName(queueName);
+
+ if (binding == null)
+ {
+ throw new IllegalStateException("Cannot find binding for queue name " + queueName);
+ }
+
+ Queue queue = binding.getQueue();
+
+ Iterator iter = messages.iterator();
+
+ while (iter.hasNext())
+ {
+ MessageReference ref = null;
+
+ try
+ {
+ org.jboss.messaging.core.Message msg = (org.jboss.messaging.core.Message)iter.next();
+
+ ref = ms.reference(msg);
+
+ queue.handleDontPersist(null, ref, null);
+ }
+ finally
+ {
+ if (ref != null)
+ {
+ ref.releaseMemoryReference();
+ }
+ }
+ }
+ }
+ finally
+ {
+
+ lock.readLock().release();
+ }
+ }
+
+ public void routeFromCluster(org.jboss.messaging.core.Message message, String routingKey,
+ Map queueNameNodeIdMap) throws Exception
+ {
+ lock.readLock().acquire();
// Need to reference the message
MessageReference ref = null;
@@ -517,26 +598,46 @@
ref = ms.reference(message);
// We route on the condition
- List bindings = (List)conditionMap.get(routingKey);
+ ConditionBindings cb = (ConditionBindings)conditionMap.get(routingKey);
- if (bindings != null)
+ if (cb != null)
{
+ List bindings = cb.getAllBindings();
+
Iterator iter = bindings.iterator();
while (iter.hasNext())
{
Binding binding = (Binding)iter.next();
-
+
if (binding.isActive())
{
if (binding.getNodeId().equals(this.nodeId))
{
- //It's a local binding so we pass the message on to the subscription
- Queue subscription = binding.getQueue();
-
- //TODO instead of adding a new method on the channel
- //we should set a header and use the same method
- subscription.handleDontPersist(null, ref, null);
+ boolean handle = true;
+
+ if (queueNameNodeIdMap != null)
+ {
+ String desiredNodeId = (String)queueNameNodeIdMap.get(binding.getQueueName());
+
+ //When there are more than one queues with the same name across the cluster we only
+ //want to chose one of them
+
+ if (desiredNodeId != null)
+ {
+ handle = desiredNodeId.equals(nodeId);
+ }
+ }
+
+ if (handle)
+ {
+ //It's a local binding so we pass the message on to the subscription
+ Queue subscription = binding.getQueue();
+
+ //TODO instead of adding a new method on the channel
+ //we should set a header and use the same method
+ subscription.handleDontPersist(null, ref, null);
+ }
}
}
}
@@ -558,36 +659,32 @@
asyncChannel.send(new Message(null, null, request));
}
- public void addToHoldingArea(TransactionId id, List messageHolders) throws Exception
+ /*
+ * We put the transaction in the holding area
+ */
+ public void holdTransaction(TransactionId id, ClusterTransaction tx) throws Exception
{
synchronized (holdingArea)
{
- holdingArea.put(id, messageHolders);
- }
+ holdingArea.put(id, tx);
+ }
}
-
+
public void commitTransaction(TransactionId id) throws Exception
{
- List messageHolders = null;
+ ClusterTransaction tx = null;
synchronized (holdingArea)
{
- messageHolders = (List)holdingArea.remove(id);
+ tx = (ClusterTransaction)holdingArea.remove(id);
}
- if (messageHolders == null)
+ if (tx == null)
{
- throw new IllegalStateException("Cannot find messages for transaction id: " + id);
+ throw new IllegalStateException("Cannot find transaction transaction id: " + id);
}
- Iterator iter = messageHolders.iterator();
-
- while (iter.hasNext())
- {
- MessageHolder holder = (MessageHolder)iter.next();
-
- routeFromCluster(holder.getMessage(), holder.getRoutingKey());
- }
+ tx.commit(this);
}
/*
@@ -624,7 +721,7 @@
{
MessageHolder holder = (MessageHolder)iter2.next();
- routeFromCluster(holder.getMessage(), holder.getRoutingKey());
+ routeFromCluster(holder.getMessage(), holder.getRoutingKey(), holder.getQueueNameToNodeIdMap());
}
toRemove.add(id);
@@ -736,43 +833,45 @@
* We have received a reference cast from another node - and we need to route it to our local
* subscriptions
*/
- protected void routeFromCluster(MessageReference ref, String routingKey) throws Exception
- {
- lock.readLock().acquire();
-
- try
- {
- // We route on the condition
- List bindings = (List)conditionMap.get(routingKey);
-
- if (bindings != null)
- {
- Iterator iter = bindings.iterator();
-
- while (iter.hasNext())
- {
- Binding binding = (Binding)iter.next();
-
- if (binding.isActive())
- {
- if (binding.getNodeId().equals(this.nodeId))
- {
- //It's a local binding so we pass the message on to the subscription
- Queue subscription = binding.getQueue();
-
- //TODO instead of adding a new method on the channel
- //we should set a header and use the same method
- subscription.handleDontPersist(null, ref, null);
- }
- }
- }
- }
- }
- finally
- {
- lock.readLock().release();
- }
- }
+// protected void routeFromCluster(MessageReference ref, String routingKey) throws Exception
+// {
+// lock.readLock().acquire();
+//
+// try
+// {
+// // We route on the condition
+// List bindings = (List)conditionMap.get(routingKey);
+//
+// if (bindings != null)
+// {
+// Iterator iter = bindings.iterator();
+//
+// while (iter.hasNext())
+// {
+// Binding binding = (Binding)iter.next();
+//
+// if (binding.isActive())
+// {
+// if (binding.getNodeId().equals(this.nodeId))
+// {
+// //It's a local binding so we pass the message on to the subscription
+// Queue subscription = binding.getQueue();
+//
+// //TODO instead of adding a new method on the channel
+// //we should set a header and use the same method
+// subscription.handleDontPersist(null, ref, null);
+// }
+// }
+// }
+// }
+// }
+// finally
+// {
+// lock.readLock().release();
+// }
+// }
+//
+
// Private ------------------------------------------------------------------------------------------
Added: trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/FavourLocalRoutingPolicy.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/FavourLocalRoutingPolicy.java 2006-09-12 00:33:57 UTC (rev 1278)
+++ trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/FavourLocalRoutingPolicy.java 2006-09-12 18:13:47 UTC (rev 1279)
@@ -0,0 +1,68 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.messaging.core.plugin.postoffice.cluster;
+
+import java.util.Iterator;
+import java.util.List;
+
+import org.jboss.messaging.core.plugin.contract.Binding;
+
+/**
+ * A FavourLocalRoutingPolicy
+ *
+ * This routing policy always favours the local queue
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * @version <tt>$Revision: 1.1 $</tt>
+ *
+ * $Id$
+ *
+ */
+public class FavourLocalRoutingPolicy implements RoutingPolicy
+{
+ private String localNodeId;
+
+ public FavourLocalRoutingPolicy(String localNodeId)
+ {
+ this.localNodeId = localNodeId;
+ }
+
+ public Binding choose(List bindings)
+ {
+ Iterator iter = bindings.iterator();
+
+ Binding binding = null;
+
+ while (iter.hasNext())
+ {
+ binding = (Binding)iter.next();
+
+ if (binding.getNodeId().equals(localNodeId))
+ {
+ return binding;
+ }
+ }
+
+ return binding;
+ }
+
+}
Added: trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/MeasuredQueue.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/MeasuredQueue.java 2006-09-12 00:33:57 UTC (rev 1278)
+++ trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/MeasuredQueue.java 2006-09-12 18:13:47 UTC (rev 1279)
@@ -0,0 +1,136 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.messaging.core.plugin.postoffice.cluster;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.jboss.messaging.core.Delivery;
+import org.jboss.messaging.core.Filter;
+import org.jboss.messaging.core.MessageReference;
+import org.jboss.messaging.core.SimpleDelivery;
+import org.jboss.messaging.core.local.Queue;
+import org.jboss.messaging.core.plugin.contract.MessageStore;
+import org.jboss.messaging.core.plugin.contract.PersistenceManager;
+
+import EDU.oswego.cs.dl.util.concurrent.QueuedExecutor;
+
+/**
+ *
+ * A MeasuredQueue
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * @version <tt>$Revision: 1.1 $</tt>
+ *
+ * $Id$
+ *
+ */
+public class MeasuredQueue extends Queue
+{
+ private static final int MIN_PERIOD = 1000;
+
+ private long lastTime;
+
+ private double lastGrowthRate;
+
+ private volatile int numberAdded;
+
+ private volatile int numberConsumed;
+
+ public MeasuredQueue(long id, MessageStore ms, PersistenceManager pm, boolean acceptReliableMessages, boolean recoverable, int fullSize, int pageSize, int downCacheSize, QueuedExecutor executor, Filter filter)
+ {
+ super(id, ms, pm, acceptReliableMessages, recoverable, fullSize, pageSize,
+ downCacheSize, executor, filter);
+
+ lastTime = System.currentTimeMillis();
+
+ numberAdded = numberConsumed = 0;
+ }
+
+ /**
+ *
+ * @return The rate of growth in messages per second of the queue
+ * Rate of growth is defined as follows:
+ * growth = (number of messages added - number of messages consumed) / time
+ */
+ public synchronized double getGrowthRate()
+ {
+ long now = System.currentTimeMillis();
+
+ long period = now - lastTime;
+
+ if (period <= MIN_PERIOD)
+ {
+ //Cache the value to avoid recalculating too often
+ return lastGrowthRate;
+ }
+
+ lastGrowthRate = 1000 * (numberAdded - numberConsumed) / ((double)period);
+
+ lastTime = now;
+
+ numberAdded = numberConsumed = 0;
+
+ return lastGrowthRate;
+ }
+
+ public List getDeliveries(int number) throws Exception
+ {
+ List dels = new ArrayList();
+
+ synchronized (refLock)
+ {
+ synchronized (deliveryLock)
+ {
+ MessageReference ref;
+
+ while ((ref = removeFirstInMemory()) != null)
+ {
+ SimpleDelivery del = new SimpleDelivery(this, ref);
+
+ deliveries.add(del);
+
+ dels.add(del);
+ }
+ return dels;
+ }
+ }
+ }
+
+ protected void addReferenceInMemory(MessageReference ref) throws Exception
+ {
+ super.addReferenceInMemory(ref);
+
+ //This is ok, since the channel ensures only one thread calls this method at once
+ numberAdded++;
+ }
+
+ protected boolean acknowledgeInMemory(Delivery d)
+ {
+ boolean acked = super.acknowledgeInMemory(d);
+
+ // This is ok, since the channel ensures only one thread calls this method at once
+ numberConsumed--;
+
+ return acked;
+ }
+}
Modified: trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/MessageHolder.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/MessageHolder.java 2006-09-12 00:33:57 UTC (rev 1278)
+++ trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/MessageHolder.java 2006-09-12 18:13:47 UTC (rev 1279)
@@ -22,6 +22,7 @@
package org.jboss.messaging.core.plugin.postoffice.cluster;
import java.io.Serializable;
+import java.util.Map;
import org.jboss.messaging.core.Message;
@@ -36,15 +37,19 @@
*/
class MessageHolder implements Serializable
{
- String routingKey;
+ private String routingKey;
- Message message;
+ private Message message;
- MessageHolder(String routingKey, Message message)
+ private Map queueNameToNodeIdMap;
+
+ MessageHolder(String routingKey, Message message, Map queueNameToNodeIdMap)
{
this.routingKey = routingKey;
this.message = message;
+
+ this.queueNameToNodeIdMap = queueNameToNodeIdMap;
}
String getRoutingKey()
@@ -56,4 +61,9 @@
{
return message;
}
+
+ Map getQueueNameToNodeIdMap()
+ {
+ return queueNameToNodeIdMap;
+ }
}
Added: trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/MessageMover.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/MessageMover.java 2006-09-12 00:33:57 UTC (rev 1278)
+++ trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/MessageMover.java 2006-09-12 18:13:47 UTC (rev 1279)
@@ -0,0 +1,102 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.messaging.core.plugin.postoffice.cluster;
+
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+
+import org.jboss.messaging.core.Delivery;
+import org.jboss.messaging.core.local.Queue;
+import org.jboss.messaging.core.plugin.contract.Binding;
+import org.jboss.messaging.core.tx.Transaction;
+import org.jboss.messaging.core.tx.TransactionRepository;
+
+/**
+ * A MessageMover
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * @version <tt>$Revision: 1.1 $</tt>
+ *
+ * $Id$
+ *
+ */
+public class MessageMover
+{
+ private TransactionRepository tr;
+
+ private String nodeId;
+
+ private RedistributionPolicy redistributionPolicy;
+
+ public void moveMessages(Binding from, Binding to, int num) throws Throwable
+ {
+ if (!from.getNodeId().equals(this.nodeId))
+ {
+ throw new IllegalArgumentException("From binding must be on local node!");
+ }
+
+ if (to.getNodeId().equals(this.nodeId))
+ {
+ throw new IllegalArgumentException("To binding cannot be on local node");
+ }
+
+ //Consume the messages in a transaction - don't commit
+ Queue fromQueue = from.getQueue();
+
+ Transaction tx = tr.createTransaction();
+
+ List dels = ((MeasuredQueue)fromQueue).getDeliveries(num);
+
+ Iterator iter = dels.iterator();
+
+ while (iter.hasNext())
+ {
+ Delivery del = (Delivery)iter.next();
+
+ del.acknowledge(tx);
+ }
+
+
+
+ }
+
+ public void calculateMovements(Collection nameLists)
+ {
+ Iterator iter = nameLists.iterator();
+
+ while (iter.hasNext())
+ {
+ List bindings = (List)iter.next();
+
+ if (bindings.size() > 1)
+ {
+ RedistributionOrder order = redistributionPolicy.calculate(bindings);
+
+ if (order != null)
+ {
+
+ }
+ }
+ }
+ }
+}
Modified: trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/MessageRequest.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/MessageRequest.java 2006-09-12 00:33:57 UTC (rev 1278)
+++ trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/MessageRequest.java 2006-09-12 18:13:47 UTC (rev 1279)
@@ -21,6 +21,8 @@
*/
package org.jboss.messaging.core.plugin.postoffice.cluster;
+import java.util.Map;
+
import org.jboss.messaging.core.Message;
/**
@@ -42,15 +44,19 @@
private Message message;
- MessageRequest(String routingKey, Message message)
+ private Map queueNameNodeIdMap;
+
+ MessageRequest(String routingKey, Message message, Map queueNameNodeIdMap)
{
this.routingKey = routingKey;
this.message = message;
+
+ this.queueNameNodeIdMap = queueNameNodeIdMap;
}
public void execute(PostOfficeInternal office) throws Exception
{
- office.routeFromCluster(message, routingKey);
+ office.routeFromCluster(message, routingKey, queueNameNodeIdMap);
}
}
Modified: trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/MessagesRequest.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/MessagesRequest.java 2006-09-12 00:33:57 UTC (rev 1278)
+++ trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/MessagesRequest.java 2006-09-12 18:13:47 UTC (rev 1279)
@@ -54,7 +54,7 @@
{
MessageHolder holder = (MessageHolder)iter.next();
- office.routeFromCluster(holder.getMessage(), holder.getRoutingKey());
+ office.routeFromCluster(holder.getMessage(), holder.getRoutingKey(), holder.getQueueNameToNodeIdMap());
}
}
}
Added: trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/MoveMessagesCallback.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/MoveMessagesCallback.java 2006-09-12 00:33:57 UTC (rev 1278)
+++ trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/MoveMessagesCallback.java 2006-09-12 18:13:47 UTC (rev 1279)
@@ -0,0 +1,87 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.messaging.core.plugin.postoffice.cluster;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.jboss.messaging.core.Message;
+import org.jboss.messaging.core.tx.TxCallback;
+
+/**
+ * A MoveMessagesCallback
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * @version <tt>$Revision: 1.1 $</tt>
+ *
+ * $Id$
+ *
+ */
+public class MoveMessagesCallback implements TxCallback
+{
+ private List messages;
+
+ private String queueName;
+
+ MoveMessagesCallback(String queueName)
+ {
+ this.queueName = queueName;
+
+ messages = new ArrayList();
+ }
+
+ void addMessage(Message msg)
+ {
+ messages.add(msg);
+ }
+
+ public void afterCommit(boolean onePhase) throws Exception
+ {
+
+ }
+
+ public void afterPrepare() throws Exception
+ {
+
+ }
+
+ public void afterRollback(boolean onePhase) throws Exception
+ {
+
+ }
+
+ public void beforeCommit(boolean onePhase) throws Exception
+ {
+
+ }
+
+ public void beforePrepare() throws Exception
+ {
+
+ }
+
+ public void beforeRollback(boolean onePhase) throws Exception
+ {
+
+ }
+
+}
Added: trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/MoveTransactionRequest.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/MoveTransactionRequest.java 2006-09-12 00:33:57 UTC (rev 1278)
+++ trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/MoveTransactionRequest.java 2006-09-12 18:13:47 UTC (rev 1279)
@@ -0,0 +1,62 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.messaging.core.plugin.postoffice.cluster;
+
+import java.util.List;
+
+/**
+ *
+ * A MoveTransactionRequest
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * @version <tt>$Revision: 1.1 $</tt>
+ *
+ * $Id$
+ *
+ */
+class MoveTransactionRequest extends TransactionRequest
+{
+ private List messages;
+
+ private String queueName;
+
+ MoveTransactionRequest(String nodeId, long txId, List messages, String queueName)
+ {
+ super(nodeId, txId, true);
+
+ this.messages = messages;
+
+ this.queueName = queueName;
+ }
+
+ MoveTransactionRequest(String nodeId, long txId)
+ {
+ super(nodeId, txId, false);
+ }
+
+ public void commit(PostOfficeInternal office) throws Exception
+ {
+ office.addToQueue(queueName, messages);
+ }
+}
+
+
Modified: trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/PostOfficeInternal.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/PostOfficeInternal.java 2006-09-12 00:33:57 UTC (rev 1278)
+++ trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/PostOfficeInternal.java 2006-09-12 18:13:47 UTC (rev 1279)
@@ -22,6 +22,7 @@
package org.jboss.messaging.core.plugin.postoffice.cluster;
import java.util.List;
+import java.util.Map;
import org.jboss.messaging.core.Message;
import org.jgroups.Address;
@@ -29,6 +30,9 @@
/**
*
* A PostOfficeInternal
+ *
+ * Extension to the ClusteredPostOffice interface that expose extra methods useful to
+ * ClusteredRequests
*
* @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
* @version <tt>$Revision: 1.1 $</tt>
@@ -48,11 +52,13 @@
void handleAddressNodeMapping(Address address, String nodeId)
throws Exception;
- void routeFromCluster(Message message, String routingKey) throws Exception;
+ void routeFromCluster(Message message, String routingKey, Map queueNameNodeIdMap) throws Exception;
+ void addToQueue(String queueName, List messages) throws Exception;
+
void asyncSendRequest(ClusterRequest request) throws Exception;
- void addToHoldingArea(TransactionId id, List messageHolders) throws Exception;
+ void holdTransaction(TransactionId id, ClusterTransaction tx) throws Exception;
void commitTransaction(TransactionId id) throws Exception;
Added: trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/RedistributionOrder.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/RedistributionOrder.java 2006-09-12 00:33:57 UTC (rev 1278)
+++ trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/RedistributionOrder.java 2006-09-12 18:13:47 UTC (rev 1279)
@@ -0,0 +1,65 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.messaging.core.plugin.postoffice.cluster;
+
+/**
+ *
+ * A RedistributionOrder
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * @version <tt>$Revision: 1.1 $</tt>
+ *
+ * $Id$
+ *
+ */
+public class RedistributionOrder
+{
+ private int numberOfMessages;
+
+ private String queueName;
+
+ private String destinationNodeId;
+
+ public RedistributionOrder(int numberOfMessages, String queueName, String destinationNodeId)
+ {
+ this.numberOfMessages = numberOfMessages;
+
+ this.queueName = queueName;
+
+ this.destinationNodeId = destinationNodeId;
+ }
+
+ public String getDestinationNodeId()
+ {
+ return destinationNodeId;
+ }
+
+ public int getNumberOfMessages()
+ {
+ return numberOfMessages;
+ }
+
+ public String getQueueName()
+ {
+ return queueName;
+ }
+}
Added: trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/RedistributionPolicy.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/RedistributionPolicy.java 2006-09-12 00:33:57 UTC (rev 1278)
+++ trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/RedistributionPolicy.java 2006-09-12 18:13:47 UTC (rev 1279)
@@ -0,0 +1,38 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.messaging.core.plugin.postoffice.cluster;
+
+import java.util.List;
+
+/**
+ * A RedistributionPolicy
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * @version <tt>$Revision: 1.1 $</tt>
+ *
+ * $Id$
+ *
+ */
+public interface RedistributionPolicy
+{
+ RedistributionOrder calculate(List bindings);
+}
Added: trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/RoutingPolicy.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/RoutingPolicy.java 2006-09-12 00:33:57 UTC (rev 1278)
+++ trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/RoutingPolicy.java 2006-09-12 18:13:47 UTC (rev 1279)
@@ -0,0 +1,40 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.messaging.core.plugin.postoffice.cluster;
+
+import java.util.List;
+
+import org.jboss.messaging.core.plugin.contract.Binding;
+
+/**
+ * A RoutingPolicy
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * @version <tt>$Revision: 1.1 $</tt>
+ *
+ * $Id$
+ *
+ */
+public interface RoutingPolicy
+{
+ Binding choose(List bindings);
+}
Added: trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/SendTransactionRequest.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/SendTransactionRequest.java 2006-09-12 00:33:57 UTC (rev 1278)
+++ trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/SendTransactionRequest.java 2006-09-12 18:13:47 UTC (rev 1279)
@@ -0,0 +1,66 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.messaging.core.plugin.postoffice.cluster;
+
+import java.util.Iterator;
+import java.util.List;
+
+/**
+ * A SendTransactionRequest
+ *
+ * Used for sending persistent messages transactionally across the network
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * @version <tt>$Revision: 1.1 $</tt>
+ *
+ * $Id$
+ *
+ */
+class SendTransactionRequest extends TransactionRequest
+{
+ private List messageHolders;
+
+ SendTransactionRequest(String nodeId, long txId, List messageHolders)
+ {
+ super(nodeId, txId, true);
+
+ this.messageHolders = messageHolders;
+ }
+
+ SendTransactionRequest(String nodeId, long txId)
+ {
+ super(nodeId, txId, false);
+ }
+
+ public void commit(PostOfficeInternal office) throws Exception
+ {
+ Iterator iter = messageHolders.iterator();
+
+ while (iter.hasNext())
+ {
+ MessageHolder holder = (MessageHolder)iter.next();
+
+ office.routeFromCluster(holder.getMessage(), holder.getRoutingKey(), holder.getQueueNameToNodeIdMap());
+ }
+ }
+}
+
Modified: trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/TransactionRequest.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/TransactionRequest.java 2006-09-12 00:33:57 UTC (rev 1278)
+++ trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/TransactionRequest.java 2006-09-12 18:13:47 UTC (rev 1279)
@@ -21,58 +21,40 @@
*/
package org.jboss.messaging.core.plugin.postoffice.cluster;
-import java.util.List;
/**
* A TransactionRequest
*
- * Used for sending persistent messages transactionally across the network
- *
* @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
* @version <tt>$Revision: 1.1 $</tt>
*
* $Id$
*
*/
-class TransactionRequest implements ClusterRequest
+abstract class TransactionRequest implements ClusterRequest, ClusterTransaction
{
- private static final long serialVersionUID = 644500948910063649L;
-
private String nodeId;
private long txId;
- private List messageHolders;
-
- private boolean tryTransaction;
+ private boolean hold;
- TransactionRequest(String nodeId, long txId, List messageHolders)
+ TransactionRequest(String nodeId, long txId, boolean hold)
{
this.nodeId = nodeId;
this.txId= txId;
- this.messageHolders = messageHolders;
-
- tryTransaction = true;
+ this.hold = hold;
}
- TransactionRequest(String nodeId, long txId)
- {
- this.nodeId = nodeId;
-
- this.txId= txId;
-
- tryTransaction = false;
- }
-
public void execute(PostOfficeInternal office) throws Exception
{
TransactionId id = new TransactionId(nodeId, txId);
- if (tryTransaction)
+ if (hold)
{
- office.addToHoldingArea(id, messageHolders);
+ office.holdTransaction(id, this);
}
else
{
@@ -81,3 +63,4 @@
}
}
+
Modified: trunk/src/main/org/jboss/messaging/util/StreamUtils.java
===================================================================
--- trunk/src/main/org/jboss/messaging/util/StreamUtils.java 2006-09-12 00:33:57 UTC (rev 1278)
+++ trunk/src/main/org/jboss/messaging/util/StreamUtils.java 2006-09-12 18:13:47 UTC (rev 1279)
@@ -32,8 +32,6 @@
import java.util.Map;
import java.util.Set;
-import org.jboss.util.Primitives;
-
/**
* A StreamUtils
*
@@ -101,7 +99,7 @@
value = new Double(in.readDouble());
break;
case BOOLEAN :
- value = Primitives.valueOf(in.readBoolean());
+ value = in.readBoolean() ? Boolean.TRUE : Boolean.FALSE;
break;
case STRING :
if (longStrings)
Modified: trunk/tests/src/org/jboss/test/messaging/core/plugin/SimplePostOfficeTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/core/plugin/SimplePostOfficeTest.java 2006-09-12 00:33:57 UTC (rev 1278)
+++ trunk/tests/src/org/jboss/test/messaging/core/plugin/SimplePostOfficeTest.java 2006-09-12 18:13:47 UTC (rev 1279)
@@ -972,7 +972,7 @@
{
PostOfficeImpl postOffice =
new PostOfficeImpl(sc.getDataSource(), sc.getTransactionManager(),
- null, true, "node1", "Simple", ms);
+ null, true, "node1", "Simple", ms, tr);
postOffice.start();
More information about the jboss-cvs-commits
mailing list