[jboss-cvs] JBoss Messaging SVN: r4570 - in trunk: tests/src/org/jboss/messaging/tests/unit/core/server/impl and 1 other directory.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Tue Jun 24 10:34:18 EDT 2008


Author: ataylor
Date: 2008-06-24 10:34:18 -0400 (Tue, 24 Jun 2008)
New Revision: 4570

Added:
   trunk/tests/src/org/jboss/messaging/tests/unit/core/server/impl/QueueImplTest.java
Modified:
   trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java
Log:
more tests and removed lock optimisation from queueimpl

Modified: trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java	2008-06-24 10:36:06 UTC (rev 4569)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java	2008-06-24 14:34:18 UTC (rev 4570)
@@ -22,37 +22,28 @@
 
 package org.jboss.messaging.core.server.impl;
 
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.LinkedHashSet;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.ListIterator;
-import java.util.Set;
-import java.util.concurrent.Executor;
-import java.util.concurrent.Future;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
-
 import org.jboss.messaging.core.filter.Filter;
 import org.jboss.messaging.core.list.PriorityLinkedList;
 import org.jboss.messaging.core.list.impl.PriorityLinkedListImpl;
 import org.jboss.messaging.core.logging.Logger;
 import org.jboss.messaging.core.persistence.StorageManager;
 import org.jboss.messaging.core.postoffice.FlowController;
-import org.jboss.messaging.core.server.Consumer;
-import org.jboss.messaging.core.server.DistributionPolicy;
-import org.jboss.messaging.core.server.HandleStatus;
-import org.jboss.messaging.core.server.MessageReference;
+import org.jboss.messaging.core.server.*;
 import org.jboss.messaging.core.server.Queue;
 import org.jboss.messaging.core.transaction.Transaction;
 import org.jboss.messaging.core.transaction.impl.TransactionImpl;
 import org.jboss.messaging.util.SimpleString;
 
