[hornetq-commits] JBoss hornetq SVN: r9769 - in branches/Branch_New_Paging: src/main/org/hornetq/core/paging/cursor and 6 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Mon Oct 11 17:16:17 EDT 2010


Author: clebert.suconic at jboss.com
Date: 2010-10-11 17:16:17 -0400 (Mon, 11 Oct 2010)
New Revision: 9769

Modified:
   branches/Branch_New_Paging/src/main/org/hornetq/core/paging/PagingStore.java
   branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCache.java
   branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursorProvider.java
   branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PagePosition.java
   branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorImpl.java
   branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java
   branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PagePositionImpl.java
   branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PageImpl.java
   branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PagingStoreFactoryNIO.java
   branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java
   branches/Branch_New_Paging/src/main/org/hornetq/core/transaction/TransactionPropertyIndexes.java
   branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/client/PagingTest.java
   branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCrashTest.java
   branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java
   branches/Branch_New_Paging/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java
Log:
just backing up the current state of my work

Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/PagingStore.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/PagingStore.java	2010-10-11 13:42:51 UTC (rev 9768)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/PagingStore.java	2010-10-11 21:16:17 UTC (rev 9769)
@@ -37,6 +37,9 @@
    SimpleString getAddress();
 
    int getNumberOfPages();
+   
+   // The current page in which the system is writing files
+   int getCurrentWritingPage();
 
    SimpleString getStoreName();
 

Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCache.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCache.java	2010-10-11 13:42:51 UTC (rev 9768)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCache.java	2010-10-11 21:16:17 UTC (rev 9769)
@@ -28,6 +28,8 @@
    Page getPage();
 
    int getNumberOfMessages();
+   
+   void setMessages(ServerMessage[] messages);
 
    /**
     * 

Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursorProvider.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursorProvider.java	2010-10-11 13:42:51 UTC (rev 9768)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursorProvider.java	2010-10-11 21:16:17 UTC (rev 9769)
@@ -14,6 +14,7 @@
 package org.hornetq.core.paging.cursor;
 
 import org.hornetq.api.core.Pair;
+import org.hornetq.core.paging.Page;
 import org.hornetq.core.paging.PagingStore;
 import org.hornetq.core.server.ServerMessage;
 
@@ -37,7 +38,7 @@
 
    // Public --------------------------------------------------------
 
-   PageCache getPageCache(long pageId) throws Exception;
+   PageCache getPageCache(PagePosition pos);
 
    PagingStore getAssociatedStore();
 

Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PagePosition.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PagePosition.java	2010-10-11 13:42:51 UTC (rev 9768)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PagePosition.java	2010-10-11 21:16:17 UTC (rev 9769)
@@ -14,6 +14,7 @@
 package org.hornetq.core.paging.cursor;
 
 
+
 /**
  * A PagePosition
  *
@@ -33,6 +34,15 @@
    long getPageNr();
 
    int getMessageNr();
+   
+   void setPageCache(PageCache pageCache);
+   
+   /**
+    * PagePosition will hold the page with a weak reference.
+    * So, this could be eventually null case soft-cache was released
+    * @return
+    */
+   PageCache getPageCache();
 
    PagePosition nextMessage();
 

Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorImpl.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorImpl.java	2010-10-11 13:42:51 UTC (rev 9768)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorImpl.java	2010-10-11 21:16:17 UTC (rev 9769)
@@ -14,20 +14,29 @@
 package org.hornetq.core.paging.cursor.impl;
 
 import java.util.Collections;
-import java.util.Deque;
+import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.Map;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.Map.Entry;
 import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.Executor;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.hornetq.api.core.Pair;
+import org.hornetq.core.paging.Page;
 import org.hornetq.core.paging.PagingStore;
+import org.hornetq.core.paging.cursor.PageCache;
 import org.hornetq.core.paging.cursor.PageCursor;
 import org.hornetq.core.paging.cursor.PageCursorProvider;
 import org.hornetq.core.paging.cursor.PagePosition;
 import org.hornetq.core.persistence.StorageManager;
-import org.hornetq.core.server.MessageReference;
 import org.hornetq.core.server.ServerMessage;
 import org.hornetq.core.transaction.Transaction;
