[hornetq-commits] JBoss hornetq SVN: r9639 - in branches/2_2_0_HA_Improvements: src/main/org/hornetq/core/config and 4 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Fri Sep 3 05:11:39 EDT 2010


Author: ataylor
Date: 2010-09-03 05:11:37 -0400 (Fri, 03 Sep 2010)
New Revision: 9639

Added:
   branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/config/BackupConnectorConfiguration.java
Modified:
   branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java
   branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ServerLocatorInternal.java
   branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/config/Configuration.java
   branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/config/impl/ConfigurationImpl.java
   branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java
   branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/FakeLockFile.java
   branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
   branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/AsynchronousFailoverTest.java
   branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/ClusterWithBackupFailoverTestBase.java
   branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTest.java
   branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java
Log:
Added an HA listener for the ClusterManager when in backup mode.

Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java	2010-09-03 03:40:15 UTC (rev 9638)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java	2010-09-03 09:11:37 UTC (rev 9639)
@@ -1062,6 +1062,11 @@
       this.backup = backup;
    }
 
+   public void announceBackup()
+   {
+      connect();
+   }
+
    @Override
    protected void finalize() throws Throwable
    {

Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ServerLocatorInternal.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ServerLocatorInternal.java	2010-09-03 03:40:15 UTC (rev 9638)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ServerLocatorInternal.java	2010-09-03 09:11:37 UTC (rev 9639)
@@ -55,4 +55,5 @@
    
    void setBackup(boolean backup);
 
+   void announceBackup();
 }

Added: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/config/BackupConnectorConfiguration.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/config/BackupConnectorConfiguration.java	                        (rev 0)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/config/BackupConnectorConfiguration.java	2010-09-03 09:11:37 UTC (rev 9639)
@@ -0,0 +1,64 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ *  Red Hat licenses this file to you under the Apache License, version
+ *  2.0 (the "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ *  implied.  See the License for the specific language governing
+ *  permissions and limitations under the License.
+ */
+
+package org.hornetq.core.config;
+
+import org.hornetq.api.core.TransportConfiguration;
+
+import java.io.Serializable;
+import java.util.List;
+
+/**
+ * Created by IntelliJ IDEA.
+ * User: andy
+ * Date: Sep 2, 2010
+ * Time: 11:36:13 AM
+ * To change this template use File | Settings | File Templates.
+ */
+public class BackupConnectorConfiguration implements Serializable
+{
+   private final List<String> staticConnectors;
+
+   private final String discoveryGroupName;
+
+   private String connector;
+
+   public BackupConnectorConfiguration(List<String> staticConnectors, String connector)
+   {
+      this.staticConnectors = staticConnectors;
+      this.discoveryGroupName = null;
+      this.connector = connector;
+   }
+
+   public List<String> getStaticConnectors()
+   {
+      return staticConnectors;
+   }
+
+   public String getDiscoveryGroupName()
+   {
+      return discoveryGroupName;
+   }
+
+   public BackupConnectorConfiguration(String discoveryGroupName, String connector)
+   {
+      this.staticConnectors = null;
+      this.discoveryGroupName = discoveryGroupName;
+      this.connector = connector;
+   }
+
+   public String getConnector()
+   {
+      return connector;
+   }
+}

Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/config/Configuration.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/config/Configuration.java	2010-09-03 03:40:15 UTC (rev 9638)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/config/Configuration.java	2010-09-03 09:11:37 UTC (rev 9639)
@@ -252,7 +252,18 @@
     */
    Map<String, TransportConfiguration> getConnectorConfigurations();
 
+
+  /**
+   * Sets the connectors from which the backup server gets topology info when shared store is used.
+   */
+   void setBackupConnectorConfiguration(BackupConnectorConfiguration backupConnectorConfiguration);
+
    /**
+    * Returns the connectors from which the backup server gets topology info when shared store is used.
+    */
+   BackupConnectorConfiguration getBackupConnectorConfiguration();
+
+   /**
     * Sets the connectors configured for this server.
     */
    void setConnectorConfigurations(Map<String, TransportConfiguration> infos);

Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/config/impl/ConfigurationImpl.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/config/impl/ConfigurationImpl.java	2010-09-03 03:40:15 UTC (rev 9638)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/config/impl/ConfigurationImpl.java	2010-09-03 09:11:37 UTC (rev 9639)
@@ -23,13 +23,7 @@
 
 import org.hornetq.api.core.SimpleString;
 import org.hornetq.api.core.TransportConfiguration;
-import org.hornetq.core.config.BridgeConfiguration;
-import org.hornetq.core.config.BroadcastGroupConfiguration;
-import org.hornetq.core.config.ClusterConnectionConfiguration;
-import org.hornetq.core.config.Configuration;
-import org.hornetq.core.config.DiscoveryGroupConfiguration;
-import org.hornetq.core.config.DivertConfiguration;
-import org.hornetq.core.config.CoreQueueConfiguration;
+import org.hornetq.core.config.*;
 import org.hornetq.core.logging.impl.JULLogDelegateFactory;
 import org.hornetq.core.security.Role;
 import org.hornetq.core.server.JournalType;
@@ -237,6 +231,8 @@
 
    protected List<BroadcastGroupConfiguration> broadcastGroupConfigurations = new ArrayList<BroadcastGroupConfiguration>();
 
+   protected BackupConnectorConfiguration backupConnectorConfiguration;
+
    protected Map<String, DiscoveryGroupConfiguration> discoveryGroupConfigurations = new LinkedHashMap<String, DiscoveryGroupConfiguration>();
 
    // Paging related attributes ------------------------------------------------------------
@@ -472,6 +468,16 @@
       return connectorConfigs;
    }
 
