[jboss-cvs] jboss-jms/src/main/org/jboss/jms/tx ...

Timothy Fox tim.fox at jboss.com
Mon Jul 17 13:14:47 EDT 2006


  User: timfox  
  Date: 06/07/17 13:14:47

  Modified:    src/main/org/jboss/jms/tx     AckInfo.java
                        MessagingXAResource.java ResourceManager.java
                        TxState.java
  Log:
  Many changes including implementation of prefetch, SEDAisation of server, changing of recovery
  
  Revision  Changes    Path
  1.12      +20 -3     jboss-jms/src/main/org/jboss/jms/tx/AckInfo.java
  
  (In the diff below, changes in quantity of whitespace are not shown.)
  
  Index: AckInfo.java
  ===================================================================
  RCS file: /cvsroot/jboss/jboss-jms/src/main/org/jboss/jms/tx/AckInfo.java,v
  retrieving revision 1.11
  retrieving revision 1.12
  diff -u -b -r1.11 -r1.12
  --- AckInfo.java	25 Mar 2006 05:09:16 -0000	1.11
  +++ AckInfo.java	17 Jul 2006 17:14:47 -0000	1.12
  @@ -26,13 +26,15 @@
   import java.io.ObjectInput;
   import java.io.ObjectOutput;
   
  +import org.jboss.jms.message.MessageProxy;
  +
   /**
    * Struct like class for holding information regarding an acknowledgement to be passed to the server
    * for processing.
    * 
    * @author <a href="mailto:tim.fox at jboss.com>Tim Fox </a>
    *
  - * $Id: AckInfo.java,v 1.11 2006/03/25 05:09:16 ovidiu Exp $
  + * $Id: AckInfo.java,v 1.12 2006/07/17 17:14:47 timfox Exp $
    */
   public class AckInfo implements Externalizable
   {
  @@ -46,6 +48,9 @@
      
      protected int consumerID;
   
  +   //The actual proxy must not get serialized
  +   protected transient MessageProxy msg;
  +   
      // Static --------------------------------------------------------
      
      // Constructors --------------------------------------------------
  @@ -54,6 +59,13 @@
      {      
      }
      
  +   public AckInfo(MessageProxy proxy, int consumerID)
  +   {
  +      this.msg = proxy;
  +      this.messageID = proxy.getMessage().getMessageID();
  +      this.consumerID = consumerID;    
  +   }
  +   
      public AckInfo(long messageID, int consumerID)
      {
         this.messageID = messageID;
  @@ -72,6 +84,11 @@
         return consumerID;
      }
   
  +   public MessageProxy getMessage()
  +   {
  +      return msg;
  +   }
  +
      public String toString()
      {
         return "AckInfo[" + messageID + ", " + consumerID + "]";
  
  
  
  1.4       +3 -0      jboss-jms/src/main/org/jboss/jms/tx/MessagingXAResource.java
  
  (In the diff below, changes in quantity of whitespace are not shown.)
  
  Index: MessagingXAResource.java
  ===================================================================
  RCS file: /cvsroot/jboss/jboss-jms/src/main/org/jboss/jms/tx/MessagingXAResource.java,v
  retrieving revision 1.3
  retrieving revision 1.4
  diff -u -b -r1.3 -r1.4
  --- MessagingXAResource.java	29 Apr 2006 02:43:41 -0000	1.3
  +++ MessagingXAResource.java	17 Jul 2006 17:14:47 -0000	1.4
  @@ -162,6 +162,9 @@
      {
         if (trace) { log.trace(this + " rolling back " + xid); }
   
  +      //TODO: On rollback, should we also stop and start the consumers to remove any transient messages,
  +      //as we do on local session rollback?
  +      
         rm.rollback(xid, connection);
      }
   
  
  
  
  1.21      +82 -2     jboss-jms/src/main/org/jboss/jms/tx/ResourceManager.java
  
  (In the diff below, changes in quantity of whitespace are not shown.)
  
  Index: ResourceManager.java
  ===================================================================
  RCS file: /cvsroot/jboss/jboss-jms/src/main/org/jboss/jms/tx/ResourceManager.java,v
  retrieving revision 1.20
  retrieving revision 1.21
  diff -u -b -r1.20 -r1.21
  --- ResourceManager.java	29 Apr 2006 02:43:41 -0000	1.20
  +++ ResourceManager.java	17 Jul 2006 17:14:47 -0000	1.21
  @@ -21,6 +21,13 @@
     */
   package org.jboss.jms.tx;
   
  +import java.util.ArrayList;
  +import java.util.Iterator;
  +import java.util.LinkedHashMap;
  +import java.util.LinkedList;
  +import java.util.List;
  +import java.util.Map;
  +
   import javax.jms.IllegalStateException;
   import javax.jms.JMSException;
   import javax.jms.Message;
  @@ -30,6 +37,7 @@
   import javax.transaction.xa.Xid;
   
   import org.jboss.jms.delegate.ConnectionDelegate;
  +import org.jboss.jms.delegate.SessionDelegate;
   import org.jboss.jms.util.MessagingXAException;
   import org.jboss.logging.Logger;
   
  @@ -47,9 +55,9 @@
    *
    * @author <a href="mailto:Cojonudo14 at hotmail.com">Hiram Chirino</a>
    * @author <a href="mailto:adrian at jboss.org">Adrian Brock</a>
  - * @version $Revision: 1.20 $
  + * @version $Revision: 1.21 $
    *
  - * $Id: ResourceManager.java,v 1.20 2006/04/29 02:43:41 ovidiu Exp $
  + * $Id: ResourceManager.java,v 1.21 2006/07/17 17:14:47 timfox Exp $
    */
   public class ResourceManager
   {
  @@ -119,6 +127,8 @@
       * 
       * @param xid - The id of the transaction to add the message to
       * @param ackInfo Information describing the acknowledgement
  +    * @param sessionState - the session the ack is in - we need this so on rollback we can tell each session
  +    * to redeliver its messages
       */
      public void addAck(Object xid, AckInfo ackInfo) throws JMSException
      {
  @@ -170,6 +180,8 @@
            new TransactionRequest(TransactionRequest.ONE_PHASE_ROLLBACK_REQUEST, null, tx);
         
         connection.sendTransaction(request);
  +      
  +      redeliverMessages(tx);
      }
      
      public void commit(Xid xid, boolean onePhase, ConnectionDelegate connection) throws XAException
  @@ -252,8 +264,76 @@
         }
         
         sendTransactionXA(request, connection);
  +      
  +      try
  +      {
  +         redeliverMessages(tx);
  +      }
  +      catch (JMSException e)
  +      {
  +         log.error("Failed to redeliver", e);
  +      }
  +   }
  +   
  +   
  +   /*
  +    * Rollback has occurred so we need to redeliver any unacked messages corresponding to the acks
  +    * in the transaction
  +    */
  +   private void redeliverMessages(TxState tx) throws JMSException
  +   {
  +      Iterator iter = tx.getAcks().iterator();
  +      
  +      //Sort them into lists - one for each session
  +        
  +      //We use a LinkedHashMap since we need to preserve the order of the sessions
  +      Map toAck = new LinkedHashMap();
  +      
  +      while (iter.hasNext())
  +      {
  +         AckInfo ack = (AckInfo)iter.next();
  +         
  +         SessionDelegate del = ack.msg.getSessionDelegate();
  +         
  +         List acks = (List)toAck.get(del);
  +         
  +         if (acks == null)
  +         {
  +            acks = new ArrayList();
  +            
  +            toAck.put(del, acks);
  +         }
  +         
  +         acks.add(ack);
      }
      
  +      //Now tell each session to redeliver
  +      
  +      LinkedList l = new LinkedList();
  +      
  +      iter = toAck.entrySet().iterator();
  +                  
  +      //need to reverse the order
  +      while (iter.hasNext())
  +      {
  +         Object entry = iter.next();
  +         
  +         l.addFirst(entry);         
  +      }
  +      
  +      iter = l.iterator();
  +      
  +      while (iter.hasNext())
  +      {
  +         Map.Entry entry = (Map.Entry)iter.next();
  +         
  +         SessionDelegate sess = (SessionDelegate)entry.getKey();
  +         
  +         List acks = (List)entry.getValue();
  +         
  +         sess.redeliver(acks);
  +      }  
  +   }
   
      // Protected ------------------------------------------------------
      
  
  
  
  1.9       +4 -1      jboss-jms/src/main/org/jboss/jms/tx/TxState.java
  
  (In the diff below, changes in quantity of whitespace are not shown.)
  
  Index: TxState.java
  ===================================================================
  RCS file: /cvsroot/jboss/jboss-jms/src/main/org/jboss/jms/tx/TxState.java,v
  retrieving revision 1.8
  retrieving revision 1.9
  diff -u -b -r1.8 -r1.9
  --- TxState.java	20 Apr 2006 20:42:27 -0000	1.8
  +++ TxState.java	17 Jul 2006 17:14:47 -0000	1.9
  @@ -26,9 +26,12 @@
   import java.io.ObjectInput;
   import java.io.ObjectOutput;
   import java.util.ArrayList;
  +import java.util.HashSet;
   import java.util.Iterator;
   import java.util.List;
  +import java.util.Set;
   
  +import org.jboss.jms.client.state.SessionState;
   import org.jboss.jms.message.JBossMessage;
   import org.jboss.messaging.core.message.MessageFactory;
   
  
  
  



More information about the jboss-cvs-commits mailing list