[Jboss-cvs] JBoss Messaging SVN: r1287 - in trunk: src/etc/server/default/deploy src/etc/xmdesc src/main/org/jboss/messaging/core/plugin src/main/org/jboss/messaging/core/plugin/contract src/main/org/jboss/messaging/core/plugin/postoffice src/main/org/jboss/messaging/core/plugin/postoffice/cluster src/main/org/jboss/messaging/core/tx tests/src/org/jboss/test/messaging/core/plugin

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Wed Sep 13 14:23:44 EDT 2006


Author: timfox
Date: 2006-09-13 14:23:31 -0400 (Wed, 13 Sep 2006)
New Revision: 1287

Added:
   trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/BalancedBindingImpl.java
   trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/BasicRedistributionPolicy.java
   trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/MessageRedistributor.java
   trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/QueueStats.java
   trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/QueueStatsRequest.java
Removed:
   trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/MessageMover.java
Modified:
   trunk/src/etc/server/default/deploy/hsqldb-persistence-service.xml
   trunk/src/etc/xmdesc/ClusteredPostOffice-xmbean.xml
   trunk/src/main/org/jboss/messaging/core/plugin/ClusteredPostOfficeService.java
   trunk/src/main/org/jboss/messaging/core/plugin/contract/ClusteredPostOffice.java
   trunk/src/main/org/jboss/messaging/core/plugin/postoffice/PostOfficeImpl.java
   trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/BalancedBinding.java
   trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/ClusteredPostOfficeImpl.java
   trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/MoveMessagesCallback.java
   trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/PostOfficeInternal.java
   trunk/src/main/org/jboss/messaging/core/tx/TransactionRepository.java
   trunk/tests/src/org/jboss/test/messaging/core/plugin/ClusteredPostOfficeTest.java
Log:
Clustering - message redistribution



Modified: trunk/src/etc/server/default/deploy/hsqldb-persistence-service.xml
===================================================================
--- trunk/src/etc/server/default/deploy/hsqldb-persistence-service.xml	2006-09-13 14:37:47 UTC (rev 1286)
+++ trunk/src/etc/server/default/deploy/hsqldb-persistence-service.xml	2006-09-13 18:23:31 UTC (rev 1287)
@@ -59,6 +59,7 @@
       <attribute name="GroupName">cluster1</attribute>
       <attribute name="StateTimeout">5000</attribute>
       <attribute name="CastTimeout">5000</attribute>
+      <attribute name="RedistributionPeriod">5000</attribute>
       <attribute name="SyncChannelConfig">
          <UDP mcast_addr="228.8.8.8" mcast_port="45568"
               ip_ttl="8" ip_mcast="true"

Modified: trunk/src/etc/xmdesc/ClusteredPostOffice-xmbean.xml
===================================================================
--- trunk/src/etc/xmdesc/ClusteredPostOffice-xmbean.xml	2006-09-13 14:37:47 UTC (rev 1286)
+++ trunk/src/etc/xmdesc/ClusteredPostOffice-xmbean.xml	2006-09-13 18:23:31 UTC (rev 1287)
@@ -55,10 +55,16 @@
    
    <attribute access="read-write" getMethod="getCastTimeout" setMethod="setCastTimeout">
       <description>Timeout for getState()</description>
-      <name>Timeout when waiting for synchronous responses when sending synchronous requests</name>
+      <name>CastTimeout</name>
       <type>long</type>
    </attribute>   
    
+   <attribute access="read-write" getMethod="getRedistributionPeriod" setMethod="setRedistributionPeriod">
+      <description>The period between successive message redistribution calculations</description>
+      <name>RedistributionPeriod</name>
+      <type>long</type>
+   </attribute>    
+   
    <attribute access="read-write" getMethod="getSyncChannelConfig" setMethod="setSyncChannelConfig">
       <description>The JGroups stack configuration for the synchronous channel</description>
       <name>SyncChannelConfig</name>

Modified: trunk/src/main/org/jboss/messaging/core/plugin/ClusteredPostOfficeService.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/ClusteredPostOfficeService.java	2006-09-13 14:37:47 UTC (rev 1286)
+++ trunk/src/main/org/jboss/messaging/core/plugin/ClusteredPostOfficeService.java	2006-09-13 18:23:31 UTC (rev 1287)
@@ -29,8 +29,10 @@
 import org.jboss.messaging.core.plugin.contract.MessageStore;
 import org.jboss.messaging.core.plugin.contract.MessagingComponent;
 import org.jboss.messaging.core.plugin.contract.PersistenceManager;
+import org.jboss.messaging.core.plugin.postoffice.cluster.BasicRedistributionPolicy;
 import org.jboss.messaging.core.plugin.postoffice.cluster.ClusteredPostOfficeImpl;
 import org.jboss.messaging.core.plugin.postoffice.cluster.FavourLocalRoutingPolicy;
+import org.jboss.messaging.core.plugin.postoffice.cluster.RedistributionPolicy;
 import org.jboss.messaging.core.plugin.postoffice.cluster.RoutingPolicy;
 import org.jboss.messaging.core.tx.TransactionRepository;
 import org.w3c.dom.Element;
@@ -64,6 +66,8 @@
    
    private long castTimeout = 5000;
    
+   private long redistPeriod = 5000;
+   
    private String groupName;
    
    // Constructors --------------------------------------------------------
@@ -151,6 +155,16 @@
       return castTimeout;
    }
    
+   public void setRedistributionPeriod(long period)
+   {
+      this.redistPeriod = period;
+   }
+   
+   public long getRedistributionPeriod()
+   {
+      return redistPeriod;
+   }
+   
    public void setGroupName(String groupName)
    {
       this.groupName = groupName;
@@ -175,8 +189,7 @@
       try
       {  
          TransactionManager tm = getTransactionManagerReference();
-                  
-         
+                           
          ServerPeer serverPeer = (ServerPeer)server.getAttribute(serverPeerObjectName, "Instance");
          
          MessageStore ms = serverPeer.getMessageStore();
@@ -187,14 +200,16 @@
          
          String nodeId = serverPeer.getServerPeerID();
          
-         RoutingPolicy policy = new FavourLocalRoutingPolicy(nodeId);
+         RoutingPolicy routingPolicy = new FavourLocalRoutingPolicy(nodeId);
+         
+         RedistributionPolicy redistPolicy = new BasicRedistributionPolicy(nodeId);
                   
          postOffice =  new ClusteredPostOfficeImpl(ds, tm, sqlProperties, createTablesOnStartup,
                                                nodeId, officeName, ms,
                                                groupName,
                                                syncChannelConfig, asyncChannelConfig,
                                                tr, pm, stateTimeout, castTimeout,
-                                               policy);
+                                               routingPolicy, redistPolicy, redistPeriod);
          
          postOffice.start();
          

Modified: trunk/src/main/org/jboss/messaging/core/plugin/contract/ClusteredPostOffice.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/contract/ClusteredPostOffice.java	2006-09-13 14:37:47 UTC (rev 1286)
+++ trunk/src/main/org/jboss/messaging/core/plugin/contract/ClusteredPostOffice.java	2006-09-13 18:23:31 UTC (rev 1287)
@@ -54,4 +54,5 @@
     * @throws Throwable
     */
    Binding unbindClusteredQueue(String queueName) throws Throwable;
+  
 }

Modified: trunk/src/main/org/jboss/messaging/core/plugin/postoffice/PostOfficeImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/postoffice/PostOfficeImpl.java	2006-09-13 14:37:47 UTC (rev 1286)
+++ trunk/src/main/org/jboss/messaging/core/plugin/postoffice/PostOfficeImpl.java	2006-09-13 18:23:31 UTC (rev 1287)
@@ -157,7 +157,7 @@
          
          String filter = queue.getFilter() == null ? null : queue.getFilter().getFilterString();
                     
-         binding = new BindingImpl(nodeId, queueName, condition, filter,
+         binding = createBinding(nodeId, queueName, condition, filter,
                                    queue.getChannelID(), durable);         
          
          binding.setQueue(queue);
@@ -178,7 +178,7 @@
       {
          lock.writeLock().release();
       }
-   }
+   }   
             
    public Binding unbindQueue(String queueName) throws Throwable
    {
@@ -220,15 +220,24 @@
          throw new IllegalArgumentException("Condition is null");
       }
       
-      lock.writeLock().acquire();
+      lock.readLock().acquire();
       
       try
       {
-         return listMatchingBindings(condition);
+         List list = (List)conditionMap.get(condition);
+         
+         if (list == null)
+         {
+            return Collections.EMPTY_LIST;
+         }
+         else
+         {
+            return list;
+         }
       }
       finally
       {
-         lock.writeLock().release();
+         lock.readLock().release();
       }
    }
    
@@ -239,7 +248,7 @@
          throw new IllegalArgumentException("Queue name is null");
       }
       
-      lock.writeLock().acquire();
+      lock.readLock().acquire();
       
       try
       {
@@ -256,7 +265,7 @@
       }
       finally
       {
-         lock.writeLock().release();
+         lock.readLock().release();
       }
    }
    
@@ -345,6 +354,13 @@
      
    // Protected -----------------------------------------------------
    
+   protected Binding createBinding(String nodeId, String queueName, String condition, String filter,
+                                   long channelId, boolean durable)
+   {
+      return new BindingImpl(nodeId, queueName, condition, filter,
+                             channelId, durable);   
+   }
+   
    protected void loadBindings() throws Exception
    {
       lock.writeLock().acquire();
@@ -476,7 +492,7 @@
             //We don't load the actual queue - this is because we don't know the paging params until
             //activation time
                     
-            Binding binding = new BindingImpl(nodeId, queueName, condition, selector, channelId, true);
+            Binding binding = createBinding(nodeId, queueName, condition, selector, channelId, true);
             
             list.add(binding);
          }
@@ -606,26 +622,6 @@
    }
    
    // Private -------------------------------------------------------             
-   
-   /*
-    * List all bindings whose condition matches the wildcard
-    * Initially we just do an exact match - when we support topic hierarchies this
-    * will change
-    */
-   private List listMatchingBindings(String wildcard)
-   {      
-      List list = (List)conditionMap.get(wildcard);
-      
-      if (list == null)
-      {
-         return Collections.EMPTY_LIST;
-      }
-      else
-      {
-         return list;
-      }
-   }
-  
-                  
+                 
    // Inner classes -------------------------------------------------            
 }

Modified: trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/BalancedBinding.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/BalancedBinding.java	2006-09-13 14:37:47 UTC (rev 1286)
+++ trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/BalancedBinding.java	2006-09-13 18:23:31 UTC (rev 1287)
@@ -37,4 +37,8 @@
    double getConsumptionRate();
    
    int getMessageCount();
+   
+   void setConsumptionRate(double rate);
+   
+   void setMessageCount(int count);
 }

Added: trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/BalancedBindingImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/BalancedBindingImpl.java	2006-09-13 14:37:47 UTC (rev 1286)
+++ trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/BalancedBindingImpl.java	2006-09-13 18:23:31 UTC (rev 1287)
@@ -0,0 +1,72 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.messaging.core.plugin.postoffice.cluster;
+
+import org.jboss.messaging.core.plugin.postoffice.BindingImpl;
+
+/**
+ * A BalancedBindingImpl
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * @version <tt>$Revision: 1.1 $</tt>
+ *
+ * $Id$
+ *
+ */
+public class BalancedBindingImpl extends BindingImpl implements BalancedBinding
+{
+   private double consumptionRate;
+   
+   private int messageCount;
+   
+   public BalancedBindingImpl()
+   {
+   }
+
+   public BalancedBindingImpl(String nodeId, String queueName, String condition, String selector, long channelId, boolean durable)
+   {
+      super(nodeId, queueName, condition, selector, channelId, durable);
+   }
+
+   public double getConsumptionRate()
+   {
+      return consumptionRate;
+   }
+
+   public int getMessageCount()
+   {
+      return messageCount;
+   }
+
+   public void setConsumptionRate(double consumptionRate)
+   {
+      this.consumptionRate = consumptionRate;
+   }
+
+   public void setMessageCount(int messageCount)
+   {
+      this.messageCount = messageCount;
+   }
+
+ 
+
+}

Added: trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/BasicRedistributionPolicy.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/BasicRedistributionPolicy.java	2006-09-13 14:37:47 UTC (rev 1286)
+++ trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/BasicRedistributionPolicy.java	2006-09-13 18:23:31 UTC (rev 1287)
@@ -0,0 +1,111 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.messaging.core.plugin.postoffice.cluster;
+
+import java.util.Iterator;
+import java.util.List;
+
+/**
+ * A BasicRedistributionPolicy
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * @version <tt>$Revision: 1.1 $</tt>
+ *
+ * $Id$
+ *
+ */
+public class BasicRedistributionPolicy implements RedistributionPolicy
+{
+   private String localNodeId;
+   
+   private int MAX_MESSAGES_TO_MOVE = 100;
+   
+   public BasicRedistributionPolicy(String localNodeId)
+   {
+      this.localNodeId = localNodeId;
+   }
+
+   public RedistributionOrder calculate(List bindings)
+   {
+      Iterator iter = bindings.iterator();
+      
+      BalancedBinding localBinding = null;
+      
+      while (iter.hasNext())
+      {
+         BalancedBinding binding = (BalancedBinding)iter.next();
+         
+         if (binding.getNodeId().equals(localNodeId))
+         {
+            localBinding = binding;
+            
+            break;
+         }
+      }
+      
+      if (localBinding == null)
+      {
+         return null;
+      }
+      
+      if (localBinding.getConsumptionRate() == 0 && localBinding.getMessageCount() > 0)
+      {
+         //No consumers on the queue - the messages are stranded
+         //We should consider moving them somewhere else
+         
+         //We move messages to the node with the highest consumption rate
+         
+         iter = bindings.iterator();
+         
+         double maxRate = 0;
+         
+         BalancedBinding maxRateBinding = null;
+         
+         while (iter.hasNext())
+         {
+            BalancedBinding binding = (BalancedBinding)iter.next();
+            
+            if (!binding.getNodeId().equals(localNodeId))
+            {
+               if (binding.getConsumptionRate() > maxRate)
+               {
+                  maxRate = binding.getConsumptionRate();
+                  
+                  maxRateBinding = binding;
+               }
+            }
+         }
+         
+         if (maxRate > 0)
+         {
+            //Move messages to this node
+            
+            //How many should we move?
+            int numberToMove = Math.min(MAX_MESSAGES_TO_MOVE, localBinding.getMessageCount());     
+            
+            return new RedistributionOrder(numberToMove, localBinding.getQueueName(), maxRateBinding.getNodeId());
+         }
+      }
+      
+      return null;
+   }
+}

Modified: trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/ClusteredPostOfficeImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/ClusteredPostOfficeImpl.java	2006-09-13 14:37:47 UTC (rev 1286)
+++ trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/ClusteredPostOfficeImpl.java	2006-09-13 18:23:31 UTC (rev 1287)
@@ -42,7 +42,6 @@
 import org.jboss.messaging.core.plugin.contract.PersistenceManager;
 import org.jboss.messaging.core.plugin.postoffice.ConditionBindings;
 import org.jboss.messaging.core.plugin.postoffice.PostOfficeImpl;
-import org.jboss.messaging.core.plugin.postoffice.BindingImpl;
 import org.jboss.messaging.core.tx.Transaction;
 import org.jboss.messaging.core.tx.TransactionRepository;
 import org.jgroups.Address;
@@ -115,6 +114,12 @@
    private long castTimeout;
    
    private RoutingPolicy routingPolicy;
+   
+   private RedistributionPolicy redistributionPolicy;
+   
+   private MessageRedistributor redistributor;
+   
+   private long redistributePeriod;
       
    public ClusteredPostOfficeImpl()
    {        
@@ -140,10 +145,13 @@
                               TransactionRepository tr,
                               PersistenceManager pm,
                               long stateTimeout, long castTimeout,
-                              RoutingPolicy routingPolicy) throws Exception
+                              RoutingPolicy routingPolicy,
+                              RedistributionPolicy redistributionPolicy,
+                              long redistributePeriod) throws Exception
    {            
       this(ds, tm, sqlProperties, createTablesOnStartup, nodeId, officeName, ms,
-           groupName, tr, pm, stateTimeout, castTimeout, routingPolicy);
+           groupName, tr, pm, stateTimeout, castTimeout, routingPolicy, redistributionPolicy,
+           redistributePeriod);
       
       this.syncChannelConfigE = syncChannelConfig;      
       this.asyncChannelConfigE = asyncChannelConfig;     
@@ -161,10 +169,12 @@
                               TransactionRepository tr,
                               PersistenceManager pm,
                               long stateTimeout, long castTimeout,
-                              RoutingPolicy routingPolicy) throws Exception
+                              RoutingPolicy routingPolicy,
+                              RedistributionPolicy redistributionPolicy,
+                              long redistributePeriod) throws Exception
    {            
       this(ds, tm, sqlProperties, createTablesOnStartup, nodeId, officeName, ms,
-           groupName, tr, pm, stateTimeout, castTimeout, routingPolicy);
+           groupName, tr, pm, stateTimeout, castTimeout, routingPolicy, redistributionPolicy, redistributePeriod);
 
       this.syncChannelConfigS = syncChannelConfig;      
       this.asyncChannelConfigS = asyncChannelConfig;     
@@ -177,7 +187,9 @@
                                TransactionRepository tr,
                                PersistenceManager pm,
                                long stateTimeout, long castTimeout,
-                               RoutingPolicy routingPolicy)
+                               RoutingPolicy routingPolicy,
+                               RedistributionPolicy redistributionPolicy,
+                               long redistributePeriod)
    {
       super (ds, tm, sqlProperties, createTablesOnStartup, nodeId, officeName, ms, tr);
        
@@ -191,9 +203,15 @@
       
       this.routingPolicy = routingPolicy;
       
+      this.redistributionPolicy = redistributionPolicy;
+      
+      this.redistributePeriod = redistributePeriod;
+      
       init();
    }
 
+   // MessagingComponent overrides
+   // --------------------------------------------------------------
    
    public void start() throws Exception
    {
@@ -236,12 +254,18 @@
       handleAddressNodeMapping(currentAddress, nodeId);
       
       syncSendRequest(new SendNodeIdRequest(currentAddress, nodeId));
+            
+      redistributor = new MessageRedistributor(this, redistributePeriod);
+      
+      redistributor.start();
    }
 
    public void stop() throws Exception
    {
       super.stop();
       
+      redistributor.stop();
+      
       syncChannel.close();
       
       asyncChannel.close();
@@ -463,7 +487,7 @@
     * Called when another node adds a binding
     */
    public void addBindingFromCluster(String nodeId, String queueName, String condition,
-                                      String filterString, long channelID, boolean durable)
+                                     String filterString, long channelID, boolean durable)
       throws Exception
    {
       lock.writeLock().acquire();
@@ -491,8 +515,8 @@
             throw new IllegalArgumentException(this.nodeId + "Binding already exists for node Id " + nodeId + " queue name " + queueName);
          }
          
-         binding = new BindingImpl(nodeId, queueName, condition, filterString,
-                                   channelID, durable); 
+         binding = new BalancedBindingImpl(nodeId, queueName, condition, filterString,
+                                           channelID, durable); 
          
          binding.activate();
          
@@ -533,7 +557,7 @@
       
       try
       { 
-         nodeIdAddressMap.put(nodeId, address.toString());
+         nodeIdAddressMap.put(nodeId, address);
       }
       finally
       {
@@ -653,6 +677,9 @@
       }
    }
    
+   /*
+    * Multicast a message to all members of the group
+    */
    public void asyncSendRequest(ClusterRequest request) throws Exception
    {            
       //TODO - handle serialization more efficiently
@@ -660,6 +687,19 @@
    }
    
    /*
+    * Unicast a message to one member of the group
+    */
+   public void asyncSendRequest(ClusterRequest request, String nodeId) throws Exception
+   {            
+      Address address = (Address)nodeIdAddressMap.get(nodeId);
+      
+      Message m = new Message(address, null, request);
+      
+      //TODO - handle serialization more efficiently
+      asyncChannel.send(m);
+   }
+   
+   /*
     * We put the transaction in the holding area
     */
    public void holdTransaction(TransactionId id, ClusterTransaction tx) throws Exception
@@ -742,53 +782,179 @@
       }
    }
    
-   private boolean checkTransaction(List messageHolders) throws Exception
+   public void calculateRedistribution() throws Throwable
    {
-      Iterator iter = messageHolders.iterator();
+      lock.readLock().acquire();
       
-      //We only need to check that one of the refs made it to the database - the refs would have
-      //been inserted into the db transactionally, so either they're all there or none are
-      MessageHolder holder = (MessageHolder)iter.next();
-      
-      List bindings = listBindingsForCondition(holder.getRoutingKey());
-      
-      if (bindings == null)
+      try
       {
-         throw new IllegalStateException("Cannot find bindings for key: " + holder.getRoutingKey());
+         Iterator iter = conditionMap.values().iterator();
+         
+         while (iter.hasNext())
+         {
+            ConditionBindings cb = (ConditionBindings)iter.next();
+            
+            Collection nameLists = cb.getBindingsByName();
+            
+            Iterator iter2 = nameLists.iterator();
+            
+            while (iter2.hasNext())
+            {
+               List bindings = (List)iter2.next();        
+            
+               if (bindings.size() > 1)
+               {
+                  RedistributionOrder order = redistributionPolicy.calculate(bindings);
+                  
+                  if (order != null)
+                  {
+                     moveMessages(order.getQueueName(), order.getDestinationNodeId(), order.getNumberOfMessages());
+                  }
+               }
+            }
+         }
       }
+      finally
+      {
+         lock.readLock().release();
+      }
+   }
+   
+   public void sendStats() throws Exception
+   {
+      lock.writeLock().acquire();
       
-      Iterator iter2 = bindings.iterator();
+      List stats = null;      
       
-      long channelID = -1;
-      boolean found = false;
-      
-      while (iter2.hasNext())
+      try
       {
-         Binding binding = (Binding)iter2.next();
          
-         if (binding.isDurable())
+         Map nameMap = (Map)nameMaps.get(nodeId);
+         
+         Iterator iter = nameMap.values().iterator();
+                  
+         while (iter.hasNext())
          {
-            found = true;
+            BalancedBinding bb = (BalancedBinding)iter.next();
             
-            channelID = binding.getChannelId();
+            MeasuredQueue q = (MeasuredQueue)bb.getQueue();
+            
+            //We don't bother sending the stat if there is less than STATS_DIFFERENCE_MARGIN_PERCENT % difference
+            
+            double newRate = q.getGrowthRate();
+            
+            int newMessageCount = q.messageCount();
+            
+            boolean sendStats = decideToSendStats(bb.getConsumptionRate(), newRate);
+            
+            if (!sendStats)
+            {
+               sendStats = decideToSendStats(bb.getMessageCount(), newMessageCount);
+            }
+            
+            if (sendStats)
+            {
+               bb.setConsumptionRate(newRate);
+               bb.setMessageCount(newMessageCount);
+               
+               if (stats == null)
+               {
+                  stats = new ArrayList();
+               }
+               QueueStats qs = new QueueStats(bb.getQueueName(), newRate, newMessageCount);
+               
+               stats.add(qs);
+            }                  
          }
       }
-      
-      if (!found)
+      finally
       {
-         throw new IllegalStateException("Cannot find bindings");
+         lock.writeLock().release();
       }
       
-      if (pm.referenceExists(channelID, holder.getMessage().getMessageID()))
+      if (stats != null)
       {
-         return true;
+         ClusterRequest req = new QueueStatsRequest(nodeId, stats);
+         
+         asyncSendRequest(req);
       }
+   }
+   
+   private boolean decideToSendStats(double oldValue, double newValue)
+   {
+      boolean sendStats = false;
+      
+      if (oldValue != 0)
+      {         
+         int percentChange = (int)(100 * (oldValue - newValue) / oldValue);
+         
+         if (Math.abs(percentChange) >= STATS_DIFFERENCE_MARGIN_PERCENT)
+         {
+            sendStats = true;
+         }
+      }
       else
       {
-         return false;
+         if (newValue != 0)
+         {
+            sendStats = true;
+         }
       }
+      return sendStats;
    }
-    
+   
+   private static final int STATS_DIFFERENCE_MARGIN_PERCENT = 10;
+   
+   public void updateQueueStats(String nodeId, List stats) throws Exception
+   {
+      lock.writeLock().acquire();
+      
+      Map nameMap = (Map)nameMaps.get(nodeId);
+      
+      if (nameMap == null)
+      {
+         throw new IllegalStateException("Cannot find name map for node id " + nodeId);
+      }
+            
+      try
+      {
+         Iterator iter = stats.iterator();
+         
+         while (iter.hasNext())
+         {
+            QueueStats st = (QueueStats)iter.next();
+            
+            BalancedBinding bb = (BalancedBinding)nameMap.get(st.getQueueName());
+            
+            if (bb == null)
+            {
+               throw new IllegalStateException("Cannot find binding for queue name: " + st.getQueueName());
+            }
+            
+            bb.setConsumptionRate(st.getConsumptionRate());
+            
+            bb.setMessageCount(st.getMessageCount());
+         }         
+      }
+      finally
+      {
+         lock.writeLock().release();      
+      }
+   }
+          
+   
+   
+   // Public ------------------------------------------------------------------------------------------
+      
+   // Protected ---------------------------------------------------------------------------------------
+   
+   protected Binding createBinding(String nodeId, String queueName, String condition, String filter,
+            long channelId, boolean durable)
+   {
+      return new BalancedBindingImpl(nodeId, queueName, condition, filter,
+                                     channelId, durable);   
+   }
+           
    protected void loadBindings() throws Exception
    {
       // TODO I need to know whether this call times out - how do I know this??
@@ -814,13 +980,9 @@
       }
    }
    
-   // Public ------------------------------------------------------------------------------------------
-   
-   
-   // Protected ---------------------------------------------------------------------------------------
-   
-   
-   protected void syncSendRequest(ClusterRequest request) throws Exception
+   // Private ------------------------------------------------------------------------------------------
+           
+   private void syncSendRequest(ClusterRequest request) throws Exception
    {            
       //TODO - handle serialization more efficiently
       
@@ -829,54 +991,55 @@
       controlMessageDispatcher.castMessage(null, message, GroupRequest.GET_ALL, castTimeout);
    }
    
-   /*
-    * We have received a reference cast from another node - and we need to route it to our local
-    * subscriptions    
-    */
-//   protected void routeFromCluster(MessageReference ref, String routingKey) throws Exception
-//   {
-//      lock.readLock().acquire();
-//      
-//      try
-//      {      
-//         // We route on the condition
-//         List bindings = (List)conditionMap.get(routingKey);
-//      
-//         if (bindings != null)
-//         {                                
-//            Iterator iter = bindings.iterator();
-//            
-//            while (iter.hasNext())
-//            {
-//               Binding binding = (Binding)iter.next();
-//               
-//               if (binding.isActive())
-//               {            
-//                  if (binding.getNodeId().equals(this.nodeId))
-//                  {  
-//                     //It's a local binding so we pass the message on to the subscription
-//                     Queue subscription = binding.getQueue();
-//                  
-//                     //TODO instead of adding a new method on the channel
-//                     //we should set a header and use the same method
-//                     subscription.handleDontPersist(null, ref, null);
-//                  }                               
-//               }
-//            }                          
-//         }
-//      }
-//      finally
-//      {                  
-//         lock.readLock().release();
-//      }
-//   }            
-//   
-  
-       
-   // Private ------------------------------------------------------------------------------------------
-             
-   private void removeBindingsForAddress(String address) throws Exception
+   private boolean checkTransaction(List messageHolders) throws Exception
    {
+      Iterator iter = messageHolders.iterator();
+      
+      //We only need to check that one of the refs made it to the database - the refs would have
+      //been inserted into the db transactionally, so either they're all there or none are
+      MessageHolder holder = (MessageHolder)iter.next();
+      
+      List bindings = listBindingsForCondition(holder.getRoutingKey());
+      
+      if (bindings == null)
+      {
+         throw new IllegalStateException("Cannot find bindings for key: " + holder.getRoutingKey());
+      }
+      
+      Iterator iter2 = bindings.iterator();
+      
+      long channelID = -1;
+      boolean found = false;
+      
+      while (iter2.hasNext())
+      {
+         Binding binding = (Binding)iter2.next();
+         
+         if (binding.isDurable())
+         {
+            found = true;
+            
+            channelID = binding.getChannelId();
+         }
+      }
+      
+      if (!found)
+      {
+         throw new IllegalStateException("Cannot find bindings");
+      }
+      
+      if (pm.referenceExists(channelID, holder.getMessage().getMessageID()))
+      {
+         return true;
+      }
+      else
+      {
+         return false;
+      }
+   }
+   
+   private void removeBindingsForAddress(Address address) throws Exception
+   {
       lock.writeLock().acquire();
       
       try
@@ -888,9 +1051,9 @@
          {
             Map.Entry entry = (Map.Entry)iter.next();
             
-            String str = (String)entry.getValue();
+            Address adr = (Address)entry.getValue();
             
-            if (str.equals(address))
+            if (adr.equals(address))
             {
                nodeId = (String)entry.getKey();
             }
@@ -989,6 +1152,41 @@
       this.nodeIdAddressMap.putAll(state.getNodeIdAddressMap());
    }
    
+   /*
+    * Move messages from queue on one node to queue on another node
+    */
+   private void moveMessages(String queueName, String toNodeId, int num) throws Throwable
+   {      
+      Binding binding = getBindingForQueueName(queueName);
+      
+      if (binding == null)
+      {
+         throw new IllegalStateException("Cannot find binding for queue name: " + queueName);
+      }
+      
+      Queue fromQueue = binding.getQueue();
+
+      Transaction tx = tr.createTransaction();
+               
+      List dels = ((MeasuredQueue)fromQueue).getDeliveries(num);
+      
+      Iterator iter = dels.iterator();
+      
+      MoveMessagesCallback cb = new MoveMessagesCallback(nodeId, toNodeId, queueName,
+                                                         tx.getId(), this);      
+      while (iter.hasNext())
+      {
+         Delivery del = (Delivery)iter.next();
+         
+         del.acknowledge(tx);      
+         
+         cb.addMessage(del.getReference().getMessage());
+      }
+      
+      tx.commit();
+ 
+   } 
+   
    // Inner classes -------------------------------------------------------------------
     
    /*
@@ -1100,7 +1298,7 @@
                   {                  
                      try
                      {
-                        removeBindingsForAddress(address.toString());
+                        removeBindingsForAddress(address);
                      }               
                      catch (Exception e)
                      {

Deleted: trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/MessageMover.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/MessageMover.java	2006-09-13 14:37:47 UTC (rev 1286)
+++ trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/MessageMover.java	2006-09-13 18:23:31 UTC (rev 1287)
@@ -1,102 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2005, JBoss Inc., and individual contributors as indicated
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-package org.jboss.messaging.core.plugin.postoffice.cluster;
-
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.List;
-
-import org.jboss.messaging.core.Delivery;
-import org.jboss.messaging.core.local.Queue;
-import org.jboss.messaging.core.plugin.contract.Binding;
-import org.jboss.messaging.core.tx.Transaction;
-import org.jboss.messaging.core.tx.TransactionRepository;
-
-/**
- * A MessageMover
- *
- * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
- * @version <tt>$Revision: 1.1 $</tt>
- *
- * $Id$
- *
- */
-public class MessageMover
-{
-   private TransactionRepository tr;
-   
-   private String nodeId;
-   
-   private RedistributionPolicy redistributionPolicy;
-   
-   public void moveMessages(Binding from, Binding to, int num) throws Throwable
-   {
-      if (!from.getNodeId().equals(this.nodeId))
-      {
-         throw new IllegalArgumentException("From binding must be on local node!");
-      }
-      
-      if (to.getNodeId().equals(this.nodeId))
-      {
-         throw new IllegalArgumentException("To binding cannot be on local node");
-      }
-      
-      //Consume the messages in a transaction - don't commit
-      Queue fromQueue = from.getQueue();
-      
-      Transaction tx = tr.createTransaction();
-      
-      List dels = ((MeasuredQueue)fromQueue).getDeliveries(num);
-      
-      Iterator iter = dels.iterator();
-      
-      while (iter.hasNext())
-      {
-         Delivery del = (Delivery)iter.next();
-         
-         del.acknowledge(tx);         
-      }
-      
-      
- 
-   } 
-   
-   public void calculateMovements(Collection nameLists)
-   {
-      Iterator iter = nameLists.iterator();
-      
-      while (iter.hasNext())
-      {
-         List bindings = (List)iter.next();
-         
-         if (bindings.size() > 1)
-         {
-            RedistributionOrder order = redistributionPolicy.calculate(bindings);
-            
-            if (order != null)
-            {
-               
-            }
-         }
-      }
-   }
-}

Added: trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/MessageRedistributor.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/MessageRedistributor.java	2006-09-13 14:37:47 UTC (rev 1286)
+++ trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/MessageRedistributor.java	2006-09-13 18:23:31 UTC (rev 1287)
@@ -0,0 +1,100 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.messaging.core.plugin.postoffice.cluster;
+
+import java.util.Timer;
+import java.util.TimerTask;
+
+import org.jboss.logging.Logger;
+import org.jboss.messaging.core.plugin.contract.MessagingComponent;
+
+/**
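+ * A MessageRedistributor - runs a timer task that periodically asks the post office to calculate message redistribution and to send its queue statistics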
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * @version <tt>$Revision: 1.1 $</tt>
+ *
+ * $Id$
+ *
+ */
+public class MessageRedistributor implements MessagingComponent
+{
+   private static final Logger log = Logger.getLogger(MessageRedistributor.class);
+     
+   private PostOfficeInternal office;
+   
+   private Timer timer;
+   
+   private long period;
+   
+   public MessageRedistributor(PostOfficeInternal office,
+                               long period)
+   {
+      this.office = office;
+      
+      this.period = period;
+   }
+   
+   
+   // MessagingComponent overrides
+   // ---------------------------------------------------
+   
+   public void start() throws Exception
+   {
+      timer = new Timer(true);
+      
+      //Add a random delay to start, to prevent multiple exchanges all calculating at the same time
+      
+      long delay = (long)(Math.random() * period);
+      
+      timer.schedule(new RedistributeTimerTask(), delay, period);
+   }
+
+   public void stop() throws Exception
+   {
+      timer.cancel();
+   }
+   
+   class RedistributeTimerTask extends TimerTask
+   {
+      public void run()
+      {
+         try
+         {
+            office.calculateRedistribution();
+         }
+         catch (Throwable t)
+         {
+            log.error("Caught Throwable in calculating message redistribution", t);
+         }
+         try
+         {
+            office.sendStats();
+         }
+         catch (Exception e)
+         {
+            log.error("Caught Exception in calculating/sending queue statistics", e);
+         }
+      }
+      
+   }
+}

Modified: trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/MoveMessagesCallback.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/MoveMessagesCallback.java	2006-09-13 14:37:47 UTC (rev 1286)
+++ trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/MoveMessagesCallback.java	2006-09-13 18:23:31 UTC (rev 1287)
@@ -40,15 +40,31 @@
 {
    private List messages;
    
+   private String destinationNodeId;
+   
+   private String currentNodeId;
+   
    private String queueName;
    
-   MoveMessagesCallback(String queueName)
+   private long txId;
+   
+   private PostOfficeInternal office;
+      
+   MoveMessagesCallback(String currentNodeId, String destNodeId, String queueName, long txId, PostOfficeInternal office)
    {
+      this.currentNodeId = currentNodeId;
+      
+      this.destinationNodeId = destNodeId;
+      
       this.queueName = queueName;
       
+      this.txId = txId;
+      
+      this.office = office;
+      
       messages = new ArrayList();
    }
-
+   
    void addMessage(Message msg)
    {
       messages.add(msg);
@@ -56,32 +72,38 @@
    
    public void afterCommit(boolean onePhase) throws Exception
    {
- 
+      ClusterRequest req = new MoveTransactionRequest(currentNodeId, txId);
+      
+      //We unicast the message to the node
+      office.asyncSendRequest(req, destinationNodeId);    
    }
 
    public void afterPrepare() throws Exception
    {
-
+      //NOOP
    }
 
    public void afterRollback(boolean onePhase) throws Exception
    {
-
+      //NOOP
    }
 
    public void beforeCommit(boolean onePhase) throws Exception
    {
-
+      ClusterRequest req = new MoveTransactionRequest(currentNodeId, txId, messages, queueName);
+      
+      //We unicast
+      office.asyncSendRequest(req, destinationNodeId);
    }
 
    public void beforePrepare() throws Exception
    {
-
+      //NOOP
    }
 
    public void beforeRollback(boolean onePhase) throws Exception
    {
-
+      //NOOP
    }
 
 }

Modified: trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/PostOfficeInternal.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/PostOfficeInternal.java	2006-09-13 14:37:47 UTC (rev 1286)
+++ trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/PostOfficeInternal.java	2006-09-13 18:23:31 UTC (rev 1287)
@@ -25,6 +25,7 @@
 import java.util.Map;
 
 import org.jboss.messaging.core.Message;
+import org.jboss.messaging.core.plugin.contract.ClusteredPostOffice;
 import org.jgroups.Address;
 
 /**
@@ -40,7 +41,7 @@
  * $Id$
  *
  */
-interface PostOfficeInternal
+interface PostOfficeInternal extends ClusteredPostOffice
 {
    void addBindingFromCluster(String nodeId, String queueName, String condition,
                               String filterString, long channelId, boolean durable)
@@ -58,9 +59,17 @@
    
    void asyncSendRequest(ClusterRequest request) throws Exception;
    
+   void asyncSendRequest(ClusterRequest request, String nodeId) throws Exception;
+   
    void holdTransaction(TransactionId id, ClusterTransaction tx) throws Exception;
    
    void commitTransaction(TransactionId id) throws Exception;
    
    void check(String nodeId) throws Exception;
+   
+   void calculateRedistribution() throws Throwable;
+   
+   void updateQueueStats(String nodeId, List stats) throws Exception;
+   
+   void sendStats() throws Exception;
 }

Added: trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/QueueStats.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/QueueStats.java	2006-09-13 14:37:47 UTC (rev 1286)
+++ trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/QueueStats.java	2006-09-13 18:23:31 UTC (rev 1287)
@@ -0,0 +1,66 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.messaging.core.plugin.postoffice.cluster;
+
+import java.io.Serializable;
+
+/**
+ * A QueueStats
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * @version <tt>$Revision: 1.1 $</tt>
+ *
+ * $Id$
+ *
+ */
+public class QueueStats implements Serializable
+{
+   private String queueName;
+   
+   private double consumptionRate;
+   
+   private int messageCount;
+
+   public QueueStats(String queueName, double consumptionRate, int messageCount)
+   {
+      this.queueName = queueName;
+      
+      this.consumptionRate = consumptionRate;
+      
+      this.messageCount = messageCount;
+   }
+
+   public double getConsumptionRate()
+   {
+      return consumptionRate;
+   }
+
+   public int getMessageCount()
+   {
+      return messageCount;
+   }
+
+   public String getQueueName()
+   {
+      return queueName;
+   }      
+}

Added: trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/QueueStatsRequest.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/QueueStatsRequest.java	2006-09-13 14:37:47 UTC (rev 1286)
+++ trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/QueueStatsRequest.java	2006-09-13 18:23:31 UTC (rev 1287)
@@ -0,0 +1,52 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.messaging.core.plugin.postoffice.cluster;
+
+import java.util.List;
+
+/**
+ * A QueueStatsRequest
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * @version <tt>$Revision: 1.1 $</tt>
+ *
+ * $Id$
+ *
+ */
+public class QueueStatsRequest implements ClusterRequest
+{
+   private String nodeId;
+   
+   private List queueStats;
+   
+   public QueueStatsRequest(String nodeId, List stats)
+   {
+      this.nodeId = nodeId;
+      
+      this.queueStats = stats;
+   }
+
+   public void execute(PostOfficeInternal office) throws Exception
+   {
+      office.updateQueueStats(nodeId, queueStats);
+   }
+}

Modified: trunk/src/main/org/jboss/messaging/core/tx/TransactionRepository.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/tx/TransactionRepository.java	2006-09-13 14:37:47 UTC (rev 1286)
+++ trunk/src/main/org/jboss/messaging/core/tx/TransactionRepository.java	2006-09-13 18:23:31 UTC (rev 1287)
@@ -54,11 +54,11 @@
    
    private boolean trace = log.isTraceEnabled();
    
-   protected Map globalToLocalMap;     
+   private Map globalToLocalMap;     
    
-   protected PersistenceManager persistenceManager;
+   private PersistenceManager persistenceManager;
    
-   protected IdManager idManager;
+   private IdManager idManager;
 
    // Static --------------------------------------------------------
    
@@ -151,6 +151,7 @@
       if (trace) { log.trace("created transaction " + tx); }
       
       globalToLocalMap.put(xid, tx);
+      
       return tx;
    }
    
@@ -163,6 +164,11 @@
       return tx;
    }
    
+   public boolean removeTransaction(Xid xid)
+   {
+      return globalToLocalMap.remove(xid) != null;
+   }
+   
    // Package protected ---------------------------------------------
    
    // Protected -----------------------------------------------------         

Modified: trunk/tests/src/org/jboss/test/messaging/core/plugin/ClusteredPostOfficeTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/core/plugin/ClusteredPostOfficeTest.java	2006-09-13 14:37:47 UTC (rev 1286)
+++ trunk/tests/src/org/jboss/test/messaging/core/plugin/ClusteredPostOfficeTest.java	2006-09-13 18:23:31 UTC (rev 1287)
@@ -29,7 +29,11 @@
 import org.jboss.messaging.core.message.MessageFactory;
 import org.jboss.messaging.core.plugin.contract.Binding;
 import org.jboss.messaging.core.plugin.contract.ClusteredPostOffice;
+import org.jboss.messaging.core.plugin.postoffice.cluster.BasicRedistributionPolicy;
 import org.jboss.messaging.core.plugin.postoffice.cluster.ClusteredPostOfficeImpl;
+import org.jboss.messaging.core.plugin.postoffice.cluster.FavourLocalRoutingPolicy;
+import org.jboss.messaging.core.plugin.postoffice.cluster.RedistributionPolicy;
+import org.jboss.messaging.core.plugin.postoffice.cluster.RoutingPolicy;
 import org.jboss.messaging.core.tx.Transaction;
 import org.jboss.test.messaging.core.SimpleReceiver;
 
@@ -1170,12 +1174,16 @@
    
    protected ClusteredPostOffice createClusteredPostOffice(String nodeId, String groupName) throws Exception
    {
+      RoutingPolicy routingPolicy = new FavourLocalRoutingPolicy(nodeId);
+      
+      RedistributionPolicy redistPolicy = new BasicRedistributionPolicy(nodeId);
+      
       ClusteredPostOfficeImpl postOffice = 
          new ClusteredPostOfficeImpl(sc.getDataSource(), sc.getTransactionManager(),
                                  null, true, nodeId, "Clustered", ms, groupName,
                                  JGroupsUtil.getControlStackProperties(50, 1),
                                  JGroupsUtil.getDataStackProperties(50, 1),
-                                 tr, pm, 5000, 5000);
+                                 tr, pm, 5000, 5000, routingPolicy, redistPolicy, 5000);
       
       postOffice.start();      
       




More information about the jboss-cvs-commits mailing list