[hornetq-commits] JBoss hornetq SVN: r9829 - in branches/Branch_New_Paging: src/main/org/hornetq/core/paging/cursor and 5 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Mon Nov 1 20:24:35 EDT 2010


Author: clebert.suconic at jboss.com
Date: 2010-11-01 20:24:34 -0400 (Mon, 01 Nov 2010)
New Revision: 9829

Added:
   branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageSubscription.java
   branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java
Removed:
   branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursor.java
   branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorImpl.java
Modified:
   branches/Branch_New_Paging/src/main/org/hornetq/core/paging/PageTransactionInfo.java
   branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursorProvider.java
   branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java
   branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PageTransactionInfoImpl.java
   branches/Branch_New_Paging/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
   branches/Branch_New_Paging/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
   branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java
Log:
Renaming PageCursor to PageSubscription

Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/PageTransactionInfo.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/PageTransactionInfo.java	2010-11-02 00:20:27 UTC (rev 9828)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/PageTransactionInfo.java	2010-11-02 00:24:34 UTC (rev 9829)
@@ -14,7 +14,7 @@
 package org.hornetq.core.paging;
 
 import org.hornetq.core.journal.EncodingSupport;
-import org.hornetq.core.paging.cursor.PageCursor;
+import org.hornetq.core.paging.cursor.PageSubscription;
 import org.hornetq.core.paging.cursor.PagePosition;
 import org.hornetq.core.persistence.StorageManager;
 import org.hornetq.core.transaction.Transaction;
@@ -58,5 +58,5 @@
     * @param cursorPos
     * @return true if the message will be delivered later, false if it should be delivered right away
     */
-   boolean deliverAfterCommit(PageCursor cursor, PagePosition cursorPos);
+   boolean deliverAfterCommit(PageSubscription cursor, PagePosition cursorPos);
 }

Deleted: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursor.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursor.java	2010-11-02 00:20:27 UTC (rev 9828)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursor.java	2010-11-02 00:24:34 UTC (rev 9829)
@@ -1,99 +0,0 @@
-/*
- * Copyright 2010 Red Hat, Inc.
- * Red Hat licenses this file to you under the Apache License, version
- * 2.0 (the "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *    http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- * implied.  See the License for the specific language governing
- * permissions and limitations under the License.
- */
-
-package org.hornetq.core.paging.cursor;
-
-import org.hornetq.api.core.Pair;
-import org.hornetq.core.paging.PagedMessage;
-import org.hornetq.core.transaction.Transaction;
-import org.hornetq.utils.LinkedListIterator;
-
-/**
- * A PageCursor
- *
- * @author <a href="mailto:clebert.suconic at jboss.com">Clebert Suconic</a>
- *
- *
- */
-public interface PageCursor
-{
-
-   // Cursor query operations --------------------------------------
-   
-   // To be called before the server is down
-   void stop();
-   
-   void bookmark(PagePosition position) throws Exception;
-   
-   /** It will be 0 if non persistent cursor */
-   public long getId();
-   
-   public LinkedListIterator<Pair<PagePosition, PagedMessage>> iterator();
-   
-   // To be called when the cursor is closed for good. Most likely when the queue is deleted
-   void close() throws Exception;
-   
-   void scheduleCleanupCheck();
-   
-   void cleanupEntries() throws Exception;
-   
-   void disableAutoCleanup();
-   
-   void enableAutoCleanup();
-
-   void ack(PagePosition position) throws Exception;
-
-   void ackTx(Transaction tx, PagePosition position) throws Exception;
-   /**
-    * 
-    * @return the first page in use or MAX_LONG if none is in use
-    */
-   long getFirstPage();
-   
-   // Reload operations
-   
-   /**
-    * @param position
-    */
-   void reloadACK(PagePosition position);
-   
-   /**
-    * To be called when the cursor decided to ignore a position.
-    * @param position
-    */
-   void positionIgnored(PagePosition position);
-   
-   /**
-    * To be used to avoid a redelivery of a prepared ACK after load
-    * @param position
-    */
-   void reloadPreparedACK(Transaction tx, PagePosition position);
-   
-   void processReload() throws Exception;
-
-   /**
-    * To be used on redeliveries
-    * @param position
-    */
-   void redeliver(PagePosition position);
-   
-   void printDebug();
-
-   /**
-    * @param minPage
-    * @return
-    */
-   boolean isComplete(long minPage);
-
-   void flushExecutors();
-}

Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursorProvider.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursorProvider.java	2010-11-02 00:20:27 UTC (rev 9828)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursorProvider.java	2010-11-02 00:24:34 UTC (rev 9829)
@@ -49,17 +49,17 @@
     * @param queueId The cursorID should be the same as the queueId associated for persistance
     * @return
     */
-   PageCursor getPersistentCursor(long queueId);
+   PageSubscription getPersistentCursor(long queueId);
    
-   PageCursor createPersistentCursor(long queueId, Filter filter);
+   PageSubscription createPersistentSubscription(long queueId, Filter filter);
    
    /**
     * Create a non persistent cursor, usually associated with browsing
     * @return
     */
-   PageCursor createNonPersistentCursor(Filter filter);
+   PageSubscription createNonPersistentSubscription(Filter filter);
 
-   Pair<PagePosition, PagedMessage> getNext(PageCursor cursor, PagePosition pos) throws Exception;
+   Pair<PagePosition, PagedMessage> getNext(PageSubscription cursor, PagePosition pos) throws Exception;
    
    PagedMessage getMessage(PagePosition pos) throws Exception;
 
@@ -77,7 +77,7 @@
    /**
     * @param pageCursorImpl
     */
-   void close(PageCursor pageCursorImpl);
+   void close(PageSubscription pageCursorImpl);
    
    // to be used on tests -------------------------------------------
    

Copied: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageSubscription.java (from rev 9827, branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursor.java)
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageSubscription.java	                        (rev 0)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageSubscription.java	2010-11-02 00:24:34 UTC (rev 9829)
@@ -0,0 +1,99 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.paging.cursor;
+
+import org.hornetq.api.core.Pair;
+import org.hornetq.core.paging.PagedMessage;
+import org.hornetq.core.transaction.Transaction;
+import org.hornetq.utils.LinkedListIterator;
+
+/**
+ * A PageSubscription
+ *
+ * @author <a href="mailto:clebert.suconic at jboss.com">Clebert Suconic</a>
+ *
+ *
+ */
+public interface PageSubscription
+{
+
+   // Cursor query operations --------------------------------------
+   
+   // To be called before the server is down
+   void stop();
+   
+   void bookmark(PagePosition position) throws Exception;
+   
+   /** It will be 0 if non persistent cursor */
+   public long getId();
+   
+   public LinkedListIterator<Pair<PagePosition, PagedMessage>> iterator();
+   
+   // To be called when the cursor is closed for good. Most likely when the queue is deleted
+   void close() throws Exception;
+   
+   void scheduleCleanupCheck();
+   
+   void cleanupEntries() throws Exception;
+   
+   void disableAutoCleanup();
+   
+   void enableAutoCleanup();
+
+   void ack(PagePosition position) throws Exception;
+
+   void ackTx(Transaction tx, PagePosition position) throws Exception;
+   /**
+    * 
+    * @return the first page in use or MAX_LONG if none is in use
+    */
+   long getFirstPage();
+   
+   // Reload operations
+   
+   /**
+    * @param position
+    */
+   void reloadACK(PagePosition position);
+   
+   /**
+    * To be called when the cursor decided to ignore a position.
+    * @param position
+    */
+   void positionIgnored(PagePosition position);
+   
+   /**
+    * To be used to avoid a redelivery of a prepared ACK after load
+    * @param position
+    */
+   void reloadPreparedACK(Transaction tx, PagePosition position);
+   
+   void processReload() throws Exception;
+
+   /**
+    * To be used on redeliveries
+    * @param position
+    */
+   void redeliver(PagePosition position);
+   
+   void printDebug();
+
+   /**
+    * @param minPage
+    * @return
+    */
+   boolean isComplete(long minPage);
+
+   void flushExecutors();
+}

