[Jboss-cvs] JBoss Messaging SVN: r1279 - in trunk: src/main/org/jboss/messaging/core src/main/org/jboss/messaging/core/plugin src/main/org/jboss/messaging/core/plugin/contract src/main/org/jboss/messaging/core/plugin/postoffice src/main/org/jboss/messaging/core/plugin/postoffice/cluster src/main/org/jboss/messaging/util tests/src/org/jboss/test/messaging/core/plugin

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Tue Sep 12 14:14:04 EDT 2006


Author: timfox
Date: 2006-09-12 14:13:47 -0400 (Tue, 12 Sep 2006)
New Revision: 1279

Added:
   trunk/src/main/org/jboss/messaging/core/plugin/postoffice/ConditionBindings.java
   trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/BalancedBinding.java
   trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/ClusterTransaction.java
   trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/FavourLocalRoutingPolicy.java
   trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/MeasuredQueue.java
   trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/MessageMover.java
   trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/MoveMessagesCallback.java
   trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/MoveTransactionRequest.java
   trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/RedistributionOrder.java
   trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/RedistributionPolicy.java
   trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/RoutingPolicy.java
   trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/SendTransactionRequest.java
Removed:
   trunk/src/main/org/jboss/messaging/core/plugin/contract/ib/
Modified:
   trunk/src/main/org/jboss/messaging/core/ChannelSupport.java
   trunk/src/main/org/jboss/messaging/core/PagingChannel.java
   trunk/src/main/org/jboss/messaging/core/plugin/ClusteredPostOfficeService.java
   trunk/src/main/org/jboss/messaging/core/plugin/IdBlock.java
   trunk/src/main/org/jboss/messaging/core/plugin/SimplePostOfficeService.java
   trunk/src/main/org/jboss/messaging/core/plugin/postoffice/PostOfficeImpl.java
   trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/CastMessagesCallback.java
   trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/ClusteredPostOfficeImpl.java
   trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/MessageHolder.java
   trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/MessageRequest.java
   trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/MessagesRequest.java
   trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/PostOfficeInternal.java
   trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/TransactionRequest.java
   trunk/src/main/org/jboss/messaging/util/StreamUtils.java
   trunk/tests/src/org/jboss/test/messaging/core/plugin/SimplePostOfficeTest.java
Log:
More clustering work



Modified: trunk/src/main/org/jboss/messaging/core/ChannelSupport.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/ChannelSupport.java	2006-09-12 00:33:57 UTC (rev 1278)
+++ trunk/src/main/org/jboss/messaging/core/ChannelSupport.java	2006-09-12 18:13:47 UTC (rev 1279)
@@ -137,7 +137,7 @@
 
       deliveryLock = new Object();
    }
-
+   
    // Receiver implementation ---------------------------------------
 
    public Delivery handle(DeliveryObserver sender, MessageReference ref, Transaction tx)

Modified: trunk/src/main/org/jboss/messaging/core/PagingChannel.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/PagingChannel.java	2006-09-12 00:33:57 UTC (rev 1278)
+++ trunk/src/main/org/jboss/messaging/core/PagingChannel.java	2006-09-12 18:13:47 UTC (rev 1279)
@@ -128,7 +128,7 @@
 
       this.downCacheSize = downCacheSize;      
    }
-   
+    
    // Channel implementation
    // ---------------------------------------------------------------
    

Modified: trunk/src/main/org/jboss/messaging/core/plugin/ClusteredPostOfficeService.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/ClusteredPostOfficeService.java	2006-09-12 00:33:57 UTC (rev 1278)
+++ trunk/src/main/org/jboss/messaging/core/plugin/ClusteredPostOfficeService.java	2006-09-12 18:13:47 UTC (rev 1279)
@@ -30,6 +30,8 @@
 import org.jboss.messaging.core.plugin.contract.MessagingComponent;
 import org.jboss.messaging.core.plugin.contract.PersistenceManager;
 import org.jboss.messaging.core.plugin.postoffice.cluster.ClusteredPostOfficeImpl;
+import org.jboss.messaging.core.plugin.postoffice.cluster.FavourLocalRoutingPolicy;
+import org.jboss.messaging.core.plugin.postoffice.cluster.RoutingPolicy;
 import org.jboss.messaging.core.tx.TransactionRepository;
 import org.w3c.dom.Element;
 
@@ -184,12 +186,15 @@
          PersistenceManager pm = serverPeer.getPersistenceManagerInstance();
          
          String nodeId = serverPeer.getServerPeerID();
+         
+         RoutingPolicy policy = new FavourLocalRoutingPolicy(nodeId);
                   
          postOffice =  new ClusteredPostOfficeImpl(ds, tm, sqlProperties, createTablesOnStartup,
                                                nodeId, officeName, ms,
                                                groupName,
                                                syncChannelConfig, asyncChannelConfig,
-                                               tr, pm, stateTimeout, castTimeout);
+                                               tr, pm, stateTimeout, castTimeout,
+                                               policy);
          
          postOffice.start();
          

Modified: trunk/src/main/org/jboss/messaging/core/plugin/IdBlock.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/IdBlock.java	2006-09-12 00:33:57 UTC (rev 1278)
+++ trunk/src/main/org/jboss/messaging/core/plugin/IdBlock.java	2006-09-12 18:13:47 UTC (rev 1279)
@@ -44,8 +44,7 @@
    protected long high;
    
    public IdBlock()
