[jboss-cvs] JBoss Messaging SVN: r1741 - branches/Branch_1_0/src/main/org/jboss/messaging/core/tx

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Sat Dec 9 13:46:51 EST 2006


Author: juha at jboss.org
Date: 2006-12-09 13:46:50 -0500 (Sat, 09 Dec 2006)
New Revision: 1741

Modified:
   branches/Branch_1_0/src/main/org/jboss/messaging/core/tx/TransactionRepository.java
Log:
Rewrite of the patch for JBMESSAGING-407. Unhandled messages should now be correctly handled with respect to the ongoing transaction and the underlying channels. The methods getRefs and getChannel from the original patch have been removed.

Modified: branches/Branch_1_0/src/main/org/jboss/messaging/core/tx/TransactionRepository.java
===================================================================
--- branches/Branch_1_0/src/main/org/jboss/messaging/core/tx/TransactionRepository.java	2006-12-09 18:17:15 UTC (rev 1740)
+++ branches/Branch_1_0/src/main/org/jboss/messaging/core/tx/TransactionRepository.java	2006-12-09 18:46:50 UTC (rev 1741)
@@ -28,7 +28,6 @@
 import javax.transaction.xa.Xid;
 
 import org.jboss.jms.destination.JBossDestination;
-import org.jboss.jms.server.plugin.JDBCChannelMapper;
 import org.jboss.jms.server.plugin.contract.ChannelMapper;
 import org.jboss.logging.Logger;
 import org.jboss.messaging.core.Delivery;
@@ -97,7 +96,13 @@
    {
    }
 
