[jboss-cvs] JBoss Messaging SVN: r8348 - in branches/Branch_JBossMessaging_1_4_0_SP3_CP09_JBMESSAGING-1801_JBMESSAGING-1805_JBMESSAGING-1851_JBMESSAGING-1864: src/main/org/jboss/messaging/core/impl/postoffice and 1 other directory.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Thu Jun 16 14:12:20 EDT 2011


Author: jbertram
Date: 2011-06-16 14:12:20 -0400 (Thu, 16 Jun 2011)
New Revision: 8348

Modified:
   branches/Branch_JBossMessaging_1_4_0_SP3_CP09_JBMESSAGING-1801_JBMESSAGING-1805_JBMESSAGING-1851_JBMESSAGING-1864/
   branches/Branch_JBossMessaging_1_4_0_SP3_CP09_JBMESSAGING-1801_JBMESSAGING-1805_JBMESSAGING-1851_JBMESSAGING-1864/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java
Log:
JBPAPP-6588


Property changes on: branches/Branch_JBossMessaging_1_4_0_SP3_CP09_JBMESSAGING-1801_JBMESSAGING-1805_JBMESSAGING-1851_JBMESSAGING-1864
___________________________________________________________________
Modified: svn:mergeinfo
   - /branches/Branch_1_4:8010,8013,8238,8245
   + /branches/Branch_1_4:8010,8013,8238,8245,8305

Modified: branches/Branch_JBossMessaging_1_4_0_SP3_CP09_JBMESSAGING-1801_JBMESSAGING-1805_JBMESSAGING-1851_JBMESSAGING-1864/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java
===================================================================
--- branches/Branch_JBossMessaging_1_4_0_SP3_CP09_JBMESSAGING-1801_JBMESSAGING-1805_JBMESSAGING-1851_JBMESSAGING-1864/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java	2011-06-16 12:14:35 UTC (rev 8347)
+++ branches/Branch_JBossMessaging_1_4_0_SP3_CP09_JBMESSAGING-1801_JBMESSAGING-1805_JBMESSAGING-1851_JBMESSAGING-1864/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java	2011-06-16 18:12:20 UTC (rev 8348)
@@ -238,6 +238,9 @@
    
    private boolean failoverOnNodeLeave;
       
+   // Lock held by addBinding() and by the node failover processing; see https://issues.jboss.org/browse/JBMESSAGING-1864
+   private Object failoverLock = new Object();
+   
    // Constructors ---------------------------------------------------------------------------------
 
    public boolean isFailoverOnNodeLeave()
@@ -462,27 +465,30 @@
    
    public boolean addBinding(Binding binding, boolean allNodes) throws Exception
    {
-   	if (allNodes && !binding.queue.isClustered())
-   	{
-   		throw new IllegalArgumentException("Cannot bind a non clustered queue on all nodes");
-   	}
-   		
-   	boolean added = internalAddBinding(binding, allNodes, true);
-   	
-   	if (added && allNodes && clustered && binding.queue.isClustered())
-   	{
-	   	//Now we must wait for all the bindings to appear in state
-	   	//This is necessary since the second bind in an all bind is sent asynchronously to avoid deadlock
-   		
-   		waitForBindUnbind(binding.queue.getName(), true);
-   	}
-   	
-   	if (added)
-   	{
-   		requestDeliveries(binding.queue);
-   	}
-   	
-   	return added;
+      synchronized (failoverLock)
+      {
+         if (allNodes && !binding.queue.isClustered())
+         {
+            throw new IllegalArgumentException("Cannot bind a non clustered queue on all nodes");
+         }
+
+         boolean added = internalAddBinding(binding, allNodes, true);
+
+         if (added && allNodes && clustered && binding.queue.isClustered())
+         {
+            // Now we must wait for all the bindings to appear in state
+            // This is necessary since the second bind in an all bind is sent asynchronously to avoid deadlock
+
+            waitForBindUnbind(binding.queue.getName(), true);
+         }
+
+         if (added)
+         {
+            requestDeliveries(binding.queue);
+         }
+
+         return added;
+      }
    }
                 
    public Binding removeBinding(String queueName, boolean allNodes) throws Throwable
