[jboss-cvs] JBoss Messaging SVN: r3297 - in branches/Branch_Stable: tests/src/org/jboss/test/messaging/core/paging and 1 other directory.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Wed Nov 7 14:36:53 EST 2007


Author: timfox
Date: 2007-11-07 14:36:53 -0500 (Wed, 07 Nov 2007)
New Revision: 3297

Modified:
   branches/Branch_Stable/src/main/org/jboss/messaging/core/impl/JDBCPersistenceManager.java
   branches/Branch_Stable/src/main/org/jboss/messaging/core/impl/PagingChannelSupport.java
   branches/Branch_Stable/tests/src/org/jboss/test/messaging/core/paging/SingleChannel_ReloadTest.java
Log:
http://jira.jboss.org/jira/browse/JBMESSAGING-1139


Modified: branches/Branch_Stable/src/main/org/jboss/messaging/core/impl/JDBCPersistenceManager.java
===================================================================
--- branches/Branch_Stable/src/main/org/jboss/messaging/core/impl/JDBCPersistenceManager.java	2007-11-07 16:34:07 UTC (rev 3296)
+++ branches/Branch_Stable/src/main/org/jboss/messaging/core/impl/JDBCPersistenceManager.java	2007-11-07 19:36:53 UTC (rev 3297)
@@ -70,97 +70,107 @@
  * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
  * @author <a href="mailto:adrian at jboss.org">Adrian Brock</a>
  * @author <a href="mailto:juha at jboss.org">Juha Lindfors</a>
- *
+ * 
  * @version <tt>1.1</tt>
- *
+ * 
  * JDBCPersistenceManager.java,v 1.1 2006/02/22 17:33:41 timfox Exp
  */
-public class JDBCPersistenceManager extends JDBCSupport implements PersistenceManager
+public class JDBCPersistenceManager extends JDBCSupport implements
+      PersistenceManager
 {
    // Constants -----------------------------------------------------
-   
-   private static final Logger log = Logger.getLogger(JDBCPersistenceManager.class); 
 
+   private static final Logger log = Logger
+         .getLogger(JDBCPersistenceManager.class);
+
    // Static --------------------------------------------------------
-      
+
    private boolean trace = log.isTraceEnabled();
-      
+
    private boolean usingBinaryStream = true;
-   
+
    private boolean usingTrailingByte = false;
-   
+
    private int maxParams;
-   
+
    private short orderCount;
-   
+
    private int nodeID;
-   
+
    private boolean nodeIDSet;
-   
-   // Some versions of the oracle driver don't support binding blobs on select clauses,
-   // what would force us to use a two stage insert (insert and if successful, update)
+
+   // Some versions of the oracle driver don't support binding blobs on select
+   // clauses,
+   // what would force us to use a two stage insert (insert and if successful,
+   // update)
    private boolean supportsBlobSelect;
 
    // Constructors --------------------------------------------------
-    
-   public JDBCPersistenceManager(DataSource ds, TransactionManager tm, Properties sqlProperties,
-                                 boolean createTablesOnStartup, boolean usingBatchUpdates,
-                                 boolean usingBinaryStream, boolean usingTrailingByte, int maxParams,
-                                 boolean supportsBlobSelect)
+
+   public JDBCPersistenceManager(DataSource ds, TransactionManager tm,
+         Properties sqlProperties, boolean createTablesOnStartup,
+         boolean usingBatchUpdates, boolean usingBinaryStream,
+         boolean usingTrailingByte, int maxParams, boolean supportsBlobSelect)
    {
       super(ds, tm, sqlProperties, createTablesOnStartup);
-      
-      //usingBatchUpdates is currently ignored due to sketchy support from databases
-      
+
+      // usingBatchUpdates is currently ignored due to sketchy support from
+      // databases
+
       this.usingBinaryStream = usingBinaryStream;
-      
+
       this.usingTrailingByte = usingTrailingByte;
-      
-      this.maxParams = maxParams;    
-         
+
+      this.maxParams = maxParams;
+
       this.supportsBlobSelect = supportsBlobSelect;
    }
-   
-   
+
    // MessagingComponent overrides ---------------------------------
-   
+
    public void start() throws Exception
    {
       super.start();
 
       Connection conn = null;
-      
+
       PreparedStatement ps = null;
 
       TransactionWrapper wrap = new TransactionWrapper();
 
       try
       {
-         conn = ds.getConnection();      
-         //JBossMessaging requires transaction isolation of READ_COMMITTED
-         //Any looser isolation level and we cannot maintain consistency for paging (HSQL)
+         conn = ds.getConnection();
+         // JBossMessaging requires transaction isolation of READ_COMMITTED
+         // Any looser isolation level and we cannot maintain consistency for
+         // paging (HSQL)
          if (conn.getTransactionIsolation() != Connection.TRANSACTION_READ_COMMITTED)
          {
             int level = conn.getTransactionIsolation();
 
-            String warn =
-               "\n\n" +
-               "JBoss Messaging Warning: DataSource connection transaction isolation should be READ_COMMITTED, but it is currently " + Util.transactionIsolationToString(level) + ".\n" +
-               "                         Using an isolation level less strict than READ_COMMITTED may lead to data consistency problems.\n" +
-               "                         Using an isolation level more strict than READ_COMMITTED may lead to deadlock.\n";
+            String warn = "\n\n"
+                  + "JBoss Messaging Warning: DataSource connection transaction isolation should be READ_COMMITTED, but it is currently "
+                  + Util.transactionIsolationToString(level)
+                  + ".\n"
+                  + "                         Using an isolation level less strict than READ_COMMITTED may lead to data consistency problems.\n"
+                  + "                         Using an isolation level more strict than READ_COMMITTED may lead to deadlock.\n";
             log.warn(warn);
          }
-         
+
          log.debug("Adding record on JBM_DUAL");
-         
-         //Now we need to insert a row in the DUAL table if it doesn't contain one already
+
+         // Now we need to insert a row in the DUAL table if it doesn't contain
+         // one already
          ps = conn.prepareStatement(this.getSQLStatement("INSERT_DUAL"));
-         
+
          try
          {
-         	int rows = ps.executeUpdate();
-         
-         	if (trace) { log.trace("Inserted " + rows + " rows into dual"); }
+            int rows = ps.executeUpdate();
+
+            if (trace)
+            {
+               log.trace("Inserted " + rows + " rows into dual");
+            }
          }
          catch (SQLException e)
          {
@@ -171,23 +181,31 @@
             log.debug("Checking for existance on JBM_DUAL");
 
             Statement selectCount = conn.createStatement();
-            ResultSet rset = selectCount.executeQuery(this.getSQLStatement("CHECK_DUAL"));
+            ResultSet rset = selectCount.executeQuery(this
+                  .getSQLStatement("CHECK_DUAL"));
             try
             {
-               // if JBM_DUAL is empty, and if an exception happened, we should warn!
-               // if JBM_DUAL has a line already, we don't care about the exception...
+               // if JBM_DUAL is empty, and if an exception happened, we should
+               // warn!
+               // if JBM_DUAL has a line already, we don't care about the
+               // exception...
                if (!rset.next())
                {
-                  log.debug("JBM_DUAL didn't have a record.. throwing exception", e);
+                  log
+                        .debug(
+                              "JBM_DUAL didn't have a record.. throwing exception",
+                              e);
                   throw e;
                }
 
-
-               // if there are two lines or more on JBM_DUAL, that is also a problem
+               // if there are two lines or more on JBM_DUAL, that is also a
+               // problem
                if (rset.next())
                {
-                  log.debug("duplicated record found on JBM_DUAL... throwing exception");
-                  throw new IllegalStateException("JBM_DUAL is missing a primary key as it allowed a duplicate value");
+                  log
+                        .debug("duplicated record found on JBM_DUAL... throwing exception");
+                  throw new IllegalStateException(
+                        "JBM_DUAL is missing a primary key as it allowed a duplicate value");
                }
             }
             finally
@@ -201,7 +219,7 @@
                {
                }
             }
-         }         
+         }
       }
       catch (Exception e)
       {
@@ -214,88 +232,93 @@
          closeConnection(conn);
          wrap.end();
       }
-         
+
       log.debug(this + " started");
    }
-   
+
    // Injection -------------------------------------------------
-   
+
    // This is only known by server peer so we inject it after startup
-   
+
    public void injectNodeID(int nodeID)
    {
-   	this.nodeID = nodeID;
-   	
-   	this.nodeIDSet = true;
-   }      
-   
+      this.nodeID = nodeID;
+
+      this.nodeIDSet = true;
+   }
+
    // PersistenceManager implementation -------------------------
-         
+
    // Related to XA Recovery
    // ======================
-   
-   public List getMessageChannelPairRefsForTx(long transactionId) throws Exception
+
+   public List getMessageChannelPairRefsForTx(long transactionId)
+         throws Exception
    {
       String sql = this.getSQLStatement("SELECT_MESSAGE_ID_FOR_REF");
       return getMessageChannelPair(sql, transactionId);
    }
-   
-   public List getMessageChannelPairAcksForTx(long transactionId) throws Exception
+
+   public List getMessageChannelPairAcksForTx(long transactionId)
+         throws Exception
    {
       String sql = this.getSQLStatement("SELECT_MESSAGE_ID_FOR_ACK");
       return getMessageChannelPair(sql, transactionId);
    }
-   
+
    public List retrievePreparedTransactions() throws Exception
    {
       if (!this.nodeIDSet)
       {
-      	//Sanity
-      	throw new IllegalStateException("Node id has not been set");
+         // Sanity
+         throw new IllegalStateException("Node id has not been set");
       }
-   	
-      /* Note the API change for 1.0.2 XA Recovery -- List now contains instances of PreparedTxInfo<TxId, Xid>
-       * instead of direct Xids [JPL] */
-      
+
+      /*
+       * Note the API change for 1.0.2 XA Recovery -- List now contains
+       * instances of PreparedTxInfo<TxId, Xid> instead of direct Xids [JPL]
+       */
+
       Connection conn = null;
       PreparedStatement st = null;
       ResultSet rs = null;
       PreparedTxInfo txInfo = null;
       TransactionWrapper wrap = new TransactionWrapper();
-      
+
       try
       {
-         List<PreparedTxInfo> transactions = new ArrayList<PreparedTxInfo> ();
-         
+         List<PreparedTxInfo> transactions = new ArrayList<PreparedTxInfo>();
+
          conn = ds.getConnection();
-         
-         st = conn.prepareStatement(getSQLStatement("SELECT_PREPARED_TRANSACTIONS"));
-         
+
+         st = conn
+               .prepareStatement(getSQLStatement("SELECT_PREPARED_TRANSACTIONS"));
+
          st.setInt(1, nodeID);
-         
+
          rs = st.executeQuery();
-         
+
          while (rs.next())
          {
-            //get the existing tx id --MK START
+            // get the existing tx id --MK START
             long txId = rs.getLong(1);
-            
+
             byte[] branchQual = getVarBinaryColumn(rs, 2);
-            
+
             int formatId = rs.getInt(3);
-            
+
             byte[] globalTxId = getVarBinaryColumn(rs, 4);
-            
+
             Xid xid = new MessagingXid(branchQual, formatId, globalTxId);
-            
+
             // create a tx info object with the result set detailsdetails
             txInfo = new PreparedTxInfo(txId, xid);
-            
+
             transactions.add(txInfo);
          }
-         
+
          return transactions;
-         
+
       }
       catch (Exception e)
       {
@@ -304,110 +327,134 @@
       }
       finally
       {
-      	closeResultSet(rs);
-      	closeStatement(st);
-      	closeConnection(conn);
+         closeResultSet(rs);
+         closeStatement(st);
+         closeConnection(conn);
          wrap.end();
       }
    }
-         
+
    // Related to counters
    // ===================
-   
-   public long reserveIDBlock(final String counterName, final int size) throws Exception
+
+   public long reserveIDBlock(final String counterName, final int size)
+         throws Exception
    {
-      if (trace) { log.trace("Getting ID block for counter " + counterName + ", size " + size); }
-      
-      if (size <= 0)
+      if (trace)
       {
-         throw new IllegalArgumentException("block size must be > 0");
+         log.trace("Getting ID block for counter " + counterName + ", size "
+               + size);
       }
-      
+
+      if (size <= 0) { throw new IllegalArgumentException(
+            "block size must be > 0"); }
+
       class ReserveIDBlockRunner extends JDBCTxRunner2<Long>
       {
-      	public Long doTransaction() throws Exception
-   		{
-            //	For the clustered case - this MUST use SELECT .. FOR UPDATE or a similar
-            //construct the locks the row
+         public Long doTransaction() throws Exception
+         {
+            // For the clustered case - this MUST use SELECT .. FOR UPDATE or a
+            // similar
+            // construct the locks the row
             String selectCounterSQL = getSQLStatement("SELECT_COUNTER");
-            
+
             PreparedStatement ps = null;
             ResultSet rs = null;
-            
+
             try
-            {            
-	            ps = conn.prepareStatement(selectCounterSQL);
-	
-	            ps.setString(1, counterName);
-	            
-	            rs = ps.executeQuery();
-	            
-	            if (trace) { log.trace(JDBCUtil.statementToString(selectCounterSQL, counterName)); }         
-	            
-	            if (!rs.next())
-	            {
-	               rs.close();
-	               rs = null;
-	               
-	               ps.close();
-	               
-	               //There is a very small possibility that two threads will attempt to insert the same counter
-	               //at the same time, if so, then the second one will fail eventually after a few retries by throwing
-	               //a primary key violation.
-	               
-	               String insertCounterSQL = getSQLStatement("INSERT_COUNTER");
-	               
-	               ps = conn.prepareStatement(insertCounterSQL);
-	               
-	               ps.setString(1, counterName);
-	               ps.setLong(2, size);
-	               
-	               int rows = ps.executeUpdate();
-	               if (trace) { log.trace(JDBCUtil.statementToString(insertCounterSQL, counterName, new Integer(size)) + " inserted " + rows + " rows"); }
-	               
-	               return 0L;
-	            }
-	            
-	            long nextId = rs.getLong(1);
-	            
-	            ps.close();
-	
-	            String updateCounterSQL = getSQLStatement("UPDATE_COUNTER");
-	
-	            ps = conn.prepareStatement(updateCounterSQL);
-	            
-	            ps.setLong(1, nextId + size);
-	            ps.setString(2, counterName);
+            {
+               ps = conn.prepareStatement(selectCounterSQL);
 
-	            int rows = ps.executeUpdate();
+               ps.setString(1, counterName);
 
-	            if (trace) { log.trace(JDBCUtil.statementToString(updateCounterSQL, new Long(nextId + size), counterName) + " updated " + rows + " rows"); }
+               rs = ps.executeQuery();
 
-	            return nextId;
+               if (trace)
+               {
+                  log.trace(JDBCUtil.statementToString(selectCounterSQL,
+                        counterName));
+               }
+
+               if (!rs.next())
+               {
+                  rs.close();
+                  rs = null;
+
+                  ps.close();
+
+                  // There is a very small possibility that two threads will
+                  // attempt to insert the same counter
+                  // at the same time, if so, then the second one will fail
+                  // eventually after a few retries by throwing
+                  // a primary key violation.
+
+                  String insertCounterSQL = getSQLStatement("INSERT_COUNTER");
+
+                  ps = conn.prepareStatement(insertCounterSQL);
+
+                  ps.setString(1, counterName);
+                  ps.setLong(2, size);
+
+                  int rows = ps.executeUpdate();
+                  if (trace)
+                  {
+                     log.trace(JDBCUtil.statementToString(insertCounterSQL,
+                           counterName, new Integer(size))
+                           + " inserted " + rows + " rows");
+                  }
+
+                  return 0L;
+               }
+
+               long nextId = rs.getLong(1);
+
+               ps.close();
+
+               String updateCounterSQL = getSQLStatement("UPDATE_COUNTER");
+
+               ps = conn.prepareStatement(updateCounterSQL);
+
+               ps.setLong(1, nextId + size);
+               ps.setString(2, counterName);
+
+               int rows = ps.executeUpdate();
+
+               if (trace)
+               {
+                  log.trace(JDBCUtil.statementToString(updateCounterSQL,
+                        new Long(nextId + size), counterName)
+                        + " updated " + rows + " rows");
+               }
+
+               return nextId;
             }
             finally
             {
-            	closeResultSet(rs);
-            	closeStatement(ps);
+               closeResultSet(rs);
+               closeStatement(ps);
             }
-   		}
+         }
       }
-      
+
       return new ReserveIDBlockRunner().executeWithRetry();
    }
-         
+
    /*
-    * Retrieve a List of messages corresponding to the specified List of message ids.
-    * The implementation here for HSQLDB does this by using a PreparedStatment with an IN clause
-    * with a maximum of 100 elements.
-    * If there are more than maxParams message to retrieve this is repeated a number of times.
-    * For "Enterprise" databases (Oracle, DB2, Sybase etc) a more sophisticated technique should be used
-    * e.g. Oracle ARRAY types in Oracle which can be submitted as a param to an Oracle prepared statement
+    * Retrieve a List of messages corresponding to the specified List of message
+    * ids. The implementation here for HSQLDB does this by using a
+    * PreparedStatment with an IN clause with a maximum of 100 elements. If
+    * there are more than maxParams message to retrieve this is repeated a
+    * number of times. For "Enterprise" databases (Oracle, DB2, Sybase etc) a
+    * more sophisticated technique should be used e.g. Oracle ARRAY types in
+    * Oracle which can be submitted as a param to an Oracle prepared statement
     * Although this would all be DB specific.
     */
    public List getMessages(final List messageIds) throws Exception
    {
-      if (trace) { log.trace("Getting batch of messages for " + messageIds); }
+      if (trace)
+      {
+         log.trace("Getting batch of messages for " + messageIds);
+      }
 
       class GetMessageListTX extends JDBCTxRunner2<List>
       {
@@ -431,8 +478,9 @@
                {
                   if (ps == null)
                   {
-                     //PreparedStatements are cached in the JCA layer so we will never actually have more than
-                     //100 distinct ones
+                     // PreparedStatements are cached in the JCA layer so we
+                     // will never actually have more than
+                     // 100 distinct ones
                      int numParams;
                      if (count < (size / maxParams) * maxParams)
                      {
@@ -442,8 +490,11 @@
                      {
                         numParams = size % maxParams;
                      }
-                     StringBuffer buff = new StringBuffer(getSQLStatement("LOAD_MESSAGES"));
-                     buff.append(" WHERE ").append(getSQLStatement("MESSAGE_ID_COLUMN")).append(" IN (");
+                     StringBuffer buff = new StringBuffer(
+                           getSQLStatement("LOAD_MESSAGES"));
+                     buff.append(" WHERE ").append(
+                           getSQLStatement("MESSAGE_ID_COLUMN"))
+                           .append(" IN (");
                      for (int i = 0; i < numParams; i++)
                      {
                         buff.append("?");
@@ -461,7 +512,7 @@
                      }
                   }
 
-                  long msgId = ((Long)iter.next()).longValue();
+                  long msgId = ((Long) iter.next()).longValue();
 
                   ps.setLong((count % maxParams) + 1, msgId);
 
@@ -491,8 +542,9 @@
 
                         byte type = rs.getByte(8);
 
-                        Message m = MessageFactory.createMessage(messageId, reliable, expiration, timestamp, priority,
-                                                                 headers, payload, type);
+                        Message m = MessageFactory.createMessage(messageId,
+                              reliable, expiration, timestamp, priority,
+                              headers, payload, type);
                         msgs.add(m);
                      }
 
@@ -504,7 +556,10 @@
                   }
                }
 
-               if (trace) { log.trace("Loaded " + msgs.size() + " messages in total"); }
+               if (trace)
+               {
+                  log.trace("Loaded " + msgs.size() + " messages in total");
+               }
 
                return msgs;
             }
@@ -520,270 +575,312 @@
          }
       }
 