+import org.hornetq.core.transaction.TransactionOperation;
+import org.hornetq.core.transaction.TransactionPropertyIndexes;
 
 /**
  * A PageCursorImpl
@@ -50,11 +59,15 @@
    private final PagingStore pageStore;
 
    private final PageCursorProvider cursorProvider;
+   
+   private final Executor executor;
 
    private volatile PagePosition lastPosition;
 
    private List<PagePosition> recoveredACK;
 
+   private SortedMap<Long, PageCursorInfo> consumedPages = Collections.synchronizedSortedMap(new TreeMap<Long, PageCursorInfo>());
+
    // We only store the position for redeliveries. They will be read from the SoftCache again during delivery.
    private final ConcurrentLinkedQueue<PagePosition> redeliveries = new ConcurrentLinkedQueue<PagePosition>();
 
@@ -65,12 +78,14 @@
    public PageCursorImpl(final PageCursorProvider cursorProvider,
                          final PagingStore pageStore,
                          final StorageManager store,
+                         final Executor executor,
                          final long cursorId)
    {
       this.pageStore = pageStore;
       this.store = store;
       this.cursorProvider = cursorProvider;
       this.cursorId = cursorId;
+      this.executor = executor;
    }
 
    // Public --------------------------------------------------------
@@ -98,19 +113,23 @@
       boolean match = false;
 
       Pair<PagePosition, ServerMessage> message = null;
+      
       do
       {
          message = cursorProvider.getAfter(lastPosition);
+
          if (message != null)
          {
             lastPosition = message.a;
-         }
-         match = match(message.b);
 
-         if (!match)
-         {
-            ignored(message.a);
+            match = match(message.b);
+
+            if (!match)
+            {
+               confirmPagePosition(message.a);
+            }
          }
+         
       }
       while (message != null && !match);
 
@@ -130,11 +149,6 @@
       store.storeCursorAcknowledgeTransactional(tx.getID(), cursorId, position);
       installTXCallback(tx, position);
 
-      // It needs to persist, otherwise the cursor will return to the fist page position
-      tx.setContainsPersistent();
-      
-      
-      // tx.afterCommit()
    }
 
    /* (non-Javadoc)
@@ -173,6 +187,11 @@
          PagePosition previousPos = null;
          for (PagePosition pos : recoveredACK)
          {
+            PageCursorInfo positions = getPageInfo(pos);
+            
+            positions.confirmed.incrementAndGet();
+            positions.acks.add(pos);
+
             lastPosition = pos;
             if (previousPos != null)
             {
@@ -183,6 +202,9 @@
                   while (true)
                   {
                      Pair<PagePosition, ServerMessage> msgCheck = cursorProvider.getAfter(tmpPos);
+
+                     positions = getPageInfo(tmpPos);
+
                      // end of the hole, we can finish processing here
                      // It may be also that the next was just a next page, so we just ignore it
                      if (msgCheck == null || msgCheck.a.equals(pos))
@@ -195,6 +217,12 @@
                         {
                            redeliver(msgCheck.a);
                         }
+                        else
+                        {
+                           // The reference was ignored. But we must take a count from the reference count
+                           // otherwise the page will never be deleted hence we would never leave paging even if everything was consumed
+                           positions.confirmed.incrementAndGet();
+                        }
                      }
                      tmpPos = msgCheck.a;
                   }
@@ -210,9 +238,29 @@
       }
    }
 
+   /**
+    * @param page
+    * @return
+    */
+   private PageCursorInfo getPageInfo(PagePosition pos)
+   {
+      PageCursorInfo pageInfo = consumedPages.get(pos.getPageNr());
+      
+      if (pageInfo == null)
+      {
+         PageCache cache = cursorProvider.getPageCache(pos);
+         pageInfo = new PageCursorInfo(pos.getPageNr(), cache.getNumberOfMessages());
+         consumedPages.put(pos.getPageNr(), pageInfo);
+      }
+      
+      return pageInfo;
+   }
+
    // Package protected ---------------------------------------------
 
    // Protected -----------------------------------------------------
+   
+   
 
    protected boolean match(final ServerMessage message)
    {
@@ -221,10 +269,15 @@
    }
 
    // Private -------------------------------------------------------
-
-   private void ignored(final PagePosition message)
+    
+   private void confirmPagePosition(final PagePosition pos)
    {
-      // TODO: Update reference counts
+      PageCursorInfo info = getPageInfo(pos);
+      
+      if (info.confirmed.incrementAndGet() == info.getNumberOfMessages())
+      {
+         // todo delete previous destinations
+      }
    }
 
    /**
@@ -246,9 +299,136 @@
     */
    private void installTXCallback(Transaction tx, PagePosition position)
    {
-      //TODO: Play with rollbacks on the reference counts
+      // It needs to persist, otherwise the cursor will return to the fist page position
+      tx.setContainsPersistent();
+      
+      PageCursorTX cursorTX = (PageCursorTX)tx.getProperty(TransactionPropertyIndexes.PAGE_CURSOR_POSITIONS);
+      
+      if (cursorTX == null)
+      {
+         cursorTX = new PageCursorTX();
+         tx.putProperty(TransactionPropertyIndexes.PAGE_CURSOR_POSITIONS,cursorTX);
+         tx.addOperation(cursorTX);
+      }
+      
+      
    }
 
    // Inner classes -------------------------------------------------
