[hornetq-commits] JBoss hornetq SVN: r9903 - in branches/Branch_New_Paging_preMerge/tests/src/org/hornetq/tests: stress/paging and 1 other directory.

do-not-reply at jboss.org do-not-reply at jboss.org
Tue Nov 16 21:25:06 EST 2010


Author: clebert.suconic at jboss.com
Date: 2010-11-16 21:25:06 -0500 (Tue, 16 Nov 2010)
New Revision: 9903

Added:
   branches/Branch_New_Paging_preMerge/tests/src/org/hornetq/tests/stress/paging/PageCursorStressTest.java
Removed:
   branches/Branch_New_Paging_preMerge/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java
Log:
move file

Deleted: branches/Branch_New_Paging_preMerge/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java
===================================================================
--- branches/Branch_New_Paging_preMerge/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java	2010-11-17 02:24:23 UTC (rev 9902)
+++ branches/Branch_New_Paging_preMerge/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java	2010-11-17 02:25:06 UTC (rev 9903)
@@ -1,1330 +0,0 @@
-/*
- * Copyright 2010 Red Hat, Inc.
- * Red Hat licenses this file to you under the Apache License, version
- * 2.0 (the "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *    http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- * implied.  See the License for the specific language governing
- * permissions and limitations under the License.
- */
-
-package org.hornetq.tests.integration.paging;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import junit.framework.Assert;
-
-import org.hornetq.api.core.HornetQBuffer;
-import org.hornetq.api.core.SimpleString;
-import org.hornetq.api.core.client.ClientSession;
-import org.hornetq.api.core.client.ClientSessionFactory;
-import org.hornetq.core.config.Configuration;
-import org.hornetq.core.filter.Filter;
-import org.hornetq.core.paging.PagedMessage;
-import org.hornetq.core.paging.cursor.PageCache;
-import org.hornetq.core.paging.cursor.PageCursorProvider;
-import org.hornetq.core.paging.cursor.PagePosition;
-import org.hornetq.core.paging.cursor.PageSubscription;
-import org.hornetq.core.paging.cursor.PagedReference;
-import org.hornetq.core.paging.cursor.impl.PageCursorProviderImpl;
-import org.hornetq.core.paging.cursor.impl.PagePositionImpl;
-import org.hornetq.core.paging.cursor.impl.PageSubscriptionImpl;
-import org.hornetq.core.paging.impl.PagingStoreImpl;
-import org.hornetq.core.persistence.StorageManager;
-import org.hornetq.core.persistence.impl.journal.OperationContextImpl;
-import org.hornetq.core.server.HornetQServer;
-import org.hornetq.core.server.Queue;
-import org.hornetq.core.server.RoutingContext;
-import org.hornetq.core.server.ServerMessage;
-import org.hornetq.core.server.impl.RoutingContextImpl;
-import org.hornetq.core.server.impl.ServerMessageImpl;
-import org.hornetq.core.settings.impl.AddressSettings;
-import org.hornetq.core.transaction.Transaction;
-import org.hornetq.core.transaction.impl.TransactionImpl;
-import org.hornetq.tests.unit.core.postoffice.impl.FakeQueue;
-import org.hornetq.tests.util.RandomUtil;
-import org.hornetq.tests.util.ServiceTestBase;
-import org.hornetq.utils.LinkedListIterator;
-
-/**
- * A PageCursorTest
- *
- * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
- *
- *
- */
-public class PageCursorTest extends ServiceTestBase
-{
-
-   // Constants -----------------------------------------------------
-
-   // Attributes ----------------------------------------------------
-
-   private SimpleString ADDRESS = new SimpleString("test-add");
-
-   private HornetQServer server;
-
-   private Queue queue;
-   
-   private List<Queue> queueList;
-
-   private static final int PAGE_MAX = -1;
-
-   private static final int PAGE_SIZE = 10 * 1024 * 1024;
-
-   // Static --------------------------------------------------------
-
-   // Constructors --------------------------------------------------
-
-   // Public --------------------------------------------------------
-
-   // Read more cache than what would fit on the memory, and validate if the memory would be cleared through soft-caches
-   public void testReadCache() throws Exception
-   {
-
-      final int NUM_MESSAGES = 1000;
-
-      int numberOfPages = addMessages(NUM_MESSAGES, 1024 * 1024);
-
-      System.out.println("NumberOfPages = " + numberOfPages);
-
-      PageCursorProviderImpl cursorProvider = new PageCursorProviderImpl(lookupPageStore(ADDRESS),
-                                                                         server.getStorageManager(),
-                                                                         server.getExecutorFactory());
-
-      for (int i = 0; i < numberOfPages; i++)
-      {
-         PageCache cache = cursorProvider.getPageCache(new PagePositionImpl(i + 1, 0));
-         System.out.println("Page " + i + " had " + cache.getNumberOfMessages() + " messages");
-
-      }
-
-      forceGC();
-
-      assertTrue(cursorProvider.getCacheSize() < numberOfPages);
-
-      System.out.println("Cache size = " + cursorProvider.getCacheSize());
-   }
-
-   public void testSimpleCursor() throws Exception
-   {
-
-      final int NUM_MESSAGES = 100;
-
-      PageSubscription cursor = lookupPageStore(ADDRESS).getCursorProvier().getSubscription(queue.getID());
-      
-      Iterator<PagedReference> iterEmpty = cursor.iterator();
-
-      int numberOfPages = addMessages(NUM_MESSAGES, 1024 * 1024);
-
-      System.out.println("NumberOfPages = " + numberOfPages);
-
-      PagedReference msg;
-
-      LinkedListIterator<PagedReference> iterator = cursor.iterator();
-      int key = 0;
-      while ((msg = iterator.next()) != null)
-      {
-         assertEquals(key++, msg.getMessage().getIntProperty("key").intValue());
-         cursor.ack(msg.getPosition());
-      }
-      assertEquals(NUM_MESSAGES, key);
-
-      server.getStorageManager().waitOnOperations();
-
-      waitCleanup();
-
-      assertFalse(lookupPageStore(ADDRESS).isPaging());
-
-      assertEquals(1, lookupPageStore(ADDRESS).getNumberOfPages());
-
-      forceGC();
-
-      server.stop();
-      createServer();
-      waitCleanup();
-      assertEquals(1, lookupPageStore(ADDRESS).getNumberOfPages());
-
-   }
-
-   public void testSimpleCursorWithFilter() throws Exception
-   {
-
-      final int NUM_MESSAGES = 100;
-
-      PageSubscription cursorEven = createNonPersistentCursor(new Filter()
-      {
-
-         public boolean match(ServerMessage message)
-         {
-            Boolean property = message.getBooleanProperty("even");
-            if (property == null)
-            {
-               return false;
-            }
-            else
-            {
-               return property.booleanValue();
-            }
-         }
-
-         public SimpleString getFilterString()
-         {
-            return new SimpleString("even=true");
-         }
-
-      });
-
-      PageSubscription cursorOdd = createNonPersistentCursor(new Filter()
-      {
-
-         public boolean match(ServerMessage message)
-         {
-            Boolean property = message.getBooleanProperty("even");
-            if (property == null)
-            {
-               return false;
-            }
-            else
-            {
-               return !property.booleanValue();
-            }
-         }
-
-         public SimpleString getFilterString()
-         {
-            return new SimpleString("even=true");
-         }
-
-      });
-
-      int numberOfPages = addMessages(NUM_MESSAGES, 1024 * 1024);
-
-      System.out.println("NumberOfPages = " + numberOfPages);
-
-      queue.getPageSubscription().close();
-
-      PagedReference msg;
-
-      LinkedListIterator<PagedReference> iteratorEven = cursorEven.iterator();
-
-      LinkedListIterator<PagedReference> iteratorOdd = cursorOdd.iterator();
-
-      int key = 0;
-      while ((msg = iteratorEven.next()) != null)
-      {
-         System.out.println("Received" + msg);
-         assertEquals(key, msg.getMessage().getIntProperty("key").intValue());
-         assertTrue(msg.getMessage().getBooleanProperty("even").booleanValue());
-         key += 2;
-         cursorEven.ack(msg.getPosition());
-      }
-      assertEquals(NUM_MESSAGES, key);
-
-      key = 1;
-      while ((msg = iteratorOdd.next()) != null)
-      {
-         assertEquals(key, msg.getMessage().getIntProperty("key").intValue());
-         assertFalse(msg.getMessage().getBooleanProperty("even").booleanValue());
-         key += 2;
-         cursorOdd.ack(msg.getPosition());
-      }
-      assertEquals(NUM_MESSAGES + 1, key);
-
-      forceGC();
-
-      // assertTrue(lookupCursorProvider().getCacheSize() < numberOfPages);
-
-      server.stop();
-      createServer();
-      waitCleanup();
-      assertEquals(1, lookupPageStore(ADDRESS).getNumberOfPages());
-
-   }
-
-   public void testReadNextPage() throws Exception
-   {
-
-      final int NUM_MESSAGES = 1;
-
-      int numberOfPages = addMessages(NUM_MESSAGES, 1024);
-
-      System.out.println("NumberOfPages = " + numberOfPages);
-
-      PageCursorProvider cursorProvider = lookupCursorProvider();
-
-      PageCache cache = cursorProvider.getPageCache(new PagePositionImpl(2, 0));
-
-      assertNull(cache);
-   }
-
-   public void testRestart() throws Exception
-   {
-      final int NUM_MESSAGES = 1000;
-
-      int numberOfPages = addMessages(NUM_MESSAGES, 100 * 1024);
-
-      System.out.println("Number of pages = " + numberOfPages);
-
-      PageCursorProvider cursorProvider = lookupCursorProvider();
-
-      PageSubscription cursor = this.server.getPagingManager()
-                                           .getPageStore(ADDRESS)
-                                           .getCursorProvier()
-                                           .getSubscription(queue.getID());
-
-      PageCache firstPage = cursorProvider.getPageCache(new PagePositionImpl(server.getPagingManager()
-                                                                                   .getPageStore(ADDRESS)
-                                                                                   .getFirstPage(), 0));
-
-      int firstPageSize = firstPage.getNumberOfMessages();
-
-      firstPage = null;
-
-      System.out.println("Cursor: " + cursor);
-      cursorProvider.printDebug();
-
-      LinkedListIterator<PagedReference> iterator = cursor.iterator();
-
-      for (int i = 0; i < 1000; i++)
-      {
-         System.out.println("Reading Msg : " + i);
-         PagedReference msg = iterator.next();
-         assertNotNull(msg);
-         assertEquals(i, msg.getMessage().getIntProperty("key").intValue());
-
-         if (i < firstPageSize)
-         {
-            cursor.ack(msg);
-         }
-      }
-      cursorProvider.printDebug();
-
-      server.getStorageManager().waitOnOperations();
-      lookupPageStore(ADDRESS).flushExecutors();
-
-      // needs to clear the context since we are using the same thread over two distinct servers
-      // otherwise we will get the old executor on the factory
-      OperationContextImpl.clearContext();
-
-      server.stop();
-
-      server.start();
-
-      cursor = this.server.getPagingManager().getPageStore(ADDRESS).getCursorProvier().getSubscription(queue.getID());
-
-      iterator = cursor.iterator();
-
-      for (int i = firstPageSize; i < NUM_MESSAGES; i++)
-      {
-         System.out.println("Received " + i);
-         PagedReference msg = iterator.next();
-         assertNotNull(msg);
-         assertEquals(i, msg.getMessage().getIntProperty("key").intValue());
-
-         cursor.ack(msg);
-
-         OperationContextImpl.getContext(null).waitCompletion();
-
-      }
-
-      OperationContextImpl.getContext(null).waitCompletion();
-
-      lookupPageStore(ADDRESS).flushExecutors();
-
-      assertFalse(lookupPageStore(ADDRESS).isPaging());
-
-      server.stop();
-      createServer();
-      assertFalse(lookupPageStore(ADDRESS).isPaging());
-      waitCleanup();
-      assertEquals(1, lookupPageStore(ADDRESS).getNumberOfPages());
-
-   }
-
-   public void testRestartWithHoleOnAck() throws Exception
-   {
-
-      final int NUM_MESSAGES = 1000;
-
-      int numberOfPages = addMessages(NUM_MESSAGES, 10 * 1024);
-
-      System.out.println("Number of pages = " + numberOfPages);
-
-      PageCursorProvider cursorProvider = this.server.getPagingManager().getPageStore(ADDRESS).getCursorProvier();
-      System.out.println("cursorProvider = " + cursorProvider);
-
-      PageSubscription cursor = this.server.getPagingManager()
-                                           .getPageStore(ADDRESS)
-                                           .getCursorProvier()
-                                           .getSubscription(queue.getID());
-
-      System.out.println("Cursor: " + cursor);
-      LinkedListIterator<PagedReference> iterator = cursor.iterator();
-      for (int i = 0; i < 100; i++)
-      {
-         PagedReference msg = iterator.next();
-         assertEquals(i, msg.getMessage().getIntProperty("key").intValue());
-         if (i < 10 || i > 20)
-         {
-            cursor.ack(msg);
-         }
-      }
-      
-      server.getStorageManager().waitOnOperations();
-
-      server.stop();
-
-      OperationContextImpl.clearContext();
-
-      server.start();
-
-      cursor = this.server.getPagingManager().getPageStore(ADDRESS).getCursorProvier().getSubscription(queue.getID());
-      iterator = cursor.iterator();
-
-      for (int i = 10; i <= 20; i++)
-      {
-         PagedReference msg = iterator.next();
-         assertEquals(i, msg.getMessage().getIntProperty("key").intValue());
-         cursor.ack(msg);
-      }
-
-      for (int i = 100; i < NUM_MESSAGES; i++)
-      {
-         PagedReference msg = iterator.next();
-         assertEquals(i, msg.getMessage().getIntProperty("key").intValue());
-         cursor.ack(msg);
-      }
-
-      server.stop();
-      createServer();
-      waitCleanup();
-      assertEquals(1, lookupPageStore(ADDRESS).getNumberOfPages());
-
-   }
-
-   public void testRestartWithHoleOnAckAndTransaction() throws Exception
-   {
-      final int NUM_MESSAGES = 1000;
-
-      int numberOfPages = addMessages(NUM_MESSAGES, 10 * 1024);
-
-      System.out.println("Number of pages = " + numberOfPages);
-
-      PageCursorProvider cursorProvider = this.server.getPagingManager().getPageStore(ADDRESS).getCursorProvier();
-      System.out.println("cursorProvider = " + cursorProvider);
-
-      PageSubscription cursor = this.server.getPagingManager()
-                                           .getPageStore(ADDRESS)
-                                           .getCursorProvier()
-                                           .getSubscription(queue.getID());
-
-      System.out.println("Cursor: " + cursor);
-
-      Transaction tx = new TransactionImpl(server.getStorageManager(), 60 * 1000);
-
-      LinkedListIterator<PagedReference> iterator = cursor.iterator();
-
-      for (int i = 0; i < 100; i++)
-      {
-         PagedReference msg = iterator.next();
-         assertEquals(i, msg.getMessage().getIntProperty("key").intValue());
-         if (i < 10 || i > 20)
-         {
-            cursor.ackTx(tx, msg);
-         }
-      }
-
-      tx.commit();
-
-      server.stop();
-
-      OperationContextImpl.clearContext();
-
-      server.start();
-
-      cursor = this.server.getPagingManager().getPageStore(ADDRESS).getCursorProvier().getSubscription(queue.getID());
-
-      tx = new TransactionImpl(server.getStorageManager(), 60 * 1000);
-      iterator = cursor.iterator();
-
-      for (int i = 10; i <= 20; i++)
-      {
-         PagedReference msg = iterator.next();
-         assertEquals(i, msg.getMessage().getIntProperty("key").intValue());
-         cursor.ackTx(tx, msg);
-      }
-
-      for (int i = 100; i < NUM_MESSAGES; i++)
-      {
-         PagedReference msg = iterator.next();
-         assertEquals(i, msg.getMessage().getIntProperty("key").intValue());
-         cursor.ackTx(tx, msg);
-      }
-
-      tx.commit();
-
-      server.stop();
-      createServer();
-      waitCleanup();
-      assertEquals(1, lookupPageStore(ADDRESS).getNumberOfPages());
-
-   }
-
-   public void testConsumeLivePage() throws Exception
-   {
-      PagingStoreImpl pageStore = lookupPageStore(ADDRESS);
-
-      pageStore.startPaging();
-
-      final int NUM_MESSAGES = 100;
-
-      final int messageSize = 1024 * 1024;
-
-      PageCursorProvider cursorProvider = this.server.getPagingManager().getPageStore(ADDRESS).getCursorProvier();
-      System.out.println("cursorProvider = " + cursorProvider);
-
-      PageSubscription cursor = this.server.getPagingManager()
-                                           .getPageStore(ADDRESS)
-                                           .getCursorProvier()
-                                           .getSubscription(queue.getID());
-
-      System.out.println("Cursor: " + cursor);
-      
-      RoutingContextImpl ctx = generateCTX();
-
-      LinkedListIterator<PagedReference> iterator = cursor.iterator();
-
-      for (int i = 0; i < NUM_MESSAGES; i++)
-      {
-         // if (i % 100 == 0)
-         System.out.println("read/written " + i);
-
-         HornetQBuffer buffer = RandomUtil.randomBuffer(messageSize, i + 1l);
-
-         ServerMessage msg = new ServerMessageImpl(i, buffer.writerIndex());
-         msg.putIntProperty("key", i);
-
-         msg.getBodyBuffer().writeBytes(buffer, 0, buffer.writerIndex());
-
-         Assert.assertTrue(pageStore.page(msg, ctx, ctx.getContextListing(ADDRESS)));
-
-         PagedReference readMessage = iterator.next();
-
-         assertNotNull(readMessage);
-
-         assertEquals(i, readMessage.getMessage().getIntProperty("key").intValue());
-
-         assertNull(iterator.next());
-      }
-
-      server.stop();
-
-      OperationContextImpl.clearContext();
-
-      createServer();
-
-      pageStore = lookupPageStore(ADDRESS);
-
-      cursor = this.server.getPagingManager().getPageStore(ADDRESS).getCursorProvier().getSubscription(queue.getID());
-      iterator = cursor.iterator();
-
-      for (int i = 0; i < NUM_MESSAGES * 2; i++)
-      {
-         if (i % 100 == 0)
-            System.out.println("Paged " + i);
-
-         if (i >= NUM_MESSAGES)
-         {
-
-            HornetQBuffer buffer = RandomUtil.randomBuffer(messageSize, i + 1l);
-
-            ServerMessage msg = new ServerMessageImpl(i, buffer.writerIndex());
-            msg.putIntProperty("key", i);
-
-            msg.getBodyBuffer().writeBytes(buffer, 0, buffer.writerIndex());
-
-            Assert.assertTrue(pageStore.page(msg, ctx, ctx.getContextListing(ADDRESS)));
-         }
-
-         PagedReference readMessage = iterator.next();
-
-         assertNotNull(readMessage);
-
-         assertEquals(i, readMessage.getMessage().getIntProperty("key").intValue());
-      }
-
-      server.stop();
-
-      OperationContextImpl.clearContext();
-
-      createServer();
-
-      pageStore = lookupPageStore(ADDRESS);
-
-      cursor = this.server.getPagingManager().getPageStore(ADDRESS).getCursorProvier().getSubscription(queue.getID());
-      iterator = cursor.iterator();
-
-      for (int i = 0; i < NUM_MESSAGES * 3; i++)
-      {
-         if (i % 100 == 0)
-            System.out.println("Paged " + i);
-
-         if (i >= NUM_MESSAGES * 2 - 1)
-         {
-
-            HornetQBuffer buffer = RandomUtil.randomBuffer(messageSize, i + 1l);
-
-            ServerMessage msg = new ServerMessageImpl(i, buffer.writerIndex());
-            msg.putIntProperty("key", i + 1);
-
-            msg.getBodyBuffer().writeBytes(buffer, 0, buffer.writerIndex());
-
-            Assert.assertTrue(pageStore.page(msg, ctx, ctx.getContextListing(ADDRESS)));
-         }
-
-         PagedReference readMessage = iterator.next();
-
-         assertNotNull(readMessage);
-
-         cursor.ack(readMessage);
-
-         assertEquals(i, readMessage.getMessage().getIntProperty("key").intValue());
-      }
-
-      PagedReference readMessage = iterator.next();
-
-      assertEquals(NUM_MESSAGES * 3, readMessage.getMessage().getIntProperty("key").intValue());
-
-      cursor.ack(readMessage);
-
-      server.getStorageManager().waitOnOperations();
-
-      pageStore.flushExecutors();
-
-      assertFalse(pageStore.isPaging());
-
-      server.stop();
-      createServer();
-
-      assertFalse(pageStore.isPaging());
-
-      waitCleanup();
-
-      assertFalse(lookupPageStore(ADDRESS).isPaging());
-
-   }
-   
-
-   public void testConsumeLivePageMultiThread() throws Exception
-   {
-      final PagingStoreImpl pageStore = lookupPageStore(ADDRESS);
-
-      pageStore.startPaging();
-
-      final int NUM_TX = 100;
-      
-      final int MSGS_TX = 100;
-      
-      final int TOTAL_MSG = NUM_TX * MSGS_TX;
-
-      final int messageSize = 1024;
-
-      PageCursorProvider cursorProvider = this.server.getPagingManager().getPageStore(ADDRESS).getCursorProvier();
-      System.out.println("cursorProvider = " + cursorProvider);
-
-      PageSubscription cursor = this.server.getPagingManager()
-                                           .getPageStore(ADDRESS)
-                                           .getCursorProvier()
-                                           .getSubscription(queue.getID());
-
-      System.out.println("Cursor: " + cursor);
-      
-      final StorageManager storage = this.server.getStorageManager();
-      
-      final AtomicInteger exceptions = new AtomicInteger(0);
-      
-      Thread t1 = new Thread()
-      {
-         public void run()
-         {
-            try
-            {
-               int count = 0;
-               
-               for (int txCount = 0; txCount < NUM_TX; txCount++)
-               {
-                  
-                  Transaction tx = null;
-                  
-                  if (txCount % 2 == 0)
-                  {
-                     tx = new TransactionImpl(storage);
-                  }
-
-                  RoutingContext ctx = generateCTX(tx);
-                  
-                  for (int i = 0 ; i < MSGS_TX; i++)
-                  {
-                     //System.out.println("Sending " + count);
-                     HornetQBuffer buffer = RandomUtil.randomBuffer(messageSize, count);
-
-                     ServerMessage msg = new ServerMessageImpl(i, buffer.writerIndex());
-                     msg.putIntProperty("key", count++);
-
-                     msg.getBodyBuffer().writeBytes(buffer, 0, buffer.writerIndex());
-
-                     Assert.assertTrue(pageStore.page(msg, ctx, ctx.getContextListing(ADDRESS)));
-                  }
-                  
-                  if (tx != null)
-                  {
-                     tx.commit();
-                  }
-                  
-               }
-            }
-            catch (Throwable e)
-            {
-               e.printStackTrace();
-               exceptions.incrementAndGet();
-            }
-         }
-      };
-      
-      t1.start();
-      
-      
-      LinkedListIterator<PagedReference> iterator = cursor.iterator();
-      
-      for (int i = 0 ; i < TOTAL_MSG; i++ )
-      {
-         assertEquals(0, exceptions.get());
-         PagedReference ref = null;
-         for (int repeat = 0 ; repeat < 5; repeat++)
-         {
-            ref = iterator.next();
-            if (ref == null)
-            {
-               Thread.sleep(1000);
-            }
-            else
-            {
-               break;
-            }
-         }
-         assertNotNull(ref);
-         
-         ref.acknowledge();
-         assertNotNull(ref);
-         
-         System.out.println("Consuming " + ref.getMessage().getIntProperty("key"));
-         //assertEquals(i, ref.getMessage().getIntProperty("key").intValue());
-      }
-
-      assertEquals(0, exceptions.get());
-   }
-   
-   private RoutingContextImpl generateCTX()
-   {
-      return generateCTX(null);
-   }
-   
-   private RoutingContextImpl generateCTX(Transaction tx)
-   {
-      RoutingContextImpl ctx = new RoutingContextImpl(tx);
-      ctx.addQueue(ADDRESS, queue);
-      
-      for (Queue q : this.queueList)
-      {
-         ctx.addQueue(ADDRESS, q);
-      }
-      
-      return ctx;
-   }
-
-   /**
-    * @throws Exception
-    * @throws InterruptedException
-    */
-   private void waitCleanup() throws Exception, InterruptedException
-   {
-      // The cleanup is done asynchronously, so we need to wait some time
-      long timeout = System.currentTimeMillis() + 10000;
-
-      while (System.currentTimeMillis() < timeout && lookupPageStore(ADDRESS).getNumberOfPages() != 1)
-      {
-         Thread.sleep(100);
-      }
-
-      assertTrue("expected " + lookupPageStore(ADDRESS).getNumberOfPages(),
-                 lookupPageStore(ADDRESS).getNumberOfPages() <= 2);
-   }
-
-   public void testPrepareScenarios() throws Exception
-   {
-      PagingStoreImpl pageStore = lookupPageStore(ADDRESS);
-
-      pageStore.startPaging();
-
-      final int NUM_MESSAGES = 100;
-
-      final int messageSize = 100 * 1024;
-
-      PageCursorProvider cursorProvider = this.server.getPagingManager().getPageStore(ADDRESS).getCursorProvier();
-      System.out.println("cursorProvider = " + cursorProvider);
-
-      PageSubscription cursor = this.server.getPagingManager()
-                                           .getPageStore(ADDRESS)
-                                           .getCursorProvier()
-                                           .getSubscription(queue.getID());
-      LinkedListIterator<PagedReference> iterator = cursor.iterator();
-
-      System.out.println("Cursor: " + cursor);
-
-      StorageManager storage = this.server.getStorageManager();
-
-      long pgtxRollback = storage.generateUniqueID();
-      long pgtxForgotten = storage.generateUniqueID();
-      long pgtxCommit = storage.generateUniqueID();
-
-      Transaction txRollback = pgMessages(storage, pageStore, pgtxRollback, 0, NUM_MESSAGES, messageSize);
-      pageStore.forceAnotherPage();
-      Transaction txForgotten = pgMessages(storage, pageStore, pgtxForgotten, 100, NUM_MESSAGES, messageSize);
-      pageStore.forceAnotherPage();
-      Transaction txCommit = pgMessages(storage, pageStore, pgtxCommit, 200, NUM_MESSAGES, messageSize);
-      pageStore.forceAnotherPage();
-
-      addMessages(300, NUM_MESSAGES, messageSize);
-
-      System.out.println("Number of pages - " + pageStore.getNumberOfPages());
-
-      // First consume what's already there without any tx as nothing was committed
-      for (int i = 300; i < 400; i++)
-      {
-         PagedReference pos = iterator.next();
-         assertNotNull("Null at position " + i, pos);
-         assertEquals(i, pos.getMessage().getIntProperty("key").intValue());
-         cursor.ack(pos);
-      }
-
-      assertNull(iterator.next());
-
-      cursor.printDebug();
- 
-      txCommit.commit();
-
-      txRollback.rollback();
-      
-      storage.waitOnOperations();
-
-      // Second:after pgtxCommit was done
-      for (int i = 200; i < 300; i++)
-      {
-         PagedReference pos = iterator.next();
-         assertNotNull(pos);
-         assertEquals(i, pos.getMessage().getIntProperty("key").intValue());
-         cursor.ack(pos);
-      }
-
-      assertNull(iterator.next());
-      
-      server.getStorageManager().waitOnOperations();
-
-      server.stop();
-      createServer();
-
-      long timeout = System.currentTimeMillis() + 10000;
-
-      while (System.currentTimeMillis() < timeout && lookupPageStore(ADDRESS).getNumberOfPages() != 1)
-      {
-         Thread.sleep(500);
-      }
-      assertEquals(1, lookupPageStore(ADDRESS).getNumberOfPages());
-
-   }
-
-
-   public void testLazyCommit() throws Exception
-   {
-      PagingStoreImpl pageStore = lookupPageStore(ADDRESS);
-
-      pageStore.startPaging();
-
-      final int NUM_MESSAGES = 100;
-
-      final int messageSize = 100 * 1024;
-
-      PageCursorProvider cursorProvider = this.server.getPagingManager().getPageStore(ADDRESS).getCursorProvier();
-      System.out.println("cursorProvider = " + cursorProvider);
-
-      PageSubscription cursor = this.server.getPagingManager()
-                                           .getPageStore(ADDRESS)
-                                           .getCursorProvier()
-                                           .getSubscription(queue.getID());
-      LinkedListIterator<PagedReference> iterator = cursor.iterator();
-
-      System.out.println("Cursor: " + cursor);
-
-      StorageManager storage = this.server.getStorageManager();
-
-      long pgtxLazy = storage.generateUniqueID();
-
-      Transaction txLazy = pgMessages(storage, pageStore, pgtxLazy, 0, NUM_MESSAGES, messageSize);
-
-      addMessages(100, NUM_MESSAGES, messageSize);
-
-      System.out.println("Number of pages - " + pageStore.getNumberOfPages());
-
-      // First consume what's already there without any tx as nothing was committed
-      for (int i = 100; i < 200; i++)
-      {
-         PagedReference pos = iterator.next();
-         assertNotNull("Null at position " + i, pos);
-         assertEquals(i, pos.getMessage().getIntProperty("key").intValue());
-         cursor.ack(pos);
-      }
-
-      assertNull(iterator.next());
-      
-      txLazy.commit();
-      
-      storage.waitOnOperations();
-
-      for (int i = 0; i < 100; i++)
-      {
-         PagedReference pos = iterator.next();
-         assertNotNull("Null at position " + i, pos);
-         assertEquals(i, pos.getMessage().getIntProperty("key").intValue());
-         cursor.ack(pos);
-      }
-
-      assertNull(iterator.next());
-
-      waitCleanup();
-
-      server.stop();
-      createServer();
-      waitCleanup();
-      assertEquals(1, lookupPageStore(ADDRESS).getNumberOfPages());
-
-   }
-
-   public void testCloseNonPersistentConsumer() throws Exception
-   {
-
-      final int NUM_MESSAGES = 100;
-
-      PageCursorProvider cursorProvider = lookupCursorProvider();
-
-      PageSubscription cursor = cursorProvider.createSubscription(11, null, false);
-      PageSubscriptionImpl cursor2 = (PageSubscriptionImpl)cursorProvider.createSubscription(12, null, false);
-      
-      this.queueList.add(new FakeQueue(new SimpleString("a"), 11));
-      
-      this.queueList.add(new FakeQueue(new SimpleString("b"), 12));
-
-      int numberOfPages = addMessages(NUM_MESSAGES, 1024 * 1024);
-
-      System.out.println("NumberOfPages = " + numberOfPages);
-
-      queue.getPageSubscription().close();
-
-      PagedReference msg;
-      LinkedListIterator<PagedReference> iterator = cursor.iterator();
-      LinkedListIterator<PagedReference> iterator2 = cursor2.iterator();
-      
-      cursor2.bookmark(new PagePositionImpl(1, -1));
-
-      int key = 0;
-      while ((msg = iterator.next()) != null)
-      {
-         System.out.println("key = " + key);
-         assertEquals(key++, msg.getMessage().getIntProperty("key").intValue());
-         cursor.ack(msg);
-      }
-      assertEquals(NUM_MESSAGES, key);
-
-      forceGC();
-
-      for (int i = 0; i < 10; i++)
-      {
-         assertTrue(iterator2.hasNext());
-         msg = iterator2.next();
-         assertEquals(i, msg.getMessage().getIntProperty("key").intValue());
-      }
-
-      assertSame(cursor2.getProvider(), cursorProvider);
-
-      cursor2.close();
-
-      lookupPageStore(ADDRESS).flushExecutors();
-
-      server.stop();
-      createServer();
-      waitCleanup();
-      assertEquals(1, lookupPageStore(ADDRESS).getNumberOfPages());
-
-   }
-
-   public void testNoCursors() throws Exception
-   {
-
-      final int NUM_MESSAGES = 100;
-
-      int numberOfPages = addMessages(NUM_MESSAGES, 1024 * 1024);
-
-      ClientSessionFactory sf = createInVMFactory();
-      ClientSession session = sf.createSession();
-      session.deleteQueue(ADDRESS);
-
-      System.out.println("NumberOfPages = " + numberOfPages);
-
-      server.stop();
-      createServer();
-      waitCleanup();
-      assertEquals(0, lookupPageStore(ADDRESS).getNumberOfPages());
-
-   }
-
-   public void testFirstMessageInTheMiddle() throws Exception
-   {
-
-      final int NUM_MESSAGES = 100;
-
-      PageCursorProvider cursorProvider = lookupCursorProvider();
-
-      PageSubscription cursor = cursorProvider.createSubscription(2, null, false);
-      
-      queueList.add(new FakeQueue(new SimpleString("tmp"), 2));
-
-      int numberOfPages = addMessages(NUM_MESSAGES, 1024 * 1024);
-
-      System.out.println("NumberOfPages = " + numberOfPages);
-
-      PageCache cache = cursorProvider.getPageCache(new PagePositionImpl(5, 0));
-
-      queue.getPageSubscription().close();
-
-      PagePosition startingPos = new PagePositionImpl(5, cache.getNumberOfMessages() / 2);
-      cursor.bookmark(startingPos);
-      PagedMessage msg = cache.getMessage(startingPos.getMessageNr() + 1);
-      msg.initMessage(server.getStorageManager());
-      int key = msg.getMessage().getIntProperty("key").intValue();
-
-      msg = null;
-
-      cache = null;
-      LinkedListIterator<PagedReference> iterator = cursor.iterator();
-
-      PagedReference msgCursor = null;
-      while ((msgCursor = iterator.next()) != null)
-      {
-         assertEquals(key++, msgCursor.getMessage().getIntProperty("key").intValue());
-         cursor.ack(msgCursor);
-      }
-      assertEquals(NUM_MESSAGES, key);
-
-      forceGC();
-
-      // assertTrue(cursorProvider.getCacheSize() < numberOfPages);
-
-      server.stop();
-      createServer();
-      waitCleanup();
-      assertEquals(1, lookupPageStore(ADDRESS).getNumberOfPages());
-   }
-
-   public void testFirstMessageInTheMiddlePersistent() throws Exception
-   {
-
-      final int NUM_MESSAGES = 100;
-
-      int numberOfPages = addMessages(NUM_MESSAGES, 1024 * 1024);
-
-      System.out.println("NumberOfPages = " + numberOfPages);
-
-      PageCursorProvider cursorProvider = lookupCursorProvider();
-
-      PageCache cache = cursorProvider.getPageCache(new PagePositionImpl(5, 0));
-
-      PageSubscription cursor = cursorProvider.getSubscription(queue.getID());
-      PagePosition startingPos = new PagePositionImpl(5, cache.getNumberOfMessages() / 2);
-      cursor.bookmark(startingPos);
-
-      // We can't proceed until the operation has finished
-      server.getStorageManager().waitOnOperations();
-      
-      PagedMessage msg = cache.getMessage(startingPos.getMessageNr() + 1);
-      msg.initMessage(server.getStorageManager());
-      int initialKey = msg.getMessage().getIntProperty("key").intValue();
-      int key = initialKey;
-
-      msg = null;
-
-      cache = null;
-
-      LinkedListIterator<PagedReference> iterator = cursor.iterator();
-
-      PagedReference msgCursor = null;
-      while ((msgCursor = iterator.next()) != null)
-      {
-         assertEquals(key++, msgCursor.getMessage().getIntProperty("key").intValue());
-      }
-      assertEquals(NUM_MESSAGES, key);
-
-      server.stop();
-
-      OperationContextImpl.clearContext();
-
-      createServer();
-
-      cursorProvider = lookupCursorProvider();
-      cursor = cursorProvider.getSubscription(queue.getID());
-      key = initialKey;
-      iterator = cursor.iterator();
-      while ((msgCursor = iterator.next()) != null)
-      {
-         assertEquals(key++, msgCursor.getMessage().getIntProperty("key").intValue());
-         cursor.ack(msgCursor);
-      }
-
-      forceGC();
-
-      assertTrue(cursorProvider.getCacheSize() < numberOfPages);
-
-      server.stop();
-      createServer();
-      waitCleanup();
-      assertEquals(1, lookupPageStore(ADDRESS).getNumberOfPages());
-
-   }
-   
-   private int tstProperty(ServerMessage msg)
-   {
-      return msg.getIntProperty("key").intValue();
-   }
-
-   public void testMultipleIterators() throws Exception
-   {
-
-      final int NUM_MESSAGES = 10;
-
-      int numberOfPages = addMessages(NUM_MESSAGES, 1024 * 1024);
-
-      System.out.println("NumberOfPages = " + numberOfPages);
-
-      PageCursorProvider cursorProvider = lookupCursorProvider();
-
-      PageSubscription cursor = cursorProvider.getSubscription(queue.getID());
-
-      LinkedListIterator<PagedReference> iter = cursor.iterator();
-      
-      LinkedListIterator<PagedReference> iter2 = cursor.iterator();
-      
-      assertTrue(iter.hasNext());
-      
-      PagedReference msg1 = iter.next();
-      
-      PagedReference msg2 = iter2.next();
-      
-      assertEquals(tstProperty(msg1.getMessage()), tstProperty(msg2.getMessage()));
-      
-      System.out.println("property = " + tstProperty(msg1.getMessage()));
-
-      msg1 = iter.next();
-      
-      assertEquals(1, tstProperty(msg1.getMessage()));
-      
-      iter.remove();
-      
-      msg2 = iter2.next();
-      
-      assertEquals(2, tstProperty(msg2.getMessage()));
-      
-      iter2.repeat();
-      
-      msg2 = iter2.next();
-      
-      assertEquals(2, tstProperty(msg2.getMessage()));
-      
-      iter2.repeat();
-      
-      assertEquals(2, tstProperty(msg2.getMessage()));
-      
-      msg1 = iter.next();
-      
-      assertEquals(2, tstProperty(msg1.getMessage()));
-      
-      iter.repeat();
-      
-      msg1 = iter.next();
-      
-      assertEquals(2, tstProperty(msg1.getMessage()));
-      
-      assertTrue(iter2.hasNext());
-      
-      
-   }
-
-   private int addMessages(final int numMessages, final int messageSize) throws Exception
-   {
-      return addMessages(0, numMessages, messageSize);
-   }
-
-   /**
-    * @param numMessages
-    * @param pageStore
-    * @throws Exception
-    */
-   private int addMessages(final int start, final int numMessages, final int messageSize) throws Exception
-   {
-      PagingStoreImpl pageStore = lookupPageStore(ADDRESS);
-
-      pageStore.startPaging();
-      
-      RoutingContext ctx = generateCTX();
-
-      for (int i = start; i < start + numMessages; i++)
-      {
-         if (i % 100 == 0)
-            System.out.println("Paged " + i);
-         HornetQBuffer buffer = RandomUtil.randomBuffer(messageSize, i + 1l);
-
-         ServerMessage msg = new ServerMessageImpl(i, buffer.writerIndex());
-         msg.putIntProperty("key", i);
-         // to be used on tests that are validating filters
-         msg.putBooleanProperty("even", i % 2 == 0);
-
-         msg.getBodyBuffer().writeBytes(buffer, 0, buffer.writerIndex());
-
-         Assert.assertTrue(pageStore.page(msg, ctx, ctx.getContextListing(ADDRESS)));
-      }
-
-      return pageStore.getNumberOfPages();
-   }
-
-   /**
-    * @return
-    * @throws Exception
-    */
-   private PagingStoreImpl lookupPageStore(SimpleString address) throws Exception
-   {
-      return (PagingStoreImpl)server.getPagingManager().getPageStore(address);
-   }
-
-   // Package protected ---------------------------------------------
-
-   // Protected -----------------------------------------------------
-
-   protected void tearDown() throws Exception
-   {
-      server.stop();
-      server = null;
-      queue = null;
-      queueList = null;
-      super.tearDown();
-   }
-
-   protected void setUp() throws Exception
-   {
-      super.setUp();
-      OperationContextImpl.clearContext();
-      System.out.println("Tmp:" + getTemporaryDir());
-
-      queueList = new ArrayList<Queue>();
-      
-      createServer();
-   }
-
-   /**
-    * @throws Exception
-    */
-   private void createServer() throws Exception
-   {
-      OperationContextImpl.clearContext();
-
-      Configuration config = createDefaultConfig();
-
-      config.setJournalSyncNonTransactional(true);
-
-      server = createServer(true, config, PAGE_SIZE, PAGE_MAX, new HashMap<String, AddressSettings>());
-
-      server.start();
-
-      try
-      {
-         queue = server.createQueue(ADDRESS, ADDRESS, null, true, false);
-         queue.pause();
-      }
-      catch (Exception ignored)
-      {
-      }
-   }
-
-   /**
-    * @return
-    * @throws Exception
-    */
-   private PageSubscription createNonPersistentCursor(Filter filter) throws Exception
-   {
-      long id = server.getStorageManager().generateUniqueID();
-      queueList.add(new FakeQueue(new SimpleString(filter.toString()), id));
-      return lookupCursorProvider().createSubscription(id, filter, false);
-   }
-
-   /**
-    * @return
-    * @throws Exception
-    */
-   private PageCursorProvider lookupCursorProvider() throws Exception
-   {
-      return lookupPageStore(ADDRESS).getCursorProvier();
-   }
-
-   /**
-    * @param storage
-    * @param pageStore
-    * @param pgParameter
-    * @param start
-    * @param NUM_MESSAGES
-    * @param messageSize
-    * @throws Exception
-    */
-   private Transaction pgMessages(StorageManager storage,
-                           PagingStoreImpl pageStore,
-                           long pgParameter,
-                           int start,
-                           final int NUM_MESSAGES,
-                           final int messageSize) throws Exception
-   {
-      
-      TransactionImpl txImpl = new TransactionImpl(pgParameter, null, storage);
-      
-      RoutingContext ctx = generateCTX(txImpl);
-
-      for (int i = start; i < start + NUM_MESSAGES; i++)
-      {
-         HornetQBuffer buffer = RandomUtil.randomBuffer(messageSize, i + 1l);
-         ServerMessage msg = new ServerMessageImpl(storage.generateUniqueID(), buffer.writerIndex());
-         msg.getBodyBuffer().writeBytes(buffer, 0, buffer.writerIndex());
-         msg.putIntProperty("key", i);
-         pageStore.page(msg, ctx, ctx.getContextListing(ADDRESS));
-      }
-      
-      return txImpl;
-
-   }
-
-   // Private -------------------------------------------------------
-
-   // Inner classes -------------------------------------------------
-
-}

