[jboss-cvs] JBoss Messaging SVN: r3025 - trunk/src/main/org/jboss/messaging/core/impl.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Wed Aug 22 09:14:56 EDT 2007


Author: timfox
Date: 2007-08-22 09:14:56 -0400 (Wed, 22 Aug 2007)
New Revision: 3025

Modified:
   trunk/src/main/org/jboss/messaging/core/impl/JDBCPersistenceManager.java
Log:
PersistenceManager interim commit 4


Modified: trunk/src/main/org/jboss/messaging/core/impl/JDBCPersistenceManager.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/impl/JDBCPersistenceManager.java	2007-08-22 11:52:23 UTC (rev 3024)
+++ trunk/src/main/org/jboss/messaging/core/impl/JDBCPersistenceManager.java	2007-08-22 13:14:56 UTC (rev 3025)
@@ -87,8 +87,6 @@
    
    private boolean trace = log.isTraceEnabled();
       
-   private boolean usingBatchUpdates = false;
-   
    private boolean usingBinaryStream = true;
    
    private boolean usingTrailingByte = false;
@@ -117,7 +115,7 @@
    {
       super(ds, tm, sqlProperties, createTablesOnStartup);
       
-      this.usingBatchUpdates = usingBatchUpdates;
+      //usingBatchUpdates is currently ignored due to sketchy support from databases
       
       this.usingBinaryStream = usingBinaryStream;
       
@@ -263,7 +261,7 @@
       
       try
       {
-         List transactions = new ArrayList();
+         List<PreparedTxInfo> transactions = new ArrayList<PreparedTxInfo> ();
          
          conn = ds.getConnection();
          
@@ -309,86 +307,10 @@
       }
    }
          
-   
-   abstract class JDBCTxRunner
-   {   
-   	private static final int MAX_TRIES = 25;
-   	
-   	Connection conn;
-
-      TransactionWrapper wrap;
-      
-		public Object execute() throws Exception
-		{
-	      wrap = new TransactionWrapper();
-	      
-	      try
-	      {
-	         conn = ds.getConnection();
-	         
-	         return doTransaction();
-	      }
-	      catch (Exception e)
-	      {
-	         wrap.exceptionOccurred();
-	         throw e;
-	      }
-	      finally
-	      {	      		      
-	      	closeConnection(conn);
-	         wrap.end();
-	      }  
-		}
-		
-		public Object executeWithRetry() throws Exception
-		{
-	      int tries = 0;
-	      
-	      while (true)
-	      {
-	         try
-	         {
-	            Object res = execute();
-	            
-	            if (tries > 0)
-	            {
-	               log.warn("Update worked after retry");
-	            }
-	            return res;	            
-	         }
-	         catch (SQLException e)
-	         {
-	            log.warn("SQLException caught - assuming deadlock detected, try:" + (tries + 1), e);
-	            
-	            log.info("SQLState:" + e.getSQLState() + " code:" + e.getErrorCode());
-	            
-	            tries++;
-	            if (tries == MAX_TRIES)
-	            {
-	               log.error("Retried " + tries + " times, now giving up");
-	               throw new IllegalStateException("Failed to excecute transaction");
-	            }
-	            log.warn("Trying again after a pause");
-	            //Now we wait for a random amount of time to minimise risk of deadlock
-	            Thread.sleep((long)(Math.random() * 500));
-	         }  
-	      }
-		}
-		
-		public abstract Object doTransaction() throws Exception;
-   }
-   
-   
-               
-   
-   
-   
-   
-   
    // Related to counters
    // ===================
    
