[jboss-cvs] JBoss Messaging SVN: r2845 - in trunk/tests/src/org/jboss/test/messaging: jms/clustering and 1 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Thu Jul 5 04:58:21 EDT 2007


Author: sergeypk
Date: 2007-07-05 04:58:21 -0400 (Thu, 05 Jul 2007)
New Revision: 2845

Added:
   trunk/tests/src/org/jboss/test/messaging/jms/stress/clustering/FailoverStressTest.java
Modified:
   trunk/tests/src/org/jboss/test/messaging/MessagingTestCase.java
   trunk/tests/src/org/jboss/test/messaging/jms/clustering/ClusteringTestBase.java
Log:
http://jira.jboss.com/jira/browse/JBMESSAGING-572 - a preliminary failover stress test, most probably broken.

Modified: trunk/tests/src/org/jboss/test/messaging/MessagingTestCase.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/MessagingTestCase.java	2007-07-05 05:42:36 UTC (rev 2844)
+++ trunk/tests/src/org/jboss/test/messaging/MessagingTestCase.java	2007-07-05 08:58:21 UTC (rev 2845)
@@ -288,7 +288,7 @@
     * @param conn a JMS connection
     * @return the ID of the ServerPeer the connection is communicating with.
     */
-   protected int getServerId(Connection conn)
+   protected static int getServerId(Connection conn)
    {
       return ((JBossConnection) conn).getServerID();
    }

Modified: trunk/tests/src/org/jboss/test/messaging/jms/clustering/ClusteringTestBase.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/clustering/ClusteringTestBase.java	2007-07-05 05:42:36 UTC (rev 2844)
+++ trunk/tests/src/org/jboss/test/messaging/jms/clustering/ClusteringTestBase.java	2007-07-05 08:58:21 UTC (rev 2845)
@@ -146,11 +146,6 @@
          getRemotingClient().getInvoker().getLocator().getLocatorURI();
    }
 
