[jboss-cvs] JBoss Messaging SVN: r1595 - branches/Branch_1_0_XARecovery/src/main/org/jboss/messaging/core/plugin
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Mon Nov 20 06:57:24 EST 2006
Author: juha at jboss.org
Date: 2006-11-20 06:57:22 -0500 (Mon, 20 Nov 2006)
New Revision: 1595
Modified:
branches/Branch_1_0_XARecovery/src/main/org/jboss/messaging/core/plugin/JDBCPersistenceManager.java
Log:
Original patch (JBMessaging-407) with changes to the persistence manager to handle recovery-related queries.
Modified: branches/Branch_1_0_XARecovery/src/main/org/jboss/messaging/core/plugin/JDBCPersistenceManager.java
===================================================================
--- branches/Branch_1_0_XARecovery/src/main/org/jboss/messaging/core/plugin/JDBCPersistenceManager.java 2006-11-20 11:54:49 UTC (rev 1594)
+++ branches/Branch_1_0_XARecovery/src/main/org/jboss/messaging/core/plugin/JDBCPersistenceManager.java 2006-11-20 11:57:22 UTC (rev 1595)
@@ -66,6 +66,7 @@
import org.jboss.messaging.core.tx.Transaction;
import org.jboss.messaging.core.tx.TxCallback;
import org.jboss.messaging.core.tx.XidImpl;
+import org.jboss.messaging.core.tx.PreparedTxInfo;
import org.jboss.messaging.util.Util;
import org.jboss.serial.io.JBossObjectInputStream;
import org.jboss.serial.io.JBossObjectOutputStream;
@@ -81,6 +82,7 @@
* @author <a href="mailto:ovidiu at jboss.org">Ovidiu Feodorov</a>
* @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
* @author <a href="mailto:adrian at jboss.org">Adrian Brock</a>
+ * @author <a href="mailto:Konda.Madhu at uk.mizuho-sc.com">Madhu Konda</a>
*
* @version <tt>1.1</tt>
*
@@ -208,7 +210,16 @@
protected String selectAllChannels = "SELECT DISTINCT(CHANNELID) FROM JMS_MESSAGE_REFERENCE";
-
+ //RECOVERY --MK
+ // Default SQL for the recovery lookups. Each statement takes the transaction id as its
+ // only parameter and reads from JMS_MESSAGE_REFERENCE.
+
+ // Message id of the reference rows in state '+' for the transaction.
+ // NOTE(review): '+' presumably marks references added within the transaction - confirm.
+ protected String selectMessageIdForRef = "SELECT MESSAGEID FROM JMS_MESSAGE_REFERENCE "
+ + "WHERE TRANSACTIONID = ? AND STATE = '+' ";
+
+ // Message id of the reference rows in state '-' for the transaction.
+ // NOTE(review): '-' presumably marks references acknowledged within the transaction - confirm.
+ protected String selectMessageIdForAck = "SELECT MESSAGEID FROM JMS_MESSAGE_REFERENCE "
+ + "WHERE TRANSACTIONID = ? AND STATE = '-' ";
+
+ // Channel id of the reference rows for the transaction. The column is CHANNELID, the same
+ // spelling selectAllChannels uses against this table (it was misspelled CHANNELLID here).
+ protected String selectChannelId = "SELECT CHANNELID FROM JMS_MESSAGE_REFERENCE "
+ + "WHERE TRANSACTIONID = ?";
+
// Static --------------------------------------------------------
// Attributes ----------------------------------------------------
@@ -617,8 +628,117 @@
}
}
-
-
+ /**
+ * Returns the ChannelMapper referenced by <tt>cm</tt>.
+ *
+ * @return the current value of <tt>cm</tt>, returned as-is
+ */
+ public ChannelMapper getChannelMapperReference()
+ {
+ return cm;
+ }
+
+ /**
+ * Runs the <tt>selectMessageIdForRef</tt> query for the given transaction id and
+ * returns column 1 of its result, delegating to <tt>getId</tt>.
+ *
+ * @param transactionId value bound to the first parameter of the query
+ * @return the value produced by <tt>getId</tt> for that query
+ * @throws Exception any exception propagated from <tt>getId</tt>
+ */
+ public long getMessageIdForRef(long transactionId) throws Exception
+ {
+ return getId(selectMessageIdForRef, transactionId, 1);
+ }
+ /**
+ * Runs the <tt>selectMessageIdForAck</tt> query for the given transaction id and
+ * returns column 1 of its result, delegating to <tt>getId</tt>.
+ *
+ * @param transactionId value bound to the first parameter of the query
+ * @return the value produced by <tt>getId</tt> for that query
+ * @throws Exception any exception propagated from <tt>getId</tt>
+ */
+ public long getMessageIdForAck(long transactionId) throws Exception
+ {
+ return getId(selectMessageIdForAck, transactionId, 1);
+ }
+
+ /**
+ * Runs the <tt>selectChannelId</tt> query for the given transaction id and
+ * returns column 1 of its result, delegating to <tt>getId</tt>.
+ *
+ * @param transactionId value bound to the first parameter of the query
+ * @return the value produced by <tt>getId</tt> for that query
+ * @throws Exception any exception propagated from <tt>getId</tt>
+ */
+ public long getChannelId(long transactionId) throws Exception
+ {
+ return getId(selectChannelId, transactionId, 1);
+ }
+
+
+ /**
+ * Makes sure the given transaction has a TransactionCallback registered under
+ * this persistence manager as key: if <tt>tx.getKeyedCallback(this)</tt> returns
+ * null, a new TransactionCallback is created for the transaction and added with
+ * this instance as the key; otherwise nothing is changed.
+ *
+ * Per the original author's note, this method is called from the
+ * TransactionRepository once a transaction object is resurrected.
+ *
+ * @param tx the transaction to associate a callback with
+ */
+ public void associateTxCallbackToPreparedTx(Transaction tx)
+ {
+ // Look for a callback already registered under this persistence manager.
+ TransactionCallback callback = (TransactionCallback) tx.getKeyedCallback(this);
+
+ if (callback == null)
+ {
+ // None registered yet: create one and register it under the same key.
+ callback = new TransactionCallback(tx);
+
+ tx.addKeyedCallback(callback, this);
+ }
+ }
+
+ /**
+ * MK
+ * Executes the given query with the transaction id bound to its first parameter
+ * and returns one column of the result as a long.
+ *
+ * If the query yields several rows, the value read from the last row is returned;
+ * if it yields no rows, 0 is returned.
+ *
+ * @param sqlQuery SQL text to prepare; its first parameter receives the transaction id
+ * @param transactionId value bound to parameter 1 of the statement
+ * @param index column index passed to <tt>ResultSet.getLong</tt> (JDBC column indexes are 1-based)
+ * @return the long read from the last row, or 0 when there are no rows
+ * @throws Exception any exception raised while obtaining the connection or running the
+ * query; it is rethrown after <tt>exceptionOccurred()</tt> is called on the wrapper
+ */
+ public long getId(String sqlQuery, long transactionId, int index) throws Exception
+ {
+ if (trace) { log.trace("loading references for tx [" + transactionId + "]"); }
+
+ Connection conn = null;
+ PreparedStatement ps = null;
+ ResultSet rs = null;
+ TransactionWrapper wrap = new TransactionWrapper();
+ // Default result when the query returns no rows.
+ long messageId = 0;
+
+ try
+ {
+ conn = ds.getConnection();
+
+ ps = conn.prepareStatement(sqlQuery);
+
+ ps.setLong(1, transactionId);
+
+ rs = ps.executeQuery();
+
+ // Every row overwrites the previous value, so only the last row's value is kept.
+ while(rs.next())
+ {
+ messageId = rs.getLong(index);
+ }
+
+ return messageId;
+ }
+ catch (Exception e)
+ {
+ // Tell the wrapper about the failure before propagating it to the caller.
+ wrap.exceptionOccurred();
+ throw e;
+ }
+ finally
+ {
+ // Release the result set, statement and connection in that order; anything thrown
+ // by a close() call is ignored so the remaining resources still get released.
+ if (rs != null)
+ {
+ try
+ {
+ rs.close();
+ }
+ catch (Throwable e)
+ {
+ }
+ }
+ if (ps != null)
+ {
+ try
+ {
+ ps.close();
+ }
+ catch (Throwable e)
+ {
+ }
+ }
+ if (conn != null)
+ {
+ try
+ {
+ conn.close();
+ }
+ catch (Throwable e)
+ {
+ }
+ }
+ // Runs on both the normal and the exceptional path.
+ wrap.end();
+ }
+ }
+
+
/*
* Retrieve a List of messages corresponding to the specified List of message ids.
* The implementation here for HSQLDB does this by using a PreparedStatment with an IN clause
@@ -1983,6 +2103,7 @@
Connection conn = null;
Statement st = null;
ResultSet rs = null;
+ PreparedTxInfo txInfo = null;
TransactionWrapper wrap = new TransactionWrapper();
try
@@ -1996,12 +2117,17 @@
while (rs.next())
{
+ //get the existing tx id --MK START
+ long txId = rs.getLong(1);
+
byte[] branchQual = rs.getBytes(2);
int formatId = rs.getInt(3);
byte[] globalTxId = rs.getBytes(4);
Xid xid = new XidImpl(branchQual, formatId, globalTxId);
- transactions.add(xid);
+ // create a tx info object with the result set details
+ txInfo = new PreparedTxInfo(txId, xid);
+ transactions.add(txInfo);
}
return transactions;
@@ -2443,6 +2569,11 @@
//Other
selectAllChannels = sqlProperties.getProperty("SELECT_ALL_CHANNELS", selectAllChannels);
+
+ //recovery
+ selectMessageIdForRef = sqlProperties.getProperty("SELECT_MESSAGEID_FOR_REFS",selectMessageIdForRef);
+ selectMessageIdForAck = sqlProperties.getProperty("SELECT_MESSAGEID_FOR_ACKS",selectMessageIdForAck);
+ selectChannelId = sqlProperties.getProperty("SELECT_CHANNEL_ID",selectChannelId );
}
protected TransactionCallback getCallback(Transaction tx)
More information about the jboss-cvs-commits
mailing list