[jboss-cvs] JBoss Messaging SVN: r3226 - in trunk/src/main/org/jboss: messaging/core/impl and 1 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Sat Oct 20 13:35:44 EDT 2007


Author: timfox
Date: 2007-10-20 13:35:44 -0400 (Sat, 20 Oct 2007)
New Revision: 3226

Modified:
   trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java
   trunk/src/main/org/jboss/messaging/core/impl/MessagingQueue.java
   trunk/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java
Log:
Make the semaphore clearable


Modified: trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java	2007-10-20 16:13:52 UTC (rev 3225)
+++ trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java	2007-10-20 17:35:44 UTC (rev 3226)
@@ -1033,9 +1033,12 @@
 	   			{
 	   				if (!delivered)
 	   				{
-   	   				//We have to wait for another response to arrive first
-   	   				
-   	   				throw new IllegalStateException("Responses have come back our of order");
+   	   				// Resonpse has come back out of order - this can happen when the failover node is being changed
+	   					// E.g. failover node changes, replicates start getting sent to the new failover node,
+	   					// then the new node requests to collect the deliveries from this node, at which point we deliver
+	   					// all waiting deliveries. Then the responses to the original ones come back.
+	   					// So we can ignore them
+	   					break;
 	   				}
 	   				else
 	   				{

Modified: trunk/src/main/org/jboss/messaging/core/impl/MessagingQueue.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/impl/MessagingQueue.java	2007-10-20 16:13:52 UTC (rev 3225)
+++ trunk/src/main/org/jboss/messaging/core/impl/MessagingQueue.java	2007-10-20 17:35:44 UTC (rev 3226)
@@ -466,13 +466,7 @@
    	if (trace) { log.trace("Adding all from recovery area for node " + nodeID +" set " + ids); }
    	
    	Integer nid = new Integer(nodeID);
-   	
-   	//Sanity check
-   	if (recoveryArea.get(nid) != null)
-   	{
-   		throw new IllegalStateException("There are already message ids for node " + nodeID);
-   	}
-   	   	
+   	   	   
    	if (!(ids instanceof ConcurrentHashMap))
    	{
    		ids = new ConcurrentHashMap(ids);

Modified: trunk/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java	2007-10-20 16:13:52 UTC (rev 3225)
+++ trunk/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java	2007-10-20 17:35:44 UTC (rev 3226)
@@ -75,9 +75,9 @@
 import org.jboss.messaging.core.impl.tx.Transaction;
 import org.jboss.messaging.core.impl.tx.TransactionRepository;
 import org.jboss.messaging.core.impl.tx.TxCallback;
+import org.jboss.messaging.util.ClearableSemaphore;
 import org.jboss.messaging.util.ConcurrentHashSet;
 import org.jboss.messaging.util.StreamUtils;
-import org.jboss.messaging.util.Throttle;
 import org.jgroups.Address;
 import org.jgroups.View;
 
@@ -224,12 +224,10 @@
    
    //We keep use a semaphore to limit the number of concurrent replication requests to avoid
    //overwhelming JGroups
-   //private Semaphore replicateSemaphore;
+   private ClearableSemaphore replicateSemaphore;
       
-   //private int maxConcurrentReplications;
+   private int maxConcurrentReplications;
    
-   private Throttle throttle = new Throttle(5000, 5);
-   
    // Constructors ---------------------------------------------------------------------------------
 
    /*
@@ -311,9 +309,7 @@
       
       nbSupport = new NotificationBroadcasterSupport();
       
-    //  this.maxConcurrentReplications = maxConcurrentReplications;
-      
-      //replicateSemaphore = new Semaphore(maxConcurrentReplications, true);
+      replicateSemaphore = new ClearableSemaphore(maxConcurrentReplications);
    }
       
    // MessagingComponent overrides -----------------------------------------------------------------
@@ -459,9 +455,7 @@
    	
    	return added;
    }
-   
-   
-          
+                
    public Binding removeBinding(String queueName, boolean allNodes) throws Throwable
    {
    	Binding binding = internalRemoveBinding(queueName, allNodes, true);
@@ -628,7 +622,8 @@
    	
    	if (reply)
    	{
-   		//replicateSemaphore.acquire();
+   		//replicateSemaphore.tryAcquire(250, TimeUnit.MILLISECONDS);
+   		replicateSemaphore.acquire();
    	}
    	
    	try
@@ -656,8 +651,6 @@
 		   	
 		   if (address != null)
 		   {	   
-		   	throttle.ping();
-		   	
 		   	groupMember.unicastData(request, address);
 		   }
    	}
@@ -665,7 +658,7 @@
    	{
    		if (reply)
    		{
-   		//	replicateSemaphore.release();
+   			replicateSemaphore.release();
    		}
    		
    		throw e;
@@ -683,8 +676,6 @@
 	   	
 	   if (address != null)
 	   {	   
-	   	throttle.ping();
-	   	
 	   	groupMember.unicastData(request, address);
 	   }
 	}
@@ -1290,7 +1281,8 @@
    	
    	if (binding == null)
    	{
-   		throw new IllegalStateException("Cannot find queue with name " + queueName +" has it been deployed?");
+   		//This is ok - maybe new failover node but queue is not yet deployed
+   		return;
    	}
    	
    	Queue queue = binding.queue;
@@ -1306,7 +1298,7 @@
    	//TODO - this does not belong here
    	ServerSessionEndpoint session = serverPeer.getSession(sessionID);
    	
-   //	replicateSemaphore.release();
+   	replicateSemaphore.release();
    	
    	if (session == null)
    	{
@@ -2788,6 +2780,8 @@
    	//The failover node has changed - we need to move our replicated deliveries
    	
    	if (trace) { log.trace("Failover node has changed from " + oldFailoverNodeID + " to " + failoverNodeID); }
+   	
+   	replicateSemaphore.disable();
    	   	
    	if (!firstNode)
    	{	   	
@@ -2836,9 +2830,7 @@
 						
 						session.deliverAnyWaitingDeliveries(null);
 						
-						session.collectDeliveries(deliveries, firstNode, null);
-						
-					//	releaseAndReplaceSemaphore();						
+						session.collectDeliveries(deliveries, firstNode, null);												
 					}   				  
 					
 					if (!firstNode)
@@ -2856,26 +2848,12 @@
 			   		
 			   		if (trace) { log.trace("Sent AddAllReplicatedDeliveriesMessage"); }
 					}
+					
+					replicateSemaphore.enable();
 	   		}
 	   	}  	
-   	}
-   	
-   	//Now we replace the semaphore since some of the acks may not come back from the old failover node
-  // 	releaseAndReplaceSemaphore();
+   	}   	
    }
-   
-//   private void releaseAndReplaceSemaphore()
-//   {
-//   	if (replicateSemaphore != null)
-//   	{
-//   		Semaphore oldSem = replicateSemaphore;
-//
-//   		replicateSemaphore = new Semaphore(maxConcurrentReplications);
-//   		
-//   		oldSem.release(maxConcurrentReplications);   		
-//   	}
-//   }
-//   
 
    /**
     * This method fails over all the queues from node <failedNodeId> onto this node. It is triggered
@@ -3066,9 +3044,9 @@
       				if (session.collectDeliveries(dels, firstNode, queueName))
       				{
       					gotSome = true;
-      				}
-      				
-      			//	releaseAndReplaceSemaphore();
+      				}      				
+      				//Release them all
+						replicateSemaphore.enable();
       			}   				  
       			
       			if (gotSome)




More information about the jboss-cvs-commits mailing list