[jboss-cvs] JBoss Messaging SVN: r5768 - in trunk: src/main/org/jboss/messaging/core/management/impl and 8 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Fri Jan 30 14:46:02 EST 2009


Author: timfox
Date: 2009-01-30 14:46:02 -0500 (Fri, 30 Jan 2009)
New Revision: 5768

Added:
   trunk/src/main/org/jboss/messaging/core/postoffice/QueueBinding.java
   trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/ClusterTestBase.java
   trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/OnewayTwoNodeClusterTest.java
Modified:
   trunk/src/main/org/jboss/messaging/core/management/MessagingServerControlMBean.java
   trunk/src/main/org/jboss/messaging/core/management/impl/MessagingServerControl.java
   trunk/src/main/org/jboss/messaging/core/management/jmx/impl/ReplicationAwareMessagingServerControlWrapper.java
   trunk/src/main/org/jboss/messaging/core/postoffice/PostOffice.java
   trunk/src/main/org/jboss/messaging/core/postoffice/impl/BindingsImpl.java
   trunk/src/main/org/jboss/messaging/core/postoffice/impl/LocalQueueBinding.java
   trunk/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java
   trunk/src/main/org/jboss/messaging/core/server/cluster/RemoteQueueBinding.java
   trunk/src/main/org/jboss/messaging/core/server/cluster/impl/BridgeImpl.java
   trunk/src/main/org/jboss/messaging/core/server/cluster/impl/ClusterConnectionImpl.java
   trunk/src/main/org/jboss/messaging/core/server/cluster/impl/RemoteQueueBindingImpl.java
   trunk/src/schemas/jbm-queues.xsd
   trunk/tests/src/org/jboss/messaging/tests/performance/persistence/FakePostOffice.java
Log:
more clustering

Modified: trunk/src/main/org/jboss/messaging/core/management/MessagingServerControlMBean.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/management/MessagingServerControlMBean.java	2009-01-30 17:20:35 UTC (rev 5767)
+++ trunk/src/main/org/jboss/messaging/core/management/MessagingServerControlMBean.java	2009-01-30 19:46:02 UTC (rev 5768)
@@ -166,6 +166,6 @@
 
    TabularData getConnectors() throws Exception;
    
-   void sendQueueInfoToQueue(String queueName) throws Exception;
+   void sendQueueInfoToQueue(String queueName, String address) throws Exception;
 
 }

Modified: trunk/src/main/org/jboss/messaging/core/management/impl/MessagingServerControl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/management/impl/MessagingServerControl.java	2009-01-30 17:20:35 UTC (rev 5767)
+++ trunk/src/main/org/jboss/messaging/core/management/impl/MessagingServerControl.java	2009-01-30 19:46:02 UTC (rev 5768)
@@ -544,9 +544,9 @@
       return TransportConfigurationInfo.toTabularData(connectorConfigurations);
    }
    
-   public void sendQueueInfoToQueue(final String queueName) throws Exception
+   public void sendQueueInfoToQueue(final String queueName, final String address) throws Exception
    {
-      postOffice.sendQueueInfoToQueue(new SimpleString(queueName));
+      postOffice.sendQueueInfoToQueue(new SimpleString(queueName), new SimpleString(address));
    }
 
    // NotificationEmitter implementation ----------------------------

Modified: trunk/src/main/org/jboss/messaging/core/management/jmx/impl/ReplicationAwareMessagingServerControlWrapper.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/management/jmx/impl/ReplicationAwareMessagingServerControlWrapper.java	2009-01-30 17:20:35 UTC (rev 5767)
+++ trunk/src/main/org/jboss/messaging/core/management/jmx/impl/ReplicationAwareMessagingServerControlWrapper.java	2009-01-30 19:46:02 UTC (rev 5768)
@@ -239,9 +239,9 @@
       return localControl.getConnectors();
    }
 
-   public void sendQueueInfoToQueue(final String queueName) throws Exception
+   public void sendQueueInfoToQueue(final String queueName, final String address) throws Exception
    {
-      localControl.sendQueueInfoToQueue(queueName);
+      localControl.sendQueueInfoToQueue(queueName, address);
    }
 
    public boolean addAddress(String address) throws Exception

Modified: trunk/src/main/org/jboss/messaging/core/postoffice/PostOffice.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/postoffice/PostOffice.java	2009-01-30 17:20:35 UTC (rev 5767)
+++ trunk/src/main/org/jboss/messaging/core/postoffice/PostOffice.java	2009-01-30 19:46:02 UTC (rev 5768)
@@ -81,5 +81,5 @@
 
    DuplicateIDCache getDuplicateIDCache(SimpleString address);
    
-   void sendQueueInfoToQueue(SimpleString queueName) throws Exception;
+   void sendQueueInfoToQueue(SimpleString queueName, SimpleString address) throws Exception;
 }

Added: trunk/src/main/org/jboss/messaging/core/postoffice/QueueBinding.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/postoffice/QueueBinding.java	                        (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/postoffice/QueueBinding.java	2009-01-30 19:46:02 UTC (rev 5768)
@@ -0,0 +1,38 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2009, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+
+package org.jboss.messaging.core.postoffice;
+
+/**
+ * A QueueBinding
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * 
+ * Created 30 Jan 2009 11:04:37
+ *
+ *
+ */
+public interface QueueBinding extends Binding
+{
+   int consumerCount();
+}

Modified: trunk/src/main/org/jboss/messaging/core/postoffice/impl/BindingsImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/postoffice/impl/BindingsImpl.java	2009-01-30 17:20:35 UTC (rev 5767)
+++ trunk/src/main/org/jboss/messaging/core/postoffice/impl/BindingsImpl.java	2009-01-30 19:46:02 UTC (rev 5768)
@@ -63,7 +63,7 @@
    private final List<Binding> exclusiveBindings = new CopyOnWriteArrayList<Binding>();
 
    public Collection<Binding> getBindings()
-   {
+   {      
       return bindingsMap.values();
    }
 
@@ -93,8 +93,8 @@
 
          bindings.add(binding);
       }
-
-      bindingsMap.put(binding.getID(), binding);
+      
+      bindingsMap.put(binding.getID(), binding);           
    }
 
    public void removeBinding(final Binding binding)
@@ -120,7 +120,7 @@
          }
       }
 
