[hornetq-commits] JBoss hornetq SVN: r9953 - in branches/2_2_0_HA_Improvements_preMerge: src/main/org/hornetq/core/protocol/core/impl/wireformat and 6 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Tue Nov 30 11:27:03 EST 2010


Author: ataylor
Date: 2010-11-30 11:27:01 -0500 (Tue, 30 Nov 2010)
New Revision: 9953

Modified:
   branches/2_2_0_HA_Improvements_preMerge/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java
   branches/2_2_0_HA_Improvements_preMerge/src/main/org/hornetq/core/protocol/core/impl/wireformat/NodeAnnounceMessage.java
   branches/2_2_0_HA_Improvements_preMerge/src/main/org/hornetq/core/server/cluster/ClusterConnection.java
   branches/2_2_0_HA_Improvements_preMerge/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
   branches/2_2_0_HA_Improvements_preMerge/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java
   branches/2_2_0_HA_Improvements_preMerge/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
   branches/2_2_0_HA_Improvements_preMerge/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
   branches/2_2_0_HA_Improvements_preMerge/tests/src/org/hornetq/tests/integration/cluster/failover/GroupingFailoverReplicationTest.java
   branches/2_2_0_HA_Improvements_preMerge/tests/src/org/hornetq/tests/integration/cluster/failover/GroupingFailoverSharedServerTest.java
   branches/2_2_0_HA_Improvements_preMerge/tests/src/org/hornetq/tests/integration/cluster/failover/GroupingFailoverTestBase.java
   branches/2_2_0_HA_Improvements_preMerge/tests/src/org/hornetq/tests/integration/cluster/failover/MultipleLivesMultipleBackupsFailoverTest.java
   branches/2_2_0_HA_Improvements_preMerge/tests/src/org/hornetq/tests/integration/cluster/failover/SingleLiveMultipleBackupsFailoverTest.java
   branches/2_2_0_HA_Improvements_preMerge/tests/src/org/hornetq/tests/util/ServiceTestBase.java
Log:
activate cluster connection properly and test fixes