-
       return new GetMessageListTX().executeWithRetry();
 
-      
    }
-   
-       
+
    // Related to paging functionality
-   // ===============================                 
-   
-   //Used to page NP messages or P messages in a non recoverable queue
-   
-   public void pageReferences(final long channelID, final List references, final boolean page) throws Exception
-   {  
-   	if (trace) { log.trace("Paging references in channel " + channelID + " refs " + references.size()); }
-   	
-   	class PageReferencesRunner extends JDBCTxRunner2
-      {	
-      	public Object doTransaction() throws Exception
-   		{
-      		PreparedStatement psInsertReference = null;
-      		PreparedStatement psInsertMessage = null;
+   // ===============================
+
+   // Used to page NP messages or P messages in a non recoverable queue
+
+   public void pageReferences(final long channelID, final List references,
+         final boolean page) throws Exception
+   {
+      if (trace)
+      {
+         log.trace("Paging references in channel " + channelID + " refs "
+               + references.size());
+      }
+
+      class PageReferencesRunner extends JDBCTxRunner2
+      {
+         public Object doTransaction() throws Exception
+         {
+            PreparedStatement psInsertReference = null;
+            PreparedStatement psInsertMessage = null;
             PreparedStatement psUpdateMessage = null;
 
             try
-      		{      		
-	      		Iterator iter = references.iterator();
-	
-	         	psInsertReference = conn.prepareStatement(getSQLStatement("INSERT_MESSAGE_REF"));
+            {
+               Iterator iter = references.iterator();
 
+               psInsertReference = conn
+                     .prepareStatement(getSQLStatement("INSERT_MESSAGE_REF"));
+
                if (supportsBlobSelect)
                {
-                  psInsertMessage = conn.prepareStatement(getSQLStatement("INSERT_MESSAGE_CONDITIONAL_FULL"));
+                  psInsertMessage = conn
+                        .prepareStatement(getSQLStatement("INSERT_MESSAGE_CONDITIONAL_FULL"));
                }
                else
                {
-                  psInsertMessage = conn.prepareStatement(getSQLStatement("INSERT_MESSAGE_CONDITIONAL"));
-                  psUpdateMessage = conn.prepareStatement(getSQLStatement("UPDATE_MESSAGE_4CONDITIONAL"));
+                  psInsertMessage = conn
+                        .prepareStatement(getSQLStatement("INSERT_MESSAGE_CONDITIONAL"));
+                  psUpdateMessage = conn
+                        .prepareStatement(getSQLStatement("UPDATE_MESSAGE_4CONDITIONAL"));
                }
-	
+
                while (iter.hasNext())
-	         	{
-	         		//We may need to persist the message itself 
-	         		MessageReference ref = (MessageReference) iter.next();
-	
-	         		//For non reliable refs we insert the ref (and maybe the message) itself
-	
-	         		//Now store the reference
-	
-	         		log.trace("Paged ref with page order " + ref.getPagingOrder());
-	
-	         		addReference(channelID, ref, psInsertReference, page);
-	
-	         		int rows = psInsertReference.executeUpdate();
-	
-	         		if (trace)
-	         		{
-	         			log.trace("Inserted " + rows + " rows");
-	         		}
-	
-	         		//Maybe we need to persist the message itself
-	         		Message m = ref.getMessage();
+               {
+                  // We may need to persist the message itself
+                  MessageReference ref = (MessageReference) iter.next();
 
+                  // For non reliable refs we insert the ref (and maybe the
+                  // message) itself
+
+                  // Now store the reference
+
+                  log
+                        .trace("Paged ref with page order "
+                              + ref.getPagingOrder());
+
+                  addReference(channelID, ref, psInsertReference, page);
+
+                  int rows = psInsertReference.executeUpdate();
+
+                  if (trace)
+                  {
+                     log.trace("Inserted " + rows + " rows");
+                  }
+
+                  // Maybe we need to persist the message itself
+                  Message m = ref.getMessage();
+
                   rows = storeMessage(m, psInsertMessage, psUpdateMessage);
 
-                  if (trace) { log.trace("Inserted " + rows + " rows"); }
-	         	} 
-	         	
-	         	return null;
-      		}
-      		finally
-      		{
-      			closeStatement(psInsertReference);
+                  if (trace)
+                  {
+                     log.trace("Inserted " + rows + " rows");
+                  }
+               }
+
+               return null;
+            }
+            finally
+            {
+               closeStatement(psInsertReference);
                closeStatement(psInsertMessage);
                closeStatement(psUpdateMessage);
-      		}
-   		}
+            }
+         }
       }
-   	
-   	new PageReferencesRunner().executeWithRetry();   	
+
+      new PageReferencesRunner().executeWithRetry();
    }
-         
-   //After loading paged refs this is used to remove any NP or P messages in a unrecoverable channel
-   public void removeDepagedReferences(final long channelID, final List references) throws Exception
-   {   	
-      if (trace) { log.trace(this + " Removing depaged " + references.size() + " refs from channel " + channelID); }
-      
+
+   // After loading paged refs this is used to remove any NP or P messages in a
+   // unrecoverable channel
+   public void removeDepagedReferences(final long channelID,
+         final List references) throws Exception
+   {
+      if (trace)
+      {
+         log.trace(this + " Removing depaged " + references.size()
+               + " refs from channel " + channelID);
+      }
+
       class RemoveDepagedReferencesRunner extends JDBCTxRunner2
       {
-      	public Object doTransaction() throws Exception
-   		{
-		      PreparedStatement psDeleteReference = null; 
-		      
-		      try
-		      {	
-		         psDeleteReference = conn.prepareStatement(getSQLStatement("DELETE_MESSAGE_REF"));
-		         
-		         Iterator iter = references.iterator();		         
-		
-		         while (iter.hasNext())
-		         {
-		            MessageReference ref = (MessageReference) iter.next();
-		                                                             
-		            removeReference(channelID, ref, psDeleteReference);
-		            
-		            int rows = psDeleteReference.executeUpdate();
-		            
-		            if (trace) { log.trace("Deleted " + rows + " references"); }
-		            
-		         }         
-		         
-		         return null;
-		      }		      
-		      finally
-		      {
-		      	closeStatement(psDeleteReference);       
-		      }      
-   		}
+         public Object doTransaction() throws Exception
+         {
+            PreparedStatement psDeleteReference = null;
+
+            try
+            {
+               psDeleteReference = conn
+                     .prepareStatement(getSQLStatement("DELETE_MESSAGE_REF"));
+
+               Iterator iter = references.iterator();
+
+               while (iter.hasNext())
+               {
+                  MessageReference ref = (MessageReference) iter.next();
+
+                  removeReference(channelID, ref, psDeleteReference);
+
+                  int rows = psDeleteReference.executeUpdate();
+
+                  if (trace)
+                  {
+                     log.trace("Deleted " + rows + " references");
+                  }
+
+               }
+
+               return null;
+            }
+            finally
+            {
+               closeStatement(psDeleteReference);
+            }
+         }
       }
-            
+
       new RemoveDepagedReferencesRunner().executeWithRetry();
-      
+
       deleteMessages(references);
-   }   
-     
+   }
+
    // After loading paged refs this is used to update P messages to non paged
-   public void updateReferencesNotPagedInRange(final long channelID, final long orderStart, final long orderEnd, final long num) throws Exception
+   public void updateReferencesNotPagedInRange(final long channelID,
+         final long orderStart, final long orderEnd, final long num)
+         throws Exception
    {
-      if (trace) { log.trace("Updating paged references for channel " + channelID + " between " + orderStart + " and " + orderEnd); }
-      
+      if (trace)
+      {
+         log.trace("Updating paged references for channel " + channelID
+               + " between " + orderStart + " and " + orderEnd);
+      }
+
       class UpdateReferencesNotPagedInRangeRunner extends JDBCTxRunner2
       {
-      	public Object doTransaction() throws Exception
-   		{
-             PreparedStatement ps = null;
-             
-             try
-             {              
-		         ps = conn.prepareStatement(getSQLStatement("UPDATE_REFS_NOT_PAGED"));
-		                 
-		         ps.setLong(1, orderStart);
-		         
-		         ps.setLong(2, orderEnd);
-		         
-		         ps.setLong(3, channelID);
-		         
-		         int rows = ps.executeUpdate();
-		           
-		         if (trace) { log.trace(JDBCUtil.statementToString(getSQLStatement("UPDATE_REFS_NOT_PAGED"), new Long(channelID),
-		                                new Long(orderStart), new Long(orderEnd)) + " updated " + rows + " rows"); }
-		
-		         //Sanity check
-		         if (rows != num)
-		         {
-		            throw new IllegalStateException("Did not update correct number of rows");
-		         }     
-		         
-		         return null;
-		      }
-		      finally
-		      {
-		      	closeStatement(ps);
-		      }
-   		}
+         public Object doTransaction() throws Exception
+         {
+            PreparedStatement ps = null;
+
+            try
+            {
+               ps = conn
+                     .prepareStatement(getSQLStatement("UPDATE_REFS_NOT_PAGED"));
+
+               ps.setLong(1, orderStart);
+
+               ps.setLong(2, orderEnd);
+
+               ps.setLong(3, channelID);
+
+               int rows = ps.executeUpdate();
+
+               if (trace)
+               {
+                  log.trace(JDBCUtil.statementToString(
+                        getSQLStatement("UPDATE_REFS_NOT_PAGED"), new Long(
+                              channelID), new Long(orderStart), new Long(
+                              orderEnd))
+                        + " updated " + rows + " rows");
+               }
+
+               // Sanity check
+               if (rows != num) { throw new IllegalStateException(
+                     "Did not update correct number of rows"); }
+
+               return null;
+            }
+            finally
+            {
+               closeStatement(ps);
+            }
+         }
       }
-      
+
       new UpdateReferencesNotPagedInRangeRunner().executeWithRetry();
    }
-   
-   public void updatePageOrder(final long channelID, final List references) throws Exception
+
+   public void updatePageOrder(final long channelID, final List references)
+         throws Exception
    {
-      if (trace) { log.trace("Updating page order for channel:" + channelID); }
-      
+      if (trace)
+      {
+         log.trace("Updating page order for channel:" + channelID);
+      }
+
       class UpdatePageOrderRunner extends JDBCTxRunner2
       {
-      	public Object doTransaction() throws Exception
-   		{      
-      		PreparedStatement psUpdateReference = null;
-      		try
-      		{
-		         Iterator iter = references.iterator();
-		         
-		         psUpdateReference = conn.prepareStatement(getSQLStatement("UPDATE_PAGE_ORDER"));
-		
-		         while (iter.hasNext())
-		         {
-		            MessageReference ref = (MessageReference) iter.next();
-		                 
-		            psUpdateReference.setLong(1, ref.getPagingOrder());
-		
-		            psUpdateReference.setLong(2, ref.getMessage().getMessageID());
-		            
-		            psUpdateReference.setLong(3, channelID);
-		            
-		            int rows = psUpdateReference.executeUpdate();
-		               
-		            if (trace) { log.trace("Updated " + rows + " rows"); }		            
-		         }
-		                     
-		         return null;
-		      }
-		      finally
-		      {
-		      	closeStatement(psUpdateReference);
-		      }    
-   		}
+         public Object doTransaction() throws Exception
+         {
+            PreparedStatement psUpdateReference = null;
+            try
+            {
+               Iterator iter = references.iterator();
+
+               psUpdateReference = conn
+                     .prepareStatement(getSQLStatement("UPDATE_PAGE_ORDER"));
+
+               while (iter.hasNext())
+               {
+                  MessageReference ref = (MessageReference) iter.next();
+
+                  psUpdateReference.setLong(1, ref.getPagingOrder());
+
+                  psUpdateReference.setLong(2, ref.getMessage().getMessageID());
+
+                  psUpdateReference.setLong(3, channelID);
+
+                  int rows = psUpdateReference.executeUpdate();
+
+                  if (trace)
+                  {
+                     log.trace("Updated " + rows + " rows");
+                  }
+               }
+
+               return null;
+            }
+            finally
+            {
+               closeStatement(psUpdateReference);
+            }
+         }
       }
-      
-      new UpdatePageOrderRunner().executeWithRetry();      
+
+      new UpdatePageOrderRunner().executeWithRetry();
    }
-      
-   public List getPagedReferenceInfos(final long channelID, final long orderStart, final int number) throws Exception
+
+   public List getPagedReferenceInfos(final long channelID,
+         final long orderStart, final int number) throws Exception
    {
-      if (trace) { log.trace("loading message reference info for channel " + channelID + " from " + orderStart + " number " + number);      }
-                 
+      if (trace)
+      {
+         log.trace("loading message reference info for channel " + channelID
+               + " from " + orderStart + " number " + number);
+      }
+
       List<ReferenceInfo> refs = new ArrayList<ReferenceInfo>();
-      
+
       Connection conn = null;
       PreparedStatement ps = null;
       ResultSet rs = null;
       TransactionWrapper wrap = new TransactionWrapper();
-      
+
       try
       {
          conn = ds.getConnection();
-         
+
          ps = conn.prepareStatement(getSQLStatement("LOAD_PAGED_REFS"));
-         
+
          ps.setLong(1, channelID);
-         
+
          ps.setLong(2, orderStart);
-         
+
          ps.setLong(3, orderStart + number - 1);
-         
+
          rs = ps.executeQuery();
-         
+
          long ord = orderStart;
-         
+
          while (rs.next())
          {
-            long msgId = rs.getLong(1);     
+            long msgId = rs.getLong(1);
             int deliveryCount = rs.getInt(2);
             int pageOrd = rs.getInt(3);
             long sched = rs.getLong(4);
-            
-            //Sanity check
-            if (pageOrd != ord)
-            {
-               throw new IllegalStateException("Unexpected pageOrd: " + pageOrd + " expected: " + ord);
-            }
-            
+
+            // Sanity check
+            if (pageOrd != ord) { throw new IllegalStateException(
+                  "Unexpected pageOrd: " + pageOrd + " expected: " + ord); }
+
             ReferenceInfo ri = new ReferenceInfo(msgId, deliveryCount, sched);
-            
+
             refs.add(ri);
             ord++;
          }
-         
-         //Sanity check
-         if (ord != orderStart + number)
-         {
-            throw new IllegalStateException("Didn't load expected number of references, loaded: " + (ord - orderStart) +
-                                            " expected: " + number);
-         }
-         
+
+         // Sanity check
+         if (ord != orderStart + number) { throw new IllegalStateException(
+               "Didn't load expected number of references, loaded: "
+                     + (ord - orderStart) + " expected: " + number); }
+
          return refs;
       }
       catch (Exception e)
@@ -793,89 +890,135 @@
       }
       finally
       {
-      	closeResultSet(rs);
-      	closeStatement(ps);
-      	closeConnection(conn);       
+         closeResultSet(rs);
+         closeStatement(ps);
+         closeConnection(conn);
          wrap.end();
-      }      
-   }   
-   
+      }
+   }
+
    /*
     * Load the initial, non paged refs
     */
