[jboss-cvs] JBoss Messaging SVN: r5905 - in trunk: src/main/org/jboss/messaging/core/client and 13 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Thu Feb 19 14:09:41 EST 2009


Author: timfox
Date: 2009-02-19 14:09:40 -0500 (Thu, 19 Feb 2009)
New Revision: 5905

Added:
   trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/ClusteredRequestResponseTest.java
Modified:
   trunk/.classpath
   trunk/src/main/org/jboss/messaging/core/client/ClientSession.java
   trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java
   trunk/src/main/org/jboss/messaging/core/client/impl/ConnectionManagerImpl.java
   trunk/src/main/org/jboss/messaging/core/cluster/impl/DiscoveryGroupImpl.java
   trunk/src/main/org/jboss/messaging/core/management/impl/ManagementServiceImpl.java
   trunk/src/main/org/jboss/messaging/core/postoffice/impl/BindingsImpl.java
   trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java
   trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/RollbackMessage.java
   trunk/src/main/org/jboss/messaging/core/server/cluster/impl/BroadcastGroupImpl.java
   trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java
   trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerPacketHandler.java
   trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java
   trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java
   trunk/src/main/org/jboss/messaging/integration/transports/netty/NettyConnection.java
   trunk/src/main/org/jboss/messaging/util/SimpleString.java
   trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/ClusterTestBase.java
   trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/MultiThreadRandomFailoverTestBase.java
   trunk/tests/src/org/jboss/messaging/tests/integration/discovery/DiscoveryTest.java
Log:
various fixes and tweaks, more tests

Modified: trunk/.classpath
===================================================================
--- trunk/.classpath	2009-02-19 17:36:47 UTC (rev 5904)
+++ trunk/.classpath	2009-02-19 19:09:40 UTC (rev 5905)
@@ -65,6 +65,5 @@
 	<classpathentry kind="lib" path="thirdparty/netty/lib/netty-3.1.0.ALPHA2.jar" sourcepath="thirdparty/netty/lib/netty-3.1.0.ALPHA2-sources.jar"/>
 	<classpathentry kind="lib" path="thirdparty/apache-mina/lib/mina-core-2.0.0-M4.jar"/>
 	<classpathentry kind="lib" path="thirdparty/slf4j/log4j/lib/slf4j-log4j12-1.5.2.jar"/>
-	<classpathentry kind="lib" path="thirdparty/slf4j/api/lib/slf4j-api-1.5.2.jar"/>
 	<classpathentry kind="output" path="eclipse-output"/>
 </classpath>

Modified: trunk/src/main/org/jboss/messaging/core/client/ClientSession.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/ClientSession.java	2009-02-19 17:36:47 UTC (rev 5904)
+++ trunk/src/main/org/jboss/messaging/core/client/ClientSession.java	2009-02-19 19:09:40 UTC (rev 5905)
@@ -162,10 +162,10 @@
    void rollback() throws MessagingException;
 
    /**
-    * @param isLastMessageAsDelived the first message on deliveringMessage Buffer is considered as delivered
+    * @param considerLastMessageAsDelivered the first message on deliveringMessage Buffer is considered as delivered
     * @throws MessagingException
     */
-   void rollback(boolean isLastMessageAsDelived) throws MessagingException;
+   void rollback(boolean considerLastMessageAsDelivered) throws MessagingException;
 
    void close() throws MessagingException;
 

Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java	2009-02-19 17:36:47 UTC (rev 5904)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java	2009-02-19 19:09:40 UTC (rev 5905)
@@ -809,7 +809,7 @@
       {
          return true;
       }
-
+      
       boolean ok = false;
 
       // We lock the channel to prevent any packets to be added to the resend

Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ConnectionManagerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ConnectionManagerImpl.java	2009-02-19 17:36:47 UTC (rev 5904)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ConnectionManagerImpl.java	2009-02-19 19:09:40 UTC (rev 5905)
@@ -608,7 +608,6 @@
       
       Map<RemotingConnection, List<ClientSessionInternal>> sessionsPerConnection = new HashMap<RemotingConnection, List<ClientSessionInternal>>();
 
-      
       for (Map.Entry<ClientSessionInternal, RemotingConnection> entry : sessions.entrySet())
       {
          ClientSessionInternal session = entry.getKey();
@@ -680,7 +679,7 @@
             if (!b)
             {
                //If a session fails to re-attach we doom the lot, but we make sure we try all sessions and don't exit early
-               //or connections might be left lying around
+               //or connections might be left lying around               
                ok = false;
             }
          }

Modified: trunk/src/main/org/jboss/messaging/core/cluster/impl/DiscoveryGroupImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/cluster/impl/DiscoveryGroupImpl.java	2009-02-19 17:36:47 UTC (rev 5904)
+++ trunk/src/main/org/jboss/messaging/core/cluster/impl/DiscoveryGroupImpl.java	2009-02-19 19:09:40 UTC (rev 5905)
@@ -34,12 +34,11 @@
 import java.util.List;
 import java.util.Map;
 
+import org.jboss.messaging.core.cluster.DiscoveryGroup;
 import org.jboss.messaging.core.cluster.DiscoveryListener;
 import org.jboss.messaging.core.config.TransportConfiguration;
 import org.jboss.messaging.core.logging.Logger;
-import org.jboss.messaging.core.cluster.DiscoveryGroup;
 import org.jboss.messaging.util.Pair;
