[jboss-cvs] JBoss Messaging SVN: r7450 - in trunk: src/main/org/jboss/messaging/core/server and 6 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Wed Jun 24 05:52:53 EDT 2009


Author: jmesnil
Date: 2009-06-24 05:52:52 -0400 (Wed, 24 Jun 2009)
New Revision: 7450

Added:
   trunk/src/main/org/jboss/messaging/utils/concurrent/
   trunk/src/main/org/jboss/messaging/utils/concurrent/BlockingDeque.java
   trunk/src/main/org/jboss/messaging/utils/concurrent/Deque.java
   trunk/src/main/org/jboss/messaging/utils/concurrent/LinkedBlockingDeque.java
   trunk/tests/src/org/jboss/messaging/tests/unit/util/concurrent/
   trunk/tests/src/org/jboss/messaging/tests/unit/util/concurrent/LinkedBlockingDequeTest.java
Modified:
   trunk/src/main/org/jboss/messaging/core/list/impl/PriorityLinkedListImpl.java
   trunk/src/main/org/jboss/messaging/core/server/Queue.java
   trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java
   trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java
   trunk/tests/src/org/jboss/messaging/tests/unit/core/postoffice/impl/BindingImplTest.java
Log:
JBMESSAGING-1437: Browsers should iterate on queue

* used LinkedBlockingDeque's java 5 backport to provide non fail-fast iterators
* refactored ServerConsumerImpl so that if it is a browser, it will iterate
  on the queue to deliver messages instead of making a snapshot copy of the queue
* "browser" ServerConsumerImpl instances are not added to the QueueImpl's consumers
  as messages are not delivered to them through the queue distributor

Modified: trunk/src/main/org/jboss/messaging/core/list/impl/PriorityLinkedListImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/list/impl/PriorityLinkedListImpl.java	2009-06-24 04:49:11 UTC (rev 7449)
+++ trunk/src/main/org/jboss/messaging/core/list/impl/PriorityLinkedListImpl.java	2009-06-24 09:52:52 UTC (rev 7450)
@@ -24,26 +24,27 @@
 
 import java.util.ArrayList;
 import java.util.Iterator;
-import java.util.LinkedList;
 import java.util.List;
-import java.util.ListIterator;
 import java.util.NoSuchElementException;
 
 import org.jboss.messaging.core.list.PriorityLinkedList;
+import org.jboss.messaging.utils.concurrent.Deque;
+import org.jboss.messaging.utils.concurrent.LinkedBlockingDeque;
 
 /**
  * A priority linked list implementation
  * 
- * It implements this by maintaining an individual LinkedList for each priority level.
+ * It implements this by maintaining an individual LinkedBlockingDeque for each priority level.
  * 
  * @author <a href="mailto:tim.fox at jboss.com>Tim Fox</a>
+ * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
  * @version <tt>$Revision: 1174 $</tt>
  *
  * $Id: BasicPrioritizedDeque.java 1174 2006-08-02 14:14:32Z timfox $
  */
 public class PriorityLinkedListImpl<T> implements PriorityLinkedList<T>
 {
-   private final List<LinkedList<T>> linkedLists;
+   private final List<Deque<T>> levels;
 
    private final int priorities;
 
@@ -53,24 +54,24 @@
    {
       this.priorities = priorities;
 
-      linkedLists = new ArrayList<LinkedList<T>>();
+      levels = new ArrayList<Deque<T>>();
 
       for (int i = 0; i < priorities; i++)
       {
-         linkedLists.add(new LinkedList<T>());
+         levels.add(new LinkedBlockingDeque<T>());
       }
    }
 
    public void addFirst(final T t, final int priority)
    {
-      linkedLists.get(priority).addFirst(t);
+      levels.get(priority).addFirst(t);
 
       size++;
    }
 
    public void addLast(final T t, final int priority)
    {
-      linkedLists.get(priority).addLast(t);
+      levels.get(priority).addLast(t);
 
       size++;
    }
@@ -87,7 +88,7 @@
 
       for (int i = priorities - 1; i >= 0; i--)
       {
-         LinkedList<T> ll = linkedLists.get(i);
+         Deque<T> ll = levels.get(i);
 
          if (!ll.isEmpty())
          {
@@ -110,7 +111,7 @@
 
       for (int i = priorities - 1; i >= 0; i--)
       {
-         LinkedList<T> ll = linkedLists.get(i);
+         Deque<T> ll = levels.get(i);
          if (!ll.isEmpty())
          {
             t = ll.getFirst();
@@ -130,7 +131,7 @@
 
       for (int i = priorities - 1; i >= 0; i--)
       {
-         LinkedList<T> list = linkedLists.get(i);
+         Deque<T> list = levels.get(i);
          all.addAll(list);
       }
 
@@ -139,7 +140,7 @@
 
    public void clear()
    {
-      for (LinkedList<T> list : linkedLists)
+      for (Deque<T> list : levels)
       {
          list.clear();
       }
@@ -166,13 +167,13 @@
    {
       private int index;
 
-      private ListIterator<T> currentIter;
+      private Iterator<T> currentIter;
 
       PriorityLinkedListIterator()
       {
-         index = linkedLists.size() - 1;
+         index = levels.size() - 1;
 
-         currentIter = linkedLists.get(index).listIterator();
+         currentIter = levels.get(index).iterator();
       }
 
       public boolean hasNext()
@@ -191,7 +192,7 @@
 
             index--;
 
-            currentIter = linkedLists.get(index).listIterator();
+            currentIter = levels.get(index).iterator();
          }
          return currentIter.hasNext();
       }

Modified: trunk/src/main/org/jboss/messaging/core/server/Queue.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/Queue.java	2009-06-24 04:49:11 UTC (rev 7449)
+++ trunk/src/main/org/jboss/messaging/core/server/Queue.java	2009-06-24 09:52:52 UTC (rev 7450)
@@ -22,6 +22,7 @@
 
 package org.jboss.messaging.core.server;
 
+import java.util.Iterator;
 import java.util.List;
 import java.util.Set;
 import java.util.concurrent.Executor;
@@ -151,4 +152,9 @@
    void lockDelivery();
    
    void unlockDelivery();
+
+   /**
+    * @return a read-only iterator over the message references; it does not support removal
+    */
+   Iterator<MessageReference> iterator();
 }

Modified: trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java	2009-06-24 04:49:11 UTC (rev 7449)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java	2009-06-24 09:52:52 UTC (rev 7450)
@@ -497,6 +497,29 @@
       return consumers;
    }
 
+   /** Returns a read-only iterator over messageReferences; remove() is always rejected. */
+   public Iterator<MessageReference> iterator()
+   {
+      final Iterator<MessageReference> delegate = messageReferences.iterator();
+      return new Iterator<MessageReference>()
+      {
+         public boolean hasNext()
+         {
+            return delegate.hasNext();
+         }
+
+         public MessageReference next()
+         {
+            return delegate.next();
+         }
+
+         public void remove()
+         {
+            throw new UnsupportedOperationException("iterator is immutable");
+         }
+      };
+   }
+
    public synchronized List<MessageReference> list(final Filter filter)
    {
       if (filter == null)

Modified: trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java	2009-06-24 04:49:11 UTC (rev 7449)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java	2009-06-24 09:52:52 UTC (rev 7450)
@@ -51,7 +51,6 @@
 import org.jboss.messaging.core.server.HandleStatus;
 import org.jboss.messaging.core.server.LargeServerMessage;
 import org.jboss.messaging.core.server.MessageReference;
-import org.jboss.messaging.core.server.MessagingServer;
 import org.jboss.messaging.core.server.Queue;
 import org.jboss.messaging.core.server.ServerConsumer;
 import org.jboss.messaging.core.server.ServerMessage;
@@ -117,6 +116,8 @@
     */
    private final boolean browseOnly;
 
+   private Runnable browserDeliverer;
+
    private final boolean updateDeliveries;
 
    private final StorageManager storageManager;
@@ -189,7 +190,14 @@
 
       this.updateDeliveries = updateDeliveries;
       
-      binding.getQueue().addConsumer(this);
+      if (browseOnly)
+      {
+         browserDeliverer = new BrowserDeliverer(messageQueue.iterator());
+      }
+      else
+      {
+         messageQueue.addConsumer(this);
+      }
    }
 
    // ServerConsumer implementation
@@ -219,8 +227,11 @@
          largeMessageDeliverer.close();
       }
 
-      messageQueue.removeConsumer(this);
-
+      if (!browseOnly)
+      {
+         messageQueue.removeConsumer(this);
+      }
+      
       session.removeConsumer(this);
 
       LinkedList<MessageReference> refs = cancelRefs(false, null);
@@ -519,7 +530,14 @@
          }
          else
          {
-            session.promptDelivery(messageQueue);
+            if (browseOnly)
+            {
+               executor.execute(browserDeliverer);
+            }
+            else
+            {
+               session.promptDelivery(messageQueue);
+            }
          }
       }
       finally
