[Jboss-cvs] JBoss Messaging SVN: r1361 - in trunk: src/main/org/jboss/jms/server/endpoint src/main/org/jboss/messaging/core src/main/org/jboss/messaging/core/plugin/postoffice src/main/org/jboss/messaging/core/plugin/postoffice/cluster tests/src/org/jboss/test/messaging/core/plugin/base tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Sat Sep 23 14:04:10 EDT 2006


Author: timfox
Date: 2006-09-23 14:04:00 -0400 (Sat, 23 Sep 2006)
New Revision: 1361

Modified:
   trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java
   trunk/src/main/org/jboss/messaging/core/ChannelSupport.java
   trunk/src/main/org/jboss/messaging/core/plugin/postoffice/DefaultPostOffice.java
   trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOffice.java
   trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultRouterFactory.java
   trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/LocalClusteredQueue.java
   trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/PullMessagesRequest.java
   trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/PullMessagesResponse.java
   trunk/tests/src/org/jboss/test/messaging/core/plugin/base/ClusteringTestBase.java
   trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOfficeTest.java
   trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/RedistributionTest.java
Log:
Clustering


Modified: trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java	2006-09-23 14:21:10 UTC (rev 1360)
+++ trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java	2006-09-23 18:04:00 UTC (rev 1361)
@@ -135,7 +135,7 @@
    }
    
    // SessionDelegate implementation --------------------------------
-
+   
 	public ConsumerDelegate createConsumerDelegate(JBossDestination jmsDestination,
                                                   String selectorString,
                                                   boolean noLocal,
@@ -193,17 +193,30 @@
                // non-durable subscription
                if (log.isTraceEnabled()) { log.trace("creating new non-durable subscription on " + jmsDestination); }
                      
-               //Create the sub
+               //Create the non durable sub
                QueuedExecutor executor = (QueuedExecutor)pool.get();
                
-               PagingFilteredQueue q = 
-                  new PagingFilteredQueue(new GUID().toString(), idm.getId(), ms, pm, true, false,                          
-                                          executor, selector, mDest.getFullSize(),
-                                          mDest.getPageSize(),
-                                          mDest.getDownCacheSize());       
+               PagingFilteredQueue q;
                
-               //Make a binding for this queue - non durable subscriptins are always non clustered
-               binding = topicPostOffice.bindQueue(jmsDestination.getName(), q);               
+               if (topicPostOffice.isLocal())
+               {
+                  q = new PagingFilteredQueue(new GUID().toString(), idm.getId(), ms, pm, true, false,                              
+                                              executor, selector,
+                                              mDest.getFullSize(),
+                                              mDest.getPageSize(),
+                                              mDest.getDownCacheSize());
+                  
+                  binding = topicPostOffice.bindQueue(jmsDestination.getName(), q);      
+               }
+               else
+               {
+                  q = new LocalClusteredQueue(topicPostOffice, nodeId, new GUID().toString(), idm.getId(), ms, pm, true, false,                              
+                                              executor, selector, tr,
+                                              mDest.getFullSize(),
+                                              mDest.getPageSize(),
+                                              mDest.getDownCacheSize());
+                  binding = ((ClusteredPostOffice)topicPostOffice).bindClusteredQueue(jmsDestination.getName(), (LocalClusteredQueue)q);    
+               }       
             }
             else
             {
@@ -608,6 +621,8 @@
                new PagingFilteredQueue(dest.getName(), idm.getId(), ms, pm, true, false,
                                        executor, null, fullSize, pageSize, downCacheSize);
             
+            
+            
             //Make a binding for this queue
             queuePostOffice.bindQueue(dest.getName(), q);  
          }         

Modified: trunk/src/main/org/jboss/messaging/core/ChannelSupport.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/ChannelSupport.java	2006-09-23 14:21:10 UTC (rev 1360)
+++ trunk/src/main/org/jboss/messaging/core/ChannelSupport.java	2006-09-23 18:04:00 UTC (rev 1361)
@@ -1133,13 +1133,12 @@
             {               
                receiversReady = true;
                
-               deliverInternal(false);                  
-               
-               if (result != null)
-               {
-                  result.setResult(null);
-               }
+               deliverInternal(false);                     
             }
