[jboss-cvs] JBoss Messaging SVN: r2828 - in trunk: tests/src/org/jboss/test/messaging/jms and 1 other directory.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Tue Jul 3 08:12:42 EDT 2007


Author: sergeypk
Date: 2007-07-03 08:12:42 -0400 (Tue, 03 Jul 2007)
New Revision: 2828

Modified:
   trunk/src/main/org/jboss/jms/client/container/ClientConsumer.java
   trunk/src/main/org/jboss/jms/client/container/ConnectionAspect.java
   trunk/tests/src/org/jboss/test/messaging/jms/MessageConsumerTest.java
Log:
http://jira.jboss.com/jira/browse/JBMESSAGING-983 - Connection.stop should wait until all message listeners have completed.

Modified: trunk/src/main/org/jboss/jms/client/container/ClientConsumer.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/container/ClientConsumer.java	2007-07-03 12:00:14 UTC (rev 2827)
+++ trunk/src/main/org/jboss/jms/client/container/ClientConsumer.java	2007-07-03 12:12:42 UTC (rev 2828)
@@ -198,6 +198,7 @@
    private MessageListener listener;
    private int ackMode;
    private boolean closed;
+   private volatile boolean paused;
    private Object mainLock;
    private int maxBufferSize;
    private int minBufferSize;
@@ -287,12 +288,12 @@
          
          this.listener = listener;
                             
-         if (listener != null && !buffer.isEmpty())
+         if (listener != null && isMessageAvailableForConsuming())
          {  
             listenerRunning = true;
             
             this.queueRunner(new ListenerRunner());
-         }        
+         }
       }   
    }
    
