[hornetq-commits] JBoss hornetq SVN: r12304 - in trunk: hornetq-core/src/main/java/org/hornetq/core/client/impl and 2 other directories.
do-not-reply at jboss.org
do-not-reply at jboss.org
Thu Mar 15 09:11:38 EDT 2012
Author: borges
Date: 2012-03-15 09:11:37 -0400 (Thu, 15 Mar 2012)
New Revision: 12304
Modified:
trunk/hornetq-core/src/main/java/org/hornetq/api/core/DiscoveryGroupConfiguration.java
trunk/hornetq-core/src/main/java/org/hornetq/core/client/impl/ServerLocatorImpl.java
trunk/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java
trunk/hornetq-core/src/main/java/org/hornetq/core/server/impl/QuorumManager.java
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/remoting/PingTest.java
Log:
HORNETQ-720 HORNETQ-776 Fix connection opening for voting. Exit earlier if possible.
Modified: trunk/hornetq-core/src/main/java/org/hornetq/api/core/DiscoveryGroupConfiguration.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/api/core/DiscoveryGroupConfiguration.java 2012-03-15 09:56:54 UTC (rev 12303)
+++ trunk/hornetq-core/src/main/java/org/hornetq/api/core/DiscoveryGroupConfiguration.java 2012-03-15 13:11:37 UTC (rev 12304)
@@ -176,9 +176,6 @@
return result;
}
- /* (non-Javadoc)
- * @see java.lang.Object#toString()
- */
@Override
public String toString()
{
Modified: trunk/hornetq-core/src/main/java/org/hornetq/core/client/impl/ServerLocatorImpl.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/core/client/impl/ServerLocatorImpl.java 2012-03-15 09:56:54 UTC (rev 12303)
+++ trunk/hornetq-core/src/main/java/org/hornetq/core/client/impl/ServerLocatorImpl.java 2012-03-15 13:11:37 UTC (rev 12304)
@@ -247,7 +247,7 @@
return globalThreadPool;
}
- public static synchronized ScheduledExecutorService getGlobalScheduledThreadPool()
+ private static synchronized ScheduledExecutorService getGlobalScheduledThreadPool()
{
if (globalScheduledThreadPool == null)
{
@@ -975,6 +975,7 @@
this.autoGroup = autoGroup;
}
+ @Override
public boolean isPreAcknowledge()
{
return preAcknowledge;
@@ -1149,17 +1150,13 @@
return groupID;
}
- /* (non-Javadoc)
- * @see org.hornetq.api.core.client.ServerLocator#isCompressLargeMessage()
- */
+ @Override
public boolean isCompressLargeMessage()
{
return compressLargeMessage;
}
- /* (non-Javadoc)
- * @see org.hornetq.api.core.client.ServerLocator#setCompressLargeMessage(boolean)
- */
+ @Override
public void setCompressLargeMessage(boolean compress)
{
this.compressLargeMessage = compress;
@@ -1173,6 +1170,7 @@
}
}
+ @Override
public void setIdentity(String identity)
{
this.identity = identity;
Modified: trunk/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java 2012-03-15 09:56:54 UTC (rev 12303)
+++ trunk/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java 2012-03-15 13:11:37 UTC (rev 12304)
@@ -2130,7 +2130,7 @@
liveServerSessionFactory.setReconnectAttempts(1);
quorumManager.setSessionFactory(liveServerSessionFactory);
CoreRemotingConnection liveConnection = liveServerSessionFactory.getConnection();
- liveConnection.addFailureListener(quorumManager);
+ quorumManager.addAsFailureListenerOf(liveConnection);
Channel pingChannel = liveConnection.getChannel(CHANNEL_ID.PING.id, -1);
Channel replicationChannel = liveConnection.getChannel(CHANNEL_ID.REPLICATION.id, -1);
connectToReplicationEndpoint(replicationChannel);
Modified: trunk/hornetq-core/src/main/java/org/hornetq/core/server/impl/QuorumManager.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/core/server/impl/QuorumManager.java 2012-03-15 09:56:54 UTC (rev 12303)
+++ trunk/hornetq-core/src/main/java/org/hornetq/core/server/impl/QuorumManager.java 2012-03-15 13:11:37 UTC (rev 12304)
@@ -10,6 +10,7 @@
import org.hornetq.api.core.HornetQException;
import org.hornetq.api.core.Pair;
import org.hornetq.api.core.TransportConfiguration;
+import org.hornetq.api.core.client.ClientSession;
import org.hornetq.api.core.client.ClientSessionFactory;
import org.hornetq.api.core.client.HornetQClient;
import org.hornetq.api.core.client.ServerLocator;
@@ -18,6 +19,7 @@
import org.hornetq.core.client.impl.Topology;
import org.hornetq.core.client.impl.TopologyMember;
import org.hornetq.core.logging.Logger;
+import org.hornetq.core.protocol.core.CoreRemotingConnection;
import org.hornetq.core.remoting.FailureListener;
import org.hornetq.core.server.HornetQServer;
@@ -39,10 +41,10 @@
private ClientSessionFactoryInternal sessionFactory;
private final Topology topology;
private final HornetQServer backupServer;
+ private CoreRemotingConnection connection;
/** safety parameter to make _sure_ we get out of await() */
private static final int LATCH_TIMEOUT = 30;
- private static final long DISCOVERY_TIMEOUT = 5;
private static final int RECONNECT_ATTEMPTS = 5;
public QuorumManager(HornetQServer backup, ServerLocator serverLocator, ExecutorService executor, String identity)
@@ -63,33 +65,31 @@
{
Collection<TopologyMember> nodes = topology.getMembers();
- if (nodes.size() < 2) // the life server is also in the list
- {
- return true;
- }
- final int size = nodes.size();
Collection<ServerLocator> locatorsList = new LinkedList<ServerLocator>();
AtomicInteger pingCount = new AtomicInteger(0);
- final CountDownLatch latch = new CountDownLatch(size);
int total = 0;
+ for (TopologyMember tm : nodes)
+ if (useIt(tm))
+ total++;
+
+ if (total < 1)
+ return true;
+
+ final CountDownLatch latch = new CountDownLatch(total);
try
{
for (TopologyMember tm : nodes)
{
Pair<TransportConfiguration, TransportConfiguration> pair = tm.getConnector();
-// if (targetServerID.equals(pair.getKey()))
-// continue;
+
TransportConfiguration serverTC = pair.getA();
- if (serverTC == null)
+ if (useIt(tm))
{
- latch.countDown();
- continue;
+ ServerLocatorImpl locator = (ServerLocatorImpl)HornetQClient.createServerLocatorWithoutHA(serverTC);
+ locatorsList.add(locator);
+ executor.submit(new ServerConnect(latch, total, pingCount, locator, serverTC));
}
- total++;
- ServerLocatorImpl locator = (ServerLocatorImpl)HornetQClient.createServerLocatorWithoutHA(serverTC);
- locatorsList.add(locator);
- executor.submit(new ServerConnect(latch, pingCount, locator));
}
try
@@ -101,7 +101,9 @@
// No-op. The best the quorum can do now is to return the latest number it has
}
// -1: because the live server is not being filtered out.
- return pingCount.get() * 2 >= locatorsList.size() - 1;
+ boolean vote = nodeIsDown(total, pingCount.get());
+ log.trace("quorum vote is liveIsDown=" + vote + ", count=" + pingCount);
+ return vote;
}
finally
{
@@ -119,6 +121,15 @@
}
}
+ /**
+ * @param tm
+ * @return
+ */
+ private boolean useIt(TopologyMember tm)
+ {
+ return tm.getA() != null && !targetServerID.equals(tm.getA().getName());
+ }
+
@Override
public String toString()
{
@@ -126,6 +137,29 @@
}
/**
+ * Returns {@link true} when a sufficient number of votes has been cast to take a decision.
+ * @param total the total number of votes
+ * @param pingCount the total number of servers that were reached
+ * @param votesLeft the total number of servers for which we don't have a decision yet.
+ * @return
+ */
+ private static final boolean isSufficient(int total, int pingCount, long votesLeft)
+ {
+ boolean notEnoughVotesLeft = total - 2 * (pingCount + votesLeft) > 0;
+ return nodeIsDown(total, pingCount) || notEnoughVotesLeft;
+ }
+
+ /**
+ * @param total
+ * @param pingCount
+ * @return
+ */
+ private static boolean nodeIsDown(int total, int pingCount)
+ {
+ return pingCount * 2 >= total - 1;
+ }
+
+ /**
* Attempts to connect to a given server.
*/
private static class ServerConnect implements Runnable
@@ -133,27 +167,44 @@
private final ServerLocatorImpl locator;
private final CountDownLatch latch;
private final AtomicInteger count;
+ private final TransportConfiguration tc;
+ private final int total;
- public ServerConnect(CountDownLatch latch, AtomicInteger count, ServerLocatorImpl serverLocator)
+ public ServerConnect(CountDownLatch latch, int total, AtomicInteger count,
+ ServerLocatorImpl serverLocator,
+ TransportConfiguration serverTC)
{
- locator = serverLocator;
- this.latch = latch;
+ this.total = total;
+ this.locator = serverLocator;
+ this.latch=latch;
this.count = count;
+ this.tc = serverTC;
}
+
@Override
public void run()
{
locator.setReconnectAttempts(0);
- locator.getDiscoveryGroupConfiguration().setDiscoveryInitialWaitTimeout(DISCOVERY_TIMEOUT);
- final ClientSessionFactory liveServerSessionFactory;
+ final ClientSessionFactory sessionFactory;
+ ClientSession session;
try
{
- liveServerSessionFactory = locator.connect();
- if (liveServerSessionFactory != null)
+ sessionFactory = locator.createSessionFactory(tc);
+ if (sessionFactory != null)
{
- count.incrementAndGet();
+ session = sessionFactory.createSession();
+ if (session != null)
+ {
+ if (isSufficient(total, count.incrementAndGet(), latch.getCount() - 1))
+ {
+ while (latch.getCount() > 0)
+ latch.countDown();
+ }
+ session.close();
+ sessionFactory.close();
+ }
}
}
catch (Exception e)
@@ -171,6 +222,9 @@
@Override
public void connectionFailed(HornetQException exception, boolean failedOver)
{
+ if (signal != null)
+ return;
+
// Check if connection was reestablished by the sessionFactory:
if (sessionFactory.numConnections() > 0)
{
@@ -258,15 +312,26 @@
*/
public synchronized void causeExit()
{
+ removeAsFailureListener();
signal = BACKUP_ACTIVATION.STOP;
latch.countDown();
}
/**
+ *
+ */
+ private void removeAsFailureListener()
+ {
+ if (connection != null)
+ connection.removeFailureListener(this);
+ }
+
+ /**
* Releases the latch, causing the backup activation thread to fail-over.
*/
public synchronized void failOver()
{
+ removeAsFailureListener();
signal = BACKUP_ACTIVATION.FAIL_OVER;
latch.countDown();
}
@@ -278,4 +343,13 @@
{
this.sessionFactory = sessionFactory;
}
+
+ /**
+ * @param liveConnection
+ */
+ public void addAsFailureListenerOf(CoreRemotingConnection liveConnection)
+ {
+ this.connection = liveConnection;
+ connection.addFailureListener(this);
+ }
}
Modified: trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/remoting/PingTest.java
===================================================================
--- trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/remoting/PingTest.java 2012-03-15 09:56:54 UTC (rev 12303)
+++ trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/remoting/PingTest.java 2012-03-15 13:11:37 UTC (rev 12304)
@@ -95,8 +95,7 @@
*/
public void testNoFailureWithPinging() throws Exception
{
- TransportConfiguration transportConfig = new TransportConfiguration("org.hornetq.core.remoting.impl.netty.NettyConnectorFactory");
- ServerLocator locator = addServerLocator(HornetQClient.createServerLocatorWithoutHA(transportConfig));
+ ServerLocator locator = createNettyNonHALocator();
locator.setClientFailureCheckPeriod(PingTest.CLIENT_FAILURE_CHECK_PERIOD);
locator.setConnectionTTL(PingTest.CLIENT_FAILURE_CHECK_PERIOD * 2);
More information about the hornetq-commits
mailing list