Author: jmesnil
Date: 2010-09-01 10:32:17 -0400 (Wed, 01 Sep 2010)
New Revision: 9624
Added:
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/remote/
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/remote/FailoverWithSharedStoreTest.java
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/remote/RemoteHornetQServer.java
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/remote/RemoteServerConfiguration.java
Log:
add helper methods to start/stop/crash remote hornetq servers
Added:
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/remote/FailoverWithSharedStoreTest.java
===================================================================
---
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/remote/FailoverWithSharedStoreTest.java
(rev 0)
+++
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/remote/FailoverWithSharedStoreTest.java 2010-09-01
14:32:17 UTC (rev 9624)
@@ -0,0 +1,189 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.tests.integration.cluster.failover.remote;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.hornetq.api.core.TransportConfiguration;
+import org.hornetq.api.core.client.ClientConsumer;
+import org.hornetq.api.core.client.ClientMessage;
+import org.hornetq.api.core.client.ClientProducer;
+import org.hornetq.api.core.client.ClientSession;
+import org.hornetq.api.core.client.ClientSessionFactory;
+import org.hornetq.api.core.client.HornetQClient;
+import org.hornetq.api.core.client.ServerLocator;
+import org.hornetq.core.config.ClusterConnectionConfiguration;
+import org.hornetq.core.config.Configuration;
+import org.hornetq.core.config.impl.ConfigurationImpl;
+import org.hornetq.core.remoting.impl.netty.NettyConnectorFactory;
+import org.hornetq.core.server.JournalType;
+import org.hornetq.tests.integration.cluster.distribution.ClusterTestBase;
+
+/**
+ * A ServerTest
+ *
+ * @author jmesnil
+ *
+ *
+ */
+public class FailoverWithSharedStoreTest extends ClusterTestBase
+{
+
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ // Public --------------------------------------------------------
+
+ static class SharedLiveServerConfiguration extends RemoteServerConfiguration
+ {
+
+ @Override
+ public Configuration getConfiguration()
+ {
+ Configuration config = new ConfigurationImpl();
+ config.setSecurityEnabled(false);
+ config.setJournalType(JournalType.NIO);
+ config.setSharedStore(true);
+ config.setClustered(true);
+ config.getAcceptorConfigurations().add(createTransportConfiguration(true, true,
generateParams(0, true)));
+ config.getConnectorConfigurations().put("self",
+ createTransportConfiguration(true,
false, generateParams(0, true)));
+ config.getClusterConfigurations().add(new
ClusterConnectionConfiguration("cluster",
+
"foo",
+
"self",
+ -1,
+ false,
+ false,
+ 1,
+ 1,
+ new
ArrayList<String>()));
+ return config;
+ }
+
+ }
+
+ static class SharedBackupServerConfiguration extends RemoteServerConfiguration
+ {
+
+ @Override
+ public Configuration getConfiguration()
+ {
+ Configuration config = new ConfigurationImpl();
+ config.setSecurityEnabled(false);
+ config.setJournalType(JournalType.NIO);
+ config.setSharedStore(true);
+ config.setBackup(true);
+ config.setClustered(true);
+ config.getAcceptorConfigurations().add(createTransportConfiguration(true, true,
generateParams(1, true)));
+ config.setLiveConnectorName("live");
+ config.getConnectorConfigurations().put("live",
+ createTransportConfiguration(true,
false, generateParams(0, true)));
+ config.getConnectorConfigurations().put("self",
+ createTransportConfiguration(true,
false, generateParams(1, true)));
+ List<String> connectors = new ArrayList<String>();
+ connectors.add("live");
+ config.getClusterConfigurations().add(new
ClusterConnectionConfiguration("cluster",
+
"foo",
+
"self",
+ -1,
+ false,
+ false,
+ 1,
+ 1,
+
connectors));
+ return config;
+ }
+
+ }
+
+ public void testCrashLiveServer() throws Exception
+ {
+ Process liveServer = null;
+ Process backupServer = null;
+ try
+ {
+ liveServer =
RemoteHornetQServer.start(SharedLiveServerConfiguration.class.getName());
+ backupServer =
RemoteHornetQServer.start(SharedBackupServerConfiguration.class.getName());
+
+ ServerLocator locator =
HornetQClient.createServerLocatorWithHA(createTransportConfiguration(true,
+
false,
+
generateParams(0,
+
true)));
+ locator.setFailoverOnInitialConnection(true);
+
+ ClientSessionFactory sf = locator.createSessionFactory();
+ ClientSession prodSession = sf.createSession();
+ prodSession.createQueue("foo", "bar", true);
+ ClientProducer producer = prodSession.createProducer("foo");
+ ClientMessage message = prodSession.createMessage(true);
+ message.putStringProperty("key", "value");
+ producer.send(message);
+ prodSession.commit();
+ prodSession.close();
+
+ RemoteHornetQServer.crash(liveServer);
+ assertTrue(liveServer.exitValue() == 1);
+ liveServer = null;
+ Thread.sleep(5000);
+
+ sf = locator.createSessionFactory();
+ ClientSession consSession = sf.createSession();
+ consSession.start();
+ ClientConsumer consumer = consSession.createConsumer("bar");
+ ClientMessage receivedMessage = consumer.receive(5000);
+ assertNotNull(receivedMessage);
+ assertEquals(message.getStringProperty("key"),
receivedMessage.getStringProperty("key"));
+ receivedMessage.acknowledge();
+
+ consumer.close();
+ consSession.deleteQueue("bar");
+ locator.close();
+
+ }
+ finally
+ {
+ if (liveServer != null)
+ {
+ RemoteHornetQServer.stop(liveServer);
+
+ }
+ if (backupServer != null)
+ {
+ RemoteHornetQServer.stop(backupServer);
+
+ }
+ }
+
+ }
+
+ public void _testNoConnection() throws Exception
+ {
+ ServerLocator locator = HornetQClient.createServerLocatorWithHA(new
TransportConfiguration(NettyConnectorFactory.class.getName()));
+ locator.createSessionFactory();
+ }
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+
+}
Added:
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/remote/RemoteHornetQServer.java
===================================================================
---
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/remote/RemoteHornetQServer.java
(rev 0)
+++
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/remote/RemoteHornetQServer.java 2010-09-01
14:32:17 UTC (rev 9624)
@@ -0,0 +1,173 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.tests.integration.cluster.failover.remote;
+
+import java.io.BufferedReader;
+import java.io.InputStreamReader;
+import java.io.OutputStreamWriter;
+
+import org.hornetq.core.server.HornetQServer;
+import org.hornetq.core.server.HornetQServers;
+import org.hornetq.tests.util.SpawnedVMSupport;
+
+/**
+ * A RemoteHornetQServer
+ *
+ * @author jmesnil
+ *
+ *
+ */
+public class RemoteHornetQServer
+{
+
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ // Static --------------------------------------------------------
+
+ public static void main(final String[] args) throws Exception
+ {
+ try
+ {
+ String serverClass = args[0];
+ System.out.println("Instantiate " + serverClass);
+ RemoteServerConfiguration spawnedServer =
(RemoteServerConfiguration)Class.forName(serverClass).newInstance();
+
+ HornetQServer server =
HornetQServers.newHornetQServer(spawnedServer.getConfiguration());
+ server.start();
+
+ System.out.println("Server started, ready to start client test");
+
+ // create the reader before printing OK so that if the test is quick
+ // we will still capture the STOP message sent by the client
+ InputStreamReader isr = new InputStreamReader(System.in);
+ BufferedReader br = new BufferedReader(isr);
+
+ System.out.println("OK");
+
+ String line = null;
+ while ((line = br.readLine()) != null)
+ {
+ if ("STOP".equals(line.trim()))
+ {
+ //server.stop();
+ System.out.println("Server stopped");
+ System.exit(0);
+ }
+ else
+ {
+ // stop anyway but with a error status
+ System.out.println("Server crashed");
+ System.exit(1);
+ }
+ }
+ }
+ catch (Throwable t)
+ {
+ t.printStackTrace();
+ String allStack = t.getCause().getMessage() + "|";
+ StackTraceElement[] stackTrace = t.getCause().getStackTrace();
+ for (StackTraceElement stackTraceElement : stackTrace)
+ {
+ allStack += stackTraceElement.toString() + "|";
+ }
+ System.out.println(allStack);
+ System.out.println("KO");
+ System.exit(1);
+ }
+ }
+
+
+ public static Process start(String serverClassName) throws Exception
+ {
+ String[] vmArgs = new String[] {
"-Dorg.hornetq.logger-delegate-factory-class-name=org.hornetq.jms.SysoutLoggerDelegateFactory"
};
+ Process serverProcess =
SpawnedVMSupport.spawnVM(RemoteHornetQServer.class.getName(), vmArgs, false,
serverClassName);
+ InputStreamReader isr = new InputStreamReader(serverProcess.getInputStream());
+
+ final BufferedReader br = new BufferedReader(isr);
+ String line = null;
+ while ((line = br.readLine()) != null)
+ {
+ System.out.println("SERVER: " + line);
+ line.replace('|', '\n');
+ if ("OK".equals(line.trim()))
+ {
+ new Thread()
+ {
+ @Override
+ public void run()
+ {
+ try
+ {
+ String line = null;
+ while ((line = br.readLine()) != null)
+ {
+ System.out.println("SERVER: " + line);
+ }
+ }
+ catch (Exception e)
+ {
+ e.printStackTrace();
+ }
+ }
+ }.start();
+ return serverProcess;
+ }
+ else if ("KO".equals(line.trim()))
+ {
+ // something went wrong with the server, destroy it:
+ serverProcess.destroy();
+ throw new IllegalStateException("Unable to start the spawned server
:" + line);
+ }
+ }
+ return serverProcess;
+ }
+
+ public static void stop(Process serverProcess) throws Exception
+ {
+ OutputStreamWriter osw = new OutputStreamWriter(serverProcess.getOutputStream());
+ osw.write("STOP\n");
+ osw.flush();
+ int exitValue = serverProcess.waitFor();
+ if (exitValue != 0)
+ {
+ serverProcess.destroy();
+ }
+ }
+
+ public static void crash(Process serverProcess) throws Exception
+ {
+ OutputStreamWriter osw = new OutputStreamWriter(serverProcess.getOutputStream());
+ osw.write("KILL\n");
+ osw.flush();
+ int exitValue = serverProcess.waitFor();
+ if (exitValue != 0)
+ {
+ serverProcess.destroy();
+ }
+ }
+ // Constructors --------------------------------------------------
+
+ // Public --------------------------------------------------------
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+
+}
Added:
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/remote/RemoteServerConfiguration.java
===================================================================
---
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/remote/RemoteServerConfiguration.java
(rev 0)
+++
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/remote/RemoteServerConfiguration.java 2010-09-01
14:32:17 UTC (rev 9624)
@@ -0,0 +1,31 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.tests.integration.cluster.failover.remote;
+
+import org.hornetq.core.config.Configuration;
+
+/**
+ * A RemoteServerConfiguration
+ *
+ * @author jmesnil
+ *
+ */
+public abstract class RemoteServerConfiguration
+{
+
+ public RemoteServerConfiguration() {
+ }
+
+ public abstract Configuration getConfiguration();
+}