[jboss-cvs] jboss-jms/src/main/org/jboss/jms/client/remoting ...

Timothy Fox tim.fox at jboss.com
Mon Jul 17 13:14:44 EDT 2006


  User: timfox  
  Date: 06/07/17 13:14:44

  Modified:    src/main/org/jboss/jms/client/remoting   
                        CallbackManager.java MessageCallbackHandler.java
  Added:       src/main/org/jboss/jms/client/remoting   
                        HandleMessageResponse.java
  Log:
  Many changes including implementation of prefetch, SEDAisation of server, changing of recovery
  
  Revision  Changes    Path
  1.6       +7 -10     jboss-jms/src/main/org/jboss/jms/client/remoting/CallbackManager.java
  
  (In the diff below, changes in quantity of whitespace are not shown.)
  
  Index: CallbackManager.java
  ===================================================================
  RCS file: /cvsroot/jboss/jboss-jms/src/main/org/jboss/jms/client/remoting/CallbackManager.java,v
  retrieving revision 1.5
  retrieving revision 1.6
  diff -u -b -r1.5 -r1.6
  --- CallbackManager.java	24 Jun 2006 09:05:36 -0000	1.5
  +++ CallbackManager.java	17 Jul 2006 17:14:44 -0000	1.6
  @@ -21,12 +21,12 @@
     */
   package org.jboss.jms.client.remoting;
   
  +import java.util.List;
   import java.util.Map;
   
   import javax.management.MBeanServer;
   
  -import org.jboss.jms.message.MessageProxy;
  -import org.jboss.jms.server.endpoint.DeliveryRunnable;
  +import org.jboss.jms.server.endpoint.ClientDelivery;
   import org.jboss.jms.server.remoting.MessagingMarshallable;
   import org.jboss.remoting.InvocationRequest;
   import org.jboss.remoting.ServerInvocationHandler;
  @@ -74,11 +74,11 @@
      {
         MessagingMarshallable mm = (MessagingMarshallable)ir.getParameter();
         
  -      DeliveryRunnable dr = (DeliveryRunnable)mm.getLoad();
  +      ClientDelivery dr = (ClientDelivery)mm.getLoad();
         
         int consumerID = dr.getConsumerID();
         
  -      MessageProxy del = dr.getMessageProxy();
  +      List msgs = dr.getMessages();
         
         MessageCallbackHandler handler =
            (MessageCallbackHandler)callbackHandlers.get(new Integer(consumerID));
  @@ -88,14 +88,11 @@
            throw new IllegalStateException("Cannot find handler for consumer: " + consumerID);
         }
         
  -      handler.handleMessage(del);
  -      
  -      return null;
  +      return new MessagingMarshallable(mm.getVersion(), handler.handleMessage(msgs));
      }
   
      public void removeListener(InvokerCallbackHandler arg0)
      {
  -
      }
   
      public void setInvoker(ServerInvoker arg0)
  
  
  
  1.71      +361 -458  jboss-jms/src/main/org/jboss/jms/client/remoting/MessageCallbackHandler.java
  
  (In the diff below, changes in quantity of whitespace are not shown.)
  
  Index: MessageCallbackHandler.java
  ===================================================================
  RCS file: /cvsroot/jboss/jboss-jms/src/main/org/jboss/jms/client/remoting/MessageCallbackHandler.java,v
  retrieving revision 1.70
  retrieving revision 1.71
  diff -u -b -r1.70 -r1.71
  --- MessageCallbackHandler.java	26 Jun 2006 18:41:21 -0000	1.70
  +++ MessageCallbackHandler.java	17 Jul 2006 17:14:44 -0000	1.71
  @@ -26,6 +26,7 @@
   import java.util.LinkedList;
   import java.util.List;
   
  +import javax.jms.IllegalStateException;
   import javax.jms.JMSException;
   import javax.jms.MessageListener;
   import javax.jms.Session;
  @@ -33,18 +34,19 @@
   import org.jboss.jms.delegate.ConsumerDelegate;
   import org.jboss.jms.delegate.SessionDelegate;
   import org.jboss.jms.message.MessageProxy;
  +import org.jboss.jms.tx.AckInfo;
   import org.jboss.logging.Logger;
  +import org.jboss.messaging.util.Future;
   import org.jboss.remoting.callback.HandleCallbackException;
   
  -import EDU.oswego.cs.dl.util.concurrent.PooledExecutor;
   import EDU.oswego.cs.dl.util.concurrent.QueuedExecutor;
   
   /**
    * @author <a href="mailto:ovidiu at jboss.org">Ovidiu Feodorov</a>
    * @author <a href="mailto:tim.fox at jboss.com">Tim Fox/a>
  - * @version <tt>$Revision: 1.70 $</tt>
  + * @version <tt>$Revision: 1.71 $</tt>
    *
  - * $Id: MessageCallbackHandler.java,v 1.70 2006/06/26 18:41:21 timfox Exp $
  + * $Id: MessageCallbackHandler.java,v 1.71 2006/07/17 17:14:44 timfox Exp $
    */
   public class MessageCallbackHandler
   {
  @@ -52,10 +54,6 @@
      
      private static final Logger log;
      
  -   //TODO Make configurable
  -   private static final int CLOSE_TIMEOUT = 20000;
  -   
  -   
      // Static --------------------------------------------------------
      
      private static boolean trace;
  @@ -66,6 +64,9 @@
         trace = log.isTraceEnabled();
      }
      
  +   //Hardcoded for now
  +   private static final int MAX_REDELIVERIES = 10;
  +      
      public static void callOnMessage(ConsumerDelegate cons,
                                       SessionDelegate sess,
                                       MessageListener listener,
  @@ -77,9 +78,15 @@
      {
         preDeliver(sess, consumerID, m, isConnectionConsumer);  
                     
  +      int tries = 0;
  +      
  +      while (true)
  +      {
         try
         {      
            listener.onMessage(m);         
  +            
  +            break;
         }
         catch (RuntimeException e)
         {
  @@ -91,15 +98,35 @@
   
            if (ackMode == Session.AUTO_ACKNOWLEDGE || ackMode == Session.DUPS_OK_ACKNOWLEDGE)
            {
  -            // cancel the delivery - this means it will be immediately redelivered
  -            if (trace) { log.trace("cancelling " + id); }
  -            cons.cancelDelivery(id);
  +               //We redeliver up to a certain number of times (MAX_REDELIVERIES)
  +               if (tries < MAX_REDELIVERIES)
  +               {
  +                  m.setJMSRedelivered(true);
  +                  
  +                  //TODO delivery count although optional should be global
  +                  //so we need to send it back to the server
  +                  //but this has performance hit so perhaps we just don't support it?
  +                  m.incDeliveryCount();
  +                  
  +                  tries++;
  +               }
  +               else
  +               {
  +                  log.error("Max redeliveries has occurred for message: " + m.getJMSMessageID());
  +                  
  +                  //TODO - Send to DLQ
  +                  
  +                  break;
  +               }
            }
            else
            {
               // Session is either transacted or CLIENT_ACKNOWLEDGE
               // We just deliver next message
               if (trace) { log.trace("ignoring exception on " + id); }
  +               
  +               break;
  +            }
            }
         }
               
  @@ -116,7 +143,7 @@
         // add anything to the tx for this session.
         if (!isConnectionConsumer)
         {
  -         sess.preDeliver(m.getMessage().getMessageID(), consumerID);
  +         sess.preDeliver(m, consumerID);
         }         
      }
      
  @@ -130,151 +157,115 @@
         // add anything to the tx for this session
         if (!isConnectionConsumer)
         {
  -         sess.postDeliver(m.getMessage().getMessageID(), consumerID);
  +         sess.postDeliver(m, consumerID);
         }         
      }
      
      // Attributes ----------------------------------------------------
         
  -   protected LinkedList buffer;
  +   private LinkedList buffer;
      
  -   protected SessionDelegate sessionDelegate;
  +   private SessionDelegate sessionDelegate;
      
  -   protected ConsumerDelegate consumerDelegate;
  +   private ConsumerDelegate consumerDelegate;
      
  -   protected int consumerID;
  +   private int consumerID;
      
  -   protected boolean isConnectionConsumer;
  +   private boolean isConnectionConsumer;
      
  -   protected volatile Thread receiverThread;
  +   private volatile Thread receiverThread;
      
  -   protected MessageListener listener;
  +   private MessageListener listener;
      
  -   protected int deliveryAttempts;
  +   private int ackMode;
      
  -   protected int ackMode;
  +   private boolean closed;
       
  -   // Executor used for executing onMessage methods - there is one per session
  -   protected QueuedExecutor onMessageExecutor;
  +   private Object mainLock;
      
  -   // Executor for executing activateConsumer methods asynchronously, there is one pool per connection
  -   protected PooledExecutor activateConsumerExecutor;
  +   private boolean serverSending;
          
  -   protected Object mainLock;
  +   private int bufferSize;
      
  -   protected Object onMessageLock;
  +   private QueuedExecutor sessionExecutor;
      
  -   protected boolean closed;
  -      
  -   protected volatile boolean closing;
  -   
  -   protected boolean gotLastMessage;
  -   
  -   //The id of the last message we received
  -   protected long lastMessageId = -1;
  -   
  -   protected volatile int activationCount;
  -   
  -   protected volatile boolean onMessageExecuting;
  +   private boolean listenerRunning;
      
      // Constructors --------------------------------------------------
   
  -   public MessageCallbackHandler(boolean isCC, int ackMode, QueuedExecutor onMessageExecutor,
  -                                 PooledExecutor activateConsumerExecutor,
  -                                 SessionDelegate sess, ConsumerDelegate cons, int consumerID)
  +   public MessageCallbackHandler(boolean isCC, int ackMode,                                
  +                                 SessionDelegate sess, ConsumerDelegate cons, int consumerID,
  +                                 int bufferSize, QueuedExecutor sessionExecutor)
      {
  +      if (bufferSize < 1)
  +      {
  +         throw new IllegalArgumentException(this + " bufferSize must be > 0");
  +      }
  +              
  +      this.bufferSize = bufferSize;
  +      
         buffer = new LinkedList();
         
         isConnectionConsumer = isCC;
         
         this.ackMode = ackMode;
         
  -      this.onMessageExecutor = onMessageExecutor;
  -      
  -      this.activateConsumerExecutor = activateConsumerExecutor;
  -         
         this.sessionDelegate = sess;
         
         this.consumerDelegate = cons;
         
         this.consumerID = consumerID;
         
  +      this.serverSending = true;
  +      
         mainLock = new Object();
             
  -      onMessageLock = new Object();
  +      this.sessionExecutor = sessionExecutor;
      }
           
      // Public --------------------------------------------------------
      
  -   public void handleMessage(MessageProxy md) throws HandleCallbackException
  -   {            
  -      if (trace) { log.trace("receiving message " + md + " from the remoting layer"); }
         
  -      md = processMessage(md);
  +   /**
  +    * Handles a list of messages sent from the server
  +    * @param msgs The list of messages
  +    * @return A HandleMessageResponse holding whether the buffer is now full and the number of messages
  +    *         handled (for now we always accept all messages), or a response reporting 0 messages if closed
  +    */
  +   public HandleMessageResponse handleMessage(List msgs) throws HandleCallbackException
  +   {            
  +      if (trace) { log.trace(this + " receiving " + msgs.size() + " messages from the remoting layer"); }            
         
         synchronized (mainLock)
         {
            if (closed)
            {
  -            //Sanity check
  -            //This should never happen
  -            //Part of the close procedure is to ensure that no more messages will be sent
  -            //If this happens it implies the close() procedure is not functioning correctly
  -            throw new IllegalStateException("Message has arrived after consumer is closed!");
  +            //Ignore
  +            return new HandleMessageResponse(false, 0);
            }
            
  -         if (closing && gotLastMessage)
  -         {
  -            //Sanity check - this should never happen
  -            //No messages should arrive after the last one sent by the server consumer endpoint
  -            throw new IllegalStateException("Message has arrived after we have received the last one");
  -         }
  +         //Put the messages in the buffer
  +         //And notify any waiting receive()
            
  -         // We record the last message we received
  -         this.lastMessageId = md.getMessage().getMessageID();
  +         processMessages(msgs);
                                    
  -         if (listener != null)
  -         {
  -            // Queue the message to be delivered by the session
  -            ClientDeliveryRunnable cdr = new ClientDeliveryRunnable(md);
  +         buffer.addAll(msgs);                  
               
  -            onMessageExecuting = true;         
  +         if (trace) { log.trace(this + " added messages to the buffer"); }            
               
  -            try
  -            {
  -               onMessageExecutor.execute(cdr);
  -            }
  -            catch (InterruptedException e)
  -            {
  -               //This should never happen
  -               throw new IllegalStateException("Thread interrupted in client delivery executor");
  -            }
  -         }
  -         else
  -         {                                                    
  -            //Put the message in the buffer
  -            //And notify any waiting receive()
  -            //On close any remaining messages will be cancelled
  -            //We do not wait for the message to be received before returning
  +         boolean full = buffer.size() >= bufferSize;         
                     
  -            buffer.add(md);                                 
  -         }   
  +         if (trace) { log.trace(this + " receiving messages from the remoting layer"); }            
            
  -         if (closing)
  -         {
  -            //If closing then we may have the close() thread waiting for the last message as well as a receive
  -            //thread
  -            mainLock.notifyAll();
  -         }
  -         else
  -         {
  -            //Otherwise we will only have at most one receive thread waiting
  -            //We don't want to do notifyAll in both cases since notifyAll can have a perf penalty
  -            if (receiverThread != null)
  +         messagesAdded();
  +         
  +         if (full)
               {
  -               mainLock.notify();
  -            }
  +            serverSending = false;
            }
  +                                          
  +         //For now we always accept all messages - in the future this may change
  +         return new HandleMessageResponse(full, msgs.size());
         }
      }
       
  @@ -282,35 +273,24 @@
      {
         synchronized (mainLock)
         {         
  -         if (closed)
  -         {
  -            throw new JMSException("Cannot set MessageListener - consumer is closed");
  -         }
  -         
  -         // JMS consumer is single threaded, so it shouldn't be possible to set a MessageListener
  -         // while another thread is receiving
  -         
            if (receiverThread != null)
            {
               // Should never happen
  -            throw new javax.jms.IllegalStateException("Consumer is currently in receive(..) Cannot set MessageListener");
  +            throw new IllegalStateException("Consumer is currently in receive(..) Cannot set MessageListener");
            }
            
  -         synchronized (onMessageLock)
  -         {         
               this.listener = listener;
  -         }
      
  -         log.debug("installed listener " + listener);
  -   
  -         activateConsumer();
  +         if (!buffer.isEmpty())
  +         {  
  +            listenerRunning = true;
  +            this.queueRunner(new ListenerRunner());
  +         }
         }
      }
             
      public void close() throws JMSException
      {
  -      try
  -      {
            synchronized (mainLock)
            {
               log.debug(this + " closing");
  @@ -320,107 +300,60 @@
                  return;
               }
               
  -            closing = true;   
  -            
  -            //We wait for any activation in progress to complete and the resulting message
  -            //(if any) to be returned and processed.
  -            //The ensures a clean, gracefully closure of the client side consumer, without
  -            //any messages in transit which might arrive after the consumer is closed and which
  -            //subsequently might be cancelled out of sequence causing message ordering problems
  -            
  -            if (activationCount > 0)
  -            {
  -               long waitTime = CLOSE_TIMEOUT;
  -               
  -               while (activationCount > 0 && waitTime > 0)
  -               {               
  -                  waitTime = waitOnLock(mainLock, waitTime);           
  -               }
  -               
  -               if (activationCount > 0)
  -               {
  -                  log.warn("Timed out waiting for activations to complete");
  -               }
  -            }                       
  -               
  -            //Now we know there are no activations in progress but the consumer may still be active so we call
  -            //deactivate which returns the id of the last message we should have received
  -            //if we have received this message then we know there is no possibility of any message still in
  -            //transit and we can close down with confidence
  -            //otherwise we wait for this message and timeout if it doesn't arrive which might be the case
  -            //if the connection to the server has been lost
  -                        
  -            long lastMessageIDToExpect = deactivateConsumer();
  -            
  -            if (lastMessageIDToExpect != -1)
  -            {            
  -               long waitTime = CLOSE_TIMEOUT;
  -               
  -               while (lastMessageIDToExpect != lastMessageId && waitTime > 0)
  -               {               
  -                  waitTime = waitOnLock(mainLock, waitTime);           
  -               }
  +         closed = true;   
                  
  -               if (lastMessageIDToExpect != lastMessageId)
  +         if (receiverThread != null)
                  {
  -                  log.warn("Timed out waiting for last message to arrive, last=" + lastMessageId +" expected=" + lastMessageIDToExpect);
  -               }
  -            }
  -            
  -            //We set this even if we timed out waiting since we do not want any more to arrive now
  -            gotLastMessage = true;            
  -            
               //Wake up any receive() thread that might be waiting
  -            if (trace) { log.trace("Notifying main lock"); }
               mainLock.notify();
  -            if (trace) { log.trace("Notified main lock"); }
  +         }                                       
               
  -            //Now make sure that any onMessage of a listener has finished executing
  +         //Wait for any on message executions to complete
               
  -            long waitTime = CLOSE_TIMEOUT;
  +         Future result = new Future();
               
  -            synchronized (onMessageLock)
  -            {               
  -               while (onMessageExecuting && waitTime > 0)
  +         try
                  {
  -                  waitTime = waitOnLock(onMessageLock, waitTime);   
  +            this.sessionExecutor.execute(new Closer(result));
  +            
  +            result.getResult();
                  }
  -               if (onMessageExecuting)
  +         catch (InterruptedException e)
                  {
  -                  //Timed out waiting for last onMessage to be processed
  -                  log.warn("Timed out waiting for last onMessage to be executed");            
  -               }
  +            log.warn("Thread interrupted", e);
               }
                           
  -            //Now we know that all messages have been received and processed                                 
  +         //Now we cancel anything left in the buffer
  +         //The reason we do this now is that otherwise the deliveries wouldn't get cancelled
  +         //until session close (since we don't cancel consumer's deliveries until then)
  +         //which is too late - since we need to preserve the order of messages delivered in a session.
               
               if (!buffer.isEmpty())
               {            
                  //Now we cancel any deliveries that might be waiting in our buffer
  +            //This is because, otherwise the messages wouldn't get cancelled until
  +            //the corresponding session died.
  +            //So if another consumer in another session tried to consume from the channel
  +            //before that session died it wouldn't receive those messages
                  Iterator iter = buffer.iterator();
                  
  -               List ids = new ArrayList();
  +            List ackInfos = new ArrayList();
                  while (iter.hasNext())
                  {                        
                     MessageProxy mp = (MessageProxy)iter.next();
                     
  -                  ids.add(new Long(mp.getMessage().getMessageID()));                                    
  -               }
  -               cancelDeliveries(ids);
  -            }
  -          
  -            //Now we are done
  -            listener = null;
  +               AckInfo ack = new AckInfo(mp, consumerID);
               
  -            receiverThread = null;
  +               ackInfos.add(ack);
               
  -            closed = true;  
            }
  +                  
  +            sessionDelegate.cancelDeliveries(ackInfos);
  +            
  +            buffer.clear();
         }
  -      catch (InterruptedException e)
  -      {
  -         //Ignore         
         }
  +      
         if (trace) { log.trace(this + " closed"); }
      }
      
  @@ -434,11 +367,13 @@
       */
      public MessageProxy receive(long timeout) throws JMSException
      {                    
  +      MessageProxy m = null;      
  +      
         synchronized (mainLock)
         {        
            if (trace) { log.trace(this + " receiving, timeout = " + timeout); }
            
  -         if (closed || closing)
  +         if (closed)
            {
               //If consumer is closed or closing calling receive returns null
               return null;
  @@ -453,8 +388,6 @@
                  
            long startTimestamp = System.currentTimeMillis();
            
  -         MessageProxy m = null;
  -         
            try
            {
               while(true)
  @@ -511,7 +444,7 @@
                  {
                     if (trace) { log.trace("message " + m + " is not expired, pushing it to the caller"); }
                     
  -                  return m;
  +                  break;
                  }
                  
                  if (trace)
  @@ -524,11 +457,6 @@
                  {
                     timeout -= System.currentTimeMillis() - startTimestamp;
                  }
  -               
  -               if (closing)
  -               {
  -                  return null;
  -               }               
               }
            }
            finally
  @@ -536,21 +464,44 @@
               receiverThread = null;            
            }
         } 
  +      
  +      //This needs to be outside the lock
  +      if (buffer.isEmpty() && !serverSending)
  +      {
  +         //The server has previously stopped sending because the buffer was full
  +         //but now it is empty, so we tell the server to start sending again
  +         consumerDelegate.more();
  +      }
  +      
  +      return m;
      }    
      
  +
      public MessageListener getMessageListener()
      {
  -      synchronized (onMessageLock)
  -      {
            return listener;
         }
  -   }
   
      public String toString()
      {
         return "MessageCallbackHandler[" + consumerID + "]";
      }
   
  +   public int getConsumerId()
  +   {
  +      return consumerID;
  +   }
  +   
  +   public void addToFrontOfBuffer(MessageProxy proxy)
  +   {
  +      synchronized (mainLock)
  +      {
  +         buffer.addFirst(proxy);
  +         
  +         messagesAdded();
  +      }
  +   }
  +   
      // Package protected ---------------------------------------------
      
      // Protected -----------------------------------------------------
  @@ -576,87 +527,22 @@
         }     
      }
       
  -   protected void cancelDeliveries(List ids)
  -   {
  -      try
  -      {
  -         consumerDelegate.cancelDeliveries(ids);
  -      }
  -      catch (Exception e)
  -      {
  -         String msg = "Failed to cancel deliveries";
  -         log.warn(msg, e);         
  -      }
  -   }
  -   
  -   protected void activateConsumer() throws JMSException
  -   {
  -      // We execute this on a separate thread to avoid the case where the asynchronous delivery
  -      // arrives before we have returned from the synchronus call, which would cause us to lose
  -      // the message.
  -        
  -      try
  -      {
  -         if (trace) { log.trace("initiating consumer endpoint activation"); }
  -         activationCount++;         
  -         activateConsumerExecutor.execute(new ConsumerActivationRunnable());
  -      }
  -      catch (InterruptedException e)
  -      {
  -         // This should never happen
  -         throw new IllegalStateException("Activation executor thread interrupted");
  -      }
  -   }
  -   
  -   protected long deactivateConsumer() throws JMSException
  -   {
  -      return consumerDelegate.deactivate();
  -   }
  -   
  -   protected MessageProxy getMessageNow() throws JMSException
  -   {
  -      MessageProxy del = (MessageProxy)consumerDelegate.getMessageNow(false);      
  -      
  -      if (del != null)
  -      {
  -         //We record the id of the last message delivered
  -         //No need to notify here since this will never be called while we
  -         //are closing
  -         lastMessageId = del.getMessage().getMessageID();         
  -                  
  -         return processMessage(del);
  -      }
  -      else
  -      {
  -         return null;
  -      }
  -   }
  -   
      protected MessageProxy getMessage(long timeout) throws JMSException
      {
  -      MessageProxy m = null;
  -      
  -      // If it's receiveNoWait then get the message directly
         if (timeout == -1)
         {
  -         m = getMessageNow();        
  +         //receiveNoWait so don't wait
         }
         else
         {
  -         // ... otherwise we activate the server side consumer and wait for a message to arrive
  -         // asynchronously         
  -         activateConsumer();
  -      
            try
            {         
               if (timeout == 0)
               {
                  //Wait for ever potentially
  -               while (!closing && buffer.isEmpty())
  +               while (!closed && buffer.isEmpty())
                  {
  -                  if (trace) { log.trace("waiting on main lock"); }
                     mainLock.wait();               
  -                  if (trace) { log.trace("done waiting on main lock"); }
                  }
               }
               else
  @@ -664,166 +550,182 @@
                  //Wait with timeout
                  long toWait = timeout;
                
  -               while (!closing && buffer.isEmpty() && toWait > 0)
  +               while (!closed && buffer.isEmpty() && toWait > 0)
                  {
  +                  if (trace) { log.trace("Waiting on lock"); }
                     toWait = waitOnLock(mainLock, toWait);
  +                  if (trace) { log.trace("Done waiting on lock, empty?" + buffer.isEmpty()); }
                  }
               }
  -             
  -            if (closing)
  +         }
  +         catch (InterruptedException e)
               {
  -               m = null;
  +            return null;
               }
  -            else
  +      }
  +             
  +      if (closed)
               {
  +         return null;
  +      }
  +         
  +      MessageProxy m = null;     
  +      
                  if (!buffer.isEmpty())
                  {
                     m = (MessageProxy)buffer.removeFirst();
  +         
  +         if (trace) { log.trace("Got message:" + m); }                  
                  }
                  else
                  {
                     m = null;
                  }
  -            }
  -         }
  -         catch (InterruptedException e)
  -         {
  -            //interrupting receive thread should make it return null
  -            m = null;
  -         }         
  -         finally
  -         {
  -            // We only need to call this if we timed out        
  -            if (m == null)
  -            {
  -               deactivateConsumer();
  -            }               
  -         } 
  -      }
                  
         return m;
      }
      
  -   protected MessageProxy processMessage(MessageProxy del)
  +   protected void processMessages(List msgs)
      {
  +      Iterator iter = msgs.iterator();
  +      
  +      while (iter.hasNext())
  +      {         
  +         MessageProxy msg = (MessageProxy)iter.next();
  +      
         //if this is the handler for a connection consumer we don't want to set the session delegate
         //since this is only used for client acknowledgement which is illegal for a session
         //used for an MDB
  -      if (!this.isConnectionConsumer)
  -      {
  -         del.setSessionDelegate(sessionDelegate);
  -      }         
  -      del.setReceived();
  +         msg.setSessionDelegate(sessionDelegate, isConnectionConsumer);
         
  -      return del;
  +         msg.setReceived();
  +      }
      }
      
      // Private -------------------------------------------------------
      
  -   // Inner classes -------------------------------------------------
  -   
  -   private class ClientDeliveryRunnable implements Runnable
  +   private void queueRunner(ListenerRunner runner)
      {
  -      private MessageProxy message;
  -      
  -      private ClientDeliveryRunnable(MessageProxy message)
  +      try
         {
  -         this.message = message;
  +         this.sessionExecutor.execute(runner);
         }
  -      
  -      public void run()
  +      catch (InterruptedException e)
         {
  -         // We synchronize here to prevent the message listener being set with a different one
  -         // between callOnMessage and activate being called
  -         synchronized (onMessageLock)
  +         log.warn("Thread interrupted", e);
  +      }
  +   }
  +   
  +   private void messagesAdded()
            { 
  -            if (closed)
  +      //If we have a thread waiting on receive() we notify it
  +      if (receiverThread != null)
               {
  -               // Sanity check. This should never happen. Part of the close procedure is to ensure
  -               // there are no messages in the executor queue for delivery to the MessageListener.
  -               // If this happens it implies the close() procedure is not working properly.
  -               throw new IllegalStateException("Calling onMessage() but the consumer is closed!");
  +         if (trace) { log.trace(this + " notifying receiver thread"); }            
  +         mainLock.notify();
               }
  -            else
  +      else if (listener != null)
               {
  -               try
  -               {                                                    
  -                  MessageCallbackHandler.callOnMessage(consumerDelegate, sessionDelegate, listener,
  -                                                       consumerID, isConnectionConsumer, message, ackMode);
  -                  if (!closing)
  +         //We have a message listener
  +         if (!listenerRunning)
                     {
  -                     consumerDelegate.activate();                                  
  +            listenerRunning = true;
  +            this.queueRunner(new ListenerRunner());
                     }
                     
  -                  onMessageExecuting = false;
  -                  
  -                  //The close() thread may be waiting for us to finish executing, so wake it up
  -                  onMessageLock.notify();                 
  -               }
  -               catch (JMSException e)
  -               {
  -                  log.error("Failed to deliver message", e);
  +         //TODO - Execute onMessage on same thread for even better throughput 
                  }                           
               }
  +   
  +   // Inner classes -------------------------------------------------   
  +   
  +   /*
  +    * This class is used to put on the listener executor to wait for onMessage
  +    * invocations to complete when closing
  +    */
  +   private class Closer implements Runnable
  +   {
  +      Future result;
  +      
  +      Closer(Future result)
  +      {
  +         this.result = result;
            }
  +      
  +      public void run()
  +      {
  +         result.setResult(null);
         }
      }
      
  -   private class ConsumerActivationRunnable implements Runnable
  +   /*
  +    * This class handles the execution of onMessage methods
  +    */
  +   private class ListenerRunner implements Runnable
      {
         public void run()
         {      
  -         try
  -         {
  -            // We always try and return the message immediately, if available. This prevents an
  -            // extra network call to deliver the message. If the message is not available,
  -            // the consumer will stay active and the message will delivered asynchronously (pushed)
  -            // (that is what the boolean param is for)
  +         MessageProxy mp = null;
               
  -            if (trace) { log.trace("activation runnable running, getting message now"); }
  +         boolean again = false;
               
  -            try
  +         synchronized (mainLock)
               {
  -               MessageProxy m = (MessageProxy)consumerDelegate.getMessageNow(true);
  -               
  -               if (trace) { log.trace("got message " + m); }
  +            //remove a message from the buffer
                  
  -               if (m != null)
  +            if (buffer.isEmpty())
                  {
  -                  if (trace) { log.trace("handling " + m); }
  -                  handleMessage(m);
  +               listenerRunning = false;               
                  }
  +            else
  +            {               
  +               mp = (MessageProxy)buffer.removeFirst();
   
  -               if (trace) { log.trace("activation runnable done"); }
  +               if (mp == null)
  +               {
  +                  throw new java.lang.IllegalStateException("Cannot find message in buffer!");
               }
  -            finally
  +               
  +               again = !buffer.isEmpty();
  +               
  +               if (!again)
               {
  -               activationCount--;
  -               // closing is volatile so we don't have to do the check inside the synchronized
  -               // (mainLock) {} block which should aid concurrency
  -               if (closing)
  +                  listenerRunning  = false;
  +               }  
  +            }
  +         }
  +                        
  +         if (mp != null)
                  {
  -                  synchronized (mainLock)
  +            try
                     {
  -                     mainLock.notifyAll();                     
  +               callOnMessage(consumerDelegate, sessionDelegate, listener, consumerID, false, mp, ackMode);
                     }
  +            catch (JMSException e)
  +            {
  +               log.error("Failed to deliver message", e);
                  }
               }                        
  +         
  +         if (again)
  +         {
  +            //Queue it up again
  +            queueRunner(this);
            }
  -         catch(Throwable t)
  +         else
            {
  -            log.error("Consumer endpoint activation failed", t);
  -            if (t.getCause() != null)
  +            if (!serverSending)
               {
  -               log.error("Cause:" + t.getCause());
  -            }            
  +               //Ask server for more messages
               try
               {
  -               close();
  +                  consumerDelegate.more();
               }
               catch (JMSException e)
               {
  -               log.error("Failed to close consumer", e);
  +                  log.error("Failed to execute more()", e);
  +               }
  +               return;
               }
            }             
         } 
  @@ -831,3 +733,4 @@
   }
   
   
  +
  
  
  
  1.1      date: 2006/07/17 17:14:44;  author: timfox;  state: Exp;jboss-jms/src/main/org/jboss/jms/client/remoting/HandleMessageResponse.java
  
  Index: HandleMessageResponse.java
  ===================================================================
  /*
   * JBoss, Home of Professional Open Source
   * Copyright 2005, JBoss Inc., and individual contributors as indicated
   * by the @authors tag. See the copyright.txt in the distribution for a
   * full listing of individual contributors.
   *
   * This is free software; you can redistribute it and/or modify it
   * under the terms of the GNU Lesser General Public License as
   * published by the Free Software Foundation; either version 2.1 of
   * the License, or (at your option) any later version.
   *
   * This software is distributed in the hope that it will be useful,
   * but WITHOUT ANY WARRANTY; without even the implied warranty of
   * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
   * Lesser General Public License for more details.
   *
   * You should have received a copy of the GNU Lesser General Public
   * License along with this software; if not, write to the Free
   * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
   * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
   */
  package org.jboss.jms.client.remoting;
  
  import java.io.Externalizable;
  import java.io.IOException;
  import java.io.ObjectInput;
  import java.io.ObjectOutput;
  
  /**
   * A HandleMessageResponse
   *
   * This is the response the server gets after delivering messages to a client consumer.
   * It carries two values: a "full" flag and a count of accepted messages. Instances are
   * transferred via {@link Externalizable}; the flag is written first, then the count.
   *
   * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
   * @version <tt>$Revision: 1.1 $</tt>
   *
   * $Id: HandleMessageResponse.java,v 1.1 2006/07/17 17:14:44 timfox Exp $
   *
   */
  public class HandleMessageResponse implements Externalizable
  {
     private static final long serialVersionUID = 2500443290413453569L;

     // Number of messages reported as accepted
     private int acceptedCount;

     // The "full" flag reported back with the response
     private boolean clientFull;

     /**
      * No-argument constructor, as required by {@link Externalizable}. The state is
      * left at the Java defaults (flag false, count zero) until
      * {@link #readExternal(ObjectInput)} fills it in.
      */
     public HandleMessageResponse()
     {
     }

     /**
      * Creates a response holding the given values.
      *
      * @param full the value later returned by {@link #clientIsFull()}
      * @param messagesAccepted the value later returned by {@link #getNumberAccepted()}
      */
     public HandleMessageResponse(boolean full, int messagesAccepted)
     {
        this.clientFull = full;
        this.acceptedCount = messagesAccepted;
     }

     // Externalizable implementation
     // ---------------------------------------------------------------

     /**
      * Restores the state in the same order {@link #writeExternal(ObjectOutput)}
      * emits it: the boolean flag first, then the int count.
      *
      * @param in the stream to read from
      * @throws IOException if reading from the stream fails
      * @throws ClassNotFoundException declared by the interface; not thrown here
      */
     public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException
     {
        clientFull = in.readBoolean();
        acceptedCount = in.readInt();
     }

     /**
      * Writes the state as one boolean (the flag) followed by one int (the count).
      *
      * @param out the stream to write to
      * @throws IOException if writing to the stream fails
      */
     public void writeExternal(ObjectOutput out) throws IOException
     {
        out.writeBoolean(clientFull);
        out.writeInt(acceptedCount);
     }

     // Public --------------------------------------------------------

     /**
      * @return the "full" flag held by this response
      */
     public boolean clientIsFull()
     {
        return clientFull;
     }

     /**
      * @return the number of accepted messages held by this response
      */
     public int getNumberAccepted()
     {
        return acceptedCount;
     }
  }
  
  
  



More information about the jboss-cvs-commits mailing list