[jboss-cvs] JBoss Messaging SVN: r2761 - in trunk: src/main/org/jboss/jms/client/container and 6 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Wed Jun 6 16:06:54 EDT 2007


Author: timfox
Date: 2007-06-06 16:06:54 -0400 (Wed, 06 Jun 2007)
New Revision: 2761

Added:
   trunk/tests/src/org/jboss/test/messaging/jms/stress/OpenCloseStressTest.java
Modified:
   trunk/docs/userguide/en/modules/c_configuration.xml
   trunk/src/main/org/jboss/jms/client/container/SessionAspect.java
   trunk/src/main/org/jboss/jms/client/remoting/MessageCallbackHandler.java
   trunk/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java
   trunk/src/main/org/jboss/jms/server/remoting/JMSServerInvocationHandler.java
   trunk/src/main/org/jboss/messaging/core/ChannelSupport.java
   trunk/src/main/org/jboss/messaging/core/PagingChannelSupport.java
   trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/LocalClusteredQueue.java
Log:
tests/src/org/jboss/test/messaging/jms/stress/OpenCloseStressTest.java


Modified: trunk/docs/userguide/en/modules/c_configuration.xml
===================================================================
--- trunk/docs/userguide/en/modules/c_configuration.xml	2007-06-06 08:17:48 UTC (rev 2760)
+++ trunk/docs/userguide/en/modules/c_configuration.xml	2007-06-06 20:06:54 UTC (rev 2761)
@@ -12,7 +12,7 @@
 
    <itemizedlist>
    
-      <listitem>Choosing the connetion factory</listitem>
+      <listitem>Choosing the connection factory</listitem>
 
       <listitem>Choosing the cluster router policy</listitem>
 
@@ -28,7 +28,8 @@
       </para>
       
       <para>See the section <xref linkend="conf.connectionfactory">Connection Factory configuration</xref> for more information.
-      </para>
+      </para>
+      
    </section>
       
 
@@ -126,4 +127,4 @@
 
 
 
-</chapter>
\ No newline at end of file
+</chapter>

