[jboss-cvs] JBoss Messaging SVN: r8468 - in branches/JBPAPP-7443: src/main/org/jboss/messaging/core/impl/postoffice and 1 other directory.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Tue Nov 1 09:36:21 EDT 2011


Author: raggz
Date: 2011-11-01 09:36:20 -0400 (Tue, 01 Nov 2011)
New Revision: 8468

Modified:
   branches/JBPAPP-7443/
   branches/JBPAPP-7443/src/main/org/jboss/messaging/core/impl/postoffice/GroupListener.java
   branches/JBPAPP-7443/src/main/org/jboss/messaging/core/impl/postoffice/GroupMember.java
   branches/JBPAPP-7443/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java
Log:
Backport of JBMessaging-1796 needed for JBMessaging-1842


Property changes on: branches/JBPAPP-7443
___________________________________________________________________
Modified: svn:mergeinfo
   - /branches/Branch_1_4:8151,8237-8238,8245,8257
   + /branches/Branch_1_4:7996,7999,8151,8237-8238,8245,8257

Modified: branches/JBPAPP-7443/src/main/org/jboss/messaging/core/impl/postoffice/GroupListener.java
===================================================================
--- branches/JBPAPP-7443/src/main/org/jboss/messaging/core/impl/postoffice/GroupListener.java	2011-11-01 10:51:51 UTC (rev 8467)
+++ branches/JBPAPP-7443/src/main/org/jboss/messaging/core/impl/postoffice/GroupListener.java	2011-11-01 13:36:20 UTC (rev 8468)
@@ -42,4 +42,14 @@
 	byte[] getState() throws Exception;
 	
 	void setState(byte[] state) throws Exception;
+
+	//Tells the listener that processing of a view change is about to begin.
+	//Returns false if the listener decides the change must not be processed
+	//(for example, shutdown has been initiated); otherwise returns true.
+	
+	//introduced for https://jira.jboss.org/jira/browse/JBMESSAGING-1796
+   boolean beginProcessView();
+   
+   //tell the listener that the view has been processed
+   void endProcessView();
 }

Modified: branches/JBPAPP-7443/src/main/org/jboss/messaging/core/impl/postoffice/GroupMember.java
===================================================================
--- branches/JBPAPP-7443/src/main/org/jboss/messaging/core/impl/postoffice/GroupMember.java	2011-11-01 10:51:51 UTC (rev 8467)
+++ branches/JBPAPP-7443/src/main/org/jboss/messaging/core/impl/postoffice/GroupMember.java	2011-11-01 13:36:20 UTC (rev 8468)
@@ -493,6 +493,11 @@
       public void viewAccepted(final View newView)
       {     	
       	 log.info(this  + " got new view " + newView + ", old view is " + currentView);
+      	 
+      	 if (!groupListener.beginProcessView())
+      	 {
+      	    return;
+      	 }
 		      	
          if (newView instanceof MergeView)
          {
@@ -534,7 +539,7 @@
                if (!leftNodes.isEmpty())
                {
                	groupListener.nodesLeft(leftNodes);
-                log.info("Dead members: " + leftNodes.size() + " (" + leftNodes + ")");
+                  log.info("Dead members: " + leftNodes.size() + " (" + leftNodes + ")");
                }
             }
             List nodesAdded=new ArrayList();
@@ -557,6 +562,8 @@
          {
             log.error("Caught Exception in MembershipListener", e);
          }
+         
+         groupListener.endProcessView();
       }
       	
       public byte[] getState()
@@ -658,4 +665,5 @@
          }
       }
    }
+   
 }

Modified: branches/JBPAPP-7443/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java
===================================================================
--- branches/JBPAPP-7443/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java	2011-11-01 10:51:51 UTC (rev 8467)
+++ branches/JBPAPP-7443/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java	2011-11-01 13:36:20 UTC (rev 8468)
@@ -240,6 +240,11 @@
    private boolean useJGroupsWorkaround;
    
    private boolean failoverOnNodeLeave;
+
+   //view update coordination state; the lock is created only by the clustered constructor
+   private Object viewUpdateLock;
+   private boolean stopUpdate = false;
+   private boolean updateInProcess = false;
       
    // Constructors ---------------------------------------------------------------------------------
 
@@ -333,6 +338,8 @@
      
       this.clustered = true;
       
+      this.viewUpdateLock = new Object();
+      
       this.failoverOnNodeLeave = failoverOnNodeLeave;
       
       groupMember = new GroupMember(groupName, stateTimeout, castTimeout, jChannelFactory, this, this);
@@ -379,8 +386,10 @@
       
       if (clustered)
       {
-	      groupMember.start();
+         stopUpdate = false;
 
+         groupMember.start();
+
 	      //Sanity check - we check there aren't any other nodes already in the cluster with the same node id
 	      if (knowAboutNodeId(thisNodeID))
 	      {
@@ -413,34 +422,87 @@
       log.debug(this + " started");      
    }
 
-   public synchronized void stop() throws Exception
-   {      
-      if (!started)
-   	{
-   		log.warn(this + " is not started");
-   		
-   		return;
-   	}
-   	
-      if (trace) { log.trace(this + " stopping"); }
-            
-      super.stop();      
-      
-      if (clustered)
-      {	       
-	      //Need to send this *before* stopping
-      	groupMember.multicastControl(new LeaveClusterRequest(thisNodeID), true);
-	
-	      groupMember.stop();
+   public void stop() throws Exception
+   {
+      stopViewUpdate();
+
+      synchronized (this)
+      {
+         if (!started)
+         {
+            log.warn(this + " is not started");
+
+            return;
+         }
+
+         if (trace)
+         {
+            log.trace(this + " stopping");
+         }
+
+         super.stop();
+
+         if (clustered)
+         {
+            // Need to send this *before* stopping
+            groupMember.multicastControl(new LeaveClusterRequest(thisNodeID), true);
+            groupMember.stop();
+         }
+
+         deInit();
+
+         started = false;
+
+         log.debug(this + " stopped");
       }
-      
-      deInit();
-      
-      started = false;
+   }
 
-      log.debug(this + " stopped");
+   /**
+    * Prevents further view updates before shutdown and waits for any
+    * view update already in progress to finish. Called by stop().
+    * https://jira.jboss.org/jira/browse/JBMESSAGING-1796
+    */
+   private void stopViewUpdate()
+   {
+      if (!clustered) return;
+      synchronized (viewUpdateLock)
+      {
+         if (stopUpdate) return;
+         stopUpdate = true;
+         while (updateInProcess)
+         {
+            try
+            {
+               log.info("Waiting for view update finish before stop post office " + this);
+               viewUpdateLock.wait();
+               log.info("View update finished, stpping post office: " + this);
+            }
+            catch (InterruptedException e)
+            {
+            }
+         }
+      }
    }
 
+   public boolean beginProcessView()
+   {
+      synchronized (viewUpdateLock)
+      {
+         if (stopUpdate) return false;
+         updateInProcess = true;
+      }
+      return true;
+   }
+
+   public void endProcessView()
+   {
+      synchronized (viewUpdateLock)
+      {
+         updateInProcess = false;
+         viewUpdateLock.notify();
+      }
+   }
+
    // NotificationBroadcaster implementation -------------------------------------------------------
 
    public void addNotificationListener(NotificationListener listener,



More information about the jboss-cvs-commits mailing list