[hornetq-commits] JBoss hornetq SVN: r9878 - in branches/Branch_New_Paging: src/main/org/hornetq/core/paging/cursor/impl and 2 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Thu Nov 11 23:51:40 EST 2010


Author: clebert.suconic at jboss.com
Date: 2010-11-11 23:51:39 -0500 (Thu, 11 Nov 2010)
New Revision: 9878

Modified:
   branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursorProvider.java
   branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java
   branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java
   branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PageTransactionInfoImpl.java
   branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java
Log:
moving method moveNext from PageCursor to PageSubscription

Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursorProvider.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursorProvider.java	2010-11-11 22:23:30 UTC (rev 9877)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursorProvider.java	2010-11-12 04:51:39 UTC (rev 9878)
@@ -54,8 +54,6 @@
    
    PageSubscription createSubscription(long queueId, Filter filter, boolean durable);
    
-   PagedReference getNext(PageSubscription cursor, PagePosition pos) throws Exception;
-   
    PagedMessage getMessage(PagePosition pos) throws Exception;
 
    void processReload() throws Exception;

Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java	2010-11-11 22:23:30 UTC (rev 9877)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java	2010-11-12 04:51:39 UTC (rev 9878)
@@ -22,7 +22,6 @@
 import org.hornetq.core.filter.Filter;
 import org.hornetq.core.logging.Logger;
 import org.hornetq.core.paging.Page;
-import org.hornetq.core.paging.PageTransactionInfo;
 import org.hornetq.core.paging.PagedMessage;
 import org.hornetq.core.paging.PagingManager;
 import org.hornetq.core.paging.PagingStore;
@@ -33,7 +32,6 @@
 import org.hornetq.core.paging.cursor.PagedReference;
 import org.hornetq.core.paging.cursor.PagedReferenceImpl;
 import org.hornetq.core.persistence.StorageManager;
-import org.hornetq.core.server.ServerMessage;
 import org.hornetq.utils.ExecutorFactory;
 import org.hornetq.utils.Future;
 import org.hornetq.utils.SoftValueHashMap;
@@ -59,8 +57,6 @@
 
    private final PagingStore pagingStore;
 
-   private final PagingManager pagingManager;
-
    private final StorageManager storageManager;
 
    private final ExecutorFactory executorFactory;
@@ -80,7 +76,6 @@
                                  final ExecutorFactory executorFactory)
    {
       this.pagingStore = pagingStore;
-      this.pagingManager = pagingStore.getPagingManager();
       this.storageManager = storageManager;
       this.executorFactory = executorFactory;
       this.executor = executorFactory.getExecutor();
@@ -120,104 +115,6 @@
       return activeCursors.get(cursorID);
    }
 
