[jboss-cvs] JBoss Messaging SVN: r8614 - in branches/Branch_JBossMessaging_1_4_8_SP9_JBMESSAGING-1940/src/main/org/jboss: messaging/core/impl and 1 other directory.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Fri Sep 20 02:20:53 EDT 2013


Author: bershath27
Date: 2013-09-20 02:20:53 -0400 (Fri, 20 Sep 2013)
New Revision: 8614

Modified:
   branches/Branch_JBossMessaging_1_4_8_SP9_JBMESSAGING-1940/src/main/org/jboss/jms/server/ServerPeer.java
   branches/Branch_JBossMessaging_1_4_8_SP9_JBMESSAGING-1940/src/main/org/jboss/messaging/core/impl/JDBCPersistenceManager.java
   branches/Branch_JBossMessaging_1_4_8_SP9_JBMESSAGING-1940/src/main/org/jboss/messaging/core/impl/JDBCSupport.java
Log:
JBPAPP-10865

Modified: branches/Branch_JBossMessaging_1_4_8_SP9_JBMESSAGING-1940/src/main/org/jboss/jms/server/ServerPeer.java
===================================================================
--- branches/Branch_JBossMessaging_1_4_8_SP9_JBMESSAGING-1940/src/main/org/jboss/jms/server/ServerPeer.java	2013-09-20 05:04:30 UTC (rev 8613)
+++ branches/Branch_JBossMessaging_1_4_8_SP9_JBMESSAGING-1940/src/main/org/jboss/jms/server/ServerPeer.java	2013-09-20 06:20:53 UTC (rev 8614)
@@ -274,6 +274,7 @@
          {
          	((JDBCPersistenceManager)persistenceManager).injectNodeID(serverPeerID);
             ((JDBCPersistenceManager)persistenceManager).setStopServerPeerOnDBFailure(this.stopServerPeerOnDBFailure);
+            ((JDBCPersistenceManager)persistenceManager).setServerPeer(this);
          }
          else if (persistenceManager instanceof NullPersistenceManager)
          {

Modified: branches/Branch_JBossMessaging_1_4_8_SP9_JBMESSAGING-1940/src/main/org/jboss/messaging/core/impl/JDBCPersistenceManager.java
===================================================================
--- branches/Branch_JBossMessaging_1_4_8_SP9_JBMESSAGING-1940/src/main/org/jboss/messaging/core/impl/JDBCPersistenceManager.java	2013-09-20 05:04:30 UTC (rev 8613)
+++ branches/Branch_JBossMessaging_1_4_8_SP9_JBMESSAGING-1940/src/main/org/jboss/messaging/core/impl/JDBCPersistenceManager.java	2013-09-20 06:20:53 UTC (rev 8614)
@@ -22,11 +22,14 @@
 package org.jboss.messaging.core.impl;
 
 import org.jboss.jms.message.JBossMessage;
+import org.jboss.jms.server.ServerPeer;
 import org.jboss.jms.tx.MessagingXid;
 import org.jboss.logging.Logger;
+import org.jboss.messaging.core.contract.Binding;
 import org.jboss.messaging.core.contract.Message;
 import org.jboss.messaging.core.contract.MessageReference;
 import org.jboss.messaging.core.contract.PersistenceManager;
+import org.jboss.messaging.core.contract.PostOffice;
 import org.jboss.messaging.core.impl.message.MessageFactory;
 import org.jboss.messaging.core.impl.message.MessageSupport;
 import org.jboss.messaging.core.impl.tx.PreparedTxInfo;
@@ -106,16 +109,12 @@
    
    private boolean supportsTxAge;
    
-   private int maxRetry = 25;
-   
-   private int retryInterval = 1000;
-
-   private boolean retryOnConnectionFailure = false;
-   
    private volatile boolean stopServerPeerOnDBFailure = false;
    
    private Object moveLock = new Object();
    
+   private ServerPeer server;
+   
    // Constructors --------------------------------------------------
 
    public JDBCPersistenceManager(DataSource ds, TransactionManager tm,
@@ -1192,6 +1191,14 @@
          public Object doTransaction() throws Exception
          {
             PreparedStatement statement = null;
+            PreparedStatement statement1 = null;
+            PreparedStatement statement2 = null;
+            PreparedStatement statement3 = null;
+            PreparedStatement statement4 = null;
+            ResultSet rs = null;
+            ResultSet rs1 = null;
+            ResultSet rs2 = null;
+
             try
             {
                statement = conn.prepareStatement(getSQLStatement("UPDATE_TX"));
@@ -1199,14 +1206,106 @@
                statement.setInt(2, fromNodeID);
                int affected = statement.executeUpdate();
 
+               statement.close();
+               statement = null;
+
                log.debug("Merged " + affected + " transactions from channel "
                      + fromNodeID + " into node " + toNodeID);
+               
+               //The following code is for JBMESSAGING-1940
+               if (affected > 0)
+               {
+                  //check non-movable transactions
+                  statement1 = conn.prepareStatement(getSQLStatement("SELECT_PREPARED_TRANSACTIONS"));
 
+                  statement1.setInt(1, nodeID);
+
+                  rs = statement1.executeQuery();
+
+                  List<Long> nonMovableTxs = new ArrayList<Long>();
+
+                  //check '+" messages, if binding is missing, don't merge this tx
+                  String sql = getSQLStatement("SELECT_MESSAGE_ID_FOR_REF");
+                  statement2 = conn.prepareStatement(sql);
+
+                  //check '-' messages, if binding is missing, don't merge this tx
+                  sql = getSQLStatement("SELECT_MESSAGE_ID_FOR_ACK");
+                  statement3 = conn.prepareStatement(sql);
+                  
+                  PostOffice postOffice = server.getPostOfficeInstance();
+
+                  while (rs.next())
+                  {
+                     boolean found = false;
+
+                     long txId = rs.getLong(1);
+
+                     statement2.setLong(1, txId);
+
+                     rs1 = statement2.executeQuery();
+
+                     while (rs1.next())
+                     {
+                        long channelId = rs1.getLong(2);
+                        Binding binding = postOffice.getBindingForChannelID(channelId);
+                        if (binding == null)
+                        {
+                           nonMovableTxs.add(txId);
+                           found = true;
+                           break;
+                        }
+                     }
+                     
+                     rs1.close();
+                     rs1 = null;
+                     if (found) continue;
+
+                     statement3.setLong(1, txId);
+
+                     rs2 = statement3.executeQuery();
+
+                     while (rs2.next())
+                     {
+                        long channelId = rs2.getLong(2);
+                        Binding binding = postOffice.getBindingForChannelID(channelId);
+                        if (binding == null)
+                        {
+                           nonMovableTxs.add(txId);
+                           break;
+                        }
+                     }
+                     rs2.close();
+                     rs2 = null;
+                  }
+
+                  rs.close();
+                  rs = null;
+                  statement2.close();
+                  statement2 = null;
+                  statement3.close();
+                  statement3 = null;
+
+                  if (nonMovableTxs.size() > 0)
+                  {
+                     //put non-movable tx back
+                     statement4 = conn.prepareStatement(getSQLStatement("MOVE_TX_NODE"));
+                     for (Long nonMTxId : nonMovableTxs)
+                     {
+                        statement4.setInt(1, fromNodeID);
+                        statement4.setLong(2, nonMTxId);
+                        statement4.executeUpdate();
+                     }
+                     statement4.close();
+                     statement4 = null;
+                     log.debug("Moved transactions back: " + nonMovableTxs);
+                  }
+               }
                return null;
             }
             finally
             {
-               closeStatement(statement);
+               closeResultSet(rs, rs1, rs2);
+               closeStatement(statement, statement1, statement2, statement3, statement4);
             }
          }
       }
@@ -3091,6 +3190,7 @@
       map.put("SELECT_MESSAGE_ID_FOR_ACK",
               "SELECT MESSAGE_ID, CHANNEL_ID FROM JBM_MSG_REF WHERE TRANSACTION_ID = ? AND STATE = '-' ORDER BY ORD");
       map.put("UPDATE_TX", "UPDATE JBM_TX SET NODE_ID=? WHERE NODE_ID=?");
+      map.put("MOVE_TX_NODE", "UPDATE JBM_TX SET NODE_ID=? WHERE TRANSACTION_ID=?");
 
       // Counter
       map.put("UPDATE_COUNTER",
@@ -3613,4 +3713,9 @@
       this.stopServerPeerOnDBFailure = crashServerOnDBFailure;
    }
 
+   public void setServerPeer(ServerPeer server)
+   {
+      this.server = server;
+   }
+
 }

