[hornetq-commits] JBoss hornetq SVN: r12304 - in trunk: hornetq-core/src/main/java/org/hornetq/core/client/impl and 2 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Thu Mar 15 09:11:38 EDT 2012


Author: borges
Date: 2012-03-15 09:11:37 -0400 (Thu, 15 Mar 2012)
New Revision: 12304

Modified:
   trunk/hornetq-core/src/main/java/org/hornetq/api/core/DiscoveryGroupConfiguration.java
   trunk/hornetq-core/src/main/java/org/hornetq/core/client/impl/ServerLocatorImpl.java
   trunk/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java
   trunk/hornetq-core/src/main/java/org/hornetq/core/server/impl/QuorumManager.java
   trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/remoting/PingTest.java
Log:
HORNETQ-720 HORNETQ-776 Fix connection opening for voting. Exit earlier if possible.

Modified: trunk/hornetq-core/src/main/java/org/hornetq/api/core/DiscoveryGroupConfiguration.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/api/core/DiscoveryGroupConfiguration.java	2012-03-15 09:56:54 UTC (rev 12303)
+++ trunk/hornetq-core/src/main/java/org/hornetq/api/core/DiscoveryGroupConfiguration.java	2012-03-15 13:11:37 UTC (rev 12304)
@@ -176,9 +176,6 @@
       return result;
    }
 
-   /* (non-Javadoc)
-    * @see java.lang.Object#toString()
-    */
    @Override
    public String toString()
    {

Modified: trunk/hornetq-core/src/main/java/org/hornetq/core/client/impl/ServerLocatorImpl.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/core/client/impl/ServerLocatorImpl.java	2012-03-15 09:56:54 UTC (rev 12303)
+++ trunk/hornetq-core/src/main/java/org/hornetq/core/client/impl/ServerLocatorImpl.java	2012-03-15 13:11:37 UTC (rev 12304)
@@ -247,7 +247,7 @@
       return globalThreadPool;
    }
 
-   public static synchronized ScheduledExecutorService getGlobalScheduledThreadPool()
+   private static synchronized ScheduledExecutorService getGlobalScheduledThreadPool()
    {
       if (globalScheduledThreadPool == null)
       {
@@ -975,6 +975,7 @@
       this.autoGroup = autoGroup;
    }
 
+   @Override
    public boolean isPreAcknowledge()
    {
       return preAcknowledge;
@@ -1149,17 +1150,13 @@
       return groupID;
    }
 
-   /* (non-Javadoc)
-    * @see org.hornetq.api.core.client.ServerLocator#isCompressLargeMessage()
-    */
+   @Override
    public boolean isCompressLargeMessage()
    {
       return compressLargeMessage;
    }
 
-   /* (non-Javadoc)
-    * @see org.hornetq.api.core.client.ServerLocator#setCompressLargeMessage(boolean)
-    */
+   @Override
    public void setCompressLargeMessage(boolean compress)
    {
       this.compressLargeMessage = compress;
@@ -1173,6 +1170,7 @@
       }
    }
 
