[jboss-cvs] JBoss Messaging SVN: r3210 - in trunk: tests/src/org/jboss/test/messaging/jms and 1 other directory.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Fri Oct 19 13:21:09 EDT 2007


Author: clebert.suconic at jboss.com
Date: 2007-10-19 13:21:09 -0400 (Fri, 19 Oct 2007)
New Revision: 3210

Modified:
   trunk/src/main/org/jboss/jms/client/container/ClientConsumer.java
   trunk/src/main/org/jboss/jms/client/container/ConsumerAspect.java
   trunk/tests/src/org/jboss/test/messaging/jms/ConnectionConsumerTest.java
Log:
JBMESSAGING-1099 - Closing ClientConsumer correctly if ConsumerAspect.closing fails

Modified: trunk/src/main/org/jboss/jms/client/container/ClientConsumer.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/container/ClientConsumer.java	2007-10-19 16:51:31 UTC (rev 3209)
+++ trunk/src/main/org/jboss/jms/client/container/ClientConsumer.java	2007-10-19 17:21:09 UTC (rev 3210)
@@ -249,7 +249,13 @@
    }
         
    // Public ---------------------------------------------------------------------------------------
-  
+
+
+   public boolean isClosed()
+   {
+      return closed;
+   }
+
    /**
     * Handles a message sent from the server.
     *

Modified: trunk/src/main/org/jboss/jms/client/container/ConsumerAspect.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/container/ConsumerAspect.java	2007-10-19 16:51:31 UTC (rev 3209)
+++ trunk/src/main/org/jboss/jms/client/container/ConsumerAspect.java	2007-10-19 17:21:09 UTC (rev 3210)
@@ -125,33 +125,46 @@
    }
    
    public Object handleClosing(Invocation invocation) throws Throwable
-   {      
+   {
       ConsumerState consumerState = getState(invocation);
-                       
-      // We make sure closing is called on the ServerConsumerEndpoint.
-      // This returns us the last delivery id sent
-      
-      Long l  = (Long)invocation.invokeNext();
-      
-      long lastDeliveryId = l.longValue();
-      
-      // First we call close on the ClientConsumer which waits for onMessage invocations      
-      // to complete and the last delivery to arrive
-      consumerState.getClientConsumer().close(lastDeliveryId);
-                
-      SessionState sessionState = (SessionState)consumerState.getParent();
-      ConnectionState connectionState = (ConnectionState)sessionState.getParent();
-                 
-      sessionState.removeCallbackHandler(consumerState.getClientConsumer());
+      try
+      {
 
-      CallbackManager cm = connectionState.getRemotingConnection().getCallbackManager();
-      cm.unregisterHandler(consumerState.getConsumerID());
-         
-      //And then we cancel any messages still in the message callback handler buffer     
-      consumerState.getClientConsumer().cancelBuffer();
-                                   
-      return l;
-   }      
+         // We make sure closing is called on the ServerConsumerEndpoint.
+         // This returns us the last delivery id sent
+
+         Long l  = (Long)invocation.invokeNext();
+
+         long lastDeliveryId = l.longValue();
+
+         // First we call close on the ClientConsumer which waits for onMessage invocations
+         // to complete and the last delivery to arrive
+         consumerState.getClientConsumer().close(lastDeliveryId);
+
+         SessionState sessionState = (SessionState)consumerState.getParent();
+         ConnectionState connectionState = (ConnectionState)sessionState.getParent();
+
+         sessionState.removeCallbackHandler(consumerState.getClientConsumer());
+
+         CallbackManager cm = connectionState.getRemotingConnection().getCallbackManager();
+         cm.unregisterHandler(consumerState.getConsumerID());
+
+         //And then we cancel any messages still in the message callback handler buffer
+         consumerState.getClientConsumer().cancelBuffer();
+
+         return l;
+      }
+      finally
+      {
+         // If this method fails before the call to clientConsumer.close,
+         // we need to ensure the method will close the consumer, otherwise the server
+         // would hang during a shutdown
+         if (!consumerState.getClientConsumer().isClosed())
+         {
+            consumerState.getClientConsumer().close(-1);
+         }
+      }
+   }
    
    public Object handleReceive(Invocation invocation) throws Throwable
    {

Modified: trunk/tests/src/org/jboss/test/messaging/jms/ConnectionConsumerTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/ConnectionConsumerTest.java	2007-10-19 16:51:31 UTC (rev 3209)
+++ trunk/tests/src/org/jboss/test/messaging/jms/ConnectionConsumerTest.java	2007-10-19 17:21:09 UTC (rev 3210)
@@ -309,12 +309,45 @@
       {
          if (connConsumer != null) connConsumer.close();
          if (connConsumer != null) connProducer.close();
-         
+
          removeAllMessages(queue1.getQueueName(), true, 0);
       }
    }
 
-   
+   public void testStopWhileProcessing() throws Exception
+   {
+      if (ServerManagement.isRemote()) return;
+
+
+      Connection connConsumer = null;
+
+      try
+      {
+         connConsumer = cf.createConnection();
+
+         connConsumer.start();
+
+         Session sessCons = connConsumer.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+         SimpleMessageListener listener = new SimpleMessageListener(0);
+
+         sessCons.setMessageListener(listener);
+
+         ServerSessionPool pool = new MockServerSessionPool(sessCons);
+
+         JBossConnectionConsumer cc = (JBossConnectionConsumer)connConsumer.createConnectionConsumer(queue1, null, pool, 1);
+
+         ServerManagement.stop();
+         connConsumer.close();
+         connConsumer = null;
+      }
+      finally
+      {
+         if (connConsumer != null) connConsumer.close();
+      }
+   }
+
+
    class SimpleMessageListener implements MessageListener
    {
       Latch latch = new Latch();




More information about the jboss-cvs-commits mailing list