[jboss-cvs] JBoss Messaging SVN: r5780 - in trunk: src/main/org/jboss/messaging/core/postoffice and 5 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Mon Feb 2 15:12:41 EST 2009


Author: timfox
Date: 2009-02-02 15:12:41 -0500 (Mon, 02 Feb 2009)
New Revision: 5780

Modified:
   trunk/src/main/org/jboss/messaging/core/management/impl/ManagementServiceImpl.java
   trunk/src/main/org/jboss/messaging/core/postoffice/Binding.java
   trunk/src/main/org/jboss/messaging/core/postoffice/Bindings.java
   trunk/src/main/org/jboss/messaging/core/postoffice/QueueInfo.java
   trunk/src/main/org/jboss/messaging/core/postoffice/impl/BindingsImpl.java
   trunk/src/main/org/jboss/messaging/core/postoffice/impl/DivertBinding.java
   trunk/src/main/org/jboss/messaging/core/postoffice/impl/LocalQueueBinding.java
   trunk/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java
   trunk/src/main/org/jboss/messaging/core/server/cluster/impl/BridgeImpl.java
   trunk/src/main/org/jboss/messaging/core/server/cluster/impl/ClusterConnectionImpl.java
   trunk/src/main/org/jboss/messaging/core/server/cluster/impl/ClusterManagerImpl.java
   trunk/src/main/org/jboss/messaging/core/server/cluster/impl/RemoteQueueBindingImpl.java
   trunk/src/schemas/jbm-queues.xsd
   trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/ClusterTestBase.java
   trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/OnewayTwoNodeClusterTest.java
   trunk/tests/src/org/jboss/messaging/tests/performance/persistence/FakeBinding.java
Log:
More clustering


Modified: trunk/src/main/org/jboss/messaging/core/management/impl/ManagementServiceImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/management/impl/ManagementServiceImpl.java	2009-02-02 18:57:51 UTC (rev 5779)
+++ trunk/src/main/org/jboss/messaging/core/management/impl/ManagementServiceImpl.java	2009-02-02 20:12:41 UTC (rev 5780)
@@ -117,7 +117,7 @@
    private HierarchicalRepository<Set<Role>> securityRepository;
 
    private HierarchicalRepository<QueueSettings> queueSettingsRepository;
-   
+
    private MessagingServerControl managedServer;
 
    private final MessageCounterManager messageCounterManager = new MessageCounterManagerImpl(10000);
@@ -127,9 +127,9 @@
    private boolean started = false;
 
    private boolean noticationsEnabled;
-      
+
    private final Set<NotificationListener> listeners = new ConcurrentHashSet<NotificationListener>();
-   
+
    // Static --------------------------------------------------------
 
    public static ObjectName getMessagingServerObjectName() throws Exception
@@ -214,7 +214,7 @@
       this.managementNotificationAddress = configuration.getManagementNotificationAddress();
       managedServer = new MessagingServerControl(postOffice,
                                                  storageManager,
-                                                 configuration,                                                 
+                                                 configuration,
                                                  resourceManager,
                                                  remotingService,
                                                  messagingServer,
@@ -240,30 +240,30 @@
       AddressControl addressControl = new AddressControl(address, postOffice, securityRepository);
 
       registerInJMX(objectName, new ReplicationAwareAddressControlWrapper(objectName, addressControl));
-      
+
       registerInRegistry(objectName, addressControl);
-      
+
       if (log.isDebugEnabled())
       {
          log.debug("registered address " + objectName);
       }
       TypedProperties props = new TypedProperties();
-      
+
       props.putStringProperty(ManagementHelper.HDR_ADDRESS, address);
-            
+
       sendNotification(new Notification(NotificationType.ADDRESS_ADDED, props));
    }
 
    public void unregisterAddress(final SimpleString address) throws Exception
    {
       ObjectName objectName = getAddressObjectName(address);
-      
+
       unregisterResource(objectName);
-      
+
       TypedProperties props = new TypedProperties();
-      
+
       props.putStringProperty(ManagementHelper.HDR_ADDRESS, address);
-      
+
       sendNotification(new Notification(NotificationType.ADDRESS_REMOVED, props));
    }
 
@@ -284,14 +284,14 @@
       if (log.isDebugEnabled())
       {
          log.debug("registered queue " + objectName);
-      }            
+      }
    }
 
    public void unregisterQueue(final SimpleString name, final SimpleString address) throws Exception
    {
       ObjectName objectName = getQueueObjectName(address, name);
       unregisterResource(objectName);
-      messageCounterManager.unregisterMessageCounter(name.toString());     
+      messageCounterManager.unregisterMessageCounter(name.toString());
    }
 
    public void registerAcceptor(final Acceptor acceptor, final TransportConfiguration configuration) throws Exception
@@ -341,7 +341,7 @@
       ObjectName objectName = getBridgeObjectName(configuration.getName());
       BridgeControlMBean control = new BridgeControl(bridge, configuration);
       registerInJMX(objectName, new StandardMBean(control, BridgeControlMBean.class));
-      registerInRegistry(objectName, control);           
+      registerInRegistry(objectName, control);
    }
 
    public void unregisterBridge(String name) throws Exception
@@ -416,15 +416,15 @@
       unregisterFromRegistry(objectName);
       unregisterFromJMX(objectName);
    }
-   
+
    public void registerCluster(final ClusterConnection cluster, final ClusterConnectionConfiguration configuration) throws Exception
-   {      
-      //TODO
+   {
+      // TODO
    }
-   
+
    public void unregisterCluster(final String name) throws Exception
-   {      
-      //TODO
+   {
+      // TODO
    }
 
    public Object getResource(final ObjectName objectName)
@@ -455,7 +455,7 @@
    {
       listeners.add(listener);
    }
-   
+
    public void removeNotificationListener(final NotificationListener listener)
    {
       listeners.remove(listener);
@@ -512,16 +512,16 @@
          }
       }
    }
-      
+
    public void sendNotification(final Notification notification) throws Exception
-   {     
+   {
       if (managedServer != null && noticationsEnabled)
       {
-         //This needs to be synchronized since we need to ensure notifications are processed in strict sequence
+         // This needs to be synchronized since we need to ensure notifications are processed in strict sequence
          synchronized (this)
          {
-            //First send to any local listeners
-            for (NotificationListener listener: listeners)
+            // First send to any local listeners
+            for (NotificationListener listener : listeners)
             {
                try
                {
@@ -529,19 +529,20 @@
                }
                catch (Exception e)
                {
-                  //Exception thrown from one listener should not stop execution of others
+                  // Exception thrown from one listener should not stop execution of others
                   log.error("Failed to call listener", e);
-               }            
+               }
             }
-            
-            //Now send message
-            
+
+            // Now send message
+
             ServerMessage notificationMessage = new ServerMessageImpl(storageManager.generateUniqueID());
             notificationMessage.setBody(new ByteBufferWrapper(ByteBuffer.allocate(0)));
-            //Notification messages are always durable so the user can choose whether to add a durable queue to consume them in
+            // Notification messages are always durable so the user can choose whether to add a durable queue to consume
+            // them in
             notificationMessage.setDurable(true);
             notificationMessage.setDestination(managementNotificationAddress);
-               
+
             TypedProperties notifProps;
             if (notification.getProperties() != null)
             {
@@ -551,12 +552,13 @@
             {
                notifProps = new TypedProperties();
             }
-   
-            notifProps.putStringProperty(ManagementHelper.HDR_NOTIFICATION_TYPE, new SimpleString(notification.getType().toString()));        
+
+            notifProps.putStringProperty(ManagementHelper.HDR_NOTIFICATION_TYPE,
+                                         new SimpleString(notification.getType().toString()));
             notifProps.putLongProperty(ManagementHelper.HDR_NOTIFICATION_TIMESTAMP, System.currentTimeMillis());
-            
+
             notificationMessage.putTypedProperties(notifProps);
-            
+
             postOffice.route(notificationMessage, null);
          }
       }

Modified: trunk/src/main/org/jboss/messaging/core/postoffice/Binding.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/postoffice/Binding.java	2009-02-02 18:57:51 UTC (rev 5779)
+++ trunk/src/main/org/jboss/messaging/core/postoffice/Binding.java	2009-02-02 20:12:41 UTC (rev 5780)
@@ -22,6 +22,7 @@
 
 package org.jboss.messaging.core.postoffice;
 
+import org.jboss.messaging.core.filter.Filter;
 import org.jboss.messaging.core.server.Bindable;
 import org.jboss.messaging.core.server.ServerMessage;
 import org.jboss.messaging.util.SimpleString;
@@ -45,8 +46,10 @@
 
    SimpleString getRoutingName();
 
-   boolean filterMatches(ServerMessage message) throws Exception;
+   //boolean filterMatches(ServerMessage message) throws Exception;
    
+   Filter getFilter();
+   
    boolean isHighAcceptPriority(ServerMessage message);
    
    //TODO find a better way

Modified: trunk/src/main/org/jboss/messaging/core/postoffice/Bindings.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/postoffice/Bindings.java	2009-02-02 18:57:51 UTC (rev 5779)
+++ trunk/src/main/org/jboss/messaging/core/postoffice/Bindings.java	2009-02-02 20:12:41 UTC (rev 5780)
@@ -45,4 +45,6 @@
    void addBinding(Binding binding);
    
    void removeBinding(Binding binding);
+   
+   void setRouteWhenNoConsumers(boolean routeWhenNoConsumers);
 }

Modified: trunk/src/main/org/jboss/messaging/core/postoffice/QueueInfo.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/postoffice/QueueInfo.java	2009-02-02 18:57:51 UTC (rev 5779)
+++ trunk/src/main/org/jboss/messaging/core/postoffice/QueueInfo.java	2009-02-02 20:12:41 UTC (rev 5780)
@@ -45,16 +45,19 @@
    
    private final SimpleString address;
    
+   private final SimpleString filterString;
+   
    private final int id;
    
    private List<SimpleString> filterStrings;
    
    private int numberOfConsumers;
 
-   public QueueInfo(final SimpleString queueName, final SimpleString address, final int id)
+   public QueueInfo(final SimpleString queueName, final SimpleString address, final SimpleString filterString, final int id)
    {
       this.queueName = queueName;
       this.address = address;      
+      this.filterString = filterString;
       this.id = id;
    }
 
@@ -68,6 +71,11 @@
       return address;
    }
    
