[jboss-cvs] JBoss Messaging SVN: r7687 - in trunk: tests/src/org/jboss/messaging/tests/integration/client and 1 other directory.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Fri Aug 7 06:58:40 EDT 2009
Author: timfox
Date: 2009-08-07 06:58:39 -0400 (Fri, 07 Aug 2009)
New Revision: 7687
Modified:
trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java
trunk/src/main/org/jboss/messaging/core/client/impl/ConnectionManagerImpl.java
trunk/tests/src/org/jboss/messaging/tests/integration/client/ConsumerCloseTest.java
Log:
https://jira.jboss.org/jira/browse/JBMESSAGING-1526
Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java 2009-08-07 09:51:39 UTC (rev 7686)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java 2009-08-07 10:58:39 UTC (rev 7687)
@@ -137,7 +137,7 @@
// ClientConsumer implementation
// -----------------------------------------------------------------
- public ClientMessage receive(long timeout) throws MessagingException
+ public ClientMessage receive(long timeout) throws MessagingException
{
checkClosed();
@@ -379,11 +379,6 @@
messageToHandle.onReceipt(this);
- if (trace)
- {
- log.trace("Adding message " + message + " into buffer");
- }
-
// Add it to the buffer
buffer.addLast(messageToHandle, messageToHandle.getPriority());
@@ -419,10 +414,11 @@
currentChunkMessage.setLargeMessage(true);
File largeMessageCache = null;
-
+
if (session.isCacheLargeMessageClient())
{
- largeMessageCache = File.createTempFile("tmp-large-message-" + currentChunkMessage.getMessageID()+ "-", ".tmp");
+ largeMessageCache = File.createTempFile("tmp-large-message-" + currentChunkMessage.getMessageID() + "-",
+ ".tmp");
largeMessageCache.deleteOnExit();
}
Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ConnectionManagerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ConnectionManagerImpl.java 2009-08-07 09:51:39 UTC (rev 7686)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ConnectionManagerImpl.java 2009-08-07 10:58:39 UTC (rev 7687)
@@ -1018,9 +1018,7 @@
{
// Can be legitimately null if session was closed before then went to remove session from csf
// and locked since failover had started then after failover removes it but it's already been failed
- }
-
- log.info("Returning connection, now " + this.connections.size() + " pingers " + this.pingers.size());
+ }
}
private void failConnections(final MessagingException me)
Modified: trunk/tests/src/org/jboss/messaging/tests/integration/client/ConsumerCloseTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/client/ConsumerCloseTest.java 2009-08-07 09:51:39 UTC (rev 7686)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/client/ConsumerCloseTest.java 2009-08-07 10:58:39 UTC (rev 7687)
@@ -25,6 +25,7 @@
import org.jboss.messaging.core.client.ClientConsumer;
import org.jboss.messaging.core.client.ClientMessage;
+import org.jboss.messaging.core.client.ClientProducer;
import org.jboss.messaging.core.client.ClientSession;
import org.jboss.messaging.core.client.ClientSessionFactory;
import org.jboss.messaging.core.client.MessageHandler;
@@ -33,6 +34,7 @@
import org.jboss.messaging.core.config.TransportConfiguration;
import org.jboss.messaging.core.config.impl.ConfigurationImpl;
import org.jboss.messaging.core.exception.MessagingException;
+import org.jboss.messaging.core.logging.Logger;
import org.jboss.messaging.core.remoting.impl.invm.InVMAcceptorFactory;
import org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory;
import org.jboss.messaging.core.server.Messaging;
@@ -46,15 +48,21 @@
*/
public class ConsumerCloseTest extends ServiceTestBase
{
-
// Constants -----------------------------------------------------
+
+ private static final Logger log = Logger.getLogger(ConsumerCloseTest.class);
+
// Attributes ----------------------------------------------------
private MessagingServer server;
+
private ClientSession session;
+
private SimpleString queue;
+ private SimpleString address;
+
// Static --------------------------------------------------------
// Constructors --------------------------------------------------
@@ -76,7 +84,7 @@
consumer.receive();
}
});
-
+
expectMessagingException(MessagingException.OBJECT_CLOSED, new MessagingAction()
{
public void run() throws MessagingException
@@ -99,6 +107,51 @@
});
}
+ // https://jira.jboss.org/jira/browse/JBMESSAGING-1526
+ public void testCloseWithManyMessagesInBufferAndSlowConsumer() throws Exception
+ {
+ ClientConsumer consumer = session.createConsumer(queue);
+
+ ClientProducer producer = session.createProducer(address);
+
+ final int numMessages = 10;
+
+ for (int i = 0; i < numMessages; i++)
+ {
+ ClientMessage message = session.createClientMessage(false);
+
+ producer.send(message);
+ }
+
+ class MyHandler implements MessageHandler
+ {
+ public void onMessage(ClientMessage message)
+ {
+ try
+ {
+ Thread.sleep(1000);
+ }
+ catch (Exception e)
+ {
+ }
+ }
+ }
+
+ consumer.setMessageHandler(new MyHandler());
+
+ session.start();
+
+ Thread.sleep(1000);
+
+ //Close shouldn't wait for all messages to be processed before closing
+ long start= System.currentTimeMillis();
+ consumer.close();
+ long end = System.currentTimeMillis();
+
+ assertTrue(end - start <= 1500);
+
+ }
+
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
@@ -113,8 +166,8 @@
config.setSecurityEnabled(false);
server = Messaging.newMessagingServer(config, false);
server.start();
-
- SimpleString address = randomSimpleString();
+
+ address = randomSimpleString();
queue = randomSimpleString();
sf = new ClientSessionFactoryImpl(new TransportConfiguration(InVMConnectorFactory.class.getName()));
@@ -122,18 +175,18 @@
session.createQueue(address, queue, false);
}
-
+
private ClientSessionFactory sf;
@Override
protected void tearDown() throws Exception
{
session.deleteQueue(queue);
-
+
session.close();
-
+
sf.close();
-
+
server.stop();
super.tearDown();
More information about the jboss-cvs-commits
mailing list