[jboss-cvs] JBoss Messaging SVN: r3239 - in trunk/src/main/org/jboss: messaging/core/impl/postoffice and 1 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Thu Oct 25 08:37:55 EDT 2007


Author: timfox
Date: 2007-10-25 08:37:55 -0400 (Thu, 25 Oct 2007)
New Revision: 3239

Modified:
   trunk/src/main/org/jboss/jms/server/connectionmanager/SimpleConnectionManager.java
   trunk/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java
   trunk/src/main/org/jboss/messaging/util/ClearableSemaphore.java
Log:
http://jira.jboss.com/jira/browse/JBMESSAGING-1116


Modified: trunk/src/main/org/jboss/jms/server/connectionmanager/SimpleConnectionManager.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/connectionmanager/SimpleConnectionManager.java	2007-10-21 23:29:00 UTC (rev 3238)
+++ trunk/src/main/org/jboss/jms/server/connectionmanager/SimpleConnectionManager.java	2007-10-25 12:37:55 UTC (rev 3239)
@@ -25,6 +25,7 @@
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -38,8 +39,8 @@
 import org.jboss.messaging.core.contract.ClusterNotification;
 import org.jboss.messaging.core.contract.ClusterNotificationListener;
 import org.jboss.messaging.core.contract.Replicator;
+import org.jboss.messaging.util.ConcurrentHashSet;
 import org.jboss.messaging.util.Util;
-import org.jboss.messaging.util.ConcurrentHashSet;
 import org.jboss.remoting.Client;
 import org.jboss.remoting.ClientDisconnectedException;
 import org.jboss.remoting.ConnectionListener;
@@ -70,7 +71,7 @@
    private Map<String, String> remotingSessions;
 
    // Set<ConnectionEndpoint>
-   private Set activeConnectionEndpoints;
+   private Set<ConnectionEndpoint> activeConnectionEndpoints;
 
    private Map</** CFUniqueName*/ String, ConnectionFactoryCallbackInformation> cfCallbackInfo;
    
@@ -80,14 +81,41 @@
 
    public SimpleConnectionManager()
    {
-      jmsClients = new HashMap();
-      remotingSessions = new HashMap();
-      activeConnectionEndpoints = new HashSet();
+      jmsClients = new HashMap<String, Map<String, ConnectionEndpoint>>();
+      remotingSessions = new HashMap<String, String>();
+      activeConnectionEndpoints = new HashSet<ConnectionEndpoint>();
       cfCallbackInfo = new ConcurrentHashMap<String, ConnectionFactoryCallbackInformation>();
    }
 
    // ConnectionManager implementation -------------------------------------------------------------
 
+   private void dump()
+   {
+   	log.debug("***********Dumping conn map");
+   	for (Iterator iter = jmsClients.entrySet().iterator(); iter.hasNext(); )
+   	{
+   		Map.Entry entry = (Map.Entry)iter.next();
+   		
+   		String jmsClientVMID = (String)entry.getKey();
+   		
+   		Map endpoints = (Map)entry.getValue();
+   		
+   		log.debug(jmsClientVMID + "----->");
+   		
+   		for (Iterator iter2 = endpoints.entrySet().iterator(); iter2.hasNext(); )
+      	{
+   			Map.Entry entry2 = (Map.Entry)iter2.next();
+   			
+   			String sessionID = (String)entry2.getKey();
+   			
+   			ConnectionEndpoint endpoint = (ConnectionEndpoint)entry2.getValue();
+   			
+   			log.debug("            " + sessionID + "------>" + System.identityHashCode(endpoint));
+      	}
+   	}
+   	log.debug("*** Dumped conn map");
+   }
+   
    public synchronized void registerConnection(String jmsClientVMID,
                                                String remotingClientSessionID,
                                                ConnectionEndpoint endpoint)
@@ -96,7 +124,8 @@
       
       if (endpoints == null)
       {
-         endpoints = new HashMap();
+         endpoints = new HashMap<String, ConnectionEndpoint>();
+         
          jmsClients.put(jmsClientVMID, endpoints);
       }
       
@@ -108,6 +137,8 @@
       
       log.debug("registered connection " + endpoint + " as " +
                 Util.guidToString(remotingClientSessionID));
