[jboss-cvs] JBoss Messaging SVN: r2814 - in trunk: src/main/org/jboss/messaging/core/contract and 6 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Thu Jun 28 10:06:26 EDT 2007


Author: timfox
Date: 2007-06-28 10:06:25 -0400 (Thu, 28 Jun 2007)
New Revision: 2814

Modified:
   trunk/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java
   trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java
   trunk/src/main/org/jboss/messaging/core/contract/Message.java
   trunk/src/main/org/jboss/messaging/core/impl/ChannelSupport.java
   trunk/src/main/org/jboss/messaging/core/impl/ClusterRoundRobinDistributor.java
   trunk/src/main/org/jboss/messaging/core/impl/MessagingQueue.java
   trunk/src/main/org/jboss/messaging/core/impl/clusterconnection/ClusterConnectionManager.java
   trunk/src/main/org/jboss/messaging/core/impl/clusterconnection/MessageSucker.java
   trunk/src/main/org/jboss/messaging/core/impl/postoffice/GroupMember.java
   trunk/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java
   trunk/tests/src/org/jboss/test/messaging/jms/clustering/DistributedTopicTest.java
   trunk/tests/src/org/jboss/test/messaging/tools/ServerManagement.java
   trunk/tests/src/org/jboss/test/messaging/tools/jmx/rmi/LocalTestServer.java
Log:
Removed extraneous logging, and more fixes


Modified: trunk/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java	2007-06-28 08:51:05 UTC (rev 2813)
+++ trunk/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java	2007-06-28 14:06:25 UTC (rev 2814)
@@ -543,7 +543,7 @@
             	
             	Replicator rep = (Replicator)postOffice;
             	
-            	rep.put(queue.getName(), ServerSessionEndpoint.DUR_SUB_STATE_NO_CONSUMERS);
+            	rep.remove(queue.getName());
             }
          }
       }

Modified: trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java	2007-06-28 08:51:05 UTC (rev 2813)
+++ trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java	2007-06-28 14:06:25 UTC (rev 2814)
@@ -117,8 +117,6 @@
    
    static final String DUR_SUB_STATE_CONSUMERS = "C";
    
-   static final String DUR_SUB_STATE_NO_CONSUMERS = "N";
-   
    static final String TEMP_QUEUE_MESSAGECOUNTER_PREFIX = "TempQueue.";
    
    // Static ---------------------------------------------------------------------------------------

