[jboss-cvs] JBoss Messaging SVN: r6168 - in trunk: tests/src/org/jboss/messaging/tests/integration/client and 1 other directory.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Thu Mar 26 05:02:26 EDT 2009


Author: ataylor
Date: 2009-03-26 05:02:25 -0400 (Thu, 26 Mar 2009)
New Revision: 6168

Added:
   trunk/tests/src/org/jboss/messaging/tests/integration/client/ClientMessageHandlerTest.java
Modified:
   trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java
   trunk/tests/src/org/jboss/messaging/tests/integration/client/ClientConsumerTest.java
   trunk/tests/src/org/jboss/messaging/tests/integration/client/ClientSessionStopStartTest.java
Log:
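tests and fixes: moved the MessageHandler tests from ClientConsumerTest into the new ClientMessageHandlerTest, extended the session stop/start tests, and changed the handler-setting code in ClientConsumerImpl so that it no longer modifies the stopped flag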

Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java	2009-03-26 02:47:40 UTC (rev 6167)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java	2009-03-26 09:02:25 UTC (rev 6168)
@@ -156,7 +156,7 @@
             synchronized (this)
             {
                while ((stopped || (m = buffer.poll()) == null) &&
-                      !closed && toWait > 0 )
+                      !closed && toWait > 0)
                {
                   if (start == -1)
                   {
@@ -247,22 +247,21 @@
                                       "Cannot set MessageHandler - consumer is in receive(...)");
       }
 
-      // If no handler before then need to queue them up
-      boolean queueUp = handler == null;
+      boolean noPreviousHandler = handler == null;
 
       handler = theHandler;
 
-      if (queueUp)
+      // if no previous handler existed, queue up the buffered messages for delivery
+      if (handler != null && noPreviousHandler)
       {
-         stopped = false;
          for (int i = 0; i < buffer.size(); i++)
          {
             queueExecutor();
          }
       }
-      else
+      // if unsetting a previous handler, it may still be in onMessage, so wait for completion
+      else if (handler == null && !noPreviousHandler)
       {
-         stopped = true;
          waitForOnMessageToComplete();
       }
    }
@@ -299,7 +298,7 @@
 
    public synchronized void stop() throws MessagingException
    {
-      if(stopped)
+      if (stopped)
       {
          return;
       }
@@ -336,7 +335,7 @@
          // This is ok - we just ignore the message
          return;
       }
-      
+
       ClientMessageInternal messageToHandle = message;
 
       if (isFileConsumer())
@@ -350,11 +349,11 @@
       {
          // Execute using executor
 
-       buffer.add(messageToHandle);
-       if(!stopped)
-       {
-          queueExecutor();
-       }
+         buffer.add(messageToHandle);
+         if (!stopped)
+         {
+            queueExecutor();
+         }
       }
       else
       {
@@ -393,7 +392,7 @@
 
       if (isFileConsumer())
       {
-         ClientFileMessageInternal fileMessage = (ClientFileMessageInternal)currentChunkMessage;
+         ClientFileMessageInternal fileMessage = (ClientFileMessageInternal) currentChunkMessage;
          addBytesBody(fileMessage, chunk.getBody());
       }
       else
@@ -413,7 +412,7 @@
          // Close the file that was being generated
          if (isFileConsumer())
          {
-            ((ClientFileMessageInternal)currentChunkMessage).closeChannel();
+            ((ClientFileMessageInternal) currentChunkMessage).closeChannel();
          }
 
          currentChunkMessage.setFlowControlSize(chunk.getPacketSize());
@@ -432,7 +431,7 @@
       {
          buffer.clear();
       }
-           
+
       waitForOnMessageToComplete();
    }
 
@@ -574,7 +573,7 @@
       MessageHandler theHandler = handler;
 
       if (theHandler != null)
-      {         
+      {
          synchronized (this)
          {
             message = buffer.poll();
@@ -665,7 +664,7 @@
          {
             if (message instanceof ClientFileMessage)
             {
-               ((ClientFileMessage)message).getFile().delete();
+               ((ClientFileMessage) message).getFile().delete();
             }
          }
       }
@@ -692,7 +691,7 @@
       {
          int propertiesSize = message.getPropertiesEncodeSize();
 
-         MessagingBuffer bufferProperties = session.createBuffer(propertiesSize); 
+         MessagingBuffer bufferProperties = session.createBuffer(propertiesSize);
 
          // FIXME: Find a better way to clone this ClientMessageImpl as ClientFileMessageImpl without using the
          // MessagingBuffer.
@@ -729,7 +728,7 @@
    private ClientMessageInternal createFileMessage(final byte[] header) throws Exception
    {
 
-      MessagingBuffer headerBuffer = ChannelBuffers.wrappedBuffer(header); 
+      MessagingBuffer headerBuffer = ChannelBuffers.wrappedBuffer(header);
 
       if (isFileConsumer())
       {
@@ -771,7 +770,7 @@
    {
       public void run()
       {
-         
+
          try
          {
             callOnMessage();

Modified: trunk/tests/src/org/jboss/messaging/tests/integration/client/ClientConsumerTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/client/ClientConsumerTest.java	2009-03-26 02:47:40 UTC (rev 6167)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/client/ClientConsumerTest.java	2009-03-26 09:02:25 UTC (rev 6168)
@@ -26,16 +26,12 @@
 import org.jboss.messaging.core.client.ClientProducer;
 import org.jboss.messaging.core.client.ClientSession;
 import org.jboss.messaging.core.client.ClientSessionFactory;
-import org.jboss.messaging.core.client.MessageHandler;
 import org.jboss.messaging.core.logging.Logger;
 import org.jboss.messaging.core.server.MessagingService;
 import org.jboss.messaging.core.server.Queue;
 import org.jboss.messaging.tests.util.ServiceTestBase;
 import org.jboss.messaging.utils.SimpleString;
 
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-
 /**
  * @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
  * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
@@ -69,165 +65,7 @@
    }
 
    
-   public void testSetMessageHandlerWithMessagesPending() throws Exception
-   {
-      ClientSessionFactory sf = createInVMFactory();
-
-      ClientSession session = sf.createSession(false, true, true);
-
-      session.createQueue(QUEUE, QUEUE, null, false, false);
-
-      ClientProducer producer = session.createProducer(QUEUE);
-
-      final int numMessages = 100;
-
-      for (int i = 0; i < numMessages; i++)
-      {
-         ClientMessage message = createTextMessage("m" + i, session);
-         producer.send(message);
-      }
-
-      ClientConsumer consumer = session.createConsumer(QUEUE, null, true);
-
-      session.start();
-
-      Thread.sleep(100);
-
-      // Message should be in consumer
-
-      class MyHandler implements MessageHandler
-      {
-         public void onMessage(final ClientMessage message)
-         {
-            try
-            {
-               Thread.sleep(10);
-
-               message.acknowledge();
-            }
-            catch (Exception e)
-            {
-            }
-         }
-      }
-
-      consumer.setMessageHandler(new MyHandler());
-
-      // Let a few messages get processed
-      Thread.sleep(100);
-
-      // Now set null
-
-      consumer.setMessageHandler(null);
-
-      // Give a bit of time for some queued executors to run
-
-      Thread.sleep(500);
-
-      // Make sure no exceptions were thrown from onMessage
-      assertNull(consumer.getLastException());
-
-      session.close();
-   }
-
-
-
-   public void testSetUnsetMessageHandler() throws Exception
-   {
-      ClientSessionFactory sf = createInVMFactory();
-
-      final ClientSession session = sf.createSession(false, true, true);
-
-      session.createQueue(QUEUE, QUEUE, null, false, false);
-
-      ClientProducer producer = session.createProducer(QUEUE);
-
-      final int numMessages = 100;
-
-      for (int i = 0; i < numMessages; i++)
-      {
-         ClientMessage message = createTextMessage("m" + i, session);
-         message.putIntProperty(new SimpleString("i"), i);
-         producer.send(message);
-      }
-
-      final ClientConsumer consumer = session.createConsumer(QUEUE);
-
-      Queue q = (Queue) messagingService.getServer().getPostOffice().getBinding(QUEUE).getBindable();
-         int ccount = q.getConsumerCount();
-
-      session.start();
-
-      CountDownLatch latch = new CountDownLatch(50);
-
-      // Message should be in consumer
-
-      class MyHandler implements MessageHandler
-      {
-         int messageReceived = 0;
-         boolean failed;
-
-         boolean started = true;
-
-         private final CountDownLatch latch;
-
-         public MyHandler(CountDownLatch latch)
-         {
-            this.latch = latch;
-         }
-
-         public void onMessage(final ClientMessage message)
-         {
-
-            try
-            {
-               if (!started)
-               {
-                  failed = true;
-               }
-               messageReceived++;
-               latch.countDown();
-
-               if (latch.getCount() == 0)
-               {
-
-                  message.acknowledge();
-                  started = false;
-                  consumer.setMessageHandler(null);
-               }
-
-            }
-            catch (Exception e)
-            {
-            }
-         }
-      }
-
-      MyHandler handler = new MyHandler(latch);
-
-      consumer.setMessageHandler(handler);
-
-      latch.await();
-
-      Thread.sleep(100);
-
-      assertFalse(handler.failed);
-
-      // Make sure no exceptions were thrown from onMessage
-      assertNull(consumer.getLastException());
-      latch = new CountDownLatch(50);
-      handler = new MyHandler(latch);
-      consumer.setMessageHandler(handler);
-      session.start();
-      assertTrue("message received " + handler.messageReceived, latch.await(5, TimeUnit.SECONDS));
-
-      Thread.sleep(100);
-
-      assertFalse(handler.failed);
-      assertNull(consumer.getLastException());
-      session.close();
-   }
-
+  
    public void testConsumerAckImmediateAutoCommitTrue() throws Exception
    {
       ClientSessionFactory sf = createInVMFactory();

Added: trunk/tests/src/org/jboss/messaging/tests/integration/client/ClientMessageHandlerTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/client/ClientMessageHandlerTest.java	                        (rev 0)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/client/ClientMessageHandlerTest.java	2009-03-26 09:02:25 UTC (rev 6168)
@@ -0,0 +1,412 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.messaging.tests.integration.client;
+
+import org.jboss.messaging.core.client.ClientConsumer;
+import org.jboss.messaging.core.client.ClientMessage;
+import org.jboss.messaging.core.client.ClientProducer;
+import org.jboss.messaging.core.client.ClientSession;
+import org.jboss.messaging.core.client.ClientSessionFactory;
+import org.jboss.messaging.core.client.MessageHandler;
+import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.core.server.MessagingService;
+import org.jboss.messaging.tests.util.ServiceTestBase;
+import org.jboss.messaging.utils.SimpleString;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
+ */
+public class ClientMessageHandlerTest extends ServiceTestBase
+{
+   private static final Logger log = Logger.getLogger(ClientConsumerTest.class);
+
+   private MessagingService messagingService;
+
+   private final SimpleString QUEUE = new SimpleString("ConsumerTestQueue");
+
+   @Override
+   protected void setUp() throws Exception
+   {
+      super.setUp();
+
+      messagingService = createService(false);
+
+      messagingService.start();
+   }
+
+   @Override
+   protected void tearDown() throws Exception
+   {
+      messagingService.stop();
+
+      messagingService = null;
+
+      super.tearDown();
+   }
+
+   public void testSetMessageHandlerWithMessagesPending() throws Exception
+   {
+      ClientSessionFactory sf = createInVMFactory();
+
+      ClientSession session = sf.createSession(false, true, true);
+
+      session.createQueue(QUEUE, QUEUE, null, false, false);
+
+      ClientProducer producer = session.createProducer(QUEUE);
+
+      final int numMessages = 100;
+
+      for (int i = 0; i < numMessages; i++)
+      {
+         ClientMessage message = createTextMessage("m" + i, session);
+         producer.send(message);
+      }
+
+      ClientConsumer consumer = session.createConsumer(QUEUE, null, true);
+
+      session.start();
+
+      Thread.sleep(100);
+
+      // Message should be in consumer
+
+      class MyHandler implements MessageHandler
+      {
+         public void onMessage(final ClientMessage message)
+         {
+            try
+            {
+               Thread.sleep(10);
+
+               message.acknowledge();
+            }
+            catch (Exception e)
+            {
+            }
+         }
+      }
+
+      consumer.setMessageHandler(new MyHandler());
+
+      // Let a few messages get processed
+      Thread.sleep(100);
+
+      // Now set null
+
+      consumer.setMessageHandler(null);
+
+      // Give a bit of time for some queued executors to run
+
+      Thread.sleep(500);
+
+      // Make sure no exceptions were thrown from onMessage
+      assertNull(consumer.getLastException());
+
+      session.close();
+   }
+
+
+   public void testSetResetMessageHandler() throws Exception
+   {
+      ClientSessionFactory sf = createInVMFactory();
+
+      final ClientSession session = sf.createSession(false, true, true);
+
+      session.createQueue(QUEUE, QUEUE, null, false, false);
+
+      ClientProducer producer = session.createProducer(QUEUE);
+
+      final int numMessages = 100;
+
+      for (int i = 0; i < numMessages; i++)
+      {
+         ClientMessage message = createTextMessage("m" + i, session);
+         message.putIntProperty(new SimpleString("i"), i);
+         producer.send(message);
+      }
+
+      final ClientConsumer consumer = session.createConsumer(QUEUE);
+
+
+      session.start();
+
+      CountDownLatch latch = new CountDownLatch(50);
+
+      // Message should be in consumer
+
+      class MyHandler implements MessageHandler
+      {
+         int messageReceived = 0;
+
+         boolean failed;
+
+         boolean started = true;
+
+         private final CountDownLatch latch;
+
+         public MyHandler(CountDownLatch latch)
+         {
+            this.latch = latch;
+         }
+
+         public void onMessage(final ClientMessage message)
+         {
+
+            try
+            {
+               if (!started)
+               {
+                  failed = true;
+               }
+               messageReceived++;
+               latch.countDown();
+
+               if (latch.getCount() == 0)
+               {
+
+                  message.acknowledge();
+                  started = false;
+                  consumer.setMessageHandler(null);
+               }
+
+            }
+            catch (Exception e)
+            {
+            }
+         }
+      }
+
+      MyHandler handler = new MyHandler(latch);
+
+      consumer.setMessageHandler(handler);
+
+      latch.await();
+
+      Thread.sleep(100);
+
+      assertFalse(handler.failed);
+
+      // Make sure no exceptions were thrown from onMessage
+      assertNull(consumer.getLastException());
+      latch = new CountDownLatch(50);
+      handler = new MyHandler(latch);
+      consumer.setMessageHandler(handler);
+      session.start();
+      assertTrue("message received " + handler.messageReceived, latch.await(5, TimeUnit.SECONDS));
+
+      Thread.sleep(100);
+
+      assertFalse(handler.failed);
+      assertNull(consumer.getLastException());
+      session.close();
+   }
+
+   public void testSetUnsetMessageHandler() throws Exception
+   {
+      ClientSessionFactory sf = createInVMFactory();
+
+      final ClientSession session = sf.createSession(false, true, true);
+
+      session.createQueue(QUEUE, QUEUE, null, false, false);
+
+      ClientProducer producer = session.createProducer(QUEUE);
+
+      final int numMessages = 100;
+
+      for (int i = 0; i < numMessages; i++)
+      {
+         ClientMessage message = createTextMessage("m" + i, session);
+         message.putIntProperty(new SimpleString("i"), i);
+         producer.send(message);
+      }
+
+      final ClientConsumer consumer = session.createConsumer(QUEUE);
+
+
+      session.start();
+
+      CountDownLatch latch = new CountDownLatch(50);
+
+      // Message should be in consumer
+
+      class MyHandler implements MessageHandler
+      {
+         int messageReceived = 0;
+
+         boolean failed;
+
+         boolean started = true;
+
+         private final CountDownLatch latch;
+
+         public MyHandler(CountDownLatch latch)
+         {
+            this.latch = latch;
+         }
+
+         public void onMessage(final ClientMessage message)
+         {
+
+            try
+            {
+               if (!started)
+               {
+                  failed = true;
+               }
+               messageReceived++;
+               latch.countDown();
+
+               if (latch.getCount() == 0)
+               {
+
+                  message.acknowledge();
+                  started = false;
+                  consumer.setMessageHandler(null);
+               }
+
+            }
+            catch (Exception e)
+            {
+            }
+         }
+      }
+
+      MyHandler handler = new MyHandler(latch);
+
+      consumer.setMessageHandler(handler);
+
+      latch.await();
+
+      Thread.sleep(100);
+
+      assertFalse(handler.failed);
+
+      // Make sure no exceptions were thrown from onMessage
+      assertNull(consumer.getLastException());
+      consumer.setMessageHandler(null);
+      ClientMessage cm = consumer.receiveImmediate();
+      assertNotNull(cm);
+
+      session.close();
+   }
+
+   public void testSetUnsetResetMessageHandler() throws Exception
+   {
+      ClientSessionFactory sf = createInVMFactory();
+
+      final ClientSession session = sf.createSession(false, true, true);
+
+      session.createQueue(QUEUE, QUEUE, null, false, false);
+
+      ClientProducer producer = session.createProducer(QUEUE);
+
+      final int numMessages = 100;
+
+      for (int i = 0; i < numMessages; i++)
+      {
+         ClientMessage message = createTextMessage("m" + i, session);
+         message.putIntProperty(new SimpleString("i"), i);
+         producer.send(message);
+      }
+
+      final ClientConsumer consumer = session.createConsumer(QUEUE);
+
+
+      session.start();
+
+      CountDownLatch latch = new CountDownLatch(50);
+
+      // Message should be in consumer
+
+      class MyHandler implements MessageHandler
+      {
+         int messageReceived = 0;
+
+         boolean failed;
+
+         boolean started = true;
+
+         private final CountDownLatch latch;
+
+         public MyHandler(CountDownLatch latch)
+         {
+            this.latch = latch;
+         }
+
+         public void onMessage(final ClientMessage message)
+         {
+
+            try
+            {
+               if (!started)
+               {
+                  failed = true;
+               }
+               messageReceived++;
+               latch.countDown();
+
+               if (latch.getCount() == 0)
+               {
+
+                  message.acknowledge();
+                  started = false;
+                  consumer.setMessageHandler(null);
+               }
+
+            }
+            catch (Exception e)
+            {
+            }
+         }
+      }
+
+      MyHandler handler = new MyHandler(latch);
+
+      consumer.setMessageHandler(handler);
+
+      latch.await();
+
+      Thread.sleep(100);
+
+      assertFalse(handler.failed);
+
+      // Make sure no exceptions were thrown from onMessage
+      assertNull(consumer.getLastException());
+      consumer.setMessageHandler(null);
+      ClientMessage cm = consumer.receiveImmediate();
+      assertNotNull(cm);
+      latch = new CountDownLatch(49);
+      handler = new MyHandler(latch);
+      consumer.setMessageHandler(handler);
+      session.start();
+      assertTrue("message received " + handler.messageReceived, latch.await(5, TimeUnit.SECONDS));
+
+      Thread.sleep(100);
+
+      assertFalse(handler.failed);
+      assertNull(consumer.getLastException());
+      session.close();
+   }
+}

Modified: trunk/tests/src/org/jboss/messaging/tests/integration/client/ClientSessionStopStartTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/client/ClientSessionStopStartTest.java	2009-03-26 02:47:40 UTC (rev 6167)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/client/ClientSessionStopStartTest.java	2009-03-26 09:02:25 UTC (rev 6168)
@@ -29,7 +29,6 @@
 import org.jboss.messaging.core.client.MessageHandler;
 import org.jboss.messaging.core.logging.Logger;
 import org.jboss.messaging.core.server.MessagingService;
-import org.jboss.messaging.core.server.Queue;
 import org.jboss.messaging.tests.util.ServiceTestBase;
 import org.jboss.messaging.utils.SimpleString;
 
@@ -43,31 +42,31 @@
 {
    private static final Logger log = Logger.getLogger(ClientConsumerTest.class);
 
-      private MessagingService messagingService;
+   private MessagingService messagingService;
 
-      private final SimpleString QUEUE = new SimpleString("ConsumerTestQueue");
+   private final SimpleString QUEUE = new SimpleString("ConsumerTestQueue");
 
-      @Override
-      protected void setUp() throws Exception
-      {
-         super.setUp();
+   @Override
+   protected void setUp() throws Exception
+   {
+      super.setUp();
 
-         messagingService = createService(false);
+      messagingService = createService(false);
 
-         messagingService.start();
-      }
+      messagingService.start();
+   }
 
-      @Override
-      protected void tearDown() throws Exception
-      {
-         messagingService.stop();
+   @Override
+   protected void tearDown() throws Exception
+   {
+      messagingService.stop();
 
-         messagingService = null;
+      messagingService = null;
 
-         super.tearDown();
-      }
+      super.tearDown();
+   }
 
-    public void testStopStartConsumerSyncReceiveImmediate() throws Exception
+   public void testStopStartConsumerSyncReceiveImmediate() throws Exception
    {
       ClientSessionFactory sf = createInVMFactory();
 
@@ -91,7 +90,7 @@
       session.start();
 
 
-      for(int i = 0; i < numMessages/2; i++)
+      for (int i = 0; i < numMessages / 2; i++)
       {
          ClientMessage cm = consumer.receive(5000);
          assertNotNull(cm);
@@ -101,6 +100,14 @@
       ClientMessage cm = consumer.receiveImmediate();
       assertNull(cm);
 
+      session.start();
+      for (int i = 0; i < numMessages / 2; i++)
+      {
+         cm = consumer.receive(5000);
+         assertNotNull(cm);
+         cm.acknowledge();
+      }
+
       session.close();
    }
 
@@ -128,7 +135,7 @@
       session.start();
 
 
-      for(int i = 0; i < numMessages/2; i++)
+      for (int i = 0; i < numMessages / 2; i++)
       {
          ClientMessage cm = consumer.receive(5000);
          assertNotNull(cm);
@@ -141,10 +148,18 @@
       assertTrue(taken >= 1000);
       assertNull(cm);
 
+      session.start();
+      for (int i = 0; i < numMessages / 2; i++)
+      {
+         cm = consumer.receive(5000);
+         assertNotNull(cm);
+         cm.acknowledge();
+      }
+
       session.close();
    }
 
-   public void testStopStartConsumerAsyncSync() throws Exception
+   public void testStopStartConsumerAsyncSyncStoppedByHandler() throws Exception
    {
       ClientSessionFactory sf = createInVMFactory();
 
@@ -165,11 +180,97 @@
 
       final ClientConsumer consumer = session.createConsumer(QUEUE);
 
-      Queue q = (Queue) messagingService.getServer().getPostOffice().getBinding(QUEUE).getBindable();
-         int ccount = q.getConsumerCount();
+      session.start();
 
+      final CountDownLatch latch = new CountDownLatch(10);
+
+      // Message should be in consumer
+
+      class MyHandler implements MessageHandler
+      {
+         boolean failed;
+
+         boolean started = true;
+
+         public void onMessage(final ClientMessage message)
+         {
+
+            try
+            {
+               if (!started)
+               {
+                  failed = true;
+               }
+
+               latch.countDown();
+
+               if (latch.getCount() == 0)
+               {
+
+                  message.acknowledge();
+                  session.stop();
+                  started = false;
+               }
+            }
+            catch (Exception e)
+            {
+            }
+         }
+      }
+
+      MyHandler handler = new MyHandler();
+
+      consumer.setMessageHandler(handler);
+
+      latch.await();
+
+      Thread.sleep(100);
+
+      assertFalse(handler.failed);
+
+      // Make sure no exceptions were thrown from onMessage
+      assertNull(consumer.getLastException());
+      consumer.setMessageHandler(null);
       session.start();
+      for (int i = 0; i < 90; i++)
+      {
+         ClientMessage msg = consumer.receive(1000);
+         if (msg == null)
+         {
+            System.out.println("ClientConsumerTest.testStopConsumer");
+         }
+         assertNotNull("message " + i, msg);
+         msg.acknowledge();
+      }
 
+      assertNull(consumer.receiveImmediate());
+
+      session.close();
+   }
+
+      public void testStopStartConsumerAsyncSync() throws Exception
+   {
+      ClientSessionFactory sf = createInVMFactory();
+
+      final ClientSession session = sf.createSession(false, true, true);
+
+      session.createQueue(QUEUE, QUEUE, null, false, false);
+
+      ClientProducer producer = session.createProducer(QUEUE);
+
+      final int numMessages = 100;
+
+      for (int i = 0; i < numMessages; i++)
+      {
+         ClientMessage message = createTextMessage("m" + i, session);
+         message.putIntProperty(new SimpleString("i"), i);
+         producer.send(message);
+      }
+
+      final ClientConsumer consumer = session.createConsumer(QUEUE);
+
+      session.start();
+
       final CountDownLatch latch = new CountDownLatch(10);
 
       // Message should be in consumer
@@ -196,9 +297,8 @@
                {
 
                   message.acknowledge();
-                  session.stop(); // Shouldn't this alone prevent messages being delivered to this Handler?
                   started = false;
-                  //consumer.setMessageHandler(null); // If we comment out this line, the test will fail
+                  consumer.setMessageHandler(null);
                }
 
             }
@@ -214,7 +314,7 @@
 
       latch.await();
 
-      Thread.sleep(100);
+      session.stop();
 
       assertFalse(handler.failed);
 
@@ -225,8 +325,7 @@
       for (int i = 0; i < 90; i++)
       {
          ClientMessage msg = consumer.receive(1000);
-         ccount = q.getConsumerCount();
-         if(msg == null)
+         if (msg == null)
          {
             System.out.println("ClientConsumerTest.testStopConsumer");
          }
@@ -239,7 +338,7 @@
       session.close();
    }
 
-   public void testStopStartConsumerAsyncASync() throws Exception
+   public void testStopStartConsumerAsyncASyncStoppeeByHandler() throws Exception
    {
       ClientSessionFactory sf = createInVMFactory();
 
@@ -260,11 +359,110 @@
 
       final ClientConsumer consumer = session.createConsumer(QUEUE);
 
-      Queue q = (Queue) messagingService.getServer().getPostOffice().getBinding(QUEUE).getBindable();
-         int ccount = q.getConsumerCount();
+      session.start();
 
+      CountDownLatch latch = new CountDownLatch(10);
+
+      // Message should be in consumer
+
+      class MyHandler implements MessageHandler
+      {
+         int messageReceived = 0;
+
+         boolean failed;
+
+         boolean started = true;
+
+         private final CountDownLatch latch;
+
+         private boolean stop = true;
+
+         public MyHandler(CountDownLatch latch)
+         {
+            this.latch = latch;
+         }
+
+         public MyHandler(CountDownLatch latch, boolean stop)
+         {
+            this(latch);
+            this.stop = stop;
+         }
+
+         public void onMessage(final ClientMessage message)
+         {
+
+            try
+            {
+               if (!started)
+               {
+                  failed = true;
+               }
+               messageReceived++;
+               latch.countDown();
+
+               if (stop && latch.getCount() == 0)
+               {
+
+                  message.acknowledge();
+                  session.stop();
+                  started = false;
+               }
+
+            }
+            catch (Exception e)
+            {
+            }
+         }
+      }
+
+      MyHandler handler = new MyHandler(latch);
+
+      consumer.setMessageHandler(handler);
+
+      latch.await();
+
+      Thread.sleep(100);
+
+      assertFalse(handler.failed);
+
+      // Make sure no exceptions were thrown from onMessage
+      assertNull(consumer.getLastException());
+      latch = new CountDownLatch(90);
+      handler = new MyHandler(latch, false);
+      consumer.setMessageHandler(handler);
       session.start();
+      assertTrue("message received " + handler.messageReceived, latch.await(5, TimeUnit.SECONDS));
 
+      Thread.sleep(100);
+
+      assertFalse(handler.failed);
+      assertNull(consumer.getLastException());
+      session.close();
+   }
+
+   public void testStopStartConsumerAsyncASync() throws Exception
+   {
+      ClientSessionFactory sf = createInVMFactory();
+
+      final ClientSession session = sf.createSession(false, true, true);
+
+      session.createQueue(QUEUE, QUEUE, null, false, false);
+
+      ClientProducer producer = session.createProducer(QUEUE);
+
+      final int numMessages = 100;
+
+      for (int i = 0; i < numMessages; i++)
+      {
+         ClientMessage message = createTextMessage("m" + i, session);
+         message.putIntProperty(new SimpleString("i"), i);
+         producer.send(message);
+      }
+
+      final ClientConsumer consumer = session.createConsumer(QUEUE);
+
+      session.start();
+
       CountDownLatch latch = new CountDownLatch(10);
 
       // Message should be in consumer
@@ -272,6 +470,7 @@
       class MyHandler implements MessageHandler
       {
          int messageReceived = 0;
+
          boolean failed;
 
          boolean started = true;
@@ -307,9 +506,8 @@
                {
 
                   message.acknowledge();
-                  session.stop(); // Shouldn't this alone prevent messages being delivered to this Handler?
+                  consumer.setMessageHandler(null);
                   started = false;
-                  //consumer.setMessageHandler(null); // If we comment out this line, the test will fail
                }
 
             }
@@ -344,4 +542,149 @@
       session.close();
    }
 
+    public void testStopStartMultipleConsumers() throws Exception // three consumers on one session: nothing is delivered while the session is stopped, delivery resumes after restart
+   {
+      ClientSessionFactory sf = createInVMFactory();
+      sf.setConsumerWindowSize(10); // NOTE(review): presumably a small window so all three consumers get a share of the messages - confirm
+      final ClientSession session = sf.createSession(false, true, true);
+
+      session.createQueue(QUEUE, QUEUE, null, false, false);
+
+      ClientProducer producer = session.createProducer(QUEUE);
+
+      final int numMessages = 100;
+
+      for (int i = 0; i < numMessages; i++) // preload the queue before any consumer exists
+      {
+         ClientMessage message = createTextMessage("m" + i, session);
+         message.putIntProperty(new SimpleString("i"), i); // tag each message with its send index
+         producer.send(message);
+      }
+
+      ClientConsumer consumer = session.createConsumer(QUEUE);
+      ClientConsumer consumer2 = session.createConsumer(QUEUE);
+      ClientConsumer consumer3 = session.createConsumer(QUEUE);
+
+      session.start();
+
+      ClientMessage cm = consumer.receive(5000); // phase 1: while started, each consumer must hand back a message
+      assertNotNull(cm);
+      cm.acknowledge();
+      cm = consumer2.receive(5000);
+      assertNotNull(cm);
+      cm.acknowledge();
+      cm = consumer3.receive(5000);
+      assertNotNull(cm);
+      cm.acknowledge();
+
+      session.stop(); // phase 2: after stop, receiveImmediate() must return null on every consumer
+      cm = consumer.receiveImmediate();
+      assertNull(cm);
+      cm = consumer2.receiveImmediate();
+      assertNull(cm);
+      cm = consumer3.receiveImmediate();
+      assertNull(cm);
+
+      session.start(); // phase 3: after restart, every consumer must return a message again (these are not acknowledged)
+      cm = consumer.receiveImmediate();
+      assertNotNull(cm);
+      cm = consumer2.receiveImmediate();
+      assertNotNull(cm);
+      cm = consumer3.receiveImmediate();
+      assertNotNull(cm);
+      session.close();
+   }
+
+
+   public void testStopStartAlreadyStartedSession() throws Exception // calling start() on an already started session must not disturb delivery; note that stop() is never called here
+      {
+         ClientSessionFactory sf = createInVMFactory();
+
+         final ClientSession session = sf.createSession(false, true, true);
+
+         session.createQueue(QUEUE, QUEUE, null, false, false);
+
+         ClientProducer producer = session.createProducer(QUEUE);
+
+         final int numMessages = 100;
+
+         for (int i = 0; i < numMessages; i++) // preload the queue before the consumer exists
+         {
+            ClientMessage message = createTextMessage("m" + i, session);
+            message.putIntProperty(new SimpleString("i"), i); // tag each message with its send index
+            producer.send(message);
+         }
+
+         final ClientConsumer consumer = session.createConsumer(QUEUE);
+
+         session.start();
+
+
+         for (int i = 0; i < numMessages / 2; i++) // consume and acknowledge the first half (50 messages)
+         {
+            ClientMessage cm = consumer.receive(5000);
+            assertNotNull(cm);
+            cm.acknowledge();
+         }
+
+         session.start(); // second start() with no stop() in between: the redundant call under test
+         for (int i = 0; i < numMessages / 2; i++) // the remaining half must still be received
+         {
+            ClientMessage cm = consumer.receive(5000);
+            assertNotNull(cm);
+            cm.acknowledge();
+         }
+
+         session.close();
+      }
+
+     public void testStopAlreadyStoppedSession() throws Exception // a second stop() on a stopped session must keep delivery halted, and a later start() must resume it
+   {
+      ClientSessionFactory sf = createInVMFactory();
+
+      final ClientSession session = sf.createSession(false, true, true);
+
+      session.createQueue(QUEUE, QUEUE, null, false, false);
+
+      ClientProducer producer = session.createProducer(QUEUE);
+
+      final int numMessages = 100;
+
+      for (int i = 0; i < numMessages; i++) // preload the queue before the consumer exists
+      {
+         ClientMessage message = createTextMessage("m" + i, session);
+         message.putIntProperty(new SimpleString("i"), i); // tag each message with its send index
+         producer.send(message);
+      }
+
+      final ClientConsumer consumer = session.createConsumer(QUEUE);
+
+      session.start();
+
+
+      for (int i = 0; i < numMessages / 2; i++) // consume and acknowledge the first half (50 messages)
+      {
+         ClientMessage cm = consumer.receive(5000);
+         assertNotNull(cm);
+         cm.acknowledge();
+      }
+      session.stop(); // first stop: receiveImmediate() must now return null
+      ClientMessage cm = consumer.receiveImmediate();
+      assertNull(cm);
+
+      session.stop(); // second stop on the already stopped session: the redundant call under test
+      cm = consumer.receiveImmediate();
+      assertNull(cm);
+      
+      session.start(); // restart: the remaining half must be received
+      for (int i = 0; i < numMessages / 2; i++)
+      {
+         cm = consumer.receive(5000);
+         assertNotNull(cm);
+         cm.acknowledge();
+      }
+
+      session.close();
+   }
+
 }




More information about the jboss-cvs-commits mailing list