[jboss-cvs] JBoss Messaging SVN: r2828 - in trunk: tests/src/org/jboss/test/messaging/jms and 1 other directory.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Tue Jul 3 08:12:42 EDT 2007
Author: sergeypk
Date: 2007-07-03 08:12:42 -0400 (Tue, 03 Jul 2007)
New Revision: 2828
Modified:
trunk/src/main/org/jboss/jms/client/container/ClientConsumer.java
trunk/src/main/org/jboss/jms/client/container/ConnectionAspect.java
trunk/tests/src/org/jboss/test/messaging/jms/MessageConsumerTest.java
Log:
http://jira.jboss.com/jira/browse/JBMESSAGING-983 - Connection.stop should wait until all message listeners have completed.
Modified: trunk/src/main/org/jboss/jms/client/container/ClientConsumer.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/container/ClientConsumer.java 2007-07-03 12:00:14 UTC (rev 2827)
+++ trunk/src/main/org/jboss/jms/client/container/ClientConsumer.java 2007-07-03 12:12:42 UTC (rev 2828)
@@ -198,6 +198,7 @@
private MessageListener listener;
private int ackMode;
private boolean closed;
+ private volatile boolean paused;
private Object mainLock;
private int maxBufferSize;
private int minBufferSize;
@@ -287,12 +288,12 @@
this.listener = listener;
- if (listener != null && !buffer.isEmpty())
+ if (listener != null && isMessageAvailableForConsuming())
{
listenerRunning = true;
this.queueRunner(new ListenerRunner());
- }
+ }
}
}
@@ -337,18 +338,15 @@
public void close(long lastDeliveryId) throws JMSException
{
log.debug(this + " closing");
-
- //Wait for the last delivery to arrive
+
+ //Wait for the last delivery to arrive
waitForLastDelivery(lastDeliveryId);
- //Important! We set the listener to null so the next ListenerRunner won't run
- if (listener != null)
- {
- setMessageListener(null);
- }
+ //Important! We set paused to true so the next ListenerRunner won't run
+ paused = true;
//Now we wait for any current listener runners to run.
- waitForOnMessageToComplete();
+ waitForOnMessageToComplete();
synchronized (mainLock)
{
@@ -370,6 +368,32 @@
if (trace) { log.trace(this + " closed"); }
}
+
+ public void start()
+ {
+ paused = false;
+ synchronized (mainLock)
+ {
+ if (!buffer.isEmpty())
+ {
+ // Messages arrived while the consumer was paused
+ messageAdded();
+ }
+ }
+ }
+
+ public void stop()
+ {
+ log.debug(this + " stopping");
+ paused = true;
+ }
+
+ public void waitUntilStopped() throws JMSException
+ {
+ log.debug(this + " waiting until paused");
+ waitForOnMessageToComplete();
+ if (trace) { log.trace(this + " stopped"); }
+ }
/**
* Method used by the client thread to get a Message, if available.
@@ -729,27 +753,30 @@
{
boolean notified = false;
- // If we have a thread waiting on receive() we notify it
- if (receiverThread != null)
+ if (!paused)
{
- if (trace) { log.trace(this + " notifying receiver/waiter thread"); }
-
- mainLock.notifyAll();
-
- notified = true;
- }
- else if (listener != null)
- {
- // We have a message listener
- if (!listenerRunning)
+ // If we have a thread waiting on receive() we notify it
+ if (receiverThread != null)
{
- listenerRunning = true;
-
- if (trace) { log.trace(this + " scheduled a new ListenerRunner"); }
- this.queueRunner(new ListenerRunner());
- }
-
- //TODO - Execute onMessage on same thread for even better throughput
+ if (trace) { log.trace(this + " notifying receiver/waiter thread"); }
+
+ mainLock.notifyAll();
+
+ notified = true;
+ }
+ else if (listener != null)
+ {
+ // We have a message listener
+ if (!listenerRunning && !paused)
+ {
+ listenerRunning = true;
+
+ if (trace) { log.trace(this + " scheduled a new ListenerRunner"); }
+ this.queueRunner(new ListenerRunner());
+ }
+
+ //TODO - Execute onMessage on same thread for even better throughput
+ }
}
// Make sure we notify any thread waiting for last delivery
@@ -793,7 +820,7 @@
if (timeout == 0)
{
// wait for ever potentially
- while (!closed && buffer.isEmpty())
+ while (!closed && !isMessageAvailableForConsuming())
{
if (trace) { log.trace(this + " waiting on main lock, no timeout"); }
@@ -807,7 +834,7 @@
// wait with timeout
long toWait = timeout;
- while (!closed && buffer.isEmpty() && toWait > 0)
+ while (!closed && !isMessageAvailableForConsuming() && toWait > 0)
{
if (trace) { log.trace(this + " waiting on main lock, timeout " + toWait + " ms"); }
@@ -834,6 +861,14 @@
return m;
}
+ /**
+ * @return true if the ClientConsumer is not paused and has a message in its buffer.
+ */
+ private boolean isMessageAvailableForConsuming()
+ {
+ return !paused && !buffer.isEmpty();
+ }
+
// Inner classes --------------------------------------------------------------------------------
/*
@@ -872,11 +907,11 @@
synchronized (mainLock)
{
- if (listener == null || buffer.isEmpty())
+ if (listener == null || !isMessageAvailableForConsuming())
{
listenerRunning = false;
- if (trace) { log.trace("no listener or buffer is empty, returning"); }
+ if (trace) { log.trace("no listener or no message available for processing, returning"); }
return;
}
@@ -887,7 +922,7 @@
mp = (MessageProxy)buffer.removeFirst();
- if (!buffer.isEmpty())
+ if (isMessageAvailableForConsuming())
{
//Queue up the next runner to run
Modified: trunk/src/main/org/jboss/jms/client/container/ConnectionAspect.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/container/ConnectionAspect.java 2007-07-03 12:00:14 UTC (rev 2827)
+++ trunk/src/main/org/jboss/jms/client/container/ConnectionAspect.java 2007-07-03 12:12:42 UTC (rev 2828)
@@ -21,6 +21,10 @@
*/
package org.jboss.jms.client.container;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+
import javax.jms.ExceptionListener;
import javax.jms.IllegalStateException;
@@ -32,6 +36,8 @@
import org.jboss.jms.client.remoting.ConsolidatedRemotingConnectionListener;
import org.jboss.jms.client.remoting.JMSRemotingConnection;
import org.jboss.jms.client.state.ConnectionState;
+import org.jboss.jms.client.state.ConsumerState;
+import org.jboss.jms.client.state.SessionState;
import org.jboss.jms.message.MessageIdGeneratorFactory;
import org.jboss.jms.tx.ResourceManagerFactory;
@@ -145,6 +151,15 @@
ConnectionState currentState = getConnectionState(invocation);
currentState.setStarted(true);
currentState.setJustCreated(false);
+
+ // Start all consumers
+ forEachConsumer(currentState, new ConsumerAction() {
+ public void run(ClientConsumer consumer) throws Exception
+ {
+ consumer.start();
+ }
+ });
+
return invocation.invokeNext();
}
@@ -153,7 +168,27 @@
ConnectionState currentState = getConnectionState(invocation);
currentState.setStarted(false);
currentState.setJustCreated(false);
- return invocation.invokeNext();
+
+ Object ret = invocation.invokeNext();
+
+ // Stop all consumers - in two steps, first tell each one to stop,
+ // then actually wait until all have stopped.
+
+ forEachConsumer(currentState, new ConsumerAction() {
+ public void run(ClientConsumer consumer) throws Exception
+ {
+ consumer.stop();
+ }
+ });
+
+ forEachConsumer(currentState, new ConsumerAction() {
+ public void run(ClientConsumer consumer) throws Exception
+ {
+ consumer.waitUntilStopped();
+ }
+ });
+
+ return ret;
}
public Object handleCreateSessionDelegate(Invocation invocation) throws Throwable
@@ -256,7 +291,32 @@
}
return state;
}
+
+ private static void forEachConsumer(ConnectionState connectionState, ConsumerAction action) throws Exception
+ {
+ Set sessions = connectionState.getChildren();
+
+ for (Iterator sessionsIter = sessions.iterator(); sessionsIter.hasNext();)
+ {
+ SessionState sessionState = (SessionState) sessionsIter.next();
+
+ for (Iterator childrenIter = sessionState.getChildren().iterator(); childrenIter.hasNext();)
+ {
+ Object childState = childrenIter.next();
+ if (!(childState instanceof ConsumerState))
+ {
+ continue;
+ }
+ ClientConsumer consumer = ((ConsumerState) childState).getClientConsumer();
+ action.run(consumer);
+ }
+ }
+ }
-
// Inner classes -------------------------------------------------
+
+ private interface ConsumerAction
+ {
+ void run(ClientConsumer consumer) throws Exception;
+ }
}
Modified: trunk/tests/src/org/jboss/test/messaging/jms/MessageConsumerTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/MessageConsumerTest.java 2007-07-03 12:00:14 UTC (rev 2827)
+++ trunk/tests/src/org/jboss/test/messaging/jms/MessageConsumerTest.java 2007-07-03 12:12:42 UTC (rev 2828)
@@ -52,6 +52,7 @@
import org.jboss.test.messaging.tools.ServerManagement;
import EDU.oswego.cs.dl.util.concurrent.Latch;
+import EDU.oswego.cs.dl.util.concurrent.SynchronizedInt;
/**
* @author <a href="mailto:ovidiu at jboss.org">Ovidiu Feodorov</a>
@@ -2070,7 +2071,100 @@
}
}
+ public void testStopConnectionDuringOnMessage() throws Exception
+ {
+ if (log.isTraceEnabled()) log.trace("testStopConnectionWhileOnMessageIsExecuting");
+
+ final SynchronizedInt messagesReceived = new SynchronizedInt(0);
+
+ MessageListener myListener = new MessageListener() {
+ public void onMessage(Message message)
+ {
+ messagesReceived.increment();
+ try
+ {
+ Thread.sleep(100L);
+ }
+ catch (InterruptedException e)
+ {
+ // Ignore
+ }
+ }
+ };
+
+ queueConsumer.setMessageListener(myListener);
+
+ log.trace("Starting consumer connection");
+ consumerConnection.start();
+
+ final int MESSAGE_COUNT = 100;
+
+ log.trace("Sending the first batch of messages");
+ for (int i = 0; i < MESSAGE_COUNT / 2; i++)
+ {
+ queueProducer.send(producerSession.createTextMessage("Message #" + Integer.toString(i)));
+ }
+
+ Thread.sleep(500L);
+
+ log.trace("Stopping consumer connection");
+ consumerConnection.stop();
+
+ int countAfterStop = messagesReceived.get();
+ assertTrue("Should have received some messages before stopping", countAfterStop > 0);
+
+ log.trace("Sending the second batch of messages");
+ for (int i = MESSAGE_COUNT / 2; i < MESSAGE_COUNT; i++)
+ {
+ queueProducer.send(producerSession.createTextMessage("Message #" + Integer.toString(i)));
+ }
+
+ log.trace("Sleeping a bit to check that no messages are received");
+ Thread.sleep(2000);
+
+ assertEquals("Should not receive any messages after the connection has been stopped", countAfterStop, messagesReceived.get());
+
+ log.trace("Restarting consumer connection");
+ consumerConnection.start();
+
+ log.trace("Sleeping to allow remaining messages to arrive");
+ Thread.sleep(15000);
+ assertEquals("Should have received all messages after restarting", MESSAGE_COUNT, messagesReceived.get());
+ }
+
+ // Test that stop doesn't in any way break subsequent close
+ public void testCloseAfterStop() throws Exception
+ {
+ MessageListener myListener = new MessageListener() {
+ public void onMessage(Message message)
+ {
+ try
+ {
+ Thread.sleep(100);
+ }
+ catch (InterruptedException e)
+ {
+ // Ignore
+ }
+ }
+ };
+
+ queueConsumer.setMessageListener(myListener);
+
+ consumerConnection.start();
+
+ for (int i = 0; i < 100; i++)
+ {
+ queueProducer.send(producerSession.createTextMessage("Message #" + Integer.toString(i)));
+ }
+
+ Thread.sleep(500);
+ consumerConnection.stop();
+ Thread.sleep(500);
+ consumerConnection.close();
+ }
+
//
// Multiple consumers
//
More information about the jboss-cvs-commits
mailing list