[jboss-cvs] JBoss Messaging SVN: r1864 - in trunk: src/main/org/jboss/jms/client/container tests/src/org/jboss/test/messaging/jms/clustering

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Wed Dec 27 23:02:34 EST 2006


Author: clebert.suconic at jboss.com
Date: 2006-12-27 23:02:30 -0500 (Wed, 27 Dec 2006)
New Revision: 1864

Added:
   trunk/tests/src/org/jboss/test/messaging/jms/clustering/HAStressTest.java
Removed:
   trunk/tests/src/org/jboss/test/messaging/jms/clustering/ValveTest.java
Modified:
   trunk/src/main/org/jboss/jms/client/container/HAAspect.java
   trunk/tests/src/org/jboss/test/messaging/jms/clustering/FailoverTest.java
Log:
Enabling Valve, changing FailoverTest and renaming ValveTest to HAStressTest

Modified: trunk/src/main/org/jboss/jms/client/container/HAAspect.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/container/HAAspect.java	2006-12-27 21:16:46 UTC (rev 1863)
+++ trunk/src/main/org/jboss/jms/client/container/HAAspect.java	2006-12-28 04:02:30 UTC (rev 1864)
@@ -87,7 +87,7 @@
    public static final int MAX_IO_RETRY_COUNT = 2;
 
    // Setting this attribute to false will disable the valve
-   public static final boolean INSTALL_VALVE = false;
+   public static final boolean INSTALL_VALVE = true;
 
    // Static --------------------------------------------------------
 

Modified: trunk/tests/src/org/jboss/test/messaging/jms/clustering/FailoverTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/clustering/FailoverTest.java	2006-12-27 21:16:46 UTC (rev 1863)
+++ trunk/tests/src/org/jboss/test/messaging/jms/clustering/FailoverTest.java	2006-12-28 04:02:30 UTC (rev 1864)
@@ -89,10 +89,6 @@
          {
          }
 
-         // TODO - this shouldn't be necessary if we have the client valve in place
-         log.info("Sleeping for 60 sec");
-         Thread.sleep(60000);
-
          // we must receive the message
 
          TextMessage tm = (TextMessage)c1.receive(1000);

