[hornetq-commits] JBoss hornetq SVN: r8299 - in trunk: src/main/org/hornetq/core/config/cluster and 5 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Tue Nov 17 14:40:50 EST 2009


Author: timfox
Date: 2009-11-17 14:40:49 -0500 (Tue, 17 Nov 2009)
New Revision: 8299

Modified:
   trunk/src/main/org/hornetq/core/client/impl/FailoverManagerImpl.java
   trunk/src/main/org/hornetq/core/config/cluster/BridgeConfiguration.java
   trunk/src/main/org/hornetq/core/config/impl/FileConfiguration.java
   trunk/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeReconnectTest.java
   trunk/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeStartTest.java
   trunk/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeTest.java
   trunk/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeWithDiscoveryGroupStartTest.java
   trunk/tests/src/org/hornetq/tests/integration/cluster/failover/ReplicatedDistributionTest.java
   trunk/tests/src/org/hornetq/tests/integration/management/BridgeControlTest.java
   trunk/tests/src/org/hornetq/tests/integration/management/BridgeControlUsingCoreTest.java
   trunk/tests/src/org/hornetq/tests/integration/remoting/ReconnectTest.java
Log:
https://jira.jboss.org/jira/browse/HORNETQ-178

Modified: trunk/src/main/org/hornetq/core/client/impl/FailoverManagerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/FailoverManagerImpl.java	2009-11-17 19:38:24 UTC (rev 8298)
+++ trunk/src/main/org/hornetq/core/client/impl/FailoverManagerImpl.java	2009-11-17 19:40:49 UTC (rev 8299)
@@ -594,6 +594,8 @@
             catch (Exception ignore)
             {
             }
+            
+            this.cancelPinger();
 
             connector = null;
 
@@ -624,7 +626,7 @@
 
             connection = null;                       
          }      
-                          
+                    
          callFailureListeners(me, true);
          
          if (connection == null)
@@ -658,7 +660,7 @@
       final List<SessionFailureListener> listenersClone = new ArrayList<SessionFailureListener>(listeners);
 
       for (final SessionFailureListener listener : listenersClone)
-      {
+      {       
          try
          {
             if (afterReconnect)
@@ -685,18 +687,18 @@
     */
    private void reconnectSessions(final RemotingConnection oldConnection, final int reconnectAttempts)
    {        
-      RemotingConnection backupConnection = getConnectionWithRetry(reconnectAttempts);
+      RemotingConnection newConnection = getConnectionWithRetry(reconnectAttempts);
       
-      if (backupConnection == null)
+      if (newConnection == null)
       {
          log.warn("Failed to connect to server.");
 
          return;
       }
-      
+                 
       List<FailureListener> oldListeners = oldConnection.getFailureListeners();
       
-      List<FailureListener> newListeners = new ArrayList<FailureListener>(backupConnection.getFailureListeners());
+      List<FailureListener> newListeners = new ArrayList<FailureListener>(newConnection.getFailureListeners());
 
       for (FailureListener listener : oldListeners)
       {
@@ -708,11 +710,11 @@
          }
       }
 
-      backupConnection.setFailureListeners(newListeners);
+      newConnection.setFailureListeners(newListeners);
 
       for (ClientSessionInternal session : sessions)
       {
-         session.handleFailover(backupConnection);
+         session.handleFailover(newConnection);
       }
    }
 
@@ -775,22 +777,27 @@
          }
       }
    }
+   
+   private void cancelPinger()
+   {
+      if (pingerFuture != null)
+      {
+         pingRunnable.cancel();
 
+         pingerFuture.cancel(false);
+
+         pingRunnable = null;
+
+         pingerFuture = null;
+      }
+   }
+
    private void checkCloseConnection()
    {
       if (connection != null && sessions.size() == 0)
       {
-         if (pingerFuture != null)
-         {
-            pingRunnable.cancel();
+         cancelPinger();
 
-            pingerFuture.cancel(false);
-
-            pingRunnable = null;
-
-            pingerFuture = null;
-         }
-
          try
          {
             connection.destroy();
@@ -817,7 +824,7 @@
    }
 
    public RemotingConnection getConnection()
-   {
+   {     
       if (connection == null)
       {
          Connection tc = null;
@@ -922,7 +929,7 @@
             }
          }
       }
-
+      
       return connection;
    }
 
@@ -1082,6 +1089,8 @@
                final HornetQException me = new HornetQException(HornetQException.CONNECTION_TIMEDOUT,
                                                                 "Did not receive data from server for " + connection.getTransportConnection());
 
+               cancelled = true;
+               
                threadPool.execute(new Runnable()
                {
                   // Must be executed on different thread
@@ -1090,7 +1099,7 @@
                      connection.fail(me);
                   }
                });