Modified: trunk/src/main/org/jboss/messaging/core/contract/Message.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/contract/Message.java	2007-06-28 08:51:05 UTC (rev 2813)
+++ trunk/src/main/org/jboss/messaging/core/contract/Message.java	2007-06-28 14:06:25 UTC (rev 2814)
@@ -38,6 +38,11 @@
 public interface Message extends Streamable
 {
 	/**
+	 * This header can be used to trace a message across the cluster 
+	 */
+   public static final String HEADER_JBM_TRACE_ROUTE = "HEADER_JBM_TRACE_ROUTE";   
+	
+	/**
 	 * This header is set on a message when a message is sucked from one node of the cluster to another
 	 * and order preservation is true.
 	 * The header is checked when sucking messages and if order preservation is true then the message is not accepted.

Modified: trunk/src/main/org/jboss/messaging/core/impl/ChannelSupport.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/impl/ChannelSupport.java	2007-06-28 08:51:05 UTC (rev 2813)
+++ trunk/src/main/org/jboss/messaging/core/impl/ChannelSupport.java	2007-06-28 14:06:25 UTC (rev 2814)
@@ -35,6 +35,7 @@
 import org.jboss.messaging.core.contract.DeliveryObserver;
 import org.jboss.messaging.core.contract.Distributor;
 import org.jboss.messaging.core.contract.Filter;
+import org.jboss.messaging.core.contract.Message;
 import org.jboss.messaging.core.contract.MessageReference;
 import org.jboss.messaging.core.contract.MessageStore;
 import org.jboss.messaging.core.contract.PersistenceManager;
@@ -222,8 +223,6 @@
       {      
 	      if (distributor != null && distributor.getNumberOfReceivers() > 0)
 	      {         
-	      	log.info("Deliver was called");
-	      	
 	         setReceiversReady(true);
 	            
 	         deliverInternal();                  
@@ -456,13 +455,6 @@
       }
    }
 
-   //Only used for testing
-
-   public String toString()
-   {
-      return "ChannelSupport[" + channelID + "]";
-   }
-
    // Package protected ----------------------------------------------------------------------------
    
    // Protected ------------------------------------------------------------------------------------
@@ -520,8 +512,6 @@
          {
          	if (trace) { log.trace(this + " receivers not ready so not delivering"); }
          	
-         	log.info("There are " + this.distributor.getNumberOfReceivers() + " receivers");
-         	
             return;
          }
          
@@ -667,6 +657,22 @@
 
       try
       {  
+      	if (trace)
+   		{
+   			//We add a header that tracks the route of the message across the cluster
+   			
+   			String route = (String)ref.getMessage().getHeader(Message.HEADER_JBM_TRACE_ROUTE);
+   			
+   			if (route == null)
+      		{
+      			route = "nodes:";
+      		}
+   			
+      		route += this + "-";
+      		
+      		ref.getMessage().putHeader(Message.HEADER_JBM_TRACE_ROUTE, route);
+   		}
+      	
          if (tx == null)
          {
             if (persist && ref.getMessage().isReliable() && recoverable)

Modified: trunk/src/main/org/jboss/messaging/core/impl/ClusterRoundRobinDistributor.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/impl/ClusterRoundRobinDistributor.java	2007-06-28 08:51:05 UTC (rev 2813)
+++ trunk/src/main/org/jboss/messaging/core/impl/ClusterRoundRobinDistributor.java	2007-06-28 14:06:25 UTC (rev 2814)
@@ -54,6 +54,8 @@
    
    // Attributes -----------------------------------------------------------------------------------
    
+   private boolean trace = log.isTraceEnabled();
+   
    private Distributor localDistributor;
    
    private Distributor remoteDistributor;
@@ -75,24 +77,24 @@
    {             
       //First try the local distributor
    	
-   	log.info("** first trying with local distributor");
+   	if (trace) { log.trace(this + " first trying with local distributor"); }
    	
    	Delivery del = localDistributor.handle(observer, ref, tx);
    	
-   	log.info("*** local distributor returned " + del);
+   	if (trace) { log.trace(this + " local distributor returned " + del); }
    	
    	if (del == null)
    	{
    		//If no local distributor takes the ref then we try the remote distributor
    		
-   		log.info("** preserve ordering is " + preserveOrdering);
-   		
    		if (preserveOrdering)
    		{
    			if (ref.getMessage().getHeader(Message.CLUSTER_SUCKED) != null)
    			{
    				//The message has already been sucked once - don't suck it again
    				
+   				if (trace) { log.trace(this + " preserveOrdering is true and has already been sucked so not allowing message to be sucked again"); }
+   				
    				return null;
    			}
    			else
@@ -103,19 +105,11 @@
    			}
    		}
    		
-   		log.info("*** sending to remote distributor");
-   		
-   		String wib = (String)ref.getMessage().getHeader("wib");
-   		if (wib == null)
-   		{
-   			wib = "nodes:";
-   		}
-   		wib += ((MessagingQueue)observer).getNodeID() + "-";
-   		ref.getMessage().putHeader("wib", wib);
-   		
+   		if (trace) { log.trace(this + " trying with remote distributor"); }
+   		   		
    		del = remoteDistributor.handle(observer, ref, tx);
-   		
-   		log.info("** remote distributor returned " + del);
+   		   		   	
+   		if (trace) { log.trace(this + " remote distributor returned " + del); }
    	}
    	
    	return del;

Modified: trunk/src/main/org/jboss/messaging/core/impl/MessagingQueue.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/impl/MessagingQueue.java	2007-06-28 08:51:05 UTC (rev 2813)
+++ trunk/src/main/org/jboss/messaging/core/impl/MessagingQueue.java	2007-06-28 14:06:25 UTC (rev 2814)
@@ -221,10 +221,6 @@
    			
    				sucker.setConsuming(true);
    			}
-   			else
-   			{
-   				log.info("No receivers ready set setting consuming to false");
-   			}
    		}
    	}
    }

Modified: trunk/src/main/org/jboss/messaging/core/impl/clusterconnection/ClusterConnectionManager.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/impl/clusterconnection/ClusterConnectionManager.java	2007-06-28 08:51:05 UTC (rev 2813)
+++ trunk/src/main/org/jboss/messaging/core/impl/clusterconnection/ClusterConnectionManager.java	2007-06-28 14:06:25 UTC (rev 2814)
@@ -232,14 +232,10 @@
 	         		while (iter.hasNext())
 	         		{
 	         			Integer nid = (Integer)iter.next();
-	         			
-		         		log.info("*********** CLOSING CLUSTER CONNECTION FOR NODE " + nid);
-		         			         			
+	         			        			
 	         			ConnectionInfo info = (ConnectionInfo)connections.remove(nid);
 	         				         			
 	         			info.close();
-	         					         		
-		         		log.info("******* CLOSED");
 	         		}
 	      		}         	
 	         }

Modified: trunk/src/main/org/jboss/messaging/core/impl/clusterconnection/MessageSucker.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/impl/clusterconnection/MessageSucker.java	2007-06-28 08:51:05 UTC (rev 2813)
+++ trunk/src/main/org/jboss/messaging/core/impl/clusterconnection/MessageSucker.java	2007-06-28 14:06:25 UTC (rev 2814)
@@ -152,8 +152,6 @@
 		
 		if (trace) { log.trace(this + " Registering sucker"); }
 		
-		log.info("**** starting sucker sucking from queue " + this.getQueueName());
-		
 		localQueue.registerSucker(this);
 		
 		if (trace) { log.trace(this + " Registered sucker"); }

Modified: trunk/src/main/org/jboss/messaging/core/impl/postoffice/GroupMember.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/impl/postoffice/GroupMember.java	2007-06-28 08:51:05 UTC (rev 2813)
+++ trunk/src/main/org/jboss/messaging/core/impl/postoffice/GroupMember.java	2007-06-28 14:06:25 UTC (rev 2814)
@@ -200,6 +200,11 @@
    	return dataChannel.getLocalAddress();
    }
    
+   public long getCastTimeout()
+   {
+   	return castTimeout; // used by MessagingPostOffice.waitForBindUnbind as its maximum wait, treated there as milliseconds
+   }
+   
    public void multicastControl(ClusterRequest request, boolean sync) throws Exception
    {
    	lock.readLock().acquire();
@@ -382,8 +387,6 @@
    {
    	boolean retrievedState = false;
    	
-   	log.info("***** waiting for state to arrive");
-   	
    	if (controlChannel.getState(null, stateTimeout))
    	{
    		//We are not the first member of the group, so let's wait for state to be got and processed
@@ -453,8 +456,6 @@
    {
       public byte[] getState()
       {
-      	log.info("*** getting state");
-      	
          try
          {
 	      	lock.readLock().acquire();
@@ -472,8 +473,6 @@
 	
 		         byte[] state = groupListener.getState();
 		         
-		         log.info("**** got state " + state);
-		         	
 		         return state;		        
 	      	}
 	      	finally
@@ -495,13 +494,11 @@
 
       public void setState(byte[] bytes)
       {
-      	log.info("************* setting state");
          synchronized (setStateLock)
          {
          	try
          	{
          		groupListener.setState(bytes);
-         		log.info("* set it");
          	}
          	catch (Exception e)
          	{

Modified: trunk/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java	2007-06-28 08:51:05 UTC (rev 2813)
+++ trunk/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java	2007-06-28 14:06:25 UTC (rev 2814)
@@ -197,6 +197,8 @@
    
    // Map <node id, PostOfficeAddressInfo>
    private Map nodeIDAddressMap;
+   
+   private Object waitForBindUnbindLock;   
 
    // Constructors ---------------------------------------------------------------------------------
 
@@ -249,6 +251,8 @@
       channelIDMap = new HashMap(); 
       
       nodeIDAddressMap = new ConcurrentHashMap();           
+      
+      waitForBindUnbindLock = new Object();
    }
    
    /*
@@ -381,116 +385,33 @@
    {
    	return officeName;
    }
-   
+  
    public void addBinding(Binding binding, boolean allNodes) throws Exception
    {
    	internalAddBinding(binding, allNodes, true);
-   }
-   
-   public void internalAddBinding(Binding binding, boolean allNodes, boolean sync) throws Exception
-   {
-   	if (trace) { log.trace(this.thisNodeID + " binding " + binding.queue + " with condition " + binding.condition + " all nodes " + allNodes); }
-
-   	if (binding == null)
-      {
-         throw new IllegalArgumentException("Binding is null");
-      }
    	
-   	Condition condition = binding.condition;
-   	
-   	Queue queue = binding.queue;
-   	
-   	if (queue == null)
-      {
-         throw new IllegalArgumentException("Queue is null");
-      }
-   	
-   	if (queue.getNodeID() != thisNodeID)
+   	if (allNodes)
    	{
-   		throw new IllegalArgumentException("Cannot bind a queue from another node");
+	   	//Now we must wait for all the bindings to appear in state
+	   	//This is necessary since the second bind in an all bind is sent asynchronously to avoid deadlock
+   		
+   		waitForBindUnbind(binding.queue.getName(), true);
    	}
-
-      if (condition == null)
-      {
-         throw new IllegalArgumentException("Condition is null");
-      }
-
-      addBindingInMemory(binding);  	
-      
-      if (queue.isRecoverable())
-      {
-         // Need to write the mapping to the database
-         insertBindingInStorage(condition, queue, allNodes);
-      }
-
-      if (clustered && queue.isClustered())
-      {
-      	String filterString = queue.getFilter() == null ? null : queue.getFilter().getFilterString();      	
-      	
-      	MappingInfo info = new MappingInfo(thisNodeID, queue.getName(), condition.toText(), filterString, queue.getChannelID(),
-      			                             queue.isRecoverable(), true,
-      			                             queue.getFullSize(), queue.getPageSize(), queue.getDownCacheSize(),
-      			                             queue.getMaxSize(),
-      			                             queue.isPreserveOrdering());
-      	
-         ClusterRequest request = new BindRequest(info, allNodes);
-
-//         if (sync)
-//         {
-//         	syncSendRequest(request);
-//         }
-//         else
-//         {
-         	//When sending as a result of an all binding being received from the cluster we send the bind on asynchronously
-         	//To avoid a deadlock which happens when you have one sync request hitting a node which then tries to send another back
-         	//to the original node
-           groupMember.multicastControl(request, sync);
-      //   }
-      }
    }
-   
+          
    public void removeBinding(String queueName, boolean allNodes) throws Throwable
    {
-   	this.internalRemoveBinding(queueName, allNodes, true);
-   }
-   
-   private void internalRemoveBinding(String queueName, boolean allNodes, boolean sync) throws Throwable
-   {
-      if (trace) { log.trace(this.thisNodeID + " unbind queue: " + queueName + " all nodes " + allNodes); }
-
-      if (queueName == null)
-      {
-         throw new IllegalArgumentException("Queue name is null");
-      }
-
-      Binding removed = removeBindingInMemory(thisNodeID, queueName);
-      
-      Queue queue = removed.queue;
-      
-      Condition condition = removed.condition;
-      	     	
-      if (queue.isRecoverable())
-      {
-         //Need to remove from db too
-
-         deleteBindingFromStorage(queue);
-      }
-
-      queue.removeAllReferences();
-      
-      if (clustered && queue.isClustered())
-      {
-      	String filterString = queue.getFilter() == null ? null : queue.getFilter().getFilterString();      	
-      	
-      	MappingInfo info = new MappingInfo(thisNodeID, queue.getName(), condition.toText(), filterString, queue.getChannelID(),
-      			                             queue.isRecoverable(), true);
-      	
-	      UnbindRequest request = new UnbindRequest(info, allNodes);
-	
-	      groupMember.multicastControl(request, sync);
-      }
-   }
-                  
+   	internalRemoveBinding(queueName, allNodes, true);
+   	
+   	if (allNodes)
+   	{
+	   	//Now we must wait for all the bindings to be removed from state
+	   	//This is necessary since the second unbind in an all unbind is sent asynchronously to avoid deadlock
+   	
+   		waitForBindUnbind(queueName, false);
+   	}
+   }      
+                 
    public boolean route(MessageReference ref, Condition condition, Transaction tx) throws Exception
    {
       if (ref == null)
@@ -828,7 +749,7 @@
       
       Condition condition = conditionFactory.createCondition(mapping.getConditionText());
       
-      addBindingInMemory(new Binding(condition, queue));  	
+      addBindingInMemory(new Binding(condition, queue));
       
       if (allNodes)
    	{
@@ -837,43 +758,34 @@
    		//There is the possibility that two nodes send a bind all with the same name simultaneously OR
    		//a node starts and sends a bind "ALL" and the other nodes already have a queue with that name
    		//This is ok - but we must check for this and not create the local binding in this case
+   					   				   	
+   		//Bind locally
+
+   		long channelID = channelIDManager.getID();
    		
-   		lock.readLock().acquire();
+   		Queue queue2 = new MessagingQueue(thisNodeID, mapping.getQueueName(), channelID, ms, pm,
+			                                  mapping.isRecoverable(), mapping.getMaxSize(), filter,
+			                                  mapping.getFullSize(), mapping.getPageSize(), mapping.getDownCacheSize(), true,
+			                                  mapping.isPreserveOrdering());
+
+   		//We must cast back asynchronously to avoid deadlock
+   		boolean added = internalAddBinding(new Binding(condition, queue2), false, false);
    		
-   		Queue queue2 = null;
-   		
-   		try
-   		{
-   			if (localNameMap != null && localNameMap.get(mapping.getQueueName()) != null)
-   			{
-   				//Already exists - don't create it again!
-   			}
-   			else
-   			{		   				   		
-		   		//Bind locally
-		
-		   		long channelID = channelIDManager.getID();
+   		if (added)
+   		{	   		
+	   		if (trace) { log.trace(this + " inserted in binding locally"); }			
+	   		
+	   		queue2.load();
 		   		
-		   		queue2 = new MessagingQueue(thisNodeID, mapping.getQueueName(), channelID, ms, pm,
-   				                            mapping.isRecoverable(), mapping.getMaxSize(), filter,
-   				                            mapping.getFullSize(), mapping.getPageSize(), mapping.getDownCacheSize(), true,
-   				                            mapping.isPreserveOrdering());
-		
-		   		internalAddBinding(new Binding(condition, queue2), false, false);
-   			}
+		      queue2.activate();	   		
    		}
-   		finally
-   		{
-   			lock.readLock().release();
-   		}
-   		
-   		if (queue2 != null)
-   		{   		
-	   		queue2.load();
-	   		
-	   		queue2.activate();
-   		}
    	}
+      
+      synchronized (waitForBindUnbindLock)
+      {
+      	if (trace) { log.trace(this + " notifying bind unbind lock"); }
+      	waitForBindUnbindLock.notifyAll();
+      }
    }
  
    /*
@@ -891,13 +803,20 @@
 
       removeBindingInMemory(mapping.getNodeId(), mapping.getQueueName());      
       
+      synchronized (waitForBindUnbindLock)
+      {
+      	if (trace) { log.trace(this + " notifying bind unbind lock"); }
+      	waitForBindUnbindLock.notifyAll();
+      }
+      
       if (allNodes)
       {
       	//Also unbind locally
       	
    		if (trace) { log.trace("allNodes is true, so also forcing a local unbind"); }
    		         	
-   		removeBinding(mapping.getQueueName(), false);
+   		// We must cast back asynchronously to avoid deadlock
+   		internalRemoveBinding(mapping.getQueueName(), false, false);
       }
    }
 
@@ -1210,6 +1129,192 @@
    
    // Private ------------------------------------------------------------------------------------
      
+   /**
+    * Waits until the named queue is bound (bind true) or unbound (bind false) on every node
+    * still present in nodeIDAddressMap, re-checking on each waitForBindUnbindLock notification.
+    *
+    * @param queueName name looked up in each node's name map
+    * @throws IllegalStateException if that state is not reached within groupMember.getCastTimeout()
+    */
+   private void waitForBindUnbind(String queueName, boolean bind) throws Exception
+   {
+   	if (trace) { log.trace(this + " waiting for " + (bind ? "bind" : "unbind") + " of "+ queueName + " on all nodes"); }
+   	
+   	Set nodesToWaitFor = new HashSet(nodeIDAddressMap.keySet());
+		
+		long timeToWait = groupMember.getCastTimeout();
+		long start = System.currentTimeMillis();
+		
+		boolean boundAll = true;
+		boolean unboundAll = true;
+				
+		synchronized (waitForBindUnbindLock)
+		{			
+			do
+			{		
+				boundAll = true;
+				unboundAll = true;
+								
+				lock.readLock().acquire();
+				try
+				{
+					// Drop nodes that have left the cluster, then check the queue on the remaining ones
+					Iterator iter = nodesToWaitFor.iterator();
+					while (iter.hasNext())
+					{
+						Integer node = (Integer)iter.next();
+						if (!nodeIDAddressMap.containsKey(node))
+						{
+							iter.remove();
+						}
+						else
+						{
+							Map nameMap = (Map)this.nameMaps.get(node);
+							if (nameMap != null && nameMap.get(queueName) != null)
+							{
+								if (trace) { log.trace(this + " queue " + queueName + " exists on node " + node); }
+								unboundAll = false;
+							}
+							else
+							{
+								if (trace) { log.trace(this + " queue " + queueName + " does not exist on node " + node); }
+								boundAll = false;
+							}
+						}
+					}
+				}
+				finally
+				{
+					lock.readLock().release();
+				}	
+					
+				if ((bind && !boundAll) || (!bind && !unboundAll))
+				{	
+					try
+					{
+						if (trace) { log.trace(this + " waiting for bind unbind lock"); }
+						// Wait only for the time that is left, so the total wait cannot exceed the cast timeout
+						waitForBindUnbindLock.wait(timeToWait);
+						if (trace) { log.trace(this + " woke up"); }
+					}
+					catch (InterruptedException e)
+					{
+						//Ignore - the loop re-checks the state and the remaining time
+					}
+					// Recompute from the fixed start time; subtracting repeatedly double-counted earlier waits
+					timeToWait = groupMember.getCastTimeout() - (System.currentTimeMillis() - start);
+				}												
+			}
+			while (((bind && !boundAll) || (!bind && !unboundAll)) && timeToWait > 0);
+			if (trace) { log.trace(this + " waited ok"); }
+		}
+		if ((bind && !boundAll) || (!bind && !unboundAll))
+		{
+			throw new IllegalStateException(this + " timed out waiting for " + (bind ? " bind " : " unbind ") + "ALL to occur");
+		}
+   }
+   
+   /**
+    * Binds a local queue in memory and, only if newly added, in storage (when recoverable) and on the cluster (when clustered).
+    * @return the result of addBindingInMemory; when false, storage and the cluster are left untouched
+    * @throws IllegalArgumentException if binding, its queue or its condition is null, or the queue belongs to another node
+    */
+   private boolean internalAddBinding(Binding binding, boolean allNodes, boolean sync) throws Exception
+   {
+   	if (binding == null)
+      {
+         throw new IllegalArgumentException("Binding is null");
+      }
+   	
+   	//Trace only after the null check, otherwise a null binding fails with a NullPointerException when trace is on
+   	if (trace) { log.trace(this.thisNodeID + " binding " + binding.queue + " with condition " + binding.condition + " all nodes " + allNodes); }
+
+   	Condition condition = binding.condition;
+   	Queue queue = binding.queue;
+   	
+   	if (queue == null)
+      {
+         throw new IllegalArgumentException("Queue is null");
+      }
+   	
+   	if (queue.getNodeID() != thisNodeID)
+   	{
+   		throw new IllegalArgumentException("Cannot bind a queue from another node");
+   	}
+
+      if (condition == null)
+      {
+         throw new IllegalArgumentException("Condition is null");
+      }
+           
+   	//The binding might already exist - this can happen if a bind all for the same queue is issued simultaneously from more than one node of the cluster
+      boolean added = addBindingInMemory(binding);
+      if (added)
+      {
+      	if (queue.isRecoverable())
+      	{
+      		// Need to write the mapping to the database
+      		insertBindingInStorage(condition, queue, allNodes);
+      	}
+      	
+      	if (clustered && queue.isClustered())
+         {
+         	String filterString = queue.getFilter() == null ? null : queue.getFilter().getFilterString();      	
+         	MappingInfo info = new MappingInfo(thisNodeID, queue.getName(), condition.toText(), filterString, queue.getChannelID(),
+         			                             queue.isRecoverable(), true,
+         			                             queue.getFullSize(), queue.getPageSize(), queue.getDownCacheSize(),
+         			                             queue.getMaxSize(),
+         			                             queue.isPreserveOrdering());
+            ClusterRequest request = new BindRequest(info, allNodes);
+            groupMember.multicastControl(request, sync);
+         }      	
+      }      
+      return added;
+   }   
+   
+   // Unbinds a queue bound on this node. Throws IllegalArgumentException if queueName is null.
+   // If removeBindingInMemory yields a binding, it is deleted from storage when the queue is
+   // recoverable, the queue's references are removed, and an UnbindRequest is multicast (with the
+   // given sync flag) when this post office and the queue are both clustered.
+   // Returns the removed binding, or null if none was removed (nothing else is done in that case).
+   private Binding internalRemoveBinding(String queueName, boolean allNodes, boolean sync) throws Throwable
+   {
+      if (trace) { log.trace(this.thisNodeID + " unbind queue: " + queueName + " all nodes " + allNodes); }
+
+      if (queueName == null)
+      {
+         throw new IllegalArgumentException("Queue name is null");
+      }
+
+      Binding removed = removeBindingInMemory(thisNodeID, queueName);
+      
+      //The queue might not be removed (it's already removed) if two unbind all requests are sent simultaneously on the cluster
+      if (removed != null)
+      {	      
+	      Queue queue = removed.queue;
+	      Condition condition = removed.condition;
+	      	     	
+	      if (queue.isRecoverable())
+	      {
+	         //Need to remove from db too
+	         deleteBindingFromStorage(queue);
+	      }
+	
+	      queue.removeAllReferences();
+	      
+	      if (clustered && queue.isClustered())
+	      {
+	      	String filterString = queue.getFilter() == null ? null : queue.getFilter().getFilterString();      	
+	      	MappingInfo info = new MappingInfo(thisNodeID, queue.getName(), condition.toText(), filterString, queue.getChannelID(),
+	      			                             queue.isRecoverable(), true);
+		      UnbindRequest request = new UnbindRequest(info, allNodes);
+		      groupMember.multicastControl(request, sync);
+	      }
+      }
+      
+      return removed;
+   }
+   
    private void calculateFailoverMap()
    {
    	 //calculate the failover map
@@ -1275,8 +1380,6 @@
          	
          	List targets = new ArrayList();
          	
-         	log.info("routing to " + queues.size() + " queues");
-         	
          	while (iter.hasNext())
          	{
          		Queue queue = (Queue)iter.next();
@@ -1305,7 +1408,8 @@
 	         			
 	         			if (filter == null || filter.accept(ref.getMessage()))
 	         			{                		
-	         				log.info("Added queue " + queue + " to list of targets");
+	         				if (trace) { log.trace(this + " Added queue " + queue + " to list of targets"); }
+	         				
 	      					targets.add(queue);
 	      					
 	      					if (ref.getMessage().isReliable() && queue.isRecoverable())
@@ -1394,10 +1498,8 @@
          	{
          		Queue queue = (Queue)iter.next();
          		
-         		if (trace) { log.trace("Routing ref to queue " + queue); }
+         		if (trace) { log.trace(this + " Routing ref to queue " + queue); }
          		         	
-         		log.info("Routing ref " + ref + " to queue " + queue);
-         		
          		Delivery del = queue.handle(null, ref, tx);
          		
          		if (trace) { log.trace("Queue returned " + del); }
@@ -1438,14 +1540,17 @@
 	   	
 	   	if (nameMap == null)
 	   	{
-	   		throw new IllegalArgumentException("Cannot find name maps for node " + nodeID);
+	   		log.warn("Cannot find name maps for node " + nodeID);
+	   		
+	   		return null;
 	   	}
 	   	
 	   	Binding binding = (Binding)nameMap.remove(queueName);
 	   	
 	   	if (binding == null)
 	   	{
-	   		throw new IllegalArgumentException("Cannot find binding for queue name " + queueName);
+	   		log.warn("Cannot find binding for queue name " + queueName);
+	   		return null;
 	   	}
 	   	
 	   	if (nameMap.isEmpty())
@@ -1497,14 +1602,14 @@
    	}
    }
    
-   private void addBindingInMemory(Binding binding) throws Exception
+   private boolean addBindingInMemory(Binding binding) throws Exception
    {
    	Queue queue = binding.queue;
    	   	
+   	if (trace) { log.trace(this + " Adding binding in memory " + binding); }
+   	
    	lock.writeLock().acquire();
    	
-   	if (trace) { log.trace(this + " Adding binding in memory " + binding); }
-   	
    	try
    	{	  
    		Integer nid = new Integer(queue.getNodeID());
@@ -1513,14 +1618,16 @@
    		   		
    		if (nameMap != null && nameMap.containsKey(queue.getName()))
    		{
-   			throw new IllegalArgumentException("Name map for node " + nid + " already contains binding for queue " + queue.getName());
+   			log.warn("Name map for node " + nid + " already contains binding for queue " + queue.getName());
+   			
+   			return false;
    		}
    		
    		Long cid = new Long(queue.getChannelID());
    		
    		if (channelIDMap.containsKey(cid))
    		{
-   			throw new IllegalArgumentException("Channel id map for node " + nid + " already contains binding for queue " + cid);
+   			throw new IllegalStateException("Channel id map for node " + nid + " already contains binding for queue " + cid);
    		}
    		
    		if (nameMap == null)
@@ -1568,6 +1675,8 @@
       ClusterNotification notification = new ClusterNotification(ClusterNotification.TYPE_BIND, queue.getNodeID(), queue.getName());
       
       clusterNotifier.sendNotification(notification);
+      
+      return true;
    }
    
    /*
@@ -1647,7 +1756,7 @@
             Queue queue = new MessagingQueue(thisNodeID, queueName, channelID, ms, pm,
                                              true, filter, bindingClustered && clustered);
             
-            log.info("**** loaded binding from storage: " + queueName);
+            if (trace) { log.trace(this + " loaded binding from storage: " + queueName); }
             
             Condition condition = conditionFactory.createCondition(conditionText);
             
@@ -1794,7 +1903,7 @@
 
       lock.writeLock().acquire();
 
-      log.info("** cleaning data for node " + nodeToRemove);
+      if (trace) { log.trace(this + " cleaning data for node " + nodeToRemove); }
       
       try
       {
@@ -1818,14 +1927,8 @@
       			
       			if (queue.getNodeID() == nodeToRemove.intValue())
       			{
-      				log.info("** removing queue " + queue);
-      				
       				toRemove.add(new Binding(condition, queue));
       			}
-      			else
-      			{
-      				log.info("** not removing " + queue);
-      			}
       		}
       	}
       	
@@ -1866,8 +1969,14 @@
       }
       
       //remove node id - address info
-      nodeIDAddressMap.remove(nodeToRemove);      
+      nodeIDAddressMap.remove(nodeToRemove);  
       
+      synchronized (waitForBindUnbindLock)
+      {
+      	if (trace) { log.trace(this + " notifying bind unbind lock"); }
+      	waitForBindUnbindLock.notifyAll();
+      }
+      
       //Recalculate the failover map
       calculateFailoverMap();
       

Modified: trunk/tests/src/org/jboss/test/messaging/jms/clustering/DistributedTopicTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/clustering/DistributedTopicTest.java	2007-06-28 08:51:05 UTC (rev 2813)
+++ trunk/tests/src/org/jboss/test/messaging/jms/clustering/DistributedTopicTest.java	2007-06-28 14:06:25 UTC (rev 2814)
@@ -26,7 +26,6 @@
 import javax.jms.InvalidDestinationException;
 import javax.jms.Message;
 import javax.jms.MessageConsumer;
-import javax.jms.MessageListener;
 import javax.jms.MessageProducer;
 import javax.jms.Session;
 import javax.jms.TextMessage;
@@ -186,6 +185,8 @@
             TextMessage tm = (TextMessage)cons0.receive(1000);
 
             assertNotNull(tm);
+            
+            log.info("Got message " + tm.getText());
 
             assertEquals("message" + i, tm.getText());
          }
@@ -199,6 +200,8 @@
             TextMessage tm = (TextMessage)cons1.receive(1000);
 
             assertNotNull(tm);
+            
+            log.info("Got message " + tm.getText());
 
             assertEquals("message" + i, tm.getText());
          }
@@ -212,6 +215,8 @@
             TextMessage tm = (TextMessage)cons2.receive(1000);
 
             assertNotNull(tm);
+            
+            log.info("Got message " + tm.getText());
 
             assertEquals("message" + i, tm.getText());
          }
@@ -225,6 +230,8 @@
             TextMessage tm = (TextMessage)cons3.receive(1000);
 
             assertNotNull(tm);
+            
+            log.info("Got message " + tm.getText());
 
             assertEquals("message" + i, tm.getText());
          }
@@ -238,6 +245,8 @@
             TextMessage tm = (TextMessage)cons4.receive(1000);
 
             assertNotNull(tm);
+            
+            log.info("Got message " + tm.getText());
 
             assertEquals("message" + i, tm.getText());
          }
@@ -497,8 +506,6 @@
          conn1.start();
          conn2.start();
          
-         Thread.sleep(5000);
-         
          log.info("started");
 
          // Send at node 0 - and make sure the messages are consumable from all the durable subs
@@ -963,11 +970,6 @@
             sess2.unsubscribe("sub");
          }
          catch (Exception ignore) {}
-         try
-         {
-            sess3.unsubscribe("sub");
-         }
-         catch (Exception ignore) {}
 
          MessageConsumer cons1 = sess2.createDurableSubscriber(topic[1], "sub");
          MessageConsumer cons2 = sess3.createDurableSubscriber(topic[2], "sub");
@@ -987,34 +989,65 @@
 
          for (int i = 0; i < NUM_MESSAGES; i++)
          {
-            TextMessage tm = sess1.createTextMessage("message" + i);
+            TextMessage tm = sess1.createTextMessage("message2-" + i);
 
             prod.send(tm);
          }
-
+         
+         
+         int offset = 0;
+         
          for (int i = 0; i < NUM_MESSAGES / 2; i++)
          {
             TextMessage tm = (TextMessage)cons1.receive(1000);
-
             assertNotNull(tm);
-
-            assertEquals("message" + i * 2, tm.getText());
+            log.info("**** got message" + tm.getText());
+            
+            if (tm.getText().substring("message2-".length()).equals("1"))
+            {
+            	offset = 1;
+            }
+            
+            assertEquals("message2-" + (i * 2 + offset), tm.getText());
          }
-
+         
+         Message msg = cons1.receive(2000);
+         assertNull(msg);
+         
+         if (offset == 1)
+         {
+         	offset = 0;
+         }
+         else
+         {
+         	offset = 1;
+         }      
+         
          for (int i = 0; i < NUM_MESSAGES / 2; i++)
          {
             TextMessage tm = (TextMessage)cons2.receive(1000);
-
             assertNotNull(tm);
-
-            assertEquals("message" + (i * 2 + 1), tm.getText());
+            log.info("**** got message" + tm.getText());
+            assertEquals("message2-" + (i * 2 + offset), tm.getText());
          }
-
+         
+         msg = cons2.receive(2000);
+         assertNull(msg);
+                 
          cons1.close();
          cons2.close();
 
          sess2.unsubscribe("sub");
-         sess3.unsubscribe("sub");
+         
+         try
+         {
+         	sess3.unsubscribe("sub");
+         	fail("Should already be unsubscribed");
+         }
+         catch (InvalidDestinationException e)
+         {
+         	//Ok - the previous unsubscribe should do a cluster wide unsubscribe
+         }
 
       }
       finally

Modified: trunk/tests/src/org/jboss/test/messaging/tools/ServerManagement.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/tools/ServerManagement.java	2007-06-28 08:51:05 UTC (rev 2813)
+++ trunk/tests/src/org/jboss/test/messaging/tools/ServerManagement.java	2007-06-28 14:06:25 UTC (rev 2814)
@@ -153,7 +153,6 @@
             {
                // most likely the remote server is not started, so spawn it
                servers[i] = new ServerHolder(ServerManagement.spawn(i), true);
-               log.info("server " + i + " online");
             }
          }
       }

Modified: trunk/tests/src/org/jboss/test/messaging/tools/jmx/rmi/LocalTestServer.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/tools/jmx/rmi/LocalTestServer.java	2007-06-28 08:51:05 UTC (rev 2813)
+++ trunk/tests/src/org/jboss/test/messaging/tools/jmx/rmi/LocalTestServer.java	2007-06-28 14:06:25 UTC (rev 2814)
@@ -352,9 +352,7 @@
 
          postOfficeObjectName = sc.registerAndConfigureService(postOfficeConfig);         
          sc.setAttribute(postOfficeObjectName, "Clustered", clustered ? "true" : "false"); 
-         
-         log.info("************* SET CLUSTERED ATTRIBUTE TO " + clustered);
-         
+             
          overrideAttributes(postOfficeObjectName, attrOverrides);
          sc.invoke(postOfficeObjectName, "create", new Object[0], new String[0]);
          sc.invoke(postOfficeObjectName, "start", new Object[0], new String[0]);




More information about the jboss-cvs-commits mailing list