-   {
-      
+   {      
    }
    
    public IdBlock(long low, long high)

Modified: trunk/src/main/org/jboss/messaging/core/plugin/SimplePostOfficeService.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/SimplePostOfficeService.java	2006-09-12 00:33:57 UTC (rev 1278)
+++ trunk/src/main/org/jboss/messaging/core/plugin/SimplePostOfficeService.java	2006-09-12 18:13:47 UTC (rev 1279)
@@ -29,6 +29,7 @@
 import org.jboss.messaging.core.plugin.contract.MessageStore;
 import org.jboss.messaging.core.plugin.contract.MessagingComponent;
 import org.jboss.messaging.core.plugin.postoffice.PostOfficeImpl;
+import org.jboss.messaging.core.tx.TransactionRepository;
 
 /**
  * A SimplePostOfficeService
@@ -115,11 +116,13 @@
          
          MessageStore ms = serverPeer.getMessageStore();
          
+         TransactionRepository tr = serverPeer.getTxRepository();
+         
          String nodeId = serverPeer.getServerPeerID();
                   
          postOffice = new PostOfficeImpl(ds, tm, sqlProperties,
                                            createTablesOnStartup,
-                                           nodeId, officeName, ms);
+                                           nodeId, officeName, ms, tr);
          
          postOffice.start();
          

Added: trunk/src/main/org/jboss/messaging/core/plugin/postoffice/ConditionBindings.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/postoffice/ConditionBindings.java	2006-09-12 00:33:57 UTC (rev 1278)
+++ trunk/src/main/org/jboss/messaging/core/plugin/postoffice/ConditionBindings.java	2006-09-12 18:13:47 UTC (rev 1279)
@@ -0,0 +1,177 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.messaging.core.plugin.postoffice;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.jboss.messaging.core.plugin.contract.Binding;
+
+/**
+ * Holds the bindings registered against a single routing condition and keeps
+ * several views of them in step: the complete list, the durable and
+ * non-durable subsets, a lookup from queue name to the bindings sharing that
+ * name, and a running count of durable bindings that belong to this node.
+ *
+ * The class performs no synchronization of its own.
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * @version <tt>$Revision: 1.1 $</tt>
+ *
+ * $Id$
+ *
+ */
+public class ConditionBindings
+{
+   // Every binding added to this instance, durable or not
+   private final List allBindings;
+   
+   // The bindings for which isDurable() returned true
+   private final List durableBindings;
+   
+   // The bindings for which isDurable() returned false
+   private final List nonDurableBindings;
+   
+   // Map <queue name, List of bindings that share that queue name>
+   private final Map nameMap;
+   
+   // Node id a binding's node id is compared with to decide whether it is local
+   private final String thisNode;
+   
+   // Number of durable bindings whose node id equals thisNode
+   private int localDurableCount;
+   
+   /**
+    * Creates an empty set of bindings.
+    *
+    * @param thisNode the node id treated as local when counting durable bindings
+    */
+   public ConditionBindings(String thisNode)
+   {
+      allBindings = new ArrayList();
+      
+      durableBindings = new ArrayList();
+      
+      nonDurableBindings = new ArrayList();
+      
+      // Must be created up front: every method that touches the name index
+      // dereferences it, so leaving it null fails on the very first call
+      nameMap = new HashMap();
+      
+      this.thisNode = thisNode;
+   }
+   
+   /**
+    * Registers a binding in every view maintained by this instance.
+    *
+    * @param binding the binding to add
+    * @throws IllegalArgumentException if the binding has already been added
+    */
+   public void addBinding(Binding binding)
+   {
+      if (allBindings.contains(binding))
+      {
+         throw new IllegalArgumentException("Bindings already contains binding: " + binding);
+      }
+               
+      allBindings.add(binding);
+      
+      boolean durable = binding.isDurable();
+      
+      if (durable)
+      {
+         durableBindings.add(binding);
+      }
+      else
+      {
+         nonDurableBindings.add(binding);
+      }
+      
+      String queueName = binding.getQueueName();
+      
+      List sameName = (List)nameMap.get(queueName);
+      
+      if (sameName == null)
+      {
+         // First binding seen for this queue name
+         sameName = new ArrayList();
+         
+         nameMap.put(queueName, sameName);
+      }
+      
+      sameName.add(binding);      
+      
+      if (durable && binding.getNodeId().equals(thisNode))
+      {
+         localDurableCount++;
+      }
+   }
+   
+   /**
+    * Removes a binding from every view maintained by this instance.
+    *
+    * @param binding the binding to remove
+    * @return false if the binding was not present (nothing is changed),
+    *         true once it has been removed
+    * @throws IllegalStateException if the binding was in the complete list
+    *         but missing from the queue name index
+    */
+   public boolean removeBinding(Binding binding)
+   {
+      if (!allBindings.remove(binding))
+      {
+         return false;
+      }
+      
+      boolean durable = binding.isDurable();
+      
+      if (durable)
+      {
+         durableBindings.remove(binding);
+      }
+      else
+      {
+         nonDurableBindings.remove(binding);
+      }
+      
+      String queueName = binding.getQueueName();
+      
+      List sameName = (List)nameMap.get(queueName);
+      
+      if (sameName == null)
+      {
+         throw new IllegalStateException("Cannot find bindins in name map");
+      }
+      
+      if (!sameName.remove(binding))
+      {
+         throw new IllegalStateException("Cannot find binding in list");
+      }
+      
+      if (sameName.isEmpty())
+      {
+         // Drop the entry so isEmpty() reflects that no bindings remain
+         nameMap.remove(queueName);
+      }
+      
+      if (durable && binding.getNodeId().equals(thisNode))
+      {
+         localDurableCount--;
+      }
+      
+      return true;
+   }
+   
+   /**
+    * @return a collection of Lists, one per queue name, each holding the
+    *         bindings that share that name; this is the live values view of
+    *         the internal index, not a copy
+    */
+   public Collection getBindingsByName()
+   {
+      return nameMap.values();
+   }
+   
+   /**
+    * @return true if no bindings are currently indexed by queue name
+    */
+   public boolean isEmpty()
+   {
+      return nameMap.isEmpty();
+   }
+   
+   /**
+    * @return the internal list of all bindings (not a copy)
+    */
+   public List getAllBindings()
+   {
+      return allBindings;
+   }
+   
+   /**
+    * @return the number of durable bindings whose node id equals this node's id
+    */
+   public int getLocalDurableCount()
+   {
+      return this.localDurableCount;
+   }
+   
+   /**
+    * @return the total number of durable bindings
+    */
+   public int getDurableCount()
+   {
+      return this.durableBindings.size();
+   }
+   
+   /**
+    * @return the number of durable bindings that are not counted as local
+    */
+   public int getRemoteDurableCount()
+   {
+      return getDurableCount() - getLocalDurableCount();
+   }
+}

Modified: trunk/src/main/org/jboss/messaging/core/plugin/postoffice/PostOfficeImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/postoffice/PostOfficeImpl.java	2006-09-12 00:33:57 UTC (rev 1278)
+++ trunk/src/main/org/jboss/messaging/core/plugin/postoffice/PostOfficeImpl.java	2006-09-12 18:13:47 UTC (rev 1279)
@@ -45,6 +45,7 @@
 import org.jboss.messaging.core.plugin.contract.MessageStore;
 import org.jboss.messaging.core.plugin.contract.PostOffice;
 import org.jboss.messaging.core.tx.Transaction;
+import org.jboss.messaging.core.tx.TransactionRepository;
 
 import EDU.oswego.cs.dl.util.concurrent.ReadWriteLock;
 import EDU.oswego.cs.dl.util.concurrent.WriterPreferenceReadWriteLock;
@@ -69,6 +70,8 @@
    
    protected MessageStore ms;     
    
+   protected TransactionRepository tr;
+   
    protected String nodeId;
    
    //Map <node id, Map < queue name, binding > >
@@ -82,8 +85,9 @@
    }
    
    public PostOfficeImpl(DataSource ds, TransactionManager tm, Properties sqlProperties,
-                           boolean createTablesOnStartup,
-                           String nodeId, String officeName, MessageStore ms)
+                         boolean createTablesOnStartup,
+                         String nodeId, String officeName, MessageStore ms,
+                         TransactionRepository tr)
    {            
       super (ds, tm, sqlProperties, createTablesOnStartup);
       
@@ -98,6 +102,8 @@
       this.officeName = officeName;
       
       this.ms = ms;
+      
+      this.tr = tr;
    }
    
    // MessagingComponent implementation --------------------------------
@@ -277,11 +283,31 @@
                 
       try
       {                 
-         //We route on the condition
-         List bindings = (List)conditionMap.get(condition);
-           
-         if (bindings != null)
+         ConditionBindings cb = (ConditionBindings)conditionMap.get(condition);
+                             
+         if (cb != null)
          {            
+            boolean startInternalTx = false;
+            
+            if (tx == null && ref.isReliable())
+            {
+               if (cb.getDurableCount() > 1)
+               {
+                  // When routing a persistent message without a transaction then we may need to start an 
+                  // internal transaction in order to route it.
+                  // This is so we can guarantee the message is delivered to all or none of the subscriptions.
+                  // We need to do this if there is more than one durable sub
+                  startInternalTx = true;
+               }
+            }
+            
+            if (startInternalTx)
+            {
+               tx = tr.createTransaction();
+            }
+                        
+            List bindings = cb.getAllBindings();
+            
             Iterator iter = bindings.iterator();
             
             while (iter.hasNext())
@@ -300,7 +326,13 @@
                      routed = true;
                   }                  
                }               
-            }                        
+            } 
+            
+            if (startInternalTx)
+            {
+               //TODO - do we need to rollback if an exception is thrown??
+               tx.commit();
+            }
          }
                  
          return routed;
@@ -484,16 +516,16 @@
       
       String condition = binding.getCondition();
             
-      List bindings = (List)conditionMap.get(condition);
+      ConditionBindings bindings = (ConditionBindings)conditionMap.get(condition);
       
       if (bindings == null)
       {
-         bindings = new ArrayList();
+         bindings = new ConditionBindings(this.nodeId);
          
          conditionMap.put(condition, bindings);
       }
       
