[jboss-cvs] JBoss Messaging SVN: r3035 - trunk/src/main/org/jboss/messaging/core/impl.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Thu Aug 23 06:38:23 EDT 2007
Author: timfox
Date: 2007-08-23 06:38:22 -0400 (Thu, 23 Aug 2007)
New Revision: 3035
Modified:
trunk/src/main/org/jboss/messaging/core/impl/JDBCPersistenceManager.java
Log:
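Added some extra trace logging (pageReferences, removeDepagedReferences); removed the unused java.sql.Statement import; moved updatePageOrder, getPagedReferenceInfos and loadFromStart, unchanged, above the merging functionality so they sit inside the paging section.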
Modified: trunk/src/main/org/jboss/messaging/core/impl/JDBCPersistenceManager.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/impl/JDBCPersistenceManager.java 2007-08-22 21:12:05 UTC (rev 3034)
+++ trunk/src/main/org/jboss/messaging/core/impl/JDBCPersistenceManager.java 2007-08-23 10:38:22 UTC (rev 3035)
@@ -31,7 +31,6 @@
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
-import java.sql.Statement;
import java.sql.Types;
import java.util.ArrayList;
import java.util.HashMap;
@@ -532,7 +531,9 @@
//Used to page NP messages or P messages in a non recoverable queue
public void pageReferences(final long channelID, final List references, final boolean page) throws Exception
- {
+ {
+ if (trace) { log.trace("Paging references in channel " + channelID + " refs " + references.size()); }
+
class PageReferencesRunner extends JDBCTxRunner
{
public Object doTransaction() throws Exception
@@ -646,7 +647,7 @@
//After loading paged refs this is used to remove any NP or P messages in a unrecoverable channel
public void removeDepagedReferences(final long channelID, final List references) throws Exception
{
- if (trace) { log.trace(this + " Removing " + references.size() + " refs from channel " + channelID); }
+ if (trace) { log.trace(this + " Removing depaged " + references.size() + " refs from channel " + channelID); }
class RemoveDepagedReferencesRunner extends JDBCTxRunner
{
@@ -749,7 +750,214 @@
new UpdateReferencesNotPagedInRangeRunner().executeWithRetry();
}
-
+
+ public void updatePageOrder(final long channelID, final List references) throws Exception
+ {
+ if (trace) { log.trace("Updating page order for channel:" + channelID); }
+
+ class UpdatePageOrderRunner extends JDBCTxRunner
+ {
+ public Object doTransaction() throws Exception
+ {
+ PreparedStatement psUpdateReference = null;
+ try
+ {
+ Iterator iter = references.iterator();
+
+ psUpdateReference = conn.prepareStatement(getSQLStatement("UPDATE_PAGE_ORDER"));
+
+ while (iter.hasNext())
+ {
+ MessageReference ref = (MessageReference) iter.next();
+
+ psUpdateReference.setLong(1, ref.getPagingOrder());
+
+ psUpdateReference.setLong(2, ref.getMessage().getMessageID());
+
+ psUpdateReference.setLong(3, channelID);
+
+ int rows = psUpdateReference.executeUpdate();
+
+ if (trace) { log.trace("Updated " + rows + " rows"); }
+ }
+
+ return null;
+ }
+ finally
+ {
+ closeStatement(psUpdateReference);
+ }
+ }
+ }
+
+ new UpdatePageOrderRunner().executeWithRetry();
+ }
+
+ public List getPagedReferenceInfos(final long channelID, final long orderStart, final int number) throws Exception
+ {
+ if (trace) { log.trace("loading message reference info for channel " + channelID + " from " + orderStart + " number " + number); }
+
+ List<ReferenceInfo> refs = new ArrayList<ReferenceInfo>();
+
+ Connection conn = null;
+ PreparedStatement ps = null;
+ ResultSet rs = null;
+ TransactionWrapper wrap = new TransactionWrapper();
+
+ try
+ {
+ conn = ds.getConnection();
+
+ ps = conn.prepareStatement(getSQLStatement("LOAD_PAGED_REFS"));
+
+ ps.setLong(1, channelID);
+
+ ps.setLong(2, orderStart);
+
+ ps.setLong(3, orderStart + number - 1);
+
+ rs = ps.executeQuery();
+
+ long ord = orderStart;
+
+ while (rs.next())
+ {
+ long msgId = rs.getLong(1);
+ int deliveryCount = rs.getInt(2);
+ int pageOrd = rs.getInt(3);
+ long sched = rs.getLong(4);
+
+ //Sanity check
+ if (pageOrd != ord)
+ {
+ throw new IllegalStateException("Unexpected pageOrd: " + pageOrd + " expected: " + ord);
+ }
+
+ ReferenceInfo ri = new ReferenceInfo(msgId, deliveryCount, sched);
+
+ refs.add(ri);
+ ord++;
+ }
+
+ //Sanity check
+ if (ord != orderStart + number)
+ {
+ throw new IllegalStateException("Didn't load expected number of references, loaded: " + (ord - orderStart) +
+ " expected: " + number);
+ }
+
+ return refs;
+ }
+ catch (Exception e)
+ {
+ wrap.exceptionOccurred();
+ throw e;
+ }
+ finally
+ {
+ closeResultSet(rs);
+ closeStatement(ps);
+ closeConnection(conn);
+ wrap.end();
+ }
+ }
+
+ /*
+ * Load the initial, non paged refs
+ */
+ public InitialLoadInfo loadFromStart(final long channelID, final int number) throws Exception
+ {
+ if (trace) { log.trace("loading initial reference infos for channel " + channelID); }
+
+ Connection conn = null;
+ PreparedStatement ps = null;
+ ResultSet rs = null;
+ TransactionWrapper wrap = new TransactionWrapper();
+
+ try
+ {
+ conn = ds.getConnection();
+
+ //First we get the values for min() and max() page order
+ ps = conn.prepareStatement(getSQLStatement("SELECT_MIN_MAX_PAGE_ORD"));
+
+ ps.setLong(1, channelID);
+
+ rs = ps.executeQuery();
+
+ rs.next();
+
+ Long minOrdering = new Long(rs.getLong(1));
+
+ if (rs.wasNull())
+ {
+ minOrdering = null;
+ }
+
+ Long maxOrdering = new Long(rs.getLong(2));
+
+ if (rs.wasNull())
+ {
+ maxOrdering = null;
+ }
+
+ ps.close();
+
+ ps = null;
+
+ ps = conn.prepareStatement(getSQLStatement("LOAD_UNPAGED_REFS"));
+
+ ps.setLong(1, channelID);
+
+ rs = ps.executeQuery();
+
+ List<ReferenceInfo> refs = new ArrayList<ReferenceInfo>();
+
+ int count = 0;
+ while (rs.next())
+ {
+ long msgId = rs.getLong(1);
+ int deliveryCount = rs.getInt(2);
+ long sched = rs.getLong(3);
+
+ ReferenceInfo ri = new ReferenceInfo(msgId, deliveryCount, sched);
+
+ if (count < number)
+ {
+ refs.add(ri);
+ }
+
+ count++;
+ }
+
+ //No refs paged
+
+ if (count > number)
+ {
+ throw new IllegalStateException("Cannot load channel " + channelID + " since the fullSize parameter is too small to load " +
+ " all the required references, fullSize needs to be at least " + count + " it is currently " + number);
+ }
+
+ return new InitialLoadInfo(minOrdering, maxOrdering, refs);
+ }
+ catch (Exception e)
+ {
+ wrap.exceptionOccurred();
+ throw e;
+ }
+ finally
+ {
+ closeResultSet(rs);
+ closeStatement(ps);
+ closeConnection(conn);
+ wrap.end();
+ }
+ }
+
+
+ // Merging functionality
+ // --------------------
+
public void mergeTransactions(final long fromChannelID, final long toChannelID) throws Exception
{
if (trace) { log.trace("Merging transactions from channel " + fromChannelID + " to " + toChannelID); }
@@ -955,210 +1163,6 @@
return (InitialLoadInfo)new MergeAndLoadRunner().executeWithRetry();
}
- public void updatePageOrder(final long channelID, final List references) throws Exception
- {
- if (trace) { log.trace("Updating page order for channel:" + channelID); }
-
- class UpdatePageOrderRunner extends JDBCTxRunner
- {
- public Object doTransaction() throws Exception
- {
- PreparedStatement psUpdateReference = null;
- try
- {
- Iterator iter = references.iterator();
-
- psUpdateReference = conn.prepareStatement(getSQLStatement("UPDATE_PAGE_ORDER"));
-
- while (iter.hasNext())
- {
- MessageReference ref = (MessageReference) iter.next();
-
- psUpdateReference.setLong(1, ref.getPagingOrder());
-
- psUpdateReference.setLong(2, ref.getMessage().getMessageID());
-
- psUpdateReference.setLong(3, channelID);
-
- int rows = psUpdateReference.executeUpdate();
-
- if (trace) { log.trace("Updated " + rows + " rows"); }
- }
-
- return null;
- }
- finally
- {
- closeStatement(psUpdateReference);
- }
- }
- }
-
- new UpdatePageOrderRunner().executeWithRetry();
- }
-
- public List getPagedReferenceInfos(final long channelID, final long orderStart, final int number) throws Exception
- {
- if (trace) { log.trace("loading message reference info for channel " + channelID + " from " + orderStart + " number " + number); }
-
- List<ReferenceInfo> refs = new ArrayList<ReferenceInfo>();
-
- Connection conn = null;
- PreparedStatement ps = null;
- ResultSet rs = null;
- TransactionWrapper wrap = new TransactionWrapper();
-
- try
- {
- conn = ds.getConnection();
-
- ps = conn.prepareStatement(getSQLStatement("LOAD_PAGED_REFS"));
-
- ps.setLong(1, channelID);
-
- ps.setLong(2, orderStart);
-
- ps.setLong(3, orderStart + number - 1);
-
- rs = ps.executeQuery();
-
- long ord = orderStart;
-
- while (rs.next())
- {
- long msgId = rs.getLong(1);
- int deliveryCount = rs.getInt(2);
- int pageOrd = rs.getInt(3);
- long sched = rs.getLong(4);
-
- //Sanity check
- if (pageOrd != ord)
- {
- throw new IllegalStateException("Unexpected pageOrd: " + pageOrd + " expected: " + ord);
- }
-
- ReferenceInfo ri = new ReferenceInfo(msgId, deliveryCount, sched);
-
- refs.add(ri);
- ord++;
- }
-
- //Sanity check
- if (ord != orderStart + number)
- {
- throw new IllegalStateException("Didn't load expected number of references, loaded: " + (ord - orderStart) +
- " expected: " + number);
- }
-
- return refs;
- }
- catch (Exception e)
- {
- wrap.exceptionOccurred();
- throw e;
- }
- finally
- {
- closeResultSet(rs);
- closeStatement(ps);
- closeConnection(conn);
- wrap.end();
- }
- }
-
- /*
- * Load the initial, non paged refs
- */
- public InitialLoadInfo loadFromStart(final long channelID, final int number) throws Exception
- {
- if (trace) { log.trace("loading initial reference infos for channel " + channelID); }
-
- Connection conn = null;
- PreparedStatement ps = null;
- ResultSet rs = null;
- TransactionWrapper wrap = new TransactionWrapper();
-
- try
- {
- conn = ds.getConnection();
-
- //First we get the values for min() and max() page order
- ps = conn.prepareStatement(getSQLStatement("SELECT_MIN_MAX_PAGE_ORD"));
-
- ps.setLong(1, channelID);
-
- rs = ps.executeQuery();
-
- rs.next();
-
- Long minOrdering = new Long(rs.getLong(1));
-
- if (rs.wasNull())
- {
- minOrdering = null;
- }
-
- Long maxOrdering = new Long(rs.getLong(2));
-
- if (rs.wasNull())
- {
- maxOrdering = null;
- }
-
- ps.close();
-
- ps = null;
-
- ps = conn.prepareStatement(getSQLStatement("LOAD_UNPAGED_REFS"));
-
- ps.setLong(1, channelID);
-
- rs = ps.executeQuery();
-
- List<ReferenceInfo> refs = new ArrayList<ReferenceInfo>();
-
- int count = 0;
- while (rs.next())
- {
- long msgId = rs.getLong(1);
- int deliveryCount = rs.getInt(2);
- long sched = rs.getLong(3);
-
- ReferenceInfo ri = new ReferenceInfo(msgId, deliveryCount, sched);
-
- if (count < number)
- {
- refs.add(ri);
- }
-
- count++;
- }
-
- //No refs paged
-
- if (count > number)
- {
- throw new IllegalStateException("Cannot load channel " + channelID + " since the fullSize parameter is too small to load " +
- " all the required references, fullSize needs to be at least " + count + " it is currently " + number);
- }
-
- return new InitialLoadInfo(minOrdering, maxOrdering, refs);
- }
- catch (Exception e)
- {
- wrap.exceptionOccurred();
- throw e;
- }
- finally
- {
- closeResultSet(rs);
- closeStatement(ps);
- closeConnection(conn);
- wrap.end();
- }
- }
-
-
// End of paging functionality
// ===========================
More information about the jboss-cvs-commits
mailing list