[jboss-cvs] jboss-jms/src/main/org/jboss/messaging/core ...

Timothy Fox tim.fox at jboss.com
Mon Jul 17 13:14:47 EDT 2006


  User: timfox  
  Date: 06/07/17 13:14:47

  Modified:    src/main/org/jboss/messaging/core      Channel.java
                        ChannelSupport.java Routable.java
  Removed:     src/main/org/jboss/messaging/core      ChannelState.java
                        State.java
  Log:
  Many changes including implementation of prefetch, SEDAisation of server, changing of recovery
  
  Revision  Changes    Path
  1.25      +48 -14    jboss-jms/src/main/org/jboss/messaging/core/Channel.java
  
  (In the diff below, changes in quantity of whitespace are not shown.)
  
  Index: Channel.java
  ===================================================================
  RCS file: /cvsroot/jboss/jboss-jms/src/main/org/jboss/messaging/core/Channel.java,v
  retrieving revision 1.24
  retrieving revision 1.25
  diff -u -b -r1.24 -r1.25
  --- Channel.java	24 Jun 2006 09:05:38 -0000	1.24
  +++ Channel.java	17 Jul 2006 17:14:47 -0000	1.25
  @@ -21,8 +21,6 @@
     */
   package org.jboss.messaging.core;
   
  -import org.jboss.messaging.core.plugin.contract.MessageStore;
  -
   import java.util.List;
   
   /**
  @@ -42,13 +40,16 @@
    *
    * @author <a href="mailto:ovidiu at jboss.org">Ovidiu Feodorov</a>
    * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
  - * @version <tt>$Revision: 1.24 $</tt>
  + * @version <tt>$Revision: 1.25 $</tt>
    *
  - * $Id: Channel.java,v 1.24 2006/06/24 09:05:38 timfox Exp $
  + * $Id: Channel.java,v 1.25 2006/07/17 17:14:47 timfox Exp $
    */
   public interface Channel extends DeliveryObserver, Receiver, Distributor
   {
   
  +   /**    
  +    * @return the unique ID of the channel
  +    */
      long getChannelID();
   
      /**
  @@ -83,24 +84,57 @@
      List browse();
   
      /**
  -    * @see Channel#browse()
  +    * @param filter - may be null, in which case no filter is applied.
  +    *
  +    * @return a List containing message references of messages whose state is maintained by this
  +    *         State instance. The list includes references of messages in process of being delivered
  +    *         and references of messages for which delivery has not been attempted yet.
       */
      List browse(Filter filter);
   
  -   MessageStore getMessageStore();
  +   /**
  +    * Delivers as many references as possible to it's router until no more deliveries are returned
  +    *
  +    */
  +   void deliver(boolean synchronous);
   
      /**
  -    * Synchronously pushes the "oldest" message stored by the channel to the receiver. If receiver
  -    * is null, it delivers the message to the first available receiver.
  +    * Close the channel
       *
  -    * @return true if a message was handed over to the receiver and the channel got a delivery in
  -    *         exchange, or false otherwise. 
       */
  -   boolean deliver(Receiver receiver);
  +   void close();
      
  -   boolean deliver();
  +   /**
  +    * Get a list of message references of messages in the process of being delivered, subject to the filter
  +    * @param filter
  +    * @return the list
  +    */
  +   List delivering(Filter filter);
   
  -   void close();
  +   /**
  +    * Get a list of message references of messages not in the process of being delivered, subject to the filter
  +    * @param filter
  +    * @return the list
  +    */
  +   List undelivered(Filter filter);   
  +
  +   /**
  +    * Clears non-recoverable state but not persisted state, so a recovery of the channel is possible
  +    * TODO really?
  +    */
  +   void clear();
  +   
  +   /**
  +    * Message amount. 
  +    * @return message amount.
  +    */
  +   int messageCount();   
  +   
  +   /**
  +    * Load the channel state from storage
  +    * @throws Exception
  +    */
  +   void load() throws Exception;
   
   }
   
  
  
  
  1.66      +1573 -334 jboss-jms/src/main/org/jboss/messaging/core/ChannelSupport.java
  
  (In the diff below, changes in quantity of whitespace are not shown.)
  
  Index: ChannelSupport.java
  ===================================================================
  RCS file: /cvsroot/jboss/jboss-jms/src/main/org/jboss/messaging/core/ChannelSupport.java,v
  retrieving revision 1.65
  retrieving revision 1.66
  diff -u -b -r1.65 -r1.66
  --- ChannelSupport.java	27 Jun 2006 19:44:39 -0000	1.65
  +++ ChannelSupport.java	17 Jul 2006 17:14:47 -0000	1.66
  @@ -22,25 +22,41 @@
   package org.jboss.messaging.core;
   
   import java.util.ArrayList;
  +import java.util.HashMap;
   import java.util.Iterator;
  +import java.util.LinkedHashSet;
   import java.util.List;
  +import java.util.Map;
   import java.util.Set;
   
   import org.jboss.logging.Logger;
   import org.jboss.messaging.core.memory.MemoryManager;
   import org.jboss.messaging.core.plugin.contract.MessageStore;
   import org.jboss.messaging.core.plugin.contract.PersistenceManager;
  +import org.jboss.messaging.core.refqueue.BasicPrioritizedDeque;
  +import org.jboss.messaging.core.refqueue.PrioritizedDeque;
   import org.jboss.messaging.core.tx.Transaction;
  +import org.jboss.messaging.core.tx.TransactionException;
  +import org.jboss.messaging.core.tx.TxCallback;
  +import org.jboss.messaging.util.Future;
  +
  +import EDU.oswego.cs.dl.util.concurrent.QueuedExecutor;
  +import EDU.oswego.cs.dl.util.concurrent.SynchronizedLong;
   
   /**
  - * A basic channel implementation. It supports atomicity, isolation and, if a non-null
  - * PersistenceManager is available, it supports recoverability of reliable messages.
  + * A basic channel implementation. It supports atomicity, isolation and, if a
  + * non-null PersistenceManager is available, it supports recoverability of
  + * reliable messages. The channel implementation here uses a "SEDA-type"
  + * approach, where requests to handle messages, deliver to receivers or
  + * acknowledge messages are not executed concurrently but placed on an event
  + * queue and executed serially by a single thread. This prevents lock contention
  + * since requests are executed serially, resulting in better scalability and
  + * higher throughput at the expense of some latency
    *
    * @author <a href="mailto:ovidiu at jboss.org">Ovidiu Feodorov</a>
    * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a> 
  - * @version <tt>$Revision: 1.65 $</tt>
  - *
  - * $Id: ChannelSupport.java,v 1.65 2006/06/27 19:44:39 timfox Exp $
  + * @version <tt>$Revision: 1.66 $</tt> $Id: ChannelSupport.java,v 1.65
  + *          2006/06/27 19:44:39 timfox Exp $
    */
   public abstract class ChannelSupport implements Channel
   {
  @@ -52,126 +68,173 @@
   
       // Attributes ----------------------------------------------------
   
  +   private boolean trace = log.isTraceEnabled();
  +
      protected long channelID;
  +
      protected Router router;
  -   protected State state;
  +
      protected MessageStore ms;
  +
  +   protected QueuedExecutor executor;
  +
  +   protected boolean receiversReady;
  +
  +   protected PrioritizedDeque messageRefs;
  +
  +   protected Set deliveries;
  +
  +   protected List downCache;
  +
  +   protected boolean acceptReliableMessages;
  +
  +   protected boolean recoverable;
  +
  +   protected SynchronizedLong messageOrdering;
  +
      protected PersistenceManager pm;
  +
      protected MemoryManager mm;
      
  -   private boolean trace = log.isTraceEnabled();
  +   protected int fullSize;
  +
  +   protected int pageSize;
  +
  +   protected int downCacheSize;
  +
  +   protected boolean paging;
  +
  +   protected int refsInStorage;
  +
  +   private Object refLock;
  +
  +   private Object deliveryLock;
  +
  +   // When we load refs from the channel we do so with values of ordering >=
  +   // this value
  +   private long loadFromOrderingValue;
   
      // Constructors --------------------------------------------------
   
      /**
  -    * @param acceptReliableMessages - it only makes sense if tl is null. Otherwise ignored (a
  +    * @param acceptReliableMessages -
  +    *           it only makes sense if tl is null. Otherwise ignored (a
       *        recoverable channel always accepts reliable messages)
       */
      
  -   protected ChannelSupport(long channelID,
  -                            MessageStore ms,
  -                            PersistenceManager pm,
  -                            MemoryManager mm,
  -                            boolean acceptReliableMessages,
  -                            boolean recoverable,
  -                            int fullSize, int pageSize, int downCacheSize)
  +   protected ChannelSupport(long channelID, MessageStore ms,
  +            PersistenceManager pm, MemoryManager mm,
  +            boolean acceptReliableMessages, boolean recoverable, int fullSize,
  +            int pageSize, int downCacheSize, QueuedExecutor executor)
      {
  -      if (trace) { log.trace("creating " + (pm != null ? "recoverable " : "non-recoverable ") + "channel[" + channelID + "]"); }
  +      if (trace)
  +      {
  +         log.trace("creating "
  +                  + (pm != null ? "recoverable " : "non-recoverable ")
  +                  + "channel[" + channelID + "]");
  +      }
   
         if (ms == null)
         {
  -         throw new IllegalArgumentException("MessageStore is null");
  +         throw new IllegalArgumentException(
  +                  "ChannelSupport requires a non-null message store");
         }
         if (pm == null)
         {
  -         throw new IllegalArgumentException("PersistenceManager is null");
  +         throw new IllegalArgumentException("ChannelSupport requires a "
  +                  + "non-null persistence manager");
  +      }
  +      if (pageSize >= fullSize)
  +      {
  +         throw new IllegalArgumentException(
  +                  "pageSize must be less than full size");
  +      }
  +      if (downCacheSize > pageSize)
  +      {
  +         throw new IllegalArgumentException(
  +                  "pageSize cannot be smaller than downCacheSize");
  +      }
  +      if (pageSize <= 0)
  +      {
  +         throw new IllegalArgumentException(
  +                  "pageSize must be greater than zero");
  +      }
  +      if (downCacheSize <= 0)
  +      {
  +         throw new IllegalArgumentException(
  +                  "downCacheSize must be greater than zero");
         }
  -      
  -      this.state = new ChannelState(this, pm, mm, acceptReliableMessages, recoverable,
  -                                    fullSize, pageSize, downCacheSize); 
         
         this.ms = ms;
  +
         this.pm = pm;
  +
         this.mm = mm;
         
         this.channelID = channelID;
  -   }     
      
  +      this.executor = executor;
   
  -   // Receiver implementation ---------------------------------------
  +      this.acceptReliableMessages = acceptReliableMessages;
   
  -   public Delivery handle(DeliveryObserver sender, Routable r, Transaction tx)
  -   {            
  -      checkClosed();
  +      this.recoverable = recoverable;
         
  -      if (r == null)
  -      {
  -         return null;
  -      }
  +      messageRefs = new BasicPrioritizedDeque(10);
   
  -      if (trace){ log.trace(this + " handles " + r + (tx == null ? " non-transactionally" : " in transaction: " + tx) ); }
  +      deliveries = new LinkedHashSet();
   
  -      MessageReference ref = obtainReference(r);
  +      downCache = new ArrayList();
         
  -      try
  -      {
  +      this.fullSize = fullSize;
            
  -         if (tx == null)
  -         {
  -            // Don't even attempt synchronous delivery for a reliable message when we have an
  -            // non-recoverable state that doesn't accept reliable messages. If we do, we may get
  -            // into the situation where we need to reliably store an active delivery of a reliable
  -            // message, which in these conditions cannot be done.
  +      this.pageSize = pageSize;
   
  -            if (r.isReliable() && !state.acceptReliableMessages())
  -            {
  -               log.error("Cannot handle reliable message " + r +
  -                         " because the channel has a non-recoverable state!");
  -               return null;
  -            }
  +      this.downCacheSize = downCacheSize;
   
  -            // This returns true if the ref was added to an empty reference queue
  -            boolean first = state.addReference(ref);
  +      refLock = new Object();
               
  -            // Previously we would call push() at this point to push the reference to the consumer.
  -            // One of the problems this had was it would end up leap-frogging messages that were
  -            // already in the queue. So now we add the message to the back of the queue and call
  -            // deliver(). In fact we only need to call deliver() if the queue was empty before we
  -            // added the reference.
  -            // If the queue wasn't empty there would be no active waiting receiver, so there would
  -            // be no need to call deliver().
  +      deliveryLock = new Object();
               
  -            if (first)
  -            {
  -               deliver(this, null);         
  -            }
  +      messageOrdering = new SynchronizedLong(0);
            }
  -         else
  -         {   
  -            if (trace){ log.trace("adding " + ref + " to state " + (tx == null ? "non-transactionally" : "in transaction: " + tx) ); }
               
  -            state.addReference(ref, tx);         
  -         }
  -      }
  -      catch (Throwable t)
  +   // Receiver implementation ---------------------------------------
  +
  +   public Delivery handle(DeliveryObserver sender, Routable r, Transaction tx)
         {
  -         log.error("Failed to handle message", t);
  +      checkClosed();
            
  -         ref.releaseMemoryReference();
  +      Future result = new Future();
            
  -         return null;
  +      try
  +      {
  +         // Instead of executing directly, we add the handle request to the
  +         // event queue
  +         // Since remoting doesn't currently handle non blocking IO, we still
  +         // have to wait for the result
  +         // But when remoting does, we can use a full SEDA approach and get even
  +         // better throughput
  +         this.executor.execute(new HandleRunnable(result, sender, r, tx));
         }
  -
  -      // I might as well return null, the sender shouldn't care
  -      return new SimpleDelivery(sender, ref, true);
  +      catch (InterruptedException e)
  +      {
  +         log.warn("Thread interrupted", e);
      }
      
  +      return (Delivery) result.getResult();
  +   }
   
      // DeliveryObserver implementation --------------------------
   
      public void acknowledge(Delivery d, Transaction tx)
      {
  -      if (trace){ log.trace("acknowledging " + d + (tx == null ? " non-transactionally" : " transactionally in " + tx)); }
  +      if (trace)
  +      {
  +         log.trace("acknowledging "
  +                  + d
  +                  + (tx == null ? " non-transactionally"
  +                           : " transactionally in " + tx));
  +      }
               
         try
         {      
  @@ -179,13 +242,40 @@
            {
               // acknowledge non transactionally
               
  -            state.acknowledge(d);
  +            // we put the acknowledgement on the event queue
  +
  +            // try
  +            // {
  +            // Future result = new Future();
  +            //
  +            // this.executor.execute(new AcknowledgeRunnable(d, result));
  +            //               
  +            // //For now we wait for result, but this may not be necessary
  +            // result.getResult();
  +            // }
  +            // catch (InterruptedException e)
  +            // {
  +            // log.warn("Thread interrupted", e);
  +            // }
  +
  +            // TODO We should consider also executing acks on the event queue
  +            acknowledgeInternal(d);
                  
  -            if (trace) { log.trace(this + " delivery " + d + " completed and forgotten"); }                    
            }
            else
            {
  -            state.acknowledge(d, tx);         
  +            this.getCallback(tx).addDelivery(d);
  +
  +            if (trace)
  +            {
  +               log.trace(this + " added " + d + " to memory on transaction "
  +                        + tx);
  +            }
  +
  +            if (recoverable && d.getReference().isReliable())
  +            {
  +               pm.removeReference(channelID, d.getReference(), tx);
  +            }
            }         
         }
         catch (Throwable t)
  @@ -194,24 +284,52 @@
         }
      }
   
  -   public void cancel(Delivery d) throws Throwable
  +   public void cancel(Delivery d)
      {
  -      if (trace) { log.trace("cancelling " + d); }
  +      // We put the cancellation on the event queue
  +      // try
  +      // {
  +      // Future result = new Future();
  +      //         
  +      // this.executor.execute(new CancelRunnable(d, result));
  +      //         
  +      // //For now we wait for result, but this may not be necessary
  +      // result.getResult();
  +      // }
  +      // catch (InterruptedException e)
  +      // {
  +      // log.warn("Thread interrupted", e);
  +      // }
         
  -      state.cancelDelivery(d);
  +      // TODO We should also consider executing cancels on the event queue
  +      try
  +      {
  +         cancelInternal(d);
  +      }
  +      catch (Throwable t)
  +      {
  +         log.error("Failed to cancel delivery", t);
  +      }
         
  -      if (trace) { log.trace(this + " marked message " + d.getReference() + " as undelivered"); }
      }
   
      // Distributor implementation ------------------------------------
   
      public boolean add(Receiver r)
      {
  -      if (trace) { log.trace(this + " attempting to add receiver " + r); }
  +      if (trace)
  +      {
  +         log.trace(this + " attempting to add receiver " + r);
  +      }
         
         boolean added = router.add(r);
   
  -      if (trace) { log.trace("receiver " + r + (added ? "" : " NOT") + " added"); }
  +      if (trace)
  +      {
  +         log.trace("receiver " + r + (added ? "" : " NOT") + " added");
  +      }
  +      
  +      receiversReady = true;
         
         return added;
      }
  @@ -220,7 +338,11 @@
      {
         boolean removed = router.remove(r);
   
  -      if (trace) { log.trace(this + (removed ? " removed ":" did NOT remove ") + r); }
  +      if (trace)
  +      {
  +         log.trace(this + (removed ? " removed " : " did NOT remove ") + r);
  +      }
  +
         return removed;
      }
   
  @@ -248,12 +370,12 @@
   
      public boolean isRecoverable()
      {
  -      return state.isRecoverable();
  +      return recoverable;
      }
      
      public boolean acceptReliableMessages()
      {
  -      return state.acceptReliableMessages();
  +      return acceptReliableMessages;
      }
   
      public List browse()
  @@ -261,296 +383,1413 @@
         return browse(null);
      }
   
  -   public List browse(Filter f)
  +   public List browse(Filter filter)
  +   {
  +      if (trace)
      {
  -      if (trace) { log.trace(this + " browse" + (f == null ? "" : ", filter = " + f)); }
  +         log.trace(this + " browse"
  +                  + (filter == null ? "" : ", filter = " + filter));
  +      }
   
  -      List references = state.browse(f);
  +      synchronized (deliveryLock)
  +      {
  +         synchronized (refLock)
  +         {
  +            //FIXME - This is currently broken since it doesn't take into account
  +            // refs paged into persistent storage
  +            // Also is very inefficient since it makes a copy
  +            List references = delivering(filter);
   
  -      // dereference pass
  -      ArrayList messages = new ArrayList(references.size());
  -      for(Iterator i = references.iterator(); i.hasNext();)
  +            Iterator iter = references.iterator();
  +            while (iter.hasNext())
         {
  -         MessageReference ref = (MessageReference)i.next();
  -         messages.add(ref.getMessage());
  -      }
  -      return messages;
  +               MessageReference ref = (MessageReference)iter.next();
      }
   
  -   public MessageStore getMessageStore()
  +            List undel = undelivered(filter);
  +            
  +            iter = undel.iterator();
  +            while (iter.hasNext())
      {
  -      return ms;
  +               MessageReference ref = (MessageReference)iter.next();
      }
   
  +            references.addAll(undel);
      
  -   public synchronized boolean deliver()
  -   {
  -      try
  +            // dereference pass
  +            ArrayList messages = new ArrayList(references.size());
  +            for (Iterator i = references.iterator(); i.hasNext();)
         {
  -         return deliver(this, null);
  +               MessageReference ref = (MessageReference) i.next();
  +               messages.add(ref.getMessage());
  +            }
  +            return messages;
         }
  -      catch (Throwable t)
  -      {
  -         log.error("Failed to deliver message", t);
  -         return false;
         }
      }
   
  -   public boolean deliver(Receiver r)
  +   public void deliver(boolean synchronous)
      {
  -      if (trace){ log.trace(r != null ? r + " requested delivery on " + this : "generic delivery requested on " + this); }
  -      
         checkClosed();
         
  +      // We put a delivery request on the event queue.
         try
         {
  -         return deliver(this, r);
  +         Future future = null;
  +         
  +         if (synchronous)
  +         {
  +            future = new Future();
         }
  -      catch (Throwable t)
  +                  
  +         this.executor.execute(new DeliveryRunnable(future));
  +         
  +         if (synchronous)
         {
  -         log.error("Failed to deliver message", t);
  -         return false;
  +            //Wait to complete
  +            future.getResult();
  +         }
  +      }
  +      catch (InterruptedException e)
  +      {
  +         log.warn("Thread interrupted", e);
         }
      }
      
      public void close()
      {
  -      if (state == null)
  +      if (router != null)
         {
  -         return;
  -      }
  -
         router.clear();
         router = null;
  -      state.clear();
  -      state = null;
  +      }
      }
     
      public void removeAllMessages()
      {
  -      state.removeAll();
  -   }
  -
  -   // Public --------------------------------------------------------
  -   
  -   // Package protected ---------------------------------------------
  -   
  -   // Protected -----------------------------------------------------
  -
  -   protected MessageReference obtainReference(Routable r)
  +      synchronized (refLock)
      {
  -      MessageReference ref = null;
  -
  -      // Convert to reference
  -      try
  +         synchronized (deliveryLock)
         {
  -         if (!r.isReference())
  +            // Remove all deliveries
  +
  +            Iterator iter = deliveries.iterator();
  +            while (iter.hasNext())
            {
  -            //We should only handle references in core.
  -            //TODO enforce this in the signature of handle method            
  -            //See http://jira.jboss.com/jira/browse/JBMESSAGING-255            
  -            log.warn("Should only handle references");
  -            //Remove this when this is enforced
  -            ref = ms.reference((Message)r);
  +               Delivery d = (Delivery) iter.next();
  +               MessageReference r = d.getReference();
  +               removeCompletely(r);
            }
  -         else
  +            deliveries.clear();
  +
  +            // Remove all holding messages
  +
  +            iter = messageRefs.getAll().iterator();
  +            while (iter.hasNext())
            {                     
  -            // Each channel has it's own copy of the reference
  -            ref = ((MessageReference)r).copy();
  +               MessageReference r = (MessageReference) iter.next();
  +               removeCompletely(r);
            }
  +            messageRefs.clear();
         
  -         return ref;
         }
  -      catch (Exception e)
  -      {
  -         log.error("Failed to reference routable", e);
  -         //FIXME - Swallowing exceptions
  -         return null;
         }      
      }
   
  -   /**
  -    * Give subclass a chance to process the message before storing it internally. Useful to get
  -    * rid of the REMOTE_ROUTABLE header in a distributed case, for example.
  -    */
  -   protected void processMessageBeforeStorage(MessageReference reference)
  +   public void load() throws Exception
      {
  -      // by default a noop
  +      if (trace)
  +      {
  +         log.trace(this + " loading channel state");
      }
  +      synchronized (refLock)
  +      {
  +         // First we need to reset the loaded flag in the db to "N"
  +         pm.resetLoadedStatus(channelID);
      
  +         refsInStorage = pm.getNumberOfUnloadedReferences(channelID);
      
  +         loadFromOrderingValue = pm.getMinOrdering(channelID);
           
  -   /**
  -    * Delivery for the channel must be synchronized. Otherwise we can end up with the same message
  -    * being delivered more than once to the same consumer (if deliver() is called concurrently) or
  -    * messages being delivered in the wrong order.
  -    */
  -   protected synchronized boolean deliver(DeliveryObserver sender, Receiver receiver)
  -      throws Throwable
  +         if (refsInStorage > 0)
      {
  +            load(Math.min(refsInStorage, fullSize));
  +         }
  +      }
  +   }
         
  -      MessageReference ref;
  -      
  -      while (true)
  +   public List delivering(Filter filter)
         {      
  -         ref = state.peekFirst();
  +      List delivering = new ArrayList();
            
  -         if (ref != null)
  +      synchronized (deliveryLock)
            {
  -            //Check if message is expired (we also do this on the client side)
  -            //If so ack it from the channel            
  -            if (ref.isExpired())                 
  +         for (Iterator i = deliveries.iterator(); i.hasNext();)
  +         {
  +            Delivery d = (Delivery) i.next();
  +
  +            MessageReference r = d.getReference();
  +
  +            // TODO: I need to dereference the message each time I apply the
  +            // filter. Refactor so the message reference will also contain JMS
  +            // properties
  +            if (filter == null || filter.accept(r.getMessage()))
               {
  +               delivering.add(r);
  +            }
  +         }
  +      }
                  if (trace)
                  {
  -                  log.trace("Message reference: " + ref + " has expired");
  +         log.trace(this + ": the non-recoverable state has "
  +                  + delivering.size() + " messages being delivered");
                  }
                  
  -               //remove and acknowledge it
  +      return delivering;
  +   }
                  
  -               state.removeFirstInMemory();
  +   public List undelivered(Filter filter)
  +   {
  +      List undelivered = new ArrayList();
                 
  -               Delivery delivery = new SimpleDelivery(this, ref, true);
  +      synchronized (refLock)
  +      {
  +         Iterator iter = messageRefs.getAll().iterator();
                  
  -               //is this stage really necessary?
  -               state.addDelivery(delivery);
  +         while (iter.hasNext())
  +         {
  +            MessageReference r = (MessageReference) iter.next();
                 
  -               state.acknowledge(delivery);                             
  +            // TODO: I need to dereference the message each time I apply the
  +            // filter. Refactor so the message reference will also contain JMS
  +            // properties
  +            if (filter == null || filter.accept(r.getMessage()))
  +            {
  +               undelivered.add(r);
               }            
               else
               {
  -               break;
  +               if (trace)
  +               {
  +                  log.trace(this + ": " + r
  +                           + " NOT accepted by filter so won't add to list");
               }
            }
  -         else
  -         {
  -            //No more refs in channel
  -            return false;
            }
         }
  -
  -      
  -      if (trace){ log.trace(this + " delivering " + ref); }
  -
  -      Delivery del = push(ref, receiver);
  -
  -      if (del == null)
  +      if (trace)
         {
  -         // no receiver, receiver that doesn't accept the message or broken receiver
  -
  -         if (trace){ log.trace(this + ": no delivery returned for message" + ref + " so no receiver got the message"); }
  +         log.trace(this + ": undelivered() returns a list of "
  +                  + undelivered.size() + " undelivered memory messages");
  +      }
   
  -         return false;
  +      return undelivered;
         }
  -      else
  -      {
  -         if (trace){ log.trace(this + ": delivery returned for message:" + ref); }
            
  -         // We must synchronize here to cope with another race condition where message is
  -         // cancelled/acked in flight while the following few actions are being performed.
  -         // e.g. delivery could be cancelled acked after being removed from state but before
  -         // delivery being added (observed).
  -         synchronized (del)
  +   public int messageCount()
            {
  -            if (trace) { log.trace(this + " incrementing delivery count for " + del); }    
  -            
  -            //FIXME - It's actually possible the delivery could be cancelled before it reaches
  -            //here, in which case we wouldn't get a delivery but we still need to increment the
  -            //delivery count
  -            //All the problems related to these race conditions and fiddly edge cases will disappear
  -            //once we do http://jira.jboss.com/jira/browse/JBMESSAGING-355
  -            //This will make life a lot easier
  -            
  -            //Note we don't increment the delivery count if the message didn't match the selector
  -            //FIXME - this is a temporary hack that will disappear once
  -            //http://jira.jboss.org/jira/browse/JBMESSAGING-275
  -            //is solved            
  -            boolean incrementCount = true;
  -            if (del instanceof SimpleDelivery)
  +      synchronized (refLock)
               {
  -               SimpleDelivery sd = (SimpleDelivery)del;
  -               incrementCount = sd.isSelectorAccepted();
  -            }                       
  -            if (incrementCount)
  +         synchronized (deliveryLock)
               {
  -               del.getReference().incrementDeliveryCount();
  +            return messageRefs.size() + deliveries.size();
  +         }
               }
  -                        
  -            if (del.isCancelled())
  -            {
  -               return false;
               }
                           
  -            state.removeFirstInMemory();
  +   // Public --------------------------------------------------------
               
  -            // delivered
  -            if (!del.isDone())
  +   public int memoryRefCount()
               {
  -               //Add the delivery to state
  -               state.addDelivery(del);
  +      synchronized (refLock)
  +      {
  +         return messageRefs.size();
               }                                
  -            
  -            //Delivery successful
  -            return true;               
            }
            
  +   public int memoryDeliveryCount()
  +   {
  +      synchronized (deliveryLock)
  +      {
  +         return deliveries.size();
         }
      }
   
  -   // Private -------------------------------------------------------
  -
  -   private void checkClosed()
  +   public int downCacheCount()
      {
  -      if (state == null)
  +      synchronized (refLock)
         {
  -         throw new IllegalStateException(this + " closed");
  +         return downCache.size();
         }
      }
   
  -   /**
  -    * Pushes the reference to the specified receiver. If the receiver is null, pushes the reference
  -    * to <i>a</i> reciever, as dictated by the routing policy, if receivers are available.
  -    */
  -   private Delivery push(MessageReference ref, Receiver receiver)
  -   {      
  -      Delivery d = null;
  -            
  -      if (receiver == null)
  +   public boolean isPaging()
         {
  -         Set deliveries = router.handle(this, ref, null);
  -         
  -         if (deliveries.isEmpty())
  +      synchronized (refLock)
            {
  -            return null;
  +         return paging;
  +      }
            }
   
  -         // TODO
  -         // Sanity check - we shouldn't get more then one delivery - the Channel can only cope with
  -         // one delivery per message reference at any one time. Eventually this will be enforced in
  -         // the design of the core classes but for now we just throw an Exception
  -         if (deliveries.size() > 1)
  +   public String toString()
            {
  -            throw new IllegalStateException("More than one delivery returned from router!");
  +      return "ChannelSupport[" + channelID + "]";
            }
            
  -         d = (Delivery)deliveries.iterator().next();
  -      }
  -      else
  +   // Package protected ---------------------------------------------
  +
  +   // Protected -----------------------------------------------------
  +
  +   /*
  +    * This methods delivers as many messages as possible to the router until no
  +    * more deliveries are returned. This method should never be called at the
  +    * same time as handle. 
  +    * 
  +    * @see org.jboss.messaging.core.Channel#deliver()
  +    */
  +   protected void deliverInternal()
         {
            try
            {
  -            d = receiver.handle(this, ref, null);                          
  -         }
  -         catch(Throwable t)
  +         while (true)
            {
  -            // broken receiver - log the exception and ignore it
  -            log.error("The receiver " + receiver + " is broken", t);
  -         }
  +            MessageReference ref;
  +
  +            synchronized (refLock)
  +            {
  +               ref = (MessageReference) messageRefs.peekFirst();               
  +            }
  +
  +            if (ref != null)
  +            {
  +               // Check if message is expired (we also do this on the client
  +               // side)
  +               // If so ack it from the channel
  +               if (ref.isExpired())
  +               {
  +                  if (trace)
  +                  {
  +                     log.trace("Message reference: " + ref + " has expired");
  +                  }
  +
  +                  // remove and acknowledge it
  +
  +                  removeFirstInMemory();
  +
  +                  Delivery delivery = new SimpleDelivery(this, ref, true);
  +
  +                  // TODO - is this stage really necessary?
  +                  synchronized (deliveryLock)
  +                  {
  +                     deliveries.add(delivery);
  +                  }
  +
  +                  acknowledgeInternal(delivery);
  +               }
  +               else
  +               {
  +                  // Reference is not expired
  +
  +                  // Push the ref to a receiver
  +                  Delivery del = push(ref);
  +
  +                  if (del == null)
  +                  {
  +                     // no receiver, receiver that doesn't accept the message or
  +                     // broken receiver
  +
  +                     if (trace)
  +                     {
  +                        log.trace(this + ": no delivery returned for message"
  +                                 + ref + " so no receiver got the message");
  +                     }
  +
  +                     // Now we stop delivering
  +
  +                     if (trace)
  +                     {
  +                        log.trace("Delivery is now complete");
  +                     }
  +
  +                     receiversReady = false;
  +
  +                     return;
  +                  }
  +                  else
  +                  {
  +                     if (trace)
  +                     {
  +                        log.trace(this + ": delivery returned for message:"
  +                                 + ref);
  +                     }
  +
  +                     // We must synchronize here to cope with another race
  +                     // condition where message is
  +                     // cancelled/acked in flight while the following few
  +                     // actions are being performed.
  +                     // e.g. delivery could be cancelled acked after being
  +                     // removed from state but before
  +                     // delivery being added (observed).
  +                     synchronized (del)
  +                     {
  +                        if (trace)
  +                        {
  +                           log.trace(this + " incrementing delivery count for "
  +                                    + del);
  +                        }
  +
  +                        // FIXME - It's actually possible the delivery could be
  +                        // cancelled before it reaches
  +                        // here, in which case we wouldn't get a delivery but we
  +                        // still need to increment the
  +                        // delivery count
  +                        // All the problems related to these race conditions and
  +                        // fiddly edge cases will disappear
  +                        // once we do
  +                        // http://jira.jboss.com/jira/browse/JBMESSAGING-355
  +                        // This will make life a lot easier
  +
  +                        // Note we don't increment the delivery count if the
  +                        // message didn't match the selector
  +                        // FIXME - this is a temporary hack that will disappear
  +                        // once
  +                        // http://jira.jboss.org/jira/browse/JBMESSAGING-275
  +                        // is solved
  +                        boolean incrementCount = true;
  +                        if (del instanceof SimpleDelivery)
  +                        {
  +                           SimpleDelivery sd = (SimpleDelivery) del;
  +                           incrementCount = sd.isSelectorAccepted();
  +                        }
  +                        if (incrementCount)
  +                        {
  +                           del.getReference().incrementDeliveryCount();                    
  +                        }
  +
  +                        if (!del.isCancelled())
  +                        {
  +                           removeFirstInMemory();
  +
  +                           // delivered
  +                           if (!del.isDone())
  +                           {
  +                              // Add the delivery to state
  +                              synchronized (deliveryLock)
  +                              {
  +                                 deliveries.add(del);
  +                              }
  +                           }
  +                        }
  +                     }
  +                  }
  +               }
  +            }
  +            else
  +            {
  +               // No more refs in channel
  +               if (trace)
  +               {
  +                  log.trace(this + " no more refs to deliver ");
  +               }
  +               break;
  +            }
  +         }
  +      }
  +      catch (Throwable t)
  +      {
  +         log.error(this + " Failed to deliver", t);
  +      }
  +   }
  +
  +   protected Delivery handleInternal(DeliveryObserver sender, Routable r,
  +            Transaction tx)
  +   {
  +      if (r == null)
  +      {
  +         return null;
  +      }
  +
  +      if (trace)
  +      {
  +         log.trace(this
  +                  + " handles "
  +                  + r
  +                  + (tx == null ? " non-transactionally" : " in transaction: "
  +                           + tx));
  +      }
  +
  +      MessageReference ref = obtainReference(r);
  +
  +      try
  +      {
  +
  +         if (tx == null)
  +         {
  +            // Don't even attempt synchronous delivery for a reliable message
  +            // when we have an
  +            // non-recoverable state that doesn't accept reliable messages. If
  +            // we do, we may get
  +            // into the situation where we need to reliably store an active
  +            // delivery of a reliable
  +            // message, which in these conditions cannot be done.
  +
  +            if (r.isReliable() && !acceptReliableMessages)
  +            {
  +               log.error("Cannot handle reliable message " + r
  +                        + " because the channel has a non-recoverable state!");
  +               return null;
  +            }
  +
  +            checkMemory();
  +
  +            ref.setOrdering(messageOrdering.increment());
  +
  +            if (ref.isReliable() && recoverable)
  +            {
  +               // Reliable message in a recoverable state - also add to db
  +               if (trace)
  +               {
  +                  log.trace("adding " + ref
  +                           + " to database non-transactionally");
  +               }
  +
  +               pm.addReference(channelID, ref, null);
  +            }
  +
  +            addReferenceInMemory(ref);
  +
  +            // We only do delivery if there are receivers that haven't said they
  +            // don't want
  +            // any more references
  +            if (receiversReady)
  +            {
  +               // Prompt delivery
  +               deliverInternal();
  +            }
  +         }
  +        else
  +         {
  +            if (trace)
  +            {
  +               log.trace("adding "
  +                        + ref
  +                        + " to state "
  +                        + (tx == null ? "non-transactionally"
  +                                 : "in transaction: " + tx));
  +            }
  +
  +            checkMemory();
  +
  +            if (ref.isReliable() && !acceptReliableMessages)
  +            {
  +               // this transaction has no chance to succeed, since a reliable
  +               // message cannot be
  +               // safely stored by a non-recoverable state, so doom the
  +               // transaction
  +               if (trace)
  +               {
  +                  log
  +                           .trace(this
  +                                    + " cannot handle reliable messages, dooming the transaction");
  +               }
  +               tx.setRollbackOnly();
  +            } 
  +            else
  +            {
  +               // add to post commit callback
  +               ref.setOrdering(messageOrdering.increment());
  +               this.getCallback(tx).addRef(ref);
  +               if (trace)
  +               {
  +                  log.trace(this + " added transactionally " + ref
  +                           + " in memory");
  +               }
  +            }
  +
  +            if (ref.isReliable() && recoverable)
  +            {
  +               // Reliable message in a recoverable state - also add to db
  +               if (trace)
  +               {
  +                  log.trace("adding "
  +                           + ref
  +                           + (tx == null ? " to database non-transactionally"
  +                                    : " in transaction: " + tx));
  +               }
  +
  +               pm.addReference(channelID, ref, tx);
  +            }
  +         }
  +      }
  +      catch (Throwable t)
  +      {
  +         log.error("Failed to handle message", t);
  +
  +         ref.releaseMemoryReference();
  +
  +         return null;
  +      }
  +
  +      // I might as well return null, the sender shouldn't care
  +      return new SimpleDelivery(sender, ref, true);
  +   }
  +
  +   protected void acknowledgeInternal(Delivery d) throws Throwable
  +   {
  +      synchronized (deliveryLock)
  +      {
  +         acknowledgeInMemory(d);
  +      }
  +
  +      if (recoverable && d.getReference().isReliable())
  +      {
  +         // TODO - Optimisation - If the message is acknowledged before the call
  +         // to handle() returns
  +         // And it is a new message then there won't be any reference in the
  +         // database
  +         // So the call to remove from the db is wasted.
  +         // We should add a flag to check this
  +         pm.removeReference(channelID, d.getReference(), null);
  +      }
  +
  +      d.getReference().releaseMemoryReference();
  +   }
  +
  +   protected void cancelInternal(Delivery del) throws Throwable
  +   {
  +      if (trace)
  +      {
  +         log.trace(this + " cancelling " + del + " in memory");
  +      }
  +
  +      boolean removed;
  +
  +      synchronized (deliveryLock)
  +      {
  +         removed = deliveries.remove(del);
  +      }
  +
  +      if (!removed)
  +      {
  +         // This is ok
  +         // This can happen if the message is cancelled before the result of
  +         // ServerConsumerDelegate.handle
  +         // has returned, in which case we won't have a record of the delivery
  +         // in the Set
  +
  +         // In this case we don't want to add the message reference back into
  +         // the state
  +         // since it was never removed in the first place
  +
  +         if (trace)
  +         {
  +            log.trace(this + " can't find delivery " + del
  +                     + " in state so not replacing messsage ref");
  +         }
  +
  +      }
  +      else
  +      {
  +         synchronized (refLock)
  +         {
  +            messageRefs.addFirst(del.getReference(), del.getReference()
  +                     .getPriority());
  +
  +            if (paging)
  +            {
  +               // if paging we need to evict the end reference to storage to
  +               // preserve the number of refs in the queue
  +
  +               MessageReference ref = (MessageReference) messageRefs
  +                        .removeLast();
  +
  +               addToDownCache(ref);
  +
  +               refsInStorage++;
  +            }
  +         }
  +
  +         if (trace)
  +         {
  +            log.trace(this + " added " + del.getReference()
  +                     + " back into state");
  +         }
  +      }
  +   }
  +
  +   protected MessageReference removeFirstInMemory() throws Throwable
  +   {
  +      synchronized (refLock)
  +      {
  +         MessageReference result = (MessageReference) messageRefs.removeFirst();
  +
  +         if (refsInStorage > 0)
  +         {
  +            int numberLoadable = Math.min(refsInStorage, pageSize);
  +
  +            if (messageRefs.size() <= fullSize - numberLoadable)
  +            {
  +               load(numberLoadable);
  +            }
  +         }
  +         else
  +         {
  +            paging = false;
  +         }
  +
  +         return (MessageReference) result;
  +      }
  +   }
  +
  +   protected MessageReference obtainReference(Routable r)
  +   {
  +      MessageReference ref = null;
  +
  +      // Convert to reference
  +      try
  +      {
  +         if (!r.isReference())
  +         {
  +            // We should only handle references in core.
  +            // TODO enforce this in the signature of handle method
  +            // See http://jira.jboss.com/jira/browse/JBMESSAGING-255
  +            log.warn("Should only handle references");
  +            // Remove this when this is enforced
  +            ref = ms.reference((Message) r);
  +         }
  +         else
  +         {
  +            // Each channel has it's own copy of the reference
  +            ref = ((MessageReference) r).copy();
  +         }
  +
  +         return ref;
  +      }
  +      catch (Exception e)
  +      {
  +         log.error("Failed to reference routable", e);
  +         // FIXME - Swallowing exceptions
  +         return null;
  +      }
  +   }
  +
  +   protected void checkMemory()
  +   {
  +
  +      // Disabled for now
  +
  +      // if (mm != null)
  +      // {
  +      // boolean isLow = mm.isMemoryLow();
  +      //         
  +      // if (isLow)
  +      // {
  +      //            
  +      // synchronized (refLock)
  +      // {
  +      // if (!paging)
  +      // {
  +      // log.info("Memory is low:" + this);
  +      //                  
  +      // fullSize = messageRefs.size() + 1;
  +      //                  
  +      // //TODO Make this configurable
  +      // pageSize = downCacheSize = Math.max(1, fullSize / 50);
  +      //                  
  +      // log.info("Turned paging on, fullSize=" + fullSize + " dc:" +
  +      // downCacheSize + " ps: " + pageSize);
  +      // }
  +      // else
  +      // {
  +      // //log.info("already paging");
  +      // }
  +      //               
  +      // }
  +      // }
  +      // }
  +   }
  +
  +   protected void addReferenceInMemory(MessageReference ref) throws Throwable
  +   {
  +      if (ref.isReliable() && !acceptReliableMessages)
  +      {
  +         throw new IllegalStateException("Reliable reference " + ref
  +                  + " cannot be added to non-recoverable state");
  +      }
  +
  +      synchronized (refLock)
  +      {
  +         if (paging)
  +         {
  +            addToDownCache(ref);
  +
  +            refsInStorage++;
  +         }
  +         else
  +         {
  +            messageRefs.addLast(ref, ref.getPriority());
  +
  +            if (trace)
  +            {
  +               log.trace(this + " added " + ref
  +                        + " non-transactionally in memory");
  +            }
  +
  +            if (messageRefs.size() == fullSize)
  +            {
  +               // We are full in memory - go into paging mode
  +
  +               if (trace)
  +               {
  +                  log.trace(this + " going into paging mode");
  +               }
  +
  +               paging = true;
  +            }
  +         }
  +      }
  +   }
  +
  +   protected void addToDownCache(MessageReference ref) throws Exception
  +   {
  +      // If the down cache exists then refs are not spilled over immediately,
  +      // but store in the cache
  +      // and spilled over in one go when the next load is requested, or when it
  +      // is full
  +
  +      // Both non reliable and reliable references can go in the down cache,
  +      // however only non-reliable
  +      // references actually get added to storage, reliable references instead
  +      // get their LOADED column
  +      // updated to "N".
  +
  +      downCache.add(ref);
  +
  +      if (trace)
  +      {
  +         log.trace(ref + " sent to downcache");
  +      }
  +
  +      if (downCache.size() == downCacheSize)
  +      {
  +         if (trace)
  +         {
  +            log.trace(this + "'s downcache is full (" + downCache.size()
  +                     + " messages)");
  +         }
  +         flushDownCache();
  +      }
  +   }
  +
  +   protected void flushDownCache() throws Exception
  +   {
  +      if (trace)
  +      {
  +         log.trace(this + " flushing " + downCache.size()
  +                  + " refs from downcache");
  +      }
  +
  +      // Non persistent refs or persistent refs in a non recoverable state won't
  +      // already be in the db
  +      // so they need to be inserted
  +      // Persistent refs in a recoverable state will already be there so need to
  +      // be updated
  +
  +      List toUpdate = new ArrayList();
  +
  +      List toAdd = new ArrayList();
  +
  +      Iterator iter = downCache.iterator();
  +
  +      long minOrdering = Long.MAX_VALUE;
  +
  +      while (iter.hasNext())
  +      {
  +         MessageReference ref = (MessageReference) iter.next();
  +
  +         minOrdering = Math.min(minOrdering, ref.getOrdering());
  +
  +         if (ref.isReliable() && recoverable)
  +         {
  +            toUpdate.add(ref);
  +         }
  +         else
  +         {
  +            toAdd.add(ref);
  +         }
  +      }
  +
  +      if (!toAdd.isEmpty())
  +      {
  +         pm.addReferences(channelID, toAdd, false);
  +      }
  +      if (!toUpdate.isEmpty())
  +      {
  +         pm.updateReferencesNotLoaded(channelID, toUpdate);
  +      }
  +
  +      // Release in memory refs for the refs we just spilled
  +      // Note! This must be done after the db inserts - to ensure message is
  +      // still in memory
  +      iter = downCache.iterator();
  +
  +      while (iter.hasNext())
  +      {
  +         MessageReference ref = (MessageReference) iter.next();
  +
  +         ref.releaseMemoryReference();
  +      }
  +
  +      downCache.clear();
  +
  +      // when we select refs to load from the channel we load them with a value
  +      // of ordering >= loadFromOrderingValue
  +      if (this.loadFromOrderingValue == 0)
  +      {
  +         // First time paging
  +         this.loadFromOrderingValue = minOrdering;
  +      }
  +      else
  +      {
  +         // It's possible that one of the refs that we're spilling to disk
  +         // has a lower ordering value than the current loadFromOrderingValue
  +         // value
  +         // normally this will not be the case but it can be the case if
  +         // messages are cancelled out of normal delivery order
  +         // or with committing transactions since the ordering for refs in a
  +         // transaction is determined when the refs are added to the tx, not
  +         // at commit time.
  +         // In these cases we need to adjust loadFromOrderingValue appropriately
  +         if (minOrdering < loadFromOrderingValue)
  +         {
  +            loadFromOrderingValue = minOrdering;
  +         }
  +      }
  +
  +      if (trace)
  +      {
  +         log.trace(this + " cleared downcache");
  +      }
  +   }
  +
  +   protected void acknowledgeInMemory(Delivery d) throws Throwable
  +   {
  +      if (d == null)
  +      {
  +         throw new IllegalArgumentException("Can't acknowledge a null delivery");
  +      }
  +
  +      boolean removed = deliveries.remove(d);
  +
  +      // It's ok if the delivery couldn't be found - this might happen
  +      // if the delivery is acked before the call to handle() has returned
  +
  +      if (trace)
  +      {
  +         log.trace(this + " removed " + d + " from memory:" + removed);
  +      }
  +   }
  +
  +   protected void load(int number) throws Exception
  +   {
  +      if (trace)
  +      {
  +         log.trace(this + " Loading " + number + " refs from storage");
  +      }
  +
  +      // Must flush the down cache first
  +      flushDownCache();
  +
  +      List refInfos = pm.getReferenceInfos(channelID, loadFromOrderingValue,
  +               number);
  +
  +      // We may load less than desired due to "holes" - this is ok
  +      int numberLoaded = refInfos.size();
  +      
  +      if (numberLoaded == 0)
  +      {
  +         throw new IllegalStateException(
  +                  "Trying to page refs in from persitent storage - but can't find any!");
  +      }
  +
  +      Map refMap = new HashMap(refInfos.size());
  +
  +      List msgIdsToLoad = new ArrayList(refInfos.size());
  +
  +      Iterator iter = refInfos.iterator();
  +
  +      // Put the refs that we already have messages for in a map
  +      while (iter.hasNext())
  +      {
  +         PersistenceManager.ReferenceInfo info = (PersistenceManager.ReferenceInfo) iter
  +                  .next();
  +
  +         long msgId = info.getMessageId();
  +
  +         MessageReference ref = ms.reference(msgId);
  +
  +         if (ref != null)
  +         {
  +            refMap.put(new Long(msgId), ref);
  +         }
  +         else
  +         {
  +            // Add id to list of msg ids to load
  +            msgIdsToLoad.add(new Long(msgId));
  +         }
  +      }
  +
  +      // Load the messages (if any)
  +      List messages = null;
  +      if (!msgIdsToLoad.isEmpty())
  +      {
  +         messages = pm.getMessages(msgIdsToLoad);
  +
  +         if (messages.size() != msgIdsToLoad.size())
  +         {
  +            // Sanity check
  +            throw new IllegalStateException(
  +                     "Did not load correct number of messages, wanted:"
  +                              + msgIdsToLoad.size() + " but got:"
  +                              + messages.size());
  +         }
  +
  +         // Create references for these messages and add them to the reference
  +         // map
  +         iter = messages.iterator();
  +
  +         while (iter.hasNext())
  +         {
  +            Message m = (Message) iter.next();
  +
  +            // Message might actually be know to the store since we did the
  +            // first check
  +            // since might have been added by different channel
  +            // in intervening period, but this is ok - the store knows to only
  +            // return a reference
  +            // to the pre-existing message
  +            MessageReference ref = ms.reference(m);
  +
  +            refMap.put(new Long(m.getMessageID()), ref);
  +         }
  +      }
  +
  +      // Now we have all the messages loaded and refs created we need to put the
  +      // refs in the right order
  +      // in the channel
  +
  +      boolean loadedReliable = false;
  +
  +      long firstOrdering = -1;
  +      long lastOrdering = -1;
  +
  +      List toRemove = new ArrayList();
  +
  +      iter = refInfos.iterator();
  +      while (iter.hasNext())
  +      {
  +         PersistenceManager.ReferenceInfo info = (PersistenceManager.ReferenceInfo) iter
  +                  .next();
  +
  +         if (firstOrdering == -1)
  +         {
  +            firstOrdering = info.getOrdering();
  +         }
  +         lastOrdering = info.getOrdering();
  +
  +         long msgId = info.getMessageId();
  +
  +         MessageReference ref = (MessageReference) refMap.get(new Long(msgId));
  +
  +         ref.setDeliveryCount(info.getDeliveryCount());
  +
  +         ref.setOrdering(info.getOrdering());
  +
  +         messageRefs.addLast(ref, ref.getPriority());
  +
  +         if (recoverable && ref.isReliable())
  +         {
  +            loadedReliable = true;
  +         }
  +         else
  +         {
  +            // We put the non reliable refs (or reliable in a non-recoverable
  +            // store)
  +            // in a list to be removed
  +            toRemove.add(ref);
  +         }
  +      }
  +
  +      if (!toRemove.isEmpty())
  +      {
  +         // Now we remove the references we loaded (only the non persistent or
  +         // persistent in a non-recoverable
  +         // store)
  +         pm.removeReferences(channelID, toRemove);
  +      }
  +
  +      if (loadedReliable)
  +      {
  +         // If we loaded any reliable refs then we must mark them as loaded in
  +         // the store
  +         // otherwise they may get loaded again, the next time we do a load
  +         // We can't delete them since they're reliable and haven't been acked
  +         // yet
  +         pm.updateReliableReferencesLoadedInRange(channelID, firstOrdering,
  +                  lastOrdering);
  +      }
  +
  +      refsInStorage -= numberLoaded;
  +
  +      loadFromOrderingValue = lastOrdering + 1;
  +
  +      if (refsInStorage != 0 || messageRefs.size() == fullSize)
  +      {
  +         paging = true;
  +      }
  +      else
  +      {
  +         paging = false;
  +      }
  +   }
  +
  +   protected InMemoryCallback getCallback(Transaction tx)
  +   {
  +      InMemoryCallback callback = (InMemoryCallback) tx.getKeyedCallback(this);
  +
  +      if (callback == null)
  +      {
  +         callback = new InMemoryCallback();
  +
  +         tx.addKeyedCallback(callback, this);
  +      }
  +
  +      return callback;
  +   }
  +
  +   protected void removeCompletely(MessageReference r)
  +   {
  +      if (recoverable && r.isReliable())
  +      {
  +         try
  +         {
  +            pm.removeReference(channelID, r, null);
  +         }
  +         catch (Exception e)
  +         {
  +            if (trace)
  +            {
  +               log.trace("removeAll() failed on removing " + r, e);
  +            }
  +         }
  +      }
  +   }
  +
  +   // Private -------------------------------------------------------
  +
  +   // Inner classes -------------------------------------------------
  +
  +   private class InMemoryCallback implements TxCallback, Runnable
  +   {
  +      private List refsToAdd;
  +
  +      private List deliveriesToRemove;
  +
  +      private long minOrder;
  +
  +      private InMemoryCallback()
  +      {
  +         refsToAdd = new ArrayList();
  +
  +         deliveriesToRemove = new ArrayList();
  +      }
  +
  +      private void addRef(MessageReference ref)
  +      {
  +         refsToAdd.add(ref);
  +
  +         minOrder = Math.min(minOrder, ref.getOrdering());
  +      }
  +
  +      private void addDelivery(Delivery del)
  +      {
  +         deliveriesToRemove.add(del);
  +      }
  +
  +      public void beforePrepare()
  +      {
  +         // NOOP
  +      }
  +
  +      public void beforeCommit(boolean onePhase)
  +      {
  +         // NOOP
  +      }
  +
  +      public void beforeRollback(boolean onePhase)
  +      {
  +         // NOOP
  +      }
  +
  +      public void afterPrepare()
  +      {
  +         // NOOP
  +      }
  +
  +      private boolean committing;
  +
  +      private Future result;
  +
  +      public void run()
  +      {
  +         try
  +         {
  +            if (committing)
  +            {
  +               doAfterCommit();
  +            }
  +            else
  +            {
  +               doAfterRollback();
  +            }
  +
  +            // prompt delivery
  +            if (receiversReady)
  +            {
  +               deliverInternal();
  +            }
  +
  +            result.setResult(null);
  +         }
  +         catch (Throwable t)
  +         {
  +            result.setException(t);
  +         }
  +      }
  +
  +      public void afterCommit(boolean onePhase) throws TransactionException
  +      {
  +         // We don't execute the commit directly, we add it to the event queue
  +         // of the channel
  +         // so it is executed in turn
  +         committing = true;
  +
  +         executeAndWaitForResult();
  +      }
  +
  +      public void afterRollback(boolean onePhase) throws TransactionException
  +      {
  +         // We don't execute the commit directly, we add it to the event queue
  +         // of the channel
  +         // so it is executed in turn
  +         committing = false;
  +
  +         executeAndWaitForResult();
  +      }
  +
  +      private void executeAndWaitForResult() throws TransactionException
  +      {
  +         result = new Future();
  +
  +         try
  +         {
  +            executor.execute(this);
  +         }
  +         catch (InterruptedException e)
  +         {
  +            log.warn("Thread interrupted", e);
  +         }
  +
  +         // Wait for it to complete
  +
  +         Throwable t = (Throwable) result.getResult();
  +
  +         if (t != null)
  +         {
  +            if (t instanceof RuntimeException)
  +            {
  +               throw (RuntimeException) t;
  +            }
  +            if (t instanceof Error)
  +            {
  +               throw (Error) t;
  +            }
  +            if (t instanceof TransactionException)
  +            {
  +               throw (TransactionException) t;
  +            }
  +            throw new IllegalStateException("Unknown Throwable " + t);
  +         }
  +      }
  +
  +      private void doAfterCommit() throws TransactionException
  +      {
  +         // We add the references to the state
  +
  +         Iterator iter = refsToAdd.iterator();
  +
  +         while (iter.hasNext())
  +         {
  +            MessageReference ref = (MessageReference) iter.next();
  +
  +            if (trace)
  +            {
  +               log.trace(this + ": adding " + ref
  +                                 + " to non-recoverable state");
  +            }
  +
  +            try
  +            {
  +               addReferenceInMemory(ref);
  +            }
  +            catch (Throwable t)
  +            {
  +               // FIXME - Sort out this exception handling
  +               log.error("Failed to add reference", t);
  +            }
  +         }
  +
  +         // Remove deliveries
  +
  +         iter = this.deliveriesToRemove.iterator();
  +
  +         while (iter.hasNext())
  +         {
  +            Delivery del = (Delivery) iter.next();
  +
  +            if (trace)
  +            {
  +               log.trace(this + " removing " + del + " after commit");
  +            }
  +
  +            del.getReference().releaseMemoryReference();
  +
  +            try
  +            {
  +               synchronized (deliveryLock)
  +               {
  +                  acknowledgeInMemory(del);
  +               }
  +            }
  +            catch (Throwable t)
  +            {
  +               throw new TransactionException("Failed to ack message", t);
  +            }
  +         }
  +      }
  +
  +      private void doAfterRollback()
  +      {
  +         Iterator iter = refsToAdd.iterator();
  +
  +         while (iter.hasNext())
  +         {
  +            MessageReference ref = (MessageReference) iter.next();
  +
  +            ref.releaseMemoryReference();
  +         }
  +      }
  +   }
  +
  +   /**
  +    * Give subclass a chance to process the message before storing it
  +    * internally. Useful to get rid of the REMOTE_ROUTABLE header in a
  +    * distributed case, for example.
  +    */
  +   protected void processMessageBeforeStorage(MessageReference reference)
  +   {
  +      // by default a noop
  +   }
  +
  +   // Private -------------------------------------------------------
  +
  +   /**
  +    * Pushes the reference to <i>a</i> receiver, as dictated by the routing
  +    * policy
  +    */
  +   private Delivery push(MessageReference ref)
  +   {
  +      Delivery d = null;
  +
  +      Set deliveries = router.handle(this, ref, null);
  +
  +      if (deliveries.isEmpty())
  +      {
  +         return null;
  +      }
  +
  +      // TODO
  +      // Sanity check - we shouldn't get more then one delivery - the Channel
  +      // can only cope with
  +      // one delivery per message reference at any one time. Eventually this
  +      // will be enforced in
  +      // the design of the core classes but for now we just throw an Exception
  +      if (deliveries.size() > 1)
  +      {
  +         throw new IllegalStateException(
  +                  "More than one delivery returned from router!");
         }            
         
  +      d = (Delivery) deliveries.iterator().next();
  +
         return d;
      }
      
  +   private void checkClosed()
  +   {
  +      if (router == null)
  +      {
  +         throw new IllegalStateException(this + " closed");
  +      }
  +   }
  +
      // Inner classes -------------------------------------------------
   
  +   private class DeliveryRunnable implements Runnable
  +   {
  +      Future result;
  +      
  +      DeliveryRunnable(Future result)
  +      {
  +         this.result = result;
  +      }
  +      
  +      public void run()
  +      {
  +         receiversReady = true;
  +         deliverInternal();
  +         if (result != null)
  +         {
  +            result.setResult(null);
  +         }
  +      }
  +   }
  +
  +   private class HandleRunnable implements Runnable
  +   {
  +      Future result;
  +
  +      DeliveryObserver sender;
  +
  +      Routable routable;
  +
  +      Transaction tx;
  +
  +      HandleRunnable(Future result, DeliveryObserver sender, Routable routable,
  +               Transaction tx)
  +      {
  +         this.result = result;
  +         this.sender = sender;
  +         this.routable = routable;
  +         this.tx = tx;
  +      }
  +
  +      public void run()
  +      {
  +         Delivery d = handleInternal(sender, routable, tx);
  +
  +         result.setResult(d);
  +      }
  +   }
  +
   }
  
  
  
  1.17      +4 -2      jboss-jms/src/main/org/jboss/messaging/core/Routable.java
  
  (In the diff below, changes in quantity of whitespace are not shown.)
  
  Index: Routable.java
  ===================================================================
  RCS file: /cvsroot/jboss/jboss-jms/src/main/org/jboss/messaging/core/Routable.java,v
  retrieving revision 1.16
  retrieving revision 1.17
  diff -u -b -r1.16 -r1.17
  --- Routable.java	7 Mar 2006 17:11:15 -0000	1.16
  +++ Routable.java	17 Jul 2006 17:14:47 -0000	1.17
  @@ -36,9 +36,9 @@
    *
    * @author <a href="mailto:ovidiu at jboss.org">Ovidiu Feodorov</a>
    * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
  - * @version <tt>$Revision: 1.16 $</tt>
  + * @version <tt>$Revision: 1.17 $</tt>
    *
  - * $Id: Routable.java,v 1.16 2006/03/07 17:11:15 timfox Exp $
  + * $Id: Routable.java,v 1.17 2006/07/17 17:14:47 timfox Exp $
    */
   public interface Routable extends Serializable
   {
  @@ -94,6 +94,8 @@
      
      void incrementDeliveryCount();
      
  +   void decrementDeliveryCount();
  +   
      /**
       * Binds a header. If the header map previously contained a mapping for this name, the old value
       * is replaced by the specified value.
  
  
  



More information about the jboss-cvs-commits mailing list