[jboss-cvs] JBoss Messaging SVN: r3022 - in trunk: src/main/org/jboss/jms/server/endpoint and 10 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Tue Aug 21 17:07:50 EDT 2007


Author: timfox
Date: 2007-08-21 17:07:50 -0400 (Tue, 21 Aug 2007)
New Revision: 3022

Modified:
   trunk/src/etc/server/default/deploy/mysql-persistence-service.xml
   trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java
   trunk/src/main/org/jboss/messaging/core/contract/Message.java
   trunk/src/main/org/jboss/messaging/core/contract/PersistenceManager.java
   trunk/src/main/org/jboss/messaging/core/impl/JDBCPersistenceManager.java
   trunk/src/main/org/jboss/messaging/core/impl/PagingChannelSupport.java
   trunk/src/main/org/jboss/messaging/core/impl/message/MessageSupport.java
   trunk/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java
   trunk/src/main/org/jboss/messaging/core/impl/tx/TransactionRepository.java
   trunk/src/main/org/jboss/messaging/util/LockMap.java
   trunk/tests/src/org/jboss/test/messaging/core/JDBCPersistenceManagerTest.java
   trunk/tests/src/org/jboss/test/messaging/core/paging/ChannelShare_NP_2PCTest.java
   trunk/tests/src/org/jboss/test/messaging/core/paging/ChannelShare_NP_NTTest.java
   trunk/tests/src/org/jboss/test/messaging/core/paging/ChannelShare_NP_TTest.java
   trunk/tests/src/org/jboss/test/messaging/core/paging/ChannelShare_P_2PCTest.java
   trunk/tests/src/org/jboss/test/messaging/core/paging/ChannelShare_P_NTTest.java
   trunk/tests/src/org/jboss/test/messaging/core/paging/ChannelShare_P_TTest.java
   trunk/tests/src/org/jboss/test/messaging/core/paging/SingleChannel_NP_2PCTest.java
   trunk/tests/src/org/jboss/test/messaging/core/paging/SingleChannel_NP_NTTest.java
   trunk/tests/src/org/jboss/test/messaging/core/paging/SingleChannel_NP_TTest.java
   trunk/tests/src/org/jboss/test/messaging/core/paging/SingleChannel_P_2PCTest.java
   trunk/tests/src/org/jboss/test/messaging/core/paging/SingleChannel_P_NTTest.java
   trunk/tests/src/org/jboss/test/messaging/core/paging/SingleChannel_P_TTest.java
   trunk/tests/src/org/jboss/test/messaging/core/paging/SingleChannel_ReloadTest.java
   trunk/tests/src/org/jboss/test/messaging/jms/AcknowledgementTest.java
   trunk/tests/src/org/jboss/test/messaging/jms/JMSTestCase.java
   trunk/tests/src/org/jboss/test/messaging/jms/MessageCleanupTest.java
   trunk/tests/src/org/jboss/test/messaging/jms/XARecoveryTest.java
   trunk/tests/src/org/jboss/test/messaging/jms/XATest.java
   trunk/tests/src/org/jboss/test/messaging/tools/container/LocalTestServer.java
   trunk/tests/src/org/jboss/test/messaging/tools/container/RMITestServer.java
   trunk/tests/src/org/jboss/test/messaging/tools/container/Server.java
Log:
Persistence Manager changes interim commit


Modified: trunk/src/etc/server/default/deploy/mysql-persistence-service.xml
===================================================================
--- trunk/src/etc/server/default/deploy/mysql-persistence-service.xml	2007-08-21 16:24:12 UTC (rev 3021)
+++ trunk/src/etc/server/default/deploy/mysql-persistence-service.xml	2007-08-21 21:07:50 UTC (rev 3022)
@@ -41,6 +41,7 @@
    CREATE_IDX_MESSAGE_REF_MESSAGE_ID=CREATE INDEX JBM_MSG_REF_MESSAGE_ID ON JBM_MSG_REF (MESSAGE_ID)
    CREATE_IDX_MESSAGE_REF_SCHED_DELIVERY=CREATE INDEX JBM_MSG_REF_SCHED_DELIVERY ON JBM_MSG_REF (SCHED_DELIVERY)
    CREATE_MESSAGE=CREATE TABLE JBM_MSG (MESSAGE_ID BIGINT, RELIABLE CHAR(1), EXPIRATION BIGINT, TIMESTAMP BIGINT, PRIORITY TINYINT, HEADERS MEDIUMBLOB, PAYLOAD LONGBLOB, TYPE TINYINT, PRIMARY KEY (MESSAGE_ID)) ENGINE = INNODB
+   CREATE_IDX_MESSAGE_TIMESTAMP=CREATE INDEX JBM_MSG_REF_TIMESTAMP ON JBM_MSG (TIMESTAMP)
    CREATE_TRANSACTION=CREATE TABLE JBM_TX (NODE_ID INTEGER, TRANSACTION_ID BIGINT, BRANCH_QUAL VARBINARY(254), FORMAT_ID INTEGER, GLOBAL_TXID VARBINARY(254), PRIMARY KEY (TRANSACTION_ID)) ENGINE = INNODB
    CREATE_COUNTER=CREATE TABLE JBM_COUNTER (NAME VARCHAR(255), NEXT_ID BIGINT, PRIMARY KEY(NAME)) ENGINE = INNODB
    INSERT_MESSAGE_REF=INSERT INTO JBM_MSG_REF (CHANNEL_ID, MESSAGE_ID, TRANSACTION_ID, STATE, ORD, PAGE_ORD, DELIVERY_COUNT, SCHED_DELIVERY) VALUES (?, ?, ?, ?, ?, ?, ?, ?)
@@ -60,11 +61,10 @@
    UPDATE_DELIVERY_COUNT=UPDATE JBM_MSG_REF SET DELIVERY_COUNT = ? WHERE CHANNEL_ID = ? AND MESSAGE_ID = ?
    UPDATE_CHANNEL_ID=UPDATE JBM_MSG_REF SET CHANNEL_ID = ? WHERE CHANNEL_ID = ?
    LOAD_MESSAGES=SELECT MESSAGE_ID, RELIABLE, EXPIRATION, TIMESTAMP, PRIORITY, HEADERS, PAYLOAD, TYPE FROM JBM_MSG
-   INSERT_MESSAGE=INSERT INTO JBM_MSG (MESSAGE_ID, RELIABLE, EXPIRATION, TIMESTAMP, PRIORITY, HEADERS, PAYLOAD, TYPE) VALUES (?, ?, ?, ?, ?, ?, ?, ?)
-   DELETE_MESSAGE=DELETE FROM JBM_MSG WHERE MESSAGE_ID=?
-   DELETE_PAGED_MESSAGE=DELETE FROM JBM_MSG WHERE MESSAGE_ID=? AND NOT EXISTS (SELECT * FROM JBM_MSG_REF WHERE MESSAGE_ID = ?)
+   INSERT_MESSAGE=INSERT INTO JBM_MSG (MESSAGE_ID, RELIABLE, EXPIRATION, TIMESTAMP, PRIORITY, HEADERS, PAYLOAD, TYPE) VALUES (?, ?, ?, ?, ?, ?, ?, ?)   
    MESSAGE_ID_COLUMN=MESSAGE_ID
    MESSAGE_EXISTS=SELECT MESSAGE_ID FROM JBM_MSG WHERE MESSAGE_ID = ? FOR UPDATE
+   REAP_MESSAGES=DELETE FROM JBM_MSG WHERE TIMESTAMP < ? AND NOT EXISTS (SELECT * FROM JBM_MSG_REF WHERE JBM_MSG_REF.MESSAGE_ID = JBM_MSG.MESSAGE_ID)
    INSERT_TRANSACTION=INSERT INTO JBM_TX (NODE_ID, TRANSACTION_ID, BRANCH_QUAL, FORMAT_ID, GLOBAL_TXID) VALUES(?, ?, ?, ?, ?)
    DELETE_TRANSACTION=DELETE FROM JBM_TX WHERE NODE_ID = ? AND TRANSACTION_ID = ?
    SELECT_PREPARED_TRANSACTIONS=SELECT TRANSACTION_ID, BRANCH_QUAL, FORMAT_ID, GLOBAL_TXID FROM JBM_TX WHERE NODE_ID = ?

Modified: trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java	2007-08-21 16:24:12 UTC (rev 3021)
+++ trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java	2007-08-21 21:07:50 UTC (rev 3022)
@@ -1724,7 +1724,6 @@
       {               
          if (queue != null)
          {          
-         	msg.setPersistentCount(1);
             queue.handle(null, ref, tx);
             del.acknowledge(tx);
          }

Modified: trunk/src/main/org/jboss/messaging/core/contract/Message.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/contract/Message.java	2007-08-21 16:24:12 UTC (rev 3021)
+++ trunk/src/main/org/jboss/messaging/core/contract/Message.java	2007-08-21 21:07:50 UTC (rev 3022)
@@ -149,14 +149,6 @@
     */
    MessageReference createReference();
    
-   void setPersistentCount(int count);
-   
-   int getPersistentCount();
-   
-   void incrementPersistentCount();
-   
-   void decrementPersistentCount();
-   
    boolean isPersisted();
    
    void setPersisted(boolean persisted);

Modified: trunk/src/main/org/jboss/messaging/core/contract/PersistenceManager.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/contract/PersistenceManager.java	2007-08-21 16:24:12 UTC (rev 3021)
+++ trunk/src/main/org/jboss/messaging/core/contract/PersistenceManager.java	2007-08-21 21:07:50 UTC (rev 3022)
@@ -39,11 +39,13 @@
  */
 public interface PersistenceManager extends MessagingComponent
 {
-	void setPaging(long channelID, boolean paging);
-
-	boolean isPaging();
+	void startReaper();
 	
+	void stopReaper();
 	
+	void reapUnreferencedMessages() throws Exception;
+	
+
    void addReference(long channelID, MessageReference ref, Transaction tx) throws Exception;
 
    void removeReference(long channelID, MessageReference ref, Transaction tx) throws Exception;

Modified: trunk/src/main/org/jboss/messaging/core/impl/JDBCPersistenceManager.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/impl/JDBCPersistenceManager.java	2007-08-21 16:24:12 UTC (rev 3021)
+++ trunk/src/main/org/jboss/messaging/core/impl/JDBCPersistenceManager.java	2007-08-21 21:07:50 UTC (rev 3022)
@@ -43,6 +43,8 @@
 import java.util.Map;
 import java.util.Properties;
 import java.util.Set;
+import java.util.Timer;
+import java.util.TimerTask;
 
 import javax.sql.DataSource;
 import javax.transaction.TransactionManager;
@@ -101,9 +103,14 @@
    
    private boolean nodeIDSet;
    
-   private Set pagingSet = new HashSet();
-  
-      
+   private Timer reaperTimer;
+   
+   private Reaper reaper;
+   
+   private long reaperPeriod;
+   
+   private boolean reaperRunning;
+          
    // Constructors --------------------------------------------------
     
    public JDBCPersistenceManager(DataSource ds, TransactionManager tm, Properties sqlProperties,
@@ -118,7 +125,13 @@
       
       this.usingTrailingByte = usingTrailingByte;
       
-      this.maxParams = maxParams;      
+      this.maxParams = maxParams;    
+      
+      this.reaperPeriod = 5000;
+      
+      reaperTimer = new Timer(true);
+      
+      reaper = new Reaper();
    }
    
    
@@ -169,6 +182,10 @@
    public void stop() throws Exception
    {
       super.stop();
+      
+      reaperTimer.cancel();
+      
+      stopReaper();
    }
    
    // Injection -------------------------------------------------
@@ -184,24 +201,36 @@
    
    // PersistenceManager implementation -------------------------
    
-   public synchronized void setPaging(long channelID, boolean paging)
+   public synchronized void startReaper()
    {
-   	Long l = new Long(channelID);
-   	if (paging)
+   	if (reaperRunning)
    	{
-   		pagingSet.add(l);
+   		return;
    	}
-   	else
+   	if (reaperPeriod != -1)
    	{
-   		pagingSet.remove(l);
+   		reaperTimer.schedule(reaper, reaperPeriod, reaperPeriod);
+   		
+   		reaperRunning = true;
    	}
    }
    
-   public synchronized boolean isPaging()
+   public synchronized void stopReaper()
    {
-   	return !pagingSet.isEmpty();
+   	if (!reaperRunning)
+   	{
+   		return;
+   	}
+   	reaper.cancel();
+   	
+   	reaperRunning = false;
    }
    
+   public void reapUnreferencedMessages() throws Exception
+   {
+   	reapUnreferencedMessages(System.currentTimeMillis());
+   }
+      
    // Related to XA Recovery
    // ======================
    
@@ -504,22 +533,23 @@
    // ===============================                 
    
    //Used to page NP messages or P messages in a non recoverable queue
-   public synchronized void pageReferences(long channelID, List references, boolean page) throws Exception
+   
+   public void pageReferences(long channelID, List references, boolean page) throws Exception
    {
       Connection conn = null;
       PreparedStatement psInsertReference = null;  
       PreparedStatement psInsertMessage = null;    
-      PreparedStatement psMessageExists = null;
+  //    PreparedStatement psMessageExists = null;
       TransactionWrapper wrap = new TransactionWrapper();
             
       //First we order the references in message order
-      orderReferences(references);
+      //orderReferences(references);
                          
       try
       {
-         //Now we get a lock on all the messages. Since we have ordered the refs we should avoid deadlock
-         getLocks(references);
-         
+//         //Now we get a lock on all the messages. Since we have ordered the refs we should avoid deadlock
+//         getLocks(references);
+//         
          conn = ds.getConnection();
          
          Iterator iter = references.iterator();
@@ -527,7 +557,7 @@
          psInsertReference = conn.prepareStatement(getSQLStatement("INSERT_MESSAGE_REF"));
          psInsertMessage = conn.prepareStatement(getSQLStatement("INSERT_MESSAGE"));
          
-         boolean insertsInBatch = false;
+        // boolean insertsInBatch = false;
               
          while (iter.hasNext())
          {
@@ -539,82 +569,87 @@
             //Now store the reference
             addReference(channelID, ref, psInsertReference, page);
                         
-            if (usingBatchUpdates)
-            {
-               psInsertReference.addBatch();
-            }
-            else
-            {
+//            if (usingBatchUpdates)
+//            {
+//               psInsertReference.addBatch();
+//            }
+//            else
+//            {
                int rows = executeWithRetry(psInsertReference);
                
                if (trace)
                {
                   log.trace("Inserted " + rows + " rows");
                }
-            }
+            //}
             
             //Maybe we need to persist the message itself
             Message m = ref.getMessage();
+            
+            synchronized (m)
+            {
                                      
             if (!m.isPersisted())
             {            	               
             	//The message might actually already exist due to it already being paged
+            	//so we insert and ignore key violations
             	
-            	if (psMessageExists == null)
-            	{
-            		psMessageExists = conn.prepareStatement(getSQLStatement("MESSAGE_EXISTS"));            		            		
-            	}
-            	
-            	psMessageExists.setLong(1, m.getMessageID());
-            	
-            	ResultSet rs = null;
-            	
-            	try
-            	{
-            		rs = psMessageExists.executeQuery();
-            		
-            		if (!rs.next())
-            		{
+//            	if (psMessageExists == null)
+//            	{
+//            		psMessageExists = conn.prepareStatement(getSQLStatement("MESSAGE_EXISTS"));            		            		
+//            	}
+//            	
+//            	psMessageExists.setLong(1, m.getMessageID());
+//            	
+//            	ResultSet rs = null;
+//            	
+//            	try
+//            	{
+//            		rs = psMessageExists.executeQuery();
+//            		
+//            		if (!rs.next())
+//            		{
             			storeMessage(m, psInsertMessage); 
             			
-            			if (usingBatchUpdates)
-      	            {
-      	               psInsertMessage.addBatch();	 
-      	               
-      	               insertsInBatch = true;
-      	            }
-      	            else
-      	            {
-      	               int rows = executeWithRetry(psInsertMessage);
+//            			if (usingBatchUpdates)
+//      	            {
+//      	               psInsertMessage.addBatch();	 
+//      	               
+//      	               insertsInBatch = true;
+//      	            }
+//      	            else
+//      	            {
+      	               rows = executeWithRetryIgnoreKeyViolation(psInsertMessage);
       	                                      
                         if (trace) { log.trace("Inserted " + rows + " rows"); }	               
-      	            } 
+      	           // } 
       	            m.setPersisted(true);
             		}
-            	}
-            	finally
-            	{
-            		if (rs != null)
-            		{
-            			rs.close();
-            		}
-            	}  	                               	
-            }    
+//            	}
+//            	finally
+//            	{
+//            		if (rs != null)
+//            		{
+//            			rs.close();
+//            		}
+//            	}  	                               	
+         //   }    
+            }
          }         
          
-         if (usingBatchUpdates)
-         {
-            int[] rowsReference = executeWithRetryBatch(psInsertReference);
-            
-            if (trace) { logBatchUpdate(getSQLStatement("INSERT_MESSAGE_REF"), rowsReference, "inserted"); }
-            
-            if (insertsInBatch)
-            {
-               int[] rowsMessage = executeWithRetryBatch(psInsertMessage);
-               
-               if (trace) { logBatchUpdate(getSQLStatement("INSERT_MESSAGE"), rowsMessage, "inserted"); }
-            }
-         }        
+//         if (usingBatchUpdates)
+//         {
+//            int[] rowsReference = executeWithRetryBatch(psInsertReference);
+//            
+//            if (trace) { logBatchUpdate(getSQLStatement("INSERT_MESSAGE_REF"), rowsReference, "inserted"); }
+//            
+//            if (insertsInBatch)
+//            {
+//               int[] rowsMessage = executeWithRetryBatch(psInsertMessage);
+//               
+//               if (trace) { logBatchUpdate(getSQLStatement("INSERT_MESSAGE"), rowsMessage, "inserted"); }
+//            }
+//         }        
       }
       catch (Exception e)
       {
@@ -625,17 +660,17 @@
       {
       	closeStatement(psInsertReference);
       	closeStatement(psInsertMessage);
-      	closeStatement(psMessageExists);
+     // 	closeStatement(psMessageExists);
       	closeConnection(conn);         
-         try
-         {
+     //    try
+     //    {
             wrap.end();                       
-         }
-         finally
-         {            
-            //And then release locks
-            this.releaseLocks(references);
-         }         
+//         }
+//         finally
+//         {            
+//            //And then release locks
+//            this.releaseLocks(references);
+//         }         
       }      
    }
          
@@ -646,24 +681,22 @@
           
       Connection conn = null;
       PreparedStatement psDeleteReference = null;  
-      PreparedStatement psDeleteMessage = null;
       TransactionWrapper wrap = new TransactionWrapper();
         
       //We order the references
-      orderReferences(references);
+     // orderReferences(references);
              
       try
       {
          //We get locks on all the messages - since they are ordered we avoid deadlock
-         getLocks(references);
+       //  getLocks(references);
          
          conn = ds.getConnection();
          
          Iterator iter = references.iterator();
          
          psDeleteReference = conn.prepareStatement(getSQLStatement("DELETE_MESSAGE_REF"));
-         psDeleteMessage = conn.prepareStatement(getSQLStatement("DELETE_PAGED_MESSAGE"));
-         
+
          while (iter.hasNext())
          {
             MessageReference ref = (MessageReference) iter.next();
@@ -680,38 +713,18 @@
                
                if (trace) { log.trace("Deleted " + rows + " rows"); }
             }
-            
-            Message m = ref.getMessage();
-                                    
-            //Maybe we need to delete the message itself
-              
-            psDeleteMessage.setLong(1, m.getMessageID());
-            psDeleteMessage.setLong(2, m.getMessageID());  
-                        
-            if (usingBatchUpdates)
-            {
-               psDeleteMessage.addBatch();
-            }
-            else
-            {  
-               int rows = executeWithRetry(psDeleteMessage);
-        
-               if (trace) { log.trace("Deleted " + rows + " rows"); }
-            }  
-            
-            ref.getMessage().setPersisted(false);
-            
+                          
+            //There is a small possibility that the ref is depaged here, then paged again, before this flag is set
+            //and the tx is committed, so an insert of the message may be attempted twice, but this should be ok
+            //since we ignore key violations on message insert
+            ref.getMessage().setPersisted(false);                  
          }         
          
          if (usingBatchUpdates)
          {
             int[] rowsReference = executeWithRetryBatch(psDeleteReference);
             
-            if (trace) { logBatchUpdate(getSQLStatement("DELETE_MESSAGE_REF"), rowsReference, "deleted"); }
-                        
-            rowsReference = executeWithRetryBatch(psDeleteMessage);
-            
-            if (trace) { logBatchUpdate(getSQLStatement("DELETE_PAGED_MESSAGE"), rowsReference, "deleted"); }
+            if (trace) { logBatchUpdate(getSQLStatement("DELETE_MESSAGE_REF"), rowsReference, "deleted"); }                                    
          }              
       }
       catch (Exception e)
@@ -722,24 +735,23 @@
       finally
       {
       	closeStatement(psDeleteReference);
-      	closeStatement(psDeleteMessage);
       	closeConnection(conn);         
-         try
-         {
+//         try
+//         {
             wrap.end();
-         }
-         finally
-         {     
-            //And then release locks
-            this.releaseLocks(references);
-         }         
+//         }
+//         finally
+//         {     
+//            //And then release locks
+//            this.releaseLocks(references);
+//         }         
       }      
    }
    
    // After loading paged refs this is used to update P messages to non paged
    public void updateReferencesNotPagedInRange(long channelID, long orderStart, long orderEnd, long num) throws Exception
    {
-      if (trace) { log.trace("Updating paaged references for channel " + channelID + " between " + orderStart + " and " + orderEnd); }
+      if (trace) { log.trace("Updating paged references for channel " + channelID + " between " + orderStart + " and " + orderEnd); }
       
       Connection conn = null;
       PreparedStatement ps = null;
@@ -1238,7 +1250,7 @@
          try
          {            
             // Get lock on message
-            LockMap.instance.obtainLock(m);
+           // LockMap.instance.obtainLock(m);
                                     
             psReference = conn.prepareStatement(getSQLStatement("INSERT_MESSAGE_REF"));
             
@@ -1282,7 +1294,7 @@
             finally
             {   
                //Release Lock
-               LockMap.instance.releaseLock(m);
+             //  LockMap.instance.releaseLock(m);
             }
          }      
       }
@@ -1340,8 +1352,7 @@
          TransactionWrapper wrap = new TransactionWrapper();
          
          PreparedStatement psReference = null;
-         PreparedStatement psMessage = null;
-         
+
          Connection conn = ds.getConnection();
          
          Message m = ref.getMessage();         
@@ -1349,7 +1360,7 @@
          try
          {
             //get lock on message
-            LockMap.instance.obtainLock(m);
+         //   LockMap.instance.obtainLock(m);
                               
             psReference = conn.prepareStatement(getSQLStatement("DELETE_MESSAGE_REF"));
             
@@ -1364,36 +1375,7 @@
                return;
             }
             
-            if (trace) { log.trace("Deleted " + rows + " rows"); }
-            
-            ref.getMessage().decrementPersistentCount();
-            
-            if (ref.getMessage().getPersistentCount() == 0)
-            {            
-            	if (trace) { log.trace("Last reference so deleting message"); }
-            	
-               //Delete the message (if necessary)
-            	
-            	if (this.isPaging())
-            	{
-            		//There is a possibility there are paged refs holding the message so we need to do a conditional delete
-            		psMessage = conn.prepareStatement(getSQLStatement("DELETE_PAGED_MESSAGE"));
-               	
-	            	psMessage.setLong(1, m.getMessageID());
-	            	
-	            	psMessage.setLong(2, m.getMessageID());
-            	}
-            	else
-            	{	               
-	               psMessage = conn.prepareStatement(getSQLStatement("DELETE_MESSAGE"));
-	                           	
-	            	psMessage.setLong(1, m.getMessageID());
-            	}
-                       
-            	rows = executeWithRetry(psMessage);
-            
-            	if (trace) { log.trace("Delete " + rows + " rows"); }
-            }
+            if (trace) { log.trace("Deleted " + rows + " rows"); }                                  
          }
          catch (Exception e)
          {
@@ -1403,17 +1385,16 @@
          finally
          {
          	closeStatement(psReference);
-         	closeStatement(psMessage);
          	closeConnection(conn);
-            try
-            {
+//            try
+//            {
                wrap.end();               
-            }
-            finally
-            {      
-               //release the lock
-               LockMap.instance.releaseLock(m);
-            }
+//            }
+//            finally
+//            {      
+//               //release the lock
+//               LockMap.instance.releaseLock(m);
+//            }
          }      
       }
    }
