[jboss-cvs] JBoss Messaging SVN: r3213 - in trunk: src/etc/xmdesc and 5 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Fri Oct 19 17:41:35 EDT 2007


Author: timfox
Date: 2007-10-19 17:41:35 -0400 (Fri, 19 Oct 2007)
New Revision: 3213

Modified:
   trunk/src/etc/server/default/deploy/db2-persistence-service.xml
   trunk/src/etc/server/default/deploy/mssql-persistence-service.xml
   trunk/src/etc/server/default/deploy/mysql-persistence-service.xml
   trunk/src/etc/server/default/deploy/mysqlcluster-persistence-service.xml
   trunk/src/etc/server/default/deploy/oracle-persistence-service.xml
   trunk/src/etc/server/default/deploy/postgresql-persistence-service.xml
   trunk/src/etc/server/default/deploy/sybase-persistence-service.xml
   trunk/src/etc/xmdesc/JDBCPersistenceManager-xmbean.xml
   trunk/src/main/org/jboss/messaging/core/impl/JDBCPersistenceManager.java
   trunk/src/main/org/jboss/messaging/core/jmx/JDBCPersistenceManagerService.java
   trunk/tests/src/org/jboss/test/messaging/core/IdManagerTest.java
   trunk/tests/src/org/jboss/test/messaging/core/JDBCPersistenceManagerTest.java
   trunk/tests/src/org/jboss/test/messaging/core/MessagingQueueTestBase.java
   trunk/tests/src/org/jboss/test/messaging/core/PostOfficeTestBase.java
   trunk/tests/src/org/jboss/test/messaging/core/paging/PagingStateTestBase.java
   trunk/tests/src/org/jboss/test/messaging/core/paging/PagingTest.java
   trunk/tests/src/org/jboss/test/messaging/core/paging/SingleChannel_ReloadTest.java
   trunk/tests/src/org/jboss/test/messaging/jms/persistence/MessagePersistenceManagerTest.java
Log:
More tweaks to reaping


Modified: trunk/src/etc/server/default/deploy/db2-persistence-service.xml
===================================================================
--- trunk/src/etc/server/default/deploy/db2-persistence-service.xml	2007-10-19 19:07:01 UTC (rev 3212)
+++ trunk/src/etc/server/default/deploy/db2-persistence-service.xml	2007-10-19 21:41:35 UTC (rev 3213)
@@ -67,6 +67,7 @@
    INSERT_MESSAGE_CONDITIONAL_FULL=INSERT INTO JBM_MSG (MESSAGE_ID, RELIABLE, EXPIRATION, TIMESTAMP, PRIORITY, TYPE, INS_TIME, HEADERS, PAYLOAD) SELECT ?, ?, ?, ?, ?, ?, ?, ?, ? FROM JBM_DUAL WHERE NOT EXISTS (SELECT MESSAGE_ID FROM JBM_MSG WHERE MESSAGE_ID = ?)   
    MESSAGE_ID_COLUMN=MESSAGE_ID
    REAP_MESSAGES=DELETE FROM JBM_MSG WHERE INS_TIME < ? AND NOT EXISTS (SELECT * FROM JBM_MSG_REF WHERE JBM_MSG_REF.MESSAGE_ID = JBM_MSG.MESSAGE_ID)   
+   DELETE_MESSAGE=DELETE FROM JBM_MSG WHERE MESSAGE_ID = ? AND NOT EXISTS (SELECT * FROM JBM_MSG_REF WHERE JBM_MSG_REF.MESSAGE_ID = ?)      
    INSERT_TRANSACTION=INSERT INTO JBM_TX (NODE_ID, TRANSACTION_ID, BRANCH_QUAL, FORMAT_ID, GLOBAL_TXID) VALUES(?, ?, ?, ?, ?)
    DELETE_TRANSACTION=DELETE FROM JBM_TX WHERE NODE_ID = ? AND TRANSACTION_ID = ?
    SELECT_PREPARED_TRANSACTIONS=SELECT TRANSACTION_ID, BRANCH_QUAL, FORMAT_ID, GLOBAL_TXID FROM JBM_TX WHERE NODE_ID = ?
@@ -85,12 +86,8 @@
       
       <!-- The period between asynchronous reaps of unreferenced messages, or zero for synchronous reaping -->
       
-      <attribute name="ReaperPeriod">5000</attribute>
+      <attribute name="ReaperPeriod">0</attribute>
       
-      <!-- The number of messages to do a synchronous reap after, or zero for asynchronous reaping -->
-      
-      <attribute name="SynchronousReapMessages">0</attribute>
-      
    </mbean>
 
    <!-- Messaging Post Office MBean configuration

Modified: trunk/src/etc/server/default/deploy/mssql-persistence-service.xml
===================================================================
--- trunk/src/etc/server/default/deploy/mssql-persistence-service.xml	2007-10-19 19:07:01 UTC (rev 3212)
+++ trunk/src/etc/server/default/deploy/mssql-persistence-service.xml	2007-10-19 21:41:35 UTC (rev 3213)
@@ -70,6 +70,7 @@
    INSERT_MESSAGE_CONDITIONAL_FULL=INSERT INTO JBM_MSG (MESSAGE_ID, RELIABLE, EXPIRATION, TIMESTAMP, PRIORITY, TYPE, INS_TIME, HEADERS, PAYLOAD) SELECT ?, ?, ?, ?, ?, ?, ?, ?, ? FROM JBM_DUAL WHERE NOT EXISTS (SELECT MESSAGE_ID FROM JBM_MSG WHERE MESSAGE_ID = ?)   
    MESSAGE_ID_COLUMN=MESSAGE_ID
    REAP_MESSAGES=DELETE FROM JBM_MSG WHERE INS_TIME < ? AND NOT EXISTS (SELECT * FROM JBM_MSG_REF WHERE JBM_MSG_REF.MESSAGE_ID = JBM_MSG.MESSAGE_ID)
