[jboss-cvs] JBoss Messaging SVN: r3893 - in branches/Branch_Stable/src: etc/xmdesc and 2 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Wed Mar 19 09:27:45 EDT 2008


Author: ataylor
Date: 2008-03-19 09:27:45 -0400 (Wed, 19 Mar 2008)
New Revision: 3893

Modified:
   branches/Branch_Stable/src/etc/server/default/deploy/mysql-clustered-persistence-service.xml
   branches/Branch_Stable/src/etc/xmdesc/JDBCPersistenceManager-xmbean.xml
   branches/Branch_Stable/src/main/org/jboss/messaging/core/impl/JDBCPersistenceManager.java
   branches/Branch_Stable/src/main/org/jboss/messaging/core/impl/JDBCSupport.java
   branches/Branch_Stable/src/main/org/jboss/messaging/core/jmx/JDBCPersistenceManagerService.java
Log:
added support for mysql cluster

Modified: branches/Branch_Stable/src/etc/server/default/deploy/mysql-clustered-persistence-service.xml
===================================================================
--- branches/Branch_Stable/src/etc/server/default/deploy/mysql-clustered-persistence-service.xml	2008-03-19 09:25:22 UTC (rev 3892)
+++ branches/Branch_Stable/src/etc/server/default/deploy/mysql-clustered-persistence-service.xml	2008-03-19 13:27:45 UTC (rev 3893)
@@ -93,6 +93,8 @@
 
       <attribute name="MaxParams">500</attribute>
 
+      <!-- This -->
+      <attribute name="UseNDBFailoverStrategy">true</attribute>
    </mbean>
 
    <!-- Messaging Post Office MBean configuration

Modified: branches/Branch_Stable/src/etc/xmdesc/JDBCPersistenceManager-xmbean.xml
===================================================================
--- branches/Branch_Stable/src/etc/xmdesc/JDBCPersistenceManager-xmbean.xml	2008-03-19 09:25:22 UTC (rev 3892)
+++ branches/Branch_Stable/src/etc/xmdesc/JDBCPersistenceManager-xmbean.xml	2008-03-19 13:27:45 UTC (rev 3893)
@@ -64,6 +64,12 @@
       <name>MaxParams</name>
       <type>int</type>
    </attribute>
+   
+   <attribute access="read-write" getMethod="isUseNDBFailoverStrategy" setMethod="setUseNDBFailoverStrategy">
+      <description>This changes the retry strategy for the persistence manager. If failure occurs on commit, we ignore the second failure and assume that the insert happened correctly</description>
+      <name>UseNDBFailoverStrategy</name>
+      <type>boolean</type>
+   </attribute>
 
    <attribute access="read-write" getMethod="isSupportsBlobOnSelect" setMethod="setSupportsBlobOnSelect">
       <description>Some databases don't support binding blobs on select clauses</description>

Modified: branches/Branch_Stable/src/main/org/jboss/messaging/core/impl/JDBCPersistenceManager.java
===================================================================
--- branches/Branch_Stable/src/main/org/jboss/messaging/core/impl/JDBCPersistenceManager.java	2008-03-19 09:25:22 UTC (rev 3892)
+++ branches/Branch_Stable/src/main/org/jboss/messaging/core/impl/JDBCPersistenceManager.java	2008-03-19 13:27:45 UTC (rev 3893)
@@ -21,36 +21,6 @@
  */
 package org.jboss.messaging.core.impl;
 
-import java.io.BufferedInputStream;
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.InputStream;
-import java.sql.Connection;
-import java.sql.PreparedStatement;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.sql.Statement;
-import java.sql.Types;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.Semaphore;
-
-import javax.sql.DataSource;
-import javax.transaction.TransactionManager;
-import javax.transaction.xa.Xid;
-
 import org.jboss.jms.message.JBossMessage;
 import org.jboss.jms.tx.MessagingXid;
 import org.jboss.logging.Logger;