Modified: branches/2_2_0_HA_Improvements_preMerge/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java
===================================================================
--- branches/2_2_0_HA_Improvements_preMerge/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java	2010-11-30 15:39:22 UTC (rev 9952)
+++ branches/2_2_0_HA_Improvements_preMerge/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java	2010-11-30 16:27:01 UTC (rev 9953)
@@ -244,7 +244,7 @@
 
    public void setBackupConnector(TransportConfiguration live, TransportConfiguration backUp)
    {
-      if(live.equals(connectorConfig))
+      if(live.equals(connectorConfig) && backUp != null)
       {
          backupConfig = backUp;
       }
@@ -1209,10 +1209,7 @@
          if (type == PacketImpl.DISCONNECT)
          {
             final DisconnectMessage msg = (DisconnectMessage)packet;
-            if (msg.getNodeID() != null)
-            {
-               System.out.println("received disconnect from node " + msg.getNodeID());
-            }
+            
             closeExecutor.execute(new Runnable()
             {
                // Must be executed on new thread since cannot block the netty thread for a long time and fail can

Modified: branches/2_2_0_HA_Improvements_preMerge/src/main/org/hornetq/core/protocol/core/impl/wireformat/NodeAnnounceMessage.java
===================================================================
--- branches/2_2_0_HA_Improvements_preMerge/src/main/org/hornetq/core/protocol/core/impl/wireformat/NodeAnnounceMessage.java	2010-11-30 15:39:22 UTC (rev 9952)
+++ branches/2_2_0_HA_Improvements_preMerge/src/main/org/hornetq/core/protocol/core/impl/wireformat/NodeAnnounceMessage.java	2010-11-30 16:27:01 UTC (rev 9953)
@@ -49,16 +49,8 @@
       this.backup = backup;
       
       this.connector = tc;
-      if(System.getProperty("foo") != null)
-      {
-         if(tc.toString().contains("org-hornetq-core-remoting-impl-invm-InVMConnectorFactory?server-id=1"))
-         {
-            System.out.println("");
-         }
-      }
-
    }
-
+   
    public NodeAnnounceMessage()
    {
       super(PacketImpl.NODE_ANNOUNCE);

Modified: branches/2_2_0_HA_Improvements_preMerge/src/main/org/hornetq/core/server/cluster/ClusterConnection.java
===================================================================
--- branches/2_2_0_HA_Improvements_preMerge/src/main/org/hornetq/core/server/cluster/ClusterConnection.java	2010-11-30 15:39:22 UTC (rev 9952)
+++ branches/2_2_0_HA_Improvements_preMerge/src/main/org/hornetq/core/server/cluster/ClusterConnection.java	2010-11-30 16:27:01 UTC (rev 9953)
@@ -48,7 +48,7 @@
                                    SimpleString queueName,
                                    int distance) throws Exception;
 
-   void activate();
+   void activate() throws Exception;
    
    TransportConfiguration getConnector();
 

Modified: branches/2_2_0_HA_Improvements_preMerge/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
===================================================================
--- branches/2_2_0_HA_Improvements_preMerge/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java	2010-11-30 15:39:22 UTC (rev 9952)
+++ branches/2_2_0_HA_Improvements_preMerge/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java	2010-11-30 16:27:01 UTC (rev 9953)
@@ -195,41 +195,14 @@
          return;
       }
 
-      if (serverLocator != null)
+      started = true;
+      
+      if(!backup)
       {
-         serverLocator.addClusterTopologyListener(this);
-         serverLocator.start();
-         // FIXME Ugly ugly code to connect to other nodes and form the cluster... :(
-         server.getExecutorFactory().getExecutor().execute(new Runnable()
-         {
-            public void run()
-            {
-               try
-               {
-                  serverLocator.connect();
-               }
-               catch (Exception e)
-               {
-                  if(started)
-                  {
-                     log.warn("did not connect the cluster connection to other nodes", e);
-                  }
-               }
-            }
-         });
+         activate();
       }
-      
-      started = true;
 
-      if (managementService != null)
-      {
-         TypedProperties props = new TypedProperties();
-         props.putSimpleStringProperty(new SimpleString("name"), name);
-         Notification notification = new Notification(nodeUUID.toString(),
-                                                      NotificationType.CLUSTER_CONNECTION_STARTED,
-                                                      props);
-         managementService.sendNotification(notification);
-      }
+
    }
 
    public void stop() throws Exception
@@ -299,7 +272,7 @@
       return nodes;
    }
 
-   public synchronized void activate()
+   public synchronized void activate() throws Exception
    {
       if (!started)
       {
@@ -307,6 +280,41 @@
       }
 
       backup = false;
+
+
+      if (serverLocator != null)
+      {
+         serverLocator.addClusterTopologyListener(this);
+         serverLocator.start();
+         // FIXME Ugly ugly code to connect to other nodes and form the cluster... :(
+         server.getExecutorFactory().getExecutor().execute(new Runnable()
+         {
+            public void run()
+            {
+               try
+               {
+                  serverLocator.connect();
+               }
+               catch (Exception e)
+               {
+                  if(started)
+                  {
+                     log.warn("did not connect the cluster connection to other nodes", e);
+                  }
+               }
+            }
+         });
+      }
+
+      if (managementService != null)
+      {
+         TypedProperties props = new TypedProperties();
+         props.putSimpleStringProperty(new SimpleString("name"), name);
+         Notification notification = new Notification(nodeUUID.toString(),
+                                                      NotificationType.CLUSTER_CONNECTION_STARTED,
+                                                      props);
+         managementService.sendNotification(notification);
+      }
    }
    
    public TransportConfiguration getConnector()

Modified: branches/2_2_0_HA_Improvements_preMerge/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java
===================================================================
--- branches/2_2_0_HA_Improvements_preMerge/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java	2010-11-30 15:39:22 UTC (rev 9952)
+++ branches/2_2_0_HA_Improvements_preMerge/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java	2010-11-30 16:27:01 UTC (rev 9953)
@@ -100,7 +100,7 @@
 
    private Topology topology = new Topology();
 
-   private ServerLocatorInternal backupServerLocator;
+   private volatile ServerLocatorInternal backupServerLocator;
 
    private final List<ServerLocatorInternal> clusterLocators = new ArrayList<ServerLocatorInternal>();
 