-
+                              
                return;
             }
             else

Modified: trunk/src/main/org/hornetq/core/config/cluster/BridgeConfiguration.java
===================================================================
--- trunk/src/main/org/hornetq/core/config/cluster/BridgeConfiguration.java	2009-11-17 19:38:24 UTC (rev 8298)
+++ trunk/src/main/org/hornetq/core/config/cluster/BridgeConfiguration.java	2009-11-17 19:40:49 UTC (rev 8299)
@@ -55,7 +55,9 @@
    private boolean useDuplicateDetection;
    
    private int confirmationWindowSize;
-
+   
+   private long clientFailureCheckPeriod;
+    
    public BridgeConfiguration(final String name,
                               final String queueName,
                               final String forwardingAddress,
@@ -67,6 +69,7 @@
                               final boolean failoverOnServerShutdown,
                               final boolean useDuplicateDetection,
                               final int confirmationWindowSize,
+                              final long clientFailureCheckPeriod,
                               final Pair<String, String> connectorPair)
    {
       this.name = name;
@@ -80,6 +83,7 @@
       this.failoverOnServerShutdown = failoverOnServerShutdown;
       this.useDuplicateDetection = useDuplicateDetection;
       this.confirmationWindowSize = confirmationWindowSize;
+      this.clientFailureCheckPeriod = clientFailureCheckPeriod;
       this.connectorPair = connectorPair;
       this.discoveryGroupName = null;
    }
@@ -95,6 +99,7 @@
                               final boolean failoverOnServerShutdown,
                               final boolean useDuplicateDetection,
                               final int confirmationWindowSize,
+                              final long clientFailureCheckPeriod,
                               final String discoveryGroupName)
    {
       this.name = name;
@@ -108,6 +113,7 @@
       this.failoverOnServerShutdown = failoverOnServerShutdown;
       this.useDuplicateDetection = useDuplicateDetection;
       this.confirmationWindowSize = confirmationWindowSize;
+      this.clientFailureCheckPeriod = clientFailureCheckPeriod;
       this.connectorPair = null;
       this.discoveryGroupName = discoveryGroupName;
    }
@@ -176,6 +182,11 @@
    {
       return confirmationWindowSize;
    }
