[hornetq-commits] JBoss hornetq SVN: r10070 - in trunk: src/main/org/hornetq/core/paging/cursor/impl and 2 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Wed Dec 22 18:11:41 EST 2010


Author: clebert.suconic at jboss.com
Date: 2010-12-22 18:11:40 -0500 (Wed, 22 Dec 2010)
New Revision: 10070

Modified:
   trunk/src/main/org/hornetq/core/paging/cursor/PageSubscription.java
   trunk/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java
   trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java
   trunk/tests/src/org/hornetq/tests/integration/client/PagingTest.java
Log:
Paging changes

Modified: trunk/src/main/org/hornetq/core/paging/cursor/PageSubscription.java
===================================================================
--- trunk/src/main/org/hornetq/core/paging/cursor/PageSubscription.java	2010-12-22 20:03:31 UTC (rev 10069)
+++ trunk/src/main/org/hornetq/core/paging/cursor/PageSubscription.java	2010-12-22 23:11:40 UTC (rev 10070)
@@ -36,6 +36,8 @@
    void bookmark(PagePosition position) throws Exception;
    
    PageSubscriptionCounter getCounter();
+   
+   long getMessageCount();
 
    long getId();
 

Modified: trunk/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java	2010-12-22 20:03:31 UTC (rev 10069)
+++ trunk/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java	2010-12-22 23:11:40 UTC (rev 10070)
@@ -28,6 +28,7 @@
 import java.util.concurrent.Executor;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
 
 import org.hornetq.core.filter.Filter;
 import org.hornetq.core.journal.IOAsyncTask;
@@ -101,6 +102,8 @@
    private final SortedMap<Long, PageCursorInfo> consumedPages = Collections.synchronizedSortedMap(new TreeMap<Long, PageCursorInfo>());
    
    private final PageSubscriptionCounter counter;
+   
+   private final AtomicLong deliveredCount = new AtomicLong(0);
 
    // We only store the position for redeliveries. They will be read from the SoftCache again during delivery.
    private final ConcurrentLinkedQueue<PagePosition> redeliveries = new ConcurrentLinkedQueue<PagePosition>();
@@ -171,6 +174,13 @@
       confirmPosition(position);
    }
 
+   
+   
+   public long getMessageCount()
+   {
+      return counter.getValue() - deliveredCount.get();
+   }
+
    public PageSubscriptionCounter getCounter()
    {
       return counter;
@@ -959,6 +969,7 @@
             for (PagePosition confirmed : positions)
             {
                cursor.processACK(confirmed);
+               cursor.deliveredCount.decrementAndGet();
             }
 
          }
@@ -1195,6 +1206,7 @@
       {
          if (!isredelivery)
          {
+            deliveredCount.incrementAndGet();
             PageSubscriptionImpl.this.getPageInfo(position).remove(position);
          }
       }

Modified: trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java	2010-12-22 20:03:31 UTC (rev 10069)
+++ trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java	2010-12-22 23:11:40 UTC (rev 10070)
@@ -661,7 +661,7 @@
       {
          if (pageSubscription != null)
          {
-            return messageReferences.size() + getScheduledCount() + getDeliveringCount() + pageSubscription.getCounter().getValue();
+            return messageReferences.size() + getScheduledCount() + getDeliveringCount() + pageSubscription.getMessageCount();
          }
          else
          {
@@ -1639,6 +1639,11 @@
       }
 
       queue.deliveringCount.decrementAndGet();
+      
+      if (queue.deliveringCount.get() < 0)
+      {
+         new Exception("DeliveringCount became negative").printStackTrace();
+      }
 
       try
       {

Modified: trunk/tests/src/org/hornetq/tests/integration/client/PagingTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/client/PagingTest.java	2010-12-22 20:03:31 UTC (rev 10069)
+++ trunk/tests/src/org/hornetq/tests/integration/client/PagingTest.java	2010-12-22 23:11:40 UTC (rev 10070)
@@ -236,6 +236,8 @@
          assertNull(consumer.receiveImmediate());
 
          sessionCheck.close();
+         
+         assertEquals(numberOfMessages, queue.getMessageCount());
 
          sf.close();
          locator.close();
@@ -277,6 +279,7 @@
          
          locator.close();
          
+         queue.getMessageCount();
          //assertEquals(numberOfMessages, queue.getMessageCount());
       }
       finally



More information about the hornetq-commits mailing list