[hornetq-commits] JBoss hornetq SVN: r9742 - in branches/Branch_New_Paging: src/main/org/hornetq/core/paging/cursor and 3 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Mon Oct 4 12:12:31 EDT 2010


Author: clebert.suconic at jboss.com
Date: 2010-10-04 12:12:30 -0400 (Mon, 04 Oct 2010)
New Revision: 9742

Added:
   branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java
Removed:
   branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCacheTest.java
Modified:
   branches/Branch_New_Paging/src/main/org/hornetq/core/paging/PagingStore.java
   branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursor.java
   branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursorProvider.java
   branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PagePosition.java
   branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorImpl.java
   branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java
   branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PagePositionImpl.java
   branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java
Log:
PageCursor implementation 

Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/PagingStore.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/PagingStore.java	2010-09-30 23:11:44 UTC (rev 9741)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/PagingStore.java	2010-10-04 16:12:30 UTC (rev 9742)
@@ -40,6 +40,10 @@
    SimpleString getStoreName();
 
    AddressFullMessagePolicy getAddressFullMessagePolicy();
+   
+   long getFirstPage();
+   
+   long getTopPage();
 
    long getPageSizeBytes();
 

Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursor.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursor.java	2010-09-30 23:11:44 UTC (rev 9741)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursor.java	2010-10-04 16:12:30 UTC (rev 9742)
@@ -14,8 +14,7 @@
 package org.hornetq.core.paging.cursor;
 
 import org.hornetq.api.core.Pair;
