[jboss-cvs] JBoss Messaging SVN: r8228 - branches/port1842/src/main/org/jboss/messaging/core/impl/postoffice.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Wed Feb 16 10:40:26 EST 2011
Author: gaohoward
Date: 2011-02-16 10:40:25 -0500 (Wed, 16 Feb 2011)
New Revision: 8228
Modified:
branches/port1842/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java
Log:
Improvement: clean up immediately when a node shuts down normally (i.e. its leave message was received), instead of quarantining it.
Modified: branches/port1842/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java
===================================================================
--- branches/port1842/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java 2011-02-16 15:39:17 UTC (rev 8227)
+++ branches/port1842/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java 2011-02-16 15:40:25 UTC (rev 8228)
@@ -1587,17 +1587,11 @@
int oldFailoverNodeID = failoverNodeID;
- if (trace)
- {
- log.trace("Old failover node id: " + oldFailoverNodeID);
- }
+ if (trace) { log.trace("Old failover node id: " + oldFailoverNodeID); }
calculateFailoverMap();
- if (trace)
- {
- log.trace("First node is now " + firstNode);
- }
+ if (trace) { log.trace("First node is now " + firstNode); }
if (firstNode && this.useJGroupsWorkaround)
{
@@ -1607,6 +1601,7 @@
}
Iterator iter = addresses.iterator();
+
while (iter.hasNext())
{
Address addr = (Address)iter.next();
@@ -1618,7 +1613,9 @@
throw new IllegalStateException(this + " cannot find node ID for address " + addr);
}
- boolean crashed = failoverOnNodeLeave || !leaveMessageReceived(leftNodeID);
+ boolean leaveReceived = leaveMessageReceived(leftNodeID);
+
+ boolean crashed = failoverOnNodeLeave || !leaveReceived;
log.debug(this + ": node " + leftNodeID + " has " + (crashed ? "crashed" : "cleanly left the group"));
@@ -1626,70 +1623,104 @@
log.debug(this + " the failover node for the crashed node is " + fnodeID);
- // if its failover node is not in the new failoverMap, that means its failover node is quarantined with it
- Integer ffNode = (Integer)failoverMap.get(fnodeID);
- Integer currentFNode;
- while (ffNode == null)
+ if (!leaveReceived)
{
- // Note this requires the failover map must form a circle! otherwise this may become an infinite loop!
- currentFNode = fnodeID;
- // search old map
- fnodeID = (Integer)oldFailoverMap.get(currentFNode);
- // then new map
- ffNode = (Integer)failoverMap.get(fnodeID);
- }
+ log.info("Node " + leftNodeID + " didn't left normally, put it to suspected list.");
- // now the fnodeID is an active node
- QuarantinedNode qNode = new QuarantinedNode(leftNodeID, fnodeID, crashed);
+ // if its failover node is not in the new failoverMap, that means its failover node is quarantined with it
+ Integer ffNode = (Integer)failoverMap.get(fnodeID);
+ Integer currentFNode;
+ while (ffNode == null)
+ {
+ // Note this requires the failover map must form a circle! otherwise this may become an infinite loop!
+ currentFNode = fnodeID;
+ // search old map
+ fnodeID = (Integer)oldFailoverMap.get(currentFNode);
+ // then new map
+ ffNode = (Integer)failoverMap.get(fnodeID);
+ }
- // now we need take care of those already quarantined nodes
- Iterator<Integer> iterQ = suspectedNodes.keySet().iterator();
- while (iterQ.hasNext())
- {
- QuarantinedNode aNode = suspectedNodes.get(iterQ.next());
- if (aNode.getFailover().equals(leftNodeID))
+ // now the fnodeID is an active node
+ QuarantinedNode qNode = new QuarantinedNode(leftNodeID, fnodeID, crashed);
+
+ // now we need take care of those already quarantined nodes
+ Iterator<Integer> iterQ = suspectedNodes.keySet().iterator();
+ while (iterQ.hasNext())
{
- // We need to transfer the failover responsibility to the new active one
- aNode.setFailover(fnodeID);
+ QuarantinedNode aNode = suspectedNodes.get(iterQ.next());
+ if (aNode.getFailover().equals(leftNodeID))
+ {
+ // We need to transfer the failover responsibility to the new active one
+ aNode.setFailover(fnodeID);
+ }
}
+
+ suspectedNodes.put(leftNodeID, qNode);
+
+ if (isFirstNode() && (clusterState.liveNodeNum() > 1))
+ {
+ try
+ {
+ clusterState.updateNodeState(thisNodeID, STATE_QUARANTINED);
+ }
+ catch (Exception e)
+ {
+ log.error("Having problem update myself. Shutting down to avoid message duplicated delivery.");
+ this.serverPeer.stopJBMNodeForRecovery();
+ return;
+ }
+ }
+
+ if (trace) { log.trace("Quarantined node: " + qNode); }
}
+ else
+ {
+ log.info("Node " + leftNodeID + " left normally, clean up now.");
- suspectedNodes.put(leftNodeID, qNode);
+ boolean doneFailover = false;
- if (isFirstNode() && (clusterState.liveNodeNum() > 1))
- {
- try
+ ClusterNotification notification = new ClusterNotification(ClusterNotification.TYPE_NODE_LEAVE, leftNodeID.intValue(), null);
+
+ clusterNotifier.sendNotification(notification);
+
+ if (crashed && isSupportsFailover())
{
- clusterState.updateNodeState(thisNodeID, STATE_QUARANTINED);
+ if (fnodeID == null)
+ {
+ throw new IllegalStateException("Cannot find failover node for node " + leftNodeID);
+ }
+
+ if (fnodeID.intValue() == thisNodeID)
+ {
+ // The node crashed and we are the failover node so let's perform failover
+
+ log.debug(this + ": I am the failover node for node " + leftNodeID + " that crashed");
+
+ performFailover(leftNodeID);
+
+ doneFailover = true;
+ }
}
- catch (Exception e)
+
+ if (!doneFailover)
{
- log.error("Having problem update myself. Shutting down to avoid message duplicated delivery.");
- this.serverPeer.stopJBMNodeForRecovery();
- return;
+ // Remove any replicant data and non durable bindings for the node - This will notify any listeners which will
+ // recalculate the connection factory delegates and failover delegates.
+
+ cleanDataForNode(leftNodeID);
}
}
- if (trace)
+ if (trace) { log.trace("First node: " + firstNode + " oldFailoverNodeID: " + oldFailoverNodeID + " failoverNodeID: " + failoverNodeID); }
+
+ if (oldFailoverNodeID != failoverNodeID)
{
- log.trace("Quarantined node: " + qNode);
+ // Failover node for this node has changed
+
+ failoverNodeChanged(oldFailoverNodeID, firstNode, false);
}
}
- if (trace)
- {
- log.trace("First node: " + firstNode +
- " oldFailoverNodeID: " +
- oldFailoverNodeID +
- " failoverNodeID: " +
- failoverNodeID);
- }
-
- if (oldFailoverNodeID != failoverNodeID)
- {
- // Failover node for this node has changed
- failoverNodeChanged(oldFailoverNodeID, firstNode, false);
- }
stateMonitor.newQuarantined();
}
More information about the jboss-cvs-commits
mailing list