[jboss-cvs] JBoss Messaging SVN: r5751 - in trunk: src/main/org/jboss/messaging/core/config and 16 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Wed Jan 28 09:18:20 EST 2009


Author: timfox
Date: 2009-01-28 09:18:19 -0500 (Wed, 28 Jan 2009)
New Revision: 5751

Added:
   trunk/src/main/org/jboss/messaging/core/config/cluster/ClusterConnectionConfiguration.java
   trunk/src/main/org/jboss/messaging/core/postoffice/impl/DivertBinding.java
   trunk/src/main/org/jboss/messaging/core/postoffice/impl/LocalQueueBinding.java
   trunk/src/main/org/jboss/messaging/core/server/cluster/ClusterConnection.java
   trunk/src/main/org/jboss/messaging/core/server/cluster/RemoteQueueBinding.java
   trunk/src/main/org/jboss/messaging/core/server/cluster/impl/ClusterConnectionImpl.java
   trunk/src/main/org/jboss/messaging/core/server/cluster/impl/RemoteQueueBindingImpl.java
Removed:
   trunk/src/main/org/jboss/messaging/core/config/cluster/ClusterConfiguration.java
   trunk/src/main/org/jboss/messaging/core/postoffice/impl/BindingImpl.java
   trunk/src/main/org/jboss/messaging/core/server/cluster/Cluster.java
   trunk/src/main/org/jboss/messaging/core/server/cluster/FlowBinding.java
   trunk/src/main/org/jboss/messaging/core/server/cluster/impl/ClusterImpl.java
   trunk/src/main/org/jboss/messaging/core/server/cluster/impl/FlowBindingImpl.java
Modified:
   trunk/src/main/org/jboss/messaging/core/client/management/impl/ManagementHelper.java
   trunk/src/main/org/jboss/messaging/core/config/Configuration.java
   trunk/src/main/org/jboss/messaging/core/config/impl/ConfigurationImpl.java
   trunk/src/main/org/jboss/messaging/core/filter/impl/FilterImpl.java
   trunk/src/main/org/jboss/messaging/core/management/ManagementService.java
   trunk/src/main/org/jboss/messaging/core/management/MessagingServerControlMBean.java
   trunk/src/main/org/jboss/messaging/core/management/impl/ManagementServiceImpl.java
   trunk/src/main/org/jboss/messaging/core/management/impl/MessagingServerControl.java
   trunk/src/main/org/jboss/messaging/core/management/jmx/impl/ReplicationAwareMessagingServerControlWrapper.java
   trunk/src/main/org/jboss/messaging/core/message/impl/MessageImpl.java
   trunk/src/main/org/jboss/messaging/core/postoffice/AddressManager.java
   trunk/src/main/org/jboss/messaging/core/postoffice/Binding.java
   trunk/src/main/org/jboss/messaging/core/postoffice/impl/BindingsImpl.java
   trunk/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java
   trunk/src/main/org/jboss/messaging/core/postoffice/impl/SimpleAddressManager.java
   trunk/src/main/org/jboss/messaging/core/postoffice/impl/WildcardAddressManager.java
   trunk/src/main/org/jboss/messaging/core/server/Bindable.java
   trunk/src/main/org/jboss/messaging/core/server/Consumer.java
   trunk/src/main/org/jboss/messaging/core/server/Divert.java
   trunk/src/main/org/jboss/messaging/core/server/cluster/Bridge.java
   trunk/src/main/org/jboss/messaging/core/server/cluster/impl/BridgeImpl.java
   trunk/src/main/org/jboss/messaging/core/server/cluster/impl/ClusterManagerImpl.java
   trunk/src/main/org/jboss/messaging/core/server/impl/DivertImpl.java
   trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java
   trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java
   trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java
   trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java
   trunk/tests/src/org/jboss/messaging/tests/performance/persistence/FakeBinding.java
   trunk/tests/src/org/jboss/messaging/tests/unit/core/server/impl/QueueImplTest.java
   trunk/tests/src/org/jboss/messaging/tests/unit/core/server/impl/fakes/FakeConsumer.java
Log:
More clustering


Modified: trunk/src/main/org/jboss/messaging/core/client/management/impl/ManagementHelper.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/management/impl/ManagementHelper.java	2009-01-28 10:53:22 UTC (rev 5750)
+++ trunk/src/main/org/jboss/messaging/core/client/management/impl/ManagementHelper.java	2009-01-28 14:18:19 UTC (rev 5751)
@@ -48,27 +48,27 @@
 
    // Constants -----------------------------------------------------
 
-   public static final SimpleString HDR_JMX_OBJECTNAME = new SimpleString("JBM_JMX_ObjectName");
+   public static final SimpleString HDR_JMX_OBJECTNAME = new SimpleString("_JBM_JMX_ObjectName");
 
-   public static final SimpleString HDR_JMX_ATTRIBUTE_PREFIX = new SimpleString("JBM_JMXAttribute.");
+   public static final SimpleString HDR_JMX_ATTRIBUTE_PREFIX = new SimpleString("_JBM_JMXAttribute.");
 
-   public static final SimpleString HDR_JMX_OPERATION_PREFIX = new SimpleString("JBM_JMXOperation.");
+   public static final SimpleString HDR_JMX_OPERATION_PREFIX = new SimpleString("_JBM_JMXOperation.");
 
    public static final SimpleString HDR_JMX_OPERATION_NAME = new SimpleString(HDR_JMX_OPERATION_PREFIX + "name");
 
-   public static final SimpleString HDR_JMX_OPERATION_SUCCEEDED = new SimpleString("JBM_JMXOperationSucceeded");
+   public static final SimpleString HDR_JMX_OPERATION_SUCCEEDED = new SimpleString("_JBM_JMXOperationSucceeded");
 
-   public static final SimpleString HDR_JMX_OPERATION_EXCEPTION = new SimpleString("JBM_JMXOperationException");
+   public static final SimpleString HDR_JMX_OPERATION_EXCEPTION = new SimpleString("_JBM_JMXOperationException");
 
-   public static final SimpleString HDR_NOTIFICATION_TYPE = new SimpleString("JBM_NotifType");
+   public static final SimpleString HDR_NOTIFICATION_TYPE = new SimpleString("_JBM_NotifType");
 
-   public static final SimpleString HDR_NOTIFICATION_TIMESTAMP = new SimpleString("JBM_NotifTimestamp");
+   public static final SimpleString HDR_NOTIFICATION_TIMESTAMP = new SimpleString("_JBM_NotifTimestamp");
    
-   public static final SimpleString HDR_QUEUE_NAME = new SimpleString("JBM_QueueName");
+   public static final SimpleString HDR_QUEUE_NAME = new SimpleString("_JBM_QueueName");
    
-   public static final SimpleString HDR_ADDRESS = new SimpleString("JBM_Address");
+   public static final SimpleString HDR_ADDRESS = new SimpleString("_JBM_Address");
    
-   public static final SimpleString HDR_FILTERSTRING = new SimpleString("JBM_FilterString");
+   public static final SimpleString HDR_FILTERSTRING = new SimpleString("_JBM_FilterString");
 
    // Attributes ----------------------------------------------------
 

Modified: trunk/src/main/org/jboss/messaging/core/config/Configuration.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/config/Configuration.java	2009-01-28 10:53:22 UTC (rev 5750)
+++ trunk/src/main/org/jboss/messaging/core/config/Configuration.java	2009-01-28 14:18:19 UTC (rev 5751)
@@ -29,7 +29,7 @@
 
 import org.jboss.messaging.core.config.cluster.BridgeConfiguration;
 import org.jboss.messaging.core.config.cluster.BroadcastGroupConfiguration;
-import org.jboss.messaging.core.config.cluster.ClusterConfiguration;
+import org.jboss.messaging.core.config.cluster.ClusterConnectionConfiguration;
 import org.jboss.messaging.core.config.cluster.DiscoveryGroupConfiguration;
 import org.jboss.messaging.core.config.cluster.DivertConfiguration;
 import org.jboss.messaging.core.config.cluster.QueueConfiguration;
@@ -119,9 +119,9 @@
 
    void setDivertConfigurations(final List<DivertConfiguration> configs);
    
-   List<ClusterConfiguration> getClusterConfigurations();
+   List<ClusterConnectionConfiguration> getClusterConfigurations();
 
-   void setClusterConfigurations(final List<ClusterConfiguration> configs);
+   void setClusterConfigurations(final List<ClusterConnectionConfiguration> configs);
    
    List<QueueConfiguration> getQueueConfigurations();
 

Deleted: trunk/src/main/org/jboss/messaging/core/config/cluster/ClusterConfiguration.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/config/cluster/ClusterConfiguration.java	2009-01-28 10:53:22 UTC (rev 5750)
+++ trunk/src/main/org/jboss/messaging/core/config/cluster/ClusterConfiguration.java	2009-01-28 14:18:19 UTC (rev 5751)
@@ -1,113 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2005-2009, Red Hat Middleware LLC, and individual contributors
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-
-package org.jboss.messaging.core.config.cluster;
-
-import java.io.Serializable;
-import java.util.List;
-
-import org.jboss.messaging.util.Pair;
-
-/**
- * A ClusterConfiguration
- *
- * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
- * 
- * Created 13 Jan 2009 09:42:17
- *
- *
- */
-public class ClusterConfiguration implements Serializable
-{
-   private static final long serialVersionUID = 8948303813427795935L;
-
-   private final String name;
-   
-   private final String address;
-
-   private final BridgeConfiguration bridgeConfig;
-   
-   private final boolean duplicateDetection;
-
-   private final List<Pair<String, String>> staticConnectorNamePairs;
-
-   private final String discoveryGroupName;
-
-   public ClusterConfiguration(final String name,
-                               final String address,
-                               final BridgeConfiguration bridgeConfig,
-                               final boolean duplicateDetection,
-                               final List<Pair<String, String>> staticConnectorNamePairs)
-   {
-      this.name = name;
-      this.address = address;
-      this.bridgeConfig = bridgeConfig;
-      this.staticConnectorNamePairs = staticConnectorNamePairs;
-      this.duplicateDetection = duplicateDetection;
-      this.discoveryGroupName = null;
-   }
-
-   public ClusterConfiguration(final String name,
-                               final String address,
-                               final BridgeConfiguration bridgeConfig,
-                               final boolean duplicateDetection,
-                               final String discoveryGroupName)
-   {
-      this.name = name;
-      this.address = address;
-      this.bridgeConfig = bridgeConfig;
-      this.duplicateDetection = duplicateDetection;
-      this.discoveryGroupName = discoveryGroupName;
-      this.staticConnectorNamePairs = null;
-   }
-   
-   public String getName()
-   {
-      return name;
-   }
-
-   public String getAddress()
-   {
-      return address;
-   }
-
-   public BridgeConfiguration getBridgeConfig()
-   {
-      return bridgeConfig;
-   }
-   
-   public boolean isDuplicateDetection()
-   {
-      return duplicateDetection;
-   }
-
-   public List<Pair<String, String>> getStaticConnectorNamePairs()
-   {
-      return staticConnectorNamePairs;
-   }
-
-   public String getDiscoveryGroupName()
-   {
-      return discoveryGroupName;
-   }
-
-}

Copied: trunk/src/main/org/jboss/messaging/core/config/cluster/ClusterConnectionConfiguration.java (from rev 5744, trunk/src/main/org/jboss/messaging/core/config/cluster/ClusterConfiguration.java)
===================================================================
--- trunk/src/main/org/jboss/messaging/core/config/cluster/ClusterConnectionConfiguration.java	                        (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/config/cluster/ClusterConnectionConfiguration.java	2009-01-28 14:18:19 UTC (rev 5751)
@@ -0,0 +1,124 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2009, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.messaging.core.config.cluster;
+
+import java.io.Serializable;
+import java.util.List;
+
+import org.jboss.messaging.util.Pair;
+
+/**
+ * Serializable holder of the settings for one cluster connection; all fields are final:
+ * name, address, bridge configuration, duplicateDetection and forwardWhenNoConsumers.
+ * Each constructor sets one of staticConnectorNamePairs / discoveryGroupName, other is null.
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ *
+ * Created 13 Jan 2009 09:42:17
+ */
+public class ClusterConnectionConfiguration implements Serializable
+{
+   private static final long serialVersionUID = 8948303813427795935L;
+
+   private final String name;
+
+   private final String address;
+
+   private final BridgeConfiguration bridgeConfig;
+
+   private final boolean duplicateDetection;
+   
+   private final boolean forwardWhenNoConsumers;
+
+   private final List<Pair<String, String>> staticConnectorNamePairs;
+
+   private final String discoveryGroupName;
+
+   public ClusterConnectionConfiguration(final String name,
+                                         final String address,
+                                         final BridgeConfiguration bridgeConfig,
+                                         final boolean duplicateDetection,
+                                         final boolean forwardWhenNoConsumers,
+                                         final List<Pair<String, String>> staticConnectorNamePairs)
+   {
+      this.name = name;
+      this.address = address;
+      this.bridgeConfig = bridgeConfig;
+      this.staticConnectorNamePairs = staticConnectorNamePairs;
+      this.duplicateDetection = duplicateDetection;
+      this.forwardWhenNoConsumers = forwardWhenNoConsumers;
+      this.discoveryGroupName = null;
+   }
+
+   public ClusterConnectionConfiguration(final String name,
+                                         final String address,
+                                         final BridgeConfiguration bridgeConfig,
+                                         final boolean duplicateDetection,
+                                         final boolean forwardWhenNoConsumers,
+                                         final String discoveryGroupName)
+   {
+      this.name = name;
+      this.address = address;
+      this.bridgeConfig = bridgeConfig;
+      this.duplicateDetection = duplicateDetection;
+      this.forwardWhenNoConsumers = forwardWhenNoConsumers;
+      this.discoveryGroupName = discoveryGroupName;
+      this.staticConnectorNamePairs = null;
+   }
+
+   public String getName()
+   {
+      return name;
+   }
+
+   public String getAddress()
+   {
+      return address;
+   }
+
+   public BridgeConfiguration getBridgeConfig()
+   {
+      return bridgeConfig;
+   }
+
+   public boolean isDuplicateDetection()
+   {
+      return duplicateDetection;
+   }
+   
+   public boolean isForwardWhenNoConsumers()
+   {
+      return forwardWhenNoConsumers;
+   }
+
+   public List<Pair<String, String>> getStaticConnectorNamePairs()
+   {
+      return staticConnectorNamePairs;
+   }
+
+   public String getDiscoveryGroupName()
+   {
+      return discoveryGroupName;
+   }
+
+}

Modified: trunk/src/main/org/jboss/messaging/core/config/impl/ConfigurationImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/config/impl/ConfigurationImpl.java	2009-01-28 10:53:22 UTC (rev 5750)
+++ trunk/src/main/org/jboss/messaging/core/config/impl/ConfigurationImpl.java	2009-01-28 14:18:19 UTC (rev 5751)
@@ -24,7 +24,7 @@
 import org.jboss.messaging.core.config.TransportConfiguration;
 import org.jboss.messaging.core.config.cluster.BridgeConfiguration;
 import org.jboss.messaging.core.config.cluster.BroadcastGroupConfiguration;
-import org.jboss.messaging.core.config.cluster.ClusterConfiguration;
+import org.jboss.messaging.core.config.cluster.ClusterConnectionConfiguration;
 import org.jboss.messaging.core.config.cluster.DiscoveryGroupConfiguration;
 import org.jboss.messaging.core.config.cluster.DivertConfiguration;
 import org.jboss.messaging.core.config.cluster.QueueConfiguration;
@@ -165,7 +165,7 @@
    
    protected List<DivertConfiguration> divertConfigurations = new ArrayList<DivertConfiguration>();
    
-   protected List<ClusterConfiguration> clusterConfigurations = new ArrayList<ClusterConfiguration>();
+   protected List<ClusterConnectionConfiguration> clusterConfigurations = new ArrayList<ClusterConnectionConfiguration>();
    
    protected List<QueueConfiguration> queueConfigurations = new ArrayList<QueueConfiguration>();
    
@@ -363,12 +363,12 @@
       this.broadcastGroupConfigurations = configs;
    }
 