Modified: branches/Branch_JBossMessaging_1_4_8_SP9_JBMESSAGING-1940/src/main/org/jboss/messaging/core/impl/JDBCSupport.java
===================================================================
--- branches/Branch_JBossMessaging_1_4_8_SP9_JBMESSAGING-1940/src/main/org/jboss/messaging/core/impl/JDBCSupport.java	2013-09-20 05:04:30 UTC (rev 8613)
+++ branches/Branch_JBossMessaging_1_4_8_SP9_JBMESSAGING-1940/src/main/org/jboss/messaging/core/impl/JDBCSupport.java	2013-09-20 06:20:53 UTC (rev 8614)
@@ -252,39 +252,45 @@
       return false;
    }
    
-   protected void closeResultSet(ResultSet rs)
+   protected void closeResultSet(ResultSet... rss)
    {
-   	if (rs != null)
+      for (ResultSet rs : rss)
       {
-         try
+         if (rs != null)
          {
-            rs.close();
+            try
+            {
+               rs.close();
+            }
+            catch (Throwable e)
+            {
+               if (trace)
+               {
+                  log.trace("Failed to close result set", e);
+               }
+            }
          }
-         catch (Throwable e)
-         {
-         	if (trace)
-         	{
-         		log.trace("Failed to close result set", e);
-         	}
-         }
       }
    }
    
-   protected void closeStatement(Statement st)
+   protected void closeStatement(Statement... sts)
    {
-   	if (st != null)
+      for (Statement st : sts)
       {
-         try
+   	   if (st != null)
          {
-         	st.close();
+            try
+            {
+         	   st.close();
+            }
+            catch (Throwable e)
+            {
+         	   if (trace)
+         	   {
+         		   log.trace("Failed to close statement", e);
+         	   }
+            }
          }
-         catch (Throwable e)
-         {
-         	if (trace)
-         	{
-         		log.trace("Failed to close statement", e);
-         	}
-         }
       }
    }
    



More information about the jboss-cvs-commits mailing list