[hornetq-commits] JBoss hornetq SVN: r11199 - in branches/Branch_2_2_EAP: src/main/org/hornetq/core/server/cluster and 4 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Sat Aug 13 01:48:57 EDT 2011


Author: clebert.suconic
Date: 2011-08-13 01:48:57 -0400 (Sat, 13 Aug 2011)
New Revision: 11199

Added:
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerInternal.java
Modified:
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientSessionInternal.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/DelegatingSession.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/Topology.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/Bridge.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/MessageFlowRecord.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionBridge.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
   branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
   branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/NettyFileStorageSymmetricClusterTest.java
   branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/SymmetricClusterTest.java
   branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/util/UnitTestCase.java
Log:
Fixing the testsuite

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java	2011-08-12 15:10:56 UTC (rev 11198)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java	2011-08-13 05:48:57 UTC (rev 11199)
@@ -1399,27 +1399,19 @@
                log.trace("Disconnect being called on client:" + msg + " server locator = " + serverLocator, new Exception ("trace"));
             }
 
-            closeExecutor.execute(new Runnable()
+            conn.fail(new HornetQException(HornetQException.DISCONNECTED,
+                                           "The connection was disconnected because of server shutdown"));
+            
+            SimpleString nodeID = msg.getNodeID();
+            if (log.isTraceEnabled())
             {
-               // Must be executed on new thread since cannot block the netty thread for a long time and fail can
-               // cause reconnect loop
-               public void run()
-               {
-                  SimpleString nodeID = msg.getNodeID();
-                  if (log.isTraceEnabled())
-                  {
-                     log.trace("notifyDown nodeID=" + msg.getNodeID() + " on serverLocator=" + serverLocator + " csf created at ", ClientSessionFactoryImpl.this.e);
-                  }
-                  if (nodeID != null)
-                  {
-                     serverLocator.notifyNodeDown(msg.getNodeID().toString());
-                  }
+               log.trace("notifyDown nodeID=" + msg.getNodeID() + " on serverLocator=" + serverLocator + " csf created at ", ClientSessionFactoryImpl.this.e);
+            }
+            if (nodeID != null)
+            {
+               serverLocator.notifyNodeDown(msg.getNodeID().toString());
+            }
 
-                  conn.fail(new HornetQException(HornetQException.DISCONNECTED,
-                                                 "The connection was disconnected because of server shutdown"));
-
-               }
-            });
          }
          else if (type == PacketImpl.CLUSTER_TOPOLOGY)
          {

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java	2011-08-12 15:10:56 UTC (rev 11198)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java	2011-08-13 05:48:57 UTC (rev 11199)
@@ -282,6 +282,11 @@
 
    // ClientSession implementation
    // -----------------------------------------------------------------
+   
+   public Channel getChannel()
+   {
+      return channel;
+   }
 
    public void createQueue(final SimpleString address, final SimpleString queueName) throws HornetQException
    {

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientSessionInternal.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientSessionInternal.java	2011-08-12 15:10:56 UTC (rev 11198)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientSessionInternal.java	2011-08-13 05:48:57 UTC (rev 11199)
@@ -17,6 +17,7 @@
 import org.hornetq.api.core.Message;
 import org.hornetq.api.core.SimpleString;
 import org.hornetq.api.core.client.ClientSession;
+import org.hornetq.core.protocol.core.Channel;
 import org.hornetq.core.protocol.core.CoreRemotingConnection;
 import org.hornetq.core.protocol.core.impl.wireformat.SessionReceiveContinuationMessage;
 import org.hornetq.core.protocol.core.impl.wireformat.SessionReceiveLargeMessage;
@@ -61,6 +62,8 @@
    void handleFailover(CoreRemotingConnection backupConnection);
 
    RemotingConnection getConnection();
+   
+   Channel getChannel();
 
    void cleanUp(boolean failingOver) throws Exception;
 

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/DelegatingSession.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/DelegatingSession.java	2011-08-12 15:10:56 UTC (rev 11198)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/DelegatingSession.java	2011-08-13 05:48:57 UTC (rev 11199)
@@ -28,6 +28,7 @@
 import org.hornetq.api.core.client.SendAcknowledgementHandler;
 import org.hornetq.api.core.client.SessionFailureListener;
 import org.hornetq.core.logging.Logger;
+import org.hornetq.core.protocol.core.Channel;
 import org.hornetq.core.protocol.core.CoreRemotingConnection;
 import org.hornetq.core.protocol.core.impl.wireformat.SessionReceiveContinuationMessage;
 import org.hornetq.core.protocol.core.impl.wireformat.SessionReceiveLargeMessage;
@@ -584,4 +585,12 @@
       return "DelegatingSession [session=" + session + "]";
    }
 
+   /* (non-Javadoc)
+    * @see org.hornetq.core.client.impl.ClientSessionInternal#getChannel()
+    */
+   public Channel getChannel()
+   {
+      return session.getChannel();
+   }
+
 }

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java	2011-08-12 15:10:56 UTC (rev 11198)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java	2011-08-13 05:48:57 UTC (rev 11199)
@@ -69,7 +69,7 @@
 
    private transient String identity;
 
-   private Set<ClientSessionFactory> factories = new HashSet<ClientSessionFactory>();
+   private Set<ClientSessionFactoryInternal> factories = new HashSet<ClientSessionFactoryInternal>();
 
    private TransportConfiguration[] initialConnectors;
 
@@ -84,7 +84,7 @@
    private boolean receivedTopology;
 
    private boolean compressLargeMessage;
-   
+
    // if the system should shutdown the pool when shutting down
    private transient boolean shutdownPool;
 
