[jboss-cvs] JBoss Messaging SVN: r8231 - in branches/port1842: src/main/org/jboss/messaging/core/impl and 1 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Wed Mar 2 07:04:54 EST 2011


Author: gaohoward
Date: 2011-03-02 07:04:54 -0500 (Wed, 02 Mar 2011)
New Revision: 8231

Modified:
   branches/port1842/integration/EAP4/etc/server/default/deploy/db2-persistence-service.xml
   branches/port1842/integration/EAP4/etc/server/default/deploy/mssql-persistence-service.xml
   branches/port1842/integration/EAP4/etc/server/default/deploy/mysql-persistence-service.xml
   branches/port1842/integration/EAP4/etc/server/default/deploy/ndb-persistence-service.xml
   branches/port1842/integration/EAP4/etc/server/default/deploy/oracle-persistence-service.xml
   branches/port1842/integration/EAP4/etc/server/default/deploy/postgresql-persistence-service.xml
   branches/port1842/integration/EAP4/etc/server/default/deploy/sybase-persistence-service.xml
   branches/port1842/src/main/org/jboss/messaging/core/impl/JDBCPersistenceManager.java
   branches/port1842/src/main/org/jboss/messaging/core/impl/JDBCSupport.java
   branches/port1842/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java
Log:
fix commit successful but response failed to reach jbm issue



Modified: branches/port1842/integration/EAP4/etc/server/default/deploy/db2-persistence-service.xml
===================================================================
--- branches/port1842/integration/EAP4/etc/server/default/deploy/db2-persistence-service.xml	2011-02-17 15:39:08 UTC (rev 8230)
+++ branches/port1842/integration/EAP4/etc/server/default/deploy/db2-persistence-service.xml	2011-03-02 12:04:54 UTC (rev 8231)
@@ -96,6 +96,7 @@
    UPDATE_MESSAGE_STATE=UPDATE JBM_MSG_REF SET STATE = ? WHERE MESSAGE_ID = ? AND CHANNEL_ID = ?
    CLAIM_MESSAGE_IN_SUCK=UPDATE JBM_MSG_REF SET STATE='C' WHERE CHANNEL_ID = ? AND MESSAGE_ID = ? AND STATE='S'
    LOAD_REFS_IN_SUCK=SELECT MESSAGE_ID, DELIVERY_COUNT, SCHED_DELIVERY FROM JBM_MSG_REF WHERE STATE = 'S' AND CHANNEL_ID = ? ORDER BY ORD
+   LOAD_SINGLE_REFERENCE=SELECT STATE, CHANNEL_ID FROM JBM_MSG_REF WHERE MESSAGE_ID=?
       ]]></attribute>
 
       <!-- The maximum number of parameters to include in a prepared statement -->

Modified: branches/port1842/integration/EAP4/etc/server/default/deploy/mssql-persistence-service.xml
===================================================================
--- branches/port1842/integration/EAP4/etc/server/default/deploy/mssql-persistence-service.xml	2011-02-17 15:39:08 UTC (rev 8230)
+++ branches/port1842/integration/EAP4/etc/server/default/deploy/mssql-persistence-service.xml	2011-03-02 12:04:54 UTC (rev 8231)
@@ -92,6 +92,7 @@
    UPDATE_MESSAGE_STATE=UPDATE JBM_MSG_REF SET STATE = ? WHERE MESSAGE_ID = ? AND CHANNEL_ID = ?
    CLAIM_MESSAGE_IN_SUCK=UPDATE JBM_MSG_REF SET STATE='C' WHERE CHANNEL_ID = ? AND MESSAGE_ID = ? AND STATE='S'
    LOAD_REFS_IN_SUCK=SELECT MESSAGE_ID, DELIVERY_COUNT, SCHED_DELIVERY FROM JBM_MSG_REF WHERE STATE = 'S' AND CHANNEL_ID = ? ORDER BY ORD
+   LOAD_SINGLE_REFERENCE=SELECT STATE, CHANNEL_ID FROM JBM_MSG_REF WHERE MESSAGE_ID=?
       ]]></attribute>
 
       <!-- The maximum number of parameters to include in a prepared statement -->

