[jboss-cvs] JBoss Messaging SVN: r5146 - in trunk: src/main/org/jboss/messaging/core/server/impl and 3 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Sun Oct 19 04:48:49 EDT 2008


Author: timfox
Date: 2008-10-19 04:48:48 -0400 (Sun, 19 Oct 2008)
New Revision: 5146

Modified:
   trunk/src/main/org/jboss/messaging/core/server/Queue.java
   trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java
   trunk/tests/src/org/jboss/messaging/tests/concurrent/server/impl/QueueTest.java
   trunk/tests/src/org/jboss/messaging/tests/timing/core/server/impl/QueueImplTest.java
   trunk/tests/src/org/jboss/messaging/tests/unit/core/server/impl/QueueImplTest.java
Log:
Fixed queue delivery tests


Modified: trunk/src/main/org/jboss/messaging/core/server/Queue.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/Queue.java	2008-10-19 08:38:07 UTC (rev 5145)
+++ trunk/src/main/org/jboss/messaging/core/server/Queue.java	2008-10-19 08:48:48 UTC (rev 5146)
@@ -57,9 +57,7 @@
     * @param list
     */
    void addListFirst(LinkedList<MessageReference> list);
-   
-   void deliver();
-   
+         
    void deliverAsync(Executor executor);
    
    void addConsumer(Consumer consumer);
@@ -144,4 +142,7 @@
    MessageReference removeFirst();
    
    boolean consumerFailedOver();   
+   
+   //Only used in testing
+   void deliverNow();
 }

Modified: trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java	2008-10-19 08:38:07 UTC (rev 5145)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java	2008-10-19 08:48:48 UTC (rev 5146)
@@ -195,84 +195,14 @@
          executor.execute(deliverRunner);
       }
    }
-
-   /*
-    * Attempt to deliver all the messages in the queue
-    */
-   public void deliver()
+   
+   // Only used in testing - do not call directly!
+   public synchronized void deliverNow()
    {
-      // We don't do actual delivery if the queue is on a backup node - this is
-      // because it's async and could get out of step
-      // with the live node. Instead, when we replicate the delivery we remove
-      // the ref from the queue
-
-      if (backup)
-      {
-         return ;
-      }
-
-      MessageReference reference;
-
-      Iterator<MessageReference> iterator = null;
-
-      while (true)
-      {
-         if (iterator == null)
-         {
-            reference = messageReferences.peekFirst();
-         }
-         else
-         {
-            if (iterator.hasNext())
-            {
-               reference = iterator.next();
-            }
-            else
-            {
-               reference = null;
-            }
-         }
-
-         if (reference == null)
-         {
-            if (iterator == null)
-            {
-               // We delivered all the messages - go into direct delivery
-               direct = true;
-
-               promptDelivery = false;
-            }
-            return;
-         }
-
-         HandleStatus status = deliver(reference);
-
-         if (status == HandleStatus.HANDLED)
-         {
-            if (iterator == null)
-            {
-               messageReferences.removeFirst();
-            }
-            else
-            {
-               iterator.remove();
-            }            
-         }
-         else if (status == HandleStatus.BUSY)
-         {
-            // All consumers busy - give up
-            break;
-         }
-         else if (status == HandleStatus.NO_MATCH && iterator == null)
-         {
-            // Consumers not all busy - but filter not accepting - iterate
-            // back
-            // through the queue
-            iterator = messageReferences.iterator();
-         }
-      }     
+      deliver();
    }
 