Deleted: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorImpl.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorImpl.java	2010-11-02 00:20:27 UTC (rev 9828)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorImpl.java	2010-11-02 00:24:34 UTC (rev 9829)
@@ -1,943 +0,0 @@
-/*
- * Copyright 2010 Red Hat, Inc.
- * Red Hat licenses this file to you under the Apache License, version
- * 2.0 (the "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *    http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- * implied.  See the License for the specific language governing
- * permissions and limitations under the License.
- */
-
-package org.hornetq.core.paging.cursor.impl;
-
-import java.lang.ref.WeakReference;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map.Entry;
-import java.util.SortedMap;
-import java.util.TreeMap;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.Executor;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.hornetq.api.core.Pair;
-import org.hornetq.core.filter.Filter;
-import org.hornetq.core.journal.IOAsyncTask;
-import org.hornetq.core.logging.Logger;
-import org.hornetq.core.paging.PagedMessage;
-import org.hornetq.core.paging.PagingStore;
-import org.hornetq.core.paging.cursor.PageCache;
-import org.hornetq.core.paging.cursor.PageCursor;
-import org.hornetq.core.paging.cursor.PageCursorProvider;
-import org.hornetq.core.paging.cursor.PagePosition;
-import org.hornetq.core.persistence.StorageManager;
-import org.hornetq.core.server.ServerMessage;
-import org.hornetq.core.transaction.Transaction;
-import org.hornetq.core.transaction.TransactionOperationAbstract;
-import org.hornetq.core.transaction.TransactionPropertyIndexes;
-import org.hornetq.core.transaction.impl.TransactionImpl;
-import org.hornetq.utils.Future;
-import org.hornetq.utils.LinkedListImpl;
-import org.hornetq.utils.LinkedListIterator;
-
-/**
- * A PageCursorImpl
- *
- * A page cursor will always store its 
- * @author <a href="mailto:clebert.suconic at jboss.com">Clebert Suconic</a>
- *
- * 
- */
-public class PageCursorImpl implements PageCursor
-{
-   // Constants -----------------------------------------------------
-   private static final Logger log = Logger.getLogger(PageCursorImpl.class);
-
-   // Attributes ----------------------------------------------------
-
-   private final boolean isTrace = false; // PageCursorImpl.log.isTraceEnabled();
-
-   private static void trace(final String message)
-   {
-      // PageCursorImpl.log.info(message);
-      System.out.println(message);
-   }
-
-   private volatile boolean autoCleanup = true;
-
-   private final StorageManager store;
-
-   private final long cursorId;
-
-   private final Filter filter;
-
-   private final PagingStore pageStore;
-
-   private final PageCursorProvider cursorProvider;
-
-   private final Executor executor;
-
-   private volatile PagePosition lastPosition;
-
-   private volatile PagePosition lastAckedPosition;
-
-   private List<PagePosition> recoveredACK;
-
-   private final SortedMap<Long, PageCursorInfo> consumedPages = Collections.synchronizedSortedMap(new TreeMap<Long, PageCursorInfo>());
-
-   // We only store the position for redeliveries. They will be read from the SoftCache again during delivery.
-   private final org.hornetq.utils.LinkedList<PagePosition> redeliveries = new LinkedListImpl<PagePosition>();
-
-   // Static --------------------------------------------------------
-
-   // Constructors --------------------------------------------------
-
-   public PageCursorImpl(final PageCursorProvider cursorProvider,
-                         final PagingStore pageStore,
-                         final StorageManager store,
-                         final Executor executor,
-                         final Filter filter,
-                         final long cursorId)
-   {
-      this.pageStore = pageStore;
-      this.store = store;
-      this.cursorProvider = cursorProvider;
-      this.cursorId = cursorId;
-      this.executor = executor;
-      this.filter = filter;
-   }
-
-   // Public --------------------------------------------------------
-
-   public void disableAutoCleanup()
-   {
-      autoCleanup = false;
-   }
-
-   public void enableAutoCleanup()
-   {
-      autoCleanup = true;
-   }
-
-   public PageCursorProvider getProvider()
-   {
-      return cursorProvider;
-   }
-
-   public void bookmark(PagePosition position) throws Exception
-   {
-      if (lastPosition != null)
-      {
-         throw new RuntimeException("Bookmark can only be done at the time of the cursor's creation");
-      }
-
-      lastPosition = position;
-
-      PageCursorInfo cursorInfo = getPageInfo(position);
-
-      if (position.getMessageNr() > 0)
-      {
-         cursorInfo.confirmed.addAndGet(position.getMessageNr());
-      }
-
-      ack(position);
-   }
-
-   class CursorIterator implements LinkedListIterator<Pair<PagePosition, PagedMessage>>
-   {
-      PagePosition position = getLastPosition();
-
-      PagePosition lastOperation = null;
-
-      LinkedListIterator<PagePosition> redeliveryIterator = redeliveries.iterator();
-
-      boolean isredelivery = false;
-
-      public void repeat()
-      {
-         if (isredelivery)
-         {
-            redeliveryIterator.repeat();
-         }
-         else
-         {
-            if (lastOperation == null)
-            {
-               position = getLastPosition();
-            }
-            else
-            {
-               position = lastOperation;
-            }
-         }
-      }
-
-      /* (non-Javadoc)
-       * @see java.util.Iterator#next()
-       */
-      public Pair<PagePosition, PagedMessage> next()
-      {
-         try
-         {
-            if (redeliveryIterator.hasNext())
-            {
-               isredelivery = true;
-               return getMessage(redeliveryIterator.next());
-            }
-            else
-            {
-               isredelivery = false;
-            }
-
-            Pair<PagePosition, PagedMessage> nextPos = moveNext(position);
-            if (nextPos != null)
-            {
-               position = nextPos.a;
-            }
-            return nextPos;
-         }
-         catch (Exception e)
-         {
-            throw new RuntimeException(e.getMessage(), e);
-         }
-      }
-
-      public boolean hasNext()
-      {
-         return true;
-      }
-
-      /* (non-Javadoc)
-       * @see java.util.Iterator#remove()
-       */
-      public void remove()
-      {
-      }
-
-      /* (non-Javadoc)
-       * @see org.hornetq.utils.LinkedListIterator#close()
-       */
-      public void close()
-      {
-      }
-   }
-
-   private Pair<PagePosition, PagedMessage> getMessage(PagePosition pos) throws Exception
-   {
-      return new Pair<PagePosition, PagedMessage>(pos, cursorProvider.getMessage(pos));
-   }
-
-   /* (non-Javadoc)
-    * @see org.hornetq.core.paging.cursor.PageCursor#iterator()
-    */
-   public LinkedListIterator<Pair<PagePosition, PagedMessage>> iterator()
-   {
-      return new CursorIterator();
-   }
-
-   /* (non-Javadoc)
-    * @see org.hornetq.core.paging.cursor.PageCursor#moveNext()
-    */
-   public synchronized Pair<PagePosition, PagedMessage> moveNext(PagePosition position) throws Exception
-   {
-      boolean match = false;
-
-      Pair<PagePosition, PagedMessage> message = null;
-
-      PagePosition tmpPosition = position;
-
-      do
-      {
-         message = cursorProvider.getNext(this, tmpPosition);
-
-         if (message != null)
-         {
-            tmpPosition = message.a;
-
-            match = match(message.b.getMessage());
-
-            if (!match)
-            {
-               processACK(message.a);
-            }
-         }
-
-      }
-      while (message != null && !match);
-
-      return message;
-   }
-
-   /**
-    * 
-    */
-   private PagePosition getLastPosition()
-   {
-      if (lastPosition == null)
-      {
-         // it will start at the first available page
-         long firstPage = pageStore.getFirstPage();
-         lastPosition = new PagePositionImpl(firstPage, -1);
-      }
-
-      return lastPosition;
-   }
-
-   /* (non-Javadoc)
-    * @see org.hornetq.core.paging.cursor.PageCursor#confirm(org.hornetq.core.paging.cursor.PagePosition)
-    */
-   public void ack(final PagePosition position) throws Exception
-   {
-
-      // if we are dealing with a persistent cursor
-      if (cursorId != 0)
-      {
-         store.storeCursorAcknowledge(cursorId, position);
-      }
-
-      store.afterCompleteOperations(new IOAsyncTask()
-      {
-
-         public void onError(final int errorCode, final String errorMessage)
-         {
-         }
-
-         public void done()
-         {
-            processACK(position);
-         }
-      });
-   }
-
-   public void ackTx(final Transaction tx, final PagePosition position) throws Exception
-   {
-      // if the cursor is persistent
-      if (cursorId != 0)
-      {
-         store.storeCursorAcknowledgeTransactional(tx.getID(), cursorId, position);
-      }
-      installTXCallback(tx, position);
-
-   }
-
-   /* (non-Javadoc)
-    * @see org.hornetq.core.paging.cursor.PageCursor#getFirstPage()
-    */
-   public long getFirstPage()
-   {
-      if (consumedPages.isEmpty())
-      {
-         return 0;
-      }
-      else
-      {
-         return consumedPages.firstKey();
-      }
-   }
-
-   /* (non-Javadoc)
-    * @see org.hornetq.core.paging.cursor.PageCursor#returnElement(org.hornetq.core.paging.cursor.PagePosition)
-    */
-   public synchronized void redeliver(final PagePosition position)
-   {
-      redeliveries.addTail(position);
-   }
-
-   /** 
-    * Theres no need to synchronize this method as it's only called from journal load on startup
-    */
-   public void reloadACK(final PagePosition position)
-   {
-      if (recoveredACK == null)
-      {
-         recoveredACK = new LinkedList<PagePosition>();
-      }
-
-      recoveredACK.add(position);
-   }
-
-   /* (non-Javadoc)
-    * @see org.hornetq.core.paging.cursor.PageCursor#recoverPreparedACK(org.hornetq.core.paging.cursor.PagePosition)
-    */
-   public void reloadPreparedACK(final Transaction tx, final PagePosition position)
-   {
-      installTXCallback(tx, position);
-   }
-
-   /* (non-Javadoc)
-    * @see org.hornetq.core.paging.cursor.PageCursor#positionIgnored(org.hornetq.core.paging.cursor.PagePosition)
-    */
-   public void positionIgnored(final PagePosition position)
-   {
-      processACK(position);
-   }
-
-   /* (non-Javadoc)
-    * @see org.hornetq.core.paging.cursor.PageCursor#isComplete(long)
-    */
-   public boolean isComplete(long page)
-   {
-      PageCursorInfo info = consumedPages.get(page);
-      return info != null && info.isDone();
-   }
-
-   /**
-    * All the data associated with the cursor should go away here
-    */
-   public void close() throws Exception
-   {
-      final long tx = store.generateUniqueID();
-
-      final ArrayList<Exception> ex = new ArrayList<Exception>();
-
-      final AtomicBoolean isPersistent = new AtomicBoolean(false);
-
-      // We can't delete the records at the caller's thread
-      // because an executor may be holding the synchronized on PageCursorImpl
-      // what would lead to a dead lock
-      // so, we delete it inside the executor also
-      // and wait for the result
-      // The caller will be treating eventual IO exceptions and dispatching to the original thread's caller
-      executor.execute(new Runnable()
-      {
-
-         public void run()
-         {
-            try
-            {
-               synchronized (PageCursorImpl.this)
-               {
-                  for (PageCursorInfo cursor : consumedPages.values())
-                  {
-                     for (PagePosition info : cursor.acks)
-                     {
-                        if (info.getRecordID() != 0)
-                        {
-                           isPersistent.set(true);
-                           store.deleteCursorAcknowledgeTransactional(tx, info.getRecordID());
-                        }
-                     }
-                  }
-               }
-            }
-            catch (Exception e)
-            {
-               ex.add(e);
-               PageCursorImpl.log.warn(e.getMessage(), e);
-            }
-         }
-      });
-
-      Future future = new Future();
-
-      executor.execute(future);
-
-      while (!future.await(5000))
-      {
-         PageCursorImpl.log.warn("Timeout on waiting cursor " + this + " to be closed");
-      }
-
-      if (isPersistent.get())
-      {
-         // Another reason to perform the commit at the main thread is because the OperationContext may only send the
-         // result to the client when
-         // the IO on commit is done
-         if (ex.size() == 0)
-         {
-            store.commit(tx);
-         }
-         else
-         {
-            store.rollback(tx);
-            throw ex.get(0);
-         }
-      }
-
-      cursorProvider.close(this);
-   }
-
-   /* (non-Javadoc)
-    * @see org.hornetq.core.paging.cursor.PageCursor#getId()
-    */
-   public long getId()
-   {
-      return cursorId;
-   }
-
-   public void processReload() throws Exception
-   {
-      if (recoveredACK != null)
-      {
-         if (isTrace)
-         {
-            PageCursorImpl.trace("********** processing reload!!!!!!!");
-         }
-         Collections.sort(recoveredACK);
-
-         boolean first = true;
-
-         PagePosition previousPos = null;
-         for (PagePosition pos : recoveredACK)
-         {
-            PageCursorInfo positions = getPageInfo(pos);
-            if (first)
-            {
-               first = false;
-               if (pos.getMessageNr() > 0)
-               {
-                  positions.confirmed.addAndGet(pos.getMessageNr());
-               }
-            }
-
-            positions.addACK(pos);
-
-            lastPosition = pos;
-            if (previousPos != null)
-            {
-               if (!previousPos.isRightAfter(previousPos))
-               {
-                  PagePosition tmpPos = previousPos;
-                  // looking for holes on the ack list for redelivery
-                  while (true)
-                  {
-                     Pair<PagePosition, PagedMessage> msgCheck = cursorProvider.getNext(this, tmpPos);
-
-                     positions = getPageInfo(tmpPos);
-
-                     // end of the hole, we can finish processing here
-                     // It may be also that the next was just a next page, so we just ignore it
-                     if (msgCheck == null || msgCheck.a.equals(pos))
-                     {
-                        break;
-                     }
-                     else
-                     {
-                        if (match(msgCheck.b.getMessage()))
-                        {
-                           redeliver(msgCheck.a);
-                        }
-                        else
-                        {
-                           // The reference was ignored. But we must take a count from the reference count
-                           // otherwise the page will never be deleted hence we would never leave paging even if
-                           // everything was consumed
-                           positions.confirmed.incrementAndGet();
-                        }
-                     }
-                     tmpPos = msgCheck.a;
-                  }
-               }
-            }
-
-            previousPos = pos;
-         }
-
-         lastAckedPosition = lastPosition;
-
-         recoveredACK.clear();
-         recoveredACK = null;
-      }
-   }
-
-   public void flushExecutors()
-   {
-      Future future = new Future();
-      executor.execute(future);
-      while (!future.await(1000))
-      {
-         PageCursorImpl.log.warn("Waiting page cursor to finish executors - " + this);
-      }
-   }
-
-   public void stop()
-   {
-      flushExecutors();
-   }
-
-   public void printDebug()
-   {
-      printDebug(toString());
-   }
-
-   public void printDebug(final String msg)
-   {
-      System.out.println("Debug information on PageCurorImpl- " + msg);
-      for (PageCursorInfo info : consumedPages.values())
-      {
-         System.out.println(info);
-      }
-   }
-
-   /**
-    * @param page
-    * @return
-    */
-   private synchronized PageCursorInfo getPageInfo(final PagePosition pos)
-   {
-      PageCursorInfo pageInfo = consumedPages.get(pos.getPageNr());
-
-      if (pageInfo == null)
-      {
-         PageCache cache = cursorProvider.getPageCache(pos);
-         pageInfo = new PageCursorInfo(pos.getPageNr(), cache.getNumberOfMessages(), cache);
-         consumedPages.put(pos.getPageNr(), pageInfo);
-      }
-
-      return pageInfo;
-   }
-
-   // Package protected ---------------------------------------------
-
-   // Protected -----------------------------------------------------
-
-   protected boolean match(final ServerMessage message)
-   {
-      if (filter == null)
-      {
-         return true;
-      }
-      else
-      {
-         return filter.match(message);
-      }
-   }
-
-   // Private -------------------------------------------------------
-
-   // To be called only after the ACK has been processed and guaranteed to be on storae
-   // The only exception is on non storage events such as not matching messages
-   private void processACK(final PagePosition pos)
-   {
-      if (lastAckedPosition == null || pos.compareTo(lastAckedPosition) > 0)
-      {
-         if (lastAckedPosition != null && lastAckedPosition.getPageNr() != pos.getPageNr())
-         {
-            // there's a different page being acked, we will do the check right away
-            if (autoCleanup)
-            {
-               scheduleCleanupCheck();
-            }
-         }
-         lastAckedPosition = pos;
-      }
-      PageCursorInfo info = getPageInfo(pos);
-
-      info.addACK(pos);
-   }
-
-   /**
-    * @param tx
-    * @param position
-    */
-   private void installTXCallback(final Transaction tx, final PagePosition position)
-   {
-      if (position.getRecordID() > 0)
-      {
-         // It needs to persist, otherwise the cursor will return to the fist page position
-         tx.setContainsPersistent();
-      }
-
-      PageCursorTX cursorTX = (PageCursorTX)tx.getProperty(TransactionPropertyIndexes.PAGE_CURSOR_POSITIONS);
-
-      if (cursorTX == null)
-      {
-         cursorTX = new PageCursorTX();
-         tx.putProperty(TransactionPropertyIndexes.PAGE_CURSOR_POSITIONS, cursorTX);
-         tx.addOperation(cursorTX);
-      }
-
-      cursorTX.addPositionConfirmation(this, position);
-
-   }
-
-   /**
-    *  A callback from the PageCursorInfo. It will be called when all the messages on a page have been acked
-    * @param info
-    */
-   private void onPageDone(final PageCursorInfo info)
-   {
-      if (autoCleanup)
-      {
-         scheduleCleanupCheck();
-      }
-   }
-
-   public void scheduleCleanupCheck()
-   {
-      if (autoCleanup)
-      {
-         executor.execute(new Runnable()
-         {
-
-            public void run()
-            {
-               try
-               {
-                  cleanupEntries();
-               }
-               catch (Exception e)
-               {
-                  PageCursorImpl.log.warn("Error on cleaning up cursor pages", e);
-               }
-            }
-         });
-      }
-   }
-
-   /** 
-    * It will cleanup all the records for completed pages
-    * */
-   public void cleanupEntries() throws Exception
-   {
-      Transaction tx = new TransactionImpl(store);
-
-      boolean persist = false;
-
-      final ArrayList<PageCursorInfo> completedPages = new ArrayList<PageCursorInfo>();
-
-      // First get the completed pages using a lock
-      synchronized (this)
-      {
-         for (Entry<Long, PageCursorInfo> entry : consumedPages.entrySet())
-         {
-            PageCursorInfo info = entry.getValue();
-            if (info.isDone() && !info.isPendingDelete() && lastAckedPosition != null)
-            {
-               if (entry.getKey() == lastAckedPosition.getPageNr())
-               {
-                  PageCursorImpl.trace("We can't clear page " + entry.getKey() + " now since it's the current page");
-               }
-               else
-               {
-                  info.setPendingDelete();
-                  completedPages.add(entry.getValue());
-               }
-            }
-         }
-      }
-
-      for (int i = 0; i < completedPages.size(); i++)
-      {
-         PageCursorInfo info = completedPages.get(i);
-
-         for (PagePosition pos : info.acks)
-         {
-            if (pos.getRecordID() > 0)
-            {
-               store.deleteCursorAcknowledgeTransactional(tx.getID(), pos.getRecordID());
-               if (!persist)
-               {
-                  // only need to set it once
-                  tx.setContainsPersistent();
-                  persist = true;
-               }
-            }
-         }
-      }
-
-      tx.addOperation(new TransactionOperationAbstract()
-      {
-
-         @Override
-         public void afterCommit(final Transaction tx)
-         {
-            executor.execute(new Runnable()
-            {
-
-               public void run()
-               {
-                  synchronized (PageCursorImpl.this)
-                  {
-                     for (PageCursorInfo completePage : completedPages)
-                     {
-                        if (isTrace)
-                        {
-                           PageCursorImpl.trace("Removing page " + completePage.getPageId());
-                        }
-                        if (consumedPages.remove(completePage.getPageId()) == null)
-                        {
-                           PageCursorImpl.log.warn("Couldn't remove page " + completePage.getPageId() +
-                                                   " from consumed pages on cursor for address " +
-                                                   pageStore.getAddress());
-                        }
-                     }
-                  }
-
-                  cursorProvider.scheduleCleanup();
-               }
-            });
-         }
-      });
-
-      tx.commit();
-
-   }
-
-   // Inner classes -------------------------------------------------
-
-   private class PageCursorInfo
-   {
-      // Number of messages existent on this page
-      private final int numberOfMessages;
-
-      private final long pageId;
-
-      // Confirmed ACKs on this page
-      private final List<PagePosition> acks = Collections.synchronizedList(new LinkedList<PagePosition>());
-
-      private WeakReference<PageCache> cache;
-
-      // The page was live at the time of the creation
-      private final boolean wasLive;
-
-      // There's a pending delete on the async IO pipe
-      // We're holding this object to avoid delete the pages before the IO is complete,
-      // however we can't delete these records again
-      private boolean pendingDelete;
-
-      // We need a separate counter as the cursor may be ignoring certain values because of incomplete transactions or
-      // expressions
-      private final AtomicInteger confirmed = new AtomicInteger(0);
-
-      @Override
-      public String toString()
-      {
-         return "PageCursorInfo::PageID=" + pageId +
-                " numberOfMessage = " +
-                numberOfMessages +
-                ", confirmed = " +
-                confirmed;
-      }
-
-      public PageCursorInfo(final long pageId, final int numberOfMessages, final PageCache cache)
-      {
-         this.pageId = pageId;
-         this.numberOfMessages = numberOfMessages;
-         wasLive = cache.isLive();
-         if (wasLive)
-         {
-            this.cache = new WeakReference<PageCache>(cache);
-         }
-      }
-
-      public boolean isDone()
-      {
-         return getNumberOfMessages() == confirmed.get();
-      }
-
-      public boolean isPendingDelete()
-      {
-         return pendingDelete;
-      }
-
-      public void setPendingDelete()
-      {
-         pendingDelete = true;
-      }
-
-      /**
-       * @return the pageId
-       */
-      public long getPageId()
-      {
-         return pageId;
-      }
-
-      public void addACK(final PagePosition posACK)
-      {
-         if (posACK.getRecordID() > 0)
-         {
-            // We store these elements for later cleanup
-            acks.add(posACK);
-         }
-
-         if (isTrace)
-         {
-            PageCursorImpl.trace("numberOfMessages =  " + getNumberOfMessages() +
-                                 " confirmed =  " +
-                                 (confirmed.get() + 1) +
-                                 ", page = " +
-                                 pageId);
-         }
-
-         // Negative could mean a bookmark on the first element for the page (example -1)
-         if (posACK.getMessageNr() >= 0)
-         {
-            if (getNumberOfMessages() == confirmed.incrementAndGet())
-            {
-               onPageDone(this);
-            }
-         }
-      }
-
-      private int getNumberOfMessages()
-      {
-         if (wasLive)
-         {
-            PageCache cache = this.cache.get();
-            if (cache != null)
-            {
-               return cache.getNumberOfMessages();
-            }
-            else
-            {
-               cache = cursorProvider.getPageCache(new PagePositionImpl(pageId, 0));
-               this.cache = new WeakReference<PageCache>(cache);
-               return cache.getNumberOfMessages();
-            }
-         }
-         else
-         {
-            return numberOfMessages;
-         }
-      }
-
-   }
-
-   static class PageCursorTX extends TransactionOperationAbstract
-   {
-      HashMap<PageCursorImpl, List<PagePosition>> pendingPositions = new HashMap<PageCursorImpl, List<PagePosition>>();
-
-      public void addPositionConfirmation(final PageCursorImpl cursor, final PagePosition position)
-      {
-         List<PagePosition> list = pendingPositions.get(cursor);
-
-         if (list == null)
-         {
-            list = new LinkedList<PagePosition>();
-            pendingPositions.put(cursor, list);
-         }
-
-         list.add(position);
-      }
-
-      /* (non-Javadoc)
-       * @see org.hornetq.core.transaction.TransactionOperation#afterCommit(org.hornetq.core.transaction.Transaction)
-       */
-      @Override
-      public void afterCommit(final Transaction tx)
-      {
-         for (Entry<PageCursorImpl, List<PagePosition>> entry : pendingPositions.entrySet())
-         {
-            PageCursorImpl cursor = entry.getKey();
-
-            List<PagePosition> positions = entry.getValue();
-
-            for (PagePosition confirmed : positions)
-            {
-               cursor.processACK(confirmed);
-            }
-
-         }
-      }
-
-   }
-}

Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java	2010-11-02 00:20:27 UTC (rev 9828)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java	2010-11-02 00:24:34 UTC (rev 9829)
@@ -29,7 +29,7 @@
 import org.hornetq.core.paging.PagingManager;
 import org.hornetq.core.paging.PagingStore;
 import org.hornetq.core.paging.cursor.PageCache;