-      bindings.add(binding);
+      bindings.addBinding(binding);
    }   
    
    protected Binding removeBinding(String nodeId, String queueName)
@@ -527,30 +559,20 @@
          nameMaps.remove(nodeId);
       }
                   
-      List bindings = (List)conditionMap.get(binding.getCondition());
+      ConditionBindings bindings = (ConditionBindings)conditionMap.get(binding.getCondition());
       
       if (bindings == null)
       {
          throw new IllegalStateException("Cannot find condition bindings for " + binding.getCondition());
       }
       
-      boolean removed = bindings.remove(binding);
+      boolean removed = bindings.removeBinding(binding);
       
       if (!removed)
       {
          throw new IllegalStateException("Cannot find binding in condition binding list");
-      }
+      }           
       
-      if (binding == null)
-      {
-         throw new IllegalStateException("Channel id map does not contain binding for " + binding.getChannelId());
-      }
-      
-      if (!removed)
-      {
-         throw new IllegalStateException("Cannot find binding in condition binding list");
-      }
-      
       if (bindings.isEmpty())
       {
          conditionMap.remove(binding.getCondition());

Added: trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/BalancedBinding.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/BalancedBinding.java	2006-09-12 00:33:57 UTC (rev 1278)
+++ trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/BalancedBinding.java	2006-09-12 18:13:47 UTC (rev 1279)
@@ -0,0 +1,40 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.messaging.core.plugin.postoffice.cluster;
+
+import org.jboss.messaging.core.plugin.contract.Binding;
+
+/**
+ * A BalancedBinding
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * @version <tt>$Revision: 1.1 $</tt>
+ *
+ * $Id$
+ *
+ */
+public interface BalancedBinding extends Binding // a Binding that additionally exposes the two figures below
+{
+   double getConsumptionRate(); // NOTE(review): presumably how fast the bound queue is being consumed; units are not visible here - confirm
+   
+   int getMessageCount(); // NOTE(review): presumably the number of messages currently held by the bound queue - confirm
+}

Modified: trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/CastMessagesCallback.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/CastMessagesCallback.java	2006-09-12 00:33:57 UTC (rev 1278)
+++ trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/CastMessagesCallback.java	2006-09-12 18:13:47 UTC (rev 1279)
@@ -23,6 +23,7 @@
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
 
 import org.jboss.messaging.core.Message;
 import org.jboss.messaging.core.tx.TxCallback;
@@ -69,9 +70,9 @@
    
    private PostOfficeInternal office;
    
-   void addMessage(String routingKey, Message message)
+   void addMessage(String routingKey, Message message, Map queueNameToNodeIdMap)
    {
-      MessageHolder holder = new MessageHolder(routingKey, message);
+      MessageHolder holder = new MessageHolder(routingKey, message, queueNameToNodeIdMap);
       
       if (message.isReliable())
       {
@@ -113,7 +114,7 @@
       if (persistent != null)
       {
          // Cast a commit message
-         ClusterRequest req = new TransactionRequest(nodeId, txId);
+         ClusterRequest req = new SendTransactionRequest(nodeId, txId);
          
          // Stack must be FIFO
          office.asyncSendRequest(req);
@@ -136,7 +137,7 @@
       {
          //We send the persistent messages which go into the "holding area" on
          //the receiving nodes
-         ClusterRequest req = new TransactionRequest(nodeId, txId, persistent);
+         ClusterRequest req = new SendTransactionRequest(nodeId, txId, persistent);
          
          //Stack must be FIFO
          office.asyncSendRequest(req);

Added: trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/ClusterTransaction.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/ClusterTransaction.java	2006-09-12 00:33:57 UTC (rev 1278)
+++ trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/ClusterTransaction.java	2006-09-12 18:13:47 UTC (rev 1279)
@@ -0,0 +1,36 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.messaging.core.plugin.postoffice.cluster;
+
+/**
+ * A ClusterTransaction
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * @version <tt>$Revision: 1.1 $</tt>
+ *
+ * $Id$
+ *
+ */
+public interface ClusterTransaction // something that can be committed against a PostOfficeInternal
+{
+   void commit(PostOfficeInternal office) throws Exception; // commits using the supplied office; implementations may throw any Exception
+}

Modified: trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/ClusteredPostOfficeImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/ClusteredPostOfficeImpl.java	2006-09-12 00:33:57 UTC (rev 1278)
+++ trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/ClusteredPostOfficeImpl.java	2006-09-12 18:13:47 UTC (rev 1279)
@@ -22,6 +22,7 @@
 package org.jboss.messaging.core.plugin.postoffice.cluster;
 
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
@@ -32,12 +33,14 @@
 import javax.transaction.TransactionManager;
 
 import org.jboss.logging.Logger;
+import org.jboss.messaging.core.Delivery;
 import org.jboss.messaging.core.MessageReference;
 import org.jboss.messaging.core.local.Queue;
 import org.jboss.messaging.core.plugin.contract.Binding;
 import org.jboss.messaging.core.plugin.contract.ClusteredPostOffice;
 import org.jboss.messaging.core.plugin.contract.MessageStore;
 import org.jboss.messaging.core.plugin.contract.PersistenceManager;
+import org.jboss.messaging.core.plugin.postoffice.ConditionBindings;
 import org.jboss.messaging.core.plugin.postoffice.PostOfficeImpl;
 import org.jboss.messaging.core.plugin.postoffice.BindingImpl;
 import org.jboss.messaging.core.tx.Transaction;
@@ -99,8 +102,6 @@
    
    private PersistenceManager pm;
    
-   private TransactionRepository tr;  
-   
    private Element syncChannelConfigE;
    
    private Element asyncChannelConfigE;
@@ -112,6 +113,8 @@
    private long stateTimeout;
    
    private long castTimeout;
+   
+   private RoutingPolicy routingPolicy;
       
    public ClusteredPostOfficeImpl()
    {        
@@ -136,10 +139,11 @@
                               Element asyncChannelConfig,
                               TransactionRepository tr,
                               PersistenceManager pm,
-                              long stateTimeout, long castTimeout) throws Exception
+                              long stateTimeout, long castTimeout,
+                              RoutingPolicy routingPolicy) throws Exception
    {            
       this(ds, tm, sqlProperties, createTablesOnStartup, nodeId, officeName, ms,
-           groupName, tr, pm, stateTimeout, castTimeout);
+           groupName, tr, pm, stateTimeout, castTimeout, routingPolicy);
       
       this.syncChannelConfigE = syncChannelConfig;      
       this.asyncChannelConfigE = asyncChannelConfig;     
@@ -156,10 +160,11 @@
                               String asyncChannelConfig,
                               TransactionRepository tr,
                               PersistenceManager pm,
-                              long stateTimeout, long castTimeout) throws Exception
+                              long stateTimeout, long castTimeout,
+                              RoutingPolicy routingPolicy) throws Exception
    {            
       this(ds, tm, sqlProperties, createTablesOnStartup, nodeId, officeName, ms,
-           groupName, tr, pm, stateTimeout, castTimeout);
+           groupName, tr, pm, stateTimeout, castTimeout, routingPolicy);
 
       this.syncChannelConfigS = syncChannelConfig;      
       this.asyncChannelConfigS = asyncChannelConfig;     
@@ -171,12 +176,11 @@
                                String groupName,
                                TransactionRepository tr,
                                PersistenceManager pm,
-                               long stateTimeout, long castTimeout)
+                               long stateTimeout, long castTimeout,
+                               RoutingPolicy routingPolicy)
    {
-      super (ds, tm, sqlProperties, createTablesOnStartup, nodeId, officeName, ms);
+      super (ds, tm, sqlProperties, createTablesOnStartup, nodeId, officeName, ms, tr);
        
-      this.tr = tr;
-      
       this.pm = pm;
       
       this.groupName = groupName;
@@ -185,6 +189,8 @@
       
       this.castTimeout = castTimeout;
       
+      this.routingPolicy = routingPolicy;
+      
       init();
    }
 
