[jboss-cvs] JBoss Messaging SVN: r1593 - branches/Branch_1_0_XARecovery/src/main/org/jboss/messaging/core/tx

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Mon Nov 20 06:52:41 EST 2006


Author: juha at jboss.org
Date: 2006-11-20 06:52:39 -0500 (Mon, 20 Nov 2006)
New Revision: 1593

Modified:
   branches/Branch_1_0_XARecovery/src/main/org/jboss/messaging/core/tx/TransactionRepository.java
Log:
Original patch (JBMessaging-407) for TransactionRepository changes with comments. Requires JMX kernel integration fix and verification of correct message recovery semantics.

Modified: branches/Branch_1_0_XARecovery/src/main/org/jboss/messaging/core/tx/TransactionRepository.java
===================================================================
--- branches/Branch_1_0_XARecovery/src/main/org/jboss/messaging/core/tx/TransactionRepository.java	2006-11-20 11:29:44 UTC (rev 1592)
+++ branches/Branch_1_0_XARecovery/src/main/org/jboss/messaging/core/tx/TransactionRepository.java	2006-11-20 11:52:39 UTC (rev 1593)
@@ -30,7 +30,18 @@
 
 import org.jboss.logging.Logger;
 import org.jboss.messaging.core.plugin.IdManager;
+import org.jboss.messaging.core.plugin.SimpleMessageStore;
 import org.jboss.messaging.core.plugin.contract.PersistenceManager;
+import org.jboss.messaging.core.plugin.contract.MessageStore;
+import org.jboss.messaging.core.MessageReference;
+import org.jboss.messaging.core.Message;
+import org.jboss.messaging.core.SimpleDelivery;
+import org.jboss.messaging.core.Delivery;
+import org.jboss.messaging.core.DeliveryObserver;
+import org.jboss.messaging.core.local.CoreDestination;
+import org.jboss.jms.server.plugin.contract.ChannelMapper;
+import org.jboss.jms.server.plugin.JDBCChannelMapper;
+import org.jboss.jms.destination.JBossDestination;
 
 import EDU.oswego.cs.dl.util.concurrent.ConcurrentReaderHashMap;
 
@@ -39,6 +50,8 @@
  * This class maintains JMS Server local transactions.
  * 
  * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * @author <a href="mailto:Konda.Madhu at uk.mizuho-sc.com">Madhu Konda</a>
+ *
  * @version $Revision 1.1 $
  *
  * $Id$
@@ -82,48 +95,87 @@
    {
    }
 