-   public long reserveIDBlock(String counterName, int size) throws Exception
+   public long reserveIDBlock(final String counterName, final int size) throws Exception
    {
       if (trace) { log.trace("Getting ID block for counter " + counterName + ", size " + size); }
       
@@ -399,16 +321,6 @@
       
       class ReserveIDBlockRunner extends JDBCTxRunner
       {
-      	String counterName;
-      	
-      	int size;
-      	
-      	public ReserveIDBlockRunner(String counterName, int size)
-      	{
-      		this.counterName = counterName;
-      		this.size = size;
-      	}
-      	
       	public Object doTransaction() throws Exception
    		{
             //	For the clustered case - this MUST use SELECT .. FOR UPDATE or a similar
@@ -477,7 +389,7 @@
    		}
       }
       
-      return (Long)new ReserveIDBlockRunner(counterName, size).executeWithRetry();
+      return (Long)new ReserveIDBlockRunner().executeWithRetry();
    }
          
    /*
@@ -508,7 +420,7 @@
          
          int count = 0;
          
-         List msgs = new ArrayList();
+         List<Message> msgs = new ArrayList<Message>();
          
          while (iter.hasNext())
          {
@@ -611,27 +523,16 @@
    
    //Used to page NP messages or P messages in a non recoverable queue
    
-   public void pageReferences(long channelID, List references, boolean page) throws Exception
+   public void pageReferences(final long channelID, final List references, final boolean page) throws Exception
    {      
    	class PageReferencesRunner extends JDBCTxRunner
       {
-      	long channelID;
-      	List references;
-      	boolean page;
-      	
-      	public PageReferencesRunner(long channelID, List references, boolean page)
-      	{
-      		this.channelID = channelID;
-      		this.references = references;
-      		this.page = page;
-      	}
-      	
       	public Object doTransaction() throws Exception
    		{
       		PreparedStatement psInsertReference = null;
       		PreparedStatement psInsertMessage = null;
       		
-      		List persistedMessages = new ArrayList();
+      		List<Message> persistedMessages = new ArrayList<Message>();
       		
       		try
       		{      		
@@ -685,7 +586,7 @@
 	         	
 	         	return null;
       		}
-      		catch (SQLException e)
+      		catch (Exception e)
       		{
       			//The tx will be rolled back
       			//so we need to set the messages to not persisted
@@ -706,25 +607,16 @@
    		}      	      	      	
       }
    	
-   	new PageReferencesRunner(channelID, references, page).executeWithRetry(); 
+   	new PageReferencesRunner().executeWithRetry(); 
    }
          
    //After loading paged refs this is used to remove any NP or P messages in a unrecoverable channel
-   public void removeDepagedReferences(long channelID, List references) throws Exception
+   public void removeDepagedReferences(final long channelID, final List references) throws Exception
    {
       if (trace) { log.trace(this + " Removing " + references.size() + " refs from channel " + channelID); }
       
       class RemoveDepagedReferencesRunner extends JDBCTxRunner
       {
-      	long channelID;
-      	List references;
-      	
-      	public RemoveDepagedReferencesRunner(long channelID, List references)
-      	{
-      		this.channelID = channelID;
-      		this.references = references;
-      	}
-      	
       	public Object doTransaction() throws Exception
    		{
 		      PreparedStatement psDeleteReference = null;  
@@ -743,16 +635,9 @@
 		            
 		            //log.info("Removed ref with page order " + ref.getPagingOrder());
 		            
-		            if (usingBatchUpdates)
-		            {
-		               psDeleteReference.addBatch();
-		            }
-		            else
-		            {
-		               int rows = psDeleteReference.executeUpdate();
+		            int rows = psDeleteReference.executeUpdate();
 		               
-		               if (trace) { log.trace("Deleted " + rows + " rows"); }
-		            }
+		            if (trace) { log.trace("Deleted " + rows + " rows"); }		            
 		                          
 		            //There is a small possibility that the ref is depaged here, then paged again, before this flag is set
 		            //and the tx is committed so the message be attempted to be inserted twice but this should be ok
@@ -760,13 +645,6 @@
 		            ref.getMessage().setPersisted(false);                  
 		         }         
 		         
-		         if (usingBatchUpdates)
-		         {
-		            int[] rowsReference = executeWithRetryBatch(psDeleteReference);
-		            
-		            if (trace) { logBatchUpdate(getSQLStatement("DELETE_MESSAGE_REF"), rowsReference, "deleted"); }                                    
-		         }
-		         
 		         return null;
 		      }
 		      finally
@@ -776,92 +654,91 @@
    		}
       }
       
-      new RemoveDepagedReferencesRunner(channelID, references).executeWithRetry();
+      new RemoveDepagedReferencesRunner().executeWithRetry();
    }
    
    // After loading paged refs this is used to update P messages to non paged
-   public void updateReferencesNotPagedInRange(long channelID, long orderStart, long orderEnd, long num) throws Exception
+   public void updateReferencesNotPagedInRange(final long channelID, final long orderStart, final long orderEnd, final long num) throws Exception
    {
       if (trace) { log.trace("Updating paged references for channel " + channelID + " between " + orderStart + " and " + orderEnd); }
       
-      Connection conn = null;
-      PreparedStatement ps = null;
-      TransactionWrapper wrap = new TransactionWrapper();
-    
-      try
+      class UpdateReferencesNotPagedInRangeRunner extends JDBCTxRunner
       {
-         conn = ds.getConnection();
-         
-         ps = conn.prepareStatement(getSQLStatement("UPDATE_REFS_NOT_PAGED"));
-                 
-         ps.setLong(1, orderStart);
-         
-         ps.setLong(2, orderEnd);
-         
-         ps.setLong(3, channelID);
-         
-         int rows = executeWithRetry(ps);
-           
-         if (trace) { log.trace(JDBCUtil.statementToString(getSQLStatement("UPDATE_REFS_NOT_PAGED"), new Long(channelID),
-                                new Long(orderStart), new Long(orderEnd)) + " updated " + rows + " rows"); }
-
-         //Sanity check
-         if (rows != num)
-         {
-            throw new IllegalStateException("Did not update correct number of rows");
-         }            
+      	public Object doTransaction() throws Exception
+   		{
+             PreparedStatement ps = null;
+             
+             try
+             {              
+		         ps = conn.prepareStatement(getSQLStatement("UPDATE_REFS_NOT_PAGED"));
+		                 
+		         ps.setLong(1, orderStart);
+		         
+		         ps.setLong(2, orderEnd);
+		         
+		         ps.setLong(3, channelID);
+		         
+		         int rows = ps.executeUpdate();
+		           
+		         if (trace) { log.trace(JDBCUtil.statementToString(getSQLStatement("UPDATE_REFS_NOT_PAGED"), new Long(channelID),
+		                                new Long(orderStart), new Long(orderEnd)) + " updated " + rows + " rows"); }
+		
+		         //Sanity check
+		         if (rows != num)
+		         {
+		            throw new IllegalStateException("Did not update correct number of rows");
+		         }     
+		         
+		         return null;
+		      }
+		      finally
+		      {
+		      	closeStatement(ps);
+		      }
+   		}
       }
-      catch (Exception e)
-      {
-         wrap.exceptionOccurred();
-         throw e;
-      }
-      finally
-      {
-      	closeStatement(ps);
-      	closeConnection(conn);       
-         wrap.end();
-      }
+      
+      new UpdateReferencesNotPagedInRangeRunner().executeWithRetry();
    }
 
-   public void mergeTransactions(long fromChannelID, long toChannelID) throws Exception
+   public void mergeTransactions(final long fromChannelID, final long toChannelID) throws Exception
    {
       if (trace) { log.trace("Merging transactions from channel " + fromChannelID + " to " + toChannelID); }
-
+      
       // Sanity check
       
       if (fromChannelID == toChannelID)
       {
       	throw new IllegalArgumentException("Cannot merge transactions - they have the same channel id!!");
       }
-
-      Connection conn = null;
-      PreparedStatement statement = null;
-      TransactionWrapper wrap = new TransactionWrapper();
-      try
+      
+      class MergeTransactionsRunner extends JDBCTxRunner
       {
-         conn = ds.getConnection();
-         statement = conn.prepareStatement(getSQLStatement("UPDATE_TX"));
-         statement.setLong(1, toChannelID);
-         statement.setLong(2, fromChannelID);
-         int affected = statement.executeUpdate();
-
-         log.debug("Merged " + affected + " transactions from channel " + fromChannelID + " into node " + toChannelID);
+      	public Object doTransaction() throws Exception
+   		{
+		      PreparedStatement statement = null;
+		      try
+		      {
+		         statement = conn.prepareStatement(getSQLStatement("UPDATE_TX"));
+		         statement.setLong(1, toChannelID);
+		         statement.setLong(2, fromChannelID);
+		         int affected = statement.executeUpdate();
+		
+		         log.debug("Merged " + affected + " transactions from channel " + fromChannelID + " into node " + toChannelID);
+		         
+		         return null;
+		      }
+		      finally
+		      {
+		         closeStatement(statement);
+		      }
+   		}
       }
-      catch (Exception e)
-      {
-         wrap.exceptionOccurred();
-         throw e;
-      }
-      finally
-      {
-         closeConnection(conn);
-         closeStatement(statement);
-         wrap.end();
-      }
+      
+      new MergeTransactionsRunner().executeWithRetry();
    }
    
-   public InitialLoadInfo mergeAndLoad(long fromChannelID, long toChannelID, int numberToLoad, long firstPagingOrder, long nextPagingOrder) throws Exception
+   public InitialLoadInfo mergeAndLoad(final long fromChannelID, final long toChannelID, final int numberToLoad, final long firstPagingOrder, final long nextPagingOrder) throws Exception
    {
       if (trace) { log.trace("Merging channel from " + fromChannelID + " to " + toChannelID + " numberToLoad:" + numberToLoad + " firstPagingOrder:" + firstPagingOrder + " nextPagingOrder:" + nextPagingOrder); }
       
@@ -871,228 +748,211 @@
       {
       	throw new IllegalArgumentException("Cannot merge queues - they have the same channel id!!");
       }
-      
-      Connection conn = null;
-      PreparedStatement ps = null;
-      ResultSet rs = null;
-      TransactionWrapper wrap = new TransactionWrapper();
-      PreparedStatement ps2 = null;
-    
-      try
-      {
-         conn = ds.getConnection();
-         
-         /*
-          * If channel is paging and has full size f
-          * 
-          * then we don't need to load any refs but we need to:
-          * 
-          * make sure the page ord is correct across the old paged and new refs
-          * 
-          * we know the max page ord (from the channel) for the old refs so we just need to:
-          * 
-          * 1) Iterate through the failed channel and update page_ord = max + 1, max + 2 etc
-          * 
-          * 2) update channel id
-          * 
-          * 
-          * If channel is not paging and the total refs before and after <=f
-          * 
-          * 1) Load all refs from failed channel
-          * 
-          * 2) Update channel id
-          * 
-          * return those refs
-          * 
-          * 
-          * If channel is not paging but total new refs > f
-          * 
-          * 1) Iterate through failed channel refs and take the first x to make the channel full
-          * 
-          * 2) Update the others with page_ord starting at zero 
-          * 
-          * 3) Update channel id
-          * 
-          * In general:
-          * 
-          * We have number to load n, max page size p
-          * 
-          * 1) Iterate through failed channel refs in page_ord order
-          * 
-          * 2) Put the first n in a List.
-          * 
-          * 3) Initialise page_ord_count to be p or 0 depending on whether it was specified
-          * 
-          * 4) Update the page_ord of the remaining refs accordiningly
-          * 
-          * 5) Update the channel id
-          * 
-          */
-         
-         //First load the refs from the failed channel
 
