[jboss-cvs] JBoss Messaging SVN: r1502 - in trunk: src/etc/server/default/deploy src/main/org/jboss/messaging/core src/main/org/jboss/messaging/core/plugin tests tests/src/org/jboss/test/messaging/core/plugin/postoffice tests/src/org/jboss/test/messaging/jms tests/src/org/jboss/test/messaging/jms/manual tests/src/org/jboss/test/messaging/tools/jmx

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Thu Oct 19 18:01:57 EDT 2006


Author: timfox
Date: 2006-10-19 18:01:47 -0400 (Thu, 19 Oct 2006)
New Revision: 1502

Added:
   trunk/tests/src/org/jboss/test/messaging/jms/manual/
   trunk/tests/src/org/jboss/test/messaging/jms/manual/ManualClusteringTest.java
   trunk/tests/src/org/jboss/test/messaging/jms/manual/ManualPagingSoakTest.java
Removed:
   trunk/tests/src/org/jboss/test/messaging/jms/ManualClusteringTest.java
Modified:
   trunk/src/etc/server/default/deploy/clustered-mysql-persistence-service.xml
   trunk/src/etc/server/default/deploy/mysql-persistence-service.xml
   trunk/src/main/org/jboss/messaging/core/PagingChannelSupport.java
   trunk/src/main/org/jboss/messaging/core/plugin/JDBCPersistenceManager.java
   trunk/tests/build.xml
   trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/DefaultPostOfficeTest.java
   trunk/tests/src/org/jboss/test/messaging/tools/jmx/ServiceContainer.java
Log:
Alpha2 interim commit



Modified: trunk/src/etc/server/default/deploy/clustered-mysql-persistence-service.xml
===================================================================
--- trunk/src/etc/server/default/deploy/clustered-mysql-persistence-service.xml	2006-10-19 19:30:58 UTC (rev 1501)
+++ trunk/src/etc/server/default/deploy/clustered-mysql-persistence-service.xml	2006-10-19 22:01:47 UTC (rev 1502)
@@ -43,7 +43,7 @@
 INSERT_MESSAGE=INSERT INTO JMS_MESSAGE (MESSAGEID, RELIABLE, EXPIRATION, TIMESTAMP, PRIORITY, COREHEADERS, PAYLOAD, CHANNELCOUNT, TYPE, JMSTYPE, CORRELATIONID, CORRELATIONID_BYTES, DESTINATION, REPLYTO, JMSPROPERTIES) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
 INC_CHANNELCOUNT=UPDATE JMS_MESSAGE SET CHANNELCOUNT = CHANNELCOUNT + 1 WHERE MESSAGEID=?
 DEC_CHANNELCOUNT=UPDATE JMS_MESSAGE SET CHANNELCOUNT = CHANNELCOUNT - 1 WHERE MESSAGEID=?
-DELETE_MESSAGE=DELETE FROM JMS_MESSAGE WHERE MESSAGEID=?
+DELETE_MESSAGE=DELETE FROM JMS_MESSAGE WHERE MESSAGEID=? AND CHANNELCOUNT=0
 MESSAGEID_COLUMN=MESSAGEID
 MESSAGE_EXISTS=SELECT MESSAGEID FROM JMS_MESSAGE WHERE MESSAGEID = ? FOR UPDATE
 INSERT_TRANSACTION=INSERT INTO JMS_TRANSACTION (TRANSACTIONID, BRANCH_QUAL, FORMAT_ID, GLOBAL_TXID) VALUES(?, ?, ?, ?)

Modified: trunk/src/etc/server/default/deploy/mysql-persistence-service.xml
===================================================================
--- trunk/src/etc/server/default/deploy/mysql-persistence-service.xml	2006-10-19 19:30:58 UTC (rev 1501)
+++ trunk/src/etc/server/default/deploy/mysql-persistence-service.xml	2006-10-19 22:01:47 UTC (rev 1502)
@@ -43,7 +43,7 @@
    INSERT_MESSAGE=INSERT INTO JMS_MESSAGE (MESSAGEID, RELIABLE, EXPIRATION, TIMESTAMP, PRIORITY, COREHEADERS, PAYLOAD, CHANNELCOUNT, TYPE, JMSTYPE, CORRELATIONID, CORRELATIONID_BYTES, DESTINATION, REPLYTO, JMSPROPERTIES) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
    INC_CHANNELCOUNT=UPDATE JMS_MESSAGE SET CHANNELCOUNT = CHANNELCOUNT + 1 WHERE MESSAGEID=?
    DEC_CHANNELCOUNT=UPDATE JMS_MESSAGE SET CHANNELCOUNT = CHANNELCOUNT - 1 WHERE MESSAGEID=?
-   DELETE_MESSAGE=DELETE FROM JMS_MESSAGE WHERE MESSAGEID=?
+   DELETE_MESSAGE=DELETE FROM JMS_MESSAGE WHERE MESSAGEID=? AND CHANNELCOUNT=0
    MESSAGEID_COLUMN=MESSAGEID
    MESSAGE_EXISTS=SELECT MESSAGEID FROM JMS_MESSAGE WHERE MESSAGEID = ? FOR UPDATE
    INSERT_TRANSACTION=INSERT INTO JMS_TRANSACTION (TRANSACTIONID, BRANCH_QUAL, FORMAT_ID, GLOBAL_TXID) VALUES(?, ?, ?, ?)

Modified: trunk/src/main/org/jboss/messaging/core/PagingChannelSupport.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/PagingChannelSupport.java	2006-10-19 19:30:58 UTC (rev 1501)
+++ trunk/src/main/org/jboss/messaging/core/PagingChannelSupport.java	2006-10-19 22:01:47 UTC (rev 1502)
@@ -22,6 +22,7 @@
 package org.jboss.messaging.core;
 
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
@@ -302,12 +303,11 @@
    protected void loadPagedReferences(long number) throws Exception
    {
       if (trace) { log.trace(this + " Loading " + number + " paged references from storage"); }
-
+  
       // Must flush the down cache first
       flushDownCache();
+           List refInfos = pm.getPagedReferenceInfos(channelID, firstPagingOrder, number);      
       
-      List refInfos = pm.getPagedReferenceInfos(channelID, firstPagingOrder, number);
-      
       Map refMap = processReferences(refInfos);
 
       boolean loadedReliable = false;
@@ -336,7 +336,7 @@
             unreliableNumber++;
          }
       }