Copied: trunk/tests/src/org/jboss/test/messaging/jms/clustering/HAStressTest.java (from rev 1859, trunk/tests/src/org/jboss/test/messaging/jms/clustering/ValveTest.java)
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/clustering/ValveTest.java	2006-12-24 23:01:01 UTC (rev 1859)
+++ trunk/tests/src/org/jboss/test/messaging/jms/clustering/HAStressTest.java	2006-12-28 04:02:30 UTC (rev 1864)
@@ -0,0 +1,279 @@
+/*
+   * JBoss, Home of Professional Open Source
+   * Copyright 2005, JBoss Inc., and individual contributors as indicated
+   * by the @authors tag. See the copyright.txt in the distribution for a
+   * full listing of individual contributors.
+   *
+   * This is free software; you can redistribute it and/or modify it
+   * under the terms of the GNU Lesser General Public License as
+   * published by the Free Software Foundation; either version 2.1 of
+   * the License, or (at your option) any later version.
+   *
+   * This software is distributed in the hope that it will be useful,
+   * but WITHOUT ANY WARRANTY; without even the implied warranty of
+   * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+   * Lesser General Public License for more details.
+   *
+   * You should have received a copy of the GNU Lesser General Public
+   * License along with this software; if not, write to the Free
+   * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+   * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+   */
+
+package org.jboss.test.messaging.jms.clustering;
+
+import org.jboss.test.messaging.jms.clustering.base.ClusteringTestBase;
+import org.jboss.test.messaging.tools.ServerManagement;
+import org.jboss.jms.client.JBossConnectionFactory;
+import org.jboss.jms.client.JBossConnection;
+import org.jboss.jms.client.delegate.ClientConnectionDelegate;
+import org.jboss.logging.Logger;
+import javax.jms.MessageConsumer;
+import javax.jms.Session;
+import javax.jms.Destination;
+import javax.jms.Connection;
+import javax.jms.MessageProducer;
+import javax.jms.Message;
+import javax.jms.DeliveryMode;
+import java.util.ArrayList;
+import java.util.Iterator;
+
+/**
+ * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ * @version <tt>$Revision:$</tt>
+ *          <p/>
+ *          $Id:$
+ */
+public class HAStressTest extends ClusteringTestBase
+{
+
+   int NUMBER_OF_PRODUCER_THREADS=1;
+   int NUMBER_OF_CONSUMER_THREADS=1;
+
+   public HAStressTest(String name)
+   {
+      super(name);
+   }
+
+   int messageCounterConsumer = 0;
+   int messageCounterProducer = 0;
+
+
+   Object lockReader = new Object();
+   Object lockWriter = new Object();
+   Object semaphore = new Object();
+
+   boolean shouldStop = false;
+
+
+   class LocalThreadConsumer extends Thread
+   {
+      private final Logger log = Logger.getLogger(this.getClass());
+
+      int id;
+      MessageConsumer consumer;
+      Session session;
+
+      public LocalThreadConsumer(int id, Session session, Destination destination) throws Exception
+      {
+         super("LocalThreadConsumer-" + id);
+         consumer = session.createConsumer(destination);
+         this.session = session;
+         this.id = id;
+      }
+
+
+      public void run()
+      {
+         try
+         {
+            synchronized (semaphore)
+            {
+               semaphore.wait();
+            }
+
+            int counter = 0;
+            while (true)
+            {
+               Message message = consumer.receive(5000);
+               if (message == null && shouldStop)
+               {
+                  break;
+               }
+               if (message != null)
+               {
+                  synchronized (lockReader)
+                  {
+                     messageCounterConsumer++;
+                  }
+                  log.trace("ReceiverID=" + id + " received message " + message);
+                  if (counter++ % 10 == 0)
+                  {
+                     //log.info("Commit on id=" + id);
+                     //session.commit();
+                  }
+               }
+            }
+            //session.commit();
+         }
+         catch (Exception e)
+         {
+            log.info("Caught exception... finishing Thread " + id, e);
+         }
+      }
+   }
+
+   class LocalThreadProducer extends Thread
+   {
+      private final Logger log = Logger.getLogger(this.getClass());
+
+      MessageProducer producer;
+      Session session;
+      int id;
+
+      public LocalThreadProducer(int id, Session session, Destination destination) throws Exception
+      {
+         super("LocalThreadProducer-" + id);
+         this.session = session;
+         producer = session.createProducer(destination);
+         producer.setDeliveryMode(DeliveryMode.PERSISTENT);
+         this.id = id;
+      }
+
+      public void run()
+      {
+         try
+         {
+            synchronized (semaphore)
+            {
+               semaphore.wait();
+            }
+
+            int counter = 0;
+            while (!shouldStop)
+            {
+               log.trace("Producer ID=" + id + " send message");
+               producer.send(session.createTextMessage("Message from producer " + id + " counter=" + (counter)));
+
+               synchronized (lockWriter)
+               {
+                  messageCounterProducer++;
+               }
+
+               if (counter++ % 5 == 0)
+               {
+                  //log.info("Committing message");
+                  //session.commit();
+               }
+            }
+
+         }
+         catch (Exception e)
+         {
+            log.info("Caught exception... finishing Thread " + id, e);
+         }
+      }
+   }
+
+   /**
+    * This test will open several Consumers at the same Connection and it will kill the server, expecting failover
+    * to happen inside the Valve
+    */
+   public void testMultiThreadFailover() throws Exception
+   {
+      // This test will be disabled until we implement the valve
+      //JBossConnectionFactory factory = (JBossConnectionFactory) ic[1].lookup("/ConnectionFactory");
+
+      Connection conn1 = cf.createConnection();
+      Connection conn2 = cf.createConnection();
+      Connection conn3 = cf.createConnection();
+
+      log.info("Created connections");
+
+      checkConnectionsDifferentServers(new Connection[]{conn1, conn2, conn3});
+
+      Connection conn = getConnection(new Connection[]{conn1, conn2, conn3}, 1);
+      conn.start();
+
+      for (int i = 0; i < 3; i++)
+      {
+         JBossConnection connTest = (JBossConnection) getConnection(new Connection[]{conn1, conn2, conn3}, i);
+
+         String locator = ((ClientConnectionDelegate) connTest.getDelegate()).getRemotingConnection().
+            getInvokingClient().getInvoker().getLocator().getLocatorURI();
+
+         log.info("Server " + i + " has locator=" + locator);
+
+      }
+
+
+      ArrayList threadList = new ArrayList();
+
+      for (int i = 0; i < NUMBER_OF_PRODUCER_THREADS; i++)
+      {
+         threadList.add(new LocalThreadProducer(i, conn.createSession(false, Session.AUTO_ACKNOWLEDGE), queue[1]));
+      }
+
+      for (int i = 0; i < NUMBER_OF_CONSUMER_THREADS; i++)
+      {
+         threadList.add(new LocalThreadConsumer(i, conn.createSession(false, Session.AUTO_ACKNOWLEDGE), queue[1]));
+      }
+
+      for (Iterator iter = threadList.iterator(); iter.hasNext();)
+      {
+         Thread t = (Thread) iter.next();
+         t.start();
+      }
+
+      Thread.sleep(1000);
+      synchronized (semaphore)
+      {
+         semaphore.notifyAll();
+      }
+
+      Thread.sleep(30000);
+
+      log.info("Killing server 1");
+      ServerManagement.log(ServerManagement.INFO, "Server 1 will be killed");
+      ServerManagement.log(ServerManagement.INFO, "Server 1 will be killed", 2);
+      log.info("messageCounterConsumer=" + messageCounterConsumer + ", messageCounterProducer=" + messageCounterProducer);
+
+      ServerManagement.kill(1);
+
+      Thread.sleep(50000);
+      log.info("messageCounterConsumer=" + messageCounterConsumer + ", messageCounterProducer=" + messageCounterProducer);
+      shouldStop = true;
+
+      for (Iterator iter = threadList.iterator(); iter.hasNext();)
+      {
+         Thread t = (Thread) iter.next();
+         t.join();
+      }
+
+      log.info("messageCounterConsumer=" + messageCounterConsumer + ", messageCounterProducer=" + messageCounterProducer);
+
+      assertEquals(messageCounterProducer, messageCounterConsumer);
+
+      conn1.close();
+      conn2.close();
+      conn3.close();
+
+   }
+
+   // Protected -----------------------------------------------------
+
+   protected void setUp() throws Exception
+   {
+      nodeCount = 3;
+
+      super.setUp();
+
+      log.debug("setup done");
+   }
+
+   protected void tearDown() throws Exception
+   {
+      super.tearDown();
+   }
+
+}