+   public SimpleString getFilterString()
+   {
+      return filterString;
+   }
+   
    public int getID()
    {
       return id;

Modified: trunk/src/main/org/jboss/messaging/core/postoffice/impl/BindingsImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/postoffice/impl/BindingsImpl.java	2009-02-02 18:57:51 UTC (rev 5779)
+++ trunk/src/main/org/jboss/messaging/core/postoffice/impl/BindingsImpl.java	2009-02-02 20:12:41 UTC (rev 5780)
@@ -32,6 +32,7 @@
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.CopyOnWriteArrayList;
 
+import org.jboss.messaging.core.filter.Filter;
 import org.jboss.messaging.core.logging.Logger;
 import org.jboss.messaging.core.message.impl.MessageImpl;
 import org.jboss.messaging.core.postoffice.Binding;
@@ -61,7 +62,14 @@
    private final Map<Integer, Binding> bindingsMap = new ConcurrentHashMap<Integer, Binding>();
 
    private final List<Binding> exclusiveBindings = new CopyOnWriteArrayList<Binding>();
+   
+   private volatile boolean routeWhenNoConsumers;
 
+   public void setRouteWhenNoConsumers(final boolean routeWhenNoConsumers)
+   {      
+      this.routeWhenNoConsumers = routeWhenNoConsumers;
+   }
+   
    public Collection<Binding> getBindings()
    {      
       return bindingsMap.values();
@@ -203,7 +211,7 @@
    
                Binding theBinding = null;
    
-               int lastNoMatchingConsumerPos = -1;
+               int lastLowPriorityBinding = -1;
    
                while (true)
                {
@@ -227,11 +235,13 @@
                      }
                   }
    
-                  if (binding.filterMatches(message))
-                  {
+                  Filter filter = binding.getFilter();
+                                                     
+                  if (filter == null || filter.match(message))
+                  {                     
                      // bindings.length == 1 ==> only a local queue so we don't check for matching consumers (it's an
                      // unnecessary overhead)
-                     if (length == 1 || binding.isHighAcceptPriority(message))
+                     if (length == 1 || routeWhenNoConsumers || binding.isHighAcceptPriority(message))
                      {
                         theBinding = binding;
    
@@ -241,7 +251,10 @@
                      }
                      else
                      {
-                        lastNoMatchingConsumerPos = pos;
+                        if (lastLowPriorityBinding == -1)
+                        {
+                           lastLowPriorityBinding = pos;
+                        }
                      }
                   }
    
@@ -249,7 +262,7 @@
    
                   if (pos == startPos)
                   {
-                     if (lastNoMatchingConsumerPos != -1)
+                     if (lastLowPriorityBinding != -1)
                      {                     
                         try
                         {
@@ -262,7 +275,7 @@
                            {
                               pos = 0;
                               
-                              lastNoMatchingConsumerPos = -1;
+                              lastLowPriorityBinding = -1;
    
                               continue;
                            }
@@ -272,7 +285,7 @@
                            }
                         }
                                             
-                        pos = lastNoMatchingConsumerPos;
+                        pos = lastLowPriorityBinding;
    
                         pos = incrementPos(pos, length);
                      }
@@ -286,7 +299,7 @@
                   
                   chosen.add(theBinding.getBindable());
                }
-   
+
                routingNamePositions.put(routingName, pos);
             }
    

Modified: trunk/src/main/org/jboss/messaging/core/postoffice/impl/DivertBinding.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/postoffice/impl/DivertBinding.java	2009-02-02 18:57:51 UTC (rev 5779)
+++ trunk/src/main/org/jboss/messaging/core/postoffice/impl/DivertBinding.java	2009-02-02 20:12:41 UTC (rev 5780)
@@ -80,17 +80,9 @@
       this.id = id;
    }
    
-      
-   public boolean filterMatches(final ServerMessage message) throws Exception
+   public Filter getFilter()
    {
-      if (filter != null && !filter.match(message))
-      {
-         return false;
-      }
-      else
-      {
-         return true;
-      }
+      return filter;
    }
 
    public SimpleString getAddress()

Modified: trunk/src/main/org/jboss/messaging/core/postoffice/impl/LocalQueueBinding.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/postoffice/impl/LocalQueueBinding.java	2009-02-02 18:57:51 UTC (rev 5779)
+++ trunk/src/main/org/jboss/messaging/core/postoffice/impl/LocalQueueBinding.java	2009-02-02 20:12:41 UTC (rev 5780)
@@ -78,19 +78,11 @@
       this.id = id;
    }
    
-      
-   public boolean filterMatches(final ServerMessage message) throws Exception
+   public Filter getFilter()
    {
-      if (filter != null && !filter.match(message))
-      {
-         return false;
-      }
-      else
-      {
-         return true;
-      }
+      return filter;
    }
-
+        
    public SimpleString getAddress()
    {
       return address;

Modified: trunk/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java	2009-02-02 18:57:51 UTC (rev 5779)
+++ trunk/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java	2009-02-02 20:12:41 UTC (rev 5780)
@@ -37,6 +37,7 @@
 
 import org.jboss.messaging.core.client.management.impl.ManagementHelper;
 import org.jboss.messaging.core.exception.MessagingException;
+import org.jboss.messaging.core.filter.Filter;
 import org.jboss.messaging.core.logging.Logger;
 import org.jboss.messaging.core.management.ManagementService;
 import org.jboss.messaging.core.management.Notification;
@@ -243,8 +244,10 @@
             
             Integer transientID = (Integer)props.getProperty(ManagementHelper.HDR_BINDING_ID);
             
-            QueueInfo info = new QueueInfo(queueName, address, transientID);
+            SimpleString filterString = (SimpleString)props.getProperty(ManagementHelper.HDR_FILTERSTRING);
             
+            QueueInfo info = new QueueInfo(queueName, address, filterString, transientID);
+            
             queueInfos.put(queueName, info);
          }
          else if (type == NotificationType.BINDING_REMOVED)
@@ -369,6 +372,11 @@
       props.putStringProperty(ManagementHelper.HDR_ADDRESS, binding.getAddress());
       props.putStringProperty(ManagementHelper.HDR_QUEUE_NAME, binding.getRoutingName());
       props.putIntProperty(ManagementHelper.HDR_BINDING_ID, binding.getID());
