[jboss-cvs] JBoss Messaging SVN: r2550 - in trunk/src/main/org/jboss/jms: client/container and 4 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Thu Mar 15 19:18:19 EDT 2007
Author: timfox
Date: 2007-03-15 19:18:18 -0400 (Thu, 15 Mar 2007)
New Revision: 2550
Modified:
trunk/src/main/org/jboss/jms/client/FailoverCommandCenter.java
trunk/src/main/org/jboss/jms/client/container/ClusteringAspect.java
trunk/src/main/org/jboss/jms/client/container/ConnectionFailureListener.java
trunk/src/main/org/jboss/jms/client/remoting/ConsolidatedRemotingConnectionListener.java
trunk/src/main/org/jboss/jms/client/remoting/JMSRemotingConnection.java
trunk/src/main/org/jboss/jms/server/ServerPeer.java
trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java
trunk/src/main/org/jboss/jms/server/remoting/JMSServerInvocationHandler.java
Log:
Partial fix for http://jira.jboss.org/jira/browse/JBMESSAGING-886 and fix for http://jira.jboss.org/jira/browse/JBMESSAGING-920
Modified: trunk/src/main/org/jboss/jms/client/FailoverCommandCenter.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/FailoverCommandCenter.java 2007-03-15 22:34:09 UTC (rev 2549)
+++ trunk/src/main/org/jboss/jms/client/FailoverCommandCenter.java 2007-03-15 23:18:18 UTC (rev 2550)
@@ -63,9 +63,12 @@
/**
* Method called by failure detection components (FailoverValveInterceptors and
* ConnectionListeners) when they have reasons to believe that a server failure occurred.
+ *
+ * Returns true if the failover command centre handled the exception gracefully and failover completed
+ * or false if it didn't and failover did not occur
*/
- public void failureDetected(Throwable reason, FailureDetector source,
- JMSRemotingConnection remotingConnection)
+ public boolean failureDetected(Throwable reason, FailureDetector source,
+ JMSRemotingConnection remotingConnection)
throws Exception
{
log.debug("failure detected by " + source);
@@ -94,7 +97,9 @@
{
log.debug(this + " ignoring failure detection notification, as failover was " +
"already (or is in process of being) performed on this connection");
- return;
+
+ //Return true since failover already completed ok
+ return true;
}
remotingConnection.setFailed();
@@ -118,9 +123,7 @@
if (res == null)
{
- // No failover attempt was detected on the server side; this might happen if the
- // client side network fails temporarily so the client connection breaks but the
- // server cluster is still up and running - in this case we don't perform failover.
+ // Failover did not occur
failoverSuccessful = false;
}
else
@@ -144,6 +147,8 @@
failoverSuccessful = true;
}
+
+ return failoverSuccessful;
}
catch (Exception e)
{
Modified: trunk/src/main/org/jboss/jms/client/container/ClusteringAspect.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/container/ClusteringAspect.java 2007-03-15 22:34:09 UTC (rev 2549)
+++ trunk/src/main/org/jboss/jms/client/container/ClusteringAspect.java 2007-03-15 23:18:18 UTC (rev 2550)
@@ -99,7 +99,7 @@
while (attemptCount < MAX_RECONNECT_HOP_COUNT)
{
- // since an exceptiong might be captured during an attempt, this has to be the first
+ // since an exception might be captured during an attempt, this has to be the first
// operation
attemptCount++;
try
@@ -142,7 +142,7 @@
// add a connection listener to detect failure; the consolidated remoting connection
// listener must be already in place and configured
state.getRemotingConnection().getConnectionListener().
- addDelegateListener(new ConnectionFailureListener(fcc, state.getRemotingConnection()));
+ setDelegateListener(new ConnectionFailureListener(fcc, state.getRemotingConnection()));
log.debug(this + " installed failure listener on " + cd);
Modified: trunk/src/main/org/jboss/jms/client/container/ConnectionFailureListener.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/container/ConnectionFailureListener.java 2007-03-15 22:34:09 UTC (rev 2549)
+++ trunk/src/main/org/jboss/jms/client/container/ConnectionFailureListener.java 2007-03-15 23:18:18 UTC (rev 2550)
@@ -6,23 +6,23 @@
*/
package org.jboss.jms.client.container;
-import org.jboss.remoting.ConnectionListener;
-import org.jboss.remoting.Client;
-import org.jboss.logging.Logger;
import org.jboss.jms.client.FailoverCommandCenter;
import org.jboss.jms.client.FailureDetector;
import org.jboss.jms.client.remoting.JMSRemotingConnection;
+import org.jboss.logging.Logger;
+import org.jboss.remoting.Client;
/**
* The listener that detects a connection failure and initiates the failover process. Each physical
* connection created under the supervision of ClusteredAspect has one of these.
*
* @author <a href="mailto:ovidiu at jboss.org">Ovidiu Feodorov</a>
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
* @version <tt>$Revision$</tt>
*
* $Id$
*/
-public class ConnectionFailureListener implements ConnectionListener, FailureDetector
+public class ConnectionFailureListener implements FailureDetector
{
// Constants ------------------------------------------------------------------------------------
@@ -48,18 +48,24 @@
// ConnectionListener implementation ------------------------------------------------------------
- public void handleConnectionException(Throwable throwable, Client client)
+ /*
+ * Returns true if failover handled the exception gracefully
+ * Returns false if failover was unable to handle the exception and it should be passed
+ * on to any JMS exception listener
+ */
+ public boolean handleConnectionException(Throwable throwable, Client client)
{
try
{
log.debug(this + " is being notified of connection failure: " + throwable);
- fcc.failureDetected(throwable, this, remotingConnection);
-
+ return fcc.failureDetected(throwable, this, remotingConnection);
}
catch (Throwable e)
{
log.error("Caught exception in handling failure", e);
+
+ return false;
}
}
Modified: trunk/src/main/org/jboss/jms/client/remoting/ConsolidatedRemotingConnectionListener.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/remoting/ConsolidatedRemotingConnectionListener.java 2007-03-15 22:34:09 UTC (rev 2549)
+++ trunk/src/main/org/jboss/jms/client/remoting/ConsolidatedRemotingConnectionListener.java 2007-03-15 23:18:18 UTC (rev 2550)
@@ -6,17 +6,15 @@
*/
package org.jboss.jms.client.remoting;
-import org.jboss.remoting.ConnectionListener;
-import org.jboss.remoting.Client;
-import org.jboss.logging.Logger;
-import org.jboss.jms.client.state.ConnectionState;
-
import javax.jms.ExceptionListener;
import javax.jms.JMSException;
-import java.util.List;
-import java.util.ArrayList;
-import java.util.Iterator;
+import org.jboss.jms.client.container.ConnectionFailureListener;
+import org.jboss.jms.client.state.ConnectionState;
+import org.jboss.logging.Logger;
+import org.jboss.remoting.Client;
+import org.jboss.remoting.ConnectionListener;
+
/**
* The ONLY remoting connection listener for a JMS connection's underlying remoting connection.
* Added to the remoting connection when the JMS connection is created, and removed when the
@@ -44,51 +42,56 @@
private ExceptionListener jmsExceptionListener;
// List<ConnectionListener>
- private List delegateListeners;
+ //private List delegateListeners;
+
+ private ConnectionFailureListener remotingListener;
// Constructors ---------------------------------------------------------------------------------
public ConsolidatedRemotingConnectionListener()
{
- delegateListeners = new ArrayList();
}
// ConnectionListener implementation ------------------------------------------------------------
public void handleConnectionException(Throwable throwable, Client client)
{
- // forward the exception to delegate listeners and JMS ExceptionListeners; synchronize and
- // copy to avoid race conditions
+ // forward the exception to delegate listener and JMS ExceptionListeners; synchronize
+ // to avoid race conditions
ExceptionListener jmsExceptionListenerCopy;
- List delegateListenersCopy = new ArrayList();
+
+ ConnectionFailureListener remotingListenerCopy;
synchronized(this)
{
jmsExceptionListenerCopy = jmsExceptionListener;
- for(Iterator i = delegateListeners.iterator(); i.hasNext(); )
- {
- delegateListenersCopy.add(i.next());
- }
+ remotingListenerCopy = remotingListener;
}
+
+ boolean forwardToJMSListener = true;
- for(Iterator i = delegateListenersCopy.iterator(); i.hasNext(); )
+ if (remotingListenerCopy != null)
{
- ConnectionListener l = (ConnectionListener)i.next();
-
try
{
- log.debug(this + " forwarding remoting failure \"" + throwable + "\" to " + l);
- l.handleConnectionException(throwable, client);
+ log.debug(this + " forwarding remoting failure \"" + throwable + "\" to " + remotingListenerCopy);
+
+ //We only forward to the JMS listener if failover did not successfully handle the exception
+ //If failover handled the exception transparently then there is effectively no problem
+ //with the logical connection that the client needs to be aware of
+ forwardToJMSListener = !remotingListenerCopy.handleConnectionException(throwable, client);
}
catch(Exception e)
{
- log.warn("Failed to forward " + throwable + " to " + l, e);
+ log.warn("Failed to forward " + throwable + " to " + remotingListenerCopy, e);
}
}
-
- if (jmsExceptionListenerCopy != null)
+
+ log.info("DISPATCHING TO JMSLISTENER " + forwardToJMSListener);
+
+ if (forwardToJMSListener && jmsExceptionListenerCopy != null)
{
JMSException jmsException = null;
@@ -118,10 +121,16 @@
// Public ---------------------------------------------------------------------------------------
- public synchronized boolean addDelegateListener(ConnectionListener l)
+ public synchronized void setDelegateListener(ConnectionFailureListener l)
{
- log.debug(this + " adding delegate listener " + l);
- return delegateListeners.add(l);
+ log.debug(this + " setting delegate listener " + l);
+
+ if (remotingListener != null)
+ {
+ throw new IllegalStateException("There is already a connection listener for the connection");
+ }
+
+ remotingListener = l;
}
public synchronized void addJMSExceptionListener(ExceptionListener jmsExceptionListener)
@@ -141,7 +150,7 @@
public synchronized void clear()
{
jmsExceptionListener = null;
- delegateListeners.clear();
+ remotingListener = null;
log.debug(this + " cleared");
}
Modified: trunk/src/main/org/jboss/jms/client/remoting/JMSRemotingConnection.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/remoting/JMSRemotingConnection.java 2007-03-15 22:34:09 UTC (rev 2549)
+++ trunk/src/main/org/jboss/jms/client/remoting/JMSRemotingConnection.java 2007-03-15 23:18:18 UTC (rev 2550)
@@ -343,6 +343,11 @@
{
// very unlikely to get an exception on a local remove (I suspect badly designed API),
// but we're failed anyway, so we don't care too much
+
+ // Actually an exception will always be thrown here if the failure was detected by the connection
+ // validator since the validator will disconnect the client before calling the connection
+ // listener.
+
log.debug(this + " failed to cleanly remove callback manager from the client", t);
}
Modified: trunk/src/main/org/jboss/jms/server/ServerPeer.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/ServerPeer.java 2007-03-15 22:34:09 UTC (rev 2549)
+++ trunk/src/main/org/jboss/jms/server/ServerPeer.java 2007-03-15 23:18:18 UTC (rev 2550)
@@ -72,7 +72,6 @@
import org.jboss.messaging.core.tx.TransactionRepository;
import org.jboss.messaging.util.Util;
import org.jboss.mx.loading.UnifiedClassLoader3;
-import org.jboss.remoting.ServerInvocationHandler;
import org.jboss.remoting.marshal.MarshalFactory;
import org.jboss.system.ServiceCreator;
import org.jboss.system.ServiceMBeanSupport;
@@ -169,10 +168,6 @@
protected ObjectName defaultExpiryQueueObjectName;
protected Queue defaultExpiryQueue;
- //Other stuff
-
- private JMSServerInvocationHandler handler;
-
// Constructors ---------------------------------------------------------------------------------
public ServerPeer(int serverPeerID,
@@ -272,6 +267,10 @@
txRepository.loadPreparedTransactions();
initializeRemoting(mbeanServer);
+
+ //Now everything is started we can tell the invocation handler to start handling invocations
+ //We do this right at the end otherwise it can start handling invocations before we are properly started
+ JMSServerInvocationHandler.setClosed(false);
started = true;
@@ -296,6 +295,11 @@
log.debug(this + " stopping");
started = false;
+
+ //Tell the invocation handler we are closed - this is so we don't attempt to handle
+ //any invocations when we are in a partial closing down state - which can give strange
+ //"object not found with id" exceptions and stuff like that
+ JMSServerInvocationHandler.setClosed(true);
// Stop the wired components
@@ -927,11 +931,6 @@
{
return channelIDManager;
}
-
- public ServerInvocationHandler getInvocationHandler()
- {
- return handler;
- }
public ServerSessionEndpoint getSession(Integer sessionID)
{
@@ -1277,8 +1276,6 @@
JMSWireFormat wf = new JMSWireFormat();
MarshalFactory.addMarshaller("jms", wf, wf);
-
- handler = new JMSServerInvocationHandler();
}
private void loadServerAOPConfig() throws Exception
Modified: trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java 2007-03-15 22:34:09 UTC (rev 2549)
+++ trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java 2007-03-15 23:18:18 UTC (rev 2550)
@@ -198,18 +198,8 @@
{
try
{
-// if (!connectionEndpoint.isFailoverConnection())
-// {
- // regular consumer
- return createConsumerDelegateInternal(jmsDestination, selector,
- noLocal, subscriptionName);
- // }
-
-// // we're child of a failover connection. Favor failover channels when creating new
-// // consumers
-// return createFailoverConsumerDelegateInternal(jmsDestination, selector,
-// noLocal, subscriptionName,
-// failoverChannelID);
+ return createConsumerDelegateInternal(jmsDestination, selector,
+ noLocal, subscriptionName);
}
catch (Throwable t)
{
@@ -223,15 +213,7 @@
{
try
{
-// if (!connectionEndpoint.isFailoverConnection())
-// {
- // regular browser
- return createBrowserDelegateInternal(jmsDestination, selector);
-// }
-//
-// // we're child of a failover connection. Favor failover channels when creating new
-// // browsers
-// return createFailoverBrowserDelegateInternal(jmsDestination, selector, failoverChannelID);
+ return createBrowserDelegateInternal(jmsDestination, selector);
}
catch (Throwable t)
{
@@ -850,6 +832,9 @@
promptDelivery(channels);
+ //Close down the executor
+ executor.shutdownAfterProcessingCurrentTask();
+
deliveries.clear();
sp.removeSession(new Integer(id));
Modified: trunk/src/main/org/jboss/jms/server/remoting/JMSServerInvocationHandler.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/remoting/JMSServerInvocationHandler.java 2007-03-15 22:34:09 UTC (rev 2549)
+++ trunk/src/main/org/jboss/jms/server/remoting/JMSServerInvocationHandler.java 2007-03-15 23:18:18 UTC (rev 2550)
@@ -28,6 +28,7 @@
import javax.management.MBeanServer;
+import org.jboss.jms.util.MessagingJMSException;
import org.jboss.jms.wireformat.ConnectionFactoryCreateConnectionDelegateRequest;
import org.jboss.jms.wireformat.RequestSupport;
import org.jboss.logging.Logger;
@@ -62,14 +63,22 @@
private boolean trace;
+ //We need some way the server peer can call the invocation handler to make it open/closed
+ private static boolean closed = true;
+
+ public static synchronized void setClosed(boolean closed)
+ {
+ JMSServerInvocationHandler.closed = closed;
+ }
+
// Constructors ---------------------------------------------------------------------------------
public JMSServerInvocationHandler()
{
callbackHandlers = new HashMap();
trace = log.isTraceEnabled();
- }
-
+ }
+
// ServerInvocationHandler ----------------------------------------------------------------------
public void setMBeanServer(MBeanServer server)
@@ -92,37 +101,45 @@
public Object invoke(InvocationRequest invocation) throws Throwable
{
if (trace) { log.trace("invoking " + invocation); }
-
- RequestSupport request = (RequestSupport)invocation.getParameter();
- if (request instanceof ConnectionFactoryCreateConnectionDelegateRequest)
- {
- //Create connection request
-
- ConnectionFactoryCreateConnectionDelegateRequest cReq =
- (ConnectionFactoryCreateConnectionDelegateRequest)request;
-
- String remotingSessionId = cReq.getRemotingSessionID();
-
- ServerInvokerCallbackHandler callbackHandler = null;
- synchronized(callbackHandlers)
+ synchronized (JMSServerInvocationHandler.class)
+ {
+ if (closed)
{
- callbackHandler = (ServerInvokerCallbackHandler)callbackHandlers.get(remotingSessionId);
+ throw new MessagingJMSException("Cannot handle invocation since server is not active (it is either starting up or shutting down)");
}
- if (callbackHandler != null)
+
+ RequestSupport request = (RequestSupport)invocation.getParameter();
+
+ if (request instanceof ConnectionFactoryCreateConnectionDelegateRequest)
{
- log.debug("found calllback handler for remoting session " + Util.guidToString(remotingSessionId));
+ //Create connection request
- cReq.setCallbackHandler(callbackHandler);
+ ConnectionFactoryCreateConnectionDelegateRequest cReq =
+ (ConnectionFactoryCreateConnectionDelegateRequest)request;
+
+ String remotingSessionId = cReq.getRemotingSessionID();
+
+ ServerInvokerCallbackHandler callbackHandler = null;
+ synchronized(callbackHandlers)
+ {
+ callbackHandler = (ServerInvokerCallbackHandler)callbackHandlers.get(remotingSessionId);
+ }
+ if (callbackHandler != null)
+ {
+ log.debug("found calllback handler for remoting session " + Util.guidToString(remotingSessionId));
+
+ cReq.setCallbackHandler(callbackHandler);
+ }
+ else
+ {
+ throw new IllegalStateException("Cannot find callback handler " +
+ "for session id " + remotingSessionId);
+ }
}
- else
- {
- throw new IllegalStateException("Cannot find callback handler " +
- "for session id " + remotingSessionId);
- }
+
+ return request.serverInvoke();
}
-
- return request.serverInvoke();
}
public void addListener(InvokerCallbackHandler callbackHandler)
@@ -193,5 +210,10 @@
// Private --------------------------------------------------------------------------------------
+ private synchronized void doSetClosed(boolean closed)
+ {
+ this.closed = true;
+ }
+
// Inner classes --------------------------------------------------------------------------------
}
More information about the jboss-cvs-commits
mailing list