Deleted: trunk/tests/src/org/jboss/test/messaging/jms/clustering/ValveTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/clustering/ValveTest.java	2006-12-27 21:16:46 UTC (rev 1863)
+++ trunk/tests/src/org/jboss/test/messaging/jms/clustering/ValveTest.java	2006-12-28 04:02:30 UTC (rev 1864)
@@ -1,264 +0,0 @@
-/*
-   * JBoss, Home of Professional Open Source
-   * Copyright 2005, JBoss Inc., and individual contributors as indicated
-   * by the @authors tag. See the copyright.txt in the distribution for a
-   * full listing of individual contributors.
-   *
-   * This is free software; you can redistribute it and/or modify it
-   * under the terms of the GNU Lesser General Public License as
-   * published by the Free Software Foundation; either version 2.1 of
-   * the License, or (at your option) any later version.
-   *
-   * This software is distributed in the hope that it will be useful,
-   * but WITHOUT ANY WARRANTY; without even the implied warranty of
-   * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
-   * Lesser General Public License for more details.
-   *
-   * You should have received a copy of the GNU Lesser General Public
-   * License along with this software; if not, write to the Free
-   * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
-   * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
-   */
-
-package org.jboss.test.messaging.jms.clustering;
-
-import org.jboss.test.messaging.jms.clustering.base.ClusteringTestBase;
-import org.jboss.test.messaging.tools.ServerManagement;
-import org.jboss.jms.client.JBossConnectionFactory;
-import org.jboss.jms.client.JBossConnection;
-import org.jboss.jms.client.delegate.ClientConnectionDelegate;
-import org.jboss.logging.Logger;
-import javax.jms.MessageConsumer;
-import javax.jms.Session;
-import javax.jms.Destination;
-import javax.jms.Connection;
-import javax.jms.MessageProducer;
-import javax.jms.Message;
-import java.util.ArrayList;
-import java.util.Iterator;
-
-/**
- * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
- * @version <tt>$Revision:$</tt>
- *          <p/>
- *          $Id:$
- */
-public class ValveTest extends ClusteringTestBase
-{
-
-   public ValveTest(String name)
-   {
-      super(name);
-   }
-
-   int messageCounterConsumer = 0;
-   int messageCounterProducer = 0;
-
-
-   Object lockReader = new Object();
-   Object lockWriter = new Object();
-   Object semaphore = new Object();
-
-   boolean shouldStop = false;
-
-
-   class LocalThreadConsumer extends Thread
-   {
-      private final Logger log = Logger.getLogger(this.getClass());
-
-      int id;
-      MessageConsumer consumer;
-      Session session;
-
-      public LocalThreadConsumer(int id, Session session, Destination destination) throws Exception
-      {
-         consumer = session.createConsumer(destination);
-         this.session = session;
-         this.id = id;
-      }
-
-
-      public void run()
-      {
-         try
-         {
-            synchronized (semaphore)
-            {
-               semaphore.wait();
-            }
-
-            int counter = 0;
-            while (true)
-            {
-               Message message = consumer.receive(5000);
-               if (message == null && shouldStop)
-               {
-                  break;
-               }
-               if (message != null)
-               {
-                  synchronized (lockReader)
-                  {
-                     messageCounterConsumer++;
-                  }
-                  log.trace("ReceiverID=" + id + " received message " + message);
-                  if (counter++ % 10 == 0)
-                  {
-                     //log.info("Commit on id=" + id);
-                     //session.commit();
-                  }
-               }
-            }
-            //session.commit();
-         }
-         catch (Exception e)
-         {
-            log.info("Caught exception... finishing Thread " + id, e);
-         }
-      }
-   }
-
-   class LocalThreadProducer extends Thread
-   {
-      private final Logger log = Logger.getLogger(this.getClass());
-
-      MessageProducer producer;
-      Session session;
-      int id;
-
-      public LocalThreadProducer(int id, Session session, Destination destination) throws Exception
-      {
-         this.session = session;
-         producer = session.createProducer(destination);
-         this.id = id;
-      }
-
-      public void run()
-      {
-         try
-         {
-            synchronized (semaphore)
-            {
-               semaphore.wait();
-            }
-
-            int counter = 0;
-            while (!shouldStop)
-            {
-               log.trace("Producer ID=" + id + " send message");
-               producer.send(session.createTextMessage("Message from producer " + id + " counter=" + (counter)));
-
-               synchronized (lockWriter)
-               {
-                  messageCounterProducer++;
-               }
-
-               if (counter++ % 5 == 0)
-               {
-                  //log.info("Committing message");
-                  //session.commit();
-               }
-            }
-
-         }
-         catch (Exception e)
-         {
-            log.info("Caught exception... finishing Thread " + id, e);
-         }
-      }
-   }
-
-   /**
-    * This test will open several Consumers at the same Connection and it will kill the server, expecting failover
-    * to happen inside the Valve
-    */
-   public void testMultiThreadFailover() throws Exception
-   {
-       // This test will be disabled until we implement the valve
-//      JBossConnectionFactory factory = (JBossConnectionFactory) ic[1].lookup("/ConnectionFactory");
-//
-//      Connection conn1 = cf.createConnection();
-//      Connection conn2 = cf.createConnection();
-//      Connection conn3 = cf.createConnection();
-//
-//      log.info("Created connections");
-//
-//      checkConnectionsDifferentServers(new Connection[]{conn1, conn2, conn3});
-//
-//      Connection conn = getConnection(new Connection[]{conn1, conn2, conn3}, 1);
-//      conn.start();
-//
-//      for (int i = 0; i < 3; i++)
-//      {
-//         JBossConnection connTest = (JBossConnection)getConnection(new Connection[]{conn1, conn2, conn3}, i);
-//
-//         String locator = ((ClientConnectionDelegate) connTest.getDelegate()).getRemotingConnection().
-//            getInvokingClient().getInvoker().getLocator().getLocatorURI();
-//
-//         log.info("Server " + i + " has locator=" + locator);
-//
-//      }
-//
-//
-//      ArrayList list = new ArrayList();
-//
-//      for (int i = 0; i < 5; i++)
-//      {
-//         list.add(new LocalThreadProducer(i, conn.createSession(false, Session.AUTO_ACKNOWLEDGE), queue[1]));
-//         list.add(new LocalThreadConsumer(i, conn.createSession(false, Session.AUTO_ACKNOWLEDGE), queue[1]));
-//      }
-//
-//      for (Iterator iter = list.iterator(); iter.hasNext();)
-//      {
-//         Thread t = (Thread) iter.next();
-//         t.start();
-//      }
-//
-//      Thread.sleep(1000);
-//      synchronized (semaphore)
-//      {
-//         semaphore.notifyAll();
-//      }
-//
-//      Thread.sleep(30000);
-//
-//      log.info("Killing server 1");
-//      ServerManagement.log(ServerManagement.INFO, "Server 1 will be killed");
-//      ServerManagement.log(ServerManagement.INFO, "Server 1 will be killed", 2);
-//      log.info("messageCounterConsumer=" + messageCounterConsumer + ", messageCounterProducer=" + messageCounterProducer);
-//
-//      ServerManagement.kill(1);
-//
-//      Thread.sleep(50000);
-//      log.info("messageCounterConsumer=" + messageCounterConsumer + ", messageCounterProducer=" + messageCounterProducer);
-//      shouldStop = true;
-//
-//      for (Iterator iter = list.iterator(); iter.hasNext();)
-//      {
-//         Thread t = (Thread) iter.next();
-//         t.join();
-//      }
-//
-//      log.info("messageCounterConsumer=" + messageCounterConsumer + ", messageCounterProducer=" + messageCounterProducer);
-//
-//      assertEquals(messageCounterProducer, messageCounterConsumer);
-//
-   }
-
-   // Protected -----------------------------------------------------
-
-   protected void setUp() throws Exception
-   {
-      nodeCount = 3;
-
-      //super.setUp();
-
-      log.debug("setup done");
-   }
-
-   protected void tearDown() throws Exception
-   {
-      //super.tearDown();
-   }
-
-}




More information about the jboss-cvs-commits mailing list