[jboss-cvs] JBoss Messaging SVN: r8200 - in branches/port1842/src/main/org/jboss/messaging/core/impl: postoffice and 1 other directory.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Fri Jan 28 11:11:26 EST 2011
Author: gaohoward
Date: 2011-01-28 11:11:25 -0500 (Fri, 28 Jan 2011)
New Revision: 8200
Modified:
branches/port1842/src/main/org/jboss/messaging/core/impl/JDBCSupport.java
branches/port1842/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java
Log:
fix left node DB failure issue
Modified: branches/port1842/src/main/org/jboss/messaging/core/impl/JDBCSupport.java
===================================================================
--- branches/port1842/src/main/org/jboss/messaging/core/impl/JDBCSupport.java 2011-01-27 06:54:07 UTC (rev 8199)
+++ branches/port1842/src/main/org/jboss/messaging/core/impl/JDBCSupport.java 2011-01-28 16:11:25 UTC (rev 8200)
@@ -533,6 +533,28 @@
private TransactionWrapper wrap;
+ public T executeOnlyOnce() throws Exception
+ {
+ wrap = new TransactionWrapper();
+
+ try
+ {
+ conn = ds.getConnection();
+
+ return doTransaction();
+ }
+ catch (Exception e)
+ {
+ wrap.exceptionOccurred();
+ throw e;
+ }
+ finally
+ {
+ wrap.end();
+ closeConnection(conn);
+ }
+ }
+
public T execute() throws Exception
{
wrap = new TransactionWrapper();
Modified: branches/port1842/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java
===================================================================
--- branches/port1842/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java 2011-01-27 06:54:07 UTC (rev 8199)
+++ branches/port1842/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java 2011-01-28 16:11:25 UTC (rev 8200)
@@ -495,7 +495,7 @@
new AddNodeState().executeWithRetry();
}
- private Integer updateStateInStorage(final int nID, final int newState) throws Exception
+ private Integer updateStateInStorage(final int nID, final int newState, boolean ifRetry) throws Exception
{
if (ds == null)
{
@@ -525,7 +525,11 @@
return new Integer(row);
}
}
- return (Integer)new UpdateState().executeWithRetry();
+ if (ifRetry)
+ {
+ return (Integer)new UpdateState().executeWithRetry();
+ }
+ return (Integer)new UpdateState().executeOnlyOnce();
}
private void cleanUpSuspectedNode(final Integer qNode) throws Exception
@@ -559,7 +563,14 @@
return null;
}
}
- new CleanupDeadNode().executeWithRetry();
+ try
+ {
+ new CleanupDeadNode().executeOnlyOnce();
+ }
+ catch (Exception e)
+ {
+ log.warn("Failed to clean up dead node " + node + " from DB");
+ }
}
}
@@ -588,8 +599,14 @@
}
else if (isDead)
{
-
QuarantinedNode qNode = suspectedNodes.get(qNodeID);
+
+ //the node may be null because it may just be having a DB problem and has not been suspected yet. In that case we ignore it
+ if (qNode == null)
+ {
+ log.debug("node " + qNodeID + " seems dead but it hasn't been suspected yet. Probably having DB problem.");
+ continue;
+ }
Integer fNodeID = qNode.getFailover();
Integer foNodeID = (Integer)failoverMap.get(fNodeID);
@@ -616,7 +633,7 @@
log.debug(this + ": I am the failover node for node " + qNodeID + " that crashed");
//update node status. this definitely not me!
- this.updateStateInStorage(qNodeID, STATE_DEAD);
+ this.updateStateInStorage(qNodeID, STATE_DEAD, true);
performFailover(qNodeID);
@@ -635,15 +652,16 @@
}
//timestamp and query the new state from db
- private void refreshNodeState() throws Exception
+ private boolean refreshNodeState() throws Exception
{
if (ds == null)
{
- return;
+ return true;
}
class RefreshNodeState extends JDBCTxRunner
{
+ boolean timestampDone = false;
public Object doTransaction() throws Exception
{
PreparedStatement ps = null;
@@ -658,6 +676,8 @@
ps.executeUpdate();
+ timestampDone = true;
+
synchronized (clusterState)
{
clusterState.clear();
@@ -682,7 +702,16 @@
return null;
}
}
- new RefreshNodeState().executeWithRetry();
+ RefreshNodeState r = new RefreshNodeState();
+ try
+ {
+ r.executeOnlyOnce();
+ }
+ catch (Exception e)
+ {
+ // ignored on purpose: the caller learns the outcome from the returned timestampDone flag
+ }
+ return r.timestampDone;
}
@@ -1358,7 +1387,6 @@
// if I am the quarantined node, I do rejoin.
if (clusterState.isQuarantined(thisNodeID))
{
- updateStateInStorage(thisNodeID, STATE_CLUSTERED);
new Thread()
{
public void run()
@@ -1367,6 +1395,7 @@
groupMember.getDataChannelAddress());
try
{
+ updateStateInStorage(thisNodeID, STATE_CLUSTERED, true);
groupMember.multicastControl(new JoinClusterRequest(thisNodeID, info), true);
}
catch (Exception e)
@@ -1571,7 +1600,16 @@
if (isFirstNode())
{
- clusterState.updateNodeState(thisNodeID, STATE_QUARANTINED);
+ try
+ {
+ clusterState.updateNodeState(thisNodeID, STATE_QUARANTINED);
+ }
+ catch (Exception e)
+ {
+ log.error("Having problem update myself. Shutting down to avoid message duplicated delivery.");
+ this.serverPeer.stopService();
+ return;
+ }
}
if (trace)
@@ -3971,24 +4009,34 @@
do
{
ClusterState newState;
+ boolean timeStampDone = false;
try
{
- refreshNodeState();
- processClusterState();
+ timeStampDone = refreshNodeState();
+ if (timeStampDone)
+ {
+ processClusterState();
+ }
+ else if (clusterState.isQuarantined(thisNodeID))
+ {
+ log.error("I'm orphaned and now I can't tell others that I'm alive. Shutdown node: " + thisNodeID);
+ serverPeer.stopService();
+ working = false;
+ nodeStateRefreshInterval = 1; // let the thread quite quickly.
+ }
+
+ try
+ {
+ wait(nodeStateRefreshInterval);
+ }
+ catch (InterruptedException e)
+ {
+ }
}
catch (Exception e)
{
log.error("Error refreshing state of node: " + thisNodeID, e);
}
-
- try
- {
- wait(nodeStateRefreshInterval);
- }
- catch (InterruptedException e)
- {
- }
-
}
while (working);
log.debug("Stop monitoring the stats at node " + thisNodeID);
@@ -4020,7 +4068,7 @@
{
NodeState node = states.get(nodeID);
node.setState(newState);
- updateStateInStorage(nodeID, newState);
+ updateStateInStorage(nodeID, newState, false);
}
public boolean allDeadButMe(int nodeID)
More information about the jboss-cvs-commits
mailing list