[jboss-cvs] JBoss Messaging SVN: r3025 - trunk/src/main/org/jboss/messaging/core/impl.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Wed Aug 22 09:14:56 EDT 2007
Author: timfox
Date: 2007-08-22 09:14:56 -0400 (Wed, 22 Aug 2007)
New Revision: 3025
Modified:
trunk/src/main/org/jboss/messaging/core/impl/JDBCPersistenceManager.java
Log:
PersistenceManager interim commit 4
Modified: trunk/src/main/org/jboss/messaging/core/impl/JDBCPersistenceManager.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/impl/JDBCPersistenceManager.java 2007-08-22 11:52:23 UTC (rev 3024)
+++ trunk/src/main/org/jboss/messaging/core/impl/JDBCPersistenceManager.java 2007-08-22 13:14:56 UTC (rev 3025)
@@ -87,8 +87,6 @@
private boolean trace = log.isTraceEnabled();
- private boolean usingBatchUpdates = false;
-
private boolean usingBinaryStream = true;
private boolean usingTrailingByte = false;
@@ -117,7 +115,7 @@
{
super(ds, tm, sqlProperties, createTablesOnStartup);
- this.usingBatchUpdates = usingBatchUpdates;
+ //usingBatchUpdates is currently ignored due to sketchy support from databases
this.usingBinaryStream = usingBinaryStream;
@@ -263,7 +261,7 @@
try
{
- List transactions = new ArrayList();
+ List<PreparedTxInfo> transactions = new ArrayList<PreparedTxInfo> ();
conn = ds.getConnection();
@@ -309,86 +307,10 @@
}
}
-
- abstract class JDBCTxRunner
- {
- private static final int MAX_TRIES = 25;
-
- Connection conn;
-
- TransactionWrapper wrap;
-
- public Object execute() throws Exception
- {
- wrap = new TransactionWrapper();
-
- try
- {
- conn = ds.getConnection();
-
- return doTransaction();
- }
- catch (Exception e)
- {
- wrap.exceptionOccurred();
- throw e;
- }
- finally
- {
- closeConnection(conn);
- wrap.end();
- }
- }
-
- public Object executeWithRetry() throws Exception
- {
- int tries = 0;
-
- while (true)
- {
- try
- {
- Object res = execute();
-
- if (tries > 0)
- {
- log.warn("Update worked after retry");
- }
- return res;
- }
- catch (SQLException e)
- {
- log.warn("SQLException caught - assuming deadlock detected, try:" + (tries + 1), e);
-
- log.info("SQLState:" + e.getSQLState() + " code:" + e.getErrorCode());
-
- tries++;
- if (tries == MAX_TRIES)
- {
- log.error("Retried " + tries + " times, now giving up");
- throw new IllegalStateException("Failed to excecute transaction");
- }
- log.warn("Trying again after a pause");
- //Now we wait for a random amount of time to minimise risk of deadlock
- Thread.sleep((long)(Math.random() * 500));
- }
- }
- }
-
- public abstract Object doTransaction() throws Exception;
- }
-
-
-
-
-
-
-
-
// Related to counters
// ===================
- public long reserveIDBlock(String counterName, int size) throws Exception
+ public long reserveIDBlock(final String counterName, final int size) throws Exception
{
if (trace) { log.trace("Getting ID block for counter " + counterName + ", size " + size); }
@@ -399,16 +321,6 @@
class ReserveIDBlockRunner extends JDBCTxRunner
{
- String counterName;
-
- int size;
-
- public ReserveIDBlockRunner(String counterName, int size)
- {
- this.counterName = counterName;
- this.size = size;
- }
-
public Object doTransaction() throws Exception
{
// For the clustered case - this MUST use SELECT .. FOR UPDATE or a similar
@@ -477,7 +389,7 @@
}
}
- return (Long)new ReserveIDBlockRunner(counterName, size).executeWithRetry();
+ return (Long)new ReserveIDBlockRunner().executeWithRetry();
}
/*
@@ -508,7 +420,7 @@
int count = 0;
- List msgs = new ArrayList();
+ List<Message> msgs = new ArrayList<Message>();
while (iter.hasNext())
{
@@ -611,27 +523,16 @@
//Used to page NP messages or P messages in a non recoverable queue
- public void pageReferences(long channelID, List references, boolean page) throws Exception
+ public void pageReferences(final long channelID, final List references, final boolean page) throws Exception
{
class PageReferencesRunner extends JDBCTxRunner
{
- long channelID;
- List references;
- boolean page;
-
- public PageReferencesRunner(long channelID, List references, boolean page)
- {
- this.channelID = channelID;
- this.references = references;
- this.page = page;
- }
-
public Object doTransaction() throws Exception
{
PreparedStatement psInsertReference = null;
PreparedStatement psInsertMessage = null;
- List persistedMessages = new ArrayList();
+ List<Message> persistedMessages = new ArrayList<Message>();
try
{
@@ -685,7 +586,7 @@
return null;
}
- catch (SQLException e)
+ catch (Exception e)
{
//The tx will be rolled back
//so we need to set the messages to not persisted
@@ -706,25 +607,16 @@
}
}
- new PageReferencesRunner(channelID, references, page).executeWithRetry();
+ new PageReferencesRunner().executeWithRetry();
}
//After loading paged refs this is used to remove any NP or P messages in a unrecoverable channel
- public void removeDepagedReferences(long channelID, List references) throws Exception
+ public void removeDepagedReferences(final long channelID, final List references) throws Exception
{
if (trace) { log.trace(this + " Removing " + references.size() + " refs from channel " + channelID); }
class RemoveDepagedReferencesRunner extends JDBCTxRunner
{
- long channelID;
- List references;
-
- public RemoveDepagedReferencesRunner(long channelID, List references)
- {
- this.channelID = channelID;
- this.references = references;
- }
-
public Object doTransaction() throws Exception
{
PreparedStatement psDeleteReference = null;
@@ -743,16 +635,9 @@
//log.info("Removed ref with page order " + ref.getPagingOrder());
- if (usingBatchUpdates)
- {
- psDeleteReference.addBatch();
- }
- else
- {
- int rows = psDeleteReference.executeUpdate();
+ int rows = psDeleteReference.executeUpdate();
- if (trace) { log.trace("Deleted " + rows + " rows"); }
- }
+ if (trace) { log.trace("Deleted " + rows + " rows"); }
//There is a small possibility that the ref is depaged here, then paged again, before this flag is set
//and the tx is committed so the message be attempted to be inserted twice but this should be ok
@@ -760,13 +645,6 @@
ref.getMessage().setPersisted(false);
}
- if (usingBatchUpdates)
- {
- int[] rowsReference = executeWithRetryBatch(psDeleteReference);
-
- if (trace) { logBatchUpdate(getSQLStatement("DELETE_MESSAGE_REF"), rowsReference, "deleted"); }
- }
-
return null;
}
finally
@@ -776,92 +654,91 @@
}
}
- new RemoveDepagedReferencesRunner(channelID, references).executeWithRetry();
+ new RemoveDepagedReferencesRunner().executeWithRetry();
}
// After loading paged refs this is used to update P messages to non paged
- public void updateReferencesNotPagedInRange(long channelID, long orderStart, long orderEnd, long num) throws Exception
+ public void updateReferencesNotPagedInRange(final long channelID, final long orderStart, final long orderEnd, final long num) throws Exception
{
if (trace) { log.trace("Updating paged references for channel " + channelID + " between " + orderStart + " and " + orderEnd); }
- Connection conn = null;
- PreparedStatement ps = null;
- TransactionWrapper wrap = new TransactionWrapper();
-
- try
+ class UpdateReferencesNotPagedInRangeRunner extends JDBCTxRunner
{
- conn = ds.getConnection();
-
- ps = conn.prepareStatement(getSQLStatement("UPDATE_REFS_NOT_PAGED"));
-
- ps.setLong(1, orderStart);
-
- ps.setLong(2, orderEnd);
-
- ps.setLong(3, channelID);
-
- int rows = executeWithRetry(ps);
-
- if (trace) { log.trace(JDBCUtil.statementToString(getSQLStatement("UPDATE_REFS_NOT_PAGED"), new Long(channelID),
- new Long(orderStart), new Long(orderEnd)) + " updated " + rows + " rows"); }
-
- //Sanity check
- if (rows != num)
- {
- throw new IllegalStateException("Did not update correct number of rows");
- }
+ public Object doTransaction() throws Exception
+ {
+ PreparedStatement ps = null;
+
+ try
+ {
+ ps = conn.prepareStatement(getSQLStatement("UPDATE_REFS_NOT_PAGED"));
+
+ ps.setLong(1, orderStart);
+
+ ps.setLong(2, orderEnd);
+
+ ps.setLong(3, channelID);
+
+ int rows = ps.executeUpdate();
+
+ if (trace) { log.trace(JDBCUtil.statementToString(getSQLStatement("UPDATE_REFS_NOT_PAGED"), new Long(channelID),
+ new Long(orderStart), new Long(orderEnd)) + " updated " + rows + " rows"); }
+
+ //Sanity check
+ if (rows != num)
+ {
+ throw new IllegalStateException("Did not update correct number of rows");
+ }
+
+ return null;
+ }
+ finally
+ {
+ closeStatement(ps);
+ }
+ }
}
- catch (Exception e)
- {
- wrap.exceptionOccurred();
- throw e;
- }
- finally
- {
- closeStatement(ps);
- closeConnection(conn);
- wrap.end();
- }
+
+ new UpdateReferencesNotPagedInRangeRunner().executeWithRetry();
}
- public void mergeTransactions(long fromChannelID, long toChannelID) throws Exception
+ public void mergeTransactions(final long fromChannelID, final long toChannelID) throws Exception
{
if (trace) { log.trace("Merging transactions from channel " + fromChannelID + " to " + toChannelID); }
-
+
// Sanity check
if (fromChannelID == toChannelID)
{
throw new IllegalArgumentException("Cannot merge transactions - they have the same channel id!!");
}
-
- Connection conn = null;
- PreparedStatement statement = null;
- TransactionWrapper wrap = new TransactionWrapper();
- try
+
+ class MergeTransactionsRunner extends JDBCTxRunner
{
- conn = ds.getConnection();
- statement = conn.prepareStatement(getSQLStatement("UPDATE_TX"));
- statement.setLong(1, toChannelID);
- statement.setLong(2, fromChannelID);
- int affected = statement.executeUpdate();
-
- log.debug("Merged " + affected + " transactions from channel " + fromChannelID + " into node " + toChannelID);
+ public Object doTransaction() throws Exception
+ {
+ PreparedStatement statement = null;
+ try
+ {
+ statement = conn.prepareStatement(getSQLStatement("UPDATE_TX"));
+ statement.setLong(1, toChannelID);
+ statement.setLong(2, fromChannelID);
+ int affected = statement.executeUpdate();
+
+ log.debug("Merged " + affected + " transactions from channel " + fromChannelID + " into node " + toChannelID);
+
+ return null;
+ }
+ finally
+ {
+ closeStatement(statement);
+ }
+ }
}
- catch (Exception e)
- {
- wrap.exceptionOccurred();
- throw e;
- }
- finally
- {
- closeConnection(conn);
- closeStatement(statement);
- wrap.end();
- }
+
+ new MergeTransactionsRunner().executeWithRetry();
}
- public InitialLoadInfo mergeAndLoad(long fromChannelID, long toChannelID, int numberToLoad, long firstPagingOrder, long nextPagingOrder) throws Exception
+ public InitialLoadInfo mergeAndLoad(final long fromChannelID, final long toChannelID, final int numberToLoad, final long firstPagingOrder, final long nextPagingOrder) throws Exception
{
if (trace) { log.trace("Merging channel from " + fromChannelID + " to " + toChannelID + " numberToLoad:" + numberToLoad + " firstPagingOrder:" + firstPagingOrder + " nextPagingOrder:" + nextPagingOrder); }
@@ -871,228 +748,211 @@
{
throw new IllegalArgumentException("Cannot merge queues - they have the same channel id!!");
}
-
- Connection conn = null;
- PreparedStatement ps = null;
- ResultSet rs = null;
- TransactionWrapper wrap = new TransactionWrapper();
- PreparedStatement ps2 = null;
-
- try
- {
- conn = ds.getConnection();
-
- /*
- * If channel is paging and has full size f
- *
- * then we don't need to load any refs but we need to:
- *
- * make sure the page ord is correct across the old paged and new refs
- *
- * we know the max page ord (from the channel) for the old refs so we just need to:
- *
- * 1) Iterate through the failed channel and update page_ord = max + 1, max + 2 etc
- *
- * 2) update channel id
- *
- *
- * If channel is not paging and the total refs before and after <=f
- *
- * 1) Load all refs from failed channel
- *
- * 2) Update channel id
- *
- * return those refs
- *
- *
- * If channel is not paging but total new refs > f
- *
- * 1) Iterate through failed channel refs and take the first x to make the channel full
- *
- * 2) Update the others with page_ord starting at zero
- *
- * 3) Update channel id
- *
- * In general:
- *
- * We have number to load n, max page size p
- *
- * 1) Iterate through failed channel refs in page_ord order
- *
- * 2) Put the first n in a List.
- *
- * 3) Initialise page_ord_count to be p or 0 depending on whether it was specified
- *
- * 4) Update the page_ord of the remaining refs accordiningly
- *
- * 5) Update the channel id
- *
- */
-
- //First load the refs from the failed channel
- List refs = new ArrayList();
-
- ps = conn.prepareStatement(getSQLStatement("LOAD_REFS"));
-
- ps.setLong(1, fromChannelID);
-
- rs = ps.executeQuery();
-
- int count = 0;
-
- boolean arePaged = false;
-
- long pageOrd = nextPagingOrder;
-
- while (rs.next())
- {
- long msgId = rs.getLong(1);
- int deliveryCount = rs.getInt(2);
- long sched = rs.getLong(3);
-
- if (count < numberToLoad)
- {
- ReferenceInfo ri = new ReferenceInfo(msgId, deliveryCount, sched);
-
- refs.add(ri);
- }
-
- // Set page ord
-
- if (ps2 == null)
- {
- ps2 = conn.prepareStatement(getSQLStatement("UPDATE_PAGE_ORDER"));
- }
-
- if (count < numberToLoad)
- {
- ps2.setNull(1, Types.BIGINT);
-
- if (trace) { log.trace("Set page ord to null"); }
- }
- else
- {
- ps2.setLong(1, pageOrd);
-
- if (trace) { log.trace("Set page ord to " + pageOrd); }
-
- arePaged = true;
-
- pageOrd++;
- }
-
- ps2.setLong(2, msgId);
-
- ps2.setLong(3, fromChannelID);
-
- int rows = executeWithRetry(ps2);
-
- if (trace) { log.trace("Update page ord updated " + rows + " rows"); }
-
- count++;
- }
-
- ps.close();
-
- // Now swap the channel id
-
- ps = conn.prepareStatement(getSQLStatement("UPDATE_CHANNEL_ID"));
-
- ps.setLong(1, toChannelID);
-
- ps.setLong(2, fromChannelID);
-
- int rows = executeWithRetry(ps);
-
- if (trace) { log.trace("Update channel id updated " + rows + " rows"); }
-
- if (arePaged)
- {
- return new InitialLoadInfo(new Long(firstPagingOrder), new Long(pageOrd - 1), refs);
- }
- else
- {
- return new InitialLoadInfo(null, null, refs);
- }
- }
- catch (Exception e)
+ class MergeAndLoadRunner extends JDBCTxRunner
{
- wrap.exceptionOccurred();
- throw e;
+ public Object doTransaction() throws Exception
+ {
+ PreparedStatement ps = null;
+ ResultSet rs = null;
+ PreparedStatement ps2 = null;
+
+ try
+ {
+ /*
+ * If channel is paging and has full size f
+ *
+ * then we don't need to load any refs but we need to:
+ *
+ * make sure the page ord is correct across the old paged and new refs
+ *
+ * we know the max page ord (from the channel) for the old refs so we just need to:
+ *
+ * 1) Iterate through the failed channel and update page_ord = max + 1, max + 2 etc
+ *
+ * 2) update channel id
+ *
+ *
+ * If channel is not paging and the total refs before and after <=f
+ *
+ * 1) Load all refs from failed channel
+ *
+ * 2) Update channel id
+ *
+ * return those refs
+ *
+ *
+ * If channel is not paging but total new refs > f
+ *
+ * 1) Iterate through failed channel refs and take the first x to make the channel full
+ *
+ * 2) Update the others with page_ord starting at zero
+ *
+ * 3) Update channel id
+ *
+ * In general:
+ *
+ * We have number to load n, max page size p
+ *
+ * 1) Iterate through failed channel refs in page_ord order
+ *
+ * 2) Put the first n in a List.
+ *
+ * 3) Initialise page_ord_count to be p or 0 depending on whether it was specified
+ *
+ * 4) Update the page_ord of the remaining refs accordiningly
+ *
+ * 5) Update the channel id
+ *
+ */
+
+ //First load the refs from the failed channel
+
+ List<ReferenceInfo> refs = new ArrayList<ReferenceInfo>();
+
+ ps = conn.prepareStatement(getSQLStatement("LOAD_REFS"));
+
+ ps.setLong(1, fromChannelID);
+
+ rs = ps.executeQuery();
+
+ int count = 0;
+
+ boolean arePaged = false;
+
+ long pageOrd = nextPagingOrder;
+
+ while (rs.next())
+ {
+ long msgId = rs.getLong(1);
+ int deliveryCount = rs.getInt(2);
+ long sched = rs.getLong(3);
+
+ if (count < numberToLoad)
+ {
+ ReferenceInfo ri = new ReferenceInfo(msgId, deliveryCount, sched);
+
+ refs.add(ri);
+ }
+
+ // Set page ord
+
+ if (ps2 == null)
+ {
+ ps2 = conn.prepareStatement(getSQLStatement("UPDATE_PAGE_ORDER"));
+ }
+
+ if (count < numberToLoad)
+ {
+ ps2.setNull(1, Types.BIGINT);
+
+ if (trace) { log.trace("Set page ord to null"); }
+ }
+ else
+ {
+ ps2.setLong(1, pageOrd);
+
+ if (trace) { log.trace("Set page ord to " + pageOrd); }
+
+ arePaged = true;
+
+ pageOrd++;
+ }
+
+ ps2.setLong(2, msgId);
+
+ ps2.setLong(3, fromChannelID);
+
+ int rows = ps2.executeUpdate();
+
+ if (trace) { log.trace("Update page ord updated " + rows + " rows"); }
+
+ count++;
+ }
+
+ ps.close();
+
+ ps = null;
+
+ // Now swap the channel id
+
+ ps = conn.prepareStatement(getSQLStatement("UPDATE_CHANNEL_ID"));
+
+ ps.setLong(1, toChannelID);
+
+ ps.setLong(2, fromChannelID);
+
+ int rows = ps.executeUpdate();
+
+ if (trace) { log.trace("Update channel id updated " + rows + " rows"); }
+
+ if (arePaged)
+ {
+ return new InitialLoadInfo(new Long(firstPagingOrder), new Long(pageOrd - 1), refs);
+ }
+ else
+ {
+ return new InitialLoadInfo(null, null, refs);
+ }
+ }
+ finally
+ {
+ closeResultSet(rs);
+ closeStatement(ps);
+ closeStatement(ps2);
+ }
+ }
}
- finally
- {
- closeStatement(ps);
- closeStatement(ps2);
- closeConnection(conn);
- wrap.end();
- }
+ return (InitialLoadInfo)new MergeAndLoadRunner().executeWithRetry();
}
- public void updatePageOrder(long channelID, List references) throws Exception
+ public void updatePageOrder(final long channelID, final List references) throws Exception
{
- Connection conn = null;
- PreparedStatement psUpdateReference = null;
- TransactionWrapper wrap = new TransactionWrapper();
-
if (trace) { log.trace("Updating page order for channel:" + channelID); }
-
- try
+
+ class UpdatePageOrderRunner extends JDBCTxRunner
{
- conn = ds.getConnection();
-
- Iterator iter = references.iterator();
-
- psUpdateReference = conn.prepareStatement(getSQLStatement("UPDATE_PAGE_ORDER"));
-
- while (iter.hasNext())
- {
- MessageReference ref = (MessageReference) iter.next();
-
- psUpdateReference.setLong(1, ref.getPagingOrder());
-
- psUpdateReference.setLong(2, ref.getMessage().getMessageID());
-
- psUpdateReference.setLong(3, channelID);
-
- if (usingBatchUpdates)
- {
- psUpdateReference.addBatch();
- }
- else
- {
- int rows = executeWithRetry(psUpdateReference);
-
- if (trace) { log.trace("Updated " + rows + " rows"); }
- }
- }
-
- if (usingBatchUpdates)
- {
- int[] rowsReference = executeWithRetryBatch(psUpdateReference);
-
- if (trace) { logBatchUpdate(getSQLStatement("UPDATE_PAGE_ORDER"), rowsReference, "updated"); }
- }
+ public Object doTransaction() throws Exception
+ {
+ PreparedStatement psUpdateReference = null;
+ try
+ {
+ Iterator iter = references.iterator();
+
+ psUpdateReference = conn.prepareStatement(getSQLStatement("UPDATE_PAGE_ORDER"));
+
+ while (iter.hasNext())
+ {
+ MessageReference ref = (MessageReference) iter.next();
+
+ psUpdateReference.setLong(1, ref.getPagingOrder());
+
+ psUpdateReference.setLong(2, ref.getMessage().getMessageID());
+
+ psUpdateReference.setLong(3, channelID);
+
+ int rows = psUpdateReference.executeUpdate();
+
+ if (trace) { log.trace("Updated " + rows + " rows"); }
+ }
+
+ return null;
+ }
+ finally
+ {
+ closeStatement(psUpdateReference);
+ }
+ }
}
- catch (Exception e)
- {
- wrap.exceptionOccurred();
- throw e;
- }
- finally
- {
- closeStatement(psUpdateReference);
- closeConnection(conn);
- wrap.end();
- }
+
+ new UpdatePageOrderRunner().executeWithRetry();
}
- public List getPagedReferenceInfos(long channelID, long orderStart, int number) throws Exception
+ public List getPagedReferenceInfos(final long channelID, final long orderStart, final int number) throws Exception
{
if (trace) { log.trace("loading message reference info for channel " + channelID + " from " + orderStart + " number " + number); }
- List refs = new ArrayList();
+ List<ReferenceInfo> refs = new ArrayList<ReferenceInfo>();
Connection conn = null;
PreparedStatement ps = null;
@@ -1160,7 +1020,7 @@
/*
* Load the initial, non paged refs
*/
- public InitialLoadInfo loadFromStart(long channelID, int number) throws Exception
+ public InitialLoadInfo loadFromStart(final long channelID, final int number) throws Exception
{
if (trace) { log.trace("loading initial reference infos for channel " + channelID); }
@@ -1208,7 +1068,7 @@
rs = ps.executeQuery();
- List refs = new ArrayList();
+ List<ReferenceInfo> refs = new ArrayList<ReferenceInfo>();
int count = 0;
while (rs.next())
@@ -1255,12 +1115,58 @@
// End of paging functionality
// ===========================
- public void addReference(long channelID, MessageReference ref, Transaction tx) throws Exception
- {
+ public void addReference(final long channelID, final MessageReference ref, final Transaction tx) throws Exception
+ {
+ class AddReferenceRunner extends JDBCTxRunner
+ {
+ public Object doTransaction() throws Exception
+ {
+ PreparedStatement psReference = null;
+ PreparedStatement psMessage = null;
+
+ Message m = ref.getMessage();
+
+ try
+ {
+ psReference = conn.prepareStatement(getSQLStatement("INSERT_MESSAGE_REF"));
+
+ // Add the reference
+ addReference(channelID, ref, psReference, false);
+
+ int rows = psReference.executeUpdate();
+
+ if (trace) { log.trace("Inserted " + rows + " rows"); }
+
+ if (!m.isPersisted())
+ {
+ // First time so persist the message
+ psMessage = conn.prepareStatement(getSQLStatement("INSERT_MESSAGE"));
+
+ storeMessage(m, psMessage);
+
+ rows = psMessage.executeUpdate();
+
+ if (trace) { log.trace("Inserted/updated " + rows + " rows"); }
+
+ log.trace("message Inserted/updated " + rows + " rows");
+
+ //Needs to be at the end - in case an exception is thrown in which case retry will be attempted and we want to insert it again
+ m.setPersisted(true);
+ }
+
+ return null;
+ }
+ finally
+ {
+ closeStatement(psReference);
+ closeStatement(psMessage);
+ }
+ }
+ }
+
if (tx != null)
{
//In a tx so we just add the ref in the tx in memory for now
-
TransactionCallback callback = getCallback(tx);
callback.addReferenceToAdd(channelID, ref);
@@ -1268,106 +1174,78 @@
else
{
//No tx so add the ref directly in the db
-
- TransactionWrapper wrap = new TransactionWrapper();
-
- PreparedStatement psReference = null;
- PreparedStatement psMessage = null;
-
- Connection conn = ds.getConnection();
-
- Message m = ref.getMessage();
-
- try
- {
- // Get lock on message
- // LockMap.instance.obtainLock(m);
-
- psReference = conn.prepareStatement(getSQLStatement("INSERT_MESSAGE_REF"));
-
- // Add the reference
- addReference(channelID, ref, psReference, false);
-
- int rows = executeWithRetry(psReference);
-
- if (trace) { log.trace("Inserted " + rows + " rows"); }
-
- if (!m.isPersisted())
- {
- // First time so persist the message
- psMessage = conn.prepareStatement(getSQLStatement("INSERT_MESSAGE"));
-
- storeMessage(m, psMessage);
-
- m.setPersisted(true);
-
- rows = executeWithRetry(psMessage);
-
- if (trace) { log.trace("Inserted/updated " + rows + " rows"); }
-
- log.trace("message Inserted/updated " + rows + " rows");
- }
- }
- catch (Exception e)
- {
- wrap.exceptionOccurred();
- throw e;
- }
- finally
- {
- closeStatement(psReference);
- closeStatement(psMessage);
- closeConnection(conn);
- try
- {
- wrap.end();
- }
- finally
- {
- //Release Lock
- // LockMap.instance.releaseLock(m);
- }
- }
+ new AddReferenceRunner().executeWithRetry();
}
}
- public void updateDeliveryCount(long channelID, MessageReference ref) throws Exception
+ public void updateDeliveryCount(final long channelID, final MessageReference ref) throws Exception
{
- TransactionWrapper wrap = new TransactionWrapper();
-
- PreparedStatement psReference = null;
-
- Connection conn = ds.getConnection();
-
- try
- {
- psReference = conn.prepareStatement(getSQLStatement("UPDATE_DELIVERY_COUNT"));
-
- psReference.setInt(1, ref.getDeliveryCount());
-
- psReference.setLong(2, channelID);
-
- psReference.setLong(3, ref.getMessage().getMessageID());
-
- int rows = executeWithRetry(psReference);
-
- if (trace) { log.trace("Updated " + rows + " rows"); }
- }
- catch (Exception e)
- {
- wrap.exceptionOccurred();
- throw e;
- }
- finally
- {
- closeStatement(psReference);
- closeConnection(conn);
- wrap.end();
- }
+ class UpdateDeliveryCountRunner extends JDBCTxRunner
+ {
+ public Object doTransaction() throws Exception
+ {
+ PreparedStatement psReference = null;
+
+ try
+ {
+ psReference = conn.prepareStatement(getSQLStatement("UPDATE_DELIVERY_COUNT"));
+
+ psReference.setInt(1, ref.getDeliveryCount());
+
+ psReference.setLong(2, channelID);
+
+ psReference.setLong(3, ref.getMessage().getMessageID());
+
+ int rows = psReference.executeUpdate();
+
+ if (trace) { log.trace("Updated " + rows + " rows"); }
+
+ return null;
+ }
+ finally
+ {
+ closeStatement(psReference);
+ }
+ }
+ }
+
+ new UpdateDeliveryCountRunner().executeWithRetry();
}
- public void removeReference(long channelID, MessageReference ref, Transaction tx) throws Exception
+ public void removeReference(final long channelID, final MessageReference ref, final Transaction tx) throws Exception
{
+ class RemoveReferenceRunner extends JDBCTxRunner
+ {
+ public Object doTransaction() throws Exception
+ {
+ PreparedStatement psReference = null;
+
+ try
+ {
+ psReference = conn.prepareStatement(getSQLStatement("DELETE_MESSAGE_REF"));
+
+ //Remove the message reference
+ removeReference(channelID, ref, psReference);
+
+ int rows = psReference.executeUpdate();
+
+ if (rows != 1)
+ {
+ log.warn("Failed to remove row for: " + ref);
+ return null;
+ }
+
+ if (trace) { log.trace("Deleted " + rows + " rows"); }
+
+ return null;
+ }
+ finally
+ {
+ closeStatement(psReference);
+ }
+ }
+ }
+
if (tx != null)
{
//In a tx so we just add the ref in the tx in memory for now
@@ -1379,54 +1257,8 @@
else
{
//No tx so we remove the reference directly from the db
-
- TransactionWrapper wrap = new TransactionWrapper();
-
- PreparedStatement psReference = null;
-
- Connection conn = ds.getConnection();
-
- Message m = ref.getMessage();
-
- try
- {
- //get lock on message
- // LockMap.instance.obtainLock(m);
-
- psReference = conn.prepareStatement(getSQLStatement("DELETE_MESSAGE_REF"));
-
- //Remove the message reference
- removeReference(channelID, ref, psReference);
-
- int rows = executeWithRetry(psReference);
-
- if (rows != 1)
- {
- log.warn("Failed to remove row for: " + ref);
- return;
- }
-
- if (trace) { log.trace("Deleted " + rows + " rows"); }
- }
- catch (Exception e)
- {
- wrap.exceptionOccurred();
- throw e;
- }
- finally
- {
- closeStatement(psReference);
- closeConnection(conn);
-// try
-// {
- wrap.end();
-// }
-// finally
-// {
-// //release the lock
-// LockMap.instance.releaseLock(m);
-// }
- }
+
+ new RemoveReferenceRunner().executeWithRetry();
}
}
@@ -1494,493 +1326,295 @@
return callback;
}
- /**
- * We order the list of references in ascending message order thus preventing deadlock when 2 or
- * more channels are updating the same messages in different transactions.
- */
-// protected void orderReferences(List references)
-// {
-// Collections.sort(references, MessageOrderComparator.instance);
-// }
-
- protected void handleBeforeCommit1PC(List refsToAdd, List refsToRemove, Transaction tx)
+ protected void handleBeforeCommit1PC(final List refsToAdd, final List refsToRemove, final Transaction tx)
throws Exception
{
- //TODO - A slight optimisation - it's possible we have refs referring to the same message
- // so we will end up acquiring the lock more than once which is unnecessary. If find
- // unique set of messages can avoid this.
+ class HandleBeforeCommit1PCRunner extends JDBCTxRunner
+ {
+ public Object doTransaction() throws Exception
+ {
+ // For one phase we simply add rows corresponding to the refs and remove rows corresponding to
+ // the deliveries in one jdbc tx. We also need to store messages as necessary,
+ // depending on whether they've already been stored or still referenced by other channels.
+
+ PreparedStatement psReference = null;
+ PreparedStatement psInsertMessage = null;
+ PreparedStatement psDeleteReference = null;
+
+ List<Message> messagesStored = new ArrayList<Message>();
-// List allRefs = new ArrayList(refsToAdd.size() + refsToRemove.size());
-//
-// for(Iterator i = refsToAdd.iterator(); i.hasNext(); )
-// {
-// ChannelRefPair pair = (ChannelRefPair)i.next();
-// allRefs.add(pair.ref);
-// }
-//
-// for(Iterator i = refsToRemove.iterator(); i.hasNext(); )
-// {
-// ChannelRefPair pair = (ChannelRefPair)i.next();
-// allRefs.add(pair.ref);
-// }
-//
-// orderReferences(allRefs);
-
- // For one phase we simply add rows corresponding to the refs and remove rows corresponding to
- // the deliveries in one jdbc tx. We also need to store or remove messages as necessary,
- // depending on whether they've already been stored or still referenced by other channels.
-
- Connection conn = null;
- PreparedStatement psReference = null;
- PreparedStatement psInsertMessage = null;
- TransactionWrapper wrap = new TransactionWrapper();
-
- try
- {
- conn = ds.getConnection();
-
- // Obtain locks on all messages
- // getLocks(allRefs);
-
- // First the adds
+ try
+ {
+ // First the adds
- boolean messageInsertsInBatch = false;
- boolean batch = usingBatchUpdates && refsToAdd.size() > 0;
+ for (Iterator i = refsToAdd.iterator(); i.hasNext(); )
+ {
+ ChannelRefPair pair = (ChannelRefPair)i.next();
+ MessageReference ref = pair.ref;
+
+ psReference = conn.prepareStatement(getSQLStatement("INSERT_MESSAGE_REF"));
+
+ // Now store the reference
+ addReference(pair.channelID, ref, psReference, false);
+
+ int rows = psReference.executeUpdate();
+
+ if (trace) { log.trace("Inserted " + rows + " rows"); }
+
+ Message m = ref.getMessage();
+
+ synchronized (m)
+ {
+ if (!m.isPersisted())
+ {
+ if (psInsertMessage == null)
+ {
+ psInsertMessage = conn.prepareStatement(getSQLStatement("INSERT_MESSAGE"));
+ }
+
+ // First time so add message
+ storeMessage(m, psInsertMessage);
+
+ if (trace) { log.trace("Message does not already exist so inserting it"); }
+ rows = psInsertMessage.executeUpdate();
+ if (trace) { log.trace("Inserted " + rows + " rows"); }
+
+ m.setPersisted(true);
+
+ messagesStored.add(m);
+ }
+ }
+ }
+
+ // Now the removes
- for (Iterator i = refsToAdd.iterator(); i.hasNext(); )
- {
- ChannelRefPair pair = (ChannelRefPair)i.next();
- MessageReference ref = pair.ref;
-
- if (batch && psReference == null || !batch)
- {
- psReference = conn.prepareStatement(getSQLStatement("INSERT_MESSAGE_REF"));
- }
-
- // Now store the reference
- addReference(pair.channelID, ref, psReference, false);
-
- if (batch)
- {
- psReference.addBatch();
- }
- else
- {
- int rows = executeWithRetry(psReference);
-
- if (trace) { log.trace("Inserted " + rows + " rows"); }
-
- psReference.close();
- psReference = null;
- }
-
- Message m = ref.getMessage();
-
- synchronized (m)
- {
- if (!m.isPersisted())
- {
- if (batch && psInsertMessage == null || !batch)
- {
- psInsertMessage = conn.prepareStatement(getSQLStatement("INSERT_MESSAGE"));
- }
-
- // First time so add message
- storeMessage(m, psInsertMessage);
- m.setPersisted(true);
-
- if (batch)
+ for (Iterator i = refsToRemove.iterator(); i.hasNext(); )
+ {
+ ChannelRefPair pair = (ChannelRefPair)i.next();
+
+ if (psDeleteReference == null)
{
- psInsertMessage.addBatch();
- if (trace) { log.trace("Message does not already exist so inserting it"); }
- messageInsertsInBatch = true;
+ psDeleteReference = conn.prepareStatement(getSQLStatement("DELETE_MESSAGE_REF"));
}
- else
- {
- if (trace) { log.trace("Message does not already exist so inserting it"); }
- int rows = executeWithRetry(psInsertMessage);
- if (trace) { log.trace("Inserted " + rows + " rows"); }
-
- psInsertMessage.close();
- psInsertMessage = null;
- }
- }
- }
- }
-
- if (batch)
- {
- // Process the add batch
- int[] rowsReference = executeWithRetryBatch(psReference);
-
- if (trace) { logBatchUpdate(getSQLStatement("INSERT_MESSAGE_REF"), rowsReference, "inserted"); }
-
- if (messageInsertsInBatch)
- {
- int[] rowsMessage = executeWithRetryBatch(psInsertMessage);
- if (trace) { logBatchUpdate(getSQLStatement("INSERT_MESSAGE"), rowsMessage, "inserted"); }
- }
-
- psReference.close();
- psReference = null;
- psInsertMessage.close();
- psInsertMessage = null;
- }
-
- // Now the removes
-
- psReference = null;
- batch = usingBatchUpdates && refsToRemove.size() > 0;
-
- for (Iterator i = refsToRemove.iterator(); i.hasNext(); )
- {
- ChannelRefPair pair = (ChannelRefPair)i.next();
-
- if (batch && psReference == null || !batch)
- {
- psReference = conn.prepareStatement(getSQLStatement("DELETE_MESSAGE_REF"));
- }
-
- removeReference(pair.channelID, pair.ref, psReference);
-
- if (batch)
- {
- psReference.addBatch();
- }
- else
- {
- int rows = executeWithRetry(psReference);
- if (trace) { log.trace("Deleted " + rows + " rows"); }
- psReference.close();
- psReference = null;
- }
- }
-
- if (batch)
- {
- // Process the remove batch
-
- int[] rows = executeWithRetryBatch(psReference);
-
- if (trace) { logBatchUpdate(getSQLStatement("DELETE_MESSAGE_REF"), rows, "deleted"); }
-
- psReference.close();
- psReference = null;
- }
- }
- catch (Exception e)
- {
- wrap.exceptionOccurred();
- throw e;
- }
- finally
- {
- closeStatement(psReference);
- closeStatement(psInsertMessage);
- closeConnection(conn);
- // try
- // {
- wrap.end();
-// }
-// finally
-// {
-// //Release the locks
-// this.releaseLocks(allRefs);
-// }
- }
+ removeReference(pair.channelID, pair.ref, psDeleteReference);
+
+ int rows = psDeleteReference.executeUpdate();
+
+ if (trace) { log.trace("Deleted " + rows + " rows"); }
+ }
+
+ return null;
+ }
+ catch (Exception e)
+ {
+ for (Iterator i = messagesStored.iterator(); i.hasNext(); )
+ {
+ Message msg = (Message)i.next();
+
+ msg.setPersisted(false);
+ }
+ throw e;
+ }
+ finally
+ {
+ closeStatement(psReference);
+ closeStatement(psDeleteReference);
+ closeStatement(psInsertMessage);
+ }
+ }
+ }
+ new HandleBeforeCommit1PCRunner().executeWithRetry();
}
- protected void handleBeforeCommit2PC(List refsToRemove, Transaction tx)
- throws Exception
+ protected void handleBeforeCommit2PC(final Transaction tx) throws Exception
{
- Connection conn = null;
-
- TransactionWrapper wrap = new TransactionWrapper();
-
-// List refs = new ArrayList(refsToRemove.size());
-// Iterator iter = refsToRemove.iterator();
-// while (iter.hasNext())
-// {
-// ChannelRefPair pair = (ChannelRefPair)iter.next();
-// refs.add(pair.ref);
-// }
-//
-// orderReferences(refs);
-
- try
- {
- //get locks on all the refs
- // this.getLocks(refs);
-
- conn = ds.getConnection();
-
- //2PC commit
-
- //We commit any refs in state "+" to "C" and delete any
- //refs in state "-", then we
- //remove any messages due to refs we just removed
- //if they're not referenced elsewhere
-
- commitPreparedTransaction(tx, conn);
- }
- catch (Exception e)
- {
- wrap.exceptionOccurred();
- throw e;
- }
- finally
- {
- closeConnection(conn);
-// try
-// {
- wrap.end();
-// }
-// finally
-// {
-// //release the locks
-// this.releaseLocks(refs);
-// }
- }
+ class HandleBeforeCommit2PCRunner extends JDBCTxRunner
+ {
+ public Object doTransaction() throws Exception
+ {
+ PreparedStatement ps = null;
+
+ if (trace) { log.trace(this + " commitPreparedTransaction, tx= " + tx); }
+
+ try
+ {
+ ps = conn.prepareStatement(getSQLStatement("COMMIT_MESSAGE_REF1"));
+
+ ps.setLong(1, tx.getId());
+
+ int rows = ps.executeUpdate();
+
+ if (trace) { log.trace(JDBCUtil.statementToString(getSQLStatement("COMMIT_MESSAGE_REF1"), new Long(tx.getId())) + " removed " + rows + " row(s)"); }
+
+ ps.close();
+ ps = null;
+
+
+ ps = conn.prepareStatement(getSQLStatement("COMMIT_MESSAGE_REF2"));
+ ps.setLong(1, tx.getId());
+
+ rows = ps.executeUpdate();
+
+ if (trace) { log.trace(JDBCUtil.statementToString(getSQLStatement("COMMIT_MESSAGE_REF2"), new Long(tx.getId())) + " updated " + rows + " row(s)"); }
+
+ removeTXRecord(conn, tx);
+
+ return null;
+ }
+ finally
+ {
+ closeStatement(ps);
+ }
+ }
+ }
+
+ new HandleBeforeCommit2PCRunner().executeWithRetry();
}
- protected void handleBeforePrepare(List refsToAdd, List refsToRemove, Transaction tx) throws Exception
+ protected void handleBeforePrepare(final List refsToAdd, final List refsToRemove, final Transaction tx) throws Exception
{
- //We only need to lock on the adds
-// List refs = new ArrayList(refsToAdd.size());
-//
-// Iterator iter = refsToAdd.iterator();
-// while (iter.hasNext())
-// {
-// ChannelRefPair pair = (ChannelRefPair)iter.next();
-//
-// refs.add(pair.ref);
-// }
-//
-// orderReferences(refs);
-
- //We insert a tx record and
- //a row for each ref with +
- //and update the row for each delivery with "-"
-
- PreparedStatement psReference = null;
- PreparedStatement psInsertMessage = null;
- Connection conn = null;
- TransactionWrapper wrap = new TransactionWrapper();
-
- try
- {
- //get the locks
- // getLocks(refs);
-
- conn = ds.getConnection();
-
- //Insert the tx record
- if (!refsToAdd.isEmpty() || !refsToRemove.isEmpty())
- {
- addTXRecord(conn, tx);
- }
-
- Iterator iter = refsToAdd.iterator();
- boolean batch = usingBatchUpdates && refsToAdd.size() > 1;
- boolean messageInsertsInBatch = false;
+ class HandleBeforePrepareRunner extends JDBCTxRunner
+ {
+ public Object doTransaction() throws Exception
+ {
+ //We insert a tx record and
+ //a row for each ref with +
+ //and update the row for each delivery with "-"
+
+ PreparedStatement psReference = null;
+ PreparedStatement psInsertMessage = null;
+ PreparedStatement psUpdateReference = null;
+
+ try
+ {
+ conn = ds.getConnection();
+
+ //Insert the tx record
+ if (!refsToAdd.isEmpty() || !refsToRemove.isEmpty())
+ {
+ addTXRecord(conn, tx);
+ }
+
+ Iterator iter = refsToAdd.iterator();
- while (iter.hasNext())
- {
- ChannelRefPair pair = (ChannelRefPair) iter.next();
-
- if (batch && psReference == null || !batch)
- {
- psReference = conn.prepareStatement(getSQLStatement("INSERT_MESSAGE_REF"));
- }
-
- prepareToAddReference(pair.channelID, pair.ref, tx, psReference);
-
- if (batch)
- {
- psReference.addBatch();
- }
- else
- {
- int rows = executeWithRetry(psReference);
-
- if (trace) { log.trace("Inserted " + rows + " rows"); }
-
- psReference.close();
- psReference = null;
- }
-
- Message m = pair.ref.getMessage();
-
- synchronized (m)
- {
-
- if (!m.isPersisted())
- {
- if (batch && psInsertMessage == null || !batch)
- {
- psInsertMessage = conn.prepareStatement(getSQLStatement("INSERT_MESSAGE"));
- }
-
- storeMessage(m, psInsertMessage);
+ while (iter.hasNext())
+ {
+ ChannelRefPair pair = (ChannelRefPair) iter.next();
- m.setPersisted(true);
-
- if (batch)
+ if (psReference == null)
{
- psInsertMessage.addBatch();
-
- messageInsertsInBatch = true;
+ psReference = conn.prepareStatement(getSQLStatement("INSERT_MESSAGE_REF"));
}
- else
- {
- int rows = executeWithRetry(psInsertMessage);
-
- if (trace) { log.trace("Inserted " + rows + " rows"); }
-
- psInsertMessage.close();
-
- psInsertMessage = null;
- }
- }
- }
- }
-
- if (batch)
- {
- int[] rowsReference = executeWithRetryBatch(psReference);
-
- if (trace) { logBatchUpdate(getSQLStatement("INSERT_MESSAGE_REF"), rowsReference, "inserted"); }
-
- if (messageInsertsInBatch)
- {
- int[] rowsMessage = executeWithRetryBatch(psInsertMessage);
-
- if (trace) { logBatchUpdate(getSQLStatement("INSERT_MESSAGE"), rowsMessage, "inserted"); }
- }
+
+ prepareToAddReference(pair.channelID, pair.ref, tx, psReference);
+
+ int rows = psReference.executeUpdate();
+
+ if (trace) { log.trace("Inserted " + rows + " rows"); }
+
+ Message m = pair.ref.getMessage();
+
+ synchronized (m)
+ {
+ if (!m.isPersisted())
+ {
+ if (psInsertMessage == null)
+ {
+ psInsertMessage = conn.prepareStatement(getSQLStatement("INSERT_MESSAGE"));
+ }
+
+ storeMessage(m, psInsertMessage);
+
+ rows = psInsertMessage.executeUpdate();
+
+ if (trace) { log.trace("Inserted " + rows + " rows"); }
- psReference.close();
- psReference = null;
- psInsertMessage.close();
- psInsertMessage = null;
- }
-
- //Now the removes
-
- iter = refsToRemove.iterator();
-
- batch = usingBatchUpdates && refsToRemove.size() > 1;
-
- if (batch)
- {
- psReference = conn.prepareStatement(getSQLStatement("UPDATE_MESSAGE_REF"));
- }
-
- while (iter.hasNext())
- {
- ChannelRefPair pair = (ChannelRefPair) iter.next();
-
- if (!batch)
- {
- psReference = conn.prepareStatement(getSQLStatement("UPDATE_MESSAGE_REF"));
- }
-
- prepareToRemoveReference(pair.channelID, pair.ref, tx, psReference);
-
- if (batch)
- {
- psReference.addBatch();
- }
- else
- {
- int rows = executeWithRetry(psReference);
-
- if (trace) { log.trace("updated " + rows + " rows"); }
-
- psReference.close();
- psReference = null;
- }
- }
-
- if (batch)
- {
- int[] rows = executeWithRetryBatch(psReference);
-
- if (trace) { logBatchUpdate(getSQLStatement("UPDATE_MESSAGE_REF"), rows, "updated"); }
-
- psReference.close();
- psReference = null;
- }
- }
- catch (Exception e)
- {
- wrap.exceptionOccurred();
- throw e;
- }
- finally
- {
- closeStatement(psReference);
- closeStatement(psInsertMessage);
- closeConnection(conn);
-// try
-// {
- wrap.end();
-// }
-// finally
-// {
- //release the locks
-
- // this.releaseLocks(refs);
- // }
- }
+ m.setPersisted(true);
+ }
+ }
+ }
+
+ //Now the removes
+
+ iter = refsToRemove.iterator();
+
+ while (iter.hasNext())
+ {
+ if (psUpdateReference == null)
+ {
+ psUpdateReference = conn.prepareStatement(getSQLStatement("UPDATE_MESSAGE_REF"));
+ }
+
+ ChannelRefPair pair = (ChannelRefPair) iter.next();
+
+ prepareToRemoveReference(pair.channelID, pair.ref, tx, psUpdateReference);
+
+ int rows = psUpdateReference.executeUpdate();
+
+ if (trace) { log.trace("updated " + rows + " rows"); }
+ }
+
+ return null;
+ }
+ finally
+ {
+ closeStatement(psReference);
+ closeStatement(psInsertMessage);
+ closeStatement(psUpdateReference);
+ }
+ }
+ }
+
+ new HandleBeforePrepareRunner().executeWithRetry();
}
- protected void handleBeforeRollback(List refsToAdd, Transaction tx) throws Exception
+ protected void handleBeforeRollback(final List refsToAdd, final Transaction tx) throws Exception
{
- //remove refs marked with +
- //and update rows marked with - to C
-
- Connection conn = null;
- TransactionWrapper wrap = new TransactionWrapper();
-
-// List refs = new ArrayList(refsToAdd.size());
-//
-// Iterator iter = refsToAdd.iterator();
-//
-// while (iter.hasNext())
-// {
-// ChannelRefPair pair = (ChannelRefPair)iter.next();
-// refs.add(pair.ref);
-// }
-//
-// orderReferences(refs);
-
- try
- {
- // this.getLocks(refs);
-
- conn = ds.getConnection();
-
- rollbackPreparedTransaction(tx, conn);
- }
- catch (Exception e)
- {
- wrap.exceptionOccurred();
- throw e;
- }
- finally
- {
- closeConnection(conn);
-// try
-// {
- wrap.end();
-// }
-// finally
-// {
-// //release locks
-// this.releaseLocks(refs);
-// }
- }
+ class HandleBeforeRollbackRunner extends JDBCTxRunner
+ {
+ public Object doTransaction() throws Exception
+ {
+ PreparedStatement ps = null;
+
+ try
+ {
+ ps = conn.prepareStatement(getSQLStatement("ROLLBACK_MESSAGE_REF1"));
+
+ ps.setLong(1, tx.getId());
+
+ int rows = ps.executeUpdate();
+
+ if (trace)
+ {
+ log.trace(JDBCUtil.statementToString(getSQLStatement("ROLLBACK_MESSAGE_REF1"), new Long(tx.getId())) + " removed " + rows + " row(s)");
+ }
+
+ ps.close();
+
+ ps = conn.prepareStatement(getSQLStatement("ROLLBACK_MESSAGE_REF2"));
+ ps.setLong(1, tx.getId());
+
+ rows = ps.executeUpdate();
+
+ if (trace)
+ {
+ log.trace(JDBCUtil.statementToString(getSQLStatement("ROLLBACK_MESSAGE_REF2"), new Long(tx.getId())) + " updated " + rows
+ + " row(s)");
+ }
+
+ removeTXRecord(conn, tx);
+
+ return null;
+ }
+ finally
+ {
+ closeStatement(ps);
+ }
+ }
+ }
+
+ new HandleBeforeRollbackRunner().executeWithRetry();
}
@@ -2021,8 +1655,7 @@
setVarBinaryColumn(5, ps, xid.getGlobalTransactionId());
- rows = executeWithRetry(ps);
-
+ rows = ps.executeUpdate();
}
finally
{
@@ -2053,7 +1686,7 @@
ps.setLong(2, tx.getId());
- int rows = executeWithRetry(ps);
+ int rows = ps.executeUpdate();
if (trace)
{
@@ -2126,83 +1759,6 @@
ps.setLong(3, channelID);
}
- protected void commitPreparedTransaction(Transaction tx, Connection conn) throws Exception
- {
- PreparedStatement ps = null;
-
- if (trace) { log.trace(this + " commitPreparedTransaction, tx= " + tx); }
-
- try
- {
- ps = conn.prepareStatement(getSQLStatement("COMMIT_MESSAGE_REF1"));
-
- ps.setLong(1, tx.getId());
-
- int rows = executeWithRetry(ps);
-
- if (trace)
- {
- log.trace(JDBCUtil.statementToString(getSQLStatement("COMMIT_MESSAGE_REF1"), new Long(tx.getId())) + " removed " + rows + " row(s)");
- }
-
- ps.close();
- ps = conn.prepareStatement(getSQLStatement("COMMIT_MESSAGE_REF2"));
- ps.setLong(1, tx.getId());
-
- rows = executeWithRetry(ps);
-
- if (trace)
- {
- log.trace(JDBCUtil.statementToString(getSQLStatement("COMMIT_MESSAGE_REF2"), new Long(tx.getId())) + " updated " + rows
- + " row(s)");
- }
-
- removeTXRecord(conn, tx);
- }
- finally
- {
- closeStatement(ps);
- }
- }
-
- protected void rollbackPreparedTransaction(Transaction tx, Connection conn) throws Exception
- {
- PreparedStatement ps = null;
-
- try
- {
- ps = conn.prepareStatement(getSQLStatement("ROLLBACK_MESSAGE_REF1"));
-
- ps.setLong(1, tx.getId());
-
- int rows = executeWithRetry(ps);
-
- if (trace)
- {
- log.trace(JDBCUtil.statementToString(getSQLStatement("ROLLBACK_MESSAGE_REF1"), new Long(tx.getId())) + " removed " + rows + " row(s)");
- }
-
- ps.close();
-
- ps = conn.prepareStatement(getSQLStatement("ROLLBACK_MESSAGE_REF2"));
- ps.setLong(1, tx.getId());
-
- rows = executeWithRetry(ps);
-
- if (trace)
- {
- log.trace(JDBCUtil.statementToString(getSQLStatement("ROLLBACK_MESSAGE_REF2"), new Long(tx.getId())) + " updated " + rows
- + " row(s)");
- }
-
- removeTXRecord(conn, tx);
- }
- finally
- {
- closeStatement(ps);
- }
- }
-
protected byte[] mapToBytes(Map map) throws Exception
{
if (map == null || map.isEmpty())
@@ -2405,28 +1961,6 @@
}
}
-// protected void getLocks(List refs)
-// {
-// Iterator iter = refs.iterator();
-// while (iter.hasNext())
-// {
-// MessageReference ref = (MessageReference)iter.next();
-// Message m = ref.getMessage();
-// LockMap.instance.obtainLock(m);
-// }
-// }
-//
-// protected void releaseLocks(List refs)
-// {
-// Iterator iter = refs.iterator();
-// while (iter.hasNext())
-// {
-// MessageReference ref = (MessageReference)iter.next();
-// Message m = ref.getMessage();
-// LockMap.instance.releaseLock(m);
-// }
-// }
-
protected void logBatchUpdate(String name, int[] rows, String action)
{
int count = 0;
@@ -2436,27 +1970,12 @@
}
log.trace("Batch update " + name + ", " + action + " total of " + count + " rows");
}
-
- protected int executeWithRetry(PreparedStatement ps) throws Exception
- {
- return executeWithRetry(ps, false, false)[0];
- }
-
- protected int executeWithRetryIgnoreKeyViolation(PreparedStatement ps) throws Exception
- {
- return executeWithRetry(ps, false, true)[0];
- }
-
- protected int[] executeWithRetryBatch(PreparedStatement ps) throws Exception
- {
- return executeWithRetry(ps, true, false);
- }
-
+
//PersistentServiceSupport overrides ----------------------------
protected Map getDefaultDDLStatements()
{
- Map map = new LinkedHashMap();
+ Map<String, String> map = new LinkedHashMap<String, String>();
//Message reference
map.put("CREATE_MESSAGE_REFERENCE",
"CREATE TABLE JBM_MSG_REF (CHANNEL_ID BIGINT, " +
@@ -2487,7 +2006,7 @@
protected Map getDefaultDMLStatements()
{
- Map map = new LinkedHashMap();
+ Map<String, String> map = new LinkedHashMap<String, String>();
//Message reference
map.put("INSERT_MESSAGE_REF",
"INSERT INTO JBM_MSG_REF (CHANNEL_ID, MESSAGE_ID, TRANSACTION_ID, STATE, ORD, PAGE_ORD, DELIVERY_COUNT, SCHED_DELIVERY) " +
@@ -2550,80 +2069,6 @@
// Private -------------------------------------------------------
-
-
- private int[] executeWithRetry(PreparedStatement ps, boolean batch, boolean ignoreKeyViolation) throws Exception
- {
- final int MAX_TRIES = 25;
-
- int rows = 0;
-
- int[] rowsArr = null;
-
- int tries = 0;
-
- while (true)
- {
- try
- {
- if (batch)
- {
- rowsArr = ps.executeBatch();
- }
- else
- {
- try
- {
- rows = ps.executeUpdate();
- }
- catch (SQLException e)
- {
- if (ignoreKeyViolation && e.getSQLState().equals("23000"))
- {
- //Key violation - ignore
- log.info("Got key violation - but ignoring");
- }
- else
- {
- throw e;
- }
- }
- }
-
- if (tries > 0)
- {
- log.warn("Update worked after retry");
- }
- break;
- }
- catch (SQLException e)
- {
- log.warn("SQLException caught - assuming deadlock detected, try:" + (tries + 1), e);
-
- log.info("SQLState:" + e.getSQLState() + " code:" + e.getErrorCode());
-
- tries++;
- if (tries == MAX_TRIES)
- {
- log.error("Retried " + tries + " times, now giving up");
- throw new IllegalStateException("Failed to update references");
- }
- log.warn("Trying again after a pause");
- //Now we wait for a random amount of time to minimise risk of deadlock
- Thread.sleep((long)(Math.random() * 500));
- }
- }
-
- if (batch)
- {
- return rowsArr;
- }
- else
- {
- return new int[] { rows };
- }
- }
-
private List getMessageChannelPair(String sqlQuery, long transactionId) throws Exception
{
if (trace) log.trace("loading message and channel ids for tx [" + transactionId + "]");
@@ -2652,14 +2097,6 @@
//Don't use a Map. A message could be in multiple channels in a tx, so if you use a map
//when you put the same message again it's going to overwrite the previous put!!
- List holders = new ArrayList();
-
- //Unique set of messages
- Set msgIds = new HashSet();
-
- //TODO it would probably have been simpler just to have done all this in a SQL JOIN rather
- //than do the join in memory.....
-
class Holder
{
long messageId;
@@ -2670,6 +2107,14 @@
this.channelId = channelId;
}
}
+
+ List<Holder> holders = new ArrayList<Holder>();
+
+ //Unique set of messages
+ Set<Long> msgIds = new HashSet<Long>();
+
+ //TODO it would probably have been simpler just to have done all this in a SQL JOIN rather
+ //than do the join in memory.....
while(rs.next())
{
@@ -2680,7 +2125,7 @@
holders.add(holder);
- msgIds.add(new Long(messageId));
+ msgIds.add(messageId);
if (trace) log.trace("Loaded MsgID: " + messageId + " and ChannelID: " + channelId);
}
@@ -2899,7 +2344,7 @@
}
else
{
- handleBeforeCommit2PC(refsToRemove, tx);
+ handleBeforeCommit2PC(tx);
}
}
@@ -2921,19 +2366,71 @@
}
}
-// static class MessageOrderComparator implements Comparator
-// {
-// static MessageOrderComparator instance = new MessageOrderComparator();
-//
-// public int compare(Object o1, Object o2)
-// {
-// MessageReference ref1 = (MessageReference)o1;
-// MessageReference ref2 = (MessageReference)o2;
-//
-// long id1 = ref1.getMessage().getMessageID();
-// long id2 = ref2.getMessage().getMessageID();
-//
-// return (id1 < id2 ? -1 : (id1 == id2 ? 0 : 1));
-// }
-// }
+ private abstract class JDBCTxRunner
+ {
+ private static final int MAX_TRIES = 25;
+
+ Connection conn;
+
+ TransactionWrapper wrap;
+
+ public Object execute() throws Exception
+ {
+ wrap = new TransactionWrapper();
+
+ try
+ {
+ conn = ds.getConnection();
+
+ return doTransaction();
+ }
+ catch (Exception e)
+ {
+ wrap.exceptionOccurred();
+ throw e;
+ }
+ finally
+ {
+ closeConnection(conn);
+ wrap.end();
+ }
+ }
+
+ public Object executeWithRetry() throws Exception
+ {
+ int tries = 0;
+
+ while (true)
+ {
+ try
+ {
+ Object res = execute();
+
+ if (tries > 0)
+ {
+ log.warn("Update worked after retry");
+ }
+ return res;
+ }
+ catch (SQLException e)
+ {
+ log.warn("SQLException caught - assuming deadlock detected, try:" + (tries + 1), e);
+
+ log.info("SQLState:" + e.getSQLState() + " code:" + e.getErrorCode());
+
+ tries++;
+ if (tries == MAX_TRIES)
+ {
+ log.error("Retried " + tries + " times, now giving up");
+ throw new IllegalStateException("Failed to excecute transaction");
+ }
+ log.warn("Trying again after a pause");
+ //Now we wait for a random amount of time to minimise risk of deadlock
+ Thread.sleep((long)(Math.random() * 500));
+ }
+ }
+ }
+
+ public abstract Object doTransaction() throws Exception;
+ }
}
More information about the jboss-cvs-commits
mailing list