[jboss-cvs] JBoss Messaging SVN: r2840 - in trunk: src/main/org/jboss/jms/server/connectionmanager and 3 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Wed Jul 4 15:26:26 EDT 2007


Author: timfox
Date: 2007-07-04 15:26:26 -0400 (Wed, 04 Jul 2007)
New Revision: 2840

Modified:
   trunk/src/main/org/jboss/jms/server/ServerPeer.java
   trunk/src/main/org/jboss/jms/server/connectionmanager/SimpleConnectionManager.java
   trunk/src/main/org/jboss/messaging/core/contract/Replicator.java
   trunk/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java
   trunk/tests/src/org/jboss/test/messaging/jms/clustering/HATest.java
Log:
Fixed remoting deadlock issue


Modified: trunk/src/main/org/jboss/jms/server/ServerPeer.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/ServerPeer.java	2007-07-04 18:32:01 UTC (rev 2839)
+++ trunk/src/main/org/jboss/jms/server/ServerPeer.java	2007-07-04 19:26:26 UTC (rev 2840)
@@ -150,7 +150,7 @@
    private SecurityMetadataStore securityStore;
    private ConnectionFactoryJNDIMapper connFactoryJNDIMapper;
    private TransactionRepository txRepository;
-   private ConnectionManager connectionManager;
+   private SimpleConnectionManager connectionManager;
    private ConnectorManager connectorManager;
    private IDManager messageIDManager;
    private IDManager channelIDManager;
@@ -1230,6 +1230,8 @@
             this.clusterConnectionManager.injectPostOffice(postOffice);
             
             this.clusterConnectionManager.injectReplicator((Replicator)postOffice);
+            
+            this.connectionManager.injectReplicator((Replicator)postOffice);
          }
          
          // Also need to inject into txRepository

Modified: trunk/src/main/org/jboss/jms/server/connectionmanager/SimpleConnectionManager.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/connectionmanager/SimpleConnectionManager.java	2007-07-04 18:32:01 UTC (rev 2839)
+++ trunk/src/main/org/jboss/jms/server/connectionmanager/SimpleConnectionManager.java	2007-07-04 19:26:26 UTC (rev 2840)
@@ -35,6 +35,9 @@
 import org.jboss.jms.delegate.ConnectionEndpoint;
 import org.jboss.jms.server.ConnectionManager;
 import org.jboss.logging.Logger;
+import org.jboss.messaging.core.contract.ClusterNotification;
+import org.jboss.messaging.core.contract.ClusterNotificationListener;
+import org.jboss.messaging.core.contract.Replicator;
 import org.jboss.messaging.util.Util;
 import org.jboss.remoting.Client;
 import org.jboss.remoting.ClientDisconnectedException;
@@ -47,7 +50,7 @@
  *
  * $Id$
  */
-public class SimpleConnectionManager implements ConnectionManager, ConnectionListener
+public class SimpleConnectionManager implements ConnectionManager, ConnectionListener, ClusterNotificationListener
 {
    // Constants ------------------------------------------------------------------------------------
 
@@ -60,13 +63,15 @@
    // Attributes -----------------------------------------------------------------------------------
 
    // Map<jmsClientVMID<String> - Map<remotingClientSessionID<String> - ConnectionEndpoint>>
-   protected Map jmsClients;
+   private Map jmsClients;
 
    // Map<remotingClientSessionID<String> - jmsClientVMID<String>
-   protected Map remotingSessions;
+   private Map remotingSessions;
 
    // Set<ConnectionEndpoint>
-   protected Set activeConnectionEndpoints;
+   private Set activeConnectionEndpoints;
+   
+   private Replicator replicator;
 
    // Constructors ---------------------------------------------------------------------------------
 
@@ -130,9 +135,7 @@
       }
       return null;
    }
-
    
-
    public synchronized List getActiveConnections()
    {
       // I will make a copy to avoid ConcurrentModification
@@ -140,7 +143,7 @@
       list.addAll(activeConnectionEndpoints);
       return list;
    }