+   public void setBackupConnectorConfiguration(BackupConnectorConfiguration backupConnectorConfiguration)
+   {
+      this.backupConnectorConfiguration = backupConnectorConfiguration;
+   }
+
+   public BackupConnectorConfiguration getBackupConnectorConfiguration()
+   {
+      return backupConnectorConfiguration;
+   }
+
    public void setConnectorConfigurations(final Map<String, TransportConfiguration> infos)
    {
       connectorConfigs = infos;

Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java	2010-09-03 03:40:15 UTC (rev 9638)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java	2010-09-03 09:11:37 UTC (rev 9639)
@@ -29,19 +29,19 @@
 import org.hornetq.api.core.Pair;
 import org.hornetq.api.core.SimpleString;
 import org.hornetq.api.core.TransportConfiguration;
+import org.hornetq.api.core.client.ClientSessionFactory;
 import org.hornetq.api.core.client.ClusterTopologyListener;
 import org.hornetq.api.core.client.HornetQClient;
+import org.hornetq.api.core.client.ServerLocator;
+import org.hornetq.core.client.impl.ServerLocatorImpl;
 import org.hornetq.core.client.impl.ServerLocatorInternal;
 import org.hornetq.core.client.impl.Topology;
 import org.hornetq.core.client.impl.TopologyMember;
-import org.hornetq.core.config.BridgeConfiguration;
-import org.hornetq.core.config.BroadcastGroupConfiguration;
-import org.hornetq.core.config.ClusterConnectionConfiguration;
-import org.hornetq.core.config.Configuration;
-import org.hornetq.core.config.DiscoveryGroupConfiguration;
+import org.hornetq.core.config.*;
 import org.hornetq.core.logging.Logger;
 import org.hornetq.core.postoffice.Binding;
 import org.hornetq.core.postoffice.PostOffice;
+import org.hornetq.core.protocol.core.impl.wireformat.NodeAnnounceMessage;
 import org.hornetq.core.server.HornetQServer;
 import org.hornetq.core.server.Queue;
 import org.hornetq.core.server.cluster.Bridge;
@@ -103,6 +103,7 @@
    private Set<ClusterTopologyListener> clusterConnectionListeners = new ConcurrentHashSet<ClusterTopologyListener>();
 
    private Topology topology = new Topology();
+   private ClientSessionFactory backupSessionFactory;
 
    public ClusterManagerImpl(final ExecutorFactory executorFactory,
                              final HornetQServer server,
@@ -147,6 +148,11 @@
 
       if (clustered)
       {
+         BackupConnectorConfiguration connectorConfiguration = configuration.getBackupConnectorConfiguration();
+         if(connectorConfiguration != null)
+         {
+            deployBackupListener(connectorConfiguration);
+         }
          for (BroadcastGroupConfiguration config : configuration.getBroadcastGroupConfigurations())
          {
             deployBroadcastGroup(config);
@@ -174,6 +180,47 @@
       started = true;
    }
 
+   private void deployBackupListener(BackupConnectorConfiguration connectorConfiguration)
+         throws Exception
+   {
+      ServerLocator locator;
+      if (connectorConfiguration.getDiscoveryGroupName() != null)
+      {
+         DiscoveryGroupConfiguration groupConfiguration = configuration.getDiscoveryGroupConfigurations().get(connectorConfiguration.getDiscoveryGroupName());
+         if (groupConfiguration == null)
+         {
+            ClusterManagerImpl.log.warn("There is no discovery group deployed with name " + connectorConfiguration.getDiscoveryGroupName() +
+                                        " deployed. This one will not be deployed.");
+
+            return;
+         }
+         locator = new ServerLocatorImpl(true, groupConfiguration.getGroupAddress(), groupConfiguration.getGroupPort());
+      }
+      else
+      {
+         TransportConfiguration[] configs = new TransportConfiguration[connectorConfiguration.getStaticConnectors().size()];
+         for (int i = 0, configsLength = configs.length; i < configsLength; i++)
+         {
+            configs[i] = configuration.getConnectorConfigurations().get(connectorConfiguration.getStaticConnectors().get(i));
+         }
+         locator = new ServerLocatorImpl(true, configs);
+      }
+      locator.addClusterTopologyListener(new ClusterTopologyListener()
+      {
+         public void nodeUP(String nodeID, Pair<TransportConfiguration, TransportConfiguration> connectorPair, boolean last, int distance)
+         {
+            //todo update the topology
+         }
+
+         public void nodeDown(String nodeID)
+         {
+            //todo update the topology
+         }
+      });
+      backupSessionFactory = locator.createSessionFactory();
+      backupSessionFactory.getConnection().getChannel(0, -1).send(new NodeAnnounceMessage(nodeUUID.toString(), true, configuration.getConnectorConfigurations().get(connectorConfiguration.getConnector())));
+   }
+
    public synchronized void stop() throws Exception
    {
       if (!started)

Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/FakeLockFile.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/FakeLockFile.java	2010-09-03 03:40:15 UTC (rev 9638)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/FakeLockFile.java	2010-09-03 09:11:37 UTC (rev 9639)
@@ -43,7 +43,7 @@
 
    private final String directory;
    
-   private final static Map<String, Semaphore> locks = new WeakHashMap<String, Semaphore>();
+   private final static Map<String, Semaphore> locks = new HashMap<String, Semaphore>();
 
    private Semaphore semaphore;
    /**

Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java	2010-09-03 03:40:15 UTC (rev 9638)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java	2010-09-03 09:11:37 UTC (rev 9639)
@@ -44,6 +44,7 @@
 import org.hornetq.api.core.Pair;
 import org.hornetq.api.core.SimpleString;
 import org.hornetq.core.client.impl.ClientSessionFactoryImpl;
+import org.hornetq.core.config.BackupConnectorConfiguration;
 import org.hornetq.core.config.Configuration;
 import org.hornetq.core.config.CoreQueueConfiguration;
 import org.hornetq.core.config.DivertConfiguration;
@@ -560,7 +561,7 @@
             //TODO TODO at this point the clustermanager needs to announce it's presence so the cluster can know about the backup
             // We now look for the live.lock file - if it doesn't exist it means the live isn't started yet, so we wait
             // for that
-            
+
             while (true)
             {
                File liveLockFile = new File(configuration.getJournalDirectory(), "live.lock");

Modified: branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/AsynchronousFailoverTest.java
===================================================================
--- branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/AsynchronousFailoverTest.java	2010-09-03 03:40:15 UTC (rev 9638)
+++ branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/AsynchronousFailoverTest.java	2010-09-03 09:11:37 UTC (rev 9639)
@@ -168,10 +168,12 @@
          {
             AsynchronousFailoverTest.log.info("Iteration " + i);
             ServerLocator locator = getServerLocator();
-            sf = (ClientSessionFactoryInternal) locator.createSessionFactory();
+            locator.setBlockOnNonDurableSend(true);
+            locator.setBlockOnDurableSend(true);
+            locator.setFailoverOnServerShutdown(true);
+            locator.setReconnectAttempts(-1);
+            sf = (ClientSessionFactoryInternal) createSessionFactoryAndWaitForTopology(locator, 2);
 
-            sf.getServerLocator().setBlockOnNonDurableSend(true);
-            sf.getServerLocator().setBlockOnDurableSend(true);
 
             ClientSession createSession = sf.createSession(true, true);
 
@@ -198,15 +200,15 @@
             // Simulate failure on connection
             synchronized (lockFail)
             {
-               conn.fail(new HornetQException(HornetQException.NOT_CONNECTED));
+               fail((ClientSession) createSession);
             }
 
-            if (listener != null)
+            /*if (listener != null)
             {
                boolean ok = listener.latch.await(10000, TimeUnit.MILLISECONDS);
 
                Assert.assertTrue(ok);
-            }
+            }*/
 
             runnable.setFailed();
 
@@ -280,6 +282,10 @@
                catch (HornetQException e)
                {
                   AsynchronousFailoverTest.log.info("exception when sending message with counter " + i);
+                  if(e.getCode() != HornetQException.UNBLOCKED)
+                  {
+                     e.printStackTrace();
+                  }
                   Assert.assertEquals(e.getCode(), HornetQException.UNBLOCKED);
 
                   retry = true;

Modified: branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/ClusterWithBackupFailoverTestBase.java
===================================================================
--- branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/ClusterWithBackupFailoverTestBase.java	2010-09-03 03:40:15 UTC (rev 9638)
+++ branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/ClusterWithBackupFailoverTestBase.java	2010-09-03 09:11:37 UTC (rev 9639)
@@ -76,115 +76,121 @@
 
    public void testFailLiveNodes() throws Exception
    {
-      setupCluster();
+      try
+      {
+         setupCluster();
 
-      startServers(3, 4, 5, 0, 1, 2);
+         startServers(3, 4, 5, 0, 1, 2);
 
-      setupSessionFactory(0, 3, isNetty(), false);
-      setupSessionFactory(1, 4, isNetty(), false);
-      setupSessionFactory(2, 5, isNetty(), false);
+         setupSessionFactory(0, 3, isNetty(), false);
+         setupSessionFactory(1, 4, isNetty(), false);
+         setupSessionFactory(2, 5, isNetty(), false);
 
-      createQueue(0, "queues.testaddress", "queue0", null, true);
-      createQueue(1, "queues.testaddress", "queue0", null, true);
-      createQueue(2, "queues.testaddress", "queue0", null, true);
+         createQueue(0, "queues.testaddress", "queue0", null, true);
+         createQueue(1, "queues.testaddress", "queue0", null, true);
+         createQueue(2, "queues.testaddress", "queue0", null, true);
 
-      addConsumer(0, 0, "queue0", null);
-      addConsumer(1, 1, "queue0", null);
-      addConsumer(2, 2, "queue0", null);
+         addConsumer(0, 0, "queue0", null);
+         addConsumer(1, 1, "queue0", null);
+         addConsumer(2, 2, "queue0", null);
 
-      waitForBindings(0, "queues.testaddress", 1, 1, true);
-      waitForBindings(1, "queues.testaddress", 1, 1, true);
-      waitForBindings(2, "queues.testaddress", 1, 1, true);
+         waitForBindings(0, "queues.testaddress", 1, 1, true);
+         waitForBindings(1, "queues.testaddress", 1, 1, true);
+         waitForBindings(2, "queues.testaddress", 1, 1, true);
 
-      waitForBindings(0, "queues.testaddress", 2, 2, false);
-      waitForBindings(1, "queues.testaddress", 2, 2, false);
-      waitForBindings(2, "queues.testaddress", 2, 2, false);
+         waitForBindings(0, "queues.testaddress", 2, 2, false);
+         waitForBindings(1, "queues.testaddress", 2, 2, false);
+         waitForBindings(2, "queues.testaddress", 2, 2, false);
 
-      send(0, "queues.testaddress", 10, false, null);
-      verifyReceiveRoundRobinInSomeOrder(true, 10, 0, 1, 2);
+         send(0, "queues.testaddress", 10, false, null);
+         verifyReceiveRoundRobinInSomeOrder(true, 10, 0, 1, 2);
 
-      send(1, "queues.testaddress", 10, false, null);
-      verifyReceiveRoundRobinInSomeOrder(true, 10, 0, 1, 2);
+         send(1, "queues.testaddress", 10, false, null);
+         verifyReceiveRoundRobinInSomeOrder(true, 10, 0, 1, 2);
 
-      send(2, "queues.testaddress", 10, false, null);
-      verifyReceiveRoundRobinInSomeOrder(true, 10, 0, 1, 2);
+         send(2, "queues.testaddress", 10, false, null);
+         verifyReceiveRoundRobinInSomeOrder(true, 10, 0, 1, 2);
 
-      failNode(0);
+         failNode(0);
 
-      // live nodes
-      waitForBindings(1, "queues.testaddress", 1, 1, true);
-      waitForBindings(2, "queues.testaddress", 1, 1, true);
-      // activated backup nodes
-      waitForBindings(3, "queues.testaddress", 1, 1, true);
+         // live nodes
+         waitForBindings(1, "queues.testaddress", 1, 1, true);
+         waitForBindings(2, "queues.testaddress", 1, 1, true);
+         // activated backup nodes
+         waitForBindings(3, "queues.testaddress", 1, 1, true);
 
-      // live nodes
-      waitForBindings(1, "queues.testaddress", 2, 2, false);
-      waitForBindings(2, "queues.testaddress", 2, 2, false);
-      // activated backup nodes
-      waitForBindings(3, "queues.testaddress", 2, 2, false);
+         // live nodes
+         waitForBindings(1, "queues.testaddress", 2, 2, false);
+         waitForBindings(2, "queues.testaddress", 2, 2, false);
+         // activated backup nodes
+         waitForBindings(3, "queues.testaddress", 2, 2, false);
 
-      ClusterWithBackupFailoverTestBase.log.info("** now sending");
+         ClusterWithBackupFailoverTestBase.log.info("** now sending");
 
-      send(0, "queues.testaddress", 10, false, null);
-      verifyReceiveRoundRobinInSomeOrder(true, 10, 0, 1, 2);
+         send(0, "queues.testaddress", 10, false, null);
+         verifyReceiveRoundRobinInSomeOrder(true, 10, 0, 1, 2);
 
-      send(1, "queues.testaddress", 10, false, null);
-      verifyReceiveRoundRobinInSomeOrder(true, 10, 0, 1, 2);
+         send(1, "queues.testaddress", 10, false, null);
+         verifyReceiveRoundRobinInSomeOrder(true, 10, 0, 1, 2);
 
-      send(2, "queues.testaddress", 10, false, null);
-      verifyReceiveRoundRobinInSomeOrder(true, 10, 0, 1, 2);
+         send(2, "queues.testaddress", 10, false, null);
+         verifyReceiveRoundRobinInSomeOrder(true, 10, 0, 1, 2);
 
-      failNode(1);
+         failNode(1);
 
-      // live nodes
-      waitForBindings(2, "queues.testaddress", 1, 1, true);
-      // activated backup nodes
-      waitForBindings(3, "queues.testaddress", 1, 1, true);
-      waitForBindings(4, "queues.testaddress", 1, 1, true);
+         // live nodes
+         waitForBindings(2, "queues.testaddress", 1, 1, true);
+         // activated backup nodes
+         waitForBindings(3, "queues.testaddress", 1, 1, true);
+         waitForBindings(4, "queues.testaddress", 1, 1, true);
 
-      // live nodes
-      waitForBindings(2, "queues.testaddress", 2, 2, false);
-      // activated backup nodes
-      waitForBindings(3, "queues.testaddress", 2, 2, false);
-      waitForBindings(4, "queues.testaddress", 2, 2, false);
+         // live nodes
+         waitForBindings(2, "queues.testaddress", 2, 2, false);
+         // activated backup nodes
+         waitForBindings(3, "queues.testaddress", 2, 2, false);
+         waitForBindings(4, "queues.testaddress", 2, 2, false);
 
-      send(0, "queues.testaddress", 10, false, null);
-      verifyReceiveRoundRobinInSomeOrder(true, 10, 0, 1, 2);
+         send(0, "queues.testaddress", 10, false, null);
+         verifyReceiveRoundRobinInSomeOrder(true, 10, 0, 1, 2);
 
-      send(1, "queues.testaddress", 10, false, null);
-      verifyReceiveRoundRobinInSomeOrder(true, 10, 0, 1, 2);
+         send(1, "queues.testaddress", 10, false, null);
+         verifyReceiveRoundRobinInSomeOrder(true, 10, 0, 1, 2);
 
-      send(2, "queues.testaddress", 10, false, null);
-      verifyReceiveRoundRobinInSomeOrder(true, 10, 0, 1, 2);
+         send(2, "queues.testaddress", 10, false, null);
+         verifyReceiveRoundRobinInSomeOrder(true, 10, 0, 1, 2);
 
-      failNode(2);
+         failNode(2);
 
-      // activated backup nodes
-      waitForBindings(3, "queues.testaddress", 1, 1, true);
-      waitForBindings(4, "queues.testaddress", 1, 1, true);
-      waitForBindings(5, "queues.testaddress", 1, 1, true);
+         // activated backup nodes
+         waitForBindings(3, "queues.testaddress", 1, 1, true);
+         waitForBindings(4, "queues.testaddress", 1, 1, true);
+         waitForBindings(5, "queues.testaddress", 1, 1, true);
 
-      // activated backup nodes
-      waitForBindings(3, "queues.testaddress", 2, 2, false);
-      waitForBindings(4, "queues.testaddress", 2, 2, false);
-      waitForBindings(5, "queues.testaddress", 2, 2, false);
+         // activated backup nodes
+         waitForBindings(3, "queues.testaddress", 2, 2, false);
+         waitForBindings(4, "queues.testaddress", 2, 2, false);
+         waitForBindings(5, "queues.testaddress", 2, 2, false);
 
-      send(0, "queues.testaddress", 10, false, null);
-      verifyReceiveRoundRobinInSomeOrder(true, 10, 0, 1, 2);
+         send(0, "queues.testaddress", 10, false, null);
+         verifyReceiveRoundRobinInSomeOrder(true, 10, 0, 1, 2);
 
-      send(1, "queues.testaddress", 10, false, null);
-      verifyReceiveRoundRobinInSomeOrder(true, 10, 0, 1, 2);
+         send(1, "queues.testaddress", 10, false, null);
+         verifyReceiveRoundRobinInSomeOrder(true, 10, 0, 1, 2);
 
-      send(2, "queues.testaddress", 10, false, null);
-      verifyReceiveRoundRobinInSomeOrder(true, 10, 0, 1, 2);
+         send(2, "queues.testaddress", 10, false, null);
+         verifyReceiveRoundRobinInSomeOrder(true, 10, 0, 1, 2);
 
-      removeConsumer(0);
-      removeConsumer(1);
-      removeConsumer(2);
+         removeConsumer(0);
+         removeConsumer(1);
+         removeConsumer(2);
 
-      stopServers();
+         stopServers();
 
-      ClusterWithBackupFailoverTestBase.log.info("*** test done");
+         ClusterWithBackupFailoverTestBase.log.info("*** test done");
+      } catch (Exception e)
+      {
+         e.printStackTrace();  //To change body of catch statement use File | Settings | File Templates.
+      }
    }
    
    public void testFailBackupNodes() throws Exception

Modified: branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTest.java
===================================================================
--- branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTest.java	2010-09-03 03:40:15 UTC (rev 9638)
+++ branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTest.java	2010-09-03 09:11:37 UTC (rev 9639)
@@ -366,30 +366,8 @@
    }
 
 
-    private void fail(final ClientSession... sessions) throws Exception
-   {
-      final CountDownLatch latch = new CountDownLatch(sessions.length);
 
-      class MyListener extends BaseListener
-      {
-         public void connectionFailed(final HornetQException me)
-         {
-            latch.countDown();
-         }
 
-      }
-      for (ClientSession session : sessions)
-      {
-         session.addFailureListener(new MyListener());
-      }
-      server0Service.stop();
-
-      // Wait to be informed of failure
-      boolean ok = latch.await(10000, TimeUnit.MILLISECONDS);
-
-      Assert.assertTrue(ok);
-   }
-
    public void testTransactedMessagesSentSoRollback() throws Exception
    {
       ServerLocator locator = getServerLocator();

Modified: branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java
===================================================================
--- branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java	2010-09-03 03:40:15 UTC (rev 9638)
+++ branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java	2010-09-03 09:11:37 UTC (rev 9639)
@@ -22,16 +22,15 @@
 
 import junit.framework.Assert;
 
+import org.hornetq.api.core.HornetQException;
 import org.hornetq.api.core.Pair;
 import org.hornetq.api.core.SimpleString;
 import org.hornetq.api.core.TransportConfiguration;
-import org.hornetq.api.core.client.ClientSessionFactory;
-import org.hornetq.api.core.client.ClusterTopologyListener;
-import org.hornetq.api.core.client.HornetQClient;
-import org.hornetq.api.core.client.ServerLocator;
+import org.hornetq.api.core.client.*;
 import org.hornetq.core.client.impl.ClientSessionFactoryImpl;
 import org.hornetq.core.client.impl.ClientSessionFactoryInternal;
 import org.hornetq.core.client.impl.ServerLocatorInternal;
+import org.hornetq.core.config.BackupConnectorConfiguration;
 import org.hornetq.core.config.ClusterConnectionConfiguration;
 import org.hornetq.core.config.Configuration;
 import org.hornetq.core.remoting.impl.invm.InVMConnector;
@@ -114,13 +113,11 @@
       config1.setClustered(true);
       TransportConfiguration liveConnector = getConnectorTransportConfiguration(true);
       TransportConfiguration backupConnector = getConnectorTransportConfiguration(false);
-      List<String> staticConnectors = new ArrayList<String>();
-      staticConnectors.add(liveConnector.getName());
-       ClusterConnectionConfiguration ccc1 = new ClusterConnectionConfiguration("cluster1", "jms", backupConnector.getName(), -1, false, false, 1, 1,
-               staticConnectors);
-      config1.getClusterConfigurations().add(ccc1);
       config1.getConnectorConfigurations().put(liveConnector.getName(), liveConnector);
       config1.getConnectorConfigurations().put(backupConnector.getName(), backupConnector);
+      ArrayList<String> staticConnectors = new ArrayList<String>();
+      staticConnectors.add(liveConnector.getName());
+      config1.setBackupConnectorConfiguration(new BackupConnectorConfiguration(staticConnectors, backupConnector.getName()));
       server1Service = createFakeLockServer(true, config1);
       
       server1Service.registerActivateCallback(new ActivateCallback()
@@ -323,9 +320,40 @@
       return (ServerLocatorInternal) locator;
    }
 
+   protected void fail(final ClientSession... sessions) throws Exception
+   {
+      final CountDownLatch latch = new CountDownLatch(sessions.length);
+
+      class MyListener extends BaseListener
+      {
+         public void connectionFailed(final HornetQException me)
+         {
+            latch.countDown();
+         }
+
+      }
+      for (ClientSession session : sessions)
+      {
+         session.addFailureListener(new MyListener());
+      }
+      server0Service.stop();
+
+      // Wait to be informed of failure
+      boolean ok = latch.await(10000, TimeUnit.MILLISECONDS);
+
+      Assert.assertTrue(ok);
+   }
    // Private -------------------------------------------------------
 
    // Inner classes -------------------------------------------------
+
+   abstract class BaseListener implements SessionFailureListener
+   {
+      public void beforeReconnect(final HornetQException me)
+      {
+      }
+   }
+   
    class LatchClusterTopologyListener implements ClusterTopologyListener
    {
       final CountDownLatch latch;



More information about the hornetq-commits mailing list