+   
+   
+   private static class PageCursorInfo
+   {
+      // Number of messages existent on this page
+      private final int numberOfMessages;
+      
+      private final long pageId;
+      
+      // Confirmed ACKs on this page
+      private final List<PagePosition> acks = new LinkedList<PagePosition>();
+      
+      // We need a separate counter as the cursor may be ignoring certain values because of incomplete transactions or expressions 
+      private final AtomicInteger confirmed = new AtomicInteger(0);
+      
+      public PageCursorInfo(final long pageId, final int numberOfMessages)
+      {
+         this.pageId = pageId;
+         this.numberOfMessages = numberOfMessages;
+      }
 
+      /**
+       * @return the numberOfMessages
+       */
+      public int getNumberOfMessages()
+      {
+         return numberOfMessages;
+      }
+
+      /**
+       * @return the pageId
+       */
+      public long getPageId()
+      {
+         return pageId;
+      }
+      
+      public void addACK(final PagePosition posACK)
+      {
+         this.acks.add(posACK);
+      }
+      
+    }
+   
+   static class PageCursorTX implements TransactionOperation
+   {
+      HashMap<PageCursorImpl, List<PagePosition>> pendingPositions = new HashMap<PageCursorImpl, List<PagePosition>>();
+      
+      public void addPositionConfirmation(PageCursorImpl cursor, PagePosition position)
+      {
+         List<PagePosition> list = pendingPositions.get(cursor);
+         
+         if (list == null)
+         {
+            list = new LinkedList<PagePosition>();
+            pendingPositions.put(cursor, list);
+         }
+         
+         list.add(position);
+      }
+
+      /* (non-Javadoc)
+       * @see org.hornetq.core.transaction.TransactionOperation#beforePrepare(org.hornetq.core.transaction.Transaction)
+       */
+      public void beforePrepare(Transaction tx) throws Exception
+      {
+      }
+
+      /* (non-Javadoc)
+       * @see org.hornetq.core.transaction.TransactionOperation#afterPrepare(org.hornetq.core.transaction.Transaction)
+       */
+      public void afterPrepare(Transaction tx)
+      {
+      }
+
+      /* (non-Javadoc)
+       * @see org.hornetq.core.transaction.TransactionOperation#beforeCommit(org.hornetq.core.transaction.Transaction)
+       */
+      public void beforeCommit(Transaction tx) throws Exception
+      {
+      }
+
+      /* (non-Javadoc)
+       * @see org.hornetq.core.transaction.TransactionOperation#afterCommit(org.hornetq.core.transaction.Transaction)
+       */
+      public void afterCommit(Transaction tx)
+      {
+         for (Entry<PageCursorImpl, List<PagePosition>> entry : this.pendingPositions.entrySet())
+         {
+            PageCursorImpl cursor = entry.getKey();
+            
+            List<PagePosition> positions = entry.getValue();
+            
+            for (PagePosition confirmed : positions)
+            {
+               cursor.confirmPagePosition(confirmed);
+            }
+            
+         }
+      }
+
+      /* (non-Javadoc)
+       * @see org.hornetq.core.transaction.TransactionOperation#beforeRollback(org.hornetq.core.transaction.Transaction)
+       */
+      public void beforeRollback(Transaction tx) throws Exception
+      {
+      }
+
+      /* (non-Javadoc)
+       * @see org.hornetq.core.transaction.TransactionOperation#afterRollback(org.hornetq.core.transaction.Transaction)
+       */
+      public void afterRollback(Transaction tx)
+      {
+      }
+   }
+
 }

Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java	2010-10-11 13:42:51 UTC (rev 9768)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java	2010-10-11 21:16:17 UTC (rev 9769)
@@ -26,6 +26,7 @@
 import org.hornetq.core.paging.cursor.PagePosition;
 import org.hornetq.core.persistence.StorageManager;
 import org.hornetq.core.server.ServerMessage;
+import org.hornetq.utils.ExecutorFactory;
 import org.hornetq.utils.SoftValueHashMap;
 import org.jboss.netty.util.internal.ConcurrentHashMap;
 
