[jboss-cvs] JBoss Messaging SVN: r8202 - in branches/JBM1842/src/main/org/jboss/messaging/core/impl: postoffice and 1 other directory.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Sun Jan 30 08:44:13 EST 2011


Author: gaohoward
Date: 2011-01-30 08:44:12 -0500 (Sun, 30 Jan 2011)
New Revision: 8202

Modified:
   branches/JBM1842/src/main/org/jboss/messaging/core/impl/JDBCSupport.java
   branches/JBM1842/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java
Log:
fix DB failure handling


Modified: branches/JBM1842/src/main/org/jboss/messaging/core/impl/JDBCSupport.java
===================================================================
--- branches/JBM1842/src/main/org/jboss/messaging/core/impl/JDBCSupport.java	2011-01-30 07:19:46 UTC (rev 8201)
+++ branches/JBM1842/src/main/org/jboss/messaging/core/impl/JDBCSupport.java	2011-01-30 13:44:12 UTC (rev 8202)
@@ -522,7 +522,29 @@
    	protected Connection conn;
 
       private TransactionWrapper wrap;
+      
+      public T executeOnlyOnce() throws Exception
+      {
+         wrap = new TransactionWrapper();
 
+         try
+         {
+            conn = ds.getConnection();
+
+            return doTransaction();
+         }
+         catch (Exception e)
+         {
+            wrap.exceptionOccurred();
+            throw e;
+         }
+         finally
+         {
+            wrap.end();
+            closeConnection(conn);           
+         }         
+      }
+
       public T execute() throws Exception
 		{
 	      wrap = new TransactionWrapper();

Modified: branches/JBM1842/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java
===================================================================
--- branches/JBM1842/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java	2011-01-30 07:19:46 UTC (rev 8201)
+++ branches/JBM1842/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java	2011-01-30 13:44:12 UTC (rev 8202)
@@ -506,7 +506,7 @@
       new AddNodeState().executeWithRetry();
    }
    
-   private Integer updateStateInStorage(final int nID, final int newState) throws Exception
+   private Integer updateStateInStorage(final int nID, final int newState, boolean ifRetry) throws Exception
    {
       if (ds == null)
       {
@@ -536,7 +536,11 @@
             return new Integer(row);
          }
       }
-      return (Integer)new UpdateState().executeWithRetry();
+      if (ifRetry)
+      {
+         return (Integer)new UpdateState().executeWithRetry();
+      }
+      return (Integer)new UpdateState().executeOnlyOnce();
    }
    
    private void cleanUpSuspectedNode(final Integer qNode) throws Exception
@@ -570,7 +574,14 @@
                return null;
             }
          }
-         new CleanupDeadNode().executeWithRetry();
+         try
+         {
+            new CleanupDeadNode().executeOnlyOnce();
+         }
+         catch (Exception e)
+         {
+            log.warn("Failed to clean up dead node " + node + " from DB");
+         }
       }
    }
    
@@ -599,8 +610,15 @@
          }
          else if (isDead)
          {
+            QuarantinedNode qNode = suspectedNodes.get(qNodeID);
 
-            QuarantinedNode qNode = suspectedNodes.get(qNodeID);
+            // the node may be null because it is merely having a DB problem and has not been suspected yet. In that case we ignore it
+            if (qNode == null)
+            {
+               log.debug("node " + qNodeID +
+                         " seems dead but it hasn't been suspected yet. Probably having DB problem.");
+               continue;
+            }
             Integer fNodeID = qNode.getFailover();
 
             Integer foNodeID = (Integer)failoverMap.get(fNodeID);
@@ -627,7 +645,7 @@
                   log.debug(this + ": I am the failover node for node " + qNodeID + " that crashed");
 
                   //update node status. this definitely not me!
-                  this.updateStateInStorage(qNodeID, STATE_DEAD);
+                  this.updateStateInStorage(qNodeID, STATE_DEAD, true);
 
                   performFailover(qNodeID);
 
@@ -646,15 +664,16 @@
    }
    
    //timestamp and query the new state from db
-   private void refreshNodeState() throws Exception
+   private boolean refreshNodeState() throws Exception
    {
       if (ds == null)
       {
-         return;
+         return true;
       }
       
       class RefreshNodeState extends JDBCTxRunner
       {
+         boolean timestampDone = false;
          public Object doTransaction() throws Exception
          {
             PreparedStatement ps  = null;
@@ -669,6 +688,8 @@
 
                ps.executeUpdate();
                
+               timestampDone = true;
+
                synchronized (clusterState)
                {
                   clusterState.clear();
@@ -693,7 +714,16 @@
             return null;
          }
       }
-      new RefreshNodeState().executeWithRetry();
+      RefreshNodeState r = new RefreshNodeState();
+      try
+      {
+         r.executeOnlyOnce();
+      }
+      catch (Exception e)
+      {
+         //ignore;
+      }
+      return r.timestampDone;
    }
 
    public void stop() throws Exception
@@ -1421,13 +1451,13 @@
       //if I am the quarantined node, I do rejoin.
       if (clusterState.isQuarantined(thisNodeID))
       {
-         updateStateInStorage(thisNodeID, STATE_CLUSTERED);
          new Thread() {
             public void run()
             {
                PostOfficeAddressInfo info = new PostOfficeAddressInfo(groupMember.getControlChannelAddress(), groupMember.getDataChannelAddress());
                try
                {
+                  updateStateInStorage(thisNodeID, STATE_CLUSTERED, true);
                   groupMember.multicastControl(new JoinClusterRequest(thisNodeID, info), true);
                }
                catch (Exception e)
@@ -1609,7 +1639,16 @@
 
          if (isFirstNode())
          {
-            clusterState.updateNodeState(thisNodeID, STATE_QUARANTINED);
+            try
+            {
+               clusterState.updateNodeState(thisNodeID, STATE_QUARANTINED);
+            }
+            catch (Exception e)
+            {
+               log.error("Having problem update myself. Shutting down to avoid message duplicated delivery.");
+               this.serverPeer.stopService();
+               return;
+            }
          }
          
          if (trace) { log.trace("Quarantined node: " + qNode); }
@@ -3999,24 +4038,35 @@
          do
          {
             ClusterState newState;
+            boolean timeStampDone = false;
             try
             {
-               refreshNodeState();
-               processClusterState();
+               timeStampDone = refreshNodeState();
+               if (timeStampDone)
+               {
+                  processClusterState();
+               }
+               else if (clusterState.isQuarantined(thisNodeID))
+               {
+                  log.error("I'm orphaned and now I can't tell others that I'm alive. Shutdown node: " + thisNodeID);
+                  serverPeer.stopService();
+                  working = false;
+                  nodeStateRefreshInterval = 1; //let the thread quit quickly.
+               }
+
+               try
+               {
+                  wait(nodeStateRefreshInterval);
+               }
+               catch (InterruptedException e)
+               {
+               }
             }
             catch (Exception e)
             {
                log.error("Error refreshing state of node: " + thisNodeID, e);
             }
 
-            try
-            {
-               wait(nodeStateRefreshInterval);
-            }
-            catch (InterruptedException e)
-            {
-            }
-
          } while (working);
          log.debug("Stop monitoring the stats at node " + thisNodeID);
       }
@@ -4047,7 +4097,7 @@
       {
          NodeState node = states.get(nodeID);
          node.setState(newState);
-         updateStateInStorage(nodeID, newState);
+         updateStateInStorage(nodeID, newState, false);
       }
 
       public boolean allDeadButMe(int nodeID)



More information about the jboss-cvs-commits mailing list