@@ -739,8 +757,15 @@
          {
             if (largeMessageDeliverer == null || largeMessageDeliverer.deliver())
             {
-               // prompt Delivery only if chunk was finished
-               session.promptDelivery(messageQueue);
+               if (browseOnly)
+               {
+                  executor.execute(browserDeliverer);
+               }
+               else
+               {
+                  // prompt Delivery only if chunk was finished
+                  session.promptDelivery(messageQueue);
+               }
             }
          }
          finally
@@ -989,4 +1014,59 @@
          return chunk;
       }
    }
+   
+   /**
+    * Delivers references to a browse-only consumer by walking the queue iterator; a reference
+    * refused with BUSY is kept in <code>current</code> and offered again first on the next run.
+    */
+   private class BrowserDeliverer implements Runnable
+   {
+      private final Iterator<MessageReference> iterator;
+      private MessageReference current = null;
+
+      public BrowserDeliverer(final Iterator<MessageReference> iterator)
+      {
+         this.iterator = iterator;
+      }
+
+      public void run()
+      {
+         // if the reference was busy during the previous iteration, handle it now
+         if (current != null)
+         {
+            try
+            {
+               if (handle(current) == HandleStatus.BUSY)
+               {
+                  return;
+               }
+               // delivered: forget it, otherwise the next run would deliver it a second time
+               current = null;
+            }
+            catch (Exception e)
+            {
+               log.warn("Exception while browser handled from " + messageQueue + ": " + current, e);
+               return;
+            }
+         }
+         while (iterator.hasNext())
+         {
+            MessageReference ref = iterator.next();
+            try
+            {
+               if (handle(ref) == HandleStatus.BUSY)
+               {
+                  // remember the refused reference so the next run starts with it
+                  current = ref;
+                  break;
+               }
+            }
+            catch (Exception e)
+            {
+               log.warn("Exception while browser handled from " + messageQueue + ": " + ref, e);
+               break;
+            }
+         }
+      }
+   }
 }

Added: trunk/src/main/org/jboss/messaging/utils/concurrent/BlockingDeque.java
===================================================================
--- trunk/src/main/org/jboss/messaging/utils/concurrent/BlockingDeque.java	                        (rev 0)
+++ trunk/src/main/org/jboss/messaging/utils/concurrent/BlockingDeque.java	2009-06-24 09:52:52 UTC (rev 7450)
@@ -0,0 +1,238 @@
+/*
+ * Written by Doug Lea with assistance from members of JCP JSR-166
+ * Expert Group and released to the public domain, as explained at
+ * http://creativecommons.org/licenses/publicdomain
+ */
+
+package org.jboss.messaging.utils.concurrent;     // XXX This belongs in java.util!!! XXX
+import java.util.concurrent.*;  // XXX This import goes away        XXX
+import java.util.*;  
+
+/**
+ * A {@link Deque} that additionally supports operations that wait for
+ * the deque to become non-empty when retrieving an element, and wait
+ * for space to become available in the deque when storing an
+ * element. These methods are summarized in the following table:<p>
+ *
+ * <table BORDER CELLPADDING=3 CELLSPACING=1>
+ *  <tr>
+ *    <td></td>
+ *    <td ALIGN=CENTER COLSPAN = 2> <b>First Element (Head)</b></td>
+ *    <td ALIGN=CENTER COLSPAN = 2> <b>Last Element (Tail)</b></td>
+ *  </tr>
+ *  <tr>
+ *    <td></td>
+ *    <td ALIGN=CENTER><em>Block</em></td>
+ *    <td ALIGN=CENTER><em>Time out</em></td>
+ *    <td ALIGN=CENTER><em>Block</em></td>
+ *    <td ALIGN=CENTER><em>Time out</em></td>
+ *  </tr>
+ *  <tr>
+ *    <td><b>Insert</b></td>
+ *    <td>{@link #putFirst putFirst(e)}</td>
+ *    <td>{@link #offerFirst(Object, long, TimeUnit) offerFirst(e, time, unit)}</td>
+ *    <td>{@link #putLast putLast(e)}</td>
+ *    <td>{@link #offerLast(Object, long, TimeUnit) offerLast(e, time, unit)}</td> 
+ *  </tr>
+ *  <tr>
+ *    <td><b>Remove</b></td>
+ *    <td>{@link #takeFirst takeFirst()}</td>
+ *    <td>{@link #pollFirst(long, TimeUnit)  pollFirst(time, unit)}</td>
+ *    <td>{@link #takeLast takeLast()}</td>
+ *    <td>{@link #pollLast(long, TimeUnit) pollLast(time, unit)}</td>
+ *  </tr>
+ * </table>
+ *
+ * <p>Like any {@link BlockingQueue}, a <tt>BlockingDeque</tt> is
+ * thread safe and may (or may not) be capacity-constrained.  A
+ * <tt>BlockingDeque</tt> implementation may be used directly as a
+ * FIFO <tt>BlockingQueue</tt>. The blocking methods inherited from
+ * the <tt>BlockingQueue</tt> interface are precisely equivalent to
+ * <tt>BlockingDeque</tt> methods as indicated in the following table:<p>
+ *
+ * <table BORDER CELLPADDING=3 CELLSPACING=1>
+ *  <tr>
+ *    <td ALIGN=CENTER> <b><tt>BlockingQueue</tt> Method</b></td>
+ *    <td ALIGN=CENTER> <b>Equivalent <tt>BlockingDeque</tt> Method</b></td>
+ *  </tr>
+ *
+ *   <tr>
+ *    <td>{@link java.util.concurrent.BlockingQueue#put put(e)}</td>
+ *    <td>{@link #putLast putLast(e)}</td>
+ *   </tr>
+ *   <tr>
+ *    <td>{@link java.util.concurrent.BlockingQueue#take take()}</td>
+ *    <td>{@link #takeFirst takeFirst()}</td>
+ *   </tr>
+ *   <tr>
+ *    <td>{@link java.util.concurrent.BlockingQueue#offer(Object, long, TimeUnit) offer(e, time, unit)}</td>
+ *    <td>{@link #offerLast(Object, long, TimeUnit) offerLast(e, time, unit)}</td>
+ *   </tr>
+ *   <tr>
+ *    <td>{@link java.util.concurrent.BlockingQueue#poll(long, TimeUnit) poll(time, unit)}</td>
+ *    <td>{@link #pollFirst(long, TimeUnit) pollFirst(time, unit)}</td>
+ *   </tr>
+ * </table>
+ *
+ *
+ * <p>This interface is a member of the
+ * <a href="{@docRoot}/../guide/collections/index.html">
+ * Java Collections Framework</a>.
+ *
+ * @since 1.6
+ * @author Doug Lea
+ * @param <E> the type of elements held in this collection
+ */
+public interface BlockingDeque<E> extends Deque<E>, BlockingQueue<E> {
+
+    /**
+     * Adds the specified element as the first element of this deque,
+     * waiting if necessary for space to become available.
+     * @param o the element to add
+     * @throws InterruptedException if interrupted while waiting.
+     * @throws NullPointerException if the specified element is <tt>null</tt>.
+     */
+    void putFirst(E o) throws InterruptedException;
+
+    /**
+     * Adds the specified element as the last element of this deque,
+     * waiting if necessary for space to become available.
+     * @param o the element to add
+     * @throws InterruptedException if interrupted while waiting.
+     * @throws NullPointerException if the specified element is <tt>null</tt>.
+     */
+    void putLast(E o) throws InterruptedException;
+
+    /**
+     * Retrieves and removes the first element of this deque, waiting
+     * if no elements are present on this deque.
+     * @return the head of this deque
+     * @throws InterruptedException if interrupted while waiting.
+     */
+    E takeFirst() throws InterruptedException;
+
+    /**
+     * Retrieves and removes the last element of this deque, waiting
+     * if no elements are present on this deque.
+     * @return the tail of this deque
+     * @throws InterruptedException if interrupted while waiting.
+     */
+    E takeLast() throws InterruptedException;
+
+    /**
+     * Inserts the specified element as the first element of this deque,
+     * waiting if necessary up to the specified wait time for space to
+     * become available.
+     * @param o the element to add
+     * @param timeout how long to wait before giving up, in units of
+     * <tt>unit</tt>
+     * @param unit a <tt>TimeUnit</tt> determining how to interpret the
+     * <tt>timeout</tt> parameter
+     * @return <tt>true</tt> if successful, or <tt>false</tt> if
+     * the specified waiting time elapses before space is available.
+     * @throws InterruptedException if interrupted while waiting.
+     * @throws NullPointerException if the specified element is <tt>null</tt>.
+     */
+    boolean offerFirst(E o, long timeout, TimeUnit unit)
+        throws InterruptedException;
+
+    /**
+     * Inserts the specified element as the last element of this deque,
+     * waiting if necessary up to the specified wait time for space to
+     * become available.
+     * @param o the element to add
+     * @param timeout how long to wait before giving up, in units of
+     * <tt>unit</tt>
+     * @param unit a <tt>TimeUnit</tt> determining how to interpret the
+     * <tt>timeout</tt> parameter
+     * @return <tt>true</tt> if successful, or <tt>false</tt> if
+     * the specified waiting time elapses before space is available.
+     * @throws InterruptedException if interrupted while waiting.
+     * @throws NullPointerException if the specified element is <tt>null</tt>.
+     */
+    boolean offerLast(E o, long timeout, TimeUnit unit)
+        throws InterruptedException;
+
+
+    /**
+     * Retrieves and removes the first element of this deque, waiting
+     * if necessary up to the specified wait time if no elements are
+     * present on this deque.
+     * @param timeout how long to wait before giving up, in units of
+     * <tt>unit</tt>
+     * @param unit a <tt>TimeUnit</tt> determining how to interpret the
+     * <tt>timeout</tt> parameter
+     * @return the head of this deque, or <tt>null</tt> if the
+     * specified waiting time elapses before an element is present.
+     * @throws InterruptedException if interrupted while waiting.
+     */
+    E pollFirst(long timeout, TimeUnit unit)
+        throws InterruptedException;
+
+    /**
+     * Retrieves and removes the last element of this deque, waiting
+     * if necessary up to the specified wait time if no elements are
+     * present on this deque.
+     * @param timeout how long to wait before giving up, in units of
+     * <tt>unit</tt>
+     * @param unit a <tt>TimeUnit</tt> determining how to interpret the
+     * <tt>timeout</tt> parameter
+     * @return the tail of this deque, or <tt>null</tt> if the
+     * specified waiting time elapses before an element is present.
+     * @throws InterruptedException if interrupted while waiting.
+     */
+    E pollLast(long timeout, TimeUnit unit)
+        throws InterruptedException;
+
+    /**
+     * Adds the specified element as the last element of this deque,
+     * waiting if necessary for space to become available.  This
+     * method is equivalent to putLast.
+     * @param o the element to add
+     * @throws InterruptedException if interrupted while waiting.
+     * @throws NullPointerException if the specified element is <tt>null</tt>.
+     */
+    void put(E o) throws InterruptedException;
+
+    /**
+     * Inserts the specified element as the last element of this deque,
+     * waiting if necessary up to the specified wait time for space to become
+     * available.  When using deques that may impose insertion restrictions
+     * (for example capacity bounds), method <tt>offer</tt> is generally
+     * preferable to method {@link Collection#add}, which can fail to insert an
+     * element only by throwing an exception.  This method is equivalent to offerLast.
+     * @param o the element to add.
+     * @param timeout how long to wait before giving up, in units of <tt>unit</tt>
+     * @param unit a <tt>TimeUnit</tt> determining how to interpret <tt>timeout</tt>
+     * @return <tt>true</tt> if the element was added, <tt>false</tt> if the wait time elapsed first
+     * @throws InterruptedException if interrupted while waiting.
+     * @throws NullPointerException if the specified element is <tt>null</tt>
+     */
+    boolean offer(E o, long timeout, TimeUnit unit)
+        throws InterruptedException;
+
+    /**
+     * Retrieves and removes the first element of this deque, waiting
+     * if no elements are present on this deque.
+     * This method is equivalent to takeFirst.
+     * @return the head of this deque
+     * @throws InterruptedException if interrupted while waiting.
+     */
+    E take() throws InterruptedException;
+
+    /**
+     * Retrieves and removes the first element of this deque, waiting
+     * if necessary up to the specified wait time if no elements are
+     * present on this deque.  This method is equivalent to
+     * pollFirst.
+     * @param timeout how long to wait before giving up, in units of
+     * <tt>unit</tt>
+     * @param unit a <tt>TimeUnit</tt> determining how to interpret the
+     * <tt>timeout</tt> parameter
+     * @return the head of this deque, or <tt>null</tt> if the
+     * specified waiting time elapses before an element is present.
+     * @throws InterruptedException if interrupted while waiting.
+     */
+    E poll(long timeout, TimeUnit unit)
+        throws InterruptedException;
+}