+   
+   public long getClientFailureCheckPeriod()
+   {
+      return clientFailureCheckPeriod;
+   }
 
    /**
     * @param name the name to set

Modified: trunk/src/main/org/hornetq/core/config/impl/FileConfiguration.java
===================================================================
--- trunk/src/main/org/hornetq/core/config/impl/FileConfiguration.java	2009-11-17 19:38:24 UTC (rev 8298)
+++ trunk/src/main/org/hornetq/core/config/impl/FileConfiguration.java	2009-11-17 19:40:49 UTC (rev 8299)
@@ -681,6 +681,7 @@
                                           failoverOnServerShutdown,
                                           useDuplicateDetection,
                                           confirmationWindowSize,
+                                          ClientSessionFactoryImpl.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD,
                                           connectorPair);
       }
       else
@@ -696,6 +697,7 @@
                                           failoverOnServerShutdown,
                                           useDuplicateDetection,
                                           confirmationWindowSize,
+                                          ClientSessionFactoryImpl.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD,
                                           discoveryGroupName);
       }
 

Modified: trunk/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeReconnectTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeReconnectTest.java	2009-11-17 19:38:24 UTC (rev 8298)
+++ trunk/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeReconnectTest.java	2009-11-17 19:40:49 UTC (rev 8299)
@@ -17,6 +17,8 @@
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.hornetq.core.client.ClientConsumer;
 import org.hornetq.core.client.ClientMessage;
@@ -29,6 +31,7 @@
 import org.hornetq.core.config.cluster.QueueConfiguration;
 import org.hornetq.core.exception.HornetQException;
 import org.hornetq.core.logging.Logger;
+import org.hornetq.core.remoting.FailureListener;
 import org.hornetq.core.remoting.RemotingConnection;
 import org.hornetq.core.remoting.impl.invm.InVMConnector;
 import org.hornetq.core.remoting.impl.invm.InVMConnectorFactory;
@@ -124,6 +127,7 @@
                                                                         true,
                                                                         false,
                                                                         confirmationWindowSize,
+                                                                        ClientSessionFactoryImpl.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD,
                                                                         connectorPair);
 
       List<BridgeConfiguration> bridgeConfigs = new ArrayList<BridgeConfiguration>();
@@ -246,6 +250,7 @@
                                                                         true,
                                                                         false,
                                                                         confirmationWindowSize,
+                                                                        ClientSessionFactoryImpl.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD,
                                                                         connectorPair);
 
       List<BridgeConfiguration> bridgeConfigs = new ArrayList<BridgeConfiguration>();
@@ -363,6 +368,7 @@
                                                                         true,
                                                                         false,
                                                                         confirmationWindowSize,
+                                                                        ClientSessionFactoryImpl.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD,
                                                                         connectorPair);
 
       List<BridgeConfiguration> bridgeConfigs = new ArrayList<BridgeConfiguration>();
@@ -432,9 +438,21 @@
       assertEquals(0, server0.getRemotingService().getConnections().size());
       assertEquals(0, server1.getRemotingService().getConnections().size());
    }
-
+   
+   //We test that we can pause for longer than the client failure check period (to prompt the pinger to fail
+   //the connection) before reconnecting
+   public void testShutdownServerCleanlyAndReconnectSameNodeWithSleep() throws Exception
+   {
+      testShutdownServerCleanlyAndReconnectSameNode(true);
+   }
+   
    public void testShutdownServerCleanlyAndReconnectSameNode() throws Exception
    {
+      testShutdownServerCleanlyAndReconnectSameNode(false);
+   }
+   
+   private void testShutdownServerCleanlyAndReconnectSameNode(final boolean sleep) throws Exception
+   {
       Map<String, Object> server0Params = new HashMap<String, Object>();
       HornetQServer server0 = createHornetQServer(0, isNetty(), server0Params);
 
@@ -461,6 +479,7 @@
       final double retryIntervalMultiplier = 1d;
       final int reconnectAttempts = -1;
       final int confirmationWindowSize = 1024;
+      final long clientFailureCheckPeriod = 1000;
 
       Pair<String, String> connectorPair = new Pair<String, String>(server1tc.getName(), null);
 
@@ -475,6 +494,7 @@
                                                                         true,
                                                                         false,
                                                                         confirmationWindowSize,
+                                                                        clientFailureCheckPeriod,
                                                                         connectorPair);
 
       List<BridgeConfiguration> bridgeConfigs = new ArrayList<BridgeConfiguration>();
@@ -501,6 +521,12 @@
 
       log.info("stopping server1");
       server1.stop();
+      
+      if (sleep)
+      {
+         Thread.sleep(2 * clientFailureCheckPeriod);
+      }
+      
       log.info("restarting server1");
       server1.start();
       log.info("server 1 restarted");
@@ -591,6 +617,7 @@
                                                                         true,
                                                                         false,
                                                                         confirmationWindowSize,
+                                                                        ClientSessionFactoryImpl.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD,
                                                                         connectorPair);
 
       List<BridgeConfiguration> bridgeConfigs = new ArrayList<BridgeConfiguration>();

Modified: trunk/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeStartTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeStartTest.java	2009-11-17 19:38:24 UTC (rev 8298)
+++ trunk/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeStartTest.java	2009-11-17 19:40:49 UTC (rev 8299)
@@ -110,6 +110,7 @@
                                                                         true,
                                                                         true,
                                                                         1024,
+                                                                        ClientSessionFactoryImpl.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD,
                                                                         connectorPair);
 
       List<BridgeConfiguration> bridgeConfigs = new ArrayList<BridgeConfiguration>();
@@ -259,6 +260,7 @@
                                                                         true,
                                                                         true,
                                                                         1024,
+                                                                        ClientSessionFactoryImpl.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD,
                                                                         connectorPair);
 
       List<BridgeConfiguration> bridgeConfigs = new ArrayList<BridgeConfiguration>();
@@ -455,6 +457,7 @@
                                                                         false,
                                                                         false,
                                                                         1024,
+                                                                        ClientSessionFactoryImpl.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD,
                                                                         connectorPair);
 
       List<BridgeConfiguration> bridgeConfigs = new ArrayList<BridgeConfiguration>();
@@ -589,6 +592,7 @@
                                                                         false,
                                                                         true,
                                                                         1024,
+                                                                        ClientSessionFactoryImpl.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD,
                                                                         connectorPair);
 
       List<BridgeConfiguration> bridgeConfigs = new ArrayList<BridgeConfiguration>();

Modified: trunk/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeTest.java	2009-11-17 19:38:24 UTC (rev 8298)
+++ trunk/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeTest.java	2009-11-17 19:40:49 UTC (rev 8299)
@@ -136,6 +136,7 @@
                                                                            // Choose confirmation size to make sure acks
                                                                            // are sent
                                                                            numMessages * messageSize / 2,
+                                                                           ClientSessionFactoryImpl.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD,
                                                                            connectorPair);
 
          List<BridgeConfiguration> bridgeConfigs = new ArrayList<BridgeConfiguration>();
@@ -329,6 +330,7 @@
                                                                            true,
                                                                            false,
                                                                            1024,
+                                                                           ClientSessionFactoryImpl.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD,
                                                                            connectorPair);
 
          List<BridgeConfiguration> bridgeConfigs = new ArrayList<BridgeConfiguration>();
@@ -497,6 +499,7 @@
                                                                         true,
                                                                         false,
                                                                         1024,
+                                                                        ClientSessionFactoryImpl.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD,
                                                                         connectorPair);
 
       List<BridgeConfiguration> bridgeConfigs = new ArrayList<BridgeConfiguration>();
@@ -625,6 +628,7 @@
                                                                            true,
                                                                            false,
                                                                            1024,
+                                                                           ClientSessionFactoryImpl.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD,
                                                                            connectorPair);
 
          List<BridgeConfiguration> bridgeConfigs = new ArrayList<BridgeConfiguration>();

Modified: trunk/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeWithDiscoveryGroupStartTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeWithDiscoveryGroupStartTest.java	2009-11-17 19:38:24 UTC (rev 8298)
+++ trunk/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeWithDiscoveryGroupStartTest.java	2009-11-17 19:40:49 UTC (rev 8299)
@@ -116,6 +116,7 @@
                                                                         true,
                                                                         true,
                                                                         1024,
+                                                                        ClientSessionFactoryImpl.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD,
                                                                         "dg1");
 
       List<BridgeConfiguration> bridgeConfigs = new ArrayList<BridgeConfiguration>();

Modified: trunk/tests/src/org/hornetq/tests/integration/cluster/failover/ReplicatedDistributionTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/cluster/failover/ReplicatedDistributionTest.java	2009-11-17 19:38:24 UTC (rev 8298)
+++ trunk/tests/src/org/hornetq/tests/integration/cluster/failover/ReplicatedDistributionTest.java	2009-11-17 19:40:49 UTC (rev 8299)
@@ -38,10 +38,9 @@
  */
 public class ReplicatedDistributionTest extends ClusterTestBase
 {
-
    // Constants -----------------------------------------------------
 
-   static final SimpleString ADDRESS = new SimpleString("test.SomeAddress");
+   private static final SimpleString ADDRESS = new SimpleString("test.SomeAddress");
 
    // Attributes ----------------------------------------------------
 

Modified: trunk/tests/src/org/hornetq/tests/integration/management/BridgeControlTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/management/BridgeControlTest.java	2009-11-17 19:38:24 UTC (rev 8298)
+++ trunk/tests/src/org/hornetq/tests/integration/management/BridgeControlTest.java	2009-11-17 19:40:49 UTC (rev 8299)
@@ -26,6 +26,7 @@
 import javax.management.MBeanServer;
 import javax.management.MBeanServerFactory;
 
+import org.hornetq.core.client.impl.ClientSessionFactoryImpl;
 import org.hornetq.core.config.Configuration;
 import org.hornetq.core.config.TransportConfiguration;
 import org.hornetq.core.config.cluster.BridgeConfiguration;
@@ -163,6 +164,7 @@
                                              randomBoolean(),
                                              randomBoolean(),
                                              randomPositiveInt(),
+                                             ClientSessionFactoryImpl.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD,
                                              connectorPair);
 
       Configuration conf_1 = new ConfigurationImpl();