@@ -46,21 +47,26 @@
    // Attributes ----------------------------------------------------
 
    private final PagingStore pagingStore;
-   
+
    private final StorageManager storageManager;
-   
-   private SoftValueHashMap<Long, PageCacheImpl> softCache = new SoftValueHashMap<Long, PageCacheImpl>();
-   
+
+   private final ExecutorFactory executorFactory;
+
+   private SoftValueHashMap<Long, PageCache> softCache = new SoftValueHashMap<Long, PageCache>();
+
    private ConcurrentMap<Long, PageCursor> activeCursors = new ConcurrentHashMap<Long, PageCursor>();
 
    // Static --------------------------------------------------------
 
    // Constructors --------------------------------------------------
 
-   public PageCursorProviderImpl(final PagingStore pagingStore, final StorageManager storageManager)
+   public PageCursorProviderImpl(final PagingStore pagingStore,
+                                 final StorageManager storageManager,
+                                 final ExecutorFactory executorFactory)
    {
       this.pagingStore = pagingStore;
       this.storageManager = storageManager;
+      this.executorFactory = executorFactory;
    }
 
    // Public --------------------------------------------------------
@@ -78,20 +84,23 @@
       PageCursor activeCursor = activeCursors.get(cursorID);
       if (activeCursor == null)
       {
-         activeCursor = new PageCursorImpl(this, pagingStore, storageManager, cursorID);
+         activeCursor = new PageCursorImpl(this, pagingStore, storageManager, executorFactory.getExecutor(), cursorID);
          PageCursor previousValue = activeCursors.putIfAbsent(cursorID, activeCursor);
          if (previousValue != null)
          {
             activeCursor = previousValue;
          }
       }
-      
+
       return activeCursor;
    }
