[jboss-cvs] JBoss Messaging SVN: r8227 - branches/JBM1842/src/main/org/jboss/messaging/core/impl/postoffice.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Wed Feb 16 10:39:18 EST 2011
Author: gaohoward
Date: 2011-02-16 10:39:17 -0500 (Wed, 16 Feb 2011)
New Revision: 8227
Modified:
branches/JBM1842/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java
Log:
Improvement: fail over immediately when a node shuts down normally (if failoverOnNodeLeave is set)
Modified: branches/JBM1842/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java
===================================================================
--- branches/JBM1842/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java 2011-02-15 17:38:16 UTC (rev 8226)
+++ branches/JBM1842/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java 2011-02-16 15:39:17 UTC (rev 8227)
@@ -1660,69 +1660,114 @@
throw new IllegalStateException(this + " cannot find node ID for address " + addr);
}
- boolean crashed = failoverOnNodeLeave || !leaveMessageReceived(leftNodeID);
-
+ boolean leaveReceived = leaveMessageReceived(leftNodeID);
+
+ boolean crashed = failoverOnNodeLeave || !leaveReceived;
+
log.debug(this + ": node " + leftNodeID + " has " + (crashed ? "crashed" : "cleanly left the group"));
Integer fnodeID = (Integer)oldFailoverMap.get(leftNodeID);
log.debug(this + " the failover node for the crashed node is " + fnodeID);
- //if its failover node is not in the new failoverMap, that means its failover node is quarantined with it
- Integer ffNode = (Integer)failoverMap.get(fnodeID);
- Integer currentFNode;
- while (ffNode == null)
+ if (!leaveReceived)
{
- //Note this requires the failover map must form a circle! otherwise this may become an infinite loop!
- currentFNode = fnodeID;
- //search old map
- fnodeID = (Integer)oldFailoverMap.get(currentFNode);
- //then new map
- ffNode = (Integer)failoverMap.get(fnodeID);
- }
+ log.info("Node " + leftNodeID + " didn't left normally, put it to suspected list.");
+
+ //if its failover node is not in the new failoverMap, that means its failover node is quarantined with it
+ Integer ffNode = (Integer)failoverMap.get(fnodeID);
+ Integer currentFNode;
+ while (ffNode == null)
+ {
+ //Note this requires the failover map must form a circle! otherwise this may become an infinite loop!
+ currentFNode = fnodeID;
+ //search old map
+ fnodeID = (Integer)oldFailoverMap.get(currentFNode);
+ //then new map
+ ffNode = (Integer)failoverMap.get(fnodeID);
+ }
- //now the fnodeID is an active node
- QuarantinedNode qNode = new QuarantinedNode(leftNodeID, fnodeID, crashed);
+ //now the fnodeID is an active node
+ QuarantinedNode qNode = new QuarantinedNode(leftNodeID, fnodeID, crashed);
- //now we need take care of those already quarantined nodes
- Iterator<Integer> iterQ = suspectedNodes.keySet().iterator();
- while (iterQ.hasNext())
- {
- QuarantinedNode aNode = suspectedNodes.get(iterQ.next());
- if (aNode.getFailover().equals(leftNodeID))
+ //now we need take care of those already quarantined nodes
+ Iterator<Integer> iterQ = suspectedNodes.keySet().iterator();
+ while (iterQ.hasNext())
{
- //We need to transfer the failover responsibility to the new active one
- aNode.setFailover(fnodeID);
+ QuarantinedNode aNode = suspectedNodes.get(iterQ.next());
+ if (aNode.getFailover().equals(leftNodeID))
+ {
+ //We need to transfer the failover responsibility to the new active one
+ aNode.setFailover(fnodeID);
+ }
}
- }
- suspectedNodes.put(leftNodeID, qNode);
+ suspectedNodes.put(leftNodeID, qNode);
- if (isFirstNode() && (clusterState.liveNodeNum() > 1))
- {
- try
+ if (isFirstNode() && (clusterState.liveNodeNum() > 1))
{
- clusterState.updateNodeState(thisNodeID, STATE_QUARANTINED);
+ try
+ {
+ clusterState.updateNodeState(thisNodeID, STATE_QUARANTINED);
+ }
+ catch (Exception e)
+ {
+ log.error("Having problem update myself. Shutting down to avoid message duplicated delivery.");
+ this.serverPeer.stopJBMNodeForRecovery();
+ return;
+ }
}
- catch (Exception e)
+
+ if (trace) { log.trace("Quarantined node: " + qNode); }
+ }
+ else
+ {
+ log.info("Node " + leftNodeID + " left normally, clean up now.");
+
+ boolean doneFailover = false;
+
+ ClusterNotification notification = new ClusterNotification(ClusterNotification.TYPE_NODE_LEAVE, leftNodeID.intValue(), null);
+
+ clusterNotifier.sendNotification(notification);
+
+ if (crashed && isSupportsFailover())
+ {
+ if (fnodeID == null)
+ {
+ throw new IllegalStateException("Cannot find failover node for node " + leftNodeID);
+ }
+
+ if (fnodeID.intValue() == thisNodeID)
+ {
+ // The node crashed and we are the failover node so let's perform failover
+
+ log.debug(this + ": I am the failover node for node " + leftNodeID + " that crashed");
+
+ performFailover(leftNodeID);
+
+ doneFailover = true;
+ }
+ }
+
+ if (!doneFailover)
{
- log.error("Having problem update myself. Shutting down to avoid message duplicated delivery.");
- this.serverPeer.stopJBMNodeForRecovery();
- return;
+ // Remove any replicant data and non durable bindings for the node - This will notify any listeners which will
+ // recalculate the connection factory delegates and failover delegates.
+
+ cleanDataForNode(leftNodeID);
}
}
- if (trace) { log.trace("Quarantined node: " + qNode); }
+ if (trace) {log.trace("First node: " + firstNode + " oldFailoverNodeID: " + oldFailoverNodeID + " failoverNodeID: " + failoverNodeID); }
+
+ if (oldFailoverNodeID != failoverNodeID)
+ {
+ //Failover node for this node has changed
+
+ failoverNodeChanged(oldFailoverNodeID, firstNode, false);
+ }
}
- if (trace) {log.trace("First node: " + firstNode + " oldFailoverNodeID: " + oldFailoverNodeID + " failoverNodeID: " + failoverNodeID); }
-
- if (oldFailoverNodeID != failoverNodeID)
- {
- //Failover node for this node has changed
- failoverNodeChanged(oldFailoverNodeID, firstNode, false);
- }
-
stateMonitor.newQuarantined();
}
More information about the jboss-cvs-commits
mailing list