-import org.hornetq.core.paging.PagedMessage;
-import org.hornetq.core.paging.cursor.impl.PagePositionImpl;
+import org.hornetq.core.server.ServerMessage;
 
 /**
  * A PageCursor
@@ -27,11 +26,11 @@
 public interface PageCursor
 {
    
-   Pair<PagePositionImpl, PagedMessage> moveNext();
+   Pair<PagePosition, ServerMessage> moveNext() throws Exception;
    
-   PagePositionImpl getFirstPosition();
+   PagePosition getFirstPosition();
    
-   void ack(PagePositionImpl position);
+   void ack(PagePosition position);
    
-   void returnElement(PagePositionImpl position);
+   void returnElement(PagePosition position);
 }

Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursorProvider.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursorProvider.java	2010-09-30 23:11:44 UTC (rev 9741)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursorProvider.java	2010-10-04 16:12:30 UTC (rev 9742)
@@ -13,8 +13,10 @@
 
 package org.hornetq.core.paging.cursor;
 
+import org.hornetq.api.core.Pair;
+import org.hornetq.core.paging.PagedMessage;
 import org.hornetq.core.paging.PagingStore;
-import org.hornetq.core.paging.cursor.impl.PagePositionImpl;
+import org.hornetq.core.server.ServerMessage;
 
 /**
  * The provider of Cursor for a given Address
@@ -29,6 +31,7 @@
    // Constants -----------------------------------------------------
 
    // Attributes ----------------------------------------------------
+   
    // Static --------------------------------------------------------
 
    // Constructors --------------------------------------------------
@@ -42,9 +45,9 @@
    
    PageCursor createCursor();
    
-   PageCursor recoverCursor(PagePositionImpl position);
+   //PageCursor recoverCursor(PagePosition position);
    
-   PagePositionImpl getAfter(PagePositionImpl pos);
+   Pair<PagePosition,ServerMessage> getAfter(PagePosition pos) throws Exception;
 
    // Package protected ---------------------------------------------
 

Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PagePosition.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PagePosition.java	2010-09-30 23:11:44 UTC (rev 9741)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PagePosition.java	2010-10-04 16:12:30 UTC (rev 9742)
@@ -25,10 +25,15 @@
    
    long getRecordID();
 
+   // TODO: this belongs somewhere else
    void setRecordID(long recordID);
 
    long getPageNr();
 
-   long getMessageNr();
+   int getMessageNr();
    
+   PagePosition nextMessage();
+   
+   PagePosition nextPage();
+   
 }

Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorImpl.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorImpl.java	2010-09-30 23:11:44 UTC (rev 9741)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorImpl.java	2010-10-04 16:12:30 UTC (rev 9742)
@@ -17,7 +17,10 @@
 import org.hornetq.core.paging.PagedMessage;
 import org.hornetq.core.paging.PagingStore;
 import org.hornetq.core.paging.cursor.PageCursor;
+import org.hornetq.core.paging.cursor.PageCursorProvider;
+import org.hornetq.core.paging.cursor.PagePosition;
 import org.hornetq.core.paging.cursor.StorageCursor;
+import org.hornetq.core.server.ServerMessage;
 
 /**
  * A PageCursorImpl
@@ -29,69 +32,99 @@
  */
 public class PageCursorImpl implements PageCursor
 {
-   
+   // Constants -----------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+
    private StorageCursor store;
-   
+
    private PagingStore pageStore;
    
-   public PageCursorImpl(PagingStore pageStore, StorageCursor store)
+   private final PageCursorProvider cursorProvider;
+
+   private volatile PagePosition lastPosition;
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   public PageCursorImpl(PageCursorProvider cursorProvider, PagingStore pageStore, StorageCursor store)
    {
       this.pageStore = pageStore;
       this.store = store;
+      this.cursorProvider = cursorProvider;
    }
 
+   // Public --------------------------------------------------------
+
    /* (non-Javadoc)
     * @see org.hornetq.core.paging.cursor.PageCursor#moveNext()
     */
-   public Pair<PagePositionImpl, PagedMessage> moveNext()
+   public synchronized Pair<PagePosition, ServerMessage> moveNext() throws Exception
    {
-      // TODO Auto-generated method stub
-      return null;
+      if (lastPosition == null)
+      {
+         lastPosition = recoverLastPosition();
+      }
+       
+      Pair<PagePosition,ServerMessage> message = null;
+      do
+      {
+        message = cursorProvider.getAfter(lastPosition);
+        if (message != null)
+        {
+           lastPosition = message.a;
+        }
+      }
+      while (message != null && !match(message.b));
+
+      return message;
    }
 
    /* (non-Javadoc)
     * @see org.hornetq.core.paging.cursor.PageCursor#confirm(org.hornetq.core.paging.cursor.PagePosition)
     */
-   public void ack(PagePositionImpl position)
+   public void ack(PagePosition position)
    {
       // TODO Auto-generated method stub
-      
+
    }
 
    /* (non-Javadoc)
     * @see org.hornetq.core.paging.cursor.PageCursor#returnElement(org.hornetq.core.paging.cursor.PagePosition)
     */
-   public void returnElement(PagePositionImpl position)
+   public void returnElement(PagePosition position)
    {
       // TODO Auto-generated method stub
-      
+
    }
 
    /* (non-Javadoc)
     * @see org.hornetq.core.paging.cursor.PageCursor#getFirstPosition()
     */
-   public PagePositionImpl getFirstPosition()
+   public PagePosition getFirstPosition()
    {
       // TODO Auto-generated method stub
       return null;
    }
 
-   // Constants -----------------------------------------------------
-
-   // Attributes ----------------------------------------------------
-
-   // Static --------------------------------------------------------
-
-   // Constructors --------------------------------------------------
-
-   // Public --------------------------------------------------------
-
    // Package protected ---------------------------------------------
 
    // Protected -----------------------------------------------------
+   
+   protected boolean match(ServerMessage message)
+   {
+      return true;
+   }
 
    // Private -------------------------------------------------------
 
+   private PagePosition recoverLastPosition()
+   {
+      long firstPage = pageStore.getFirstPage();
+      return new PagePositionImpl(firstPage, -1);
+   }
+   
    // Inner classes -------------------------------------------------
 
 }

Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java	2010-09-30 23:11:44 UTC (rev 9741)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java	2010-10-04 16:12:30 UTC (rev 9742)
@@ -15,12 +15,14 @@
 
 import java.util.List;
 
+import org.hornetq.api.core.Pair;
 import org.hornetq.core.paging.Page;
 import org.hornetq.core.paging.PagedMessage;
 import org.hornetq.core.paging.PagingStore;
 import org.hornetq.core.paging.cursor.PageCache;
 import org.hornetq.core.paging.cursor.PageCursor;
 import org.hornetq.core.paging.cursor.PageCursorProvider;
+import org.hornetq.core.paging.cursor.PagePosition;
 import org.hornetq.core.persistence.StorageManager;
 import org.hornetq.core.server.ServerMessage;
 import org.hornetq.utils.SoftValueHashMap;
@@ -66,7 +68,7 @@
     */
    public PageCursor createCursor()
    {
-      return null;
+      return new PageCursorImpl(this, pagingStore, null);
    }
 
    /* (non-Javadoc)
@@ -80,9 +82,29 @@
    /* (non-Javadoc)
     * @see org.hornetq.core.paging.cursor.PageCursorProvider#getAfter(org.hornetq.core.paging.cursor.PagePosition)
     */
