Author: timfox
Date: 2009-11-17 14:40:49 -0500 (Tue, 17 Nov 2009)
New Revision: 8299
Modified:
trunk/src/main/org/hornetq/core/client/impl/FailoverManagerImpl.java
trunk/src/main/org/hornetq/core/config/cluster/BridgeConfiguration.java
trunk/src/main/org/hornetq/core/config/impl/FileConfiguration.java
trunk/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeReconnectTest.java
trunk/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeStartTest.java
trunk/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeTest.java
trunk/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeWithDiscoveryGroupStartTest.java
trunk/tests/src/org/hornetq/tests/integration/cluster/failover/ReplicatedDistributionTest.java
trunk/tests/src/org/hornetq/tests/integration/management/BridgeControlTest.java
trunk/tests/src/org/hornetq/tests/integration/management/BridgeControlUsingCoreTest.java
trunk/tests/src/org/hornetq/tests/integration/remoting/ReconnectTest.java
Log:
https://jira.jboss.org/jira/browse/HORNETQ-178
Modified: trunk/src/main/org/hornetq/core/client/impl/FailoverManagerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/FailoverManagerImpl.java 2009-11-17
19:38:24 UTC (rev 8298)
+++ trunk/src/main/org/hornetq/core/client/impl/FailoverManagerImpl.java 2009-11-17
19:40:49 UTC (rev 8299)
@@ -594,6 +594,8 @@
catch (Exception ignore)
{
}
+
+ this.cancelPinger();
connector = null;
@@ -624,7 +626,7 @@
connection = null;
}
-
+
callFailureListeners(me, true);
if (connection == null)
@@ -658,7 +660,7 @@
final List<SessionFailureListener> listenersClone = new
ArrayList<SessionFailureListener>(listeners);
for (final SessionFailureListener listener : listenersClone)
- {
+ {
try
{
if (afterReconnect)
@@ -685,18 +687,18 @@
*/
private void reconnectSessions(final RemotingConnection oldConnection, final int
reconnectAttempts)
{
- RemotingConnection backupConnection = getConnectionWithRetry(reconnectAttempts);
+ RemotingConnection newConnection = getConnectionWithRetry(reconnectAttempts);
- if (backupConnection == null)
+ if (newConnection == null)
{
log.warn("Failed to connect to server.");
return;
}
-
+
List<FailureListener> oldListeners = oldConnection.getFailureListeners();
- List<FailureListener> newListeners = new
ArrayList<FailureListener>(backupConnection.getFailureListeners());
+ List<FailureListener> newListeners = new
ArrayList<FailureListener>(newConnection.getFailureListeners());
for (FailureListener listener : oldListeners)
{
@@ -708,11 +710,11 @@
}
}
- backupConnection.setFailureListeners(newListeners);
+ newConnection.setFailureListeners(newListeners);
for (ClientSessionInternal session : sessions)
{
- session.handleFailover(backupConnection);
+ session.handleFailover(newConnection);
}
}
@@ -775,22 +777,27 @@
}
}
}
+
+ private void cancelPinger()
+ {
+ if (pingerFuture != null)
+ {
+ pingRunnable.cancel();
+ pingerFuture.cancel(false);
+
+ pingRunnable = null;
+
+ pingerFuture = null;
+ }
+ }
+
private void checkCloseConnection()
{
if (connection != null && sessions.size() == 0)
{
- if (pingerFuture != null)
- {
- pingRunnable.cancel();
+ cancelPinger();
- pingerFuture.cancel(false);
-
- pingRunnable = null;
-
- pingerFuture = null;
- }
-
try
{
connection.destroy();
@@ -817,7 +824,7 @@
}
public RemotingConnection getConnection()
- {
+ {
if (connection == null)
{
Connection tc = null;
@@ -922,7 +929,7 @@
}
}
}
-
+
return connection;
}
@@ -1082,6 +1089,8 @@
final HornetQException me = new
HornetQException(HornetQException.CONNECTION_TIMEDOUT,
"Did not receive
data from server for " + connection.getTransportConnection());
+ cancelled = true;
+
threadPool.execute(new Runnable()
{
// Must be executed on different thread
@@ -1090,7 +1099,7 @@
connection.fail(me);
}
});
-
+
return;
}
else
Modified: trunk/src/main/org/hornetq/core/config/cluster/BridgeConfiguration.java
===================================================================
--- trunk/src/main/org/hornetq/core/config/cluster/BridgeConfiguration.java 2009-11-17
19:38:24 UTC (rev 8298)
+++ trunk/src/main/org/hornetq/core/config/cluster/BridgeConfiguration.java 2009-11-17
19:40:49 UTC (rev 8299)
@@ -55,7 +55,9 @@
private boolean useDuplicateDetection;
private int confirmationWindowSize;
-
+
+ private long clientFailureCheckPeriod;
+
public BridgeConfiguration(final String name,
final String queueName,
final String forwardingAddress,
@@ -67,6 +69,7 @@
final boolean failoverOnServerShutdown,
final boolean useDuplicateDetection,
final int confirmationWindowSize,
+ final long clientFailureCheckPeriod,
final Pair<String, String> connectorPair)
{
this.name = name;
@@ -80,6 +83,7 @@
this.failoverOnServerShutdown = failoverOnServerShutdown;
this.useDuplicateDetection = useDuplicateDetection;
this.confirmationWindowSize = confirmationWindowSize;
+ this.clientFailureCheckPeriod = clientFailureCheckPeriod;
this.connectorPair = connectorPair;
this.discoveryGroupName = null;
}
@@ -95,6 +99,7 @@
final boolean failoverOnServerShutdown,
final boolean useDuplicateDetection,
final int confirmationWindowSize,
+ final long clientFailureCheckPeriod,
final String discoveryGroupName)
{
this.name = name;
@@ -108,6 +113,7 @@
this.failoverOnServerShutdown = failoverOnServerShutdown;
this.useDuplicateDetection = useDuplicateDetection;
this.confirmationWindowSize = confirmationWindowSize;
+ this.clientFailureCheckPeriod = clientFailureCheckPeriod;
this.connectorPair = null;
this.discoveryGroupName = discoveryGroupName;
}
@@ -176,6 +182,11 @@
{
return confirmationWindowSize;
}
+
+ public long getClientFailureCheckPeriod()
+ {
+ return clientFailureCheckPeriod;
+ }
/**
* @param name the name to set
Modified: trunk/src/main/org/hornetq/core/config/impl/FileConfiguration.java
===================================================================
--- trunk/src/main/org/hornetq/core/config/impl/FileConfiguration.java 2009-11-17 19:38:24
UTC (rev 8298)
+++ trunk/src/main/org/hornetq/core/config/impl/FileConfiguration.java 2009-11-17 19:40:49
UTC (rev 8299)
@@ -681,6 +681,7 @@
failoverOnServerShutdown,
useDuplicateDetection,
confirmationWindowSize,
+
ClientSessionFactoryImpl.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD,
connectorPair);
}
else
@@ -696,6 +697,7 @@
failoverOnServerShutdown,
useDuplicateDetection,
confirmationWindowSize,
+
ClientSessionFactoryImpl.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD,
discoveryGroupName);
}
Modified:
trunk/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeReconnectTest.java
===================================================================
---
trunk/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeReconnectTest.java 2009-11-17
19:38:24 UTC (rev 8298)
+++
trunk/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeReconnectTest.java 2009-11-17
19:40:49 UTC (rev 8299)
@@ -17,6 +17,8 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicInteger;
import org.hornetq.core.client.ClientConsumer;
import org.hornetq.core.client.ClientMessage;
@@ -29,6 +31,7 @@
import org.hornetq.core.config.cluster.QueueConfiguration;
import org.hornetq.core.exception.HornetQException;
import org.hornetq.core.logging.Logger;
+import org.hornetq.core.remoting.FailureListener;
import org.hornetq.core.remoting.RemotingConnection;
import org.hornetq.core.remoting.impl.invm.InVMConnector;
import org.hornetq.core.remoting.impl.invm.InVMConnectorFactory;
@@ -124,6 +127,7 @@
true,
false,
confirmationWindowSize,
+
ClientSessionFactoryImpl.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD,
connectorPair);
List<BridgeConfiguration> bridgeConfigs = new
ArrayList<BridgeConfiguration>();
@@ -246,6 +250,7 @@
true,
false,
confirmationWindowSize,
+
ClientSessionFactoryImpl.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD,
connectorPair);
List<BridgeConfiguration> bridgeConfigs = new
ArrayList<BridgeConfiguration>();
@@ -363,6 +368,7 @@
true,
false,
confirmationWindowSize,
+
ClientSessionFactoryImpl.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD,
connectorPair);
List<BridgeConfiguration> bridgeConfigs = new
ArrayList<BridgeConfiguration>();
@@ -432,9 +438,21 @@
assertEquals(0, server0.getRemotingService().getConnections().size());
assertEquals(0, server1.getRemotingService().getConnections().size());
}
-
+
+ //We test that we can pause more than client failure check period (to prompt the
pinger to failing)
+ //before reconnecting
+ public void testShutdownServerCleanlyAndReconnectSameNodeWithSleep() throws Exception
+ {
+ testShutdownServerCleanlyAndReconnectSameNode(true);
+ }
+
public void testShutdownServerCleanlyAndReconnectSameNode() throws Exception
{
+ testShutdownServerCleanlyAndReconnectSameNode(false);
+ }
+
+ private void testShutdownServerCleanlyAndReconnectSameNode(final boolean sleep) throws
Exception
+ {
Map<String, Object> server0Params = new HashMap<String, Object>();
HornetQServer server0 = createHornetQServer(0, isNetty(), server0Params);
@@ -461,6 +479,7 @@
final double retryIntervalMultiplier = 1d;
final int reconnectAttempts = -1;
final int confirmationWindowSize = 1024;
+ final long clientFailureCheckPeriod = 1000;
Pair<String, String> connectorPair = new Pair<String,
String>(server1tc.getName(), null);
@@ -475,6 +494,7 @@
true,
false,
confirmationWindowSize,
+
clientFailureCheckPeriod,
connectorPair);
List<BridgeConfiguration> bridgeConfigs = new
ArrayList<BridgeConfiguration>();
@@ -501,6 +521,12 @@
log.info("stopping server1");
server1.stop();
+
+ if (sleep)
+ {
+ Thread.sleep(2 * clientFailureCheckPeriod);
+ }
+
log.info("restarting server1");
server1.start();
log.info("server 1 restarted");
@@ -591,6 +617,7 @@
true,
false,
confirmationWindowSize,
+
ClientSessionFactoryImpl.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD,
connectorPair);
List<BridgeConfiguration> bridgeConfigs = new
ArrayList<BridgeConfiguration>();
Modified:
trunk/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeStartTest.java
===================================================================
---
trunk/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeStartTest.java 2009-11-17
19:38:24 UTC (rev 8298)
+++
trunk/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeStartTest.java 2009-11-17
19:40:49 UTC (rev 8299)
@@ -110,6 +110,7 @@
true,
true,
1024,
+
ClientSessionFactoryImpl.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD,
connectorPair);
List<BridgeConfiguration> bridgeConfigs = new
ArrayList<BridgeConfiguration>();
@@ -259,6 +260,7 @@
true,
true,
1024,
+
ClientSessionFactoryImpl.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD,
connectorPair);
List<BridgeConfiguration> bridgeConfigs = new
ArrayList<BridgeConfiguration>();
@@ -455,6 +457,7 @@
false,
false,
1024,
+
ClientSessionFactoryImpl.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD,
connectorPair);
List<BridgeConfiguration> bridgeConfigs = new
ArrayList<BridgeConfiguration>();
@@ -589,6 +592,7 @@
false,
true,
1024,
+
ClientSessionFactoryImpl.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD,
connectorPair);
List<BridgeConfiguration> bridgeConfigs = new
ArrayList<BridgeConfiguration>();
Modified: trunk/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeTest.java
===================================================================
---
trunk/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeTest.java 2009-11-17
19:38:24 UTC (rev 8298)
+++
trunk/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeTest.java 2009-11-17
19:40:49 UTC (rev 8299)
@@ -136,6 +136,7 @@
// Choose
confirmation size to make sure acks
// are sent
numMessages *
messageSize / 2,
+
ClientSessionFactoryImpl.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD,
connectorPair);
List<BridgeConfiguration> bridgeConfigs = new
ArrayList<BridgeConfiguration>();
@@ -329,6 +330,7 @@
true,
false,
1024,
+
ClientSessionFactoryImpl.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD,
connectorPair);
List<BridgeConfiguration> bridgeConfigs = new
ArrayList<BridgeConfiguration>();
@@ -497,6 +499,7 @@
true,
false,
1024,
+
ClientSessionFactoryImpl.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD,
connectorPair);
List<BridgeConfiguration> bridgeConfigs = new
ArrayList<BridgeConfiguration>();
@@ -625,6 +628,7 @@
true,
false,
1024,
+
ClientSessionFactoryImpl.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD,
connectorPair);
List<BridgeConfiguration> bridgeConfigs = new
ArrayList<BridgeConfiguration>();
Modified:
trunk/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeWithDiscoveryGroupStartTest.java
===================================================================
---
trunk/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeWithDiscoveryGroupStartTest.java 2009-11-17
19:38:24 UTC (rev 8298)
+++
trunk/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeWithDiscoveryGroupStartTest.java 2009-11-17
19:40:49 UTC (rev 8299)
@@ -116,6 +116,7 @@
true,
true,
1024,
+
ClientSessionFactoryImpl.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD,
"dg1");
List<BridgeConfiguration> bridgeConfigs = new
ArrayList<BridgeConfiguration>();
Modified:
trunk/tests/src/org/hornetq/tests/integration/cluster/failover/ReplicatedDistributionTest.java
===================================================================
---
trunk/tests/src/org/hornetq/tests/integration/cluster/failover/ReplicatedDistributionTest.java 2009-11-17
19:38:24 UTC (rev 8298)
+++
trunk/tests/src/org/hornetq/tests/integration/cluster/failover/ReplicatedDistributionTest.java 2009-11-17
19:40:49 UTC (rev 8299)
@@ -38,10 +38,9 @@
*/
public class ReplicatedDistributionTest extends ClusterTestBase
{
-
// Constants -----------------------------------------------------
- static final SimpleString ADDRESS = new SimpleString("test.SomeAddress");
+ private static final SimpleString ADDRESS = new
SimpleString("test.SomeAddress");
// Attributes ----------------------------------------------------
Modified: trunk/tests/src/org/hornetq/tests/integration/management/BridgeControlTest.java
===================================================================
---
trunk/tests/src/org/hornetq/tests/integration/management/BridgeControlTest.java 2009-11-17
19:38:24 UTC (rev 8298)
+++
trunk/tests/src/org/hornetq/tests/integration/management/BridgeControlTest.java 2009-11-17
19:40:49 UTC (rev 8299)
@@ -26,6 +26,7 @@
import javax.management.MBeanServer;
import javax.management.MBeanServerFactory;
+import org.hornetq.core.client.impl.ClientSessionFactoryImpl;
import org.hornetq.core.config.Configuration;
import org.hornetq.core.config.TransportConfiguration;
import org.hornetq.core.config.cluster.BridgeConfiguration;
@@ -163,6 +164,7 @@
randomBoolean(),
randomBoolean(),
randomPositiveInt(),
+
ClientSessionFactoryImpl.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD,
connectorPair);
Configuration conf_1 = new ConfigurationImpl();
Modified:
trunk/tests/src/org/hornetq/tests/integration/management/BridgeControlUsingCoreTest.java
===================================================================
---
trunk/tests/src/org/hornetq/tests/integration/management/BridgeControlUsingCoreTest.java 2009-11-17
19:38:24 UTC (rev 8298)
+++
trunk/tests/src/org/hornetq/tests/integration/management/BridgeControlUsingCoreTest.java 2009-11-17
19:40:49 UTC (rev 8299)
@@ -143,6 +143,7 @@
randomBoolean(),
randomBoolean(),
randomPositiveInt(),
+
ClientSessionFactoryImpl.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD,
connectorPair);
Configuration conf_1 = new ConfigurationImpl();
Modified: trunk/tests/src/org/hornetq/tests/integration/remoting/ReconnectTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/remoting/ReconnectTest.java 2009-11-17
19:38:24 UTC (rev 8298)
+++ trunk/tests/src/org/hornetq/tests/integration/remoting/ReconnectTest.java 2009-11-17
19:40:49 UTC (rev 8299)
@@ -17,12 +17,11 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
-import junit.framework.TestSuite;
-
import org.hornetq.core.client.ClientSessionFactory;
+import org.hornetq.core.client.SessionFailureListener;
import org.hornetq.core.client.impl.ClientSessionInternal;
import org.hornetq.core.exception.HornetQException;
-import org.hornetq.core.remoting.FailureListener;
+import org.hornetq.core.logging.Logger;
import org.hornetq.core.server.HornetQServer;
import org.hornetq.tests.util.ServiceTestBase;
@@ -38,6 +37,8 @@
// Constants -----------------------------------------------------
+ private static final Logger log = Logger.getLogger(ReconnectTest.class);
+
// Attributes ----------------------------------------------------
// Static --------------------------------------------------------
@@ -46,19 +47,6 @@
// Public --------------------------------------------------------
- // This is a hack to remove this test from the testsuite
- // Remove this method to enable this Test on the testsuite.
- // You can still run tests individually on eclipse, but not on the testsuite
- public static TestSuite suite()
- {
- TestSuite suite = new TestSuite();
-
- // System.out -> JUnit report
- System.out.println("Test ReconnectTest being ignored for now!");
-
- return suite;
- }
-
public void testReconnectNetty() throws Exception
{
internalTestReconnect(true);
@@ -71,9 +59,8 @@
public void internalTestReconnect(final boolean isNetty) throws Exception
{
-
final int pingPeriod = 1000;
-
+
HornetQServer server = createServer(false, isNetty);
server.start();
@@ -97,7 +84,7 @@
final CountDownLatch latch = new CountDownLatch(1);
- session.getConnection().addFailureListener(new FailureListener()
+ session.addFailureListener(new SessionFailureListener()
{
public void connectionFailed(final HornetQException me)
@@ -106,12 +93,14 @@
latch.countDown();
}
+ public void beforeReconnect(HornetQException exception)
+ {
+ }
+
});
server.stop();
- // I couldn't find a way to install a latch here as I couldn't just use
the FailureListener
- // as the FailureListener won't be informed until the reconnection process
is done.
Thread.sleep((int)(pingPeriod * 2));
server.start();