[jboss-cvs] JBoss Messaging SVN: r2815 - in trunk: src/main/org/jboss/jms/server/destination and 7 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Thu Jun 28 16:11:38 EDT 2007


Author: timfox
Date: 2007-06-28 16:11:38 -0400 (Thu, 28 Jun 2007)
New Revision: 2815

Modified:
   trunk/src/main/org/jboss/jms/server/ServerPeer.java
   trunk/src/main/org/jboss/jms/server/destination/QueueService.java
   trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java
   trunk/src/main/org/jboss/messaging/core/contract/Binding.java
   trunk/src/main/org/jboss/messaging/core/impl/JDBCPersistenceManager.java
   trunk/src/main/org/jboss/messaging/core/impl/MessagingQueue.java
   trunk/src/main/org/jboss/messaging/core/impl/postoffice/MappingInfo.java
   trunk/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java
   trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/ClusteredPostOfficeTest.java
   trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/PostOfficeTest.java
   trunk/tests/src/org/jboss/test/messaging/jms/clustering/MergeQueueTest.java
   trunk/tests/src/org/jboss/test/messaging/jms/clustering/base/ClusteringTestBase.java
Log:
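Fixed bug in merging queues: MessagingQueue.mergeIn() passed the same channel id as both the source and the target of the merge (the parameter shadowed the channelID field). Bindings now also carry an allNodes flag.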


Modified: trunk/src/main/org/jboss/jms/server/ServerPeer.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/ServerPeer.java	2007-06-28 14:06:25 UTC (rev 2814)
+++ trunk/src/main/org/jboss/jms/server/ServerPeer.java	2007-06-28 20:11:38 UTC (rev 2815)
@@ -1436,7 +1436,10 @@
       {
          Queue queue = (Queue)iter.next();
          
-         postOffice.removeBinding(queue.getName(), false);
+         //Durable subs need to be removed on all nodes
+         boolean all = !isQueue && queue.isRecoverable();
+         
+         postOffice.removeBinding(queue.getName(), all);
       }
       
       return true;

Modified: trunk/src/main/org/jboss/jms/server/destination/QueueService.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/destination/QueueService.java	2007-06-28 14:06:25 UTC (rev 2814)
+++ trunk/src/main/org/jboss/jms/server/destination/QueueService.java	2007-06-28 20:11:38 UTC (rev 2815)
@@ -109,7 +109,7 @@
                                        destination.getFullSize(), destination.getPageSize(),
                                        destination.getDownCacheSize(), destination.isClustered(),
                                        serverPeer.isDefaultPreserveOrdering());
-            po.addBinding(new Binding(queueCond, queue), false);         
+            po.addBinding(new Binding(queueCond, queue, false), false);         
             
             queue.activate();
          }

Modified: trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java	2007-06-28 14:06:25 UTC (rev 2814)
+++ trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java	2007-06-28 20:11:38 UTC (rev 2815)
@@ -566,7 +566,7 @@
          	// make a binding for this temporary queue
             
             // temporary queues need to bound on ALL nodes of the cluster
-            postOffice.addBinding(new Binding(cond, coreQueue), true);   
+            postOffice.addBinding(new Binding(cond, coreQueue, true), true);   
             
             coreQueue.activate();
          }         
@@ -702,7 +702,7 @@
          	}
          }
          
-         postOffice.removeBinding(sub.getName(), true);         
+         postOffice.removeBinding(sub.getName(), sub.isClustered());         
          
          String counterName = TopicService.SUBSCRIPTION_MESSAGECOUNTER_PREFIX + sub.getName();
          
@@ -1276,7 +1276,7 @@
             
             JMSCondition topicCond = new JMSCondition(false, jmsDestination.getName());
                         
-            postOffice.addBinding(new Binding(topicCond, queue), false);   
+            postOffice.addBinding(new Binding(topicCond, queue, false), false);   
             
             queue.activate();
 
@@ -1332,9 +1332,9 @@
                                           mDest.isClustered(),
                                           sp.isDefaultPreserveOrdering());
                
-               // Durable subs must be bound on ALL nodes of the cluster
+               // Durable subs must be bound on ALL nodes of the cluster (if clustered)
                
-               postOffice.addBinding(new Binding(new JMSCondition(false, jmsDestination.getName()), queue), true);
+               postOffice.addBinding(new Binding(new JMSCondition(false, jmsDestination.getName()), queue, true), true);
                
                queue.activate();
                   
@@ -1408,7 +1408,7 @@
                   
                   // Durable subs must be unbound on ALL nodes of the cluster
                   
-                  postOffice.removeBinding(queue.getName(), true);                  
+                  postOffice.removeBinding(queue.getName(), mDest.isClustered());                  
                   
                   // create a fresh new subscription
                                     
@@ -1422,7 +1422,7 @@
                   
                   // Durable subs must be bound on ALL nodes of the cluster
                   
-                  postOffice.addBinding(new Binding(new JMSCondition(false, jmsDestination.getName()), queue), true);
+                  postOffice.addBinding(new Binding(new JMSCondition(false, jmsDestination.getName()), queue, true), true);
   
                   queue.activate();                  
                   

Modified: trunk/src/main/org/jboss/messaging/core/contract/Binding.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/contract/Binding.java	2007-06-28 14:06:25 UTC (rev 2814)
+++ trunk/src/main/org/jboss/messaging/core/contract/Binding.java	2007-06-28 20:11:38 UTC (rev 2815)
@@ -32,19 +32,23 @@
  */
 public class Binding
 {
-	public Binding(Condition condition, Queue queue)
+	public Binding(Condition condition, Queue queue, boolean allNodes)
 	{
 		this.condition = condition;
 		
 		this.queue = queue;
+		
+		this.allNodes = allNodes;
 	}
 	
 	public Condition condition;
 	
 	public Queue queue;
 	
+	public boolean allNodes;
+	
 	public String toString()
 	{
-		return "Binding:" + System.identityHashCode(this) + " condition: " + condition + " queue: " + queue;
+		return "Binding:" + System.identityHashCode(this) + " condition: " + condition + " queue: " + queue +" allNodes: " + allNodes;
 	}
 }
\ No newline at end of file

Modified: trunk/src/main/org/jboss/messaging/core/impl/JDBCPersistenceManager.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/impl/JDBCPersistenceManager.java	2007-06-28 14:06:25 UTC (rev 2814)
+++ trunk/src/main/org/jboss/messaging/core/impl/JDBCPersistenceManager.java	2007-06-28 20:11:38 UTC (rev 2815)
@@ -859,6 +859,13 @@
    {
       if (trace) { log.trace("Merging channel from " + fromChannelID + " to " + toChannelID + " numberToLoad:" + numberToLoad + " firstPagingOrder:" + firstPagingOrder + " nextPagingOrder:" + nextPagingOrder); }
       
+      //Sanity
+      
+      if (fromChannelID == toChannelID)
+      {
+      	throw new IllegalArgumentException("Cannot merge queues - they have the same channel id!!");
+      }
+      
       Connection conn = null;
       PreparedStatement ps = null;
       ResultSet rs = null;

Modified: trunk/src/main/org/jboss/messaging/core/impl/MessagingQueue.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/impl/MessagingQueue.java	2007-06-28 14:06:25 UTC (rev 2814)
+++ trunk/src/main/org/jboss/messaging/core/impl/MessagingQueue.java	2007-06-28 20:11:38 UTC (rev 2815)
@@ -183,7 +183,7 @@
     * over to another node, but a queue with the same name already exists. In this case we merge the
     * two queues.
     */
-   public void mergeIn(long channelID) throws Exception
+   public void mergeIn(long theChannelID) throws Exception
    {
       if (trace) { log.trace("Merging queue " + channelID + " into " + this); }
            
@@ -192,7 +192,7 @@
          flushDownCache();
                   
          PersistenceManager.InitialLoadInfo ili =
-            pm.mergeAndLoad(channelID, channelID, fullSize - messageRefs.size(),
+            pm.mergeAndLoad(theChannelID, this.channelID, fullSize - messageRefs.size(),
                             firstPagingOrder, nextPagingOrder);
             
          if (trace) { log.trace("Loaded " + ili.getRefInfos().size() + " refs"); }            

Modified: trunk/src/main/org/jboss/messaging/core/impl/postoffice/MappingInfo.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/impl/postoffice/MappingInfo.java	2007-06-28 14:06:25 UTC (rev 2814)
+++ trunk/src/main/org/jboss/messaging/core/impl/postoffice/MappingInfo.java	2007-06-28 20:11:38 UTC (rev 2815)
@@ -66,12 +66,14 @@
    
    private boolean preserveOrdering;
    
+   private boolean allNodes;
+   
    MappingInfo()
    {      
    }
    
    MappingInfo(int nodeId, String queueName, String conditionText, String filterString,
-               long channelId, boolean recoverable, boolean clustered)
+               long channelId, boolean recoverable, boolean clustered, boolean allNodes)
    {
       this.nodeId = nodeId;
       
@@ -86,13 +88,16 @@
       this.recoverable = recoverable;
         
       this.clustered = clustered;
+      
+      this.allNodes = allNodes;
    }   
    
    MappingInfo(int nodeId, String queueName, String conditionText, String filterString,
-   		      long channelId, boolean recoverable, boolean clustered, int fullSize, int pageSize, int downCacheSize,
+   		      long channelId, boolean recoverable, boolean clustered, boolean allNodes,
+   		      int fullSize, int pageSize, int downCacheSize,
    		      int maxSize, boolean preserveOrdering)
    {
-   	this (nodeId, queueName, conditionText, filterString, channelId, recoverable, clustered);
+   	this (nodeId, queueName, conditionText, filterString, channelId, recoverable, clustered, allNodes);
    	
    	this.fullSize = fullSize;
    	
@@ -123,6 +128,8 @@
       
       clustered = in.readBoolean();
       
+      allNodes = in.readBoolean();
+      
       fullSize = in.readInt();
       
       pageSize = in.readInt();
@@ -150,6 +157,8 @@
       
       out.writeBoolean(clustered);
       
+      out.writeBoolean(allNodes);
+      
       out.writeInt(fullSize);
       
       out.writeInt(pageSize);
@@ -195,6 +204,11 @@
    {
    	return clustered;
    }
+   
+   boolean isAllNodes()
+   {
+   	return allNodes;
+   }
 
    int getFullSize()
    {

Modified: trunk/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java	2007-06-28 14:06:25 UTC (rev 2814)
+++ trunk/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java	2007-06-28 20:11:38 UTC (rev 2815)
@@ -199,6 +199,8 @@
    private Map nodeIDAddressMap;
    
    private Object waitForBindUnbindLock;   
+   
+   private Map loadedBindings;   
 
    // Constructors ---------------------------------------------------------------------------------
 
@@ -302,7 +304,9 @@
       if (trace) { log.trace(this + " starting"); }
       
       super.start();
-
+      
+      loadedBindings = getBindingsFromStorage();
+      
       if (clustered)
       {
 	      groupMember.start();
@@ -327,7 +331,7 @@
    
       //Now load the bindings for this node
       
-      loadBindingsFromStorage();  
+      loadBindings();  
       
       started = true;
 
@@ -390,7 +394,7 @@
    {
    	internalAddBinding(binding, allNodes, true);
    	
-   	if (allNodes)
+   	if (allNodes && clustered && binding.queue.isClustered())
    	{
 	   	//Now we must wait for all the bindings to appear in state
 	   	//This is necessary since the second bind in an all bind is sent asynchronously to avoid deadlock
@@ -401,9 +405,9 @@
           
    public void removeBinding(String queueName, boolean allNodes) throws Throwable
    {
-   	internalRemoveBinding(queueName, allNodes, true);
+   	Binding binding = internalRemoveBinding(queueName, allNodes, true);
    	
-   	if (allNodes)
+   	if (binding != null && allNodes && clustered && binding.queue.isClustered())
    	{
 	   	//Now we must wait for all the bindings to be removed from state
 	   	//This is necessary since the second unbind in an all unbind is sent asynchronously to avoid deadlock
@@ -551,7 +555,7 @@
    }
    
    // GroupListener implementation -------------------------------------------------------------
-
+ 
    public void setState(byte[] bytes) throws Exception
    {
       if (trace) { log.trace(this + " received state from group"); }
@@ -582,11 +586,41 @@
          }
          
          Queue queue = new MessagingQueue(mapping.getNodeId(), mapping.getQueueName(), mapping.getChannelId(),
-                                          mapping.isRecoverable(), filter, mapping.isClustered());
+                                          mapping.isRecoverable(), filter, true);
          
          Condition condition = conditionFactory.createCondition(mapping.getConditionText());
          
-         addBindingInMemory(new Binding(condition, queue));
+         addBindingInMemory(new Binding(condition, queue, mapping.isAllNodes()));
+         
+         if (mapping.isAllNodes())
+         {
+         	// insert into db if not already there
+         	if (!loadedBindings.containsKey(queue.getName()))
+         	{
+	         	//Create a local binding too
+	         	
+	      		long channelID = channelIDManager.getID();
+	      			      		
+	      		Queue queue2 = new MessagingQueue(thisNodeID, mapping.getQueueName(), channelID, ms, pm,
+	   			                                  mapping.isRecoverable(), mapping.getMaxSize(), filter,
+	   			                                  mapping.getFullSize(), mapping.getPageSize(), mapping.getDownCacheSize(), true,
+	   			                                  mapping.isPreserveOrdering());     
+	      		
+	      		Binding localBinding = new Binding(condition, queue2, true);
+	      			         	
+	         	if (mapping.isRecoverable())
+	         	{		         	
+	         		//We need to insert it into the database
+	         		if (trace) { log.trace(this + " got all binding in state for queue " + queue.getName() + " inserting it in DB"); }
+	         		
+	         		insertBindingInStorage(condition, queue2, true);	         			         				         	
+	         	}
+	         	
+	         	//	Add it to the loaded map
+	         	
+	            loadedBindings.put(mapping.getQueueName(), localBinding);	         	
+         	}  	
+         }
       }
 
       //Update the replicated data
@@ -607,30 +641,28 @@
       
       try
       {
-	      Iterator iter = mappings.entrySet().iterator();
-	      
-	      while (iter.hasNext())
-	      {
-	      	Map.Entry entry = (Map.Entry)iter.next();
-	      	
-	      	Condition condition = (Condition)entry.getKey();
-	      	
-	      	List queues = (List)entry.getValue();
-	      	
-	      	Iterator iter2 = queues.iterator();
-	      	
-	      	while (iter2.hasNext())
-	      	{
-	      		Queue queue = (Queue)iter2.next();
+      	Iterator iter = nameMaps.values().iterator();
+      	
+      	while (iter.hasNext())
+      	{
+      		Map map = (Map)iter.next();
+      		
+      		Iterator iter2 = map.values().iterator();
+      		
+      		while (iter2.hasNext())
+      		{
+      			Binding binding = (Binding)iter2.next();
+      		
+	      		Queue queue = binding.queue;
 	      		
 	      		//We only get the clustered queues
 	      		if (queue.isClustered())
 	      		{		      		
 		      		String filterString = queue.getFilter() == null ? null : queue.getFilter().getFilterString();
 		      		
-		      		MappingInfo mapping = new MappingInfo(queue.getNodeID(), queue.getName(), condition.toText(),
+		      		MappingInfo mapping = new MappingInfo(queue.getNodeID(), queue.getName(), binding.condition.toText(),
 		      				                                filterString, queue.getChannelID(), queue.isRecoverable(),
-		      				                                true);		      		
+		      				                                true, binding.allNodes);		      		
 		      		list.add(mapping);
 	      		}
 	      	}
@@ -749,7 +781,7 @@
       
       Condition condition = conditionFactory.createCondition(mapping.getConditionText());
       
-      addBindingInMemory(new Binding(condition, queue));
+      addBindingInMemory(new Binding(condition, queue, false));
       
       if (allNodes)
    	{
@@ -769,7 +801,7 @@
 			                                  mapping.isPreserveOrdering());
 
    		//We must cast back asynchronously to avoid deadlock
-   		boolean added = internalAddBinding(new Binding(condition, queue2), false, false);
+   		boolean added = internalAddBinding(new Binding(condition, queue2, true), false, false);
    		
    		if (added)
    		{	   		
@@ -1259,6 +1291,7 @@
          	
          	MappingInfo info = new MappingInfo(thisNodeID, queue.getName(), condition.toText(), filterString, queue.getChannelID(),
          			                             queue.isRecoverable(), true,
+         			                             allNodes,
          			                             queue.getFullSize(), queue.getPageSize(), queue.getDownCacheSize(),
          			                             queue.getMaxSize(),
          			                             queue.isPreserveOrdering());
@@ -1304,7 +1337,7 @@
 	      	String filterString = queue.getFilter() == null ? null : queue.getFilter().getFilterString();      	
 	      	
 	      	MappingInfo info = new MappingInfo(thisNodeID, queue.getName(), condition.toText(), filterString, queue.getChannelID(),
-	      			                             queue.isRecoverable(), true);
+	      			                             queue.isRecoverable(), true, allNodes);
 	      	
 		      UnbindRequest request = new UnbindRequest(info, allNodes);
 		
@@ -1706,15 +1739,16 @@
       groupMember.unicastData(request, address);
    }
    
-   private void loadBindingsFromStorage() throws Exception
+   
+   private Map getBindingsFromStorage() throws Exception
    {
-      lock.writeLock().acquire();
-
       Connection conn = null;
       PreparedStatement ps  = null;
       ResultSet rs = null;
       TransactionWrapper wrap = new TransactionWrapper();
 
+      Map bindings = new HashMap();
+      
       try
       {
          conn = ds.getConnection();
@@ -1760,24 +1794,12 @@
             
             Condition condition = conditionFactory.createCondition(conditionText);
             
-            addBindingInMemory(new Binding(condition, queue));          
+            Binding binding = new Binding(condition, queue, allNodes);
             
-            //Need to broadcast it too
-            if (clustered && queue.isClustered())
-            {
-            	String filterString = queue.getFilter() == null ? null : queue.getFilter().getFilterString();      	            	
-            	
-            	MappingInfo info = new MappingInfo(thisNodeID, queue.getName(), condition.toText(), filterString, queue.getChannelID(),
-            			                             queue.isRecoverable(), true,
-            			                             queue.getFullSize(), queue.getPageSize(), queue.getDownCacheSize(),
-            			                             queue.getMaxSize(),
-            			                             queue.isPreserveOrdering());
-            	
-               ClusterRequest request = new BindRequest(info, allNodes);
-
-               groupMember.multicastControl(request, false);         
-            }
+            bindings.put(queueName, binding);                        
          }
+         
+         return bindings;
       }
       catch (Exception e)
       {
@@ -1786,8 +1808,6 @@
       }
       finally
       {
-         lock.writeLock().release();
-         
          closeResultSet(rs);
          
          closeStatement(ps);
@@ -1795,8 +1815,40 @@
          closeConnection(conn);
 
          wrap.end();
+      }	
+   }
+   
+   private void loadBindings() throws Exception
+   {   	
+      Iterator iter = loadedBindings.values().iterator();
+      
+      while (iter.hasNext())
+      {
+      	Binding binding = (Binding)iter.next();
+      	
+      	addBindingInMemory(binding);    
+      	
+      	Queue queue = binding.queue;
+         
+         //Need to broadcast it too
+         if (clustered && queue.isClustered())
+         {
+         	String filterString = queue.getFilter() == null ? null : queue.getFilter().getFilterString();      	            	
+         	
+         	MappingInfo info = new MappingInfo(thisNodeID, queue.getName(), binding.condition.toText(), filterString, queue.getChannelID(),
+         			                             queue.isRecoverable(), true,
+         			                             binding.allNodes,
+         			                             queue.getFullSize(), queue.getPageSize(), queue.getDownCacheSize(),
+         			                             queue.getMaxSize(),
+         			                             queue.isPreserveOrdering());
+         	
+            ClusterRequest request = new BindRequest(info, binding.allNodes);
+
+            groupMember.multicastControl(request, false);         
+         }      	
       }
    }
+    
 
    private void insertBindingInStorage(Condition condition, Queue queue, boolean allNodes) throws Exception
    {
@@ -1927,7 +1979,7 @@
       			
       			if (queue.getNodeID() == nodeToRemove.intValue())
       			{
-      				toRemove.add(new Binding(condition, queue));
+      				toRemove.add(new Binding(condition, queue, false));
       			}
       		}
       	}
@@ -2087,36 +2139,32 @@
    	
       // Need to lock
       lock.writeLock().acquire();
-
+      
       try
       {
-         Iterator iter = mappings.entrySet().iterator();
-         	
+      	Map nameMap = (Map)this.nameMaps.get(failedNodeID);
+      	
       	List toRemove = new ArrayList();
       	
-      	while (iter.hasNext())
+      	if (nameMap != null)
       	{
-      		Map.Entry entry = (Map.Entry)iter.next();
+      		Iterator iter = nameMap.values().iterator();
       		
-      		Condition condition = (Condition)entry.getKey();
-      		
-      		List queues = (List)entry.getValue();
-      		
-      		Iterator iter2 = queues.iterator();
-      		
-      		while (iter2.hasNext())
+      		while (iter.hasNext())
       		{
-      			Queue queue = (Queue)iter2.next();
+      			Binding binding = (Binding)iter.next();
       			
+      			Queue queue = binding.queue;
+      			
       			if (queue.isRecoverable() && queue.getNodeID() == failedNodeID.intValue())
       			{
-      				toRemove.add(new Binding(condition, queue));
-      			}
+      				toRemove.add(binding);
+      			}      			
       		}
       	}
-         	
-      	iter = toRemove.iterator();
-      	
+      	         	
+      	Iterator iter = toRemove.iterator();
+
       	while (iter.hasNext())
       	{
       		Binding binding = (Binding)iter.next();
@@ -2129,17 +2177,19 @@
             if (!queue.isRecoverable())
             {
                throw new IllegalStateException("Found non recoverable queue " +
-                                               queue.getName() + "in map, these should have been removed!");
+                                               queue.getName() + " in map, these should have been removed!");
             }
 
             // Sanity check
             if (!queue.isClustered())
             {
-               throw new IllegalStateException("Queue " + queue.getName() +
-                                               " is not clustered!");
+               throw new IllegalStateException("Queue " + queue.getName() + " is not clustered!");
             }
+            
+            log.info("**** removing old queue with channel id " + queue.getChannelID());
       		
-            //Remove from the in-memory map
+            //Remove from the in-memory map - no need to broadcast anything - they will get removed from other nodes in memory
+            //maps when the other nodes detect failure
             removeBindingInMemory(binding.queue.getNodeID(), binding.queue.getName());
       		
       		//Delete from storage
@@ -2151,23 +2201,16 @@
             // when the node crashes a view change will hit the other nodes and that will cause
             // all binding data for that node to be removed anyway.
             
-            Collection queues = getQueuesForCondition(condition, true);
+            //Find if there is a local queue with the same name
             
-            Iterator iter2 = queues.iterator();
-            
             Queue localQueue = null;
             
-            while (iter2.hasNext())
+            if (localNameMap != null)
             {
-            	Queue q = (Queue)iter2.next();
+            	Binding b = (Binding)localNameMap.get(queue.getName());
+            	localQueue = b.queue;
             	
-            	if (queue.getName().equals(q.getName()))
-            	{
-            		localQueue = q;
-            		
-            		break;
-            	}
-            	
+            	log.info("Found a local queue with channel id " + localQueue.getChannelID());
             }
             	
             if (localQueue != null)
@@ -2187,10 +2230,8 @@
               	Queue newQueue = new MessagingQueue(thisNodeID, queue.getName(), queue.getChannelID(), queue.isRecoverable(),
               			                              queue.getFilter(), true);
 
-               addBinding(new Binding(condition, newQueue), false);
+               addBinding(new Binding(condition, newQueue, binding.allNodes), false);
                
-               newQueue.deactivate();
-               
                newQueue.load();
                
                //TODO - do we really want to activate ALL the queues - surely only the ones that correspond to deployed destinations??

Modified: trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/ClusteredPostOfficeTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/ClusteredPostOfficeTest.java	2007-06-28 14:06:25 UTC (rev 2814)
+++ trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/ClusteredPostOfficeTest.java	2007-06-28 20:11:38 UTC (rev 2815)
@@ -197,14 +197,14 @@
 
          Condition condition1 = new SimpleCondition("topic1");
          
-         office1.addBinding(new Binding(condition1, queue1), false);
+         office1.addBinding(new Binding(condition1, queue1, false), false);
          
          log.info("Added binding1");
          
          Queue queue2 = new MessagingQueue(1, "sub2", channelIDManager.getID(), ms, pm, false, -1, null, true, false);
          queue2.activate();
 
-         office1.addBinding(new Binding(condition1, queue2), false);
+         office1.addBinding(new Binding(condition1, queue2, false), false);
          
          log.info("Added binding2");
          
@@ -227,7 +227,7 @@
          Queue queue3 = new MessagingQueue(2, "sub3", channelIDManager.getID(), ms, pm, false, -1, null, true, false);
          queue3.activate();
 
-         office2.addBinding(new Binding(condition1, queue3), false);
+         office2.addBinding(new Binding(condition1, queue3, false), false);
   
          // Make sure both nodes pick it up
          
@@ -251,7 +251,7 @@
          Queue queue4 = new MessagingQueue(2, "sub4", channelIDManager.getID(), ms, pm, false, -1, null, true, false);
          queue4.activate();
 
-         office2.addBinding(new Binding(condition1, queue4), false);
+         office2.addBinding(new Binding(condition1, queue4, false), false);
          
          // Make sure both nodes pick it up
          
@@ -308,7 +308,7 @@
          Queue queue5 = new MessagingQueue(3, "sub5", channelIDManager.getID(), ms, pm, false, -1, null, true, false);
          queue5.activate();
          
-         office3.addBinding(new Binding(condition1, queue5), false);
+         office3.addBinding(new Binding(condition1, queue5, false), false);
          
          // Make sure all nodes pick it up
          
@@ -338,12 +338,12 @@
          Queue queue6 = new MessagingQueue(1, "sub6", channelIDManager.getID(), ms, pm, true, -1, null, true, false);
          queue6.activate();
          
-         office1.addBinding(new Binding(condition1, queue6), false);
+         office1.addBinding(new Binding(condition1, queue6, false), false);
          
          Queue queue7 = new MessagingQueue(1, "sub7", channelIDManager.getID(), ms, pm, false, -1, null, true, false);
          queue7.activate();
          
-         office1.addBinding(new Binding(condition1, queue7), false);
+         office1.addBinding(new Binding(condition1, queue7, false), false);
          
          
          // Make sure all nodes pick them up
@@ -489,13 +489,13 @@
          
          //Bind on different conditions
          
-         office1.addBinding(new Binding(condition1, queue8), false);
+         office1.addBinding(new Binding(condition1, queue8, false), false);
          
-         office2.addBinding(new Binding(condition1, queue9), false);
+         office2.addBinding(new Binding(condition1, queue9, false), false);
          
          Condition condition2 = new SimpleCondition("topic2");
          
-         office2.addBinding(new Binding(condition2, queue10), false);
+         office2.addBinding(new Binding(condition2, queue10, false), false);
          
          queues = office1.getQueuesForCondition(condition1, false);
          assertNotNull(queues);
@@ -523,9 +523,9 @@
          Queue queue12 = new MessagingQueue(2, "sub12", channelIDManager.getID(), ms, pm, false, -1, null, false, false);
          queue12.activate();
          
-         office1.addBinding(new Binding(condition1, queue11), false);
+         office1.addBinding(new Binding(condition1, queue11, false), false);
          
-         office2.addBinding(new Binding(condition1, queue12), false);
+         office2.addBinding(new Binding(condition1, queue12, false), false);
          
          queues = office1.getQueuesForCondition(condition1, false);
          assertNotNull(queues);
@@ -594,7 +594,7 @@
          
          Condition condition1 = new SimpleCondition("condition1");         
          
-         office1.addBinding(new Binding(condition1, queue1), true);
+         office1.addBinding(new Binding(condition1, queue1, false), true);
          
          Collection queues = office1.getQueuesForCondition(condition1, false);
                   
@@ -614,7 +614,7 @@
          Queue queue2 = new MessagingQueue(2, "sub2", channelIDManager.getID(), ms, pm, false, -1, null, true, false);
          queue2.activate();
          
-         office2.addBinding(new Binding(condition1, queue2), true);
+         office2.addBinding(new Binding(condition1, queue2, false), true);
          
          queues = office1.getQueuesForCondition(condition1, false);
          
@@ -883,19 +883,19 @@
          
          Condition condition1 = new SimpleCondition("queue1");
          
-         office1.addBinding(new Binding(condition1, queue1), false);
+         office1.addBinding(new Binding(condition1, queue1, false), false);
 
          Queue queue2 = new MessagingQueue(2, "queue1", channelIDManager.getID(), ms, pm, false, -1, null, true, false);
          queue2.activate();
 
-         office2.addBinding(new Binding(condition1, queue2), false);
+         office2.addBinding(new Binding(condition1, queue2, false), false);
 
          Queue queue3 = new MessagingQueue(1, "queue1", channelIDManager.getID(), ms, pm, false, -1, null, true, false);
          queue3.activate();
          
          try
          {
-            office1.addBinding(new Binding(condition1, queue3), false);
+            office1.addBinding(new Binding(condition1, queue3, false), false);
             fail();
          }
          catch (Exception e)
@@ -908,7 +908,7 @@
          
          try
          {
-            office2.addBinding(new Binding(condition1, queue4), false);
+            office2.addBinding(new Binding(condition1, queue4, false), false);
             fail();
          }
          catch (Exception e)

Modified: trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/PostOfficeTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/PostOfficeTest.java	2007-06-28 14:06:25 UTC (rev 2814)
+++ trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/PostOfficeTest.java	2007-06-28 20:11:38 UTC (rev 2815)
@@ -85,12 +85,12 @@
          
          Condition condition1 = new SimpleCondition("condition1");
                   
-         office1.addBinding(new Binding(condition1, queue1), false);
+         office1.addBinding(new Binding(condition1, queue1, false), false);
          
          //Binding twice with the same name should fail      
          try
          {
-            office1.addBinding(new Binding(condition1, queue1), false);
+            office1.addBinding(new Binding(condition1, queue1, false), false);
             fail();
          }
          catch (IllegalArgumentException e)
@@ -106,7 +106,7 @@
 	         MessagingQueue queuexx =
 	            new MessagingQueue(777, "durableQueue", channelIDManager.getID(), ms, pm, true, 1, null, false, false);
 	         queuexx.activate();
-	         office1.addBinding(new Binding(condition1, queuexx), false);
+	         office1.addBinding(new Binding(condition1, queuexx, false), false);
             fail();
          }
          catch (IllegalArgumentException e)
@@ -122,7 +122,7 @@
          
          Condition condition2 = new SimpleCondition("condition2");         
          
-         office1.addBinding(new Binding(condition2, queue2), false);
+         office1.addBinding(new Binding(condition2, queue2, false), false);
          
          //Check they're there
          
@@ -222,44 +222,44 @@
          MessagingQueue queue1 = new MessagingQueue(1, "queue1", channelIDManager.getID(), ms, pm, false, -1, null, false, false);
          queue1.activate();
          
-         office.addBinding(new Binding(condition1, queue1), false);
+         office.addBinding(new Binding(condition1, queue1, false), false);
          
          MessagingQueue queue2 = new MessagingQueue(1, "queue2", channelIDManager.getID(), ms, pm, false, -1, null, false, false);
          queue2.activate();
          
-         office.addBinding(new Binding(condition1, queue2), false);
+         office.addBinding(new Binding(condition1, queue2, false), false);
          
          MessagingQueue queue3 = new MessagingQueue(1, "queue3", channelIDManager.getID(), ms, pm, false, -1, null, false, false);
          queue3.activate();
          
-         office.addBinding(new Binding(condition1, queue3), false);
+         office.addBinding(new Binding(condition1, queue3, false), false);
          
          MessagingQueue queue4 = new MessagingQueue(1, "queue4", channelIDManager.getID(), ms, pm, false, -1, null, false, false);
          queue4.activate();
          
-         office.addBinding(new Binding(condition1, queue4), false);
+         office.addBinding(new Binding(condition1, queue4, false), false);
          
          MessagingQueue queue5 = new MessagingQueue(1, "queue5", channelIDManager.getID(), ms, pm, false, -1, null, false, false);
          queue5.activate();
          
          Condition condition2 = new SimpleCondition("condition2");         
          
-         office.addBinding(new Binding(condition2, queue5), false);
+         office.addBinding(new Binding(condition2, queue5, false), false);
          
          MessagingQueue queue6 = new MessagingQueue(1, "queue6", channelIDManager.getID(), ms, pm, false, -1, null, false, false);
          queue6.activate();
          
-         office.addBinding(new Binding(condition2, queue6), false);
+         office.addBinding(new Binding(condition2, queue6, false), false);
          
          MessagingQueue queue7 = new MessagingQueue(1, "queue7", channelIDManager.getID(), ms, pm, false, -1, null, false, false);
          queue7.activate();
          
-         office.addBinding(new Binding(condition2, queue7), false);
+         office.addBinding(new Binding(condition2, queue7, false), false);
          
          MessagingQueue queue8 = new MessagingQueue(1, "queue8", channelIDManager.getID(), ms, pm,  false, -1, null, false, false);
          queue8.activate();
          
-         office.addBinding(new Binding(condition2, queue8), false);
+         office.addBinding(new Binding(condition2, queue8, false), false);
                   
          Collection queues = office.getQueuesForCondition(new SimpleCondition("dummy"), true);
          assertNotNull(queues);
@@ -342,15 +342,15 @@
          Condition condition1 = new SimpleCondition("condition1");  
          
          MessagingQueue queue1 = new MessagingQueue(1, "queue1", channelIDManager.getID(), ms, pm, false, -1, null, false, false);
-         office.addBinding(new Binding(condition1, queue1), false);
+         office.addBinding(new Binding(condition1, queue1, false), false);
                          
          MessagingQueue queue2 = new MessagingQueue(1, "queue2", channelIDManager.getID(), ms, pm, false, -1, null, false, false);
-         office.addBinding(new Binding(condition1, queue2), false);
+         office.addBinding(new Binding(condition1, queue2, false), false);
          
          Condition condition2 = new SimpleCondition("condition2");  
                   
          MessagingQueue queue3 = new MessagingQueue(1, "queue3", channelIDManager.getID(), ms, pm, false, -1, null, false, false);
-         office.addBinding(new Binding(condition2, queue3), false);
+         office.addBinding(new Binding(condition2, queue3, false), false);
          
          Binding b1 = office.getBindingForQueueName("queue1");
          assertNotNull(b1);
@@ -420,15 +420,15 @@
          Condition condition1 = new SimpleCondition("condition1");  
          
          MessagingQueue queue1 = new MessagingQueue(1, "queue1", channelIDManager.getID(), ms, pm, false, -1, null, false, false);
-         office.addBinding(new Binding(condition1, queue1), false);
+         office.addBinding(new Binding(condition1, queue1, false), false);
                          
          MessagingQueue queue2 = new MessagingQueue(1, "queue2", channelIDManager.getID(), ms, pm, false, -1, null, false, false);
-         office.addBinding(new Binding(condition1, queue2), false);
+         office.addBinding(new Binding(condition1, queue2, false), false);
          
          Condition condition2 = new SimpleCondition("condition2");  
                   
          MessagingQueue queue3 = new MessagingQueue(1, "queue3", channelIDManager.getID(), ms, pm, false, -1, null, false, false);
-         office.addBinding(new Binding(condition2, queue3), false);
+         office.addBinding(new Binding(condition2, queue3, false), false);
          
          Binding b1 = office.getBindingForChannelID(queue1.getChannelID());
          assertNotNull(b1);
@@ -530,17 +530,17 @@
          MessagingQueue queue1 =  new MessagingQueue(1, "queue1", channelIDManager.getID(), ms, pm, false, -1, null, false, false);
          queue1.activate();
          
-         postOffice.addBinding(new Binding(condition1, queue1), false);
+         postOffice.addBinding(new Binding(condition1, queue1, false), false);
          
          MessagingQueue queue2 =  new MessagingQueue(1, "queue2", channelIDManager.getID(), ms, pm, false, -1, null, false, false);
          queue2.activate();
          
-         postOffice.addBinding(new Binding(condition1, queue2), false);
+         postOffice.addBinding(new Binding(condition1, queue2, false), false);
          
          MessagingQueue queue3 = new MessagingQueue(1, "queue3", channelIDManager.getID(), ms, pm, false, -1, null, false, false);
          queue3.activate();
          
-         postOffice.addBinding(new Binding(condition1, queue3), false);
+         postOffice.addBinding(new Binding(condition1, queue3, false), false);
          
          MessagingQueue queue4 = new MessagingQueue(1, "queue4", channelIDManager.getID(), ms, pm, false, -1, null, false, false);
          queue4.activate();
@@ -548,17 +548,17 @@
          Condition condition2 = new SimpleCondition("topic2");
          
          
-         postOffice.addBinding(new Binding(condition2, queue4), false);
+         postOffice.addBinding(new Binding(condition2, queue4, false), false);
          
          MessagingQueue queue5 = new MessagingQueue(1, "queue5", channelIDManager.getID(), ms, pm, false,-1, null, false, false);
          queue5.activate();
          
-         postOffice.addBinding(new Binding(condition2, queue5), false);
+         postOffice.addBinding(new Binding(condition2, queue5, false), false);
          
          MessagingQueue queue6 = new MessagingQueue(1, "queue6", channelIDManager.getID(), ms, pm, false, -1, null, false, false);
          queue6.activate();
          
-         postOffice.addBinding(new Binding(condition2, queue6), false);
+         postOffice.addBinding(new Binding(condition2, queue6, false), false);
       
          SimpleReceiver receiver1 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
          queue1.getLocalDistributor().add(receiver1);
@@ -686,7 +686,7 @@
          MessagingQueue queue1 = new MessagingQueue(1, "queue1", channelIDManager.getID(), ms, pm, false, -1, null, false, false);
          queue1.activate();
          
-         postOffice.addBinding(new Binding(new SimpleCondition("condition1"), queue1), false);
+         postOffice.addBinding(new Binding(new SimpleCondition("condition1"), queue1, false), false);
               
          SimpleReceiver receiver1 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);;
          
@@ -743,17 +743,17 @@
          MessagingQueue queue1 = new MessagingQueue(1, "queue1", channelIDManager.getID(), ms, pm, false, -1, filter, false, false);
          queue1.activate();
          
-         postOffice.addBinding(new Binding(condition1, queue1), false);
+         postOffice.addBinding(new Binding(condition1, queue1, false), false);
          
          MessagingQueue queue2 = new MessagingQueue(1, "queue2", channelIDManager.getID(), ms, pm, false, -1, null, false, false);
          queue2.activate();
          
-         postOffice.addBinding(new Binding(condition1, queue2), false);
+         postOffice.addBinding(new Binding(condition1, queue2, false), false);
          
          MessagingQueue queue3 = new MessagingQueue(1, "queue3", channelIDManager.getID(), ms, pm, false, -1, null, false, false);
          queue3.activate();
          
-         postOffice.addBinding(new Binding(condition1, queue3), false);
+         postOffice.addBinding(new Binding(condition1, queue3, false), false);
          
          SimpleReceiver receiver1 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
          queue1.getLocalDistributor().add(receiver1);
@@ -848,34 +848,34 @@
          MessagingQueue queue1 = new MessagingQueue(1, "queue1", channelIDManager.getID(), ms, pm, false, -1, null, false, false);
          queue1.activate();
          
-         postOffice.addBinding(new Binding(condition1, queue1), false);
+         postOffice.addBinding(new Binding(condition1, queue1, false), false);
          
          MessagingQueue queue2 = new MessagingQueue(1, "queue2", channelIDManager.getID(), ms, pm, false, -1, null, false, false);
          queue2.activate();
          
-         postOffice.addBinding(new Binding(condition1, queue2), false);
+         postOffice.addBinding(new Binding(condition1, queue2, false), false);
          
          MessagingQueue queue3 = new MessagingQueue(1, "queue3", channelIDManager.getID(), ms, pm, false, -1, null, false, false);
          queue3.activate();
          
-         postOffice.addBinding(new Binding(condition1, queue3), false);
+         postOffice.addBinding(new Binding(condition1, queue3, false), false);
          
          MessagingQueue queue4 = new MessagingQueue(1, "queue4", channelIDManager.getID(), ms, pm, true, -1, null, false, false);
          queue4.activate();
          
          Condition condition2 = new SimpleCondition("topic2");
          
-         postOffice.addBinding(new Binding(condition2, queue4), false);
+         postOffice.addBinding(new Binding(condition2, queue4, false), false);
          
          MessagingQueue queue5 = new MessagingQueue(1, "queue5", channelIDManager.getID(), ms, pm, true, -1, null, false, false);
          queue5.activate();
          
-         postOffice.addBinding(new Binding(condition2, queue5), false);
+         postOffice.addBinding(new Binding(condition2, queue5, false), false);
          
          MessagingQueue queue6 = new MessagingQueue(1, "queue6", channelIDManager.getID(), ms, pm, true, -1, null, false, false);
          queue6.activate();
          
-         postOffice.addBinding(new Binding(condition2, queue6), false);
+         postOffice.addBinding(new Binding(condition2, queue6, false), false);
       
          SimpleReceiver receiver1 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);;
          queue1.getLocalDistributor().add(receiver1);
@@ -1026,12 +1026,12 @@
          MessagingQueue queue1 = new MessagingQueue(1, "queue1", channelIDManager.getID(), ms, pm, false, -1, null, false, false);
          queue1.activate();
          
-         postOffice.addBinding(new Binding(condition1, queue1), false);
+         postOffice.addBinding(new Binding(condition1, queue1, false), false);
          
          MessagingQueue queue2 = new MessagingQueue(1, "queue2", channelIDManager.getID(), ms, pm, true,-1, null, false, false);
          queue2.activate();
          
-         postOffice.addBinding(new Binding(condition1, queue2), false);
+         postOffice.addBinding(new Binding(condition1, queue2, false), false);
           
          SimpleReceiver receiver1 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);;
          queue1.getLocalDistributor().add(receiver1);

Modified: trunk/tests/src/org/jboss/test/messaging/jms/clustering/MergeQueueTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/clustering/MergeQueueTest.java	2007-06-28 14:06:25 UTC (rev 2814)
+++ trunk/tests/src/org/jboss/test/messaging/jms/clustering/MergeQueueTest.java	2007-06-28 20:11:38 UTC (rev 2815)
@@ -491,6 +491,8 @@
 
          MessageProducer producer0 = session0.createProducer(queue0);
 
+         log.info("sending messages on node 0");
+         
          for (int i = 0; i < messages0; i++)
          {
             producer0.send(session0.createTextMessage("message " + i));
@@ -503,6 +505,8 @@
          
          //Send some more on node 1
          
+         log.info("Sending some messages on node 1");
+         
          Session session1 = conn1.createSession(true, Session.SESSION_TRANSACTED);
 
          MessageProducer producer1 = session1.createProducer(queue1);
@@ -586,8 +590,7 @@
          catch (Exception ignore)
          {            
          }
-         
-         
+                  
          if (conn0!=null)
          {
             conn0.close();

Modified: trunk/tests/src/org/jboss/test/messaging/jms/clustering/base/ClusteringTestBase.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/clustering/base/ClusteringTestBase.java	2007-06-28 14:06:25 UTC (rev 2814)
+++ trunk/tests/src/org/jboss/test/messaging/jms/clustering/base/ClusteringTestBase.java	2007-06-28 20:11:38 UTC (rev 2815)
@@ -204,17 +204,17 @@
 
       // wait for the client-side failover to complete
 
-      while(true)
-         {
-            FailoverEvent event = failoverListener.getEvent(120000);
-         if (event != null && FailoverEvent.FAILOVER_COMPLETED == event.getType())
-         {
-            break;
-         }
-         if (event == null)
-         {
-            fail("Did not get expected FAILOVER_COMPLETED event");
-         }
+      while (true)
+      {
+      	FailoverEvent event = failoverListener.getEvent(120000);
+      	if (event != null && FailoverEvent.FAILOVER_COMPLETED == event.getType())
+      	{
+      		break;
+      	}
+      	if (event == null)
+      	{
+      		fail("Did not get expected FAILOVER_COMPLETED event");
+      	}
       }
 
       // failover complete




More information about the jboss-cvs-commits mailing list