[jboss-cvs] JBoss Messaging SVN: r3014 - in trunk/src/main/org/jboss: messaging/core/impl/postoffice and 1 other directory.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Wed Aug 15 11:25:52 EDT 2007


Author: timfox
Date: 2007-08-15 11:25:52 -0400 (Wed, 15 Aug 2007)
New Revision: 3014

Modified:
   trunk/src/main/org/jboss/jms/server/bridge/Bridge.java
   trunk/src/main/org/jboss/messaging/core/impl/postoffice/GroupListener.java
   trunk/src/main/org/jboss/messaging/core/impl/postoffice/GroupMember.java
   trunk/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java
Log:
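Clean up GroupMember: report all departed nodes to the GroupListener in a single nodesLeft(List) call instead of one nodeLeft(Address) call per node.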


Modified: trunk/src/main/org/jboss/jms/server/bridge/Bridge.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/bridge/Bridge.java	2007-08-15 12:08:02 UTC (rev 3013)
+++ trunk/src/main/org/jboss/jms/server/bridge/Bridge.java	2007-08-15 15:25:52 UTC (rev 3014)
@@ -48,7 +48,6 @@
 import org.jboss.logging.Logger;
 import org.jboss.messaging.core.contract.MessagingComponent;
 import org.jboss.tm.TransactionManagerLocator;
-import org.jboss.tm.TxManager;
 
 /**
  * 
@@ -808,11 +807,9 @@
          }
          
          //Sanity check
-         if (tm instanceof TxManager)
+         if (!(tm instanceof com.arjuna.ats.internal.jta.transaction.arjunacore.TransactionManagerImple))
          {
-            log.warn("WARNING! The old JBoss transaction manager is being used. " +
-                     "This does not have XA transaction recovery functionality. " +
-                     "For XA transaction recovery please deploy the JBoss Transactions JTA/JTS implementation.");
+            log.warn("WARNING! The JBoss Transactions JTA transaction manager is not be used!");
          }
       }
       

Modified: trunk/src/main/org/jboss/messaging/core/impl/postoffice/GroupListener.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/impl/postoffice/GroupListener.java	2007-08-15 12:08:02 UTC (rev 3013)
+++ trunk/src/main/org/jboss/messaging/core/impl/postoffice/GroupListener.java	2007-08-15 15:25:52 UTC (rev 3014)
@@ -21,6 +21,8 @@
  */
 package org.jboss.messaging.core.impl.postoffice;
 
+import java.util.List;
+
 import org.jgroups.Address;
 
 /**
@@ -33,7 +35,7 @@
  */
 interface GroupListener
 {
-	void nodeLeft(Address address) throws Throwable;
+	void nodesLeft(List addresses) throws Throwable;
 	
 	void nodeJoined(Address address) throws Exception;
 	

Modified: trunk/src/main/org/jboss/messaging/core/impl/postoffice/GroupMember.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/impl/postoffice/GroupMember.java	2007-08-15 12:08:02 UTC (rev 3013)
+++ trunk/src/main/org/jboss/messaging/core/impl/postoffice/GroupMember.java	2007-08-15 15:25:52 UTC (rev 3014)
@@ -25,7 +25,9 @@
 import java.io.ByteArrayOutputStream;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
+import java.util.ArrayList;
 import java.util.Iterator;
+import java.util.List;
 
 import org.jboss.logging.Logger;
 import org.jboss.messaging.core.contract.JChannelFactory;
@@ -446,15 +448,19 @@
 
             if (oldView != null)
             {
+            	List leftNodes = new ArrayList();
                for (Iterator i = oldView.getMembers().iterator(); i.hasNext(); )
                {
                   Address address = (Address)i.next();
                   if (!newView.containsMember(address))
                   {
-                     // this is where the failover happens, if necessary
-                     groupListener.nodeLeft(address);
+                  	leftNodes.add(address);
                   }
                }
+               if (!leftNodes.isEmpty())
+               {
+               	groupListener.nodesLeft(leftNodes);
+               }
             }
 
             for (Iterator i = newView.getMembers().iterator(); i.hasNext(); )

Modified: trunk/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java	2007-08-15 12:08:02 UTC (rev 3013)
+++ trunk/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java	2007-08-15 15:25:52 UTC (rev 3014)
@@ -841,85 +841,88 @@
       
       // Currently does nothing
    }
-
-   /*
-    * A node has left the group
-    */
-   public void nodeLeft(Address address) throws Throwable
+   
+   public void nodesLeft(List addresses) throws Throwable
    {
-      log.debug(this + ": " + address + " left");
+   	if (trace) { log.trace("Nodes left " + addresses.size()); }
+   	
+   	Map oldFailoverMap = new HashMap(this.failoverMap);
+   	
+   	int oldFailoverNodeID = failoverNodeID;
+   	
+      if (trace) { log.trace("Old failover node id: " + oldFailoverNodeID); }      
+   	
+   	calculateFailoverMap();
+   	
+      if (trace) { log.trace("First node is now " + firstNode); }
+            
+   	Iterator iter = addresses.iterator();
+   	
+   	while (iter.hasNext())
+   	{
+   		Address address = (Address)iter.next();
 
-      Integer leftNodeID = getNodeIDForSyncAddress(address);
+         log.debug(this + ": " + address + " left");
 
-      if (leftNodeID == null)
-      {
-         throw new IllegalStateException(this + " cannot find node ID for address " + address);
-      }
-
-      boolean crashed = !leaveMessageReceived(leftNodeID);
-
-      log.debug(this + ": node " + leftNodeID + " has " + (crashed ? "crashed" : "cleanly left the group"));
+	      Integer leftNodeID = getNodeIDForSyncAddress(address);
+	
+	      if (leftNodeID == null)
+	      {
+	         throw new IllegalStateException(this + " cannot find node ID for address " + address);
+	      }
+	
+	      boolean crashed = !leaveMessageReceived(leftNodeID);
+	
+	      log.debug(this + ": node " + leftNodeID + " has " + (crashed ? "crashed" : "cleanly left the group"));
       
-      // Need to evaluate this before we regenerate the failover map	      
+	      Integer fnodeID = (Integer)oldFailoverMap.get(leftNodeID);
       
-      Integer fnodeID = (Integer)failoverMap.get(leftNodeID);
-      
-      log.debug(this + " the failover node for the crashed node is " + fnodeID);
+         log.debug(this + " the failover node for the crashed node is " + fnodeID);
 	         
-      //Recalculate the failover map
+	      boolean doneFailover = false;
+	      
+	      ClusterNotification notification = new ClusterNotification(ClusterNotification.TYPE_NODE_LEAVE, leftNodeID.intValue(), null);
+	      
+	      clusterNotifier.sendNotification(notification);
       
-      int oldFailoverNodeID = failoverNodeID;
+	      if (crashed && isSupportsFailover())
+	      {	      
+		      if (fnodeID == null)
+		      {
+		      	throw new IllegalStateException("Cannot find failover node for node " + leftNodeID);
+		      }
+		      
+		      if (fnodeID.intValue() == thisNodeID)
+		      {
+		         // The node crashed and we are the failover node so let's perform failover
+		
+		         log.debug(this + ": I am the failover node for node " + leftNodeID + " that crashed");
+		
+		         performFailover(leftNodeID);
+		         
+		         doneFailover = true;
+		      }
+	      }
       
-      if (trace) { log.trace("Old failover node id: " + oldFailoverNodeID); }
-      
-      calculateFailoverMap();
-      
-      if (trace) { log.trace("First node is now " + firstNode); }
-      
-      boolean doneFailover = false;
-      
-      ClusterNotification notification = new ClusterNotification(ClusterNotification.TYPE_NODE_LEAVE, leftNodeID.intValue(), null);
-      
-      clusterNotifier.sendNotification(notification);
-      
-      if (crashed && isSupportsFailover())
-      {	      
-
-	      if (fnodeID == null)
+	      if (!doneFailover)
 	      {
-	      	throw new IllegalStateException("Cannot find failover node for node " + leftNodeID);
+		      // Remove any replicant data and non durable bindings for the node -  This will notify any listeners which will
+		      // recalculate the connection factory delegates and failover delegates.
+		
+		      cleanDataForNode(leftNodeID);
 	      }
+      
+	      if (trace) {log.trace("First node: " + firstNode + " oldFailoverNodeID: " + oldFailoverNodeID + " failoverNodeID: " + failoverNodeID); }
 	      
-	      if (fnodeID.intValue() == thisNodeID)
+	      if (oldFailoverNodeID != failoverNodeID)
 	      {
-	         // The node crashed and we are the failover node so let's perform failover
-	
-	         log.debug(this + ": I am the failover node for node " + leftNodeID + " that crashed");
-	
-	         performFailover(leftNodeID);
-	         
-	         doneFailover = true;
+	      	//Failover node for this node has changed
+	      	
+	      	failoverNodeChanged(oldFailoverNodeID, firstNode, false);      	
 	      }
-      }
-      
-      if (!doneFailover)
-      {
-	      // Remove any replicant data and non durable bindings for the node -  This will notify any listeners which will
-	      // recalculate the connection factory delegates and failover delegates.
-	
-	      cleanDataForNode(leftNodeID);
-      }
-      
-      if (trace) {log.trace("First node: " + firstNode + " oldFailoverNodeID: " + oldFailoverNodeID + " failoverNodeID: " + failoverNodeID); }
-      
-      if (oldFailoverNodeID != failoverNodeID)
-      {
-      	//Failover node for this node has changed
-      	
-      	failoverNodeChanged(oldFailoverNodeID, firstNode, false);      	
-      }
-      
-      sendJMXNotification(VIEW_CHANGED_NOTIFICATION);
+   	}
+	      
+	   sendJMXNotification(VIEW_CHANGED_NOTIFICATION);
    }
    
    // RequestTarget implementation ------------------------------------------------------------
@@ -1855,66 +1858,63 @@
       return removed;
    }
    
-   private void calculateFailoverMap()
+   private synchronized void calculateFailoverMap()
    {
-   	synchronized (failoverMap)
+   	failoverMap.clear();
+   	
+   	View view = groupMember.getCurrentView();
+   	
+   	Vector members = view.getMembers();
+   	
+   	for (int i = 0; i < members.size(); i++)
    	{
-	   	failoverMap.clear();
-	   	
-	   	View view = groupMember.getCurrentView();
-	   	
-	   	Vector members = view.getMembers();
-	   	
-	   	for (int i = 0; i < members.size(); i++)
-	   	{
-	   		Address address = (Address)members.get(i);
-	   		
-	   		Integer theNodeID = findNodeIDForAddress(address);
-	   		
-	   		if (theNodeID == null)
-	   		{
-	   			throw new IllegalStateException("Cannot find node id for address " + address);
-	   		}
-	   		
-	   		int j;
-	   		
-	   		if (i != members.size() - 1)
-	   		{
-	   			j = i + 1;
-	   		}
-	   		else
-	   		{
-	   			j = 0;
-	   		}
-	   		
-	   		Address failoverAddress = (Address)members.get(j);
-	   		
-	   		Integer failoverNodeID = this.findNodeIDForAddress(failoverAddress);
-	   		
-	   		if (failoverNodeID == null)
-	   		{
-	   			throw new IllegalStateException("Cannot find node id for address " + failoverAddress);
-	   		}
-	   		
-	   		failoverMap.put(theNodeID, failoverNodeID);	   			   		   
-	   	}   	
-	   	
-	   	int fid = ((Integer)failoverMap.get(new Integer(thisNodeID))).intValue();
-	   	
-	   	//if we are the first node in the cluster we don't want to be our own failover node!
-	   	
-	   	if (fid == thisNodeID)
-	   	{
-	   		firstNode = true;
-	   		failoverNodeID = -1;
-	   	}
-	   	else
-	   	{
-	   		failoverNodeID = fid;
-	   		firstNode = false;	   		
-	   	}	   	
-   	}   
+   		Address address = (Address)members.get(i);
+   		
+   		Integer theNodeID = findNodeIDForAddress(address);
+   		
+   		if (theNodeID == null)
+   		{
+   			throw new IllegalStateException("Cannot find node id for address " + address);
+   		}
+   		
+   		int j;
+   		
+   		if (i != members.size() - 1)
+   		{
+   			j = i + 1;
+   		}
+   		else
+   		{
+   			j = 0;
+   		}
+   		
+   		Address failoverAddress = (Address)members.get(j);
+   		
+   		Integer failoverNodeID = this.findNodeIDForAddress(failoverAddress);
+   		
+   		if (failoverNodeID == null)
+   		{
+   			throw new IllegalStateException("Cannot find node id for address " + failoverAddress);
+   		}
+   		
+   		failoverMap.put(theNodeID, failoverNodeID);	   			   		   
+   	}   	
    	
+   	int fid = ((Integer)failoverMap.get(new Integer(thisNodeID))).intValue();
+   	
+   	//if we are the first node in the cluster we don't want to be our own failover node!
+   	
+   	if (fid == thisNodeID)
+   	{
+   		firstNode = true;
+   		failoverNodeID = -1;
+   	}
+   	else
+   	{
+   		failoverNodeID = fid;
+   		firstNode = false;	   		
+   	}	   	   	 
+   	
       log.debug("Updated failover map:\n" + dumpFailoverMap(failoverMap));   	      
    }
    




More information about the jboss-cvs-commits mailing list