[jboss-cvs] JBoss Messaging SVN: r8169 - in branches/JBM1842: src/main/org/jboss/messaging/core/impl/postoffice and 1 other directory.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Mon Jan 17 11:04:34 EST 2011
Author: gaohoward
Date: 2011-01-17 11:04:33 -0500 (Mon, 17 Jan 2011)
New Revision: 8169
Modified:
branches/JBM1842/integration/EAP4/etc/server/default/deploy/mysql-persistence-service.xml
branches/JBM1842/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java
Log:
save my work
Modified: branches/JBM1842/integration/EAP4/etc/server/default/deploy/mysql-persistence-service.xml
===================================================================
--- branches/JBM1842/integration/EAP4/etc/server/default/deploy/mysql-persistence-service.xml 2011-01-17 04:44:21 UTC (rev 8168)
+++ branches/JBM1842/integration/EAP4/etc/server/default/deploy/mysql-persistence-service.xml 2011-01-17 16:04:33 UTC (rev 8169)
@@ -130,6 +130,8 @@
INSERT_BINDING=INSERT INTO JBM_POSTOFFICE (POSTOFFICE_NAME, NODE_ID, QUEUE_NAME, COND, SELECTOR, CHANNEL_ID, CLUSTERED, ALL_NODES) VALUES (?, ?, ?, ?, ?, ?, ?, ?)
DELETE_BINDING=DELETE FROM JBM_POSTOFFICE WHERE POSTOFFICE_NAME=? AND NODE_ID=? AND QUEUE_NAME=?
LOAD_BINDINGS=SELECT QUEUE_NAME, COND, SELECTOR, CHANNEL_ID, CLUSTERED, ALL_NODES FROM JBM_POSTOFFICE WHERE POSTOFFICE_NAME=? AND NODE_ID=?
+CREATE_CLUSTER_STATE_TABLE=CREATE TABLE JBM_CLUSTER (NODE_ID INTEGER, PING_TIMESTAMP BIGINT, STATE INTEGER, PRIMARY KEY(NODE_ID))
+UPDATE_STATE=UPDATE JBM_CLUSTER SET STATE = ? WHERE NODE_ID = ?
]]></attribute>
<!-- This post office is non clustered. If you want a clustered post office then set to true -->
Modified: branches/JBM1842/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java
===================================================================
--- branches/JBM1842/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java 2011-01-17 04:44:21 UTC (rev 8168)
+++ branches/JBM1842/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java 2011-01-17 16:04:33 UTC (rev 8169)
@@ -107,6 +107,14 @@
// Constants ------------------------------------------------------------------------------------
private static final Logger log = Logger.getLogger(MessagingPostOffice.class);
+
+ // Cluster states of a node; the current one is written to the STATE column of JBM_CLUSTER.
+
+ /** Set when the post office starts without joining a cluster group. */
+ public static final int STATE_STANDALONE = 1;
+ /** Set once this node has multicast its JoinClusterRequest to the group. */
+ public static final int STATE_CLUSTERED = 2;
+ /** A node that dropped out of the view and is suspected, but not yet confirmed, dead. */
+ public static final int STATE_QUARANTINED = 3;
+ /** Initial in-memory state before start() moves the node on; presumably also used for confirmed-dead nodes. */
+ public static final int STATE_DEAD = 4;
//This are only used in testing
@@ -246,6 +254,11 @@
private boolean stopUpdate = false;
private boolean updateInProcess = false;
+ // Current cluster state of this node (one of the STATE_* constants), modified via the synchronized changeState().
+ private int state = STATE_DEAD;
+ // Background monitor thread, created lazily by changeState() on the first state transition.
+ private StateMonitor stateMonitor;
+
// Constructors ---------------------------------------------------------------------------------
public boolean isFailoverOnNodeLeave()
@@ -411,7 +424,13 @@
put(Replicator.JVM_ID_KEY, clientVMId);
groupMember.multicastControl(new JoinClusterRequest(thisNodeID, info), true);
+
+ changeState(STATE_CLUSTERED);
}
+ else
+ {
+ changeState(STATE_STANDALONE);
+ }
//Now load the bindings for this node
@@ -421,7 +440,54 @@
log.debug(this + " started");
}
+
+ /**
+ * Moves this node to a new cluster state and persists it in the JBM_CLUSTER table.
+ * The first real transition also starts the StateMonitor thread; it is only ever started once.
+ * Calling this with the current state is a no-op.
+ *
+ * @param newState one of the STATE_* constants
+ * @throws Exception if the state cannot be written to storage; the in-memory state is
+ * left unchanged in that case
+ */
+ private synchronized void changeState(int newState) throws Exception
+ {
+ if (state == newState)
+ {
+ return;
+ }
+
+ // Persist first so the in-memory state never claims a transition the database did not record.
+ updateStateInStorage(newState);
+ state = newState;
+
+ if (stateMonitor == null)
+ {
+ stateMonitor = new StateMonitor();
+ stateMonitor.start();
+ }
+ }
+
+ private void updateStateInStorage(final int newState) throws Exception
+ {
+ if (ds == null)
+ {
+ return;
+ }
+ class InsertBindings extends JDBCTxRunner
+ {
+ public Object doTransaction() throws Exception
+ {
+ PreparedStatement ps = null;
+ try
+ {
+ ps = conn.prepareStatement(getSQLStatement("UPDATE_STATE"));
+
+ ps.setInt(1, newState);
+ ps.setInt(2, thisNodeID);
+
+ ps.executeUpdate();
+ }
+ finally
+ {
+ closeStatement(ps);
+ }
+ return null;
+ }
+ }
+ new InsertBindings().executeWithRetry();
+ }
+
public void stop() throws Exception
{
stopViewUpdate();
@@ -1131,7 +1197,10 @@
int oldFailoverNodeID = failoverNodeID;
- if (trace) { log.trace("Old failover node id: " + oldFailoverNodeID); }
+ if (trace) { log.trace("Old failover node id: " + oldFailoverNodeID); }
+
+ //before doing anything, quarantine the nodes
+ quarantine(addresses);
calculateFailoverMap();
@@ -1160,6 +1229,13 @@
}
boolean crashed = failoverOnNodeLeave || !leaveMessageReceived(leftNodeID);
+ boolean isolated = false;
+
+ //if not really dead, don't do failover
+ if (crashed)
+ {
+ isolated = checkNodeDead(leftNodeID);
+ }
log.debug(this + ": node " + leftNodeID + " has " + (crashed ? "crashed" : "cleanly left the group"));
@@ -1185,14 +1261,22 @@
// The node crashed and we are the failover node so let's perform failover
log.debug(this + ": I am the failover node for node " + leftNodeID + " that crashed");
-
- performFailover(leftNodeID);
+
+ //if the left node is still alive, don't failover
+ if (!isolated)
+ {
+ performFailover(leftNodeID);
- doneFailover = true;
+ doneFailover = true;
+ }
+ else
+ {
+ log.debug(this + ": won't failover for the node " + leftNodeID + " as it is still alive");
+ }
}
}
- if (!doneFailover)
+ if (!doneFailover && crashed)
{
// Remove any replicant data and non durable bindings for the node - This will notify any listeners which will
// recalculate the connection factory delegates and failover delegates.
@@ -1213,6 +1297,92 @@
sendJMXNotification(VIEW_CHANGED_NOTIFICATION);
}
+ /**
+ * For each node, update its state to be STATE_QUARANTINED
+ * then store the information for later failover.
+ */
+ private void quarantine(List addresses)
+ {
+ Iterator iter = addresses.iterator();
+ while (iter.hasNext())
+ {
+ Address addr = (Address)iter.next();
+
+ Integer leftNodeID = getNodeIDForSyncAddress(addr);
+
+ if (leftNodeID == null)
+ {
+ throw new IllegalStateException(this + " cannot find node ID for address " + addr);
+ }
+
+ boolean crashed = failoverOnNodeLeave || !leaveMessageReceived(leftNodeID);
+ boolean isolated = false;
+
+ //if not really dead, don't do failover
+ if (crashed)
+ {
+ isolated = checkNodeDead(leftNodeID);
+ }
+
+ log.debug(this + ": node " + leftNodeID + " has " + (crashed ? "crashed" : "cleanly left the group"));
+
+ Integer fnodeID = (Integer)oldFailoverMap.get(leftNodeID);
+
+ log.debug(this + " the failover node for the crashed node is " + fnodeID);
+
+ boolean doneFailover = false;
+
+ ClusterNotification notification = new ClusterNotification(ClusterNotification.TYPE_NODE_LEAVE, leftNodeID.intValue(), null);
+
+ clusterNotifier.sendNotification(notification);
+
+ if (crashed && isSupportsFailover())
+ {
+ if (fnodeID == null)
+ {
+ throw new IllegalStateException("Cannot find failover node for node " + leftNodeID);
+ }
+
+ if (fnodeID.intValue() == thisNodeID)
+ {
+ // The node crashed and we are the failover node so let's perform failover
+
+ log.debug(this + ": I am the failover node for node " + leftNodeID + " that crashed");
+
+ //if the left node is still alive, don't failover
+ if (!isolated)
+ {
+ performFailover(leftNodeID);
+
+ doneFailover = true;
+ }
+ else
+ {
+ log.debug(this + ": won't failover for the node " + leftNodeID + " as it is still alive");
+ }
+ }
+ }
+
+ if (!doneFailover && crashed)
+ {
+ // Remove any replicant data and non durable bindings for the node - This will notify any listeners which will
+ // recalculate the connection factory delegates and failover delegates.
+
+ cleanDataForNode(leftNodeID);
+ }
+
+ if (trace) {log.trace("First node: " + firstNode + " oldFailoverNodeID: " + oldFailoverNodeID + " failoverNodeID: " + failoverNodeID); }
+
+ if (oldFailoverNodeID != failoverNodeID)
+ {
+ //Failover node for this node has changed
+
+ failoverNodeChanged(oldFailoverNodeID, firstNode, false);
+ }
+ }
+ }
+ }
+
// RequestTarget implementation ------------------------------------------------------------
/*
@@ -3537,5 +3707,21 @@
}
}
+
+ /**
+ * Background thread intended to:
+ * - periodically update this node's time stamp (presumably JBM_CLUSTER.PING_TIMESTAMP) and
+ * watch its own state; if this node finds itself quarantined, switch it to standalone;
+ * - watch its failover buddy's state and, if the buddy is quarantined, determine whether it
+ * has really died, triggering failover once it has.
+ *
+ * Started by changeState() on the first state transition.
+ * NOTE(review): run() is still an empty placeholder, so the thread currently exits immediately.
+ */
+ private class StateMonitor extends Thread
+ {
+ public void run()
+ {
+ // TODO: monitoring loop not implemented yet.
+ }
+ }
}
More information about the jboss-cvs-commits
mailing list