-   public InitialLoadInfo loadFromStart(final long channelID, final int number) throws Exception
+   public InitialLoadInfo loadFromStart(final long channelID, final int number)
+         throws Exception
    {
-      if (trace) { log.trace("loading initial reference infos for channel " + channelID);  }
-                    
+      if (trace)
+      {
+         log.trace("loading initial reference infos for channel " + channelID);
+      }
+
       Connection conn = null;
       PreparedStatement ps = null;
       ResultSet rs = null;
       TransactionWrapper wrap = new TransactionWrapper();
-                                
+
       try
       {
-         conn = ds.getConnection();         
-         
-         //First we get the values for min() and max() page order
+         conn = ds.getConnection();
+
+         // First we get the values for min() and max() page order
          ps = conn.prepareStatement(getSQLStatement("SELECT_MIN_MAX_PAGE_ORD"));
-         
+
          ps.setLong(1, channelID);
-         
+
          rs = ps.executeQuery();
-                  
+
          rs.next();
-         
+
          Long minOrdering = new Long(rs.getLong(1));
-         
+
          if (rs.wasNull())
          {
             minOrdering = null;
          }
-         
+
          Long maxOrdering = new Long(rs.getLong(2));
-         
+
          if (rs.wasNull())
          {
             maxOrdering = null;
          }
-         
+
          ps.close();
-         
-         ps = null;
-         
+
          ps = conn.prepareStatement(getSQLStatement("LOAD_UNPAGED_REFS"));
-         
+
          ps.setLong(1, channelID);
-                 
+
          rs = ps.executeQuery();
-         
+
          List<ReferenceInfo> refs = new ArrayList<ReferenceInfo>();
-         
+
+         List<ReferenceInfo> refsToUpdate = new ArrayList<ReferenceInfo>();
+
          int count = 0;
          while (rs.next())
          {
-            long msgId = rs.getLong(1);            
+            long msgId = rs.getLong(1);
             int deliveryCount = rs.getInt(2);
             long sched = rs.getLong(3);
-            
+
             ReferenceInfo ri = new ReferenceInfo(msgId, deliveryCount, sched);
-            
+
             if (count < number)
             {
                refs.add(ri);
-            }            
-            
+            }
+            else
+            {
+               refsToUpdate.add(ri);
+            }
+
             count++;
          }
-                  
-         //No refs paged
-            
-         if (count > number)
+
+         // No refs paged
+
+         if (!refsToUpdate.isEmpty())
          {
-            throw new IllegalStateException("Cannot load channel " + channelID + " since the fullSize parameter is too small to load " +
-                     " all the required references, fullSize needs to be at least " + count + " it is currently " + number);
+            // Take any overflow and convert them to paged refs
+
+            ps.close();
+            ps = conn.prepareStatement(getSQLStatement("UPDATE_PAGE_ORDER"));
+
+            Iterator<ReferenceInfo> iter = refsToUpdate.iterator();
+
+            long ordering = 0;
+
+            if (maxOrdering != null)
+            {
+               ordering = maxOrdering.longValue() + 1;
+            }
+
+            while (iter.hasNext())
+            {
+               ReferenceInfo ri = (ReferenceInfo) iter.next();
+
+               ps.setLong(1, ordering);
+
+               ps.setLong(2, ri.getMessageId());
+
+               ps.setLong(3, channelID);
+
+               int rows = ps.executeUpdate();
+
+               if (trace)
+               {
+                  log.trace("Updated " + rows + " rows");
+               }
+
+               ordering++;
+            }
+
+            if (minOrdering == null)
+            {
+               minOrdering = new Long(0);
+            }
+
+            maxOrdering = new Long(ordering - 1);
          }
-                         
+
          return new InitialLoadInfo(minOrdering, maxOrdering, refs);
       }
       catch (Exception e)
@@ -885,379 +1028,429 @@
       }
       finally
       {
-      	closeResultSet(rs);
-      	closeStatement(ps);      	
-      	closeConnection(conn);       
+         closeResultSet(rs);
+         closeStatement(ps);
+         closeConnection(conn);
          wrap.end();
-      }      
-   }   
-   
-   
+      }
+   }
+
    // Merging functionality
    // --------------------
-   
-   public void mergeTransactions(final long fromChannelID, final long toChannelID) throws Exception
+
+   public void mergeTransactions(final long fromChannelID,
+         final long toChannelID) throws Exception
    {
-      if (trace) { log.trace("Merging transactions from channel " + fromChannelID + " to " + toChannelID); }
-      
-      // Sanity check
-      
-      if (fromChannelID == toChannelID)
+      if (trace)
       {
-      	throw new IllegalArgumentException("Cannot merge transactions - they have the same channel id!!");
+         log.trace("Merging transactions from channel " + fromChannelID
+               + " to " + toChannelID);
       }
-      
+
+      // Sanity check
+
+      if (fromChannelID == toChannelID) { throw new IllegalArgumentException(
+            "Cannot merge transactions - they have the same channel id!!"); }
+
       class MergeTransactionsRunner extends JDBCTxRunner2
       {
-      	public Object doTransaction() throws Exception
-   		{
-		      PreparedStatement statement = null;
-		      try
-		      {
-		         statement = conn.prepareStatement(getSQLStatement("UPDATE_TX"));
-		         statement.setLong(1, toChannelID);
-		         statement.setLong(2, fromChannelID);
-		         int affected = statement.executeUpdate();
-		
-		         log.debug("Merged " + affected + " transactions from channel " + fromChannelID + " into node " + toChannelID);
-		         
-		         return null;
-		      }
-		      finally
-		      {
-		         closeStatement(statement);
-		      }
-   		}
+         public Object doTransaction() throws Exception
+         {
+            PreparedStatement statement = null;
+            try
+            {
+               statement = conn.prepareStatement(getSQLStatement("UPDATE_TX"));
+               statement.setLong(1, toChannelID);
+               statement.setLong(2, fromChannelID);
+               int affected = statement.executeUpdate();
+
+               log.debug("Merged " + affected + " transactions from channel "
+                     + fromChannelID + " into node " + toChannelID);
+
+               return null;
+            }
+            finally
+            {
+               closeStatement(statement);
+            }
+         }
       }
-      
+
       new MergeTransactionsRunner().executeWithRetry();
    }
-   
-   public InitialLoadInfo mergeAndLoad(final long fromChannelID, final long toChannelID, final int numberToLoad, final long firstPagingOrder, final long nextPagingOrder) throws Exception
+
+   public InitialLoadInfo mergeAndLoad(final long fromChannelID,
+         final long toChannelID, final int numberToLoad,
+         final long firstPagingOrder, final long nextPagingOrder)
+         throws Exception
    {
-      if (trace) { log.trace("Merging channel from " + fromChannelID + " to " + toChannelID + " numberToLoad:" + numberToLoad + " firstPagingOrder:" + firstPagingOrder + " nextPagingOrder:" + nextPagingOrder); }
-      
-      //Sanity
-      
-      if (fromChannelID == toChannelID)
+      if (trace)
       {
-      	throw new IllegalArgumentException("Cannot merge queues - they have the same channel id!!");
+         log.trace("Merging channel from " + fromChannelID + " to "
+               + toChannelID + " numberToLoad:" + numberToLoad
+               + " firstPagingOrder:" + firstPagingOrder + " nextPagingOrder:"
+               + nextPagingOrder);
       }
 
+      // Sanity
+
+      if (fromChannelID == toChannelID) { throw new IllegalArgumentException(
+            "Cannot merge queues - they have the same channel id!!"); }
+
       class MergeAndLoadRunner extends JDBCTxRunner2
       {
-      	public Object doTransaction() throws Exception
-   		{      
-		      PreparedStatement ps = null;
-		      ResultSet rs = null;      
-		      PreparedStatement ps2 = null;
-		    
-		      try
-		      {
-		         /*
-		          * If channel is paging and has full size f
-		          * 
-		          * then we don't need to load any refs but we need to:
-		          * 
-		          * make sure the page ord is correct across the old paged and new refs
-		          * 
-		          * we know the max page ord (from the channel) for the old refs so we just need to:
-		          * 
-		          * 1) Iterate through the failed channel and update page_ord = max + 1, max + 2 etc
-		          * 
-		          * 2) update channel id
-		          * 
-		          * 
-		          * If channel is not paging and the total refs before and after <=f
-		          * 
-		          * 1) Load all refs from failed channel
-		          * 
-		          * 2) Update channel id
-		          * 
-		          * return those refs
-		          * 
-		          * 
-		          * If channel is not paging but total new refs > f
-		          * 
-		          * 1) Iterate through failed channel refs and take the first x to make the channel full
-		          * 
-		          * 2) Update the others with page_ord starting at zero 
-		          * 
-		          * 3) Update channel id
-		          * 
-		          * In general:
-		          * 
-		          * We have number to load n, max page size p
-		          * 
-		          * 1) Iterate through failed channel refs in page_ord order
-		          * 
-		          * 2) Put the first n in a List.
-		          * 
-		          * 3) Initialise page_ord_count to be p or 0 depending on whether it was specified
-		          * 
-		          * 4) Update the page_ord of the remaining refs accordiningly
-		          * 
-		          * 5) Update the channel id
-		          * 
-		          */
-		         
-		         //First load the refs from the failed channel
-		
-		         List<ReferenceInfo> refs = new ArrayList<ReferenceInfo>();
-		         
-		         ps = conn.prepareStatement(getSQLStatement("LOAD_REFS"));
-		         
-		         ps.setLong(1, fromChannelID);
-		                 
-		         rs = ps.executeQuery();
-		         
-		         int count = 0;
-		         
-		         boolean arePaged = false;
-		         
-		         long pageOrd = nextPagingOrder;
-		         
-		         while (rs.next())
-		         {
-		            long msgId = rs.getLong(1);            
-		            int deliveryCount = rs.getInt(2);
-		            long sched = rs.getLong(3);
-		            
-		            if (count < numberToLoad)
-		            {           
-		               ReferenceInfo ri = new ReferenceInfo(msgId, deliveryCount, sched);
-		               
-		               refs.add(ri);
-		            }
-		            
-		            // Set page ord
-		            
-		            if (ps2 == null)
-		            {
-		               ps2 = conn.prepareStatement(getSQLStatement("UPDATE_PAGE_ORDER"));
-		            }
-		                
-		            if (count < numberToLoad)
-		            {
-		               ps2.setNull(1, Types.BIGINT);
-		               
-		               if (trace) { log.trace("Set page ord to null"); }
-		            }
-		            else
-		            {                                 
-		               ps2.setLong(1, pageOrd);
-		               
-		               if (trace) { log.trace("Set page ord to " + pageOrd); }
-		               
-		               arePaged = true; 
-		               
-		               pageOrd++;                      
-		            }
-		            
-		            ps2.setLong(2, msgId);
-		            
-		            ps2.setLong(3, fromChannelID);
-		            
-		            int rows = ps2.executeUpdate();
-		            
-		            if (trace) { log.trace("Update page ord updated " + rows + " rows"); }
-		
-		            count++;            
-		         }
-		         
-		         ps.close();
-		         
-		         ps = null;
-		         
-		         // Now swap the channel id
-		         
-		         ps = conn.prepareStatement(getSQLStatement("UPDATE_CHANNEL_ID"));
-		         
-		         ps.setLong(1, toChannelID);
-		         
-		         ps.setLong(2, fromChannelID);
-		         
-		         int rows = ps.executeUpdate();
-		         
-		         if (trace) { log.trace("Update channel id updated " + rows + " rows"); }
-		                           
-		         if (arePaged)
-		         {            
-		            return new InitialLoadInfo(new Long(firstPagingOrder), new Long(pageOrd - 1), refs);
-		         }
-		         else
-		         {
-		            return new InitialLoadInfo(null, null, refs);
-		         }         
-		      }
-		      finally
-		      {      
-		         closeResultSet(rs);
-		      	closeStatement(ps);
-		      	closeStatement(ps2);
-		      }
-   		}
+         public Object doTransaction() throws Exception
+         {
+            PreparedStatement ps = null;
+            ResultSet rs = null;
+            PreparedStatement ps2 = null;
+
+            try
+            {
+               /*
+                * If channel is paging and has full size f
+                * 
+                * then we don't need to load any refs but we need to:
+                * 
+                * make sure the page ord is correct across the old paged and new
+                * refs
+                * 
+                * we know the max page ord (from the channel) for the old refs
+                * so we just need to:
+                * 
+                * 1) Iterate through the failed channel and update page_ord =
+                * max + 1, max + 2 etc
+                * 
+                * 2) update channel id
+                * 
+                * 
+                * If channel is not paging and the total refs before and after
+                * <=f
+                * 
+                * 1) Load all refs from failed channel
+                * 
+                * 2) Update channel id
+                * 
+                * return those refs
+                * 
+                * 
+                * If channel is not paging but total new refs > f
+                * 
+                * 1) Iterate through failed channel refs and take the first x to
+                * make the channel full
+                * 
+                * 2) Update the others with page_ord starting at zero
+                * 
+                * 3) Update channel id
+                * 
+                * In general:
+                * 
+                * We have number to load n, max page size p
+                * 
+                * 1) Iterate through failed channel refs in page_ord order
+                * 
+                * 2) Put the first n in a List.
+                * 
+                * 3) Initialise page_ord_count to be p or 0 depending on whether
+                * it was specified
+                * 
+                * 4) Update the page_ord of the remaining refs accordiningly
+                * 
+                * 5) Update the channel id
+                * 
+                */
+
+               // First load the refs from the failed channel
+               List<ReferenceInfo> refs = new ArrayList<ReferenceInfo>();
+
+               ps = conn.prepareStatement(getSQLStatement("LOAD_REFS"));
+
+               ps.setLong(1, fromChannelID);
+
+               rs = ps.executeQuery();
+
+               int count = 0;
+
+               boolean arePaged = false;
+
+               long pageOrd = nextPagingOrder;
+
+               while (rs.next())
+               {
+                  long msgId = rs.getLong(1);
+                  int deliveryCount = rs.getInt(2);
+                  long sched = rs.getLong(3);
+
+                  if (count < numberToLoad)
+                  {
+                     ReferenceInfo ri = new ReferenceInfo(msgId, deliveryCount,
+                           sched);
+
+                     refs.add(ri);
+                  }
+
+                  // Set page ord
+
+                  if (ps2 == null)
+                  {
+                     ps2 = conn
+                           .prepareStatement(getSQLStatement("UPDATE_PAGE_ORDER"));
+                  }
+
+                  if (count < numberToLoad)
+                  {
+                     ps2.setNull(1, Types.BIGINT);
+
+                     if (trace)
+                     {
+                        log.trace("Set page ord to null");
+                     }
+                  }
+                  else
+                  {
+                     ps2.setLong(1, pageOrd);
+
+                     if (trace)
+                     {
+                        log.trace("Set page ord to " + pageOrd);
+                     }
+
+                     arePaged = true;
+
+                     pageOrd++;
+                  }
+
+                  ps2.setLong(2, msgId);
+
+                  ps2.setLong(3, fromChannelID);
+
+                  int rows = ps2.executeUpdate();
+
+                  if (trace)
+                  {
+                     log.trace("Update page ord updated " + rows + " rows");
+                  }
+
+                  count++;
+               }
+
+               ps.close();
+
+               ps = null;
+
+               // Now swap the channel id
+
+               ps = conn.prepareStatement(getSQLStatement("UPDATE_CHANNEL_ID"));
+
+               ps.setLong(1, toChannelID);
+
+               ps.setLong(2, fromChannelID);
+
+               int rows = ps.executeUpdate();
+
+               if (trace)
+               {
+                  log.trace("Update channel id updated " + rows + " rows");
+               }
+
+               if (arePaged)
+               {
+                  return new InitialLoadInfo(new Long(firstPagingOrder),
+                        new Long(pageOrd - 1), refs);
+               }
+               else
+               {
+                  return new InitialLoadInfo(null, null, refs);
+               }
+            }
+            finally
+            {
+               closeResultSet(rs);
+               closeStatement(ps);
+               closeStatement(ps2);
+            }
+         }
       }
-      return (InitialLoadInfo)new MergeAndLoadRunner().executeWithRetry();      
+      return (InitialLoadInfo) new MergeAndLoadRunner().executeWithRetry();
    }
