[jboss-cvs] JBoss Messaging SVN: r1593 - branches/Branch_1_0_XARecovery/src/main/org/jboss/messaging/core/tx
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Mon Nov 20 06:52:41 EST 2006
Author: juha at jboss.org
Date: 2006-11-20 06:52:39 -0500 (Mon, 20 Nov 2006)
New Revision: 1593
Modified:
branches/Branch_1_0_XARecovery/src/main/org/jboss/messaging/core/tx/TransactionRepository.java
Log:
Original patch (JBMessaging-407) for TransactionRepository changes with comments. Requires JMX kernel integration fix and verification of correct message recovery semantics.
Modified: branches/Branch_1_0_XARecovery/src/main/org/jboss/messaging/core/tx/TransactionRepository.java
===================================================================
--- branches/Branch_1_0_XARecovery/src/main/org/jboss/messaging/core/tx/TransactionRepository.java 2006-11-20 11:29:44 UTC (rev 1592)
+++ branches/Branch_1_0_XARecovery/src/main/org/jboss/messaging/core/tx/TransactionRepository.java 2006-11-20 11:52:39 UTC (rev 1593)
@@ -30,7 +30,18 @@
import org.jboss.logging.Logger;
import org.jboss.messaging.core.plugin.IdManager;
+import org.jboss.messaging.core.plugin.SimpleMessageStore;
import org.jboss.messaging.core.plugin.contract.PersistenceManager;
+import org.jboss.messaging.core.plugin.contract.MessageStore;
+import org.jboss.messaging.core.MessageReference;
+import org.jboss.messaging.core.Message;
+import org.jboss.messaging.core.SimpleDelivery;
+import org.jboss.messaging.core.Delivery;
+import org.jboss.messaging.core.DeliveryObserver;
+import org.jboss.messaging.core.local.CoreDestination;
+import org.jboss.jms.server.plugin.contract.ChannelMapper;
+import org.jboss.jms.server.plugin.JDBCChannelMapper;
+import org.jboss.jms.destination.JBossDestination;
import EDU.oswego.cs.dl.util.concurrent.ConcurrentReaderHashMap;
@@ -39,6 +50,8 @@
* This class maintains JMS Server local transactions.
*
* @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * @author <a href="mailto:Konda.Madhu at uk.mizuho-sc.com">Madhu Konda</a>
+ *
* @version $Revision 1.1 $
*
* $Id$
@@ -82,48 +95,87 @@
{
}
- public List getPreparedTransactions()
+ public List getPreparedTransactions()
{
- ArrayList prepared = new ArrayList();
- Iterator iter = globalToLocalMap.values().iterator();
- while (iter.hasNext())
+
+ ArrayList prepared = new ArrayList();
+
+ Iterator iter = globalToLocalMap.values().iterator();
+
+ while (iter.hasNext())
{
- Transaction tx = (Transaction)iter.next();
- if (tx.xid != null && tx.getState() == Transaction.STATE_PREPARED)
+ Transaction tx = (Transaction) iter.next();
+
+ try
{
- prepared.add(tx.getXid());
- }
- }
- return prepared;
- }
-
- /*
- * Load any prepared transactions into the repository so they can be recovered
- */
- public void loadPreparedTransactions() throws Exception
- {
- List prepared = null;
-
- prepared = persistenceManager.retrievePreparedTransactions();
-
- if (prepared != null)
- {
- Iterator iter = prepared.iterator();
-
- while (iter.hasNext())
+ if(trace)
+ log.trace("Loading and handling refs and acks to the Tx "+tx);
+
+ // TODO: [JPL] should this only apply to STATE_PREPARED transactions?
+
+ handleReferences(tx, tx.getId());
+ handleAcks(tx, tx.getId());
+ }
+ catch (Exception e)
{
- Xid xid = (Xid)iter.next();
- Transaction tx = createTransaction(xid);
- tx.state = Transaction.STATE_PREPARED;
-
- //Load the references for this transaction
- }
- }
- }
-
+ // TODO: [JPL] fix this
+ e.printStackTrace();
+ }
+
+ if (tx.xid != null && tx.getState() == Transaction.STATE_PREPARED)
+ {
+ prepared.add(tx.getXid());
+ }
+ }
+
+ return prepared;
+ }
+
+ /*
+ * Loads any prepared transactions into the repository so they can be
+ * recovered.
+ *
+ * Asks the persistence manager for its prepared transactions and, for
+ * each PreparedTxInfo returned, creates a local Transaction for it,
+ * marks that transaction STATE_PREPARED and has the persistence manager
+ * associate a transaction callback with it. A null list from the
+ * persistence manager is treated as "nothing to load".
+ *
+ * The references and acks belonging to each transaction are NOT loaded
+ * here (see the TODO at the end of the loop body).
+ *
+ * Throws Exception if retrieving the prepared transactions, creating a
+ * transaction or associating the callback fails.
+ */
+ public void loadPreparedTransactions() throws Exception {
+ List prepared = null;
+
+ prepared = persistenceManager.retrievePreparedTransactions();
+
+ if (prepared != null) {
+ Iterator iter = prepared.iterator();
+
+ while (iter.hasNext()) {
+ PreparedTxInfo txInfo = (PreparedTxInfo) iter.next();
+
+ Transaction tx = createTransaction(txInfo);
+ tx.state = Transaction.STATE_PREPARED;
+
+ // We have to associate a callback with the prepared
+ // transaction.
+ // --MK
+ persistenceManager.associateTxCallbackToPreparedTx(tx);
+ // log.info(">>>ASSOCIATING THE MEMCALLBACK <<<");
+
+ // TODO: [JPL] needs to be completed.
+
+ // associateInMemoryCallbackToPreparedTx(tx);
+ // TODO: load the references for this transaction
+
+ }
+ }
+ }
+
+
public Transaction getPreparedTx(Xid xid) throws Exception
{
- Transaction tx = (Transaction)globalToLocalMap.get(xid);
+ // The Xid passed in by Arjuna's RM is oddly formatted, so it cannot be
+ // used for the lookup directly; we rebuild it as our own XidImpl first.
+ // TODO: look into this more carefully later.
+
+ Xid x = new XidImpl(xid.getBranchQualifier(), xid.getFormatId(), xid
+ .getGlobalTransactionId());
+
+ Transaction tx = (Transaction)globalToLocalMap.get(x);
+
if (tx == null)
{
throw new TransactionException("Cannot find local tx for xid:" + xid);
@@ -153,9 +205,6 @@
}
globalToLocalMap.remove(id);
-
-
-
}
public Transaction createTransaction(Xid xid) throws Exception
@@ -180,8 +229,21 @@
return tx;
}
-
-
+
+
+ // TODO: [JPL] BEGIN REMOVE ------------------------------------------------
+
+ // [JPL] This is the wrong way to get to the store: it builds a new
+ // SimpleMessageStore("store.0") on every call, ignores the configuration
+ // settings, and needs to be replaced by a lookup via the kernel.
+
+ public MessageStore getMessageStore()
+ {
+ return new SimpleMessageStore("store.0");
+ }
+
+ // TODO: [JPL] END REMOVE --------------------------------------------------
+
+
/** To be used only by testcases */
public int getNumberOfRegisteredTransactions()
{
@@ -193,7 +255,143 @@
// Protected -----------------------------------------------------
// Private -------------------------------------------------------
-
+
+ /**
+ * Loads the message references recorded for a transaction and passes
+ * each of them to the transaction's channel.
+ *
+ * A single message id is looked up for the transaction via the
+ * persistence manager, resolved to references with getRefs(), and every
+ * resulting reference is handed to CoreDestination.handle() with a null
+ * first argument and the given transaction.
+ *
+ * NOTE(review): the channel is looked up again on every iteration even
+ * though the lookup arguments do not change inside the loop.
+ *
+ * @param tx the transaction the references are handled under
+ * @param txId the id used to look up the message id and channel id
+ * @throws Exception if a persistence lookup, the channel lookup or the
+ * handle call fails
+ */
+ private void handleReferences(Transaction tx, long txId) throws Exception {
+
+ long messageId = persistenceManager.getMessageIdForRef(txId);
+
+ List refsList = getRefs(messageId);
+
+ // We now have all the refs;
+ // for each ref loaded, invoke handle() on the channel.
+ for (Iterator iter = refsList.iterator(); iter.hasNext();)
+ {
+ CoreDestination d = getChannel(persistenceManager.getChannelId(txId), txId);
+
+ if (trace)
+ log.trace("Handling the channel");
+
+ d.handle(null, (MessageReference) iter.next(), tx);
+ }
+ }
+
+ /**
+ * Loads the acknowledged message references recorded for a transaction
+ * and acknowledges each of them under that transaction.
+ *
+ * A single message id is looked up for the transaction via the
+ * persistence manager and resolved to references with getRefs(). Each
+ * reference is wrapped in a SimpleDelivery (created with a null first
+ * argument) which is then acknowledged.
+ *
+ * Any Throwable raised while acknowledging a delivery is caught, its
+ * stack trace is printed, and processing continues with the next
+ * reference; such failures are not propagated to the caller.
+ *
+ * NOTE(review): the delivery is created as a SimpleDelivery and then cast
+ * to DeliveryObserver. Confirm that SimpleDelivery implements
+ * DeliveryObserver; if it does not, the resulting ClassCastException is
+ * swallowed by the catch block below.
+ *
+ * @param tx the transaction the acknowledgements are made under
+ * @param txId the id used to look up the acknowledged message id
+ * @throws Exception if the persistence lookup or getRefs() fails
+ */
+ private void handleAcks(Transaction tx, long txId) throws Exception {
+
+ long messageId = persistenceManager.getMessageIdForAck(txId);
+
+ List refsList = getRefs(messageId);
+
+ for (Iterator iter = refsList.iterator(); iter.hasNext();)
+ {
+ Delivery del = new SimpleDelivery(null, (MessageReference) iter.next());
+
+ try
+ {
+ if(trace)
+ log.trace("Acknowledging..");
+
+ ((DeliveryObserver)del).acknowledge(del, tx);
+ }
+ catch (Throwable e)
+ {
+ // TODO: [JPL] fix this: failures are only printed, never logged or rethrown
+ e.printStackTrace();
+ }
+ }
+ }
+
+ /**
+ * Returns the message references for the given message id.
+ *
+ * The message store is asked for a reference first. If it returns null,
+ * the message id is passed to the persistence manager, and every message
+ * it returns is registered with the store to obtain a reference.
+ *
+ * Note that persistenceManager.getMessages() is called even when the
+ * store already supplied the reference, in which case it receives an
+ * empty id list.
+ *
+ * @param messageId the id of the message to obtain references for
+ * @return a list of MessageReference objects
+ * @throws Exception if the store or the persistence manager fails
+ */
+ private List getRefs(long messageId) throws Exception
+ {
+ List noRefsList = new ArrayList();
+ List refsList = new ArrayList();
+
+ // Find the message store.
+ // TODO: [JPL] this needs to be fixed to go through the kernel
+ MessageStore ms = getMessageStore();
+
+ // Ask the store for the message reference.
+ MessageReference ref = ms.reference(messageId);
+
+ // The store sometimes doesn't know about the message reference, in which
+ // case the ref above is null. We then need to load the actual message
+ // by going back to the database and loading it by id.
+
+ if (ref == null)
+ {
+ noRefsList.add(new Long(messageId));
+ }
+ else
+ {
+ refsList.add(ref);
+ }
+
+ // Ask the persistence manager for the messages whose ids the store did not know.
+ List messagesList = persistenceManager.getMessages(noRefsList);
+
+ for (Iterator iter = messagesList.iterator(); iter.hasNext();)
+ {
+ Message m = (Message) iter.next();
+ MessageReference r = ms.reference(m);
+ refsList.add(r);
+ }
+
+ return refsList;
+ }
+
+ /**
+ * Returns the core destination for the given channel id.
+ *
+ * The channel mapper is obtained from the persistence manager, the
+ * channel id is translated to a JBossDestination, and that destination
+ * is translated to its CoreDestination.
+ *
+ * NOTE(review): the mapper is downcast to JDBCChannelMapper, so this
+ * assumes the persistence manager always hands back that implementation.
+ *
+ * @param channelId the id of the channel to resolve
+ * @param txId the transaction id; not used by this method
+ * @return the result of mapper.getCoreDestination() for the destination
+ * @throws Exception if any of the lookups fails
+ */
+ private CoreDestination getChannel(long channelId, long txId)
+ throws Exception {
+
+ // First get the reference to the channel mapper.
+ ChannelMapper mapper = persistenceManager.getChannelMapperReference();
+
+ // Find the destination for the channel id.
+ JBossDestination destn = ((JDBCChannelMapper)mapper).getJBossDestination(channelId);
+
+ // Get the core destination from the JBossDestination.
+ return mapper.getCoreDestination(destn);
+
+ }
+
+ /**
+ * Creates a local transaction for a prepared transaction and registers
+ * it in globalToLocalMap under the prepared transaction's Xid.
+ *
+ * The new Transaction is built from the transaction id and Xid carried
+ * by txInfo. This method does not set the transaction's state.
+ *
+ * @param txInfo supplies the transaction id and Xid of the prepared
+ * transaction
+ * @return the newly created and registered transaction
+ * @throws Exception a TransactionException if globalToLocalMap already
+ * contains an entry for the Xid
+ */
+ private Transaction createTransaction(PreparedTxInfo txInfo) throws Exception
+ {
+ if (globalToLocalMap.containsKey(txInfo.getXid()))
+ {
+ throw new TransactionException(
+ "There is already a local tx for global tx " + txInfo.getXid());
+ }
+
+ // Resurrected tx: built from the persisted transaction id and Xid
+ Transaction tx = new Transaction(txInfo.getTxId(), txInfo.getXid(), this);
+
+ if (trace) {
+ log.trace("created transaction " + tx);
+ }
+
+ globalToLocalMap.put(txInfo.getXid(), tx);
+
+ return tx;
+ }
+
// Inner classes -------------------------------------------------
}
\ No newline at end of file
More information about the jboss-cvs-commits
mailing list