[hornetq-commits] JBoss hornetq SVN: r10980 - in branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests: integration/replication and 1 other directories.
do-not-reply at jboss.org
do-not-reply at jboss.org
Wed Jul 13 11:35:15 EDT 2011
Author: borges
Date: 2011-07-13 11:35:14 -0400 (Wed, 13 Jul 2011)
New Revision: 10980
Added:
branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/util/ReplicatedBackupUtils.java
branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/util/TransportConfigurationUtils.java
Modified:
branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/AsynchronousFailoverTest.java
branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailBackAutoTest.java
branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailBackManualTest.java
branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverOnFlowControlTest.java
branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverTest.java
branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java
branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/PagingFailoverTest.java
branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/replication/ReplicationTest.java
Log:
HORNETQ-720 Fix tests, add some utility methods
Modified: branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/AsynchronousFailoverTest.java
===================================================================
--- branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/AsynchronousFailoverTest.java 2011-07-13 15:34:24 UTC (rev 10979)
+++ branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/AsynchronousFailoverTest.java 2011-07-13 15:35:14 UTC (rev 10980)
@@ -23,16 +23,22 @@
import org.hornetq.api.core.Message;
import org.hornetq.api.core.SimpleString;
import org.hornetq.api.core.TransportConfiguration;
-import org.hornetq.api.core.client.*;
+import org.hornetq.api.core.client.ClientConsumer;
+import org.hornetq.api.core.client.ClientMessage;
+import org.hornetq.api.core.client.ClientProducer;
+import org.hornetq.api.core.client.ClientSession;
+import org.hornetq.api.core.client.ServerLocator;
+import org.hornetq.api.core.client.SessionFailureListener;
import org.hornetq.core.client.impl.ClientSessionFactoryInternal;
import org.hornetq.core.client.impl.ClientSessionInternal;
import org.hornetq.core.client.impl.DelegatingSession;
import org.hornetq.core.logging.Logger;
import org.hornetq.spi.core.protocol.RemotingConnection;
+import org.hornetq.tests.util.TransportConfigurationUtils;
/**
* A MultiThreadFailoverTest
- *
+ *
* Test Failover where failure is prompted by another thread
*
* @author Tim Fox
@@ -47,7 +53,7 @@
private volatile ClientSessionFactoryInternal sf;
- private Object lockFail = new Object();
+ private final Object lockFail = new Object();
class MyListener implements SessionFailureListener
{
@@ -170,7 +176,7 @@
locator.setBlockOnNonDurableSend(true);
locator.setBlockOnDurableSend(true);
locator.setReconnectAttempts(-1);
- sf = (ClientSessionFactoryInternal) createSessionFactoryAndWaitForTopology(locator, 2);
+ sf = createSessionFactoryAndWaitForTopology(locator, 2);
ClientSession createSession = sf.createSession(true, true);
@@ -198,7 +204,7 @@
// Simulate failure on connection
synchronized (lockFail)
{
- crash((ClientSession) createSession);
+ crash(createSession);
}
/*if (listener != null)
@@ -226,7 +232,7 @@
Assert.assertEquals(0, sf.numSessions());
locator.close();
-
+
Assert.assertEquals(0, sf.numConnections());
if (i != numIts - 1)
@@ -243,7 +249,7 @@
DelegatingSession.debug = false;
}
}
-
+
protected void addPayload(ClientMessage msg)
{
}
@@ -278,7 +284,7 @@
message.getBodyBuffer().writeString("message" + i);
message.putIntProperty("counter", i);
-
+
addPayload(message);
producer.send(message);
@@ -408,7 +414,7 @@
message.putStringProperty(Message.HDR_DUPLICATE_DETECTION_ID, new SimpleString("id:" + i +
",exec:" +
executionId));
-
+
addPayload(message);
@@ -439,13 +445,13 @@
}
while (retry);
-
-
+
+
boolean blocked = false;
retry = false;
-
- ClientConsumer consumer = null;
+
+ ClientConsumer consumer = null;
do
{
ArrayList<Integer> msgs = new ArrayList<Integer>();
@@ -473,7 +479,7 @@
}
session.commit();
-
+
if (blocked)
{
assertTrue("msgs.size is expected to be 0 or " + numMessages + " but it was " + msgs.size(), msgs.size() == 0 || msgs.size() == numMessages);
@@ -535,13 +541,13 @@
@Override
protected TransportConfiguration getAcceptorTransportConfiguration(final boolean live)
{
- return getInVMTransportAcceptorConfiguration(live);
+ return TransportConfigurationUtils.getInVMAcceptor(live);
}
@Override
protected TransportConfiguration getConnectorTransportConfiguration(final boolean live)
{
- return getInVMConnectorTransportConfiguration(live);
+ return TransportConfigurationUtils.getInVMConnector(live);
}
}
Modified: branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailBackAutoTest.java
===================================================================
--- branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailBackAutoTest.java 2011-07-13 15:34:24 UTC (rev 10979)
+++ branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailBackAutoTest.java 2011-07-13 15:35:14 UTC (rev 10980)
@@ -13,23 +13,29 @@
package org.hornetq.tests.integration.cluster.failover;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
import junit.framework.Assert;
+
import org.hornetq.api.core.HornetQException;
import org.hornetq.api.core.SimpleString;
import org.hornetq.api.core.TransportConfiguration;
-import org.hornetq.api.core.client.*;
+import org.hornetq.api.core.client.ClientConsumer;
+import org.hornetq.api.core.client.ClientMessage;
+import org.hornetq.api.core.client.ClientProducer;
+import org.hornetq.api.core.client.ClientSession;
+import org.hornetq.api.core.client.ClientSessionFactory;
+import org.hornetq.api.core.client.SessionFailureListener;
import org.hornetq.core.client.impl.ClientSessionFactoryInternal;
import org.hornetq.core.client.impl.ServerLocatorInternal;
import org.hornetq.core.config.ClusterConnectionConfiguration;
import org.hornetq.core.server.impl.InVMNodeManager;
import org.hornetq.jms.client.HornetQTextMessage;
-import org.hornetq.utils.ReusableLatch;
+import org.hornetq.tests.util.TransportConfigurationUtils;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-
/**
* @author <a href="mailto:andy.taylor at jboss.com">Andy Taylor</a>
* Date: Dec 21, 2010
@@ -193,6 +199,7 @@
Assert.assertEquals(0, sf.numConnections());
}
+ @Override
protected void createConfigs() throws Exception
{
nodeManager = new InVMNodeManager();
@@ -234,13 +241,13 @@
@Override
protected TransportConfiguration getAcceptorTransportConfiguration(final boolean live)
{
- return getInVMTransportAcceptorConfiguration(live);
+ return TransportConfigurationUtils.getInVMAcceptor(live);
}
@Override
protected TransportConfiguration getConnectorTransportConfiguration(final boolean live)
{
- return getInVMConnectorTransportConfiguration(live);
+ return TransportConfigurationUtils.getInVMConnector(live);
}
Modified: branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailBackManualTest.java
===================================================================
--- branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailBackManualTest.java 2011-07-13 15:34:24 UTC (rev 10979)
+++ branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailBackManualTest.java 2011-07-13 15:35:14 UTC (rev 10980)
@@ -13,23 +13,30 @@
package org.hornetq.tests.integration.cluster.failover;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
import junit.framework.Assert;
+
import org.hornetq.api.core.HornetQException;
import org.hornetq.api.core.SimpleString;
import org.hornetq.api.core.TransportConfiguration;
-import org.hornetq.api.core.client.*;
+import org.hornetq.api.core.client.ClientConsumer;
+import org.hornetq.api.core.client.ClientMessage;
+import org.hornetq.api.core.client.ClientProducer;
+import org.hornetq.api.core.client.ClientSession;
+import org.hornetq.api.core.client.ClientSessionFactory;
+import org.hornetq.api.core.client.SessionFailureListener;
import org.hornetq.core.client.impl.ClientSessionFactoryInternal;
import org.hornetq.core.client.impl.ServerLocatorInternal;
import org.hornetq.core.config.ClusterConnectionConfiguration;
import org.hornetq.core.server.impl.InVMNodeManager;
import org.hornetq.jms.client.HornetQTextMessage;
import org.hornetq.tests.integration.cluster.util.TestableServer;
+import org.hornetq.tests.util.TransportConfigurationUtils;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-
/**
* @author <a href="mailto:andy.taylor at jboss.com">Andy Taylor</a>
* Date: Dec 21, 2010
@@ -133,6 +140,7 @@
}
+ @Override
protected void createConfigs() throws Exception
{
nodeManager = new InVMNodeManager();
@@ -176,13 +184,13 @@
@Override
protected TransportConfiguration getAcceptorTransportConfiguration(final boolean live)
{
- return getInVMTransportAcceptorConfiguration(live);
+ return TransportConfigurationUtils.getInVMAcceptor(live);
}
@Override
protected TransportConfiguration getConnectorTransportConfiguration(final boolean live)
{
- return getInVMConnectorTransportConfiguration(live);
+ return TransportConfigurationUtils.getInVMConnector(live);
}
Modified: branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverOnFlowControlTest.java
===================================================================
--- branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverOnFlowControlTest.java 2011-07-13 15:34:24 UTC (rev 10979)
+++ branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverOnFlowControlTest.java 2011-07-13 15:35:14 UTC (rev 10980)
@@ -28,6 +28,7 @@
import org.hornetq.core.protocol.core.Packet;
import org.hornetq.core.protocol.core.impl.wireformat.SessionProducerCreditsMessage;
import org.hornetq.spi.core.protocol.RemotingConnection;
+import org.hornetq.tests.util.TransportConfigurationUtils;
/**
* A FailoverOnFlowControlTest
@@ -49,8 +50,8 @@
// Constructors --------------------------------------------------
// Public --------------------------------------------------------
-
-
+
+
public void testOverflowSend() throws Exception
{
ServerLocator locator = getServerLocator();
@@ -65,11 +66,11 @@
public boolean intercept(Packet packet, RemotingConnection connection) throws HornetQException
{
System.out.println("Intercept..." + packet.getClass().getName());
-
+
if (packet instanceof SessionProducerCreditsMessage )
{
SessionProducerCreditsMessage credit = (SessionProducerCreditsMessage)packet;
-
+
System.out.println("Credits: " + credit.getCredits());
if (count.incrementAndGet() == 2)
{
@@ -87,7 +88,7 @@
return true;
}
};
-
+
locator.addInterceptor(interceptorClient);
ClientSessionFactoryInternal sf = createSessionFactoryAndWaitForTopology(locator, 2);
@@ -97,27 +98,28 @@
session.createQueue(FailoverTestBase.ADDRESS, FailoverTestBase.ADDRESS, null, true);
ClientProducer producer = session.createProducer(FailoverTestBase.ADDRESS);
-
+
final int numMessages = 10;
for (int i = 0; i < numMessages; i++)
{
ClientMessage message = session.createMessage(true);
-
+
message.getBodyBuffer().writeBytes(new byte[5000]);
message.putIntProperty("counter", i);
producer.send(message);
}
-
+
session.close();
-
+
locator.close();
}
+ @Override
protected void createConfigs() throws Exception
{
super.createConfigs();
@@ -125,6 +127,7 @@
backupServer.getServer().getConfiguration().setJournalFileSize(1024 * 1024);
}
+ @Override
protected ServerLocatorInternal getServerLocator() throws Exception
{
ServerLocatorInternal locator = super.getServerLocator();
@@ -140,13 +143,13 @@
@Override
protected TransportConfiguration getAcceptorTransportConfiguration(final boolean live)
{
- return getInVMTransportAcceptorConfiguration(live);
+ return TransportConfigurationUtils.getInVMAcceptor(live);
}
@Override
protected TransportConfiguration getConnectorTransportConfiguration(final boolean live)
{
- return getInVMConnectorTransportConfiguration(live);
+ return TransportConfigurationUtils.getInVMConnector(live);
}
Modified: branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverTest.java
===================================================================
--- branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverTest.java 2011-07-13 15:34:24 UTC (rev 10979)
+++ branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverTest.java 2011-07-13 15:35:14 UTC (rev 10980)
@@ -46,6 +46,7 @@
import org.hornetq.jms.client.HornetQTextMessage;
import org.hornetq.tests.integration.cluster.util.TestableServer;
import org.hornetq.tests.util.RandomUtil;
+import org.hornetq.tests.util.TransportConfigurationUtils;
/**
*
@@ -1875,13 +1876,13 @@
@Override
protected TransportConfiguration getAcceptorTransportConfiguration(final boolean live)
{
- return getInVMTransportAcceptorConfiguration(live);
+ return TransportConfigurationUtils.getInVMAcceptor(live);
}
@Override
protected TransportConfiguration getConnectorTransportConfiguration(final boolean live)
{
- return getInVMConnectorTransportConfiguration(live);
+ return TransportConfigurationUtils.getInVMConnector(live);
}
/**
Modified: branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java
===================================================================
--- branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java 2011-07-13 15:34:24 UTC (rev 10979)
+++ branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java 2011-07-13 15:35:14 UTC (rev 10980)
@@ -35,19 +35,16 @@
import org.hornetq.api.core.client.SessionFailureListener;
import org.hornetq.core.client.impl.ClientSessionFactoryInternal;
import org.hornetq.core.client.impl.ServerLocatorInternal;
-import org.hornetq.core.config.ClusterConnectionConfiguration;
import org.hornetq.core.config.Configuration;
-import org.hornetq.core.remoting.impl.invm.InVMAcceptorFactory;
import org.hornetq.core.remoting.impl.invm.InVMConnector;
-import org.hornetq.core.remoting.impl.invm.InVMConnectorFactory;
import org.hornetq.core.remoting.impl.invm.InVMRegistry;
-import org.hornetq.core.remoting.impl.invm.TransportConstants;
import org.hornetq.core.remoting.impl.netty.NettyAcceptorFactory;
import org.hornetq.core.remoting.impl.netty.NettyConnectorFactory;
import org.hornetq.core.server.NodeManager;
import org.hornetq.core.server.impl.InVMNodeManager;
import org.hornetq.tests.integration.cluster.util.SameProcessHornetQServer;
import org.hornetq.tests.integration.cluster.util.TestableServer;
+import org.hornetq.tests.util.ReplicatedBackupUtils;
import org.hornetq.tests.util.ServiceTestBase;
/**
@@ -60,8 +57,8 @@
// Constants -----------------------------------------------------
protected static final SimpleString ADDRESS = new SimpleString("FailoverTestAddress");
- private static final String LIVE_NODE_NAME = "hqLIVE";
+
// Attributes ----------------------------------------------------
protected TestableServer liveServer;
@@ -121,19 +118,6 @@
return new SameProcessHornetQServer(createInVMFailoverServer(true, backupConfig, nodeManager));
}
- private ClusterConnectionConfiguration createClusterConnectionConf(String name, String... connectors)
- {
- List<String> conn = new ArrayList<String>(connectors.length);
- for (String iConn : connectors)
- {
- conn.add(iConn);
- }
- return new ClusterConnectionConfiguration("cluster1", "jms", name, -1, false, false, 1, 1, conn, false);
- }
-
- /**
- * @throws Exception
- */
protected void createConfigs() throws Exception
{
nodeManager = new InVMNodeManager();
@@ -149,8 +133,8 @@
TransportConfiguration backupConnector = getConnectorTransportConfiguration(false);
backupConfig.getConnectorConfigurations().put(liveConnector.getName(), liveConnector);
backupConfig.getConnectorConfigurations().put(backupConnector.getName(), backupConnector);
- backupConfig.getClusterConfigurations().add(createClusterConnectionConf(backupConnector.getName(),
- liveConnector.getName()));
+ ReplicatedBackupUtils.createClusterConnectionConf(backupConfig, backupConnector.getName(),
+ liveConnector.getName());
backupServer = createBackupServer();
liveConfig = super.createDefaultConfig();
@@ -159,7 +143,7 @@
liveConfig.setSecurityEnabled(false);
liveConfig.setSharedStore(true);
liveConfig.setClustered(true);
- liveConfig.getClusterConfigurations().add(createClusterConnectionConf(liveConnector.getName()));
+ ReplicatedBackupUtils.createClusterConnectionConf(liveConfig, liveConnector.getName());
liveConfig.getConnectorConfigurations().put(liveConnector.getName(), liveConnector);
liveServer = createLiveServer();
}
@@ -172,42 +156,26 @@
nodeManager = new InVMNodeManager();
backupConfig = super.createDefaultConfig();
+ liveConfig = super.createDefaultConfig();
+ TransportConfiguration backupAcceptor = getAcceptorTransportConfiguration(false);
+ ReplicatedBackupUtils.configureReplicationPair(backupConfig, backupConnector, backupAcceptor, liveConfig,
+ liveConnector);
+
backupConfig.setBindingsDirectory(backupConfig.getBindingsDirectory() + "_backup");
backupConfig.setJournalDirectory(backupConfig.getJournalDirectory() + "_backup");
backupConfig.setPagingDirectory(backupConfig.getPagingDirectory() + "_backup");
backupConfig.setLargeMessagesDirectory(backupConfig.getLargeMessagesDirectory() + "_backup");
- backupConfig.getAcceptorConfigurations().clear();
- backupConfig.getAcceptorConfigurations().add(getAcceptorTransportConfiguration(false));
-
- backupConfig.getConnectorConfigurations().put(backupConnector.getName(), backupConnector);
- backupConfig.getConnectorConfigurations().put(LIVE_NODE_NAME, liveConnector);
- backupConfig.getClusterConfigurations().add(createClusterConnectionConf(backupConnector.getName(),
- backupConnector.getName()));
-
backupConfig.setSecurityEnabled(false);
- backupConfig.setSharedStore(false);
- backupConfig.setBackup(true);
- backupConfig.setLiveConnectorName(LIVE_NODE_NAME);
- backupConfig.setClustered(true);
backupServer = createBackupServer();
backupServer.getServer().setIdentity("idBackup");
- liveConfig = super.createDefaultConfig();
+
liveConfig.getAcceptorConfigurations().clear();
liveConfig.getAcceptorConfigurations().add(getAcceptorTransportConfiguration(true));
- liveConfig.setName(LIVE_NODE_NAME);
- liveConfig.getConnectorConfigurations().put(LIVE_NODE_NAME, liveConnector);
- liveConfig.setSecurityEnabled(false);
- liveConfig.setSharedStore(false);
- liveConfig.setClustered(true);
- liveConfig.getClusterConfigurations().add(createClusterConnectionConf(LIVE_NODE_NAME, LIVE_NODE_NAME));
liveServer = createLiveServer();
liveServer.getServer().setIdentity("idLive");
-
- //liveServer.start();
- //backupServer.start();
}
@Override
@@ -261,8 +229,7 @@
return sf;
}
- protected void waitForBackup(ClientSessionFactoryInternal sf, long seconds)
- throws Exception
+ protected static void waitForBackup(ClientSessionFactoryInternal sf, long seconds) throws Exception
{
long time = System.currentTimeMillis();
long toWait = seconds * 1000;
@@ -288,38 +255,6 @@
System.out.println("sf.getBackupConnector() = " + sf.getBackupConnector());
}
- protected TransportConfiguration getInVMConnectorTransportConfiguration(final boolean live)
- {
- if (live)
- {
- return new TransportConfiguration(InVMConnectorFactory.class.getCanonicalName());
- }
- else
- {
- Map<String, Object> server1Params = new HashMap<String, Object>();
-
- server1Params.put(TransportConstants.SERVER_ID_PROP_NAME, 1);
-
- return new TransportConfiguration(InVMConnectorFactory.class.getCanonicalName(), server1Params);
- }
- }
-
- protected TransportConfiguration getInVMTransportAcceptorConfiguration(final boolean live)
- {
- if (live)
- {
- return new TransportConfiguration(InVMAcceptorFactory.class.getCanonicalName());
- }
- else
- {
- Map<String, Object> server1Params = new HashMap<String, Object>();
-
- server1Params.put(TransportConstants.SERVER_ID_PROP_NAME, 1);
-
- return new TransportConfiguration(InVMAcceptorFactory.class.getCanonicalName(), server1Params);
- }
- }
-
protected TransportConfiguration getNettyAcceptorTransportConfiguration(final boolean live)
{
if (live)
Modified: branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/PagingFailoverTest.java
===================================================================
--- branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/PagingFailoverTest.java 2011-07-13 15:34:24 UTC (rev 10979)
+++ branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/PagingFailoverTest.java 2011-07-13 15:35:14 UTC (rev 10980)
@@ -14,12 +14,9 @@
package org.hornetq.tests.integration.cluster.failover;
import java.util.HashMap;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
import junit.framework.Assert;
-import org.hornetq.api.core.HornetQException;
import org.hornetq.api.core.SimpleString;
import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.api.core.client.ClientConsumer;
@@ -28,17 +25,16 @@
import org.hornetq.api.core.client.ClientSession;
import org.hornetq.api.core.client.ServerLocator;
import org.hornetq.core.client.impl.ClientSessionFactoryInternal;
-import org.hornetq.core.client.impl.ClientSessionInternal;
import org.hornetq.core.config.Configuration;
import org.hornetq.core.server.HornetQServer;
import org.hornetq.core.settings.impl.AddressSettings;
-import org.hornetq.spi.core.protocol.RemotingConnection;
import org.hornetq.tests.integration.cluster.util.SameProcessHornetQServer;
import org.hornetq.tests.integration.cluster.util.TestableServer;
+import org.hornetq.tests.util.TransportConfigurationUtils;
/**
* A PagingFailoverTest
- *
+ *
* TODO: validate replication failover also
*
* @author <mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
@@ -135,10 +131,10 @@
{
crash(session);
}
-
-
+
+
session.close();
-
+
session = sf.createSession(!transacted, !transacted, 0);
session.start();
@@ -161,9 +157,9 @@
}
session.commit();
-
+
cons.close();
-
+
Thread.sleep(1000);
if (!failBeforeConsume)
@@ -173,7 +169,7 @@
}
session.close();
-
+
session = sf.createSession(true, true, 0);
cons = session.createConsumer(PagingFailoverTest.ADDRESS);
@@ -202,23 +198,6 @@
}
}
- /**
- * @param session
- * @param latch
- * @throws InterruptedException
- */
- private void failSession(final ClientSession session, final CountDownLatch latch) throws InterruptedException
- {
- RemotingConnection conn = ((ClientSessionInternal)session).getConnection();
-
- // Simulate failure on connection
- conn.fail(new HornetQException(HornetQException.NOT_CONNECTED));
-
- // Wait to be informed of failure
-
- Assert.assertTrue(latch.await(5000, TimeUnit.MILLISECONDS));
- }
-
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
@@ -226,13 +205,13 @@
@Override
protected TransportConfiguration getAcceptorTransportConfiguration(final boolean live)
{
- return getInVMTransportAcceptorConfiguration(live);
+ return TransportConfigurationUtils.getInVMAcceptor(live);
}
@Override
protected TransportConfiguration getConnectorTransportConfiguration(final boolean live)
{
- return getInVMConnectorTransportConfiguration(live);
+ return TransportConfigurationUtils.getInVMConnector(live);
}
@Override
Modified: branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/replication/ReplicationTest.java
===================================================================
--- branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/replication/ReplicationTest.java 2011-07-13 15:34:24 UTC (rev 10979)
+++ branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/replication/ReplicationTest.java 2011-07-13 15:35:14 UTC (rev 10980)
@@ -14,6 +14,7 @@
package org.hornetq.tests.integration.replication;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
@@ -56,15 +57,19 @@
import org.hornetq.core.persistence.impl.journal.JournalStorageManager;
import org.hornetq.core.persistence.impl.journal.OperationContextImpl;
import org.hornetq.core.protocol.core.Packet;
+import org.hornetq.core.replication.ReplicationManager;
import org.hornetq.core.replication.impl.ReplicatedJournal;
import org.hornetq.core.replication.impl.ReplicationManagerImpl;
+import org.hornetq.core.server.HornetQComponent;
import org.hornetq.core.server.ServerMessage;
import org.hornetq.core.server.impl.HornetQServerImpl;
import org.hornetq.core.server.impl.ServerMessageImpl;
import org.hornetq.core.settings.HierarchicalRepository;
import org.hornetq.core.settings.impl.AddressSettings;
import org.hornetq.spi.core.protocol.RemotingConnection;
+import org.hornetq.tests.util.ReplicatedBackupUtils;
import org.hornetq.tests.util.ServiceTestBase;
+import org.hornetq.tests.util.TransportConfigurationUtils;
import org.hornetq.utils.ExecutorFactory;
import org.hornetq.utils.HornetQThreadFactory;
import org.hornetq.utils.OrderedExecutorFactory;
@@ -76,23 +81,18 @@
public class ReplicationTest extends ServiceTestBase
{
- // Constants -----------------------------------------------------
-
- // Attributes ----------------------------------------------------
-
private ThreadFactory tFactory;
-
private ExecutorService executor;
-
private ExecutorFactory factory;
-
private ScheduledExecutorService scheduledExecutor;
- private HornetQServerImpl server;
+ private HornetQServerImpl backupServer;
+ /** This field is not always used. */
+ private HornetQServerImpl liveServer;
private ServerLocator locator;
- private ReplicationManagerImpl manager;
+ private ReplicationManager manager;
// Static --------------------------------------------------------
@@ -100,32 +100,57 @@
// Public --------------------------------------------------------
- public void testBasicConnection() throws Exception
+ private void setupServer(boolean backup, boolean netty, String... interceptors) throws Exception
{
- boolean backup = true;
- boolean netty = false;
- setupServer(backup, netty);
- manager = new ReplicationManagerImpl(locator.createSessionFactory().getConnection(), factory);
- manager.start();
- manager.stop();
+ assert backup; // XXX
+
+ Configuration backupConfig = createDefaultConfig(netty);
+ Configuration liveConfig = createDefaultConfig(netty);
+ backupConfig.setBackup(backup);
+ if (interceptors.length > 0)
+ {
+ List<String> interceptorsList = Arrays.asList(interceptors);
+ backupConfig.setInterceptorClassNames(interceptorsList);
+ }
+
+ TransportConfiguration liveConnector = TransportConfigurationUtils.getInVMConnector(true);
+ TransportConfiguration backupConnector = TransportConfigurationUtils.getInVMConnector(false);
+ TransportConfiguration backupAcceptor = TransportConfigurationUtils.getInVMAcceptor(false);
+
+ ReplicatedBackupUtils.configureReplicationPair(backupConfig, backupConnector, backupAcceptor, liveConfig,
+ liveConnector);
+ if (backup)
+ {
+ liveServer = new HornetQServerImpl(liveConfig);
+ liveServer.start();
+ waitForComponent(liveServer);
+ }
+
+ backupServer = new HornetQServerImpl(backupConfig);
+ locator = HornetQClient.createServerLocatorWithoutHA(new TransportConfiguration(INVM_CONNECTOR_FACTORY));
+ backupServer.start();
+ Thread.sleep(200); // XXX improve this
+ waitForComponent(backupServer);
}
- private void setupServer(boolean backup, boolean netty) throws Exception
+ private static void waitForComponent(HornetQComponent component) throws Exception
{
- Configuration config = createDefaultConfig(netty);
- config.setBackup(backup);
- server = new HornetQServerImpl(config);
- locator = HornetQClient.createServerLocatorWithoutHA(new TransportConfiguration(INVM_CONNECTOR_FACTORY));
- server.start();
+ waitForComponent(component, 3);
}
+ public void testBasicConnection() throws Exception
+ {
+ setupServer(true, false);
+ waitForComponent(liveServer.getReplicationManager());
+ }
+
public void testInvalidJournal() throws Exception
{
setupServer(true, false);
+ manager = liveServer.getReplicationManager();
+ waitForComponent(manager);
- manager = new ReplicationManagerImpl(locator.createSessionFactory().getConnection(), factory);
- manager.start();
try
{
manager.compareJournals(new JournalLoadInformation[] { new JournalLoadInformation(2, 2),
@@ -134,7 +159,8 @@
}
catch (HornetQException e)
{
- e.printStackTrace();
+ if (e.getCode() != HornetQException.ILLEGAL_STATE)
+ e.printStackTrace();
Assert.assertEquals(HornetQException.ILLEGAL_STATE, e.getCode());
}
@@ -148,10 +174,8 @@
setupServer(true, false);
- manager = new ReplicationManagerImpl(locator.createSessionFactory().getConnection(), factory);
-
- manager.start();
-
+ manager = liveServer.getReplicationManager();
+ waitForComponent(manager);
try
{
ReplicationManagerImpl manager2 =
@@ -162,17 +186,19 @@
}
catch (Exception e)
{
+ // expected
}
+
}
public void testConnectIntoNonBackup() throws Exception
{
setupServer(false, false);
- manager = new ReplicationManagerImpl(locator.createSessionFactory().getConnection(), factory);
-
try
{
+
+ manager = new ReplicationManagerImpl(locator.createSessionFactory().getConnection(), factory);
manager.start();
Assert.fail("Exception was expected");
}
@@ -188,8 +214,8 @@
StorageManager storage = getStorage();
- manager = new ReplicationManagerImpl(locator.createSessionFactory().getConnection(), factory);
- manager.start();
+ manager = liveServer.getReplicationManager();
+ waitForComponent(manager);
Journal replicatedJournal = new ReplicatedJournal((byte)1, new FakeJournal(), manager);
@@ -226,8 +252,8 @@
blockOnReplication(storage, manager);
PagingManager pagingManager =
- createPageManager(server.getStorageManager(), server.getConfiguration(), server.getExecutorFactory(),
- server.getAddressSettingsRepository());
+ createPageManager(backupServer.getStorageManager(), backupServer.getConfiguration(), backupServer.getExecutorFactory(),
+ backupServer.getAddressSettingsRepository());
PagingStore store = pagingManager.getPageStore(dummy);
store.start();
@@ -266,25 +292,10 @@
public void testSendPacketsWithFailure() throws Exception
{
- Configuration config = createDefaultConfig(false);
+ setupServer(true, false, TestInterceptor.class.getName());
- config.setBackup(true);
-
- ArrayList<String> intercepts = new ArrayList<String>();
-
- intercepts.add(TestInterceptor.class.getName());
-
- config.setInterceptorClassNames(intercepts);
-
- server = new HornetQServerImpl(config);
-
- server.start();
-
- locator = HornetQClient.createServerLocatorWithoutHA(new TransportConfiguration(INVM_CONNECTOR_FACTORY));
-
StorageManager storage = getStorage();
- manager = new ReplicationManagerImpl(locator.createSessionFactory().getConnection(), factory);
- manager.start();
+ manager = liveServer.getReplicationManager();
Journal replicatedJournal = new ReplicatedJournal((byte)1, new FakeJournal(), manager);
@@ -309,7 +320,7 @@
}
});
- server.stop();
+ backupServer.stop();
Assert.assertTrue(latch.await(50, TimeUnit.SECONDS));
@@ -408,7 +419,7 @@
* @param manager
* @return
*/
- private void blockOnReplication(final StorageManager storage, final ReplicationManagerImpl manager) throws Exception
+ private void blockOnReplication(final StorageManager storage, final ReplicationManager manager) throws Exception
{
final CountDownLatch latch = new CountDownLatch(1);
storage.afterCompleteOperations(new IOAsyncTask()
@@ -427,29 +438,29 @@
Assert.assertTrue(latch.await(30, TimeUnit.SECONDS));
}
- public void testNoServer() throws Exception
- {
- locator = HornetQClient.createServerLocatorWithoutHA(new TransportConfiguration(INVM_CONNECTOR_FACTORY));
+// public void testNoServer() throws Exception
+// {
+// locator = HornetQClient.createServerLocatorWithoutHA(new TransportConfiguration(INVM_CONNECTOR_FACTORY));
+//
+// try
+// {
+// manager = new ReplicationManagerImpl(locator.createSessionFactory().getConnection(), factory);
+// manager.start();
+// Assert.fail("Exception expected");
+// }
+// catch (HornetQException expected)
+// {
+// Assert.assertEquals(HornetQException.ILLEGAL_STATE, expected.getCode());
+// }
+// }
- try
- {
- manager = new ReplicationManagerImpl(locator.createSessionFactory().getConnection(), factory);
- manager.start();
- Assert.fail("Exception expected");
- }
- catch (HornetQException expected)
- {
- Assert.assertEquals(HornetQException.ILLEGAL_STATE, expected.getCode());
- }
- }
-
public void testNoActions() throws Exception
{
setupServer(true, false);
StorageManager storage = getStorage();
- manager = new ReplicationManagerImpl(locator.createSessionFactory().getConnection(), factory);
- manager.start();
+ manager = liveServer.getReplicationManager();
+ waitForComponent(manager);
Journal replicatedJournal = new ReplicatedJournal((byte)1, new FakeJournal(), manager);
@@ -482,9 +493,7 @@
final ArrayList<Integer> executions = new ArrayList<Integer>();
StorageManager storage = getStorage();
- manager = new ReplicationManagerImpl(locator.createSessionFactory().getConnection(), factory);
- manager.start();
-
+ manager = liveServer.getReplicationManager();
Journal replicatedJournal = new ReplicatedJournal((byte)1, new FakeJournal(), manager);
int numberOfAdds = 200;
@@ -568,19 +577,13 @@
@Override
protected void tearDown() throws Exception
{
- if (manager != null)
- {
- if (manager.isStarted())
- manager.stop();
- manager = null;
- }
- if (server != null)
- {
- if (server.isStarted())
- server.stop();
- server = null;
- }
+ stopComponent(manager);
+ manager = null;
+ stopComponent(liveServer);
+ liveServer = null;
+ stopComponent(backupServer);
+ backupServer = null;
executor.shutdown();
@@ -593,6 +596,7 @@
}
+
protected
PagingManager
createPageManager(final StorageManager storageManager,
@@ -613,7 +617,7 @@
// Private -------------------------------------------------------
// Inner classes -------------------------------------------------
- public static class TestInterceptor implements Interceptor
+ public static final class TestInterceptor implements Interceptor
{
static AtomicBoolean value = new AtomicBoolean(true);
Added: branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/util/ReplicatedBackupUtils.java
===================================================================
--- branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/util/ReplicatedBackupUtils.java (rev 0)
+++ branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/util/ReplicatedBackupUtils.java 2011-07-13 15:35:14 UTC (rev 10980)
@@ -0,0 +1,71 @@
+/**
+ *
+ */
+package org.hornetq.tests.util;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+
+import org.hornetq.api.core.TransportConfiguration;
+import org.hornetq.core.config.ClusterConnectionConfiguration;
+import org.hornetq.core.config.Configuration;
+
+public final class ReplicatedBackupUtils
+{
+ private static final String LIVE_NODE_NAME = "hqLIVE";
+ private ReplicatedBackupUtils()
+ {
+ // Utility class
+ }
+
+ /**
+ * Creates a {@link ClusterConnectionConfiguration} and adds it to the {@link Configuration}.
+ * @param configuration
+ * @param name
+ * @param connectors
+ */
+ public static void createClusterConnectionConf(Configuration configuration, String name, String... connectors)
+ {
+
+ List<String> conn = new ArrayList<String>(connectors.length);
+ for (String iConn : connectors)
+ {
+ conn.add(iConn);
+ }
+ ClusterConnectionConfiguration clusterConfig =
+ new ClusterConnectionConfiguration("cluster1", "jms", name, -1, false, false, 1, 1, conn, false);
+ configuration.getClusterConfigurations().add(clusterConfig);
+ }
+
+ public static void configureReplicationPair(Configuration backupConfig,
+ TransportConfiguration backupConnector,
+ TransportConfiguration backupAcceptor,
+ Configuration liveConfig,
+ TransportConfiguration liveConnector)
+ {
+ if (backupAcceptor != null)
+ {
+ Set<TransportConfiguration> backupAcceptorSet = backupConfig.getAcceptorConfigurations();
+ backupAcceptorSet.clear();
+ backupAcceptorSet.add(backupAcceptor);
+ }
+
+ backupConfig.getConnectorConfigurations().put(backupConnector.getName(), backupConnector);
+ backupConfig.getConnectorConfigurations().put(LIVE_NODE_NAME, liveConnector);
+ ReplicatedBackupUtils.createClusterConnectionConf(backupConfig, backupConnector.getName(),
+ backupConnector.getName());
+
+ backupConfig.setSharedStore(false);
+ backupConfig.setBackup(true);
+ backupConfig.setLiveConnectorName(LIVE_NODE_NAME);
+ backupConfig.setClustered(true);
+
+ liveConfig.setName(LIVE_NODE_NAME);
+ liveConfig.getConnectorConfigurations().put(LIVE_NODE_NAME, liveConnector);
+ liveConfig.setSecurityEnabled(false);
+ liveConfig.setSharedStore(false);
+ liveConfig.setClustered(true);
+ ReplicatedBackupUtils.createClusterConnectionConf(liveConfig, LIVE_NODE_NAME, LIVE_NODE_NAME);
+ }
+}
Added: branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/util/TransportConfigurationUtils.java
===================================================================
--- branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/util/TransportConfigurationUtils.java (rev 0)
+++ branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/util/TransportConfigurationUtils.java 2011-07-13 15:35:14 UTC (rev 10980)
@@ -0,0 +1,38 @@
+package org.hornetq.tests.util;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.hornetq.api.core.TransportConfiguration;
+import org.hornetq.core.remoting.impl.invm.InVMAcceptorFactory;
+import org.hornetq.core.remoting.impl.invm.InVMConnectorFactory;
+import org.hornetq.core.remoting.impl.invm.TransportConstants;
+
+public final class TransportConfigurationUtils
+{
+
+ public static TransportConfiguration getInVMAcceptor(final boolean live)
+ {
+ if (live)
+ {
+ return new TransportConfiguration(InVMAcceptorFactory.class.getCanonicalName());
+ }
+
+ Map<String, Object> server1Params = new HashMap<String, Object>();
+ server1Params.put(TransportConstants.SERVER_ID_PROP_NAME, 1);
+ return new TransportConfiguration(InVMAcceptorFactory.class.getCanonicalName(), server1Params);
+ }
+
+ public static TransportConfiguration getInVMConnector(final boolean live)
+ {
+ if (live)
+ {
+ return new TransportConfiguration(InVMConnectorFactory.class.getCanonicalName());
+ }
+
+ Map<String, Object> server1Params = new HashMap<String, Object>();
+ server1Params.put(TransportConstants.SERVER_ID_PROP_NAME, 1);
+ return new TransportConfiguration(InVMConnectorFactory.class.getCanonicalName(), server1Params);
+ }
+
+}
More information about the hornetq-commits
mailing list