-	public List getPreparedTransactions()
+   /**
+    * Attempts to recover existing prepared transactions by redelivering unhandled messages and acknowledgements
+    * on the appropriate channels.
+    *
+    * @return  List of Xid instances
+    */
+	public List recoverPreparedTransactions()
    {
 		ArrayList prepared = new ArrayList();
 
@@ -111,15 +116,14 @@
          {
             try
             {
-               if(trace)
-                  log.trace("Loading and handling refs and acks to the Tx "+tx);
+               if(trace) log.trace("Loading and handling refs and acks to the Tx "+tx);
 
-               handleReferences(tx, tx.getId());
-               handleAcks(tx, tx.getId());
+               handleReferences(tx);
+               handleAcks(tx);
             }
             catch (Exception e)
             {
-               log.debug("Exception in replaying a prepared transaction.", e);
+               log.warn("Failed to replay transaction (XID: " + tx.getXid() + ", LocalID: " + tx.getId() + ") during recovery.", e);
             }
 
 				prepared.add(tx.getXid());
@@ -135,30 +139,24 @@
 	 */
 	public void loadPreparedTransactions() throws Exception
    {
+      if (trace) log.trace("load prepared transactions...");
+
 		List prepared = null;
 
 		prepared = persistenceManager.retrievePreparedTransactions();
 
+      if (trace) log.trace ("Found " + prepared.size() + " transactions in prepared state:");
+
 		if (prepared != null) {
 			Iterator iter = prepared.iterator();
 
 			while (iter.hasNext()) {
 				PreparedTxInfo txInfo = (PreparedTxInfo) iter.next();
 
+            if (trace) log.trace("Reinstating TX(XID: " + txInfo.getXid() + ", LocalId " + txInfo.getTxId() +")");
+            
 				Transaction tx = createTransaction(txInfo);
 				tx.state = Transaction.STATE_PREPARED;
-
-				// // we have to associate a callback to the prepared
-				// transaction
-				// // --MK
-				persistenceManager.associateTxCallbackToPreparedTx(tx);
-				// log.info(">>>ASSOCIATING THE MEMCALLBACK <<<");
-
-            // TODO: [JPL] needs to be completed.
-
-            // associateInMemoryCallbackToPreparedTx(tx);
-				// Load the references for this transaction
-
 			}
 		}
 	}
@@ -236,42 +234,55 @@
 	/**
 	 * Load the references and invoke the channel to handle those refs
 	 */
-	private void handleReferences(Transaction tx, long txId) throws Exception {
+	private void handleReferences(Transaction tx) throws Exception {
 
-		long messageId = persistenceManager.getMessageIdForRef(txId);
+      if (trace) log.trace("Handle references for TX(XID: " + tx.getXid() + ", LocalID: " + tx.getId()+ "):");
 
-		List refsList = getRefs(messageId);
+      long txId = tx.getId();
 
-		// now we got all the refs
-		// for each ref loaded, we'll invoke channel.handle
-		for (Iterator iter = refsList.iterator(); iter.hasNext();)
-		{
-			CoreDestination d = getChannel(persistenceManager.getChannelId(txId), txId);
+		Map messageChannelPair = persistenceManager.getMessageChannelPairForTx(txId);
 
-         if (trace)
-			   log.trace("Handling the channel");
+      if (trace) log.trace("Found " + messageChannelPair.size() + " unhandled messages.");
 
-         d.handle(null, (MessageReference) iter.next(), tx);
+      for (Iterator iter = messageChannelPair.keySet().iterator(); iter.hasNext();)
+      {
+         Message msg = (Message)iter.next();
+         MessageReference ref = messageStore.reference(msg);
+
+         long channelID = ((Long)messageChannelPair.get(msg)).longValue();
+
+         ChannelMapper mapper = persistenceManager.getChannels();
+         JBossDestination destn = mapper.getJBossDestination(channelID);
+
+         CoreDestination d = mapper.getCoreDestination(destn);
+
+         if (trace) log.trace("Destination for message[ID=" + ref.getMessageID() + "] is: " + d);
+
+         d.handle(null, ref, tx);
 		}
 	}
 
 	/**
 	 * Load the acks and acknowledge them
 	 */
-	private void handleAcks(Transaction tx, long txId) throws Exception {
+	private void handleAcks(Transaction tx) throws Exception {
 
-		long messageId = persistenceManager.getMessageIdForAck(txId);
+      long txId = tx.getId();
 
-		List refsList = getRefs(messageId);
+		List messages = persistenceManager.getAcksForTx(txId);
 
-		for (Iterator iter = refsList.iterator(); iter.hasNext();)
-		{
-			Delivery del = new SimpleDelivery(null, (MessageReference) iter.next());
+      if (trace) log.trace("Found " + messages.size() + " unhandled acks.");
 
+      for (Iterator iter = messages.iterator(); iter.hasNext();)
+      {
+         Message msg = (Message)iter.next();
+         MessageReference ref = messageStore.reference(msg);
+
+			Delivery del = new SimpleDelivery(null, ref);
+
 			try
          {
-				if(trace)
-					log.trace("Acknowledging..");
+				if(trace) log.trace("Acknowledging..");
 
 				((DeliveryObserver)del).acknowledge(del, tx);
 			}
@@ -282,62 +293,8 @@
 		}
 	}
 
-	/**
-	 * Get the message references based on the messageId from database
-	 */
-	private List getRefs(long messageId) throws Exception
-	{
-		List noRefsList = new ArrayList();
-		List refsList = new ArrayList();
 
-		// and message reference from store
-		MessageReference ref = messageStore.reference(messageId);
-
-		// Store, sometime, does'nt know about the message referece
-		// and the above ref may be null. Hence we need to load actual message
-		// by goind back to the database and loading them based on id
-
-		if (ref == null)
-		{
-			noRefsList.add(new Long(messageId));
-		}
-		else
-		{
-			refsList.add(ref);
-		}
-
-		// ask the pm to get the messages from messageId list
-		List messagesList = persistenceManager.getMessages(noRefsList);
-
-		for (Iterator iter = messagesList.iterator(); iter.hasNext();)
-		{
-			Message m = (Message) iter.next();
-			MessageReference r = messageStore.reference(m);
-			refsList.add(r);
-		}
-
-		return refsList;
-	}
-
 	/**
-	 * This method returns a core destination representation based on
-	 * the channel id and transaction id
-	 */
-	private CoreDestination getChannel(long channelId, long txId)
-			throws Exception {
-
-		// first get the reference to the channel mapper
-		ChannelMapper mapper = persistenceManager.getChannelMapperReference();
-
-		// find out the destination from channelId
-		JBossDestination destn = ((JDBCChannelMapper)mapper).getJBossDestination(channelId);
-
-		// get the core destination from jbossdestination
-		return mapper.getCoreDestination(destn);
-
-	}
-
-	/**
 	 * Creates a prepared transaction
 	 *
 	 * @param txInfo




More information about the jboss-cvs-commits mailing list