[jboss-cvs] JBoss Messaging SVN: r3261 - in trunk: src/main/org/jboss/jms/exception and 2 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Tue Oct 30 00:25:11 EDT 2007


Author: clebert.suconic at jboss.com
Date: 2007-10-30 00:25:11 -0400 (Tue, 30 Oct 2007)
New Revision: 3261

Added:
   trunk/src/main/org/jboss/jms/exception/MessagingShutdownException.java
Modified:
   trunk/src/main/org/jboss/jms/client/container/ConsumerAspect.java
   trunk/src/main/org/jboss/jms/server/remoting/JMSServerInvocationHandler.java
   trunk/tests/src/org/jboss/test/messaging/jms/ConnectionConsumerTest.java
Log:
http://jira.jboss.com/jira/browse/JBMESSAGING-1099 - fix

Modified: trunk/src/main/org/jboss/jms/client/container/ConsumerAspect.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/container/ConsumerAspect.java	2007-10-29 22:02:00 UTC (rev 3260)
+++ trunk/src/main/org/jboss/jms/client/container/ConsumerAspect.java	2007-10-30 04:25:11 UTC (rev 3261)
@@ -21,8 +21,7 @@
   */
 package org.jboss.jms.client.container;
 
-import javax.jms.MessageListener;
-
+import EDU.oswego.cs.dl.util.concurrent.QueuedExecutor;
 import org.jboss.aop.joinpoint.Invocation;
 import org.jboss.aop.joinpoint.MethodInvocation;
 import org.jboss.jms.client.delegate.DelegateSupport;
@@ -33,10 +32,11 @@
 import org.jboss.jms.delegate.ConnectionDelegate;
 import org.jboss.jms.delegate.ConsumerDelegate;
 import org.jboss.jms.delegate.SessionDelegate;
+import org.jboss.jms.exception.MessagingShutdownException;
 import org.jboss.logging.Logger;
 import org.jboss.messaging.util.MessageQueueNameHelper;
 
-import EDU.oswego.cs.dl.util.concurrent.QueuedExecutor;
+import javax.jms.MessageListener;
 
 /**
  * 
@@ -127,36 +127,58 @@
 
       return consumerDelegate;
    }
-   
+
    public Object handleClosing(Invocation invocation) throws Throwable
    {
       ConsumerState consumerState = getState(invocation);
-      
-      // We make sure closing is called on the ServerConsumerEndpoint.
-      // This returns us the last delivery id sent
+      try
+      {
 
-      Long l  = (Long)invocation.invokeNext();
+         // We make sure closing is called on the ServerConsumerEndpoint.
+         // This returns us the last delivery id sent
 
-      long lastDeliveryId = l.longValue();
+         Long l = (Long) invocation.invokeNext();
 
-      // First we call close on the ClientConsumer which waits for onMessage invocations
-      // to complete and the last delivery to arrive
-      consumerState.getClientConsumer().close(lastDeliveryId);
+         long lastDeliveryId = l.longValue();
 
-      SessionState sessionState = (SessionState)consumerState.getParent();
-      ConnectionState connectionState = (ConnectionState)sessionState.getParent();
+         // First we call close on the ClientConsumer which waits for onMessage invocations
+         // to complete and the last delivery to arrive
+         consumerState.getClientConsumer().close(lastDeliveryId);
 
-      sessionState.removeCallbackHandler(consumerState.getClientConsumer());
+         SessionState sessionState = (SessionState) consumerState.getParent();
+         ConnectionState connectionState = (ConnectionState) sessionState.getParent();
 
-      CallbackManager cm = connectionState.getRemotingConnection().getCallbackManager();
-      cm.unregisterHandler(consumerState.getConsumerID());
+         sessionState.removeCallbackHandler(consumerState.getClientConsumer());
 
-      //And then we cancel any messages still in the message callback handler buffer
-      consumerState.getClientConsumer().cancelBuffer();
+         CallbackManager cm = connectionState.getRemotingConnection().getCallbackManager();
+         cm.unregisterHandler(consumerState.getConsumerID());
 
-      return l;
+         //And then we cancel any messages still in the message callback handler buffer
+         consumerState.getClientConsumer().cancelBuffer();
+
+         return l;
+
+      }
+      catch (Exception proxiedException)
+      {
+         ConnectionState connectionState = (ConnectionState) (consumerState.getParent().getParent());
+         // If the ServerPeer has been shut down, or there is no failover in place,
+         // we close the client-side consumer ourselves (if still open) before rethrowing.
+         if (proxiedException instanceof MessagingShutdownException ||
+                 (connectionState.getFailoverCommandCenter() == null))
+
+
+         {
+            if (!consumerState.getClientConsumer().isClosed())
+            {
+               consumerState.getClientConsumer().close(-1);
+            }
+         }
+         throw proxiedException;
+      }
+
    }
-   
+
    public Object handleReceive(Invocation invocation) throws Throwable
    {
       MethodInvocation mi = (MethodInvocation)invocation;

Added: trunk/src/main/org/jboss/jms/exception/MessagingShutdownException.java
===================================================================
--- trunk/src/main/org/jboss/jms/exception/MessagingShutdownException.java	                        (rev 0)
+++ trunk/src/main/org/jboss/jms/exception/MessagingShutdownException.java	2007-10-30 04:25:11 UTC (rev 3261)
@@ -0,0 +1,60 @@
+/*
+   * JBoss, Home of Professional Open Source
+   * Copyright 2005, JBoss Inc., and individual contributors as indicated
+   * by the @authors tag. See the copyright.txt in the distribution for a
+   * full listing of individual contributors.
+   *
+   * This is free software; you can redistribute it and/or modify it
+   * under the terms of the GNU Lesser General Public License as
+   * published by the Free Software Foundation; either version 2.1 of
+   * the License, or (at your option) any later version.
+   *
+   * This software is distributed in the hope that it will be useful,
+   * but WITHOUT ANY WARRANTY; without even the implied warranty of
+   * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+   * Lesser General Public License for more details.
+   *
+   * You should have received a copy of the GNU Lesser General Public
+   * License along with this software; if not, write to the Free
+   * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+   * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+   */
+
+package org.jboss.jms.exception;
+
+/**
+ * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ * @version <tt>$Revision$</tt>
+ *          $Id$
+ */
+public class MessagingShutdownException extends MessagingJMSException
+{
+   private static final long serialVersionUID = -2234413113067993577L;
+
+   // Constructors ---------------------------------------------------------------------------------
+
+   public MessagingShutdownException(String reason)
+   {
+      super(reason);
+   }
+
+   public MessagingShutdownException(Throwable cause)
+   {
+      super(cause);
+   }
+
+   public MessagingShutdownException(String reason, String errorCode)
+   {
+      super(reason, errorCode);
+   }
+
+   public MessagingShutdownException(String reason, Throwable cause)
+   {
+      super(reason, cause);
+   }
+
+   public MessagingShutdownException(String reason, String errorCode, Throwable cause)
+   {
+      super(reason, errorCode, cause);
+   }
+}