@@ -3114,133 +3120,141 @@
    	
       pm.mergeTransactions(failedNodeID.intValue(), thisNodeID);
       
-      // Need to lock
-      boolean intr = Thread.interrupted();
-      for (;;)
+      synchronized (failoverLock)
       {
-         try
+
+         // Need to lock
+         boolean intr = Thread.interrupted();
+         for (;;)
          {
-            lock.writeLock().acquire();
-            break;
+            try
+            {
+               lock.writeLock().acquire();
+               break;
+            }
+            catch (InterruptedException ex)
+            {
+               intr = true;
+            }
          }
-         catch (InterruptedException ex)
+
+         try
          {
-            intr = true;
-         }
-      }
+            Map nameMap = (Map)nameMaps.get(failedNodeID);
 
-      try
-      {
-      	Map nameMap = (Map)nameMaps.get(failedNodeID);
-      	
-      	List toRemove = new ArrayList();
-      	
-      	if (nameMap != null)
-      	{
-      		Iterator iter = nameMap.values().iterator();
-      		
-      		while (iter.hasNext())
-      		{
-      			Binding binding = (Binding)iter.next();
-      			
-      			Queue queue = binding.queue;
-      			
-      			if (queue.isRecoverable() && queue.getNodeID() == failedNodeID.intValue())
-      			{
-      				toRemove.add(binding);
-      			}      			
-      		}
-      	}
-      	         	
-      	Iterator iter = toRemove.iterator();
+            List toRemove = new ArrayList();
 
-      	while (iter.hasNext())
-      	{
-      		Binding binding = (Binding)iter.next();
-      		
-      		Condition condition = binding.condition;
-      		
-      		Queue queue = binding.queue;
-      		
-      		// Sanity check
-            if (!queue.isRecoverable())
+            if (nameMap != null)
             {
-               throw new IllegalStateException("Found non recoverable queue " +
-                                               queue.getName() + " in map, these should have been removed!");
+               Iterator iter = nameMap.values().iterator();
+
+               while (iter.hasNext())
+               {
+                  Binding binding = (Binding)iter.next();
+
+                  Queue queue = binding.queue;
+
+                  if (queue.isRecoverable() && queue.getNodeID() == failedNodeID.intValue())
+                  {
+                     toRemove.add(binding);
+                  }
+               }
             }
 
-            // Sanity check
-            if (!queue.isClustered())
+            Iterator iter = toRemove.iterator();
+
+            while (iter.hasNext())
             {
-               throw new IllegalStateException("Queue " + queue.getName() + " is not clustered!");
+               Binding binding = (Binding)iter.next();
+
+               Condition condition = binding.condition;
+
+               Queue queue = binding.queue;
+
+               // Sanity check
+               if (!queue.isRecoverable())
+               {
+                  throw new IllegalStateException("Found non recoverable queue " + queue.getName() +
+                                                  " in map, these should have been removed!");
+               }
+
+               // Sanity check
+               if (!queue.isClustered())
+               {
+                  throw new IllegalStateException("Queue " + queue.getName() + " is not clustered!");
+               }
+
+               // Remove from the in-memory map - no need to broadcast anything, since the bindings
+               // will be removed from the other nodes' in-memory maps
+               // when those nodes detect the failure
+               removeBindingInMemory(binding.queue.getNodeID(), binding.queue.getName());
+
+               // Find if there is a local queue with the same name
+
+               Queue localQueue = null;
+
+               if (localNameMap != null)
+               {
+                  Binding b = (Binding)localNameMap.get(queue.getName());
+                  if (b != null)
+                  {
+                     localQueue = b.queue;
+                  }
+               }
+
+               if (localQueue != null)
+               {
+                  // need to merge the queues
+
+                  log.debug(this + " has already a queue: " + queue.getName() + " queue so merging queues");
+
+                  localQueue.mergeIn(queue.getChannelID(), failedNodeID.intValue());
+
+                  log.debug("Merged queue");
+
+                  // Delete from storage
+
+                  // Note we must do this *after* we have done any merge.
+                  // This is because if we did it first and the merge then failed, we'd be left with
+                  // the old channel deleted
+                  // but the messages would still have been in the old channel,
+                  // meaning they would have disappeared from the user's point of view and it would involve manual
+                  // database intervention to correct it.
+                  // See http://jira.jboss.com/jira/browse/JBMESSAGING-1113
+
+                  deleteBindingFromStorage(queue);
+
+                  log.debug(this + " deleted binding for " + queue.getName());
+               }
+               else
+               {
+                  // Cannot failover if there is no queue deployed.
+
+                  log.warn("Cannot failover " + queue.getName() +
+                           " since it does not exist on this node. " +
+                           "You must deploy your clustered destinations on ALL nodes of the cluster");
+               }
+
+               // Note we do not need to send an unbind request across the cluster - this is because
+               // when the node crashes a view change will hit the other nodes and that will cause
+               // all binding data for that node to be removed anyway.
             }
-            
-            //Remove from the in-memory map - no need to broadcast anything - they will get removed from other nodes in memory
-            //maps when the other nodes detect failure
-            removeBindingInMemory(binding.queue.getNodeID(), binding.queue.getName());
-      		
-            //Find if there is a local queue with the same name
-            
-            Queue localQueue = null;
-            
-            if (localNameMap != null)
-            {
-            	Binding b = (Binding)localNameMap.get(queue.getName());
-            	if (b != null)
-            	{
-            		localQueue = b.queue;
-            	}
-            }
-            	
-            if (localQueue != null)
-            {
-               //need to merge the queues
-            	
-            	log.debug(this + " has already a queue: " + queue.getName() + " queue so merging queues");
-            	  
-               localQueue.mergeIn(queue.getChannelID(), failedNodeID.intValue());
-               
-               log.debug("Merged queue");       
-               
-               //Delete from storage
-               
-               //Note we must do this *after* we have done any merge.
-               //This is because if we did it first, then the merge failed, we'd be left with the old channel deleted
-               //but the messages would have still be in the old channel
-               //meaning they would have disappeared from the users point of view and it would involve manual
-               //database intervention to correct it
-               //See http://jira.jboss.com/jira/browse/JBMESSAGING-1113
-               
-               deleteBindingFromStorage(queue);
-            
-               log.debug(this + " deleted binding for " + queue.getName());
-            }
-            else
-            {
-            	//Cannot failover if there is no queue deployed.
-            	
-            	log.warn("Cannot failover " + queue.getName() + " since it does not exist on this node. " + 
-            			                          "You must deploy your clustered destinations on ALL nodes of the cluster");
-            }    
 
-            // Note we do not need to send an unbind request across the cluster - this is because
-            // when the node crashes a view change will hit the other nodes and that will cause
-            // all binding data for that node to be removed anyway.            
+            log.debug(this + ": server side fail over is now complete");
          }
+         finally
+         {
+            lock.writeLock().release();
+            if (intr)
+               Thread.currentThread().interrupt();
+         }
 
-         log.debug(this + ": server side fail over is now complete");
+         // Now clean the data for the failed node
+
+         // TODO - does this need to be inside the write lock above? (It currently runs under failoverLock only.)
+         cleanDataForNode(failedNodeID);
       }
-      finally
-      {
-         lock.writeLock().release();
-         if (intr) Thread.currentThread().interrupt();
-      }
-      
-      //Now clean the data for the failed node
-      
-      //TODO - does this need to be inside the lock above?
-      cleanDataForNode(failedNodeID);
-      
+
       log.debug(this + " announcing that failover procedure is complete");
 
       notification = new ClusterNotification(ClusterNotification.TYPE_FAILOVER_END, failedNodeID.intValue(), null);



More information about the jboss-cvs-commits mailing list