[hornetq-commits] JBoss hornetq SVN: r9780 - in branches/Branch_New_Paging: src/main/org/hornetq/core/paging/cursor and 3 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Tue Oct 12 20:10:31 EDT 2010


Author: clebert.suconic at jboss.com
Date: 2010-10-12 20:10:30 -0400 (Tue, 12 Oct 2010)
New Revision: 9780

Added:
   branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/LivePageCache.java
   branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/LivePageCacheImpl.java
Modified:
   branches/Branch_New_Paging/src/main/org/hornetq/core/paging/Page.java
   branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCache.java
   branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursorProvider.java
   branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCacheImpl.java
   branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorImpl.java
   branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java
   branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PageImpl.java
   branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java
   branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCrashTest.java
   branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java
Log:
Moving the cursor over live data as well

Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/Page.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/Page.java	2010-10-12 18:17:07 UTC (rev 9779)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/Page.java	2010-10-13 00:10:30 UTC (rev 9780)
@@ -15,6 +15,8 @@
 
 import java.util.List;
 
+import org.hornetq.core.paging.cursor.LivePageCache;
+
 /**
  * 
  * @see PagingManager
@@ -28,6 +30,8 @@
    void write(PagedMessage message) throws Exception;
 
    List<PagedMessage> read() throws Exception;
+   /** Sets the live cache associated with this page. */
+   void setLiveCache(LivePageCache pageCache);
 
    int getSize();
 

Added: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/LivePageCache.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/LivePageCache.java	                        (rev 0)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/LivePageCache.java	2010-10-13 00:10:30 UTC (rev 9780)
@@ -0,0 +1,31 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.paging.cursor;
+
+import org.hornetq.core.server.ServerMessage;
+
+/**
+ * A {@link PageCache} that can still receive messages: they are appended one at a time
+ * through {@link #addLiveMessage(ServerMessage)} until {@link #close()} is called.
+ *
+ * @author clebertsuconic
+ */
+public interface LivePageCache extends PageCache
+{
+   /** Appends a message to this cache. */
+   void addLiveMessage(ServerMessage message);
+
+   /** Signals that no more messages will be added to this cache. */
+   void close();
+}

Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCache.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCache.java	2010-10-12 18:17:07 UTC (rev 9779)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCache.java	2010-10-13 00:10:30 UTC (rev 9780)
@@ -30,6 +30,12 @@
    int getNumberOfMessages();
    
    void setMessages(ServerMessage[] messages);
+   
+   /**
+    * Tells whether this cache is still being updated.
+    * @return {@code true} if this cache is still being updated, {@code false} otherwise
+    */
+   boolean isLive();
 
    /**
     * 

Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursorProvider.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursorProvider.java	2010-10-12 18:17:07 UTC (rev 9779)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursorProvider.java	2010-10-13 00:10:30 UTC (rev 9780)
@@ -39,6 +39,8 @@
    // Public --------------------------------------------------------
 
    PageCache getPageCache(PagePosition pos);
+   /** Adds an already created cache to this provider. */
+   void addPageCache(PageCache cache);
 
    PagingStore getAssociatedStore();
 
@@ -55,8 +57,6 @@
     */
    PageCursor createCursor();
 
-   // PageCursor recoverCursor(PagePosition position);
-
    Pair<PagePosition, ServerMessage> getAfter(PagePosition pos) throws Exception;
    
    ServerMessage getMessage(PagePosition pos) throws Exception;