-import org.jboss.messaging.util.SimpleString;
 
 /**
  * A DiscoveryGroupImpl
@@ -61,7 +60,7 @@
 
    private final String name;
 
-   private final Thread thread;
+   private Thread thread;
 
    private boolean received;
 
@@ -74,6 +73,10 @@
    private volatile boolean started;
 
    private final String nodeID;
+   
+   private final InetAddress groupAddress;
+   
+   private final int groupPort;
 
    public DiscoveryGroupImpl(final String nodeID,
                              final String name,
@@ -83,19 +86,13 @@
    {
       this.nodeID = nodeID;
 
-      this.name = name;
+      this.name = name;     
 
-      socket = new MulticastSocket(groupPort);
-
-      socket.joinGroup(groupAddress);
-
-      socket.setSoTimeout(SOCKET_TIMEOUT);
-
-      this.timeout = timeout;
-
-      thread = new Thread(this);
-
-      thread.setDaemon(true);
+      this.timeout = timeout;     
+      
+      this.groupAddress = groupAddress;
+      
+      this.groupPort = groupPort;
    }
 
    public synchronized void start() throws Exception
@@ -105,8 +102,18 @@
          return;
       }
       
+      socket = new MulticastSocket(groupPort);
+
+      socket.joinGroup(groupAddress);
+
+      socket.setSoTimeout(SOCKET_TIMEOUT);
+
       started = true;
       
+      thread = new Thread(this);
+
+      thread.setDaemon(true);
+
       thread.start();
    }
 
@@ -131,6 +138,10 @@
       }
 
       socket.close();
+      
+      socket = null;
+      
+      thread = null;
    }
 
    public boolean isStarted()
@@ -194,7 +205,7 @@
             {
                return;
             }
-  
+
             final DatagramPacket packet = new DatagramPacket(data, data.length);
 
             try
@@ -221,7 +232,7 @@
 
             if (nodeID.equals(originatingNodeID))
             {
-               //Ignore traffic from own node
+               // Ignore traffic from own node
                continue;
             }
 
@@ -324,40 +335,4 @@
       }
    }
 
-   private String replaceWildcardChars(final String str)
-   {
-      return str.replace('.', '-');
-   }
-
-   private SimpleString generateConnectorString(final TransportConfiguration config) throws Exception
-   {
-      StringBuilder str = new StringBuilder(replaceWildcardChars(config.getFactoryClassName()));
-
-      if (config.getParams() != null)
-      {
-         if (!config.getParams().isEmpty())
-         {
-            str.append("?");
-         }
-
-         boolean first = true;
-         for (Map.Entry<String, Object> entry : config.getParams().entrySet())
-         {
-            if (!first)
-            {
-               str.append("&");
-            }
-            String encodedKey = replaceWildcardChars(entry.getKey());
-
-            String val = entry.getValue().toString();
-            String encodedVal = replaceWildcardChars(val);
-
-            str.append(encodedKey).append('=').append(encodedVal);
-
-            first = false;
-         }
-      }
-
-      return new SimpleString(str.toString());
-   }
 }

Modified: trunk/src/main/org/jboss/messaging/core/management/impl/ManagementServiceImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/management/impl/ManagementServiceImpl.java	2009-02-19 17:36:47 UTC (rev 5904)
+++ trunk/src/main/org/jboss/messaging/core/management/impl/ManagementServiceImpl.java	2009-02-19 19:09:40 UTC (rev 5905)
@@ -24,7 +24,6 @@
 
 package org.jboss.messaging.core.management.impl;
 
-
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
 import java.nio.ByteBuffer;
@@ -182,7 +181,7 @@
                                                  broadcaster,
                                                  queueFactory);
       ObjectName objectName = ObjectNames.getMessagingServerObjectName();
-      registerInJMX(objectName, new ReplicationAwareMessagingServerControlWrapper(objectName, 
+      registerInJMX(objectName, new ReplicationAwareMessagingServerControlWrapper(objectName,
                                                                                   managedServer,
                                                                                   replicationInvoker));
       registerInRegistry(objectName, managedServer);
@@ -242,9 +241,7 @@
       messageCounterManager.registerMessageCounter(queue.getName().toString(), counter);
       ObjectName objectName = ObjectNames.getQueueObjectName(address, queue.getName());
       QueueControl queueControl = new QueueControl(queue, postOffice, addressSettingsRepository, counter);
-      registerInJMX(objectName, new ReplicationAwareQueueControlWrapper(objectName,
-                                                                        queueControl,
-                                                                        replicationInvoker));
+      registerInJMX(objectName, new ReplicationAwareQueueControlWrapper(objectName, queueControl, replicationInvoker));
       registerInRegistry(objectName, queueControl);
 
       if (log.isDebugEnabled())
@@ -352,8 +349,11 @@
                {
                   exceptionMessage = ((InvocationTargetException)e).getTargetException().getMessage();
                }
-               message.putStringProperty(ManagementHelper.HDR_JMX_OPERATION_EXCEPTION,
-                                         new SimpleString(exceptionMessage));
+               if (e != null)
+               {
+                  message.putStringProperty(ManagementHelper.HDR_JMX_OPERATION_EXCEPTION,
+                                            new SimpleString(exceptionMessage));
+               }
             }
          }
       }
@@ -426,47 +426,47 @@
    {
       listeners.remove(listener);
    }
-   
+
    public SimpleString getManagementAddress()
    {
       return managementAddress;
    }
-   
+
    public void setManagementAddress(SimpleString managementAddress)
    {
       this.managementAddress = managementAddress;
    }
-   
+
    public SimpleString getManagementNotificationAddress()
    {
       return managementNotificationAddress;
    }
-   
+
    public void setManagementNotificationAddress(SimpleString managementNotificationAddress)
    {
-      this.managementNotificationAddress = managementNotificationAddress;  
+      this.managementNotificationAddress = managementNotificationAddress;
    }
 
    public String getClusterPassword()
    {
       return managementClusterPassword;
    }
-   
+
    public void setClusterPassword(String clusterPassword)
    {
       this.managementClusterPassword = clusterPassword;
    }
-   
+
    public long getManagementRequestTimeout()
    {
       return managementRequestTimeout;
    }
-   
+
    public void setManagementRequestTimeout(long timeout)
    {
       this.managementRequestTimeout = timeout;
    }
-   
+
    public ReplicationOperationInvoker getReplicationOperationInvoker()
    {
       return replicationInvoker;
@@ -476,7 +476,9 @@
 
    public void start() throws Exception
    {
-      replicationInvoker = new ReplicationOperationInvokerImpl(managementClusterPassword, managementAddress, managementRequestTimeout);
+      replicationInvoker = new ReplicationOperationInvokerImpl(managementClusterPassword,
+                                                               managementAddress,
+                                                               managementRequestTimeout);
       started = true;
    }
 
@@ -489,11 +491,11 @@
          unregisterResource(objectName);
       }
 
-      //FIXME the replicationInvoker should be properly stopped.
+      // FIXME the replicationInvoker should be properly stopped.
       // the code is commented since stopping the invoker will interact
       // with the remoting service which is stopped first when stopping the server
       // replicationInvoker.stop();
-      
+
       started = false;
    }
 
@@ -570,9 +572,10 @@
             {
                notifProps = new TypedProperties();
             }
-            
-            notifProps.putStringProperty(ManagementHelper.HDR_NOTIFICATION_TYPE, new SimpleString(notification.getType().toString()));
-            
+
+            notifProps.putStringProperty(ManagementHelper.HDR_NOTIFICATION_TYPE,
+                                         new SimpleString(notification.getType().toString()));
+
             notifProps.putLongProperty(ManagementHelper.HDR_NOTIFICATION_TIMESTAMP, System.currentTimeMillis());
 
             notificationMessage.putTypedProperties(notifProps);

Modified: trunk/src/main/org/jboss/messaging/core/postoffice/impl/BindingsImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/postoffice/impl/BindingsImpl.java	2009-02-19 17:36:47 UTC (rev 5904)
+++ trunk/src/main/org/jboss/messaging/core/postoffice/impl/BindingsImpl.java	2009-02-19 19:09:40 UTC (rev 5905)
@@ -62,16 +62,16 @@
    private final Map<Integer, Binding> bindingsMap = new ConcurrentHashMap<Integer, Binding>();
 
    private final List<Binding> exclusiveBindings = new CopyOnWriteArrayList<Binding>();
-   
+
    private volatile boolean routeWhenNoConsumers;
 
    public void setRouteWhenNoConsumers(final boolean routeWhenNoConsumers)
-   {      
+   {
       this.routeWhenNoConsumers = routeWhenNoConsumers;
    }
-   
+
    public Collection<Binding> getBindings()
-   {      
+   {
       return bindingsMap.values();
    }
 
@@ -101,8 +101,8 @@
 
          bindings.add(binding);
       }
-      
-      bindingsMap.put(binding.getID(), binding);           
+
+      bindingsMap.put(binding.getID(), binding);
    }
 
    public void removeBinding(final Binding binding)
@@ -128,54 +128,54 @@
          }
       }
 
-      bindingsMap.remove(binding.getID());          
+      bindingsMap.remove(binding.getID());
    }
 
    private void routeFromCluster(final ServerMessage message, final Transaction tx) throws Exception
    {
       byte[] ids = (byte[])message.getProperty(MessageImpl.HDR_ROUTE_TO_IDS);
-      
+
       ByteBuffer buff = ByteBuffer.wrap(ids);
-      
+
       Set<Bindable> chosen = new HashSet<Bindable>();
-      
+
       while (buff.hasRemaining())
       {
          int bindingID = buff.getInt();
-         
+
          Binding binding = bindingsMap.get(bindingID);
-         
+
          if (binding == null)
          {
-            //The binding has been closed - we need to route the message somewhere else...............
+            // The binding has been closed - we need to route the message somewhere else...............
             throw new IllegalStateException("Binding not found when routing from cluster - it must have closed");
-            
-            //FIXME need to deal with this better            
+
+            // FIXME need to deal with this better
          }
-         
+
          binding.willRoute(message);
-         
+
          chosen.add(binding.getBindable());
       }
-      
+
       for (Bindable bindable : chosen)
       {
          bindable.preroute(message, tx);
       }
-      
+
       for (Bindable bindable : chosen)
       {
          bindable.route(message, tx);
       }
    }
-   
+
    public boolean redistribute(final ServerMessage message, final SimpleString routingName, final Transaction tx) throws Exception
    {
       if (routeWhenNoConsumers)
       {
          return false;
       }
-      
+
       List<Binding> bindings = routingNameBindingMap.get(routingName);
 
       if (bindings == null)
@@ -195,7 +195,7 @@
 
       Binding theBinding = null;
 
-      //TODO - combine this with similar logic in route()
+      // TODO - combine this with similar logic in route()
       while (true)
       {
          Binding binding;
@@ -217,40 +217,40 @@
                break;
             }
          }
-         
+
          pos = incrementPos(pos, length);
 
          Filter filter = binding.getFilter();
-                                            
+
          boolean highPrior = binding.isHighAcceptPriority(message);
-         
+
          if (highPrior && (filter == null || filter.match(message)))
-         {                     
+         {
             theBinding = binding;
 
-            break;            
+            break;
          }
-         
+
          if (pos == startPos)
-         {            
+         {
             break;
          }
       }
-      
+
       routingNamePositions.put(routingName, pos);
 
       if (theBinding != null)
-      {         
+      {
          theBinding.willRoute(message);
-         
+
          theBinding.getBindable().preroute(message, tx);
-         
-         theBinding.getBindable().route(message, tx);         
-         
+
+         theBinding.getBindable().route(message, tx);
+
          return true;
       }
       else
-      {        
+      {
          return false;
       }
    }
@@ -267,38 +267,38 @@
       else
       {
          if (message.getProperty(MessageImpl.HDR_FROM_CLUSTER) != null)
-         {            
+         {
             routeFromCluster(message, tx);
          }
          else
          {
             Set<Bindable> chosen = new HashSet<Bindable>();
-   
+
             for (Map.Entry<SimpleString, List<Binding>> entry : routingNameBindingMap.entrySet())
             {
                SimpleString routingName = entry.getKey();
-   
+
                List<Binding> bindings = entry.getValue();
-   
+
                if (bindings == null)
                {
                   // The value can become null if it's concurrently removed while we're iterating - this is expected
                   // ConcurrentHashMap behaviour!
                   continue;
                }
-   
+
                Integer ipos = routingNamePositions.get(routingName);
-   
+
                int pos = ipos != null ? ipos.intValue() : 0;
-   
+
                int length = bindings.size();
-   
+
                int startPos = pos;
-   
+
                Binding theBinding = null;
-   
+
                int lastLowPriorityBinding = -1;
-   
+
                while (true)
                {
                   Binding binding;
@@ -312,7 +312,7 @@
                      if (!bindings.isEmpty())
                      {
                         pos = 0;
-   
+
                         continue;
                      }
                      else
@@ -320,19 +320,19 @@
                         break;
                      }
                   }
-   
+
                   Filter filter = binding.getFilter();
-                                                     
+
                   if (filter == null || filter.match(message))
-                  {                     
+                  {
                      // bindings.length == 1 ==> only a local queue so we don't check for matching consumers (it's an
                      // unnecessary overhead)
                      if (length == 1 || routeWhenNoConsumers || binding.isHighAcceptPriority(message))
                      {
                         theBinding = binding;
-   
+
                         pos = incrementPos(pos, length);
-   
+
                         break;
                      }
                      else
@@ -343,13 +343,13 @@
                         }
                      }
                   }
-   
+
                   pos = incrementPos(pos, length);
-   
+
                   if (pos == startPos)
                   {
                      if (lastLowPriorityBinding != -1)
-                     {                     
+                     {
                         try
                         {
                            theBinding = bindings.get(pos);
@@ -360,9 +360,9 @@
                            if (!bindings.isEmpty())
                            {
                               pos = 0;
-                              
+
                               lastLowPriorityBinding = -1;
-   
+
                               continue;
                            }
                            else
@@ -370,32 +370,32 @@
                               break;
                            }
                         }
-                                            
+
                         pos = lastLowPriorityBinding;
-   
+
                         pos = incrementPos(pos, length);
                      }
                      break;
                   }
                }
-   
+
                if (theBinding != null)
                {
                   theBinding.willRoute(message);
-                  
+
                   chosen.add(theBinding.getBindable());
                }
 
                routingNamePositions.put(routingName, pos);
             }
-   
-            //TODO refactor to do this is one iteration
-            
+
+            // TODO refactor to do this is one iteration
+
             for (Bindable bindable : chosen)
             {
                bindable.preroute(message, tx);
             }
-            
+
             for (Bindable bindable : chosen)
             {
                bindable.route(message, tx);

Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java	2009-02-19 17:36:47 UTC (rev 5904)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java	2009-02-19 19:09:40 UTC (rev 5905)
@@ -260,7 +260,8 @@
    // debug only stuff
 
    private boolean createdActive;
-
+   
+   
    // Constructors
    // ---------------------------------------------------------------------------------
 
@@ -295,7 +296,7 @@
                                  final long connectionTTL)
 
    {
-      this(transportConnection, -1, -1, connectionTTL, null, interceptors, replicatingConnection, active, false);
+      this(transportConnection, -1, -1, connectionTTL, null, interceptors, replicatingConnection, active, false);    
    }
 
    private RemotingConnectionImpl(final Connection transportConnection,
@@ -448,7 +449,7 @@
          destroyed = true;
       }
 
-      log.warn("Connection failed " + System.identityHashCode(this) + " " + me.getMessage());
+      log.warn("Connection failed, client " + client + " " + System.identityHashCode(this) + " " + me.getMessage(), me);
 
       // Then call the listeners
       callListeners(me);

Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/RollbackMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/RollbackMessage.java	2009-02-19 17:36:47 UTC (rev 5904)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/RollbackMessage.java	2009-02-19 19:09:40 UTC (rev 5905)
@@ -45,32 +45,33 @@
       super(SESS_ROLLBACK);
    }
 
-   public RollbackMessage(final boolean isLastMessageAsDelived)
+   public RollbackMessage(final boolean considerLastMessageAsDelivered)
    {
       super(SESS_ROLLBACK);
-      this.isLastMessageAsDelived = isLastMessageAsDelived;
+      
+      this.considerLastMessageAsDelivered = considerLastMessageAsDelivered;
    }
 
    // Constants -----------------------------------------------------
 
    // Attributes ----------------------------------------------------
 
-   private boolean isLastMessageAsDelived;
+   private boolean considerLastMessageAsDelivered;
 
    /**
-    * @return the isLastMessageAsDelived
+    * @return the considerLastMessageAsDelivered
     */