-         List refs = new ArrayList();
-         
-         ps = conn.prepareStatement(getSQLStatement("LOAD_REFS"));
-         
-         ps.setLong(1, fromChannelID);
-                 
-         rs = ps.executeQuery();
-         
-         int count = 0;
-         
-         boolean arePaged = false;
-         
-         long pageOrd = nextPagingOrder;
-         
-         while (rs.next())
-         {
-            long msgId = rs.getLong(1);            
-            int deliveryCount = rs.getInt(2);
-            long sched = rs.getLong(3);
-            
-            if (count < numberToLoad)
-            {           
-               ReferenceInfo ri = new ReferenceInfo(msgId, deliveryCount, sched);
-               
-               refs.add(ri);
-            }
-            
-            // Set page ord
-            
-            if (ps2 == null)
-            {
-               ps2 = conn.prepareStatement(getSQLStatement("UPDATE_PAGE_ORDER"));
-            }
-                
-            if (count < numberToLoad)
-            {
-               ps2.setNull(1, Types.BIGINT);
-               
-               if (trace) { log.trace("Set page ord to null"); }
-            }
-            else
-            {                                 
-               ps2.setLong(1, pageOrd);
-               
-               if (trace) { log.trace("Set page ord to " + pageOrd); }
-               
-               arePaged = true; 
-               
-               pageOrd++;                      
-            }
-            
-            ps2.setLong(2, msgId);
-            
-            ps2.setLong(3, fromChannelID);
-            
-            int rows = executeWithRetry(ps2);
-            
-            if (trace) { log.trace("Update page ord updated " + rows + " rows"); }
-
-            count++;            
-         }
-         
-         ps.close();
-         
-         // Now swap the channel id
-         
-         ps = conn.prepareStatement(getSQLStatement("UPDATE_CHANNEL_ID"));
-         
-         ps.setLong(1, toChannelID);
-         
-         ps.setLong(2, fromChannelID);
-         
-         int rows = executeWithRetry(ps);
-         
-         if (trace) { log.trace("Update channel id updated " + rows + " rows"); }
-                           
-         if (arePaged)
-         {            
-            return new InitialLoadInfo(new Long(firstPagingOrder), new Long(pageOrd - 1), refs);
-         }
-         else
-         {
-            return new InitialLoadInfo(null, null, refs);
-         }         
-      }
-      catch (Exception e)
+      class MergeAndLoadRunner extends JDBCTxRunner
       {
-         wrap.exceptionOccurred();
-         throw e;
+      	public Object doTransaction() throws Exception
+   		{      
+		      PreparedStatement ps = null;
+		      ResultSet rs = null;      
+		      PreparedStatement ps2 = null;
+		    
+		      try
+		      {
+		         /*
+		          * If channel is paging and has full size f
+		          * 
+		          * then we don't need to load any refs but we need to:
+		          * 
+		          * make sure the page ord is correct across the old paged and new refs
+		          * 
+		          * we know the max page ord (from the channel) for the old refs so we just need to:
+		          * 
+		          * 1) Iterate through the failed channel and update page_ord = max + 1, max + 2 etc
+		          * 
+		          * 2) update channel id
+		          * 
+		          * 
+		          * If channel is not paging and the total refs before and after <=f
+		          * 
+		          * 1) Load all refs from failed channel
+		          * 
+		          * 2) Update channel id
+		          * 
+		          * return those refs
+		          * 
+		          * 
+		          * If channel is not paging but total new refs > f
+		          * 
+		          * 1) Iterate through failed channel refs and take the first x to make the channel full
+		          * 
+		          * 2) Update the others with page_ord starting at zero 
+		          * 
+		          * 3) Update channel id
+		          * 
+		          * In general:
+		          * 
+		          * We have number to load n, max page size p
+		          * 
+		          * 1) Iterate through failed channel refs in page_ord order
+		          * 
+		          * 2) Put the first n in a List.
+		          * 
+		          * 3) Initialise page_ord_count to be p or 0 depending on whether it was specified
+		          * 
+		          * 4) Update the page_ord of the remaining refs accordiningly
+		          * 
+		          * 5) Update the channel id
+		          * 
+		          */
+		         
+		         //First load the refs from the failed channel
+		
+		         List<ReferenceInfo> refs = new ArrayList<ReferenceInfo>();
+		         
+		         ps = conn.prepareStatement(getSQLStatement("LOAD_REFS"));
+		         
+		         ps.setLong(1, fromChannelID);
+		                 
+		         rs = ps.executeQuery();
+		         
+		         int count = 0;
+		         
+		         boolean arePaged = false;
+		         
+		         long pageOrd = nextPagingOrder;
+		         
+		         while (rs.next())
+		         {
+		            long msgId = rs.getLong(1);            
+		            int deliveryCount = rs.getInt(2);
+		            long sched = rs.getLong(3);
+		            
+		            if (count < numberToLoad)
+		            {           
+		               ReferenceInfo ri = new ReferenceInfo(msgId, deliveryCount, sched);
+		               
+		               refs.add(ri);
+		            }
+		            
+		            // Set page ord
+		            
+		            if (ps2 == null)
+		            {
+		               ps2 = conn.prepareStatement(getSQLStatement("UPDATE_PAGE_ORDER"));
+		            }
+		                
+		            if (count < numberToLoad)
+		            {
+		               ps2.setNull(1, Types.BIGINT);
+		               
+		               if (trace) { log.trace("Set page ord to null"); }
+		            }
+		            else
+		            {                                 
+		               ps2.setLong(1, pageOrd);
+		               
+		               if (trace) { log.trace("Set page ord to " + pageOrd); }
+		               
+		               arePaged = true; 
+		               
+		               pageOrd++;                      
+		            }
+		            
+		            ps2.setLong(2, msgId);
+		            
+		            ps2.setLong(3, fromChannelID);
+		            
+		            int rows = ps2.executeUpdate();
+		            
+		            if (trace) { log.trace("Update page ord updated " + rows + " rows"); }
+		
+		            count++;            
+		         }
+		         
+		         ps.close();
+		         
+		         ps = null;
+		         
+		         // Now swap the channel id
+		         
+		         ps = conn.prepareStatement(getSQLStatement("UPDATE_CHANNEL_ID"));
+		         
+		         ps.setLong(1, toChannelID);
+		         
+		         ps.setLong(2, fromChannelID);
+		         
+		         int rows = ps.executeUpdate();
+		         
+		         if (trace) { log.trace("Update channel id updated " + rows + " rows"); }
+		                           
+		         if (arePaged)
+		         {            
+		            return new InitialLoadInfo(new Long(firstPagingOrder), new Long(pageOrd - 1), refs);
+		         }
+		         else
+		         {
+		            return new InitialLoadInfo(null, null, refs);
+		         }         
+		      }
+		      finally
+		      {      
+		         closeResultSet(rs);
+		      	closeStatement(ps);
+		      	closeStatement(ps2);
+		      }
+   		}
       }
-      finally
-      {
-      	closeStatement(ps);
-      	closeStatement(ps2);
-      	closeConnection(conn);       
-         wrap.end();
-      }
+      return (InitialLoadInfo)new MergeAndLoadRunner().executeWithRetry();      
    }
    
-   public void updatePageOrder(long channelID, List references) throws Exception
+   public void updatePageOrder(final long channelID, final List references) throws Exception
    {
-      Connection conn = null;
-      PreparedStatement psUpdateReference = null;  
-      TransactionWrapper wrap = new TransactionWrapper();
-      
       if (trace) { log.trace("Updating page order for channel:" + channelID); }
-        
-      try
+      
+      class UpdatePageOrderRunner extends JDBCTxRunner
       {
-         conn = ds.getConnection();
-         
-         Iterator iter = references.iterator();
-         
-         psUpdateReference = conn.prepareStatement(getSQLStatement("UPDATE_PAGE_ORDER"));
-
-         while (iter.hasNext())
-         {
-            MessageReference ref = (MessageReference) iter.next();
-                 
-            psUpdateReference.setLong(1, ref.getPagingOrder());
-
-            psUpdateReference.setLong(2, ref.getMessage().getMessageID());
-            
-            psUpdateReference.setLong(3, channelID);
-            
-            if (usingBatchUpdates)
-            {
-               psUpdateReference.addBatch();
-            }
-            else
-            {
-               int rows = executeWithRetry(psUpdateReference);
-               
-               if (trace) { log.trace("Updated " + rows + " rows"); }
-            }
-         }
-                     
-         if (usingBatchUpdates)
-         {
-            int[] rowsReference = executeWithRetryBatch(psUpdateReference);
-            
-            if (trace) { logBatchUpdate(getSQLStatement("UPDATE_PAGE_ORDER"), rowsReference, "updated"); }
-         }
+      	public Object doTransaction() throws Exception
+   		{      
+      		PreparedStatement psUpdateReference = null;
+      		try
+      		{
+		         Iterator iter = references.iterator();
+		         
+		         psUpdateReference = conn.prepareStatement(getSQLStatement("UPDATE_PAGE_ORDER"));
+		
+		         while (iter.hasNext())
+		         {
+		            MessageReference ref = (MessageReference) iter.next();
+		                 
+		            psUpdateReference.setLong(1, ref.getPagingOrder());
+		
+		            psUpdateReference.setLong(2, ref.getMessage().getMessageID());
+		            
+		            psUpdateReference.setLong(3, channelID);
+		            
+		            int rows = psUpdateReference.executeUpdate();
+		               
+		            if (trace) { log.trace("Updated " + rows + " rows"); }		            
+		         }
+		                     
+		         return null;
+		      }
+		      finally
+		      {
+		      	closeStatement(psUpdateReference);
+		      }    
+   		}
       }
-      catch (Exception e)
-      {
-         wrap.exceptionOccurred();
-         throw e;
-      }
-      finally
-      {
-      	closeStatement(psUpdateReference);
-      	closeConnection(conn);       
-         wrap.end();
-      }    
+      
+      new UpdatePageOrderRunner().executeWithRetry();      
    }
       