-   public List getPreparedTransactions()
+	public List getPreparedTransactions()
    {
-      ArrayList prepared = new ArrayList();
-      Iterator iter = globalToLocalMap.values().iterator();
-      while (iter.hasNext())
+
+		ArrayList prepared = new ArrayList();
+
+		Iterator iter = globalToLocalMap.values().iterator();
+
+		while (iter.hasNext())
       {
-         Transaction tx = (Transaction)iter.next();
-         if (tx.xid != null && tx.getState() == Transaction.STATE_PREPARED)
+			Transaction tx = (Transaction) iter.next();
+
+			try
          {
-            prepared.add(tx.getXid());
-         }
-      }
-      return prepared;
-   }
-   
-   /*
-    * Load any prepared transactions into the repository so they can be recovered
-    */
-   public void loadPreparedTransactions() throws Exception
-   {
-      List prepared = null;
-            
-      prepared = persistenceManager.retrievePreparedTransactions();
-      
-      if (prepared != null)
-      {         
-         Iterator iter = prepared.iterator();
-         
-         while (iter.hasNext())
+				if(trace)
+					log.trace("Loading and handling refs and acks to the Tx "+tx);
+
+            // TODO: [JPL] should this only apply to STATE_PREPARED transactions?
+
+				handleReferences(tx, tx.getId());
+				handleAcks(tx, tx.getId());
+			}
+         catch (Exception e)
          {
-            Xid xid = (Xid)iter.next();
-            Transaction tx = createTransaction(xid);            
-            tx.state = Transaction.STATE_PREPARED;
-            
-            //Load the references for this transaction
-         }
-      }
-   }
-         
+            // TODO: [JPL] fix this
+				e.printStackTrace();
+			}
+
+			if (tx.xid != null && tx.getState() == Transaction.STATE_PREPARED)
+         {
+				prepared.add(tx.getXid());
+			}
+		}
+
+		return prepared;
+	}
+
+	/**
+	 * Restores the transactions that the persistence manager reports as
+	 * prepared: each one is registered in this repository, put in the prepared
+	 * state and handed back to the persistence manager for callback association.
+	 *
+	 * @throws Exception propagated from the persistence manager or from
+	 *            createTransaction
+	 */
+	public void loadPreparedTransactions() throws Exception {
+		List preparedInfos = persistenceManager.retrievePreparedTransactions();
+
+		// a null list is treated as "nothing to restore"
+		if (preparedInfos == null) {
+			return;
+		}
+
+		for (Iterator it = preparedInfos.iterator(); it.hasNext();) {
+			PreparedTxInfo info = (PreparedTxInfo) it.next();
+
+			// createTransaction also records the tx in globalToLocalMap
+			Transaction restored = createTransaction(info);
+			restored.state = Transaction.STATE_PREPARED;
+
+			// the prepared transaction needs a callback associated with it (--MK)
+			persistenceManager.associateTxCallbackToPreparedTx(restored);
+
+			// TODO: [JPL] needs to be completed: the in-memory callback
+			// (associateInMemoryCallbackToPreparedTx) is not associated yet and
+			// the references for this transaction are not loaded here.
+
+		}
+	}
+
+
    public Transaction getPreparedTx(Xid xid) throws Exception
    {
-      Transaction tx = (Transaction)globalToLocalMap.get(xid);
+		// the incoming Xid from Arjuna's RM is funny formatted!
+		// hence we have to do our way
+		// I have to look into this carefully a bit later.
+
+		Xid x = new XidImpl(xid.getBranchQualifier(), xid.getFormatId(), xid
+				.getGlobalTransactionId());
+
+      Transaction tx = (Transaction)globalToLocalMap.get(x);
+
       if (tx == null)
       {
          throw new TransactionException("Cannot find local tx for xid:" + xid);
@@ -153,9 +205,6 @@
 	   }
 	   
 	   globalToLocalMap.remove(id);
-	   
-	   
-	   
    }
    
    public Transaction createTransaction(Xid xid) throws Exception
@@ -180,8 +229,21 @@
 
       return tx;
    }