+   DELETE_MESSAGE=DELETE FROM JBM_MSG WHERE MESSAGE_ID = ? AND NOT EXISTS (SELECT * FROM JBM_MSG_REF WHERE JBM_MSG_REF.MESSAGE_ID = ?)   
    INSERT_TRANSACTION=INSERT INTO JBM_TX (NODE_ID, TRANSACTION_ID, BRANCH_QUAL, FORMAT_ID, GLOBAL_TXID) VALUES(?, ?, ?, ?, ?)
    DELETE_TRANSACTION=DELETE FROM JBM_TX WHERE NODE_ID = ? AND TRANSACTION_ID = ?
    SELECT_PREPARED_TRANSACTIONS=SELECT TRANSACTION_ID, BRANCH_QUAL, FORMAT_ID, GLOBAL_TXID FROM JBM_TX WHERE NODE_ID = ?
@@ -88,11 +89,7 @@
       
       <!-- The period between asynchronous reaps of unreferenced messages, or zero for synchronous reaping -->
       
-      <attribute name="ReaperPeriod">5000</attribute>
-      
-      <!-- The number of messages to do a synchronous reap after, or zero for asynchronous reaping -->
-      
-      <attribute name="SynchronousReapMessages">0</attribute>
+      <attribute name="ReaperPeriod">0</attribute>      
    </mbean>
 
    <!-- Messaging Post Office MBean configuration

Modified: trunk/src/etc/server/default/deploy/mysql-persistence-service.xml
===================================================================
--- trunk/src/etc/server/default/deploy/mysql-persistence-service.xml	2007-10-19 19:07:01 UTC (rev 3212)
+++ trunk/src/etc/server/default/deploy/mysql-persistence-service.xml	2007-10-19 21:41:35 UTC (rev 3213)
@@ -70,6 +70,7 @@
    INSERT_MESSAGE_CONDITIONAL_FULL=INSERT INTO JBM_MSG (MESSAGE_ID, RELIABLE, EXPIRATION, TIMESTAMP, PRIORITY, TYPE, INS_TIME, HEADERS, PAYLOAD) SELECT ?, ?, ?, ?, ?, ?, ?, ?, ? FROM JBM_DUAL WHERE NOT EXISTS (SELECT MESSAGE_ID FROM JBM_MSG WHERE MESSAGE_ID = ?)   
    MESSAGE_ID_COLUMN=MESSAGE_ID
    REAP_MESSAGES=DELETE FROM JBM_MSG WHERE INS_TIME < ? AND NOT EXISTS (SELECT * FROM JBM_MSG_REF WHERE JBM_MSG_REF.MESSAGE_ID = JBM_MSG.MESSAGE_ID)
+   DELETE_MESSAGE=DELETE FROM JBM_MSG WHERE MESSAGE_ID = ? AND NOT EXISTS (SELECT * FROM JBM_MSG_REF WHERE JBM_MSG_REF.MESSAGE_ID = ?)      
    INSERT_TRANSACTION=INSERT INTO JBM_TX (NODE_ID, TRANSACTION_ID, BRANCH_QUAL, FORMAT_ID, GLOBAL_TXID) VALUES(?, ?, ?, ?, ?)
    DELETE_TRANSACTION=DELETE FROM JBM_TX WHERE NODE_ID = ? AND TRANSACTION_ID = ?
    SELECT_PREPARED_TRANSACTIONS=SELECT TRANSACTION_ID, BRANCH_QUAL, FORMAT_ID, GLOBAL_TXID FROM JBM_TX WHERE NODE_ID = ?
@@ -88,11 +89,7 @@
       
       <!-- The period between asynchronous reaps of unreferenced messages, or zero for synchronous reaping -->
       
-      <attribute name="ReaperPeriod">5000</attribute>
-      
-      <!-- The number of messages to do a synchronous reap after, or zero for asynchronous reaping -->
-      
-      <attribute name="SynchronousReapMessages">0</attribute>
+      <attribute name="ReaperPeriod">0</attribute>      
    </mbean>
 
    <!-- Messaging Post Office MBean configuration

Modified: trunk/src/etc/server/default/deploy/mysqlcluster-persistence-service.xml
===================================================================
--- trunk/src/etc/server/default/deploy/mysqlcluster-persistence-service.xml	2007-10-19 19:07:01 UTC (rev 3212)
+++ trunk/src/etc/server/default/deploy/mysqlcluster-persistence-service.xml	2007-10-19 21:41:35 UTC (rev 3213)
@@ -70,6 +70,7 @@
    INSERT_MESSAGE_CONDITIONAL_FULL=INSERT INTO JBM_MSG (MESSAGE_ID, RELIABLE, EXPIRATION, TIMESTAMP, PRIORITY, TYPE, INS_TIME, HEADERS, PAYLOAD) SELECT ?, ?, ?, ?, ?, ?, ?, ?, ? FROM JBM_DUAL WHERE NOT EXISTS (SELECT MESSAGE_ID FROM JBM_MSG WHERE MESSAGE_ID = ?)   
    MESSAGE_ID_COLUMN=MESSAGE_ID
    REAP_MESSAGES=DELETE FROM JBM_MSG WHERE INS_TIME < ? AND NOT EXISTS (SELECT * FROM JBM_MSG_REF WHERE JBM_MSG_REF.MESSAGE_ID = JBM_MSG.MESSAGE_ID)