-   public List<ClusterConfiguration> getClusterConfigurations()
+   public List<ClusterConnectionConfiguration> getClusterConfigurations()
    {
       return this.clusterConfigurations;
    }
    
-   public void setClusterConfigurations(final List<ClusterConfiguration> configs)
+   public void setClusterConfigurations(final List<ClusterConnectionConfiguration> configs)
    {
       this.clusterConfigurations = configs;
    }

Modified: trunk/src/main/org/jboss/messaging/core/filter/impl/FilterImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/filter/impl/FilterImpl.java	2009-01-28 10:53:22 UTC (rev 5750)
+++ trunk/src/main/org/jboss/messaging/core/filter/impl/FilterImpl.java	2009-01-28 14:18:19 UTC (rev 5751)
@@ -143,14 +143,14 @@
             Object val = null;
 
             if (id.getName().startsWith(JBM_PREFIX))
-            {
+            {               
                // Look it up as header fields
                val = getHeaderFieldValue(message, id.getName());
             }
 
             if (val == null)
-            {
-               val = message.getProperty(id.getName());
+            {               
+               val = message.getProperty(id.getName());               
             }
 
             id.setValue(val);
@@ -158,7 +158,7 @@
          }
 
          // Compute the result of this operator
-
+         
          boolean res = (Boolean)operator.apply();
 
          return res;

Modified: trunk/src/main/org/jboss/messaging/core/management/ManagementService.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/management/ManagementService.java	2009-01-28 10:53:22 UTC (rev 5750)
+++ trunk/src/main/org/jboss/messaging/core/management/ManagementService.java	2009-01-28 14:18:19 UTC (rev 5751)
@@ -32,7 +32,7 @@
 import org.jboss.messaging.core.config.TransportConfiguration;
 import org.jboss.messaging.core.config.cluster.BridgeConfiguration;
 import org.jboss.messaging.core.config.cluster.BroadcastGroupConfiguration;
-import org.jboss.messaging.core.config.cluster.ClusterConfiguration;
+import org.jboss.messaging.core.config.cluster.ClusterConnectionConfiguration;
 import org.jboss.messaging.core.config.cluster.DiscoveryGroupConfiguration;
 import org.jboss.messaging.core.messagecounter.MessageCounterManager;
 import org.jboss.messaging.core.persistence.StorageManager;
@@ -47,7 +47,7 @@
 import org.jboss.messaging.core.server.ServerMessage;
 import org.jboss.messaging.core.server.cluster.Bridge;
 import org.jboss.messaging.core.server.cluster.BroadcastGroup;
-import org.jboss.messaging.core.server.cluster.Cluster;
+import org.jboss.messaging.core.server.cluster.ClusterConnection;
 import org.jboss.messaging.core.settings.HierarchicalRepository;
 import org.jboss.messaging.core.settings.impl.QueueSettings;
 import org.jboss.messaging.core.transaction.ResourceManager;
@@ -103,7 +103,7 @@
 
    void unregisterBridge(String name) throws Exception;
    
-   void registerCluster(Cluster cluster, ClusterConfiguration configuration) throws Exception;
+   void registerCluster(ClusterConnection cluster, ClusterConnectionConfiguration configuration) throws Exception;
    
    void unregisterCluster(String name) throws Exception;
 

Modified: trunk/src/main/org/jboss/messaging/core/management/MessagingServerControlMBean.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/management/MessagingServerControlMBean.java	2009-01-28 10:53:22 UTC (rev 5750)
+++ trunk/src/main/org/jboss/messaging/core/management/MessagingServerControlMBean.java	2009-01-28 14:18:19 UTC (rev 5751)
@@ -166,6 +166,6 @@
 
    TabularData getConnectors() throws Exception;
    
-   void sendQueueInfoToQueue(SimpleString queueName) throws Exception;
+   void sendQueueInfoToQueue(String queueName) throws Exception;
 
 }

Modified: trunk/src/main/org/jboss/messaging/core/management/impl/ManagementServiceImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/management/impl/ManagementServiceImpl.java	2009-01-28 10:53:22 UTC (rev 5750)
+++ trunk/src/main/org/jboss/messaging/core/management/impl/ManagementServiceImpl.java	2009-01-28 14:18:19 UTC (rev 5751)
@@ -47,7 +47,7 @@
 import org.jboss.messaging.core.config.TransportConfiguration;
 import org.jboss.messaging.core.config.cluster.BridgeConfiguration;
 import org.jboss.messaging.core.config.cluster.BroadcastGroupConfiguration;
-import org.jboss.messaging.core.config.cluster.ClusterConfiguration;
+import org.jboss.messaging.core.config.cluster.ClusterConnectionConfiguration;
 import org.jboss.messaging.core.config.cluster.DiscoveryGroupConfiguration;
 import org.jboss.messaging.core.logging.Logger;
 import org.jboss.messaging.core.management.AcceptorControlMBean;
@@ -77,7 +77,7 @@
 import org.jboss.messaging.core.server.ServerMessage;
 import org.jboss.messaging.core.server.cluster.Bridge;
 import org.jboss.messaging.core.server.cluster.BroadcastGroup;
-import org.jboss.messaging.core.server.cluster.Cluster;
+import org.jboss.messaging.core.server.cluster.ClusterConnection;
 import org.jboss.messaging.core.server.impl.ServerMessageImpl;
 import org.jboss.messaging.core.settings.HierarchicalRepository;
 import org.jboss.messaging.core.settings.impl.QueueSettings;
@@ -288,6 +288,7 @@
       
       TypedProperties props = new TypedProperties();
       
+      log.info("registering queue with address "+ address);
       props.putStringProperty(ManagementHelper.HDR_ADDRESS, address);
       props.putStringProperty(ManagementHelper.HDR_QUEUE_NAME, queue.getName());
       
@@ -431,7 +432,7 @@
       unregisterFromJMX(objectName);
    }
    
-   public void registerCluster(final Cluster cluster, final ClusterConfiguration configuration) throws Exception
+   public void registerCluster(final ClusterConnection cluster, final ClusterConnectionConfiguration configuration) throws Exception
    {      
       //TODO
    }

Modified: trunk/src/main/org/jboss/messaging/core/management/impl/MessagingServerControl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/management/impl/MessagingServerControl.java	2009-01-28 10:53:22 UTC (rev 5750)
+++ trunk/src/main/org/jboss/messaging/core/management/impl/MessagingServerControl.java	2009-01-28 14:18:19 UTC (rev 5751)
@@ -54,7 +54,7 @@
 import org.jboss.messaging.core.persistence.StorageManager;
 import org.jboss.messaging.core.postoffice.Binding;
 import org.jboss.messaging.core.postoffice.PostOffice;
-import org.jboss.messaging.core.postoffice.impl.BindingImpl;
+import org.jboss.messaging.core.postoffice.impl.LocalQueueBinding;
 import org.jboss.messaging.core.remoting.RemotingConnection;
 import org.jboss.messaging.core.remoting.RemotingService;
 import org.jboss.messaging.core.server.MessagingServer;
@@ -296,7 +296,7 @@
       if (postOffice.getBinding(sName) == null)
       {
          Queue queue = queueFactory.createQueue(-1, sName, null, true, false);
-         Binding binding = new BindingImpl(sAddress, sName, sName, queue, false, true);
+         Binding binding = new LocalQueueBinding(sAddress, queue);
          storageManager.addQueueBinding(binding);
          postOffice.addBinding(binding);
       }
@@ -315,7 +315,7 @@
       if (postOffice.getBinding(sName) == null)
       {
          Queue queue = queueFactory.createQueue(-1, sName, filter, durable, false);
-         Binding binding = new BindingImpl(sAddress, sName, sName, queue, false, true);
+         Binding binding = new LocalQueueBinding(sAddress, queue);
          if (durable)
          {
             storageManager.addQueueBinding(binding);
@@ -544,9 +544,9 @@
       return TransportConfigurationInfo.toTabularData(connectorConfigurations);
    }
    
-   public void sendQueueInfoToQueue(final SimpleString queueName) throws Exception
+   public void sendQueueInfoToQueue(final String queueName) throws Exception
    {
-      postOffice.sendQueueInfoToQueue(queueName);
+      postOffice.sendQueueInfoToQueue(new SimpleString(queueName));
    }
 
    // NotificationEmitter implementation ----------------------------

Modified: trunk/src/main/org/jboss/messaging/core/management/jmx/impl/ReplicationAwareMessagingServerControlWrapper.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/management/jmx/impl/ReplicationAwareMessagingServerControlWrapper.java	2009-01-28 10:53:22 UTC (rev 5750)
+++ trunk/src/main/org/jboss/messaging/core/management/jmx/impl/ReplicationAwareMessagingServerControlWrapper.java	2009-01-28 14:18:19 UTC (rev 5751)
@@ -239,7 +239,7 @@
       return localControl.getConnectors();
    }
 
-   public void sendQueueInfoToQueue(final SimpleString queueName) throws Exception
+   public void sendQueueInfoToQueue(final String queueName) throws Exception
    {
       localControl.sendQueueInfoToQueue(queueName);
    }

Modified: trunk/src/main/org/jboss/messaging/core/message/impl/MessageImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/message/impl/MessageImpl.java	2009-01-28 10:53:22 UTC (rev 5750)
+++ trunk/src/main/org/jboss/messaging/core/message/impl/MessageImpl.java	2009-01-28 14:18:19 UTC (rev 5751)
@@ -55,24 +55,20 @@
 
    private static final Logger log = Logger.getLogger(MessageImpl.class);
 
-   public static final SimpleString HDR_ACTUAL_EXPIRY_TIME = new SimpleString("JBM_ACTUAL_EXPIRY");
+   public static final SimpleString HDR_ACTUAL_EXPIRY_TIME = new SimpleString("_JBM_ACTUAL_EXPIRY");
 
-   public static final SimpleString HDR_ORIGINAL_DESTINATION = new SimpleString("JBM_ORIG_DESTINATION");
+   public static final SimpleString HDR_ORIGINAL_DESTINATION = new SimpleString("_JBM_ORIG_DESTINATION");
 
-   public static final SimpleString HDR_ORIG_MESSAGE_ID = new SimpleString("JBM_ORIG_MESSAGE_ID");
+   public static final SimpleString HDR_ORIG_MESSAGE_ID = new SimpleString("_JBM_ORIG_MESSAGE_ID");
 
-   public static final SimpleString HDR_GROUP_ID = new SimpleString("JBM_GROUP_ID");
+   public static final SimpleString HDR_GROUP_ID = new SimpleString("_JBM_GROUP_ID");
 
-   public static final SimpleString HDR_SCHEDULED_DELIVERY_TIME = new SimpleString("JBM_SCHED_DELIVERY");
+   public static final SimpleString HDR_SCHEDULED_DELIVERY_TIME = new SimpleString("_JBM_SCHED_DELIVERY");
    
-   public static final SimpleString HDR_DUPLICATE_DETECTION_ID = new SimpleString("JBM_DUPL_ID");
-   
-   public static final SimpleString HDR_ROUTE_TO_PREFIX = new SimpleString("JBM_ROUTE_TO:");
+   public static final SimpleString HDR_DUPLICATE_DETECTION_ID = new SimpleString("_JBM_DUPL_ID");
+
+   public static final SimpleString HDR_EXCLUDED_QUEUES = new SimpleString("_JBM_EXCLUDED_QUEUES");
       
-   public static final SimpleString HDR_RESET_QUEUE_DATA = new SimpleString("JBM_RESET_QUEUE_DATA");
-      
-   public static final SimpleString HDR_FROM_CLUSTER = new SimpleString("JBM_FROM_CLUSTER");
-   
    // Attributes ----------------------------------------------------
 
    protected long messageID;

Modified: trunk/src/main/org/jboss/messaging/core/postoffice/AddressManager.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/postoffice/AddressManager.java	2009-01-28 10:53:22 UTC (rev 5750)
+++ trunk/src/main/org/jboss/messaging/core/postoffice/AddressManager.java	2009-01-28 14:18:19 UTC (rev 5751)
@@ -37,7 +37,7 @@
 {
    boolean addBinding(Binding binding);
 
-   Binding removeBinding(SimpleString queueName);
+   Binding removeBinding(SimpleString uniqueName);
 
    Bindings getBindings(SimpleString address);
 

Modified: trunk/src/main/org/jboss/messaging/core/postoffice/Binding.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/postoffice/Binding.java	2009-01-28 10:53:22 UTC (rev 5750)
+++ trunk/src/main/org/jboss/messaging/core/postoffice/Binding.java	2009-01-28 14:18:19 UTC (rev 5751)
@@ -45,7 +45,9 @@
 
    SimpleString getRoutingName();
 
-   boolean accept(ServerMessage message) throws Exception;
+   boolean filterMatches(ServerMessage message) throws Exception;
+   
+   boolean isHighAcceptPriority(ServerMessage message);
 
    boolean isExclusive();
 }

Deleted: trunk/src/main/org/jboss/messaging/core/postoffice/impl/BindingImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/postoffice/impl/BindingImpl.java	2009-01-28 10:53:22 UTC (rev 5750)
+++ trunk/src/main/org/jboss/messaging/core/postoffice/impl/BindingImpl.java	2009-01-28 14:18:19 UTC (rev 5751)
@@ -1,109 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2005-2009, Red Hat Middleware LLC, and individual contributors
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-
-package org.jboss.messaging.core.postoffice.impl;
-
-import org.jboss.messaging.core.filter.Filter;
-import org.jboss.messaging.core.postoffice.Binding;
-import org.jboss.messaging.core.server.Bindable;
-import org.jboss.messaging.core.server.ServerMessage;
-import org.jboss.messaging.util.SimpleString;
-
-/**
- * A BindingImpl
- *
- * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
- * 
- * Created 21 Jan 2009 18:52:15
- *
- *
- */
-public class BindingImpl implements Binding
-{
-   private final SimpleString address;
-
-   private final SimpleString uniqueName;
-
-   private final SimpleString routingName;
-
-   protected final Bindable bindable;
-
-   private final boolean exclusive;
-
-   private final boolean isQueue;
-
-   public BindingImpl(final SimpleString address,
-                      final SimpleString uniqueName,
-                      final SimpleString routingName,
-                      final Bindable bindable,                      
-                      final boolean exclusive,
-                      final boolean isQueue)
-   {
-      this.address = address;
-
-      this.uniqueName = uniqueName;
-
-      this.routingName = routingName;
-
-      this.bindable = bindable;
-
-      this.exclusive = exclusive;
-
-      this.isQueue = isQueue;
-   }
-
-   public SimpleString getAddress()
-   {
-      return address;
-   }
-
-   public Bindable getBindable()
-   {
-      return bindable;
-   }
-
-   public boolean isQueueBinding()
-   {
-      return isQueue;
-   }
-
-   public boolean accept(final ServerMessage message) throws Exception
-   {
-      return bindable.accept(message);
-   }
-
-   public SimpleString getRoutingName()
-   {
-      return routingName;
-   }
-
-   public SimpleString getUniqueName()
-   {
-      return uniqueName;
-   }
-
-   public boolean isExclusive()
-   {
-      return exclusive;
-   }
-
-}