Added: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/LivePageCacheImpl.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/LivePageCacheImpl.java	                        (rev 0)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/LivePageCacheImpl.java	2010-10-13 00:10:30 UTC (rev 9780)
@@ -0,0 +1,145 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.paging.cursor.impl;
+
+import java.util.LinkedList;
+import java.util.List;
+
+import org.hornetq.core.paging.Page;
+import org.hornetq.core.paging.cursor.LivePageCache;
+import org.hornetq.core.server.ServerMessage;
+
+/**
+ * A {@link LivePageCache} for the page that is currently being written. Messages are appended
+ * through {@link #addLiveMessage(ServerMessage)}; the cache reports itself live until {@link #close()}.
+ *
+ * @author clebertsuconic
+ *
+ */
+public class LivePageCacheImpl implements LivePageCache
+{
+   // Constants -----------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+   // ArrayList: getMessage(int) reads by position, which would cost O(n) per call on a LinkedList
+   private final List<ServerMessage> messages = new java.util.ArrayList<ServerMessage>();
+
+   private final Page page;
+   // Read and written only from synchronized methods
+   private boolean isLive = true;
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+   /** Creates an empty live cache for the given page. */
+   public LivePageCacheImpl(final Page page)
+   {
+      this.page = page;
+   }
+
+   // Public --------------------------------------------------------
+
+   /**
+    * @return the page this cache was created for
+    */
+   public Page getPage()
+   {
+      return page;
+   }
+
+   /**
+    * @return the number of messages added to this cache so far
+    */
+   public synchronized int getNumberOfMessages()
+   {
+      return messages.size();
+   }
+
+   /**
+    * Appends every message of the given array, in order, to the messages already cached.
+    */
+   public synchronized void setMessages(ServerMessage[] messages)
+   {
+      // This method shouldn't be called on liveCache, but we will provide the implementation for it anyway
+      for (ServerMessage msg : messages)
+      {
+         addLiveMessage(msg);
+      }
+   }
+
+   /**
+    * @return the message at position {@code messageNumber}, or {@code null} if it has not been added yet
+    */
+   public synchronized ServerMessage getMessage(int messageNumber)
+   {
+      if (messageNumber < messages.size())
+      {
+         return messages.get(messageNumber);
+      }
+      else
+      {
+         return null;
+      }
+   }
+
+   /**
+    * No-op on a live cache.
+    */
+   public void lock()
+   {
+      // nothing to be done on live cache
+   }
+
+   /**
+    * No-op on a live cache.
+    */
+   public void unlock()
+   {
+      // nothing to be done on live cache
+   }
+
+   /**
+    * @return {@code true} until {@link #close()} has been called, {@code false} afterwards
+    */
+   public synchronized boolean isLive()
+   {
+      return isLive;
+   }
+
+   /**
+    * Appends the given message at the end of this cache.
+    */
+   public synchronized void addLiveMessage(ServerMessage message)
+   {
+      this.messages.add(message);
+   }
+
+   /**
+    * Marks this cache as no longer live; the messages already added are kept.
+    */
+   public synchronized void close()
+   {
+      this.isLive = false;
+   }
+
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   // Private -------------------------------------------------------
+
+   // Inner classes -------------------------------------------------
+
+}

Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCacheImpl.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCacheImpl.java	2010-10-12 18:17:07 UTC (rev 9779)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCacheImpl.java	2010-10-13 00:10:30 UTC (rev 9780)
@@ -22,8 +22,6 @@
 
 /**
  * The caching associated to a single page.
- * 
- * TODO: Solve how to update the cache for the current page on PagingStore.
  *
  * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
  *
@@ -112,7 +110,15 @@
       }
    }
 
+   /**
+    * Never live: must be false so that getAfter() moves to the next page once this one is exhausted.
+    */
+   public boolean isLive()
+   {
+      return false;
+   }
 
+
    // Package protected ---------------------------------------------
 
    // Protected -----------------------------------------------------

Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorImpl.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorImpl.java	2010-10-12 18:17:07 UTC (rev 9779)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorImpl.java	2010-10-13 00:10:30 UTC (rev 9780)
@@ -368,7 +368,7 @@
 
       final ArrayList<PageCursorInfo> completedPages = new ArrayList<PageCursorInfo>();
 
-      // First get the completed pages
+      // First get the completed pages using a lock   
       synchronized (this)
       {
          for (Entry<Long, PageCursorInfo> entry : consumedPages.entrySet())

Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java	2010-10-12 18:17:07 UTC (rev 9779)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java	2010-10-13 00:10:30 UTC (rev 9780)
@@ -114,7 +114,7 @@
 
       PageCache cache = getPageCache(pos);
 
-      if (retPos.getMessageNr() >= cache.getNumberOfMessages())
+      if (!cache.isLive() && retPos.getMessageNr() >= cache.getNumberOfMessages())
       {
          retPos = pos.nextPage();
 
@@ -130,8 +130,17 @@
             return null;
          }
       }
