Author: ataylor
Date: 2010-11-01 05:44:00 -0400 (Mon, 01 Nov 2010)
New Revision: 9825
Modified:
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeReconnectTest.java
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeTestBase.java
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/jms/cluster/JMSFailoverTest.java
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/jms/server/management/JMSUtil.java
Log:
test fixes
Modified:
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeReconnectTest.java
===================================================================
--- branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeReconnectTest.java 2010-10-30 13:58:39 UTC (rev 9824)
+++ branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeReconnectTest.java 2010-11-01 09:44:00 UTC (rev 9825)
@@ -33,8 +33,10 @@
import org.hornetq.core.remoting.impl.invm.InVMConnectorFactory;
import org.hornetq.core.remoting.impl.netty.NettyConnectorFactory;
import org.hornetq.core.server.HornetQServer;
+import org.hornetq.core.server.NodeManager;
import org.hornetq.core.server.cluster.Bridge;
import org.hornetq.core.server.cluster.impl.BridgeImpl;
+import org.hornetq.core.server.impl.InVMNodeManager;
import org.hornetq.spi.core.protocol.RemotingConnection;
/**
@@ -73,14 +75,15 @@
// Fail bridge and reconnecting immediately
public void testFailoverAndReconnectImmediately() throws Exception
{
+ NodeManager nodeManager = new InVMNodeManager();
Map<String, Object> server0Params = new HashMap<String, Object>();
- HornetQServer server0 = createHornetQServer(0, isNetty(), server0Params);
+ HornetQServer server0 = createHornetQServer(0, server0Params, isNetty(),
nodeManager);
Map<String, Object> server1Params = new HashMap<String, Object>();
HornetQServer server1 = createHornetQServer(1, isNetty(), server1Params);
Map<String, Object> server2Params = new HashMap<String, Object>();
- HornetQServer service2 = createHornetQServer(2, server2Params, isNetty(), true);
+ HornetQServer service2 = createBackupHornetQServer(2, server2Params, isNetty(), 0,
nodeManager);
TransportConfiguration server0tc = new TransportConfiguration(getConnector(),
server0Params, "server0tc");
@@ -145,26 +148,34 @@
server1.start();
server0.start();
+
+
+ BridgeReconnectTest.log.info("** failing connection");
+ // Now we will simulate a failure of the bridge connection between server0 and server1
+ /*Bridge bridge = server0.getClusterManager().getBridges().get(bridgeName);
+ RemotingConnection forwardingConnection = getForwardingConnection(bridge);
+ forwardingConnection.fail(new HornetQException(HornetQException.NOT_CONNECTED));*/
+ server0.kill();
+
+ waitForServerStart(service2);
+
ServerLocator locator = HornetQClient.createServerLocatorWithoutHA(server0tc,
server2tc);
- ClientSessionFactory csf0 = locator.createSessionFactory(server0tc);
+ ClientSessionFactory csf0 = locator.createSessionFactory(server2tc);
+
ClientSession session0 = csf0.createSession(false, true, true);
+
+ ClientProducer prod0 = session0.createProducer(testAddress);
+
ClientSessionFactory csf2 = locator.createSessionFactory(server2tc);
+
ClientSession session2 = csf2.createSession(false, true, true);
- ClientProducer prod0 = session0.createProducer(testAddress);
-
ClientConsumer cons2 = session2.createConsumer(queueName0);
session2.start();
-
- BridgeReconnectTest.log.info("** failing connection");
- // Now we will simulate a failure of the bridge connection between server0 and server1
- Bridge bridge = server0.getClusterManager().getBridges().get(bridgeName);
- RemotingConnection forwardingConnection = getForwardingConnection(bridge);
- forwardingConnection.fail(new HornetQException(HornetQException.NOT_CONNECTED));
-
+
final int numMessages = 10;
SimpleString propKey = new SimpleString("propkey");
@@ -198,17 +209,20 @@
Assert.assertEquals(0, service2.getRemotingService().getConnections().size());
}
+
// Fail bridge and attempt failover a few times before succeeding
public void testFailoverAndReconnectAfterAFewTries() throws Exception
{
+ NodeManager nodeManager = new InVMNodeManager();
+
Map<String, Object> server0Params = new HashMap<String, Object>();
- HornetQServer server0 = createHornetQServer(0, isNetty(), server0Params);
+ HornetQServer server0 = createHornetQServer(0, server0Params, isNetty(),
nodeManager);
Map<String, Object> server1Params = new HashMap<String, Object>();
HornetQServer server1 = createHornetQServer(1, isNetty(), server1Params);
Map<String, Object> server2Params = new HashMap<String, Object>();
- HornetQServer service2 = createHornetQServer(2, server2Params, isNetty(), true);
+ HornetQServer service2 = createBackupHornetQServer(2, server2Params, isNetty(), 0,
nodeManager);
TransportConfiguration server0tc = new TransportConfiguration(getConnector(),
server0Params, "server0tc");
@@ -272,8 +286,13 @@
service2.start();
server1.start();
server0.start();
- ServerLocator locator = HornetQClient.createServerLocatorWithHA(server0tc,
server2tc);
- ClientSessionFactory csf0 = locator.createSessionFactory(server0tc);
+ // Now we will simulate a failure of the bridge connection between server0 and server1
+ server0.kill();
+
+
+ ServerLocator locator = HornetQClient.createServerLocatorWithHA(server2tc);
+ locator.setReconnectAttempts(100);
+ ClientSessionFactory csf0 = locator.createSessionFactory(server2tc);
ClientSession session0 = csf0.createSession(false, true, true);
ClientSessionFactory csf2 = locator.createSessionFactory(server2tc);
@@ -284,17 +303,8 @@
ClientConsumer cons2 = session2.createConsumer(queueName0);
session2.start();
+
- // Now we will simulate a failure of the bridge connection between server0 and server1
- Bridge bridge = server0.getClusterManager().getBridges().get(bridgeName);
- RemotingConnection forwardingConnection = getForwardingConnection(bridge);
- InVMConnector.failOnCreateConnection = true;
- InVMConnector.numberOfFailures = reconnectAttempts - 1;
- forwardingConnection.fail(new HornetQException(HornetQException.NOT_CONNECTED));
-
- forwardingConnection = getForwardingConnection(bridge);
- forwardingConnection.fail(new HornetQException(HornetQException.NOT_CONNECTED));
-
final int numMessages = 10;
SimpleString propKey = new SimpleString("propkey");
Modified:
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeTestBase.java
===================================================================
--- branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeTestBase.java 2010-10-30 13:58:39 UTC (rev 9824)
+++ branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeTestBase.java 2010-11-01 09:44:00 UTC (rev 9825)
@@ -23,8 +23,14 @@
import org.hornetq.core.remoting.impl.netty.NettyAcceptorFactory;
import org.hornetq.core.server.HornetQServer;
import org.hornetq.core.server.HornetQServers;
+import org.hornetq.core.server.NodeManager;
+import org.hornetq.core.server.impl.HornetQServerImpl;
+import org.hornetq.spi.core.security.HornetQSecurityManager;
+import org.hornetq.tests.integration.cluster.distribution.ClusterTestBase;
import org.hornetq.tests.util.UnitTestCase;
+import javax.management.MBeanServer;
+
/**
* A BridgeTestBase
*
@@ -75,18 +81,22 @@
protected HornetQServer createHornetQServer(final int id, final boolean netty, final
Map<String, Object> params)
{
- return createHornetQServer(id, params, netty, false);
+ return createHornetQServer(id, params, netty, null);
}
+ protected HornetQServer createHornetQServer(final int id, final boolean netty, final
Map<String, Object> params, NodeManager nodeManager)
+ {
+ return createHornetQServer(id, params, netty, nodeManager);
+ }
+
protected HornetQServer createHornetQServer(final int id,
final Map<String, Object> params,
final boolean netty,
- final boolean backup)
+ final NodeManager nodeManager)
{
Configuration serviceConf = new ConfigurationImpl();
serviceConf.setClustered(true);
serviceConf.setSecurityEnabled(false);
- serviceConf.setBackup(backup);
serviceConf.setSharedStore(true);
serviceConf.setJournalType(getDefaultJournalType());
serviceConf.setBindingsDirectory(getBindingsDir(id, false));
@@ -111,11 +121,130 @@
serviceConf.getAcceptorConfigurations()
.add(new
TransportConfiguration("org.hornetq.core.remoting.impl.invm.InVMAcceptorFactory",
params));
}
- HornetQServer service = HornetQServers.newHornetQServer(serviceConf, true);
+ HornetQServer service;
+ if(nodeManager == null)
+ {
+ service = HornetQServers.newHornetQServer(serviceConf, true);
+ }
+ else
+ {
+ service = new InVMNodeManagerServer(serviceConf, nodeManager);
+ }
servers.add(service);
return service;
}
+ protected HornetQServer createBackupHornetQServer(final int id,
+ final Map<String, Object> params,
+ final boolean netty,
+ final int liveId,
+ final NodeManager nodeManager)
+ {
+ Configuration serviceConf = new ConfigurationImpl();
+ serviceConf.setClustered(true);
+ serviceConf.setSecurityEnabled(false);
+ serviceConf.setBackup(true);
+ serviceConf.setSharedStore(true);
+ serviceConf.setJournalType(getDefaultJournalType());
+ serviceConf.setBindingsDirectory(getBindingsDir(liveId, false));
+ serviceConf.setJournalMinFiles(2);
+ serviceConf.setJournalDirectory(getJournalDir(liveId, false));
+ serviceConf.setPagingDirectory(getPageDir(liveId, false));
+ serviceConf.setLargeMessagesDirectory(getLargeMessagesDir(liveId, false));
+ // these tests don't need any big storage so limiting the size of the journal files to speed up the test
+ serviceConf.setJournalFileSize(100 * 1024);
+
+ if (netty)
+ {
+
params.put(org.hornetq.core.remoting.impl.netty.TransportConstants.PORT_PROP_NAME,
+ org.hornetq.core.remoting.impl.netty.TransportConstants.DEFAULT_PORT
+ id);
+ serviceConf.getAcceptorConfigurations().add(new
TransportConfiguration(NettyAcceptorFactory.class.getName(),
+
params));
+
+ }
+ else
+ {
+
params.put(org.hornetq.core.remoting.impl.invm.TransportConstants.SERVER_ID_PROP_NAME,
id);
+ serviceConf.getAcceptorConfigurations()
+ .add(new
TransportConfiguration("org.hornetq.core.remoting.impl.invm.InVMAcceptorFactory",
params));
+ }
+ HornetQServer service;
+ if(nodeManager == null)
+ {
+ service = HornetQServers.newHornetQServer(serviceConf, true);
+ }
+ else
+ {
+ service = new InVMNodeManagerServer(serviceConf, nodeManager);
+ }
+
+ servers.add(service);
+
+ return service;
+ }
+
+
+ protected void waitForServerStart(HornetQServer server) throws Exception
+ {
+ long start = System.currentTimeMillis();
+ do
+ {
+ if (server.isInitialised())
+ {
+ return;
+ }
+ Thread.sleep(10);
+ }
+ while (System.currentTimeMillis() - start < 5000);
+
+ String msg = "Timed out waiting for server starting = " + server;
+
+
+ throw new IllegalStateException(msg);
+ }
+
+ // Inner classes -------------------------------------------------
+ class InVMNodeManagerServer extends HornetQServerImpl
+ {
+ final NodeManager nodeManager;
+ public InVMNodeManagerServer(NodeManager nodeManager)
+ {
+ super();
+ this.nodeManager = nodeManager;
+ }
+
+ public InVMNodeManagerServer(Configuration configuration, NodeManager nodeManager)
+ {
+ super(configuration);
+ this.nodeManager = nodeManager;
+ }
+
+ public InVMNodeManagerServer(Configuration configuration, MBeanServer mbeanServer,
NodeManager nodeManager)
+ {
+ super(configuration, mbeanServer);
+ this.nodeManager = nodeManager;
+ }
+
+ public InVMNodeManagerServer(Configuration configuration, HornetQSecurityManager
securityManager, NodeManager nodeManager)
+ {
+ super(configuration, securityManager);
+ this.nodeManager = nodeManager;
+ }
+
+ public InVMNodeManagerServer(Configuration configuration, MBeanServer mbeanServer,
HornetQSecurityManager securityManager, NodeManager nodeManager)
+ {
+ super(configuration, mbeanServer, securityManager);
+ this.nodeManager = nodeManager;
+ }
+
+ @Override
+ protected NodeManager createNodeManager(String directory)
+ {
+ return nodeManager;
+ }
+
+ }
+
}
Modified:
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/jms/cluster/JMSFailoverTest.java
===================================================================
--- branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/jms/cluster/JMSFailoverTest.java 2010-10-30 13:58:39 UTC (rev 9824)
+++ branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/jms/cluster/JMSFailoverTest.java 2010-11-01 09:44:00 UTC (rev 9825)
@@ -118,7 +118,7 @@
jbcf.setReconnectAttempts(-1);
- Connection conn = jbcf.createConnection();
+ Connection conn = JMSUtil.createConnectionAndWaitForTopology(jbcf, 2, 5);
Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
@@ -140,7 +140,7 @@
jbcf.setReconnectAttempts(-1);
- Connection conn = jbcf.createConnection();
+ Connection conn = JMSUtil.createConnectionAndWaitForTopology(jbcf, 2, 5);
Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
@@ -168,7 +168,7 @@
jbcf.setConsumerWindowSize(numMessages * bodySize / 10);
- Connection conn = jbcf.createConnection();
+ Connection conn = JMSUtil.createConnectionAndWaitForTopology(jbcf, 2, 5);
MyExceptionListener listener = new MyExceptionListener();
@@ -336,7 +336,10 @@
backupConf.getConnectorConfigurations().put(backuptc.getName(), backuptc);
ArrayList<String> staticConnectors = new ArrayList<String>();
staticConnectors.add(livetc.getName());
+ ClusterConnectionConfiguration cccBackup = new
ClusterConnectionConfiguration("cluster1", "jms", backuptc.getName(),
-1, false, false, 1, 1,
+ staticConnectors);
+ backupConf.getClusterConfigurations().add(cccBackup);
backupConf.setSecurityEnabled(false);
backupConf.setJournalType(getDefaultJournalType());
backupParams.put(TransportConstants.SERVER_ID_PROP_NAME, 1);
Modified:
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/jms/server/management/JMSUtil.java
===================================================================
--- branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/jms/server/management/JMSUtil.java 2010-10-30 13:58:39 UTC (rev 9824)
+++ branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/jms/server/management/JMSUtil.java 2010-11-01 09:44:00 UTC (rev 9825)
@@ -27,11 +27,16 @@
import junit.framework.Assert;
import org.hornetq.api.core.HornetQException;
+import org.hornetq.api.core.Pair;
import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.api.core.client.ClientSession;
+import org.hornetq.api.core.client.ClusterTopologyListener;
+import org.hornetq.api.core.client.ServerLocator;
import org.hornetq.api.core.client.SessionFailureListener;
+import org.hornetq.core.client.impl.ClientSessionFactoryInternal;
import org.hornetq.core.server.HornetQServer;
import org.hornetq.core.server.cluster.impl.ClusterManagerImpl;
+import org.hornetq.jms.client.HornetQConnection;
import org.hornetq.jms.client.HornetQConnectionFactory;
import org.hornetq.api.jms.HornetQJMSClient;
import org.hornetq.core.remoting.impl.invm.InVMConnectorFactory;
@@ -39,6 +44,8 @@
import org.hornetq.tests.util.RandomUtil;
import java.io.File;
+import java.util.ArrayList;
+import java.util.List;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
@@ -47,10 +54,8 @@
* A JMSUtil
*
* @author <a href="jmesnil(a)redhat.com">Jeff Mesnil</a>
- *
- * Created 14 nov. 2008 13:48:08
- *
- *
+ * <p/>
+ * Created 14 nov. 2008 13:48:08
*/
public class JMSUtil
{
@@ -181,25 +186,25 @@
}
}
- public static void waitForServer(HornetQServer server)
+ public static void waitForServer(HornetQServer server)
throws InterruptedException
{
- long timetowait =System.currentTimeMillis() + 5000;
- while(!server.isStarted())
+ long timetowait = System.currentTimeMillis() + 5000;
+ while (!server.isStarted())
{
Thread.sleep(100);
- if(server.isStarted())
+ if (server.isStarted())
{
break;
}
- else if(System.currentTimeMillis() > timetowait)
+ else if (System.currentTimeMillis() > timetowait)
{
throw new IllegalStateException("server didnt start");
}
}
}
- public static void crash(HornetQServer server, ClientSession... sessions) throws
Exception
+ public static void crash(HornetQServer server, ClientSession... sessions) throws
Exception
{
final CountDownLatch latch = new CountDownLatch(sessions.length);
@@ -219,21 +224,16 @@
{
session.addFailureListener(new MyListener());
}
- Set<RemotingConnection> connections =
server.getRemotingService().getConnections();
+ /*Set<RemotingConnection> connections =
server.getRemotingService().getConnections();
for (RemotingConnection remotingConnection : connections)
{
remotingConnection.destroy();
server.getRemotingService().removeConnection(remotingConnection.getID());
- }
+ }*/
ClusterManagerImpl clusterManager = (ClusterManagerImpl)
server.getClusterManager();
clusterManager.clear();
server.kill();
- // recreate the live.lock file (since it was deleted by the
- // clean stop
- File lockFile = new File(server.getConfiguration().getJournalDirectory(),
"live.lock");
- Assert.assertFalse(lockFile.exists());
- lockFile.createNewFile();
// Wait to be informed of failure
@@ -254,4 +254,55 @@
// Inner classes -------------------------------------------------
+ public static HornetQConnection
createConnectionAndWaitForTopology(HornetQConnectionFactory factory, int topologyMembers,
int timeout) throws Exception
+ {
+ HornetQConnection conn;
+ CountDownLatch countDownLatch = new CountDownLatch(topologyMembers);
+
+ ServerLocator locator = factory.getServerLocator();
+
+ locator.addClusterTopologyListener(new
LatchClusterTopologyListener(countDownLatch));
+
+ conn = (HornetQConnection) factory.createConnection();
+
+ boolean ok = countDownLatch.await(timeout, TimeUnit.SECONDS);
+ if (!ok)
+ {
+ throw new IllegalStateException("timed out waiting for topology");
+ }
+ return conn;
+ }
+
+ static class LatchClusterTopologyListener implements ClusterTopologyListener
+ {
+ final CountDownLatch latch;
+ int liveNodes = 0;
+ int backUpNodes = 0;
+ List<String> liveNode = new ArrayList<String>();
+ List<String> backupNode = new ArrayList<String>();
+
+ public LatchClusterTopologyListener(CountDownLatch latch)
+ {
+ this.latch = latch;
+ }
+
+ public void nodeUP(String nodeID, String sourceNodeID,
Pair<TransportConfiguration, TransportConfiguration> connectorPair, boolean last,
int distance)
+ {
+ if (connectorPair.a != null &&
!liveNode.contains(connectorPair.a.getName()))
+ {
+ liveNode.add(connectorPair.a.getName());
+ latch.countDown();
+ }
+ if (connectorPair.b != null &&
!backupNode.contains(connectorPair.b.getName()))
+ {
+ backupNode.add(connectorPair.b.getName());
+ latch.countDown();
+ }
+ }
+
+ public void nodeDown(String nodeID)
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+ }
}