Modified: branches/port1842/integration/EAP4/etc/server/default/deploy/mysql-persistence-service.xml
===================================================================
--- branches/port1842/integration/EAP4/etc/server/default/deploy/mysql-persistence-service.xml	2011-02-17 15:39:08 UTC (rev 8230)
+++ branches/port1842/integration/EAP4/etc/server/default/deploy/mysql-persistence-service.xml	2011-03-02 12:04:54 UTC (rev 8231)
@@ -92,6 +92,7 @@
    UPDATE_MESSAGE_STATE=UPDATE JBM_MSG_REF SET STATE = ? WHERE MESSAGE_ID = ? AND CHANNEL_ID = ?
    CLAIM_MESSAGE_IN_SUCK=UPDATE JBM_MSG_REF SET STATE='C' WHERE CHANNEL_ID = ? AND MESSAGE_ID = ? AND STATE='S'
    LOAD_REFS_IN_SUCK=SELECT MESSAGE_ID, DELIVERY_COUNT, SCHED_DELIVERY FROM JBM_MSG_REF WHERE STATE = 'S' AND CHANNEL_ID = ? ORDER BY ORD
+   LOAD_SINGLE_REFERENCE=SELECT STATE, CHANNEL_ID FROM JBM_MSG_REF WHERE MESSAGE_ID=?
    ]]></attribute>
 
       <!-- The maximum number of parameters to include in a prepared statement -->

Modified: branches/port1842/integration/EAP4/etc/server/default/deploy/ndb-persistence-service.xml
===================================================================
--- branches/port1842/integration/EAP4/etc/server/default/deploy/ndb-persistence-service.xml	2011-02-17 15:39:08 UTC (rev 8230)
+++ branches/port1842/integration/EAP4/etc/server/default/deploy/ndb-persistence-service.xml	2011-03-02 12:04:54 UTC (rev 8231)
@@ -92,6 +92,7 @@
    UPDATE_MESSAGE_STATE=UPDATE JBM_MSG_REF SET STATE = ? WHERE MESSAGE_ID = ? AND CHANNEL_ID = ?
    CLAIM_MESSAGE_IN_SUCK=UPDATE JBM_MSG_REF SET STATE='C' WHERE CHANNEL_ID = ? AND MESSAGE_ID = ? AND STATE='S'
    LOAD_REFS_IN_SUCK=SELECT MESSAGE_ID, DELIVERY_COUNT, SCHED_DELIVERY FROM JBM_MSG_REF WHERE STATE = 'S' AND CHANNEL_ID = ? ORDER BY ORD
+   LOAD_SINGLE_REFERENCE=SELECT STATE, CHANNEL_ID FROM JBM_MSG_REF WHERE MESSAGE_ID=?
       ]]></attribute>
 
       <!-- The maximum number of parameters to include in a prepared statement -->

Modified: branches/port1842/integration/EAP4/etc/server/default/deploy/oracle-persistence-service.xml
===================================================================
--- branches/port1842/integration/EAP4/etc/server/default/deploy/oracle-persistence-service.xml	2011-02-17 15:39:08 UTC (rev 8230)
+++ branches/port1842/integration/EAP4/etc/server/default/deploy/oracle-persistence-service.xml	2011-03-02 12:04:54 UTC (rev 8231)
@@ -96,6 +96,7 @@
    UPDATE_MESSAGE_STATE=UPDATE JBM_MSG_REF SET STATE = ? WHERE MESSAGE_ID = ? AND CHANNEL_ID = ?
    CLAIM_MESSAGE_IN_SUCK=UPDATE JBM_MSG_REF SET STATE='C' WHERE CHANNEL_ID = ? AND MESSAGE_ID = ? AND STATE='S'
    LOAD_REFS_IN_SUCK=SELECT MESSAGE_ID, DELIVERY_COUNT, SCHED_DELIVERY FROM JBM_MSG_REF WHERE STATE = 'S' AND CHANNEL_ID = ? ORDER BY ORD
+   LOAD_SINGLE_REFERENCE=SELECT STATE, CHANNEL_ID FROM JBM_MSG_REF WHERE MESSAGE_ID=?
       ]]></attribute>
 
       <!-- The maximum number of parameters to include in a prepared statement -->

