[jboss-cvs] JBoss Messaging SVN: r5860 - in trunk: src/main/org/jboss/messaging/core/client/impl and 5 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Fri Feb 13 10:03:51 EST 2009


Author: timfox
Date: 2009-02-13 10:03:51 -0500 (Fri, 13 Feb 2009)
New Revision: 5860

Added:
   trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/NettySymmetricClusterWithDiscoveryTest.java
   trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/SymmetricClusterWithDiscoveryTest.java
Modified:
   trunk/src/config/jbm-configuration.xml
   trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionFactoryImpl.java
   trunk/src/main/org/jboss/messaging/core/cluster/impl/DiscoveryGroupImpl.java
   trunk/src/main/org/jboss/messaging/core/deployers/impl/QueueDeployer.java
   trunk/src/main/org/jboss/messaging/core/server/cluster/impl/BridgeImpl.java
   trunk/src/main/org/jboss/messaging/core/server/cluster/impl/BroadcastGroupImpl.java
   trunk/src/main/org/jboss/messaging/core/server/cluster/impl/ClusterConnectionImpl.java
   trunk/src/main/org/jboss/messaging/core/server/cluster/impl/ClusterManagerImpl.java
   trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/ClusterTestBase.java
   trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/OnewayTwoNodeClusterTest.java
   trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/SymmetricClusterTest.java
   trunk/tests/src/org/jboss/messaging/tests/integration/discovery/DiscoveryTest.java
Log:
more clustering tests and tweaks

Modified: trunk/src/config/jbm-configuration.xml
===================================================================
--- trunk/src/config/jbm-configuration.xml	2009-02-13 11:59:51 UTC (rev 5859)
+++ trunk/src/config/jbm-configuration.xml	2009-02-13 15:03:51 UTC (rev 5860)
@@ -4,9 +4,7 @@
    <configuration>
 
       <clustered>false</clustered>
-      
-      
-      
+                  
       <!-- Maximum number of threads to use for scheduled deliveries -->
       <scheduled-max-pool-size>30</scheduled-max-pool-size>
 

Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionFactoryImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionFactoryImpl.java	2009-02-13 11:59:51 UTC (rev 5859)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionFactoryImpl.java	2009-02-13 15:03:51 UTC (rev 5860)
@@ -29,6 +29,7 @@
 import org.jboss.messaging.core.exception.MessagingException;
 import org.jboss.messaging.core.logging.Logger;
 import org.jboss.messaging.util.Pair;
+import org.jboss.messaging.util.UUIDGenerator;
 
 /**
  * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
@@ -171,7 +172,8 @@
    {
       InetAddress groupAddress = InetAddress.getByName(discoveryGroupAddress);
 
-      discoveryGroup = new DiscoveryGroupImpl(discoveryGroupAddress,
+      discoveryGroup = new DiscoveryGroupImpl(UUIDGenerator.getInstance().generateStringUUID(),
+                                              discoveryGroupAddress,
                                               groupAddress,
                                               discoveryGroupPort,
                                               discoveryRefreshTimeout);
@@ -232,7 +234,8 @@
       {
          InetAddress groupAddress = InetAddress.getByName(discoveryGroupAddress);
 
-         discoveryGroup = new DiscoveryGroupImpl(discoveryGroupAddress,
+         discoveryGroup = new DiscoveryGroupImpl(UUIDGenerator.getInstance().generateStringUUID(),
+                                                 discoveryGroupAddress,
                                                  groupAddress,
                                                  discoveryGroupPort,
                                                  discoveryRefreshTimeout);

Modified: trunk/src/main/org/jboss/messaging/core/cluster/impl/DiscoveryGroupImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/cluster/impl/DiscoveryGroupImpl.java	2009-02-13 11:59:51 UTC (rev 5859)
+++ trunk/src/main/org/jboss/messaging/core/cluster/impl/DiscoveryGroupImpl.java	2009-02-13 15:03:51 UTC (rev 5860)
@@ -39,6 +39,7 @@
 import org.jboss.messaging.core.logging.Logger;
 import org.jboss.messaging.core.cluster.DiscoveryGroup;
 import org.jboss.messaging.util.Pair;
+import org.jboss.messaging.util.SimpleString;
 
 /**
  * A DiscoveryGroupImpl
@@ -51,44 +52,52 @@
 public class DiscoveryGroupImpl implements Runnable, DiscoveryGroup
 {
    private static final Logger log = Logger.getLogger(DiscoveryGroupImpl.class);
-   
+
    private static final int SOCKET_TIMEOUT = 500;
-   
+
    private MulticastSocket socket;
 
    private final List<DiscoveryListener> listeners = new ArrayList<DiscoveryListener>();
-        
+
    private final String name;
 
    private final Thread thread;
-   
+
    private boolean received;
-   
+
    private final Object waitLock = new Object();
-   
+
    private final Map<Pair<TransportConfiguration, TransportConfiguration>, Long> connectors = new HashMap<Pair<TransportConfiguration, TransportConfiguration>, Long>();
-   
+
    private final long timeout;
-   
+
    private volatile boolean started;
-   
-   public DiscoveryGroupImpl(final String name, final InetAddress groupAddress, final int groupPort, final long timeout) throws Exception
+
+   private final String nodeID;
+
+   public DiscoveryGroupImpl(final String nodeID,
+                             final String name,
+                             final InetAddress groupAddress,
+                             final int groupPort,
+                             final long timeout) throws Exception
    {
+      this.nodeID = nodeID;
+
       this.name = name;
 
       socket = new MulticastSocket(groupPort);
 
       socket.joinGroup(groupAddress);
-      
+
       socket.setSoTimeout(SOCKET_TIMEOUT);
-      
+
       this.timeout = timeout;
-      
+
       thread = new Thread(this);
-      
+
       thread.setDaemon(true);
    }
-   
+
    public synchronized void start() throws Exception
    {
       if (started)
@@ -96,11 +105,11 @@
          return;
       }
       
+      started = true;
+      
       thread.start();
-      
-      started = true;
    }
-   
+
    public void stop()
    {
       synchronized (this)
@@ -109,85 +118,85 @@
          {
             return;
          }
-         
+
          started = false;
       }
-      
+
       try
       {
          thread.join();
       }
       catch (InterruptedException e)
-      {        
+      {
       }
-      
-      socket.close();           
+
+      socket.close();
    }
-   
+
    public boolean isStarted()
    {
       return started;
    }
-   
+
    public String getName()
    {
       return name;
    }
-        
+
    public synchronized List<Pair<TransportConfiguration, TransportConfiguration>> getConnectors()
    {
       return new ArrayList<Pair<TransportConfiguration, TransportConfiguration>>(connectors.keySet());
    }
-   
+
    public boolean waitForBroadcast(final long timeout)
-   {      
+   {
       synchronized (waitLock)
-      { 
+      {
          long start = System.currentTimeMillis();
-         
+
          long toWait = timeout;
-         
+
          while (!received && toWait > 0)
-         {      
+         {
             try
             {
-               waitLock.wait(toWait);        
+               waitLock.wait(toWait);
             }
             catch (InterruptedException e)
-            {               
+            {
             }
-            
+
             long now = System.currentTimeMillis();
-            
+
             toWait -= now - start;
 
-            start = now;                       
+            start = now;
          }
-         
+
          boolean ret = received;
-         
+
          received = false;
-         
+
          return ret;
-      }      
+      }
    }
 
    public void run()
    {
-      //TODO - can we use a smaller buffer size?
-      final byte[] data = new byte[65535];
-      
-      final DatagramPacket packet = new DatagramPacket(data, data.length);
-      
       try
-      {      
+      {
+         // TODO - can we use a smaller buffer size?
+         final byte[] data = new byte[65535];
+
          while (true)
          {
             if (!started)
             {
                return;
             }
-            
+  
+            final DatagramPacket packet = new DatagramPacket(data, data.length);
+
             try
             {
                socket.receive(packet);
@@ -203,74 +212,79 @@
                   continue;
                }
             }
-            
+
             ByteArrayInputStream bis = new ByteArrayInputStream(data);
-            
+
             ObjectInputStream ois = new ObjectInputStream(bis);
-            
+
+            String originatingNodeID = ois.readUTF();
+
+            if (nodeID.equals(originatingNodeID))
+            {
+               //Ignore traffic from own node
+               continue;
+            }
+
             int size = ois.readInt();
-            
+
             boolean changed = false;
-            
+
             synchronized (this)
-            {            
+            {
                for (int i = 0; i < size; i++)
                {
                   TransportConfiguration connector = (TransportConfiguration)ois.readObject();
-                  
+
                   boolean existsBackup = ois.readBoolean();
-                  
+
                   TransportConfiguration backupConnector = null;
-                  
+
                   if (existsBackup)
                   {
                      backupConnector = (TransportConfiguration)ois.readObject();
                   }
-                  
-                  Pair<TransportConfiguration, TransportConfiguration> connectorPair =
-                     new Pair<TransportConfiguration, TransportConfiguration>(connector, backupConnector);
-                  
+
+                  Pair<TransportConfiguration, TransportConfiguration> connectorPair = new Pair<TransportConfiguration, TransportConfiguration>(connector,
+                                                                                                                                                backupConnector);
+
                   Long oldVal = connectors.put(connectorPair, System.currentTimeMillis());
-                  
+
                   if (oldVal == null)
                   {
                      changed = true;
                   }
                }
-               
+
                long now = System.currentTimeMillis();
-               
-               Iterator<Map.Entry<Pair<TransportConfiguration, TransportConfiguration>, Long>> iter = connectors.entrySet().iterator();
-               
-               //Weed out any expired connectors
-               
+
+               Iterator<Map.Entry<Pair<TransportConfiguration, TransportConfiguration>, Long>> iter = connectors.entrySet()
+                                                                                                                .iterator();
+               // Weed out any expired connectors
+
                while (iter.hasNext())
                {
                   Map.Entry<Pair<TransportConfiguration, TransportConfiguration>, Long> entry = iter.next();
-                  
+
                   if (entry.getValue() + timeout <= now)
                   {
                      iter.remove();
-                     
+
                      changed = true;
                   }
                }
             }
-            
-            packet.setLength(data.length);
-            
+
             if (changed)
             {
                callListeners();
             }
-            
+
             synchronized (waitLock)
             {
                received = true;
-               
+
                waitLock.notify();
             }
-                        
          }
       }
       catch (Exception e)
@@ -278,20 +292,25 @@
          log.error("Failed to receive datagram", e);
       }
    }
-   
+
    public synchronized void registerListener(final DiscoveryListener listener)
    {
-      this.listeners.add(listener);
+      listeners.add(listener);
+
+      if (!connectors.isEmpty())
+      {
+         listener.connectorsChanged();
+      }
    }
-   
+
    public synchronized void unregisterListener(final DiscoveryListener listener)
    {
-      this.listeners.remove(listener);
+      listeners.remove(listener);
    }
-   
+
    private void callListeners()
-   {      
-      for (DiscoveryListener listener: listeners)
+   {
+      for (DiscoveryListener listener : listeners)
       {
          try
          {
@@ -299,9 +318,46 @@
          }
          catch (Throwable t)
          {
-            //Catch it so exception doesn't prevent other listeners from running
+            // Catch it so exception doesn't prevent other listeners from running
             log.error("Failed to call discovery listener", t);
          }
       }
    }
+
+   private String replaceWildcardChars(final String str)
+   {
+      return str.replace('.', '-');
+   }
+
+   private SimpleString generateConnectorString(final TransportConfiguration config) throws Exception
+   {
+      StringBuilder str = new StringBuilder(replaceWildcardChars(config.getFactoryClassName()));
+
+      if (config.getParams() != null)
+      {
+         if (!config.getParams().isEmpty())
+         {
+            str.append("?");
+         }
+
+         boolean first = true;
+         for (Map.Entry<String, Object> entry : config.getParams().entrySet())
+         {
+            if (!first)
+            {
+               str.append("&");
+            }
+            String encodedKey = replaceWildcardChars(entry.getKey());
+
+            String val = entry.getValue().toString();
+            String encodedVal = replaceWildcardChars(val);
+
+            str.append(encodedKey).append('=').append(encodedVal);
+
+            first = false;
+         }
+      }
+
+      return new SimpleString(str.toString());
+   }
 }

Modified: trunk/src/main/org/jboss/messaging/core/deployers/impl/QueueDeployer.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/deployers/impl/QueueDeployer.java	2009-02-13 11:59:51 UTC (rev 5859)
+++ trunk/src/main/org/jboss/messaging/core/deployers/impl/QueueDeployer.java	2009-02-13 15:03:51 UTC (rev 5860)
@@ -18,7 +18,7 @@
  * License along with this software; if not, write to the Free
  * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
  * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */ 
+ */
 
 package org.jboss.messaging.core.deployers.impl;
 
@@ -37,23 +37,22 @@
  * @author <a href="jmesnil at redhat.com">Jeff Mesnil</a>
  */
 public class QueueDeployer extends XmlDeployer
-{   
+{
    private final Configuration serverConfiguration;
 
-   public QueueDeployer(final DeploymentManager deploymentManager,
-                                final Configuration configuration)
+   public QueueDeployer(final DeploymentManager deploymentManager, final Configuration configuration)
    {
       super(deploymentManager);
       this.serverConfiguration = configuration;
    }
-   
+
    /**
     * the names of the elements to deploy
     * @return the names of the elements todeploy
     */
    public String[] getElementTagName()
    {
-      return new String[]{"queue"};
+      return new String[] { "queue" };
    }
 
    @Override
@@ -62,9 +61,10 @@
       if ("deployment".equals(rootNode.getNodeName()))
       {
          XMLUtil.validate(rootNode, "jbm-configuration.xsd");
-      } else 
+      }
+      else
       {
-         XMLUtil.validate(rootNode, "jbm-queues.xsd");         
+         XMLUtil.validate(rootNode, "jbm-queues.xsd");
       }
    }
 
@@ -80,7 +80,7 @@
       configurations.add(queueConfig);
       serverConfiguration.setQueueConfigurations(configurations);
    }
-   
+
    @Override
    public void undeploy(Node node) throws Exception
    {
@@ -93,19 +93,19 @@
     */
    public String[] getConfigFileNames()
    {
-      return new String[] {"jbm-configuration.xml", "jbm-queues.xml"};
+      return new String[] { "jbm-configuration.xml", "jbm-queues.xml" };
    }
 
    private QueueConfiguration parseQueueConfiguration(final Node node)
    {
       String name = node.getAttributes().getNamedItem("name").getNodeValue();
-      
+
       String address = node.getAttributes().getNamedItem("address").getNodeValue();
 
       String filterString = null;
 
       Node filterNode = node.getAttributes().getNamedItem("filter");
-      if (filterNode !=null)
+      if (filterNode != null)
       {
          String filterValue = filterNode.getNodeValue();
          if (!"".equals(filterValue.trim()))
@@ -120,7 +120,7 @@
       {
          durable = Boolean.parseBoolean(durableNode.getNodeValue());
       }
-      
+
       return new QueueConfiguration(address, name, filterString, durable);
    }
 

Modified: trunk/src/main/org/jboss/messaging/core/server/cluster/impl/BridgeImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/cluster/impl/BridgeImpl.java	2009-02-13 11:59:51 UTC (rev 5859)
+++ trunk/src/main/org/jboss/messaging/core/server/cluster/impl/BridgeImpl.java	2009-02-13 15:03:51 UTC (rev 5860)
@@ -280,6 +280,16 @@
 
    public void stop() throws Exception
    {
+      if (started)
+      {
+         // We need to close the csf here; otherwise the stop runnable never runs, because the create-objects runnable is stuck
+         // in an infinite loop trying to connect to a target server which isn't up
+         if (csf != null)
+         {
+            csf.close();
+         }
+      }
+      
       executor.execute(new StopRunnable());
       
       this.waitForRunnablesToComplete();
@@ -298,10 +308,6 @@
                   return;
                }
 
-               // We close the session factory here - this will cause any connection retries to stop
-
-               csf.close();
-
                if (session != null)
                {    
                   session.close();               
@@ -401,7 +407,7 @@
       {
          return false;
       }
-
+      
       try
       {
          queue.addConsumer(BridgeImpl.this);

Modified: trunk/src/main/org/jboss/messaging/core/server/cluster/impl/BroadcastGroupImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/cluster/impl/BroadcastGroupImpl.java	2009-02-13 11:59:51 UTC (rev 5859)
+++ trunk/src/main/org/jboss/messaging/core/server/cluster/impl/BroadcastGroupImpl.java	2009-02-13 15:03:51 UTC (rev 5860)
@@ -29,12 +29,14 @@
 import java.net.InetAddress;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.ScheduledFuture;
 
 import org.jboss.messaging.core.config.TransportConfiguration;
 import org.jboss.messaging.core.logging.Logger;
 import org.jboss.messaging.core.server.cluster.BroadcastGroup;
 import org.jboss.messaging.util.Pair;
+import org.jboss.messaging.util.SimpleString;
 
 /**
  * A BroadcastGroupImpl
@@ -48,6 +50,8 @@
 {
    private static final Logger log = Logger.getLogger(BroadcastGroupImpl.class);
 
+   private final String nodeID;
+
    private final String name;
 
    private final InetAddress localBindAddress;
@@ -59,21 +63,24 @@
    private final int groupPort;
 
    private DatagramSocket socket;
-   
+
    private final List<Pair<TransportConfiguration, TransportConfiguration>> connectorPairs = new ArrayList<Pair<TransportConfiguration, TransportConfiguration>>();
-   
+
    private boolean started;
-   
+
    private ScheduledFuture<?> future;
-   
-   public BroadcastGroupImpl(final String name,
+
+   public BroadcastGroupImpl(final String nodeID,
+                             final String name,
                              final InetAddress localBindAddress,
                              final int localPort,
                              final InetAddress groupAddress,
                              final int groupPort) throws Exception
    {
+      this.nodeID = nodeID;
+
       this.name = name;
-      
+
       this.localBindAddress = localBindAddress;
 
       this.localPort = localPort;
@@ -84,43 +91,43 @@
 
       // FIXME - doesn't seem to work when specifying port and address
 
-      // this.socket = new DatagramSocket(localPort, localBindAddress);     
+      // this.socket = new DatagramSocket(localPort, localBindAddress);
    }
-   
+
    public synchronized void start() throws Exception
    {
       if (started)
       {
          return;
       }
-      
+
       socket = new DatagramSocket();
-      
+
       started = true;
    }
-   
+
    public synchronized void stop()
    {
       if (!started)
       {
          return;
       }
-      
+
       if (future != null)
       {
          future.cancel(false);
       }
-            
+
       socket.close();
-      
+
       started = false;
    }
-   
+
    public synchronized boolean isStarted()
    {
       return started;
    }
-   
+
    public String getName()
    {
       return name;
@@ -130,7 +137,7 @@
    {
       connectorPairs.add(connectorPair);
    }
-   
+
    public synchronized void removeConnectorPair(final Pair<TransportConfiguration, TransportConfiguration> connectorPair)
    {
       connectorPairs.remove(connectorPair);
@@ -140,7 +147,7 @@
    {
       return connectorPairs.size();
    }
-   
+
    public synchronized void broadcastConnectors() throws Exception
    {
       // TODO - for now we just use plain serialization to serialize the transport configs
@@ -149,16 +156,18 @@
 
       ObjectOutputStream oos = new ObjectOutputStream(bos);
       
+      oos.writeUTF(nodeID);
+
       oos.writeInt(connectorPairs.size());
-
+      
       for (Pair<TransportConfiguration, TransportConfiguration> connectorPair : connectorPairs)
       {
          oos.writeObject(connectorPair.a);
-         
+
          if (connectorPair.b != null)
          {
             oos.writeBoolean(true);
-            
+
             oos.writeObject(connectorPair.b);
          }
          else
@@ -172,7 +181,7 @@
       byte[] data = bos.toByteArray();
 
       DatagramPacket packet = new DatagramPacket(data, data.length, groupAddress, groupPort);
-      
+
       socket.send(packet);
    }
 
@@ -182,9 +191,9 @@
       {
          return;
       }
-      
+
       try
-      {      
+      {
          broadcastConnectors();
       }
       catch (Exception e)
@@ -192,10 +201,47 @@
          log.error("Failed to broadcast connector configs");
       }
    }
-   
+
    public synchronized void setScheduledFuture(final ScheduledFuture<?> future)
    {
       this.future = future;
    }
+   
+   private String replaceWildcardChars(final String str)
+   {
+      return str.replace('.', '-');
+   }
 
+   private SimpleString generateConnectorString(final TransportConfiguration config) throws Exception
+   {
+      StringBuilder str = new StringBuilder(replaceWildcardChars(config.getFactoryClassName()));
+
+      if (config.getParams() != null)
+      {
+         if (!config.getParams().isEmpty())
+         {
+            str.append("?");
+         }
+
+         boolean first = true;
+         for (Map.Entry<String, Object> entry : config.getParams().entrySet())
+         {
+            if (!first)
+            {
+               str.append("&");
+            }
+            String encodedKey = replaceWildcardChars(entry.getKey());
+
+            String val = entry.getValue().toString();
+            String encodedVal = replaceWildcardChars(val);
+
+            str.append(encodedKey).append('=').append(encodedVal);
+
+            first = false;
+         }
+      }
+
+      return new SimpleString(str.toString());
+   }
+
 }

Modified: trunk/src/main/org/jboss/messaging/core/server/cluster/impl/ClusterConnectionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/cluster/impl/ClusterConnectionImpl.java	2009-02-13 11:59:51 UTC (rev 5859)
+++ trunk/src/main/org/jboss/messaging/core/server/cluster/impl/ClusterConnectionImpl.java	2009-02-13 15:03:51 UTC (rev 5860)
@@ -31,6 +31,7 @@
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.jboss.messaging.core.client.ClientMessage;
 import org.jboss.messaging.core.client.management.impl.ManagementHelper;
@@ -183,7 +184,7 @@
                                 final ManagementService managementService,
                                 final ScheduledExecutorService scheduledExecutor,
                                 final QueueFactory queueFactory,
-                                final DiscoveryGroup discoveryGroup,                               
+                                final DiscoveryGroup discoveryGroup,                                
                                 final int maxHops,
                                 final SimpleString nodeID) throws Exception
    {
@@ -231,8 +232,6 @@
 
       if (discoveryGroup != null)
       {
-         updateConnectors(discoveryGroup.getConnectors());
-
          discoveryGroup.registerListener(this);
       }
 
@@ -245,7 +244,7 @@
       {
          return;
       }
-
+      
       if (discoveryGroup != null)
       {
          discoveryGroup.unregisterListener(this);
@@ -271,7 +270,7 @@
 
    // DiscoveryListener implementation ------------------------------------------------------------------
 
-   public void connectorsChanged()
+   public synchronized void connectorsChanged()
    {
       try
       {
@@ -284,7 +283,7 @@
          log.error("Failed to update connectors", e);
       }
    }
-
+   
    private void updateConnectors(final List<Pair<TransportConfiguration, TransportConfiguration>> connectors) throws Exception
    {
       Set<Pair<TransportConfiguration, TransportConfiguration>> connectorSet = new HashSet<Pair<TransportConfiguration, TransportConfiguration>>();
@@ -314,7 +313,7 @@
          if (!records.containsKey(connectorPair))
          {
             SimpleString queueName = generateQueueName(name, connectorPair);
-
+            
             Binding queueBinding = postOffice.getBinding(queueName);
 
             Queue queue;

Modified: trunk/src/main/org/jboss/messaging/core/server/cluster/impl/ClusterManagerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/cluster/impl/ClusterManagerImpl.java	2009-02-13 11:59:51 UTC (rev 5859)
+++ trunk/src/main/org/jboss/messaging/core/server/cluster/impl/ClusterManagerImpl.java	2009-02-13 15:03:51 UTC (rev 5860)
@@ -90,7 +90,7 @@
    private final Configuration configuration;
 
    private final QueueFactory queueFactory;
-   
+
    private final SimpleString nodeID;
 
    private volatile boolean started;
@@ -117,7 +117,7 @@
       this.configuration = configuration;
 
       this.queueFactory = queueFactory;
-      
+
       this.nodeID = nodeID;
    }
 
@@ -215,7 +215,8 @@
 
       InetAddress groupAddress = InetAddress.getByName(config.getGroupAddress());
 
-      BroadcastGroupImpl group = new BroadcastGroupImpl(config.getName(),
+      BroadcastGroupImpl group = new BroadcastGroupImpl(nodeID.toString(),
+                                                        config.getName(),
                                                         localBindAddress,
                                                         config.getLocalBindPort(),
                                                         groupAddress,
@@ -283,7 +284,8 @@
 
       InetAddress groupAddress = InetAddress.getByName(config.getGroupAddress());
 
-      DiscoveryGroup group = new DiscoveryGroupImpl(config.getName(),
+      DiscoveryGroup group = new DiscoveryGroupImpl(nodeID.toString(),
+                                                    config.getName(),
                                                     groupAddress,
                                                     config.getGroupPort(),
                                                     config.getRefreshTimeout());
@@ -375,9 +377,9 @@
          bridge = new BridgeImpl(new SimpleString(config.getName()),
                                  queue,
                                  pair,
-                                 executorFactory.getExecutor(),                             
+                                 executorFactory.getExecutor(),
                                  config.getFilterString() == null ? null : new SimpleString(config.getFilterString()),
-                                 new SimpleString(config.getForwardingAddress()),                               
+                                 new SimpleString(config.getForwardingAddress()),
                                  scheduledExecutor,
                                  transformer,
                                  config.getRetryInterval(),
@@ -466,7 +468,7 @@
                                                        managementService,
                                                        scheduledExecutor,
                                                        queueFactory,
-                                                       connectors,                                                  
+                                                       connectors,
                                                        config.getMaxHops(),
                                                        nodeID);
       }
@@ -494,7 +496,7 @@
                                                        managementService,
                                                        scheduledExecutor,
                                                        queueFactory,
-                                                       dg,                                                  
+                                                       dg,
                                                        config.getMaxHops(),
                                                        nodeID);
       }

Modified: trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/ClusterTestBase.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/ClusterTestBase.java	2009-02-13 11:59:51 UTC (rev 5859)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/ClusterTestBase.java	2009-02-13 15:03:51 UTC (rev 5860)
@@ -39,8 +39,9 @@
 import org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl;
 import org.jboss.messaging.core.config.Configuration;
 import org.jboss.messaging.core.config.TransportConfiguration;
-import org.jboss.messaging.core.config.cluster.BridgeConfiguration;
+import org.jboss.messaging.core.config.cluster.BroadcastGroupConfiguration;
 import org.jboss.messaging.core.config.cluster.ClusterConnectionConfiguration;
+import org.jboss.messaging.core.config.cluster.DiscoveryGroupConfiguration;
 import org.jboss.messaging.core.config.impl.ConfigurationImpl;
 import org.jboss.messaging.core.logging.Logger;
 import org.jboss.messaging.core.postoffice.Binding;
@@ -160,15 +161,15 @@
             }
          }
 
-      //    log.info(node + " binding count " + bindingCount + " consumer Count " + totConsumers);
+         //log.info(node + " binding count " + bindingCount + " consumer Count " + totConsumers);
 
          if (bindingCount == count && totConsumers == consumerCount)
          {
-            //log.info("Waited " + (System.currentTimeMillis() - start));
+            // log.info("Waited " + (System.currentTimeMillis() - start));
             return;
          }
 
-         Thread.sleep(10);
+         Thread.sleep(100);
       }
       while (System.currentTimeMillis() - start < WAIT_TIMEOUT);
 
@@ -418,7 +419,7 @@
    {
       verifyReceiveRoundRobinInSomeOrder(true, numMessages, consumerIDs);
    }
-      
+
    protected void verifyReceiveRoundRobinInSomeOrder(boolean ack, int numMessages, int... consumerIDs) throws Exception
    {
       Map<Integer, Integer> countMap = new HashMap<Integer, Integer>();
@@ -455,7 +456,7 @@
                counts.add(count);
 
                countMap.put(i, count);
-               
+
                if (ack)
                {
                   message.acknowledge();
@@ -470,12 +471,12 @@
          assertTrue(counts.contains(i));
       }
    }
-   
+
    protected void verifyReceiveRoundRobinInSomeOrderNoAck(int numMessages, int... consumerIDs) throws Exception
    {
       verifyReceiveRoundRobinInSomeOrder(false, numMessages, consumerIDs);
    }
-   
+
    protected void verifyNotReceive(int... consumerIDs) throws Exception
    {
       for (int i = 0; i < consumerIDs.length; i++)
@@ -563,6 +564,81 @@
       services[node] = service;
    }
 
+   protected void setupServerWithDiscovery(int node, String groupAddress, int port, boolean fileStorage, boolean netty)
+   {
+      if (services[node] != null)
+      {
+         throw new IllegalArgumentException("Already a service at node " + node);
+      }
+
+      Configuration configuration = new ConfigurationImpl();
+
+      configuration.setSecurityEnabled(false);
+      configuration.setBindingsDirectory(getBindingsDir(node));
+      configuration.setJournalMinFiles(2);
+      configuration.setJournalDirectory(getJournalDir(node));
+      configuration.setJournalFileSize(100 * 1024);
+      configuration.setPagingDirectory(getPageDir(node));
+      configuration.setLargeMessagesDirectory(getLargeMessagesDir(node));
+      configuration.setClustered(true);
+
+      configuration.getAcceptorConfigurations().clear();
+
+      Map<String, Object> params = generateParams(node, netty);
+
+      TransportConfiguration invmtc = new TransportConfiguration(INVM_ACCEPTOR_FACTORY, params);
+      configuration.getAcceptorConfigurations().add(invmtc);
+
+      if (netty)
+      {
+         TransportConfiguration nettytc = new TransportConfiguration(NETTY_ACCEPTOR_FACTORY, params);
+         configuration.getAcceptorConfigurations().add(nettytc);
+      }
+
+      TransportConfiguration invmtc_c = new TransportConfiguration(INVM_CONNECTOR_FACTORY, params);
+      configuration.getConnectorConfigurations().put(invmtc_c.getName(), invmtc_c);
+
+      List<Pair<String, String>> connectorPairs = new ArrayList<Pair<String, String>>();
+
+      if (netty)
+      {
+         TransportConfiguration nettytc_c = new TransportConfiguration(NETTY_CONNECTOR_FACTORY, params);
+         configuration.getConnectorConfigurations().put(nettytc_c.getName(), nettytc_c);
+
+         connectorPairs.add(new Pair<String, String>(nettytc_c.getName(), null));
+      }
+      else
+      {
+         connectorPairs.add(new Pair<String, String>(invmtc_c.getName(), null));
+      }
+
+      BroadcastGroupConfiguration bcConfig = new BroadcastGroupConfiguration("bg1",
+                                                                             null,
+                                                                             -1,
+                                                                             groupAddress,
+                                                                             port,
+                                                                             250,
+                                                                             connectorPairs);
+
+      configuration.getBroadcastGroupConfigurations().add(bcConfig);
+
+      DiscoveryGroupConfiguration dcConfig = new DiscoveryGroupConfiguration("dg1", groupAddress, port, 500);
+
+      configuration.getDiscoveryGroupConfigurations().put(dcConfig.getName(), dcConfig);
+
+      MessagingService service;
+
+      if (fileStorage)
+      {
+         service = Messaging.newMessagingService(configuration);
+      }
+      else
+      {
+         service = Messaging.newNullStorageMessagingService(configuration);
+      }
+      services[node] = service;
+   }
+
    protected Map<String, Object> generateParams(int node, boolean netty)
    {
       Map<String, Object> params = new HashMap<String, Object>();
@@ -605,9 +681,9 @@
          throw new IllegalStateException("No service at node " + nodeFrom);
       }
 
-      Map<String, TransportConfiguration> connectors = serviceFrom.getServer()
-                                                                  .getConfiguration()
-                                                                  .getConnectorConfigurations();
+      // Map<String, TransportConfiguration> connectors = serviceFrom.getServer()
+      // .getConfiguration()
+      // .getConnectorConfigurations();
 
       Map<String, Object> params = generateParams(nodeTo, netty);
 
@@ -622,9 +698,9 @@
          serverTotc = new TransportConfiguration(INVM_CONNECTOR_FACTORY, params);
       }
 
-      connectors.put(serverTotc.getName(), serverTotc);
+      serviceFrom.getServer().getConfiguration().getConnectorConfigurations().put(serverTotc.getName(), serverTotc);
 
-      serviceFrom.getServer().getConfiguration().setConnectorConfigurations(connectors);
+      // serviceFrom.getServer().getConfiguration().setConnectorConfigurations(connectors);
 
       Pair<String, String> connectorPair = new Pair<String, String>(serverTotc.getName(), null);
 
@@ -641,15 +717,162 @@
                                                                                       forwardWhenNoConsumers,
                                                                                       maxHops,
                                                                                       pairs);
-      List<ClusterConnectionConfiguration> clusterConfs = serviceFrom.getServer()
-                                                                     .getConfiguration()
-                                                                     .getClusterConfigurations();
+      serviceFrom.getServer().getConfiguration().getClusterConfigurations().add(clusterConf);
 
-      clusterConfs.add(clusterConf);
+      // clusterConfs.add(clusterConf);
 
-      serviceFrom.getServer().getConfiguration().setClusterConfigurations(clusterConfs);
+      // serviceFrom.getServer().getConfiguration().setClusterConfigurations(clusterConfs);
    }
 
+   // protected void setupClusterConnection(String name,
+   // int nodeFrom,
+   // int nodeTo,
+   // String address,
+   // boolean forwardWhenNoConsumers,
+   // int maxHops,
+   // boolean netty)
+   // {
+   // MessagingService serviceFrom = services[nodeFrom];
+   //
+   // if (serviceFrom == null)
+   // {
+   // throw new IllegalStateException("No service at node " + nodeFrom);
+   // }
+   //
+   // Map<String, TransportConfiguration> connectors = serviceFrom.getServer()
+   // .getConfiguration()
+   // .getConnectorConfigurations();
+   //
+   // Map<String, Object> params = generateParams(nodeTo, netty);
+   //
+   // TransportConfiguration serverTotc;
+   //
+   // if (netty)
+   // {
+   // serverTotc = new TransportConfiguration(NETTY_CONNECTOR_FACTORY, params);
+   // }
+   // else
+   // {
+   // serverTotc = new TransportConfiguration(INVM_CONNECTOR_FACTORY, params);
+   // }
+   //
+   // connectors.put(serverTotc.getName(), serverTotc);
+   //
+   // serviceFrom.getServer().getConfiguration().setConnectorConfigurations(connectors);
+   //
+   // Pair<String, String> connectorPair = new Pair<String, String>(serverTotc.getName(), null);
+   //
+   // List<Pair<String, String>> pairs = new ArrayList<Pair<String, String>>();
+   // pairs.add(connectorPair);
+   //
+   // ClusterConnectionConfiguration clusterConf = new ClusterConnectionConfiguration(name,
+   // address,
+   // 100,
+   // 1d,
+   // -1,
+   // -1,
+   // true,
+   // forwardWhenNoConsumers,
+   // maxHops,
+   // pairs);
+   // List<ClusterConnectionConfiguration> clusterConfs = serviceFrom.getServer()
+   // .getConfiguration()
+   // .getClusterConfigurations();
+   //
+   // clusterConfs.add(clusterConf);
+   //
+   // serviceFrom.getServer().getConfiguration().setClusterConfigurations(clusterConfs);
+   // }
+
+   protected void setupClusterConnection(String name,
+                                         String address,
+                                         boolean forwardWhenNoConsumers,
+                                         int maxHops,
+                                         boolean netty,
+                                         int nodeFrom,
+                                         int... nodesTo)
+   {
+      MessagingService serviceFrom = services[nodeFrom];
+
+      if (serviceFrom == null)
+      {
+         throw new IllegalStateException("No service at node " + nodeFrom);
+      }
+
+      Map<String, TransportConfiguration> connectors = serviceFrom.getServer()
+                                                                  .getConfiguration()
+                                                                  .getConnectorConfigurations();
+
+      List<Pair<String, String>> pairs = new ArrayList<Pair<String, String>>();
+
+      for (int i = 0; i < nodesTo.length; i++)
+      {
+         Map<String, Object> params = generateParams(nodesTo[i], netty);
+
+         TransportConfiguration serverTotc;
+
+         if (netty)
+         {
+            serverTotc = new TransportConfiguration(NETTY_CONNECTOR_FACTORY, params);
+         }
+         else
+         {
+            serverTotc = new TransportConfiguration(INVM_CONNECTOR_FACTORY, params);
+         }
+
+         connectors.put(serverTotc.getName(), serverTotc);
+
+         Pair<String, String> connectorPair = new Pair<String, String>(serverTotc.getName(), null);
+
+         pairs.add(connectorPair);
+      }
+
+      ClusterConnectionConfiguration clusterConf = new ClusterConnectionConfiguration(name,
+                                                                                      address,
+                                                                                      100,
+                                                                                      1d,
+                                                                                      -1,
+                                                                                      -1,
+                                                                                      true,
+                                                                                      forwardWhenNoConsumers,
+                                                                                      maxHops,
+                                                                                      pairs);
+
+      serviceFrom.getServer().getConfiguration().getClusterConfigurations().add(clusterConf);
+   }
+
+   protected void setupDiscoveryClusterConnection(String name,
+                                                  int node,
+                                                  String discoveryGroupName,
+                                                  String address,
+                                                  boolean forwardWhenNoConsumers,
+                                                  int maxHops,
+                                                  boolean netty)
+   {
+      MessagingService service = services[node];
+
+      if (service == null)
+      {
+         throw new IllegalStateException("No service at node " + node);
+      }
+
+      ClusterConnectionConfiguration clusterConf = new ClusterConnectionConfiguration(name,
+                                                                                      address,
+                                                                                      100,
+                                                                                      1d,
+                                                                                      -1,
+                                                                                      -1,
+                                                                                      true,
+                                                                                      forwardWhenNoConsumers,
+                                                                                      maxHops,
+                                                                                      discoveryGroupName);
+      List<ClusterConnectionConfiguration> clusterConfs = service.getServer()
+                                                                 .getConfiguration()
+                                                                 .getClusterConfigurations();
+
+      clusterConfs.add(clusterConf);
+   }
+
    protected void startServers(int... nodes) throws Exception
    {
       for (int i = 0; i < nodes.length; i++)
@@ -661,10 +884,11 @@
    protected void stopServers(int... nodes) throws Exception
    {
       for (int i = 0; i < nodes.length; i++)
-      {
-         log.info("*** stopping server " + i);
-         services[nodes[i]].stop();
-         log.info("*** stopped server " + i);
+      {        
+         if (services[nodes[i]].isStarted())
+         {
+            services[nodes[i]].stop();
+         }  
       }
    }
 

Added: trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/NettySymmetricClusterWithDiscoveryTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/NettySymmetricClusterWithDiscoveryTest.java	                        (rev 0)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/NettySymmetricClusterWithDiscoveryTest.java	2009-02-13 15:03:51 UTC (rev 5860)
@@ -0,0 +1,47 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2009, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+
+package org.jboss.messaging.tests.integration.cluster.distribution;
+
+/**
+ * Runs the tests inherited from SymmetricClusterWithDiscoveryTest with the Netty transport enabled (isNetty() returns true).
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * 
+ * Created 13 Feb 2009 13:52:03
+ *
+ *
+ */
+public class NettySymmetricClusterWithDiscoveryTest extends SymmetricClusterWithDiscoveryTest 
+{
+   protected boolean isNetty()
+   {
+      return true;
+   }
+
+   protected boolean isFileStorage()
+   {
+      return false;
+   }
+
+}

Modified: trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/OnewayTwoNodeClusterTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/OnewayTwoNodeClusterTest.java	2009-02-13 11:59:51 UTC (rev 5859)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/OnewayTwoNodeClusterTest.java	2009-02-13 15:03:51 UTC (rev 5860)
@@ -69,6 +69,21 @@
       return false;
    }
    
+   /*
+    * Make sure the source server can shut down if the target server is never started
+    */
+   public void testNeverStartTargetStartSourceThenStopSource() throws Exception
+   {
+      setupClusterConnection("cluster1", 0, 1, "queues", false, 1, isNetty());
+      startServers(0);
+
+      //Give it a little time for the bridge to try to start
+      Thread.sleep(2000);
+      
+      log.info("Stopping server 0");
+      stopServers(0);            
+   }
+   
    public void testStartTargetServerBeforeSourceServer() throws Exception
    {
       setupClusterConnection("cluster1", 0, 1, "queues", false, 1, isNetty());

Modified: trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/SymmetricClusterTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/SymmetricClusterTest.java	2009-02-13 11:59:51 UTC (rev 5859)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/SymmetricClusterTest.java	2009-02-13 15:03:51 UTC (rev 5860)
@@ -42,24 +42,16 @@
    @Override
    protected void setUp() throws Exception
    {
-      super.setUp();
-
-      setupServer(0, isFileStorage(), isNetty());
-      setupServer(1, isFileStorage(), isNetty());
-      setupServer(2, isFileStorage(), isNetty());
-      setupServer(3, isFileStorage(), isNetty());
-      setupServer(4, isFileStorage(), isNetty());
+      super.setUp();     
+      
+      setupServers();
    }
-
+   
    @Override
    protected void tearDown() throws Exception
    {
-      closeAllConsumers();
+      stopServers();
 
-      closeAllSessionFactories();
-
-      stopServers(0, 1, 2, 3, 4);
-
       super.tearDown();
    }
 
@@ -1457,37 +1449,68 @@
       verifyNotReceive(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27);
    }
          
-   private void setupCluster() throws Exception
+   protected void setupCluster() throws Exception
    {
       setupCluster(false);
    }
    
-   private void setupCluster(final boolean forwardWhenNoConsumers) throws Exception
+   protected void setupCluster(final boolean forwardWhenNoConsumers) throws Exception
    {
-      setupClusterConnection("cluster0-1", 0, 1, "queues", forwardWhenNoConsumers, 1, isNetty());
-      setupClusterConnection("cluster0-2", 0, 2, "queues", forwardWhenNoConsumers, 1, isNetty());
-      setupClusterConnection("cluster0-3", 0, 3, "queues", forwardWhenNoConsumers, 1, isNetty());
-      setupClusterConnection("cluster0-4", 0, 4, "queues", forwardWhenNoConsumers, 1, isNetty());
+      setupClusterConnection("cluster0", "queues", forwardWhenNoConsumers, 1, isNetty(), 0, 1, 2, 3, 4);
 
-      setupClusterConnection("cluster1-0", 1, 0, "queues", forwardWhenNoConsumers, 1, isNetty());
-      setupClusterConnection("cluster1-2", 1, 2, "queues", forwardWhenNoConsumers, 1, isNetty());
-      setupClusterConnection("cluster1-3", 1, 3, "queues", forwardWhenNoConsumers, 1, isNetty());
-      setupClusterConnection("cluster1-4", 1, 4, "queues", forwardWhenNoConsumers, 1, isNetty());
+      setupClusterConnection("cluster1", "queues", forwardWhenNoConsumers, 1, isNetty(), 1, 0, 2, 3, 4);
 
-      setupClusterConnection("cluster2-0", 2, 0, "queues", forwardWhenNoConsumers, 1, isNetty());
-      setupClusterConnection("cluster2-1", 2, 1, "queues", forwardWhenNoConsumers, 1, isNetty());
-      setupClusterConnection("cluster2-3", 2, 3, "queues", forwardWhenNoConsumers, 1, isNetty());
-      setupClusterConnection("cluster2-4", 2, 4, "queues", forwardWhenNoConsumers, 1, isNetty());
+      setupClusterConnection("cluster2", "queues", forwardWhenNoConsumers, 1, isNetty(), 2, 0, 1, 3, 4);
 
-      setupClusterConnection("cluster3-0", 3, 0, "queues", forwardWhenNoConsumers, 1, isNetty());
-      setupClusterConnection("cluster3-1", 3, 1, "queues", forwardWhenNoConsumers, 1, isNetty());
-      setupClusterConnection("cluster3-2", 3, 2, "queues", forwardWhenNoConsumers, 1, isNetty());
-      setupClusterConnection("cluster3-4", 3, 4, "queues", forwardWhenNoConsumers, 1, isNetty());
+      setupClusterConnection("cluster3", "queues", forwardWhenNoConsumers, 1, isNetty(), 3, 0, 1, 2, 4);
 
-      setupClusterConnection("cluster4-0", 4, 0, "queues", forwardWhenNoConsumers, 1, isNetty());
-      setupClusterConnection("cluster4-1", 4, 1, "queues", forwardWhenNoConsumers, 1, isNetty());
-      setupClusterConnection("cluster4-2", 4, 2, "queues", forwardWhenNoConsumers, 1, isNetty());
-      setupClusterConnection("cluster4-3", 4, 3, "queues", forwardWhenNoConsumers, 1, isNetty());
+      setupClusterConnection("cluster4", "queues", forwardWhenNoConsumers, 1, isNetty(), 4, 0, 1, 2, 3);
    }
    
+   protected void setupServers() throws Exception
+   {
+      setupServer(0, isFileStorage(), isNetty());
+      setupServer(1, isFileStorage(), isNetty());
+      setupServer(2, isFileStorage(), isNetty());
+      setupServer(3, isFileStorage(), isNetty());
+      setupServer(4, isFileStorage(), isNetty());  
+   }
+   
+   protected void stopServers() throws Exception
+   {
+      closeAllConsumers();
+
+      closeAllSessionFactories();
+
+      stopServers(0, 1, 2, 3, 4);
+   }
+   
+//   private void setupCluster(final boolean forwardWhenNoConsumers) throws Exception
+//   {
+//      setupClusterConnection("cluster0-1", 0, 1, "queues", forwardWhenNoConsumers, 1, isNetty());
+//      setupClusterConnection("cluster0-2", 0, 2, "queues", forwardWhenNoConsumers, 1, isNetty());
+//      setupClusterConnection("cluster0-3", 0, 3, "queues", forwardWhenNoConsumers, 1, isNetty());
+//      setupClusterConnection("cluster0-4", 0, 4, "queues", forwardWhenNoConsumers, 1, isNetty());
+//
+//      setupClusterConnection("cluster1-0", 1, 0, "queues", forwardWhenNoConsumers, 1, isNetty());
+//      setupClusterConnection("cluster1-2", 1, 2, "queues", forwardWhenNoConsumers, 1, isNetty());
+//      setupClusterConnection("cluster1-3", 1, 3, "queues", forwardWhenNoConsumers, 1, isNetty());
+//      setupClusterConnection("cluster1-4", 1, 4, "queues", forwardWhenNoConsumers, 1, isNetty());
+//
+//      setupClusterConnection("cluster2-0", 2, 0, "queues", forwardWhenNoConsumers, 1, isNetty());
+//      setupClusterConnection("cluster2-1", 2, 1, "queues", forwardWhenNoConsumers, 1, isNetty());
+//      setupClusterConnection("cluster2-3", 2, 3, "queues", forwardWhenNoConsumers, 1, isNetty());
+//      setupClusterConnection("cluster2-4", 2, 4, "queues", forwardWhenNoConsumers, 1, isNetty());
+//
+//      setupClusterConnection("cluster3-0", 3, 0, "queues", forwardWhenNoConsumers, 1, isNetty());
+//      setupClusterConnection("cluster3-1", 3, 1, "queues", forwardWhenNoConsumers, 1, isNetty());
+//      setupClusterConnection("cluster3-2", 3, 2, "queues", forwardWhenNoConsumers, 1, isNetty());
+//      setupClusterConnection("cluster3-4", 3, 4, "queues", forwardWhenNoConsumers, 1, isNetty());
+//
+//      setupClusterConnection("cluster4-0", 4, 0, "queues", forwardWhenNoConsumers, 1, isNetty());
+//      setupClusterConnection("cluster4-1", 4, 1, "queues", forwardWhenNoConsumers, 1, isNetty());
+//      setupClusterConnection("cluster4-2", 4, 2, "queues", forwardWhenNoConsumers, 1, isNetty());
+//      setupClusterConnection("cluster4-3", 4, 3, "queues", forwardWhenNoConsumers, 1, isNetty());
+//   }
+   
 }

Added: trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/SymmetricClusterWithDiscoveryTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/SymmetricClusterWithDiscoveryTest.java	                        (rev 0)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/SymmetricClusterWithDiscoveryTest.java	2009-02-13 15:03:51 UTC (rev 5860)
@@ -0,0 +1,85 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2009, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.messaging.tests.integration.cluster.distribution;
+
+import org.jboss.messaging.core.logging.Logger;
+
+/**
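+ * Variant of SymmetricClusterTest that sets up its servers and cluster connections using the discovery group "dg1" rather than explicit lists of target nodes.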
+ * 
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * 
+ * Created 3 Feb 2009 09:10:43
+ *
+ *
+ */
+public class SymmetricClusterWithDiscoveryTest extends SymmetricClusterTest
+{
+   private static final Logger log = Logger.getLogger(SymmetricClusterWithDiscoveryTest.class);
+   
+   private static final String groupAddress = "230.1.2.3";
+   
+   private static final int groupPort = 6745;
+
+   protected boolean isNetty()
+   {
+      return false;
+   }
+
+   protected boolean isFileStorage()
+   {
+      return false;
+   }
+
+   @Override
+   protected void setupCluster() throws Exception
+   {
+      setupCluster(false);
+   }
+
+   @Override
+   protected void setupCluster(final boolean forwardWhenNoConsumers) throws Exception
+   {
+      setupDiscoveryClusterConnection("cluster0", 0, "dg1", "queues", forwardWhenNoConsumers, 1, isNetty());
+      
+      setupDiscoveryClusterConnection("cluster1", 1, "dg1", "queues", forwardWhenNoConsumers, 1, isNetty());
+      
+      setupDiscoveryClusterConnection("cluster2", 2, "dg1", "queues", forwardWhenNoConsumers, 1, isNetty());
+      
+      setupDiscoveryClusterConnection("cluster3", 3, "dg1", "queues", forwardWhenNoConsumers, 1, isNetty());
+      
+      setupDiscoveryClusterConnection("cluster4", 4, "dg1", "queues", forwardWhenNoConsumers, 1, isNetty());            
+   }
+   
+   @Override
+   protected void setupServers() throws Exception
+   {
+      setupServerWithDiscovery(0, groupAddress, groupPort, isFileStorage(), isNetty());
+      setupServerWithDiscovery(1, groupAddress, groupPort, isFileStorage(), isNetty());
+      setupServerWithDiscovery(2, groupAddress, groupPort, isFileStorage(), isNetty());
+      setupServerWithDiscovery(3, groupAddress, groupPort, isFileStorage(), isNetty());
+      setupServerWithDiscovery(4, groupAddress, groupPort, isFileStorage(), isNetty()); 
+   }
+     
+
+}

Modified: trunk/tests/src/org/jboss/messaging/tests/integration/discovery/DiscoveryTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/discovery/DiscoveryTest.java	2009-02-13 11:59:51 UTC (rev 5859)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/discovery/DiscoveryTest.java	2009-02-13 15:03:51 UTC (rev 5860)
@@ -69,7 +69,7 @@
       final int groupPort = 6745;
       final int timeout = 500;
 
-      BroadcastGroup bg = new BroadcastGroupImpl(randomString(), null, -1, groupAddress, groupPort);
+      BroadcastGroup bg = new BroadcastGroupImpl(randomString(), randomString(), null, -1, groupAddress, groupPort);
 
       bg.start();
 
@@ -82,7 +82,7 @@
 
       bg.addConnectorPair(connectorPair);
 
-      DiscoveryGroup dg = new DiscoveryGroupImpl(randomString(), groupAddress, groupPort, timeout);
+      DiscoveryGroup dg = new DiscoveryGroupImpl(randomString(), randomString(), groupAddress, groupPort, timeout);
 
       dg.start();
 
@@ -107,7 +107,50 @@
       dg.stop();
 
    }
+   
+   public void testIgnoreTrafficFromOwnNode() throws Exception
+   {
+      final InetAddress groupAddress = InetAddress.getByName(address1);
+      final int groupPort = 6745;
+      final int timeout = 500;
+      
+      String nodeID = randomString();
 
+      BroadcastGroup bg = new BroadcastGroupImpl(nodeID, randomString(), null, -1, groupAddress, groupPort);
+
+      bg.start();
+
+      TransportConfiguration live1 = generateTC();
+
+      TransportConfiguration backup1 = generateTC();
+
+      Pair<TransportConfiguration, TransportConfiguration> connectorPair = new Pair<TransportConfiguration, TransportConfiguration>(live1,
+                                                                                                                                    backup1);
+
+      bg.addConnectorPair(connectorPair);
+
+      DiscoveryGroup dg = new DiscoveryGroupImpl(nodeID, randomString(), groupAddress, groupPort, timeout);
+
+      dg.start();
+
+      bg.broadcastConnectors();
+
+      boolean ok = dg.waitForBroadcast(1000);
+
+      assertFalse(ok);
+
+      List<Pair<TransportConfiguration, TransportConfiguration>> connectors = dg.getConnectors();
+
+      assertNotNull(connectors);
+
+      assertEquals(0, connectors.size());
+
+      bg.stop();
+
+      dg.stop();
+
+   }
+
 // There is a bug in some OSes where different addresses but *Same port* will receive the traffic - hence this test won't pass
 //   See http://www.jboss.org/community/docs/DOC-11710 (jboss wiki promiscuous traffic)
    
@@ -118,7 +161,7 @@
 //      final int groupPort = 6745;
 //      final int timeout = 500;
 //
-//      BroadcastGroup bg = new BroadcastGroupImpl(randomString(), null, -1, groupAddress, groupPort);
+//      BroadcastGroup bg = new BroadcastGroupImpl(randomString(), randomString(), null, -1, groupAddress, groupPort);
 //
 //      bg.start();
 //
@@ -133,7 +176,7 @@
 //
 //      final InetAddress groupAddress2 = InetAddress.getByName(address2);
 //
-//      DiscoveryGroup dg = new DiscoveryGroupImpl(randomString(), groupAddress2, groupPort, timeout);
+//      DiscoveryGroup dg = new DiscoveryGroupImpl(randomString(), randomString(), groupAddress2, groupPort, timeout);
 //
 //      dg.start();
 //
@@ -155,7 +198,7 @@
       final int groupPort = 6745;
       final int timeout = 500;
 
-      BroadcastGroup bg = new BroadcastGroupImpl(randomString(), null, -1, groupAddress, groupPort);
+      BroadcastGroup bg = new BroadcastGroupImpl(randomString(), randomString(), null, -1, groupAddress, groupPort);
 
       bg.start();
 
@@ -170,7 +213,7 @@
 
       final int port2 = 6746;
 
-      DiscoveryGroup dg = new DiscoveryGroupImpl(randomString(), groupAddress, port2, timeout);
+      DiscoveryGroup dg = new DiscoveryGroupImpl(randomString(), randomString(), groupAddress, port2, timeout);
 
       dg.start();
 
@@ -191,7 +234,7 @@
       final int groupPort = 6745;
       final int timeout = 500;
 
-      BroadcastGroup bg = new BroadcastGroupImpl(randomString(), null, -1, groupAddress, groupPort);
+      BroadcastGroup bg = new BroadcastGroupImpl(randomString(), randomString(), null, -1, groupAddress, groupPort);
 
       bg.start();
 
@@ -207,7 +250,7 @@
       final InetAddress groupAddress2 = InetAddress.getByName(address2);
       final int port2 = 6746;
 
-      DiscoveryGroup dg = new DiscoveryGroupImpl(randomString(), groupAddress2, port2, timeout);
+      DiscoveryGroup dg = new DiscoveryGroupImpl(randomString(), randomString(), groupAddress2, port2, timeout);
 
       dg.start();
 
@@ -235,13 +278,13 @@
 
       final int timeout = 500;
 
-      BroadcastGroup bg1 = new BroadcastGroupImpl(randomString(), null, -1, groupAddress1, groupPort1);
+      BroadcastGroup bg1 = new BroadcastGroupImpl(randomString(), randomString(), null, -1, groupAddress1, groupPort1);
       bg1.start();
 
-      BroadcastGroup bg2 = new BroadcastGroupImpl(randomString(), null, -1, groupAddress2, groupPort2);
+      BroadcastGroup bg2 = new BroadcastGroupImpl(randomString(), randomString(), null, -1, groupAddress2, groupPort2);
       bg2.start();
 
-      BroadcastGroup bg3 = new BroadcastGroupImpl(randomString(), null, -1, groupAddress3, groupPort3);
+      BroadcastGroup bg3 = new BroadcastGroupImpl(randomString(), randomString(), null, -1, groupAddress3, groupPort3);
       bg3.start();
 
       TransportConfiguration live1 = generateTC();
@@ -266,13 +309,13 @@
       bg2.addConnectorPair(connectorPair2);
       bg3.addConnectorPair(connectorPair3);
 
-      DiscoveryGroup dg1 = new DiscoveryGroupImpl(randomString(), groupAddress1, groupPort1, timeout);
+      DiscoveryGroup dg1 = new DiscoveryGroupImpl(randomString(), randomString(), groupAddress1, groupPort1, timeout);
       dg1.start();
 
-      DiscoveryGroup dg2 = new DiscoveryGroupImpl(randomString(), groupAddress2, groupPort2, timeout);
+      DiscoveryGroup dg2 = new DiscoveryGroupImpl(randomString(), randomString(), groupAddress2, groupPort2, timeout);
       dg2.start();
 
-      DiscoveryGroup dg3 = new DiscoveryGroupImpl(randomString(), groupAddress3, groupPort3, timeout);
+      DiscoveryGroup dg3 = new DiscoveryGroupImpl(randomString(), randomString(), groupAddress3, groupPort3, timeout);
       dg3.start();
 
       bg1.broadcastConnectors();
@@ -320,7 +363,7 @@
       final int groupPort = 6745;
       final int timeout = 500;
 
-      BroadcastGroup bg = new BroadcastGroupImpl(randomString(), null, -1, groupAddress, groupPort);
+      BroadcastGroup bg = new BroadcastGroupImpl(randomString(), randomString(), null, -1, groupAddress, groupPort);
 
       bg.start();
 
@@ -331,7 +374,7 @@
 
       bg.addConnectorPair(connectorPair);
 
-      DiscoveryGroup dg = new DiscoveryGroupImpl(randomString(), groupAddress, groupPort, timeout);
+      DiscoveryGroup dg = new DiscoveryGroupImpl(randomString(), randomString(), groupAddress, groupPort, timeout);
 
       dg.start();
 
@@ -363,7 +406,7 @@
       final int groupPort = 6745;
       final int timeout = 500;
 
-      BroadcastGroup bg = new BroadcastGroupImpl(randomString(), null, -1, groupAddress, groupPort);
+      BroadcastGroup bg = new BroadcastGroupImpl(randomString(), randomString(), null, -1, groupAddress, groupPort);
 
       bg.start();
 
@@ -374,7 +417,7 @@
 
       bg.addConnectorPair(connectorPair);
 
-      DiscoveryGroup dg = new DiscoveryGroupImpl(randomString(), groupAddress, groupPort, timeout);
+      DiscoveryGroup dg = new DiscoveryGroupImpl(randomString(), randomString(), groupAddress, groupPort, timeout);
 
       MyListener listener1 = new MyListener();
       MyListener listener2 = new MyListener();
@@ -475,13 +518,13 @@
       final int groupPort = 6745;
       final int timeout = 500;
 
-      BroadcastGroup bg1 = new BroadcastGroupImpl(randomString(), null, -1, groupAddress, groupPort);
+      BroadcastGroup bg1 = new BroadcastGroupImpl(randomString(), randomString(), null, -1, groupAddress, groupPort);
       bg1.start();
 
-      BroadcastGroup bg2 = new BroadcastGroupImpl(randomString(), null, -1, groupAddress, groupPort);
+      BroadcastGroup bg2 = new BroadcastGroupImpl(randomString(), randomString(), null, -1, groupAddress, groupPort);
       bg2.start();
 
-      BroadcastGroup bg3 = new BroadcastGroupImpl(randomString(), null, -1, groupAddress, groupPort);
+      BroadcastGroup bg3 = new BroadcastGroupImpl(randomString(), randomString(), null, -1, groupAddress, groupPort);
       bg3.start();
 
       TransportConfiguration live1 = generateTC();
@@ -502,7 +545,7 @@
                                                                                                                                      backup3);
       bg3.addConnectorPair(connectorPair3);
 
-      DiscoveryGroup dg = new DiscoveryGroupImpl(randomString(), groupAddress, groupPort, timeout);
+      DiscoveryGroup dg = new DiscoveryGroupImpl(randomString(), randomString(), groupAddress, groupPort, timeout);
 
       MyListener listener1 = new MyListener();
       dg.registerListener(listener1);
@@ -715,7 +758,7 @@
       final int groupPort = 6745;
       final int timeout = 500;
 
-      BroadcastGroup bg = new BroadcastGroupImpl(randomString(), null, -1, groupAddress, groupPort);
+      BroadcastGroup bg = new BroadcastGroupImpl(randomString(), randomString(), null, -1, groupAddress, groupPort);
 
       bg.start();
 
@@ -727,11 +770,11 @@
 
       bg.addConnectorPair(connectorPair1);
 
-      DiscoveryGroup dg1 = new DiscoveryGroupImpl(randomString(), groupAddress, groupPort, timeout);
+      DiscoveryGroup dg1 = new DiscoveryGroupImpl(randomString(), randomString(), groupAddress, groupPort, timeout);
 
-      DiscoveryGroup dg2 = new DiscoveryGroupImpl(randomString(), groupAddress, groupPort, timeout);
+      DiscoveryGroup dg2 = new DiscoveryGroupImpl(randomString(), randomString(), groupAddress, groupPort, timeout);
 
-      DiscoveryGroup dg3 = new DiscoveryGroupImpl(randomString(), groupAddress, groupPort, timeout);
+      DiscoveryGroup dg3 = new DiscoveryGroupImpl(randomString(), randomString(), groupAddress, groupPort, timeout);
 
       dg1.start();
       dg2.start();




More information about the jboss-cvs-commits mailing list