Modified: trunk/src/main/org/jboss/messaging/core/postoffice/impl/BindingsImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/postoffice/impl/BindingsImpl.java	2009-01-28 10:53:22 UTC (rev 5750)
+++ trunk/src/main/org/jboss/messaging/core/postoffice/impl/BindingsImpl.java	2009-01-28 14:18:19 UTC (rev 5751)
@@ -48,17 +48,17 @@
  *
  */
 public class BindingsImpl implements Bindings
-{   
+{
    private static final Logger log = Logger.getLogger(BindingsImpl.class);
 
    private final ConcurrentMap<SimpleString, List<Binding>> routingNameBindingMap = new ConcurrentHashMap<SimpleString, List<Binding>>();
-   
+
    private final Map<SimpleString, Integer> routingNamePositions = new ConcurrentHashMap<SimpleString, Integer>();
-   
+
    private final List<Binding> bindingsList = new CopyOnWriteArrayList<Binding>();
-   
+
    private final List<Binding> exclusiveBindings = new CopyOnWriteArrayList<Binding>();
-   
+
    public List<Binding> getBindings()
    {
       return bindingsList;
@@ -73,27 +73,27 @@
       else
       {
          SimpleString routingName = binding.getRoutingName();
-         
+
          List<Binding> bindings = routingNameBindingMap.get(routingName);
-         
+
          if (bindings == null)
          {
             bindings = new CopyOnWriteArrayList<Binding>();
-            
+
             List<Binding> oldBindings = routingNameBindingMap.putIfAbsent(routingName, bindings);
-            
+
             if (oldBindings != null)
             {
                bindings = oldBindings;
             }
          }
-         
+
          bindings.add(binding);
       }
-      
+
       bindingsList.add(binding);
    }
-   
+
    public void removeBinding(final Binding binding)
    {
       if (binding.isExclusive())
@@ -103,33 +103,33 @@
       else
       {
          SimpleString routingName = binding.getRoutingName();
-         
+
          List<Binding> bindings = routingNameBindingMap.get(routingName);
-         
+
          if (bindings != null)
          {
             bindings.remove(binding);
-            
+
             if (bindings.isEmpty())
             {
                routingNameBindingMap.remove(routingName);
             }
          }
       }
-      
+
       bindingsList.remove(binding);
    }
-   
+
    public void route(ServerMessage message) throws Exception
    {
       route(message, null);
    }
-   
+
    public void route(ServerMessage message, Transaction tx) throws Exception
    {
       if (!exclusiveBindings.isEmpty())
       {
-         for (Binding binding: exclusiveBindings)
+         for (Binding binding : exclusiveBindings)
          {
             binding.getBindable().route(message, tx);
          }
@@ -137,42 +137,46 @@
       else
       {
          Set<Bindable> chosen = new HashSet<Bindable>();
-          
-         for (Map.Entry<SimpleString, List<Binding>> entry: routingNameBindingMap.entrySet())
+
+         for (Map.Entry<SimpleString, List<Binding>> entry : routingNameBindingMap.entrySet())
          {
             SimpleString routingName = entry.getKey();
-            
+
             List<Binding> bindings = entry.getValue();
-            
+
             if (bindings == null)
             {
-               //The value can become null if it's concurrently removed while we're iterating - this is expected ConcurrentHashMap behaviour!
+               // The value can become null if it's concurrently removed while we're iterating - this is expected
+               // ConcurrentHashMap behaviour!
                continue;
             }
-               
+
             Integer ipos = routingNamePositions.get(routingName);
-            
+
             int pos = ipos != null ? ipos.intValue() : 0;
-              
+
+            int length = bindings.size();
+
             int startPos = pos;
-            
-            int length = bindings.size();
-              
-            do
-            {               
+
+            Binding theBinding = null;
+
+            int lastNoMatchingConsumerPos = -1;
+
+            while (true)
+            {
                Binding binding;
-               
                try
                {
                   binding = bindings.get(pos);
                }
                catch (IndexOutOfBoundsException e)
                {
-                  //This can occur if binding is removed while in route
+                  // This can occur if binding is removed while in route
                   if (!bindings.isEmpty())
                   {
                      pos = 0;
-                     
+
                      continue;
                   }
                   else
@@ -180,34 +184,94 @@
                      break;
                   }
                }
-               
-               pos++;
-               
-               if (pos == length)
+
+               if (binding.filterMatches(message))
                {
-                  pos = 0;
+                  // bindings.length == 1 ==> only a local queue so we don't check for matching consumers (it's an
+                  // unnecessary overhead)
+                  if (length == 1 || binding.isHighAcceptPriority(message))
+                  {
+                     theBinding = binding;
+
+                     pos = incrementPos(pos, length);
+
+                     break;
+                  }
+                  else
+                  {
+                     lastNoMatchingConsumerPos = pos;
+                  }
                }
-               
-               if (binding.accept(message))
+
+               pos = incrementPos(pos, length);
+
+               if (pos == startPos)
                {
-                  chosen.add(binding.getBindable());
-                   
+                  if (lastNoMatchingConsumerPos != -1)
+                  {                     
+                     try
+                     {
+                        theBinding = bindings.get(pos);
+                     }
+                     catch (IndexOutOfBoundsException e)
+                     {
+                        // This can occur if binding is removed while in route
+                        if (!bindings.isEmpty())
+                        {
+                           pos = 0;
+                           
+                           lastNoMatchingConsumerPos = -1;
+
+                           continue;
+                        }
+                        else
+                        {
+                           break;
+                        }
+                     }
+                                         
+                     pos = lastNoMatchingConsumerPos;
+
+                     pos = incrementPos(pos, length);
+                  }
                   break;
                }
             }
-            while (startPos != pos);
-            
-            if (pos != startPos)
+
+            if (theBinding != null)
             {
-               routingNamePositions.put(routingName, pos);
+               chosen.add(theBinding.getBindable());
             }
+
+            routingNamePositions.put(routingName, pos);
+
          }
-                     
-         for (Bindable bindable: chosen)
+
+         //TODO refactor to do this in one iteration
+         
+         for (Bindable bindable : chosen)
          {
+            bindable.preroute(message, tx);
+         }
+         
+         for (Bindable bindable : chosen)
+         {
             bindable.route(message, tx);
          }
-      }      
+      }
+
    }
-   
+
+   private final int incrementPos(int pos, int length)
+   {
+      pos++;
+
+      if (pos == length)
+      {
+         pos = 0;
+      }
+
+      return pos;
+   }
+
 }

Added: trunk/src/main/org/jboss/messaging/core/postoffice/impl/DivertBinding.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/postoffice/impl/DivertBinding.java	                        (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/postoffice/impl/DivertBinding.java	2009-01-28 14:18:19 UTC (rev 5751)
@@ -0,0 +1,119 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2009, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+
+package org.jboss.messaging.core.postoffice.impl;
+
+import org.jboss.messaging.core.filter.Filter;
+import org.jboss.messaging.core.postoffice.Binding;
+import org.jboss.messaging.core.server.Bindable;
+import org.jboss.messaging.core.server.Divert;
+import org.jboss.messaging.core.server.ServerMessage;
+import org.jboss.messaging.util.SimpleString;
+
+/**
+ * A DivertBinding
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * 
+ * Created 28 Jan 2009 12:42:23
+ *
+ *
+ */
+public class DivertBinding implements Binding
+{
+   private final SimpleString address;
+   
+   private final Divert divert;
+   
+   private final Filter filter;
+   
+   private final SimpleString uniqueName;
+   
+   private final SimpleString routingName;
+   
+   private final boolean exclusive;
+   
+   public DivertBinding(final SimpleString address, final Divert divert)
+   {
+      this.address = address;
+      
+      this.divert = divert;
+      
+      this.filter = divert.getFilter();
+      
+      this.uniqueName = divert.getUniqueName();
+      
+      this.routingName = divert.getRoutingName();
+      
+      this.exclusive = divert.isExclusive();
+   }
+      
+   public boolean filterMatches(final ServerMessage message) throws Exception
+   {
+      if (filter != null && !filter.match(message))
+      {
+         return false;
+      }
+      else
+      {
+         return true;
+      }
+   }
+
+   public SimpleString getAddress()
+   {
+      return address;
+   }
+
+   public Bindable getBindable()
+   {
+      return divert;
+   }
+
+   public SimpleString getRoutingName()
+   {
+      return routingName;
+   }
+
+   public SimpleString getUniqueName()
+   {
+      return uniqueName;
+   }
+
+   public boolean isExclusive()
+   {
+      return exclusive;
+   }
+
+   public boolean isHighAcceptPriority(final ServerMessage message)
+   {
+      return true;
+   }
+
+   public boolean isQueueBinding()
+   {
+      return false;
+   }
+
+}
+

Added: trunk/src/main/org/jboss/messaging/core/postoffice/impl/LocalQueueBinding.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/postoffice/impl/LocalQueueBinding.java	                        (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/postoffice/impl/LocalQueueBinding.java	2009-01-28 14:18:19 UTC (rev 5751)
@@ -0,0 +1,134 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2009, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+
+package org.jboss.messaging.core.postoffice.impl;
+
+import java.util.List;
+
+import org.jboss.messaging.core.filter.Filter;
+import org.jboss.messaging.core.postoffice.Binding;
+import org.jboss.messaging.core.server.Bindable;
+import org.jboss.messaging.core.server.Consumer;
+import org.jboss.messaging.core.server.Queue;
+import org.jboss.messaging.core.server.ServerMessage;
+import org.jboss.messaging.util.SimpleString;
+
+/**
+ * A LocalQueueBinding
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * 
+ * Created 28 Jan 2009 12:42:23
+ *
+ *
+ */
+public class LocalQueueBinding implements Binding
+{
+   private final SimpleString address;
+   
+   private final Queue queue;
+   
+   private final Filter filter;
+   
+   private final SimpleString name;
+   
+   public LocalQueueBinding(final SimpleString address, final Queue queue)
+   {
+      this.address = address;
+      
+      this.queue = queue;
+      
+      this.filter = queue.getFilter();
+      
+      this.name = queue.getName();
+   }
+      
+   public boolean filterMatches(final ServerMessage message) throws Exception
+   {
+      if (filter != null && !filter.match(message))
+      {
+         return false;
+      }
+      else
+      {
+         return true;
+      }
+   }
+
+   public SimpleString getAddress()
+   {
+      return address;
+   }
+
+   public Bindable getBindable()
+   {
+      return queue;
+   }
+
+   public SimpleString getRoutingName()
+   {
+      return name;
+   }
+
+   public SimpleString getUniqueName()
+   {
+      return name;
+   }
+
+   public boolean isExclusive()
+   {
+      return false;
+   }
+
+   public boolean isHighAcceptPriority(final ServerMessage message)
+   {
+      //It's a high accept priority if the queue has at least one matching consumer
+      
+      List<Consumer> consumers = queue.getConsumers();
+      
+      for (Consumer consumer: consumers)
+      {
+         Filter filter = consumer.getFilter();
+         
+         if (filter == null)
+         {
+            return true;
+         }
+         else
+         {
+            if (filter.match(message))
+            {
+               return true;
+            }
+         }
+      }
+      
+      return false;
+   }
+
+   public boolean isQueueBinding()
+   {
+      return true;
+   }
+
+}

Modified: trunk/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java	2009-01-28 10:53:22 UTC (rev 5750)
+++ trunk/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java	2009-01-28 14:18:19 UTC (rev 5751)
@@ -80,7 +80,9 @@
 public class PostOfficeImpl implements PostOffice, NotificationListener
 {
    private static final Logger log = Logger.getLogger(PostOfficeImpl.class);
-
+   
+   public static final SimpleString HDR_RESET_QUEUE_DATA = new SimpleString("_JBM_RESET_QUEUE_DATA");
+   
    private static final List<MessageReference> emptyList = Collections.<MessageReference> emptyList();
 
    private final AddressManager addressManager;
@@ -300,7 +302,7 @@
       
       message.setDestination(queueName);
       
-      message.putStringProperty(ManagementHelper.HDR_NOTIFICATION_TYPE, new SimpleString(NotificationType.QUEUE_CREATED.toString()));        
+      message.putStringProperty(ManagementHelper.HDR_NOTIFICATION_TYPE, new SimpleString(type.toString()));        
       message.putLongProperty(ManagementHelper.HDR_NOTIFICATION_TIMESTAMP, System.currentTimeMillis());
       
       return message;
@@ -327,19 +329,20 @@
          ServerMessage message = new ServerMessageImpl(storageManager.generateUniqueID()); 
          message.setBody(new ByteBufferWrapper(ByteBuffer.allocate(0)));
          message.setDestination(queueName);
-         message.putBooleanProperty(MessageImpl.HDR_RESET_QUEUE_DATA, true);
+         message.putBooleanProperty(HDR_RESET_QUEUE_DATA, true);
          
-         queue.accept(message);            
+         queue.preroute(message, null);            
          queue.route(message, null);
                   
          for (QueueInfo info: queueInfos.values())
          {
+            log.info("creatign queue created message");
             message = createQueueInfoMessage(NotificationType.QUEUE_CREATED, queueName);
             
             message.putStringProperty(ManagementHelper.HDR_ADDRESS, info.getAddress());
             message.putStringProperty(ManagementHelper.HDR_QUEUE_NAME, info.getQueueName());
             
-            queue.accept(message);            
+            queue.preroute(message, null);            
             queue.route(message, null);
             
             int consumersWithFilters = info.getFilterStrings() != null ? info.getFilterStrings().size() : 0;
@@ -350,7 +353,7 @@
                
                message.putStringProperty(ManagementHelper.HDR_QUEUE_NAME, info.getQueueName()); 
                
-               queue.accept(message);            
+               queue.preroute(message, null);            
                queue.route(message, null);
             }
             
@@ -363,7 +366,7 @@
                   message.putStringProperty(ManagementHelper.HDR_QUEUE_NAME, info.getQueueName());
                   message.putStringProperty(ManagementHelper.HDR_FILTERSTRING, filterString); 
                   
-                  queue.accept(message);            
+                  queue.preroute(message, null);            
                   queue.route(message, null);
                }
             }           

Modified: trunk/src/main/org/jboss/messaging/core/postoffice/impl/SimpleAddressManager.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/postoffice/impl/SimpleAddressManager.java	2009-01-28 10:53:22 UTC (rev 5750)
+++ trunk/src/main/org/jboss/messaging/core/postoffice/impl/SimpleAddressManager.java	2009-01-28 14:18:19 UTC (rev 5751)
@@ -57,16 +57,17 @@
       return addMappingInternal(binding.getAddress(), binding);
    }
 
-   public Binding removeBinding(final SimpleString bindableName)
+   public Binding removeBinding(final SimpleString uniqueName)
    {
-      Binding binding = nameMap.remove(bindableName);
+      Binding binding = nameMap.remove(uniqueName);
 
       if (binding == null)
       {
-         throw new IllegalStateException("Queue is not bound " + bindableName);
+         throw new IllegalStateException("Queue is not bound " + uniqueName);
       }
 
-      removeBindingInternal(binding.getAddress(), bindableName);
+      removeBindingInternal(binding.getAddress(), uniqueName);
+      
       return binding;
    }
 

Modified: trunk/src/main/org/jboss/messaging/core/postoffice/impl/WildcardAddressManager.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/postoffice/impl/WildcardAddressManager.java	2009-01-28 10:53:22 UTC (rev 5750)
+++ trunk/src/main/org/jboss/messaging/core/postoffice/impl/WildcardAddressManager.java	2009-01-28 14:18:19 UTC (rev 5751)
@@ -125,12 +125,12 @@
     * If the address is a wild card then the binding will be removed from the actual mappings for any linked address.
     * otherwise it will be removed as normal.
     *
-    * @param bindableName the name of the queue for the binding to remove
+    * @param uniqueName the name of the binding to remove
     * @return true if this was the last mapping for a specific address
     */
-   public Binding removeBinding(final SimpleString bindableName)
+   public Binding removeBinding(final SimpleString uniqueName)
    {
-      Binding binding = super.removeBinding(bindableName);
+      Binding binding = super.removeBinding(uniqueName);
       if (binding != null)
       {
          Address add = getAddress(binding.getAddress());
@@ -152,7 +152,7 @@
          {
             for (Address destination : add.getLinkedAddresses())
             {
-               super.removeBindingInternal(destination.getAddress(), bindableName);
+               super.removeBindingInternal(destination.getAddress(), uniqueName);
             }
          }
          removeAndUpdateAddressMap(add);

Modified: trunk/src/main/org/jboss/messaging/core/server/Bindable.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/Bindable.java	2009-01-28 10:53:22 UTC (rev 5750)
+++ trunk/src/main/org/jboss/messaging/core/server/Bindable.java	2009-01-28 14:18:19 UTC (rev 5751)
@@ -35,8 +35,8 @@
  */
 public interface Bindable
 {
+   void preroute(ServerMessage message, Transaction tx) throws Exception;
+   
    void route(ServerMessage message, Transaction tx) throws Exception;
-   
-   boolean accept(ServerMessage message) throws Exception;
 }
 

Modified: trunk/src/main/org/jboss/messaging/core/server/Consumer.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/Consumer.java	2009-01-28 10:53:22 UTC (rev 5750)
+++ trunk/src/main/org/jboss/messaging/core/server/Consumer.java	2009-01-28 14:18:19 UTC (rev 5751)
@@ -22,7 +22,7 @@
 
 package org.jboss.messaging.core.server;
 
-import org.jboss.messaging.util.SimpleString;
+import org.jboss.messaging.core.filter.Filter;
 
 
 
@@ -37,5 +37,5 @@
 {
    HandleStatus handle(MessageReference reference) throws Exception;
    
-   SimpleString getFilterString();
+   Filter getFilter();
 }

Modified: trunk/src/main/org/jboss/messaging/core/server/Divert.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/Divert.java	2009-01-28 10:53:22 UTC (rev 5750)
+++ trunk/src/main/org/jboss/messaging/core/server/Divert.java	2009-01-28 14:18:19 UTC (rev 5751)
@@ -23,8 +23,11 @@
 
 package org.jboss.messaging.core.server;
 
+import org.jboss.messaging.core.filter.Filter;
+import org.jboss.messaging.util.SimpleString;
 
 
+
 /**
  * A Divert
  *
@@ -36,5 +39,11 @@
  */
 public interface Divert extends Bindable
 {
-   //boolean isExclusive();
+   Filter getFilter();
+   
+   boolean isExclusive();
+   
+   SimpleString getUniqueName();
+   
+   SimpleString getRoutingName();
 }

Modified: trunk/src/main/org/jboss/messaging/core/server/cluster/Bridge.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/cluster/Bridge.java	2009-01-28 10:53:22 UTC (rev 5750)
+++ trunk/src/main/org/jboss/messaging/core/server/cluster/Bridge.java	2009-01-28 14:18:19 UTC (rev 5751)
@@ -48,8 +48,6 @@
 
    long getMaxBatchTime();
 
-   SimpleString getFilterString();
-
    SimpleString getForwardingAddress();
 
    Transformer getTransformer();

Deleted: trunk/src/main/org/jboss/messaging/core/server/cluster/Cluster.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/cluster/Cluster.java	2009-01-28 10:53:22 UTC (rev 5750)
+++ trunk/src/main/org/jboss/messaging/core/server/cluster/Cluster.java	2009-01-28 14:18:19 UTC (rev 5751)
@@ -1,41 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2005-2009, Red Hat Middleware LLC, and individual contributors
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-
-
-package org.jboss.messaging.core.server.cluster;
-
-import org.jboss.messaging.core.server.MessagingComponent;
-import org.jboss.messaging.util.SimpleString;
-
-/**
- * A Cluster
- *
- * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
- * 
- * Created 23 Jan 2009 14:51:55
- *
- *
- */
-public interface Cluster extends MessagingComponent
-{
-   SimpleString getName();
-}

Copied: trunk/src/main/org/jboss/messaging/core/server/cluster/ClusterConnection.java (from rev 5744, trunk/src/main/org/jboss/messaging/core/server/cluster/Cluster.java)
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/cluster/ClusterConnection.java	                        (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/server/cluster/ClusterConnection.java	2009-01-28 14:18:19 UTC (rev 5751)
@@ -0,0 +1,41 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2009, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+
+package org.jboss.messaging.core.server.cluster;
+
+import org.jboss.messaging.core.server.MessagingComponent;
+import org.jboss.messaging.util.SimpleString;
+
+/**
+ * A ClusterConnection
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * 
+ * Created 23 Jan 2009 14:51:55
+ *
+ *
+ */
+public interface ClusterConnection extends MessagingComponent
+{
+   SimpleString getName();
+}

Deleted: trunk/src/main/org/jboss/messaging/core/server/cluster/FlowBinding.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/cluster/FlowBinding.java	2009-01-28 10:53:22 UTC (rev 5750)
+++ trunk/src/main/org/jboss/messaging/core/server/cluster/FlowBinding.java	2009-01-28 14:18:19 UTC (rev 5751)
@@ -1,44 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2005-2009, Red Hat Middleware LLC, and individual contributors
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-
-
-package org.jboss.messaging.core.server.cluster;
-
-import org.jboss.messaging.core.postoffice.Binding;
-import org.jboss.messaging.util.SimpleString;
-
-/**
- * A FlowBinding
- *
- * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
- * 
- * Created 23 Jan 2009 11:58:05
- *
- *
- */
-public interface FlowBinding extends Binding
-{
-   void addConsumer(SimpleString filterString) throws Exception;
-   
-   void removeConsumer(SimpleString filterString) throws Exception;
-
-}

Copied: trunk/src/main/org/jboss/messaging/core/server/cluster/RemoteQueueBinding.java (from rev 5750, trunk/src/main/org/jboss/messaging/core/server/cluster/FlowBinding.java)
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/cluster/RemoteQueueBinding.java	                        (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/server/cluster/RemoteQueueBinding.java	2009-01-28 14:18:19 UTC (rev 5751)
@@ -0,0 +1,44 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2009, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+
+package org.jboss.messaging.core.server.cluster;
+
+import org.jboss.messaging.core.postoffice.Binding;
+import org.jboss.messaging.util.SimpleString;
+
+/**
+ * A RemoteQueueBinding
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * 
+ * Created 23 Jan 2009 11:58:05
+ *
+ *
+ */
+public interface RemoteQueueBinding extends Binding
+{
+   /**
+    * Tells this binding that a consumer has been created.
+    * In this revision ClusterConnectionImpl calls it when a CONSUMER_CREATED
+    * notification arrives for the queue the binding was registered under.
+    *
+    * @param filterString the filter string carried by the notification
+    *                     (presumably null when the consumer has no filter - confirm against the implementation)
+    * @throws Exception if the implementation fails to record the consumer
+    */
+   void addConsumer(SimpleString filterString) throws Exception;
+   
+   /**
+    * Tells this binding that a consumer has been closed.
+    * In this revision ClusterConnectionImpl calls it when a CONSUMER_CLOSED
+    * notification arrives for the queue the binding was registered under.
+    *
+    * @param filterString the filter string carried by the notification
+    *                     (presumably null when the consumer had no filter - confirm against the implementation)
+    * @throws Exception if the implementation fails to remove the consumer
+    */
+   void removeConsumer(SimpleString filterString) throws Exception;
+
+}

Modified: trunk/src/main/org/jboss/messaging/core/server/cluster/impl/BridgeImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/cluster/impl/BridgeImpl.java	2009-01-28 10:53:22 UTC (rev 5750)
+++ trunk/src/main/org/jboss/messaging/core/server/cluster/impl/BridgeImpl.java	2009-01-28 14:18:19 UTC (rev 5751)
@@ -24,11 +24,7 @@
 
 import static org.jboss.messaging.core.config.impl.ConfigurationImpl.DEFAULT_MANAGEMENT_NOTIFICATION_ADDRESS;
 
-import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
 import java.util.concurrent.Executor;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
@@ -44,6 +40,7 @@
 import org.jboss.messaging.core.client.impl.ClientSessionImpl;
 import org.jboss.messaging.core.client.management.impl.ManagementHelper;
 import org.jboss.messaging.core.config.TransportConfiguration;
+import org.jboss.messaging.core.config.impl.ConfigurationImpl;
 import org.jboss.messaging.core.exception.MessagingException;
 import org.jboss.messaging.core.filter.Filter;
 import org.jboss.messaging.core.filter.impl.FilterImpl;
@@ -51,7 +48,6 @@
 import org.jboss.messaging.core.management.NotificationType;
 import org.jboss.messaging.core.management.impl.ManagementServiceImpl;
 import org.jboss.messaging.core.persistence.StorageManager;
-import org.jboss.messaging.core.postoffice.QueueInfo;
 import org.jboss.messaging.core.remoting.FailureListener;
 import org.jboss.messaging.core.remoting.RemotingConnection;
 import org.jboss.messaging.core.server.HandleStatus;
@@ -65,7 +61,6 @@
 import org.jboss.messaging.util.Future;
 import org.jboss.messaging.util.Pair;
 import org.jboss.messaging.util.SimpleString;
-import org.jboss.messaging.util.TypedProperties;
 import org.jboss.messaging.util.UUIDGenerator;
 
 /**
@@ -168,6 +163,7 @@
                      final MessageHandler queueInfoMessageHandler,
                      final String queueDataAddress) throws Exception
    {
+      log.info("Creating new bridge " + name + " queue " + queue);
       this.name = name;
 
       this.queue = queue;
@@ -209,6 +205,8 @@
 
       this.queueInfoMessageHandler = queueInfoMessageHandler;
       
+      log.info("queue info handler " + this.queueInfoMessageHandler);
+      
       this.queueDataAddress = queueDataAddress;
 
       if (maxBatchTime != -1)
@@ -242,6 +240,7 @@
       {
          try
          {
+            log.info("creating objects");
             createTx();
 
             queue.addConsumer(BridgeImpl.this);
@@ -263,22 +262,23 @@
             {
                // Get the queue data
 
-               SimpleString notifQueueName = UUIDGenerator.getInstance().generateSimpleStringUUID();
+               SimpleString notifQueueName = new SimpleString("notif-").concat(UUIDGenerator.getInstance().generateSimpleStringUUID());
 
-               SimpleString filter = new SimpleString(ManagementHelper.HDR_ADDRESS + " LIKE '" + queueDataAddress + "' AND " +                                                                                                           
+               SimpleString filter = new SimpleString(                                                                                                         
                                                       ManagementHelper.HDR_NOTIFICATION_TYPE + " IN (" +
                                                       "'" +
                                                       NotificationType.QUEUE_CREATED +
+                                                      "'," +
                                                       "'" +
-                                                      "'" +
                                                       NotificationType.QUEUE_DESTROYED +
+                                                      "'," +
                                                       "'" +
-                                                      "'" +
                                                       NotificationType.CONSUMER_CREATED +
+                                                      "'," +
                                                       "'" +
-                                                      "'" +
                                                       NotificationType.CONSUMER_CLOSED +
-                                                      "'");
+                                                      "') AND " +                                                     
+                                                      ManagementHelper.HDR_ADDRESS + " LIKE '" + queueDataAddress + "%'");
                
                session.createQueue(DEFAULT_MANAGEMENT_NOTIFICATION_ADDRESS, notifQueueName, filter, false, true);
 
@@ -293,8 +293,14 @@
                ManagementHelper.putOperationInvocation(message,
                                                        ManagementServiceImpl.getMessagingServerObjectName(),
                                                        "sendQueueInfoToQueue",
-                                                       notifQueueName);
+                                                       notifQueueName.toString());
+               
+               ClientProducer prod = session.createProducer(ConfigurationImpl.DEFAULT_MANAGEMENT_ADDRESS);
+               
+               prod.send(message);
             }
+            
+            log.info("Created objects");
 
             active = true;
 
@@ -313,6 +319,11 @@
 
    public synchronized void stop() throws Exception
    {
+      if (!started)
+      {
+         return;
+      }
+      
       started = false;
 
       active = false;
@@ -465,6 +476,8 @@
          {
             return;
          }
+         
+         log.info("sending batch");
 
          // TODO - if batch size = 1 then don't need tx
 
@@ -564,9 +577,9 @@
       return maxBatchTime;
    }
 
-   public SimpleString getFilterString()
+   public Filter getFilter()
    {
-      return filterString;
+      return filter;
    }
 
    public SimpleString getForwardingAddress()

Copied: trunk/src/main/org/jboss/messaging/core/server/cluster/impl/ClusterConnectionImpl.java (from rev 5744, trunk/src/main/org/jboss/messaging/core/server/cluster/impl/ClusterImpl.java)
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/cluster/impl/ClusterConnectionImpl.java	                        (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/server/cluster/impl/ClusterConnectionImpl.java	2009-01-28 14:18:19 UTC (rev 5751)
@@ -0,0 +1,505 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.messaging.core.server.cluster.impl;
+
+import static org.jboss.messaging.core.postoffice.impl.PostOfficeImpl.HDR_RESET_QUEUE_DATA;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ScheduledExecutorService;
+
+import org.jboss.messaging.core.client.ClientMessage;
+import org.jboss.messaging.core.client.MessageHandler;
+import org.jboss.messaging.core.client.management.impl.ManagementHelper;
+import org.jboss.messaging.core.cluster.DiscoveryGroup;
+import org.jboss.messaging.core.cluster.DiscoveryListener;
+import org.jboss.messaging.core.config.TransportConfiguration;
+import org.jboss.messaging.core.config.cluster.BridgeConfiguration;
+import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.core.management.NotificationType;
+import org.jboss.messaging.core.persistence.StorageManager;
+import org.jboss.messaging.core.postoffice.Binding;
+import org.jboss.messaging.core.postoffice.PostOffice;
+import org.jboss.messaging.core.postoffice.impl.LocalQueueBinding;
+import org.jboss.messaging.core.postoffice.impl.PostOfficeImpl;
+import org.jboss.messaging.core.server.Queue;
+import org.jboss.messaging.core.server.QueueFactory;
+import org.jboss.messaging.core.server.cluster.Bridge;
+import org.jboss.messaging.core.server.cluster.ClusterConnection;
+import org.jboss.messaging.core.server.cluster.RemoteQueueBinding;
+import org.jboss.messaging.util.ExecutorFactory;
+import org.jboss.messaging.util.Pair;
+import org.jboss.messaging.util.SimpleString;
+import org.jboss.messaging.util.UUIDGenerator;
+
+/**
+ * 
+ * A ClusterConnectionImpl
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * 
+ * Created 21 Jan 2009 14:43:05
+ *
+ *
+ */
+public class ClusterConnectionImpl implements ClusterConnection, DiscoveryListener
+{
+   private static final Logger log = Logger.getLogger(ClusterConnectionImpl.class);
+
+   private final ExecutorFactory executorFactory;
+
+   private final StorageManager storageManager;
+
+   private final PostOffice postOffice;
+
+   private final SimpleString name;
+
+   private final SimpleString address;
+
+   private final BridgeConfiguration bridgeConfig;
+
+   private final boolean useDuplicateDetection;
+
+   private final boolean forwardWhenNoMatchingConsumers;
+
+   private Map<Pair<TransportConfiguration, TransportConfiguration>, MessageFlowRecord> records = new HashMap<Pair<TransportConfiguration, TransportConfiguration>, MessageFlowRecord>();
+
+   private final DiscoveryGroup discoveryGroup;
+
+   private final ScheduledExecutorService scheduledExecutor;
+
+   private final QueueFactory queueFactory;
+
+   private volatile boolean started;
+
+   /**
+    * Constructor using a static list of connectors.
+    * <p>
+    * Note that the connector list is processed immediately: {@code updateConnectors} is called from
+    * this constructor, so bridges are created and started before {@link #start()} is invoked.
+    *
+    * @param name name of this cluster connection; also used as the prefix of generated queue names
+    * @param address handed to every bridge as its queue data address
+    * @param bridgeConfig batch, filter and retry settings applied to every bridge
+    * @param useDuplicateDetection passed through to every remote queue binding
+    * @param forwardWhenNoMatchingConsumers passed through to every remote queue binding
+    * @param executorFactory supplies the executor given to each bridge
+    * @param storageManager used to persist the binding of newly created queues
+    * @param postOffice used to look up queue bindings and to add/remove remote queue bindings
+    * @param scheduledExecutor handed to every bridge
+    * @param queueFactory creates the queue feeding a bridge when none exists yet
+    * @param connectors the fixed list of connector pairs to bridge to
+    * @throws Exception if processing the connector list fails
+    */
+   public ClusterConnectionImpl(final SimpleString name,
+                                final SimpleString address,
+                                final BridgeConfiguration bridgeConfig,
+                                final boolean useDuplicateDetection,
+                                final boolean forwardWhenNoMatchingConsumers,
+                                final ExecutorFactory executorFactory,
+                                final StorageManager storageManager,
+                                final PostOffice postOffice,
+                                final ScheduledExecutorService scheduledExecutor,
+                                final QueueFactory queueFactory,
+                                final List<Pair<TransportConfiguration, TransportConfiguration>> connectors) throws Exception
+   {
+      // Identity and routing settings
+      this.name = name;
+      this.address = address;
+      this.bridgeConfig = bridgeConfig;
+      this.useDuplicateDetection = useDuplicateDetection;
+      this.forwardWhenNoMatchingConsumers = forwardWhenNoMatchingConsumers;
+
+      // Collaborators
+      this.executorFactory = executorFactory;
+      this.storageManager = storageManager;
+      this.postOffice = postOffice;
+      this.scheduledExecutor = scheduledExecutor;
+      this.queueFactory = queueFactory;
+
+      // A static connector list means there is no discovery group to listen to
+      this.discoveryGroup = null;
+
+      // Has to come last: it reads the fields assigned above
+      updateConnectors(connectors);
+   }
+
+   /**
+    * Constructor using discovery to get connectors.
+    * <p>
+    * Nothing is connected here; the connectors are read from the discovery group when
+    * {@link #start()} is called.
+    *
+    * @param name name of this cluster connection; also used as the prefix of generated queue names
+    * @param address handed to every bridge as its queue data address
+    * @param bridgeConfig batch, filter and retry settings applied to every bridge
+    * @param useDuplicateDetection passed through to every remote queue binding
+    * @param forwardWhenNoMatchingConsumers passed through to every remote queue binding
+    * @param executorFactory supplies the executor given to each bridge
+    * @param storageManager used to persist the binding of newly created queues
+    * @param postOffice used to look up queue bindings and to add/remove remote queue bindings
+    * @param scheduledExecutor handed to every bridge
+    * @param queueFactory creates the queue feeding a bridge when none exists yet
+    * @param discoveryGroup source of the connector pairs; this object registers itself as a listener on start
+    * @throws Exception declared for symmetry with the static-connector constructor
+    */
+   public ClusterConnectionImpl(final SimpleString name,
+                                final SimpleString address,
+                                final BridgeConfiguration bridgeConfig,
+                                final boolean useDuplicateDetection,
+                                final boolean forwardWhenNoMatchingConsumers,
+                                final ExecutorFactory executorFactory,
+                                final StorageManager storageManager,
+                                final PostOffice postOffice,
+                                final ScheduledExecutorService scheduledExecutor,
+                                final QueueFactory queueFactory,
+                                final DiscoveryGroup discoveryGroup) throws Exception
+   {
+      // Identity and routing settings
+      this.name = name;
+      this.address = address;
+      this.bridgeConfig = bridgeConfig;
+      this.useDuplicateDetection = useDuplicateDetection;
+      this.forwardWhenNoMatchingConsumers = forwardWhenNoMatchingConsumers;
+
+      // Collaborators
+      this.executorFactory = executorFactory;
+      this.storageManager = storageManager;
+      this.postOffice = postOffice;
+      this.scheduledExecutor = scheduledExecutor;
+      this.queueFactory = queueFactory;
+
+      // Connectors are obtained from this group in start()
+      this.discoveryGroup = discoveryGroup;
+   }
+
+   /**
+    * Starts this cluster connection; calling it on an already started connection is a no-op.
+    * <p>
+    * When a discovery group is configured, the connectors it currently knows are processed
+    * first and this object is then registered as a listener for later changes.
+    *
+    * @throws Exception if processing the discovered connectors fails
+    */
+   public synchronized void start() throws Exception
+   {
+      if (!started)
+      {
+         if (discoveryGroup != null)
+         {
+            updateConnectors(discoveryGroup.getConnectors());
+
+            discoveryGroup.registerListener(this);
+         }
+
+         started = true;
+      }
+   }
+
+   /**
+    * Stops this cluster connection; calling it on a connection that is not started is a no-op
+    * (and logs nothing).
+    * <p>
+    * The discovery listener, if any, is unregistered and every message flow record is closed.
+    * The closed records are then discarded: {@code updateConnectors} skips connector pairs that
+    * already have a record, so keeping them would prevent a later {@link #start()} from
+    * recreating the bridges.
+    *
+    * @throws Exception if closing a record fails; in that case the connection is left marked as started
+    */
+   public synchronized void stop() throws Exception
+   {
+      if (!started)
+      {
+         return;
+      }
+
+      log.info("Stopping cluster connection");
+
+      if (discoveryGroup != null)
+      {
+         discoveryGroup.unregisterListener(this);
+      }
+
+      log.info("There are " + records.size() + " records");
+
+      for (MessageFlowRecord record : records.values())
+      {
+         log.info("stopping record");
+         record.close();
+      }
+
+      // Closed records are useless and would block bridge re-creation on restart
+      records.clear();
+
+      started = false;
+   }
+
+   /**
+    * @return the current value of the started flag, which is set at the end of start()
+    *         and cleared at the end of stop()
+    */
+   public boolean isStarted()
+   {
+      return started;
+   }
+
+   /**
+    * @return the name this cluster connection was constructed with
+    */
+   public SimpleString getName()
+   {
+      return name;
+   }
+
+   // DiscoveryListener implementation ------------------------------------------------------------------
+
+   /**
+    * DiscoveryListener callback: re-reads the connector list from the discovery group and
+    * reconciles the message flow records with it.
+    * <p>
+    * Failures are logged and not propagated to the caller.
+    */
+   public void connectorsChanged()
+   {
+      try
+      {
+         updateConnectors(discoveryGroup.getConnectors());
+      }
+      catch (Exception e)
+      {
+         log.error("Failed to update connectors", e);
+      }
+   }
+
+   /**
+    * Reconciles the message flow records with the supplied connector list.
+    * <p>
+    * Records whose connector pair is no longer listed are closed and removed. For every listed
+    * pair that has no record yet, the queue feeding the bridge is looked up by its generated name
+    * (or created and its binding persisted when it does not exist), a bridge on that queue is
+    * created, the record is registered and the bridge is started.
+    *
+    * @param connectors the connector pairs that should currently have a message flow
+    * @throws Exception if closing a record, persisting a binding, or creating/starting a bridge fails
+    */
+   private void updateConnectors(final List<Pair<TransportConfiguration, TransportConfiguration>> connectors) throws Exception
+   {
+      Set<Pair<TransportConfiguration, TransportConfiguration>> connectorSet = new HashSet<Pair<TransportConfiguration, TransportConfiguration>>(connectors);
+
+      Iterator<Map.Entry<Pair<TransportConfiguration, TransportConfiguration>, MessageFlowRecord>> iter = records.entrySet()
+                                                                                                                 .iterator();
+
+      while (iter.hasNext())
+      {
+         Map.Entry<Pair<TransportConfiguration, TransportConfiguration>, MessageFlowRecord> entry = iter.next();
+
+         if (!connectorSet.contains(entry.getKey()))
+         {
+            // Connector no longer there - we should remove and close it - we don't delete the queue though - it may
+            // have messages - it is up to the administrator to do this
+
+            entry.getValue().close();
+
+            iter.remove();
+         }
+      }
+
+      for (Pair<TransportConfiguration, TransportConfiguration> connectorPair : connectors)
+      {
+         if (records.containsKey(connectorPair))
+         {
+            // Already bridged to this connector pair
+            continue;
+         }
+
+         SimpleString queueName = generateQueueName(name, connectorPair);
+
+         Binding queueBinding = postOffice.getBinding(queueName);
+
+         Queue queue;
+
+         if (queueBinding != null)
+         {
+            queue = (Queue)queueBinding.getBindable();
+         }
+         else
+         {
+            // The queue must carry the generated per-connector name, not the cluster connection name:
+            // otherwise every connector pair would share one queue name and the lookup above could
+            // never find the queue again after a reload
+            queue = queueFactory.createQueue(-1, queueName, null, true, false);
+
+            // Add binding in storage so the queue will get reloaded on startup and we can find it - it's never
+            // actually routed to at that address though
+
+            Binding storeBinding = new LocalQueueBinding(queue.getName(), queue);
+
+            storageManager.addQueueBinding(storeBinding);
+         }
+
+         MessageFlowRecord record = new MessageFlowRecord(queue);
+
+         Bridge bridge = new BridgeImpl(queueName,
+                                        queue,
+                                        connectorPair,
+                                        executorFactory.getExecutor(),
+                                        bridgeConfig.getMaxBatchSize(),
+                                        bridgeConfig.getMaxBatchTime(),
+                                        bridgeConfig.getFilterString() == null ? null
+                                                                              : new SimpleString(bridgeConfig.getFilterString()),
+                                        null,
+                                        storageManager,
+                                        scheduledExecutor,
+                                        null,
+                                        bridgeConfig.getRetryInterval(),
+                                        bridgeConfig.getRetryIntervalMultiplier(),
+                                        bridgeConfig.getMaxRetriesBeforeFailover(),
+                                        bridgeConfig.getMaxRetriesAfterFailover(),
+                                        false,
+                                        record,
+                                        address.toString());
+
+         record.setBridge(bridge);
+
+         log.info("added record");
+         records.put(connectorPair, record);
+
+         bridge.start();
+      }
+   }
+
+   /**
+    * Builds the name of the queue that feeds the bridge for one connector pair, in the form
+    * {@code cluster.<clusterName>.<connector a>-<connector b>}, where the second part is the
+    * literal {@code null} when the pair has no second connector.
+    *
+    * @param clusterName name of the cluster connection, used as the name prefix
+    * @param connectorPair the connector pair the queue belongs to
+    * @return the generated queue name
+    * @throws Exception if a connector string cannot be generated
+    */
+   private SimpleString generateQueueName(final SimpleString clusterName,
+                                          final Pair<TransportConfiguration, TransportConfiguration> connectorPair) throws Exception
+   {
+      // Use the parameter rather than the enclosing field so the method honours its own signature
+      return new SimpleString("cluster." + clusterName +
+                              "." +
+                              generateConnectorString(connectorPair.a) +
+                              "-" +
+                              (connectorPair.b == null ? "null" : generateConnectorString(connectorPair.b)));
+   }
+
+   private String replaceWildcardChars(final String str)
+   {
+      return str.replace('.', '-');
+   }
+
+   /**
+    * Renders a transport configuration as
+    * {@code <factory class name>?<key>=<value>&<key>=<value>...}, with every '.' in the class name,
+    * keys and values replaced by '-'. The '?' and the parameter list are omitted when the
+    * configuration has no parameters.
+    * <p>
+    * A null parameter value is rendered as the text {@code null} instead of failing.
+    * <p>
+    * NOTE(review): parameters are emitted in the iteration order of the params map; if that map
+    * is unordered, the generated string (and so the queue name derived from it) is presumably
+    * only stable while the iteration order is - confirm.
+    *
+    * @param config the transport configuration to render
+    * @return the rendered connector string
+    * @throws Exception declared by the original contract; nothing in this method throws a checked exception
+    */
+   private SimpleString generateConnectorString(final TransportConfiguration config) throws Exception
+   {
+      StringBuilder str = new StringBuilder(replaceWildcardChars(config.getFactoryClassName()));
+
+      log.info("config is " + config);
+
+      Map<String, Object> params = config.getParams();
+
+      if (params != null)
+      {
+         // '?' introduces the first parameter, '&' every following one
+         char separator = '?';
+
+         for (Map.Entry<String, Object> entry : params.entrySet())
+         {
+            String encodedKey = replaceWildcardChars(entry.getKey());
+
+            // String.valueOf tolerates a null value where toString() would throw
+            String encodedVal = replaceWildcardChars(String.valueOf(entry.getValue()));
+
+            str.append(separator).append(encodedKey).append('=').append(encodedVal);
+
+            separator = '&';
+         }
+      }
+
+      return new SimpleString(str.toString());
+   }
+
+   // Inner classes -----------------------------------------------------------------------------------
+
+   /**
+    * State kept per connector pair: the bridge to the remote node, the queue feeding that bridge
+    * and the remote queue bindings created from the notifications the bridge delivers.
+    * <p>
+    * It is handed to the bridge as its message handler; the messages it receives are either a
+    * reset marker (header HDR_RESET_QUEUE_DATA) or queue/consumer notifications.
+    */
+   private class MessageFlowRecord implements MessageHandler
+   {
+      private Bridge bridge;
+
+      private final Queue queue;
+
+      // Remote queue bindings currently registered in the post office, keyed by remote queue name
+      private final Map<SimpleString, RemoteQueueBinding> bindings = new HashMap<SimpleString, RemoteQueueBinding>();
+
+      // Becomes true once the first reset message has been handled; notifications received
+      // before that point are ignored
+      private boolean firstReset = false;
+
+      public MessageFlowRecord(final Queue queue)
+      {
+         this.queue = queue;
+      }
+
+      /**
+       * Stops the bridge and removes every remote queue binding of this record from the post office.
+       * The binding map is emptied as well, so closing twice does not remove the same bindings again.
+       *
+       * @throws Exception if stopping the bridge or removing a binding fails
+       */
+      public void close() throws Exception
+      {
+         log.info("stopping bridge");
+         bridge.stop();
+         log.info("stopped bridge");
+
+         removeAllBindings();
+      }
+
+      public void setBridge(final Bridge bridge)
+      {
+         this.bridge = bridge;
+      }
+
+      /**
+       * Handles a reset marker or a queue/consumer notification.
+       * <p>
+       * A reset removes all known bindings. QUEUE_CREATED adds a remote queue binding,
+       * QUEUE_DESTROYED removes it, CONSUMER_CREATED and CONSUMER_CLOSED are forwarded to the
+       * binding of the named queue. A notification naming a queue without a known binding is
+       * reported as an error. Any failure is logged and not propagated.
+       *
+       * @param message the message delivered by the bridge's notification consumer
+       */
+      public void onMessage(final ClientMessage message)
+      {
+         try
+         {
+            // Reset the bindings
+            if (message.getProperty(HDR_RESET_QUEUE_DATA) != null)
+            {
+               removeAllBindings();
+
+               firstReset = true;
+
+               log.info("did reset");
+
+               return;
+            }
+
+            if (!firstReset)
+            {
+               return;
+            }
+
+            NotificationType type = NotificationType.valueOf(message.getProperty(ManagementHelper.HDR_NOTIFICATION_TYPE)
+                                                                    .toString());
+
+            log.info("Got notification message " + type);
+
+            if (type == NotificationType.QUEUE_CREATED)
+            {
+               log.info("queue created");
+               SimpleString uniqueName = new SimpleString("flow-").concat(UUIDGenerator.getInstance()
+                                                                                       .generateSimpleStringUUID());
+
+               SimpleString queueAddress = (SimpleString)message.getProperty(ManagementHelper.HDR_ADDRESS);
+
+               SimpleString queueName = (SimpleString)message.getProperty(ManagementHelper.HDR_QUEUE_NAME);
+
+               SimpleString filterString = (SimpleString)message.getProperty(ManagementHelper.HDR_FILTERSTRING);
+
+               RemoteQueueBinding binding = new RemoteQueueBindingImpl(queueAddress,
+                                                                       uniqueName,
+                                                                       queueName,
+                                                                       filterString,
+                                                                       queue,
+                                                                       useDuplicateDetection,
+                                                                       forwardWhenNoMatchingConsumers);
+
+               bindings.put(queueName, binding);
+
+               postOffice.addBinding(binding);
+            }
+            else if (type == NotificationType.QUEUE_DESTROYED)
+            {
+               log.info("queue destroyed");
+               SimpleString queueName = (SimpleString)message.getProperty(ManagementHelper.HDR_QUEUE_NAME);
+
+               RemoteQueueBinding binding = requireBinding(queueName);
+
+               bindings.remove(queueName);
+
+               postOffice.removeBinding(binding.getUniqueName());
+            }
+            else if (type == NotificationType.CONSUMER_CREATED)
+            {
+               log.info("consumer created");
+               SimpleString queueName = (SimpleString)message.getProperty(ManagementHelper.HDR_QUEUE_NAME);
+
+               SimpleString filterString = (SimpleString)message.getProperty(ManagementHelper.HDR_FILTERSTRING);
+
+               requireBinding(queueName).addConsumer(filterString);
+            }
+            else if (type == NotificationType.CONSUMER_CLOSED)
+            {
+               log.info("consumer closed");
+               SimpleString queueName = (SimpleString)message.getProperty(ManagementHelper.HDR_QUEUE_NAME);
+
+               SimpleString filterString = (SimpleString)message.getProperty(ManagementHelper.HDR_FILTERSTRING);
+
+               requireBinding(queueName).removeConsumer(filterString);
+            }
+         }
+         catch (Exception e)
+         {
+            log.error("Failed to handle message", e);
+         }
+      }
+
+      // Removes every known binding from the post office and forgets it
+      private void removeAllBindings() throws Exception
+      {
+         for (RemoteQueueBinding binding : bindings.values())
+         {
+            postOffice.removeBinding(binding.getUniqueName());
+         }
+
+         bindings.clear();
+      }
+
+      // Returns the binding for the queue, failing with a descriptive error instead of a
+      // NullPointerException when the notification names an unknown queue
+      private RemoteQueueBinding requireBinding(final SimpleString queueName)
+      {
+         RemoteQueueBinding binding = bindings.get(queueName);
+
+         if (binding == null)
+         {
+            throw new IllegalStateException("Cannot find binding for queue " + queueName);
+         }
+
+         return binding;
+      }
+
+   }
+
+}

Deleted: trunk/src/main/org/jboss/messaging/core/server/cluster/impl/ClusterImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/cluster/impl/ClusterImpl.java	2009-01-28 10:53:22 UTC (rev 5750)
+++ trunk/src/main/org/jboss/messaging/core/server/cluster/impl/ClusterImpl.java	2009-01-28 14:18:19 UTC (rev 5751)
@@ -1,470 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-
-package org.jboss.messaging.core.server.cluster.impl;
-
-import static org.jboss.messaging.core.message.impl.MessageImpl.HDR_RESET_QUEUE_DATA;
-
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ScheduledExecutorService;
-
-import org.jboss.messaging.core.client.ClientMessage;
-import org.jboss.messaging.core.client.MessageHandler;
-import org.jboss.messaging.core.client.management.impl.ManagementHelper;
-import org.jboss.messaging.core.cluster.DiscoveryGroup;
-import org.jboss.messaging.core.cluster.DiscoveryListener;
-import org.jboss.messaging.core.config.TransportConfiguration;
-import org.jboss.messaging.core.config.cluster.BridgeConfiguration;
-import org.jboss.messaging.core.logging.Logger;
-import org.jboss.messaging.core.management.NotificationType;
-import org.jboss.messaging.core.persistence.StorageManager;
-import org.jboss.messaging.core.postoffice.Binding;
-import org.jboss.messaging.core.postoffice.PostOffice;
-import org.jboss.messaging.core.postoffice.impl.BindingImpl;
-import org.jboss.messaging.core.server.Queue;
-import org.jboss.messaging.core.server.QueueFactory;
-import org.jboss.messaging.core.server.cluster.Bridge;
-import org.jboss.messaging.core.server.cluster.Cluster;
-import org.jboss.messaging.core.server.cluster.FlowBinding;
-import org.jboss.messaging.util.ExecutorFactory;
-import org.jboss.messaging.util.Pair;
-import org.jboss.messaging.util.SimpleString;
-import org.jboss.messaging.util.UUIDGenerator;
-
-/**
- * 
- * A ClusterImpl
- *
- * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
- * 
- * Created 21 Jan 2009 14:43:05
- *
- *
- */
-public class ClusterImpl implements Cluster, DiscoveryListener
-{
-   private static final Logger log = Logger.getLogger(ClusterImpl.class);
-
-   private final ExecutorFactory executorFactory;
-
-   private final StorageManager storageManager;
-
-   private final PostOffice postOffice;
-
-   private final SimpleString name;
-
-   private final SimpleString address;
-
-   private final BridgeConfiguration bridgeConfig;
-
-   private final boolean useDuplicateDetection;
-
-   private Map<Pair<TransportConfiguration, TransportConfiguration>, MessageFlowRecord> records = new HashMap<Pair<TransportConfiguration, TransportConfiguration>, MessageFlowRecord>();
-
-   private final DiscoveryGroup discoveryGroup;
-
-   private final ScheduledExecutorService scheduledExecutor;
-
-   private final QueueFactory queueFactory;
-
-   private volatile boolean started;
-
-   /*
-    * Constructor using static list of connectors
-    */
-   public ClusterImpl(final SimpleString name,
-                      final SimpleString address,
-                      final BridgeConfiguration bridgeConfig,
-                      final boolean useDuplicateDetection,                
-                      final ExecutorFactory executorFactory,
-                      final StorageManager storageManager,
-                      final PostOffice postOffice,                     
-                      final ScheduledExecutorService scheduledExecutor,
-                      final QueueFactory queueFactory,
-                      final List<Pair<TransportConfiguration, TransportConfiguration>> connectors) throws Exception
-   {
-      this.name = name;
-
-      this.address = address;
-
-      this.bridgeConfig = bridgeConfig;
-
-      this.useDuplicateDetection = useDuplicateDetection;
-
-      this.executorFactory = executorFactory;
-
-      this.storageManager = storageManager;
-
-      this.postOffice = postOffice;
-
-      this.discoveryGroup = null;
-
-      this.scheduledExecutor = scheduledExecutor;
-
-      this.queueFactory = queueFactory;
-
-      this.updateConnectors(connectors);
-   }
-
-   /*
-    * Constructor using discovery to get connectors
-    */
-   public ClusterImpl(final SimpleString name,
-                      final SimpleString address,
-                      final BridgeConfiguration bridgeConfig,
-                      final boolean useDuplicateDetection,              
-                      final ExecutorFactory executorFactory,
-                      final StorageManager storageManager,
-                      final PostOffice postOffice,             
-                      final ScheduledExecutorService scheduledExecutor,
-                      final QueueFactory queueFactory,
-                      final DiscoveryGroup discoveryGroup) throws Exception
-   {
-      this.name = name;
-
-      this.address = address;
-
-      this.bridgeConfig = bridgeConfig;
-
-      this.executorFactory = executorFactory;
-
-      this.storageManager = storageManager;
-
-      this.postOffice = postOffice;
-
-      this.scheduledExecutor = scheduledExecutor;
-
-      this.queueFactory = queueFactory;
-
-      this.discoveryGroup = discoveryGroup;
-
-      this.useDuplicateDetection = useDuplicateDetection;
-   }
-
-   public synchronized void start() throws Exception
-   {
-      if (started)
-      {
-         return;
-      }
-
-      if (discoveryGroup != null)
-      {
-         updateConnectors(discoveryGroup.getConnectors());
-
-         discoveryGroup.registerListener(this);
-      }
-
-      started = true;
-   }
-
-   public synchronized void stop() throws Exception
-   {
-      if (!started)
-      {
-         return;
-      }
-
-      if (discoveryGroup != null)
-      {
-         discoveryGroup.unregisterListener(this);
-      }
-
-      for (MessageFlowRecord record : records.values())
-      {
-         record.close();
-      }
-
-      started = false;
-   }
-
-   public boolean isStarted()
-   {
-      return started;
-   }
-
-   public SimpleString getName()
-   {
-      return name;
-   }
-
-   // DiscoveryListener implementation ------------------------------------------------------------------
-
-   public void connectorsChanged()
-   {
-      try
-      {
-         List<Pair<TransportConfiguration, TransportConfiguration>> connectors = discoveryGroup.getConnectors();
-
-         updateConnectors(connectors);
-      }
-      catch (Exception e)
-      {
-         log.error("Failed to update connectors", e);
-      }
-   }
-
-   private void updateConnectors(final List<Pair<TransportConfiguration, TransportConfiguration>> connectors) throws Exception
-   {
-      Set<Pair<TransportConfiguration, TransportConfiguration>> connectorSet = new HashSet<Pair<TransportConfiguration, TransportConfiguration>>();
-
-      connectorSet.addAll(connectors);
-
-      Iterator<Map.Entry<Pair<TransportConfiguration, TransportConfiguration>, MessageFlowRecord>> iter = records.entrySet()
-                                                                                                                 .iterator();
-
-      while (iter.hasNext())
-      {
-         Map.Entry<Pair<TransportConfiguration, TransportConfiguration>, MessageFlowRecord> entry = iter.next();
-
-         if (!connectorSet.contains(entry.getKey()))
-         {
-            // Connector no longer there - we should remove and close it - we don't delete the queue though - it may
-            // have messages - this is up to the admininstrator to do this
-
-            entry.getValue().close();
-
-            iter.remove();
-         }
-      }
-
-      for (Pair<TransportConfiguration, TransportConfiguration> connectorPair : connectors)
-      {
-         if (!records.containsKey(connectorPair))
-         {
-            SimpleString queueName = generateQueueName(name, connectorPair);
-
-            Binding queueBinding = postOffice.getBinding(queueName);
-
-            Queue queue;
-
-            if (queueBinding != null)
-            {
-               queue = (Queue)queueBinding.getBindable();
-            }
-            else
-            {
-               queue = queueFactory.createQueue(-1, name, null, true, false);
-
-               // Add binding in storage so the queue will get reloaded on startup and we can find it - it's never
-               // actually routed to at that address though
-
-               Binding storeBinding = new BindingImpl(queue.getName(),
-                                                      queue.getName(),
-                                                      queue.getName(),
-                                                      queue,
-                                                      false,
-                                                      true);
-
-               storageManager.addQueueBinding(storeBinding);
-            }
-            
-            MessageFlowRecord record = new MessageFlowRecord(queue);
-
-            Bridge bridge = new BridgeImpl(queueName,
-                                           queue,
-                                           connectorPair,
-                                           executorFactory.getExecutor(),
-                                           bridgeConfig.getMaxBatchSize(),
-                                           bridgeConfig.getMaxBatchTime(),
-                                           new SimpleString(bridgeConfig.getFilterString()),
-                                           null,
-                                           storageManager,
-                                           scheduledExecutor,
-                                           null,
-                                           bridgeConfig.getRetryInterval(),
-                                           bridgeConfig.getRetryIntervalMultiplier(),
-                                           bridgeConfig.getMaxRetriesBeforeFailover(),
-                                           bridgeConfig.getMaxRetriesAfterFailover(),
-                                           false,
-                                           record,
-                                           address.toString());
-            
-            record.setBridge(bridge);
-
-            records.put(connectorPair, record);
-
-            bridge.start();
-         }
-      }
-   }
-
-   private SimpleString generateQueueName(final SimpleString clusterName,
-                                          final Pair<TransportConfiguration, TransportConfiguration> connectorPair) throws Exception
-   {
-      return new SimpleString("cluster." + name +
-                              "." +
-                              generateConnectorString(connectorPair.a) +
-                              "-" +
-                              (connectorPair.b == null ? "null" : generateConnectorString(connectorPair.b)));
-   }
-
-   private String replaceWildcardChars(final String str)
-   {
-      return str.replace('.', '-');
-   }
-
-   private SimpleString generateConnectorString(final TransportConfiguration config) throws Exception
-   {
-      StringBuilder str = new StringBuilder(replaceWildcardChars(config.getFactoryClassName()));
-
-      if (!config.getParams().isEmpty())
-      {
-         str.append("?");
-      }
-
-      boolean first = true;
-      for (Map.Entry<String, Object> entry : config.getParams().entrySet())
-      {
-         if (!first)
-         {
-            str.append("&");
-         }
-         String encodedKey = replaceWildcardChars(entry.getKey());
-
-         String val = entry.getValue().toString();
-         String encodedVal = replaceWildcardChars(val);
-
-         str.append(encodedKey).append('=').append(encodedVal);
-
-         first = false;
-      }
-
-      return new SimpleString(str.toString());
-   }
-   
-   // Inner classes -----------------------------------------------------------------------------------
-   
-   private class MessageFlowRecord implements MessageHandler
-   {
-      private Bridge bridge;
-
-      private final Queue queue;
-
-      private final Map<SimpleString, FlowBinding> bindings = new HashMap<SimpleString, FlowBinding>();
-      
-      private boolean firstReset = false;
-
-      public MessageFlowRecord(final Queue queue)
-      {
-         this.queue = queue;
-      }
-
-      public void close() throws Exception
-      {
-         bridge.stop();
-
-         for (FlowBinding binding : bindings.values())
-         {
-            postOffice.removeBinding(binding.getUniqueName());
-         }
-      }
-      
-      public void setBridge(final Bridge bridge)
-      {
-         this.bridge = bridge;
-      }
-
-      public void onMessage(final ClientMessage message)
-      {
-         try
-         {
-            // Reset the bindings
-            if (message.getProperty(HDR_RESET_QUEUE_DATA) != null)
-            {
-               for (FlowBinding binding : bindings.values())
-               {
-                  postOffice.removeBinding(binding.getUniqueName());
-               }
-   
-               bindings.clear();
-               
-               firstReset = true;
-            }
-            
-            if (!firstReset)
-            {
-               return;
-            }
-   
-            NotificationType type = NotificationType.valueOf(message.getProperty(ManagementHelper.HDR_NOTIFICATION_TYPE)
-                                                                    .toString());
-   
-            if (type == NotificationType.QUEUE_CREATED)
-            {
-               SimpleString uniqueName = new SimpleString("flow-").concat(UUIDGenerator.getInstance()
-                                                                                       .generateSimpleStringUUID());
-   
-               SimpleString queueAddress = (SimpleString)message.getProperty(ManagementHelper.HDR_ADDRESS);
-   
-               SimpleString queueName = (SimpleString)message.getProperty(ManagementHelper.HDR_QUEUE_NAME);
-   
-               FlowBinding binding = new FlowBindingImpl(queueAddress, uniqueName, queueName, queue, useDuplicateDetection);
-   
-               bindings.put(queueName, binding);
-   
-               postOffice.addBinding(binding);
-            }
-            else if (type == NotificationType.QUEUE_DESTROYED)
-            {
-               SimpleString queueName = (SimpleString)message.getProperty(ManagementHelper.HDR_QUEUE_NAME);
-   
-               FlowBinding binding = bindings.remove(queueName);
-   
-               postOffice.removeBinding(binding.getUniqueName());
-            }
-            else if (type == NotificationType.CONSUMER_CREATED)
-            {
-               SimpleString queueName = (SimpleString)message.getProperty(ManagementHelper.HDR_QUEUE_NAME);
-   
-               SimpleString filterString = (SimpleString)message.getProperty(ManagementHelper.HDR_FILTERSTRING);
-   
-               FlowBinding binding = bindings.get(queueName);
-   
-               binding.addConsumer(filterString);
-            }
-            else if (type == NotificationType.CONSUMER_CLOSED)
-            {
-               SimpleString queueName = (SimpleString)message.getProperty(ManagementHelper.HDR_QUEUE_NAME);
-   
-               SimpleString filterString = (SimpleString)message.getProperty(ManagementHelper.HDR_FILTERSTRING);
-   
-               FlowBinding binding = bindings.get(queueName);
-   
-               binding.removeConsumer(filterString);
-            }
-         }
-         catch (Exception e)
-         {
-            log.error("Failed to handle message", e);
-         }
-      }
-
-   }
-
-
-}

Modified: trunk/src/main/org/jboss/messaging/core/server/cluster/impl/ClusterManagerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/cluster/impl/ClusterManagerImpl.java	2009-01-28 10:53:22 UTC (rev 5750)
+++ trunk/src/main/org/jboss/messaging/core/server/cluster/impl/ClusterManagerImpl.java	2009-01-28 14:18:19 UTC (rev 5751)
@@ -38,7 +38,7 @@
 import org.jboss.messaging.core.config.TransportConfiguration;
 import org.jboss.messaging.core.config.cluster.BridgeConfiguration;
 import org.jboss.messaging.core.config.cluster.BroadcastGroupConfiguration;
-import org.jboss.messaging.core.config.cluster.ClusterConfiguration;
+import org.jboss.messaging.core.config.cluster.ClusterConnectionConfiguration;
 import org.jboss.messaging.core.config.cluster.DiscoveryGroupConfiguration;
 import org.jboss.messaging.core.logging.Logger;
 import org.jboss.messaging.core.management.ManagementService;
@@ -49,7 +49,7 @@
 import org.jboss.messaging.core.server.QueueFactory;
 import org.jboss.messaging.core.server.cluster.Bridge;
 import org.jboss.messaging.core.server.cluster.BroadcastGroup;
-import org.jboss.messaging.core.server.cluster.Cluster;
+import org.jboss.messaging.core.server.cluster.ClusterConnection;
 import org.jboss.messaging.core.server.cluster.ClusterManager;
 import org.jboss.messaging.core.server.cluster.Transformer;
 import org.jboss.messaging.util.ExecutorFactory;
@@ -74,9 +74,9 @@
    private final Map<String, DiscoveryGroup> discoveryGroups = new HashMap<String, DiscoveryGroup>();
 
    private final Map<String, Bridge> bridges = new HashMap<String, Bridge>();
-   
-   private final Map<String, Cluster> clusters = new HashMap<String, Cluster>();
 
+   private final Map<String, ClusterConnection> clusters = new HashMap<String, ClusterConnection>();
+
    private final ExecutorFactory executorFactory;
 
    private final StorageManager storageManager;
@@ -88,7 +88,7 @@
    private final ManagementService managementService;
 
    private final Configuration configuration;
-   
+
    private final QueueFactory queueFactory;
 
    private volatile boolean started;
@@ -112,7 +112,7 @@
       this.managementService = managementService;
 
       this.configuration = configuration;
-      
+
       this.queueFactory = queueFactory;
    }
 
@@ -137,10 +137,10 @@
       {
          deployBridge(config);
       }
-      
-      for (ClusterConfiguration config: configuration.getClusterConfigurations())
+
+      for (ClusterConnectionConfiguration config : configuration.getClusterConfigurations())
       {
-         deployCluster(config);
+         deployClusterConnection(config);
       }
 
       started = true;
@@ -170,11 +170,12 @@
          bridge.stop();
          managementService.unregisterBridge(bridge.getName().toString());
       }
-      
-      for (Cluster cluster: clusters.values())
+
+      log.info("stopping cluster connecttions");
+      for (ClusterConnection clusterConnection : clusters.values())
       {
-         cluster.stop();
-         managementService.unregisterCluster(cluster.getName().toString());
+         clusterConnection.stop();
+         managementService.unregisterCluster(clusterConnection.getName().toString());
       }
 
       broadcastGroups.clear();
@@ -407,8 +408,8 @@
          bridge.start();
       }
    }
-   
-   private synchronized void deployCluster(final ClusterConfiguration config) throws Exception
+
+   private synchronized void deployClusterConnection(final ClusterConnectionConfiguration config) throws Exception
    {
       if (config.getName() == null)
       {
@@ -423,80 +424,86 @@
 
          return;
       }
-      
-      Cluster cluster;
-     
+
+      ClusterConnection clusterConnection;
+
       List<Pair<TransportConfiguration, TransportConfiguration>> connectors = new ArrayList<Pair<TransportConfiguration, TransportConfiguration>>();
-      
+
       if (config.getStaticConnectorNamePairs() != null)
       {
-         for (Pair<String, String> connectorNamePair: config.getStaticConnectorNamePairs())
-         {                    
+         for (Pair<String, String> connectorNamePair : config.getStaticConnectorNamePairs())
+         {
             TransportConfiguration connector = configuration.getConnectorConfigurations().get(connectorNamePair.a);
-   
+
             if (connector == null)
             {
-               log.warn("No connector defined with name '" + connectorNamePair.a + "'. The bridge will not be deployed.");
-   
+               log.warn("No connector defined with name '" + connectorNamePair.a +
+                        "'. The bridge will not be deployed.");
+
                return;
             }
-   
+
             TransportConfiguration backupConnector = null;
-   
+
             if (connectorNamePair.b != null)
             {
                backupConnector = configuration.getConnectorConfigurations().get(connectorNamePair.b);
-   
+
                if (backupConnector == null)
                {
                   log.warn("No connector defined with name '" + connectorNamePair.b +
                            "'. The bridge will not be deployed.");
-   
+
                   return;
                }
             }
-   
+
             Pair<TransportConfiguration, TransportConfiguration> pair = new Pair<TransportConfiguration, TransportConfiguration>(connector,
                                                                                                                                  backupConnector);
-            
+
             connectors.add(pair);
          }
-         
-         cluster = new ClusterImpl(new SimpleString(config.getName()),
-                                           new SimpleString(config.getAddress()),
-                                           config.getBridgeConfig(),
-                                           config.isDuplicateDetection(),
-                                           executorFactory,
-                                           storageManager,
-                                           postOffice,
-                                           scheduledExecutor,
-                                           queueFactory,
-                                           connectors);
+
+         clusterConnection = new ClusterConnectionImpl(new SimpleString(config.getName()),
+                                                       new SimpleString(config.getAddress()),
+                                                       config.getBridgeConfig(),
+                                                       config.isDuplicateDetection(),
+                                                       config.isForwardWhenNoConsumers(),
+                                                       executorFactory,
+                                                       storageManager,
+                                                       postOffice,
+                                                       scheduledExecutor,
+                                                       queueFactory,
+                                                       connectors);
       }
       else
       {
          DiscoveryGroup dg = discoveryGroups.get(config.getDiscoveryGroupName());
-         
+
          if (dg == null)
          {
-            log.warn("No discovery group with name '" + config.getDiscoveryGroupName() + "'. The bridge will not be deployed.");
+            log.warn("No discovery group with name '" + config.getDiscoveryGroupName() +
+                     "'. The bridge will not be deployed.");
          }
-         
-         cluster = new ClusterImpl(new SimpleString(config.getName()),
-                                           new SimpleString(config.getAddress()),
-                                           config.getBridgeConfig(),
-                                           config.isDuplicateDetection(),
-                                           executorFactory,
-                                           storageManager,
-                                           postOffice,
-                                           scheduledExecutor,
-                                           queueFactory,
-                                           dg);
+
+         clusterConnection = new ClusterConnectionImpl(new SimpleString(config.getName()),
+                                                       new SimpleString(config.getAddress()),
+                                                       config.getBridgeConfig(),
+                                                       config.isDuplicateDetection(),
+                                                       config.isForwardWhenNoConsumers(),
+                                                       executorFactory,
+                                                       storageManager,
+                                                       postOffice,
+                                                       scheduledExecutor,
+                                                       queueFactory,
+                                                       dg);
       }
 
-      managementService.registerCluster(cluster, config);
+      managementService.registerCluster(clusterConnection, config);
 
-      clusters.put(config.getName(), cluster);
+      clusters.put(config.getName(), clusterConnection);
+
+      clusterConnection.start();
    }
 
    private Transformer instantiateTransformer(final String transformerClassName)

Deleted: trunk/src/main/org/jboss/messaging/core/server/cluster/impl/FlowBindingImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/cluster/impl/FlowBindingImpl.java	2009-01-28 10:53:22 UTC (rev 5750)
+++ trunk/src/main/org/jboss/messaging/core/server/cluster/impl/FlowBindingImpl.java	2009-01-28 14:18:19 UTC (rev 5751)
@@ -1,183 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2005-2009, Red Hat Middleware LLC, and individual contributors
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-
-package org.jboss.messaging.core.server.cluster.impl;
-
-import static org.jboss.messaging.core.message.impl.MessageImpl.HDR_FROM_CLUSTER;
-
-import java.nio.ByteBuffer;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-
-import org.jboss.messaging.core.filter.Filter;
-import org.jboss.messaging.core.filter.impl.FilterImpl;
-import org.jboss.messaging.core.message.impl.MessageImpl;
-import org.jboss.messaging.core.postoffice.impl.BindingImpl;
-import org.jboss.messaging.core.server.Bindable;
-import org.jboss.messaging.core.server.ServerMessage;
-import org.jboss.messaging.core.server.cluster.FlowBinding;
-import org.jboss.messaging.util.SimpleString;
-
-/**
- * A FlowBindingImpl
- *
- * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
- * 
- * Created 21 Jan 2009 18:55:22
- *
- *
- */
-public class FlowBindingImpl extends BindingImpl implements FlowBinding
-{
-   private final Set<Filter> filters = new HashSet<Filter>();
-
-   private final Map<SimpleString, Integer> filterCounts = new HashMap<SimpleString, Integer>();
-
-   private int consumerCount;
-   
-   private final boolean duplicateDetection;
-   
-   private final SimpleString headerName;
-
-   public FlowBindingImpl(final SimpleString address,
-                          final SimpleString uniqueName,
-                          final SimpleString routingName,
-                          final Bindable bindable,
-                          final boolean duplicateDetection)
-   {
-      super(address, uniqueName, routingName, bindable, false, false);
-      
-      this.duplicateDetection = duplicateDetection;
-      
-      headerName = MessageImpl.HDR_ROUTE_TO_PREFIX.concat(routingName);
-   }
-
-   public boolean accept(final ServerMessage message) throws Exception
-   {
-      if (message.containsProperty(MessageImpl.HDR_FROM_CLUSTER))
-      {
-         return false;
-      }
-      
-      if (consumerCount == 0)
-      {
-         return false;
-      }
-
-      boolean accepted = false;
-      
-      if (filters.isEmpty())
-      {
-         accepted = true;
-      }
-      else
-      {
-         for (Filter filter : filters)
-         {
-            if (filter.match(message))
-            {
-               accepted = true;
-               
-               break;
-            }
-         }         
-      }
-      
-      if (duplicateDetection && accepted)
-      {
-         if (!message.containsProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID))
-         {
-            //Add the message id as a duplicate id header - this will be detected when routing on the remote node - 
-            //any duplicates will be rejected
-            byte[] bytes = new byte[8];
-            
-            ByteBuffer buff = ByteBuffer.wrap(bytes);
-            
-            buff.putLong(message.getMessageID());
-            
-            SimpleString dupID = new SimpleString(bytes);
-            
-            message.putStringProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, dupID);                       
-         }
-      }
-      
-            
-      message.putBooleanProperty(headerName, Boolean.valueOf(true)); 
-      
-      message.putBooleanProperty(HDR_FROM_CLUSTER, Boolean.valueOf(true));
-           
-      return accepted;
-   }
-
-   public synchronized void addConsumer(final SimpleString filterString) throws Exception
-   {
-      if (filterString != null)
-      {
-         // There can actually be many consumers on the same queue with the same filter, so we need to maintain a ref
-         // count
-
-         Integer i = filterCounts.get(filterString);
-
-         if (i == null)
-         {
-            filterCounts.put(filterString, 0);
-
-            filters.add(new FilterImpl(filterString));
-         }
-         else
-         {
-            filterCounts.put(filterString, i + 1);
-         }
-      }
-
-      consumerCount++;
-   }
-
-   public synchronized void removeConsumer(final SimpleString filterString) throws Exception
-   {
-      if (filterString != null)
-      {
-         Integer i = filterCounts.get(filterString);
-
-         if (i != null)
-         {
-            int ii = i - 1;
-
-            if (ii == 0)
-            {
-               filterCounts.remove(filterString);
-
-               filters.remove(filterString);
-            }
-            else
-            {
-               filterCounts.put(filterString, ii);
-            }
-         }
-      }
-
-      consumerCount--;
-   }
-
-}