+   DELETE_MESSAGE=DELETE FROM JBM_MSG WHERE MESSAGE_ID = ? AND NOT EXISTS (SELECT * FROM JBM_MSG_REF WHERE JBM_MSG_REF.MESSAGE_ID = ?)   
    INSERT_TRANSACTION=INSERT INTO JBM_TX (NODE_ID, TRANSACTION_ID, BRANCH_QUAL, FORMAT_ID, GLOBAL_TXID) VALUES(?, ?, ?, ?, ?)
    DELETE_TRANSACTION=DELETE FROM JBM_TX WHERE NODE_ID = ? AND TRANSACTION_ID = ?
    SELECT_PREPARED_TRANSACTIONS=SELECT TRANSACTION_ID, BRANCH_QUAL, FORMAT_ID, GLOBAL_TXID FROM JBM_TX WHERE NODE_ID = ?
@@ -88,12 +89,8 @@
 
       <!-- The period between asynchronous reaps of unreferenced messages, or zero for synchronous reaping -->
       
-      <attribute name="ReaperPeriod">5000</attribute>
+      <attribute name="ReaperPeriod">0</attribute>      
       
-      <!-- The number of messages to do a synchronous reap after, or zero for asynchronous reaping -->
-      
-      <attribute name="SynchronousReapMessages">0</attribute>
-      
    </mbean>
 
    <!-- Messaging Post Office MBean configuration

Modified: trunk/src/etc/server/default/deploy/oracle-persistence-service.xml
===================================================================
--- trunk/src/etc/server/default/deploy/oracle-persistence-service.xml	2007-10-19 19:07:01 UTC (rev 3212)
+++ trunk/src/etc/server/default/deploy/oracle-persistence-service.xml	2007-10-19 21:41:35 UTC (rev 3213)
@@ -74,6 +74,7 @@
    INSERT_MESSAGE_CONDITIONAL_FULL=INSERT INTO JBM_MSG (MESSAGE_ID, RELIABLE, EXPIRATION, TIMESTAMP, PRIORITY, TYPE, INS_TIME, HEADERS, PAYLOAD) SELECT ?, ?, ?, ?, ?, ?, ?, ?, ? FROM JBM_DUAL WHERE NOT EXISTS (SELECT MESSAGE_ID FROM JBM_MSG WHERE MESSAGE_ID = ?)   
    MESSAGE_ID_COLUMN=MESSAGE_ID
    REAP_MESSAGES=DELETE FROM JBM_MSG WHERE INS_TIME < ? AND NOT EXISTS (SELECT * FROM JBM_MSG_REF WHERE JBM_MSG_REF.MESSAGE_ID = JBM_MSG.MESSAGE_ID)
+   DELETE_MESSAGE=DELETE FROM JBM_MSG WHERE MESSAGE_ID = ? AND NOT EXISTS (SELECT * FROM JBM_MSG_REF WHERE JBM_MSG_REF.MESSAGE_ID = ?)   
    INSERT_TRANSACTION=INSERT INTO JBM_TX (NODE_ID, TRANSACTION_ID, BRANCH_QUAL, FORMAT_ID, GLOBAL_TXID) VALUES(?, ?, ?, ?, ?)
    DELETE_TRANSACTION=DELETE FROM JBM_TX WHERE NODE_ID = ? AND TRANSACTION_ID = ?
    SELECT_PREPARED_TRANSACTIONS=SELECT TRANSACTION_ID, BRANCH_QUAL, FORMAT_ID, GLOBAL_TXID FROM JBM_TX WHERE NODE_ID = ?
@@ -92,12 +93,7 @@
       
       <!-- The period between asynchronous reaps of unreferenced messages, or zero for synchronous reaping -->
       
-      <attribute name="ReaperPeriod">5000</attribute>
-      
-      <!-- The number of messages to do a synchronous reap after, or zero for asynchronous reaping -->
-      
-      <attribute name="SynchronousReapMessages">0</attribute>
-      
+      <attribute name="ReaperPeriod">0</attribute>      
    </mbean>
 
    <!-- Messaging Post Office MBean configuration

Modified: trunk/src/etc/server/default/deploy/postgresql-persistence-service.xml
===================================================================
--- trunk/src/etc/server/default/deploy/postgresql-persistence-service.xml	2007-10-19 19:07:01 UTC (rev 3212)
+++ trunk/src/etc/server/default/deploy/postgresql-persistence-service.xml	2007-10-19 21:41:35 UTC (rev 3213)
@@ -70,6 +70,7 @@
    INSERT_MESSAGE_CONDITIONAL_FULL=INSERT INTO JBM_MSG (MESSAGE_ID, RELIABLE, EXPIRATION, TIMESTAMP, PRIORITY, TYPE, INS_TIME, HEADERS, PAYLOAD) SELECT ?, ?, ?, ?, ?, ?, ?, ?, ? FROM JBM_DUAL WHERE NOT EXISTS (SELECT MESSAGE_ID FROM JBM_MSG WHERE MESSAGE_ID = ?)   
    MESSAGE_ID_COLUMN=MESSAGE_ID
    REAP_MESSAGES=DELETE FROM JBM_MSG WHERE INS_TIME < ? AND NOT EXISTS (SELECT * FROM JBM_MSG_REF WHERE JBM_MSG_REF.MESSAGE_ID = JBM_MSG.MESSAGE_ID)
