[jboss-cvs] JBoss Messaging SVN: r3030 - trunk/src/main/org/jboss/messaging/core/impl.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Wed Aug 22 10:50:40 EDT 2007


Author: timfox
Date: 2007-08-22 10:50:40 -0400 (Wed, 22 Aug 2007)
New Revision: 3030

Modified:
   trunk/src/main/org/jboss/messaging/core/impl/JDBCPersistenceManager.java
   trunk/src/main/org/jboss/messaging/core/impl/PagingChannelSupport.java
Log:
Removed race condition


Modified: trunk/src/main/org/jboss/messaging/core/impl/JDBCPersistenceManager.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/impl/JDBCPersistenceManager.java	2007-08-22 14:01:01 UTC (rev 3029)
+++ trunk/src/main/org/jboss/messaging/core/impl/JDBCPersistenceManager.java	2007-08-22 14:50:40 UTC (rev 3030)
@@ -521,6 +521,16 @@
    // Related to paging functionality
    // ===============================                 
    
+   //We need to prevent pageReferences and removeDepagedReferences being called concurrently
+   //This is because otherwise we could end up with a ref being depaged from channel A, then paged
+   //from channel B, but the channel B page doesn't insert the message since persisted is not set to true until after the
+   //the ref has been paged from A.
+   //We need to lock around the entire DB operation *including* the commit
+   //The locking could be made more fine grained but this would involve fine grained locking on list of messages
+   //which is fiddly since we would have to order them to prevent deadlocks
+   private Object pageLock = new Object();
+   
+   
    //Used to page NP messages or P messages in a non recoverable queue
    
    public void pageReferences(final long channelID, final List references, final boolean page) throws Exception
@@ -607,12 +617,15 @@
    		}      	      	      	
       }
    	
-   	new PageReferencesRunner().executeWithRetry(); 
+   	synchronized (pageLock)
+   	{
+   		new PageReferencesRunner().executeWithRetry();
+   	}
    }
          
    //After loading paged refs this is used to remove any NP or P messages in a unrecoverable channel
    public void removeDepagedReferences(final long channelID, final List references) throws Exception
-   {
+   {   	
       if (trace) { log.trace(this + " Removing " + references.size() + " refs from channel " + channelID); }
       
       class RemoveDepagedReferencesRunner extends JDBCTxRunner
@@ -633,16 +646,15 @@
 		                                                             
 		            removeReference(channelID, ref, psDeleteReference);
 		            
-		            //log.info("Removed ref with page order " + ref.getPagingOrder());
-		            
 		            int rows = psDeleteReference.executeUpdate();
 		               
 		            if (trace) { log.trace("Deleted " + rows + " rows"); }		            
 		                          
 		            //There is a small possibility that the ref is depaged here, then paged again, before this flag is set
 		            //and the tx is committed so the message be attempted to be inserted twice but this should be ok
-		            //since we ignore key violations on message insert
-		            ref.getMessage().setPersisted(false);                  
+		            //since we ignore key violations on message insert		      
+		            
+		            ref.getMessage().setPersisted(false); 
 		         }         
 		         
 		         return null;
@@ -654,7 +666,10 @@
    		}
       }
       
-      new RemoveDepagedReferencesRunner().executeWithRetry();
+      synchronized (pageLock)
+      {
+      	new RemoveDepagedReferencesRunner().executeWithRetry();
+      }
    }
    
    // After loading paged refs this is used to update P messages to non paged
@@ -2253,8 +2268,6 @@
          long end = System.currentTimeMillis();
          
          if (trace) { log.trace("Reaper reaped " + rows + " messages in " + (end - start) + " ms"); }
-         
-         log.info("Reaper reaped " + rows + " messages in " + (end - start) + " ms");
        }   	
    }
   
@@ -2424,10 +2437,8 @@
 	         }
 	         catch (SQLException e)
 	         {
-	            log.warn("SQLException caught - assuming deadlock detected, try:" + (tries + 1), e);
+	            log.warn("SQLException caught, SQLState " + e.getSQLState() + " code:" + e.getErrorCode() + "- assuming deadlock detected, try:" + (tries + 1), e);
 	            
-	            log.info("SQLState:" + e.getSQLState() + " code:" + e.getErrorCode());
-	            
 	            tries++;
 	            if (tries == MAX_TRIES)
 	            {

Modified: trunk/src/main/org/jboss/messaging/core/impl/PagingChannelSupport.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/impl/PagingChannelSupport.java	2007-08-22 14:01:01 UTC (rev 3029)
+++ trunk/src/main/org/jboss/messaging/core/impl/PagingChannelSupport.java	2007-08-22 14:50:40 UTC (rev 3030)
@@ -441,8 +441,6 @@
 
       Iterator iter = downCache.iterator();
       
-      log.info("Flushing down cache");
-      
       while (iter.hasNext())
       {
          MessageReference ref = (MessageReference) iter.next();




More information about the jboss-cvs-commits mailing list