Added: trunk/src/main/org/jboss/messaging/core/server/cluster/impl/RemoteQueueBindingImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/cluster/impl/RemoteQueueBindingImpl.java	                        (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/server/cluster/impl/RemoteQueueBindingImpl.java	2009-01-28 14:18:19 UTC (rev 5751)
@@ -0,0 +1,269 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2009, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.messaging.core.server.cluster.impl;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import org.jboss.messaging.core.filter.Filter;
+import org.jboss.messaging.core.filter.impl.FilterImpl;
+import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.core.server.Bindable;
+import org.jboss.messaging.core.server.Queue;
+import org.jboss.messaging.core.server.ServerMessage;
+import org.jboss.messaging.core.server.cluster.RemoteQueueBinding;
+import org.jboss.messaging.util.SimpleString;
+
+/**
+ * {@link RemoteQueueBinding} implementation whose bindable is the store-and-forward queue passed to the
+ * constructor.
+ * <p>
+ * Besides the static binding data (address, names, optional queue filter) it tracks the consumers registered
+ * through {@link #addConsumer(SimpleString)}: a plain consumer count plus one {@link Filter} per distinct
+ * consumer filter string, reference counted so that the filter is only dropped when its last consumer goes.
+ * {@link #isHighAcceptPriority(ServerMessage)} consults that state.
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * 
+ * Created 21 Jan 2009 18:55:22
+ *
+ */
+public class RemoteQueueBindingImpl implements RemoteQueueBinding
+{
+   private static final Logger log = Logger.getLogger(RemoteQueueBindingImpl.class);
+
+   private final SimpleString address;
+
+   private final Queue storeAndForwardQueue;
+
+   private final SimpleString uniqueName;
+
+   private final SimpleString routingName;
+
+   // Filter of the queue itself; null when the binding was created without a filter string
+   private final Filter queueFilter;
+
+   // One filter per distinct consumer filter string currently registered
+   private final Set<Filter> filters = new HashSet<Filter>();
+
+   // Number of registered consumers per filter string; an entry exists only while its count is >= 1
+   private final Map<SimpleString, Integer> filterCounts = new HashMap<SimpleString, Integer>();
+
+   // Total number of registered consumers, with or without a filter
+   private int consumerCount;
+
+   private final boolean duplicateDetection;
+
+   private final boolean forwardWhenNoMatchingConsumers;
+
+   /**
+    * @param address value returned by {@link #getAddress()}
+    * @param uniqueName value returned by {@link #getUniqueName()}
+    * @param routingName value returned by {@link #getRoutingName()}
+    * @param filterString filter of the queue, or <code>null</code> for none
+    * @param storeAndForwardQueue queue returned by {@link #getBindable()}
+    * @param duplicateDetection stored as given; not read anywhere in this class
+    * @param forwardWhenNoMatchingConsumers if <code>true</code>, {@link #isHighAcceptPriority(ServerMessage)}
+    *           always answers <code>true</code>
+    * @throws Exception propagated from the {@link FilterImpl} constructor
+    */
+   public RemoteQueueBindingImpl(final SimpleString address,
+                                 final SimpleString uniqueName,
+                                 final SimpleString routingName,
+                                 final SimpleString filterString,
+                                 final Queue storeAndForwardQueue,
+                                 final boolean duplicateDetection,
+                                 final boolean forwardWhenNoMatchingConsumers) throws Exception
+   {
+      this.address = address;
+
+      this.storeAndForwardQueue = storeAndForwardQueue;
+
+      this.uniqueName = uniqueName;
+
+      this.routingName = routingName;
+
+      this.duplicateDetection = duplicateDetection;
+
+      this.forwardWhenNoMatchingConsumers = forwardWhenNoMatchingConsumers;
+
+      queueFilter = filterString == null ? null : new FilterImpl(filterString);
+   }
+
+   public SimpleString getAddress()
+   {
+      return address;
+   }
+
+   public Bindable getBindable()
+   {
+      return storeAndForwardQueue;
+   }
+
+   public SimpleString getRoutingName()
+   {
+      return routingName;
+   }
+
+   public SimpleString getUniqueName()
+   {
+      return uniqueName;
+   }
+
+   public boolean isExclusive()
+   {
+      return false;
+   }
+
+   public boolean isQueueBinding()
+   {
+      return false;
+   }
+
+   /**
+    * @return <code>true</code> if there is no queue filter or the queue filter matches the message
+    */
+   public boolean filterMatches(final ServerMessage message) throws Exception
+   {
+      return queueFilter == null || queueFilter.match(message);
+   }
+
+   /**
+    * Answers <code>true</code> when forwarding without matching consumers is enabled; otherwise only when at
+    * least one consumer is registered and either no consumer filters are registered or one of them matches.
+    * <p>
+    * Synchronized because it reads the state that {@link #addConsumer(SimpleString)} and
+    * {@link #removeConsumer(SimpleString)} modify under this object's monitor.
+    */
+   public synchronized boolean isHighAcceptPriority(final ServerMessage message)
+   {
+      if (forwardWhenNoMatchingConsumers)
+      {
+         return true;
+      }
+
+      if (consumerCount == 0)
+      {
+         return false;
+      }
+
+      if (filters.isEmpty())
+      {
+         return true;
+      }
+
+      for (Filter filter : filters)
+      {
+         if (filter.match(message))
+         {
+            return true;
+         }
+      }
+
+      return false;
+   }
+
+   /**
+    * Registers one more consumer.
+    *
+    * @param filterString the consumer's filter, or <code>null</code> if it has none
+    * @throws Exception propagated from the {@link FilterImpl} constructor
+    */
+   public synchronized void addConsumer(final SimpleString filterString) throws Exception
+   {
+      if (filterString != null)
+      {
+         // There can actually be many consumers on the same queue with the same filter, so we need to maintain a ref
+         // count
+
+         Integer count = filterCounts.get(filterString);
+
+         if (count == null)
+         {
+            // Build the filter first so that nothing is recorded if the filter string is rejected
+            filters.add(new FilterImpl(filterString));
+
+            // The first consumer counts as one, so that its removal brings the count back to zero
+            filterCounts.put(filterString, 1);
+         }
+         else
+         {
+            filterCounts.put(filterString, count + 1);
+         }
+      }
+
+      consumerCount++;
+   }
+
+   /**
+    * Unregisters one consumer, dropping its filter once no other consumer uses the same filter string.
+    *
+    * @param filterString the filter the consumer was added with, or <code>null</code> if it had none
+    */
+   public synchronized void removeConsumer(final SimpleString filterString) throws Exception
+   {
+      if (filterString != null)
+      {
+         Integer count = filterCounts.get(filterString);
+
+         if (count != null)
+         {
+            int remaining = count - 1;
+
+            if (remaining <= 0)
+            {
+               filterCounts.remove(filterString);
+
+               removeFilter(filterString);
+            }
+            else
+            {
+               filterCounts.put(filterString, remaining);
+            }
+         }
+      }
+
+      consumerCount--;
+   }
+
+   /**
+    * Removes the filter that was created from the given filter string. The set holds {@link Filter} objects, so
+    * the entry is located through its filter string rather than by passing the string to the set.
+    */
+   private void removeFilter(final SimpleString filterString)
+   {
+      for (Filter filter : filters)
+      {
+         if (filterString.equals(filter.getFilterString()))
+         {
+            // Safe to modify the set here because the iteration stops immediately afterwards
+            filters.remove(filter);
+
+            return;
+         }
+      }
+   }
+
+}

Modified: trunk/src/main/org/jboss/messaging/core/server/impl/DivertImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/DivertImpl.java	2009-01-28 10:53:22 UTC (rev 5750)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/DivertImpl.java	2009-01-28 14:18:19 UTC (rev 5751)
@@ -96,35 +96,26 @@
       this.storageManager = storageManager;
    }
 
