[jboss-cvs] JBoss Messaging SVN: r1595 - branches/Branch_1_0_XARecovery/src/main/org/jboss/messaging/core/plugin

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Mon Nov 20 06:57:24 EST 2006


Author: juha at jboss.org
Date: 2006-11-20 06:57:22 -0500 (Mon, 20 Nov 2006)
New Revision: 1595

Modified:
   branches/Branch_1_0_XARecovery/src/main/org/jboss/messaging/core/plugin/JDBCPersistenceManager.java
Log:
Original patch (JBMessaging-407) with changes to the persistence manager to handle recovery-related queries.

Modified: branches/Branch_1_0_XARecovery/src/main/org/jboss/messaging/core/plugin/JDBCPersistenceManager.java
===================================================================
--- branches/Branch_1_0_XARecovery/src/main/org/jboss/messaging/core/plugin/JDBCPersistenceManager.java	2006-11-20 11:54:49 UTC (rev 1594)
+++ branches/Branch_1_0_XARecovery/src/main/org/jboss/messaging/core/plugin/JDBCPersistenceManager.java	2006-11-20 11:57:22 UTC (rev 1595)
@@ -66,6 +66,7 @@
 import org.jboss.messaging.core.tx.Transaction;
 import org.jboss.messaging.core.tx.TxCallback;
 import org.jboss.messaging.core.tx.XidImpl;
+import org.jboss.messaging.core.tx.PreparedTxInfo;
 import org.jboss.messaging.util.Util;
 import org.jboss.serial.io.JBossObjectInputStream;
 import org.jboss.serial.io.JBossObjectOutputStream;
@@ -81,6 +82,7 @@
  * @author <a href="mailto:ovidiu at jboss.org">Ovidiu Feodorov</a>
  * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
  * @author <a href="mailto:adrian at jboss.org">Adrian Brock</a>
+ * @author <a href="mailto:Konda.Madhu at uk.mizuho-sc.com">Madhu Konda</a>
  *
  * @version <tt>1.1</tt>
  *
@@ -208,7 +210,16 @@
    
    protected String selectAllChannels = "SELECT DISTINCT(CHANNELID) FROM JMS_MESSAGE_REFERENCE";
    
-        
+   // RECOVERY (MK): default lookups by TRANSACTIONID; overridable through sqlProperties
+   protected String selectMessageIdForRef = "SELECT MESSAGEID FROM JMS_MESSAGE_REFERENCE "
+	      + "WHERE TRANSACTIONID = ? AND STATE = '+' ";
+
+   protected String selectMessageIdForAck = "SELECT MESSAGEID FROM JMS_MESSAGE_REFERENCE "
+	      + "WHERE TRANSACTIONID = ? AND STATE = '-' ";
+
+   protected String selectChannelId = "SELECT CHANNELID FROM JMS_MESSAGE_REFERENCE "
+	      + "WHERE TRANSACTIONID = ?";
+
    // Static --------------------------------------------------------
    
    // Attributes ----------------------------------------------------
@@ -617,8 +628,117 @@
       }
    }
    
