[jboss-cvs] JBoss Messaging SVN: r3023 - trunk/src/main/org/jboss/messaging/core/impl.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Wed Aug 22 06:44:34 EDT 2007


Author: timfox
Date: 2007-08-22 06:44:34 -0400 (Wed, 22 Aug 2007)
New Revision: 3023

Modified:
   trunk/src/main/org/jboss/messaging/core/impl/JDBCPersistenceManager.java
   trunk/src/main/org/jboss/messaging/core/impl/PagingChannelSupport.java
Log:
Persistence Manager interim commit 2


Modified: trunk/src/main/org/jboss/messaging/core/impl/JDBCPersistenceManager.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/impl/JDBCPersistenceManager.java	2007-08-21 21:07:50 UTC (rev 3022)
+++ trunk/src/main/org/jboss/messaging/core/impl/JDBCPersistenceManager.java	2007-08-22 10:44:34 UTC (rev 3023)
@@ -31,10 +31,9 @@
 import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.SQLException;
+import java.sql.Statement;
 import java.sql.Types;
 import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Comparator;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
@@ -61,7 +60,6 @@
 import org.jboss.messaging.core.impl.tx.Transaction;
 import org.jboss.messaging.core.impl.tx.TxCallback;
 import org.jboss.messaging.util.JDBCUtil;
-import org.jboss.messaging.util.LockMap;
 import org.jboss.messaging.util.StreamUtils;
 import org.jboss.messaging.util.Util;
 
@@ -311,7 +309,82 @@
       }
    }
          
+   
+   // Runs doTransaction() on a connection obtained from ds, inside a TransactionWrapper.
+   // executeWithRetry() repeats the whole transaction on SQLException, up to MAX_TRIES attempts.
+   abstract class JDBCTxRunner
+   {   
+   	private static final int MAX_TRIES = 25;
+   	// Assigned by execute() for each attempt; doTransaction() implementations use it.
+   	Connection conn;
+      TransactionWrapper wrap;
+		// Runs doTransaction() once; a finally block closes the connection and ends the wrapper.
+		public Object execute() throws Exception
+		{
+	      // Drop the previous attempt's connection so a failed getConnection() cannot re-close it
+	      conn = null;
+	      wrap = new TransactionWrapper();
+	      try
+	      {
+	         conn = ds.getConnection();
+	         return doTransaction();
+	      }
+	      catch (Exception e)
+	      {
+	         wrap.exceptionOccurred();
+	         throw e;
+	      }
+	      finally
+	      {	      		      
+	      	closeConnection(conn);
+	         wrap.end();
+	      }  
+		}
+		
+		// Calls execute() until it succeeds; throws IllegalStateException after MAX_TRIES SQLExceptions.
+		public Object executeWithRetry() throws Exception
+		{
+	      int tries = 0;
+	      while (true)
+	      {
+	         try
+	         {
+	            Object res = execute();
+	            if (tries > 0)
+	            {
+	               log.warn("Update worked after retry");
+	            }
+	            return res;	            
+	         }
+	         catch (SQLException e)
+	         {
+	            log.warn("SQLException caught - assuming deadlock detected, try:" + (tries + 1), e);
+	            log.info("SQLState:" + e.getSQLState() + " code:" + e.getErrorCode());
+	            tries++;
+	            if (tries == MAX_TRIES)
+	            {
+	               log.error("Retried " + tries + " times, now giving up");
+	               // Keep the last SQLException as the cause so the failure can be diagnosed
+	               throw new IllegalStateException("Failed to excecute transaction", e);
+	            }
+	            log.warn("Trying again after a pause");
+	            //Now we wait for a random amount of time to minimise risk of deadlock
+	            Thread.sleep((long)(Math.random() * 500));
+	         }  
+	      }
+		}
+		
+		// The unit of work; implementations run their JDBC statements on conn.
+		public abstract Object doTransaction() throws Exception;
+   }
+   
+   
                
+   
+   
+   
+   
+   
    // Related to counters
    // ===================
    
@@ -324,83 +397,87 @@
          throw new IllegalArgumentException("block size must be > 0");
       }
       