-   public PagePositionImpl getAfter(final PagePositionImpl pos)
+   public Pair<PagePosition, ServerMessage> getAfter(final PagePosition pos) throws Exception
    {
-      return null;
+      PagePosition retPos = pos.nextMessage();
+      
+      PageCache cache = getPageCache(pos.getPageNr());
+      
+      if (retPos.getMessageNr() >= cache.getNumberOfMessages())
+      {
+         retPos = pos.nextPage();
+         
+         cache = getPageCache(retPos.getPageNr());
+         if (cache == null)
+         {
+            return null;
+         }
+         
+         if (retPos.getMessageNr() >= cache.getNumberOfMessages())
+         {
+            return null;
+         }
+      }
+      
+      return new Pair<PagePosition, ServerMessage>(retPos, cache.getMessage(retPos.getMessageNr()));
    }
 
    public PageCache getPageCache(final long pageId) throws Exception
@@ -91,6 +113,11 @@
       PageCacheImpl cache = null;
       synchronized (this)
       {
+         if (pageId > pagingStore.getNumberOfPages())
+         {
+            return null;
+         }
+         
          cache = softCache.get(pageId);
          if (cache == null)
          {

Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PagePositionImpl.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PagePositionImpl.java	2010-09-30 23:11:44 UTC (rev 9741)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PagePositionImpl.java	2010-10-04 16:12:30 UTC (rev 9742)
@@ -26,18 +26,16 @@
 {
    private long pageNr;
 
-   private long messageNr;
+   private int messageNr;
 
    /** ID used for storage */
    private long recordID;
-   
-   
 
    /**
     * @param pageNr
     * @param messageNr
     */
-   public PagePositionImpl(long pageNr, long messageNr)
+   public PagePositionImpl(long pageNr, int messageNr)
    {
       super();
       this.pageNr = pageNr;
@@ -71,7 +69,7 @@
    /**
     * @return the messageNr
     */
-   public long getMessageNr()
+   public int getMessageNr()
    {
       return messageNr;
    }
@@ -102,7 +100,17 @@
          return 0;
       }
    }
-   
+
+   public PagePosition nextMessage()
+   {
+      return new PagePositionImpl(this.pageNr, this.messageNr + 1);
+   }
+
+   public PagePosition nextPage()
+   {
+      return new PagePositionImpl(this.pageNr + 1, 0);
+   }
+
    public boolean isNextSequenceOf(PagePosition pos)
    {
       return this.pageNr == pos.getPageNr() && this.getRecordID() - pos.getRecordID() == 1;

Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java	2010-09-30 23:11:44 UTC (rev 9741)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java	2010-10-04 16:12:30 UTC (rev 9742)
@@ -203,6 +203,16 @@
    // Public --------------------------------------------------------
 
    // PagingStore implementation ------------------------------------
+   
+   public long getFirstPage()
+   {
+      return firstPageId;
+   }
+   
+   public long getTopPage()
+   {
+      return currentPageId;
+   }
 
    public SimpleString getAddress()
    {

Deleted: branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCacheTest.java
===================================================================
--- branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCacheTest.java	2010-09-30 23:11:44 UTC (rev 9741)
+++ branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCacheTest.java	2010-10-04 16:12:30 UTC (rev 9742)
@@ -1,136 +0,0 @@
-/*
- * Copyright 2010 Red Hat, Inc.
- * Red Hat licenses this file to you under the Apache License, version
- * 2.0 (the "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *    http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- * implied.  See the License for the specific language governing
- * permissions and limitations under the License.
- */
-
-package org.hornetq.tests.integration.paging;
-
-import java.util.HashMap;
-
-import junit.framework.Assert;
-
-import org.hornetq.api.core.HornetQBuffer;
-import org.hornetq.api.core.SimpleString;
-import org.hornetq.core.paging.cursor.PageCache;
-import org.hornetq.core.paging.cursor.impl.PageCursorProviderImpl;
-import org.hornetq.core.paging.impl.PagingStoreImpl;
-import org.hornetq.core.persistence.StorageManager;
-import org.hornetq.core.server.HornetQServer;
-import org.hornetq.core.server.ServerMessage;
-import org.hornetq.core.server.impl.ServerMessageImpl;
-import org.hornetq.core.settings.impl.AddressSettings;
-import org.hornetq.tests.util.RandomUtil;
-import org.hornetq.tests.util.ServiceTestBase;
-
-/**
- * A PageCacheTest
- *
- * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
- *
- *
- */
-public class PageCacheTest extends ServiceTestBase
-{
-
-   // Constants -----------------------------------------------------
-
-   // Attributes ----------------------------------------------------
-
-   private SimpleString ADDRESS = new SimpleString("test-add");
-
-   private HornetQServer server;
-
-   private static final int PAGE_MAX = -1;
-
-   private static final int PAGE_SIZE = 10 * 1024 * 1024;
-
-   // Static --------------------------------------------------------
-
-   // Constructors --------------------------------------------------
-
-   // Public --------------------------------------------------------
-
-   public void testReadCache() throws Exception
-   {
-
-      PagingStoreImpl pageStore = (PagingStoreImpl)server.getPagingManager().getPageStore(ADDRESS);
-
-      StorageManager storageManager = server.getStorageManager();
-
-      final int NUM_MESSAGES = 1000;
-
-      pageStore.startPaging();
-
-      for (int i = 0; i < NUM_MESSAGES; i++)
-      {
-         if (i % 100 == 0) System.out.println("Paged " + i);
-         HornetQBuffer buffer = RandomUtil.randomBuffer(1024*1024, i + 1l);
-
-         ServerMessage msg = new ServerMessageImpl(i, buffer.writerIndex());
-         msg.getBodyBuffer().writeBytes(buffer, 0, buffer.writerIndex());
-
-         Assert.assertTrue(pageStore.page(msg));
-      }
-
-      int numberOfPages = pageStore.getNumberOfPages();
-
-      System.out.println("NumberOfPages = " + numberOfPages);
-
-      PageCursorProviderImpl cursorProvider = new PageCursorProviderImpl(pageStore, storageManager);
-
-      for (int i = 0; i < numberOfPages; i++)
-      {
-         PageCache cache = cursorProvider.getPageCache(i + 1);
-         System.out.println("Page " + i + " had " + cache.getNumberOfMessages() + " messages");
-
-      }
-      
-      forceGC();
-      
-      assertTrue(cursorProvider.getCacheSize() < numberOfPages);
-      
-      System.out.println("Cache size = " + cursorProvider.getCacheSize());
-      assertEquals(numberOfPages, pageStore.getNumberOfPages());
-      
-      
-   }
-
-   // Package protected ---------------------------------------------
-
-   // Protected -----------------------------------------------------
-
-   protected void setUp() throws Exception
-   {
-      super.setUp();
-      System.out.println("Tmp:" + getTemporaryDir());
-
-      server = createServer(true,
-                            createDefaultConfig(),
-                            PAGE_SIZE,
-                            PAGE_MAX,
-                            new HashMap<String, AddressSettings>());
-
-      server.start();
-
-      createQueue(ADDRESS.toString(), ADDRESS.toString());
-   }
-
-   protected void tearDown() throws Exception
-   {
-      server.stop();
-      super.tearDown();
-   }
-
-   // Private -------------------------------------------------------
-
-   // Inner classes -------------------------------------------------
-
-}

Copied: branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java (from rev 9741, branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCacheTest.java)
===================================================================
--- branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java	                        (rev 0)
+++ branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java	2010-10-04 16:12:30 UTC (rev 9742)
@@ -0,0 +1,206 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.tests.integration.paging;
+
+import java.util.HashMap;
+
+import junit.framework.Assert;
+
+import org.hornetq.api.core.HornetQBuffer;
+import org.hornetq.api.core.Pair;
+import org.hornetq.api.core.SimpleString;
+import org.hornetq.core.config.Configuration;
+import org.hornetq.core.paging.cursor.PageCache;
+import org.hornetq.core.paging.cursor.PageCursor;
+import org.hornetq.core.paging.cursor.PagePosition;
+import org.hornetq.core.paging.cursor.impl.PageCursorProviderImpl;
+import org.hornetq.core.paging.impl.PagingStoreImpl;
+import org.hornetq.core.server.HornetQServer;
+import org.hornetq.core.server.ServerMessage;
+import org.hornetq.core.server.impl.ServerMessageImpl;
+import org.hornetq.core.settings.impl.AddressSettings;
+import org.hornetq.tests.util.RandomUtil;
+import org.hornetq.tests.util.ServiceTestBase;
+
+/**
+ * A PageCacheTest
+ *
+ * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public class PageCursorTest extends ServiceTestBase
+{
+
+   // Constants -----------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+
+   private SimpleString ADDRESS = new SimpleString("test-add");
+
+   private HornetQServer server;
+
+   private static final int PAGE_MAX = -1;
+
+   private static final int PAGE_SIZE = 10 * 1024 * 1024;
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   // Public --------------------------------------------------------
+
+   public void testReadCache() throws Exception
+   {
+
+      final int NUM_MESSAGES = 1000;
+
+      int numberOfPages = addMessages(NUM_MESSAGES, 1024 * 1024);
+
+      System.out.println("NumberOfPages = " + numberOfPages);
+
+      PageCursorProviderImpl cursorProvider = new PageCursorProviderImpl(lookupPageStore(ADDRESS), server.getStorageManager());
+
+      for (int i = 0; i < numberOfPages; i++)
+      {
+         PageCache cache = cursorProvider.getPageCache(i + 1);
+         System.out.println("Page " + i + " had " + cache.getNumberOfMessages() + " messages");
+
+      }
+      
+      forceGC();
+      
+      assertTrue(cursorProvider.getCacheSize() < numberOfPages);
+      
+      System.out.println("Cache size = " + cursorProvider.getCacheSize());
+   }
+
+
+   public void testSimpleCursor() throws Exception
+   {
+
+      final int NUM_MESSAGES = 1000;
+
+      int numberOfPages = addMessages(NUM_MESSAGES, 1024 * 1024);
+
+      System.out.println("NumberOfPages = " + numberOfPages);
+
+      PageCursorProviderImpl cursorProvider = new PageCursorProviderImpl(lookupPageStore(ADDRESS), server.getStorageManager());
+      
+      PageCursor cursor = cursorProvider.createCursor();
+      
+      Pair<PagePosition, ServerMessage> msg;
+      
+      int key = 0;
+      while ((msg = cursor.moveNext()) != null)
+      {
+         assertEquals(key++, msg.b.getIntProperty("key").intValue());
+      }
+      assertEquals(NUM_MESSAGES, key);
+      
+      
+      forceGC();
+      
+      assertTrue(cursorProvider.getCacheSize() < numberOfPages);
+
+   }
+
+
+   public void testReadNextPage() throws Exception
+   {
+
+      final int NUM_MESSAGES = 1;
+
+      int numberOfPages = addMessages(NUM_MESSAGES, 1024);
+
+      System.out.println("NumberOfPages = " + numberOfPages);
+
+      PageCursorProviderImpl cursorProvider = new PageCursorProviderImpl(lookupPageStore(ADDRESS), server.getStorageManager());
+      
+      PageCache cache = cursorProvider.getPageCache(2);
+      
+      assertNull(cache);
+   }
+
+   /**
+    * @param numMessages
+    * @param pageStore
+    * @throws Exception
+    */
+   private int addMessages(final int numMessages, final int messageSize) throws Exception
+   {
+      PagingStoreImpl pageStore = lookupPageStore(ADDRESS);
+
+      pageStore.startPaging();
+
+      for (int i = 0; i < numMessages; i++)
+      {
+         if (i % 100 == 0) System.out.println("Paged " + i);
+         HornetQBuffer buffer = RandomUtil.randomBuffer(messageSize, i + 1l);
+
+         ServerMessage msg = new ServerMessageImpl(i, buffer.writerIndex());
+         msg.putIntProperty("key", i);
+         
+         msg.getBodyBuffer().writeBytes(buffer, 0, buffer.writerIndex());
+
+         Assert.assertTrue(pageStore.page(msg));
+      }
+      
+      return pageStore.getNumberOfPages();
+   }
+
+   /**
+    * @return
+    * @throws Exception
+    */
+   private PagingStoreImpl lookupPageStore(SimpleString address) throws Exception
+   {
+      return (PagingStoreImpl)server.getPagingManager().getPageStore(address);
+   }
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   protected void setUp() throws Exception
+   {
+      super.setUp();
+      System.out.println("Tmp:" + getTemporaryDir());
+      
+      Configuration config = createDefaultConfig();
+      
+      config.setJournalSyncNonTransactional(false);
+
+      server = createServer(true,
+                            config,
+                            PAGE_SIZE,
+                            PAGE_MAX,
+                            new HashMap<String, AddressSettings>());
+
+      server.start();
+
+      createQueue(ADDRESS.toString(), ADDRESS.toString());
+   }
+
+   protected void tearDown() throws Exception
+   {
+      server.stop();
+      super.tearDown();
+   }
+
+   // Private -------------------------------------------------------
+
+   // Inner classes -------------------------------------------------
+
+}



More information about the hornetq-commits mailing list