@@ -277,7 +283,7 @@
    public void recover() throws Exception
    {
       //We send a "check" message to all nodes of the cluster
-      this.asyncSendRequest(new CheckMessage(nodeId));
+      asyncSendRequest(new CheckMessage(nodeId));
    }
    
    public boolean route(MessageReference ref, String condition, Transaction tx) throws Exception
@@ -292,103 +298,133 @@
          throw new IllegalArgumentException("Condition is null");
       }
       
+      boolean routed = false;
+      
       lock.readLock().acquire();
       
       try
       {      
-         // We route on the condition
-         List bindings = (List)conditionMap.get(condition);
-      
-         if (bindings != null)
-         {                
-            // When routing a persistent message without a transaction then we may need to start an 
-            // internal transaction in order to route it.
-            // We do this if the message is reliable AND:
-            // (
-            // a) The message needs to be routed to more than one durable subscription. This is so we
-            // can guarantee the message is persisted on all the durable subscriptions or none if failure
-            // occurs - i.e. the persistence is transactional
-            // OR
-            // b) There is at least one durable subscription on a different node.
-            // In this case we need to start a transaction since we want to add a callback on the transaction
-            // to cast the message to other nodes
-            // )
-                        
-            //TODO we can optimise this out by storing this as a flag somewhere
-            boolean startInternalTx = false;
-      
-            if (tx == null)
+         ConditionBindings cb = (ConditionBindings)conditionMap.get(condition);
+         
+         boolean startInternalTx = false;
+         
+         if (cb != null)
+         {
+            if (tx == null && ref.isReliable())
             {
-               if (ref.isReliable())
+               if (!(cb.getDurableCount() == 1 && cb.getLocalDurableCount() == 1))
                {
-                  Iterator iter = bindings.iterator();
-                  
-                  int count = 0;
-                  
-                  while (iter.hasNext())
-                  {
-                     Binding binding = (Binding)iter.next();
-                     
-                     if (binding.isDurable())
-                     {
-                        count++;
-                        
-                        if (count == 2 || !binding.getNodeId().equals(this.nodeId))
-                        {
-                           startInternalTx = true;
-                           
-                           break;
-                        }                          
-                     }
-                  }
+                  // When routing a persistent message without a transaction then we may need to start an 
+                  // internal transaction in order to route it.
+                  // This is so we can guarantee the message is delivered to all or none of the subscriptions.
+                  // We need to do this if there is any other than a single local durable subscription
+                  startInternalTx = true;
                }
-               
-               if (startInternalTx)
-               {
-                  tx = tr.createTransaction();
-               }
             }
-                       
-            Iterator iter = bindings.iterator();
             
-            boolean sendRemotely = false;
+            if (startInternalTx)
+            {
+               tx = tr.createTransaction();
+            }
+            
+            //There may be no transaction in the following cases
+            //1) No transaction specified in params and reference is unreliable
+            //2) No transaction specified in params and reference is reliable and there is only one
+            //   or less local durable subscription
+         
+            Collection bindingLists = cb.getBindingsByName();
 
+            Iterator iter = bindingLists.iterator();
+            
+            int numberRemote = 0;
+            
+            Map queueNameNodeIdMap = null;
+            
             while (iter.hasNext())
             {
-               Binding binding = (Binding)iter.next();
+               //Each list is the list of bindings which have the same queue name
+               List bindings = (List)iter.next();
                
-               if (binding.isActive())
-               {            
-                  if (binding.getNodeId().equals(this.nodeId))
+               //We may have more than one binding with the same queue name on different nodes in the
+               //following situations:
+               //1) When a point to point queue has been deployed across the cluster
+               //2) When a durable subscription has been created on multiple nodes to implement
+               // shared durable subscriptions.
+               //In both of these cases we only want one of the queues to receive the message, we choose which 
+               //one by asking the routing policy
+               Binding binding;
+               
+               if (bindings.size() == 1)
+               {
+                  binding = (Binding)bindings.get(0);
+               }
+               else if (bindings.size() > 1)
+               {
+                  binding = routingPolicy.choose(bindings); 
+                  
+                  if (queueNameNodeIdMap == null)
                   {
-                     //It's a local binding so we pass the message on to the subscription
-                     Queue subscription = binding.getQueue();
+                     queueNameNodeIdMap = new HashMap();
+                  }
                   
-                     subscription.handle(null, ref, tx);                    
+                  if (!binding.getNodeId().equals(this.nodeId))
+                  {
+                     //Chose a remote node
+                     //We add the node id to the map against the name
+                     //This is used on receipt to work out if a particular queue should
+                     //accept the message, when multicasted
+                     queueNameNodeIdMap.put(binding.getQueueName(), binding.getNodeId());
                   }
-                  else
+               }
+               else
+               {
+                  throw new IllegalStateException("No bindings in list!");
+               }
+               
+               if (binding.getNodeId().equals(this.nodeId))
+               {
+                  //It's a local binding so we pass the message on to the queue
+                  Queue queue = binding.getQueue();
+               
+                  Delivery del = queue.handle(null, ref, tx);    
+                  
+                  if (del != null && del.isSelectorAccepted())
                   {
-                     //It's a binding on a different office instance on the cluster
-                     sendRemotely = true;                     
-                      
-                     if (ref.isReliable() && binding.isDurable())
-                     {
-                        //Insert the reference into the database
-                        pm.addReference(binding.getChannelId(), ref, tx);
-                     }
-                  }                     
+                     routed = true;
+                  }  
                }
-            } 
+               else
+               {
+                  //It's a binding on a different office instance on the cluster
+                  numberRemote++;                     
+                   
+                  if (ref.isReliable() && binding.isDurable())
+                  {
+                     //Insert the reference into the database
+                     
+                     //TODO perhaps we should do this via a stub queue class
+                     pm.addReference(binding.getChannelId(), ref, tx);
+                  }
+                  
+                  routed = true;
+               }                                                
+            }
+                                    
+            //Now we've sent the message to any local queues, we might also need
+            //to send the message to the other office instances on the cluster if there are
+            //queues on those nodes that need to receive the message
             
-            //Now we've sent the message to all the local subscriptions, we might also need
-            //to multicast the message to the other office instances on the cluster if there are
-            //subscriptions on those nodes that need to receive the message
-            if (sendRemotely)
+            if (numberRemote > 0)
             {
+               //TODO - If numberRemote == 1, we could do unicast rather than multicast
+               //This would avoid putting strain on nodes that don't need to receive the reference
+               //This would be the case for load balancing queues where the routing policy
+               //sometimes allows a remote queue to get the reference
+               
                if (tx == null)
                {
-                  //We just throw the message on the network - no need to wait for any reply            
-                  asyncSendRequest(new MessageRequest(condition, ref.getMessage()));               
+                  //We just throw the message on the network - no need to wait for any reply                  
+                  asyncSendRequest(new MessageRequest(condition, ref.getMessage(), queueNameNodeIdMap));               
                }
                else
                {
@@ -402,12 +438,13 @@
                      tx.addFirstCallback(callback, this);
                   }
                       
-                  callback.addMessage(condition, ref.getMessage());                  
+                  callback.addMessage(condition, ref.getMessage(), queueNameNodeIdMap);                  
                }
             }
             
             if (startInternalTx)
             {               
+               // TODO - do we need to rollback if an exception is thrown??
                tx.commit();
             }
          }
@@ -417,9 +454,7 @@
          lock.readLock().release();
       }
          
-      // We don't care if the individual subscriptions accepted the reference
-      // We always return true
-      return true; 
+      return routed; 
    }
    
    // PostOfficeInternal implementation ------------------------------------------------------------------
@@ -506,9 +541,55 @@
       }
    }
    