-   public boolean accept(final ServerMessage message) throws Exception
+   public void preroute(final ServerMessage message, final Transaction tx) throws Exception
    {
-      if (filter != null && !filter.match(message))
+      //We need to increment ref count here to ensure that the message doesn't get stored, deleted and stored again in a single route which
+      //can occur if the message is routed to a queue, then acked before it's routed here
+      
+      //TODO - combine with similar code in QueueImpl.accept()
+      
+      int count = message.incrementRefCount();
+      
+      if (count == 1)
       {
-         return false;
-      }
-      else
-      {
-         //We need to increment ref count here to ensure that the message doesn't get stored, deleted and stored again in a single route which
-         //can occur if the message is routed to a queue, then acked before it's routed here
+         PagingStore store = pagingManager.getPageStore(message.getDestination());
          
-         //TODO - combine with similar code in QueueImpl.accept()
-         
-         int count = message.incrementRefCount();
-         
-         if (count == 1)
-         {
-            PagingStore store = pagingManager.getPageStore(message.getDestination());
-            
-            store.addSize(message.getMemoryEstimate());
-         }
-       
-         if (message.isDurable())
-         {
-            message.incrementDurableRefCount();
-         }
-         
-         return true;
+         store.addSize(message.getMemoryEstimate());
       }
+    
+      if (message.isDurable())
+      {
+         message.incrementDurableRefCount();
+      }     
    }
 
    public void route(ServerMessage message, final Transaction tx) throws Exception