+            if (result != null)
+            {
+               result.setResult(null);
+            }
          }
          catch (Throwable t)
          {

Modified: trunk/src/main/org/jboss/messaging/core/plugin/postoffice/DefaultPostOffice.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/postoffice/DefaultPostOffice.java	2006-09-23 14:21:10 UTC (rev 1360)
+++ trunk/src/main/org/jboss/messaging/core/plugin/postoffice/DefaultPostOffice.java	2006-09-23 18:04:00 UTC (rev 1361)
@@ -48,7 +48,6 @@
 import org.jboss.messaging.core.plugin.contract.MessageStore;
 import org.jboss.messaging.core.plugin.contract.PersistenceManager;
 import org.jboss.messaging.core.plugin.contract.PostOffice;
-import org.jboss.messaging.core.plugin.postoffice.cluster.LocalClusteredQueue;
 import org.jboss.messaging.core.tx.Transaction;
 import org.jboss.messaging.core.tx.TransactionRepository;
 

Modified: trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOffice.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOffice.java	2006-09-23 14:21:10 UTC (rev 1360)
+++ trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOffice.java	2006-09-23 18:04:00 UTC (rev 1361)
@@ -131,6 +131,8 @@
    private StatsSender statsSender;
    
    private long statsSendPeriod;
+   
+   private boolean started;
       
    public DefaultClusteredPostOffice()
    {        
@@ -241,7 +243,7 @@
    // MessagingComponent overrides
    // --------------------------------------------------------------
    
-   public void start() throws Exception
+   public synchronized void start() throws Exception
    {
       if (syncChannelConfigE != null)
       {        
@@ -286,9 +288,11 @@
       syncSendRequest(new SendNodeIdRequest(currentAddress, nodeId));           
       
       statsSender.start();
+      
+      started = true;      
    }
 
-   public void stop() throws Exception
+   public synchronized void stop() throws Exception
    {
       super.stop();
       
@@ -297,6 +301,8 @@
       syncChannel.close();
       
       asyncChannel.close();
+      
+      started = false;
    }  
    
    // PostOffice implementation ---------------------------------------        
@@ -752,8 +758,13 @@
       }
    }
    
-   public void sendQueueStats() throws Exception
+   public synchronized void sendQueueStats() throws Exception
    {
+      if (!started)
+      {
+         return;
+      }
+      
       lock.readLock().acquire();
       
       List statsList = null;      

Modified: trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultRouterFactory.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultRouterFactory.java	2006-09-23 14:21:10 UTC (rev 1360)
+++ trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultRouterFactory.java	2006-09-23 18:04:00 UTC (rev 1361)
@@ -21,7 +21,6 @@
  */
 package org.jboss.messaging.core.plugin.postoffice.cluster;
 
-import org.jboss.messaging.core.Router;
 
 
 /**

Modified: trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/LocalClusteredQueue.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/LocalClusteredQueue.java	2006-09-23 14:21:10 UTC (rev 1360)
+++ trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/LocalClusteredQueue.java	2006-09-23 18:04:00 UTC (rev 1361)
@@ -28,7 +28,6 @@
 
 import org.jboss.logging.Logger;
 import org.jboss.messaging.core.Delivery;
-import org.jboss.messaging.core.DeliveryObserver;
 import org.jboss.messaging.core.Filter;
 import org.jboss.messaging.core.MessageReference;
 import org.jboss.messaging.core.SimpleDelivery;

Modified: trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/PullMessagesRequest.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/PullMessagesRequest.java	2006-09-23 14:21:10 UTC (rev 1360)
+++ trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/PullMessagesRequest.java	2006-09-23 18:04:00 UTC (rev 1361)
@@ -171,6 +171,8 @@
          //since they would have been acked on the pulling node
          LocalClusteredQueue queue = (LocalClusteredQueue)del.getObserver();
          
+         log.info("i am committing request");
+         
          queue.acknowledgeFromCluster(del);
       }
    }

Modified: trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/PullMessagesResponse.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/PullMessagesResponse.java	2006-09-23 14:21:10 UTC (rev 1360)
+++ trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/PullMessagesResponse.java	2006-09-23 18:04:00 UTC (rev 1361)
@@ -23,7 +23,6 @@
 
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
-import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
@@ -41,7 +40,7 @@
  * $Id$
  *
  */
-public class PullMessagesResponse implements Streamable, Serializable
+public class PullMessagesResponse implements Streamable
 {
    private List messages;
    

Modified: trunk/tests/src/org/jboss/test/messaging/core/plugin/base/ClusteringTestBase.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/core/plugin/base/ClusteringTestBase.java	2006-09-23 14:21:10 UTC (rev 1360)
+++ trunk/tests/src/org/jboss/test/messaging/core/plugin/base/ClusteringTestBase.java	2006-09-23 18:04:00 UTC (rev 1361)
@@ -269,7 +269,7 @@
          list.add(msg);
       }
       
-      Thread.sleep(1000);
+      Thread.sleep(2000);
       
       return list;
    }

Modified: trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOfficeTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOfficeTest.java	2006-09-23 14:21:10 UTC (rev 1360)
+++ trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOfficeTest.java	2006-09-23 18:04:00 UTC (rev 1361)
@@ -21,7 +21,6 @@
   */
 package org.jboss.test.messaging.core.plugin.postoffice.cluster;
 
-import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Iterator;
 import java.util.List;
@@ -29,10 +28,8 @@
 import org.jboss.messaging.core.FilterFactory;
 import org.jboss.messaging.core.Message;
 import org.jboss.messaging.core.MessageReference;
-import org.jboss.messaging.core.Queue;
 import org.jboss.messaging.core.local.PagingFilteredQueue;
 import org.jboss.messaging.core.plugin.contract.ClusteredPostOffice;
-import org.jboss.messaging.core.plugin.contract.PostOffice;
 import org.jboss.messaging.core.plugin.postoffice.Binding;
 import org.jboss.messaging.core.plugin.postoffice.cluster.ClusterRouterFactory;
 import org.jboss.messaging.core.plugin.postoffice.cluster.DefaultClusteredPostOffice;
@@ -550,19 +547,11 @@
          {
             //ok
          }
+          
+         office1.unbindClusteredQueue("queue1");
          
-         try
-         {
-            office1.unbindQueue("queue1");
-            fail();
-         }
-         catch (Exception e)
-         {
-            //ok
-         }
+         //It should be possible to bind local queues into a clustered post office
          
-         office1.unbindClusteredQueue("queue1");
-         
          PagingFilteredQueue queue7 = new PagingFilteredQueue("queue1", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null);       
          Binding binding7 = office1.bindQueue("queue1", queue7);
          
@@ -848,17 +837,15 @@
          if (office1 != null)
          {
             try
-            {
-               office2.unbindClusteredQueue("sub5");
+            {              
                office1.unbindClusteredQueue("sub7");
-               office1.unbindClusteredQueue("sub8");
-               office2.unbindClusteredQueue("sub13");
+               office1.unbindClusteredQueue("sub8");               
                office1.unbindClusteredQueue("sub15");
                office1.unbindClusteredQueue("sub16");
             }
             catch (Exception ignore)
             {
-               
+               ignore.printStackTrace();
             }
             
             office1.stop();
@@ -866,6 +853,15 @@
          
          if (office2 != null)
          {
+            try
+            {
+               office2.unbindClusteredQueue("sub5");
+               office2.unbindClusteredQueue("sub13");
+            }
+            catch (Exception ignore)
+            {     
+               ignore.printStackTrace();
+            }
             office2.stop();
          }
          
@@ -976,8 +972,6 @@
          this.checkEmpty(receiver4);
          checkContainsAndAcknowledge(msg, receiver5, queue5);
          
-         log.info("************* ROOTING");
-         
          msg = CoreMessageFactory.createCoreMessage(1, persistentMessage, null);      
          ref = ms.reference(msg);         
          routed = office6.route(ref, "queue1", null);         
@@ -989,7 +983,6 @@
          
          Thread.sleep(1000);
          
-         log.info("checking");
          checkContainsAndAcknowledge(msg, receiver1, queue1);
          this.checkEmpty(receiver1);
          this.checkEmpty(receiver2);         
@@ -1174,12 +1167,10 @@
          sharedDurable5.add(receiver14);
          
          
-         //Send 3 messages at node1
+         //Send 1 message at node1
          //========================
          
-         log.info("******** sending");
-         List msgs = sendMessages("topic", persistent, office1, 3, null);
-         log.info("********** sent");
+         List msgs = sendMessages("topic", persistent, office1, 1, null);
          
          //n2
          checkContainsAndAcknowledge(msgs, receiver1, nonDurable1);
@@ -1208,10 +1199,10 @@
          checkEmpty(receiver12);
          
          
-         //Send 3 messages at node2
+         //Send 1 message at node2
          //========================
          
-         msgs = sendMessages("topic", persistent, office2, 3, null);
+         msgs = sendMessages("topic", persistent, office2, 1, null);
          
          //n2
          checkContainsAndAcknowledge(msgs, receiver1, nonDurable1);
@@ -1239,10 +1230,10 @@
          //n7
          checkEmpty(receiver12);
          
-         //Send 3 messages at node3
+         //Send 1 message at node3
          //========================
          
-         msgs = sendMessages("topic", persistent, office3, 3, null);
+         msgs = sendMessages("topic", persistent, office3, 1, null);
          
          //n2
          checkContainsAndAcknowledge(msgs, receiver1, nonDurable1);
@@ -1270,18 +1261,10 @@
          //n7
          checkEmpty(receiver12);     
          
-         //Send 3 messages at node4
+         //Send 1 message at node4
          //========================
-         
-//         * node1: no subscriptions
-//         * node2: 2 non durable
-//         * node3: 1 non shared durable, 1 non durable
-//         * node4: 1 shared durable (shared1), 1 non shared durable, 3 non durable
-//         * node5: 2 shared durable (shared1 and shared2)
-//         * node6: 1 shared durable (shared2), 1 non durable
-//         * node7: 1 shared durable (shared2)
-         
-         msgs = sendMessages("topic", persistent, office4, 3, null);
+             
+         msgs = sendMessages("topic", persistent, office4, 1, null);
                
          //n2
          checkContainsAndAcknowledge(msgs, receiver1, nonDurable1);
@@ -1309,10 +1292,10 @@
          //n7
          checkEmpty(receiver12);
          
-         //Send 3 messages at node5
+         //Send 1 message at node5
          //========================
          
-         msgs = sendMessages("topic", persistent, office5, 3, null);
+         msgs = sendMessages("topic", persistent, office5, 1, null);
              
          //n2
          checkContainsAndAcknowledge(msgs, receiver1, nonDurable1);
@@ -1340,10 +1323,10 @@
          //n7
          checkEmpty(receiver12);
          
-         //Send 3 messages at node6
+         //Send 1 message at node6
          //========================
          
-         msgs = sendMessages("topic", persistent, office6, 3, null);
+         msgs = sendMessages("topic", persistent, office6, 1, null);
              
          //n2
          checkContainsAndAcknowledge(msgs, receiver1, nonDurable1);
@@ -1372,10 +1355,10 @@
          //n7
          checkEmpty(receiver12);
          
-         //Send 3 messages at node7
+         //Send 1 message at node7
          //========================
          
-         msgs = sendMessages("topic", persistent, office7, 3, null);
+         msgs = sendMessages("topic", persistent, office7, 1, null);
 
          //n2
          checkContainsAndAcknowledge(msgs, receiver1, nonDurable1);
@@ -1423,7 +1406,8 @@
                office3.unbindClusteredQueue("nonshareddurable1");
             }
             catch (Exception ignore)
-            {               
+            {   
+               ignore.printStackTrace();
             }
             office3.stop();
          }
@@ -1436,7 +1420,8 @@
                office4.unbindClusteredQueue("nonshareddurable2");
             }
             catch (Exception ignore)
-            {               
+            {           
+               ignore.printStackTrace();
             }
             office4.stop();
          }
@@ -1450,6 +1435,7 @@
             }
             catch (Exception ignore)
             {               
+               ignore.printStackTrace();
             }
             office5.stop();
          }
