[jboss-cvs] JBoss Messaging SVN: r6297 - in trunk: examples/jms/expiry/src/org/jboss/jms/example and 21 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Fri Apr 3 10:08:00 EDT 2009


Author: timfox
Date: 2009-04-03 10:08:00 -0400 (Fri, 03 Apr 2009)
New Revision: 6297

Added:
   trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/DiscoveryClusterWithBackupFailoverTest.java
   trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/FileStorageClusterWithBackupFailoverTest.java
   trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/NettyClusterWithBackupFailoverTest.java
   trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/NettyFileStorageClusterWithBackupFailoverTest.java
Modified:
   trunk/examples/jms/browser/src/org/jboss/jms/example/QueueBrowserExample.java
   trunk/examples/jms/expiry/src/org/jboss/jms/example/ExpiryExample.java
   trunk/examples/jms/temp-queue/src/org/jboss/jms/example/TemporaryQueueExample.java
   trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionFactoryImpl.java
   trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java
   trunk/src/main/org/jboss/messaging/core/client/impl/ConnectionManagerImpl.java
   trunk/src/main/org/jboss/messaging/core/cluster/DiscoveryGroup.java
   trunk/src/main/org/jboss/messaging/core/cluster/impl/DiscoveryGroupImpl.java
   trunk/src/main/org/jboss/messaging/core/config/TransportConfiguration.java
   trunk/src/main/org/jboss/messaging/core/management/Notification.java
   trunk/src/main/org/jboss/messaging/core/management/impl/ManagementServiceImpl.java
   trunk/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java
   trunk/src/main/org/jboss/messaging/core/postoffice/impl/BindingsImpl.java
   trunk/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java
   trunk/src/main/org/jboss/messaging/core/postoffice/impl/SimpleAddressManager.java
   trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/CreateQueueMessage.java
   trunk/src/main/org/jboss/messaging/core/remoting/server/RemotingService.java
   trunk/src/main/org/jboss/messaging/core/remoting/server/impl/RemotingServiceImpl.java
   trunk/src/main/org/jboss/messaging/core/security/impl/SecurityStoreImpl.java
   trunk/src/main/org/jboss/messaging/core/server/Messaging.java
   trunk/src/main/org/jboss/messaging/core/server/cluster/Bridge.java
   trunk/src/main/org/jboss/messaging/core/server/cluster/MessageFlowRecord.java
   trunk/src/main/org/jboss/messaging/core/server/cluster/impl/BridgeImpl.java
   trunk/src/main/org/jboss/messaging/core/server/cluster/impl/BroadcastGroupImpl.java
   trunk/src/main/org/jboss/messaging/core/server/cluster/impl/ClusterConnectionImpl.java
   trunk/src/main/org/jboss/messaging/core/server/cluster/impl/RemoteQueueBindingImpl.java
   trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java
   trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerPacketHandler.java
   trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java
   trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java
   trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java
   trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionPacketHandler.java
   trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/ClusterTestBase.java
   trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/SymmetricClusterWithDiscoveryTest.java
   trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/ClusterWithBackupFailoverTest.java
   trunk/tests/src/org/jboss/messaging/tests/integration/discovery/DiscoveryTest.java
   trunk/tests/src/org/jboss/messaging/tests/util/ServiceTestBase.java
   trunk/tests/src/org/jboss/messaging/tests/util/UnitTestCase.java
Log:
more clustering

Modified: trunk/examples/jms/browser/src/org/jboss/jms/example/QueueBrowserExample.java
===================================================================
--- trunk/examples/jms/browser/src/org/jboss/jms/example/QueueBrowserExample.java	2009-04-03 14:02:02 UTC (rev 6296)
+++ trunk/examples/jms/browser/src/org/jboss/jms/example/QueueBrowserExample.java	2009-04-03 14:08:00 UTC (rev 6297)
@@ -63,7 +63,7 @@
          // Step 3. Perform a lookup on the Connection Factory
          ConnectionFactory cf = (ConnectionFactory)initialContext.lookup("/ConnectionFactory");
 
-         // Step 4.Create a JMS Connection
+         // Step 4. Create a JMS Connection
          connection = cf.createConnection();
 
          // Step 5. Create a JMS Session

Modified: trunk/examples/jms/expiry/src/org/jboss/jms/example/ExpiryExample.java
===================================================================
--- trunk/examples/jms/expiry/src/org/jboss/jms/example/ExpiryExample.java	2009-04-03 14:02:02 UTC (rev 6296)
+++ trunk/examples/jms/expiry/src/org/jboss/jms/example/ExpiryExample.java	2009-04-03 14:08:00 UTC (rev 6297)
@@ -21,9 +21,6 @@
    */
 package org.jboss.jms.example;
 
-import java.util.HashSet;
-import java.util.Set;
-
 import javax.jms.Connection;
 import javax.jms.ConnectionFactory;
 import javax.jms.MessageConsumer;

Modified: trunk/examples/jms/temp-queue/src/org/jboss/jms/example/TemporaryQueueExample.java
===================================================================
--- trunk/examples/jms/temp-queue/src/org/jboss/jms/example/TemporaryQueueExample.java	2009-04-03 14:02:02 UTC (rev 6296)
+++ trunk/examples/jms/temp-queue/src/org/jboss/jms/example/TemporaryQueueExample.java	2009-04-03 14:08:00 UTC (rev 6297)
@@ -74,7 +74,7 @@
 
          // Step 8. Create a text message
          TextMessage message = session.createTextMessage("This is a text message");
-
+         
          // Step 9. Send the text message to the queue
          messageProducer.send(message);
 

Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionFactoryImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionFactoryImpl.java	2009-04-03 14:02:02 UTC (rev 6296)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionFactoryImpl.java	2009-04-03 14:08:00 UTC (rev 6297)
@@ -21,6 +21,7 @@
 
 import org.jboss.messaging.core.client.ClientSession;
 import org.jboss.messaging.core.client.ConnectionLoadBalancingPolicy;
+import org.jboss.messaging.core.cluster.DiscoveryEntry;
 import org.jboss.messaging.core.cluster.DiscoveryGroup;
 import org.jboss.messaging.core.cluster.DiscoveryListener;
 import org.jboss.messaging.core.cluster.impl.DiscoveryGroupImpl;
@@ -90,10 +91,9 @@
    public static final double DEFAULT_RETRY_INTERVAL_MULTIPLIER = 1d;
 
    public static final int DEFAULT_RECONNECT_ATTEMPTS = 0;
-   
+
    public static final boolean DEFAULT_FAILOVER_ON_SERVER_SHUTDOWN = false;
 
-
    // Attributes
    // -----------------------------------------------------------------------------------
 
@@ -149,7 +149,7 @@
    private final double retryIntervalMultiplier; // For exponential backoff
 
    private final int reconnectAttempts;
-   
+
    private final boolean failoverOnServerShutdown;
 
    // Static
@@ -442,7 +442,7 @@
 
    public ClientSessionFactoryImpl(final TransportConfiguration connectorConfig,
                                    final TransportConfiguration backupConfig)
-   {      
+   {
       this.loadBalancingPolicy = new FirstElementConnectionLoadBalancingPolicy();
       this.pingPeriod = DEFAULT_PING_PERIOD;
       this.callTimeout = DEFAULT_CALL_TIMEOUT;
@@ -780,12 +780,15 @@
    public synchronized void connectorsChanged()
    {
       receivedBroadcast = true;
+            
+      Map<String, DiscoveryEntry> newConnectors = discoveryGroup.getDiscoveryEntryMap();
 
-      List<Pair<TransportConfiguration, TransportConfiguration>> newConnectors = discoveryGroup.getConnectors();
-
       Set<Pair<TransportConfiguration, TransportConfiguration>> connectorSet = new HashSet<Pair<TransportConfiguration, TransportConfiguration>>();
 
-      connectorSet.addAll(newConnectors);
+      for (DiscoveryEntry entry : newConnectors.values())
+      {
+         connectorSet.add(entry.getConnectorPair());
+      }
 
       Iterator<Map.Entry<Pair<TransportConfiguration, TransportConfiguration>, ConnectionManager>> iter = connectionManagerMap.entrySet()
                                                                                                                               .iterator();
@@ -802,7 +805,7 @@
          }
       }
 
-      for (Pair<TransportConfiguration, TransportConfiguration> connectorPair : newConnectors)
+      for (Pair<TransportConfiguration, TransportConfiguration> connectorPair : connectorSet)
       {
          if (!connectionManagerMap.containsKey(connectorPair))
          {

Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java	2009-04-03 14:02:02 UTC (rev 6296)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java	2009-04-03 14:08:00 UTC (rev 6297)
@@ -257,7 +257,7 @@
    {
       createQueue(toSimpleString(address), toSimpleString(queueName), toSimpleString(filterString), durable);
    }
-   
+
    public void createTemporaryQueue(SimpleString address, SimpleString queueName) throws MessagingException
    {
       internalCreateQueue(address, queueName, null, false, true);
@@ -267,7 +267,7 @@
    {
       internalCreateQueue(toSimpleString(address), toSimpleString(queueName), null, false, true);
    }
-   
+
    public void createTemporaryQueue(SimpleString address, SimpleString queueName, SimpleString filter) throws MessagingException
    {
       internalCreateQueue(address, queueName, filter, false, true);
@@ -278,7 +278,7 @@
       internalCreateQueue(toSimpleString(address), toSimpleString(queueName), toSimpleString(filter), false, true);
    }
 
-public void deleteQueue(final SimpleString queueName) throws MessagingException
+   public void deleteQueue(final SimpleString queueName) throws MessagingException
    {
       checkClosed();
 
@@ -630,7 +630,7 @@
          started = false;
       }
    }
-   
+
    public void addFailureListener(final FailureListener listener)
    {
       remotingConnection.addFailureListener(listener);
@@ -739,7 +739,7 @@
          consumer.handleLargeMessageContinuation(continuation);
       }
    }
-   
+
    public void close() throws MessagingException
    {
       if (closed)
@@ -1246,7 +1246,9 @@
                                                                consumerID,
                                                                clientWindowSize,
                                                                ackBatchSize,
-                                                               consumerMaxRate > 0 ? new TokenBucketLimiterImpl(maxRate, false) : null, 
+                                                               consumerMaxRate > 0 ? new TokenBucketLimiterImpl(maxRate,
+                                                                                                                false)
+                                                                                  : null,
                                                                executor,
                                                                channel,
                                                                directory);
@@ -1288,24 +1290,22 @@
    }
 
    private void internalCreateQueue(final SimpleString address,
-                            final SimpleString queueName,
-                            final SimpleString filterString,
-                            final boolean durable,
-                            final boolean temp) throws MessagingException
-                            {
+                                    final SimpleString queueName,
+                                    final SimpleString filterString,
+                                    final boolean durable,
+                                    final boolean temp) throws MessagingException
+   {
       checkClosed();
-
-
+      
       if (durable && temp)
       {
-         throw new MessagingException(MessagingException.INTERNAL_ERROR,
-         "Queue can not be both durable and temporay");
+         throw new MessagingException(MessagingException.INTERNAL_ERROR, "Queue can not be both durable and temporay");
       }
 
       CreateQueueMessage request = new CreateQueueMessage(address, queueName, filterString, durable, temp);
 
       channel.sendBlocking(request);
-                            }
+   }
 
    private void checkXA() throws XAException
    {

Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ConnectionManagerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ConnectionManagerImpl.java	2009-04-03 14:02:02 UTC (rev 6296)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ConnectionManagerImpl.java	2009-04-03 14:08:00 UTC (rev 6297)
@@ -265,7 +265,6 @@
                      if (!failureSignalled)
                      {
                         // This can happen if the connection manager gets closed - e.g. the server gets shut down
-                        //return null;
                         
                         throw new MessagingException(MessagingException.NOT_CONNECTED, "Unable to connect to server");
                      }
@@ -572,7 +571,7 @@
             if (attemptFailover)
             {
                // Now try failing over to backup
-
+               
                connectorFactory = backupConnectorFactory;
 
                transportParams = backupTransportParams;
@@ -734,7 +733,7 @@
 
                   return null;
                }
-
+               
                try
                {
                   Thread.sleep(interval);

Modified: trunk/src/main/org/jboss/messaging/core/cluster/DiscoveryGroup.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/cluster/DiscoveryGroup.java	2009-04-03 14:02:02 UTC (rev 6296)
+++ trunk/src/main/org/jboss/messaging/core/cluster/DiscoveryGroup.java	2009-04-03 14:08:00 UTC (rev 6297)
@@ -23,11 +23,9 @@
 
 package org.jboss.messaging.core.cluster;
 
-import java.util.List;
+import java.util.Map;
 
-import org.jboss.messaging.core.config.TransportConfiguration;
 import org.jboss.messaging.core.server.MessagingComponent;
-import org.jboss.messaging.utils.Pair;
 
 /**
  * A DiscoveryGroup
@@ -42,7 +40,7 @@
 {
    String getName();
 
-   List<Pair<TransportConfiguration, TransportConfiguration>> getConnectors();
+   Map<String, DiscoveryEntry> getDiscoveryEntryMap();
    
    boolean waitForBroadcast(long timeout);
    

Modified: trunk/src/main/org/jboss/messaging/core/cluster/impl/DiscoveryGroupImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/cluster/impl/DiscoveryGroupImpl.java	2009-04-03 14:02:02 UTC (rev 6296)
+++ trunk/src/main/org/jboss/messaging/core/cluster/impl/DiscoveryGroupImpl.java	2009-04-03 14:08:00 UTC (rev 6297)
@@ -33,6 +33,7 @@
 import java.util.Map;
 
 import org.jboss.messaging.core.buffers.ChannelBuffers;
+import org.jboss.messaging.core.cluster.DiscoveryEntry;
 import org.jboss.messaging.core.cluster.DiscoveryGroup;
 import org.jboss.messaging.core.cluster.DiscoveryListener;
 import org.jboss.messaging.core.config.TransportConfiguration;
@@ -66,16 +67,16 @@
 
    private final Object waitLock = new Object();
 
-   private final Map<Pair<TransportConfiguration, TransportConfiguration>, Long> connectors = new HashMap<Pair<TransportConfiguration, TransportConfiguration>, Long>();
+   private final Map<String, DiscoveryEntry> connectors = new HashMap<String, DiscoveryEntry>();
 
    private final long timeout;
 
    private volatile boolean started;
 
    private final String nodeID;
-   
+
    private final InetAddress groupAddress;
-   
+
    private final int groupPort;
 
    public DiscoveryGroupImpl(final String nodeID,
@@ -86,12 +87,12 @@
    {
       this.nodeID = nodeID;
 
-      this.name = name;     
+      this.name = name;
 
-      this.timeout = timeout;     
-      
+      this.timeout = timeout;
+
       this.groupAddress = groupAddress;
-      
+
       this.groupPort = groupPort;
    }
 
@@ -101,7 +102,7 @@
       {
          return;
       }
-      
+
       socket = new MulticastSocket(groupPort);
 
       socket.joinGroup(groupAddress);
@@ -109,7 +110,7 @@
       socket.setSoTimeout(SOCKET_TIMEOUT);
 
       started = true;
-      
+
       thread = new Thread(this);
 
       thread.setDaemon(true);
@@ -138,9 +139,9 @@
       }
 
       socket.close();
-      
+
       socket = null;
-      
+
       thread = null;
    }
 
@@ -154,9 +155,9 @@
       return name;
    }
 
-   public synchronized List<Pair<TransportConfiguration, TransportConfiguration>> getConnectors()
+   public synchronized Map<String, DiscoveryEntry> getDiscoveryEntryMap()
    {
-      return new ArrayList<Pair<TransportConfiguration, TransportConfiguration>>(connectors.keySet());
+      return new HashMap<String, DiscoveryEntry>(connectors);
    }
 
    public boolean waitForBroadcast(final long timeout)
@@ -191,6 +192,83 @@
          return ret;
       }
    }
+   
+   private static class UniqueIDEntry
+   {
+      String uniqueID;
+      
+      boolean changed;
+      
+      UniqueIDEntry(final String uniqueID)
+      {
+         this.uniqueID = uniqueID;
+      }
+      
+      boolean isChanged()
+      {
+         return changed;
+      }
+      
+      void setChanged()
+      {
+         changed = true;
+      }
+      
+      String getUniqueID()
+      {
+         return uniqueID;
+      }
+      
+      void setUniqueID(final String uniqueID)
+      {
+         this.uniqueID = uniqueID;
+      }      
+   }
+   
+   private Map<String, UniqueIDEntry> uniqueIDMap = new HashMap<String, UniqueIDEntry>();
+   
+   /*
+    * This is a sanity check to catch any cases where two different nodes are broadcasting the same node id either
+    * due to misconfiguration or problems in failover
+    */
+   private boolean uniqueIDOK(final String originatingNodeID, final String uniqueID)
+   {
+      UniqueIDEntry entry = uniqueIDMap.get(originatingNodeID);
+      
+      if (entry == null)
+      {
+         entry = new UniqueIDEntry(uniqueID);
+         
+         uniqueIDMap.put(originatingNodeID, entry);
+         
+         return true;
+      }
+      else
+      {
+         if (entry.getUniqueID().equals(uniqueID))
+         {
+            return true;
+         }
+         else
+         {
+            //We allow one change - this might occur if one node fails over onto its backup, which
+            //has the same node id but a different unique id
+            if (!entry.isChanged())
+            {
+               entry.setChanged();
+               
+               entry.setUniqueID(uniqueID);
+               
+               return true;
+            }
+            else
+            {
+               return false;
+            }
+         }
+      }
+   }
+   
 
    public void run()
    {
@@ -207,7 +285,7 @@
             }
 
             final DatagramPacket packet = new DatagramPacket(data, data.length);
-
+                        
             try
             {
                socket.receive(packet);
@@ -223,17 +301,26 @@
                   continue;
                }
             }
-            
+
             MessagingBuffer buffer = ChannelBuffers.wrappedBuffer(data);
+
+            String originatingNodeID = buffer.readString();
             
-            String originatingNodeID = buffer.readString();
+            String uniqueID = buffer.readString();
+            
+            if (!uniqueIDOK(originatingNodeID, uniqueID))
+            {
+               log.warn("There seem to be more than one broadcasters on the network broadcasting the same node id");
+               
+               continue;
+            }
 
             if (nodeID.equals(originatingNodeID))
             {
                // Ignore traffic from own node
                continue;
             }
-
+            
             int size = buffer.readInt();
 
             boolean changed = false;
@@ -243,7 +330,7 @@
                for (int i = 0; i < size; i++)
                {
                   TransportConfiguration connector = new TransportConfiguration();
-                  
+
                   connector.decode(buffer);
 
                   boolean existsBackup = buffer.readBoolean();
@@ -253,15 +340,17 @@
                   if (existsBackup)
                   {
                      backupConnector = new TransportConfiguration();
-                     
+
                      backupConnector.decode(buffer);
                   }
 
                   Pair<TransportConfiguration, TransportConfiguration> connectorPair = new Pair<TransportConfiguration, TransportConfiguration>(connector,
                                                                                                                                                 backupConnector);
 
-                  Long oldVal = connectors.put(connectorPair, System.currentTimeMillis());
+                  DiscoveryEntry entry = new DiscoveryEntry(connectorPair, System.currentTimeMillis());
 
+                  DiscoveryEntry oldVal = connectors.put(originatingNodeID, entry);
+
                   if (oldVal == null)
                   {
                      changed = true;
@@ -270,15 +359,15 @@
 
                long now = System.currentTimeMillis();
 
-               Iterator<Map.Entry<Pair<TransportConfiguration, TransportConfiguration>, Long>> iter = connectors.entrySet()
-                                                                                                                .iterator();
+               Iterator<Map.Entry<String, DiscoveryEntry>> iter = connectors.entrySet().iterator();
+               
                // Weed out any expired connectors
 
                while (iter.hasNext())
                {
-                  Map.Entry<Pair<TransportConfiguration, TransportConfiguration>, Long> entry = iter.next();
+                  Map.Entry<String, DiscoveryEntry> entry = iter.next();
 
-                  if (entry.getValue() + timeout <= now)
+                  if (entry.getValue().getLastUpdate() + timeout <= now)
                   {
                      iter.remove();
 

Modified: trunk/src/main/org/jboss/messaging/core/config/TransportConfiguration.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/config/TransportConfiguration.java	2009-04-03 14:02:02 UTC (rev 6296)
+++ trunk/src/main/org/jboss/messaging/core/config/TransportConfiguration.java	2009-04-03 14:08:00 UTC (rev 6297)
@@ -26,6 +26,7 @@
 import java.util.Map;
 
 import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
+import org.jboss.messaging.utils.SimpleString;
 import org.jboss.messaging.utils.UUIDGenerator;
 
 /**
@@ -263,4 +264,41 @@
          return false;
       }
    }
+   
+   public String toString()
+   {
+      StringBuilder str = new StringBuilder(replaceWildcardChars(factoryClassName));
+
+      if (params != null)
+      {
+         if (!params.isEmpty())
+         {
+            str.append("?");
+         }
+
+         boolean first = true;
+         for (Map.Entry<String, Object> entry : params.entrySet())
+         {
+            if (!first)
+            {
+               str.append("&");
+            }
+            String encodedKey = replaceWildcardChars(entry.getKey());
+
+            String val = entry.getValue().toString();
+            String encodedVal = replaceWildcardChars(val);
+
+            str.append(encodedKey).append('=').append(encodedVal);
+
+            first = false;
+         }
+      }
+
+      return str.toString();
+   }
+   
+   private String replaceWildcardChars(final String str)
+   {
+      return str.replace('.', '-');
+   }
 }

Modified: trunk/src/main/org/jboss/messaging/core/management/Notification.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/management/Notification.java	2009-04-03 14:02:02 UTC (rev 6296)
+++ trunk/src/main/org/jboss/messaging/core/management/Notification.java	2009-04-03 14:08:00 UTC (rev 6297)
@@ -20,7 +20,6 @@
  * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
  */
 
-
 package org.jboss.messaging.core.management;
 
 import org.jboss.messaging.utils.TypedProperties;
@@ -37,15 +36,16 @@
 public class Notification
 {
    private final NotificationType type;
-   
+
    private final TypedProperties properties;
-      
-   public Notification(final NotificationType type, final TypedProperties properties)
+
+   public Notification(String uid, final NotificationType type, final TypedProperties properties)
    {
+      this.uid = uid;
       this.type = type;
       this.properties = properties;
    }
-   
+
    public NotificationType getType()
    {
       return type;
@@ -55,4 +55,11 @@
    {
       return properties;
    }
+
+   private String uid;
+
+   public String getUID()
+   {
+      return uid;
+   }
 }

Modified: trunk/src/main/org/jboss/messaging/core/management/impl/ManagementServiceImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/management/impl/ManagementServiceImpl.java	2009-04-03 14:02:02 UTC (rev 6296)
+++ trunk/src/main/org/jboss/messaging/core/management/impl/ManagementServiceImpl.java	2009-04-03 14:08:00 UTC (rev 6297)
@@ -598,6 +598,11 @@
 
             notifProps.putLongProperty(ManagementHelper.HDR_NOTIFICATION_TIMESTAMP, System.currentTimeMillis());
 
+            if (notification.getUID() != null)
+            {               
+               notifProps.putStringProperty(new SimpleString("foobar"), new SimpleString(notification.getUID()));
+            }
+            
             notificationMessage.putTypedProperties(notifProps);
 
             postOffice.route(notificationMessage, null);

Modified: trunk/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java	2009-04-03 14:02:02 UTC (rev 6296)
+++ trunk/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java	2009-04-03 14:08:00 UTC (rev 6297)
@@ -83,7 +83,6 @@
 import org.jboss.messaging.utils.Pair;
 import org.jboss.messaging.utils.SimpleString;
 import org.jboss.messaging.utils.UUID;
-import org.jboss.messaging.utils.UUIDGenerator;
 
 /**
  * 

Modified: trunk/src/main/org/jboss/messaging/core/postoffice/impl/BindingsImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/postoffice/impl/BindingsImpl.java	2009-04-03 14:02:02 UTC (rev 6296)
+++ trunk/src/main/org/jboss/messaging/core/postoffice/impl/BindingsImpl.java	2009-04-03 14:08:00 UTC (rev 6297)
@@ -138,7 +138,7 @@
       ByteBuffer buff = ByteBuffer.wrap(ids);
 
       Set<Bindable> chosen = new HashSet<Bindable>();
-
+      
       while (buff.hasRemaining())
       {
          int bindingID = buff.getInt();
@@ -269,7 +269,7 @@
       else
       {
          if (message.getProperty(MessageImpl.HDR_FROM_CLUSTER) != null)
-         {
+         {            
             routeFromCluster(message, tx);
          }
          else
@@ -281,7 +281,7 @@
                SimpleString routingName = entry.getKey();
 
                List<Binding> bindings = entry.getValue();
-
+               
                if (bindings == null)
                {
                   // The value can become null if it's concurrently removed while we're iterating - this is expected

Modified: trunk/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java	2009-04-03 14:02:02 UTC (rev 6296)
+++ trunk/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java	2009-04-03 14:08:00 UTC (rev 6297)
@@ -54,6 +54,7 @@
 import org.jboss.messaging.core.postoffice.DuplicateIDCache;
 import org.jboss.messaging.core.postoffice.PostOffice;
 import org.jboss.messaging.core.postoffice.QueueInfo;
+import org.jboss.messaging.core.server.MessagingServer;
 import org.jboss.messaging.core.server.Queue;
 import org.jboss.messaging.core.server.QueueFactory;
 import org.jboss.messaging.core.server.ServerMessage;
@@ -68,6 +69,7 @@
 import org.jboss.messaging.utils.ExecutorFactory;
 import org.jboss.messaging.utils.SimpleString;
 import org.jboss.messaging.utils.TypedProperties;
+import org.jboss.messaging.utils.UUIDGenerator;
 
 /**
  * A PostOfficeImpl
@@ -82,6 +84,8 @@
 
    public static final SimpleString HDR_RESET_QUEUE_DATA = new SimpleString("_JBM_RESET_QUEUE_DATA");
 
+   private MessagingServer server;
+
    private final AddressManager addressManager;
 
    private final QueueFactory queueFactory;
@@ -125,10 +129,11 @@
    private final org.jboss.messaging.utils.ExecutorFactory redistributorExecutorFactory;
 
    private final HierarchicalRepository<AddressSettings> addressSettingsRepository;
-   
+
    private final boolean allowRouteWhenNoBindings;
-   
-   public PostOfficeImpl(final StorageManager storageManager,
+
+   public PostOfficeImpl(final MessagingServer server,
+                         final StorageManager storageManager,
                          final PagingManager pagingManager,
                          final QueueFactory bindableFactory,
                          final ManagementService managementService,
@@ -143,6 +148,8 @@
                          HierarchicalRepository<AddressSettings> addressSettingsRepository)
 
    {
+      this.server = server;
+
       this.storageManager = storageManager;
 
       this.queueFactory = bindableFactory;
@@ -169,7 +176,7 @@
       this.idCacheSize = idCacheSize;
 
       this.persistIDCache = persistIDCache;
-      
+
       this.allowRouteWhenNoBindings = allowRouteWhenNoBindings;
 
       this.redistributorExecutorFactory = orderedExecutorFactory;
@@ -190,22 +197,23 @@
 
       // Injecting the postoffice (itself) on queueFactory for paging-control
       queueFactory.setPostOffice(this);
-      
+
       if (!backup)
       {
          startExpiryScanner();
       }
-                  
+
       started = true;
    }
-   
+
    private void startExpiryScanner()
    {
       if (messageExpiryScanPeriod > 0)
       {
          MessageExpiryRunner messageExpiryRunner = new MessageExpiryRunner();
-         messageExpiryExecutor = new ScheduledThreadPoolExecutor(1, new org.jboss.messaging.utils.JBMThreadFactory("JBM-scheduled-threads",
-                                                                                         messageExpiryThreadPriority));
+         messageExpiryExecutor = new ScheduledThreadPoolExecutor(1,
+                                                                 new org.jboss.messaging.utils.JBMThreadFactory("JBM-scheduled-threads",
+                                                                                                                messageExpiryThreadPriority));
          messageExpiryExecutor.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
          messageExpiryExecutor.scheduleWithFixedDelay(messageExpiryRunner,
                                                       messageExpiryScanPeriod,
@@ -225,9 +233,9 @@
       }
 
       addressManager.clear();
-      
+
       queueInfos.clear();
-      
+
       transientIDs.clear();
 
       started = false;
@@ -366,7 +374,7 @@
                                                                                                  .toString());
 
                      long redistributionDelay = addressSettings.getRedistributionDelay();
-                     
+
                      if (redistributionDelay != -1)
                      {
                         queue.addRedistributor(redistributionDelay, redistributorExecutorFactory.getExecutor());
@@ -381,7 +389,7 @@
                TypedProperties props = notification.getProperties();
 
                SimpleString clusterName = (SimpleString)props.getProperty(ManagementHelper.HDR_CLUSTER_NAME);
-               
+
                if (clusterName == null)
                {
                   throw new IllegalStateException("No distance");
@@ -436,7 +444,7 @@
                                                                                                  .toString());
 
                      long redistributionDelay = addressSettings.getRedistributionDelay();
-                     
+
                      if (redistributionDelay != -1)
                      {
                         queue.addRedistributor(redistributionDelay, redistributorExecutorFactory.getExecutor());
@@ -460,7 +468,6 @@
 
    // PostOffice implementation -----------------------------------------------
 
-
    // TODO - needs to be synchronized to prevent happening concurrently with activate().
    // (and possible removeBinding and other methods)
    // Otherwise can have situation where createQueue comes in before failover, then failover occurs
@@ -469,9 +476,9 @@
    public synchronized void addBinding(final Binding binding) throws Exception
    {
       binding.setID(generateTransientID());
-      
+
       boolean existed = addressManager.addBinding(binding);
-      
+
       if (binding.getType() == BindingType.LOCAL_QUEUE)
       {
          Queue queue = (Queue)binding.getBindable();
@@ -482,13 +489,13 @@
          }
 
          managementService.registerQueue(queue, binding.getAddress(), storageManager);
-         
+
          if (!existed)
          {
             managementService.registerAddress(binding.getAddress());
          }
       }
-                  
+
       TypedProperties props = new TypedProperties();
 
       props.putIntProperty(ManagementHelper.HDR_BINDING_TYPE, binding.getType().toInt());
@@ -509,34 +516,37 @@
       {
          props.putStringProperty(ManagementHelper.HDR_FILTERSTRING, filter.getFilterString());
       }
-         
-      managementService.sendNotification(new Notification(NotificationType.BINDING_ADDED, props));
+
+      String uid = UUIDGenerator.getInstance().generateStringUUID();
+
+      managementService.sendNotification(new Notification(uid, NotificationType.BINDING_ADDED, props));
    }
 
    public synchronized Binding removeBinding(final SimpleString uniqueName) throws Exception
    {
       Binding binding = addressManager.removeBinding(uniqueName);
-      if(binding == null)
+      if (binding == null)
       {
          throw new MessagingException(MessagingException.QUEUE_DOES_NOT_EXIST);
       }
-      
+
       if (binding.getType() == BindingType.LOCAL_QUEUE)
       {
          managementService.unregisterQueue(uniqueName, binding.getAddress());
-         
+
          if (addressManager.getBindings(binding.getAddress()) == null)
          {
             managementService.unregisterAddress(binding.getAddress());
          }
-      } else if (binding.getType() == BindingType.DIVERT)
+      }
+      else if (binding.getType() == BindingType.DIVERT)
       {
          managementService.unregisterDivert(uniqueName);
-         
+
          if (addressManager.getBindings(binding.getAddress()) == null)
          {
             managementService.unregisterAddress(binding.getAddress());
-         }         
+         }
       }
 
       TypedProperties props = new TypedProperties();
@@ -549,7 +559,7 @@
 
       props.putIntProperty(ManagementHelper.HDR_DISTANCE, binding.getDistance());
 
-      managementService.sendNotification(new Notification(NotificationType.BINDING_REMOVED, props));
+      managementService.sendNotification(new Notification(null, NotificationType.BINDING_REMOVED, props));
 
       releaseTransientID(binding.getID());
 
@@ -572,11 +582,11 @@
    {
       return addressManager.getBinding(name);
    }
-   
+
    public void route(final ServerMessage message, Transaction tx) throws Exception
-   {                      
+   {
       SimpleString address = message.getDestination();
-       
+
       byte[] duplicateID = (byte[])message.getProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID);
 
       DuplicateIDCache cache = null;
@@ -639,11 +649,10 @@
          }
       }
 
-      
       Bindings bindings = addressManager.getBindings(address);
 
       if (bindings != null)
-      { 
+      {
          bindings.route(message, tx);
       }
 
@@ -701,7 +710,7 @@
             }
          }
       }
-      
+
       startExpiryScanner();
 
       return queues;
@@ -727,7 +736,7 @@
    }
 
    public void sendQueueInfoToQueue(final SimpleString queueName, final SimpleString address) throws Exception
-   {            
+   {
       // We send direct to the queue so we can send it to the same queue that is bound to the notifications adress -
       // this is crucial for ensuring
       // that queue infos and notifications are received in a contiguous consistent stream
@@ -820,9 +829,13 @@
 
       message.setDestination(queueName);
 
+      String uid = UUIDGenerator.getInstance().generateStringUUID();
+
       message.putStringProperty(ManagementHelper.HDR_NOTIFICATION_TYPE, new SimpleString(type.toString()));
       message.putLongProperty(ManagementHelper.HDR_NOTIFICATION_TIMESTAMP, System.currentTimeMillis());
 
+      message.putStringProperty(new SimpleString("foobar"), new SimpleString(uid));
+
       return message;
    }
 
@@ -857,16 +870,16 @@
       synchronized (tx)
       {
          PageMessageOperation oper = (PageMessageOperation)tx.getProperty(TransactionPropertyIndexes.PAGE_MESSAGES_OPERATION);
-   
+
          if (oper == null)
          {
             oper = new PageMessageOperation();
-   
+
             tx.putProperty(TransactionPropertyIndexes.PAGE_MESSAGES_OPERATION, oper);
-   
+
             tx.addOperation(oper);
          }
-   
+
          return oper;
       }
    }

Modified: trunk/src/main/org/jboss/messaging/core/postoffice/impl/SimpleAddressManager.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/postoffice/impl/SimpleAddressManager.java	2009-04-03 14:02:02 UTC (rev 6296)
+++ trunk/src/main/org/jboss/messaging/core/postoffice/impl/SimpleAddressManager.java	2009-04-03 14:08:00 UTC (rev 6297)
@@ -21,15 +21,16 @@
  */
 package org.jboss.messaging.core.postoffice.impl;
 
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+import org.jboss.messaging.core.logging.Logger;
 import org.jboss.messaging.core.postoffice.AddressManager;
 import org.jboss.messaging.core.postoffice.Binding;
 import org.jboss.messaging.core.postoffice.Bindings;
 import org.jboss.messaging.utils.SimpleString;
 
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-
 /**
  * A simple address manager that maintains the addresses and bindings.
  *
@@ -39,6 +40,8 @@
  */
 public class SimpleAddressManager implements AddressManager
 {
+   private static final Logger log = Logger.getLogger(SimpleAddressManager.class);
+
    private final ConcurrentMap<SimpleString, Bindings> mappings = new ConcurrentHashMap<SimpleString, Bindings>();
 
    private final ConcurrentMap<SimpleString, Binding> nameMap = new ConcurrentHashMap<SimpleString, Binding>();
@@ -47,7 +50,10 @@
    {
       if (nameMap.putIfAbsent(binding.getUniqueName(), binding) != null)
       {
-         throw new IllegalStateException("Binding already exists " + binding);
+         //throw new IllegalStateException("Binding already exists " + binding);
+         log.error("Binding already exists " + binding.getUniqueName(), new Exception());
+         
+         System.exit(1);
       }
       return addMappingInternal(binding.getAddress(), binding);
    }

Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/CreateQueueMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/CreateQueueMessage.java	2009-04-03 14:02:02 UTC (rev 6296)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/CreateQueueMessage.java	2009-04-03 14:08:00 UTC (rev 6297)
@@ -22,6 +22,7 @@
 
 package org.jboss.messaging.core.remoting.impl.wireformat;
 
+import org.jboss.messaging.core.logging.Logger;
 import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
 import org.jboss.messaging.utils.DataConstants;
 import org.jboss.messaging.utils.SimpleString;
@@ -34,7 +35,10 @@
 public class CreateQueueMessage extends PacketImpl
 {
    // Constants -----------------------------------------------------
+   
+   private static final Logger log = Logger.getLogger(CreateQueueMessage.class);
 
+
    // Attributes ----------------------------------------------------
 
    private SimpleString address;
@@ -46,7 +50,7 @@
    private boolean durable;
 
    private boolean temporary;
-
+   
    // Static --------------------------------------------------------
 
    // Constructors --------------------------------------------------
@@ -110,7 +114,7 @@
    {
       return temporary;
    }
-
+   
    public void encodeBody(final MessagingBuffer buffer)
    {
       buffer.writeSimpleString(address);

Modified: trunk/src/main/org/jboss/messaging/core/remoting/server/RemotingService.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/server/RemotingService.java	2009-04-03 14:02:02 UTC (rev 6296)
+++ trunk/src/main/org/jboss/messaging/core/remoting/server/RemotingService.java	2009-04-03 14:08:00 UTC (rev 6297)
@@ -49,4 +49,6 @@
    boolean removeInterceptor(Interceptor interceptor);
    
    void setManagementService(ManagementService managementService);
+   
+   void freeze();
 }

Modified: trunk/src/main/org/jboss/messaging/core/remoting/server/impl/RemotingServiceImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/server/impl/RemotingServiceImpl.java	2009-04-03 14:02:02 UTC (rev 6296)
+++ trunk/src/main/org/jboss/messaging/core/remoting/server/impl/RemotingServiceImpl.java	2009-04-03 14:08:00 UTC (rev 6297)
@@ -182,13 +182,23 @@
       started = true;
    }
    
+   public synchronized void freeze()
+   {
+      //Used in testing - prevents service taking any more connections
+      
+      for (Acceptor acceptor : acceptors)
+      {
+         acceptor.pause();
+      }
+   }
+   
    public synchronized void stop() throws Exception
    {
       if (!started)
       {
          return;
       }
-
+ 
       if (failedConnectionTimer != null)
       {
          failedConnectionsTask.cancel();
@@ -199,7 +209,7 @@
 
          failedConnectionTimer = null;
       }
-      
+          
       //We need to stop them accepting first so no new connections are accepted after we send the disconnect message
       for (Acceptor acceptor : acceptors)
       {
@@ -262,7 +272,7 @@
 
       Object id = connection.getID();
 
-      connections.put(id, rc);
+      connections.put(id, rc);           
    }
 
    public void connectionDestroyed(final Object connectionID)

Modified: trunk/src/main/org/jboss/messaging/core/security/impl/SecurityStoreImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/security/impl/SecurityStoreImpl.java	2009-04-03 14:02:02 UTC (rev 6296)
+++ trunk/src/main/org/jboss/messaging/core/security/impl/SecurityStoreImpl.java	2009-04-03 14:08:00 UTC (rev 6297)
@@ -128,7 +128,7 @@
 
                   props.putStringProperty(ManagementHelper.HDR_USER, SimpleString.toSimpleString(user));
 
-                  Notification notification = new Notification(SECURITY_AUTHENTICATION_VIOLATION, props);
+                  Notification notification = new Notification(null, SECURITY_AUTHENTICATION_VIOLATION, props);
 
                   notificationService.sendNotification(notification);
                }
@@ -173,7 +173,7 @@
                props.putStringProperty(ManagementHelper.HDR_CHECK_TYPE, new SimpleString(checkType.toString()));
                props.putStringProperty(ManagementHelper.HDR_USER, SimpleString.toSimpleString(user));
 
-               Notification notification = new Notification(NotificationType.SECURITY_PERMISSION_VIOLATION, props);
+               Notification notification = new Notification(null, NotificationType.SECURITY_PERMISSION_VIOLATION, props);
 
                notificationService.sendNotification(notification);
             }

Modified: trunk/src/main/org/jboss/messaging/core/server/Messaging.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/Messaging.java	2009-04-03 14:02:02 UTC (rev 6296)
+++ trunk/src/main/org/jboss/messaging/core/server/Messaging.java	2009-04-03 14:08:00 UTC (rev 6297)
@@ -154,9 +154,6 @@
 
       MessagingServer server = new MessagingServerImpl();
 
-      log.info("** creating server with security enabled " + config.isSecurityEnabled() + 
-               " " + System.identityHashCode(config));
-
       server.setConfiguration(config);
 
       server.setStorageManager(storageManager);

Modified: trunk/src/main/org/jboss/messaging/core/server/cluster/Bridge.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/cluster/Bridge.java	2009-04-03 14:02:02 UTC (rev 6296)
+++ trunk/src/main/org/jboss/messaging/core/server/cluster/Bridge.java	2009-04-03 14:08:00 UTC (rev 6297)
@@ -51,4 +51,6 @@
    boolean isUseDuplicateDetection();  
    
    void activate();
+   
+   void setQueue(Queue queue);
 }

Modified: trunk/src/main/org/jboss/messaging/core/server/cluster/MessageFlowRecord.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/cluster/MessageFlowRecord.java	2009-04-03 14:02:02 UTC (rev 6296)
+++ trunk/src/main/org/jboss/messaging/core/server/cluster/MessageFlowRecord.java	2009-04-03 14:08:00 UTC (rev 6297)
@@ -24,6 +24,7 @@
 package org.jboss.messaging.core.server.cluster;
 
 import org.jboss.messaging.core.client.MessageHandler;
+import org.jboss.messaging.core.server.Queue;
 
 /**
  * A MessageFlowRecord
@@ -40,6 +41,8 @@
    
    int getMaxHops();
    
+   void activate(Queue queue) throws Exception;
+   
    void reset() throws Exception;
    
    void close() throws Exception;

Modified: trunk/src/main/org/jboss/messaging/core/server/cluster/impl/BridgeImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/cluster/impl/BridgeImpl.java	2009-04-03 14:02:02 UTC (rev 6296)
+++ trunk/src/main/org/jboss/messaging/core/server/cluster/impl/BridgeImpl.java	2009-04-03 14:08:00 UTC (rev 6297)
@@ -74,6 +74,7 @@
 import org.jboss.messaging.core.security.impl.SecurityStoreImpl;
 import org.jboss.messaging.core.server.HandleStatus;
 import org.jboss.messaging.core.server.MessageReference;
+import org.jboss.messaging.core.server.MessagingServer;
 import org.jboss.messaging.core.server.Queue;
 import org.jboss.messaging.core.server.ServerMessage;
 import org.jboss.messaging.core.server.cluster.Bridge;
@@ -105,7 +106,7 @@
 
    private final SimpleString name;
 
-   private final Queue queue;
+   private Queue queue;
 
    private final Executor executor;
 
@@ -154,6 +155,9 @@
    private Channel replicatingChannel;
 
    private boolean activated;
+   
+   
+   private MessagingServer server;
 
    // Static --------------------------------------------------------
 
@@ -202,7 +206,7 @@
            null,
            replicatingChannel,
            activated,
-           storageManager);
+           storageManager, null);
    }
 
    public BridgeImpl(final UUID nodeUUID,
@@ -225,7 +229,8 @@
                      final MessageFlowRecord flowRecord,
                      final Channel replicatingChannel,
                      final boolean activated,
-                     final StorageManager storageManager) throws Exception
+                     final StorageManager storageManager,
+                     MessagingServer server) throws Exception
    {
       this.nodeUUID = nodeUUID;
 
@@ -274,7 +279,9 @@
 
       this.replicatingChannel = replicatingChannel;
 
-      this.activated = activated;   
+      this.activated = activated;  
+      
+      this.server = server;
    }
 
    public synchronized void start() throws Exception
@@ -283,7 +290,7 @@
       {
          return;
       }
-
+      
       started = true;
 
       if (activated)
@@ -300,7 +307,6 @@
 
       while ((ref = refs.poll()) != null)
       {
-         // ref.getQueue().cancel(ref);
          list.addFirst(ref);
       }
 
@@ -325,7 +331,7 @@
       
       executor.execute(new StopRunnable());
            
-      waitForRunnablesToComplete();
+      waitForRunnablesToComplete();   
    }
 
    public boolean isStarted()
@@ -351,6 +357,11 @@
    {
       return queue;
    }
+   
+   public void setQueue(final Queue queue)
+   {
+      this.queue = queue;
+   }
 
    public Filter getFilter()
    {
@@ -569,7 +580,7 @@
       try
       {
          queue.addConsumer(BridgeImpl.this);
-
+  
          csf = new ClientSessionFactoryImpl(connectorPair.a,
                                             connectorPair.b,
                                             failoverOnServerShutdown,
@@ -625,7 +636,7 @@
             // different each time this is called
             // Otherwise it may already exist if server is restarted before it has been deleted on backup
 
-            String qName = "notif-" + nodeUUID.toString() + "-" + name.toString();
+            String qName = "notif." + nodeUUID.toString() + "." + name.toString();
             
             SimpleString notifQueueName = new SimpleString(qName);
 
@@ -693,7 +704,7 @@
          active = true;
 
          queue.deliverAsync(executor);
-         
+
          return true;
       }
       catch (Exception e)

Modified: trunk/src/main/org/jboss/messaging/core/server/cluster/impl/BroadcastGroupImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/cluster/impl/BroadcastGroupImpl.java	2009-04-03 14:02:02 UTC (rev 6296)
+++ trunk/src/main/org/jboss/messaging/core/server/cluster/impl/BroadcastGroupImpl.java	2009-04-03 14:08:00 UTC (rev 6297)
@@ -35,6 +35,7 @@
 import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
 import org.jboss.messaging.core.server.cluster.BroadcastGroup;
 import org.jboss.messaging.utils.Pair;
+import org.jboss.messaging.utils.UUIDGenerator;
 
 /**
  * A BroadcastGroupImpl
@@ -67,6 +68,10 @@
    private ScheduledFuture<?> future;
    
    private boolean active;
+   
+   //Each broadcast group has a unique id - we use this to detect when more than one group broadcasts the same node id
+   //on the network which would be an error
+   private final String uniqueID;
 
    public BroadcastGroupImpl(final String nodeID,
                              final String name,
@@ -86,6 +91,8 @@
       this.groupPort = groupPort;
       
       this.active = active;
+           
+      this.uniqueID = UUIDGenerator.getInstance().generateStringUUID();
    }
 
    public synchronized void start() throws Exception
@@ -136,7 +143,7 @@
    }
 
    public synchronized void addConnectorPair(final Pair<TransportConfiguration, TransportConfiguration> connectorPair)
-   {
+   { 
       connectorPairs.add(connectorPair);
    }
 
@@ -165,6 +172,8 @@
       MessagingBuffer buff = ChannelBuffers.dynamicBuffer(4096);
      
       buff.writeString(nodeID);
+      
+      buff.writeString(uniqueID);
 
       buff.writeInt(connectorPairs.size());
 

Modified: trunk/src/main/org/jboss/messaging/core/server/cluster/impl/ClusterConnectionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/cluster/impl/ClusterConnectionImpl.java	2009-04-03 14:02:02 UTC (rev 6296)
+++ trunk/src/main/org/jboss/messaging/core/server/cluster/impl/ClusterConnectionImpl.java	2009-04-03 14:08:00 UTC (rev 6297)
@@ -24,21 +24,17 @@
 
 import static org.jboss.messaging.core.management.NotificationType.CONSUMER_CLOSED;
 import static org.jboss.messaging.core.management.NotificationType.CONSUMER_CREATED;
-import static org.jboss.messaging.core.management.NotificationType.SECURITY_AUTHENTICATION_VIOLATION;
-import static org.jboss.messaging.core.management.NotificationType.SECURITY_PERMISSION_VIOLATION;
 import static org.jboss.messaging.core.postoffice.impl.PostOfficeImpl.HDR_RESET_QUEUE_DATA;
 
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.Executor;
 import java.util.concurrent.ScheduledExecutorService;
 
 import org.jboss.messaging.core.client.ClientMessage;
 import org.jboss.messaging.core.client.management.impl.ManagementHelper;
+import org.jboss.messaging.core.cluster.DiscoveryEntry;
 import org.jboss.messaging.core.cluster.DiscoveryGroup;
 import org.jboss.messaging.core.cluster.DiscoveryListener;
 import org.jboss.messaging.core.config.TransportConfiguration;
@@ -46,7 +42,6 @@
 import org.jboss.messaging.core.management.ManagementService;
 import org.jboss.messaging.core.management.Notification;
 import org.jboss.messaging.core.management.NotificationType;
-import org.jboss.messaging.core.persistence.StorageManager;
 import org.jboss.messaging.core.postoffice.Binding;
 import org.jboss.messaging.core.postoffice.Bindings;
 import org.jboss.messaging.core.postoffice.PostOffice;
@@ -63,7 +58,6 @@
 import org.jboss.messaging.core.server.cluster.ClusterConnection;
 import org.jboss.messaging.core.server.cluster.MessageFlowRecord;
 import org.jboss.messaging.core.server.cluster.RemoteQueueBinding;
-import org.jboss.messaging.core.server.cluster.Transformer;
 import org.jboss.messaging.utils.ExecutorFactory;
 import org.jboss.messaging.utils.Pair;
 import org.jboss.messaging.utils.SimpleString;
@@ -95,13 +89,13 @@
 
    private final SimpleString address;
 
-   private final long retryInterval;  
-   
+   private final long retryInterval;
+
    private final boolean useDuplicateDetection;
 
    private final boolean routeWhenNoConsumers;
 
-   private Map<Pair<TransportConfiguration, TransportConfiguration>, MessageFlowRecord> records = new HashMap<Pair<TransportConfiguration, TransportConfiguration>, MessageFlowRecord>();
+   private Map<String, MessageFlowRecord> records = new HashMap<String, MessageFlowRecord>();
 
    private final DiscoveryGroup discoveryGroup;
 
@@ -177,7 +171,7 @@
 
       if (!backup)
       {
-         this.updateConnectors(connectors);
+         this.updateFromStaticConnectors(connectors);
       }
    }
 
@@ -205,7 +199,7 @@
       this.address = address;
 
       this.retryInterval = retryInterval;
-      
+
       this.executorFactory = executorFactory;
 
       this.server = server;
@@ -284,7 +278,7 @@
       {
          return;
       }
-       
+
       backup = false;
 
       if (discoveryGroup != null)
@@ -295,7 +289,7 @@
       {
          try
          {
-            updateConnectors(staticConnectors);
+            updateFromStaticConnectors(staticConnectors);
          }
          catch (Exception e)
          {
@@ -315,7 +309,7 @@
 
       try
       {
-         List<Pair<TransportConfiguration, TransportConfiguration>> connectors = discoveryGroup.getConnectors();
+         Map<String, DiscoveryEntry> connectors = discoveryGroup.getDiscoveryEntryMap();
 
          updateConnectors(connectors);
       }
@@ -325,28 +319,37 @@
       }
    }
 
-   private void updateConnectors(final List<Pair<TransportConfiguration, TransportConfiguration>> connectors) throws Exception
+   private void updateFromStaticConnectors(final List<Pair<TransportConfiguration, TransportConfiguration>> connectors) throws Exception
    {
-      doUpdateConnectors(connectors);
+      Map<String, DiscoveryEntry> map = new HashMap<String, DiscoveryEntry>();
+
+      // TODO - we fudge the node id - it's never updated anyway
+      int i = 0;
+      for (Pair<TransportConfiguration, TransportConfiguration> connectorPair : connectors)
+      {
+         map.put(String.valueOf(i++), new DiscoveryEntry(connectorPair, 0));
+      }
+      
+      updateConnectors(map);
    }
 
-   private void doUpdateConnectors(final List<Pair<TransportConfiguration, TransportConfiguration>> connectors) throws Exception
+   private void updateConnectors(final Map<String, DiscoveryEntry> connectors) throws Exception
    {
-      Set<Pair<TransportConfiguration, TransportConfiguration>> connectorSet = new HashSet<Pair<TransportConfiguration, TransportConfiguration>>();
+      // Set<Pair<TransportConfiguration, TransportConfiguration>> connectorSet = new
+      // HashSet<Pair<TransportConfiguration, TransportConfiguration>>();
 
-      connectorSet.addAll(connectors);
+      // connectorSet.addAll(connectors);
 
-      Iterator<Map.Entry<Pair<TransportConfiguration, TransportConfiguration>, MessageFlowRecord>> iter = records.entrySet()
-                                                                                                                 .iterator();
+      Iterator<Map.Entry<String, MessageFlowRecord>> iter = records.entrySet().iterator();
 
       while (iter.hasNext())
       {
-         Map.Entry<Pair<TransportConfiguration, TransportConfiguration>, MessageFlowRecord> entry = iter.next();
+         Map.Entry<String, MessageFlowRecord> entry = iter.next();
 
-         if (!connectorSet.contains(entry.getKey()))
+         if (!connectors.containsKey(entry.getKey()))
          {
             // Connector no longer there - we should remove and close it - we don't delete the queue though - it may
-            // have messages - this is up to the admininstrator to do this
+            // have messages - this is up to the administrator to do this
 
             entry.getValue().close();
 
@@ -354,12 +357,14 @@
          }
       }
 
-      for (final Pair<TransportConfiguration, TransportConfiguration> connectorPair : connectors)
+      for (final Map.Entry<String, DiscoveryEntry> entry : connectors.entrySet())
       {
-         if (!records.containsKey(connectorPair))
+         if (!records.containsKey(entry.getKey()))
          {
-            final SimpleString queueName = generateQueueName(name, connectorPair);
+            Pair<TransportConfiguration, TransportConfiguration> connectorPair = entry.getValue().getConnectorPair();
 
+            final SimpleString queueName = new SimpleString("sf." + name + "." + entry.getKey());
+
             Binding queueBinding = postOffice.getBinding(queueName);
 
             Queue queue;
@@ -367,25 +372,30 @@
             if (queueBinding != null)
             {
                queue = (Queue)queueBinding.getBindable();
-               
-               createNewRecord(connectorPair, queueName, queue);
+
+               createNewRecord(entry.getKey(), connectorPair, queueName, queue, true);
             }
             else
             {
                // Add binding in storage so the queue will get reloaded on startup and we can find it - it's never
                // actually routed to at that address though
-               
+
                if (replicatingChannel == null)
                {
                   queue = server.createQueue(queueName, queueName, null, true, false);
-                  
-                  createNewRecord(connectorPair, queueName, queue);
+
+                  createNewRecord(entry.getKey(), connectorPair, queueName, queue, true);
                }
                else
                {
-                  //Replicate the createQueue first
+                  // We need to create the record before we replicate, since otherwise, two updates can come in for
+                  // the same entry before the first replication comes back, and it won't find the record, so it
+                  // will try and create the queue twice
+                  createNewRecord(entry.getKey(), connectorPair, queueName, null, false);
+
+                  // Replicate the createQueue first
                   Packet packet = new CreateQueueMessage(queueName, queueName, null, true, false);
-                  
+
                   replicatingChannel.replicatePacket(packet, 1, new Runnable()
                   {
                      public void run()
@@ -393,8 +403,16 @@
                         try
                         {
                            Queue queue = server.createQueue(queueName, queueName, null, true, false);
-                        
-                           createNewRecord(connectorPair, queueName, queue);
+
+                           synchronized (ClusterConnectionImpl.this)
+                           {
+                              MessageFlowRecord record = records.get(entry.getKey());
+
+                              if (record != null)
+                              {
+                                 record.activate(queue);
+                              }
+                           }
                         }
                         catch (Exception e)
                         {
@@ -405,11 +423,14 @@
                }
             }
          }
-      }            
+      }
    }
-   
-   private void createNewRecord(final Pair<TransportConfiguration, TransportConfiguration> connectorPair, final SimpleString queueName,
-                                final Queue queue) throws Exception
+
+   private void createNewRecord(final String nodeID,
+                                final Pair<TransportConfiguration, TransportConfiguration> connectorPair,
+                                final SimpleString queueName,
+                                final Queue queue,
+                                final boolean start) throws Exception
    {
       MessageFlowRecordImpl record = new MessageFlowRecordImpl(queue);
 
@@ -433,69 +454,36 @@
                                      record,
                                      replicatingChannel,
                                      !backup,
-                                     server.getStorageManager());
+                                     server.getStorageManager(),
+                                     server);
 
       record.setBridge(bridge);
 
-      records.put(connectorPair, record);
+      records.put(nodeID, record);
 
-      bridge.start();
-   }
-
-   private SimpleString generateQueueName(final SimpleString clusterName,
-                                          final Pair<TransportConfiguration, TransportConfiguration> connectorPair) throws Exception
-   {
-      return new SimpleString("cluster." + name +
-                              "." +
-                              generateConnectorString(connectorPair.a) +
-                              "-" +
-                              (connectorPair.b == null ? "null" : generateConnectorString(connectorPair.b)));
-   }
-
-   private String replaceWildcardChars(final String str)
-   {
-      return str.replace('.', '-');
-   }
-
-   private SimpleString generateConnectorString(final TransportConfiguration config) throws Exception
-   {
-      StringBuilder str = new StringBuilder(replaceWildcardChars(config.getFactoryClassName()));
-
-      if (config.getParams() != null)
+      if (start)
       {
-         if (!config.getParams().isEmpty())
-         {
-            str.append("?");
-         }
-
-         boolean first = true;
-         for (Map.Entry<String, Object> entry : config.getParams().entrySet())
-         {
-            if (!first)
-            {
-               str.append("&");
-            }
-            String encodedKey = replaceWildcardChars(entry.getKey());
-
-            String val = entry.getValue().toString();
-            String encodedVal = replaceWildcardChars(val);
-
-            str.append(encodedKey).append('=').append(encodedVal);
-
-            first = false;
-         }
+         bridge.start();
       }
-
-      return new SimpleString(str.toString());
    }
 
+//   private SimpleString generateQueueName(final SimpleString clusterName,
+//                                          final Pair<TransportConfiguration, TransportConfiguration> connectorPair) throws Exception
+//   {
+//      return new SimpleString("sf." + name +
+//                              "." +
+//                              connectorPair.a.toString() +
+//                              "-" +
+//                              (connectorPair.b == null ? "null" : connectorPair.b.toString()));
+//   }
+
    // Inner classes -----------------------------------------------------------------------------------
 
    private class MessageFlowRecordImpl implements MessageFlowRecord
    {
       private Bridge bridge;
 
-      private final Queue queue;
+      private Queue queue;
 
       private final Map<SimpleString, RemoteQueueBinding> bindings = new HashMap<SimpleString, RemoteQueueBinding>();
 
@@ -517,12 +505,21 @@
       }
 
       public void close() throws Exception
-      {       
+      {
          bridge.stop();
 
          clearBindings();
       }
 
+      public void activate(final Queue queue) throws Exception
+      {
+         this.queue = queue;
+
+         bridge.setQueue(queue);
+
+         bridge.start();
+      }
+
       public void setBridge(final Bridge bridge)
       {
          this.bridge = bridge;
@@ -531,14 +528,14 @@
       public synchronized void reset() throws Exception
       {
          clearBindings();
-         
+
          firstReset = false;
       }
 
       public synchronized void onMessage(final ClientMessage message)
       {
          try
-         {                        
+         {
             // Reset the bindings
             if (message.getProperty(HDR_RESET_QUEUE_DATA) != null)
             {
@@ -553,7 +550,7 @@
             {
                return;
             }
-            
+
             // TODO - optimised this by just passing int in header - but filter needs to be extended to support IN with
             // a list of integers
             SimpleString type = (SimpleString)message.getProperty(ManagementHelper.HDR_NOTIFICATION_TYPE);
@@ -649,7 +646,7 @@
          {
             throw new IllegalStateException("queueID is null");
          }
-         
+
          if (replChannel != null)
          {
             Packet packet = new ReplicateRemoteBindingAddedMessage(name,
@@ -660,13 +657,13 @@
                                                                    filterString,
                                                                    queue.getName(),
                                                                    distance + 1);
-            
+
             replChannel.replicatePacket(packet, 1, new Runnable()
             {
                public void run()
                {
                   try
-                  {                     
+                  {
                      doBindingAdded(message, null);
                   }
                   catch (Exception e)
@@ -684,7 +681,7 @@
                                                                     queueID,
                                                                     filterString,
                                                                     queue,
-                                                                   // useDuplicateDetection,
+                                                                    // useDuplicateDetection,
                                                                     bridge.getName(),
                                                                     distance + 1);
 
@@ -767,7 +764,7 @@
          {
             throw new IllegalStateException("clusterName is null");
          }
-         
+
          message.putIntProperty(ManagementHelper.HDR_DISTANCE, distance + 1);
 
          SimpleString filterString = (SimpleString)message.getProperty(ManagementHelper.HDR_FILTERSTRING);
@@ -801,9 +798,9 @@
             }
 
             binding.addConsumer(filterString);
-           
+
             // Need to propagate the consumer add
-            Notification notification = new Notification(CONSUMER_CREATED, message.getProperties());
+            Notification notification = new Notification(null, CONSUMER_CREATED, message.getProperties());
 
             managementService.sendNotification(notification);
          }
@@ -824,14 +821,16 @@
          {
             throw new IllegalStateException("clusterName is null");
          }
-         
+
          message.putIntProperty(ManagementHelper.HDR_DISTANCE, distance + 1);
 
          SimpleString filterString = (SimpleString)message.getProperty(ManagementHelper.HDR_FILTERSTRING);
 
          if (replChannel != null)
          {
-            Packet packet = new ReplicateRemoteConsumerRemovedMessage(clusterName, filterString, message.getProperties());
+            Packet packet = new ReplicateRemoteConsumerRemovedMessage(clusterName,
+                                                                      filterString,
+                                                                      message.getProperties());
 
             replChannel.replicatePacket(packet, 1, new Runnable()
             {
@@ -859,9 +858,8 @@
 
             binding.removeConsumer(filterString);
 
-            
             // Need to propagate the consumer close
-            Notification notification = new Notification(CONSUMER_CLOSED, message.getProperties());
+            Notification notification = new Notification(null, CONSUMER_CLOSED, message.getProperties());
 
             managementService.sendNotification(notification);
          }
@@ -892,10 +890,18 @@
                                                               queueID,
                                                               filterString,
                                                               queue,
-                                                             // useDuplicateDetection,
                                                               queueName,
                                                               distance);
 
+      if (postOffice.getBinding(uniqueName) != null)
+      {
+         log.warn("Remoting queue binding " + uniqueName +
+                  " has already been bound in the post office. Most likely cause for this is you have a loop " +
+                  "in your cluster due to cluster max-hops being too large or you have multiple cluster connections to the same nodes using overlapping addresses");
+
+         return;
+      }
+
       postOffice.addBinding(binding);
 
       Bindings theBindings = postOffice.getBindingsForAddress(address);

Modified: trunk/src/main/org/jboss/messaging/core/server/cluster/impl/RemoteQueueBindingImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/cluster/impl/RemoteQueueBindingImpl.java	2009-04-03 14:02:02 UTC (rev 6296)
+++ trunk/src/main/org/jboss/messaging/core/server/cluster/impl/RemoteQueueBindingImpl.java	2009-04-03 14:08:00 UTC (rev 6297)
@@ -220,7 +220,7 @@
       
       buff.putInt(remoteQueueID);
       
-      message.putBytesProperty(idsHeaderName, ids);                
+      message.putBytesProperty(idsHeaderName, ids);           
    }
 
    public synchronized void addConsumer(final SimpleString filterString) throws Exception

Modified: trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java	2009-04-03 14:02:02 UTC (rev 6296)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java	2009-04-03 14:08:00 UTC (rev 6297)
@@ -246,7 +246,8 @@
 
       resourceManager = new ResourceManagerImpl((int)configuration.getTransactionTimeout() / 1000,
                                                 configuration.getTransactionTimeoutScanPeriod());
-      postOffice = new PostOfficeImpl(storageManager,
+      postOffice = new PostOfficeImpl(this,
+                                      storageManager,
                                       pagingManager,
                                       queueFactory,
                                       managementService,
@@ -278,35 +279,39 @@
 
       storageManager.loadBindingJournal(queueBindingInfos);
 
-      // TODO - this logic could be simplified
-      if (uuid == null)
+      if (!configuration.isBackup())
       {
-         uuid = storageManager.getPersistentID();
-
-         if (uuid == null && !configuration.isBackup())
+         if (uuid == null)
          {
-            uuid = UUIDGenerator.getInstance().generateUUID();
+            uuid = storageManager.getPersistentID();
 
-            storageManager.setPersistentID(uuid);
-         }
+            if (uuid == null)
+            {
+               uuid = UUIDGenerator.getInstance().generateUUID();
 
-         if (uuid != null)
-         {
-            nodeID = new SimpleString(uuid.toString());
+               storageManager.setPersistentID(uuid);
+            }
+            
+            nodeID = new SimpleString(uuid.toString());            
          }
       }
       else
       {
-         UUID theUUID = storageManager.getPersistentID();
-
-         if (theUUID == null)
+         UUID currentUUID = storageManager.getPersistentID();
+         
+         if (currentUUID != null)
          {
-            // Backup being initialised
+            if (!currentUUID.equals(uuid))
+            {
+               throw new IllegalStateException("Backup server already has an id but it's not the same as live");
+            }
+         }
+         else
+         {
             storageManager.setPersistentID(uuid);
          }
-
       }
-
+      
       serverManagement = managementService.registerServer(postOffice,
                                                           storageManager,
                                                           configuration,
@@ -650,18 +655,18 @@
          synchronized (this)
          {
             freezeBackupConnection();
-   
+
             List<Queue> toActivate = postOffice.activate();
-   
+
             for (Queue queue : toActivate)
             {
                scheduledExecutor.schedule(new ActivateRunner(queue),
                                           configuration.getQueueActivationTimeout(),
                                           TimeUnit.MILLISECONDS);
             }
-   
+
             configuration.setBackup(false);
-   
+ 
             if (clusterManager != null)
             {
                clusterManager.activate();
@@ -833,42 +838,27 @@
 
    public void initialiseBackup(final UUID theUUID, final long currentMessageID) throws Exception
    {
+      if (theUUID == null)
+      {
+         throw new IllegalArgumentException("node id is null");
+      }
+
       synchronized (initialiseLock)
       {
          if (initialised)
          {
-            if (uuid == null)
-            {
-               throw new IllegalStateException("Server is already initialised but has no id");
-            }
-
-            if (!uuid.toString().equals(theUUID.toString()))
-            {
-               throw new IllegalStateException("Backup node already has a unique id but it's not the same as the live node id");
-            }
-
-            return;
+            throw new IllegalStateException("Server is already initialised");
          }
 
-         if (uuid != null && !uuid.toString().equals(theUUID.toString()))
-         {
-            throw new IllegalStateException("Backup node already has a unique id but it's not the same as the live node id");
-         }
-
-         if (theUUID == null)
-         {
-            throw new IllegalArgumentException("node id is null");
-         }
-
          this.uuid = theUUID;
 
          this.nodeID = new SimpleString(uuid.toString());
-
+         
          doStart();
-
+         
          if (currentMessageID != this.storageManager.getCurrentUniqueID())
          {
-            throw new IllegalStateException("Backup node current unique id != live node current unique id " + this.storageManager.getCurrentUniqueID() +
+            throw new IllegalStateException("Backup node current id sequence != live node current id sequence " + this.storageManager.getCurrentUniqueID() +
                                             ", " +
                                             currentMessageID);
          }
@@ -975,7 +965,7 @@
                             final boolean temporary) throws Exception
    {
       Binding binding = postOffice.getBinding(queueName);
-
+      
       if (binding != null)
       {
          throw new MessagingException(MessagingException.QUEUE_EXISTS);
@@ -1216,7 +1206,7 @@
 
       sessions.put(name, session);
 
-      ServerSessionPacketHandler handler = new ServerSessionPacketHandler(session, channel);
+      ServerSessionPacketHandler handler = new ServerSessionPacketHandler(session);
 
       session.setHandler(handler);
 

Modified: trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerPacketHandler.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerPacketHandler.java	2009-04-03 14:02:02 UTC (rev 6296)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerPacketHandler.java	2009-04-03 14:08:00 UTC (rev 6297)
@@ -131,7 +131,7 @@
             // Create queue can also be fielded here in the case of a replicated store and forward queue creation
 
             CreateQueueMessage request = (CreateQueueMessage)packet;
-
+            
             handleCreateQueue(request);
 
             break;
@@ -377,7 +377,7 @@
       }
       
       // Need to propagate the consumer add
-      Notification notification = new Notification(CONSUMER_CREATED, request.getProperties());
+      Notification notification = new Notification(null, CONSUMER_CREATED, request.getProperties());
 
       try
       {
@@ -409,7 +409,7 @@
       }
       
       // Need to propagate the consumer close
-      Notification notification = new Notification(CONSUMER_CLOSED, request.getProperties());
+      Notification notification = new Notification(null, CONSUMER_CLOSED, request.getProperties());
 
       try
       {

Modified: trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java	2009-04-03 14:02:02 UTC (rev 6296)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java	2009-04-03 14:08:00 UTC (rev 6297)
@@ -357,7 +357,7 @@
    }
 
    public void addLast(final MessageReference ref)
-   {               
+   {          
       add(ref, false);
    }
 

Modified: trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java	2009-04-03 14:02:02 UTC (rev 6296)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java	2009-04-03 14:08:00 UTC (rev 6297)
@@ -51,6 +51,7 @@
 import org.jboss.messaging.core.server.HandleStatus;
 import org.jboss.messaging.core.server.LargeServerMessage;
 import org.jboss.messaging.core.server.MessageReference;
+import org.jboss.messaging.core.server.MessagingServer;
 import org.jboss.messaging.core.server.Queue;
 import org.jboss.messaging.core.server.ServerConsumer;
 import org.jboss.messaging.core.server.ServerMessage;
@@ -136,10 +137,13 @@
    private final ManagementService managementService;
 
    private final Binding binding;
+   
+   
+   private MessagingServer server;
 
    // Constructors ---------------------------------------------------------------------------------
 
-   public ServerConsumerImpl(final long id,
+   public ServerConsumerImpl(final MessagingServer server, final long id,
                              final long replicatedSessionID,
                              final ServerSession session,
                              final QueueBinding binding,
@@ -155,6 +159,8 @@
                              final Executor executor,
                              final ManagementService managementService) throws Exception
    {
+      this.server = server;
+      
       this.id = id;
 
       this.replicatedSessionID = replicatedSessionID;
@@ -251,7 +257,7 @@
 
          props.putIntProperty(ManagementHelper.HDR_CONSUMER_COUNT, messageQueue.getConsumerCount());
 
-         Notification notification = new Notification(NotificationType.CONSUMER_CLOSED, props);
+         Notification notification = new Notification(null, NotificationType.CONSUMER_CLOSED, props);
 
          managementService.sendNotification(notification);
       }
@@ -409,13 +415,27 @@
 
    public void deliverReplicated(final long messageID) throws Exception
    {      
-      MessageReference ref = removeFirstReference(messageID);
+      MessageReference ref = messageQueue.removeFirstReference(messageID);
 
       if (ref == null)
       {
-         throw new IllegalStateException("Cannot find ref when replicating delivery " + messageID +
-                                         " queue" +
-                                         messageQueue.getName());
+         // The order is correct, but it hasn't been depaged yet, so we need to force a depage
+         PagingStore store = pagingManager.getPageStore(binding.getAddress());
+         
+         // force a depage, then retry the lookup with the message id (not this consumer's own id field)
+         if (!store.readPage()) // This returns false if there are no pages
+         {
+            throw new IllegalStateException("Cannot find ref " + messageID + " server " + System.identityHashCode(server) + " queue " + this.messageQueue.getName());
+         }
+         else
+         {
+            ref = messageQueue.removeFirstReference(messageID);
+            
+            if (ref == null)
+            {
+               throw new IllegalStateException("Cannot find ref after depaging");
+            }
+         }
       }
 
       // We call doHandle rather than handle, since we don't want to check available credits
@@ -457,34 +477,6 @@
 
    // Private --------------------------------------------------------------------------------------
 
-   private MessageReference removeFirstReference(final long id) throws Exception
-   {
-      MessageReference ref = messageQueue.removeFirstReference(id);
-
-      if (ref == null)
-      {
-         // The order is correct, but it hasn't been depaged yet, so we need to force a depage
-         PagingStore store = pagingManager.getPageStore(binding.getAddress());
-         
-         // force a depage
-         if (!store.readPage()) // This returns false if there are no pages
-         {
-            throw new IllegalStateException("Cannot find page " + id);
-         }
-         else
-         {
-            ref = messageQueue.removeFirstReference(id);
-            
-            if (ref == null)
-            {
-               throw new IllegalStateException("Cannot find ref after depaging");
-            }
-         }
-      }
-
-      return ref;
-   }
-   
    private void promptDelivery()
    {
       lock.lock();

Modified: trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java	2009-04-03 14:02:02 UTC (rev 6296)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java	2009-04-03 14:08:00 UTC (rev 6297)
@@ -1208,7 +1208,7 @@
             theQueue = (Queue)binding.getBindable();
          }
 
-         ServerConsumer consumer = new ServerConsumerImpl(idGenerator.generateID(),
+         ServerConsumer consumer = new ServerConsumerImpl(server, idGenerator.generateID(),
                                                           oppositeChannelID,
                                                           this,
                                                           (QueueBinding)binding,
@@ -1245,7 +1245,7 @@
                props.putStringProperty(ManagementHelper.HDR_FILTERSTRING, filterString);
             }
 
-            Notification notification = new Notification(CONSUMER_CREATED, props);
+            Notification notification = new Notification(null, CONSUMER_CREATED, props);
 
             managementService.sendNotification(notification);
          }

Modified: trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionPacketHandler.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionPacketHandler.java	2009-04-03 14:02:02 UTC (rev 6296)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionPacketHandler.java	2009-04-03 14:08:00 UTC (rev 6297)
@@ -86,14 +86,9 @@
 
    private final ServerSession session;
 
-   private final Channel channel;
-
-   public ServerSessionPacketHandler(final ServerSession session, final Channel channel)
-
+   public ServerSessionPacketHandler(final ServerSession session)
    {
       this.session = session;
-
-      this.channel = channel;
    }
 
    public long getID()
@@ -121,8 +116,8 @@
                break;
             }
             case CREATE_QUEUE:
-            {
-               CreateQueueMessage request = (CreateQueueMessage)packet;
+            {               
+               CreateQueueMessage request = (CreateQueueMessage)packet;               
                session.handleCreateQueue(request);             
                break;
             }

Modified: trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/ClusterTestBase.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/ClusterTestBase.java	2009-04-03 14:02:02 UTC (rev 6296)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/ClusterTestBase.java	2009-04-03 14:08:00 UTC (rev 6297)
@@ -175,7 +175,7 @@
                                   final int consumerCount,
                                   final boolean local) throws Exception
    {
-//      log.info("waiting for bindings on node " + node +
+//       log.info("waiting for bindings on node " + node +
 //               " address " +
 //               address +
 //               " count " +
@@ -572,7 +572,13 @@
                {
                   message.acknowledge();
                }
+               
+               //log.info("consumer " + consumerIDs[i] +" returns " + count);
             }
+            else
+            {
+              // log.info("consumer " + consumerIDs[i] +" returns null");
+            }
          }
          while (message != null);
       }
@@ -792,7 +798,7 @@
          serverBackuptc = new TransportConfiguration(INVM_CONNECTOR_FACTORY, backupParams);
       }
 
-      ClientSessionFactory sf = new ClientSessionFactoryImpl(serverTotc, serverBackuptc);
+      ClientSessionFactory sf = new ClientSessionFactoryImpl(serverTotc, serverBackuptc, false, 100, 1d, -1);
 
       sf.setBlockOnNonPersistentSend(blocking);
       sf.setBlockOnPersistentSend(blocking);
@@ -840,13 +846,13 @@
       Configuration configuration = new ConfigurationImpl();
 
       configuration.setSecurityEnabled(false);
-      configuration.setBindingsDirectory(getBindingsDir(node));
+      configuration.setBindingsDirectory(getBindingsDir(node, backup));
       configuration.setJournalMinFiles(2);
       configuration.setJournalDirectory(getJournalDir(node, backup));
       configuration.setJournalFileSize(100 * 1024);
       configuration.setJournalType(JournalType.NIO);
-      configuration.setPagingDirectory(getPageDir(node));
-      configuration.setLargeMessagesDirectory(getLargeMessagesDir(node));
+      configuration.setPagingDirectory(getPageDir(node, backup));
+      configuration.setLargeMessagesDirectory(getLargeMessagesDir(node, backup));
       configuration.setClustered(true);
       configuration.setBackup(backup);
 
@@ -898,8 +904,34 @@
       servers[node] = server;
    }
 
-   protected void setupServerWithDiscovery(int node, String groupAddress, int port, boolean fileStorage, boolean netty)
+   protected void setupServerWithDiscovery(int node,
+                                           String groupAddress,
+                                           int port,
+                                           boolean fileStorage,
+                                           boolean netty,
+                                           boolean backup)
    {
+      this.setupServerWithDiscovery(node, groupAddress, port, fileStorage, netty, backup, -1);
+   }
+
+   protected void setupServerWithDiscovery(int node,
+                                           String groupAddress,
+                                           int port,
+                                           boolean fileStorage,
+                                           boolean netty,
+                                           int backupNode)
+   {
+      this.setupServerWithDiscovery(node, groupAddress, port, fileStorage, netty, false, backupNode);
+   }
+
+   protected void setupServerWithDiscovery(int node,
+                                           String groupAddress,
+                                           int port,
+                                           boolean fileStorage,
+                                           boolean netty,
+                                           boolean backup,
+                                           int backupNode)
+   {
       if (servers[node] != null)
       {
          throw new IllegalArgumentException("Already a server at node " + node);
@@ -908,14 +940,40 @@
       Configuration configuration = new ConfigurationImpl();
 
       configuration.setSecurityEnabled(false);
-      configuration.setBindingsDirectory(getBindingsDir(node));
+      configuration.setBindingsDirectory(getBindingsDir(node, false));
       configuration.setJournalMinFiles(2);
       configuration.setJournalDirectory(getJournalDir(node, false));
       configuration.setJournalFileSize(100 * 1024);
-      configuration.setPagingDirectory(getPageDir(node));
-      configuration.setLargeMessagesDirectory(getLargeMessagesDir(node));
+      configuration.setPagingDirectory(getPageDir(node, false));
+      configuration.setLargeMessagesDirectory(getLargeMessagesDir(node, false));
       configuration.setClustered(true);
+      configuration.setBackup(backup);
 
+      TransportConfiguration nettyBackuptc = null;
+      TransportConfiguration invmBackuptc = null;
+      
+      if (backupNode != -1)
+      {
+         Map<String, Object> backupParams = generateParams(backupNode, netty);
+
+         if (netty)
+         {
+            nettyBackuptc = new TransportConfiguration(NETTY_CONNECTOR_FACTORY, backupParams);
+
+            configuration.getConnectorConfigurations().put(nettyBackuptc.getName(), nettyBackuptc);
+
+            configuration.setBackupConnectorName(nettyBackuptc.getName());
+         }
+         else
+         {
+            invmBackuptc = new TransportConfiguration(INVM_CONNECTOR_FACTORY, backupParams);
+
+            configuration.getConnectorConfigurations().put(invmBackuptc.getName(), invmBackuptc);
+
+            configuration.setBackupConnectorName(invmBackuptc.getName());
+         }
+      }
+
       configuration.getAcceptorConfigurations().clear();
 
       Map<String, Object> params = generateParams(node, netty);
@@ -939,11 +997,11 @@
          TransportConfiguration nettytc_c = new TransportConfiguration(NETTY_CONNECTOR_FACTORY, params);
          configuration.getConnectorConfigurations().put(nettytc_c.getName(), nettytc_c);
 
-         connectorPairs.add(new Pair<String, String>(nettytc_c.getName(), null));
+         connectorPairs.add(new Pair<String, String>(nettytc_c.getName(), nettyBackuptc == null ? null : nettyBackuptc.getName()));
       }
       else
       {
-         connectorPairs.add(new Pair<String, String>(invmtc_c.getName(), null));
+         connectorPairs.add(new Pair<String, String>(invmtc_c.getName(), invmBackuptc == null ? null : invmBackuptc.getName()));
       }
 
       BroadcastGroupConfiguration bcConfig = new BroadcastGroupConfiguration("bg1",
@@ -998,6 +1056,14 @@
          servers[nodes[i]] = null;
       }
    }
+   
+   protected void clearAllServers()
+   {
+      for (int i = 0; i < servers.length; i++)
+      {
+         servers[i] = null;
+      }
+   }
 
    protected void setupClusterConnection(String name,
                                          int nodeFrom,

Modified: trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/SymmetricClusterWithDiscoveryTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/SymmetricClusterWithDiscoveryTest.java	2009-04-03 14:02:02 UTC (rev 6296)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/SymmetricClusterWithDiscoveryTest.java	2009-04-03 14:08:00 UTC (rev 6297)
@@ -37,9 +37,9 @@
 {
    private static final Logger log = Logger.getLogger(SymmetricClusterWithDiscoveryTest.class);
    
-   private static final String groupAddress = "230.1.2.3";
+   protected static final String groupAddress = "230.1.2.3";
    
-   private static final int groupPort = 6745;
+   protected static final int groupPort = 6745;
 
    protected boolean isNetty()
    {
@@ -74,11 +74,11 @@
    @Override
    protected void setupServers() throws Exception
    {
-      setupServerWithDiscovery(0, groupAddress, groupPort, isFileStorage(), isNetty());
-      setupServerWithDiscovery(1, groupAddress, groupPort, isFileStorage(), isNetty());
-      setupServerWithDiscovery(2, groupAddress, groupPort, isFileStorage(), isNetty());
-      setupServerWithDiscovery(3, groupAddress, groupPort, isFileStorage(), isNetty());
-      setupServerWithDiscovery(4, groupAddress, groupPort, isFileStorage(), isNetty()); 
+      setupServerWithDiscovery(0, groupAddress, groupPort, isFileStorage(), isNetty(), false);
+      setupServerWithDiscovery(1, groupAddress, groupPort, isFileStorage(), isNetty(), false);
+      setupServerWithDiscovery(2, groupAddress, groupPort, isFileStorage(), isNetty(), false);
+      setupServerWithDiscovery(3, groupAddress, groupPort, isFileStorage(), isNetty(), false);
+      setupServerWithDiscovery(4, groupAddress, groupPort, isFileStorage(), isNetty(), false); 
    }
      
 

Modified: trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/ClusterWithBackupFailoverTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/ClusterWithBackupFailoverTest.java	2009-04-03 14:02:02 UTC (rev 6296)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/ClusterWithBackupFailoverTest.java	2009-04-03 14:08:00 UTC (rev 6297)
@@ -27,6 +27,7 @@
 import org.jboss.messaging.core.client.impl.ConnectionManagerImpl;
 import org.jboss.messaging.core.config.TransportConfiguration;
 import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.core.server.MessagingServer;
 import org.jboss.messaging.tests.integration.cluster.distribution.ClusterTestBase;
 
 /**
@@ -47,7 +48,7 @@
    protected void setUp() throws Exception
    {
       super.setUp();
-      
+
       ConnectionManagerImpl.enableDebug();
 
       setupServers();
@@ -70,96 +71,76 @@
    {
       return false;
    }
-   
-   private void failNode(int node)
+
+   public void testFailAllNodes() throws Exception
    {
-      Map<String, Object> params = generateParams(node, isNetty());
+      this.setupCluster();
 
-      TransportConfiguration serverTC;
+      startServers(3, 4, 5, 0, 1, 2);
 
-      if (isNetty())
-      {
-         serverTC = new TransportConfiguration(NETTY_CONNECTOR_FACTORY, params);
-      }
-      else
-      {
-         serverTC = new TransportConfiguration(INVM_CONNECTOR_FACTORY, params);
-      }
+      setupSessionFactory(0, 3, isNetty(), false);
+      setupSessionFactory(1, 4, isNetty(), false);
+      setupSessionFactory(2, 5, isNetty(), false);
+
+      createQueue(0, "queues.testaddress", "queue0", null, false);
+      createQueue(1, "queues.testaddress", "queue0", null, false);
+      createQueue(2, "queues.testaddress", "queue0", null, false);
+
+      addConsumer(0, 0, "queue0", null);
+      addConsumer(1, 1, "queue0", null);
+      addConsumer(2, 2, "queue0", null);
+
+      waitForBindings(0, "queues.testaddress", 1, 1, true);
+      waitForBindings(1, "queues.testaddress", 1, 1, true);
+      waitForBindings(2, "queues.testaddress", 1, 1, true);
+
+      waitForBindings(0, "queues.testaddress", 2, 2, false);
+      waitForBindings(1, "queues.testaddress", 2, 2, false);
+      waitForBindings(2, "queues.testaddress", 2, 2, false);
+
+      send(0, "queues.testaddress", 10, false, null);
+      verifyReceiveRoundRobinInSomeOrder(10, 0, 1, 2);
+
+      send(1, "queues.testaddress", 10, false, null);
+      verifyReceiveRoundRobinInSomeOrder(10, 0, 1, 2);
+            
+      send(2, "queues.testaddress", 10, false, null);
+      verifyReceiveRoundRobinInSomeOrder(10, 0, 1, 2);
       
-      super.failNode(serverTC);
-   }
+      failNode(0);
 
-   public void testFailAllNodes() throws Exception
-   {           
-      //We do this in a loop a few times
-      
-      final int numIterations = 5;
-      
-      this.setupCluster();
-      
-      for (int i = 0; i < numIterations; i++)
-      {
-         log.info("Iteration " + i);
-         
-         startServers(3, 4, 5, 0, 1, 2);
-         
-         setupSessionFactory(0, 3, isNetty(), false);
-         setupSessionFactory(1, 4, isNetty(), false);
-         setupSessionFactory(2, 5, isNetty(), false);
-         
-         createQueue(0, "queues.testaddress", "queue0", null, false);
-         createQueue(1, "queues.testaddress", "queue0", null, false);
-         createQueue(2, "queues.testaddress", "queue0", null, false);
-   
-         addConsumer(0, 0, "queue0", null);
-         addConsumer(1, 1, "queue0", null);
-         addConsumer(2, 2, "queue0", null);
-   
-         waitForBindings(0, "queues.testaddress", 1, 1, true);
-         waitForBindings(1, "queues.testaddress", 1, 1, true);
-         waitForBindings(2, "queues.testaddress", 1, 1, true);
-   
-         waitForBindings(0, "queues.testaddress", 2, 2, false);
-         waitForBindings(1, "queues.testaddress", 2, 2, false);
-         waitForBindings(2, "queues.testaddress", 2, 2, false);
-   
-         send(0, "queues.testaddress", 10, false, null);
-   
-         verifyReceiveRoundRobinInSomeOrder(10, 0, 1, 2);
-   
-         verifyNotReceive(0, 1, 2);
-         
-         failNode(0);
-                     
-         send(0, "queues.testaddress", 10, false, null);
-         
-         verifyReceiveRoundRobinInSomeOrder(10, 0, 1, 2);
-   
-         verifyNotReceive(0, 1, 2);
-         
-         failNode(1);
-         
-         send(0, "queues.testaddress", 10, false, null);
-         
-         verifyReceiveRoundRobinInSomeOrder(10, 0, 1, 2);
-   
-         verifyNotReceive(0, 1, 2);
-         
-         failNode(2);
-         
-         send(0, "queues.testaddress", 10, false, null);
-         
-         verifyReceiveRoundRobinInSomeOrder(10, 0, 1, 2);
-   
-         verifyNotReceive(0, 1, 2);
-          
-         stopServers();
-         
-         //Need to reset backup status since they will have gone live
-         getServer(3).getConfiguration().setBackup(true);
-         getServer(4).getConfiguration().setBackup(true);
-         getServer(5).getConfiguration().setBackup(true);
-      }            
+      send(0, "queues.testaddress", 10, false, null);
+      verifyReceiveRoundRobinInSomeOrder(10, 0, 1, 2);
+
+      send(1, "queues.testaddress", 10, false, null);
+      verifyReceiveRoundRobinInSomeOrder(10, 0, 1, 2);
+            
+      send(2, "queues.testaddress", 10, false, null);
+      verifyReceiveRoundRobinInSomeOrder(10, 0, 1, 2);
+
+      failNode(1);
+
+      send(0, "queues.testaddress", 10, false, null);
+      verifyReceiveRoundRobinInSomeOrder(10, 0, 1, 2);
+
+//      send(1, "queues.testaddress", 10, false, null);
+//      verifyReceiveRoundRobinInSomeOrder(10, 0, 1, 2);
+//            
+//      send(2, "queues.testaddress", 10, false, null);
+//      verifyReceiveRoundRobinInSomeOrder(10, 0, 1, 2);
+
+      failNode(2);
+
+      send(0, "queues.testaddress", 10, false, null);
+      verifyReceiveRoundRobinInSomeOrder(10, 0, 1, 2);
+
+//      send(1, "queues.testaddress", 10, false, null);
+//      verifyReceiveRoundRobinInSomeOrder(10, 0, 1, 2);
+//            
+//      send(2, "queues.testaddress", 10, false, null);
+//      verifyReceiveRoundRobinInSomeOrder(10, 0, 1, 2);
+
+      stopServers();
    }
 
    protected void setupCluster() throws Exception
@@ -242,8 +223,36 @@
       closeAllConsumers();
 
       closeAllSessionFactories();
-      
+
       stopServers(0, 1, 2, 3, 4, 5);
    }
 
+   protected void failNode(int node) throws Exception
+   {
+      log.info("*** failing node " + node);
+
+      Map<String, Object> params = generateParams(node, isNetty());
+
+      TransportConfiguration serverTC;
+
+      if (isNetty())
+      {
+         serverTC = new TransportConfiguration(NETTY_CONNECTOR_FACTORY, params);
+      }
+      else
+      {
+         serverTC = new TransportConfiguration(INVM_CONNECTOR_FACTORY, params);
+      }
+      
+      MessagingServer server = getServer(node);
+      
+      //Prevent the remoting service from taking any more connections
+      server.getRemotingService().freeze();
+      
+      server.getClusterManager().stop();
+
+      //Fail all client connections that go to this node
+      super.failNode(serverTC);
+   }
+
 }

Added: trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/DiscoveryClusterWithBackupFailoverTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/DiscoveryClusterWithBackupFailoverTest.java	                        (rev 0)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/DiscoveryClusterWithBackupFailoverTest.java	2009-04-03 14:08:00 UTC (rev 6297)
@@ -0,0 +1,170 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2009, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.messaging.tests.integration.cluster.failover;
+
+import org.jboss.messaging.core.logging.Logger;
+
+/**
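+ * Variant of ClusterWithBackupFailoverTest whose servers and cluster connections are set up via discovery; repeats the fail-all-nodes scenario five times.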
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ *
+ *
+ */
+public class DiscoveryClusterWithBackupFailoverTest extends ClusterWithBackupFailoverTest
+{
+   private static final Logger log = Logger.getLogger(DiscoveryClusterWithBackupFailoverTest.class);
+
+   protected static final String groupAddress = "230.1.2.3";
+
+   protected static final int groupPort = 6745;
+
+   protected boolean isNetty()
+   {
+      return false;
+   }
+
+   protected boolean isFileStorage()
+   {
+      return false;
+   }
+
+   @Override
+   public void testFailAllNodes() throws Exception
+   {
+      for (int i = 0; i < 5; i++)
+      {
+         log.info("*** iteration " + i);
+
+         tearDown();
+
+         super.clearAllServers();
+
+         setUp();
+
+         this.setupCluster();
+
+         startServers(3, 4, 5, 0, 1, 2);
+
+         setupSessionFactory(0, 3, isNetty(), false);
+         setupSessionFactory(1, 4, isNetty(), false);
+         setupSessionFactory(2, 5, isNetty(), false);
+
+         createQueue(0, "queues.testaddress", "queue0", null, false);
+         createQueue(1, "queues.testaddress", "queue0", null, false);
+         createQueue(2, "queues.testaddress", "queue0", null, false);
+
+         addConsumer(0, 0, "queue0", null);
+         addConsumer(1, 1, "queue0", null);
+         addConsumer(2, 2, "queue0", null);
+
+         waitForBindings(0, "queues.testaddress", 1, 1, true);
+         waitForBindings(1, "queues.testaddress", 1, 1, true);
+         waitForBindings(2, "queues.testaddress", 1, 1, true);
+
+         waitForBindings(0, "queues.testaddress", 2, 2, false);
+         waitForBindings(1, "queues.testaddress", 2, 2, false);
+         waitForBindings(2, "queues.testaddress", 2, 2, false);
+
+         send(0, "queues.testaddress", 10, false, null);
+         verifyReceiveRoundRobinInSomeOrder(10, 0, 1, 2);
+
+         send(1, "queues.testaddress", 10, false, null);
+         verifyReceiveRoundRobinInSomeOrder(10, 0, 1, 2);
+
+         send(2, "queues.testaddress", 10, false, null);
+         verifyReceiveRoundRobinInSomeOrder(10, 0, 1, 2);
+
+         failNode(0);
+
+         send(0, "queues.testaddress", 10, false, null);
+         verifyReceiveRoundRobinInSomeOrder(10, 0, 1, 2);
+
+         send(1, "queues.testaddress", 10, false, null);
+         verifyReceiveRoundRobinInSomeOrder(10, 0, 1, 2);
+
+         send(2, "queues.testaddress", 10, false, null);
+         verifyReceiveRoundRobinInSomeOrder(10, 0, 1, 2);
+
+         failNode(1);
+
+         send(0, "queues.testaddress", 10, false, null);
+         verifyReceiveRoundRobinInSomeOrder(10, 0, 1, 2);
+
+         send(1, "queues.testaddress", 10, false, null);
+         verifyReceiveRoundRobinInSomeOrder(10, 0, 1, 2);
+
+         send(2, "queues.testaddress", 10, false, null);
+         verifyReceiveRoundRobinInSomeOrder(10, 0, 1, 2);
+
+         failNode(2);
+
+         send(0, "queues.testaddress", 10, false, null);
+         verifyReceiveRoundRobinInSomeOrder(10, 0, 1, 2);
+
+         send(1, "queues.testaddress", 10, false, null);
+         verifyReceiveRoundRobinInSomeOrder(10, 0, 1, 2);
+
+         send(2, "queues.testaddress", 10, false, null);
+         verifyReceiveRoundRobinInSomeOrder(10, 0, 1, 2);
+
+         stopServers();
+      }
+
+   }
+
+   @Override
+   protected void setupCluster(final boolean forwardWhenNoConsumers) throws Exception
+   {
+      // The lives
+
+      setupDiscoveryClusterConnection("cluster0", 0, "dg1", "queues", forwardWhenNoConsumers, 1, isNetty());
+
+      setupDiscoveryClusterConnection("cluster1", 1, "dg1", "queues", forwardWhenNoConsumers, 1, isNetty());
+
+      setupDiscoveryClusterConnection("cluster2", 2, "dg1", "queues", forwardWhenNoConsumers, 1, isNetty());
+
+      // The backups
+
+      setupDiscoveryClusterConnection("cluster0", 3, "dg1", "queues", forwardWhenNoConsumers, 1, isNetty());
+
+      setupDiscoveryClusterConnection("cluster1", 4, "dg1", "queues", forwardWhenNoConsumers, 1, isNetty());
+
+      setupDiscoveryClusterConnection("cluster2", 5, "dg1", "queues", forwardWhenNoConsumers, 1, isNetty());
+   }
+
+   @Override
+   protected void setupServers() throws Exception
+   {
+      // The lives
+      setupServerWithDiscovery(0, groupAddress, groupPort, isFileStorage(), isNetty(), 3);
+      setupServerWithDiscovery(1, groupAddress, groupPort, isFileStorage(), isNetty(), 4);
+      setupServerWithDiscovery(2, groupAddress, groupPort, isFileStorage(), isNetty(), 5);
+
+      // The backups
+      setupServerWithDiscovery(3, groupAddress, groupPort, isFileStorage(), isNetty(), true);
+      setupServerWithDiscovery(4, groupAddress, groupPort, isFileStorage(), isNetty(), true);
+      setupServerWithDiscovery(5, groupAddress, groupPort, isFileStorage(), isNetty(), true);
+   }
+
+}

Added: trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/FileStorageClusterWithBackupFailoverTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/FileStorageClusterWithBackupFailoverTest.java	                        (rev 0)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/FileStorageClusterWithBackupFailoverTest.java	2009-04-03 14:08:00 UTC (rev 6297)
@@ -0,0 +1,44 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2009, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+
+package org.jboss.messaging.tests.integration.cluster.failover;
+
+/**
+ * Variant of ClusterWithBackupFailoverTest with isNetty() returning false and isFileStorage() returning true.
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ *
+ *
+ */
+public class FileStorageClusterWithBackupFailoverTest extends ClusterWithBackupFailoverTest
+{
+   protected boolean isNetty()
+   {
+      return false;
+   }
+
+   protected boolean isFileStorage()
+   {
+      return true;
+   }
+}

Added: trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/NettyClusterWithBackupFailoverTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/NettyClusterWithBackupFailoverTest.java	                        (rev 0)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/NettyClusterWithBackupFailoverTest.java	2009-04-03 14:08:00 UTC (rev 6297)
@@ -0,0 +1,47 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2009, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+
+package org.jboss.messaging.tests.integration.cluster.failover;
+
+/**
+ * Variant of ClusterWithBackupFailoverTest with isNetty() returning true and isFileStorage() returning false.
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ *
+ *
+ */
+public class NettyClusterWithBackupFailoverTest extends ClusterWithBackupFailoverTest
+{
+   protected boolean isNetty()
+   {
+      return true;
+   }
+
+   protected boolean isFileStorage()
+   {
+      return false;
+   }
+   
+}
+
+

Added: trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/NettyFileStorageClusterWithBackupFailoverTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/NettyFileStorageClusterWithBackupFailoverTest.java	                        (rev 0)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/NettyFileStorageClusterWithBackupFailoverTest.java	2009-04-03 14:08:00 UTC (rev 6297)
@@ -0,0 +1,44 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2009, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+
+package org.jboss.messaging.tests.integration.cluster.failover;
+
+/**
+ * Variant of ClusterWithBackupFailoverTest with both isNetty() and isFileStorage() returning true.
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ *
+ *
+ */
+public class NettyFileStorageClusterWithBackupFailoverTest extends ClusterWithBackupFailoverTest 
+{
+   protected boolean isNetty()
+   {
+      return true;
+   }
+
+   protected boolean isFileStorage()
+   {
+      return true;
+   }
+}

Modified: trunk/tests/src/org/jboss/messaging/tests/integration/discovery/DiscoveryTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/discovery/DiscoveryTest.java	2009-04-03 14:02:02 UTC (rev 6296)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/discovery/DiscoveryTest.java	2009-04-03 14:08:00 UTC (rev 6297)
@@ -26,9 +26,9 @@
 
 import java.net.InetAddress;
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
 
+import org.jboss.messaging.core.cluster.DiscoveryEntry;
 import org.jboss.messaging.core.cluster.DiscoveryGroup;
 import org.jboss.messaging.core.cluster.DiscoveryListener;
 import org.jboss.messaging.core.cluster.impl.DiscoveryGroupImpl;
@@ -68,8 +68,10 @@
       final InetAddress groupAddress = InetAddress.getByName(address1);
       final int groupPort = 6745;
       final int timeout = 500;
+      
+      final String nodeID = randomString();
 
-      BroadcastGroup bg = new BroadcastGroupImpl(randomString(), randomString(), -1, groupAddress, groupPort, true);
+      BroadcastGroup bg = new BroadcastGroupImpl(nodeID, randomString(), -1, groupAddress, groupPort, true);
 
       bg.start();
 
@@ -92,15 +94,17 @@
 
       assertTrue(ok);
 
-      List<Pair<TransportConfiguration, TransportConfiguration>> connectors = dg.getConnectors();
+      Map<String, DiscoveryEntry> entryMap = dg.getDiscoveryEntryMap();
 
-      assertNotNull(connectors);
+      assertNotNull(entryMap);
 
-      assertEquals(1, connectors.size());
+      assertEquals(1, entryMap.size());
 
-      Pair<TransportConfiguration, TransportConfiguration> receivedPair = connectors.get(0);
+      DiscoveryEntry entry = entryMap.get(nodeID);
+      
+      assertNotNull(entry);
 
-      assertEquals(connectorPair, receivedPair);
+      assertEquals(connectorPair, entry.getConnectorPair());
 
       bg.stop();
 
@@ -113,8 +117,10 @@
       final InetAddress groupAddress = InetAddress.getByName(address1);
       final int groupPort = 6745;
       final int timeout = 500;
+      
+      final String nodeID = randomString();
 
-      BroadcastGroup bg = new BroadcastGroupImpl(randomString(), randomString(), -1, groupAddress, groupPort, true);
+      BroadcastGroup bg = new BroadcastGroupImpl(nodeID, randomString(), -1, groupAddress, groupPort, true);
 
       bg.start();
 
@@ -137,15 +143,17 @@
 
       assertTrue(ok);
 
-      List<Pair<TransportConfiguration, TransportConfiguration>> connectors = dg.getConnectors();
+      Map<String, DiscoveryEntry> entryMap = dg.getDiscoveryEntryMap();
 
-      assertNotNull(connectors);
+      assertNotNull(entryMap);
 
-      assertEquals(1, connectors.size());
+      assertEquals(1, entryMap.size());
 
-      Pair<TransportConfiguration, TransportConfiguration> receivedPair = connectors.get(0);
+      DiscoveryEntry entry = entryMap.get(nodeID);
+      
+      assertNotNull(entry);
 
-      assertEquals(connectorPair, receivedPair);
+      assertEquals(connectorPair, entry.getConnectorPair());
 
       bg.stop();
 
@@ -161,15 +169,17 @@
 
       assertTrue(ok);
 
-      connectors = dg.getConnectors();
+      entryMap = dg.getDiscoveryEntryMap();
 
-      assertNotNull(connectors);
+      assertNotNull(entryMap);
 
-      assertEquals(1, connectors.size());
+      assertEquals(1, entryMap.size());
 
-      receivedPair = connectors.get(0);
+      entry = entryMap.get(nodeID);
+      
+      assertNotNull(entry);
 
-      assertEquals(connectorPair, receivedPair);
+      assertEquals(connectorPair, entry.getConnectorPair());
 
    }
    
@@ -204,11 +214,11 @@
 
       assertFalse(ok);
 
-      List<Pair<TransportConfiguration, TransportConfiguration>> connectors = dg.getConnectors();
+      Map<String, DiscoveryEntry> entryMap = dg.getDiscoveryEntryMap();
 
-      assertNotNull(connectors);
+      assertNotNull(entryMap);
 
-      assertEquals(0, connectors.size());
+      assertEquals(0, entryMap.size());
 
       bg.stop();
 
@@ -342,14 +352,20 @@
       final int groupPort3 = 6747;
 
       final int timeout = 500;
+      
+      String node1 = randomString();
+      
+      String node2 = randomString();
+      
+      String node3 = randomString();
 
-      BroadcastGroup bg1 = new BroadcastGroupImpl(randomString(), randomString(), -1, groupAddress1, groupPort1, true);
+      BroadcastGroup bg1 = new BroadcastGroupImpl(node1, randomString(), -1, groupAddress1, groupPort1, true);
       bg1.start();
 
-      BroadcastGroup bg2 = new BroadcastGroupImpl(randomString(), randomString(), -1, groupAddress2, groupPort2, true);
+      BroadcastGroup bg2 = new BroadcastGroupImpl(node2, randomString(), -1, groupAddress2, groupPort2, true);
       bg2.start();
 
-      BroadcastGroup bg3 = new BroadcastGroupImpl(randomString(), randomString(), -1, groupAddress3, groupPort3, true);
+      BroadcastGroup bg3 = new BroadcastGroupImpl(node3, randomString(), -1, groupAddress3, groupPort3, true);
       bg3.start();
 
       TransportConfiguration live1 = generateTC();
@@ -391,27 +407,30 @@
 
       boolean ok = dg1.waitForBroadcast(1000);
       assertTrue(ok);
-      List<Pair<TransportConfiguration, TransportConfiguration>> connectors = dg1.getConnectors();
-      assertNotNull(connectors);
-      assertEquals(1, connectors.size());
-      Pair<TransportConfiguration, TransportConfiguration> receivedPair = connectors.get(0);
-      assertEquals(connectorPair1, receivedPair);
+      Map<String, DiscoveryEntry> entryMap = dg1.getDiscoveryEntryMap();
+      assertNotNull(entryMap);
+      assertEquals(1, entryMap.size());
+      DiscoveryEntry entry = entryMap.get(node1);      
+      assertNotNull(entry);
+      assertEquals(connectorPair1, entry.getConnectorPair());
 
       ok = dg2.waitForBroadcast(1000);
       assertTrue(ok);
-      connectors = dg2.getConnectors();
-      assertNotNull(connectors);
-      assertEquals(1, connectors.size());
-      receivedPair = connectors.get(0);
-      assertEquals(connectorPair2, receivedPair);
+      entryMap = dg2.getDiscoveryEntryMap();
+      assertNotNull(entryMap);
+      assertEquals(1, entryMap.size());
+      entry = entryMap.get(node2);      
+      assertNotNull(entry);
+      assertEquals(connectorPair2, entry.getConnectorPair());
 
       ok = dg3.waitForBroadcast(1000);
       assertTrue(ok);
-      connectors = dg3.getConnectors();
-      assertNotNull(connectors);
-      assertEquals(1, connectors.size());
-      receivedPair = connectors.get(0);
-      assertEquals(connectorPair3, receivedPair);
+      entryMap = dg3.getDiscoveryEntryMap();
+      assertNotNull(entryMap);
+      assertEquals(1, entryMap.size());
+      entry = entryMap.get(node3);      
+      assertNotNull(entry);
+      assertEquals(connectorPair3, entry.getConnectorPair());
 
       bg1.stop();
       bg2.stop();
@@ -427,8 +446,10 @@
       final InetAddress groupAddress = InetAddress.getByName(address1);
       final int groupPort = 6745;
       final int timeout = 500;
+      
+      String nodeID = randomString();
 
-      BroadcastGroup bg = new BroadcastGroupImpl(randomString(), randomString(), -1, groupAddress, groupPort, true);
+      BroadcastGroup bg = new BroadcastGroupImpl(nodeID, randomString(), -1, groupAddress, groupPort, true);
 
       bg.start();
 
@@ -449,16 +470,13 @@
 
       assertTrue(ok);
 
-      List<Pair<TransportConfiguration, TransportConfiguration>> connectors = dg.getConnectors();
+      Map<String, DiscoveryEntry> entryMap = dg.getDiscoveryEntryMap();
+      assertNotNull(entryMap);
+      assertEquals(1, entryMap.size());
+      DiscoveryEntry entry = entryMap.get(nodeID);      
+      assertNotNull(entry);
+      assertEquals(connectorPair, entry.getConnectorPair());
 
-      assertNotNull(connectors);
-
-      assertEquals(1, connectors.size());
-
-      Pair<TransportConfiguration, TransportConfiguration> receivedPair = connectors.get(0);
-
-      assertEquals(connectorPair, receivedPair);
-
       bg.stop();
 
       dg.stop();
@@ -470,8 +488,10 @@
       final InetAddress groupAddress = InetAddress.getByName(address1);
       final int groupPort = 6745;
       final int timeout = 500;
+      
+      String nodeID = randomString();
 
-      BroadcastGroup bg = new BroadcastGroupImpl(randomString(), randomString(), -1, groupAddress, groupPort, true);
+      BroadcastGroup bg = new BroadcastGroupImpl(nodeID, randomString(), -1, groupAddress, groupPort, true);
 
       bg.start();
 
@@ -515,66 +535,9 @@
       assertFalse(listener2.called);
       assertFalse(listener3.called);
 
-      listener1.called = false;
-      listener2.called = false;
-      listener3.called = false;
-
-      TransportConfiguration live2 = generateTC();
-
-      Pair<TransportConfiguration, TransportConfiguration> connectorPair2 = new Pair<TransportConfiguration, TransportConfiguration>(live2,
-                                                                                                                                     null);
-
-      bg.addConnectorPair(connectorPair2);
-
-      dg.unregisterListener(listener1);
-
-      bg.broadcastConnectors();
-      ok = dg.waitForBroadcast(1000);
-      assertTrue(ok);
-
-      assertFalse(listener1.called);
-      assertTrue(listener2.called);
-      assertTrue(listener3.called);
-
-      listener1.called = false;
-      listener2.called = false;
-      listener3.called = false;
-
-      dg.unregisterListener(listener2);
-
-      bg.broadcastConnectors();
-      ok = dg.waitForBroadcast(1000);
-      assertTrue(ok);
-
-      assertFalse(listener1.called);
-      assertFalse(listener2.called);
-      assertFalse(listener3.called);
-
-      listener1.called = false;
-      listener2.called = false;
-      listener3.called = false;
-
-      TransportConfiguration live4 = generateTC();
-
-      Pair<TransportConfiguration, TransportConfiguration> connectorPair4 = new Pair<TransportConfiguration, TransportConfiguration>(live4,
-                                                                                                                                     null);
-
-      bg.addConnectorPair(connectorPair4);
-
-      dg.unregisterListener(listener3);
-
-      bg.broadcastConnectors();
-      ok = dg.waitForBroadcast(1000);
-      assertTrue(ok);
-
-      assertFalse(listener1.called);
-      assertFalse(listener2.called);
-      assertFalse(listener3.called);
-
       bg.stop();
 
       dg.stop();
-
    }
 
    public void testConnectorsUpdatedMultipleBroadcasters() throws Exception
@@ -582,14 +545,18 @@
       final InetAddress groupAddress = InetAddress.getByName(address1);
       final int groupPort = 6745;
       final int timeout = 500;
+      
+      String node1 = randomString();
+      String node2 = randomString();
+      String node3 = randomString();
 
-      BroadcastGroup bg1 = new BroadcastGroupImpl(randomString(), randomString(), -1, groupAddress, groupPort, true);
+      BroadcastGroup bg1 = new BroadcastGroupImpl(node1, randomString(), -1, groupAddress, groupPort, true);
       bg1.start();
 
-      BroadcastGroup bg2 = new BroadcastGroupImpl(randomString(), randomString(), -1, groupAddress, groupPort, true);
+      BroadcastGroup bg2 = new BroadcastGroupImpl(node2, randomString(), -1, groupAddress, groupPort, true);
       bg2.start();
 
-      BroadcastGroup bg3 = new BroadcastGroupImpl(randomString(), randomString(), -1, groupAddress, groupPort, true);
+      BroadcastGroup bg3 = new BroadcastGroupImpl(node3, randomString(), -1, groupAddress, groupPort, true);
       bg3.start();
 
       TransportConfiguration live1 = generateTC();
@@ -622,10 +589,12 @@
       bg1.broadcastConnectors();
       boolean ok = dg.waitForBroadcast(1000);
       assertTrue(ok);
-      List<Pair<TransportConfiguration, TransportConfiguration>> connectors = dg.getConnectors();
-      assertNotNull(connectors);
-      assertEquals(1, connectors.size());
-      assertTrue(connectors.contains(connectorPair1));
+      Map<String, DiscoveryEntry> entryMap = dg.getDiscoveryEntryMap();
+      assertNotNull(entryMap);
+      assertEquals(1, entryMap.size());
+      DiscoveryEntry entry = entryMap.get(node1);      
+      assertNotNull(entry);
+      assertEquals(connectorPair1, entry.getConnectorPair());
       assertTrue(listener1.called);
       assertTrue(listener2.called);
       listener1.called = false;
@@ -634,11 +603,15 @@
       bg2.broadcastConnectors();
       ok = dg.waitForBroadcast(1000);
       assertTrue(ok);
-      connectors = dg.getConnectors();
-      assertNotNull(connectors);
-      assertEquals(2, connectors.size());
-      assertTrue(connectors.contains(connectorPair1));
-      assertTrue(connectors.contains(connectorPair2));
+      entryMap = dg.getDiscoveryEntryMap();
+      assertNotNull(entryMap);
+      assertEquals(2, entryMap.size());      
+      DiscoveryEntry entry1 = entryMap.get(node1);      
+      assertNotNull(entry1);
+      assertEquals(connectorPair1, entry1.getConnectorPair());
+      DiscoveryEntry entry2 = entryMap.get(node2);      
+      assertNotNull(entry2);
+      assertEquals(connectorPair2, entry2.getConnectorPair());
       assertTrue(listener1.called);
       assertTrue(listener2.called);
       listener1.called = false;
@@ -647,12 +620,18 @@
       bg3.broadcastConnectors();
       ok = dg.waitForBroadcast(1000);
       assertTrue(ok);
-      connectors = dg.getConnectors();
-      assertNotNull(connectors);
-      assertEquals(3, connectors.size());
-      assertTrue(connectors.contains(connectorPair1));
-      assertTrue(connectors.contains(connectorPair2));
-      assertTrue(connectors.contains(connectorPair3));
+      entryMap = dg.getDiscoveryEntryMap();
+      assertNotNull(entryMap);
+      assertEquals(3, entryMap.size());      
+      entry1 = entryMap.get(node1);      
+      assertNotNull(entry1);
+      assertEquals(connectorPair1, entry1.getConnectorPair());
+      entry2 = entryMap.get(node2);      
+      assertNotNull(entry2);
+      assertEquals(connectorPair2, entry2.getConnectorPair());
+      DiscoveryEntry entry3 = entryMap.get(node3);      
+      assertNotNull(entry3);
+      assertEquals(connectorPair3, entry3.getConnectorPair());
       assertTrue(listener1.called);
       assertTrue(listener2.called);
       listener1.called = false;
@@ -661,12 +640,18 @@
       bg1.broadcastConnectors();
       ok = dg.waitForBroadcast(1000);
       assertTrue(ok);
-      connectors = dg.getConnectors();
-      assertNotNull(connectors);
-      assertEquals(3, connectors.size());
-      assertTrue(connectors.contains(connectorPair1));
-      assertTrue(connectors.contains(connectorPair2));
-      assertTrue(connectors.contains(connectorPair3));
+      entryMap = dg.getDiscoveryEntryMap();
+      assertNotNull(entryMap);
+      assertEquals(3, entryMap.size());      
+      entry1 = entryMap.get(node1);      
+      assertNotNull(entry1);
+      assertEquals(connectorPair1, entry1.getConnectorPair());
+      entry2 = entryMap.get(node2);      
+      assertNotNull(entry2);
+      assertEquals(connectorPair2, entry2.getConnectorPair());
+      entry3 = entryMap.get(node3);      
+      assertNotNull(entry3);
+      assertEquals(connectorPair3, entry3.getConnectorPair());
       assertFalse(listener1.called);
       assertFalse(listener2.called);
       listener1.called = false;
@@ -675,12 +660,18 @@
       bg2.broadcastConnectors();
       ok = dg.waitForBroadcast(1000);
       assertTrue(ok);
-      connectors = dg.getConnectors();
-      assertNotNull(connectors);
-      assertEquals(3, connectors.size());
-      assertTrue(connectors.contains(connectorPair1));
-      assertTrue(connectors.contains(connectorPair2));
-      assertTrue(connectors.contains(connectorPair3));
+      entryMap = dg.getDiscoveryEntryMap();
+      assertNotNull(entryMap);
+      assertEquals(3, entryMap.size());      
+      entry1 = entryMap.get(node1);      
+      assertNotNull(entry1);
+      assertEquals(connectorPair1, entry1.getConnectorPair());
+      entry2 = entryMap.get(node2);      
+      assertNotNull(entry2);
+      assertEquals(connectorPair2, entry2.getConnectorPair());
+      entry3 = entryMap.get(node3);      
+      assertNotNull(entry3);
+      assertEquals(connectorPair3, entry3.getConnectorPair());
       assertFalse(listener1.called);
       assertFalse(listener2.called);
       listener1.called = false;
@@ -689,49 +680,43 @@
       bg3.broadcastConnectors();
       ok = dg.waitForBroadcast(1000);
       assertTrue(ok);
-      connectors = dg.getConnectors();
-      assertNotNull(connectors);
-      assertEquals(3, connectors.size());
-      assertTrue(connectors.contains(connectorPair1));
-      assertTrue(connectors.contains(connectorPair2));
-      assertTrue(connectors.contains(connectorPair3));
+      entryMap = dg.getDiscoveryEntryMap();
+      assertNotNull(entryMap);
+      assertEquals(3, entryMap.size());      
+      entry1 = entryMap.get(node1);      
+      assertNotNull(entry1);
+      assertEquals(connectorPair1, entry1.getConnectorPair());
+      entry2 = entryMap.get(node2);      
+      assertNotNull(entry2);
+      assertEquals(connectorPair2, entry2.getConnectorPair());
+      entry3 = entryMap.get(node3);      
+      assertNotNull(entry3);
+      assertEquals(connectorPair3, entry3.getConnectorPair());
       assertFalse(listener1.called);
       assertFalse(listener2.called);
       listener1.called = false;
       listener2.called = false;
-
-      TransportConfiguration live1_1 = generateTC();
-      TransportConfiguration backup1_1 = generateTC();
-      Pair<TransportConfiguration, TransportConfiguration> connectorPair1_1 = new Pair<TransportConfiguration, TransportConfiguration>(live1_1,
-                                                                                                                                       backup1_1);
-      bg1.addConnectorPair(connectorPair1_1);
-      bg1.broadcastConnectors();
-      ok = dg.waitForBroadcast(1000);
-      assertTrue(ok);
-      connectors = dg.getConnectors();
-      assertNotNull(connectors);
-      assertEquals(4, connectors.size());
-      assertTrue(connectors.contains(connectorPair1));
-      assertTrue(connectors.contains(connectorPair2));
-      assertTrue(connectors.contains(connectorPair3));
-      assertTrue(connectors.contains(connectorPair1_1));
-      assertTrue(listener1.called);
-      assertTrue(listener2.called);
-      listener1.called = false;
-      listener2.called = false;
-
+    
       bg2.removeConnectorPair(connectorPair2);
       bg2.broadcastConnectors();
       ok = dg.waitForBroadcast(1000);
       assertTrue(ok);
-      connectors = dg.getConnectors();
-      assertNotNull(connectors);
-      assertEquals(4, connectors.size());
-      assertTrue(connectors.contains(connectorPair1));
+
       // Connector2 should still be there since not timed out yet
-      assertTrue(connectors.contains(connectorPair2));
-      assertTrue(connectors.contains(connectorPair3));
-      assertTrue(connectors.contains(connectorPair1_1));
+
+      entryMap = dg.getDiscoveryEntryMap();
+      assertNotNull(entryMap);
+      assertEquals(3, entryMap.size());      
+      entry1 = entryMap.get(node1);      
+      assertNotNull(entry1);
+      assertEquals(connectorPair1, entry1.getConnectorPair());
+      entry2 = entryMap.get(node2);      
+      assertNotNull(entry2);
+      assertEquals(connectorPair2, entry2.getConnectorPair());
+      entry3 = entryMap.get(node3);      
+      assertNotNull(entry3);
+      assertEquals(connectorPair3, entry3.getConnectorPair());
+      
       assertFalse(listener1.called);
       assertFalse(listener2.called);
       listener1.called = false;
@@ -746,12 +731,16 @@
       bg3.broadcastConnectors();
       ok = dg.waitForBroadcast(1000);
 
-      connectors = dg.getConnectors();
-      assertNotNull(connectors);
-      assertEquals(3, connectors.size());
-      assertTrue(connectors.contains(connectorPair1));
-      assertTrue(connectors.contains(connectorPair3));
-      assertTrue(connectors.contains(connectorPair1_1));
+      entryMap = dg.getDiscoveryEntryMap();
+      assertNotNull(entryMap);
+      assertEquals(2, entryMap.size());      
+      entry1 = entryMap.get(node1);      
+      assertNotNull(entry1);
+      assertEquals(connectorPair1, entry1.getConnectorPair());     
+      entry3 = entryMap.get(node3);      
+      assertNotNull(entry3);
+      assertEquals(connectorPair3, entry3.getConnectorPair());
+      
       assertTrue(listener1.called);
       assertTrue(listener2.called);
       listener1.called = false;
@@ -769,19 +758,14 @@
       bg3.broadcastConnectors();
       ok = dg.waitForBroadcast(1000);
 
-      connectors = dg.getConnectors();
-      assertNotNull(connectors);
-      assertEquals(1, connectors.size());
-      assertTrue(connectors.contains(connectorPair1_1));
+      entryMap = dg.getDiscoveryEntryMap();
+      assertNotNull(entryMap);
+      assertEquals(0, entryMap.size());           
       assertTrue(listener1.called);
       assertTrue(listener2.called);
       listener1.called = false;
       listener2.called = false;
 
-      bg1.removeConnectorPair(connectorPair1_1);
-
-      Thread.sleep(timeout);
-
       bg1.broadcastConnectors();
       ok = dg.waitForBroadcast(1000);
       bg2.broadcastConnectors();
@@ -789,24 +773,9 @@
       bg3.broadcastConnectors();
       ok = dg.waitForBroadcast(1000);
 
-      connectors = dg.getConnectors();
-      assertNotNull(connectors);
-      assertEquals(0, connectors.size());
-      assertTrue(listener1.called);
-      assertTrue(listener2.called);
-      listener1.called = false;
-      listener2.called = false;
-
-      bg1.broadcastConnectors();
-      ok = dg.waitForBroadcast(1000);
-      bg2.broadcastConnectors();
-      ok = dg.waitForBroadcast(1000);
-      bg3.broadcastConnectors();
-      ok = dg.waitForBroadcast(1000);
-
-      connectors = dg.getConnectors();
-      assertNotNull(connectors);
-      assertEquals(0, connectors.size());
+      entryMap = dg.getDiscoveryEntryMap();
+      assertNotNull(entryMap);
+      assertEquals(0, entryMap.size());   
       assertFalse(listener1.called);
       assertFalse(listener2.called);
 
@@ -822,8 +791,10 @@
       final InetAddress groupAddress = InetAddress.getByName(address1);
       final int groupPort = 6745;
       final int timeout = 500;
+      
+      String nodeID = randomString();
 
-      BroadcastGroup bg = new BroadcastGroupImpl(randomString(), randomString(), -1, groupAddress, groupPort, true);
+      BroadcastGroup bg = new BroadcastGroupImpl(nodeID, randomString(), -1, groupAddress, groupPort, true);
 
       bg.start();
 
@@ -834,6 +805,7 @@
                                                                                                                                      backup1);
 
       bg.addConnectorPair(connectorPair1);
+            
 
       DiscoveryGroup dg1 = new DiscoveryGroupImpl(randomString(), randomString(), groupAddress, groupPort, timeout);
 
@@ -849,56 +821,32 @@
 
       boolean ok = dg1.waitForBroadcast(1000);
       assertTrue(ok);
-      List<Pair<TransportConfiguration, TransportConfiguration>> connectors = dg1.getConnectors();
-      assertNotNull(connectors);
-      assertEquals(1, connectors.size());
-      assertTrue(connectors.contains(connectorPair1));
+      Map<String, DiscoveryEntry> entryMap = dg1.getDiscoveryEntryMap();
+      assertNotNull(entryMap);
+      assertEquals(1, entryMap.size());
+      DiscoveryEntry entry = entryMap.get(nodeID);      
+      assertNotNull(entry);
+      assertEquals(connectorPair1, entry.getConnectorPair());
 
       ok = dg2.waitForBroadcast(1000);
       assertTrue(ok);
-      connectors = dg2.getConnectors();
-      assertNotNull(connectors);
-      assertEquals(1, connectors.size());
-      assertTrue(connectors.contains(connectorPair1));
+      entryMap = dg2.getDiscoveryEntryMap();
+      assertNotNull(entryMap);
+      assertEquals(1, entryMap.size());
+      entry = entryMap.get(nodeID);      
+      assertNotNull(entry);
+      assertEquals(connectorPair1, entry.getConnectorPair());
+      
+      
       ok = dg3.waitForBroadcast(1000);
       assertTrue(ok);
-      connectors = dg3.getConnectors();
-      assertNotNull(connectors);
-      assertEquals(1, connectors.size());
-      assertTrue(connectors.contains(connectorPair1));
-
-      TransportConfiguration live2 = generateTC();
-      TransportConfiguration backup2 = generateTC();
-      Pair<TransportConfiguration, TransportConfiguration> connectorPair2 = new Pair<TransportConfiguration, TransportConfiguration>(live2,
-                                                                                                                                     backup2);
-
-      bg.addConnectorPair(connectorPair2);
-
-      bg.broadcastConnectors();
-      ok = dg1.waitForBroadcast(1000);
-      assertTrue(ok);
-      connectors = dg1.getConnectors();
-      assertNotNull(connectors);
-      assertEquals(2, connectors.size());
-      assertTrue(connectors.contains(connectorPair1));
-      assertTrue(connectors.contains(connectorPair2));
-
-      ok = dg2.waitForBroadcast(1000);
-      assertTrue(ok);
-      connectors = dg2.getConnectors();
-      assertNotNull(connectors);
-      assertEquals(2, connectors.size());
-      assertTrue(connectors.contains(connectorPair1));
-      assertTrue(connectors.contains(connectorPair2));
-
-      ok = dg3.waitForBroadcast(1000);
-      assertTrue(ok);
-      connectors = dg3.getConnectors();
-      assertNotNull(connectors);
-      assertEquals(2, connectors.size());
-      assertTrue(connectors.contains(connectorPair1));
-      assertTrue(connectors.contains(connectorPair2));
-
+      entryMap = dg3.getDiscoveryEntryMap();
+      assertNotNull(entryMap);
+      assertEquals(1, entryMap.size());
+      entry = entryMap.get(nodeID);      
+      assertNotNull(entry);
+      assertEquals(connectorPair1, entry.getConnectorPair());
+      
       bg.stop();
 
       dg1.stop();

Modified: trunk/tests/src/org/jboss/messaging/tests/util/ServiceTestBase.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/util/ServiceTestBase.java	2009-04-03 14:02:02 UTC (rev 6296)
+++ trunk/tests/src/org/jboss/messaging/tests/util/ServiceTestBase.java	2009-04-03 14:08:00 UTC (rev 6297)
@@ -22,6 +22,7 @@
 
 package org.jboss.messaging.tests.util;
 
+import java.io.File;
 import java.lang.ref.WeakReference;
 import java.util.HashMap;
 import java.util.Map;
@@ -110,6 +111,12 @@
 
    protected void clearData(String testDir)
    {
+      //Need to delete the root
+      
+      File file = new File(testDir);
+      deleteDirectory(file);
+      file.mkdirs();
+      
       recreateDirectory(getJournalDir(testDir));
       recreateDirectory(getBindingsDir(testDir));
       recreateDirectory(getPageDir(testDir));
@@ -237,13 +244,13 @@
    {
       Configuration configuration = new ConfigurationImpl();
       configuration.setSecurityEnabled(false);
-      configuration.setBindingsDirectory(getBindingsDir(index));
+      configuration.setBindingsDirectory(getBindingsDir(index, false));
       configuration.setJournalMinFiles(2);
       configuration.setJournalDirectory(getJournalDir(index, false));
       configuration.setJournalFileSize(100 * 1024);
       configuration.setJournalType(JournalType.NIO);
-      configuration.setPagingDirectory(getPageDir(index));
-      configuration.setLargeMessagesDirectory(getLargeMessagesDir(index));
+      configuration.setPagingDirectory(getPageDir(index, false));
+      configuration.setLargeMessagesDirectory(getLargeMessagesDir(index, false));
 
       configuration.getAcceptorConfigurations().clear();
 

Modified: trunk/tests/src/org/jboss/messaging/tests/util/UnitTestCase.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/util/UnitTestCase.java	2009-04-03 14:02:02 UTC (rev 6296)
+++ trunk/tests/src/org/jboss/messaging/tests/util/UnitTestCase.java	2009-04-03 14:08:00 UTC (rev 6297)
@@ -251,9 +251,9 @@
    /**
     * @return the bindingsDir
     */
-   protected String getBindingsDir(int index)
+   protected String getBindingsDir(int index, boolean backup)
    {
-      return getBindingsDir(testDir) + index;
+      return getBindingsDir(testDir) + index + "-" + (backup ? "B" : "L");
    }
 
    /**
@@ -272,9 +272,9 @@
       return testDir + "/page";
    }
    
-   protected String getPageDir(int index)
+   protected String getPageDir(int index, boolean backup)
    {
-      return getPageDir(testDir) + index;
+      return getPageDir(testDir) + index + "-" + (backup ? "B" : "L");
    }
 
    /**
@@ -293,9 +293,9 @@
       return testDir + "/large-msg";
    }
    
-   protected String getLargeMessagesDir(int index)
+   protected String getLargeMessagesDir(int index, boolean backup)
    {
-      return getLargeMessagesDir(testDir) + index;
+      return getLargeMessagesDir(testDir) + index + "-" + (backup ? "B" : "L");
    }
 
    /**
@@ -464,9 +464,6 @@
       return buffer.array();
    }
 
-   
-
-   
    protected void recreateDirectory(String directory)
    {
       File file = new File(directory);
@@ -474,7 +471,6 @@
       file.mkdirs();
    }
 
-
    protected boolean deleteDirectory(File directory)
    {
       if (directory.isDirectory())




More information about the jboss-cvs-commits mailing list