+   
    public synchronized void addConsumer(final Consumer consumer)
    {
       distributionPolicy.addConsumer(consumer);
@@ -668,6 +598,83 @@
    // Private
    // ------------------------------------------------------------------------------
 
+   /*
+    * Attempt to deliver all the messages in the queue
+    */
+   private void deliver()
+   {
+      // We don't do actual delivery if the queue is on a backup node - this is
+      // because it's async and could get out of step
+      // with the live node. Instead, when we replicate the delivery we remove
+      // the ref from the queue
+
+      if (backup)
+      {
+         return;
+      }
+
+      MessageReference reference;
+
+      Iterator<MessageReference> iterator = null;
+
+      while (true)
+      {
+         if (iterator == null)
+         {
+            reference = messageReferences.peekFirst();
+         }
+         else
+         {
+            if (iterator.hasNext())
+            {
+               reference = iterator.next();
+            }
+            else
+            {
+               reference = null;
+            }
+         }
+
+         if (reference == null)
+         {
+            if (iterator == null)
+            {
+               // We delivered all the messages - go into direct delivery
+               direct = true;
+
+               promptDelivery = false;
+            }
+            return;
+         }
+
+         HandleStatus status = deliver(reference);
+
+         if (status == HandleStatus.HANDLED)
+         {
+            if (iterator == null)
+            {
+               messageReferences.removeFirst();
+            }
+            else
+            {
+               iterator.remove();
+            }            
+         }
+         else if (status == HandleStatus.BUSY)
+         {
+            // All consumers busy - give up
+            break;
+         }
+         else if (status == HandleStatus.NO_MATCH && iterator == null)
+         {
+            // Consumers not all busy - but filter not accepting - iterate
+            // back
+            // through the queue
+            iterator = messageReferences.iterator();
+         }
+      }     
+   }
+   
    private synchronized HandleStatus add(final MessageReference ref, final boolean first)
    {
       if (!first)

Modified: trunk/tests/src/org/jboss/messaging/tests/concurrent/server/impl/QueueTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/concurrent/server/impl/QueueTest.java	2008-10-19 08:38:07 UTC (rev 5145)
+++ trunk/tests/src/org/jboss/messaging/tests/concurrent/server/impl/QueueTest.java	2008-10-19 08:48:48 UTC (rev 5146)
@@ -79,7 +79,7 @@
       
       consumer.setStatusImmediate(HandleStatus.HANDLED);
       
-      queue.deliver();
+      queue.deliverNow();
 
       if (sender.getException() != null)
       {
@@ -196,7 +196,7 @@
             {
                consumer.setStatusImmediate(HandleStatus.HANDLED);
                
-               queue.deliver();
+               queue.deliverNow();
             }
             toggle = !toggle;
             

Modified: trunk/tests/src/org/jboss/messaging/tests/timing/core/server/impl/QueueImplTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/timing/core/server/impl/QueueImplTest.java	2008-10-19 08:38:07 UTC (rev 5145)
+++ trunk/tests/src/org/jboss/messaging/tests/timing/core/server/impl/QueueImplTest.java	2008-10-19 08:48:48 UTC (rev 5146)
@@ -141,7 +141,7 @@
 
       queue.addConsumer(consumer);
 
-      queue.deliver();
+      queue.deliverNow();
 
       assertRefListsIdenticalRefs(refs, consumer.getReferences());
    }
@@ -201,7 +201,7 @@
 
          queue.addConsumer(consumer);
 
-         queue.deliver();
+         queue.deliverNow();
       }
 
       List<MessageReference> refs = new ArrayList<MessageReference>();
@@ -328,7 +328,7 @@
 
       queue.addConsumer(consumer);
 
-      queue.deliver();
+      queue.deliverNow();
 
       assertTrue(consumer.getReferences().isEmpty());
    }

Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/server/impl/QueueImplTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/server/impl/QueueImplTest.java	2008-10-19 08:38:07 UTC (rev 5145)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/server/impl/QueueImplTest.java	2008-10-19 08:48:48 UTC (rev 5146)
@@ -273,7 +273,7 @@
       assertEquals(10, queue.getMessageCount());
       assertEquals(0, queue.getScheduledCount());
 
-      queue.deliver();
+      queue.deliverNow();
 
       assertRefListsIdenticalRefs(refs, consumer.getReferences());
       assertEquals(numMessages, queue.getMessageCount());