-   /* (non-Javadoc)
-    * @see org.hornetq.core.paging.cursor.PageCursorProvider#getAfter(org.hornetq.core.paging.cursor.PagePosition)
-    */
-   public PagedReference getNext(final PageSubscription cursor, PagePosition cursorPos) throws Exception
-   {
-
-      while (true)
-      {
-         PagedReference retPos = internalGetNext(cursorPos, cursor);
-
-         if (retPos == null)
-         {
-            return null;
-         }
-         else if (retPos != null)
-         {
-            cursorPos = retPos.getPosition();
-            
-            if (!routed(retPos.getPagedMessage(), cursor))
-            {
-               cursor.positionIgnored(cursorPos);
-            }
-            else
-            if (retPos.getPagedMessage().getTransactionID() != 0)
-            {
-               PageTransactionInfo tx = pagingManager.getTransaction(retPos.getPagedMessage().getTransactionID());
-               if (tx == null)
-               {
-                  log.warn("Couldn't locate page transaction " + retPos.getPagedMessage().getTransactionID() +
-                           ", ignoring message on position " +
-                           retPos.getPosition());
-                  cursor.positionIgnored(cursorPos);
-               }
-               else
-               {
-                  if (!tx.deliverAfterCommit(cursor, cursorPos))
-                  {
-                     return retPos;
-                  }
-               }
-            }
-            else
-            {
-               return retPos;
-            }
-         }
-      }
-   }
-   
-   private boolean routed(PagedMessage message, PageSubscription subs)
-   {
-      long id = subs.getId();
-      
-      for (long qid : message.getQueueIDs())
-      {
-         if (qid == id)
-         {
-            return true;
-         }
-      }
-      return false;
-   }
-
-   private PagedReference internalGetNext(final PagePosition pos, final PageSubscription sub)
-   {
-      PagePosition retPos = pos.nextMessage();
-
-      PageCache cache = getPageCache(pos);
-
-      if (!cache.isLive() && retPos.getMessageNr() >= cache.getNumberOfMessages())
-      {
-         retPos = pos.nextPage();
-
-         cache = getPageCache(retPos);
-
-         if (cache == null)
-         {
-            return null;
-         }
-
-         if (retPos.getMessageNr() >= cache.getNumberOfMessages())
-         {
-            return null;
-         }
-      }
-
-      PagedMessage serverMessage = cache.getMessage(retPos.getMessageNr());
-
-      if (serverMessage != null)
-      {
-         return newReference(retPos, serverMessage, sub);
-      }
-      else
-      {
-         return null;
-      }
-   }
-
    public PagedMessage getMessage(final PagePosition pos) throws Exception
    {
       PageCache cache = getPageCache(pos);

Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java	2010-11-11 22:23:30 UTC (rev 9877)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java	2010-11-12 04:51:39 UTC (rev 9878)
@@ -79,9 +79,9 @@
    private final StorageManager store;
 
    private final long cursorId;
-   
+
    private Queue queue;
-   
+
    private final boolean persistent;
 
    private final Filter filter;
@@ -106,12 +106,12 @@
    // Constructors --------------------------------------------------
 
    public PageSubscriptionImpl(final PageCursorProvider cursorProvider,
-                         final PagingStore pageStore,
-                         final StorageManager store,
-                         final Executor executor,
-                         final Filter filter,
-                         final long cursorId,
-                         final boolean persistent)
+                               final PagingStore pageStore,
+                               final StorageManager store,
+                               final Executor executor,
+                               final Filter filter,
+                               final long cursorId,
+                               final boolean persistent)
    {
       this.pageStore = pageStore;
       this.store = store;
@@ -128,17 +128,17 @@
    {
       return queue;
    }
-   
+
    public boolean isPaging()
    {
       return pageStore.isPaging();
    }
-   
+
    public void setQueue(Queue queue)
    {
       this.queue = queue;
    }
-   
+
    public void disableAutoCleanup()
    {
       autoCleanup = false;
@@ -209,7 +209,8 @@
             {
                if (entry.getKey() == lastAckedPosition.getPageNr())
                {
-                  PageSubscriptionImpl.trace("We can't clear page " + entry.getKey() + " now since it's the current page");
+                  PageSubscriptionImpl.trace("We can't clear page " + entry.getKey() +
+                                             " now since it's the current page");
                }
                else
                {
@@ -261,8 +262,8 @@
                         if (consumedPages.remove(completePage.getPageId()) == null)
                         {
                            PageSubscriptionImpl.log.warn("Couldn't remove page " + completePage.getPageId() +
-                                                   " from consumed pages on cursor for address " +
-                                                   pageStore.getAddress());
+                                                         " from consumed pages on cursor for address " +
+                                                         pageStore.getAddress());
                         }
                      }
                   }
@@ -303,30 +304,64 @@
 
       do
       {
-         message = cursorProvider.getNext(this, tmpPosition);
+         message = internalGetNext(tmpPosition);
          
-         boolean valid = true;
-         
+
          if (message == null)
          {
-            valid = false;
+            break;
          }
-         else
+         
+         tmpPosition = message.getPosition();
+
+         boolean valid = true;
+         boolean ignored = false;
+
+         // Check the scenarios in which the message is not valid and should not even be considered
+
+         // 1st... is it routed?
+
+         valid = routed(message.getPagedMessage());
+         if (!valid) ignored = true;
+
+         // 2nd ... if TX, is it committed?
+         if (valid && message.getPagedMessage().getTransactionID() != 0)
          {
+            PageTransactionInfo tx = pageStore.getPagingManager().getTransaction(message.getPagedMessage()
+                                                                                       .getTransactionID());
+            if (tx == null)
+            {
+               log.warn("Couldn't locate page transaction " + message.getPagedMessage().getTransactionID() +
+                        ", ignoring message on position " +
+                        message.getPosition());
+               valid = false;
+               ignored = true;
+            }
+            else
+            {
+               if (tx.deliverAfterCommit(this, message.getPosition()))
+               {
+                  valid = false;
+                  ignored = false;
+               }
+            }
+         }
+
+         // 3rd... was it previously removed?
+         if (valid)
+         {
             // We don't create a PageCursorInfo unless we are doing a write operation (ack or removing)
-            // Say you have a Browser that will only read the files... there's no need to control PageCursors is nothing 
+            // Say you have a Browser that will only read the files... there's no need to control PageCursors if nothing
             // is being changed. That's why the false is passed as a parameter here
             PageCursorInfo info = getPageInfo(message.getPosition(), false);
             if (info != null && info.isRemoved(message.getPosition()))
             {
-               tmpPosition = message.getPosition();
                valid = false;
             }
          }
+
          if (valid)
          {
-            tmpPosition = message.getPosition();
-
             match = match(message.getMessage());
 
             if (!match)
@@ -334,12 +369,70 @@
                processACK(message.getPosition());
             }
          }
+         else if (ignored)
+         {
+            positionIgnored(message.getPosition());
+         }
       }
       while (message != null && !match);
 
       return message;
    }
 