@@ -1486,10 +1467,10 @@
     * We order the list of references in ascending message order thus preventing deadlock when 2 or
     * more channels are updating the same messages in different transactions.
     */
-   protected void orderReferences(List references)
-   {      
-      Collections.sort(references, MessageOrderComparator.instance);
-   }
+//   protected void orderReferences(List references)
+//   {      
+//      Collections.sort(references, MessageOrderComparator.instance);
+//   }
    
    protected void handleBeforeCommit1PC(List refsToAdd, List refsToRemove, Transaction tx)
       throws Exception
@@ -1498,21 +1479,21 @@
       //       so we will end up acquiring the lock more than once which is unnecessary. If find
       //       unique set of messages can avoid this.
 
-      List allRefs = new ArrayList(refsToAdd.size() + refsToRemove.size());
-
-      for(Iterator i = refsToAdd.iterator(); i.hasNext(); )
-      {
-         ChannelRefPair pair = (ChannelRefPair)i.next();
-         allRefs.add(pair.ref);
-      }
-
-      for(Iterator i = refsToRemove.iterator(); i.hasNext(); )
-      {
-         ChannelRefPair pair = (ChannelRefPair)i.next();
-         allRefs.add(pair.ref);
-      }
-            
-      orderReferences(allRefs);
+//      List allRefs = new ArrayList(refsToAdd.size() + refsToRemove.size());
+//
+//      for(Iterator i = refsToAdd.iterator(); i.hasNext(); )
+//      {
+//         ChannelRefPair pair = (ChannelRefPair)i.next();
+//         allRefs.add(pair.ref);
+//      }
+//
+//      for(Iterator i = refsToRemove.iterator(); i.hasNext(); )
+//      {
+//         ChannelRefPair pair = (ChannelRefPair)i.next();
+//         allRefs.add(pair.ref);
+//      }
+//            
+//      orderReferences(allRefs);
       
       // For one phase we simply add rows corresponding to the refs and remove rows corresponding to
       // the deliveries in one jdbc tx. We also need to store or remove messages as necessary,
@@ -1521,7 +1502,6 @@
       Connection conn = null;
       PreparedStatement psReference = null;
       PreparedStatement psInsertMessage = null;
-      PreparedStatement psDeleteMessage = null;
       TransactionWrapper wrap = new TransactionWrapper();
       
       try
@@ -1529,7 +1509,7 @@
          conn = ds.getConnection();
          
          // Obtain locks on all messages
-         getLocks(allRefs);
+        // getLocks(allRefs);
          
          // First the adds
 
@@ -1563,34 +1543,37 @@
                psReference = null;
             }
             
-            Message m = ref.getMessage();        
+            Message m = ref.getMessage();    
             
-            if (!m.isPersisted())
-            {   
-               if (batch && psInsertMessage == null || !batch)
-               {
-               	psInsertMessage = conn.prepareStatement(getSQLStatement("INSERT_MESSAGE"));               	
-               }
-            	
-               // First time so add message
-               storeMessage(m, psInsertMessage);
-               m.setPersisted(true);
-
-	            if (batch)
-	            {
-                  psInsertMessage.addBatch();
-                  if (trace) { log.trace("Message does not already exist so inserting it"); }
-                  messageInsertsInBatch = true;	               
+            synchronized (m)
+            {            
+	            if (!m.isPersisted())
+	            {   
+	               if (batch && psInsertMessage == null || !batch)
+	               {
+	               	psInsertMessage = conn.prepareStatement(getSQLStatement("INSERT_MESSAGE"));               	
+	               }
+	            	
+	               // First time so add message
+	               storeMessage(m, psInsertMessage);
+	               m.setPersisted(true);
+	
+		            if (batch)
+		            {
+	                  psInsertMessage.addBatch();
+	                  if (trace) { log.trace("Message does not already exist so inserting it"); }
+	                  messageInsertsInBatch = true;	               
+		            }
+		            else
+		            {
+	                  if (trace) { log.trace("Message does not already exist so inserting it"); }
+	                  int rows = executeWithRetry(psInsertMessage);
+	                  if (trace) { log.trace("Inserted " + rows + " rows"); }
+		               
+		               psInsertMessage.close();
+		               psInsertMessage = null;
+		            }
 	            }
-	            else
-	            {
-                  if (trace) { log.trace("Message does not already exist so inserting it"); }
-                  int rows = executeWithRetry(psInsertMessage);
-                  if (trace) { log.trace("Inserted " + rows + " rows"); }
-	               
-	               psInsertMessage.close();
-	               psInsertMessage = null;
-	            }
             }
          }         
          
@@ -1616,14 +1599,9 @@
 
          // Now the removes
 
-         psReference = null;
-         psDeleteMessage = null;
+         psReference = null;        
          batch = usingBatchUpdates && refsToRemove.size() > 0;
          
-         boolean messageDeletesInBatch = false;
-         
-         boolean pagng = this.isPaging();
-         
          for (Iterator i = refsToRemove.iterator(); i.hasNext(); )
          {
             ChannelRefPair pair = (ChannelRefPair)i.next();
@@ -1645,51 +1623,7 @@
                if (trace) { log.trace("Deleted " + rows + " rows"); }
                psReference.close();
                psReference = null;
-            }
-                 
-            Message m = pair.ref.getMessage();
-            
-            m.decrementPersistentCount();
-            
-            if (m.getPersistentCount() == 0)
-            {	                        		           
-	            // Delete the message (if necessary)
-	                            
-	            if (batch && psDeleteMessage == null || !batch)
-	            {
-	            	if (pagng)
-	            	{
-	            		//Need to do conditional delete - ref might still exist for message
-	            		psDeleteMessage = conn.prepareStatement(getSQLStatement("DELETE_PAGED_MESSAGE"));
-	            	}
-	            	else
-	            	{
-	            		psDeleteMessage = conn.prepareStatement(getSQLStatement("DELETE_MESSAGE"));
-	            	}	            		            		   	              
-	            }
-	
-	            psDeleteMessage.setLong(1, m.getMessageID());
-	            
-	            if (pagng)
-	            {
-	            	psDeleteMessage.setLong(2, m.getMessageID());
-	            }
-	                       
-	            if (batch)
-	            {
-	               psDeleteMessage.addBatch();
-	               
-	               messageDeletesInBatch = true;
-	            }
-	            else
-	            {
-	               int rows = executeWithRetry(psDeleteMessage);
-	               if (trace) { log.trace("Deleted " + rows + " rows"); }
-	
-	               psDeleteMessage.close();
-	               psDeleteMessage = null;
-	            }
-            }
+            }                
          }
          
          if (batch)
@@ -1699,17 +1633,7 @@
             int[] rows = executeWithRetryBatch(psReference);
             
             if (trace) { logBatchUpdate(getSQLStatement("DELETE_MESSAGE_REF"), rows, "deleted"); }
-            
-            if (messageDeletesInBatch)
-            {	            
-	            rows = executeWithRetryBatch(psDeleteMessage);
-	            
-	            if (trace) { logBatchUpdate(getSQLStatement("DELETE_MESSAGE"), rows, "deleted"); }
-	            
-	            psDeleteMessage.close();
-	            psDeleteMessage = null;
-            }
-
+                       
             psReference.close();
             psReference = null;           
          }
@@ -1723,17 +1647,16 @@
       {
       	closeStatement(psReference);
       	closeStatement(psInsertMessage);
-      	closeStatement(psDeleteMessage);
       	closeConnection(conn);        
-         try
-         {
+        // try
+        // {
             wrap.end();                        
-         }
-         finally
-         {  
-            //Release the locks
-            this.releaseLocks(allRefs);
-         }
+//         }
+//         finally
+//         {  
+//            //Release the locks
+//            this.releaseLocks(allRefs);
+//         }
       }
    }
    
@@ -1741,101 +1664,34 @@
       throws Exception
    {          
       Connection conn = null;
-      PreparedStatement psDeleteMessage = null;
+     
       TransactionWrapper wrap = new TransactionWrapper();
       
-      List refs = new ArrayList(refsToRemove.size());
-      Iterator iter = refsToRemove.iterator();
-      while (iter.hasNext())
-      {
-         ChannelRefPair pair = (ChannelRefPair)iter.next();
-         refs.add(pair.ref);
-      }
-            
-      orderReferences(refs);      
+//      List refs = new ArrayList(refsToRemove.size());
+//      Iterator iter = refsToRemove.iterator();
+//      while (iter.hasNext())
+//      {
+//         ChannelRefPair pair = (ChannelRefPair)iter.next();
+//         refs.add(pair.ref);
+//      }
+//            
+//      orderReferences(refs);      
       
       try
       {
          //get locks on all the refs
-         this.getLocks(refs);
+      //   this.getLocks(refs);
          
          conn = ds.getConnection();
                   
          //2PC commit
          
-         //First we commit any refs in state "+" to "C" and delete any
+         //We commit any refs in state "+" to "C" and delete any
          //refs in state "-", then we
          //remove any messages due to refs we just removed
          //if they're not referenced elsewhere
          
-         commitPreparedTransaction(tx, conn);
-         
-         boolean batch = usingBatchUpdates && refsToRemove.size() > 0;
-
-         iter = refsToRemove.iterator();
-         
-         boolean messageDeletesInBatch = false;
-         
-         boolean pagng = this.isPaging();
-         
-         while (iter.hasNext())
-         {
-            ChannelRefPair pair = (ChannelRefPair) iter.next();
-            
-            MessageReference ref = pair.ref;
-            
-            Message m = ref.getMessage();
-            
-            m.decrementPersistentCount();
-                  
-            if (m.getPersistentCount() == 0)
-            {
-               if (batch && psDeleteMessage == null || !batch)
-               {
-               	if (pagng)
-               	{
-               		psDeleteMessage = conn.prepareStatement(getSQLStatement("DELETE_PAGED_MESSAGE"));
-               	}
-               	else
-               	{
-               		psDeleteMessage = conn.prepareStatement(getSQLStatement("DELETE_MESSAGE"));
-               	}
-               }
-	                                   
-	            psDeleteMessage.setLong(1, m.getMessageID());
-	            
-	            if (pagng)
-	            {
-	            	psDeleteMessage.setLong(2, m.getMessageID());	
-	            }
-
-	            if (batch)
-	            {
-	               psDeleteMessage.addBatch(); 
-	               
-	               messageDeletesInBatch = true;
-	            }
-	            else
-	            {
-	               int rows = executeWithRetry(psDeleteMessage);
-	               
-	               if (trace) { log.trace("Deleted " + rows + " rows"); }
-	               
-	               psDeleteMessage.close();
-	               psDeleteMessage = null;
-	            }
-            }
-         }         
-         
-         if (batch && messageDeletesInBatch)
-         {
-            int[] rows = executeWithRetryBatch(psDeleteMessage);
-            
-            if (trace) { logBatchUpdate(getSQLStatement("DELETE_MESSAGE"), rows, "deleted"); }
-            
-            psDeleteMessage.close();
-            psDeleteMessage = null;                           
-         }         
+         commitPreparedTransaction(tx, conn);                      
       }
       catch (Exception e)
       {
@@ -1844,35 +1700,34 @@
       }
       finally
       {
-      	closeStatement(psDeleteMessage);
       	closeConnection(conn);        
-         try
-         {
+//         try
+//         {
             wrap.end();
-         }
-         finally
-         {
-            //release the locks
-            this.releaseLocks(refs);
-         }
+//         }
+//         finally
+//         {
+//            //release the locks
+//            this.releaseLocks(refs);
+//         }
       }
    }
    
    protected void handleBeforePrepare(List refsToAdd, List refsToRemove, Transaction tx) throws Exception
    {
       //We only need to lock on the adds
-      List refs = new ArrayList(refsToAdd.size());
+//      List refs = new ArrayList(refsToAdd.size());
+//      
+//      Iterator iter = refsToAdd.iterator();
+//      while (iter.hasNext())
+//      {
+//         ChannelRefPair pair = (ChannelRefPair)iter.next();
+//         
+//         refs.add(pair.ref);
+//      }
+//      
+//      orderReferences(refs);
       
-      Iterator iter = refsToAdd.iterator();
-      while (iter.hasNext())
-      {
-         ChannelRefPair pair = (ChannelRefPair)iter.next();
-         
-         refs.add(pair.ref);
-      }
-      
-      orderReferences(refs);
-      
       //We insert a tx record and
       //a row for each ref with +
       //and update the row for each delivery with "-"
@@ -1885,7 +1740,7 @@
       try
       {
          //get the locks
-         getLocks(refs);
+        // getLocks(refs);
          
          conn = ds.getConnection();
          
@@ -1895,7 +1750,7 @@
             addTXRecord(conn, tx);
          }
          
-         iter = refsToAdd.iterator();
+         Iterator iter = refsToAdd.iterator();
          boolean batch = usingBatchUpdates && refsToAdd.size() > 1;
          boolean messageInsertsInBatch = false;
 
@@ -1924,35 +1779,39 @@
                psReference = null;
             }
                        
