[hornetq-commits] JBoss hornetq SVN: r11244 - in branches/Branch_2_2_EAP_cluster_clean3: src/main/org/hornetq/core/server/cluster/impl and 3 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Tue Aug 30 13:13:33 EDT 2011


Author: clebert.suconic at jboss.com
Date: 2011-08-30 13:13:32 -0400 (Tue, 30 Aug 2011)
New Revision: 11244

Added:
   branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/client/impl/AfterConnectInternalListener.java
Modified:
   branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java
   branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java
   branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/client/impl/ServerLocatorInternal.java
   branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/client/impl/Topology.java
   branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionBridge.java
   branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
   branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java
   branches/Branch_2_2_EAP_cluster_clean3/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
   branches/Branch_2_2_EAP_cluster_clean3/tests/src/org/hornetq/tests/integration/cluster/failover/MultipleBackupsFailoverTestBase.java
   branches/Branch_2_2_EAP_cluster_clean3/tests/src/org/hornetq/tests/integration/cluster/failover/MultipleLivesMultipleBackupsFailoverTest.java
   branches/Branch_2_2_EAP_cluster_clean3/tests/src/org/hornetq/tests/integration/cluster/util/SameProcessHornetQServer.java
Log:
Adding AfterConnectioninternal to my branch, and adjusting versioning

Added: branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/client/impl/AfterConnectInternalListener.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/client/impl/AfterConnectInternalListener.java	                        (rev 0)
+++ branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/client/impl/AfterConnectInternalListener.java	2011-08-30 17:13:32 UTC (rev 11244)
@@ -0,0 +1,28 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.client.impl;
+
+/**
+ * To be called right after the ConnectionFactory created a connection.
+ * This listener is not part of the API and shouldn't be used by users.
+ * (if you do so we can't guarantee any API compatibility on this class) 
+ *
+ * @author clebertsuconic
+ *
+ *
+ */
+public interface AfterConnectInternalListener
+{
+   void onConnection(ClientSessionFactoryInternal sf);
+}

Modified: branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java	2011-08-30 11:05:47 UTC (rev 11243)
+++ branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java	2011-08-30 17:13:32 UTC (rev 11244)
@@ -1293,23 +1293,15 @@
             }
 
             channel0.send(new SubscribeClusterTopologyUpdatesMessageV2(serverLocator.isClusterConnection(), VersionLoader.getVersion().getIncrementingVersion()));
-            
-            
-            if (serverLocator.isClusterConnection())
-            {
-               TransportConfiguration config = serverLocator.getClusterTransportConfiguration();
-               if (ClientSessionFactoryImpl.isDebug)
-               {
-                  ClientSessionFactoryImpl.log.debug("Announcing node " + serverLocator.getNodeID() +
-                                                     ", isBackup=" +
-                                                     serverLocator.isBackup());
-               }
-               sendNodeAnnounce(System.currentTimeMillis(), serverLocator.getNodeID(), serverLocator.isBackup(), config, null);
-               //channel0.send(new NodeAnnounceMessage(serverLocator.getNodeID(), serverLocator.isBackup(), config));
-            }
+
          }
       }
 
+      if (serverLocator.getAfterConnectInternalListener() != null)
+      {
+         serverLocator.getAfterConnectInternalListener().onConnection(this);
+      }
+
       if (ClientSessionFactoryImpl.log.isTraceEnabled())
       {
          ClientSessionFactoryImpl.log.trace("returning " + connection);

Modified: branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java	2011-08-30 11:05:47 UTC (rev 11243)
+++ branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java	2011-08-30 17:13:32 UTC (rev 11244)
@@ -165,6 +165,8 @@
    private Executor startExecutor;
 
    private static ScheduledExecutorService globalScheduledThreadPool;
+   
+   private AfterConnectInternalListener afterConnectListener;
 
    private String groupID;
 
@@ -578,6 +580,19 @@
       return sf;
    }
 