-   
+      
    public synchronized void handleClientFailure(String remotingSessionID, boolean clientToServer)
    {
       String jmsClientID = (String)remotingSessions.get(remotingSessionID);
@@ -158,47 +161,7 @@
          "its connection(s) or there is a network problem. All connection resources " +
          "corresponding to that client process will now be removed.");
 
-      // Remoting only provides one pinger per invoker, not per connection therefore when the pinger
-      // dies we must close ALL connections corresponding to that jms client ID.
-
-      // TODO (ovidiu) In case the error was detected while trying to send a callback, I am assuming
-      //      that the whole TCP/IP connection is hosed, so I close everything that's on it. Is this
-      //      OK? Maybe we want to be a little bit more selective.
-
-      Map endpoints = (Map)jmsClients.get(jmsClientID);
-
-      if (endpoints != null)
-      {
-         List sces = new ArrayList();
-
-         for(Iterator i = endpoints.entrySet().iterator(); i.hasNext(); )
-         {
-            Map.Entry entry = (Map.Entry)i.next();
-            ConnectionEndpoint sce = (ConnectionEndpoint)entry.getValue();
-            sces.add(sce);
-         }
-
-         // Now close the end points - this will result in a callback into unregisterConnection
-         // to remove the data from the jmsClients and sessions maps.
-         // Note we do this outside the loop to prevent ConcurrentModificationException
-
-         for(Iterator i = sces.iterator(); i.hasNext(); )
-         {
-            ConnectionEndpoint sce = (ConnectionEndpoint)i.next();
-
-            try
-            {
-      			log.debug("clearing up state for connection " + sce);
-               sce.closing();
-               sce.close();
-               log.debug("cleared up state for connection " + sce);
-            }
-            catch (JMSException e)
-            {
-               log.error("Failed to close connection", e);
-            }          
-         }
-      }
+      closeConsumersForClientVMID(jmsClientID);
    }
    
    // ConnectionListener implementation ------------------------------------------------------------
@@ -229,6 +192,48 @@
       }
    }
    
+   // ClusterNotificationListener implementation ---------------------------------------------------
+   
+   public void notify(ClusterNotification notification)
+	{
+		if (notification.type == ClusterNotification.TYPE_FAILOVER_START)
+		{
+			try
+			{
+				//We remove any consumers with the same JVM ID as the node that just failed.
+				//This removes any message suckers from the failed node.
+				//This is important to work around a remoting bug where sending messages down a broken connection
+				//can cause a deadlock with the bisocket transport.
+				
+				//Get the jvm id for the failed node
+				
+				Map ids = replicator.get(Replicator.JVM_ID_KEY);
+				
+				if (ids == null)
+				{
+					throw new IllegalStateException("Cannot find jvmid map");
+				}
+				
+				int failedNodeID = notification.nodeID;
+				
+				String clientVMID = (String)ids.get(new Integer(failedNodeID));
+				
+				if (clientVMID == null)
+				{
+					throw new IllegalStateException("Cannot find clientVMID for failed node " + failedNodeID);
+				}
+				
+				//Close the consumers corresponding to that VM
+				
+				closeConsumersForClientVMID(clientVMID);
+			}
+			catch (Exception e)
+			{
+				log.error("Failed to process failover start", e);
+			}
+		}		
+	}
+   
    // MessagingComponent implementation ------------------------------------------------------------
    
    public void start() throws Exception
@@ -258,6 +263,11 @@
    {
       return Collections.unmodifiableMap(jmsClients);
    }
+   
+   public void injectReplicator(Replicator replicator)
+   {
+   	this.replicator = replicator;
+   }
 
 
    public String toString()
@@ -271,8 +281,47 @@
 
    // Private --------------------------------------------------------------------------------------
    