-   
+
    public void testSpeed() throws Exception
    {
-   	
+
    }
-   
+
    // End of paging functionality
    // ===========================
-   
-   public void addReference(final long channelID, final MessageReference ref, final Transaction tx) throws Exception
-   {         	
-   	class AddReferenceRunner extends JDBCTxRunner2
-   	{
-			public Object doTransaction() throws Exception
-			{
-	         PreparedStatement psReference = null;
+
+   public void addReference(final long channelID, final MessageReference ref,
+         final Transaction tx) throws Exception
+   {
+      class AddReferenceRunner extends JDBCTxRunner2
+      {
+         public Object doTransaction() throws Exception
+         {
+            PreparedStatement psReference = null;
             PreparedStatement psInsertMessage = null;
 
-	         Message m = ref.getMessage();     
-	         
-	         try
-	         {                          
-	            psReference = conn.prepareStatement(getSQLStatement("INSERT_MESSAGE_REF"));
-	            
-	            // Add the reference
-	            addReference(channelID, ref, psReference, false);
-	            
-	            int rows = psReference.executeUpdate();  
-	            
-	            if (trace) { log.trace("Inserted " + rows + " rows"); }
-	              
-	            if (!m.isPersisted())
-	            {
-	               // First time so persist the message
-                  psInsertMessage = conn.prepareStatement(getSQLStatement("INSERT_MESSAGE"));
+            Message m = ref.getMessage();
 
+            try
+            {
+               psReference = conn
+                     .prepareStatement(getSQLStatement("INSERT_MESSAGE_REF"));
+
+               // Add the reference
+               addReference(channelID, ref, psReference, false);
+
+               int rows = psReference.executeUpdate();
+
+               if (trace)
+               {
+                  log.trace("Inserted " + rows + " rows");
+               }
+
+               if (!m.isPersisted())
+               {
+                  // First time so persist the message
+                  psInsertMessage = conn
+                        .prepareStatement(getSQLStatement("INSERT_MESSAGE"));
+
                   storeMessage(m, psInsertMessage, true);
                   rows = psInsertMessage.executeUpdate();
-		            
-		            if (trace) { log.trace("Inserted/updated " + rows + " rows"); }     
-		            
-		            log.trace("message Inserted/updated " + rows + " rows");  
-		            
-		            //Needs to be at the end - in case an exception is thrown in which case retry will be attempted and we want to insert it again
-	               m.setPersisted(true);	                 
-	            }
-	            
-	            return null;
-	         }
-	         finally
-	         {
-	         	closeStatement(psReference);
+
+                  if (trace)
+                  {
+                     log.trace("Inserted/updated " + rows + " rows");
+                  }
+
+                  log.trace("message Inserted/updated " + rows + " rows");
+
+                  // Needs to be at the end - in case an exception is thrown in
+                  // which case retry will be attempted and we want to insert it
+                  // again
+                  m.setPersisted(true);
+               }
+
+               return null;
+            }
+            finally
+            {
+               closeStatement(psReference);
                closeStatement(psInsertMessage);
-	         }
-			}   		
-   	}
-   	   	
+            }
+         }
+      }
+
       if (tx != null)
       {
-         //In a tx so we just add the ref in the tx in memory for now
+         // In a tx so we just add the ref in the tx in memory for now
          TransactionCallback callback = getCallback(tx);
 
          callback.addReferenceToAdd(channelID, ref);
       }
       else
-      {         
-         //No tx so add the ref directly in the db
-         new AddReferenceRunner().executeWithRetry();         
+      {
+         // No tx so add the ref directly in the db
+         new AddReferenceRunner().executeWithRetry();
       }
    }
-   
-   public void updateDeliveryCount(final long channelID, final MessageReference ref) throws Exception
+
+   public void updateDeliveryCount(final long channelID,
+         final MessageReference ref) throws Exception
    {
-   	class UpdateDeliveryCountRunner extends JDBCTxRunner2
-   	{   		
-			public Object doTransaction() throws Exception
-			{   	
-		      PreparedStatement psReference = null;
-		       
-		      try
-		      {                                    
-		         psReference = conn.prepareStatement(getSQLStatement("UPDATE_DELIVERY_COUNT"));
-		         
-		         psReference.setInt(1, ref.getDeliveryCount());
-		         
-		         psReference.setLong(2, channelID);
-		         
-		         psReference.setLong(3, ref.getMessage().getMessageID());
-		         
-		         int rows = psReference.executeUpdate();
-		
-		         if (trace) { log.trace("Updated " + rows + " rows"); }
-		         
-		         return null;
-		      }
-		      finally
-		      {
-		      	closeStatement(psReference);                        
-		      }  
-			}
-   	}
-   	
-   	new UpdateDeliveryCountRunner().executeWithRetry();
+      class UpdateDeliveryCountRunner extends JDBCTxRunner2
+      {
+         public Object doTransaction() throws Exception
+         {
+            PreparedStatement psReference = null;
+
+            try
+            {
+               psReference = conn
+                     .prepareStatement(getSQLStatement("UPDATE_DELIVERY_COUNT"));
+
+               psReference.setInt(1, ref.getDeliveryCount());
+
+               psReference.setLong(2, channelID);
+
+               psReference.setLong(3, ref.getMessage().getMessageID());
+
+               int rows = psReference.executeUpdate();
+
+               if (trace)
+               {
+                  log.trace("Updated " + rows + " rows");
+               }
+
+               return null;
+            }
+            finally
+            {
+               closeStatement(psReference);
+            }
+         }
+      }
+
+      new UpdateDeliveryCountRunner().executeWithRetry();
    }
-   
-   public void removeReference(final long channelID, final MessageReference ref, final Transaction tx) throws Exception
-   {      
-   	class RemoveReferenceRunner extends JDBCTxRunner2
-   	{
-			public Object doTransaction() throws Exception
-			{  
-				PreparedStatement psReference = null;
-				     
-	         try
-	         {
-	            psReference = conn.prepareStatement(getSQLStatement("DELETE_MESSAGE_REF"));	           
-	            
-	            //Remove the message reference
-	            removeReference(channelID, ref, psReference);
-	             
-	            int rows = psReference.executeUpdate();
-	            
-	            if (rows != 1)
-	            {
-	               log.warn("Failed to remove row for: " + ref);
-	               return null;
-	            }
-	            
-	            if (trace) { log.trace("Deleted " + rows + " references"); }
-	                  	            
-	            return null;
-	         }
-	         finally
-	         {
-	         	closeStatement(psReference);
-	         }  
-			}
-   	}
-           
+
+   public void removeReference(final long channelID,
+         final MessageReference ref, final Transaction tx) throws Exception
+   {
+      class RemoveReferenceRunner extends JDBCTxRunner2
+      {
+         public Object doTransaction() throws Exception
+         {
+            PreparedStatement psReference = null;
+
+            try
+            {
+               psReference = conn
+                     .prepareStatement(getSQLStatement("DELETE_MESSAGE_REF"));
+
+               // Remove the message reference
+               removeReference(channelID, ref, psReference);
+
+               int rows = psReference.executeUpdate();
+
+               if (rows != 1)
+               {
+                  log.warn("Failed to remove row for: " + ref);
+                  return null;
+               }
+
+               if (trace)
+               {
+                  log.trace("Deleted " + rows + " references");
+               }
+
+               return null;
+            }
+            finally
+            {
+               closeStatement(psReference);
+            }
+         }
+      }
+
       if (tx != null)
       {
-         //In a tx so we just add the ref in the tx in memory for now
+         // In a tx so we just add the ref in the tx in memory for now
 
          TransactionCallback callback = getCallback(tx);
 
          callback.addReferenceToRemove(channelID, ref);
       }
       else
-      {         
-         //No tx so we remove the reference directly from the db
+      {
+         // No tx so we remove the reference directly from the db
 
          new RemoveReferenceRunner().executeWithRetry();
 
          deleteMessage(ref.getMessage().getMessageID());
       }
    }
-   
-   
+
    public boolean referenceExists(long messageID) throws Exception
    {
       Connection conn = null;
@@ -1269,7 +1462,8 @@
       {
          conn = ds.getConnection();
 
-         st = conn.prepareStatement(getSQLStatement("SELECT_EXISTS_REF_MESSAGE_ID"));
+         st = conn
+               .prepareStatement(getSQLStatement("SELECT_EXISTS_REF_MESSAGE_ID"));
          st.setLong(1, messageID);
 
          rs = st.executeQuery();
@@ -1290,24 +1484,24 @@
       }
       finally
       {
-      	closeResultSet(rs);
-      	closeStatement(st);
-      	closeConnection(conn);
+         closeResultSet(rs);
+         closeStatement(st);
+         closeConnection(conn);
          wrap.end();
       }
    }
 
    // Public --------------------------------------------------------
-   
+
    public String toString()
    {
       return "JDBCPersistenceManager[" + Integer.toHexString(hashCode()) + "]";
-   }   
-   
+   }
+
    // Package protected ---------------------------------------------
-   
+
    // Protected -----------------------------------------------------
-      
+
    protected TransactionCallback getCallback(Transaction tx)
    {
       TransactionCallback callback = (TransactionCallback) tx.getCallback(this);
@@ -1321,332 +1515,393 @@
 
       return callback;
    }
-   
-   protected void handleBeforeCommit1PC(final List refsToAdd, final List refsToRemove, final Transaction tx)
-      throws Exception
+
+   protected void handleBeforeCommit1PC(final List refsToAdd,
+         final List refsToRemove, final Transaction tx) throws Exception
    {
-   	class HandleBeforeCommit1PCRunner extends JDBCTxRunner2
-   	{
-			public Object doTransaction() throws Exception
-			{
-				// For one phase we simply add rows corresponding to the refs and remove rows corresponding to
-		      // the deliveries in one jdbc tx. We also need to store messages as necessary,
-		      // depending on whether they've already been stored or still referenced by other channels.
-		         
-		      PreparedStatement psReference = null;
-		      PreparedStatement psInsertMessage = null;
+      class HandleBeforeCommit1PCRunner extends JDBCTxRunner2
+      {
+         public Object doTransaction() throws Exception
+         {
+            // For one phase we simply add rows corresponding to the refs and
+            // remove rows corresponding to
+            // the deliveries in one jdbc tx. We also need to store messages as
+            // necessary,
+            // depending on whether they've already been stored or still
+            // referenced by other channels.
+
+            PreparedStatement psReference = null;
+            PreparedStatement psInsertMessage = null;
             PreparedStatement psDeleteReference = null;
-      
-		      List<Message> messagesStored = new ArrayList<Message>();
 
-		      try
-		      {
-		         // First the adds
+            List<Message> messagesStored = new ArrayList<Message>();
 
-		         for (Iterator i = refsToAdd.iterator(); i.hasNext(); )
-		         {
-		            ChannelRefPair pair = (ChannelRefPair)i.next();
-		            MessageReference ref = pair.ref;
-		                                                
-		            psReference = conn.prepareStatement(getSQLStatement("INSERT_MESSAGE_REF"));
-		            
-		            // Now store the reference
-		            addReference(pair.channelID, ref, psReference, false);
-		              
-		            int rows = psReference.executeUpdate();
-		               
-		            if (trace) { log.trace("Inserted " + rows + " rows"); }                              
-     
-		            Message m = ref.getMessage();    
-		            
-		            synchronized (m)
-		            {            
-			            if (!m.isPersisted())
-			            {   
-			            	if (psInsertMessage == null)
-			            	{
-                           psInsertMessage = conn.prepareStatement(getSQLStatement("INSERT_MESSAGE"));
-			            	}
-			            	
-			               // First time so add message
-                        // And in case of clustered queues/topics, the message could possibly be already persisted on the different node
+            try
+            {
+               // First the adds
+
+               for (Iterator i = refsToAdd.iterator(); i.hasNext();)
+               {
+                  ChannelRefPair pair = (ChannelRefPair) i.next();
+                  MessageReference ref = pair.ref;
+
+                  psReference = conn
+                        .prepareStatement(getSQLStatement("INSERT_MESSAGE_REF"));
+
+                  // Now store the reference
+                  addReference(pair.channelID, ref, psReference, false);
+
+                  int rows = psReference.executeUpdate();
+
+                  if (trace)
+                  {
+                     log.trace("Inserted " + rows + " rows");
+                  }
+
+                  Message m = ref.getMessage();
+
+                  synchronized (m)
+                  {
+                     if (!m.isPersisted())
+                     {
+                        if (psInsertMessage == null)
+                        {
+                           psInsertMessage = conn
+                                 .prepareStatement(getSQLStatement("INSERT_MESSAGE"));
+                        }
+
+                        // First time so add message
+                        // And in case of clustered queues/topics, the message
+                        // could possibly be already persisted on the different
+                        // node
                         // so we persist also using the Conditional Update
-                        if (trace) { log.trace("Message does not already exist so inserting it"); }
+                        if (trace)
+                        {
+                           log
+                                 .trace("Message does not already exist so inserting it");
+                        }
                         storeMessage(m, psInsertMessage, true);
                         rows = psInsertMessage.executeUpdate();
-                        if (trace) { log.trace("Inserted " + rows + " rows"); }
+                        if (trace)
+                        {
+                           log.trace("Inserted " + rows + " rows");
+                        }
 
-			               m.setPersisted(true);
-			               
-			               messagesStored.add(m);
-			            }
-		            }
-		         }         
-		         
-		         // Now the removes
+                        m.setPersisted(true);
 
-		         for (Iterator i = refsToRemove.iterator(); i.hasNext(); )
-		         {
-		            ChannelRefPair pair = (ChannelRefPair)i.next();
-		            
-		            if (psDeleteReference == null)
-		            {
-		            	psDeleteReference = conn.prepareStatement(getSQLStatement("DELETE_MESSAGE_REF"));
-		            }
+                        messagesStored.add(m);
+                     }
+                  }
+               }
 
-		            removeReference(pair.channelID, pair.ref, psDeleteReference);
-		                        
-		            int rows = psDeleteReference.executeUpdate();
-		            
-		            if (trace) { log.trace("Deleted " + rows + " references"); }
-		            
-		         }
-		         
-		         return null;
-		      }
-		      catch (Exception e)
-		      {
-		      	for (Iterator i = messagesStored.iterator(); i.hasNext(); )
-		         {
-		      		Message msg = (Message)i.next();
-		      		
-		      		msg.setPersisted(false);
-		         }
-		      	throw e;
-		      }
-		      finally
-		      {
+               // Now the removes
+
+               for (Iterator i = refsToRemove.iterator(); i.hasNext();)
+               {
+                  ChannelRefPair pair = (ChannelRefPair) i.next();
+
+                  if (psDeleteReference == null)
+                  {
+                     psDeleteReference = conn
+                           .prepareStatement(getSQLStatement("DELETE_MESSAGE_REF"));
+                  }
+
+                  removeReference(pair.channelID, pair.ref, psDeleteReference);
+
+                  int rows = psDeleteReference.executeUpdate();
+
+                  if (trace)
+                  {
+                     log.trace("Deleted " + rows + " references");
+                  }
+
+               }
+
+               return null;
+            }
+            catch (Exception e)
+            {
+               for (Iterator i = messagesStored.iterator(); i.hasNext();)
+               {
+                  Message msg = (Message) i.next();
+
+                  msg.setPersisted(false);
+               }
+               throw e;
+            }
+            finally
+            {
                closeStatement(psReference);
-		      	closeStatement(psDeleteReference);
-		      	closeStatement(psInsertMessage);
-		      }
-			}   		
-   	}   	      
-      
+               closeStatement(psDeleteReference);
+               closeStatement(psInsertMessage);
+            }
+         }
+      }
+
       new HandleBeforeCommit1PCRunner().executeWithRetry();
-      
+
       this.deleteMessages(refsToRemove);
    }
-   
-   protected void handleBeforeCommit2PC(final List refsToRemove, final Transaction tx) throws Exception
-   {          
-   	class HandleBeforeCommit2PCRunner extends JDBCTxRunner2
-   	{
-   		public Object doTransaction() throws Exception
-			{				   	
-		   	PreparedStatement ps = null;
-		      
-		      if (trace) { log.trace(this + " commitPreparedTransaction, tx= " + tx); }
-		        
-		      try
-		      {
-		         ps = conn.prepareStatement(getSQLStatement("COMMIT_MESSAGE_REF1"));
-		         
-		         ps.setLong(1, tx.getId());        
-		         
-		         int rows = ps.executeUpdate();
-		         
-		         if (trace) { log.trace(JDBCUtil.statementToString(getSQLStatement("COMMIT_MESSAGE_REF1"), new Long(tx.getId())) + " removed " + rows + " row(s)");  }
-		         
-		         ps.close();
-		         ps = null;
-		         		         
-		         ps = conn.prepareStatement(getSQLStatement("COMMIT_MESSAGE_REF2"));
-		         ps.setLong(1, tx.getId());         
-		         
-		         rows = ps.executeUpdate();
-		         
-		         if (trace) { log.trace(JDBCUtil.statementToString(getSQLStatement("COMMIT_MESSAGE_REF2"), new Long(tx.getId())) + " updated " + rows + " row(s)"); }
-		         
-		         removeTXRecord(conn, tx);
-		         
-		         return null;
-		      }
-		      finally
-		      {
-		      	closeStatement(ps);
-		      }
-			}
-   	}
-   	
+
+   protected void handleBeforeCommit2PC(final List refsToRemove,
+         final Transaction tx) throws Exception
+   {
+      class HandleBeforeCommit2PCRunner extends JDBCTxRunner2
+      {
+         public Object doTransaction() throws Exception
+         {
+            PreparedStatement ps = null;
+
+            if (trace)
+            {
+               log.trace(this + " commitPreparedTransaction, tx= " + tx);
+            }
+
+            try
+            {
+               ps = conn
+                     .prepareStatement(getSQLStatement("COMMIT_MESSAGE_REF1"));
+
+               ps.setLong(1, tx.getId());
+
+               int rows = ps.executeUpdate();
+
+               if (trace)
+               {
+                  log.trace(JDBCUtil.statementToString(
+                        getSQLStatement("COMMIT_MESSAGE_REF1"), new Long(tx
+                              .getId()))
+                        + " removed " + rows + " row(s)");
+               }
+
+               ps.close();
+               ps = null;
+
+               ps = conn
+                     .prepareStatement(getSQLStatement("COMMIT_MESSAGE_REF2"));
+               ps.setLong(1, tx.getId());
+
+               rows = ps.executeUpdate();
+
+               if (trace)
+               {
+                  log.trace(JDBCUtil.statementToString(
+                        getSQLStatement("COMMIT_MESSAGE_REF2"), new Long(tx
+                              .getId()))
+                        + " updated " + rows + " row(s)");
+               }
+
+               removeTXRecord(conn, tx);
+
+               return null;
+            }
+            finally
+            {
+               closeStatement(ps);
+            }
+         }
+      }
+
       new HandleBeforeCommit2PCRunner().executeWithRetry();
-      
-      this.deleteMessages(refsToRemove);        	
+
+      this.deleteMessages(refsToRemove);
    }
-   
-   protected void handleBeforePrepare(final List refsToAdd, final List refsToRemove, final Transaction tx) throws Exception
+
+   protected void handleBeforePrepare(final List refsToAdd,
+         final List refsToRemove, final Transaction tx) throws Exception
    {
-   	class HandleBeforePrepareRunner extends JDBCTxRunner2
-   	{
-			public Object doTransaction() throws Exception
-			{
-				//We insert a tx record and
-		      //a row for each ref with +
-		      //and update the row for each delivery with "-"
-		      
-		      PreparedStatement psReference = null;
-		      PreparedStatement psInsertMessage = null;
+      class HandleBeforePrepareRunner extends JDBCTxRunner2
+      {
+         public Object doTransaction() throws Exception
+         {
+            // We insert a tx record and
+            // a row for each ref with +
+            // and update the row for each delivery with "-"
+
+            PreparedStatement psReference = null;
+            PreparedStatement psInsertMessage = null;
             PreparedStatement psUpdateReference = null;
 
-		      List<Message> messagesStored = new ArrayList<Message>();
-		 
-		      try
-		      { 
-		         //Insert the tx record
-		         if (!refsToAdd.isEmpty() || !refsToRemove.isEmpty())
-		         {
-		            addTXRecord(conn, tx);
-		         }
-		         
-		         Iterator iter = refsToAdd.iterator();
+            List<Message> messagesStored = new ArrayList<Message>();
 
-		         while (iter.hasNext())
-		         {
-		            ChannelRefPair pair = (ChannelRefPair) iter.next();
-		            
-		            if (psReference == null)
-		            {
-		            	psReference = conn.prepareStatement(getSQLStatement("INSERT_MESSAGE_REF"));
-		            }
-		            
-		            prepareToAddReference(pair.channelID, pair.ref, tx, psReference);
-		            
-		            int rows = psReference.executeUpdate();
-		               
-		            if (trace) { log.trace("Inserted " + rows + " rows"); }
-           
-		            Message m = pair.ref.getMessage(); 
-		            
-		            synchronized (m)
-		            {            
-			            if (!m.isPersisted())
-			            {	            	
-			            	if (psInsertMessage == null)
-			            	{
-                           psInsertMessage = conn.prepareStatement(getSQLStatement("INSERT_MESSAGE"));
-			            	}
+            try
+            {
+               // Insert the tx record
+               if (!refsToAdd.isEmpty() || !refsToRemove.isEmpty())
+               {
+                  addTXRecord(conn, tx);
+               }
 
+               Iterator iter = refsToAdd.iterator();
+
+               while (iter.hasNext())
+               {
+                  ChannelRefPair pair = (ChannelRefPair) iter.next();
+
+                  if (psReference == null)
+                  {
+                     psReference = conn
+                           .prepareStatement(getSQLStatement("INSERT_MESSAGE_REF"));
+                  }
+
+                  prepareToAddReference(pair.channelID, pair.ref, tx,
+                        psReference);
+
+                  int rows = psReference.executeUpdate();
+
+                  if (trace)
+                  {
+                     log.trace("Inserted " + rows + " rows");
+                  }
+
+                  Message m = pair.ref.getMessage();
+
+                  synchronized (m)
+                  {
+                     if (!m.isPersisted())
+                     {
+                        if (psInsertMessage == null)
+                        {
+                           psInsertMessage = conn
+                                 .prepareStatement(getSQLStatement("INSERT_MESSAGE"));
+                        }
+
                         storeMessage(m, psInsertMessage, true);
                         rows = psInsertMessage.executeUpdate();
-				            
-			            	if (trace) { log.trace("Inserted " + rows + " rows"); }
 
-				            m.setPersisted(true);				   			
-				            
-				            messagesStored.add(m);
-			            }
-		            }
-		         }         
-		                  
-		         //Now the removes
-		         
-		         iter = refsToRemove.iterator();
-		         
-		         while (iter.hasNext())
-		         {
-		         	if (psUpdateReference == null)
-		         	{
-				         psUpdateReference = conn.prepareStatement(getSQLStatement("UPDATE_MESSAGE_REF"));         
-		         	}
-		         	
-		            ChannelRefPair pair = (ChannelRefPair) iter.next();
-		            
-		            prepareToRemoveReference(pair.channelID, pair.ref, tx, psUpdateReference);
-		            
-		            int rows = psUpdateReference.executeUpdate();
-		               
-		            if (trace) { log.trace("updated " + rows + " rows"); }          
-		         }         
-		         
-		         return null;
-		      }
-		      catch (Exception e)
-		      {
-		      	for (Iterator i = messagesStored.iterator(); i.hasNext(); )
-		         {
-		      		Message msg = (Message)i.next();
-		      		
-		      		msg.setPersisted(false);
-		         }
-		      	throw e;
-		      }
-		      finally
-		      {
-		      	closeStatement(psReference);
-		      	closeStatement(psInsertMessage);  
-		      	closeStatement(psUpdateReference);
-		      }
-			}   		
-   	}
-   	
-   	new HandleBeforePrepareRunner().executeWithRetry();
+                        if (trace)
+                        {
+                           log.trace("Inserted " + rows + " rows");
+                        }
+
+                        m.setPersisted(true);
+
+                        messagesStored.add(m);
+                     }
+                  }
+               }
+
+               // Now the removes
+
+               iter = refsToRemove.iterator();
+
+               while (iter.hasNext())
+               {
+                  if (psUpdateReference == null)
+                  {
+                     psUpdateReference = conn
+                           .prepareStatement(getSQLStatement("UPDATE_MESSAGE_REF"));
+                  }
+
+                  ChannelRefPair pair = (ChannelRefPair) iter.next();
+
+                  prepareToRemoveReference(pair.channelID, pair.ref, tx,
+                        psUpdateReference);
+
+                  int rows = psUpdateReference.executeUpdate();
+
+                  if (trace)
+                  {
+                     log.trace("updated " + rows + " rows");
+                  }
+               }
+
+               return null;
+            }
+            catch (Exception e)
+            {
+               for (Iterator i = messagesStored.iterator(); i.hasNext();)
+               {
+                  Message msg = (Message) i.next();
+
+                  msg.setPersisted(false);
+               }
+               throw e;
+            }
+            finally
+            {
+               closeStatement(psReference);
+               closeStatement(psInsertMessage);
+               closeStatement(psUpdateReference);
+            }
+         }
+      }
+
+      new HandleBeforePrepareRunner().executeWithRetry();
    }
-   
-   protected void handleBeforeRollback(final List refsToAdd, final Transaction tx) throws Exception
+
+   protected void handleBeforeRollback(final List refsToAdd,
+         final Transaction tx) throws Exception
    {
-   	class HandleBeforeRollbackRunner extends JDBCTxRunner2
-   	{
-			public Object doTransaction() throws Exception
-			{
-				PreparedStatement ps = null;
-		      
-		      try
-		      {
-		         ps = conn.prepareStatement(getSQLStatement("ROLLBACK_MESSAGE_REF1"));
-		         
-		         ps.setLong(1, tx.getId());         
-		         
-		         int rows = ps.executeUpdate();
-		         
-		         if (trace)
-		         {
-		            log.trace(JDBCUtil.statementToString(getSQLStatement("ROLLBACK_MESSAGE_REF1"), new Long(tx.getId())) + " removed " + rows + " row(s)");
-		         }
-		         
-		         ps.close();
-		         ps = null;
-		         
-		         ps = conn.prepareStatement(getSQLStatement("ROLLBACK_MESSAGE_REF2"));
-		         ps.setLong(1, tx.getId());
-		         
-		         rows = ps.executeUpdate();
-		         
-		         if (trace)
-		         {
-		            log.trace(JDBCUtil.statementToString(getSQLStatement("ROLLBACK_MESSAGE_REF2"), new Long(tx.getId())) + " updated " + rows
-		                  + " row(s)");
-		         }
-		         
-		         removeTXRecord(conn, tx);
-		         
-		         return null;
-		      }
-		      finally
-		      {
-		      	closeStatement(ps);
-		      }
-			}   	
-   	}
-   	
+      class HandleBeforeRollbackRunner extends JDBCTxRunner2
+      {
+         public Object doTransaction() throws Exception
+         {
+            PreparedStatement ps = null;
+
+            try
+            {
+               ps = conn
+                     .prepareStatement(getSQLStatement("ROLLBACK_MESSAGE_REF1"));
+
+               ps.setLong(1, tx.getId());
+
+               int rows = ps.executeUpdate();
+
+               if (trace)
+               {
+                  log.trace(JDBCUtil.statementToString(
+                        getSQLStatement("ROLLBACK_MESSAGE_REF1"), new Long(tx
+                              .getId()))
+                        + " removed " + rows + " row(s)");
+               }
+
+               ps.close();
+               ps = null;
+
+               ps = conn
+                     .prepareStatement(getSQLStatement("ROLLBACK_MESSAGE_REF2"));
+               ps.setLong(1, tx.getId());
+
+               rows = ps.executeUpdate();
+
+               if (trace)
+               {
+                  log.trace(JDBCUtil.statementToString(
+                        getSQLStatement("ROLLBACK_MESSAGE_REF2"), new Long(tx
+                              .getId()))
+                        + " updated " + rows + " row(s)");
+               }
+
+               removeTXRecord(conn, tx);
+
+               return null;
+            }
+            finally
+            {
+               closeStatement(ps);
+            }
+         }
+      }
+
       new HandleBeforeRollbackRunner().executeWithRetry();
-      
-      this.deleteMessages(refsToAdd);      	     
+
+      this.deleteMessages(refsToAdd);
    }
-   
-   
+
    protected void addTXRecord(Connection conn, Transaction tx) throws Exception
    {
       if (trace)
       {
          log.trace("Inserting tx record for " + tx);
       }
-      
+
       if (!this.nodeIDSet)
       {
-      	//Sanity
-      	throw new IllegalStateException("Node id has not been set");
+         // Sanity
+         throw new IllegalStateException("Node id has not been set");
       }
-      
+
       PreparedStatement ps = null;
       String statement = "UNDEFINED";
       int rows = -1;
@@ -1654,72 +1909,83 @@
       try
       {
          statement = getSQLStatement("INSERT_TRANSACTION");
-         
+
          ps = conn.prepareStatement(statement);
-         
+
          ps.setInt(1, nodeID);
-         
+
          ps.setLong(2, tx.getId());
-         
+
          Xid xid = tx.getXid();
-         
+
          formatID = xid.getFormatId();
-         
+
          setVarBinaryColumn(3, ps, xid.getBranchQualifier());
-         
+
          ps.setInt(4, formatID);
-         
+
          setVarBinaryColumn(5, ps, xid.getGlobalTransactionId());
-         
-         rows = ps.executeUpdate();         
+
+         rows = ps.executeUpdate();
       }
       finally
       {
          if (trace)
          {
-            String s = JDBCUtil.statementToString(statement, new Integer(nodeID), new Long(tx.getId()), "<byte-array>",
+            String s = JDBCUtil.statementToString(statement,
+                  new Integer(nodeID), new Long(tx.getId()), "<byte-array>",
                   new Integer(formatID), "<byte-array>");
-            log.trace(s + (rows == -1 ? " failed!" : " inserted " + rows + " row(s)"));
+            log
+                  .trace(s
+                        + (rows == -1 ? " failed!" : " inserted " + rows
+                              + " row(s)"));
          }
-         closeStatement(ps);      
+         closeStatement(ps);
       }
    }
-   
-   protected void removeTXRecord(Connection conn, Transaction tx) throws Exception
+
+   protected void removeTXRecord(Connection conn, Transaction tx)
+         throws Exception
    {
       if (!this.nodeIDSet)
       {
-      	//Sanity
-      	throw new IllegalStateException("Node id has not been set");
+         // Sanity
+         throw new IllegalStateException("Node id has not been set");
       }
-   	
+
       PreparedStatement ps = null;
       try
       {
          ps = conn.prepareStatement(getSQLStatement("DELETE_TRANSACTION"));
-         
+
          ps.setInt(1, nodeID);
-         
+
          ps.setLong(2, tx.getId());
-         
+
          int rows = ps.executeUpdate();
-         
+
          if (trace)
          {
-            log.trace(JDBCUtil.statementToString(getSQLStatement("DELETE_TRANSACTION"), new Integer(nodeID), new Long(tx.getId())) + " removed " + rows + " row(s)");
+            log.trace(JDBCUtil.statementToString(
+                  getSQLStatement("DELETE_TRANSACTION"), new Integer(nodeID),
+                  new Long(tx.getId()))
+                  + " removed " + rows + " row(s)");
          }
       }
       finally
       {
-      	closeStatement(ps);    
+         closeStatement(ps);
       }
-   }  
-   
+   }
+
    protected void addReference(long channelID, MessageReference ref,
-                               PreparedStatement ps, boolean paged) throws Exception
+         PreparedStatement ps, boolean paged) throws Exception
    {
-      if (trace) { log.trace("adding " + ref + " to channel " + channelID); }
-      
+      if (trace)
+      {
+         log.trace("adding " + ref + " to channel " + channelID);
+      }
+
       ps.setLong(1, channelID);
       ps.setLong(2, ref.getMessage().getMessageID());
       ps.setNull(3, Types.BIGINT);
@@ -1736,88 +2002,99 @@
       ps.setInt(7, ref.getDeliveryCount());
       ps.setLong(8, ref.getScheduledDeliveryTime());
    }
-   
-   protected void removeReference(long channelID, MessageReference ref, PreparedStatement ps)
-      throws Exception
+
+   protected void removeReference(long channelID, MessageReference ref,
+         PreparedStatement ps) throws Exception
    {
-      if (trace) { log.trace("removing " + ref + " from channel " + channelID); }
-      
+      if (trace)
+      {
+         log.trace("removing " + ref + " from channel " + channelID);
+      }
+
       ps.setLong(1, ref.getMessage().getMessageID());
-      ps.setLong(2, channelID);      
+      ps.setLong(2, channelID);
    }
-   
-   protected void prepareToAddReference(long channelID, MessageReference ref, Transaction tx, PreparedStatement ps)
-     throws Exception
+
+   protected void prepareToAddReference(long channelID, MessageReference ref,
+         Transaction tx, PreparedStatement ps) throws Exception
    {
-      if (trace) { log.trace("adding " + ref + " to channel " + channelID + (tx == null ? " non-transactionally" : " on transaction: " + tx)); }
-      
+      if (trace)
+      {
+         log.trace("adding "
+               + ref
+               + " to channel "
+               + channelID
+               + (tx == null ? " non-transactionally" : " on transaction: "
+                     + tx));
+      }
+
       ps.setLong(1, channelID);
       ps.setLong(2, ref.getMessage().getMessageID());
       ps.setLong(3, tx.getId());
       ps.setString(4, "+");
       ps.setLong(5, getOrdering());
-      ps.setNull(6, Types.BIGINT);      
+      ps.setNull(6, Types.BIGINT);
       ps.setInt(7, ref.getDeliveryCount());
       ps.setLong(8, ref.getScheduledDeliveryTime());
    }
-   
-   protected void prepareToRemoveReference(long channelID, MessageReference ref, Transaction tx, PreparedStatement ps)
-      throws Exception
+
+   protected void prepareToRemoveReference(long channelID,
+         MessageReference ref, Transaction tx, PreparedStatement ps)
+         throws Exception
    {
       if (trace)
       {
-         log.trace("removing " + ref + " from channel " + channelID
-               + (tx == null ? " non-transactionally" : " on transaction: " + tx));
+         log.trace("removing "
+               + ref
+               + " from channel "
+               + channelID
+               + (tx == null ? " non-transactionally" : " on transaction: "
+                     + tx));
       }
-      
-      ps.setLong(1, tx.getId()); 
+
+      ps.setLong(1, tx.getId());
       ps.setLong(2, ref.getMessage().getMessageID());
-      ps.setLong(3, channelID);           
+      ps.setLong(3, channelID);
    }
-   
+
    protected byte[] mapToBytes(Map map) throws Exception
    {
-      if (map == null || map.isEmpty())
-      {
-         return null;
-      }
-      
+      if (map == null || map.isEmpty()) { return null; }
+
       final int BUFFER_SIZE = 1024;
-       
+
       ByteArrayOutputStream bos = new ByteArrayOutputStream(BUFFER_SIZE);
-      
+
       DataOutputStream oos = new DataOutputStream(bos);
-      
+
       StreamUtils.writeMap(oos, map, true);
-      
+
       oos.close();
-      
+
       return bos.toByteArray();
    }
-   
+
    protected HashMap bytesToMap(byte[] bytes) throws Exception
    {
-      if (bytes == null)
-      {
-         return new HashMap();
-      }
-       
+      if (bytes == null) { return new HashMap(); }
+
       ByteArrayInputStream bis = new ByteArrayInputStream(bytes);
-      
+
       DataInputStream dais = new DataInputStream(bis);
-      
+
       HashMap map = StreamUtils.readMap(dais, true);
-      
+
       dais.close();
-            
+
       return map;
    }
-   
+
    /**
     * Stores the message in the MESSAGE table.
     */
-   protected void storeMessage(Message m, PreparedStatement ps, boolean bindBlobs) throws Exception
-   {      
+   protected void storeMessage(Message m, PreparedStatement ps,
+         boolean bindBlobs) throws Exception
+   {
       // physically insert the row in the database
       // first set the fields from org.jboss.messaging.core.Routable
       ps.setLong(1, m.getMessageID());
@@ -1831,18 +2108,18 @@
       if (bindBlobs)
       {
          bindBlobs(m, ps, 8, 9);
-      }            
+      }
    }
 
-
    /** Stores the message using the Conditional update */
-   protected int storeMessage(Message message, PreparedStatement psInsertMessage, PreparedStatement psUpdateMessage)
-      throws Exception
+   protected int storeMessage(Message message,
+         PreparedStatement psInsertMessage, PreparedStatement psUpdateMessage)
+         throws Exception
    {
       int rows;
       if (!supportsBlobSelect)
       {
-      	//Need to store in two phases
+         // Need to store in two phases
          storeMessage(message, psInsertMessage, false);
          psInsertMessage.setLong(8, message.getMessageID());
          rows = psInsertMessage.executeUpdate();
@@ -1852,16 +2129,14 @@
             bindBlobs(message, psUpdateMessage, 1, 2);
             psUpdateMessage.setLong(3, message.getMessageID());
             rows = psUpdateMessage.executeUpdate();
-            if (rows != 1)
-            {
-               throw new IllegalStateException("Couldn't update messageId=" +
-                                               message.getMessageID() + " on paging");
-            }
+            if (rows != 1) { throw new IllegalStateException(
+                  "Couldn't update messageId=" + message.getMessageID()
+                        + " on paging"); }
          }
       }
       else
       {
-      	//Can store in one go
+         // Can store in one go
          storeMessage(message, psInsertMessage, true);
          psInsertMessage.setLong(10, message.getMessageID());
          rows = psInsertMessage.executeUpdate();
@@ -1869,11 +2144,10 @@
       return rows;
    }
 
-
-   private void bindBlobs(Message m, PreparedStatement ps, int headerPosition, int payloadPosition)
-      throws Exception
+   private void bindBlobs(Message m, PreparedStatement ps, int headerPosition,
+         int payloadPosition) throws Exception
    {
-      //headers
+      // headers
       byte[] bytes = mapToBytes(((MessageSupport) m).getHeaders());
       if (bytes != null)
       {
@@ -1884,7 +2158,6 @@
          ps.setNull(headerPosition, Types.LONGVARBINARY);
       }
 
-
       byte[] payload = m.getPayloadAsByteArray();
       if (payload != null)
       {
@@ -1896,63 +2169,71 @@
       }
    }
 
-   protected void setVarBinaryColumn(int column, PreparedStatement ps, byte[] bytes) throws Exception
+   protected void setVarBinaryColumn(int column, PreparedStatement ps,
+         byte[] bytes) throws Exception
    {
       if (usingTrailingByte)
       {
-         // Sybase has the stupid characteristic of truncating all trailing in zeros
+         // Sybase has the stupid characteristic of truncating all trailing in
+         // zeros
          // in varbinary columns
          // So we add an extra byte on the end when we store the varbinary data
          // otherwise we might lose data
          // http://jira.jboss.org/jira/browse/JBMESSAGING-825
-         
+
          byte[] res = new byte[bytes.length + 1];
-         
+
          System.arraycopy(bytes, 0, res, 0, bytes.length);
-         
+
          res[bytes.length] = 127;
 
          bytes = res;
       }
-      
-      ps.setBytes(column, bytes);            
-      
-      if (trace) { log.trace("Setting varbinary column of length: " + bytes.length); }
+
+      ps.setBytes(column, bytes);
+
+      if (trace)
+      {
+         log.trace("Setting varbinary column of length: " + bytes.length);
+      }
    }
-   
-   protected byte[] getVarBinaryColumn(ResultSet rs, int columnIndex) throws Exception
+
+   protected byte[] getVarBinaryColumn(ResultSet rs, int columnIndex)
+         throws Exception
    {
       byte[] bytes = rs.getBytes(columnIndex);
-      
+
       if (usingTrailingByte)
       {
          // Get rid of the trailing byte
-         
+
          // http://jira.jboss.org/jira/browse/JBMESSAGING-825
-         
+
          byte[] newBytes = new byte[bytes.length - 1];
-         
+
          System.arraycopy(bytes, 0, newBytes, 0, bytes.length - 1);
-         
+
          bytes = newBytes;
       }
-      
+
       return bytes;
    }
-     
+
    // Used for storing message headers and bodies
-   protected void setBytes(PreparedStatement ps, int columnIndex, byte[] bytes) throws Exception
+   protected void setBytes(PreparedStatement ps, int columnIndex, byte[] bytes)
+         throws Exception
    {
       if (usingBinaryStream)
       {
-         //Set the bytes using a binary stream - likely to be better for large byte[]
-         
+         // Set the bytes using a binary stream - likely to be better for large
+         // byte[]
+
          InputStream is = null;
-         
+
          try
          {
             is = new ByteArrayInputStream(bytes);
-            
+
             ps.setBinaryStream(columnIndex, is, bytes.length);
          }
          finally
@@ -1965,42 +2246,42 @@
       }
       else
       {
-         //Set the bytes using setBytes() - likely to be better for smaller byte[]
-         
+         // Set the bytes using setBytes() - likely to be better for smaller
+         // byte[]
+
          setVarBinaryColumn(columnIndex, ps, bytes);
       }
    }
-   
+
    protected byte[] getBytes(ResultSet rs, int columnIndex) throws Exception
    {
       if (usingBinaryStream)
       {
-         //Get the bytes using a binary stream - likely to be better for large byte[]
-         
+         // Get the bytes using a binary stream - likely to be better for large
+         // byte[]
+
          InputStream is = null;
          ByteArrayOutputStream os = null;
-         
+
          final int BUFFER_SIZE = 4096;
-         
+
          try
          {
             InputStream i = rs.getBinaryStream(columnIndex);
-            
-            if (i == null)
-            {
-               return null;
-            }
-            
-            is = new BufferedInputStream(rs.getBinaryStream(columnIndex), BUFFER_SIZE);
-            
+
+            if (i == null) { return null; }
+
+            is = new BufferedInputStream(rs.getBinaryStream(columnIndex),
+                  BUFFER_SIZE);
+
             os = new ByteArrayOutputStream(BUFFER_SIZE);
-            
+
             int b;
             while ((b = is.read()) != -1)
             {
                os.write(b);
             }
-            
+
             return os.toByteArray();
          }
          finally
@@ -2017,11 +2298,11 @@
       }
       else
       {
-         //Get the bytes using getBytes() - better for smaller byte[]
+         // Get the bytes using getBytes() - better for smaller byte[]
          return getVarBinaryColumn(rs, columnIndex);
       }
    }
-   
+
    protected void logBatchUpdate(String name, int[] rows, String action)
    {
       int count = 0;
@@ -2029,133 +2310,206 @@
       {
          count += rows[i];
       }
-      log.trace("Batch update " + name + ", " + action + " total of " + count + " rows");
+      log.trace("Batch update " + name + ", " + action + " total of " + count
+            + " rows");
    }
 
-   //PersistentServiceSupport overrides ----------------------------
-   
+   // PersistentServiceSupport overrides ----------------------------
+
    protected Map getDefaultDDLStatements()
    {
       Map<String, String> map = new LinkedHashMap<String, String>();
       map.put("CREATE_DUAL", "CREATE TABLE JBM_DUAL (DUMMY INTEGER)");
-      //Message reference
-      map.put("CREATE_MESSAGE_REFERENCE",
-              "CREATE TABLE JBM_MSG_REF (CHANNEL_ID BIGINT, " +
-              "MESSAGE_ID BIGINT, TRANSACTION_ID BIGINT, STATE CHAR(1), ORD BIGINT, PAGE_ORD BIGINT, " +
-              "DELIVERY_COUNT INTEGER, SCHED_DELIVERY BIGINT, PRIMARY KEY(CHANNEL_ID, MESSAGE_ID))");
-      map.put("CREATE_IDX_MESSAGE_REF_TX", "CREATE INDEX JBM_MSG_REF_TX ON JBM_MSG_REF (TRANSACTION_ID)");
-      map.put("CREATE_IDX_MESSAGE_REF_ORD", "CREATE INDEX JBM_MSG_REF_ORD ON JBM_MSG_REF (ORD)");
-      map.put("CREATE_IDX_MESSAGE_REF_PAGE_ORD", "CREATE INDEX JBM_MSG_REF__PAGE_ORD ON JBM_MSG_REF (PAGE_ORD)");
-      map.put("CREATE_IDX_MESSAGE_REF_MESSAGE_ID", "CREATE INDEX JBM_MSG_REF_MESSAGE_ID ON JBM_MSG_REF (MESSAGE_ID)");      
-      map.put("CREATE_IDX_MESSAGE_REF_SCHED_DELIVERY", "CREATE INDEX JBM_MSG_REF_SCHED_DELIVERY ON JBM_MSG_REF (SCHED_DELIVERY)");      
-      //Message
-      map.put("CREATE_MESSAGE",
-              "CREATE TABLE JBM_MSG (MESSAGE_ID BIGINT, RELIABLE CHAR(1), " +
-              "EXPIRATION BIGINT, TIMESTAMP BIGINT, PRIORITY TINYINT, TYPE TINYINT, INS_TIME BIGINT, HEADERS LONGVARBINARY, " +
-              "PAYLOAD LONGVARBINARY, " +
-              "PRIMARY KEY (MESSAGE_ID))"); 
-      map.put("CREATE_IDX_MESSAGE_TIMESTAMP", "CREATE INDEX JBM_MSG_REF_TIMESTAMP ON JBM_MSG (TIMESTAMP)");
-      //Transaction
-      map.put("CREATE_TRANSACTION",
-              "CREATE TABLE JBM_TX (" +
-              "NODE_ID INTEGER, TRANSACTION_ID BIGINT, BRANCH_QUAL VARBINARY(254), " +
-              "FORMAT_ID INTEGER, GLOBAL_TXID VARBINARY(254), PRIMARY KEY (TRANSACTION_ID))");
-      //Counter
-      map.put("CREATE_COUNTER",
-              "CREATE TABLE JBM_COUNTER (NAME VARCHAR(255), NEXT_ID BIGINT, PRIMARY KEY(NAME))");
+      // Message reference
+      map
+            .put(
+                  "CREATE_MESSAGE_REFERENCE",
+                  "CREATE TABLE JBM_MSG_REF (CHANNEL_ID BIGINT, "
+                        + "MESSAGE_ID BIGINT, TRANSACTION_ID BIGINT, STATE CHAR(1), ORD BIGINT, PAGE_ORD BIGINT, "
+                        + "DELIVERY_COUNT INTEGER, SCHED_DELIVERY BIGINT, PRIMARY KEY(CHANNEL_ID, MESSAGE_ID))");
+      map.put("CREATE_IDX_MESSAGE_REF_TX",
+            "CREATE INDEX JBM_MSG_REF_TX ON JBM_MSG_REF (TRANSACTION_ID)");
+      map.put("CREATE_IDX_MESSAGE_REF_ORD",
+            "CREATE INDEX JBM_MSG_REF_ORD ON JBM_MSG_REF (ORD)");
+      map.put("CREATE_IDX_MESSAGE_REF_PAGE_ORD",
+            "CREATE INDEX JBM_MSG_REF__PAGE_ORD ON JBM_MSG_REF (PAGE_ORD)");
+      map.put("CREATE_IDX_MESSAGE_REF_MESSAGE_ID",
+            "CREATE INDEX JBM_MSG_REF_MESSAGE_ID ON JBM_MSG_REF (MESSAGE_ID)");
+      map
+            .put("CREATE_IDX_MESSAGE_REF_SCHED_DELIVERY",
+                  "CREATE INDEX JBM_MSG_REF_SCHED_DELIVERY ON JBM_MSG_REF (SCHED_DELIVERY)");
+      // Message
+      map
+            .put(
+                  "CREATE_MESSAGE",
+                  "CREATE TABLE JBM_MSG (MESSAGE_ID BIGINT, RELIABLE CHAR(1), "
+                        + "EXPIRATION BIGINT, TIMESTAMP BIGINT, PRIORITY TINYINT, TYPE TINYINT, INS_TIME BIGINT, HEADERS LONGVARBINARY, "
+                        + "PAYLOAD LONGVARBINARY, "
+                        + "PRIMARY KEY (MESSAGE_ID))");
+      map.put("CREATE_IDX_MESSAGE_TIMESTAMP",
+            "CREATE INDEX JBM_MSG_REF_TIMESTAMP ON JBM_MSG (TIMESTAMP)");
+      // Transaction
+      map
+            .put(
+                  "CREATE_TRANSACTION",
+                  "CREATE TABLE JBM_TX ("
+                        + "NODE_ID INTEGER, TRANSACTION_ID BIGINT, BRANCH_QUAL VARBINARY(254), "
+                        + "FORMAT_ID INTEGER, GLOBAL_TXID VARBINARY(254), PRIMARY KEY (TRANSACTION_ID))");
+      // Counter
+      map
+            .put(
+                  "CREATE_COUNTER",
+                  "CREATE TABLE JBM_COUNTER (NAME VARCHAR(255), NEXT_ID BIGINT, PRIMARY KEY(NAME))");
       return map;
    }
-      
+
    protected Map getDefaultDMLStatements()
-   {                
+   {
       Map<String, String> map = new LinkedHashMap<String, String>();
       map.put("INSERT_DUAL", "INSERT INTO JBM_DUAL VALUES (1)");
       map.put("CHECK_DUAL", "SELECT 1 FROM JBM_DUAL");
-      //Message reference
-      map.put("INSERT_MESSAGE_REF",
-              "INSERT INTO JBM_MSG_REF (CHANNEL_ID, MESSAGE_ID, TRANSACTION_ID, STATE, ORD, PAGE_ORD, DELIVERY_COUNT, SCHED_DELIVERY) " +
-              "VALUES (?, ?, ?, ?, ?, ?, ?, ?)");
-      map.put("DELETE_MESSAGE_REF", "DELETE FROM JBM_MSG_REF WHERE MESSAGE_ID=? AND CHANNEL_ID=? AND STATE='C'");
+      // Message reference
+      map
+            .put(
+                  "INSERT_MESSAGE_REF",
+                  "INSERT INTO JBM_MSG_REF (CHANNEL_ID, MESSAGE_ID, TRANSACTION_ID, STATE, ORD, PAGE_ORD, DELIVERY_COUNT, SCHED_DELIVERY) "
+                        + "VALUES (?, ?, ?, ?, ?, ?, ?, ?)");
+      map
+            .put("DELETE_MESSAGE_REF",
+                  "DELETE FROM JBM_MSG_REF WHERE MESSAGE_ID=? AND CHANNEL_ID=? AND STATE='C'");
       map.put("UPDATE_MESSAGE_REF",
-              "UPDATE JBM_MSG_REF SET TRANSACTION_ID=?, STATE='-' " +
-              "WHERE MESSAGE_ID=? AND CHANNEL_ID=? AND STATE='C'");
-      map.put("UPDATE_PAGE_ORDER", "UPDATE JBM_MSG_REF SET PAGE_ORD = ? WHERE MESSAGE_ID=? AND CHANNEL_ID=?");
-      map.put("COMMIT_MESSAGE_REF1", "UPDATE JBM_MSG_REF SET STATE='C', TRANSACTION_ID = NULL WHERE TRANSACTION_ID=? AND STATE='+'");
-      map.put("COMMIT_MESSAGE_REF2", "DELETE FROM JBM_MSG_REF WHERE TRANSACTION_ID=? AND STATE='-'");
-      map.put("ROLLBACK_MESSAGE_REF1", "DELETE FROM JBM_MSG_REF WHERE TRANSACTION_ID=? AND STATE='+'");
-      map.put("ROLLBACK_MESSAGE_REF2", "UPDATE JBM_MSG_REF SET STATE='C', TRANSACTION_ID = NULL WHERE TRANSACTION_ID=? AND STATE='-'");
-      map.put("LOAD_PAGED_REFS",
-              "SELECT MESSAGE_ID, DELIVERY_COUNT, PAGE_ORD, SCHED_DELIVERY FROM JBM_MSG_REF " +
-              "WHERE CHANNEL_ID = ? AND PAGE_ORD BETWEEN ? AND ? ORDER BY PAGE_ORD");
-      map.put("LOAD_UNPAGED_REFS",
-              "SELECT MESSAGE_ID, DELIVERY_COUNT, SCHED_DELIVERY FROM JBM_MSG_REF WHERE STATE = 'C' " +
-              "AND CHANNEL_ID = ? AND PAGE_ORD IS NULL ORDER BY ORD");
-      map.put("LOAD_REFS",
-              "SELECT MESSAGE_ID, DELIVERY_COUNT, SCHED_DELIVERY FROM JBM_MSG_REF WHERE STATE = 'C' " +
-              "AND CHANNEL_ID = ? ORDER BY ORD");      
-      
-      map.put("UPDATE_REFS_NOT_PAGED", "UPDATE JBM_MSG_REF SET PAGE_ORD = NULL WHERE PAGE_ORD BETWEEN ? AND ? AND CHANNEL_ID=?");       
-      map.put("SELECT_MIN_MAX_PAGE_ORD", "SELECT MIN(PAGE_ORD), MAX(PAGE_ORD) FROM JBM_MSG_REF WHERE CHANNEL_ID = ?");     
-      map.put("SELECT_EXISTS_REF_MESSAGE_ID", "SELECT MESSAGE_ID FROM JBM_MSG_REF WHERE MESSAGE_ID = ?");
-      map.put("UPDATE_DELIVERY_COUNT", "UPDATE JBM_MSG_REF SET DELIVERY_COUNT = ? WHERE CHANNEL_ID = ? AND MESSAGE_ID = ?");
-      map.put("UPDATE_CHANNEL_ID", "UPDATE JBM_MSG_REF SET CHANNEL_ID = ? WHERE CHANNEL_ID = ?");
-      
-      //Message
+            "UPDATE JBM_MSG_REF SET TRANSACTION_ID=?, STATE='-' "
+                  + "WHERE MESSAGE_ID=? AND CHANNEL_ID=? AND STATE='C'");
+      map
+            .put("UPDATE_PAGE_ORDER",
+                  "UPDATE JBM_MSG_REF SET PAGE_ORD = ? WHERE MESSAGE_ID=? AND CHANNEL_ID=?");
+      map
+            .put(
+                  "COMMIT_MESSAGE_REF1",
+                  "UPDATE JBM_MSG_REF SET STATE='C', TRANSACTION_ID = NULL WHERE TRANSACTION_ID=? AND STATE='+'");
+      map.put("COMMIT_MESSAGE_REF2",
+            "DELETE FROM JBM_MSG_REF WHERE TRANSACTION_ID=? AND STATE='-'");
+      map.put("ROLLBACK_MESSAGE_REF1",
+            "DELETE FROM JBM_MSG_REF WHERE TRANSACTION_ID=? AND STATE='+'");
+      map
+            .put(
+                  "ROLLBACK_MESSAGE_REF2",
+                  "UPDATE JBM_MSG_REF SET STATE='C', TRANSACTION_ID = NULL WHERE TRANSACTION_ID=? AND STATE='-'");
+      map
+            .put(
+                  "LOAD_PAGED_REFS",
+                  "SELECT MESSAGE_ID, DELIVERY_COUNT, PAGE_ORD, SCHED_DELIVERY FROM JBM_MSG_REF "
+                        + "WHERE CHANNEL_ID = ? AND PAGE_ORD BETWEEN ? AND ? ORDER BY PAGE_ORD");
+      map
+            .put(
+                  "LOAD_UNPAGED_REFS",
+                  "SELECT MESSAGE_ID, DELIVERY_COUNT, SCHED_DELIVERY FROM JBM_MSG_REF WHERE STATE = 'C' "
+                        + "AND CHANNEL_ID = ? AND PAGE_ORD IS NULL ORDER BY ORD");
+      map
+            .put(
+                  "LOAD_REFS",
+                  "SELECT MESSAGE_ID, DELIVERY_COUNT, SCHED_DELIVERY FROM JBM_MSG_REF WHERE STATE = 'C' "
+                        + "AND CHANNEL_ID = ? ORDER BY ORD");
+
+      map
+            .put(
+                  "UPDATE_REFS_NOT_PAGED",
+                  "UPDATE JBM_MSG_REF SET PAGE_ORD = NULL WHERE PAGE_ORD BETWEEN ? AND ? AND CHANNEL_ID=?");
+      map
+            .put("SELECT_MIN_MAX_PAGE_ORD",
+                  "SELECT MIN(PAGE_ORD), MAX(PAGE_ORD) FROM JBM_MSG_REF WHERE CHANNEL_ID = ?");
+      map.put("SELECT_EXISTS_REF_MESSAGE_ID",
+            "SELECT MESSAGE_ID FROM JBM_MSG_REF WHERE MESSAGE_ID = ?");
+      map
+            .put(
+                  "UPDATE_DELIVERY_COUNT",
+                  "UPDATE JBM_MSG_REF SET DELIVERY_COUNT = ? WHERE CHANNEL_ID = ? AND MESSAGE_ID = ?");
+      map.put("UPDATE_CHANNEL_ID",
+            "UPDATE JBM_MSG_REF SET CHANNEL_ID = ? WHERE CHANNEL_ID = ?");
+
+      // Message
       map.put("LOAD_MESSAGES",
-              "SELECT MESSAGE_ID, RELIABLE, EXPIRATION, TIMESTAMP, " +
-              "PRIORITY, HEADERS, PAYLOAD, TYPE " +
-              "FROM JBM_MSG");
+            "SELECT MESSAGE_ID, RELIABLE, EXPIRATION, TIMESTAMP, "
+                  + "PRIORITY, HEADERS, PAYLOAD, TYPE " + "FROM JBM_MSG");
       map.put("INSERT_MESSAGE",
-              "INSERT INTO JBM_MSG (MESSAGE_ID, RELIABLE, EXPIRATION, " +
-              "TIMESTAMP, PRIORITY, TYPE, INS_TIME, HEADERS, PAYLOAD) " +           
-              "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)" );
-      map.put("INSERT_MESSAGE_CONDITIONAL",
-      		  "INSERT INTO JBM_MSG (MESSAGE_ID, RELIABLE, EXPIRATION, " +
-              "TIMESTAMP, PRIORITY, TYPE, INS_TIME) " +
-              "SELECT ?, ?, ?, ?, ?, ?, ? " + 
-              "FROM JBM_DUAL WHERE NOT EXISTS (SELECT MESSAGE_ID FROM JBM_MSG WHERE MESSAGE_ID = ?)");
-      map.put("INSERT_MESSAGE_CONDITIONAL_FULL", "INSERT INTO JBM_MSG (MESSAGE_ID, RELIABLE, EXPIRATION, TIMESTAMP, PRIORITY, TYPE, INS_TIME, HEADERS, PAYLOAD) SELECT ?, ?, ?, ?, ?, ?, ?, ?, ? FROM JBM_DUAL WHERE NOT EXISTS (SELECT MESSAGE_ID FROM JBM_MSG WHERE MESSAGE_ID = ?)");
-      map.put("UPDATE_MESSAGE_4CONDITIONAL", "UPDATE JBM_MSG SET HEADERS=?, PAYLOAD=? WHERE MESSAGE_ID=?");
+            "INSERT INTO JBM_MSG (MESSAGE_ID, RELIABLE, EXPIRATION, "
+                  + "TIMESTAMP, PRIORITY, TYPE, INS_TIME, HEADERS, PAYLOAD) "
+                  + "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)");
+      map
+            .put(
+                  "INSERT_MESSAGE_CONDITIONAL",
+                  "INSERT INTO JBM_MSG (MESSAGE_ID, RELIABLE, EXPIRATION, "
+                        + "TIMESTAMP, PRIORITY, TYPE, INS_TIME) "
+                        + "SELECT ?, ?, ?, ?, ?, ?, ? "
+                        + "FROM JBM_DUAL WHERE NOT EXISTS (SELECT MESSAGE_ID FROM JBM_MSG WHERE MESSAGE_ID = ?)");
+      map
+            .put(
+                  "INSERT_MESSAGE_CONDITIONAL_FULL",
+                  "INSERT INTO JBM_MSG (MESSAGE_ID, RELIABLE, EXPIRATION, TIMESTAMP, PRIORITY, TYPE, INS_TIME, HEADERS, PAYLOAD) SELECT ?, ?, ?, ?, ?, ?, ?, ?, ? FROM JBM_DUAL WHERE NOT EXISTS (SELECT MESSAGE_ID FROM JBM_MSG WHERE MESSAGE_ID = ?)");
+      map.put("UPDATE_MESSAGE_4CONDITIONAL",
+            "UPDATE JBM_MSG SET HEADERS=?, PAYLOAD=? WHERE MESSAGE_ID=?");
       map.put("MESSAGE_ID_COLUMN", "MESSAGE_ID");
-      map.put("REAP_MESSAGES", "DELETE FROM JBM_MSG WHERE INS_TIME <= ? AND NOT EXISTS (SELECT * FROM JBM_MSG_REF WHERE JBM_MSG_REF.MESSAGE_ID = JBM_MSG.MESSAGE_ID)");
-      map.put("DELETE_MESSAGE", "DELETE FROM JBM_MSG WHERE MESSAGE_ID = ? AND NOT EXISTS (SELECT * FROM JBM_MSG_REF WHERE JBM_MSG_REF.MESSAGE_ID = ?)");
-      //Transaction
-      map.put("INSERT_TRANSACTION",
-              "INSERT INTO JBM_TX (NODE_ID, TRANSACTION_ID, BRANCH_QUAL, FORMAT_ID, GLOBAL_TXID) " +
-              "VALUES(?, ?, ?, ?, ?)");
-      map.put("DELETE_TRANSACTION", "DELETE FROM JBM_TX WHERE NODE_ID = ? AND TRANSACTION_ID = ?");
-      map.put("SELECT_PREPARED_TRANSACTIONS", "SELECT TRANSACTION_ID, BRANCH_QUAL, FORMAT_ID, GLOBAL_TXID FROM JBM_TX WHERE NODE_ID = ?");
-      map.put("SELECT_MESSAGE_ID_FOR_REF", "SELECT MESSAGE_ID, CHANNEL_ID FROM JBM_MSG_REF WHERE TRANSACTION_ID = ? AND STATE = '+' ORDER BY ORD");
-      map.put("SELECT_MESSAGE_ID_FOR_ACK", "SELECT MESSAGE_ID, CHANNEL_ID FROM JBM_MSG_REF WHERE TRANSACTION_ID = ? AND STATE = '-' ORDER BY ORD");
+      map
+            .put(
+                  "REAP_MESSAGES",
+                  "DELETE FROM JBM_MSG WHERE INS_TIME <= ? AND NOT EXISTS (SELECT * FROM JBM_MSG_REF WHERE JBM_MSG_REF.MESSAGE_ID = JBM_MSG.MESSAGE_ID)");
+      map
+            .put(
+                  "DELETE_MESSAGE",
+                  "DELETE FROM JBM_MSG WHERE MESSAGE_ID = ? AND NOT EXISTS (SELECT * FROM JBM_MSG_REF WHERE JBM_MSG_REF.MESSAGE_ID = ?)");
+      // Transaction
+      map
+            .put(
+                  "INSERT_TRANSACTION",
+                  "INSERT INTO JBM_TX (NODE_ID, TRANSACTION_ID, BRANCH_QUAL, FORMAT_ID, GLOBAL_TXID) "
+                        + "VALUES(?, ?, ?, ?, ?)");
+      map.put("DELETE_TRANSACTION",
+            "DELETE FROM JBM_TX WHERE NODE_ID = ? AND TRANSACTION_ID = ?");
+      map
+            .put(
+                  "SELECT_PREPARED_TRANSACTIONS",
+                  "SELECT TRANSACTION_ID, BRANCH_QUAL, FORMAT_ID, GLOBAL_TXID FROM JBM_TX WHERE NODE_ID = ?");
+      map
+            .put(
+                  "SELECT_MESSAGE_ID_FOR_REF",
+                  "SELECT MESSAGE_ID, CHANNEL_ID FROM JBM_MSG_REF WHERE TRANSACTION_ID = ? AND STATE = '+' ORDER BY ORD");
+      map
+            .put(
+                  "SELECT_MESSAGE_ID_FOR_ACK",
+                  "SELECT MESSAGE_ID, CHANNEL_ID FROM JBM_MSG_REF WHERE TRANSACTION_ID = ? AND STATE = '-' ORDER BY ORD");
       map.put("UPDATE_TX", "UPDATE JBM_TX SET NODE_ID=? WHERE NODE_ID=?");
-      
-      //Counter
-      map.put("UPDATE_COUNTER", "UPDATE JBM_COUNTER SET NEXT_ID = ? WHERE NAME=?");
+
+      // Counter
+      map.put("UPDATE_COUNTER",
+            "UPDATE JBM_COUNTER SET NEXT_ID = ? WHERE NAME=?");
       map.put("SELECT_COUNTER", "SELECT NEXT_ID FROM JBM_COUNTER WHERE NAME=?");
-      map.put("INSERT_COUNTER", "INSERT INTO JBM_COUNTER (NAME, NEXT_ID) VALUES (?, ?)");
-      //Other
-      map.put("SELECT_ALL_CHANNELS", "SELECT DISTINCT(CHANNEL_ID) FROM JBM_MSG_REF");
+      map.put("INSERT_COUNTER",
+            "INSERT INTO JBM_COUNTER (NAME, NEXT_ID) VALUES (?, ?)");
+      // Other
+      map.put("SELECT_ALL_CHANNELS",
+            "SELECT DISTINCT(CHANNEL_ID) FROM JBM_MSG_REF");
 
       return map;
    }
-   
+
    // Private -------------------------------------------------------
-   
+
    private void deleteMessages(final List references) throws Exception
    {
       class DeleteMessagesRunner extends JDBCTxRunner2
       {
          public Object doTransaction() throws Exception
-         {  
+         {
 
             PreparedStatement psMessage = null;
 
             try
             {
-               psMessage = conn.prepareStatement(getSQLStatement("DELETE_MESSAGE"));
-               
+               psMessage = conn
+                     .prepareStatement(getSQLStatement("DELETE_MESSAGE"));
+
                Iterator iter = references.iterator();
-               
+
                while (iter.hasNext())
                {
                   Object obj = iter.next();
@@ -2163,161 +2517,172 @@
                   MessageReference ref;
                   if (obj instanceof MessageReference)
                   {
-                     ref = (MessageReference)obj;
+                     ref = (MessageReference) obj;
                   }
                   else
                   {
-                     ref = ((ChannelRefPair)obj).ref;
-                  }                  
+                     ref = ((ChannelRefPair) obj).ref;
+                  }
 
                   psMessage.setLong(1, ref.getMessage().getMessageID());
                   psMessage.setLong(2, ref.getMessage().getMessageID());
-   
+
                   int rows = psMessage.executeUpdate();
-                  
-                  if (trace) { log.trace("Deleted " + rows + " messages"); }             
 
+                  if (trace)
+                  {
+                     log.trace("Deleted " + rows + " messages");
+                  }
+
                }
-               
+
                return null;
             }
             finally
             {
                closeStatement(psMessage);
-            }  
+            }
          }
       }
-      
-      //Order to avoid deadlock
+
+      // Order to avoid deadlock
       orderReferences(references);
-      
+
       new DeleteMessagesRunner().executeWithRetry();
    }