Modified: trunk/tests/src/org/hornetq/tests/integration/management/BridgeControlUsingCoreTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/management/BridgeControlUsingCoreTest.java	2009-11-17 19:38:24 UTC (rev 8298)
+++ trunk/tests/src/org/hornetq/tests/integration/management/BridgeControlUsingCoreTest.java	2009-11-17 19:40:49 UTC (rev 8299)
@@ -143,6 +143,7 @@
                                              randomBoolean(),
                                              randomBoolean(),
                                              randomPositiveInt(),
+                                             ClientSessionFactoryImpl.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD,
                                              connectorPair);
 
       Configuration conf_1 = new ConfigurationImpl();

Modified: trunk/tests/src/org/hornetq/tests/integration/remoting/ReconnectTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/remoting/ReconnectTest.java	2009-11-17 19:38:24 UTC (rev 8298)
+++ trunk/tests/src/org/hornetq/tests/integration/remoting/ReconnectTest.java	2009-11-17 19:40:49 UTC (rev 8299)
@@ -17,12 +17,11 @@
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
-import junit.framework.TestSuite;
-
 import org.hornetq.core.client.ClientSessionFactory;
+import org.hornetq.core.client.SessionFailureListener;
 import org.hornetq.core.client.impl.ClientSessionInternal;
 import org.hornetq.core.exception.HornetQException;
