[jboss-cvs] JBoss Messaging SVN: r8170 - in branches/JBM1842: src/main/org/jboss/messaging/core/impl/postoffice and 1 other directory.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Tue Jan 18 13:00:48 EST 2011
Author: gaohoward
Date: 2011-01-18 13:00:47 -0500 (Tue, 18 Jan 2011)
New Revision: 8170
Added:
branches/JBM1842/src/main/org/jboss/messaging/core/impl/postoffice/NodeDeadRequest.java
branches/JBM1842/src/main/org/jboss/messaging/core/impl/postoffice/QuarantinedNode.java
Modified:
branches/JBM1842/integration/EAP4/etc/server/default/deploy/mysql-persistence-service.xml
branches/JBM1842/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java
Log:
save work
Modified: branches/JBM1842/integration/EAP4/etc/server/default/deploy/mysql-persistence-service.xml
===================================================================
--- branches/JBM1842/integration/EAP4/etc/server/default/deploy/mysql-persistence-service.xml 2011-01-17 16:04:33 UTC (rev 8169)
+++ branches/JBM1842/integration/EAP4/etc/server/default/deploy/mysql-persistence-service.xml 2011-01-18 18:00:47 UTC (rev 8170)
@@ -132,6 +132,8 @@
LOAD_BINDINGS=SELECT QUEUE_NAME, COND, SELECTOR, CHANNEL_ID, CLUSTERED, ALL_NODES FROM JBM_POSTOFFICE WHERE POSTOFFICE_NAME=? AND NODE_ID=?
CREATE_CLUSTER_STATE_TABLE=CREATE TABLE JBM_CLUSTER (NODE_ID INTEGER, PING_TIMESTAMP BIGINT, STATE INTEGER, PRIMARY KEY(NODE_ID)))
UPDATE_STATE=UPDATE JBM_CLUSTER SET STATE = ? WHERE NODE_ID = ?
+UPDATE_TIMESTAMP=UPDATE JBM_CLUSTER SET PING_TIMESTAMP = ? WHERE NODE_ID = ?
+LOAD_CLUSTER_STATE=SELECT NODE_ID, PING_TIMESTAMP, STATE FROM JBM_CLUSTER
]]></attribute>
<!-- This post office is non clustered. If you want a clustered post office then set to true -->
Modified: branches/JBM1842/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java
===================================================================
--- branches/JBM1842/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java 2011-01-17 16:04:33 UTC (rev 8169)
+++ branches/JBM1842/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java 2011-01-18 18:00:47 UTC (rev 8170)
@@ -259,6 +259,13 @@
private StateMonitor stateMonitor = null;
+ private Map<Integer, QuarantinedNode> quarantinedNodes = new java.util.concurrent.ConcurrentHashMap<Integer, QuarantinedNode>();
+
+ private ClusterState clusterState = new ClusterState();
+
+ private long refreshPeriod = 30000;
+
+
// Constructors ---------------------------------------------------------------------------------
public boolean isFailoverOnNodeLeave()
@@ -442,12 +449,12 @@
}
//this method will trigger a dedicated thread to write time stamp
- private synchronized void changeState(int newState)
+ private synchronized void changeState(int newState) throws Exception
{
if (state != newState)
{
this.state = newState;
- this.updateStateInStorage(state);
+ this.updateStateInStorage(thisNodeID, state);
if (stateMonitor == null)
{
@@ -457,13 +464,13 @@
}
}
- private void updateStateInStorage(final int newState) throws Exception
+ private void updateStateInStorage(final int nID, final int newState) throws Exception
{
if (ds == null)
{
return;
}
- class InsertBindings extends JDBCTxRunner
+ class UpdateState extends JDBCTxRunner
{
public Object doTransaction() throws Exception
{
@@ -474,9 +481,151 @@
ps = conn.prepareStatement(getSQLStatement("UPDATE_STATE"));
ps.setInt(1, newState);
+ ps.setInt(2, nID);
+
+ ps.executeUpdate();
+ }
+ finally
+ {
+ closeStatement(ps);
+ }
+ return null;
+ }
+ }
+ new UpdateState().executeWithRetry();
+ }
+
+   /**
+    * Removes a quarantined node from this post office's bookkeeping and tells the group.
+    * <p>
+    * Steps, in order: drop the node from the in-memory quarantine map (a warning is logged
+    * when it was not present), run the DELETE_DEAD_NODE statement with the node id as its
+    * only parameter when a data source is set, then multicast a NodeDeadRequest for the node.
+    *
+    * @param qNode id of the quarantined node to clean up
+    * @throws Exception propagated from the database statement or the multicast
+    */
+   private void cleanUpQuarantinedNode(final Integer qNode) throws Exception
+   {
+      if (quarantinedNodes.remove(qNode) == null)
+      {
+         log.warn("Cannot find the quarantined node.");
+      }
+
+      //clean from DB; skipped when no data source is set
+      if (ds != null)
+      {
+         JDBCTxRunner deleteDeadNode = new JDBCTxRunner()
+         {
+            public Object doTransaction() throws Exception
+            {
+               PreparedStatement deleteStmt = null;
+
+               try
+               {
+                  deleteStmt = conn.prepareStatement(getSQLStatement("DELETE_DEAD_NODE"));
+                  deleteStmt.setInt(1, qNode);
+
+                  deleteStmt.executeUpdate();
+               }
+               finally
+               {
+                  closeStatement(deleteStmt);
+               }
+               return null;
+            }
+         };
+         deleteDeadNode.executeWithRetry();
+      }
+
+      //broadcast
+      groupMember.multicastControl(new NodeDeadRequest(qNode), true);
+   }
+
+   /**
+    * Acts on the node states currently held in clusterState.
+    * <ol>
+    * <li>For each quarantined node that clusterState reports dead: when this node is its
+    *     failover node, the quarantined node is marked as crashed and failover is supported,
+    *     perform failover and then clean the node up.</li>
+    * <li>Dead nodes whose failover node is some other node are left untouched here.</li>
+    * <li>If this node itself is quarantined it is meant to become standalone
+    *     (that branch is still empty).</li>
+    * </ol>
+    * The failover node is looked up in failoverMap first; when the map has no entry for the
+    * quarantined node, the failover node recorded on its QuarantinedNode is used instead.
+    *
+    * @throws IllegalStateException if neither source yields a failover node for a dead node
+    * @throws Exception propagated from failover or clean-up
+    */
+   private void processClusterState() throws Exception
+   {
+      for (Integer qNodeID : quarantinedNodes.keySet())
+      {
+         //Fetch the entry once and tolerate it having been removed since the key was read,
+         //instead of dereferencing a second, possibly null, lookup further down.
+         QuarantinedNode qNode = quarantinedNodes.get(qNodeID);
+         if (qNode == null || !clusterState.isNodeDead(qNodeID))
+         {
+            continue;
+         }
+
+         Integer foNodeID = (Integer)failoverMap.get(qNodeID);
+         if (foNodeID == null)
+         {
+            //No entry in the map doesn't mean the node never had a failover node, possible cases:
+            // 1 its failover node was also quarantined.
+            // 2 its failover node was already dead.
+            // 3 really messed up.
+            //for 1 and 2, quarantine() keeps each quarantined node pointing to an active
+            //node in the cluster, so fall back to that recorded failover node.
+            foNodeID = qNode.getFailover();
+         }
+         if (foNodeID == null)
+         {
+            throw new IllegalStateException("Cannot find failover node for node " + qNodeID);
+         }
+
+         if (foNodeID.intValue() != thisNodeID)
+         {
+            //I am not the failover node for the dead node. Don't delete it from the quarantined map
+            //nor from the DB, wait for it to be failed over. Two possible cases:
+            //1 this node later becomes the failover node for the dead node, so this node cleans it up.
+            //2 another node fails over the dead node, this node gets notified and cleans it up.
+            continue;
+         }
+
+         //I am the failover node for the dead node, perform failover now
+         //NOTE(review): a dead node that left cleanly (shouldFailover() == false) is never
+         //cleaned up on this path - confirm whether that is intended.
+         if (qNode.shouldFailover() && isSupportsFailover())
+         {
+            log.debug(this + ": I am the failover node for node " + qNodeID + " that crashed");
+
+            performFailover(qNodeID);
+
+            //now clean up the quarantined set
+            cleanUpQuarantinedNode(qNodeID);
+         }
+      }
+
+      if (clusterState.isQuarantined(thisNodeID))
+      {
+         //I am quarantined, work as standalone now
+         //TODO: not implemented yet
+      }
+   }
+
+ //timestamp and query the new state from db
+ private void refreshNodeState() throws Exception
+ {
+ if (ds == null)
+ {
+ return;
+ }
+
+ class RefreshNodeState extends JDBCTxRunner
+ {
+ public Object doTransaction() throws Exception
+ {
+ PreparedStatement ps = null;
+
+ try
+ {
+ //writing timestamp
+ ps = conn.prepareStatement(getSQLStatement("UPDATE_TIMESTAMP"));
+
+ ps.setLong(1, System.currentTimeMillis());
ps.setInt(2, thisNodeID);
ps.executeUpdate();
+
+ //collect states
+ ps = conn.prepareStatement(getSQLStatement("LOAD_CLUSTER_STATE"));
+ ResultSet result = ps.executeQuery();
+
+ while (result.next())
+ {
+ int nodeID = result.getInt(1);
+ long timestamp = result.getLong(2);
+ int nodeState = result.getInt(3);
+
+ clusterState.addNode(nodeID, timestamp, nodeState);
+ }
}
finally
{
@@ -485,12 +634,18 @@
return null;
}
}
- new InsertBindings().executeWithRetry();
+ new RefreshNodeState().executeWithRetry();
}
public void stop() throws Exception
{
stopViewUpdate();
+
+ if (this.stateMonitor != null)
+ {
+ stateMonitor.shutdown();
+ stateMonitor = null;
+ }
synchronized (this)
{
@@ -1192,118 +1347,39 @@
public void nodesLeft(List addresses) throws Throwable
{
if (trace) { log.trace("Nodes left " + addresses.size()); }
-
- Map oldFailoverMap = new HashMap(this.failoverMap);
-
- int oldFailoverNodeID = failoverNodeID;
-
- if (trace) { log.trace("Old failover node id: " + oldFailoverNodeID); }
-
- //before doing anything, quarantine the nodes
+
+ //quarantine the nodes
quarantine(addresses);
-
- calculateFailoverMap();
-
- if (trace) { log.trace("First node is now " + firstNode); }
-
- if (firstNode && this.useJGroupsWorkaround)
- {
- //If we are now the first node in the cluster then any outstanding replication requests will not get responses
- //so we must release these and we have no more need of a semaphore until another node joins
- replicateSemaphore.disable();
- }
-
- Iterator iter = addresses.iterator();
-
- while (iter.hasNext())
- {
- Address address = (Address)iter.next();
-
- log.debug(this + ": " + address + " left");
-
- Integer leftNodeID = getNodeIDForSyncAddress(address);
-
- if (leftNodeID == null)
- {
- throw new IllegalStateException(this + " cannot find node ID for address " + address);
- }
- boolean crashed = failoverOnNodeLeave || !leaveMessageReceived(leftNodeID);
- boolean isolated = false;
-
- //if not really dead, don't do failover
- if (crashed)
- {
- isolated = checkNodeDead(leftNodeID);
- }
-
- log.debug(this + ": node " + leftNodeID + " has " + (crashed ? "crashed" : "cleanly left the group"));
-
- Integer fnodeID = (Integer)oldFailoverMap.get(leftNodeID);
-
- log.debug(this + " the failover node for the crashed node is " + fnodeID);
-
- boolean doneFailover = false;
-
- ClusterNotification notification = new ClusterNotification(ClusterNotification.TYPE_NODE_LEAVE, leftNodeID.intValue(), null);
-
- clusterNotifier.sendNotification(notification);
-
- if (crashed && isSupportsFailover())
- {
- if (fnodeID == null)
- {
- throw new IllegalStateException("Cannot find failover node for node " + leftNodeID);
- }
-
- if (fnodeID.intValue() == thisNodeID)
- {
- // The node crashed and we are the failover node so let's perform failover
-
- log.debug(this + ": I am the failover node for node " + leftNodeID + " that crashed");
-
- //if the left node is still alive, don't failover
- if (!isolated)
- {
- performFailover(leftNodeID);
-
- doneFailover = true;
- }
- else
- {
- log.debug(this + ": won't failover for the node " + leftNodeID + " as it is still alive");
- }
- }
- }
-
- if (!doneFailover && crashed)
- {
- // Remove any replicant data and non durable bindings for the node - This will notify any listeners which will
- // recalculate the connection factory delegates and failover delegates.
-
- cleanDataForNode(leftNodeID);
- }
-
- if (trace) {log.trace("First node: " + firstNode + " oldFailoverNodeID: " + oldFailoverNodeID + " failoverNodeID: " + failoverNodeID); }
-
- if (oldFailoverNodeID != failoverNodeID)
- {
- //Failover node for this node has changed
-
- failoverNodeChanged(oldFailoverNodeID, firstNode, false);
- }
- }
-
sendJMXNotification(VIEW_CHANGED_NOTIFICATION);
}
/**
* For each node, update its state to be STATE_QUARANTINED
* then store the information for later failover.
+ * @throws Exception
*/
- private void quarantine(List addresses)
+ private void quarantine(List addresses) throws Exception
{
+ Map oldFailoverMap = new HashMap(this.failoverMap);
+
+ int oldFailoverNodeID = failoverNodeID;
+
+ if (trace) { log.trace("Old failover node id: " + oldFailoverNodeID); }
+
+ calculateFailoverMap();
+
+ if (trace) { log.trace("First node is now " + firstNode); }
+
+ if (firstNode && this.useJGroupsWorkaround)
+ {
+ //If we are now the first node in the cluster then any outstanding replication requests will not get responses
+ //so we must release these and we have no more need of a semaphore until another node joins
+ replicateSemaphore.disable();
+ }
+
Iterator iter = addresses.iterator();
+
while (iter.hasNext())
{
Address addr = (Address)iter.next();
@@ -1316,71 +1392,64 @@
}
boolean crashed = failoverOnNodeLeave || !leaveMessageReceived(leftNodeID);
- boolean isolated = false;
-
- //if not really dead, don't do failover
- if (crashed)
- {
- isolated = checkNodeDead(leftNodeID);
- }
log.debug(this + ": node " + leftNodeID + " has " + (crashed ? "crashed" : "cleanly left the group"));
Integer fnodeID = (Integer)oldFailoverMap.get(leftNodeID);
log.debug(this + " the failover node for the crashed node is " + fnodeID);
-
- boolean doneFailover = false;
ClusterNotification notification = new ClusterNotification(ClusterNotification.TYPE_NODE_LEAVE, leftNodeID.intValue(), null);
clusterNotifier.sendNotification(notification);
-
- if (crashed && isSupportsFailover())
- {
- if (fnodeID == null)
+
+ //if its failover node is not in the new failoverMap, that means its failover node is quarantined with it
+ Integer ffNode = (Integer)failoverMap.get(fnodeID);
+ Integer currentFNode;
+ while (ffNode == null)
+ {
+ //Note this requires the failover map must form a circle! otherwise this may become an infinite loop!
+ currentFNode = fnodeID;
+ //search old map
+ fnodeID = (Integer)oldFailoverMap.get(currentFNode);
+ //then new map
+ ffNode = (Integer)failoverMap.get(fnodeID);
+ }
+
+ //now the fnodeID is an active node
+ QuarantinedNode qNode = new QuarantinedNode(leftNodeID, fnodeID, crashed);
+
+ //now we need take care of those already quarantined nodes
+ Iterator<Integer> iterQ = quarantinedNodes.keySet().iterator();
+ while (iterQ.hasNext())
+ {
+ QuarantinedNode aNode = quarantinedNodes.get(iterQ.next());
+ if (aNode.getFailover().equals(leftNodeID))
{
- throw new IllegalStateException("Cannot find failover node for node " + leftNodeID);
+ //We need to transfer the failover responsibility to the new active one
+ aNode.setFailover(fnodeID);
}
-
- if (fnodeID.intValue() == thisNodeID)
- {
- // The node crashed and we are the failover node so let's perform failover
-
- log.debug(this + ": I am the failover node for node " + leftNodeID + " that crashed");
-
- //if the left node is still alive, don't failover
- if (!isolated)
- {
- performFailover(leftNodeID);
-
- doneFailover = true;
- }
- else
- {
- log.debug(this + ": won't failover for the node " + leftNodeID + " as it is still alive");
- }
- }
}
-
- if (!doneFailover && crashed)
+
+ quarantinedNodes.put(leftNodeID, qNode);
+
+ if (fnodeID == thisNodeID)
{
- // Remove any replicant data and non durable bindings for the node - This will notify any listeners which will
- // recalculate the connection factory delegates and failover delegates.
-
- cleanDataForNode(leftNodeID);
+ this.updateStateInStorage(leftNodeID, STATE_QUARANTINED);
}
-
- if (trace) {log.trace("First node: " + firstNode + " oldFailoverNodeID: " + oldFailoverNodeID + " failoverNodeID: " + failoverNodeID); }
- if (oldFailoverNodeID != failoverNodeID)
- {
- //Failover node for this node has changed
-
- failoverNodeChanged(oldFailoverNodeID, firstNode, false);
- }
+ if (trace) { log.trace("Quarantined node: " + qNode); }
}
+
+ if (trace) {log.trace("First node: " + firstNode + " oldFailoverNodeID: " + oldFailoverNodeID + " failoverNodeID: " + failoverNodeID); }
+
+ if (oldFailoverNodeID != failoverNodeID)
+ {
+ //Failover node for this node has changed
+ failoverNodeChanged(oldFailoverNodeID, firstNode, false);
}
+
+ stateMonitor.newQuarantined();
}
// RequestTarget implementation ------------------------------------------------------------
@@ -3711,7 +3780,7 @@
/*
* This thread does the following:
*
- * Periodically update the timeStamp of this node
+ * Periodically update the timestamp of this node
* and monitor its status. If it becomes quarantined, it will
* make itself to be a standalone
* it also monitors its buddy's state, if it is quarantined, see if it will really die.
@@ -3719,9 +3788,98 @@
*/
private class StateMonitor extends Thread
{
- public void run()
+      //Loop flag. It is only read and written inside synchronized methods of this thread object.
+      private boolean working = true;
+
+      /**
+       * Monitoring loop: refreshes the node state, processes the cluster state, then waits
+       * for up to refreshPeriod milliseconds or until notified, and repeats until
+       * shutdown() has cleared the loop flag.
+       * <p>
+       * The method is synchronized, so this object's monitor is held for the whole loop
+       * except while waiting; shutdown() and newQuarantined() therefore block until the
+       * current pass has finished.
+       */
+      public synchronized void run()
+      {
+         do
+         {
+            try
+            {
+               refreshNodeState();
+               processClusterState();
+            }
+            catch (Exception e)
+            {
+               //pass the exception along so the cause and stack trace reach the log
+               log.error("Error refreshing state of node: " + thisNodeID, e);
+            }
+
+            try
+            {
+               //skip the wait when shutdown() was already called, e.g. before this loop first ran
+               if (working)
+               {
+                  wait(refreshPeriod);
+               }
+            }
+            catch (InterruptedException e)
+            {
+               //treated like an early wake-up: the loop condition decides whether to go on
+            }
+
+         } while (working);
+         log.debug("Stop monitoring the stats at node " + thisNodeID);
+      }
+
+      /**
+       * Clears the loop flag and wakes the monitor so that run() can finish.
+       */
+      public synchronized void shutdown()
+      {
+         working = false;
+         notify();
+      }
+
+      /**
+       * Wakes the monitor early so that it starts its next pass without waiting for the
+       * rest of the refresh period.
+       */
+      public synchronized void newQuarantined()
+      {
+         notify();
+      }
}
+
+   /**
+    * Per-node states as last recorded through addNode, keyed by node id.
+    * Queries about a node for which nothing has been recorded answer false
+    * rather than failing.
+    */
+   private class ClusterState
+   {
+      Map<Integer, NodeState> states = new HashMap<Integer, NodeState>();
+
+      /**
+       * Records the state of a node, replacing any state recorded earlier for the same id.
+       *
+       * @param nodeID id of the node
+       * @param timestamp timestamp value stored for the node
+       * @param nodeState state code stored for the node
+       */
+      public void addNode(int nodeID, long timestamp, int nodeState)
+      {
+         states.put(nodeID, new NodeState(nodeID, timestamp, nodeState));
+      }
+
+      /**
+       * @param qNodeID id of the node to check
+       * @return true only if a state is recorded for the node and that state is quarantined
+       */
+      public boolean isQuarantined(int qNodeID)
+      {
+         NodeState nState = states.get(qNodeID);
+         //no recorded state: report "not quarantined" instead of throwing a NullPointerException
+         return nState != null && nState.isQurarntined();
+      }
+
+      /**
+       * @param qNodeID id of the node to check
+       * @return true only if a state is recorded for the node and its timestamp marks it dead
+       */
+      public boolean isNodeDead(Integer qNodeID)
+      {
+         NodeState nState = states.get(qNodeID);
+         //no recorded state: report "not dead" so that no failover is triggered on missing data
+         return nState != null && nState.isDead();
+      }
+   }
+
+   /**
+    * The id, timestamp and state code recorded for one node.
+    */
+   private class NodeState
+   {
+      private int nodeID;
+      private long timestamp;
+      private int state;
+
+      /**
+       * @param nodeID id of the node
+       * @param timestamp timestamp recorded for the node, in milliseconds
+       * @param state state code recorded for the node
+       */
+      public NodeState(int nodeID, long timestamp, int state)
+      {
+         this.nodeID = nodeID;
+         this.timestamp = timestamp;
+         this.state = state;
+      }
+
+      /**
+       * @return true if the recorded state code is STATE_QUARANTINED
+       */
+      public boolean isQurarntined()
+      {
+         return STATE_QUARANTINED == state;
+      }
+
+      /**
+       * Decides from the timestamp alone, not from the state code: the node counts as dead
+       * once its timestamp is more than two refresh periods older than this JVM's clock.
+       * NOTE(review): the timestamp is presumably written from another node's clock, so
+       * clock skew between nodes would shift this decision - confirm.
+       *
+       * @return true if the timestamp is older than twice the refresh period
+       */
+      public boolean isDead()
+      {
+         long stampAge = System.currentTimeMillis() - timestamp;
+         return stampAge > (2 * refreshPeriod);
+      }
+   }
+
}
Added: branches/JBM1842/src/main/org/jboss/messaging/core/impl/postoffice/NodeDeadRequest.java
===================================================================
--- branches/JBM1842/src/main/org/jboss/messaging/core/impl/postoffice/NodeDeadRequest.java (rev 0)
+++ branches/JBM1842/src/main/org/jboss/messaging/core/impl/postoffice/NodeDeadRequest.java 2011-01-18 18:00:47 UTC (rev 8170)
@@ -0,0 +1,85 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2011, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+
+package org.jboss.messaging.core.impl.postoffice;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+
+/**
+ * A NodeDeadRequest
+ * <p>
+ * Cluster request that carries the id of a single node. On the wire it consists of
+ * exactly one int, the node id.
+ * <p>
+ * NOTE(review): execute() does nothing yet and getType() returns 0, which looks like a
+ * placeholder - confirm the intended type id and receiver-side behaviour.
+ *
+ * @author howard
+ *
+ * Created Jan 19, 2011 1:17:36 AM
+ */
+public class NodeDeadRequest extends ClusterRequest
+{
+   // Constants -----------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+
+   /** Id of the node this request is about. */
+   private int leftNode;
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   /**
+    * @param qNode id of the node this request is about; must not be null
+    */
+   public NodeDeadRequest(Integer qNode)
+   {
+      this.leftNode = qNode.intValue();
+   }
+
+   // Public --------------------------------------------------------
+
+   /**
+    * Writes the node id as a single int.
+    */
+   public void write(DataOutputStream out) throws Exception
+   {
+      out.writeInt(this.leftNode);
+   }
+
+   /**
+    * Reads the node id back as a single int.
+    */
+   public void read(DataInputStream in) throws Exception
+   {
+      this.leftNode = in.readInt();
+   }
+
+   // Package protected ---------------------------------------------
+
+   /**
+    * Currently a no-op.
+    *
+    * @return always null
+    */
+   Object execute(RequestTarget office) throws Throwable
+   {
+      return null;
+   }
+
+   /**
+    * @return always 0
+    */
+   byte getType()
+   {
+      return 0;
+   }
+
+   // Protected -----------------------------------------------------
+
+   // Private -------------------------------------------------------
+
+   // Inner classes -------------------------------------------------
+
+}
Added: branches/JBM1842/src/main/org/jboss/messaging/core/impl/postoffice/QuarantinedNode.java
===================================================================
--- branches/JBM1842/src/main/org/jboss/messaging/core/impl/postoffice/QuarantinedNode.java (rev 0)
+++ branches/JBM1842/src/main/org/jboss/messaging/core/impl/postoffice/QuarantinedNode.java 2011-01-18 18:00:47 UTC (rev 8170)
@@ -0,0 +1,68 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2011, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+
+package org.jboss.messaging.core.impl.postoffice;
+
+/**
+ * A QuarantinedNode
+ * <p>
+ * Record kept for a quarantined node: its id, the id of the node that is to fail over for
+ * it, and whether it is considered crashed. Only the failover id can be changed after
+ * construction.
+ *
+ * @author howard
+ *
+ * Created Jan 18, 2011 11:02:55 AM
+ */
+public class QuarantinedNode
+{
+   private Integer nodeID;
+   //it must be an active node at the time of being quarantined.
+   private Integer failoverID;
+   private boolean crashed;
+
+   /**
+    * @param leftNodeID id of the quarantined node
+    * @param failoverID id of the node that is to fail over for it
+    * @param crashed whether the node is considered crashed
+    */
+   public QuarantinedNode(Integer leftNodeID, Integer failoverID, boolean crashed)
+   {
+      this.nodeID = leftNodeID;
+      this.failoverID = failoverID;
+      this.crashed = crashed;
+   }
+
+   /**
+    * @return the id of the current failover node
+    */
+   public Integer getFailover()
+   {
+      return this.failoverID;
+   }
+
+   /**
+    * @param newFailover id of the node that takes over the failover role
+    */
+   public void setFailover(Integer newFailover)
+   {
+      this.failoverID = newFailover;
+   }
+
+   /**
+    * @return the crashed flag given at construction
+    */
+   public boolean shouldFailover()
+   {
+      return this.crashed;
+   }
+
+   public String toString()
+   {
+      StringBuilder text = new StringBuilder("Quarantined Node[");
+      text.append(nodeID).append("], failover[").append(failoverID);
+      text.append("], crashed[").append(crashed).append("]");
+      return text.toString();
+   }
+
+}
More information about the jboss-cvs-commits
mailing list