Added: trunk/src/main/org/jboss/messaging/utils/concurrent/Deque.java
===================================================================
--- trunk/src/main/org/jboss/messaging/utils/concurrent/Deque.java	                        (rev 0)
+++ trunk/src/main/org/jboss/messaging/utils/concurrent/Deque.java	2009-06-24 09:52:52 UTC (rev 7450)
@@ -0,0 +1,442 @@
+/*
+ * Written by Doug Lea and Josh Bloch with assistance from members of
+ * JCP JSR-166 Expert Group and released to the public domain, as explained
+ * at http://creativecommons.org/licenses/publicdomain
+ */
+
+package org.jboss.messaging.utils.concurrent;     // XXX This belongs in java.util!!! XXX
+import java.util.*;    // XXX This import goes away        XXX
+
+/**
+ * A linear collection that supports element insertion and removal at
+ * both ends.  The name <i>deque</i> is short for "double ended queue"
+ * and is usually pronounced "deck".  Most <tt>Deque</tt>
+ * implementations place no fixed limits on the number of elements
+ * they may contain, but this interface supports capacity-restricted
+ * deques as well as those with no fixed size limit.
+ *
+ * <p>This interface defines methods to access the elements at both
+ * ends of the deque.  Methods are provided to insert, remove, and
+ * examine the element.  Each of these methods exists in two forms:
+ * one throws an exception if the operation fails, the other returns a
+ * special value (either <tt>null</tt> or <tt>false</tt>, depending on
+ * the operation).  The latter form of the insert operation is
+ * designed specifically for use with capacity-restricted
+ * <tt>Deque</tt> implementations; in most implementations, insert
+ * operations cannot fail.
+ *
+ * <p>The twelve methods described above are summarized in the
+ * following table:<p>
+ * 
+ * <table BORDER CELLPADDING=3 CELLSPACING=1>
+ *  <tr>
+ *    <td></td>
+ *    <td ALIGN=CENTER COLSPAN = 2> <b>First Element (Head)</b></td>
+ *    <td ALIGN=CENTER COLSPAN = 2> <b>Last Element (Tail)</b></td>
+ *  </tr>
+ *  <tr>
+ *    <td></td>
+ *    <td ALIGN=CENTER><em>Throws exception</em></td>
+ *    <td ALIGN=CENTER><em>Returns special value</em></td>
+ *    <td ALIGN=CENTER><em>Throws exception</em></td>
+ *    <td ALIGN=CENTER><em>Returns special value</em></td>
+ *  </tr>
+ *  <tr>
+ *    <td><b>Insert</b></td>
+ *    <td>{@link #addFirst addFirst(e)}</td>
+ *    <td>{@link #offerFirst offerFirst(e)}</td>
+ *    <td>{@link #addLast addLast(e)}</td>
+ *    <td>{@link #offerLast offerLast(e)}</td>
+ *  </tr>
+ *  <tr>
+ *    <td><b>Remove</b></td>
+ *    <td>{@link #removeFirst removeFirst()}</td>
+ *    <td>{@link #pollFirst pollFirst()}</td>
+ *    <td>{@link #removeLast removeLast()}</td>
+ *    <td>{@link #pollLast pollLast()}</td>
+ *  </tr>
+ *  <tr>
+ *    <td><b>Examine</b></td>
+ *    <td>{@link #getFirst getFirst()}</td>
+ *    <td>{@link #peekFirst peekFirst()}</td>
+ *    <td>{@link #getLast getLast()}</td>
+ *    <td>{@link #peekLast peekLast()}</td>
+ *  </tr>
+ * </table>
+ *
+ * <p>This interface extends the {@link Queue} interface.  When a deque is
+ * used as a queue, FIFO (First-In-First-Out) behavior results.  Elements are
+ * added to the end of the deque and removed from the beginning.  The methods
+ * inherited from the <tt>Queue</tt> interface are precisely equivalent to
+ * <tt>Deque</tt> methods as indicated in the following table:<p>
+ *
+ * <table BORDER CELLPADDING=3 CELLSPACING=1>
+ *  <tr>
+ *    <td ALIGN=CENTER> <b><tt>Queue</tt> Method</b></td>
+ *    <td ALIGN=CENTER> <b>Equivalent <tt>Deque</tt> Method</b></td>
+ *  </tr>
+ *
+ *   <tr>
+ *    <td>{@link java.util.Queue#offer offer(e)}</td>
+ *    <td>{@link #offerLast offerLast(e)}</td>
+ *   </tr>
+ *   <tr>
+ *    <td>{@link java.util.Queue#add add(e)}</td>
+ *    <td>{@link #addLast addLast(e)}</td>
+ *   </tr>
+ *   <tr>
+ *    <td>{@link java.util.Queue#poll poll()}</td>
+ *    <td>{@link #pollFirst pollFirst()}</td>
+ *   </tr>
+ *   <tr>
+ *    <td>{@link java.util.Queue#remove remove()}</td>
+ *    <td>{@link #removeFirst removeFirst()}</td>
+ *   </tr>
+ *   <tr>
+ *    <td>{@link java.util.Queue#peek peek()}</td>
+ *    <td>{@link #peekFirst peekFirst()}</td>
+ *   </tr>
+ *   <tr>
+ *    <td>{@link java.util.Queue#element element()}</td>
+ *    <td>{@link #getFirst getFirst()}</td>
+ *   </tr>
+ * </table>
+ *
+ * <p>Deques can also be used as LIFO (Last-In-First-Out) stacks.  This
+ * interface should be used in preference to the legacy {@link Stack} class.
+ * When a deque is used as a stack, elements are pushed and popped from the
+ * beginning of the deque.  Stack methods are precisely equivalent to
+ * <tt>Deque</tt> methods as indicated in the table below:<p>
+ *
+ * <table BORDER CELLPADDING=3 CELLSPACING=1>
+ *  <tr>
+ *    <td ALIGN=CENTER> <b>Stack Method</b></td>
+ *    <td ALIGN=CENTER> <b>Equivalent <tt>Deque</tt> Method</b></td>
+ *  </tr>
+ *
+ *   <tr>
+ *    <td>{@link #push push(e)}</td>
+ *    <td>{@link #addFirst addFirst(e)}</td>
+ *   </tr>
+ *   <tr>
+ *    <td>{@link #pop pop()}</td>
+ *    <td>{@link #removeFirst removeFirst()}</td>
+ *   </tr>
+ *   <tr>
+ *    <td>{@link #peek peek()}</td>
+ *    <td>{@link #peekFirst peekFirst()}</td>
+ *   </tr>
+ * </table>
+ *
+ * <p>Note that the {@link #peek peek} method works equally well when
+ * a deque is used as a queue or a stack; in either case, elements are
+ * drawn from the beginning of the deque.
+ *
+ * <p>This interface provides two methods to remove interior
+ * elements, {@link #removeFirstOccurrence removeFirstOccurrence} and
+ * {@link #removeLastOccurrence removeLastOccurrence}.  Unlike the
+ * {@link List} interface, this interface does not provide support for
+ * indexed access to elements.
+ *
+ * <p>While <tt>Deque</tt> implementations are not strictly required
+ * to prohibit the insertion of null elements, they are strongly
+ * encouraged to do so.  Users of any <tt>Deque</tt> implementations
+ * that do allow null elements are strongly encouraged <i>not</i> to
+ * take advantage of the ability to insert nulls.  This is so because
+ * <tt>null</tt> is used as a special return value by various methods
+ * to indicate that the deque is empty.
+ * 
+ * <p><tt>Deque</tt> implementations generally do not define
+ * element-based versions of the <tt>equals</tt> and <tt>hashCode</tt>
+ * methods, but instead inherit the identity-based versions from class
+ * <tt>Object</tt>.
+ *
+ * <p>This interface is a member of the <a
+ * href="{@docRoot}/../guide/collections/index.html"> Java Collections
+ * Framework</a>.
+ *
+ * @author Doug Lea
+ * @author Josh Bloch
+ * @since  1.6
+ * @param <E> the type of elements held in this collection
+ */
+public interface Deque<E> extends Queue<E> {
+    /**
+     * Inserts the specified element at the front of this deque unless it would
+     * violate capacity restrictions.  When using a capacity-restricted deque,
+     * this method is generally preferable to method <tt>addFirst</tt>, which
+     * can fail to insert an element only by throwing an exception.
+     *
+     * @param e the element to insert
+     * @return <tt>true</tt> if it was possible to insert the element,
+     *     else <tt>false</tt>
+     * @throws NullPointerException if <tt>e</tt> is null and this
+     *     deque does not permit null elements
+     */
+    boolean offerFirst(E e);
+
+    /**
+     * Inserts the specified element to the end of this deque unless it would
+     * violate capacity restrictions.  When using a capacity-restricted deque,
+     * this method is generally preferable to method <tt>addLast</tt> which
+     * can fail to insert an element only by throwing an exception.
+     *
+     * @param e the element to insert
+     * @return <tt>true</tt> if it was possible to insert the element,
+     *     else <tt>false</tt>
+     * @throws NullPointerException if <tt>e</tt> is null and this
+     *     deque does not permit null elements
+     */
+    boolean offerLast(E e);
+
+    /**
+     * Inserts the specified element to the front of this deque unless it
+     * would violate capacity restrictions.
+     *
+     * @param e the element to insert
+     * @throws IllegalStateException if it was not possible to insert
+     *    the element due to capacity restrictions
+     * @throws NullPointerException if <tt>e</tt> is null and this
+     *     deque does not permit null elements
+     */
+    void addFirst(E e);
+
+    /**
+     * Inserts the specified element to the end of this deque unless it would
+     * violate capacity restrictions.
+     *
+     * @param e the element to insert
+     * @throws IllegalStateException if it was not possible to insert
+     *    the element due to capacity restrictions
+     * @throws NullPointerException if <tt>e</tt> is null and this
+     *     deque does not permit null elements
+     */
+    void addLast(E e);
+
+    /**
+     * Retrieves and removes the first element of this deque, or
+     * <tt>null</tt> if this deque is empty.
+     *
+     * @return the first element of this deque, or <tt>null</tt> if
+     *     this deque is empty
+     */
+    E pollFirst();
+
+    /**
+     * Retrieves and removes the last element of this deque, or
+     * <tt>null</tt> if this deque is empty.
+     *
+     * @return the last element of this deque, or <tt>null</tt> if
+     *     this deque is empty
+     */
+    E pollLast();
+
+    /**
+     * Removes and returns the first element of this deque.  This method
+     * differs from the <tt>pollFirst</tt> method only in that it throws an
+     * exception if this deque is empty.
+     *
+     * @return the first element of this deque
+     * @throws NoSuchElementException if this deque is empty
+     */
+    E removeFirst();
+
+    /**
+     * Retrieves and removes the last element of this deque.  This method
+     * differs from the <tt>pollLast</tt> method only in that it throws an
+     * exception if this deque is empty.
+     *
+     * @return the last element of this deque
+     * @throws NoSuchElementException if this deque is empty
+     */
+    E removeLast();
+
+    /**
+     * Retrieves, but does not remove, the first element of this deque,
+     * returning <tt>null</tt> if this deque is empty.
+     *
+     * @return the first element of this deque, or <tt>null</tt> if
+     *     this deque is empty
+     */
+    E peekFirst();
+
+    /**
+     * Retrieves, but does not remove, the last element of this deque,
+     * returning <tt>null</tt> if this deque is empty.
+     *
+     * @return the last element of this deque, or <tt>null</tt> if this deque
+     *     is empty
+     */
+    E peekLast();
+
+    /**
+     * Retrieves, but does not remove, the first element of this
+     * deque.  This method differs from the <tt>peekFirst</tt> method only
+     * in that it throws an exception if this deque is empty.
+     *
+     * @return the first element of this deque
+     * @throws NoSuchElementException if this deque is empty
+     */
+    E getFirst();
+
+    /**
+     * Retrieves, but does not remove, the last element of this
+     * deque.  This method differs from the <tt>peekLast</tt> method only
+     * in that it throws an exception if this deque is empty.
+     *
+     * @return the last element of this deque
+     * @throws NoSuchElementException if this deque is empty
+     */
+    E getLast();
+
+    /**
+     * Removes the first occurrence of the specified element in this
+     * deque.  If the deque does not contain the element, it is
+     * unchanged.  More formally, removes the first element <tt>x</tt>
+     * such that <tt>(e==null ? x==null : e.equals(x))</tt> (if
+     * such an element exists).
+     *
+     * @param e element to be removed from this deque, if present
+     * @return <tt>true</tt> if the deque contained the specified element
+     * @throws NullPointerException if the specified element is <tt>null</tt>
+     */
+    boolean removeFirstOccurrence(Object e);
+
+    /**
+     * Removes the last occurrence of the specified element in this
+     * deque.  If the deque does not contain the element, it is
+     * unchanged.  More formally, removes the last element <tt>x</tt>
+     * such that <tt>(e==null ? x==null : e.equals(x))</tt> (if
+     * such an element exists).
+     *
+     * @param e element to be removed from this deque, if present
+     * @return <tt>true</tt> if the deque contained the specified element
+     * @throws NullPointerException if the specified element is <tt>null</tt>
+     */
+    boolean removeLastOccurrence(Object e);
+
+
+    // *** Queue methods ***
+
+    /**
+     * Inserts the specified element into the queue represented by this deque
+     * unless it would violate capacity restrictions.  In other words, inserts
+     * the specified element to the end of this deque.  When using a
+     * capacity-restricted deque, this method is generally preferable to the
+     * {@link #add} method, which can fail to insert an element only by
+     * throwing an exception.
+     *
+     * <p>This method is equivalent to {@link #offerLast}.
+     *
+     * @param e the element to insert
+     * @return <tt>true</tt> if it was possible to insert the element,
+     *     else <tt>false</tt>
+     * @throws NullPointerException if <tt>e</tt> is null and this
+     *     deque does not permit null elements
+     */
+    boolean offer(E e);
+
+    /**
+     * Inserts the specified element into the queue represented by this
+     * deque unless it would violate capacity restrictions.  In other words,
+     * inserts the specified element as the last element of this deque. 
+     *
+     * <p>This method is equivalent to {@link #addLast}.
+     *
+     * @param e the element to insert
+     * @return <tt>true</tt> (as per the spec for {@link Collection#add})
+     * @throws IllegalStateException if it was not possible to insert
+     *    the element due to capacity restrictions
+     * @throws NullPointerException if <tt>e</tt> is null and this
+     *     deque does not permit null elements
+     */
+    boolean add(E e);
+
+    /**
+     * Retrieves and removes the head of the queue represented by
+     * this deque, or <tt>null</tt> if this deque is empty.  In other words,
+     * retrieves and removes the first element of this deque, or <tt>null</tt>
+     * if this deque is empty.
+     *
+     * <p>This method is equivalent to {@link #pollFirst()}.
+     *
+     * @return the first element of this deque, or <tt>null</tt> if
+     *     this deque is empty
+     */
+    E poll();
+
+    /**
+     * Retrieves and removes the head of the queue represented by this deque.
+     * This method differs from the <tt>poll</tt> method only in that it
+     * throws an exception if this deque is empty.
+     *
+     * <p>This method is equivalent to {@link #removeFirst()}.
+     *
+     * @return the head of the queue represented by this deque
+     * @throws NoSuchElementException if this deque is empty
+     */
+    E remove();
+
+    /**
+     * Retrieves, but does not remove, the head of the queue represented by
+     * this deque, returning <tt>null</tt> if this deque is empty.
+     *
+     * <p>This method is equivalent to {@link #peekFirst()}
+     *
+     * @return the head of the queue represented by this deque, or
+     *     <tt>null</tt> if this deque is empty
+     */
+    E peek();
+
+    /**
+     * Retrieves, but does not remove, the head of the queue represented by
+     * this deque.  This method differs from the <tt>peek</tt> method only in
+     * that it throws an exception if this deque is empty.
+     *
+     * <p>This method is equivalent to {@link #getFirst()}
+     *
+     * @return the head of the queue represented by this deque
+     * @throws NoSuchElementException if this deque is empty
+     */
+    E element();
+
+
+    // *** Stack methods ***
+
+    /**
+     * Pushes an element onto the stack represented by this deque.  In other
+     * words, inserts the element at the front of this deque unless it would
+     * violate capacity restrictions.
+     *
+     * <p>This method is equivalent to {@link #addFirst}.
+     *
+     * @throws IllegalStateException if it was not possible to insert
+     *    the element due to capacity restrictions
+     * @throws NullPointerException if <tt>e</tt> is null and this
+     *     deque does not permit null elements
+     */
+    void push(E e);
+
+    /**
+     * Pops an element from the stack represented by this deque.  In other
+     * words, removes and returns the first element of this deque.
+     *
+     * <p>This method is equivalent to {@link #removeFirst()}.
+     *
+     * @return the element at the front of this deque (which is the top
+     *     of the stack represented by this deque)
+     * @throws NoSuchElementException if this deque is empty
+     */
+    E pop();
+
+
+    // *** Collection Method ***
+
+    /**
+     * Returns an iterator over the elements in this deque.  The elements
+     * will be ordered from first (head) to last (tail).
+     * 
+     * @return an <tt>Iterator</tt> over the elements in this deque
+     */
+    Iterator<E> iterator();
+}