+      Filter filter = binding.getFilter();
+      if (filter != null)
+      {
+         props.putStringProperty(ManagementHelper.HDR_FILTERSTRING, filter.getFilterString());
+      }
       
       managementService.sendNotification(new Notification(NotificationType.BINDING_ADDED, props));
    }
@@ -569,9 +577,7 @@
 
       return cache;
    }
-   
-   
-
+     
    public void sendQueueInfoToQueue(final SimpleString queueName, final SimpleString address) throws Exception
    {
       //We send direct to the queue so we can send it to the same queue that is bound to the notifications adress - this is crucial for ensuring
@@ -607,6 +613,7 @@
                message.putStringProperty(ManagementHelper.HDR_ADDRESS, info.getAddress());
                message.putStringProperty(ManagementHelper.HDR_QUEUE_NAME, info.getQueueName());
                message.putIntProperty(ManagementHelper.HDR_BINDING_ID, info.getID());
+               message.putStringProperty(ManagementHelper.HDR_FILTERSTRING, info.getFilterString());
                
                queue.preroute(message, null);            
                queue.route(message, null);

Modified: trunk/src/main/org/jboss/messaging/core/server/cluster/impl/BridgeImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/cluster/impl/BridgeImpl.java	2009-02-02 18:57:51 UTC (rev 5779)
+++ trunk/src/main/org/jboss/messaging/core/server/cluster/impl/BridgeImpl.java	2009-02-02 20:12:41 UTC (rev 5780)
@@ -170,8 +170,7 @@
                      final MessageHandler queueInfoMessageHandler,
                      final String queueDataAddress,
                      final boolean forClusterConnector) throws Exception
-   {
-      log.info("Creating new bridge " + name + " queue " + queue);
+   {      
       this.name = name;
 
       this.queue = queue;
@@ -213,8 +212,6 @@
 
       this.queueInfoMessageHandler = queueInfoMessageHandler;
 
-      log.info("queue info handler " + this.queueInfoMessageHandler);
-
       this.queueDataAddress = queueDataAddress;
 
       this.forClusterConnector = forClusterConnector;
@@ -252,7 +249,6 @@
       {
          try
          {
-            log.info("creating objects");
             createTx();
 
             queue.addConsumer(BridgeImpl.this);
@@ -320,8 +316,6 @@
                prod.send(message);
             }
 
-            log.info("Created objects");
-
             active = true;
 
             queue.deliverAsync(executor);
@@ -497,8 +491,6 @@
             return;
          }
 
-         log.info("sending batch");
-
          // TODO - if batch size = 1 then don't need tx - actually we should use asynch send acknowledgement stream - then we don't need a transaction at all
 
          while (true)

Modified: trunk/src/main/org/jboss/messaging/core/server/cluster/impl/ClusterConnectionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/cluster/impl/ClusterConnectionImpl.java	2009-02-02 18:57:51 UTC (rev 5779)
+++ trunk/src/main/org/jboss/messaging/core/server/cluster/impl/ClusterConnectionImpl.java	2009-02-02 20:12:41 UTC (rev 5780)
@@ -43,9 +43,9 @@
 import org.jboss.messaging.core.management.NotificationType;
 import org.jboss.messaging.core.persistence.StorageManager;
 import org.jboss.messaging.core.postoffice.Binding;
+import org.jboss.messaging.core.postoffice.Bindings;
 import org.jboss.messaging.core.postoffice.PostOffice;
 import org.jboss.messaging.core.postoffice.impl.LocalQueueBinding;
-import org.jboss.messaging.core.postoffice.impl.PostOfficeImpl;
 import org.jboss.messaging.core.server.Queue;
 import org.jboss.messaging.core.server.QueueFactory;
 import org.jboss.messaging.core.server.cluster.Bridge;
@@ -84,7 +84,7 @@
 
    private final boolean useDuplicateDetection;
 
-   private final boolean forwardWhenNoMatchingConsumers;
+   private final boolean routeWhenNoConsumers;
 
    private Map<Pair<TransportConfiguration, TransportConfiguration>, MessageFlowRecord> records = new HashMap<Pair<TransportConfiguration, TransportConfiguration>, MessageFlowRecord>();
 
@@ -103,7 +103,7 @@
                                 final SimpleString address,
                                 final BridgeConfiguration bridgeConfig,
                                 final boolean useDuplicateDetection,
-                                final boolean forwardWhenNoMatchingConsumers,
+                                final boolean routeWhenNoConsumers,
                                 final ExecutorFactory executorFactory,
                                 final StorageManager storageManager,
                                 final PostOffice postOffice,
@@ -119,7 +119,7 @@
 
       this.useDuplicateDetection = useDuplicateDetection;
 
-      this.forwardWhenNoMatchingConsumers = forwardWhenNoMatchingConsumers;
+      this.routeWhenNoConsumers = routeWhenNoConsumers;
 
       this.executorFactory = executorFactory;
 
@@ -143,7 +143,7 @@
                                 final SimpleString address,
                                 final BridgeConfiguration bridgeConfig,
                                 final boolean useDuplicateDetection,
-                                final boolean forwardWhenNoMatchingConsumers,
+                                final boolean routeWhenNoConsumers,
                                 final ExecutorFactory executorFactory,
                                 final StorageManager storageManager,
                                 final PostOffice postOffice,
@@ -171,7 +171,7 @@
 
       this.useDuplicateDetection = useDuplicateDetection;
 
-      this.forwardWhenNoMatchingConsumers = forwardWhenNoMatchingConsumers;
+      this.routeWhenNoConsumers = routeWhenNoConsumers;
    }
 
    public synchronized void start() throws Exception
@@ -424,8 +424,7 @@
 
             NotificationType type = NotificationType.valueOf(message.getProperty(ManagementHelper.HDR_NOTIFICATION_TYPE)
                                                                     .toString());
- 
-       
+            
             if (type == NotificationType.BINDING_ADDED)
             {               
                SimpleString uniqueName = UUIDGenerator.getInstance().generateSimpleStringUUID();
@@ -444,13 +443,16 @@
                                                                        queueID,
                                                                        filterString,
                                                                        queue,
-                                                                       useDuplicateDetection,
-                                                                       forwardWhenNoMatchingConsumers,
+                                                                       useDuplicateDetection,                                                           
                                                                        bridge.getName());
 
                bindings.put(queueName, binding);
 
                postOffice.addBinding(binding);
+               
+               Bindings theBindings = postOffice.getBindingsForAddress(queueAddress);
+               
+               theBindings.setRouteWhenNoConsumers(routeWhenNoConsumers);
             }
             else if (type == NotificationType.BINDING_REMOVED)
             {
@@ -467,8 +469,13 @@
                SimpleString filterString = (SimpleString)message.getProperty(ManagementHelper.HDR_FILTERSTRING);
 
                RemoteQueueBinding binding = bindings.get(queueName);
-
-               binding.addConsumer(filterString);
+               
+               if (binding != null)
+               {
+                  //Can legitimately be null if there are multiple cluster connections, which will all receive create consumer notifications for
+                  //different addresses, since the address isn't checked on the filter when it's an add or create consumer message
+                  binding.addConsumer(filterString);
+               }
             }
             else if (type == NotificationType.CONSUMER_CLOSED)
             {
@@ -477,8 +484,14 @@
                SimpleString filterString = (SimpleString)message.getProperty(ManagementHelper.HDR_FILTERSTRING);
 
                RemoteQueueBinding binding = bindings.get(queueName);
+               
+               if (binding != null)
+               {
+                  //Can legitimately be null if there are multiple cluster connections, which will all receive consumer closed notifications for
+                  //different addresses, since the address isn't checked on the filter for consumer notifications
 
-               binding.removeConsumer(filterString);
+                  binding.removeConsumer(filterString);
+               }
             }
          }
          catch (Exception e)

Modified: trunk/src/main/org/jboss/messaging/core/server/cluster/impl/ClusterManagerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/cluster/impl/ClusterManagerImpl.java	2009-02-02 18:57:51 UTC (rev 5779)
+++ trunk/src/main/org/jboss/messaging/core/server/cluster/impl/ClusterManagerImpl.java	2009-02-02 20:12:41 UTC (rev 5780)
@@ -171,7 +171,6 @@
          managementService.unregisterBridge(bridge.getName().toString());
       }
 