-
+      
       if (!toRemove.isEmpty())
       {
          // Now we remove the references we loaded (only the non persistent or persistent in a non-recoverable store)
@@ -401,12 +401,12 @@
    
    private boolean checkLoad() throws Exception
    {
-      long refNum = downCache.size() + nextPagingOrder - firstPagingOrder;
+      long refNum = nextPagingOrder - firstPagingOrder;
       
       if (refNum > 0)
       {
          long numberLoadable = Math.min(refNum, pageSize);
-
+         
          if (messageRefs.size() <= fullSize - numberLoadable)
          {
             //This will flush the down cache too
@@ -465,7 +465,7 @@
 
       //If cancelling then the ref is supposed to go back on the front of the queue segment in storage
       //so we set the page ordering to be firstPageOrdering - 1
-      //If not cancelling, then the ref should go on the end of the quueue in storage so
+      //If not cancelling, then the ref should go on the end of the queue in storage so
       //we set the page ordering to be nextPageOrdering
       
       if (cancelling)
@@ -509,7 +509,7 @@
       while (iter.hasNext())
       {
          MessageReference ref = (MessageReference) iter.next();
-         
+           
          if (ref.isReliable() && recoverable)
          {
             toUpdate.add(ref);
@@ -519,7 +519,7 @@
             toAdd.add(ref);
          }
       }
-
+      
       if (!toAdd.isEmpty())
       {
          pm.pageReferences(channelID, toAdd, true);
@@ -608,9 +608,10 @@
          if (messages.size() != msgIdsToLoad.size())
          {
             // Sanity check
+            
             throw new IllegalStateException("Did not load correct number of messages, wanted:" +
                                             msgIdsToLoad.size() + " but got:" +
-                                            messages.size());
+                                            messages.size());            
          }
 
          // Create references for these messages and add them to the reference map

Modified: trunk/src/main/org/jboss/messaging/core/plugin/JDBCPersistenceManager.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/JDBCPersistenceManager.java	2006-10-19 19:30:58 UTC (rev 1501)
+++ trunk/src/main/org/jboss/messaging/core/plugin/JDBCPersistenceManager.java	2006-10-19 22:01:47 UTC (rev 1502)
@@ -138,7 +138,10 @@
       }
       finally
       {
-         conn.close();
+         if (conn != null)
+         {
+            conn.close();
+         }
       }
         
       //We can't remnove unreliable data since it might introduce holes into the paging order
@@ -155,7 +158,7 @@
    // PersistenceManager implementation -------------------------
    
    // Related to counters
-   // ==================
+   // ===================
    
    public long reserveIDBlock(String counterName, int size) throws Exception
    {
@@ -261,103 +264,7 @@
    }
          
    // Related to paging functionality
-   // ===============================
-      
-   public void updateReliableReferencesNotPagedInRange(long channelID, long orderStart, long orderEnd, long num) throws Exception
-   {
-      if (trace) { log.trace("Updating reliable references for channel " + channelID + " between " + orderStart + " and " + orderEnd); }
-      
-      Connection conn = null;
-      PreparedStatement ps = null;
-      TransactionWrapper wrap = new TransactionWrapper();
-
-      final int MAX_TRIES = 25;      
-      
-      try
-      {
-         conn = ds.getConnection();
-         
-         ps = conn.prepareStatement(getSQLStatement("UPDATE_RELIABLE_REFS_NOT_PAGED"));
-                 
-         ps.setLong(1, orderStart);
-         
-         ps.setLong(2, orderEnd);
-         
-         ps.setLong(3, channelID);
-         
-         int tries = 0;
-         
-         while (true)
-         {
-            try
-            {
-               int rows = ps.executeUpdate();
-                 
-               if (trace) { log.trace(JDBCUtil.statementToString(getSQLStatement("UPDATE_RELIABLE_REFS_NOT_PAGED"), new Long(channelID),
-                                      new Long(orderStart), new Long(orderEnd)) + " updated " + rows + " rows"); }
-               if (tries > 0)
-               {
-                  log.warn("Update worked after retry");
-               }
-               
-               //Sanity check
-               if (rows != num)
-               {
-                  throw new IllegalStateException("Did not update correct number of rows");
-               }
-               
-               break;
-            }
-            catch (SQLException e)
-            {
-               log.warn("SQLException caught - assuming deadlock detected, try:" + (tries + 1), e);
-               
-               tries++;
-               
-               if (tries == MAX_TRIES)
-               {
-                  log.error("Retried " + tries + " times, now giving up");
-                  
-                  throw new IllegalStateException("Failed to update references");
-               }
-               
-               log.warn("Trying again after a pause");
-               
-               //Now we wait for a random amount of time to minimise risk of deadlock occurring again
-               Thread.sleep((long)(Math.random() * 500));
-            }  
-         }
-      }
-      catch (Exception e)
-      {
-         wrap.exceptionOccurred();
-         throw e;
-      }
-      finally
-      {
-         if (ps != null)
-         {
-            try
-            {
-               ps.close();
-            }
-            catch (Throwable e)
-            {
-            }
-         }
-         if (conn != null)
-         {
-            try
-            {
-               conn.close();
-            }
-            catch (Throwable e)
-            {
-            }
-         }
-         wrap.end();
-      }
-   }         
+   // ===============================         
         
    /*
     * Retrieve a List of messages corresponding to the specified List of message ids.
@@ -561,7 +468,8 @@
          wrap.end();
       }
    }  
-               
+   
+       
    public void pageReferences(long channelID, List references, boolean paged) throws Exception
    {  
       Connection conn = null;
@@ -635,7 +543,7 @@
             //Maybe we need to persist the message itself
             Message m = ref.getMessage();
             
-            //In a paging situation, we cannot use the persisted flag on the messager to determine whether
+            //In a paging situation, we cannot use the persisted flag on the message to determine whether
             //to insert the message or not.
             //This is because a channel (possibly on another node) may be paging too and referencing
             //the same message, and might have removed the message independently, the other
@@ -655,23 +563,22 @@
             //this, this is another reason why we cannot use HSQL in a clustered environment
             //since it does not have a for update equivalent
             
+            boolean added;
+            
             psMessageExists = conn.prepareStatement(getSQLStatement("MESSAGE_EXISTS"));
             
             psMessageExists.setLong(1, m.getMessageID());
             
             rsMessageExists = psMessageExists.executeQuery();
-            
-            boolean added;
-            
+             
             if (rsMessageExists.next())
             {
                //Message exists
                
                // Update the message with the new channel count
                incrementChannelCount(m, psUpdateMessage);
-               
-               added = false;
-               
+                  
+               added = false;              
             }
             else
             {
@@ -679,7 +586,7 @@
                storeMessage(m, psInsertMessage);
                
                added = true;
-            }            
+            }    
             
             if (usingBatchUpdates)
             {
@@ -706,9 +613,9 @@
                   }
                }
                else
-               {
+               {               
                   int rows = psUpdateMessage.executeUpdate();
-                 
+                  
                   if (trace)
                   {
                      log.trace("Updated " + rows + " rows");
@@ -816,7 +723,7 @@
       PreparedStatement psDeleteMessage = null;
       PreparedStatement psUpdateMessage = null;
       TransactionWrapper wrap = new TransactionWrapper();
-      
+        
       //We order the references
       orderReferences(references);
              
@@ -870,10 +777,11 @@
             Message m = ref.getMessage();
                                     
             //Maybe we need to delete the message itself
-            
+              
             //Update the message with the new channel count
             decrementChannelCount(m, psUpdateMessage);
             
+
             //Run the remove message update
             removeMessage(m, psDeleteMessage);
                         
@@ -884,20 +792,21 @@
                psDeleteMessage.addBatch();
             }
             else
-            {
+            {  
                int rows = psUpdateMessage.executeUpdate();
-                              
+                                                 
                if (trace) { log.trace("Updated " + rows + " rows"); }
                
                rows = psDeleteMessage.executeUpdate();
-               
+        
                if (trace) { log.trace("Deleted " + rows + " rows"); }
             
                psDeleteMessage.close();
                psDeleteMessage = null;
                psUpdateMessage.close();
                psUpdateMessage = null;
-            }      
+            }  
+            
          }         
          
          if (usingBatchUpdates)
@@ -980,11 +889,197 @@
          }         
       }      
    }
+   
+   public void updateReliableReferencesNotPagedInRange(long channelID, long orderStart, long orderEnd, long num) throws Exception
+   {
+      if (trace) { log.trace("Updating reliable references for channel " + channelID + " between " + orderStart + " and " + orderEnd); }
       
+      Connection conn = null;
+      PreparedStatement ps = null;
+      TransactionWrapper wrap = new TransactionWrapper();
+
+      final int MAX_TRIES = 25;      
+      
+      try
+      {
+         conn = ds.getConnection();
+         
+         ps = conn.prepareStatement(getSQLStatement("UPDATE_RELIABLE_REFS_NOT_PAGED"));
+                 
+         ps.setLong(1, orderStart);
+         
+         ps.setLong(2, orderEnd);
+         
+         ps.setLong(3, channelID);
+         
+         int tries = 0;
+         
+         while (true)
+         {
+            try
+            {
+               int rows = ps.executeUpdate();
+                 
+               if (trace) { log.trace(JDBCUtil.statementToString(getSQLStatement("UPDATE_RELIABLE_REFS_NOT_PAGED"), new Long(channelID),
+                                      new Long(orderStart), new Long(orderEnd)) + " updated " + rows + " rows"); }
+               if (tries > 0)
+               {
+                  log.warn("Update worked after retry");
+               }
+               
+               //Sanity check
+               if (rows != num)
+               {
+                  throw new IllegalStateException("Did not update correct number of rows");
+               }
+               
+               break;
+            }
+            catch (SQLException e)
+            {
+               log.warn("SQLException caught - assuming deadlock detected, try:" + (tries + 1), e);
+               
+               tries++;
+               
+               if (tries == MAX_TRIES)
+               {
+                  log.error("Retried " + tries + " times, now giving up");
+                  
+                  throw new IllegalStateException("Failed to update references");
+               }
+               
+               log.warn("Trying again after a pause");
+               
+               //Now we wait for a random amount of time to minimise risk of deadlock occurring again
+               Thread.sleep((long)(Math.random() * 500));
+            }  
+         }
+      }
+      catch (Exception e)
+      {
+         wrap.exceptionOccurred();
+         throw e;
+      }
+      finally
+      {
+         if (ps != null)
+         {
+            try
+            {
+               ps.close();
+            }
+            catch (Throwable e)
+            {
+            }
+         }
+         if (conn != null)
+         {
+            try
+            {
+               conn.close();
+            }
+            catch (Throwable e)
+            {
+            }
+         }
+         wrap.end();
+      }
+   }
+   
+   public void updatePageOrder(long channelID, List references) throws Exception
+   {
+      Connection conn = null;
+      PreparedStatement psUpdateReference = null;  
+      TransactionWrapper wrap = new TransactionWrapper();
+      
+      if (trace) { log.trace("Updating page order for channel:" + channelID); }
+        
+      try
+      {
+         conn = ds.getConnection();
+         
+         Iterator iter = references.iterator();
+         
+         if (usingBatchUpdates)
+         {
+            psUpdateReference = conn.prepareStatement(getSQLStatement("UPDATE_PAGE_ORDER"));
+         }
+         
+         while (iter.hasNext())
+         {
+            MessageReference ref = (MessageReference) iter.next();
+                 
+            if (!usingBatchUpdates)
+            {
+               psUpdateReference = conn.prepareStatement(getSQLStatement("UPDATE_PAGE_ORDER"));
+            }
+            
+            psUpdateReference.setLong(1, ref.getPagingOrder());
+
+            psUpdateReference.setLong(2, ref.getMessageID());
+            
+            psUpdateReference.setLong(3, channelID);
+            
+            if (usingBatchUpdates)
+            {
+               psUpdateReference.addBatch();
+            }
+            else
+            {
+               int rows = psUpdateReference.executeUpdate();
+               
+               if (trace) { log.trace("Updated " + rows + " rows"); }
+               
+               psUpdateReference.close();
+               psUpdateReference = null;
+            }
+         }
+                     
+         if (usingBatchUpdates)
+         {
+            int[] rowsReference = psUpdateReference.executeBatch();
+            
+            if (trace) { logBatchUpdate(getSQLStatement("UPDATE_PAGE_ORDER"), rowsReference, "updated"); }
+                        
+            psUpdateReference.close();
+            psUpdateReference = null;
+         }
+      }
+      catch (Exception e)
+      {
+         wrap.exceptionOccurred();
+         throw e;
+      }
+      finally
+      {
+         if (psUpdateReference != null)
+         {
+            try
+            {
+               psUpdateReference.close();
+            }
+            catch (Throwable t)
+            {
+            }
+         }
+         if (conn != null)
+         {
+            try
+            {
+               conn.close();
+            }
+            catch (Throwable t)
+            {
+            }
+         }
+         wrap.end();
+      }    
+   }
+      
    public List getPagedReferenceInfos(long channelID, long orderStart, long number) throws Exception
    {
       if (trace) { log.trace("loading message reference info for channel " + channelID + " from " + orderStart + " number " + number);      }
-                    
+                 
       List refs = new ArrayList();
       
       Connection conn = null;
@@ -1192,96 +1287,8 @@
       }      
    }   
    
-   public void updatePageOrder(long channelID, List references) throws Exception
-   {
-      Connection conn = null;
-      PreparedStatement psUpdateReference = null;  
-      TransactionWrapper wrap = new TransactionWrapper();
-      
-      if (trace) { log.trace("Updating page order for channel:" + channelID); }
-        
-      try
-      {
-         conn = ds.getConnection();
-         
-         Iterator iter = references.iterator();
-         
-         if (usingBatchUpdates)
-         {
-            psUpdateReference = conn.prepareStatement(getSQLStatement("UPDATE_PAGE_ORDER"));
-         }
-         
-         while (iter.hasNext())
-         {
-            MessageReference ref = (MessageReference) iter.next();
-                 
-            if (!usingBatchUpdates)
-            {
-               psUpdateReference = conn.prepareStatement(getSQLStatement("UPDATE_PAGE_ORDER"));
-            }
-            
-            psUpdateReference.setLong(1, ref.getPagingOrder());
-            
-            psUpdateReference.setLong(2, ref.getMessageID());
-            
-            psUpdateReference.setLong(3, channelID);
-            
-            if (usingBatchUpdates)
-            {
-               psUpdateReference.addBatch();
-            }
-            else
-            {
-               int rows = psUpdateReference.executeUpdate();
-               
-               if (trace) { log.trace("Updated " + rows + " rows"); }
-               
-               psUpdateReference.close();
-               psUpdateReference = null;
-            }
-         }
-                     
-         if (usingBatchUpdates)
-         {
-            int[] rowsReference = psUpdateReference.executeBatch();
-            
-            if (trace) { logBatchUpdate(getSQLStatement("UPDATE_PAGE_ORDER"), rowsReference, "updated"); }
-                        
-            psUpdateReference.close();
-            psUpdateReference = null;
-         }
-      }
-      catch (Exception e)
-      {
-         wrap.exceptionOccurred();
-         throw e;
-      }
-      finally
-      {
-         if (psUpdateReference != null)
-         {
-            try
-            {
-               psUpdateReference.close();
-            }
-            catch (Throwable t)
-            {
-            }
-         }
-         if (conn != null)
-         {
-            try
-            {
-               conn.close();
-            }
-            catch (Throwable t)
-            {
-            }
-         }
-         wrap.end();
-      }    
-   }
    
+   
    // End of paging functionality
    // ===========================
    
@@ -2925,6 +2932,8 @@
       ps.setLong(1, m.getMessageID());
    }
    
+   
+   
    protected void decrementChannelCount(Message m, PreparedStatement ps) throws Exception
    {
       ps.setLong(1, m.getMessageID());

Modified: trunk/tests/build.xml
===================================================================
--- trunk/tests/build.xml	2006-10-19 19:30:58 UTC (rev 1501)
+++ trunk/tests/build.xml	2006-10-19 22:01:47 UTC (rev 1502)
@@ -343,7 +343,7 @@
                <exclude name="**/jms/stress/**"/>
                <exclude name="**/jms/crash/*Test.class"/>
                <exclude name="**/jms/MemLeakTest.class"/>
-               <exclude name="**/jms/ManualClusteringTest.class"/>
+               <exclude name="**/jms/manual/**/*Test.class"/>
             </fileset>
          </batchtest>
       </junit>
@@ -456,7 +456,8 @@
                     haltonerror="${junit.batchtest.haltonerror}">
             <formatter type="plain" usefile="${junit.formatter.usefile}"/>
             <fileset dir="${build.tests.classes}">
-               <include name="**/jms/stress/**/*Test.class"/>
+               <!-- <include name="**/jms/stress/**/*Test.class"/> -->
+               <include name="**/jms/stress/StressTest.class"/>
             </fileset>
          </batchtest>
       </junit>
@@ -527,11 +528,10 @@
                <exclude name="org/jboss/test/messaging/jms/ReferencingTest.class"/>
                <exclude name="org/jboss/test/messaging/jms/PersistenceTest.class"/>
                <exclude name="org/jboss/test/messaging/jms/crash/*Test.class"/>
-               <exclude name="org/jboss/test/messaging/jms/ManualCrashTest.class"/>
                <exclude name="org/jboss/test/messaging/jms/MemLeakTest.class"/>
                <exclude name="org/jboss/test/messaging/jms/ManifestTest.class"/>
                <exclude name="org/jboss/test/messaging/jms/JCAWrapperTest.class"/>
-               <exclude name="**/jms/ManualClusteringTest.class"/>
+               <exclude name="**/jms/manual/**/*Test.class"/>
             </fileset>
          </batchtest>
       </junit>

Modified: trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/DefaultPostOfficeTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/DefaultPostOfficeTest.java	2006-10-19 19:30:58 UTC (rev 1501)
+++ trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/DefaultPostOfficeTest.java	2006-10-19 22:01:47 UTC (rev 1502)
@@ -212,7 +212,10 @@
             office2.stop();
          }
          
-         checkNoBindingData();
+         if (checkNoBindingData())
+         {
+            fail("Binding data still in database");
+         }
       }
             
    }
@@ -490,8 +493,14 @@
             postOffice.stop();
          }
          
-         checkNoMessageData();
-         checkNoBindingData();
+         if (checkNoMessageData())
+         {
+            fail("data still in database");
+         }
+         if (checkNoBindingData())
+         {
+            fail("Binding data still in database");
+         }
       }
    
    }