Modified: branches/port1842/integration/EAP4/etc/server/default/deploy/postgresql-persistence-service.xml
===================================================================
--- branches/port1842/integration/EAP4/etc/server/default/deploy/postgresql-persistence-service.xml	2011-02-17 15:39:08 UTC (rev 8230)
+++ branches/port1842/integration/EAP4/etc/server/default/deploy/postgresql-persistence-service.xml	2011-03-02 12:04:54 UTC (rev 8231)
@@ -92,6 +92,7 @@
    UPDATE_MESSAGE_STATE=UPDATE JBM_MSG_REF SET STATE = ? WHERE MESSAGE_ID = ? AND CHANNEL_ID = ?
    CLAIM_MESSAGE_IN_SUCK=UPDATE JBM_MSG_REF SET STATE='C' WHERE CHANNEL_ID = ? AND MESSAGE_ID = ? AND STATE='S'
    LOAD_REFS_IN_SUCK=SELECT MESSAGE_ID, DELIVERY_COUNT, SCHED_DELIVERY FROM JBM_MSG_REF WHERE STATE = 'S' AND CHANNEL_ID = ? ORDER BY ORD
+   LOAD_SINGLE_REFERENCE=SELECT STATE, CHANNEL_ID FROM JBM_MSG_REF WHERE MESSAGE_ID=?
       ]]></attribute>
 
       <!-- The maximum number of parameters to include in a prepared statement -->

Modified: branches/port1842/integration/EAP4/etc/server/default/deploy/sybase-persistence-service.xml
===================================================================
--- branches/port1842/integration/EAP4/etc/server/default/deploy/sybase-persistence-service.xml	2011-02-17 15:39:08 UTC (rev 8230)
+++ branches/port1842/integration/EAP4/etc/server/default/deploy/sybase-persistence-service.xml	2011-03-02 12:04:54 UTC (rev 8231)
@@ -97,6 +97,7 @@
    UPDATE_MESSAGE_STATE=UPDATE JBM_MSG_REF SET STATE = ? WHERE MESSAGE_ID = ? AND CHANNEL_ID = ?
    CLAIM_MESSAGE_IN_SUCK=UPDATE JBM_MSG_REF SET STATE='C' WHERE CHANNEL_ID = ? AND MESSAGE_ID = ? AND STATE='S'
    LOAD_REFS_IN_SUCK=SELECT MESSAGE_ID, DELIVERY_COUNT, SCHED_DELIVERY FROM JBM_MSG_REF WHERE STATE = 'S' AND CHANNEL_ID = ? ORDER BY ORD
+   LOAD_SINGLE_REFERENCE=SELECT STATE, CHANNEL_ID FROM JBM_MSG_REF WHERE MESSAGE_ID=?
       ]]></attribute>
 
       <!-- The maximum number of parameters to include in a prepared statement -->

Modified: branches/port1842/src/main/org/jboss/messaging/core/impl/JDBCPersistenceManager.java
===================================================================
--- branches/port1842/src/main/org/jboss/messaging/core/impl/JDBCPersistenceManager.java	2011-02-17 15:39:08 UTC (rev 8230)
+++ branches/port1842/src/main/org/jboss/messaging/core/impl/JDBCPersistenceManager.java	2011-03-02 12:04:54 UTC (rev 8231)
@@ -768,6 +768,22 @@
 
                return null;
             }
+            catch (SQLException e)
+            {
+               if (transactionDone)
+               {
+                  String sqlState = e.getSQLState();
+                  // 23000(XA) or 23xxx (SQL:2003)
+                  if (sqlState.startsWith("23"))
+                  {
+                     // this is a constraint violation. i.e. the record identified by the same key already exists
+                     // This is fine, log a warning and go ahead.
+                     log.warn("We encountered a after-commit problem however messages have already been inserted.");
+                     return null;
+                  }
+               }
+               throw e;
+            }
             finally
             {
                closeStatement(psInsertReference);
@@ -873,8 +889,13 @@
                }
 
                // Sanity check