-      Connection conn = null;
-      PreparedStatement ps = null;
-      ResultSet rs = null;
-      TransactionWrapper wrap = new TransactionWrapper();
-      
-      try
+      class ReserveIDBlockRunner extends JDBCTxRunner
       {
-         conn = ds.getConnection();
-
-         //For the clustered case - this MUST use SELECT .. FOR UPDATE or a similar
-         //construct the locks the row
-         String selectCounterSQL = getSQLStatement("SELECT_COUNTER");
-         
-         ps = conn.prepareStatement(selectCounterSQL);
-         
-         ps.setString(1, counterName);
-         
-         rs = ps.executeQuery();
-         
-         if (trace) { log.trace(JDBCUtil.statementToString(selectCounterSQL, counterName)); }         
-         
-         if (!rs.next())
-         {
-            rs.close();
-            rs = null;
+      	String counterName;
+      	
+      	int size;
+      	
+      	public ReserveIDBlockRunner(String counterName, int size)
+      	{
+      		this.counterName = counterName;
+      		this.size = size;
+      	}
+      	
+      	public Object doTransaction() throws Exception
+   		{
+            //	For the clustered case - this MUST use SELECT .. FOR UPDATE or a similar
+            //construct the locks the row
+            String selectCounterSQL = getSQLStatement("SELECT_COUNTER");
             
-            ps.close();
+            PreparedStatement ps = null;
+            ResultSet rs = null;
             
-            //There is a very small possibility that two threads will attempt to insert the same counter
-            //at the same time, if so, then the second one will fail eventually after a few retries by throwing
-            //a primary key violation.
-            
-            String insertCounterSQL = getSQLStatement("INSERT_COUNTER");
-            
-            ps = conn.prepareStatement(insertCounterSQL);
-            
-            ps.setString(1, counterName);
-            ps.setLong(2, size);
-            
-            int rows = executeWithRetry(ps);
-            if (trace) { log.trace(JDBCUtil.statementToString(insertCounterSQL, counterName, new Integer(size)) + " inserted " + rows + " rows"); }
-            
-            ps.close();            
-            ps = null;
-            return 0;
-         }
-         
-         long nextId = rs.getLong(1);
-         
-         rs.close();
-         rs = null;
-         
-         ps.close();
+            try
+            {            
+	            ps = conn.prepareStatement(selectCounterSQL);
+	
+	            ps.setString(1, counterName);
+	            
+	            rs = ps.executeQuery();
+	            
+	            if (trace) { log.trace(JDBCUtil.statementToString(selectCounterSQL, counterName)); }         
+	            
+	            if (!rs.next())
+	            {
+	               rs.close();
+	               rs = null;
+	               
+	               ps.close();
+	               
+	               //There is a very small possibility that two threads will attempt to insert the same counter
+	               //at the same time, if so, then the second one will fail eventually after a few retries by throwing
+	               //a primary key violation.
+	               
+	               String insertCounterSQL = getSQLStatement("INSERT_COUNTER");
+	               
+	               ps = conn.prepareStatement(insertCounterSQL);
+	               
+	               ps.setString(1, counterName);
+	               ps.setLong(2, size);
+	               
+	               int rows = ps.executeUpdate();
+	               if (trace) { log.trace(JDBCUtil.statementToString(insertCounterSQL, counterName, new Integer(size)) + " inserted " + rows + " rows"); }
+	               
+	               return 0L;
+	            }
+	            
+	            long nextId = rs.getLong(1);
+	            
+	            ps.close();
+	
+	            String updateCounterSQL = getSQLStatement("UPDATE_COUNTER");
+	
+	            ps = conn.prepareStatement(updateCounterSQL);
+	            
+	            ps.setLong(1, nextId + size);
+	            ps.setString(2, counterName);
 
-         String updateCounterSQL = getSQLStatement("UPDATE_COUNTER");
+	            int rows = ps.executeUpdate();
 
-         ps = conn.prepareStatement(updateCounterSQL);
-         
-         ps.setLong(1, nextId + size);
-         ps.setString(2, counterName);
-         
-         int rows = executeWithRetry(ps);
-         if (trace) { log.trace(JDBCUtil.statementToString(updateCounterSQL, new Long(nextId + size), counterName) + " updated " + rows + " rows"); }
-         
-         return nextId;
+	            if (trace) { log.trace(JDBCUtil.statementToString(updateCounterSQL, new Long(nextId + size), counterName) + " updated " + rows + " rows"); }
+
+	            return nextId;
+            }
+            finally
+            {
+            	closeResultSet(rs);
+            	closeStatement(ps);
+            }
+   		}
       }