Copied: branches/Branch_New_Paging_preMerge/tests/src/org/hornetq/tests/stress/paging/PageCursorStressTest.java (from rev 9894, branches/Branch_New_Paging_preMerge/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java)
===================================================================
--- branches/Branch_New_Paging_preMerge/tests/src/org/hornetq/tests/stress/paging/PageCursorStressTest.java	                        (rev 0)
+++ branches/Branch_New_Paging_preMerge/tests/src/org/hornetq/tests/stress/paging/PageCursorStressTest.java	2010-11-17 02:25:06 UTC (rev 9903)
@@ -0,0 +1,1330 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.tests.stress.paging;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import junit.framework.Assert;
+
+import org.hornetq.api.core.HornetQBuffer;
+import org.hornetq.api.core.SimpleString;
+import org.hornetq.api.core.client.ClientSession;
+import org.hornetq.api.core.client.ClientSessionFactory;
+import org.hornetq.core.config.Configuration;
+import org.hornetq.core.filter.Filter;
+import org.hornetq.core.paging.PagedMessage;
+import org.hornetq.core.paging.cursor.PageCache;
+import org.hornetq.core.paging.cursor.PageCursorProvider;
+import org.hornetq.core.paging.cursor.PagePosition;
+import org.hornetq.core.paging.cursor.PageSubscription;
+import org.hornetq.core.paging.cursor.PagedReference;
+import org.hornetq.core.paging.cursor.impl.PageCursorProviderImpl;
+import org.hornetq.core.paging.cursor.impl.PagePositionImpl;
+import org.hornetq.core.paging.cursor.impl.PageSubscriptionImpl;
+import org.hornetq.core.paging.impl.PagingStoreImpl;
+import org.hornetq.core.persistence.StorageManager;
+import org.hornetq.core.persistence.impl.journal.OperationContextImpl;
+import org.hornetq.core.server.HornetQServer;
+import org.hornetq.core.server.Queue;
+import org.hornetq.core.server.RoutingContext;
+import org.hornetq.core.server.ServerMessage;
+import org.hornetq.core.server.impl.RoutingContextImpl;
+import org.hornetq.core.server.impl.ServerMessageImpl;
+import org.hornetq.core.settings.impl.AddressSettings;
+import org.hornetq.core.transaction.Transaction;
+import org.hornetq.core.transaction.impl.TransactionImpl;
+import org.hornetq.tests.unit.core.postoffice.impl.FakeQueue;
+import org.hornetq.tests.util.RandomUtil;
+import org.hornetq.tests.util.ServiceTestBase;
+import org.hornetq.utils.LinkedListIterator;
+
+/**
+ * A PageCursorStressTest
+ *
+ * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public class PageCursorStressTest extends ServiceTestBase
+{
+
+   // Constants -----------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+
+   private SimpleString ADDRESS = new SimpleString("test-add");
+
+   private HornetQServer server;
+
+   private Queue queue;
+   
+   private List<Queue> queueList;
+
+   private static final int PAGE_MAX = -1;
+
+   private static final int PAGE_SIZE = 10 * 1024 * 1024;
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   // Public --------------------------------------------------------
+
+   // Read more cache than would fit in memory, and validate that the memory is cleared through the soft caches
+   public void testReadCache() throws Exception
+   {
+
+      final int NUM_MESSAGES = 1000;
+
+      int numberOfPages = addMessages(NUM_MESSAGES, 1024 * 1024);
+
+      System.out.println("NumberOfPages = " + numberOfPages);
+
+      PageCursorProviderImpl cursorProvider = new PageCursorProviderImpl(lookupPageStore(ADDRESS),
+                                                                         server.getStorageManager(),
+                                                                         server.getExecutorFactory());
+
+      for (int i = 0; i < numberOfPages; i++)
+      {
+         PageCache cache = cursorProvider.getPageCache(new PagePositionImpl(i + 1, 0));
+         System.out.println("Page " + i + " had " + cache.getNumberOfMessages() + " messages");
+
+      }
+
+      forceGC();
+
+      assertTrue(cursorProvider.getCacheSize() < numberOfPages);
+
+      System.out.println("Cache size = " + cursorProvider.getCacheSize());
+   }
+
+   public void testSimpleCursor() throws Exception
+   {
+
+      final int NUM_MESSAGES = 100;
+
+      PageSubscription cursor = lookupPageStore(ADDRESS).getCursorProvier().getSubscription(queue.getID());
+      
+      Iterator<PagedReference> iterEmpty = cursor.iterator();
+
+      int numberOfPages = addMessages(NUM_MESSAGES, 1024 * 1024);
+
+      System.out.println("NumberOfPages = " + numberOfPages);
+
+      PagedReference msg;
+
+      LinkedListIterator<PagedReference> iterator = cursor.iterator();
+      int key = 0;
+      while ((msg = iterator.next()) != null)
+      {
+         assertEquals(key++, msg.getMessage().getIntProperty("key").intValue());
+         cursor.ack(msg.getPosition());
+      }
+      assertEquals(NUM_MESSAGES, key);
+
+      server.getStorageManager().waitOnOperations();
+
+      waitCleanup();
+
+      assertFalse(lookupPageStore(ADDRESS).isPaging());
+
+      assertEquals(1, lookupPageStore(ADDRESS).getNumberOfPages());
+
+      forceGC();
+
+      server.stop();
+      createServer();
+      waitCleanup();
+      assertEquals(1, lookupPageStore(ADDRESS).getNumberOfPages());
+
+   }
+
+   public void testSimpleCursorWithFilter() throws Exception
+   {
+
+      final int NUM_MESSAGES = 100;
+
+      PageSubscription cursorEven = createNonPersistentCursor(new Filter()
+      {
+
+         public boolean match(ServerMessage message)
+         {
+            Boolean property = message.getBooleanProperty("even");
+            if (property == null)
+            {
+               return false;
+            }
+            else
+            {
+               return property.booleanValue();
+            }
+         }
+
+         public SimpleString getFilterString()
+         {
+            return new SimpleString("even=true");
+         }
+
+      });
+
+      PageSubscription cursorOdd = createNonPersistentCursor(new Filter()
+      {
+
+         public boolean match(ServerMessage message)
+         {
+            Boolean property = message.getBooleanProperty("even");
+            if (property == null)
+            {
+               return false;
+            }
+            else
+            {
+               return !property.booleanValue();
+            }
+         }
+
+         public SimpleString getFilterString()
+         {
+            return new SimpleString("even=true");
+         }
+
+      });
+
+      int numberOfPages = addMessages(NUM_MESSAGES, 1024 * 1024);
+
+      System.out.println("NumberOfPages = " + numberOfPages);
+
+      queue.getPageSubscription().close();
+
+      PagedReference msg;
+
+      LinkedListIterator<PagedReference> iteratorEven = cursorEven.iterator();
+
+      LinkedListIterator<PagedReference> iteratorOdd = cursorOdd.iterator();
+
+      int key = 0;
+      while ((msg = iteratorEven.next()) != null)
+      {
+         System.out.println("Received" + msg);
+         assertEquals(key, msg.getMessage().getIntProperty("key").intValue());
+         assertTrue(msg.getMessage().getBooleanProperty("even").booleanValue());
+         key += 2;
+         cursorEven.ack(msg.getPosition());
+      }
+      assertEquals(NUM_MESSAGES, key);
+
+      key = 1;
+      while ((msg = iteratorOdd.next()) != null)
+      {
+         assertEquals(key, msg.getMessage().getIntProperty("key").intValue());
+         assertFalse(msg.getMessage().getBooleanProperty("even").booleanValue());
+         key += 2;
+         cursorOdd.ack(msg.getPosition());
+      }
+      assertEquals(NUM_MESSAGES + 1, key);
+
+      forceGC();
+
+      // assertTrue(lookupCursorProvider().getCacheSize() < numberOfPages);
+
+      server.stop();
+      createServer();
+      waitCleanup();
+      assertEquals(1, lookupPageStore(ADDRESS).getNumberOfPages());
+
+   }
+
+   public void testReadNextPage() throws Exception
+   {
+
+      final int NUM_MESSAGES = 1;
+
+      int numberOfPages = addMessages(NUM_MESSAGES, 1024);
+
+      System.out.println("NumberOfPages = " + numberOfPages);
+
+      PageCursorProvider cursorProvider = lookupCursorProvider();
+
+      PageCache cache = cursorProvider.getPageCache(new PagePositionImpl(2, 0));
+
+      assertNull(cache);
+   }
+
+   public void testRestart() throws Exception
+   {
+      final int NUM_MESSAGES = 1000;
+
+      int numberOfPages = addMessages(NUM_MESSAGES, 100 * 1024);
+
+      System.out.println("Number of pages = " + numberOfPages);
+
+      PageCursorProvider cursorProvider = lookupCursorProvider();
+
+      PageSubscription cursor = this.server.getPagingManager()
+                                           .getPageStore(ADDRESS)
+                                           .getCursorProvier()
+                                           .getSubscription(queue.getID());
+
+      PageCache firstPage = cursorProvider.getPageCache(new PagePositionImpl(server.getPagingManager()
+                                                                                   .getPageStore(ADDRESS)
+                                                                                   .getFirstPage(), 0));
+
+      int firstPageSize = firstPage.getNumberOfMessages();
+
+      firstPage = null;
+
+      System.out.println("Cursor: " + cursor);
+      cursorProvider.printDebug();
+
+      LinkedListIterator<PagedReference> iterator = cursor.iterator();
+
+      for (int i = 0; i < 1000; i++)
+      {
+         System.out.println("Reading Msg : " + i);
+         PagedReference msg = iterator.next();
+         assertNotNull(msg);
+         assertEquals(i, msg.getMessage().getIntProperty("key").intValue());
+
+         if (i < firstPageSize)
+         {
+            cursor.ack(msg);
+         }
+      }
+      cursorProvider.printDebug();
+
+      server.getStorageManager().waitOnOperations();
+      lookupPageStore(ADDRESS).flushExecutors();
+
+      // needs to clear the context since we are using the same thread over two distinct servers
+      // otherwise we will get the old executor on the factory
+      OperationContextImpl.clearContext();
+
+      server.stop();
+
+      server.start();
+
+      cursor = this.server.getPagingManager().getPageStore(ADDRESS).getCursorProvier().getSubscription(queue.getID());
+
+      iterator = cursor.iterator();
+
+      for (int i = firstPageSize; i < NUM_MESSAGES; i++)
+      {
+         System.out.println("Received " + i);
+         PagedReference msg = iterator.next();
+         assertNotNull(msg);
+         assertEquals(i, msg.getMessage().getIntProperty("key").intValue());
+
+         cursor.ack(msg);
+
+         OperationContextImpl.getContext(null).waitCompletion();
+
+      }
+
+      OperationContextImpl.getContext(null).waitCompletion();
+
+      lookupPageStore(ADDRESS).flushExecutors();
+
+      assertFalse(lookupPageStore(ADDRESS).isPaging());
+
+      server.stop();
+      createServer();
+      assertFalse(lookupPageStore(ADDRESS).isPaging());
+      waitCleanup();
+      assertEquals(1, lookupPageStore(ADDRESS).getNumberOfPages());
+
+   }
+
+   public void testRestartWithHoleOnAck() throws Exception
+   {
+
+      final int NUM_MESSAGES = 1000;
+
+      int numberOfPages = addMessages(NUM_MESSAGES, 10 * 1024);
+
+      System.out.println("Number of pages = " + numberOfPages);
+
+      PageCursorProvider cursorProvider = this.server.getPagingManager().getPageStore(ADDRESS).getCursorProvier();
+      System.out.println("cursorProvider = " + cursorProvider);
+
+      PageSubscription cursor = this.server.getPagingManager()
+                                           .getPageStore(ADDRESS)
+                                           .getCursorProvier()
+                                           .getSubscription(queue.getID());
+
+      System.out.println("Cursor: " + cursor);
+      LinkedListIterator<PagedReference> iterator = cursor.iterator();
+      for (int i = 0; i < 100; i++)
+      {
+         PagedReference msg = iterator.next();
+         assertEquals(i, msg.getMessage().getIntProperty("key").intValue());
+         if (i < 10 || i > 20)
+         {
+            cursor.ack(msg);
+         }
+      }
+      
+      server.getStorageManager().waitOnOperations();
+
+      server.stop();
+
+      OperationContextImpl.clearContext();
+
+      server.start();
+
+      cursor = this.server.getPagingManager().getPageStore(ADDRESS).getCursorProvier().getSubscription(queue.getID());
+      iterator = cursor.iterator();
+
+      for (int i = 10; i <= 20; i++)
+      {
+         PagedReference msg = iterator.next();
+         assertEquals(i, msg.getMessage().getIntProperty("key").intValue());
+         cursor.ack(msg);
+      }
+
+      for (int i = 100; i < NUM_MESSAGES; i++)
+      {
+         PagedReference msg = iterator.next();
+         assertEquals(i, msg.getMessage().getIntProperty("key").intValue());
+         cursor.ack(msg);
+      }
+
+      server.stop();
+      createServer();
+      waitCleanup();
+      assertEquals(1, lookupPageStore(ADDRESS).getNumberOfPages());
+
+   }
+
+   public void testRestartWithHoleOnAckAndTransaction() throws Exception
+   {
+      final int NUM_MESSAGES = 1000;
+
+      int numberOfPages = addMessages(NUM_MESSAGES, 10 * 1024);
+
+      System.out.println("Number of pages = " + numberOfPages);
+
+      PageCursorProvider cursorProvider = this.server.getPagingManager().getPageStore(ADDRESS).getCursorProvier();
+      System.out.println("cursorProvider = " + cursorProvider);
+
+      PageSubscription cursor = this.server.getPagingManager()
+                                           .getPageStore(ADDRESS)
+                                           .getCursorProvier()
+                                           .getSubscription(queue.getID());
+
+      System.out.println("Cursor: " + cursor);
+
+      Transaction tx = new TransactionImpl(server.getStorageManager(), 60 * 1000);
+
+      LinkedListIterator<PagedReference> iterator = cursor.iterator();
+
+      for (int i = 0; i < 100; i++)
+      {
+         PagedReference msg = iterator.next();
+         assertEquals(i, msg.getMessage().getIntProperty("key").intValue());
+         if (i < 10 || i > 20)
+         {
+            cursor.ackTx(tx, msg);
+         }
+      }
+
+      tx.commit();
+
+      server.stop();
+
+      OperationContextImpl.clearContext();
+
+      server.start();
+
+      cursor = this.server.getPagingManager().getPageStore(ADDRESS).getCursorProvier().getSubscription(queue.getID());
+
+      tx = new TransactionImpl(server.getStorageManager(), 60 * 1000);
+      iterator = cursor.iterator();
+
+      for (int i = 10; i <= 20; i++)
+      {
+         PagedReference msg = iterator.next();
+         assertEquals(i, msg.getMessage().getIntProperty("key").intValue());
+         cursor.ackTx(tx, msg);
+      }
+
+      for (int i = 100; i < NUM_MESSAGES; i++)
+      {
+         PagedReference msg = iterator.next();
+         assertEquals(i, msg.getMessage().getIntProperty("key").intValue());
+         cursor.ackTx(tx, msg);
+      }
+
+      tx.commit();
+
+      server.stop();
+      createServer();
+      waitCleanup();
+      assertEquals(1, lookupPageStore(ADDRESS).getNumberOfPages());
+
+   }
+
+   public void testConsumeLivePage() throws Exception
+   {
+      PagingStoreImpl pageStore = lookupPageStore(ADDRESS);
+
+      pageStore.startPaging();
+
+      final int NUM_MESSAGES = 100;
+
+      final int messageSize = 1024 * 1024;
+
+      PageCursorProvider cursorProvider = this.server.getPagingManager().getPageStore(ADDRESS).getCursorProvier();
+      System.out.println("cursorProvider = " + cursorProvider);
+
+      PageSubscription cursor = this.server.getPagingManager()
+                                           .getPageStore(ADDRESS)
+                                           .getCursorProvier()
+                                           .getSubscription(queue.getID());
+
+      System.out.println("Cursor: " + cursor);
+      
+      RoutingContextImpl ctx = generateCTX();
+
+      LinkedListIterator<PagedReference> iterator = cursor.iterator();
+
+      for (int i = 0; i < NUM_MESSAGES; i++)
+      {
+         // if (i % 100 == 0)
+         System.out.println("read/written " + i);
+
+         HornetQBuffer buffer = RandomUtil.randomBuffer(messageSize, i + 1l);
+
+         ServerMessage msg = new ServerMessageImpl(i, buffer.writerIndex());
+         msg.putIntProperty("key", i);
+
+         msg.getBodyBuffer().writeBytes(buffer, 0, buffer.writerIndex());
+
+         Assert.assertTrue(pageStore.page(msg, ctx, ctx.getContextListing(ADDRESS)));
+
+         PagedReference readMessage = iterator.next();
+
+         assertNotNull(readMessage);
+
+         assertEquals(i, readMessage.getMessage().getIntProperty("key").intValue());
+
+         assertNull(iterator.next());
+      }
+
+      server.stop();
+
+      OperationContextImpl.clearContext();
+
+      createServer();
+
+      pageStore = lookupPageStore(ADDRESS);
+
+      cursor = this.server.getPagingManager().getPageStore(ADDRESS).getCursorProvier().getSubscription(queue.getID());
+      iterator = cursor.iterator();
+
+      for (int i = 0; i < NUM_MESSAGES * 2; i++)
+      {
+         if (i % 100 == 0)
+            System.out.println("Paged " + i);
+
+         if (i >= NUM_MESSAGES)
+         {
+
+            HornetQBuffer buffer = RandomUtil.randomBuffer(messageSize, i + 1l);
+
+            ServerMessage msg = new ServerMessageImpl(i, buffer.writerIndex());
+            msg.putIntProperty("key", i);
+
+            msg.getBodyBuffer().writeBytes(buffer, 0, buffer.writerIndex());
+
+            Assert.assertTrue(pageStore.page(msg, ctx, ctx.getContextListing(ADDRESS)));
+         }
+
+         PagedReference readMessage = iterator.next();
+
+         assertNotNull(readMessage);
+
+         assertEquals(i, readMessage.getMessage().getIntProperty("key").intValue());
+      }
+
+      server.stop();
+
+      OperationContextImpl.clearContext();
+
+      createServer();
+
+      pageStore = lookupPageStore(ADDRESS);
+
+      cursor = this.server.getPagingManager().getPageStore(ADDRESS).getCursorProvier().getSubscription(queue.getID());
+      iterator = cursor.iterator();
+
+      for (int i = 0; i < NUM_MESSAGES * 3; i++)
+      {
+         if (i % 100 == 0)
+            System.out.println("Paged " + i);
+
+         if (i >= NUM_MESSAGES * 2 - 1)
+         {
+
+            HornetQBuffer buffer = RandomUtil.randomBuffer(messageSize, i + 1l);
+
+            ServerMessage msg = new ServerMessageImpl(i, buffer.writerIndex());
+            msg.putIntProperty("key", i + 1);
+
+            msg.getBodyBuffer().writeBytes(buffer, 0, buffer.writerIndex());
+
+            Assert.assertTrue(pageStore.page(msg, ctx, ctx.getContextListing(ADDRESS)));
+         }
+
+         PagedReference readMessage = iterator.next();
+
+         assertNotNull(readMessage);
+
+         cursor.ack(readMessage);
+
+         assertEquals(i, readMessage.getMessage().getIntProperty("key").intValue());
+      }
+
+      PagedReference readMessage = iterator.next();
+
+      assertEquals(NUM_MESSAGES * 3, readMessage.getMessage().getIntProperty("key").intValue());
+
+      cursor.ack(readMessage);
+
+      server.getStorageManager().waitOnOperations();
+
+      pageStore.flushExecutors();
+
+      assertFalse(pageStore.isPaging());
+
+      server.stop();
+      createServer();
+
+      assertFalse(pageStore.isPaging());
+
+      waitCleanup();
+
+      assertFalse(lookupPageStore(ADDRESS).isPaging());
+
+   }
+   
+
+   /**
+    * Consumes paged messages through the cursor iterator while a producer thread is
+    * still paging them.
+    * <p>
+    * The producer pages NUM_TX batches of MSGS_TX messages. Every even-numbered batch is
+    * paged inside a {@link TransactionImpl} that is committed once the batch is written;
+    * the remaining batches use no transaction. The test thread polls the iterator
+    * (retrying up to 5 times with a one second pause whenever it returns null) until
+    * TOTAL_MSG references were acknowledged, and fails as soon as the producer has
+    * recorded an exception.
+    *
+    * @throws Exception on any failure of the paging store, the cursor or the producer thread join
+    */
+   public void testConsumeLivePageMultiThread() throws Exception
+   {
+      final PagingStoreImpl pageStore = lookupPageStore(ADDRESS);
+
+      pageStore.startPaging();
+
+      final int NUM_TX = 100;
+
+      final int MSGS_TX = 100;
+
+      final int TOTAL_MSG = NUM_TX * MSGS_TX;
+
+      final int messageSize = 1024;
+
+      PageCursorProvider cursorProvider = this.server.getPagingManager().getPageStore(ADDRESS).getCursorProvier();
+      System.out.println("cursorProvider = " + cursorProvider);
+
+      PageSubscription cursor = this.server.getPagingManager()
+                                           .getPageStore(ADDRESS)
+                                           .getCursorProvier()
+                                           .getSubscription(queue.getID());
+
+      System.out.println("Cursor: " + cursor);
+
+      final StorageManager storage = this.server.getStorageManager();
+
+      // Incremented by the producer thread for every Throwable it catches
+      final AtomicInteger exceptions = new AtomicInteger(0);
+
+      Thread t1 = new Thread()
+      {
+         public void run()
+         {
+            try
+            {
+               // Running value of the "key" property across all batches
+               int count = 0;
+
+               for (int txCount = 0; txCount < NUM_TX; txCount++)
+               {
+
+                  Transaction tx = null;
+
+                  // Only every other batch is transactional
+                  if (txCount % 2 == 0)
+                  {
+                     tx = new TransactionImpl(storage);
+                  }
+
+                  RoutingContext ctx = generateCTX(tx);
+
+                  for (int i = 0 ; i < MSGS_TX; i++)
+                  {
+                     //System.out.println("Sending " + count);
+                     HornetQBuffer buffer = RandomUtil.randomBuffer(messageSize, count);
+
+                     ServerMessage msg = new ServerMessageImpl(i, buffer.writerIndex());
+                     msg.putIntProperty("key", count++);
+
+                     msg.getBodyBuffer().writeBytes(buffer, 0, buffer.writerIndex());
+
+                     Assert.assertTrue(pageStore.page(msg, ctx, ctx.getContextListing(ADDRESS)));
+                  }
+
+                  if (tx != null)
+                  {
+                     tx.commit();
+                  }
+
+               }
+            }
+            catch (Throwable e)
+            {
+               e.printStackTrace();
+               exceptions.incrementAndGet();
+            }
+         }
+      };
+
+      t1.start();
+
+      try
+      {
+         LinkedListIterator<PagedReference> iterator = cursor.iterator();
+
+         for (int i = 0 ; i < TOTAL_MSG; i++ )
+         {
+            assertEquals(0, exceptions.get());
+            PagedReference ref = null;
+            // The producer may not have paged (or committed) the next message yet: retry a few times
+            for (int repeat = 0 ; repeat < 5; repeat++)
+            {
+               ref = iterator.next();
+               if (ref == null)
+               {
+                  Thread.sleep(1000);
+               }
+               else
+               {
+                  break;
+               }
+            }
+            assertNotNull(ref);
+
+            ref.acknowledge();
+
+            System.out.println("Consuming " + ref.getMessage().getIntProperty("key"));
+            // NOTE(review): the strict ordering check below was already disabled in the original;
+            // presumably transactional and non-transactional batches may interleave - confirm.
+            //assertEquals(i, ref.getMessage().getIntProperty("key").intValue());
+         }
+      }
+      finally
+      {
+         // Never leave the producer running past the test: it would race with the final
+         // check below and with the server being stopped in tearDown()
+         t1.join();
+      }
+
+      assertEquals(0, exceptions.get());
+   }
+   
+   private RoutingContextImpl generateCTX()
+   {
+      return generateCTX(null);
+   }
+   
+   private RoutingContextImpl generateCTX(Transaction tx)
+   {
+      RoutingContextImpl ctx = new RoutingContextImpl(tx);
+      ctx.addQueue(ADDRESS, queue);
+      
+      for (Queue q : this.queueList)
+      {
+         ctx.addQueue(ADDRESS, q);
+      }
+      
+      return ctx;
+   }
+
+   /**
+    * Waits up to 10 seconds (polling every 100 ms) for the page store of {@code ADDRESS}
+    * to report exactly one page, then asserts that no more than two pages are left.
+    *
+    * @throws Exception if the page store lookup fails
+    * @throws InterruptedException if the thread is interrupted while sleeping between polls
+    */
+   private void waitCleanup() throws Exception, InterruptedException
+   {
+      // The cleanup is done asynchronously, so we need to wait some time
+      long timeout = System.currentTimeMillis() + 10000;
+
+      while (System.currentTimeMillis() < timeout && lookupPageStore(ADDRESS).getNumberOfPages() != 1)
+      {
+         Thread.sleep(100);
+      }
+
+      // Read the count once so the failure message reports the very value that was checked
+      int numberOfPages = lookupPageStore(ADDRESS).getNumberOfPages();
+
+      assertTrue("expected at most 2 pages after cleanup but found " + numberOfPages, numberOfPages <= 2);
+   }
+
+   /**
+    * Checks which paged messages the cursor iterator returns around transaction completion.
+    * <p>
+    * Three batches of NUM_MESSAGES messages are paged through {@code pgMessages} under three
+    * separate transactions (keys 0-99, 100-199 and 200-299), none of them completed at that
+    * point, followed by one batch paged without a transaction (keys 300-399). The test
+    * expects the iterator to return only keys 300-399 first, and keys 200-299 after
+    * {@code txCommit} is committed and {@code txRollback} is rolled back. {@code txForgotten}
+    * is never committed nor rolled back. The server is then restarted and the page count is
+    * expected to reach 1 within 10 seconds.
+    *
+    * @throws Exception on any paging, storage or server failure
+    */
+   public void testPrepareScenarios() throws Exception
+   {
+      PagingStoreImpl pageStore = lookupPageStore(ADDRESS);
+
+      pageStore.startPaging();
+
+      final int NUM_MESSAGES = 100;
+
+      final int messageSize = 100 * 1024;
+
+      PageCursorProvider cursorProvider = this.server.getPagingManager().getPageStore(ADDRESS).getCursorProvier();
+      System.out.println("cursorProvider = " + cursorProvider);
+
+      PageSubscription cursor = this.server.getPagingManager()
+                                           .getPageStore(ADDRESS)
+                                           .getCursorProvier()
+                                           .getSubscription(queue.getID());
+      LinkedListIterator<PagedReference> iterator = cursor.iterator();
+
+      System.out.println("Cursor: " + cursor);
+
+      StorageManager storage = this.server.getStorageManager();
+
+      // One transaction id per scenario
+      long pgtxRollback = storage.generateUniqueID();
+      long pgtxForgotten = storage.generateUniqueID();
+      long pgtxCommit = storage.generateUniqueID();
+
+      // Each batch is followed by forceAnotherPage()
+      // (presumably so that every batch lands on its own page(s) - confirm)
+      Transaction txRollback = pgMessages(storage, pageStore, pgtxRollback, 0, NUM_MESSAGES, messageSize);
+      pageStore.forceAnotherPage();
+      Transaction txForgotten = pgMessages(storage, pageStore, pgtxForgotten, 100, NUM_MESSAGES, messageSize);
+      pageStore.forceAnotherPage();
+      Transaction txCommit = pgMessages(storage, pageStore, pgtxCommit, 200, NUM_MESSAGES, messageSize);
+      pageStore.forceAnotherPage();
+
+      // Non-transactional batch, keys 300-399
+      addMessages(300, NUM_MESSAGES, messageSize);
+
+      System.out.println("Number of pages - " + pageStore.getNumberOfPages());
+
+      // First consume what's already there without any tx as nothing was committed
+      for (int i = 300; i < 400; i++)
+      {
+         PagedReference pos = iterator.next();
+         assertNotNull("Null at position " + i, pos);
+         assertEquals(i, pos.getMessage().getIntProperty("key").intValue());
+         cursor.ack(pos);
+      }
+
+      // Nothing else may be visible while the three transactions are still pending
+      assertNull(iterator.next());
+
+      cursor.printDebug();
+
+      txCommit.commit();
+
+      txRollback.rollback();
+
+      storage.waitOnOperations();
+
+      // Second:after pgtxCommit was done
+      for (int i = 200; i < 300; i++)
+      {
+         PagedReference pos = iterator.next();
+         assertNotNull(pos);
+         assertEquals(i, pos.getMessage().getIntProperty("key").intValue());
+         cursor.ack(pos);
+      }
+
+      // The rolled back and the forgotten batches must not show up
+      assertNull(iterator.next());
+
+      server.getStorageManager().waitOnOperations();
+
+      server.stop();
+      createServer();
+
+      // Same polling as waitCleanup(), but with a strict expectation of exactly one page
+      long timeout = System.currentTimeMillis() + 10000;
+
+      while (System.currentTimeMillis() < timeout && lookupPageStore(ADDRESS).getNumberOfPages() != 1)
+      {
+         Thread.sleep(500);
+      }
+      assertEquals(1, lookupPageStore(ADDRESS).getNumberOfPages());
+
+   }
+
+
+   /**
+    * Checks that messages paged under a transaction become visible to an already open
+    * iterator once that transaction is committed later on.
+    * <p>
+    * Keys 0-99 are paged through {@code pgMessages} under {@code txLazy} (not yet committed),
+    * keys 100-199 are paged without a transaction. The test expects the iterator to return
+    * keys 100-199 first and keys 0-99 only after {@code txLazy.commit()}. It then waits for
+    * the cleanup, restarts the server and expects exactly one page to remain.
+    *
+    * @throws Exception on any paging, storage or server failure
+    */
+   public void testLazyCommit() throws Exception
+   {
+      PagingStoreImpl pageStore = lookupPageStore(ADDRESS);
+
+      pageStore.startPaging();
+
+      final int NUM_MESSAGES = 100;
+
+      final int messageSize = 100 * 1024;
+
+      PageCursorProvider cursorProvider = this.server.getPagingManager().getPageStore(ADDRESS).getCursorProvier();
+      System.out.println("cursorProvider = " + cursorProvider);
+
+      PageSubscription cursor = this.server.getPagingManager()
+                                           .getPageStore(ADDRESS)
+                                           .getCursorProvier()
+                                           .getSubscription(queue.getID());
+      LinkedListIterator<PagedReference> iterator = cursor.iterator();
+
+      System.out.println("Cursor: " + cursor);
+
+      StorageManager storage = this.server.getStorageManager();
+
+      long pgtxLazy = storage.generateUniqueID();
+
+      // Keys 0-99, paged under a transaction that is committed only later
+      Transaction txLazy = pgMessages(storage, pageStore, pgtxLazy, 0, NUM_MESSAGES, messageSize);
+
+      // Keys 100-199, paged without a transaction
+      addMessages(100, NUM_MESSAGES, messageSize);
+
+      System.out.println("Number of pages - " + pageStore.getNumberOfPages());
+
+      // First consume what's already there without any tx as nothing was committed
+      for (int i = 100; i < 200; i++)
+      {
+         PagedReference pos = iterator.next();
+         assertNotNull("Null at position " + i, pos);
+         assertEquals(i, pos.getMessage().getIntProperty("key").intValue());
+         cursor.ack(pos);
+      }
+
+      assertNull(iterator.next());
+
+      txLazy.commit();
+
+      storage.waitOnOperations();
+
+      // After the late commit the same iterator is expected to hand out keys 0-99
+      for (int i = 0; i < 100; i++)
+      {
+         PagedReference pos = iterator.next();
+         assertNotNull("Null at position " + i, pos);
+         assertEquals(i, pos.getMessage().getIntProperty("key").intValue());
+         cursor.ack(pos);
+      }
+
+      assertNull(iterator.next());
+
+      waitCleanup();
+
+      server.stop();
+      createServer();
+      waitCleanup();
+      assertEquals(1, lookupPageStore(ADDRESS).getNumberOfPages());
+
+   }
+
+   /**
+    * Exercises two subscriptions created with ids 11 and 12 (null filter, last argument
+    * false) next to the main queue subscription, which is closed after the messages are paged.
+    * <p>
+    * The first subscription consumes and acks all NUM_MESSAGES messages. The second one is
+    * bookmarked, reads the first 10 messages without acking them and is then closed. After
+    * a server restart the test expects exactly one page to remain.
+    *
+    * @throws Exception on any paging, cursor or server failure
+    */
+   public void testCloseNonPersistentConsumer() throws Exception
+   {
+
+      final int NUM_MESSAGES = 100;
+
+      PageCursorProvider cursorProvider = lookupCursorProvider();
+
+      PageSubscription cursor = cursorProvider.createSubscription(11, null, false);
+      PageSubscriptionImpl cursor2 = (PageSubscriptionImpl)cursorProvider.createSubscription(12, null, false);
+
+      // Register fake queues with the same ids so generateCTX() routes the messages to them too
+      this.queueList.add(new FakeQueue(new SimpleString("a"), 11));
+
+      this.queueList.add(new FakeQueue(new SimpleString("b"), 12));
+
+      int numberOfPages = addMessages(NUM_MESSAGES, 1024 * 1024);
+
+      System.out.println("NumberOfPages = " + numberOfPages);
+
+      queue.getPageSubscription().close();
+
+      PagedReference msg;
+      LinkedListIterator<PagedReference> iterator = cursor.iterator();
+      LinkedListIterator<PagedReference> iterator2 = cursor2.iterator();
+
+      // NOTE(review): position (page 1, message -1) presumably means "before the first
+      // message of page 1" - confirm against PageSubscription.bookmark
+      cursor2.bookmark(new PagePositionImpl(1, -1));
+
+      // Drain the first subscription completely, acking every message
+      int key = 0;
+      while ((msg = iterator.next()) != null)
+      {
+         System.out.println("key = " + key);
+         assertEquals(key++, msg.getMessage().getIntProperty("key").intValue());
+         cursor.ack(msg);
+      }
+      assertEquals(NUM_MESSAGES, key);
+
+      forceGC();
+
+      // The second subscription must still see the messages from key 0 on; they are not acked
+      for (int i = 0; i < 10; i++)
+      {
+         assertTrue(iterator2.hasNext());
+         msg = iterator2.next();
+         assertEquals(i, msg.getMessage().getIntProperty("key").intValue());
+      }
+
+      assertSame(cursor2.getProvider(), cursorProvider);
+
+      cursor2.close();
+
+      lookupPageStore(ADDRESS).flushExecutors();
+
+      server.stop();
+      createServer();
+      waitCleanup();
+      assertEquals(1, lookupPageStore(ADDRESS).getNumberOfPages());
+
+   }
+
+   /**
+    * Pages NUM_MESSAGES messages, deletes the only queue of {@code ADDRESS} through a
+    * client session and expects no page to be left after a server restart and cleanup.
+    *
+    * @throws Exception on any paging, client or server failure
+    */
+   public void testNoCursors() throws Exception
+   {
+
+      final int NUM_MESSAGES = 100;
+
+      int numberOfPages = addMessages(NUM_MESSAGES, 1024 * 1024);
+
+      ClientSessionFactory sf = createInVMFactory();
+      try
+      {
+         ClientSession session = sf.createSession();
+         try
+         {
+            session.deleteQueue(ADDRESS);
+         }
+         finally
+         {
+            // Release the session even when the delete fails
+            session.close();
+         }
+      }
+      finally
+      {
+         // The factory was created by this test, so it is closed here as well
+         sf.close();
+      }
+
+      System.out.println("NumberOfPages = " + numberOfPages);
+
+      server.stop();
+      createServer();
+      waitCleanup();
+      assertEquals(0, lookupPageStore(ADDRESS).getNumberOfPages());
+
+   }
+
+   /**
+    * Bookmarks a subscription (id 2, null filter, last argument false) in the middle of
+    * page 5 and checks that iteration starts at the message following the bookmark.
+    * <p>
+    * The expected first key is read from the page cache at the position right after the
+    * bookmark. The iterator must then deliver consecutive keys up to NUM_MESSAGES. After a
+    * server restart the test expects exactly one page to remain.
+    *
+    * @throws Exception on any paging, cursor or server failure
+    */
+   public void testFirstMessageInTheMiddle() throws Exception
+   {
+
+      final int NUM_MESSAGES = 100;
+
+      PageCursorProvider cursorProvider = lookupCursorProvider();
+
+      PageSubscription cursor = cursorProvider.createSubscription(2, null, false);
+
+      // Fake queue with the subscription id so generateCTX() routes messages to it
+      queueList.add(new FakeQueue(new SimpleString("tmp"), 2));
+
+      int numberOfPages = addMessages(NUM_MESSAGES, 1024 * 1024);
+
+      System.out.println("NumberOfPages = " + numberOfPages);
+
+      PageCache cache = cursorProvider.getPageCache(new PagePositionImpl(5, 0));
+
+      queue.getPageSubscription().close();
+
+      // Bookmark the middle of page 5; the key expected first is the one right after it
+      PagePosition startingPos = new PagePositionImpl(5, cache.getNumberOfMessages() / 2);
+      cursor.bookmark(startingPos);
+      PagedMessage msg = cache.getMessage(startingPos.getMessageNr() + 1);
+      msg.initMessage(server.getStorageManager());
+      int key = msg.getMessage().getIntProperty("key").intValue();
+
+      // Drop the references to the message and the cache
+      // (presumably so that forceGC() below can release the page cache - confirm)
+      msg = null;
+
+      cache = null;
+      LinkedListIterator<PagedReference> iterator = cursor.iterator();
+
+      PagedReference msgCursor = null;
+      while ((msgCursor = iterator.next()) != null)
+      {
+         assertEquals(key++, msgCursor.getMessage().getIntProperty("key").intValue());
+         cursor.ack(msgCursor);
+      }
+      assertEquals(NUM_MESSAGES, key);
+
+      forceGC();
+
+      // assertTrue(cursorProvider.getCacheSize() < numberOfPages);
+
+      server.stop();
+      createServer();
+      waitCleanup();
+      assertEquals(1, lookupPageStore(ADDRESS).getNumberOfPages());
+   }
+
+   /**
+    * Same scenario as a mid-page bookmark, but on the subscription of the main queue and
+    * across a server restart.
+    * <p>
+    * The subscription is bookmarked in the middle of page 5 and iterated once without acking.
+    * After the restart the test iterates again starting from the same expected key, acking
+    * every message this time, then checks that the cursor provider's cache size is below the
+    * number of pages and that one page remains after a further restart.
+    *
+    * @throws Exception on any paging, cursor or server failure
+    */
+   public void testFirstMessageInTheMiddlePersistent() throws Exception
+   {
+
+      final int NUM_MESSAGES = 100;
+
+      int numberOfPages = addMessages(NUM_MESSAGES, 1024 * 1024);
+
+      System.out.println("NumberOfPages = " + numberOfPages);
+
+      PageCursorProvider cursorProvider = lookupCursorProvider();
+
+      PageCache cache = cursorProvider.getPageCache(new PagePositionImpl(5, 0));
+
+      PageSubscription cursor = cursorProvider.getSubscription(queue.getID());
+      PagePosition startingPos = new PagePositionImpl(5, cache.getNumberOfMessages() / 2);
+      cursor.bookmark(startingPos);
+
+      // We can't proceed until the operation has finished
+      server.getStorageManager().waitOnOperations();
+
+      // The key expected first is the one of the message right after the bookmark
+      PagedMessage msg = cache.getMessage(startingPos.getMessageNr() + 1);
+      msg.initMessage(server.getStorageManager());
+      int initialKey = msg.getMessage().getIntProperty("key").intValue();
+      int key = initialKey;
+
+      msg = null;
+
+      cache = null;
+
+      LinkedListIterator<PagedReference> iterator = cursor.iterator();
+
+      // First pass: read everything after the bookmark without acking
+      PagedReference msgCursor = null;
+      while ((msgCursor = iterator.next()) != null)
+      {
+         assertEquals(key++, msgCursor.getMessage().getIntProperty("key").intValue());
+      }
+      assertEquals(NUM_MESSAGES, key);
+
+      server.stop();
+
+      OperationContextImpl.clearContext();
+
+      createServer();
+
+      // Second pass after the restart: same expected keys, acked this time
+      cursorProvider = lookupCursorProvider();
+      cursor = cursorProvider.getSubscription(queue.getID());
+      key = initialKey;
+      iterator = cursor.iterator();
+      while ((msgCursor = iterator.next()) != null)
+      {
+         assertEquals(key++, msgCursor.getMessage().getIntProperty("key").intValue());
+         cursor.ack(msgCursor);
+      }
+      // NOTE(review): unlike the first pass, the final value of key is not asserted here,
+      // so an iterator returning nothing after the restart would go unnoticed - confirm intent
+
+      forceGC();
+
+      assertTrue(cursorProvider.getCacheSize() < numberOfPages);
+
+      server.stop();
+      createServer();
+      waitCleanup();
+      assertEquals(1, lookupPageStore(ADDRESS).getNumberOfPages());
+
+   }
+   
+   private int tstProperty(ServerMessage msg)
+   {
+      return msg.getIntProperty("key").intValue();
+   }
+
+   /**
+    * Drives two iterators created from the same subscription and checks the keys each one
+    * returns as {@code next()}, {@code remove()} and {@code repeat()} are interleaved.
+    * <p>
+    * The expected keys are spelled out by the assertions below; the exact call sequence is
+    * the scenario under test.
+    *
+    * @throws Exception on any paging or cursor failure
+    */
+   public void testMultipleIterators() throws Exception
+   {
+
+      final int NUM_MESSAGES = 10;
+
+      int numberOfPages = addMessages(NUM_MESSAGES, 1024 * 1024);
+
+      System.out.println("NumberOfPages = " + numberOfPages);
+
+      PageCursorProvider cursorProvider = lookupCursorProvider();
+
+      PageSubscription cursor = cursorProvider.getSubscription(queue.getID());
+
+      LinkedListIterator<PagedReference> iter = cursor.iterator();
+
+      LinkedListIterator<PagedReference> iter2 = cursor.iterator();
+
+      assertTrue(iter.hasNext());
+
+      // Both iterators are expected to return the same first message
+      PagedReference msg1 = iter.next();
+
+      PagedReference msg2 = iter2.next();
+
+      assertEquals(tstProperty(msg1.getMessage()), tstProperty(msg2.getMessage()));
+
+      System.out.println("property = " + tstProperty(msg1.getMessage()));
+
+      msg1 = iter.next();
+
+      assertEquals(1, tstProperty(msg1.getMessage()));
+
+      // Key 1 is removed through the first iterator; the second one is expected to skip it
+      iter.remove();
+
+      msg2 = iter2.next();
+
+      assertEquals(2, tstProperty(msg2.getMessage()));
+
+      // After repeat() the next call to next() is expected to return the same message again
+      iter2.repeat();
+
+      msg2 = iter2.next();
+
+      assertEquals(2, tstProperty(msg2.getMessage()));
+
+      iter2.repeat();
+
+      // No next() in between: this re-checks the reference already held in msg2
+      assertEquals(2, tstProperty(msg2.getMessage()));
+
+      msg1 = iter.next();
+
+      assertEquals(2, tstProperty(msg1.getMessage()));
+
+      iter.repeat();
+
+      msg1 = iter.next();
+
+      assertEquals(2, tstProperty(msg1.getMessage()));
+
+      assertTrue(iter2.hasNext());
+
+
+   }
+
+   private int addMessages(final int numMessages, final int messageSize) throws Exception
+   {
+      return addMessages(0, numMessages, messageSize);
+   }
+
+   /**
+    * @param numMessages
+    * @param pageStore
+    * @throws Exception
+    */
+   private int addMessages(final int start, final int numMessages, final int messageSize) throws Exception
+   {
+      PagingStoreImpl pageStore = lookupPageStore(ADDRESS);
+
+      pageStore.startPaging();
+      
+      RoutingContext ctx = generateCTX();
+
+      for (int i = start; i < start + numMessages; i++)
+      {
+         if (i % 100 == 0)
+            System.out.println("Paged " + i);
+         HornetQBuffer buffer = RandomUtil.randomBuffer(messageSize, i + 1l);
+
+         ServerMessage msg = new ServerMessageImpl(i, buffer.writerIndex());
+         msg.putIntProperty("key", i);
+         // to be used on tests that are validating filters
+         msg.putBooleanProperty("even", i % 2 == 0);
+
+         msg.getBodyBuffer().writeBytes(buffer, 0, buffer.writerIndex());
+
+         Assert.assertTrue(pageStore.page(msg, ctx, ctx.getContextListing(ADDRESS)));
+      }
+
+      return pageStore.getNumberOfPages();
+   }
+
+   /**
+    * Looks up the page store of an address on the current server instance.
+    *
+    * @param address the address whose page store is wanted
+    * @return the page store returned by the server's paging manager, cast to {@link PagingStoreImpl}
+    * @throws Exception if the paging manager fails to provide the store
+    */
+   private PagingStoreImpl lookupPageStore(SimpleString address) throws Exception
+   {
+      return (PagingStoreImpl)server.getPagingManager().getPageStore(address);
+   }
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   /**
+    * Stops the server (when one was created) and clears the fixture fields.
+    * The fields are reset and {@code super.tearDown()} is called even when stopping the
+    * server fails, so a broken test does not leak state into the next one.
+    *
+    * @throws Exception if stopping the server or the superclass tear-down fails
+    */
+   protected void tearDown() throws Exception
+   {
+      try
+      {
+         // server is null when setUp() failed before createServer() assigned it
+         if (server != null)
+         {
+            server.stop();
+         }
+      }
+      finally
+      {
+         server = null;
+         queue = null;
+         queueList = null;
+         super.tearDown();
+      }
+   }
+
+   protected void setUp() throws Exception
+   {
+      super.setUp();
+      OperationContextImpl.clearContext();
+      System.out.println("Tmp:" + getTemporaryDir());
+
+      queueList = new ArrayList<Queue>();
+      
+      createServer();
+   }
+
+   /**
+    * Creates and starts a server with {@code PAGE_SIZE} / {@code PAGE_MAX} paging settings
+    * and synchronous non-transactional journal writes, then tries to create and pause the
+    * test queue on {@code ADDRESS}.
+    * <p>
+    * Queue creation is best effort: the tests call this method again after
+    * {@code server.stop()}, and a failure there is reported but does not abort the test.
+    * NOTE(review): in that case {@code queue} keeps referring to the object obtained from
+    * the previous server instance - confirm this is intended.
+    *
+    * @throws Exception if the server cannot be created or started
+    */
+   private void createServer() throws Exception
+   {
+      OperationContextImpl.clearContext();
+
+      Configuration config = createDefaultConfig();
+
+      config.setJournalSyncNonTransactional(true);
+
+      server = createServer(true, config, PAGE_SIZE, PAGE_MAX, new HashMap<String, AddressSettings>());
+
+      server.start();
+
+      try
+      {
+         queue = server.createQueue(ADDRESS, ADDRESS, null, true, false);
+         queue.pause();
+      }
+      catch (Exception e)
+      {
+         // Deliberately not rethrown (presumably the queue already exists after a restart),
+         // but no longer swallowed silently so unexpected failures stay visible in the output
+         System.out.println("Queue " + ADDRESS + " was not created on server start: " + e);
+      }
+   }
+
+   /**
+    * Creates a subscription under a freshly generated id and registers a {@link FakeQueue}
+    * with the same id in {@code queueList}, so that messages paged afterwards are routed
+    * to it as well.
+    *
+    * @param filter the filter handed to {@code createSubscription}; may be null, in which
+    *               case the fake queue is simply named "null"
+    * @return the subscription created with the generated id, the filter and {@code false}
+    *         as last argument
+    * @throws Exception if the id generation or the cursor provider lookup fails
+    */
+   private PageSubscription createNonPersistentCursor(Filter filter) throws Exception
+   {
+      long id = server.getStorageManager().generateUniqueID();
+      // String.valueOf tolerates a null filter, where filter.toString() would throw
+      queueList.add(new FakeQueue(new SimpleString(String.valueOf(filter)), id));
+      return lookupCursorProvider().createSubscription(id, filter, false);
+   }
+
+   /**
+    * @return
+    * @throws Exception
+    */
+   private PageCursorProvider lookupCursorProvider() throws Exception
+   {
+      return lookupPageStore(ADDRESS).getCursorProvier();
+   }
+
+   /**
+    * Pages {@code NUM_MESSAGES} messages under a new transaction and returns that
+    * transaction without committing or rolling it back; the caller decides its outcome.
+    * Each message gets a generated unique id and the int property "key", counting up
+    * from {@code start}.
+    *
+    * @param storage the storage manager used for the transaction and for message ids
+    * @param pageStore the page store the messages are paged into; expected to be paging already
+    * @param pgParameter the id given to the new transaction
+    * @param start key of the first message
+    * @param NUM_MESSAGES number of messages to page
+    * @param messageSize size argument passed to {@code RandomUtil.randomBuffer} for each body
+    * @return the still open transaction the messages were paged under
+    * @throws Exception if paging fails
+    */
+   private Transaction pgMessages(StorageManager storage,
+                           PagingStoreImpl pageStore,
+                           long pgParameter,
+                           int start,
+                           final int NUM_MESSAGES,
+                           final int messageSize) throws Exception
+   {
+
+      TransactionImpl txImpl = new TransactionImpl(pgParameter, null, storage);
+
+      RoutingContext ctx = generateCTX(txImpl);
+
+      for (int i = start; i < start + NUM_MESSAGES; i++)
+      {
+         HornetQBuffer buffer = RandomUtil.randomBuffer(messageSize, i + 1L);
+         ServerMessage msg = new ServerMessageImpl(storage.generateUniqueID(), buffer.writerIndex());
+         msg.getBodyBuffer().writeBytes(buffer, 0, buffer.writerIndex());
+         msg.putIntProperty("key", i);
+         // Fail fast when the store did not page the message, as the other paging helpers do
+         Assert.assertTrue(pageStore.page(msg, ctx, ctx.getContextListing(ADDRESS)));
+      }
+
+      return txImpl;
+
+   }
+
+   // Private -------------------------------------------------------
+
+   // Inner classes -------------------------------------------------
+
+}



More information about the hornetq-commits mailing list