[jboss-cvs] JBoss Messaging SVN: r3038 - in trunk/src: main/org/jboss/messaging/core/impl and 1 other directory.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Thu Aug 23 09:56:17 EDT 2007


Author: timfox
Date: 2007-08-23 09:56:17 -0400 (Thu, 23 Aug 2007)
New Revision: 3038

Modified:
   trunk/src/etc/server/default/deploy/mysql-persistence-service.xml
   trunk/src/main/org/jboss/messaging/core/impl/JDBCPersistenceManager.java
Log:
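Message persistence now uses a conditional insert (INSERT ... SELECT ... FROM JBM_DUAL WHERE NOT EXISTS) instead of inserting and retrying on primary key violations; adds the JBM_DUAL helper table, which is seeded with one row at startup.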


Modified: trunk/src/etc/server/default/deploy/mysql-persistence-service.xml
===================================================================
--- trunk/src/etc/server/default/deploy/mysql-persistence-service.xml	2007-08-23 12:16:46 UTC (rev 3037)
+++ trunk/src/etc/server/default/deploy/mysql-persistence-service.xml	2007-08-23 13:56:17 UTC (rev 3038)
@@ -34,6 +34,7 @@
       <attribute name="UsingBatchUpdates">true</attribute>
       
       <attribute name="SqlProperties"><![CDATA[
+   CREATE_DUAL=CREATE TABLE JBM_DUAL (DUMMY INTEGER, PRIMARY KEY (DUMMY)) ENGINE = INNODB
    CREATE_MESSAGE_REFERENCE=CREATE TABLE JBM_MSG_REF (CHANNEL_ID BIGINT, MESSAGE_ID BIGINT, TRANSACTION_ID BIGINT, STATE CHAR(1), ORD BIGINT, PAGE_ORD BIGINT, DELIVERY_COUNT INTEGER, SCHED_DELIVERY BIGINT, PRIMARY KEY(CHANNEL_ID, MESSAGE_ID)) ENGINE = INNODB
    CREATE_IDX_MESSAGE_REF_TX=CREATE INDEX JBM_MSG_REF_TX ON JBM_MSG_REF (TRANSACTION_ID)
    CREATE_IDX_MESSAGE_REF_ORD=CREATE INDEX JBM_MSG_REF_ORD ON JBM_MSG_REF (ORD)
@@ -44,6 +45,7 @@
    CREATE_IDX_MESSAGE_TIMESTAMP=CREATE INDEX JBM_MSG_REF_TIMESTAMP ON JBM_MSG (TIMESTAMP)
    CREATE_TRANSACTION=CREATE TABLE JBM_TX (NODE_ID INTEGER, TRANSACTION_ID BIGINT, BRANCH_QUAL VARBINARY(254), FORMAT_ID INTEGER, GLOBAL_TXID VARBINARY(254), PRIMARY KEY (TRANSACTION_ID)) ENGINE = INNODB
    CREATE_COUNTER=CREATE TABLE JBM_COUNTER (NAME VARCHAR(255), NEXT_ID BIGINT, PRIMARY KEY(NAME)) ENGINE = INNODB
+   INSERT_DUAL=INSERT INTO JBM_DUAL VALUES (1)
    INSERT_MESSAGE_REF=INSERT INTO JBM_MSG_REF (CHANNEL_ID, MESSAGE_ID, TRANSACTION_ID, STATE, ORD, PAGE_ORD, DELIVERY_COUNT, SCHED_DELIVERY) VALUES (?, ?, ?, ?, ?, ?, ?, ?)
    DELETE_MESSAGE_REF=DELETE FROM JBM_MSG_REF WHERE MESSAGE_ID=? AND CHANNEL_ID=? AND STATE='C'
    UPDATE_MESSAGE_REF=UPDATE JBM_MSG_REF SET TRANSACTION_ID=?, STATE='-' WHERE MESSAGE_ID=? AND CHANNEL_ID=? AND STATE='C'
@@ -61,7 +63,8 @@
    UPDATE_DELIVERY_COUNT=UPDATE JBM_MSG_REF SET DELIVERY_COUNT = ? WHERE CHANNEL_ID = ? AND MESSAGE_ID = ?
    UPDATE_CHANNEL_ID=UPDATE JBM_MSG_REF SET CHANNEL_ID = ? WHERE CHANNEL_ID = ?
    LOAD_MESSAGES=SELECT MESSAGE_ID, RELIABLE, EXPIRATION, TIMESTAMP, PRIORITY, HEADERS, PAYLOAD, TYPE FROM JBM_MSG
-   INSERT_MESSAGE=INSERT INTO JBM_MSG (MESSAGE_ID, RELIABLE, EXPIRATION, TIMESTAMP, PRIORITY, HEADERS, PAYLOAD, TYPE) VALUES (?, ?, ?, ?, ?, ?, ?, ?)   
+   INSERT_MESSAGE=INSERT INTO JBM_MSG (MESSAGE_ID, RELIABLE, EXPIRATION, TIMESTAMP, PRIORITY, HEADERS, PAYLOAD, TYPE) VALUES (?, ?, ?, ?, ?, ?, ?, ?)      
+   INSERT_MESSAGE_CONDITIONAL=INSERT INTO JBM_MSG (MESSAGE_ID, RELIABLE, EXPIRATION, TIMESTAMP, PRIORITY, HEADERS, PAYLOAD, TYPE) SELECT (?, ?, ?, ?, ?, ?, ?, ?) FROM JBM_DUAL WHERE NOT EXISTS (SELECT MESSAGE_ID FROM JBM_MSG WHERE MESSAGE_ID = ?)   
    MESSAGE_ID_COLUMN=MESSAGE_ID
    MESSAGE_EXISTS=SELECT MESSAGE_ID FROM JBM_MSG WHERE MESSAGE_ID = ? FOR UPDATE
    REAP_MESSAGES=DELETE FROM JBM_MSG WHERE TIMESTAMP < ? AND NOT EXISTS (SELECT * FROM JBM_MSG_REF WHERE JBM_MSG_REF.MESSAGE_ID = JBM_MSG.MESSAGE_ID)

Modified: trunk/src/main/org/jboss/messaging/core/impl/JDBCPersistenceManager.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/impl/JDBCPersistenceManager.java	2007-08-23 12:16:46 UTC (rev 3037)
+++ trunk/src/main/org/jboss/messaging/core/impl/JDBCPersistenceManager.java	2007-08-23 13:56:17 UTC (rev 3038)
@@ -135,6 +135,8 @@
       super.start();
 
       Connection conn = null;
+      
+      PreparedStatement ps = null;
 
       TransactionWrapper wrap = new TransactionWrapper();
 
@@ -154,6 +156,27 @@
                "                         Using an isolation level more strict than READ_COMMITTED may lead to deadlock.\n";
             log.warn(warn);
          }
+         
+         //Now we need to insert a row in the DUAL table if it doesn't contain one already
+         ps = conn.prepareStatement(this.getSQLStatement("INSERT_DUAL"));
+         
+         try
+         {
+         	int rows = ps.executeUpdate();
+         
+         	if (trace) { log.trace("Inserted " + rows + " rows into dual"); }
+         }
+         catch (SQLException e)
+         {
+         	if (e.getSQLState().equals("23000"))
+         	{
+         		//Ignore  PK violation - since might already be inserted
+         	}
+         	else
+         	{
+         		throw e;
+         	}
+         }         
       }
       catch (Exception e)
       {
@@ -162,13 +185,11 @@
       }
       finally
       {
-         if (conn != null)
-         {
-            conn.close();
-         }
+         closeStatement(ps);
+         closeConnection(conn);
          wrap.end();
       }
-             
+         
       log.debug(this + " started");
    }
    
@@ -531,14 +552,12 @@
       		PreparedStatement psInsertReference = null;
       		PreparedStatement psInsertMessage = null;
       		
-      		List<Message> pagedMessages = new ArrayList<Message>();
-      		
       		try
       		{      		
 	      		Iterator iter = references.iterator();
 	
 	         	psInsertReference = conn.prepareStatement(getSQLStatement("INSERT_MESSAGE_REF"));
-	         	psInsertMessage = conn.prepareStatement(getSQLStatement("INSERT_MESSAGE"));
+	         	psInsertMessage = conn.prepareStatement(getSQLStatement("INSERT_MESSAGE_CONDITIONAL"));
 	
 	         	while (iter.hasNext())
 	         	{
@@ -563,63 +582,19 @@
 	         		//Maybe we need to persist the message itself
 	         		Message m = ref.getMessage();
 	
-	         		synchronized (m)
-	         		{
-	         			if (!m.isPersisted())
-	         			{            	               
-	         				//The message might actually still already exist (despite pagedcount=0) due to it already being paged
-	         				//so we insert and ignore key violations
-	
-	         				storeMessage(m, psInsertMessage); 
-	
-	         				try
-	         				{
-	         					rows = psInsertMessage.executeUpdate();
-	         				}
-	         				catch (SQLException e)
-	         				{
-	         					if (e.getSQLState().equals("23000"))
-	         					{
-	         						//This is a primary key violation
-	         						//It might legitimately occur if the ref has been paged to two channels so it is not
-	         						//left in memory any more, then loaded by one channel
-	         						//The paged count would then be one, not two
-	         						if (trace) { log.trace("Primary key violation detected", e); }
-	         						
-	         						//When we retry we don't want to insert again, so set persisted to true
-	         						m.setPersisted(true);
-	         						
-	         						//Throw the exception so the tx is retried - the next time it won't try and insert the message
-	         						violationOk = true;
-	         					}
-         						
-         						throw e;
-	         				}
-	
-	         				if (trace) { log.trace("Inserted " + rows + " rows"); }	               
-	
-	         				m.setPersisted(true);
-	         				
-	         				pagedMessages.add(m);
-	         			}
-	         		}
+	         		//We always try and insert the message, even if it might already be paged -
+	         		//we use a conditional insert though, so it won't insert it if it already exists
+
+      				storeMessage(m, psInsertMessage); 
+      				psInsertMessage.setLong(9, m.getMessageID());
+
+      			   rows = psInsertMessage.executeUpdate();
+      				
+      				if (trace) { log.trace("Inserted " + rows + " rows"); }	               
 	         	} 
 	         	
 	         	return null;
       		}
-      		catch (Exception e)
-      		{
-      			//The tx will be rolled back
-      			//so we need to set the messages to not persisted
-      			for (Iterator iter = pagedMessages.iterator(); iter.hasNext(); )
-               {
-                  Message msg = (Message)iter.next();
-      			
-                  msg.setPersisted(false);
-      			}
-      			
-      			throw e;
-      		}
       		finally
       		{
       			closeStatement(psInsertReference);
@@ -642,8 +617,6 @@
    		{
 		      PreparedStatement psDeleteReference = null; 
 		      
-		      List<Message> depagedReferences = new ArrayList<Message>();
-		       
 		      try
 		      {	
 		         psDeleteReference = conn.prepareStatement(getSQLStatement("DELETE_MESSAGE_REF"));
@@ -658,29 +631,11 @@
 		            
 		            int rows = psDeleteReference.executeUpdate();
 		               
-		            if (trace) { log.trace("Deleted " + rows + " rows"); }		            
-		                          
-		            //There is a small possibility that the ref is depaged here, then paged again, before this flag is set
-		            //and the tx is committed so the message be attempted to be inserted twice but this should be ok
-		            //since we ignore key violations on message insert		      
-		            
-		            ref.getMessage().setPersisted(false);
-		            
-		            depagedReferences.add(ref.getMessage());
+		            if (trace) { log.trace("Deleted " + rows + " rows"); }		            		                          
 		         }         
 		         
 		         return null;
-		      }
-		      catch (Exception e)
-		      {
-		      	for (Iterator iter = depagedReferences.iterator(); iter.hasNext(); )
-               {
-                  Message msg = (Message)iter.next();
-      			
-                  msg.setPersisted(true);
-      			}
-		      	throw e;
-		      }
+		      }		      
 		      finally
 		      {
 		      	closeStatement(psDeleteReference);       
@@ -2023,6 +1978,7 @@
    protected Map getDefaultDDLStatements()
    {
       Map<String, String> map = new LinkedHashMap<String, String>();
+      map.put("CREATE_DUAL", "CREATE TABLE JBM_DUAL (DUMMY INTEGER)");
       //Message reference
       map.put("CREATE_MESSAGE_REFERENCE",
               "CREATE TABLE JBM_MSG_REF (CHANNEL_ID BIGINT, " +
@@ -2054,6 +2010,7 @@
    protected Map getDefaultDMLStatements()
    {                
       Map<String, String> map = new LinkedHashMap<String, String>();
+      map.put("INSERT_DUAL", "INSERT INTO JBM_DUAL VALUES (1) WHERE NOT EXISTS (SELECT * FROM JBM_DUAL)");
       //Message reference
       map.put("INSERT_MESSAGE_REF",
               "INSERT INTO JBM_MSG_REF (CHANNEL_ID, MESSAGE_ID, TRANSACTION_ID, STATE, ORD, PAGE_ORD, DELIVERY_COUNT, SCHED_DELIVERY) " +
@@ -2091,6 +2048,11 @@
               "INSERT INTO JBM_MSG (MESSAGE_ID, RELIABLE, EXPIRATION, " +
               "TIMESTAMP, PRIORITY, HEADERS, PAYLOAD, TYPE) " +           
               "VALUES (?, ?, ?, ?, ?, ?, ?, ?)" );
+      map.put("INSERT_MESSAGE_CONDITIONAL",
+      		  "INSERT INTO JBM_MSG (MESSAGE_ID, RELIABLE, EXPIRATION, " +
+              "TIMESTAMP, PRIORITY, HEADERS, PAYLOAD, TYPE) " +     
+              "SELECT (?, ?, ?, ?, ?, ?, ?, ?) " + 
+              "FROM JBM_DUAL WHERE NOT EXISTS (SELECT MESSAGE_ID FROM JBM_MSG WHERE MESSAGE_ID = ?)");	
       map.put("MESSAGE_ID_COLUMN", "MESSAGE_ID");
       map.put("MESSAGE_EXISTS", "SELECT MESSAGE_ID FROM JBM_MSG WHERE MESSAGE_ID = ?");
       map.put("REAP_MESSAGES", "DELETE FROM JBM_MSG WHERE TIMESTAMP <= ? AND NOT EXISTS (SELECT * FROM JBM_MSG_REF WHERE JBM_MSG_REF.MESSAGE_ID = JBM_MSG.MESSAGE_ID)");
@@ -2418,9 +2380,7 @@
    	Connection conn;
 
       TransactionWrapper wrap;
-      
-      boolean violationOk;
-      
+         
 		public Object execute() throws Exception
 		{
 	      wrap = new TransactionWrapper();
@@ -2460,26 +2420,18 @@
 	            return res;	            
 	         }
 	         catch (SQLException e)
-	         {
-	         	if (e.getSQLState().equals("23000") && violationOk)
-	         	{
-	         		//Primary key violation - this is ok - we retry immediately
-	         		violationOk = false;
-	         	}
-	         	else
-	         	{		         	
-		            log.warn("SQLException caught, SQLState " + e.getSQLState() + " code:" + e.getErrorCode() + "- assuming deadlock detected, try:" + (tries + 1), e);
-		            
-		            tries++;
-		            if (tries == MAX_TRIES)
-		            {
-		               log.error("Retried " + tries + " times, now giving up");
-		               throw new IllegalStateException("Failed to excecute transaction");
-		            }
-		            log.warn("Trying again after a pause");
-		            //Now we wait for a random amount of time to minimise risk of deadlock
-		            Thread.sleep((long)(Math.random() * 500));
-	         	}
+	         {       	
+	            log.warn("SQLException caught, SQLState " + e.getSQLState() + " code:" + e.getErrorCode() + "- assuming deadlock detected, try:" + (tries + 1), e);
+	            
+	            tries++;
+	            if (tries == MAX_TRIES)
+	            {
+	               log.error("Retried " + tries + " times, now giving up");
+	               throw new IllegalStateException("Failed to excecute transaction");
+	            }
+	            log.warn("Trying again after a pause");
+	            //Now we wait for a random amount of time to minimise risk of deadlock
+	            Thread.sleep((long)(Math.random() * 500));	         	
 	         }  
 	      }
 		}




More information about the jboss-cvs-commits mailing list