+   private synchronized void closeConsumersForClientVMID(String jmsClientID)
+   {
+   	// Remoting only provides one pinger per invoker, not per connection; therefore, when the pinger
+      // dies we must close ALL connections corresponding to that JMS client ID.
 
+      Map endpoints = (Map)jmsClients.get(jmsClientID);
 
+      if (endpoints != null)
+      {
+         List sces = new ArrayList();
+
+         for(Iterator i = endpoints.entrySet().iterator(); i.hasNext(); )
+         {
+            Map.Entry entry = (Map.Entry)i.next();
+            ConnectionEndpoint sce = (ConnectionEndpoint)entry.getValue();
+            sces.add(sce);
+         }
+
+         // Now close the end points - this will result in a callback into unregisterConnection
+         // to remove the data from the jmsClients and sessions maps.
+         // Note we do this outside the loop to prevent ConcurrentModificationException
+
+         for(Iterator i = sces.iterator(); i.hasNext(); )
+         {
+            ConnectionEndpoint sce = (ConnectionEndpoint)i.next();
+
+            try
+            {
+      			log.debug("clearing up state for connection " + sce);
+               sce.closing();
+               sce.close();
+               log.debug("cleared up state for connection " + sce);
+            }
+            catch (JMSException e)
+            {
+               log.error("Failed to close connection", e);
+            }          
+         }
+      }
+   }
+
    // Inner classes --------------------------------------------------------------------------------
 
 }

Modified: trunk/src/main/org/jboss/messaging/core/contract/Replicator.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/contract/Replicator.java	2007-07-04 18:32:01 UTC (rev 2839)
+++ trunk/src/main/org/jboss/messaging/core/contract/Replicator.java	2007-07-04 19:26:26 UTC (rev 2840)
@@ -45,6 +45,8 @@
 public interface Replicator
 {
    public static final String CF_PREFIX = "CF_";
+   
+   public static final String JVM_ID_KEY = "JVMID";
  
    /**
     * Broadcast data across the cluster, updating replication maps on all nodes, including the local

Modified: trunk/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java	2007-07-04 18:32:01 UTC (rev 2839)
+++ trunk/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java	2007-07-04 19:26:26 UTC (rev 2840)
@@ -47,6 +47,7 @@
 import javax.sql.DataSource;
 import javax.transaction.TransactionManager;
 
+import org.jboss.jms.client.container.JMSClientVMIdentifier;
 import org.jboss.logging.Logger;
 import org.jboss.messaging.core.contract.Binding;
 import org.jboss.messaging.core.contract.ClusterNotification;
@@ -325,6 +326,9 @@
 	      //calculate the failover map
 	      calculateFailoverMap();
 	      
+	      //add our vm identifier to the replicator
+	      put(Replicator.JVM_ID_KEY, JMSClientVMIdentifier.instance);
+	      
 	      groupMember.multicastControl(new JoinClusterRequest(thisNodeID, info), true);
       }
    

Modified: trunk/tests/src/org/jboss/test/messaging/jms/clustering/HATest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/clustering/HATest.java	2007-07-04 18:32:01 UTC (rev 2839)
+++ trunk/tests/src/org/jboss/test/messaging/jms/clustering/HATest.java	2007-07-04 19:26:26 UTC (rev 2840)
@@ -25,18 +25,26 @@
 import java.util.Map;
 import java.util.Set;
 
-import javax.jms.*;
+import javax.jms.Connection;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import javax.jms.Topic;
 
 import org.jboss.jms.client.JBossConnection;
 import org.jboss.jms.client.JBossConnectionFactory;
 import org.jboss.jms.client.JBossMessageConsumer;
 import org.jboss.jms.client.JBossSession;
-import org.jboss.jms.client.remoting.JMSRemotingConnection;
+import org.jboss.jms.client.delegate.ClientClusteredConnectionFactoryDelegate;
 import org.jboss.jms.client.delegate.ClientConnectionDelegate;
 import org.jboss.jms.client.delegate.ClientConnectionFactoryDelegate;
-import org.jboss.jms.client.delegate.ClientClusteredConnectionFactoryDelegate;
-import org.jboss.jms.client.delegate.DelegateSupport;
 import org.jboss.jms.client.delegate.ClientSessionDelegate;
+import org.jboss.jms.client.remoting.JMSRemotingConnection;
 import org.jboss.jms.client.state.ConnectionState;
 import org.jboss.jms.client.state.ConsumerState;
 import org.jboss.jms.client.state.SessionState;




More information about the jboss-cvs-commits mailing list