-            Message m = pair.ref.getMessage();            
+            Message m = pair.ref.getMessage(); 
             
-            if (!m.isPersisted())
-            {	            	
-	            if (batch && psInsertMessage == null || !batch)
-	            {
-	            	psInsertMessage = conn.prepareStatement(getSQLStatement("INSERT_MESSAGE"));	            	
-	            }
+            synchronized (m)
+            {
+            
+	            if (!m.isPersisted())
+	            {	            	
+		            if (batch && psInsertMessage == null || !batch)
+		            {
+		            	psInsertMessage = conn.prepareStatement(getSQLStatement("INSERT_MESSAGE"));	            	
+		            }
+		
+		            storeMessage(m, psInsertMessage);               
+		            
+		            m.setPersisted(true);
 	
-	            storeMessage(m, psInsertMessage);               
-	            
-	            m.setPersisted(true);
-
-	            if (batch)
-	            {
-	            	psInsertMessage.addBatch();
-	            	
-	            	messageInsertsInBatch = true;
+		            if (batch)
+		            {
+		            	psInsertMessage.addBatch();
+		            	
+		            	messageInsertsInBatch = true;
+		            }
+		            else
+		            {
+		            	int rows = executeWithRetry(psInsertMessage);
+	
+		            	if (trace) { log.trace("Inserted " + rows + " rows"); }
+	
+		            	psInsertMessage.close();
+		            	
+		            	psInsertMessage = null;
+		            }
 	            }
-	            else
-	            {
-	            	int rows = executeWithRetry(psInsertMessage);
-
-	            	if (trace) { log.trace("Inserted " + rows + " rows"); }
-
-	            	psInsertMessage.close();
-	            	
-	            	psInsertMessage = null;
-	            }
             }
          }         
          
@@ -2032,16 +1891,16 @@
       	closeStatement(psReference);
       	closeStatement(psInsertMessage);
       	closeConnection(conn);         
-         try
-         {
+//         try
+//         {
             wrap.end();            
-         }
-         finally
-         {
+//         }
+//         finally
+//         {
             //release the locks
             
-            this.releaseLocks(refs);
-         }
+           // this.releaseLocks(refs);
+       //  }
       }
    }
    
@@ -2050,95 +1909,28 @@
       //remove refs marked with +
       //and update rows marked with - to C
             
-      PreparedStatement psDeleteMessage = null;
       Connection conn = null;
       TransactionWrapper wrap = new TransactionWrapper();
       
-      List refs = new ArrayList(refsToAdd.size());
+//      List refs = new ArrayList(refsToAdd.size());
+//      
+//      Iterator iter = refsToAdd.iterator();
+//      
+//      while (iter.hasNext())
+//      {
+//         ChannelRefPair pair = (ChannelRefPair)iter.next();
+//         refs.add(pair.ref);
+//      }
+//      
+//      orderReferences(refs);
       
-      Iterator iter = refsToAdd.iterator();
-      
-      while (iter.hasNext())
-      {
-         ChannelRefPair pair = (ChannelRefPair)iter.next();
-         refs.add(pair.ref);
-      }
-      
-      orderReferences(refs);
-      
       try
       {
-         this.getLocks(refs);
+       //  this.getLocks(refs);
          
          conn = ds.getConnection();
          
-         rollbackPreparedTransaction(tx, conn);
-         
-         iter = refsToAdd.iterator();
-         
-         boolean batch = usingBatchUpdates && refsToAdd.size() > 1;
-         
-         boolean messageDeletesInBatch = false;
-         
-         boolean pagng = this.isPaging();
-         
-         while (iter.hasNext())
-         {
-            ChannelRefPair pair = (ChannelRefPair) iter.next();
-            
-            Message m = pair.ref.getMessage();       
-            
-            m.decrementPersistentCount();
-            
-            if (m.getPersistentCount() == 0)
-            {            	
-	            if (batch && psDeleteMessage == null || !batch)
-	            {	
-	            	if (pagng)
-	            	{
-	            		psDeleteMessage = conn.prepareStatement(getSQLStatement("DELETE_PAGED_MESSAGE"));
-	            	}
-	            	else
-	            	{	            			            	
-	            		psDeleteMessage = conn.prepareStatement(getSQLStatement("DELETE_MESSAGE"));
-	            	}
-	            }
-	            
-	                                     
-	            psDeleteMessage.setLong(1, m.getMessageID());
-	            
-	            if (pagng)
-	            {
-	            	psDeleteMessage.setLong(2, m.getMessageID());
-	            }
-	                        
-	            if (batch)
-	            {
-	               psDeleteMessage.addBatch();
-	               
-	               messageDeletesInBatch = true;
-	            }
-	            else
-	            {
-	               int rows = executeWithRetry(psDeleteMessage);
-	               
-	               if (trace) { log.trace("deleted " + rows + " rows"); }
-	               
-	               psDeleteMessage.close();
-	               psDeleteMessage = null;
-	            } 
-            }
-         }
-         
-         if (batch && messageDeletesInBatch)
-         {
-            int[] rows = executeWithRetryBatch(psDeleteMessage);
-            
-            if (trace) { logBatchUpdate(getSQLStatement("DELETE_MESSAGE"), rows, "deleted"); }
-            
-            psDeleteMessage.close();
-            psDeleteMessage = null;         
-         }
+         rollbackPreparedTransaction(tx, conn);         
       }
       catch (Exception e)
       {
@@ -2146,18 +1938,17 @@
          throw e;
       }
       finally
-      {
-      	closeStatement(psDeleteMessage);    	
+      { 	
       	closeConnection(conn);                
-         try
-         {
+//         try
+//         {
             wrap.end();
-         }
-         finally
-         {
-            //release locks
-            this.releaseLocks(refs);
-         }
+//         }
+//         finally
+//         {
+//            //release locks
+//            this.releaseLocks(refs);
+//         }
       }      
    }
    
@@ -2583,28 +2374,28 @@
       }
    }
    
-   protected void getLocks(List refs)
-   {
-      Iterator iter = refs.iterator();
-      while (iter.hasNext())
-      {
-         MessageReference ref = (MessageReference)iter.next();
-         Message m = ref.getMessage();
-         LockMap.instance.obtainLock(m);        
-      }
-   }
+//   protected void getLocks(List refs)
+//   {
+//      Iterator iter = refs.iterator();
+//      while (iter.hasNext())
+//      {
+//         MessageReference ref = (MessageReference)iter.next();
+//         Message m = ref.getMessage();
+//         LockMap.instance.obtainLock(m);        
+//      }
+//   }
+//   
+//   protected void releaseLocks(List refs)
+//   {
+//      Iterator iter = refs.iterator();
+//      while (iter.hasNext())
+//      {
+//         MessageReference ref = (MessageReference)iter.next();
+//         Message m = ref.getMessage();
+//         LockMap.instance.releaseLock(m);         
+//      }
+//   }
    
-   protected void releaseLocks(List refs)
-   {
-      Iterator iter = refs.iterator();
-      while (iter.hasNext())
-      {
-         MessageReference ref = (MessageReference)iter.next();
-         Message m = ref.getMessage();
-         LockMap.instance.releaseLock(m);         
-      }
-   }
-   
    protected void logBatchUpdate(String name, int[] rows, String action)
    {
       int count = 0;
@@ -2617,12 +2408,17 @@
    
    protected int executeWithRetry(PreparedStatement ps) throws Exception
    {
-      return executeWithRetry(ps, false)[0];
+      return executeWithRetry(ps, false, false)[0];
    }
    
+   protected int executeWithRetryIgnoreKeyViolation(PreparedStatement ps) throws Exception
+   {
+      return executeWithRetry(ps, false, true)[0]; // batch=false, ignoreKeyViolation=true
+   }
+   
    protected int[] executeWithRetryBatch(PreparedStatement ps) throws Exception
    {
-      return executeWithRetry(ps, true);
+      return executeWithRetry(ps, true, false);
    }
    
    //PersistentServiceSupport overrides ----------------------------
@@ -2639,13 +2435,14 @@
       map.put("CREATE_IDX_MESSAGE_REF_ORD", "CREATE INDEX JBM_MSG_REF_ORD ON JBM_MSG_REF (ORD)");
       map.put("CREATE_IDX_MESSAGE_REF_PAGE_ORD", "CREATE INDEX JBM_MSG_REF__PAGE_ORD ON JBM_MSG_REF (PAGE_ORD)");
       map.put("CREATE_IDX_MESSAGE_REF_MESSAGE_ID", "CREATE INDEX JBM_MSG_REF_MESSAGE_ID ON JBM_MSG_REF (MESSAGE_ID)");      
-      map.put("CREATE_IDX_MESSAGE_REF_SCHED_DELIVERY", "CREATE INDEX JBM_MSG_REF_SCHED_DELIVERY ON JBM_MSG_REF (SCHED_DELIVERY)");
+      map.put("CREATE_IDX_MESSAGE_REF_SCHED_DELIVERY", "CREATE INDEX JBM_MSG_REF_SCHED_DELIVERY ON JBM_MSG_REF (SCHED_DELIVERY)");      
       //Message
       map.put("CREATE_MESSAGE",
               "CREATE TABLE JBM_MSG (MESSAGE_ID BIGINT, RELIABLE CHAR(1), " +
               "EXPIRATION BIGINT, TIMESTAMP BIGINT, PRIORITY TINYINT, HEADERS LONGVARBINARY, " +
-              "PAYLOAD LONGVARBINARY, TYPE TINYINT, PAGED CHAR(1) " +
+              "PAYLOAD LONGVARBINARY, TYPE TINYINT " +
               "PRIMARY KEY (MESSAGE_ID))"); 
+      map.put("CREATE_IDX_MESSAGE_TIMESTAMP", "CREATE INDEX JBM_MSG_REF_TIMESTAMP ON JBM_MSG (TIMESTAMP)");
       //Transaction
       map.put("CREATE_TRANSACTION",
               "CREATE TABLE JBM_TX (" +
@@ -2691,16 +2488,15 @@
       //Message
       map.put("LOAD_MESSAGES",
               "SELECT MESSAGE_ID, RELIABLE, EXPIRATION, TIMESTAMP, " +
-              "PRIORITY, HEADERS, PAYLOAD, TYPE, PAGED " +
+              "PRIORITY, HEADERS, PAYLOAD, TYPE " +
               "FROM JBM_MSG");
       map.put("INSERT_MESSAGE",
               "INSERT INTO JBM_MSG (MESSAGE_ID, RELIABLE, EXPIRATION, " +
-              "TIMESTAMP, PRIORITY, HEADERS, PAYLOAD, TYPE, PAGED) " +           
-              "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)" );
-      map.put("DELETE_MESSAGE", "DELETE FROM JBM_MSG WHERE MESSAGE_ID=?");
-      map.put("DELETE_PAGED_MESSAGE", "DELETE FROM JBM_MSG WHERE MESSAGE_ID=? AND NOT EXISTS (SELECT * FROM JBM_MSG_REF WHERE MESSAGE_ID = ?)");
+              "TIMESTAMP, PRIORITY, HEADERS, PAYLOAD, TYPE) " +           
+              "VALUES (?, ?, ?, ?, ?, ?, ?, ?)" );
       map.put("MESSAGE_ID_COLUMN", "MESSAGE_ID");
       map.put("MESSAGE_EXISTS", "SELECT MESSAGE_ID FROM JBM_MSG WHERE MESSAGE_ID = ?");
+      map.put("REAP_MESSAGES", "DELETE FROM JBM_MSG WHERE TIMESTAMP <= ? AND NOT EXISTS (SELECT * FROM JBM_MSG_REF WHERE JBM_MSG_REF.MESSAGE_ID = JBM_MSG.MESSAGE_ID)");
       //Transaction
       map.put("INSERT_TRANSACTION",
               "INSERT INTO JBM_TX (NODE_ID, TRANSACTION_ID, BRANCH_QUAL, FORMAT_ID, GLOBAL_TXID) " +
@@ -2725,7 +2521,7 @@
    
 
    
-   private int[] executeWithRetry(PreparedStatement ps, boolean batch) throws Exception
+   private int[] executeWithRetry(PreparedStatement ps, boolean batch, boolean ignoreKeyViolation) throws Exception
    {
       final int MAX_TRIES = 25;      
       
@@ -2745,7 +2541,22 @@
             }
             else
             {
-               rows = ps.executeUpdate();
+            	try
+            	{
+            		rows = ps.executeUpdate();
+            	}
+            	catch (SQLException e)
+            	{
+            		if (ignoreKeyViolation && e.getSQLState().equals("23000"))
+            		{
+            			//Key violation - ignore
+            			log.info("Got key violation - but ignoring");
+            		}
+            		else
+            		{
+            			throw e;
+            		}
+            	}
             }
             
             if (tries > 0)
@@ -2757,6 +2568,9 @@
          catch (SQLException e)
          {
             log.warn("SQLException caught - assuming deadlock detected, try:" + (tries + 1), e);
+            
+            log.info("SQLState:" + e.getSQLState() + " code:" + e.getErrorCode());
+            
             tries++;
             if (tries == MAX_TRIES)
             {
@@ -2918,9 +2732,80 @@
       
       return order;
    }
-
+   
+   /**
+    * Deletes unreferenced messages by running the REAP_MESSAGES statement in its own
+    * TransactionWrapper. By default that statement removes JBM_MSG rows whose TIMESTAMP is
+    * at or before the cutoff and that no JBM_MSG_REF row points at.
+    *
+    * @param timestamp cutoff bound to the statement's single parameter
+    */
+   private void reapUnreferencedMessages(long timestamp) throws Exception
+   {
+      Connection conn = null;
+      PreparedStatement ps = null;
+      TransactionWrapper wrap = new TransactionWrapper();
+
+      // Stays -1 unless the delete completes, so a failed run is never logged as a count
+      int rows = -1;
+      long start = System.currentTimeMillis();
+
+      try
+      {
+         conn = ds.getConnection();
+         ps = conn.prepareStatement(getSQLStatement("REAP_MESSAGES"));
+         ps.setLong(1, timestamp);
+         rows = ps.executeUpdate();
+      }
+      catch (Exception e)
+      {
+         wrap.exceptionOccurred();
+         throw e;
+      }
+      finally
+      {
+         closeStatement(ps);
+         closeConnection(conn);
+         wrap.end();
+
+         if (trace && rows >= 0) { log.trace("Reaper reaped " + rows + " messages in " + (System.currentTimeMillis() - start) + " ms"); }
+      }
+   }
+  
+   
    // Inner classes -------------------------------------------------
-        
+            
+   //TimerTask that deletes unreferenced messages each time it runs
+   private class Reaper extends TimerTask
+   {
+      //Raised by doCancel(); run() checks it before doing any work
+      private boolean cancel;
+
+      //Reaps with a cutoff one reaperPeriod in the past unless cancellation was requested
+      public synchronized void run()
+      {
+         if (!cancel)
+         {
+            try
+            {
+               reapUnreferencedMessages(System.currentTimeMillis() - reaperPeriod);
+            }
+            catch (Exception e)
+            {
+               log.error("Failed to reap", e);
+            }
+         }
+         else { cancel(); }
+      }
+
+      //Sets the flag and cancels the task; synchronized on the same monitor as run()
+      public synchronized void doCancel()
+      {
+         cancel = true;
+         cancel();
+      }
+   }
+   
    private static class ChannelRefPair
    {
       private long channelID;

Modified: trunk/src/main/org/jboss/messaging/core/impl/PagingChannelSupport.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/impl/PagingChannelSupport.java	2007-08-21 16:24:12 UTC (rev 3021)
+++ trunk/src/main/org/jboss/messaging/core/impl/PagingChannelSupport.java	2007-08-21 21:07:50 UTC (rev 3022)
@@ -231,8 +231,6 @@
          
          paging = false;
          
-         pm.setPaging(channelID, false);         
-         
          firstPagingOrder = nextPagingOrder = 0;  
          
          clearAllScheduledDeliveries();
@@ -305,9 +303,7 @@
          
          if (messageRefs.size() != fullSize)
          {
-            paging = false;
-            
-            pm.setPaging(channelID, false);            
+            paging = false;    
          }
       }    
    }
@@ -367,8 +363,6 @@
       {
          paging = false;
          
-         pm.setPaging(channelID, false);
-         
          return false;
       }
    }
@@ -388,9 +382,7 @@
             // We are full in memory - go into paging mode
             if (trace) { log.trace(this + " going into paging mode"); }
 
-            paging = true;
-            
-            pm.setPaging(channelID, true);            
+            paging = true;     
          }
       }      
    }