-   
-   
+
    private void deleteMessage(final long messageID) throws Exception
    {
       class DeleteMessageRunner extends JDBCTxRunner2
       {
          public Object doTransaction() throws Exception
-         {  
+         {
 
             PreparedStatement psMessage = null;
 
             try
             {
-               psMessage = conn.prepareStatement(getSQLStatement("DELETE_MESSAGE"));
+               psMessage = conn
+                     .prepareStatement(getSQLStatement("DELETE_MESSAGE"));
 
                psMessage.setLong(1, messageID);
                psMessage.setLong(2, messageID);
 
                int rows = psMessage.executeUpdate();
 
-               if (trace) { log.trace("Deleted " + rows + " messages"); }             
+               if (trace)
+               {
+                  log.trace("Deleted " + rows + " messages");
+               }
 
                return null;
             }
             finally
             {
                closeStatement(psMessage);
-            }  
+            }
          }
       }
-      
+
       new DeleteMessageRunner().executeWithRetry();
    }
-   
-   
-   private List getMessageChannelPair(String sqlQuery, long transactionId) throws Exception
+
+   private List getMessageChannelPair(String sqlQuery, long transactionId)
+         throws Exception
    {
-      if (trace) log.trace("loading message and channel ids for tx [" + transactionId + "]");
-      
+      if (trace) log.trace("loading message and channel ids for tx ["
+            + transactionId + "]");
+
       if (!this.nodeIDSet)
       {
-      	//Sanity
-      	throw new IllegalStateException("Node id has not been set");
+         // Sanity
+         throw new IllegalStateException("Node id has not been set");
       }
-      
+
       Connection conn = null;
       PreparedStatement ps = null;
       ResultSet rs = null;
       TransactionWrapper wrap = new TransactionWrapper();
-      
+
       try
       {
          conn = ds.getConnection();
-         
+
          ps = conn.prepareStatement(sqlQuery);
-         
+
          ps.setLong(1, transactionId);
-           
+
          rs = ps.executeQuery();
-         
-         //Don't use a Map. A message could be in multiple channels in a tx, so if you use a map
-         //when you put the same message again it's going to overwrite the previous put!!
-         
+
+         // Don't use a Map. A message could be in multiple channels in a tx, so
+         // if you use a map
+         // when you put the same message again it's going to overwrite the
+         // previous put!!
+
          class Holder
          {
             long messageId;
             long channelId;
+
             Holder(long messageId, long channelId)
             {
                this.messageId = messageId;
                this.channelId = channelId;
             }
          }
-         
+
          List<Holder> holders = new ArrayList<Holder>();
-         
-         //Unique set of messages
+
+         // Unique set of messages
          Set<Long> msgIds = new HashSet<Long>();
-         
-         //TODO it would probably have been simpler just to have done all this in a SQL JOIN rather
-         //than do the join in memory.....        
-                  
-         while(rs.next())
-         {            
+
+         // TODO it would probably have been simpler just to have done all this
+         // in a SQL JOIN rather
+         // than do the join in memory.....
+
+         while (rs.next())
+         {
             long messageId = rs.getLong(1);
             long channelId = rs.getLong(2);
-            
+
             Holder holder = new Holder(messageId, channelId);
-            
+
             holders.add(holder);
-                        
+
             msgIds.add(messageId);
-            
-            if (trace) log.trace("Loaded MsgID: " + messageId + " and ChannelID: " + channelId);
+
+            if (trace) log.trace("Loaded MsgID: " + messageId
+                  + " and ChannelID: " + channelId);
          }
-         
+
          Map messageMap = new HashMap();
-         
+
          List messages = getMessages(new ArrayList(msgIds));
-         
-         for (Iterator iter = messages.iterator(); iter.hasNext(); )
+
+         for (Iterator iter = messages.iterator(); iter.hasNext();)
          {
-            Message msg = (Message)iter.next();
-            
-            messageMap.put(new Long(msg.getMessageID()), msg);            
+            Message msg = (Message) iter.next();
+
+            messageMap.put(new Long(msg.getMessageID()), msg);
          }
-         
+
          List returnList = new ArrayList();
-         
-         for (Iterator iter = holders.iterator(); iter.hasNext(); )
+
+         for (Iterator iter = holders.iterator(); iter.hasNext();)
          {
-            Holder holder = (Holder)iter.next();
-            
-            Message msg = (Message)messageMap.get(new Long(holder.messageId));
-            
-            if (msg == null)
-            {
-               throw new IllegalStateException("Cannot find message " + holder.messageId);
-            }
-            
-            MessageChannelPair pair = new MessageChannelPair(msg, holder.channelId);
-            
+            Holder holder = (Holder) iter.next();
+
+            Message msg = (Message) messageMap.get(new Long(holder.messageId));
+
+            if (msg == null) { throw new IllegalStateException(
+                  "Cannot find message " + holder.messageId); }
+
+            MessageChannelPair pair = new MessageChannelPair(msg,
+                  holder.channelId);
+
             returnList.add(pair);
          }
-         
+
          return returnList;
       }
       catch (Exception e)
@@ -2327,35 +2692,43 @@
       }
       finally
       {
-      	closeResultSet(rs);
-      	closeStatement(ps);
-      	closeConnection(conn);
+         closeResultSet(rs);
+         closeStatement(ps);
+         closeConnection(conn);
          wrap.end();
       }
    }
