[hornetq-commits] JBoss hornetq SVN: r10144 - in branches/Branch_2_2_EAP: tests/src/org/hornetq/tests/integration/client and 1 other directory.

do-not-reply at jboss.org do-not-reply at jboss.org
Tue Jan 25 13:00:45 EST 2011


Author: clebert.suconic at jboss.com
Date: 2011-01-25 13:00:45 -0500 (Tue, 25 Jan 2011)
New Revision: 10144

Modified:
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/QueueImpl.java
   branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/PagingOrderTest.java
Log:
HORNETQ-628 - fix on paging counters

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/QueueImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/QueueImpl.java	2011-01-25 09:07:51 UTC (rev 10143)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/QueueImpl.java	2011-01-25 18:00:45 UTC (rev 10144)
@@ -100,6 +100,9 @@
    private final ConcurrentLinkedQueue<MessageReference> concurrentQueue = new ConcurrentLinkedQueue<MessageReference>();
 
    private final PriorityLinkedList<MessageReference> messageReferences = new PriorityLinkedListImpl<MessageReference>(QueueImpl.NUM_PRIORITIES);
+   
+   // The number of paged references currently held in the messageReferences priority list
+   private final AtomicInteger pagedReferences = new AtomicInteger(0);
 
    private final List<ConsumerHolder> consumerList = new ArrayList<ConsumerHolder>();
 
@@ -316,16 +319,17 @@
          return;
       }
 
-      messageReferences.addHead(ref, ref.getMessage().getPriority());
+      internalAddHead(ref);
 
       directDeliver = false;
    }
 
+
    public synchronized void reload(final MessageReference ref)
    {
       if (!scheduledDeliveryHandler.checkAndSchedule(ref))
       {
-         messageReferences.addTail(ref, ref.getMessage().getPriority());
+         internalAddTail(ref);
       }
 
       directDeliver = false;
@@ -621,6 +625,7 @@
          if (ref.getMessage().getMessageID() == id)
          {
             iterator.remove();
+            refRemoved(ref);
 
             removed = ref;
 
@@ -662,7 +667,8 @@
       {
          if (pageSubscription != null)
          {
-            return messageReferences.size() + getScheduledCount()  + deliveringCount.get() +  pageSubscription.getMessageCount();
+            // messageReferences will have depaged messages which we need to discount from the counter as they are counted on the pageSubscription as well
+            return messageReferences.size() - pagedReferences.get() + getScheduledCount()  + deliveringCount.get() +  pageSubscription.getMessageCount();
          }
          else
          {
@@ -778,7 +784,7 @@
       {
          if (!scheduledDeliveryHandler.checkAndSchedule(reference))
          {
-            messageReferences.addHead(reference, reference.getMessage().getPriority());
+            internalAddHead(reference);
          }
 
          resetAllIterators();
@@ -839,6 +845,7 @@
             deliveringCount.incrementAndGet();
             acknowledge(tx, ref);
             iter.remove();
+            refRemoved(ref);
             count++;
          }
       }
@@ -872,6 +879,7 @@
             deliveringCount.incrementAndGet();
             acknowledge(tx, ref);
             iter.remove();
+            refRemoved(ref);
             deleted = true;
             break;
          }
@@ -894,6 +902,7 @@
             deliveringCount.incrementAndGet();
             expire(ref);
             iter.remove();
+            refRemoved(ref);
             return true;
          }
       }
@@ -915,6 +924,7 @@
             deliveringCount.incrementAndGet();
             expire(tx, ref);
             iter.remove();
+            refRemoved(ref);
             count++;
          }
       }
@@ -936,6 +946,7 @@
             deliveringCount.incrementAndGet();
             expire(ref);
             iter.remove();
+            refRemoved(ref);
          }
       }
    }
@@ -952,6 +963,7 @@
             deliveringCount.incrementAndGet();
             sendToDeadLetterAddress(ref);
             iter.remove();
+            refRemoved(ref);
             return true;
          }
       }
