[jboss-cvs] JBoss Messaging SVN: r8200 - in branches/port1842/src/main/org/jboss/messaging/core/impl: postoffice and 1 other directory.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Fri Jan 28 11:11:26 EST 2011


Author: gaohoward
Date: 2011-01-28 11:11:25 -0500 (Fri, 28 Jan 2011)
New Revision: 8200

Modified:
   branches/port1842/src/main/org/jboss/messaging/core/impl/JDBCSupport.java
   branches/port1842/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java
Log:
fix left node DB failure issue


Modified: branches/port1842/src/main/org/jboss/messaging/core/impl/JDBCSupport.java
===================================================================
--- branches/port1842/src/main/org/jboss/messaging/core/impl/JDBCSupport.java	2011-01-27 06:54:07 UTC (rev 8199)
+++ branches/port1842/src/main/org/jboss/messaging/core/impl/JDBCSupport.java	2011-01-28 16:11:25 UTC (rev 8200)
@@ -533,6 +533,28 @@
 
       private TransactionWrapper wrap;
 
+      public T executeOnlyOnce() throws Exception
+      {
+         wrap = new TransactionWrapper();
+
+         try
+         {
+            conn = ds.getConnection();
+
+            return doTransaction();
+         }
+         catch (Exception e)
+         {
+            wrap.exceptionOccurred();
+            throw e;
+         }
+         finally
+         {
+            wrap.end();
+            closeConnection(conn);
+         }
+      }
+
       public T execute() throws Exception
 		{
 	      wrap = new TransactionWrapper();

Modified: branches/port1842/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java
===================================================================
--- branches/port1842/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java	2011-01-27 06:54:07 UTC (rev 8199)
+++ branches/port1842/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java	2011-01-28 16:11:25 UTC (rev 8200)
@@ -495,7 +495,7 @@
          new AddNodeState().executeWithRetry();
       }
       
-      private Integer updateStateInStorage(final int nID, final int newState) throws Exception
+      private Integer updateStateInStorage(final int nID, final int newState, boolean ifRetry) throws Exception
       {
          if (ds == null)
          {
@@ -525,7 +525,11 @@
                return new Integer(row);
             }
          }
-         return (Integer)new UpdateState().executeWithRetry();
+         if (ifRetry)
+         {
+            return (Integer)new UpdateState().executeWithRetry();
+         }
+         return (Integer)new UpdateState().executeOnlyOnce();
       }
       
       private void cleanUpSuspectedNode(final Integer qNode) throws Exception
@@ -559,7 +563,14 @@
                   return null;
                }
             }
-            new CleanupDeadNode().executeWithRetry();
+            try
+            {
+               new CleanupDeadNode().executeOnlyOnce();
+            }
+            catch (Exception e)
+            {
+               log.warn("Failed to clean up dead node " + node + " from DB");
+            }
          }
       }
       
@@ -588,8 +599,14 @@
             }
             else if (isDead)
             {
-   
                QuarantinedNode qNode = suspectedNodes.get(qNodeID);
+               
+               // The node may be null because it has just had a DB problem; in that case we ignore it.
+               if (qNode == null)
+               {
+                  log.debug("node " + qNodeID + " seems dead but it hasn't been suspected yet. Probably having DB problem.");
+                  continue;
+               }
                Integer fNodeID = qNode.getFailover();
    
                Integer foNodeID = (Integer)failoverMap.get(fNodeID);
@@ -616,7 +633,7 @@
                      log.debug(this + ": I am the failover node for node " + qNodeID + " that crashed");
    
                      //update node status. this definitely not me!
-                     this.updateStateInStorage(qNodeID, STATE_DEAD);
+                     this.updateStateInStorage(qNodeID, STATE_DEAD, true);
    
                      performFailover(qNodeID);
    
@@ -635,15 +652,16 @@
       }
       
       //timestamp and query the new state from db
-      private void refreshNodeState() throws Exception
+      private boolean refreshNodeState() throws Exception
       {
          if (ds == null)
          {
-            return;
+            return true;
          }
          
          class RefreshNodeState extends JDBCTxRunner
          {
+            boolean timestampDone = false;
             public Object doTransaction() throws Exception
             {
                PreparedStatement ps  = null;
@@ -658,6 +676,8 @@
    
                   ps.executeUpdate();
                   
+                  timestampDone = true;
+                  
                   synchronized (clusterState)
                   {
                      clusterState.clear();
@@ -682,7 +702,16 @@
                return null;
             }
          }
-         new RefreshNodeState().executeWithRetry();
+         RefreshNodeState r = new RefreshNodeState();
+         try
+         {
+            r.executeOnlyOnce();
+         }
+         catch (Exception e)
+         {
+            // ignore;
+         }
+         return r.timestampDone;
       }
    
 
@@ -1358,7 +1387,6 @@
       // if I am the quarantined node, I do rejoin.
       if (clusterState.isQuarantined(thisNodeID))
       {
-         updateStateInStorage(thisNodeID, STATE_CLUSTERED);
          new Thread()
          {
             public void run()
@@ -1367,6 +1395,7 @@
                                                                       groupMember.getDataChannelAddress());
                try
                {
+                  updateStateInStorage(thisNodeID, STATE_CLUSTERED, true);
                   groupMember.multicastControl(new JoinClusterRequest(thisNodeID, info), true);
                }
                catch (Exception e)
@@ -1571,7 +1600,16 @@
 
          if (isFirstNode())
          {
-            clusterState.updateNodeState(thisNodeID, STATE_QUARANTINED);
+            try
+            {
+               clusterState.updateNodeState(thisNodeID, STATE_QUARANTINED);
+            }
+            catch (Exception e)
+            {
+               log.error("Having problem update myself. Shutting down to avoid message duplicated delivery.");
+               this.serverPeer.stopService();
+               return;
+            }
          }
 
          if (trace)
@@ -3971,24 +4009,34 @@
          do
          {
             ClusterState newState;
+            boolean timeStampDone = false;
             try
             {
-               refreshNodeState();
-               processClusterState();
+               timeStampDone = refreshNodeState();
+               if (timeStampDone)
+               {
+                  processClusterState();
+               }
+               else if (clusterState.isQuarantined(thisNodeID))
+               {
+                  log.error("I'm orphaned and now I can't tell others that I'm alive. Shutdown node: " + thisNodeID);
+                  serverPeer.stopService();
+                  working = false;
+                  nodeStateRefreshInterval = 1; // let the thread quit quickly.
+               }
+
+               try
+               {
+                  wait(nodeStateRefreshInterval);
+               }
+               catch (InterruptedException e)
+               {
+               }
             }
             catch (Exception e)
             {
                log.error("Error refreshing state of node: " + thisNodeID, e);
             }
-
-            try
-            {
-               wait(nodeStateRefreshInterval);
-            }
-            catch (InterruptedException e)
-            {
-            }
-
          }
          while (working);
          log.debug("Stop monitoring the stats at node " + thisNodeID);
@@ -4020,7 +4068,7 @@
       {
          NodeState node = states.get(nodeID);
          node.setState(newState);
-         updateStateInStorage(nodeID, newState);
+         updateStateInStorage(nodeID, newState, false);
       }
 
       public boolean allDeadButMe(int nodeID)



More information about the jboss-cvs-commits mailing list