+   DELETE_MESSAGE=DELETE FROM JBM_MSG WHERE MESSAGE_ID = ? AND NOT EXISTS (SELECT * FROM JBM_MSG_REF WHERE JBM_MSG_REF.MESSAGE_ID = ?)   
    INSERT_TRANSACTION=INSERT INTO JBM_TX (NODE_ID, TRANSACTION_ID, BRANCH_QUAL, FORMAT_ID, GLOBAL_TXID) VALUES(?, ?, ?, ?, ?)
    DELETE_TRANSACTION=DELETE FROM JBM_TX WHERE NODE_ID = ? AND TRANSACTION_ID = ?
    SELECT_PREPARED_TRANSACTIONS=SELECT TRANSACTION_ID, BRANCH_QUAL, FORMAT_ID, GLOBAL_TXID FROM JBM_TX WHERE NODE_ID = ?
@@ -88,11 +89,8 @@
       
       <!-- The period between asynchronous reaps of unreferenced messages, or zero for synchronous reaping -->
       
-      <attribute name="ReaperPeriod">5000</attribute>
+      <attribute name="ReaperPeriod">0</attribute>
       
-      <!-- The number of messages to do a synchronous reap after, or zero for asynchronous reaping -->
-      
-      <attribute name="SynchronousReapMessages">0</attribute>
    </mbean>
 
    <!-- Messaging Post Office MBean configuration

Modified: trunk/src/etc/server/default/deploy/sybase-persistence-service.xml
===================================================================
--- trunk/src/etc/server/default/deploy/sybase-persistence-service.xml	2007-10-19 19:07:01 UTC (rev 3212)
+++ trunk/src/etc/server/default/deploy/sybase-persistence-service.xml	2007-10-19 21:41:35 UTC (rev 3213)
@@ -75,6 +75,7 @@
    INSERT_MESSAGE_CONDITIONAL_FULL=INSERT INTO JBM_MSG (MESSAGE_ID, RELIABLE, EXPIRATION, TIMESTAMP, PRIORITY, TYPE, INS_TIME, HEADERS, PAYLOAD) SELECT ?, ?, ?, ?, ?, ?, ?, ?, ? FROM JBM_DUAL WHERE NOT EXISTS (SELECT MESSAGE_ID FROM JBM_MSG WHERE MESSAGE_ID = ?)   
    MESSAGE_ID_COLUMN=MESSAGE_ID
    REAP_MESSAGES=DELETE FROM JBM_MSG WHERE INS_TIME < ? AND NOT EXISTS (SELECT * FROM JBM_MSG_REF WHERE JBM_MSG_REF.MESSAGE_ID = JBM_MSG.MESSAGE_ID)
+   DELETE_MESSAGE=DELETE FROM JBM_MSG WHERE MESSAGE_ID = ? AND NOT EXISTS (SELECT * FROM JBM_MSG_REF WHERE JBM_MSG_REF.MESSAGE_ID = ?)   
    INSERT_TRANSACTION=INSERT INTO JBM_TX (NODE_ID, TRANSACTION_ID, BRANCH_QUAL, FORMAT_ID, GLOBAL_TXID) VALUES(?, ?, ?, ?, ?)
    DELETE_TRANSACTION=DELETE FROM JBM_TX WHERE NODE_ID = ? AND TRANSACTION_ID = ?
    SELECT_PREPARED_TRANSACTIONS=SELECT TRANSACTION_ID, BRANCH_QUAL, FORMAT_ID, GLOBAL_TXID FROM JBM_TX WHERE NODE_ID = ?
@@ -93,11 +94,8 @@
       
       <!-- The period between asynchronous reaps of unreferenced messages, or zero for synchronous reaping -->
       
-      <attribute name="ReaperPeriod">5000</attribute>
+      <attribute name="ReaperPeriod">0</attribute>
       
-      <!-- The number of messages to do a synchronous reap after, or zero for asynchronous reaping -->
-      
-      <attribute name="SynchronousReapMessages">0</attribute>
    </mbean>
 
    <!-- Messaging Post Office MBean configuration

Modified: trunk/src/etc/xmdesc/JDBCPersistenceManager-xmbean.xml
===================================================================
--- trunk/src/etc/xmdesc/JDBCPersistenceManager-xmbean.xml	2007-10-19 19:07:01 UTC (rev 3212)
+++ trunk/src/etc/xmdesc/JDBCPersistenceManager-xmbean.xml	2007-10-19 21:41:35 UTC (rev 3213)
@@ -71,12 +71,6 @@
       <type>long</type>
    </attribute>    
    
-   <attribute access="read-write" getMethod="getSynchronousReapMessages" setMethod="setSynchronousReapMessages">
-      <description>The number of messages to synchronously force a reap after, or zero to reap asynchronously</description>
-      <name>SynchronousReapMessages</name>
-      <type>int</type>
-   </attribute> 
-   
    <attribute access="read-write" getMethod="isSupportsBlobOnSelect" setMethod="setSupportsBlobOnSelect">
       <description>Some databases don't support binding blobs on select clauses</description>
       <name>SupportsBlobOnSelect</name>

Modified: trunk/src/main/org/jboss/messaging/core/impl/JDBCPersistenceManager.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/impl/JDBCPersistenceManager.java	2007-10-19 19:07:01 UTC (rev 3212)
+++ trunk/src/main/org/jboss/messaging/core/impl/JDBCPersistenceManager.java	2007-10-19 21:41:35 UTC (rev 3213)
@@ -44,7 +44,6 @@
 import java.util.Set;
 import java.util.Timer;
 import java.util.TimerTask;