@@ -971,6 +983,7 @@
             deliveringCount.incrementAndGet();
             sendToDeadLetterAddress(ref);
             iter.remove();
+            refRemoved(ref);
             count++;
          }
       }
@@ -992,6 +1005,7 @@
          if (ref.getMessage().getMessageID() == messageID)
          {
             iter.remove();
+            refRemoved(ref);
             deliveringCount.incrementAndGet();
             try
             {
@@ -1097,6 +1111,7 @@
          if (ref.getMessage().getMessageID() == messageID)
          {
             iter.remove();
+            refRemoved(ref);
             ref.getMessage().setPriority(newPriority);
             addTail(ref, false);
             return true;
@@ -1118,6 +1133,7 @@
          {
             count++;
             iter.remove();
+            refRemoved(ref);
             ref.getMessage().setPriority(newPriority);
             addTail(ref, false);
          }
@@ -1186,13 +1202,38 @@
    // Private
    // ------------------------------------------------------------------------------
 
+   /**
+    * @param ref
+    */
+   private void internalAddTail(final MessageReference ref)
+   {
+      if (ref.isPaged())
+      {
+         pagedReferences.incrementAndGet();
+      }
+      messageReferences.addTail(ref, ref.getMessage().getPriority());
+   }
+
+   /**
+    * @param ref
+    */
+   private void internalAddHead(final MessageReference ref)
+   {
+      if (ref.isPaged())
+      {
+         pagedReferences.incrementAndGet();
+      }
+      messageReferences.addHead(ref, ref.getMessage().getPriority());
+   }
+
+
    private synchronized void doPoll()
    {
       MessageReference ref = concurrentQueue.poll();
 
       if (ref != null)
       {
-         messageReferences.addTail(ref, ref.getMessage().getPriority());
+         internalAddTail(ref);
 
          messagesAdded++;
 
@@ -1264,6 +1305,8 @@
             if (checkExpired(ref))
             {
                holder.iter.remove();
+               
+               refRemoved(ref);
 
                continue;
             }
@@ -1289,6 +1332,8 @@
             if (status == HandleStatus.HANDLED)
             {
                holder.iter.remove();
+               
+               refRemoved(ref);
 
                if (groupID != null && groupConsumer == null)
                {
@@ -1333,6 +1378,18 @@
          scheduleDepage();
       }
    }
+
+
+   /**
+    * @param ref
+    */
+   private void refRemoved(MessageReference ref)
+   {
+      if (ref.isPaged())
+      {
+         pagedReferences.decrementAndGet();
+      }
+   }
    
    private void scheduleDepage()
    {

Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/PagingOrderTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/PagingOrderTest.java	2011-01-25 09:07:51 UTC (rev 10143)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/PagingOrderTest.java	2011-01-25 18:00:45 UTC (rev 10144)
@@ -473,7 +473,7 @@
                   ClientSession sess = sf.createSession(true, true, 0);
                   sess.start();
                   ClientConsumer cons = sess.createConsumer(ADDRESS);
-                  for (int i = 0; i < 10; i++)
+                  for (int i = 0; i < 100; i++)
                   {
                      ClientMessage msg = cons.receive(5000);
                      assertNotNull(msg);
@@ -510,14 +510,22 @@
          }
 
          session.commit();
+         
+         q1.getMessageCount();
 
          t1.start();
          t1.join();
 
          assertEquals(0, errors.get());
+         long timeout = System.currentTimeMillis() + 10000;
+         while (numberOfMessages -100 != q1.getMessageCount() && System.currentTimeMillis() < timeout)
+         {
+            Thread.sleep(500);
+            
+         }
 
          assertEquals(numberOfMessages, q2.getMessageCount());
-         assertEquals(numberOfMessages - 10, q1.getMessageCount());
+         assertEquals(numberOfMessages - 100, q1.getMessageCount());
 
       }
       catch (Throwable e)



More information about the hornetq-commits mailing list