+   /* (non-Javadoc)
+    * @see org.hornetq.core.client.impl.ServerLocatorInternal#setAfterConnectionInternalListener(org.hornetq.core.client.impl.AfterConnectInternalListener)
+    */
+   public void setAfterConnectionInternalListener(AfterConnectInternalListener listener)
+   {
+      this.afterConnectListener = listener;
+   }
+
+   public AfterConnectInternalListener getAfterConnectInternalListener()
+   {
+      return afterConnectListener;
+   }
+   
    public boolean isClosed()
    {
       return closed || closing;

Modified: branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/client/impl/ServerLocatorInternal.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/client/impl/ServerLocatorInternal.java	2011-08-30 11:05:47 UTC (rev 11243)
+++ branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/client/impl/ServerLocatorInternal.java	2011-08-30 17:13:32 UTC (rev 11244)
@@ -35,6 +35,10 @@
    
    void factoryClosed(final ClientSessionFactory factory);
    
+   AfterConnectInternalListener getAfterConnectInternalListener();
+   
+   void setAfterConnectionInternalListener(AfterConnectInternalListener listener);
+   
    /** Used to better identify Cluster Connection Locators on logs while debugging logs */
    void setIdentity(String identity);
 

Modified: branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/client/impl/Topology.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/client/impl/Topology.java	2011-08-30 11:05:47 UTC (rev 11243)
+++ branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/client/impl/Topology.java	2011-08-30 17:13:32 UTC (rev 11244)
@@ -64,7 +64,7 @@
     * values are a pair of live/backup transport configurations
     */
    private final Map<String, TopologyMember> mapTopology = new ConcurrentHashMap<String, TopologyMember>();
-   
+
    private final Map<String, Long> mapDelete = new ConcurrentHashMap<String, Long>();
 
    public Topology(final Object owner)
@@ -132,14 +132,14 @@
 
       synchronized (this)
       {
-         // TODO treat versioning here. it should remove any previous version
-         // However, if the previous version has a higher time (say if the node time where the system died), we should
-         // use that number ++
-
          TopologyMember currentMember = getMember(nodeId);
          if (currentMember == null)
          {
-            log.warn("There's no live to be updated on backup update", new Exception("trace"));
+            log.warn("There's no live to be updated on backup update, node=" + nodeId + " memberInput=" + memberInput,
+                     new Exception("trace"));
+
+            currentMember = memberInput;
+            mapTopology.put(nodeId, currentMember);
          }
 
          TopologyMember newMember = new TopologyMember(currentMember.getConnector().a, memberInput.getConnector().b);
@@ -162,16 +162,24 @@
     */
    public boolean updateMember(final long uniqueEventID, final String nodeId, final TopologyMember memberInput)
    {
+      
+      if (memberInput.getConnector().a == null && memberInput.getConnector().b != null)
+      {
+         updateBackup(nodeId, memberInput);
+         return true;
+      }
+
       Long deleteTme = mapDelete.get(nodeId);
       if (deleteTme != null && uniqueEventID < deleteTme)
       {
+         log.debug("Update uniqueEvent=" + uniqueEventID +
+                   ", nodeId=" +
+                   nodeId +
+                   ", memberInput=" +
+                   memberInput +
+                   " being rejected as there was a delete done after that");
          return false;
       }
-      
-      if (log.isTraceEnabled())
-      {
-      //   log.trace(this + "::UpdateMember::" + uniqueEventID + ", nodeID=" + nodeId + ", memberInput=" + memberInput);
-      }
 
       synchronized (this)
       {
@@ -201,7 +209,8 @@
             {
                if (log.isDebugEnabled())
                {
-                  log.debug(this + "::updated currentMember=nodeID=" + nodeId  +
+                  log.debug(this + "::updated currentMember=nodeID=" +
+                            nodeId +
                             currentMember +
                             " of memberInput=" +
                             memberInput);

Modified: branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionBridge.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionBridge.java	2011-08-30 11:05:47 UTC (rev 11243)
+++ branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionBridge.java	2011-08-30 17:13:32 UTC (rev 11244)
@@ -159,7 +159,6 @@
       try
       {
          TopologyMember member = clusterManager.getLocalMember();
-         factory.sendNodeAnnounce(member.getUniqueEventID(), clusterManager.getNodeId(), false, member.getConnector().a, member.getConnector().b);
       }
       catch (Exception e)
       {

Modified: branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java	2011-08-30 11:05:47 UTC (rev 11243)
+++ branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java	2011-08-30 17:13:32 UTC (rev 11244)
@@ -33,9 +33,9 @@
 import org.hornetq.api.core.SimpleString;
 import org.hornetq.api.core.TransportConfiguration;
 import org.hornetq.api.core.client.ClientMessage;
-import org.hornetq.api.core.client.ClusterTopologyListener;
 import org.hornetq.api.core.management.ManagementHelper;
 import org.hornetq.api.core.management.NotificationType;
+import org.hornetq.core.client.impl.AfterConnectInternalListener;
 import org.hornetq.core.client.impl.ClientSessionFactoryInternal;
 import org.hornetq.core.client.impl.ServerLocatorImpl;
 import org.hornetq.core.client.impl.ServerLocatorInternal;
@@ -70,7 +70,7 @@
  *
  *
  */
-public class ClusterConnectionImpl implements ClusterConnection
+public class ClusterConnectionImpl implements ClusterConnection, AfterConnectInternalListener
 {
    private static final Logger log = Logger.getLogger(ClusterConnectionImpl.class);
 
@@ -412,6 +412,25 @@
       started = false;
    }
 
+   /* (non-Javadoc)
+    * @see org.hornetq.core.client.impl.AfterConnectInternalListener#onConnection(org.hornetq.core.client.impl.ClientSessionFactoryInternal)
+    */
+   public void onConnection(ClientSessionFactoryInternal sf)
+   {
+      TopologyMember localMember = manager.getLocalMember();
+      sf.sendNodeAnnounce(localMember.getUniqueEventID(),
+                          manager.getNodeId(),
+                          false,
+                          localMember.getConnector().a,
+                          localMember.getConnector().b);
+
+      // sf.sendNodeAnnounce(System.currentTimeMillis(),
+      // manager.getNodeId(),
+      // false,
+      // localMember.getConnector().a,
+      // localMember.getConnector().b);
+   }
+
    public boolean isStarted()
    {
       return started;
@@ -471,13 +490,13 @@
          {
             log.debug("DuplicateDetection is disabled, sending clustered messages blocked");
          }
-         
+
          final TopologyMember currentMember = clusterManagerTopology.getMember(nodeUUID.toString());
-         
+
          if (currentMember == null)
          {
             // sanity check only
-            throw new IllegalStateException ("InternalError! The ClusterConnection doesn't know about its own node = " + this);
+            throw new IllegalStateException("InternalError! The ClusterConnection doesn't know about its own node = " + this);
          }
 
          serverLocator.setNodeID(nodeUUID.toString());
@@ -502,27 +521,9 @@
 
          serverLocator.addClusterTopologyListener(this);
 
+         serverLocator.setAfterConnectionInternalListener(this);
+
          serverLocator.start(server.getExecutorFactory().getExecutor());
-         
-        /* serverLocator.getExecutor().execute(new Runnable(){
-            public void run()
-            {
-               try
-               {
-                  ClientSessionFactoryInternal csf = serverLocator.connect();
-                  
-                  log.info(this + "::YYY " + nodeUUID.toString() + " Cluster connection " + ClusterConnectionImpl.this + 
-                           " connected, sending announce node, connector=" + 
-                           manager.getLocalMember().getConnector().a + "/" + manager.getLocalMember().getConnector().b);
-                  
-                  csf.sendNodeAnnounce(currentMember.getUniqueEventID(), nodeUUID.toString(), false, manager.getLocalMember().getConnector().a, manager.getLocalMember().getConnector().b);
-               }
-               catch (Exception e)
-               {
-                  log.warn("Error on connectin Cluster connection to other nodes", e);
-               }
-            }
-         });*/
       }
 
       if (managementService != null)
@@ -576,7 +577,7 @@
       }
    }
 
-   public void nodeUP(final long eventUID, 
+   public void nodeUP(final long eventUID,
                       final String nodeID,
                       final Pair<TransportConfiguration, TransportConfiguration> connectorPair,
                       final boolean last)
@@ -590,14 +591,18 @@
 
       if (nodeID.equals(nodeUUID.toString()))
       {
-        	if (log.isTraceEnabled())
-        	{
-        	   log.trace(this + "::informing about backup to itself, nodeUUID=" + nodeUUID + ", connectorPair=" + connectorPair + " this = " + this);
-        	}
+         if (log.isTraceEnabled())
+         {
+            log.trace(this + "::informing about backup to itself, nodeUUID=" +
+                      nodeUUID +
+                      ", connectorPair=" +
+                      connectorPair +
+                      " this = " +
+                      this);
+         }
          return;
       }
 
-
       // if the node is more than 1 hop away, we do not create a bridge for direct cluster connection
       if (allowDirectConnectionsOnly && !allowableConnections.contains(connectorPair.a))
       {
@@ -703,6 +708,8 @@
       targetLocator.setMaxRetryInterval(maxRetryInterval);
       targetLocator.setRetryIntervalMultiplier(retryIntervalMultiplier);
 
+      targetLocator.setAfterConnectionInternalListener(this);
+
       targetLocator.setNodeID(serverLocator.getNodeID());
 
       targetLocator.setClusterTransportConfiguration(serverLocator.getClusterTransportConfiguration());
@@ -713,9 +720,14 @@
       }
 
       targetLocator.disableFinalizeCheck();
-      
-      MessageFlowRecordImpl record = new MessageFlowRecordImpl(targetLocator, eventUID, targetNodeID, connector, queueName, queue);
 
+      MessageFlowRecordImpl record = new MessageFlowRecordImpl(targetLocator,
+                                                               eventUID,
+                                                               targetNodeID,
+                                                               connector,
+                                                               queueName,
+                                                               queue);
+
       ClusterConnectionBridge bridge = new ClusterConnectionBridge(this,
                                                                    manager,
                                                                    targetLocator,
@@ -768,7 +780,7 @@
       private BridgeImpl bridge;
 
       private final long eventUID;
-      
+
       private final String targetNodeID;
 
       private final TransportConfiguration connector;
@@ -831,7 +843,7 @@
       {
          return address.toString();
       }
-      
+
       /**
        * @return the eventUID
        */
@@ -1129,7 +1141,8 @@
             // hops is too high
             // or there are multiple cluster connections for the same address
 
-            ClusterConnectionImpl.log.warn(this + "::Remote queue binding " + clusterName +
+            ClusterConnectionImpl.log.warn(this + "::Remote queue binding " +
+                                           clusterName +
                                            " has already been bound in the post office. Most likely cause for this is you have a loop " +
                                            "in your cluster due to cluster max-hops being too large or you have multiple cluster connections to the same nodes using overlapping addresses");
 

Modified: branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java	2011-08-30 11:05:47 UTC (rev 11243)
+++ branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java	2011-08-30 17:13:32 UTC (rev 11244)
@@ -100,8 +100,6 @@
    private final Map<String, ClusterConnection> clusterConnections = new HashMap<String, ClusterConnection>();
 
    private final Topology topology = new Topology(this);
-   
-   private TopologyMember localMember;
 
    private volatile ServerLocatorInternal backupServerLocator;
 
@@ -173,7 +171,7 @@
    
    public TopologyMember getLocalMember()
    {
-      return localMember;
+      return topology.getMember(nodeUUID.toString());
    }
    
    public String getNodeId()
@@ -301,8 +299,9 @@
    {
       if (log.isDebugEnabled())
       {
-         log.debug(this + "::NodeAnnounced, backup=" + backup + nodeID + connectorPair);
+         log.info(this + "::NodeAnnounced, backup=" + backup + nodeID + connectorPair);
       }
+      System.out.println(this + "::NodeAnnounced, backup=" + backup + nodeID + connectorPair);
 
       TopologyMember newMember = new TopologyMember(connectorPair.a, connectorPair.b);
       newMember.setUniqueEventID(uniqueEventID);
@@ -475,6 +474,7 @@
    {
       String nodeID = server.getNodeID().toString();
       
+      TopologyMember localMember;
       if (backup)
       {
          localMember = new TopologyMember(null, nodeConnector);

Modified: branches/Branch_2_2_EAP_cluster_clean3/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean3/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java	2011-08-30 11:05:47 UTC (rev 11243)
+++ branches/Branch_2_2_EAP_cluster_clean3/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java	2011-08-30 17:13:32 UTC (rev 11244)
@@ -2029,6 +2029,7 @@
          servers[node].setIdentity("server " + node);
          log.info("starting server " + servers[node]);
          servers[node].start();
+         Thread.sleep(100);
          
 //         for (int i = 0 ; i <= node; i++)
 //         {

Modified: branches/Branch_2_2_EAP_cluster_clean3/tests/src/org/hornetq/tests/integration/cluster/failover/MultipleBackupsFailoverTestBase.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean3/tests/src/org/hornetq/tests/integration/cluster/failover/MultipleBackupsFailoverTestBase.java	2011-08-30 11:05:47 UTC (rev 11243)
+++ branches/Branch_2_2_EAP_cluster_clean3/tests/src/org/hornetq/tests/integration/cluster/failover/MultipleBackupsFailoverTestBase.java	2011-08-30 17:13:32 UTC (rev 11244)
@@ -20,7 +20,6 @@
 import java.util.concurrent.TimeUnit;
 
 import junit.framework.Assert;
-
 import org.hornetq.api.core.Pair;
 import org.hornetq.api.core.SimpleString;
 import org.hornetq.api.core.TransportConfiguration;
@@ -34,6 +33,8 @@
 import org.hornetq.core.client.impl.ClientSessionFactoryInternal;
 import org.hornetq.core.client.impl.ServerLocatorImpl;
 import org.hornetq.core.client.impl.ServerLocatorInternal;
+import org.hornetq.core.logging.Logger;
+import org.hornetq.core.server.HornetQServer;
 import org.hornetq.jms.client.HornetQTextMessage;
 import org.hornetq.tests.integration.cluster.util.TestableServer;
 import org.hornetq.tests.util.ServiceTestBase;
@@ -47,6 +48,7 @@
  */
 public abstract class MultipleBackupsFailoverTestBase extends ServiceTestBase
 {
+   Logger log = Logger.getLogger(this.getClass());
    // Constants -----------------------------------------------------
 
    // Attributes ----------------------------------------------------
@@ -102,7 +104,7 @@
                }
             }
          }
-         
+
          try
          {
             Thread.sleep(100);
@@ -170,6 +172,13 @@
    protected ClientSessionFactoryInternal createSessionFactoryAndWaitForTopology(ServerLocator locator,
                                                                                  int topologyMembers) throws Exception
    {
+      return createSessionFactoryAndWaitForTopology(locator, topologyMembers, null);
+   }
+
+   protected ClientSessionFactoryInternal createSessionFactoryAndWaitForTopology(ServerLocator locator,
+                                                                                 int topologyMembers,
+                                                                                 HornetQServer server) throws Exception
+   {
       ClientSessionFactoryInternal sf;
       CountDownLatch countDownLatch = new CountDownLatch(topologyMembers);
 
@@ -179,12 +188,16 @@
       sf = (ClientSessionFactoryInternal)locator.createSessionFactory();
 
       boolean ok = countDownLatch.await(5, TimeUnit.SECONDS);
+      locator.removeClusterTopologyListener(topListener);
       if (!ok)
       {
-         System.out.println(((ServerLocatorInternal)locator).getTopology().describe());
+         log.info("failed topology, Topology on client = " + (((ServerLocatorInternal)locator).getTopology().describe()));
+         if (server != null)
+         {
+            log.info("failed topology, Topology on server = " + server.getClusterManager().getTopology().describe());
+         }
       }
-      locator.removeClusterTopologyListener(topListener);
-      assertTrue(ok);
+      assertTrue("expected " + topologyMembers + " members", ok);
       return sf;
    }
 
@@ -219,7 +232,10 @@
          this.latch = latch;
       }
 
-      public void nodeUP(final long uniqueEventID, String nodeID, Pair<TransportConfiguration, TransportConfiguration> connectorPair, boolean last)
+      public void nodeUP(final long uniqueEventID,
+                         String nodeID,
+                         Pair<TransportConfiguration, TransportConfiguration> connectorPair,
+                         boolean last)
       {
          if (connectorPair.a != null && !liveNode.contains(connectorPair.a.getName()))
          {

Modified: branches/Branch_2_2_EAP_cluster_clean3/tests/src/org/hornetq/tests/integration/cluster/failover/MultipleLivesMultipleBackupsFailoverTest.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean3/tests/src/org/hornetq/tests/integration/cluster/failover/MultipleLivesMultipleBackupsFailoverTest.java	2011-08-30 11:05:47 UTC (rev 11243)
+++ branches/Branch_2_2_EAP_cluster_clean3/tests/src/org/hornetq/tests/integration/cluster/failover/MultipleLivesMultipleBackupsFailoverTest.java	2011-08-30 17:13:32 UTC (rev 11244)
@@ -41,7 +41,7 @@
    {
       for (TestableServer testableServer : servers.values())
       {
-         if(testableServer != null)
+         if (testableServer != null)
          {
             try
             {
@@ -55,44 +55,51 @@
       }
       super.tearDown();
    }
-   
+
    public void testMultipleFailovers2LiveServers() throws Exception
    {
-      // TODO: remove these sleeps
       NodeManager nodeManager1 = new InVMNodeManager();
       NodeManager nodeManager2 = new InVMNodeManager();
       createLiveConfig(nodeManager1, 0, 3, 4, 5);
-      createBackupConfig(nodeManager1, 0, 1, true, new int[] {0, 2}, 3, 4, 5);
-      createBackupConfig(nodeManager1, 0, 2, true, new int[] {0, 1}, 3, 4, 5);
+      createBackupConfig(nodeManager1, 0, 1, true, new int[] { 0, 2 }, 3, 4, 5);
+      createBackupConfig(nodeManager1, 0, 2, true, new int[] { 0, 1 }, 3, 4, 5);
       createLiveConfig(nodeManager2, 3, 0);
-      createBackupConfig(nodeManager2, 3, 4, true, new int[] {3, 5}, 0, 1, 2);
-      createBackupConfig(nodeManager2, 3, 5, true, new int[] {3, 4}, 0, 1, 2);
-      
-      Thread.sleep(500);
+      createBackupConfig(nodeManager2, 3, 4, true, new int[] { 3, 5 }, 0, 1, 2);
+      createBackupConfig(nodeManager2, 3, 5, true, new int[] { 3, 4 }, 0, 1, 2);
+
       servers.get(0).start();
-      Thread.sleep(500);
+      waitForServer(servers.get(0).getServer());
+
       servers.get(3).start();
-      Thread.sleep(500);
+      waitForServer(servers.get(3).getServer());
+      
       servers.get(1).start();
-      Thread.sleep(500);
+      waitForServer(servers.get(1).getServer());
+
       servers.get(2).start();
-      Thread.sleep(500);
+      
       servers.get(4).start();
-      Thread.sleep(500);
+      waitForServer(servers.get(4).getServer());
+      
       servers.get(5).start();
+
+      waitForServer(servers.get(4).getServer());
+
       ServerLocator locator = getServerLocator(0);
 
       locator.setBlockOnNonDurableSend(true);
       locator.setBlockOnDurableSend(true);
       locator.setBlockOnAcknowledge(true);
       locator.setReconnectAttempts(-1);
-      ClientSessionFactoryInternal sf = createSessionFactoryAndWaitForTopology(locator, 4);
+      ClientSessionFactoryInternal sf = createSessionFactoryAndWaitForTopology(locator, 4, servers.get(0).getServer());
       ClientSession session = sendAndConsume(sf, true);
 
       System.out.println(((ServerLocatorInternal)locator).getTopology().describe());
       Thread.sleep(500);
       servers.get(0).crash(session);
 
+      System.out.println("server3 " + servers.get(3).getServer().getClusterManager().getTopology().describe());
+
       int liveAfter0 = waitForNewLive(10000, true, servers, 1, 2);
 
       ServerLocator locator2 = getServerLocator(3);
@@ -139,11 +146,18 @@
       }
    }
 
-   protected void createBackupConfig(NodeManager nodeManager, int liveNode, int nodeid, boolean createClusterConnections, int[] otherBackupNodes, int... otherClusterNodes)
+   protected void createBackupConfig(NodeManager nodeManager,
+                                     int liveNode,
+                                     int nodeid,
+                                     boolean createClusterConnections,
+                                     int[] otherBackupNodes,
+                                     int... otherClusterNodes)
    {
       Configuration config1 = super.createDefaultConfig();
       config1.getAcceptorConfigurations().clear();
-      config1.getAcceptorConfigurations().add(createTransportConfiguration(isNetty(), true, generateParams(nodeid, isNetty())));
+      config1.getAcceptorConfigurations().add(createTransportConfiguration(isNetty(),
+                                                                           true,
+                                                                           generateParams(nodeid, isNetty())));
       config1.setSecurityEnabled(false);
       config1.setSharedStore(true);
       config1.setBackup(true);
@@ -152,21 +166,36 @@
       List<String> staticConnectors = new ArrayList<String>();
       for (int node : otherBackupNodes)
       {
-         TransportConfiguration liveConnector = createTransportConfiguration(isNetty(), false, generateParams(node, isNetty()));
+         TransportConfiguration liveConnector = createTransportConfiguration(isNetty(),
+                                                                             false,
+                                                                             generateParams(node, isNetty()));
          config1.getConnectorConfigurations().put(liveConnector.getName(), liveConnector);
          staticConnectors.add(liveConnector.getName());
       }
-      TransportConfiguration backupConnector = createTransportConfiguration(isNetty(), false, generateParams(nodeid, isNetty()));
+      TransportConfiguration backupConnector = createTransportConfiguration(isNetty(),
+                                                                            false,
+                                                                            generateParams(nodeid, isNetty()));
       config1.getConnectorConfigurations().put(backupConnector.getName(), backupConnector);
 
       List<String> clusterNodes = new ArrayList<String>();
       for (int node : otherClusterNodes)
       {
-         TransportConfiguration connector = createTransportConfiguration(isNetty(), false, generateParams(node, isNetty()));
+         TransportConfiguration connector = createTransportConfiguration(isNetty(),
+                                                                         false,
+                                                                         generateParams(node, isNetty()));
          config1.getConnectorConfigurations().put(connector.getName(), connector);
          clusterNodes.add(connector.getName());
       }
-      ClusterConnectionConfiguration ccc1 = new ClusterConnectionConfiguration("cluster1", "jms", backupConnector.getName(), -1, false, false, 1, 1, clusterNodes, false);
+      ClusterConnectionConfiguration ccc1 = new ClusterConnectionConfiguration("cluster1",
+                                                                               "jms",
+                                                                               backupConnector.getName(),
+                                                                               -1,
+                                                                               false,
+                                                                               false,
+                                                                               1,
+                                                                               1,
+                                                                               clusterNodes,
+                                                                               false);
       config1.getClusterConfigurations().add(ccc1);
 
       config1.setBindingsDirectory(config1.getBindingsDirectory() + "_" + liveNode);
@@ -177,25 +206,39 @@
       servers.put(nodeid, new SameProcessHornetQServer(createInVMFailoverServer(true, config1, nodeManager, liveNode)));
    }
 
-   protected void createLiveConfig(NodeManager nodeManager, int liveNode, int ... otherLiveNodes)
+   protected void createLiveConfig(NodeManager nodeManager, int liveNode, int... otherLiveNodes)
    {
-      TransportConfiguration liveConnector = createTransportConfiguration(isNetty(), false, generateParams(liveNode, isNetty()));
+      TransportConfiguration liveConnector = createTransportConfiguration(isNetty(),
+                                                                          false,
+                                                                          generateParams(liveNode, isNetty()));
       Configuration config0 = super.createDefaultConfig();
       config0.getAcceptorConfigurations().clear();
-      config0.getAcceptorConfigurations().add(createTransportConfiguration(isNetty(), true, generateParams(liveNode, isNetty())));
+      config0.getAcceptorConfigurations().add(createTransportConfiguration(isNetty(),
+                                                                           true,
+                                                                           generateParams(liveNode, isNetty())));
       config0.setSecurityEnabled(false);
       config0.setSharedStore(true);
       config0.setClustered(true);
       List<String> pairs = new ArrayList<String>();
       for (int node : otherLiveNodes)
       {
-         TransportConfiguration otherLiveConnector = createTransportConfiguration(isNetty(), false, generateParams(node, isNetty()));
+         TransportConfiguration otherLiveConnector = createTransportConfiguration(isNetty(),
+                                                                                  false,
+                                                                                  generateParams(node, isNetty()));
          config0.getConnectorConfigurations().put(otherLiveConnector.getName(), otherLiveConnector);
-         pairs.add(otherLiveConnector.getName());  
+         pairs.add(otherLiveConnector.getName());
 
       }
-      ClusterConnectionConfiguration ccc0 = new ClusterConnectionConfiguration("cluster1", "jms", liveConnector.getName(), -1, false, false, 1, 1,
-            pairs, false);
+      ClusterConnectionConfiguration ccc0 = new ClusterConnectionConfiguration("cluster1",
+                                                                               "jms",
+                                                                               liveConnector.getName(),
+                                                                               -1,
+                                                                               false,
+                                                                               false,
+                                                                               1,
+                                                                               1,
+                                                                               pairs,
+                                                                               false);
       config0.getClusterConfigurations().add(ccc0);
       config0.getConnectorConfigurations().put(liveConnector.getName(), liveConnector);
 
@@ -204,7 +247,8 @@
       config0.setPagingDirectory(config0.getPagingDirectory() + "_" + liveNode);
       config0.setLargeMessagesDirectory(config0.getLargeMessagesDirectory() + "_" + liveNode);
 
-      servers.put(liveNode, new SameProcessHornetQServer(createInVMFailoverServer(true, config0, nodeManager, liveNode)));
+      servers.put(liveNode,
+                  new SameProcessHornetQServer(createInVMFailoverServer(true, config0, nodeManager, liveNode)));
    }
 
    protected boolean isNetty()

Modified: branches/Branch_2_2_EAP_cluster_clean3/tests/src/org/hornetq/tests/integration/cluster/util/SameProcessHornetQServer.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean3/tests/src/org/hornetq/tests/integration/cluster/util/SameProcessHornetQServer.java	2011-08-30 11:05:47 UTC (rev 11243)
+++ branches/Branch_2_2_EAP_cluster_clean3/tests/src/org/hornetq/tests/integration/cluster/util/SameProcessHornetQServer.java	2011-08-30 17:13:32 UTC (rev 11244)
@@ -25,6 +25,7 @@
 import org.hornetq.core.logging.Logger;
 import org.hornetq.core.server.HornetQServer;
 import org.hornetq.core.server.cluster.impl.ClusterManagerImpl;
+import org.hornetq.tests.util.ServiceTestBase;
 
 /**
  * A SameProcessHornetQServer



More information about the hornetq-commits mailing list