+import java.util.*;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
 /**
  * 
  * Implementation of a Queue
@@ -102,7 +93,7 @@
 
    private int pos;
    
-   private boolean locked;
+   //private boolean locked;
    
    private AtomicInteger sizeBytes = new AtomicInteger(0);
 
@@ -167,39 +158,30 @@
    
    public HandleStatus addLast(final MessageReference ref)
    {
-      if (locked)
-      {
+
          lock.lock();
-      }
       try
       {         
          return add(ref, false);
       }
       finally
       {
-         if (locked)
-         {
             lock.unlock();
-         }
       }
    }
 
    public HandleStatus addFirst(final MessageReference ref)
    {
-      if (locked)
-      {
          lock.lock();
-      }      
+
       try
       {
          return add(ref, true);
       }
       finally
       {
-         if (locked)
-         {
             lock.unlock();
-         }
+        
       }
    }
 
@@ -508,14 +490,14 @@
    {
       lock.lock();
       
-      locked = true;
+      //locked = true;
    }
    
    public synchronized void unlock()
    {            
       lock.unlock();          
       
-      locked = false;
+      //locked = false;
    }
 
    // Public

Copied: trunk/tests/src/org/jboss/messaging/tests/unit/core/server/impl/QueueImplTest.java (from rev 4568, trunk/tests/src/org/jboss/messaging/tests/unit/core/server/impl/QueueTest.java)
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/server/impl/QueueImplTest.java	                        (rev 0)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/server/impl/QueueImplTest.java	2008-06-24 14:34:18 UTC (rev 4570)
@@ -0,0 +1,1521 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.messaging.tests.unit.core.server.impl;
+
+import org.easymock.EasyMock;
+import org.easymock.IAnswer;
+import org.jboss.messaging.core.filter.Filter;
+import org.jboss.messaging.core.persistence.StorageManager;
+import org.jboss.messaging.core.server.*;
+import org.jboss.messaging.core.server.impl.QueueImpl;
+import org.jboss.messaging.core.server.impl.RoundRobinDistributionPolicy;
+import org.jboss.messaging.tests.unit.core.server.impl.fakes.FakeConsumer;
+import org.jboss.messaging.tests.unit.core.server.impl.fakes.FakeFilter;
+import org.jboss.messaging.tests.util.UnitTestCase;
+import org.jboss.messaging.util.SimpleString;
+
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.*;
+
+/**
+ * A QueueTest
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ */
+public class QueueImplTest extends UnitTestCase
+{
+   // The tests ----------------------------------------------------------------
+
+   private final ScheduledExecutorService scheduledExecutor = Executors.newSingleThreadScheduledExecutor();
+
+   private static final SimpleString queue1 = new SimpleString("queue1");
+
+   public void testID()
+   {
+      final long id = 123;
+
+      Queue queue = new QueueImpl(id, queue1, null, false, true, false, -1, scheduledExecutor);
+
+      assertEquals(id, queue.getPersistenceID());
+
+      final long id2 = 456;
+
+      queue.setPersistenceID(id2);
+
+      assertEquals(id2, queue.getPersistenceID());
+   }
+
+   public void testName()
+   {
+      final SimpleString name = new SimpleString("oobblle");
+
+      Queue queue = new QueueImpl(1, name, null, false, true, false, -1, scheduledExecutor);
+
+      assertEquals(name, queue.getName());
+   }
+
+   public void testClustered()
+   {
+      Queue queue = new QueueImpl(1, queue1, null, false, true, false, -1, scheduledExecutor);
+
+      assertFalse(queue.isClustered());
+
+      queue = new QueueImpl(1, queue1, null, true, true, false, -1, scheduledExecutor);
+
+      assertTrue(queue.isClustered());
+   }
+
+   public void testDurable()
+   {
+      Queue queue = new QueueImpl(1, queue1, null, false, false, false, -1, scheduledExecutor);
+
+      assertFalse(queue.isDurable());
+
+      queue = new QueueImpl(1, queue1, null, false, true, false, -1, scheduledExecutor);
+
+      assertTrue(queue.isDurable());
+   }
+
+   public void testTemporary()
+   {
+      Queue queue = new QueueImpl(1, queue1, null, false, false, false, -1, scheduledExecutor);
+
+      assertFalse(queue.isTemporary());
+
+      queue = new QueueImpl(1, queue1, null, false, false, true, -1, scheduledExecutor);
+
+      assertTrue(queue.isTemporary());
+   }
+
+   public void testGetMaxSizeBytes()
+   {
+      final int maxSize = 123456;
+
+      final int id = 123;
+
+      Queue queue = new QueueImpl(id, queue1, null, false, true, false, maxSize, scheduledExecutor);
+
+      assertEquals(id, queue.getPersistenceID());
+
+      assertEquals(maxSize, queue.getMaxSizeBytes());
+   }
+
+   public void testAddRemoveConsumer()
+   {
+      Consumer cons1 = new FakeConsumer();
+
+      Consumer cons2 = new FakeConsumer();
+
+      Consumer cons3 = new FakeConsumer();
+
+      Queue queue = new QueueImpl(1, queue1, null, false, true, false, -1, scheduledExecutor);
+
+      assertEquals(0, queue.getConsumerCount());
+
+      queue.addConsumer(cons1);
+
+      assertEquals(1, queue.getConsumerCount());
+
+      assertTrue(queue.removeConsumer(cons1));
+
+      assertEquals(0, queue.getConsumerCount());
+
+      queue.addConsumer(cons1);
+
+      queue.addConsumer(cons2);
+
+      queue.addConsumer(cons3);
+
+      assertEquals(3, queue.getConsumerCount());
+
+      assertFalse(queue.removeConsumer(new FakeConsumer()));
+
+      assertEquals(3, queue.getConsumerCount());
+
+      assertTrue(queue.removeConsumer(cons1));
+
+      assertEquals(2, queue.getConsumerCount());
+
+      assertTrue(queue.removeConsumer(cons2));
+
+      assertEquals(1, queue.getConsumerCount());
+
+      assertTrue(queue.removeConsumer(cons3));
+
+      assertEquals(0, queue.getConsumerCount());
+
+      assertFalse(queue.removeConsumer(cons3));
+   }
+
+   public void testGetSetDistributionPolicy()
+   {
+      Queue queue = new QueueImpl(1, queue1, null, false, true, false, -1, scheduledExecutor);
+
+      assertNotNull(queue.getDistributionPolicy());
+
+      assertTrue(queue.getDistributionPolicy() instanceof RoundRobinDistributionPolicy);
+
+      DistributionPolicy policy = new DummyDistributionPolicy();
+
+      queue.setDistributionPolicy(policy);
+
+      assertEquals(policy, queue.getDistributionPolicy());
+   }
+
+   public void testGetSetFilter()
+   {
+      Queue queue = new QueueImpl(1, queue1, null, false, true, false, -1, scheduledExecutor);
+
+      assertNull(queue.getFilter());
+
+      Filter filter = new FakeFilter();
+
+      queue.setFilter(filter);
+
+      assertEquals(filter, queue.getFilter());
+
+      queue = new QueueImpl(1, queue1, filter, false, true, false, -1, scheduledExecutor);
+
+      assertEquals(filter, queue.getFilter());
+   }
+
+   public void testDefaultMaxSize()
+   {
+      Queue queue = new QueueImpl(1, queue1, null, false, true, false, -1, scheduledExecutor);
+
+      assertEquals(-1, queue.getMaxSizeBytes());
+   }
+
+   public void testSimpleAddLast()
+   {
+      Queue queue = new QueueImpl(1, queue1, null, false, true, false, -1, scheduledExecutor);
+
+      final int numMessages = 10;
+
+      for (int i = 0; i < numMessages; i++)
+      {
+         MessageReference ref = generateReference(queue, i);
+
+         queue.addLast(ref);
+      }
+
+      assertEquals(numMessages, queue.getMessageCount());
+      assertEquals(0, queue.getScheduledCount());
+      assertEquals(0, queue.getDeliveringCount());
+
+   }
+
+   public void testSimpleDirectDelivery()
+   {
+      Queue queue = new QueueImpl(1, queue1, null, false, true, false, -1, scheduledExecutor);
+
+      FakeConsumer consumer = new FakeConsumer();
+
+      queue.addConsumer(consumer);
+
+      final int numMessages = 10;
+
+      List<MessageReference> refs = new ArrayList<MessageReference>();
+
+      for (int i = 0; i < numMessages; i++)
+      {
+         MessageReference ref = generateReference(queue, i);
+
+         refs.add(ref);
+
+         queue.addLast(ref);
+      }
+
+      assertEquals(numMessages, queue.getMessageCount());
+      assertEquals(0, queue.getScheduledCount());
+      assertEquals(numMessages, queue.getDeliveringCount());
+
+      assertRefListsIdenticalRefs(refs, consumer.getReferences());
+   }
+
+   public void testSimpleNonDirectDelivery()
+   {
+      Queue queue = new QueueImpl(1, queue1, null, false, true, false, -1, scheduledExecutor);
+
+      final int numMessages = 10;
+
+      List<MessageReference> refs = new ArrayList<MessageReference>();
+
+      for (int i = 0; i < numMessages; i++)
+      {
+         MessageReference ref = generateReference(queue, i);
+
+         refs.add(ref);
+
+         queue.addLast(ref);
+      }
+
+      assertEquals(10, queue.getMessageCount());
+      assertEquals(0, queue.getScheduledCount());
+      assertEquals(0, queue.getDeliveringCount());
+
+      //Now add a consumer
+      FakeConsumer consumer = new FakeConsumer();
+
+      queue.addConsumer(consumer);
+
+      assertTrue(consumer.getReferences().isEmpty());
+      assertEquals(10, queue.getMessageCount());
+      assertEquals(0, queue.getScheduledCount());
+
+      queue.deliver();
+
+      assertRefListsIdenticalRefs(refs, consumer.getReferences());
+      assertEquals(numMessages, queue.getMessageCount());
+      assertEquals(0, queue.getScheduledCount());
+      assertEquals(numMessages, queue.getDeliveringCount());
+   }
+
+   public void testBusyConsumer()
+   {
+      Queue queue = new QueueImpl(1, queue1, null, false, true, false, -1, scheduledExecutor);
+
+      FakeConsumer consumer = new FakeConsumer();
+
+      consumer.setStatusImmediate(HandleStatus.BUSY);
+
+      queue.addConsumer(consumer);
+
+      final int numMessages = 10;
+
+      List<MessageReference> refs = new ArrayList<MessageReference>();
+
+      for (int i = 0; i < numMessages; i++)
+      {
+         MessageReference ref = generateReference(queue, i);
+
+         refs.add(ref);
+
+         queue.addLast(ref);
+      }
+
+      assertEquals(10, queue.getMessageCount());
+      assertEquals(0, queue.getScheduledCount());
+      assertEquals(0, queue.getDeliveringCount());
+
+      queue.deliver();
+
+      assertEquals(10, queue.getMessageCount());
+      assertEquals(0, queue.getScheduledCount());
+      assertEquals(0, queue.getDeliveringCount());
+      assertTrue(consumer.getReferences().isEmpty());
+
+      consumer.setStatusImmediate(HandleStatus.HANDLED);
+
+      queue.deliver();
+
+      assertRefListsIdenticalRefs(refs, consumer.getReferences());
+      assertEquals(10, queue.getMessageCount());
+      assertEquals(0, queue.getScheduledCount());
+      assertEquals(10, queue.getDeliveringCount());
+   }
+
+   public void testBusyConsumerThenAddMoreMessages()
+   {
+      Queue queue = new QueueImpl(1, queue1, null, false, true, false, -1, scheduledExecutor);
+
+      FakeConsumer consumer = new FakeConsumer();
+
+      consumer.setStatusImmediate(HandleStatus.BUSY);
+
+      queue.addConsumer(consumer);
+
+      final int numMessages = 10;
+
+      List<MessageReference> refs = new ArrayList<MessageReference>();
+
+      for (int i = 0; i < numMessages; i++)
+      {
+         MessageReference ref = generateReference(queue, i);
+
+         refs.add(ref);
+
+         queue.addLast(ref);
+      }
+
+      assertEquals(10, queue.getMessageCount());
+      assertEquals(0, queue.getScheduledCount());
+      assertEquals(0, queue.getDeliveringCount());
+
+      queue.deliver();
+
+      assertEquals(10, queue.getMessageCount());
+      assertEquals(0, queue.getScheduledCount());
+      assertEquals(0, queue.getDeliveringCount());
+      assertTrue(consumer.getReferences().isEmpty());
+
+      for (int i = numMessages; i < numMessages * 2; i++)
+      {
+         MessageReference ref = generateReference(queue, i);
+
+         refs.add(ref);
+
+         queue.addLast(ref);
+      }
+
+      assertEquals(20, queue.getMessageCount());
+      assertEquals(0, queue.getScheduledCount());
+      assertEquals(0, queue.getDeliveringCount());
+      assertTrue(consumer.getReferences().isEmpty());
+
+      consumer.setStatusImmediate(HandleStatus.HANDLED);
+
+      for (int i = numMessages * 2; i < numMessages * 3; i++)
+      {
+         MessageReference ref = generateReference(queue, i);
+
+         refs.add(ref);
+
+         queue.addLast(ref);
+      }
+
+      queue.deliver();
+
+      assertRefListsIdenticalRefs(refs, consumer.getReferences());
+      assertEquals(30, queue.getMessageCount());
+      assertEquals(0, queue.getScheduledCount());
+      assertEquals(30, queue.getDeliveringCount());
+   }
+
+   public void testAddFirstAddLast()
+   {
+      Queue queue = new QueueImpl(1, queue1, null, false, true, false, -1, scheduledExecutor);
+
+      final int numMessages = 10;
+
+      List<MessageReference> refs1 = new ArrayList<MessageReference>();
+
+      for (int i = 0; i < numMessages; i++)
+      {
+         MessageReference ref = generateReference(queue, i);
+
+         refs1.add(ref);
+
+         queue.addLast(ref);
+      }
+
+      LinkedList<MessageReference> refs2 = new LinkedList<MessageReference>();
+
+      for (int i = 0; i < numMessages; i++)
+      {
+         MessageReference ref = generateReference(queue, i + numMessages);
+
+         refs2.addFirst(ref);
+
+         queue.addFirst(ref);
+      }
+
+      List<MessageReference> refs3 = new ArrayList<MessageReference>();
+
+      for (int i = 0; i < numMessages; i++)
+      {
+         MessageReference ref = generateReference(queue, i + 2 * numMessages);
+
+         refs3.add(ref);
+
+         queue.addLast(ref);
+      }
+
+      FakeConsumer consumer = new FakeConsumer();
+
+      queue.addConsumer(consumer);
+
+      queue.deliver();
+
+      List<MessageReference> allRefs = new ArrayList<MessageReference>();
+
+      allRefs.addAll(refs2);
+      allRefs.addAll(refs1);
+      allRefs.addAll(refs3);
+
+      assertRefListsIdenticalRefs(allRefs, consumer.getReferences());
+   }
+
+
+   public void testChangeConsumersAndDeliver() throws Exception
+   {
+      Queue queue = new QueueImpl(1, queue1, null, false, true, false, -1, scheduledExecutor);
+
+      final int numMessages = 10;
+
+      List<MessageReference> refs = new ArrayList<MessageReference>();
+
+      for (int i = 0; i < numMessages; i++)
+      {
+         MessageReference ref = generateReference(queue, i);
+
+         refs.add(ref);
+
+         queue.addLast(ref);
+      }
+
+      assertEquals(numMessages, queue.getMessageCount());
+      assertEquals(0, queue.getScheduledCount());
+      assertEquals(0, queue.getDeliveringCount());
+
+      FakeConsumer cons1 = new FakeConsumer();
+
+      queue.addConsumer(cons1);
+
+      queue.deliver();
+
+      assertEquals(numMessages, queue.getMessageCount());
+      assertEquals(0, queue.getScheduledCount());
+      assertEquals(numMessages, queue.getDeliveringCount());
+
+      assertRefListsIdenticalRefs(refs, cons1.getReferences());
+
+      FakeConsumer cons2 = new FakeConsumer();
+
+      queue.addConsumer(cons2);
+
+      assertEquals(2, queue.getConsumerCount());
+
+      cons1.getReferences().clear();
+
+      for (MessageReference ref : refs)
+      {
+         queue.referenceAcknowledged(ref);
+      }
+
+      refs.clear();
+
+      for (int i = 0; i < 2 * numMessages; i++)
+      {
+         MessageReference ref = generateReference(queue, i);
+
+         refs.add(ref);
+
+         queue.addLast(ref);
+      }
+
+      assertEquals(numMessages * 2, queue.getMessageCount());
+      assertEquals(0, queue.getScheduledCount());
+      assertEquals(numMessages * 2, queue.getDeliveringCount());
+
+      assertEquals(numMessages, cons1.getReferences().size());
+
+      assertEquals(numMessages, cons2.getReferences().size());
+
+      cons1.getReferences().clear();
+      cons2.getReferences().clear();
+
+      for (MessageReference ref : refs)
+      {
+         queue.referenceAcknowledged(ref);
+      }
+      refs.clear();
+
+      FakeConsumer cons3 = new FakeConsumer();
+
+      queue.addConsumer(cons3);
+
+      assertEquals(3, queue.getConsumerCount());
+
+      for (int i = 0; i < 3 * numMessages; i++)
+      {
+         MessageReference ref = generateReference(queue, i);
+
+         refs.add(ref);
+
+         queue.addLast(ref);
+      }
+
+      assertEquals(numMessages * 3, queue.getMessageCount());
+      assertEquals(0, queue.getScheduledCount());
+      assertEquals(numMessages * 3, queue.getDeliveringCount());
+
+      assertEquals(numMessages, cons1.getReferences().size());
+
+      assertEquals(numMessages, cons2.getReferences().size());
+
+      assertEquals(numMessages, cons3.getReferences().size());
+
+      queue.removeConsumer(cons1);
+
+      cons3.getReferences().clear();
+      cons2.getReferences().clear();
+
+      for (MessageReference ref : refs)
+      {
+         queue.referenceAcknowledged(ref);
+      }
+      refs.clear();
+
+      for (int i = 0; i < 2 * numMessages; i++)
+      {
+         MessageReference ref = generateReference(queue, i);
+
+         refs.add(ref);
+
+         queue.addLast(ref);
+      }
+
+      assertEquals(numMessages * 2, queue.getMessageCount());
+      assertEquals(0, queue.getScheduledCount());
+      assertEquals(numMessages * 2, queue.getDeliveringCount());
+
+      assertEquals(numMessages, cons2.getReferences().size());
+
+      assertEquals(numMessages, cons3.getReferences().size());
+
+      queue.removeConsumer(cons3);
+
+      cons2.getReferences().clear();
+
+      for (MessageReference ref : refs)
+      {
+         queue.referenceAcknowledged(ref);
+      }
+      refs.clear();
+
+      for (int i = 0; i < numMessages; i++)
+      {
+         MessageReference ref = generateReference(queue, i);
+
+         refs.add(ref);
+
+         queue.addLast(ref);
+      }
+
+      assertEquals(numMessages, queue.getMessageCount());
+      assertEquals(0, queue.getScheduledCount());
+      assertEquals(numMessages, queue.getDeliveringCount());
+
+      assertEquals(numMessages, cons2.getReferences().size());
+
+   }
+
+   public void testConsumerReturningNull()
+   {
+      Queue queue = new QueueImpl(1, queue1, null, false, true, false, -1, scheduledExecutor);
+
+      class NullConsumer implements Consumer
+      {
+         public HandleStatus handle(MessageReference reference)
+         {
+            return null;
+         }
+      }
+
+      queue.addConsumer(new NullConsumer());
+
+      MessageReference ref = generateReference(queue, 1);
+
+      try
+      {
+         queue.addLast(ref);
+
+         fail("Should throw IllegalStateException");
+      }
+      catch (IllegalStateException e)
+      {
+         //Ok
+      }
+   }
+
+   public void testRoundRobinWithQueueing()
+   {
+      Queue queue = new QueueImpl(1, queue1, null, false, true, false, -1, scheduledExecutor);
+
+      assertTrue(queue.getDistributionPolicy() instanceof RoundRobinDistributionPolicy);
+
+      final int numMessages = 10;
+
+      List<MessageReference> refs = new ArrayList<MessageReference>();
+
+      //Test first with queueing
+
+      for (int i = 0; i < numMessages; i++)
+      {
+         MessageReference ref = generateReference(queue, i);
+
+         refs.add(ref);
+
+         queue.addLast(ref);
+      }
+
+      FakeConsumer cons1 = new FakeConsumer();
+
+      FakeConsumer cons2 = new FakeConsumer();
+
+      queue.addConsumer(cons1);
+
+      queue.addConsumer(cons2);
+
+      queue.deliver();
+
+      assertEquals(numMessages / 2, cons1.getReferences().size());
+
+      assertEquals(numMessages / 2, cons2.getReferences().size());
+
+      for (int i = 0; i < numMessages; i++)
+      {
+         MessageReference ref;
+
+         ref = (i % 2 == 0) ? cons1.getReferences().get(i / 2) : cons2.getReferences().get(i / 2);
+
+         assertEquals(refs.get(i), ref);
+      }
+   }
+
+   public void testRoundRobinDirect()
+   {
+      Queue queue = new QueueImpl(1, queue1, null, false, true, false, -1, scheduledExecutor);
+
+      assertTrue(queue.getDistributionPolicy() instanceof RoundRobinDistributionPolicy);
+
+      final int numMessages = 10;
+
+      List<MessageReference> refs = new ArrayList<MessageReference>();
+
+      FakeConsumer cons1 = new FakeConsumer();
+
+      FakeConsumer cons2 = new FakeConsumer();
+
+      queue.addConsumer(cons1);
+
+      queue.addConsumer(cons2);
+
+      queue.deliver();
+
+      for (int i = 0; i < numMessages; i++)
+      {
+         MessageReference ref = generateReference(queue, i);
+
+         refs.add(ref);
+
+         queue.addLast(ref);
+      }
+
+      assertEquals(numMessages / 2, cons1.getReferences().size());
+
+      assertEquals(numMessages / 2, cons2.getReferences().size());
+
+      for (int i = 0; i < numMessages; i++)
+      {
+         MessageReference ref;
+
+         ref = (i % 2 == 0) ? cons1.getReferences().get(i / 2) : cons2.getReferences().get(i / 2);
+
+         assertEquals(refs.get(i), ref);
+      }
+   }
+
+   public void testDeleteAllReferences() throws Exception
+   {
+      Queue queue = new QueueImpl(1, queue1, null, false, true, false, -1, scheduledExecutor);
+
+      StorageManager storageManager = EasyMock.createStrictMock(StorageManager.class);
+
+      final int numMessages = 10;
+
+      List<MessageReference> refs = new ArrayList<MessageReference>();
+
+      for (int i = 0; i < numMessages; i++)
+      {
+         MessageReference ref = generateReference(queue, i);
+
+         ref.getMessage().setDurable(i % 2 == 0);
+
+         refs.add(ref);
+
+         queue.addLast(ref);
+      }
+
+      //Add some scheduled too
+
+      final int numScheduled = 10;
+
+      for (int i = numMessages; i < numMessages + numScheduled; i++)
+      {
+         MessageReference ref = generateReference(queue, i);
+
+         ref.setScheduledDeliveryTime(System.currentTimeMillis() + 1000000000);
+
+         ref.getMessage().setDurable(i % 2 == 0);
+
+         refs.add(ref);
+
+         queue.addLast(ref);
+      }
+
+
+      assertEquals(numMessages + numScheduled, queue.getMessageCount());
+      assertEquals(numScheduled, queue.getScheduledCount());
+      assertEquals(0, queue.getDeliveringCount());
+
+      //What I expect to get
+
+      EasyMock.expect(storageManager.generateTransactionID()).andReturn(1L);
+
+      for (int i = 0; i < numMessages; i++)
+      {
+         if (i % 2 == 0)
+         {
+            storageManager.storeDeleteTransactional(1, i);
+         }
+      }
+
+      for (int i = numMessages; i < numMessages + numScheduled; i++)
+      {
+         if (i % 2 == 0)
+         {
+            storageManager.storeDeleteTransactional(1, i);
+         }
+      }
+
+      storageManager.commit(1);
+
+      EasyMock.replay(storageManager);
+
+      queue.deleteAllReferences(storageManager);
+
+      EasyMock.verify(storageManager);
+
+      assertEquals(0, queue.getMessageCount());
+      assertEquals(0, queue.getScheduledCount());
+      assertEquals(0, queue.getDeliveringCount());
+
+      FakeConsumer consumer = new FakeConsumer();
+
+      queue.addConsumer(consumer);
+
+      queue.deliver();
+
+      assertTrue(consumer.getReferences().isEmpty());
+   }
+
+   public void testMaxSize()
+   {
+      final int maxSize = 10 * 1024;
+
+      Queue queue = new QueueImpl(1, queue1, null, false, true, false, maxSize, scheduledExecutor);
+
+      List<MessageReference> refs = new ArrayList<MessageReference>();
+
+      int size = 0;
+
+      int i = 0;
+      while (true)
+      {
+         MessageReference ref = generateReference(queue, i++);
+
+         if (size + ref.getMessage().getEncodeSize() > maxSize)
+         {
+            break;
+         }
+
+         size += ref.getMessage().getEncodeSize();
+
+         refs.add(ref);
+
+         assertEquals(HandleStatus.HANDLED, queue.addLast(ref));
+      }
+
+      assertEquals(maxSize, queue.getMaxSizeBytes());
+      assertEquals(size, queue.getSizeBytes());
+
+      //Try to add more
+
+      for (int j = 0; j < 10; j++)
+      {
+         MessageReference ref = generateReference(queue, j);
+
+         assertEquals(HandleStatus.BUSY, queue.addLast(ref));
+      }
+
+      assertEquals(maxSize, queue.getMaxSizeBytes());
+      assertEquals(size, queue.getSizeBytes());
+
+      // Try to add at front too
+
+      for (int j = 0; j < 10; j++)
+      {
+         MessageReference ref = generateReference(queue, i);
+
+         assertEquals(HandleStatus.BUSY, queue.addLast(ref));
+      }
+
+      assertEquals(maxSize, queue.getMaxSizeBytes());
+      assertEquals(size, queue.getSizeBytes());
+   }
+
+   public void testWithPriorities()
+   {
+      Queue queue = new QueueImpl(1, queue1, null, false, true, false, -1, scheduledExecutor);
+
+      final int numMessages = 10;
+
+      List<MessageReference> refs = new ArrayList<MessageReference>();
+
+      for (int i = 0; i < numMessages; i++)
+      {
+         MessageReference ref = generateReference(queue, i);
+
+         ref.getMessage().setPriority((byte) i);
+
+         refs.add(ref);
+
+         assertEquals(HandleStatus.HANDLED, queue.addLast(ref));
+      }
+
+      FakeConsumer consumer = new FakeConsumer();
+
+      queue.addConsumer(consumer);
+
+      queue.deliver();
+
+      List<MessageReference> receivedRefs = consumer.getReferences();
+
+      //Should be in reverse order
+
+      assertEquals(refs.size(), receivedRefs.size());
+
+      for (int i = 0; i < numMessages; i++)
+      {
+         assertEquals(refs.get(i), receivedRefs.get(9 - i));
+      }
+
+      //But if we send more - since we are now in direct mode - the order will be the send order
+      //since the refs don't get queued
+
+      consumer.clearReferences();
+
+      refs.clear();
+
+      for (int i = 0; i < numMessages; i++)
+      {
+         MessageReference ref = generateReference(queue, i);
+
+         ref.getMessage().setPriority((byte) i);
+
+         refs.add(ref);
+
+         assertEquals(HandleStatus.HANDLED, queue.addLast(ref));
+      }
+
+      assertRefListsIdenticalRefs(refs, consumer.getReferences());
+   }
+
+   public void testConsumerWithFiltersDirect() throws Exception
+   {
+      testConsumerWithFilters(true);
+   }
+
+   public void testConsumerWithFiltersQueueing() throws Exception
+   {
+      testConsumerWithFilters(false);
+   }
+
+   public void testConsumerWithFilterAddAndRemove()
+   {
+      Queue queue = new QueueImpl(1, queue1, null, false, true, false, -1, scheduledExecutor);
+
+      Filter filter = new FakeFilter("fruit", "orange");
+
+      FakeConsumer consumer = new FakeConsumer(filter);
+   }
+
+   public void testList()
+   {
+      Queue queue = new QueueImpl(1, queue1, null, false, true, false, -1, scheduledExecutor);
+
+      final int numMessages = 20;
+
+      List<MessageReference> refs = new ArrayList<MessageReference>();
+
+      for (int i = 0; i < numMessages; i++)
+      {
+         MessageReference ref = generateReference(queue, i);
+
+         queue.addLast(ref);
+
+         refs.add(ref);
+      }
+
+      assertEquals(numMessages, queue.getMessageCount());
+
+      List<MessageReference> list = queue.list(null);
+
+      assertRefListsIdenticalRefs(refs, list);
+   }
+
+   public void testListWithFilter()
+   {
+      Queue queue = new QueueImpl(1, queue1, null, false, true, false, -1, scheduledExecutor);
+
+      final int numMessages = 20;
+
+      List<MessageReference> refs = new ArrayList<MessageReference>();
+
+      for (int i = 0; i < numMessages; i++)
+      {
+         MessageReference ref = generateReference(queue, i);
+
+         if (i % 2 == 0)
+         {
+            ref.getMessage().putStringProperty(new SimpleString("god"), new SimpleString("dog"));
+         }
+
+         queue.addLast(ref);
+
+         refs.add(ref);
+      }
+
+      assertEquals(numMessages, queue.getMessageCount());
+
+      Filter filter = new FakeFilter("god", "dog");
+
+      List<MessageReference> list = queue.list(filter);
+
+      assertEquals(numMessages / 2, list.size());
+
+      for (int i = 0; i < numMessages; i += 2)
+      {
+         assertEquals(refs.get(i), list.get(i / 2));
+      }
+   }
+
+   /*
+   public void testQuickSpeedTest()
+   {
+      Queue queue = new QueueImpl(1);
+      
+      final int numMessages = 1000000;
+      
+      FakeConsumer cons = new FakeConsumer();
+      
+      queue.addConsumer(cons);
+      
+      long start = System.currentTimeMillis();
+      
+      for (int i = 0; i < numMessages; i++)
+      {
+         MessageReference ref = this.generateReference(1);
+         
+         queue.addLast(ref);
+      }
+      
+      long end = System.currentTimeMillis();
+      
+      double rate = 1000 * (double)numMessages / (end - start); 
+      
+      System.out.println("Rate: " + rate);
+      
+      assertEquals(numMessages, cons.getReferences().size());
+   }
+   */
+
+   public void testConsumeWithFiltersAddAndRemoveConsumer() throws Exception
+   {
+      Queue queue = new QueueImpl(1, queue1, null, false, true, false, -1, scheduledExecutor);
+
+      Filter filter = new FakeFilter("fruit", "orange");
+
+      FakeConsumer consumer = new FakeConsumer(filter);
+
+      queue.addConsumer(consumer);
+
+      List<MessageReference> refs = new ArrayList<MessageReference>();
+
+      MessageReference ref1 = generateReference(queue, 1);
+
+      ref1.getMessage().putStringProperty(new SimpleString("fruit"), new SimpleString("banana"));
+
+      assertEquals(HandleStatus.HANDLED, queue.addLast(ref1));
+
+      MessageReference ref2 = generateReference(queue, 2);
+
+      ref2.getMessage().putStringProperty(new SimpleString("fruit"), new SimpleString("orange"));
+
+      assertEquals(HandleStatus.HANDLED, queue.addLast(ref2));
+
+      refs.add(ref2);
+
+
+      assertEquals(2, queue.getMessageCount());
+
+      assertEquals(1, consumer.getReferences().size());
+
+      assertEquals(1, queue.getDeliveringCount());
+
+      assertRefListsIdenticalRefs(refs, consumer.getReferences());
+
+      queue.referenceAcknowledged(ref2);
+
+      queue.removeConsumer(consumer);
+
+      queue.addConsumer(consumer);
+
+      queue.deliver();
+
+
+      refs.clear();
+
+      consumer.clearReferences();
+
+      MessageReference ref3 = generateReference(queue, 3);
+
+      ref3.getMessage().putStringProperty(new SimpleString("fruit"), new SimpleString("banana"));
+
+      assertEquals(HandleStatus.HANDLED, queue.addLast(ref3));
+
+      MessageReference ref4 = generateReference(queue, 4);
+
+      ref4.getMessage().putStringProperty(new SimpleString("fruit"), new SimpleString("orange"));
+
+      assertEquals(HandleStatus.HANDLED, queue.addLast(ref4));
+
+      refs.add(ref4);
+
+      assertEquals(3, queue.getMessageCount());
+
+      assertEquals(1, consumer.getReferences().size());
+
+      assertEquals(1, queue.getDeliveringCount());
+
+      assertRefListsIdenticalRefs(refs, consumer.getReferences());
+   }
+
+   // Private ------------------------------------------------------------------------------
+
+   private void testConsumerWithFilters(boolean direct) throws Exception
+   {
+      Queue queue = new QueueImpl(1, queue1, null, false, true, false, -1, scheduledExecutor);
+
+      Filter filter = new FakeFilter("fruit", "orange");
+
+      FakeConsumer consumer = new FakeConsumer(filter);
+
+      if (direct)
+      {
+         queue.addConsumer(consumer);
+      }
+
+      List<MessageReference> refs = new ArrayList<MessageReference>();
+
+      MessageReference ref1 = generateReference(queue, 1);
+
+      ref1.getMessage().putStringProperty(new SimpleString("fruit"), new SimpleString("banana"));
+
+      assertEquals(HandleStatus.HANDLED, queue.addLast(ref1));
+
+      MessageReference ref2 = generateReference(queue, 2);
+
+      ref2.getMessage().putStringProperty(new SimpleString("cheese"), new SimpleString("stilton"));
+
+      assertEquals(HandleStatus.HANDLED, queue.addLast(ref2));
+
+      MessageReference ref3 = generateReference(queue, 3);
+
+      ref3.getMessage().putStringProperty(new SimpleString("cake"), new SimpleString("sponge"));
+
+      assertEquals(HandleStatus.HANDLED, queue.addLast(ref3));
+
+      MessageReference ref4 = generateReference(queue, 4);
+
+      ref4.getMessage().putStringProperty(new SimpleString("fruit"), new SimpleString("orange"));
+
+      refs.add(ref4);
+
+      assertEquals(HandleStatus.HANDLED, queue.addLast(ref4));
+
+      MessageReference ref5 = generateReference(queue, 5);
+
+      ref5.getMessage().putStringProperty(new SimpleString("fruit"), new SimpleString("apple"));
+
+      assertEquals(HandleStatus.HANDLED, queue.addLast(ref5));
+
+      MessageReference ref6 = generateReference(queue, 6);
+
+      ref6.getMessage().putStringProperty(new SimpleString("fruit"), new SimpleString("orange"));
+
+      refs.add(ref6);
+
+      assertEquals(HandleStatus.HANDLED, queue.addLast(ref6));
+
+      if (!direct)
+      {
+         queue.addConsumer(consumer);
+
+         queue.deliver();
+      }
+
+      assertEquals(6, queue.getMessageCount());
+
+      assertEquals(2, consumer.getReferences().size());
+
+      assertEquals(2, queue.getDeliveringCount());
+
+      assertRefListsIdenticalRefs(refs, consumer.getReferences());
+
+      queue.referenceAcknowledged(ref5);
+      queue.referenceAcknowledged(ref6);
+
+      queue.removeConsumer(consumer);
+
+      consumer = new FakeConsumer();
+
+      queue.addConsumer(consumer);
+
+      queue.deliver();
+
+      assertEquals(4, queue.getMessageCount());
+
+      assertEquals(4, consumer.getReferences().size());
+
+      assertEquals(4, queue.getDeliveringCount());
+   }
+
+   public void testMessageOrder() throws Exception
+   {
+      Consumer consumer = EasyMock.createStrictMock(Consumer.class);
+      Queue queue = new QueueImpl(1, queue1, null, false, true, false, -1, scheduledExecutor);
+      MessageReference messageReference = generateReference(queue, 1);
+      MessageReference messageReference2 = generateReference(queue, 2);
+      MessageReference messageReference3 = generateReference(queue, 3);
+      queue.addFirst(messageReference);
+      queue.addLast(messageReference2);
+      queue.addFirst(messageReference3);
+      EasyMock.expect(consumer.handle(messageReference3)).andReturn(HandleStatus.HANDLED);
+      EasyMock.expect(consumer.handle(messageReference)).andReturn(HandleStatus.HANDLED);
+      EasyMock.expect(consumer.handle(messageReference2)).andReturn(HandleStatus.HANDLED);
+      EasyMock.replay(consumer);
+      queue.addConsumer(consumer);
+      queue.deliver();
+      EasyMock.verify(consumer);
+   }
+
+   public void testMessagesAdded() throws Exception
+   {
+      Queue queue = new QueueImpl(1, queue1, null, false, true, false, -1, scheduledExecutor);
+      MessageReference messageReference = generateReference(queue, 1);
+      MessageReference messageReference2 = generateReference(queue, 2);
+      MessageReference messageReference3 = generateReference(queue, 3);
+      queue.addLast(messageReference);
+      queue.addLast(messageReference2);
+      queue.addLast(messageReference3);
+      assertEquals(queue.getMessagesAdded(), 3);
+   }
+
+   public void testAddLastWhenLocked() throws Exception
+   {
+
+      Queue queue = new QueueImpl(1, queue1, null, false, true, false, -1, scheduledExecutor);
+      MessageReference messageReference = generateReference(queue, 1);
+      queue.lock();
+      CountDownLatch countDownLatch = new CountDownLatch(1);
+      AddtoQueueRunner runner = new AddtoQueueRunner(false, queue, messageReference, countDownLatch);
+      new Thread(runner).start();
+      assertFalse(runner.added);
+      queue.unlock();
+      countDownLatch.await(1000, TimeUnit.MILLISECONDS);
+      assertTrue(runner.added);
+
+   }
+
+   public void testAddLastWhenLockedMultiple() throws Exception
+   {
+
+      Queue queue = new QueueImpl(1, queue1, null, false, true, false, -1, scheduledExecutor);
+      MessageReference messageReference = generateReference(queue, 1);
+      MessageReference messageReference2 = generateReference(queue, 2);
+      MessageReference messageReference3 = generateReference(queue, 3);
+      queue.lock();
+      CountDownLatch countDownLatch = new CountDownLatch(3);
+      AddtoQueueRunner runner = new AddtoQueueRunner(false, queue, messageReference, countDownLatch);
+      new Thread(runner).start();
+      AddtoQueueRunner runner2 = new AddtoQueueRunner(false, queue, messageReference2, countDownLatch);
+      new Thread(runner2).start();
+      AddtoQueueRunner runner3 = new AddtoQueueRunner(false, queue, messageReference3, countDownLatch);
+      new Thread(runner3).start();
+      assertFalse(runner.added);
+      assertFalse(runner2.added);
+      assertFalse(runner3.added);
+      queue.unlock();
+      countDownLatch.await(1000, TimeUnit.MILLISECONDS);
+      assertTrue(runner.added);
+      assertTrue(runner2.added);
+      assertTrue(runner3.added);
+
+   }
+
+   public void testAddFirstWhenLocked() throws Exception
+   {
+
+      Queue queue = new QueueImpl(1, queue1, null, false, true, false, -1, scheduledExecutor);
+      MessageReference messageReference = generateReference(queue, 1);
+      queue.lock();
+      CountDownLatch countDownLatch = new CountDownLatch(1);
+      AddtoQueueRunner runner = new AddtoQueueRunner(true, queue, messageReference, countDownLatch);
+      new Thread(runner).start();
+      assertFalse(runner.added);
+      queue.unlock();
+      countDownLatch.await(1000, TimeUnit.MILLISECONDS);
+      assertTrue(runner.added);
+
+   }
+
+   public void testAddFirstWhenLockedMultiple() throws Exception
+   {
+
+      Queue queue = new QueueImpl(1, queue1, null, false, true, false, -1, scheduledExecutor);
+      MessageReference messageReference = generateReference(queue, 1);
+      MessageReference messageReference2 = generateReference(queue, 2);
+      MessageReference messageReference3 = generateReference(queue, 3);
+      queue.lock();
+      CountDownLatch countDownLatch = new CountDownLatch(3);
+      AddtoQueueRunner runner = new AddtoQueueRunner(true, queue, messageReference, countDownLatch);
+      new Thread(runner).start();
+      AddtoQueueRunner runner2 = new AddtoQueueRunner(true, queue, messageReference2, countDownLatch);
+      new Thread(runner2).start();
+      AddtoQueueRunner runner3 = new AddtoQueueRunner(true, queue, messageReference3, countDownLatch);
+      new Thread(runner3).start();
+      assertFalse(runner.added);
+      assertFalse(runner2.added);
+      assertFalse(runner3.added);
+      queue.unlock();
+      countDownLatch.await(10000, TimeUnit.MILLISECONDS);
+      assertTrue(runner.added);
+      assertTrue(runner2.added);
+      assertTrue(runner3.added);
+
+   }
+
+   public void testAddListFirst() throws Exception
+   {
+      Consumer consumer = EasyMock.createStrictMock(Consumer.class);
+      Queue queue = new QueueImpl(1, queue1, null, false, true, false, -1, scheduledExecutor);
+      MessageReference messageReference = generateReference(queue, 1);
+      MessageReference messageReference2 = generateReference(queue, 2);
+      MessageReference messageReference3 = generateReference(queue, 3);
+      LinkedList<MessageReference> messageReferences = new LinkedList<MessageReference>();
+      messageReferences.add(messageReference);
+      messageReferences.add(messageReference2);
+      messageReferences.add(messageReference3);
+      EasyMock.expect(consumer.handle(messageReference)).andReturn(HandleStatus.HANDLED);
+      EasyMock.expect(consumer.handle(messageReference2)).andReturn(HandleStatus.HANDLED);
+      EasyMock.expect(consumer.handle(messageReference3)).andReturn(HandleStatus.HANDLED);
+      EasyMock.replay(consumer);
+      queue.addConsumer(consumer);
+      queue.addListFirst(messageReferences);
+      EasyMock.verify(consumer);
+
+   }
+
+   public void testRemoveReferenceWithId() throws Exception
+   {
+      Consumer consumer = EasyMock.createStrictMock(Consumer.class);
+      Queue queue = new QueueImpl(1, queue1, null, false, true, false, -1, scheduledExecutor);
+      MessageReference messageReference = generateReference(queue, 1);
+      MessageReference messageReference2 = generateReference(queue, 2);
+      MessageReference messageReference3 = generateReference(queue, 3);
+      LinkedList<MessageReference> messageReferences = new LinkedList<MessageReference>();
+      messageReferences.add(messageReference);
+      messageReferences.add(messageReference2);
+      messageReferences.add(messageReference3);
+      EasyMock.expect(consumer.handle(messageReference)).andReturn(HandleStatus.HANDLED);
+      EasyMock.expect(consumer.handle(messageReference3)).andReturn(HandleStatus.HANDLED);
+      EasyMock.replay(consumer);
+      queue.addListFirst(messageReferences);
+      queue.removeReferenceWithID(2);
+      queue.addConsumer(consumer);
+      queue.deliver();
+      EasyMock.verify(consumer);
+
+   }
+
+   public void testGetReference() throws Exception
+   {
+      Queue queue = new QueueImpl(1, queue1, null, false, true, false, -1, scheduledExecutor);
+      MessageReference messageReference = generateReference(queue, 1);
+      MessageReference messageReference2 = generateReference(queue, 2);
+      MessageReference messageReference3 = generateReference(queue, 3);
+      queue.addFirst(messageReference);
+      queue.addFirst(messageReference2);
+      queue.addFirst(messageReference3);
+      assertEquals(queue.getReference(2), messageReference2);
+
+   }
+
+   public void testGetNonExistentReference() throws Exception
+   {
+      Queue queue = new QueueImpl(1, queue1, null, false, true, false, -1, scheduledExecutor);
+      MessageReference messageReference = generateReference(queue, 1);
+      MessageReference messageReference2 = generateReference(queue, 2);
+      MessageReference messageReference3 = generateReference(queue, 3);
+      queue.addFirst(messageReference);
+      queue.addFirst(messageReference2);
+      queue.addFirst(messageReference3);
+      assertNull(queue.getReference(5));
+
+   }
+
+   public void testConsumerRemovedAfterException() throws Exception
+   {
+      Consumer consumer = EasyMock.createStrictMock(Consumer.class);
+      Queue queue = new QueueImpl(1, queue1, null, false, true, false, -1, scheduledExecutor);
+      MessageReference messageReference = generateReference(queue, 1);
+      MessageReference messageReference2 = generateReference(queue, 2);
+      MessageReference messageReference3 = generateReference(queue, 3);
+      LinkedList<MessageReference> messageReferences = new LinkedList<MessageReference>();
+      messageReferences.add(messageReference);
+      messageReferences.add(messageReference2);
+      messageReferences.add(messageReference3);
+      EasyMock.expect(consumer.handle(messageReference)).andReturn(HandleStatus.HANDLED);
+      EasyMock.expect(consumer.handle(messageReference2)).andThrow(new RuntimeException());
+      EasyMock.replay(consumer);
+      queue.addConsumer(consumer);
+      queue.addListFirst(messageReferences);
+      EasyMock.verify(consumer);
+
+   }
+
+   public void testDeliveryAsync() throws Exception
+   {
+      Consumer consumer = EasyMock.createStrictMock(Consumer.class);
+      Queue queue = new QueueImpl(1, queue1, null, false, true, false, -1, scheduledExecutor);
+      MessageReference messageReference = generateReference(queue, 1);
+      MessageReference messageReference2 = generateReference(queue, 2);
+      MessageReference messageReference3 = generateReference(queue, 3);
+      LinkedList<MessageReference> messageReferences = new LinkedList<MessageReference>();
+      messageReferences.add(messageReference);
+      messageReferences.add(messageReference2);
+      messageReferences.add(messageReference3);
+      EasyMock.expect(consumer.handle(messageReference)).andReturn(HandleStatus.HANDLED);
+      EasyMock.expect(consumer.handle(messageReference2)).andReturn(HandleStatus.HANDLED);
+      EasyMock.expect(consumer.handle(messageReference3)).andReturn(HandleStatus.HANDLED);
+      EasyMock.replay(consumer);
+      queue.addListFirst(messageReferences);
+      queue.addConsumer(consumer);
+      queue.deliverAsync(new Executor()
+      {
+         public void execute(Runnable command)
+         {
+            command.run();
+         }
+      });
+      EasyMock.verify(consumer);
+
+   }
+
+   public void testDeliveryScheduled() throws Exception
+   {
+      Consumer consumer = EasyMock.createStrictMock(Consumer.class);
+      Queue queue = new QueueImpl(1, queue1, null, false, true, false, -1, scheduledExecutor);
+      MessageReference messageReference = generateReference(queue, 1);
+      final CountDownLatch countDownLatch = new CountDownLatch(1);
+      EasyMock.expect(consumer.handle(messageReference)).andAnswer(new IAnswer<HandleStatus>()
+      {
+         public HandleStatus answer() throws Throwable
+         {
+            countDownLatch.countDown();
+            return HandleStatus.HANDLED;
+         }
+      });
+      EasyMock.replay(consumer);
+      queue.addConsumer(consumer);
+      messageReference.setScheduledDeliveryTime(System.currentTimeMillis() + 2000);
+      queue.addFirst(messageReference);
+
+      countDownLatch.await(3000, TimeUnit.MILLISECONDS);
+
+      EasyMock.verify(consumer);
+
+   }
+
+   public void testDeliveryScheduledBusyConsumer() throws Exception
+   {
+      Consumer consumer = EasyMock.createStrictMock(Consumer.class);
+      Queue queue = new QueueImpl(1, queue1, null, false, true, false, -1, scheduledExecutor);
+      MessageReference messageReference = generateReference(queue, 1);
+      final CountDownLatch countDownLatch = new CountDownLatch(1);
+      EasyMock.expect(consumer.handle(messageReference)).andAnswer(new IAnswer<HandleStatus>()
+      {
+         public HandleStatus answer() throws Throwable
+         {
+            countDownLatch.countDown();
+            return HandleStatus.BUSY;
+         }
+      });
+      EasyMock.expect(consumer.handle(messageReference)).andReturn(HandleStatus.HANDLED);
+      EasyMock.replay(consumer);
+      queue.addConsumer(consumer);
+      messageReference.setScheduledDeliveryTime(System.currentTimeMillis() + 2000);
+      queue.addFirst(messageReference);
+
+      countDownLatch.await(3000, TimeUnit.MILLISECONDS);
+
+      EasyMock.verify(consumer);
+
+   }
+   // Inner classes ---------------------------------------------------------------
+
+   class AddtoQueueRunner implements Runnable
+   {
+      Queue queue;
+      MessageReference messageReference;
+      boolean added = false;
+      CountDownLatch countDownLatch;
+      boolean first;
+
+      public AddtoQueueRunner(boolean first, Queue queue, MessageReference messageReference, CountDownLatch countDownLatch)
+      {
+         this.queue = queue;
+         this.messageReference = messageReference;
+         this.countDownLatch = countDownLatch;
+         this.first = first;
+      }
+
+      public void run()
+      {
+         if (first)
+         {
+            queue.addFirst(messageReference);
+         }
+         else
+         {
+            queue.addLast(messageReference);
+         }
+         added = true;
+         countDownLatch.countDown();
+      }
+   }
+
+   class DummyDistributionPolicy implements DistributionPolicy
+   {
+      public int select(List<Consumer> consumers, int lastPos)
+      {
+         return 0;
+      }
+   }
+
+}




More information about the jboss-cvs-commits mailing list