@@ -66,6 +36,15 @@
 import org.jboss.messaging.util.StreamUtils;
 import org.jboss.messaging.util.Util;
 
+import javax.sql.DataSource;
+import javax.transaction.TransactionManager;
+import javax.transaction.xa.Xid;
+import java.io.*;
+import java.sql.*;
+import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Semaphore;
+
 /**
  * JDBC implementation of PersistenceManager
  *
@@ -126,9 +105,9 @@
          Properties sqlProperties, boolean createTablesOnStartup,
          boolean usingBatchUpdates, boolean usingBinaryStream,
          boolean usingTrailingByte, int maxParams, boolean supportsBlobSelect,
-         boolean detectDuplicates, int idCacheSize)
+         boolean detectDuplicates, boolean useNDBFailoverStrategy, int idCacheSize)
    {
-      super(ds, tm, sqlProperties, createTablesOnStartup);
+      super(ds, tm, sqlProperties, createTablesOnStartup, useNDBFailoverStrategy);
 
       // usingBatchUpdates is currently ignored due to sketchy support from
       // databases
@@ -144,6 +123,7 @@
       this.detectDuplicates = detectDuplicates;
 
       this.idCacheSize = idCacheSize;
+
    }
 
    public JDBCPersistenceManager(DataSource ds, TransactionManager tm,
@@ -152,7 +132,7 @@
          boolean usingTrailingByte, int maxParams, boolean supportsBlobSelect)
    {
       this (ds, tm, sqlProperties, createTablesOnStartup, usingBatchUpdates, usingBinaryStream, usingTrailingByte,
-            maxParams, supportsBlobSelect, false, 0);
+            maxParams, supportsBlobSelect, false, false, 0);
    }
 
    // MessagingComponent overrides ---------------------------------
@@ -1327,12 +1307,14 @@
 
       class AddReferenceRunner extends JDBCTxRunner2
       {
+         private Message message;
+         private boolean messagePersisted = false;
          public Object doTransaction() throws Exception
          {
             PreparedStatement psReference = null;
             PreparedStatement psInsertMessage = null;
 
-            Message m = ref.getMessage();
+            message = ref.getMessage();
 
             try
             {
@@ -1349,13 +1331,13 @@
                   log.trace("Inserted " + rows + " rows");
                }
 
-               if (!m.isPersisted())
+               if (!message.isPersisted())
                {
                   // First time so persist the message
                   psInsertMessage = conn
                         .prepareStatement(getSQLStatement("INSERT_MESSAGE"));
 
-                  storeMessage(m, psInsertMessage, true);
+                  storeMessage(message, psInsertMessage, true);
                   rows = psInsertMessage.executeUpdate();
 
                   if (trace)
@@ -1365,15 +1347,17 @@
 
                   log.trace("message Inserted/updated " + rows + " rows");
 
+
+
+                  if (message instanceof JBossMessage)
+                  {
+                     cacheID(conn, ((JBossMessage) message).getJMSMessageID());
+                  }
                   // Needs to be at the end - in case an exception is thrown in
                   // which case retry will be attempted and we want to insert it
                   // again
-                  m.setPersisted(true);
-
-                  if (m instanceof JBossMessage)
-                  {
-                     cacheID(conn, ((JBossMessage)m).getJMSMessageID());
-                  }
+                  message.setPersisted(true);
+                  messagePersisted = true;
                }
 
                return null;
@@ -1384,6 +1368,15 @@
                closeStatement(psInsertMessage);
             }
          }
+
+         public void rollback()
+         {
+            if(messagePersisted)
+            {
+               messagePersisted = false;
+               message.setPersisted(false);
+            }
+         }
       }
 
       if (tx != null)
@@ -1711,6 +1704,8 @@
    {
       class HandleBeforeCommit1PCRunner extends JDBCTxRunner2
       {
+         private List<Message> messagesStored;
+
          public Object doTransaction() throws Exception
          {
             // For one phase we simply add rows corresponding to the refs and
@@ -1724,7 +1719,7 @@
             PreparedStatement psInsertMessage = null;
             PreparedStatement psDeleteReference = null;
 
-            List<Message> messagesStored = new ArrayList<Message>();
+            messagesStored = new ArrayList<Message>();
 
             try
             {
@@ -1822,22 +1817,25 @@
 
                return null;
             }
-            catch (Exception e)
+            finally
             {
+               closeStatement(psReference);
+               closeStatement(psDeleteReference);
+               closeStatement(psInsertMessage);
+            }
+         }
+
+         public void rollback()
+         {
+            if(messagesStored != null)
+            {
                for (Iterator i = messagesStored.iterator(); i.hasNext();)
                {
                   Message msg = (Message) i.next();
 
                   msg.setPersisted(false);
                }
-               throw e;
             }
-            finally
-            {
-               closeStatement(psReference);
-               closeStatement(psDeleteReference);
-               closeStatement(psInsertMessage);
-            }
          }
       }
 
@@ -1914,6 +1912,8 @@
    {
       class HandleBeforePrepareRunner extends JDBCTxRunner2
       {
+         private List<Message> messagesStored;
+
          public Object doTransaction() throws Exception
          {
             // We insert a tx record and
@@ -1924,7 +1924,7 @@
             PreparedStatement psInsertMessage = null;
             PreparedStatement psUpdateReference = null;
 
-            List<Message> messagesStored = new ArrayList<Message>();
+            messagesStored = new ArrayList<Message>();
 
             try
             {
@@ -2020,22 +2020,25 @@
 
                return null;
             }
-            catch (Exception e)
+            finally
             {
+               closeStatement(psReference);
+               closeStatement(psInsertMessage);
+               closeStatement(psUpdateReference);
+            }
+         }
+
+         public void rollback()
+         {
+            if(messagesStored != null)
+            {
                for (Iterator i = messagesStored.iterator(); i.hasNext();)
                {
                   Message msg = (Message) i.next();
 
                   msg.setPersisted(false);
                }
-               throw e;
             }
-            finally
-            {
-               closeStatement(psReference);
-               closeStatement(psInsertMessage);
-               closeStatement(psUpdateReference);
-            }
          }
       }
 

Modified: branches/Branch_Stable/src/main/org/jboss/messaging/core/impl/JDBCSupport.java
===================================================================
--- branches/Branch_Stable/src/main/org/jboss/messaging/core/impl/JDBCSupport.java	2008-03-19 09:25:22 UTC (rev 3892)
+++ branches/Branch_Stable/src/main/org/jboss/messaging/core/impl/JDBCSupport.java	2008-03-19 13:27:45 UTC (rev 3893)
@@ -21,25 +21,19 @@
  */
 package org.jboss.messaging.core.impl;
 