@@ -337,18 +338,15 @@
    public void close(long lastDeliveryId) throws JMSException
    {     
    	log.debug(this + " closing");
-         	
-   	//Wait for the last delivery to arrive
+      
+      //Wait for the last delivery to arrive
       waitForLastDelivery(lastDeliveryId);
       
-      //Important! We set the listener to null so the next ListenerRunner won't run
-      if (listener != null)
-      {
-      	setMessageListener(null);
-      }
+      //Important! We set paused to true so the next ListenerRunner won't run
+      paused = true;
       
       //Now we wait for any current listener runners to run.
-      waitForOnMessageToComplete();   
+      waitForOnMessageToComplete();
       
       synchronized (mainLock)
       {         
@@ -370,6 +368,32 @@
                            
       if (trace) { log.trace(this + " closed"); }
    }
+   
+   public void start()
+   {
+      paused = false;
+      synchronized (mainLock)
+      {
+         if (!buffer.isEmpty())
+         {
+            // Messages arrived while the consumer was paused
+            messageAdded();
+         }
+      }
+   }
+   
+   public void stop()
+   {
+      log.debug(this + " stopping");
+      paused = true;
+   }
+
+   public void waitUntilStopped() throws JMSException
+   {
+      log.debug(this + " waiting until paused");
+      waitForOnMessageToComplete();
+      if (trace) { log.trace(this + " stopped"); }
+   }
      
    /**
     * Method used by the client thread to get a Message, if available.
@@ -729,27 +753,30 @@
    {
       boolean notified = false;
       
-      // If we have a thread waiting on receive() we notify it
-      if (receiverThread != null)
+      if (!paused)
       {
-         if (trace) { log.trace(this + " notifying receiver/waiter thread"); }   
-         
-         mainLock.notifyAll();
-         
-         notified = true;
-      }     
-      else if (listener != null)
-      { 
-         // We have a message listener
-         if (!listenerRunning)
+         // If we have a thread waiting on receive() we notify it
+         if (receiverThread != null)
          {
-            listenerRunning = true;
-
-            if (trace) { log.trace(this + " scheduled a new ListenerRunner"); }
-            this.queueRunner(new ListenerRunner());
-         }     
-         
-         //TODO - Execute onMessage on same thread for even better throughput 
+            if (trace) { log.trace(this + " notifying receiver/waiter thread"); }   
+            
+            mainLock.notifyAll();
+   
+            notified = true;
+         }
+         else if (listener != null)
+         {
+            // We have a message listener
+            if (!listenerRunning && !paused)
+            {
+               listenerRunning = true;
+   
+               if (trace) { log.trace(this + " scheduled a new ListenerRunner"); }
+               this.queueRunner(new ListenerRunner());
+            }     
+            
+            //TODO - Execute onMessage on same thread for even better throughput 
+         }
       }
       
       // Make sure we notify any thread waiting for last delivery
@@ -793,7 +820,7 @@
             if (timeout == 0)
             {
                // wait for ever potentially
-               while (!closed && buffer.isEmpty())
+               while (!closed && !isMessageAvailableForConsuming())
                {
                   if (trace) { log.trace(this + " waiting on main lock, no timeout"); }
 
@@ -807,7 +834,7 @@
                // wait with timeout
                long toWait = timeout;
              
-               while (!closed && buffer.isEmpty() && toWait > 0)
+               while (!closed && !isMessageAvailableForConsuming() && toWait > 0)
                {
                   if (trace) { log.trace(this + " waiting on main lock, timeout " + toWait + " ms"); }
 
@@ -834,6 +861,14 @@
       return m;
    }
    
+   /**
+    * @return true if the ClientConsumer is not paused and has a message in its buffer.
+    */
+   private boolean isMessageAvailableForConsuming()
+   {
+      return !paused && !buffer.isEmpty();
+   }
+   
    // Inner classes --------------------------------------------------------------------------------
          
    /*
@@ -872,11 +907,11 @@
          
          synchronized (mainLock)
          {
-            if (listener == null || buffer.isEmpty())
+            if (listener == null || !isMessageAvailableForConsuming())
             {
                listenerRunning = false;
                
-               if (trace) { log.trace("no listener or buffer is empty, returning"); }
+               if (trace) { log.trace("no listener or no message available for processing, returning"); }
                
                return;
             }
@@ -887,7 +922,7 @@
 
             mp = (MessageProxy)buffer.removeFirst();
                           
-            if (!buffer.isEmpty())
+            if (isMessageAvailableForConsuming())
             {
             	//Queue up the next runner to run
             	

Modified: trunk/src/main/org/jboss/jms/client/container/ConnectionAspect.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/container/ConnectionAspect.java	2007-07-03 12:00:14 UTC (rev 2827)
+++ trunk/src/main/org/jboss/jms/client/container/ConnectionAspect.java	2007-07-03 12:12:42 UTC (rev 2828)
@@ -21,6 +21,10 @@
  */
 package org.jboss.jms.client.container;
 
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+
 import javax.jms.ExceptionListener;
 import javax.jms.IllegalStateException;
 
@@ -32,6 +36,8 @@
 import org.jboss.jms.client.remoting.ConsolidatedRemotingConnectionListener;
 import org.jboss.jms.client.remoting.JMSRemotingConnection;
 import org.jboss.jms.client.state.ConnectionState;
+import org.jboss.jms.client.state.ConsumerState;
+import org.jboss.jms.client.state.SessionState;
 import org.jboss.jms.message.MessageIdGeneratorFactory;
 import org.jboss.jms.tx.ResourceManagerFactory;
 
@@ -145,6 +151,15 @@
       ConnectionState currentState = getConnectionState(invocation);
       currentState.setStarted(true);
       currentState.setJustCreated(false);
+      
+      // Start all consumers
+      forEachConsumer(currentState, new ConsumerAction() {
+         public void run(ClientConsumer consumer) throws Exception
+         {
+            consumer.start();
+         }
+      });
+
       return invocation.invokeNext();
    }
    
@@ -153,7 +168,27 @@
       ConnectionState currentState = getConnectionState(invocation);
       currentState.setStarted(false);
       currentState.setJustCreated(false);
-      return invocation.invokeNext();
+      
+      Object ret = invocation.invokeNext();
+      
+      // Stop all consumers - in two steps, first tell each one to stop,
+      // then actually wait until all have stopped.
+
+      forEachConsumer(currentState, new ConsumerAction() {
+         public void run(ClientConsumer consumer) throws Exception
+         {
+            consumer.stop();
+         }
+      });
+      
+      forEachConsumer(currentState, new ConsumerAction() {
+         public void run(ClientConsumer consumer) throws Exception
+         {
+            consumer.waitUntilStopped();
+         }
+      });
+
+      return ret;
    }
    
    public Object handleCreateSessionDelegate(Invocation invocation) throws Throwable
@@ -256,7 +291,32 @@
       }
       return state;
    }