@@ -384,7 +384,7 @@
          {
             try
             {
-               clusterConnection.start();
+               clusterConnection.activate();
             }
             catch (Exception e)
             {
@@ -790,12 +790,10 @@
 
       clusterConnections.put(config.getName(), clusterConnection);
 
-      if (!backup)
+      clusterConnection.start();
+      
+      if(backup)
       {
-         clusterConnection.start();
-      }
-      else
-      {
          announceBackup(config, connector);
       }
    }
@@ -836,7 +834,11 @@
             try
             {
                ClientSessionFactory backupSessionFactory = backupServerLocator.connect();
-               backupSessionFactory.getConnection().getChannel(0, -1).send(new NodeAnnounceMessage(nodeUUID.toString(), true, connector));
+               if (backupSessionFactory != null)
+               {
+                  backupSessionFactory.getConnection().getChannel(0, -1).send(new NodeAnnounceMessage(nodeUUID.toString(), true, connector));
+                  log.info("backup announced");
+               }
             }
             catch (Exception e)
             {

Modified: branches/2_2_0_HA_Improvements_preMerge/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
--- branches/2_2_0_HA_Improvements_preMerge/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java	2010-11-30 15:39:22 UTC (rev 9952)
+++ branches/2_2_0_HA_Improvements_preMerge/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java	2010-11-30 16:27:01 UTC (rev 9953)
@@ -605,17 +605,18 @@
          }
 
          connectorsService.stop();
+         //we stop the groupinghandler before we stop the cluster manager so binding mappings aren't removed in case of failover
+         if (groupingHandler != null)
+         {
+            managementService.removeNotificationListener(groupingHandler);
+            groupingHandler = null;
+         }
          
          if (clusterManager != null)
          {
             clusterManager.stop();
          }
 
-         if (groupingHandler != null)
-         {
-            managementService.removeNotificationListener(groupingHandler);
-            groupingHandler = null;
-         }
       }
 
       // we stop the remoting service outside a lock

