[hornetq-commits] JBoss hornetq SVN: r12269 - in trunk: tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover and 1 other directory.
do-not-reply at jboss.org
do-not-reply at jboss.org
Thu Mar 8 11:07:11 EST 2012
Author: borges
Date: 2012-03-08 11:07:11 -0500 (Thu, 08 Mar 2012)
New Revision: 12269
Modified:
trunk/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java
trunk/hornetq-core/src/main/java/org/hornetq/core/server/impl/QuorumManager.java
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/QuorumFailOverTest.java
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/StaticClusterWithBackupFailoverTest.java
Log:
HORNETQ-776 HORNETQ-720 Improve QuorumVoting code and tests. (Unfinished)
Note that currently the code will avoid QuorumVoting even for this test!
Modified: trunk/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java 2012-03-08 16:06:44 UTC (rev 12268)
+++ trunk/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java 2012-03-08 16:07:11 UTC (rev 12269)
@@ -2108,7 +2108,7 @@
final TransportConfiguration config = configuration.getConnectorConfigurations().get(liveConnectorName);
serverLocator0 = (ServerLocatorInternal)HornetQClient.createServerLocatorWithHA(config);
- final QuorumManager quorumManager = new QuorumManager(serverLocator0);
+ final QuorumManager quorumManager = new QuorumManager(serverLocator0, threadPool);
replicationEndpoint.setQuorumManager(quorumManager);
serverLocator0.setReconnectAttempts(-1);
Modified: trunk/hornetq-core/src/main/java/org/hornetq/core/server/impl/QuorumManager.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/core/server/impl/QuorumManager.java 2012-03-08 16:06:44 UTC (rev 12268)
+++ trunk/hornetq-core/src/main/java/org/hornetq/core/server/impl/QuorumManager.java 2012-03-08 16:07:11 UTC (rev 12269)
@@ -7,7 +7,6 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
@@ -35,12 +34,15 @@
private final Map<String, Pair<TransportConfiguration, TransportConfiguration>> nodes =
new ConcurrentHashMap<String, Pair<TransportConfiguration, TransportConfiguration>>();
+ private final ExecutorService executor;
+
/** safety parameter to make _sure_ we get out of await() */
private static final int LATCH_TIMEOUT = 60;
private static final long DISCOVERY_TIMEOUT = 5;
- public QuorumManager(ServerLocator serverLocator)
+ public QuorumManager(ServerLocator serverLocator, ExecutorService executor)
{
+ this.executor = executor;
this.locator = serverLocator;
locator.addClusterTopologyListener(this);
}
@@ -81,11 +83,10 @@
{
return true;
}
- // go for the vote...
+
final int size = nodes.size();
Set<ServerLocator> locatorsList = new HashSet<ServerLocator>(size);
AtomicInteger pingCount = new AtomicInteger(0);
- ExecutorService pool = Executors.newFixedThreadPool(size);
final CountDownLatch latch = new CountDownLatch(size);
try
{
@@ -96,7 +97,7 @@
TransportConfiguration serverTC = pair.getValue().getA();
ServerLocatorImpl locator = (ServerLocatorImpl)HornetQClient.createServerLocatorWithoutHA(serverTC);
locatorsList.add(locator);
- pool.submit(new ServerConnect(latch, pingCount, locator));
+ executor.submit(new ServerConnect(latch, pingCount, locator));
}
// Some servers may have disappeared between the latch creation
for (int i = 0; i < size - locatorsList.size(); i++)
@@ -125,7 +126,6 @@
// no-op
}
}
- pool.shutdownNow();
}
}
@@ -145,7 +145,7 @@
@Override
public void run()
{
- locator.setReconnectAttempts(-1);
+ locator.setReconnectAttempts(0);
locator.getDiscoveryGroupConfiguration().setDiscoveryInitialWaitTimeout(DISCOVERY_TIMEOUT);
final ClientSessionFactory liveServerSessionFactory;
Modified: trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/QuorumFailOverTest.java
===================================================================
--- trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/QuorumFailOverTest.java 2012-03-08 16:06:44 UTC (rev 12268)
+++ trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/QuorumFailOverTest.java 2012-03-08 16:07:11 UTC (rev 12269)
@@ -17,27 +17,37 @@
setupCluster();
startServers(0, 1, 2, 3, 4, 5);
+
+ for (int i = 0; i < 3; i++)
+ {
+ waitForTopology(servers[i], 3, 3);
+ }
+
+ waitForFailoverTopology(3, 0, 1, 2);
+ waitForFailoverTopology(4, 0, 1, 2);
+ waitForFailoverTopology(5, 0, 1, 2);
+
for (int i : new int[] { 0, 1, 2 })
{
setupSessionFactory(i, i + 3, isNetty(), false);
}
createQueue(0, QUEUES_TESTADDRESS, QUEUE_NAME, null, true);
+ addConsumer(0, 0, QUEUE_NAME, null);
+ waitForBindings(0, QUEUES_TESTADDRESS, 1, 1, true);
final TopologyListener liveTopologyListener = new TopologyListener("LIVE-1");
locators[0].addClusterTopologyListener(liveTopologyListener);
- final TopologyListener backupTopologyListener = new TopologyListener("LIVE-2");
- locators[1].addClusterTopologyListener(backupTopologyListener);
-
assertTrue("we assume 3 is a backup", servers[3].getConfiguration().isBackup());
assertFalse("no shared storage", servers[3].getConfiguration().isSharedStore());
- // assertEquals(liveTopologyListener.toString(), 6, liveTopologyListener.nodes.size());
- // assertEquals(backupTopologyListener.toString(), 6, backupTopologyListener.nodes.size());
+ failNode(0);
- failNode(0);
+ waitForFailoverTopology(4, 3, 1, 2);
+ waitForFailoverTopology(5, 3, 1, 2);
+
waitForBindings(3, QUEUES_TESTADDRESS, 1, 1, true);
assertTrue(servers[3].waitForInitialization(10, TimeUnit.SECONDS));
@@ -67,14 +77,12 @@
Pair<TransportConfiguration, TransportConfiguration> connectorPair, boolean last)
{
nodes.put(nodeID, connectorPair);
- System.out.println(prefix + " UP: " + nodeID + " connectPair=" + connectorPair);
}
@Override
public void nodeDown(long eventUID, String nodeID)
{
nodes.remove(nodeID);
- System.out.println(prefix + " DOWN: " + nodeID);
}
@Override
Modified: trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/StaticClusterWithBackupFailoverTest.java
===================================================================
--- trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/StaticClusterWithBackupFailoverTest.java 2012-03-08 16:06:44 UTC (rev 12268)
+++ trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/StaticClusterWithBackupFailoverTest.java 2012-03-08 16:07:11 UTC (rev 12269)
@@ -23,15 +23,6 @@
public class StaticClusterWithBackupFailoverTest extends ClusterWithBackupFailoverTestBase
{
- // Constants -----------------------------------------------------
-
- // Attributes ----------------------------------------------------
-
- // Static --------------------------------------------------------
-
- // Constructors --------------------------------------------------
-
- // Public --------------------------------------------------------
@Override
protected void setupCluster(final boolean forwardWhenNoConsumers) throws Exception
{
More information about the hornetq-commits
mailing list