-
-      return new Pair<PagePosition, ServerMessage>(retPos, cache.getMessage(retPos.getMessageNr()));
+      
+      ServerMessage serverMessage = cache.getMessage(retPos.getMessageNr());
+      
+      if (serverMessage != null)
+      {
+         return new Pair<PagePosition, ServerMessage>(retPos, cache.getMessage(retPos.getMessageNr()));
+      }
+      else
+      {
+         return null;
+      }
    }
 
    public ServerMessage getMessage(final PagePosition pos) throws Exception
@@ -147,6 +156,9 @@
       return cache.getMessage(pos.getMessageNr());
    }
 
+   /**
+    * No need to synchronize this method since the private getPageCache will have a synchronized call
+    */
    public PageCache getPageCache(PagePosition pos)
    {
       PageCache cache = pos.getPageCache();
@@ -157,8 +169,14 @@
       }
       return cache;
    }
+   /** Stores the given cache in the soft cache, keyed by the id of the page it belongs to. */
+   public synchronized void addPageCache(PageCache cache)
+   {
+      final long cachedPageId = (long)cache.getPage().getPageId(); // TODO: remove the type cast here
+      softCache.put(cachedPageId, cache);
+   }
 
-   public int getCacheSize()
+   public synchronized int getCacheSize()
    {
       return softCache.size();
    }
@@ -171,6 +189,7 @@
       }
    }
 
+
    public void stop()
    {
       activeCursors.clear();
@@ -180,6 +199,7 @@
 
    // Protected -----------------------------------------------------
 
+   /* Protected so that test cases can instrument the creation of page caches */
    protected PageCacheImpl createPageCache(final long pageId) throws Exception
    {
       return new PageCacheImpl(pagingStore.createPage((int)pageId));

Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PageImpl.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PageImpl.java	2010-10-12 18:17:07 UTC (rev 9779)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PageImpl.java	2010-10-13 00:10:30 UTC (rev 9780)
@@ -26,6 +26,7 @@
 import org.hornetq.core.logging.Logger;
 import org.hornetq.core.paging.Page;
 import org.hornetq.core.paging.PagedMessage;
+import org.hornetq.core.paging.cursor.LivePageCache;
 import org.hornetq.core.persistence.StorageManager;
 import org.hornetq.utils.DataConstants;
 
@@ -57,6 +58,11 @@
    private final SequentialFile file;
 
    private final SequentialFileFactory fileFactory;
+   
+   /**
+    * The page cache that will be filled with data as we write more data
+    */
+   private volatile LivePageCache pageCache;
 
    private final AtomicInteger size = new AtomicInteger(0);
 
@@ -89,6 +95,11 @@
    {
       return pageId;
    }
+   /** Sets the cache that receives each message written to this page and is closed together with it. */
+   public void setLiveCache(LivePageCache pageCache)
+   {
+      this.pageCache = pageCache;
+   }
 
    public List<PagedMessage> read() throws Exception
    {
@@ -167,6 +178,11 @@
       buffer.rewind();
 
       file.writeDirect(buffer, false);
+      
+      if (pageCache != null)
+      {
+         pageCache.addLiveMessage(message.getMessage(storageManager));
+      }
 
       numberOfMessages.incrementAndGet();
       size.addAndGet(buffer.limit());
@@ -192,6 +208,12 @@
       {
          storageManager.pageClosed(storeName, pageId);
       }
+      if (pageCache != null)
+      {
+         pageCache.close();
+         // leave it to the soft cache to decide when to release it now
+         pageCache = null;
+      }
       file.close();
    }
 

Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java	2010-10-12 18:17:07 UTC (rev 9779)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java	2010-10-13 00:10:30 UTC (rev 9780)
@@ -40,7 +40,9 @@
 import org.hornetq.core.paging.PagingManager;
 import org.hornetq.core.paging.PagingStore;
 import org.hornetq.core.paging.PagingStoreFactory;
+import org.hornetq.core.paging.cursor.LivePageCache;
 import org.hornetq.core.paging.cursor.PageCursorProvider;
+import org.hornetq.core.paging.cursor.impl.LivePageCacheImpl;
 import org.hornetq.core.paging.cursor.impl.PageCursorProviderImpl;
 import org.hornetq.core.persistence.StorageManager;
 import org.hornetq.core.postoffice.DuplicateIDCache;
@@ -545,9 +547,9 @@
       return currentPage;
    }
 
-   public Page createPage(final int page) throws Exception
+   public Page createPage(final int pageNumber) throws Exception
    {
-      String fileName = createFileName(page);
+      String fileName = createFileName(pageNumber);
 
       if (fileFactory == null)
       {
@@ -555,14 +557,24 @@
       }
 
       SequentialFile file = fileFactory.createSequentialFile(fileName, 1000);
+      
+      Page page = new PageImpl(storeName, storageManager, fileFactory, file, pageNumber);
+      
+      LivePageCache pageCache = new LivePageCacheImpl(page);
+      
+      page.setLiveCache(pageCache);
 
+      cursorProvider.addPageCache(pageCache);
+      
+      // To create the file 
       file.open();
 
       file.position(0);
 
       file.close();
+      
 
-      return new PageImpl(storeName, storageManager, fileFactory, file, page);
+      return page;
    }
 
    // TestSupportPageStore ------------------------------------------

Modified: branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCrashTest.java
===================================================================
--- branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCrashTest.java	2010-10-12 18:17:07 UTC (rev 9779)
+++ branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCrashTest.java	2010-10-13 00:10:30 UTC (rev 9780)
@@ -18,7 +18,6 @@
 import java.lang.reflect.Field;
 import java.util.HashMap;
 import java.util.List;
-import java.util.concurrent.Executor;
 import java.util.concurrent.Executors;
 
 import junit.framework.Assert;
@@ -34,6 +33,7 @@
 import org.hornetq.core.paging.PagedMessage;
 import org.hornetq.core.paging.PagingManager;
 import org.hornetq.core.paging.PagingStore;
+import org.hornetq.core.paging.cursor.LivePageCache;
 import org.hornetq.core.paging.impl.PagingManagerImpl;
 import org.hornetq.core.paging.impl.PagingStoreFactoryNIO;
 import org.hornetq.core.paging.impl.PagingStoreImpl;
@@ -434,6 +434,13 @@
          {
             delegatedPage = delegatePage;
          }
+
+         /**
+          * No-op: the given cache is neither kept nor passed on to delegatedPage.
+          */
+         public void setLiveCache(LivePageCache pageCache)
+         {
+         }
       }
 
    }