Property changes on: trunk/src/main/org/jboss/jms/exception/MessagingShutdownException.java
___________________________________________________________________
Name: svn:keywords
   + Id LastChangedDate Author Revision

Modified: trunk/src/main/org/jboss/jms/server/remoting/JMSServerInvocationHandler.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/remoting/JMSServerInvocationHandler.java	2007-10-29 22:02:00 UTC (rev 3260)
+++ trunk/src/main/org/jboss/jms/server/remoting/JMSServerInvocationHandler.java	2007-10-30 04:25:11 UTC (rev 3261)
@@ -28,8 +28,7 @@
 
 import javax.management.MBeanServer;
 
-import org.jboss.jms.exception.MessagingJMSException;
-import org.jboss.jms.wireformat.ConnectionFactoryCreateConnectionDelegateRequest;
+import org.jboss.jms.exception.MessagingShutdownException;
 import org.jboss.jms.wireformat.RequestSupport;
 import org.jboss.jms.wireformat.CallbackRequestSupport;
 import org.jboss.logging.Logger;
@@ -131,7 +130,7 @@
       {	              
          if (closed)
          {
-            throw new MessagingJMSException("Cannot handle invocation since messaging server is not active (it is either starting up or shutting down)");
+            throw new MessagingShutdownException("Cannot handle invocation since messaging server is not active (it is either starting up or shutting down)");
          }
            
          RequestSupport request = (RequestSupport)invocation.getParameter();
@@ -232,7 +231,7 @@
       }
       if (callbackHandler != null)
       {
-         log.debug("found calllback handler for remoting session " + Util.guidToString(remotingSessionId));
+         log.debug("found calllback handler for remoting session " + Util.guidToString(remotingSessionId) + " UID=" + remotingSessionId);
 
          cReq.setCallbackHandler(callbackHandler);
       }

Modified: trunk/tests/src/org/jboss/test/messaging/jms/ConnectionConsumerTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/ConnectionConsumerTest.java	2007-10-29 22:02:00 UTC (rev 3260)
+++ trunk/tests/src/org/jboss/test/messaging/jms/ConnectionConsumerTest.java	2007-10-30 04:25:11 UTC (rev 3261)
@@ -314,41 +314,40 @@
       }
    }
 
-//   Commented out until http://jira.jboss.com/jira/browse/JBMESSAGING-1099 is complete   
-//   public void testStopWhileProcessing() throws Exception
-//   {
-//      if (ServerManagement.isRemote()) return;
-//
-//
-//      Connection connConsumer = null;
-//
-//      try
-//      {
-//         connConsumer = cf.createConnection();
-//
-//         connConsumer.start();
-//
-//         Session sessCons = connConsumer.createSession(false, Session.AUTO_ACKNOWLEDGE);
-//
-//         SimpleMessageListener listener = new SimpleMessageListener(0);
-//
-//         sessCons.setMessageListener(listener);
-//
-//         ServerSessionPool pool = new MockServerSessionPool(sessCons);
-//
-//         JBossConnectionConsumer cc = (JBossConnectionConsumer)connConsumer.createConnectionConsumer(queue1, null, pool, 1);
-//
-//         ServerManagement.stop();
-//         connConsumer.close();
-//         connConsumer = null;
-//      }
-//      finally
-//      {
-//         if (connConsumer != null) connConsumer.close();
-//      }
-//   }
+   public void testStopWhileProcessing() throws Exception
+   {
+      if (ServerManagement.isRemote()) return;
 
 
+      Connection connConsumer = null;
+
+      try
+      {
+         connConsumer = cf.createConnection();
+
+         connConsumer.start();
+
+         Session sessCons = connConsumer.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+         SimpleMessageListener listener = new SimpleMessageListener(0);
+
+         sessCons.setMessageListener(listener);
+
+         ServerSessionPool pool = new MockServerSessionPool(sessCons);
+
+         JBossConnectionConsumer cc = (JBossConnectionConsumer)connConsumer.createConnectionConsumer(queue1, null, pool, 1);
+
+         ServerManagement.stop();
+         connConsumer.close();
+         connConsumer = null;
+      }
+      finally
+      {
+         if (connConsumer != null) connConsumer.close();
+      }
+   }
+
+
    class SimpleMessageListener implements MessageListener
    {
       Latch latch = new Latch();




More information about the jboss-cvs-commits mailing list