-   public boolean isLastMessageAsDelived()
+   public boolean isConsiderLastMessageAsDelivered()
    {
-      return isLastMessageAsDelived;
+      return considerLastMessageAsDelivered;
    }
 
    /**
-    * @param isLastMessageAsDelived the isLastMessageAsDelived to set
+    * @param considerLastMessageAsDelivered the considerLastMessageAsDelivered to set
     */
-   public void setLastMessageAsDelived(final boolean isLastMessageAsDelived)
+   public void setConsiderLastMessageAsDelivered(final boolean isLastMessageAsDelived)
    {
-      this.isLastMessageAsDelived = isLastMessageAsDelived;
+      this.considerLastMessageAsDelivered = isLastMessageAsDelived;
    }
 
    /* (non-Javadoc)
@@ -85,13 +86,13 @@
    @Override
    public void encodeBody(final MessagingBuffer buffer)
    {
-      buffer.putBoolean(isLastMessageAsDelived);
+      buffer.putBoolean(considerLastMessageAsDelivered);
    }
 
    @Override
    public void decodeBody(final MessagingBuffer buffer)
    {
-      this.isLastMessageAsDelived = buffer.getBoolean();
+      this.considerLastMessageAsDelivered = buffer.getBoolean();
    }
 
    // Static --------------------------------------------------------

Modified: trunk/src/main/org/jboss/messaging/core/server/cluster/impl/BroadcastGroupImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/cluster/impl/BroadcastGroupImpl.java	2009-02-19 17:36:47 UTC (rev 5904)
+++ trunk/src/main/org/jboss/messaging/core/server/cluster/impl/BroadcastGroupImpl.java	2009-02-19 19:09:40 UTC (rev 5905)
@@ -205,43 +205,6 @@
    public synchronized void setScheduledFuture(final ScheduledFuture<?> future)
    {
       this.future = future;
-   }
-   
-   private String replaceWildcardChars(final String str)
-   {
-      return str.replace('.', '-');
-   }
+   }   
 
-   private SimpleString generateConnectorString(final TransportConfiguration config) throws Exception
-   {
-      StringBuilder str = new StringBuilder(replaceWildcardChars(config.getFactoryClassName()));
-
-      if (config.getParams() != null)
-      {
-         if (!config.getParams().isEmpty())
-         {
-            str.append("?");
-         }
-
-         boolean first = true;
-         for (Map.Entry<String, Object> entry : config.getParams().entrySet())
-         {
-            if (!first)
-            {
-               str.append("&");
-            }
-            String encodedKey = replaceWildcardChars(entry.getKey());
-
-            String val = entry.getValue().toString();
-            String encodedVal = replaceWildcardChars(val);
-
-            str.append(encodedKey).append('=').append(encodedVal);
-
-            first = false;
-         }
-      }
-
-      return new SimpleString(str.toString());
-   }
-
 }

Modified: trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java	2009-02-19 17:36:47 UTC (rev 5904)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java	2009-02-19 19:09:40 UTC (rev 5905)
@@ -397,8 +397,6 @@
       // This is the last thing done at the start, after everything else is up and running
       pagingManager.startGlobalDepage();
 
-      log.info("Started messaging server");
-
       started = true;
    }
 
@@ -918,7 +916,7 @@
          throw new MessagingException(MessagingException.INCOMPATIBLE_CLIENT_SERVER_VERSIONS,
                                       "client not compatible with version: " + version.getFullVersion());
       }
-
+      
       // Is this comment relevant any more ?
 
       // Authenticate. Successful autentication will place a new SubjectContext

Modified: trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerPacketHandler.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerPacketHandler.java	2009-02-19 17:36:47 UTC (rev 5904)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerPacketHandler.java	2009-02-19 19:09:40 UTC (rev 5905)
@@ -104,6 +104,7 @@
                                                request.isPreAcknowledge(),
                                                request.isXA(),
                                                request.getWindowSize());
+               
                break;
             }
             case REPLICATE_CREATESESSION:

Modified: trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java	2009-02-19 17:36:47 UTC (rev 5904)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java	2009-02-19 19:09:40 UTC (rev 5905)
@@ -315,7 +315,6 @@
 
    public LinkedList<MessageReference> cancelRefs(final boolean lastConsumedAsDelivered, final Transaction tx) throws Exception
    {
-
       boolean performACK = lastConsumedAsDelivered;
 
       LinkedList<MessageReference> refs = new LinkedList<MessageReference>();
@@ -327,11 +326,13 @@
             if (performACK)
             {
                acknowledge(false, tx, ref.getMessage().getMessageID());
+               
                performACK = false;
             }
             else
             {
                ref.decrementDeliveryCount();
+               
                refs.add(ref);
             }
          }

Modified: trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java	2009-02-19 17:36:47 UTC (rev 5904)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java	2009-02-19 19:09:40 UTC (rev 5905)
@@ -139,7 +139,7 @@
    private final boolean autoCommitAcks;
 
    private final boolean preAcknowledge;
-   
+
    private final boolean updateDeliveries;
 
    private volatile RemotingConnection remotingConnection;
@@ -175,11 +175,11 @@
    private final SimpleString managementAddress;
 
    private final QueueFactory queueFactory;
-   
+
    private final SimpleString nodeID;
 
    // The current currentLargeMessage being processed
-   // In case of replication, currentLargeMessage should only be accessed within the replication callbacks 
+   // In case of replication, currentLargeMessage should only be accessed within the replication callbacks
    private volatile LargeServerMessage currentLargeMessage;
 
    // The current destination used for sending LargeMessages
@@ -242,7 +242,7 @@
       {
          tx = new TransactionImpl(storageManager);
       }
-      
+
       this.updateDeliveries = updateDeliveries;
 
       this.channel = channel;
@@ -256,7 +256,7 @@
       this.managementAddress = managementAddress;
 
       this.queueFactory = queueFactory;
-      
+
       this.nodeID = server.getNodeID();
    }
 
@@ -399,9 +399,14 @@
                         " queueName = " +
                         packet.getQueueName());
                }
-               doHandleCreateQueue(packet);
-
-               lock.unlock();
+               try
+               {
+                  doHandleCreateQueue(packet);
+               }
+               finally
+               {
+                  lock.unlock();
+               }
             }
          });
       }
@@ -444,9 +449,14 @@
                {
                   trace("(Replication) DeleteQueue queueName = " + packet.getQueueName());
                }
-               doHandleDeleteQueue(packet);
-
-               lock.unlock();
+               try
+               {
+                  doHandleDeleteQueue(packet);
+               }
+               finally
+               {
+                  lock.unlock();
+               }
             }
          });
       }
@@ -1185,9 +1195,14 @@
          {
             public void run()
             {
-               doSend(packet);
-
-               lock.afterSend();
+               try
+               {
+                  doSend(packet);
+               }
+               finally
+               {
+                  lock.afterSend();
+               }
             }
          });
       }
@@ -1232,11 +1247,17 @@
                {
                   trace("(Replication) Sending LasChunk MessageID = " + currentLargeMessage.getMessageID());
                }
-               doSendContinuations(packet);
-               if (lock != null)
+               try
                {
-                  lock.afterSend();
+                  doSendContinuations(packet);
                }
+               finally
+               {
+                  if (lock != null)
+                  {
+                     lock.afterSend();
+                  }
+               }
             }
          });
       }
@@ -1269,7 +1290,7 @@
       {
          this.setStarted(false);
       }
-      
+
       remotingConnection.removeFailureListener(this);
 
       channel.transferConnection(newConnection);
@@ -1287,7 +1308,7 @@
 
       newConnection.syncIDGeneratorSequence(remotingConnection.getIDGeneratorSequence());
 
-      // Destroy the old connection     
+      // Destroy the old connection
       remotingConnection.destroy();
 
       remotingConnection = newConnection;
@@ -1302,7 +1323,7 @@
       {
          this.setStarted(true);
       }
-      
+
       return serverLastReceivedCommandID;
    }
 
@@ -1332,9 +1353,8 @@
             }
          }
 
-         // We call handleClose() since we need to replicate the close too, if there is a backup
          handleClose(new PacketImpl(PacketImpl.SESS_CLOSE));
-
+       
          log.info("Cleared up resources for session " + name);
       }
       catch (Throwable t)
@@ -1385,14 +1405,14 @@
          }
 
          Queue theQueue;
-         
+
          if (browseOnly)
          {
             // We consume a copy of the queue - TODO - this is a temporary measure
             // and will disappear once we can provide a proper iterator on the queue
 
             theQueue = queueFactory.createQueue(-1, binding.getAddress(), name, filter, false, true);
-                        
+
             // There's no need for any special locking since the list method is synchronized
             List<MessageReference> refs = ((Queue)binding.getBindable()).list(filter);
 
@@ -1400,16 +1420,16 @@
             {
                theQueue.addLast(ref);
             }
-            
+
             binding = new LocalQueueBinding(binding.getAddress(), theQueue, nodeID);
          }
          else
-         {            
+         {
             theQueue = (Queue)binding.getBindable();
          }
 
          ServerConsumer consumer = new ServerConsumerImpl(idGenerator.generateID(),
-                                                          this,                                                          
+                                                          this,
                                                           (QueueBinding)binding,
                                                           filter,
                                                           started,
@@ -1423,26 +1443,26 @@
                                                           managementService);
 
          consumers.put(consumer.getID(), consumer);
-         
+
          if (!browseOnly)
          {
             TypedProperties props = new TypedProperties();
-            
+
             props.putStringProperty(ManagementHelper.HDR_ADDRESS, binding.getAddress());
-            
+
             props.putStringProperty(ManagementHelper.HDR_CLUSTER_NAME, binding.getClusterName());
-            
+
             props.putStringProperty(ManagementHelper.HDR_ROUTING_NAME, binding.getRoutingName());
-            
+
             props.putIntProperty(ManagementHelper.HDR_DISTANCE, binding.getDistance());
-                        
+
             if (filterString != null)
             {
                props.putStringProperty(ManagementHelper.HDR_FILTERSTRING, filterString);
             }
-            
+
             Notification notification = new Notification(CONSUMER_CREATED, props);
-            
+
             managementService.sendNotification(notification);
          }
 
@@ -1505,16 +1525,17 @@
 
          final Queue queue = queueFactory.createQueue(-1, address, name, filter, durable, temporary);
 
-         //The unique name is given by the concatenation of the node id and the queue name - this is because it must be unique *across the entire cluster*
+         // The unique name is given by the concatenation of the node id and the queue name - this is because it must be
+         // unique *across the entire cluster*
          binding = new LocalQueueBinding(address, queue, nodeID);
 
          if (durable)
          {
-            storageManager.addQueueBinding(binding);                        
+            storageManager.addQueueBinding(binding);
          }
- 
+
          postOffice.addBinding(binding);
-         
+
          if (temporary)
          {
             // Temporary queue in core simply means the queue will be deleted if
@@ -1813,7 +1834,7 @@
 
       try
       {
-         rollback(packet.isLastMessageAsDelived());
+         rollback(packet.isConsiderLastMessageAsDelivered());
 
          response = new NullResponseMessage();
       }
@@ -2476,7 +2497,7 @@
       }
 
       channel.confirm(packet);
-      
+
       channel.flushConfirmations();
 
       channel.send(response);

Modified: trunk/src/main/org/jboss/messaging/integration/transports/netty/NettyConnection.java
===================================================================
--- trunk/src/main/org/jboss/messaging/integration/transports/netty/NettyConnection.java	2009-02-19 17:36:47 UTC (rev 5904)
+++ trunk/src/main/org/jboss/messaging/integration/transports/netty/NettyConnection.java	2009-02-19 19:09:40 UTC (rev 5905)
@@ -76,7 +76,7 @@
       {
          return;
       }
-
+      
       SslHandler sslHandler = (SslHandler)channel.getPipeline().get("ssl");
       if (sslHandler != null)
       {

Modified: trunk/src/main/org/jboss/messaging/util/SimpleString.java
===================================================================
--- trunk/src/main/org/jboss/messaging/util/SimpleString.java	2009-02-19 17:36:47 UTC (rev 5904)
+++ trunk/src/main/org/jboss/messaging/util/SimpleString.java	2009-02-19 19:09:40 UTC (rev 5905)
@@ -239,13 +239,13 @@
       {
          for (int i = 0; i < data.length; i++)
          {
-            hash = 31 * hash + data[i];
+            hash = (hash << 5) - hash + data[i]; // (hash << 5) - hash is same as hash * 31
          }
       }
 
       return hash;
    }
-
+   
    public SimpleString[] split(char delim)
    {
       if (!contains(delim))

Modified: trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/ClusterTestBase.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/ClusterTestBase.java	2009-02-19 17:36:47 UTC (rev 5904)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/ClusterTestBase.java	2009-02-19 19:09:40 UTC (rev 5905)
@@ -123,15 +123,15 @@
                                   final int consumerCount,
                                   final boolean local) throws Exception
    {
-//      log.info("waiting for bindings on node " + node +
-//               " address " +
-//               address +
-//               " count " +
-//               count +
-//               " consumerCount " +
-//               consumerCount +
-//               " local " +
-//               local);
+      log.info("waiting for bindings on node " + node +
+               " address " +
+               address +
+               " count " +
+               count +
+               " consumerCount " +
+               consumerCount +
+               " local " +
+               local);
       MessagingService service = this.services[node];
 
       if (service == null)
@@ -163,7 +163,7 @@
             }
          }
 
-         //log.info(node + " binding count " + bindingCount + " consumer Count " + totConsumers);
+         log.info(node + " binding count " + bindingCount + " consumer Count " + totConsumers);
 
          if (bindingCount == count && totConsumers == consumerCount)
          {

Added: trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/ClusteredRequestResponseTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/ClusteredRequestResponseTest.java	                        (rev 0)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/ClusteredRequestResponseTest.java	2009-02-19 19:09:40 UTC (rev 5905)
@@ -0,0 +1,184 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2009, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.messaging.tests.integration.cluster.distribution;
+
+import org.jboss.messaging.core.logging.Logger;
+
+/**
+ * Tests request/response style messaging across a five node cluster: messages are sent
+ * from node 0 to a queue on node 4, and then from node 4 back to a queue on node 0.
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ *
+ * Created 3 Feb 2009 09:10:43
+ */
+public class ClusteredRequestResponseTest extends ClusterTestBase
+{
+   private static final Logger log = Logger.getLogger(ClusteredRequestResponseTest.class);
+
+   /** Number of messages sent in each direction. */
+   private static final int NUM_MESSAGES = 10;
+
+   @Override
+   protected void setUp() throws Exception
+   {
+      super.setUp();
+
+      setupServers();
+   }
+
+   @Override
+   protected void tearDown() throws Exception
+   {
+      try
+      {
+         stopServers();
+      }
+      finally
+      {
+         // Always run the base class tear down, even if stopping the servers fails
+         super.tearDown();
+      }
+   }
+
+   /** Value this test passes as the netty flag to the server, session factory and cluster set-up helpers. */
+   protected boolean isNetty()
+   {
+      return false;
+   }
+
+   /** Value this test passes as the file storage flag to setupServer. */
+   protected boolean isFileStorage()
+   {
+      return false;
+   }
+
+   /** Waits for the bindings of both addresses on all five nodes before any message is sent. */
+   public void testRequestResponse() throws Exception
+   {
+      startClusterWithQueuesAndConsumers();
+
+      waitForBindings(0, "queues.testaddress1", 1, 1, true);
+      waitForBindings(4, "queues.testaddress2", 1, 1, true);
+
+      waitForBindings(1, "queues.testaddress1", 1, 1, false);
+      waitForBindings(2, "queues.testaddress1", 1, 1, false);
+      waitForBindings(3, "queues.testaddress1", 1, 1, false);
+      waitForBindings(4, "queues.testaddress1", 1, 1, false);
+
+      waitForBindings(0, "queues.testaddress2", 1, 1, false);
+      waitForBindings(1, "queues.testaddress2", 1, 1, false);
+      waitForBindings(2, "queues.testaddress2", 1, 1, false);
+      waitForBindings(3, "queues.testaddress2", 1, 1, false);
+
+      sendAndVerifyRoundTrip();
+   }
+
+   /*
+    * Don't wait for the response queue bindings to get to the other side
+    */
+   public void testRequestResponseNoWaitForBindings() throws Exception
+   {
+      startClusterWithQueuesAndConsumers();
+
+      waitForBindings(4, "queues.testaddress2", 1, 1, true);
+
+      waitForBindings(0, "queues.testaddress2", 1, 1, false);
+      waitForBindings(1, "queues.testaddress2", 1, 1, false);
+      waitForBindings(2, "queues.testaddress2", 1, 1, false);
+      waitForBindings(3, "queues.testaddress2", 1, 1, false);
+
+      sendAndVerifyRoundTrip();
+   }
+
+   /**
+    * Common preamble of both tests: sets up the cluster connections, starts the five servers,
+    * creates a session factory per node, then creates queueA with a consumer on node 0 and
+    * queueB with a consumer on node 4.
+    */
+   private void startClusterWithQueuesAndConsumers() throws Exception
+   {
+      setupCluster();
+
+      startServers(0, 1, 2, 3, 4);
+
+      setupSessionFactory(0, isNetty());
+      setupSessionFactory(1, isNetty());
+      setupSessionFactory(2, isNetty());
+      setupSessionFactory(3, isNetty());
+      setupSessionFactory(4, isNetty());
+
+      createQueue(0, "queues.testaddress1", "queueA", null, false);
+      createQueue(4, "queues.testaddress2", "queueB", null, false);
+
+      addConsumer(0, 0, "queueA", null);
+      addConsumer(4, 4, "queueB", null);
+   }
+
+   /**
+    * Common tail of both tests: sends a batch to queues.testaddress2 from node 0 and verifies its
+    * receipt, then sends a batch to queues.testaddress1 from node 4 and verifies its receipt.
+    */
+   private void sendAndVerifyRoundTrip() throws Exception
+   {
+      send(0, "queues.testaddress2", NUM_MESSAGES, false, null);
+      verifyReceiveAll(NUM_MESSAGES, 4);
+      send(4, "queues.testaddress1", NUM_MESSAGES, false, null);
+      verifyReceiveAll(NUM_MESSAGES, 0);
+   }
+
+   protected void setupCluster() throws Exception
+   {
+      setupCluster(false);
+   }
+
+   protected void setupCluster(final boolean forwardWhenNoConsumers) throws Exception
+   {
+      setupClusterConnection("cluster0", "queues", forwardWhenNoConsumers, 1, isNetty(), 0, 1, 2, 3, 4);
+      setupClusterConnection("cluster1", "queues", forwardWhenNoConsumers, 1, isNetty(), 1, 0, 2, 3, 4);
+      setupClusterConnection("cluster2", "queues", forwardWhenNoConsumers, 1, isNetty(), 2, 0, 1, 3, 4);
+      setupClusterConnection("cluster3", "queues", forwardWhenNoConsumers, 1, isNetty(), 3, 0, 1, 2, 4);
+      setupClusterConnection("cluster4", "queues", forwardWhenNoConsumers, 1, isNetty(), 4, 0, 1, 2, 3);
+   }
+
+   protected void setupServers() throws Exception
+   {
+      setupServer(0, isFileStorage(), isNetty());
+      setupServer(1, isFileStorage(), isNetty());
+      setupServer(2, isFileStorage(), isNetty());
+      setupServer(3, isFileStorage(), isNetty());
+      setupServer(4, isFileStorage(), isNetty());
+   }
+
+   protected void stopServers() throws Exception
+   {
+      closeAllConsumers();
+
+      closeAllSessionFactories();
+
+      // We stop the cluster connections first since this makes server shutdown quicker
+      stopClusterConnections(0, 1, 2, 3, 4);
+
+      stopServers(0, 1, 2, 3, 4);
+   }
+}