-import java.sql.Connection;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.sql.Statement;
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.LinkedHashMap;
-import java.util.Map;
-import java.util.Properties;
+import org.jboss.logging.Logger;
+import org.jboss.messaging.core.contract.MessagingComponent;
 
 import javax.sql.DataSource;
 import javax.transaction.Status;
 import javax.transaction.Transaction;
 import javax.transaction.TransactionManager;
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.*;
 
-import org.jboss.logging.Logger;
-import org.jboss.messaging.core.contract.MessagingComponent;
-import org.jboss.util.NestedSQLException;
-
 /**
  * Common functionality for messaging components that need to access a database.
  *
@@ -64,6 +58,8 @@
    private Map defaultDMLStatements;
    
    private Map defaultDDLStatements;
+
+   private boolean useNDBFailoverStrategy = false;
        
    private boolean createTablesOnStartup = true;
       
@@ -92,7 +88,14 @@
       
       this.createTablesOnStartup = createTablesOnStartup;
    }
-      
+
+    public JDBCSupport(DataSource ds, TransactionManager tm, Properties sqlProperties,
+                      boolean createTablesOnStartup, boolean useNDBFailoverStrategy)
+   {
+      this(ds, tm, sqlProperties, createTablesOnStartup);
+      this.useNDBFailoverStrategy = useNDBFailoverStrategy;
+   }
+
    // MessagingComponent overrides ---------------------------------
    
    public void start() throws Exception
@@ -447,6 +450,8 @@
       
       private boolean getConnectionFailed;
 
+      private boolean getCommitFailed;
+
       public T execute() throws Exception
       {                    
          Transaction tx = tm.suspend();
@@ -514,12 +519,22 @@
             }
             catch (SQLException  e)
             {
+               rollback();
+               if(getCommitFailed && useNDBFailoverStrategy)
+               {
+                  log.warn("Ignoring SQL exception on retry after commit failed, SQLState " + e.getSQLState() + " code:" + e.getErrorCode() + "- assuming deadlock detected, try:" + (tries + 1), e);
+                  return null;
+               }
                if (getConnectionFailed)
                {
                   //Do not retry - just throw the exception up
                   throw e;
                }
-               
+               //this is the xopen code that signifies acommit has failed with the state of the tx unknown.
+               if("08007".equals(e.getSQLState()) && useNDBFailoverStrategy)
+               {
+                  getCommitFailed = true;
+               }
                log.warn("SQLException caught, SQLState " + e.getSQLState() + " code:" + e.getErrorCode() + "- assuming deadlock detected, try:" + (tries + 1), e);
 
                tries++;
@@ -535,6 +550,14 @@
          }
       }
 
+      /**
+       * allow work to be done if commit fails, implement this if you want to rollback your own work such as setting flags.
+       */
+      public void rollback()
+      {
+         //noop
+      }
+
       public abstract T doTransaction() throws Exception;
    }
 

