[jboss-cvs] JBoss Messaging SVN: r7450 - in trunk: src/main/org/jboss/messaging/core/server and 6 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Wed Jun 24 05:52:53 EDT 2009
Author: jmesnil
Date: 2009-06-24 05:52:52 -0400 (Wed, 24 Jun 2009)
New Revision: 7450
Added:
trunk/src/main/org/jboss/messaging/utils/concurrent/
trunk/src/main/org/jboss/messaging/utils/concurrent/BlockingDeque.java
trunk/src/main/org/jboss/messaging/utils/concurrent/Deque.java
trunk/src/main/org/jboss/messaging/utils/concurrent/LinkedBlockingDeque.java
trunk/tests/src/org/jboss/messaging/tests/unit/util/concurrent/
trunk/tests/src/org/jboss/messaging/tests/unit/util/concurrent/LinkedBlockingDequeTest.java
Modified:
trunk/src/main/org/jboss/messaging/core/list/impl/PriorityLinkedListImpl.java
trunk/src/main/org/jboss/messaging/core/server/Queue.java
trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java
trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java
trunk/tests/src/org/jboss/messaging/tests/unit/core/postoffice/impl/BindingImplTest.java
Log:
JBMESSAGING-1437: Browsers should iterate on queue
* used LinkedBlockingDeque's java 5 backport to provide non fail-fast iterators
* refactored ServerConsumerImpl so that if it is a browser, it will iterate
on the queue to deliver messages instead of making a snapshot copy of the queue
* "browser" ServerConsumerImpl are not added to the QueueIml's consumers as they are
not delivered the messages through the queue distributor
Modified: trunk/src/main/org/jboss/messaging/core/list/impl/PriorityLinkedListImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/list/impl/PriorityLinkedListImpl.java 2009-06-24 04:49:11 UTC (rev 7449)
+++ trunk/src/main/org/jboss/messaging/core/list/impl/PriorityLinkedListImpl.java 2009-06-24 09:52:52 UTC (rev 7450)
@@ -24,26 +24,27 @@
import java.util.ArrayList;
import java.util.Iterator;
-import java.util.LinkedList;
import java.util.List;
-import java.util.ListIterator;
import java.util.NoSuchElementException;
import org.jboss.messaging.core.list.PriorityLinkedList;
+import org.jboss.messaging.utils.concurrent.Deque;
+import org.jboss.messaging.utils.concurrent.LinkedBlockingDeque;
/**
* A priority linked list implementation
*
- * It implements this by maintaining an individual LinkedList for each priority level.
+ * It implements this by maintaining an individual LinkedBlockingDeque for each priority level.
*
* @author <a href="mailto:tim.fox at jboss.com>Tim Fox</a>
+ * @author <a href="mailto:jmesnil at redhat.com>Jeff Mesnil</a>
* @version <tt>$Revision: 1174 $</tt>
*
* $Id: BasicPrioritizedDeque.java 1174 2006-08-02 14:14:32Z timfox $
*/
public class PriorityLinkedListImpl<T> implements PriorityLinkedList<T>
{
- private final List<LinkedList<T>> linkedLists;
+ private final List<Deque<T>> levels;
private final int priorities;
@@ -53,24 +54,24 @@
{
this.priorities = priorities;
- linkedLists = new ArrayList<LinkedList<T>>();
+ levels = new ArrayList<Deque<T>>();
for (int i = 0; i < priorities; i++)
{
- linkedLists.add(new LinkedList<T>());
+ levels.add(new LinkedBlockingDeque<T>());
}
}
public void addFirst(final T t, final int priority)
{
- linkedLists.get(priority).addFirst(t);
+ levels.get(priority).addFirst(t);
size++;
}
public void addLast(final T t, final int priority)
{
- linkedLists.get(priority).addLast(t);
+ levels.get(priority).addLast(t);
size++;
}
@@ -87,7 +88,7 @@
for (int i = priorities - 1; i >= 0; i--)
{
- LinkedList<T> ll = linkedLists.get(i);
+ Deque<T> ll = levels.get(i);
if (!ll.isEmpty())
{
@@ -110,7 +111,7 @@
for (int i = priorities - 1; i >= 0; i--)
{
- LinkedList<T> ll = linkedLists.get(i);
+ Deque<T> ll = levels.get(i);
if (!ll.isEmpty())
{
t = ll.getFirst();
@@ -130,7 +131,7 @@
for (int i = priorities - 1; i >= 0; i--)
{
- LinkedList<T> list = linkedLists.get(i);
+ Deque<T> list = levels.get(i);
all.addAll(list);
}
@@ -139,7 +140,7 @@
public void clear()
{
- for (LinkedList<T> list : linkedLists)
+ for (Deque<T> list : levels)
{
list.clear();
}
@@ -166,13 +167,13 @@
{
private int index;
- private ListIterator<T> currentIter;
+ private Iterator<T> currentIter;
PriorityLinkedListIterator()
{
- index = linkedLists.size() - 1;
+ index = levels.size() - 1;
- currentIter = linkedLists.get(index).listIterator();
+ currentIter = levels.get(index).iterator();
}
public boolean hasNext()
@@ -191,7 +192,7 @@
index--;
- currentIter = linkedLists.get(index).listIterator();
+ currentIter = levels.get(index).iterator();
}
return currentIter.hasNext();
}
Modified: trunk/src/main/org/jboss/messaging/core/server/Queue.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/Queue.java 2009-06-24 04:49:11 UTC (rev 7449)
+++ trunk/src/main/org/jboss/messaging/core/server/Queue.java 2009-06-24 09:52:52 UTC (rev 7450)
@@ -22,6 +22,7 @@
package org.jboss.messaging.core.server;
+import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Executor;
@@ -151,4 +152,9 @@
void lockDelivery();
void unlockDelivery();
+
+ /**
+ * @return an immutable iterator which does not allow to remove references
+ */
+ Iterator<MessageReference> iterator();
}
Modified: trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java 2009-06-24 04:49:11 UTC (rev 7449)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java 2009-06-24 09:52:52 UTC (rev 7450)
@@ -497,6 +497,29 @@
return consumers;
}
+ public Iterator<MessageReference> iterator()
+ {
+ return new Iterator<MessageReference>()
+ {
+ private final Iterator<MessageReference> iterator = messageReferences.iterator();
+
+ public boolean hasNext()
+ {
+ return iterator.hasNext();
+ }
+
+ public MessageReference next()
+ {
+ return iterator.next();
+ }
+
+ public void remove()
+ {
+ throw new UnsupportedOperationException("iterator is immutable");
+ }
+ };
+ }
+
public synchronized List<MessageReference> list(final Filter filter)
{
if (filter == null)
Modified: trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java 2009-06-24 04:49:11 UTC (rev 7449)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java 2009-06-24 09:52:52 UTC (rev 7450)
@@ -51,7 +51,6 @@
import org.jboss.messaging.core.server.HandleStatus;
import org.jboss.messaging.core.server.LargeServerMessage;
import org.jboss.messaging.core.server.MessageReference;
-import org.jboss.messaging.core.server.MessagingServer;
import org.jboss.messaging.core.server.Queue;
import org.jboss.messaging.core.server.ServerConsumer;
import org.jboss.messaging.core.server.ServerMessage;
@@ -117,6 +116,8 @@
*/
private final boolean browseOnly;
+ private Runnable browserDeliverer;
+
private final boolean updateDeliveries;
private final StorageManager storageManager;
@@ -189,7 +190,14 @@
this.updateDeliveries = updateDeliveries;
- binding.getQueue().addConsumer(this);
+ if (browseOnly)
+ {
+ browserDeliverer = new BrowserDeliverer(messageQueue.iterator());
+ }
+ else
+ {
+ messageQueue.addConsumer(this);
+ }
}
// ServerConsumer implementation
@@ -219,8 +227,11 @@
largeMessageDeliverer.close();
}
- messageQueue.removeConsumer(this);
-
+ if (!browseOnly)
+ {
+ messageQueue.removeConsumer(this);
+ }
+
session.removeConsumer(this);
LinkedList<MessageReference> refs = cancelRefs(false, null);
@@ -519,7 +530,14 @@
}
else
{
- session.promptDelivery(messageQueue);
+ if (browseOnly)
+ {
+ executor.execute(browserDeliverer);
+ }
+ else
+ {
+ session.promptDelivery(messageQueue);
+ }
}
}
finally
@@ -739,8 +757,15 @@
{
if (largeMessageDeliverer == null || largeMessageDeliverer.deliver())
{
- // prompt Delivery only if chunk was finished
- session.promptDelivery(messageQueue);
+ if (browseOnly)
+ {
+ executor.execute(browserDeliverer);
+ }
+ else
+ {
+ // prompt Delivery only if chunk was finished
+ session.promptDelivery(messageQueue);
+ }
}
}
finally
@@ -989,4 +1014,59 @@
return chunk;
}
}
+
+ private class BrowserDeliverer implements Runnable
+ {
+ private MessageReference current = null;
+
+ public BrowserDeliverer(final Iterator<MessageReference> iterator)
+ {
+ this.iterator = iterator;
+ }
+
+ private final Iterator<MessageReference> iterator;
+
+ public void run()
+ {
+ // if the reference was busy during the previous iteration, handle it now
+ if (current != null)
+ {
+ try
+ {
+ HandleStatus status = handle(current);
+ if (status == HandleStatus.BUSY)
+ {
+ return;
+ }
+ }
+ catch (Exception e)
+ {
+ log.warn("Exception while browser handled from " + messageQueue + ": " + current);
+ return;
+ }
+ }
+
+ while (iterator.hasNext())
+ {
+ MessageReference ref = (MessageReference)iterator.next();
+ try
+ {
+ HandleStatus status = handle(ref);
+ if (status == HandleStatus.BUSY)
+ {
+ // keep a reference on the current message reference
+ // to handle it next time the browser deliverer is executed
+ current = ref;
+ break;
+ }
+ }
+ catch (Exception e)
+ {
+ log.warn("Exception while browser handled from " + messageQueue + ": " + ref);
+ break;
+ }
+ }
+ }
+
+ }
}
Added: trunk/src/main/org/jboss/messaging/utils/concurrent/BlockingDeque.java
===================================================================
--- trunk/src/main/org/jboss/messaging/utils/concurrent/BlockingDeque.java (rev 0)
+++ trunk/src/main/org/jboss/messaging/utils/concurrent/BlockingDeque.java 2009-06-24 09:52:52 UTC (rev 7450)
@@ -0,0 +1,238 @@
+/*
+ * Written by Doug Lea with assistance from members of JCP JSR-166
+ * Expert Group and released to the public domain, as explained at
+ * http://creativecommons.org/licenses/publicdomain
+ */
+
+package org.jboss.messaging.utils.concurrent; // XXX This belongs in java.util!!! XXX
+import java.util.concurrent.*; // XXX This import goes away XXX
+import java.util.*;
+
+/**
+ * A {@link Deque} that additionally supports operations that wait for
+ * the deque to become non-empty when retrieving an element, and wait
+ * for space to become available in the deque when storing an
+ * element. These methods are summarized in the following table:<p>
+ *
+ * <table BORDER CELLPADDING=3 CELLSPACING=1>
+ * <tr>
+ * <td></td>
+ * <td ALIGN=CENTER COLSPAN = 2> <b>First Element (Head)</b></td>
+ * <td ALIGN=CENTER COLSPAN = 2> <b>Last Element (Tail)</b></td>
+ * </tr>
+ * <tr>
+ * <td></td>
+ * <td ALIGN=CENTER><em>Block</em></td>
+ * <td ALIGN=CENTER><em>Time out</em></td>
+ * <td ALIGN=CENTER><em>Block</em></td>
+ * <td ALIGN=CENTER><em>Time out</em></td>
+ * </tr>
+ * <tr>
+ * <td><b>Insert</b></td>
+ * <td>{@link #putFirst putFirst(e)}</td>
+ * <td>{@link #offerFirst(Object, long, TimeUnit) offerFirst(e, time, unit)}</td>
+ * <td>{@link #putLast putLast(e)}</td>
+ * <td>{@link #offerLast(Object, long, TimeUnit) offerLast(e, time, unit)}</td>
+ * </tr>
+ * <tr>
+ * <td><b>Remove</b></td>
+ * <td>{@link #takeFirst takeFirst()}</td>
+ * <td>{@link #pollFirst(long, TimeUnit) pollFirst(time, unit)}</td>
+ * <td>{@link #takeLast takeLast()}</td>
+ * <td>{@link #pollLast(long, TimeUnit) pollLast(time, unit)}</td>
+ * </tr>
+ * </table>
+ *
+ * <p>Like any {@link BlockingQueue}, a <tt>BlockingDeque</tt> is
+ * thread safe and may (or may not) be capacity-constrained. A
+ * <tt>BlockingDeque</tt> implementation may be used directly as a
+ * FIFO <tt>BlockingQueue</tt>. The blocking methods inherited from
+ * the <tt>BlockingQueue</tt> interface are precisely equivalent to
+ * <tt>BlockingDeque</tt> methods as indicated in the following table:<p>
+ *
+ * <table BORDER CELLPADDING=3 CELLSPACING=1>
+ * <tr>
+ * <td ALIGN=CENTER> <b><tt>BlockingQueue</tt> Method</b></td>
+ * <td ALIGN=CENTER> <b>Equivalent <tt>BlockingDeque</tt> Method</b></td>
+ * </tr>
+ * <tr>
+ * <tr>
+ * <td>{@link java.util.concurrent.BlockingQueue#put put(e)}</td>
+ * <td>{@link #putLast putLast(e)}</td>
+ * </tr>
+ * <tr>
+ * <td>{@link java.util.concurrent.BlockingQueue#take take()}</td>
+ * <td>{@link #takeFirst takeFirst()}</td>
+ * </tr>
+ * <tr>
+ * <td>{@link java.util.concurrent.BlockingQueue#offer(Object, long, TimeUnit) offer(e, time. unit)}</td>
+ * <td>{@link #offerLast(Object, long, TimeUnit) offerLast(e, time, unit)}</td>
+ * </tr>
+ * <tr>
+ * <td>{@link java.util.concurrent.BlockingQueue#poll(long, TimeUnit) poll(time, unit)}</td>
+ * <td>{@link #pollFirst(long, TimeUnit) pollFirst(time, unit)}</td>
+ * </tr>
+ * </table>
+ *
+ *
+ * <p>This interface is a member of the
+ * <a href="{@docRoot}/../guide/collections/index.html">
+ * Java Collections Framework</a>.
+ *
+ * @since 1.6
+ * @author Doug Lea
+ * @param <E> the type of elements held in this collection
+ */
+public interface BlockingDeque<E> extends Deque<E>, BlockingQueue<E> {
+
+ /**
+ * Adds the specified element as the first element of this deque,
+ * waiting if necessary for space to become available.
+ * @param o the element to add
+ * @throws InterruptedException if interrupted while waiting.
+ * @throws NullPointerException if the specified element is <tt>null</tt>.
+ */
+ void putFirst(E o) throws InterruptedException;
+
+ /**
+ * Adds the specified element as the last element of this deque,
+ * waiting if necessary for space to become available.
+ * @param o the element to add
+ * @throws InterruptedException if interrupted while waiting.
+ * @throws NullPointerException if the specified element is <tt>null</tt>.
+ */
+ void putLast(E o) throws InterruptedException;
+
+ /**
+ * Retrieves and removes the first element of this deque, waiting
+ * if no elements are present on this deque.
+ * @return the head of this deque
+ * @throws InterruptedException if interrupted while waiting.
+ */
+ E takeFirst() throws InterruptedException;
+
+ /**
+ * Retrieves and removes the last element of this deque, waiting
+ * if no elements are present on this deque.
+ * @return the head of this deque
+ * @throws InterruptedException if interrupted while waiting.
+ */
+ E takeLast() throws InterruptedException;
+
+ /**
+ * Inserts the specified element as the first element of this deque,
+ * waiting if necessary up to the specified wait time for space to
+ * become available.
+ * @param o the element to add
+ * @param timeout how long to wait before giving up, in units of
+ * <tt>unit</tt>
+ * @param unit a <tt>TimeUnit</tt> determining how to interpret the
+ * <tt>timeout</tt> parameter
+ * @return <tt>true</tt> if successful, or <tt>false</tt> if
+ * the specified waiting time elapses before space is available.
+ * @throws InterruptedException if interrupted while waiting.
+ * @throws NullPointerException if the specified element is <tt>null</tt>.
+ */
+ boolean offerFirst(E o, long timeout, TimeUnit unit)
+ throws InterruptedException;
+
+ /**
+ * Inserts the specified element as the last element of this deque,
+ * waiting if necessary up to the specified wait time for space to
+ * become available.
+ * @param o the element to add
+ * @param timeout how long to wait before giving up, in units of
+ * <tt>unit</tt>
+ * @param unit a <tt>TimeUnit</tt> determining how to interpret the
+ * <tt>timeout</tt> parameter
+ * @return <tt>true</tt> if successful, or <tt>false</tt> if
+ * the specified waiting time elapses before space is available.
+ * @throws InterruptedException if interrupted while waiting.
+ * @throws NullPointerException if the specified element is <tt>null</tt>.
+ */
+ boolean offerLast(E o, long timeout, TimeUnit unit)
+ throws InterruptedException;
+
+
+ /**
+ * Retrieves and removes the first element of this deque, waiting
+ * if necessary up to the specified wait time if no elements are
+ * present on this deque.
+ * @param timeout how long to wait before giving up, in units of
+ * <tt>unit</tt>
+ * @param unit a <tt>TimeUnit</tt> determining how to interpret the
+ * <tt>timeout</tt> parameter
+ * @return the head of this deque, or <tt>null</tt> if the
+ * specified waiting time elapses before an element is present.
+ * @throws InterruptedException if interrupted while waiting.
+ */
+ E pollFirst(long timeout, TimeUnit unit)
+ throws InterruptedException;
+
+ /**
+ * Retrieves and removes the last element of this deque, waiting
+ * if necessary up to the specified wait time if no elements are
+ * present on this deque.
+ * @param timeout how long to wait before giving up, in units of
+ * <tt>unit</tt>
+ * @param unit a <tt>TimeUnit</tt> determining how to interpret the
+ * <tt>timeout</tt> parameter
+ * @return the head of this deque, or <tt>null</tt> if the
+ * specified waiting time elapses before an element is present.
+ * @throws InterruptedException if interrupted while waiting.
+ */
+ E pollLast(long timeout, TimeUnit unit)
+ throws InterruptedException;
+
+ /**
+ * Adds the specified element as the last element of this deque,
+ * waiting if necessary for space to become available. This
+ * method is equivalent to to putLast
+ * @param o the element to add
+ * @throws InterruptedException if interrupted while waiting.
+ * @throws NullPointerException if the specified element is <tt>null</tt>.
+ */
+ void put(E o) throws InterruptedException;
+
+ /**
+ * Inserts the specified element as the lest element of this
+ * deque, if possible. When using deques that may impose
+ * insertion restrictions (for example capacity bounds), method
+ * <tt>offer</tt> is generally preferable to method {@link
+ * Collection#add}, which can fail to insert an element only by
+ * throwing an exception. This method is equivalent to to
+ * offerLast
+ *
+ * @param o the element to add.
+ * @return <tt>true</tt> if it was possible to add the element to
+ * this deque, else <tt>false</tt>
+ * @throws NullPointerException if the specified element is <tt>null</tt>
+ */
+ boolean offer(E o, long timeout, TimeUnit unit)
+ throws InterruptedException;
+
+ /**
+ * Retrieves and removes the first element of this deque, waiting
+ * if no elements are present on this deque.
+ * This method is equivalent to to takeFirst
+ * @return the head of this deque
+ * @throws InterruptedException if interrupted while waiting.
+ */
+ E take() throws InterruptedException;
+
+ /**
+ * Retrieves and removes the first element of this deque, waiting
+ * if necessary up to the specified wait time if no elements are
+ * present on this deque. This method is equivalent to to
+ * pollFirst
+ * @param timeout how long to wait before giving up, in units of
+ * <tt>unit</tt>
+ * @param unit a <tt>TimeUnit</tt> determining how to interpret the
+ * <tt>timeout</tt> parameter
+ * @return the head of this deque, or <tt>null</tt> if the
+ * specified waiting time elapses before an element is present.
+ * @throws InterruptedException if interrupted while waiting.
+ */
+ E poll(long timeout, TimeUnit unit)
+ throws InterruptedException;
+}
Added: trunk/src/main/org/jboss/messaging/utils/concurrent/Deque.java
===================================================================
--- trunk/src/main/org/jboss/messaging/utils/concurrent/Deque.java (rev 0)
+++ trunk/src/main/org/jboss/messaging/utils/concurrent/Deque.java 2009-06-24 09:52:52 UTC (rev 7450)
@@ -0,0 +1,442 @@
+/*
+ * Written by Doug Lea and Josh Bloch with assistance from members of
+ * JCP JSR-166 Expert Group and released to the public domain, as explained
+ * at http://creativecommons.org/licenses/publicdomain
+ */
+
+package org.jboss.messaging.utils.concurrent; // XXX This belongs in java.util!!! XXX
+import java.util.*; // XXX This import goes away XXX
+
+/**
+ * A linear collection that supports element insertion and removal at
+ * both ends. The name <i>deque</i> is short for "double ended queue"
+ * and is usually pronounced "deck". Most <tt>Deque</tt>
+ * implementations place no fixed limits on the number of elements
+ * they may contain, but this interface supports capacity-restricted
+ * deques as well as those with no fixed size limit.
+ *
+ * <p>This interface defines methods to access the elements at both
+ * ends of the deque. Methods are provided to insert, remove, and
+ * examine the element. Each of these methods exists in two forms:
+ * one throws an exception if the operation fails, the other returns a
+ * special value (either <tt>null</tt> or <tt>false</tt>, depending on
+ * the operation). The latter form of the insert operation is
+ * designed specifically for use with capacity-restricted
+ * <tt>Deque</tt> implementations; in most implementations, insert
+ * operations cannot fail.
+ *
+ * <p>The twelve methods described above are are summarized in the
+ * follwoing table:<p>
+ *
+ * <table BORDER CELLPADDING=3 CELLSPACING=1>
+ * <tr>
+ * <td></td>
+ * <td ALIGN=CENTER COLSPAN = 2> <b>First Element (Head)</b></td>
+ * <td ALIGN=CENTER COLSPAN = 2> <b>Last Element (Tail)</b></td>
+ * </tr>
+ * <tr>
+ * <td></td>
+ * <td ALIGN=CENTER><em>Throws exception</em></td>
+ * <td ALIGN=CENTER><em>Returns special value</em></td>
+ * <td ALIGN=CENTER><em>Throws exception</em></td>
+ * <td ALIGN=CENTER><em>Returns special value</em></td>
+ * </tr>
+ * <tr>
+ * <td><b>Insert</b></td>
+ * <td>{@link #addFirst addFirst(e)}</td>
+ * <td>{@link #offerFirst offerFirst(e)}</td>
+ * <td>{@link #addLast addLast(e)}</td>
+ * <td>{@link #offerLast offerLast(e)}</td>
+ * </tr>
+ * <tr>
+ * <td><b>Remove</b></td>
+ * <td>{@link #removeFirst removeFirst()}</td>
+ * <td>{@link #pollFirst pollFirst()}</td>
+ * <td>{@link #removeLast removeLast()}</td>
+ * <td>{@link #pollLast pollLast()}</td>
+ * </tr>
+ * <tr>
+ * <td><b>Examine</b></td>
+ * <td>{@link #getFirst getFirst()}</td>
+ * <td>{@link #peekFirst peekFirst()}</td>
+ * <td>{@link #getLast getLast()}</td>
+ * <td>{@link #peekLast peekLast()}</td>
+ * </tr>
+ * </table>
+ *
+ * <p>This interface extends the {@link Queue} interface. When a deque is
+ * used as a queue, FIFO (First-In-First-Out) behavior results. Elements are
+ * added to the end of the deque and removed from the beginning. The methods
+ * inherited from the <tt>Queue</tt> interface are precisely equivalent to
+ * <tt>Deque</tt> methods as indicated in the following table:<p>
+ *
+ * <table BORDER CELLPADDING=3 CELLSPACING=1>
+ * <tr>
+ * <td ALIGN=CENTER> <b><tt>Queue</tt> Method</b></td>
+ * <td ALIGN=CENTER> <b>Equivalent <tt>Deque</tt> Method</b></td>
+ * </tr>
+ * <tr>
+ * <tr>
+ * <td>{@link java.util.Queue#offer offer(e)}</td>
+ * <td>{@link #offerLast offerLast(e)}</td>
+ * </tr>
+ * <tr>
+ * <td>{@link java.util.Queue#add add(e)}</td>
+ * <td>{@link #addLast addLast(e)}</td>
+ * </tr>
+ * <tr>
+ * <td>{@link java.util.Queue#poll poll()}</td>
+ * <td>{@link #pollFirst pollFirst()}</td>
+ * </tr>
+ * <tr>
+ * <td>{@link java.util.Queue#remove remove()}</td>
+ * <td>{@link #removeFirst removeFirst()}</td>
+ * </tr>
+ * <tr>
+ * <td>{@link java.util.Queue#peek peek()}</td>
+ * <td>{@link #peek peekFirst()}</td>
+ * </tr>
+ * <tr>
+ * <td>{@link java.util.Queue#element element()}</td>
+ * <td>{@link #getFirst getFirst()}</td>
+ * </tr>
+ * </table>
+ *
+ * <p>Deques can also be used as LIFO (Last-In-First-Out) stacks. This
+ * interface should be used in preference to the legacy {@link Stack} class.
+ * When a dequeue is used as a stack, elements are pushed and popped from the
+ * beginning of the deque. Stack methods are precisely equivalent to
+ * <tt>Deque</tt> methods as indicated in the table below:<p>
+ *
+ * <table BORDER CELLPADDING=3 CELLSPACING=1>
+ * <tr>
+ * <td ALIGN=CENTER> <b>Stack Method</b></td>
+ * <td ALIGN=CENTER> <b>Equivalent <tt>Deque</tt> Method</b></td>
+ * </tr>
+ * <tr>
+ * <tr>
+ * <td>{@link #push push(e)}</td>
+ * <td>{@link #addFirst addFirst(e)}</td>
+ * </tr>
+ * <tr>
+ * <td>{@link #pop pop()}</td>
+ * <td>{@link #removeFirst removeFirst()}</td>
+ * </tr>
+ * <tr>
+ * <td>{@link #peek peek()}</td>
+ * <td>{@link #peekFirst peekFirst()}</td>
+ * </tr>
+ * </table>
+ *
+ * <p>Note that the {@link #peek peek} method works equally well when
+ * a deque is used as a queue or a stack; in either case, elements are
+ * drawn from the beginning of the deque.
+ *
+ * <p>This inteface provides two methods to to remove interior
+ * elements, {@link #removeFirstOccurrence removeFirstOccurrence} and
+ * {@link #removeLastOccurrence removeLastOccurrence}. Unlike the
+ * {@link List} interface, this interface does not provide support for
+ * indexed access to elements.
+ *
+ * <p>While <tt>Deque</tt> implementations are not strictly required
+ * to prohibit the insertion of null elements, they are strongly
+ * encouraged to do so. Users of any <tt>Deque</tt> implementations
+ * that do allow null elements are strongly encouraged <i>not</i> to
+ * take advantage of the ability to insert nulls. This is so because
+ * <tt>null</tt> is used as a special return value by various methods
+ * to indicated that the deque is empty.
+ *
+ * <p><tt>Deque</tt> implementations generally do not define
+ * element-based versions of the <tt>equals</tt> and <tt>hashCode</tt>
+ * methods, but instead inherit the identity-based versions from class
+ * <tt>Object</tt>.
+ *
+ * <p>This interface is a member of the <a
+ * href="{@docRoot}/../guide/collections/index.html"> Java Collections
+ * Framework</a>.
+ *
+ * @author Doug Lea
+ * @author Josh Bloch
+ * @since 1.6
+ * @param <E> the type of elements held in this collection
+ */
+public interface Deque<E> extends Queue<E> {
+ /**
+ * Inserts the specified element to the front this deque unless it would
+ * violate capacity restrictions. When using a capacity-restricted deque,
+ * this method is generally preferable to method <tt>addFirst</tt>, which
+ * can fail to insert an element only by throwing an exception.
+ *
+ * @param e the element to insert
+ * @return <tt>true</tt> if it was possible to insert the element,
+ * else <tt>false</tt>
+ * @throws NullPointerException if <tt>e</tt> is null and this
+ * deque does not permit null elements
+ */
+ boolean offerFirst(E e);
+
+ /**
+ * Inserts the specified element to the end of this deque unless it would
+ * violate capacity restrictions. When using a capacity-restricted deque,
+ * this method is generally preferable to method <tt>addLast</tt> which
+ * can fail to insert an element only by throwing an exception.
+ *
+ * @param e the element to insert
+ * @return <tt>true</tt> if it was possible to insert the element,
+ * else <tt>false</tt>
+ * @throws NullPointerException if <tt>e</tt> is null and this
+ * deque does not permit null elements
+ */
+ boolean offerLast(E e);
+
+ /**
+ * Inserts the specified element to the front of this deque unless it
+ * would violate capacity restrictions.
+ *
+ * @param e the element to insert
+ * @throws IllegalStateException if it was not possible to insert
+ * the element due to capacity restrictions
+ * @throws NullPointerException if <tt>e</tt> is null and this
+ * deque does not permit null elements
+ */
+ void addFirst(E e);
+
+ /**
+ * Inserts the specified element to the end of this deque unless it would
+ * violate capacity restrictions.
+ *
+ * @param e the element to insert
+ * @throws IllegalStateException if it was not possible to insert
+ * the element due to capacity restrictions
+ * @throws NullPointerException if <tt>e</tt> is null and this
+ * deque does not permit null elements
+ */
+ void addLast(E e);
+
+ /**
+ * Retrieves and removes the first element of this deque, or
+ * <tt>null</tt> if this deque is empty.
+ *
+ * @return the first element of this deque, or <tt>null</tt> if
+ * this deque is empty
+ */
+ E pollFirst();
+
+ /**
+ * Retrieves and removes the last element of this deque, or
+ * <tt>null</tt> if this deque is empty.
+ *
+ * @return the last element of this deque, or <tt>null</tt> if
+ * this deque is empty
+ */
+ E pollLast();
+
+ /**
+ * Removes and returns the first element of this deque. This method
+ * differs from the <tt>pollFirst</tt> method only in that it throws an
+ * exception if this deque is empty.
+ *
+ * @return the first element of this deque
+ * @throws NoSuchElementException if this deque is empty
+ */
+ E removeFirst();
+
+ /**
+ * Retrieves and removes the last element of this deque. This method
+ * differs from the <tt>pollLast</tt> method only in that it throws an
+ * exception if this deque is empty.
+ *
+ * @return the last element of this deque
+ * @throws NoSuchElementException if this deque is empty
+ */
+ E removeLast();
+
+ /**
+ * Retrieves, but does not remove, the first element of this deque,
+ * returning <tt>null</tt> if this deque is empty.
+ *
+ * @return the first element of this deque, or <tt>null</tt> if
+ * this deque is empty
+ */
+ E peekFirst();
+
+ /**
+ * Retrieves, but does not remove, the last element of this deque,
+ * returning <tt>null</tt> if this deque is empty.
+ *
+ * @return the last element of this deque, or <tt>null</tt> if this deque
+ * is empty
+ */
+ E peekLast();
+
+ /**
+ * Retrieves, but does not remove, the first element of this
+ * deque. This method differs from the <tt>peek</tt> method only
+ * in that it throws an exception if this deque is empty.
+ *
+ * @return the first element of this deque
+ * @throws NoSuchElementException if this deque is empty
+ */
+ E getFirst();
+
+ /**
+ * Retrieves, but does not remove, the last element of this
+ * deque. This method differs from the <tt>peek</tt> method only
+ * in that it throws an exception if this deque is empty.
+ *
+ * @return the last element of this deque
+ * @throws NoSuchElementException if this deque is empty
+ */
+ E getLast();
+
+ /**
+ * Removes the first occurrence of the specified element in this
+ * deque. If the deque does not contain the element, it is
+ * unchanged. More formally, removes the first element <tt>e</tt>
+ * such that <tt>(o==null ? e==null : o.equals(e))</tt> (if
+ * such an element exists).
+ *
+ * @param e element to be removed from this deque, if present
+ * @return <tt>true</tt> if the deque contained the specified element
+ * @throws NullPointerException if the specified element is <tt>null</tt>
+ */
+ boolean removeFirstOccurrence(Object e);
+
+ /**
+ * Removes the last occurrence of the specified element in this
+ * deque. If the deque does not contain the element, it is
+ * unchanged. More formally, removes the last element <tt>e</tt>
+ * such that <tt>(o==null ? e==null : o.equals(e))</tt> (if
+ * such an element exists).
+ *
+ * @param e element to be removed from this deque, if present
+ * @return <tt>true</tt> if the deque contained the specified element
+ * @throws NullPointerException if the specified element is <tt>null</tt>
+ */
+ boolean removeLastOccurrence(Object e);
+
+
+ // *** Queue methods ***
+
+ /**
+ * Inserts the specified element into the queue represented by this deque
+ * unless it would violate capacity restrictions. In other words, inserts
+ * the specified element to the end of this deque. When using a
+ * capacity-restricted deque, this method is generally preferable to the
+ * {@link #add} method, which can fail to insert an element only by
+ * throwing an exception.
+ *
+ * <p>This method is equivalent to {@link #offerLast}.
+ *
+ * @param e the element to insert
+ * @return <tt>true</tt> if it was possible to insert the element,
+ * else <tt>false</tt>
+ * @throws NullPointerException if <tt>e</tt> is null and this
+ * deque does not permit null elements
+ */
+ boolean offer(E e);
+
+ /**
+ * Inserts the specified element into the queue represented by this
+ * deque unless it would violate capacity restrictions. In other words,
+ * inserts the specified element as the last element of this deque.
+ *
+ * <p>This method is equivalent to {@link #addLast}.
+ *
+ * @param e the element to insert
+ * @return <tt>true</tt> (as per the spec for {@link Collection#add})
+ * @throws IllegalStateException if it was not possible to insert
+ * the element due to capacity restrictions
+ * @throws NullPointerException if <tt>e</tt> is null and this
+ * deque does not permit null elements
+ */
+ boolean add(E e);
+
+ /**
+ * Retrieves and removes the head of the queue represented by
+ * this deque, or <tt>null</tt> if this deque is empty. In other words,
+ * retrieves and removes the first element of this deque, or <tt>null</tt>
+ * if this deque is empty.
+ *
+ * <p>This method is equivalent to {@link #pollFirst()}.
+ *
+ * @return the first element of this deque, or <tt>null</tt> if
+ * this deque is empty
+ */
+ E poll();
+
+ /**
+ * Retrieves and removes the head of the queue represented by this deque.
+ * This method differs from the <tt>poll</tt> method only in that it
+ * throws an exception if this deque is empty.
+ *
+ * <p>This method is equivalent to {@link #removeFirst()}.
+ *
+ * @return the head of the queue represented by this deque
+ * @throws NoSuchElementException if this deque is empty
+ */
+ E remove();
+
+ /**
+ * Retrieves, but does not remove, the head of the queue represented by
+ * this deque, returning <tt>null</tt> if this deque is empty.
+ *
+ * <p>This method is equivalent to {@link #peekFirst()}
+ *
+ * @return the head of the queue represented by this deque, or
+ * <tt>null</tt> if this deque is empty
+ */
+ E peek();
+
+ /**
+ * Retrieves, but does not remove, the head of the queue represented by
+ * this deque. This method differs from the <tt>peek</tt> method only in
+ * that it throws an exception if this deque is empty.
+ *
+ * <p>This method is equivalent to {@link #getFirst()}
+ *
+ * @return the head of the queue represented by this deque
+ * @throws NoSuchElementException if this deque is empty
+ */
+ E element();
+
+
+ // *** Stack methods ***
+
+ /**
+ * Pushes an element onto the stack represented by this deque. In other
+ * words, inserts the element to the front this deque unless it would
+ * violate capacity restrictions.
+ *
+ * <p>This method is equivalent to {@link #addFirst}.
+ *
+ * @throws IllegalStateException if it was not possible to insert
+ * the element due to capacity restrictions
+ * @throws NullPointerException if <tt>e</tt> is null and this
+ * deque does not permit null elements
+ */
+ void push(E e);
+
+ /**
+ * Pops an element from the stack represented by this deque. In other
+ * words, removes and returns the the first element of this deque.
+ *
+ * <p>This method is equivalent to {@link #removeFirst()}.
+ *
+ * @return the element at the front of this deque (which is the top
+ * of the stack represented by this deque)
+ * @throws NoSuchElementException if this deque is empty
+ */
+ E pop();
+
+
+ // *** Collection Method ***
+
+ /**
+ * Returns an iterator over the elements in this deque. The elements
+ * will be ordered from first (head) to last (tail).
+ *
+ * @return an <tt>Iterator</tt> over the elements in this deque
+ */
+ Iterator<E> iterator();
+}
Added: trunk/src/main/org/jboss/messaging/utils/concurrent/LinkedBlockingDeque.java
===================================================================
--- trunk/src/main/org/jboss/messaging/utils/concurrent/LinkedBlockingDeque.java (rev 0)
+++ trunk/src/main/org/jboss/messaging/utils/concurrent/LinkedBlockingDeque.java 2009-06-24 09:52:52 UTC (rev 7450)
@@ -0,0 +1,762 @@
+/*
+ * Written by Doug Lea with assistance from members of JCP JSR-166
+ * Expert Group and released to the public domain, as explained at
+ * http://creativecommons.org/licenses/publicdomain
+ */
+
+package org.jboss.messaging.utils.concurrent;
+
+import java.util.*;
+import java.util.concurrent.*;
+import java.util.concurrent.locks.*;
+
+/**
+ * An optionally-bounded {@linkplain BlockingDeque blocking deque} based on
+ * linked nodes.
+ *
+ * <p> The optional capacity bound constructor argument serves as a
+ * way to prevent excessive expansion. The capacity, if unspecified,
+ * is equal to {@link Integer#MAX_VALUE}. Linked nodes are
+ * dynamically created upon each insertion unless this would bring the
+ * deque above capacity.
+ *
+ * <p>Most operations run in constant time (ignoring time spent
+ * blocking). Exceptions include {@link #remove(Object) remove},
+ * {@link #removeFirstOccurrence removeFirstOccurrence}, {@link
+ * #removeLastOccurrence removeLastOccurrence}, {@link #contains
+ * contains }, {@link #iterator iterator.remove()}, and the bulk
+ * operations, all of which run in linear time.
+ *
+ * <p>This class and its iterator implement all of the
+ * <em>optional</em> methods of the {@link Collection} and {@link
+ * Iterator} interfaces. This class is a member of the <a
+ * href="{@docRoot}/../guide/collections/index.html"> Java Collections
+ * Framework</a>.
+ *
+ * @since 1.6
+ * @author Doug Lea
+ * @param <E> the type of elements held in this collection
+ */
+public class LinkedBlockingDeque<E>
+ extends AbstractQueue<E>
+ implements BlockingDeque<E>, java.io.Serializable {
+
+ /*
+ * Implemented as a simple doubly-linked list protected by a
+ * single lock and using conditions to manage blocking.
+ */
+
+ private static final long serialVersionUID = -387911632671998426L;
+
+ /** Doubly-linked list node class */
+ static final class Node<E> {
+ E item;
+ Node<E> prev;
+ Node<E> next;
+ Node(E x, Node<E> p, Node<E> n) {
+ item = x;
+ prev = p;
+ next = n;
+ }
+ }
+
+ /** Pointer to first node */
+ private transient Node<E> first;
+ /** Pointer to last node */
+ private transient Node<E> last;
+ /** Number of items in the deque */
+ private transient int count;
+ /** Maximum number of items in the deque */
+ private final int capacity;
+ /** Main lock guarding all access */
+ private final ReentrantLock lock = new ReentrantLock();
+ /** Condition for waiting takes */
+ private final Condition notEmpty = lock.newCondition();
+ /** Condition for waiting puts */
+ private final Condition notFull = lock.newCondition();
+
+ /**
+ * Creates a <tt>LinkedBlockingDeque</tt> with a capacity of
+ * {@link Integer#MAX_VALUE}.
+ */
+ public LinkedBlockingDeque() {
+ this(Integer.MAX_VALUE);
+ }
+
+ /**
+ * Creates a <tt>LinkedBlockingDeque</tt> with the given (fixed)
+ * capacity.
+ * @param capacity the capacity of this deque
+ * @throws IllegalArgumentException if <tt>capacity</tt> is less than 1
+ */
+ public LinkedBlockingDeque(int capacity) {
+ if (capacity <= 0) throw new IllegalArgumentException();
+ this.capacity = capacity;
+ }
+
+ /**
+ * Creates a <tt>LinkedBlockingDeque</tt> with a capacity of
+ * {@link Integer#MAX_VALUE}, initially containing the elements of the
+ * given collection,
+ * added in traversal order of the collection's iterator.
+ * @param c the collection of elements to initially contain
+ * @throws NullPointerException if <tt>c</tt> or any element within it
+ * is <tt>null</tt>
+ */
+ public LinkedBlockingDeque(Collection<? extends E> c) {
+ this(Integer.MAX_VALUE);
+ for (E e : c)
+ add(e);
+ }
+
+
+ // Basic linking and unlinking operations, called only while holding lock
+
+ /**
+ * Link e as first element, or return false if full
+ */
+ private boolean linkFirst(E e) {
+ if (count >= capacity)
+ return false;
+ ++count;
+ Node<E> f = first;
+ Node<E> x = new Node<E>(e, null, f);
+ first = x;
+ if (last == null)
+ last = x;
+ else
+ f.prev = x;
+ notEmpty.signal();
+ return true;
+ }
+
+ /**
+ * Link e as last element, or return false if full
+ */
+ private boolean linkLast(E e) {
+ if (count >= capacity)
+ return false;
+ ++count;
+ Node<E> l = last;
+ Node<E> x = new Node<E>(e, l, null);
+ last = x;
+ if (first == null)
+ first = x;
+ else
+ l.next = x;
+ notEmpty.signal();
+ return true;
+ }
+
+ /**
+ * Remove and return first element, or null if empty
+ */
+ private E unlinkFirst() {
+ Node<E> f = first;
+ if (f == null)
+ return null;
+ Node<E> n = f.next;
+ first = n;
+ if (n == null)
+ last = null;
+ else
+ n.prev = null;
+ --count;
+ notFull.signal();
+ return f.item;
+ }
+
+ /**
+ * Remove and return last element, or null if empty
+ */
+ private E unlinkLast() {
+ Node<E> l = last;
+ if (l == null)
+ return null;
+ Node<E> p = l.prev;
+ last = p;
+ if (p == null)
+ first = null;
+ else
+ p.next = null;
+ --count;
+ notFull.signal();
+ return l.item;
+ }
+
+ /**
+ * Unlink e
+ */
+ private void unlink(Node<E> x) {
+ Node<E> p = x.prev;
+ Node<E> n = x.next;
+ if (p == null) {
+ if (n == null)
+ first = last = null;
+ else {
+ n.prev = null;
+ first = n;
+ }
+ } else if (n == null) {
+ p.next = null;
+ last = p;
+ } else {
+ p.next = n;
+ n.prev = p;
+ }
+ --count;
+ notFull.signalAll();
+ }
+
+ // Deque methods
+
+ public boolean offerFirst(E o) {
+ if (o == null) throw new NullPointerException();
+ lock.lock();
+ try {
+ return linkFirst(o);
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ public boolean offerLast(E o) {
+ if (o == null) throw new NullPointerException();
+ lock.lock();
+ try {
+ return linkLast(o);
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ public void addFirst(E e) {
+ if (!offerFirst(e))
+ throw new IllegalStateException("Deque full");
+ }
+
+ public void addLast(E e) {
+ if (!offerLast(e))
+ throw new IllegalStateException("Deque full");
+ }
+
+ public E pollFirst() {
+ lock.lock();
+ try {
+ return unlinkFirst();
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ public E pollLast() {
+ lock.lock();
+ try {
+ return unlinkLast();
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ public E removeFirst() {
+ E x = pollFirst();
+ if (x == null) throw new NoSuchElementException();
+ return x;
+ }
+
+ public E removeLast() {
+ E x = pollLast();
+ if (x == null) throw new NoSuchElementException();
+ return x;
+ }
+
+ public E peekFirst() {
+ lock.lock();
+ try {
+ return (first == null) ? null : first.item;
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ public E peekLast() {
+ lock.lock();
+ try {
+ return (last == null) ? null : last.item;
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ public E getFirst() {
+ E x = peekFirst();
+ if (x == null) throw new NoSuchElementException();
+ return x;
+ }
+
+ public E getLast() {
+ E x = peekLast();
+ if (x == null) throw new NoSuchElementException();
+ return x;
+ }
+
+ // BlockingDeque methods
+
+ public void putFirst(E o) throws InterruptedException {
+ if (o == null) throw new NullPointerException();
+ lock.lock();
+ try {
+ while (!linkFirst(o))
+ notFull.await();
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ public void putLast(E o) throws InterruptedException {
+ if (o == null) throw new NullPointerException();
+ lock.lock();
+ try {
+ while (!linkLast(o))
+ notFull.await();
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ public E takeFirst() throws InterruptedException {
+ lock.lock();
+ try {
+ E x;
+ while ( (x = unlinkFirst()) == null)
+ notEmpty.await();
+ return x;
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ public E takeLast() throws InterruptedException {
+ lock.lock();
+ try {
+ E x;
+ while ( (x = unlinkLast()) == null)
+ notEmpty.await();
+ return x;
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ public boolean offerFirst(E o, long timeout, TimeUnit unit)
+ throws InterruptedException {
+ if (o == null) throw new NullPointerException();
+ lock.lockInterruptibly();
+ try {
+ long nanos = unit.toNanos(timeout);
+ for (;;) {
+ if (linkFirst(o))
+ return true;
+ if (nanos <= 0)
+ return false;
+ nanos = notFull.awaitNanos(nanos);
+ }
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ public boolean offerLast(E o, long timeout, TimeUnit unit)
+ throws InterruptedException {
+ if (o == null) throw new NullPointerException();
+ lock.lockInterruptibly();
+ try {
+ long nanos = unit.toNanos(timeout);
+ for (;;) {
+ if (linkLast(o))
+ return true;
+ if (nanos <= 0)
+ return false;
+ nanos = notFull.awaitNanos(nanos);
+ }
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ public E pollFirst(long timeout, TimeUnit unit)
+ throws InterruptedException {
+ lock.lockInterruptibly();
+ try {
+ long nanos = unit.toNanos(timeout);
+ for (;;) {
+ E x = unlinkFirst();
+ if (x != null)
+ return x;
+ if (nanos <= 0)
+ return null;
+ nanos = notEmpty.awaitNanos(nanos);
+ }
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ public E pollLast(long timeout, TimeUnit unit)
+ throws InterruptedException {
+ lock.lockInterruptibly();
+ try {
+ long nanos = unit.toNanos(timeout);
+ for (;;) {
+ E x = unlinkLast();
+ if (x != null)
+ return x;
+ if (nanos <= 0)
+ return null;
+ nanos = notEmpty.awaitNanos(nanos);
+ }
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ // Queue and stack methods
+
+ public boolean offer(E e) { return offerLast(e); }
+ public boolean add(E e) { addLast(e); return true; }
+ public void push(E e) { addFirst(e); }
+ public E poll() { return pollFirst(); }
+ public E remove() { return removeFirst(); }
+ public E pop() { return removeFirst(); }
+ public E peek() { return peekFirst(); }
+ public E element() { return getFirst(); }
+ public boolean remove(Object o) { return removeFirstOccurrence(o); }
+
+ // BlockingQueue methods
+
+ public void put(E o) throws InterruptedException { putLast(o); }
+ public E take() throws InterruptedException { return takeFirst(); }
+ public boolean offer(E o, long timeout, TimeUnit unit)
+ throws InterruptedException { return offerLast(o, timeout, unit); }
+ public E poll(long timeout, TimeUnit unit)
+ throws InterruptedException { return pollFirst(timeout, unit); }
+
+ /**
+ * Returns the number of elements in this deque.
+ *
+ * @return the number of elements in this deque.
+ */
+ public int size() {
+ lock.lock();
+ try {
+ return count;
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ /**
+ * Returns the number of elements that this deque can ideally (in
+ * the absence of memory or resource constraints) accept without
+ * blocking. This is always equal to the initial capacity of this deque
+ * less the current <tt>size</tt> of this deque.
+ * <p>Note that you <em>cannot</em> always tell if
+ * an attempt to <tt>add</tt> an element will succeed by
+ * inspecting <tt>remainingCapacity</tt> because it may be the
+ * case that a waiting consumer is ready to <tt>take</tt> an
+ * element out of an otherwise full deque.
+ */
+ public int remainingCapacity() {
+ lock.lock();
+ try {
+ return capacity - count;
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ public boolean contains(Object o) {
+ if (o == null) return false;
+ lock.lock();
+ try {
+ for (Node<E> p = first; p != null; p = p.next)
+ if (o.equals(p.item))
+ return true;
+ return false;
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ public boolean removeFirstOccurrence(Object e) {
+ if (e == null) throw new NullPointerException();
+ lock.lock();
+ try {
+ for (Node<E> p = first; p != null; p = p.next) {
+ if (e.equals(p.item)) {
+ unlink(p);
+ return true;
+ }
+ }
+ return false;
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ public boolean removeLastOccurrence(Object e) {
+ if (e == null) throw new NullPointerException();
+ lock.lock();
+ try {
+ for (Node<E> p = last; p != null; p = p.prev) {
+ if (e.equals(p.item)) {
+ unlink(p);
+ return true;
+ }
+ }
+ return false;
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ /**
+ * Variant of removeFirstOccurrence needed by iterator.remove.
+ * Searches for the node, not its contents.
+ */
+ boolean removeNode(Node<E> e) {
+ lock.lock();
+ try {
+ for (Node<E> p = first; p != null; p = p.next) {
+ if (p == e) {
+ unlink(p);
+ return true;
+ }
+ }
+ return false;
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ public Object[] toArray() {
+ lock.lock();
+ try {
+ Object[] a = new Object[count];
+ int k = 0;
+ for (Node<E> p = first; p != null; p = p.next)
+ a[k++] = p.item;
+ return a;
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ public <T> T[] toArray(T[] a) {
+ lock.lock();
+ try {
+ if (a.length < count)
+ a = (T[])java.lang.reflect.Array.newInstance(
+ a.getClass().getComponentType(),
+ count
+ );
+
+ int k = 0;
+ for (Node<E> p = first; p != null; p = p.next)
+ a[k++] = (T)p.item;
+ if (a.length > k)
+ a[k] = null;
+ return a;
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ public String toString() {
+ lock.lock();
+ try {
+ return super.toString();
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ /**
+ * Atomically removes all of the elements from this deque.
+ * The deque will be empty after this call returns.
+ */
+ public void clear() {
+ lock.lock();
+ try {
+ first = last = null;
+ count = 0;
+ notFull.signalAll();
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ public int drainTo(Collection<? super E> c) {
+ if (c == null)
+ throw new NullPointerException();
+ if (c == this)
+ throw new IllegalArgumentException();
+ lock.lock();
+ try {
+ for (Node<E> p = first; p != null; p = p.next)
+ c.add(p.item);
+ int n = count;
+ count = 0;
+ first = last = null;
+ notFull.signalAll();
+ return n;
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ public int drainTo(Collection<? super E> c, int maxElements) {
+ if (c == null)
+ throw new NullPointerException();
+ if (c == this)
+ throw new IllegalArgumentException();
+ lock.lock();
+ try {
+ int n = 0;
+ while (n < maxElements && first != null) {
+ c.add(first.item);
+ first.prev = null;
+ first = first.next;
+ --count;
+ ++n;
+ }
+ if (first == null)
+ last = null;
+ notFull.signalAll();
+ return n;
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ /**
+ * Returns an iterator over the elements in this deque in proper sequence.
+ * The returned <tt>Iterator</tt> is a "weakly consistent" iterator that
+ * will never throw {@link java.util.ConcurrentModificationException},
+ * and guarantees to traverse elements as they existed upon
+ * construction of the iterator, and may (but is not guaranteed to)
+ * reflect any modifications subsequent to construction.
+ *
+ * @return an iterator over the elements in this deque in proper sequence.
+ */
+ public Iterator<E> iterator() {
+ return new Itr();
+ }
+
+ /**
+ * Iterator for LinkedBlockingDeque
+ */
+ private class Itr implements Iterator<E> {
+ private Node<E> next;
+
+ /**
+ * nextItem holds on to item fields because once we claim that
+ * an element exists in hasNext(), we must return item read
+ * under lock (in advance()) even if it was in the process of
+ * being removed when hasNext() was called.
+ **/
+ private E nextItem;
+
+ /**
+ * Node returned by most recent call to next. Needed by remove.
+ * Reset to null if this element is deleted by a call to remove.
+ */
+ private Node<E> last;
+
+ Itr() {
+ advance();
+ }
+
+ /**
+ * Advance next, or if not yet initialized, set to first node.
+ */
+ private void advance() {
+ final ReentrantLock lock = LinkedBlockingDeque.this.lock;
+ lock.lock();
+ try {
+ next = (next == null)? first : next.next;
+ nextItem = (next == null)? null : next.item;
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ public boolean hasNext() {
+ return next != null;
+ }
+
+ public E next() {
+ if (next == null)
+ throw new NoSuchElementException();
+ last = next;
+ E x = nextItem;
+ advance();
+ return x;
+ }
+
+ public void remove() {
+ Node<E> n = last;
+ if (n == null)
+ throw new IllegalStateException();
+ last = null;
+ // Note: removeNode rescans looking for this node to make
+ // sure it was not already removed. Otherwwise, trying to
+ // re-remove could corrupt list.
+ removeNode(n);
+ }
+ }
+
+ /**
+ * Save the state to a stream (that is, serialize it).
+ *
+ * @serialData The capacity (int), followed by elements (each an
+ * <tt>Object</tt>) in the proper order, followed by a null
+ * @param s the stream
+ */
+ private void writeObject(java.io.ObjectOutputStream s)
+ throws java.io.IOException {
+ lock.lock();
+ try {
+ // Write out capacity and any hidden stuff
+ s.defaultWriteObject();
+ // Write out all elements in the proper order.
+ for (Node<E> p = first; p != null; p = p.next)
+ s.writeObject(p.item);
+ // Use trailing null as sentinel
+ s.writeObject(null);
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ /**
+ * Reconstitute this deque instance from a stream (that is,
+ * deserialize it).
+ * @param s the stream
+ */
+ private void readObject(java.io.ObjectInputStream s)
+ throws java.io.IOException, ClassNotFoundException {
+ s.defaultReadObject();
+ count = 0;
+ first = null;
+ last = null;
+ // Read in all elements and place in queue
+ for (;;) {
+ E item = (E)s.readObject();
+ if (item == null)
+ break;
+ add(item);
+ }
+ }
+
+}
Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/postoffice/impl/BindingImplTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/postoffice/impl/BindingImplTest.java 2009-06-24 04:49:11 UTC (rev 7449)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/postoffice/impl/BindingImplTest.java 2009-06-24 09:52:52 UTC (rev 7450)
@@ -24,6 +24,7 @@
import java.io.InputStream;
import java.util.Collections;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -1367,6 +1368,11 @@
return null;
}
+
+ public Iterator<MessageReference> iterator()
+ {
+ return null;
+ }
/* (non-Javadoc)
* @see org.jboss.messaging.core.server.Queue#moveReference(long, org.jboss.messaging.utils.SimpleString)
Added: trunk/tests/src/org/jboss/messaging/tests/unit/util/concurrent/LinkedBlockingDequeTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/util/concurrent/LinkedBlockingDequeTest.java (rev 0)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/util/concurrent/LinkedBlockingDequeTest.java 2009-06-24 09:52:52 UTC (rev 7450)
@@ -0,0 +1,278 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2009, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.messaging.tests.unit.util.concurrent;
+
+import java.util.Iterator;
+
+import org.jboss.messaging.tests.util.UnitTestCase;
+import org.jboss.messaging.utils.concurrent.LinkedBlockingDeque;
+
+/**
+ * A LinkedBlockingDequeTest
+ *
+ * @author <a href="jmesnil at redhat.com>Jeff Mesnil</a>
+ */
+public class LinkedBlockingDequeTest extends UnitTestCase
+{
+
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ // Public --------------------------------------------------------
+
+ public void testAddFirstWhileIterating() throws Exception
+ {
+ LinkedBlockingDeque<String> deque = new LinkedBlockingDeque<String>();
+
+ deque.addFirst("a");
+ deque.addFirst("b");
+
+ Iterator<String> iter = deque.iterator();
+
+ System.out.println(deque);
+
+ assertTrue(iter.hasNext());
+ assertEquals("b", iter.next());
+
+ deque.addFirst("c");
+
+ System.out.println(deque);
+
+ assertTrue(iter.hasNext());
+ assertEquals("a", iter.next());
+
+ assertFalse(iter.hasNext());
+ }
+
+ public void testAddLastWhileIterating() throws Exception
+ {
+ LinkedBlockingDeque<String> deque = new LinkedBlockingDeque<String>();
+
+ deque.addLast("a");
+ deque.addLast("b");
+
+ System.out.println(deque);
+
+ Iterator<String> iter = deque.iterator();
+
+ assertTrue(iter.hasNext());
+ assertEquals("a", iter.next());
+
+ deque.addLast("c");
+
+ System.out.println(deque);
+
+ assertTrue(iter.hasNext());
+ assertEquals("b", iter.next());
+
+ assertTrue(iter.hasNext());
+ assertEquals("c", iter.next());
+
+ assertFalse(iter.hasNext());
+ }
+
+ public void testRemoveFromHeadWhileIterating() throws Exception
+ {
+ LinkedBlockingDeque<String> deque = new LinkedBlockingDeque<String>();
+
+ deque.addLast("a");
+ deque.addLast("b");
+ deque.addLast("c");
+
+ System.out.println(deque);
+
+ Iterator<String> iter = deque.iterator();
+
+ assertTrue(iter.hasNext());
+ assertEquals("a", iter.next());
+
+ assertEquals("a", deque.removeFirst());
+ assertEquals("b", deque.removeFirst());
+
+ System.out.println(deque);
+
+ assertTrue(iter.hasNext());
+ assertEquals("b", iter.next());
+
+ assertTrue(iter.hasNext());
+ assertEquals("c", iter.next());
+
+ assertFalse(iter.hasNext());
+ }
+
+ public void testRemoveFromHeadWhileIterating_2() throws Exception
+ {
+ LinkedBlockingDeque<String> deque = new LinkedBlockingDeque<String>();
+
+ deque.addLast("a");
+ deque.addLast("b");
+ deque.addLast("c");
+
+ System.out.println(deque);
+
+ Iterator<String> iter = deque.iterator();
+
+ assertTrue(iter.hasNext());
+ assertEquals("a", iter.next());
+
+ assertEquals("a", deque.removeFirst());
+ assertEquals("b", deque.removeFirst());
+ assertEquals("c", deque.removeFirst());
+
+ assertEquals(0, deque.size());
+
+ assertTrue(iter.hasNext());
+ assertEquals("b", iter.next());
+
+ assertTrue(iter.hasNext());
+ assertEquals("c", iter.next());
+
+ assertFalse(iter.hasNext());
+ }
+
+ public void testRemoveFromHeadAndAddLastWhileIterating_2() throws Exception
+ {
+ LinkedBlockingDeque<String> deque = new LinkedBlockingDeque<String>();
+
+ deque.addLast("a");
+ deque.addLast("b");
+ deque.addLast("c");
+
+ System.out.println(deque);
+
+ Iterator<String> iter = deque.iterator();
+
+ assertTrue(iter.hasNext());
+ assertEquals("a", iter.next());
+
+ assertEquals("a", deque.removeFirst());
+ assertEquals("b", deque.removeFirst());
+
+ assertEquals(1, deque.size());
+
+ assertTrue(iter.hasNext());
+ assertEquals("b", iter.next());
+
+ assertTrue(iter.hasNext());
+ assertEquals("c", iter.next());
+
+ deque.addLast("d");
+
+ assertFalse(iter.hasNext());
+ }
+
+ public void testRemoveFromHeadAndAddFirstWhileIterating() throws Exception
+ {
+ LinkedBlockingDeque<String> deque = new LinkedBlockingDeque<String>();
+
+ deque.addLast("a");
+ deque.addLast("b");
+ deque.addLast("c");
+
+ System.out.println(deque);
+
+ Iterator<String> iter = deque.iterator();
+
+ assertTrue(iter.hasNext());
+ assertEquals("a", iter.next());
+
+ assertEquals("a", deque.removeFirst());
+ assertEquals("b", deque.removeFirst());
+
+ deque.addFirst("d");
+
+ System.out.println(deque);
+
+ assertEquals(2, deque.size());
+
+ assertTrue(iter.hasNext());
+ assertEquals("b", iter.next());
+
+ assertTrue(iter.hasNext());
+ assertEquals("c", iter.next());
+
+ assertFalse(iter.hasNext());
+ }
+
+ public void test3() throws Exception
+ {
+ LinkedBlockingDeque<String> deque = new LinkedBlockingDeque<String>();
+
+ deque.addLast("a");
+ deque.addLast("b");
+ deque.addLast("c");
+ deque.addLast("d");
+
+ Iterator<String> iter = deque.iterator();
+
+ assertEquals("a", deque.removeFirst());
+ assertEquals("b", deque.removeFirst());
+ assertEquals("c", deque.removeFirst());
+ assertEquals("d", deque.removeFirst());
+
+ deque.addFirst("e");
+
+ assertTrue(iter.hasNext());
+ assertEquals("a", iter.next());
+
+ assertTrue(iter.hasNext());
+ assertEquals("b", iter.next());
+
+ assertTrue(iter.hasNext());
+ assertEquals("c", iter.next());
+
+ assertTrue(iter.hasNext());
+ assertEquals("d", iter.next());
+
+ assertFalse(iter.hasNext());
+ }
+
+ public void test4() throws Exception
+ {
+ LinkedBlockingDeque<String> deque = new LinkedBlockingDeque<String>();
+
+ deque.addLast("a");
+
+ Iterator<String> iter = deque.iterator();
+
+ assertEquals("a", deque.removeFirst());
+ deque.addLast("b");
+
+ assertTrue(iter.hasNext());
+ assertEquals("a", iter.next());
+ }
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+
+}
More information about the jboss-cvs-commits
mailing list