Modified: trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/MultiThreadRandomFailoverTestBase.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/MultiThreadRandomFailoverTestBase.java	2009-02-19 17:36:47 UTC (rev 5904)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/MultiThreadRandomFailoverTestBase.java	2009-02-19 19:09:40 UTC (rev 5905)
@@ -83,7 +83,7 @@
    protected final Map<String, Object> backupParams = new HashMap<String, Object>();
 
    protected Timer timer;
-
+   
    // Static --------------------------------------------------------
 
    // Constructors --------------------------------------------------
@@ -973,6 +973,7 @@
 
    protected void doTestI(final ClientSessionFactory sf, final int threadNum) throws Exception
    {
+      log.info("in testi");
       ClientSession sessCreate = sf.createSession(false, true, true);
 
       sessCreate.createQueue(ADDRESS, new SimpleString(threadNum + ADDRESS.toString()), null, false, false);
@@ -1005,6 +1006,8 @@
       sessCreate.deleteQueue(new SimpleString(threadNum + ADDRESS.toString()));
 
       sessCreate.close();
+            
+      log.info("completed testi");
    }
 
    protected void doTestJ(final ClientSessionFactory sf, final int threadNum) throws Exception
@@ -1359,11 +1362,10 @@
                }
             }
             
-            log.info("completed*******");
+            log.info("completed loop");
 
             runnable.checkFail();
             