@@ -180,4 +171,9 @@
    {
       return exclusive;
    }
+   
+   public Filter getFilter()
+   {
+      return filter;
+   }
 }

Modified: trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java	2009-01-28 10:53:22 UTC (rev 5750)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java	2009-01-28 14:18:19 UTC (rev 5751)
@@ -45,7 +45,8 @@
 import org.jboss.messaging.core.postoffice.Binding;
 import org.jboss.messaging.core.postoffice.DuplicateIDCache;
 import org.jboss.messaging.core.postoffice.PostOffice;
-import org.jboss.messaging.core.postoffice.impl.BindingImpl;
+import org.jboss.messaging.core.postoffice.impl.DivertBinding;
+import org.jboss.messaging.core.postoffice.impl.LocalQueueBinding;
 import org.jboss.messaging.core.postoffice.impl.PostOfficeImpl;
 import org.jboss.messaging.core.remoting.Channel;
 import org.jboss.messaging.core.remoting.ChannelHandler;
@@ -288,7 +289,7 @@
                                                 true,
                                                 false);
 
-         Binding binding = new BindingImpl(queueBindingInfo.getAddress(), queueBindingInfo.getQueueName(), queueBindingInfo.getQueueName(), queue, false, true);
+         Binding binding = new LocalQueueBinding(queueBindingInfo.getAddress(), queue);
 
          queues.put(queueBindingInfo.getPersistenceID(), queue);
 