-   public void routeFromCluster(org.jboss.messaging.core.Message message, String routingKey) throws Exception
+   public void addToQueue(String queueName, List messages) throws Exception
    {
       lock.readLock().acquire();      
+      // Hands every message in the list to the local queue bound under queueName; the read lock is held throughout
+      try
+      {
+         Binding binding = this.getBindingForQueueName(queueName);
+         
+         if (binding == null)
+         {
+            throw new IllegalStateException("Cannot find binding for queue name " + queueName);
+         }
+         
+         Queue queue = binding.getQueue();
+         
+         Iterator iter = messages.iterator();
+         // Each element of messages is cast to org.jboss.messaging.core.Message below
+         while (iter.hasNext())
+         {
+            MessageReference ref = null;
+            
+            try
+            {
+               org.jboss.messaging.core.Message msg = (org.jboss.messaging.core.Message)iter.next();
+               
+               ref = ms.reference(msg);
+               // Presumably skips persistence (going by the method name) - TODO confirm against Queue
+               queue.handleDontPersist(null, ref, null);
+            }
+            finally
+            {
+               if (ref != null)
+               {
+                  ref.releaseMemoryReference();
+               }
+            }
+         }    
+      }
+      finally
+      {
+         // Always give the read lock back, also when a binding is missing or handling throws
+         lock.readLock().release();
+      }
+   }
+   
+   public void routeFromCluster(org.jboss.messaging.core.Message message, String routingKey,
+                                Map queueNameNodeIdMap) throws Exception
+   {
+      lock.readLock().acquire();      
       
       // Need to reference the message
       MessageReference ref = null;
@@ -517,26 +598,46 @@
          ref = ms.reference(message);
               
          // We route on the condition
-         List bindings = (List)conditionMap.get(routingKey);
+         ConditionBindings cb = (ConditionBindings)conditionMap.get(routingKey);
       
-         if (bindings != null)
+         if (cb != null)
          {                                
+            List bindings = cb.getAllBindings();
+            
             Iterator iter = bindings.iterator();
             
             while (iter.hasNext())
             {
                Binding binding = (Binding)iter.next();
-               
+                                             
                if (binding.isActive())
                {            
                   if (binding.getNodeId().equals(this.nodeId))
                   {  
-                     //It's a local binding so we pass the message on to the subscription
-                     Queue subscription = binding.getQueue();
-                  
-                     //TODO instead of adding a new method on the channel
-                     //we should set a header and use the same method
-                     subscription.handleDontPersist(null, ref, null);
+                     boolean handle = true;
+                     
+                     if (queueNameNodeIdMap != null)
+                     {
+                        String desiredNodeId = (String)queueNameNodeIdMap.get(binding.getQueueName());
+                        
+                        //When there are more than one queues with the same name across the cluster we only
+                        //want to chose one of them
+                        
+                        if (desiredNodeId != null)
+                        {
+                           handle = desiredNodeId.equals(nodeId);
+                        }
+                     }
+                     
+                     if (handle)
+                     {
+                        //It's a local binding so we pass the message on to the subscription
+                        Queue subscription = binding.getQueue();
+                     
+                        //TODO instead of adding a new method on the channel
+                        //we should set a header and use the same method
+                        subscription.handleDontPersist(null, ref, null);
+                     }
                   }                               
                }
             }                          
@@ -558,36 +659,32 @@
       asyncChannel.send(new Message(null, null, request));
    }
    
-   public void addToHoldingArea(TransactionId id, List messageHolders) throws Exception
+   /*
+    * Stores the transaction in the holding area under its id; commitTransaction(id) looks it up there and removes it
+    */
+   public void holdTransaction(TransactionId id, ClusterTransaction tx) throws Exception
    {
       synchronized (holdingArea)
       {
-         holdingArea.put(id, messageHolders);
-      }      
+         holdingArea.put(id, tx);
+      } 
    }
-         
+   
    public void commitTransaction(TransactionId id) throws Exception
    {
-      List messageHolders = null;
+      ClusterTransaction tx = null; // the transaction held under this id, if any
       
       synchronized (holdingArea)
       {
-         messageHolders = (List)holdingArea.remove(id);
+         tx = (ClusterTransaction)holdingArea.remove(id); // look-up and removal in one step under the monitor
       }
       
-      if (messageHolders == null)
+      if (tx == null)
       {
-         throw new IllegalStateException("Cannot find messages for transaction id: " + id);
+         throw new IllegalStateException("Cannot find transaction for transaction id: " + id);
       }
       
-      Iterator iter = messageHolders.iterator();
-      
-      while (iter.hasNext())
-      {
-         MessageHolder holder = (MessageHolder)iter.next();
-         
-         routeFromCluster(holder.getMessage(), holder.getRoutingKey());
-      }
+      tx.commit(this); // runs after the holdingArea monitor has been released
    }
    
    /*
@@ -624,7 +721,7 @@
                   {
                      MessageHolder holder = (MessageHolder)iter2.next();
                      
-                     routeFromCluster(holder.getMessage(), holder.getRoutingKey());
+                     routeFromCluster(holder.getMessage(), holder.getRoutingKey(), holder.getQueueNameToNodeIdMap());
                   }
                   
                   toRemove.add(id);
@@ -736,43 +833,45 @@
     * We have received a reference cast from another node - and we need to route it to our local
     * subscriptions    
     */
-   protected void routeFromCluster(MessageReference ref, String routingKey) throws Exception
-   {
-      lock.readLock().acquire();
-      
-      try
-      {      
-         // We route on the condition
-         List bindings = (List)conditionMap.get(routingKey);
-      
-         if (bindings != null)
-         {                                
-            Iterator iter = bindings.iterator();
-            
-            while (iter.hasNext())
-            {
-               Binding binding = (Binding)iter.next();
-               
-               if (binding.isActive())
-               {            
-                  if (binding.getNodeId().equals(this.nodeId))
-                  {  
-                     //It's a local binding so we pass the message on to the subscription
-                     Queue subscription = binding.getQueue();
-                  
-                     //TODO instead of adding a new method on the channel
-                     //we should set a header and use the same method
-                     subscription.handleDontPersist(null, ref, null);
-                  }                               
-               }
-            }                          
-         }
-      }
-      finally
-      {                  
-         lock.readLock().release();
-      }
-   }            
+//   protected void routeFromCluster(MessageReference ref, String routingKey) throws Exception
+//   {
+//      lock.readLock().acquire();
+//      
+//      try
+//      {      
+//         // We route on the condition
+//         List bindings = (List)conditionMap.get(routingKey);
+//      
+//         if (bindings != null)
+//         {                                
+//            Iterator iter = bindings.iterator();
+//            
+//            while (iter.hasNext())
+//            {
+//               Binding binding = (Binding)iter.next();
+//               
+//               if (binding.isActive())
+//               {            
+//                  if (binding.getNodeId().equals(this.nodeId))
+//                  {  
+//                     //It's a local binding so we pass the message on to the subscription
+//                     Queue subscription = binding.getQueue();
+//                  
+//                     //TODO instead of adding a new method on the channel
+//                     //we should set a header and use the same method
+//                     subscription.handleDontPersist(null, ref, null);
+//                  }                               
+//               }
+//            }                          
+//         }
+//      }
+//      finally
+//      {                  
+//         lock.readLock().release();
+//      }
+//   }            
+//   
+  
        
    // Private ------------------------------------------------------------------------------------------
              