-   
+
+   /**
+    * this will create a non-persistent cursor
+    */
    public PageCursor createCursor()
    {
-      return new PageCursorImpl(this, pagingStore, storageManager, 0);
+      return new PageCursorImpl(this, pagingStore, storageManager, executorFactory.getExecutor(), 0);
    }
 
    /* (non-Javadoc)
@@ -100,100 +109,55 @@
    public Pair<PagePosition, ServerMessage> getAfter(final PagePosition pos) throws Exception
    {
       // TODO: consider page transactions here to avoid receiving an uncommitted message
-      // TODO: consider the case where a page came empty because of an ignored PageTX
+      // TODO: consider the case where a full page is ignored because of a TX
       PagePosition retPos = pos.nextMessage();
-      
-      PageCache cache = getPageCache(pos.getPageNr());
-      
+
+      PageCache cache = getPageCache(pos);
+
       if (retPos.getMessageNr() >= cache.getNumberOfMessages())
       {
          retPos = pos.nextPage();
-         
-         cache = getPageCache(retPos.getPageNr());
+
+         cache = getPageCache(retPos);
+
          if (cache == null)
          {
             return null;
          }
-         
+
          if (retPos.getMessageNr() >= cache.getNumberOfMessages())
          {
             return null;
          }
       }
-      
+
       return new Pair<PagePosition, ServerMessage>(retPos, cache.getMessage(retPos.getMessageNr()));
    }
-   
+
    public ServerMessage getMessage(final PagePosition pos) throws Exception
    {
-      PageCache cache = getPageCache(pos.getPageNr());
-      
+      PageCache cache = getPageCache(pos);
+
       if (pos.getMessageNr() >= cache.getNumberOfMessages())
       {
          // sanity check, this should never happen unless there's a bug
          throw new IllegalStateException("Invalid messageNumber passed = " + pos);
       }
-      
+
       return cache.getMessage(pos.getMessageNr());
    }
 
-   public PageCache getPageCache(final long pageId) throws Exception
+   public PageCache getPageCache(PagePosition pos)
    {
-      boolean needToRead = false;
-      PageCacheImpl cache = null;
-      synchronized (this)
+      PageCache cache = pos.getPageCache();
+      if (cache == null)
       {
-         if (pageId > pagingStore.getNumberOfPages())
-         {
-            return null;
-         }
-         
-         cache = softCache.get(pageId);
-         if (cache == null)
-         {
-            cache = createPageCache(pageId);
-            needToRead = true;
-            // anyone reading from this cache will have to wait reading to finish first
-            // we also want only one thread reading this cache
-            cache.lock();
-            softCache.put(pageId, cache);
-         }
+         cache = getPageCache(pos.getPageNr());
+         pos.setPageCache(cache);
       }
-      
-      // Reading is done outside of the synchronized block, however
-      // the page stays locked until the entire reading is finished
-      if (needToRead)
-      {
-         try
-         {
-            Page page = pagingStore.createPage((int)pageId);
-            
-            page.open();
-            
-            List<PagedMessage> pgdMessages = page.read();
-            
-            ServerMessage srvMessages[] = new ServerMessage[pgdMessages.size()];
-            
-            int i = 0;
-            for (PagedMessage pdgMessage : pgdMessages)
-            {
-               ServerMessage message = pdgMessage.getMessage(storageManager);
-               srvMessages[i++] = message;
-            }
-            
-            cache.setMessages(srvMessages);
-            
-         }
-         finally
-         {
-            cache.unlock();
-         }
-      }
-      
-      
       return cache;
    }
-   
+
    public int getCacheSize()
    {
       return softCache.size();
@@ -206,7 +170,7 @@
          cursor.processReload();
       }
    }
-   
+
    public void stop()
    {
       activeCursors.clear();
@@ -215,7 +179,7 @@
    // Package protected ---------------------------------------------
 
    // Protected -----------------------------------------------------
-   
+
    protected PageCacheImpl createPageCache(final long pageId) throws Exception
    {
       return new PageCacheImpl(pagingStore.createPage((int)pageId));
@@ -223,6 +187,69 @@
 
    // Private -------------------------------------------------------
 
+   private PageCache getPageCache(final long pageId)
+   {
+      try
+      {
+         boolean needToRead = false;
+         PageCache cache = null;
+         synchronized (this)
+         {
+            if (pageId > pagingStore.getCurrentWritingPage())
+            {
+               return null;
+            }
+
+            cache = softCache.get(pageId);
+            if (cache == null)
+            {
+               cache = createPageCache(pageId);
+               needToRead = true;
+               // anyone reading from this cache will have to wait reading to finish first
+               // we also want only one thread reading this cache
+               cache.lock();
+               softCache.put(pageId, cache);
+            }
+         }
+
+         // Reading is done outside of the synchronized block, however
+         // the page stays locked until the entire reading is finished
+         if (needToRead)
+         {
+            try
+            {
+               Page page = pagingStore.createPage((int)pageId);
+
+               page.open();
+
+               List<PagedMessage> pgdMessages = page.read();
+
+               ServerMessage srvMessages[] = new ServerMessage[pgdMessages.size()];
+
+               int i = 0;
+               for (PagedMessage pdgMessage : pgdMessages)
+               {
+                  ServerMessage message = pdgMessage.getMessage(storageManager);
+                  srvMessages[i++] = message;
+               }
+
+               cache.setMessages(srvMessages);
+
+            }
+            finally
+            {
+               cache.unlock();
+            }
+         }
+
+         return cache;
+      }
+      catch (Exception e)
+      {
+         throw new RuntimeException("Couldn't complete paging due to an IO Exception on Paging - " + e.getMessage(), e);
+      }
+   }
+
    // Inner classes -------------------------------------------------
 
 }

Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PagePositionImpl.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PagePositionImpl.java	2010-10-11 13:42:51 UTC (rev 9768)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PagePositionImpl.java	2010-10-11 21:16:17 UTC (rev 9769)
@@ -13,6 +13,10 @@
 
 package org.hornetq.core.paging.cursor.impl;
 
+import java.lang.ref.WeakReference;
+
+import org.hornetq.core.paging.Page;
+import org.hornetq.core.paging.cursor.PageCache;
 import org.hornetq.core.paging.cursor.PagePosition;
 
 /**
@@ -30,6 +34,8 @@
 
    /** ID used for storage */
    private long recordID;
+   
+   private volatile WeakReference<PageCache> cacheReference;
 
    /**
     * @param pageNr
@@ -42,6 +48,12 @@
       this.messageNr = messageNr;
    }
 
+   public PagePositionImpl(long pageNr, int messageNr, PageCache pageCache)
+   {
+      this(pageNr, messageNr);
+      this.setPageCache(pageCache);
+   }
+
    /**
     * @param pageNr
     * @param messageNr
@@ -52,6 +64,31 @@
    }
 
    /**
+    * The cached page associaed with this position
+    * @return
+    */
+   public PageCache getPageCache()
+   {
+      if (cacheReference == null)
+      {
+         return null;
+      }
+      else
+      {
+         return cacheReference.get();
+      }
+   }
+   
+   public void setPageCache(final PageCache cache)
+   {
+      if (cache != null)
+      {
+         this.cacheReference = new WeakReference<PageCache>(cache);
+      }
+   }
+
+
+   /**
     * @return the recordID
     */
    public long getRecordID()