-import org.hornetq.core.paging.cursor.PageCursor;
+import org.hornetq.core.paging.cursor.PageSubscription;
 import org.hornetq.core.paging.cursor.PageCursorProvider;
 import org.hornetq.core.paging.cursor.PagePosition;
 import org.hornetq.core.persistence.StorageManager;
@@ -70,9 +70,9 @@
 
    private Map<Long, PageCache> softCache = new SoftValueHashMap<Long, PageCache>();
 
-   private ConcurrentMap<Long, PageCursor> activeCursors = new ConcurrentHashMap<Long, PageCursor>();
+   private ConcurrentMap<Long, PageSubscription> activeCursors = new ConcurrentHashMap<Long, PageSubscription>();
 
-   private ConcurrentSet<PageCursor> nonPersistentCursors = new ConcurrentHashSet<PageCursor>();
+   private ConcurrentSet<PageSubscription> nonPersistentCursors = new ConcurrentHashSet<PageSubscription>();
 
    // Static --------------------------------------------------------
 
@@ -96,15 +96,15 @@
       return pagingStore;
    }
 
-   public synchronized PageCursor createPersistentCursor(long cursorID, Filter filter)
+   public synchronized PageSubscription createPersistentSubscription(long cursorID, Filter filter)
    {
-      PageCursor activeCursor = activeCursors.get(cursorID);
+      PageSubscription activeCursor = activeCursors.get(cursorID);
       if (activeCursor != null)
       {
          throw new IllegalStateException("Cursor " + cursorID + " had already been created");
       }
 
-      activeCursor = new PageCursorImpl(this,
+      activeCursor = new PageSubscriptionImpl(this,
                                         pagingStore,
                                         storageManager,
                                         executorFactory.getExecutor(),
@@ -117,7 +117,7 @@
    /* (non-Javadoc)
     * @see org.hornetq.core.paging.cursor.PageCursorProvider#createCursor()
     */
-   public synchronized PageCursor getPersistentCursor(long cursorID)
+   public synchronized PageSubscription getPersistentCursor(long cursorID)
    {
       return activeCursors.get(cursorID);
    }
@@ -125,9 +125,9 @@
    /**
     * this will create a non-persistent cursor
     */
-   public synchronized PageCursor createNonPersistentCursor(Filter filter)
+   public synchronized PageSubscription createNonPersistentSubscription(Filter filter)
    {
-      PageCursor cursor = new PageCursorImpl(this,
+      PageSubscription cursor = new PageSubscriptionImpl(this,
                                              pagingStore,
                                              storageManager,
                                              executorFactory.getExecutor(),
@@ -140,7 +140,7 @@
    /* (non-Javadoc)
     * @see org.hornetq.core.paging.cursor.PageCursorProvider#getAfter(org.hornetq.core.paging.cursor.PagePosition)
     */
-   public Pair<PagePosition, PagedMessage> getNext(final PageCursor cursor, PagePosition cursorPos) throws Exception
+   public Pair<PagePosition, PagedMessage> getNext(final PageSubscription cursor, PagePosition cursorPos) throws Exception
    {
 
       while (true)
@@ -260,7 +260,7 @@
 
    public void processReload() throws Exception
    {
-      for (PageCursor cursor : this.activeCursors.values())
+      for (PageSubscription cursor : this.activeCursors.values())
       {
          cursor.processReload();
       }
@@ -271,12 +271,12 @@
 
    public void stop()
    {
-      for (PageCursor cursor : activeCursors.values())
+      for (PageSubscription cursor : activeCursors.values())
       {
          cursor.stop();
       }
 
-      for (PageCursor cursor : nonPersistentCursors)
+      for (PageSubscription cursor : nonPersistentCursors)
       {
          cursor.stop();
       }
@@ -294,12 +294,12 @@
 
    public void flushExecutors()
    {
-      for (PageCursor cursor : activeCursors.values())
+      for (PageSubscription cursor : activeCursors.values())
       {
          cursor.flushExecutors();
       }
 
-      for (PageCursor cursor : nonPersistentCursors)
+      for (PageSubscription cursor : nonPersistentCursors)
       {
          cursor.flushExecutors();
       }
@@ -315,7 +315,7 @@
 
    }
 
-   public void close(PageCursor cursor)
+   public void close(PageSubscription cursor)
    {
       if (cursor.getId() != 0)
       {
@@ -359,7 +359,7 @@
                return;
             }
 
-            ArrayList<PageCursor> cursorList = new ArrayList<PageCursor>();
+            ArrayList<PageSubscription> cursorList = new ArrayList<PageSubscription>();
             cursorList.addAll(activeCursors.values());
             cursorList.addAll(nonPersistentCursors);
 
@@ -369,7 +369,7 @@
             {
                boolean complete = true;
 
-               for (PageCursor cursor : cursorList)
+               for (PageSubscription cursor : cursorList)
                {
                   if (!cursor.isComplete(minPage))
                   {
@@ -389,7 +389,7 @@
                   try
                   {
                      // First step: Move every cursor to the next bookmarked page (that was just created)
-                     for (PageCursor cursor : cursorList)
+                     for (PageSubscription cursor : cursorList)
                      {
                         cursor.ack(new PagePositionImpl(currentPage.getPageId(), -1));
                      }
@@ -398,7 +398,7 @@
                   }
                   finally
                   {
-                     for (PageCursor cursor : cursorList)
+                     for (PageSubscription cursor : cursorList)
                      {
                         cursor.enableAutoCleanup();
                      }
@@ -407,7 +407,7 @@
                   pagingStore.stopPaging();
 
                   // This has to be called after we stopped paging
-                  for (PageCursor cursor : cursorList)
+                  for (PageSubscription cursor : cursorList)
                   {
                      cursor.scheduleCleanupCheck();
                   }
@@ -481,11 +481,11 @@
    /**
     * This method is synchronized because we want it to be atomic with the cursors being used
     */
-   private long checkMinPage(List<PageCursor> cursorList)
+   private long checkMinPage(List<PageSubscription> cursorList)
    {
       long minPage = Long.MAX_VALUE;
 
-      for (PageCursor cursor : cursorList)
+      for (PageSubscription cursor : cursorList)
       {
          long firstPage = cursor.getFirstPage();
          if (firstPage < minPage)

Copied: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java (from rev 9828, branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorImpl.java)
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java	                        (rev 0)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java	2010-11-02 00:24:34 UTC (rev 9829)
@@ -0,0 +1,943 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.paging.cursor.impl;
+
+import java.lang.ref.WeakReference;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.Executor;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.hornetq.api.core.Pair;
+import org.hornetq.core.filter.Filter;
+import org.hornetq.core.journal.IOAsyncTask;
+import org.hornetq.core.logging.Logger;
+import org.hornetq.core.paging.PagedMessage;
+import org.hornetq.core.paging.PagingStore;
+import org.hornetq.core.paging.cursor.PageCache;
+import org.hornetq.core.paging.cursor.PageSubscription;
+import org.hornetq.core.paging.cursor.PageCursorProvider;
+import org.hornetq.core.paging.cursor.PagePosition;
+import org.hornetq.core.persistence.StorageManager;
+import org.hornetq.core.server.ServerMessage;
+import org.hornetq.core.transaction.Transaction;
+import org.hornetq.core.transaction.TransactionOperationAbstract;
+import org.hornetq.core.transaction.TransactionPropertyIndexes;
+import org.hornetq.core.transaction.impl.TransactionImpl;
+import org.hornetq.utils.Future;
+import org.hornetq.utils.LinkedListImpl;
+import org.hornetq.utils.LinkedListIterator;
+
+/**
+ * A PageSubscriptionImpl
+ *
+ * A page cursor will always store its 
+ * @author <a href="mailto:clebert.suconic at jboss.com">Clebert Suconic</a>
+ *
+ * 
+ */
+public class PageSubscriptionImpl implements PageSubscription
+{
+   // Constants -----------------------------------------------------
+   private static final Logger log = Logger.getLogger(PageSubscriptionImpl.class);
+
+   // Attributes ----------------------------------------------------
+
+   private final boolean isTrace = false; // PageCursorImpl.log.isTraceEnabled();
+
+   /**
+    * Writes a trace message straight to standard output.
+    *
+    * @param message the text to print
+    */
+   private static void trace(final String message)
+   {
+      System.out.println(message);
+   }
+
+   private volatile boolean autoCleanup = true;
+
+   private final StorageManager store;
+
+   private final long cursorId;
+
+   private final Filter filter;
+
+   private final PagingStore pageStore;
+
+   private final PageCursorProvider cursorProvider;
+
+   private final Executor executor;
+
+   private volatile PagePosition lastPosition;
+
+   private volatile PagePosition lastAckedPosition;
+
+   private List<PagePosition> recoveredACK;
+
+   private final SortedMap<Long, PageCursorInfo> consumedPages = Collections.synchronizedSortedMap(new TreeMap<Long, PageCursorInfo>());
+
+   // We only store the position for redeliveries. They will be read from the SoftCache again during delivery.
+   private final org.hornetq.utils.LinkedList<PagePosition> redeliveries = new LinkedListImpl<PagePosition>();
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   /**
+    * Creates a subscription; the constructor only stores its collaborators.
+    *
+    * @param cursorProvider the provider this subscription reads pages and messages through
+    * @param pageStore the paging store, used to find the first page and the address
+    * @param store the storage manager used to record and delete ACK records
+    * @param executor the executor on which cleanup and close work is run
+    * @param filter optional filter; when {@code null} every message matches
+    * @param cursorId the cursor id; ACKs are only written to the store when it is non-zero
+    */
+   public PageSubscriptionImpl(final PageCursorProvider cursorProvider,
+                               final PagingStore pageStore,
+                               final StorageManager store,
+                               final Executor executor,
+                               final Filter filter,
+                               final long cursorId)
+   {
+      this.cursorProvider = cursorProvider;
+      this.pageStore = pageStore;
+      this.store = store;
+      this.executor = executor;
+      this.filter = filter;
+      this.cursorId = cursorId;
+   }
+
+   // Public --------------------------------------------------------
+
+   /**
+    * Switches automatic cleanup off. While it is off, {@link #scheduleCleanupCheck()} does nothing.
+    */
+   public void disableAutoCleanup()
+   {
+      this.autoCleanup = false;
+   }
+
+   /**
+    * Switches automatic cleanup back on, so that {@link #scheduleCleanupCheck()} queues cleanup work again.
+    */
+   public void enableAutoCleanup()
+   {
+      this.autoCleanup = true;
+   }
+
+   /**
+    * @return the cursor provider this subscription was created with
+    */
+   public PageCursorProvider getProvider()
+   {
+      return this.cursorProvider;
+   }
+
+   /**
+    * Sets the starting position of this subscription and acknowledges it.
+    * <p>
+    * When the position's message number is positive, that number is added to the confirmed
+    * counter of the position's page before the position itself is ACKed.
+    *
+    * @param position the position to start from
+    * @throws RuntimeException if a last position has already been set on this subscription
+    * @throws Exception if acknowledging the position fails
+    */
+   public void bookmark(PagePosition position) throws Exception
+   {
+      final boolean alreadyPositioned = lastPosition != null;
+      if (alreadyPositioned)
+      {
+         throw new RuntimeException("Bookmark can only be done at the time of the cursor's creation");
+      }
+
+      lastPosition = position;
+
+      final PageCursorInfo pageInfo = getPageInfo(position);
+
+      final int messageNr = position.getMessageNr();
+      if (messageNr > 0)
+      {
+         pageInfo.confirmed.addAndGet(messageNr);
+      }
+
+      ack(position);
+   }
+
+   /**
+    * Iterator over this subscription. Pending redeliveries are served first; once there are
+    * none, it advances through the pages with {@link #moveNext(PagePosition)}.
+    */
+   class CursorIterator implements LinkedListIterator<Pair<PagePosition, PagedMessage>>
+   {
+      // Position the next moveNext() call will start from
+      PagePosition position = getLastPosition();
+
+      // Never assigned inside this class; while it is null repeat() falls back to getLastPosition()
+      PagePosition lastOperation = null;
+
+      LinkedListIterator<PagePosition> redeliveryIterator = redeliveries.iterator();
+
+      // True when the element most recently returned by next() came from the redelivery list
+      boolean isredelivery = false;
+
+      /**
+       * Rewinds the iterator: either the redelivery iterator is asked to repeat, or the page
+       * position is reset to lastOperation (or to the subscription's last position when that is null).
+       */
+      public void repeat()
+      {
+         if (isredelivery)
+         {
+            redeliveryIterator.repeat();
+            return;
+         }
+
+         position = lastOperation == null ? getLastPosition() : lastOperation;
+      }
+
+      /**
+       * Returns the next redelivery if one is pending, otherwise the next matching paged message.
+       *
+       * @return the next position/message pair, or {@code null} when moveNext found nothing
+       * @throws RuntimeException wrapping any exception raised while reading
+       */
+      public Pair<PagePosition, PagedMessage> next()
+      {
+         try
+         {
+            isredelivery = redeliveryIterator.hasNext();
+            if (isredelivery)
+            {
+               return getMessage(redeliveryIterator.next());
+            }
+
+            final Pair<PagePosition, PagedMessage> following = moveNext(position);
+            if (following != null)
+            {
+               // remember where we are so the next call continues from here
+               position = following.a;
+            }
+            return following;
+         }
+         catch (Exception e)
+         {
+            throw new RuntimeException(e.getMessage(), e);
+         }
+      }
+
+      /**
+       * Always reports {@code true}; callers find out there is nothing left when next() returns {@code null}.
+       */
+      public boolean hasNext()
+      {
+         return true;
+      }
+
+      /**
+       * No-op.
+       */
+      public void remove()
+      {
+      }
+
+      /**
+       * No-op.
+       */
+      public void close()
+      {
+      }
+   }
+
+   private Pair<PagePosition, PagedMessage> getMessage(PagePosition pos) throws Exception
+   {
+      return new Pair<PagePosition, PagedMessage>(pos, cursorProvider.getMessage(pos));
+   }
+
+   /**
+    * @return a new {@link CursorIterator} over this subscription
+    */
+   public LinkedListIterator<Pair<PagePosition, PagedMessage>> iterator()
+   {
+      final CursorIterator cursorIterator = new CursorIterator();
+      return cursorIterator;
+   }
+
+   /**
+    * Finds the next message after the given position that passes this subscription's filter.
+    * <p>
+    * Messages that do not match are acknowledged in memory through processACK as they are skipped.
+    *
+    * @param position the position to start searching after
+    * @return the first matching position/message pair, or {@code null} when the provider returns nothing more
+    * @throws Exception if the provider fails while reading
+    */
+   public synchronized Pair<PagePosition, PagedMessage> moveNext(PagePosition position) throws Exception
+   {
+      PagePosition scanFrom = position;
+
+      while (true)
+      {
+         final Pair<PagePosition, PagedMessage> candidate = cursorProvider.getNext(this, scanFrom);
+
+         if (candidate == null)
+         {
+            // the provider had nothing after scanFrom
+            return null;
+         }
+
+         scanFrom = candidate.a;
+
+         if (match(candidate.b.getMessage()))
+         {
+            return candidate;
+         }
+
+         // filtered out: ACK it and keep scanning
+         processACK(candidate.a);
+      }
+   }
+
+   /**
+    * 
+    */
+   private PagePosition getLastPosition()
+   {
+      if (lastPosition == null)
+      {
+         // it will start at the first available page
+         long firstPage = pageStore.getFirstPage();
+         lastPosition = new PagePositionImpl(firstPage, -1);
+      }
+
+      return lastPosition;
+   }
+
+   /**
+    * Acknowledges a position outside of any transaction.
+    * <p>
+    * For a persistent cursor (non-zero cursor id) the ACK is first written to the store. The
+    * in-memory ACK processing is registered as a callback on the store and only runs from its
+    * {@code done()} notification.
+    *
+    * @param position the position being acknowledged
+    * @throws Exception if the store fails to record the ACK or register the callback
+    */
+   public void ack(final PagePosition position) throws Exception
+   {
+
+      // if we are dealing with a persistent cursor
+      if (cursorId != 0)
+      {
+         store.storeCursorAcknowledge(cursorId, position);
+      }
+
+      store.afterCompleteOperations(new IOAsyncTask()
+      {
+
+         public void onError(final int errorCode, final String errorMessage)
+         {
+            // The ACK is not processed in memory on failure; log it so it is not lost silently
+            PageSubscriptionImpl.log.warn("Error on storing ack " + position +
+                                          " for cursor " +
+                                          cursorId +
+                                          ", code = " +
+                                          errorCode +
+                                          ", message = " +
+                                          errorMessage);
+         }
+
+         public void done()
+         {
+            processACK(position);
+         }
+      });
+   }
+
+   /**
+    * Acknowledges a position as part of a transaction.
+    * <p>
+    * For a persistent cursor (non-zero cursor id) the ACK is written to the store under the
+    * transaction's id. In every case the position is registered on the transaction through
+    * installTXCallback.
+    *
+    * @param tx the transaction the ACK belongs to
+    * @param position the position being acknowledged
+    * @throws Exception if the store fails to record the ACK
+    */
+   public void ackTx(final Transaction tx, final PagePosition position) throws Exception
+   {
+      final boolean persistentCursor = cursorId != 0;
+
+      if (persistentCursor)
+      {
+         store.storeCursorAcknowledgeTransactional(tx.getID(), cursorId, position);
+      }
+
+      installTXCallback(tx, position);
+   }
+
+   /**
+    * Returns the lowest page number this subscription is tracking.
+    * <p>
+    * The emptiness check and the key lookup are done while holding the lock of the synchronized
+    * map. Otherwise a page could be removed between the two calls and {@code firstKey()} would
+    * throw {@code NoSuchElementException}.
+    *
+    * @return the first key of the consumed pages map, or 0 when no page is being tracked
+    */
+   public long getFirstPage()
+   {
+      synchronized (consumedPages)
+      {
+         if (consumedPages.isEmpty())
+         {
+            return 0;
+         }
+         else
+         {
+            return consumedPages.firstKey();
+         }
+      }
+   }
+
+   /**
+    * Queues a position at the tail of the redelivery list, from where iterators serve it
+    * before moving on through the pages.
+    *
+    * @param position the position to deliver again
+    */
+   public synchronized void redeliver(final PagePosition position)
+   {
+      this.redeliveries.addTail(position);
+   }
+
+   /**
+    * Collects an ACK found during reload; the collected list is consumed by processReload.
+    * There's no need to synchronize this method as it's only called from journal load on startup.
+    *
+    * @param position the acknowledged position that was reloaded
+    */
+   public void reloadACK(final PagePosition position)
+   {
+      List<PagePosition> pending = recoveredACK;
+
+      if (pending == null)
+      {
+         // first reloaded ACK: create the list lazily
+         pending = new LinkedList<PagePosition>();
+         recoveredACK = pending;
+      }
+
+      pending.add(position);
+   }
+
+   /**
+    * Registers a reloaded ACK on its transaction through installTXCallback.
+    *
+    * @param tx the transaction the ACK belongs to
+    * @param position the acknowledged position
+    */
+   public void reloadPreparedACK(final Transaction tx, final PagePosition position)
+   {
+      this.installTXCallback(tx, position);
+   }
+
+   /**
+    * Marks a position as ignored by processing it as an in-memory ACK; nothing is written to the store.
+    *
+    * @param position the position being ignored
+    */
+   public void positionIgnored(final PagePosition position)
+   {
+      this.processACK(position);
+   }
+
+   /**
+    * Tells whether every message of a page has been confirmed by this subscription.
+    *
+    * @param page the page number
+    * @return {@code true} only when the page is being tracked and its info reports done;
+    *         {@code false} when the page is unknown to this subscription
+    */
+   public boolean isComplete(long page)
+   {
+      final PageCursorInfo pageInfo = consumedPages.get(page);
+
+      if (pageInfo == null)
+      {
+         return false;
+      }
+
+      return pageInfo.isDone();
+   }
+
+   /**
+    * All the data associated with the cursor should go away here.
+    * <p>
+    * The ACK records held for every tracked page are deleted under a single store transaction
+    * id. The deletes are issued from a task on the executor; the calling thread waits for that
+    * work and then commits, or rolls back and rethrows the first failure. Finally the cursor
+    * provider is told to close this subscription.
+    *
+    * @throws Exception the first exception raised while deleting records, or any failure on commit/rollback
+    */
+   public void close() throws Exception
+   {
+      // Store transaction id shared by every delete issued below
+      final long tx = store.generateUniqueID();
+
+      // Failures raised inside the executor task, handed back to the calling thread
+      final ArrayList<Exception> ex = new ArrayList<Exception>();
+
+      // Set as soon as one record delete is issued; without it there is nothing to commit
+      final AtomicBoolean isPersistent = new AtomicBoolean(false);
+
+      // We can't delete the records at the caller's thread
+      // because an executor may be holding the synchronized on PageSubscriptionImpl
+      // what would lead to a dead lock
+      // so, we delete it inside the executor also
+      // and wait for the result
+      // The caller will be treating eventual IO exceptions and dispatching to the original thread's caller
+      executor.execute(new Runnable()
+      {
+
+         public void run()
+         {
+            try
+            {
+               synchronized (PageSubscriptionImpl.this)
+               {
+                  // NOTE(review): consumedPages is a Collections.synchronizedSortedMap; iterating its values()
+                  // view here holds the subscription's lock but not the map's own lock - confirm this is safe
+                  for (PageCursorInfo cursor : consumedPages.values())
+                  {
+                     for (PagePosition info : cursor.acks)
+                     {
+                        if (info.getRecordID() != 0)
+                        {
+                           isPersistent.set(true);
+                           store.deleteCursorAcknowledgeTransactional(tx, info.getRecordID());
+                        }
+                     }
+                  }
+               }
+            }
+            catch (Exception e)
+            {
+               ex.add(e);
+               PageSubscriptionImpl.log.warn(e.getMessage(), e);
+            }
+         }
+      });
+
+      // Queue a Future behind the delete task and block on it
+      // NOTE(review): presumably the executor runs tasks in submission order, so the Future completing
+      // implies the delete task has finished - confirm against the executor in use
+      Future future = new Future();
+
+      executor.execute(future);
+
+      // Keep waiting in 5000 unit slices (presumably milliseconds - confirm), warning on every timeout
+      while (!future.await(5000))
+      {
+         PageSubscriptionImpl.log.warn("Timeout on waiting cursor " + this + " to be closed");
+      }
+
+      if (isPersistent.get())
+      {
+         // Another reason to perform the commit at the main thread is because the OperationContext may only send the
+         // result to the client when
+         // the IO on commit is done
+         if (ex.size() == 0)
+         {
+            store.commit(tx);
+         }
+         else
+         {
+            // Only the first recorded failure is rethrown; all of them were already logged by the task
+            store.rollback(tx);
+            throw ex.get(0);
+         }
+      }
+
+      cursorProvider.close(this);
+   }
+
+   /**
+    * @return the cursor id this subscription was created with
+    */
+   public long getId()
+   {
+      return this.cursorId;
+   }
+
+   /**
+    * Replays the ACKs collected by {@link #reloadACK(PagePosition)}.
+    * <p>
+    * The ACKs are sorted and applied to their pages in order. Whenever two consecutive ACKs are
+    * not adjacent, the positions between them are walked: those that match the filter are queued
+    * for redelivery, the others are counted as confirmed so the page can still complete. When
+    * done, the last ACK becomes both the last position and the last acked position, and the
+    * collected list is discarded. Nothing happens when no ACK was reloaded.
+    *
+    * @throws Exception if the cursor provider fails while reading messages
+    */
+   public void processReload() throws Exception
+   {
+      if (recoveredACK != null)
+      {
+         if (isTrace)
+         {
+            PageSubscriptionImpl.trace("********** processing reload!!!!!!!");
+         }
+         Collections.sort(recoveredACK);
+
+         boolean first = true;
+
+         PagePosition previousPos = null;
+         for (PagePosition pos : recoveredACK)
+         {
+            PageCursorInfo positions = getPageInfo(pos);
+            if (first)
+            {
+               first = false;
+               // everything before the first recovered ACK on its page is counted as confirmed
+               if (pos.getMessageNr() > 0)
+               {
+                  positions.confirmed.addAndGet(pos.getMessageNr());
+               }
+            }
+
+            positions.addACK(pos);
+
+            lastPosition = pos;
+            if (previousPos != null)
+            {
+               // Compare the current ACK against the previous one. Comparing previousPos with itself
+               // could never detect adjacency and forced the hole scan for every pair of ACKs.
+               // NOTE(review): assumes a.isRightAfter(b) means "a immediately follows b" - confirm
+               if (!pos.isRightAfter(previousPos))
+               {
+                  PagePosition tmpPos = previousPos;
+                  // looking for holes on the ack list for redelivery
+                  while (true)
+                  {
+                     Pair<PagePosition, PagedMessage> msgCheck = cursorProvider.getNext(this, tmpPos);
+
+                     positions = getPageInfo(tmpPos);
+
+                     // end of the hole, we can finish processing here
+                     // It may be also that the next was just a next page, so we just ignore it
+                     if (msgCheck == null || msgCheck.a.equals(pos))
+                     {
+                        break;
+                     }
+                     else
+                     {
+                        if (match(msgCheck.b.getMessage()))
+                        {
+                           redeliver(msgCheck.a);
+                        }
+                        else
+                        {
+                           // The reference was ignored. But we must take a count from the reference count
+                           // otherwise the page will never be deleted hence we would never leave paging even if
+                           // everything was consumed
+                           positions.confirmed.incrementAndGet();
+                        }
+                     }
+                     tmpPos = msgCheck.a;
+                  }
+               }
+            }
+
+            previousPos = pos;
+         }
+
+         lastAckedPosition = lastPosition;
+
+         recoveredACK.clear();
+         recoveredACK = null;
+      }
+   }
+
+   /**
+    * Blocks until a marker task submitted to this subscription's executor has run, logging a
+    * warning each time a wait of 1000 elapses without it completing.
+    */
+   public void flushExecutors()
+   {
+      final Future barrier = new Future();
+      executor.execute(barrier);
+
+      while (true)
+      {
+         if (barrier.await(1000))
+         {
+            return;
+         }
+
+         PageSubscriptionImpl.log.warn("Waiting page cursor to finish executors - " + this);
+      }
+   }
+
+   /**
+    * Stops this subscription; all it does is wait for the executor through {@link #flushExecutors()}.
+    */
+   public void stop()
+   {
+      this.flushExecutors();
+   }
+
+   /**
+    * Prints the debug information using this object's {@code toString()} as the heading.
+    */
+   public void printDebug()
+   {
+      final String heading = toString();
+      printDebug(heading);
+   }
+
+   public void printDebug(final String msg)
+   {
+      System.out.println("Debug information on PageCurorImpl- " + msg);
+      for (PageCursorInfo info : consumedPages.values())
+      {
+         System.out.println(info);
+      }
+   }
+
+   /**
+    * @param page
+    * @return
+    */
+   private synchronized PageCursorInfo getPageInfo(final PagePosition pos)
+   {
+      PageCursorInfo pageInfo = consumedPages.get(pos.getPageNr());
+
+      if (pageInfo == null)
+      {
+         PageCache cache = cursorProvider.getPageCache(pos);
+         pageInfo = new PageCursorInfo(pos.getPageNr(), cache.getNumberOfMessages(), cache);
+         consumedPages.put(pos.getPageNr(), pageInfo);
+      }
+
+      return pageInfo;
+   }
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   /**
+    * Applies this subscription's filter to a message.
+    *
+    * @param message the message to test
+    * @return {@code true} when no filter is set, otherwise the filter's verdict
+    */
+   protected boolean match(final ServerMessage message)
+   {
+      return filter == null || filter.match(message);
+   }
+
+   // Private -------------------------------------------------------
+
+   // To be called only after the ACK has been processed and guaranteed to be on storage.
+   // The only exception is on non storage events such as not matching messages
+   private void processACK(final PagePosition pos)
+   {
+      final boolean movesForward = lastAckedPosition == null || pos.compareTo(lastAckedPosition) > 0;
+
+      if (movesForward)
+      {
+         // the ACK landed on a different page than the previous newest one, so the check is done right away
+         if (lastAckedPosition != null && lastAckedPosition.getPageNr() != pos.getPageNr() && autoCleanup)
+         {
+            scheduleCleanupCheck();
+         }
+
+         lastAckedPosition = pos;
+      }
+
+      // the ACK is always recorded on its page, whether or not it became the newest position
+      getPageInfo(pos).addACK(pos);
+   }
+
+   /**
+    * Registers <code>position</code> with the PageCursorTX operation attached to <code>tx</code>. When the
+    * transaction does not carry such an operation yet, one is created, stored as a transaction property and
+    * added to the transaction's operations.
+    * @param tx the transaction the confirmation belongs to
+    * @param position the position to be confirmed
+    */
+   private void installTXCallback(final Transaction tx, final PagePosition position)
+   {
+      final boolean hasRecord = position.getRecordID() > 0;
+
+      if (hasRecord)
+      {
+         // It needs to persist, otherwise the cursor will return to the first page position
+         tx.setContainsPersistent();
+      }
+
+      PageCursorTX pendingConfirmations = (PageCursorTX)tx.getProperty(TransactionPropertyIndexes.PAGE_CURSOR_POSITIONS);
+
+      if (pendingConfirmations == null)
+      {
+         pendingConfirmations = new PageCursorTX();
+         tx.putProperty(TransactionPropertyIndexes.PAGE_CURSOR_POSITIONS, pendingConfirmations);
+         tx.addOperation(pendingConfirmations);
+      }
+
+      pendingConfirmations.addPositionConfirmation(this, position);
+   }
+
+   /**
+    * Callback used by PageCursorInfo once the confirmed count of a page reaches its number of messages.
+    * A cleanup check is scheduled, unless autoCleanup is switched off.
+    * @param info the page that has just been completed (not used by this implementation)
+    */
+   private void onPageDone(final PageCursorInfo info)
+   {
+      if (!autoCleanup)
+      {
+         return;
+      }
+
+      scheduleCleanupCheck();
+   }
+
+   /**
+    * Hands a call to {@link #cleanupEntries()} over to the executor. Nothing is submitted while autoCleanup is
+    * off. An exception thrown by the cleanup is logged as a warning by the submitted task.
+    */
+   public void scheduleCleanupCheck()
+   {
+      if (!autoCleanup)
+      {
+         return;
+      }
+
+      final Runnable cleanupTask = new Runnable()
+      {
+         public void run()
+         {
+            try
+            {
+               cleanupEntries();
+            }
+            catch (Exception e)
+            {
+               PageSubscriptionImpl.log.warn("Error on cleaning up cursor pages", e);
+            }
+         }
+      };
+
+      executor.execute(cleanupTask);
+   }
+
+   /**
+    * It will cleanup all the records for completed pages.
+    * <p>
+    * The work is done in three steps:
+    * <ol>
+    * <li>while holding this object's lock, every page that is done, is not already pending delete and is not the
+    * page of the last acked position is flagged as pending delete and collected;</li>
+    * <li>for every stored ACK (record id &gt; 0) of the collected pages a transactional delete is issued;</li>
+    * <li>the transaction is committed; its after-commit operation asks the executor to remove the pages from
+    * consumedPages and then to schedule a cleanup on the cursor provider.</li>
+    * </ol>
+    * NOTE(review): pages are flagged as pending delete before the deletes and the commit are issued, and nothing
+    * in this method clears the flag when one of those calls throws - confirm this is the intended behaviour.
+    * @throws Exception whatever the storage calls or the commit throw
+    */
+   public void cleanupEntries() throws Exception
+   {
+      Transaction tx = new TransactionImpl(store);
+
+      boolean persist = false;
+
+      final ArrayList<PageCursorInfo> completedPages = new ArrayList<PageCursorInfo>();
+
+      // First get the completed pages using a lock
+      synchronized (this)
+      {
+         for (Entry<Long, PageCursorInfo> entry : consumedPages.entrySet())
+         {
+            PageCursorInfo info = entry.getValue();
+            if (info.isDone() && !info.isPendingDelete() && lastAckedPosition != null)
+            {
+               if (entry.getKey() == lastAckedPosition.getPageNr())
+               {
+                  PageSubscriptionImpl.trace("We can't clear page " + entry.getKey() + " now since it's the current page");
+               }
+               else
+               {
+                  info.setPendingDelete();
+                  completedPages.add(entry.getValue());
+               }
+            }
+         }
+      }
+
+      for (PageCursorInfo info : completedPages)
+      {
+         // acks is a Collections.synchronizedList: traversing it is only safe while holding the list itself as
+         // the lock (addACK may append concurrently). Take a snapshot under that lock and keep the storage
+         // calls outside of it.
+         final List<PagePosition> acksSnapshot;
+         synchronized (info.acks)
+         {
+            acksSnapshot = new ArrayList<PagePosition>(info.acks);
+         }
+
+         for (PagePosition pos : acksSnapshot)
+         {
+            if (pos.getRecordID() > 0)
+            {
+               store.deleteCursorAcknowledgeTransactional(tx.getID(), pos.getRecordID());
+               if (!persist)
+               {
+                  // only need to set it once
+                  tx.setContainsPersistent();
+                  persist = true;
+               }
+            }
+         }
+      }
+
+      tx.addOperation(new TransactionOperationAbstract()
+      {
+
+         @Override
+         public void afterCommit(final Transaction tx)
+         {
+            executor.execute(new Runnable()
+            {
+
+               public void run()
+               {
+                  // the in-memory entries are only dropped once the deletes have been committed
+                  synchronized (PageSubscriptionImpl.this)
+                  {
+                     for (PageCursorInfo completePage : completedPages)
+                     {
+                        if (isTrace)
+                        {
+                           PageSubscriptionImpl.trace("Removing page " + completePage.getPageId());
+                        }
+                        if (consumedPages.remove(completePage.getPageId()) == null)
+                        {
+                           PageSubscriptionImpl.log.warn("Couldn't remove page " + completePage.getPageId() +
+                                                   " from consumed pages on cursor for address " +
+                                                   pageStore.getAddress());
+                        }
+                     }
+                  }
+
+                  cursorProvider.scheduleCleanup();
+               }
+            });
+         }
+      });
+
+      tx.commit();
+
+   }
+
+   // Inner classes -------------------------------------------------
+
+   /**
+    * Per-page bookkeeping kept by the enclosing subscription: the page id, the number of messages on the page,
+    * the stored ACKs and a counter of confirmed messages.
+    */
+   private class PageCursorInfo
+   {
+      // Number of messages existent on this page
+      private final int numberOfMessages;
+
+      private final long pageId;
+
+      // Confirmed ACKs on this page
+      private final List<PagePosition> acks = Collections.synchronizedList(new LinkedList<PagePosition>());
+
+      // Only assigned by the constructor when the page was live at creation time; re-assigned by
+      // getNumberOfMessages() when the referenced cache is no longer reachable
+      private WeakReference<PageCache> cache;
+
+      // The page was live at the time of the creation
+      private final boolean wasLive;
+
+      // There's a pending delete on the async IO pipe
+      // We're holding this object to avoid delete the pages before the IO is complete,
+      // however we can't delete these records again
+      private boolean pendingDelete;
+
+      // We need a separate counter as the cursor may be ignoring certain values because of incomplete transactions or
+      // expressions
+      private final AtomicInteger confirmed = new AtomicInteger(0);
+
+      /**
+       * @return a text with the page id, the number of messages given at construction and the confirmed counter
+       */
+      @Override
+      public String toString()
+      {
+         return "PageCursorInfo::PageID=" + pageId +
+                " numberOfMessage = " +
+                numberOfMessages +
+                ", confirmed = " +
+                confirmed;
+      }
+
+      /**
+       * @param pageId the id of the page being tracked
+       * @param numberOfMessages the number of messages on the page at creation time
+       * @param cache the cache of the page; a weak reference to it is kept only when it reports itself as live
+       */
+      public PageCursorInfo(final long pageId, final int numberOfMessages, final PageCache cache)
+      {
+         this.pageId = pageId;
+         this.numberOfMessages = numberOfMessages;
+         wasLive = cache.isLive();
+         if (wasLive)
+         {
+            this.cache = new WeakReference<PageCache>(cache);
+         }
+      }
+
+      /**
+       * @return true when the confirmed counter equals the current number of messages of the page
+       */
+      public boolean isDone()
+      {
+         return getNumberOfMessages() == confirmed.get();
+      }
+
+      /**
+       * @return true once {@link #setPendingDelete()} has been called
+       */
+      public boolean isPendingDelete()
+      {
+         return pendingDelete;
+      }
+
+      /**
+       * Flags this page as having a delete in progress. There is no way to clear the flag on this class.
+       */
+      public void setPendingDelete()
+      {
+         pendingDelete = true;
+      }
+
+      /**
+       * @return the pageId
+       */
+      public long getPageId()
+      {
+         return pageId;
+      }
+
+      /**
+       * Records an ACK on this page. Positions with a record id are remembered for later cleanup; positions with
+       * a non-negative message number increment the confirmed counter, and the enclosing subscription's
+       * onPageDone is called when the counter reaches the number of messages.
+       * @param posACK the acknowledged position
+       */
+      public void addACK(final PagePosition posACK)
+      {
+         if (posACK.getRecordID() > 0)
+         {
+            // We store these elements for later cleanup
+            acks.add(posACK);
+         }
+
+         // The trace line shows the counter as it would be after an increment; it is written before the
+         // increment below happens, and also for positions that won't be counted (negative message number)
+         if (isTrace)
+         {
+            PageSubscriptionImpl.trace("numberOfMessages =  " + getNumberOfMessages() +
+                                 " confirmed =  " +
+                                 (confirmed.get() + 1) +
+                                 ", page = " +
+                                 pageId);
+         }
+
+         // Negative could mean a bookmark on the first element for the page (example -1)
+         if (posACK.getMessageNr() >= 0)
+         {
+            if (getNumberOfMessages() == confirmed.incrementAndGet())
+            {
+               onPageDone(this);
+            }
+         }
+      }
+
+      /**
+       * @return for a page that was live at creation, the number of messages reported by its cache (the cache is
+       *         requested again from the cursor provider when the weak reference has been cleared); otherwise the
+       *         number of messages given at construction
+       */
+      private int getNumberOfMessages()
+      {
+         if (wasLive)
+         {
+            PageCache cache = this.cache.get();
+            if (cache != null)
+            {
+               return cache.getNumberOfMessages();
+            }
+            else
+            {
+               // the weakly referenced cache is gone: ask the provider for the page again and keep the new reference
+               cache = cursorProvider.getPageCache(new PagePositionImpl(pageId, 0));
+               this.cache = new WeakReference<PageCache>(cache);
+               return cache.getNumberOfMessages();
+            }
+         }
+         else
+         {
+            return numberOfMessages;
+         }
+      }
+
+   }
+
+   /**
+    * Transaction operation that collects, per subscription, the positions confirmed within a transaction and
+    * hands them to the subscription's processACK after the transaction has committed.
+    */
+   static class PageCursorTX extends TransactionOperationAbstract
+   {
+      HashMap<PageSubscriptionImpl, List<PagePosition>> pendingPositions = new HashMap<PageSubscriptionImpl, List<PagePosition>>();
+
+      /**
+       * Remembers <code>position</code> as confirmed for <code>cursor</code> until the commit.
+       * @param cursor the subscription the position belongs to
+       * @param position the confirmed position
+       */
+      public void addPositionConfirmation(final PageSubscriptionImpl cursor, final PagePosition position)
+      {
+         List<PagePosition> positionsForCursor = pendingPositions.get(cursor);
+
+         if (positionsForCursor == null)
+         {
+            // first confirmation seen for this subscription
+            positionsForCursor = new LinkedList<PagePosition>();
+            pendingPositions.put(cursor, positionsForCursor);
+         }
+
+         positionsForCursor.add(position);
+      }
+
+      /* (non-Javadoc)
+       * @see org.hornetq.core.transaction.TransactionOperation#afterCommit(org.hornetq.core.transaction.Transaction)
+       */
+      @Override
+      public void afterCommit(final Transaction tx)
+      {
+         for (Entry<PageSubscriptionImpl, List<PagePosition>> pending : pendingPositions.entrySet())
+         {
+            final PageSubscriptionImpl subscription = pending.getKey();
+
+            for (PagePosition ackedPosition : pending.getValue())
+            {
+               subscription.processACK(ackedPosition);
+            }
+         }
+      }
+
+   }
+}

Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PageTransactionInfoImpl.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PageTransactionInfoImpl.java	2010-11-02 00:20:27 UTC (rev 9828)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PageTransactionInfoImpl.java	2010-11-02 00:24:34 UTC (rev 9829)
@@ -22,7 +22,7 @@
 import org.hornetq.core.logging.Logger;
 import org.hornetq.core.paging.PageTransactionInfo;
 import org.hornetq.core.paging.PagingManager;
-import org.hornetq.core.paging.cursor.PageCursor;
+import org.hornetq.core.paging.cursor.PageSubscription;
 import org.hornetq.core.paging.cursor.PagePosition;
 import org.hornetq.core.persistence.StorageManager;
 import org.hornetq.core.transaction.Transaction;
@@ -52,7 +52,7 @@
 
    private AtomicInteger numberOfMessages = new AtomicInteger(0);
    
-   private List<Pair<PageCursor, PagePosition>> lateDeliveries;
+   private List<Pair<PageSubscription, PagePosition>> lateDeliveries;
 
    // Static --------------------------------------------------------
 
@@ -141,7 +141,7 @@
       committed = true;
       if (lateDeliveries != null)
       {
-         for (Pair<PageCursor, PagePosition> pos : lateDeliveries)
+         for (Pair<PageSubscription, PagePosition> pos : lateDeliveries)
          {
             pos.a.redeliver(pos.b);
          }
@@ -210,7 +210,7 @@
 
       if (lateDeliveries != null)
       {
-         for (Pair<PageCursor, PagePosition> pos : lateDeliveries)
+         for (Pair<PageSubscription, PagePosition> pos : lateDeliveries)
          {
             pos.a.positionIgnored(pos.b);
          }
@@ -230,7 +230,7 @@
    /* (non-Javadoc)
     * @see org.hornetq.core.paging.PageTransactionInfo#deliverAfterCommit(org.hornetq.core.paging.cursor.PageCursor, org.hornetq.core.paging.cursor.PagePosition)
     */
-   public synchronized boolean deliverAfterCommit(PageCursor cursor, PagePosition cursorPos)
+   public synchronized boolean deliverAfterCommit(PageSubscription cursor, PagePosition cursorPos)
    {
       if (committed)
       {
@@ -246,9 +246,9 @@
       {
          if (lateDeliveries == null)
          {
-            lateDeliveries = new LinkedList<Pair<PageCursor, PagePosition>>();
+            lateDeliveries = new LinkedList<Pair<PageSubscription, PagePosition>>();
          }
-         lateDeliveries.add(new Pair<PageCursor, PagePosition>(cursor, cursorPos));
+         lateDeliveries.add(new Pair<PageSubscription, PagePosition>(cursor, cursorPos));
          return true;
       }
    }

Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java	2010-11-02 00:20:27 UTC (rev 9828)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java	2010-11-02 00:24:34 UTC (rev 9829)
@@ -51,7 +51,7 @@
 import org.hornetq.core.paging.PagedMessage;
 import org.hornetq.core.paging.PagingManager;
 import org.hornetq.core.paging.PagingStore;
-import org.hornetq.core.paging.cursor.PageCursor;
+import org.hornetq.core.paging.cursor.PageSubscription;
 import org.hornetq.core.paging.cursor.PagePosition;
 import org.hornetq.core.paging.cursor.impl.PagePositionImpl;
 import org.hornetq.core.paging.impl.PageTransactionInfoImpl;
@@ -1028,7 +1028,7 @@
                {
                   SimpleString address = queueInfo.getAddress();
                   PagingStore store = pagingManager.getPageStore(address);
-                  PageCursor cursor = store.getCursorProvier().getPersistentCursor(encoding.queueID);
+                  PageSubscription cursor = store.getCursorProvier().getPersistentCursor(encoding.queueID);
                   cursor.reloadACK(encoding.position);
                }
                else

Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java	2010-11-02 00:20:27 UTC (rev 9828)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java	2010-11-02 00:24:34 UTC (rev 9829)
@@ -1214,7 +1214,7 @@
          managementService.registerAddress(queueBindingInfo.getAddress());
          managementService.registerQueue(queue, queueBindingInfo.getAddress(), storageManager);
          
-         pagingManager.getPageStore(queueBindingInfo.getAddress()).getCursorProvier().createPersistentCursor(queue.getID(), filter);
+         pagingManager.getPageStore(queueBindingInfo.getAddress()).getCursorProvier().createPersistentSubscription(queue.getID(), filter);
       }
 
       for (GroupingInfo groupingInfo : groupingInfos)

Modified: branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java
===================================================================
--- branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java	2010-11-02 00:20:27 UTC (rev 9828)
+++ branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java	2010-11-02 00:24:34 UTC (rev 9829)
@@ -29,10 +29,10 @@
 import org.hornetq.core.paging.PageTransactionInfo;
 import org.hornetq.core.paging.PagedMessage;
 import org.hornetq.core.paging.cursor.PageCache;
-import org.hornetq.core.paging.cursor.PageCursor;
+import org.hornetq.core.paging.cursor.PageSubscription;
 import org.hornetq.core.paging.cursor.PageCursorProvider;
 import org.hornetq.core.paging.cursor.PagePosition;
-import org.hornetq.core.paging.cursor.impl.PageCursorImpl;
+import org.hornetq.core.paging.cursor.impl.PageSubscriptionImpl;
 import org.hornetq.core.paging.cursor.impl.PageCursorProviderImpl;
 import org.hornetq.core.paging.cursor.impl.PagePositionImpl;
 import org.hornetq.core.paging.impl.PageTransactionInfoImpl;
@@ -117,7 +117,7 @@
 
       System.out.println("NumberOfPages = " + numberOfPages);
 
-      PageCursor cursor = createNonPersistentCursor();
+      PageSubscription cursor = createNonPersistentCursor();
 
       Pair<PagePosition, PagedMessage> msg;
 
@@ -157,7 +157,7 @@
 
       System.out.println("NumberOfPages = " + numberOfPages);
 
-      PageCursor cursorEven = createNonPersistentCursor(new Filter()
+      PageSubscription cursorEven = createNonPersistentCursor(new Filter()
       {
 
          public boolean match(ServerMessage message)
@@ -180,7 +180,7 @@
 
       });
 
-      PageCursor cursorOdd = createNonPersistentCursor(new Filter()
+      PageSubscription cursorOdd = createNonPersistentCursor(new Filter()
       {
 
          public boolean match(ServerMessage message)
@@ -273,10 +273,10 @@
       // need to change this after some integration
       // PageCursor cursor =
       // this.server.getPagingManager().getPageStore(ADDRESS).getCursorProvier().getPersistentCursor(queue.getID());
-      PageCursor cursor = this.server.getPagingManager()
+      PageSubscription cursor = this.server.getPagingManager()
                                      .getPageStore(ADDRESS)
                                      .getCursorProvier()
-                                     .createPersistentCursor(queue.getID(), null);
+                                     .createPersistentSubscription(queue.getID(), null);
 
       PageCache firstPage = cursorProvider.getPageCache(new PagePositionImpl(server.getPagingManager()
                                                                                    .getPageStore(ADDRESS)
@@ -370,10 +370,10 @@
       // need to change this after some integration
       // PageCursor cursor =
       // this.server.getPagingManager().getPageStore(ADDRESS).getCursorProvier().getPersistentCursor(queue.getID());
-      PageCursor cursor = this.server.getPagingManager()
+      PageSubscription cursor = this.server.getPagingManager()
                                      .getPageStore(ADDRESS)
                                      .getCursorProvier()
-                                     .createPersistentCursor(queue.getID(), null);
+                                     .createPersistentSubscription(queue.getID(), null);
 
       System.out.println("Cursor: " + cursor);
       LinkedListIterator<Pair<PagePosition, PagedMessage>> iterator = cursor.iterator();
@@ -436,10 +436,10 @@
       // need to change this after some integration
       // PageCursor cursor =
       // this.server.getPagingManager().getPageStore(ADDRESS).getCursorProvier().getPersistentCursor(queue.getID());
-      PageCursor cursor = this.server.getPagingManager()
+      PageSubscription cursor = this.server.getPagingManager()
                                      .getPageStore(ADDRESS)
                                      .getCursorProvier()
-                                     .createPersistentCursor(queue.getID(), null);
+                                     .createPersistentSubscription(queue.getID(), null);
 
       System.out.println("Cursor: " + cursor);
 
@@ -514,10 +514,10 @@
       // need to change this after some integration
       // PageCursor cursor =
       // this.server.getPagingManager().getPageStore(ADDRESS).getCursorProvier().getPersistentCursor(queue.getID());
-      PageCursor cursor = this.server.getPagingManager()
+      PageSubscription cursor = this.server.getPagingManager()
                                      .getPageStore(ADDRESS)
                                      .getCursorProvier()
-                                     .createPersistentCursor(queue.getID(), null);
+                                     .createPersistentSubscription(queue.getID(), null);
 
       System.out.println("Cursor: " + cursor);
 
@@ -684,10 +684,10 @@
       // need to change this after some integration
       // PageCursor cursor =
       // this.server.getPagingManager().getPageStore(ADDRESS).getCursorProvier().getPersistentCursor(queue.getID());
-      PageCursor cursor = this.server.getPagingManager()
+      PageSubscription cursor = this.server.getPagingManager()
                                      .getPageStore(ADDRESS)
                                      .getCursorProvier()
-                                     .createPersistentCursor(queue.getID(), null);
+                                     .createPersistentSubscription(queue.getID(), null);
       LinkedListIterator<Pair<PagePosition, PagedMessage>> iterator = cursor.iterator();
 
       System.out.println("Cursor: " + cursor);
@@ -760,8 +760,8 @@
 
       PageCursorProvider cursorProvider = lookupCursorProvider();
 
-      PageCursor cursor = cursorProvider.createNonPersistentCursor(null);
-      PageCursorImpl cursor2 = (PageCursorImpl)cursorProvider.createNonPersistentCursor(null);
+      PageSubscription cursor = cursorProvider.createNonPersistentSubscription(null);
+      PageSubscriptionImpl cursor2 = (PageSubscriptionImpl)cursorProvider.createNonPersistentSubscription(null);
 
       Pair<PagePosition, PagedMessage> msg;
       LinkedListIterator<Pair<PagePosition, PagedMessage>> iterator = cursor.iterator();
@@ -832,7 +832,7 @@
 
       PageCache cache = cursorProvider.getPageCache(new PagePositionImpl(5, 0));
 
-      PageCursor cursor = cursorProvider.createNonPersistentCursor(null);
+      PageSubscription cursor = cursorProvider.createNonPersistentSubscription(null);
       PagePosition startingPos = new PagePositionImpl(5, cache.getNumberOfMessages() / 2);
       cursor.bookmark(startingPos);
       PagedMessage msg = cache.getMessage(startingPos.getMessageNr() + 1);
@@ -880,7 +880,7 @@
       // need to change this after some integration
       // PageCursor cursor =
       // this.server.getPagingManager().getPageStore(ADDRESS).getCursorProvier().getPersistentCursor(queue.getID());
-      PageCursor cursor = cursorProvider.createPersistentCursor(queue.getID(), null);
+      PageSubscription cursor = cursorProvider.createPersistentSubscription(queue.getID(), null);
       PagePosition startingPos = new PagePositionImpl(5, cache.getNumberOfMessages() / 2);
       cursor.bookmark(startingPos);
       PagedMessage msg = cache.getMessage(startingPos.getMessageNr() + 1);
@@ -1013,18 +1013,18 @@
     * @return
     * @throws Exception
     */
-   private PageCursor createNonPersistentCursor() throws Exception
+   private PageSubscription createNonPersistentCursor() throws Exception
    {
-      return lookupCursorProvider().createNonPersistentCursor(null);
+      return lookupCursorProvider().createNonPersistentSubscription(null);
    }
 
    /**
     * @return
     * @throws Exception
     */
-   private PageCursor createNonPersistentCursor(Filter filter) throws Exception
+   private PageSubscription createNonPersistentCursor(Filter filter) throws Exception
    {
-      return lookupCursorProvider().createNonPersistentCursor(filter);
+      return lookupCursorProvider().createNonPersistentSubscription(filter);
    }
 
    /**



More information about the hornetq-commits mailing list