[jboss-cvs] JBoss Messaging SVN: r3114 - trunk/tests/src/org/jboss/test/messaging/jms/stress/clustering.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Tue Sep 18 13:05:39 EDT 2007
Author: clebert.suconic at jboss.com
Date: 2007-09-18 13:05:39 -0400 (Tue, 18 Sep 2007)
New Revision: 3114
Added:
trunk/tests/src/org/jboss/test/messaging/jms/stress/clustering/FailoverOverDistributionStressTest.java
Log:
http://jira.jboss.org/jira/browse/JBMESSAGING-1074
Added: trunk/tests/src/org/jboss/test/messaging/jms/stress/clustering/FailoverOverDistributionStressTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/stress/clustering/FailoverOverDistributionStressTest.java (rev 0)
+++ trunk/tests/src/org/jboss/test/messaging/jms/stress/clustering/FailoverOverDistributionStressTest.java 2007-09-18 17:05:39 UTC (rev 3114)
@@ -0,0 +1,344 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.test.messaging.jms.stress.clustering;
+
+import org.jboss.test.messaging.jms.clustering.ClusteringTestBase;
+import org.jboss.test.messaging.tools.ServerManagement;
+import org.jboss.jms.client.JBossConnection;
+import org.jboss.jms.client.FailoverListener;
+import org.jboss.jms.client.FailoverEvent;
+import javax.jms.Connection;
+import javax.jms.Session;
+import javax.jms.JMSException;
+import javax.jms.Queue;
+import javax.jms.MessageProducer;
+import javax.jms.MessageConsumer;
+import javax.jms.Message;
+import EDU.oswego.cs.dl.util.concurrent.SynchronizedInt;
+
+/**
+ * Stress test added to reproduce http://jira.jboss.org/jira/browse/JBMESSAGING-1074.
+ *
+ * Half of the connections (the senders) are created on <tt>CONNECT_NODE</tt>, the other
+ * half (the receivers) on <tt>FAILOVER_NODE</tt>. Once more than half of the expected
+ * messages have been produced, <tt>CONNECT_NODE</tt> is killed; every connection is then
+ * expected to be on <tt>FAILOVER_NODE</tt> and the number of consumed messages is expected
+ * to match the number of produced messages.
+ *
+ * @author <a href="sergey.koshcheyev at jboss.com">Sergey Koshcheyev</a>
+ * @author <a href="clebert.suconic at jboss.com">Clebert Suconic</a>
+ * @version <tt>$Revision$</tt>
+ *
+ * $Id$
+ */
+public class FailoverOverDistributionStressTest extends ClusteringTestBase
+{
+   // Constants -----------------------------------------------------
+
+   // The number of concurrent connections (one worker thread per connection);
+   // even-numbered workers send, odd-numbered workers receive
+   private static final int CONNECTION_COUNT = 20;
+
+   // The node the sender connections initially connect to; this is the node that gets killed
+   private static final int CONNECT_NODE = 1;
+
+   // The node the receiver connections connect to, and the node all connections
+   // are expected to be on after the kill
+   private static final int FAILOVER_NODE = 0;
+
+   // The number of messages each sender sends and each receiver receives at most
+   private static final int MESSAGE_COUNT_PER_CONNECTION = 1000;
+
+   // Time in which all threads are supposed to fail over
+   private static final long FAILOVER_TIMEOUT = 45000L;
+
+   // Number of produced messages that must be exceeded before the node is killed:
+   // half of what all senders together will produce
+   private static final int KILL_THRESHOLD =
+      (MESSAGE_COUNT_PER_CONNECTION * (CONNECTION_COUNT / 2)) / 2;
+
+   // Static --------------------------------------------------------
+
+   // Message counters shared by all worker threads; only accessed through the
+   // synchronized static methods below
+   private static int produced = 0;
+   private static int consumed = 0;
+
+   private static synchronized void resetCounters()
+   {
+      produced = 0;
+      consumed = 0;
+   }
+
+   private static synchronized void incProduced()
+   {
+      produced++;
+   }
+
+   private static synchronized int getProduced()
+   {
+      return produced;
+   }
+
+   private static synchronized int getConsumed()
+   {
+      return consumed;
+   }
+
+   private static synchronized void incConsumed()
+   {
+      consumed++;
+   }
+
+   /**
+    * @return true if at least one of the given threads is still alive
+    */
+   private static boolean anyAlive(Thread[] threads)
+   {
+      for (int i = 0; i < threads.length; i++)
+      {
+         if (threads[i].isAlive())
+         {
+            return true;
+         }
+      }
+      return false;
+   }
+
+   // Attributes ----------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   public FailoverOverDistributionStressTest(String name)
+   {
+      super(name);
+      this.nodeCount = 2;
+   }
+
+   // Public --------------------------------------------------------
+
+   /**
+    * Establishes many connections, each of which either sends messages to or receives
+    * messages from a queue: senders connect to CONNECT_NODE, receivers to FAILOVER_NODE.
+    * Once more than KILL_THRESHOLD messages have been produced, CONNECT_NODE is killed.
+    * All connections should then be on FAILOVER_NODE, no worker thread should have
+    * terminated with an exception, and as many messages should have been consumed as
+    * were produced.
+    */
+   public void testFailoverManyConnections() throws Exception
+   {
+      resetCounters();
+
+      Connection connections[] = new Connection[CONNECTION_COUNT];
+      ConnectionWorker workers[] = new ConnectionWorker[CONNECTION_COUNT];
+
+      try
+      {
+         log.info("creating " + CONNECTION_COUNT + " threads to connect to server " + CONNECT_NODE);
+
+         for (int i = 0; i < CONNECTION_COUNT; i++)
+         {
+            if (i % 2 == 0)
+            {
+               connections[i] = createConnectionOnServer(cf, CONNECT_NODE);
+               workers[i] = new ConnectionSenderThread(i, connections[i], queue[CONNECT_NODE]);
+            }
+            else
+            {
+               connections[i] = createConnectionOnServer(cf, FAILOVER_NODE);
+               workers[i] = new ConnectionReceiverThread(i, connections[i], queue[CONNECT_NODE]);
+            }
+         }
+
+         for (int i = 0; i < CONNECTION_COUNT; i++)
+         {
+            workers[i].start();
+         }
+
+         log.info("waiting for a few seconds so that threads begin working");
+
+         while (true)
+         {
+            // Sample liveness BEFORE reading the counter: if no worker was alive at this
+            // point, the counter read below is final and the check cannot fail spuriously
+            boolean workersAlive = anyAlive(workers);
+            int producedSoFar = getProduced();
+
+            if (producedSoFar > KILL_THRESHOLD)
+            {
+               killAndWaitForFailover(connections);
+               break;
+            }
+
+            if (!workersAlive)
+            {
+               // Nobody is left to produce messages, so the threshold can never be
+               // reached; fail instead of looping forever
+               logWorkerFailures(workers);
+               fail("All worker threads finished after only " + producedSoFar +
+                    " messages were produced, more than " + KILL_THRESHOLD +
+                    " are needed before node " + CONNECT_NODE + " is killed");
+            }
+
+            log.info("Produced " + producedSoFar + " messages.. Consumed (" + getConsumed() + ")" );
+            Thread.sleep(500);
+         }
+
+         for (int i = 0; i < CONNECTION_COUNT; i++)
+         {
+            assertEquals("Connection #" + i + " did not fail over", FAILOVER_NODE, getServerId(connections[i]));
+         }
+
+         log.info("waiting for connection threads to finish");
+
+         for (int i = 0; i < CONNECTION_COUNT; i++)
+         {
+            workers[i].join();
+         }
+
+         if (logWorkerFailures(workers))
+         {
+            fail("Some threads terminated abnormally");
+         }
+
+         assertEquals("Number of consumed messages differs from number of produced messages",
+                      getProduced(), getConsumed());
+      }
+      finally
+      {
+         closeAll(connections);
+      }
+   }
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   // Private -------------------------------------------------------
+
+   /**
+    * Logs the exception of every worker that recorded one.
+    *
+    * @return true if at least one worker recorded an exception
+    */
+   private boolean logWorkerFailures(ConnectionWorker[] workers)
+   {
+      boolean failed = false;
+      for (int i = 0; i < workers.length; i++)
+      {
+         Exception e = workers[i].getException();
+         if (e != null)
+         {
+            log.error("Thread " + workers[i].getName() + " terminated abnormally:", e);
+            failed = true;
+         }
+      }
+      return failed;
+   }
+
+   /**
+    * Closes every non-null connection in the array. A failure to close one connection
+    * does not prevent the remaining ones from being closed; each failure is logged and
+    * the first one is rethrown once all connections have been processed.
+    *
+    * @throws JMSException the first exception thrown by a close() call, if any
+    */
+   private void closeAll(Connection[] connections) throws JMSException
+   {
+      JMSException firstFailure = null;
+
+      for (int i = 0; i < connections.length; i++)
+      {
+         if (connections[i] == null)
+         {
+            continue;
+         }
+
+         try
+         {
+            connections[i].close();
+         }
+         catch (JMSException e)
+         {
+            log.error("Failed to close connection #" + i, e);
+            if (firstFailure == null)
+            {
+               firstFailure = e;
+            }
+         }
+      }
+
+      if (firstFailure != null)
+      {
+         throw firstFailure;
+      }
+   }
+
+   /**
+    * Registers a failover listener on every connection currently on CONNECT_NODE, kills
+    * that node and then polls until all of those connections have reported
+    * FAILOVER_COMPLETED or FAILOVER_TIMEOUT has elapsed. Fails the test if not all of
+    * them have failed over by then.
+    */
+   private void killAndWaitForFailover(Connection[] connections) throws Exception
+   {
+      // register a failover listener
+      LoggingFailoverListener failoverListener = new LoggingFailoverListener();
+
+      // Only connections on the node about to be killed are expected to fail over
+      int numConnections = 0;
+      for (int i = 0; i < connections.length; i++)
+      {
+         if (getServerId(connections[i]) == CONNECT_NODE)
+         {
+            ((JBossConnection)connections[i]).registerFailoverListener(failoverListener);
+            numConnections++;
+         }
+      }
+
+      ServerManagement.kill(CONNECT_NODE);
+
+      log.info("killed node " + CONNECT_NODE + ", now waiting for all connections to fail over");
+
+      long killTime = System.currentTimeMillis();
+      while (failoverListener.getFailoverCount() < numConnections
+             && System.currentTimeMillis() - killTime <= FAILOVER_TIMEOUT)
+      {
+         Thread.sleep(3000L);
+      }
+
+      assertEquals("Not all connections have failed over successfully",
+                   numConnections, failoverListener.getFailoverCount());
+   }
+
+   // Inner classes -------------------------------------------------
+
+   /**
+    * Failover listener that counts and logs FAILOVER_COMPLETED events; all other event
+    * types are ignored.
+    */
+   private class LoggingFailoverListener implements FailoverListener
+   {
+      private final SynchronizedInt count = new SynchronizedInt(0);
+
+      public void failoverEventOccured(FailoverEvent event)
+      {
+         if (FailoverEvent.FAILOVER_COMPLETED != event.getType())
+         {
+            // Not interested
+            return;
+         }
+
+         int newCount = count.increment();
+
+         log.info("received FAILOVER_COMPLETED event, " + newCount + " connections failed over so far");
+      }
+
+      /**
+       * @return the number of FAILOVER_COMPLETED events received so far
+       */
+      public int getFailoverCount()
+      {
+         return count.get();
+      }
+   }
+
+   /**
+    * Base class of the worker threads. Creates a non-transacted, auto-acknowledge session
+    * on its connection, starts the connection and hands the session to
+    * {@link #runInSession(Session)}. An exception thrown while doing so is recorded and
+    * made available through {@link #getException()}. The session is closed when the
+    * worker finishes; the connection is not.
+    */
+   private abstract class ConnectionWorker extends Thread
+   {
+      // Written by the worker thread, read by the test thread
+      private volatile Exception exception;
+      private final Connection connection;
+
+      public ConnectionWorker(String name, Connection connection)
+      {
+         super(name);
+         this.connection = connection;
+      }
+
+      public void run()
+      {
+         Session session = null;
+         try
+         {
+            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            connection.start();
+            runInSession(session);
+         }
+         catch (Exception e)
+         {
+            this.exception = e;
+         }
+         finally
+         {
+            if (session != null)
+            {
+               try
+               {
+                  session.close();
+               }
+               catch (JMSException e)
+               {
+                  // Best effort: a failure to close must not replace the exception
+                  // recorded above, but it should not vanish silently either
+                  log.warn("Failed to close session of " + getName(), e);
+               }
+            }
+         }
+      }
+
+      /**
+       * The actual work of this thread, performed with the given session.
+       */
+      protected abstract void runInSession(Session session) throws Exception;
+
+      /**
+       * @return the exception that terminated this worker, or null if none was recorded
+       */
+      public Exception getException()
+      {
+         return exception;
+      }
+   }
+
+   /**
+    * Worker that sends MESSAGE_COUNT_PER_CONNECTION text messages to the queue,
+    * incrementing the shared produced counter after each send.
+    */
+   private class ConnectionSenderThread extends ConnectionWorker
+   {
+      private final Queue queue;
+
+      public ConnectionSenderThread(int i, Connection connection, Queue queue)
+      {
+         super("ConnectionSenderThread#" + i, connection);
+         this.queue = queue;
+      }
+
+      public void runInSession(Session session) throws Exception
+      {
+         MessageProducer producer = session.createProducer(queue);
+
+         for (int i = 0; i < MESSAGE_COUNT_PER_CONNECTION; i++)
+         {
+            producer.send(session.createTextMessage("Hello"));
+            incProduced();
+         }
+      }
+   }
+
+   /**
+    * Worker that receives up to MESSAGE_COUNT_PER_CONNECTION messages from the queue,
+    * incrementing the shared consumed counter for each one. Stops early as soon as a
+    * receive call returns null.
+    */
+   private class ConnectionReceiverThread extends ConnectionWorker
+   {
+      private final Queue queue;
+
+      public ConnectionReceiverThread(int i, Connection connection, Queue queue)
+      {
+         super("ConnectionReceiverThread#" + i, connection);
+         this.queue = queue;
+      }
+
+      public void runInSession(Session session) throws Exception
+      {
+         MessageConsumer consumer = session.createConsumer(queue);
+
+         for (int i = 0; i < MESSAGE_COUNT_PER_CONNECTION; i++)
+         {
+            Message message = consumer.receive(20000);
+            if (message == null)
+            {
+               // Nothing arrived within the receive timeout: stop consuming
+               break;
+            }
+
+            log.info("Received message");
+            incConsumed();
+         }
+      }
+   }
+}
Property changes on: trunk/tests/src/org/jboss/test/messaging/jms/stress/clustering/FailoverOverDistributionStressTest.java
___________________________________________________________________
Name: svn:keywords
+ Id LastChangedDate Author Revision
More information about the jboss-cvs-commits
mailing list