@@ -799,7 +800,7 @@
 
             Queue queue = queueFactory.createQueue(-1, name, filter, config.isDurable(), false);
 
-            Binding queueBinding = new BindingImpl(new SimpleString(config.getAddress()), name, name, queue, false, true);
+            Binding queueBinding = new LocalQueueBinding(new SimpleString(config.getAddress()), queue);
             
             binding = queueBinding;
 
@@ -868,7 +869,7 @@
                                         pagingManager,
                                         storageManager);
 
-         Binding binding = new BindingImpl(sAddress, sName, new SimpleString(config.getRoutingName()), divert, config.isExclusive(), false);
+         Binding binding = new DivertBinding(sAddress, divert);
 
          postOffice.addBinding(binding);
       }

Modified: trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java	2009-01-28 10:53:22 UTC (rev 5750)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java	2009-01-28 14:18:19 UTC (rev 5751)
@@ -116,7 +116,7 @@
 
    private int consumersToFailover = -1;
 
-   private final SimpleString routeToPropertyName;
+  // private final SimpleString routeToPropertyName;
 
    public QueueImpl(final long persistenceID,
                     final SimpleString name,
@@ -144,8 +144,6 @@
 
       this.queueSettingsRepository = queueSettingsRepository;
 
-      routeToPropertyName = MessageImpl.HDR_ROUTE_TO_PREFIX.concat(name);
-
       if (postOffice == null)
       {
          pagingManager = null;
@@ -161,43 +159,7 @@
    }
 
    // Bindable implementation -------------------------------------------------------------------------------------
-
-   public boolean accept(final ServerMessage message) throws Exception
-   {
-      // if (message.containsProperty(MessageImpl.HDR_FROM_CLUSTER))
-      // {
-      // if (message.removeProperty(routeToPropertyName) == null)
-      // {
-      // return false;
-      // }
-      // }
-
-      if (filter != null && !filter.match(message))
-      {
-         return false;
-      }
-      else
-      {
-         int count = message.incrementRefCount();
-
-         if (count == 1)
-         {
-            PagingStore store = pagingManager.getPageStore(message.getDestination());
-
-            store.addSize(message.getMemoryEstimate());
-         }
-
-         boolean durableRef = message.isDurable() && durable;
-
-         if (durableRef)
-         {
-            message.incrementDurableRefCount();
-         }
-
-         return true;
-      }
-   }
-
+  
    public SimpleString getRoutingName()
    {
       return name;
@@ -213,14 +175,29 @@
       return false;
    }
 
-   public void route(final ServerMessage message, final Transaction tx) throws Exception
+   public void preroute(final ServerMessage message, final Transaction tx) throws Exception
    {
-      // Temp
-      SimpleString routeToHeader = MessageImpl.HDR_ROUTE_TO_PREFIX.concat(name);
-      message.removeProperty(routeToHeader);
+      int count = message.incrementRefCount();
 
+      if (count == 1)
+      {
+         PagingStore store = pagingManager.getPageStore(message.getDestination());
+
+         store.addSize(message.getMemoryEstimate());
+      }
+
       boolean durableRef = message.isDurable() && durable;
 
+      if (durableRef)
+      {
+         message.incrementDurableRefCount();
+      }      
+   }
+   
+   public void route(final ServerMessage message, final Transaction tx) throws Exception
+   {
+      boolean durableRef = message.isDurable() && durable;
+
       // If durable, must be persisted before anything is routed
       MessageReference ref = message.createReference(this);
 
@@ -285,10 +262,6 @@
 
    public MessageReference reroute(final ServerMessage message, final Transaction tx) throws Exception
    {
-      // Temp
-      SimpleString routeToHeader = MessageImpl.HDR_ROUTE_TO_PREFIX.concat(name);
-      message.removeProperty(routeToHeader);
-
       MessageReference ref = message.createReference(this);
 
       int count = message.incrementRefCount();

Modified: trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java	2009-01-28 10:53:22 UTC (rev 5750)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java	2009-01-28 14:18:19 UTC (rev 5751)
@@ -195,16 +195,9 @@
       return doHandle(ref);
    }
    
-   public SimpleString getFilterString()
+   public Filter getFilter()
    {
-      if (filter != null)
-      {
-         return filter.getFilterString();
-      }
-      else
-      {
-         return null;
-      }
+      return filter;
    }
 
    public void handleClose(final Packet packet)

Modified: trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java	2009-01-28 10:53:22 UTC (rev 5750)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java	2009-01-28 14:18:19 UTC (rev 5751)
@@ -39,7 +39,7 @@
 import org.jboss.messaging.core.postoffice.Binding;
 import org.jboss.messaging.core.postoffice.Bindings;
 import org.jboss.messaging.core.postoffice.PostOffice;
-import org.jboss.messaging.core.postoffice.impl.BindingImpl;
+import org.jboss.messaging.core.postoffice.impl.LocalQueueBinding;
 import org.jboss.messaging.core.remoting.Channel;
 import org.jboss.messaging.core.remoting.DelayedResult;
 import org.jboss.messaging.core.remoting.FailureListener;
@@ -1492,7 +1492,7 @@
 
          final Queue queue = queueFactory.createQueue(-1, name, filter, durable, temporary);
 
-         binding = new BindingImpl(address, name, name, queue, false, true);
+         binding = new LocalQueueBinding(address, queue);
 
          if (durable)
          {

Modified: trunk/tests/src/org/jboss/messaging/tests/performance/persistence/FakeBinding.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/performance/persistence/FakeBinding.java	2009-01-28 10:53:22 UTC (rev 5750)
+++ trunk/tests/src/org/jboss/messaging/tests/performance/persistence/FakeBinding.java	2009-01-28 14:18:19 UTC (rev 5751)
@@ -34,12 +34,19 @@
  */
 public class FakeBinding implements Binding
 {
-   public boolean accept(ServerMessage message) throws Exception
+   
+   public boolean filterMatches(ServerMessage message) throws Exception
    {
       // TODO Auto-generated method stub
       return false;
    }
 
+   public boolean isHighAcceptPriority(ServerMessage message)
+   {
+      // TODO Auto-generated method stub
+      return false;
+   }
+
    public SimpleString getRoutingName()
    {
       // TODO Auto-generated method stub

Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/server/impl/QueueImplTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/server/impl/QueueImplTest.java	2009-01-28 10:53:22 UTC (rev 5750)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/server/impl/QueueImplTest.java	2009-01-28 14:18:19 UTC (rev 5751)
@@ -589,7 +589,7 @@
 
       class NullConsumer implements Consumer
       {
-         public SimpleString getFilterString()
+         public Filter getFilter()
          {
             return null;
          }

Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/server/impl/fakes/FakeConsumer.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/server/impl/fakes/FakeConsumer.java	2009-01-28 10:53:22 UTC (rev 5750)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/server/impl/fakes/FakeConsumer.java	2009-01-28 14:18:19 UTC (rev 5751)
@@ -59,7 +59,7 @@
       this.filter = filter;
    }
 
-   public SimpleString getFilterString()
+   public Filter getFilter()
    {
       // TODO Auto-generated method stub
       return null;




More information about the jboss-cvs-commits mailing list