[jboss-cvs] jboss-jms/src/main/org/jboss/jms/client/container ...

Timothy Fox tim.fox at jboss.com
Mon Jul 17 13:14:44 EDT 2006


  User: timfox  
  Date: 06/07/17 13:14:44

  Modified:    src/main/org/jboss/jms/client/container       AsfAspect.java
                        ConsumerAspect.java ProducerAspect.java
                        SessionAspect.java StateCreationAspect.java
                        TransactionAspect.java
  Log:
  Many changes including implementation of prefetch, SEDAisation of server, changing of recovery
  
  Revision  Changes    Path
  1.12      +4 -2      jboss-jms/src/main/org/jboss/jms/client/container/AsfAspect.java
  
  (In the diff below, changes in quantity of whitespace are not shown.)
  
  Index: AsfAspect.java
  ===================================================================
  RCS file: /cvsroot/jboss/jboss-jms/src/main/org/jboss/jms/client/container/AsfAspect.java,v
  retrieving revision 1.11
  retrieving revision 1.12
  diff -u -b -r1.11 -r1.12
  --- AsfAspect.java	6 May 2006 05:05:39 -0000	1.11
  +++ AsfAspect.java	17 Jul 2006 17:14:44 -0000	1.12
  @@ -51,7 +51,7 @@
    *  
    * @author <a href="mailto:tim.fox at jboss.com>Tim Fox</a>
    *
  - * $Id: AsfAspect.java,v 1.11 2006/05/06 05:05:39 ovidiu Exp $
  + * $Id: AsfAspect.java,v 1.12 2006/07/17 17:14:44 timfox Exp $
    */
   public class AsfAspect
   {
  @@ -162,7 +162,9 @@
   
            if (trace) { log.trace("sending " + holder.msg + " to the message listener" ); }
            
  -         MessageCallbackHandler.callOnMessage(holder.consumerDelegate, del, sessionListener, holder.consumerID, false, holder.msg, ackMode);                          
  +         MessageCallbackHandler.callOnMessage(holder.consumerDelegate, del,
  +                                              sessionListener, holder.consumerID, false,
  +                                              holder.msg, ackMode);                          
         }
         
         return null;
  
  
  
  1.13      +34 -10    jboss-jms/src/main/org/jboss/jms/client/container/ConsumerAspect.java
  
  (In the diff below, changes in quantity of whitespace are not shown.)
  
  Index: ConsumerAspect.java
  ===================================================================
  RCS file: /cvsroot/jboss/jboss-jms/src/main/org/jboss/jms/client/container/ConsumerAspect.java,v
  retrieving revision 1.12
  retrieving revision 1.13
  diff -u -b -r1.12 -r1.13
  --- ConsumerAspect.java	6 May 2006 05:05:39 -0000	1.12
  +++ ConsumerAspect.java	17 Jul 2006 17:14:44 -0000	1.13
  @@ -24,13 +24,17 @@
   import org.jboss.aop.joinpoint.Invocation;
   import org.jboss.aop.joinpoint.MethodInvocation;
   import org.jboss.jms.client.delegate.DelegateSupport;
  -import org.jboss.jms.client.remoting.MessageCallbackHandler;
   import org.jboss.jms.client.remoting.CallbackManager;
  +import org.jboss.jms.client.remoting.MessageCallbackHandler;
   import org.jboss.jms.client.state.ConnectionState;
   import org.jboss.jms.client.state.ConsumerState;
   import org.jboss.jms.client.state.SessionState;
   import org.jboss.jms.delegate.ConsumerDelegate;
   import org.jboss.jms.delegate.SessionDelegate;
  +import org.jboss.jms.server.endpoint.ServerBrowserEndpoint;
  +import org.jboss.logging.Logger;
  +
  +import EDU.oswego.cs.dl.util.concurrent.QueuedExecutor;
   
   /**
    * 
  @@ -40,14 +44,17 @@
    * 
    * @author <a href="mailto:ovidiu at jboss.org">Ovidiu Feodorov</a>
    * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
  - * @version <tt>$Revision: 1.12 $</tt>
  + * @version <tt>$Revision: 1.13 $</tt>
    *
  - * $Id: ConsumerAspect.java,v 1.12 2006/05/06 05:05:39 ovidiu Exp $
  + * $Id: ConsumerAspect.java,v 1.13 2006/07/17 17:14:44 timfox Exp $
    */
   public class ConsumerAspect
   {
      // Constants -----------------------------------------------------
   
  +   private static final Logger log = Logger.getLogger(ConsumerAspect.class);
  +
  +   
      // Static --------------------------------------------------------
   
      // Attributes ----------------------------------------------------
  @@ -71,32 +78,50 @@
         SessionDelegate sessionDelegate = (SessionDelegate)invocation.getTargetObject();
         ConsumerState consumerState = (ConsumerState)((DelegateSupport)consumerDelegate).getState();
         int consumerID = consumerState.getConsumerID();
  +      int prefetchSize = consumerState.getPrefetchSize();
  +      QueuedExecutor sessionExecutor = sessionState.getExecutor();
         
         MessageCallbackHandler messageHandler =
            new MessageCallbackHandler(isCC, sessionState.getAcknowledgeMode(),
  -                                    sessionState.getExecutor(), connectionState.getPooledExecutor(),
  -                                    sessionDelegate, consumerDelegate, consumerID);
  +                                    sessionDelegate, consumerDelegate, consumerID,
  +                                    prefetchSize, sessionExecutor);
  +      
  +      sessionState.addCallbackHandler(messageHandler);
         
         CallbackManager cm = connectionState.getRemotingConnection().getCallbackManager();
         cm.registerHandler(consumerID, messageHandler);
            
         consumerState.setMessageCallbackHandler(messageHandler);
   
  +      //Now we have finished creating the client consumer, we can tell the SCD
  +      //we are ready
  +      consumerDelegate.more();
  +      
         return consumerDelegate;
      }
      
      public Object handleClosing(Invocation invocation) throws Throwable
      {      
  +      //First we make sure closing is called on the ServerConsumerEndpoint
  +      //This ensures that any in transit messages are flushed out to the client side
  +      Object res = invocation.invokeNext();
  +      
         ConsumerState consumerState = getState(invocation);
         
  -      ConnectionState connectionState = (ConnectionState)consumerState.getParent().getParent();
  +      SessionState sessionState = (SessionState)consumerState.getParent();
  +      
  +      ConnectionState connectionState = (ConnectionState)sessionState.getParent();
               
  +      //Then we call close on the messagecallbackhandler which waits for onMessage invocations
  +      //to complete and then cancels anything in the client buffer
         consumerState.getMessageCallbackHandler().close();
   
  +      sessionState.removeCallbackHandler(consumerState.getMessageCallbackHandler());
  +
         CallbackManager cm = connectionState.getRemotingConnection().getCallbackManager();
         cm.unregisterHandler(consumerState.getConsumerID());
               
  -      return invocation.invokeNext();
  +      return res;
      }
      
      public Object handleGetDestination(Invocation invocation) throws Throwable
  @@ -114,7 +139,6 @@
         return getState(invocation).getSelector();
      }
      
  -
      // Package protected ---------------------------------------------
   
      // Protected -----------------------------------------------------
  
  
  
  1.18      +18 -2     jboss-jms/src/main/org/jboss/jms/client/container/ProducerAspect.java
  
  (In the diff below, changes in quantity of whitespace are not shown.)
  
  Index: ProducerAspect.java
  ===================================================================
  RCS file: /cvsroot/jboss/jboss-jms/src/main/org/jboss/jms/client/container/ProducerAspect.java,v
  retrieving revision 1.17
  retrieving revision 1.18
  diff -u -b -r1.17 -r1.18
  --- ProducerAspect.java	6 May 2006 05:05:39 -0000	1.17
  +++ ProducerAspect.java	17 Jul 2006 17:14:44 -0000	1.18
  @@ -53,9 +53,9 @@
    * This aspect is PER_INSTANCE.
    *
    * @author <a href="mailto:tim.fox at jboss.com>Tim Fox</a>
  - * @version <tt>$Revision: 1.17 $</tt>
  + * @version <tt>$Revision: 1.18 $</tt>
    *
  - * $Id: ProducerAspect.java,v 1.17 2006/05/06 05:05:39 ovidiu Exp $
  + * $Id: ProducerAspect.java,v 1.18 2006/07/17 17:14:44 timfox Exp $
    */
   public class ProducerAspect
   {   
  @@ -169,6 +169,8 @@
         JBossMessage messageToSend;
         boolean foreign = false;
   
  +      boolean doCopy = false;
  +      
         if (!(m instanceof MessageProxy))
         {
            // it's a foreign message
  @@ -211,6 +213,12 @@
            // get the actual message
            MessageProxy proxy = (MessageProxy)m;
            messageToSend = proxy.getMessage();
  +                  
  +         if (proxy.isSent())
  +         {
  +            doCopy = true;
  +         }
  +         
            messageToSend.doAfterSend();
            proxy.setSent();
         }
  @@ -224,6 +232,14 @@
         messageToSend.setJMSMessageID(null);
         messageToSend.setMessageId(id);
   
  +      //If the message has already been sent we need to make a shallow copy since if we are invm then we do not
  +      //want to change the ids of messages already sent - which would happen if we were sending the same
  +      //underlying instance
  +      if (doCopy)
  +      {
  +         messageToSend = messageToSend.doShallowCopy();
  +      }
  +
         // now that we know the messageID, set it also on the foreign message, if is the case
         if (foreign)
         {
  
  
  
  1.10      +181 -14   jboss-jms/src/main/org/jboss/jms/client/container/SessionAspect.java
  
  (In the diff below, changes in quantity of whitespace are not shown.)
  
  Index: SessionAspect.java
  ===================================================================
  RCS file: /cvsroot/jboss/jboss-jms/src/main/org/jboss/jms/client/container/SessionAspect.java,v
  retrieving revision 1.9
  retrieving revision 1.10
  diff -u -b -r1.9 -r1.10
  --- SessionAspect.java	27 Apr 2006 17:38:09 -0000	1.9
  +++ SessionAspect.java	17 Jul 2006 17:14:44 -0000	1.10
  @@ -21,14 +21,20 @@
     */
   package org.jboss.jms.client.container;
   
  +import java.util.LinkedList;
  +import java.util.List;
  +
   import javax.jms.IllegalStateException;
   import javax.jms.Session;
   
   import org.jboss.aop.joinpoint.Invocation;
   import org.jboss.aop.joinpoint.MethodInvocation;
  -import org.jboss.jms.client.state.SessionState;
   import org.jboss.jms.client.delegate.DelegateSupport;
  +import org.jboss.jms.client.remoting.MessageCallbackHandler;
  +import org.jboss.jms.client.state.SessionState;
   import org.jboss.jms.delegate.SessionDelegate;
  +import org.jboss.jms.message.MessageProxy;
  +import org.jboss.jms.tx.AckInfo;
   import org.jboss.logging.Logger;
   import org.jboss.messaging.util.Util;
   
  @@ -39,7 +45,7 @@
    *
    * @author <a href="mailto:tim.fox at jboss.com>Tim Fox</a>
    *
  - * $Id: SessionAspect.java,v 1.9 2006/04/27 17:38:09 ovidiu Exp $
  + * $Id: SessionAspect.java,v 1.10 2006/07/17 17:14:44 timfox Exp $
    */
   public class SessionAspect
   {
  @@ -57,47 +63,208 @@
      
      // Public --------------------------------------------------------
   
  +   public Object handlePreDeliver(Invocation invocation) throws Throwable
  +   { 
  +      MethodInvocation mi = (MethodInvocation)invocation;
  +      
  +      SessionState state = getState(invocation);
  +      
  +      int ackMode = state.getAcknowledgeMode();
  +      
  +      if (ackMode == Session.CLIENT_ACKNOWLEDGE || ackMode == Session.AUTO_ACKNOWLEDGE ||
  +          ackMode == Session.DUPS_OK_ACKNOWLEDGE)
  +      {
  +         SessionDelegate del = (SessionDelegate)mi.getTargetObject();
  +         
  +         //We store the ack in a list for later acknowledgement or recovery
  +         
  +         Object[] args = mi.getArguments();
  +         
  +         MessageProxy mp = (MessageProxy)args[0];
  +         
  +         int consumerID = ((Integer)args[1]).intValue();
  +         
  +         AckInfo info = new AckInfo(mp, consumerID);
  +         
  +         state.getToAck().add(info);
  +         
  +         if (trace) { log.trace("ack mode is " + Util.acknowledgmentModeToString(ackMode)+ ", acknowledged on " + del); }
  +      }
  +
  +      return invocation.invokeNext();
  +   }
  +   
  +   public Object handleAcknowledgeAll(Invocation invocation) throws Throwable
  +   {    
  +      MethodInvocation mi = (MethodInvocation)invocation;
  +      
  +      SessionState state = getState(invocation);
  +      
  +      SessionDelegate del = (SessionDelegate)mi.getTargetObject();
  +    
  +      if (!state.getToAck().isEmpty())
  +      {                  
  +         del.acknowledgeBatch(state.getToAck());
  +      
  +         state.getToAck().clear();
  +      }
  +        
  +      return null;
  +   }
      
      public Object handlePostDeliver(Invocation invocation) throws Throwable
      { 
         MethodInvocation mi = (MethodInvocation)invocation;
         
  -      int ackMode = getState(invocation).getAcknowledgeMode();
  +      SessionState state = getState(invocation);
  +      
  +      int ackMode = state.getAcknowledgeMode();
         
  -      if (ackMode != Session.SESSION_TRANSACTED && ackMode != Session.CLIENT_ACKNOWLEDGE)
  +      if (ackMode == Session.AUTO_ACKNOWLEDGE || ackMode == Session.DUPS_OK_ACKNOWLEDGE)
         {
            SessionDelegate del = (SessionDelegate)mi.getTargetObject();
            
            //We acknowledge immediately
  -         del.acknowledge();
  +         
  +         if (!state.isRecoverCalled())
  +         {
  +            //We don't acknowledge the message if recover() was called
  +            
  +            Object[] args = mi.getArguments();
  +            
  +            MessageProxy proxy = (MessageProxy)args[0];
  +                   
  +            int consumerID = ((Integer)args[1]).intValue();
  +
  +            AckInfo ack = new AckInfo(proxy, consumerID);
  +            
  +            del.acknowledge(ack);   
  +               
  +            state.getToAck().clear();            
  +         }
  +         else
  +         {
  +            state.setRecoverCalled(false);
  +         }
            if (trace) { log.trace("ack mode is " + Util.acknowledgmentModeToString(ackMode)+ ", acknowledged on " + del); }
         }
   
         return null;
      }
      
  -   
  +   /*
  +    * Called when session.recover is called
  +    */
      public Object handleRecover(Invocation invocation) throws Throwable
      {
         if (trace) { log.trace("recover called"); }
         
  -      int ackMode = getState(invocation).getAcknowledgeMode();
  +      MethodInvocation mi = (MethodInvocation)invocation;
  +            
  +      SessionState state = getState(invocation);
  +      
  +      int ackMode = state.getAcknowledgeMode();
         
         if (ackMode == Session.SESSION_TRANSACTED)
         {
            throw new IllegalStateException("Cannot recover a transacted session");
         }
         
  -      //Tell the server to redeliver any un-acked messages
  -      if (trace) { log.trace("redelivering messages"); }
  +      if (trace) { log.trace("recovering the session"); }
  +       
  +      //Call redeliver
  +      SessionDelegate del = (SessionDelegate)mi.getTargetObject();
  +      
  +      del.redeliver(state.getToAck());
  +            
  +      state.getToAck().clear();
  +
  +      state.setRecoverCalled(true);
  +      
  +      return null;  
  +   }
  +   
  +   /*
  +    * Redelivery occurs in two situations:
  +    * 1) When session.recover() is called (JMS1.1 4.4.11)
  +    * "A session’s recover method is used to stop a session and restart it with its first
  +    * unacknowledged message. In effect, the session’s series of delivered messages
  +    * is reset to the point after its last acknowledged message."
  +    * An important note here is that session recovery is LOCAL to the session.
  +    * Session recovery DOES NOT result in delivered messages being cancelled back
  +    * to the channel where they can be redelivered - since that may result in them being
  +    * picked up by another session, which would break the semantics of recovery as described
  +    * in the spec.
  +    * 2) When session rollback occurs (JMS1.1 4.4.7)
  +    * On rollback of a session the spec is clear that session recovery occurs:
  +    * "If a transaction rollback is done, its produced messages
  +    * are destroyed and its consumed messages are automatically recovered. For
  +    * more information on session recovery, see Section 4.4.11 ”Message
  +    * Acknowledgment.”"
  +    * So on rollback we do session recovery (local redelivery) in the same way as if 
  +    * session.recover() was called.
  +    * 
  +    * There is a conflict here though. It seems a CTS test requires messages to be available to OTHER
  +    * sessions on rollback - see CTSMiscellaneousTest.testContestedQueueOnRollback()
  +    * Which seems in direct contradiction to the spec.
  +    * 
  +    * In order to satisfy the test, on session recovery, if there are no local consumers available
  +    * to consume the message, we cancel the message back to the channel.
  +    */
  +   public Object handleRedeliver(Invocation invocation) throws Throwable
  +   {
  +      if (trace) { log.trace("redeliver called"); }
         
         MethodInvocation mi = (MethodInvocation)invocation;
         
  -      SessionDelegate del = (SessionDelegate)mi.getTargetObject();
  +      SessionState state = getState(invocation);
         
  -      if (trace) { log.trace("Calling sessiondelegate.redeliver()"); }
  +      //We put the messages back in the front of their appropriate consumer buffers and
  +      //set JMSRedelivered to true
         
  -      del.cancelDeliveries();
  +      List toRedeliver = (List)mi.getArguments()[0];
  +       
  +      LinkedList toCancel = new LinkedList();
  +      
  +      //Need to be recovered in reverse order
  +      for (int i = toRedeliver.size() - 1; i >= 0; i--)
  +      {
  +         AckInfo info = (AckInfo)toRedeliver.get(i);
  +         
  +         MessageProxy proxy = info.getMessage();
  +         
  +         proxy.setJMSRedelivered(true);
  +         
  +         //TODO delivery count although optional should be global
  +         //so we need to send it back to the server
  +         //but this has performance hit so perhaps we just don't support it?
  +         proxy.incDeliveryCount();
  +         
  +         MessageCallbackHandler handler = state.getCallbackHandler(info.getConsumerID());
  +              
  +         if (handler == null)
  +         {
  +            //This is ok.
  +            
  +            //The original consumer has closed, this message will get cancelled
  +            //back to the channel
  +            
  +            toCancel.addFirst(info);            
  +         }
  +         else
  +         {
  +            handler.addToFrontOfBuffer(proxy);
  +         }                                    
  +      }
  +      
  +      if (!toCancel.isEmpty())
  +      {
  +         //Cancel the messages that can't be redelivered locally
  +         
  +         SessionDelegate del = (SessionDelegate)mi.getTargetObject();
  +         
  +         del.cancelDeliveries(toCancel);
  +      }
         
         return null;  
      }
  
  
  
  1.18      +9 -4      jboss-jms/src/main/org/jboss/jms/client/container/StateCreationAspect.java
  
  (In the diff below, changes in quantity of whitespace are not shown.)
  
  Index: StateCreationAspect.java
  ===================================================================
  RCS file: /cvsroot/jboss/jboss-jms/src/main/org/jboss/jms/client/container/StateCreationAspect.java,v
  retrieving revision 1.17
  retrieving revision 1.18
  diff -u -b -r1.17 -r1.18
  --- StateCreationAspect.java	23 May 2006 18:25:07 -0000	1.17
  +++ StateCreationAspect.java	17 Jul 2006 17:14:44 -0000	1.18
  @@ -26,6 +26,7 @@
   import org.jboss.aop.Advised;
   import org.jboss.aop.joinpoint.Invocation;
   import org.jboss.aop.joinpoint.MethodInvocation;
  +import org.jboss.aop.metadata.SimpleMetaData;
   import org.jboss.jms.client.delegate.ClientConnectionDelegate;
   import org.jboss.jms.client.delegate.ClientConnectionFactoryDelegate;
   import org.jboss.jms.client.delegate.ClientProducerDelegate;
  @@ -58,7 +59,7 @@
    * @author <a href="mailto:tim.fox at jboss.com>Tim Fox</a>
    * @author <a href="mailto:ovidiu at jboss.org>Ovidiu Feodorov</a>
    *
  - * $Id: StateCreationAspect.java,v 1.17 2006/05/23 18:25:07 ovidiu Exp $
  + * $Id: StateCreationAspect.java,v 1.18 2006/07/17 17:14:44 timfox Exp $
    */
   public class StateCreationAspect
   {
  @@ -131,13 +132,17 @@
         boolean noLocal = ((Boolean)mi.getArguments()[2]).booleanValue();    
         boolean connectionConsumer = ((Boolean)mi.getArguments()[4]).booleanValue();
   
  +      SimpleMetaData md = ((Advised)consumerDelegate)._getInstanceAdvisor().getMetaData();
  +      
         int consumerID =
  -         ((Integer)((Advised)consumerDelegate)._getInstanceAdvisor().getMetaData().
  -         getMetaData(MetaDataConstants.JMS, MetaDataConstants.CONSUMER_ID)).intValue();
  +         ((Integer)md.getMetaData(MetaDataConstants.JMS, MetaDataConstants.CONSUMER_ID)).intValue();
  +      
  +      int prefetchSize =
  +         ((Integer)md.getMetaData(MetaDataConstants.JMS, MetaDataConstants.PREFETCH_SIZE)).intValue();
         
         ConsumerState consumerState =
            new ConsumerState(sessionState, consumerDelegate, dest, selector,
  -                           noLocal, consumerID, connectionConsumer);
  +                           noLocal, consumerID, connectionConsumer, prefetchSize);
         
         delegate.setState(consumerState);
         return consumerDelegate;
  
  
  
  1.15      +21 -8     jboss-jms/src/main/org/jboss/jms/client/container/TransactionAspect.java
  
  (In the diff below, changes in quantity of whitespace are not shown.)
  
  Index: TransactionAspect.java
  ===================================================================
  RCS file: /cvsroot/jboss/jboss-jms/src/main/org/jboss/jms/client/container/TransactionAspect.java,v
  retrieving revision 1.14
  retrieving revision 1.15
  diff -u -b -r1.14 -r1.15
  --- TransactionAspect.java	6 May 2006 05:05:39 -0000	1.14
  +++ TransactionAspect.java	17 Jul 2006 17:14:44 -0000	1.15
  @@ -32,8 +32,10 @@
   import org.jboss.jms.client.state.HierarchicalState;
   import org.jboss.jms.client.state.SessionState;
   import org.jboss.jms.delegate.ConnectionDelegate;
  +import org.jboss.jms.message.MessageProxy;
   import org.jboss.jms.tx.AckInfo;
   import org.jboss.jms.tx.LocalTx;
  +import org.jboss.jms.tx.TxState;
   
   /**
    * This aspect handles transaction related logic
  @@ -42,7 +44,7 @@
    * 
    * @author <a href="mailto:tim.fox at jboss.com>Tim Fox</a>
    *
  - * $Id: TransactionAspect.java,v 1.14 2006/05/06 05:05:39 ovidiu Exp $
  + * $Id: TransactionAspect.java,v 1.15 2006/07/17 17:14:44 timfox Exp $
    */
   public class TransactionAspect
   {
  @@ -124,6 +126,13 @@
         ConnectionState connState = (ConnectionState)state.getParent();
         ConnectionDelegate conn = (ConnectionDelegate)connState.getDelegate();
         
  +      TxState tx = connState.getResourceManager().getTx(state.getCurrentTxId());
  +      
  +      if (tx == null)
  +      {
  +         throw new IllegalStateException("Cannot find tx:" + state.getCurrentTxId());
  +      }
  +        
         try
         {
            connState.getResourceManager().rollbackLocal((LocalTx)state.getCurrentTxId(), conn);
  @@ -179,10 +188,14 @@
         {
            MethodInvocation mi = (MethodInvocation)invocation;
            
  -         long messageID = ((Long)mi.getArguments()[0]).longValue();
  +         MessageProxy proxy = (MessageProxy)mi.getArguments()[0];
  +         
  +         //long messageID = proxy.getMessage().getMessageID();
            
            int consumerID = ((Integer)mi.getArguments()[1]).intValue();
            
  +         AckInfo info = new AckInfo(proxy, consumerID);
  +         
            Object txID = state.getCurrentTxId();
            
            if (txID == null)
  @@ -194,7 +207,7 @@
            
            //Add the acknowledgement to the transaction
            
  -         connState.getResourceManager().addAck(txID, new AckInfo(messageID, consumerID));                  
  +         connState.getResourceManager().addAck(txID, info);                  
         }
         
         return null;
  
  
  



More information about the jboss-cvs-commits mailing list