Added: trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/FavourLocalRoutingPolicy.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/FavourLocalRoutingPolicy.java	2006-09-12 00:33:57 UTC (rev 1278)
+++ trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/FavourLocalRoutingPolicy.java	2006-09-12 18:13:47 UTC (rev 1279)
@@ -0,0 +1,68 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.messaging.core.plugin.postoffice.cluster;
+
+import java.util.Iterator;
+import java.util.List;
+
+import org.jboss.messaging.core.plugin.contract.Binding;
+
+/**
+ * A FavourLocalRoutingPolicy
+ * 
+ * This routing policy always favours the local queue
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * @version <tt>$Revision: 1.1 $</tt>
+ *
+ * $Id$
+ *
+ */
+public class FavourLocalRoutingPolicy implements RoutingPolicy
+{
+   private String localNodeId;
+
+   public FavourLocalRoutingPolicy(String localNodeId)
+   {
+      this.localNodeId = localNodeId;
+   }
+
+   /**
+    * Picks the first binding whose node id equals the local node id.
+    * If no binding is local the last binding in the list is returned;
+    * an empty list yields null.
+    */
+   public Binding choose(List bindings)
+   {
+      Binding candidate = null;
+
+      for (Iterator i = bindings.iterator(); i.hasNext(); )
+      {
+         candidate = (Binding)i.next();
+         if (candidate.getNodeId().equals(localNodeId))
+         {
+            break;
+         }
+      }
+      return candidate;
+   }
+}

Added: trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/MeasuredQueue.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/MeasuredQueue.java	2006-09-12 00:33:57 UTC (rev 1278)
+++ trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/MeasuredQueue.java	2006-09-12 18:13:47 UTC (rev 1279)
@@ -0,0 +1,136 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.messaging.core.plugin.postoffice.cluster;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.jboss.messaging.core.Delivery;
+import org.jboss.messaging.core.Filter;
+import org.jboss.messaging.core.MessageReference;
+import org.jboss.messaging.core.SimpleDelivery;
+import org.jboss.messaging.core.local.Queue;
+import org.jboss.messaging.core.plugin.contract.MessageStore;
+import org.jboss.messaging.core.plugin.contract.PersistenceManager;
+
+import EDU.oswego.cs.dl.util.concurrent.QueuedExecutor;
+
+/**
+ * 
+ * A MeasuredQueue
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * @version <tt>$Revision: 1.1 $</tt>
+ *
+ * $Id$
+ *
+ */
+public class MeasuredQueue extends Queue
+{
+   // Minimum time in ms between two recalculations of the growth rate
+   private static final int MIN_PERIOD = 1000;
+   // Time in ms at which the growth rate was last recalculated
+   private long lastTime;
+   // Growth rate handed out while still inside the MIN_PERIOD window
+   private double lastGrowthRate;
+   // References added / acknowledged since the last recalculation
+   private volatile int numberAdded;
+   private volatile int numberConsumed;
+ 
+   public MeasuredQueue(long id, MessageStore ms, PersistenceManager pm, boolean acceptReliableMessages, boolean recoverable, int fullSize, int pageSize, int downCacheSize, QueuedExecutor executor, Filter filter)
+   {
+      super(id, ms, pm, acceptReliableMessages, recoverable, fullSize, pageSize,
+            downCacheSize, executor, filter);
+      
+      lastTime = System.currentTimeMillis();      
+      
+      numberAdded = numberConsumed = 0;
+   }
+   
+   /**
+    * @return The rate of growth in messages per second of the queue
+    * Rate of growth is defined as follows:
+    * growth = (number of messages added - number of messages consumed) / time
+    * The value is recalculated only when more than MIN_PERIOD ms have passed.
+    */
+   public synchronized double getGrowthRate()
+   {
+      long now = System.currentTimeMillis();
+      
+      long period = now - lastTime;
+      
+      if (period <= MIN_PERIOD)
+      {
+         //Cache the value to avoid recalculating too often
+         return lastGrowthRate;
+      }
+      
+      lastGrowthRate = 1000 * (numberAdded - numberConsumed) / ((double)period);
+      
+      lastTime = now;
+      
+      numberAdded = numberConsumed = 0;
+      
+      return lastGrowthRate;
+   }
+   
+   /**
+    * Removes at most number references from the head of the in-memory queue and
+    * returns one delivery per reference; each delivery is also added to deliveries.
+    */
+   public List getDeliveries(int number) throws Exception
+   {
+      List dels = new ArrayList();
+      
+      synchronized (refLock)
+      {
+         synchronized (deliveryLock)
+         {
+            MessageReference ref;
+            
+            //Test the count first so a reference is never removed without being returned
+            while (dels.size() < number && (ref = removeFirstInMemory()) != null)
+            {
+               SimpleDelivery del = new SimpleDelivery(this, ref);
+               deliveries.add(del);
+               dels.add(del);               
+            }           
+            return dels;
+         }
+      }          
+   }
+
+   protected void addReferenceInMemory(MessageReference ref) throws Exception
+   {
+      super.addReferenceInMemory(ref);
+      //This is ok, since the channel ensures only one thread calls this method at once
+      numberAdded++;
+   }
+
+   protected boolean acknowledgeInMemory(Delivery d)
+   {
+      boolean acked = super.acknowledgeInMemory(d);
+      // Count upwards: growth is (added - consumed), so consumed has to be a positive tally
+      numberConsumed++;
+      return acked;
+   }  
+}

Modified: trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/MessageHolder.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/MessageHolder.java	2006-09-12 00:33:57 UTC (rev 1278)
+++ trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/MessageHolder.java	2006-09-12 18:13:47 UTC (rev 1279)
@@ -22,6 +22,7 @@
 package org.jboss.messaging.core.plugin.postoffice.cluster;
 
 import java.io.Serializable;
+import java.util.Map;
 
 import org.jboss.messaging.core.Message;
 
@@ -36,15 +37,19 @@
  */
 class MessageHolder implements Serializable
 {
-   String routingKey;
+   private String routingKey;
    
-   Message message;
+   private Message message;
    
-   MessageHolder(String routingKey, Message message)
+   private Map queueNameToNodeIdMap;
+   
+   MessageHolder(String routingKey, Message message, Map queueNameToNodeIdMap)
    {
       this.routingKey = routingKey;
       
       this.message = message;
+      
+      this.queueNameToNodeIdMap = queueNameToNodeIdMap;
    }
    
    String getRoutingKey()
@@ -56,4 +61,9 @@
    {
       return message;
    }
+   
+   Map getQueueNameToNodeIdMap()
+   {
+      return queueNameToNodeIdMap;
+   }
 }     

Added: trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/MessageMover.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/MessageMover.java	2006-09-12 00:33:57 UTC (rev 1278)
+++ trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/MessageMover.java	2006-09-12 18:13:47 UTC (rev 1279)
@@ -0,0 +1,102 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.messaging.core.plugin.postoffice.cluster;
+
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+
+import org.jboss.messaging.core.Delivery;
+import org.jboss.messaging.core.local.Queue;
+import org.jboss.messaging.core.plugin.contract.Binding;
+import org.jboss.messaging.core.tx.Transaction;
+import org.jboss.messaging.core.tx.TransactionRepository;
+
+/**
+ * A MessageMover
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * @version <tt>$Revision: 1.1 $</tt>
+ *
+ * $Id$
+ *
+ */
+public class MessageMover
+{
+   // NOTE(review): none of these fields is assigned anywhere in this class yet
+   private TransactionRepository tr;
+   
+   private String nodeId;
+   
+   private RedistributionPolicy redistributionPolicy;
+   
+   /**
+    * Acknowledges, inside a newly created transaction, the deliveries that the
+    * local "from" queue returns for getDeliveries(num). The transaction is not
+    * committed here and "to" is only validated - the move itself is unfinished.
+    * @throws IllegalArgumentException if from is not on this node or to is on this node
+    */
+   public void moveMessages(Binding from, Binding to, int num) throws Throwable
+   {
+      boolean fromIsLocal = from.getNodeId().equals(this.nodeId);
+      if (!fromIsLocal)
+      {
+         throw new IllegalArgumentException("From binding must be on local node!");
+      }
+
+      boolean toIsLocal = to.getNodeId().equals(this.nodeId);
+      if (toIsLocal)
+      {
+         throw new IllegalArgumentException("To binding cannot be on local node");
+      }
+
+      //Consume the messages in a transaction - don't commit
+      Queue source = from.getQueue();
+      Transaction tx = tr.createTransaction();
+      List taken = ((MeasuredQueue)source).getDeliveries(num);
+
+      for (Iterator i = taken.iterator(); i.hasNext(); )
+      {
+         ((Delivery)i.next()).acknowledge(tx);
+      }
+   }
+
+   /**
+    * Asks the redistribution policy for an order for every list that holds more
+    * than one binding. The resulting order is not acted upon yet.
+    */
+   public void calculateMovements(Collection nameLists)
+   {
+      for (Iterator i = nameLists.iterator(); i.hasNext(); )
+      {
+         List sameName = (List)i.next();
+
+         if (sameName.size() <= 1)
+         {
+            continue;
+         }
+
+         //TODO act on the returned order once moving is implemented
+         redistributionPolicy.calculate(sameName);
+      }
+   }
+}

