[jboss-cvs] JBoss Messaging SVN: r3213 - in trunk: src/etc/xmdesc and 5 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Fri Oct 19 17:41:35 EDT 2007
Author: timfox
Date: 2007-10-19 17:41:35 -0400 (Fri, 19 Oct 2007)
New Revision: 3213
Modified:
trunk/src/etc/server/default/deploy/db2-persistence-service.xml
trunk/src/etc/server/default/deploy/mssql-persistence-service.xml
trunk/src/etc/server/default/deploy/mysql-persistence-service.xml
trunk/src/etc/server/default/deploy/mysqlcluster-persistence-service.xml
trunk/src/etc/server/default/deploy/oracle-persistence-service.xml
trunk/src/etc/server/default/deploy/postgresql-persistence-service.xml
trunk/src/etc/server/default/deploy/sybase-persistence-service.xml
trunk/src/etc/xmdesc/JDBCPersistenceManager-xmbean.xml
trunk/src/main/org/jboss/messaging/core/impl/JDBCPersistenceManager.java
trunk/src/main/org/jboss/messaging/core/jmx/JDBCPersistenceManagerService.java
trunk/tests/src/org/jboss/test/messaging/core/IdManagerTest.java
trunk/tests/src/org/jboss/test/messaging/core/JDBCPersistenceManagerTest.java
trunk/tests/src/org/jboss/test/messaging/core/MessagingQueueTestBase.java
trunk/tests/src/org/jboss/test/messaging/core/PostOfficeTestBase.java
trunk/tests/src/org/jboss/test/messaging/core/paging/PagingStateTestBase.java
trunk/tests/src/org/jboss/test/messaging/core/paging/PagingTest.java
trunk/tests/src/org/jboss/test/messaging/core/paging/SingleChannel_ReloadTest.java
trunk/tests/src/org/jboss/test/messaging/jms/persistence/MessagePersistenceManagerTest.java
Log:
More tweaks to reaping
Modified: trunk/src/etc/server/default/deploy/db2-persistence-service.xml
===================================================================
--- trunk/src/etc/server/default/deploy/db2-persistence-service.xml 2007-10-19 19:07:01 UTC (rev 3212)
+++ trunk/src/etc/server/default/deploy/db2-persistence-service.xml 2007-10-19 21:41:35 UTC (rev 3213)
@@ -67,6 +67,7 @@
INSERT_MESSAGE_CONDITIONAL_FULL=INSERT INTO JBM_MSG (MESSAGE_ID, RELIABLE, EXPIRATION, TIMESTAMP, PRIORITY, TYPE, INS_TIME, HEADERS, PAYLOAD) SELECT ?, ?, ?, ?, ?, ?, ?, ?, ? FROM JBM_DUAL WHERE NOT EXISTS (SELECT MESSAGE_ID FROM JBM_MSG WHERE MESSAGE_ID = ?)
MESSAGE_ID_COLUMN=MESSAGE_ID
REAP_MESSAGES=DELETE FROM JBM_MSG WHERE INS_TIME < ? AND NOT EXISTS (SELECT * FROM JBM_MSG_REF WHERE JBM_MSG_REF.MESSAGE_ID = JBM_MSG.MESSAGE_ID)
+ DELETE_MESSAGE=DELETE FROM JBM_MSG WHERE MESSAGE_ID = ? AND NOT EXISTS (SELECT * FROM JBM_MSG_REF WHERE JBM_MSG_REF.MESSAGE_ID = ?)
INSERT_TRANSACTION=INSERT INTO JBM_TX (NODE_ID, TRANSACTION_ID, BRANCH_QUAL, FORMAT_ID, GLOBAL_TXID) VALUES(?, ?, ?, ?, ?)
DELETE_TRANSACTION=DELETE FROM JBM_TX WHERE NODE_ID = ? AND TRANSACTION_ID = ?
SELECT_PREPARED_TRANSACTIONS=SELECT TRANSACTION_ID, BRANCH_QUAL, FORMAT_ID, GLOBAL_TXID FROM JBM_TX WHERE NODE_ID = ?
@@ -85,12 +86,8 @@
<!-- The period between asynchronous reaps of unreferenced messages, or zero for synchronous reaping -->
- <attribute name="ReaperPeriod">5000</attribute>
+ <attribute name="ReaperPeriod">0</attribute>
- <!-- The number of messages to do a synchronous reap after, or zero for asynchronous reaping -->
-
- <attribute name="SynchronousReapMessages">0</attribute>
-
</mbean>
<!-- Messaging Post Office MBean configuration
Modified: trunk/src/etc/server/default/deploy/mssql-persistence-service.xml
===================================================================
--- trunk/src/etc/server/default/deploy/mssql-persistence-service.xml 2007-10-19 19:07:01 UTC (rev 3212)
+++ trunk/src/etc/server/default/deploy/mssql-persistence-service.xml 2007-10-19 21:41:35 UTC (rev 3213)
@@ -70,6 +70,7 @@
INSERT_MESSAGE_CONDITIONAL_FULL=INSERT INTO JBM_MSG (MESSAGE_ID, RELIABLE, EXPIRATION, TIMESTAMP, PRIORITY, TYPE, INS_TIME, HEADERS, PAYLOAD) SELECT ?, ?, ?, ?, ?, ?, ?, ?, ? FROM JBM_DUAL WHERE NOT EXISTS (SELECT MESSAGE_ID FROM JBM_MSG WHERE MESSAGE_ID = ?)
MESSAGE_ID_COLUMN=MESSAGE_ID
REAP_MESSAGES=DELETE FROM JBM_MSG WHERE INS_TIME < ? AND NOT EXISTS (SELECT * FROM JBM_MSG_REF WHERE JBM_MSG_REF.MESSAGE_ID = JBM_MSG.MESSAGE_ID)
+ DELETE_MESSAGE=DELETE FROM JBM_MSG WHERE MESSAGE_ID = ? AND NOT EXISTS (SELECT * FROM JBM_MSG_REF WHERE JBM_MSG_REF.MESSAGE_ID = ?)
INSERT_TRANSACTION=INSERT INTO JBM_TX (NODE_ID, TRANSACTION_ID, BRANCH_QUAL, FORMAT_ID, GLOBAL_TXID) VALUES(?, ?, ?, ?, ?)
DELETE_TRANSACTION=DELETE FROM JBM_TX WHERE NODE_ID = ? AND TRANSACTION_ID = ?
SELECT_PREPARED_TRANSACTIONS=SELECT TRANSACTION_ID, BRANCH_QUAL, FORMAT_ID, GLOBAL_TXID FROM JBM_TX WHERE NODE_ID = ?
@@ -88,11 +89,7 @@
<!-- The period between asynchronous reaps of unreferenced messages, or zero for synchronous reaping -->
- <attribute name="ReaperPeriod">5000</attribute>
-
- <!-- The number of messages to do a synchronous reap after, or zero for asynchronous reaping -->
-
- <attribute name="SynchronousReapMessages">0</attribute>
+ <attribute name="ReaperPeriod">0</attribute>
</mbean>
<!-- Messaging Post Office MBean configuration
Modified: trunk/src/etc/server/default/deploy/mysql-persistence-service.xml
===================================================================
--- trunk/src/etc/server/default/deploy/mysql-persistence-service.xml 2007-10-19 19:07:01 UTC (rev 3212)
+++ trunk/src/etc/server/default/deploy/mysql-persistence-service.xml 2007-10-19 21:41:35 UTC (rev 3213)
@@ -70,6 +70,7 @@
INSERT_MESSAGE_CONDITIONAL_FULL=INSERT INTO JBM_MSG (MESSAGE_ID, RELIABLE, EXPIRATION, TIMESTAMP, PRIORITY, TYPE, INS_TIME, HEADERS, PAYLOAD) SELECT ?, ?, ?, ?, ?, ?, ?, ?, ? FROM JBM_DUAL WHERE NOT EXISTS (SELECT MESSAGE_ID FROM JBM_MSG WHERE MESSAGE_ID = ?)
MESSAGE_ID_COLUMN=MESSAGE_ID
REAP_MESSAGES=DELETE FROM JBM_MSG WHERE INS_TIME < ? AND NOT EXISTS (SELECT * FROM JBM_MSG_REF WHERE JBM_MSG_REF.MESSAGE_ID = JBM_MSG.MESSAGE_ID)
+ DELETE_MESSAGE=DELETE FROM JBM_MSG WHERE MESSAGE_ID = ? AND NOT EXISTS (SELECT * FROM JBM_MSG_REF WHERE JBM_MSG_REF.MESSAGE_ID = ?)
INSERT_TRANSACTION=INSERT INTO JBM_TX (NODE_ID, TRANSACTION_ID, BRANCH_QUAL, FORMAT_ID, GLOBAL_TXID) VALUES(?, ?, ?, ?, ?)
DELETE_TRANSACTION=DELETE FROM JBM_TX WHERE NODE_ID = ? AND TRANSACTION_ID = ?
SELECT_PREPARED_TRANSACTIONS=SELECT TRANSACTION_ID, BRANCH_QUAL, FORMAT_ID, GLOBAL_TXID FROM JBM_TX WHERE NODE_ID = ?
@@ -88,11 +89,7 @@
<!-- The period between asynchronous reaps of unreferenced messages, or zero for synchronous reaping -->
- <attribute name="ReaperPeriod">5000</attribute>
-
- <!-- The number of messages to do a synchronous reap after, or zero for asynchronous reaping -->
-
- <attribute name="SynchronousReapMessages">0</attribute>
+ <attribute name="ReaperPeriod">0</attribute>
</mbean>
<!-- Messaging Post Office MBean configuration
Modified: trunk/src/etc/server/default/deploy/mysqlcluster-persistence-service.xml
===================================================================
--- trunk/src/etc/server/default/deploy/mysqlcluster-persistence-service.xml 2007-10-19 19:07:01 UTC (rev 3212)
+++ trunk/src/etc/server/default/deploy/mysqlcluster-persistence-service.xml 2007-10-19 21:41:35 UTC (rev 3213)
@@ -70,6 +70,7 @@
INSERT_MESSAGE_CONDITIONAL_FULL=INSERT INTO JBM_MSG (MESSAGE_ID, RELIABLE, EXPIRATION, TIMESTAMP, PRIORITY, TYPE, INS_TIME, HEADERS, PAYLOAD) SELECT ?, ?, ?, ?, ?, ?, ?, ?, ? FROM JBM_DUAL WHERE NOT EXISTS (SELECT MESSAGE_ID FROM JBM_MSG WHERE MESSAGE_ID = ?)
MESSAGE_ID_COLUMN=MESSAGE_ID
REAP_MESSAGES=DELETE FROM JBM_MSG WHERE INS_TIME < ? AND NOT EXISTS (SELECT * FROM JBM_MSG_REF WHERE JBM_MSG_REF.MESSAGE_ID = JBM_MSG.MESSAGE_ID)
+ DELETE_MESSAGE=DELETE FROM JBM_MSG WHERE MESSAGE_ID = ? AND NOT EXISTS (SELECT * FROM JBM_MSG_REF WHERE JBM_MSG_REF.MESSAGE_ID = ?)
INSERT_TRANSACTION=INSERT INTO JBM_TX (NODE_ID, TRANSACTION_ID, BRANCH_QUAL, FORMAT_ID, GLOBAL_TXID) VALUES(?, ?, ?, ?, ?)
DELETE_TRANSACTION=DELETE FROM JBM_TX WHERE NODE_ID = ? AND TRANSACTION_ID = ?
SELECT_PREPARED_TRANSACTIONS=SELECT TRANSACTION_ID, BRANCH_QUAL, FORMAT_ID, GLOBAL_TXID FROM JBM_TX WHERE NODE_ID = ?
@@ -88,12 +89,8 @@
<!-- The period between asynchronous reaps of unreferenced messages, or zero for synchronous reaping -->
- <attribute name="ReaperPeriod">5000</attribute>
+ <attribute name="ReaperPeriod">0</attribute>
- <!-- The number of messages to do a synchronous reap after, or zero for asynchronous reaping -->
-
- <attribute name="SynchronousReapMessages">0</attribute>
-
</mbean>
<!-- Messaging Post Office MBean configuration
Modified: trunk/src/etc/server/default/deploy/oracle-persistence-service.xml
===================================================================
--- trunk/src/etc/server/default/deploy/oracle-persistence-service.xml 2007-10-19 19:07:01 UTC (rev 3212)
+++ trunk/src/etc/server/default/deploy/oracle-persistence-service.xml 2007-10-19 21:41:35 UTC (rev 3213)
@@ -74,6 +74,7 @@
INSERT_MESSAGE_CONDITIONAL_FULL=INSERT INTO JBM_MSG (MESSAGE_ID, RELIABLE, EXPIRATION, TIMESTAMP, PRIORITY, TYPE, INS_TIME, HEADERS, PAYLOAD) SELECT ?, ?, ?, ?, ?, ?, ?, ?, ? FROM JBM_DUAL WHERE NOT EXISTS (SELECT MESSAGE_ID FROM JBM_MSG WHERE MESSAGE_ID = ?)
MESSAGE_ID_COLUMN=MESSAGE_ID
REAP_MESSAGES=DELETE FROM JBM_MSG WHERE INS_TIME < ? AND NOT EXISTS (SELECT * FROM JBM_MSG_REF WHERE JBM_MSG_REF.MESSAGE_ID = JBM_MSG.MESSAGE_ID)
+ DELETE_MESSAGE=DELETE FROM JBM_MSG WHERE MESSAGE_ID = ? AND NOT EXISTS (SELECT * FROM JBM_MSG_REF WHERE JBM_MSG_REF.MESSAGE_ID = ?)
INSERT_TRANSACTION=INSERT INTO JBM_TX (NODE_ID, TRANSACTION_ID, BRANCH_QUAL, FORMAT_ID, GLOBAL_TXID) VALUES(?, ?, ?, ?, ?)
DELETE_TRANSACTION=DELETE FROM JBM_TX WHERE NODE_ID = ? AND TRANSACTION_ID = ?
SELECT_PREPARED_TRANSACTIONS=SELECT TRANSACTION_ID, BRANCH_QUAL, FORMAT_ID, GLOBAL_TXID FROM JBM_TX WHERE NODE_ID = ?
@@ -92,12 +93,7 @@
<!-- The period between asynchronous reaps of unreferenced messages, or zero for synchronous reaping -->
- <attribute name="ReaperPeriod">5000</attribute>
-
- <!-- The number of messages to do a synchronous reap after, or zero for asynchronous reaping -->
-
- <attribute name="SynchronousReapMessages">0</attribute>
-
+ <attribute name="ReaperPeriod">0</attribute>
</mbean>
<!-- Messaging Post Office MBean configuration
Modified: trunk/src/etc/server/default/deploy/postgresql-persistence-service.xml
===================================================================
--- trunk/src/etc/server/default/deploy/postgresql-persistence-service.xml 2007-10-19 19:07:01 UTC (rev 3212)
+++ trunk/src/etc/server/default/deploy/postgresql-persistence-service.xml 2007-10-19 21:41:35 UTC (rev 3213)
@@ -70,6 +70,7 @@
INSERT_MESSAGE_CONDITIONAL_FULL=INSERT INTO JBM_MSG (MESSAGE_ID, RELIABLE, EXPIRATION, TIMESTAMP, PRIORITY, TYPE, INS_TIME, HEADERS, PAYLOAD) SELECT ?, ?, ?, ?, ?, ?, ?, ?, ? FROM JBM_DUAL WHERE NOT EXISTS (SELECT MESSAGE_ID FROM JBM_MSG WHERE MESSAGE_ID = ?)
MESSAGE_ID_COLUMN=MESSAGE_ID
REAP_MESSAGES=DELETE FROM JBM_MSG WHERE INS_TIME < ? AND NOT EXISTS (SELECT * FROM JBM_MSG_REF WHERE JBM_MSG_REF.MESSAGE_ID = JBM_MSG.MESSAGE_ID)
+ DELETE_MESSAGE=DELETE FROM JBM_MSG WHERE MESSAGE_ID = ? AND NOT EXISTS (SELECT * FROM JBM_MSG_REF WHERE JBM_MSG_REF.MESSAGE_ID = ?)
INSERT_TRANSACTION=INSERT INTO JBM_TX (NODE_ID, TRANSACTION_ID, BRANCH_QUAL, FORMAT_ID, GLOBAL_TXID) VALUES(?, ?, ?, ?, ?)
DELETE_TRANSACTION=DELETE FROM JBM_TX WHERE NODE_ID = ? AND TRANSACTION_ID = ?
SELECT_PREPARED_TRANSACTIONS=SELECT TRANSACTION_ID, BRANCH_QUAL, FORMAT_ID, GLOBAL_TXID FROM JBM_TX WHERE NODE_ID = ?
@@ -88,11 +89,8 @@
<!-- The period between asynchronous reaps of unreferenced messages, or zero for synchronous reaping -->
- <attribute name="ReaperPeriod">5000</attribute>
+ <attribute name="ReaperPeriod">0</attribute>
- <!-- The number of messages to do a synchronous reap after, or zero for asynchronous reaping -->
-
- <attribute name="SynchronousReapMessages">0</attribute>
</mbean>
<!-- Messaging Post Office MBean configuration
Modified: trunk/src/etc/server/default/deploy/sybase-persistence-service.xml
===================================================================
--- trunk/src/etc/server/default/deploy/sybase-persistence-service.xml 2007-10-19 19:07:01 UTC (rev 3212)
+++ trunk/src/etc/server/default/deploy/sybase-persistence-service.xml 2007-10-19 21:41:35 UTC (rev 3213)
@@ -75,6 +75,7 @@
INSERT_MESSAGE_CONDITIONAL_FULL=INSERT INTO JBM_MSG (MESSAGE_ID, RELIABLE, EXPIRATION, TIMESTAMP, PRIORITY, TYPE, INS_TIME, HEADERS, PAYLOAD) SELECT ?, ?, ?, ?, ?, ?, ?, ?, ? FROM JBM_DUAL WHERE NOT EXISTS (SELECT MESSAGE_ID FROM JBM_MSG WHERE MESSAGE_ID = ?)
MESSAGE_ID_COLUMN=MESSAGE_ID
REAP_MESSAGES=DELETE FROM JBM_MSG WHERE INS_TIME < ? AND NOT EXISTS (SELECT * FROM JBM_MSG_REF WHERE JBM_MSG_REF.MESSAGE_ID = JBM_MSG.MESSAGE_ID)
+ DELETE_MESSAGE=DELETE FROM JBM_MSG WHERE MESSAGE_ID = ? AND NOT EXISTS (SELECT * FROM JBM_MSG_REF WHERE JBM_MSG_REF.MESSAGE_ID = ?)
INSERT_TRANSACTION=INSERT INTO JBM_TX (NODE_ID, TRANSACTION_ID, BRANCH_QUAL, FORMAT_ID, GLOBAL_TXID) VALUES(?, ?, ?, ?, ?)
DELETE_TRANSACTION=DELETE FROM JBM_TX WHERE NODE_ID = ? AND TRANSACTION_ID = ?
SELECT_PREPARED_TRANSACTIONS=SELECT TRANSACTION_ID, BRANCH_QUAL, FORMAT_ID, GLOBAL_TXID FROM JBM_TX WHERE NODE_ID = ?
@@ -93,11 +94,8 @@
<!-- The period between asynchronous reaps of unreferenced messages, or zero for synchronous reaping -->
- <attribute name="ReaperPeriod">5000</attribute>
+ <attribute name="ReaperPeriod">0</attribute>
- <!-- The number of messages to do a synchronous reap after, or zero for asynchronous reaping -->
-
- <attribute name="SynchronousReapMessages">0</attribute>
</mbean>
<!-- Messaging Post Office MBean configuration
Modified: trunk/src/etc/xmdesc/JDBCPersistenceManager-xmbean.xml
===================================================================
--- trunk/src/etc/xmdesc/JDBCPersistenceManager-xmbean.xml 2007-10-19 19:07:01 UTC (rev 3212)
+++ trunk/src/etc/xmdesc/JDBCPersistenceManager-xmbean.xml 2007-10-19 21:41:35 UTC (rev 3213)
@@ -71,12 +71,6 @@
<type>long</type>
</attribute>
- <attribute access="read-write" getMethod="getSynchronousReapMessages" setMethod="setSynchronousReapMessages">
- <description>The number of messages to synchronously force a reap after, or zero to reap asynchronously</description>
- <name>SynchronousReapMessages</name>
- <type>int</type>
- </attribute>
-
<attribute access="read-write" getMethod="isSupportsBlobOnSelect" setMethod="setSupportsBlobOnSelect">
<description>Some databases don't support binding blobs on select clauses</description>
<name>SupportsBlobOnSelect</name>
Modified: trunk/src/main/org/jboss/messaging/core/impl/JDBCPersistenceManager.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/impl/JDBCPersistenceManager.java 2007-10-19 19:07:01 UTC (rev 3212)
+++ trunk/src/main/org/jboss/messaging/core/impl/JDBCPersistenceManager.java 2007-10-19 21:41:35 UTC (rev 3213)
@@ -44,7 +44,6 @@
import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;
-import java.util.concurrent.atomic.AtomicInteger;
import javax.sql.DataSource;
import javax.transaction.TransactionManager;
@@ -106,10 +105,6 @@
private boolean reaperRunning;
- private int synchronousReapMessages;
-
- private AtomicInteger syncReapCount;
-
// Some versions of the oracle driver don't support binding blobs on select clauses,
// what would force us to use a two stage insert (insert and if successful, update)
private boolean supportsBlobSelect;
@@ -119,7 +114,7 @@
public JDBCPersistenceManager(DataSource ds, TransactionManager tm, Properties sqlProperties,
boolean createTablesOnStartup, boolean usingBatchUpdates,
boolean usingBinaryStream, boolean usingTrailingByte, int maxParams,
- long reaperPeriod, int synchronousReapMessages, boolean supportsBlobSelect)
+ long reaperPeriod, boolean supportsBlobSelect)
{
super(ds, tm, sqlProperties, createTablesOnStartup);
@@ -133,19 +128,13 @@
this.reaperPeriod = reaperPeriod;
- this.synchronousReapMessages = synchronousReapMessages;
-
if (reaperPeriod > 0)
{
reaperTimer = new Timer(true);
reaper = new Reaper();
}
- else
- {
- syncReapCount = new AtomicInteger(0);
- }
-
+
this.supportsBlobSelect = supportsBlobSelect;
}
@@ -685,10 +674,17 @@
{
PreparedStatement psDeleteReference = null;
+ PreparedStatement psDeleteMessage = null;
+
try
{
psDeleteReference = conn.prepareStatement(getSQLStatement("DELETE_MESSAGE_REF"));
+ if (reaper == null)
+ {
+ psDeleteMessage = conn.prepareStatement(getSQLStatement("DELETE_MESSAGE"));
+ }
+
Iterator iter = references.iterator();
while (iter.hasNext())
@@ -699,9 +695,17 @@
int rows = psDeleteReference.executeUpdate();
- addToReapCount(rows);
-
- if (trace) { log.trace("Deleted " + rows + " rows"); }
+ if (trace) { log.trace("Deleted " + rows + " references"); }
+
+ if (reaper == null)
+ {
+ psDeleteMessage.setLong(1, ref.getMessage().getMessageID());
+ psDeleteMessage.setLong(2, ref.getMessage().getMessageID());
+
+ rows = psDeleteMessage.executeUpdate();
+
+ if (trace) { log.trace("Deleted " + rows + " messages"); }
+ }
}
return null;
@@ -709,13 +713,12 @@
finally
{
closeStatement(psDeleteReference);
+ closeStatement(psDeleteMessage);
}
}
}
- new RemoveDepagedReferencesRunner().executeWithRetry();
-
- checkReap();
+ new RemoveDepagedReferencesRunner().executeWithRetry();
}
// After loading paged refs this is used to update P messages to non paged
@@ -1174,6 +1177,11 @@
return (InitialLoadInfo)new MergeAndLoadRunner().executeWithRetry();
}
+ public void testSpeed() throws Exception
+ {
+
+ }
+
// End of paging functionality
// ===========================
@@ -1280,11 +1288,18 @@
public Object doTransaction() throws Exception
{
PreparedStatement psReference = null;
+
+ PreparedStatement psMessage = null;
try
{
psReference = conn.prepareStatement(getSQLStatement("DELETE_MESSAGE_REF"));
+ if (reaper == null)
+ {
+ psMessage = conn.prepareStatement(getSQLStatement("DELETE_MESSAGE"));
+ }
+
//Remove the message reference
removeReference(channelID, ref, psReference);
@@ -1296,15 +1311,24 @@
return null;
}
- if (trace) { log.trace("Deleted " + rows + " rows"); }
+ if (trace) { log.trace("Deleted " + rows + " references"); }
+
+ if (reaper == null)
+ {
+ psMessage.setLong(1, ref.getMessage().getMessageID());
+ psMessage.setLong(2, ref.getMessage().getMessageID());
+
+ rows = psMessage.executeUpdate();
+
+ if (trace) { log.trace("Deleted " + rows + " messages"); }
+ }
- incrementReapCount();
-
return null;
}
finally
{
closeStatement(psReference);
+ closeStatement(psMessage);
}
}
}
@@ -1321,9 +1345,7 @@
{
//No tx so we remove the reference directly from the db
- new RemoveReferenceRunner().executeWithRetry();
-
- checkReap();
+ new RemoveReferenceRunner().executeWithRetry();
}
}
@@ -1405,6 +1427,7 @@
PreparedStatement psReference = null;
PreparedStatement psInsertMessage = null;
PreparedStatement psDeleteReference = null;
+ PreparedStatement psDeleteMessage = null;
List<Message> messagesStored = new ArrayList<Message>();
@@ -1467,9 +1490,22 @@
int rows = psDeleteReference.executeUpdate();
- if (trace) { log.trace("Deleted " + rows + " rows"); }
+ if (trace) { log.trace("Deleted " + rows + " references"); }
- incrementReapCount();
+ if (reaper == null)
+ {
+ if (psDeleteMessage == null)
+ {
+ psDeleteMessage = conn.prepareStatement(getSQLStatement("DELETE_MESSAGE"));
+ }
+
+ psDeleteMessage.setLong(1, pair.ref.getMessage().getMessageID());
+ psDeleteMessage.setLong(2, pair.ref.getMessage().getMessageID());
+
+ rows = psDeleteMessage.executeUpdate();
+
+ if (trace) { log.trace("Deleted " + rows + " messages"); }
+ }
}
return null;
@@ -1489,15 +1525,14 @@
closeStatement(psReference);
closeStatement(psDeleteReference);
closeStatement(psInsertMessage);
+ closeStatement(psDeleteMessage);
}
}
}
- new HandleBeforeCommit1PCRunner().executeWithRetry();
-
- checkReap();
+ new HandleBeforeCommit1PCRunner().executeWithRetry();
}
- protected void handleBeforeCommit2PC(final Transaction tx) throws Exception
+ protected void handleBeforeCommit2PC(final List refsToRemove, final Transaction tx) throws Exception
{
class HandleBeforeCommit2PCRunner extends JDBCTxRunner
{
@@ -1519,17 +1554,39 @@
ps.close();
ps = null;
-
-
+
ps = conn.prepareStatement(getSQLStatement("COMMIT_MESSAGE_REF2"));
ps.setLong(1, tx.getId());
rows = ps.executeUpdate();
- addToReapCount(rows);
-
if (trace) { log.trace(JDBCUtil.statementToString(getSQLStatement("COMMIT_MESSAGE_REF2"), new Long(tx.getId())) + " updated " + rows + " row(s)"); }
+ ps.close();
+ ps = null;
+
+ if (reaper == null)
+ {
+ Iterator iter = refsToRemove.iterator();
+
+ while (iter.hasNext())
+ {
+ ChannelRefPair pair = (ChannelRefPair)iter.next();
+
+ if (ps == null)
+ {
+ ps = conn.prepareStatement(getSQLStatement("DELETE_MESSAGE"));
+ }
+
+ ps.setLong(1, pair.ref.getMessage().getMessageID());
+ ps.setLong(2, pair.ref.getMessage().getMessageID());
+
+ rows = ps.executeUpdate();
+
+ if (trace) { log.trace("Deleted " + rows + " messages"); }
+ }
+ }
+
removeTXRecord(conn, tx);
return null;
@@ -1542,8 +1599,6 @@
}
new HandleBeforeCommit2PCRunner().executeWithRetry();
-
- checkReap();
}
protected void handleBeforePrepare(final List refsToAdd, final List refsToRemove, final Transaction tx) throws Exception
@@ -1670,14 +1725,13 @@
int rows = ps.executeUpdate();
- addToReapCount(rows);
-
if (trace)
{
log.trace(JDBCUtil.statementToString(getSQLStatement("ROLLBACK_MESSAGE_REF1"), new Long(tx.getId())) + " removed " + rows + " row(s)");
}
ps.close();
+ ps = null;
ps = conn.prepareStatement(getSQLStatement("ROLLBACK_MESSAGE_REF2"));
ps.setLong(1, tx.getId());
@@ -1690,6 +1744,31 @@
+ " row(s)");
}
+ ps.close();
+ ps = null;
+
+ if (reaper == null)
+ {
+ Iterator iter = refsToAdd.iterator();
+
+ while (iter.hasNext())
+ {
+ ChannelRefPair pair = (ChannelRefPair)iter.next();
+
+ if (ps == null)
+ {
+ ps = conn.prepareStatement(getSQLStatement("DELETE_MESSAGE"));
+ }
+
+ ps.setLong(1, pair.ref.getMessage().getMessageID());
+ ps.setLong(2, pair.ref.getMessage().getMessageID());
+
+ rows = ps.executeUpdate();
+
+ if (trace) { log.trace("Deleted " + rows + " messages"); }
+ }
+ }
+
removeTXRecord(conn, tx);
return null;
@@ -1702,8 +1781,6 @@
}
new HandleBeforeRollbackRunner().executeWithRetry();
-
- checkReap();
}
@@ -2191,6 +2268,7 @@
map.put("UPDATE_MESSAGE_4CONDITIONAL", "UPDATE JBM_MSG SET HEADERS=?, PAYLOAD=? WHERE MESSAGE_ID=?");
map.put("MESSAGE_ID_COLUMN", "MESSAGE_ID");
map.put("REAP_MESSAGES", "DELETE FROM JBM_MSG WHERE INS_TIME <= ? AND NOT EXISTS (SELECT * FROM JBM_MSG_REF WHERE JBM_MSG_REF.MESSAGE_ID = JBM_MSG.MESSAGE_ID)");
+ map.put("DELETE_MESSAGE", "DELETE FROM JBM_MSG WHERE MESSAGE_ID = ? AND NOT EXISTS (SELECT * FROM JBM_MSG_REF WHERE JBM_MSG_REF.MESSAGE_ID = ?)");
//Transaction
map.put("INSERT_TRANSACTION",
"INSERT INTO JBM_TX (NODE_ID, TRANSACTION_ID, BRANCH_QUAL, FORMAT_ID, GLOBAL_TXID) " +
@@ -2352,33 +2430,7 @@
return order;
}
-
- private void incrementReapCount()
- {
- if (syncReapCount != null)
- {
- syncReapCount.incrementAndGet();
- }
- }
-
- private void addToReapCount(int delta)
- {
- if (syncReapCount != null)
- {
- syncReapCount.addAndGet(delta);
- }
- }
-
- private synchronized void checkReap() throws Exception
- {
- if (synchronousReapMessages > 0 && syncReapCount.get() >= synchronousReapMessages)
- {
- reapUnreferencedMessages();
-
- syncReapCount.set(0);
- }
- }
-
+
private void reapUnreferencedMessages(final long timestamp) throws Exception
{
class ReaperRunner extends JDBCTxRunner
@@ -2510,7 +2562,7 @@
}
else
{
- handleBeforeCommit2PC(tx);
+ handleBeforeCommit2PC(refsToRemove, tx);
}
}
Modified: trunk/src/main/org/jboss/messaging/core/jmx/JDBCPersistenceManagerService.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/jmx/JDBCPersistenceManagerService.java 2007-10-19 19:07:01 UTC (rev 3212)
+++ trunk/src/main/org/jboss/messaging/core/jmx/JDBCPersistenceManagerService.java 2007-10-19 21:41:35 UTC (rev 3213)
@@ -55,8 +55,6 @@
private long reaperPeriod = 5000;
- private int synchronousReapMessages = 0;
-
private boolean supportsBlobOnSelect = true;
// Constructors --------------------------------------------------------
@@ -87,21 +85,11 @@
{
TransactionManager tm = getTransactionManagerReference();
- if (reaperPeriod == 0 && synchronousReapMessages == 0)
- {
- throw new IllegalArgumentException("One of reaperPeriod or synchronousReapMessage must be > 0");
- }
-
- if (reaperPeriod > 0 && synchronousReapMessages > 0)
- {
- throw new IllegalArgumentException("Only one of reaperPeriod or synchronousReapMessage can be > 0");
- }
-
persistenceManager =
new JDBCPersistenceManager(ds, tm, sqlProperties,
createTablesOnStartup, usingBatchUpdates,
usingBinaryStream, usingTrailingByte, maxParams, reaperPeriod,
- synchronousReapMessages, supportsBlobOnSelect);
+ supportsBlobOnSelect);
persistenceManager.start();
@@ -182,7 +170,7 @@
{
if (reaperPeriod < 0)
{
- throw new IllegalArgumentException("reaperPeriod must be > 0");
+ throw new IllegalArgumentException("reaperPeriod must be >= 0");
}
this.reaperPeriod = reaperPeriod;
@@ -193,21 +181,6 @@
return reaperPeriod;
}
- public void setSynchronousReapMessages(int msgs)
- {
- if (msgs < 0)
- {
- throw new IllegalArgumentException("synchronousReapMessages must be > 0");
- }
-
- this.synchronousReapMessages = msgs;
- }
-
- public int getSynchronousReapMessages()
- {
- return synchronousReapMessages;
- }
-
public boolean isSupportsBlobOnSelect()
{
return supportsBlobOnSelect;
Modified: trunk/tests/src/org/jboss/test/messaging/core/IdManagerTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/core/IdManagerTest.java 2007-10-19 19:07:01 UTC (rev 3212)
+++ trunk/tests/src/org/jboss/test/messaging/core/IdManagerTest.java 2007-10-19 21:41:35 UTC (rev 3213)
@@ -68,7 +68,7 @@
pm =
new JDBCPersistenceManager(sc.getDataSource(), sc.getTransactionManager(),
sc.getPersistenceManagerSQLProperties(),
- true, true, true, false, 100, 5000, 0, true);
+ true, true, true, false, 100, 5000, true);
((JDBCPersistenceManager)pm).injectNodeID(1);
pm.start();
Modified: trunk/tests/src/org/jboss/test/messaging/core/JDBCPersistenceManagerTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/core/JDBCPersistenceManagerTest.java 2007-10-19 19:07:01 UTC (rev 3212)
+++ trunk/tests/src/org/jboss/test/messaging/core/JDBCPersistenceManagerTest.java 2007-10-19 21:41:35 UTC (rev 3213)
@@ -99,7 +99,7 @@
JDBCPersistenceManager p =
new JDBCPersistenceManager(sc.getDataSource(), sc.getTransactionManager(),
sc.getPersistenceManagerSQLProperties(),
- true, batch, useBinaryStream, trailingByte, maxParams, 5000, 0, true);
+ true, batch, useBinaryStream, trailingByte, maxParams, 5000, true);
((JDBCPersistenceManager)p).injectNodeID(1);
p.start();
return p;
Modified: trunk/tests/src/org/jboss/test/messaging/core/MessagingQueueTestBase.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/core/MessagingQueueTestBase.java 2007-10-19 19:07:01 UTC (rev 3212)
+++ trunk/tests/src/org/jboss/test/messaging/core/MessagingQueueTestBase.java 2007-10-19 21:41:35 UTC (rev 3213)
@@ -108,7 +108,7 @@
pm = new JDBCPersistenceManager(sc.getDataSource(), sc.getTransactionManager(),
sc.getPersistenceManagerSQLProperties(),
- true, true, true, false, 100, 5000, 0, true);
+ true, true, true, false, 100, 5000, true);
((JDBCPersistenceManager)pm).injectNodeID(1);
pm.start();
Modified: trunk/tests/src/org/jboss/test/messaging/core/PostOfficeTestBase.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/core/PostOfficeTestBase.java 2007-10-19 19:07:01 UTC (rev 3212)
+++ trunk/tests/src/org/jboss/test/messaging/core/PostOfficeTestBase.java 2007-10-19 21:41:35 UTC (rev 3213)
@@ -245,7 +245,7 @@
pm =
new JDBCPersistenceManager(sc.getDataSource(), sc.getTransactionManager(),
sc.getPersistenceManagerSQLProperties(),
- true, true, true, false, 100, 5000, 0, true);
+ true, true, true, false, 100, 5000, true);
((JDBCPersistenceManager)pm).injectNodeID(1);
pm.start();
Modified: trunk/tests/src/org/jboss/test/messaging/core/paging/PagingStateTestBase.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/core/paging/PagingStateTestBase.java 2007-10-19 19:07:01 UTC (rev 3212)
+++ trunk/tests/src/org/jboss/test/messaging/core/paging/PagingStateTestBase.java 2007-10-19 21:41:35 UTC (rev 3213)
@@ -93,7 +93,7 @@
pm =
new JDBCPersistenceManager(sc.getDataSource(), sc.getTransactionManager(),
sc.getPersistenceManagerSQLProperties(),
- true, true, true, false, 100, 5000, 0, true);
+ true, true, true, false, 100, 5000, true);
((JDBCPersistenceManager)pm).injectNodeID(1);
pm.start();
Modified: trunk/tests/src/org/jboss/test/messaging/core/paging/PagingTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/core/paging/PagingTest.java 2007-10-19 19:07:01 UTC (rev 3212)
+++ trunk/tests/src/org/jboss/test/messaging/core/paging/PagingTest.java 2007-10-19 21:41:35 UTC (rev 3213)
@@ -87,7 +87,7 @@
pm =
new JDBCPersistenceManager(sc.getDataSource(), sc.getTransactionManager(),
sc.getPersistenceManagerSQLProperties(),
- true, true, true, false, 100, 5000, 0, true);
+ true, true, true, false, 100, 5000, true);
((JDBCPersistenceManager)pm).injectNodeID(1);
pm.start();
Modified: trunk/tests/src/org/jboss/test/messaging/core/paging/SingleChannel_ReloadTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/core/paging/SingleChannel_ReloadTest.java 2007-10-19 19:07:01 UTC (rev 3212)
+++ trunk/tests/src/org/jboss/test/messaging/core/paging/SingleChannel_ReloadTest.java 2007-10-19 21:41:35 UTC (rev 3213)
@@ -112,7 +112,7 @@
pm =
new JDBCPersistenceManager(sc.getDataSource(), sc.getTransactionManager(),
sc.getPersistenceManagerSQLProperties(),
- true, true, true, false, 100, 5000, 0, true);
+ true, true, true, false, 100, 5000, true);
((JDBCPersistenceManager)pm).injectNodeID(1);
pm.start();
@@ -204,7 +204,7 @@
pm =
new JDBCPersistenceManager(sc.getDataSource(), sc.getTransactionManager(),
sc.getPersistenceManagerSQLProperties(),
- true, true, true, false, 100, 5000, 0, true);
+ true, true, true, false, 100, 5000, true);
((JDBCPersistenceManager)pm).injectNodeID(1);
pm.start();
Modified: trunk/tests/src/org/jboss/test/messaging/jms/persistence/MessagePersistenceManagerTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/persistence/MessagePersistenceManagerTest.java 2007-10-19 19:07:01 UTC (rev 3212)
+++ trunk/tests/src/org/jboss/test/messaging/jms/persistence/MessagePersistenceManagerTest.java 2007-10-19 21:41:35 UTC (rev 3213)
@@ -73,7 +73,7 @@
JDBCPersistenceManager p =
new JDBCPersistenceManager(sc.getDataSource(), sc.getTransactionManager(),
sc.getPersistenceManagerSQLProperties(),
- true, batch, true, false, maxParams, 5000, 0, true);
+ true, batch, true, false, maxParams, 5000, true);
((JDBCPersistenceManager)pm).injectNodeID(1);
p.start();
return p;
More information about the jboss-cvs-commits
mailing list