@@ -308,7 +308,7 @@
       assertEquals(0, queue.getScheduledCount());
       assertEquals(0, queue.getDeliveringCount());
 
-      queue.deliver();
+      queue.deliverNow();
 
       assertEquals(10, queue.getMessageCount());
       assertEquals(0, queue.getScheduledCount());
@@ -317,7 +317,7 @@
 
       consumer.setStatusImmediate(HandleStatus.HANDLED);
 
-      queue.deliver();
+      queue.deliverNow();
 
       assertRefListsIdenticalRefs(refs, consumer.getReferences());
       assertEquals(10, queue.getMessageCount());
@@ -352,7 +352,7 @@
       assertEquals(0, queue.getScheduledCount());
       assertEquals(0, queue.getDeliveringCount());
 
-      queue.deliver();
+      queue.deliverNow();
 
       assertEquals(10, queue.getMessageCount());
       assertEquals(0, queue.getScheduledCount());
@@ -384,7 +384,7 @@
          queue.addLast(ref);
       }
 
-      queue.deliver();
+      queue.deliverNow();
 
       assertRefListsIdenticalRefs(refs, consumer.getReferences());
       assertEquals(30, queue.getMessageCount());
@@ -435,7 +435,7 @@
 
       queue.addConsumer(consumer);
 
-      queue.deliver();
+      queue.deliverNow();
 
       List<MessageReference> allRefs = new ArrayList<MessageReference>();
 
@@ -472,7 +472,7 @@
 
       queue.addConsumer(cons1);
 
-      queue.deliver();
+      queue.deliverNow();
 
       assertEquals(numMessages, queue.getMessageCount());
       assertEquals(0, queue.getScheduledCount());
@@ -658,7 +658,7 @@
 
       queue.addConsumer(cons2);
 
-      queue.deliver();
+      queue.deliverNow();
 
       assertEquals(numMessages / 2, cons1.getReferences().size());
 
@@ -692,7 +692,7 @@
 
       queue.addConsumer(cons2);
 
-      queue.deliver();
+      queue.deliverNow();
 
       for (int i = 0; i < numMessages; i++)
       {
@@ -796,7 +796,7 @@
 
       queue.addConsumer(consumer);
 
-      queue.deliver();
+      queue.deliverNow();
 
       assertTrue(consumer.getReferences().isEmpty());
    }
@@ -824,7 +824,7 @@
 
       queue.addConsumer(consumer);
 
-      queue.deliver();
+      queue.deliverNow();
 
       List<MessageReference> receivedRefs = consumer.getReferences();
 
@@ -978,7 +978,7 @@
 
       queue.addConsumer(consumer);
 
-      queue.deliver();
+      queue.deliverNow();
 
 
       refs.clear();
@@ -1069,7 +1069,7 @@
       {
          queue.addConsumer(consumer);
 
-         queue.deliver();
+         queue.deliverNow();
       }
 
       assertEquals(6, queue.getMessageCount());
@@ -1089,7 +1089,7 @@
 
       queue.addConsumer(consumer);
 
-      queue.deliver();
+      queue.deliverNow();
 
       assertEquals(4, queue.getMessageCount());
 
@@ -1113,7 +1113,7 @@
       EasyMock.expect(consumer.handle(messageReference2)).andReturn(HandleStatus.HANDLED);
       EasyMock.replay(consumer);
       queue.addConsumer(consumer);
-      queue.deliver();
+      queue.deliverNow();
       EasyMock.verify(consumer);
    }
 
@@ -1167,7 +1167,7 @@
       queue.addListFirst(messageReferences);
       queue.removeReferenceWithID(2);
       queue.addConsumer(consumer);
-      queue.deliver();
+      queue.deliverNow();
       EasyMock.verify(consumer);
 
    }




More information about the jboss-cvs-commits mailing list