[jboss-cvs] JBoss Messaging SVN: r2314 - in branches/Branch_1_0_1_SP: tests/src/org/jboss/test/messaging/jms/stress and 1 other directory.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Wed Feb 14 17:48:52 EST 2007


Author: clebert.suconic at jboss.com
Date: 2007-02-14 17:48:51 -0500 (Wed, 14 Feb 2007)
New Revision: 2314

Added:
   branches/Branch_1_0_1_SP/tests/src/org/jboss/test/messaging/jms/stress/ConcurrentStopStartStressTest.java
Modified:
   branches/Branch_1_0_1_SP/src/main/org/jboss/jms/server/endpoint/ServerConnectionEndpoint.java
   branches/Branch_1_0_1_SP/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java
Log:
http://jira.jboss.org/jira/browse/JBMESSAGING-836 - proposed fix

Modified: branches/Branch_1_0_1_SP/src/main/org/jboss/jms/server/endpoint/ServerConnectionEndpoint.java
===================================================================
--- branches/Branch_1_0_1_SP/src/main/org/jboss/jms/server/endpoint/ServerConnectionEndpoint.java	2007-02-14 20:25:03 UTC (rev 2313)
+++ branches/Branch_1_0_1_SP/src/main/org/jboss/jms/server/endpoint/ServerConnectionEndpoint.java	2007-02-14 22:48:51 UTC (rev 2314)
@@ -745,16 +745,13 @@
    
    private void setStarted(boolean s) throws Throwable
    {
-      synchronized(sessions)
+      for(Iterator i = new HashSet(sessions.values()).iterator(); i.hasNext(); )
       {
-         for (Iterator i = sessions.values().iterator(); i.hasNext(); )
-         {
-            ServerSessionEndpoint sd = (ServerSessionEndpoint)i.next();
-            sd.setStarted(s);
-         }
-         started = s;
+         ServerSessionEndpoint sd = (ServerSessionEndpoint)i.next();
+         sd.setStarted(s);
       }
-   }   
+      started = s;
+   }
    
    private void processTransaction(TxState txState, Transaction tx) throws Throwable
    {

Modified: branches/Branch_1_0_1_SP/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java
===================================================================
--- branches/Branch_1_0_1_SP/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java	2007-02-14 20:25:03 UTC (rev 2313)
+++ branches/Branch_1_0_1_SP/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java	2007-02-14 22:48:51 UTC (rev 2314)
@@ -808,22 +808,19 @@
     */
    protected void setStarted(boolean s) throws Throwable
    {
-      synchronized(consumers)
+      for(Iterator i = new HashSet(consumers.values()).iterator(); i.hasNext(); )
       {
-         for(Iterator i = consumers.values().iterator(); i.hasNext(); )
+         ServerConsumerEndpoint sce = (ServerConsumerEndpoint)i.next();
+         if (s)
          {
-            ServerConsumerEndpoint sce = (ServerConsumerEndpoint)i.next();
-            if (s)
-            {
-               sce.start();
-            }
-            else
-            {
-               sce.stop();
-            }
+            sce.start();
          }
+         else
+         {
+            sce.stop();
+         }
       }
-   }   
+   }
 
    // Private -------------------------------------------------------
 

