[jboss-cvs] JBoss Messaging SVN: r8169 - in branches/JBM1842: src/main/org/jboss/messaging/core/impl/postoffice and 1 other directory.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Mon Jan 17 11:04:34 EST 2011


Author: gaohoward
Date: 2011-01-17 11:04:33 -0500 (Mon, 17 Jan 2011)
New Revision: 8169

Modified:
   branches/JBM1842/integration/EAP4/etc/server/default/deploy/mysql-persistence-service.xml
   branches/JBM1842/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java
Log:
save my work


Modified: branches/JBM1842/integration/EAP4/etc/server/default/deploy/mysql-persistence-service.xml
===================================================================
--- branches/JBM1842/integration/EAP4/etc/server/default/deploy/mysql-persistence-service.xml	2011-01-17 04:44:21 UTC (rev 8168)
+++ branches/JBM1842/integration/EAP4/etc/server/default/deploy/mysql-persistence-service.xml	2011-01-17 16:04:33 UTC (rev 8169)
@@ -130,6 +130,8 @@
 INSERT_BINDING=INSERT INTO JBM_POSTOFFICE (POSTOFFICE_NAME, NODE_ID, QUEUE_NAME, COND, SELECTOR, CHANNEL_ID, CLUSTERED, ALL_NODES) VALUES (?, ?, ?, ?, ?, ?, ?, ?)
 DELETE_BINDING=DELETE FROM JBM_POSTOFFICE WHERE POSTOFFICE_NAME=? AND NODE_ID=? AND QUEUE_NAME=?
 LOAD_BINDINGS=SELECT QUEUE_NAME, COND, SELECTOR, CHANNEL_ID, CLUSTERED, ALL_NODES FROM JBM_POSTOFFICE WHERE POSTOFFICE_NAME=? AND NODE_ID=?
+CREATE_CLUSTER_STATE_TABLE=CREATE TABLE JBM_CLUSTER (NODE_ID INTEGER, PING_TIMESTAMP BIGINT, STATE INTEGER, PRIMARY KEY(NODE_ID))
+UPDATE_STATE=UPDATE JBM_CLUSTER SET STATE = ? WHERE NODE_ID = ?
       ]]></attribute>
 
       <!-- This post office is non clustered. If you want a clustered post office then set to true -->

Modified: branches/JBM1842/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java
===================================================================
--- branches/JBM1842/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java	2011-01-17 04:44:21 UTC (rev 8168)
+++ branches/JBM1842/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java	2011-01-17 16:04:33 UTC (rev 8169)
@@ -107,6 +107,14 @@
    // Constants ------------------------------------------------------------------------------------
 
    private static final Logger log = Logger.getLogger(MessagingPostOffice.class);
+   
+   // Lifecycle states of this post office node. The current value is kept in the "state" field
+   // and persisted through the UPDATE_STATE SQL statement.
+
+   /** Set on startup when the post office does not join a cluster. */
+   public static final int STATE_STANDALONE = 1;
+   
+   /** Set on startup after the JoinClusterRequest has been multicast. */
+   public static final int STATE_CLUSTERED = 2;
+   
+   /** Intended for nodes that have left the group and await a dead/alive decision. */
+   public static final int STATE_QUARANTINED = 3;
+   
+   /** Initial value of the "state" field before startup assigns a real state. */
+   public static final int STATE_DEAD = 4;
 
    //This are only used in testing
    
@@ -246,6 +254,11 @@
    private boolean stopUpdate = false;
    private boolean updateInProcess = false;
       
+   // Current lifecycle state of this node (one of the STATE_* constants). Starts as STATE_DEAD
+   // and is changed through the synchronized changeState method.
+   private int state = STATE_DEAD;
+   
+   // Background monitor thread; created and started lazily by changeState on the first transition.
+   private StateMonitor stateMonitor = null;
+   
    // Constructors ---------------------------------------------------------------------------------
 
    public boolean isFailoverOnNodeLeave()
@@ -411,7 +424,13 @@
 	      put(Replicator.JVM_ID_KEY, clientVMId);
 	      
 	      groupMember.multicastControl(new JoinClusterRequest(thisNodeID, info), true);
