[jboss-cvs] JBoss Messaging SVN: r1783 - in trunk: src/main/org/jboss/jms/client/container src/main/org/jboss/jms/client/delegate tests/src/org/jboss/test/messaging/jms/clustering
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Wed Dec 13 04:46:44 EST 2006
Author: ovidiu.feodorov at jboss.com
Date: 2006-12-13 04:46:39 -0500 (Wed, 13 Dec 2006)
New Revision: 1783
Added:
trunk/src/main/org/jboss/jms/client/container/DelegateIdentity.java
Modified:
trunk/src/main/org/jboss/jms/client/container/ClosedInterceptor.java
trunk/src/main/org/jboss/jms/client/container/HAAspect.java
trunk/src/main/org/jboss/jms/client/delegate/ClientConnectionFactoryDelegate.java
trunk/src/main/org/jboss/jms/client/delegate/ClusteredClientConnectionFactoryDelegate.java
trunk/tests/src/org/jboss/test/messaging/jms/clustering/FailoverTest.java
Log:
HAAspect delegate refactoring. Added a new failover test (that currently fails)
Modified: trunk/src/main/org/jboss/jms/client/container/ClosedInterceptor.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/container/ClosedInterceptor.java 2006-12-13 07:05:09 UTC (rev 1782)
+++ trunk/src/main/org/jboss/jms/client/container/ClosedInterceptor.java 2006-12-13 09:46:39 UTC (rev 1783)
@@ -72,8 +72,7 @@
private int inUseCount;
// The identity of the delegate this interceptor is associated with
- private Integer id;
- private String delegateType;
+ private DelegateIdentity id;
// Static --------------------------------------------------------
@@ -106,7 +105,7 @@
}
else
{
- sb.append(delegateType).append("[").append(id.intValue()).append("]");
+ sb.append(id.getType()).append("[").append(id.getID()).append("]");
}
return sb.toString();
}
@@ -129,7 +128,7 @@
// logging purposes. It makes sense, since it's an PER_INSTANCE interceptor
if (id == null)
{
- getIdentity(invocation);
+ id = DelegateIdentity.getIdentity(invocation);
}
String methodName = ((MethodInvocation) invocation).getMethod().getName();
@@ -319,14 +318,6 @@
// Private --------------------------------------------------------
- private void getIdentity(Invocation i)
- {
- DelegateSupport ds = (DelegateSupport)i.getTargetObject();
- id = new Integer(ds.getID());
- delegateType = ds.getClass().getName();
- delegateType = delegateType.substring(delegateType.lastIndexOf('.') + 1);
- }
-
// Inner Classes --------------------------------------------------
}
Added: trunk/src/main/org/jboss/jms/client/container/DelegateIdentity.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/container/DelegateIdentity.java 2006-12-13 07:05:09 UTC (rev 1782)
+++ trunk/src/main/org/jboss/jms/client/container/DelegateIdentity.java 2006-12-13 09:46:39 UTC (rev 1783)
@@ -0,0 +1,67 @@
+/**
+ * JBoss, Home of Professional Open Source
+ *
+ * Distributable under LGPL license.
+ * See terms of license at gnu.org.
+ */
+package org.jboss.jms.client.container;
+
+import org.jboss.aop.joinpoint.Invocation;
+import org.jboss.jms.client.delegate.DelegateSupport;
+
+/**
+ * @author <a href="mailto:ovidiu at jboss.org">Ovidiu Feodorov</a>
+ * @version <tt>$Revision$</tt>
+ * $Id$
+ */
+public class DelegateIdentity
+{
+ // Constants -----------------------------------------------------
+
+ // Static --------------------------------------------------------
+
+ public static DelegateIdentity getIdentity(Invocation i)
+ {
+ DelegateSupport ds = (DelegateSupport)i.getTargetObject();
+
+ Integer id = new Integer(ds.getID());
+ String type = ds.getClass().getName();
+
+ type = type.substring(type.lastIndexOf('.') + 1);
+
+ return new DelegateIdentity(id, type);
+ }
+
+ // Attributes ----------------------------------------------------
+
+ private Integer id;
+ private String type;
+
+ // Constructors --------------------------------------------------
+
+ public DelegateIdentity(Integer id, String type)
+ {
+ this.id = id;
+ this.type = type;
+ }
+
+ // Public --------------------------------------------------------
+
+ public Integer getID()
+ {
+ return id;
+ }
+
+ public String getType()
+ {
+ return type;
+ }
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+}
Modified: trunk/src/main/org/jboss/jms/client/container/HAAspect.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/container/HAAspect.java 2006-12-13 07:05:09 UTC (rev 1782)
+++ trunk/src/main/org/jboss/jms/client/container/HAAspect.java 2006-12-13 09:46:39 UTC (rev 1783)
@@ -66,287 +66,288 @@
*
* @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
* @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ * @author <a href="mailto:ovidiu at jboss.org">Ovidiu Feodorov</a>
*
* @version <tt>$Revision: 1.1 $</tt>
*
* $Id$
- *
- * TODO: Make some log.info as log.trace after the code is final
- *
*/
public class HAAspect
{
+ // Constants -----------------------------------------------------
+
private static final Logger log = Logger.getLogger(HAAspect.class);
-
+
+ public static final int MAX_RECONNECT_HOP_COUNT = 10;
+
+ // Static --------------------------------------------------------
+
private static boolean trace = log.isTraceEnabled();
-
- //Cache this here
+
+ // Attributes ----------------------------------------------------
+
private ClientConnectionFactoryDelegate[] delegates;
-
- //Cache this here
+
private Map failoverMap;
-
+
private int currentRobinIndex;
-
+
+ // The identity of the delegate this interceptor is associated with
+ private DelegateIdentity id;
+
+ // Constructors --------------------------------------------------
+
+ public HAAspect()
+ {
+ id = null;
+ }
+
+ // Public --------------------------------------------------------
+
public Object handleCreateConnectionDelegate(Invocation invocation) throws Throwable
{
- if (getServers(invocation) != null)
+ // Maintain the identity of the delegate that sends invocations through this aspect, for
+ // logging purposes. This makes sense, since it's a PER_INSTANCE aspect.
+ if (id == null)
{
- // TODO: this should be in loop while we get exceptions creating connections, always trying
- // the next Delegate when we get an exception.
- log.info("Clustered createConnection");
-
- //In a clustered configuration we create connections in a round-robin fashion
- //from the available servers
-
- ClientConnectionFactoryDelegate cfDelegate = getDelegateRoundRobin();
-
- //Now create a connection delegate for this
-
- MethodInvocation mi = (MethodInvocation)invocation;
-
- String username = (String)mi.getArguments()[0];
-
- String password = (String)mi.getArguments()[1];
-
- ClientConnectionDelegate connDelegate = createConnection(cfDelegate, username, password);
-
- return new CreateConnectionResult(connDelegate);
+ id = DelegateIdentity.getIdentity(invocation);
}
- else
+
+ cacheLocalDelegates(invocation);
+
+ if (delegates == null)
{
- //Non clustered
-
- log.info("Assumed non clustered");
-
+ // not clustered, pass the invocation through
return invocation.invokeNext();
}
- }
-
- //TODO this is currently hardcoded as round-robin, this should be made pluggable
- private synchronized ClientConnectionFactoryDelegate getDelegateRoundRobin()
+
+ // clustered
+
+ // TODO: this should be in a loop while we get exceptions creating connections, always trying
+ // the next Delegate when we get an exception.
+
+ // In a clustered configuration we create connections in a round-robin fashion, contacting
+ // successively all available servers.
+
+ ClientConnectionFactoryDelegate cfDelegate = getDelegateRoundRobin();
+
+ // Now create a connection delegate for this
+
+ MethodInvocation mi = (MethodInvocation)invocation;
+ String username = (String)mi.getArguments()[0];
+ String password = (String)mi.getArguments()[1];
+
+ ClientConnectionDelegate cd = createConnection(cfDelegate, username, password);
+
+ return new CreateConnectionResult(cd);
+ }
+
+ public String toString()
{
- ClientConnectionFactoryDelegate currentDelegate = delegates[currentRobinIndex++];
-
- if (currentRobinIndex >= delegates.length)
+ StringBuffer sb = new StringBuffer("HAAspect.");
+ if (id == null)
{
- currentRobinIndex = 0;
+ sb.append("UNINITIALIZED");
}
-
- return currentDelegate;
+ else
+ {
+ sb.append(id.getType()).append("[").append(id.getID()).append("]");
+ }
+ return sb.toString();
}
-
- private synchronized ClientConnectionFactoryDelegate[] getServers(Invocation invocation)
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ private synchronized void cacheLocalDelegates(Invocation invocation)
{
- if (delegates == null)
+ if (delegates != null)
{
- log.info("Looking for delegates");
+ // the first set of delegates is already cached, return
+ return;
+ }
- MethodInvocation methodInvoke = (MethodInvocation)invocation;
+ MethodInvocation mi = (MethodInvocation)invocation;
+ Object target = mi.getTargetObject();
- Object target = methodInvoke.getTargetObject();
-
- if (target instanceof ClusteredClientConnectionFactoryDelegate)
- {
- delegates = ((ClusteredClientConnectionFactoryDelegate)target).getDelegates();
- }
+ if (target instanceof ClusteredClientConnectionFactoryDelegate)
+ {
+ ClusteredClientConnectionFactoryDelegate cccfd =
+ (ClusteredClientConnectionFactoryDelegate)target;
+ delegates = cccfd.getDelegates();
+
if (delegates != null)
{
- failoverMap = ((ClusteredClientConnectionFactoryDelegate)target).getFailoverMap();
+ failoverMap = cccfd.getFailoverMap();
if (failoverMap == null)
{
- throw new IllegalStateException("Cannot find failoverMap!");
+ throw new IllegalStateException("HAAspect cannot find the failover map!");
}
}
}
-
- return delegates;
}
-
- private ClientConnectionDelegate createConnection(ClientConnectionFactoryDelegate cf, String username, String password)
- throws Exception
+
+ //TODO this is currently hardcoded as round-robin, this should be made pluggable
+ private synchronized ClientConnectionFactoryDelegate getDelegateRoundRobin()
{
- log.info("createConnection");
-
- CreateConnectionResult res = (CreateConnectionResult)cf.createConnectionDelegate(username, password, -1);
-
- ClientConnectionDelegate connDelegate = (ClientConnectionDelegate)res.getDelegate();
-
- addListener(connDelegate);
-
- return connDelegate;
+ ClientConnectionFactoryDelegate currentDelegate = delegates[currentRobinIndex++];
+
+ if (currentRobinIndex >= delegates.length)
+ {
+ currentRobinIndex = 0;
+ }
+
+ return currentDelegate;
}
-
- private void addListener(ClientConnectionDelegate connDelegate)
+
+ private ClientConnectionDelegate createConnection(ClientConnectionFactoryDelegate cfd,
+ String username,
+ String password)
+ throws Exception
{
- //Add a connection listener
-
- ConnectionState state = (ConnectionState)((DelegateSupport)connDelegate).getState();
-
- ConnectionListener listener = new Listener(connDelegate);
-
+ CreateConnectionResult res =
+ (CreateConnectionResult)cfd.createConnectionDelegate(username, password, -1);
+
+ ClientConnectionDelegate cd = (ClientConnectionDelegate)res.getDelegate();
+
+ // Add a connection listener to detect failure
+
+ ConnectionListener listener = new ConnectionFailureListener(cd);
+ ConnectionState state = (ConnectionState)((DelegateSupport)cd).getState();
state.getRemotingConnection().getInvokingClient().addConnectionListener(listener);
+
+ return cd;
}
- //The connection has failed
- private void handleFailure(ClientConnectionDelegate failedConnection) throws Exception
+ /**
+ * @return a failover ClientConnectionFactoryDelegate or null if a suitable delegate cannot be
+ * found.
+ */
+ private ClientConnectionFactoryDelegate getFailoverDelegate(int failedServerID)
{
- log.info("Handling failure");
-
- //Get the connection factory we are going to failover onto
- ClientConnectionFactoryDelegate newCF = getFailoverDelegate(failedConnection);
-
- ConnectionState state = (ConnectionState)((DelegateSupport)failedConnection).getState();
-
- log.info("calling createFailoverConnectionDelegate");
-
- int tries = 0;
-
- //We try a maximum of 10 hops
- final int MAX_TRIES = 10;
-
- while (tries < MAX_TRIES)
- {
- //Create a connection using that connection factory
- CreateConnectionResult res =
- newCF.createConnectionDelegate(state.getUser(), state.getPassword(), state.getServerID());
-
- log.info("returned from createFailoverConnectionDelegate");
-
- if (res.getDelegate() != null)
+ // Look up the server to failover onto in the failover map
+
+ Integer failoverServerID = (Integer)failoverMap.get(new Integer(failedServerID));
+
+ for (int i = 0; i < delegates.length; i++)
+ {
+ if (delegates[i].getServerId() == failoverServerID.intValue())
{
- log.info("Got connection");
-
- //We got the right server and created a new connection ok
-
- ClientConnectionDelegate newConnection = (ClientConnectionDelegate)res.getDelegate();
-
- log.info("newconnection is " + newConnection);
-
- failover(failedConnection, newConnection);
-
- break;
+ return delegates[i];
}
- else
- {
- if (res.getActualFailoverNode() == -1)
- {
- //No trace of failover was detected on the server side - this might happen if the client side
- //network fails temporarily so the client connection breaks but the server side network is still
- //up and running - in this case we don't failover
-
- //TODO Is this the right thing to do?
-
- log.trace("Client attempted to failover, but no failover had occurred on the server side");
-
- break;
-
- }
- else
- {
- //Server side failover has occurred / is occurring but we tried the wrong node
- //Now we must try the correct node
-
- newCF = null;
-
- tries++;
-
- for (int i = 0; i < delegates.length; i++)
- {
- ClientConnectionFactoryDelegate del = delegates[i];
-
- if (del.getServerId() == res.getActualFailoverNode())
- {
- newCF = del;
-
- break;
- }
- }
-
- if (newCF == null)
- {
- //Houston, we have a problem
-
- //TODO Could this ever happen? Should we send back the cf, or update it instead of just the id??
- throw new JMSException("Cannot find server with id " + res.getActualFailoverNode());
- }
- }
- }
}
-
- if (tries == MAX_TRIES)
- {
- throw new JMSException("Cannot find correct server to failover onto");
- }
+
+ return null;
}
-
- private ClientConnectionFactoryDelegate getFailoverDelegate(ClientConnectionDelegate currentDelegate) throws JMSException
+
+ private void handleFailure(ClientConnectionDelegate failedConnDelegate) throws Exception
{
- //We need to choose which delegate to fail over to
-
- ConnectionState currentState = (ConnectionState)((DelegateSupport)currentDelegate).getState();
-
- int currentServerID = currentState.getServerID();
-
- //Lookup in the failover map to see which server to fail over onto
-
- Integer failoverServerID = (Integer)failoverMap.get(new Integer(currentServerID));
-
- if (failoverServerID == null)
+ ConnectionState failedConnState =
+ (ConnectionState)((DelegateSupport)failedConnDelegate).getState();
+
+ int failedServerID = failedConnState.getServerID();
+
+ log.debug(this + " handling failure on connection to node " + failedServerID);
+
+ // Get the default connection factory delegate we are going to failover onto
+
+ ClientConnectionFactoryDelegate failoverDelegate = getFailoverDelegate(failedServerID);
+
+ if (failoverDelegate == null)
{
- throw new IllegalStateException("Cannot find failover node for node " + currentServerID);
+ throw new IllegalStateException("Cannot find default failover node for server " +
+ failedServerID);
}
-
- //Now find the actual delegate
-
- ClientConnectionFactoryDelegate del = null;
-
- for (int i = 0; i < delegates.length; i++)
+
+ // We attempt to connect to the failover node in a loop, since we might need to go through
+ // multiple hops
+
+ int attemptCount = 0;
+
+ outer: while (attemptCount < MAX_RECONNECT_HOP_COUNT)
{
- if (delegates[i].getServerId() == failoverServerID.intValue())
+ // Create a connection using that connection factory
+ CreateConnectionResult r = failoverDelegate.
+ createConnectionDelegate(failedConnState.getUser(),
+ failedConnState.getPassword(),
+ failedConnState.getServerID());
+
+ if (r.getDelegate() != null)
{
- del = delegates[i];
-
- break;
+ // We got the right server and created a new connection
+ performClientSideFailover(failedConnDelegate, (ClientConnectionDelegate)r.getDelegate());
+ return;
}
+
+ // Did not get a valid connection to the node we've just tried
+
+ int actualServerID = r.getActualFailoverNode();
+
+ if (actualServerID == -1)
+ {
+ // No failover attempt was detected on the server side; this might happen if the client
+ // side network fails temporarily so the client connection breaks but the server cluster
+ // is still up and running - in this case we don't perform failover.
+
+ //TODO Is this the right thing to do?
+
+ log.warn("Client attempted failover, but no failover attempt " +
+ "has been detected on the server side.");
+
+ return;
+ }
+
+ // Server side failover has occurred / is occurring but trying to go to the 'default'
+ // failover node did not succeed. Retry with the node suggested by the cluster.
+
+ attemptCount++;
+
+ for (int i = 0; i < delegates.length; i++)
+ {
+ if (delegates[i].getServerId() == actualServerID)
+ {
+ failoverDelegate = delegates[i];
+ continue outer;
+ }
+ }
+
+ // the delegate corresponding to the actualServerID was not found among the cached delegates
+ //TODO Could this ever happen? Should we send back the cf, or update it instead of just the id??
+ throw new JMSException("Cannot find a cached connection factory delegate for " +
+ "node " + actualServerID);
}
-
- if (del == null)
- {
- throw new IllegalStateException("Cannot find failover delegate for node " + failoverServerID.intValue());
- }
-
- return del;
+
+ throw new JMSException("Maximum number of failover attempts exceeded. " +
+ "Cannot find a server to failover onto.");
}
-
- private void failover(ClientConnectionDelegate failedConnection, ClientConnectionDelegate newConnection) throws Exception
+
+ private void performClientSideFailover(ClientConnectionDelegate failedConnDelegate,
+ ClientConnectionDelegate newConnDelegate) throws Exception
{
- if (trace) { log.trace("calling handleFailover"); }
-
- log.info("performing failover");
+ log.debug(this + " performing client side failover");
- ConnectionState failedState = (ConnectionState)failedConnection.getState();
+ ConnectionState failedState = (ConnectionState)failedConnDelegate.getState();
+ ConnectionState newState = (ConnectionState)newConnDelegate.getState();
- ConnectionState newState = (ConnectionState)newConnection.getState();
-
if (failedState.getClientID() != null)
{
- newConnection.setClientID(failedState.getClientID());
+ newConnDelegate.setClientID(failedState.getClientID());
}
// Transfer attributes from newDelegate to failedDelegate
- failedConnection.copyAttributes(newConnection);
-
+ failedConnDelegate.copyAttributes(newConnDelegate);
+
int oldServerId = failedState.getServerID();
-
+
CallbackManager oldCallbackManager = failedState.getRemotingConnection().getCallbackManager();
-
+
//We need to update some of the attributes on the state
failedState.copyState(newState);
-
- log.info("failing over children");
for(Iterator i = failedState.getChildren().iterator(); i.hasNext(); )
{
@@ -354,26 +355,26 @@
ClientSessionDelegate failedSessionDelegate =
(ClientSessionDelegate)failedSessionState.getDelegate();
-
- ClientSessionDelegate newSessionDelegate = (ClientSessionDelegate)newConnection.
+
+ ClientSessionDelegate newSessionDelegate = (ClientSessionDelegate)newConnDelegate.
createSessionDelegate(failedSessionState.isTransacted(),
failedSessionState.getAcknowledgeMode(),
failedSessionState.isXA());
-
+
SessionState newSessionState = (SessionState)newSessionDelegate.getState();
failedSessionDelegate.copyAttributes(newSessionDelegate);
-
+
//We need to update some of the attributes on the state
- newSessionState.copyState(newSessionState);
-
+ newSessionState.copyState(newSessionState);
+
if (trace) { log.trace("replacing session (" + failedSessionDelegate + ") with a new failover session " + newSessionDelegate); }
-
+
List children = new ArrayList();
-
+
// TODO Why is this clone necessary?
children.addAll(failedSessionState.getChildren());
-
+
Set consumerIds = new HashSet();
for (Iterator j = children.iterator(); j.hasNext(); )
@@ -385,84 +386,84 @@
handleFailoverOnProducer((ProducerState)sessionChild, newSessionDelegate);
}
else if (sessionChild instanceof ConsumerState)
- {
- handleFailoverOnConsumer(failedConnection,
+ {
+ handleFailoverOnConsumer(failedConnDelegate,
failedState,
failedSessionState,
(ConsumerState)sessionChild,
failedSessionDelegate,
oldServerId,
- oldCallbackManager);
-
+ oldCallbackManager);
+
// We add the new consumer id to the list of old ids
- consumerIds.add(new Integer(((ConsumerState)sessionChild).getConsumerID()));
+ consumerIds.add(new Integer(((ConsumerState)sessionChild).getConsumerID()));
}
else if (sessionChild instanceof BrowserState)
{
handleFailoverOnBrowser((BrowserState)sessionChild, newSessionDelegate);
}
}
-
+
/* Now we must sent the list of unacked AckInfos to the server - so the consumers
* delivery lists can be repopulated
*/
List ackInfos = null;
-
+
if (!failedSessionState.isTransacted() && !failedSessionState.isXA())
{
/*
Now we remove any unacked np messages - this is because we don't want to ack them
since the server won't know about them and will barf
*/
-
+
Iterator iter = newSessionState.getToAck().iterator();
-
+
while (iter.hasNext())
{
AckInfo info = (AckInfo)iter.next();
-
+
if (!info.getMessage().getMessage().isReliable())
{
iter.remove();
- }
+ }
}
-
+
//Get the ack infos from the list in the session state
ackInfos = failedSessionState.getToAck();
}
else
- {
+ {
//Transacted session - we need to get the acks from the resource manager
//btw we have kept the old resource manager
ResourceManager rm = failedState.getResourceManager();
-
+
// Remove any non persistent acks - so server doesn't barf on commit
-
+
rm.removeNonPersistentAcks(consumerIds);
-
- ackInfos = rm.getAckInfosForConsumerIds(consumerIds);
+
+ ackInfos = rm.getAckInfosForConsumerIds(consumerIds);
}
-
+
if (!ackInfos.isEmpty())
{
log.info("Sending " + ackInfos.size() + " unacked");
newSessionDelegate.sendUnackedAckInfos(ackInfos);
- }
+ }
}
-
+
//TODO
//If the session had consumers which are now closed then there is no way to recreate them on the server
//we need to store with session id
-
- //We must not start the connection until the end
+
+ // We must not start the connection until the end
if (failedState.isStarted())
{
- failedConnection.start();
+ failedConnDelegate.start();
}
-
- log.info("Failover done");
+
+ log.info(this + " completed client-side failover");
}
-
+
private void handleFailoverOnConsumer(ClientConnectionDelegate failedConnectionDelegate,
ConnectionState failedConnectionState,
SessionState failedSessionState,
@@ -472,12 +473,12 @@
CallbackManager oldCallbackManager)
throws JMSException
{
- log.info("Failing over consumer");
-
+ log.debug(this + " failing over consumer " + failedConsumerState);
+
ClientConsumerDelegate failedConsumerDelegate =
(ClientConsumerDelegate)failedConsumerState.getDelegate();
- if (trace) { log.trace("handleFailoverOnConsumer: creating alternate consumer"); }
+ if (trace) { log.trace(this + " creating alternate consumer"); }
ClientConsumerDelegate newConsumerDelegate = (ClientConsumerDelegate)failedSessionDelegate.
createConsumerDelegate((JBossDestination)failedConsumerState.getDestination(),
@@ -487,60 +488,59 @@
failedConsumerState.isConnectionConsumer(),
failedConsumerState.getChannelId());
- if (trace) { log.trace("handleFailoverOnConsumer: alternate consumer created"); }
-
- //Copy the attributes from the new consumer to the old consumer
+ if (trace) { log.trace(this + " alternate consumer created"); }
+
+ // Copy the attributes from the new consumer to the old consumer
failedConsumerDelegate.copyAttributes(newConsumerDelegate);
ConsumerState newState = (ConsumerState)newConsumerDelegate.getState();
-
+
int oldConsumerID = failedConsumerState.getConsumerID();
-
- //Update attributes on the old state
+
+ // Update attributes on the old state
failedConsumerState.copyState(newState);
if (failedSessionState.isTransacted() || failedSessionState.isXA())
{
- //Replace the old consumer id with the new consumer id
-
+ // Replace the old consumer id with the new consumer id
+
ResourceManager rm = failedConnectionState.getResourceManager();
-
+
rm.handleFailover(oldConsumerID, failedConsumerState.getConsumerID());
}
-
- //We need to re-use the existing message callback handler
-
- log.info("Old server id:" + oldServerID + " old consumer id:" + oldConsumerID);
- MessageCallbackHandler oldHandler = oldCallbackManager.unregisterHandler(oldServerID, oldConsumerID);
-
+
+ // We need to re-use the existing message callback handler
+
+ MessageCallbackHandler oldHandler =
+ oldCallbackManager.unregisterHandler(oldServerID, oldConsumerID);
+
ConnectionState newConnectionState = (ConnectionState)failedConnectionDelegate.getState();
-
- CallbackManager newCallbackManager = newConnectionState.getRemotingConnection().getCallbackManager();
-
- log.info("New server id:" + newConnectionState.getServerID() + " new consuer id:" + newState.getConsumerID());
-
- //Remove the new handler
- MessageCallbackHandler newHandler = newCallbackManager.unregisterHandler(newConnectionState.getServerID(),
- newState.getConsumerID());
-
- log.info("New handler is " + System.identityHashCode(newHandler));
-
+
+ CallbackManager newCallbackManager =
+ newConnectionState.getRemotingConnection().getCallbackManager();
+
+ // Remove the new handler
+ MessageCallbackHandler newHandler = newCallbackManager.
+ unregisterHandler(newConnectionState.getServerID(), newState.getConsumerID());
+
+ log.debug("New handler is " + System.identityHashCode(newHandler));
+
//But we need to update some fields from the new one
oldHandler.copyState(newHandler);
-
+
//Now we re-register the old handler with the new callback manager
-
+
newCallbackManager.registerHandler(newConnectionState.getServerID(),
newState.getConsumerID(),
oldHandler);
-
- //We don't need to add the handler to the session state since it is already there - we
- //are re-using the old handler
-
- log.info("failed over consumer");
+
+ // We don't need to add the handler to the session state since it is already there - we
+ // are re-using the old handler
+
+ log.debug(this + " failed over consumer");
}
-
+
private void handleFailoverOnProducer(ProducerState failedProducerState,
ClientSessionDelegate failedSessionDelegate)
throws JMSException
@@ -552,9 +552,8 @@
(ClientProducerDelegate)failedProducerState.getDelegate();
failedProducerDelegate.copyAttributes(newProducerDelegate);
-
failedProducerState.copyState((ProducerState)newProducerDelegate.getState());
-
+
if (trace) { log.trace("handling fail over on producerDelegate " + failedProducerDelegate + " destination=" + failedProducerState.getDestination()); }
}
@@ -563,56 +562,59 @@
throws JMSException
{
ClientBrowserDelegate newBrowserDelegate = (ClientBrowserDelegate)failedSessionDelegate.
- createBrowserDelegate(failedBrowserState.getJmsDestination(),failedBrowserState.getMessageSelector());
+ createBrowserDelegate(failedBrowserState.getJmsDestination(),
+ failedBrowserState.getMessageSelector());
ClientBrowserDelegate failedBrowserDelegate =
(ClientBrowserDelegate)failedBrowserState.getDelegate();
failedBrowserDelegate.copyAttributes(newBrowserDelegate);
-
failedBrowserState.copyState((BrowserState)newBrowserDelegate.getState());
-
+
if (trace) { log.trace("handling fail over on browserDelegate " + failedBrowserDelegate + " destination=" + failedBrowserState.getJmsDestination() + " selector=" + failedBrowserState.getMessageSelector()); }
}
-
- private class Listener implements ConnectionListener
+
+ // Inner classes -------------------------------------------------
+
+ private class ConnectionFailureListener implements ConnectionListener
{
- private ClientConnectionDelegate connection;
+ private ClientConnectionDelegate cd;
private Valve valve;
-
- Listener(ClientConnectionDelegate connection)
+
+ ConnectionFailureListener(ClientConnectionDelegate cd)
{
- this.connection = connection;
+ this.cd = cd;
this.valve = new Valve();
-
- log.info("************* CREATING LISTENER");
}
-
+
+ // ConnectionListener implementation ---------------------------
+
public void handleConnectionException(Throwable throwable, Client client)
{
try
{
- log.info("********* EXCEPTION DETECTED", throwable);
+ log.debug(this + " is being notified of connection failure: " + throwable);
- // it references Valve to a local variable.
- // Since we reset the valve at the end, we need to guarantee we will have the same Valve instance
- // from the moment we enter this method.
+ // Copy the valve reference into a local variable. Since we reset the valve at the end, we
+ // need to guarantee we will have the same Valve instance from the moment we entered this
+ // method.
+
Valve localValve = null;
synchronized (this)
{
localValve = valve;
}
- // We can't have more than one exception being caught at the same time.
- // On that case we will open the valve and any other thread opening the valve
- // will wait until its completion
+ // We can't have more than one exception being handled at the same time. In that case we
+ // will open the valve and any other thread opening the valve will wait until
+ // its completion.
+
if (localValve.open())
{
try
{
- log.info("********* HANDLING FAILOVER");
- handleFailure(connection);
+ handleFailure(cd);
}
finally
{
@@ -625,15 +627,19 @@
}
else
{
- log.info("********* ANOTHER THREAD WAS RESPONSIBLE FOR FAILOVER AS THE VALVE WAS CLOSED");
+ log.debug(this + ": Another thread was responsible for failover as the valve was closed");
}
}
catch (Throwable e)
{
log.error("Caught exception in handling failure", e);
- e.printStackTrace();
}
}
+
+ public String toString()
+ {
+ return "ConnectionFailureListener[" + Integer.toHexString(hashCode()) + "]";
+ }
}
}
Modified: trunk/src/main/org/jboss/jms/client/delegate/ClientConnectionFactoryDelegate.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/delegate/ClientConnectionFactoryDelegate.java 2006-12-13 07:05:09 UTC (rev 1782)
+++ trunk/src/main/org/jboss/jms/client/delegate/ClientConnectionFactoryDelegate.java 2006-12-13 09:46:39 UTC (rev 1783)
@@ -294,7 +294,7 @@
public String toString()
{
- return "ClientConnectionFactoryDelegate[ID=" + id + "]";
+ return "ClientConnectionFactoryDelegate[" + id + "]";
}
public String getServerLocatorURI()
Modified: trunk/src/main/org/jboss/jms/client/delegate/ClusteredClientConnectionFactoryDelegate.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/delegate/ClusteredClientConnectionFactoryDelegate.java 2006-12-13 07:05:09 UTC (rev 1782)
+++ trunk/src/main/org/jboss/jms/client/delegate/ClusteredClientConnectionFactoryDelegate.java 2006-12-13 09:46:39 UTC (rev 1783)
@@ -118,7 +118,7 @@
public String toString()
{
- StringBuffer sb = new StringBuffer("ClusteredConnFactoryDelegate[ID=");
+ StringBuffer sb = new StringBuffer("ClusteredConnFactoryDelegate[");
sb.append(id).append("][");
if (delegates == null)
{
Modified: trunk/tests/src/org/jboss/test/messaging/jms/clustering/FailoverTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/clustering/FailoverTest.java 2006-12-13 07:05:09 UTC (rev 1782)
+++ trunk/tests/src/org/jboss/test/messaging/jms/clustering/FailoverTest.java 2006-12-13 09:46:39 UTC (rev 1783)
@@ -82,9 +82,9 @@
log.info("######## KILLED NODE 1");
log.info("########");
- // TODO - this shouldn't be necessary if we have the client valve in place
+ // TODO - this shouldn't be necessary if we have the client valve in place
log.info("Sleeping for 1 min");
- Thread.sleep(60000);
+ Thread.sleep(30000);
// we must receive the message
More information about the jboss-cvs-commits
mailing list