-               if (rows != num) { throw new IllegalStateException(
-                     "Did not update correct number of rows"); }
+               if (rows != num)
+               {
+                  if (!transactionDone)
+                  {
+                     throw new IllegalStateException("Did not update correct number of rows");
+                  }
+               }
 
                return null;
             }
@@ -1631,6 +1652,22 @@
 
                return null;
             }
+            catch (SQLException e)
+            {
+               if (transactionDone)
+               {
+                  String sqlState = e.getSQLState();
+                  //23000(XA) or 23xxx (SQL:2003)
+                  if (sqlState.startsWith("23"))
+                  {
+                     //this is a constraint violation. i.e. the record identified by the same key already exists
+                     //This is fine, log a warning and go ahead.
+                     log.warn("We encountered a problem but the message " + message + " has already been inserted.");
+                     return null;
+                  }
+               }
+               throw e;
+            }
             finally
             {
                closeStatement(psReference);
@@ -1672,6 +1709,7 @@
          public Object doTransaction() throws Exception
          {
             PreparedStatement psReference = null;
+            PreparedStatement ps = null;
 
             try
             {
@@ -1690,8 +1728,32 @@
 
                if (rows == 0)
                {
-                  // no message updated, should be canceled back already
-                  throw new JMSException("Failed to move message " + ref.getMessage().getMessageID());
+                  //check if it's already sucked before a possible failure
+                  ps = conn.prepareStatement(getSQLStatement("LOAD_SINGLE_REFERENCE"));
+                  ps.setLong(1, ref.getMessage().getMessageID());
+                  
+                  ResultSet rs = ps.executeQuery();
+                  
+                  if (rs.next())
+                  {
+                     String state = rs.getString(1);
+                     long channelID = rs.getLong(2);
+                     
+                     if ((!"C".equals(state)) || channelID != destChannelID)
+                     {
+                        //the message didn't get moved!
+                        throw new JMSException("Failed to move message " + ref.getMessage().getMessageID());
+                     }
+                     else
+                     {
+                        log.info("Already moved message " + ref.getMessage().getMessageID());
+                     }
+                  }
+                  else
+                  {
+                     // no message updated, should be canceled back already
+                     throw new JMSException("Failed to move message " + ref.getMessage().getMessageID() + " probably canceled back to source channel.");
+                  }
                }
 
                return null;
@@ -1699,6 +1761,7 @@
             finally
             {
                closeStatement(psReference);
+               closeStatement(ps);
             }
          }
       }
@@ -2097,6 +2160,22 @@
 
                return null;
             }
+            catch (SQLException e)
+            {
+               if (transactionDone)
+               {
+                  String sqlState = e.getSQLState();
+                  //23000(XA) or 23xxx (SQL:2003)
+                  if (sqlState.startsWith("23"))
+                  {
+                     //this is a constraint violation. i.e. the record identified by the same key already exists
+                     //This is fine, log a warning and go ahead.
+                     log.warn("We encountered a after-commit problem however messages have already been inserted.");
+                     return null;
+                  }
+               }
+               throw e;
+            }
             finally
             {
                closeStatement(psReference);
@@ -2297,6 +2376,22 @@
 
                return null;
             }
+            catch (SQLException e)
+            {
+               if (transactionDone)
+               {
+                  String sqlState = e.getSQLState();
+                  //23000(XA) or 23xxx (SQL:2003)
+                  if (sqlState.startsWith("23"))
+                  {
+                     //this is a constraint violation. i.e. the record identified by the same key already exists
+                     //This is fine, log a warning and go ahead.
+                     log.warn("We encountered a after-commit problem however messages have already been inserted.");
+                     return null;
+                  }
+               }
+               throw e;
+            }
             finally
             {
                closeStatement(psReference);
@@ -3009,7 +3104,7 @@
       map.put("CLAIM_MESSAGE_IN_SUCK", "UPDATE JBM_MSG_REF SET STATE='C' WHERE CHANNEL_ID = ? AND MESSAGE_ID = ? AND STATE='S'");
       map.put("LOAD_REFS_IN_SUCK",
               "SELECT MESSAGE_ID, DELIVERY_COUNT, SCHED_DELIVERY FROM JBM_MSG_REF WHERE STATE = 'S' AND CHANNEL_ID = ? ORDER BY ORD");
-
+      map.put("LOAD_SINGLE_REFERENCE", "SELECT STATE, CHANNEL_ID FROM JBM_MSG_REF WHERE MESSAGE_ID=?");
       return map;
    }
 