+   @Override
    public void setIdentity(String identity)
    {
       this.identity = identity;

Modified: trunk/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java	2012-03-15 09:56:54 UTC (rev 12303)
+++ trunk/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java	2012-03-15 13:11:37 UTC (rev 12304)
@@ -2130,7 +2130,7 @@
                      liveServerSessionFactory.setReconnectAttempts(1);
                      quorumManager.setSessionFactory(liveServerSessionFactory);
                      CoreRemotingConnection liveConnection = liveServerSessionFactory.getConnection();
-                     liveConnection.addFailureListener(quorumManager);
+                     quorumManager.addAsFailureListenerOf(liveConnection);
                      Channel pingChannel = liveConnection.getChannel(CHANNEL_ID.PING.id, -1);
                      Channel replicationChannel = liveConnection.getChannel(CHANNEL_ID.REPLICATION.id, -1);
                      connectToReplicationEndpoint(replicationChannel);

Modified: trunk/hornetq-core/src/main/java/org/hornetq/core/server/impl/QuorumManager.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/core/server/impl/QuorumManager.java	2012-03-15 09:56:54 UTC (rev 12303)
+++ trunk/hornetq-core/src/main/java/org/hornetq/core/server/impl/QuorumManager.java	2012-03-15 13:11:37 UTC (rev 12304)
@@ -10,6 +10,7 @@
 import org.hornetq.api.core.HornetQException;
 import org.hornetq.api.core.Pair;
 import org.hornetq.api.core.TransportConfiguration;
+import org.hornetq.api.core.client.ClientSession;
 import org.hornetq.api.core.client.ClientSessionFactory;
 import org.hornetq.api.core.client.HornetQClient;
 import org.hornetq.api.core.client.ServerLocator;
@@ -18,6 +19,7 @@
 import org.hornetq.core.client.impl.Topology;
 import org.hornetq.core.client.impl.TopologyMember;
 import org.hornetq.core.logging.Logger;
+import org.hornetq.core.protocol.core.CoreRemotingConnection;
 import org.hornetq.core.remoting.FailureListener;
 import org.hornetq.core.server.HornetQServer;
 
@@ -39,10 +41,10 @@
    private ClientSessionFactoryInternal sessionFactory;
    private final Topology topology;
    private final HornetQServer backupServer;
+   private CoreRemotingConnection connection;
 
    /** safety parameter to make _sure_ we get out of await() */
    private static final int LATCH_TIMEOUT = 30;
-   private static final long DISCOVERY_TIMEOUT = 5;
    private static final int RECONNECT_ATTEMPTS = 5;
 
    public QuorumManager(HornetQServer backup, ServerLocator serverLocator, ExecutorService executor, String identity)
@@ -63,33 +65,31 @@
    {
       Collection<TopologyMember> nodes = topology.getMembers();
 
-      if (nodes.size() < 2) // the life server is also in the list
-      {
-         return true;
-      }
 
-      final int size = nodes.size();
       Collection<ServerLocator> locatorsList = new LinkedList<ServerLocator>();
       AtomicInteger pingCount = new AtomicInteger(0);
-      final CountDownLatch latch = new CountDownLatch(size);
       int total = 0;
+      for (TopologyMember tm : nodes)
+         if (useIt(tm))
+            total++;
+
+      if (total < 1)
+         return true;
+
+      final CountDownLatch latch = new CountDownLatch(total);
       try
       {
          for (TopologyMember tm : nodes)
          {
             Pair<TransportConfiguration, TransportConfiguration> pair = tm.getConnector();
-//            if (targetServerID.equals(pair.getKey()))
-//               continue;
+
             TransportConfiguration serverTC = pair.getA();
-            if (serverTC == null)
+            if (useIt(tm))
             {
-               latch.countDown();
-               continue;
+               ServerLocatorImpl locator = (ServerLocatorImpl)HornetQClient.createServerLocatorWithoutHA(serverTC);
+               locatorsList.add(locator);
+               executor.submit(new ServerConnect(latch, total, pingCount, locator, serverTC));
             }
-            total++;
-            ServerLocatorImpl locator = (ServerLocatorImpl)HornetQClient.createServerLocatorWithoutHA(serverTC);
-            locatorsList.add(locator);
-            executor.submit(new ServerConnect(latch, pingCount, locator));
          }
 
          try
@@ -101,7 +101,9 @@
             // No-op. The best the quorum can do now is to return the latest number it has
          }
          // -1: because the live server is not being filtered out.
-         return pingCount.get() * 2 >= locatorsList.size() - 1;
+         boolean vote = nodeIsDown(total, pingCount.get());
+         log.trace("quorum vote is liveIsDown=" + vote + ", count=" + pingCount);
+         return vote;
       }
       finally
       {
@@ -119,6 +121,15 @@
       }
    }
 
+   /**
+    * @param tm
+    * @return
+    */
+   private boolean useIt(TopologyMember tm)
+   {
+      return tm.getA() != null && !targetServerID.equals(tm.getA().getName());
+   }
+
    @Override
    public String toString()
    {
@@ -126,6 +137,29 @@
    }
 
    /**
+    * Returns {@link true} when a sufficient number of votes has been cast to take a decision.
+    * @param total the total number of votes
+    * @param pingCount the total number of servers that were reached
+    * @param votesLeft the total number of servers for which we don't have a decision yet.
+    * @return
+    */
+   private static final boolean isSufficient(int total, int pingCount, long votesLeft)
+   {
+      boolean notEnoughVotesLeft = total - 2 * (pingCount + votesLeft) > 0;
+      return nodeIsDown(total, pingCount) || notEnoughVotesLeft;
+   }
+
+   /**
+    * @param total
+    * @param pingCount
+    * @return
+    */
+   private static boolean nodeIsDown(int total, int pingCount)
+   {
+      return pingCount * 2 >= total - 1;
+   }
+
+   /**
     * Attempts to connect to a given server.
     */
    private static class ServerConnect implements Runnable
