[jboss-cvs] JBoss Messaging SVN: r3210 - in trunk: tests/src/org/jboss/test/messaging/jms and 1 other directory.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Fri Oct 19 13:21:09 EDT 2007
Author: clebert.suconic at jboss.com
Date: 2007-10-19 13:21:09 -0400 (Fri, 19 Oct 2007)
New Revision: 3210
Modified:
trunk/src/main/org/jboss/jms/client/container/ClientConsumer.java
trunk/src/main/org/jboss/jms/client/container/ConsumerAspect.java
trunk/tests/src/org/jboss/test/messaging/jms/ConnectionConsumerTest.java
Log:
JBMESSAGING-1099 - Closing ClientConsumer correctly if ConsumerAspect.handleClosing fails
Modified: trunk/src/main/org/jboss/jms/client/container/ClientConsumer.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/container/ClientConsumer.java 2007-10-19 16:51:31 UTC (rev 3209)
+++ trunk/src/main/org/jboss/jms/client/container/ClientConsumer.java 2007-10-19 17:21:09 UTC (rev 3210)
@@ -249,7 +249,13 @@
}
// Public ---------------------------------------------------------------------------------------
-
+
+
+ public boolean isClosed()
+ {
+ return closed;
+ }
+
/**
* Handles a message sent from the server.
*
Modified: trunk/src/main/org/jboss/jms/client/container/ConsumerAspect.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/container/ConsumerAspect.java 2007-10-19 16:51:31 UTC (rev 3209)
+++ trunk/src/main/org/jboss/jms/client/container/ConsumerAspect.java 2007-10-19 17:21:09 UTC (rev 3210)
@@ -125,33 +125,46 @@
}
public Object handleClosing(Invocation invocation) throws Throwable
- {
+ {
ConsumerState consumerState = getState(invocation);
-
- // We make sure closing is called on the ServerConsumerEndpoint.
- // This returns us the last delivery id sent
-
- Long l = (Long)invocation.invokeNext();
-
- long lastDeliveryId = l.longValue();
-
- // First we call close on the ClientConsumer which waits for onMessage invocations
- // to complete and the last delivery to arrive
- consumerState.getClientConsumer().close(lastDeliveryId);
-
- SessionState sessionState = (SessionState)consumerState.getParent();
- ConnectionState connectionState = (ConnectionState)sessionState.getParent();
-
- sessionState.removeCallbackHandler(consumerState.getClientConsumer());
+ try
+ {
- CallbackManager cm = connectionState.getRemotingConnection().getCallbackManager();
- cm.unregisterHandler(consumerState.getConsumerID());
-
- //And then we cancel any messages still in the message callback handler buffer
- consumerState.getClientConsumer().cancelBuffer();
-
- return l;
- }
+ // We make sure closing is called on the ServerConsumerEndpoint.
+ // This returns us the last delivery id sent
+
+ Long l = (Long)invocation.invokeNext();
+
+ long lastDeliveryId = l.longValue();
+
+ // First we call close on the ClientConsumer which waits for onMessage invocations
+ // to complete and the last delivery to arrive
+ consumerState.getClientConsumer().close(lastDeliveryId);
+
+ SessionState sessionState = (SessionState)consumerState.getParent();
+ ConnectionState connectionState = (ConnectionState)sessionState.getParent();
+
+ sessionState.removeCallbackHandler(consumerState.getClientConsumer());
+
+ CallbackManager cm = connectionState.getRemotingConnection().getCallbackManager();
+ cm.unregisterHandler(consumerState.getConsumerID());
+
+ //And then we cancel any messages still in the message callback handler buffer
+ consumerState.getClientConsumer().cancelBuffer();
+
+ return l;
+ }
+ finally
+ {
+ // If this method fails before the call to clientConsumer.close,
+ // we need to ensure the method will close the consumer, otherwise the server
+ // would hang during a shutdown
+ if (!consumerState.getClientConsumer().isClosed())
+ {
+ consumerState.getClientConsumer().close(-1);
+ }
+ }
+ }
public Object handleReceive(Invocation invocation) throws Throwable
{
Modified: trunk/tests/src/org/jboss/test/messaging/jms/ConnectionConsumerTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/ConnectionConsumerTest.java 2007-10-19 16:51:31 UTC (rev 3209)
+++ trunk/tests/src/org/jboss/test/messaging/jms/ConnectionConsumerTest.java 2007-10-19 17:21:09 UTC (rev 3210)
@@ -309,12 +309,45 @@
{
if (connConsumer != null) connConsumer.close();
if (connConsumer != null) connProducer.close();
-
+
removeAllMessages(queue1.getQueueName(), true, 0);
}
}
-
+ public void testStopWhileProcessing() throws Exception
+ {
+ if (ServerManagement.isRemote()) return;
+
+
+ Connection connConsumer = null;
+
+ try
+ {
+ connConsumer = cf.createConnection();
+
+ connConsumer.start();
+
+ Session sessCons = connConsumer.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ SimpleMessageListener listener = new SimpleMessageListener(0);
+
+ sessCons.setMessageListener(listener);
+
+ ServerSessionPool pool = new MockServerSessionPool(sessCons);
+
+ JBossConnectionConsumer cc = (JBossConnectionConsumer)connConsumer.createConnectionConsumer(queue1, null, pool, 1);
+
+ ServerManagement.stop();
+ connConsumer.close();
+ connConsumer = null;
+ }
+ finally
+ {
+ if (connConsumer != null) connConsumer.close();
+ }
+ }
+
+
class SimpleMessageListener implements MessageListener
{
Latch latch = new Latch();
More information about the jboss-cvs-commits mailing list