-import java.util.concurrent.atomic.AtomicInteger;
 
 import javax.sql.DataSource;
 import javax.transaction.TransactionManager;
@@ -106,10 +105,6 @@
    
    private boolean reaperRunning;
    
-   private int synchronousReapMessages;
-   
-   private AtomicInteger syncReapCount;
-
    // Some versions of the oracle driver don't support binding blobs on select clauses,
    // what would force us to use a two stage insert (insert and if successful, update)
    private boolean supportsBlobSelect;
@@ -119,7 +114,7 @@
    public JDBCPersistenceManager(DataSource ds, TransactionManager tm, Properties sqlProperties,
                                  boolean createTablesOnStartup, boolean usingBatchUpdates,
                                  boolean usingBinaryStream, boolean usingTrailingByte, int maxParams,
-                                 long reaperPeriod, int synchronousReapMessages, boolean supportsBlobSelect)
+                                 long reaperPeriod, boolean supportsBlobSelect)
    {
       super(ds, tm, sqlProperties, createTablesOnStartup);
       
@@ -133,19 +128,13 @@
       
       this.reaperPeriod = reaperPeriod;
       
-      this.synchronousReapMessages = synchronousReapMessages;
-      
       if (reaperPeriod > 0)
       {
 	      reaperTimer = new Timer(true);
 	      
 	      reaper = new Reaper();
       }
-      else
-      {
-      	syncReapCount = new AtomicInteger(0);
-      }
-      
+
       this.supportsBlobSelect = supportsBlobSelect;
    }
    
@@ -685,10 +674,17 @@
    		{
 		      PreparedStatement psDeleteReference = null; 
 		      
+		      PreparedStatement psDeleteMessage = null;
+		      
 		      try
 		      {	
 		         psDeleteReference = conn.prepareStatement(getSQLStatement("DELETE_MESSAGE_REF"));
 		         
+		         if (reaper == null)
+		         {
+		         	psDeleteMessage = conn.prepareStatement(getSQLStatement("DELETE_MESSAGE"));
+		         }
+		         
 		         Iterator iter = references.iterator();		         
 		
 		         while (iter.hasNext())
@@ -699,9 +695,17 @@
 		            
 		            int rows = psDeleteReference.executeUpdate();
 		            
-		            addToReapCount(rows);
-		               
-		            if (trace) { log.trace("Deleted " + rows + " rows"); }		            		                          
+		            if (trace) { log.trace("Deleted " + rows + " references"); }
+		            
+		            if (reaper == null)
+		            {		            
+			            psDeleteMessage.setLong(1, ref.getMessage().getMessageID());
+			            psDeleteMessage.setLong(2, ref.getMessage().getMessageID());
+			            
+			            rows = psDeleteMessage.executeUpdate();
+			            
+			            if (trace) { log.trace("Deleted " + rows + " messages"); }
+		            }
 		         }         
 		         
 		         return null;
@@ -709,13 +713,12 @@
 		      finally
 		      {
 		      	closeStatement(psDeleteReference);       
+		      	closeStatement(psDeleteMessage);
 		      }      
    		}
       }
       
-      new RemoveDepagedReferencesRunner().executeWithRetry();   
-      
-      checkReap();      
+      new RemoveDepagedReferencesRunner().executeWithRetry();    
    }
    
    // After loading paged refs this is used to update P messages to non paged
@@ -1174,6 +1177,11 @@
       return (InitialLoadInfo)new MergeAndLoadRunner().executeWithRetry();      
    }
    
+   public void testSpeed() throws Exception
+   {
+   	
+   }
+   
    // End of paging functionality
    // ===========================
    
@@ -1280,11 +1288,18 @@
 			public Object doTransaction() throws Exception
 			{  
 				PreparedStatement psReference = null;
+				
+				PreparedStatement psMessage = null;
 
 	         try
 	         {
 	            psReference = conn.prepareStatement(getSQLStatement("DELETE_MESSAGE_REF"));
 	            
+	            if (reaper == null)
+	            {
+	            	psMessage = conn.prepareStatement(getSQLStatement("DELETE_MESSAGE"));
+	            }
+	            
 	            //Remove the message reference
 	            removeReference(channelID, ref, psReference);
 	             
@@ -1296,15 +1311,24 @@
 	               return null;
 	            }
 	            
-	            if (trace) { log.trace("Deleted " + rows + " rows"); }  
+	            if (trace) { log.trace("Deleted " + rows + " references"); }
+	            
+	            if (reaper == null)
+	            {	           
+		            psMessage.setLong(1, ref.getMessage().getMessageID());
+		            psMessage.setLong(2, ref.getMessage().getMessageID());
+		            
+		            rows = psMessage.executeUpdate();
+		            
+		            if (trace) { log.trace("Deleted " + rows + " messages"); }
+	            }
 	            	            
-	            incrementReapCount();
-	            
 	            return null;
 	         }
 	         finally
 	         {
 	         	closeStatement(psReference);
+	         	closeStatement(psMessage);
 	         }  
 			}
    	}
@@ -1321,9 +1345,7 @@
       {         
          //No tx so we remove the reference directly from the db
       
-         new RemoveReferenceRunner().executeWithRetry();
-         
-         checkReap();         
+         new RemoveReferenceRunner().executeWithRetry();       
       }
    }
    
@@ -1405,6 +1427,7 @@
 		      PreparedStatement psReference = null;
 		      PreparedStatement psInsertMessage = null;
             PreparedStatement psDeleteReference = null;
+            PreparedStatement psDeleteMessage = null;
 		      
 		      List<Message> messagesStored = new ArrayList<Message>();
 