-   
+
    private synchronized long getOrdering()
    {
-      //We generate the ordering for the message reference by taking the lowest 48 bits of the current time and
-      //concatenating with a 15 bit rotating counter to form a string of 63 bits which we then place
-      //in the right most bits of a long, giving a positive signed 63 bit integer.
-      
-      //Having a time element in the ordering means we don't have to maintain a counter in the database
-      //It also helps with failover since if two queues merge after failover then, the ordering will mean
-      //their orderings interleave nicely and they still get consumed in pretty much time order
-      
-      //We only have to guarantee ordering per session, so having slight differences of time on different nodes is
-      //not a problem
-      
-      //The time element is good for about 8919 years - if you're still running JBoss Messaging then, I suggest you need an
-      //upgrade!
-      
+      // We generate the ordering for the message reference by taking the lowest
+      // 48 bits of the current time and
+      // concatenating with a 15 bit rotating counter to form a string of 63
+      // bits which we then place
+      // in the right most bits of a long, giving a positive signed 63 bit
+      // integer.
+
+      // Having a time element in the ordering means we don't have to maintain a
+      // counter in the database
+      // It also helps with failover since if two queues merge after failover
+      // then, the ordering will mean
+      // their orderings interleave nicely and they still get consumed in pretty
+      // much time order
+
+      // We only have to guarantee ordering per session, so having slight
+      // differences of time on different nodes is
+      // not a problem
+
+      // The time element is good for about 8919 years - if you're still running
+      // JBoss Messaging then, I suggest you need an
+      // upgrade!
+
       long order = System.currentTimeMillis();
-      
+
       order = order << 15;
-      
+
       order = order | orderCount;
-      
+
       if (orderCount == Short.MAX_VALUE)
       {
          orderCount = 0;
@@ -2364,66 +2737,66 @@
       {
          orderCount++;
       }
-      
+
       return order;
    }
-         
+
    // Inner classes -------------------------------------------------
-            
+
    private static class ChannelRefPair
    {
       private long channelID;
       private MessageReference ref;
-      
+
       private ChannelRefPair(long channelID, MessageReference ref)
       {
          this.channelID = channelID;
          this.ref = ref;
       }
    }
-   
+
    private class TransactionCallback implements TxCallback
    {
       private Transaction tx;
-      
+
       private List refsToAdd;
-      
+
       private List refsToRemove;
-      
+
       private TransactionCallback(Transaction tx)
       {
          this.tx = tx;
-         
+
          refsToAdd = new ArrayList();
-         
+
          refsToRemove = new ArrayList();
       }
-      
+
       private void addReferenceToAdd(long channelId, MessageReference ref)
       {
          refsToAdd.add(new ChannelRefPair(channelId, ref));
       }
-      
+
       private void addReferenceToRemove(long channelId, MessageReference ref)
       {
          refsToRemove.add(new ChannelRefPair(channelId, ref));
       }
-      
+
       public void afterCommit(boolean onePhase)
       {
-         //NOOP
+         // NOOP
       }
-      
+
       public void afterPrepare()
       {
-         //NOOP
+         // NOOP
       }
-      
+
       public void afterRollback(boolean onePhase)
       {
-         //NOOP
+         // NOOP
       }
-      
+
       public void beforeCommit(boolean onePhase) throws Exception
       {
          if (onePhase)
@@ -2435,17 +2808,17 @@
             handleBeforeCommit2PC(refsToRemove, tx);
          }
       }
-      
+
       public void beforePrepare() throws Exception
       {
          handleBeforePrepare(refsToAdd, refsToRemove, tx);
       }
-      
+
       public void beforeRollback(boolean onePhase) throws Exception
       {
          if (onePhase)
          {
-            //NOOP - nothing in db
+            // NOOP - nothing in db
          }
          else
          {
@@ -2453,37 +2826,37 @@
          }
       }
    }