@@ -133,27 +167,44 @@
       private final ServerLocatorImpl locator;
       private final CountDownLatch latch;
       private final AtomicInteger count;
+      private final TransportConfiguration tc;
+      private final int total;
 
-      public ServerConnect(CountDownLatch latch, AtomicInteger count, ServerLocatorImpl serverLocator)
+      public ServerConnect(CountDownLatch latch, int total, AtomicInteger count,
+                           ServerLocatorImpl serverLocator,
+                           TransportConfiguration serverTC)
       {
-         locator = serverLocator;
-         this.latch = latch;
+         this.total = total;
+         this.locator = serverLocator;
+         this.latch=latch;
          this.count = count;
+         this.tc = serverTC;
       }
 
+
       @Override
       public void run()
       {
          locator.setReconnectAttempts(0);
-         locator.getDiscoveryGroupConfiguration().setDiscoveryInitialWaitTimeout(DISCOVERY_TIMEOUT);
 
-         final ClientSessionFactory liveServerSessionFactory;
+         final ClientSessionFactory sessionFactory;
+         ClientSession session;
          try
          {
-            liveServerSessionFactory = locator.connect();
-            if (liveServerSessionFactory != null)
+            sessionFactory = locator.createSessionFactory(tc);
+            if (sessionFactory != null)
             {
-               count.incrementAndGet();
+               session = sessionFactory.createSession();
+               if (session != null)
+               {
+                  if (isSufficient(total, count.incrementAndGet(), latch.getCount() - 1))
+                  {
+                     while (latch.getCount() > 0)
+                        latch.countDown();
+                  }
+                  session.close();
+                  sessionFactory.close();
+               }
             }
          }
          catch (Exception e)
@@ -171,6 +222,9 @@
    @Override
    public void connectionFailed(HornetQException exception, boolean failedOver)
    {
+      if (signal != null)
+         return;
+
       // Check if connection was reestablished by the sessionFactory:
       if (sessionFactory.numConnections() > 0)
       {
@@ -258,15 +312,26 @@
     */
    public synchronized void causeExit()
    {
+      removeAsFailureListener();
       signal = BACKUP_ACTIVATION.STOP;
       latch.countDown();
    }
 
    /**
+    *
+    */
+   private void removeAsFailureListener()
+   {
+      if (connection != null)
+         connection.removeFailureListener(this);
+   }
+
+   /**
     * Releases the latch, causing the backup activation thread to fail-over.
     */
    public synchronized void failOver()
    {
+      removeAsFailureListener();
       signal = BACKUP_ACTIVATION.FAIL_OVER;
       latch.countDown();
    }
@@ -278,4 +343,13 @@
    {
       this.sessionFactory = sessionFactory;
    }
+
+   /**
+    * @param liveConnection
+    */
+   public void addAsFailureListenerOf(CoreRemotingConnection liveConnection)
+   {
+      this.connection = liveConnection;
+      connection.addFailureListener(this);
+   }
 }

Modified: trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/remoting/PingTest.java
===================================================================
--- trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/remoting/PingTest.java	2012-03-15 09:56:54 UTC (rev 12303)
+++ trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/remoting/PingTest.java	2012-03-15 13:11:37 UTC (rev 12304)
@@ -95,8 +95,7 @@
     */
    public void testNoFailureWithPinging() throws Exception
    {
-      TransportConfiguration transportConfig = new TransportConfiguration("org.hornetq.core.remoting.impl.netty.NettyConnectorFactory");
-      ServerLocator locator = addServerLocator(HornetQClient.createServerLocatorWithoutHA(transportConfig));
+      ServerLocator locator = createNettyNonHALocator();
 
       locator.setClientFailureCheckPeriod(PingTest.CLIENT_FAILURE_CHECK_PERIOD);
       locator.setConnectionTTL(PingTest.CLIENT_FAILURE_CHECK_PERIOD * 2);



More information about the hornetq-commits mailing list