Modified: trunk/src/main/org/jboss/jms/client/container/SessionAspect.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/container/SessionAspect.java	2007-06-06 08:17:48 UTC (rev 2760)
+++ trunk/src/main/org/jboss/jms/client/container/SessionAspect.java	2007-06-06 20:06:54 UTC (rev 2761)
@@ -132,7 +132,7 @@
       if (ackMode == Session.AUTO_ACKNOWLEDGE || isXAAndConsideredNonTransacted(state))
       {
          //Acknowledge or cancel any outstanding auto ack
-         
+      	
          DeliveryInfo remainingAutoAck = state.getAutoAckInfo();
          
          if (remainingAutoAck != null)

Modified: trunk/src/main/org/jboss/jms/client/remoting/MessageCallbackHandler.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/remoting/MessageCallbackHandler.java	2007-06-06 08:17:48 UTC (rev 2760)
+++ trunk/src/main/org/jboss/jms/client/remoting/MessageCallbackHandler.java	2007-06-06 20:06:54 UTC (rev 2761)
@@ -325,14 +325,19 @@
    
    public void close(long lastDeliveryId) throws JMSException
    {     
+   	log.debug(this + " closing");
+         	
+   	//Wait for the last delivery to arrive
       waitForLastDelivery(lastDeliveryId);
       
+      //Important! We set the listener to null so the next ListenerRunner won't run
+      setMessageListener(null);
+      
+      //Now we wait for any current listener runners to run.
       waitForOnMessageToComplete();   
       
       synchronized (mainLock)
-      {
-         log.debug(this + " closing");
-         
+      {         
          if (closed)
          {
             return;
@@ -837,51 +842,48 @@
       {         
          MessageProxy mp = null;
          
-         boolean again = false;
-           
+         MessageListener theListener = null;
+         
          synchronized (mainLock)
          {
-            if (listener == null)
+            if (listener == null || buffer.isEmpty())
             {
                listenerRunning = false;
                
-               if (trace) { log.trace("no listener, returning"); }
+               if (trace) { log.trace("no listener or buffer is empty, returning"); }
                
                return;
             }
             
+            theListener = listener;
+            
             // remove a message from the buffer
 
-            if (buffer.isEmpty())
+            mp = (MessageProxy)buffer.removeFirst();
+                          
+            if (!buffer.isEmpty())
             {
-               listenerRunning = false;
-               
-               if (trace) { log.trace("no messages in buffer, marking listener as not running"); }
+            	//Queue up the next runner to run
+            	
+            	if (trace) { log.trace("More messages in buffer so queueing next onMessage to run"); }
+            	
+            	queueRunner(this);
+            	
+            	if (trace) { log.trace("Queued next onMessage to run"); }
             }
             else
-            {               
-               mp = (MessageProxy)buffer.removeFirst();
-               
-               if (mp == null)
-               {
-                  throw new java.lang.IllegalStateException("Cannot find message in buffer!");
-               }
-               
-               again = !buffer.isEmpty();
-               
-               if (!again)
-               {
-                  listenerRunning  = false;
-                  if (trace) { log.trace("no more messages in buffer, marking listener as not running"); }
-               }  
-            }
+            {
+            	if (trace) { log.trace("no more messages in buffer, marking listener as not running"); }
+            	
+            	listenerRunning  = false;
+            }               
          }
                         
          if (mp != null)
          {
             try
             {
-               callOnMessage(sessionDelegate, listener, consumerID, queueName,
+               callOnMessage(sessionDelegate, theListener, consumerID, queueName,
                              false, mp, ackMode, maxDeliveries, null);
             }
             catch (JMSException e)
@@ -890,13 +892,7 @@
             } 
          }
                   
-         checkStart();
-         
-         if (again)
-         {
-            // Queue it up again
-            queueRunner(this);
-         }                                               
+         checkStart();                                                   
       }
    }   
 }

Modified: trunk/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java	2007-06-06 08:17:48 UTC (rev 2760)
+++ trunk/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java	2007-06-06 20:06:54 UTC (rev 2761)
@@ -227,7 +227,7 @@
          // queue for delivery later.
          if (!started)
          {
-            if (trace) { log.trace(this + " NOT started yet!"); }
+            if (trace) { log.trace(this + " NOT started!"); }
 
             return null;
          }

Modified: trunk/src/main/org/jboss/jms/server/remoting/JMSServerInvocationHandler.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/remoting/JMSServerInvocationHandler.java	2007-06-06 08:17:48 UTC (rev 2760)
+++ trunk/src/main/org/jboss/jms/server/remoting/JMSServerInvocationHandler.java	2007-06-06 20:06:54 UTC (rev 2761)
@@ -39,6 +39,9 @@
 import org.jboss.remoting.callback.InvokerCallbackHandler;
 import org.jboss.remoting.callback.ServerInvokerCallbackHandler;
 
+import EDU.oswego.cs.dl.util.concurrent.ReadWriteLock;
+import EDU.oswego.cs.dl.util.concurrent.WriterPreferenceReadWriteLock;
+
 /**
  * @author <a href="mailto:ovidiu at jboss.org">Ovidiu Feodorov</a>
  * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
@@ -54,29 +57,49 @@
    
    // Static ---------------------------------------------------------------------------------------
    
+   //Whether invocations are currently rejected; guarded by invokeLock in setClosed() and invoke()
+   private static boolean closed = true;
+   //Created once at class load, never per instance: setClosed() may run before any handler exists
+   private static final ReadWriteLock invokeLock = new WriterPreferenceReadWriteLock();
+   //Opens (false) or closes (true) the handler. Takes the write lock, so the flag is not
+   //flipped while an invocation still holds the read lock.
+   public static void setClosed(boolean b)
+   {
+   	try
+   	{
+	   	invokeLock.writeLock().acquire();
+	   	try
+	   	{
+	   		closed = b;
+	   	}
+	   	finally
+	   	{
+	   		invokeLock.writeLock().release();
+	   	}
+   	}
+   	catch (InterruptedException e)
+   	{
+   		log.error("Failed to set closed to " + b, e);
+   		Thread.currentThread().interrupt();
+   	}
+   }
+   
    // Attributes -----------------------------------------------------------------------------------
 
    private ServerInvoker invoker;
+   
    private MBeanServer server;
 
    protected Map callbackHandlers;
    
    private boolean trace;
-   
-   //We need some way the server peer can call the invocation handler to make it open/closed
-   private static boolean closed = true;
-   
-   public static synchronized void setClosed(boolean closed)
-   {
-      JMSServerInvocationHandler.closed = closed;
-   }
-   
+             
    // Constructors ---------------------------------------------------------------------------------
 
    public JMSServerInvocationHandler()
    {
       callbackHandlers = new HashMap();
       trace = log.isTraceEnabled();
    }   
      
    // ServerInvocationHandler ----------------------------------------------------------------------
@@ -102,8 +125,9 @@
    {      
       if (trace) { log.trace("invoking " + invocation); }
       
-      synchronized (JMSServerInvocationHandler.class)
-      {         
+      invokeLock.readLock().acquire();
+      try
+      {	              
          if (closed)
          {
             throw new MessagingJMSException("Cannot handle invocation since messaging server is not active (it is either starting up or shutting down)");
@@ -140,6 +164,11 @@
       
          return request.serverInvoke();
       }
+      finally
+      {
+      	invokeLock.readLock().release();
+      }
+      
    }
 
    public void addListener(InvokerCallbackHandler callbackHandler)

Modified: trunk/src/main/org/jboss/messaging/core/ChannelSupport.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/ChannelSupport.java	2007-06-06 08:17:48 UTC (rev 2760)
+++ trunk/src/main/org/jboss/messaging/core/ChannelSupport.java	2007-06-06 20:06:54 UTC (rev 2761)
@@ -82,7 +82,7 @@
 
    protected PersistenceManager pm;
 
-   protected Object refLock;
+   protected Object lock;
 
    protected boolean active = true;
    
@@ -114,6 +114,7 @@
       {
          throw new IllegalArgumentException("ChannelSupport requires a non-null message store");
       }
+      
       if (pm == null)
       {
          throw new IllegalArgumentException("ChannelSupport requires a " +
@@ -132,7 +133,7 @@
 
       messageRefs = new BasicPriorityLinkedList(10);
 
-      refLock = new Object();
+      lock = new Object();
       
       deliveringCount = new SynchronizedInt(0);
       
@@ -147,7 +148,7 @@
 
    public Delivery handle(DeliveryObserver sender, MessageReference ref, Transaction tx)
    {
-      if (!active)
+      if (!isActive())
       {
          return null;
       }
@@ -176,9 +177,9 @@
       {
          pm.updateDeliveryCount(this.channelID, ref);
       }
-      
+            
       deliveringCount.decrement();
-      
+
       if (!checkAndSchedule(ref))
       {
          cancelInternal(ref);
@@ -190,59 +191,69 @@
    public boolean add(Receiver r)
    {
       if (trace) { log.trace(this + " attempting to add receiver " + r); }
-
-      boolean added = router.add(r);
-
-      if (trace) { log.trace("receiver " + r + (added ? "" : " NOT") + " added"); }
-
-      synchronized (refLock)
-      {
-         receiversReady = true;
+      
+      synchronized (lock)
+      {	
+	      boolean added = router.add(r);
+	
+	      if (trace) { log.trace("receiver " + r + (added ? "" : " NOT") + " added"); }
+	
+	      receiversReady = true;
+	      
+	
+	      return added;
       }
-
-      return added;
    }
 
    public boolean remove(Receiver r)
    {
-      boolean removed = router.remove(r);
-
-      if (removed && !router.iterator().hasNext())
-      {
-         synchronized (refLock)
-         {
-            receiversReady = false;
-         }
-      }
-
-      if (trace) { log.trace(this + (removed ? " removed " : " did NOT remove ") + r); }
-
-      return removed;
+   	synchronized (lock)
+   	{	   	
+	      boolean removed = router.remove(r);
+	
+	      if (removed && !router.iterator().hasNext())
+	      {
+	         receiversReady = false;	         
+	      }
+	
+	      if (trace) { log.trace(this + (removed ? " removed " : " did NOT remove ") + r); }
+	
+	      return removed;
+   	}
    }
 
    public void clear()
    {
-      router.clear();
-      
-      synchronized (refLock)
+      synchronized (lock)
       {
+         router.clear();         
+      	
          receiversReady = false;
       }
    }
 
    public boolean contains(Receiver r)
    {
-      return router.contains(r);
+      synchronized (lock)
+      {
+      	return router.contains(r);
+      }
    }
 
    public Iterator iterator()
    {
-      return router.iterator();
+   	synchronized (lock)
+   	{
+   		return router.iterator();
+   	}
    }
 
    public int getNumberOfReceivers()
    {
-      return router.getNumberOfReceivers();
+   	synchronized (lock)
+   	{
+   		return router.getNumberOfReceivers();
+   	}
    }
 
    // Channel implementation -----------------------------------------------------------------------
@@ -271,7 +282,7 @@
    {
       if (trace) { log.trace(this + " browse" + (filter == null ? "" : ", filter = " + filter)); }
 
-      synchronized (refLock)
+      synchronized (lock)
       {
          //FIXME - This is currently broken since it doesn't take into account
          // refs paged into persistent storage
@@ -297,28 +308,30 @@
    {
       checkClosed();
       
-      if (router.getNumberOfReceivers() > 0)
-      {                
-         synchronized (refLock)
-         {
-            receiversReady = true;
-            
-            deliverInternal();
-         }
-         
-      }
+      synchronized (lock)
+      {      
+	      if (router != null && router.getNumberOfReceivers() > 0)
+	      {                
+	         receiversReady = true;
+	            
+	         deliverInternal();                  
+	      }
+      }     
    }      
 
    public void close()
    {
-      if (router != null)
-      {
-         router.clear();
-         
-         router = null;
-      }
-      
-      clearAllScheduledDeliveries();
+   	synchronized (lock)
+   	{
+	      if (router != null)
+	      {
+	         router.clear();
+	         
+	         router = null;
+	      }
+	      
+	      clearAllScheduledDeliveries();
+   	}
    }
 
    /*
@@ -333,11 +346,12 @@
    {
       log.debug(this + " removing all references");
       
-      synchronized (refLock)
+      synchronized (lock)
       {
          if (deliveringCount.get() > 0)
          {
-            throw new IllegalStateException("Cannot remove references while deliveries are in progress");
+            throw new IllegalStateException("Cannot remove references while deliveries are in progress, there are " +
+            		deliveringCount.get());
          }
          
          //Now we consume the rest of the messages
@@ -370,7 +384,7 @@
    {
       List undelivered = new ArrayList();
 
-      synchronized (refLock)
+      synchronized (lock)
       {
          Iterator iter = messageRefs.getAll().iterator();
 
@@ -401,7 +415,7 @@
     */
    public int getMessageCount()
    {
-      synchronized (refLock)
+      synchronized (lock)
       {
          return messageRefs.size() + getDeliveringCount() + getScheduledCount();
       }
@@ -422,7 +436,7 @@
 
    public void activate()
    {
-      synchronized (refLock)
+      synchronized (lock)
       {
          active = true;         
       }
@@ -430,7 +444,7 @@
 
    public void deactivate()
    {
-      synchronized (refLock)
+      synchronized (lock)
       {
          active = false;         
       }
@@ -438,7 +452,7 @@
 
    public boolean isActive()
    {
-      synchronized (refLock)
+      synchronized (lock)
       {
          return active;         
       }
@@ -451,7 +465,7 @@
                   
       List dels = new ArrayList();
       
-      synchronized (refLock)
+      synchronized (lock)
       {
          ListIterator liter = messageRefs.iterator();
                            
@@ -491,7 +505,7 @@
    
    public int getMaxSize()
    {
-      synchronized (refLock)
+      synchronized (lock)
       {
          return maxSize;
       }
@@ -499,7 +513,7 @@
    
    public void setMaxSize(int newSize)
    {
-      synchronized (refLock)
+      synchronized (lock)
       {
          int count = getMessageCount();
          
@@ -524,7 +538,7 @@
    //Only used for testing
    public int memoryRefCount()
    {
-      synchronized (refLock)
+      synchronized (lock)
       {
          return messageRefs.size();
       }
@@ -564,7 +578,7 @@
    {
       if (trace) { log.trace(this + " cancelling " + ref + " in memory"); }
 
-      synchronized (refLock)
+      synchronized (lock)
       {
          messageRefs.addFirst(ref, ref.getMessage().getPriority());
       }
@@ -633,7 +647,7 @@
                   
                   // Receiver accepted the reference
                   
-                  synchronized (refLock)
+                  synchronized (lock)
                   {
                      if (iter == null)
                      {
@@ -648,7 +662,7 @@
                         iter.remove();                                
                      }
                   }
-                  
+                                  
                   deliveringCount.increment();                     
                }               
             }
@@ -673,7 +687,7 @@
       {      
          // We synchonize on the ref lock to prevent scheduled deivery kicking in before
          // load has finished
-         synchronized (refLock)
+         synchronized (lock)
          {
             // Attempt to push the ref to a receiver
             
@@ -762,7 +776,7 @@
             
             if (!checkAndSchedule(ref))
             {               
-               synchronized (refLock)
+               synchronized (lock)
                {
                   addReferenceInMemory(ref);
                   
@@ -851,7 +865,7 @@
          }
               
          d.getReference().releaseMemoryReference(); 
-         
+                  
          deliveringCount.decrement();
       }
       else
@@ -992,7 +1006,7 @@
 
                try
                {
-                  synchronized (refLock)
+                  synchronized (lock)
                   {
                      addReferenceInMemory(ref);
                   }
@@ -1022,7 +1036,7 @@
             // prompt delivery
             if (promptDelivery)
             {
-            	synchronized (refLock)
+            	synchronized (lock)
             	{
             		deliverInternal();
             	}

Modified: trunk/src/main/org/jboss/messaging/core/PagingChannelSupport.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/PagingChannelSupport.java	2007-06-06 08:17:48 UTC (rev 2760)
+++ trunk/src/main/org/jboss/messaging/core/PagingChannelSupport.java	2007-06-06 20:06:54 UTC (rev 2761)
@@ -147,7 +147,7 @@
       
       // Also need to add the paged refs
       
-      synchronized (refLock)
+      synchronized (lock)
       {      
          count += nextPagingOrder - firstPagingOrder;
       }
@@ -160,7 +160,7 @@
    //Only used in testing
    public int downCacheCount()
    {
-      synchronized (refLock)
+      synchronized (lock)
       {
          return downCache.size();
       }
@@ -169,7 +169,7 @@
    //Only used in testing
    public boolean isPaging()
    {
-      synchronized (refLock)
+      synchronized (lock)
       {
          return paging;
       }
@@ -177,7 +177,7 @@
    
    public void setPagingParams(int fullSize, int pageSize, int downCacheSize)
    {
-      synchronized (refLock)
+      synchronized (lock)
       { 
          if (active)
          {
@@ -194,7 +194,7 @@
    
    public void load() throws Exception
    {            
-      synchronized (refLock)
+      synchronized (lock)
       {
          if (active)
          {
@@ -221,7 +221,7 @@
       
    public void unload() throws Exception
    {
-      synchronized (refLock)
+      synchronized (lock)
       {
          if (active)
          {
@@ -240,13 +240,6 @@
       }
    }
    
-   public boolean isActive()
-   {
-      synchronized (refLock)
-      {
-         return active;
-      }
-   }
    
    // Protected ------------------------------------------------------------------------------------
    
@@ -320,7 +313,7 @@
       
    protected void cancelInternal(MessageReference ref) throws Exception
    {
-      synchronized (refLock)
+      synchronized (lock)
       {         
          super.cancelInternal(ref);
          

Modified: trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/LocalClusteredQueue.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/LocalClusteredQueue.java	2007-06-06 08:17:48 UTC (rev 2760)
+++ trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/LocalClusteredQueue.java	2007-06-06 20:06:54 UTC (rev 2761)
@@ -253,7 +253,7 @@
    {
       if (trace) { log.trace("Merging queue " + remoteQueue + " into " + this); }
            
-      synchronized (refLock)
+      synchronized (lock)
       {
          flushDownCache();
                   
@@ -421,7 +421,7 @@
             {                  
                MessageReference ref;
                
-               synchronized (refLock)
+               synchronized (lock)
                {
                   ref = removeFirstInMemory();
                   

Added: trunk/tests/src/org/jboss/test/messaging/jms/stress/OpenCloseStressTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/stress/OpenCloseStressTest.java	                        (rev 0)
+++ trunk/tests/src/org/jboss/test/messaging/jms/stress/OpenCloseStressTest.java	2007-06-06 20:06:54 UTC (rev 2761)
@@ -0,0 +1,462 @@
+/*
+  * JBoss, Home of Professional Open Source
+  * Copyright 2005, JBoss Inc., and individual contributors as indicated
+  * by the @authors tag. See the copyright.txt in the distribution for a
+  * full listing of individual contributors.
+  *
+  * This is free software; you can redistribute it and/or modify it
+  * under the terms of the GNU Lesser General Public License as
+  * published by the Free Software Foundation; either version 2.1 of
+  * the License, or (at your option) any later version.
+  *
+  * This software is distributed in the hope that it will be useful,
+  * but WITHOUT ANY WARRANTY; without even the implied warranty of
+  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+  * Lesser General Public License for more details.
+  *
+  * You should have received a copy of the GNU Lesser General Public
+  * License along with this software; if not, write to the Free
+  * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+  * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+  */
+package org.jboss.test.messaging.jms.stress;
+
+import javax.jms.Connection;
+import javax.jms.DeliveryMode;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import javax.jms.Topic;
+import javax.naming.InitialContext;
+
+import org.jboss.jms.client.JBossConnectionFactory;
+import org.jboss.test.messaging.MessagingTestCase;
+import org.jboss.test.messaging.tools.ServerManagement;
+import org.jboss.util.id.GUID;
+
+/**
+ * 
+ * A OpenCloseStressTest.
+ * 
+ * This stress test starts several publisher connections and several subscriber connections, then sends and consumes
+ * messages while concurrently closing the sessions.
+ * 
+ * This test will help catch race conditions that occurred with rapid open/closing of sessions when messages are being 
+ * sent/received
+ * 
+ * E.g. http://jira.jboss.com/jira/browse/JBMESSAGING-982
+ * 
+ * @author <a href="tim.fox at jboss.com">Tim Fox</a>
+ * @version <tt>$Revision: 2349 $</tt>
+ *
+ * $Id: StressTest.java 2349 2007-02-19 14:15:53Z timfox $
+ */
+public class OpenCloseStressTest extends MessagingTestCase
+{
+   public OpenCloseStressTest(String name)
+   {
+      super(name);
+   }
+   
+   InitialContext ic;
+   JBossConnectionFactory cf;
+   Topic topic;
+
+   public void setUp() throws Exception
+   {
+      super.setUp();
+
+      ServerManagement.start("all");
+
+      ic = new InitialContext(ServerManagement.getJNDIEnvironment());
+      cf = (JBossConnectionFactory)ic.lookup("/ConnectionFactory");
+
+      ServerManagement.undeployTopic("TestTopic");
+      ServerManagement.deployTopic("TestTopic");
+
+      topic = (Topic) ic.lookup("topic/TestTopic");
+
+      log.debug("setup done");
+   }
+
+   public void tearDown() throws Exception
+   {
+      //Undeploy the topic that setUp() deployed (this test never deploys a queue)
+      ServerManagement.undeployTopic("TestTopic");
+
+      super.tearDown();
+      log.debug("tear down done");
+   }
+   
+   public void testOpenClose() throws Exception
+   {
+   	Connection conn1 = null;
+   	Connection conn2 = null;
+   	Connection conn3 = null;
+   	
+   	Connection conn4 = null;
+   	Connection conn5 = null;
+   	Connection conn6 = null;
+   	Connection conn7 = null;
+   	Connection conn8 = null;
+   	
+   	try
+   	{   	
+	      Publisher[] publishers = new Publisher[3];
+	      
+	      final int MSGS_PER_PUBLISHER = 10000;
+	      
+	      conn1 = cf.createConnection();
+	      Session sess1 = conn1.createSession(false, Session.AUTO_ACKNOWLEDGE);
+	      MessageProducer prod1 = sess1.createProducer(topic);
+	      prod1.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+	      publishers[0] = new Publisher(sess1, prod1, MSGS_PER_PUBLISHER, 2);
+	      
+	      conn2 = cf.createConnection();
+	      Session sess2 = conn2.createSession(false, Session.AUTO_ACKNOWLEDGE);
+	      MessageProducer prod2 = sess2.createProducer(topic);
+	      prod2.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+	      publishers[1] = new Publisher(sess2, prod2, MSGS_PER_PUBLISHER, 5);
+	      
+	      conn3 = cf.createConnection();
+	      Session sess3 = conn3.createSession(false, Session.AUTO_ACKNOWLEDGE);
+	      MessageProducer prod3 = sess3.createProducer(topic);
+	      prod3.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+	      publishers[2] = new Publisher(sess3, prod3, MSGS_PER_PUBLISHER, 1);
+	      
+	      Subscriber[] subscribers = new Subscriber[5];
+   			      
+	      conn4 = cf.createConnection();
+	      subscribers[0] = new Subscriber(conn4, 3 * MSGS_PER_PUBLISHER, 500, 1000 * 60 * 5, topic, false);
+	      
+	      conn5 = cf.createConnection();
+	      subscribers[1] = new Subscriber(conn5, 3 * MSGS_PER_PUBLISHER, 2000, 1000 * 60 * 5, topic, false);
+	      
+	      conn6 = cf.createConnection();
+	      subscribers[2] = new Subscriber(conn6, 3 * MSGS_PER_PUBLISHER, 700, 1000 * 60 * 5, topic, false);
+	      
+	      conn7 = cf.createConnection();
+	      subscribers[3] = new Subscriber(conn7, 3 * MSGS_PER_PUBLISHER, 1500, 1000 * 60 * 5, topic, true);
+	      
+	      conn8 = cf.createConnection();
+	      subscribers[4] = new Subscriber(conn8, 3 * MSGS_PER_PUBLISHER, 1200, 1000 * 60 * 5, topic, true);
+	      
+	      Thread[] threads = new Thread[8];
+	      
+	      //subscribers
+	      threads[0] = new Thread(subscribers[0]);
+	      
+	      threads[1] = new Thread(subscribers[1]);
+	      
+	      threads[2] = new Thread(subscribers[2]);
+	      
+	      threads[3] = new Thread(subscribers[3]);
+	      
+	      threads[4] = new Thread(subscribers[4]);
+	      
+	      //publishers
+	      
+	      threads[5] = new Thread(publishers[0]);
+	      
+	      threads[6] = new Thread(publishers[1]);
+	      
+	      threads[7] = new Thread(publishers[2]);
+	      
+	      for (int i = 0; i < subscribers.length; i++)
+	      {
+	      	threads[i].start();
+	      }
+	      
+	      // Pause before creating producers otherwise subscribers might not get all messages
+	      
+	      Thread.sleep(5000);
+	      
+	      for (int i = subscribers.length; i < threads.length; i++)
+	      {
+	      	threads[i].start();
+	      }
+	      
+	      for (int i = 0; i < threads.length; i++)
+	      {
+	      	threads[i].join();
+	      }
+	      
+	      for (int i = 0; i < subscribers.length; i++)
+	      {	      	
+	      	if (subscribers[i].isDurable())
+	      	{
+	      		assertEquals(3 * MSGS_PER_PUBLISHER, subscribers[i].getMessagesReceived());
+	      	}
+	      	else
+	      	{
+	      		//Note that for a non durable subscriber the number of messages received in total
+		      	//will be somewhat less than the total number sent since when recycling the session
+		      	//there is a period of time after closing the previous session and starting the next one
+		      	//when messages are being sent and won't be received (since there is no consumer)
+	      	}
+	      		
+	      	
+	      	assertFalse(subscribers[i].isFailed());
+	      }
+	      
+	      for (int i = 0; i < publishers.length; i++)
+	      {
+	      	assertFalse(publishers[i].isFailed());
+	      }
+	   	
+   	}
+   	finally
+   	{
+   		if (conn1 != null)
+   		{
+   			conn1.close();
+   		}
+   		if (conn2 != null)
+   		{
+   			conn2.close();
+   		}
+   		if (conn3 != null)
+   		{
+   			conn3.close();
+   		}
+   		if (conn4 != null)
+   		{
+   			conn4.close();
+   		}
+   		if (conn5 != null)
+   		{
+   			conn5.close();
+   		}
+   		if (conn6 != null)
+   		{
+   			conn6.close();
+   		}
+   		if (conn7 != null)
+   		{
+   			conn7.close();
+   		}
+   		if (conn8 != null)
+   		{
+   			conn8.close();
+   		}
+   	}
+      
+   }
+   
+   class Publisher implements Runnable
+   {
+   	private Session sess;
+   	
+   	private int numMessages;
+   	
+   	private int delay;
+   	
+   	private MessageProducer prod;
+   	
+   	private boolean failed;
+   	
+   	boolean isFailed()
+   	{
+   		return failed;
+   	}
+   	   	   	
+   	Publisher(Session sess, MessageProducer prod, int numMessages, int delay)
+   	{
+   		this.sess = sess;
+   		
+   		this.prod = prod;
+   		
+   		this.numMessages = numMessages;
+   		
+   		this.delay = delay;
+   	}
+
+		public void run()
+		{
+			try
+			{
+				for (int i = 0; i < numMessages; i++)
+				{
+					TextMessage tm = sess.createTextMessage("message" + i);
+					
+					
+//					if (i % 1000 == 0)
+//					{
+//						log.info("Publisher " + this + " sending message " + i);
+//					}
+					
+					prod.send(tm);
+														
+					try
+					{
+						Thread.sleep(delay);
+					}
+					catch (Exception ignore)
+					{						
+					}
+				}
+				
+				log.info("Publisher " + this + " sent all messages");
+			}
+			catch (JMSException e)
+			{
+				log.error("Failed to send message", e);
+				failed = true;
+			}
+		}
+   	
+   }
+   
+   
+   class Subscriber implements Runnable
+   {
+   	private Session sess;
+   	
+   	private MessageConsumer cons;
+   	
+   	private int msgsReceived;
+   	
+   	private int numMessages;
+   	
+   	private int delay;
+   	
+   	private Connection conn;
+   	   	
+   	private boolean failed;
+   	
+   	private long timeout;
+   	
+   	private Destination dest;
+   	
+   	private boolean durable;
+   	
+   	private String subname;
+   	
+   	boolean isFailed()
+   	{
+   		return failed;
+   	}
+   	
+   	boolean isDurable()
+   	{
+   		return durable;
+   	}
+   	
+   	   	
+   	synchronized void msgReceived()
+   	{   		
+   		msgsReceived++;
+   		
+//   		if (msgsReceived % 1000 == 0)
+//			{
+//				log.info("Subscriber " + this + " received " + msgsReceived + " messages");
+//			}
+   	}
+   	
+   	synchronized int getMessagesReceived()
+   	{
+   		return msgsReceived;
+   	}
+   	
+   	class Listener implements MessageListener
+   	{
+
+			public void onMessage(Message msg)
+			{
+				msgReceived();
+			}
+   		
+   	}
+   	
+   	
+   	   	   	
+   	Subscriber(Connection conn, int numMessages, int delay, long timeout, Destination dest, boolean durable) throws Exception
+   	{
+   		this.conn = conn;
+   		
+   		this.numMessages = numMessages;
+   		
+   		this.delay = delay;
+   		
+   		this.timeout = timeout;
+   		
+   		this.dest = dest;
+   		
+   		this.durable = durable;
+   		
+   		if (durable)
+   		{
+   			conn.setClientID(new GUID().toString());
+   			
+   			this.subname = new GUID().toString();
+   		}
+   	}
+
+		//Recycles the session every 'delay' ms until numMessages have arrived or 'timeout' ms elapse
+		public void run()
+		{
+			try
+			{
+				long start = System.currentTimeMillis();
+				//Use the synchronized getter: the listener thread updates the count under the same monitor
+				while (((System.currentTimeMillis() - start) < timeout) && getMessagesReceived() < numMessages)
+				{
+					//recycle the session
+					
+					recycleSession();
+					
+					Thread.sleep(delay);
+				}
+				
+				//Delete the durable sub
+				
+				if (durable)
+				{
+					recycleSession();
+					
+					cons.close();
+					
+					sess.unsubscribe(subname);
+				}
+			}
+			catch (Exception e)
+			{
+				log.error("Failed in subscriber", e);
+				failed = true;
+			}
+		}
+		
+		void recycleSession() throws Exception
+		{
+			conn.stop();
+			
+			if (sess != null)
+			{
+				sess.close();
+			}
+			
+			sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+			
+			if (durable)
+			{
+				cons = sess.createDurableSubscriber((Topic)dest, subname);
+			}
+			else
+			{
+				cons = sess.createConsumer(dest);
+			}
+						
+			cons.setMessageListener(new Listener());
+			
+			conn.start();
+		}
+   	
+   }
+
+}
+




More information about the jboss-cvs-commits mailing list