[jboss-cvs] JBoss Messaging SVN: r3261 - in trunk: src/main/org/jboss/jms/exception and 2 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Tue Oct 30 00:25:11 EDT 2007
Author: clebert.suconic at jboss.com
Date: 2007-10-30 00:25:11 -0400 (Tue, 30 Oct 2007)
New Revision: 3261
Added:
trunk/src/main/org/jboss/jms/exception/MessagingShutdownException.java
Modified:
trunk/src/main/org/jboss/jms/client/container/ConsumerAspect.java
trunk/src/main/org/jboss/jms/server/remoting/JMSServerInvocationHandler.java
trunk/tests/src/org/jboss/test/messaging/jms/ConnectionConsumerTest.java
Log:
http://jira.jboss.com/jira/browse/JBMESSAGING-1099 - fix
Modified: trunk/src/main/org/jboss/jms/client/container/ConsumerAspect.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/container/ConsumerAspect.java 2007-10-29 22:02:00 UTC (rev 3260)
+++ trunk/src/main/org/jboss/jms/client/container/ConsumerAspect.java 2007-10-30 04:25:11 UTC (rev 3261)
@@ -21,8 +21,7 @@
*/
package org.jboss.jms.client.container;
-import javax.jms.MessageListener;
-
+import EDU.oswego.cs.dl.util.concurrent.QueuedExecutor;
import org.jboss.aop.joinpoint.Invocation;
import org.jboss.aop.joinpoint.MethodInvocation;
import org.jboss.jms.client.delegate.DelegateSupport;
@@ -33,10 +32,11 @@
import org.jboss.jms.delegate.ConnectionDelegate;
import org.jboss.jms.delegate.ConsumerDelegate;
import org.jboss.jms.delegate.SessionDelegate;
+import org.jboss.jms.exception.MessagingShutdownException;
import org.jboss.logging.Logger;
import org.jboss.messaging.util.MessageQueueNameHelper;
-import EDU.oswego.cs.dl.util.concurrent.QueuedExecutor;
+import javax.jms.MessageListener;
/**
*
@@ -127,36 +127,58 @@
return consumerDelegate;
}
-
+
public Object handleClosing(Invocation invocation) throws Throwable
{
ConsumerState consumerState = getState(invocation);
-
- // We make sure closing is called on the ServerConsumerEndpoint.
- // This returns us the last delivery id sent
+ try
+ {
- Long l = (Long)invocation.invokeNext();
+ // We make sure closing is called on the ServerConsumerEndpoint.
+ // This returns us the last delivery id sent
- long lastDeliveryId = l.longValue();
+ Long l = (Long) invocation.invokeNext();
- // First we call close on the ClientConsumer which waits for onMessage invocations
- // to complete and the last delivery to arrive
- consumerState.getClientConsumer().close(lastDeliveryId);
+ long lastDeliveryId = l.longValue();
- SessionState sessionState = (SessionState)consumerState.getParent();
- ConnectionState connectionState = (ConnectionState)sessionState.getParent();
+ // First we call close on the ClientConsumer which waits for onMessage invocations
+ // to complete and the last delivery to arrive
+ consumerState.getClientConsumer().close(lastDeliveryId);
- sessionState.removeCallbackHandler(consumerState.getClientConsumer());
+ SessionState sessionState = (SessionState) consumerState.getParent();
+ ConnectionState connectionState = (ConnectionState) sessionState.getParent();
- CallbackManager cm = connectionState.getRemotingConnection().getCallbackManager();
- cm.unregisterHandler(consumerState.getConsumerID());
+ sessionState.removeCallbackHandler(consumerState.getClientConsumer());
- //And then we cancel any messages still in the message callback handler buffer
- consumerState.getClientConsumer().cancelBuffer();
+ CallbackManager cm = connectionState.getRemotingConnection().getCallbackManager();
+ cm.unregisterHandler(consumerState.getConsumerID());
- return l;
+ //And then we cancel any messages still in the message callback handler buffer
+ consumerState.getClientConsumer().cancelBuffer();
+
+ return l;
+
+ }
+ catch (Exception proxiedException)
+ {
+ ConnectionState connectionState = (ConnectionState) (consumerState.getParent().getParent());
+ // if ServerPeer is shutdown or
+ // if there is no failover in place... we just close the consumerState as well
+ if (proxiedException instanceof MessagingShutdownException ||
+ (connectionState.getFailoverCommandCenter() == null))
+
+
+ {
+ if (!consumerState.getClientConsumer().isClosed())
+ {
+ consumerState.getClientConsumer().close(-1);
+ }
+ }
+ throw proxiedException;
+ }
+
}
-
+
public Object handleReceive(Invocation invocation) throws Throwable
{
MethodInvocation mi = (MethodInvocation)invocation;
Added: trunk/src/main/org/jboss/jms/exception/MessagingShutdownException.java
===================================================================
--- trunk/src/main/org/jboss/jms/exception/MessagingShutdownException.java (rev 0)
+++ trunk/src/main/org/jboss/jms/exception/MessagingShutdownException.java 2007-10-30 04:25:11 UTC (rev 3261)
@@ -0,0 +1,60 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.jms.exception;
+
+/**
+ * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ * @version <tt>$Revision$</tt>
+ * $Id$
+ */
+public class MessagingShutdownException extends MessagingJMSException
+{
+ private static final long serialVersionUID = -2234413113067993577L;
+
+ // Constructors ---------------------------------------------------------------------------------
+
+ public MessagingShutdownException(String reason)
+ {
+ super(reason);
+ }
+
+ public MessagingShutdownException(Throwable cause)
+ {
+ super(cause);
+ }
+
+ public MessagingShutdownException(String reason, String errorCode)
+ {
+ super(reason, errorCode);
+ }
+
+ public MessagingShutdownException(String reason, Throwable cause)
+ {
+ super(reason, cause);
+ }
+
+ public MessagingShutdownException(String reason, String errorCode, Throwable cause)
+ {
+ super(reason, errorCode, cause);
+ }
+}
Property changes on: trunk/src/main/org/jboss/jms/exception/MessagingShutdownException.java
___________________________________________________________________
Name: svn:keywords
+ Id LastChangedDate Author Revision
Modified: trunk/src/main/org/jboss/jms/server/remoting/JMSServerInvocationHandler.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/remoting/JMSServerInvocationHandler.java 2007-10-29 22:02:00 UTC (rev 3260)
+++ trunk/src/main/org/jboss/jms/server/remoting/JMSServerInvocationHandler.java 2007-10-30 04:25:11 UTC (rev 3261)
@@ -28,8 +28,7 @@
import javax.management.MBeanServer;
-import org.jboss.jms.exception.MessagingJMSException;
-import org.jboss.jms.wireformat.ConnectionFactoryCreateConnectionDelegateRequest;
+import org.jboss.jms.exception.MessagingShutdownException;
import org.jboss.jms.wireformat.RequestSupport;
import org.jboss.jms.wireformat.CallbackRequestSupport;
import org.jboss.logging.Logger;
@@ -131,7 +130,7 @@
{
if (closed)
{
- throw new MessagingJMSException("Cannot handle invocation since messaging server is not active (it is either starting up or shutting down)");
+ throw new MessagingShutdownException("Cannot handle invocation since messaging server is not active (it is either starting up or shutting down)");
}
RequestSupport request = (RequestSupport)invocation.getParameter();
@@ -232,7 +231,7 @@
}
if (callbackHandler != null)
{
- log.debug("found calllback handler for remoting session " + Util.guidToString(remotingSessionId));
+ log.debug("found calllback handler for remoting session " + Util.guidToString(remotingSessionId) + " UID=" + remotingSessionId);
cReq.setCallbackHandler(callbackHandler);
}
Modified: trunk/tests/src/org/jboss/test/messaging/jms/ConnectionConsumerTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/ConnectionConsumerTest.java 2007-10-29 22:02:00 UTC (rev 3260)
+++ trunk/tests/src/org/jboss/test/messaging/jms/ConnectionConsumerTest.java 2007-10-30 04:25:11 UTC (rev 3261)
@@ -314,41 +314,40 @@
}
}
-// Commented out until http://jira.jboss.com/jira/browse/JBMESSAGING-1099 is complete
-// public void testStopWhileProcessing() throws Exception
-// {
-// if (ServerManagement.isRemote()) return;
-//
-//
-// Connection connConsumer = null;
-//
-// try
-// {
-// connConsumer = cf.createConnection();
-//
-// connConsumer.start();
-//
-// Session sessCons = connConsumer.createSession(false, Session.AUTO_ACKNOWLEDGE);
-//
-// SimpleMessageListener listener = new SimpleMessageListener(0);
-//
-// sessCons.setMessageListener(listener);
-//
-// ServerSessionPool pool = new MockServerSessionPool(sessCons);
-//
-// JBossConnectionConsumer cc = (JBossConnectionConsumer)connConsumer.createConnectionConsumer(queue1, null, pool, 1);
-//
-// ServerManagement.stop();
-// connConsumer.close();
-// connConsumer = null;
-// }
-// finally
-// {
-// if (connConsumer != null) connConsumer.close();
-// }
-// }
+ public void testStopWhileProcessing() throws Exception
+ {
+ if (ServerManagement.isRemote()) return;
+ Connection connConsumer = null;
+
+ try
+ {
+ connConsumer = cf.createConnection();
+
+ connConsumer.start();
+
+ Session sessCons = connConsumer.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ SimpleMessageListener listener = new SimpleMessageListener(0);
+
+ sessCons.setMessageListener(listener);
+
+ ServerSessionPool pool = new MockServerSessionPool(sessCons);
+
+ JBossConnectionConsumer cc = (JBossConnectionConsumer)connConsumer.createConnectionConsumer(queue1, null, pool, 1);
+
+ ServerManagement.stop();
+ connConsumer.close();
+ connConsumer = null;
+ }
+ finally
+ {
+ if (connConsumer != null) connConsumer.close();
+ }
+ }
+
+
class SimpleMessageListener implements MessageListener
{
Latch latch = new Latch();
More information about the jboss-cvs-commits
mailing list