-   
+
    private void orderReferences(List references)
-   {      
+   {
       Collections.sort(references, MessageOrderComparator.instance);
    }
-      
+
    private static class MessageOrderComparator implements Comparator
    {
       static MessageOrderComparator instance = new MessageOrderComparator();
-      
+
       public int compare(Object o1, Object o2)
-      {        
+      {
          MessageReference ref1;
          MessageReference ref2;
-         
+
          if (o1 instanceof MessageReference)
          {
-            ref1 = (MessageReference)o1;
-            ref2 = (MessageReference)o2;
+            ref1 = (MessageReference) o1;
+            ref2 = (MessageReference) o2;
          }
          else
          {
-            ref1 = ((ChannelRefPair)o1).ref;
-            ref2 = ((ChannelRefPair)o2).ref;
+            ref1 = ((ChannelRefPair) o1).ref;
+            ref2 = ((ChannelRefPair) o2).ref;
          }
-         
-         long id1 = ref1.getMessage().getMessageID();         
-         long id2 = ref2.getMessage().getMessageID(); 
-         
+
+         long id1 = ref1.getMessage().getMessageID();
+         long id2 = ref2.getMessage().getMessageID();
+
          return (id1 < id2 ? -1 : (id1 == id2 ? 0 : 1));
-      }      
+      }
    }
-   
+
 }

