[jboss-cvs] JBoss Messaging SVN: r1741 - branches/Branch_1_0/src/main/org/jboss/messaging/core/tx
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Sat Dec 9 13:46:51 EST 2006
Author: juha at jboss.org
Date: 2006-12-09 13:46:50 -0500 (Sat, 09 Dec 2006)
New Revision: 1741
Modified:
branches/Branch_1_0/src/main/org/jboss/messaging/core/tx/TransactionRepository.java
Log:
Rewrite of the patch for JBMESSAGING-407. Unhandled messages should now be handled correctly with respect to the ongoing transaction and the underlying channels. The methods getRefs and getChannel from the original patch have been removed.
Modified: branches/Branch_1_0/src/main/org/jboss/messaging/core/tx/TransactionRepository.java
===================================================================
--- branches/Branch_1_0/src/main/org/jboss/messaging/core/tx/TransactionRepository.java 2006-12-09 18:17:15 UTC (rev 1740)
+++ branches/Branch_1_0/src/main/org/jboss/messaging/core/tx/TransactionRepository.java 2006-12-09 18:46:50 UTC (rev 1741)
@@ -28,7 +28,6 @@
import javax.transaction.xa.Xid;
import org.jboss.jms.destination.JBossDestination;
-import org.jboss.jms.server.plugin.JDBCChannelMapper;
import org.jboss.jms.server.plugin.contract.ChannelMapper;
import org.jboss.logging.Logger;
import org.jboss.messaging.core.Delivery;
@@ -97,7 +96,13 @@
{
}
- public List getPreparedTransactions()
+ /**
+ * Attempts to recover existing prepared transactions by redelivering unhandled messages and acknowledgements
+ * on the appropriate channels.
+ *
+ * @return List of Xid instances
+ */
+ public List recoverPreparedTransactions()
{
ArrayList prepared = new ArrayList();
@@ -111,15 +116,14 @@
{
try
{
- if(trace)
- log.trace("Loading and handling refs and acks to the Tx "+tx);
+ if(trace) log.trace("Loading and handling refs and acks to the Tx "+tx);
- handleReferences(tx, tx.getId());
- handleAcks(tx, tx.getId());
+ handleReferences(tx);
+ handleAcks(tx);
}
catch (Exception e)
{
- log.debug("Exception in replaying a prepared transaction.", e);
+ log.warn("Failed to replay transaction (XID: " + tx.getXid() + ", LocalID: " + tx.getId() + ") during recovery.", e);
}
prepared.add(tx.getXid());
@@ -135,30 +139,24 @@
*/
public void loadPreparedTransactions() throws Exception
{
+ if (trace) log.trace("load prepared transactions...");
+
List prepared = null;
prepared = persistenceManager.retrievePreparedTransactions();
+ if (trace) log.trace ("Found " + prepared.size() + " transactions in prepared state:");
+
if (prepared != null) {
Iterator iter = prepared.iterator();
while (iter.hasNext()) {
PreparedTxInfo txInfo = (PreparedTxInfo) iter.next();
+ if (trace) log.trace("Reinstating TX(XID: " + txInfo.getXid() + ", LocalId " + txInfo.getTxId() +")");
+
Transaction tx = createTransaction(txInfo);
tx.state = Transaction.STATE_PREPARED;
-
- // // we have to associate a callback to the prepared
- // transaction
- // // --MK
- persistenceManager.associateTxCallbackToPreparedTx(tx);
- // log.info(">>>ASSOCIATING THE MEMCALLBACK <<<");
-
- // TODO: [JPL] needs to be completed.
-
- // associateInMemoryCallbackToPreparedTx(tx);
- // Load the references for this transaction
-
}
}
}
@@ -236,42 +234,55 @@
/**
* Load the references and invoke the channel to handle those refs
*/
- private void handleReferences(Transaction tx, long txId) throws Exception {
+ private void handleReferences(Transaction tx) throws Exception {
- long messageId = persistenceManager.getMessageIdForRef(txId);
+ if (trace) log.trace("Handle references for TX(XID: " + tx.getXid() + ", LocalID: " + tx.getId()+ "):");
- List refsList = getRefs(messageId);
+ long txId = tx.getId();
- // now we got all the refs
- // for each ref loaded, we'll invoke channel.handle
- for (Iterator iter = refsList.iterator(); iter.hasNext();)
- {
- CoreDestination d = getChannel(persistenceManager.getChannelId(txId), txId);
+ Map messageChannelPair = persistenceManager.getMessageChannelPairForTx(txId);
- if (trace)
- log.trace("Handling the channel");
+ if (trace) log.trace("Found " + messageChannelPair.size() + " unhandled messages.");
- d.handle(null, (MessageReference) iter.next(), tx);
+ for (Iterator iter = messageChannelPair.keySet().iterator(); iter.hasNext();)
+ {
+ Message msg = (Message)iter.next();
+ MessageReference ref = messageStore.reference(msg);
+
+ long channelID = ((Long)messageChannelPair.get(msg)).longValue();
+
+ ChannelMapper mapper = persistenceManager.getChannels();
+ JBossDestination destn = mapper.getJBossDestination(channelID);
+
+ CoreDestination d = mapper.getCoreDestination(destn);
+
+ if (trace) log.trace("Destination for message[ID=" + ref.getMessageID() + "] is: " + d);
+
+ d.handle(null, ref, tx);
}
}
/**
* Load the acks and acknowledge them
*/
- private void handleAcks(Transaction tx, long txId) throws Exception {
+ private void handleAcks(Transaction tx) throws Exception {
- long messageId = persistenceManager.getMessageIdForAck(txId);
+ long txId = tx.getId();
- List refsList = getRefs(messageId);
+ List messages = persistenceManager.getAcksForTx(txId);
- for (Iterator iter = refsList.iterator(); iter.hasNext();)
- {
- Delivery del = new SimpleDelivery(null, (MessageReference) iter.next());
+ if (trace) log.trace("Found " + messages.size() + " unhandled acks.");
+ for (Iterator iter = messages.iterator(); iter.hasNext();)
+ {
+ Message msg = (Message)iter.next();
+ MessageReference ref = messageStore.reference(msg);
+
+ Delivery del = new SimpleDelivery(null, ref);
+
try
{
- if(trace)
- log.trace("Acknowledging..");
+ if(trace) log.trace("Acknowledging..");
((DeliveryObserver)del).acknowledge(del, tx);
}
@@ -282,62 +293,8 @@
}
}
- /**
- * Get the message references based on the messageId from database
- */
- private List getRefs(long messageId) throws Exception
- {
- List noRefsList = new ArrayList();
- List refsList = new ArrayList();
- // and message reference from store
- MessageReference ref = messageStore.reference(messageId);
-
- // Store, sometime, does'nt know about the message referece
- // and the above ref may be null. Hence we need to load actual message
- // by goind back to the database and loading them based on id
-
- if (ref == null)
- {
- noRefsList.add(new Long(messageId));
- }
- else
- {
- refsList.add(ref);
- }
-
- // ask the pm to get the messages from messageId list
- List messagesList = persistenceManager.getMessages(noRefsList);
-
- for (Iterator iter = messagesList.iterator(); iter.hasNext();)
- {
- Message m = (Message) iter.next();
- MessageReference r = messageStore.reference(m);
- refsList.add(r);
- }
-
- return refsList;
- }
-
/**
- * This method returns a core destination representation based on
- * the channel id and transaction id
- */
- private CoreDestination getChannel(long channelId, long txId)
- throws Exception {
-
- // first get the reference to the channel mapper
- ChannelMapper mapper = persistenceManager.getChannelMapperReference();
-
- // find out the destination from channelId
- JBossDestination destn = ((JDBCChannelMapper)mapper).getJBossDestination(channelId);
-
- // get the core destination from jbossdestination
- return mapper.getCoreDestination(destn);
-
- }
-
- /**
* Creates a prepared transaction
*
* @param txInfo
More information about the jboss-cvs-commits
mailing list