[jboss-cvs] JBoss Messaging SVN: r8175 - branches/JBM1842/src/main/org/jboss/messaging/core/impl/postoffice.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Wed Jan 19 10:34:31 EST 2011


Author: gaohoward
Date: 2011-01-19 10:34:30 -0500 (Wed, 19 Jan 2011)
New Revision: 8175

Modified:
   branches/JBM1842/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java
Log:
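save work: replace the "Move the code here!!!" placeholder exception with the node-leave handling code (recalculate the failover map, then perform failover or clean up data for each node that left)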


Modified: branches/JBM1842/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java
===================================================================
--- branches/JBM1842/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java	2011-01-19 15:12:20 UTC (rev 8174)
+++ branches/JBM1842/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java	2011-01-19 15:34:30 UTC (rev 8175)
@@ -1382,7 +1382,6 @@
       if (qNode == null) return;
       
       log.debug("A quarantined node " + qNode + " re-joined cluster.");
-
       
       //if I am the quarantined node, I do rejoin.
       if (clusterState.isQuarantined(thisNodeID))
@@ -1404,10 +1403,89 @@
    	}
    	else
    	{
-   	   //move old code here.
-   	   throw new IllegalStateException("Move the code here!!!");
+         Map oldFailoverMap = new HashMap(this.failoverMap);
+         
+         int oldFailoverNodeID = failoverNodeID;
+         
+         if (trace) { log.trace("Old failover node id: " + oldFailoverNodeID); }      
+         
+         calculateFailoverMap();
+         
+         if (trace) { log.trace("First node is now " + firstNode); }
+         
+         if (firstNode && this.useJGroupsWorkaround)
+         {
+            //If we are now the first node in the cluster then any outstanding replication requests will not get responses
+            //so we must release these and we have no more need of a semaphore until another node joins
+            replicateSemaphore.disable();
+         }
+               
+         Iterator iter = addresses.iterator();
+         
+         while (iter.hasNext())
+         {
+            Address address = (Address)iter.next();
+
+            log.debug(this + ": " + address + " left");
+
+            Integer leftNodeID = getNodeIDForSyncAddress(address);
+      
+            if (leftNodeID == null)
+            {
+               throw new IllegalStateException(this + " cannot find node ID for address " + address);
+            }
+            
+            boolean crashed = failoverOnNodeLeave || !leaveMessageReceived(leftNodeID);
+      
+            log.debug(this + ": node " + leftNodeID + " has " + (crashed ? "crashed" : "cleanly left the group"));
+         
+            Integer fnodeID = (Integer)oldFailoverMap.get(leftNodeID);
+         
+            log.debug(this + " the failover node for the crashed node is " + fnodeID);
+               
+            boolean doneFailover = false;
+            
+            ClusterNotification notification = new ClusterNotification(ClusterNotification.TYPE_NODE_LEAVE, leftNodeID.intValue(), null);
+            
+            clusterNotifier.sendNotification(notification);
+         
+            if (crashed && isSupportsFailover())
+            {        
+               if (fnodeID == null)
+               {
+                  throw new IllegalStateException("Cannot find failover node for node " + leftNodeID);
+               }
+               
+               if (fnodeID.intValue() == thisNodeID)
+               {
+                  // The node crashed and we are the failover node so let's perform failover
+         
+                  log.debug(this + ": I am the failover node for node " + leftNodeID + " that crashed");
+         
+                  performFailover(leftNodeID);
+                  
+                  doneFailover = true;
+               }
+            }
+         
+            if (!doneFailover)
+            {
+               // Remove any replicant data and non durable bindings for the node -  This will notify any listeners which will
+               // recalculate the connection factory delegates and failover delegates.
+         
+               cleanDataForNode(leftNodeID);
+            }
+         
+            if (trace) {log.trace("First node: " + firstNode + " oldFailoverNodeID: " + oldFailoverNodeID + " failoverNodeID: " + failoverNodeID); }
+            
+            if (oldFailoverNodeID != failoverNodeID)
+            {
+               //Failover node for this node has changed
+               
+               failoverNodeChanged(oldFailoverNodeID, firstNode, false);         
+            }
+         }
    	}
-	      
 	   sendJMXNotification(VIEW_CHANGED_NOTIFICATION);
    }
    



More information about the jboss-cvs-commits mailing list