[jboss-cvs] JBoss Messaging SVN: r7687 - in trunk: tests/src/org/jboss/messaging/tests/integration/client and 1 other directory.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Fri Aug 7 06:58:40 EDT 2009


Author: timfox
Date: 2009-08-07 06:58:39 -0400 (Fri, 07 Aug 2009)
New Revision: 7687

Modified:
   trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java
   trunk/src/main/org/jboss/messaging/core/client/impl/ConnectionManagerImpl.java
   trunk/tests/src/org/jboss/messaging/tests/integration/client/ConsumerCloseTest.java
Log:
https://jira.jboss.org/jira/browse/JBMESSAGING-1526

Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java	2009-08-07 09:51:39 UTC (rev 7686)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java	2009-08-07 10:58:39 UTC (rev 7687)
@@ -137,7 +137,7 @@
    // ClientConsumer implementation
    // -----------------------------------------------------------------
 
-    public ClientMessage receive(long timeout) throws MessagingException
+   public ClientMessage receive(long timeout) throws MessagingException
    {
       checkClosed();
 
@@ -379,11 +379,6 @@
 
       messageToHandle.onReceipt(this);
 
-      if (trace)
-      {
-         log.trace("Adding message " + message + " into buffer");
-      }
-
       // Add it to the buffer
       buffer.addLast(messageToHandle, messageToHandle.getPriority());
 
@@ -419,10 +414,11 @@
       currentChunkMessage.setLargeMessage(true);
 
       File largeMessageCache = null;
-      
+
       if (session.isCacheLargeMessageClient())
       {
-         largeMessageCache = File.createTempFile("tmp-large-message-" + currentChunkMessage.getMessageID()+ "-", ".tmp");
+         largeMessageCache = File.createTempFile("tmp-large-message-" + currentChunkMessage.getMessageID() + "-",
+                                                 ".tmp");
          largeMessageCache.deleteOnExit();
       }
 

Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ConnectionManagerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ConnectionManagerImpl.java	2009-08-07 09:51:39 UTC (rev 7686)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ConnectionManagerImpl.java	2009-08-07 10:58:39 UTC (rev 7687)
@@ -1018,9 +1018,7 @@
       {
          // Can be legitimately null if session was closed before then went to remove session from csf
          // and locked since failover had started then after failover removes it but it's already been failed
-      } 
-      
-      log.info("Returning connection, now " + this.connections.size() + " pingers " + this.pingers.size());
+      }            
    }
 
    private void failConnections(final MessagingException me)

Modified: trunk/tests/src/org/jboss/messaging/tests/integration/client/ConsumerCloseTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/client/ConsumerCloseTest.java	2009-08-07 09:51:39 UTC (rev 7686)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/client/ConsumerCloseTest.java	2009-08-07 10:58:39 UTC (rev 7687)
@@ -25,6 +25,7 @@
 
 import org.jboss.messaging.core.client.ClientConsumer;
 import org.jboss.messaging.core.client.ClientMessage;
+import org.jboss.messaging.core.client.ClientProducer;
 import org.jboss.messaging.core.client.ClientSession;
 import org.jboss.messaging.core.client.ClientSessionFactory;
 import org.jboss.messaging.core.client.MessageHandler;
@@ -33,6 +34,7 @@
 import org.jboss.messaging.core.config.TransportConfiguration;
 import org.jboss.messaging.core.config.impl.ConfigurationImpl;
 import org.jboss.messaging.core.exception.MessagingException;
+import org.jboss.messaging.core.logging.Logger;
 import org.jboss.messaging.core.remoting.impl.invm.InVMAcceptorFactory;
 import org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory;
 import org.jboss.messaging.core.server.Messaging;
@@ -46,15 +48,21 @@
  */
 public class ConsumerCloseTest extends ServiceTestBase
 {
-
    // Constants -----------------------------------------------------
+   
+   private static final Logger log = Logger.getLogger(ConsumerCloseTest.class);
 
+
    // Attributes ----------------------------------------------------
 
    private MessagingServer server;
+
    private ClientSession session;
+
    private SimpleString queue;
 
+   private SimpleString address;
+
    // Static --------------------------------------------------------
 
    // Constructors --------------------------------------------------
@@ -76,7 +84,7 @@
             consumer.receive();
          }
       });
-      
+
       expectMessagingException(MessagingException.OBJECT_CLOSED, new MessagingAction()
       {
          public void run() throws MessagingException
@@ -99,6 +107,51 @@
       });
    }
 
+   // https://jira.jboss.org/jira/browse/JBMESSAGING-1526
+   public void testCloseWithManyMessagesInBufferAndSlowConsumer() throws Exception
+   {
+      ClientConsumer consumer = session.createConsumer(queue);
+
+      ClientProducer producer = session.createProducer(address);
+
+      final int numMessages = 10;
+
+      for (int i = 0; i < numMessages; i++)
+      {
+         ClientMessage message = session.createClientMessage(false);
+
+         producer.send(message);
+      }
+
+      class MyHandler implements MessageHandler
+      {
+         public void onMessage(ClientMessage message)
+         {
+            try
+            {
+               Thread.sleep(1000);
+            }
+            catch (Exception e)
+            {
+            }
+         }
+      }
+      
+      consumer.setMessageHandler(new MyHandler());
+      
+      session.start();
+      
+      Thread.sleep(1000);
+      
+      //Close shouldn't wait for all messages to be processed before closing
+      long start= System.currentTimeMillis();
+      consumer.close();
+      long end = System.currentTimeMillis();
+      
+      assertTrue(end - start <= 1500);
+
+   }
+
    // Package protected ---------------------------------------------
 
    // Protected -----------------------------------------------------
@@ -113,8 +166,8 @@
       config.setSecurityEnabled(false);
       server = Messaging.newMessagingServer(config, false);
       server.start();
-      
-      SimpleString address = randomSimpleString();
+
+      address = randomSimpleString();
       queue = randomSimpleString();
 
       sf = new ClientSessionFactoryImpl(new TransportConfiguration(InVMConnectorFactory.class.getName()));
@@ -122,18 +175,18 @@
       session.createQueue(address, queue, false);
 
    }
-   
+
    private ClientSessionFactory sf;
 
    @Override
    protected void tearDown() throws Exception
    {
       session.deleteQueue(queue);
-      
+
       session.close();
-      
+
       sf.close();
-      
+
       server.stop();
 
       super.tearDown();




More information about the jboss-cvs-commits mailing list