Modified: branches/Branch_Stable/src/main/org/jboss/messaging/core/jmx/JDBCPersistenceManagerService.java
===================================================================
--- branches/Branch_Stable/src/main/org/jboss/messaging/core/jmx/JDBCPersistenceManagerService.java	2008-03-19 09:25:22 UTC (rev 3892)
+++ branches/Branch_Stable/src/main/org/jboss/messaging/core/jmx/JDBCPersistenceManagerService.java	2008-03-19 13:27:45 UTC (rev 3893)
@@ -21,13 +21,13 @@
  */
 package org.jboss.messaging.core.jmx;
 
-import javax.transaction.TransactionManager;
-
 import org.jboss.messaging.core.contract.MessagingComponent;
 import org.jboss.messaging.core.contract.PersistenceManager;
 import org.jboss.messaging.core.impl.JDBCPersistenceManager;
 import org.jboss.messaging.util.ExceptionUtil;
 
+import javax.transaction.TransactionManager;
+
 /**
  * A JDBCPersistenceManagerService
  *
@@ -59,6 +59,8 @@
 
    private int idCacheSize = 500;
 
+   private boolean useNDBFailoverStrategy = false;
+
    // Constructors --------------------------------------------------------
 
    public JDBCPersistenceManagerService()
@@ -91,7 +93,7 @@
             new JDBCPersistenceManager(ds, tm, sqlProperties,
                                        createTablesOnStartup, usingBatchUpdates,
                                        usingBinaryStream, usingTrailingByte, maxParams,
-                                       supportsBlobOnSelect, detectDuplicates, idCacheSize);
+                                       supportsBlobOnSelect, detectDuplicates, useNDBFailoverStrategy, idCacheSize);
 
          persistenceManager.start();
 
@@ -148,6 +150,16 @@
       this.maxParams = maxParams;
    }
 
+   public boolean isUseNDBFailoverStrategy()
+   {
+      return useNDBFailoverStrategy;
+   }
+
+   public void setUseNDBFailoverStrategy(boolean useNDBFailoverStrategy)
+   {
+      this.useNDBFailoverStrategy = useNDBFailoverStrategy;
+   }
+
    public boolean isUsingBinaryStream()
    {
       return usingBinaryStream;




More information about the jboss-cvs-commits mailing list