Modified: trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/MessageRequest.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/MessageRequest.java	2006-09-12 00:33:57 UTC (rev 1278)
+++ trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/MessageRequest.java	2006-09-12 18:13:47 UTC (rev 1279)
@@ -21,6 +21,8 @@
  */
 package org.jboss.messaging.core.plugin.postoffice.cluster;
 
+import java.util.Map;
+
 import org.jboss.messaging.core.Message;
 
 /**
@@ -42,15 +44,19 @@
    
    private Message message;
    
-   MessageRequest(String routingKey, Message message)
+   private Map queueNameNodeIdMap;
+   
+   MessageRequest(String routingKey, Message message, Map queueNameNodeIdMap)
    {
       this.routingKey = routingKey;
       
       this.message = message;
+      
+      this.queueNameNodeIdMap = queueNameNodeIdMap;
    }
    
    public void execute(PostOfficeInternal office) throws Exception
    {
-      office.routeFromCluster(message, routingKey);      
+      office.routeFromCluster(message, routingKey, queueNameNodeIdMap);      
    }   
 }

Modified: trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/MessagesRequest.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/MessagesRequest.java	2006-09-12 00:33:57 UTC (rev 1278)
+++ trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/MessagesRequest.java	2006-09-12 18:13:47 UTC (rev 1279)
@@ -54,7 +54,7 @@
       {
          MessageHolder holder = (MessageHolder)iter.next();
          
-         office.routeFromCluster(holder.getMessage(), holder.getRoutingKey());
+         office.routeFromCluster(holder.getMessage(), holder.getRoutingKey(), holder.getQueueNameToNodeIdMap());
       }
    }   
 }

Added: trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/MoveMessagesCallback.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/MoveMessagesCallback.java	2006-09-12 00:33:57 UTC (rev 1278)
+++ trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/MoveMessagesCallback.java	2006-09-12 18:13:47 UTC (rev 1279)
@@ -0,0 +1,87 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.messaging.core.plugin.postoffice.cluster;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.jboss.messaging.core.Message;
+import org.jboss.messaging.core.tx.TxCallback;
+
+/**
+ * A MoveMessagesCallback
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * @version <tt>$Revision: 1.1 $</tt>
+ *
+ * $Id$
+ *
+ */
+public class MoveMessagesCallback implements TxCallback
+{
+   // Messages collected through addMessage()
+   private List messages = new ArrayList();
+
+   // Queue name handed to the constructor; not read anywhere in this class yet
+   private String queueName;
+
+   // Creates a callback for the given queue name; the message list starts out empty
+   MoveMessagesCallback(String queueName)
+   {
+      this.queueName = queueName;
+   }
+
+   /**
+    * Appends msg to the list of collected messages.
+    */
+   void addMessage(Message msg)
+   {
+      messages.add(msg);
+   }
+
+   // The transaction callbacks below are all still empty placeholders
+
+   public void beforePrepare() throws Exception
+   {
+   }
+
+   public void afterPrepare() throws Exception
+   {
+   }
+
+   public void beforeCommit(boolean onePhase) throws Exception
+   {
+   }
+
+   public void afterCommit(boolean onePhase) throws Exception
+   {
+   }
+
+   public void beforeRollback(boolean onePhase) throws Exception
+   {
+   }
+
+   public void afterRollback(boolean onePhase) throws Exception
+   {
+   }
+
+}

Added: trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/MoveTransactionRequest.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/MoveTransactionRequest.java	2006-09-12 00:33:57 UTC (rev 1278)
+++ trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/MoveTransactionRequest.java	2006-09-12 18:13:47 UTC (rev 1279)
@@ -0,0 +1,62 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.messaging.core.plugin.postoffice.cluster;
+
+import java.util.List;
+
+/**
+ * A MoveTransactionRequest
+ * A TransactionRequest whose commit() hands a list of messages and a queue name to
+ * PostOfficeInternal.addToQueue(). Only the four-argument constructor supplies those two values.
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * @version <tt>$Revision: 1.1 $</tt>
+ *
+ * $Id$
+ *
+ */
+class MoveTransactionRequest extends TransactionRequest
+{
+   private List messages; // second argument to office.addToQueue() in commit()
+   
+   private String queueName; // first argument to office.addToQueue() in commit()
+   // Hold form: passes hold=true to TransactionRequest and records what commit() will add
+   MoveTransactionRequest(String nodeId, long txId, List messages, String queueName)
+   {
+      super(nodeId, txId, true);
+      
+      this.messages = messages;
+      
+      this.queueName = queueName;
+   }
+   // Non-hold form: passes hold=false to TransactionRequest; messages and queueName stay null
+   MoveTransactionRequest(String nodeId, long txId)
+   {
+      super(nodeId, txId, false);
+   }
+   // Adds the stored messages to the stored queue via PostOfficeInternal.addToQueue()
+   public void commit(PostOfficeInternal office) throws Exception
+   {
+      office.addToQueue(queueName, messages);  // NOTE(review): both are null on the two-argument form - confirm commit() is never called on it
+   }
+}
+
+

Modified: trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/PostOfficeInternal.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/PostOfficeInternal.java	2006-09-12 00:33:57 UTC (rev 1278)
+++ trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/PostOfficeInternal.java	2006-09-12 18:13:47 UTC (rev 1279)
@@ -22,6 +22,7 @@
 package org.jboss.messaging.core.plugin.postoffice.cluster;
 
 import java.util.List;
+import java.util.Map;
 
 import org.jboss.messaging.core.Message;
 import org.jgroups.Address;
@@ -29,6 +30,9 @@
 /**
  * 
  * A PostOfficeInternal
+ * 
+ * Extension to the ClusteredPostOffice interface that exposes extra methods useful to
+ * ClusteredRequests
  *
  * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
  * @version <tt>$Revision: 1.1 $</tt>
@@ -48,11 +52,13 @@
    void handleAddressNodeMapping(Address address, String nodeId)
       throws Exception;
    
-   void routeFromCluster(Message message, String routingKey) throws Exception;
+   void routeFromCluster(Message message, String routingKey, Map queueNameNodeIdMap) throws Exception;
    
+   void addToQueue(String queueName, List messages) throws Exception;
+   
    void asyncSendRequest(ClusterRequest request) throws Exception;
    
-   void addToHoldingArea(TransactionId id, List messageHolders) throws Exception;
+   void holdTransaction(TransactionId id, ClusterTransaction tx) throws Exception;
    
    void commitTransaction(TransactionId id) throws Exception;
    

Added: trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/RedistributionOrder.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/RedistributionOrder.java	2006-09-12 00:33:57 UTC (rev 1278)
+++ trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/RedistributionOrder.java	2006-09-12 18:13:47 UTC (rev 1279)
@@ -0,0 +1,65 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.messaging.core.plugin.postoffice.cluster;
+
+/**
+ * A RedistributionOrder
+ * Holds a message count, a queue name and a destination node id; all three are assigned only in
+ * the constructor and read through getters. Presumably "move N messages of this queue to that node" - TODO confirm.
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * @version <tt>$Revision: 1.1 $</tt>
+ *
+ * $Id$
+ *
+ */
+public class RedistributionOrder
+{
+   private int numberOfMessages;
+   
+   private String queueName;
+   
+   private String destinationNodeId;
+   // Stores the three values exactly as given; no validation is performed
+   public RedistributionOrder(int numberOfMessages, String queueName, String destinationNodeId)
+   {
+      this.numberOfMessages = numberOfMessages;
+      
+      this.queueName = queueName;
+      
+      this.destinationNodeId = destinationNodeId;
+   }
+
+   public String getDestinationNodeId()
+   {
+      return destinationNodeId;
+   }
+
+   public int getNumberOfMessages()
+   {
+      return numberOfMessages;
+   }
+
+   public String getQueueName()
+   {
+      return queueName;
+   }
+}