@@ -1467,9 +1490,22 @@
 		                        
 		            int rows = psDeleteReference.executeUpdate();
 		            
-		            if (trace) { log.trace("Deleted " + rows + " rows"); }
+		            if (trace) { log.trace("Deleted " + rows + " references"); }
 		            
-		            incrementReapCount();
+		            if (reaper == null)
+		            {		            
+			            if (psDeleteMessage == null)
+			            {
+			            	psDeleteMessage = conn.prepareStatement(getSQLStatement("DELETE_MESSAGE"));
+			            }
+			            
+			            psDeleteMessage.setLong(1, pair.ref.getMessage().getMessageID());
+			            psDeleteMessage.setLong(2, pair.ref.getMessage().getMessageID());
+			            
+			            rows = psDeleteMessage.executeUpdate();
+			            
+			            if (trace) { log.trace("Deleted " + rows + " messages"); }
+		            }
 		         }
 		         
 		         return null;
@@ -1489,15 +1525,14 @@
                closeStatement(psReference);
 		      	closeStatement(psDeleteReference);
 		      	closeStatement(psInsertMessage);
+		      	closeStatement(psDeleteMessage);
 		      }
 			}   		
    	}   	      
-   	new HandleBeforeCommit1PCRunner().executeWithRetry();
-   	
-      checkReap();      
+   	new HandleBeforeCommit1PCRunner().executeWithRetry();     
    }
    
-   protected void handleBeforeCommit2PC(final Transaction tx) throws Exception
+   protected void handleBeforeCommit2PC(final List refsToRemove, final Transaction tx) throws Exception
    {          
    	class HandleBeforeCommit2PCRunner extends JDBCTxRunner
    	{
@@ -1519,17 +1554,39 @@
 		         
 		         ps.close();
 		         ps = null;
-		         
-		         
+		         		         
 		         ps = conn.prepareStatement(getSQLStatement("COMMIT_MESSAGE_REF2"));
 		         ps.setLong(1, tx.getId());         
 		         
 		         rows = ps.executeUpdate();
 		         
-		         addToReapCount(rows);
-		         
 		         if (trace) { log.trace(JDBCUtil.statementToString(getSQLStatement("COMMIT_MESSAGE_REF2"), new Long(tx.getId())) + " updated " + rows + " row(s)"); }
 		         
+		         ps.close();
+		         ps = null;
+		         
+		         if (reaper == null)
+		         {			         
+			         Iterator iter = refsToRemove.iterator();
+			         
+			         while (iter.hasNext())
+			         {
+			         	ChannelRefPair pair = (ChannelRefPair)iter.next();
+			         	
+			         	if (ps == null)
+			         	{
+			         		ps = conn.prepareStatement(getSQLStatement("DELETE_MESSAGE"));
+			         	}
+			         		
+		         		ps.setLong(1, pair.ref.getMessage().getMessageID());
+		         		ps.setLong(2, pair.ref.getMessage().getMessageID());
+		         		
+		         		rows = ps.executeUpdate();
+		         		
+		         		if (trace) { log.trace("Deleted " + rows + " messages"); }			         	           
+			         }
+		         }
+		         
 		         removeTXRecord(conn, tx);
 		         
 		         return null;
@@ -1542,8 +1599,6 @@
    	}
    	
    	new HandleBeforeCommit2PCRunner().executeWithRetry();
-   	
-   	checkReap();
    }
    
    protected void handleBeforePrepare(final List refsToAdd, final List refsToRemove, final Transaction tx) throws Exception
@@ -1670,14 +1725,13 @@
 		         
 		         int rows = ps.executeUpdate();
 		         
-		         addToReapCount(rows);
-		         
 		         if (trace)
 		         {
 		            log.trace(JDBCUtil.statementToString(getSQLStatement("ROLLBACK_MESSAGE_REF1"), new Long(tx.getId())) + " removed " + rows + " row(s)");
 		         }
 		         
 		         ps.close();
+		         ps = null;
 		         
 		         ps = conn.prepareStatement(getSQLStatement("ROLLBACK_MESSAGE_REF2"));
 		         ps.setLong(1, tx.getId());
@@ -1690,6 +1744,31 @@
 		                  + " row(s)");
 		         }
 		         
+		         ps.close();
+		         ps = null;
+		         
+		         if (reaper == null)
+		         {			         
+			         Iterator iter = refsToAdd.iterator();
+			         
+			         while (iter.hasNext())
+			         {
+			         	ChannelRefPair pair = (ChannelRefPair)iter.next();
+			         	
+			         	if (ps == null)
+			         	{
+			         		ps = conn.prepareStatement(getSQLStatement("DELETE_MESSAGE"));
+			         	}
+			         		
+		         		ps.setLong(1, pair.ref.getMessage().getMessageID());
+		         		ps.setLong(2, pair.ref.getMessage().getMessageID());
+		         		
+		         		rows = ps.executeUpdate();
+		         		
+		         		if (trace) { log.trace("Deleted " + rows + " messages"); }		         			           
+			         }
+		         }
+		         
 		         removeTXRecord(conn, tx);
 		         
 		         return null;
@@ -1702,8 +1781,6 @@
    	}
    	
    	new HandleBeforeRollbackRunner().executeWithRetry();
-   	
-   	checkReap();
    }
    
    