+	      
+	      changeState(STATE_CLUSTERED);
       }
+      else
+      {
+         changeState(STATE_STANDALONE);
+      }
    
       //Now load the bindings for this node
       
@@ -421,7 +440,54 @@
 
       log.debug(this + " started");      
    }
+   
+   /**
+    * Moves this node to {@code newState} if it is not already in that state.
+    * <p>
+    * The new state is written to storage first and only then adopted in memory, so a failed
+    * write never leaves the in-memory state claiming something storage does not hold. The
+    * first successful transition also starts the {@link StateMonitor} thread.
+    *
+    * @param newState one of the {@code STATE_*} constants
+    * @throws IllegalStateException if the state could not be written to storage; the cause is
+    *         the underlying storage exception and the in-memory state is left unchanged
+    */
+   private synchronized void changeState(int newState)
+   {
+      if (state == newState)
+      {
+         return;
+      }
+
+      try
+      {
+         // updateStateInStorage declares "throws Exception"; this method's callers do not expect
+         // a checked exception, so wrap it while keeping the original as the cause.
+         updateStateInStorage(newState);
+      }
+      catch (Exception e)
+      {
+         throw new IllegalStateException(this + " failed to store state " + newState + " for node " + thisNodeID, e);
+      }
+
+      this.state = newState;
+
+      if (stateMonitor == null)
+      {
+         stateMonitor = new StateMonitor();
+         stateMonitor.start();
+      }
+   }
+   
+   /**
+    * Writes this node's state using the UPDATE_STATE SQL statement.
+    * Does nothing when no data source is configured.
+    *
+    * @param newState one of the {@code STATE_*} constants
+    * @throws Exception if the JDBC transaction run by {@code executeWithRetry()} fails
+    */
+   private void updateStateInStorage(final int newState) throws Exception
+   {
+      if (ds == null)
+      {
+         return;
+      }
+
+      class UpdateState extends JDBCTxRunner
+      {
+         public Object doTransaction() throws Exception
+         {
+            PreparedStatement ps  = null;
+
+            try
+            {
+               ps = conn.prepareStatement(getSQLStatement("UPDATE_STATE"));
+
+               ps.setInt(1, newState);
+               ps.setInt(2, thisNodeID);
+
+               int updated = ps.executeUpdate();
+
+               // UPDATE_STATE only modifies an existing row and nothing here creates one, so a
+               // missing row would otherwise make the state change vanish silently.
+               if (updated == 0)
+               {
+                  log.warn(MessagingPostOffice.this + " no cluster state row for node " + thisNodeID + "; state " + newState + " was not stored");
+               }
+            }
+            finally
+            {
+               closeStatement(ps);
+            }
+            return null;
+         }
+      }
+      new UpdateState().executeWithRetry();
+   }
+
    public void stop() throws Exception
    {
       stopViewUpdate();
@@ -1131,7 +1197,10 @@
    	
    	int oldFailoverNodeID = failoverNodeID;
    	
-      if (trace) { log.trace("Old failover node id: " + oldFailoverNodeID); }      
+      if (trace) { log.trace("Old failover node id: " + oldFailoverNodeID); }
+      
+      //before doing anything, quarantine the nodes
+      quarantine(addresses);
    	
    	calculateFailoverMap();
    	
@@ -1160,6 +1229,13 @@
 	      }
 	      
 	      boolean crashed = failoverOnNodeLeave || !leaveMessageReceived(leftNodeID);
+	      boolean isolated = false;
+	      
+	      //if not really dead, don't do failover
+	      if (crashed)
+	      {
+	         isolated = checkNodeDead(leftNodeID);
+	      }
 	
 	      log.debug(this + ": node " + leftNodeID + " has " + (crashed ? "crashed" : "cleanly left the group"));
       
@@ -1185,14 +1261,22 @@
 		         // The node crashed and we are the failover node so let's perform failover
 		
 		         log.debug(this + ": I am the failover node for node " + leftNodeID + " that crashed");