Added: trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/RedistributionPolicy.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/RedistributionPolicy.java	2006-09-12 00:33:57 UTC (rev 1278)
+++ trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/RedistributionPolicy.java	2006-09-12 18:13:47 UTC (rev 1279)
@@ -0,0 +1,38 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.messaging.core.plugin.postoffice.cluster;
+
+import java.util.List;
+
+/**
+ * A RedistributionPolicy
+ * Single-method interface: given a list of bindings, calculate() returns a RedistributionOrder.
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * @version <tt>$Revision: 1.1 $</tt>
+ *
+ * $Id$
+ *
+ */
+public interface RedistributionPolicy
+{
+   RedistributionOrder calculate(List bindings); // NOTE(review): element type of bindings and whether null may be returned are not stated here - confirm
+}

Added: trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/RoutingPolicy.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/RoutingPolicy.java	2006-09-12 00:33:57 UTC (rev 1278)
+++ trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/RoutingPolicy.java	2006-09-12 18:13:47 UTC (rev 1279)
@@ -0,0 +1,40 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.messaging.core.plugin.postoffice.cluster;
+
+import java.util.List;
+
+import org.jboss.messaging.core.plugin.contract.Binding;
+
+/**
+ * A RoutingPolicy
+ * Single-method interface: given a list of bindings, choose() returns one Binding.
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * @version <tt>$Revision: 1.1 $</tt>
+ *
+ * $Id$
+ *
+ */
+public interface RoutingPolicy
+{
+   Binding choose(List bindings); // NOTE(review): presumably a List of Binding; result for an empty list is not stated here - confirm
+}

Added: trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/SendTransactionRequest.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/SendTransactionRequest.java	2006-09-12 00:33:57 UTC (rev 1278)
+++ trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/SendTransactionRequest.java	2006-09-12 18:13:47 UTC (rev 1279)
@@ -0,0 +1,66 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.messaging.core.plugin.postoffice.cluster;
+
+import java.util.Iterator;
+import java.util.List;
+
+/**
+ * A SendTransactionRequest
+ * 
+ * Used for sending persistent messages transactionally across the network
+ * On commit(), each stored MessageHolder is passed to PostOfficeInternal.routeFromCluster().
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * @version <tt>$Revision: 1.1 $</tt>
+ *
+ * $Id$
+ *
+ */
+class SendTransactionRequest extends TransactionRequest
+{
+   private List messageHolders; // elements are cast to MessageHolder in commit()
+   // Hold form: passes hold=true to TransactionRequest and keeps the holders for commit()
+   SendTransactionRequest(String nodeId, long txId, List messageHolders)
+   {
+      super(nodeId, txId, true);
+      
+      this.messageHolders = messageHolders;  
+   }
+   // Non-hold form: passes hold=false to TransactionRequest; messageHolders stays null
+   SendTransactionRequest(String nodeId, long txId)
+   {
+      super(nodeId, txId, false);
+   }
+   // Routes each holder's message, routing key and queue-name-to-node-id map through the office
+   public void commit(PostOfficeInternal office) throws Exception
+   {
+      Iterator iter = messageHolders.iterator(); // NOTE(review): null on the two-argument form - confirm commit() is never called on it
+    
+      while (iter.hasNext())
+      {
+         MessageHolder holder = (MessageHolder)iter.next();
+         
+         office.routeFromCluster(holder.getMessage(), holder.getRoutingKey(), holder.getQueueNameToNodeIdMap());
+      }
+   }
+}
+

Modified: trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/TransactionRequest.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/TransactionRequest.java	2006-09-12 00:33:57 UTC (rev 1278)
+++ trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/TransactionRequest.java	2006-09-12 18:13:47 UTC (rev 1279)
@@ -21,58 +21,40 @@
  */
 package org.jboss.messaging.core.plugin.postoffice.cluster;
 
-import java.util.List;
 
 /**
  * A TransactionRequest
  * 
- * Used for sending persistent messages transactionally across the network
- *
  * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
  * @version <tt>$Revision: 1.1 $</tt>
  *
  * $Id$
  *
  */
-class TransactionRequest implements ClusterRequest
+abstract class TransactionRequest implements ClusterRequest, ClusterTransaction
 {
-   private static final long serialVersionUID = 644500948910063649L;
-
    private String nodeId;
    
    private long txId;
  
-   private List messageHolders;
-   
-   private boolean tryTransaction;
+   private boolean hold;
       
-   TransactionRequest(String nodeId, long txId, List messageHolders)
+   TransactionRequest(String nodeId, long txId, boolean hold)
    {
       this.nodeId = nodeId;
       
       this.txId= txId;
 
-      this.messageHolders = messageHolders;  
-      
-      tryTransaction = true;
+      this.hold = hold;
    }
    
-   TransactionRequest(String nodeId, long txId)
-   {
-      this.nodeId = nodeId;
-      
-      this.txId= txId;
-      
-      tryTransaction = false;
-   }
-   
    public void execute(PostOfficeInternal office) throws Exception
    {
       TransactionId id = new TransactionId(nodeId, txId);
       
-      if (tryTransaction)
+      if (hold)
       {
-         office.addToHoldingArea(id, messageHolders);
+         office.holdTransaction(id, this);
       }
       else
       {
@@ -81,3 +63,4 @@
    }   
 }
 
+

Modified: trunk/src/main/org/jboss/messaging/util/StreamUtils.java
===================================================================
--- trunk/src/main/org/jboss/messaging/util/StreamUtils.java	2006-09-12 00:33:57 UTC (rev 1278)
+++ trunk/src/main/org/jboss/messaging/util/StreamUtils.java	2006-09-12 18:13:47 UTC (rev 1279)
@@ -32,8 +32,6 @@
 import java.util.Map;
 import java.util.Set;
 
-import org.jboss.util.Primitives;
-
 /**
  * A StreamUtils
  *
@@ -101,7 +99,7 @@
             value = new Double(in.readDouble());
             break;
          case BOOLEAN :
-            value = Primitives.valueOf(in.readBoolean());
+            value = in.readBoolean() ? Boolean.TRUE : Boolean.FALSE;
             break;
          case STRING :
             if (longStrings)

Modified: trunk/tests/src/org/jboss/test/messaging/core/plugin/SimplePostOfficeTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/core/plugin/SimplePostOfficeTest.java	2006-09-12 00:33:57 UTC (rev 1278)
+++ trunk/tests/src/org/jboss/test/messaging/core/plugin/SimplePostOfficeTest.java	2006-09-12 18:13:47 UTC (rev 1279)
@@ -972,7 +972,7 @@
    {
       PostOfficeImpl postOffice = 
          new PostOfficeImpl(sc.getDataSource(), sc.getTransactionManager(),
-                              null, true, "node1", "Simple", ms);
+                              null, true, "node1", "Simple", ms, tr);
       
       postOffice.start();      
       




More information about the jboss-cvs-commits mailing list