-   public List getPagedReferenceInfos(long channelID, long orderStart, int number) throws Exception
+   public List getPagedReferenceInfos(final long channelID, final long orderStart, final int number) throws Exception
    {
       if (trace) { log.trace("loading message reference info for channel " + channelID + " from " + orderStart + " number " + number);      }
                  
-      List refs = new ArrayList();
+      List<ReferenceInfo> refs = new ArrayList<ReferenceInfo>();
       
       Connection conn = null;
       PreparedStatement ps = null;
@@ -1160,7 +1020,7 @@
    /*
     * Load the initial, non paged refs
     */
-   public InitialLoadInfo loadFromStart(long channelID, int number) throws Exception
+   public InitialLoadInfo loadFromStart(final long channelID, final int number) throws Exception
    {
       if (trace) { log.trace("loading initial reference infos for channel " + channelID);  }
                     
@@ -1208,7 +1068,7 @@
                  
          rs = ps.executeQuery();
          
-         List refs = new ArrayList();
+         List<ReferenceInfo> refs = new ArrayList<ReferenceInfo>();
          
          int count = 0;
          while (rs.next())
@@ -1255,12 +1115,58 @@
    // End of paging functionality
    // ===========================
    
-   public void addReference(long channelID, MessageReference ref, Transaction tx) throws Exception
-   {      
+   public void addReference(final long channelID, final MessageReference ref, final Transaction tx) throws Exception
+   {         	
+   	class AddReferenceRunner extends JDBCTxRunner
+   	{
+			public Object doTransaction() throws Exception
+			{
+	         PreparedStatement psReference = null;
+	         PreparedStatement psMessage = null;
+	         
+	         Message m = ref.getMessage();     
+	         
+	         try
+	         {                          
+	            psReference = conn.prepareStatement(getSQLStatement("INSERT_MESSAGE_REF"));
+	            
+	            // Add the reference
+	            addReference(channelID, ref, psReference, false);
+	            
+	            int rows = psReference.executeUpdate();  
+	            
+	            if (trace) { log.trace("Inserted " + rows + " rows"); }
+	              
+	            if (!m.isPersisted())
+	            {
+	               // First time so persist the message
+	               psMessage = conn.prepareStatement(getSQLStatement("INSERT_MESSAGE"));
+	               
+	               storeMessage(m, psMessage);
+	                                    
+		            rows = psMessage.executeUpdate();
+		            
+		            if (trace) { log.trace("Inserted/updated " + rows + " rows"); }     
+		            
+		            log.trace("message Inserted/updated " + rows + " rows");  
+		            
+		            //Needs to be at the end - in case an exception is thrown in which case retry will be attempted and we want to insert it again
+	               m.setPersisted(true);	                 
+	            }
+	            
+	            return null;
+	         }
+	         finally
+	         {
+	         	closeStatement(psReference);
+	         	closeStatement(psMessage);
+	         }      
+			}   		
+   	}
+   	   	
       if (tx != null)
       {
          //In a tx so we just add the ref in the tx in memory for now
-
          TransactionCallback callback = getCallback(tx);
 
          callback.addReferenceToAdd(channelID, ref);
@@ -1268,106 +1174,78 @@
       else
       {         
          //No tx so add the ref directly in the db
-         
-         TransactionWrapper wrap = new TransactionWrapper();
-         
-         PreparedStatement psReference = null;
-         PreparedStatement psMessage = null;
-         
-         Connection conn = ds.getConnection();
-         
-         Message m = ref.getMessage();     
-           
-         try
-         {            
-            // Get lock on message
-           // LockMap.instance.obtainLock(m);
-                                    
-            psReference = conn.prepareStatement(getSQLStatement("INSERT_MESSAGE_REF"));
-            
-            // Add the reference
-            addReference(channelID, ref, psReference, false);
-            
-            int rows = executeWithRetry(psReference);      
-            
-            if (trace) { log.trace("Inserted " + rows + " rows"); }
-              
-            if (!m.isPersisted())
-            {
-               // First time so persist the message
-               psMessage = conn.prepareStatement(getSQLStatement("INSERT_MESSAGE"));
-               
-               storeMessage(m, psMessage);
-               
-               m.setPersisted(true);
-                                      
-	            rows = executeWithRetry(psMessage);
-	            
-	            if (trace) { log.trace("Inserted/updated " + rows + " rows"); }     
-	            
-	            log.trace("message Inserted/updated " + rows + " rows");     
-            }
-         }
-         catch (Exception e)
-         {
-            wrap.exceptionOccurred();
-            throw e;
-         }
-         finally
-         {
-         	closeStatement(psReference);
-         	closeStatement(psMessage);
-         	closeConnection(conn);  
-            try
-            {
-               wrap.end();
-            }
-            finally
-            {   
-               //Release Lock
-             //  LockMap.instance.releaseLock(m);
-            }
-         }      
+         new AddReferenceRunner().executeWithRetry();         
       }
    }
    
-   public void updateDeliveryCount(long channelID, MessageReference ref) throws Exception
+   public void updateDeliveryCount(final long channelID, final MessageReference ref) throws Exception
    {
-      TransactionWrapper wrap = new TransactionWrapper();
-      
-      PreparedStatement psReference = null;
-      
-      Connection conn = ds.getConnection();
-       
-      try
-      {                                    
-         psReference = conn.prepareStatement(getSQLStatement("UPDATE_DELIVERY_COUNT"));
-         
-         psReference.setInt(1, ref.getDeliveryCount());
-         
-         psReference.setLong(2, channelID);
-         
-         psReference.setLong(3, ref.getMessage().getMessageID());
-         
-         int rows = executeWithRetry(psReference);
-
-         if (trace) { log.trace("Updated " + rows + " rows"); }
-      }
-      catch (Exception e)
-      {
-         wrap.exceptionOccurred();
-         throw e;
-      }
-      finally
-      {
-      	closeStatement(psReference);
-      	closeConnection(conn);
-         wrap.end();                        
-      }  
+   	class UpdateDeliveryCountRunner extends JDBCTxRunner
+   	{   		
+			public Object doTransaction() throws Exception
+			{   	
+		      PreparedStatement psReference = null;
+		       
+		      try
+		      {                                    
+		         psReference = conn.prepareStatement(getSQLStatement("UPDATE_DELIVERY_COUNT"));
+		         
+		         psReference.setInt(1, ref.getDeliveryCount());
+		         
+		         psReference.setLong(2, channelID);
+		         
+		         psReference.setLong(3, ref.getMessage().getMessageID());
+		         
+		         int rows = psReference.executeUpdate();
+		
+		         if (trace) { log.trace("Updated " + rows + " rows"); }
+		         
+		         return null;
+		      }
+		      finally
+		      {
+		      	closeStatement(psReference);                        
+		      }  
+			}
+   	}
+   	
+   	new UpdateDeliveryCountRunner().executeWithRetry();
    }
    
-   public void removeReference(long channelID, MessageReference ref, Transaction tx) throws Exception
+   public void removeReference(final long channelID, final MessageReference ref, final Transaction tx) throws Exception
    {      
+   	class RemoveReferenceRunner extends JDBCTxRunner
+   	{
+			public Object doTransaction() throws Exception
+			{  
+				PreparedStatement psReference = null;
+
+	         try
+	         {
+	            psReference = conn.prepareStatement(getSQLStatement("DELETE_MESSAGE_REF"));
+	            
+	            //Remove the message reference
+	            removeReference(channelID, ref, psReference);
+	             
+	            int rows = psReference.executeUpdate();
+	            
+	            if (rows != 1)
+	            {
+	               log.warn("Failed to remove row for: " + ref);
+	               return null;
+	            }
+	            
+	            if (trace) { log.trace("Deleted " + rows + " rows"); }             
+	            
+	            return null;
+	         }
+	         finally
+	         {
+	         	closeStatement(psReference);
+	         }  
+			}
+   	}
+   	   	
       if (tx != null)
       {
          //In a tx so we just add the ref in the tx in memory for now
@@ -1379,54 +1257,8 @@
       else
       {         
          //No tx so we remove the reference directly from the db
-         
-         TransactionWrapper wrap = new TransactionWrapper();
-         
-         PreparedStatement psReference = null;
-
-         Connection conn = ds.getConnection();
-         
-         Message m = ref.getMessage();         
-         
-         try
-         {
-            //get lock on message
-         //   LockMap.instance.obtainLock(m);
-                              
-            psReference = conn.prepareStatement(getSQLStatement("DELETE_MESSAGE_REF"));
-            
-            //Remove the message reference
-            removeReference(channelID, ref, psReference);
-             
-            int rows = executeWithRetry(psReference);
-            
-            if (rows != 1)
-            {
-               log.warn("Failed to remove row for: " + ref);
-               return;
-            }
-            
-            if (trace) { log.trace("Deleted " + rows + " rows"); }                                  
-         }
-         catch (Exception e)
-         {
-            wrap.exceptionOccurred();
-            throw e;
-         }
-         finally
-         {
-         	closeStatement(psReference);
-         	closeConnection(conn);
-//            try
-//            {
-               wrap.end();               
-//            }
-//            finally
-//            {      
-//               //release the lock
-//               LockMap.instance.releaseLock(m);
-//            }
-         }      
+      
+         new RemoveReferenceRunner().executeWithRetry();
       }
    }
    
@@ -1494,493 +1326,295 @@
       return callback;
    }
    
-   /**
-    * We order the list of references in ascending message order thus preventing deadlock when 2 or
-    * more channels are updating the same messages in different transactions.
-    */
-//   protected void orderReferences(List references)
-//   {      
-//      Collections.sort(references, MessageOrderComparator.instance);
-//   }
-   
-   protected void handleBeforeCommit1PC(List refsToAdd, List refsToRemove, Transaction tx)
+   protected void handleBeforeCommit1PC(final List refsToAdd, final List refsToRemove, final Transaction tx)
       throws Exception
    {
-      //TODO - A slight optimisation - it's possible we have refs referring to the same message
-      //       so we will end up acquiring the lock more than once which is unnecessary. If find
-      //       unique set of messages can avoid this.
+   	class HandleBeforeCommit1PCRunner extends JDBCTxRunner
+   	{
+			public Object doTransaction() throws Exception
+			{
+				// For one phase we simply add rows corresponding to the refs and remove rows corresponding to
+		      // the deliveries in one jdbc tx. We also need to store messages as necessary,
+		      // depending on whether they've already been stored or still referenced by other channels.
+		         
+		      PreparedStatement psReference = null;
+		      PreparedStatement psInsertMessage = null;
+		      PreparedStatement psDeleteReference = null;
+		      
+		      List<Message> messagesStored = new ArrayList<Message>();
 
-//      List allRefs = new ArrayList(refsToAdd.size() + refsToRemove.size());
-//
-//      for(Iterator i = refsToAdd.iterator(); i.hasNext(); )
-//      {
-//         ChannelRefPair pair = (ChannelRefPair)i.next();
-//         allRefs.add(pair.ref);
-//      }
-//
-//      for(Iterator i = refsToRemove.iterator(); i.hasNext(); )
-//      {
-//         ChannelRefPair pair = (ChannelRefPair)i.next();
-//         allRefs.add(pair.ref);
-//      }
-//            
-//      orderReferences(allRefs);
-      
-      // For one phase we simply add rows corresponding to the refs and remove rows corresponding to
-      // the deliveries in one jdbc tx. We also need to store or remove messages as necessary,
-      // depending on whether they've already been stored or still referenced by other channels.
-         
-      Connection conn = null;
-      PreparedStatement psReference = null;
-      PreparedStatement psInsertMessage = null;
-      TransactionWrapper wrap = new TransactionWrapper();
-      
-      try
-      {
-         conn = ds.getConnection();
-         
-         // Obtain locks on all messages
-        // getLocks(allRefs);
-         
-         // First the adds
+		      try
+		      {
+		         // First the adds
 
-         boolean messageInsertsInBatch = false;
-         boolean batch = usingBatchUpdates && refsToAdd.size() > 0;
+		         for (Iterator i = refsToAdd.iterator(); i.hasNext(); )
+		         {
+		            ChannelRefPair pair = (ChannelRefPair)i.next();
+		            MessageReference ref = pair.ref;
+		                                                
+		            psReference = conn.prepareStatement(getSQLStatement("INSERT_MESSAGE_REF"));
+		            
+		            // Now store the reference
+		            addReference(pair.channelID, ref, psReference, false);
+		              
+		            int rows = psReference.executeUpdate();
+		               
+		            if (trace) { log.trace("Inserted " + rows + " rows"); }                              
+     
+		            Message m = ref.getMessage();    
+		            
+		            synchronized (m)
+		            {            
+			            if (!m.isPersisted())
+			            {   
+			            	if (psInsertMessage == null)
+			            	{
+			            		psInsertMessage = conn.prepareStatement(getSQLStatement("INSERT_MESSAGE"));
+			            	}
+			            	
+			               // First time so add message
+			               storeMessage(m, psInsertMessage);
+			                 
+		                  if (trace) { log.trace("Message does not already exist so inserting it"); }
+		                  rows = psInsertMessage.executeUpdate();
+		                  if (trace) { log.trace("Inserted " + rows + " rows"); }
+			               
+			               m.setPersisted(true);
+			               
+			               messagesStored.add(m);
+			            }
+		            }
+		         }         
+		         
+		         // Now the removes
 
-         for (Iterator i = refsToAdd.iterator(); i.hasNext(); )
-         {
-            ChannelRefPair pair = (ChannelRefPair)i.next();
-            MessageReference ref = pair.ref;
-                                                
-            if (batch && psReference == null || !batch)
-            {
-               psReference = conn.prepareStatement(getSQLStatement("INSERT_MESSAGE_REF"));
-            }
-
-            // Now store the reference
-            addReference(pair.channelID, ref, psReference, false);
-              
-            if (batch)
-            {
-               psReference.addBatch();
-            }
-            else
-            {
-               int rows = executeWithRetry(psReference);
-               
-               if (trace) { log.trace("Inserted " + rows + " rows"); }                              
-
-               psReference.close();
-               psReference = null;
-            }
-            
-            Message m = ref.getMessage();    
-            
-            synchronized (m)
-            {            
-	            if (!m.isPersisted())
-	            {   
-	               if (batch && psInsertMessage == null || !batch)
-	               {
-	               	psInsertMessage = conn.prepareStatement(getSQLStatement("INSERT_MESSAGE"));               	
-	               }
-	            	
-	               // First time so add message
-	               storeMessage(m, psInsertMessage);
-	               m.setPersisted(true);
-	
-		            if (batch)
+		         for (Iterator i = refsToRemove.iterator(); i.hasNext(); )
+		         {
+		            ChannelRefPair pair = (ChannelRefPair)i.next();
+		            
+		            if (psDeleteReference == null)
 		            {
-	                  psInsertMessage.addBatch();
-	                  if (trace) { log.trace("Message does not already exist so inserting it"); }
-	                  messageInsertsInBatch = true;	               
+		            	psDeleteReference = conn.prepareStatement(getSQLStatement("DELETE_MESSAGE_REF"));
 		            }
-		            else
-		            {
-	                  if (trace) { log.trace("Message does not already exist so inserting it"); }
-	                  int rows = executeWithRetry(psInsertMessage);
-	                  if (trace) { log.trace("Inserted " + rows + " rows"); }
-		               
-		               psInsertMessage.close();
-		               psInsertMessage = null;
-		            }
-	            }
-            }
-         }         
-         
-         if (batch)
-         {
-            // Process the add batch
 
-            int[] rowsReference = executeWithRetryBatch(psReference);
-            
-            if (trace) { logBatchUpdate(getSQLStatement("INSERT_MESSAGE_REF"), rowsReference, "inserted"); }
-            
-            if (messageInsertsInBatch)
-            {
-               int[] rowsMessage = executeWithRetryBatch(psInsertMessage);
-               if (trace) { logBatchUpdate(getSQLStatement("INSERT_MESSAGE"), rowsMessage, "inserted"); }
-            }
-
-            psReference.close();
-            psReference = null;
-            psInsertMessage.close();
-            psInsertMessage = null;
-         }
-
-         // Now the removes
-
-         psReference = null;        
-         batch = usingBatchUpdates && refsToRemove.size() > 0;
-         
-         for (Iterator i = refsToRemove.iterator(); i.hasNext(); )
-         {
-            ChannelRefPair pair = (ChannelRefPair)i.next();
-            
-            if (batch && psReference == null || !batch)
-            {
-               psReference = conn.prepareStatement(getSQLStatement("DELETE_MESSAGE_REF"));
-            }
-
-            removeReference(pair.channelID, pair.ref, psReference);
-            
-            if (batch)
-            {
-               psReference.addBatch();
-            }
-            else
-            {
-               int rows = executeWithRetry(psReference);
-               if (trace) { log.trace("Deleted " + rows + " rows"); }
-               psReference.close();
-               psReference = null;
-            }                
-         }
-         
-         if (batch)
-         {
-            // Process the remove batch
-
-            int[] rows = executeWithRetryBatch(psReference);
-            
-            if (trace) { logBatchUpdate(getSQLStatement("DELETE_MESSAGE_REF"), rows, "deleted"); }
-                       
-            psReference.close();
-            psReference = null;           
-         }
-      }
-      catch (Exception e)
-      {
-         wrap.exceptionOccurred();                  
-         throw e;
-      }
-      finally
-      {
-      	closeStatement(psReference);
-      	closeStatement(psInsertMessage);
-      	closeConnection(conn);        
-        // try
-        // {
-            wrap.end();                        
-//         }
-//         finally
-//         {  
-//            //Release the locks
-//            this.releaseLocks(allRefs);
-//         }
-      }
+		            removeReference(pair.channelID, pair.ref, psDeleteReference);
+		                        
+		            int rows = psDeleteReference.executeUpdate();
+		            
+		            if (trace) { log.trace("Deleted " + rows + " rows"); }                         
+		         }
+		         
+		         return null;
+		      }
+		      catch (Exception e)
+		      {
+		      	for (Iterator i = messagesStored.iterator(); i.hasNext(); )
+		         {
+		      		Message msg = (Message)i.next();
+		      		
+		      		msg.setPersisted(false);
+		         }
+		      	throw e;
+		      }
+		      finally
+		      {
+		      	closeStatement(psReference);
+		      	closeStatement(psDeleteReference);
+		      	closeStatement(psInsertMessage);
+		      }
+			}   		
+   	}   	      
+   	new HandleBeforeCommit1PCRunner().executeWithRetry();
    }
    
-   protected void handleBeforeCommit2PC(List refsToRemove, Transaction tx)
-      throws Exception
+   protected void handleBeforeCommit2PC(final Transaction tx) throws Exception
    {          
-      Connection conn = null;
-     
-      TransactionWrapper wrap = new TransactionWrapper();
-      
-//      List refs = new ArrayList(refsToRemove.size());
-//      Iterator iter = refsToRemove.iterator();
-//      while (iter.hasNext())
-//      {
-//         ChannelRefPair pair = (ChannelRefPair)iter.next();
-//         refs.add(pair.ref);
-//      }
-//            
-//      orderReferences(refs);      
-      
-      try
-      {
-         //get locks on all the refs
-      //   this.getLocks(refs);
-         
-         conn = ds.getConnection();
-                  
-         //2PC commit
-         
-         //We commit any refs in state "+" to "C" and delete any
-         //refs in state "-", then we
-         //remove any messages due to refs we just removed
-         //if they're not referenced elsewhere
-         
-         commitPreparedTransaction(tx, conn);                      
-      }
-      catch (Exception e)
-      {
-         wrap.exceptionOccurred();
-         throw e;
-      }
-      finally
-      {
-      	closeConnection(conn);        
-//         try
-//         {
-            wrap.end();
-//         }
-//         finally
-//         {
-//            //release the locks
-//            this.releaseLocks(refs);
-//         }
-      }
+   	class HandleBeforeCommit2PCRunner extends JDBCTxRunner
+   	{
+   		public Object doTransaction() throws Exception
+			{				   	
+		   	PreparedStatement ps = null;
+		      
+		      if (trace) { log.trace(this + " commitPreparedTransaction, tx= " + tx); }
+		        
+		      try
+		      {
+		         ps = conn.prepareStatement(getSQLStatement("COMMIT_MESSAGE_REF1"));
+		         
+		         ps.setLong(1, tx.getId());        
+		         
+		         int rows = ps.executeUpdate();
+		         
+		         if (trace) { log.trace(JDBCUtil.statementToString(getSQLStatement("COMMIT_MESSAGE_REF1"), new Long(tx.getId())) + " removed " + rows + " row(s)");  }
+		         
+		         ps.close();
+		         ps = null;
+		         
+		         
+		         ps = conn.prepareStatement(getSQLStatement("COMMIT_MESSAGE_REF2"));
+		         ps.setLong(1, tx.getId());         
+		         
+		         rows = ps.executeUpdate();
+		         
+		         if (trace) { log.trace(JDBCUtil.statementToString(getSQLStatement("COMMIT_MESSAGE_REF2"), new Long(tx.getId())) + " updated " + rows + " row(s)"); }
+		         
+		         removeTXRecord(conn, tx);
+		         
+		         return null;
+		      }
+		      finally
+		      {
+		      	closeStatement(ps);
+		      }
+			}
+   	}
+   	
+   	new HandleBeforeCommit2PCRunner().executeWithRetry();
    }
    
-   protected void handleBeforePrepare(List refsToAdd, List refsToRemove, Transaction tx) throws Exception
+   protected void handleBeforePrepare(final List refsToAdd, final List refsToRemove, final Transaction tx) throws Exception
    {
-      //We only need to lock on the adds
-//      List refs = new ArrayList(refsToAdd.size());
-//      
-//      Iterator iter = refsToAdd.iterator();
-//      while (iter.hasNext())
-//      {
-//         ChannelRefPair pair = (ChannelRefPair)iter.next();
-//         
-//         refs.add(pair.ref);
-//      }
-//      
-//      orderReferences(refs);
-      
-      //We insert a tx record and
-      //a row for each ref with +
-      //and update the row for each delivery with "-"
-      
-      PreparedStatement psReference = null;
-      PreparedStatement psInsertMessage = null;
-      Connection conn = null;
-      TransactionWrapper wrap = new TransactionWrapper();
-      
-      try
-      {
-         //get the locks
-        // getLocks(refs);
-         
-         conn = ds.getConnection();
-         
-         //Insert the tx record
-         if (!refsToAdd.isEmpty() || !refsToRemove.isEmpty())
-         {
-            addTXRecord(conn, tx);
-         }
-         
-         Iterator iter = refsToAdd.iterator();
-         boolean batch = usingBatchUpdates && refsToAdd.size() > 1;
-         boolean messageInsertsInBatch = false;
+   	class HandleBeforePrepareRunner extends JDBCTxRunner
+   	{
+			public Object doTransaction() throws Exception
+			{
+				//We insert a tx record and
+		      //a row for each ref with +
+		      //and update the row for each delivery with "-"
+		      
+		      PreparedStatement psReference = null;
+		      PreparedStatement psInsertMessage = null;
+		      PreparedStatement psUpdateReference = null;
+		 
+		      try
+		      {
+		         conn = ds.getConnection();
+		         
+		         //Insert the tx record
+		         if (!refsToAdd.isEmpty() || !refsToRemove.isEmpty())
+		         {
+		            addTXRecord(conn, tx);
+		         }
+		         
+		         Iterator iter = refsToAdd.iterator();
 
-         while (iter.hasNext())
-         {
-            ChannelRefPair pair = (ChannelRefPair) iter.next();
-            
-            if (batch && psReference == null || !batch)
-            {
-               psReference = conn.prepareStatement(getSQLStatement("INSERT_MESSAGE_REF"));
-            }
-
-            prepareToAddReference(pair.channelID, pair.ref, tx, psReference);
-            
-            if (batch)
-            {
-               psReference.addBatch();
-            }
-            else
-            {
-               int rows = executeWithRetry(psReference);
-               
-               if (trace) { log.trace("Inserted " + rows + " rows"); }
-
-               psReference.close();
-               psReference = null;
-            }
-                       
-            Message m = pair.ref.getMessage(); 
-            
-            synchronized (m)
-            {
-            
-	            if (!m.isPersisted())
-	            {	            	
-		            if (batch && psInsertMessage == null || !batch)
-		            {
-		            	psInsertMessage = conn.prepareStatement(getSQLStatement("INSERT_MESSAGE"));	            	
-		            }
-		
-		            storeMessage(m, psInsertMessage);               
+		         while (iter.hasNext())
+		         {
+		            ChannelRefPair pair = (ChannelRefPair) iter.next();
 		            
-		            m.setPersisted(true);
-	
-		            if (batch)
+		            if (psReference == null)
 		            {
-		            	psInsertMessage.addBatch();
-		            	
-		            	messageInsertsInBatch = true;
+		            	psReference = conn.prepareStatement(getSQLStatement("INSERT_MESSAGE_REF"));
 		            }
-		            else
-		            {
-		            	int rows = executeWithRetry(psInsertMessage);
-	
-		            	if (trace) { log.trace("Inserted " + rows + " rows"); }
-	
-		            	psInsertMessage.close();
-		            	
-		            	psInsertMessage = null;
-		            }
-	            }
-            }
-         }         
-         
-         if (batch)
-         {
-            int[] rowsReference = executeWithRetryBatch(psReference);
-            
-            if (trace) { logBatchUpdate(getSQLStatement("INSERT_MESSAGE_REF"), rowsReference, "inserted"); }
-            
-            if (messageInsertsInBatch)
-            {
-               int[] rowsMessage = executeWithRetryBatch(psInsertMessage);
-               
-               if (trace) { logBatchUpdate(getSQLStatement("INSERT_MESSAGE"), rowsMessage, "inserted"); }
-            }
+		            
+		            prepareToAddReference(pair.channelID, pair.ref, tx, psReference);
+		            
+		            int rows = psReference.executeUpdate();
+		               
+		            if (trace) { log.trace("Inserted " + rows + " rows"); }
+           
+		            Message m = pair.ref.getMessage(); 
+		            
+		            synchronized (m)
+		            {            
+			            if (!m.isPersisted())
+			            {	            	
+			            	if (psInsertMessage == null)
+			            	{
+			            		psInsertMessage = conn.prepareStatement(getSQLStatement("INSERT_MESSAGE"));
+			            	}
+				            		
+				            storeMessage(m, psInsertMessage);               
+				            
+				            rows = psInsertMessage.executeUpdate();
+			
+			            	if (trace) { log.trace("Inserted " + rows + " rows"); }
 
-            psReference.close();
-            psReference = null;
-            psInsertMessage.close();
-            psInsertMessage = null;
-         }
-         
-         //Now the removes
-         
-         iter = refsToRemove.iterator();
-         
-         batch = usingBatchUpdates && refsToRemove.size() > 1;
-         
-         if (batch)
-         {
-            psReference = conn.prepareStatement(getSQLStatement("UPDATE_MESSAGE_REF"));
-         }
-
-         while (iter.hasNext())
-         {
-            ChannelRefPair pair = (ChannelRefPair) iter.next();
-            
-            if (!batch)
-            {
-               psReference = conn.prepareStatement(getSQLStatement("UPDATE_MESSAGE_REF"));
-            }
-            
-            prepareToRemoveReference(pair.channelID, pair.ref, tx, psReference);
-            
-            if (batch)
-            {
-               psReference.addBatch();
-            }
-            else
-            {
-               int rows = executeWithRetry(psReference);
-               
-               if (trace) { log.trace("updated " + rows + " rows"); }
-               
-               psReference.close();
-               psReference = null;
-            }
-         }
-         
-         if (batch)
-         {
-            int[] rows = executeWithRetryBatch(psReference);
-            
-            if (trace) { logBatchUpdate(getSQLStatement("UPDATE_MESSAGE_REF"), rows, "updated"); }
-            
-            psReference.close();
-            psReference = null;
-         }
-      }
-      catch (Exception e)
-      {
-         wrap.exceptionOccurred();
-         throw e;
-      }
-      finally
-      {
-      	closeStatement(psReference);
-      	closeStatement(psInsertMessage);
-      	closeConnection(conn);         
-//         try
-//         {
-            wrap.end();            
-//         }
-//         finally
-//         {
-            //release the locks
-            
-           // this.releaseLocks(refs);
-       //  }
-      }
+				            m.setPersisted(true);				   			
+			            }
+		            }
+		         }         
+		                  
+		         //Now the removes
+		         
+		         iter = refsToRemove.iterator();
+		         
+		         while (iter.hasNext())
+		         {
+		         	if (psUpdateReference == null)
+		         	{
+				         psUpdateReference = conn.prepareStatement(getSQLStatement("UPDATE_MESSAGE_REF"));         
+		         	}
+		         	
+		            ChannelRefPair pair = (ChannelRefPair) iter.next();
+		            
+		            prepareToRemoveReference(pair.channelID, pair.ref, tx, psUpdateReference);
+		            
+		            int rows = psUpdateReference.executeUpdate();
+		               
+		            if (trace) { log.trace("updated " + rows + " rows"); }          
+		         }         
+		         
+		         return null;
+		      }
+		      finally
+		      {
+		      	closeStatement(psReference);
+		      	closeStatement(psInsertMessage);  
+		      	closeStatement(psUpdateReference);
+		      }
+			}   		
+   	}
+   	
+   	new HandleBeforePrepareRunner().executeWithRetry();
    }
    
-   protected void handleBeforeRollback(List refsToAdd, Transaction tx) throws Exception
+   protected void handleBeforeRollback(final List refsToAdd, final Transaction tx) throws Exception
    {
-      //remove refs marked with +
-      //and update rows marked with - to C
-            
-      Connection conn = null;
-      TransactionWrapper wrap = new TransactionWrapper();
-      
-//      List refs = new ArrayList(refsToAdd.size());
-//      
-//      Iterator iter = refsToAdd.iterator();
-//      
-//      while (iter.hasNext())
-//      {
-//         ChannelRefPair pair = (ChannelRefPair)iter.next();
-//         refs.add(pair.ref);
-//      }
-//      
-//      orderReferences(refs);
-      
-      try
-      {
-       //  this.getLocks(refs);
-         
-         conn = ds.getConnection();
-         
-         rollbackPreparedTransaction(tx, conn);         
-      }
-      catch (Exception e)
-      {
-         wrap.exceptionOccurred();
-         throw e;
-      }
-      finally
-      { 	
-      	closeConnection(conn);                
-//         try
-//         {
-            wrap.end();
-//         }
-//         finally
-//         {
-//            //release locks
-//            this.releaseLocks(refs);
-//         }
-      }      
+   	class HandleBeforeRollbackRunner extends JDBCTxRunner
+   	{
+			public Object doTransaction() throws Exception
+			{
+				PreparedStatement ps = null;
+		      
+		      try
+		      {
+		         ps = conn.prepareStatement(getSQLStatement("ROLLBACK_MESSAGE_REF1"));
+		         
+		         ps.setLong(1, tx.getId());         
+		         
+		         int rows = ps.executeUpdate();
+		         
+		         if (trace)
+		         {
+		            log.trace(JDBCUtil.statementToString(getSQLStatement("ROLLBACK_MESSAGE_REF1"), new Long(tx.getId())) + " removed " + rows + " row(s)");
+		         }
+		         
+		         ps.close();
+		         
+		         ps = conn.prepareStatement(getSQLStatement("ROLLBACK_MESSAGE_REF2"));
+		         ps.setLong(1, tx.getId());
+		         
+		         rows = ps.executeUpdate();
+		         
+		         if (trace)
+		         {
+		            log.trace(JDBCUtil.statementToString(getSQLStatement("ROLLBACK_MESSAGE_REF2"), new Long(tx.getId())) + " updated " + rows
+		                  + " row(s)");
+		         }
+		         
+		         removeTXRecord(conn, tx);
+		         
+		         return null;
+		      }
+		      finally
+		      {
+		      	closeStatement(ps);
+		      }
+			}   	
+   	}
+   	
+   	new HandleBeforeRollbackRunner().executeWithRetry();
    }
    
    
@@ -2021,8 +1655,7 @@
          
          setVarBinaryColumn(5, ps, xid.getGlobalTransactionId());
          
-         rows = executeWithRetry(ps);
-         
+         rows = ps.executeUpdate();         
       }
       finally
       {
@@ -2053,7 +1686,7 @@
          
          ps.setLong(2, tx.getId());
          
-         int rows = executeWithRetry(ps);
+         int rows = ps.executeUpdate();
          
          if (trace)
          {
@@ -2126,83 +1759,6 @@
       ps.setLong(3, channelID);           
    }
    
-   protected void commitPreparedTransaction(Transaction tx, Connection conn) throws Exception
-   {
-      PreparedStatement ps = null;
-      
-      if (trace) { log.trace(this + " commitPreparedTransaction, tx= " + tx); }
-        
-      try
-      {
-         ps = conn.prepareStatement(getSQLStatement("COMMIT_MESSAGE_REF1"));
-         
-         ps.setLong(1, tx.getId());        
-         
-         int rows = executeWithRetry(ps);
-         
-         if (trace)
-         {
-            log.trace(JDBCUtil.statementToString(getSQLStatement("COMMIT_MESSAGE_REF1"), new Long(tx.getId())) + " removed " + rows + " row(s)");
-         }
-         
-         ps.close();
-         ps = conn.prepareStatement(getSQLStatement("COMMIT_MESSAGE_REF2"));
-         ps.setLong(1, tx.getId());         
-         
-         rows = executeWithRetry(ps);
-         
-         if (trace)
-         {
-            log.trace(JDBCUtil.statementToString(getSQLStatement("COMMIT_MESSAGE_REF2"), new Long(tx.getId())) + " updated " + rows
-                  + " row(s)");
-         }
-         
-         removeTXRecord(conn, tx);
-      }
-      finally
-      {
-      	closeStatement(ps);
-      }
-   }
-   
-   protected void rollbackPreparedTransaction(Transaction tx, Connection conn) throws Exception
-   {
-      PreparedStatement ps = null;
-      
-      try
-      {
-         ps = conn.prepareStatement(getSQLStatement("ROLLBACK_MESSAGE_REF1"));
-         
-         ps.setLong(1, tx.getId());         
-         
-         int rows = executeWithRetry(ps);
-         
-         if (trace)
-         {
-            log.trace(JDBCUtil.statementToString(getSQLStatement("ROLLBACK_MESSAGE_REF1"), new Long(tx.getId())) + " removed " + rows + " row(s)");
-         }
-         
-         ps.close();
-         
-         ps = conn.prepareStatement(getSQLStatement("ROLLBACK_MESSAGE_REF2"));
-         ps.setLong(1, tx.getId());
-         
-         rows = executeWithRetry(ps);
-         
-         if (trace)
-         {
-            log.trace(JDBCUtil.statementToString(getSQLStatement("ROLLBACK_MESSAGE_REF2"), new Long(tx.getId())) + " updated " + rows
-                  + " row(s)");
-         }
-         
-         removeTXRecord(conn, tx);
-      }
-      finally
-      {
-      	closeStatement(ps);
-      }
-   }
-   
    protected byte[] mapToBytes(Map map) throws Exception
    {
       if (map == null || map.isEmpty())
@@ -2405,28 +1961,6 @@
       }
    }
    
-//   protected void getLocks(List refs)
-//   {
-//      Iterator iter = refs.iterator();
-//      while (iter.hasNext())
-//      {
-//         MessageReference ref = (MessageReference)iter.next();
-//         Message m = ref.getMessage();
-//         LockMap.instance.obtainLock(m);        
-//      }
-//   }
-//   
-//   protected void releaseLocks(List refs)
-//   {
-//      Iterator iter = refs.iterator();
-//      while (iter.hasNext())
-//      {
-//         MessageReference ref = (MessageReference)iter.next();
-//         Message m = ref.getMessage();
-//         LockMap.instance.releaseLock(m);         
-//      }
-//   }
-   
    protected void logBatchUpdate(String name, int[] rows, String action)
    {
       int count = 0;
@@ -2436,27 +1970,12 @@
       }
       log.trace("Batch update " + name + ", " + action + " total of " + count + " rows");
    }
-   
-   protected int executeWithRetry(PreparedStatement ps) throws Exception
-   {
-      return executeWithRetry(ps, false, false)[0];
-   }
-   
-   protected int executeWithRetryIgnoreKeyViolation(PreparedStatement ps) throws Exception
-   {
-      return executeWithRetry(ps, false, true)[0];
-   }
-   
-   protected int[] executeWithRetryBatch(PreparedStatement ps) throws Exception
-   {
-      return executeWithRetry(ps, true, false);
-   }
-   
+
    //PersistentServiceSupport overrides ----------------------------
    
    protected Map getDefaultDDLStatements()
    {
-      Map map = new LinkedHashMap();
+      Map<String, String> map = new LinkedHashMap<String, String>();
       //Message reference
       map.put("CREATE_MESSAGE_REFERENCE",
               "CREATE TABLE JBM_MSG_REF (CHANNEL_ID BIGINT, " +
@@ -2487,7 +2006,7 @@
       
    protected Map getDefaultDMLStatements()
    {                
-      Map map = new LinkedHashMap();
+      Map<String, String> map = new LinkedHashMap<String, String>();
       //Message reference
       map.put("INSERT_MESSAGE_REF",
               "INSERT INTO JBM_MSG_REF (CHANNEL_ID, MESSAGE_ID, TRANSACTION_ID, STATE, ORD, PAGE_ORD, DELIVERY_COUNT, SCHED_DELIVERY) " +
@@ -2550,80 +2069,6 @@
    
    // Private -------------------------------------------------------
    
-
-   
-   private int[] executeWithRetry(PreparedStatement ps, boolean batch, boolean ignoreKeyViolation) throws Exception
-   {
-      final int MAX_TRIES = 25;      
-      
-      int rows = 0;
-      
-      int[] rowsArr = null;
-      
-      int tries = 0;
-      
-      while (true)
-      {
-         try
-         {
-            if (batch)
-            {
-               rowsArr = ps.executeBatch();
-            }
-            else
-            {
-            	try
-            	{
-            		rows = ps.executeUpdate();
-            	}
-            	catch (SQLException e)
-            	{
-            		if (ignoreKeyViolation && e.getSQLState().equals("23000"))
-            		{
-            			//Key violation - ignore
-            			log.info("Got key violation - but ignoring");
-            		}
-            		else
-            		{
-            			throw e;
-            		}
-            	}
-            }
-            
-            if (tries > 0)
-            {
-               log.warn("Update worked after retry");
-            }
-            break;
-         }
-         catch (SQLException e)
-         {
-            log.warn("SQLException caught - assuming deadlock detected, try:" + (tries + 1), e);
-            
-            log.info("SQLState:" + e.getSQLState() + " code:" + e.getErrorCode());
-            
-            tries++;
-            if (tries == MAX_TRIES)
-            {
-               log.error("Retried " + tries + " times, now giving up");
-               throw new IllegalStateException("Failed to update references");
-            }
-            log.warn("Trying again after a pause");
-            //Now we wait for a random amount of time to minimise risk of deadlock
-            Thread.sleep((long)(Math.random() * 500));
-         }  
-      }
-      
-      if (batch)
-      {
-         return rowsArr;
-      }
-      else
-      {
-         return new int[] { rows };
-      }
-   }
-   
    private List getMessageChannelPair(String sqlQuery, long transactionId) throws Exception
    {
       if (trace) log.trace("loading message and channel ids for tx [" + transactionId + "]");
@@ -2652,14 +2097,6 @@
          //Don't use a Map. A message could be in multiple channels in a tx, so if you use a map
          //when you put the same message again it's going to overwrite the previous put!!
          
-         List holders = new ArrayList();
-         
-         //Unique set of messages
-         Set msgIds = new HashSet();
-         
-         //TODO it would probably have been simpler just to have done all this in a SQL JOIN rather
-         //than do the join in memory.....
-         
          class Holder
          {
             long messageId;
@@ -2670,6 +2107,14 @@
                this.channelId = channelId;
             }
          }
+         
+         List<Holder> holders = new ArrayList<Holder>();
+         
+         //Unique set of messages
+         Set<Long> msgIds = new HashSet<Long>();
+         
+         //TODO it would probably have been simpler just to have done all this in a SQL JOIN rather
+         //than do the join in memory.....        
                   
          while(rs.next())
          {            
@@ -2680,7 +2125,7 @@
             
             holders.add(holder);
                         
-            msgIds.add(new Long(messageId));
+            msgIds.add(messageId);
             
             if (trace) log.trace("Loaded MsgID: " + messageId + " and ChannelID: " + channelId);
          }
@@ -2899,7 +2344,7 @@
          }
          else
          {
-            handleBeforeCommit2PC(refsToRemove, tx);
+            handleBeforeCommit2PC(tx);
          }
       }
       
@@ -2921,19 +2366,71 @@
       }
    }
    
-//   static class MessageOrderComparator implements Comparator
-//   {
-//      static MessageOrderComparator instance = new MessageOrderComparator();
-//      
-//      public int compare(Object o1, Object o2)
-//      {        
-//         MessageReference ref1 = (MessageReference)o1;
-//         MessageReference ref2 = (MessageReference)o2;
-//
-//         long id1 = ref1.getMessage().getMessageID();         
-//         long id2 = ref2.getMessage().getMessageID(); 
-//         
-//         return (id1 < id2 ? -1 : (id1 == id2 ? 0 : 1));
-//      }      
-//   }   
+   private abstract class JDBCTxRunner
+   {   
+   	private static final int MAX_TRIES = 25;
+   	
+   	Connection conn;
+
+      TransactionWrapper wrap;
+      
+		public Object execute() throws Exception
+		{
+	      wrap = new TransactionWrapper();
+	      
+	      try
+	      {
+	         conn = ds.getConnection();
+	         
+	         return doTransaction();
+	      }
+	      catch (Exception e)
+	      {
+	         wrap.exceptionOccurred();
+	         throw e;
+	      }
+	      finally
+	      {	      		      
+	      	closeConnection(conn);
+	         wrap.end();
+	      }  
+		}
+		
+		public Object executeWithRetry() throws Exception
+		{
+	      int tries = 0;
+	      
+	      while (true)
+	      {
+	         try
+	         {
+	            Object res = execute();
+	            
+	            if (tries > 0)
+	            {
+	               log.warn("Update worked after retry");
+	            }
+	            return res;	            
+	         }
+	         catch (SQLException e)
+	         {
+	            log.warn("SQLException caught - assuming deadlock detected, try:" + (tries + 1), e);
+	            
+	            log.info("SQLState:" + e.getSQLState() + " code:" + e.getErrorCode());
+	            
+	            tries++;
+	            if (tries == MAX_TRIES)
+	            {
+	               log.error("Retried " + tries + " times, now giving up");
+	               throw new IllegalStateException("Failed to excecute transaction");
+	            }
+	            log.warn("Trying again after a pause");
+	            //Now we wait for a random amount of time to minimise risk of deadlock
+	            Thread.sleep((long)(Math.random() * 500));
+	         }  
+	      }
+		}
+		
+		public abstract Object doTransaction() throws Exception;
+   }  
 }




More information about the jboss-cvs-commits mailing list