-      log.info("stopping cluster connecttions");
       for (ClusterConnection clusterConnection : clusters.values())
       {
          clusterConnection.stop();

Modified: trunk/src/main/org/jboss/messaging/core/server/cluster/impl/RemoteQueueBindingImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/cluster/impl/RemoteQueueBindingImpl.java	2009-02-02 18:57:51 UTC (rev 5779)
+++ trunk/src/main/org/jboss/messaging/core/server/cluster/impl/RemoteQueueBindingImpl.java	2009-02-02 20:12:41 UTC (rev 5780)
@@ -72,8 +72,6 @@
 
    private final boolean duplicateDetection;
    
-   private final boolean forwardWhenNoMatchingConsumers;
-   
    private final SimpleString idsHeaderName;
    
    private int id;
@@ -84,8 +82,7 @@
                                  final int remoteQueueID,
                                  final SimpleString filterString,
                                  final Queue storeAndForwardQueue,
-                                 final boolean duplicateDetection,
-                                 final boolean forwardWhenNoMatchingConsumers,
+                                 final boolean duplicateDetection,                      
                                  final SimpleString bridgeName) throws Exception
    {
       this.address = address;
@@ -109,8 +106,6 @@
          queueFilter = null;
       }
       
-      this.forwardWhenNoMatchingConsumers = forwardWhenNoMatchingConsumers;
-      
       this.idsHeaderName = MessageImpl.HDR_ROUTE_TO_IDS.concat(bridgeName);
    }
    
@@ -154,25 +149,13 @@
       return false;
    }
 
-   public boolean filterMatches(final ServerMessage message) throws Exception
+   public Filter getFilter()
    {
-      if (queueFilter != null && !queueFilter.match(message))
-      {
-         return false;
-      }
-      else
-      {
-         return true;
-      }
+      return queueFilter;
    }
 
    public boolean isHighAcceptPriority(final ServerMessage message)
-   {
-      if (forwardWhenNoMatchingConsumers)
-      {
-         return true;
-      }
-      
+   {      
       if (consumerCount == 0)
       {
          return false;
@@ -198,8 +181,6 @@
    
    public void willRoute(final ServerMessage message)
    {      
-      log.info("routing to remote queue binding");
-      
       //We add a header with the name of the queue, holding a list of the transient ids of the queues to route to
       
       //TODO - this can be optimised

Modified: trunk/src/schemas/jbm-queues.xsd
===================================================================
--- trunk/src/schemas/jbm-queues.xsd	2009-02-02 18:57:51 UTC (rev 5779)
+++ trunk/src/schemas/jbm-queues.xsd	2009-02-02 20:12:41 UTC (rev 5780)
@@ -74,7 +74,7 @@
    	<xsd:attribute name="match" type="xsd:string" use="required"></xsd:attribute>
    </xsd:complexType>   
    
-      <xsd:element name="queue" type="queueType"></xsd:element>
+   <xsd:element name="queue" type="queueType"></xsd:element>
    
     <xsd:complexType name="queueType">
         <xsd:attribute name="name" type="xsd:string" use="required"></xsd:attribute>

Modified: trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/ClusterTestBase.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/ClusterTestBase.java	2009-02-02 18:57:51 UTC (rev 5779)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/ClusterTestBase.java	2009-02-02 20:12:41 UTC (rev 5780)
@@ -90,7 +90,7 @@
 
    private static final SimpleString COUNT_PROP = new SimpleString("count_prop");
 
-   private static final SimpleString FILTER_PROP = new SimpleString("animal");
+   protected static final SimpleString FILTER_PROP = new SimpleString("animal");
 
    private static final int MAX_SERVERS = 10;
 
@@ -136,7 +136,7 @@
             }
          }
          
-         //log.info("binding count " + bindingCount + " consumer Count " + totConsumers);
+         log.info("binding count " + bindingCount + " consumer Count " + totConsumers);
 
          if (bindingCount == count && totConsumers == consumerCount)
          {
@@ -277,7 +277,7 @@
          {
             ClientMessage message = consumer.receive(500);
 
-            assertNotNull(message);
+            assertNotNull("consumer " + consumerIDs[i] + " did not receive message " + j, message);
 
             assertEquals(j, message.getProperty(COUNT_PROP));
          }
@@ -299,7 +299,7 @@
 
          ClientMessage message = consumer.receive(500);
 
-         assertNotNull(message);
+         assertNotNull("consumer " + consumerIDs[count] + " did not receive message " + i, message);
 
          assertEquals(i, message.getProperty(COUNT_PROP));
 
@@ -323,7 +323,7 @@
             throw new IllegalArgumentException("No consumer at " + consumerIDs[i]);
          }
 
-         assertNull(consumer.receive(200));
+         assertNull("consumer " + i + " received message", consumer.receive(200));
       }
    }
 
@@ -348,6 +348,9 @@
       }
 
       ClientSessionFactory sf = new ClientSessionFactoryImpl(serverTotc);
+      
+      sf.setBlockOnNonPersistentSend(true);
+      sf.setBlockOnPersistentSend(true);
 
       sfs[node] = sf;
    }

Modified: trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/OnewayTwoNodeClusterTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/OnewayTwoNodeClusterTest.java	2009-02-02 18:57:51 UTC (rev 5779)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/OnewayTwoNodeClusterTest.java	2009-02-02 20:12:41 UTC (rev 5780)
@@ -40,9 +40,7 @@
       super.setUp();
       
       setupServer(0, false, false);
-      setupServer(1, false, false);
-      
-      setupClusterConnection("cluster1", 0, 1, "queues", false, false);
+      setupServer(1, false, false);            
    }
 
    @Override