Modified: branches/Branch_Stable/src/main/org/jboss/messaging/core/impl/PagingChannelSupport.java
===================================================================
--- branches/Branch_Stable/src/main/org/jboss/messaging/core/impl/PagingChannelSupport.java	2007-11-07 16:34:07 UTC (rev 3296)
+++ branches/Branch_Stable/src/main/org/jboss/messaging/core/impl/PagingChannelSupport.java	2007-11-07 19:36:53 UTC (rev 3297)
@@ -210,7 +210,8 @@
          
          doLoad(ili);
          
-         //Maybe we need to load some paged refs
+         //Maybe we need to load some paged refs too since we might not be full (fullSize might have been increased from last
+         //load)
          
          while (checkLoad()) {}
       }

Modified: branches/Branch_Stable/tests/src/org/jboss/test/messaging/core/paging/SingleChannel_ReloadTest.java
===================================================================
--- branches/Branch_Stable/tests/src/org/jboss/test/messaging/core/paging/SingleChannel_ReloadTest.java	2007-11-07 16:34:07 UTC (rev 3296)
+++ branches/Branch_Stable/tests/src/org/jboss/test/messaging/core/paging/SingleChannel_ReloadTest.java	2007-11-07 19:36:53 UTC (rev 3297)
@@ -303,5 +303,68 @@
       assertEquals(0, queue.getMessageCount());
    }
    
+   //http://jira.jboss.org/jira/browse/JBMESSAGING-1139
+   //If the downcache is not full when we stop the server, we need to test that when we start it again
+   //it loads ok (previously it wasn't)
+   
+   //First test with downcach never flushed
+   public void testRecoverableQueueRestartWithDownCache() throws Throwable
+   {
+      testRecoverableQueueRestartWithDownCache(110);
+   }
+   
+   //Then with down cache flushed once 
+   public void testRecoverableQueueRestartWithDownCacheAlreadyFlushed() throws Throwable
+   {
+      testRecoverableQueueRestartWithDownCache(130);
+   }
+   
+   private void testRecoverableQueueRestartWithDownCache(int num) throws Throwable
+   {
+      MessagingQueue queue =
+         new MessagingQueue(1, "queue1", 1, ms, pm, true, -1, null, 100, 20, 20, false, 300000);
+      queue.activate();
+      
+      Message[] msgs = new Message[num];
+      
+      MessageReference[] refs = new MessageReference[num];
+            
+      for (int i = 0; i < num; i++)
+      {
+         msgs[i] = CoreMessageFactory.createCoreMessage(i, true, null);
+         
+         refs[i] = msgs[i].createReference();
+                
+         queue.handle(null, refs[i], null); 
+      }
+      
+            
+      pm.stop();
+      tr.stop();
+      ms.stop();
+      
+      pm =
+         new JDBCPersistenceManager(sc.getDataSource(), sc.getTransactionManager(),
+                  sc.getPersistenceManagerSQLProperties(),
+                  true, true, true, false, 100, !sc.getDatabaseName().equals("oracle"));   
+      ((JDBCPersistenceManager)pm).injectNodeID(1);
+      pm.start();
+      
+      ms = new SimpleMessageStore();
+      ms.start();
+      
+      tr = new TransactionRepository(pm, ms, idm);
 
+      tr.start();
+         
+      MessagingQueue queue2 =
+         new MessagingQueue(1, "queue1", 1, ms, pm, true, -1, null, 100, 20, 20, false, 300000);
+    
+      queue2.load();
+      queue2.activate();
+                              
+      this.consume(queue2, 0, refs, num);            
+   }
+     
+  
 }




More information about the jboss-cvs-commits mailing list