[jboss-cvs] JBoss Messaging SVN: r3024 - trunk/src/main/org/jboss/messaging/core/impl.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Wed Aug 22 07:52:23 EDT 2007


Author: timfox
Date: 2007-08-22 07:52:23 -0400 (Wed, 22 Aug 2007)
New Revision: 3024

Modified:
   trunk/src/main/org/jboss/messaging/core/impl/JDBCPersistenceManager.java
Log:
Persistence manager interim commit 3


Modified: trunk/src/main/org/jboss/messaging/core/impl/JDBCPersistenceManager.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/impl/JDBCPersistenceManager.java	2007-08-22 10:44:34 UTC (rev 3023)
+++ trunk/src/main/org/jboss/messaging/core/impl/JDBCPersistenceManager.java	2007-08-22 11:52:23 UTC (rev 3024)
@@ -631,6 +631,8 @@
       		PreparedStatement psInsertReference = null;
       		PreparedStatement psInsertMessage = null;
       		
+      		List persistedMessages = new ArrayList();
+      		
       		try
       		{      		
 	      		Iterator iter = references.iterator();
@@ -647,7 +649,7 @@
 	
 	         		//Now store the reference
 	
-	         		//log.info("Paged ref with page order " + ref.getPagingOrder());
+	         		log.trace("Paged ref with page order " + ref.getPagingOrder());
 	
 	         		addReference(channelID, ref, psInsertReference, page);
 	
@@ -675,12 +677,27 @@
 	         				if (trace) { log.trace("Inserted " + rows + " rows"); }	               
 	
 	         				m.setPersisted(true);
+	         				
+	         				persistedMessages.add(m);
 	         			}
 	         		}
 	         	} 
 	         	
 	         	return null;
       		}
+      		catch (SQLException e)
+      		{
+      			//The tx will be rolled back
+      			//so we need to set the messages to not persisted
+      			for (Iterator iter = persistedMessages.iterator(); iter.hasNext(); )
+               {
+                  Message msg = (Message)iter.next();
+      			
+                  msg.setPersisted(false);      				
+      			}
+      			
+      			throw e;
+      		}
       		finally
       		{
       			closeStatement(psInsertReference);
@@ -696,76 +713,70 @@
    public void removeDepagedReferences(long channelID, List references) throws Exception
    {
       if (trace) { log.trace(this + " Removing " + references.size() + " refs from channel " + channelID); }
-          
-      Connection conn = null;
-      PreparedStatement psDeleteReference = null;  
-      TransactionWrapper wrap = new TransactionWrapper();
-        
-      //We order the references
-     // orderReferences(references);
-             
-      try
+      
+      class RemoveDepagedReferencesRunner extends JDBCTxRunner
       {
-         //We get locks on all the messages - since they are ordered we avoid deadlock
-       //  getLocks(references);
-         
-         conn = ds.getConnection();
-         
-         Iterator iter = references.iterator();
-         
-         psDeleteReference = conn.prepareStatement(getSQLStatement("DELETE_MESSAGE_REF"));
-
-         while (iter.hasNext())
-         {
-            MessageReference ref = (MessageReference) iter.next();
-                                                             
-            removeReference(channelID, ref, psDeleteReference);
-            
-            //log.info("Removed ref with page order " + ref.getPagingOrder());
-            
-            if (usingBatchUpdates)
-            {
-               psDeleteReference.addBatch();
-            }
-            else
-            {
-               int rows = executeWithRetry(psDeleteReference);
-               
-               if (trace) { log.trace("Deleted " + rows + " rows"); }
-            }
-                          
-            //There is a small possibility that the ref is depaged here, then paged again, before this flag is set
-            //and the tx is committed so the message be attempted to be inserted twice but this should be ok
-            //since we ignore key violations on message insert
-            ref.getMessage().setPersisted(false);                  
-         }         
-         
-         if (usingBatchUpdates)
-         {
-            int[] rowsReference = executeWithRetryBatch(psDeleteReference);
-            
-            if (trace) { logBatchUpdate(getSQLStatement("DELETE_MESSAGE_REF"), rowsReference, "deleted"); }                                    
-         }              
+      	long channelID;
+      	List references;
+      	
+      	public RemoveDepagedReferencesRunner(long channelID, List references)
+      	{
+      		this.channelID = channelID;
+      		this.references = references;
+      	}
+      	
+      	public Object doTransaction() throws Exception
+   		{
+		      PreparedStatement psDeleteReference = null;  
+		       
+		      try
+		      {	
+		         psDeleteReference = conn.prepareStatement(getSQLStatement("DELETE_MESSAGE_REF"));
+		         
+		         Iterator iter = references.iterator();		         
+		
+		         while (iter.hasNext())
+		         {
+		            MessageReference ref = (MessageReference) iter.next();
+		                                                             
+		            removeReference(channelID, ref, psDeleteReference);
+		            
+		            //log.info("Removed ref with page order " + ref.getPagingOrder());
+		            
+		            if (usingBatchUpdates)
+		            {
+		               psDeleteReference.addBatch();
+		            }
+		            else
+		            {
+		               int rows = psDeleteReference.executeUpdate();
+		               
+		               if (trace) { log.trace("Deleted " + rows + " rows"); }
+		            }
+		                          
+		            //There is a small possibility that the ref is depaged here, then paged again, before this flag is set
+		            //and the tx is committed so the message be attempted to be inserted twice but this should be ok
+		            //since we ignore key violations on message insert
+		            ref.getMessage().setPersisted(false);                  
+		         }         
+		         
+		         if (usingBatchUpdates)
+		         {
+		            int[] rowsReference = executeWithRetryBatch(psDeleteReference);
+		            
+		            if (trace) { logBatchUpdate(getSQLStatement("DELETE_MESSAGE_REF"), rowsReference, "deleted"); }                                    
+		         }
+		         
+		         return null;
+		      }
+		      finally
+		      {
+		      	closeStatement(psDeleteReference);       
+		      }      
+   		}
       }
-      catch (Exception e)
-      {
-         wrap.exceptionOccurred();
-         throw e;
-      }
-      finally
-      {
-      	closeStatement(psDeleteReference);
-      	closeConnection(conn);         
-//         try
-//         {
-            wrap.end();
-//         }
-//         finally
-//         {     
-//            //And then release locks
-//            this.releaseLocks(references);
-//         }         
-      }      
+      
+      new RemoveDepagedReferencesRunner(channelID, references).executeWithRetry();
    }
    
    // After loading paged refs this is used to update P messages to non paged




More information about the jboss-cvs-commits mailing list