[jboss-cvs] JBoss Messaging SVN: r3893 - in branches/Branch_Stable/src: etc/xmdesc and 2 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Wed Mar 19 09:27:45 EDT 2008
Author: ataylor
Date: 2008-03-19 09:27:45 -0400 (Wed, 19 Mar 2008)
New Revision: 3893
Modified:
branches/Branch_Stable/src/etc/server/default/deploy/mysql-clustered-persistence-service.xml
branches/Branch_Stable/src/etc/xmdesc/JDBCPersistenceManager-xmbean.xml
branches/Branch_Stable/src/main/org/jboss/messaging/core/impl/JDBCPersistenceManager.java
branches/Branch_Stable/src/main/org/jboss/messaging/core/impl/JDBCSupport.java
branches/Branch_Stable/src/main/org/jboss/messaging/core/jmx/JDBCPersistenceManagerService.java
Log:
added support for mysql cluster
Modified: branches/Branch_Stable/src/etc/server/default/deploy/mysql-clustered-persistence-service.xml
===================================================================
--- branches/Branch_Stable/src/etc/server/default/deploy/mysql-clustered-persistence-service.xml 2008-03-19 09:25:22 UTC (rev 3892)
+++ branches/Branch_Stable/src/etc/server/default/deploy/mysql-clustered-persistence-service.xml 2008-03-19 13:27:45 UTC (rev 3893)
@@ -93,6 +93,8 @@
<attribute name="MaxParams">500</attribute>
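+ <!-- Enables the NDB (MySQL Cluster) failover retry strategy: if a commit fails and the retry then fails as well, the second failure is ignored and the insert is assumed to have happened correctly -->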
+ <attribute name="UseNDBFailoverStrategy">true</attribute>
</mbean>
<!-- Messaging Post Office MBean configuration
Modified: branches/Branch_Stable/src/etc/xmdesc/JDBCPersistenceManager-xmbean.xml
===================================================================
--- branches/Branch_Stable/src/etc/xmdesc/JDBCPersistenceManager-xmbean.xml 2008-03-19 09:25:22 UTC (rev 3892)
+++ branches/Branch_Stable/src/etc/xmdesc/JDBCPersistenceManager-xmbean.xml 2008-03-19 13:27:45 UTC (rev 3893)
@@ -64,6 +64,12 @@
<name>MaxParams</name>
<type>int</type>
</attribute>
+
+ <attribute access="read-write" getMethod="isUseNDBFailoverStrategy" setMethod="setUseNDBFailoverStrategy">
+ <description>This changes the retry strategy for the persistence manager. If failure occurs on commit, we ignore the second failure and assume that the insert happened correctly</description>
+ <name>UseNDBFailoverStrategy</name>
+ <type>boolean</type>
+ </attribute>
<attribute access="read-write" getMethod="isSupportsBlobOnSelect" setMethod="setSupportsBlobOnSelect">
<description>Some databases don't support binding blobs on select clauses</description>
Modified: branches/Branch_Stable/src/main/org/jboss/messaging/core/impl/JDBCPersistenceManager.java
===================================================================
--- branches/Branch_Stable/src/main/org/jboss/messaging/core/impl/JDBCPersistenceManager.java 2008-03-19 09:25:22 UTC (rev 3892)
+++ branches/Branch_Stable/src/main/org/jboss/messaging/core/impl/JDBCPersistenceManager.java 2008-03-19 13:27:45 UTC (rev 3893)
@@ -21,36 +21,6 @@
*/
package org.jboss.messaging.core.impl;
-import java.io.BufferedInputStream;
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.InputStream;
-import java.sql.Connection;
-import java.sql.PreparedStatement;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.sql.Statement;
-import java.sql.Types;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.Semaphore;
-
-import javax.sql.DataSource;
-import javax.transaction.TransactionManager;
-import javax.transaction.xa.Xid;
-
import org.jboss.jms.message.JBossMessage;
import org.jboss.jms.tx.MessagingXid;
import org.jboss.logging.Logger;
@@ -66,6 +36,15 @@
import org.jboss.messaging.util.StreamUtils;
import org.jboss.messaging.util.Util;
+import javax.sql.DataSource;
+import javax.transaction.TransactionManager;
+import javax.transaction.xa.Xid;
+import java.io.*;
+import java.sql.*;
+import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Semaphore;
+
/**
* JDBC implementation of PersistenceManager
*
@@ -126,9 +105,9 @@
Properties sqlProperties, boolean createTablesOnStartup,
boolean usingBatchUpdates, boolean usingBinaryStream,
boolean usingTrailingByte, int maxParams, boolean supportsBlobSelect,
- boolean detectDuplicates, int idCacheSize)
+ boolean detectDuplicates, boolean useNDBFailoverStrategy, int idCacheSize)
{
- super(ds, tm, sqlProperties, createTablesOnStartup);
+ super(ds, tm, sqlProperties, createTablesOnStartup, useNDBFailoverStrategy);
// usingBatchUpdates is currently ignored due to sketchy support from
// databases
@@ -144,6 +123,7 @@
this.detectDuplicates = detectDuplicates;
this.idCacheSize = idCacheSize;
+
}
public JDBCPersistenceManager(DataSource ds, TransactionManager tm,
@@ -152,7 +132,7 @@
boolean usingTrailingByte, int maxParams, boolean supportsBlobSelect)
{
this (ds, tm, sqlProperties, createTablesOnStartup, usingBatchUpdates, usingBinaryStream, usingTrailingByte,
- maxParams, supportsBlobSelect, false, 0);
+ maxParams, supportsBlobSelect, false, false, 0);
}
// MessagingComponent overrides ---------------------------------
@@ -1327,12 +1307,14 @@
class AddReferenceRunner extends JDBCTxRunner2
{
+ private Message message;
+ private boolean messagePersisted = false;
public Object doTransaction() throws Exception
{
PreparedStatement psReference = null;
PreparedStatement psInsertMessage = null;
- Message m = ref.getMessage();
+ message = ref.getMessage();
try
{
@@ -1349,13 +1331,13 @@
log.trace("Inserted " + rows + " rows");
}
- if (!m.isPersisted())
+ if (!message.isPersisted())
{
// First time so persist the message
psInsertMessage = conn
.prepareStatement(getSQLStatement("INSERT_MESSAGE"));
- storeMessage(m, psInsertMessage, true);
+ storeMessage(message, psInsertMessage, true);
rows = psInsertMessage.executeUpdate();
if (trace)
@@ -1365,15 +1347,17 @@
log.trace("message Inserted/updated " + rows + " rows");
+
+
+ if (message instanceof JBossMessage)
+ {
+ cacheID(conn, ((JBossMessage) message).getJMSMessageID());
+ }
// Needs to be at the end - in case an exception is thrown in
// which case retry will be attempted and we want to insert it
// again
- m.setPersisted(true);
-
- if (m instanceof JBossMessage)
- {
- cacheID(conn, ((JBossMessage)m).getJMSMessageID());
- }
+ message.setPersisted(true);
+ messagePersisted = true;
}
return null;
@@ -1384,6 +1368,15 @@
closeStatement(psInsertMessage);
}
}
+
+ public void rollback()
+ {
+ if(messagePersisted)
+ {
+ messagePersisted = false;
+ message.setPersisted(false);
+ }
+ }
}
if (tx != null)
@@ -1711,6 +1704,8 @@
{
class HandleBeforeCommit1PCRunner extends JDBCTxRunner2
{
+ private List<Message> messagesStored;
+
public Object doTransaction() throws Exception
{
// For one phase we simply add rows corresponding to the refs and
@@ -1724,7 +1719,7 @@
PreparedStatement psInsertMessage = null;
PreparedStatement psDeleteReference = null;
- List<Message> messagesStored = new ArrayList<Message>();
+ messagesStored = new ArrayList<Message>();
try
{
@@ -1822,22 +1817,25 @@
return null;
}
- catch (Exception e)
+ finally
{
+ closeStatement(psReference);
+ closeStatement(psDeleteReference);
+ closeStatement(psInsertMessage);
+ }
+ }
+
+ public void rollback()
+ {
+ if(messagesStored != null)
+ {
for (Iterator i = messagesStored.iterator(); i.hasNext();)
{
Message msg = (Message) i.next();
msg.setPersisted(false);
}
- throw e;
}
- finally
- {
- closeStatement(psReference);
- closeStatement(psDeleteReference);
- closeStatement(psInsertMessage);
- }
}
}
@@ -1914,6 +1912,8 @@
{
class HandleBeforePrepareRunner extends JDBCTxRunner2
{
+ private List<Message> messagesStored;
+
public Object doTransaction() throws Exception
{
// We insert a tx record and
@@ -1924,7 +1924,7 @@
PreparedStatement psInsertMessage = null;
PreparedStatement psUpdateReference = null;
- List<Message> messagesStored = new ArrayList<Message>();
+ messagesStored = new ArrayList<Message>();
try
{
@@ -2020,22 +2020,25 @@
return null;
}
- catch (Exception e)
+ finally
{
+ closeStatement(psReference);
+ closeStatement(psInsertMessage);
+ closeStatement(psUpdateReference);
+ }
+ }
+
+ public void rollback()
+ {
+ if(messagesStored != null)
+ {
for (Iterator i = messagesStored.iterator(); i.hasNext();)
{
Message msg = (Message) i.next();
msg.setPersisted(false);
}
- throw e;
}
- finally
- {
- closeStatement(psReference);
- closeStatement(psInsertMessage);
- closeStatement(psUpdateReference);
- }
}
}
Modified: branches/Branch_Stable/src/main/org/jboss/messaging/core/impl/JDBCSupport.java
===================================================================
--- branches/Branch_Stable/src/main/org/jboss/messaging/core/impl/JDBCSupport.java 2008-03-19 09:25:22 UTC (rev 3892)
+++ branches/Branch_Stable/src/main/org/jboss/messaging/core/impl/JDBCSupport.java 2008-03-19 13:27:45 UTC (rev 3893)
@@ -21,25 +21,19 @@
*/
package org.jboss.messaging.core.impl;
-import java.sql.Connection;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.sql.Statement;
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.LinkedHashMap;
-import java.util.Map;
-import java.util.Properties;
+import org.jboss.logging.Logger;
+import org.jboss.messaging.core.contract.MessagingComponent;
import javax.sql.DataSource;
import javax.transaction.Status;
import javax.transaction.Transaction;
import javax.transaction.TransactionManager;
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.*;
-import org.jboss.logging.Logger;
-import org.jboss.messaging.core.contract.MessagingComponent;
-import org.jboss.util.NestedSQLException;
-
/**
* Common functionality for messaging components that need to access a database.
*
@@ -64,6 +58,8 @@
private Map defaultDMLStatements;
private Map defaultDDLStatements;
+
+ private boolean useNDBFailoverStrategy = false;
private boolean createTablesOnStartup = true;
@@ -92,7 +88,14 @@
this.createTablesOnStartup = createTablesOnStartup;
}
-
+
+ public JDBCSupport(DataSource ds, TransactionManager tm, Properties sqlProperties,
+ boolean createTablesOnStartup, boolean useNDBFailoverStrategy)
+ {
+ this(ds, tm, sqlProperties, createTablesOnStartup);
+ this.useNDBFailoverStrategy = useNDBFailoverStrategy;
+ }
+
// MessagingComponent overrides ---------------------------------
public void start() throws Exception
@@ -447,6 +450,8 @@
private boolean getConnectionFailed;
+ private boolean getCommitFailed;
+
public T execute() throws Exception
{
Transaction tx = tm.suspend();
@@ -514,12 +519,22 @@
}
catch (SQLException e)
{
+ rollback();
+ if(getCommitFailed && useNDBFailoverStrategy)
+ {
+ log.warn("Ignoring SQL exception on retry after commit failed, SQLState " + e.getSQLState() + " code:" + e.getErrorCode() + "- assuming deadlock detected, try:" + (tries + 1), e);
+ return null;
+ }
if (getConnectionFailed)
{
//Do not retry - just throw the exception up
throw e;
}
-
+ //this is the X/Open SQLSTATE code that signifies a commit has failed with the state of the tx unknown.
+ if("08007".equals(e.getSQLState()) && useNDBFailoverStrategy)
+ {
+ getCommitFailed = true;
+ }
log.warn("SQLException caught, SQLState " + e.getSQLState() + " code:" + e.getErrorCode() + "- assuming deadlock detected, try:" + (tries + 1), e);
tries++;
@@ -535,6 +550,14 @@
}
}
+ /**
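+ * Hook called whenever a SQLException is caught while running the transaction (not only when the commit itself fails). Override this to roll back your own in-memory work, such as resetting flags; the default implementation does nothing.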
+ */
+ public void rollback()
+ {
+ //noop
+ }
+
public abstract T doTransaction() throws Exception;
}
Modified: branches/Branch_Stable/src/main/org/jboss/messaging/core/jmx/JDBCPersistenceManagerService.java
===================================================================
--- branches/Branch_Stable/src/main/org/jboss/messaging/core/jmx/JDBCPersistenceManagerService.java 2008-03-19 09:25:22 UTC (rev 3892)
+++ branches/Branch_Stable/src/main/org/jboss/messaging/core/jmx/JDBCPersistenceManagerService.java 2008-03-19 13:27:45 UTC (rev 3893)
@@ -21,13 +21,13 @@
*/
package org.jboss.messaging.core.jmx;
-import javax.transaction.TransactionManager;
-
import org.jboss.messaging.core.contract.MessagingComponent;
import org.jboss.messaging.core.contract.PersistenceManager;
import org.jboss.messaging.core.impl.JDBCPersistenceManager;
import org.jboss.messaging.util.ExceptionUtil;
+import javax.transaction.TransactionManager;
+
/**
* A JDBCPersistenceManagerService
*
@@ -59,6 +59,8 @@
private int idCacheSize = 500;
+ private boolean useNDBFailoverStrategy = false;
+
// Constructors --------------------------------------------------------
public JDBCPersistenceManagerService()
@@ -91,7 +93,7 @@
new JDBCPersistenceManager(ds, tm, sqlProperties,
createTablesOnStartup, usingBatchUpdates,
usingBinaryStream, usingTrailingByte, maxParams,
- supportsBlobOnSelect, detectDuplicates, idCacheSize);
+ supportsBlobOnSelect, detectDuplicates, useNDBFailoverStrategy, idCacheSize);
persistenceManager.start();
@@ -148,6 +150,16 @@
this.maxParams = maxParams;
}
+ public boolean isUseNDBFailoverStrategy()
+ {
+ return useNDBFailoverStrategy;
+ }
+
+ public void setUseNDBFailoverStrategy(boolean useNDBFailoverStrategy)
+ {
+ this.useNDBFailoverStrategy = useNDBFailoverStrategy;
+ }
+
public boolean isUsingBinaryStream()
{
return usingBinaryStream;
More information about the jboss-cvs-commits
mailing list