[jboss-cvs] JBoss Messaging SVN: r8232 - in branches/JBM1842: src/main/org/jboss/messaging/core/impl and 1 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Wed Mar 2 07:10:50 EST 2011


Author: gaohoward
Date: 2011-03-02 07:10:49 -0500 (Wed, 02 Mar 2011)
New Revision: 8232

Modified:
   branches/JBM1842/integration/EAP4/etc/server/default/deploy/db2-persistence-service.xml
   branches/JBM1842/integration/EAP4/etc/server/default/deploy/mssql-persistence-service.xml
   branches/JBM1842/integration/EAP4/etc/server/default/deploy/mysql-persistence-service.xml
   branches/JBM1842/integration/EAP4/etc/server/default/deploy/ndb-persistence-service.xml
   branches/JBM1842/integration/EAP4/etc/server/default/deploy/oracle-persistence-service.xml
   branches/JBM1842/integration/EAP4/etc/server/default/deploy/postgresql-persistence-service.xml
   branches/JBM1842/integration/EAP4/etc/server/default/deploy/sybase-persistence-service.xml
   branches/JBM1842/src/main/org/jboss/messaging/core/impl/JDBCPersistenceManager.java
   branches/JBM1842/src/main/org/jboss/messaging/core/impl/JDBCSupport.java
   branches/JBM1842/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java
Log:
fix jdbc commit successful but response fails to reach jbm issue



Modified: branches/JBM1842/integration/EAP4/etc/server/default/deploy/db2-persistence-service.xml
===================================================================
--- branches/JBM1842/integration/EAP4/etc/server/default/deploy/db2-persistence-service.xml	2011-03-02 12:04:54 UTC (rev 8231)
+++ branches/JBM1842/integration/EAP4/etc/server/default/deploy/db2-persistence-service.xml	2011-03-02 12:10:49 UTC (rev 8232)
@@ -96,6 +96,7 @@
    UPDATE_MESSAGE_STATE=UPDATE JBM_MSG_REF SET STATE = ? WHERE MESSAGE_ID = ? AND CHANNEL_ID = ?
    CLAIM_MESSAGE_IN_SUCK=UPDATE JBM_MSG_REF SET STATE='C' WHERE CHANNEL_ID = ? AND MESSAGE_ID = ? AND STATE='S'
    LOAD_REFS_IN_SUCK=SELECT MESSAGE_ID, DELIVERY_COUNT, SCHED_DELIVERY FROM JBM_MSG_REF WHERE STATE = 'S' AND CHANNEL_ID = ? ORDER BY ORD
+   LOAD_SINGLE_REFERENCE=SELECT STATE, CHANNEL_ID FROM JBM_MSG_REF WHERE MESSAGE_ID=?
       ]]></attribute>
 
       <!-- The maximum number of parameters to include in a prepared statement -->

Modified: branches/JBM1842/integration/EAP4/etc/server/default/deploy/mssql-persistence-service.xml
===================================================================
--- branches/JBM1842/integration/EAP4/etc/server/default/deploy/mssql-persistence-service.xml	2011-03-02 12:04:54 UTC (rev 8231)
+++ branches/JBM1842/integration/EAP4/etc/server/default/deploy/mssql-persistence-service.xml	2011-03-02 12:10:49 UTC (rev 8232)
@@ -92,6 +92,7 @@
    UPDATE_MESSAGE_STATE=UPDATE JBM_MSG_REF SET STATE = ? WHERE MESSAGE_ID = ? AND CHANNEL_ID = ?
    CLAIM_MESSAGE_IN_SUCK=UPDATE JBM_MSG_REF SET STATE='C' WHERE CHANNEL_ID = ? AND MESSAGE_ID = ? AND STATE='S'
    LOAD_REFS_IN_SUCK=SELECT MESSAGE_ID, DELIVERY_COUNT, SCHED_DELIVERY FROM JBM_MSG_REF WHERE STATE = 'S' AND CHANNEL_ID = ? ORDER BY ORD
+   LOAD_SINGLE_REFERENCE=SELECT STATE, CHANNEL_ID FROM JBM_MSG_REF WHERE MESSAGE_ID=?
       ]]></attribute>
 
       <!-- The maximum number of parameters to include in a prepared statement -->