@@ -497,8 +489,6 @@
          paging = false;
       }
             
-      pm.setPaging(channelID, paging);     
-      
       Map refMap = processReferences(ili.getRefInfos());
       
       Iterator iter = ili.getRefInfos().iterator();
@@ -523,9 +513,7 @@
       ref.setPagingOrder(-1);
       
       ref.setScheduledDeliveryTime(info.getScheduledDelivery());
-      
-      ref.getMessage().incrementPersistentCount();
-      
+         
       //Schedule the delivery if necessary, or just add to the in memory queue
       if (!checkAndSchedule(ref))
       {

Modified: trunk/src/main/org/jboss/messaging/core/impl/message/MessageSupport.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/impl/message/MessageSupport.java	2007-08-21 16:24:12 UTC (rev 3021)
+++ trunk/src/main/org/jboss/messaging/core/impl/message/MessageSupport.java	2007-08-21 21:07:50 UTC (rev 3022)
@@ -76,8 +76,6 @@
 	// Must be hidden from subclasses
 	private byte[] payloadAsByteArray;
 	
-	private transient volatile int persistentCount;
-	
 	private transient volatile boolean persisted;
 
 	// Constructors --------------------------------------------------
@@ -316,26 +314,6 @@
 		return new SimpleMessageReference(this);
 	}
 	
-	public int getPersistentCount()
-	{
-		return persistentCount;
-	}
-	
-	public void setPersistentCount(int count)
-	{
-		persistentCount = count;
-	}
-	
-	public void decrementPersistentCount()
-	{
-		persistentCount--;
-	}
-	
-	public void incrementPersistentCount()
-	{
-		persistentCount++;
-	}
-	
 	public boolean isPersisted()
 	{
 		return persisted;

Modified: trunk/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java	2007-08-21 16:24:12 UTC (rev 3021)
+++ trunk/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java	2007-08-21 21:07:50 UTC (rev 3022)
@@ -348,7 +348,13 @@
 	      put(Replicator.JVM_ID_KEY, JMSClientVMIdentifier.instance);
 	      
 	      groupMember.multicastControl(new JoinClusterRequest(thisNodeID, info), true);
+	      
+	      checkStartReaper();
       }
+      else
+      {
+      	pm.startReaper();
+      }
    
       //Now load the bindings for this node
       
@@ -837,15 +843,25 @@
     */
    public void nodeJoined(Address address) throws Exception
    {
-      log.debug(this + ": " + address + " joined");
-      
-      // Currently does nothing
+      log.debug(this + ": " + address + " joined");      
    }
    
+   private void checkStartReaper()
+   {
+      //Start the message reaper only when this node is the sole member of the current view
+      boolean onlyMember = groupMember.getCurrentView().size() == 1;
+      if (onlyMember)
+      {
+         pm.startReaper();
+      }
+   }
+   
    public void nodesLeft(List addresses) throws Throwable
    {
    	if (trace) { log.trace("Nodes left " + addresses.size()); }
    	
+   	checkStartReaper();
+   	
    	Map oldFailoverMap = new HashMap(this.failoverMap);
    	
    	int oldFailoverNodeID = failoverNodeID;
@@ -2090,14 +2106,6 @@
          		startedTx = true;
          	}
          	
-         	//We set the persistent count to be the same as the localReliableCount
-         	//Note that we MUST set the persistent count before routing to any of the queues
-         	//if we only set it when we actually persist in a channel then we could have the situation where
-         	//a ref arrives in a subscription then gets acknowledged and removed before hitting the next sub
-         	//so we would end up with a churn where the message is getting added and removed multiple times for
-         	//a single route
-         	ref.getMessage().setPersistentCount(localReliableCount);
-         	
          	//Now actually route the ref
          	
          	iter = targets.iterator();

Modified: trunk/src/main/org/jboss/messaging/core/impl/tx/TransactionRepository.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/impl/tx/TransactionRepository.java	2007-08-21 16:24:12 UTC (rev 3021)
+++ trunk/src/main/org/jboss/messaging/core/impl/tx/TransactionRepository.java	2007-08-21 21:07:50 UTC (rev 3022)
@@ -307,8 +307,6 @@
          
          MessageReference ref = messageStore.reference(msg);  
          
-         ref.getMessage().incrementPersistentCount();
-         
          ref.getMessage().setPersisted(true);
 
          Binding binding = postOffice.getBindingForChannelID(channelID);
@@ -368,8 +366,6 @@
          
          ref = messageStore.reference(msg);    
          
-         ref.getMessage().incrementPersistentCount();
-         
          ref.getMessage().setPersisted(true);
 
          Binding binding = postOffice.getBindingForChannelID(channelID);

Modified: trunk/src/main/org/jboss/messaging/util/LockMap.java
===================================================================
--- trunk/src/main/org/jboss/messaging/util/LockMap.java	2007-08-21 16:24:12 UTC (rev 3021)
+++ trunk/src/main/org/jboss/messaging/util/LockMap.java	2007-08-21 21:07:50 UTC (rev 3022)
@@ -56,47 +56,47 @@
       this.map = new ConcurrentHashMap();
    }
    
-   public void obtainLock(Object obj)
-   {      
-      Entry entry = null;
-      synchronized (obj)
-      {
-         entry = (Entry)map.get(obj);
-         if (entry == null)
-         {
-            entry = new Entry();
-            map.put(obj, entry);
-         }        
-         entry.refCount++;
-      }
-      try
-      {
-         entry.lock.acquire();
-      }
-      catch (InterruptedException e)
-      {
-         throw new IllegalStateException("Thread interrupted while acquiring lock");
-      }
-   }
+//   public void obtainLock(Object obj)
+//   {      
+//      Entry entry = null;
+//      synchronized (obj)
+//      {
+//         entry = (Entry)map.get(obj);
+//         if (entry == null)
+//         {
+//            entry = new Entry();
+//            map.put(obj, entry);
+//         }        
+//         entry.refCount++;
+//      }
+//      try
+//      {
+//         entry.lock.acquire();
+//      }
+//      catch (InterruptedException e)
+//      {
+//         throw new IllegalStateException("Thread interrupted while acquiring lock");
+//      }
+//   }
+//   
+//   public void releaseLock(Object obj)
+//   {
+//      synchronized (obj)
+//      {
+//         Entry entry = (Entry)map.get(obj);
+//         if (entry == null)
+//         {
+//            throw new IllegalArgumentException("Cannot find mutex in map for " + obj);
+//         }    
+//         if (entry.refCount == 1)
+//         {
+//            map.remove(obj);
+//         }
+//         entry.refCount--;
+//         entry.lock.release();         
+//      }      
+//   }
    