-   
-   
+   public ChannelMapper getChannelMapperReference()
+   {
+	   return cm; // returns the cm reference unchanged (no copy is made)
+   }
+
+   public long getMessageIdForRef(long transactionId) throws Exception
+   { // runs the selectMessageIdForRef query for transactionId via getId, reading column 1
+	   return getId(selectMessageIdForRef, transactionId, 1);
+   }
+   public long getMessageIdForAck(long transactionId) throws Exception
+   { // runs the selectMessageIdForAck query for transactionId via getId, reading column 1
+	   return getId(selectMessageIdForAck, transactionId, 1);
+   }
+
+   public long getChannelId(long transactionId) throws Exception
+   { // runs the selectChannelId query for transactionId via getId, reading column 1
+	   return getId(selectChannelId, transactionId, 1);
+   }
+
+
+   /**
+    * This method associates a TransactionCallback with the
+    * resurrected transaction, unless one is already registered
+    * on it under this persistence manager as the key. It is called
+    * from the TransactionRepository once a transaction is resurrected.
+    */
+   public void associateTxCallbackToPreparedTx(Transaction tx)
+   {
+	  TransactionCallback callback = (TransactionCallback) tx.getKeyedCallback(this);
+
+      if (callback == null) // nothing registered under this key yet
+      {
+         callback = new TransactionCallback(tx);
+
+         tx.addKeyedCallback(callback, this);
+      }
+   }
+
+   /**
+    * MK: Runs sqlQuery with transactionId bound to parameter 1 and returns the
+    * long in column 'index' of the last row read, or 0 if there are no rows.
+    */
+   public long getId(String sqlQuery, long transactionId, int index) throws Exception
+   {
+      if (trace) { log.trace("loading references for tx [" + transactionId + "]"); }
+
+      Connection conn = null;
+      PreparedStatement ps = null;
+      ResultSet rs = null;
+      TransactionWrapper wrap = new TransactionWrapper();
+      long messageId = 0; // value returned when the query yields no rows
+
+      try
+      {
+         conn = ds.getConnection();
+
+         ps = conn.prepareStatement(sqlQuery);
+
+         ps.setLong(1, transactionId);
+
+         rs = ps.executeQuery();
+
+         while(rs.next()) // every row overwrites messageId, so only the last row's value survives
+         {
+        	   messageId = rs.getLong(index);
+         }
+
+         return messageId;
+      }
+      catch (Exception e)
+      {
+         wrap.exceptionOccurred(); // NOTE(review): presumably flags the wrapper so end() does not commit - confirm
+         throw e;
+      }
+      finally
+      {
+         // Close in reverse order of acquisition; a failure to close is ignored.
+         if (rs != null)
+         {
+            try
+            {
+               rs.close();
+            }
+            catch (Throwable e) // ignored
+            {
+            }
+         }
+         if (ps != null)
+         {
+            try
+            {
+               ps.close();
+            }
+            catch (Throwable e) // ignored
+            {
+            }
+         }
+         if (conn != null)
+         {
+            try
+            {
+               conn.close();
+            }
+            catch (Throwable e) // ignored
+            {
+            }
+         }
+         wrap.end(); // always runs, on both the success and the exception path
+      }
+   }
+
+
    /*
     * Retrieve a List of messages corresponding to the specified List of message ids.
     * The implementation here for HSQLDB does this by using a PreparedStatment with an IN clause
@@ -1983,6 +2103,7 @@
       Connection conn = null;
       Statement st = null;
       ResultSet rs = null;
+      PreparedTxInfo txInfo = null;
       TransactionWrapper wrap = new TransactionWrapper();
       
       try
@@ -1996,12 +2117,17 @@
          
          while (rs.next())
          {
+           	//get the existing tx id --MK START
+            long txId = rs.getLong(1);
+
             byte[] branchQual = rs.getBytes(2);
             int formatId = rs.getInt(3);
             byte[] globalTxId = rs.getBytes(4);
             Xid xid = new XidImpl(branchQual, formatId, globalTxId);
             
-            transactions.add(xid);
+            // create a tx info object with the result set details
+            txInfo = new PreparedTxInfo(txId, xid);
+            transactions.add(txInfo);
          }
          
          return transactions;
@@ -2443,6 +2569,11 @@
       
       //Other
       selectAllChannels = sqlProperties.getProperty("SELECT_ALL_CHANNELS", selectAllChannels);
+
+      //recovery
+      selectMessageIdForRef = sqlProperties.getProperty("SELECT_MESSAGEID_FOR_REFS",selectMessageIdForRef);
+      selectMessageIdForAck = sqlProperties.getProperty("SELECT_MESSAGEID_FOR_ACKS",selectMessageIdForAck);
+      selectChannelId  = sqlProperties.getProperty("SELECT_CHANNEL_ID",selectChannelId );
    }
    
    protected TransactionCallback getCallback(Transaction tx)




More information about the jboss-cvs-commits mailing list