-      catch (Exception e)
-      {
-         wrap.exceptionOccurred();
-         throw e;
-      }
-      finally
-      {
-      	closeStatement(ps);
-      	closeConnection(conn);
-         wrap.end();
-      }     
+      
+      return (Long)new ReserveIDBlockRunner(counterName, size).executeWithRetry();
    }
          
    /*
@@ -535,143 +612,84 @@
    //Used to page NP messages or P messages in a non recoverable queue
    
    public void pageReferences(long channelID, List references, boolean page) throws Exception
-   {
-      Connection conn = null;
-      PreparedStatement psInsertReference = null;  
-      PreparedStatement psInsertMessage = null;    
-  //    PreparedStatement psMessageExists = null;
-      TransactionWrapper wrap = new TransactionWrapper();
-            
-      //First we order the references in message order
-      //orderReferences(references);
-                         
-      try
+   {      
+   	class PageReferencesRunner extends JDBCTxRunner
       {
-//         //Now we get a lock on all the messages. Since we have ordered the refs we should avoid deadlock
-//         getLocks(references);
-//         
-         conn = ds.getConnection();
-         
-         Iterator iter = references.iterator();
-                  
-         psInsertReference = conn.prepareStatement(getSQLStatement("INSERT_MESSAGE_REF"));
-         psInsertMessage = conn.prepareStatement(getSQLStatement("INSERT_MESSAGE"));
-         
-        // boolean insertsInBatch = false;
-              
-         while (iter.hasNext())
-         {
-            //We may need to persist the message itself 
-            MessageReference ref = (MessageReference) iter.next();
-                                            
-            //For non reliable refs we insert the ref (and maybe the message) itself
-                     
-            //Now store the reference
-            addReference(channelID, ref, psInsertReference, page);
-                        
-//            if (usingBatchUpdates)
-//            {
-//               psInsertReference.addBatch();
-//            }
-//            else
-//            {
-               int rows = executeWithRetry(psInsertReference);
-               
-               if (trace)
-               {
-                  log.trace("Inserted " + rows + " rows");
-               }
-            //}
-            
-            //Maybe we need to persist the message itself
-            Message m = ref.getMessage();
-            
-            synchronized (m)
-            {
-                                     
-            if (!m.isPersisted())
-            {            	               
-            	//The message might actually already exist due to it already being paged
-            	//so we insert and ignore key violations
-            	
-//            	if (psMessageExists == null)
-//            	{
-//            		psMessageExists = conn.prepareStatement(getSQLStatement("MESSAGE_EXISTS"));            		            		
-//            	}
-//            	
-//            	psMessageExists.setLong(1, m.getMessageID());
-//            	
-//            	ResultSet rs = null;
-//            	
-//            	try
-//            	{
-//            		rs = psMessageExists.executeQuery();
-//            		
-//            		if (!rs.next())
-//            		{
-            			storeMessage(m, psInsertMessage); 
-            			
-//            			if (usingBatchUpdates)
-//      	            {
-//      	               psInsertMessage.addBatch();	 
-//      	               
-//      	               insertsInBatch = true;
-//      	            }
-//      	            else
-//      	            {
-      	               rows = executeWithRetryIgnoreKeyViolation(psInsertMessage);
-      	                                      
-                        if (trace) { log.trace("Inserted " + rows + " rows"); }	               
-      	           // } 
-      	            m.setPersisted(true);
-            		}
-//            	}
-//            	finally
-//            	{
-//            		if (rs != null)
-//            		{
-//            			rs.close();
-//            		}
-//            	}  	                               	
-         //   }    
-            }
-         }         
-         
-//         if (usingBatchUpdates)
-//         {
-//            int[] rowsReference = executeWithRetryBatch(psInsertReference);
-//            
-//            if (trace) { logBatchUpdate(getSQLStatement("INSERT_MESSAGE_REF"), rowsReference, "inserted"); }
-//            
-//            if (insertsInBatch)
-//            {
-//               int[] rowsMessage = executeWithRetryBatch(psInsertMessage);
-//               
-//               if (trace) { logBatchUpdate(getSQLStatement("INSERT_MESSAGE"), rowsMessage, "inserted"); }
-//            }
-//         }        
+      	long channelID;
+      	List references;
+      	boolean page;
+      	
+      	public PageReferencesRunner(long channelID, List references, boolean page)
+      	{
+      		this.channelID = channelID;
+      		this.references = references;
+      		this.page = page;
+      	}
+      	
+      	public Object doTransaction() throws Exception
+   		{
+      		PreparedStatement psInsertReference = null;
+      		PreparedStatement psInsertMessage = null;
+      		
+      		try
+      		{      		
+	      		Iterator iter = references.iterator();
+	
+	         	psInsertReference = conn.prepareStatement(getSQLStatement("INSERT_MESSAGE_REF"));
+	         	psInsertMessage = conn.prepareStatement(getSQLStatement("INSERT_MESSAGE"));
+	
+	         	while (iter.hasNext())
+	         	{
+	         		//We may need to persist the message itself 
+	         		MessageReference ref = (MessageReference) iter.next();
+	
+	         		//For non reliable refs we insert the ref (and maybe the message) itself
+	
+	         		//Now store the reference
+	
+	         		//log.info("Paged ref with page order " + ref.getPagingOrder());
+	
+	         		addReference(channelID, ref, psInsertReference, page);
+	
+	         		int rows = psInsertReference.executeUpdate();
+	
+	         		if (trace)
+	         		{
+	         			log.trace("Inserted " + rows + " rows");
+	         		}
+	
+	         		//Maybe we need to persist the message itself
+	         		Message m = ref.getMessage();
+	
+	         		synchronized (m)
+	         		{
+	         			if (!m.isPersisted())
+	         			{            	               
+	         				//The message might actually already exist due to it already being paged
+	         				//so we insert and ignore key violations
+	
+	         				storeMessage(m, psInsertMessage); 
+	
+	         				rows = psInsertMessage.executeUpdate();
+	
+	         				if (trace) { log.trace("Inserted " + rows + " rows"); }	               
+	
+	         				m.setPersisted(true);
+	         			}
+	         		}
+	         	} 
+	         	
+	         	return null;
+      		}
+      		finally
+      		{
+      			closeStatement(psInsertReference);
+      			closeStatement(psInsertMessage);
+      		}
+   		}      	      	      	
       }
-      catch (Exception e)
-      {
-         wrap.exceptionOccurred();
-         throw e;
-      }
-      finally
-      {
-      	closeStatement(psInsertReference);
-      	closeStatement(psInsertMessage);
-     // 	closeStatement(psMessageExists);
-      	closeConnection(conn);         
-     //    try
-     //    {
-            wrap.end();                       
-//         }
-//         finally
-//         {            
-//            //And then release locks
-//            this.releaseLocks(references);
-//         }         
-      }      
+   	
+   	new PageReferencesRunner(channelID, references, page).executeWithRetry(); 
    }
          
    //After loading paged refs this is used to remove any NP or P messages in a unrecoverable channel
@@ -703,6 +721,8 @@
                                                              
             removeReference(channelID, ref, psDeleteReference);
             
+            //log.info("Removed ref with page order " + ref.getPagingOrder());
+            
             if (usingBatchUpdates)
             {
                psDeleteReference.addBatch();
@@ -2890,19 +2910,19 @@
       }
    }
    
-   static class MessageOrderComparator implements Comparator
-   {
-      static MessageOrderComparator instance = new MessageOrderComparator();
-      
-      public int compare(Object o1, Object o2)
-      {        
-         MessageReference ref1 = (MessageReference)o1;
-         MessageReference ref2 = (MessageReference)o2;
-
-         long id1 = ref1.getMessage().getMessageID();         
-         long id2 = ref2.getMessage().getMessageID(); 
-         
-         return (id1 < id2 ? -1 : (id1 == id2 ? 0 : 1));
-      }      
-   }   
+//   static class MessageOrderComparator implements Comparator
+//   {
+//      static MessageOrderComparator instance = new MessageOrderComparator();
+//      
+//      public int compare(Object o1, Object o2)
+//      {        
+//         MessageReference ref1 = (MessageReference)o1;
+//         MessageReference ref2 = (MessageReference)o2;
+//
+//         long id1 = ref1.getMessage().getMessageID();         
+//         long id2 = ref2.getMessage().getMessageID(); 
+//         
+//         return (id1 < id2 ? -1 : (id1 == id2 ? 0 : 1));
+//      }      
+//   }   
 }

Modified: trunk/src/main/org/jboss/messaging/core/impl/PagingChannelSupport.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/impl/PagingChannelSupport.java	2007-08-21 21:07:50 UTC (rev 3022)
+++ trunk/src/main/org/jboss/messaging/core/impl/PagingChannelSupport.java	2007-08-22 10:44:34 UTC (rev 3023)
@@ -441,6 +441,8 @@
 
       Iterator iter = downCache.iterator();
       
+      log.info("Flushing down cache");
+      
       while (iter.hasNext())
       {
          MessageReference ref = (MessageReference) iter.next();




More information about the jboss-cvs-commits mailing list