+   private PagedReference internalGetNext(final PagePosition pos)
+   {
+      PagePosition retPos = pos.nextMessage();
+
+      PageCache cache = cursorProvider.getPageCache(pos);
+      
+      if (cache == null)
+      {
+         return null;
+      }
+
+      if (!cache.isLive() && retPos.getMessageNr() >= cache.getNumberOfMessages())
+      {
+         retPos = pos.nextPage();
+
+         cache = cursorProvider.getPageCache(retPos);
+
+         if (cache == null)
+         {
+            return null;
+         }
+
+         if (retPos.getMessageNr() >= cache.getNumberOfMessages())
+         {
+            return null;
+         }
+      }
+
+      PagedMessage serverMessage = cache.getMessage(retPos.getMessageNr());
+
+      if (serverMessage != null)
+      {
+         return cursorProvider.newReference(retPos, serverMessage, this);
+      }
+      else
+      {
+         return null;
+      }
+   }
+
+   private boolean routed(PagedMessage message)
+   {
+      long id = getId();
+
+      for (long qid : message.getQueueIDs())
+      {
+         if (qid == id)
+         {
+            return true;
+         }
+      }
+      return false;
+   }
+
    /**
     * 
     */
@@ -360,7 +453,7 @@
                // The list is not ordered...
                // This is only done at creation of the queue, so we just scan instead of keeping the list ordened
                PagePosition retValue = null;
-               
+
                for (PagePosition pos : entry.getValue().acks)
                {
                   System.out.println("Analizing " + pos);
@@ -369,9 +462,9 @@
                      retValue = pos;
                   }
                }
-               
+
                System.out.println("Returning initial position " + retValue);
-               
+
                return retValue;
             }
          }
@@ -391,11 +484,10 @@
 
    }
 
-
    public void ackTx(final Transaction tx, final PagedReference reference) throws Exception
    {
       ackTx(tx, reference.getPosition());
-      
+
       PageTransactionInfo txInfo = getPageTransaction(reference);
       if (txInfo != null)
       {
@@ -403,7 +495,6 @@
       }
    }
 
-
    /* (non-Javadoc)
     * @see org.hornetq.core.paging.cursor.PageCursor#confirm(org.hornetq.core.paging.cursor.PagePosition)
     */
@@ -416,7 +507,7 @@
          txInfo.storeUpdate(this.store, pageStore.getPagingManager());
       }
    }
-   
+
    public void ack(final PagePosition position) throws Exception
    {
       // if we are dealing with a persistent cursor
@@ -465,7 +556,6 @@
       }
    }
 
-
    /* (non-Javadoc)
     * @see org.hornetq.core.paging.cursor.PageSubscription#queryMessage(org.hornetq.core.paging.cursor.PagePosition)
     */
@@ -481,7 +571,6 @@
       }
    }
 
-
    /** 
     * Theres no need to synchronize this method as it's only called from journal load on startup
     */
@@ -602,7 +691,7 @@
    {
       return cursorId;
    }
-   
+
    public boolean isPersistent()
    {
       return persistent;
@@ -619,7 +708,7 @@
          Collections.sort(recoveredACK);
 
          boolean first = true;
-         
+
          for (PagePosition pos : recoveredACK)
          {
             lastAckedPosition = pos;
@@ -669,7 +758,7 @@
          System.out.println(info);
       }
    }
-  
+
    private synchronized PageCursorInfo getPageInfo(final PagePosition pos)
    {
       return getPageInfo(pos, true);
@@ -756,7 +845,7 @@
       cursorTX.addPositionConfirmation(this, position);
 
    }
-   
+
    private PageTransactionInfo getPageTransaction(final PagedReference reference)
    {
       if (reference.getPagedMessage().getTransactionID() != 0)
@@ -780,6 +869,7 @@
          scheduleCleanupCheck();
       }
    }