Modified: branches/2_2_0_HA_Improvements_preMerge/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
===================================================================
--- branches/2_2_0_HA_Improvements_preMerge/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java	2010-11-30 15:39:22 UTC (rev 9952)
+++ branches/2_2_0_HA_Improvements_preMerge/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java	2010-11-30 16:27:01 UTC (rev 9953)
@@ -785,7 +785,7 @@
    {
       for (int i = 0; i < consumers.length; i++)
       {
-         if (consumers[i] != null)
+         if (consumers[i] != null && !consumers[i].consumer.isClosed())
          {
             ClusterTestBase.log.info("Dumping consumer " + i);
 
@@ -1163,6 +1163,11 @@
 
    protected void setupSessionFactory(final int node, final boolean netty) throws Exception
    {
+      setupSessionFactory(node, netty, false);
+   }
+
+   protected void setupSessionFactory(final int node, final boolean netty, boolean ha) throws Exception
+   {
       if (sfs[node] != null)
       {
          throw new IllegalArgumentException("Already a server at " + node);
@@ -1181,7 +1186,14 @@
          serverTotc = new TransportConfiguration(ServiceTestBase.INVM_CONNECTOR_FACTORY, params);
       }
 
-      locators[node] = HornetQClient.createServerLocatorWithoutHA(serverTotc);
+      if (ha)
+      {
+         locators[node] = HornetQClient.createServerLocatorWithHA(serverTotc);
+      }
+      else
+      {
+         locators[node] = HornetQClient.createServerLocatorWithoutHA(serverTotc);
+      }
 
       locators[node].setBlockOnNonDurableSend(true);
       locators[node].setBlockOnDurableSend(true);
@@ -1190,6 +1202,7 @@
       sfs[node] = sf;
    }
 
+
    protected void setupSessionFactory(final int node, final boolean netty, int reconnectAttempts) throws Exception
    {
       if (sfs[node] != null)

Modified: branches/2_2_0_HA_Improvements_preMerge/tests/src/org/hornetq/tests/integration/cluster/failover/GroupingFailoverReplicationTest.java
===================================================================
--- branches/2_2_0_HA_Improvements_preMerge/tests/src/org/hornetq/tests/integration/cluster/failover/GroupingFailoverReplicationTest.java	2010-11-30 15:39:22 UTC (rev 9952)
+++ branches/2_2_0_HA_Improvements_preMerge/tests/src/org/hornetq/tests/integration/cluster/failover/GroupingFailoverReplicationTest.java	2010-11-30 16:27:01 UTC (rev 9953)
@@ -29,64 +29,9 @@
  */
 public class GroupingFailoverReplicationTest extends GroupingFailoverTestBase
 {
-   private static final Logger log = Logger.getLogger(GroupingFailoverReplicationTest.class);
-
    @Override
-   protected void setupReplicatedServer(final int node,
-                                        final boolean fileStorage,
-                                        final boolean netty,
-                                        final int backupNode)
+   boolean isSharedServer()
    {
-      if (servers[node] != null)
-      {
-         throw new IllegalArgumentException("Already a server at node " + node);
-      }
-
-      Configuration configuration = new ConfigurationImpl();
-
-      configuration.setSecurityEnabled(false);
-      configuration.setBindingsDirectory(getBindingsDir(node, false));
-      configuration.setJournalMinFiles(2);
-      configuration.setJournalMaxIO_AIO(1000);
-      configuration.setJournalDirectory(getJournalDir(node, false));
-      configuration.setJournalFileSize(100 * 1024);
-      configuration.setJournalType(getDefaultJournalType());
-      configuration.setPagingDirectory(getPageDir(node, false));
-      configuration.setLargeMessagesDirectory(getLargeMessagesDir(node, false));
-      configuration.setClustered(true);
-      configuration.setJournalCompactMinFiles(0);
-      configuration.setBackup(true);
-      configuration.setSharedStore(false);
-
-      configuration.getAcceptorConfigurations().clear();
-
-      Map<String, Object> params = generateParams(node, netty);
-
-      TransportConfiguration invmtc = new TransportConfiguration(ServiceTestBase.INVM_ACCEPTOR_FACTORY, params);
-      configuration.getAcceptorConfigurations().add(invmtc);
-
-      if (netty)
-      {
-         TransportConfiguration nettytc = new TransportConfiguration(ServiceTestBase.NETTY_ACCEPTOR_FACTORY, params);
-         configuration.getAcceptorConfigurations().add(nettytc);
-      }
-
-      HornetQServer server;
-
-      if (fileStorage)
-      {
-         server = HornetQServers.newHornetQServer(configuration);
-      }
-      else
-      {
-         server = HornetQServers.newHornetQServer(configuration, false);
-      }
-      servers[node] = server;
+      return false; 
    }
-
-   @Override
-   void setupMasterServer(final int i, final boolean fileStorage, final boolean netty)
-   {
-      setupLiveServer(i, fileStorage, false, netty);
-   }
 }

Modified: branches/2_2_0_HA_Improvements_preMerge/tests/src/org/hornetq/tests/integration/cluster/failover/GroupingFailoverSharedServerTest.java
===================================================================
--- branches/2_2_0_HA_Improvements_preMerge/tests/src/org/hornetq/tests/integration/cluster/failover/GroupingFailoverSharedServerTest.java	2010-11-30 15:39:22 UTC (rev 9952)
+++ branches/2_2_0_HA_Improvements_preMerge/tests/src/org/hornetq/tests/integration/cluster/failover/GroupingFailoverSharedServerTest.java	2010-11-30 16:27:01 UTC (rev 9953)
@@ -28,67 +28,10 @@
  */
 public class GroupingFailoverSharedServerTest extends GroupingFailoverTestBase
 {
-   @Override
-   protected void setupReplicatedServer(final int node,
-                                        final boolean fileStorage,
-                                        final boolean netty,
-                                        final int backupNode)
-   {
-      if (servers[node] != null)
-      {
-         throw new IllegalArgumentException("Already a server at node " + node);
-      }
 
-      Configuration configuration = new ConfigurationImpl();
-
-      configuration.setSecurityEnabled(false);
-      configuration.setBindingsDirectory(getBindingsDir(backupNode, false));
-      configuration.setJournalMinFiles(2);
-      configuration.setJournalMaxIO_AIO(1000);
-      configuration.setJournalDirectory(getJournalDir(backupNode, false));
-      configuration.setJournalFileSize(100 * 1024);
-      configuration.setJournalType(getDefaultJournalType());
-      configuration.setPagingDirectory(getPageDir(backupNode, false));
-      configuration.setLargeMessagesDirectory(getLargeMessagesDir(backupNode, false));
-      configuration.setClustered(true);
-      configuration.setJournalCompactMinFiles(0);
-      configuration.setBackup(true);
-      configuration.setSharedStore(true);
-
-      configuration.getAcceptorConfigurations().clear();
-
-      Map<String, Object> params = generateParams(node, netty);
-
-      TransportConfiguration invmtc = new TransportConfiguration(ServiceTestBase.INVM_ACCEPTOR_FACTORY, params);
-      configuration.getAcceptorConfigurations().add(invmtc);
-
-      if (netty)
-      {
-         TransportConfiguration nettytc = new TransportConfiguration(ServiceTestBase.NETTY_ACCEPTOR_FACTORY, params);
-         configuration.getAcceptorConfigurations().add(nettytc);
-      }
-
-      HornetQServer server;
-
-      if (fileStorage)
-      {
-         if(nodeManagers[backupNode] == null)
-         {
-            nodeManagers[backupNode] = new InVMNodeManager();
-         }
-         server = createInVMFailoverServer(true, configuration, nodeManagers[backupNode]);
-      }
-      else
-      {
-         server = HornetQServers.newHornetQServer(configuration, false);
-      }
-      servers[node] = server;
-   }
-
    @Override
-   public void setupMasterServer(final int i, final boolean fileStorage, final boolean netty)
+   boolean isSharedServer()
    {
-      setupServer(i, fileStorage, netty);
+      return true;
    }
-
 }

Modified: branches/2_2_0_HA_Improvements_preMerge/tests/src/org/hornetq/tests/integration/cluster/failover/GroupingFailoverTestBase.java
===================================================================
--- branches/2_2_0_HA_Improvements_preMerge/tests/src/org/hornetq/tests/integration/cluster/failover/GroupingFailoverTestBase.java	2010-11-30 15:39:22 UTC (rev 9952)
+++ branches/2_2_0_HA_Improvements_preMerge/tests/src/org/hornetq/tests/integration/cluster/failover/GroupingFailoverTestBase.java	2010-11-30 16:27:01 UTC (rev 9953)
@@ -21,7 +21,9 @@
 import org.hornetq.api.core.HornetQException;
 import org.hornetq.api.core.Message;
 import org.hornetq.api.core.SimpleString;
+import org.hornetq.core.client.impl.Topology;
 import org.hornetq.core.remoting.FailureListener;
+import org.hornetq.core.server.HornetQServer;
 import org.hornetq.core.server.cluster.MessageFlowRecord;
 import org.hornetq.core.server.cluster.impl.ClusterConnectionImpl;
 import org.hornetq.core.server.group.impl.GroupingHandlerConfiguration;
@@ -37,17 +39,17 @@
 
    public void testGroupingLocalHandlerFails() throws Exception
    {
-      setupReplicatedServer(2, isFileStorage(), isNetty(), 0);
+     setupBackupServer(2, 0, isFileStorage(), isSharedServer(), isNetty());
 
-      setupMasterServer(0, isFileStorage(), isNetty());
+      setupLiveServer(0, isFileStorage(), isSharedServer(), isNetty());
 
-      setupServer(1, isFileStorage(), isNetty());
+      setupLiveServer(1, isFileStorage(), isSharedServer(), isNetty());
 
       setupClusterConnection("cluster0", "queues", false, 1, isNetty(), 0, 1);
 
-      setupClusterConnectionWithBackups("cluster1", "queues", false, 1, isNetty(), 1, new int[] { 0 });
+      setupClusterConnection("cluster1", "queues", false, 1, isNetty(), 1, 0);
 
-      setupClusterConnection("cluster2", "queues", false, 1, isNetty(), 2, 1);
+      setupClusterConnection("cluster0", "queues", false, 1, isNetty(), 2, 1);
 
       setUpGroupHandler(GroupingHandlerConfiguration.TYPE.LOCAL, 0);
 
@@ -74,6 +76,11 @@
          waitForBindings(0, "queues.testaddress", 1, 1, false);
          waitForBindings(1, "queues.testaddress", 1, 1, false);
 
+         waitForBindings(0, "queues.testaddress", 1, 1, true);
+         waitForBindings(1, "queues.testaddress", 1, 1, true);
+
+         waitForServerTopology(servers[1], 3, 5);
+
          sendWithProperty(0, "queues.testaddress", 10, false, Message.HDR_GROUP_ID, new SimpleString("id1"));
 
          verifyReceiveAll(10, 0);
@@ -90,12 +97,6 @@
 
          waitForBindings(2, "queues.testaddress", 1, 1, true);
 
-         waitForBindings(2, "queues.testaddress", 1, 1, false);
-
-         waitForBindings(1, "queues.testaddress", 1, 1, true);
-
-         waitForBindings(1, "queues.testaddress", 1, 1, false);
-
          sendWithProperty(2, "queues.testaddress", 10, false, Message.HDR_GROUP_ID, new SimpleString("id1"));
 
          verifyReceiveAll(10, 2);
@@ -116,18 +117,19 @@
 
    public void testGroupingLocalHandlerFailsMultipleGroups() throws Exception
    {
-      setupReplicatedServer(2, isFileStorage(), isNetty(), 0);
+      setupBackupServer(2, 0, isFileStorage(), isSharedServer(), isNetty());
 
-      setupMasterServer(0, isFileStorage(), isNetty());
+      setupLiveServer(0, isFileStorage(), isSharedServer(), isNetty());
 
-      setupServer(1, isFileStorage(), isNetty());
+      setupLiveServer(1, isFileStorage(), isSharedServer(), isNetty());
 
       setupClusterConnection("cluster0", "queues", false, 1, isNetty(), 0, 1);
 
-      setupClusterConnectionWithBackups("cluster1", "queues", false, 1, isNetty(), 1, new int[] { 0 });
+      setupClusterConnection("cluster1", "queues", false, 1, isNetty(), 1, 0);
 
-      setupClusterConnection("cluster2", "queues", false, 1, isNetty(), 2, 1);
+      setupClusterConnection("cluster0", "queues", false, 1, isNetty(), 2, 1);
 
+
       setUpGroupHandler(GroupingHandlerConfiguration.TYPE.LOCAL, 0);
 
       setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 1);
@@ -138,7 +140,9 @@
       try
       {
          startServers(2, 0, 1);
+
          setupSessionFactory(0, isNetty());
+
          setupSessionFactory(1, isNetty());
 
          createQueue(0, "queues.testaddress", "queue0", null, true);
@@ -155,6 +159,12 @@
          waitForBindings(0, "queues.testaddress", 1, 1, false);
          waitForBindings(1, "queues.testaddress", 1, 1, false);
 
+         waitForBindings(0, "queues.testaddress", 1, 1, true);
+         waitForBindings(1, "queues.testaddress", 1, 1, true);
+
+         waitForServerTopology(servers[1], 3, 5);
+
+
          sendWithProperty(0, "queues.testaddress", 10, false, Message.HDR_GROUP_ID, new SimpleString("id1"));
          sendWithProperty(0, "queues.testaddress", 10, false, Message.HDR_GROUP_ID, new SimpleString("id2"));
          sendWithProperty(0, "queues.testaddress", 10, false, Message.HDR_GROUP_ID, new SimpleString("id3"));
@@ -166,22 +176,8 @@
 
          closeSessionFactory(0);
 
-         final CountDownLatch latch = new CountDownLatch(1);
+         servers[0].kill();
 
-         class MyListener implements FailureListener
-         {
-            public void connectionFailed(final HornetQException me, boolean failedOver)
-            {
-               latch.countDown();
-            }
-         }
-
-         Map<String, MessageFlowRecord> records = ((ClusterConnectionImpl)getServer(1).getClusterManager()
-                                                                                      .getClusterConnection(new SimpleString("cluster1"))).getRecords();
-         RemotingConnection rc = records.get("0").getBridge().getForwardingConnection();
-         rc.addFailureListener(new MyListener());
-         fail(rc, latch);
-
          waitForServerRestart(2);
 
          setupSessionFactory(2, isNetty());
@@ -203,7 +199,7 @@
          sendWithProperty(2, "queues.testaddress", 10, false, Message.HDR_GROUP_ID, new SimpleString("id5"));
          sendWithProperty(2, "queues.testaddress", 10, false, Message.HDR_GROUP_ID, new SimpleString("id6"));
 
-         verifyReceiveAllWithGroupIDRoundRobin(0, 30, 1, 2);
+         verifyReceiveAllWithGroupIDRoundRobin(2, 30, 1, 2);
 
          System.out.println("*****************************************************************************");
       }
@@ -219,14 +215,27 @@
       }
    }
 
-   abstract void setupMasterServer(int i, boolean fileStorage, boolean netty);
+   private void waitForServerTopology(HornetQServer server, int nodes, int seconds)
+         throws InterruptedException
+   {
+      Topology topology = server.getClusterManager().getTopology();
+      long timeToWait = System.currentTimeMillis() + (seconds * 1000);
+      while(topology.nodes()!= nodes)
+      {
+         Thread.sleep(100);
+         if(System.currentTimeMillis() > timeToWait)
+         {
+            fail("timed out waiting for server topology");
+         }
+      }
+   }
 
    public boolean isNetty()
    {
       return true;
    }
 
-   abstract void setupReplicatedServer(int node, boolean fileStorage, boolean netty, int backupNode);
+   abstract boolean isSharedServer();
 
    private void fail(final RemotingConnection conn, final CountDownLatch latch) throws InterruptedException
    {

Modified: branches/2_2_0_HA_Improvements_preMerge/tests/src/org/hornetq/tests/integration/cluster/failover/MultipleLivesMultipleBackupsFailoverTest.java
===================================================================
--- branches/2_2_0_HA_Improvements_preMerge/tests/src/org/hornetq/tests/integration/cluster/failover/MultipleLivesMultipleBackupsFailoverTest.java	2010-11-30 15:39:22 UTC (rev 9952)
+++ branches/2_2_0_HA_Improvements_preMerge/tests/src/org/hornetq/tests/integration/cluster/failover/MultipleLivesMultipleBackupsFailoverTest.java	2010-11-30 16:27:01 UTC (rev 9953)
@@ -97,7 +97,8 @@
       System.setProperty("foo", "bar");
       servers.get(3).crash(session2);
       int liveAfter3 = waitForNewLive(10000, true, servers, 4, 5);
-
+      locator.close();
+      locator2.close();
       if (liveAfter0 == 2)
       {
          servers.get(1).stop();

Modified: branches/2_2_0_HA_Improvements_preMerge/tests/src/org/hornetq/tests/integration/cluster/failover/SingleLiveMultipleBackupsFailoverTest.java
===================================================================
--- branches/2_2_0_HA_Improvements_preMerge/tests/src/org/hornetq/tests/integration/cluster/failover/SingleLiveMultipleBackupsFailoverTest.java	2010-11-30 15:39:22 UTC (rev 9952)
+++ branches/2_2_0_HA_Improvements_preMerge/tests/src/org/hornetq/tests/integration/cluster/failover/SingleLiveMultipleBackupsFailoverTest.java	2010-11-30 16:27:01 UTC (rev 9953)
@@ -94,6 +94,8 @@
       session = sendAndConsume(sf, false);
       session.close();
       servers.get(backupNode).stop();
+
+      locator.close();
    }
    
    protected void createBackupConfig(int liveNode, int nodeid, int... nodes)

Modified: branches/2_2_0_HA_Improvements_preMerge/tests/src/org/hornetq/tests/util/ServiceTestBase.java
===================================================================
--- branches/2_2_0_HA_Improvements_preMerge/tests/src/org/hornetq/tests/util/ServiceTestBase.java	2010-11-30 15:39:22 UTC (rev 9952)
+++ branches/2_2_0_HA_Improvements_preMerge/tests/src/org/hornetq/tests/util/ServiceTestBase.java	2010-11-30 16:27:01 UTC (rev 9953)
@@ -98,6 +98,20 @@
       {
          System.exit(0);
       }
+      Map<Thread, StackTraceElement[]> threadMap  = Thread.getAllStackTraces();
+      for (Thread thread : threadMap.keySet())
+      {
+         StackTraceElement[] stack = threadMap.get(thread);
+         for (StackTraceElement stackTraceElement : stack)
+         {
+            if(stackTraceElement.getMethodName().contains("getConnectionWithRetry"))
+            {
+               System.out.println(this.getName() + " has left threads running");
+               fail("test left serverlocator running, this could effect other tests");
+               //System.exit(0);
+            }
+         }
+      }
    }
 
    protected static Map<String, Object> generateParams(final int node, final boolean netty)



More information about the hornetq-commits mailing list