[hornetq-commits] JBoss hornetq SVN: r9885 - in branches/Branch_New_Paging: src/main/org/hornetq/core/paging/impl and 1 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Sat Nov 13 00:38:13 EST 2010


Author: clebert.suconic at jboss.com
Date: 2010-11-13 00:38:12 -0500 (Sat, 13 Nov 2010)
New Revision: 9885

Modified:
   branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java
   branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java
   branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java
   branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/client/PagingTest.java
Log:
Fixing sort order

Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java	2010-11-13 02:43:47 UTC (rev 9884)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java	2010-11-13 05:38:12 UTC (rev 9885)
@@ -23,7 +23,6 @@
 import org.hornetq.core.logging.Logger;
 import org.hornetq.core.paging.Page;
 import org.hornetq.core.paging.PagedMessage;
-import org.hornetq.core.paging.PagingManager;
 import org.hornetq.core.paging.PagingStore;
 import org.hornetq.core.paging.cursor.PageCache;
 import org.hornetq.core.paging.cursor.PageCursorProvider;

Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java	2010-11-13 02:43:47 UTC (rev 9884)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java	2010-11-13 05:38:12 UTC (rev 9885)
@@ -24,6 +24,7 @@
 import java.util.Set;
 import java.util.SortedMap;
 import java.util.TreeMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.Executor;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -48,7 +49,6 @@
 import org.hornetq.core.transaction.impl.TransactionImpl;
 import org.hornetq.utils.ConcurrentHashSet;
 import org.hornetq.utils.Future;
-import org.hornetq.utils.LinkedListImpl;
 import org.hornetq.utils.LinkedListIterator;
 
 /**
@@ -99,7 +99,7 @@
    private final SortedMap<Long, PageCursorInfo> consumedPages = Collections.synchronizedSortedMap(new TreeMap<Long, PageCursorInfo>());
 
    // We only store the position for redeliveries. They will be read from the SoftCache again during delivery.
-   private final org.hornetq.utils.LinkedList<PagePosition> redeliveries = new LinkedListImpl<PagePosition>();
+   private final ConcurrentLinkedQueue<PagePosition> redeliveries = new ConcurrentLinkedQueue<PagePosition>();
 
    // Static --------------------------------------------------------
 
@@ -464,7 +464,7 @@
    {
       synchronized (redeliveries)
       {
-         redeliveries.addTail(position);
+         redeliveries.add(position);
       }
    }
 
@@ -961,20 +961,16 @@
 
       private PagePosition lastOperation = null;
 
-      private final LinkedListIterator<PagePosition> redeliveryIterator;
-
       private volatile boolean isredelivery = false;
 
+      private volatile PagedReference lastRedelivery = null;
+
       /** next element taken on hasNext test.
        *  it has to be delivered on next next operation */
       private volatile PagedReference cachedNext;
 
       public CursorIterator()
       {
-         synchronized (redeliveries)
-         {
-            redeliveryIterator = redeliveries.iterator();
-         }
       }
 
       public void repeat()
@@ -983,7 +979,7 @@
          {
             synchronized (redeliveries)
             {
-               redeliveryIterator.repeat();
+               cachedNext = lastRedelivery;
             }
          }
          else
@@ -1045,16 +1041,20 @@
             {
                synchronized (redeliveries)
                {
-                  if (redeliveryIterator.hasNext())
+                  PagePosition redelivery = redeliveries.poll();
+
+                  if (redelivery != null)
                   {
                      // There's a redelivery pending, we will get it out of that pool instead
                      isredelivery = true;
-                     PagedReference redeliveredMsg = getReference(redeliveryIterator.next());
+                     PagedReference redeliveredMsg = getReference(redelivery);
+                     lastRedelivery = redeliveredMsg;
 
                      return redeliveredMsg;
                   }
                   else
                   {
+                     lastRedelivery = null;
                      isredelivery = false;
                   }
 
@@ -1077,7 +1077,9 @@
 
                valid = routed(message.getPagedMessage());
                if (!valid)
+               {
                   ignored = true;
+               }
 
                // 2nd ... if TX, is it committed?
                if (valid && message.getPagedMessage().getTransactionID() != 0)
@@ -1171,7 +1173,10 @@
        */
       public void remove()
       {
-         PageSubscriptionImpl.this.getPageInfo(position).remove(position);
+         if (!isredelivery)
+         {
+            PageSubscriptionImpl.this.getPageInfo(position).remove(position);
+         }
       }
 
       /* (non-Javadoc)

Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java	2010-11-13 02:43:47 UTC (rev 9884)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java	2010-11-13 05:38:12 UTC (rev 9885)
@@ -14,9 +14,7 @@
 package org.hornetq.core.paging.impl;
 
 import java.text.DecimalFormat;
-import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
 import java.util.Queue;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.Executor;
@@ -43,7 +41,6 @@
 import org.hornetq.core.persistence.StorageManager;
 import org.hornetq.core.postoffice.DuplicateIDCache;
 import org.hornetq.core.postoffice.PostOffice;
-import org.hornetq.core.server.LargeServerMessage;
 import org.hornetq.core.server.RouteContextList;
 import org.hornetq.core.server.RoutingContext;
 import org.hornetq.core.server.ServerMessage;
@@ -53,7 +50,6 @@
 import org.hornetq.core.transaction.Transaction.State;
 import org.hornetq.core.transaction.TransactionOperation;
 import org.hornetq.core.transaction.TransactionPropertyIndexes;
-import org.hornetq.core.transaction.impl.TransactionImpl;
 import org.hornetq.utils.ExecutorFactory;
 import org.hornetq.utils.Future;
 
@@ -1049,19 +1045,14 @@
       {
          numberOfPages++;
 
-         currentPageId++;
+         int tmpCurrentPageId = currentPageId + 1;
 
-         if (currentPageId < firstPageId)
-         {
-            firstPageId = currentPageId;
-         }
-
          if (currentPage != null)
          {
             currentPage.close();
          }
 
-         currentPage = createPage(currentPageId);
+         currentPage = createPage(tmpCurrentPageId);
 
          LivePageCache pageCache = new LivePageCacheImpl(currentPage);
 
@@ -1072,6 +1063,13 @@
          currentPageSize.set(0);
 
          currentPage.open();
+
+         currentPageId = tmpCurrentPageId;
+
+         if (currentPageId < firstPageId)
+         {
+            firstPageId = currentPageId;
+         }
       }
       finally
       {

Modified: branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/client/PagingTest.java
===================================================================
--- branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/client/PagingTest.java	2010-11-13 02:43:47 UTC (rev 9884)
+++ branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/client/PagingTest.java	2010-11-13 05:38:12 UTC (rev 9885)
@@ -966,7 +966,7 @@
          ClientConsumer consumer = session.createConsumer(PagingTest.ADDRESS);
 
          for (int i = 0; i < numberOfMessages; i++)
-         {
+         {  
             ClientMessage msg = consumer.receive(5000);
             assertNotNull(msg);
             assertEquals(i, msg.getIntProperty("count").intValue());



More information about the hornetq-commits mailing list