+      
+   private static void forEachConsumer(ConnectionState connectionState, ConsumerAction action) throws Exception
+   {
+      Set sessions = connectionState.getChildren();
+      
+      for (Iterator sessionsIter = sessions.iterator(); sessionsIter.hasNext();)
+      {
+         SessionState sessionState = (SessionState) sessionsIter.next();
+         
+         for (Iterator childrenIter = sessionState.getChildren().iterator(); childrenIter.hasNext();)
+         {
+            Object childState = childrenIter.next();
+            if (!(childState instanceof ConsumerState))
+            {
+               continue;
+            }
+            ClientConsumer consumer = ((ConsumerState) childState).getClientConsumer();
+            action.run(consumer);
+         }
+      }
+   }
    
-   
    // Inner classes -------------------------------------------------
+
+   private interface ConsumerAction
+   {
+      void run(ClientConsumer consumer) throws Exception;
+   }
 }

Modified: trunk/tests/src/org/jboss/test/messaging/jms/MessageConsumerTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/MessageConsumerTest.java	2007-07-03 12:00:14 UTC (rev 2827)
+++ trunk/tests/src/org/jboss/test/messaging/jms/MessageConsumerTest.java	2007-07-03 12:12:42 UTC (rev 2828)
@@ -52,6 +52,7 @@
 import org.jboss.test.messaging.tools.ServerManagement;
 
 import EDU.oswego.cs.dl.util.concurrent.Latch;
+import EDU.oswego.cs.dl.util.concurrent.SynchronizedInt;
 
 /**
  * @author <a href="mailto:ovidiu at jboss.org">Ovidiu Feodorov</a>
@@ -2070,7 +2071,100 @@
       }
    }
 
+   public void testStopConnectionDuringOnMessage() throws Exception
+   {
+      if (log.isTraceEnabled()) log.trace("testStopConnectionWhileOnMessageIsExecuting");
+      
+      final SynchronizedInt messagesReceived = new SynchronizedInt(0);
+      
+      MessageListener myListener = new MessageListener() {
+         public void onMessage(Message message)
+         {
+            messagesReceived.increment();
+            try
+            {
+               Thread.sleep(100L);
+            }
+            catch (InterruptedException e)
+            {
+               // Ignore
+            }
+         }
+      };
+      
+      queueConsumer.setMessageListener(myListener);
+      
+      log.trace("Starting consumer connection");
+      consumerConnection.start();
+      
+      final int MESSAGE_COUNT = 100;
+      
+      log.trace("Sending the first batch of messages");
 
+      for (int i = 0; i < MESSAGE_COUNT / 2; i++)
+      {
+         queueProducer.send(producerSession.createTextMessage("Message #" + Integer.toString(i)));
+      }
+
+      Thread.sleep(500L);
+      
+      log.trace("Stopping consumer connection");
+      consumerConnection.stop();
+
+      int countAfterStop = messagesReceived.get();
+      assertTrue("Should have received some messages before stopping", countAfterStop > 0);
+      
+      log.trace("Sending the second batch of messages");
+      for (int i = MESSAGE_COUNT / 2; i < MESSAGE_COUNT; i++)
+      {
+         queueProducer.send(producerSession.createTextMessage("Message #" + Integer.toString(i)));
+      }
+
+      log.trace("Sleeping a bit to check that no messages are received");
+      Thread.sleep(2000);
+
+      assertEquals("Should not receive any messages after the connection has been stopped", countAfterStop, messagesReceived.get());
+
+      log.trace("Restarting consumer connection");
+      consumerConnection.start();
+      
+      log.trace("Sleeping to allow remaining messages to arrive");
+      Thread.sleep(15000);
+      assertEquals("Should have received all messages after restarting", MESSAGE_COUNT, messagesReceived.get());
+   }
+   
+   // Test that stop doesn't in any way break subsequent close 
+   public void testCloseAfterStop() throws Exception
+   {
+      MessageListener myListener = new MessageListener() {
+         public void onMessage(Message message)
+         {
+            try
+            {
+               Thread.sleep(100);
+            }
+            catch (InterruptedException e)
+            {
+               // Ignore
+            }
+         }
+      };
+      
+      queueConsumer.setMessageListener(myListener);
+
+      consumerConnection.start();
+
+      for (int i = 0; i < 100; i++)
+      {
+         queueProducer.send(producerSession.createTextMessage("Message #" + Integer.toString(i)));
+      }
+
+      Thread.sleep(500);
+      consumerConnection.stop();
+      Thread.sleep(500);
+      consumerConnection.close();
+   }
+
    //
    // Multiple consumers
    //




More information about the jboss-cvs-commits mailing list