[hornetq-commits] JBoss hornetq SVN: r10168 - in branches/Branch_2_2_EAP: src/main/org/hornetq/core/paging/cursor/impl and 3 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Tue Feb 1 21:21:14 EST 2011


Author: clebert.suconic at jboss.com
Date: 2011-02-01 21:21:13 -0500 (Tue, 01 Feb 2011)
New Revision: 10168

Modified:
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/PageSubscription.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/impl/PageTransactionInfoImpl.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/QueueImpl.java
   branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/PagingTest.java
Log:
Fixing Tests and Page Counters

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/PageSubscription.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/PageSubscription.java	2011-02-01 17:02:29 UTC (rev 10167)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/PageSubscription.java	2011-02-02 02:21:13 UTC (rev 10168)
@@ -96,6 +96,8 @@
 
    void processReload() throws Exception;
 
+   void addPendingDelivery(final PagePosition position);
+   
    /**      
     * To be used on redeliveries
     * @param position

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java	2011-02-01 17:02:29 UTC (rev 10167)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java	2011-02-02 02:21:13 UTC (rev 10168)
@@ -121,7 +121,7 @@
       if (pos.getMessageNr() >= cache.getNumberOfMessages())
       {
          // sanity check, this should never happen unless there's a bug
-         throw new IllegalStateException("Invalid messageNumber passed = " + pos);
+         throw new IllegalStateException("Invalid messageNumber passed = " + pos + " on " + cache);
       }
 
       return cache.getMessage(pos.getMessageNr());
@@ -255,7 +255,7 @@
             cursorList.addAll(activeCursors.values());
 
             long minPage = checkMinPage(cursorList);
-
+            
             if (minPage == pagingStore.getCurrentWritingPage() && pagingStore.getCurrentPage().getNumberOfMessages() > 0)
             {
                boolean complete = true;

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java	2011-02-01 17:02:29 UTC (rev 10167)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java	2011-02-02 02:21:13 UTC (rev 10168)
@@ -73,8 +73,7 @@
 
    private static void trace(final String message)
    {
-      // PageCursorImpl.log.info(message);
-      System.out.println(message);
+      PageSubscriptionImpl.log.info(message);
    }
 
    private volatile boolean autoCleanup = true;
@@ -102,6 +101,8 @@
    private final SortedMap<Long, PageCursorInfo> consumedPages = Collections.synchronizedSortedMap(new TreeMap<Long, PageCursorInfo>());
    
    private final PageSubscriptionCounter counter;
+   
+   private final AtomicLong deliveredCount = new AtomicLong(0);
 
    // We only store the position for redeliveries. They will be read from the SoftCache again during delivery.
    private final ConcurrentLinkedQueue<PagePosition> redeliveries = new ConcurrentLinkedQueue<PagePosition>();
@@ -176,7 +177,7 @@
    
    public long getMessageCount()
    {
-      return counter.getValue();
+      return counter.getValue() - deliveredCount.get();
    }
 
    public PageSubscriptionCounter getCounter()
@@ -272,7 +273,7 @@
                   synchronized (PageSubscriptionImpl.this)
                   {
                      for (PageCursorInfo completePage : completedPages)
-                     {
+                     {  
                         if (isTrace)
                         {
                            PageSubscriptionImpl.trace("Removing page " + completePage.getPageId());
@@ -474,6 +475,11 @@
          return consumedPages.firstKey();
       }
    }
+   
+   public void addPendingDelivery(final PagePosition position)
+   {
+      getPageInfo(position).incrementPendingTX();
+   }
 
    /* (non-Javadoc)
     * @see org.hornetq.core.paging.cursor.PageCursor#returnElement(org.hornetq.core.paging.cursor.PagePosition)
@@ -483,6 +489,15 @@
       synchronized (redeliveries)
       {
          redeliveries.add(position);
+         PageCursorInfo pageInfo = consumedPages.get(position.getPageNr());
+         if (pageInfo != null)
+         {
+            pageInfo.decrementPendingTX();
+         }
+         else
+         {
+            // this shouldn't really happen.
+         }
       }
    }
 
@@ -823,6 +838,9 @@
 
       // The page was live at the time of the creation
       private final boolean wasLive;
+      
+      // There's a pending TX to add elements on this page
+      private AtomicInteger pendingTX = new AtomicInteger(0);
 
       // There's a pending delete on the async IO pipe
       // We're holding this object to avoid delete the pages before the IO is complete,
@@ -856,7 +874,7 @@
 
       public boolean isDone()
       {
-         return getNumberOfMessages() == confirmed.get();
+         return getNumberOfMessages() == confirmed.get() && pendingTX.get() == 0;
       }
 
       public boolean isPendingDelete()
@@ -876,6 +894,16 @@
       {
          return pageId;
       }
+      
+      public void incrementPendingTX()
+      {
+         pendingTX.incrementAndGet();
+      }
+      
+      public void decrementPendingTX()
+      {
+         pendingTX.decrementAndGet();
+      }
 
       public boolean isRemoved(final PagePosition pos)
       {
@@ -967,6 +995,7 @@
             for (PagePosition confirmed : positions)
             {
                cursor.processACK(confirmed);
+               cursor.deliveredCount.decrementAndGet();
             }
 
          }
@@ -1201,10 +1230,8 @@
        */
       public void remove()
       {
-         if (!isredelivery)
-         {
-            PageSubscriptionImpl.this.getPageInfo(position).remove(position);
-         }
+         deliveredCount.incrementAndGet();
+         PageSubscriptionImpl.this.getPageInfo(position).remove(position);
       }
 
       /* (non-Javadoc)

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/impl/PageTransactionInfoImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/impl/PageTransactionInfoImpl.java	2011-02-01 17:02:29 UTC (rev 10167)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/impl/PageTransactionInfoImpl.java	2011-02-02 02:21:13 UTC (rev 10168)
@@ -250,6 +250,7 @@
    {
       if (committed && useRedelivery)
       {
+         cursor.addPendingDelivery(cursorPos);
          cursor.redeliver(cursorPos);
          return true;
       }
@@ -271,6 +272,7 @@
          {
             lateDeliveries = new LinkedList<Pair<PageSubscription, PagePosition>>();
          }
+         cursor.addPendingDelivery(cursorPos);
          lateDeliveries.add(new Pair<PageSubscription, PagePosition>(cursor, cursorPos));
          return true;
       }

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java	2011-02-01 17:02:29 UTC (rev 10167)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java	2011-02-02 02:21:13 UTC (rev 10168)
@@ -849,8 +849,6 @@
             return false;
          }
 
-         PagedMessage pagedMessage;
-
          if (!message.isDurable())
          {
             // The address should never be transient when paging (even for non-persistent messages when paging)
@@ -858,8 +856,9 @@
             message.bodyChanged();
          }
 
-         pagedMessage = new PagedMessageImpl(message, routeQueues(tx, listCtx), installPageTransaction(tx, listCtx));
 
+         PagedMessage pagedMessage = new PagedMessageImpl(message, routeQueues(tx, listCtx), tx == null ? -1 : tx.getID());
+
          int bytesToWrite = pagedMessage.getEncodeSize() + PageImpl.SIZE_RECORD;
 
          if (currentPageSize.addAndGet(bytesToWrite) > pageSize && currentPage.getNumberOfMessages() > 0)
@@ -868,7 +867,9 @@
             openNewPage();
             currentPageSize.addAndGet(bytesToWrite);
          }
-
+         
+         installPageTransaction(tx, listCtx, currentPage.getPageId());
+ 
          currentPage.write(pagedMessage);
 
          if (tx != null)
@@ -920,11 +921,11 @@
       return ids;
    }
 
-   private long installPageTransaction(final Transaction tx, final RouteContextList listCtx) throws Exception
+   private PageTransactionInfo installPageTransaction(final Transaction tx, final RouteContextList listCtx, int pageID) throws Exception
    {
       if (tx == null)
       {
-         return -1;
+         return null;
       }
       else
       {
@@ -939,7 +940,7 @@
 
          pgTX.increment(listCtx.getNumberOfQueues());
 
-         return tx.getID();
+         return pgTX;
       }
    }
 

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/QueueImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/QueueImpl.java	2011-02-01 17:02:29 UTC (rev 10167)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/QueueImpl.java	2011-02-02 02:21:13 UTC (rev 10168)
@@ -668,7 +668,7 @@
          if (pageSubscription != null)
          {
             // messageReferences will have depaged messages which we need to discount from the counter as they are counted on the pageSubscription as well
-            return messageReferences.size() - pagedReferences.get() + getScheduledCount()  + deliveringCount.get() +  pageSubscription.getMessageCount();
+            return messageReferences.size() + getScheduledCount()  + deliveringCount.get() +  pageSubscription.getMessageCount();
          }
          else
          {

Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/PagingTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/PagingTest.java	2011-02-01 17:02:29 UTC (rev 10167)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/PagingTest.java	2011-02-02 02:21:13 UTC (rev 10168)
@@ -80,7 +80,7 @@
    // Constants -----------------------------------------------------
    private static final Logger log = Logger.getLogger(PagingTest.class);
 
-   private static final int RECEIVE_TIMEOUT = 30000;
+   private static final int RECEIVE_TIMEOUT = 5000;
 
    private static final int PAGE_MAX = 100 * 1024;
 
@@ -237,6 +237,8 @@
 
          sessionCheck.close();
          
+         System.out.println(queue.getMessagesAdded());
+         
          assertEquals(numberOfMessages, queue.getMessageCount());
 
          sf.close();
@@ -359,7 +361,7 @@
 
       final int messageSize = 1024;
 
-      final int numberOfMessages = 30000;
+      final int numberOfMessages = 3000;
 
       final byte[] body = new byte[messageSize];
 
@@ -563,7 +565,7 @@
 
       final int numberOfIntegers = 256;
 
-      final int numberOfMessages = 10000;
+      final int numberOfMessages = 1000;
 
       try
       {
@@ -1032,6 +1034,7 @@
             ClientMessage message = sessionNonTX.createMessage(true);
             message.getBodyBuffer().writeBytes(body);
             message.putIntProperty(new SimpleString("id"), i);
+            message.putStringProperty(new SimpleString("tst"),  new SimpleString("i=" + i));
 
             producerTransacted.send(message);
 
@@ -1041,6 +1044,7 @@
                for (int j = 0; j < 20; j++)
                {
                   ClientMessage msgSend = sessionNonTX.createMessage(true);
+                  msgSend.putStringProperty(new SimpleString("tst"),  new SimpleString("i=" + i + ", j=" + j));
                   msgSend.getBodyBuffer().writeBytes(new byte[10 * 1024]);
                   producerNonTransacted.send(msgSend);
                }
@@ -1403,7 +1407,7 @@
 
       server.start();
 
-      final int numberOfMessages = 10000;
+      final int numberOfMessages = 1000;
 
       final int numberOfBytes = 1024;
 



More information about the hornetq-commits mailing list