[jboss-cvs] JBoss Messaging SVN: r3023 - trunk/src/main/org/jboss/messaging/core/impl.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Wed Aug 22 06:44:34 EDT 2007
Author: timfox
Date: 2007-08-22 06:44:34 -0400 (Wed, 22 Aug 2007)
New Revision: 3023
Modified:
trunk/src/main/org/jboss/messaging/core/impl/JDBCPersistenceManager.java
trunk/src/main/org/jboss/messaging/core/impl/PagingChannelSupport.java
Log:
Persistence Manager interim commit 2
Modified: trunk/src/main/org/jboss/messaging/core/impl/JDBCPersistenceManager.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/impl/JDBCPersistenceManager.java 2007-08-21 21:07:50 UTC (rev 3022)
+++ trunk/src/main/org/jboss/messaging/core/impl/JDBCPersistenceManager.java 2007-08-22 10:44:34 UTC (rev 3023)
@@ -31,10 +31,9 @@
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
+import java.sql.Statement;
import java.sql.Types;
import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
@@ -61,7 +60,6 @@
import org.jboss.messaging.core.impl.tx.Transaction;
import org.jboss.messaging.core.impl.tx.TxCallback;
import org.jboss.messaging.util.JDBCUtil;
-import org.jboss.messaging.util.LockMap;
import org.jboss.messaging.util.StreamUtils;
import org.jboss.messaging.util.Util;
@@ -311,7 +309,82 @@
}
}
+
+ abstract class JDBCTxRunner // Template for one unit of JDBC work run inside a TransactionWrapper; subclasses supply doTransaction()
+ {
+ private static final int MAX_TRIES = 25; // Number of failed attempts after which executeWithRetry() gives up
+ // Obtained from ds by execute() and closed in its finally block; subclasses use it inside doTransaction()
+ Connection conn;
+ // Replaced with a new TransactionWrapper at the start of every execute() call
+ TransactionWrapper wrap;
+ // Single attempt: runs doTransaction(), reports any exception to the wrapper and rethrows it unchanged
+ public Object execute() throws Exception
+ {
+ wrap = new TransactionWrapper();
+
+ try
+ {
+ conn = ds.getConnection();
+
+ return doTransaction();
+ }
+ catch (Exception e)
+ {
+ wrap.exceptionOccurred();
+ throw e;
+ }
+ finally
+ {
+ closeConnection(conn); // NOTE(review): if ds.getConnection() threw, conn still holds its previous value (null on the first attempt) - confirm closeConnection tolerates that
+ wrap.end();
+ }
+ }
+ // Calls execute() in a loop until it returns; gives up with IllegalStateException after MAX_TRIES SQLExceptions
+ public Object executeWithRetry() throws Exception
+ {
+ int tries = 0;
+
+ while (true)
+ {
+ try
+ {
+ Object res = execute();
+
+ if (tries > 0)
+ {
+ log.warn("Update worked after retry");
+ }
+ return res;
+ }
+ catch (SQLException e) // Only SQLException is retried, whatever its SQLState; any other exception propagates to the caller
+ {
+ log.warn("SQLException caught - assuming deadlock detected, try:" + (tries + 1), e);
+
+ log.info("SQLState:" + e.getSQLState() + " code:" + e.getErrorCode());
+
+ tries++;
+ if (tries == MAX_TRIES)
+ {
+ log.error("Retried " + tries + " times, now giving up");
+ throw new IllegalStateException("Failed to excecute transaction"); // NOTE(review): the caught SQLException is not passed on as the cause
+ }
+ log.warn("Trying again after a pause");
+ //Now we wait for a random amount of time to minimise risk of deadlock
+ Thread.sleep((long)(Math.random() * 500)); // Pause of less than 500 ms
+ }
+ }
+ }
+ // The unit of work; called by execute() once conn has been obtained
+ public abstract Object doTransaction() throws Exception;
+ }
+
+
+
+
+
+
+
// Related to counters
// ===================
@@ -324,83 +397,87 @@
throw new IllegalArgumentException("block size must be > 0");
}
- Connection conn = null;
- PreparedStatement ps = null;
- ResultSet rs = null;
- TransactionWrapper wrap = new TransactionWrapper();
-
- try
+ class ReserveIDBlockRunner extends JDBCTxRunner // Reads the named counter and advances it by size, or inserts it when no row exists
{
- conn = ds.getConnection();
-
- //For the clustered case - this MUST use SELECT .. FOR UPDATE or a similar
- //construct the locks the row
- String selectCounterSQL = getSQLStatement("SELECT_COUNTER");
-
- ps = conn.prepareStatement(selectCounterSQL);
-
- ps.setString(1, counterName);
-
- rs = ps.executeQuery();
-
- if (trace) { log.trace(JDBCUtil.statementToString(selectCounterSQL, counterName)); }
-
- if (!rs.next())
- {
- rs.close();
- rs = null;
+ String counterName;
+ // Amount added to the stored counter value on each run
+ int size;
+
+ public ReserveIDBlockRunner(String counterName, int size)
+ {
+ this.counterName = counterName;
+ this.size = size;
+ }
+ // Returns 0 when the counter row had to be inserted, otherwise the counter value read before the update
+ public Object doTransaction() throws Exception
+ {
+ // For the clustered case - this MUST use SELECT .. FOR UPDATE or a similar
+ // construct that locks the row
+ String selectCounterSQL = getSQLStatement("SELECT_COUNTER");
- ps.close();
+ PreparedStatement ps = null;
+ ResultSet rs = null;
- //There is a very small possibility that two threads will attempt to insert the same counter
- //at the same time, if so, then the second one will fail eventually after a few retries by throwing
- //a primary key violation.
-
- String insertCounterSQL = getSQLStatement("INSERT_COUNTER");
-
- ps = conn.prepareStatement(insertCounterSQL);
-
- ps.setString(1, counterName);
- ps.setLong(2, size);
-
- int rows = executeWithRetry(ps);
- if (trace) { log.trace(JDBCUtil.statementToString(insertCounterSQL, counterName, new Integer(size)) + " inserted " + rows + " rows"); }
-
- ps.close();
- ps = null;
- return 0;
- }
-
- long nextId = rs.getLong(1);
-
- rs.close();
- rs = null;
-
- ps.close();
+ try
+ {
+ ps = conn.prepareStatement(selectCounterSQL);
+
+ ps.setString(1, counterName);
+
+ rs = ps.executeQuery();
+
+ if (trace) { log.trace(JDBCUtil.statementToString(selectCounterSQL, counterName)); }
+
+ if (!rs.next())
+ {
+ rs.close();
+ rs = null;
+
+ ps.close();
+
+ //There is a very small possibility that two threads will attempt to insert the same counter
+ //at the same time, if so, then the second one will fail eventually after a few retries by throwing
+ //a primary key violation.
+ // NOTE(review): under executeWithRetry() the whole doTransaction() is re-run after an SQLException, so a retry selects the counter again first - confirm the comment above still holds
+ String insertCounterSQL = getSQLStatement("INSERT_COUNTER");
+
+ ps = conn.prepareStatement(insertCounterSQL);
+
+ ps.setString(1, counterName);
+ ps.setLong(2, size);
+
+ int rows = ps.executeUpdate();
+ if (trace) { log.trace(JDBCUtil.statementToString(insertCounterSQL, counterName, new Integer(size)) + " inserted " + rows + " rows"); }
+ // Row was created with size as its second parameter (presumably the next free id), so this block starts at 0
+ return 0L;
+ }
+
+ long nextId = rs.getLong(1);
+ // rs is not closed explicitly on this path; the finally block below closes it
+ ps.close();
+
+ String updateCounterSQL = getSQLStatement("UPDATE_COUNTER");
+
+ ps = conn.prepareStatement(updateCounterSQL);
+
+ ps.setLong(1, nextId + size);
+ ps.setString(2, counterName);
- String updateCounterSQL = getSQLStatement("UPDATE_COUNTER");
+ int rows = ps.executeUpdate();
- ps = conn.prepareStatement(updateCounterSQL);
-
- ps.setLong(1, nextId + size);
- ps.setString(2, counterName);
-
- int rows = executeWithRetry(ps);
- if (trace) { log.trace(JDBCUtil.statementToString(updateCounterSQL, new Long(nextId + size), counterName) + " updated " + rows + " rows"); }
-
- return nextId;
+ if (trace) { log.trace(JDBCUtil.statementToString(updateCounterSQL, new Long(nextId + size), counterName) + " updated " + rows + " rows"); }
+ // The value read before the update is handed back; the stored counter now holds nextId + size
+ return nextId;
+ }
+ finally
+ {
+ closeResultSet(rs);
+ closeStatement(ps);
+ }
+ }
}
- catch (Exception e)
- {
- wrap.exceptionOccurred();
- throw e;
- }
- finally
- {
- closeStatement(ps);
- closeConnection(conn);
- wrap.end();
- }
+
+ return (Long)new ReserveIDBlockRunner(counterName, size).executeWithRetry();
}
/*
@@ -535,143 +612,84 @@
//Used to page NP messages or P messages in a non recoverable queue
public void pageReferences(long channelID, List references, boolean page) throws Exception
- {
- Connection conn = null;
- PreparedStatement psInsertReference = null;
- PreparedStatement psInsertMessage = null;
- // PreparedStatement psMessageExists = null;
- TransactionWrapper wrap = new TransactionWrapper();
-
- //First we order the references in message order
- //orderReferences(references);
-
- try
+ {
+ class PageReferencesRunner extends JDBCTxRunner // Inserts a reference row, and the message row when not yet persisted, for each entry of references
{
-// //Now we get a lock on all the messages. Since we have ordered the refs we should avoid deadlock
-// getLocks(references);
-//
- conn = ds.getConnection();
-
- Iterator iter = references.iterator();
-
- psInsertReference = conn.prepareStatement(getSQLStatement("INSERT_MESSAGE_REF"));
- psInsertMessage = conn.prepareStatement(getSQLStatement("INSERT_MESSAGE"));
-
- // boolean insertsInBatch = false;
-
- while (iter.hasNext())
- {
- //We may need to persist the message itself
- MessageReference ref = (MessageReference) iter.next();
-
- //For non reliable refs we insert the ref (and maybe the message) itself
-
- //Now store the reference
- addReference(channelID, ref, psInsertReference, page);
-
-// if (usingBatchUpdates)
-// {
-// psInsertReference.addBatch();
-// }
-// else
-// {
- int rows = executeWithRetry(psInsertReference);
-
- if (trace)
- {
- log.trace("Inserted " + rows + " rows");
- }
- //}
-
- //Maybe we need to persist the message itself
- Message m = ref.getMessage();
-
- synchronized (m)
- {
-
- if (!m.isPersisted())
- {
- //The message might actually already exist due to it already being paged
- //so we insert and ignore key violations
-
-// if (psMessageExists == null)
-// {
-// psMessageExists = conn.prepareStatement(getSQLStatement("MESSAGE_EXISTS"));
-// }
-//
-// psMessageExists.setLong(1, m.getMessageID());
-//
-// ResultSet rs = null;
-//
-// try
-// {
-// rs = psMessageExists.executeQuery();
-//
-// if (!rs.next())
-// {
- storeMessage(m, psInsertMessage);
-
-// if (usingBatchUpdates)
-// {
-// psInsertMessage.addBatch();
-//
-// insertsInBatch = true;
-// }
-// else
-// {
- rows = executeWithRetryIgnoreKeyViolation(psInsertMessage);
-
- if (trace) { log.trace("Inserted " + rows + " rows"); }
- // }
- m.setPersisted(true);
- }
-// }
-// finally
-// {
-// if (rs != null)
-// {
-// rs.close();
-// }
-// }
- // }
- }
- }
-
-// if (usingBatchUpdates)
-// {
-// int[] rowsReference = executeWithRetryBatch(psInsertReference);
-//
-// if (trace) { logBatchUpdate(getSQLStatement("INSERT_MESSAGE_REF"), rowsReference, "inserted"); }
-//
-// if (insertsInBatch)
-// {
-// int[] rowsMessage = executeWithRetryBatch(psInsertMessage);
-//
-// if (trace) { logBatchUpdate(getSQLStatement("INSERT_MESSAGE"), rowsMessage, "inserted"); }
-// }
-// }
+ long channelID;
+ List references;
+ boolean page;
+
+ public PageReferencesRunner(long channelID, List references, boolean page)
+ {
+ this.channelID = channelID;
+ this.references = references;
+ this.page = page;
+ }
+ // Always returns null; the work is the inserts issued through conn, and both statements are closed in the finally block
+ public Object doTransaction() throws Exception
+ {
+ PreparedStatement psInsertReference = null;
+ PreparedStatement psInsertMessage = null;
+
+ try
+ {
+ Iterator iter = references.iterator();
+
+ psInsertReference = conn.prepareStatement(getSQLStatement("INSERT_MESSAGE_REF"));
+ psInsertMessage = conn.prepareStatement(getSQLStatement("INSERT_MESSAGE"));
+
+ while (iter.hasNext())
+ {
+ //We may need to persist the message itself
+ MessageReference ref = (MessageReference) iter.next();
+
+ //For non reliable refs we insert the ref (and maybe the message) itself
+
+ //Now store the reference
+
+ //log.info("Paged ref with page order " + ref.getPagingOrder());
+
+ addReference(channelID, ref, psInsertReference, page);
+
+ int rows = psInsertReference.executeUpdate();
+
+ if (trace)
+ {
+ log.trace("Inserted " + rows + " rows");
+ }
+
+ //Maybe we need to persist the message itself
+ Message m = ref.getMessage();
+
+ synchronized (m)
+ {
+ if (!m.isPersisted())
+ {
+ //The message might actually already exist due to it already being paged
+ //so we insert and ignore key violations
+ // NOTE(review): the insert below is a plain executeUpdate(); nothing in this block ignores a key violation - confirm
+ storeMessage(m, psInsertMessage);
+
+ rows = psInsertMessage.executeUpdate();
+
+ if (trace) { log.trace("Inserted " + rows + " rows"); }
+ // NOTE(review): the flag is set before the wrapper ends; if a later statement fails and this is re-run, this message is skipped - confirm rollback behaviour
+ m.setPersisted(true);
+ }
+ }
+ }
+
+ return null;
+ }
+ finally
+ {
+ closeStatement(psInsertReference);
+ closeStatement(psInsertMessage);
+ }
+ }
}
- catch (Exception e)
- {
- wrap.exceptionOccurred();
- throw e;
- }
- finally
- {
- closeStatement(psInsertReference);
- closeStatement(psInsertMessage);
- // closeStatement(psMessageExists);
- closeConnection(conn);
- // try
- // {
- wrap.end();
-// }
-// finally
-// {
-// //And then release locks
-// this.releaseLocks(references);
-// }
- }
+ // The whole insert loop is run via executeWithRetry(), so it is re-run from the start whenever an SQLException is thrown
+ new PageReferencesRunner(channelID, references, page).executeWithRetry();
}
//After loading paged refs this is used to remove any NP or P messages in an unrecoverable channel
@@ -703,6 +721,8 @@
removeReference(channelID, ref, psDeleteReference);
+ //log.info("Removed ref with page order " + ref.getPagingOrder());
+
if (usingBatchUpdates)
{
psDeleteReference.addBatch();
@@ -2890,19 +2910,19 @@
}
}
- static class MessageOrderComparator implements Comparator
- {
- static MessageOrderComparator instance = new MessageOrderComparator();
-
- public int compare(Object o1, Object o2)
- {
- MessageReference ref1 = (MessageReference)o1;
- MessageReference ref2 = (MessageReference)o2;
-
- long id1 = ref1.getMessage().getMessageID();
- long id2 = ref2.getMessage().getMessageID();
-
- return (id1 < id2 ? -1 : (id1 == id2 ? 0 : 1));
- }
- }
+// static class MessageOrderComparator implements Comparator
+// {
+// static MessageOrderComparator instance = new MessageOrderComparator();
+//
+// public int compare(Object o1, Object o2)
+// {
+// MessageReference ref1 = (MessageReference)o1;
+// MessageReference ref2 = (MessageReference)o2;
+//
+// long id1 = ref1.getMessage().getMessageID();
+// long id2 = ref2.getMessage().getMessageID();
+//
+// return (id1 < id2 ? -1 : (id1 == id2 ? 0 : 1));
+// }
+// }
}
Modified: trunk/src/main/org/jboss/messaging/core/impl/PagingChannelSupport.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/impl/PagingChannelSupport.java 2007-08-21 21:07:50 UTC (rev 3022)
+++ trunk/src/main/org/jboss/messaging/core/impl/PagingChannelSupport.java 2007-08-22 10:44:34 UTC (rev 3023)
@@ -441,6 +441,8 @@
Iterator iter = downCache.iterator();
+ log.info("Flushing down cache");
+
while (iter.hasNext())
{
MessageReference ref = (MessageReference) iter.next();
More information about the jboss-cvs-commits
mailing list