-		
-		         performFailover(leftNodeID);
+
+		         //if the left node is still alive, don't failover
+		         if (!isolated)
+		         {
+		            performFailover(leftNodeID);
 		         
-		         doneFailover = true;
+		            doneFailover = true;
+		         }
+		         else
+		         {
+		            log.debug(this + ": won't failover for the node " + leftNodeID + " as it is still alive");
+		         }
 		      }
 	      }
       
-	      if (!doneFailover)
+	      if (!doneFailover && crashed)
 	      {
 		      // Remove any replicant data and non durable bindings for the node -  This will notify any listeners which will
 		      // recalculate the connection factory delegates and failover delegates.
@@ -1213,6 +1297,92 @@
 	   sendJMXNotification(VIEW_CHANGED_NOTIFICATION);
    }
    
+   /**
+    * Marks each crashed node among {@code addresses} as {@link #STATE_QUARANTINED} in storage.
+    * <p>
+    * This only records state. It deliberately performs no cluster notification, failover or
+    * data cleanup, because the caller does that work itself after recalculating the failover
+    * map. Nodes that left cleanly are skipped: they are never failover candidates.
+    * <p>
+    * Storage failures are logged rather than thrown, so they cannot abort the caller's
+    * processing of the membership change.
+    *
+    * @param addresses the sync addresses of the nodes that left the group
+    * @throws IllegalStateException if an address cannot be mapped to a node ID
+    */
+   private void quarantine(List addresses)
+   {
+      final List quarantined = new java.util.ArrayList();
+
+      for (Iterator iter = addresses.iterator(); iter.hasNext(); )
+      {
+         Address addr = (Address)iter.next();
+
+         Integer leftNodeID = getNodeIDForSyncAddress(addr);
+
+         if (leftNodeID == null)
+         {
+            throw new IllegalStateException(this + " cannot find node ID for address " + addr);
+         }
+
+         // Same "crashed" rule the caller uses when deciding on failover.
+         // NOTE(review): leaveMessageReceived is also consulted by the caller; confirm it has no
+         // side effects that make a second call return a different answer.
+         boolean crashed = failoverOnNodeLeave || !leaveMessageReceived(leftNodeID);
+
+         if (crashed)
+         {
+            quarantined.add(leftNodeID);
+         }
+      }
+
+      if (quarantined.isEmpty() || ds == null)
+      {
+         return;
+      }
+
+      class QuarantineNodes extends JDBCTxRunner
+      {
+         public Object doTransaction() throws Exception
+         {
+            PreparedStatement ps = null;
+
+            try
+            {
+               ps = conn.prepareStatement(getSQLStatement("UPDATE_STATE"));
+
+               for (Iterator it = quarantined.iterator(); it.hasNext(); )
+               {
+                  ps.setInt(1, STATE_QUARANTINED);
+                  ps.setInt(2, ((Integer)it.next()).intValue());
+                  ps.executeUpdate();
+               }
+            }
+            finally
+            {
+               closeStatement(ps);
+            }
+            return null;
+         }
+      }
+
+      try
+      {
+         new QuarantineNodes().executeWithRetry();
+      }
+      catch (Exception e)
+      {
+         log.error(this + " failed to quarantine nodes " + quarantined, e);
+      }
+   }
+
    // RequestTarget implementation ------------------------------------------------------------
    
    /*
@@ -3537,5 +3707,21 @@
 		}
    	
    }
+   
+   /*
+    * Background thread, started by changeState on the first state transition, intended to:
+    * 
+    * - periodically update this node's time stamp and monitor its own state; if this node
+    *   becomes quarantined, switch it to standalone;
+    * - monitor its buddy's state; if the buddy is quarantined, determine whether it has
+    *   really died and, once it has, trigger failover.
+    *
+    * NOTE(review): run() is currently empty, so the thread exits immediately after start()
+    * and none of the behaviour above is implemented yet.
+    */
+   private class StateMonitor extends Thread
+   {
+      public void run()
+      {
+      }
+   }
 
 }



More information about the jboss-cvs-commits mailing list