-   protected int getServerId(Connection conn)
-   {
-      return getConnectionState(conn).getServerID();
-   }
-
    protected int getObjectId(Connection conn)
    {
       return ((DelegateSupport) ((JBossConnection) conn).

Added: trunk/tests/src/org/jboss/test/messaging/jms/stress/clustering/FailoverStressTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/stress/clustering/FailoverStressTest.java	                        (rev 0)
+++ trunk/tests/src/org/jboss/test/messaging/jms/stress/clustering/FailoverStressTest.java	2007-07-05 08:58:21 UTC (rev 2845)
@@ -0,0 +1,304 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2007, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.test.messaging.jms.stress.clustering;
+
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+
+import org.jboss.jms.client.FailoverEvent;
+import org.jboss.jms.client.FailoverListener;
+import org.jboss.jms.client.JBossConnection;
+import org.jboss.test.messaging.jms.clustering.ClusteringTestBase;
+import org.jboss.test.messaging.tools.ServerManagement;
+
+import EDU.oswego.cs.dl.util.concurrent.SynchronizedInt;
+
+/**
+ * @author <a href="sergey.koshcheyev at jboss.com">Sergey Koshcheyev</a>
+ * @version <tt>$Revision$</tt>
+ * 
+ * $Id$
+ */
+public class FailoverStressTest extends ClusteringTestBase
+{
+   // Constants -----------------------------------------------------
+
+   // The number of concurrent connections (worker threads)
+   private static final int CONNECTION_COUNT = 20;
+
+   // The node that connections will initially connect to
+   private static final int CONNECT_NODE = 1;
+
+   // The node the connections are expected to fail over to
+   private static final int FAILOVER_NODE = 0;
+
+   // The number of messages each worker thread will send or receive
+   private static final int MESSAGE_COUNT_PER_CONNECTION = 100;
+
+   // The interval, in milliseconds, between sending messages (for sender threads)
+   private static final long SLEEP_PER_MESSAGE = 100L;
+
+   // The sleep interval, in milliseconds, between the time connection threads
+   // are started and the time the server is killed: half of the time a sender
+   // thread spends sleeping between its messages.
+   private static final long PAUSE_BEFORE_KILL = MESSAGE_COUNT_PER_CONNECTION * SLEEP_PER_MESSAGE / 2;
+
+   // Time, in milliseconds, in which all threads are supposed to fail over
+   private static final long FAILOVER_TIMEOUT = 15000L;
+
+   // Static --------------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+
+   // Constructors --------------------------------------------------
+   
+   /**
+    * Creates the test case and sets the inherited <tt>nodeCount</tt> to 2
+    * (presumably the number of cluster nodes the base class starts: the
+    * node that gets killed plus the node to fail over to - TODO confirm).
+    *
+    * @param name the test case name, handed to the superclass constructor
+    */
+   public FailoverStressTest(String name)
+   {
+      super(name);
+      nodeCount = 2;
+   }
+
+   // Public --------------------------------------------------------
+   
+   /**
+    * Establishes many connections to a single node, each connection
+    * either sends or receives messages from a queue. Then kills the node.
+    * All connections should successfully fail over to the failover node.
+    * <p>
+    * Every connection that was created is closed at the end, even when
+    * closing one of them fails. A close failure is logged and is rethrown
+    * only if the test body itself succeeded, so it can never hide the
+    * failure that made the test abort in the first place.
+    *
+    * @throws Exception if setting up, killing the node, waiting or
+    *         closing the connections fails
+    */
+   public void testFailoverManyConnections() throws Exception
+   {
+      Connection connections[] = new Connection[CONNECTION_COUNT];
+      ConnectionWorker workers[] = new ConnectionWorker[CONNECTION_COUNT];
+
+      // set as the very last step of the test body; consulted during cleanup
+      boolean passed = false;
+
+      try
+      {
+         log.info("creating " + CONNECTION_COUNT + " threads to connect to server " + CONNECT_NODE);
+
+         for (int i = 0; i < CONNECTION_COUNT; i++)
+         {
+            connections[i] = createConnectionOnServer(cf, CONNECT_NODE);
+
+            // even-numbered connections send, odd-numbered connections receive
+            if (i % 2 == 0)
+            {
+               workers[i] = new ConnectionSenderThread(i, connections[i], queue[CONNECT_NODE]);
+            }
+            else
+            {
+               workers[i] = new ConnectionReceiverThread(i, connections[i], queue[CONNECT_NODE]);
+            }
+         }
+
+         for (int i = 0; i < CONNECTION_COUNT; i++)
+         {
+            workers[i].start();
+         }
+
+         log.info("waiting for a few seconds so that threads begin working");
+
+         Thread.sleep(PAUSE_BEFORE_KILL);
+
+         log.info("killing node " + CONNECT_NODE);
+
+         killAndWaitForFailover(connections);
+
+         for (int i = 0; i < CONNECTION_COUNT; i++)
+         {
+            assertEquals("Connection #" + i + " did not fail over", FAILOVER_NODE, getServerId(connections[i]));
+         }
+
+         log.info("waiting for connection threads to finish");
+
+         boolean workerFailed = false;
+         for (int i = 0; i < CONNECTION_COUNT; i++)
+         {
+            workers[i].join();
+            Exception e = workers[i].getException();
+            if (e != null)
+            {
+               log.error("Thread " + workers[i].getName() + " terminated abnormally:", e);
+               workerFailed = true;
+            }
+         }
+
+         if (workerFailed) { fail("Some threads terminated abnormally"); }
+
+         passed = true;
+      }
+      finally
+      {
+         JMSException closeFailure = null;
+
+         for (int i = 0; i < CONNECTION_COUNT; i++)
+         {
+            if (connections[i] == null)
+            {
+               // creation did not get this far
+               continue;
+            }
+
+            try
+            {
+               connections[i].close();
+            }
+            catch (JMSException e)
+            {
+               // keep going so that the remaining connections are not leaked
+               log.error("Failed to close connection #" + i, e);
+               if (closeFailure == null)
+               {
+                  closeFailure = e;
+               }
+            }
+         }
+
+         // Throwing from here while the try block is already failing would
+         // replace the real cause, so only report when everything else passed.
+         if (passed && closeFailure != null)
+         {
+            throw closeFailure;
+         }
+      }
+   }
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   // Private -------------------------------------------------------
+   
+   private void killAndWaitForFailover(Connection[] connections) throws Exception
+   {
+      // register a failover listener
+      LoggingFailoverListener failoverListener = new LoggingFailoverListener();
+      
+      for (int i = 0; i < connections.length; i++)
+      {
+         ((JBossConnection)connections[i]).registerFailoverListener(failoverListener);
+      }
+
+      int serverId = getServerId(connections[0]);
+
+      ServerManagement.killAndWait(serverId);
+
+      log.info("killed node " + serverId + ", now waiting for all connections to fail over");
+      
+      long killTime = System.currentTimeMillis();
+      while (failoverListener.getFailoverCount() < connections.length
+            && System.currentTimeMillis() - killTime <= FAILOVER_TIMEOUT)
+      {
+         Thread.sleep(3000L);
+      }
+      
+      assertEquals("Not all connections have failed over successfully",
+            connections.length, failoverListener.getFailoverCount());
+   }
+
+   // Inner classes -------------------------------------------------
+   
+   /**
+    * Failover listener that counts the FAILOVER_COMPLETED events it receives
+    * and logs each of them. Events of any other type are ignored.
+    */
+   private class LoggingFailoverListener implements FailoverListener
+   {
+      // number of FAILOVER_COMPLETED events received so far
+      private final SynchronizedInt count = new SynchronizedInt(0);
+
+      /**
+       * Counts and logs the event if it is a FAILOVER_COMPLETED event.
+       *
+       * @param event the failover event reported by a connection
+       */
+      public void failoverEventOccured(FailoverEvent event)
+      {
+         if (event.getType() == FailoverEvent.FAILOVER_COMPLETED)
+         {
+            int completedSoFar = count.increment();
+
+            log.info("received FAILOVER_COMPLETED event, " + completedSoFar + " connections failed over so far");
+         }
+      }
+
+      /**
+       * @return the number of FAILOVER_COMPLETED events received so far
+       */
+      public int getFailoverCount()
+      {
+         return count.get();
+      }
+   }
+   
+   /**
+    * Base class of the worker threads. A worker creates a non-transacted,
+    * auto-acknowledge session on its connection, starts the connection and
+    * passes the session to {@link #runInSession(Session)}. An exception
+    * thrown along the way is stored and can be retrieved with
+    * {@link #getException()}; the session is closed in any case.
+    */
+   private abstract class ConnectionWorker extends Thread
+   {
+      // set inside run(), read through getException() by whoever joins the thread
+      private volatile Exception exception;
+      private final Connection connection;
+
+      /**
+       * @param name the thread name
+       * @param connection the connection the worker creates its session on
+       */
+      public ConnectionWorker(String name, Connection connection)
+      {
+         super(name);
+         this.connection = connection;
+      }
+
+      public void run()
+      {
+         Session session = null;
+         try
+         {
+            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            connection.start();
+            runInSession(session);
+         }
+         catch (Exception e)
+         {
+            this.exception = e;
+         }
+         finally
+         {
+            if (session != null)
+            {
+               try
+               {
+                  session.close();
+               }
+               catch (JMSException e)
+               {
+                  // Best effort: a failed close must not replace an exception
+                  // recorded above, but it should not vanish silently either.
+                  log.error("Thread " + getName() + " failed to close its session", e);
+               }
+            }
+         }
+      }
+
+      /**
+       * The actual work of the thread, run with a freshly created session.
+       *
+       * @param session the session created for this worker
+       * @throws Exception on any failure; it is recorded by {@link #run()}
+       */
+      protected abstract void runInSession(Session session) throws Exception;
+
+      /**
+       * @return the exception that terminated the worker, or null if
+       *         none was recorded
+       */
+      public Exception getException()
+      {
+         return exception;
+      }
+   }
+
+   private class ConnectionSenderThread extends ConnectionWorker
+   {
+      private final Queue queue;
+
+      public ConnectionSenderThread(int i, Connection connection, Queue queue)
+      {
+         super("ConnectionSenderThread#" + i, connection);
+         this.queue = queue;
+      }
+
+      public void runInSession(Session session) throws Exception
+      {
+         MessageProducer producer = session.createProducer(queue);
+         
+         for (int i = 0; i < MESSAGE_COUNT_PER_CONNECTION; i++)
+         {
+            Thread.sleep(SLEEP_PER_MESSAGE);
+            producer.send(session.createTextMessage("Hello"));
+         }
+      }
+   }
+   
+   private class ConnectionReceiverThread extends ConnectionWorker
+   {
+      private final Queue queue;
+
+      public ConnectionReceiverThread(int i, Connection connection, Queue queue)
+      {
+         super("ConnectionReceiverThread#" + i, connection);
+         this.queue = queue;
+      }
+
+      public void runInSession(Session session) throws Exception
+      {
+         MessageConsumer consumer = session.createConsumer(queue);
+         
+         for (int i = 0; i < MESSAGE_COUNT_PER_CONNECTION; i++)
+         {
+            Message message = consumer.receive(2 * SLEEP_PER_MESSAGE);
+            if (message == null)
+            {
+               break;
+            }
+         }
+      }
+   }
+}


Property changes on: trunk/tests/src/org/jboss/test/messaging/jms/stress/clustering/FailoverStressTest.java
___________________________________________________________________
Name: svn:keywords
   + Id LastChangedDate Author Revision




More information about the jboss-cvs-commits mailing list