+
    // Inner classes -------------------------------------------------
 
    /** 
@@ -798,7 +888,7 @@
       private final List<PagePosition> acks = Collections.synchronizedList(new LinkedList<PagePosition>());
 
       private WeakReference<PageCache> cache;
-      
+
       private Set<PagePosition> removedReferences = new ConcurrentHashSet<PagePosition>();
 
       // The page was live at the time of the creation
@@ -856,12 +946,12 @@
       {
          return pageId;
       }
-      
+
       public boolean isRemoved(final PagePosition pos)
       {
          return removedReferences.contains(pos);
       }
-      
+
       public void remove(final PagePosition position)
       {
          removedReferences.add(position);
@@ -875,10 +965,10 @@
          if (isTrace)
          {
             PageSubscriptionImpl.trace("numberOfMessages =  " + getNumberOfMessages() +
-                                 " confirmed =  " +
-                                 (confirmed.get() + 1) +
-                                 ", page = " +
-                                 pageId);
+                                       " confirmed =  " +
+                                       (confirmed.get() + 1) +
+                                       ", page = " +
+                                       pageId);
          }
 
          // Negative could mean a bookmark on the first element for the page (example -1)
@@ -952,7 +1042,6 @@
          }
       }
    }
-   
 
    class CursorIterator implements LinkedListIterator<PagedReference>
    {
@@ -963,11 +1052,11 @@
       private final LinkedListIterator<PagePosition> redeliveryIterator;
 
       private volatile boolean isredelivery = false;
-      
+
       /** next element taken on hasNext test.
        *  it has to be delivered on next next operation */
       private volatile PagedReference cachedNext;
-      
+
       public CursorIterator()
       {
          synchronized (redeliveries)
@@ -975,7 +1064,6 @@
             redeliveryIterator = redeliveries.iterator();
          }
       }
-      
 
       public void repeat()
       {
@@ -1004,14 +1092,14 @@
        */
       public synchronized PagedReference next()
       {
-         
+
          if (cachedNext != null)
          {
             PagedReference retPos = cachedNext;
             cachedNext = null;
             return retPos;
          }
-         
+
          try
          {
             synchronized (redeliveries)
@@ -1027,7 +1115,7 @@
                   isredelivery = false;
                }
             }
-            
+
             if (position == null)
             {
                position = getStartPosition();
@@ -1056,12 +1144,12 @@
          {
             return true;
          }
-         
+
          if (!pageStore.isPaging())
          {
             return false;
          }
-         
+
          cachedNext = next();
 
          return cachedNext != null;
@@ -1083,5 +1171,4 @@
       }
    }
 
-   
 }

Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PageTransactionInfoImpl.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PageTransactionInfoImpl.java	2010-11-11 22:23:30 UTC (rev 9877)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PageTransactionInfoImpl.java	2010-11-12 04:51:39 UTC (rev 9878)
@@ -25,11 +25,10 @@
 import org.hornetq.core.logging.Logger;
 import org.hornetq.core.paging.PageTransactionInfo;
 import org.hornetq.core.paging.PagingManager;
-import org.hornetq.core.paging.cursor.PageSubscription;
 import org.hornetq.core.paging.cursor.PagePosition;
+import org.hornetq.core.paging.cursor.PageSubscription;
 import org.hornetq.core.persistence.StorageManager;
 import org.hornetq.core.transaction.Transaction;
-import org.hornetq.core.transaction.TransactionOperation;
 import org.hornetq.core.transaction.TransactionOperationAbstract;
 import org.hornetq.core.transaction.TransactionPropertyIndexes;
 import org.hornetq.utils.DataConstants;

Modified: branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java
===================================================================
--- branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java	2010-11-11 22:23:30 UTC (rev 9877)
+++ branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java	2010-11-12 04:51:39 UTC (rev 9878)
@@ -26,7 +26,6 @@
 import org.hornetq.api.core.client.ClientSessionFactory;
 import org.hornetq.core.config.Configuration;
 import org.hornetq.core.filter.Filter;
-import org.hornetq.core.paging.PageTransactionInfo;
 import org.hornetq.core.paging.PagedMessage;
 import org.hornetq.core.paging.cursor.PageCache;
 import org.hornetq.core.paging.cursor.PageCursorProvider;
@@ -36,7 +35,6 @@
 import org.hornetq.core.paging.cursor.impl.PageCursorProviderImpl;
 import org.hornetq.core.paging.cursor.impl.PagePositionImpl;
 import org.hornetq.core.paging.cursor.impl.PageSubscriptionImpl;
-import org.hornetq.core.paging.impl.PageTransactionInfoImpl;
 import org.hornetq.core.paging.impl.PagingStoreImpl;
 import org.hornetq.core.persistence.StorageManager;
 import org.hornetq.core.persistence.impl.journal.OperationContextImpl;
@@ -55,7 +53,7 @@
 import org.hornetq.utils.LinkedListIterator;
 
 /**
- * A PageCacheTest
+ * A PageCursorTest
  *
  * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
  *
@@ -726,10 +724,18 @@
       }
 
       assertNull(iterator.next());
+      
+      server.getStorageManager().waitOnOperations();
 
       server.stop();
       createServer();
-      waitCleanup();
+
+      long timeout = System.currentTimeMillis() + 10000;
+
+      while (System.currentTimeMillis() < timeout && lookupPageStore(ADDRESS).getNumberOfPages() != 1)
+      {
+         Thread.sleep(500);
+      }
       assertEquals(1, lookupPageStore(ADDRESS).getNumberOfPages());
 
    }



More information about the hornetq-commits mailing list