-import org.hornetq.core.remoting.FailureListener;
+import org.hornetq.core.logging.Logger;
 import org.hornetq.core.server.HornetQServer;
 import org.hornetq.tests.util.ServiceTestBase;
 
@@ -38,6 +37,8 @@
 
    // Constants -----------------------------------------------------
 
+   private static final Logger log = Logger.getLogger(ReconnectTest.class);
+
    // Attributes ----------------------------------------------------
 
    // Static --------------------------------------------------------
@@ -46,19 +47,6 @@
 
    // Public --------------------------------------------------------
 
-   // This is a hack to remove this test from the testsuite
-   // Remove this method to enable this Test on the testsuite.
-   // You can still run tests individually on eclipse, but not on the testsuite
-   public static TestSuite suite()
-   {
-      TestSuite suite = new TestSuite();
-
-      // System.out -> JUnit report
-      System.out.println("Test ReconnectTest being ignored for now!");
-
-      return suite;
-   }
-
    public void testReconnectNetty() throws Exception
    {
       internalTestReconnect(true);
@@ -71,9 +59,8 @@
 
    public void internalTestReconnect(final boolean isNetty) throws Exception
    {
-
       final int pingPeriod = 1000;
-      
+
       HornetQServer server = createServer(false, isNetty);
 
       server.start();
@@ -97,7 +84,7 @@
 
          final CountDownLatch latch = new CountDownLatch(1);
 
-         session.getConnection().addFailureListener(new FailureListener()
+         session.addFailureListener(new SessionFailureListener()
          {
 
             public void connectionFailed(final HornetQException me)
@@ -106,12 +93,14 @@
                latch.countDown();
             }
 
+            public void beforeReconnect(HornetQException exception)
+            {
+            }
+
          });
 
          server.stop();
 
-         // I couldn't find a way to install a latch here as I couldn't just use the FailureListener
-         // as the FailureListener won't be informed until the reconnection process is done.
          Thread.sleep((int)(pingPeriod * 2));
 
          server.start();



More information about the hornetq-commits mailing list