-   public void releaseLock(Object obj)
-   {
-      synchronized (obj)
-      {
-         Entry entry = (Entry)map.get(obj);
-         if (entry == null)
-         {
-            throw new IllegalArgumentException("Cannot find mutex in map for " + obj);
-         }    
-         if (entry.refCount == 1)
-         {
-            map.remove(obj);
-         }
-         entry.refCount--;
-         entry.lock.release();         
-      }      
-   }
-   
    public int getSize()
    {
       return map.size();

Modified: trunk/tests/src/org/jboss/test/messaging/core/JDBCPersistenceManagerTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/core/JDBCPersistenceManagerTest.java	2007-08-21 16:24:12 UTC (rev 3021)
+++ trunk/tests/src/org/jboss/test/messaging/core/JDBCPersistenceManagerTest.java	2007-08-21 21:07:50 UTC (rev 3022)
@@ -139,9 +139,6 @@
          Message m1 = messages[i * 2];
          Message m2 = messages[i * 2 + 1];
          
-         m1.setPersistentCount(2);
-         m2.setPersistentCount(2);
-         
          MessageReference ref1_1 = ms.reference(m1);
          MessageReference ref1_2 = ms.reference(m1);
                 
@@ -168,6 +165,7 @@
          assertTrue(refs.contains(new Long(m1.getMessageID())));
          assertTrue(refs.contains(new Long(m2.getMessageID())));
          
+         pm.reapUnreferencedMessages();
          List msgs = getMessageIds();
          assertNotNull(msgs);
          assertEquals(2, msgs.size());
@@ -187,6 +185,7 @@
          assertTrue(refs.contains(new Long(m1.getMessageID())));
          assertTrue(refs.contains(new Long(m2.getMessageID())));
          
+         pm.reapUnreferencedMessages();
          msgs = getMessageIds();
          assertNotNull(msgs);
          assertEquals(2, msgs.size());
@@ -205,6 +204,7 @@
          assertEquals(1, refs.size());         
          assertTrue(refs.contains(new Long(m2.getMessageID())));
          
+         pm.reapUnreferencedMessages();
          msgs = getMessageIds();
          assertNotNull(msgs);
          assertEquals(1, msgs.size()); 
@@ -221,6 +221,7 @@
          assertEquals(1, refs.size());         
          assertTrue(refs.contains(new Long(m2.getMessageID())));
          
+         pm.reapUnreferencedMessages();
          msgs = getMessageIds();
          assertNotNull(msgs);
          assertEquals(1, msgs.size());
@@ -236,6 +237,7 @@
          assertNotNull(refs);
          assertTrue(refs.isEmpty());
          
+         pm.reapUnreferencedMessages();
          msgs = getMessageIds();
          assertNotNull(msgs);
          assertTrue(msgs.isEmpty());
@@ -515,6 +517,7 @@
       assertTrue(refIds.contains(new Long(ref15.getMessage().getMessageID())));
      
       
+      pm.reapUnreferencedMessages();
       List msgs = getMessageIds();
       assertNotNull(msgs);
       assertEquals(10, msgs.size());
@@ -557,6 +560,7 @@
       assertEquals(1, refIds.size());
       assertTrue(refIds.contains(new Long(ref11.getMessage().getMessageID())));
       
+      pm.reapUnreferencedMessages();
       ms = getMessageIds();
 
       assertNotNull(ms);
@@ -591,7 +595,7 @@
       assertTrue(refIds.contains(new Long(ref9.getMessage().getMessageID())));
       assertTrue(refIds.contains(new Long(ref10.getMessage().getMessageID())));
      
-      
+      pm.reapUnreferencedMessages();
       ms = getMessageIds();
         
       assertNotNull(ms);
@@ -620,6 +624,7 @@
       refs.add(ref10);
       pm.removeDepagedReferences(channel1.getChannelID(), refs);
       
+      pm.reapUnreferencedMessages();
       ms = getMessageIds();
       assertNotNull(ms);
       assertEquals(0, ms.size());
@@ -760,6 +765,7 @@
       assertTrue(refIds.contains(new Long(ref9.getMessage().getMessageID())));
       assertTrue(refIds.contains(new Long(ref10.getMessage().getMessageID())));
       
+      pm.reapUnreferencedMessages();
       List msgs = getMessageIds();
       assertNotNull(msgs);
       assertEquals(10, msgs.size());
@@ -909,6 +915,7 @@
       assertTrue(refIds.contains(new Long(ref9.getMessage().getMessageID())));
       assertTrue(refIds.contains(new Long(ref10.getMessage().getMessageID())));
       
+      pm.reapUnreferencedMessages();
       List msgs = getMessageIds();
       assertNotNull(msgs);
       assertEquals(10, msgs.size());
@@ -1304,8 +1311,6 @@
       log.debug("adding references non-transactionally");
 
       // Add first two refs non transactionally
-      ref1.getMessage().incrementPersistentCount();
-      ref2.getMessage().incrementPersistentCount();
       pm.addReference(channel.getChannelID(), ref1, null);
       pm.addReference(channel.getChannelID(), ref2, null);
       
@@ -1316,6 +1321,7 @@
       assertTrue(refs.contains(new Long(ref1.getMessage().getMessageID())));
       assertTrue(refs.contains(new Long(ref2.getMessage().getMessageID())));
       
+      pm.reapUnreferencedMessages();
       List msgs = getMessageIds();
       assertNotNull(msgs);
       assertEquals(2, msgs.size());
@@ -1325,9 +1331,6 @@
       log.debug("ref1 and ref2 are there");
 
       //Add the next 3 refs transactionally
-      ref3.getMessage().incrementPersistentCount();
-      ref4.getMessage().incrementPersistentCount();
-      ref5.getMessage().incrementPersistentCount();
       pm.addReference(channel.getChannelID(), ref3, tx);
       pm.addReference(channel.getChannelID(), ref4, tx);
       pm.addReference(channel.getChannelID(), ref5, tx);
@@ -1343,6 +1346,7 @@
       assertTrue(refs.contains(new Long(ref1.getMessage().getMessageID())));
       assertTrue(refs.contains(new Long(ref2.getMessage().getMessageID())));  
       
+      pm.reapUnreferencedMessages();
       msgs = getMessageIds();
       assertNotNull(msgs);
       assertEquals(2, msgs.size());
@@ -1362,6 +1366,7 @@
       assertTrue(refs.contains(new Long(ref4.getMessage().getMessageID())));  
       assertTrue(refs.contains(new Long(ref5.getMessage().getMessageID())));
       
+      pm.reapUnreferencedMessages();
       msgs = getMessageIds();
       assertNotNull(msgs);
       assertEquals(3, msgs.size());
@@ -1408,8 +1413,6 @@
       MessageReference ref5 = ms.reference(m5);  
 
       //Add first two refs non transactionally
-      ref1.getMessage().incrementPersistentCount();
-      ref2.getMessage().incrementPersistentCount();
       pm.addReference(channel.getChannelID(), ref1, null);
       pm.addReference(channel.getChannelID(), ref2, null);
       
@@ -1420,6 +1423,7 @@
       assertTrue(refs.contains(new Long(ref1.getMessage().getMessageID())));
       assertTrue(refs.contains(new Long(ref2.getMessage().getMessageID())));      
       
+      pm.reapUnreferencedMessages();
       List msgs = getMessageIds();
       assertNotNull(msgs);
       assertEquals(2, msgs.size());
@@ -1429,9 +1433,6 @@
       
       
       //Add the next 3 refs transactionally
-      ref3.getMessage().incrementPersistentCount();
-      ref4.getMessage().incrementPersistentCount();
-      ref5.getMessage().incrementPersistentCount();
       pm.addReference(channel.getChannelID(), ref3, tx);
       pm.addReference(channel.getChannelID(), ref4, tx);
       pm.addReference(channel.getChannelID(), ref5, tx);
@@ -1447,6 +1448,7 @@
       assertTrue(refs.contains(new Long(ref1.getMessage().getMessageID())));
       assertTrue(refs.contains(new Long(ref2.getMessage().getMessageID())));  
       
+      pm.reapUnreferencedMessages();
       msgs = getMessageIds();
       assertNotNull(msgs);
       assertEquals(2, msgs.size());
@@ -1462,6 +1464,7 @@
       assertTrue(refs.contains(new Long(ref1.getMessage().getMessageID())));
       assertTrue(refs.contains(new Long(ref2.getMessage().getMessageID())));  
       
+      pm.reapUnreferencedMessages();
       msgs = getMessageIds();
       assertNotNull(msgs);
       assertEquals(2, msgs.size());

Modified: trunk/tests/src/org/jboss/test/messaging/core/paging/ChannelShare_NP_2PCTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/core/paging/ChannelShare_NP_2PCTest.java	2007-08-21 16:24:12 UTC (rev 3021)
+++ trunk/tests/src/org/jboss/test/messaging/core/paging/ChannelShare_NP_2PCTest.java	2007-08-21 21:07:50 UTC (rev 3022)
@@ -119,6 +119,7 @@
            
       //Msgs
       
+      pm.reapUnreferencedMessages();
       List msgIds = getMessageIds();
       assertEquals(0, msgIds.size()); 
       
@@ -168,6 +169,7 @@
             
       //Msgs
         
+      pm.reapUnreferencedMessages();
       msgIds = getMessageIds();
       assertEquals(25, msgIds.size());
        
@@ -216,6 +218,7 @@
             
       //Msgs
 
+      pm.reapUnreferencedMessages();
       msgIds = getMessageIds();
       assertEquals(50, msgIds.size());
       
@@ -264,6 +267,7 @@
                
       //Msgs
       
+      pm.reapUnreferencedMessages();
       msgIds = getMessageIds();
       assertEquals(100, msgIds.size());
              
@@ -306,6 +310,7 @@
             
       //Msgs
       
+      pm.reapUnreferencedMessages();
       msgIds = getMessageIds();
       assertEquals(0, msgIds.size());
       

Modified: trunk/tests/src/org/jboss/test/messaging/core/paging/ChannelShare_NP_NTTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/core/paging/ChannelShare_NP_NTTest.java	2007-08-21 16:24:12 UTC (rev 3021)
+++ trunk/tests/src/org/jboss/test/messaging/core/paging/ChannelShare_NP_NTTest.java	2007-08-21 21:07:50 UTC (rev 3022)
@@ -115,6 +115,7 @@
             
       //Msgs
       
+      pm.reapUnreferencedMessages();
       List msgIds = getMessageIds();
       assertEquals(0, msgIds.size()); 
       
@@ -161,6 +162,7 @@
             
       //Msgs
       
+      pm.reapUnreferencedMessages();
       msgIds = getMessageIds();
       assertEquals(25, msgIds.size());
       
@@ -207,6 +209,7 @@
             
       //Msgs
       
+      pm.reapUnreferencedMessages();
       msgIds = getMessageIds();
       assertEquals(50, msgIds.size());
       
@@ -254,6 +257,7 @@
             
       //Msgs
       
+      pm.reapUnreferencedMessages();
       msgIds = getMessageIds();
       assertEquals(100, msgIds.size());
       
@@ -296,6 +300,7 @@
             
       //Msgs
       
+      pm.reapUnreferencedMessages();
       msgIds = getMessageIds();
       assertEquals(0, msgIds.size());
       

Modified: trunk/tests/src/org/jboss/test/messaging/core/paging/ChannelShare_NP_TTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/core/paging/ChannelShare_NP_TTest.java	2007-08-21 16:24:12 UTC (rev 3021)
+++ trunk/tests/src/org/jboss/test/messaging/core/paging/ChannelShare_NP_TTest.java	2007-08-21 21:07:50 UTC (rev 3022)
@@ -118,6 +118,7 @@
             
       //Msgs
        
+      pm.reapUnreferencedMessages();
       List msgIds = getMessageIds();
       assertEquals(0, msgIds.size()); 
       
@@ -166,6 +167,7 @@
             
       //Msgs
       
+      pm.reapUnreferencedMessages();
       msgIds = getMessageIds();
       assertEquals(25, msgIds.size());
       
@@ -214,6 +216,7 @@
             
       //Msgs
        
+      pm.reapUnreferencedMessages();
       msgIds = getMessageIds();
       assertEquals(50, msgIds.size());
       
@@ -262,6 +265,7 @@
             
       //Msgs
       
+      pm.reapUnreferencedMessages();
       msgIds = getMessageIds();
       assertEquals(100, msgIds.size());
       
@@ -304,6 +308,7 @@
             
       //Msgs
       
+      pm.reapUnreferencedMessages();
       msgIds = getMessageIds();
       assertEquals(0, msgIds.size());
       

Modified: trunk/tests/src/org/jboss/test/messaging/core/paging/ChannelShare_P_2PCTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/core/paging/ChannelShare_P_2PCTest.java	2007-08-21 16:24:12 UTC (rev 3021)
+++ trunk/tests/src/org/jboss/test/messaging/core/paging/ChannelShare_P_2PCTest.java	2007-08-21 21:07:50 UTC (rev 3022)
@@ -119,6 +119,7 @@
             
       //Msgs
       
+      pm.reapUnreferencedMessages();
       List msgIds = getMessageIds();
       assertEquals(50, msgIds.size()); 
       
@@ -173,6 +174,7 @@
             
       //Msgs
       
+      pm.reapUnreferencedMessages();
       msgIds = getMessageIds();
       assertEquals(75, msgIds.size());
       
@@ -228,6 +230,7 @@
             
       //Msgs
       
+      pm.reapUnreferencedMessages();
       msgIds = getMessageIds();
       assertEquals(100, msgIds.size());
       
@@ -283,6 +286,7 @@
             
       //Msgs
       
+      pm.reapUnreferencedMessages();
       msgIds = getMessageIds();
       assertEquals(150, msgIds.size());
       
@@ -325,6 +329,7 @@
             
       //Msgs
       
+      pm.reapUnreferencedMessages();
       msgIds = getMessageIds();
       assertEquals(0, msgIds.size());
       

Modified: trunk/tests/src/org/jboss/test/messaging/core/paging/ChannelShare_P_NTTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/core/paging/ChannelShare_P_NTTest.java	2007-08-21 16:24:12 UTC (rev 3021)
+++ trunk/tests/src/org/jboss/test/messaging/core/paging/ChannelShare_P_NTTest.java	2007-08-21 21:07:50 UTC (rev 3022)
@@ -121,6 +121,7 @@
             
       //Msgs
       
+      pm.reapUnreferencedMessages();
       List msgIds = getMessageIds();
       assertEquals(50, msgIds.size()); 
       
@@ -173,6 +174,7 @@
             
       //Msgs
       
+      pm.reapUnreferencedMessages();
       msgIds = getMessageIds();
       assertEquals(75, msgIds.size());
       
@@ -227,6 +229,7 @@
             
       //Msgs
       
+      pm.reapUnreferencedMessages();
       msgIds = getMessageIds();
       assertEquals(100, msgIds.size());
       
@@ -280,6 +283,7 @@
             
       //Msgs
         
+      pm.reapUnreferencedMessages();
       msgIds = getMessageIds();
       assertEquals(150, msgIds.size());
       
@@ -322,6 +326,7 @@
             
       //Msgs
       
+      pm.reapUnreferencedMessages();
       msgIds = getMessageIds();
       assertEquals(0, msgIds.size());
       

Modified: trunk/tests/src/org/jboss/test/messaging/core/paging/ChannelShare_P_TTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/core/paging/ChannelShare_P_TTest.java	2007-08-21 16:24:12 UTC (rev 3021)
+++ trunk/tests/src/org/jboss/test/messaging/core/paging/ChannelShare_P_TTest.java	2007-08-21 21:07:50 UTC (rev 3022)
@@ -122,6 +122,7 @@
             
       //Msgs
       
+      pm.reapUnreferencedMessages();
       List msgIds = getMessageIds();
       assertEquals(50, msgIds.size()); 
       
@@ -175,6 +176,7 @@
             
       //Msgs
       
+      pm.reapUnreferencedMessages();
       msgIds = getMessageIds();
       assertEquals(75, msgIds.size());
       
@@ -230,6 +232,7 @@
             
       //Msgs
       
+      pm.reapUnreferencedMessages();
       msgIds = getMessageIds();
       assertEquals(100, msgIds.size());
       
@@ -284,6 +287,7 @@
             
       //Msgs
          
+      pm.reapUnreferencedMessages();
       msgIds = getMessageIds();
       assertEquals(150, msgIds.size());
       
@@ -326,6 +330,7 @@
             
       //Msgs
       
+      pm.reapUnreferencedMessages();
       msgIds = getMessageIds();
       assertEquals(0, msgIds.size());
       

Modified: trunk/tests/src/org/jboss/test/messaging/core/paging/SingleChannel_NP_2PCTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/core/paging/SingleChannel_NP_2PCTest.java	2007-08-21 16:24:12 UTC (rev 3021)
+++ trunk/tests/src/org/jboss/test/messaging/core/paging/SingleChannel_NP_2PCTest.java	2007-08-21 21:07:50 UTC (rev 3022)
@@ -91,7 +91,7 @@
       assertTrue(refIds.isEmpty());
       
       //Verify no msgs in storage
-      List msgIds = getMessageIds();
+      pm.reapUnreferencedMessages(); List msgIds = getMessageIds();
       assertTrue(msgIds.isEmpty());
       
       //Verify 99 refs in queue
@@ -124,7 +124,8 @@
       assertTrue(refIds.isEmpty());
       
       //Verify no msgs in storage
-      msgIds = getMessageIds();
+      pm.reapUnreferencedMessages();
+      pm.reapUnreferencedMessages(); msgIds = getMessageIds();
       assertTrue(msgIds.isEmpty());
        
       //Verify 100 refs in queue
@@ -158,7 +159,7 @@
       assertTrue(refIds.isEmpty());
       
       //Verify no msgs in storage
-      msgIds = getMessageIds();
+       pm.reapUnreferencedMessages(); pm.reapUnreferencedMessages(); msgIds = getMessageIds();
       assertTrue(msgIds.isEmpty());
       
       //Verify 100 refs in queue
@@ -192,7 +193,7 @@
       assertSameIds(refIds, refs, 100, 109);
       
       //Verify 10 msgs in storage
-      msgIds = getMessageIds();
+      pm.reapUnreferencedMessages(); pm.reapUnreferencedMessages(); msgIds = getMessageIds();
       assertEquals(10, msgIds.size()); 
       assertSameIds(msgIds, refs, 100, 109);
       
@@ -226,7 +227,7 @@
       assertSameIds(refIds, refs, 100, 109);
       
       //Verify 10 msgs in storage
-      msgIds = getMessageIds();
+      pm.reapUnreferencedMessages(); msgIds = getMessageIds();
       assertEquals(10, msgIds.size()); 
       assertSameIds(msgIds, refs, 100, 109);
        
@@ -263,7 +264,7 @@
       assertSameIds(refIds, refs, 100, 119);
       
       //Verify 20 msgs in storage
-      msgIds = getMessageIds();
+      pm.reapUnreferencedMessages(); msgIds = getMessageIds();
       assertEquals(20, msgIds.size()); 
       assertSameIds(msgIds, refs, 100, 119);
       
@@ -310,7 +311,7 @@
       assertSameIds(refIds, refs, 100, 129);
       
       //Verify 30 msgs in storage
-      msgIds = getMessageIds();
+      pm.reapUnreferencedMessages(); msgIds = getMessageIds();
       assertEquals(30, msgIds.size()); 
       assertSameIds(msgIds, refs, 100, 129);
       
@@ -348,7 +349,7 @@
       assertSameIds(refIds, refs, 100, 139);
       
       //Verify 40 msgs in storage
-      msgIds = getMessageIds();
+      pm.reapUnreferencedMessages(); msgIds = getMessageIds();
       assertEquals(40, msgIds.size()); 
       assertSameIds(msgIds, refs, 100, 139);
       
@@ -383,7 +384,7 @@
       assertSameIds(refIds, refs, 100, 139);
       
       //Verify 40 msgs in storage
-      msgIds = getMessageIds();
+      pm.reapUnreferencedMessages(); msgIds = getMessageIds();
       assertEquals(40, msgIds.size()); 
       assertSameIds(msgIds, refs, 100, 139);
       
@@ -414,7 +415,7 @@
       assertSameIds(refIds, refs, 100, 139);
       
       //Verify 40 msgs in storage
-      msgIds = getMessageIds();
+      pm.reapUnreferencedMessages(); msgIds = getMessageIds();
       assertEquals(40, msgIds.size()); 
       assertSameIds(msgIds, refs, 100, 139);
       
@@ -448,7 +449,7 @@
       assertSameIds(refIds, refs, 100, 139);
       
       //Verify 40 msgs in storage
-      msgIds = getMessageIds();
+      pm.reapUnreferencedMessages(); msgIds = getMessageIds();
       assertEquals(40, msgIds.size()); 
       assertSameIds(msgIds, refs, 100, 139);
 
@@ -479,7 +480,7 @@
       assertSameIds(refIds, refs, 120, 140);
       
       //Verify 21 msgs in storage
-      msgIds = getMessageIds();
+      pm.reapUnreferencedMessages(); msgIds = getMessageIds();
       assertEquals(21, msgIds.size()); 
       assertSameIds(msgIds, refs, 120, 140);
  
@@ -510,7 +511,7 @@
       assertSameIds(refIds, refs, 140, 140);
       
       //Verify 1 msgs in storage
-      msgIds = getMessageIds();
+      pm.reapUnreferencedMessages(); msgIds = getMessageIds();
       assertEquals(1, msgIds.size()); 
       assertSameIds(msgIds, refs, 140, 140);
       
@@ -540,7 +541,7 @@
       assertEquals(0, refIds.size());
       
       //Verify 0 msgs in storage
-      msgIds = getMessageIds();
+      pm.reapUnreferencedMessages(); msgIds = getMessageIds();
       assertEquals(0, msgIds.size()); 
       
       //Verify 81 refs in queue
@@ -569,7 +570,7 @@
       assertEquals(0, refIds.size());
       
       //Verify 0 msgs in storage
-      msgIds = getMessageIds();
+      pm.reapUnreferencedMessages(); msgIds = getMessageIds();
       assertEquals(0, msgIds.size()); 
 
       //Verify 80 refs in queue
@@ -598,7 +599,7 @@
       assertEquals(0, refIds.size());
       
       //Verify 0 msgs in storage
-      msgIds = getMessageIds();
+      pm.reapUnreferencedMessages(); msgIds = getMessageIds();
       assertEquals(0, msgIds.size()); 
       
       //Verify 20 refs in queue
@@ -632,7 +633,7 @@
       assertEquals(0, refIds.size());
       
       //Verify 0 msgs in storage
-      msgIds = getMessageIds();
+      pm.reapUnreferencedMessages(); msgIds = getMessageIds();
       assertEquals(0, msgIds.size()); 
  
       //Verify 40 refs in queue
@@ -667,7 +668,7 @@
       assertEquals(0, refIds.size());
       
       //Verify 0 msgs in storage
-      msgIds = getMessageIds();
+      pm.reapUnreferencedMessages(); msgIds = getMessageIds();
       assertEquals(0, msgIds.size()); 
         
       //Verify 60 refs in queue
@@ -702,7 +703,7 @@
       assertSameIds(refIds, refs, 221, 240);
       
       //Verify 20 msgs in storage
-      msgIds = getMessageIds();
+      pm.reapUnreferencedMessages(); msgIds = getMessageIds();
       assertEquals(20, msgIds.size()); 
       assertSameIds(msgIds, refs, 221, 240);
       
@@ -733,7 +734,7 @@
       assertEquals(0, refIds.size());
       
       //Verify 0 msgs in storage
-      msgIds = getMessageIds();
+      pm.reapUnreferencedMessages(); msgIds = getMessageIds();
       assertEquals(0, msgIds.size()); 
   
       //Verify 100 refs in queue
@@ -762,7 +763,7 @@
       assertEquals(0, refIds.size());
       
       //Verify 0 msgs in storage
-      msgIds = getMessageIds();
+      pm.reapUnreferencedMessages(); msgIds = getMessageIds();
       assertEquals(0, msgIds.size()); 
   
       //Verify 100 refs in queue
@@ -790,7 +791,7 @@
       assertSameIds(refIds, refs, 231, 240);
       
       //Verify 10 msgs in storage
-      msgIds = getMessageIds();
+      pm.reapUnreferencedMessages(); msgIds = getMessageIds();
       assertEquals(10, msgIds.size()); 
       assertSameIds(msgIds, refs, 231, 240);
       
@@ -821,7 +822,7 @@
       assertSameIds(refIds, refs, 221, 240);
       
       //Verify 20 msgs in storage
-      msgIds = getMessageIds();
+      pm.reapUnreferencedMessages(); msgIds = getMessageIds();
       assertEquals(20, msgIds.size()); 
       assertSameIds(msgIds, refs, 221, 240);
       
@@ -848,7 +849,7 @@
       assertEquals(0, refIds.size());     
       
       //Verify 0 msgs in storage
-      msgIds = getMessageIds();
+      pm.reapUnreferencedMessages(); msgIds = getMessageIds();
       assertEquals(0, msgIds.size()); 
 
       //Verify 70 refs in queue
@@ -872,7 +873,7 @@
       assertEquals(0, refIds.size());     
       
       //Verify 0 msgs in storage
-      msgIds = getMessageIds();
+      pm.reapUnreferencedMessages(); msgIds = getMessageIds();
       assertEquals(0, msgIds.size()); 
 
       //Verify 0 refs in queue

Modified: trunk/tests/src/org/jboss/test/messaging/core/paging/SingleChannel_NP_NTTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/core/paging/SingleChannel_NP_NTTest.java	2007-08-21 16:24:12 UTC (rev 3021)
+++ trunk/tests/src/org/jboss/test/messaging/core/paging/SingleChannel_NP_NTTest.java	2007-08-21 21:07:50 UTC (rev 3022)
@@ -85,7 +85,7 @@
       assertTrue(refIds.isEmpty());
       
       //Verify no msgs in storage
-      List msgIds = getMessageIds();
+      pm.reapUnreferencedMessages(); List msgIds = getMessageIds();
       assertTrue(msgIds.isEmpty());
       
       //Verify 99 refs in queue
@@ -113,7 +113,7 @@
       assertTrue(refIds.isEmpty());
       
       //Verify no msgs in storage
-      msgIds = getMessageIds();
+      pm.reapUnreferencedMessages(); msgIds = getMessageIds();
       assertTrue(msgIds.isEmpty());
       
       //Verify 100 refs in queue
@@ -143,7 +143,7 @@
       assertTrue(refIds.isEmpty());
       
       //Verify no msgs in storage
-      msgIds = getMessageIds();
+      pm.reapUnreferencedMessages(); msgIds = getMessageIds();
       assertTrue(msgIds.isEmpty());
           
       //Verify 100 refs in queue
@@ -174,7 +174,7 @@
       assertSameIds(refIds, refs, 100, 109);
       
       //Verify 10 msgs in storage
-      msgIds = getMessageIds();
+      pm.reapUnreferencedMessages(); msgIds = getMessageIds();
       assertEquals(10, msgIds.size()); 
       assertSameIds(msgIds, refs, 100, 109);
       
@@ -205,7 +205,7 @@
       assertSameIds(refIds, refs, 100, 109);
       
       //Verify 10 msgs in storage
-      msgIds = getMessageIds();
+      pm.reapUnreferencedMessages(); msgIds = getMessageIds();
       assertEquals(10, msgIds.size()); 
       assertSameIds(msgIds, refs, 100, 109);
       
@@ -239,7 +239,7 @@
       assertSameIds(refIds, refs, 100, 119);
       
       //Verify 20 msgs in storage
-      msgIds = getMessageIds();
+      pm.reapUnreferencedMessages(); msgIds = getMessageIds();
       assertEquals(20, msgIds.size()); 
       assertSameIds(msgIds, refs, 100, 119);
       
@@ -274,7 +274,7 @@
       assertSameIds(refIds, refs, 100, 129);
       
       //Verify 30 msgs in storage
-      msgIds = getMessageIds();
+      pm.reapUnreferencedMessages(); msgIds = getMessageIds();
       assertEquals(30, msgIds.size()); 
       assertSameIds(msgIds, refs, 100, 129);
       
@@ -309,7 +309,7 @@
       assertSameIds(refIds, refs, 100, 139);
       
       //Verify 40 msgs in storage
-      msgIds = getMessageIds();
+      pm.reapUnreferencedMessages(); msgIds = getMessageIds();
       assertEquals(40, msgIds.size()); 
       assertSameIds(msgIds, refs, 100, 139);
       
@@ -341,7 +341,7 @@
       assertSameIds(refIds, refs, 100, 139);
       
       //Verify 40 msgs in storage
-      msgIds = getMessageIds();
+      pm.reapUnreferencedMessages(); msgIds = getMessageIds();
       assertEquals(40, msgIds.size()); 
       assertSameIds(msgIds, refs, 100, 139);
       
@@ -372,7 +372,7 @@
       assertSameIds(refIds, refs, 100, 139);
       
       //Verify 40 msgs in storage
-      msgIds = getMessageIds();
+      pm.reapUnreferencedMessages(); msgIds = getMessageIds();
       assertEquals(40, msgIds.size()); 
       assertSameIds(msgIds, refs, 100, 139);
       
@@ -406,7 +406,7 @@
       assertSameIds(refIds, refs, 100, 139);
       
       //Verify 40 msgs in storage
-      msgIds = getMessageIds();
+      pm.reapUnreferencedMessages(); msgIds = getMessageIds();
       assertEquals(40, msgIds.size()); 
       assertSameIds(msgIds, refs, 100, 139);
       
@@ -440,7 +440,7 @@
       assertSameIds(refIds, refs, 120, 140);
       
       //Verify 21 msgs in storage
-      msgIds = getMessageIds();
+      pm.reapUnreferencedMessages(); msgIds = getMessageIds();
       assertEquals(21, msgIds.size()); 
       assertSameIds(msgIds, refs, 120, 140);
         
@@ -471,7 +471,7 @@
       assertSameIds(refIds, refs, 140, 140);
       
       //Verify 1 msgs in storage
-      msgIds = getMessageIds();
+      pm.reapUnreferencedMessages(); msgIds = getMessageIds();
       assertEquals(1, msgIds.size()); 
       assertSameIds(msgIds, refs, 140, 140);
 
@@ -501,7 +501,7 @@
       assertEquals(0, refIds.size());
       
       //Verify 0 msgs in storage
-      msgIds = getMessageIds();
+      pm.reapUnreferencedMessages(); msgIds = getMessageIds();
       assertEquals(0, msgIds.size()); 
       
       //Verify 81 refs in queue
@@ -530,7 +530,7 @@
       assertEquals(0, refIds.size());
       
       //Verify 0 msgs in storage
-      msgIds = getMessageIds();
+      pm.reapUnreferencedMessages(); msgIds = getMessageIds();
       assertEquals(0, msgIds.size()); 
       
       //Verify 80 refs in queue
@@ -559,7 +559,7 @@
       assertEquals(0, refIds.size());
       
       //Verify 0 msgs in storage
-      msgIds = getMessageIds();
+      pm.reapUnreferencedMessages(); msgIds = getMessageIds();
       assertEquals(0, msgIds.size()); 
       
       //Verify 20 refs in queue
@@ -590,7 +590,7 @@
       assertEquals(0, refIds.size());
       
       //Verify 0 msgs in storage
-      msgIds = getMessageIds();
+      pm.reapUnreferencedMessages(); msgIds = getMessageIds();
       assertEquals(0, msgIds.size()); 
    
       //Verify 40 refs in queue
@@ -622,7 +622,7 @@
       assertEquals(0, refIds.size());
       
       //Verify 0 msgs in storage
-      msgIds = getMessageIds();
+      pm.reapUnreferencedMessages(); msgIds = getMessageIds();
       assertEquals(0, msgIds.size()); 
       
       //Verify 60 refs in queue
@@ -654,7 +654,7 @@
       assertSameIds(refIds, refs, 221, 240);
       
       //Verify 20 msgs in storage
-      msgIds = getMessageIds();
+      pm.reapUnreferencedMessages(); msgIds = getMessageIds();
       assertEquals(20, msgIds.size()); 
       assertSameIds(msgIds, refs, 221, 240);
         
@@ -687,7 +687,7 @@
       assertEquals(0, refIds.size());
       
       //Verify 0 msgs in storage
-      msgIds = getMessageIds();
+      pm.reapUnreferencedMessages(); msgIds = getMessageIds();
       assertEquals(0, msgIds.size()); 
   
       //Verify 100 refs in queue
@@ -716,7 +716,7 @@
       assertEquals(0, refIds.size());
       
       //Verify 0 msgs in storage
-      msgIds = getMessageIds();
+      pm.reapUnreferencedMessages(); msgIds = getMessageIds();
       assertEquals(0, msgIds.size()); 
   
       //Verify 100 refs in queue
@@ -746,7 +746,7 @@
       assertSameIds(refIds, refs, 231, 240);
       
       //Verify 10 msgs in storage
-      msgIds = getMessageIds();
+      pm.reapUnreferencedMessages(); msgIds = getMessageIds();
       assertEquals(10, msgIds.size()); 
       assertSameIds(msgIds, refs, 231, 240);
       
@@ -777,7 +777,7 @@
       assertSameIds(refIds, refs, 221, 240);
       
       //Verify 20 msgs in storage
-      msgIds = getMessageIds();
+      pm.reapUnreferencedMessages(); msgIds = getMessageIds();
       assertEquals(20, msgIds.size()); 
       assertSameIds(msgIds, refs, 221, 240);
       
@@ -807,7 +807,7 @@
       assertEquals(0, refIds.size());     
       
       //Verify 0 msgs in storage
-      msgIds = getMessageIds();
+      pm.reapUnreferencedMessages(); msgIds = getMessageIds();
       assertEquals(0, msgIds.size()); 
     
       //Verify 0 deliveries
@@ -837,7 +837,7 @@
       assertEquals(0, refIds.size());     
       
       //Verify 0 msgs in storage
-      msgIds = getMessageIds();
+      pm.reapUnreferencedMessages(); msgIds = getMessageIds();
       assertEquals(0, msgIds.size()); 
 
       //Verify 0 refs in queue

Modified: trunk/tests/src/org/jboss/test/messaging/core/paging/SingleChannel_NP_TTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/core/paging/SingleChannel_NP_TTest.java	2007-08-21 16:24:12 UTC (rev 3021)
+++ trunk/tests/src/org/jboss/test/messaging/core/paging/SingleChannel_NP_TTest.java	2007-08-21 21:07:50 UTC (rev 3022)
@@ -90,7 +90,7 @@
       assertTrue(refIds.isEmpty());
       
       //Verify no msgs in storage
-      List msgIds = getMessageIds();
+      pm.reapUnreferencedMessages(); List msgIds = getMessageIds();
       assertTrue(msgIds.isEmpty());
       
       //Verify 99 refs in queue
@@ -122,7 +122,7 @@
       assertTrue(refIds.isEmpty());
       
       //Verify no msgs in storage
-      msgIds = getMessageIds();
+      pm.reapUnreferencedMessages(); msgIds = getMessageIds();
       assertTrue(msgIds.isEmpty());
       
       //Verify 100 refs in queue
@@ -155,7 +155,7 @@
       assertTrue(refIds.isEmpty());
       
       //Verify no msgs in storage
-      msgIds = getMessageIds();
+      pm.reapUnreferencedMessages(); msgIds = getMessageIds();
       assertTrue(msgIds.isEmpty());
 
       //Verify 100 refs in queue
@@ -188,7 +188,7 @@
       assertSameIds(refIds, refs, 100, 109);
       
       //Verify 10 msgs in storage
-      msgIds = getMessageIds();
+      pm.reapUnreferencedMessages(); msgIds = getMessageIds();
       assertEquals(10, msgIds.size()); 
       assertSameIds(msgIds, refs, 100, 109);
       
@@ -221,7 +221,7 @@
       assertSameIds(refIds, refs, 100, 109);
       
       //Verify 10 msgs in storage
-      msgIds = getMessageIds();
+      pm.reapUnreferencedMessages(); msgIds = getMessageIds();
       assertEquals(10, msgIds.size()); 
       assertSameIds(msgIds, refs, 100, 109);
       
@@ -257,7 +257,7 @@
       assertSameIds(refIds, refs, 100, 119);
       
       //Verify 20 msgs in storage
-      msgIds = getMessageIds();
+      pm.reapUnreferencedMessages(); msgIds = getMessageIds();
       assertEquals(20, msgIds.size()); 
       assertSameIds(msgIds, refs, 100, 119);
       
@@ -303,7 +303,7 @@
       assertSameIds(refIds, refs, 100, 129);
       
       //Verify 30 msgs in storage
-      msgIds = getMessageIds();
+      pm.reapUnreferencedMessages(); msgIds = getMessageIds();
       assertEquals(30, msgIds.size()); 
       assertSameIds(msgIds, refs, 100, 129);
       
@@ -338,7 +338,7 @@
       assertSameIds(refIds, refs, 100, 139);
       
       //Verify 40 msgs in storage
-      msgIds = getMessageIds();
+      pm.reapUnreferencedMessages(); msgIds = getMessageIds();
       assertEquals(40, msgIds.size()); 
       assertSameIds(msgIds, refs, 100, 139);
       
@@ -372,7 +372,7 @@
       assertSameIds(refIds, refs, 100, 139);
       
       //Verify 40 msgs in storage
-      msgIds = getMessageIds();
+      pm.reapUnreferencedMessages(); msgIds = getMessageIds();
       assertEquals(40, msgIds.size()); 
       assertSameIds(msgIds, refs, 100, 139);
       
@@ -403,7 +403,7 @@
       assertSameIds(refIds, refs, 100, 139);
       
       //Verify 40 msgs in storage
-      msgIds = getMessageIds();
+      pm.reapUnreferencedMessages(); msgIds = getMessageIds();
       assertEquals(40, msgIds.size()); 
       assertSameIds(msgIds, refs, 100, 139);
       
@@ -437,7 +437,7 @@
       assertSameIds(refIds, refs, 100, 139);
       
       //Verify 40 msgs in storage
-      msgIds = getMessageIds();
+      pm.reapUnreferencedMessages(); msgIds = getMessageIds();
       assertEquals(40, msgIds.size()); 
       assertSameIds(msgIds, refs, 100, 139);
       
@@ -470,7 +470,7 @@
       assertSameIds(refIds, refs, 120, 140);
       
       //Verify 21 msgs in storage
-      msgIds = getMessageIds();
+      pm.reapUnreferencedMessages(); msgIds = getMessageIds();
       assertEquals(21, msgIds.size()); 
       assertSameIds(msgIds, refs, 120, 140);
       
@@ -501,7 +501,7 @@
       assertSameIds(refIds, refs, 140, 140);
       
       //Verify 1 msgs in storage
-      msgIds = getMessageIds();
+      pm.reapUnreferencedMessages(); msgIds = getMessageIds();
       assertEquals(1, msgIds.size()); 
       assertSameIds(msgIds, refs, 140, 140);
       
@@ -531,7 +531,7 @@
       assertEquals(0, refIds.size());
       
       //Verify 0 msgs in storage
-      msgIds = getMessageIds();
+      pm.reapUnreferencedMessages(); msgIds = getMessageIds();
       assertEquals(0, msgIds.size()); 
       
       //Verify 81 refs in queue
@@ -560,7 +560,7 @@
       assertEquals(0, refIds.size());
       
       //Verify 0 msgs in storage
-      msgIds = getMessageIds();
+      pm.reapUnreferencedMessages(); msgIds = getMessageIds();
       assertEquals(0, msgIds.size()); 
 
       //Verify 80 refs in queue
@@ -589,7 +589,7 @@
       assertEquals(0, refIds.size());
       
       //Verify 0 msgs in storage
-      msgIds = getMessageIds();
+      pm.reapUnreferencedMessages(); msgIds = getMessageIds();
       assertEquals(0, msgIds.size()); 
   
       //Verify 20 refs in queue
@@ -622,7 +622,7 @@
       assertEquals(0, refIds.size());
       
       //Verify 0 msgs in storage
-      msgIds = getMessageIds();
+      pm.reapUnreferencedMessages(); msgIds = getMessageIds();
       assertEquals(0, msgIds.size()); 
       
       //Verify 40 refs in queue
@@ -656,7 +656,7 @@
       assertEquals(0, refIds.size());
       
       //Verify 0 msgs in storage
-      msgIds = getMessageIds();
+      pm.reapUnreferencedMessages(); msgIds = getMessageIds();
       assertEquals(0, msgIds.size()); 
       
       //Verify 60 refs in queue
@@ -690,7 +690,7 @@
       assertSameIds(refIds, refs, 221, 240);
       
       //Verify 20 msgs in storage
-      msgIds = getMessageIds();
+      pm.reapUnreferencedMessages(); msgIds = getMessageIds();
       assertEquals(20, msgIds.size()); 
       assertSameIds(msgIds, refs, 221, 240);
       
@@ -722,7 +722,7 @@
       assertEquals(0, refIds.size());
       
       //Verify 0 msgs in storage
-      msgIds = getMessageIds();
+      pm.reapUnreferencedMessages(); msgIds = getMessageIds();
       assertEquals(0, msgIds.size()); 
    
       //Verify 100 refs in queue
@@ -751,7 +751,7 @@
       assertEquals(0, refIds.size());
       
       //Verify 0 msgs in storage
-      msgIds = getMessageIds();
+      pm.reapUnreferencedMessages(); msgIds = getMessageIds();
       assertEquals(0, msgIds.size()); 
   
       //Verify 100 refs in queue
@@ -781,7 +781,7 @@
       assertSameIds(refIds, refs, 231, 240);
       
       //Verify 10 msgs in storage
-      msgIds = getMessageIds();
+      pm.reapUnreferencedMessages(); msgIds = getMessageIds();
       assertEquals(10, msgIds.size()); 
       assertSameIds(msgIds, refs, 231, 240);
       
@@ -812,7 +812,7 @@
       assertSameIds(refIds, refs, 221, 240);
       
       //Verify 20 msgs in storage
-      msgIds = getMessageIds();
+      pm.reapUnreferencedMessages(); msgIds = getMessageIds();
       assertEquals(20, msgIds.size()); 
       assertSameIds(msgIds, refs, 221, 240);
       
@@ -839,7 +839,7 @@
       assertEquals(0, refIds.size());     
       
       //Verify 0 msgs in storage
-      msgIds = getMessageIds();
+      pm.reapUnreferencedMessages(); msgIds = getMessageIds();
       assertEquals(0, msgIds.size()); 
 
       //Verify 70 refs in queue
@@ -863,7 +863,7 @@
       assertEquals(0, refIds.size());     
       
       //Verify 0 msgs in storage
-      msgIds = getMessageIds();
+      pm.reapUnreferencedMessages(); msgIds = getMessageIds();
       assertEquals(0, msgIds.size()); 
   
       //Verify 0 refs in queue

Modified: trunk/tests/src/org/jboss/test/messaging/core/paging/SingleChannel_P_2PCTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/core/paging/SingleChannel_P_2PCTest.java	2007-08-21 16:24:12 UTC (rev 3021)
+++ trunk/tests/src/org/jboss/test/messaging/core/paging/SingleChannel_P_2PCTest.java	2007-08-21 21:07:50 UTC (rev 3022)
@@ -94,7 +94,7 @@
       assertSameIds(refIds, refs, 0, 98);
       
       //Verify 99 msgs in storage
-      List msgIds = getMessageIds();
+      pm.reapUnreferencedMessages(); List msgIds = getMessageIds();
       assertEquals(99, msgIds.size());
       assertSameIds(msgIds, refs, 0, 98);
       
@@ -131,7 +131,7 @@
       assertSameIds(refIds, refs, 0, 99);
       
       //Verify 100 msgs in storage
-      msgIds = getMessageIds();
+      pm.reapUnreferencedMessages(); msgIds = getMessageIds();
       assertEquals(100, msgIds.size());
       assertSameIds(msgIds, refs, 0, 99);
       
@@ -171,7 +171,7 @@
       assertSameIds(refIds, refs, 0, 108);      
       
       //Verify 100 msgs in storage
-      msgIds = getMessageIds();
+      pm.reapUnreferencedMessages(); msgIds = getMessageIds();
       assertEquals(109, msgIds.size());
       assertSameIds(msgIds, refs, 0, 108);
       
@@ -211,7 +211,7 @@
       assertSameIds(refIds, refs, 0, 109);
       
       //Verify 110 msgs in storage
-      msgIds = getMessageIds();
+      pm.reapUnreferencedMessages(); msgIds = getMessageIds();
       assertEquals(110, msgIds.size()); 
       assertSameIds(msgIds, refs, 0, 109);
        
@@ -249,7 +249,7 @@
       assertSameIds(refIds, refs, 0, 110);
       
       //Verify 111 msgs in storage
-      msgIds = getMessageIds();
+      pm.reapUnreferencedMessages(); msgIds = getMessageIds();
       assertEquals(111, msgIds.size()); 
       assertSameIds(msgIds, refs, 0, 110);
       
@@ -290,7 +290,7 @@
       assertSameIds(refIds, refs, 0, 119);
       
       //Verify 120 msgs in storage
-      msgIds = getMessageIds();
+      pm.reapUnreferencedMessages(); msgIds = getMessageIds();
       assertEquals(120, msgIds.size()); 
       assertSameIds(msgIds, refs, 0, 119);
       
@@ -341,7 +341,7 @@
       assertSameIds(refIds, refs, 0, 129);
       
       //Verify 130 msgs in storage
-      msgIds = getMessageIds();
+      pm.reapUnreferencedMessages(); msgIds = getMessageIds();
       assertEquals(130, msgIds.size()); 
       assertSameIds(msgIds, refs, 0, 129);
       
@@ -383,7 +383,7 @@
       assertSameIds(refIds, refs, 0, 139);
       
       //Verify 140 msgs in storage
-      msgIds = getMessageIds();
+      pm.reapUnreferencedMessages(); msgIds = getMessageIds();
       assertEquals(140, msgIds.size()); 
       assertSameIds(msgIds, refs, 0, 139);
       
@@ -423,7 +423,7 @@
       assertSameIds(refIds, refs, 0, 140);
       
       //Verify 141 msgs in storage
-      msgIds = getMessageIds();
+      pm.reapUnreferencedMessages(); msgIds = getMessageIds();
       assertEquals(141, msgIds.size()); 
       assertSameIds(msgIds, refs, 0, 140);
       
@@ -458,7 +458,7 @@
       assertSameIds(refIds, refs, 1, 140);      
       
       //Verify 140 msgs in storage
-      msgIds = getMessageIds();
+      pm.reapUnreferencedMessages(); msgIds = getMessageIds();
       assertEquals(140, msgIds.size()); 
       assertSameIds(msgIds, refs, 1, 140);
       
@@ -497,7 +497,7 @@
       assertSameIds(refIds, refs, 19, 140);
       
       //Verify 122 msgs in storage
-      msgIds = getMessageIds();
+      pm.reapUnreferencedMessages(); msgIds = getMessageIds();
       assertEquals(122, msgIds.size()); 
       assertSameIds(msgIds, refs, 19, 140);
       
@@ -535,7 +535,7 @@
       assertSameIds(refIds, refs, 20, 140);
       
       //Verify 121 msgs in storage
-      msgIds = getMessageIds();
+      pm.reapUnreferencedMessages(); msgIds = getMessageIds();
       assertEquals(121, msgIds.size()); 
       assertSameIds(msgIds, refs, 20, 140);
       
@@ -571,7 +571,7 @@
       assertSameIds(refIds, refs, 40, 140);
       
       //Verify 120 msgs in storage
-      msgIds = getMessageIds();
+      pm.reapUnreferencedMessages(); msgIds = getMessageIds();
       assertEquals(101, msgIds.size()); 
       assertSameIds(msgIds, refs, 40, 140);
       
@@ -606,7 +606,7 @@
       assertSameIds(refIds, refs, 41, 140);
       
       //Verify 100 msgs in storage
-      msgIds = getMessageIds();
+      pm.reapUnreferencedMessages(); msgIds = getMessageIds();
       assertEquals(100, msgIds.size()); 
       assertSameIds(msgIds, refs, 41, 140); 
 
@@ -641,7 +641,7 @@
       assertSameIds(refIds, refs, 61, 140);
       
       //Verify 80 msgs in storage
-      msgIds = getMessageIds();
+      pm.reapUnreferencedMessages(); msgIds = getMessageIds();
       assertEquals(80, msgIds.size()); 
       assertSameIds(msgIds, refs, 61, 140); 
       
@@ -676,7 +676,7 @@
       assertSameIds(refIds, refs, 121, 140);
       
       //Verify 20 msgs in storage
-      msgIds = getMessageIds();
+      pm.reapUnreferencedMessages(); msgIds = getMessageIds();
       assertEquals(20, msgIds.size()); 
       assertSameIds(msgIds, refs, 121, 140);  
 
@@ -714,7 +714,7 @@
       assertSameIds(refIds, refs, 121, 160);
       
       //Verify 40 msgs in storage
-      msgIds = getMessageIds();
+      pm.reapUnreferencedMessages(); msgIds = getMessageIds();
       assertEquals(40, msgIds.size()); 
       assertSameIds(msgIds, refs, 121, 160);
 
@@ -753,7 +753,7 @@
       assertSameIds(refIds, refs, 121, 180);
       
       //Verify 60 msgs in storage
-      msgIds = getMessageIds();
+      pm.reapUnreferencedMessages(); msgIds = getMessageIds();
       assertEquals(60, msgIds.size()); 
       assertSameIds(msgIds, refs, 121, 180); 
       
@@ -794,7 +794,7 @@
       assertSameIds(refIds, refs, 121, 240);
       
       //Verify 120 msgs in storage
-      msgIds = getMessageIds();
+      pm.reapUnreferencedMessages(); msgIds = getMessageIds();
       assertEquals(120, msgIds.size()); 
       assertSameIds(msgIds, refs, 121, 240);
       
@@ -831,7 +831,7 @@
       assertSameIds(refIds, refs, 121, 240);
       
       //Verify 120 msgs in storage
-      msgIds = getMessageIds();
+      pm.reapUnreferencedMessages(); msgIds = getMessageIds();
       assertEquals(120, msgIds.size()); 
       assertSameIds(msgIds, refs, 121, 240);
   
@@ -866,7 +866,7 @@
       assertSameIds(refIds, refs, 121, 240);
       
       //Verify 120 msgs in storage
-      msgIds = getMessageIds();
+      pm.reapUnreferencedMessages(); msgIds = getMessageIds();
       assertEquals(120, msgIds.size()); 
       assertSameIds(msgIds, refs, 121, 240);      
   
@@ -902,7 +902,7 @@
       assertSameIds(refIds, refs, 121, 240);
       
       //Verify 120 msgs in storage
-      msgIds = getMessageIds();
+      pm.reapUnreferencedMessages(); msgIds = getMessageIds();
       assertEquals(120, msgIds.size()); 
       assertSameIds(msgIds, refs, 121, 240);      
             
@@ -938,7 +938,7 @@
       assertSameIds(refIds, refs, 121, 240);
       
       //Verify 120 msgs in storage
-      msgIds = getMessageIds();
+      pm.reapUnreferencedMessages(); msgIds = getMessageIds();
       assertEquals(120, msgIds.size()); 
       assertSameIds(msgIds, refs, 121, 240);
       
@@ -972,7 +972,7 @@
       assertSameIds(refIds, refs, 171, 240);
       
       //Verify 70 msgs in storage
-      msgIds = getMessageIds();
+      pm.reapUnreferencedMessages(); msgIds = getMessageIds();
       assertEquals(70, msgIds.size()); 
       assertSameIds(msgIds, refs, 171, 240); 
   
@@ -1002,7 +1002,7 @@
       assertEquals(0, refIds.size());     
       
       //Verify 0 msgs in storage
-      msgIds = getMessageIds();
+      pm.reapUnreferencedMessages(); msgIds = getMessageIds();
       assertEquals(0, msgIds.size()); 
 
       //Verify 0 refs in queue

Modified: trunk/tests/src/org/jboss/test/messaging/core/paging/SingleChannel_P_NTTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/core/paging/SingleChannel_P_NTTest.java	2007-08-21 16:24:12 UTC (rev 3021)
+++ trunk/tests/src/org/jboss/test/messaging/core/paging/SingleChannel_P_NTTest.java	2007-08-21 21:07:50 UTC (rev 3022)
@@ -90,7 +90,7 @@
       assertSameIds(refIds, refs, 0, 98);
       
       //Verify 99 msgs in storage
-      List msgIds = getMessageIds();
+      pm.reapUnreferencedMessages(); List msgIds = getMessageIds();
       assertEquals(99, msgIds.size());
       assertSameIds(msgIds, refs, 0, 98);
        
@@ -124,7 +124,7 @@
       assertSameIds(refIds, refs, 0, 99);
       
       //Verify 100 msgs in storage
-      msgIds = getMessageIds();
+      pm.reapUnreferencedMessages(); msgIds = getMessageIds();
       assertEquals(100, msgIds.size());
       assertSameIds(msgIds, refs, 0, 99);
       
@@ -161,7 +161,7 @@
       assertSameIds(refIds, refs, 0, 108);      
       
       //Verify 100 msgs in storage
-      msgIds = getMessageIds();
+      pm.reapUnreferencedMessages(); msgIds = getMessageIds();
       assertEquals(109, msgIds.size());
       assertSameIds(msgIds, refs, 0, 108);
       
@@ -194,7 +194,7 @@
       assertSameIds(refIds, refs, 0, 109);
       
       //Verify 110 msgs in storage
-      msgIds = getMessageIds();
+      pm.reapUnreferencedMessages(); msgIds = getMessageIds();
       assertEquals(110, msgIds.size()); 
       assertSameIds(msgIds, refs, 0, 109);
       
@@ -230,7 +230,7 @@
       assertSameIds(refIds, refs, 0, 110);
       
       //Verify 111 msgs in storage
-      msgIds = getMessageIds();
+      pm.reapUnreferencedMessages(); msgIds = getMessageIds();
       assertEquals(111, msgIds.size()); 
       assertSameIds(msgIds, refs, 0, 110);
       
@@ -269,7 +269,7 @@
       assertSameIds(refIds, refs, 0, 119);
       
       //Verify 120 msgs in storage
-      msgIds = getMessageIds();
+      pm.reapUnreferencedMessages(); msgIds = getMessageIds();
       assertEquals(120, msgIds.size()); 
       assertSameIds(msgIds, refs, 0, 119);
        
@@ -309,7 +309,7 @@
       assertSameIds(refIds, refs, 0, 129);
       
       //Verify 130 msgs in storage
-      msgIds = getMessageIds();
+      pm.reapUnreferencedMessages(); msgIds = getMessageIds();
       assertEquals(130, msgIds.size()); 
       assertSameIds(msgIds, refs, 0, 129);
       
@@ -350,7 +350,7 @@
       assertSameIds(refIds, refs, 0, 139);
       
       //Verify 140 msgs in storage
-      msgIds = getMessageIds();
+      pm.reapUnreferencedMessages(); msgIds = getMessageIds();
       assertEquals(140, msgIds.size()); 
       assertSameIds(msgIds, refs, 0, 139);
       
@@ -387,7 +387,7 @@
       assertSameIds(refIds, refs, 0, 140);
       
       //Verify 141 msgs in storage
-      msgIds = getMessageIds();
+      pm.reapUnreferencedMessages(); msgIds = getMessageIds();
       assertEquals(141, msgIds.size()); 
       assertSameIds(msgIds, refs, 0, 140);
       
@@ -423,7 +423,7 @@
       assertSameIds(refIds, refs, 1, 140);      
       
       //Verify 140 msgs in storage
-      msgIds = getMessageIds();
+      pm.reapUnreferencedMessages(); msgIds = getMessageIds();
       assertEquals(140, msgIds.size()); 
       assertSameIds(msgIds, refs, 1, 140);
       
@@ -461,7 +461,7 @@
       assertSameIds(refIds, refs, 19, 140);
       
       //Verify 122 msgs in storage
-      msgIds = getMessageIds();
+      pm.reapUnreferencedMessages(); msgIds = getMessageIds();
       assertEquals(122, msgIds.size()); 
       assertSameIds(msgIds, refs, 19, 140);
 
@@ -499,7 +499,7 @@
       assertSameIds(refIds, refs, 20, 140);
       
       //Verify 121 msgs in storage
-      msgIds = getMessageIds();
+      pm.reapUnreferencedMessages(); msgIds = getMessageIds();
       assertEquals(121, msgIds.size()); 
       assertSameIds(msgIds, refs, 20, 140);
       
@@ -535,7 +535,7 @@
       assertSameIds(refIds, refs, 40, 140);
       
       //Verify 120 msgs in storage
-      msgIds = getMessageIds();
+      pm.reapUnreferencedMessages(); msgIds = getMessageIds();
       assertEquals(101, msgIds.size()); 
       assertSameIds(msgIds, refs, 40, 140);
       
@@ -570,7 +570,7 @@
       assertSameIds(refIds, refs, 41, 140);
       
       //Verify 100 msgs in storage
-      msgIds = getMessageIds();
+      pm.reapUnreferencedMessages(); msgIds = getMessageIds();
       assertEquals(100, msgIds.size()); 
       assertSameIds(msgIds, refs, 41, 140); 
       
@@ -605,7 +605,7 @@
       assertSameIds(refIds, refs, 61, 140);
       
       //Verify 80 msgs in storage
-      msgIds = getMessageIds();
+      pm.reapUnreferencedMessages(); msgIds = getMessageIds();
       assertEquals(80, msgIds.size()); 
       assertSameIds(msgIds, refs, 61, 140); 
        
@@ -639,7 +639,7 @@
       assertSameIds(refIds, refs, 121, 140);
       
       //Verify 20 msgs in storage
-      msgIds = getMessageIds();
+      pm.reapUnreferencedMessages(); msgIds = getMessageIds();
       assertEquals(20, msgIds.size()); 
       assertSameIds(msgIds, refs, 121, 140);  
       
@@ -674,7 +674,7 @@
       assertSameIds(refIds, refs, 121, 160);
       
       //Verify 40 msgs in storage
-      msgIds = getMessageIds();
+      pm.reapUnreferencedMessages(); msgIds = getMessageIds();
       assertEquals(40, msgIds.size()); 
       assertSameIds(msgIds, refs, 121, 160);
           
@@ -710,7 +710,7 @@
       assertSameIds(refIds, refs, 121, 180);
       
       //Verify 60 msgs in storage
-      msgIds = getMessageIds();
+      pm.reapUnreferencedMessages(); msgIds = getMessageIds();
       assertEquals(60, msgIds.size()); 
       assertSameIds(msgIds, refs, 121, 180); 
       
@@ -748,7 +748,7 @@
       assertSameIds(refIds, refs, 121, 240);
       
       //Verify 120 msgs in storage
-      msgIds = getMessageIds();
+      pm.reapUnreferencedMessages(); msgIds = getMessageIds();
       assertEquals(120, msgIds.size()); 
       assertSameIds(msgIds, refs, 121, 240);
       
@@ -785,7 +785,7 @@
       assertSameIds(refIds, refs, 121, 240);
       
       //Verify 120 msgs in storage
-      msgIds = getMessageIds();
+      pm.reapUnreferencedMessages(); msgIds = getMessageIds();
       assertEquals(120, msgIds.size()); 
       assertSameIds(msgIds, refs, 121, 240);
   
@@ -820,7 +820,7 @@
       assertSameIds(refIds, refs, 121, 240);
       
       //Verify 120 msgs in storage
-      msgIds = getMessageIds();
+      pm.reapUnreferencedMessages(); msgIds = getMessageIds();
       assertEquals(120, msgIds.size()); 
       assertSameIds(msgIds, refs, 121, 240);      
   
@@ -856,7 +856,7 @@
       assertSameIds(refIds, refs, 121, 240);
       
       //Verify 120 msgs in storage
-      msgIds = getMessageIds();
+      pm.reapUnreferencedMessages(); msgIds = getMessageIds();
       assertEquals(120, msgIds.size()); 
       assertSameIds(msgIds, refs, 121, 240);      
             
@@ -892,7 +892,7 @@
       assertSameIds(refIds, refs, 121, 240);
       
       //Verify 120 msgs in storage
-      msgIds = getMessageIds();
+      pm.reapUnreferencedMessages(); msgIds = getMessageIds();
       assertEquals(120, msgIds.size()); 
       assertSameIds(msgIds, refs, 121, 240);
 
@@ -926,7 +926,7 @@
       assertSameIds(refIds, refs, 171, 240);
       
       //Verify 70 msgs in storage
-      msgIds = getMessageIds();
+      pm.reapUnreferencedMessages(); msgIds = getMessageIds();
       assertEquals(70, msgIds.size()); 
       assertSameIds(msgIds, refs, 171, 240); 
 
@@ -956,7 +956,7 @@
       assertEquals(0, refIds.size());     
       
       //Verify 0 msgs in storage
-      msgIds = getMessageIds();
+      pm.reapUnreferencedMessages(); msgIds = getMessageIds();
       assertEquals(0, msgIds.size()); 
 
       //Verify 0 refs in queue

Modified: trunk/tests/src/org/jboss/test/messaging/core/paging/SingleChannel_P_TTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/core/paging/SingleChannel_P_TTest.java	2007-08-21 16:24:12 UTC (rev 3021)
+++ trunk/tests/src/org/jboss/test/messaging/core/paging/SingleChannel_P_TTest.java	2007-08-21 21:07:50 UTC (rev 3022)
@@ -93,7 +93,7 @@
       assertSameIds(refIds, refs, 0, 98);
       
       //Verify 99 msgs in storage
-      List msgIds = getMessageIds();
+      pm.reapUnreferencedMessages(); List msgIds = getMessageIds();
       assertEquals(99, msgIds.size());
       assertSameIds(msgIds, refs, 0, 98);
       
@@ -129,7 +129,7 @@
       assertSameIds(refIds, refs, 0, 99);
       
       //Verify 100 msgs in storage
-      msgIds = getMessageIds();
+      pm.reapUnreferencedMessages(); msgIds = getMessageIds();
       assertEquals(100, msgIds.size());
       assertSameIds(msgIds, refs, 0, 99);
       
@@ -168,7 +168,7 @@
       assertSameIds(refIds, refs, 0, 108);      
       
       //Verify 100 msgs in storage
-      msgIds = getMessageIds();
+      pm.reapUnreferencedMessages(); msgIds = getMessageIds();
       assertEquals(109, msgIds.size());
       assertSameIds(msgIds, refs, 0, 108);
       
@@ -207,7 +207,7 @@
       assertSameIds(refIds, refs, 0, 109);
       
       //Verify 110 msgs in storage
-      msgIds = getMessageIds();
+      pm.reapUnreferencedMessages(); msgIds = getMessageIds();
       assertEquals(110, msgIds.size()); 
       assertSameIds(msgIds, refs, 0, 109);
       
@@ -244,7 +244,7 @@
       assertSameIds(refIds, refs, 0, 110);
       
       //Verify 111 msgs in storage
-      msgIds = getMessageIds();
+      pm.reapUnreferencedMessages(); msgIds = getMessageIds();
       assertEquals(111, msgIds.size()); 
       assertSameIds(msgIds, refs, 0, 110);
       
@@ -284,7 +284,7 @@
       assertSameIds(refIds, refs, 0, 119);
       
       //Verify 120 msgs in storage
-      msgIds = getMessageIds();
+      pm.reapUnreferencedMessages(); msgIds = getMessageIds();
       assertEquals(120, msgIds.size()); 
       assertSameIds(msgIds, refs, 0, 119);
       
@@ -325,7 +325,7 @@
       assertSameIds(refIds, refs, 0, 129);
       
       //Verify 130 msgs in storage
-      msgIds = getMessageIds();
+      pm.reapUnreferencedMessages(); msgIds = getMessageIds();
       assertEquals(130, msgIds.size()); 
       assertSameIds(msgIds, refs, 0, 129);
       
@@ -374,7 +374,7 @@
       assertSameIds(refIds, refs, 0, 139);
       
       //Verify 140 msgs in storage
-      msgIds = getMessageIds();
+      pm.reapUnreferencedMessages(); msgIds = getMessageIds();
       assertEquals(140, msgIds.size()); 
       assertSameIds(msgIds, refs, 0, 139);
       
@@ -412,7 +412,7 @@
       assertSameIds(refIds, refs, 0, 140);
       
       //Verify 141 msgs in storage
-      msgIds = getMessageIds();
+      pm.reapUnreferencedMessages(); msgIds = getMessageIds();
       assertEquals(141, msgIds.size()); 
       assertSameIds(msgIds, refs, 0, 140);
       
@@ -447,7 +447,7 @@
       assertSameIds(refIds, refs, 1, 140);      
       
       //Verify 140 msgs in storage
-      msgIds = getMessageIds();
+      pm.reapUnreferencedMessages(); msgIds = getMessageIds();
       assertEquals(140, msgIds.size()); 
       assertSameIds(msgIds, refs, 1, 140);
       
@@ -486,7 +486,7 @@
       assertSameIds(refIds, refs, 19, 140);
       
       //Verify 122 msgs in storage
-      msgIds = getMessageIds();
+      pm.reapUnreferencedMessages(); msgIds = getMessageIds();
       assertEquals(122, msgIds.size()); 
       assertSameIds(msgIds, refs, 19, 140);
       
@@ -524,7 +524,7 @@
       assertSameIds(refIds, refs, 20, 140);
       
       //Verify 121 msgs in storage
-      msgIds = getMessageIds();
+      pm.reapUnreferencedMessages(); msgIds = getMessageIds();
       assertEquals(121, msgIds.size()); 
       assertSameIds(msgIds, refs, 20, 140);
          
@@ -560,7 +560,7 @@
       assertSameIds(refIds, refs, 40, 140);
       
       //Verify 120 msgs in storage
-      msgIds = getMessageIds();
+      pm.reapUnreferencedMessages(); msgIds = getMessageIds();
       assertEquals(101, msgIds.size()); 
       assertSameIds(msgIds, refs, 40, 140);
       
@@ -595,7 +595,7 @@
       assertSameIds(refIds, refs, 41, 140);
       
       //Verify 100 msgs in storage
-      msgIds = getMessageIds();
+      pm.reapUnreferencedMessages(); msgIds = getMessageIds();
       assertEquals(100, msgIds.size()); 
       assertSameIds(msgIds, refs, 41, 140); 
       
@@ -630,7 +630,7 @@
       assertSameIds(refIds, refs, 61, 140);
       
       //Verify 80 msgs in storage
-      msgIds = getMessageIds();
+      pm.reapUnreferencedMessages(); msgIds = getMessageIds();
       assertEquals(80, msgIds.size()); 
       assertSameIds(msgIds, refs, 61, 140); 
       
@@ -665,7 +665,7 @@
       assertSameIds(refIds, refs, 121, 140);
       
       //Verify 20 msgs in storage
-      msgIds = getMessageIds();
+      pm.reapUnreferencedMessages(); msgIds = getMessageIds();
       assertEquals(20, msgIds.size()); 
       assertSameIds(msgIds, refs, 121, 140);  
       
@@ -702,7 +702,7 @@
       assertSameIds(refIds, refs, 121, 160);
       
       //Verify 40 msgs in storage
-      msgIds = getMessageIds();
+      pm.reapUnreferencedMessages(); msgIds = getMessageIds();
       assertEquals(40, msgIds.size()); 
       assertSameIds(msgIds, refs, 121, 160);
         
@@ -740,7 +740,7 @@
       assertSameIds(refIds, refs, 121, 180);
       
       //Verify 60 msgs in storage
-      msgIds = getMessageIds();
+      pm.reapUnreferencedMessages(); msgIds = getMessageIds();
       assertEquals(60, msgIds.size()); 
       assertSameIds(msgIds, refs, 121, 180); 
       
@@ -780,7 +780,7 @@
       assertSameIds(refIds, refs, 121, 240);
       
       //Verify 120 msgs in storage
-      msgIds = getMessageIds();
+      pm.reapUnreferencedMessages(); msgIds = getMessageIds();
       assertEquals(120, msgIds.size()); 
       assertSameIds(msgIds, refs, 121, 240);
       
@@ -817,7 +817,7 @@
       assertSameIds(refIds, refs, 121, 240);
       
       //Verify 120 msgs in storage
-      msgIds = getMessageIds();
+      pm.reapUnreferencedMessages(); msgIds = getMessageIds();
       assertEquals(120, msgIds.size()); 
       assertSameIds(msgIds, refs, 121, 240);
   
@@ -852,7 +852,7 @@
       assertSameIds(refIds, refs, 121, 240);
       
       //Verify 120 msgs in storage
-      msgIds = getMessageIds();
+      pm.reapUnreferencedMessages(); msgIds = getMessageIds();
       assertEquals(120, msgIds.size()); 
       assertSameIds(msgIds, refs, 121, 240);      
   
@@ -888,7 +888,7 @@
       assertSameIds(refIds, refs, 121, 240);
       
       //Verify 120 msgs in storage
-      msgIds = getMessageIds();
+      pm.reapUnreferencedMessages(); msgIds = getMessageIds();
       assertEquals(120, msgIds.size()); 
       assertSameIds(msgIds, refs, 121, 240);      
             
@@ -924,7 +924,7 @@
       assertSameIds(refIds, refs, 121, 240);
       
       //Verify 120 msgs in storage
-      msgIds = getMessageIds();
+      pm.reapUnreferencedMessages(); msgIds = getMessageIds();
       assertEquals(120, msgIds.size()); 
       assertSameIds(msgIds, refs, 121, 240);
       
@@ -958,7 +958,7 @@
       assertSameIds(refIds, refs, 171, 240);
       
       //Verify 70 msgs in storage
-      msgIds = getMessageIds();
+      pm.reapUnreferencedMessages(); msgIds = getMessageIds();
       assertEquals(70, msgIds.size()); 
       assertSameIds(msgIds, refs, 171, 240); 
 
@@ -988,7 +988,7 @@
       assertEquals(0, refIds.size());     
       
       //Verify 0 msgs in storage
-      msgIds = getMessageIds();
+      pm.reapUnreferencedMessages(); msgIds = getMessageIds();
       assertEquals(0, msgIds.size()); 
 
       //Verify 0 refs in queue

Modified: trunk/tests/src/org/jboss/test/messaging/core/paging/SingleChannel_ReloadTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/core/paging/SingleChannel_ReloadTest.java	2007-08-21 16:24:12 UTC (rev 3021)
+++ trunk/tests/src/org/jboss/test/messaging/core/paging/SingleChannel_ReloadTest.java	2007-08-21 21:07:50 UTC (rev 3022)
@@ -227,6 +227,7 @@
       refIds = getReferenceIdsOrderedByPageOrd(queue2.getChannelID());
       assertEquals(0, refIds.size());
       
+      pm.reapUnreferencedMessages();
       List msgIds = getMessageIds();
       assertEquals(0, msgIds.size());
                                                                   
@@ -292,6 +293,7 @@
       refIds = getReferenceIdsOrderedByPageOrd(queue.getChannelID());
       assertEquals(0, refIds.size());
       
+      pm.reapUnreferencedMessages();
       List msgIds = getMessageIds();
       assertEquals(0, msgIds.size());
                                                                   

Modified: trunk/tests/src/org/jboss/test/messaging/jms/AcknowledgementTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/AcknowledgementTest.java	2007-08-21 16:24:12 UTC (rev 3021)
+++ trunk/tests/src/org/jboss/test/messaging/jms/AcknowledgementTest.java	2007-08-21 21:07:50 UTC (rev 3022)
@@ -65,7 +65,7 @@
    // TestCase overrides -------------------------------------------
 
    // Public --------------------------------------------------------
-      
+
    /* Topics shouldn't hold on to messages if there are no subscribers */
    public void testPersistentMessagesForTopicDropped() throws Exception
    {

Modified: trunk/tests/src/org/jboss/test/messaging/jms/JMSTestCase.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/JMSTestCase.java	2007-08-21 16:24:12 UTC (rev 3021)
+++ trunk/tests/src/org/jboss/test/messaging/jms/JMSTestCase.java	2007-08-21 21:07:50 UTC (rev 3022)
@@ -110,6 +110,7 @@
 		
 		if (ServerManagement.isStarted(0))
 		{			
+			ServerManagement.getServer().reapMessages();
 			if (checkNoMessageData())
 			{
 				fail("Message Data exists");

Modified: trunk/tests/src/org/jboss/test/messaging/jms/MessageCleanupTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/MessageCleanupTest.java	2007-08-21 16:24:12 UTC (rev 3021)
+++ trunk/tests/src/org/jboss/test/messaging/jms/MessageCleanupTest.java	2007-08-21 21:07:50 UTC (rev 3022)
@@ -115,6 +115,8 @@
       
       assertEquals(0, getReferenceIds().size());
       
+      ServerManagement.getServer().reapMessages();
+      
       assertEquals(0, getMessageIds().size());
       
       conn.close();
@@ -162,6 +164,9 @@
         
       assertEquals(0, getReferenceIds().size());
       
+      ServerManagement.getServer().reapMessages();
+      
+      
       assertEquals(0, getMessageIds().size());
       
       
@@ -213,6 +218,9 @@
       
       assertEquals(0, getReferenceIds().size());
       
+      ServerManagement.getServer().reapMessages();
+      
+      
       assertEquals(0, getMessageIds().size());
       
    }
@@ -264,6 +272,9 @@
         
       assertEquals(0, getReferenceIds().size());
       
+      ServerManagement.getServer().reapMessages();
+      
+      
       assertEquals(0, getMessageIds().size());
       
    }

Modified: trunk/tests/src/org/jboss/test/messaging/jms/XARecoveryTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/XARecoveryTest.java	2007-08-21 16:24:12 UTC (rev 3021)
+++ trunk/tests/src/org/jboss/test/messaging/jms/XARecoveryTest.java	2007-08-21 21:07:50 UTC (rev 3022)
@@ -291,12 +291,7 @@
          m = cons2.receive(1000);
          
          assertNull(m);
-         
-         if (checkNoMessageData())
-         {
-            fail("Data remains in database");
-         }
-         
+           
       }
       finally
       {
@@ -526,13 +521,7 @@
          
          m = cons2.receive(1000);
          
-         assertNull(m);
-         
-         if (checkNoMessageData())
-         {
-            fail("Data remains in database");
-         }
-         
+         assertNull(m);         
       }
       finally
       {
@@ -796,12 +785,6 @@
          m = cons2.receive(1000);
          
          assertNull(m);
-         
-         if (checkNoMessageData())
-         {
-            fail("Data remains in database");
-         }
-         
       }
       finally
       {
@@ -1070,12 +1053,6 @@
          assertNull(m);
          
          cons1.close();
-         
-         if (checkNoMessageData())
-         {
-            fail("Data remains in database");
-         }
-         
       }
       finally
       {
@@ -1296,12 +1273,6 @@
          m = cons2.receive(1000);
          
          assertNull(m);
-         
-         if (checkNoMessageData())
-         {
-            fail("Data remains in database");
-         }
-         
       }
       finally
       {
@@ -1437,10 +1408,6 @@
          
          assertEquals(tm1.getText(), rm1.getText());
          
-         if (checkNoMessageData())
-         {
-            fail("Data remains in database");
-         }
       }
       finally
       {
@@ -1603,11 +1570,7 @@
          assertNotNull(rm1);
          
          assertEquals(tm1.getText(), rm1.getText());
-         
-         if (checkNoMessageData())
-         {
-            fail("Data remains in database");
-         }
+   
       }
       finally
       {
@@ -1747,10 +1710,6 @@
          
          assertNull(m);
          
-         if (checkNoMessageData())
-         {
-            fail("Data remains in database");
-         }
       }
       finally
       {
@@ -1898,11 +1857,7 @@
          Message m = cons1.receive(1000);
          
          assertNull(m);
-         
-         if (checkNoMessageData())
-         {
-            fail("Data remains in database");
-         }                  
+                       
       }
       finally
       {
@@ -2161,10 +2116,6 @@
          
          sess1.unsubscribe("sub2");
          
-         if (checkNoMessageData())
-         {
-            fail("Data remains in database");
-         }
       }
       finally
       {
@@ -2434,11 +2385,7 @@
          sess1.unsubscribe("sub1");
          
          sess1.unsubscribe("sub2");         
-         
-         if (checkNoMessageData())
-         {
-            fail("Data remains in database");
-         }
+
       }
       finally
       {
@@ -2651,11 +2598,6 @@
          
          TextMessage m5 = (TextMessage)cons.receive(1000);
          assertNull(m5);
-         
-         if (checkNoMessageData())
-         {
-            fail("Data remains in database");
-         }
       }
       finally
       {
@@ -2812,11 +2754,7 @@
    
          Message nullMessage = cons.receive(MIN_TIMEOUT);
          assertTrue(nullMessage == null);
-         
-         if (checkNoMessageData())
-         {
-            fail("Data remains in database");
-         }
+
       }
       finally
       {
@@ -2965,11 +2903,7 @@
          assertNotNull(m2);
    
          assertEquals("testing2", m2.getText());
-         
-         if (checkNoMessageData())
-         {
-            fail("Data remains in database");
-         }
+
       }
       finally
       {
@@ -3111,11 +3045,6 @@
          assertNotNull(m2);
    
          assertEquals("testing2", m2.getText());
-         
-         if (checkNoMessageData())
-         {
-            fail("Data remains in database");
-         }
       }
       finally
       {
@@ -3268,10 +3197,6 @@
          assertNotNull(m3);
          assertEquals("testing3", m3.getText());
          
-         if (checkNoMessageData())
-         {
-            fail("Data remains in database");
-         }
       }
       finally
       {

Modified: trunk/tests/src/org/jboss/test/messaging/jms/XATest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/XATest.java	2007-08-21 16:24:12 UTC (rev 3021)
+++ trunk/tests/src/org/jboss/test/messaging/jms/XATest.java	2007-08-21 21:07:50 UTC (rev 3022)
@@ -1130,11 +1130,6 @@
          assertEqualByteArrays(trailing.getBranchQualifier(), trailing2.getBranchQualifier());
 
          res.commit(trailing, false);
-
-         if (checkNoMessageData())
-         {
-            fail("Data remains in database");
-         }
       }
       finally
       {

Modified: trunk/tests/src/org/jboss/test/messaging/tools/container/LocalTestServer.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/tools/container/LocalTestServer.java	2007-08-21 16:24:12 UTC (rev 3021)
+++ trunk/tests/src/org/jboss/test/messaging/tools/container/LocalTestServer.java	2007-08-21 21:07:50 UTC (rev 3022)
@@ -899,6 +899,11 @@
    	getServerPeer().resetAllSuckers();
    }
    
+   public void reapMessages() throws Exception
+   {
+   	getServerPeer().getPersistenceManagerInstance().reapUnreferencedMessages();
+   }
+   
    // Public ---------------------------------------------------------------------------------------
 
    // Package protected ----------------------------------------------------------------------------

Modified: trunk/tests/src/org/jboss/test/messaging/tools/container/RMITestServer.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/tools/container/RMITestServer.java	2007-08-21 16:24:12 UTC (rev 3021)
+++ trunk/tests/src/org/jboss/test/messaging/tools/container/RMITestServer.java	2007-08-21 21:07:50 UTC (rev 3022)
@@ -501,6 +501,11 @@
    	server.resetAllSuckers();
    }   
    
+   public void reapMessages() throws Exception
+   {
+   	server.reapMessages();
+   }
+   
    // Public --------------------------------------------------------
 
    // Package protected ---------------------------------------------

Modified: trunk/tests/src/org/jboss/test/messaging/tools/container/Server.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/tools/container/Server.java	2007-08-21 16:24:12 UTC (rev 3021)
+++ trunk/tests/src/org/jboss/test/messaging/tools/container/Server.java	2007-08-21 21:07:50 UTC (rev 3022)
@@ -288,4 +288,6 @@
    void flushManagedConnectionPool() throws Exception;
    
    void resetAllSuckers() throws Exception;   
+   
+   void reapMessages() throws Exception;
 }




More information about the jboss-cvs-commits mailing list