[jboss-cvs] JBoss Messaging SVN: r8611 - branches/Branch_JBossMessaging_1_4_0_SP3_CP14_JBMESSAGING-1896_JBMESSAGING-1901_JBMESSAGING-1931/src/main/org/jboss/messaging/core/impl/postoffice.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Wed Sep 18 03:48:10 EDT 2013


Author: raggz
Date: 2013-09-18 03:48:09 -0400 (Wed, 18 Sep 2013)
New Revision: 8611

Modified:
   branches/Branch_JBossMessaging_1_4_0_SP3_CP14_JBMESSAGING-1896_JBMESSAGING-1901_JBMESSAGING-1931/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java
Log:
Back port of JBMessaging-1896, JBMessaging-1901,JBMessaging-1931


Modified: branches/Branch_JBossMessaging_1_4_0_SP3_CP14_JBMESSAGING-1896_JBMESSAGING-1901_JBMESSAGING-1931/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java
===================================================================
--- branches/Branch_JBossMessaging_1_4_0_SP3_CP14_JBMESSAGING-1896_JBMESSAGING-1901_JBMESSAGING-1931/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java	2013-09-13 06:40:04 UTC (rev 8610)
+++ branches/Branch_JBossMessaging_1_4_0_SP3_CP14_JBMESSAGING-1896_JBMESSAGING-1901_JBMESSAGING-1931/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java	2013-09-18 07:48:09 UTC (rev 8611)
@@ -257,6 +257,7 @@
    private Object viewUpdateLock;
    private boolean stopUpdate = false;
    private boolean updateInProcess = false;
+   private Thread updateThread;
    
    private StateMonitor stateMonitor = null;
    
@@ -277,6 +278,7 @@
    
    private Set<Condition> inactiveConditions = new HashSet<Condition>();
    
+   private int initMembers;
    // Constructors ---------------------------------------------------------------------------------
 
    public boolean isFailoverOnNodeLeave()
@@ -458,7 +460,7 @@
       
       started = true;
 
-      log.debug(this + " started");
+      log.debug(this + " started with initMember " + initMembers);
    }
    
    //this method will trigger a dedicated thread to write time stamp
@@ -497,6 +499,7 @@
                
                if (row == 0)
                {
+                  log.trace("inserting this node " + thisNodeID + " to the DB table");
                   ps1 = conn.prepareStatement(getSQLStatement("INSERT_NODE_STATE"));
                   ps1.setInt(1, thisNodeID);
                   ps1.setInt(2, STATE_CLUSTERED);
@@ -735,6 +738,7 @@
 
                if (result > 0)
                {
+                  timestampDone = true;
                   log.debug("Successfully updated cluster health timestamp for node " + thisNodeID);
                }
                else if (result == 0)
@@ -742,8 +746,6 @@
                   log.debug("Cluster health timestamp update for node " + thisNodeID + " failed!");
                }
 
-               timestampDone = true;
-
                synchronized (clusterState)
                {
                   clusterState.clear();
@@ -839,8 +841,12 @@
       {
          if (stopUpdate) return;
          stopUpdate = true;
-         while (updateInProcess)
+         if (trace)
          {
+            log.trace("updateThread is " + updateThread);
+         }
+         while (updateInProcess && (updateThread != Thread.currentThread()))
+         {
             try
             {
                log.info("Waiting for view update finish before stop post office " + this);
@@ -860,15 +866,31 @@
       {
          if (stopUpdate) return false;
          updateInProcess = true;
+         updateThread = Thread.currentThread();
       }
       return true;
    }
 
+   //force view update stop, don't wait
+   public void disableViewUpdate()
+   {
+      if (!clustered) return;
+      synchronized (viewUpdateLock)
+      {
+         if (stopUpdate) return;
+         stopUpdate = true;
+         updateThread = null;
+         viewUpdateLock.notify();
+      }
+      log.trace("View update disabled");
+   }
+
    public void endProcessView()
    {
       synchronized (viewUpdateLock)
       {
          updateInProcess = false;
+         updateThread = null;
          viewUpdateLock.notify();
       }
    }
@@ -1740,7 +1762,7 @@
                catch (Exception e)
                {
                   log.error("Having problem update myself. Shutting down to avoid message duplicated delivery.");
-                  this.serverPeer.stopJBMNodeForRecovery();
+                  stateMonitor.stopJBMNodeForRecovery();
                   return;
                }
             }
@@ -2906,8 +2928,17 @@
    	   //This is ok - it wil be shortly followed by another calculation of the map
    	}
    	
+      initMembers = initMembers < failoverMap.size() ? failoverMap.size() : initMembers;
+
+      log.trace("initMembers updated to: " + initMembers);
       log.debug("Updated failover map:\n" + dumpFailoverMap(failoverMap));   	      
    }
+
+   private boolean iAmAlone()
+   {
+      if (trace) { log.trace("If I am alone in failover Map: " + failoverMap.size() + " initMembers: " + initMembers); }
+      return (failoverMap.size() == 1) && (initMembers > 1);
+   }
    
    private Integer findNodeIDForAddress(Address address)
    {
@@ -4228,11 +4259,16 @@
          }
       }
       
+      public synchronized void stopJBMNodeForRecovery()
+      {
+         disableViewUpdate();
+         serverPeer.stopJBMNodeForRecovery();
+      }
+
       public synchronized void run()
       {
          do
          {
-            ClusterState newState;
             boolean timeStampDone = false;
             try
             {
@@ -4242,10 +4278,10 @@
                {
                   processClusterState();
                }
-               else if (clusterState.isQuarantined(thisNodeID))
+               else if (clusterState.isQuarantined(thisNodeID) || iAmAlone())
                {
                   log.error("I'm orphaned and now I can't tell others that I'm alive. Shutdown node: " + thisNodeID);
-                  serverPeer.stopJBMNodeForRecovery();
+                  stopJBMNodeForRecovery();
                   working = false;
                   nodeStateRefreshInterval = 1; //let the thread quite quickly.
                }



More information about the jboss-cvs-commits mailing list