@@ -55,40 +53,53 @@
    
    public void testStartTargetServerBeforeSourceServer() throws Exception
    {
+      setupClusterConnection("cluster1", 0, 1, "queues", false, false);
       startServers(1, 0);
 
       setupSessionFactory(0, false);
       setupSessionFactory(1, false);
 
-      createQueue(1, "queues.testaddress", "queue0", null, false);
+      String myFilter = "zebra";
+      
+      createQueue(1, "queues.testaddress", "queue0", myFilter, false);
       addConsumer(0, 1, "queue0", null);
 
       waitForBindings(0, "queues.testaddress", 1, 1, false);
 
-      send(0, "queues.testaddress", 10, false, null);
+      send(0, "queues.testaddress", 10, false, myFilter);
       verifyReceiveAll(10, 0);
       verifyNotReceive(0);
+      
+      send(0, "queues.testaddress", 10, false, null);    
+      verifyNotReceive(0);
    }
    
    public void testStartSourceServerBeforeTargetServer() throws Exception
    {
+      setupClusterConnection("cluster1", 0, 1, "queues", false, false);
       startServers(0, 1);
 
       setupSessionFactory(0, false);
       setupSessionFactory(1, false);
+      
+      String myFilter = "bison";
 
-      createQueue(1, "queues.testaddress", "queue0", null, false);
+      createQueue(1, "queues.testaddress", "queue0", myFilter, false);
       addConsumer(0, 1, "queue0", null);
           
       waitForBindings(0, "queues.testaddress", 1, 1, false);
 
-      send(0, "queues.testaddress", 10, false, null);
+      send(0, "queues.testaddress", 10, false, myFilter);
       verifyReceiveAll(10, 0);
       verifyNotReceive(0);
+      
+      send(0, "queues.testaddress", 10, false, null);
+      verifyNotReceive(0);
    }
 
    public void testBasicLocalReceive() throws Exception
    {
+      setupClusterConnection("cluster1", 0, 1, "queues", false, false);
       startServers(1, 0);
 
       setupSessionFactory(0, false);
@@ -107,6 +118,7 @@
 
    public void testBasicRoundRobin() throws Exception
    {
+      setupClusterConnection("cluster1", 0, 1, "queues", false, false);
       startServers(1, 0);
 
       setupSessionFactory(0, false);
@@ -130,6 +142,7 @@
    
    public void testRoundRobinMultipleQueues() throws Exception
    {
+      setupClusterConnection("cluster1", 0, 1, "queues", false, false);
       startServers(1, 0);
 
       setupSessionFactory(0, false);
@@ -166,9 +179,10 @@
       
       verifyNotReceive(0, 1, 2, 3, 4, 5);
    }
-   
+         
    public void testMultipleNonLoadBalancedQueues() throws Exception
    {
+      setupClusterConnection("cluster1", 0, 1, "queues", false, false);
       startServers(1, 0);
 
       setupSessionFactory(0, false);
@@ -212,6 +226,7 @@
    
    public void testMixtureLoadBalancedAndNonLoadBalancedQueues() throws Exception
    {
+      setupClusterConnection("cluster1", 0, 1, "queues", false, false);
       startServers(1, 0);
 
       setupSessionFactory(0, false);
@@ -275,8 +290,141 @@
       verifyNotReceive(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15);
    }
    
+   public void testMixtureLoadBalancedAndNonLoadBalancedQueuesAddQueuesOnTargetBeforeStartSource() throws Exception
+   {
+      setupClusterConnection("cluster1", 0, 1, "queues", false, false);
+      startServers(1);
+     
+      setupSessionFactory(1, false);
+      
+      createQueue(1, "queues.testaddress", "queue5", null, false);
+      createQueue(1, "queues.testaddress", "queue6", null, false);
+      createQueue(1, "queues.testaddress", "queue7", null, false);
+      createQueue(1, "queues.testaddress", "queue8", null, false);
+      createQueue(1, "queues.testaddress", "queue9", null, false);    
+      
+      createQueue(1, "queues.testaddress", "queue10", null, false);      
+      createQueue(1, "queues.testaddress", "queue11", null, false);      
+      createQueue(1, "queues.testaddress", "queue12", null, false);
+      
+
+      addConsumer(5, 1, "queue5", null);
+      addConsumer(6, 1, "queue6", null);
+      addConsumer(7, 1, "queue7", null);
+      addConsumer(8, 1, "queue8", null);
+      addConsumer(9, 1, "queue9", null); 
+      
+      addConsumer(11, 1, "queue10", null);      
+      addConsumer(13, 1, "queue11", null);      
+      addConsumer(15, 1, "queue12", null);
+      
+      startServers(0);
+      
+      waitForBindings(0, "queues.testaddress", 8, 8, false);
+       
+      setupSessionFactory(0, false);
+      
+      createQueue(0, "queues.testaddress", "queue0", null, false);
+      createQueue(0, "queues.testaddress", "queue1", null, false);
+      createQueue(0, "queues.testaddress", "queue2", null, false);
+      createQueue(0, "queues.testaddress", "queue3", null, false);
+      createQueue(0, "queues.testaddress", "queue4", null, false);   
+      
+      createQueue(0, "queues.testaddress", "queue10", null, false);         
+      createQueue(0, "queues.testaddress", "queue11", null, false);
+      createQueue(0, "queues.testaddress", "queue12", null, false);
+     
+      
+      addConsumer(0, 0, "queue0", null);
+      addConsumer(1, 0, "queue1", null);
+      addConsumer(2, 0, "queue2", null);
+      addConsumer(3, 0, "queue3", null);
+      addConsumer(4, 0, "queue4", null);
+      
+      addConsumer(10, 0, "queue10", null);          
+      addConsumer(12, 0, "queue11", null);           
+      addConsumer(14, 0, "queue12", null);
+                 
+      send(0, "queues.testaddress", 10, false, null);
+                  
+      verifyReceiveAll(10, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9);
+      
+      verifyReceiveRoundRobin(10, 11, 10);
+      verifyReceiveRoundRobin(10, 13, 12);
+      verifyReceiveRoundRobin(10, 15, 14);
+      
+      verifyNotReceive(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15);
+   }
+   
+   public void testMixtureLoadBalancedAndNonLoadBalancedQueuesAddQueuesOnSourceBeforeStartTarget() throws Exception
+   {
+      setupClusterConnection("cluster1", 0, 1, "queues", false, false);
+      startServers(0);
+     
+      setupSessionFactory(0, false);
+      
+      createQueue(0, "queues.testaddress", "queue0", null, false);
+      createQueue(0, "queues.testaddress", "queue1", null, false);
+      createQueue(0, "queues.testaddress", "queue2", null, false);
+      createQueue(0, "queues.testaddress", "queue3", null, false);
+      createQueue(0, "queues.testaddress", "queue4", null, false);   
+      
+      createQueue(0, "queues.testaddress", "queue10", null, false);         
+      createQueue(0, "queues.testaddress", "queue11", null, false);
+      createQueue(0, "queues.testaddress", "queue12", null, false);
+     
+      
+      addConsumer(0, 0, "queue0", null);
+      addConsumer(1, 0, "queue1", null);
+      addConsumer(2, 0, "queue2", null);
+      addConsumer(3, 0, "queue3", null);
+      addConsumer(4, 0, "queue4", null);
+      
+      addConsumer(10, 0, "queue10", null);          
+      addConsumer(12, 0, "queue11", null);           
+      addConsumer(14, 0, "queue12", null);
+      
+      startServers(1);
+      
+      setupSessionFactory(1, false);
+      
+      createQueue(1, "queues.testaddress", "queue5", null, false);
+      createQueue(1, "queues.testaddress", "queue6", null, false);
+      createQueue(1, "queues.testaddress", "queue7", null, false);
+      createQueue(1, "queues.testaddress", "queue8", null, false);
+      createQueue(1, "queues.testaddress", "queue9", null, false);    
+      
+      createQueue(1, "queues.testaddress", "queue10", null, false);      
+      createQueue(1, "queues.testaddress", "queue11", null, false);      
+      createQueue(1, "queues.testaddress", "queue12", null, false);
+      
+
+      addConsumer(5, 1, "queue5", null);
+      addConsumer(6, 1, "queue6", null);
+      addConsumer(7, 1, "queue7", null);
+      addConsumer(8, 1, "queue8", null);
+      addConsumer(9, 1, "queue9", null); 
+      
+      addConsumer(11, 1, "queue10", null);      
+      addConsumer(13, 1, "queue11", null);      
+      addConsumer(15, 1, "queue12", null);
+      
+      waitForBindings(0, "queues.testaddress", 8, 8, false);
+            
+      send(0, "queues.testaddress", 10, false, null);
+                  
+      verifyReceiveAll(10, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9);
+      
+      verifyReceiveRoundRobin(10, 10, 11);
+      verifyReceiveRoundRobin(10, 12, 13);
+      verifyReceiveRoundRobin(10, 14, 15);
+      
+      verifyNotReceive(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15);
+   }
+   
    public void testNotRouteToNonMatchingAddress() throws Exception
    {
+      setupClusterConnection("cluster1", 0, 1, "queues", false, false);
       startServers(1, 0);
 
       setupSessionFactory(0, false);
@@ -309,6 +457,615 @@
       
       verifyNotReceive(2, 3, 4, 5);
    }
+   
+   public void testNonLoadBalancedQueuesWithFilters() throws Exception
+   {
+      setupClusterConnection("cluster1", 0, 1, "queues", false, false);
+      startServers(1, 0);
 
+      setupSessionFactory(0, false);
+      setupSessionFactory(1, false);
+      
+      String filter1 = "giraffe";
+      String filter2 = "aardvark";
 
+      createQueue(0, "queues.testaddress", "queue0", filter1, false);
+      createQueue(0, "queues.testaddress", "queue1", filter2, false);
+      createQueue(0, "queues.testaddress", "queue2", filter1, false);
+      createQueue(0, "queues.testaddress", "queue3", filter2, false);
+      createQueue(0, "queues.testaddress", "queue4", filter1, false);
+    
+      
+      createQueue(1, "queues.testaddress", "queue5", filter2, false);
+      createQueue(1, "queues.testaddress", "queue6", filter1, false);
+      createQueue(1, "queues.testaddress", "queue7", filter2, false);
+      createQueue(1, "queues.testaddress", "queue8", filter1, false);
+      createQueue(1, "queues.testaddress", "queue9", filter2, false);
+      
+      createQueue(0, "queues.testaddress", "queue10", null, false);
+      createQueue(1, "queues.testaddress", "queue11", null, false);
+
+      
+      addConsumer(0, 0, "queue0", null);
+      addConsumer(1, 0, "queue1", null);
+      addConsumer(2, 0, "queue2", null);
+      addConsumer(3, 0, "queue3", null);
+      addConsumer(4, 0, "queue4", null);
+      
+      addConsumer(5, 1, "queue5", null);
+      addConsumer(6, 1, "queue6", null);
+      addConsumer(7, 1, "queue7", null);
+      addConsumer(8, 1, "queue8", null);
+      addConsumer(9, 1, "queue9", null);
+      
+      addConsumer(10, 0, "queue10", null);
+      
+      addConsumer(11, 1, "queue11", null);
+           
+      waitForBindings(0, "queues.testaddress", 6, 6, true);
+      waitForBindings(0, "queues.testaddress", 6, 6, false);
+
+      send(0, "queues.testaddress", 10, false, filter1);
+                  
+      verifyReceiveAll(10, 0, 2, 4, 6, 8, 10, 11);
+      
+      verifyNotReceive(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11);
+      
+      send(0, "queues.testaddress", 10, false, filter2);
+      
+      verifyReceiveAll(10, 1, 3, 5, 7, 9, 10, 11);
+      
+      verifyNotReceive(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11);
+   }
+   
+   public void testRoundRobinMultipleQueuesWithFilters() throws Exception
+   {
+      setupClusterConnection("cluster1", 0, 1, "queues", false, false);
+      startServers(1, 0);
+
+      setupSessionFactory(0, false);
+      setupSessionFactory(1, false);
+      
+      String filter1 = "giraffe";
+      String filter2 = "aardvark";
+
+      createQueue(0, "queues.testaddress", "queue0", filter1, false);
+      createQueue(1, "queues.testaddress", "queue0", filter1, false);
+      
+      createQueue(0, "queues.testaddress", "queue1", filter1, false);
+      createQueue(1, "queues.testaddress", "queue1", filter2, false);
+      
+      createQueue(0, "queues.testaddress", "queue2", filter2, false);
+      createQueue(1, "queues.testaddress", "queue2", filter1, false);
+      
+      createQueue(0, "queues.testaddress", "queue3", filter2, false);
+      createQueue(1, "queues.testaddress", "queue3", filter2, false);
+      
+      createQueue(0, "queues.testaddress", "queue4", null, false);
+      createQueue(1, "queues.testaddress", "queue4", null, false);
+
+      addConsumer(0, 0, "queue0", null);
+      addConsumer(1, 1, "queue0", null);
+      
+      addConsumer(2, 0, "queue1", null);
+      addConsumer(3, 1, "queue1", null);
+      
+      addConsumer(4, 0, "queue2", null);
+      addConsumer(5, 1, "queue2", null);
+      
+      addConsumer(6, 0, "queue3", null);
+      addConsumer(7, 1, "queue3", null);
+      
+      addConsumer(8, 0, "queue4", null);
+      addConsumer(9, 1, "queue4", null);
+
+      waitForBindings(0, "queues.testaddress", 5, 5, true);
+      waitForBindings(0, "queues.testaddress", 5, 5, false);
+
+      send(0, "queues.testaddress", 10, false, filter1);
+                  
+      verifyReceiveRoundRobin(10, 0, 1);
+      verifyReceiveRoundRobin(10, 8, 9);
+      
+      verifyReceiveAll(10, 2, 5);
+      verifyNotReceive(0, 1, 2, 3, 4, 5, 6, 7, 8, 9);
+            
+      send(0, "queues.testaddress", 10, false, filter2);
+      
+      verifyReceiveRoundRobin(10, 6, 7);
+      verifyReceiveRoundRobin(10, 8, 9);
+      
+      verifyReceiveAll(10, 3, 4);
+      verifyNotReceive(0, 1, 2, 3, 4, 5, 6, 7, 8, 9);
+      
+   }
+
+   public void testRouteWhenNoConsumersFalseNonBalancedQueues() throws Exception
+   {
+      setupClusterConnection("cluster2", 0, 1, "queues2", false, false);
+      
+      startServers(1, 0);
+
+      setupSessionFactory(0, false);
+      setupSessionFactory(1, false);
+      
+      createQueue(0, "queues2.testaddress", "queue0", null, false);
+      createQueue(0, "queues2.testaddress", "queue1", null, false);
+      createQueue(0, "queues2.testaddress", "queue2", null, false);
+      
+      createQueue(1, "queues2.testaddress", "queue3", null, false);
+      createQueue(1, "queues2.testaddress", "queue4", null, false);
+      createQueue(1, "queues2.testaddress", "queue5", null, false);
+      
+      waitForBindings(0, "queues2.testaddress", 3, 0, true);
+      waitForBindings(0, "queues2.testaddress", 3, 0, false);
+      
+      send(0, "queues2.testaddress", 10, false, null);
+      
+      addConsumer(0, 0, "queue0", null);
+      addConsumer(1, 0, "queue1", null);
+      addConsumer(2, 0, "queue2", null);
+      
+      addConsumer(3, 1, "queue3", null);
+      addConsumer(4, 1, "queue4", null);
+      addConsumer(5, 1, "queue5", null);
+                  
+      verifyReceiveAll(10, 0, 1, 2, 3, 4, 5);
+      
+      verifyNotReceive(0, 1, 2, 3, 4, 5);      
+   }
+   
+   public void testRouteWhenNoConsumersTrueNonBalancedQueues() throws Exception
+   {
+      setupClusterConnection("cluster2", 0, 1, "queues2", true, false);
+      
+      startServers(1, 0);
+
+      setupSessionFactory(0, false);
+      setupSessionFactory(1, false);
+      
+      createQueue(0, "queues2.testaddress", "queue0", null, false);
+      createQueue(0, "queues2.testaddress", "queue1", null, false);
+      createQueue(0, "queues2.testaddress", "queue2", null, false);
+      
+      createQueue(1, "queues2.testaddress", "queue3", null, false);
+      createQueue(1, "queues2.testaddress", "queue4", null, false);
+      createQueue(1, "queues2.testaddress", "queue5", null, false);
+      
+      waitForBindings(0, "queues2.testaddress", 3, 0, true);
+      waitForBindings(0, "queues2.testaddress", 3, 0, false);
+      
+      send(0, "queues2.testaddress", 10, false, null);
+      
+      addConsumer(0, 0, "queue0", null);
+      addConsumer(1, 0, "queue1", null);
+      addConsumer(2, 0, "queue2", null);
+      
+      addConsumer(3, 1, "queue3", null);
+      addConsumer(4, 1, "queue4", null);
+      addConsumer(5, 1, "queue5", null);
+                  
+      verifyReceiveAll(10, 0, 1, 2, 3, 4, 5);
+      
+      verifyNotReceive(0, 1, 2, 3, 4, 5);      
+   }
+   
+   public void testRouteWhenNoConsumersFalseLoadBalancedQueues() throws Exception
+   {
+      setupClusterConnection("cluster2", 0, 1, "queues2", false, false);
+      
+      startServers(1, 0);
+
+      setupSessionFactory(0, false);
+      setupSessionFactory(1, false);
+      
+      createQueue(0, "queues2.testaddress", "queue0", null, false);
+      createQueue(0, "queues2.testaddress", "queue1", null, false);
+      createQueue(0, "queues2.testaddress", "queue2", null, false);
+      
+      createQueue(1, "queues2.testaddress", "queue0", null, false);
+      createQueue(1, "queues2.testaddress", "queue1", null, false);
+      createQueue(1, "queues2.testaddress", "queue2", null, false);
+      
+      waitForBindings(0, "queues2.testaddress", 3, 0, true);
+      waitForBindings(0, "queues2.testaddress", 3, 0, false);
+      
+      send(0, "queues2.testaddress", 10, false, null);
+      
+      addConsumer(0, 0, "queue0", null);
+      addConsumer(1, 0, "queue1", null);
+      addConsumer(2, 0, "queue2", null);
+      
+      addConsumer(3, 1, "queue0", null);
+      addConsumer(4, 1, "queue1", null);
+      addConsumer(5, 1, "queue2", null);
+      
+      //If route-when-no-consumers is false but the local queue has no consumer, messages should still be round-robined
+      //across the nodes; it's only when the local queue does have a consumer that they should not be round-robined
+      
+      verifyReceiveRoundRobin(10, 0, 3);
+      verifyReceiveRoundRobin(10, 1, 4);
+      verifyReceiveRoundRobin(10, 2, 5);
+      
+      verifyNotReceive(0, 1, 2, 3, 4, 5);  
+   }
+   
+   public void testRouteWhenNoConsumersFalseLoadBalancedQueuesLocalConsumer() throws Exception
+   {
+      setupClusterConnection("cluster2", 0, 1, "queues2", false, false);
+      
+      startServers(1, 0);
+
+      setupSessionFactory(0, false);
+      setupSessionFactory(1, false);
+      
+      createQueue(0, "queues2.testaddress", "queue0", null, false);
+      createQueue(0, "queues2.testaddress", "queue1", null, false);
+      createQueue(0, "queues2.testaddress", "queue2", null, false);
+      
+      createQueue(1, "queues2.testaddress", "queue0", null, false);
+      createQueue(1, "queues2.testaddress", "queue1", null, false);
+      createQueue(1, "queues2.testaddress", "queue2", null, false);
+      
+      addConsumer(0, 0, "queue0", null);
+      addConsumer(1, 0, "queue1", null);
+      addConsumer(2, 0, "queue2", null);
+      
+      waitForBindings(0, "queues2.testaddress", 3, 3, true);
+      waitForBindings(0, "queues2.testaddress", 3, 0, false);
+      
+      send(0, "queues2.testaddress", 10, false, null);
+                        
+      addConsumer(3, 1, "queue0", null);
+      addConsumer(4, 1, "queue1", null);
+      addConsumer(5, 1, "queue2", null);
+      
+      //In this case, since the local queue has a consumer, it should receive all the messages
+      
+      verifyReceiveAll(10, 0, 1, 2);
+      
+      verifyNotReceive(0, 1, 2, 3, 4, 5);  
+   }
+   
+   public void testRouteWhenNoConsumersFalseLoadBalancedQueuesNoLocalQueue() throws Exception
+   {
+      setupClusterConnection("cluster2", 0, 1, "queues2", false, false);
+      
+      startServers(1, 0);
+
+      setupSessionFactory(0, false);
+      setupSessionFactory(1, false);
+      
+      createQueue(0, "queues2.testaddress", "queue0", null, false);
+      createQueue(0, "queues2.testaddress", "queue1", null, false);
+      
+      createQueue(1, "queues2.testaddress", "queue0", null, false);
+      createQueue(1, "queues2.testaddress", "queue1", null, false);
+      
+      waitForBindings(0, "queues2.testaddress", 2, 0, true);
+      waitForBindings(0, "queues2.testaddress", 2, 0, false);
+      
+      send(0, "queues2.testaddress", 10, false, null);
+                        
+      addConsumer(0, 0, "queue0", null);
+      addConsumer(1, 0, "queue1", null);
+           
+      addConsumer(2, 1, "queue0", null);
+      addConsumer(3, 1, "queue1", null);
+      
+      verifyReceiveRoundRobin(10, 0, 2);
+      verifyReceiveRoundRobin(10, 1, 3);
+      
+      verifyNotReceive(0, 1, 2, 3);  
+   }
+   
+   public void testRouteWhenNoConsumersTrueLoadBalancedQueues() throws Exception
+   {
+      setupClusterConnection("cluster2", 0, 1, "queues2", true, false);
+      
+      startServers(1, 0);
+
+      setupSessionFactory(0, false);
+      setupSessionFactory(1, false);
+      
+      createQueue(0, "queues2.testaddress", "queue0", null, false);
+      createQueue(0, "queues2.testaddress", "queue1", null, false);
+      createQueue(0, "queues2.testaddress", "queue2", null, false);
+      
+      createQueue(1, "queues2.testaddress", "queue0", null, false);
+      createQueue(1, "queues2.testaddress", "queue1", null, false);
+      createQueue(1, "queues2.testaddress", "queue2", null, false);
+      
+      waitForBindings(0, "queues2.testaddress", 3, 0, true);
+      waitForBindings(0, "queues2.testaddress", 3, 0, false);
+      
+      send(0, "queues2.testaddress", 10, false, null);
+      
+      addConsumer(0, 0, "queue0", null);
+      addConsumer(1, 0, "queue1", null);
+      addConsumer(2, 0, "queue2", null);
+      
+      addConsumer(3, 1, "queue0", null);
+      addConsumer(4, 1, "queue1", null);
+      addConsumer(5, 1, "queue2", null);
+      
+      verifyReceiveRoundRobin(10, 0, 3);
+      verifyReceiveRoundRobin(10, 1, 4);
+      verifyReceiveRoundRobin(10, 2, 5);
+      
+      verifyNotReceive(0, 1, 2, 3, 4, 5);  
+   }
+   
+   public void testRouteWhenNoConsumersTrueLoadBalancedQueuesLocalConsumer() throws Exception
+   {
+      setupClusterConnection("cluster2", 0, 1, "queues2", true, false);
+      
+      startServers(1, 0);
+
+      setupSessionFactory(0, false);
+      setupSessionFactory(1, false);
+      
+      createQueue(0, "queues2.testaddress", "queue0", null, false);
+      createQueue(0, "queues2.testaddress", "queue1", null, false);
+      createQueue(0, "queues2.testaddress", "queue2", null, false);
+      
+      createQueue(1, "queues2.testaddress", "queue0", null, false);
+      createQueue(1, "queues2.testaddress", "queue1", null, false);
+      createQueue(1, "queues2.testaddress", "queue2", null, false);
+      
+      addConsumer(0, 0, "queue0", null);
+      addConsumer(1, 0, "queue1", null);
+      addConsumer(2, 0, "queue2", null);
+      
+      waitForBindings(0, "queues2.testaddress", 3, 3, true);
+      waitForBindings(0, "queues2.testaddress", 3, 0, false);
+      
+      send(0, "queues2.testaddress", 10, false, null);
+                        
+      addConsumer(3, 1, "queue0", null);
+      addConsumer(4, 1, "queue1", null);
+      addConsumer(5, 1, "queue2", null);
+      
+      verifyReceiveRoundRobin(10, 0, 3);
+      verifyReceiveRoundRobin(10, 1, 4);
+      verifyReceiveRoundRobin(10, 2, 5);
+      
+      verifyNotReceive(0, 1, 2, 3, 4, 5);  
+   }
+   
+   public void testRouteWhenNoConsumersTrueLoadBalancedQueuesNoLocalQueue() throws Exception
+   {
+      setupClusterConnection("cluster2", 0, 1, "queues2", true, false);
+      
+      startServers(1, 0);
+
+      setupSessionFactory(0, false);
+      setupSessionFactory(1, false);
+      
+      createQueue(0, "queues2.testaddress", "queue0", null, false);
+      createQueue(0, "queues2.testaddress", "queue1", null, false);
+      
+      createQueue(1, "queues2.testaddress", "queue0", null, false);
+      createQueue(1, "queues2.testaddress", "queue1", null, false);
+      
+      waitForBindings(0, "queues2.testaddress", 2, 0, true);
+      waitForBindings(0, "queues2.testaddress", 2, 0, false);
+      
+      send(0, "queues2.testaddress", 10, false, null);
+                        
+      addConsumer(0, 0, "queue0", null);
+      addConsumer(1, 0, "queue1", null);
+           
+      addConsumer(2, 1, "queue0", null);
+      addConsumer(3, 1, "queue1", null);
+      
+      verifyReceiveRoundRobin(10, 0, 2);
+      verifyReceiveRoundRobin(10, 1, 3);
+      
+      verifyNotReceive(0, 1, 2, 3);  
+   }
+   
+   public void testNonLoadBalancedQueuesWithConsumersWithFilters() throws Exception
+   {
+      setupClusterConnection("cluster1", 0, 1, "queues", false, false);
+      startServers(1, 0);
+
+      setupSessionFactory(0, false);
+      setupSessionFactory(1, false);
+      
+      String filter1 = "giraffe";
+      String filter2 = "aardvark";
+
+      createQueue(0, "queues.testaddress", "queue0", null, false);
+      createQueue(0, "queues.testaddress", "queue1", null, false);
+      createQueue(0, "queues.testaddress", "queue2", null, false);
+      createQueue(0, "queues.testaddress", "queue3", null, false);
+      createQueue(0, "queues.testaddress", "queue4", null, false);
+    
+      
+      createQueue(1, "queues.testaddress", "queue5", null, false);
+      createQueue(1, "queues.testaddress", "queue6", null, false);
+      createQueue(1, "queues.testaddress", "queue7", null, false);
+      createQueue(1, "queues.testaddress", "queue8", null, false);
+      createQueue(1, "queues.testaddress", "queue9", null, false);
+      
+      createQueue(0, "queues.testaddress", "queue10", null, false);
+      createQueue(1, "queues.testaddress", "queue11", null, false);
+
+      
+      addConsumer(0, 0, "queue0", filter1);
+      addConsumer(1, 0, "queue1", filter2);
+      addConsumer(2, 0, "queue2", filter1);
+      addConsumer(3, 0, "queue3", filter2);
+      addConsumer(4, 0, "queue4", filter1);
+      
+      addConsumer(5, 1, "queue5", filter2);
+      addConsumer(6, 1, "queue6", filter1);
+      addConsumer(7, 1, "queue7", filter2);
+      addConsumer(8, 1, "queue8", filter1);
+      addConsumer(9, 1, "queue9", filter2);
+      
+      addConsumer(10, 0, "queue10", null);
+      
+      addConsumer(11, 1, "queue11", null);
+           
+      waitForBindings(0, "queues.testaddress", 6, 6, true);
+      waitForBindings(0, "queues.testaddress", 6, 6, false);
+
+      send(0, "queues.testaddress", 10, false, filter1);
+                  
+      verifyReceiveAll(10, 0, 2, 4, 6, 8, 10, 11);
+      
+      verifyNotReceive(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11);
+      
+      send(0, "queues.testaddress", 10, false, filter2);
+      
+      verifyReceiveAll(10, 1, 3, 5, 7, 9, 10, 11);
+      
+      verifyNotReceive(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11);
+   }
+   
+   public void testRoundRobinMultipleQueuesWithConsumersWithFilters() throws Exception
+   {
+      setupClusterConnection("cluster1", 0, 1, "queues", false, false);
+      startServers(1, 0);
+
+      setupSessionFactory(0, false);
+      setupSessionFactory(1, false);
+      
+      String filter1 = "giraffe";
+      String filter2 = "aardvark";
+
+      createQueue(0, "queues.testaddress", "queue0", null, false);
+      createQueue(1, "queues.testaddress", "queue0", null, false);
+      
+      createQueue(0, "queues.testaddress", "queue1", null, false);
+      createQueue(1, "queues.testaddress", "queue1", null, false);
+      
+      createQueue(0, "queues.testaddress", "queue2", null, false);
+      createQueue(1, "queues.testaddress", "queue2", null, false);
+      
+      createQueue(0, "queues.testaddress", "queue3", null, false);
+      createQueue(1, "queues.testaddress", "queue3", null, false);
+      
+      createQueue(0, "queues.testaddress", "queue4", null, false);
+      createQueue(1, "queues.testaddress", "queue4", null, false);
+
+      addConsumer(0, 0, "queue0", filter1);
+      addConsumer(1, 1, "queue0", filter1);
+      
+      addConsumer(2, 0, "queue1", filter1);
+      addConsumer(3, 1, "queue1", filter2);
+      
+      addConsumer(4, 0, "queue2", filter2);
+      addConsumer(5, 1, "queue2", filter1);
+      
+      addConsumer(6, 0, "queue3", filter2);
+      addConsumer(7, 1, "queue3", filter2);
+      
+      addConsumer(8, 0, "queue4", null);
+      addConsumer(9, 1, "queue4", null);
+
+      waitForBindings(0, "queues.testaddress", 5, 5, true);
+      waitForBindings(0, "queues.testaddress", 5, 5, false);
+
+      send(0, "queues.testaddress", 10, false, filter1);
+                  
+      verifyReceiveRoundRobin(10, 0, 1);
+      verifyReceiveRoundRobin(10, 8, 9);
+      
+      verifyReceiveAll(10, 2, 5);
+      verifyNotReceive(0, 1, 2, 3, 4, 5, 6, 7, 8, 9);
+            
+      send(0, "queues.testaddress", 10, false, filter2);
+      
+      verifyReceiveRoundRobin(10, 6, 7);
+      verifyReceiveRoundRobin(10, 8, 9);
+      
+      verifyReceiveAll(10, 3, 4);
+      verifyNotReceive(0, 1, 2, 3, 4, 5, 6, 7, 8, 9);
+      
+   }
+   
+   public void testMultipleClusterConnections() throws Exception
+   {
+      setupClusterConnection("cluster1", 0, 1, "queues1", false, false);
+      setupClusterConnection("cluster2", 0, 1, "queues2", false, false);
+      setupClusterConnection("cluster3", 0, 1, "queues3", false, false);
+      
+      startServers(1, 0);
+
+      setupSessionFactory(0, false);
+      setupSessionFactory(1, false);
+
+      //Make sure the different connections don't conflict
+      
+      createQueue(0, "queues1.testaddress", "queue0", null, false);
+      createQueue(0, "queues1.testaddress", "queue1", null, false);
+      createQueue(0, "queues2.testaddress", "queue2", null, false);
+      createQueue(0, "queues2.testaddress", "queue3", null, false);
+      createQueue(0, "queues3.testaddress", "queue4", null, false);
+      createQueue(0, "queues3.testaddress", "queue5", null, false);
+    
+      
+      createQueue(1, "queues1.testaddress", "queue6", null, false);
+      createQueue(1, "queues1.testaddress", "queue7", null, false);
+      createQueue(1, "queues2.testaddress", "queue8", null, false);
+      createQueue(1, "queues2.testaddress", "queue9", null, false);
+      createQueue(1, "queues3.testaddress", "queue10", null, false);
+      createQueue(1, "queues3.testaddress", "queue11", null, false);
+
+      
+      addConsumer(0, 0, "queue0", null);
+      addConsumer(1, 0, "queue1", null);
+      addConsumer(2, 0, "queue2", null);
+      addConsumer(3, 0, "queue3", null);
+      addConsumer(4, 0, "queue4", null);
+      addConsumer(5, 0, "queue5", null);
+      
+     
+      addConsumer(6, 1, "queue6", null);
+      addConsumer(7, 1, "queue7", null);
+      addConsumer(8, 1, "queue8", null);
+      addConsumer(9, 1, "queue9", null);
+      addConsumer(10, 1, "queue10", null);
+      addConsumer(11, 1, "queue11", null);
+           
+      waitForBindings(0, "queues1.testaddress", 2, 2, true);
+      waitForBindings(0, "queues1.testaddress", 2, 2, false);
+      
+      waitForBindings(0, "queues2.testaddress", 2, 2, true);
+      waitForBindings(0, "queues2.testaddress", 2, 2, false);
+      
+      waitForBindings(0, "queues3.testaddress", 2, 2, true);
+      waitForBindings(0, "queues3.testaddress", 2, 2, false);
+
+      send(0, "queues1.testaddress", 10, false, null);
+                  
+      verifyReceiveAll(10, 0, 1, 6, 7);
+      
+      verifyNotReceive(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11);
+      
+      send(0, "queues2.testaddress", 10, false, null);
+      
+      verifyReceiveAll(10, 2, 3, 8, 9);
+      
+      verifyNotReceive(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11);
+      
+      send(0, "queues3.testaddress", 10, false, null);
+      
+      verifyReceiveAll(10, 4, 5, 10, 11);
+      
+      verifyNotReceive(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11);
+   }
+
+//   
+//   public void testDurableAndRestart()
+//   {      
+//   }
+//   
+//   public void testWithNetty()
+//   {      
+//   }
+   
 }

Modified: trunk/tests/src/org/jboss/messaging/tests/performance/persistence/FakeBinding.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/performance/persistence/FakeBinding.java	2009-02-02 18:57:51 UTC (rev 5779)
+++ trunk/tests/src/org/jboss/messaging/tests/performance/persistence/FakeBinding.java	2009-02-02 20:12:41 UTC (rev 5780)
@@ -22,6 +22,7 @@
 
 package org.jboss.messaging.tests.performance.persistence;
 
+import org.jboss.messaging.core.filter.Filter;
 import org.jboss.messaging.core.postoffice.Binding;
 import org.jboss.messaging.core.server.Bindable;
 import org.jboss.messaging.core.server.ServerMessage;
@@ -35,6 +36,12 @@
 public class FakeBinding implements Binding
 {
    
+   public Filter getFilter()
+   {
+      // TODO Auto-generated method stub
+      return null;
+   }
+
    public int getID()
    {
       // TODO Auto-generated method stub




More information about the jboss-cvs-commits mailing list