Modified: branches/JBM1842/integration/EAP4/etc/server/default/deploy/mysql-persistence-service.xml
===================================================================
--- branches/JBM1842/integration/EAP4/etc/server/default/deploy/mysql-persistence-service.xml	2011-03-02 12:04:54 UTC (rev 8231)
+++ branches/JBM1842/integration/EAP4/etc/server/default/deploy/mysql-persistence-service.xml	2011-03-02 12:10:49 UTC (rev 8232)
@@ -92,6 +92,7 @@
    UPDATE_MESSAGE_STATE=UPDATE JBM_MSG_REF SET STATE = ? WHERE MESSAGE_ID = ? AND CHANNEL_ID = ?
    CLAIM_MESSAGE_IN_SUCK=UPDATE JBM_MSG_REF SET STATE='C' WHERE CHANNEL_ID = ? AND MESSAGE_ID = ? AND STATE='S'
    LOAD_REFS_IN_SUCK=SELECT MESSAGE_ID, DELIVERY_COUNT, SCHED_DELIVERY FROM JBM_MSG_REF WHERE STATE = 'S' AND CHANNEL_ID = ? ORDER BY ORD
+   LOAD_SINGLE_REFERENCE=SELECT STATE, CHANNEL_ID FROM JBM_MSG_REF WHERE MESSAGE_ID=?
    ]]></attribute>
 
       <!-- The maximum number of parameters to include in a prepared statement -->

Modified: branches/JBM1842/integration/EAP4/etc/server/default/deploy/ndb-persistence-service.xml
===================================================================
--- branches/JBM1842/integration/EAP4/etc/server/default/deploy/ndb-persistence-service.xml	2011-03-02 12:04:54 UTC (rev 8231)
+++ branches/JBM1842/integration/EAP4/etc/server/default/deploy/ndb-persistence-service.xml	2011-03-02 12:10:49 UTC (rev 8232)
@@ -92,6 +92,7 @@
    UPDATE_MESSAGE_STATE=UPDATE JBM_MSG_REF SET STATE = ? WHERE MESSAGE_ID = ? AND CHANNEL_ID = ?
    CLAIM_MESSAGE_IN_SUCK=UPDATE JBM_MSG_REF SET STATE='C' WHERE CHANNEL_ID = ? AND MESSAGE_ID = ? AND STATE='S'
    LOAD_REFS_IN_SUCK=SELECT MESSAGE_ID, DELIVERY_COUNT, SCHED_DELIVERY FROM JBM_MSG_REF WHERE STATE = 'S' AND CHANNEL_ID = ? ORDER BY ORD
+   LOAD_SINGLE_REFERENCE=SELECT STATE, CHANNEL_ID FROM JBM_MSG_REF WHERE MESSAGE_ID=?
       ]]></attribute>
 
       <!-- The maximum number of parameters to include in a prepared statement -->

Modified: branches/JBM1842/integration/EAP4/etc/server/default/deploy/oracle-persistence-service.xml
===================================================================
--- branches/JBM1842/integration/EAP4/etc/server/default/deploy/oracle-persistence-service.xml	2011-03-02 12:04:54 UTC (rev 8231)
+++ branches/JBM1842/integration/EAP4/etc/server/default/deploy/oracle-persistence-service.xml	2011-03-02 12:10:49 UTC (rev 8232)
@@ -96,6 +96,7 @@
    UPDATE_MESSAGE_STATE=UPDATE JBM_MSG_REF SET STATE = ? WHERE MESSAGE_ID = ? AND CHANNEL_ID = ?
    CLAIM_MESSAGE_IN_SUCK=UPDATE JBM_MSG_REF SET STATE='C' WHERE CHANNEL_ID = ? AND MESSAGE_ID = ? AND STATE='S'
    LOAD_REFS_IN_SUCK=SELECT MESSAGE_ID, DELIVERY_COUNT, SCHED_DELIVERY FROM JBM_MSG_REF WHERE STATE = 'S' AND CHANNEL_ID = ? ORDER BY ORD
+   LOAD_SINGLE_REFERENCE=SELECT STATE, CHANNEL_ID FROM JBM_MSG_REF WHERE MESSAGE_ID=?
       ]]></attribute>
 
       <!-- The maximum number of parameters to include in a prepared statement -->

