[jboss-cvs] JBoss Messaging SVN: r8231 - in branches/port1842: src/main/org/jboss/messaging/core/impl and 1 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Wed Mar 2 07:04:54 EST 2011
Author: gaohoward
Date: 2011-03-02 07:04:54 -0500 (Wed, 02 Mar 2011)
New Revision: 8231
Modified:
branches/port1842/integration/EAP4/etc/server/default/deploy/db2-persistence-service.xml
branches/port1842/integration/EAP4/etc/server/default/deploy/mssql-persistence-service.xml
branches/port1842/integration/EAP4/etc/server/default/deploy/mysql-persistence-service.xml
branches/port1842/integration/EAP4/etc/server/default/deploy/ndb-persistence-service.xml
branches/port1842/integration/EAP4/etc/server/default/deploy/oracle-persistence-service.xml
branches/port1842/integration/EAP4/etc/server/default/deploy/postgresql-persistence-service.xml
branches/port1842/integration/EAP4/etc/server/default/deploy/sybase-persistence-service.xml
branches/port1842/src/main/org/jboss/messaging/core/impl/JDBCPersistenceManager.java
branches/port1842/src/main/org/jboss/messaging/core/impl/JDBCSupport.java
branches/port1842/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java
Log:
Fix the issue where a database commit succeeds but the response fails to reach JBM, so the retried operation must tolerate already-persisted data.
Modified: branches/port1842/integration/EAP4/etc/server/default/deploy/db2-persistence-service.xml
===================================================================
--- branches/port1842/integration/EAP4/etc/server/default/deploy/db2-persistence-service.xml 2011-02-17 15:39:08 UTC (rev 8230)
+++ branches/port1842/integration/EAP4/etc/server/default/deploy/db2-persistence-service.xml 2011-03-02 12:04:54 UTC (rev 8231)
@@ -96,6 +96,7 @@
UPDATE_MESSAGE_STATE=UPDATE JBM_MSG_REF SET STATE = ? WHERE MESSAGE_ID = ? AND CHANNEL_ID = ?
CLAIM_MESSAGE_IN_SUCK=UPDATE JBM_MSG_REF SET STATE='C' WHERE CHANNEL_ID = ? AND MESSAGE_ID = ? AND STATE='S'
LOAD_REFS_IN_SUCK=SELECT MESSAGE_ID, DELIVERY_COUNT, SCHED_DELIVERY FROM JBM_MSG_REF WHERE STATE = 'S' AND CHANNEL_ID = ? ORDER BY ORD
+ LOAD_SINGLE_REFERENCE=SELECT STATE, CHANNEL_ID FROM JBM_MSG_REF WHERE MESSAGE_ID=?
]]></attribute>
<!-- The maximum number of parameters to include in a prepared statement -->
Modified: branches/port1842/integration/EAP4/etc/server/default/deploy/mssql-persistence-service.xml
===================================================================
--- branches/port1842/integration/EAP4/etc/server/default/deploy/mssql-persistence-service.xml 2011-02-17 15:39:08 UTC (rev 8230)
+++ branches/port1842/integration/EAP4/etc/server/default/deploy/mssql-persistence-service.xml 2011-03-02 12:04:54 UTC (rev 8231)
@@ -92,6 +92,7 @@
UPDATE_MESSAGE_STATE=UPDATE JBM_MSG_REF SET STATE = ? WHERE MESSAGE_ID = ? AND CHANNEL_ID = ?
CLAIM_MESSAGE_IN_SUCK=UPDATE JBM_MSG_REF SET STATE='C' WHERE CHANNEL_ID = ? AND MESSAGE_ID = ? AND STATE='S'
LOAD_REFS_IN_SUCK=SELECT MESSAGE_ID, DELIVERY_COUNT, SCHED_DELIVERY FROM JBM_MSG_REF WHERE STATE = 'S' AND CHANNEL_ID = ? ORDER BY ORD
+ LOAD_SINGLE_REFERENCE=SELECT STATE, CHANNEL_ID FROM JBM_MSG_REF WHERE MESSAGE_ID=?
]]></attribute>
<!-- The maximum number of parameters to include in a prepared statement -->
Modified: branches/port1842/integration/EAP4/etc/server/default/deploy/mysql-persistence-service.xml
===================================================================
--- branches/port1842/integration/EAP4/etc/server/default/deploy/mysql-persistence-service.xml 2011-02-17 15:39:08 UTC (rev 8230)
+++ branches/port1842/integration/EAP4/etc/server/default/deploy/mysql-persistence-service.xml 2011-03-02 12:04:54 UTC (rev 8231)
@@ -92,6 +92,7 @@
UPDATE_MESSAGE_STATE=UPDATE JBM_MSG_REF SET STATE = ? WHERE MESSAGE_ID = ? AND CHANNEL_ID = ?
CLAIM_MESSAGE_IN_SUCK=UPDATE JBM_MSG_REF SET STATE='C' WHERE CHANNEL_ID = ? AND MESSAGE_ID = ? AND STATE='S'
LOAD_REFS_IN_SUCK=SELECT MESSAGE_ID, DELIVERY_COUNT, SCHED_DELIVERY FROM JBM_MSG_REF WHERE STATE = 'S' AND CHANNEL_ID = ? ORDER BY ORD
+ LOAD_SINGLE_REFERENCE=SELECT STATE, CHANNEL_ID FROM JBM_MSG_REF WHERE MESSAGE_ID=?
]]></attribute>
<!-- The maximum number of parameters to include in a prepared statement -->
Modified: branches/port1842/integration/EAP4/etc/server/default/deploy/ndb-persistence-service.xml
===================================================================
--- branches/port1842/integration/EAP4/etc/server/default/deploy/ndb-persistence-service.xml 2011-02-17 15:39:08 UTC (rev 8230)
+++ branches/port1842/integration/EAP4/etc/server/default/deploy/ndb-persistence-service.xml 2011-03-02 12:04:54 UTC (rev 8231)
@@ -92,6 +92,7 @@
UPDATE_MESSAGE_STATE=UPDATE JBM_MSG_REF SET STATE = ? WHERE MESSAGE_ID = ? AND CHANNEL_ID = ?
CLAIM_MESSAGE_IN_SUCK=UPDATE JBM_MSG_REF SET STATE='C' WHERE CHANNEL_ID = ? AND MESSAGE_ID = ? AND STATE='S'
LOAD_REFS_IN_SUCK=SELECT MESSAGE_ID, DELIVERY_COUNT, SCHED_DELIVERY FROM JBM_MSG_REF WHERE STATE = 'S' AND CHANNEL_ID = ? ORDER BY ORD
+ LOAD_SINGLE_REFERENCE=SELECT STATE, CHANNEL_ID FROM JBM_MSG_REF WHERE MESSAGE_ID=?
]]></attribute>
<!-- The maximum number of parameters to include in a prepared statement -->
Modified: branches/port1842/integration/EAP4/etc/server/default/deploy/oracle-persistence-service.xml
===================================================================
--- branches/port1842/integration/EAP4/etc/server/default/deploy/oracle-persistence-service.xml 2011-02-17 15:39:08 UTC (rev 8230)
+++ branches/port1842/integration/EAP4/etc/server/default/deploy/oracle-persistence-service.xml 2011-03-02 12:04:54 UTC (rev 8231)
@@ -96,6 +96,7 @@
UPDATE_MESSAGE_STATE=UPDATE JBM_MSG_REF SET STATE = ? WHERE MESSAGE_ID = ? AND CHANNEL_ID = ?
CLAIM_MESSAGE_IN_SUCK=UPDATE JBM_MSG_REF SET STATE='C' WHERE CHANNEL_ID = ? AND MESSAGE_ID = ? AND STATE='S'
LOAD_REFS_IN_SUCK=SELECT MESSAGE_ID, DELIVERY_COUNT, SCHED_DELIVERY FROM JBM_MSG_REF WHERE STATE = 'S' AND CHANNEL_ID = ? ORDER BY ORD
+ LOAD_SINGLE_REFERENCE=SELECT STATE, CHANNEL_ID FROM JBM_MSG_REF WHERE MESSAGE_ID=?
]]></attribute>
<!-- The maximum number of parameters to include in a prepared statement -->
Modified: branches/port1842/integration/EAP4/etc/server/default/deploy/postgresql-persistence-service.xml
===================================================================
--- branches/port1842/integration/EAP4/etc/server/default/deploy/postgresql-persistence-service.xml 2011-02-17 15:39:08 UTC (rev 8230)
+++ branches/port1842/integration/EAP4/etc/server/default/deploy/postgresql-persistence-service.xml 2011-03-02 12:04:54 UTC (rev 8231)
@@ -92,6 +92,7 @@
UPDATE_MESSAGE_STATE=UPDATE JBM_MSG_REF SET STATE = ? WHERE MESSAGE_ID = ? AND CHANNEL_ID = ?
CLAIM_MESSAGE_IN_SUCK=UPDATE JBM_MSG_REF SET STATE='C' WHERE CHANNEL_ID = ? AND MESSAGE_ID = ? AND STATE='S'
LOAD_REFS_IN_SUCK=SELECT MESSAGE_ID, DELIVERY_COUNT, SCHED_DELIVERY FROM JBM_MSG_REF WHERE STATE = 'S' AND CHANNEL_ID = ? ORDER BY ORD
+ LOAD_SINGLE_REFERENCE=SELECT STATE, CHANNEL_ID FROM JBM_MSG_REF WHERE MESSAGE_ID=?
]]></attribute>
<!-- The maximum number of parameters to include in a prepared statement -->
Modified: branches/port1842/integration/EAP4/etc/server/default/deploy/sybase-persistence-service.xml
===================================================================
--- branches/port1842/integration/EAP4/etc/server/default/deploy/sybase-persistence-service.xml 2011-02-17 15:39:08 UTC (rev 8230)
+++ branches/port1842/integration/EAP4/etc/server/default/deploy/sybase-persistence-service.xml 2011-03-02 12:04:54 UTC (rev 8231)
@@ -97,6 +97,7 @@
UPDATE_MESSAGE_STATE=UPDATE JBM_MSG_REF SET STATE = ? WHERE MESSAGE_ID = ? AND CHANNEL_ID = ?
CLAIM_MESSAGE_IN_SUCK=UPDATE JBM_MSG_REF SET STATE='C' WHERE CHANNEL_ID = ? AND MESSAGE_ID = ? AND STATE='S'
LOAD_REFS_IN_SUCK=SELECT MESSAGE_ID, DELIVERY_COUNT, SCHED_DELIVERY FROM JBM_MSG_REF WHERE STATE = 'S' AND CHANNEL_ID = ? ORDER BY ORD
+ LOAD_SINGLE_REFERENCE=SELECT STATE, CHANNEL_ID FROM JBM_MSG_REF WHERE MESSAGE_ID=?
]]></attribute>
<!-- The maximum number of parameters to include in a prepared statement -->
Modified: branches/port1842/src/main/org/jboss/messaging/core/impl/JDBCPersistenceManager.java
===================================================================
--- branches/port1842/src/main/org/jboss/messaging/core/impl/JDBCPersistenceManager.java 2011-02-17 15:39:08 UTC (rev 8230)
+++ branches/port1842/src/main/org/jboss/messaging/core/impl/JDBCPersistenceManager.java 2011-03-02 12:04:54 UTC (rev 8231)
@@ -768,6 +768,22 @@
return null;
}
+ catch (SQLException e)
+ {
+ if (transactionDone)
+ {
+ String sqlState = e.getSQLState();
+ // 23000(XA) or 23xxx (SQL:2003)
+ if (sqlState.startsWith("23"))
+ {
+ // this is a constraint violation. i.e. the record identified by the same key already exists
+ // This is fine, log a warning and go ahead.
+ log.warn("We encountered a after-commit problem however messages have already been inserted.");
+ return null;
+ }
+ }
+ throw e;
+ }
finally
{
closeStatement(psInsertReference);
@@ -873,8 +889,13 @@
}
// Sanity check
- if (rows != num) { throw new IllegalStateException(
- "Did not update correct number of rows"); }
+ if (rows != num)
+ {
+ if (!transactionDone)
+ {
+ throw new IllegalStateException("Did not update correct number of rows");
+ }
+ }
return null;
}
@@ -1631,6 +1652,22 @@
return null;
}
+ catch (SQLException e)
+ {
+ if (transactionDone)
+ {
+ String sqlState = e.getSQLState();
+ //23000(XA) or 23xxx (SQL:2003)
+ if (sqlState.startsWith("23"))
+ {
+ //this is a constraint violation. i.e. the record identified by the same key already exists
+ //This is fine, log a warning and go ahead.
+ log.warn("We encountered a problem but the message " + message + " has already been inserted.");
+ return null;
+ }
+ }
+ throw e;
+ }
finally
{
closeStatement(psReference);
@@ -1672,6 +1709,7 @@
public Object doTransaction() throws Exception
{
PreparedStatement psReference = null;
+ PreparedStatement ps = null;
try
{
@@ -1690,8 +1728,32 @@
if (rows == 0)
{
- // no message updated, should be canceled back already
- throw new JMSException("Failed to move message " + ref.getMessage().getMessageID());
+ //check if it's already sucked before a possible failure
+ ps = conn.prepareStatement(getSQLStatement("LOAD_SINGLE_REFERENCE"));
+ ps.setLong(1, ref.getMessage().getMessageID());
+
+ ResultSet rs = ps.executeQuery();
+
+ if (rs.next())
+ {
+ String state = rs.getString(1);
+ long channelID = rs.getLong(2);
+
+ if ((!"C".equals(state)) || channelID != destChannelID)
+ {
+ //the message didn't get moved!
+ throw new JMSException("Failed to move message " + ref.getMessage().getMessageID());
+ }
+ else
+ {
+ log.info("Already moved message " + ref.getMessage().getMessageID());
+ }
+ }
+ else
+ {
+ // no message updated, should be canceled back already
+ throw new JMSException("Failed to move message " + ref.getMessage().getMessageID() + " probably canceled back to source channel.");
+ }
}
return null;
@@ -1699,6 +1761,7 @@
finally
{
closeStatement(psReference);
+ closeStatement(ps);
}
}
}
@@ -2097,6 +2160,22 @@
return null;
}
+ catch (SQLException e)
+ {
+ if (transactionDone)
+ {
+ String sqlState = e.getSQLState();
+ //23000(XA) or 23xxx (SQL:2003)
+ if (sqlState.startsWith("23"))
+ {
+ //this is a constraint violation. i.e. the record identified by the same key already exists
+ //This is fine, log a warning and go ahead.
+ log.warn("We encountered a after-commit problem however messages have already been inserted.");
+ return null;
+ }
+ }
+ throw e;
+ }
finally
{
closeStatement(psReference);
@@ -2297,6 +2376,22 @@
return null;
}
+ catch (SQLException e)
+ {
+ if (transactionDone)
+ {
+ String sqlState = e.getSQLState();
+ //23000(XA) or 23xxx (SQL:2003)
+ if (sqlState.startsWith("23"))
+ {
+ //this is a constraint violation. i.e. the record identified by the same key already exists
+ //This is fine, log a warning and go ahead.
+ log.warn("We encountered a after-commit problem however messages have already been inserted.");
+ return null;
+ }
+ }
+ throw e;
+ }
finally
{
closeStatement(psReference);
@@ -3009,7 +3104,7 @@
map.put("CLAIM_MESSAGE_IN_SUCK", "UPDATE JBM_MSG_REF SET STATE='C' WHERE CHANNEL_ID = ? AND MESSAGE_ID = ? AND STATE='S'");
map.put("LOAD_REFS_IN_SUCK",
"SELECT MESSAGE_ID, DELIVERY_COUNT, SCHED_DELIVERY FROM JBM_MSG_REF WHERE STATE = 'S' AND CHANNEL_ID = ? ORDER BY ORD");
-
+ map.put("LOAD_SINGLE_REFERENCE", "SELECT STATE, CHANNEL_ID FROM JBM_MSG_REF WHERE MESSAGE_ID=?");
return map;
}
@@ -3378,6 +3473,7 @@
public Object doTransaction() throws Exception
{
PreparedStatement psReference = null;
+ PreparedStatement ps = null;
try
{
@@ -3398,9 +3494,28 @@
if (rows != 1)
{
- throw new JMSException("Failed to update message " + ref.getMessage().getMessageID() +
- " to state " +
- c);
+ ps = conn.prepareStatement(getSQLStatement("LOAD_SINGLE_REFERENCE"));
+ ps.setLong(1, ref.getMessage().getMessageID());
+
+ ResultSet rs = ps.executeQuery();
+
+ if (rs.next())
+ {
+ String state = rs.getString(1);
+
+ if (!c.equals(state))
+ {
+ throw new JMSException("Failed to update message " + ref.getMessage().getMessageID() + " to state " + c);
+ }
+ else
+ {
+ log.info("Already updated message " + ref.getMessage().getMessageID());
+ }
+ }
+ else
+ {
+ throw new JMSException("Failed to update message " + ref.getMessage().getMessageID() + " to state " + c);
+ }
}
return null;
@@ -3408,6 +3523,7 @@
finally
{
closeStatement(psReference);
+ closeStatement(ps);
}
}
}
Modified: branches/port1842/src/main/org/jboss/messaging/core/impl/JDBCSupport.java
===================================================================
--- branches/port1842/src/main/org/jboss/messaging/core/impl/JDBCSupport.java 2011-02-17 15:39:08 UTC (rev 8230)
+++ branches/port1842/src/main/org/jboss/messaging/core/impl/JDBCSupport.java 2011-03-02 12:04:54 UTC (rev 8231)
@@ -530,6 +530,8 @@
protected abstract class JDBCTxRunner<T>
{
protected Connection conn;
+
+ protected boolean shouldFailCommit = false;
private TransactionWrapper wrap;
@@ -618,6 +620,8 @@
{
protected Connection conn;
+ protected boolean transactionDone = false;
+
private boolean getConnectionFailed;
private boolean getCommitFailed;
@@ -641,6 +645,8 @@
T res = doTransaction();
+ transactionDone = true;
+
conn.commit();
return res;
Modified: branches/port1842/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java
===================================================================
--- branches/port1842/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java 2011-02-17 15:39:08 UTC (rev 8230)
+++ branches/port1842/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java 2011-03-02 12:04:54 UTC (rev 8231)
@@ -24,6 +24,7 @@
import java.io.Serializable;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
+import java.sql.SQLException;
import java.sql.Timestamp;
import java.sql.Types;
import java.util.ArrayList;
@@ -3467,7 +3468,7 @@
{
return;
}
- class InsertBindings extends JDBCTxRunner
+ class InsertBindings extends JDBCTxRunner2
{
public Object doTransaction() throws Exception
{
@@ -3510,6 +3511,22 @@
ps.executeUpdate();
}
+ catch (SQLException e)
+ {
+ if (transactionDone)
+ {
+ String sqlState = e.getSQLState();
+ //23000(XA) or 23xxx (SQL:2003)
+ if (sqlState.startsWith("23"))
+ {
+ //this is a constraint violation. i.e. the record identified by the same key already exists
+ //This is fine, log a warning and go ahead.
+ log.warn("We encountered a problem but the binding has already been inserted.");
+ return null;
+ }
+ }
+ throw e;
+ }
finally
{
closeStatement(ps);
More information about the jboss-cvs-commits mailing list