@@ -533,8 +542,14 @@
             postOffice.stop();
          }
          
-         checkNoMessageData();
-         checkNoBindingData();
+         if (checkNoMessageData())
+         {
+            fail("data still in database");
+         }
+         if (checkNoBindingData())
+         {
+            fail("Binding data still in database");
+         }
       }
    }
    
@@ -641,8 +656,14 @@
             postOffice.stop();
          }
          
-         checkNoMessageData();
-         checkNoBindingData();
+         if (checkNoMessageData())
+         {
+            fail("data still in database");
+         }
+         if (checkNoBindingData())
+         {
+            fail("Binding data still in database");
+         }
       }
    }
    
@@ -813,8 +834,15 @@
             postOffice.stop();
          }
          
-         checkNoMessageData();
-         checkNoBindingData();
+         if (checkNoMessageData())
+         {
+            fail("data still in database");
+         }
+         if (checkNoBindingData())
+         {
+            fail("Binding data still in database");
+         }
+      
       }
    }
    
@@ -1084,8 +1112,14 @@
             postOffice.stop();
          }
          
-         checkNoMessageData();
-         checkNoBindingData();
+         if (checkNoMessageData())
+         {
+            fail("data still in database");
+         };
+         if (checkNoBindingData())
+         {
+            fail("Binding data still in database");
+         }
       }
    }
    

