[jboss-cvs] JBoss Messaging SVN: r8228 - branches/port1842/src/main/org/jboss/messaging/core/impl/postoffice.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Wed Feb 16 10:40:26 EST 2011


Author: gaohoward
Date: 2011-02-16 10:40:25 -0500 (Wed, 16 Feb 2011)
New Revision: 8228

Modified:
   branches/port1842/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java
Log:
improvement: clean up immediately when a node shuts down normally


Modified: branches/port1842/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java
===================================================================
--- branches/port1842/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java	2011-02-16 15:39:17 UTC (rev 8227)
+++ branches/port1842/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java	2011-02-16 15:40:25 UTC (rev 8228)
@@ -1587,17 +1587,11 @@
 
       int oldFailoverNodeID = failoverNodeID;
 
-      if (trace)
-      {
-         log.trace("Old failover node id: " + oldFailoverNodeID);
-      }
+      if (trace) { log.trace("Old failover node id: " + oldFailoverNodeID); }
 
       calculateFailoverMap();
 
-      if (trace)
-      {
-         log.trace("First node is now " + firstNode);
-      }
+      if (trace) { log.trace("First node is now " + firstNode); }
 
       if (firstNode && this.useJGroupsWorkaround)
       {
@@ -1607,6 +1601,7 @@
       }
 
       Iterator iter = addresses.iterator();
+      
       while (iter.hasNext())
       {
          Address addr = (Address)iter.next();
@@ -1618,7 +1613,9 @@
             throw new IllegalStateException(this + " cannot find node ID for address " + addr);
          }
 
-         boolean crashed = failoverOnNodeLeave || !leaveMessageReceived(leftNodeID);
+         boolean leaveReceived = leaveMessageReceived(leftNodeID);
+         
+         boolean crashed = failoverOnNodeLeave || !leaveReceived;
 
          log.debug(this + ": node " + leftNodeID + " has " + (crashed ? "crashed" : "cleanly left the group"));
 
@@ -1626,70 +1623,104 @@
 
          log.debug(this + " the failover node for the crashed node is " + fnodeID);
 
-         // if its failover node is not in the new failoverMap, that means its failover node is quarantined with it
-         Integer ffNode = (Integer)failoverMap.get(fnodeID);
-         Integer currentFNode;
-         while (ffNode == null)
+         if (!leaveReceived)
          {
-            // Note this requires the failover map must form a circle! otherwise this may become an infinite loop!
-            currentFNode = fnodeID;
-            // search old map
-            fnodeID = (Integer)oldFailoverMap.get(currentFNode);
-            // then new map
-            ffNode = (Integer)failoverMap.get(fnodeID);
-         }
+            log.info("Node " + leftNodeID + " didn't left normally, put it to suspected list.");
 
-         // now the fnodeID is an active node
-         QuarantinedNode qNode = new QuarantinedNode(leftNodeID, fnodeID, crashed);
+            // if its failover node is not in the new failoverMap, that means its failover node is quarantined with it
+            Integer ffNode = (Integer)failoverMap.get(fnodeID);
+            Integer currentFNode;
+            while (ffNode == null)
+            {
+               // Note this requires the failover map must form a circle! otherwise this may become an infinite loop!
+               currentFNode = fnodeID;
+               // search old map
+               fnodeID = (Integer)oldFailoverMap.get(currentFNode);
+               // then new map
+               ffNode = (Integer)failoverMap.get(fnodeID);
+            }
 
-         // now we need take care of those already quarantined nodes
-         Iterator<Integer> iterQ = suspectedNodes.keySet().iterator();
-         while (iterQ.hasNext())
-         {
-            QuarantinedNode aNode = suspectedNodes.get(iterQ.next());
-            if (aNode.getFailover().equals(leftNodeID))
+            // now the fnodeID is an active node
+            QuarantinedNode qNode = new QuarantinedNode(leftNodeID, fnodeID, crashed);
+
+            // now we need take care of those already quarantined nodes
+            Iterator<Integer> iterQ = suspectedNodes.keySet().iterator();
+            while (iterQ.hasNext())
             {
-               // We need to transfer the failover responsibility to the new active one
-               aNode.setFailover(fnodeID);
+               QuarantinedNode aNode = suspectedNodes.get(iterQ.next());
+               if (aNode.getFailover().equals(leftNodeID))
+               {
+                  // We need to transfer the failover responsibility to the new active one
+                  aNode.setFailover(fnodeID);
+               }
             }
+
+            suspectedNodes.put(leftNodeID, qNode);
+
+            if (isFirstNode() && (clusterState.liveNodeNum() > 1))
+            {
+               try
+               {
+                  clusterState.updateNodeState(thisNodeID, STATE_QUARANTINED);
+               }
+               catch (Exception e)
+               {
+                  log.error("Having problem update myself. Shutting down to avoid message duplicated delivery.");
+                  this.serverPeer.stopJBMNodeForRecovery();
+                  return;
+               }
+            }
+
+            if (trace) { log.trace("Quarantined node: " + qNode); }
          }
+         else
+         {
+            log.info("Node " + leftNodeID + " left normally, clean up now.");
 
-         suspectedNodes.put(leftNodeID, qNode);
+            boolean doneFailover = false;
 
-         if (isFirstNode() && (clusterState.liveNodeNum() > 1))
-         {
-            try
+            ClusterNotification notification = new ClusterNotification(ClusterNotification.TYPE_NODE_LEAVE, leftNodeID.intValue(), null);
+
+            clusterNotifier.sendNotification(notification);
+
+            if (crashed && isSupportsFailover())
             {
-               clusterState.updateNodeState(thisNodeID, STATE_QUARANTINED);
+               if (fnodeID == null)
+               {
+                  throw new IllegalStateException("Cannot find failover node for node " + leftNodeID);
+               }
+
+               if (fnodeID.intValue() == thisNodeID)
+               {
+                  // The node crashed and we are the failover node so let's perform failover
+
+                  log.debug(this + ": I am the failover node for node " + leftNodeID + " that crashed");
+
+                  performFailover(leftNodeID);
+
+                  doneFailover = true;
+               }
             }
-            catch (Exception e)
+
+            if (!doneFailover)
             {
-               log.error("Having problem update myself. Shutting down to avoid message duplicated delivery.");
-               this.serverPeer.stopJBMNodeForRecovery();
-               return;
+               // Remove any replicant data and non durable bindings for the node - This will notify any listeners which will
+               // recalculate the connection factory delegates and failover delegates.
+
+               cleanDataForNode(leftNodeID);
             }
          }
 
-         if (trace)
+         if (trace) { log.trace("First node: " + firstNode + " oldFailoverNodeID: " + oldFailoverNodeID + " failoverNodeID: " + failoverNodeID); }
+
+         if (oldFailoverNodeID != failoverNodeID)
          {
-            log.trace("Quarantined node: " + qNode);
+            // Failover node for this node has changed
+
+            failoverNodeChanged(oldFailoverNodeID, firstNode, false);
          }
       }
 
-      if (trace)
-      {
-         log.trace("First node: " + firstNode +
-                   " oldFailoverNodeID: " +
-                   oldFailoverNodeID +
-                   " failoverNodeID: " +
-                   failoverNodeID);
-      }
-
-      if (oldFailoverNodeID != failoverNodeID)
-      {
-         // Failover node for this node has changed
-         failoverNodeChanged(oldFailoverNodeID, firstNode, false);
-      }
       stateMonitor.newQuarantined();
    }
    



More information about the jboss-cvs-commits mailing list