-      bindingsMap.remove(binding.getID());
+      bindingsMap.remove(binding.getID());          
    }
 
    private void routeFromCluster(final ServerMessage message, final Transaction tx) throws Exception

Modified: trunk/src/main/org/jboss/messaging/core/postoffice/impl/LocalQueueBinding.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/postoffice/impl/LocalQueueBinding.java	2009-01-30 17:20:35 UTC (rev 5767)
+++ trunk/src/main/org/jboss/messaging/core/postoffice/impl/LocalQueueBinding.java	2009-01-30 19:46:02 UTC (rev 5768)
@@ -26,7 +26,8 @@
 import java.util.List;
 
 import org.jboss.messaging.core.filter.Filter;
-import org.jboss.messaging.core.postoffice.Binding;
+import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.core.postoffice.QueueBinding;
 import org.jboss.messaging.core.server.Bindable;
 import org.jboss.messaging.core.server.Consumer;
 import org.jboss.messaging.core.server.Queue;
@@ -42,8 +43,10 @@
  *
  *
  */
-public class LocalQueueBinding implements Binding
+public class LocalQueueBinding implements QueueBinding
 {
+   private static final Logger log = Logger.getLogger(LocalQueueBinding.class);
+
    private final SimpleString address;
    
    private final Queue queue;
@@ -142,12 +145,15 @@
    public void willRoute(final ServerMessage message)
    {      
    }
-   
-   
-
+     
    public boolean isQueueBinding()
    {
       return true;
    }
+   
+   public int consumerCount()
+   {
+      return queue.getConsumerCount();
+   }
 
 }

Modified: trunk/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java	2009-01-30 17:20:35 UTC (rev 5767)
+++ trunk/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java	2009-01-30 19:46:02 UTC (rev 5768)
@@ -228,7 +228,7 @@
    
    
    public void onNotification(final Notification notification)
-   {
+   {      
       synchronized (notificationLock)
       {
          NotificationType type = notification.getType();
@@ -238,7 +238,7 @@
             TypedProperties props = notification.getProperties();
             
             SimpleString queueName = (SimpleString)props.getProperty(ManagementHelper.HDR_QUEUE_NAME);
-            
+               
             SimpleString address = (SimpleString)props.getProperty(ManagementHelper.HDR_ADDRESS);
             
             Integer transientID = (Integer)props.getProperty(ManagementHelper.HDR_BINDING_ID);
@@ -359,7 +359,7 @@
    // and post office is activated but queue remains unactivated after failover so delivery never occurs
    // even though failover is complete
    public synchronized void addBinding(final Binding binding) throws Exception
-   {
+   {      
       binding.setID(generateTransientID());
       
       addBindingInMemory(binding);
@@ -572,7 +572,7 @@
    
    
 
-   public void sendQueueInfoToQueue(final SimpleString queueName) throws Exception
+   public void sendQueueInfoToQueue(final SimpleString queueName, final SimpleString address) throws Exception
    {
       //We send direct to the queue so we can send it to the same queue that is bound to the notifications address - this is crucial for ensuring
       //that queue infos and notifications are received in a contiguous consistent stream
@@ -600,43 +600,46 @@
                   
          for (QueueInfo info: queueInfos.values())
          {            
-            message = createQueueInfoMessage(NotificationType.BINDING_ADDED, queueName);
-            
-            message.putStringProperty(ManagementHelper.HDR_ADDRESS, info.getAddress());
-            message.putStringProperty(ManagementHelper.HDR_QUEUE_NAME, info.getQueueName());
-            message.putIntProperty(ManagementHelper.HDR_BINDING_ID, info.getID());
-            
-            queue.preroute(message, null);            
-            queue.route(message, null);
-            
-            int consumersWithFilters = info.getFilterStrings() != null ? info.getFilterStrings().size() : 0;
-            
-            for (int i = 0; i < info.getNumberOfConsumers() - consumersWithFilters; i++)
+            if (info.getAddress().startsWith(address))
             {
-               message = createQueueInfoMessage(NotificationType.CONSUMER_CREATED, queueName);
+               message = createQueueInfoMessage(NotificationType.BINDING_ADDED, queueName);
                
-               message.putStringProperty(ManagementHelper.HDR_QUEUE_NAME, info.getQueueName()); 
+               message.putStringProperty(ManagementHelper.HDR_ADDRESS, info.getAddress());
+               message.putStringProperty(ManagementHelper.HDR_QUEUE_NAME, info.getQueueName());
+               message.putIntProperty(ManagementHelper.HDR_BINDING_ID, info.getID());
                
                queue.preroute(message, null);            
                queue.route(message, null);
-            }
-            
-            if (info.getFilterStrings() != null)
-            {
-               for (SimpleString filterString: info.getFilterStrings())
+               
+               int consumersWithFilters = info.getFilterStrings() != null ? info.getFilterStrings().size() : 0;
+               
+               for (int i = 0; i < info.getNumberOfConsumers() - consumersWithFilters; i++)
                {
                   message = createQueueInfoMessage(NotificationType.CONSUMER_CREATED, queueName);
                   
-                  message.putStringProperty(ManagementHelper.HDR_QUEUE_NAME, info.getQueueName());
-                  message.putStringProperty(ManagementHelper.HDR_FILTERSTRING, filterString); 
+                  message.putStringProperty(ManagementHelper.HDR_QUEUE_NAME, info.getQueueName()); 
                   
                   queue.preroute(message, null);            
                   queue.route(message, null);
                }
-            }           
+               
+               if (info.getFilterStrings() != null)
+               {
+                  for (SimpleString filterString: info.getFilterStrings())
+                  {
+                     message = createQueueInfoMessage(NotificationType.CONSUMER_CREATED, queueName);
+                     
+                     message.putStringProperty(ManagementHelper.HDR_QUEUE_NAME, info.getQueueName());
+                     message.putStringProperty(ManagementHelper.HDR_FILTERSTRING, filterString); 
+                     
+
+                     queue.preroute(message, null);            
+                     queue.route(message, null);
+                  }
+               }     
+            }
          }
       }
-      
    }
    
    // Private -----------------------------------------------------------------

Modified: trunk/src/main/org/jboss/messaging/core/server/cluster/RemoteQueueBinding.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/cluster/RemoteQueueBinding.java	2009-01-30 17:20:35 UTC (rev 5767)
+++ trunk/src/main/org/jboss/messaging/core/server/cluster/RemoteQueueBinding.java	2009-01-30 19:46:02 UTC (rev 5768)
@@ -20,10 +20,9 @@
  * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
  */
 
-
 package org.jboss.messaging.core.server.cluster;
 
-import org.jboss.messaging.core.postoffice.Binding;
+import org.jboss.messaging.core.postoffice.QueueBinding;
 import org.jboss.messaging.util.SimpleString;
 
 /**
@@ -35,10 +34,9 @@
  *
  *
  */
-public interface RemoteQueueBinding extends Binding
+public interface RemoteQueueBinding extends QueueBinding
 {
    void addConsumer(SimpleString filterString) throws Exception;
-   
-   void removeConsumer(SimpleString filterString) throws Exception;
 
+   void removeConsumer(SimpleString filterString) throws Exception;
 }

Modified: trunk/src/main/org/jboss/messaging/core/server/cluster/impl/BridgeImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/cluster/impl/BridgeImpl.java	2009-01-30 17:20:35 UTC (rev 5767)
+++ trunk/src/main/org/jboss/messaging/core/server/cluster/impl/BridgeImpl.java	2009-01-30 19:46:02 UTC (rev 5768)
@@ -293,10 +293,11 @@
                                                       "'" +
                                                       NotificationType.CONSUMER_CLOSED +
                                                       "') AND " +
+                                                      "("+ ManagementHelper.HDR_ADDRESS + " IS NULL OR " +                                                      
                                                       ManagementHelper.HDR_ADDRESS +
                                                       " LIKE '" +
                                                       queueDataAddress +
-                                                      "%'");
+                                                      "%')");
 
                session.createQueue(DEFAULT_MANAGEMENT_NOTIFICATION_ADDRESS, notifQueueName, filter, false, true);
 
@@ -311,7 +312,8 @@
                ManagementHelper.putOperationInvocation(message,
                                                        ManagementServiceImpl.getMessagingServerObjectName(),
                                                        "sendQueueInfoToQueue",
-                                                       notifQueueName.toString());
+                                                       notifQueueName.toString(),
+                                                       queueDataAddress);
 
                ClientProducer prod = session.createProducer(ConfigurationImpl.DEFAULT_MANAGEMENT_ADDRESS);
 