Modified: branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java
===================================================================
--- branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java	2010-10-12 18:17:07 UTC (rev 9779)
+++ branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java	2010-10-13 00:10:30 UTC (rev 9780)
@@ -292,7 +292,52 @@
       
    }
    
+   /** Pages messages one at a time and checks that the cursor sees each one right after it is paged. */
+   public void testConsumeLivePage() throws Exception
+   {
+      final int numberOfMessages = 1000;
+      final int bodySize = 1024 * 1024;
+
+      PagingStoreImpl pageStore = lookupPageStore(ADDRESS);
+      pageStore.startPaging();
+
+      PageCursorProvider cursorProvider = server.getPagingManager().getPageStore(ADDRESS).getCursorProvier();
+      System.out.println("cursorProvider = " + cursorProvider);
+
+      PageCursor cursor = server.getPagingManager().getPageStore(ADDRESS).getCursorProvier().getCursor(queue.getID());
+      System.out.println("Cursor: " + cursor);
+
+      for (int msgNr = 0; msgNr < numberOfMessages; msgNr++)
+      {
+         if (msgNr % 100 == 0)
+         {
+            System.out.println("Paged " + msgNr);
+         }
+
+         // Build a message with a random body, tagged with its sequence number
+         HornetQBuffer body = RandomUtil.randomBuffer(bodySize, msgNr + 1L);
+         final int bodyLength = body.writerIndex();
+
+         ServerMessage msg = new ServerMessageImpl(msgNr, bodyLength);
+         msg.putIntProperty("key", msgNr);
+         msg.getBodyBuffer().writeBytes(body, 0, bodyLength);
+
+         Assert.assertTrue(pageStore.page(msg));
+
+         // The message just paged must be readable through the cursor
+         Pair<PagePosition, ServerMessage> readMessage = cursor.moveNext();
+         assertNotNull(readMessage);
+
+         // TODO: ack on live data
+
+         assertEquals(msgNr, readMessage.b.getIntProperty("key").intValue());
+
+         // ... and nothing else may be pending after it
+         assertNull(cursor.moveNext());
+      }
+   }
    
+   
    public void testRollbackScenariosOnACK() throws Exception
    {
       



More information about the hornetq-commits mailing list