@@ -2191,6 +2268,7 @@
       map.put("UPDATE_MESSAGE_4CONDITIONAL", "UPDATE JBM_MSG SET HEADERS=?, PAYLOAD=? WHERE MESSAGE_ID=?");
       map.put("MESSAGE_ID_COLUMN", "MESSAGE_ID");
       map.put("REAP_MESSAGES", "DELETE FROM JBM_MSG WHERE INS_TIME <= ? AND NOT EXISTS (SELECT * FROM JBM_MSG_REF WHERE JBM_MSG_REF.MESSAGE_ID = JBM_MSG.MESSAGE_ID)");
+      map.put("DELETE_MESSAGE", "DELETE FROM JBM_MSG WHERE MESSAGE_ID = ? AND NOT EXISTS (SELECT * FROM JBM_MSG_REF WHERE JBM_MSG_REF.MESSAGE_ID = ?)");
       //Transaction
       map.put("INSERT_TRANSACTION",
               "INSERT INTO JBM_TX (NODE_ID, TRANSACTION_ID, BRANCH_QUAL, FORMAT_ID, GLOBAL_TXID) " +
@@ -2352,33 +2430,7 @@
       
       return order;
    }
-   
-   private void incrementReapCount()
-   {
-   	if (syncReapCount != null)
-   	{
-   		syncReapCount.incrementAndGet();
-   	}
-   }
-   
-   private void addToReapCount(int delta)
-   {
-   	if (syncReapCount != null)
-   	{
-   		syncReapCount.addAndGet(delta);
-   	}
-   }
-   
-   private synchronized void checkReap() throws Exception
-   {
-   	if (synchronousReapMessages > 0 && syncReapCount.get() >= synchronousReapMessages)
-   	{
-   		reapUnreferencedMessages();
-   		
-   		syncReapCount.set(0);
-   	}
-   }
-   
+      
    private void reapUnreferencedMessages(final long timestamp) throws Exception
    {
    	class ReaperRunner extends JDBCTxRunner
@@ -2510,7 +2562,7 @@
          }
          else
          {
-            handleBeforeCommit2PC(tx);
+            handleBeforeCommit2PC(refsToRemove, tx);
          }
       }
       

Modified: trunk/src/main/org/jboss/messaging/core/jmx/JDBCPersistenceManagerService.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/jmx/JDBCPersistenceManagerService.java	2007-10-19 19:07:01 UTC (rev 3212)
+++ trunk/src/main/org/jboss/messaging/core/jmx/JDBCPersistenceManagerService.java	2007-10-19 21:41:35 UTC (rev 3213)
@@ -55,8 +55,6 @@
    
    private long reaperPeriod = 5000;
    
-   private int synchronousReapMessages = 0;
-   
    private boolean supportsBlobOnSelect = true;
    
    // Constructors --------------------------------------------------------
@@ -87,21 +85,11 @@
       {  
          TransactionManager tm = getTransactionManagerReference();
          
-         if (reaperPeriod == 0 && synchronousReapMessages == 0)
-         {
-         	throw new IllegalArgumentException("One of reaperPeriod or synchronousReapMessage must be > 0");
-         }
-         
-         if (reaperPeriod > 0 && synchronousReapMessages > 0)
-         {
-         	throw new IllegalArgumentException("Only one of reaperPeriod or synchronousReapMessage can be > 0");
-         }
-         
          persistenceManager =
             new JDBCPersistenceManager(ds, tm, sqlProperties,
                                        createTablesOnStartup, usingBatchUpdates,
                                        usingBinaryStream, usingTrailingByte, maxParams, reaperPeriod,
-                                       synchronousReapMessages, supportsBlobOnSelect);
+                                       supportsBlobOnSelect);
          
          persistenceManager.start();
          
@@ -182,7 +170,7 @@
    {
    	if (reaperPeriod < 0)
    	{
-   		throw new IllegalArgumentException("reaperPeriod must be > 0");
+   		throw new IllegalArgumentException("reaperPeriod must be >= 0");
    	}
    	
    	this.reaperPeriod = reaperPeriod;
@@ -193,21 +181,6 @@
    	return reaperPeriod;
    }
    
-   public void setSynchronousReapMessages(int msgs)
-   {
-   	if (msgs < 0)
-   	{
-   		throw new IllegalArgumentException("synchronousReapMessages must be > 0");
-   	}
-   	
-   	this.synchronousReapMessages = msgs;
-   }
-   
-   public int getSynchronousReapMessages()
-   {
-   	return synchronousReapMessages;
-   }
-   
    public boolean isSupportsBlobOnSelect()
    {
    	return supportsBlobOnSelect;

Modified: trunk/tests/src/org/jboss/test/messaging/core/IdManagerTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/core/IdManagerTest.java	2007-10-19 19:07:01 UTC (rev 3212)
+++ trunk/tests/src/org/jboss/test/messaging/core/IdManagerTest.java	2007-10-19 21:41:35 UTC (rev 3213)
@@ -68,7 +68,7 @@
       pm =
          new JDBCPersistenceManager(sc.getDataSource(), sc.getTransactionManager(),
                   sc.getPersistenceManagerSQLProperties(),
-                  true, true, true, false, 100, 5000, 0, true);   
+                  true, true, true, false, 100, 5000, true);   
       ((JDBCPersistenceManager)pm).injectNodeID(1);
       pm.start();
       

Modified: trunk/tests/src/org/jboss/test/messaging/core/JDBCPersistenceManagerTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/core/JDBCPersistenceManagerTest.java	2007-10-19 19:07:01 UTC (rev 3212)
+++ trunk/tests/src/org/jboss/test/messaging/core/JDBCPersistenceManagerTest.java	2007-10-19 21:41:35 UTC (rev 3213)
@@ -99,7 +99,7 @@
       JDBCPersistenceManager p =
          new JDBCPersistenceManager(sc.getDataSource(), sc.getTransactionManager(),
                   sc.getPersistenceManagerSQLProperties(),
-                  true, batch, useBinaryStream, trailingByte, maxParams, 5000, 0, true);
+                  true, batch, useBinaryStream, trailingByte, maxParams, 5000, true);
       ((JDBCPersistenceManager)p).injectNodeID(1);
       p.start();
       return p;

