[hornetq-commits] JBoss hornetq SVN: r12269 - in trunk: tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover and 1 other directory.

do-not-reply at jboss.org do-not-reply at jboss.org
Thu Mar 8 11:07:11 EST 2012


Author: borges
Date: 2012-03-08 11:07:11 -0500 (Thu, 08 Mar 2012)
New Revision: 12269

Modified:
   trunk/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java
   trunk/hornetq-core/src/main/java/org/hornetq/core/server/impl/QuorumManager.java
   trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/QuorumFailOverTest.java
   trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/StaticClusterWithBackupFailoverTest.java
Log:
HORNETQ-776 HORNETQ-720 Improve QuorumVoting code and tests. (Unfinished)
Note that the code currently avoids QuorumVoting even in QuorumFailOverTest!

Modified: trunk/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java	2012-03-08 16:06:44 UTC (rev 12268)
+++ trunk/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java	2012-03-08 16:07:11 UTC (rev 12269)
@@ -2108,7 +2108,7 @@
 
             final TransportConfiguration config = configuration.getConnectorConfigurations().get(liveConnectorName);
             serverLocator0 = (ServerLocatorInternal)HornetQClient.createServerLocatorWithHA(config);
-            final QuorumManager quorumManager = new QuorumManager(serverLocator0);
+            final QuorumManager quorumManager = new QuorumManager(serverLocator0, threadPool);
             replicationEndpoint.setQuorumManager(quorumManager);
 
             serverLocator0.setReconnectAttempts(-1);

Modified: trunk/hornetq-core/src/main/java/org/hornetq/core/server/impl/QuorumManager.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/core/server/impl/QuorumManager.java	2012-03-08 16:06:44 UTC (rev 12268)
+++ trunk/hornetq-core/src/main/java/org/hornetq/core/server/impl/QuorumManager.java	2012-03-08 16:07:11 UTC (rev 12269)
@@ -7,7 +7,6 @@
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
@@ -35,12 +34,15 @@
    private final Map<String, Pair<TransportConfiguration, TransportConfiguration>> nodes =
             new ConcurrentHashMap<String, Pair<TransportConfiguration, TransportConfiguration>>();
 
+   private final ExecutorService executor;
+
    /** safety parameter to make _sure_ we get out of await() */
    private static final int LATCH_TIMEOUT = 60;
    private static final long DISCOVERY_TIMEOUT = 5;
 
-   public QuorumManager(ServerLocator serverLocator)
+   public QuorumManager(ServerLocator serverLocator, ExecutorService executor)
    {
+      this.executor = executor;
       this.locator = serverLocator;
       locator.addClusterTopologyListener(this);
    }
@@ -81,11 +83,10 @@
       {
          return true;
       }
-      // go for the vote...
+
       final int size = nodes.size();
       Set<ServerLocator> locatorsList = new HashSet<ServerLocator>(size);
       AtomicInteger pingCount = new AtomicInteger(0);
-      ExecutorService pool = Executors.newFixedThreadPool(size);
       final CountDownLatch latch = new CountDownLatch(size);
       try
       {
@@ -96,7 +97,7 @@
             TransportConfiguration serverTC = pair.getValue().getA();
             ServerLocatorImpl locator = (ServerLocatorImpl)HornetQClient.createServerLocatorWithoutHA(serverTC);
             locatorsList.add(locator);
-            pool.submit(new ServerConnect(latch, pingCount, locator));
+            executor.submit(new ServerConnect(latch, pingCount, locator));
          }
          // Some servers may have disappeared between the latch creation
          for (int i = 0; i < size - locatorsList.size(); i++)
@@ -125,7 +126,6 @@
                // no-op
             }
          }
-         pool.shutdownNow();
       }
    }
 
@@ -145,7 +145,7 @@
       @Override
       public void run()
       {
-         locator.setReconnectAttempts(-1);
+         locator.setReconnectAttempts(0);
          locator.getDiscoveryGroupConfiguration().setDiscoveryInitialWaitTimeout(DISCOVERY_TIMEOUT);
 
          final ClientSessionFactory liveServerSessionFactory;

Modified: trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/QuorumFailOverTest.java
===================================================================
--- trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/QuorumFailOverTest.java	2012-03-08 16:06:44 UTC (rev 12268)
+++ trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/QuorumFailOverTest.java	2012-03-08 16:07:11 UTC (rev 12269)
@@ -17,27 +17,37 @@
       setupCluster();
 
       startServers(0, 1, 2, 3, 4, 5);
+
+      for (int i = 0; i < 3; i++)
+      {
+         waitForTopology(servers[i], 3, 3);
+      }
+
+      waitForFailoverTopology(3, 0, 1, 2);
+      waitForFailoverTopology(4, 0, 1, 2);
+      waitForFailoverTopology(5, 0, 1, 2);
+
       for (int i : new int[] { 0, 1, 2 })
       {
          setupSessionFactory(i, i + 3, isNetty(), false);
       }
 
       createQueue(0, QUEUES_TESTADDRESS, QUEUE_NAME, null, true);
+      addConsumer(0, 0, QUEUE_NAME, null);
+      waitForBindings(0, QUEUES_TESTADDRESS, 1, 1, true);
 
       final TopologyListener liveTopologyListener = new TopologyListener("LIVE-1");
 
       locators[0].addClusterTopologyListener(liveTopologyListener);
 
-      final TopologyListener backupTopologyListener = new TopologyListener("LIVE-2");
-      locators[1].addClusterTopologyListener(backupTopologyListener);
-
       assertTrue("we assume 3 is a backup", servers[3].getConfiguration().isBackup());
       assertFalse("no shared storage", servers[3].getConfiguration().isSharedStore());
 
-      // assertEquals(liveTopologyListener.toString(), 6, liveTopologyListener.nodes.size());
-      // assertEquals(backupTopologyListener.toString(), 6, backupTopologyListener.nodes.size());
+      failNode(0);
 
-      failNode(0);
+      waitForFailoverTopology(4, 3, 1, 2);
+      waitForFailoverTopology(5, 3, 1, 2);
+
       waitForBindings(3, QUEUES_TESTADDRESS, 1, 1, true);
 
       assertTrue(servers[3].waitForInitialization(10, TimeUnit.SECONDS));
@@ -67,14 +77,12 @@
                          Pair<TransportConfiguration, TransportConfiguration> connectorPair, boolean last)
       {
          nodes.put(nodeID, connectorPair);
-         System.out.println(prefix + " UP: " + nodeID + " connectPair=" + connectorPair);
       }
 
       @Override
       public void nodeDown(long eventUID, String nodeID)
       {
          nodes.remove(nodeID);
-         System.out.println(prefix + " DOWN: " + nodeID);
       }
 
       @Override

Modified: trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/StaticClusterWithBackupFailoverTest.java
===================================================================
--- trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/StaticClusterWithBackupFailoverTest.java	2012-03-08 16:06:44 UTC (rev 12268)
+++ trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/StaticClusterWithBackupFailoverTest.java	2012-03-08 16:07:11 UTC (rev 12269)
@@ -23,15 +23,6 @@
 public class StaticClusterWithBackupFailoverTest extends ClusterWithBackupFailoverTestBase
 {
 
-   // Constants -----------------------------------------------------
-
-   // Attributes ----------------------------------------------------
-
-   // Static --------------------------------------------------------
-
-   // Constructors --------------------------------------------------
-
-   // Public --------------------------------------------------------
    @Override
    protected void setupCluster(final boolean forwardWhenNoConsumers) throws Exception
    {



More information about the hornetq-commits mailing list