Added: branches/Branch_1_0_1_SP/tests/src/org/jboss/test/messaging/jms/stress/ConcurrentStopStartStressTest.java
===================================================================
--- branches/Branch_1_0_1_SP/tests/src/org/jboss/test/messaging/jms/stress/ConcurrentStopStartStressTest.java	                        (rev 0)
+++ branches/Branch_1_0_1_SP/tests/src/org/jboss/test/messaging/jms/stress/ConcurrentStopStartStressTest.java	2007-02-14 22:48:51 UTC (rev 2314)
@@ -0,0 +1,369 @@
+/*
+   * JBoss, Home of Professional Open Source
+   * Copyright 2005, JBoss Inc., and individual contributors as indicated
+   * by the @authors tag. See the copyright.txt in the distribution for a
+   * full listing of individual contributors.
+   *
+   * This is free software; you can redistribute it and/or modify it
+   * under the terms of the GNU Lesser General Public License as
+   * published by the Free Software Foundation; either version 2.1 of
+   * the License, or (at your option) any later version.
+   *
+   * This software is distributed in the hope that it will be useful,
+   * but WITHOUT ANY WARRANTY; without even the implied warranty of
+   * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+   * Lesser General Public License for more details.
+   *
+   * You should have received a copy of the GNU Lesser General Public
+   * License along with this software; if not, write to the Free
+   * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+   * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+   */
+
+package org.jboss.test.messaging.jms.stress;
+
+import org.jboss.test.messaging.MessagingTestCase;
+import org.jboss.test.messaging.tools.ServerManagement;
+import org.jboss.jms.client.JBossConnectionFactory;
+import javax.naming.InitialContext;
+import javax.jms.Connection;
+import javax.jms.Session;
+import javax.jms.Destination;
+import javax.jms.MessageProducer;
+import javax.jms.MessageConsumer;
+import javax.jms.TextMessage;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.Random;
+
+/**
+ * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ * @version <tt>$Revision$</tt>
+ *
+ * $Id$
+ */
+public class ConcurrentStopStartStressTest extends MessagingTestCase
+{
+
+   // Constants ------------------------------------------------------------------------------------
+
+   // Attributes -----------------------------------------------------------------------------------
+
+   InitialContext ic;
+   JBossConnectionFactory cf;
+   protected Destination queue;
+   boolean running = false;
+   Object semaphore = new Object();
+
+   // Static ---------------------------------------------------------------------------------------
+
+   // Constructors ---------------------------------------------------------------------------------
+
+   public ConcurrentStopStartStressTest(String name)
+   {
+      super(name); // hands the test name to MessagingTestCase
+   }
+
+   // Public ---------------------------------------------------------------------------------------
+
+   /** Hammers one shared connection with concurrent stop()/start() calls while producers and consumers use it. */
+   public void testConcurrentStopStart() throws Throwable
+   {
+      Connection conn = cf.createConnection();
+      try
+      {
+         ArrayList threads = new ArrayList();
+         for (int i = 0; i < 10; i++)
+         {
+            threads.add(new ThreadProducer("Producer " + i, conn));
+            threads.add(new ThreadConsumer("Consumer " + i, conn));
+         }
+         for (int i = 0; i < 10; i++)
+         {
+            threads.add(new ThreadConnectionStarter("Connect Starter(" + i + ")", conn, i == 0));
+         }
+
+         for (Iterator iter = threads.iterator(); iter.hasNext();)
+         {
+            Thread thread = (Thread) iter.next();
+            thread.start();
+         }
+
+         conn.start();
+         // Give every thread time to reach waitSemaphore() before releasing them all together
+         Thread.sleep(1000);
+         synchronized (semaphore)
+         {
+            this.running = true;
+            semaphore.notifyAll();
+         }
+
+         Thread.sleep(30000); // let the stress run for 30 seconds
+         synchronized (semaphore)
+         {
+            running = false;
+         }
+
+         for (Iterator iter = threads.iterator(); iter.hasNext();)
+         {
+            Thread thread = (Thread) iter.next();
+            thread.join();
+         }
+
+         int produced = 0;
+         int consumed = 0;
+         for (Iterator iter = threads.iterator(); iter.hasNext();)
+         {
+            TestThread thread = (TestThread) iter.next();
+            if (thread.e != null)
+            {
+               throw thread.e; // surface the first failure recorded by a worker
+            }
+            if (thread instanceof ThreadProducer)
+            {
+               produced += thread.counter;
+            } else if (thread instanceof ThreadConsumer)
+            {
+               consumed += thread.counter;
+            }
+         }
+         assertEquals(produced, consumed);
+      }
+      finally
+      {
+         conn.close(); // always release the connection, even when the test fails
+      }
+   }
+   // Package protected ----------------------------------------------------------------------------
+
+   // Protected ------------------------------------------------------------------------------------
+
+   /** Starts the server, redeploys "TestQueue" and looks up the factory and queue used by the test. */
+   public void setUp() throws Exception
+   {
+      super.setUp();
+
+      running = false;
+
+      ServerManagement.start("all");
+
+      ic = new InitialContext(ServerManagement.getJNDIEnvironment());
+      cf = (JBossConnectionFactory) ic.lookup("/ConnectionFactory");
+      // undeploy first, presumably to discard any queue left deployed by an earlier test
+      ServerManagement.undeployQueue("TestQueue");
+      ServerManagement.deployQueue("TestQueue");
+
+      queue = (Destination) ic.lookup("queue/TestQueue");
+      // presumably removes messages left over from earlier runs; the helper is defined outside this file
+      drainDestination(cf, queue);
+
+      log.debug("setup done");
+   }
+
+   public void tearDown() throws Exception
+   {
+      System.getProperties().remove("jboss.messaging.callback.bind.address");
+      ServerManagement.undeployQueue("TestQueue");
+      ic.close(); // release the JNDI context opened in setUp(); it was previously never closed
+      super.tearDown();
+      log.debug("tear down done");
+   }
+
+   // Private --------------------------------------------------------------------------------------
+
+   // Inner classes used by testConcurrentStopStart ------------------------------------------------
+
+   /** Base worker: shares the test connection and records a message count plus any failure. */
+   class TestThread extends Thread
+   {
+      Throwable e; // failure captured by run(); read by the test after join()
+      Connection conn;
+      Session session;
+      int counter = 0; // messages sent or received by this worker
+
+      public TestThread(String name, Connection conn) throws Exception
+      {
+         super(name);
+         this.conn = conn;
+      }
+
+      protected void createSession() throws Exception
+      {
+         this.session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+      }
+      /** Blocks the calling thread until 'running' has been set to true. */
+      public void waitSemaphore() throws Exception
+      {
+         synchronized (semaphore)
+         {
+            while (!running) // loop rather than 'if': Object.wait() may return spuriously
+            {
+               semaphore.wait();
+            }
+         }
+      }
+   }
+
+   /** Worker that sends one text message every 500 ms for as long as the test is running. */
+   class ThreadProducer extends TestThread
+   {
+      MessageProducer producer;
+
+      public ThreadProducer(String name, Connection conn) throws Exception
+      {
+         super(name, conn);
+         createSession();
+         this.producer = session.createProducer(queue);
+      }
+
+      /** Sends until 'running' is seen false; any failure is stored in 'e' instead of being thrown. */
+      public void run()
+      {
+         try
+         {
+            waitSemaphore();
+            for (int seq = 0; ; seq++)
+            {
+               boolean stopRequested;
+               synchronized (semaphore)
+               {
+                  stopRequested = !running;
+               }
+               if (stopRequested)
+               {
+                  break;
+               }
+               producer.send(session.createTextMessage("Message " + seq));
+               counter++;
+               log.info("Message " + seq + " sent");
+               Thread.sleep(500);
+            }
+         }
+         catch (Throwable failure)
+         {
+            this.e = failure;
+         }
+      }
+   }
+
+   /** Worker that receives and counts messages until a receive times out after the test has stopped. */
+   class ThreadConsumer extends TestThread
+   {
+      MessageConsumer consumer;
+      public ThreadConsumer(String name, Connection conn) throws Exception
+      {
+         super(name, conn);
+         createSession();
+         this.consumer = session.createConsumer(queue);
+      }
+
+      public void run()
+      {
+         try
+         {
+            waitSemaphore();
+            while (true)
+            {
+               TextMessage message = (TextMessage) consumer.receive(3000);
+               if (message == null)
+               {
+                  synchronized (semaphore)
+                  {
+                     if (running) continue; // timed out mid-test: poll again instead of dereferencing null
+                  }
+                  break;
+               }
+               log.info("message " + message.getText() + " received");
+               counter++;
+            }
+         }
+         catch (Throwable e)
+         {
+            this.e = e;
+         }
+      }
+   }
+
+   class ThreadConnectionStarter extends TestThread
+   {
+
+      Random random = new Random();
+      boolean master;
+
+      public ThreadConnectionStarter(String name, Connection conn, boolean master) throws Exception
+      {
+         super(name, conn);
+         this.master = master;
+      }
+
+
+      public void run()
+      {
+         try
+         {
+            waitSemaphore();
+            Thread.sleep(3000); // Three seconds until first attempt;
+            boolean willBreak = false;
+            while (!willBreak)
+            {
+               if (master)
+               {
+                  Thread.sleep(1000);
+               }
+               
+               synchronized (semaphore)
+               {
+                  /// The master thread will synchronize everybody to ensure we are
+                  /// having everybody trying to stop/start simultaneously
+                  if (master)
+                  {
+                     semaphore.notifyAll();
+                  } else
+                  {
+                     // I want to synchronize everybody on stopping closing the session
+                     semaphore.wait();
+                  }
+
+                  if (!running)
+                  {
+                     willBreak = true;
+                  }
+               }
+
+               if (willBreak)
+               {
+                  if (master)
+                  {
+                     // An extra notify to make sure everybody stops
+                     Thread.sleep(3000);
+                     synchronized (semaphore)
+                     {
+                        semaphore.notifyAll();
+                     }
+                     break;
+                  }
+                  else
+                  {
+                     break;
+                  }
+               }
+
+               // Randomly few threads are starting the connection first
+               if (random.nextBoolean())
+               {
+                  conn.start();
+               }
+               conn.stop();
+               conn.start(); // ensure we start right after stopping, so testcase will complete
+            }
+         }
+         catch (Throwable e)
+         {
+            this.e = e;
+         }
+
+      }
+   }
+
+
+}


Property changes on: branches/Branch_1_0_1_SP/tests/src/org/jboss/test/messaging/jms/stress/ConcurrentStopStartStressTest.java
___________________________________________________________________
Name: svn:keywords
   + Id LastChangedDate Author Revision




More information about the jboss-cvs-commits mailing list