@@ -3378,6 +3473,7 @@
          public Object doTransaction() throws Exception
          {
             PreparedStatement psReference = null;
+            PreparedStatement ps = null;
 
             try
             {
@@ -3398,9 +3494,28 @@
 
                if (rows != 1)
                {
-                  throw new JMSException("Failed to update message " + ref.getMessage().getMessageID() +
-                                         " to state " +
-                                         c);
+                  ps = conn.prepareStatement(getSQLStatement("LOAD_SINGLE_REFERENCE"));
+                  ps.setLong(1, ref.getMessage().getMessageID());
+                  
+                  ResultSet rs = ps.executeQuery();
+                  
+                  if (rs.next())
+                  {
+                     String state = rs.getString(1);
+                     
+                     if (!c.equals(state))
+                     {
+                        throw new JMSException("Failed to update message " + ref.getMessage().getMessageID() + " to state " + c);
+                     }
+                     else
+                     {
+                        log.info("Already updated message " + ref.getMessage().getMessageID());
+                     }
+                  }
+                  else
+                  {
+                     throw new JMSException("Failed to update message " + ref.getMessage().getMessageID() + " to state " + c);
+                  }
                }
 
                return null;
@@ -3408,6 +3523,7 @@
             finally
             {
                closeStatement(psReference);
+               closeStatement(ps);
             }
          }
       }

Modified: branches/port1842/src/main/org/jboss/messaging/core/impl/JDBCSupport.java
===================================================================
--- branches/port1842/src/main/org/jboss/messaging/core/impl/JDBCSupport.java	2011-02-17 15:39:08 UTC (rev 8230)
+++ branches/port1842/src/main/org/jboss/messaging/core/impl/JDBCSupport.java	2011-03-02 12:04:54 UTC (rev 8231)
@@ -530,6 +530,8 @@
    protected abstract class JDBCTxRunner<T>
    {
    	protected Connection conn;
+      
+      protected boolean shouldFailCommit = false;
 
       private TransactionWrapper wrap;
 
@@ -618,6 +620,8 @@
    {
       protected Connection conn;
       
+      protected boolean transactionDone = false;
+      
       private boolean getConnectionFailed;
 
       private boolean getCommitFailed;
@@ -641,6 +645,8 @@
             
             T res = doTransaction();
             
+            transactionDone = true;
+            
             conn.commit();
             
             return res;

Modified: branches/port1842/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java
===================================================================
--- branches/port1842/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java	2011-02-17 15:39:08 UTC (rev 8230)
+++ branches/port1842/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java	2011-03-02 12:04:54 UTC (rev 8231)
@@ -24,6 +24,7 @@
 import java.io.Serializable;
 import java.sql.PreparedStatement;
 import java.sql.ResultSet;
+import java.sql.SQLException;
 import java.sql.Timestamp;
 import java.sql.Types;
 import java.util.ArrayList;
@@ -3467,7 +3468,7 @@
    	{
    		return;
    	}
-      class InsertBindings extends JDBCTxRunner
+      class InsertBindings extends JDBCTxRunner2
       {
          public Object doTransaction() throws Exception
          {
@@ -3510,6 +3511,22 @@
 
                ps.executeUpdate();
             }
+            catch (SQLException e)
+            {
+               if (transactionDone)
+               {
+                  String sqlState = e.getSQLState();
+                  //23000(XA) or 23xxx (SQL:2003)
+                  if (sqlState.startsWith("23"))
+                  {
+                     //this is a constraint violation. i.e. the record identified by the same key already exists
+                     //This is fine, log a warning and go ahead.
+                     log.warn("We encountered a problem but the binding has already been inserted.");
+                     return null;
+                  }
+               }
+               throw e;
+            }
             finally
             {
                closeStatement(ps);



More information about the jboss-cvs-commits mailing list