@@ -117,7 +154,7 @@
 
    public PagePosition nextMessage()
    {
-      return new PagePositionImpl(this.pageNr, this.messageNr + 1);
+      return new PagePositionImpl(this.pageNr, this.messageNr + 1, this.getPageCache());
    }
 
    public PagePosition nextPage()

Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PageImpl.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PageImpl.java	2010-10-11 13:42:51 UTC (rev 9768)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PageImpl.java	2010-10-11 21:16:17 UTC (rev 9769)
@@ -13,7 +13,6 @@
 
 package org.hornetq.core.paging.impl;
 
-import java.nio.Buffer;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
@@ -35,7 +34,7 @@
  * @author <a href="mailto:clebert.suconic at jboss.com">Clebert Suconic</a>
  *
  */
-public class PageImpl implements Page
+public class PageImpl implements Page, Comparable<Page>
 {
    // Constants -----------------------------------------------------
 
@@ -241,13 +240,54 @@
    {
       return "PageImpl::pageID="  + this.pageId + ", file=" + this.file;
    }
+   
 
+   /* (non-Javadoc)
+    * @see java.lang.Comparable#compareTo(java.lang.Object)
+    */
+   public int compareTo(Page otherPage)
+   {
+      return otherPage.getPageId() - this.pageId;
+   }
+
+
+   /* (non-Javadoc)
+    * @see java.lang.Object#hashCode()
+    */
+   @Override
+   public int hashCode()
+   {
+      final int prime = 31;
+      int result = 1;
+      result = prime * result + pageId;
+      return result;
+   }
+
+
    // Package protected ---------------------------------------------
 
    // Protected -----------------------------------------------------
 
    // Private -------------------------------------------------------
 
+   /* (non-Javadoc)
+    * @see java.lang.Object#equals(java.lang.Object)
+    */
+   @Override
+   public boolean equals(Object obj)
+   {
+      if (this == obj)
+         return true;
+      if (obj == null)
+         return false;
+      if (getClass() != obj.getClass())
+         return false;
+      PageImpl other = (PageImpl)obj;
+      if (pageId != other.pageId)
+         return false;
+      return true;
+   }
+
    /**
     * @param position
     * @param msgNumber

Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PagingStoreFactoryNIO.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PagingStoreFactoryNIO.java	2010-10-11 13:42:51 UTC (rev 9768)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PagingStoreFactoryNIO.java	2010-10-11 21:16:17 UTC (rev 9769)
@@ -98,7 +98,7 @@
                                  this,
                                  address,
                                  settings,
-                                 executorFactory.getExecutor(),
+                                 executorFactory,
                                  syncNonTransactional);
    }
 
@@ -202,7 +202,7 @@
                                                     this,
                                                     address,
                                                     settings,
-                                                    executorFactory.getExecutor(),
+                                                    executorFactory,
                                                     syncNonTransactional);
 
             storesReturn.add(store);

Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java	2010-10-11 13:42:51 UTC (rev 9768)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java	2010-10-11 21:16:17 UTC (rev 9769)
@@ -53,6 +53,7 @@
 import org.hornetq.core.transaction.Transaction.State;
 import org.hornetq.core.transaction.TransactionPropertyIndexes;
 import org.hornetq.core.transaction.impl.TransactionImpl;
+import org.hornetq.utils.ExecutorFactory;
 
 /**
  * 
@@ -150,7 +151,7 @@
                           final PagingStoreFactory storeFactory,
                           final SimpleString storeName,
                           final AddressSettings addressSettings,
-                          final Executor executor,
+                          final ExecutorFactory executorFactory,
                           final boolean syncNonTransactional)
    {
       if (pagingManager == null)
@@ -182,7 +183,7 @@
                                          pageSize);
       }
 
-      this.executor = executor;
+      this.executor = executorFactory.getExecutor();
 
       this.pagingManager = pagingManager;
 
@@ -192,7 +193,7 @@
 
       this.syncNonTransactional = syncNonTransactional;
       
-      this.cursorProvider = new PageCursorProviderImpl(this, this.storageManager);
+      this.cursorProvider = new PageCursorProviderImpl(this, this.storageManager, executorFactory);
 
       // Post office could be null on the backup node
       if (postOffice == null)
@@ -279,6 +280,11 @@
    {
       return numberOfPages;
    }
+   
+   public int getCurrentWritingPage()
+   {
+      return currentPageId;
+   }
 
    public SimpleString getStoreName()
    {

Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/transaction/TransactionPropertyIndexes.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/transaction/TransactionPropertyIndexes.java	2010-10-11 13:42:51 UTC (rev 9768)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/transaction/TransactionPropertyIndexes.java	2010-10-11 21:16:17 UTC (rev 9769)
@@ -31,4 +31,6 @@
    public static final int REFS_OPERATION = 6;
 
    public static final int PAGE_MESSAGES_OPERATION = 7;
+   
+   public static final int PAGE_CURSOR_POSITIONS = 8;
 }

Modified: branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/client/PagingTest.java
===================================================================
--- branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/client/PagingTest.java	2010-10-11 13:42:51 UTC (rev 9768)
+++ branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/client/PagingTest.java	2010-10-11 21:16:17 UTC (rev 9769)
@@ -999,7 +999,7 @@
                          final PagingStoreFactory storeFactory,
                          final SimpleString storeName,
                          final AddressSettings addressSettings,
-                         final Executor executor,
+                         final ExecutorFactory executorFactory,
                          final boolean syncNonTransactional)
          {
             super(address,
@@ -1010,7 +1010,7 @@
                   storeFactory,
                   storeName,
                   addressSettings,
-                  executor,
+                  executorFactory,
                   syncNonTransactional);
          }
 
@@ -1073,7 +1073,7 @@
                                        this,
                                        address,
                                        settings,
-                                       getExecutorFactory().getExecutor(),
+                                       getExecutorFactory(),
                                        syncNonTransactional);
          }
 

Modified: branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCrashTest.java
===================================================================
--- branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCrashTest.java	2010-10-11 13:42:51 UTC (rev 9768)
+++ branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCrashTest.java	2010-10-11 21:16:17 UTC (rev 9769)
@@ -43,6 +43,7 @@
 import org.hornetq.spi.core.security.HornetQSecurityManager;
 import org.hornetq.spi.core.security.HornetQSecurityManagerImpl;
 import org.hornetq.tests.util.ServiceTestBase;
+import org.hornetq.utils.ExecutorFactory;
 import org.hornetq.utils.OrderedExecutorFactory;
 
 /**
@@ -292,7 +293,7 @@
             factoryField.setAccessible(true);
 
             OrderedExecutorFactory factory = (org.hornetq.utils.OrderedExecutorFactory)factoryField.get(this);
-            return new FailingPagingStore(destinationName, settings, factory.getExecutor(), syncNonTransactional);
+            return new FailingPagingStore(destinationName, settings, factory, syncNonTransactional);
          }
 
          // Package protected ---------------------------------------------
@@ -312,7 +313,7 @@
              */
             public FailingPagingStore(final SimpleString storeName,
                                       final AddressSettings addressSettings,
-                                      final Executor executor,
+                                      final ExecutorFactory executor,
                                       final boolean syncNonTransactional)
             {
                super(storeName,

Modified: branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java
===================================================================
--- branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java	2010-10-11 13:42:51 UTC (rev 9768)
+++ branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java	2010-10-11 21:16:17 UTC (rev 9769)
@@ -26,6 +26,7 @@
 import org.hornetq.core.paging.cursor.PageCursorProvider;
 import org.hornetq.core.paging.cursor.PagePosition;
 import org.hornetq.core.paging.cursor.impl.PageCursorProviderImpl;
+import org.hornetq.core.paging.cursor.impl.PagePositionImpl;
 import org.hornetq.core.paging.impl.PagingStoreImpl;
 import org.hornetq.core.persistence.impl.journal.OperationContextImpl;
 import org.hornetq.core.server.HornetQServer;
@@ -78,11 +79,11 @@
 
       System.out.println("NumberOfPages = " + numberOfPages);
 
-      PageCursorProviderImpl cursorProvider = new PageCursorProviderImpl(lookupPageStore(ADDRESS), server.getStorageManager());
+      PageCursorProviderImpl cursorProvider = new PageCursorProviderImpl(lookupPageStore(ADDRESS), server.getStorageManager(), server.getExecutorFactory());
 
       for (int i = 0; i < numberOfPages; i++)
       {
-         PageCache cache = cursorProvider.getPageCache(i + 1);
+         PageCache cache = cursorProvider.getPageCache(new PagePositionImpl(i + 1, 0));
          System.out.println("Page " + i + " had " + cache.getNumberOfMessages() + " messages");
 
       }
@@ -104,7 +105,7 @@
 
       System.out.println("NumberOfPages = " + numberOfPages);
 
-      PageCursorProviderImpl cursorProvider = new PageCursorProviderImpl(lookupPageStore(ADDRESS), server.getStorageManager());
+      PageCursorProviderImpl cursorProvider = new PageCursorProviderImpl(lookupPageStore(ADDRESS), server.getStorageManager(), server.getExecutorFactory());
       
       PageCursor cursor = cursorProvider.createCursor();
       
@@ -134,9 +135,9 @@
 
       System.out.println("NumberOfPages = " + numberOfPages);
 
-      PageCursorProviderImpl cursorProvider = new PageCursorProviderImpl(lookupPageStore(ADDRESS), server.getStorageManager());
+      PageCursorProviderImpl cursorProvider = new PageCursorProviderImpl(lookupPageStore(ADDRESS), server.getStorageManager(), server.getExecutorFactory());
       
-      PageCache cache = cursorProvider.getPageCache(2);
+      PageCache cache = cursorProvider.getPageCache(new PagePositionImpl(2,0));
       
       assertNull(cache);
    }
@@ -287,11 +288,16 @@
    }
    
    
-   public void testRollbackScenarios() throws Exception
+   public void testRollbackScenariosOnACK() throws Exception
    {
       
    }
    
+   public void testReadRolledBackData() throws Exception
+   {
+      
+   }
+   
    public void testPrepareScenarios() throws Exception
    {
       
@@ -316,6 +322,11 @@
    {
       
    }
+   
+   public void testFirstMessageInTheMiddle() throws Exception
+   {
+      
+   }
 
    /**
     * @param numMessages

Modified: branches/Branch_New_Paging/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java
===================================================================
--- branches/Branch_New_Paging/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java	2010-10-11 13:42:51 UTC (rev 9768)
+++ branches/Branch_New_Paging/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java	2010-10-11 21:16:17 UTC (rev 9769)
@@ -73,6 +73,7 @@
 import org.hornetq.tests.unit.core.server.impl.fakes.FakePostOffice;
 import org.hornetq.tests.util.RandomUtil;
 import org.hornetq.tests.util.UnitTestCase;
+import org.hornetq.utils.ExecutorFactory;
 import org.hornetq.utils.UUID;
 
 /**
@@ -143,7 +144,7 @@
                                                   null,
                                                   PagingStoreImplTest.destinationTestName,
                                                   addressSettings,
-                                                  executor,
+                                                  getExecutorFactory(),
                                                   true);
 
       storeImpl.start();
@@ -179,7 +180,7 @@
                                                            storeFactory,
                                                            PagingStoreImplTest.destinationTestName,
                                                            addressSettings,
-                                                           executor,
+                                                           getExecutorFactory(),
                                                            true);
 
       storeImpl.start();
@@ -215,7 +216,7 @@
                                       null,
                                       PagingStoreImplTest.destinationTestName,
                                       addressSettings,
-                                      executor,
+                                      getExecutorFactory(),
                                       true);
 
       storeImpl.start();
@@ -242,7 +243,7 @@
                                                            storeFactory,
                                                            PagingStoreImplTest.destinationTestName,
                                                            addressSettings,
-                                                           executor,
+                                                           getExecutorFactory(),
                                                            true);
 
       storeImpl.start();
@@ -317,7 +318,7 @@
                                                            storeFactory,
                                                            PagingStoreImplTest.destinationTestName,
                                                            addressSettings,
-                                                           executor,
+                                                           getExecutorFactory(),
                                                            true);
 
       storeImpl.start();
@@ -464,7 +465,7 @@
                                                                  storeFactory,
                                                                  new SimpleString("test"),
                                                                  settings,
-                                                                 executor,
+                                                                 getExecutorFactory(),
                                                                  true);
 
       storeImpl.start();
@@ -627,7 +628,7 @@
                                                             storeFactory,
                                                             new SimpleString("test"),
                                                             settings,
-                                                            executor,
+                                                            getExecutorFactory(),
                                                             true);
       storeImpl2.start();
 
@@ -710,7 +711,7 @@
                                                                  storeFactory,
                                                                  new SimpleString("test"),
                                                                  settings,
-                                                                 executor,
+                                                                 getExecutorFactory(),
                                                                  true);
 
       storeImpl.start();
@@ -854,6 +855,18 @@
    {
       return new FakePostOffice();
    }
+   
+   private ExecutorFactory getExecutorFactory()
+   {
+      return new ExecutorFactory()
+      {
+         
+         public Executor getExecutor()
+         {
+             return executor;
+         }
+      };
+   }
 
    private ServerMessage createMessage(final long id,
                                        final PagingStore store,



More information about the hornetq-commits mailing list