Added: trunk/src/main/org/jboss/messaging/utils/concurrent/LinkedBlockingDeque.java
===================================================================
--- trunk/src/main/org/jboss/messaging/utils/concurrent/LinkedBlockingDeque.java	                        (rev 0)
+++ trunk/src/main/org/jboss/messaging/utils/concurrent/LinkedBlockingDeque.java	2009-06-24 09:52:52 UTC (rev 7450)
@@ -0,0 +1,762 @@
+/*
+ * Written by Doug Lea with assistance from members of JCP JSR-166
+ * Expert Group and released to the public domain, as explained at
+ * http://creativecommons.org/licenses/publicdomain
+ */
+
+package org.jboss.messaging.utils.concurrent;
+
+import java.util.*;
+import java.util.concurrent.*;
+import java.util.concurrent.locks.*;
+
+/**
+ * An optionally-bounded {@linkplain BlockingDeque blocking deque} based on
+ * linked nodes.
+ *
+ * <p> The optional capacity bound constructor argument serves as a
+ * way to prevent excessive expansion. The capacity, if unspecified,
+ * is equal to {@link Integer#MAX_VALUE}.  Linked nodes are
+ * dynamically created upon each insertion unless this would bring the
+ * deque above capacity.
+ *
+ * <p>Most operations run in constant time (ignoring time spent
+ * blocking).  Exceptions include {@link #remove(Object) remove},
+ * {@link #removeFirstOccurrence removeFirstOccurrence}, {@link
+ * #removeLastOccurrence removeLastOccurrence}, {@link #contains
+ * contains }, {@link #iterator iterator.remove()}, and the bulk
+ * operations, all of which run in linear time.
+ *
+ * <p>This class and its iterator implement all of the
+ * <em>optional</em> methods of the {@link Collection} and {@link
+ * Iterator} interfaces.  This class is a member of the <a
+ * href="{@docRoot}/../guide/collections/index.html"> Java Collections
+ * Framework</a>.
+ *
+ * @since 1.6
+ * @author  Doug Lea
+ * @param <E> the type of elements held in this collection
+ */
+public class LinkedBlockingDeque<E>
+    extends AbstractQueue<E>
+    implements BlockingDeque<E>,  java.io.Serializable {
+
+    /*
+     * Implemented as a simple doubly-linked list protected by a
+     * single lock and using conditions to manage blocking.
+     */
+
+    private static final long serialVersionUID = -387911632671998426L;
+
+    /** Doubly-linked list node class */
+    static final class Node<E> {
+   E item; 
+        Node<E> prev;
+        Node<E> next;
+        Node(E x, Node<E> p, Node<E> n) {
+            item = x;
+            prev = p;
+            next = n;
+        }
+    }
+
+    /** Pointer to first node */
+    private transient Node<E> first;
+    /** Pointer to last node */
+    private transient Node<E> last;
+    /** Number of items in the deque */
+    private transient int count;
+    /** Maximum number of items in the deque */
+    private final int capacity;
+    /** Main lock guarding all access */
+    private final ReentrantLock lock = new ReentrantLock();
+    /** Condition for waiting takes */
+    private final Condition notEmpty = lock.newCondition();
+    /** Condition for waiting puts */
+    private final Condition notFull = lock.newCondition();
+
+    /**
+     * Creates a <tt>LinkedBlockingDeque</tt> with a capacity of
+     * {@link Integer#MAX_VALUE}.
+     */
+    public LinkedBlockingDeque() {
+        this(Integer.MAX_VALUE);
+    }
+
+    /**
+     * Creates a <tt>LinkedBlockingDeque</tt> with the given (fixed)
+     * capacity.
+     * @param capacity the capacity of this deque
+     * @throws IllegalArgumentException if <tt>capacity</tt> is less than 1
+     */
+    public LinkedBlockingDeque(int capacity) {
+        if (capacity <= 0) throw new IllegalArgumentException();
+        this.capacity = capacity;
+    }
+
+    /**
+     * Creates a <tt>LinkedBlockingDeque</tt> with a capacity of
+     * {@link Integer#MAX_VALUE}, initially containing the elements of the
+     * given collection,
+     * added in traversal order of the collection's iterator.
+     * @param c the collection of elements to initially contain
+     * @throws NullPointerException if <tt>c</tt> or any element within it
+     * is <tt>null</tt>
+     */
+    public LinkedBlockingDeque(Collection<? extends E> c) {
+        this(Integer.MAX_VALUE);
+        for (E e : c)
+            add(e);
+    }
+
+
+    // Basic linking and unlinking operations, called only while holding lock
+
+    /**
+     * Link e as first element, or return false if full
+     */
+    private boolean linkFirst(E e) {
+        if (count >= capacity)
+            return false;
+        ++count;
+        Node<E> f = first;
+        Node<E> x = new Node<E>(e, null, f);
+        first = x;
+        if (last == null)
+            last = x;
+        else
+            f.prev = x;
+        notEmpty.signal();
+        return true;
+    }
+
+    /**
+     * Link e as last element, or return false if full
+     */
+    private boolean linkLast(E e) {
+        if (count >= capacity)
+            return false;
+        ++count;
+        Node<E> l = last;
+        Node<E> x = new Node<E>(e, l, null);
+        last = x;
+        if (first == null)
+            first = x;
+        else
+            l.next = x;
+        notEmpty.signal();
+        return true;
+    }
+
+    /**
+     * Remove and return first element, or null if empty
+     */
+    private E unlinkFirst() {
+        Node<E> f = first;
+        if (f == null)
+            return null;
+        Node<E> n = f.next;
+        first = n;
+        if (n == null) 
+            last = null;
+        else 
+            n.prev = null;
+        --count;
+        notFull.signal();
+        return f.item;
+    }
+
+    /**
+     * Remove and return last element, or null if empty
+     */
+    private E unlinkLast() {
+        Node<E> l = last;
+        if (l == null)
+            return null;
+        Node<E> p = l.prev;
+        last = p;
+        if (p == null) 
+            first = null;
+        else 
+            p.next = null;
+        --count;
+        notFull.signal();
+        return l.item;
+    }
+
+    /**
+     * Unlink node x from the list (caller holds lock), updating first/last
+     */
+    private void unlink(Node<E> x) {
+        Node<E> p = x.prev;
+        Node<E> n = x.next;
+        if (p == null) { // no predecessor: x is treated as the head
+            if (n == null) // x was the only node
+                first = last = null;
+            else {
+                n.prev = null;
+                first = n;
+            }
+        } else if (n == null) { // no successor: x is the tail
+            p.next = null;
+            last = p;
+        } else { // interior node: splice its neighbours together
+            p.next = n;
+            n.prev = p;
+        }
+        --count;
+        notFull.signalAll(); // a slot was freed; wake threads waiting on notFull
+    }
+
+    // Deque methods
+
+    public boolean offerFirst(E o) {
+        if (o == null) throw new NullPointerException();
+        lock.lock();
+        try {
+            return linkFirst(o);
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    public boolean offerLast(E o) {
+        if (o == null) throw new NullPointerException();
+        lock.lock();
+        try {
+            return linkLast(o);
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    public void addFirst(E e) { 
+        if (!offerFirst(e))
+            throw new IllegalStateException("Deque full");
+    }
+
+    public void addLast(E e) { 
+        if (!offerLast(e))
+            throw new IllegalStateException("Deque full");
+    }
+
+    public E pollFirst() {
+        lock.lock();
+        try {
+            return unlinkFirst();
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    public E pollLast() {
+        lock.lock();
+        try {
+            return unlinkLast();
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    public E removeFirst() {
+        E x = pollFirst();
+        if (x == null) throw new NoSuchElementException();
+        return x;
+    }
+
+    public E removeLast() {
+        E x = pollLast();
+        if (x == null) throw new NoSuchElementException();
+        return x;
+    }
+
+    public E peekFirst() {
+        lock.lock();
+        try {
+            return (first == null) ? null : first.item;
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    public E peekLast() {
+        lock.lock();
+        try {
+            return (last == null) ? null : last.item;
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    public E getFirst() {
+        E x = peekFirst();
+        if (x == null) throw new NoSuchElementException();
+        return x;
+    }
+
+    public E getLast() {
+        E x = peekLast();
+        if (x == null) throw new NoSuchElementException();
+        return x;
+    }
+
+    // BlockingDeque methods
+
+    public void putFirst(E o) throws InterruptedException {
+        if (o == null) throw new NullPointerException();
+        lock.lock();
+        try {
+            while (!linkFirst(o))
+                notFull.await();
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    public void putLast(E o) throws InterruptedException {
+        if (o == null) throw new NullPointerException();
+        lock.lock();
+        try {
+            while (!linkLast(o))
+                notFull.await();
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    public E takeFirst() throws InterruptedException {
+        lock.lock();
+        try {
+            E x;
+            while ( (x = unlinkFirst()) == null)
+                notEmpty.await();
+            return x;
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    public E takeLast() throws InterruptedException {
+        lock.lock();
+        try {
+            E x;
+            while ( (x = unlinkLast()) == null)
+                notEmpty.await();
+            return x;
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    public boolean offerFirst(E o, long timeout, TimeUnit unit)
+        throws InterruptedException {
+        if (o == null) throw new NullPointerException();
+        lock.lockInterruptibly();
+        try {
+            long nanos = unit.toNanos(timeout);
+            for (;;) {
+                if (linkFirst(o))
+                    return true;
+                if (nanos <= 0)
+                    return false;
+                nanos = notFull.awaitNanos(nanos);
+            }
+        } finally {
+            lock.unlock();
+        }
+    }
+        
+    public boolean offerLast(E o, long timeout, TimeUnit unit)
+        throws InterruptedException {
+        if (o == null) throw new NullPointerException();
+        lock.lockInterruptibly();
+        try {
+            long nanos = unit.toNanos(timeout);
+            for (;;) {
+                if (linkLast(o))
+                    return true;
+                if (nanos <= 0)
+                    return false;
+                nanos = notFull.awaitNanos(nanos);
+            }
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    public E pollFirst(long timeout, TimeUnit unit) 
+        throws InterruptedException {
+        lock.lockInterruptibly();
+        try {
+            long nanos = unit.toNanos(timeout);
+            for (;;) {
+                E x = unlinkFirst();
+                if (x != null)
+                    return x;
+                if (nanos <= 0)
+                    return null;
+                nanos = notEmpty.awaitNanos(nanos);
+            }
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    public E pollLast(long timeout, TimeUnit unit) 
+        throws InterruptedException {
+        lock.lockInterruptibly();
+        try {
+            long nanos = unit.toNanos(timeout);
+            for (;;) {
+                E x = unlinkLast();
+                if (x != null)
+                    return x;
+                if (nanos <= 0)
+                    return null;
+                nanos = notEmpty.awaitNanos(nanos);
+            }
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    // Queue and stack methods
+
+    public boolean offer(E e)       { return offerLast(e); }
+    public boolean add(E e)         { addLast(e); return true; }
+    public void push(E e)           { addFirst(e); }
+    public E poll()                 { return pollFirst(); }
+    public E remove()               { return removeFirst(); }
+    public E pop()                  { return removeFirst(); }
+    public E peek()                 { return peekFirst(); }
+    public E element()              { return getFirst(); }
+    public boolean remove(Object o) { return removeFirstOccurrence(o); }
+
+    // BlockingQueue methods
+
+    public void put(E o) throws InterruptedException  { putLast(o);  }
+    public E take() throws InterruptedException       { return takeFirst(); }
+    public boolean offer(E o, long timeout, TimeUnit unit)
+        throws InterruptedException    { return offerLast(o, timeout, unit); }
+    public E poll(long timeout, TimeUnit unit)
+        throws InterruptedException    { return pollFirst(timeout, unit); }
+
+    /**
+     * Returns the number of elements in this deque.
+     *
+     * @return  the number of elements in this deque.
+     */
+    public int size() {
+        lock.lock();
+        try {
+            return count;
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    /**
+     * Returns the number of elements that this deque can ideally (in
+     * the absence of memory or resource constraints) accept without
+     * blocking. This is always equal to the initial capacity of this deque
+     * less the current <tt>size</tt> of this deque.
+     * <p>Note that you <em>cannot</em> always tell if
+     * an attempt to <tt>add</tt> an element will succeed by
+     * inspecting <tt>remainingCapacity</tt> because it may be the
+     * case that a waiting consumer is ready to <tt>take</tt> an
+     * element out of an otherwise full deque.
+     */
+    public int remainingCapacity() {
+        lock.lock();
+        try {
+            return capacity - count;
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    public boolean contains(Object o) {
+        if (o == null) return false;
+        lock.lock();
+        try {
+            for (Node<E> p = first; p != null; p = p.next) 
+                if (o.equals(p.item))
+                    return true;
+            return false;
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    public boolean removeFirstOccurrence(Object e) {
+        if (e == null) throw new NullPointerException();
+        lock.lock();
+        try {
+            for (Node<E> p = first; p != null; p = p.next) {
+                if (e.equals(p.item)) {
+                    unlink(p);
+                    return true;
+                }
+            }
+            return false;
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    public boolean removeLastOccurrence(Object e) {
+        if (e == null) throw new NullPointerException();
+        lock.lock();
+        try {
+            for (Node<E> p = last; p != null; p = p.prev) {
+                if (e.equals(p.item)) {
+                    unlink(p);
+                    return true;
+                }
+            }
+            return false;
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    /**
+     * Variant of removeFirstOccurrence needed by iterator.remove.
+     * Searches for the node, not its contents.
+     */
+   boolean removeNode(Node<E> e) {
+        lock.lock();
+        try {
+            for (Node<E> p = first; p != null; p = p.next) {
+                if (p == e) {
+                    unlink(p);
+                    return true;
+                }
+            }
+            return false;
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    public Object[] toArray() {
+        lock.lock();
+        try {
+            Object[] a = new Object[count];
+            int k = 0;
+            for (Node<E> p = first; p != null; p = p.next) 
+                a[k++] = p.item;
+            return a;
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    public <T> T[] toArray(T[] a) {
+        lock.lock();
+        try {
+            if (a.length < count)
+                a = (T[])java.lang.reflect.Array.newInstance(
+                    a.getClass().getComponentType(),
+                    count
+                    );
+
+            int k = 0;
+            for (Node<E> p = first; p != null; p = p.next) 
+                a[k++] = (T)p.item;
+            if (a.length > k)
+                a[k] = null;
+            return a;
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    public String toString() {
+        lock.lock();
+        try {
+            return super.toString();
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    /**
+     * Atomically removes all of the elements from this deque.
+     * The deque will be empty after this call returns.
+     */
+    public void clear() {
+        lock.lock();
+        try {
+            first = last = null;
+            count = 0;
+            notFull.signalAll();
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    public int drainTo(Collection<? super E> c) {
+        if (c == null)
+            throw new NullPointerException();
+        if (c == this)
+            throw new IllegalArgumentException();
+        lock.lock();
+        try {
+            for (Node<E> p = first; p != null; p = p.next) 
+                c.add(p.item);
+            int n = count;
+            count = 0;
+            first = last = null;
+            notFull.signalAll();
+            return n;
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    /**
+     * Moves at most <tt>maxElements</tt> elements from the head of this deque into <tt>c</tt> and
+     * returns how many were moved; rejects a null <tt>c</tt> (NPE) and <tt>c == this</tt> (IAE).
+     */
+    public int drainTo(Collection<? super E> c, int maxElements) {
+        if (c == null) throw new NullPointerException();
+        if (c == this) throw new IllegalArgumentException();
+        lock.lock();
+        try {
+            int n = 0;
+            for (; n < maxElements && first != null; ++n, --count) {
+                c.add(first.item);
+                first = first.next;
+                // Clear the new head's back-link: unlink()/unlinkLast() rely on first.prev == null.
+                if (first != null) first.prev = null;
+            }
+            if (first == null) last = null;
+            notFull.signalAll();
+            return n;
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    /**
+     * Returns an iterator over the elements in this deque in proper sequence.
+     * The returned <tt>Iterator</tt> is a "weakly consistent" iterator that
+     * will never throw {@link java.util.ConcurrentModificationException},
+     * and guarantees to traverse elements as they existed upon
+     * construction of the iterator, and may (but is not guaranteed to)
+     * reflect any modifications subsequent to construction.
+     *
+     * @return an iterator over the elements in this deque in proper sequence.
+     */
+    public Iterator<E> iterator() {
+        return new Itr();
+    }
+
+    /**
+     * Iterator for LinkedBlockingDeque
+     */
+    private class Itr implements Iterator<E> {
+        private Node<E> next;
+
+        /**
+         * nextItem holds on to item fields because once we claim that
+         * an element exists in hasNext(), we must return item read
+         * under lock (in advance()) even if it was in the process of
+         * being removed when hasNext() was called.
+         **/
+        private E nextItem;
+
+        /**
+         * Node returned by most recent call to next. Needed by remove.
+         * Reset to null if this element is deleted by a call to remove.
+         */
+        private Node<E> last;
+
+        Itr() {
+            advance();
+        }
+
+        /**
+         * Advance next, or if not yet initialized, set to first node.
+         */
+        private void advance() { 
+            final ReentrantLock lock = LinkedBlockingDeque.this.lock;
+            lock.lock();
+            try {
+                next = (next == null)? first : next.next;
+                nextItem = (next == null)? null : next.item;
+            } finally {
+                lock.unlock();
+            }
+        }
+
+        public boolean hasNext() {
+            return next != null;
+        }
+
+        public E next() {
+            if (next == null)
+                throw new NoSuchElementException();
+            last = next;
+            E x = nextItem;
+            advance();
+            return x;
+        }
+
+        public void remove() {
+            Node<E> n = last;
+            if (n == null)
+                throw new IllegalStateException();
+            last = null;
+            // Note: removeNode rescans looking for this node to make
+            // sure it was not already removed. Otherwise, trying to
+            // re-remove could corrupt list.
+            removeNode(n);
+        }
+    }
+
+    /**
+     * Save the state to a stream (that is, serialize it).
+     *
+     * @serialData The capacity (int), followed by elements (each an
+     * <tt>Object</tt>) in the proper order, followed by a null
+     * @param s the stream
+     */
+    private void writeObject(java.io.ObjectOutputStream s)
+        throws java.io.IOException {
+        lock.lock();
+        try {
+            // Write out capacity and any hidden stuff
+            s.defaultWriteObject();
+            // Write out all elements in the proper order.
+            for (Node<E> p = first; p != null; p = p.next)
+                s.writeObject(p.item);
+            // Use trailing null as sentinel
+            s.writeObject(null);
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    /**
+     * Reconstitute this deque instance from a stream (that is,
+     * deserialize it).
+     * @param s the stream
+     */
+    private void readObject(java.io.ObjectInputStream s)
+        throws java.io.IOException, ClassNotFoundException {
+        s.defaultReadObject();
+        count = 0;
+        first = null;
+        last = null;
+        // Read in all elements and place in queue
+        for (;;) {
+            E item = (E)s.readObject();
+            if (item == null)
+                break;
+            add(item);
+        }
+    }
+    
+}

Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/postoffice/impl/BindingImplTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/postoffice/impl/BindingImplTest.java	2009-06-24 04:49:11 UTC (rev 7449)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/postoffice/impl/BindingImplTest.java	2009-06-24 09:52:52 UTC (rev 7450)
@@ -24,6 +24,7 @@
 
 import java.io.InputStream;
 import java.util.Collections;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -1367,6 +1368,11 @@
 
          return null;
       }
+      
+      public Iterator<MessageReference> iterator()
+      {
+         return null;
+      }
 
       /* (non-Javadoc)
        * @see org.jboss.messaging.core.server.Queue#moveReference(long, org.jboss.messaging.utils.SimpleString)

Added: trunk/tests/src/org/jboss/messaging/tests/unit/util/concurrent/LinkedBlockingDequeTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/util/concurrent/LinkedBlockingDequeTest.java	                        (rev 0)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/util/concurrent/LinkedBlockingDequeTest.java	2009-06-24 09:52:52 UTC (rev 7450)
@@ -0,0 +1,278 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2009, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.messaging.tests.unit.util.concurrent;
+
+import java.util.Iterator;
+
+import org.jboss.messaging.tests.util.UnitTestCase;
+import org.jboss.messaging.utils.concurrent.LinkedBlockingDeque;
+
+/**
+ * Checks which elements a LinkedBlockingDeque iterator returns when the deque is modified after iterator() was called.
+ *
+ * @author <a href="jmesnil at redhat.com">Jeff Mesnil</a>
+ */
+public class LinkedBlockingDequeTest extends UnitTestCase
+{
+
+   // Constants -----------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   // Public --------------------------------------------------------
+
+   public void testAddFirstWhileIterating() throws Exception
+   {
+      LinkedBlockingDeque<String> deque = new LinkedBlockingDeque<String>();
+
+      deque.addFirst("a");
+      deque.addFirst("b"); // head insertions: "b" is expected first, then "a"
+
+      Iterator<String> iter = deque.iterator();
+
+      System.out.println(deque); // debug output only; nothing is asserted on it
+
+      assertTrue(iter.hasNext());
+      assertEquals("b", iter.next());
+
+      deque.addFirst("c"); // new head added after the iterator has already returned "b"
+
+      System.out.println(deque);
+
+      assertTrue(iter.hasNext());
+      assertEquals("a", iter.next());
+
+      assertFalse(iter.hasNext()); // expected: "c" is never returned by this iterator
+   }
+
+   public void testAddLastWhileIterating() throws Exception
+   {
+      LinkedBlockingDeque<String> deque = new LinkedBlockingDeque<String>();
+
+      deque.addLast("a");
+      deque.addLast("b");
+
+      System.out.println(deque);
+
+      Iterator<String> iter = deque.iterator();
+
+      assertTrue(iter.hasNext());
+      assertEquals("a", iter.next());
+
+      deque.addLast("c"); // appended while the iterator has only returned "a"
+
+      System.out.println(deque);
+
+      assertTrue(iter.hasNext());
+      assertEquals("b", iter.next());
+
+      assertTrue(iter.hasNext());
+      assertEquals("c", iter.next()); // expected: the element appended mid-iteration is returned
+
+      assertFalse(iter.hasNext());
+   }
+
+   public void testRemoveFromHeadWhileIterating() throws Exception
+   {
+      LinkedBlockingDeque<String> deque = new LinkedBlockingDeque<String>();
+
+      deque.addLast("a");
+      deque.addLast("b");
+      deque.addLast("c");
+
+      System.out.println(deque);
+
+      Iterator<String> iter = deque.iterator();
+
+      assertTrue(iter.hasNext());
+      assertEquals("a", iter.next());
+
+      assertEquals("a", deque.removeFirst());
+      assertEquals("b", deque.removeFirst()); // "b" leaves the deque before the iterator has returned it
+      
+      System.out.println(deque);
+      
+      assertTrue(iter.hasNext());
+      assertEquals("b", iter.next()); // expected: the iterator still returns the removed "b"
+
+      assertTrue(iter.hasNext());
+      assertEquals("c", iter.next());
+
+      assertFalse(iter.hasNext());
+   }
+   
+   public void testRemoveFromHeadWhileIterating_2() throws Exception
+   {
+      LinkedBlockingDeque<String> deque = new LinkedBlockingDeque<String>();
+
+      deque.addLast("a");
+      deque.addLast("b");
+      deque.addLast("c");
+
+      System.out.println(deque);
+
+      Iterator<String> iter = deque.iterator();
+
+      assertTrue(iter.hasNext());
+      assertEquals("a", iter.next());
+
+      assertEquals("a", deque.removeFirst());
+      assertEquals("b", deque.removeFirst());
+      assertEquals("c", deque.removeFirst());
+      
+      assertEquals(0, deque.size()); // the deque is emptied while the iterator has only returned "a"
+      
+      assertTrue(iter.hasNext());
+      assertEquals("b", iter.next()); // expected: removed elements are still returned, in order
+
+      assertTrue(iter.hasNext());
+      assertEquals("c", iter.next());
+
+      assertFalse(iter.hasNext());
+   }
+   
+   public void testRemoveFromHeadAndAddLastWhileIterating_2() throws Exception
+   {
+      LinkedBlockingDeque<String> deque = new LinkedBlockingDeque<String>();
+
+      deque.addLast("a");
+      deque.addLast("b");
+      deque.addLast("c");
+
+      System.out.println(deque);
+
+      Iterator<String> iter = deque.iterator();
+
+      assertTrue(iter.hasNext());
+      assertEquals("a", iter.next());
+
+      assertEquals("a", deque.removeFirst());
+      assertEquals("b", deque.removeFirst());
+      
+      assertEquals(1, deque.size()); // only "c" is left in the deque
+      
+      assertTrue(iter.hasNext());
+      assertEquals("b", iter.next());
+
+      assertTrue(iter.hasNext());
+      assertEquals("c", iter.next());
+
+      deque.addLast("d"); // appended after the iterator has already returned "c"
+      
+      assertFalse(iter.hasNext()); // expected: "d" is not picked up by this iterator
+   }
+   
+   public void testRemoveFromHeadAndAddFirstWhileIterating() throws Exception
+   {
+      LinkedBlockingDeque<String> deque = new LinkedBlockingDeque<String>();
+
+      deque.addLast("a");
+      deque.addLast("b");
+      deque.addLast("c");
+
+      System.out.println(deque);
+
+      Iterator<String> iter = deque.iterator();
+
+      assertTrue(iter.hasNext());
+      assertEquals("a", iter.next());
+
+      assertEquals("a", deque.removeFirst());
+      assertEquals("b", deque.removeFirst());
+     
+      deque.addFirst("d"); // new head added while the iterator has only returned "a"
+      
+      System.out.println(deque);
+      
+      assertEquals(2, deque.size()); // "d" and "c" remain
+      
+      assertTrue(iter.hasNext());
+      assertEquals("b", iter.next()); // expected: the removed "b" is still returned
+
+      assertTrue(iter.hasNext());
+      assertEquals("c", iter.next());
+
+      assertFalse(iter.hasNext()); // expected: "d" is never returned by this iterator
+   }
+   
+   public void test3() throws Exception
+   {
+      LinkedBlockingDeque<String> deque = new LinkedBlockingDeque<String>();
+
+      deque.addLast("a");
+      deque.addLast("b");
+      deque.addLast("c");
+      deque.addLast("d");
+
+      Iterator<String> iter = deque.iterator(); // created before any removal; next() has not been called yet
+
+      assertEquals("a", deque.removeFirst());
+      assertEquals("b", deque.removeFirst());
+      assertEquals("c", deque.removeFirst());
+      assertEquals("d", deque.removeFirst());
+      
+      deque.addFirst("e"); // the deque now holds only "e"
+
+      assertTrue(iter.hasNext());
+      assertEquals("a", iter.next()); // expected: all four original elements are returned although they were removed
+
+      assertTrue(iter.hasNext());
+      assertEquals("b", iter.next());
+
+      assertTrue(iter.hasNext());
+      assertEquals("c", iter.next());
+
+      assertTrue(iter.hasNext());
+      assertEquals("d", iter.next());
+
+      assertFalse(iter.hasNext()); // expected: "e" is never returned by this iterator
+   }
+   
+   public void test4() throws Exception
+   {
+      LinkedBlockingDeque<String> deque = new LinkedBlockingDeque<String>();
+
+      deque.addLast("a");
+
+      Iterator<String> iter = deque.iterator();
+
+      assertEquals("a", deque.removeFirst());
+      deque.addLast("b"); // the deque now holds only "b"
+
+      assertTrue(iter.hasNext());
+      assertEquals("a", iter.next()); // expected: the removed "a" is still returned; whether "b" follows is not checked
+   }
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   // Private -------------------------------------------------------
+
+   // Inner classes -------------------------------------------------
+
+}




More information about the jboss-cvs-commits mailing list