Author: jmesnil
Date: 2010-09-13 09:35:19 -0400 (Mon, 13 Sep 2010)
New Revision: 9675
Added:
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/MultipleBackupsFailoverTestBase.java
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/MultipleLivesMultipleBackupsFailoverTest.java
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/RemoteMultipleLivesMultipleBackupsFailoverTest.java
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/RemoteSingleLiveMultipleBackupsFailoverTest.java
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/SingleLiveMultipleBackupsFailoverTest.java
Removed:
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/MultipleBackupFailoverTest.java
Modified:
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/util/RemoteProcessHornetQServer.java
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/util/RemoteProcessHornetQServerSupport.java
Log:
add tests for multiple backups on single/multiple live nodes (using either invm or remote
processes)
Modified:
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java
===================================================================
---
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java 2010-09-13
07:44:26 UTC (rev 9674)
+++
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java 2010-09-13
13:35:19 UTC (rev 9675)
@@ -97,13 +97,12 @@
FakeLockFile.clearLocks();
createConfigs();
+ liveServer.start();
+
if (backupServer != null)
{
backupServer.start();
}
-
- liveServer.start();
-
}
protected TestableServer createLiveServer()
Deleted:
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/MultipleBackupFailoverTest.java
===================================================================
---
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/MultipleBackupFailoverTest.java 2010-09-13
07:44:26 UTC (rev 9674)
+++
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/MultipleBackupFailoverTest.java 2010-09-13
13:35:19 UTC (rev 9675)
@@ -1,370 +0,0 @@
-/*
- * Copyright 2009 Red Hat, Inc.
- * Red Hat licenses this file to you under the Apache License, version
- * 2.0 (the "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- * implied. See the License for the specific language governing
- * permissions and limitations under the License.
- */
-
-package org.hornetq.tests.integration.cluster.failover;
-
-import junit.framework.Assert;
-import org.hornetq.api.core.HornetQException;
-import org.hornetq.api.core.Pair;
-import org.hornetq.api.core.SimpleString;
-import org.hornetq.api.core.TransportConfiguration;
-import org.hornetq.api.core.client.*;
-import org.hornetq.core.client.impl.ClientSessionFactoryInternal;
-import org.hornetq.core.client.impl.ServerLocatorImpl;
-import org.hornetq.core.config.BackupConnectorConfiguration;
-import org.hornetq.core.config.ClusterConnectionConfiguration;
-import org.hornetq.core.config.Configuration;
-import org.hornetq.core.remoting.impl.invm.TransportConstants;
-import org.hornetq.core.server.ActivateCallback;
-import org.hornetq.core.server.HornetQServer;
-import org.hornetq.core.server.cluster.impl.FakeLockFile;
-import org.hornetq.jms.client.HornetQTextMessage;
-import org.hornetq.tests.integration.cluster.util.SameProcessHornetQServer;
-import org.hornetq.tests.integration.cluster.util.TestableServer;
-import org.hornetq.tests.util.ServiceTestBase;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-
-/**
- */
-public class MultipleBackupFailoverTest extends ServiceTestBase
-{
- private ArrayList<TestableServer> servers = new
ArrayList<TestableServer>(5);
-
- @Override
- protected void setUp() throws Exception
- {
- super.setUp();
- clearData();
- FakeLockFile.clearLocks();
- }
-
- public void testMultipleFailovers() throws Exception
- {
- createLiveConfig(0);
- createBackupConfig(0, 1,false, 0, 2, 3, 4, 5);
- createBackupConfig(0, 2,false, 0, 1, 3, 4, 5);
- createBackupConfig(0, 3,false, 0, 1, 2, 4, 5);
- createBackupConfig(0, 4, false, 0, 1, 2, 3, 4);
- createBackupConfig(0, 5, false, 0, 1, 2, 3, 4);
- servers.get(1).start();
- servers.get(2).start();
- servers.get(3).start();
- servers.get(4).start();
- servers.get(5).start();
- servers.get(0).start();
-
- ServerLocator locator = getServerLocator(0);
-
- locator.setBlockOnNonDurableSend(true);
- locator.setBlockOnDurableSend(true);
- locator.setBlockOnAcknowledge(true);
- locator.setReconnectAttempts(-1);
- ClientSessionFactoryInternal sf = createSessionFactoryAndWaitForTopology(locator,
2);
- int backupNode;
- ClientSession session = sendAndConsume(sf, true);
- System.out.println("failing node 0");
- fail(0, session);
- session.close();
- backupNode = waitForBackup(5);
- session = sendAndConsume(sf, false);
- System.out.println("failing node " + backupNode);
- fail(backupNode, session);
- session.close();
- backupNode = waitForBackup(5);
- session = sendAndConsume(sf, false);
- System.out.println("failing node " + backupNode);
- fail(backupNode, session);
- session.close();
- backupNode = waitForBackup(5);
- session = sendAndConsume(sf, false);
- System.out.println("failing node " + backupNode);
- fail(backupNode, session);
- session.close();
- backupNode = waitForBackup(5);
- session = sendAndConsume(sf, false);
- System.out.println("failing node " + backupNode);
- fail(backupNode, session);
- session.close();
- backupNode = waitForBackup(5);
- session = sendAndConsume(sf, false);
- session.close();
- servers.get(backupNode).stop();
- System.out.println("MultipleBackupFailoverTest.test");
- }
-
- public void testMultipleFailovers2liveservers() throws Exception
- {
- createLiveConfig(0, 3);
- createBackupConfig(0, 1, true, 0, 3);
- createBackupConfig(0, 2,true, 0, 3);
- createLiveConfig(3, 0);
- createBackupConfig(3, 4, true,0, 3);
- createBackupConfig(3, 5, true,0, 3);
- servers.get(1).start();
- servers.get(2).start();
- servers.get(0).start();
- servers.get(4).start();
- servers.get(5).start();
- servers.get(3).start();
- ServerLocator locator = getServerLocator(0);
-
- locator.setBlockOnNonDurableSend(true);
- locator.setBlockOnDurableSend(true);
- locator.setBlockOnAcknowledge(true);
- locator.setReconnectAttempts(-1);
- ClientSessionFactoryInternal sf = createSessionFactoryAndWaitForTopology(locator,
4);
- ClientSession session = sendAndConsume(sf, true);
-
- fail(0, session);
-
- ServerLocator locator2 = getServerLocator(3);
- locator2.setBlockOnNonDurableSend(true);
- locator2.setBlockOnDurableSend(true);
- locator2.setBlockOnAcknowledge(true);
- locator2.setReconnectAttempts(-1);
- ClientSessionFactoryInternal sf2 = createSessionFactoryAndWaitForTopology(locator2,
4);
- ClientSession session2 = sendAndConsume(sf2, true);
- fail(3, session2);
- servers.get(2).stop();
- servers.get(4).stop();
- servers.get(1).stop();
- servers.get(3).stop();
- }
-
- protected void fail(int node, final ClientSession... sessions) throws Exception
- {
- servers.get(node).crash(sessions);
- }
-
- protected int waitForBackup(long seconds)
- {
- long time = System.currentTimeMillis();
- long toWait = seconds * 1000;
- while (true)
- {
- for (int i = 0, serversSize = servers.size(); i < serversSize; i++)
- {
- TestableServer backupServer = servers.get(i);
- if (backupServer.isInitialised())
- {
- return i;
- }
- }
- try
- {
- Thread.sleep(100);
- }
- catch (InterruptedException e)
- {
- //ignore
- }
- if (System.currentTimeMillis() > (time + toWait))
- {
- fail("backup server never started");
- }
- }
- }
-
-
- private void createBackupConfig(int liveNode, int nodeid, boolean
createClusterConnections, int... nodes)
- {
- Configuration config1 = super.createDefaultConfig();
- config1.getAcceptorConfigurations().clear();
-
config1.getAcceptorConfigurations().add(getAcceptorTransportConfiguration(nodeid));
- config1.setSecurityEnabled(false);
- config1.setSharedStore(true);
- config1.setBackup(true);
- config1.setClustered(true);
- List<String> staticConnectors = new ArrayList<String>();
-
- for (int node : nodes)
- {
- TransportConfiguration liveConnector =
getConnectorTransportConfiguration(node);
- config1.getConnectorConfigurations().put(liveConnector.getName(),
liveConnector);
- staticConnectors.add(liveConnector.getName());
- }
- TransportConfiguration backupConnector =
getConnectorTransportConfiguration(nodeid);
- List<String> pairs = null;
- ClusterConnectionConfiguration ccc1 = new
ClusterConnectionConfiguration("cluster1", "jms",
backupConnector.getName(), -1, false, false, 1, 1,
- createClusterConnections? staticConnectors:pairs);
- config1.getClusterConfigurations().add(ccc1);
- BackupConnectorConfiguration connectorConfiguration = new
BackupConnectorConfiguration(staticConnectors, backupConnector.getName());
- config1.setBackupConnectorConfiguration(connectorConfiguration);
- config1.getConnectorConfigurations().put(backupConnector.getName(),
backupConnector);
-
-
- config1.setBindingsDirectory(config1.getBindingsDirectory() + "_" +
liveNode);
- config1.setJournalDirectory(config1.getJournalDirectory() + "_" +
liveNode);
- config1.setPagingDirectory(config1.getPagingDirectory() + "_" +
liveNode);
- config1.setLargeMessagesDirectory(config1.getLargeMessagesDirectory() +
"_" + liveNode);
-
- servers.add(new SameProcessHornetQServer(createFakeLockServer(true, config1)));
- }
-
- public void createLiveConfig(int liveNode, int ... otherLiveNodes)
- {
- TransportConfiguration liveConnector =
getConnectorTransportConfiguration(liveNode);
- Configuration config0 = super.createDefaultConfig();
- config0.getAcceptorConfigurations().clear();
-
config0.getAcceptorConfigurations().add(getAcceptorTransportConfiguration(liveNode));
- config0.setSecurityEnabled(false);
- config0.setSharedStore(true);
- config0.setClustered(true);
- List<String> pairs = new ArrayList<String>();
- for (int node : otherLiveNodes)
- {
- TransportConfiguration otherLiveConnector =
getConnectorTransportConfiguration(node);
- config0.getConnectorConfigurations().put(otherLiveConnector.getName(),
otherLiveConnector);
- pairs.add(otherLiveConnector.getName());
-
- }
- ClusterConnectionConfiguration ccc0 = new
ClusterConnectionConfiguration("cluster1", "jms",
liveConnector.getName(), -1, false, false, 1, 1,
- pairs);
- config0.getClusterConfigurations().add(ccc0);
- config0.getConnectorConfigurations().put(liveConnector.getName(), liveConnector);
-
- config0.setBindingsDirectory(config0.getBindingsDirectory() + "_" +
liveNode);
- config0.setJournalDirectory(config0.getJournalDirectory() + "_" +
liveNode);
- config0.setPagingDirectory(config0.getPagingDirectory() + "_" +
liveNode);
- config0.setLargeMessagesDirectory(config0.getLargeMessagesDirectory() +
"_" + liveNode);
-
- servers.add(new SameProcessHornetQServer(createFakeLockServer(true, config0)));
- }
-
- private TransportConfiguration getConnectorTransportConfiguration(int node)
- {
- HashMap<String, Object> map = new HashMap<String, Object>();
- map.put(TransportConstants.SERVER_ID_PROP_NAME, node);
- return new TransportConfiguration(INVM_CONNECTOR_FACTORY, map);
- }
-
- private TransportConfiguration getAcceptorTransportConfiguration(int node)
- {
- HashMap<String, Object> map = new HashMap<String, Object>();
- map.put(TransportConstants.SERVER_ID_PROP_NAME, node);
- return new TransportConfiguration(INVM_ACCEPTOR_FACTORY, map);
- }
-
- protected ClientSessionFactoryInternal
createSessionFactoryAndWaitForTopology(ServerLocator locator, int topologyMembers)
- throws Exception
- {
- ClientSessionFactoryInternal sf;
- CountDownLatch countDownLatch = new CountDownLatch(topologyMembers);
-
- locator.addClusterTopologyListener(new
LatchClusterTopologyListener(countDownLatch));
-
- sf = (ClientSessionFactoryInternal) locator.createSessionFactory();
-
- boolean ok = countDownLatch.await(5, TimeUnit.SECONDS);
- assertTrue(ok);
- return sf;
- }
-
- public ServerLocator getServerLocator(int... nodes)
- {
- TransportConfiguration[] configs = new TransportConfiguration[nodes.length];
- for (int i = 0, configsLength = configs.length; i < configsLength; i++)
- {
- HashMap<String, Object> map = new HashMap<String, Object>();
- map.put(TransportConstants.SERVER_ID_PROP_NAME, nodes[i]);
- configs[i] = new TransportConfiguration(INVM_CONNECTOR_FACTORY, map);
-
- }
- return new ServerLocatorImpl(true, configs);
- }
-
- private ClientSession sendAndConsume(final ClientSessionFactory sf, final boolean
createQueue) throws Exception
- {
- ClientSession session = sf.createSession(false, true, true);
-
- if (createQueue)
- {
- session.createQueue(FailoverTestBase.ADDRESS, FailoverTestBase.ADDRESS, null,
false);
- }
-
- ClientProducer producer = session.createProducer(FailoverTestBase.ADDRESS);
-
- final int numMessages = 1000;
-
- for (int i = 0; i < numMessages; i++)
- {
- ClientMessage message = session.createMessage(HornetQTextMessage.TYPE,
- false,
- 0,
- System.currentTimeMillis(),
- (byte) 1);
- message.putIntProperty(new SimpleString("count"), i);
- message.getBodyBuffer().writeString("aardvarks");
- producer.send(message);
- }
-
- ClientConsumer consumer = session.createConsumer(FailoverTestBase.ADDRESS);
-
- session.start();
-
- for (int i = 0; i < numMessages; i++)
- {
- ClientMessage message2 = consumer.receive();
-
- Assert.assertEquals("aardvarks",
message2.getBodyBuffer().readString());
-
- Assert.assertEquals(i, message2.getObjectProperty(new
SimpleString("count")));
-
- message2.acknowledge();
- }
-
- ClientMessage message3 = consumer.receiveImmediate();
-
- Assert.assertNull(message3);
-
- return session;
- }
-
- class LatchClusterTopologyListener implements ClusterTopologyListener
- {
- final CountDownLatch latch;
- int liveNodes = 0;
- int backUpNodes = 0;
- List<String> liveNode = new ArrayList<String>();
- List<String> backupNode = new ArrayList<String>();
-
- public LatchClusterTopologyListener(CountDownLatch latch)
- {
- this.latch = latch;
- }
-
- public void nodeUP(String nodeID, Pair<TransportConfiguration,
TransportConfiguration> connectorPair, boolean last, int distance)
- {
- if (connectorPair.a != null &&
!liveNode.contains(connectorPair.a.getName()))
- {
- liveNode.add(connectorPair.a.getName());
- latch.countDown();
- }
- if (connectorPair.b != null &&
!backupNode.contains(connectorPair.b.getName()))
- {
- backupNode.add(connectorPair.b.getName());
- latch.countDown();
- }
- }
-
- public void nodeDown(String nodeID)
- {
- //To change body of implemented methods use File | Settings | File Templates.
- }
- }
-}
Added:
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/MultipleBackupsFailoverTestBase.java
===================================================================
---
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/MultipleBackupsFailoverTestBase.java
(rev 0)
+++
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/MultipleBackupsFailoverTestBase.java 2010-09-13
13:35:19 UTC (rev 9675)
@@ -0,0 +1,216 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.tests.integration.cluster.failover;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import junit.framework.Assert;
+
+import org.hornetq.api.core.Pair;
+import org.hornetq.api.core.SimpleString;
+import org.hornetq.api.core.TransportConfiguration;
+import org.hornetq.api.core.client.ClientConsumer;
+import org.hornetq.api.core.client.ClientMessage;
+import org.hornetq.api.core.client.ClientProducer;
+import org.hornetq.api.core.client.ClientSession;
+import org.hornetq.api.core.client.ClientSessionFactory;
+import org.hornetq.api.core.client.ClusterTopologyListener;
+import org.hornetq.api.core.client.ServerLocator;
+import org.hornetq.core.client.impl.ClientSessionFactoryInternal;
+import org.hornetq.core.client.impl.ServerLocatorImpl;
+import org.hornetq.core.server.cluster.impl.FakeLockFile;
+import org.hornetq.jms.client.HornetQTextMessage;
+import org.hornetq.tests.integration.cluster.util.TestableServer;
+import org.hornetq.tests.util.ServiceTestBase;
+
+/**
+ * A MultipleBackupsFailoverTestBase
+ *
+ * @author jmesnil
+ *
+ *
+ */
+public abstract class MultipleBackupsFailoverTestBase extends ServiceTestBase
+{
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ // Static -------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ // Public --------------------------------------------------------
+
+ @Override
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+ clearData();
+ FakeLockFile.clearLocks();
+ }
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ protected abstract boolean isNetty();
+
+ protected int waitForBackup(long seconds, List<TestableServer> servers, int...
nodes)
+ {
+ long time = System.currentTimeMillis();
+ long toWait = seconds * 1000;
+ while (true)
+ {
+ for (int node : nodes)
+ {
+ TestableServer backupServer = servers.get(node);
+ if (backupServer.isInitialised())
+ {
+ return node;
+ }
+ }
+ try
+ {
+ Thread.sleep(100);
+ }
+ catch (InterruptedException e)
+ {
+ // ignore
+ }
+ if (System.currentTimeMillis() > (time + toWait))
+ {
+ fail("backup server never started");
+ }
+ }
+ }
+
+ protected ClientSession sendAndConsume(final ClientSessionFactory sf, final boolean
createQueue) throws Exception
+ {
+ ClientSession session = sf.createSession(false, true, true);
+
+ if (createQueue)
+ {
+ session.createQueue(FailoverTestBase.ADDRESS, FailoverTestBase.ADDRESS, null,
false);
+ }
+
+ ClientProducer producer = session.createProducer(FailoverTestBase.ADDRESS);
+
+ final int numMessages = 1000;
+
+ for (int i = 0; i < numMessages; i++)
+ {
+ ClientMessage message = session.createMessage(HornetQTextMessage.TYPE,
+ false,
+ 0,
+ System.currentTimeMillis(),
+ (byte)1);
+ message.putIntProperty(new SimpleString("count"), i);
+ message.getBodyBuffer().writeString("aardvarks");
+ producer.send(message);
+ }
+
+ ClientConsumer consumer = session.createConsumer(FailoverTestBase.ADDRESS);
+
+ session.start();
+
+ for (int i = 0; i < numMessages; i++)
+ {
+ ClientMessage message2 = consumer.receive();
+
+ Assert.assertEquals("aardvarks",
message2.getBodyBuffer().readString());
+
+ Assert.assertEquals(i, message2.getObjectProperty(new
SimpleString("count")));
+
+ message2.acknowledge();
+ }
+
+ ClientMessage message3 = consumer.receiveImmediate();
+
+ Assert.assertNull(message3);
+
+ return session;
+ }
+
+ protected ClientSessionFactoryInternal
createSessionFactoryAndWaitForTopology(ServerLocator locator,
+ int
topologyMembers) throws Exception
+ {
+ ClientSessionFactoryInternal sf;
+ CountDownLatch countDownLatch = new CountDownLatch(topologyMembers);
+
+ locator.addClusterTopologyListener(new
LatchClusterTopologyListener(countDownLatch));
+
+ sf = (ClientSessionFactoryInternal)locator.createSessionFactory();
+
+ boolean ok = countDownLatch.await(5, TimeUnit.SECONDS);
+ assertTrue(ok);
+ return sf;
+ }
+
+ public ServerLocator getServerLocator(int... nodes)
+ {
+ TransportConfiguration[] configs = new TransportConfiguration[nodes.length];
+ for (int i = 0, configsLength = configs.length; i < configsLength; i++)
+ {
+ configs[i] = createTransportConfiguration(isNetty(), false,
generateParams(nodes[i], isNetty()));
+ }
+ return new ServerLocatorImpl(true, configs);
+ }
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+
+ class LatchClusterTopologyListener implements ClusterTopologyListener
+ {
+ final CountDownLatch latch;
+
+ int liveNodes = 0;
+
+ int backUpNodes = 0;
+
+ List<String> liveNode = new ArrayList<String>();
+
+ List<String> backupNode = new ArrayList<String>();
+
+ public LatchClusterTopologyListener(CountDownLatch latch)
+ {
+ this.latch = latch;
+ }
+
+ public void nodeUP(String nodeID,
+ Pair<TransportConfiguration, TransportConfiguration>
connectorPair,
+ boolean last,
+ int distance)
+ {
+ if (connectorPair.a != null &&
!liveNode.contains(connectorPair.a.getName()))
+ {
+ liveNode.add(connectorPair.a.getName());
+ latch.countDown();
+ }
+ if (connectorPair.b != null &&
!backupNode.contains(connectorPair.b.getName()))
+ {
+ backupNode.add(connectorPair.b.getName());
+ latch.countDown();
+ }
+ }
+
+ public void nodeDown(String nodeID)
+ {
+ }
+ }
+}
Added:
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/MultipleLivesMultipleBackupsFailoverTest.java
===================================================================
---
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/MultipleLivesMultipleBackupsFailoverTest.java
(rev 0)
+++
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/MultipleLivesMultipleBackupsFailoverTest.java 2010-09-13
13:35:19 UTC (rev 9675)
@@ -0,0 +1,164 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.tests.integration.cluster.failover;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.hornetq.api.core.TransportConfiguration;
+import org.hornetq.api.core.client.ClientSession;
+import org.hornetq.api.core.client.ServerLocator;
+import org.hornetq.core.client.impl.ClientSessionFactoryInternal;
+import org.hornetq.core.config.BackupConnectorConfiguration;
+import org.hornetq.core.config.ClusterConnectionConfiguration;
+import org.hornetq.core.config.Configuration;
+import org.hornetq.tests.integration.cluster.util.SameProcessHornetQServer;
+import org.hornetq.tests.integration.cluster.util.TestableServer;
+
+/**
+ */
+public class MultipleLivesMultipleBackupsFailoverTest extends
MultipleBackupsFailoverTestBase
+{
+ protected ArrayList<TestableServer> servers = new
ArrayList<TestableServer>(5);
+
+ public void testMultipleFailovers2LiveServers() throws Exception
+ {
+ createLiveConfig(0, 3);
+ createBackupConfig(0, 1, true, 0, 3);
+ createBackupConfig(0, 2,true, 0, 3);
+ createLiveConfig(3, 0);
+ createBackupConfig(3, 4, true,0, 3);
+ createBackupConfig(3, 5, true,0, 3);
+ servers.get(0).start();
+ servers.get(3).start();
+ servers.get(1).start();
+ servers.get(2).start();
+ servers.get(4).start();
+ servers.get(5).start();
+ ServerLocator locator = getServerLocator(0);
+
+ locator.setBlockOnNonDurableSend(true);
+ locator.setBlockOnDurableSend(true);
+ locator.setBlockOnAcknowledge(true);
+ locator.setReconnectAttempts(-1);
+ ClientSessionFactoryInternal sf = createSessionFactoryAndWaitForTopology(locator,
4);
+ ClientSession session = sendAndConsume(sf, true);
+
+ servers.get(0).crash(session);
+
+ int liveAfter0 = waitForBackup(10000, servers, 1, 2);
+
+ ServerLocator locator2 = getServerLocator(3);
+ locator2.setBlockOnNonDurableSend(true);
+ locator2.setBlockOnDurableSend(true);
+ locator2.setBlockOnAcknowledge(true);
+ locator2.setReconnectAttempts(-1);
+ ClientSessionFactoryInternal sf2 = createSessionFactoryAndWaitForTopology(locator2,
4);
+ ClientSession session2 = sendAndConsume(sf2, true);
+ servers.get(3).crash(session2);
+ int liveAfter3 = waitForBackup(10000, servers, 4, 5);
+
+ if (liveAfter0 == 2)
+ {
+ servers.get(1).stop();
+ servers.get(2).stop();
+ }
+ else
+ {
+ servers.get(2).stop();
+ servers.get(1).stop();
+ }
+
+ if (liveAfter3 == 4)
+ {
+ servers.get(5).stop();
+ servers.get(4).stop();
+ }
+ else
+ {
+ servers.get(4).stop();
+ servers.get(5).stop();
+ }
+ }
+
+ protected void createBackupConfig(int liveNode, int nodeid, boolean
createClusterConnections, int... nodes)
+ {
+ Configuration config1 = super.createDefaultConfig();
+ config1.getAcceptorConfigurations().clear();
+ config1.getAcceptorConfigurations().add(createTransportConfiguration(isNetty(),
true, generateParams(nodeid, isNetty())));
+ config1.setSecurityEnabled(false);
+ config1.setSharedStore(true);
+ config1.setBackup(true);
+ config1.setClustered(true);
+ List<String> staticConnectors = new ArrayList<String>();
+
+ for (int node : nodes)
+ {
+ TransportConfiguration liveConnector = createTransportConfiguration(isNetty(),
false, generateParams(node, isNetty()));
+ config1.getConnectorConfigurations().put(liveConnector.getName(),
liveConnector);
+ staticConnectors.add(liveConnector.getName());
+ }
+ TransportConfiguration backupConnector = createTransportConfiguration(isNetty(),
false, generateParams(nodeid, isNetty()));
+ List<String> pairs = null;
+ ClusterConnectionConfiguration ccc1 = new
ClusterConnectionConfiguration("cluster1", "jms",
backupConnector.getName(), -1, false, false, 1, 1,
+ createClusterConnections? staticConnectors:pairs);
+ config1.getClusterConfigurations().add(ccc1);
+ BackupConnectorConfiguration connectorConfiguration = new
BackupConnectorConfiguration(staticConnectors, backupConnector.getName());
+ config1.setBackupConnectorConfiguration(connectorConfiguration);
+ config1.getConnectorConfigurations().put(backupConnector.getName(),
backupConnector);
+
+
+ config1.setBindingsDirectory(config1.getBindingsDirectory() + "_" +
liveNode);
+ config1.setJournalDirectory(config1.getJournalDirectory() + "_" +
liveNode);
+ config1.setPagingDirectory(config1.getPagingDirectory() + "_" +
liveNode);
+ config1.setLargeMessagesDirectory(config1.getLargeMessagesDirectory() +
"_" + liveNode);
+
+ servers.add(new SameProcessHornetQServer(createFakeLockServer(true, config1)));
+ }
+
+ protected void createLiveConfig(int liveNode, int ... otherLiveNodes)
+ {
+ TransportConfiguration liveConnector = createTransportConfiguration(isNetty(),
false, generateParams(liveNode, isNetty()));
+ Configuration config0 = super.createDefaultConfig();
+ config0.getAcceptorConfigurations().clear();
+ config0.getAcceptorConfigurations().add(createTransportConfiguration(isNetty(),
true, generateParams(liveNode, isNetty())));
+ config0.setSecurityEnabled(false);
+ config0.setSharedStore(true);
+ config0.setClustered(true);
+ List<String> pairs = new ArrayList<String>();
+ for (int node : otherLiveNodes)
+ {
+ TransportConfiguration otherLiveConnector =
createTransportConfiguration(isNetty(), false, generateParams(node, isNetty()));
+ config0.getConnectorConfigurations().put(otherLiveConnector.getName(),
otherLiveConnector);
+ pairs.add(otherLiveConnector.getName());
+
+ }
+ ClusterConnectionConfiguration ccc0 = new
ClusterConnectionConfiguration("cluster1", "jms",
liveConnector.getName(), -1, false, false, 1, 1,
+ pairs);
+ config0.getClusterConfigurations().add(ccc0);
+ config0.getConnectorConfigurations().put(liveConnector.getName(), liveConnector);
+
+ config0.setBindingsDirectory(config0.getBindingsDirectory() + "_" +
liveNode);
+ config0.setJournalDirectory(config0.getJournalDirectory() + "_" +
liveNode);
+ config0.setPagingDirectory(config0.getPagingDirectory() + "_" +
liveNode);
+ config0.setLargeMessagesDirectory(config0.getLargeMessagesDirectory() +
"_" + liveNode);
+
+ servers.add(new SameProcessHornetQServer(createFakeLockServer(true, config0)));
+ }
+
+ protected boolean isNetty()
+ {
+ return false;
+ }
+}
Added:
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/RemoteMultipleLivesMultipleBackupsFailoverTest.java
===================================================================
---
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/RemoteMultipleLivesMultipleBackupsFailoverTest.java
(rev 0)
+++
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/RemoteMultipleLivesMultipleBackupsFailoverTest.java 2010-09-13
13:35:19 UTC (rev 9675)
@@ -0,0 +1,211 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.tests.integration.cluster.failover;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.hornetq.api.core.TransportConfiguration;
+import org.hornetq.core.config.BackupConnectorConfiguration;
+import org.hornetq.core.config.ClusterConnectionConfiguration;
+import org.hornetq.core.config.Configuration;
+import org.hornetq.core.config.impl.ConfigurationImpl;
+import org.hornetq.core.server.JournalType;
+import org.hornetq.tests.integration.cluster.util.RemoteProcessHornetQServer;
+import org.hornetq.tests.integration.cluster.util.RemoteServerConfiguration;
+
+/**
+ * A RemoteMultipleLivesMultipleBackupsFailoverTest
+ *
+ * @author jmesnil
+ *
+ *
+ */
+public class RemoteMultipleLivesMultipleBackupsFailoverTest extends
MultipleLivesMultipleBackupsFailoverTest
+{
+
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ private static Map<Integer, String> lives = new HashMap<Integer,
String>();
+ private static Map<Integer, String> backups = new HashMap<Integer,
String>();
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ // Public --------------------------------------------------------
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ @Override
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+
+ lives.put(0, LiveServerConfiguration0.class.getName());
+ lives.put(3, LiveServerConfiguration3.class.getName());
+
+ backups.put(1, SharedBackupServerConfiguration1.class.getName());
+ backups.put(2, SharedBackupServerConfiguration2.class.getName());
+ backups.put(4, SharedBackupServerConfiguration4.class.getName());
+ backups.put(5, SharedBackupServerConfiguration5.class.getName());
+ }
+
+ protected boolean isNetty()
+ {
+ return true;
+ }
+
+ @Override
+ protected void createLiveConfig(int liveNode, int... otherLiveNodes)
+ {
+ servers.add(new RemoteProcessHornetQServer(lives.get(liveNode)));
+ }
+
+ @Override
+ protected void createBackupConfig(int liveNode, int nodeid, boolean
createClusterConnections, int... nodes)
+ {
+ servers.add(new RemoteProcessHornetQServer(backups.get(nodeid)));
+ }
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+
+ public static class LiveServerConfiguration0 extends RemoteServerConfiguration
+ {
+ @Override
+ public Configuration getConfiguration()
+ {
+ return createLiveConfiguration(0, 3);
+ }
+ }
+
+ public static class LiveServerConfiguration3 extends RemoteServerConfiguration
+ {
+ @Override
+ public Configuration getConfiguration()
+ {
+ return createLiveConfiguration(3, 0);
+ }
+ }
+
+ public static class SharedBackupServerConfiguration1 extends
RemoteServerConfiguration
+ {
+ @Override
+ public Configuration getConfiguration()
+ {
+ return createBackupConfiguration(0, 1, true, 0, 3);
+ }
+ }
+
+ public static class SharedBackupServerConfiguration2 extends
RemoteServerConfiguration
+ {
+ @Override
+ public Configuration getConfiguration()
+ {
+ return createBackupConfiguration(0, 2, true, 0, 3);
+ }
+ }
+
+ public static class SharedBackupServerConfiguration4 extends
RemoteServerConfiguration
+ {
+ @Override
+ public Configuration getConfiguration()
+ {
+ return createBackupConfiguration(3, 4, true, 0, 3);
+ }
+ }
+
+ public static class SharedBackupServerConfiguration5 extends
RemoteServerConfiguration
+ {
+ @Override
+ public Configuration getConfiguration()
+ {
+ return createBackupConfiguration(3, 5, true, 0, 3);
+ }
+ }
+
+ protected static Configuration createBackupConfiguration(int liveNode, int nodeid,
boolean createClusterConnections, int... nodes)
+ {
+ Configuration config1 = new ConfigurationImpl();
+ config1.getAcceptorConfigurations().add(createTransportConfiguration(true, true,
generateParams(nodeid, true)));
+ config1.setSecurityEnabled(false);
+ config1.setSharedStore(true);
+ config1.setBackup(true);
+ config1.setJournalType(JournalType.NIO);
+ config1.setClustered(true);
+ List<String> staticConnectors = new ArrayList<String>();
+
+ for (int node : nodes)
+ {
+ TransportConfiguration liveConnector = createTransportConfiguration(true, false,
generateParams(node, true));
+ config1.getConnectorConfigurations().put(liveConnector.getName(),
liveConnector);
+ staticConnectors.add(liveConnector.getName());
+ }
+ TransportConfiguration backupConnector = createTransportConfiguration(true, false,
generateParams(nodeid, true));
+ List<String> pairs = null;
+ ClusterConnectionConfiguration ccc1 = new
ClusterConnectionConfiguration("cluster1", "jms",
backupConnector.getName(), -1, false, false, 1, 1,
+ createClusterConnections? staticConnectors:pairs);
+ config1.getClusterConfigurations().add(ccc1);
+ BackupConnectorConfiguration connectorConfiguration = new
BackupConnectorConfiguration(staticConnectors, backupConnector.getName());
+ config1.setBackupConnectorConfiguration(connectorConfiguration);
+ config1.getConnectorConfigurations().put(backupConnector.getName(),
backupConnector);
+
+
+ config1.setBindingsDirectory(config1.getBindingsDirectory() + "_" +
liveNode);
+ config1.setJournalDirectory(config1.getJournalDirectory() + "_" +
liveNode);
+ config1.setPagingDirectory(config1.getPagingDirectory() + "_" +
liveNode);
+ config1.setLargeMessagesDirectory(config1.getLargeMessagesDirectory() +
"_" + liveNode);
+
+ return config1;
+ }
+
+ protected static Configuration createLiveConfiguration(int liveNode, int...
otherLiveNodes)
+ {
+ Configuration config0 = new ConfigurationImpl();
+ TransportConfiguration liveConnector = createTransportConfiguration(true, false,
generateParams(liveNode, true));
+ config0.getConnectorConfigurations().put(liveConnector.getName(), liveConnector);
+ config0.getAcceptorConfigurations().add(createTransportConfiguration(true, true,
generateParams(liveNode, true)));
+ config0.setSecurityEnabled(false);
+ config0.setSharedStore(true);
+ config0.setJournalType(JournalType.NIO);
+ config0.setClustered(true);
+ List<String> pairs = new ArrayList<String>();
+ for (int node : otherLiveNodes)
+ {
+ TransportConfiguration otherLiveConnector = createTransportConfiguration(true,
false, generateParams(node, true));
+ config0.getConnectorConfigurations().put(otherLiveConnector.getName(),
otherLiveConnector);
+ pairs.add(otherLiveConnector.getName());
+
+ }
+ ClusterConnectionConfiguration ccc0 = new
ClusterConnectionConfiguration("cluster1", "jms",
liveConnector.getName(), -1, false, false, 1, 1,
+ pairs);
+ config0.getClusterConfigurations().add(ccc0);
+ config0.getConnectorConfigurations().put(liveConnector.getName(), liveConnector);
+
+ config0.setBindingsDirectory(config0.getBindingsDirectory() + "_" +
liveNode);
+ config0.setJournalDirectory(config0.getJournalDirectory() + "_" +
liveNode);
+ config0.setPagingDirectory(config0.getPagingDirectory() + "_" +
liveNode);
+ config0.setLargeMessagesDirectory(config0.getLargeMessagesDirectory() +
"_" + liveNode);
+
+ return config0;
+ }
+}
Added:
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/RemoteSingleLiveMultipleBackupsFailoverTest.java
===================================================================
---
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/RemoteSingleLiveMultipleBackupsFailoverTest.java
(rev 0)
+++
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/RemoteSingleLiveMultipleBackupsFailoverTest.java 2010-09-13
13:35:19 UTC (rev 9675)
@@ -0,0 +1,199 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.tests.integration.cluster.failover;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.hornetq.api.core.TransportConfiguration;
+import org.hornetq.core.config.BackupConnectorConfiguration;
+import org.hornetq.core.config.ClusterConnectionConfiguration;
+import org.hornetq.core.config.Configuration;
+import org.hornetq.core.config.impl.ConfigurationImpl;
+import org.hornetq.core.server.JournalType;
+import org.hornetq.tests.integration.cluster.util.RemoteProcessHornetQServer;
+import org.hornetq.tests.integration.cluster.util.RemoteServerConfiguration;
+
+public class RemoteSingleLiveMultipleBackupsFailoverTest extends
SingleLiveMultipleBackupsFailoverTest
+{
+
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ private static Map<Integer, String> backups = new HashMap<Integer,
String>();
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ // Public --------------------------------------------------------
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ @Override
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+
+ backups.put(1, SharedBackupServerConfiguration1.class.getName());
+ backups.put(2, SharedBackupServerConfiguration2.class.getName());
+ backups.put(3, SharedBackupServerConfiguration3.class.getName());
+ backups.put(4, SharedBackupServerConfiguration4.class.getName());
+ backups.put(5, SharedBackupServerConfiguration5.class.getName());
+ }
+
+ protected boolean isNetty()
+ {
+ return true;
+ }
+
+ @Override
+ protected void createLiveConfig(int liveNode, int... otherLiveNodes)
+ {
+ servers.add(new
RemoteProcessHornetQServer(SharedLiveServerConfiguration.class.getName()));
+ }
+
+ @Override
+ protected void createBackupConfig(int liveNode, int nodeid, boolean
createClusterConnections, int... nodes)
+ {
+ servers.add(new RemoteProcessHornetQServer(backups.get(nodeid)));
+ }
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+
+ public static class SharedLiveServerConfiguration extends RemoteServerConfiguration
+ {
+ @Override
+ public Configuration getConfiguration()
+ {
+ int liveNode = 0;
+ int[] otherLiveNodes = new int[0];
+
+ Configuration config0 = new ConfigurationImpl();
+ TransportConfiguration liveConnector = createTransportConfiguration(true, false,
generateParams(liveNode, true));
+ config0.getConnectorConfigurations().put(liveConnector.getName(),
liveConnector);
+ config0.getAcceptorConfigurations().add(createTransportConfiguration(true, true,
generateParams(liveNode, true)));
+ config0.setSecurityEnabled(false);
+ config0.setSharedStore(true);
+ config0.setJournalType(JournalType.NIO);
+ config0.setClustered(true);
+ List<String> pairs = new ArrayList<String>();
+ for (int node : otherLiveNodes)
+ {
+ TransportConfiguration otherLiveConnector =
createTransportConfiguration(true, false, generateParams(node, true));
+ config0.getConnectorConfigurations().put(otherLiveConnector.getName(),
otherLiveConnector);
+ pairs.add(otherLiveConnector.getName());
+
+ }
+ ClusterConnectionConfiguration ccc0 = new
ClusterConnectionConfiguration("cluster1", "jms",
liveConnector.getName(), -1, false, false, 1, 1,
+ pairs);
+ config0.getClusterConfigurations().add(ccc0);
+ config0.getConnectorConfigurations().put(liveConnector.getName(),
liveConnector);
+
+ config0.setBindingsDirectory(config0.getBindingsDirectory() + "_" +
liveNode);
+ config0.setJournalDirectory(config0.getJournalDirectory() + "_" +
liveNode);
+ config0.setPagingDirectory(config0.getPagingDirectory() + "_" +
liveNode);
+ config0.setLargeMessagesDirectory(config0.getLargeMessagesDirectory() +
"_" + liveNode);
+
+ return config0;
+ }
+ }
+
+ public static class SharedBackupServerConfiguration1 extends
RemoteServerConfiguration
+ {
+ @Override
+ public Configuration getConfiguration()
+ {
+ return createBackupConfiguration(0, 1, false, 0, 2, 3, 4, 5);
+ }
+ }
+
+ public static class SharedBackupServerConfiguration2 extends
RemoteServerConfiguration
+ {
+ @Override
+ public Configuration getConfiguration()
+ {
+ return createBackupConfiguration(0, 2, false, 0, 1, 3, 4, 5);
+ }
+ }
+
+ public static class SharedBackupServerConfiguration3 extends
RemoteServerConfiguration
+ {
+ @Override
+ public Configuration getConfiguration()
+ {
+ return createBackupConfiguration(0, 3, false, 0, 1, 2, 4, 5);
+ }
+ }
+
+ public static class SharedBackupServerConfiguration4 extends
RemoteServerConfiguration
+ {
+ @Override
+ public Configuration getConfiguration()
+ {
+ return createBackupConfiguration(0, 4, false, 0, 1, 2, 3, 5);
+ }
+ }
+
+ public static class SharedBackupServerConfiguration5 extends
RemoteServerConfiguration
+ {
+ @Override
+ public Configuration getConfiguration()
+ {
+ return createBackupConfiguration(0, 5, false, 0, 1, 2, 3, 4);
+ }
+ }
+
+ protected static Configuration createBackupConfiguration(int liveNode, int nodeid,
boolean createClusterConnections, int... nodes)
+ {
+ Configuration config1 = new ConfigurationImpl();
+ config1.getAcceptorConfigurations().add(createTransportConfiguration(true, true,
generateParams(nodeid, true)));
+ config1.setSecurityEnabled(false);
+ config1.setSharedStore(true);
+ config1.setBackup(true);
+ config1.setJournalType(JournalType.NIO);
+ config1.setClustered(true);
+ List<String> staticConnectors = new ArrayList<String>();
+
+ for (int node : nodes)
+ {
+ TransportConfiguration liveConnector = createTransportConfiguration(true, false,
generateParams(node, true));
+ config1.getConnectorConfigurations().put(liveConnector.getName(),
liveConnector);
+ staticConnectors.add(liveConnector.getName());
+ }
+ TransportConfiguration backupConnector = createTransportConfiguration(true, false,
generateParams(nodeid, true));
+ List<String> pairs = null;
+ ClusterConnectionConfiguration ccc1 = new
ClusterConnectionConfiguration("cluster1", "jms",
backupConnector.getName(), -1, false, false, 1, 1,
+ createClusterConnections? staticConnectors:pairs);
+ config1.getClusterConfigurations().add(ccc1);
+ BackupConnectorConfiguration connectorConfiguration = new
BackupConnectorConfiguration(staticConnectors, backupConnector.getName());
+ config1.setBackupConnectorConfiguration(connectorConfiguration);
+ config1.getConnectorConfigurations().put(backupConnector.getName(),
backupConnector);
+
+
+ config1.setBindingsDirectory(config1.getBindingsDirectory() + "_" +
liveNode);
+ config1.setJournalDirectory(config1.getJournalDirectory() + "_" +
liveNode);
+ config1.setPagingDirectory(config1.getPagingDirectory() + "_" +
liveNode);
+ config1.setLargeMessagesDirectory(config1.getLargeMessagesDirectory() +
"_" + liveNode);
+
+ return config1;
+ }
+}
Copied:
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/SingleLiveMultipleBackupsFailoverTest.java
(from rev 9654,
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/MultipleBackupFailoverTest.java)
===================================================================
---
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/SingleLiveMultipleBackupsFailoverTest.java
(rev 0)
+++
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/SingleLiveMultipleBackupsFailoverTest.java 2010-09-13
13:35:19 UTC (rev 9675)
@@ -0,0 +1,167 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.tests.integration.cluster.failover;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.hornetq.api.core.TransportConfiguration;
+import org.hornetq.api.core.client.ClientSession;
+import org.hornetq.api.core.client.ServerLocator;
+import org.hornetq.core.client.impl.ClientSessionFactoryInternal;
+import org.hornetq.core.config.BackupConnectorConfiguration;
+import org.hornetq.core.config.ClusterConnectionConfiguration;
+import org.hornetq.core.config.Configuration;
+import org.hornetq.tests.integration.cluster.util.SameProcessHornetQServer;
+import org.hornetq.tests.integration.cluster.util.TestableServer;
+
+/**
+ */
+public class SingleLiveMultipleBackupsFailoverTest extends
MultipleBackupsFailoverTestBase
+{
+
+ protected ArrayList<TestableServer> servers = new
ArrayList<TestableServer>(5);
+
+ public void testMultipleFailovers() throws Exception
+ {
+ createLiveConfig(0);
+ createBackupConfig(0, 1,false, 0, 2, 3, 4, 5);
+ createBackupConfig(0, 2,false, 0, 1, 3, 4, 5);
+ createBackupConfig(0, 3,false, 0, 1, 2, 4, 5);
+ createBackupConfig(0, 4, false, 0, 1, 2, 3, 4);
+ createBackupConfig(0, 5, false, 0, 1, 2, 3, 4);
+ servers.get(0).start();
+ servers.get(1).start();
+ servers.get(2).start();
+ servers.get(3).start();
+ servers.get(4).start();
+ servers.get(5).start();
+
+ ServerLocator locator = getServerLocator(0);
+
+ locator.setBlockOnNonDurableSend(true);
+ locator.setBlockOnDurableSend(true);
+ locator.setBlockOnAcknowledge(true);
+ locator.setReconnectAttempts(-1);
+ ClientSessionFactoryInternal sf = createSessionFactoryAndWaitForTopology(locator,
2);
+ int backupNode;
+ ClientSession session = sendAndConsume(sf, true);
+ System.out.println("failing node 0");
+ servers.get(0).crash(session);
+
+ session.close();
+ backupNode = waitForBackup(5, servers, 1, 2, 3, 4, 5);
+ session = sendAndConsume(sf, false);
+ System.out.println("failing node " + backupNode);
+ servers.get(backupNode).crash(session);
+
+ session.close();
+ backupNode = waitForBackup(5, servers, 1, 2, 3, 4, 5);
+ session = sendAndConsume(sf, false);
+ System.out.println("failing node " + backupNode);
+ servers.get(backupNode).crash(session);
+
+ session.close();
+ backupNode = waitForBackup(5, servers, 1, 2, 3, 4, 5);
+ session = sendAndConsume(sf, false);
+ System.out.println("failing node " + backupNode);
+ servers.get(backupNode).crash(session);
+
+ session.close();
+ backupNode = waitForBackup(5, servers, 1, 2, 3, 4, 5);
+ session = sendAndConsume(sf, false);
+ System.out.println("failing node " + backupNode);
+ servers.get(backupNode).crash(session);
+
+ session.close();
+ backupNode = waitForBackup(5, servers, 1, 2, 3, 4, 5);
+ session = sendAndConsume(sf, false);
+ session.close();
+ servers.get(backupNode).stop();
+ }
+
+ protected void createBackupConfig(int liveNode, int nodeid, boolean
createClusterConnections, int... nodes)
+ {
+ Configuration config1 = super.createDefaultConfig();
+ config1.getAcceptorConfigurations().clear();
+ config1.getAcceptorConfigurations().add(createTransportConfiguration(isNetty(),
true, generateParams(nodeid, isNetty())));
+ config1.setSecurityEnabled(false);
+ config1.setSharedStore(true);
+ config1.setBackup(true);
+ config1.setClustered(true);
+ List<String> staticConnectors = new ArrayList<String>();
+
+ for (int node : nodes)
+ {
+ TransportConfiguration liveConnector = createTransportConfiguration(isNetty(),
false, generateParams(node, isNetty()));
+ config1.getConnectorConfigurations().put(liveConnector.getName(),
liveConnector);
+ staticConnectors.add(liveConnector.getName());
+ }
+ TransportConfiguration backupConnector = createTransportConfiguration(isNetty(),
false, generateParams(nodeid, isNetty()));
+ List<String> pairs = null;
+ ClusterConnectionConfiguration ccc1 = new
ClusterConnectionConfiguration("cluster1", "jms",
backupConnector.getName(), -1, false, false, 1, 1,
+ createClusterConnections? staticConnectors:pairs);
+ config1.getClusterConfigurations().add(ccc1);
+ BackupConnectorConfiguration connectorConfiguration = new
BackupConnectorConfiguration(staticConnectors, backupConnector.getName());
+ config1.setBackupConnectorConfiguration(connectorConfiguration);
+ config1.getConnectorConfigurations().put(backupConnector.getName(),
backupConnector);
+
+
+ config1.setBindingsDirectory(config1.getBindingsDirectory() + "_" +
liveNode);
+ config1.setJournalDirectory(config1.getJournalDirectory() + "_" +
liveNode);
+ config1.setPagingDirectory(config1.getPagingDirectory() + "_" +
liveNode);
+ config1.setLargeMessagesDirectory(config1.getLargeMessagesDirectory() +
"_" + liveNode);
+
+ servers.add(new SameProcessHornetQServer(createFakeLockServer(true, config1)));
+ }
+
+ protected void createLiveConfig(int liveNode, int ... otherLiveNodes)
+ {
+ TransportConfiguration liveConnector = createTransportConfiguration(isNetty(),
false, generateParams(liveNode, isNetty()));
+ Configuration config0 = super.createDefaultConfig();
+ config0.getAcceptorConfigurations().clear();
+ config0.getAcceptorConfigurations().add(createTransportConfiguration(isNetty(),
true, generateParams(liveNode, isNetty())));
+ config0.setSecurityEnabled(false);
+ config0.setSharedStore(true);
+ config0.setClustered(true);
+ List<String> pairs = new ArrayList<String>();
+ for (int node : otherLiveNodes)
+ {
+ TransportConfiguration otherLiveConnector =
createTransportConfiguration(isNetty(), false, generateParams(node, isNetty()));
+ config0.getConnectorConfigurations().put(otherLiveConnector.getName(),
otherLiveConnector);
+ pairs.add(otherLiveConnector.getName());
+
+ }
+ ClusterConnectionConfiguration ccc0 = new
ClusterConnectionConfiguration("cluster1", "jms",
liveConnector.getName(), -1, false, false, 1, 1,
+ pairs);
+ config0.getClusterConfigurations().add(ccc0);
+ config0.getConnectorConfigurations().put(liveConnector.getName(), liveConnector);
+
+ config0.setBindingsDirectory(config0.getBindingsDirectory() + "_" +
liveNode);
+ config0.setJournalDirectory(config0.getJournalDirectory() + "_" +
liveNode);
+ config0.setPagingDirectory(config0.getPagingDirectory() + "_" +
liveNode);
+ config0.setLargeMessagesDirectory(config0.getLargeMessagesDirectory() +
"_" + liveNode);
+
+ servers.add(new SameProcessHornetQServer(createFakeLockServer(true, config0)));
+ }
+
+ protected boolean isNetty()
+ {
+ return false;
+ }
+
+
+
+
+}
Modified:
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/util/RemoteProcessHornetQServer.java
===================================================================
---
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/util/RemoteProcessHornetQServer.java 2010-09-13
07:44:26 UTC (rev 9674)
+++
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/util/RemoteProcessHornetQServer.java 2010-09-13
13:35:19 UTC (rev 9675)
@@ -34,7 +34,9 @@
private String configurationClassName;
private Process serverProcess;
-
+ private boolean initialised = false;
+ private CountDownLatch initLatch;
+
public RemoteProcessHornetQServer(String configurationClassName)
{
this.configurationClassName = configurationClassName;
@@ -42,12 +44,37 @@
public boolean isInitialised()
{
- return (serverProcess != null);
+ if (serverProcess == null)
+ {
+ return false;
+ }
+ try
+ {
+ initLatch = new CountDownLatch(1);
+ RemoteProcessHornetQServerSupport.isInitialised(serverProcess);
+ boolean ok = initLatch.await(10, TimeUnit.SECONDS);
+ if (ok)
+ {
+ return initialised;
+ }
+ }
+ catch (Exception e)
+ {
+ e.printStackTrace();
+ return false;
+ }
+ return false;
}
+
+ public void setInitialised(boolean initialised)
+ {
+ this.initialised = initialised;
+ initLatch.countDown();
+ }
public void start() throws Exception
{
- serverProcess = RemoteProcessHornetQServerSupport.start(configurationClassName);
+ serverProcess = RemoteProcessHornetQServerSupport.start(configurationClassName,
this);
Thread.sleep(2000);
}
Modified:
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/util/RemoteProcessHornetQServerSupport.java
===================================================================
---
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/util/RemoteProcessHornetQServerSupport.java 2010-09-13
07:44:26 UTC (rev 9674)
+++
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/util/RemoteProcessHornetQServerSupport.java 2010-09-13
13:35:19 UTC (rev 9675)
@@ -60,8 +60,12 @@
String line = null;
while ((line = br.readLine()) != null)
{
- if ("STOP".equals(line.trim()))
+ if ("INIT?".equals(line.trim()))
{
+ System.out.println("INIT:" + server.isInitialised());
+ }
+ else if ("STOP".equals(line.trim()))
+ {
server.stop();
System.out.println("Server stopped");
System.exit(0);
@@ -90,7 +94,7 @@
}
- public static Process start(String serverClassName) throws Exception
+ public static Process start(String serverClassName, final RemoteProcessHornetQServer
remoteProcessHornetQServer) throws Exception
{
String[] vmArgs = new String[] {
"-Dorg.hornetq.logger-delegate-factory-class-name=org.hornetq.jms.SysoutLoggerDelegateFactory"
};
Process serverProcess =
SpawnedVMSupport.spawnVM(RemoteProcessHornetQServerSupport.class.getName(), vmArgs, false,
serverClassName);
@@ -115,6 +119,11 @@
while ((line = br.readLine()) != null)
{
System.out.println("SERVER: " + line);
+ if (line.startsWith("INIT:"))
+ {
+ boolean init =
Boolean.parseBoolean(line.substring("INIT:".length(), line.length()));
+ remoteProcessHornetQServer.setInitialised(init);
+ }
}
}
catch (Exception e)
@@ -146,6 +155,13 @@
serverProcess.destroy();
}
}
+
+ public static void isInitialised(Process serverProcess) throws Exception
+ {
+ OutputStreamWriter osw = new OutputStreamWriter(serverProcess.getOutputStream());
+ osw.write("INIT?\n");
+ osw.flush();
+ }
public static void crash(Process serverProcess) throws Exception
{