[jboss-cvs] JBoss Messaging SVN: r8227 - branches/JBM1842/src/main/org/jboss/messaging/core/impl/postoffice.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Wed Feb 16 10:39:18 EST 2011


Author: gaohoward
Date: 2011-02-16 10:39:17 -0500 (Wed, 16 Feb 2011)
New Revision: 8227

Modified:
   branches/JBM1842/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java
Log:
improvement: failover immediately for a normal node shutdown (if set so)


Modified: branches/JBM1842/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java
===================================================================
--- branches/JBM1842/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java	2011-02-15 17:38:16 UTC (rev 8226)
+++ branches/JBM1842/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java	2011-02-16 15:39:17 UTC (rev 8227)
@@ -1660,69 +1660,114 @@
             throw new IllegalStateException(this + " cannot find node ID for address " + addr);
          }
          
-         boolean crashed = failoverOnNodeLeave || !leaveMessageReceived(leftNodeID);
-   
+         boolean leaveReceived = leaveMessageReceived(leftNodeID);
+         
+         boolean crashed = failoverOnNodeLeave || !leaveReceived;
+         
          log.debug(this + ": node " + leftNodeID + " has " + (crashed ? "crashed" : "cleanly left the group"));
       
          Integer fnodeID = (Integer)oldFailoverMap.get(leftNodeID);
       
          log.debug(this + " the failover node for the crashed node is " + fnodeID);
          
-         //if its failover node is not in the new failoverMap, that means its failover node is quarantined with it
-         Integer ffNode = (Integer)failoverMap.get(fnodeID);
-         Integer currentFNode;
-         while (ffNode == null)
+         if (!leaveReceived)
          {
-            //Note this requires the failover map must form a circle! otherwise this may become an infinite loop!
-            currentFNode = fnodeID;
-            //search old map
-            fnodeID = (Integer)oldFailoverMap.get(currentFNode);
-            //then new map
-            ffNode = (Integer)failoverMap.get(fnodeID);
-         }
+            log.info("Node " + leftNodeID + " didn't left normally, put it to suspected list.");
+            
+            //if its failover node is not in the new failoverMap, that means its failover node is quarantined with it
+            Integer ffNode = (Integer)failoverMap.get(fnodeID);
+            Integer currentFNode;
+            while (ffNode == null)
+            {
+               //Note this requires the failover map must form a circle! otherwise this may become an infinite loop!
+               currentFNode = fnodeID;
+               //search old map
+               fnodeID = (Integer)oldFailoverMap.get(currentFNode);
+               //then new map
+               ffNode = (Integer)failoverMap.get(fnodeID);
+            }
          
-         //now the fnodeID is an active node
-         QuarantinedNode qNode = new QuarantinedNode(leftNodeID, fnodeID, crashed);
+            //now the fnodeID is an active node
+            QuarantinedNode qNode = new QuarantinedNode(leftNodeID, fnodeID, crashed);
          
-         //now we need take care of those already quarantined nodes
-         Iterator<Integer> iterQ = suspectedNodes.keySet().iterator();
-         while (iterQ.hasNext())
-         {
-            QuarantinedNode aNode = suspectedNodes.get(iterQ.next());
-            if (aNode.getFailover().equals(leftNodeID))
+            //now we need take care of those already quarantined nodes
+            Iterator<Integer> iterQ = suspectedNodes.keySet().iterator();
+            while (iterQ.hasNext())
             {
-               //We need to transfer the failover responsibility to the new active one
-               aNode.setFailover(fnodeID);
+               QuarantinedNode aNode = suspectedNodes.get(iterQ.next());
+               if (aNode.getFailover().equals(leftNodeID))
+               {
+                  //We need to transfer the failover responsibility to the new active one
+                  aNode.setFailover(fnodeID);
+               }
             }
-         }
          
-         suspectedNodes.put(leftNodeID, qNode);
+            suspectedNodes.put(leftNodeID, qNode);
 
-         if (isFirstNode() && (clusterState.liveNodeNum() > 1))
-         {
-            try
+            if (isFirstNode() && (clusterState.liveNodeNum() > 1))
             {
-               clusterState.updateNodeState(thisNodeID, STATE_QUARANTINED);
+               try
+               {
+                  clusterState.updateNodeState(thisNodeID, STATE_QUARANTINED);
+               }
+               catch (Exception e)
+               {
+                  log.error("Having problem update myself. Shutting down to avoid message duplicated delivery.");
+                  this.serverPeer.stopJBMNodeForRecovery();
+                  return;
+               }
             }
-            catch (Exception e)
+
+            if (trace) { log.trace("Quarantined node: " + qNode); }
+         }
+         else
+         {
+            log.info("Node " + leftNodeID + " left normally, clean up now.");
+            
+            boolean doneFailover = false;
+            
+            ClusterNotification notification = new ClusterNotification(ClusterNotification.TYPE_NODE_LEAVE, leftNodeID.intValue(), null);
+            
+            clusterNotifier.sendNotification(notification);
+            
+            if (crashed && isSupportsFailover())
+            {        
+               if (fnodeID == null)
+               {
+                  throw new IllegalStateException("Cannot find failover node for node " + leftNodeID);
+               }
+               
+               if (fnodeID.intValue() == thisNodeID)
+               {
+                  // The node crashed and we are the failover node so let's perform failover
+         
+                  log.debug(this + ": I am the failover node for node " + leftNodeID + " that crashed");
+         
+                  performFailover(leftNodeID);
+                  
+                  doneFailover = true;
+               }
+            }
+         
+            if (!doneFailover)
             {
-               log.error("Having problem update myself. Shutting down to avoid message duplicated delivery.");
-               this.serverPeer.stopJBMNodeForRecovery();
-               return;
+               // Remove any replicant data and non durable bindings for the node -  This will notify any listeners which will
+               // recalculate the connection factory delegates and failover delegates.
+         
+               cleanDataForNode(leftNodeID);
             }
          }
          
-         if (trace) { log.trace("Quarantined node: " + qNode); }
+         if (trace) {log.trace("First node: " + firstNode + " oldFailoverNodeID: " + oldFailoverNodeID + " failoverNodeID: " + failoverNodeID); }
+         
+         if (oldFailoverNodeID != failoverNodeID)
+         {
+            //Failover node for this node has changed
+            
+            failoverNodeChanged(oldFailoverNodeID, firstNode, false);         
+         }
       }
 
-      if (trace) {log.trace("First node: " + firstNode + " oldFailoverNodeID: " + oldFailoverNodeID + " failoverNodeID: " + failoverNodeID); }
-      
-      if (oldFailoverNodeID != failoverNodeID)
-      {
-         //Failover node for this node has changed
-         failoverNodeChanged(oldFailoverNodeID, firstNode, false);         
-      }
-      
       stateMonitor.newQuarantined();      
    }
 



More information about the jboss-cvs-commits mailing list