-            log.info("super completed");
          }
          while (!failer.isExecuted());
 

Modified: trunk/tests/src/org/jboss/messaging/tests/integration/discovery/DiscoveryTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/discovery/DiscoveryTest.java	2009-02-19 17:36:47 UTC (rev 5904)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/discovery/DiscoveryTest.java	2009-02-19 19:09:40 UTC (rev 5905)
@@ -108,6 +108,71 @@
 
    }
    
+   /** Checks that discovery still works after the broadcast and discovery groups are stopped and restarted. */
+   public void testSimpleBroadcastWithStopStartDiscoveryGroup() throws Exception
+   {
+      final InetAddress groupAddress = InetAddress.getByName(address1);
+      final int groupPort = 6745;
+      final int timeout = 500;
+
+      BroadcastGroup bg = new BroadcastGroupImpl(randomString(), randomString(), null, -1, groupAddress, groupPort);
+
+      bg.start();
+
+      TransportConfiguration live1 = generateTC();
+      TransportConfiguration backup1 = generateTC();
+
+      Pair<TransportConfiguration, TransportConfiguration> connectorPair = new Pair<TransportConfiguration, TransportConfiguration>(live1,
+                                                                                                                                    backup1);
+
+      bg.addConnectorPair(connectorPair);
+
+      DiscoveryGroup dg = new DiscoveryGroupImpl(randomString(), randomString(), groupAddress, groupPort, timeout);
+
+      dg.start();
+
+      bg.broadcastConnectors();
+
+      boolean ok = dg.waitForBroadcast(1000);
+
+      assertTrue(ok);
+
+      List<Pair<TransportConfiguration, TransportConfiguration>> connectors = dg.getConnectors();
+
+      assertNotNull(connectors);
+      assertEquals(1, connectors.size());
+
+      Pair<TransportConfiguration, TransportConfiguration> receivedPair = connectors.get(0);
+
+      assertEquals(connectorPair, receivedPair);
+
+      // Stop both groups, then restart them and check that the broadcast is discovered again
+      bg.stop();
+      dg.stop();
+
+      dg.start();
+      bg.start();
+
+      bg.broadcastConnectors();
+
+      ok = dg.waitForBroadcast(1000);
+
+      assertTrue(ok);
+
+      connectors = dg.getConnectors();
+
+      assertNotNull(connectors);
+      assertEquals(1, connectors.size());
+
+      receivedPair = connectors.get(0);
+
+      assertEquals(connectorPair, receivedPair);
+
+      // Stop both groups again so that nothing is left running once the test has finished
+      bg.stop();
+      dg.stop();
+   }
+   
    public void testIgnoreTrafficFromOwnNode() throws Exception
    {
       final InetAddress groupAddress = InetAddress.getByName(address1);




More information about the jboss-cvs-commits mailing list