Modified: branches/JBM1842/integration/EAP4/etc/server/default/deploy/postgresql-persistence-service.xml
===================================================================
--- branches/JBM1842/integration/EAP4/etc/server/default/deploy/postgresql-persistence-service.xml	2011-03-02 12:04:54 UTC (rev 8231)
+++ branches/JBM1842/integration/EAP4/etc/server/default/deploy/postgresql-persistence-service.xml	2011-03-02 12:10:49 UTC (rev 8232)
@@ -92,6 +92,7 @@
    UPDATE_MESSAGE_STATE=UPDATE JBM_MSG_REF SET STATE = ? WHERE MESSAGE_ID = ? AND CHANNEL_ID = ?
    CLAIM_MESSAGE_IN_SUCK=UPDATE JBM_MSG_REF SET STATE='C' WHERE CHANNEL_ID = ? AND MESSAGE_ID = ? AND STATE='S'
    LOAD_REFS_IN_SUCK=SELECT MESSAGE_ID, DELIVERY_COUNT, SCHED_DELIVERY FROM JBM_MSG_REF WHERE STATE = 'S' AND CHANNEL_ID = ? ORDER BY ORD
+   LOAD_SINGLE_REFERENCE=SELECT STATE, CHANNEL_ID FROM JBM_MSG_REF WHERE MESSAGE_ID=?
       ]]></attribute>
 
       <!-- The maximum number of parameters to include in a prepared statement -->

Modified: branches/JBM1842/integration/EAP4/etc/server/default/deploy/sybase-persistence-service.xml
===================================================================
--- branches/JBM1842/integration/EAP4/etc/server/default/deploy/sybase-persistence-service.xml	2011-03-02 12:04:54 UTC (rev 8231)
+++ branches/JBM1842/integration/EAP4/etc/server/default/deploy/sybase-persistence-service.xml	2011-03-02 12:10:49 UTC (rev 8232)
@@ -97,6 +97,7 @@
    UPDATE_MESSAGE_STATE=UPDATE JBM_MSG_REF SET STATE = ? WHERE MESSAGE_ID = ? AND CHANNEL_ID = ?
    CLAIM_MESSAGE_IN_SUCK=UPDATE JBM_MSG_REF SET STATE='C' WHERE CHANNEL_ID = ? AND MESSAGE_ID = ? AND STATE='S'
    LOAD_REFS_IN_SUCK=SELECT MESSAGE_ID, DELIVERY_COUNT, SCHED_DELIVERY FROM JBM_MSG_REF WHERE STATE = 'S' AND CHANNEL_ID = ? ORDER BY ORD
+   LOAD_SINGLE_REFERENCE=SELECT STATE, CHANNEL_ID FROM JBM_MSG_REF WHERE MESSAGE_ID=?
       ]]></attribute>
 
       <!-- The maximum number of parameters to include in a prepared statement -->

Modified: branches/JBM1842/src/main/org/jboss/messaging/core/impl/JDBCPersistenceManager.java
===================================================================
--- branches/JBM1842/src/main/org/jboss/messaging/core/impl/JDBCPersistenceManager.java	2011-03-02 12:04:54 UTC (rev 8231)
+++ branches/JBM1842/src/main/org/jboss/messaging/core/impl/JDBCPersistenceManager.java	2011-03-02 12:10:49 UTC (rev 8232)
@@ -751,6 +751,22 @@
 
                return null;
             }
+            catch (SQLException e)
+            {
+               if (transactionDone)
+               {
+                  String sqlState = e.getSQLState();
+                  //23000(XA) or 23xxx (SQL:2003)
+                  if (sqlState.startsWith("23"))
+                  {
+                     //this is a constraint violation. i.e. the record identified by the same key already exists
+                     //This is fine, log a warning and go ahead.
+                     log.warn("We encountered a after-commit problem however messages have already been inserted.");
+                     return null;
+                  }
+               }
+               throw e;
+            }
             finally
             {
                closeStatement(psInsertReference);
@@ -856,8 +872,13 @@
                }
 
                // Sanity check
-               if (rows != num) { throw new IllegalStateException(
-                     "Did not update correct number of rows"); }
+               if (rows != num)
+               {
+                  if (!transactionDone)
+                  {
+                     throw new IllegalStateException("Did not update correct number of rows");
+                  }
+               }
 
                return null;
             }
@@ -1558,7 +1579,7 @@
       if (trace) { log.trace("Adding reference " + ref + " in channel " + channelID + " tx " + tx); }
 
       class AddReferenceRunner extends JDBCTxRunner2
