[jboss-cvs] JBoss Messaging SVN: r6152 - in trunk: tests/src/org/jboss/messaging/tests/integration/client and 1 other directory.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Tue Mar 24 14:47:27 EDT 2009


Author: ataylor
Date: 2009-03-24 14:47:26 -0400 (Tue, 24 Mar 2009)
New Revision: 6152

Modified:
   trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java
   trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerInternal.java
   trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java
   trunk/tests/src/org/jboss/messaging/tests/integration/client/ClientConsumerTest.java
   trunk/tests/src/org/jboss/messaging/tests/integration/client/ClientEndToEndTest.java
Log:
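Fixed session.stop() so that consumers stop delivering messages while the session is stopped (and resume on session.start()); added stop/start consumer tests and re-enabled testAsyncConsumerRollback.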

Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java	2009-03-24 14:45:53 UTC (rev 6151)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java	2009-03-24 18:47:26 UTC (rev 6152)
@@ -12,14 +12,6 @@
 
 package org.jboss.messaging.core.client.impl;
 
-import java.io.File;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.nio.channels.FileChannel;
-import java.util.LinkedList;
-import java.util.Queue;
-import java.util.concurrent.Executor;
-
 import org.jboss.messaging.core.buffers.ChannelBuffers;
 import org.jboss.messaging.core.client.ClientFileMessage;
 import org.jboss.messaging.core.client.ClientMessage;
@@ -34,6 +26,14 @@
 import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
 import org.jboss.messaging.utils.Future;
 
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.util.LinkedList;
+import java.util.Queue;
+import java.util.concurrent.Executor;
+
 /**
  * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
  * @author <a href="mailto:ovidiu at feodorov.com">Ovidiu Feodorov</a>
@@ -94,6 +94,8 @@
 
    private volatile ClientMessage lastAckedMessage;
 
+   private boolean stopped = false;
+
    // Constructors
    // ---------------------------------------------------------------------------------
 
@@ -153,7 +155,8 @@
 
             synchronized (this)
             {
-               while ((m = buffer.poll()) == null && !closed && toWait > 0)
+               while ((stopped || (m = buffer.poll()) == null) &&
+                      !closed && toWait > 0 )
                {
                   if (start == -1)
                   {
@@ -251,11 +254,17 @@
 
       if (queueUp)
       {
+         stopped = false;
          for (int i = 0; i < buffer.size(); i++)
          {
             queueExecutor();
          }
       }
+      else
+      {
+         stopped = true;
+         waitForOnMessageToComplete();
+      }
    }
 
    public void close() throws MessagingException
@@ -288,6 +297,25 @@
       return directory != null;
    }
 
+   public synchronized void stop() throws MessagingException
+   {
+      if(stopped)
+      {
+         return;
+      }
+      stopped = true;
+      waitForOnMessageToComplete();
+   }
+
+   public synchronized void start()
+   {
+      stopped = false;
+      for (int i = 0; i < buffer.size(); i++)
+      {
+         queueExecutor();
+      }
+   }
+
    public Exception getLastException()
    {
       return lastException;
@@ -322,9 +350,11 @@
       {
          // Execute using executor
 
-         buffer.add(messageToHandle);
-
-         queueExecutor();
+       buffer.add(messageToHandle);
+       if(!stopped)
+       {
+          queueExecutor();
+       }
       }
       else
       {
@@ -529,11 +559,10 @@
 
    private void callOnMessage() throws Exception
    {
-      if (closing)
+      if (closing || stopped)
       {
          return;
       }
-
       // We pull the message from the buffer from inside the Runnable so we can ensure priority
       // ordering. If we just added a Runnable with the message to the executor immediately as we get it
       // we could not do that

Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerInternal.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerInternal.java	2009-03-24 14:45:53 UTC (rev 6151)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerInternal.java	2009-03-24 18:47:26 UTC (rev 6152)
@@ -60,4 +60,8 @@
    void flushAcks() throws MessagingException;
    
    boolean isFileConsumer();
+
+   void stop() throws MessagingException;
+
+   void start();
 }

Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java	2009-03-24 14:45:53 UTC (rev 6151)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java	2009-03-24 18:47:26 UTC (rev 6152)
@@ -21,21 +21,6 @@
  */
 package org.jboss.messaging.core.client.impl;
 
-import static org.jboss.messaging.utils.SimpleString.toSimpleString;
-
-import java.io.File;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.Executor;
-import java.util.concurrent.Executors;
-
-import javax.transaction.xa.XAException;
-import javax.transaction.xa.XAResource;
-import javax.transaction.xa.Xid;
-
 import org.jboss.messaging.core.buffers.ChannelBuffers;
 import org.jboss.messaging.core.client.ClientConsumer;
 import org.jboss.messaging.core.client.ClientFileMessage;
@@ -87,8 +72,21 @@
 import org.jboss.messaging.utils.OrderedExecutorFactory;
 import org.jboss.messaging.utils.SimpleIDGenerator;
 import org.jboss.messaging.utils.SimpleString;
+import static org.jboss.messaging.utils.SimpleString.toSimpleString;
 import org.jboss.messaging.utils.TokenBucketLimiterImpl;
 
+import javax.transaction.xa.XAException;
+import javax.transaction.xa.XAResource;
+import javax.transaction.xa.Xid;
+import java.io.File;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
+
 /*
  * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
  *
@@ -607,6 +605,11 @@
 
       if (!started)
       {
+         for (ClientConsumerInternal clientConsumerInternal : consumers.values())
+         {
+            clientConsumerInternal.start();
+         }
+
          channel.send(new PacketImpl(PacketImpl.SESS_START));
 
          started = true;
@@ -619,6 +622,11 @@
 
       if (started)
       {
+         for (ClientConsumerInternal clientConsumerInternal : consumers.values())
+         {
+            clientConsumerInternal.stop();
+         }
+
          channel.sendBlocking(new PacketImpl(PacketImpl.SESS_STOP));
 
          started = false;

Modified: trunk/tests/src/org/jboss/messaging/tests/integration/client/ClientConsumerTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/client/ClientConsumerTest.java	2009-03-24 14:45:53 UTC (rev 6151)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/client/ClientConsumerTest.java	2009-03-24 18:47:26 UTC (rev 6152)
@@ -21,8 +21,6 @@
  */
 package org.jboss.messaging.tests.integration.client;
 
-import java.util.concurrent.CountDownLatch;
-
 import org.jboss.messaging.core.client.ClientConsumer;
 import org.jboss.messaging.core.client.ClientMessage;
 import org.jboss.messaging.core.client.ClientProducer;
@@ -37,6 +35,9 @@
 import org.jboss.messaging.tests.util.ServiceTestBase;
 import org.jboss.messaging.utils.SimpleString;
 
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
 /**
  * @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
  * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
@@ -466,8 +467,44 @@
 
       session.close();
    }
+   public void testStopStartConsumerSyncReceiveImmediate() throws Exception
+   {
+      ClientSessionFactory sf = createInVMFactory();
 
-   public void testStopConsumer() throws Exception
+      final ClientSession session = sf.createSession(false, true, true);
+
+      session.createQueue(QUEUE, QUEUE, null, false, false);
+
+      ClientProducer producer = session.createProducer(QUEUE);
+
+      final int numMessages = 100;
+
+      for (int i = 0; i < numMessages; i++)
+      {
+         ClientMessage message = createTextMessage("m" + i, session);
+         message.putIntProperty(new SimpleString("i"), i);
+         producer.send(message);
+      }
+
+      final ClientConsumer consumer = session.createConsumer(QUEUE);
+
+      session.start();
+
+
+      for(int i = 0; i < numMessages/2; i++)
+      {
+         ClientMessage cm = consumer.receive(5000);
+         assertNotNull(cm);
+         cm.acknowledge();
+      }
+      session.stop();
+      ClientMessage cm = consumer.receiveImmediate();
+      assertNull(cm);
+
+      session.close();
+   }
+
+   public void testStopStartConsumerSyncReceive() throws Exception
    {
       ClientSessionFactory sf = createInVMFactory();
 
@@ -482,13 +519,57 @@
       for (int i = 0; i < numMessages; i++)
       {
          ClientMessage message = createTextMessage("m" + i, session);
+         message.putIntProperty(new SimpleString("i"), i);
          producer.send(message);
       }
 
-      final ClientConsumer consumer = session.createConsumer(QUEUE, null, true);
+      final ClientConsumer consumer = session.createConsumer(QUEUE);
 
       session.start();
 
+
+      for(int i = 0; i < numMessages/2; i++)
+      {
+         ClientMessage cm = consumer.receive(5000);
+         assertNotNull(cm);
+         cm.acknowledge();
+      }
+      session.stop();
+      long time = System.currentTimeMillis();
+      ClientMessage cm = consumer.receive(1000);
+      long taken = System.currentTimeMillis() - time;
+      assertTrue(taken >= 1000);
+      assertNull(cm);
+
+      session.close();
+   }
+
+   public void testStopStartConsumerAsyncSync() throws Exception
+   {
+      ClientSessionFactory sf = createInVMFactory();
+
+      final ClientSession session = sf.createSession(false, true, true);
+
+      session.createQueue(QUEUE, QUEUE, null, false, false);
+
+      ClientProducer producer = session.createProducer(QUEUE);
+
+      final int numMessages = 100;
+
+      for (int i = 0; i < numMessages; i++)
+      {
+         ClientMessage message = createTextMessage("m" + i, session);
+         message.putIntProperty(new SimpleString("i"), i);
+         producer.send(message);
+      }
+
+      final ClientConsumer consumer = session.createConsumer(QUEUE);
+
+      Queue q = (Queue) messagingService.getServer().getPostOffice().getBinding(QUEUE).getBindable();
+         int ccount = q.getConsumerCount();
+
+      session.start();
+
       final CountDownLatch latch = new CountDownLatch(10);
 
       // Message should be in consumer
@@ -513,12 +594,13 @@
 
                if (latch.getCount() == 0)
                {
+
+                  message.acknowledge();
                   session.stop(); // Shouldn't this alone prevent messages being delivered to this Handler?
                   started = false;
-                  consumer.setMessageHandler(null); // If we comment out this line, the test will fail
+                  //consumer.setMessageHandler(null); // If we comment out this line, the test will fail
                }
 
-               message.acknowledge();
             }
             catch (Exception e)
             {
@@ -538,11 +620,17 @@
 
       // Make sure no exceptions were thrown from onMessage
       assertNull(consumer.getLastException());
-
+      consumer.setMessageHandler(null);
+      session.start();
       for (int i = 0; i < 90; i++)
       {
          ClientMessage msg = consumer.receive(1000);
-         assertNotNull(msg);
+         ccount = q.getConsumerCount();
+         if(msg == null)
+         {
+            System.out.println("ClientConsumerTest.testStopConsumer");
+         }
+         assertNotNull("message " + i, msg);
          msg.acknowledge();
       }
 
@@ -551,6 +639,208 @@
       session.close();
    }
 
+   public void testStopStartConsumerAsyncASync() throws Exception
+   {
+      ClientSessionFactory sf = createInVMFactory();
+
+      final ClientSession session = sf.createSession(false, true, true);
+
+      session.createQueue(QUEUE, QUEUE, null, false, false);
+
+      ClientProducer producer = session.createProducer(QUEUE);
+
+      final int numMessages = 100;
+
+      for (int i = 0; i < numMessages; i++)
+      {
+         ClientMessage message = createTextMessage("m" + i, session);
+         message.putIntProperty(new SimpleString("i"), i);
+         producer.send(message);
+      }
+
+      final ClientConsumer consumer = session.createConsumer(QUEUE);
+
+      Queue q = (Queue) messagingService.getServer().getPostOffice().getBinding(QUEUE).getBindable();
+         int ccount = q.getConsumerCount();
+
+      session.start();
+
+      CountDownLatch latch = new CountDownLatch(10);
+
+      // Message should be in consumer
+
+      class MyHandler implements MessageHandler
+      {
+         int messageReceived = 0;
+         boolean failed;
+
+         boolean started = true;
+
+         private final CountDownLatch latch;
+
+         private boolean stop = true;
+
+         public MyHandler(CountDownLatch latch)
+         {
+            this.latch = latch;
+         }
+
+         public MyHandler(CountDownLatch latch, boolean stop)
+         {
+            this(latch);
+            this.stop = stop;
+         }
+
+         public void onMessage(final ClientMessage message)
+         {
+
+            try
+            {
+               if (!started)
+               {
+                  failed = true;
+               }
+               messageReceived++;
+               latch.countDown();
+
+               if (stop && latch.getCount() == 0)
+               {
+
+                  message.acknowledge();
+                  session.stop(); // Shouldn't this alone prevent messages being delivered to this Handler?
+                  started = false;
+                  //consumer.setMessageHandler(null); // If we comment out this line, the test will fail
+               }
+
+            }
+            catch (Exception e)
+            {
+            }
+         }
+      }
+
+      MyHandler handler = new MyHandler(latch);
+
+      consumer.setMessageHandler(handler);
+
+      latch.await();
+
+      Thread.sleep(100);
+
+      assertFalse(handler.failed);
+
+      // Make sure no exceptions were thrown from onMessage
+      assertNull(consumer.getLastException());
+      latch = new CountDownLatch(90);
+      handler = new MyHandler(latch, false);
+      consumer.setMessageHandler(handler);
+      session.start();
+      assertTrue("message received " + handler.messageReceived, latch.await(5, TimeUnit.SECONDS));
+
+      Thread.sleep(100);
+
+      assertFalse(handler.failed);
+      assertNull(consumer.getLastException());
+      session.close();
+   }
+
+
+   public void testSetUnsetMessageHandler() throws Exception
+   {
+      ClientSessionFactory sf = createInVMFactory();
+
+      final ClientSession session = sf.createSession(false, true, true);
+
+      session.createQueue(QUEUE, QUEUE, null, false, false);
+
+      ClientProducer producer = session.createProducer(QUEUE);
+
+      final int numMessages = 100;
+
+      for (int i = 0; i < numMessages; i++)
+      {
+         ClientMessage message = createTextMessage("m" + i, session);
+         message.putIntProperty(new SimpleString("i"), i);
+         producer.send(message);
+      }
+
+      final ClientConsumer consumer = session.createConsumer(QUEUE);
+
+      Queue q = (Queue) messagingService.getServer().getPostOffice().getBinding(QUEUE).getBindable();
+         int ccount = q.getConsumerCount();
+
+      session.start();
+
+      CountDownLatch latch = new CountDownLatch(50);
+
+      // Message should be in consumer
+
+      class MyHandler implements MessageHandler
+      {
+         int messageReceived = 0;
+         boolean failed;
+
+         boolean started = true;
+
+         private final CountDownLatch latch;
+
+         public MyHandler(CountDownLatch latch)
+         {
+            this.latch = latch;
+         }
+
+         public void onMessage(final ClientMessage message)
+         {
+
+            try
+            {
+               if (!started)
+               {
+                  failed = true;
+               }
+               messageReceived++;
+               latch.countDown();
+
+               if (latch.getCount() == 0)
+               {
+
+                  message.acknowledge();
+                  started = false;
+                  consumer.setMessageHandler(null);
+               }
+
+            }
+            catch (Exception e)
+            {
+            }
+         }
+      }
+
+      MyHandler handler = new MyHandler(latch);
+
+      consumer.setMessageHandler(handler);
+
+      latch.await();
+
+      Thread.sleep(100);
+
+      assertFalse(handler.failed);
+
+      // Make sure no exceptions were thrown from onMessage
+      assertNull(consumer.getLastException());
+      latch = new CountDownLatch(50);
+      handler = new MyHandler(latch);
+      consumer.setMessageHandler(handler);
+      session.start();
+      assertTrue("message received " + handler.messageReceived, latch.await(5, TimeUnit.SECONDS));
+
+      Thread.sleep(100);
+
+      assertFalse(handler.failed);
+      assertNull(consumer.getLastException());
+      session.close();
+   }
+
    public void testConsumerAckImmediateAutoCommitTrue() throws Exception
    {
       ClientSessionFactory sf = createInVMFactory();

Modified: trunk/tests/src/org/jboss/messaging/tests/integration/client/ClientEndToEndTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/client/ClientEndToEndTest.java	2009-03-24 14:45:53 UTC (rev 6151)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/client/ClientEndToEndTest.java	2009-03-24 18:47:26 UTC (rev 6152)
@@ -1093,53 +1093,51 @@
       }
    }
 
-   //FIXME uncomment when https://jira.jboss.org/jira/browse/JBMESSAGING-1549 is fixed
-//   public void testAsyncConsumerRollback() throws Exception
-//   {
-//      MessagingService messagingService = createService(false);
-//      try
-//      {
-//         messagingService.start();
-//         ClientSessionFactory cf = createInVMFactory();
-//         cf.setBlockOnAcknowledge(true);
-//         cf.setAckBatchSize(0);
-//         ClientSession sendSession = cf.createSession(false, true, true);
-//         final ClientSession session = cf.createSession(false, true, false);
-//         sendSession.createQueue(addressA, queueA, false);
-//         ClientProducer cp = sendSession.createProducer(addressA);
-//         ClientConsumer cc = session.createConsumer(queueA);
-//         int numMessages = 100;
-//         for (int i = 0; i < numMessages; i++)
-//         {
-//            cp.send(sendSession.createClientMessage(false));
-//         }
-//         CountDownLatch latch = new CountDownLatch(numMessages);
-//         session.start();
-//         cc.setMessageHandler(new ackHandler(session, latch));
-//         assertTrue(latch.await(5, TimeUnit.SECONDS));
-//         Queue q = (Queue)messagingService.getServer().getPostOffice().getBinding(queueA).getBindable();
-//         assertEquals(numMessages, q.getDeliveringCount());
-//         assertEquals(numMessages, q.getMessageCount());
-//         //Need to stop session first or rollback will cause immediate redelivery
-//         session.stop();
-//         session.rollback();
-//         assertEquals(0, q.getDeliveringCount());
-//         assertEquals(numMessages, q.getMessageCount());
-//         session.start();
-//         latch = new CountDownLatch(numMessages);
-//         cc.setMessageHandler(new ackHandler(session, latch));
-//         assertTrue(latch.await(5, TimeUnit.SECONDS));
-//         sendSession.close();
-//         session.close();
-//      }
-//      finally
-//      {
-//         if (messagingService.isStarted())
-//         {
-//            messagingService.stop();
-//         }
-//      }      
-//   }
+   public void testAsyncConsumerRollback() throws Exception
+   {
+      MessagingService messagingService = createService(false);
+      try
+      {
+         messagingService.start();
+         ClientSessionFactory cf = createInVMFactory();
+         cf.setBlockOnAcknowledge(true);
+         cf.setAckBatchSize(0);
+         ClientSession sendSession = cf.createSession(false, true, true);
+         final ClientSession session = cf.createSession(false, true, false);
+         sendSession.createQueue(addressA, queueA, false);
+         ClientProducer cp = sendSession.createProducer(addressA);
+         ClientConsumer cc = session.createConsumer(queueA);
+         int numMessages = 100;
+         for (int i = 0; i < numMessages; i++)
+         {
+            cp.send(sendSession.createClientMessage(false));
+         }
+         CountDownLatch latch = new CountDownLatch(numMessages);
+         session.start();
+         cc.setMessageHandler(new ackHandler(session, latch));
+         assertTrue(latch.await(5, TimeUnit.SECONDS));
+         Queue q = (Queue) messagingService.getServer().getPostOffice().getBinding(queueA).getBindable();
+         assertEquals(numMessages, q.getDeliveringCount());
+         assertEquals(numMessages, q.getMessageCount());
+         session.stop();
+         session.rollback();
+         assertEquals(0, q.getDeliveringCount());
+         assertEquals(numMessages, q.getMessageCount());
+         latch = new CountDownLatch(numMessages);
+         cc.setMessageHandler(new ackHandler(session, latch));
+         session.start();
+         assertTrue(latch.await(5, TimeUnit.SECONDS));
+         sendSession.close();
+         session.close();
+      }
+      finally
+      {
+         if (messagingService.isStarted())
+         {
+            messagingService.stop();
+         }
+      }
+   }
 
    public void testSendDeliveryOrderOnCommit() throws Exception
    {
@@ -1412,12 +1410,12 @@
          consumers[3] = session.createConsumer(queueA);
          consumers[4] = session.createConsumer(queueA);
 
-         ClientSession sendSession = cf.createSession(false, true, true);
-         ClientProducer cp = sendSession.createProducer(addressA);
+         //ClientSession sendSession = cf.createSession(false, true, true);
+         ClientProducer cp = session.createProducer(addressA);
          int numMessage = 100;
          for (int i = 0; i < numMessage; i++)
          {
-            ClientMessage cm = sendSession.createClientMessage(false);
+            ClientMessage cm = session.createClientMessage(false);
             cm.getBody().writeInt(i);
             cp.send(cm);
          }
@@ -1431,7 +1429,7 @@
                assertEquals(currMessage++, cm.getBody().readInt());
             }
          }
-         sendSession.close();
+         //sendSession.close();
          session.close();
       }
       finally




More information about the jboss-cvs-commits mailing list