@@ -1462,6 +1448,7 @@
             }
             catch (Exception ignore)
             {               
+               ignore.printStackTrace();
             }
             office6.stop();
          }
@@ -1470,10 +1457,11 @@
          {      
             try
             {
-               office6.unbindClusteredQueue("shareddurable2");
+               office7.unbindClusteredQueue("shareddurable2");
             }
             catch (Exception ignore)
             {               
+               ignore.printStackTrace();
             }
             office7.stop();
          }
@@ -2069,17 +2057,33 @@
       {
          if (office1 != null)
          {
-            office2.unbindClusteredQueue("sub5");
-            office1.unbindClusteredQueue("sub7");
-            office1.unbindClusteredQueue("sub8");
-            office2.unbindClusteredQueue("sub13");
-            office1.unbindClusteredQueue("sub15");
-            office1.unbindClusteredQueue("sub16");
+            try
+            {
+               office1.unbindClusteredQueue("sub7");
+               office1.unbindClusteredQueue("sub8");           
+               office1.unbindClusteredQueue("sub15");
+               office1.unbindClusteredQueue("sub16");
+            }
+            catch (Exception ignore)
+            {
+               ignore.printStackTrace();
+            }
+                        
             office1.stop();
          }
          
          if (office2 != null)
          {
+            try
+            {
+               office2.unbindClusteredQueue("sub5");
+               office2.unbindClusteredQueue("sub13");
+            }
+            catch (Exception ignore)
+            {
+               ignore.printStackTrace();
+            }
+            
             office2.stop();
          }
          

Modified: trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/RedistributionTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/RedistributionTest.java	2006-09-23 14:21:10 UTC (rev 1360)
+++ trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/RedistributionTest.java	2006-09-23 18:04:00 UTC (rev 1361)
@@ -21,8 +21,6 @@
   */
 package org.jboss.test.messaging.core.plugin.postoffice.cluster;
 
-import java.util.List;
-
 import org.jboss.messaging.core.Delivery;
 import org.jboss.messaging.core.DeliveryObserver;
 import org.jboss.messaging.core.FilterFactory;
@@ -71,16 +69,16 @@
       super.tearDown();
    }
    
