[jboss-cvs] JBoss Messaging SVN: r3106 - in trunk/src/main/org/jboss/messaging/core/impl: postoffice and 1 other directory.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Fri Sep 14 21:43:50 EDT 2007
Author: clebert.suconic at jboss.com
Date: 2007-09-14 21:43:50 -0400 (Fri, 14 Sep 2007)
New Revision: 3106
Modified:
trunk/src/main/org/jboss/messaging/core/impl/JDBCPersistenceManager.java
trunk/src/main/org/jboss/messaging/core/impl/JDBCSupport.java
trunk/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java
Log:
Encapsulating MessagingPostOffice's statements on Retry blocks as well
(these were failing on MySQL Cluster under certain circustances)
Modified: trunk/src/main/org/jboss/messaging/core/impl/JDBCPersistenceManager.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/impl/JDBCPersistenceManager.java 2007-09-14 23:22:25 UTC (rev 3105)
+++ trunk/src/main/org/jboss/messaging/core/impl/JDBCPersistenceManager.java 2007-09-15 01:43:50 UTC (rev 3106)
@@ -386,9 +386,9 @@
throw new IllegalArgumentException("block size must be > 0");
}
- class ReserveIDBlockRunner extends JDBCTxRunner
+ class ReserveIDBlockRunner extends JDBCTxRunner<Long>
{
- public Object doTransaction() throws Exception
+ public Long doTransaction() throws Exception
{
// For the clustered case - this MUST use SELECT .. FOR UPDATE or a similar
//construct the locks the row
@@ -456,7 +456,7 @@
}
}
- return (Long)new ReserveIDBlockRunner().executeWithRetry();
+ return new ReserveIDBlockRunner().executeWithRetry();
}
/*
@@ -2502,69 +2502,4 @@
}
}
- private abstract class JDBCTxRunner
- {
- private static final int MAX_TRIES = 25;
-
- Connection conn;
-
- TransactionWrapper wrap;
-
- public Object execute() throws Exception
- {
- wrap = new TransactionWrapper();
-
- try
- {
- conn = ds.getConnection();
-
- return doTransaction();
- }
- catch (Exception e)
- {
- wrap.exceptionOccurred();
- throw e;
- }
- finally
- {
- closeConnection(conn);
- wrap.end();
- }
- }
-
- public Object executeWithRetry() throws Exception
- {
- int tries = 0;
-
- while (true)
- {
- try
- {
- Object res = execute();
-
- if (tries > 0)
- {
- log.warn("Update worked after retry");
- }
- return res;
- }
- catch (SQLException e)
- {
- log.warn("SQLException caught, SQLState " + e.getSQLState() + " code:" + e.getErrorCode() + "- assuming deadlock detected, try:" + (tries + 1), e);
-
- tries++;
- if (tries == MAX_TRIES)
- {
- log.error("Retried " + tries + " times, now giving up");
- throw new IllegalStateException("Failed to excecute transaction");
- }
- log.warn("Trying again after a pause");
- //Now we wait for a random amount of time to minimise risk of deadlock
- Thread.sleep((long)(Math.random() * 500));
- }
- }
- }
-
- public abstract Object doTransaction() throws Exception;
- }
}
Modified: trunk/src/main/org/jboss/messaging/core/impl/JDBCSupport.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/impl/JDBCSupport.java 2007-09-14 23:22:25 UTC (rev 3105)
+++ trunk/src/main/org/jboss/messaging/core/impl/JDBCSupport.java 2007-09-15 01:43:50 UTC (rev 3106)
@@ -24,6 +24,7 @@
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.Statement;
+import java.sql.SQLException;
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedHashMap;
@@ -364,4 +365,71 @@
return failed;
}
}
+
+ protected abstract class JDBCTxRunner<T>
+ {
+ private static final int MAX_TRIES = 25;
+
+ protected Connection conn;
+
+ private TransactionWrapper wrap;
+
+ public T execute() throws Exception
+ {
+ wrap = new TransactionWrapper();
+
+ try
+ {
+ conn = ds.getConnection();
+
+ return doTransaction();
+ }
+ catch (Exception e)
+ {
+ wrap.exceptionOccurred();
+ throw e;
+ }
+ finally
+ {
+ closeConnection(conn);
+ wrap.end();
+ }
+ }
+
+ public T executeWithRetry() throws Exception
+ {
+ int tries = 0;
+
+ while (true)
+ {
+ try
+ {
+ T res = execute();
+
+ if (tries > 0)
+ {
+ log.warn("Update worked after retry");
+ }
+ return res;
+ }
+ catch (SQLException e)
+ {
+ log.warn("SQLException caught, SQLState " + e.getSQLState() + " code:" + e.getErrorCode() + "- assuming deadlock detected, try:" + (tries + 1), e);
+
+ tries++;
+ if (tries == MAX_TRIES)
+ {
+ log.error("Retried " + tries + " times, now giving up");
+ throw new IllegalStateException("Failed to excecute transaction");
+ }
+ log.warn("Trying again after a pause");
+ //Now we wait for a random amount of time to minimise risk of deadlock
+ Thread.sleep((long)(Math.random() * 500));
+ }
+ }
+ }
+
+ public abstract T doTransaction() throws Exception;
+ }
+
}
Modified: trunk/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java 2007-09-14 23:22:25 UTC (rev 3105)
+++ trunk/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java 2007-09-15 01:43:50 UTC (rev 3106)
@@ -2373,80 +2373,75 @@
private Map getBindingsFromStorage() throws Exception
{
- Connection conn = null;
- PreparedStatement ps = null;
- ResultSet rs = null;
- TransactionWrapper wrap = new TransactionWrapper();
-
- Map bindings = new HashMap();
-
- try
+ class LoadBindings extends JDBCTxRunner<Map>
{
- conn = ds.getConnection();
+ public Map doTransaction() throws Exception
+ {
+ PreparedStatement ps = null;
+ ResultSet rs = null;
- ps = conn.prepareStatement(getSQLStatement("LOAD_BINDINGS"));
+ Map bindings = new HashMap();
- ps.setString(1, officeName);
-
- ps.setInt(2, thisNodeID);
+ try
+ {
+ ps = conn.prepareStatement(getSQLStatement("LOAD_BINDINGS"));
- rs = ps.executeQuery();
+ ps.setString(1, officeName);
- while (rs.next())
- {
- String queueName = rs.getString(1);
- String conditionText = rs.getString(2);
- String selector = rs.getString(3);
+ ps.setInt(2, thisNodeID);
- if (rs.wasNull())
- {
- selector = null;
- }
+ rs = ps.executeQuery();
- long channelID = rs.getLong(4);
-
- boolean bindingClustered = rs.getString(5).equals("Y");
-
- boolean allNodes = rs.getString(6).equals("Y");
-
- //If the node is not clustered then we load the bindings as non clustered
-
- Filter filter = null;
-
- if (selector != null)
+ while (rs.next())
+ {
+ String queueName = rs.getString(1);
+ String conditionText = rs.getString(2);
+ String selector = rs.getString(3);
+
+ if (rs.wasNull())
+ {
+ selector = null;
+ }
+
+ long channelID = rs.getLong(4);
+
+ boolean bindingClustered = rs.getString(5).equals("Y");
+
+ boolean allNodes = rs.getString(6).equals("Y");
+
+ //If the node is not clustered then we load the bindings as non clustered
+
+ Filter filter = null;
+
+ if (selector != null)
+ {
+ filter = filterFactory.createFilter(selector);
+ }
+
+ Queue queue = new MessagingQueue(thisNodeID, queueName, channelID, ms, pm,
+ true, filter, bindingClustered && clustered);
+
+ if (trace) { log.trace(this + " loaded binding from storage: " + queueName); }
+
+ Condition condition = conditionFactory.createCondition(conditionText);
+
+ Binding binding = new Binding(condition, queue, allNodes);
+
+ bindings.put(queueName, binding);
+ }
+
+ return bindings;
+ }
+ finally
{
- filter = filterFactory.createFilter(selector);
+ closeResultSet(rs);
+
+ closeStatement(ps);
}
-
- Queue queue = new MessagingQueue(thisNodeID, queueName, channelID, ms, pm,
- true, filter, bindingClustered && clustered);
-
- if (trace) { log.trace(this + " loaded binding from storage: " + queueName); }
-
- Condition condition = conditionFactory.createCondition(conditionText);
-
- Binding binding = new Binding(condition, queue, allNodes);
-
- bindings.put(queueName, binding);
}
-
- return bindings;
}
- catch (Exception e)
- {
- wrap.exceptionOccurred();
- throw e;
- }
- finally
- {
- closeResultSet(rs);
-
- closeStatement(ps);
-
- closeConnection(conn);
- wrap.end();
- }
+ return new LoadBindings().executeWithRetry();
}
private void loadBindings() throws Exception
@@ -2483,95 +2478,91 @@
}
- private void insertBindingInStorage(Condition condition, Queue queue, boolean allNodes) throws Exception
+ private void insertBindingInStorage(final Condition condition, final Queue queue, final boolean allNodes) throws Exception
{
- Connection conn = null;
- PreparedStatement ps = null;
- TransactionWrapper wrap = new TransactionWrapper();
-
- try
+ class InsertBindings extends JDBCTxRunner
{
- conn = ds.getConnection();
+ public Object doTransaction() throws Exception
+ {
+ PreparedStatement ps = null;
- ps = conn.prepareStatement(getSQLStatement("INSERT_BINDING"));
+ try
+ {
+ ps = conn.prepareStatement(getSQLStatement("INSERT_BINDING"));
- ps.setString(1, officeName);
- ps.setInt(2, thisNodeID);
- ps.setString(3, queue.getName());
- ps.setString(4, condition.toText());
- String filterString = queue.getFilter() != null ? queue.getFilter().getFilterString() : null;
- if (filterString != null)
- {
- ps.setString(5, filterString);
- }
- else
- {
- ps.setNull(5, Types.VARCHAR);
- }
- ps.setLong(6, queue.getChannelID());
- if (queue.isClustered())
- {
- ps.setString(7, "Y");
- }
- else
- {
- ps.setString(7, "N");
- }
- if (allNodes)
- {
- ps.setString(8, "Y");
- }
- else
- {
- ps.setString(8, "N");
- }
+ ps.setString(1, officeName);
+ ps.setInt(2, thisNodeID);
+ ps.setString(3, queue.getName());
+ ps.setString(4, condition.toText());
+ String filterString = queue.getFilter() != null ? queue.getFilter().getFilterString() : null;
+ if (filterString != null)
+ {
+ ps.setString(5, filterString);
+ }
+ else
+ {
+ ps.setNull(5, Types.VARCHAR);
+ }
+ ps.setLong(6, queue.getChannelID());
+ if (queue.isClustered())
+ {
+ ps.setString(7, "Y");
+ }
+ else
+ {
+ ps.setString(7, "N");
+ }
+ if (allNodes)
+ {
+ ps.setString(8, "Y");
+ }
+ else
+ {
+ ps.setString(8, "N");
+ }
- ps.executeUpdate();
+ ps.executeUpdate();
+ }
+ finally
+ {
+ closeStatement(ps);
+ }
+
+ return null;
+ }
}
- catch (Exception e)
- {
- wrap.exceptionOccurred();
- throw e;
- }
- finally
- {
- closeStatement(ps);
- closeConnection(conn);
- wrap.end();
- }
+
+ new InsertBindings().executeWithRetry();
}
- private boolean deleteBindingFromStorage(Queue queue) throws Exception
+ private boolean deleteBindingFromStorage(final Queue queue) throws Exception
{
- Connection conn = null;
- PreparedStatement ps = null;
- TransactionWrapper wrap = new TransactionWrapper();
-
- try
+ class DeleteBindings extends JDBCTxRunner<Boolean>
{
- conn = ds.getConnection();
+ public Boolean doTransaction() throws Exception
+ {
+ PreparedStatement ps = null;
- ps = conn.prepareStatement(getSQLStatement("DELETE_BINDING"));
+ try
+ {
+ ps = conn.prepareStatement(getSQLStatement("DELETE_BINDING"));
- ps.setString(1, officeName);
- ps.setInt(2, queue.getNodeID());
- ps.setString(3, queue.getName());
+ ps.setString(1, officeName);
+ ps.setInt(2, queue.getNodeID());
+ ps.setString(3, queue.getName());
- int rows = ps.executeUpdate();
+ int rows = ps.executeUpdate();
- return rows == 1;
+ return rows == 1;
+ }
+ finally
+ {
+ closeStatement(ps);
+ }
+ }
}
- catch (Exception e)
- {
- wrap.exceptionOccurred();
- throw e;
- }
- finally
- {
- closeStatement(ps);
- closeConnection(conn);
- wrap.end();
- }
+
+ return new DeleteBindings().executeWithRetry();
}
private boolean leaveMessageReceived(Integer nodeId) throws Exception
More information about the jboss-cvs-commits
mailing list