Modified: trunk/tests/src/org/jboss/test/messaging/core/MessagingQueueTestBase.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/core/MessagingQueueTestBase.java	2007-10-19 19:07:01 UTC (rev 3212)
+++ trunk/tests/src/org/jboss/test/messaging/core/MessagingQueueTestBase.java	2007-10-19 21:41:35 UTC (rev 3213)
@@ -108,7 +108,7 @@
 
       pm = new JDBCPersistenceManager(sc.getDataSource(), sc.getTransactionManager(),
                                       sc.getPersistenceManagerSQLProperties(),
-                                      true, true, true, false, 100, 5000, 0, true);
+                                      true, true, true, false, 100, 5000, true);
       ((JDBCPersistenceManager)pm).injectNodeID(1);
       pm.start();
       

Modified: trunk/tests/src/org/jboss/test/messaging/core/PostOfficeTestBase.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/core/PostOfficeTestBase.java	2007-10-19 19:07:01 UTC (rev 3212)
+++ trunk/tests/src/org/jboss/test/messaging/core/PostOfficeTestBase.java	2007-10-19 21:41:35 UTC (rev 3213)
@@ -245,7 +245,7 @@
       pm =
          new JDBCPersistenceManager(sc.getDataSource(), sc.getTransactionManager(),
                   sc.getPersistenceManagerSQLProperties(),
-                  true, true, true, false, 100, 5000, 0, true);
+                  true, true, true, false, 100, 5000, true);
       ((JDBCPersistenceManager)pm).injectNodeID(1);
       pm.start();
 

Modified: trunk/tests/src/org/jboss/test/messaging/core/paging/PagingStateTestBase.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/core/paging/PagingStateTestBase.java	2007-10-19 19:07:01 UTC (rev 3212)
+++ trunk/tests/src/org/jboss/test/messaging/core/paging/PagingStateTestBase.java	2007-10-19 21:41:35 UTC (rev 3213)
@@ -93,7 +93,7 @@
       pm =
          new JDBCPersistenceManager(sc.getDataSource(), sc.getTransactionManager(),
                   sc.getPersistenceManagerSQLProperties(),
-                  true, true, true, false, 100, 5000, 0, true);  
+                  true, true, true, false, 100, 5000, true);  
       ((JDBCPersistenceManager)pm).injectNodeID(1);
       pm.start();
  

Modified: trunk/tests/src/org/jboss/test/messaging/core/paging/PagingTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/core/paging/PagingTest.java	2007-10-19 19:07:01 UTC (rev 3212)
+++ trunk/tests/src/org/jboss/test/messaging/core/paging/PagingTest.java	2007-10-19 21:41:35 UTC (rev 3213)
@@ -87,7 +87,7 @@
       pm =
          new JDBCPersistenceManager(sc.getDataSource(), sc.getTransactionManager(),
                   sc.getPersistenceManagerSQLProperties(),
-                  true, true, true, false, 100, 5000, 0, true);   
+                  true, true, true, false, 100, 5000, true);   
       ((JDBCPersistenceManager)pm).injectNodeID(1);
       pm.start();
             

Modified: trunk/tests/src/org/jboss/test/messaging/core/paging/SingleChannel_ReloadTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/core/paging/SingleChannel_ReloadTest.java	2007-10-19 19:07:01 UTC (rev 3212)
+++ trunk/tests/src/org/jboss/test/messaging/core/paging/SingleChannel_ReloadTest.java	2007-10-19 21:41:35 UTC (rev 3213)
@@ -112,7 +112,7 @@
       pm =
          new JDBCPersistenceManager(sc.getDataSource(), sc.getTransactionManager(),
                   sc.getPersistenceManagerSQLProperties(),
-                  true, true, true, false, 100, 5000, 0, true);   
+                  true, true, true, false, 100, 5000, true);   
       ((JDBCPersistenceManager)pm).injectNodeID(1);
       pm.start();
       
@@ -204,7 +204,7 @@
       pm =
          new JDBCPersistenceManager(sc.getDataSource(), sc.getTransactionManager(),
                   sc.getPersistenceManagerSQLProperties(),
-                  true, true, true, false, 100, 5000, 0, true); 
+                  true, true, true, false, 100, 5000, true); 
       ((JDBCPersistenceManager)pm).injectNodeID(1);
       pm.start();
       

Modified: trunk/tests/src/org/jboss/test/messaging/jms/persistence/MessagePersistenceManagerTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/persistence/MessagePersistenceManagerTest.java	2007-10-19 19:07:01 UTC (rev 3212)
+++ trunk/tests/src/org/jboss/test/messaging/jms/persistence/MessagePersistenceManagerTest.java	2007-10-19 21:41:35 UTC (rev 3213)
@@ -73,7 +73,7 @@
       JDBCPersistenceManager p =
          new JDBCPersistenceManager(sc.getDataSource(), sc.getTransactionManager(),
                   sc.getPersistenceManagerSQLProperties(),
-                  true, batch, true, false, maxParams, 5000, 0, true);      
+                  true, batch, true, false, maxParams, 5000, true);      
       ((JDBCPersistenceManager)pm).injectNodeID(1);
       p.start();
       return p;




More information about the jboss-cvs-commits mailing list