-   
-   
+
+
+   // TODO: [JPL] BEGIN REMOVE ------------------------------------------------
+
+   /** [JPL] this is a wrong way of getting to the store: it ignores the
+    *  configuration settings; the store needs to be retrieved via the kernel. */
+	public MessageStore getMessageStore()
+	{
+		MessageStore hardWired = new SimpleMessageStore("store.0");
+		return hardWired;
+	}
+
+   // TODO: [JPL] END REMOVE --------------------------------------------------
+
+
    /** To be used only by testcases */
    public int getNumberOfRegisteredTransactions()
    {
@@ -193,7 +255,143 @@
    // Protected -----------------------------------------------------         
    
    // Private -------------------------------------------------------
-   
+
+	/**
+	 * Resolves the message recorded as a reference for the given transaction
+	 * and passes each resulting reference to the transaction's channel.
+	 *
+	 * @param tx the transaction handed to the channel along with each reference
+	 * @param txId the id used to look up the message id and the channel id
+	 * @throws Exception propagated from the lookups or from the channel
+	 */
+	private void handleReferences(Transaction tx, long txId) throws Exception {
+		List pending = getRefs(persistenceManager.getMessageIdForRef(txId));
+		Iterator refs = pending.iterator();
+
+		while (refs.hasNext()) {
+			// the channel is looked up again for every reference
+			long channelId = persistenceManager.getChannelId(txId);
+			CoreDestination channel = getChannel(channelId, txId);
+			if (trace) { log.trace("Handling the channel"); }
+			MessageReference ref = (MessageReference) refs.next();
+			channel.handle(null, ref, tx);
+		}
+	}
+
+	/**
+	 * Resolves the message recorded as an acknowledgement for the given
+	 * transaction and acknowledges a delivery for each resulting reference. A
+	 * failed acknowledgement is logged; the remaining ones are still attempted.
+	 *
+	 * @param tx the transaction each acknowledgement is made under
+	 * @param txId the id used to look up the acknowledged message id
+	 * @throws Exception propagated from the message id or reference lookups
+	 */
+	private void handleAcks(Transaction tx, long txId) throws Exception {
+
+		long messageId = persistenceManager.getMessageIdForAck(txId);
+		List refsList = getRefs(messageId);
+
+		for (Iterator iter = refsList.iterator(); iter.hasNext();) {
+			Delivery del = new SimpleDelivery(null, (MessageReference) iter.next());
+
+			try {
+				if (trace) { log.trace("Acknowledging.."); }
+				// NOTE(review): the cast presumes SimpleDelivery is also a DeliveryObserver - confirm
+				((DeliveryObserver)del).acknowledge(del, tx);
+			} catch (Throwable e) {
+				// report through the class logger, keeping the cause, instead of stderr
+				log.error("Failed to acknowledge delivery in transaction " + txId, e);
+			}
+		}
+	}
+
+	/**
+	 * Produces the message references for a single message id.
+	 *
+	 * The message store is asked first. When it has no reference for the id,
+	 * the message is loaded through the persistence manager and a reference is
+	 * obtained from the store for every message that comes back.
+	 *
+	 * @param messageId id of the message to resolve
+	 * @return a new list holding the references that were found
+	 * @throws Exception propagated from the store or the persistence manager
+	 */
+	private List getRefs(long messageId) throws Exception
+	{
+		// TODO: [JPL] this needs to be fixed to go through the kernel
+		MessageStore store = getMessageStore();
+
+		List resolved = new ArrayList();
+		List idsToLoad = new ArrayList();
+
+		// the store does not always know the message, in which case the
+		// lookup yields null and the id is queued for loading instead
+		MessageReference known = store.reference(messageId);
+		if (known != null)
+		{
+			resolved.add(known);
+		}
+		else
+		{
+			idsToLoad.add(new Long(messageId));
+		}
+
+		// the persistence manager is consulted even when idsToLoad is empty
+		Iterator loaded = persistenceManager.getMessages(idsToLoad).iterator();
+		while (loaded.hasNext())
+		{
+			resolved.add(store.reference((Message) loaded.next()));
+		}
+
+		return resolved;
+	}
+
+	/**
+	 * Translates a channel id into its core destination via the channel mapper
+	 * supplied by the persistence manager.
+	 *
+	 * @param channelId id of the channel to resolve
+	 * @param txId not used by this method
+	 * @return whatever the mapper returns as core destination
+	 * @throws Exception propagated from the mapper lookups
+	 */
+	private CoreDestination getChannel(long channelId, long txId)
+			throws Exception {
+		ChannelMapper channelMapper = persistenceManager.getChannelMapperReference();
+		// NOTE(review): the cast presumes the mapper is always a JDBCChannelMapper - confirm
+		JDBCChannelMapper jdbcMapper = (JDBCChannelMapper) channelMapper;
+
+		return channelMapper.getCoreDestination(jdbcMapper.getJBossDestination(channelId));
+	}
+
+	/**
+	 * Rebuilds a transaction from a PreparedTxInfo and registers it in the
+	 * global-to-local map under its Xid.
+	 *
+	 * @param txInfo supplies the transaction id and the Xid of the transaction
+	 * @return the newly created transaction
+	 * @throws TransactionException if the Xid is already mapped to a local
+	 *            transaction
+	 */
+	private Transaction createTransaction(PreparedTxInfo txInfo) throws Exception
+	{
+		boolean alreadyMapped = globalToLocalMap.containsKey(txInfo.getXid());
+
+		if (alreadyMapped)
+		{
+			String problem = "There is already a local tx for global tx "	+ txInfo.getXid();
+			throw new TransactionException(problem);
+		}
+
+		// Resurrected tx
+		Transaction resurrected = new Transaction(txInfo.getTxId(), txInfo.getXid(), this);
+		if (trace) { log.trace("created transaction " + resurrected); }
+
+		globalToLocalMap.put(txInfo.getXid(), resurrected);
+		return resurrected;
+	}
+
    // Inner classes -------------------------------------------------
    
 }
\ No newline at end of file




More information about the jboss-cvs-commits mailing list