[jboss-cvs] JBoss Messaging SVN: r2840 - in trunk: src/main/org/jboss/jms/server/connectionmanager and 3 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Wed Jul 4 15:26:26 EDT 2007
Author: timfox
Date: 2007-07-04 15:26:26 -0400 (Wed, 04 Jul 2007)
New Revision: 2840
Modified:
trunk/src/main/org/jboss/jms/server/ServerPeer.java
trunk/src/main/org/jboss/jms/server/connectionmanager/SimpleConnectionManager.java
trunk/src/main/org/jboss/messaging/core/contract/Replicator.java
trunk/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java
trunk/tests/src/org/jboss/test/messaging/jms/clustering/HATest.java
Log:
Fixed remoting deadlock issue
Modified: trunk/src/main/org/jboss/jms/server/ServerPeer.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/ServerPeer.java 2007-07-04 18:32:01 UTC (rev 2839)
+++ trunk/src/main/org/jboss/jms/server/ServerPeer.java 2007-07-04 19:26:26 UTC (rev 2840)
@@ -150,7 +150,7 @@
private SecurityMetadataStore securityStore;
private ConnectionFactoryJNDIMapper connFactoryJNDIMapper;
private TransactionRepository txRepository;
- private ConnectionManager connectionManager;
+ private SimpleConnectionManager connectionManager;
private ConnectorManager connectorManager;
private IDManager messageIDManager;
private IDManager channelIDManager;
@@ -1230,6 +1230,8 @@
this.clusterConnectionManager.injectPostOffice(postOffice);
this.clusterConnectionManager.injectReplicator((Replicator)postOffice);
+
+ this.connectionManager.injectReplicator((Replicator)postOffice);
}
// Also need to inject into txRepository
Modified: trunk/src/main/org/jboss/jms/server/connectionmanager/SimpleConnectionManager.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/connectionmanager/SimpleConnectionManager.java 2007-07-04 18:32:01 UTC (rev 2839)
+++ trunk/src/main/org/jboss/jms/server/connectionmanager/SimpleConnectionManager.java 2007-07-04 19:26:26 UTC (rev 2840)
@@ -35,6 +35,9 @@
import org.jboss.jms.delegate.ConnectionEndpoint;
import org.jboss.jms.server.ConnectionManager;
import org.jboss.logging.Logger;
+import org.jboss.messaging.core.contract.ClusterNotification;
+import org.jboss.messaging.core.contract.ClusterNotificationListener;
+import org.jboss.messaging.core.contract.Replicator;
import org.jboss.messaging.util.Util;
import org.jboss.remoting.Client;
import org.jboss.remoting.ClientDisconnectedException;
@@ -47,7 +50,7 @@
*
* $Id$
*/
-public class SimpleConnectionManager implements ConnectionManager, ConnectionListener
+public class SimpleConnectionManager implements ConnectionManager, ConnectionListener, ClusterNotificationListener
{
// Constants ------------------------------------------------------------------------------------
@@ -60,13 +63,15 @@
// Attributes -----------------------------------------------------------------------------------
// Map<jmsClientVMID<String> - Map<remotingClientSessionID<String> - ConnectionEndpoint>>
- protected Map jmsClients;
+ private Map jmsClients;
// Map<remotingClientSessionID<String> - jmsClientVMID<String>
- protected Map remotingSessions;
+ private Map remotingSessions;
// Set<ConnectionEndpoint>
- protected Set activeConnectionEndpoints;
+ private Set activeConnectionEndpoints;
+
+ private Replicator replicator;
// Constructors ---------------------------------------------------------------------------------
@@ -130,9 +135,7 @@
}
return null;
}
-
-
public synchronized List getActiveConnections()
{
// I will make a copy to avoid ConcurrentModification
@@ -140,7 +143,7 @@
list.addAll(activeConnectionEndpoints);
return list;
}
-
+
public synchronized void handleClientFailure(String remotingSessionID, boolean clientToServer)
{
String jmsClientID = (String)remotingSessions.get(remotingSessionID);
@@ -158,47 +161,7 @@
"its connection(s) or there is a network problem. All connection resources " +
"corresponding to that client process will now be removed.");
- // Remoting only provides one pinger per invoker, not per connection therefore when the pinger
- // dies we must close ALL connections corresponding to that jms client ID.
-
- // TODO (ovidiu) In case the error was detected while trying to send a callback, I am assuming
- // that the whole TCP/IP connection is hosed, so I close everything that's on it. Is this
- // OK? Maybe we want to be a little bit more selective.
-
- Map endpoints = (Map)jmsClients.get(jmsClientID);
-
- if (endpoints != null)
- {
- List sces = new ArrayList();
-
- for(Iterator i = endpoints.entrySet().iterator(); i.hasNext(); )
- {
- Map.Entry entry = (Map.Entry)i.next();
- ConnectionEndpoint sce = (ConnectionEndpoint)entry.getValue();
- sces.add(sce);
- }
-
- // Now close the end points - this will result in a callback into unregisterConnection
- // to remove the data from the jmsClients and sessions maps.
- // Note we do this outside the loop to prevent ConcurrentModificationException
-
- for(Iterator i = sces.iterator(); i.hasNext(); )
- {
- ConnectionEndpoint sce = (ConnectionEndpoint)i.next();
-
- try
- {
- log.debug("clearing up state for connection " + sce);
- sce.closing();
- sce.close();
- log.debug("cleared up state for connection " + sce);
- }
- catch (JMSException e)
- {
- log.error("Failed to close connection", e);
- }
- }
- }
+ closeConsumersForClientVMID(jmsClientID);
}
// ConnectionListener implementation ------------------------------------------------------------
@@ -229,6 +192,48 @@
}
}
+ // ClusterNotificationListener implementation --------------------------------------------------
+
+ public void notify(ClusterNotification notification)
+ {
+ if (notification.type == ClusterNotification.TYPE_FAILOVER_START)
+ {
+ try
+ {
+ //We remove any consumers with the same JVM ID as the node that just failed.
+ //This will remove any message suckers from the failed node.
+ //This is important to work around a remoting bug where sending messages down a broken connection
+ //can cause a deadlock with the bisocket transport.
+
+ //Get the jvm id for the failed node
+
+ Map ids = replicator.get(Replicator.JVM_ID_KEY);
+
+ if (ids == null)
+ {
+ throw new IllegalStateException("Cannot find jvmid map");
+ }
+
+ int failedNodeID = notification.nodeID;
+
+ String clientVMID = (String)ids.get(new Integer(failedNodeID));
+
+ if (clientVMID == null)
+ {
+ throw new IllegalStateException("Cannot find clientVMID for failed node " + failedNodeID);
+ }
+
+ //Close the consumers corresponding to that VM
+
+ closeConsumersForClientVMID(clientVMID);
+ }
+ catch (Exception e)
+ {
+ log.error("Failed to process failover start", e);
+ }
+ }
+ }
+
// MessagingComponent implementation ------------------------------------------------------------
public void start() throws Exception
@@ -258,6 +263,11 @@
{
return Collections.unmodifiableMap(jmsClients);
}
+
+ public void injectReplicator(Replicator replicator)
+ {
+ this.replicator = replicator;
+ }
public String toString()
@@ -271,8 +281,47 @@
// Private --------------------------------------------------------------------------------------
+ private synchronized void closeConsumersForClientVMID(String jmsClientID)
+ {
+ // Remoting only provides one pinger per invoker, not per connection; therefore, when the pinger
+ // dies, we must close ALL connections corresponding to that JMS client ID.
+ Map endpoints = (Map)jmsClients.get(jmsClientID);
+ if (endpoints != null)
+ {
+ List sces = new ArrayList();
+
+ for(Iterator i = endpoints.entrySet().iterator(); i.hasNext(); )
+ {
+ Map.Entry entry = (Map.Entry)i.next();
+ ConnectionEndpoint sce = (ConnectionEndpoint)entry.getValue();
+ sces.add(sce);
+ }
+
+ // Now close the end points - this will result in a callback into unregisterConnection
+ // to remove the data from the jmsClients and sessions maps.
+ // Note we do this outside the loop to prevent ConcurrentModificationException
+
+ for(Iterator i = sces.iterator(); i.hasNext(); )
+ {
+ ConnectionEndpoint sce = (ConnectionEndpoint)i.next();
+
+ try
+ {
+ log.debug("clearing up state for connection " + sce);
+ sce.closing();
+ sce.close();
+ log.debug("cleared up state for connection " + sce);
+ }
+ catch (JMSException e)
+ {
+ log.error("Failed to close connection", e);
+ }
+ }
+ }
+ }
+
// Inner classes --------------------------------------------------------------------------------
}
Modified: trunk/src/main/org/jboss/messaging/core/contract/Replicator.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/contract/Replicator.java 2007-07-04 18:32:01 UTC (rev 2839)
+++ trunk/src/main/org/jboss/messaging/core/contract/Replicator.java 2007-07-04 19:26:26 UTC (rev 2840)
@@ -45,6 +45,8 @@
public interface Replicator
{
public static final String CF_PREFIX = "CF_";
+
+ public static final String JVM_ID_KEY = "JVMID";
/**
* Broadcast data across the cluster, updating replication maps on all nodes, including the local
Modified: trunk/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java 2007-07-04 18:32:01 UTC (rev 2839)
+++ trunk/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java 2007-07-04 19:26:26 UTC (rev 2840)
@@ -47,6 +47,7 @@
import javax.sql.DataSource;
import javax.transaction.TransactionManager;
+import org.jboss.jms.client.container.JMSClientVMIdentifier;
import org.jboss.logging.Logger;
import org.jboss.messaging.core.contract.Binding;
import org.jboss.messaging.core.contract.ClusterNotification;
@@ -325,6 +326,9 @@
//calculate the failover map
calculateFailoverMap();
+ //add our vm identifier to the replicator
+ put(Replicator.JVM_ID_KEY, JMSClientVMIdentifier.instance);
+
groupMember.multicastControl(new JoinClusterRequest(thisNodeID, info), true);
}
Modified: trunk/tests/src/org/jboss/test/messaging/jms/clustering/HATest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/clustering/HATest.java 2007-07-04 18:32:01 UTC (rev 2839)
+++ trunk/tests/src/org/jboss/test/messaging/jms/clustering/HATest.java 2007-07-04 19:26:26 UTC (rev 2840)
@@ -25,18 +25,26 @@
import java.util.Map;
import java.util.Set;
-import javax.jms.*;
+import javax.jms.Connection;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import javax.jms.Topic;
import org.jboss.jms.client.JBossConnection;
import org.jboss.jms.client.JBossConnectionFactory;
import org.jboss.jms.client.JBossMessageConsumer;
import org.jboss.jms.client.JBossSession;
-import org.jboss.jms.client.remoting.JMSRemotingConnection;
+import org.jboss.jms.client.delegate.ClientClusteredConnectionFactoryDelegate;
import org.jboss.jms.client.delegate.ClientConnectionDelegate;
import org.jboss.jms.client.delegate.ClientConnectionFactoryDelegate;
-import org.jboss.jms.client.delegate.ClientClusteredConnectionFactoryDelegate;
-import org.jboss.jms.client.delegate.DelegateSupport;
import org.jboss.jms.client.delegate.ClientSessionDelegate;
+import org.jboss.jms.client.remoting.JMSRemotingConnection;
import org.jboss.jms.client.state.ConnectionState;
import org.jboss.jms.client.state.ConsumerState;
import org.jboss.jms.client.state.SessionState;
More information about the jboss-cvs-commits
mailing list