@@ -252,13 +252,12 @@
 
    private void setThreadPools()
    {
-	  if (threadPool != null)
-	  {
-		  return;
-	  }
-	  else
-      if (useGlobalPools)
+      if (threadPool != null)
       {
+         return;
+      }
+      else if (useGlobalPools)
+      {
          threadPool = getGlobalThreadPool();
 
          scheduledThreadPool = getGlobalScheduledThreadPool();
@@ -266,7 +265,7 @@
       else
       {
          this.shutdownPool = true;
-         
+
          ThreadFactory factory = new HornetQThreadFactory("HornetQ-client-factory-threads-" + System.identityHashCode(this),
                                                           true,
                                                           getThisClassLoader());
@@ -369,19 +368,13 @@
 
    private ServerLocatorImpl(final Topology topology,
                              final boolean useHA,
-                             final ExecutorService threadPool, 
-                             final ScheduledExecutorService scheduledExecutor, 
                              final DiscoveryGroupConfiguration discoveryGroupConfiguration,
                              final TransportConfiguration[] transportConfigs)
    {
       e.fillInStackTrace();
-      
-      this.scheduledThreadPool = scheduledExecutor;
-      
-      this.threadPool = threadPool;
-      
+
       this.topology = topology;
-      
+
       this.ha = useHA;
 
       this.discoveryGroupConfiguration = discoveryGroupConfiguration;
@@ -459,8 +452,13 @@
     */
    public ServerLocatorImpl(final boolean useHA, final DiscoveryGroupConfiguration groupConfiguration)
    {
-      this(new Topology(null), useHA, null, null, groupConfiguration, null);
-      topology.setOwner(this);
+      this(useHA ? new Topology(null) : null, useHA, groupConfiguration, null);
+      if (useHA)
+      {
+         // The owner is only set at the place where the Topology is created.
+         // For that reason it can't be set in the main constructor
+         topology.setOwner(this);
+      }
    }
 
    /**
@@ -470,8 +468,13 @@
     */
    public ServerLocatorImpl(final boolean useHA, final TransportConfiguration... transportConfigs)
    {
-      this(new Topology(null), useHA, null, null, null, transportConfigs);
-      topology.setOwner(this);
+      this(useHA ? new Topology(null) : null, useHA, null, transportConfigs);
+      if (useHA)
+      {
+         // The owner is only set at the place where the Topology is created.
+         // For that reason it can't be set in the main constructor
+         topology.setOwner(this);
+      }
    }
 
    /**
@@ -480,10 +483,10 @@
     * @param discoveryAddress
     * @param discoveryPort
     */
-   public ServerLocatorImpl(final Topology topology, final boolean useHA, final ExecutorService threadPool, final ScheduledExecutorService scheduledExecutor, final DiscoveryGroupConfiguration groupConfiguration)
+   public ServerLocatorImpl(final Topology topology, final boolean useHA, final DiscoveryGroupConfiguration groupConfiguration)
    {
-      this(topology, useHA, threadPool, scheduledExecutor, groupConfiguration, null);
-      
+      this(topology, useHA, groupConfiguration, null);
+
    }
 
    /**
@@ -491,9 +494,11 @@
     *
     * @param transportConfigs
     */
-   public ServerLocatorImpl(final Topology topology, final boolean useHA, final ExecutorService threadPool, final ScheduledExecutorService scheduledExecutor, final TransportConfiguration... transportConfigs)
+   public ServerLocatorImpl(final Topology topology,
+                            final boolean useHA,
+                            final TransportConfiguration... transportConfigs)
    {
-      this(topology, useHA, threadPool, scheduledExecutor, null, transportConfigs);
+      this(topology, useHA, null, transportConfigs);
    }
 
    private TransportConfiguration selectConnector()
@@ -565,7 +570,7 @@
       addFactory(sf);
       return sf;
    }
-   
+
    public boolean isClosed()
    {
       return closed || closing;
@@ -609,7 +614,7 @@
 
    public ClientSessionFactory createSessionFactory() throws Exception
    {
-      if (closed)
+      if (closed || closing)
       {
          throw new IllegalStateException("Cannot create session factory, server locator is closed (maybe it has been garbage collected)");
       }
@@ -697,10 +702,12 @@
          if (ha || clusterConnection)
          {
             long timeout = System.currentTimeMillis() + 30000;
-            while (!ServerLocatorImpl.this.closed && !ServerLocatorImpl.this.closing && !receivedTopology && timeout > System.currentTimeMillis())
+            while (!ServerLocatorImpl.this.closed && !ServerLocatorImpl.this.closing &&
+                   !receivedTopology &&
+                   timeout > System.currentTimeMillis())
             {
                // Now wait for the topology
-               
+
                try
                {
                   wait(1000);
@@ -711,7 +718,7 @@
 
             }
 
-            if (System.currentTimeMillis() > timeout && ! receivedTopology && !closed && !closing)
+            if (System.currentTimeMillis() > timeout && !receivedTopology && !closed && !closing)
             {
                throw new HornetQException(HornetQException.CONNECTION_TIMEDOUT,
                                           "Timed out waiting to receive cluster topology");
@@ -1137,7 +1144,7 @@
 
       super.finalize();
    }
-   
+
    public void cleanup()
    {
       doClose(false);
@@ -1147,7 +1154,7 @@
    {
       doClose(true);
    }
-   
+
    protected void doClose(final boolean sendClose)
    {
       if (closed)
@@ -1182,7 +1189,7 @@
          staticConnector.disconnect();
       }
 
-      Set<ClientSessionFactory> clonedFactory = new HashSet<ClientSessionFactory>(factories);
+      Set<ClientSessionFactoryInternal> clonedFactory = new HashSet<ClientSessionFactoryInternal>(factories);
 
       for (ClientSessionFactory factory : clonedFactory)
       {
@@ -1239,12 +1246,10 @@
 
    public void notifyNodeDown(final String nodeID)
    {
-      if (!clusterConnection && !ha)
+
+      if (topology == null)
       {
-         if (log.isDebugEnabled())
-         {
-            log.debug(this + "::ignoring notifyNodeDown=" + nodeID + " as isHA=false");
-         }
+         // there's no topology here
          return;
       }
 
@@ -1254,7 +1259,7 @@
       }
 
       topology.removeMember(nodeID);
- 
+
       if (!topology.isEmpty())
       {
          updateArraysAndPairs();
@@ -1274,25 +1279,18 @@
    }
 
    public void notifyNodeUp(final String nodeID,
-                                         final Pair<TransportConfiguration, TransportConfiguration> connectorPair,
-                                         final boolean last)
+                            final Pair<TransportConfiguration, TransportConfiguration> connectorPair,
+                            final boolean last)
    {
-      if (!clusterConnection && !ha)
+      if (topology == null)
       {
-         if (log.isDebugEnabled())
-         {
-            log.debug(this + "::Ignoring notifyNodeUp for " +
-                      nodeID +
-                      " connectorPair=" +
-                      connectorPair +
-                      ", since ha=false and clusterConnection=false");
-         }
+         // there's no topology
          return;
       }
 
       if (log.isDebugEnabled())
       {
-         log.debug("NodeUp " + this + "::nodeID=" + nodeID + ", connectorPair=" + connectorPair, new Exception ("trace"));
+         log.debug("NodeUp " + this + "::nodeID=" + nodeID + ", connectorPair=" + connectorPair, new Exception("trace"));
       }
 
       topology.addMember(nodeID, new TopologyMember(connectorPair), last);
@@ -1352,7 +1350,7 @@
    private synchronized void updateArraysAndPairs()
    {
       Collection<TopologyMember> membersCopy = topology.getMembers();
-      
+
       topologyArray = (Pair<TransportConfiguration, TransportConfiguration>[])Array.newInstance(Pair.class,
                                                                                                 membersCopy.size());
 
@@ -1395,12 +1393,12 @@
    {
       factories.remove(factory);
 
-      if (!clusterConnection  && factories.isEmpty())
+      if (!clusterConnection && factories.isEmpty())
       {
          // Go back to using the broadcast or static list
 
          receivedTopology = false;
-         
+
          topologyArray = null;
       }
    }
@@ -1424,7 +1422,13 @@
    {
       if (factory != null)
       {
-         TransportConfiguration backup = topology.getBackupForConnector(factory.getConnectorConfiguration());
+         TransportConfiguration backup = null;
+
+         if (topology != null)
+         {
+            backup = topology.getBackupForConnector(factory.getConnectorConfiguration());
+         }
+
          factory.setBackupConnector(factory.getConnectorConfiguration(), backup);
          factories.add(factory);
       }
@@ -1476,10 +1480,11 @@
                   {
                      csf.getConnection().addFailureListener(new FailureListener()
                      {
-                        // Case the node where we were connected is gone, we need to restart the connection
+                        // In case the node the cluster connection was connected to is gone, we need to restart the
+                        // connection
                         public void connectionFailed(HornetQException exception, boolean failedOver)
                         {
-                           if (exception.getCode() == HornetQException.DISCONNECTED)
+                           if (clusterConnection && exception.getCode() == HornetQException.DISCONNECTED)
                            {
                               try
                               {
@@ -1496,7 +1501,7 @@
 
                      if (log.isDebugEnabled())
                      {
-                        log.debug("XXX Returning " + csf +
+                        log.debug("Returning " + csf +
                                   " after " +
                                   retryNumber +
                                   " retries on StaticConnector " +

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/Topology.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/Topology.java	2011-08-12 15:10:56 UTC (rev 11198)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/Topology.java	2011-08-13 05:48:57 UTC (rev 11199)
@@ -40,6 +40,8 @@
 
    private static final Logger log = Logger.getLogger(Topology.class);
 
+   private Executor executor = null;
+
    /** Used to debug operations.
     * 
     *  Someone may argue this is not needed. But it's impossible to debg anything related to topology without knowing what node
@@ -49,14 +51,8 @@
     *  */
    private volatile Object owner;
 
-   public Topology(final Object owner)
-   {
-      this.owner = owner;
-      Topology.log.debug("Topology@" + Integer.toHexString(System.identityHashCode(this)) + " CREATE",
-                         new Exception("trace")); // Delete this line
-   }
 
-   /*
+   /**
     * topology describes the other cluster nodes that this server knows about:
     *
     * keys are node IDs
@@ -64,6 +60,18 @@
     */
    private final Map<String, TopologyMember> topology = new ConcurrentHashMap<String, TopologyMember>();
 
+   public Topology(final Object owner)
+   {
+      this.owner = owner;
+      Topology.log.debug("Topology@" + Integer.toHexString(System.identityHashCode(this)) + " CREATE",
+                         new Exception("trace")); // Delete this line
+   }
+
+   public void setExecutor(final Executor executor)
+   {
+      this.executor = executor;
+   }
+
    public void addClusterTopologyListener(final ClusterTopologyListener listener)
    {
       if (log.isDebugEnabled())
@@ -164,25 +172,32 @@
 
       if (replaced)
       {
+
+         
          final ArrayList<ClusterTopologyListener> copy = copyListeners();
-
-         // Has to use a different thread otherwise we may get dead locks case the remove is coming from the channel
-         for (ClusterTopologyListener listener : copy)
+         
+         execute(new Runnable()
          {
-            if (Topology.log.isTraceEnabled())
+            public void run()
             {
-               Topology.log.trace(this + " informing " + listener + " about node up = " + nodeId);
-            }
+               for (ClusterTopologyListener listener : copy)
+               {
+                  if (Topology.log.isTraceEnabled())
+                  {
+                     Topology.log.trace(this + " informing " + listener + " about node up = " + nodeId);
+                  }
 
-            try
-            {
-               listener.nodeUP(nodeId, member.getConnector(), last);
+                  try
+                  {
+                     listener.nodeUP(nodeId, member.getConnector(), last);
+                  }
+                  catch (Throwable e)
+                  {
+                     log.warn(e.getMessage(), e);
+                  }
+               }
             }
-            catch (Throwable e)
-            {
-               log.warn(e.getMessage(), e);
-            }
-         }
+         });
       }
 
       return replaced;
@@ -212,7 +227,7 @@
 
       if (Topology.log.isDebugEnabled())
       {
-         Topology.log.debug("ZZZ removeMember " + this +
+         Topology.log.debug("removeMember " + this +
                             " removing nodeID=" +
                             nodeId +
                             ", result=" +
@@ -225,17 +240,43 @@
       {
          final ArrayList<ClusterTopologyListener> copy = copyListeners();
 
-         for (ClusterTopologyListener listener : copy)
+         execute(new Runnable()
          {
-            if (Topology.log.isTraceEnabled())
+            public void run()
             {
-               Topology.log.trace(this + " informing " + listener + " about node down = " + nodeId);
+               for (ClusterTopologyListener listener : copy)
+               {
+                  if (Topology.log.isTraceEnabled())
+                  {
+                     Topology.log.trace(this + " informing " + listener + " about node down = " + nodeId);
+                  }
+                  try
+                  {
+                     listener.nodeDown(nodeId);
+                  }
+                  catch (Exception e)
+                  {
+                     log.warn(e.getMessage(), e);
+                  }
+               }
             }
-            listener.nodeDown(nodeId);
-         }
+         });
+
       }
       return member != null;
    }
+   
+   protected void execute(final Runnable runnable)
+   {
+      if (executor != null)
+      {
+         executor.execute(runnable);
+      }
+      else
+      {
+         runnable.run();
+      }
+   }
 
    /**
     * it will send all the member updates to listeners, independently of being changed or not
@@ -264,7 +305,7 @@
       }
    }
 
-   public synchronized void sendTopology(final ClusterTopologyListener listener)
+   public void sendTopology(final ClusterTopologyListener listener)
    {
       int count = 0;
 

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/Bridge.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/Bridge.java	2011-08-12 15:10:56 UTC (rev 11198)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/Bridge.java	2011-08-13 05:48:57 UTC (rev 11199)
@@ -53,4 +53,10 @@
    void pause() throws Exception;
 
    void resume() throws Exception;
+
+   /**
+    * To be called when the server sent a disconnect to the client.
+    * Basically this is for cluster bridges being disconnected
+    */
+   void disconnect();
 }

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/MessageFlowRecord.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/MessageFlowRecord.java	2011-08-12 15:10:56 UTC (rev 11198)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/MessageFlowRecord.java	2011-08-13 05:48:57 UTC (rev 11199)
@@ -33,6 +33,8 @@
    Bridge getBridge();
 
    void close() throws Exception;
+   
+   void serverDisconnected();
 
    boolean isClosed();
 

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java	2011-08-12 15:10:56 UTC (rev 11198)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java	2011-08-13 05:48:57 UTC (rev 11199)
@@ -25,14 +25,14 @@
 import org.hornetq.api.core.Message;
 import org.hornetq.api.core.SimpleString;
 import org.hornetq.api.core.client.ClientProducer;
-import org.hornetq.api.core.client.ClientSession.BindingQuery;
 import org.hornetq.api.core.client.ClientSessionFactory;
 import org.hornetq.api.core.client.SendAcknowledgementHandler;
-import org.hornetq.api.core.client.ServerLocator;
 import org.hornetq.api.core.client.SessionFailureListener;
+import org.hornetq.api.core.client.ClientSession.BindingQuery;
 import org.hornetq.api.core.management.NotificationType;
 import org.hornetq.core.client.impl.ClientSessionFactoryInternal;
 import org.hornetq.core.client.impl.ClientSessionInternal;
+import org.hornetq.core.client.impl.ServerLocatorInternal;
 import org.hornetq.core.filter.Filter;
 import org.hornetq.core.filter.impl.FilterImpl;
 import org.hornetq.core.logging.Logger;
@@ -47,6 +47,7 @@
 import org.hornetq.core.server.management.Notification;
 import org.hornetq.core.server.management.NotificationService;
 import org.hornetq.spi.core.protocol.RemotingConnection;
+import org.hornetq.utils.ConcurrentHashSet;
 import org.hornetq.utils.Future;
 import org.hornetq.utils.TypedProperties;
 import org.hornetq.utils.UUID;
@@ -77,7 +78,7 @@
 
    private static final SimpleString JMS_TOPIC_ADDRESS_PREFIX = new SimpleString("jms.topic.");
 
-   protected final ServerLocator serverLocator;
+   protected final ServerLocatorInternal serverLocator;
 
    private final UUID nodeUUID;
 
@@ -138,7 +139,7 @@
 
    // Public --------------------------------------------------------
 
-   public BridgeImpl(final ServerLocator serverLocator,
+   public BridgeImpl(final ServerLocatorInternal serverLocator,
                      final int reconnectAttempts,
                      final long retryInterval,
                      final double retryMultiplier,
@@ -159,7 +160,7 @@
    {
 
       this.reconnectAttempts = reconnectAttempts;
-      
+
       this.reconnectAttemptsInUse = -1;
 
       this.retryInterval = retryInterval;
@@ -199,7 +200,6 @@
    {
       this.notificationService = notificationService;
    }
-
    public synchronized void start() throws Exception
    {
       if (started)
@@ -278,6 +278,35 @@
       }
    }
 
+   public void disconnect()
+   {
+      executor.execute(new Runnable()
+      {
+         public void run()
+         {
+            if (session != null)
+            {
+               try
+               {
+                  session.cleanUp(false);
+               }
+               catch (Exception dontcare)
+               {
+                  log.debug(dontcare.getMessage(), dontcare);
+               }
+               session = null;
+            }
+         }
+      });
+   }
+
+   /** The cluster manager needs to use the same executor to close the serverLocator, otherwise the stop will break. 
+    *  This method is intended to expose this executor to the ClusterManager */
+   public Executor getExecutor()
+   {
+      return executor;
+   }
+   
    public void stop() throws Exception
    {
       if (log.isDebugEnabled())
@@ -468,7 +497,7 @@
          {
             if (log.isDebugEnabled())
             {
-            	log.debug(this + "::Ignoring reference on bridge as it is set to iniactive ref=" + ref);
+               log.debug(this + "::Ignoring reference on bridge as it is set to iniactive ref=" + ref);
             }
             return HandleStatus.BUSY;
          }
@@ -521,9 +550,9 @@
 
    public void connectionFailed(final HornetQException me, boolean failedOver)
    {
-      
+
       log.warn(this + "::Connection failed with failedOver=" + failedOver + "-" + me, me);
-      
+
       try
       {
          csf.cleanup();
@@ -539,12 +568,12 @@
       catch (Throwable dontCare)
       {
       }
-      
+
       fail(me.getCode() == HornetQException.DISCONNECTED);
 
       tryScheduleRetryReconnect(me.getCode());
    }
-   
+
    protected void tryScheduleRetryReconnect(final int code)
    {
       scheduleRetryConnect();
@@ -552,8 +581,8 @@
 
    public void beforeReconnect(final HornetQException exception)
    {
-//      log.warn(name + "::Connection failed before reconnect ", exception);
-//      fail(false);
+      // log.warn(name + "::Connection failed before reconnect ", exception);
+      // fail(false);
    }
 
    // Package protected ---------------------------------------------
@@ -568,7 +597,15 @@
    @Override
    public String toString()
    {
-      return this.getClass().getSimpleName() + "@" + Integer.toHexString(System.identityHashCode(this)) + " [name=" + name + ", queue=" + queue + " targetConnector=" + this.serverLocator + "]";
+      return this.getClass().getSimpleName() + "@" +
+             Integer.toHexString(System.identityHashCode(this)) +
+             " [name=" +
+             name +
+             ", queue=" +
+             queue +
+             " targetConnector=" +
+             this.serverLocator +
+             "]";
    }
 
    protected void fail(final boolean permanently)
@@ -590,7 +627,7 @@
             log.debug(dontcare);
          }
       }
-      
+
       cancelRefs();
       if (queue != null)
       {
@@ -615,7 +652,6 @@
    {
       ClientSessionFactoryInternal csf = (ClientSessionFactoryInternal)serverLocator.createSessionFactory();
       csf.setReconnectAttempts(0);
-      //csf.setInitialReconnectAttempts(1);
       return csf;
    }
 
@@ -625,7 +661,7 @@
       BridgeImpl.log.debug("Connecting  " + this + " to its destination [" + nodeUUID.toString() + "], csf=" + this.csf);
 
       retryCount++;
-      
+
       try
       {
          if (csf == null || csf.isClosed())
@@ -712,15 +748,14 @@
             {
                log.debug("Bridge " + this + " is unable to connect to destination. Retrying", e);
             }
+
+            scheduleRetryConnect();
          }
       }
       catch (Exception e)
       {
          BridgeImpl.log.warn("Bridge " + this + " is unable to connect to destination. It will be disabled.", e);
       }
-
-      scheduleRetryConnect();
-
    }
 
    protected void scheduleRetryConnect()
@@ -730,7 +765,7 @@
          log.warn("ServerLocator was shutdown, can't retry on opening connection for bridge");
          return;
       }
-      
+
       if (reconnectAttemptsInUse >= 0 && retryCount > reconnectAttempts)
       {
          log.warn("Bridge " + this.name +
@@ -752,9 +787,15 @@
       {
          timeout = maxRetryInterval;
       }
-      
-      log.debug("Bridge " + this + " retrying connection #" + retryCount + ", maxRetry=" + reconnectAttemptsInUse + ", timeout=" + timeout);
 
+      log.debug("Bridge " + this +
+                " retrying connection #" +
+                retryCount +
+                ", maxRetry=" +
+                reconnectAttemptsInUse +
+                ", timeout=" +
+                timeout);
+
       scheduleRetryConnectFixedTimeout(timeout);
    }
 
@@ -770,13 +811,15 @@
          {
          }
       }
-      
+
       if (log.isDebugEnabled())
       {
          log.debug("Scheduling retry for bridge " + this.name + " in " + milliseconds + " milliseconds");
       }
 
-      futureScheduledReconnection = scheduledExecutor.schedule(new FutureConnectRunnable(), milliseconds, TimeUnit.MILLISECONDS);
+      futureScheduledReconnection = scheduledExecutor.schedule(new FutureConnectRunnable(),
+                                                               milliseconds,
+                                                               TimeUnit.MILLISECONDS);
    }
 
    // Inner classes -------------------------------------------------
@@ -788,12 +831,23 @@
          try
          {
             log.debug("stopping bridge " + BridgeImpl.this);
-            
+
+            queue.removeConsumer(BridgeImpl.this);
+
+            internalCancelReferences();
+
             if (session != null)
             {
                log.debug("Cleaning up session " + session);
-               session.close();
                session.removeFailureListener(BridgeImpl.this);
+               try
+               {
+                  session.close();
+                  session = null;
+               }
+               catch (Exception dontcare)
+               {
+               }
             }
 
             if (csf != null)
@@ -815,10 +869,6 @@
             {
                log.trace("Removing consumer on stopRunnable " + this + " from queue " + queue);
             }
-            queue.removeConsumer(BridgeImpl.this);
-
-            internalCancelReferences();
-
             log.info("stopped bridge " + name);
          }
          catch (Exception e)

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionBridge.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionBridge.java	2011-08-12 15:10:56 UTC (rev 11198)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionBridge.java	2011-08-13 05:48:57 UTC (rev 11199)
@@ -24,15 +24,16 @@
 import org.hornetq.api.core.client.ClientConsumer;
 import org.hornetq.api.core.client.ClientMessage;
 import org.hornetq.api.core.client.ClientProducer;
-import org.hornetq.api.core.client.ServerLocator;
 import org.hornetq.api.core.management.ManagementHelper;
 import org.hornetq.api.core.management.NotificationType;
 import org.hornetq.api.core.management.ResourceNames;
+import org.hornetq.core.client.impl.ClientSessionFactoryInternal;
 import org.hornetq.core.client.impl.ServerLocatorInternal;
 import org.hornetq.core.logging.Logger;
 import org.hornetq.core.message.impl.MessageImpl;
 import org.hornetq.core.persistence.StorageManager;
 import org.hornetq.core.postoffice.BindingType;
+import org.hornetq.core.remoting.FailureListener;
 import org.hornetq.core.server.Queue;
 import org.hornetq.core.server.ServerMessage;
 import org.hornetq.core.server.cluster.ClusterConnection;
@@ -57,6 +58,8 @@
 
    private final ClusterConnection clusterConnection;
 
+   private final ClusterManagerInternal clusterManager;
+
    private final MessageFlowRecord flowRecord;
 
    private final SimpleString managementAddress;
@@ -74,7 +77,8 @@
    private final ServerLocatorInternal discoveryLocator;
 
    public ClusterConnectionBridge(final ClusterConnection clusterConnection,
-                                  final ServerLocator targetLocator,
+                                  final ClusterManagerInternal clusterManager,
+                                  final ServerLocatorInternal targetLocator,
                                   final ServerLocatorInternal discoveryLocator,
                                   final int reconnectAttempts,
                                   final long retryInterval,
@@ -124,6 +128,8 @@
 
       this.clusterConnection = clusterConnection;
 
+      this.clusterManager = clusterManager;
+
       this.targetNodeID = targetNodeID;
       this.managementAddress = managementAddress;
       this.managementNotificationAddress = managementNotificationAddress;
@@ -132,13 +138,32 @@
 
       // we need to disable DLQ check on the clustered bridges
       queue.setInternalQueue(true);
-      
+
       if (log.isDebugEnabled())
       {
-         log.debug("Setting up bridge between " + clusterConnection.getConnector() + " and " + targetLocator, new Exception ("trace"));
+         log.debug("Setting up bridge between " + clusterConnection.getConnector() + " and " + targetLocator,
+                   new Exception("trace"));
       }
    }
-   
+
+   protected ClientSessionFactoryInternal createSessionFactory() throws Exception
+   {
+      ClientSessionFactoryInternal factory = super.createSessionFactory();
+      factory.getConnection().addFailureListener(new FailureListener()
+      {
+
+         public void connectionFailed(HornetQException exception, boolean failedOver)
+         {
+            if (exception.getCode() == HornetQException.DISCONNECTED)
+            {
+               flowRecord.serverDisconnected();
+               clusterManager.removeClusterLocator(serverLocator);
+            }
+         }
+      });
+      return factory;
+   }
+
    @Override
    protected ServerMessage beforeForward(ServerMessage message)
    {
@@ -247,7 +272,7 @@
 
          ClientMessage message = session.createMessage(false);
 
-         log.debug("Requesting sendQueueInfoToQueue through " + this, new Exception ("trace"));
+         log.debug("Requesting sendQueueInfoToQueue through " + this, new Exception("trace"));
          ManagementHelper.putOperationInvocation(message,
                                                  ResourceNames.CORE_SERVER,
                                                  "sendQueueInfoToQueue",
@@ -286,7 +311,6 @@
       }
    }
 
-
    protected void fail(final boolean permanently)
    {
       log.debug("Cluster Bridge " + this.getName() + " failed, permanently=" + permanently);

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java	2011-08-12 15:10:56 UTC (rev 11198)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java	2011-08-13 05:48:57 UTC (rev 11199)
@@ -33,12 +33,15 @@
 import org.hornetq.api.core.SimpleString;
 import org.hornetq.api.core.TransportConfiguration;
 import org.hornetq.api.core.client.ClientMessage;
+import org.hornetq.api.core.client.ClusterTopologyListener;
 import org.hornetq.api.core.client.HornetQClient;
+import org.hornetq.api.core.client.ServerLocator;
 import org.hornetq.api.core.management.ManagementHelper;
 import org.hornetq.api.core.management.NotificationType;
 import org.hornetq.core.client.impl.ServerLocatorImpl;
 import org.hornetq.core.client.impl.ServerLocatorInternal;
 import org.hornetq.core.client.impl.Topology;
+import org.hornetq.core.client.impl.TopologyMember;
 import org.hornetq.core.logging.Logger;
 import org.hornetq.core.postoffice.Binding;
 import org.hornetq.core.postoffice.Bindings;
@@ -48,6 +51,7 @@
 import org.hornetq.core.server.Queue;
 import org.hornetq.core.server.cluster.Bridge;
 import org.hornetq.core.server.cluster.ClusterConnection;
+import org.hornetq.core.server.cluster.ClusterManager;
 import org.hornetq.core.server.cluster.MessageFlowRecord;
 import org.hornetq.core.server.cluster.RemoteQueueBinding;
 import org.hornetq.core.server.group.impl.Proposal;
@@ -136,9 +140,9 @@
 
    private final Set<TransportConfiguration> allowableConnections = new HashSet<TransportConfiguration>();
 
-   private final ClusterManagerImpl manager;
+   private final ClusterManagerInternal manager;
 
-   public ClusterConnectionImpl(final ClusterManagerImpl manager,
+   public ClusterConnectionImpl(final ClusterManagerInternal manager,
                                 final Topology clusterManagerTopology,
                                 final TransportConfiguration[] tcConfigs,
                                 final TransportConfiguration connector,
@@ -657,38 +661,10 @@
                                 final Queue queue,
                                 final boolean start) throws Exception
    {
-      MessageFlowRecordImpl record = new MessageFlowRecordImpl(targetNodeID, connector, queueName, queue);
-
-      Bridge bridge = createClusteredBridge(record);
-
-      if (log.isDebugEnabled())
-      {
-         log.debug("creating record between " + this.connector + " and " + connector + bridge);
-      }
-
-      record.setBridge(bridge);
-
-      records.put(targetNodeID, record);
-
-      if (start)
-      {
-         bridge.start();
-      }
-   }
-
-   /**
-    * @param record
-    * @return
-    * @throws Exception
-    */
-   protected Bridge createClusteredBridge(MessageFlowRecordImpl record) throws Exception
-   {
-
-      final ServerLocatorInternal targetLocator = new ServerLocatorImpl(this.clusterManagerTopology,
+      Topology topology = new Topology(null);
+      final ServerLocatorInternal targetLocator = new ServerLocatorImpl(topology,
                                                                         false,
-                                                                        server.getThreadPool(),
-                                                                        server.getScheduledPool(),
-                                                                        record.getConnector());
+                                                                        connector);
 
       targetLocator.setReconnectAttempts(0);
 
@@ -700,7 +676,6 @@
       targetLocator.setConfirmationWindowSize(confirmationWindowSize);
       targetLocator.setBlockOnDurableSend(!useDuplicateDetection);
       targetLocator.setBlockOnNonDurableSend(!useDuplicateDetection);
-      targetLocator.setClusterConnection(true);
 
       targetLocator.setRetryInterval(retryInterval);
       targetLocator.setMaxRetryInterval(maxRetryInterval);
@@ -714,10 +689,37 @@
       {
          targetLocator.setRetryInterval(retryInterval);
       }
+      
+      targetLocator.disableFinalizeCheck();
+      
+      targetLocator.connect();
+      
 
-      manager.addClusterLocator(targetLocator);
+      MessageFlowRecordImpl record = new MessageFlowRecordImpl(targetLocator, targetNodeID, connector, queueName, queue);
 
+      topology.setOwner(record);
+      
+      // Establish a proxy to the main topology. 
+      // We are going to listen for adds and removes on the bridges as well
+      topology.addClusterTopologyListener(new ClusterTopologyListener(){
+
+         public void nodeDown(String nodeID)
+         {
+            clusterManagerTopology.removeMember(nodeID);
+         }
+
+         public void nodeUP(String nodeID,
+                            Pair<TransportConfiguration, TransportConfiguration> connectorPair,
+                            boolean last)
+         {
+            clusterManagerTopology.addMember(nodeID,new TopologyMember(connectorPair), last);
+         }
+         
+      });
+
+
       ClusterConnectionBridge bridge = new ClusterConnectionBridge(this,
+                                                                   manager,
                                                                    targetLocator,
                                                                    serverLocator,
                                                                    reconnectAttempts,
@@ -745,20 +747,36 @@
 
       targetLocator.setIdentity("(Cluster-connection-bridge::" + bridge.toString() + "::" + this.toString() + ")");
 
-      return bridge;
+      if (log.isDebugEnabled())
+      {
+         log.debug("creating record between " + this.connector + " and " + connector + bridge);
+      }
+
+      record.setBridge(bridge);
+
+      records.put(targetNodeID, record);
+
+      if (start)
+      {
+         bridge.start();
+      }
    }
-
+   
    // Inner classes -----------------------------------------------------------------------------------
 
    private class MessageFlowRecordImpl implements MessageFlowRecord
    {
-      private Bridge bridge;
+      private BridgeImpl bridge;
 
       private final String targetNodeID;
 
       private final TransportConfiguration connector;
+      
+      private final ServerLocatorInternal targetLocator;
 
       private final SimpleString queueName;
+      
+      private boolean disconnected = false;;
 
       private final Queue queue;
 
@@ -768,11 +786,13 @@
 
       private volatile boolean firstReset = false;
 
-      public MessageFlowRecordImpl(final String targetNodeID,
+      public MessageFlowRecordImpl(final ServerLocatorInternal targetLocator,
+                                   final String targetNodeID,
                                    final TransportConfiguration connector,
                                    final SimpleString queueName,
                                    final Queue queue)
       {
+         this.targetLocator = targetLocator;
          this.queue = queue;
          this.targetNodeID = targetNodeID;
          this.connector = connector;
@@ -798,6 +818,11 @@
                 firstReset +
                 "]";
       }
+      
+      public void serverDisconnected()
+      {
+         this.disconnected = true;
+      }
 
       public String getAddress()
       {
@@ -850,8 +875,35 @@
 
          isClosed = true;
          clearBindings();
+         
+         if (disconnected)
+         {
+            bridge.disconnect();
+         }
 
          bridge.stop();
+         
+         bridge.getExecutor().execute(new Runnable()
+         {
+            public void run()
+            {
+               try
+               {
+                  if (disconnected)
+                  {
+                     targetLocator.cleanup();
+                  }
+                  else
+                  {
+                     targetLocator.close();
+                  }
+               }
+               catch (Exception ignored)
+               {
+                  log.debug(ignored.getMessage(), ignored);
+               }
+            }
+         });
       }
 
       public boolean isClosed()
@@ -864,7 +916,7 @@
          clearBindings();
       }
 
-      public void setBridge(final Bridge bridge)
+      public void setBridge(final BridgeImpl bridge)
       {
          this.bridge = bridge;
       }
@@ -1306,8 +1358,6 @@
             }
             return new ServerLocatorImpl(clusterManagerTopology,
                                          true,
-                                         server.getThreadPool(),
-                                         server.getScheduledPool(),
                                          tcConfigs);
          }
          else
@@ -1340,8 +1390,6 @@
       {
          return new ServerLocatorImpl(clusterManagerTopology,
                                       true,
-                                      server.getThreadPool(),
-                                      server.getScheduledPool(),
                                       dg);
       }
    }

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java	2011-08-12 15:10:56 UTC (rev 11198)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java	2011-08-13 05:48:57 UTC (rev 11199)
@@ -36,7 +36,6 @@
 import org.hornetq.api.core.client.ClientSessionFactory;
 import org.hornetq.api.core.client.ClusterTopologyListener;
 import org.hornetq.api.core.client.HornetQClient;
-import org.hornetq.api.core.client.ServerLocator;
 import org.hornetq.core.client.impl.ServerLocatorInternal;
 import org.hornetq.core.client.impl.Topology;
 import org.hornetq.core.client.impl.TopologyMember;
@@ -53,7 +52,6 @@
 import org.hornetq.core.server.cluster.Bridge;
 import org.hornetq.core.server.cluster.BroadcastGroup;
 import org.hornetq.core.server.cluster.ClusterConnection;
-import org.hornetq.core.server.cluster.ClusterManager;
 import org.hornetq.core.server.cluster.Transformer;
 import org.hornetq.core.server.management.ManagementService;
 import org.hornetq.utils.ConcurrentHashSet;
@@ -69,7 +67,7 @@
  *
  *
  */
-public class ClusterManagerImpl implements ClusterManager
+public class ClusterManagerImpl implements ClusterManagerInternal
 {
    private static final Logger log = Logger.getLogger(ClusterManagerImpl.class);
 
@@ -126,6 +124,8 @@
       this.executorFactory = executorFactory;
 
       executor = executorFactory.getExecutor();
+      
+      topology.setExecutor(executorFactory.getExecutor());
 
       this.server = server;
 
@@ -482,10 +482,15 @@
       }
    }
 
-   void addClusterLocator(final ServerLocatorInternal serverLocator)
+   public void addClusterLocator(final ServerLocatorInternal serverLocator)
    {
       this.clusterLocators.add(serverLocator);
    }
+   
+   public void removeClusterLocator(final ServerLocatorInternal serverLocator)
+   {
+      this.clusterLocators.remove(serverLocator);
+   }
 
    private synchronized void announceNode()
    {

Added: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerInternal.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerInternal.java	                        (rev 0)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerInternal.java	2011-08-13 05:48:57 UTC (rev 11199)
@@ -0,0 +1,32 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.server.cluster.impl;
+
+import org.hornetq.core.client.impl.ServerLocatorInternal;
+import org.hornetq.core.server.cluster.ClusterManager;
+
+/**
+ * Internal extension of {@link ClusterManager} that adds registration and removal of cluster server locators.
+ *
+ * @author clebert
+ *
+ *
+ */
+public interface ClusterManagerInternal extends ClusterManager
+{
+   void addClusterLocator(ServerLocatorInternal locator);
+   
+   void removeClusterLocator(ServerLocatorInternal locator);
+
+}

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java	2011-08-12 15:10:56 UTC (rev 11198)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java	2011-08-13 05:48:57 UTC (rev 11199)
@@ -523,7 +523,7 @@
                                          queue.isTemporary(),
                                          filterString,
                                          queue.getConsumerCount(),
-                                         queue.getMessageCount());
+                                         queue.getInstantMessageCount());
       }
       // make an exception for the management address (see HORNETQ-29)
       else if (name.equals(managementAddress))

Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java	2011-08-12 15:10:56 UTC (rev 11198)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java	2011-08-13 05:48:57 UTC (rev 11199)
@@ -111,7 +111,7 @@
       }
 
       locators = new ServerLocator[ClusterTestBase.MAX_SERVERS];
-      
+
       // To make sure the test will start with a clean VM
       forceGC();
 
@@ -146,8 +146,6 @@
 
       consumers = new ConsumerHolder[ClusterTestBase.MAX_CONSUMERS];
 
-
-
       nodeManagers = null;
 
       super.tearDown();
@@ -195,7 +193,7 @@
    {
       return consumers[node].consumer;
    }
-   
+
    protected void waitForMessages(final int node, final String address, final int count) throws Exception
    {
       HornetQServer server = servers[node];
@@ -247,37 +245,41 @@
 
       throw new IllegalStateException(msg);
    }
-   
+
    protected void waitForTopology(final HornetQServer server, final int nodes) throws Exception
    {
       waitForTopology(server, nodes, WAIT_TIMEOUT);
    }
-   
+
    protected void waitForTopology(final HornetQServer server, final int nodes, final long timeout) throws Exception
    {
       log.debug("waiting for " + nodes + " on the topology for server = " + server);
 
+      long start = System.currentTimeMillis();
 
-      long start = System.currentTimeMillis();
-      
       Topology topology = server.getClusterManager().getTopology();
 
       do
       {
          if (nodes == topology.getMembers().size())
          {
-           return;
+            return;
          }
 
          Thread.sleep(10);
       }
       while (System.currentTimeMillis() - start < timeout);
-      
-      String msg = "Timed out waiting for cluster topology of " + nodes + " (received " + topology.getMembers().size() + ") topology = " + topology + ")";
 
+      String msg = "Timed out waiting for cluster topology of " + nodes +
+                   " (received " +
+                   topology.getMembers().size() +
+                   ") topology = " +
+                   topology +
+                   ")";
+
       ClusterTestBase.log.error(msg);
-      
-      throw new Exception (msg);
+
+      throw new Exception(msg);
    }
 
    protected void waitForBindings(final int node,
@@ -340,9 +342,15 @@
       }
       while (System.currentTimeMillis() - start < ClusterTestBase.WAIT_TIMEOUT);
 
-      String msg = "Timed out waiting for bindings (bindingCount = " + bindingCount + " (expecting " + expectedBindingCount + ") "+
+      String msg = "Timed out waiting for bindings (bindingCount = " + bindingCount +
+                   " (expecting " +
+                   expectedBindingCount +
+                   ") " +
                    ", totConsumers = " +
-                   totConsumers + " (expecting " + expectedConsumerCount + ")" + 
+                   totConsumers +
+                   " (expecting " +
+                   expectedConsumerCount +
+                   ")" +
                    ")";
 
       ClusterTestBase.log.error(msg);
@@ -364,7 +372,7 @@
 
       StringWriter writer = new StringWriter();
       PrintWriter out = new PrintWriter(writer);
-      
+
       try
       {
          for (HornetQServer hornetQServer : servers)
@@ -372,35 +380,38 @@
             if (hornetQServer != null)
             {
                out.println(clusterDescription(hornetQServer));
-               out.println(debugBindings(hornetQServer, hornetQServer.getConfiguration().getManagementNotificationAddress().toString()));
+               out.println(debugBindings(hornetQServer, hornetQServer.getConfiguration()
+                                                                     .getManagementNotificationAddress()
+                                                                     .toString()));
             }
          }
-         
+
          for (HornetQServer hornetQServer : servers)
          {
             out.println("Management bindings on " + hornetQServer);
             if (hornetQServer != null)
             {
-               out.println(debugBindings(hornetQServer, hornetQServer.getConfiguration().getManagementNotificationAddress().toString()));
+               out.println(debugBindings(hornetQServer, hornetQServer.getConfiguration()
+                                                                     .getManagementNotificationAddress()
+                                                                     .toString()));
             }
          }
       }
       catch (Throwable dontCare)
       {
       }
-      
+
       logAndSystemOut(writer.toString());
-      
+
       throw new IllegalStateException(msg);
    }
-   
-   
+
    protected String debugBindings(final HornetQServer server, final String address) throws Exception
    {
-      
+
       StringWriter str = new StringWriter();
       PrintWriter out = new PrintWriter(str);
-      
+
       if (server == null)
       {
          return "server is shutdown";
@@ -414,7 +425,7 @@
       Bindings bindings = po.getBindingsForAddress(new SimpleString(address));
 
       out.println("=======================================================================");
-      out.println("Binding information for address = " + address + " on "  + server);
+      out.println("Binding information for address = " + address + " on " + server);
 
       for (Binding binding : bindings.getBindings())
       {
@@ -423,7 +434,7 @@
          out.println("Binding = " + qBinding + ", queue=" + qBinding.getQueue());
       }
       out.println("=======================================================================");
-      
+
       return str.toString();
 
    }
@@ -449,7 +460,7 @@
       {
          filterString = ClusterTestBase.FILTER_PROP.toString() + "='" + filterVal + "'";
       }
-      
+
       log.info("Creating " + queueName + " , address " + address + " on " + servers[node]);
 
       session.createQueue(address, queueName, filterString, durable);
@@ -538,7 +549,7 @@
          if (holder != null)
          {
             holder.consumer.close();
-           // holder.session.close();
+            // holder.session.close();
 
             consumers[i] = null;
          }
@@ -621,13 +632,13 @@
             message.putIntProperty(ClusterTestBase.COUNT_PROP, i);
 
             producer.send(message);
-            
+
             if (i % 100 == 0)
             {
                session.commit();
             }
          }
-         
+
          session.commit();
       }
       finally
@@ -851,10 +862,9 @@
 
          for (int j = msgStart; j < msgEnd; j++)
          {
-            
+
             ClientMessage message = holder.consumer.receive(WAIT_TIMEOUT);
-            
-            
+
             if (message == null)
             {
                ClusterTestBase.log.info("*** dumping consumers:");
@@ -863,9 +873,9 @@
 
                Assert.fail("consumer " + consumerID + " did not receive message " + j);
             }
-            
-            log.info("msg on ClusterTestBase = " + message);            
 
+            log.info("msg on ClusterTestBase = " + message);
+
             if (ack)
             {
                message.acknowledge();
@@ -880,15 +890,17 @@
             {
                if (firstOutOfOrderMessage == null)
                {
-                  firstOutOfOrderMessage = "expected " + j + " received " + message.getObjectProperty(ClusterTestBase.COUNT_PROP);
+                  firstOutOfOrderMessage = "expected " + j +
+                                           " received " +
+                                           message.getObjectProperty(ClusterTestBase.COUNT_PROP);
                }
                outOfOrder = true;
                System.out.println("Message j=" + j +
                                   " was received out of order = " +
                                   message.getObjectProperty(ClusterTestBase.COUNT_PROP));
                log.info("Message j=" + j +
-                                  " was received out of order = " +
-                                  message.getObjectProperty(ClusterTestBase.COUNT_PROP));
+                        " was received out of order = " +
+                        message.getObjectProperty(ClusterTestBase.COUNT_PROP));
             }
          }
       }
@@ -908,7 +920,7 @@
          }
       }
    }
-   
+
    protected String clusterDescription(HornetQServer server)
    {
       String br = "-------------------------\n";
@@ -990,28 +1002,28 @@
          if (consumerIDs[count] >= 0)
          {
             ConsumerHolder holder = consumers[consumerIDs[count]];
-   
+
             if (holder == null)
             {
                throw new IllegalArgumentException("No consumer at " + consumerIDs[i]);
             }
-   
+
             ClientMessage message = holder.consumer.receive(WAIT_TIMEOUT);
-            
+
             message.acknowledge();
-            
+
             consumers[consumerIDs[count]].session.commit();
-            
+
             System.out.println("Msg: " + message);
-   
+
             Assert.assertNotNull("consumer " + consumerIDs[count] + " did not receive message " + i, message);
-   
+
             Assert.assertEquals("consumer " + consumerIDs[count] + " message " + i,
                                 i,
                                 message.getObjectProperty(ClusterTestBase.COUNT_PROP));
 
          }
-         
+
          count++;
 
          if (count == consumerIDs.length)
@@ -1277,7 +1289,7 @@
       {
          res[j++] = i;
       }
-      
+
       if (ack)
       {
          // just to flush acks
@@ -1340,11 +1352,11 @@
       locators[node].setBlockOnDurableSend(true);
       ClientSessionFactory sf = locators[node].createSessionFactory();
 
-      sf.createSession().close();
+      ClientSession session = sf.createSession();
+      session.close();
       sfs[node] = sf;
    }
 
-
    protected void setupSessionFactory(final int node, final boolean netty, int reconnectAttempts) throws Exception
    {
       if (sfs[node] != null)
@@ -1395,7 +1407,6 @@
          serverTotc = new TransportConfiguration(ServiceTestBase.INVM_CONNECTOR_FACTORY, params);
       }
 
-
       locators[node] = HornetQClient.createServerLocatorWithHA(serverTotc);
       locators[node].setRetryInterval(100);
       locators[node].setRetryIntervalMultiplier(1d);
@@ -1431,79 +1442,79 @@
                                   final boolean fileStorage,
                                   final boolean sharedStorage,
                                   final boolean netty)
+   {
+      if (servers[node] != null)
       {
-         if (servers[node] != null)
-         {
-            throw new IllegalArgumentException("Already a server at node " + node);
-         }
+         throw new IllegalArgumentException("Already a server at node " + node);
+      }
 
-         Configuration configuration = createBasicConfig();
+      Configuration configuration = createBasicConfig();
 
-         configuration.setSecurityEnabled(false);
-         configuration.setJournalMinFiles(2);
-         configuration.setJournalMaxIO_AIO(1000);
-         configuration.setJournalFileSize(100 * 1024);
-         configuration.setJournalType(getDefaultJournalType());
-         configuration.setSharedStore(sharedStorage);
-         configuration.setThreadPoolMaxSize(10);
+      configuration.setSecurityEnabled(false);
+      configuration.setJournalMinFiles(2);
+      configuration.setJournalMaxIO_AIO(1000);
+      configuration.setJournalFileSize(100 * 1024);
+      configuration.setJournalType(getDefaultJournalType());
+      configuration.setSharedStore(sharedStorage);
+      configuration.setThreadPoolMaxSize(10);
+      if (sharedStorage)
+      {
+         // Shared storage will share the node between the backup and live node
+         configuration.setBindingsDirectory(getBindingsDir(node, false));
+         configuration.setJournalDirectory(getJournalDir(node, false));
+         configuration.setPagingDirectory(getPageDir(node, false));
+         configuration.setLargeMessagesDirectory(getLargeMessagesDir(node, false));
+      }
+      else
+      {
+         configuration.setBindingsDirectory(getBindingsDir(node, true));
+         configuration.setJournalDirectory(getJournalDir(node, true));
+         configuration.setPagingDirectory(getPageDir(node, true));
+         configuration.setLargeMessagesDirectory(getLargeMessagesDir(node, true));
+      }
+      configuration.setClustered(true);
+      configuration.setJournalCompactMinFiles(0);
+
+      configuration.getAcceptorConfigurations().clear();
+      configuration.getAcceptorConfigurations().add(createTransportConfiguration(netty, true, generateParams(node,
+                                                                                                             netty)));
+
+      HornetQServer server;
+
+      if (fileStorage)
+      {
          if (sharedStorage)
          {
-            // Shared storage will share the node between the backup and live node
-            configuration.setBindingsDirectory(getBindingsDir(node, false));
-            configuration.setJournalDirectory(getJournalDir(node, false));
-            configuration.setPagingDirectory(getPageDir(node, false));
-            configuration.setLargeMessagesDirectory(getLargeMessagesDir(node, false));
+            server = createInVMFailoverServer(true, configuration, nodeManagers[node], node);
          }
          else
          {
-            configuration.setBindingsDirectory(getBindingsDir(node, true));
-            configuration.setJournalDirectory(getJournalDir(node, true));
-            configuration.setPagingDirectory(getPageDir(node, true));
-            configuration.setLargeMessagesDirectory(getLargeMessagesDir(node, true));
+            server = HornetQServers.newHornetQServer(configuration);
+            server.setIdentity("Server " + node);
          }
-         configuration.setClustered(true);
-         configuration.setJournalCompactMinFiles(0);
-
-         configuration.getAcceptorConfigurations().clear();
-         configuration.getAcceptorConfigurations().add(createTransportConfiguration(netty, true, generateParams(node, netty)));
-
-         HornetQServer server;
-
-         if (fileStorage)
+      }
+      else
+      {
+         if (sharedStorage)
          {
-            if (sharedStorage)
-            {
-               server = createInVMFailoverServer(true, configuration, nodeManagers[node], node);
-            }
-            else
-            {
-               server = HornetQServers.newHornetQServer(configuration);
-               server.setIdentity("Server " + node);
-            }
+            server = createInVMFailoverServer(false, configuration, nodeManagers[node], node);
          }
          else
          {
-            if (sharedStorage)
-            {
-               server = createInVMFailoverServer(false, configuration,  nodeManagers[node], node);
-            }
-            else
-            {
-               server = HornetQServers.newHornetQServer(configuration, false);
-               server.setIdentity("Server " + node);
-            }
+            server = HornetQServers.newHornetQServer(configuration, false);
+            server.setIdentity("Server " + node);
          }
-         
-         server.setIdentity(this.getClass().getSimpleName() + "/Live(" + node + ")");
-         servers[node] = server;
       }
 
+      server.setIdentity(this.getClass().getSimpleName() + "/Live(" + node + ")");
+      servers[node] = server;
+   }
 
-    protected void setupBackupServer(final int node,
-                                     final int liveNode,
-                                     final boolean fileStorage,
-                                     final boolean sharedStorage,
-                                     final boolean netty)
+   protected void setupBackupServer(final int node,
+                                    final int liveNode,
+                                    final boolean fileStorage,
+                                    final boolean sharedStorage,
+                                    final boolean netty)
    {
       if (servers[node] != null)
       {
@@ -1540,7 +1551,7 @@
       configuration.getAcceptorConfigurations().clear();
       TransportConfiguration acceptorConfig = createTransportConfiguration(netty, true, generateParams(node, netty));
       configuration.getAcceptorConfigurations().add(acceptorConfig);
-      //add backup connector
+      // add backup connector
       TransportConfiguration liveConfig = createTransportConfiguration(netty, false, generateParams(liveNode, netty));
       configuration.getConnectorConfigurations().put(liveConfig.getName(), liveConfig);
       TransportConfiguration backupConfig = createTransportConfiguration(netty, false, generateParams(node, netty));
@@ -1577,174 +1588,183 @@
    }
 
    protected void setupLiveServerWithDiscovery(final int node,
-                                             final String groupAddress,
-                                             final int port,
-                                             final boolean fileStorage,
-                                             final boolean netty,
-                                             final boolean sharedStorage)
-     {
-        if (servers[node] != null)
-        {
-           throw new IllegalArgumentException("Already a server at node " + node);
-        }
+                                               final String groupAddress,
+                                               final int port,
+                                               final boolean fileStorage,
+                                               final boolean netty,
+                                               final boolean sharedStorage)
+   {
+      if (servers[node] != null)
+      {
+         throw new IllegalArgumentException("Already a server at node " + node);
+      }
 
-        Configuration configuration = createBasicConfig();
+      Configuration configuration = createBasicConfig();
 
-        configuration.setSecurityEnabled(false);
-        configuration.setBindingsDirectory(getBindingsDir(node, false));
-        configuration.setJournalMinFiles(2);
-        configuration.setJournalDirectory(getJournalDir(node, false));
-        configuration.setJournalFileSize(100 * 1024);
-        configuration.setJournalType(getDefaultJournalType());
-        configuration.setJournalMaxIO_AIO(1000);
-        configuration.setPagingDirectory(getPageDir(node, false));
-        configuration.setLargeMessagesDirectory(getLargeMessagesDir(node, false));
-        configuration.setClustered(true);
-        configuration.setBackup(false);
+      configuration.setSecurityEnabled(false);
+      configuration.setBindingsDirectory(getBindingsDir(node, false));
+      configuration.setJournalMinFiles(2);
+      configuration.setJournalDirectory(getJournalDir(node, false));
+      configuration.setJournalFileSize(100 * 1024);
+      configuration.setJournalType(getDefaultJournalType());
+      configuration.setJournalMaxIO_AIO(1000);
+      configuration.setPagingDirectory(getPageDir(node, false));
+      configuration.setLargeMessagesDirectory(getLargeMessagesDir(node, false));
+      configuration.setClustered(true);
+      configuration.setBackup(false);
 
-        configuration.getAcceptorConfigurations().clear();
+      configuration.getAcceptorConfigurations().clear();
 
-        Map<String, Object> params = generateParams(node, netty);
+      Map<String, Object> params = generateParams(node, netty);
 
-        configuration.getAcceptorConfigurations().add(createTransportConfiguration(netty, true, params));
+      configuration.getAcceptorConfigurations().add(createTransportConfiguration(netty, true, params));
 
-        TransportConfiguration connector = createTransportConfiguration(netty, false, params);
-        configuration.getConnectorConfigurations().put(connector.getName(), connector);
+      TransportConfiguration connector = createTransportConfiguration(netty, false, params);
+      configuration.getConnectorConfigurations().put(connector.getName(), connector);
 
-        List<String> connectorPairs = new ArrayList<String>();
-        connectorPairs.add(connector.getName());
+      List<String> connectorPairs = new ArrayList<String>();
+      connectorPairs.add(connector.getName());
 
-        BroadcastGroupConfiguration bcConfig = new BroadcastGroupConfiguration("bg1",
-                                                                               null,
-                                                                               -1,
-                                                                               groupAddress,
-                                                                               port,
-                                                                               1000,
-                                                                               connectorPairs);
+      BroadcastGroupConfiguration bcConfig = new BroadcastGroupConfiguration("bg1",
+                                                                             null,
+                                                                             -1,
+                                                                             groupAddress,
+                                                                             port,
+                                                                             1000,
+                                                                             connectorPairs);
 
-        configuration.getBroadcastGroupConfigurations().add(bcConfig);
+      configuration.getBroadcastGroupConfigurations().add(bcConfig);
 
-        DiscoveryGroupConfiguration dcConfig = new DiscoveryGroupConfiguration("dg1", null, groupAddress, port, 5000, 5000);
+      DiscoveryGroupConfiguration dcConfig = new DiscoveryGroupConfiguration("dg1",
+                                                                             null,
+                                                                             groupAddress,
+                                                                             port,
+                                                                             5000,
+                                                                             5000);
 
-        configuration.getDiscoveryGroupConfigurations().put(dcConfig.getName(), dcConfig);
+      configuration.getDiscoveryGroupConfigurations().put(dcConfig.getName(), dcConfig);
 
-        HornetQServer server;
-        if (fileStorage)
-        {
-           if (sharedStorage)
-           {
-              server = createInVMFailoverServer(true, configuration, nodeManagers[node], node);
-           }
-           else
-           {
-              server = HornetQServers.newHornetQServer(configuration);
-              server.setIdentity("Server " + node);
-           }
-        }
-        else
-        {
-           if (sharedStorage)
-           {
-              server = createInVMFailoverServer(false, configuration, nodeManagers[node], node);
-           }
-           else
-           {
-              server = HornetQServers.newHornetQServer(configuration, false);
-              server.setIdentity("Server " + node);
-           }
-        }
-        servers[node] = server;
-     }
+      HornetQServer server;
+      if (fileStorage)
+      {
+         if (sharedStorage)
+         {
+            server = createInVMFailoverServer(true, configuration, nodeManagers[node], node);
+         }
+         else
+         {
+            server = HornetQServers.newHornetQServer(configuration);
+            server.setIdentity("Server " + node);
+         }
+      }
+      else
+      {
+         if (sharedStorage)
+         {
+            server = createInVMFailoverServer(false, configuration, nodeManagers[node], node);
+         }
+         else
+         {
+            server = HornetQServers.newHornetQServer(configuration, false);
+            server.setIdentity("Server " + node);
+         }
+      }
+      servers[node] = server;
+   }
 
    protected void setupBackupServerWithDiscovery(final int node,
-                                             final int liveNode,
-                                             final String groupAddress,
-                                             final int port,
-                                             final boolean fileStorage,
-                                             final boolean netty,
-                                             final boolean sharedStorage)
-     {
-        if (servers[node] != null)
-        {
-           throw new IllegalArgumentException("Already a server at node " + node);
-        }
+                                                 final int liveNode,
+                                                 final String groupAddress,
+                                                 final int port,
+                                                 final boolean fileStorage,
+                                                 final boolean netty,
+                                                 final boolean sharedStorage)
+   {
+      if (servers[node] != null)
+      {
+         throw new IllegalArgumentException("Already a server at node " + node);
+      }
 
-        Configuration configuration = createBasicConfig();
+      Configuration configuration = createBasicConfig();
 
-        configuration.setSecurityEnabled(false);
-        configuration.setSharedStore(sharedStorage);
-        if (sharedStorage)
-        {
-           // Shared storage will share the node between the backup and live node
-           configuration.setBindingsDirectory(getBindingsDir(liveNode, false));
-           configuration.setJournalDirectory(getJournalDir(liveNode, false));
-           configuration.setPagingDirectory(getPageDir(liveNode, false));
-           configuration.setLargeMessagesDirectory(getLargeMessagesDir(liveNode, false));
-        }
-        else
-        {
-           configuration.setBindingsDirectory(getBindingsDir(node, true));
-           configuration.setJournalDirectory(getJournalDir(node, true));
-           configuration.setPagingDirectory(getPageDir(node, true));
-           configuration.setLargeMessagesDirectory(getLargeMessagesDir(node, true));
-        }
-        configuration.setClustered(true);
-        configuration.setBackup(true);
+      configuration.setSecurityEnabled(false);
+      configuration.setSharedStore(sharedStorage);
+      if (sharedStorage)
+      {
+         // Shared storage will share the node between the backup and live node
+         configuration.setBindingsDirectory(getBindingsDir(liveNode, false));
+         configuration.setJournalDirectory(getJournalDir(liveNode, false));
+         configuration.setPagingDirectory(getPageDir(liveNode, false));
+         configuration.setLargeMessagesDirectory(getLargeMessagesDir(liveNode, false));
+      }
+      else
+      {
+         configuration.setBindingsDirectory(getBindingsDir(node, true));
+         configuration.setJournalDirectory(getJournalDir(node, true));
+         configuration.setPagingDirectory(getPageDir(node, true));
+         configuration.setLargeMessagesDirectory(getLargeMessagesDir(node, true));
+      }
+      configuration.setClustered(true);
+      configuration.setBackup(true);
 
-        configuration.getAcceptorConfigurations().clear();
+      configuration.getAcceptorConfigurations().clear();
 
-        Map<String, Object> params = generateParams(node, netty);
+      Map<String, Object> params = generateParams(node, netty);
 
-        configuration.getAcceptorConfigurations().add(createTransportConfiguration(netty, true, params));
+      configuration.getAcceptorConfigurations().add(createTransportConfiguration(netty, true, params));
 
-        TransportConfiguration connector = createTransportConfiguration(netty, false, params);
-        configuration.getConnectorConfigurations().put(connector.getName(), connector);
+      TransportConfiguration connector = createTransportConfiguration(netty, false, params);
+      configuration.getConnectorConfigurations().put(connector.getName(), connector);
 
-        List<String> connectorPairs = new ArrayList<String>();
-        connectorPairs.add(connector.getName());
+      List<String> connectorPairs = new ArrayList<String>();
+      connectorPairs.add(connector.getName());
 
-        BroadcastGroupConfiguration bcConfig = new BroadcastGroupConfiguration("bg1",
-                                                                               null,
-                                                                               -1,
-                                                                               groupAddress,
-                                                                               port,
-                                                                               1000,
-                                                                               connectorPairs);
+      BroadcastGroupConfiguration bcConfig = new BroadcastGroupConfiguration("bg1",
+                                                                             null,
+                                                                             -1,
+                                                                             groupAddress,
+                                                                             port,
+                                                                             1000,
+                                                                             connectorPairs);
 
-        configuration.getBroadcastGroupConfigurations().add(bcConfig);
+      configuration.getBroadcastGroupConfigurations().add(bcConfig);
 
-        DiscoveryGroupConfiguration dcConfig = new DiscoveryGroupConfiguration("dg1", null, groupAddress, port, 5000, 5000);
+      DiscoveryGroupConfiguration dcConfig = new DiscoveryGroupConfiguration("dg1",
+                                                                             null,
+                                                                             groupAddress,
+                                                                             port,
+                                                                             5000,
+                                                                             5000);
 
-        configuration.getDiscoveryGroupConfigurations().put(dcConfig.getName(), dcConfig);
+      configuration.getDiscoveryGroupConfigurations().put(dcConfig.getName(), dcConfig);
 
-        HornetQServer server;
-        if (fileStorage)
-        {
-           if (sharedStorage)
-           {
-              server = createInVMFailoverServer(true, configuration, nodeManagers[liveNode], liveNode);
-           }
-           else
-           {
-              server = HornetQServers.newHornetQServer(configuration);
-              server.setIdentity("Server " + liveNode);
-           }
-        }
-        else
-        {
-           if (sharedStorage)
-           {
-              server = createInVMFailoverServer(false, configuration, nodeManagers[liveNode], liveNode);
-           }
-           else
-           {
-              server = HornetQServers.newHornetQServer(configuration, false);
-           }
-        }
-        servers[node] = server;
-     }
+      HornetQServer server;
+      if (fileStorage)
+      {
+         if (sharedStorage)
+         {
+            server = createInVMFailoverServer(true, configuration, nodeManagers[liveNode], liveNode);
+         }
+         else
+         {
+            server = HornetQServers.newHornetQServer(configuration);
+            server.setIdentity("Server " + liveNode);
+         }
+      }
+      else
+      {
+         if (sharedStorage)
+         {
+            server = createInVMFailoverServer(false, configuration, nodeManagers[liveNode], liveNode);
+         }
+         else
+         {
+            server = HornetQServers.newHornetQServer(configuration, false);
+         }
+      }
+      servers[node] = server;
+   }
 
-
    protected void clearServer(final int... nodes)
    {
       for (int i = 0; i < nodes.length; i++)
@@ -1781,12 +1801,12 @@
       {
          throw new IllegalStateException("No server at node " + nodeFrom);
       }
-      
+
       TransportConfiguration connectorFrom = createTransportConfiguration(netty, false, generateParams(nodeFrom, netty));
       serverFrom.getConfiguration().getConnectorConfigurations().put(name, connectorFrom);
 
       List<String> pairs = null;
-      
+
       if (nodeTo != -1)
       {
          TransportConfiguration serverTotc = createTransportConfiguration(netty, false, generateParams(nodeTo, netty));
@@ -1803,11 +1823,11 @@
                                                                                       forwardWhenNoConsumers,
                                                                                       maxHops,
                                                                                       1024,
-                                                                                      pairs, allowDirectConnectionsOnly);
+                                                                                      pairs,
+                                                                                      allowDirectConnectionsOnly);
       serverFrom.getConfiguration().getClusterConfigurations().add(clusterConf);
    }
 
-
    protected void setupClusterConnection(final String name,
                                          final String address,
                                          final boolean forwardWhenNoConsumers,
@@ -1825,7 +1845,7 @@
 
       TransportConfiguration connectorFrom = createTransportConfiguration(netty, false, generateParams(nodeFrom, netty));
       serverFrom.getConfiguration().getConnectorConfigurations().put(connectorFrom.getName(), connectorFrom);
-      
+
       List<String> pairs = new ArrayList<String>();
       for (int element : nodesTo)
       {
@@ -1863,7 +1883,7 @@
 
       TransportConfiguration connectorFrom = createTransportConfiguration(netty, false, generateParams(nodeFrom, netty));
       serverFrom.getConfiguration().getConnectorConfigurations().put(connectorFrom.getName(), connectorFrom);
-      
+
       List<String> pairs = new ArrayList<String>();
       for (int element : nodesTo)
       {
@@ -1872,21 +1892,21 @@
          pairs.add(serverTotc.getName());
       }
       ClusterConnectionConfiguration clusterConf = new ClusterConnectionConfiguration(name,
-           address,
-           connectorFrom.getName(),
-           ConfigurationImpl.DEFAULT_CLUSTER_FAILURE_CHECK_PERIOD,
-           ConfigurationImpl.DEFAULT_CLUSTER_CONNECTION_TTL,
-           retryInterval,
-           ConfigurationImpl.DEFAULT_CLUSTER_RETRY_INTERVAL_MULTIPLIER,
-           ConfigurationImpl.DEFAULT_CLUSTER_MAX_RETRY_INTERVAL,
-           reconnectAttempts,
-           1000,
-           true,
-           forwardWhenNoConsumers,
-           maxHops,
-           1024,
-           pairs,
-           false);
+                                                                                      address,
+                                                                                      connectorFrom.getName(),
+                                                                                      ConfigurationImpl.DEFAULT_CLUSTER_FAILURE_CHECK_PERIOD,
+                                                                                      ConfigurationImpl.DEFAULT_CLUSTER_CONNECTION_TTL,
+                                                                                      retryInterval,
+                                                                                      ConfigurationImpl.DEFAULT_CLUSTER_RETRY_INTERVAL_MULTIPLIER,
+                                                                                      ConfigurationImpl.DEFAULT_CLUSTER_MAX_RETRY_INTERVAL,
+                                                                                      reconnectAttempts,
+                                                                                      1000,
+                                                                                      true,
+                                                                                      forwardWhenNoConsumers,
+                                                                                      maxHops,
+                                                                                      1024,
+                                                                                      pairs,
+                                                                                      false);
 
       serverFrom.getConfiguration().getClusterConfigurations().add(clusterConf);
    }
@@ -1906,18 +1926,19 @@
                                                                 final int maxHops,
                                                                 TransportConfiguration connectorFrom,
                                                                 List<String> pairs)
-     {
-        ClusterConnectionConfiguration clusterConf = new ClusterConnectionConfiguration(name,
-                                                                                        address,
-                                                                                        connectorFrom.getName(),
-                                                                                        250,
-                                                                                        true,
-                                                                                        forwardWhenNoConsumers,
-                                                                                        maxHops,
-                                                                                        1024,
-                                                                                        pairs, false);
-        return clusterConf;
-     }
+   {
+      ClusterConnectionConfiguration clusterConf = new ClusterConnectionConfiguration(name,
+                                                                                      address,
+                                                                                      connectorFrom.getName(),
+                                                                                      250,
+                                                                                      true,
+                                                                                      forwardWhenNoConsumers,
+                                                                                      maxHops,
+                                                                                      1024,
+                                                                                      pairs,
+                                                                                      false);
+      return clusterConf;
+   }
 
    protected void setupClusterConnectionWithBackups(final String name,
                                                     final String address,
@@ -1953,7 +1974,8 @@
                                                                                       forwardWhenNoConsumers,
                                                                                       maxHops,
                                                                                       1024,
-                                                                                      pairs, false);
+                                                                                      pairs,
+                                                                                      false);
 
       serverFrom.getConfiguration().getClusterConfigurations().add(clusterConf);
    }
@@ -1975,7 +1997,7 @@
 
       TransportConfiguration connectorConfig = createTransportConfiguration(netty, false, generateParams(node, netty));
       server.getConfiguration().getConnectorConfigurations().put(name, connectorConfig);
-      
+
       ClusterConnectionConfiguration clusterConf = new ClusterConnectionConfiguration(name,
                                                                                       address,
                                                                                       name,
@@ -2011,22 +2033,20 @@
           * */
          Thread.sleep(500);
       }
-      
-      
+
    }
 
-   protected void waitForServer(HornetQServer server)
-         throws InterruptedException
+   protected void waitForServer(HornetQServer server) throws InterruptedException
    {
-      long timetowait =System.currentTimeMillis() + 5000;
-      while(!server.isStarted())
+      long timetowait = System.currentTimeMillis() + 5000;
+      while (!server.isStarted())
       {
          Thread.sleep(100);
-         if(server.isStarted())
+         if (server.isStarted())
          {
             break;
          }
-         else if(System.currentTimeMillis() > timetowait)
+         else if (System.currentTimeMillis() > timetowait)
          {
             fail("server didnt start");
          }
@@ -2049,7 +2069,7 @@
 
    protected void stopServers(final int... nodes) throws Exception
    {
-      log.info("Stopping nodes "  + Arrays.toString(nodes));
+      log.info("Stopping nodes " + Arrays.toString(nodes));
       for (int node : nodes)
       {
          log.info("#test stop server " + node);

Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/NettyFileStorageSymmetricClusterTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/NettyFileStorageSymmetricClusterTest.java	2011-08-12 15:10:56 UTC (rev 11198)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/NettyFileStorageSymmetricClusterTest.java	2011-08-13 05:48:57 UTC (rev 11199)
@@ -13,6 +13,8 @@
 
 package org.hornetq.tests.integration.cluster.distribution;
 
+import org.hornetq.core.logging.Logger;
+
 /**
  * A NettyFileStorageSymmetricClusterTest
  *
@@ -22,17 +24,17 @@
  */
 public class NettyFileStorageSymmetricClusterTest extends SymmetricClusterTest
 {
+   Logger log = Logger.getLogger(NettyFileStorageSymmetricClusterTest.class);
+
    @Override
    protected boolean isNetty()
    {
       return true;
    }
-   
-   
+
    protected boolean isFileStorage()
    {
       return true;
    }
 
-
 }

Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/SymmetricClusterTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/SymmetricClusterTest.java	2011-08-12 15:10:56 UTC (rev 11198)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/SymmetricClusterTest.java	2011-08-13 05:48:57 UTC (rev 11199)
@@ -242,12 +242,6 @@
       waitForBindings(3, "queues.testaddress", 1, 1, true);
       waitForBindings(4, "queues.testaddress", 1, 1, true);
 
-      System.out.println(clusterDescription(servers[0]));
-      System.out.println(clusterDescription(servers[1]));
-      System.out.println(clusterDescription(servers[2]));
-      System.out.println(clusterDescription(servers[3]));
-      System.out.println(clusterDescription(servers[4]));
-
       waitForBindings(0, "queues.testaddress", 4, 4, false);
       waitForBindings(1, "queues.testaddress", 4, 4, false);
       waitForBindings(2, "queues.testaddress", 4, 4, false);

Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/util/UnitTestCase.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/util/UnitTestCase.java	2011-08-12 15:10:56 UTC (rev 11198)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/util/UnitTestCase.java	2011-08-13 05:48:57 UTC (rev 11199)
@@ -980,7 +980,7 @@
          
          for (Thread aliveThread : postThreads.keySet())
          {
-            if (!aliveThread.getName().contains("SunPKCS11") && !previousThreads.containsKey(aliveThread))
+            if (!aliveThread.getName().contains("SunPKCS11") && !aliveThread.getName().contains("Attach Listener") && !previousThreads.containsKey(aliveThread))
             {
                failedThread = true;
                buffer.append("=============================================================================\n");



More information about the hornetq-commits mailing list