[jboss-cvs] JBoss Messaging SVN: r3022 - in trunk: src/main/org/jboss/jms/server/endpoint and 10 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Tue Aug 21 17:07:50 EDT 2007
Author: timfox
Date: 2007-08-21 17:07:50 -0400 (Tue, 21 Aug 2007)
New Revision: 3022
Modified:
trunk/src/etc/server/default/deploy/mysql-persistence-service.xml
trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java
trunk/src/main/org/jboss/messaging/core/contract/Message.java
trunk/src/main/org/jboss/messaging/core/contract/PersistenceManager.java
trunk/src/main/org/jboss/messaging/core/impl/JDBCPersistenceManager.java
trunk/src/main/org/jboss/messaging/core/impl/PagingChannelSupport.java
trunk/src/main/org/jboss/messaging/core/impl/message/MessageSupport.java
trunk/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java
trunk/src/main/org/jboss/messaging/core/impl/tx/TransactionRepository.java
trunk/src/main/org/jboss/messaging/util/LockMap.java
trunk/tests/src/org/jboss/test/messaging/core/JDBCPersistenceManagerTest.java
trunk/tests/src/org/jboss/test/messaging/core/paging/ChannelShare_NP_2PCTest.java
trunk/tests/src/org/jboss/test/messaging/core/paging/ChannelShare_NP_NTTest.java
trunk/tests/src/org/jboss/test/messaging/core/paging/ChannelShare_NP_TTest.java
trunk/tests/src/org/jboss/test/messaging/core/paging/ChannelShare_P_2PCTest.java
trunk/tests/src/org/jboss/test/messaging/core/paging/ChannelShare_P_NTTest.java
trunk/tests/src/org/jboss/test/messaging/core/paging/ChannelShare_P_TTest.java
trunk/tests/src/org/jboss/test/messaging/core/paging/SingleChannel_NP_2PCTest.java
trunk/tests/src/org/jboss/test/messaging/core/paging/SingleChannel_NP_NTTest.java
trunk/tests/src/org/jboss/test/messaging/core/paging/SingleChannel_NP_TTest.java
trunk/tests/src/org/jboss/test/messaging/core/paging/SingleChannel_P_2PCTest.java
trunk/tests/src/org/jboss/test/messaging/core/paging/SingleChannel_P_NTTest.java
trunk/tests/src/org/jboss/test/messaging/core/paging/SingleChannel_P_TTest.java
trunk/tests/src/org/jboss/test/messaging/core/paging/SingleChannel_ReloadTest.java
trunk/tests/src/org/jboss/test/messaging/jms/AcknowledgementTest.java
trunk/tests/src/org/jboss/test/messaging/jms/JMSTestCase.java
trunk/tests/src/org/jboss/test/messaging/jms/MessageCleanupTest.java
trunk/tests/src/org/jboss/test/messaging/jms/XARecoveryTest.java
trunk/tests/src/org/jboss/test/messaging/jms/XATest.java
trunk/tests/src/org/jboss/test/messaging/tools/container/LocalTestServer.java
trunk/tests/src/org/jboss/test/messaging/tools/container/RMITestServer.java
trunk/tests/src/org/jboss/test/messaging/tools/container/Server.java
Log:
Persistence Manager changes (interim commit)
Modified: trunk/src/etc/server/default/deploy/mysql-persistence-service.xml
===================================================================
--- trunk/src/etc/server/default/deploy/mysql-persistence-service.xml 2007-08-21 16:24:12 UTC (rev 3021)
+++ trunk/src/etc/server/default/deploy/mysql-persistence-service.xml 2007-08-21 21:07:50 UTC (rev 3022)
@@ -41,6 +41,7 @@
CREATE_IDX_MESSAGE_REF_MESSAGE_ID=CREATE INDEX JBM_MSG_REF_MESSAGE_ID ON JBM_MSG_REF (MESSAGE_ID)
CREATE_IDX_MESSAGE_REF_SCHED_DELIVERY=CREATE INDEX JBM_MSG_REF_SCHED_DELIVERY ON JBM_MSG_REF (SCHED_DELIVERY)
CREATE_MESSAGE=CREATE TABLE JBM_MSG (MESSAGE_ID BIGINT, RELIABLE CHAR(1), EXPIRATION BIGINT, TIMESTAMP BIGINT, PRIORITY TINYINT, HEADERS MEDIUMBLOB, PAYLOAD LONGBLOB, TYPE TINYINT, PRIMARY KEY (MESSAGE_ID)) ENGINE = INNODB
+ CREATE_IDX_MESSAGE_TIMESTAMP=CREATE INDEX JBM_MSG_REF_TIMESTAMP ON JBM_MSG (TIMESTAMP)
CREATE_TRANSACTION=CREATE TABLE JBM_TX (NODE_ID INTEGER, TRANSACTION_ID BIGINT, BRANCH_QUAL VARBINARY(254), FORMAT_ID INTEGER, GLOBAL_TXID VARBINARY(254), PRIMARY KEY (TRANSACTION_ID)) ENGINE = INNODB
CREATE_COUNTER=CREATE TABLE JBM_COUNTER (NAME VARCHAR(255), NEXT_ID BIGINT, PRIMARY KEY(NAME)) ENGINE = INNODB
INSERT_MESSAGE_REF=INSERT INTO JBM_MSG_REF (CHANNEL_ID, MESSAGE_ID, TRANSACTION_ID, STATE, ORD, PAGE_ORD, DELIVERY_COUNT, SCHED_DELIVERY) VALUES (?, ?, ?, ?, ?, ?, ?, ?)
@@ -60,11 +61,10 @@
UPDATE_DELIVERY_COUNT=UPDATE JBM_MSG_REF SET DELIVERY_COUNT = ? WHERE CHANNEL_ID = ? AND MESSAGE_ID = ?
UPDATE_CHANNEL_ID=UPDATE JBM_MSG_REF SET CHANNEL_ID = ? WHERE CHANNEL_ID = ?
LOAD_MESSAGES=SELECT MESSAGE_ID, RELIABLE, EXPIRATION, TIMESTAMP, PRIORITY, HEADERS, PAYLOAD, TYPE FROM JBM_MSG
- INSERT_MESSAGE=INSERT INTO JBM_MSG (MESSAGE_ID, RELIABLE, EXPIRATION, TIMESTAMP, PRIORITY, HEADERS, PAYLOAD, TYPE) VALUES (?, ?, ?, ?, ?, ?, ?, ?)
- DELETE_MESSAGE=DELETE FROM JBM_MSG WHERE MESSAGE_ID=?
- DELETE_PAGED_MESSAGE=DELETE FROM JBM_MSG WHERE MESSAGE_ID=? AND NOT EXISTS (SELECT * FROM JBM_MSG_REF WHERE MESSAGE_ID = ?)
+ INSERT_MESSAGE=INSERT INTO JBM_MSG (MESSAGE_ID, RELIABLE, EXPIRATION, TIMESTAMP, PRIORITY, HEADERS, PAYLOAD, TYPE) VALUES (?, ?, ?, ?, ?, ?, ?, ?)
MESSAGE_ID_COLUMN=MESSAGE_ID
MESSAGE_EXISTS=SELECT MESSAGE_ID FROM JBM_MSG WHERE MESSAGE_ID = ? FOR UPDATE
+ REAP_MESSAGES=DELETE FROM JBM_MSG WHERE TIMESTAMP < ? AND NOT EXISTS (SELECT * FROM JBM_MSG_REF WHERE JBM_MSG_REF.MESSAGE_ID = JBM_MSG.MESSAGE_ID)
INSERT_TRANSACTION=INSERT INTO JBM_TX (NODE_ID, TRANSACTION_ID, BRANCH_QUAL, FORMAT_ID, GLOBAL_TXID) VALUES(?, ?, ?, ?, ?)
DELETE_TRANSACTION=DELETE FROM JBM_TX WHERE NODE_ID = ? AND TRANSACTION_ID = ?
SELECT_PREPARED_TRANSACTIONS=SELECT TRANSACTION_ID, BRANCH_QUAL, FORMAT_ID, GLOBAL_TXID FROM JBM_TX WHERE NODE_ID = ?
Modified: trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java 2007-08-21 16:24:12 UTC (rev 3021)
+++ trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java 2007-08-21 21:07:50 UTC (rev 3022)
@@ -1724,7 +1724,6 @@
{
if (queue != null)
{
- msg.setPersistentCount(1);
queue.handle(null, ref, tx);
del.acknowledge(tx);
}
Modified: trunk/src/main/org/jboss/messaging/core/contract/Message.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/contract/Message.java 2007-08-21 16:24:12 UTC (rev 3021)
+++ trunk/src/main/org/jboss/messaging/core/contract/Message.java 2007-08-21 21:07:50 UTC (rev 3022)
@@ -149,14 +149,6 @@
*/
MessageReference createReference();
- void setPersistentCount(int count);
-
- int getPersistentCount();
-
- void incrementPersistentCount();
-
- void decrementPersistentCount();
-
boolean isPersisted();
void setPersisted(boolean persisted);
Modified: trunk/src/main/org/jboss/messaging/core/contract/PersistenceManager.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/contract/PersistenceManager.java 2007-08-21 16:24:12 UTC (rev 3021)
+++ trunk/src/main/org/jboss/messaging/core/contract/PersistenceManager.java 2007-08-21 21:07:50 UTC (rev 3022)
@@ -39,11 +39,13 @@
*/
public interface PersistenceManager extends MessagingComponent
{
- void setPaging(long channelID, boolean paging);
-
- boolean isPaging();
+ void startReaper();
+ void stopReaper();
+ void reapUnreferencedMessages() throws Exception;
+
+
void addReference(long channelID, MessageReference ref, Transaction tx) throws Exception;
void removeReference(long channelID, MessageReference ref, Transaction tx) throws Exception;
Modified: trunk/src/main/org/jboss/messaging/core/impl/JDBCPersistenceManager.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/impl/JDBCPersistenceManager.java 2007-08-21 16:24:12 UTC (rev 3021)
+++ trunk/src/main/org/jboss/messaging/core/impl/JDBCPersistenceManager.java 2007-08-21 21:07:50 UTC (rev 3022)
@@ -43,6 +43,8 @@
import java.util.Map;
import java.util.Properties;
import java.util.Set;
+import java.util.Timer;
+import java.util.TimerTask;
import javax.sql.DataSource;
import javax.transaction.TransactionManager;
@@ -101,9 +103,14 @@
private boolean nodeIDSet;
- private Set pagingSet = new HashSet();
-
-
+ private Timer reaperTimer;
+
+ private Reaper reaper;
+
+ private long reaperPeriod;
+
+ private boolean reaperRunning;
+
// Constructors --------------------------------------------------
public JDBCPersistenceManager(DataSource ds, TransactionManager tm, Properties sqlProperties,
@@ -118,7 +125,13 @@
this.usingTrailingByte = usingTrailingByte;
- this.maxParams = maxParams;
+ this.maxParams = maxParams;
+
+ this.reaperPeriod = 5000;
+
+ reaperTimer = new Timer(true);
+
+ reaper = new Reaper();
}
@@ -169,6 +182,10 @@
public void stop() throws Exception
{
super.stop();
+
+ reaperTimer.cancel();
+
+ stopReaper();
}
// Injection -------------------------------------------------
@@ -184,24 +201,36 @@
// PersistenceManager implementation -------------------------
- public synchronized void setPaging(long channelID, boolean paging)
+ public synchronized void startReaper()
{
- Long l = new Long(channelID);
- if (paging)
+ if (reaperRunning)
{
- pagingSet.add(l);
+ return;
}
- else
+ if (reaperPeriod != -1)
{
- pagingSet.remove(l);
+ reaperTimer.schedule(reaper, reaperPeriod, reaperPeriod);
+
+ reaperRunning = true;
}
}
- public synchronized boolean isPaging()
+ public synchronized void stopReaper()
{
- return !pagingSet.isEmpty();
+ if (!reaperRunning)
+ {
+ return;
+ }
+ reaper.cancel();
+
+ reaperRunning = false;
}
+ public void reapUnreferencedMessages() throws Exception
+ {
+ reapUnreferencedMessages(System.currentTimeMillis());
+ }
+
// Related to XA Recovery
// ======================
@@ -504,22 +533,23 @@
// ===============================
//Used to page NP messages or P messages in a non recoverable queue
- public synchronized void pageReferences(long channelID, List references, boolean page) throws Exception
+
+ public void pageReferences(long channelID, List references, boolean page) throws Exception
{
Connection conn = null;
PreparedStatement psInsertReference = null;
PreparedStatement psInsertMessage = null;
- PreparedStatement psMessageExists = null;
+ // PreparedStatement psMessageExists = null;
TransactionWrapper wrap = new TransactionWrapper();
//First we order the references in message order
- orderReferences(references);
+ //orderReferences(references);
try
{
- //Now we get a lock on all the messages. Since we have ordered the refs we should avoid deadlock
- getLocks(references);
-
+// //Now we get a lock on all the messages. Since we have ordered the refs we should avoid deadlock
+// getLocks(references);
+//
conn = ds.getConnection();
Iterator iter = references.iterator();
@@ -527,7 +557,7 @@
psInsertReference = conn.prepareStatement(getSQLStatement("INSERT_MESSAGE_REF"));
psInsertMessage = conn.prepareStatement(getSQLStatement("INSERT_MESSAGE"));
- boolean insertsInBatch = false;
+ // boolean insertsInBatch = false;
while (iter.hasNext())
{
@@ -539,82 +569,87 @@
//Now store the reference
addReference(channelID, ref, psInsertReference, page);
- if (usingBatchUpdates)
- {
- psInsertReference.addBatch();
- }
- else
- {
+// if (usingBatchUpdates)
+// {
+// psInsertReference.addBatch();
+// }
+// else
+// {
int rows = executeWithRetry(psInsertReference);
if (trace)
{
log.trace("Inserted " + rows + " rows");
}
- }
+ //}
//Maybe we need to persist the message itself
Message m = ref.getMessage();
+
+ synchronized (m)
+ {
if (!m.isPersisted())
{
//The message might actually already exist due to it already being paged
+ //so we insert and ignore key violations
- if (psMessageExists == null)
- {
- psMessageExists = conn.prepareStatement(getSQLStatement("MESSAGE_EXISTS"));
- }
-
- psMessageExists.setLong(1, m.getMessageID());
-
- ResultSet rs = null;
-
- try
- {
- rs = psMessageExists.executeQuery();
-
- if (!rs.next())
- {
+// if (psMessageExists == null)
+// {
+// psMessageExists = conn.prepareStatement(getSQLStatement("MESSAGE_EXISTS"));
+// }
+//
+// psMessageExists.setLong(1, m.getMessageID());
+//
+// ResultSet rs = null;
+//
+// try
+// {
+// rs = psMessageExists.executeQuery();
+//
+// if (!rs.next())
+// {
storeMessage(m, psInsertMessage);
- if (usingBatchUpdates)
- {
- psInsertMessage.addBatch();
-
- insertsInBatch = true;
- }
- else
- {
- int rows = executeWithRetry(psInsertMessage);
+// if (usingBatchUpdates)
+// {
+// psInsertMessage.addBatch();
+//
+// insertsInBatch = true;
+// }
+// else
+// {
+ rows = executeWithRetryIgnoreKeyViolation(psInsertMessage);
if (trace) { log.trace("Inserted " + rows + " rows"); }
- }
+ // }
m.setPersisted(true);
}
- }
- finally
- {
- if (rs != null)
- {
- rs.close();
- }
- }
- }
+// }
+// finally
+// {
+// if (rs != null)
+// {
+// rs.close();
+// }
+// }
+ // }
+ }
}
- if (usingBatchUpdates)
- {
- int[] rowsReference = executeWithRetryBatch(psInsertReference);
-
- if (trace) { logBatchUpdate(getSQLStatement("INSERT_MESSAGE_REF"), rowsReference, "inserted"); }
-
- if (insertsInBatch)
- {
- int[] rowsMessage = executeWithRetryBatch(psInsertMessage);
-
- if (trace) { logBatchUpdate(getSQLStatement("INSERT_MESSAGE"), rowsMessage, "inserted"); }
- }
- }
+// if (usingBatchUpdates)
+// {
+// int[] rowsReference = executeWithRetryBatch(psInsertReference);
+//
+// if (trace) { logBatchUpdate(getSQLStatement("INSERT_MESSAGE_REF"), rowsReference, "inserted"); }
+//
+// if (insertsInBatch)
+// {
+// int[] rowsMessage = executeWithRetryBatch(psInsertMessage);
+//
+// if (trace) { logBatchUpdate(getSQLStatement("INSERT_MESSAGE"), rowsMessage, "inserted"); }
+// }
+// }
}
catch (Exception e)
{
@@ -625,17 +660,17 @@
{
closeStatement(psInsertReference);
closeStatement(psInsertMessage);
- closeStatement(psMessageExists);
+ // closeStatement(psMessageExists);
closeConnection(conn);
- try
- {
+ // try
+ // {
wrap.end();
- }
- finally
- {
- //And then release locks
- this.releaseLocks(references);
- }
+// }
+// finally
+// {
+// //And then release locks
+// this.releaseLocks(references);
+// }
}
}
@@ -646,24 +681,22 @@
Connection conn = null;
PreparedStatement psDeleteReference = null;
- PreparedStatement psDeleteMessage = null;
TransactionWrapper wrap = new TransactionWrapper();
//We order the references
- orderReferences(references);
+ // orderReferences(references);
try
{
//We get locks on all the messages - since they are ordered we avoid deadlock
- getLocks(references);
+ // getLocks(references);
conn = ds.getConnection();
Iterator iter = references.iterator();
psDeleteReference = conn.prepareStatement(getSQLStatement("DELETE_MESSAGE_REF"));
- psDeleteMessage = conn.prepareStatement(getSQLStatement("DELETE_PAGED_MESSAGE"));
-
+
while (iter.hasNext())
{
MessageReference ref = (MessageReference) iter.next();
@@ -680,38 +713,18 @@
if (trace) { log.trace("Deleted " + rows + " rows"); }
}
-
- Message m = ref.getMessage();
-
- //Maybe we need to delete the message itself
-
- psDeleteMessage.setLong(1, m.getMessageID());
- psDeleteMessage.setLong(2, m.getMessageID());
-
- if (usingBatchUpdates)
- {
- psDeleteMessage.addBatch();
- }
- else
- {
- int rows = executeWithRetry(psDeleteMessage);
-
- if (trace) { log.trace("Deleted " + rows + " rows"); }
- }
-
- ref.getMessage().setPersisted(false);
-
+
+ //There is a small possibility that the ref is depaged here, then paged again, before this flag is set
+ //and the tx is committed so the message may be attempted to be inserted twice but this should be ok
+ //since we ignore key violations on message insert
+ ref.getMessage().setPersisted(false);
}
if (usingBatchUpdates)
{
int[] rowsReference = executeWithRetryBatch(psDeleteReference);
- if (trace) { logBatchUpdate(getSQLStatement("DELETE_MESSAGE_REF"), rowsReference, "deleted"); }
-
- rowsReference = executeWithRetryBatch(psDeleteMessage);
-
- if (trace) { logBatchUpdate(getSQLStatement("DELETE_PAGED_MESSAGE"), rowsReference, "deleted"); }
+ if (trace) { logBatchUpdate(getSQLStatement("DELETE_MESSAGE_REF"), rowsReference, "deleted"); }
}
}
catch (Exception e)
@@ -722,24 +735,23 @@
finally
{
closeStatement(psDeleteReference);
- closeStatement(psDeleteMessage);
closeConnection(conn);
- try
- {
+// try
+// {
wrap.end();
- }
- finally
- {
- //And then release locks
- this.releaseLocks(references);
- }
+// }
+// finally
+// {
+// //And then release locks
+// this.releaseLocks(references);
+// }
}
}
// After loading paged refs this is used to update P messages to non paged
public void updateReferencesNotPagedInRange(long channelID, long orderStart, long orderEnd, long num) throws Exception
{
- if (trace) { log.trace("Updating paaged references for channel " + channelID + " between " + orderStart + " and " + orderEnd); }
+ if (trace) { log.trace("Updating paged references for channel " + channelID + " between " + orderStart + " and " + orderEnd); }
Connection conn = null;
PreparedStatement ps = null;
@@ -1238,7 +1250,7 @@
try
{
// Get lock on message
- LockMap.instance.obtainLock(m);
+ // LockMap.instance.obtainLock(m);
psReference = conn.prepareStatement(getSQLStatement("INSERT_MESSAGE_REF"));
@@ -1282,7 +1294,7 @@
finally
{
//Release Lock
- LockMap.instance.releaseLock(m);
+ // LockMap.instance.releaseLock(m);
}
}
}
@@ -1340,8 +1352,7 @@
TransactionWrapper wrap = new TransactionWrapper();
PreparedStatement psReference = null;
- PreparedStatement psMessage = null;
-
+
Connection conn = ds.getConnection();
Message m = ref.getMessage();
@@ -1349,7 +1360,7 @@
try
{
//get lock on message
- LockMap.instance.obtainLock(m);
+ // LockMap.instance.obtainLock(m);
psReference = conn.prepareStatement(getSQLStatement("DELETE_MESSAGE_REF"));
@@ -1364,36 +1375,7 @@
return;
}
- if (trace) { log.trace("Deleted " + rows + " rows"); }
-
- ref.getMessage().decrementPersistentCount();
-
- if (ref.getMessage().getPersistentCount() == 0)
- {
- if (trace) { log.trace("Last reference so deleting message"); }
-
- //Delete the message (if necessary)
-
- if (this.isPaging())
- {
- //There is a possibility there are paged refs holding the message so we need to do a conditional delete
- psMessage = conn.prepareStatement(getSQLStatement("DELETE_PAGED_MESSAGE"));
-
- psMessage.setLong(1, m.getMessageID());
-
- psMessage.setLong(2, m.getMessageID());
- }
- else
- {
- psMessage = conn.prepareStatement(getSQLStatement("DELETE_MESSAGE"));
-
- psMessage.setLong(1, m.getMessageID());
- }
-
- rows = executeWithRetry(psMessage);
-
- if (trace) { log.trace("Delete " + rows + " rows"); }
- }
+ if (trace) { log.trace("Deleted " + rows + " rows"); }
}
catch (Exception e)
{
@@ -1403,17 +1385,16 @@
finally
{
closeStatement(psReference);
- closeStatement(psMessage);
closeConnection(conn);
- try
- {
+// try
+// {
wrap.end();
- }
- finally
- {
- //release the lock
- LockMap.instance.releaseLock(m);
- }
+// }
+// finally
+// {
+// //release the lock
+// LockMap.instance.releaseLock(m);
+// }
}
}
}
@@ -1486,10 +1467,10 @@
* We order the list of references in ascending message order thus preventing deadlock when 2 or
* more channels are updating the same messages in different transactions.
*/
- protected void orderReferences(List references)
- {
- Collections.sort(references, MessageOrderComparator.instance);
- }
+// protected void orderReferences(List references)
+// {
+// Collections.sort(references, MessageOrderComparator.instance);
+// }
protected void handleBeforeCommit1PC(List refsToAdd, List refsToRemove, Transaction tx)
throws Exception
@@ -1498,21 +1479,21 @@
// so we will end up acquiring the lock more than once which is unnecessary. If find
// unique set of messages can avoid this.
- List allRefs = new ArrayList(refsToAdd.size() + refsToRemove.size());
-
- for(Iterator i = refsToAdd.iterator(); i.hasNext(); )
- {
- ChannelRefPair pair = (ChannelRefPair)i.next();
- allRefs.add(pair.ref);
- }
-
- for(Iterator i = refsToRemove.iterator(); i.hasNext(); )
- {
- ChannelRefPair pair = (ChannelRefPair)i.next();
- allRefs.add(pair.ref);
- }
-
- orderReferences(allRefs);
+// List allRefs = new ArrayList(refsToAdd.size() + refsToRemove.size());
+//
+// for(Iterator i = refsToAdd.iterator(); i.hasNext(); )
+// {
+// ChannelRefPair pair = (ChannelRefPair)i.next();
+// allRefs.add(pair.ref);
+// }
+//
+// for(Iterator i = refsToRemove.iterator(); i.hasNext(); )
+// {
+// ChannelRefPair pair = (ChannelRefPair)i.next();
+// allRefs.add(pair.ref);
+// }
+//
+// orderReferences(allRefs);
// For one phase we simply add rows corresponding to the refs and remove rows corresponding to
// the deliveries in one jdbc tx. We also need to store or remove messages as necessary,
@@ -1521,7 +1502,6 @@
Connection conn = null;
PreparedStatement psReference = null;
PreparedStatement psInsertMessage = null;
- PreparedStatement psDeleteMessage = null;
TransactionWrapper wrap = new TransactionWrapper();
try
@@ -1529,7 +1509,7 @@
conn = ds.getConnection();
// Obtain locks on all messages
- getLocks(allRefs);
+ // getLocks(allRefs);
// First the adds
@@ -1563,34 +1543,37 @@
psReference = null;
}
- Message m = ref.getMessage();
+ Message m = ref.getMessage();
- if (!m.isPersisted())
- {
- if (batch && psInsertMessage == null || !batch)
- {
- psInsertMessage = conn.prepareStatement(getSQLStatement("INSERT_MESSAGE"));
- }
-
- // First time so add message
- storeMessage(m, psInsertMessage);
- m.setPersisted(true);
-
- if (batch)
- {
- psInsertMessage.addBatch();
- if (trace) { log.trace("Message does not already exist so inserting it"); }
- messageInsertsInBatch = true;
+ synchronized (m)
+ {
+ if (!m.isPersisted())
+ {
+ if (batch && psInsertMessage == null || !batch)
+ {
+ psInsertMessage = conn.prepareStatement(getSQLStatement("INSERT_MESSAGE"));
+ }
+
+ // First time so add message
+ storeMessage(m, psInsertMessage);
+ m.setPersisted(true);
+
+ if (batch)
+ {
+ psInsertMessage.addBatch();
+ if (trace) { log.trace("Message does not already exist so inserting it"); }
+ messageInsertsInBatch = true;
+ }
+ else
+ {
+ if (trace) { log.trace("Message does not already exist so inserting it"); }
+ int rows = executeWithRetry(psInsertMessage);
+ if (trace) { log.trace("Inserted " + rows + " rows"); }
+
+ psInsertMessage.close();
+ psInsertMessage = null;
+ }
}
- else
- {
- if (trace) { log.trace("Message does not already exist so inserting it"); }
- int rows = executeWithRetry(psInsertMessage);
- if (trace) { log.trace("Inserted " + rows + " rows"); }
-
- psInsertMessage.close();
- psInsertMessage = null;
- }
}
}
@@ -1616,14 +1599,9 @@
// Now the removes
- psReference = null;
- psDeleteMessage = null;
+ psReference = null;
batch = usingBatchUpdates && refsToRemove.size() > 0;
- boolean messageDeletesInBatch = false;
-
- boolean pagng = this.isPaging();
-
for (Iterator i = refsToRemove.iterator(); i.hasNext(); )
{
ChannelRefPair pair = (ChannelRefPair)i.next();
@@ -1645,51 +1623,7 @@
if (trace) { log.trace("Deleted " + rows + " rows"); }
psReference.close();
psReference = null;
- }
-
- Message m = pair.ref.getMessage();
-
- m.decrementPersistentCount();
-
- if (m.getPersistentCount() == 0)
- {
- // Delete the message (if necessary)
-
- if (batch && psDeleteMessage == null || !batch)
- {
- if (pagng)
- {
- //Need to do conditional delete - ref might still exist for message
- psDeleteMessage = conn.prepareStatement(getSQLStatement("DELETE_PAGED_MESSAGE"));
- }
- else
- {
- psDeleteMessage = conn.prepareStatement(getSQLStatement("DELETE_MESSAGE"));
- }
- }
-
- psDeleteMessage.setLong(1, m.getMessageID());
-
- if (pagng)
- {
- psDeleteMessage.setLong(2, m.getMessageID());
- }
-
- if (batch)
- {
- psDeleteMessage.addBatch();
-
- messageDeletesInBatch = true;
- }
- else
- {
- int rows = executeWithRetry(psDeleteMessage);
- if (trace) { log.trace("Deleted " + rows + " rows"); }
-
- psDeleteMessage.close();
- psDeleteMessage = null;
- }
- }
+ }
}
if (batch)
@@ -1699,17 +1633,7 @@
int[] rows = executeWithRetryBatch(psReference);
if (trace) { logBatchUpdate(getSQLStatement("DELETE_MESSAGE_REF"), rows, "deleted"); }
-
- if (messageDeletesInBatch)
- {
- rows = executeWithRetryBatch(psDeleteMessage);
-
- if (trace) { logBatchUpdate(getSQLStatement("DELETE_MESSAGE"), rows, "deleted"); }
-
- psDeleteMessage.close();
- psDeleteMessage = null;
- }
-
+
psReference.close();
psReference = null;
}
@@ -1723,17 +1647,16 @@
{
closeStatement(psReference);
closeStatement(psInsertMessage);
- closeStatement(psDeleteMessage);
closeConnection(conn);
- try
- {
+ // try
+ // {
wrap.end();
- }
- finally
- {
- //Release the locks
- this.releaseLocks(allRefs);
- }
+// }
+// finally
+// {
+// //Release the locks
+// this.releaseLocks(allRefs);
+// }
}
}
@@ -1741,101 +1664,34 @@
throws Exception
{
Connection conn = null;
- PreparedStatement psDeleteMessage = null;
+
TransactionWrapper wrap = new TransactionWrapper();
- List refs = new ArrayList(refsToRemove.size());
- Iterator iter = refsToRemove.iterator();
- while (iter.hasNext())
- {
- ChannelRefPair pair = (ChannelRefPair)iter.next();
- refs.add(pair.ref);
- }
-
- orderReferences(refs);
+// List refs = new ArrayList(refsToRemove.size());
+// Iterator iter = refsToRemove.iterator();
+// while (iter.hasNext())
+// {
+// ChannelRefPair pair = (ChannelRefPair)iter.next();
+// refs.add(pair.ref);
+// }
+//
+// orderReferences(refs);
try
{
//get locks on all the refs
- this.getLocks(refs);
+ // this.getLocks(refs);
conn = ds.getConnection();
//2PC commit
- //First we commit any refs in state "+" to "C" and delete any
+ //We commit any refs in state "+" to "C" and delete any
//refs in state "-", then we
//remove any messages due to refs we just removed
//if they're not referenced elsewhere
- commitPreparedTransaction(tx, conn);
-
- boolean batch = usingBatchUpdates && refsToRemove.size() > 0;
-
- iter = refsToRemove.iterator();
-
- boolean messageDeletesInBatch = false;
-
- boolean pagng = this.isPaging();
-
- while (iter.hasNext())
- {
- ChannelRefPair pair = (ChannelRefPair) iter.next();
-
- MessageReference ref = pair.ref;
-
- Message m = ref.getMessage();
-
- m.decrementPersistentCount();
-
- if (m.getPersistentCount() == 0)
- {
- if (batch && psDeleteMessage == null || !batch)
- {
- if (pagng)
- {
- psDeleteMessage = conn.prepareStatement(getSQLStatement("DELETE_PAGED_MESSAGE"));
- }
- else
- {
- psDeleteMessage = conn.prepareStatement(getSQLStatement("DELETE_MESSAGE"));
- }
- }
-
- psDeleteMessage.setLong(1, m.getMessageID());
-
- if (pagng)
- {
- psDeleteMessage.setLong(2, m.getMessageID());
- }
-
- if (batch)
- {
- psDeleteMessage.addBatch();
-
- messageDeletesInBatch = true;
- }
- else
- {
- int rows = executeWithRetry(psDeleteMessage);
-
- if (trace) { log.trace("Deleted " + rows + " rows"); }
-
- psDeleteMessage.close();
- psDeleteMessage = null;
- }
- }
- }
-
- if (batch && messageDeletesInBatch)
- {
- int[] rows = executeWithRetryBatch(psDeleteMessage);
-
- if (trace) { logBatchUpdate(getSQLStatement("DELETE_MESSAGE"), rows, "deleted"); }
-
- psDeleteMessage.close();
- psDeleteMessage = null;
- }
+ commitPreparedTransaction(tx, conn);
}
catch (Exception e)
{
@@ -1844,35 +1700,34 @@
}
finally
{
- closeStatement(psDeleteMessage);
closeConnection(conn);
- try
- {
+// try
+// {
wrap.end();
- }
- finally
- {
- //release the locks
- this.releaseLocks(refs);
- }
+// }
+// finally
+// {
+// //release the locks
+// this.releaseLocks(refs);
+// }
}
}
protected void handleBeforePrepare(List refsToAdd, List refsToRemove, Transaction tx) throws Exception
{
//We only need to lock on the adds
- List refs = new ArrayList(refsToAdd.size());
+// List refs = new ArrayList(refsToAdd.size());
+//
+// Iterator iter = refsToAdd.iterator();
+// while (iter.hasNext())
+// {
+// ChannelRefPair pair = (ChannelRefPair)iter.next();
+//
+// refs.add(pair.ref);
+// }
+//
+// orderReferences(refs);
- Iterator iter = refsToAdd.iterator();
- while (iter.hasNext())
- {
- ChannelRefPair pair = (ChannelRefPair)iter.next();
-
- refs.add(pair.ref);
- }
-
- orderReferences(refs);
-
//We insert a tx record and
//a row for each ref with +
//and update the row for each delivery with "-"
@@ -1885,7 +1740,7 @@
try
{
//get the locks
- getLocks(refs);
+ // getLocks(refs);
conn = ds.getConnection();
@@ -1895,7 +1750,7 @@
addTXRecord(conn, tx);
}
- iter = refsToAdd.iterator();
+ Iterator iter = refsToAdd.iterator();
boolean batch = usingBatchUpdates && refsToAdd.size() > 1;
boolean messageInsertsInBatch = false;
@@ -1924,35 +1779,39 @@
psReference = null;
}
- Message m = pair.ref.getMessage();
+ Message m = pair.ref.getMessage();
- if (!m.isPersisted())
- {
- if (batch && psInsertMessage == null || !batch)
- {
- psInsertMessage = conn.prepareStatement(getSQLStatement("INSERT_MESSAGE"));
- }
+ synchronized (m)
+ {
+
+ if (!m.isPersisted())
+ {
+ if (batch && psInsertMessage == null || !batch)
+ {
+ psInsertMessage = conn.prepareStatement(getSQLStatement("INSERT_MESSAGE"));
+ }
+
+ storeMessage(m, psInsertMessage);
+
+ m.setPersisted(true);
- storeMessage(m, psInsertMessage);
-
- m.setPersisted(true);
-
- if (batch)
- {
- psInsertMessage.addBatch();
-
- messageInsertsInBatch = true;
+ if (batch)
+ {
+ psInsertMessage.addBatch();
+
+ messageInsertsInBatch = true;
+ }
+ else
+ {
+ int rows = executeWithRetry(psInsertMessage);
+
+ if (trace) { log.trace("Inserted " + rows + " rows"); }
+
+ psInsertMessage.close();
+
+ psInsertMessage = null;
+ }
}
- else
- {
- int rows = executeWithRetry(psInsertMessage);
-
- if (trace) { log.trace("Inserted " + rows + " rows"); }
-
- psInsertMessage.close();
-
- psInsertMessage = null;
- }
}
}
@@ -2032,16 +1891,16 @@
closeStatement(psReference);
closeStatement(psInsertMessage);
closeConnection(conn);
- try
- {
+// try
+// {
wrap.end();
- }
- finally
- {
+// }
+// finally
+// {
//release the locks
- this.releaseLocks(refs);
- }
+ // this.releaseLocks(refs);
+ // }
}
}
@@ -2050,95 +1909,28 @@
//remove refs marked with +
//and update rows marked with - to C
- PreparedStatement psDeleteMessage = null;
Connection conn = null;
TransactionWrapper wrap = new TransactionWrapper();
- List refs = new ArrayList(refsToAdd.size());
+// List refs = new ArrayList(refsToAdd.size());
+//
+// Iterator iter = refsToAdd.iterator();
+//
+// while (iter.hasNext())
+// {
+// ChannelRefPair pair = (ChannelRefPair)iter.next();
+// refs.add(pair.ref);
+// }
+//
+// orderReferences(refs);
- Iterator iter = refsToAdd.iterator();
-
- while (iter.hasNext())
- {
- ChannelRefPair pair = (ChannelRefPair)iter.next();
- refs.add(pair.ref);
- }
-
- orderReferences(refs);
-
try
{
- this.getLocks(refs);
+ // this.getLocks(refs);
conn = ds.getConnection();
- rollbackPreparedTransaction(tx, conn);
-
- iter = refsToAdd.iterator();
-
- boolean batch = usingBatchUpdates && refsToAdd.size() > 1;
-
- boolean messageDeletesInBatch = false;
-
- boolean pagng = this.isPaging();
-
- while (iter.hasNext())
- {
- ChannelRefPair pair = (ChannelRefPair) iter.next();
-
- Message m = pair.ref.getMessage();
-
- m.decrementPersistentCount();
-
- if (m.getPersistentCount() == 0)
- {
- if (batch && psDeleteMessage == null || !batch)
- {
- if (pagng)
- {
- psDeleteMessage = conn.prepareStatement(getSQLStatement("DELETE_PAGED_MESSAGE"));
- }
- else
- {
- psDeleteMessage = conn.prepareStatement(getSQLStatement("DELETE_MESSAGE"));
- }
- }
-
-
- psDeleteMessage.setLong(1, m.getMessageID());
-
- if (pagng)
- {
- psDeleteMessage.setLong(2, m.getMessageID());
- }
-
- if (batch)
- {
- psDeleteMessage.addBatch();
-
- messageDeletesInBatch = true;
- }
- else
- {
- int rows = executeWithRetry(psDeleteMessage);
-
- if (trace) { log.trace("deleted " + rows + " rows"); }
-
- psDeleteMessage.close();
- psDeleteMessage = null;
- }
- }
- }
-
- if (batch && messageDeletesInBatch)
- {
- int[] rows = executeWithRetryBatch(psDeleteMessage);
-
- if (trace) { logBatchUpdate(getSQLStatement("DELETE_MESSAGE"), rows, "deleted"); }
-
- psDeleteMessage.close();
- psDeleteMessage = null;
- }
+ rollbackPreparedTransaction(tx, conn);
}
catch (Exception e)
{
@@ -2146,18 +1938,17 @@
throw e;
}
finally
- {
- closeStatement(psDeleteMessage);
+ {
closeConnection(conn);
- try
- {
+// try
+// {
wrap.end();
- }
- finally
- {
- //release locks
- this.releaseLocks(refs);
- }
+// }
+// finally
+// {
+// //release locks
+// this.releaseLocks(refs);
+// }
}
}
@@ -2583,28 +2374,28 @@
}
}
- protected void getLocks(List refs)
- {
- Iterator iter = refs.iterator();
- while (iter.hasNext())
- {
- MessageReference ref = (MessageReference)iter.next();
- Message m = ref.getMessage();
- LockMap.instance.obtainLock(m);
- }
- }
+// protected void getLocks(List refs)
+// {
+// Iterator iter = refs.iterator();
+// while (iter.hasNext())
+// {
+// MessageReference ref = (MessageReference)iter.next();
+// Message m = ref.getMessage();
+// LockMap.instance.obtainLock(m);
+// }
+// }
+//
+// protected void releaseLocks(List refs)
+// {
+// Iterator iter = refs.iterator();
+// while (iter.hasNext())
+// {
+// MessageReference ref = (MessageReference)iter.next();
+// Message m = ref.getMessage();
+// LockMap.instance.releaseLock(m);
+// }
+// }
- protected void releaseLocks(List refs)
- {
- Iterator iter = refs.iterator();
- while (iter.hasNext())
- {
- MessageReference ref = (MessageReference)iter.next();
- Message m = ref.getMessage();
- LockMap.instance.releaseLock(m);
- }
- }
-
protected void logBatchUpdate(String name, int[] rows, String action)
{
int count = 0;
@@ -2617,12 +2408,17 @@
protected int executeWithRetry(PreparedStatement ps) throws Exception
{
- return executeWithRetry(ps, false)[0];
+ return executeWithRetry(ps, false, false)[0];
}
+ protected int executeWithRetryIgnoreKeyViolation(PreparedStatement ps) throws Exception
+ {
+ return executeWithRetry(ps, false, true)[0];
+ }
+
protected int[] executeWithRetryBatch(PreparedStatement ps) throws Exception
{
- return executeWithRetry(ps, true);
+ return executeWithRetry(ps, true, false);
}
//PersistentServiceSupport overrides ----------------------------
@@ -2639,13 +2435,14 @@
map.put("CREATE_IDX_MESSAGE_REF_ORD", "CREATE INDEX JBM_MSG_REF_ORD ON JBM_MSG_REF (ORD)");
map.put("CREATE_IDX_MESSAGE_REF_PAGE_ORD", "CREATE INDEX JBM_MSG_REF__PAGE_ORD ON JBM_MSG_REF (PAGE_ORD)");
map.put("CREATE_IDX_MESSAGE_REF_MESSAGE_ID", "CREATE INDEX JBM_MSG_REF_MESSAGE_ID ON JBM_MSG_REF (MESSAGE_ID)");
- map.put("CREATE_IDX_MESSAGE_REF_SCHED_DELIVERY", "CREATE INDEX JBM_MSG_REF_SCHED_DELIVERY ON JBM_MSG_REF (SCHED_DELIVERY)");
+ map.put("CREATE_IDX_MESSAGE_REF_SCHED_DELIVERY", "CREATE INDEX JBM_MSG_REF_SCHED_DELIVERY ON JBM_MSG_REF (SCHED_DELIVERY)");
//Message
map.put("CREATE_MESSAGE",
"CREATE TABLE JBM_MSG (MESSAGE_ID BIGINT, RELIABLE CHAR(1), " +
"EXPIRATION BIGINT, TIMESTAMP BIGINT, PRIORITY TINYINT, HEADERS LONGVARBINARY, " +
- "PAYLOAD LONGVARBINARY, TYPE TINYINT, PAGED CHAR(1) " +
+ "PAYLOAD LONGVARBINARY, TYPE TINYINT " +
"PRIMARY KEY (MESSAGE_ID))");
+ map.put("CREATE_IDX_MESSAGE_TIMESTAMP", "CREATE INDEX JBM_MSG_REF_TIMESTAMP ON JBM_MSG (TIMESTAMP)");
//Transaction
map.put("CREATE_TRANSACTION",
"CREATE TABLE JBM_TX (" +
@@ -2691,16 +2488,15 @@
//Message
map.put("LOAD_MESSAGES",
"SELECT MESSAGE_ID, RELIABLE, EXPIRATION, TIMESTAMP, " +
- "PRIORITY, HEADERS, PAYLOAD, TYPE, PAGED " +
+ "PRIORITY, HEADERS, PAYLOAD, TYPE " +
"FROM JBM_MSG");
map.put("INSERT_MESSAGE",
"INSERT INTO JBM_MSG (MESSAGE_ID, RELIABLE, EXPIRATION, " +
- "TIMESTAMP, PRIORITY, HEADERS, PAYLOAD, TYPE, PAGED) " +
- "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)" );
- map.put("DELETE_MESSAGE", "DELETE FROM JBM_MSG WHERE MESSAGE_ID=?");
- map.put("DELETE_PAGED_MESSAGE", "DELETE FROM JBM_MSG WHERE MESSAGE_ID=? AND NOT EXISTS (SELECT * FROM JBM_MSG_REF WHERE MESSAGE_ID = ?)");
+ "TIMESTAMP, PRIORITY, HEADERS, PAYLOAD, TYPE) " +
+ "VALUES (?, ?, ?, ?, ?, ?, ?, ?)" );
map.put("MESSAGE_ID_COLUMN", "MESSAGE_ID");
map.put("MESSAGE_EXISTS", "SELECT MESSAGE_ID FROM JBM_MSG WHERE MESSAGE_ID = ?");
+ map.put("REAP_MESSAGES", "DELETE FROM JBM_MSG WHERE TIMESTAMP <= ? AND NOT EXISTS (SELECT * FROM JBM_MSG_REF WHERE JBM_MSG_REF.MESSAGE_ID = JBM_MSG.MESSAGE_ID)");
//Transaction
map.put("INSERT_TRANSACTION",
"INSERT INTO JBM_TX (NODE_ID, TRANSACTION_ID, BRANCH_QUAL, FORMAT_ID, GLOBAL_TXID) " +
@@ -2725,7 +2521,7 @@
- private int[] executeWithRetry(PreparedStatement ps, boolean batch) throws Exception
+ private int[] executeWithRetry(PreparedStatement ps, boolean batch, boolean ignoreKeyViolation) throws Exception
{
final int MAX_TRIES = 25;
@@ -2745,7 +2541,22 @@
}
else
{
- rows = ps.executeUpdate();
+ try
+ {
+ rows = ps.executeUpdate();
+ }
+ catch (SQLException e)
+ {
+ if (ignoreKeyViolation && e.getSQLState().equals("23000"))
+ {
+ //Key violation - ignore
+ log.info("Got key violation - but ignoring");
+ }
+ else
+ {
+ throw e;
+ }
+ }
}
if (tries > 0)
@@ -2757,6 +2568,9 @@
catch (SQLException e)
{
log.warn("SQLException caught - assuming deadlock detected, try:" + (tries + 1), e);
+
+ log.info("SQLState:" + e.getSQLState() + " code:" + e.getErrorCode());
+
tries++;
if (tries == MAX_TRIES)
{
@@ -2918,9 +2732,80 @@
return order;
}
-
+
+ private void reapUnreferencedMessages(long timestamp) throws Exception
+ {
+ Connection conn = null;
+ PreparedStatement ps = null;
+ TransactionWrapper wrap = new TransactionWrapper();
+
+ int rows = -1;
+
+ long start = System.currentTimeMillis();
+
+ try
+ {
+ conn = ds.getConnection();
+
+ ps = conn.prepareStatement(getSQLStatement("REAP_MESSAGES"));
+
+ ps.setLong(1, timestamp);
+
+ rows = ps.executeUpdate();
+ }
+ catch (Exception e)
+ {
+ wrap.exceptionOccurred();
+ throw e;
+ }
+ finally
+ {
+ closeStatement(ps);
+ closeConnection(conn);
+ wrap.end();
+
+ long end = System.currentTimeMillis();
+
+ if (trace) { log.trace("Reaper reaped " + rows + " messages in " + (end - start) + " ms"); }
+
+ log.info("Reaper reaped " + rows + " messages in " + (end - start) + " ms");
+ }
+ }
+
+
// Inner classes -------------------------------------------------
-
+
+ private class Reaper extends TimerTask
+ {
+ private boolean cancel;
+
+ public synchronized void run()
+ {
+ if (cancel)
+ {
+ cancel();
+
+ return;
+ }
+
+ try
+ {
+ reapUnreferencedMessages(System.currentTimeMillis() - reaperPeriod);
+ }
+ catch (Exception e)
+ {
+ log.error("Failed to reap", e);
+ }
+ }
+
+ public synchronized void doCancel()
+ {
+ cancel = true;
+
+ cancel();
+ }
+ }
+
private static class ChannelRefPair
{
private long channelID;
Modified: trunk/src/main/org/jboss/messaging/core/impl/PagingChannelSupport.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/impl/PagingChannelSupport.java 2007-08-21 16:24:12 UTC (rev 3021)
+++ trunk/src/main/org/jboss/messaging/core/impl/PagingChannelSupport.java 2007-08-21 21:07:50 UTC (rev 3022)
@@ -231,8 +231,6 @@
paging = false;
- pm.setPaging(channelID, false);
-
firstPagingOrder = nextPagingOrder = 0;
clearAllScheduledDeliveries();
@@ -305,9 +303,7 @@
if (messageRefs.size() != fullSize)
{
- paging = false;
-
- pm.setPaging(channelID, false);
+ paging = false;
}
}
}
@@ -367,8 +363,6 @@
{
paging = false;
- pm.setPaging(channelID, false);
-
return false;
}
}
@@ -388,9 +382,7 @@
// We are full in memory - go into paging mode
if (trace) { log.trace(this + " going into paging mode"); }
- paging = true;
-
- pm.setPaging(channelID, true);
+ paging = true;
}
}
}
@@ -497,8 +489,6 @@
paging = false;
}
- pm.setPaging(channelID, paging);
-
Map refMap = processReferences(ili.getRefInfos());
Iterator iter = ili.getRefInfos().iterator();
@@ -523,9 +513,7 @@
ref.setPagingOrder(-1);
ref.setScheduledDeliveryTime(info.getScheduledDelivery());
-
- ref.getMessage().incrementPersistentCount();
-
+
//Schedule the delivery if necessary, or just add to the in memory queue
if (!checkAndSchedule(ref))
{
Modified: trunk/src/main/org/jboss/messaging/core/impl/message/MessageSupport.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/impl/message/MessageSupport.java 2007-08-21 16:24:12 UTC (rev 3021)
+++ trunk/src/main/org/jboss/messaging/core/impl/message/MessageSupport.java 2007-08-21 21:07:50 UTC (rev 3022)
@@ -76,8 +76,6 @@
// Must be hidden from subclasses
private byte[] payloadAsByteArray;
- private transient volatile int persistentCount;
-
private transient volatile boolean persisted;
// Constructors --------------------------------------------------
@@ -316,26 +314,6 @@
return new SimpleMessageReference(this);
}
- public int getPersistentCount()
- {
- return persistentCount;
- }
-
- public void setPersistentCount(int count)
- {
- persistentCount = count;
- }
-
- public void decrementPersistentCount()
- {
- persistentCount--;
- }
-
- public void incrementPersistentCount()
- {
- persistentCount++;
- }
-
public boolean isPersisted()
{
return persisted;
Modified: trunk/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java 2007-08-21 16:24:12 UTC (rev 3021)
+++ trunk/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java 2007-08-21 21:07:50 UTC (rev 3022)
@@ -348,7 +348,13 @@
put(Replicator.JVM_ID_KEY, JMSClientVMIdentifier.instance);
groupMember.multicastControl(new JoinClusterRequest(thisNodeID, info), true);
+
+ checkStartReaper();
}
+ else
+ {
+ pm.startReaper();
+ }
//Now load the bindings for this node
@@ -837,15 +843,25 @@
*/
public void nodeJoined(Address address) throws Exception
{
- log.debug(this + ": " + address + " joined");
-
- // Currently does nothing
+ log.debug(this + ": " + address + " joined");
}
+ private void checkStartReaper()
+ {
+ if (groupMember.getCurrentView().size() == 1)
+ {
+ //We are the only member in the group - start the message reaper
+
+ pm.startReaper();
+ }
+ }
+
public void nodesLeft(List addresses) throws Throwable
{
if (trace) { log.trace("Nodes left " + addresses.size()); }
+ checkStartReaper();
+
Map oldFailoverMap = new HashMap(this.failoverMap);
int oldFailoverNodeID = failoverNodeID;
@@ -2090,14 +2106,6 @@
startedTx = true;
}
- //We set the persistent count to be the same as the localReliableCount
- //Note that we MUST set the persistent count before routing to any of the queues
- //if we only set it when we actually persist in a channel then we could have the situation where
- //a ref arrives in a subscription then gets acknowledged and removed before hitting the next sub
- //so we would end up with a churn where the message is getting added and removed multiple times for
- //a single route
- ref.getMessage().setPersistentCount(localReliableCount);
-
//Now actually route the ref
iter = targets.iterator();
Modified: trunk/src/main/org/jboss/messaging/core/impl/tx/TransactionRepository.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/impl/tx/TransactionRepository.java 2007-08-21 16:24:12 UTC (rev 3021)
+++ trunk/src/main/org/jboss/messaging/core/impl/tx/TransactionRepository.java 2007-08-21 21:07:50 UTC (rev 3022)
@@ -307,8 +307,6 @@
MessageReference ref = messageStore.reference(msg);
- ref.getMessage().incrementPersistentCount();
-
ref.getMessage().setPersisted(true);
Binding binding = postOffice.getBindingForChannelID(channelID);
@@ -368,8 +366,6 @@
ref = messageStore.reference(msg);
- ref.getMessage().incrementPersistentCount();
-
ref.getMessage().setPersisted(true);
Binding binding = postOffice.getBindingForChannelID(channelID);
Modified: trunk/src/main/org/jboss/messaging/util/LockMap.java
===================================================================
--- trunk/src/main/org/jboss/messaging/util/LockMap.java 2007-08-21 16:24:12 UTC (rev 3021)
+++ trunk/src/main/org/jboss/messaging/util/LockMap.java 2007-08-21 21:07:50 UTC (rev 3022)
@@ -56,47 +56,47 @@
this.map = new ConcurrentHashMap();
}
- public void obtainLock(Object obj)
- {
- Entry entry = null;
- synchronized (obj)
- {
- entry = (Entry)map.get(obj);
- if (entry == null)
- {
- entry = new Entry();
- map.put(obj, entry);
- }
- entry.refCount++;
- }
- try
- {
- entry.lock.acquire();
- }
- catch (InterruptedException e)
- {
- throw new IllegalStateException("Thread interrupted while acquiring lock");
- }
- }
+// public void obtainLock(Object obj)
+// {
+// Entry entry = null;
+// synchronized (obj)
+// {
+// entry = (Entry)map.get(obj);
+// if (entry == null)
+// {
+// entry = new Entry();
+// map.put(obj, entry);
+// }
+// entry.refCount++;
+// }
+// try
+// {
+// entry.lock.acquire();
+// }
+// catch (InterruptedException e)
+// {
+// throw new IllegalStateException("Thread interrupted while acquiring lock");
+// }
+// }
+//
+// public void releaseLock(Object obj)
+// {
+// synchronized (obj)
+// {
+// Entry entry = (Entry)map.get(obj);
+// if (entry == null)
+// {
+// throw new IllegalArgumentException("Cannot find mutex in map for " + obj);
+// }
+// if (entry.refCount == 1)
+// {
+// map.remove(obj);
+// }
+// entry.refCount--;
+// entry.lock.release();
+// }
+// }
- public void releaseLock(Object obj)
- {
- synchronized (obj)
- {
- Entry entry = (Entry)map.get(obj);
- if (entry == null)
- {
- throw new IllegalArgumentException("Cannot find mutex in map for " + obj);
- }
- if (entry.refCount == 1)
- {
- map.remove(obj);
- }
- entry.refCount--;
- entry.lock.release();
- }
- }
-
public int getSize()
{
return map.size();
Modified: trunk/tests/src/org/jboss/test/messaging/core/JDBCPersistenceManagerTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/core/JDBCPersistenceManagerTest.java 2007-08-21 16:24:12 UTC (rev 3021)
+++ trunk/tests/src/org/jboss/test/messaging/core/JDBCPersistenceManagerTest.java 2007-08-21 21:07:50 UTC (rev 3022)
@@ -139,9 +139,6 @@
Message m1 = messages[i * 2];
Message m2 = messages[i * 2 + 1];
- m1.setPersistentCount(2);
- m2.setPersistentCount(2);
-
MessageReference ref1_1 = ms.reference(m1);
MessageReference ref1_2 = ms.reference(m1);
@@ -168,6 +165,7 @@
assertTrue(refs.contains(new Long(m1.getMessageID())));
assertTrue(refs.contains(new Long(m2.getMessageID())));
+ pm.reapUnreferencedMessages();
List msgs = getMessageIds();
assertNotNull(msgs);
assertEquals(2, msgs.size());
@@ -187,6 +185,7 @@
assertTrue(refs.contains(new Long(m1.getMessageID())));
assertTrue(refs.contains(new Long(m2.getMessageID())));
+ pm.reapUnreferencedMessages();
msgs = getMessageIds();
assertNotNull(msgs);
assertEquals(2, msgs.size());
@@ -205,6 +204,7 @@
assertEquals(1, refs.size());
assertTrue(refs.contains(new Long(m2.getMessageID())));
+ pm.reapUnreferencedMessages();
msgs = getMessageIds();
assertNotNull(msgs);
assertEquals(1, msgs.size());
@@ -221,6 +221,7 @@
assertEquals(1, refs.size());
assertTrue(refs.contains(new Long(m2.getMessageID())));
+ pm.reapUnreferencedMessages();
msgs = getMessageIds();
assertNotNull(msgs);
assertEquals(1, msgs.size());
@@ -236,6 +237,7 @@
assertNotNull(refs);
assertTrue(refs.isEmpty());
+ pm.reapUnreferencedMessages();
msgs = getMessageIds();
assertNotNull(msgs);
assertTrue(msgs.isEmpty());
@@ -515,6 +517,7 @@
assertTrue(refIds.contains(new Long(ref15.getMessage().getMessageID())));
+ pm.reapUnreferencedMessages();
List msgs = getMessageIds();
assertNotNull(msgs);
assertEquals(10, msgs.size());
@@ -557,6 +560,7 @@
assertEquals(1, refIds.size());
assertTrue(refIds.contains(new Long(ref11.getMessage().getMessageID())));
+ pm.reapUnreferencedMessages();
ms = getMessageIds();
assertNotNull(ms);
@@ -591,7 +595,7 @@
assertTrue(refIds.contains(new Long(ref9.getMessage().getMessageID())));
assertTrue(refIds.contains(new Long(ref10.getMessage().getMessageID())));
-
+ pm.reapUnreferencedMessages();
ms = getMessageIds();
assertNotNull(ms);
@@ -620,6 +624,7 @@
refs.add(ref10);
pm.removeDepagedReferences(channel1.getChannelID(), refs);
+ pm.reapUnreferencedMessages();
ms = getMessageIds();
assertNotNull(ms);
assertEquals(0, ms.size());
@@ -760,6 +765,7 @@
assertTrue(refIds.contains(new Long(ref9.getMessage().getMessageID())));
assertTrue(refIds.contains(new Long(ref10.getMessage().getMessageID())));
+ pm.reapUnreferencedMessages();
List msgs = getMessageIds();
assertNotNull(msgs);
assertEquals(10, msgs.size());
@@ -909,6 +915,7 @@
assertTrue(refIds.contains(new Long(ref9.getMessage().getMessageID())));
assertTrue(refIds.contains(new Long(ref10.getMessage().getMessageID())));
+ pm.reapUnreferencedMessages();
List msgs = getMessageIds();
assertNotNull(msgs);
assertEquals(10, msgs.size());
@@ -1304,8 +1311,6 @@
log.debug("adding references non-transactionally");
// Add first two refs non transactionally
- ref1.getMessage().incrementPersistentCount();
- ref2.getMessage().incrementPersistentCount();
pm.addReference(channel.getChannelID(), ref1, null);
pm.addReference(channel.getChannelID(), ref2, null);
@@ -1316,6 +1321,7 @@
assertTrue(refs.contains(new Long(ref1.getMessage().getMessageID())));
assertTrue(refs.contains(new Long(ref2.getMessage().getMessageID())));
+ pm.reapUnreferencedMessages();
List msgs = getMessageIds();
assertNotNull(msgs);
assertEquals(2, msgs.size());
@@ -1325,9 +1331,6 @@
log.debug("ref1 and ref2 are there");
//Add the next 3 refs transactionally
- ref3.getMessage().incrementPersistentCount();
- ref4.getMessage().incrementPersistentCount();
- ref5.getMessage().incrementPersistentCount();
pm.addReference(channel.getChannelID(), ref3, tx);
pm.addReference(channel.getChannelID(), ref4, tx);
pm.addReference(channel.getChannelID(), ref5, tx);
@@ -1343,6 +1346,7 @@
assertTrue(refs.contains(new Long(ref1.getMessage().getMessageID())));
assertTrue(refs.contains(new Long(ref2.getMessage().getMessageID())));
+ pm.reapUnreferencedMessages();
msgs = getMessageIds();
assertNotNull(msgs);
assertEquals(2, msgs.size());
@@ -1362,6 +1366,7 @@
assertTrue(refs.contains(new Long(ref4.getMessage().getMessageID())));
assertTrue(refs.contains(new Long(ref5.getMessage().getMessageID())));
+ pm.reapUnreferencedMessages();
msgs = getMessageIds();
assertNotNull(msgs);
assertEquals(3, msgs.size());
@@ -1408,8 +1413,6 @@
MessageReference ref5 = ms.reference(m5);
//Add first two refs non transactionally
- ref1.getMessage().incrementPersistentCount();
- ref2.getMessage().incrementPersistentCount();
pm.addReference(channel.getChannelID(), ref1, null);
pm.addReference(channel.getChannelID(), ref2, null);
@@ -1420,6 +1423,7 @@
assertTrue(refs.contains(new Long(ref1.getMessage().getMessageID())));
assertTrue(refs.contains(new Long(ref2.getMessage().getMessageID())));
+ pm.reapUnreferencedMessages();
List msgs = getMessageIds();
assertNotNull(msgs);
assertEquals(2, msgs.size());
@@ -1429,9 +1433,6 @@
//Add the next 3 refs transactionally
- ref3.getMessage().incrementPersistentCount();
- ref4.getMessage().incrementPersistentCount();
- ref5.getMessage().incrementPersistentCount();
pm.addReference(channel.getChannelID(), ref3, tx);
pm.addReference(channel.getChannelID(), ref4, tx);
pm.addReference(channel.getChannelID(), ref5, tx);
@@ -1447,6 +1448,7 @@
assertTrue(refs.contains(new Long(ref1.getMessage().getMessageID())));
assertTrue(refs.contains(new Long(ref2.getMessage().getMessageID())));
+ pm.reapUnreferencedMessages();
msgs = getMessageIds();
assertNotNull(msgs);
assertEquals(2, msgs.size());
@@ -1462,6 +1464,7 @@
assertTrue(refs.contains(new Long(ref1.getMessage().getMessageID())));
assertTrue(refs.contains(new Long(ref2.getMessage().getMessageID())));
+ pm.reapUnreferencedMessages();
msgs = getMessageIds();
assertNotNull(msgs);
assertEquals(2, msgs.size());
Modified: trunk/tests/src/org/jboss/test/messaging/core/paging/ChannelShare_NP_2PCTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/core/paging/ChannelShare_NP_2PCTest.java 2007-08-21 16:24:12 UTC (rev 3021)
+++ trunk/tests/src/org/jboss/test/messaging/core/paging/ChannelShare_NP_2PCTest.java 2007-08-21 21:07:50 UTC (rev 3022)
@@ -119,6 +119,7 @@
//Msgs
+ pm.reapUnreferencedMessages();
List msgIds = getMessageIds();
assertEquals(0, msgIds.size());
@@ -168,6 +169,7 @@
//Msgs
+ pm.reapUnreferencedMessages();
msgIds = getMessageIds();
assertEquals(25, msgIds.size());
@@ -216,6 +218,7 @@
//Msgs
+ pm.reapUnreferencedMessages();
msgIds = getMessageIds();
assertEquals(50, msgIds.size());
@@ -264,6 +267,7 @@
//Msgs
+ pm.reapUnreferencedMessages();
msgIds = getMessageIds();
assertEquals(100, msgIds.size());
@@ -306,6 +310,7 @@
//Msgs
+ pm.reapUnreferencedMessages();
msgIds = getMessageIds();
assertEquals(0, msgIds.size());
Modified: trunk/tests/src/org/jboss/test/messaging/core/paging/ChannelShare_NP_NTTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/core/paging/ChannelShare_NP_NTTest.java 2007-08-21 16:24:12 UTC (rev 3021)
+++ trunk/tests/src/org/jboss/test/messaging/core/paging/ChannelShare_NP_NTTest.java 2007-08-21 21:07:50 UTC (rev 3022)
@@ -115,6 +115,7 @@
//Msgs
+ pm.reapUnreferencedMessages();
List msgIds = getMessageIds();
assertEquals(0, msgIds.size());
@@ -161,6 +162,7 @@
//Msgs
+ pm.reapUnreferencedMessages();
msgIds = getMessageIds();
assertEquals(25, msgIds.size());
@@ -207,6 +209,7 @@
//Msgs
+ pm.reapUnreferencedMessages();
msgIds = getMessageIds();
assertEquals(50, msgIds.size());
@@ -254,6 +257,7 @@
//Msgs
+ pm.reapUnreferencedMessages();
msgIds = getMessageIds();
assertEquals(100, msgIds.size());
@@ -296,6 +300,7 @@
//Msgs
+ pm.reapUnreferencedMessages();
msgIds = getMessageIds();
assertEquals(0, msgIds.size());
Modified: trunk/tests/src/org/jboss/test/messaging/core/paging/ChannelShare_NP_TTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/core/paging/ChannelShare_NP_TTest.java 2007-08-21 16:24:12 UTC (rev 3021)
+++ trunk/tests/src/org/jboss/test/messaging/core/paging/ChannelShare_NP_TTest.java 2007-08-21 21:07:50 UTC (rev 3022)
@@ -118,6 +118,7 @@
//Msgs
+ pm.reapUnreferencedMessages();
List msgIds = getMessageIds();
assertEquals(0, msgIds.size());
@@ -166,6 +167,7 @@
//Msgs
+ pm.reapUnreferencedMessages();
msgIds = getMessageIds();
assertEquals(25, msgIds.size());
@@ -214,6 +216,7 @@
//Msgs
+ pm.reapUnreferencedMessages();
msgIds = getMessageIds();
assertEquals(50, msgIds.size());
@@ -262,6 +265,7 @@
//Msgs
+ pm.reapUnreferencedMessages();
msgIds = getMessageIds();
assertEquals(100, msgIds.size());
@@ -304,6 +308,7 @@
//Msgs
+ pm.reapUnreferencedMessages();
msgIds = getMessageIds();
assertEquals(0, msgIds.size());
Modified: trunk/tests/src/org/jboss/test/messaging/core/paging/ChannelShare_P_2PCTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/core/paging/ChannelShare_P_2PCTest.java 2007-08-21 16:24:12 UTC (rev 3021)
+++ trunk/tests/src/org/jboss/test/messaging/core/paging/ChannelShare_P_2PCTest.java 2007-08-21 21:07:50 UTC (rev 3022)
@@ -119,6 +119,7 @@
//Msgs
+ pm.reapUnreferencedMessages();
List msgIds = getMessageIds();
assertEquals(50, msgIds.size());
@@ -173,6 +174,7 @@
//Msgs
+ pm.reapUnreferencedMessages();
msgIds = getMessageIds();
assertEquals(75, msgIds.size());
@@ -228,6 +230,7 @@
//Msgs
+ pm.reapUnreferencedMessages();
msgIds = getMessageIds();
assertEquals(100, msgIds.size());
@@ -283,6 +286,7 @@
//Msgs
+ pm.reapUnreferencedMessages();
msgIds = getMessageIds();
assertEquals(150, msgIds.size());
@@ -325,6 +329,7 @@
//Msgs
+ pm.reapUnreferencedMessages();
msgIds = getMessageIds();
assertEquals(0, msgIds.size());
Modified: trunk/tests/src/org/jboss/test/messaging/core/paging/ChannelShare_P_NTTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/core/paging/ChannelShare_P_NTTest.java 2007-08-21 16:24:12 UTC (rev 3021)
+++ trunk/tests/src/org/jboss/test/messaging/core/paging/ChannelShare_P_NTTest.java 2007-08-21 21:07:50 UTC (rev 3022)
@@ -121,6 +121,7 @@
//Msgs
+ pm.reapUnreferencedMessages();
List msgIds = getMessageIds();
assertEquals(50, msgIds.size());
@@ -173,6 +174,7 @@
//Msgs
+ pm.reapUnreferencedMessages();
msgIds = getMessageIds();
assertEquals(75, msgIds.size());
@@ -227,6 +229,7 @@
//Msgs
+ pm.reapUnreferencedMessages();
msgIds = getMessageIds();
assertEquals(100, msgIds.size());
@@ -280,6 +283,7 @@
//Msgs
+ pm.reapUnreferencedMessages();
msgIds = getMessageIds();
assertEquals(150, msgIds.size());
@@ -322,6 +326,7 @@
//Msgs
+ pm.reapUnreferencedMessages();
msgIds = getMessageIds();
assertEquals(0, msgIds.size());
Modified: trunk/tests/src/org/jboss/test/messaging/core/paging/ChannelShare_P_TTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/core/paging/ChannelShare_P_TTest.java 2007-08-21 16:24:12 UTC (rev 3021)
+++ trunk/tests/src/org/jboss/test/messaging/core/paging/ChannelShare_P_TTest.java 2007-08-21 21:07:50 UTC (rev 3022)
@@ -122,6 +122,7 @@
//Msgs
+ pm.reapUnreferencedMessages();
List msgIds = getMessageIds();
assertEquals(50, msgIds.size());
@@ -175,6 +176,7 @@
//Msgs
+ pm.reapUnreferencedMessages();
msgIds = getMessageIds();
assertEquals(75, msgIds.size());
@@ -230,6 +232,7 @@
//Msgs
+ pm.reapUnreferencedMessages();
msgIds = getMessageIds();
assertEquals(100, msgIds.size());
@@ -284,6 +287,7 @@
//Msgs
+ pm.reapUnreferencedMessages();
msgIds = getMessageIds();
assertEquals(150, msgIds.size());
@@ -326,6 +330,7 @@
//Msgs
+ pm.reapUnreferencedMessages();
msgIds = getMessageIds();
assertEquals(0, msgIds.size());
Modified: trunk/tests/src/org/jboss/test/messaging/core/paging/SingleChannel_NP_2PCTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/core/paging/SingleChannel_NP_2PCTest.java 2007-08-21 16:24:12 UTC (rev 3021)
+++ trunk/tests/src/org/jboss/test/messaging/core/paging/SingleChannel_NP_2PCTest.java 2007-08-21 21:07:50 UTC (rev 3022)
@@ -91,7 +91,7 @@
assertTrue(refIds.isEmpty());
//Verify no msgs in storage
- List msgIds = getMessageIds();
+ pm.reapUnreferencedMessages(); List msgIds = getMessageIds();
assertTrue(msgIds.isEmpty());
//Verify 99 refs in queue
@@ -124,7 +124,8 @@
assertTrue(refIds.isEmpty());
//Verify no msgs in storage
- msgIds = getMessageIds();
+ pm.reapUnreferencedMessages();
+ pm.reapUnreferencedMessages(); msgIds = getMessageIds();
assertTrue(msgIds.isEmpty());
//Verify 100 refs in queue
@@ -158,7 +159,7 @@
assertTrue(refIds.isEmpty());
//Verify no msgs in storage
- msgIds = getMessageIds();
+ pm.reapUnreferencedMessages(); pm.reapUnreferencedMessages(); msgIds = getMessageIds();
assertTrue(msgIds.isEmpty());
//Verify 100 refs in queue
@@ -192,7 +193,7 @@
assertSameIds(refIds, refs, 100, 109);
//Verify 10 msgs in storage
- msgIds = getMessageIds();
+ pm.reapUnreferencedMessages(); pm.reapUnreferencedMessages(); msgIds = getMessageIds();
assertEquals(10, msgIds.size());
assertSameIds(msgIds, refs, 100, 109);
@@ -226,7 +227,7 @@
assertSameIds(refIds, refs, 100, 109);
//Verify 10 msgs in storage
- msgIds = getMessageIds();
+ pm.reapUnreferencedMessages(); msgIds = getMessageIds();
assertEquals(10, msgIds.size());
assertSameIds(msgIds, refs, 100, 109);
@@ -263,7 +264,7 @@
assertSameIds(refIds, refs, 100, 119);
//Verify 20 msgs in storage
- msgIds = getMessageIds();
+ pm.reapUnreferencedMessages(); msgIds = getMessageIds();
assertEquals(20, msgIds.size());
assertSameIds(msgIds, refs, 100, 119);
@@ -310,7 +311,7 @@
assertSameIds(refIds, refs, 100, 129);
//Verify 30 msgs in storage
- msgIds = getMessageIds();
+ pm.reapUnreferencedMessages(); msgIds = getMessageIds();
assertEquals(30, msgIds.size());
assertSameIds(msgIds, refs, 100, 129);
@@ -348,7 +349,7 @@
assertSameIds(refIds, refs, 100, 139);
//Verify 40 msgs in storage
- msgIds = getMessageIds();
+ pm.reapUnreferencedMessages(); msgIds = getMessageIds();
assertEquals(40, msgIds.size());
assertSameIds(msgIds, refs, 100, 139);
@@ -383,7 +384,7 @@
assertSameIds(refIds, refs, 100, 139);
//Verify 40 msgs in storage
- msgIds = getMessageIds();
+ pm.reapUnreferencedMessages(); msgIds = getMessageIds();
assertEquals(40, msgIds.size());
assertSameIds(msgIds, refs, 100, 139);
@@ -414,7 +415,7 @@
assertSameIds(refIds, refs, 100, 139);
//Verify 40 msgs in storage
- msgIds = getMessageIds();
+ pm.reapUnreferencedMessages(); msgIds = getMessageIds();
assertEquals(40, msgIds.size());
assertSameIds(msgIds, refs, 100, 139);
@@ -448,7 +449,7 @@
assertSameIds(refIds, refs, 100, 139);
//Verify 40 msgs in storage
- msgIds = getMessageIds();
+ pm.reapUnreferencedMessages(); msgIds = getMessageIds();
assertEquals(40, msgIds.size());
assertSameIds(msgIds, refs, 100, 139);
@@ -479,7 +480,7 @@
assertSameIds(refIds, refs, 120, 140);
//Verify 21 msgs in storage
- msgIds = getMessageIds();
+ pm.reapUnreferencedMessages(); msgIds = getMessageIds();
assertEquals(21, msgIds.size());
assertSameIds(msgIds, refs, 120, 140);
@@ -510,7 +511,7 @@
assertSameIds(refIds, refs, 140, 140);
//Verify 1 msgs in storage
- msgIds = getMessageIds();
+ pm.reapUnreferencedMessages(); msgIds = getMessageIds();
assertEquals(1, msgIds.size());
assertSameIds(msgIds, refs, 140, 140);
@@ -540,7 +541,7 @@
assertEquals(0, refIds.size());
//Verify 0 msgs in storage
- msgIds = getMessageIds();
+ pm.reapUnreferencedMessages(); msgIds = getMessageIds();
assertEquals(0, msgIds.size());
//Verify 81 refs in queue
@@ -569,7 +570,7 @@
assertEquals(0, refIds.size());
//Verify 0 msgs in storage
- msgIds = getMessageIds();
+ pm.reapUnreferencedMessages(); msgIds = getMessageIds();
assertEquals(0, msgIds.size());
//Verify 80 refs in queue
@@ -598,7 +599,7 @@
assertEquals(0, refIds.size());
//Verify 0 msgs in storage
- msgIds = getMessageIds();
+ pm.reapUnreferencedMessages(); msgIds = getMessageIds();
assertEquals(0, msgIds.size());
//Verify 20 refs in queue
@@ -632,7 +633,7 @@
assertEquals(0, refIds.size());
//Verify 0 msgs in storage
- msgIds = getMessageIds();
+ pm.reapUnreferencedMessages(); msgIds = getMessageIds();
assertEquals(0, msgIds.size());
//Verify 40 refs in queue
@@ -667,7 +668,7 @@
assertEquals(0, refIds.size());
//Verify 0 msgs in storage
- msgIds = getMessageIds();
+ pm.reapUnreferencedMessages(); msgIds = getMessageIds();
assertEquals(0, msgIds.size());
//Verify 60 refs in queue
@@ -702,7 +703,7 @@
assertSameIds(refIds, refs, 221, 240);
//Verify 20 msgs in storage
- msgIds = getMessageIds();
+ pm.reapUnreferencedMessages(); msgIds = getMessageIds();
assertEquals(20, msgIds.size());
assertSameIds(msgIds, refs, 221, 240);
@@ -733,7 +734,7 @@
assertEquals(0, refIds.size());
//Verify 0 msgs in storage
- msgIds = getMessageIds();
+ pm.reapUnreferencedMessages(); msgIds = getMessageIds();
assertEquals(0, msgIds.size());
//Verify 100 refs in queue
@@ -762,7 +763,7 @@
assertEquals(0, refIds.size());
//Verify 0 msgs in storage
- msgIds = getMessageIds();
+ pm.reapUnreferencedMessages(); msgIds = getMessageIds();
assertEquals(0, msgIds.size());
//Verify 100 refs in queue
@@ -790,7 +791,7 @@
assertSameIds(refIds, refs, 231, 240);
//Verify 10 msgs in storage
- msgIds = getMessageIds();
+ pm.reapUnreferencedMessages(); msgIds = getMessageIds();
assertEquals(10, msgIds.size());
assertSameIds(msgIds, refs, 231, 240);
@@ -821,7 +822,7 @@
assertSameIds(refIds, refs, 221, 240);
//Verify 20 msgs in storage
- msgIds = getMessageIds();
+ pm.reapUnreferencedMessages(); msgIds = getMessageIds();
assertEquals(20, msgIds.size());
assertSameIds(msgIds, refs, 221, 240);
@@ -848,7 +849,7 @@
assertEquals(0, refIds.size());
//Verify 0 msgs in storage
- msgIds = getMessageIds();
+ pm.reapUnreferencedMessages(); msgIds = getMessageIds();
assertEquals(0, msgIds.size());
//Verify 70 refs in queue
@@ -872,7 +873,7 @@
assertEquals(0, refIds.size());
//Verify 0 msgs in storage
- msgIds = getMessageIds();
+ pm.reapUnreferencedMessages(); msgIds = getMessageIds();
assertEquals(0, msgIds.size());
//Verify 0 refs in queue
Modified: trunk/tests/src/org/jboss/test/messaging/core/paging/SingleChannel_NP_NTTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/core/paging/SingleChannel_NP_NTTest.java 2007-08-21 16:24:12 UTC (rev 3021)
+++ trunk/tests/src/org/jboss/test/messaging/core/paging/SingleChannel_NP_NTTest.java 2007-08-21 21:07:50 UTC (rev 3022)
@@ -85,7 +85,7 @@
assertTrue(refIds.isEmpty());
//Verify no msgs in storage
- List msgIds = getMessageIds();
+ pm.reapUnreferencedMessages(); List msgIds = getMessageIds();
assertTrue(msgIds.isEmpty());
//Verify 99 refs in queue
@@ -113,7 +113,7 @@
assertTrue(refIds.isEmpty());
//Verify no msgs in storage
- msgIds = getMessageIds();
+ pm.reapUnreferencedMessages(); msgIds = getMessageIds();
assertTrue(msgIds.isEmpty());
//Verify 100 refs in queue
@@ -143,7 +143,7 @@
assertTrue(refIds.isEmpty());
//Verify no msgs in storage
- msgIds = getMessageIds();
+ pm.reapUnreferencedMessages(); msgIds = getMessageIds();
assertTrue(msgIds.isEmpty());
//Verify 100 refs in queue
@@ -174,7 +174,7 @@
assertSameIds(refIds, refs, 100, 109);
//Verify 10 msgs in storage
- msgIds = getMessageIds();
+ pm.reapUnreferencedMessages(); msgIds = getMessageIds();
assertEquals(10, msgIds.size());
assertSameIds(msgIds, refs, 100, 109);
@@ -205,7 +205,7 @@
assertSameIds(refIds, refs, 100, 109);
//Verify 10 msgs in storage
- msgIds = getMessageIds();
+ pm.reapUnreferencedMessages(); msgIds = getMessageIds();
assertEquals(10, msgIds.size());
assertSameIds(msgIds, refs, 100, 109);
@@ -239,7 +239,7 @@
assertSameIds(refIds, refs, 100, 119);
//Verify 20 msgs in storage
- msgIds = getMessageIds();
+ pm.reapUnreferencedMessages(); msgIds = getMessageIds();
assertEquals(20, msgIds.size());
assertSameIds(msgIds, refs, 100, 119);
@@ -274,7 +274,7 @@
assertSameIds(refIds, refs, 100, 129);
//Verify 30 msgs in storage
- msgIds = getMessageIds();
+ pm.reapUnreferencedMessages(); msgIds = getMessageIds();
assertEquals(30, msgIds.size());
assertSameIds(msgIds, refs, 100, 129);
@@ -309,7 +309,7 @@
assertSameIds(refIds, refs, 100, 139);
//Verify 40 msgs in storage
- msgIds = getMessageIds();
+ pm.reapUnreferencedMessages(); msgIds = getMessageIds();
assertEquals(40, msgIds.size());
assertSameIds(msgIds, refs, 100, 139);
@@ -341,7 +341,7 @@
assertSameIds(refIds, refs, 100, 139);
//Verify 40 msgs in storage
- msgIds = getMessageIds();
+ pm.reapUnreferencedMessages(); msgIds = getMessageIds();
assertEquals(40, msgIds.size());
assertSameIds(msgIds, refs, 100, 139);
@@ -372,7 +372,7 @@
assertSameIds(refIds, refs, 100, 139);
//Verify 40 msgs in storage
- msgIds = getMessageIds();
+ pm.reapUnreferencedMessages(); msgIds = getMessageIds();
assertEquals(40, msgIds.size());
assertSameIds(msgIds, refs, 100, 139);
@@ -406,7 +406,7 @@
assertSameIds(refIds, refs, 100, 139);
//Verify 40 msgs in storage
- msgIds = getMessageIds();
+ pm.reapUnreferencedMessages(); msgIds = getMessageIds();
assertEquals(40, msgIds.size());
assertSameIds(msgIds, refs, 100, 139);
@@ -440,7 +440,7 @@
assertSameIds(refIds, refs, 120, 140);
//Verify 21 msgs in storage
- msgIds = getMessageIds();
+ pm.reapUnreferencedMessages(); msgIds = getMessageIds();
assertEquals(21, msgIds.size());
assertSameIds(msgIds, refs, 120, 140);
@@ -471,7 +471,7 @@
assertSameIds(refIds, refs, 140, 140);
//Verify 1 msgs in storage
- msgIds = getMessageIds();
+ pm.reapUnreferencedMessages(); msgIds = getMessageIds();
assertEquals(1, msgIds.size());
assertSameIds(msgIds, refs, 140, 140);
@@ -501,7 +501,7 @@
assertEquals(0, refIds.size());
//Verify 0 msgs in storage
- msgIds = getMessageIds();
+ pm.reapUnreferencedMessages(); msgIds = getMessageIds();
assertEquals(0, msgIds.size());
//Verify 81 refs in queue
@@ -530,7 +530,7 @@
assertEquals(0, refIds.size());
//Verify 0 msgs in storage
- msgIds = getMessageIds();
+ pm.reapUnreferencedMessages(); msgIds = getMessageIds();
assertEquals(0, msgIds.size());
//Verify 80 refs in queue
@@ -559,7 +559,7 @@
assertEquals(0, refIds.size());
//Verify 0 msgs in storage
- msgIds = getMessageIds();
+ pm.reapUnreferencedMessages(); msgIds = getMessageIds();
assertEquals(0, msgIds.size());
//Verify 20 refs in queue
@@ -590,7 +590,7 @@
assertEquals(0, refIds.size());
//Verify 0 msgs in storage
- msgIds = getMessageIds();
+ pm.reapUnreferencedMessages(); msgIds = getMessageIds();
assertEquals(0, msgIds.size());
//Verify 40 refs in queue
@@ -622,7 +622,7 @@
assertEquals(0, refIds.size());
//Verify 0 msgs in storage
- msgIds = getMessageIds();
+ pm.reapUnreferencedMessages(); msgIds = getMessageIds();
assertEquals(0, msgIds.size());
//Verify 60 refs in queue
@@ -654,7 +654,7 @@
assertSameIds(refIds, refs, 221, 240);
//Verify 20 msgs in storage
- msgIds = getMessageIds();
+ pm.reapUnreferencedMessages(); msgIds = getMessageIds();
assertEquals(20, msgIds.size());
assertSameIds(msgIds, refs, 221, 240);
@@ -687,7 +687,7 @@
assertEquals(0, refIds.size());
//Verify 0 msgs in storage
- msgIds = getMessageIds();
+ pm.reapUnreferencedMessages(); msgIds = getMessageIds();
assertEquals(0, msgIds.size());
//Verify 100 refs in queue
@@ -716,7 +716,7 @@
assertEquals(0, refIds.size());
//Verify 0 msgs in storage
- msgIds = getMessageIds();
+ pm.reapUnreferencedMessages(); msgIds = getMessageIds();
assertEquals(0, msgIds.size());
//Verify 100 refs in queue
@@ -746,7 +746,7 @@
assertSameIds(refIds, refs, 231, 240);
//Verify 10 msgs in storage
- msgIds = getMessageIds();
+ pm.reapUnreferencedMessages(); msgIds = getMessageIds();
assertEquals(10, msgIds.size());
assertSameIds(msgIds, refs, 231, 240);
@@ -777,7 +777,7 @@
assertSameIds(refIds, refs, 221, 240);
//Verify 20 msgs in storage
- msgIds = getMessageIds();
+ pm.reapUnreferencedMessages(); msgIds = getMessageIds();
assertEquals(20, msgIds.size());
assertSameIds(msgIds, refs, 221, 240);
@@ -807,7 +807,7 @@
assertEquals(0, refIds.size());
//Verify 0 msgs in storage
- msgIds = getMessageIds();
+ pm.reapUnreferencedMessages(); msgIds = getMessageIds();
assertEquals(0, msgIds.size());
//Verify 0 deliveries
@@ -837,7 +837,7 @@
assertEquals(0, refIds.size());
//Verify 0 msgs in storage
- msgIds = getMessageIds();
+ pm.reapUnreferencedMessages(); msgIds = getMessageIds();
assertEquals(0, msgIds.size());
//Verify 0 refs in queue
Modified: trunk/tests/src/org/jboss/test/messaging/core/paging/SingleChannel_NP_TTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/core/paging/SingleChannel_NP_TTest.java 2007-08-21 16:24:12 UTC (rev 3021)
+++ trunk/tests/src/org/jboss/test/messaging/core/paging/SingleChannel_NP_TTest.java 2007-08-21 21:07:50 UTC (rev 3022)
@@ -90,7 +90,7 @@
assertTrue(refIds.isEmpty());
//Verify no msgs in storage
- List msgIds = getMessageIds();
+ pm.reapUnreferencedMessages(); List msgIds = getMessageIds();
assertTrue(msgIds.isEmpty());
//Verify 99 refs in queue
@@ -122,7 +122,7 @@
assertTrue(refIds.isEmpty());
//Verify no msgs in storage
- msgIds = getMessageIds();
+ pm.reapUnreferencedMessages(); msgIds = getMessageIds();
assertTrue(msgIds.isEmpty());
//Verify 100 refs in queue
@@ -155,7 +155,7 @@
assertTrue(refIds.isEmpty());
//Verify no msgs in storage
- msgIds = getMessageIds();
+ pm.reapUnreferencedMessages(); msgIds = getMessageIds();
assertTrue(msgIds.isEmpty());
//Verify 100 refs in queue
@@ -188,7 +188,7 @@
assertSameIds(refIds, refs, 100, 109);
//Verify 10 msgs in storage
- msgIds = getMessageIds();
+ pm.reapUnreferencedMessages(); msgIds = getMessageIds();
assertEquals(10, msgIds.size());
assertSameIds(msgIds, refs, 100, 109);
@@ -221,7 +221,7 @@
assertSameIds(refIds, refs, 100, 109);
//Verify 10 msgs in storage
- msgIds = getMessageIds();
+ pm.reapUnreferencedMessages(); msgIds = getMessageIds();
assertEquals(10, msgIds.size());
assertSameIds(msgIds, refs, 100, 109);
@@ -257,7 +257,7 @@
assertSameIds(refIds, refs, 100, 119);
//Verify 20 msgs in storage
- msgIds = getMessageIds();
+ pm.reapUnreferencedMessages(); msgIds = getMessageIds();
assertEquals(20, msgIds.size());
assertSameIds(msgIds, refs, 100, 119);
@@ -303,7 +303,7 @@
assertSameIds(refIds, refs, 100, 129);
//Verify 30 msgs in storage
- msgIds = getMessageIds();
+ pm.reapUnreferencedMessages(); msgIds = getMessageIds();
assertEquals(30, msgIds.size());
assertSameIds(msgIds, refs, 100, 129);
@@ -338,7 +338,7 @@
assertSameIds(refIds, refs, 100, 139);
//Verify 40 msgs in storage
- msgIds = getMessageIds();
+ pm.reapUnreferencedMessages(); msgIds = getMessageIds();
assertEquals(40, msgIds.size());
assertSameIds(msgIds, refs, 100, 139);
@@ -372,7 +372,7 @@
assertSameIds(refIds, refs, 100, 139);
//Verify 40 msgs in storage
- msgIds = getMessageIds();
+ pm.reapUnreferencedMessages(); msgIds = getMessageIds();
assertEquals(40, msgIds.size());
assertSameIds(msgIds, refs, 100, 139);
@@ -403,7 +403,7 @@
assertSameIds(refIds, refs, 100, 139);
//Verify 40 msgs in storage
- msgIds = getMessageIds();
+ pm.reapUnreferencedMessages(); msgIds = getMessageIds();
assertEquals(40, msgIds.size());
assertSameIds(msgIds, refs, 100, 139);
@@ -437,7 +437,7 @@
assertSameIds(refIds, refs, 100, 139);
//Verify 40 msgs in storage
- msgIds = getMessageIds();
+ pm.reapUnreferencedMessages(); msgIds = getMessageIds();
assertEquals(40, msgIds.size());
assertSameIds(msgIds, refs, 100, 139);
@@ -470,7 +470,7 @@
assertSameIds(refIds, refs, 120, 140);
//Verify 21 msgs in storage
- msgIds = getMessageIds();
+ pm.reapUnreferencedMessages(); msgIds = getMessageIds();
assertEquals(21, msgIds.size());
assertSameIds(msgIds, refs, 120, 140);
@@ -501,7 +501,7 @@
assertSameIds(refIds, refs, 140, 140);
//Verify 1 msgs in storage
- msgIds = getMessageIds();
+ pm.reapUnreferencedMessages(); msgIds = getMessageIds();
assertEquals(1, msgIds.size());
assertSameIds(msgIds, refs, 140, 140);
@@ -531,7 +531,7 @@
assertEquals(0, refIds.size());
//Verify 0 msgs in storage
- msgIds = getMessageIds();
+ pm.reapUnreferencedMessages(); msgIds = getMessageIds();
assertEquals(0, msgIds.size());
//Verify 81 refs in queue
@@ -560,7 +560,7 @@
assertEquals(0, refIds.size());
//Verify 0 msgs in storage
- msgIds = getMessageIds();
+ pm.reapUnreferencedMessages(); msgIds = getMessageIds();
assertEquals(0, msgIds.size());
//Verify 80 refs in queue
@@ -589,7 +589,7 @@
assertEquals(0, refIds.size());
//Verify 0 msgs in storage
- msgIds = getMessageIds();
+ pm.reapUnreferencedMessages(); msgIds = getMessageIds();
assertEquals(0, msgIds.size());
//Verify 20 refs in queue
@@ -622,7 +622,7 @@
assertEquals(0, refIds.size());
//Verify 0 msgs in storage
- msgIds = getMessageIds();
+ pm.reapUnreferencedMessages(); msgIds = getMessageIds();
assertEquals(0, msgIds.size());
//Verify 40 refs in queue
@@ -656,7 +656,7 @@
assertEquals(0, refIds.size());
//Verify 0 msgs in storage
- msgIds = getMessageIds();
+ pm.reapUnreferencedMessages(); msgIds = getMessageIds();
assertEquals(0, msgIds.size());
//Verify 60 refs in queue
@@ -690,7 +690,7 @@
assertSameIds(refIds, refs, 221, 240);
//Verify 20 msgs in storage
- msgIds = getMessageIds();
+ pm.reapUnreferencedMessages(); msgIds = getMessageIds();
assertEquals(20, msgIds.size());
assertSameIds(msgIds, refs, 221, 240);
@@ -722,7 +722,7 @@
assertEquals(0, refIds.size());
//Verify 0 msgs in storage
- msgIds = getMessageIds();
+ pm.reapUnreferencedMessages(); msgIds = getMessageIds();
assertEquals(0, msgIds.size());
//Verify 100 refs in queue
@@ -751,7 +751,7 @@
assertEquals(0, refIds.size());
//Verify 0 msgs in storage
- msgIds = getMessageIds();
+ pm.reapUnreferencedMessages(); msgIds = getMessageIds();
assertEquals(0, msgIds.size());
//Verify 100 refs in queue
@@ -781,7 +781,7 @@
assertSameIds(refIds, refs, 231, 240);
//Verify 10 msgs in storage
- msgIds = getMessageIds();
+ pm.reapUnreferencedMessages(); msgIds = getMessageIds();
assertEquals(10, msgIds.size());
assertSameIds(msgIds, refs, 231, 240);
@@ -812,7 +812,7 @@
assertSameIds(refIds, refs, 221, 240);
//Verify 20 msgs in storage
- msgIds = getMessageIds();
+ pm.reapUnreferencedMessages(); msgIds = getMessageIds();
assertEquals(20, msgIds.size());
assertSameIds(msgIds, refs, 221, 240);
@@ -839,7 +839,7 @@
assertEquals(0, refIds.size());
//Verify 0 msgs in storage
- msgIds = getMessageIds();
+ pm.reapUnreferencedMessages(); msgIds = getMessageIds();
assertEquals(0, msgIds.size());
//Verify 70 refs in queue
@@ -863,7 +863,7 @@
assertEquals(0, refIds.size());
//Verify 0 msgs in storage
- msgIds = getMessageIds();
+ pm.reapUnreferencedMessages(); msgIds = getMessageIds();
assertEquals(0, msgIds.size());
//Verify 0 refs in queue
Modified: trunk/tests/src/org/jboss/test/messaging/core/paging/SingleChannel_P_2PCTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/core/paging/SingleChannel_P_2PCTest.java 2007-08-21 16:24:12 UTC (rev 3021)
+++ trunk/tests/src/org/jboss/test/messaging/core/paging/SingleChannel_P_2PCTest.java 2007-08-21 21:07:50 UTC (rev 3022)
@@ -94,7 +94,7 @@
assertSameIds(refIds, refs, 0, 98);
//Verify 99 msgs in storage
- List msgIds = getMessageIds();
+ pm.reapUnreferencedMessages(); List msgIds = getMessageIds();
assertEquals(99, msgIds.size());
assertSameIds(msgIds, refs, 0, 98);
@@ -131,7 +131,7 @@
assertSameIds(refIds, refs, 0, 99);
//Verify 100 msgs in storage
- msgIds = getMessageIds();
+ pm.reapUnreferencedMessages(); msgIds = getMessageIds();
assertEquals(100, msgIds.size());
assertSameIds(msgIds, refs, 0, 99);
@@ -171,7 +171,7 @@
assertSameIds(refIds, refs, 0, 108);
//Verify 109 msgs in storage
- msgIds = getMessageIds();
+ pm.reapUnreferencedMessages(); msgIds = getMessageIds();
assertEquals(109, msgIds.size());
assertSameIds(msgIds, refs, 0, 108);
@@ -211,7 +211,7 @@
assertSameIds(refIds, refs, 0, 109);
//Verify 110 msgs in storage
- msgIds = getMessageIds();
+ pm.reapUnreferencedMessages(); msgIds = getMessageIds();
assertEquals(110, msgIds.size());
assertSameIds(msgIds, refs, 0, 109);
@@ -249,7 +249,7 @@
assertSameIds(refIds, refs, 0, 110);
//Verify 111 msgs in storage
- msgIds = getMessageIds();
+ pm.reapUnreferencedMessages(); msgIds = getMessageIds();
assertEquals(111, msgIds.size());
assertSameIds(msgIds, refs, 0, 110);
@@ -290,7 +290,7 @@
assertSameIds(refIds, refs, 0, 119);
//Verify 120 msgs in storage
- msgIds = getMessageIds();
+ pm.reapUnreferencedMessages(); msgIds = getMessageIds();
assertEquals(120, msgIds.size());
assertSameIds(msgIds, refs, 0, 119);
@@ -341,7 +341,7 @@
assertSameIds(refIds, refs, 0, 129);
//Verify 130 msgs in storage
- msgIds = getMessageIds();
+ pm.reapUnreferencedMessages(); msgIds = getMessageIds();
assertEquals(130, msgIds.size());
assertSameIds(msgIds, refs, 0, 129);
@@ -383,7 +383,7 @@
assertSameIds(refIds, refs, 0, 139);
//Verify 140 msgs in storage
- msgIds = getMessageIds();
+ pm.reapUnreferencedMessages(); msgIds = getMessageIds();
assertEquals(140, msgIds.size());
assertSameIds(msgIds, refs, 0, 139);
@@ -423,7 +423,7 @@
assertSameIds(refIds, refs, 0, 140);
//Verify 141 msgs in storage
- msgIds = getMessageIds();
+ pm.reapUnreferencedMessages(); msgIds = getMessageIds();
assertEquals(141, msgIds.size());
assertSameIds(msgIds, refs, 0, 140);
@@ -458,7 +458,7 @@
assertSameIds(refIds, refs, 1, 140);
//Verify 140 msgs in storage
- msgIds = getMessageIds();
+ pm.reapUnreferencedMessages(); msgIds = getMessageIds();
assertEquals(140, msgIds.size());
assertSameIds(msgIds, refs, 1, 140);
@@ -497,7 +497,7 @@
assertSameIds(refIds, refs, 19, 140);
//Verify 122 msgs in storage
- msgIds = getMessageIds();
+ pm.reapUnreferencedMessages(); msgIds = getMessageIds();
assertEquals(122, msgIds.size());
assertSameIds(msgIds, refs, 19, 140);
@@ -535,7 +535,7 @@
assertSameIds(refIds, refs, 20, 140);
//Verify 121 msgs in storage
- msgIds = getMessageIds();
+ pm.reapUnreferencedMessages(); msgIds = getMessageIds();
assertEquals(121, msgIds.size());
assertSameIds(msgIds, refs, 20, 140);
@@ -571,7 +571,7 @@
assertSameIds(refIds, refs, 40, 140);
//Verify 101 msgs in storage
- msgIds = getMessageIds();
+ pm.reapUnreferencedMessages(); msgIds = getMessageIds();
assertEquals(101, msgIds.size());
assertSameIds(msgIds, refs, 40, 140);
@@ -606,7 +606,7 @@
assertSameIds(refIds, refs, 41, 140);
//Verify 100 msgs in storage
- msgIds = getMessageIds();
+ pm.reapUnreferencedMessages(); msgIds = getMessageIds();
assertEquals(100, msgIds.size());
assertSameIds(msgIds, refs, 41, 140);
@@ -641,7 +641,7 @@
assertSameIds(refIds, refs, 61, 140);
//Verify 80 msgs in storage
- msgIds = getMessageIds();
+ pm.reapUnreferencedMessages(); msgIds = getMessageIds();
assertEquals(80, msgIds.size());
assertSameIds(msgIds, refs, 61, 140);
@@ -676,7 +676,7 @@
assertSameIds(refIds, refs, 121, 140);
//Verify 20 msgs in storage
- msgIds = getMessageIds();
+ pm.reapUnreferencedMessages(); msgIds = getMessageIds();
assertEquals(20, msgIds.size());
assertSameIds(msgIds, refs, 121, 140);
@@ -714,7 +714,7 @@
assertSameIds(refIds, refs, 121, 160);
//Verify 40 msgs in storage
- msgIds = getMessageIds();
+ pm.reapUnreferencedMessages(); msgIds = getMessageIds();
assertEquals(40, msgIds.size());
assertSameIds(msgIds, refs, 121, 160);
@@ -753,7 +753,7 @@
assertSameIds(refIds, refs, 121, 180);
//Verify 60 msgs in storage
- msgIds = getMessageIds();
+ pm.reapUnreferencedMessages(); msgIds = getMessageIds();
assertEquals(60, msgIds.size());
assertSameIds(msgIds, refs, 121, 180);
@@ -794,7 +794,7 @@
assertSameIds(refIds, refs, 121, 240);
//Verify 120 msgs in storage
- msgIds = getMessageIds();
+ pm.reapUnreferencedMessages(); msgIds = getMessageIds();
assertEquals(120, msgIds.size());
assertSameIds(msgIds, refs, 121, 240);
@@ -831,7 +831,7 @@
assertSameIds(refIds, refs, 121, 240);
//Verify 120 msgs in storage
- msgIds = getMessageIds();
+ pm.reapUnreferencedMessages(); msgIds = getMessageIds();
assertEquals(120, msgIds.size());
assertSameIds(msgIds, refs, 121, 240);
@@ -866,7 +866,7 @@
assertSameIds(refIds, refs, 121, 240);
//Verify 120 msgs in storage
- msgIds = getMessageIds();
+ pm.reapUnreferencedMessages(); msgIds = getMessageIds();
assertEquals(120, msgIds.size());
assertSameIds(msgIds, refs, 121, 240);
@@ -902,7 +902,7 @@
assertSameIds(refIds, refs, 121, 240);
//Verify 120 msgs in storage
- msgIds = getMessageIds();
+ pm.reapUnreferencedMessages(); msgIds = getMessageIds();
assertEquals(120, msgIds.size());
assertSameIds(msgIds, refs, 121, 240);
@@ -938,7 +938,7 @@
assertSameIds(refIds, refs, 121, 240);
//Verify 120 msgs in storage
- msgIds = getMessageIds();
+ pm.reapUnreferencedMessages(); msgIds = getMessageIds();
assertEquals(120, msgIds.size());
assertSameIds(msgIds, refs, 121, 240);
@@ -972,7 +972,7 @@
assertSameIds(refIds, refs, 171, 240);
//Verify 70 msgs in storage
- msgIds = getMessageIds();
+ pm.reapUnreferencedMessages(); msgIds = getMessageIds();
assertEquals(70, msgIds.size());
assertSameIds(msgIds, refs, 171, 240);
@@ -1002,7 +1002,7 @@
assertEquals(0, refIds.size());
//Verify 0 msgs in storage
- msgIds = getMessageIds();
+ pm.reapUnreferencedMessages(); msgIds = getMessageIds();
assertEquals(0, msgIds.size());
//Verify 0 refs in queue
Modified: trunk/tests/src/org/jboss/test/messaging/core/paging/SingleChannel_P_NTTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/core/paging/SingleChannel_P_NTTest.java 2007-08-21 16:24:12 UTC (rev 3021)
+++ trunk/tests/src/org/jboss/test/messaging/core/paging/SingleChannel_P_NTTest.java 2007-08-21 21:07:50 UTC (rev 3022)
@@ -90,7 +90,7 @@
assertSameIds(refIds, refs, 0, 98);
//Verify 99 msgs in storage
- List msgIds = getMessageIds();
+ pm.reapUnreferencedMessages(); List msgIds = getMessageIds();
assertEquals(99, msgIds.size());
assertSameIds(msgIds, refs, 0, 98);
@@ -124,7 +124,7 @@
assertSameIds(refIds, refs, 0, 99);
//Verify 100 msgs in storage
- msgIds = getMessageIds();
+ pm.reapUnreferencedMessages(); msgIds = getMessageIds();
assertEquals(100, msgIds.size());
assertSameIds(msgIds, refs, 0, 99);
@@ -161,7 +161,7 @@
assertSameIds(refIds, refs, 0, 108);
//Verify 109 msgs in storage
- msgIds = getMessageIds();
+ pm.reapUnreferencedMessages(); msgIds = getMessageIds();
assertEquals(109, msgIds.size());
assertSameIds(msgIds, refs, 0, 108);
@@ -194,7 +194,7 @@
assertSameIds(refIds, refs, 0, 109);
//Verify 110 msgs in storage
- msgIds = getMessageIds();
+ pm.reapUnreferencedMessages(); msgIds = getMessageIds();
assertEquals(110, msgIds.size());
assertSameIds(msgIds, refs, 0, 109);
@@ -230,7 +230,7 @@
assertSameIds(refIds, refs, 0, 110);
//Verify 111 msgs in storage
- msgIds = getMessageIds();
+ pm.reapUnreferencedMessages(); msgIds = getMessageIds();
assertEquals(111, msgIds.size());
assertSameIds(msgIds, refs, 0, 110);
@@ -269,7 +269,7 @@
assertSameIds(refIds, refs, 0, 119);
//Verify 120 msgs in storage
- msgIds = getMessageIds();
+ pm.reapUnreferencedMessages(); msgIds = getMessageIds();
assertEquals(120, msgIds.size());
assertSameIds(msgIds, refs, 0, 119);
@@ -309,7 +309,7 @@
assertSameIds(refIds, refs, 0, 129);
//Verify 130 msgs in storage
- msgIds = getMessageIds();
+ pm.reapUnreferencedMessages(); msgIds = getMessageIds();
assertEquals(130, msgIds.size());
assertSameIds(msgIds, refs, 0, 129);
@@ -350,7 +350,7 @@
assertSameIds(refIds, refs, 0, 139);
//Verify 140 msgs in storage
- msgIds = getMessageIds();
+ pm.reapUnreferencedMessages(); msgIds = getMessageIds();
assertEquals(140, msgIds.size());
assertSameIds(msgIds, refs, 0, 139);
@@ -387,7 +387,7 @@
assertSameIds(refIds, refs, 0, 140);
//Verify 141 msgs in storage
- msgIds = getMessageIds();
+ pm.reapUnreferencedMessages(); msgIds = getMessageIds();
assertEquals(141, msgIds.size());
assertSameIds(msgIds, refs, 0, 140);
@@ -423,7 +423,7 @@
assertSameIds(refIds, refs, 1, 140);
//Verify 140 msgs in storage
- msgIds = getMessageIds();
+ pm.reapUnreferencedMessages(); msgIds = getMessageIds();
assertEquals(140, msgIds.size());
assertSameIds(msgIds, refs, 1, 140);
@@ -461,7 +461,7 @@
assertSameIds(refIds, refs, 19, 140);
//Verify 122 msgs in storage
- msgIds = getMessageIds();
+ pm.reapUnreferencedMessages(); msgIds = getMessageIds();
assertEquals(122, msgIds.size());
assertSameIds(msgIds, refs, 19, 140);
@@ -499,7 +499,7 @@
assertSameIds(refIds, refs, 20, 140);
//Verify 121 msgs in storage
- msgIds = getMessageIds();
+ pm.reapUnreferencedMessages(); msgIds = getMessageIds();
assertEquals(121, msgIds.size());
assertSameIds(msgIds, refs, 20, 140);
@@ -535,7 +535,7 @@
assertSameIds(refIds, refs, 40, 140);
//Verify 101 msgs in storage
- msgIds = getMessageIds();
+ pm.reapUnreferencedMessages(); msgIds = getMessageIds();
assertEquals(101, msgIds.size());
assertSameIds(msgIds, refs, 40, 140);
@@ -570,7 +570,7 @@
assertSameIds(refIds, refs, 41, 140);
//Verify 100 msgs in storage
- msgIds = getMessageIds();
+ pm.reapUnreferencedMessages(); msgIds = getMessageIds();
assertEquals(100, msgIds.size());
assertSameIds(msgIds, refs, 41, 140);
@@ -605,7 +605,7 @@
assertSameIds(refIds, refs, 61, 140);
//Verify 80 msgs in storage
- msgIds = getMessageIds();
+ pm.reapUnreferencedMessages(); msgIds = getMessageIds();
assertEquals(80, msgIds.size());
assertSameIds(msgIds, refs, 61, 140);
@@ -639,7 +639,7 @@
assertSameIds(refIds, refs, 121, 140);
//Verify 20 msgs in storage
- msgIds = getMessageIds();
+ pm.reapUnreferencedMessages(); msgIds = getMessageIds();
assertEquals(20, msgIds.size());
assertSameIds(msgIds, refs, 121, 140);
@@ -674,7 +674,7 @@
assertSameIds(refIds, refs, 121, 160);
//Verify 40 msgs in storage
- msgIds = getMessageIds();
+ pm.reapUnreferencedMessages(); msgIds = getMessageIds();
assertEquals(40, msgIds.size());
assertSameIds(msgIds, refs, 121, 160);
@@ -710,7 +710,7 @@
assertSameIds(refIds, refs, 121, 180);
//Verify 60 msgs in storage
- msgIds = getMessageIds();
+ pm.reapUnreferencedMessages(); msgIds = getMessageIds();
assertEquals(60, msgIds.size());
assertSameIds(msgIds, refs, 121, 180);
@@ -748,7 +748,7 @@
assertSameIds(refIds, refs, 121, 240);
//Verify 120 msgs in storage
- msgIds = getMessageIds();
+ pm.reapUnreferencedMessages(); msgIds = getMessageIds();
assertEquals(120, msgIds.size());
assertSameIds(msgIds, refs, 121, 240);
@@ -785,7 +785,7 @@
assertSameIds(refIds, refs, 121, 240);
//Verify 120 msgs in storage
- msgIds = getMessageIds();
+ pm.reapUnreferencedMessages(); msgIds = getMessageIds();
assertEquals(120, msgIds.size());
assertSameIds(msgIds, refs, 121, 240);
@@ -820,7 +820,7 @@
assertSameIds(refIds, refs, 121, 240);
//Verify 120 msgs in storage
- msgIds = getMessageIds();
+ pm.reapUnreferencedMessages(); msgIds = getMessageIds();
assertEquals(120, msgIds.size());
assertSameIds(msgIds, refs, 121, 240);
@@ -856,7 +856,7 @@
assertSameIds(refIds, refs, 121, 240);
//Verify 120 msgs in storage
- msgIds = getMessageIds();
+ pm.reapUnreferencedMessages(); msgIds = getMessageIds();
assertEquals(120, msgIds.size());
assertSameIds(msgIds, refs, 121, 240);
@@ -892,7 +892,7 @@
assertSameIds(refIds, refs, 121, 240);
//Verify 120 msgs in storage
- msgIds = getMessageIds();
+ pm.reapUnreferencedMessages(); msgIds = getMessageIds();
assertEquals(120, msgIds.size());
assertSameIds(msgIds, refs, 121, 240);
@@ -926,7 +926,7 @@
assertSameIds(refIds, refs, 171, 240);
//Verify 70 msgs in storage
- msgIds = getMessageIds();
+ pm.reapUnreferencedMessages(); msgIds = getMessageIds();
assertEquals(70, msgIds.size());
assertSameIds(msgIds, refs, 171, 240);
@@ -956,7 +956,7 @@
assertEquals(0, refIds.size());
//Verify 0 msgs in storage
- msgIds = getMessageIds();
+ pm.reapUnreferencedMessages(); msgIds = getMessageIds();
assertEquals(0, msgIds.size());
//Verify 0 refs in queue
Modified: trunk/tests/src/org/jboss/test/messaging/core/paging/SingleChannel_P_TTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/core/paging/SingleChannel_P_TTest.java 2007-08-21 16:24:12 UTC (rev 3021)
+++ trunk/tests/src/org/jboss/test/messaging/core/paging/SingleChannel_P_TTest.java 2007-08-21 21:07:50 UTC (rev 3022)
@@ -93,7 +93,7 @@
assertSameIds(refIds, refs, 0, 98);
//Verify 99 msgs in storage
- List msgIds = getMessageIds();
+ pm.reapUnreferencedMessages(); List msgIds = getMessageIds();
assertEquals(99, msgIds.size());
assertSameIds(msgIds, refs, 0, 98);
@@ -129,7 +129,7 @@
assertSameIds(refIds, refs, 0, 99);
//Verify 100 msgs in storage
- msgIds = getMessageIds();
+ pm.reapUnreferencedMessages(); msgIds = getMessageIds();
assertEquals(100, msgIds.size());
assertSameIds(msgIds, refs, 0, 99);
@@ -168,7 +168,7 @@
assertSameIds(refIds, refs, 0, 108);
//Verify 109 msgs in storage
- msgIds = getMessageIds();
+ pm.reapUnreferencedMessages(); msgIds = getMessageIds();
assertEquals(109, msgIds.size());
assertSameIds(msgIds, refs, 0, 108);
@@ -207,7 +207,7 @@
assertSameIds(refIds, refs, 0, 109);
//Verify 110 msgs in storage
- msgIds = getMessageIds();
+ pm.reapUnreferencedMessages(); msgIds = getMessageIds();
assertEquals(110, msgIds.size());
assertSameIds(msgIds, refs, 0, 109);
@@ -244,7 +244,7 @@
assertSameIds(refIds, refs, 0, 110);
//Verify 111 msgs in storage
- msgIds = getMessageIds();
+ pm.reapUnreferencedMessages(); msgIds = getMessageIds();
assertEquals(111, msgIds.size());
assertSameIds(msgIds, refs, 0, 110);
@@ -284,7 +284,7 @@
assertSameIds(refIds, refs, 0, 119);
//Verify 120 msgs in storage
- msgIds = getMessageIds();
+ pm.reapUnreferencedMessages(); msgIds = getMessageIds();
assertEquals(120, msgIds.size());
assertSameIds(msgIds, refs, 0, 119);
@@ -325,7 +325,7 @@
assertSameIds(refIds, refs, 0, 129);
//Verify 130 msgs in storage
- msgIds = getMessageIds();
+ pm.reapUnreferencedMessages(); msgIds = getMessageIds();
assertEquals(130, msgIds.size());
assertSameIds(msgIds, refs, 0, 129);
@@ -374,7 +374,7 @@
assertSameIds(refIds, refs, 0, 139);
//Verify 140 msgs in storage
- msgIds = getMessageIds();
+ pm.reapUnreferencedMessages(); msgIds = getMessageIds();
assertEquals(140, msgIds.size());
assertSameIds(msgIds, refs, 0, 139);
@@ -412,7 +412,7 @@
assertSameIds(refIds, refs, 0, 140);
//Verify 141 msgs in storage
- msgIds = getMessageIds();
+ pm.reapUnreferencedMessages(); msgIds = getMessageIds();
assertEquals(141, msgIds.size());
assertSameIds(msgIds, refs, 0, 140);
@@ -447,7 +447,7 @@
assertSameIds(refIds, refs, 1, 140);
//Verify 140 msgs in storage
- msgIds = getMessageIds();
+ pm.reapUnreferencedMessages(); msgIds = getMessageIds();
assertEquals(140, msgIds.size());
assertSameIds(msgIds, refs, 1, 140);
@@ -486,7 +486,7 @@
assertSameIds(refIds, refs, 19, 140);
//Verify 122 msgs in storage
- msgIds = getMessageIds();
+ pm.reapUnreferencedMessages(); msgIds = getMessageIds();
assertEquals(122, msgIds.size());
assertSameIds(msgIds, refs, 19, 140);
@@ -524,7 +524,7 @@
assertSameIds(refIds, refs, 20, 140);
//Verify 121 msgs in storage
- msgIds = getMessageIds();
+ pm.reapUnreferencedMessages(); msgIds = getMessageIds();
assertEquals(121, msgIds.size());
assertSameIds(msgIds, refs, 20, 140);
@@ -560,7 +560,7 @@
assertSameIds(refIds, refs, 40, 140);
//Verify 101 msgs in storage
- msgIds = getMessageIds();
+ pm.reapUnreferencedMessages(); msgIds = getMessageIds();
assertEquals(101, msgIds.size());
assertSameIds(msgIds, refs, 40, 140);
@@ -595,7 +595,7 @@
assertSameIds(refIds, refs, 41, 140);
//Verify 100 msgs in storage
- msgIds = getMessageIds();
+ pm.reapUnreferencedMessages(); msgIds = getMessageIds();
assertEquals(100, msgIds.size());
assertSameIds(msgIds, refs, 41, 140);
@@ -630,7 +630,7 @@
assertSameIds(refIds, refs, 61, 140);
//Verify 80 msgs in storage
- msgIds = getMessageIds();
+ pm.reapUnreferencedMessages(); msgIds = getMessageIds();
assertEquals(80, msgIds.size());
assertSameIds(msgIds, refs, 61, 140);
@@ -665,7 +665,7 @@
assertSameIds(refIds, refs, 121, 140);
//Verify 20 msgs in storage
- msgIds = getMessageIds();
+ pm.reapUnreferencedMessages(); msgIds = getMessageIds();
assertEquals(20, msgIds.size());
assertSameIds(msgIds, refs, 121, 140);
@@ -702,7 +702,7 @@
assertSameIds(refIds, refs, 121, 160);
//Verify 40 msgs in storage
- msgIds = getMessageIds();
+ pm.reapUnreferencedMessages(); msgIds = getMessageIds();
assertEquals(40, msgIds.size());
assertSameIds(msgIds, refs, 121, 160);
@@ -740,7 +740,7 @@
assertSameIds(refIds, refs, 121, 180);
//Verify 60 msgs in storage
- msgIds = getMessageIds();
+ pm.reapUnreferencedMessages(); msgIds = getMessageIds();
assertEquals(60, msgIds.size());
assertSameIds(msgIds, refs, 121, 180);
@@ -780,7 +780,7 @@
assertSameIds(refIds, refs, 121, 240);
//Verify 120 msgs in storage
- msgIds = getMessageIds();
+ pm.reapUnreferencedMessages(); msgIds = getMessageIds();
assertEquals(120, msgIds.size());
assertSameIds(msgIds, refs, 121, 240);
@@ -817,7 +817,7 @@
assertSameIds(refIds, refs, 121, 240);
//Verify 120 msgs in storage
- msgIds = getMessageIds();
+ pm.reapUnreferencedMessages(); msgIds = getMessageIds();
assertEquals(120, msgIds.size());
assertSameIds(msgIds, refs, 121, 240);
@@ -852,7 +852,7 @@
assertSameIds(refIds, refs, 121, 240);
//Verify 120 msgs in storage
- msgIds = getMessageIds();
+ pm.reapUnreferencedMessages(); msgIds = getMessageIds();
assertEquals(120, msgIds.size());
assertSameIds(msgIds, refs, 121, 240);
@@ -888,7 +888,7 @@
assertSameIds(refIds, refs, 121, 240);
//Verify 120 msgs in storage
- msgIds = getMessageIds();
+ pm.reapUnreferencedMessages(); msgIds = getMessageIds();
assertEquals(120, msgIds.size());
assertSameIds(msgIds, refs, 121, 240);
@@ -924,7 +924,7 @@
assertSameIds(refIds, refs, 121, 240);
//Verify 120 msgs in storage
- msgIds = getMessageIds();
+ pm.reapUnreferencedMessages(); msgIds = getMessageIds();
assertEquals(120, msgIds.size());
assertSameIds(msgIds, refs, 121, 240);
@@ -958,7 +958,7 @@
assertSameIds(refIds, refs, 171, 240);
//Verify 70 msgs in storage
- msgIds = getMessageIds();
+ pm.reapUnreferencedMessages(); msgIds = getMessageIds();
assertEquals(70, msgIds.size());
assertSameIds(msgIds, refs, 171, 240);
@@ -988,7 +988,7 @@
assertEquals(0, refIds.size());
//Verify 0 msgs in storage
- msgIds = getMessageIds();
+ pm.reapUnreferencedMessages(); msgIds = getMessageIds();
assertEquals(0, msgIds.size());
//Verify 0 refs in queue
Modified: trunk/tests/src/org/jboss/test/messaging/core/paging/SingleChannel_ReloadTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/core/paging/SingleChannel_ReloadTest.java 2007-08-21 16:24:12 UTC (rev 3021)
+++ trunk/tests/src/org/jboss/test/messaging/core/paging/SingleChannel_ReloadTest.java 2007-08-21 21:07:50 UTC (rev 3022)
@@ -227,6 +227,7 @@
refIds = getReferenceIdsOrderedByPageOrd(queue2.getChannelID());
assertEquals(0, refIds.size());
+ pm.reapUnreferencedMessages();
List msgIds = getMessageIds();
assertEquals(0, msgIds.size());
@@ -292,6 +293,7 @@
refIds = getReferenceIdsOrderedByPageOrd(queue.getChannelID());
assertEquals(0, refIds.size());
+ pm.reapUnreferencedMessages();
List msgIds = getMessageIds();
assertEquals(0, msgIds.size());
Modified: trunk/tests/src/org/jboss/test/messaging/jms/AcknowledgementTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/AcknowledgementTest.java 2007-08-21 16:24:12 UTC (rev 3021)
+++ trunk/tests/src/org/jboss/test/messaging/jms/AcknowledgementTest.java 2007-08-21 21:07:50 UTC (rev 3022)
@@ -65,7 +65,7 @@
// TestCase overrides -------------------------------------------
// Public --------------------------------------------------------
-
+
/* Topics shouldn't hold on to messages if there are no subscribers */
public void testPersistentMessagesForTopicDropped() throws Exception
{
Modified: trunk/tests/src/org/jboss/test/messaging/jms/JMSTestCase.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/JMSTestCase.java 2007-08-21 16:24:12 UTC (rev 3021)
+++ trunk/tests/src/org/jboss/test/messaging/jms/JMSTestCase.java 2007-08-21 21:07:50 UTC (rev 3022)
@@ -110,6 +110,7 @@
if (ServerManagement.isStarted(0))
{
+ ServerManagement.getServer().reapMessages();
if (checkNoMessageData())
{
fail("Message Data exists");
Modified: trunk/tests/src/org/jboss/test/messaging/jms/MessageCleanupTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/MessageCleanupTest.java 2007-08-21 16:24:12 UTC (rev 3021)
+++ trunk/tests/src/org/jboss/test/messaging/jms/MessageCleanupTest.java 2007-08-21 21:07:50 UTC (rev 3022)
@@ -115,6 +115,8 @@
assertEquals(0, getReferenceIds().size());
+ ServerManagement.getServer().reapMessages();
+
assertEquals(0, getMessageIds().size());
conn.close();
@@ -162,6 +164,9 @@
assertEquals(0, getReferenceIds().size());
+ ServerManagement.getServer().reapMessages();
+
+
assertEquals(0, getMessageIds().size());
@@ -213,6 +218,9 @@
assertEquals(0, getReferenceIds().size());
+ ServerManagement.getServer().reapMessages();
+
+
assertEquals(0, getMessageIds().size());
}
@@ -264,6 +272,9 @@
assertEquals(0, getReferenceIds().size());
+ ServerManagement.getServer().reapMessages();
+
+
assertEquals(0, getMessageIds().size());
}
Modified: trunk/tests/src/org/jboss/test/messaging/jms/XARecoveryTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/XARecoveryTest.java 2007-08-21 16:24:12 UTC (rev 3021)
+++ trunk/tests/src/org/jboss/test/messaging/jms/XARecoveryTest.java 2007-08-21 21:07:50 UTC (rev 3022)
@@ -291,12 +291,7 @@
m = cons2.receive(1000);
assertNull(m);
-
- if (checkNoMessageData())
- {
- fail("Data remains in database");
- }
-
+
}
finally
{
@@ -526,13 +521,7 @@
m = cons2.receive(1000);
- assertNull(m);
-
- if (checkNoMessageData())
- {
- fail("Data remains in database");
- }
-
+ assertNull(m);
}
finally
{
@@ -796,12 +785,6 @@
m = cons2.receive(1000);
assertNull(m);
-
- if (checkNoMessageData())
- {
- fail("Data remains in database");
- }
-
}
finally
{
@@ -1070,12 +1053,6 @@
assertNull(m);
cons1.close();
-
- if (checkNoMessageData())
- {
- fail("Data remains in database");
- }
-
}
finally
{
@@ -1296,12 +1273,6 @@
m = cons2.receive(1000);
assertNull(m);
-
- if (checkNoMessageData())
- {
- fail("Data remains in database");
- }
-
}
finally
{
@@ -1437,10 +1408,6 @@
assertEquals(tm1.getText(), rm1.getText());
- if (checkNoMessageData())
- {
- fail("Data remains in database");
- }
}
finally
{
@@ -1603,11 +1570,7 @@
assertNotNull(rm1);
assertEquals(tm1.getText(), rm1.getText());
-
- if (checkNoMessageData())
- {
- fail("Data remains in database");
- }
+
}
finally
{
@@ -1747,10 +1710,6 @@
assertNull(m);
- if (checkNoMessageData())
- {
- fail("Data remains in database");
- }
}
finally
{
@@ -1898,11 +1857,7 @@
Message m = cons1.receive(1000);
assertNull(m);
-
- if (checkNoMessageData())
- {
- fail("Data remains in database");
- }
+
}
finally
{
@@ -2161,10 +2116,6 @@
sess1.unsubscribe("sub2");
- if (checkNoMessageData())
- {
- fail("Data remains in database");
- }
}
finally
{
@@ -2434,11 +2385,7 @@
sess1.unsubscribe("sub1");
sess1.unsubscribe("sub2");
-
- if (checkNoMessageData())
- {
- fail("Data remains in database");
- }
+
}
finally
{
@@ -2651,11 +2598,6 @@
TextMessage m5 = (TextMessage)cons.receive(1000);
assertNull(m5);
-
- if (checkNoMessageData())
- {
- fail("Data remains in database");
- }
}
finally
{
@@ -2812,11 +2754,7 @@
Message nullMessage = cons.receive(MIN_TIMEOUT);
assertTrue(nullMessage == null);
-
- if (checkNoMessageData())
- {
- fail("Data remains in database");
- }
+
}
finally
{
@@ -2965,11 +2903,7 @@
assertNotNull(m2);
assertEquals("testing2", m2.getText());
-
- if (checkNoMessageData())
- {
- fail("Data remains in database");
- }
+
}
finally
{
@@ -3111,11 +3045,6 @@
assertNotNull(m2);
assertEquals("testing2", m2.getText());
-
- if (checkNoMessageData())
- {
- fail("Data remains in database");
- }
}
finally
{
@@ -3268,10 +3197,6 @@
assertNotNull(m3);
assertEquals("testing3", m3.getText());
- if (checkNoMessageData())
- {
- fail("Data remains in database");
- }
}
finally
{
Modified: trunk/tests/src/org/jboss/test/messaging/jms/XATest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/XATest.java 2007-08-21 16:24:12 UTC (rev 3021)
+++ trunk/tests/src/org/jboss/test/messaging/jms/XATest.java 2007-08-21 21:07:50 UTC (rev 3022)
@@ -1130,11 +1130,6 @@
assertEqualByteArrays(trailing.getBranchQualifier(), trailing2.getBranchQualifier());
res.commit(trailing, false);
-
- if (checkNoMessageData())
- {
- fail("Data remains in database");
- }
}
finally
{
Modified: trunk/tests/src/org/jboss/test/messaging/tools/container/LocalTestServer.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/tools/container/LocalTestServer.java 2007-08-21 16:24:12 UTC (rev 3021)
+++ trunk/tests/src/org/jboss/test/messaging/tools/container/LocalTestServer.java 2007-08-21 21:07:50 UTC (rev 3022)
@@ -899,6 +899,11 @@
getServerPeer().resetAllSuckers();
}
+ public void reapMessages() throws Exception
+ {
+ getServerPeer().getPersistenceManagerInstance().reapUnreferencedMessages();
+ }
+
// Public ---------------------------------------------------------------------------------------
// Package protected ----------------------------------------------------------------------------
Modified: trunk/tests/src/org/jboss/test/messaging/tools/container/RMITestServer.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/tools/container/RMITestServer.java 2007-08-21 16:24:12 UTC (rev 3021)
+++ trunk/tests/src/org/jboss/test/messaging/tools/container/RMITestServer.java 2007-08-21 21:07:50 UTC (rev 3022)
@@ -501,6 +501,11 @@
server.resetAllSuckers();
}
+ public void reapMessages() throws Exception
+ {
+ server.reapMessages();
+ }
+
// Public --------------------------------------------------------
// Package protected ---------------------------------------------
Modified: trunk/tests/src/org/jboss/test/messaging/tools/container/Server.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/tools/container/Server.java 2007-08-21 16:24:12 UTC (rev 3021)
+++ trunk/tests/src/org/jboss/test/messaging/tools/container/Server.java 2007-08-21 21:07:50 UTC (rev 3022)
@@ -288,4 +288,6 @@
void flushManagedConnectionPool() throws Exception;
void resetAllSuckers() throws Exception;
+
+ void reapMessages() throws Exception;
}
More information about the jboss-cvs-commits mailing list