+      
+      dump();
    }
 
    public synchronized ConnectionEndpoint unregisterConnection(String jmsClientVMId,
@@ -143,7 +174,7 @@
    public synchronized List getActiveConnections()
    {
       // I will make a copy to avoid ConcurrentModification
-      ArrayList list = new ArrayList();
+      List<ConnectionEndpoint> list = new ArrayList<ConnectionEndpoint>();
       list.addAll(activeConnectionEndpoints);
       return list;
    }
@@ -229,6 +260,8 @@
 
          log.trace("SimpleConnectionManager was notified about node leaving from node " +
                     notification.nodeID);
+         
+         dump();
          try
 			{
 				//We remove any consumers with the same JVMID as the node that just failed
@@ -249,7 +282,7 @@
 				int failedNodeID = notification.nodeID;
 				
 				String clientVMID = (String)ids.get(new Integer(failedNodeID));
-				
+					
 				if (clientVMID == null)
 				{
                log.error("Cannot find ClientVMID for failed node " + failedNodeID);
@@ -342,9 +375,9 @@
 
       if (endpoints != null)
       {
-         List<ConnectionEndpoint> sces = new ArrayList();
+         List<ConnectionEndpoint> sces = new ArrayList<ConnectionEndpoint>();
 
-         for(Map.Entry<String, ConnectionEndpoint> entry: endpoints.entrySet())
+         for (Map.Entry<String, ConnectionEndpoint> entry: endpoints.entrySet())
          {
             ConnectionEndpoint sce = entry.getValue();
             sces.add(sce);

Modified: trunk/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java	2007-10-21 23:29:00 UTC (rev 3238)
+++ trunk/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java	2007-10-25 12:37:55 UTC (rev 3239)
@@ -108,6 +108,8 @@
    
    public static final String FAILOVER_COMPLETED_NOTIFICATION = "FAILOVER_COMPLETED";
    
+   private static final long SEMAPHORE_ACQUIRE_TIMEOUT = 10000;
+   
    //End only used in testing
 
    // Static ---------------------------------------------------------------------------------------
@@ -351,8 +353,10 @@
 	      //calculate the failover map
 	      calculateFailoverMap();
 	      
+	      String clientVMId = JMSClientVMIdentifier.instance;
+	      
 	      //add our vm identifier to the replicator
-	      put(Replicator.JVM_ID_KEY, JMSClientVMIdentifier.instance);
+	      put(Replicator.JVM_ID_KEY, clientVMId);
 	      
 	      groupMember.multicastControl(new JoinClusterRequest(thisNodeID, info), true);
 	      
@@ -619,8 +623,15 @@
    	
    	if (reply)
    	{
-   		//replicateSemaphore.tryAcquire(250, TimeUnit.MILLISECONDS);
-   		replicateSemaphore.acquire();
+   		//We timeout to avoid locking the system in event of failure
+   		boolean ok = replicateSemaphore.tryAcquire(SEMAPHORE_ACQUIRE_TIMEOUT);
+   		
+   		if (!ok)
+   		{
+   			log.warn("Timed out trying to acquire replication semaphore");
+   			
+   			return;
+   		}
    	}
    	
    	try
@@ -886,7 +897,7 @@
    public void nodesLeft(List addresses) throws Throwable
    {
    	if (trace) { log.trace("Nodes left " + addresses.size()); }
-   	
+   	  	
    	checkStartReaper();
    	
    	Map oldFailoverMap = new HashMap(this.failoverMap);
@@ -898,6 +909,13 @@
    	calculateFailoverMap();
    	
       if (trace) { log.trace("First node is now " + firstNode); }
+      
+      if (firstNode)
+      {
+      	//If we are now the first node in the cluster then any outstanding replication requests will not get responses
+      	//so we must release these and we have no more need of a semaphore until another node joins
+      	replicateSemaphore.disable();
+      }
             
    	Iterator iter = addresses.iterator();
    	
@@ -1091,6 +1109,12 @@
    	
    	calculateFailoverMap();
    	
+   	if (wasFirstNode)
+   	{
+   		//If we were the first node but now another node has joined - we need to re-enable the semaphore
+   		replicateSemaphore.enable();
+   	}
+   	
    	//Note - when a node joins, we DO NOT send it replicated data - this is because it won't have deployed it's queues
    	//the data is requested by the new node when it deploys its queues      
    	
@@ -2780,8 +2804,6 @@
    	
    	if (trace) { log.trace("Failover node has changed from " + oldFailoverNodeID + " to " + failoverNodeID); }   	  	
    	
-   	replicateSemaphore.reset();
-   	
    	if (!firstNode)
    	{	   	
 	   	//If the old node still exists we need to send a message to remove any replicated deliveries

Modified: trunk/src/main/org/jboss/messaging/util/ClearableSemaphore.java
===================================================================
--- trunk/src/main/org/jboss/messaging/util/ClearableSemaphore.java	2007-10-21 23:29:00 UTC (rev 3238)
+++ trunk/src/main/org/jboss/messaging/util/ClearableSemaphore.java	2007-10-25 12:37:55 UTC (rev 3239)
@@ -1,6 +1,7 @@
 package org.jboss.messaging.util;
 
 import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
 
 import org.jboss.logging.Logger;
 
@@ -42,25 +43,47 @@
 		}
 	}
 	
+	public boolean tryAcquire(long timeout) throws InterruptedException
+	{
+		Semaphore sem = semaphore;
+		
+		if (sem != null)
+		{
+			return sem.tryAcquire(timeout, TimeUnit.MILLISECONDS);
+		}
+		else
+		{
+			return true;
+		}
+	}
+	
 	public void release()
 	{
 		Semaphore sem = semaphore;
 		
 		if (sem != null)
 		{
-			sem.release();
+			sem.release();			
 		}
 	}
 	
-	public synchronized void reset()
+	public synchronized void disable()
 	{
 		if (semaphore != null)
 		{
 			Semaphore oldSem = semaphore;
 			
-			createSemaphore();
+			semaphore = null;
 			
 			oldSem.release(permits);
 		}
 	}
+	
+	public synchronized void enable()
+	{
+		if (semaphore == null)
+		{
+			createSemaphore();
+		}
+	}
 }




More information about the jboss-cvs-commits mailing list