[jboss-cvs] jboss-jms/src/main/org/jboss/jms/server/endpoint ...

Timothy Fox tim.fox at jboss.com
Mon Jul 17 13:14:46 EDT 2006


  User: timfox  
  Date: 06/07/17 13:14:46

  Modified:    src/main/org/jboss/jms/server/endpoint        
                        ConnectionEndpoint.java ConsumerEndpoint.java
                        ServerConnectionEndpoint.java
                        ServerConnectionFactoryEndpoint.java
                        ServerConsumerEndpoint.java
                        ServerSessionEndpoint.java SessionEndpoint.java
  Added:       src/main/org/jboss/jms/server/endpoint        
                        ClientDelivery.java
  Log:
  Many changes including implementation of prefetch, SEDAisation of server, changing of recovery
  
  Revision  Changes    Path
  1.11      +3 -4      jboss-jms/src/main/org/jboss/jms/server/endpoint/ConnectionEndpoint.java
  
  (In the diff below, changes in quantity of whitespace are not shown.)
  
  Index: ConnectionEndpoint.java
  ===================================================================
  RCS file: /cvsroot/jboss/jboss-jms/src/main/org/jboss/jms/server/endpoint/ConnectionEndpoint.java,v
  retrieving revision 1.10
  retrieving revision 1.11
  diff -u -b -r1.10 -r1.11
  --- ConnectionEndpoint.java	27 Jun 2006 19:44:39 -0000	1.10
  +++ ConnectionEndpoint.java	17 Jul 2006 17:14:45 -0000	1.11
  @@ -34,9 +34,9 @@
    * The rest of the methods are handled in the advice stack.
    * 
    * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
  - * @version <tt>$Revision: 1.10 $</tt>
  + * @version <tt>$Revision: 1.11 $</tt>
    *
  - * $Id: ConnectionEndpoint.java,v 1.10 2006/06/27 19:44:39 timfox Exp $
  + * $Id: ConnectionEndpoint.java,v 1.11 2006/07/17 17:14:45 timfox Exp $
    */
   public interface ConnectionEndpoint extends Closeable
   {
  @@ -55,6 +55,5 @@
      void sendTransaction(TransactionRequest request) throws JMSException;
   
      Xid[] getPreparedTransactions();
  -
   }
   
  
  
  
  1.11      +9 -16     jboss-jms/src/main/org/jboss/jms/server/endpoint/ConsumerEndpoint.java
  
  (In the diff below, changes in quantity of whitespace are not shown.)
  
  Index: ConsumerEndpoint.java
  ===================================================================
  RCS file: /cvsroot/jboss/jboss-jms/src/main/org/jboss/jms/server/endpoint/ConsumerEndpoint.java,v
  retrieving revision 1.10
  retrieving revision 1.11
  diff -u -b -r1.10 -r1.11
  --- ConsumerEndpoint.java	28 Mar 2006 14:26:16 -0000	1.10
  +++ ConsumerEndpoint.java	17 Jul 2006 17:14:45 -0000	1.11
  @@ -21,34 +21,27 @@
     */
   package org.jboss.jms.server.endpoint;
   
  -import java.util.List;
  -
   import javax.jms.JMSException;
   
   import org.jboss.jms.client.Closeable;
  -import org.jboss.jms.message.MessageProxy;
   
   /**
    * Represents the set of methods from the ConsumerDelegate that are handled on the server.
    * The rest of the methods are handled in the advice stack.
    *
    * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
  - * @version <tt>$Revision: 1.10 $</tt>
  + * @version <tt>$Revision: 1.11 $</tt>
    *
  - * $Id: ConsumerEndpoint.java,v 1.10 2006/03/28 14:26:16 timfox Exp $
  + * $Id: ConsumerEndpoint.java,v 1.11 2006/07/17 17:14:45 timfox Exp $
    */
   public interface ConsumerEndpoint extends Closeable
   {
  -   void cancelDelivery(long messageID) throws JMSException;
  -
  -   void cancelDeliveries(List messageIDs) throws JMSException;
  -
  -   MessageProxy getMessageNow(boolean wait) throws JMSException;
  -
  -   void activate() throws JMSException;
  -
      /**
  -    * @return the last message ID delivered to the client consumer
  +    * If the client buffer has previously become full because the server was sending at a faster rate than the
  +    * client could consume, then the server will stop sending messages.
  +    * When the client has emptied the buffer it then needs to inform the server that it can receive more messages
  +    * by calling this method
  +    * @throws JMSException
       */
  -   long deactivate() throws JMSException;
  +   void more() throws JMSException;   
   }
  
  
  
  1.45      +15 -9     jboss-jms/src/main/org/jboss/jms/server/endpoint/ServerConnectionEndpoint.java
  
  (In the diff below, changes in quantity of whitespace are not shown.)
  
  Index: ServerConnectionEndpoint.java
  ===================================================================
  RCS file: /cvsroot/jboss/jboss-jms/src/main/org/jboss/jms/server/endpoint/ServerConnectionEndpoint.java,v
  retrieving revision 1.44
  retrieving revision 1.45
  diff -u -b -r1.44 -r1.45
  --- ServerConnectionEndpoint.java	24 Jun 2006 09:05:37 -0000	1.44
  +++ ServerConnectionEndpoint.java	17 Jul 2006 17:14:46 -0000	1.45
  @@ -70,9 +70,9 @@
    * 
    * @author <a href="mailto:ovidiu at jboss.org">Ovidiu Feodorov</a>
    * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
  - * @version <tt>$Revision: 1.44 $</tt>
  + * @version <tt>$Revision: 1.45 $</tt>
    *
  - * $Id: ServerConnectionEndpoint.java,v 1.44 2006/06/24 09:05:37 timfox Exp $
  + * $Id: ServerConnectionEndpoint.java,v 1.45 2006/07/17 17:14:46 timfox Exp $
    */
   public class ServerConnectionEndpoint implements ConnectionEndpoint
   {
  @@ -125,10 +125,12 @@
      
      private byte usingVersion;
   
  +   private int prefetchSize;
  +
      // Constructors --------------------------------------------------
      
      protected ServerConnectionEndpoint(ServerPeer serverPeer, String clientID,
  -                                      String username, String password)
  +                                      String username, String password, int prefetchSize)
      {
         this.serverPeer = serverPeer;
   
  @@ -141,6 +143,7 @@
   
         this.connectionID = serverPeer.getNextObjectID();
         this.clientID = clientID;
  +      this.prefetchSize = prefetchSize;
   
         sessions = new ConcurrentReaderHashMap();
         temporaryDestinations = new ConcurrentReaderHashSet();
  @@ -410,8 +413,6 @@
            {
               if (trace) { log.trace("one phase rollback request received"); }
                 
  -            // We just need to cancel deliveries
  -
               Transaction tx = null;
               try
               {               
  @@ -598,6 +599,11 @@
         return usingVersion;
      }
            
  +   public int getPrefetchSize()
  +   {
  +      return prefetchSize;
  +   }
  +         
      public String toString()
      {
         return "ConnectionEndpoint[" + connectionID + "]";
  @@ -805,7 +811,7 @@
   
      // Private -------------------------------------------------------
      
  -   private void setStarted(boolean s)
  +   private void setStarted(boolean s) throws JMSException
      {
         synchronized(sessions)
         {
  
  
  
  1.25      +12 -8     jboss-jms/src/main/org/jboss/jms/server/endpoint/ServerConnectionFactoryEndpoint.java
  
  (In the diff below, changes in quantity of whitespace are not shown.)
  
  Index: ServerConnectionFactoryEndpoint.java
  ===================================================================
  RCS file: /cvsroot/jboss/jboss-jms/src/main/org/jboss/jms/server/endpoint/ServerConnectionFactoryEndpoint.java,v
  retrieving revision 1.24
  retrieving revision 1.25
  diff -u -b -r1.24 -r1.25
  --- ServerConnectionFactoryEndpoint.java	23 May 2006 18:25:08 -0000	1.24
  +++ ServerConnectionFactoryEndpoint.java	17 Jul 2006 17:14:46 -0000	1.25
  @@ -38,9 +38,9 @@
    *
    * @author <a href="mailto:ovidiu at jboss.org">Ovidiu Feodorov</a>
    * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
  - * @version <tt>$Revision: 1.24 $</tt>
  + * @version <tt>$Revision: 1.25 $</tt>
    *
  - * $Id: ServerConnectionFactoryEndpoint.java,v 1.24 2006/05/23 18:25:08 ovidiu Exp $
  + * $Id: ServerConnectionFactoryEndpoint.java,v 1.25 2006/07/17 17:14:46 timfox Exp $
    */
   public class ServerConnectionFactoryEndpoint implements ConnectionFactoryEndpoint
   {
  @@ -52,13 +52,15 @@
      
      // Attributes ----------------------------------------------------
   
  -   protected ServerPeer serverPeer;
  +   private ServerPeer serverPeer;
      
  -   protected String clientID;
  +   private String clientID;
      
  -   protected int id;
  +   private int id;
      
  -   protected JNDIBindings jndiBindings;
  +   private JNDIBindings jndiBindings;
  +   
  +   private int prefetchSize;
    
      // Constructors --------------------------------------------------
   
  @@ -68,12 +70,14 @@
       */
      public ServerConnectionFactoryEndpoint(int id, ServerPeer serverPeer,
                                             String defaultClientID,
  -                                          JNDIBindings jndiBindings)
  +                                          JNDIBindings jndiBindings,
  +                                          int preFetchSize)
      {
         this.serverPeer = serverPeer;
         this.clientID = defaultClientID;
         this.id = id;
         this.jndiBindings = jndiBindings;
  +      this.prefetchSize = preFetchSize;
      }
   
      // ConnectionFactoryDelegate implementation ----------------------
  @@ -101,7 +105,7 @@
         // create the corresponding "server-side" connection endpoint and register it with the
         // server peer's ClientManager
         ServerConnectionEndpoint endpoint =
  -         new ServerConnectionEndpoint(serverPeer, clientID, username, password);
  +         new ServerConnectionEndpoint(serverPeer, clientID, username, password, prefetchSize);
   
         int connectionID = endpoint.getConnectionID();
   
  
  
  
  1.44      +477 -349  jboss-jms/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java
  
  (In the diff below, changes in quantity of whitespace are not shown.)
  
  Index: ServerConsumerEndpoint.java
  ===================================================================
  RCS file: /cvsroot/jboss/jboss-jms/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java,v
  retrieving revision 1.43
  retrieving revision 1.44
  diff -u -b -r1.43 -r1.44
  --- ServerConsumerEndpoint.java	26 Jun 2006 18:41:21 -0000	1.43
  +++ ServerConsumerEndpoint.java	17 Jul 2006 17:14:46 -0000	1.44
  @@ -32,12 +32,15 @@
   import javax.jms.InvalidSelectorException;
   import javax.jms.JMSException;
   
  +import org.jboss.jms.client.remoting.HandleMessageResponse;
   import org.jboss.jms.destination.JBossDestination;
   import org.jboss.jms.message.JBossMessage;
   import org.jboss.jms.message.MessageProxy;
   import org.jboss.jms.selector.Selector;
  -import org.jboss.jms.server.plugin.contract.ThreadPool;
  +import org.jboss.jms.server.ConnectionManager;
  +import org.jboss.jms.server.QueuedExecutorPool;
   import org.jboss.jms.server.remoting.JMSDispatcher;
  +import org.jboss.jms.server.remoting.MessagingMarshallable;
   import org.jboss.jms.server.subscription.Subscription;
   import org.jboss.jms.util.MessagingJMSException;
   import org.jboss.logging.Logger;
  @@ -53,6 +56,10 @@
   import org.jboss.messaging.core.tx.Transaction;
   import org.jboss.messaging.core.tx.TransactionException;
   import org.jboss.messaging.core.tx.TxCallback;
  +import org.jboss.messaging.util.Future;
  +
  +import EDU.oswego.cs.dl.util.concurrent.Executor;
  +import EDU.oswego.cs.dl.util.concurrent.QueuedExecutor;
   
   /**
    * Concrete implementation of ConsumerEndpoint. Lives on the boundary between Messaging Core and the
  @@ -60,9 +67,9 @@
    *
    * @author <a href="mailto:ovidiu at jboss.org">Ovidiu Feodorov</a>
    * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
  - * @version <tt>$Revision: 1.43 $</tt>
  + * @version <tt>$Revision: 1.44 $</tt>
    *
  - * $Id: ServerConsumerEndpoint.java,v 1.43 2006/06/26 18:41:21 timfox Exp $
  + * $Id: ServerConsumerEndpoint.java,v 1.44 2006/07/17 17:14:46 timfox Exp $
    */
   public class ServerConsumerEndpoint implements Receiver, Filter, ConsumerEndpoint
   {
  @@ -90,37 +97,41 @@
   
      private Selector messageSelector;
   
  -   private ThreadPool threadPoolDelegate;
  +   private DeliveryCallback deliveryCallback;
   
  -   private volatile boolean started;
  +   private JBossDestination destination;
   
  -   private boolean disconnected = false;
  +   private List toDeliver;
   
  -   // deliveries must be maintained in order they were received
  -   private Map deliveries;
  +   //Must be volatile
  +   private volatile boolean clientConsumerFull;
   
  -   private boolean closed;
  +   //Must be volatile
  +   private volatile boolean bufferFull;
   
  -   private boolean active;
  +   //No need to be volatile - is protected by lock
  +   private boolean started;
   
  -   private boolean grabbing;
  +   //No need to be volatile
  +   private boolean closed;
   
  -   private MessageProxy toGrab;
  +   //No need to be volatile
  +   private boolean disconnected;
   
  -   private DeliveryCallback deliveryCallback;
  +   private Executor executor;
   
  -   private boolean selectorRejected;
  +   private int prefetchSize;
      
  -   private JBossDestination destination;
  +   private Object lock;
   
  -   // We record the id of the last message delivered to the client consumer
  -   private long lastMessageIDDelivered = -1;
  +   private Map deliveries;
   
      // Constructors --------------------------------------------------
   
      protected ServerConsumerEndpoint(int id, Channel channel,
                                       ServerSessionEndpoint sessionEndpoint,
  -                                    String selector, boolean noLocal, JBossDestination dest)
  +                                    String selector, boolean noLocal, JBossDestination dest,
  +                                    int prefetchSize)
                                       throws InvalidSelectorException
      {
         if (trace) { log.trace("creating consumer endpoint " + id); }
  @@ -128,11 +139,42 @@
         this.id = id;
         this.channel = channel;
         this.sessionEndpoint = sessionEndpoint;
  -      this.threadPoolDelegate =
  -         sessionEndpoint.getConnectionEndpoint().getServerPeer().getThreadPoolDelegate();
  +      this.prefetchSize = prefetchSize;
  +      
  +      //We always created with clientConsumerFull = true
  +      //This prevents the SCD sending messages to the client before the client has fully
  +      //finished creating the MessageCallbackHandler      
  +      this.clientConsumerFull = true;
  +            
  +      //We allocate an executor for this consumer based on the destination name
  +      //so that all consumers for the same destination currently use the same executor
  +      //(we can change this if need be)
  +      //Note that they do not use the same executor as the channel of the destination
  +      QueuedExecutorPool pool =
  +         sessionEndpoint.getConnectionEndpoint().getServerPeer().getQueuedExecutorPool();
  +      
  +      this.executor = (QueuedExecutor)pool.get("consumer" + dest.getName());
  +             
  +      /*
  +      Note that using a PooledExecutor with a linked queue is not sufficient to ensure that
  +      deliveries for the same consumer happen serially, since even if they are queued serially
  +      the actual deliveries can happen in parallel, resulting in a later one "overtaking" an earlier
  +      non-deterministicly depending on thread scheduling.
  +      Consequently we use a QueuedExecutor to ensure the deliveries happen sequentially.
  +      We do not want each ServerConsumerEndpoint instance to have it's own instance - since
  +      we would end up using too many threads, neither do we want to share the same instance
  +      amongst all consumers - we do not want to serialize delivery to all consumers.
  +      So we maintain a bag of QueuedExecutors and give them out to consumers as required.
  +      Different consumers can end up using the same queuedexecutor concurrently if there are a lot
  +      of active consumers. 
  +      */      
         this.noLocal = noLocal;
         this.destination = dest;
   
  +      this.toDeliver = new ArrayList();
  +      
  +      this.lock = new Object();
  +
         if (selector != null)
         {
            if (trace) log.trace("creating selector:" + selector);
  @@ -140,52 +182,75 @@
            if (trace) log.trace("created selector");
         }
   
  +      //FIXME - 
  +      //We really need to get rid of this delivery list - it's only purpose in life is to solve
  +      //the race condition where acks or cancels can come in before handle has returned - and
  +      //that can be solved in a simpler way anyway.
  +      //It adds extra complexity both in all the extra code necessary to maintain it, the extra memory
  +      //needed to maintain it, the extra complexity in synchronization on this class to protect access to it
  +      //and when we do clustering we will have to replicate it too!!
  +      //Let's GET RID OF IT!!!!!!!!!!!
         this.deliveries = new LinkedHashMap();
  -      this.started = this.sessionEndpoint.getConnectionEndpoint().isStarted();
   
  -      // adding the consumer to the channel
  +      
  +      this.started = this.sessionEndpoint.getConnectionEndpoint().isStarted();      // adding the consumer to the channel
         this.channel.add(this);
   
  +      //prompt delivery
  +      channel.deliver(false);
  +      
         log.debug(this + " created");
      }
   
      // Receiver implementation ---------------------------------------
   
  -   // There is no need to synchronize this method. The channel synchronizes delivery to its
  -   // consumers
  +   /*
  +    * The channel ensures that handle is never called concurrently by more than one thread
  +    */
      public Delivery handle(DeliveryObserver observer, Routable reference, Transaction tx)
      {
         if (trace) { log.trace(this + " receives reference " + reference.getMessageID() + " for delivery"); }
   
  -      MessageReference ref = (MessageReference)reference;
  -
  -      if (!isReady())
  +      //This is ok to have outside lock - is volatile
  +      if (bufferFull)
         {
  -         if (trace) { log.trace(this + " rejects reference with ID " + ref.getMessageID()); }
  +         //We buffer a maximum of PREFETCH_LIMIT messages at once
  +         
  +         if (trace) { log.trace(this + " has reached prefetch size will not accept any more references"); }
  +         
            return null;
         }
   
  -      try
  +      //Need to synchronized around the whole block to prevent setting started = false
  +      //but handle is already running and a message is deposited during the stop procedure
  +      synchronized (lock)
         {
  -         Delivery delivery = null;
  +         // If the consumer is stopped then we don't accept the message, it should go back into the
  +         // channel for delivery later.
  +         if (!started)
  +         {
  +            // this is a common programming error, make this visible in the debug logs
  +            // TODO: analyse performance implications
  +            log.debug(this + " NOT started yet!");
  +            return null;
  +         }
  +            
  +         MessageReference ref = (MessageReference)reference;
   
            JBossMessage message = (JBossMessage)ref.getMessage();
   
  -         // TODO - We need to put the message in a DLQ
  -         // For now we just ack it otherwise the message will keep being retried and we'll never get
  -         // anywhere
  -         if (ref.getDeliveryCount() > MAX_DELIVERY_ATTEMPTS)
  +         boolean selectorRejected = !this.accept(message);
  +   
  +         SimpleDelivery delivery = new SimpleDelivery(observer, ref, false, !selectorRejected);
  +            
  +         checkDeliveryCount(delivery);
  +         
  +         if (delivery.isDone())
            {
  -            log.warn(message + " has exceed maximum delivery attempts and will be removed");
  -            delivery = new SimpleDelivery(observer, ref, true);
               return delivery;
            }                 
   
  -         selectorRejected = !this.accept(message);
  -
  -         delivery = new SimpleDelivery(observer, ref, false, !selectorRejected);
            deliveries.put(new Long(ref.getMessageID()), delivery);
  -
            if (selectorRejected)
            {
               // we "arrest" the message so we can get the next one
  @@ -194,6 +259,12 @@
               // http://jira.jboss.org/jira/browse/JBMESSAGING-275
               if (trace) { log.trace(this + " DOES NOT accept the message because the selector rejected it"); }
   
  +            //FIXME - This hack also breaks delivery behaviour - if there are multiple competing consumers
  +            //on the same queue, each with a different selector, then if the message arrives at one receiver
  +            //(e.g. this one) and doesn't match the selector, then it is arrested, which means the 
  +            //PointToPointRouter does not try the next receiver which does match.
  +            //See 
  +   
               // ... however, keep asking for messages, the fact that this one wasn't accepted doesn't
               // mean that the next one it won't.
   
  @@ -207,45 +278,29 @@
   
            MessageProxy mp = JBossMessage.createThinDelegate(message, ref.getDeliveryCount());
   
  -         if (!grabbing)
  -         {
  -            // We want to asynchronously deliver the message to the consumer. Deliver the message on
  -            // a different thread than the core thread that brought it here.
  +         //Add the proxy to the list to deliver
   
  +         toDeliver.add(mp);     
  +          
  +         bufferFull = toDeliver.size() >= prefetchSize;
  +             
  +         if (!clientConsumerFull)
  +         {            
               try
               {
  -               if (trace) { log.trace("queueing message " + message + " for delivery to client"); }
  -               threadPoolDelegate.execute(
  -                  new DeliveryRunnable(mp, id, sessionEndpoint.getConnectionEndpoint(), trace));
  +               this.executor.execute(new Deliverer());
               }
               catch (InterruptedException e)
               {
                  log.warn("Thread interrupted", e);
               }
            }
  -         else
  -         {
  -            // The message is being "grabbed" and returned for receiveNoWait semantics
  -            toGrab = mp;
  -         }
  -
  -         lastMessageIDDelivered = mp.getMessage().getMessageID();
   
            return delivery;
         }
  -      finally
  -      {
  -         // reset the "active" state, but only if the current message hasn't been rejected by
  -         // selector, because otherwise we want to get more messages
  -         // TODO this is a kludge that will be cleared by http://jira.jboss.org/jira/browse/JBMESSAGING-275
  -         if (!selectorRejected)
  -         {
  -            active = false;
  -            grabbing = false;
  -         }
  -      }
      }
   
  +
      // Filter implementation -----------------------------------------
   
      public boolean accept(Routable r)
  @@ -285,22 +340,21 @@
      public void closing() throws JMSException
      {
         if (trace) { log.trace(this + " closing"); }
  +      
  +      stop();         
      }
   
      public void close() throws JMSException
      {
  -      if (closed)
  +      synchronized (lock)
         {
  -         throw new IllegalStateException("Consumer is already closed");
  -      }
  -
  -      if (trace) { log.trace(this + " close"); }
  -
  -      closed = true;
  -
  -      // On close we only disconnect the consumer from the Channel we don't actually remove it
  +         //On close we only disconnect the consumer from the Channel we don't actually remove it
         // This is because it may still contain deliveries that may well be acknowledged after
         // the consumer has closed. This is perfectly valid.
  +         //FIXME - The deliveries should really be stored in the session endpoint, not here
  +         //that is their natural place, that would mean we wouldn't have to mess around with keeping
  +         //deliveries after this is closed
  +               
         disconnect();
   
         JMSDispatcher.instance.unregisterTarget(new Integer(id));
  @@ -322,107 +376,64 @@
               throw new MessagingJMSException("Failed to disconnect", e);
            }
         }           
  -   }
  -
  -   // ConsumerEndpoint implementation -------------------------------
   
  -   public void cancelDelivery(long messageID) throws JMSException
  -   {
  -      SingleReceiverDelivery del = (SingleReceiverDelivery)deliveries.remove(new Long(messageID));
  -      if (del != null)
  -      {
  -         try
  -         {
  -            del.cancel();
  -         }
  -         catch (Throwable t)
  -         {
  -            throw new MessagingJMSException("Failed to cancel delivery " + del, t);
  -         }
  -         promptDelivery();
  -      }
  -      else
  -      {
  -         throw new IllegalStateException("Cannot find delivery to cancel:" + messageID);
  +         closed = true;
         }
      }
   
  -   public void cancelDeliveries(List messageIDs) throws JMSException
  -   {
  -      //Cancel in reverse order to preserve order in queue
  -
  -      for (int i = messageIDs.size() - 1; i >= 0; i--)
  -      {
  -         Long id = (Long)messageIDs.get(i);
  -
  -         cancelDelivery(id.longValue());
  -      }
  -   }
  +   // ConsumerEndpoint implementation -------------------------------
   
  -   /**
  -    * We attempt to get the message directly fron the channel first. If we find one, we return that.
  -    * Otherwise, if wait = true, we register as being interested in receiving a message
  -    * asynchronously, then return and wait for it on the client side.
  +   /*
  +    * This is called by the client consumer to tell the server to wake up and start sending more
  +    * messages if available
       */
  -   public MessageProxy getMessageNow(boolean wait) throws JMSException
  -   {
  -      synchronized (channel)
  +   public void more()
         {
            try
            {
  -            grabbing = true;
  +         /*
  +         Set clientConsumerFull to false
  +         NOTE! This must be done using a Runnable on the delivery executor - this is to
  +         prevent the following race condition:
  +         1) Messages are delivered to the client, causing it to be full
  +         2) The messages are consumed very quickly on the client causing more to be called()
  +         3) more() hits the server BEFORE the deliverer thread has returned from delivering to the client
  +         causing clientConsumerFull to be set to false and adding a deliverer to the queue.
  +         4) The deliverer thread returns and sets clientConsumerFull to true
  +         5) The next deliverer runs but doesn't do anything since clientConsumerFull = true even
  +         though the client needs messages
  +         */
  +         this.executor.execute(new Runnable() { public void run() { clientConsumerFull = false; } });         
   
  -            // This will always deliver a message (if there is one) on the same thread
  -            promptDelivery();
  +         //Run a deliverer to deliver any existing ones
  +         this.executor.execute(new Deliverer());
   
  -            if (wait && toGrab == null)
  -            {
  -               active = true;
  -            }
  +         //TODO Why do we need to wait for it to execute??
  +         //Why not just return immediately?
   
  -            return toGrab;
  -         }
  -         finally
  -         {
  -            toGrab = null;
  -            grabbing = false;
  -         }
  -      }
  -   }
  +         //Now wait for it to execute
  +         Future result = new Future();
   
  -   public long deactivate() throws JMSException
  -   {
  -      synchronized (channel)
  -      {
  -         active = false;
  -         if (trace) { log.trace(this + " deactivated"); }
  +         this.executor.execute(new Waiter(result));
   
  -         return lastMessageIDDelivered;
  -      }
  -   }
  +         result.getResult();
      
  -   public void activate() throws JMSException
  -   {
  -      synchronized (channel)
  -      {
  -         if (closed)
  +         //Now we know the deliverer has delivered any outstanding messages to the client buffer
  +      }
  +      catch (InterruptedException e)
            {
  -            //Do nothing
  -            return;
  +         log.warn("Thread interrupted", e);
            }
            
  -         active = true;
  -         if (trace) { log.trace(this + " just activated"); }
  -         
  -         promptDelivery();
  -      }
  +      channel.deliver(false);
      }
      
  +   
      // Public --------------------------------------------------------
      
      public String toString()
      {
  -      return "ConsumerEndpoint[" + id + "]" + (active ? "(active)" : "");
  +      return "ConsumerEndpoint[" + id + "]";
      }
      
      public JBossDestination getDestination()
  @@ -435,14 +446,18 @@
         return sessionEndpoint;
      }
      
  -   // Package protected ---------------------------------------------
  -   
  -   // Protected -----------------------------------------------------
  +   public int getId()
  +   {
  +      return id;
  +   }
      
      /**
       * Actually remove the consumer and clear up any deliveries it may have
  -    * */
  -   protected void remove() throws JMSException
  +    * This is called by the session on session.close()
  +    * We can get rid of this when we store the deliveries on the session
  +    *
  +    **/
  +   public void remove() throws JMSException
      {
         if (trace) log.trace("attempting to remove receiver " + this + " from destination " + channel);
         
  @@ -480,19 +495,24 @@
            //If we cancelled any deliveries we need to force a deliver on the channel
            //This is because there may be other waiting competing consumers who need a chance to get
            //any of the cancelled messages
  -         channel.deliver();
  +         channel.deliver(false);
         }
      }  
      
  -   protected void acknowledgeAll() throws JMSException
  +   public void acknowledge(long messageID) throws JMSException
      {
  -      // acknowledge all "pending" deliveries, except the ones corresponding to messages rejected
  -      // by selector, which are cancelled
  +      // acknowledge a delivery
         try
         {     
  -         for(Iterator i = deliveries.values().iterator(); i.hasNext(); )
  +         SingleReceiverDelivery d;
  +           
  +         synchronized (lock)
  +         {
  +            d = (SingleReceiverDelivery)deliveries.remove(new Long(messageID));
  +         }
  +         
  +         if (d != null)
            {
  -            SingleReceiverDelivery d = (SingleReceiverDelivery)i.next();
   
               //TODO - Selector kludge - remove this
               if (d.isSelectorAccepted())
  @@ -504,8 +524,10 @@
                  d.cancel();
               }
            }
  -
  -         deliveries.clear();
  +         else
  +         {
  +            throw new IllegalStateException("Cannot find delivery to acknowledge:" + messageID);
  +         }
         }
         catch(Throwable t)
         {
  @@ -513,15 +535,17 @@
         }
      }
      
  -   
  -   protected void acknowledgeTransactionally(long messageID, Transaction tx) throws JMSException
  +   public void acknowledgeTransactionally(long messageID, Transaction tx) throws JMSException
      {
  -      if (trace) { log.trace("acknowledging " + messageID); }
  +      if (trace) { log.trace("acknowledging transactionally " + messageID); }
         
         SingleReceiverDelivery d = null;
                 
         // The actual removal of the deliveries from the delivery list is deferred until tx commit
  +      synchronized (lock)
  +      {
         d = (SingleReceiverDelivery)deliveries.get(new Long(messageID));
  +      }
         if (deliveryCallback == null)
         {
            deliveryCallback = new DeliveryCallback();
  @@ -529,7 +553,6 @@
         }
         deliveryCallback.addMessageID(messageID);
            
  -      
         if (d != null)
         {
            try
  @@ -548,89 +571,131 @@
         }       
      }
      
  -   protected void removeDelivery(String messageID) throws JMSException
  +   // Package protected ---------------------------------------------
  +   
  +   // Protected -----------------------------------------------------   
  +   
  +   protected void promptDelivery()
      {      
  -      if (deliveries.remove(messageID) == null)
  +      channel.deliver(false);
  +   }
  +   
  +   protected void cancelDelivery(Long messageID) throws JMSException
  +   {
  +      SingleReceiverDelivery del = (SingleReceiverDelivery)deliveries.remove(messageID);
  +      if (del != null)
         {
  -         throw new IllegalStateException("Cannot find delivery to remove:" + messageID);
  +          del.getReference().decrementDeliveryCount();    
  +          try
  +          {
  +             del.cancel();
  +          }
  +          catch (Throwable t)
  +          {
  +             throw new MessagingJMSException("Failed to cancel delivery " + del, t);
  +          }
  +      }
  +      else
  +      {
  +          throw new IllegalStateException("Cannot find delivery to cancel:" + id);
         }      
      }
      
  -   protected void cancelAllDeliveries() throws JMSException
  +   protected void start() throws JMSException
  +   {             
  +      synchronized (lock)
  +      {
  +         //can't start or stop it if it is closed
  +         if (closed)
      {
  -      if (trace) { log.trace(this + " cancels deliveries"); }
  +            return;
  +         }
              
  -      // Need to cancel starting at the end of the list and working to the front in order that the
  -      // messages end up back in the correct order in the channel.
  +         if (started)
  +         {
  +            return;
  +         }
         
  -      List toCancel = new ArrayList();
  +         started = true;
  +      }
         
  -      Iterator iter = deliveries.values().iterator();
  -      while (iter.hasNext())
  -      {
  -         SingleReceiverDelivery d = (SingleReceiverDelivery)iter.next();
  -         toCancel.add(d);
  +      //Prompt delivery
  +      channel.deliver(false);
         }
         
  -      for (int i = toCancel.size() - 1; i >= 0; i--)
  +   protected void stop() throws JMSException
         {   
  -         SingleReceiverDelivery d = (SingleReceiverDelivery)toCancel.get(i);
  -         try
  +      //We need to:
  +      //Stop accepting any new messages in the SCE
  +      //Flush any messages from the SCE to the buffer
  +      //If the client consumer is now full, then we need to cancel the ones in the toDeliver list
  +
  +      //We need to lock since otherwise we could set started to false but the handle method was already executing
  +      //and messages might get deposited after
  +      synchronized (lock)
            {
  -            d.cancel();      
  -            if (trace) { log.trace(d +  " canceled"); }
  -         }
  -         catch(Throwable t)
  +         //can't start or stop it if it is closed
  +         if (closed)
            {
  -            log.error("Failed to cancel delivery: " + d, t);
  -         }     
  +            return;
         }     
         
  -      deliveries.clear();
  -      promptDelivery();      
  +         started = false;
      }
      
  -   protected void setStarted(boolean started)
  +      //Now we know no more messages will be accepted in the SCE
  +            
  +      try
      {
  -      if (trace) { log.trace(this + (started ? " started" : " stopped")); }
  +         //Flush any messages waiting to be sent to the client
  +         this.executor.execute(new Deliverer());
         
  -      this.started = started;   
  +         //Now wait for it to execute
  +         Future result = new Future();
         
  -      if (started)
  -      {
  -         //need to prompt delivery   
  -         promptDelivery();
  +         this.executor.execute(new Waiter(result));
  +         
  +         result.getResult();
  +             
  +         //Now we know any deliverer has delivered any outstanding messages to the client buffer
         }
  +      catch (InterruptedException e)
  +      {
  +         log.warn("Thread interrupted", e);
      }
      
  -   protected void promptDelivery()
  +      //Now we know that there are no in flight messages on the way to the client consumer.
  +      
  +      //But there may be messages still in the toDeliver list since the client consumer might be full
  +      //So we need to cancel these
  +            
  +      if (!toDeliver.isEmpty())
      {
  -      if (active || grabbing)
  +         synchronized (lock)
         {
  -         // prompt delivery in a loop, since this consumer may "arrest" a message not accepted
  -         // by the selector, while it still wants to get the next one
  -         // TODO this is a kludge that will be cleared by http://jira.jboss.org/jira/browse/JBMESSAGING-275
  -         while(true)
  +            for (int i = toDeliver.size() - 1; i >= 0; i--)
            {
  -            if (trace) { log.trace(this + " prompts delivery"); }
  -
  -            selectorRejected = false;
  +               MessageProxy proxy = (MessageProxy)toDeliver.get(i);
   
  -            channel.deliver(this);
  +               long id = proxy.getMessage().getMessageID();
   
  -            if (!selectorRejected)
  -            {
  -               break;
  +               cancelDelivery(new Long(id));
               }
            }
  +                 
  +         toDeliver.clear();
  +         
  +         bufferFull = false;
         }      
      }
      
  +   // Private -------------------------------------------------------
  +   
      /**
       * Disconnect this consumer from the Channel that feeds it. This method does not clear up
       * deliveries, except the "arrested" ones
       */
  -   protected void disconnect()
  +   private void disconnect()
      {
         // clean up "arrested" deliveries, no acknowledgment will ever come for them
         for(Iterator i = deliveries.values().iterator(); i.hasNext(); )
  @@ -658,46 +723,132 @@
         }
      }
      
  +   private void checkDeliveryCount(SimpleDelivery del)
  +   {
  +      //TODO - We need to put the message in a DLQ
  +      // For now we just ack it otherwise the message will keep being retried and we'll never get
  +      // anywhere
  +      if (del.getReference().getDeliveryCount() > MAX_DELIVERY_ATTEMPTS)
  +      {
  +         log.warn(del.getReference() + " has exceed maximum delivery attempts and will be removed");
  +         
  +         try
  +         {
  +            del.acknowledge(null);
  +         }
  +         catch (Throwable t)
  +         {
  +            log.error("Failed to acknowledge delivery", t);
  +         }
  +      }                 
  +
  +   }
  +   
  +   // Inner classes -------------------------------------------------   
  +   
      /*
  -    * Do we want to handle the message? (excluding filter check)
  +    * Delivers messages to the client 
  +    * TODO - We can make this a bit more intelligent by letting it measure the rate
  +    * the client is consuming messages and send messages at that rate.
  +    * This would mean the client consumer wouldn't be full so often and more wouldn't have to be called
  +    * This should give higher throughput.
       */
  -   protected boolean isReady()
  +   private class Deliverer implements Runnable
      {
  -      // If the client side consumer is not ready to accept a message and have it sent to it
  -      // or we're not grabbing a message for receiveNoWait we return null to refuse the message
  -      if (!active && !grabbing)
  +      public void run()
         {
  -         if (trace) { log.trace(this + " not ready"); }
  -         return false;
  +         //Is there anything to deliver?
  +         //This is ok outside lock - is volatile
  +         if (clientConsumerFull)
  +         {
  +            //Do nothing
  +            return;
         }
         
  -      if (closed)
  +         List list = null;
  +             
  +         synchronized (lock)
         {
  -         if (trace) { log.trace(this + " closed"); }
  -         return false;
  +            if (!toDeliver.isEmpty())
  +            {
  +               list = new ArrayList(toDeliver);
  +               
  +               toDeliver.clear();
  +               
  +               bufferFull = false;
  +            }
         }
         
  -      // If the consumer is stopped then we don't accept the message, it should go back into the
  -      // channel for delivery later.
  -      if (!started)
  +         if (list != null)
         {
  -         // this is a common programming error, make this visible in the debug logs
  -         // TODO: anaylize performance implications
  -         log.debug(this + " NOT started yet!");
  -         return false;
  +            ServerConnectionEndpoint connection =
  +               ServerConsumerEndpoint.this.sessionEndpoint.getConnectionEndpoint();
  +
  +            try
  +            {
  +               if (trace) { log.trace("handing " + list.size() + " messages over to the remoting layer"); }
  +            
  +               ClientDelivery del = new ClientDelivery(list, id);
  +               
  +               //TODO How can we ensure that messages for the same consumer aren't delivered
  +               //concurrently to the same consumer on different threads?
  +               MessagingMarshallable mm = new MessagingMarshallable(connection.getUsingVersion(), del);
  +               
  +               MessagingMarshallable resp = (MessagingMarshallable)connection.getCallbackClient().invoke(mm);    
  +                
  +               HandleMessageResponse result = (HandleMessageResponse)resp.getLoad();
  +               
  +               if (trace) { log.trace("handed messages over to the remoting layer"); }
  +               
  +               //For now we don't look at how many messages are accepted since they all will be
  +               //The field is a placeholder for the future
  +               if (result.clientIsFull())
  +               {
  +                  //Stop the server sending any more messages to the client
  +                  //This is ok outside lock
  +                  clientConsumerFull = true;       
         }
  +            }
  +            catch(Throwable t)
  +            {
  +               log.warn("Failed to deliver the message to the client.");
         
  -      //TODO nice all the message headers and properties are in the reference we can do the 
  -      //filter check in here too.
  +               if (trace)
  +               {
  +                  log.trace("Failed to deliver message", t);
  +               }
  +               
  +               ConnectionManager mgr = connection.getServerPeer().getConnectionManager();
         
  -      return true;
  +               mgr.handleClientFailure(connection.getRemotingClientSessionId());
  +            }
  +         }              
  +      }
      }
      
  -   // Private -------------------------------------------------------
  +   /*
  +    * The purpose of this class is to put it on the QueuedExecutor and wait for it to run
  +    * We can then ensure that all the Runnables in front of it on the queue have also executed
  +    * We cannot just call shutdownAfterProcessingCurrentlyQueuedTasks() since the
  +    * QueueExecutor might be share by other consumers and we don't want to wait for their
  +    * tasks to complete
  +    */
  +   private class Waiter implements Runnable
  +   {
  +      Future result;
      
  -   // Inner classes -------------------------------------------------   
  +      Waiter(Future result)
  +      {
  +         this.result = result;
  +      }
  +      
  +      public void run()
  +      {
  +         result.setResult(null);
  +      }
  +   }
      
  -   class DeliveryCallback implements TxCallback
  +   private class DeliveryCallback implements TxCallback
      {
         List delList = new ArrayList();
         
  @@ -740,30 +891,7 @@
         
         public void afterRollback(boolean onePhase) throws TransactionException
         { 
  -         // Cancel the deliveries. Need to be cancelled in reverse order to maintain ordering
  -         for(Iterator i = delList.iterator(); i.hasNext(); )
  -         {
  -            Long messageID = (Long)i.next();
  -            
  -            SimpleDelivery del;
  -            
  -            if ((del = (SimpleDelivery)deliveries.remove(messageID)) == null)
  -            {
  -               throw new TransactionException("Failed to remove delivery " + messageID);
  -            }
  -            
  -            // cancel the delivery
  -            try
  -            {
  -               del.cancel();
  -            }
  -            catch (Throwable t)
  -            {
  -               throw new TransactionException("Failed to cancel delivery " + del, t);
  -            }
  -         }
  -         
  -         deliveryCallback = null;
  +         //Do nothing
         }
         
         void addMessageID(long messageID)
  
  
  
  1.41      +78 -29    jboss-jms/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java
  
  (In the diff below, changes in quantity of whitespace are not shown.)
  
  Index: ServerSessionEndpoint.java
  ===================================================================
  RCS file: /cvsroot/jboss/jboss-jms/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java,v
  retrieving revision 1.40
  retrieving revision 1.41
  diff -u -b -r1.40 -r1.41
  --- ServerSessionEndpoint.java	27 Jun 2006 19:44:39 -0000	1.40
  +++ ServerSessionEndpoint.java	17 Jul 2006 17:14:46 -0000	1.41
  @@ -24,6 +24,7 @@
   import java.util.HashMap;
   import java.util.HashSet;
   import java.util.Iterator;
  +import java.util.List;
   import java.util.Map;
   import java.util.Set;
   
  @@ -46,6 +47,7 @@
   import org.jboss.jms.server.remoting.JMSDispatcher;
   import org.jboss.jms.server.subscription.DurableSubscription;
   import org.jboss.jms.server.subscription.Subscription;
  +import org.jboss.jms.tx.AckInfo;
   import org.jboss.jms.util.MessagingJMSException;
   import org.jboss.logging.Logger;
   import org.jboss.messaging.core.Channel;
  @@ -61,9 +63,9 @@
    * 
    * @author <a href="mailto:ovidiu at jboss.org">Ovidiu Feodorov</a>
    * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
  - * @version <tt>$Revision: 1.40 $</tt>
  + * @version <tt>$Revision: 1.41 $</tt>
    *
  - * $Id: ServerSessionEndpoint.java,v 1.40 2006/06/27 19:44:39 timfox Exp $
  + * $Id: ServerSessionEndpoint.java,v 1.41 2006/07/17 17:14:46 timfox Exp $
    */
   public class ServerSessionEndpoint implements SessionEndpoint
   {
  @@ -250,14 +252,17 @@
            }
         }
         
  +      int prefetchSize = connectionEndpoint.getPrefetchSize();
  +      
         ServerConsumerEndpoint ep =
            new ServerConsumerEndpoint(consumerID,
                                       subscription == null ? (Channel)coreDestination : subscription,
  -                                    this, selector, noLocal, jmsDestination);
  +                                    this, selector, noLocal, jmsDestination, prefetchSize);
          
         JMSDispatcher.instance.registerTarget(new Integer(consumerID), new ConsumerAdvised(ep));
            
  -      ClientConsumerDelegate stub = new ClientConsumerDelegate(consumerID);
  +      
  +      ClientConsumerDelegate stub = new ClientConsumerDelegate(consumerID, prefetchSize);
         
         if (subscription != null)
         {
  @@ -382,33 +387,69 @@
         connectionEndpoint.sendMessage(message, null);
      }
      
  -   /**
  -    * Cancel all the deliveries in the session
  -    */
  -	public void cancelDeliveries() throws JMSException
  +   public void acknowledgeBatch(List ackInfos) throws JMSException
   	{
  -      if (closed)
  +      Iterator iter = ackInfos.iterator();
  +      
  +      while (iter.hasNext())
         {
  -         throw new IllegalStateException("Session is closed");
  +         AckInfo ackInfo = (AckInfo)iter.next();
  +         
  +         acknowledge(ackInfo);
  +      }
         }
         
  -      if (trace) { log.trace("Cancelling messages"); }
  +   public void acknowledge(AckInfo ackInfo) throws JMSException
  +   {
  +      //If the message was delivered via a connection consumer then the message needs to be acked
  +      //via the original consumer that was used to feed the connection consumer - which
  +      //won't be one of the consumers of this session
  +      //Therefore we always look in the global map of consumers held in the server peer
  +      ServerConsumerEndpoint consumer = this.connectionEndpoint.getConsumerEndpoint(ackInfo.getConsumerID());
               
  -		for(Iterator i = this.consumers.values().iterator(); i.hasNext(); )
  +      if (consumer == null)
   		{
  -			ServerConsumerEndpoint scd = (ServerConsumerEndpoint)i.next();
  -         scd.cancelAllDeliveries();
  +         throw new IllegalArgumentException("Cannot find consumer id: " + ackInfo.getConsumerID());
   		}     
  +      
  +      consumer.acknowledge(ackInfo.getMessageID());
  +      
   	}
   	
  -   public void acknowledge() throws JMSException
  +   public void cancelDeliveries(List ackInfos) throws JMSException
  +   {
  +      //Deliveries must be cancelled in reverse order
  +      
  +      log.info(this + " cancelling deliveries");
  +      
  +      Set consumers = new HashSet();
  +      
  +      for (int i = ackInfos.size() - 1; i >= 0; i--)
  +      {
  +         AckInfo ack = (AckInfo)ackInfos.get(i);
  +         
  +         //We look in the global map since the message might have come from connection consumer
  +         ServerConsumerEndpoint consumer = this.connectionEndpoint.getConsumerEndpoint(ack.getConsumerID());
  +
  +         if (consumer == null)
      {
  +            throw new IllegalArgumentException("Cannot find consumer id: " + ack.getConsumerID());
  +         }
  +         
  +         consumer.cancelDelivery(new Long(ack.getMessageID()));
  +         
  +         consumers.add(consumer);
  +      }
  +      
  +      //Need to prompt delivery for all consumers
  +      
  +      Iterator iter = consumers.iterator();
   
  -      Iterator iter = consumers.values().iterator();
         while (iter.hasNext())
         {
            ServerConsumerEndpoint consumer = (ServerConsumerEndpoint)iter.next();
  -         consumer.acknowledgeAll();
  +         
  +         consumer.promptDelivery();
         }
      }
   
  @@ -576,13 +617,21 @@
      /**
       * Starts this session's Consumers
       */
  -   protected void setStarted(boolean s)
  +   protected void setStarted(boolean s) throws JMSException
      {
         synchronized(consumers)
         {
            for(Iterator i = consumers.values().iterator(); i.hasNext(); )
            {
  -            ((ServerConsumerEndpoint)i.next()).setStarted(s);
  +            ServerConsumerEndpoint sce = (ServerConsumerEndpoint)i.next();
  +            if (s)
  +            {
  +               sce.start();
  +            }
  +            else
  +            {
  +               sce.stop();
  +            }
            }
         }
      }   
  
  
  
  1.10      +30 -8     jboss-jms/src/main/org/jboss/jms/server/endpoint/SessionEndpoint.java
  
  (In the diff below, changes in quantity of whitespace are not shown.)
  
  Index: SessionEndpoint.java
  ===================================================================
  RCS file: /cvsroot/jboss/jboss-jms/src/main/org/jboss/jms/server/endpoint/SessionEndpoint.java,v
  retrieving revision 1.9
  retrieving revision 1.10
  diff -u -b -r1.9 -r1.10
  --- SessionEndpoint.java	20 Apr 2006 20:42:26 -0000	1.9
  +++ SessionEndpoint.java	17 Jul 2006 17:14:46 -0000	1.10
  @@ -22,6 +22,8 @@
   package org.jboss.jms.server.endpoint;
   
   
  +import java.util.List;
  +
   import javax.jms.JMSException;
   
   import org.jboss.jms.client.Closeable;
  @@ -31,14 +33,15 @@
   import org.jboss.jms.destination.JBossQueue;
   import org.jboss.jms.destination.JBossTopic;
   import org.jboss.jms.message.JBossMessage;
  +import org.jboss.jms.tx.AckInfo;
   
   /**
    * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
    * @author <a href="mailto:ovidiu at jboss.org">Ovidiu Feodorov</a>
    * 
  - * @version <tt>$Revision: 1.9 $</tt>
  + * @version <tt>$Revision: 1.10 $</tt>
    *
  - * $Id: SessionEndpoint.java,v 1.9 2006/04/20 20:42:26 timfox Exp $
  + * $Id: SessionEndpoint.java,v 1.10 2006/07/17 17:14:46 timfox Exp $
    */
   public interface SessionEndpoint extends Closeable
   { 
  @@ -64,11 +67,18 @@
      JBossTopic createTopic(String topicName) throws JMSException;
   
      /**
  -    * Acknowledges the session
  +    * Acknowledge a batch of messages - used with client acknowledge or dups_ok acknowledge
  +    * @param ackInfos
  +    * @throws JMSException
       */ 
  -   void acknowledge() throws JMSException;
  +   void acknowledgeBatch(List ackInfos) throws JMSException;
      
  -   void cancelDeliveries() throws JMSException;
  +   /**
  +    * Acknowledge a message - used for auto acknowledge
  +    * @param ackInfo
  +    * @throws JMSException
  +    */
  +   void acknowledge(AckInfo ackInfo) throws JMSException;
      
      /**
       * Add a temporary destination.
  @@ -89,8 +99,20 @@
       */
      void unsubscribe(String subscriptionName) throws JMSException;
      
  +   /**
  +    * Send a message
  +    * @param message The message to send
  +    * @throws JMSException
  +    */
      void send(JBossMessage message) throws JMSException;
      
  -     
  +   /**
  +    * Cancel some deliveries.
  +    * This used at consumer close to cancel any undelivered messages left in the client buffer
  +    * or at session recovery to cancel any messages that couldn't be redelivered locally
  +    * @param ackInfos
  +    * @throws Exception
  +    */
  +   void cancelDeliveries(List ackInfos) throws JMSException;
   }
   
  
  
  
  1.1      date: 2006/07/17 17:14:45;  author: timfox;  state: Exp;jboss-jms/src/main/org/jboss/jms/server/endpoint/ClientDelivery.java
  
  Index: ClientDelivery.java
  ===================================================================
  /*
    * JBoss, Home of Professional Open Source
    * Copyright 2005, JBoss Inc., and individual contributors as indicated
    * by the @authors tag. See the copyright.txt in the distribution for a
    * full listing of individual contributors.
    *
    * This is free software; you can redistribute it and/or modify it
    * under the terms of the GNU Lesser General Public License as
    * published by the Free Software Foundation; either version 2.1 of
    * the License, or (at your option) any later version.
    *
    * This software is distributed in the hope that it will be useful,
    * but WITHOUT ANY WARRANTY; without even the implied warranty of
    * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
    * Lesser General Public License for more details.
    *
    * You should have received a copy of the GNU Lesser General Public
    * License along with this software; if not, write to the Free
    * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
    * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
    */
  package org.jboss.jms.server.endpoint;
  
  import java.io.Externalizable;
  import java.io.IOException;
  import java.io.ObjectInput;
  import java.io.ObjectOutput;
  import java.util.ArrayList;
  import java.util.Iterator;
  import java.util.List;
  
  import org.jboss.jms.message.JBossMessage;
  import org.jboss.jms.message.MessageProxy;
  import org.jboss.messaging.core.message.MessageFactory;
  
  /**
   * 
   * A ClientDelivery
   * Encapsulates a delivery of some messages to a client consumer
   * 
   * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
   * @version <tt>$Revision: 1.1 $</tt>
   *
   * $Id: ClientDelivery.java,v 1.1 2006/07/17 17:14:45 timfox Exp $
   *
   */
  public class ClientDelivery implements Externalizable
  {
     // Constants -----------------------------------------------------
     
     private static final long serialVersionUID = 8375144805659344430L;
  
     // Static --------------------------------------------------------
     
     // Attributes ----------------------------------------------------
     
     private List msgs;
           
     private int consumerID;
      
     // Constructors --------------------------------------------------
     
     public ClientDelivery()
     {      
     }
  
     public ClientDelivery(List msgs, int consumerID)
     {
        this.msgs = msgs;
        
        this.consumerID = consumerID;      
     }
    
     // Externalizable implementation
     // ---------------------------------------------------------------
     
     public void writeExternal(ObjectOutput out) throws IOException
     {
        out.writeInt(consumerID);
        
        out.writeInt(msgs.size());
        
        Iterator iter = msgs.iterator();
        
        while (iter.hasNext())
        {
           MessageProxy mp = (MessageProxy)iter.next();
           
           out.writeByte(mp.getMessage().getType());
  
           out.writeInt(mp.getDeliveryCount());
  
           mp.getMessage().writeExternal(out);
        }      
     }
  
     public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException
     {
        consumerID = in.readInt();
        
        int numMessages = in.readInt();
        
        msgs = new ArrayList(numMessages);
        
        for (int i = 0; i < numMessages; i++)
        {
           byte type = in.readByte();
           
           int deliveryCount = in.readInt();
           
           JBossMessage m = (JBossMessage)MessageFactory.createMessage(type);
  
           m.readExternal(in);
  
           MessageProxy md = JBossMessage.createThinDelegate(m, deliveryCount);
           
           msgs.add(md);
        }      
     }
  
     // Public --------------------------------------------------------
     
     public List getMessages()
     {
        return msgs;
     }
     
     public int getConsumerID()
     {
        return consumerID;
     }
  
     // Package protected ---------------------------------------------
     
     // Protected -----------------------------------------------------
     
     // Private -------------------------------------------------------
     
     // Inner classes -------------------------------------------------   
  }
  
  
  



More information about the jboss-cvs-commits mailing list