-      {
+      {         
          private Message message;
          private boolean messagePersisted = false;
          public Object doTransaction() throws Exception
@@ -1614,6 +1635,22 @@
 
                return null;
             }
+            catch (SQLException e)
+            {
+               if (transactionDone)
+               {
+                  String sqlState = e.getSQLState();
+                  //23000(XA) or 23xxx (SQL:2003)
+                  if (sqlState.startsWith("23"))
+                  {
+                     //this is a constraint violation. i.e. the record identified by the same key already exists
+                     //This is fine, log a warning and go ahead.
+                     log.warn("We encountered a problem but the message " + message + " has already been inserted.");
+                     return null;
+                  }
+               }
+               throw e;
+            }
             finally
             {
                closeStatement(psReference);
@@ -1651,10 +1688,11 @@
       if (trace) { log.trace("Moving reference " + ref + " from " + sourceChannelID + " to " + destChannelID); }
 
       class MoveReferenceRunner extends JDBCTxRunner2
-      {
+      {         
          public Object doTransaction() throws Exception
          {
             PreparedStatement psReference = null;
+            PreparedStatement ps = null;
 
             try
             {
@@ -1673,8 +1711,32 @@
                
                if (rows == 0)
                {
-                  //no message updated, should be canceled back already
-                  throw new JMSException("Failed to move message " + ref.getMessage().getMessageID());
+                  //check if it's already sucked before a possible failure
+                  ps = conn.prepareStatement(getSQLStatement("LOAD_SINGLE_REFERENCE"));
+                  ps.setLong(1, ref.getMessage().getMessageID());
+                  
+                  ResultSet rs = ps.executeQuery();
+                  
+                  if (rs.next())
+                  {
+                     String state = rs.getString(1);
+                     long channelID = rs.getLong(2);
+                     
+                     if ((!"C".equals(state)) || channelID != destChannelID)
+                     {
+                        //the message didn't get moved!
+                        throw new JMSException("Failed to move message " + ref.getMessage().getMessageID());
+                     }
+                     else
+                     {
+                        log.info("Already moved message " + ref.getMessage().getMessageID());
+                     }
+                  }
+                  else
+                  {
+                     // no message updated, should be canceled back already
+                     throw new JMSException("Failed to move message " + ref.getMessage().getMessageID() + " probably canceled back to source channel.");
+                  }
                }
 
                return null;
@@ -1682,6 +1744,7 @@
             finally
             {
                closeStatement(psReference);
+               closeStatement(ps);
             }
          }
       }
@@ -2080,6 +2143,22 @@
 
                return null;
             }
+            catch (SQLException e)
+            {
+               if (transactionDone)
+               {
+                  String sqlState = e.getSQLState();
+                  //23000(XA) or 23xxx (SQL:2003)
+                  if (sqlState.startsWith("23"))
+                  {
+                     //this is a constraint violation. i.e. the record identified by the same key already exists
+                     //This is fine, log a warning and go ahead.
+                     log.warn("We encountered a after-commit problem however messages have already been inserted.");
+                     return null;
+                  }
+               }
+               throw e;
+            }
             finally
             {
                closeStatement(psReference);
@@ -2280,6 +2359,22 @@
 
                return null;
             }
+            catch (SQLException e)
+            {
+               if (transactionDone)
+               {
+                  String sqlState = e.getSQLState();
+                  //23000(XA) or 23xxx (SQL:2003)
+                  if (sqlState.startsWith("23"))
+                  {
+                     //this is a constraint violation. i.e. the record identified by the same key already exists
+                     //This is fine, log a warning and go ahead.
+                     log.warn("We encountered a after-commit problem however messages have already been inserted.");
+                     return null;
+                  }
+               }
+               throw e;
+            }
             finally
             {
                closeStatement(psReference);
@@ -2991,6 +3086,7 @@
       map.put("UPDATE_MESSAGE_STATE", "UPDATE JBM_MSG_REF SET STATE = ? WHERE MESSAGE_ID = ? AND CHANNEL_ID = ?");
       map.put("CLAIM_MESSAGE_IN_SUCK", "UPDATE JBM_MSG_REF SET STATE='C' WHERE CHANNEL_ID = ? AND MESSAGE_ID = ? AND STATE='S'");
       map.put("LOAD_REFS_IN_SUCK", "SELECT MESSAGE_ID, DELIVERY_COUNT, SCHED_DELIVERY FROM JBM_MSG_REF WHERE STATE = 'S' AND CHANNEL_ID = ? ORDER BY ORD");
+      map.put("LOAD_SINGLE_REFERENCE", "SELECT STATE, CHANNEL_ID FROM JBM_MSG_REF WHERE MESSAGE_ID=?");      
 
       return map;
    }
@@ -3360,6 +3456,7 @@
          public Object doTransaction() throws Exception
          {
             PreparedStatement psReference = null;
+            PreparedStatement ps = null;
 
             try
             {
@@ -3381,7 +3478,28 @@
                
                if (rows != 1)
                {
-                  throw new JMSException("Failed to update message " + ref.getMessage().getMessageID() + " to state " + c);
+                  ps = conn.prepareStatement(getSQLStatement("LOAD_SINGLE_REFERENCE"));
+                  ps.setLong(1, ref.getMessage().getMessageID());
+                  
+                  ResultSet rs = ps.executeQuery();
+                  
+                  if (rs.next())
+                  {
+                     String state = rs.getString(1);
+                     
+                     if (!c.equals(state))
+                     {
+                        throw new JMSException("Failed to update message " + ref.getMessage().getMessageID() + " to state " + c);
+                     }
+                     else
+                     {
+                        log.info("Already updated message " + ref.getMessage().getMessageID());
+                     }
+                  }
+                  else
+                  {
+                     throw new JMSException("Failed to update message " + ref.getMessage().getMessageID() + " to state " + c);
+                  }
                }
 
                return null;
@@ -3389,6 +3507,7 @@
             finally
             {
                closeStatement(psReference);
+               closeStatement(ps);
             }
          }
       }

Modified: branches/JBM1842/src/main/org/jboss/messaging/core/impl/JDBCSupport.java
===================================================================
--- branches/JBM1842/src/main/org/jboss/messaging/core/impl/JDBCSupport.java	2011-03-02 12:04:54 UTC (rev 8231)
+++ branches/JBM1842/src/main/org/jboss/messaging/core/impl/JDBCSupport.java	2011-03-02 12:10:49 UTC (rev 8232)
@@ -28,6 +28,7 @@
 import javax.transaction.Status;
 import javax.transaction.Transaction;
 import javax.transaction.TransactionManager;
+
 import java.sql.Connection;
 import java.sql.ResultSet;
 import java.sql.SQLException;
@@ -523,6 +524,8 @@
 
       private TransactionWrapper wrap;
       
+      protected boolean shouldFailCommit = false;
+      
       public T executeOnlyOnce() throws Exception
       {
          wrap = new TransactionWrapper();
@@ -611,6 +614,8 @@
       private boolean getConnectionFailed;
 
       private boolean getCommitFailed;
+      
+      protected boolean transactionDone = false;
 
       public T execute() throws Exception
       {                    
@@ -631,6 +636,8 @@
             
             T res = doTransaction();
             
+            transactionDone = true;
+
             conn.commit();
             
             return res;

Modified: branches/JBM1842/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java
===================================================================
--- branches/JBM1842/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java	2011-03-02 12:04:54 UTC (rev 8231)
+++ branches/JBM1842/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java	2011-03-02 12:10:49 UTC (rev 8232)
@@ -24,6 +24,7 @@
 import java.io.Serializable;
 import java.sql.PreparedStatement;
 import java.sql.ResultSet;
+import java.sql.SQLException;
 import java.sql.Timestamp;
 import java.sql.Types;
 import java.util.ArrayList;
@@ -3512,8 +3513,8 @@
    	{
    		return;
    	}
-      class InsertBindings extends JDBCTxRunner
-      {
+      class InsertBindings extends JDBCTxRunner2
+      {         
          public Object doTransaction() throws Exception
          {
             PreparedStatement ps  = null;
@@ -3555,6 +3556,22 @@
 
                ps.executeUpdate();
             }
+            catch (SQLException e)
+            {
+               if (transactionDone)
+               {
+                  String sqlState = e.getSQLState();
+                  //23000(XA) or 23xxx (SQL:2003)
+                  if (sqlState.startsWith("23"))
+                  {
+                     //this is a constraint violation. i.e. the record identified by the same key already exists
+                     //This is fine, log a warning and go ahead.
+                     log.warn("We encountered a problem but the binding has already been inserted.");
+                     return null;
+                  }
+               }
+               throw e;
+            }
             finally
             {
                closeStatement(ps);



More information about the jboss-cvs-commits mailing list