Deleted: trunk/tests/src/org/jboss/test/messaging/jms/ManualClusteringTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/ManualClusteringTest.java	2006-10-19 19:30:58 UTC (rev 1501)
+++ trunk/tests/src/org/jboss/test/messaging/jms/ManualClusteringTest.java	2006-10-19 22:01:47 UTC (rev 1502)
@@ -1,1297 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2005, JBoss Inc., and individual contributors as indicated
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-package org.jboss.test.messaging.jms;
-
-import java.util.Properties;
-
-import javax.jms.Connection;
-import javax.jms.ConnectionFactory;
-import javax.jms.DeliveryMode;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageListener;
-import javax.jms.MessageProducer;
-import javax.jms.Queue;
-import javax.jms.Session;
-import javax.jms.TextMessage;
-import javax.jms.Topic;
-import javax.naming.Context;
-import javax.naming.InitialContext;
-
-import org.jboss.test.messaging.MessagingTestCase;
-
-/**
- * 
- * A ManualClusteringTest
- * 
- * Nodes must be started up in order node1, node2, node3
- *
- * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
- * @version <tt>$Revision: 1.1 $</tt>
- *
- * $Id$
- *
- */
-public class ManualClusteringTest extends MessagingTestCase
-{
-   protected Context ic1;
-   
-   protected Context ic2;
-   
-   protected Context ic3;
-   
-   protected Queue queue1;
-   
-   protected Topic topic1;
-   
-   protected Queue queue2;
-   
-   protected Topic topic2;
-   
-   protected Queue queue3;
-   
-   protected Topic topic3;
-   
-   protected ConnectionFactory cf1;
-   
-   protected ConnectionFactory cf2;
-   
-   protected ConnectionFactory cf3;
-     
-   public ManualClusteringTest(String name)
-   {
-      super(name);
-   }
-
-   protected void setUp() throws Exception
-   {
-      super.setUp();
-      
-      Properties props1 = new Properties();
-      
-      props1.put(Context.INITIAL_CONTEXT_FACTORY, "org.jnp.interfaces.NamingContextFactory");
-      props1.put(Context.PROVIDER_URL, "jnp://localhost:1199");
-      props1.put(Context.URL_PKG_PREFIXES, "org.jnp.interfaces");
-      
-      ic1 = new InitialContext(props1);
-      
-      Properties props2 = new Properties();
-      
-      props2.put(Context.INITIAL_CONTEXT_FACTORY, "org.jnp.interfaces.NamingContextFactory");
-      props2.put(Context.PROVIDER_URL, "jnp://localhost:1299");
-      props2.put(Context.URL_PKG_PREFIXES, "org.jnp.interfaces");
-      
-      ic2 = new InitialContext(props2);
-      
-      Properties props3 = new Properties();
-      
-      props3.put(Context.INITIAL_CONTEXT_FACTORY, "org.jnp.interfaces.NamingContextFactory");
-      props3.put(Context.PROVIDER_URL, "jnp://localhost:1399");
-      props3.put(Context.URL_PKG_PREFIXES, "org.jnp.interfaces");
-      
-      ic3 = new InitialContext(props3);
-      
-      queue1 = (Queue)ic1.lookup("queue/testDistributedQueue");
-      
-      queue2 = (Queue)ic2.lookup("queue/testDistributedQueue");
-      
-      queue3 = (Queue)ic3.lookup("queue/testDistributedQueue");
-            
-      topic1 = (Topic)ic1.lookup("topic/testDistributedTopic");
-      
-      topic2 = (Topic)ic2.lookup("topic/testDistributedTopic");
-      
-      topic3 = (Topic)ic3.lookup("topic/testDistributedTopic");
-      
-      cf1 = (ConnectionFactory)ic1.lookup("/ConnectionFactory");
-      
-      cf2 = (ConnectionFactory)ic2.lookup("/ConnectionFactory");
-      
-      cf3 = (ConnectionFactory)ic3.lookup("/ConnectionFactory");
-      
-      drainStuff();
-   }
-
-   protected void tearDown() throws Exception
-   {
-      super.tearDown();
-      
-      ic1.close();
-      
-      ic2.close();
-   }
-   
-   protected void drainStuff() throws Exception
-   {
-      Connection conn1 = null;
-      
-      Connection conn2 = null;
-      
-      Connection conn3 = null;
-            
-      try
-      {
-         conn1 = cf1.createConnection();
-         
-         conn2 = cf2.createConnection();
-         
-         conn3 = cf3.createConnection();
-           
-         Session sess1 = conn1.createSession(false, Session.AUTO_ACKNOWLEDGE);
-         
-         Session sess2 = conn2.createSession(false, Session.AUTO_ACKNOWLEDGE);
-         
-         Session sess3 = conn3.createSession(false, Session.AUTO_ACKNOWLEDGE);
-         
-         MessageConsumer cons1 = sess1.createConsumer(queue1);
-         
-         MessageConsumer cons2 = sess2.createConsumer(queue2);
-         
-         MessageConsumer cons3 = sess3.createConsumer(queue2);
-         
-         conn1.start();
-         
-         conn2.start();
-         
-         conn3.start();
-         
-         Message msg = null;
-         
-         do
-         {
-            msg = cons1.receive(1000);
-         }
-         while (msg != null);
-         
-         do
-         {
-            msg = cons2.receive(1000);
-         }
-         while (msg != null);
-         
-         do
-         {
-            msg = cons3.receive(1000);
-         }
-         while (msg != null);
-      }
-      finally
-      {      
-         if (conn1 != null) conn1.close();
-         
-         if (conn2 != null) conn2.close();
-         
-         if (conn3 != null) conn3.close();
-      }
-   }
-   
-   public void testClusteredQueueLocalConsumerNonPersistent() throws Exception
-   {
-      clusteredQueueLocalConsumer(false);
-   }
-   
-   public void testClusteredQueueLocalConsumerPersistent() throws Exception
-   {
-      clusteredQueueLocalConsumer(true);
-   }
-   
-   public void testClusteredQueueNoLocalConsumerNonPersistent() throws Exception
-   {
-      clusteredQueueNoLocalConsumer(false);
-   }
-   
-   public void testClusteredQueueNoLocalConsumerPersistent() throws Exception
-   {
-      clusteredQueueNoLocalConsumer(true);
-   }
-   
-   
-   public void testClusteredTopicNonDurableNonPersistent() throws Exception
-   {
-      clusteredTopicNonDurable(false);
-   }
-   
-   public void testClusteredTopicNonDurablePersistent() throws Exception
-   {
-      clusteredTopicNonDurable(true);
-   }
-   
-   
-   public void testClusteredTopicNonDurableWithSelectorsNonPersistent() throws Exception
-   {
-      clusteredTopicNonDurableWithSelectors(false);
-   }
-   
-   public void testClusteredTopicNonDurableWithSelectorsPersistent() throws Exception
-   {
-      clusteredTopicNonDurableWithSelectors(true);
-   }
-   
-   public void testClusteredTopicDurableNonPersistent() throws Exception
-   {
-      clusteredTopicDurable(false);
-   }
-   
-   public void testClusteredTopicDurablePersistent() throws Exception
-   {
-      clusteredTopicDurable(true);
-   }
-   
-   public void testClusteredTopicSharedDurableLocalConsumerNonPersistent() throws Exception
-   {
-      clusteredTopicSharedDurableLocalConsumer(false);
-   }
-   
-   public void testClusteredTopicSharedDurableLocalConsumerPersistent() throws Exception
-   {
-      clusteredTopicSharedDurableLocalConsumer(true);
-   }
-   
-   public void testClusteredTopicSharedDurableNoLocalConsumerNonPersistent() throws Exception
-   {
-      clusteredTopicSharedDurableNoLocalConsumer(false);
-   }
-   
-   public void testClusteredTopicSharedDurableNoLocalConsumerPersistent() throws Exception
-   {
-      clusteredTopicSharedDurableNoLocalConsumer(true);
-   }
-   
-   public void testClusteredTopicSharedDurableNoLocalSubNonPersistent() throws Exception
-   {
-      clusteredTopicSharedDurableNoLocalSub(false);
-   }
-   
-   public void testClusteredTopicSharedDurableNoLocalSubPersistent() throws Exception
-   {
-      clusteredTopicSharedDurableNoLocalSub(true);
-   }
-   
-   
-   
-   
-   /*
-    * Create a consumer on each queue on each node.
-    * Send messages in turn from all nodes.
-    * Ensure that the local consumer gets the message
-    */
-   protected void clusteredQueueLocalConsumer(boolean persistent) throws Exception
-   {
-      Connection conn1 = null;
-      
-      Connection conn2 = null;
-      
-      Connection conn3 = null;
-      try
-      {
-         conn1 = cf1.createConnection();
-         
-         conn2 = cf2.createConnection();
-         
-         conn3 = cf3.createConnection();
-           
-         Session sess1 = conn1.createSession(false, Session.AUTO_ACKNOWLEDGE);
-         
-         Session sess2 = conn2.createSession(false, Session.AUTO_ACKNOWLEDGE);
-         
-         Session sess3 = conn3.createSession(false, Session.AUTO_ACKNOWLEDGE);
-         
-         MessageConsumer cons1 = sess1.createConsumer(queue1);
-         
-         MessageConsumer cons2 = sess2.createConsumer(queue2);
-         
-         MessageConsumer cons3 = sess3.createConsumer(queue3);
-         
-         conn1.start();
-         
-         conn2.start();
-         
-         conn3.start();
-         
-         //Send at node1
-         
-         MessageProducer prod1 = sess1.createProducer(queue1);
-         
-         prod1.setDeliveryMode(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
-         
-         final int NUM_MESSAGES = 100;
-         
-         for (int i = 0; i < NUM_MESSAGES; i++)
-         {
-            TextMessage tm = sess1.createTextMessage("message" + i);
-            
-            prod1.send(tm);
-         }
-         
-         for (int i = 0; i < NUM_MESSAGES; i++)
-         {
-            TextMessage tm = (TextMessage)cons1.receive(1000);
-            
-            assertNotNull(tm);
-            
-            assertEquals("message" + i, tm.getText());
-         }
-         
-         Message m = cons2.receive(2000);
-         
-         assertNull(m);
-         
-         m = cons3.receive(2000);
-         
-         assertNull(m);
-         
-         // Send at node2
-         
-         MessageProducer prod2 = sess2.createProducer(queue2);
-         
-         prod2.setDeliveryMode(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
-         
-         for (int i = 0; i < NUM_MESSAGES; i++)
-         {
-            TextMessage tm = sess2.createTextMessage("message" + i);
-            
-            prod2.send(tm);
-         }
-         
-         for (int i = 0; i < NUM_MESSAGES; i++)
-         {
-            TextMessage tm = (TextMessage)cons2.receive(1000);
-            
-            assertNotNull(tm);
-            
-            assertEquals("message" + i, tm.getText());
-         }
-         
-         m = cons1.receive(2000);
-         
-         assertNull(m);
-         
-         m = cons3.receive(2000);
-         
-         assertNull(m);
-         
-         // Send at node3
-         
-         MessageProducer prod3 = sess3.createProducer(queue3);
-         
-         prod3.setDeliveryMode(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
-         
-         for (int i = 0; i < NUM_MESSAGES; i++)
-         {
-            TextMessage tm = sess3.createTextMessage("message" + i);
-            
-            prod3.send(tm);
-         }
-            
-         for (int i = 0; i < NUM_MESSAGES; i++)
-         {
-            TextMessage tm = (TextMessage)cons3.receive(1000);
-            
-            assertNotNull(tm);
-            
-            assertEquals("message" + i, tm.getText());
-         }
-         
-         m = cons1.receive(2000);
-         
-         assertNull(m);
-         
-         m = cons2.receive(2000);
-         
-         assertNull(m);         
-      }
-      finally
-      {      
-         if (conn1 != null) conn1.close();
-         
-         if (conn2 != null) conn2.close();
-         
-         if (conn3 != null) conn3.close();
-      }
-   }
-   
-   
-   
-   
-   /*
-    * Create a consumer on two nodes out of three
-    * Send messages from the third node
-    * Ensure that the messages are received from the other two nodes in 
-    * round robin order.
-    * (Note that this test depends on us using the default router which has
-    * this round robin behaviour)
-    */
-   protected void clusteredQueueNoLocalConsumer(boolean persistent) throws Exception
-   {
-      Connection conn1 = null;
-      
-      Connection conn2 = null;
-      
-      Connection conn3 = null;
-      try
-      {
-         conn1 = cf1.createConnection();
-         
-         conn2 = cf2.createConnection();
-         
-         conn3 = cf3.createConnection();
-           
-         Session sess1 = conn1.createSession(false, Session.AUTO_ACKNOWLEDGE);
-         
-         Session sess2 = conn2.createSession(false, Session.AUTO_ACKNOWLEDGE);
-         
-         Session sess3 = conn3.createSession(false, Session.AUTO_ACKNOWLEDGE);
-         
-         MessageConsumer cons2 = sess2.createConsumer(queue2);
-         
-         MessageConsumer cons3 = sess3.createConsumer(queue3);
-         
-         conn2.start();
-         
-         conn3.start();
-         
-         //Send at node1
-         
-         MessageProducer prod1 = sess1.createProducer(queue1);
-         
-         prod1.setDeliveryMode(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
-         
-         final int NUM_MESSAGES = 100;
-         
-         for (int i = 0; i < NUM_MESSAGES; i++)
-         {
-            TextMessage tm = sess1.createTextMessage("message" + i);
-            
-            prod1.send(tm);
-         }
-         
-         for (int i = 0; i < NUM_MESSAGES / 2; i++)
-         {
-            TextMessage tm = (TextMessage)cons2.receive(1000);
-            
-            assertNotNull(tm);
-            
-            assertEquals("message" + i * 2, tm.getText());
-         }
-         
-         for (int i = 0; i < NUM_MESSAGES / 2; i++)
-         {
-            TextMessage tm = (TextMessage)cons3.receive(1000);
-            
-            assertNotNull(tm);
-            
-            assertEquals("message" + (i * 2 + 1), tm.getText());
-         }
-      
-      }
-      finally
-      {      
-         if (conn1 != null) conn1.close();
-         
-         if (conn2 != null) conn2.close();
-         
-         if (conn3 != null) conn3.close();
-      }
-   }
-   
-   
-   
-   /*
-    * Create non durable subscriptions on all nodes of the cluster.
-    * Ensure all messages are receive as appropriate
-    */
-   public void clusteredTopicNonDurable(boolean persistent) throws Exception
-   {
-      Connection conn1 = null;
-      
-      Connection conn2 = null;
-      
-      Connection conn3 = null;
-      try
-      {
-         conn1 = cf1.createConnection();
-         
-         conn2 = cf2.createConnection();
-         
-         conn3 = cf3.createConnection();
-           
-         Session sess1 = conn1.createSession(false, Session.AUTO_ACKNOWLEDGE);
-         
-         Session sess2 = conn2.createSession(false, Session.AUTO_ACKNOWLEDGE);
-         
-         Session sess3 = conn3.createSession(false, Session.AUTO_ACKNOWLEDGE);
-         
-         MessageConsumer cons1 = sess1.createConsumer(topic1);
-         
-         MessageConsumer cons2 = sess2.createConsumer(topic2);
-         
-         MessageConsumer cons3 = sess3.createConsumer(topic3);
-         
-         MessageConsumer cons4 = sess1.createConsumer(topic1);
-         
-         MessageConsumer cons5 = sess2.createConsumer(topic2);
-            
-         conn1.start();
-         
-         conn2.start();
-         
-         conn3.start();
-         
-         //Send at node1
-         
-         MessageProducer prod1 = sess1.createProducer(topic1);
-         
-         prod1.setDeliveryMode(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
-         
-         final int NUM_MESSAGES = 100;
-         
-         for (int i = 0; i < NUM_MESSAGES; i++)
-         {
-            TextMessage tm = sess1.createTextMessage("message" + i);
-            
-            prod1.send(tm);
-         }
-            
-         for (int i = 0; i < NUM_MESSAGES; i++)
-         {
-            TextMessage tm = (TextMessage)cons1.receive(1000);
-            
-            assertNotNull(tm);
-                        
-            assertEquals("message" + i, tm.getText());                        
-         }
-         
-         for (int i = 0; i < NUM_MESSAGES; i++)
-         {
-            TextMessage tm = (TextMessage)cons2.receive(1000);
-                      
-            assertNotNull(tm);
-            
-            assertEquals("message" + i, tm.getText());
-         }
-         
-         for (int i = 0; i < NUM_MESSAGES; i++)
-         {
-            TextMessage tm = (TextMessage)cons3.receive(1000);
-                        
-            assertNotNull(tm);
-             
-            assertEquals("message" + i, tm.getText());
-         } 
-         
-         for (int i = 0; i < NUM_MESSAGES; i++)
-         {
-            TextMessage tm = (TextMessage)cons4.receive(1000);
-                        
-            assertNotNull(tm);
-             
-            assertEquals("message" + i, tm.getText());
-         } 
-         
-         for (int i = 0; i < NUM_MESSAGES; i++)
-         {
-            TextMessage tm = (TextMessage)cons5.receive(1000);
-                        
-            assertNotNull(tm);
-             
-            assertEquals("message" + i, tm.getText());
-         } 
-      }
-      finally
-      {      
-         if (conn1 != null) conn1.close();
-         
-         if (conn2 != null) conn2.close();
-         
-         if (conn3 != null) conn3.close();
-      }
-   }
-   
-   
-   
-   
-   /*
-    * Create non durable subscriptions on all nodes of the cluster.
-    * Include some with selectors
-    * Ensure all messages are receive as appropriate
-    */
-   public void clusteredTopicNonDurableWithSelectors(boolean persistent) throws Exception
-   {
-      Connection conn1 = null;
-      
-      Connection conn2 = null;
-      
-      Connection conn3 = null;
-      try
-      {
-         conn1 = cf1.createConnection();
-         
-         conn2 = cf2.createConnection();
-         
-         conn3 = cf3.createConnection();
-                             
-         Session sess1 = conn1.createSession(false, Session.AUTO_ACKNOWLEDGE);
-         
-         Session sess2 = conn2.createSession(false, Session.AUTO_ACKNOWLEDGE);
-         
-         Session sess3 = conn3.createSession(false, Session.AUTO_ACKNOWLEDGE);
-         
-         MessageConsumer cons1 = sess1.createConsumer(topic1);
-         
-         MessageConsumer cons2 = sess2.createConsumer(topic2);
-         
-         MessageConsumer cons3 = sess3.createConsumer(topic3);
-         
-         MessageConsumer cons4 = sess1.createConsumer(topic1, "COLOUR='red'");
-         
-         MessageConsumer cons5 = sess2.createConsumer(topic2, "COLOUR='blue'");
-            
-         conn1.start();
-         
-         conn2.start();
-         
-         conn3.start();
-         
-         //Send at node1
-         
-         MessageProducer prod1 = sess1.createProducer(topic1);
-         
-         prod1.setDeliveryMode(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
-         
-         final int NUM_MESSAGES = 100;
-         
-         for (int i = 0; i < NUM_MESSAGES; i++)
-         {
-            TextMessage tm = sess1.createTextMessage("message" + i);
-            
-            int c = i % 3;
-            if (c == 0)
-            {
-               tm.setStringProperty("COLOUR", "red");
-            }
-            else if (c == 1)
-            {
-               tm.setStringProperty("COLOUR", "blue");
-            }
-            
-            prod1.send(tm);
-         }
-            
-         for (int i = 0; i < NUM_MESSAGES; i++)
-         {
-            TextMessage tm = (TextMessage)cons1.receive(1000);
-            
-            assertNotNull(tm);
-                        
-            assertEquals("message" + i, tm.getText());                        
-         }
-         
-         for (int i = 0; i < NUM_MESSAGES; i++)
-         {
-            TextMessage tm = (TextMessage)cons2.receive(1000);
-                      
-            assertNotNull(tm);
-            
-            assertEquals("message" + i, tm.getText());
-         }
-         
-         for (int i = 0; i < NUM_MESSAGES; i++)
-         {
-            TextMessage tm = (TextMessage)cons3.receive(1000);
-                        
-            assertNotNull(tm);
-             
-            assertEquals("message" + i, tm.getText());
-         } 
-         
-         for (int i = 0; i < NUM_MESSAGES; i++)
-         {
-            int c = i % 3;
-            
-            if (c == 0)
-            {
-               TextMessage tm = (TextMessage)cons4.receive(1000);
-                           
-               assertNotNull(tm);
-                
-               assertEquals("message" + i, tm.getText());
-            }
-         } 
-         
-         for (int i = 0; i < NUM_MESSAGES; i++)
-         {
-            int c = i % 3;
-            
-            if (c == 1)
-            {
-               TextMessage tm = (TextMessage)cons5.receive(1000);
-                           
-               assertNotNull(tm);
-                
-               assertEquals("message" + i, tm.getText());
-            }
-         } 
-      }
-      finally
-      {      
-         if (conn1 != null) conn1.close();
-         
-         if (conn2 != null) conn2.close();
-         
-         if (conn3 != null) conn3.close();
-      }
-   }
-   
-   
-   
-   /*
-    * Create durable subscriptions on all nodes of the cluster.
-    * Include a couple with selectors
-    * Ensure all messages are receive as appropriate
-    * None of the durable subs are shared
-    */
-   public void clusteredTopicDurable(boolean persistent) throws Exception
-   {
-      Connection conn1 = null;
-      
-      Connection conn2 = null;
-      
-      Connection conn3 = null;
-      try
-      {
-         conn1 = cf1.createConnection();
-         
-         conn2 = cf2.createConnection();
-         
-         conn3 = cf3.createConnection();
-         
-         conn1.setClientID("wib1");
-         
-         conn2.setClientID("wib1");
-         
-         conn3.setClientID("wib1");
-           
-         Session sess1 = conn1.createSession(false, Session.AUTO_ACKNOWLEDGE);
-         
-         Session sess2 = conn2.createSession(false, Session.AUTO_ACKNOWLEDGE);
-         
-         Session sess3 = conn3.createSession(false, Session.AUTO_ACKNOWLEDGE);
-         
-         MessageConsumer cons1 = sess1.createDurableSubscriber(topic1, "sub1");
-         
-         MessageConsumer cons2 = sess2.createDurableSubscriber(topic2, "sub2");
-         
-         MessageConsumer cons3 = sess3.createDurableSubscriber(topic3, "sub3");
-         
-         MessageConsumer cons4 = sess1.createDurableSubscriber(topic1, "sub4");
-         
-         MessageConsumer cons5 = sess2.createDurableSubscriber(topic2, "sub5");
-            
-         conn1.start();
-         
-         conn2.start();
-         
-         conn3.start();
-         
-         //Send at node1
-         
-         MessageProducer prod1 = sess1.createProducer(topic1);
-         
-         prod1.setDeliveryMode(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
-         
-         final int NUM_MESSAGES = 100;
-         
-         for (int i = 0; i < NUM_MESSAGES; i++)
-         {
-            TextMessage tm = sess1.createTextMessage("message" + i);
-            
-            prod1.send(tm);
-         }
-            
-         for (int i = 0; i < NUM_MESSAGES; i++)
-         {
-            TextMessage tm = (TextMessage)cons1.receive(1000);
-            
-            assertNotNull(tm);
-                        
-            assertEquals("message" + i, tm.getText());                        
-         }
-         
-         for (int i = 0; i < NUM_MESSAGES; i++)
-         {
-            TextMessage tm = (TextMessage)cons2.receive(1000);
-                      
-            assertNotNull(tm);
-            
-            assertEquals("message" + i, tm.getText());
-         }
-         
-         for (int i = 0; i < NUM_MESSAGES; i++)
-         {
-            TextMessage tm = (TextMessage)cons3.receive(1000);
-                        
-            assertNotNull(tm);
-             
-            assertEquals("message" + i, tm.getText());
-         } 
-         
-         for (int i = 0; i < NUM_MESSAGES; i++)
-         {
-            TextMessage tm = (TextMessage)cons4.receive(1000);
-                        
-            assertNotNull(tm);
-             
-            assertEquals("message" + i, tm.getText());
-         } 
-         
-         for (int i = 0; i < NUM_MESSAGES; i++)
-         {
-            TextMessage tm = (TextMessage)cons5.receive(1000);
-                        
-            assertNotNull(tm);
-             
-            assertEquals("message" + i, tm.getText());
-         } 
-         
-         cons1.close();
-         
-         cons2.close();
-         
-         cons3.close();
-         
-         cons4.close();
-         
-         cons5.close();
-         
-         sess1.unsubscribe("sub1");
-         
-         sess2.unsubscribe("sub2");
-         
-         sess3.unsubscribe("sub3");
-         
-         sess1.unsubscribe("sub4");
-         
-         sess2.unsubscribe("sub5");
-         
-      }
-      finally
-      {      
-         if (conn1 != null) conn1.close();
-         
-         if (conn2 != null) conn2.close();
-         
-         if (conn3 != null) conn3.close();
-      }
-   }
-   
-   
-   
-   
-   /*
-    * Create shared durable subs on multiple nodes, the local instance should always get the message
-    */
-   protected void clusteredTopicSharedDurableLocalConsumer(boolean persistent) throws Exception
-   {
-      Connection conn1 = null;
-      
-      Connection conn2 = null;
-      
-      Connection conn3 = null;
-      try
-      {
-         conn1 = cf1.createConnection();
-         
-         conn2 = cf2.createConnection();
-         
-         conn3 = cf3.createConnection();
-         
-         conn1.setClientID("wib1");
-         
-         conn2.setClientID("wib1");
-         
-         conn3.setClientID("wib1");
-           
-         Session sess1 = conn1.createSession(false, Session.AUTO_ACKNOWLEDGE);
-         
-         Session sess2 = conn2.createSession(false, Session.AUTO_ACKNOWLEDGE);
-         
-         Session sess3 = conn3.createSession(false, Session.AUTO_ACKNOWLEDGE);
-         
-         MessageConsumer cons1 = sess1.createDurableSubscriber(topic1, "sub1");
-         
-         MessageConsumer cons2 = sess2.createDurableSubscriber(topic2, "sub1");
-         
-         MessageConsumer cons3 = sess3.createDurableSubscriber(topic3, "sub1");
-         
-         conn1.start();
-         
-         conn2.start();
-         
-         conn3.start();
-         
-         //Send at node1
-         
-         MessageProducer prod1 = sess1.createProducer(topic1);
-         
-         prod1.setDeliveryMode(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
-         
-         final int NUM_MESSAGES = 100;
-         
-         for (int i = 0; i < NUM_MESSAGES; i++)
-         {
-            TextMessage tm = sess1.createTextMessage("message" + i);
-            
-            prod1.send(tm);
-         }
-         
-         for (int i = 0; i < NUM_MESSAGES; i++)
-         {
-            TextMessage tm = (TextMessage)cons1.receive(1000);
-            
-            assertNotNull(tm);
-            
-            assertEquals("message" + i, tm.getText());
-         }
-         
-         Message m = cons2.receive(2000);
-         
-         assertNull(m);
-         
-         m = cons3.receive(2000);
-         
-         assertNull(m);
-         
-         // Send at node2
-         
-         MessageProducer prod2 = sess2.createProducer(topic2);
-         
-         prod2.setDeliveryMode(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
-         
-         for (int i = 0; i < NUM_MESSAGES; i++)
-         {
-            TextMessage tm = sess2.createTextMessage("message" + i);
-            
-            prod2.send(tm);
-         }
-         
-         for (int i = 0; i < NUM_MESSAGES; i++)
-         {
-            TextMessage tm = (TextMessage)cons2.receive(1000);
-            
-            assertNotNull(tm);
-               
-            assertEquals("message" + i, tm.getText());
-         }
-         
-         m = cons1.receive(2000);
-         
-         assertNull(m);
-         
-         m = cons3.receive(2000);
-         
-         assertNull(m);
-         
-         // Send at node3
-         
-         MessageProducer prod3 = sess3.createProducer(topic3);
-         
-         prod3.setDeliveryMode(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
-         
-         for (int i = 0; i < NUM_MESSAGES; i++)
-         {
-            TextMessage tm = sess3.createTextMessage("message" + i);
-            
-            prod3.send(tm);
-         }
-           
-         for (int i = 0; i < NUM_MESSAGES; i++)
-         {
-            TextMessage tm = (TextMessage)cons3.receive(1000);
-            
-            assertNotNull(tm);
-            
-            assertEquals("message" + i, tm.getText());
-         }
-         
-         m = cons1.receive(2000);
-         
-         assertNull(m);
-         
-         m = cons2.receive(2000);
-         
-         assertNull(m);         
-         
-         cons1.close();
-         
-         cons2.close();
-         
-         cons3.close();
-         
-         //Need to unsubscribe on any node that the durable sub was created on
-         
-         sess1.unsubscribe("sub1");
-         
-         sess2.unsubscribe("sub1");
-         
-         sess3.unsubscribe("sub1");
-      }
-      finally
-      {      
-         if (conn1 != null) conn1.close();
-         
-         if (conn2 != null) conn2.close();
-         
-         if (conn3 != null) conn3.close();
-      }
-   }
-   
-   
-   /*
-    * Create shared durable subs on multiple nodes, but without consumer on local node
-    * even thought there is durable sub
-    * should round robin
-    * note that this test assumes round robin
-    */
-   protected void clusteredTopicSharedDurableNoLocalConsumer(boolean persistent) throws Exception
-   {
-      Connection conn1 = null;
-      
-      Connection conn2 = null;
-      
-      Connection conn3 = null;
-      try
-      {
-         conn1 = cf1.createConnection();
-         
-         conn2 = cf2.createConnection();
-         
-         conn3 = cf3.createConnection();
-         
-         conn1.setClientID("wib1");
-         
-         conn2.setClientID("wib1");
-         
-         conn3.setClientID("wib1");
-           
-         Session sess1 = conn1.createSession(false, Session.AUTO_ACKNOWLEDGE);
-         
-         Session sess2 = conn2.createSession(false, Session.AUTO_ACKNOWLEDGE);
-         
-         Session sess3 = conn3.createSession(false, Session.AUTO_ACKNOWLEDGE);
-         
-         MessageConsumer cons1 = sess1.createDurableSubscriber(topic1, "sub1");
-         
-         //Now close it on node 1
-         conn1.close();
-         
-         conn1 = cf1.createConnection();
-         
-         conn1.setClientID("wib1");         
-         
-         sess1 = conn1.createSession(false, Session.AUTO_ACKNOWLEDGE);
-         
-         //This means the durable sub is inactive on node1
-         
-         MessageConsumer cons2 = sess2.createDurableSubscriber(topic2, "sub1");
-         
-         MessageConsumer cons3 = sess3.createDurableSubscriber(topic3, "sub1");
-         
-         conn2.start();
-         
-         conn3.start();
-         
-         //Send at node1
-         
-         //Should round robin between the other 2 since there is no active consumer on sub1 on node1
-         
-         MessageProducer prod1 = sess1.createProducer(topic1);
-         
-         prod1.setDeliveryMode(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
-         
-         final int NUM_MESSAGES = 100;
-         
-         for (int i = 0; i < NUM_MESSAGES; i++)
-         {
-            TextMessage tm = sess1.createTextMessage("message" + i);
-            
-            prod1.send(tm);
-         }
-         
-         for (int i = 0; i < NUM_MESSAGES / 2; i++)
-         {
-            TextMessage tm = (TextMessage)cons2.receive(1000);
-            
-            assertNotNull(tm);
-            
-            assertEquals("message" + i * 2, tm.getText());
-         }
-         
-         for (int i = 0; i < NUM_MESSAGES / 2; i++)
-         {
-            TextMessage tm = (TextMessage)cons3.receive(1000);
-            
-            assertNotNull(tm);
-            
-            assertEquals("message" + (i * 2 + 1), tm.getText());
-         }
-         
-         cons2.close();
-         
-         cons3.close();
-         
-         sess1.unsubscribe("sub1");
-         
-         sess2.unsubscribe("sub1");
-         
-         sess3.unsubscribe("sub1");
-      
-      }
-      finally
-      {      
-         if (conn1 != null) conn1.close();
-         
-         if (conn2 != null) conn2.close();
-         
-         if (conn3 != null) conn3.close();
-      }
-   }
-   
-   
-   
-   /*
-    * Create shared durable subs on multiple nodes, but without sub on local node
-    * should round robin
-    * note that this test assumes round robin
-    */
-   protected void clusteredTopicSharedDurableNoLocalSub(boolean persistent) throws Exception
-   {
-      Connection conn1 = null;
-      
-      Connection conn2 = null;
-      
-      Connection conn3 = null;
-      try
-      {
-         conn1 = cf1.createConnection();
-         
-         conn2 = cf2.createConnection();
-         
-         conn3 = cf3.createConnection();
-         
-         conn2.setClientID("wib1");
-         
-         conn3.setClientID("wib1");
-           
-         Session sess1 = conn1.createSession(false, Session.AUTO_ACKNOWLEDGE);
-         
-         Session sess2 = conn2.createSession(false, Session.AUTO_ACKNOWLEDGE);
-         
-         Session sess3 = conn3.createSession(false, Session.AUTO_ACKNOWLEDGE);
-                  
-         MessageConsumer cons2 = sess2.createDurableSubscriber(topic2, "sub1");
-         
-         MessageConsumer cons3 = sess3.createDurableSubscriber(topic3, "sub1");
-         
-         conn2.start();
-         
-         conn3.start();
-         
-         //Send at node1
-         
-         //Should round robin between the other 2 since there is no active consumer on sub1 on node1
-         
-         MessageProducer prod1 = sess1.createProducer(topic1);
-         
-         prod1.setDeliveryMode(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
-         
-         final int NUM_MESSAGES = 100;
-         
-         for (int i = 0; i < NUM_MESSAGES; i++)
-         {
-            TextMessage tm = sess1.createTextMessage("message" + i);
-            
-            prod1.send(tm);
-         }
-         
-         for (int i = 0; i < NUM_MESSAGES / 2; i++)
-         {
-            TextMessage tm = (TextMessage)cons2.receive(1000);
-            
-            assertNotNull(tm);
-            
-            assertEquals("message" + i * 2, tm.getText());
-         }
-         
-         for (int i = 0; i < NUM_MESSAGES / 2; i++)
-         {
-            TextMessage tm = (TextMessage)cons3.receive(1000);
-            
-            assertNotNull(tm);
-            
-            assertEquals("message" + (i * 2 + 1), tm.getText());
-         }
-         
-         cons2.close();
-         
-         cons3.close();
-         
-         sess2.unsubscribe("sub1");
-         
-         sess3.unsubscribe("sub1");
-      
-      }
-      finally
-      {      
-         if (conn1 != null) conn1.close();
-         
-         if (conn2 != null) conn2.close();
-         
-         if (conn3 != null) conn3.close();
-      }
-   }
-
-   class MyListener implements MessageListener
-   {
-      private int i;
-      
-      MyListener(int i)
-      {
-         this.i = i;
-      }
-
-      public void onMessage(Message m)
-      {
-         try
-         {
-            int count = m.getIntProperty("count");
-            
-            log.info("Listener " + i + " received message " + count);
-         }
-         catch (Exception e)
-         {
-            e.printStackTrace();
-         }
-      }
-      
-   }
-   
-}

Copied: trunk/tests/src/org/jboss/test/messaging/jms/manual/ManualClusteringTest.java (from rev 1500, trunk/tests/src/org/jboss/test/messaging/jms/ManualClusteringTest.java)
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/ManualClusteringTest.java	2006-10-18 22:48:32 UTC (rev 1500)
+++ trunk/tests/src/org/jboss/test/messaging/jms/manual/ManualClusteringTest.java	2006-10-19 22:01:47 UTC (rev 1502)
@@ -0,0 +1,1297 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.test.messaging.jms;
+
+import java.util.Properties;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.DeliveryMode;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import javax.jms.Topic;
+import javax.naming.Context;
+import javax.naming.InitialContext;
+
+import org.jboss.test.messaging.MessagingTestCase;
+
+/**
+ * 
+ * A ManualClusteringTest
+ * 
+ * Nodes must be started up in order node1, node2, node3
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * @version <tt>$Revision: 1.1 $</tt>
+ *
+ * $Id$
+ *
+ */
+public class ManualClusteringTest extends MessagingTestCase
+{
+   protected Context ic1;
+   
+   protected Context ic2;
+   
+   protected Context ic3;
+   
+   protected Queue queue1;
+   
+   protected Topic topic1;
+   
+   protected Queue queue2;
+   
+   protected Topic topic2;
+   
+   protected Queue queue3;
+   
+   protected Topic topic3;
+   
+   protected ConnectionFactory cf1;
+   
+   protected ConnectionFactory cf2;
+   
+   protected ConnectionFactory cf3;
+     
+   public ManualClusteringTest(String name)
+   {
+      super(name);
+   }
+
+   protected void setUp() throws Exception
+   {
+      super.setUp();
+      
+      Properties props1 = new Properties();
+      
+      props1.put(Context.INITIAL_CONTEXT_FACTORY, "org.jnp.interfaces.NamingContextFactory");
+      props1.put(Context.PROVIDER_URL, "jnp://localhost:1199");
+      props1.put(Context.URL_PKG_PREFIXES, "org.jnp.interfaces");
+      
+      ic1 = new InitialContext(props1);
+      
+      Properties props2 = new Properties();
+      
+      props2.put(Context.INITIAL_CONTEXT_FACTORY, "org.jnp.interfaces.NamingContextFactory");
+      props2.put(Context.PROVIDER_URL, "jnp://localhost:1299");
+      props2.put(Context.URL_PKG_PREFIXES, "org.jnp.interfaces");
+      
+      ic2 = new InitialContext(props2);
+      
+      Properties props3 = new Properties();
+      
+      props3.put(Context.INITIAL_CONTEXT_FACTORY, "org.jnp.interfaces.NamingContextFactory");
+      props3.put(Context.PROVIDER_URL, "jnp://localhost:1399");
+      props3.put(Context.URL_PKG_PREFIXES, "org.jnp.interfaces");
+      
+      ic3 = new InitialContext(props3);
+      
+      queue1 = (Queue)ic1.lookup("queue/testDistributedQueue");
+      
+      queue2 = (Queue)ic2.lookup("queue/testDistributedQueue");
+      
+      queue3 = (Queue)ic3.lookup("queue/testDistributedQueue");
+            
+      topic1 = (Topic)ic1.lookup("topic/testDistributedTopic");
+      
+      topic2 = (Topic)ic2.lookup("topic/testDistributedTopic");
+      
+      topic3 = (Topic)ic3.lookup("topic/testDistributedTopic");
+      
+      cf1 = (ConnectionFactory)ic1.lookup("/ConnectionFactory");
+      
+      cf2 = (ConnectionFactory)ic2.lookup("/ConnectionFactory");
+      
+      cf3 = (ConnectionFactory)ic3.lookup("/ConnectionFactory");
+      
+      drainStuff();
+   }
+
+   protected void tearDown() throws Exception
+   {
+      super.tearDown();
+      
+      ic1.close();
+      
+      ic2.close();
+   }
+   
+   protected void drainStuff() throws Exception
+   {
+      Connection conn1 = null;
+      
+      Connection conn2 = null;
+      
+      Connection conn3 = null;
+            
+      try
+      {
+         conn1 = cf1.createConnection();
+         
+         conn2 = cf2.createConnection();
+         
+         conn3 = cf3.createConnection();
+           
+         Session sess1 = conn1.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         
+         Session sess2 = conn2.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         
+         Session sess3 = conn3.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         
+         MessageConsumer cons1 = sess1.createConsumer(queue1);
+         
+         MessageConsumer cons2 = sess2.createConsumer(queue2);
+         
+         MessageConsumer cons3 = sess3.createConsumer(queue2);
+         
+         conn1.start();
+         
+         conn2.start();
+         
+         conn3.start();
+         
+         Message msg = null;
+         
+         do
+         {
+            msg = cons1.receive(1000);
+         }
+         while (msg != null);
+         
+         do
+         {
+            msg = cons2.receive(1000);
+         }
+         while (msg != null);
+         
+         do
+         {
+            msg = cons3.receive(1000);
+         }
+         while (msg != null);
+      }
+      finally
+      {      
+         if (conn1 != null) conn1.close();
+         
+         if (conn2 != null) conn2.close();
+         
+         if (conn3 != null) conn3.close();
+      }
+   }
+   
+   public void testClusteredQueueLocalConsumerNonPersistent() throws Exception
+   {
+      clusteredQueueLocalConsumer(false);
+   }
+   
+   public void testClusteredQueueLocalConsumerPersistent() throws Exception
+   {
+      clusteredQueueLocalConsumer(true);
+   }
+   
+   public void testClusteredQueueNoLocalConsumerNonPersistent() throws Exception
+   {
+      clusteredQueueNoLocalConsumer(false);
+   }
+   
+   public void testClusteredQueueNoLocalConsumerPersistent() throws Exception
+   {
+      clusteredQueueNoLocalConsumer(true);
+   }
+   
+   
+   public void testClusteredTopicNonDurableNonPersistent() throws Exception
+   {
+      clusteredTopicNonDurable(false);
+   }
+   
+   public void testClusteredTopicNonDurablePersistent() throws Exception
+   {
+      clusteredTopicNonDurable(true);
+   }
+   
+   
+   public void testClusteredTopicNonDurableWithSelectorsNonPersistent() throws Exception
+   {
+      clusteredTopicNonDurableWithSelectors(false);
+   }
+   
+   public void testClusteredTopicNonDurableWithSelectorsPersistent() throws Exception
+   {
+      clusteredTopicNonDurableWithSelectors(true);
+   }
+   
+   public void testClusteredTopicDurableNonPersistent() throws Exception
+   {
+      clusteredTopicDurable(false);
+   }
+   
+   public void testClusteredTopicDurablePersistent() throws Exception
+   {
+      clusteredTopicDurable(true);
+   }
+   
+   public void testClusteredTopicSharedDurableLocalConsumerNonPersistent() throws Exception
+   {
+      clusteredTopicSharedDurableLocalConsumer(false);
+   }
+   
+   public void testClusteredTopicSharedDurableLocalConsumerPersistent() throws Exception
+   {
+      clusteredTopicSharedDurableLocalConsumer(true);
+   }
+   
+   public void testClusteredTopicSharedDurableNoLocalConsumerNonPersistent() throws Exception
+   {
+      clusteredTopicSharedDurableNoLocalConsumer(false);
+   }
+   
+   public void testClusteredTopicSharedDurableNoLocalConsumerPersistent() throws Exception
+   {
+      clusteredTopicSharedDurableNoLocalConsumer(true);
+   }
+   
+   public void testClusteredTopicSharedDurableNoLocalSubNonPersistent() throws Exception
+   {
+      clusteredTopicSharedDurableNoLocalSub(false);
+   }
+   
+   public void testClusteredTopicSharedDurableNoLocalSubPersistent() throws Exception
+   {
+      clusteredTopicSharedDurableNoLocalSub(true);
+   }
+   
+   
+   
+   
+   /*
+    * Create a consumer on each queue on each node.
+    * Send messages in turn from all nodes.
+    * Ensure that the local consumer gets the message
+    */
+   protected void clusteredQueueLocalConsumer(boolean persistent) throws Exception
+   {
+      Connection conn1 = null;
+      
+      Connection conn2 = null;
+      
+      Connection conn3 = null;
+      try
+      {
+         conn1 = cf1.createConnection();
+         
+         conn2 = cf2.createConnection();
+         
+         conn3 = cf3.createConnection();
+           
+         Session sess1 = conn1.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         
+         Session sess2 = conn2.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         
+         Session sess3 = conn3.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         
+         MessageConsumer cons1 = sess1.createConsumer(queue1);
+         
+         MessageConsumer cons2 = sess2.createConsumer(queue2);
+         
+         MessageConsumer cons3 = sess3.createConsumer(queue3);
+         
+         conn1.start();
+         
+         conn2.start();
+         
+         conn3.start();
+         
+         //Send at node1
+         
+         MessageProducer prod1 = sess1.createProducer(queue1);
+         
+         prod1.setDeliveryMode(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
+         
+         final int NUM_MESSAGES = 100;
+         
+         for (int i = 0; i < NUM_MESSAGES; i++)
+         {
+            TextMessage tm = sess1.createTextMessage("message" + i);
+            
+            prod1.send(tm);
+         }
+         
+         for (int i = 0; i < NUM_MESSAGES; i++)
+         {
+            TextMessage tm = (TextMessage)cons1.receive(1000);
+            
+            assertNotNull(tm);
+            
+            assertEquals("message" + i, tm.getText());
+         }
+         
+         Message m = cons2.receive(2000);
+         
+         assertNull(m);
+         
+         m = cons3.receive(2000);
+         
+         assertNull(m);
+         
+         // Send at node2
+         
+         MessageProducer prod2 = sess2.createProducer(queue2);
+         
+         prod2.setDeliveryMode(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
+         
+         for (int i = 0; i < NUM_MESSAGES; i++)
+         {
+            TextMessage tm = sess2.createTextMessage("message" + i);
+            
+            prod2.send(tm);
+         }
+         
+         for (int i = 0; i < NUM_MESSAGES; i++)
+         {
+            TextMessage tm = (TextMessage)cons2.receive(1000);
+            
+            assertNotNull(tm);
+            
+            assertEquals("message" + i, tm.getText());
+         }
+         
+         m = cons1.receive(2000);
+         
+         assertNull(m);
+         
+         m = cons3.receive(2000);
+         
+         assertNull(m);
+         
+         // Send at node3
+         
+         MessageProducer prod3 = sess3.createProducer(queue3);
+         
+         prod3.setDeliveryMode(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
+         
+         for (int i = 0; i < NUM_MESSAGES; i++)
+         {
+            TextMessage tm = sess3.createTextMessage("message" + i);
+            
+            prod3.send(tm);
+         }
+            
+         for (int i = 0; i < NUM_MESSAGES; i++)
+         {
+            TextMessage tm = (TextMessage)cons3.receive(1000);
+            
+            assertNotNull(tm);
+            
+            assertEquals("message" + i, tm.getText());
+         }
+         
+         m = cons1.receive(2000);
+         
+         assertNull(m);
+         
+         m = cons2.receive(2000);
+         
+         assertNull(m);         
+      }
+      finally
+      {      
+         if (conn1 != null) conn1.close();
+         
+         if (conn2 != null) conn2.close();
+         
+         if (conn3 != null) conn3.close();
+      }
+   }
+   
+   
+   
+   
+   /*
+    * Create a consumer on two nodes out of three
+    * Send messages from the third node
+    * Ensure that the messages are received from the other two nodes in 
+    * round robin order.
+    * (Note that this test depends on us using the default router which has
+    * this round robin behaviour)
+    */
+   protected void clusteredQueueNoLocalConsumer(boolean persistent) throws Exception
+   {
+      Connection conn1 = null;
+      
+      Connection conn2 = null;
+      
+      Connection conn3 = null;
+      try
+      {
+         conn1 = cf1.createConnection();
+         
+         conn2 = cf2.createConnection();
+         
+         conn3 = cf3.createConnection();
+           
+         Session sess1 = conn1.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         
+         Session sess2 = conn2.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         
+         Session sess3 = conn3.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         
+         MessageConsumer cons2 = sess2.createConsumer(queue2);
+         
+         MessageConsumer cons3 = sess3.createConsumer(queue3);
+         
+         conn2.start();
+         
+         conn3.start();
+         
+         //Send at node1
+         
+         MessageProducer prod1 = sess1.createProducer(queue1);
+         
+         prod1.setDeliveryMode(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
+         
+         final int NUM_MESSAGES = 100;
+         
+         for (int i = 0; i < NUM_MESSAGES; i++)
+         {
+            TextMessage tm = sess1.createTextMessage("message" + i);
+            
+            prod1.send(tm);
+         }
+         
+         for (int i = 0; i < NUM_MESSAGES / 2; i++)
+         {
+            TextMessage tm = (TextMessage)cons2.receive(1000);
+            
+            assertNotNull(tm);
+            
+            assertEquals("message" + i * 2, tm.getText());
+         }
+         
+         for (int i = 0; i < NUM_MESSAGES / 2; i++)
+         {
+            TextMessage tm = (TextMessage)cons3.receive(1000);
+            
+            assertNotNull(tm);
+            
+            assertEquals("message" + (i * 2 + 1), tm.getText());
+         }
+      
+      }
+      finally
+      {      
+         if (conn1 != null) conn1.close();
+         
+         if (conn2 != null) conn2.close();
+         
+         if (conn3 != null) conn3.close();
+      }
+   }
+   
+   
+   
+   /*
+    * Create non durable subscriptions on all nodes of the cluster.
+    * Ensure all messages are receive as appropriate
+    */
+   public void clusteredTopicNonDurable(boolean persistent) throws Exception
+   {
+      Connection conn1 = null;
+      
+      Connection conn2 = null;
+      
+      Connection conn3 = null;
+      try
+      {
+         conn1 = cf1.createConnection();
+         
+         conn2 = cf2.createConnection();
+         
+         conn3 = cf3.createConnection();
+           
+         Session sess1 = conn1.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         
+         Session sess2 = conn2.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         
+         Session sess3 = conn3.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         
+         MessageConsumer cons1 = sess1.createConsumer(topic1);
+         
+         MessageConsumer cons2 = sess2.createConsumer(topic2);
+         
+         MessageConsumer cons3 = sess3.createConsumer(topic3);
+         
+         MessageConsumer cons4 = sess1.createConsumer(topic1);
+         
+         MessageConsumer cons5 = sess2.createConsumer(topic2);
+            
+         conn1.start();
+         
+         conn2.start();
+         
+         conn3.start();
+         
+         //Send at node1
+         
+         MessageProducer prod1 = sess1.createProducer(topic1);
+         
+         prod1.setDeliveryMode(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
+         
+         final int NUM_MESSAGES = 100;
+         
+         for (int i = 0; i < NUM_MESSAGES; i++)
+         {
+            TextMessage tm = sess1.createTextMessage("message" + i);
+            
+            prod1.send(tm);
+         }
+            
+         for (int i = 0; i < NUM_MESSAGES; i++)
+         {
+            TextMessage tm = (TextMessage)cons1.receive(1000);
+            
+            assertNotNull(tm);
+                        
+            assertEquals("message" + i, tm.getText());                        
+         }
+         
+         for (int i = 0; i < NUM_MESSAGES; i++)
+         {
+            TextMessage tm = (TextMessage)cons2.receive(1000);
+                      
+            assertNotNull(tm);
+            
+            assertEquals("message" + i, tm.getText());
+         }
+         
+         for (int i = 0; i < NUM_MESSAGES; i++)
+         {
+            TextMessage tm = (TextMessage)cons3.receive(1000);
+                        
+            assertNotNull(tm);
+             
+            assertEquals("message" + i, tm.getText());
+         } 
+         
+         for (int i = 0; i < NUM_MESSAGES; i++)
+         {
+            TextMessage tm = (TextMessage)cons4.receive(1000);
+                        
+            assertNotNull(tm);
+             
+            assertEquals("message" + i, tm.getText());
+         } 
+         
+         for (int i = 0; i < NUM_MESSAGES; i++)
+         {
+            TextMessage tm = (TextMessage)cons5.receive(1000);
+                        
+            assertNotNull(tm);
+             
+            assertEquals("message" + i, tm.getText());
+         } 
+      }
+      finally
+      {      
+         if (conn1 != null) conn1.close();
+         
+         if (conn2 != null) conn2.close();
+         
+         if (conn3 != null) conn3.close();
+      }
+   }
+   
+   
+   
+   
+   /*
+    * Create non durable subscriptions on all nodes of the cluster.
+    * Include some with selectors
+    * Ensure all messages are receive as appropriate
+    */
+   public void clusteredTopicNonDurableWithSelectors(boolean persistent) throws Exception
+   {
+      Connection conn1 = null;
+      
+      Connection conn2 = null;
+      
+      Connection conn3 = null;
+      try
+      {
+         conn1 = cf1.createConnection();
+         
+         conn2 = cf2.createConnection();
+         
+         conn3 = cf3.createConnection();
+                             
+         Session sess1 = conn1.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         
+         Session sess2 = conn2.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         
+         Session sess3 = conn3.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         
+         MessageConsumer cons1 = sess1.createConsumer(topic1);
+         
+         MessageConsumer cons2 = sess2.createConsumer(topic2);
+         
+         MessageConsumer cons3 = sess3.createConsumer(topic3);
+         
+         MessageConsumer cons4 = sess1.createConsumer(topic1, "COLOUR='red'");
+         
+         MessageConsumer cons5 = sess2.createConsumer(topic2, "COLOUR='blue'");
+            
+         conn1.start();
+         
+         conn2.start();
+         
+         conn3.start();
+         
+         //Send at node1
+         
+         MessageProducer prod1 = sess1.createProducer(topic1);
+         
+         prod1.setDeliveryMode(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
+         
+         final int NUM_MESSAGES = 100;
+         
+         for (int i = 0; i < NUM_MESSAGES; i++)
+         {
+            TextMessage tm = sess1.createTextMessage("message" + i);
+            
+            int c = i % 3;
+            if (c == 0)
+            {
+               tm.setStringProperty("COLOUR", "red");
+            }
+            else if (c == 1)
+            {
+               tm.setStringProperty("COLOUR", "blue");
+            }
+            
+            prod1.send(tm);
+         }
+            
+         for (int i = 0; i < NUM_MESSAGES; i++)
+         {
+            TextMessage tm = (TextMessage)cons1.receive(1000);
+            
+            assertNotNull(tm);
+                        
+            assertEquals("message" + i, tm.getText());                        
+         }
+         
+         for (int i = 0; i < NUM_MESSAGES; i++)
+         {
+            TextMessage tm = (TextMessage)cons2.receive(1000);
+                      
+            assertNotNull(tm);
+            
+            assertEquals("message" + i, tm.getText());
+         }
+         
+         for (int i = 0; i < NUM_MESSAGES; i++)
+         {
+            TextMessage tm = (TextMessage)cons3.receive(1000);
+                        
+            assertNotNull(tm);
+             
+            assertEquals("message" + i, tm.getText());
+         } 
+         
+         for (int i = 0; i < NUM_MESSAGES; i++)
+         {
+            int c = i % 3;
+            
+            if (c == 0)
+            {
+               TextMessage tm = (TextMessage)cons4.receive(1000);
+                           
+               assertNotNull(tm);
+                
+               assertEquals("message" + i, tm.getText());
+            }
+         } 
+         
+         for (int i = 0; i < NUM_MESSAGES; i++)
+         {
+            int c = i % 3;
+            
+            if (c == 1)
+            {
+               TextMessage tm = (TextMessage)cons5.receive(1000);
+                           
+               assertNotNull(tm);
+                
+               assertEquals("message" + i, tm.getText());
+            }
+         } 
+      }
+      finally
+      {      
+         if (conn1 != null) conn1.close();
+         
+         if (conn2 != null) conn2.close();
+         
+         if (conn3 != null) conn3.close();
+      }
+   }
+   
+   
+   
+   /*
+    * Create durable subscriptions on all nodes of the cluster.
+    * Include a couple with selectors
+    * Ensure all messages are receive as appropriate
+    * None of the durable subs are shared
+    */
+   public void clusteredTopicDurable(boolean persistent) throws Exception
+   {
+      Connection conn1 = null;
+      
+      Connection conn2 = null;
+      
+      Connection conn3 = null;
+      try
+      {
+         conn1 = cf1.createConnection();
+         
+         conn2 = cf2.createConnection();
+         
+         conn3 = cf3.createConnection();
+         
+         conn1.setClientID("wib1");
+         
+         conn2.setClientID("wib1");
+         
+         conn3.setClientID("wib1");
+           
+         Session sess1 = conn1.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         
+         Session sess2 = conn2.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         
+         Session sess3 = conn3.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         
+         MessageConsumer cons1 = sess1.createDurableSubscriber(topic1, "sub1");
+         
+         MessageConsumer cons2 = sess2.createDurableSubscriber(topic2, "sub2");
+         
+         MessageConsumer cons3 = sess3.createDurableSubscriber(topic3, "sub3");
+         
+         MessageConsumer cons4 = sess1.createDurableSubscriber(topic1, "sub4");
+         
+         MessageConsumer cons5 = sess2.createDurableSubscriber(topic2, "sub5");
+            
+         conn1.start();
+         
+         conn2.start();
+         
+         conn3.start();
+         
+         //Send at node1
+         
+         MessageProducer prod1 = sess1.createProducer(topic1);
+         
+         prod1.setDeliveryMode(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
+         
+         final int NUM_MESSAGES = 100;
+         
+         for (int i = 0; i < NUM_MESSAGES; i++)
+         {
+            TextMessage tm = sess1.createTextMessage("message" + i);
+            
+            prod1.send(tm);
+         }
+            
+         for (int i = 0; i < NUM_MESSAGES; i++)
+         {
+            TextMessage tm = (TextMessage)cons1.receive(1000);
+            
+            assertNotNull(tm);
+                        
+            assertEquals("message" + i, tm.getText());                        
+         }
+         
+         for (int i = 0; i < NUM_MESSAGES; i++)
+         {
+            TextMessage tm = (TextMessage)cons2.receive(1000);
+                      
+            assertNotNull(tm);
+            
+            assertEquals("message" + i, tm.getText());
+         }
+         
+         for (int i = 0; i < NUM_MESSAGES; i++)
+         {
+            TextMessage tm = (TextMessage)cons3.receive(1000);
+                        
+            assertNotNull(tm);
+             
+            assertEquals("message" + i, tm.getText());
+         } 
+         
+         for (int i = 0; i < NUM_MESSAGES; i++)
+         {
+            TextMessage tm = (TextMessage)cons4.receive(1000);
+                        
+            assertNotNull(tm);
+             
+            assertEquals("message" + i, tm.getText());
+         } 
+         
+         for (int i = 0; i < NUM_MESSAGES; i++)
+         {
+            TextMessage tm = (TextMessage)cons5.receive(1000);
+                        
+            assertNotNull(tm);
+             
+            assertEquals("message" + i, tm.getText());
+         } 
+         
+         cons1.close();
+         
+         cons2.close();
+         
+         cons3.close();
+         
+         cons4.close();
+         
+         cons5.close();
+         
+         sess1.unsubscribe("sub1");
+         
+         sess2.unsubscribe("sub2");
+         
+         sess3.unsubscribe("sub3");
+         
+         sess1.unsubscribe("sub4");
+         
+         sess2.unsubscribe("sub5");
+         
+      }
+      finally
+      {      
+         if (conn1 != null) conn1.close();
+         
+         if (conn2 != null) conn2.close();
+         
+         if (conn3 != null) conn3.close();
+      }
+   }
+   
+   
+   
+   
+   /*
+    * Create shared durable subs on multiple nodes, the local instance should always get the message
+    */
+   protected void clusteredTopicSharedDurableLocalConsumer(boolean persistent) throws Exception
+   {
+      Connection conn1 = null;
+      
+      Connection conn2 = null;
+      
+      Connection conn3 = null;
+      try
+      {
+         conn1 = cf1.createConnection();
+         
+         conn2 = cf2.createConnection();
+         
+         conn3 = cf3.createConnection();
+         
+         conn1.setClientID("wib1");
+         
+         conn2.setClientID("wib1");
+         
+         conn3.setClientID("wib1");
+           
+         Session sess1 = conn1.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         
+         Session sess2 = conn2.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         
+         Session sess3 = conn3.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         
+         MessageConsumer cons1 = sess1.createDurableSubscriber(topic1, "sub1");
+         
+         MessageConsumer cons2 = sess2.createDurableSubscriber(topic2, "sub1");
+         
+         MessageConsumer cons3 = sess3.createDurableSubscriber(topic3, "sub1");
+         
+         conn1.start();
+         
+         conn2.start();
+         
+         conn3.start();
+         
+         //Send at node1
+         
+         MessageProducer prod1 = sess1.createProducer(topic1);
+         
+         prod1.setDeliveryMode(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
+         
+         final int NUM_MESSAGES = 100;
+         
+         for (int i = 0; i < NUM_MESSAGES; i++)
+         {
+            TextMessage tm = sess1.createTextMessage("message" + i);
+            
+            prod1.send(tm);
+         }
+         
+         for (int i = 0; i < NUM_MESSAGES; i++)
+         {
+            TextMessage tm = (TextMessage)cons1.receive(1000);
+            
+            assertNotNull(tm);
+            
+            assertEquals("message" + i, tm.getText());
+         }
+         
+         Message m = cons2.receive(2000);
+         
+         assertNull(m);
+         
+         m = cons3.receive(2000);
+         
+         assertNull(m);
+         
+         // Send at node2
+         
+         MessageProducer prod2 = sess2.createProducer(topic2);
+         
+         prod2.setDeliveryMode(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
+         
+         for (int i = 0; i < NUM_MESSAGES; i++)
+         {
+            TextMessage tm = sess2.createTextMessage("message" + i);
+            
+            prod2.send(tm);
+         }
+         
+         for (int i = 0; i < NUM_MESSAGES; i++)
+         {
+            TextMessage tm = (TextMessage)cons2.receive(1000);
+            
+            assertNotNull(tm);
+               
+            assertEquals("message" + i, tm.getText());
+         }
+         
+         m = cons1.receive(2000);
+         
+         assertNull(m);
+         
+         m = cons3.receive(2000);
+         
+         assertNull(m);
+         
+         // Send at node3
+         
+         MessageProducer prod3 = sess3.createProducer(topic3);
+         
+         prod3.setDeliveryMode(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
+         
+         for (int i = 0; i < NUM_MESSAGES; i++)
+         {
+            TextMessage tm = sess3.createTextMessage("message" + i);
+            
+            prod3.send(tm);
+         }
+           
+         for (int i = 0; i < NUM_MESSAGES; i++)
+         {
+            TextMessage tm = (TextMessage)cons3.receive(1000);
+            
+            assertNotNull(tm);
+            
+            assertEquals("message" + i, tm.getText());
+         }
+         
+         m = cons1.receive(2000);
+         
+         assertNull(m);
+         
+         m = cons2.receive(2000);
+         
+         assertNull(m);         
+         
+         cons1.close();
+         
+         cons2.close();
+         
+         cons3.close();
+         
+         //Need to unsubscribe on any node that the durable sub was created on
+         
+         sess1.unsubscribe("sub1");
+         
+         sess2.unsubscribe("sub1");
+         
+         sess3.unsubscribe("sub1");
+      }
+      finally
+      {      
+         if (conn1 != null) conn1.close();
+         
+         if (conn2 != null) conn2.close();
+         
+         if (conn3 != null) conn3.close();
+      }
+   }
+   
+   
+   /*
+    * Create shared durable subs on multiple nodes, but without consumer on local node
+    * even thought there is durable sub
+    * should round robin
+    * note that this test assumes round robin
+    */
+   protected void clusteredTopicSharedDurableNoLocalConsumer(boolean persistent) throws Exception
+   {
+      Connection conn1 = null;
+      
+      Connection conn2 = null;
+      
+      Connection conn3 = null;
+      try
+      {
+         conn1 = cf1.createConnection();
+         
+         conn2 = cf2.createConnection();
+         
+         conn3 = cf3.createConnection();
+         
+         conn1.setClientID("wib1");
+         
+         conn2.setClientID("wib1");
+         
+         conn3.setClientID("wib1");
+           
+         Session sess1 = conn1.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         
+         Session sess2 = conn2.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         
+         Session sess3 = conn3.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         
+         MessageConsumer cons1 = sess1.createDurableSubscriber(topic1, "sub1");
+         
+         //Now close it on node 1
+         conn1.close();
+         
+         conn1 = cf1.createConnection();
+         
+         conn1.setClientID("wib1");         
+         
+         sess1 = conn1.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         
+         //This means the durable sub is inactive on node1
+         
+         MessageConsumer cons2 = sess2.createDurableSubscriber(topic2, "sub1");
+         
+         MessageConsumer cons3 = sess3.createDurableSubscriber(topic3, "sub1");
+         
+         conn2.start();
+         
+         conn3.start();
+         
+         //Send at node1
+         
+         //Should round robin between the other 2 since there is no active consumer on sub1 on node1
+         
+         MessageProducer prod1 = sess1.createProducer(topic1);
+         
+         prod1.setDeliveryMode(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
+         
+         final int NUM_MESSAGES = 100;
+         
+         for (int i = 0; i < NUM_MESSAGES; i++)
+         {
+            TextMessage tm = sess1.createTextMessage("message" + i);
+            
+            prod1.send(tm);
+         }
+         
+         for (int i = 0; i < NUM_MESSAGES / 2; i++)
+         {
+            TextMessage tm = (TextMessage)cons2.receive(1000);
+            
+            assertNotNull(tm);
+            
+            assertEquals("message" + i * 2, tm.getText());
+         }
+         
+         for (int i = 0; i < NUM_MESSAGES / 2; i++)
+         {
+            TextMessage tm = (TextMessage)cons3.receive(1000);
+            
+            assertNotNull(tm);
+            
+            assertEquals("message" + (i * 2 + 1), tm.getText());
+         }
+         
+         cons2.close();
+         
+         cons3.close();
+         
+         sess1.unsubscribe("sub1");
+         
+         sess2.unsubscribe("sub1");
+         
+         sess3.unsubscribe("sub1");
+      
+      }
+      finally
+      {      
+         if (conn1 != null) conn1.close();
+         
+         if (conn2 != null) conn2.close();
+         
+         if (conn3 != null) conn3.close();
+      }
+   }
+   
+   
+   
+   /*
+    * Create shared durable subs on multiple nodes, but without sub on local node
+    * should round robin
+    * note that this test assumes round robin
+    */
+   protected void clusteredTopicSharedDurableNoLocalSub(boolean persistent) throws Exception
+   {
+      Connection conn1 = null;
+      
+      Connection conn2 = null;
+      
+      Connection conn3 = null;
+      try
+      {
+         conn1 = cf1.createConnection();
+         
+         conn2 = cf2.createConnection();
+         
+         conn3 = cf3.createConnection();
+         
+         conn2.setClientID("wib1");
+         
+         conn3.setClientID("wib1");
+           
+         Session sess1 = conn1.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         
+         Session sess2 = conn2.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         
+         Session sess3 = conn3.createSession(false, Session.AUTO_ACKNOWLEDGE);
+                  
+         MessageConsumer cons2 = sess2.createDurableSubscriber(topic2, "sub1");
+         
+         MessageConsumer cons3 = sess3.createDurableSubscriber(topic3, "sub1");
+         
+         conn2.start();
+         
+         conn3.start();
+         
+         //Send at node1
+         
+         //Should round robin between the other 2 since there is no active consumer on sub1 on node1
+         
+         MessageProducer prod1 = sess1.createProducer(topic1);
+         
+         prod1.setDeliveryMode(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
+         
+         final int NUM_MESSAGES = 100;
+         
+         for (int i = 0; i < NUM_MESSAGES; i++)
+         {
+            TextMessage tm = sess1.createTextMessage("message" + i);
+            
+            prod1.send(tm);
+         }
+         
+         for (int i = 0; i < NUM_MESSAGES / 2; i++)
+         {
+            TextMessage tm = (TextMessage)cons2.receive(1000);
+            
+            assertNotNull(tm);
+            
+            assertEquals("message" + i * 2, tm.getText());
+         }
+         
+         for (int i = 0; i < NUM_MESSAGES / 2; i++)
+         {
+            TextMessage tm = (TextMessage)cons3.receive(1000);
+            
+            assertNotNull(tm);
+            
+            assertEquals("message" + (i * 2 + 1), tm.getText());
+         }
+         
+         cons2.close();
+         
+         cons3.close();
+         
+         sess2.unsubscribe("sub1");
+         
+         sess3.unsubscribe("sub1");
+      
+      }
+      finally
+      {      
+         if (conn1 != null) conn1.close();
+         
+         if (conn2 != null) conn2.close();
+         
+         if (conn3 != null) conn3.close();
+      }
+   }
+
+   class MyListener implements MessageListener
+   {
+      private int i;
+      
+      MyListener(int i)
+      {
+         this.i = i;
+      }
+
+      public void onMessage(Message m)
+      {
+         try
+         {
+            int count = m.getIntProperty("count");
+            
+            log.info("Listener " + i + " received message " + count);
+         }
+         catch (Exception e)
+         {
+            e.printStackTrace();
+         }
+      }
+      
+   }
+   
+}

Added: trunk/tests/src/org/jboss/test/messaging/jms/manual/ManualPagingSoakTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/manual/ManualPagingSoakTest.java	2006-10-19 19:30:58 UTC (rev 1501)
+++ trunk/tests/src/org/jboss/test/messaging/jms/manual/ManualPagingSoakTest.java	2006-10-19 22:01:47 UTC (rev 1502)
@@ -0,0 +1,139 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.test.messaging.jms.manual;
+
+import java.util.Properties;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.DeliveryMode;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import javax.jms.Topic;
+import javax.naming.Context;
+import javax.naming.InitialContext;
+
+import org.jboss.test.messaging.MessagingTestCase;
+
+
+public class ManualPagingSoakTest extends MessagingTestCase
+{
+   protected Context ic1;
+ 
+   protected Queue queue;
+   
+   protected Topic topic;
+     
+   protected ConnectionFactory cf;
+        
+   public ManualPagingSoakTest(String name)
+   {
+      super(name);
+   }
+
+   protected void setUp() throws Exception
+   {
+      super.setUp();
+      
+      Properties props1 = new Properties();
+      
+      props1.put(Context.INITIAL_CONTEXT_FACTORY, "org.jnp.interfaces.NamingContextFactory");
+      props1.put(Context.PROVIDER_URL, "jnp://localhost:1199");
+      props1.put(Context.URL_PKG_PREFIXES, "org.jnp.interfaces");
+      
+      ic1 = new InitialContext(props1);
+                  
+      queue = (Queue)ic1.lookup("queue/testQueue");
+      
+      topic = (Topic)ic1.lookup("topic/testTopic");
+      
+      cf = (ConnectionFactory)ic1.lookup("/ConnectionFactory");
+      
+   }
+
+   protected void tearDown() throws Exception
+   {
+      super.tearDown();
+      
+      ic1.close();
+   }
+   
+   public void test1() throws Exception
+   {
+      Connection conn = null;
+         
+      try
+      {
+         conn = cf.createConnection();
+         
+         Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+              
+         MessageProducer prod = sess.createProducer(queue);
+         
+         prod.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+         
+         final int NUM_MESSAGES = 1000000;
+         
+         byte[] bytes = new byte[2048];
+         
+         String s = new String(bytes);
+         
+         for (int i = 0; i < NUM_MESSAGES; i++)
+         {
+            TextMessage tm = sess.createTextMessage(s);
+            
+            prod.send(tm);
+            
+            if (i % 1000 == 0)
+            {
+               log.info("Sent " + i);
+            }
+         }
+         
+         log.info("Receiving");
+         
+         MessageConsumer cons = sess.createConsumer(queue);
+         
+         conn.start();
+         
+         for (int i = 0; i < NUM_MESSAGES; i++)
+         {
+            TextMessage tm = (TextMessage)cons.receive(2000);
+            
+            assertNotNull(tm);
+            
+            if (i % 1000 == 0)
+            {
+               log.info("Received " + i);
+            }
+         }
+                  
+      }
+      finally
+      {      
+         if (conn != null) conn.close();
+      }
+   }       
+}

Modified: trunk/tests/src/org/jboss/test/messaging/tools/jmx/ServiceContainer.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/tools/jmx/ServiceContainer.java	2006-10-19 19:30:58 UTC (rev 1501)
+++ trunk/tests/src/org/jboss/test/messaging/tools/jmx/ServiceContainer.java	2006-10-19 22:01:47 UTC (rev 1502)
@@ -1055,6 +1055,8 @@
    {
       try
       {
+         log.info("************************** Deleting all data from database");
+         
          InitialContext ctx = new InitialContext();
 
          TransactionManager mgr = (TransactionManager)ctx.lookup(TransactionManagerService.JNDI_NAME);
@@ -1065,7 +1067,7 @@
 
          Connection conn = ds.getConnection();
 
-         String sql = "DELETE FROM JMS_CHANNEL_MAPPING";
+         String sql = "DELETE FROM JMS_POSTOFFICE";
          PreparedStatement ps = conn.prepareStatement(sql);
 
          int rows = ps.executeUpdate();




More information about the jboss-cvs-commits mailing list