[jboss-cvs] JBoss Messaging SVN: r3024 - trunk/src/main/org/jboss/messaging/core/impl.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Wed Aug 22 07:52:23 EDT 2007
Author: timfox
Date: 2007-08-22 07:52:23 -0400 (Wed, 22 Aug 2007)
New Revision: 3024
Modified:
trunk/src/main/org/jboss/messaging/core/impl/JDBCPersistenceManager.java
Log:
Persistence manager interim commit 3
Modified: trunk/src/main/org/jboss/messaging/core/impl/JDBCPersistenceManager.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/impl/JDBCPersistenceManager.java 2007-08-22 10:44:34 UTC (rev 3023)
+++ trunk/src/main/org/jboss/messaging/core/impl/JDBCPersistenceManager.java 2007-08-22 11:52:23 UTC (rev 3024)
@@ -631,6 +631,8 @@
PreparedStatement psInsertReference = null;
PreparedStatement psInsertMessage = null;
+ List persistedMessages = new ArrayList();
+
try
{
Iterator iter = references.iterator();
@@ -647,7 +649,7 @@
//Now store the reference
- //log.info("Paged ref with page order " + ref.getPagingOrder());
+ log.trace("Paged ref with page order " + ref.getPagingOrder());
addReference(channelID, ref, psInsertReference, page);
@@ -675,12 +677,27 @@
if (trace) { log.trace("Inserted " + rows + " rows"); }
m.setPersisted(true);
+
+ persistedMessages.add(m);
}
}
}
return null;
}
+ catch (SQLException e)
+ {
+ //The tx will be rolled back
+ //so we need to set the messages to not persisted
+ for (Iterator iter = persistedMessages.iterator(); iter.hasNext(); )
+ {
+ Message msg = (Message)iter.next();
+
+ msg.setPersisted(false);
+ }
+
+ throw e;
+ }
finally
{
closeStatement(psInsertReference);
@@ -696,76 +713,70 @@
public void removeDepagedReferences(long channelID, List references) throws Exception
{
if (trace) { log.trace(this + " Removing " + references.size() + " refs from channel " + channelID); }
-
- Connection conn = null;
- PreparedStatement psDeleteReference = null;
- TransactionWrapper wrap = new TransactionWrapper();
-
- //We order the references
- // orderReferences(references);
-
- try
+
+ class RemoveDepagedReferencesRunner extends JDBCTxRunner
{
- //We get locks on all the messages - since they are ordered we avoid deadlock
- // getLocks(references);
-
- conn = ds.getConnection();
-
- Iterator iter = references.iterator();
-
- psDeleteReference = conn.prepareStatement(getSQLStatement("DELETE_MESSAGE_REF"));
-
- while (iter.hasNext())
- {
- MessageReference ref = (MessageReference) iter.next();
-
- removeReference(channelID, ref, psDeleteReference);
-
- //log.info("Removed ref with page order " + ref.getPagingOrder());
-
- if (usingBatchUpdates)
- {
- psDeleteReference.addBatch();
- }
- else
- {
- int rows = executeWithRetry(psDeleteReference);
-
- if (trace) { log.trace("Deleted " + rows + " rows"); }
- }
-
- //There is a small possibility that the ref is depaged here, then paged again, before this flag is set
- //and the tx is committed so the message be attempted to be inserted twice but this should be ok
- //since we ignore key violations on message insert
- ref.getMessage().setPersisted(false);
- }
-
- if (usingBatchUpdates)
- {
- int[] rowsReference = executeWithRetryBatch(psDeleteReference);
-
- if (trace) { logBatchUpdate(getSQLStatement("DELETE_MESSAGE_REF"), rowsReference, "deleted"); }
- }
+ long channelID;
+ List references;
+
+ public RemoveDepagedReferencesRunner(long channelID, List references)
+ {
+ this.channelID = channelID;
+ this.references = references;
+ }
+
+ public Object doTransaction() throws Exception
+ {
+ PreparedStatement psDeleteReference = null;
+
+ try
+ {
+ psDeleteReference = conn.prepareStatement(getSQLStatement("DELETE_MESSAGE_REF"));
+
+ Iterator iter = references.iterator();
+
+ while (iter.hasNext())
+ {
+ MessageReference ref = (MessageReference) iter.next();
+
+ removeReference(channelID, ref, psDeleteReference);
+
+ //log.info("Removed ref with page order " + ref.getPagingOrder());
+
+ if (usingBatchUpdates)
+ {
+ psDeleteReference.addBatch();
+ }
+ else
+ {
+ int rows = psDeleteReference.executeUpdate();
+
+ if (trace) { log.trace("Deleted " + rows + " rows"); }
+ }
+
+ //There is a small possibility that the ref is depaged here, then paged again, before this flag is set
+ //and the tx is committed so the message be attempted to be inserted twice but this should be ok
+ //since we ignore key violations on message insert
+ ref.getMessage().setPersisted(false);
+ }
+
+ if (usingBatchUpdates)
+ {
+ int[] rowsReference = executeWithRetryBatch(psDeleteReference);
+
+ if (trace) { logBatchUpdate(getSQLStatement("DELETE_MESSAGE_REF"), rowsReference, "deleted"); }
+ }
+
+ return null;
+ }
+ finally
+ {
+ closeStatement(psDeleteReference);
+ }
+ }
}
- catch (Exception e)
- {
- wrap.exceptionOccurred();
- throw e;
- }
- finally
- {
- closeStatement(psDeleteReference);
- closeConnection(conn);
-// try
-// {
- wrap.end();
-// }
-// finally
-// {
-// //And then release locks
-// this.releaseLocks(references);
-// }
- }
+
+ new RemoveDepagedReferencesRunner(channelID, references).executeWithRetry();
}
// After loading paged refs this is used to update P messages to non paged
More information about the jboss-cvs-commits
mailing list