Modified: trunk/src/main/org/jboss/messaging/core/server/cluster/impl/ClusterConnectionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/cluster/impl/ClusterConnectionImpl.java	2009-01-30 17:20:35 UTC (rev 5767)
+++ trunk/src/main/org/jboss/messaging/core/server/cluster/impl/ClusterConnectionImpl.java	2009-01-30 19:46:02 UTC (rev 5768)
@@ -193,8 +193,6 @@
 
    public synchronized void stop() throws Exception
    {
-      log.info("Stoping cluster connection");
-
       if (!started)
       {
          return;
@@ -205,11 +203,8 @@
          discoveryGroup.unregisterListener(this);
       }
 
-      log.info("Three are " + records.size() + " records");
-
       for (MessageFlowRecord record : records.values())
       {
-         log.info("stopping record");
          record.close();
       }
 
@@ -317,7 +312,6 @@
 
             record.setBridge(bridge);
 
-            log.info("added record");
             records.put(connectorPair, record);
 
             bridge.start();
@@ -344,8 +338,6 @@
    {
       StringBuilder str = new StringBuilder(replaceWildcardChars(config.getFactoryClassName()));
 
-      log.info("config is " + config);
-
       if (config.getParams() != null)
       {
          if (!config.getParams().isEmpty())
@@ -393,9 +385,7 @@
 
       public void close() throws Exception
       {
-         log.info("stopping bridge");
          bridge.stop();
-         log.info("stopped bridge");
 
          for (RemoteQueueBinding binding : bindings.values())
          {
@@ -424,8 +414,6 @@
 
                firstReset = true;
 
-               log.info("did reset");
-
                return;
             }
 
@@ -436,18 +424,16 @@
 
             NotificationType type = NotificationType.valueOf(message.getProperty(ManagementHelper.HDR_NOTIFICATION_TYPE)
                                                                     .toString());
-
-            log.info("Got notification message " + type);
-
+ 
+       
             if (type == NotificationType.BINDING_ADDED)
-            {
-               log.info("queue created");
+            {               
                SimpleString uniqueName = UUIDGenerator.getInstance().generateSimpleStringUUID();
 
                SimpleString queueAddress = (SimpleString)message.getProperty(ManagementHelper.HDR_ADDRESS);
 
                SimpleString queueName = (SimpleString)message.getProperty(ManagementHelper.HDR_QUEUE_NAME);
-
+               
                SimpleString filterString = (SimpleString)message.getProperty(ManagementHelper.HDR_FILTERSTRING);
                
                Integer queueID = (Integer)message.getProperty(ManagementHelper.HDR_BINDING_ID);
@@ -468,7 +454,6 @@
             }
             else if (type == NotificationType.BINDING_REMOVED)
             {
-               log.info("queue destroyed");
                SimpleString queueName = (SimpleString)message.getProperty(ManagementHelper.HDR_QUEUE_NAME);
 
                RemoteQueueBinding binding = bindings.remove(queueName);
@@ -477,7 +462,6 @@
             }
             else if (type == NotificationType.CONSUMER_CREATED)
             {
-               log.info("consumer created");
                SimpleString queueName = (SimpleString)message.getProperty(ManagementHelper.HDR_QUEUE_NAME);
 
                SimpleString filterString = (SimpleString)message.getProperty(ManagementHelper.HDR_FILTERSTRING);
@@ -488,7 +472,6 @@
             }
             else if (type == NotificationType.CONSUMER_CLOSED)
             {
-               log.info("consumer closed");
                SimpleString queueName = (SimpleString)message.getProperty(ManagementHelper.HDR_QUEUE_NAME);
 
                SimpleString filterString = (SimpleString)message.getProperty(ManagementHelper.HDR_FILTERSTRING);

Modified: trunk/src/main/org/jboss/messaging/core/server/cluster/impl/RemoteQueueBindingImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/cluster/impl/RemoteQueueBindingImpl.java	2009-01-30 17:20:35 UTC (rev 5767)
+++ trunk/src/main/org/jboss/messaging/core/server/cluster/impl/RemoteQueueBindingImpl.java	2009-01-30 19:46:02 UTC (rev 5768)
@@ -198,6 +198,8 @@
    
    public void willRoute(final ServerMessage message)
    {      
+      log.info("routing to remote queue binding");
+      
       //We add a header with the name of the queue, holding a list of the transient ids of the queues to route to
       
       //TODO - this can be optimised
@@ -282,5 +284,10 @@
 
       consumerCount--;
    }
+   
+   public synchronized int consumerCount()
+   {
+      return consumerCount;
+   }
 
 }

Modified: trunk/src/schemas/jbm-queues.xsd
===================================================================
--- trunk/src/schemas/jbm-queues.xsd	2009-01-30 17:20:35 UTC (rev 5767)
+++ trunk/src/schemas/jbm-queues.xsd	2009-01-30 19:46:02 UTC (rev 5768)
@@ -85,5 +85,8 @@
         <xsd:attribute name="durable" type="xsd:boolean"
             use="optional">
         </xsd:attribute>
+        <xsd:attribute name="insert-duplicate-detection-header" type="xsd:boolean"
+            use="optional">
+        </xsd:attribute>
     </xsd:complexType>
 </xsd:schema>

Added: trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/ClusterTestBase.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/ClusterTestBase.java	                        (rev 0)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/ClusterTestBase.java	2009-01-30 19:46:02 UTC (rev 5768)
@@ -0,0 +1,503 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2009, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.messaging.tests.integration.cluster.distribution;
+
+import static org.jboss.messaging.core.remoting.impl.invm.TransportConstants.SERVER_ID_PROP_NAME;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.jboss.messaging.core.client.ClientConsumer;
+import org.jboss.messaging.core.client.ClientMessage;
+import org.jboss.messaging.core.client.ClientProducer;
+import org.jboss.messaging.core.client.ClientSession;
+import org.jboss.messaging.core.client.ClientSessionFactory;
+import org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl;
+import org.jboss.messaging.core.config.Configuration;
+import org.jboss.messaging.core.config.TransportConfiguration;
+import org.jboss.messaging.core.config.cluster.BridgeConfiguration;
+import org.jboss.messaging.core.config.cluster.ClusterConnectionConfiguration;
+import org.jboss.messaging.core.config.impl.ConfigurationImpl;
+import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.core.postoffice.Binding;
+import org.jboss.messaging.core.postoffice.Bindings;
+import org.jboss.messaging.core.postoffice.PostOffice;
+import org.jboss.messaging.core.postoffice.QueueBinding;
+import org.jboss.messaging.core.postoffice.impl.LocalQueueBinding;
+import org.jboss.messaging.core.server.Messaging;
+import org.jboss.messaging.core.server.MessagingService;
+import org.jboss.messaging.core.server.cluster.RemoteQueueBinding;
+import org.jboss.messaging.tests.util.ServiceTestBase;
+import org.jboss.messaging.util.Pair;
+import org.jboss.messaging.util.SimpleString;
+
+/**
+ * A ClusterTestBase
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * 
+ * Created 30 Jan 2009 11:29:43
+ *
+ *
+ */
+public class ClusterTestBase extends ServiceTestBase
+{
+   private static final Logger log = Logger.getLogger(ClusterTestBase.class);
+
+   private static final long WAIT_TIMEOUT = 10000;
+  
+   @Override
+   protected void setUp() throws Exception
+   {
+      super.setUp();
+
+      clearData();
+   }
+
+   @Override
+   protected void tearDown() throws Exception
+   {
+      super.tearDown();
+   }
+
+   // Private -------------------------------------------------------------------------------------------------------
+
+   private static final int MAX_CONSUMERS = 100;
+
+   private ClientConsumer[] consumers = new ClientConsumer[MAX_CONSUMERS];
+
+   private static final SimpleString COUNT_PROP = new SimpleString("count_prop");
+
+   private static final SimpleString FILTER_PROP = new SimpleString("animal");
+
+   private static final int MAX_SERVERS = 10;
+
+   private MessagingService[] services = new MessagingService[MAX_SERVERS];
+
+   private ClientSessionFactory[] sfs = new ClientSessionFactory[MAX_SERVERS];
+   
+   protected void waitForBindings(int node,
+                                final String address,
+                                final int count,
+                                final int consumerCount,
+                                final boolean local) throws Exception
+   {
+      //log.info("waiting for bindings on node " + node + " address " + address + " count " + count + " consumerCount " + consumerCount + " local " + local);
+      MessagingService service = this.services[node];
+
+      if (service == null)
+      {
+         throw new IllegalArgumentException("No service at " + node);
+      }
+
+      PostOffice po = service.getServer().getPostOffice();
+
+      long start = System.currentTimeMillis();
+
+      do
+      {
+         Bindings bindings = po.getBindingsForAddress(new SimpleString(address));
+         
+         int bindingCount = 0;
+
+         int totConsumers = 0;
+
+         for (Binding binding : bindings.getBindings())
+         {
+            if ((binding instanceof LocalQueueBinding && local) || (binding instanceof RemoteQueueBinding && !local))
+            {
+               QueueBinding qBinding = (QueueBinding)binding;
+
+               bindingCount++;
+
+               totConsumers += qBinding.consumerCount();
+            }
+         }
+         
+         //log.info("binding count " + bindingCount + " consumer Count " + totConsumers);
+
+         if (bindingCount == count && totConsumers == consumerCount)
+         {
+            log.info("Waited " + (System.currentTimeMillis() - start));
+            return;
+         }
+
+         Thread.sleep(10);
+      }
+      while (System.currentTimeMillis() - start < WAIT_TIMEOUT);
+
+      throw new IllegalStateException("Timed out waiting for bindings");
+   }
+
+   protected void createQueue(int node, String address, String queueName, String filterVal, boolean durable) throws Exception
+   {
+      ClientSessionFactory sf = this.sfs[node];
+
+      if (sf == null)
+      {
+         throw new IllegalArgumentException("No sf at " + node);
+      }
+
+      ClientSession session = sf.createSession(false, true, true);
+
+      String filterString = null;
+
+      if (filterVal != null)
+      {
+         filterString = FILTER_PROP.toString() + "='" + filterVal + "'";
+      }
+
+      session.createQueue(address, queueName, filterString, durable, false);
+
+      session.close();
+   }
+
+   protected void deleteQueue(int node, String queueName) throws Exception
+   {
+      ClientSessionFactory sf = this.sfs[node];
+
+      if (sf == null)
+      {
+         throw new IllegalArgumentException("No sf at " + node);
+      }
+
+      ClientSession session = sf.createSession(false, true, true);
+
+      session.deleteQueue(queueName);
+
+      session.close();
+   }
+
+   protected void addConsumer(int consumerID, int node, String queueName, String filterVal) throws Exception
+   {      
+      if (consumers[consumerID] != null)
+      {
+         throw new IllegalArgumentException("Already a consumer at " + node);
+      }
+
+      ClientSessionFactory sf = this.sfs[node];
+
+      if (sf == null)
+      {
+         throw new IllegalArgumentException("No sf at " + node);
+      }
+
+      ClientSession session = sf.createSession(false, true, true);
+
+      String filterString = null;
+
+      if (filterVal != null)
+      {
+         filterString = FILTER_PROP.toString() + "='" + filterVal + "'";
+      }
+
+      ClientConsumer consumer = session.createConsumer(queueName, filterString);
+
+      session.start();
+
+      consumers[consumerID] = consumer;
+   }
+
+   protected void removeConsumer(int consumerID) throws Exception
+   {
+      ClientConsumer consumer = consumers[consumerID];
+
+      if (consumer == null)
+      {
+         throw new IllegalArgumentException("No consumer at " + consumerID);
+      }
+
+      consumer.close();
+   }
+
+   protected void send(int node, String address, int numMessages, boolean durable, String filterVal) throws Exception
+   {
+      ClientSessionFactory sf = this.sfs[node];
+
+      if (sf == null)
+      {
+         throw new IllegalArgumentException("No sf at " + node);
+      }
+
+      ClientSession session = sf.createSession(false, true, true);
+
+      ClientProducer producer = session.createProducer(address);
+
+      for (int i = 0; i < numMessages; i++)
+      {
+         ClientMessage message = session.createClientMessage(durable);
+
+         if (filterVal != null)
+         {
+            message.putStringProperty(FILTER_PROP, new SimpleString(filterVal));
+         }
+
+         message.putIntProperty(COUNT_PROP, i);
+
+         producer.send(message);
+      }
+
+      session.close();
+   }
+
+   /**
+    * Asserts that every listed consumer receives the full batch, in order:
+    * message number k must carry COUNT_PROP equal to k.
+    *
+    * @param numMessages number of messages each consumer must receive
+    * @param consumerIDs ids of the consumers to check, processed one after another
+    * @throws IllegalArgumentException if one of the ids has no registered consumer
+    */
+   protected void verifyReceiveAll(int numMessages, int... consumerIDs) throws Exception
+   {
+      for (int consumerID : consumerIDs)
+      {
+         ClientConsumer current = consumers[consumerID];
+
+         if (current == null)
+         {
+            throw new IllegalArgumentException("No consumer at " + consumerID);
+         }
+
+         int expected = 0;
+
+         while (expected < numMessages)
+         {
+            // Each receive waits at most 500 ms for the next message.
+            ClientMessage received = current.receive(500);
+
+            assertNotNull(received);
+
+            assertEquals(expected, received.getProperty(COUNT_PROP));
+
+            expected++;
+         }
+      }
+   }
+
+   /**
+    * Asserts that a batch was distributed over the listed consumers in strict rotation:
+    * message i (identified by COUNT_PROP) must arrive at consumer i modulo the number
+    * of listed consumers, in the order the ids are given.
+    *
+    * @param numMessages total number of messages expected across all consumers
+    * @param consumerIDs ids of the consumers, in the expected rotation order
+    * @throws IllegalArgumentException if messages are expected but no ids are given,
+    *         or if one of the ids has no registered consumer
+    */
+   protected void verifyReceiveRoundRobin(int numMessages, int... consumerIDs) throws Exception
+   {
+      if (numMessages > 0 && consumerIDs.length == 0)
+      {
+         throw new IllegalArgumentException("No consumer ids supplied");
+      }
+
+      int count = 0;
+
+      for (int i = 0; i < numMessages; i++)
+      {
+         ClientConsumer consumer = consumers[consumerIDs[count]];
+
+         if (consumer == null)
+         {
+            // Report the id actually looked up (position 'count'), not the message index.
+            throw new IllegalArgumentException("No consumer at " + consumerIDs[count]);
+         }
+
+         ClientMessage message = consumer.receive(500);
+
+         assertNotNull(message);
+
+         assertEquals(i, message.getProperty(COUNT_PROP));
+
+         count++;
+
+         // Wrap around to the first consumer once the last one has been used.
+         if (count == consumerIDs.length)
+         {
+            count = 0;
+         }
+      }
+   }
+
+   /**
+    * Asserts that none of the listed consumers has a message available:
+    * a receive with a 200 ms timeout must return null for each of them.
+    *
+    * @param consumerIDs ids of the consumers to check
+    * @throws IllegalArgumentException if one of the ids has no registered consumer
+    */
+   protected void verifyNotReceive(int... consumerIDs) throws Exception
+   {
+      for (int consumerID : consumerIDs)
+      {
+         ClientConsumer idle = consumers[consumerID];
+
+         if (idle == null)
+         {
+            throw new IllegalArgumentException("No consumer at " + consumerID);
+         }
+
+         ClientMessage unexpected = idle.receive(200);
+
+         assertNull(unexpected);
+      }
+   }
+
+   /**
+    * Creates a client session factory for the given node and stores it in the
+    * session factory array.
+    *
+    * @param node index of the server the factory should connect to
+    * @param netty true to use the netty connector factory, false for the in-VM one
+    * @throws IllegalArgumentException if a session factory already exists for the node
+    */
+   protected void setupSessionFactory(int node, boolean netty)
+   {
+      if (sfs[node] != null)
+      {
+         // The slot checked here holds session factories, not services.
+         throw new IllegalArgumentException("Already a session factory at " + node);
+      }
+
+      Map<String, Object> params = generateParams(node, netty);
+
+      TransportConfiguration serverTotc;
+
+      if (netty)
+      {
+         serverTotc = new TransportConfiguration(NETTY_CONNECTOR_FACTORY, params);
+      }
+      else
+      {
+         serverTotc = new TransportConfiguration(INVM_CONNECTOR_FACTORY, params);
+      }
+
+      ClientSessionFactory sf = new ClientSessionFactoryImpl(serverTotc);
+
+      sfs[node] = sf;
+   }
+
+   /**
+    * Builds a clustered, security-disabled messaging service for the given node and
+    * registers it in the services array. The service is created but not started here.
+    *
+    * @param node index of the server; also used to derive its directories and transport params
+    * @param fileStorage true for a regular messaging service, false for the null-storage variant
+    * @param netty true to add a netty acceptor in addition to the in-VM acceptor
+    * @throws IllegalArgumentException if a service is already registered for the node
+    */
+   protected void setupServer(int node, boolean fileStorage, boolean netty)
+   {
+      if (services[node] != null)
+      {
+         throw new IllegalArgumentException("Already a service at node " + node);
+      }
+
+      Configuration config = new ConfigurationImpl();
+
+      config.setSecurityEnabled(false);
+      config.setBindingsDirectory(getBindingsDir(node));
+      config.setJournalMinFiles(2);
+      config.setJournalDirectory(getJournalDir(node));
+      config.setJournalFileSize(100 * 1024);
+      config.setPagingDirectory(getPageDir(node));
+      config.setLargeMessagesDirectory(getLargeMessagesDir(node));
+      config.setClustered(true);
+
+      // Start from an empty acceptor set; an in-VM acceptor is always added.
+      config.getAcceptorConfigurations().clear();
+
+      Map<String, Object> transportParams = generateParams(node, netty);
+
+      config.getAcceptorConfigurations().add(new TransportConfiguration(INVM_ACCEPTOR_FACTORY, transportParams));
+
+      if (netty)
+      {
+         config.getAcceptorConfigurations().add(new TransportConfiguration(NETTY_ACCEPTOR_FACTORY, transportParams));
+      }
+
+      MessagingService created = fileStorage ? Messaging.newMessagingService(config)
+                                             : Messaging.newNullStorageMessagingService(config);
+
+      services[node] = created;
+   }
+
+   /**
+    * Builds the transport parameter map for a node: the node number as server id and,
+    * for netty, a port equal to the default netty port plus the node number.
+    *
+    * @param node index of the server
+    * @param netty true to include the netty port entry
+    * @return a new, mutable parameter map
+    */
+   private Map<String, Object> generateParams(int node, boolean netty)
+   {
+      Map<String, Object> result = new HashMap<String, Object>();
+
+      result.put(SERVER_ID_PROP_NAME, node);
+
+      if (!netty)
+      {
+         return result;
+      }
+
+      int port = org.jboss.messaging.integration.transports.netty.TransportConstants.DEFAULT_PORT + node;
+
+      result.put(org.jboss.messaging.integration.transports.netty.TransportConstants.PORT_PROP_NAME, port);
+
+      return result;
+   }
+
+   /**
+    * Removes the service registered for the given node from the services array.
+    * The service itself is not stopped here.
+    *
+    * @param node index of the server to clear
+    * @throws IllegalArgumentException if no service is registered for the node
+    */
+   protected void clearServer(int node)
+   {
+      // Only an empty slot is an error; the original check was inverted.
+      if (services[node] == null)
+      {
+         throw new IllegalArgumentException("No service at node " + node);
+      }
+
+      services[node] = null;
+   }
+
+   /**
+    * Adds a cluster connection from one node to another to the configuration of the
+    * source node's server. A connector pointing at the target node is registered on
+    * the source server and referenced by both the bridge configuration and the
+    * cluster connection configuration.
+    *
+    * @param name name of the cluster connection
+    * @param nodeFrom index of the server whose configuration is modified
+    * @param nodeTo index of the server the connector points at
+    * @param address address passed to the cluster connection configuration
+    * @param forwardWhenNoConsumers passed through to the cluster connection configuration
+    * @param netty true to use the netty connector factory, false for the in-VM one
+    * @throws IllegalArgumentException if no service has been set up for nodeFrom
+    */
+   protected void setupClusterConnection(String name,
+                                       int nodeFrom,
+                                       int nodeTo,
+                                       String address,
+                                       boolean forwardWhenNoConsumers,
+                                       boolean netty)
+   {
+      MessagingService serviceFrom = services[nodeFrom];
+
+      if (serviceFrom == null)
+      {
+         // Fail with a clear message instead of a NullPointerException below.
+         throw new IllegalArgumentException("No service at node " + nodeFrom);
+      }
+
+      Map<String, TransportConfiguration> connectors = serviceFrom.getServer()
+                                                                  .getConfiguration()
+                                                                  .getConnectorConfigurations();
+
+      Map<String, Object> params = generateParams(nodeTo, netty);
+
+      TransportConfiguration serverTotc;
+
+      if (netty)
+      {
+         serverTotc = new TransportConfiguration(NETTY_CONNECTOR_FACTORY, params);
+      }
+      else
+      {
+         serverTotc = new TransportConfiguration(INVM_CONNECTOR_FACTORY, params);
+      }
+
+      connectors.put(serverTotc.getName(), serverTotc);
+
+      serviceFrom.getServer().getConfiguration().setConnectorConfigurations(connectors);
+
+      // Second element of the pair is left null; only the connector to nodeTo is named.
+      Pair<String, String> connectorPair = new Pair<String, String>(serverTotc.getName(), null);
+
+      List<Pair<String, String>> pairs = new ArrayList<Pair<String, String>>();
+      pairs.add(connectorPair);
+
+      // NOTE(review): the meaning of the positional arguments is defined by the
+      // BridgeConfiguration constructor, which is not visible here - confirm against it.
+      BridgeConfiguration bridgeConfiguration = new BridgeConfiguration(null,
+                                                                        null,
+                                                                        null,
+                                                                        null,
+                                                                        1,
+                                                                        -1,
+                                                                        null,
+                                                                        10,
+                                                                        1d,
+                                                                        -1,
+                                                                        -1,
+                                                                        false,
+                                                                        connectorPair);
+
+      ClusterConnectionConfiguration clusterConf = new ClusterConnectionConfiguration(name,
+                                                                                      address,
+                                                                                      bridgeConfiguration,
+                                                                                      true,
+                                                                                      forwardWhenNoConsumers,
+                                                                                      pairs);
+      List<ClusterConnectionConfiguration> clusterConfs = serviceFrom.getServer()
+                                                                     .getConfiguration()
+                                                                     .getClusterConfigurations();
+
+      clusterConfs.add(clusterConf);
+
+      serviceFrom.getServer().getConfiguration().setClusterConfigurations(clusterConfs);
+   }
+
+   /**
+    * Starts the services registered for the given nodes, in the order listed.
+    *
+    * @param nodes indices of the servers to start
+    */
+   protected void startServers(int... nodes) throws Exception
+   {
+      for (int node : nodes)
+      {
+         services[node].start();
+      }
+   }
+
+   /**
+    * Stops the services registered for the given nodes, in the order listed.
+    *
+    * @param nodes indices of the servers to stop
+    */
+   protected void stopServers(int... nodes) throws Exception
+   {
+      for (int node : nodes)
+      {
+         services[node].stop();
+      }
+   }
+
+}

Added: trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/OnewayTwoNodeClusterTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/OnewayTwoNodeClusterTest.java	                        (rev 0)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/OnewayTwoNodeClusterTest.java	2009-01-30 19:46:02 UTC (rev 5768)
@@ -0,0 +1,314 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2009, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+
+package org.jboss.messaging.tests.integration.cluster.distribution;
+
+/**
+ * Tests message distribution between two servers joined by a single one-way cluster connection from node 0 to node 1.
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * 
+ * Created 30 Jan 2009 18:03:28
+ *
+ *
+ */
+public class OnewayTwoNodeClusterTest extends ClusterTestBase
+{
+   /** Address every test sends to; it lies under the "queues" address of the cluster connection. */
+   private static final String TEST_ADDRESS = "queues.testaddress";
+
+   /** A second address that no test sends to. */
+   private static final String TEST_ADDRESS_2 = "queues.testaddress2";
+
+   /** Number of messages sent by each test. */
+   private static final int MESSAGE_COUNT = 10;
+
+   /**
+    * Sets up servers 0 and 1 (null storage, in-VM transport only) and a cluster
+    * connection named "cluster1" from node 0 to node 1. The servers are started
+    * by the individual tests.
+    */
+   @Override
+   protected void setUp() throws Exception
+   {
+      super.setUp();
+
+      for (int node = 0; node < 2; node++)
+      {
+         setupServer(node, false, false);
+      }
+
+      setupClusterConnection("cluster1", 0, 1, "queues", false, false);
+   }
+
+   /** Stops both servers before delegating to the base class. */
+   @Override
+   protected void tearDown() throws Exception
+   {
+      stopServers(0, 1);
+
+      super.tearDown();
+   }
+
+   /** Node 1 (target of the cluster connection) is started before node 0. */
+   public void testStartTargetServerBeforeSourceServer() throws Exception
+   {
+      sendToQueueOnTargetNode(1, 0);
+   }
+
+   /** Node 0 (source of the cluster connection) is started before node 1. */
+   public void testStartSourceServerBeforeTargetServer() throws Exception
+   {
+      sendToQueueOnTargetNode(0, 1);
+   }
+
+   /** A queue and consumer on node 0 receive everything sent on node 0. */
+   public void testBasicLocalReceive() throws Exception
+   {
+      startNodesAndCreateFactories(1, 0);
+
+      createQueue(0, TEST_ADDRESS, "queue0", null, false);
+      addConsumer(0, 0, "queue0", null);
+
+      sendBatchFromNodeZero();
+      verifyReceiveAll(MESSAGE_COUNT, 0);
+      verifyNotReceive(0);
+
+      // A second consumer added afterwards on the same queue must find nothing left.
+      addConsumer(1, 0, "queue0", null);
+      verifyNotReceive(1);
+   }
+
+   /** The same queue name on both nodes: messages alternate between the two consumers. */
+   public void testBasicRoundRobin() throws Exception
+   {
+      startNodesAndCreateFactories(1, 0);
+
+      createQueueOnBothNodes("queue0");
+
+      addConsumer(0, 0, "queue0", null);
+      addConsumer(1, 1, "queue0", null);
+
+      waitForBindingsWithBothFlags(TEST_ADDRESS, 1);
+
+      sendBatchFromNodeZero();
+      verifyReceiveRoundRobin(MESSAGE_COUNT, 0, 1);
+      verifyNotReceive(0, 1);
+   }
+
+   /** Three queue names, each present on both nodes: every pair of consumers alternates. */
+   public void testRoundRobinMultipleQueues() throws Exception
+   {
+      startNodesAndCreateFactories(1, 0);
+
+      for (int q = 0; q <= 2; q++)
+      {
+         createQueueOnBothNodes("queue" + q);
+      }
+
+      // Consumers 2q (node 0) and 2q + 1 (node 1) listen on queue q.
+      for (int q = 0; q <= 2; q++)
+      {
+         addConsumer(2 * q, 0, "queue" + q, null);
+         addConsumer(2 * q + 1, 1, "queue" + q, null);
+      }
+
+      waitForBindingsWithBothFlags(TEST_ADDRESS, 3);
+
+      sendBatchFromNodeZero();
+
+      verifyReceiveRoundRobin(MESSAGE_COUNT, 0, 1);
+      verifyReceiveRoundRobin(MESSAGE_COUNT, 2, 3);
+      verifyReceiveRoundRobin(MESSAGE_COUNT, 4, 5);
+
+      verifyNotReceive(0, 1, 2, 3, 4, 5);
+   }
+
+   /** Ten distinct queue names, five per node: every consumer receives the whole batch. */
+   public void testMultipleNonLoadBalancedQueues() throws Exception
+   {
+      startNodesAndCreateFactories(1, 0);
+
+      createNumberedQueues(0, 0, 4);
+      createNumberedQueues(1, 5, 9);
+
+      addNumberedConsumers(0, 0, 4);
+      addNumberedConsumers(1, 5, 9);
+
+      waitForBindingsWithBothFlags(TEST_ADDRESS, 5);
+
+      sendBatchFromNodeZero();
+
+      verifyReceiveAll(MESSAGE_COUNT, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9);
+
+      verifyNotReceive(0, 1, 2, 3, 4, 5, 6, 7, 8, 9);
+   }
+
+   /**
+    * Queues 0-9 exist on one node only, queues 10-12 on both nodes: the former each
+    * receive the whole batch, the latter alternate between their two consumers.
+    */
+   public void testMixtureLoadBalancedAndNonLoadBalancedQueues() throws Exception
+   {
+      startNodesAndCreateFactories(1, 0);
+
+      createNumberedQueues(0, 0, 4);
+      createNumberedQueues(1, 5, 9);
+
+      for (int q = 10; q <= 12; q++)
+      {
+         createQueueOnBothNodes("queue" + q);
+      }
+
+      addNumberedConsumers(0, 0, 4);
+      addNumberedConsumers(1, 5, 9);
+
+      // Consumers 10/11 share queue10, 12/13 share queue11, 14/15 share queue12.
+      for (int q = 10; q <= 12; q++)
+      {
+         int firstConsumerID = 10 + 2 * (q - 10);
+
+         addConsumer(firstConsumerID, 0, "queue" + q, null);
+         addConsumer(firstConsumerID + 1, 1, "queue" + q, null);
+      }
+
+      waitForBindingsWithBothFlags(TEST_ADDRESS, 8);
+
+      sendBatchFromNodeZero();
+
+      verifyReceiveAll(MESSAGE_COUNT, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9);
+
+      verifyReceiveRoundRobin(MESSAGE_COUNT, 10, 11);
+      verifyReceiveRoundRobin(MESSAGE_COUNT, 12, 13);
+      verifyReceiveRoundRobin(MESSAGE_COUNT, 14, 15);
+
+      verifyNotReceive(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15);
+   }
+
+   /** Queues bound to the second address must not receive messages sent to the first. */
+   public void testNotRouteToNonMatchingAddress() throws Exception
+   {
+      startNodesAndCreateFactories(1, 0);
+
+      createQueue(0, TEST_ADDRESS, "queue0", null, false);
+      createQueue(1, TEST_ADDRESS, "queue1", null, false);
+
+      createQueue(0, TEST_ADDRESS_2, "queue2", null, false);
+      createQueue(1, TEST_ADDRESS_2, "queue2", null, false);
+      createQueue(0, TEST_ADDRESS_2, "queue3", null, false);
+      createQueue(1, TEST_ADDRESS_2, "queue4", null, false);
+
+      addConsumer(0, 0, "queue0", null);
+      addConsumer(1, 1, "queue1", null);
+      addConsumer(2, 0, "queue2", null);
+      addConsumer(3, 1, "queue2", null);
+      addConsumer(4, 0, "queue3", null);
+      addConsumer(5, 1, "queue4", null);
+
+      waitForBindingsWithBothFlags(TEST_ADDRESS, 1);
+      waitForBindingsWithBothFlags(TEST_ADDRESS_2, 2);
+
+      sendBatchFromNodeZero();
+
+      verifyReceiveAll(MESSAGE_COUNT, 0, 1);
+
+      verifyNotReceive(2, 3, 4, 5);
+   }
+
+   // Private helpers -------------------------------------------------------------
+
+   /** Starts the servers in the given order, then creates in-VM session factories for nodes 0 and 1. */
+   private void startNodesAndCreateFactories(int... startOrder) throws Exception
+   {
+      startServers(startOrder);
+
+      setupSessionFactory(0, false);
+      setupSessionFactory(1, false);
+   }
+
+   /**
+    * Shared body of the two start-order tests: a queue and consumer exist on node 1
+    * only, the batch is sent on node 0 and must arrive completely at that consumer.
+    */
+   private void sendToQueueOnTargetNode(int... startOrder) throws Exception
+   {
+      startNodesAndCreateFactories(startOrder);
+
+      createQueue(1, TEST_ADDRESS, "queue0", null, false);
+      addConsumer(0, 1, "queue0", null);
+
+      waitForBindings(0, TEST_ADDRESS, 1, 1, false);
+
+      sendBatchFromNodeZero();
+      verifyReceiveAll(MESSAGE_COUNT, 0);
+      verifyNotReceive(0);
+   }
+
+   /** Sends MESSAGE_COUNT non-durable messages without filter value to TEST_ADDRESS via node 0. */
+   private void sendBatchFromNodeZero() throws Exception
+   {
+      send(0, TEST_ADDRESS, MESSAGE_COUNT, false, null);
+   }
+
+   /** Creates a queue with the given name on TEST_ADDRESS, first on node 0 and then on node 1. */
+   private void createQueueOnBothNodes(String queueName) throws Exception
+   {
+      createQueue(0, TEST_ADDRESS, queueName, null, false);
+      createQueue(1, TEST_ADDRESS, queueName, null, false);
+   }
+
+   /** Creates queues "queue&lt;first&gt;" to "queue&lt;last&gt;" (inclusive) on TEST_ADDRESS on one node. */
+   private void createNumberedQueues(int node, int first, int last) throws Exception
+   {
+      for (int q = first; q <= last; q++)
+      {
+         createQueue(node, TEST_ADDRESS, "queue" + q, null, false);
+      }
+   }
+
+   /** Adds consumer k on "queue&lt;k&gt;" of the given node for every k from first to last (inclusive). */
+   private void addNumberedConsumers(int node, int first, int last) throws Exception
+   {
+      for (int k = first; k <= last; k++)
+      {
+         addConsumer(k, node, "queue" + k, null);
+      }
+   }
+
+   /**
+    * Calls waitForBindings on node 0 twice with the same count for both numeric
+    * arguments: first with the trailing flag true, then with it false.
+    */
+   private void waitForBindingsWithBothFlags(String address, int count) throws Exception
+   {
+      waitForBindings(0, address, count, count, true);
+      waitForBindings(0, address, count, count, false);
+   }
+}

Modified: trunk/tests/src/org/jboss/messaging/tests/performance/persistence/FakePostOffice.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/performance/persistence/FakePostOffice.java	2009-01-30 17:20:35 UTC (rev 5767)
+++ trunk/tests/src/org/jboss/messaging/tests/performance/persistence/FakePostOffice.java	2009-01-30 19:46:02 UTC (rev 5768)
@@ -48,6 +48,12 @@
  */
 public class FakePostOffice implements PostOffice
 {
+   /**
+    * Stub implementation: the body is empty, so both arguments are ignored and
+    * calling this method has no effect.
+    *
+    * @param queueName ignored
+    * @param address ignored
+    */
+   public void sendQueueInfoToQueue(SimpleString queueName, SimpleString address) throws Exception
+   {
+      // TODO Auto-generated method stub
+      
+   }
+
    private ConcurrentHashMap<SimpleString, Binding> bindings = new ConcurrentHashMap<SimpleString, Binding>();
 
    private ConcurrentHashSet<SimpleString> addresses = new ConcurrentHashSet<SimpleString>();




More information about the jboss-cvs-commits mailing list