-   public void testRedist() throws Throwable
+   public void testRedistNonPersistent() throws Throwable
    {
+      redistTest(false);
+   }
+   
+   public void testRedistPersistent() throws Throwable
+   {
       redistTest(true);
    }
    
-   /*
-    * 
-    * 
-    * 
-    */
    public void redistTest(boolean persistent) throws Throwable
    {
       ClusteredPostOffice office1 = null;
@@ -137,20 +135,20 @@
          
          //Check the sizes
           
-         List msgs = queue1.browse();
-         assertEquals(30, msgs.size());
+         assertEquals(30, queue1.memoryRefCount());
+         assertEquals(0, queue1.memoryDeliveryCount());
          
-         msgs = queue2.browse();
-         assertEquals(30, msgs.size());
+         assertEquals(30, queue2.memoryRefCount());
+         assertEquals(0, queue2.memoryDeliveryCount());
            
-         msgs = queue3.browse();
-         assertEquals(30, msgs.size());
+         assertEquals(30, queue3.memoryRefCount());
+         assertEquals(0, queue3.memoryDeliveryCount());
          
-         msgs = queue4.browse();
-         assertEquals(30, msgs.size());
+         assertEquals(30, queue4.memoryRefCount());
+         assertEquals(0, queue4.memoryDeliveryCount());
          
-         msgs = queue5.browse();
-         assertEquals(30, msgs.size());
+         assertEquals(30, queue5.memoryRefCount());
+         assertEquals(0, queue5.memoryDeliveryCount());
          
          //Now we add the receivers
          //Note that we did not do this before the send.
@@ -184,44 +182,53 @@
          Thread.sleep(1000);
          
          //Now we check the sizes again in case automatic balancing has erroneously
-         //kicked in
+         //kicked in                  
          
-         msgs = queue1.browse();
-         assertEquals(30, msgs.size());
+         assertEquals(29, queue1.memoryRefCount());
+         assertEquals(1, queue1.memoryDeliveryCount());
          
-         msgs = queue2.browse();
-         assertEquals(30, msgs.size());
+         assertEquals(29, queue2.memoryRefCount());
+         assertEquals(1, queue2.memoryDeliveryCount());
            
-         msgs = queue3.browse();
-         assertEquals(30, msgs.size());
+         assertEquals(29, queue3.memoryRefCount());
+         assertEquals(1, queue3.memoryDeliveryCount());
          
-         msgs = queue4.browse();
-         assertEquals(30, msgs.size());
+         assertEquals(29, queue4.memoryRefCount());
+         assertEquals(1, queue4.memoryDeliveryCount());
          
-         msgs = queue5.browse();
-         assertEquals(30, msgs.size());
+         assertEquals(29, queue5.memoryRefCount());
+         assertEquals(1, queue5.memoryDeliveryCount());
          
          Thread.sleep(5000);
          
          //And again - should still be no redistribution
          
-         msgs = queue1.browse();
-         assertEquals(30, msgs.size());
+         assertEquals(29, queue1.memoryRefCount());
+         assertEquals(1, queue1.memoryDeliveryCount());
          
-         msgs = queue2.browse();
-         assertEquals(30, msgs.size());
+         assertEquals(29, queue2.memoryRefCount());
+         assertEquals(1, queue2.memoryDeliveryCount());
            
-         msgs = queue3.browse();
-         assertEquals(30, msgs.size());
+         assertEquals(29, queue3.memoryRefCount());
+         assertEquals(1, queue3.memoryDeliveryCount());
          
-         msgs = queue4.browse();
-         assertEquals(30, msgs.size());
+         assertEquals(29, queue4.memoryRefCount());
+         assertEquals(1, queue4.memoryDeliveryCount());
          
-         msgs = queue5.browse();
-         assertEquals(30, msgs.size());
+         assertEquals(29, queue5.memoryRefCount());
+         assertEquals(1, queue5.memoryDeliveryCount());
          
-         //Try and consumer them all via one receiver
+         Thread.sleep(2000);
          
+         log.info("Here are the sizes:");         
+         log.info("queue1, refs:" + queue1.memoryRefCount() + " dels:" + queue1.memoryDeliveryCount());
+         log.info("queue2, refs:" + queue2.memoryRefCount() + " dels:" + queue2.memoryDeliveryCount());         
+         log.info("queue3, refs:" + queue3.memoryRefCount() + " dels:" + queue3.memoryDeliveryCount());         
+         log.info("queue4, refs:" + queue4.memoryRefCount() + " dels:" + queue4.memoryDeliveryCount());         
+         log.info("queue5, refs:" + queue5.memoryRefCount() + " dels:" + queue5.memoryDeliveryCount());
+         
+         
+         
          log.info("trying to consume");
          
          //So we have 150 messages in total - 30 on each node.
@@ -236,87 +243,79 @@
          
          //Consume 10 on node 4
          
-         //Consume 5 on node 5
+         //We leave the last 5 since they will be as deliveries in the receivers probably
+         
+         Delivery del;
                   
          log.info("consuming queue1");
          for (int i = 0; i < 10; i++)
          {       
             queue1.deliver(true);
-            Delivery del = receiver1.getDelivery();
+            del = receiver1.getDelivery();
             log.info("Got delivery: " + del.getReference().getMessageID());
             del.acknowledge(null);  
          }
          log.info("consumed queue1");
          
+         log.info("Here are the sizes:");  
+         
+         log.info("queue1, refs:" + queue1.memoryRefCount() + " dels:" + queue1.memoryDeliveryCount());
+         log.info("queue2, refs:" + queue2.memoryRefCount() + " dels:" + queue2.memoryDeliveryCount());         
+         log.info("queue3, refs:" + queue3.memoryRefCount() + " dels:" + queue3.memoryDeliveryCount());         
+         log.info("queue4, refs:" + queue4.memoryRefCount() + " dels:" + queue4.memoryDeliveryCount());         
+         log.info("queue5, refs:" + queue5.memoryRefCount() + " dels:" + queue5.memoryDeliveryCount());
+                  
          log.info("consuming queue2");
          for (int i = 0; i < 50; i++)
          {       
             queue2.deliver(true);
-            Delivery del = receiver2.getDelivery();
+            del = receiver2.getDelivery();
             log.info("Got delivery: " + del.getReference().getMessageID());
             del.acknowledge(null);  
          }
+         log.info("consumed queue2");
          
+         log.info("Here are the sizes:");         
+         log.info("queue1, refs:" + queue1.memoryRefCount() + " dels:" + queue1.memoryDeliveryCount());
+         log.info("queue2, refs:" + queue2.memoryRefCount() + " dels:" + queue2.memoryDeliveryCount());         
+         log.info("queue3, refs:" + queue3.memoryRefCount() + " dels:" + queue3.memoryDeliveryCount());         
+         log.info("queue4, refs:" + queue4.memoryRefCount() + " dels:" + queue4.memoryDeliveryCount());         
+         log.info("queue5, refs:" + queue5.memoryRefCount() + " dels:" + queue5.memoryDeliveryCount());
+         
          log.info("consuming queue3");
          for (int i = 0; i < 75; i++)
          {       
             queue3.deliver(true);
-            Delivery del = receiver3.getDelivery();
+            del = receiver3.getDelivery();
             log.info("Got delivery: " + del.getReference().getMessageID());
             del.acknowledge(null);  
          }
+         log.info("consumed queue3");
          
+         log.info("Here are the sizes:");         
+         log.info("queue1, refs:" + queue1.memoryRefCount() + " dels:" + queue1.memoryDeliveryCount());
+         log.info("queue2, refs:" + queue2.memoryRefCount() + " dels:" + queue2.memoryDeliveryCount());         
+         log.info("queue3, refs:" + queue3.memoryRefCount() + " dels:" + queue3.memoryDeliveryCount());         
+         log.info("queue4, refs:" + queue4.memoryRefCount() + " dels:" + queue4.memoryDeliveryCount());         
+         log.info("queue5, refs:" + queue5.memoryRefCount() + " dels:" + queue5.memoryDeliveryCount());
+         
          log.info("consuming queue4");
          for (int i = 0; i < 10; i++)
          {       
             queue4.deliver(true);
-            Delivery del = receiver4.getDelivery();
+            del = receiver4.getDelivery();
             log.info("Got delivery: " + del.getReference().getMessageID());
             del.acknowledge(null);  
          }
+         log.info("consumed queue4");
          
-         Thread.sleep(2000);
+         log.info("Here are the sizes:");         
+         log.info("queue1, refs:" + queue1.memoryRefCount() + " dels:" + queue1.memoryDeliveryCount());
+         log.info("queue2, refs:" + queue2.memoryRefCount() + " dels:" + queue2.memoryDeliveryCount());         
+         log.info("queue3, refs:" + queue3.memoryRefCount() + " dels:" + queue3.memoryDeliveryCount());         
+         log.info("queue4, refs:" + queue4.memoryRefCount() + " dels:" + queue4.memoryDeliveryCount());         
+         log.info("queue5, refs:" + queue5.memoryRefCount() + " dels:" + queue5.memoryDeliveryCount());
          
-         log.info("Here are the sizes:");
-         
-         msgs = queue1.browse();
-         log.info("queue1: " + msgs.size());
-         
-         msgs = queue2.browse();
-         log.info("queue2: " + msgs.size());
-           
-         msgs = queue3.browse();
-         log.info("queue3: " + msgs.size());
-         
-         msgs = queue4.browse();
-         log.info("queue4: " + msgs.size());
-         
-         msgs = queue5.browse();
-         log.info("queue5: " + msgs.size());
-         
-         log.info("consuming queue5");
-         for (int i = 0; i < 5; i++)
-         {       
-            queue5.deliver(true);
-            Delivery del = receiver5.getDelivery();
-            log.info("Got delivery: " + del.getReference().getMessageID());
-            del.acknowledge(null);  
-         }
-         
-         msgs = queue1.browse();
-         assertEquals(0, msgs.size());
-         
-         msgs = queue2.browse();
-         assertEquals(0, msgs.size());
-           
-         msgs = queue3.browse();
-         assertEquals(0, msgs.size());
-         
-         msgs = queue4.browse();
-         assertEquals(0, msgs.size());
-         
-         msgs = queue5.browse();
-         assertEquals(0, msgs.size());
       }
       finally
       { 




More information about the jboss-cvs-commits mailing list