[hornetq-commits] JBoss hornetq SVN: r9794 - in branches/Branch_New_Paging: src/main/org/hornetq/core/paging/cursor and 4 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Sat Oct 16 00:00:17 EDT 2010


Author: clebert.suconic at jboss.com
Date: 2010-10-16 00:00:15 -0400 (Sat, 16 Oct 2010)
New Revision: 9794

Modified:
   branches/Branch_New_Paging/src/main/org/hornetq/core/paging/PagingStore.java
   branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCache.java
   branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursor.java
   branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursorProvider.java
   branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/LivePageCacheImpl.java
   branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCacheImpl.java
   branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorImpl.java
   branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java
   branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/TestSupportPageStore.java
   branches/Branch_New_Paging/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
   branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java
Log:
Cleanup on paging

Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/PagingStore.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/PagingStore.java	2010-10-15 20:18:59 UTC (rev 9793)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/PagingStore.java	2010-10-16 04:00:15 UTC (rev 9794)
@@ -70,7 +70,20 @@
    PageCursorProvider getCursorProvier();
    
    void processReload() throws Exception;
+   
+   /** 
+    * Removes the first page from the writing queue.
+    * The file will still exist until Page.delete is called,
+    * so if the system is reloaded before delete is called, the same Page will be loaded back.
+    *
+    * @throws Exception
+    * 
+    * Note: This should still be part of the interface, even though HornetQ only uses through the 
+    */
+   Page depage() throws Exception;
 
+
+
    /**
     * @return false if a thread was already started, or if not in page mode
     * @throws Exception 

Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCache.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCache.java	2010-10-15 20:18:59 UTC (rev 9793)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCache.java	2010-10-16 04:00:15 UTC (rev 9794)
@@ -26,6 +26,8 @@
 public interface PageCache
 {
    Page getPage();
+   
+   long getPageId();
 
    int getNumberOfMessages();
    

Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursor.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursor.java	2010-10-15 20:18:59 UTC (rev 9793)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursor.java	2010-10-16 04:00:15 UTC (rev 9794)
@@ -15,7 +15,6 @@
 
 import org.hornetq.api.core.Pair;
 import org.hornetq.core.paging.PagedMessage;
-import org.hornetq.core.server.ServerMessage;
 import org.hornetq.core.transaction.Transaction;
 
 /**
@@ -37,6 +36,11 @@
    void ack(PagePosition position) throws Exception;
 
    void ackTx(Transaction tx, PagePosition position) throws Exception;
+   /**
+    * 
+    * @return the first page in use, or Long.MAX_VALUE if none is in use
+    */
+   long getFirstPage();
    
    // Reload operations
    

Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursorProvider.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursorProvider.java	2010-10-15 20:18:59 UTC (rev 9793)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursorProvider.java	2010-10-16 04:00:15 UTC (rev 9794)
@@ -16,7 +16,6 @@
 import org.hornetq.api.core.Pair;
 import org.hornetq.core.paging.PagedMessage;
 import org.hornetq.core.paging.PagingStore;
-import org.hornetq.core.server.ServerMessage;
 
 /**
  * The provider of Cursor for a given Address
@@ -49,13 +48,13 @@
     * @param queueId The cursorID should be the same as the queueId associated for persistance
     * @return
     */
-   PageCursor getCursor(long queueId);
+   PageCursor getPersistentCursor(long queueId);
    
    /**
     * Create a non persistent cursor, usually associated with browsing
     * @return
     */
-   PageCursor createCursor();
+   PageCursor createNonPersistentCursor();
 
    Pair<PagePosition, PagedMessage> getNext(PageCursor cursor, PagePosition pos) throws Exception;
    
@@ -65,6 +64,8 @@
 
    void stop();
 
+   void scheduleCleanup();
+
    // Package protected ---------------------------------------------
 
    // Protected -----------------------------------------------------

Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/LivePageCacheImpl.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/LivePageCacheImpl.java	2010-10-15 20:18:59 UTC (rev 9793)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/LivePageCacheImpl.java	2010-10-16 04:00:15 UTC (rev 9794)
@@ -63,6 +63,11 @@
    {
       return page;
    }
+   
+   public long getPageId()
+   {
+      return page.getPageId();
+   }
 
    /* (non-Javadoc)
     * @see org.hornetq.core.paging.cursor.PageCache#getNumberOfMessages()

Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCacheImpl.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCacheImpl.java	2010-10-15 20:18:59 UTC (rev 9793)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCacheImpl.java	2010-10-16 04:00:15 UTC (rev 9794)
@@ -81,6 +81,11 @@
          lock.readLock().unlock();
       }
    }
+   
+   public long getPageId()
+   {
+      return page.getPageId();
+   }
 
    public void lock()
    {

Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorImpl.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorImpl.java	2010-10-15 20:18:59 UTC (rev 9793)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorImpl.java	2010-10-16 04:00:15 UTC (rev 9794)
@@ -189,8 +189,26 @@
       installTXCallback(tx, position);
 
    }
+   
 
    /* (non-Javadoc)
+    * @see org.hornetq.core.paging.cursor.PageCursor#getFirstPage()
+    */
+   public long getFirstPage()
+   {
+      // firstKey() on a sorted map never returns null: on an empty map it throws
+      // NoSuchElementException, so the emptiness test has to be explicit.
+      // The cursor's monitor is taken so the test and the read see one state
+      // (the afterCommit removal from consumedPages holds the same monitor).
+      synchronized (this)
+      {
+         // Long.MAX_VALUE means "no page in use by this cursor".
+         return consumedPages.isEmpty() ? Long.MAX_VALUE : consumedPages.firstKey();
+      }
+   }
+
+
+   /* (non-Javadoc)
     * @see org.hornetq.core.paging.cursor.PageCursor#returnElement(org.hornetq.core.paging.cursor.PagePosition)
     */
    public synchronized void redeliver(final PagePosition position)
@@ -348,6 +366,11 @@
    {
       if (lastAckedPosition == null || pos.compareTo(lastAckedPosition) > 0)
       {
+         if (lastAckedPosition != null && lastAckedPosition.getPageNr() != pos.getPageNr())
+         {
+            // there's a different page being acked, we will do the check right away
+            scheduleCleanupCheck();
+         }
          this.lastAckedPosition = pos;
       }
       PageCursorInfo info = getPageInfo(pos);
@@ -386,6 +409,11 @@
     */
    private void onPageDone(final PageCursorInfo info)
    {
+      scheduleCleanupCheck();
+   }
+
+   private void scheduleCleanupCheck()
+   {
       executor.execute(new Runnable()
       {
 
@@ -458,18 +486,27 @@
          @Override
          public void afterCommit(final Transaction tx)
          {
-            synchronized (PageCursorImpl.this)
+            executor.execute(new Runnable()
             {
-               for (PageCursorInfo completePage : completedPages)
+               
+               public void run()
                {
-                  if (isTrace)
+                  synchronized (PageCursorImpl.this)
                   {
-                     PageCursorImpl.trace("Removing page " + completePage.getPageId());
+                     for (PageCursorInfo completePage : completedPages)
+                     {
+                        if (isTrace)
+                        {
+                           PageCursorImpl.trace("Removing page " + completePage.getPageId());
+                        }
+                        System.out.println("Removing page " + completePage.getPageId());
+                        consumedPages.remove(completePage.getPageId());
+                     }
                   }
-                  System.out.println("Removing page " + completePage.getPageId());
-                  consumedPages.remove(completePage.getPageId());
+                  
+                  cursorProvider.scheduleCleanup();
                }
-            }
+            });
          }
       });
 

Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java	2010-10-15 20:18:59 UTC (rev 9793)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java	2010-10-16 04:00:15 UTC (rev 9794)
@@ -13,8 +13,10 @@
 
 package org.hornetq.core.paging.cursor.impl;
 
+import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.Executor;
 
 import org.hornetq.api.core.Pair;
 import org.hornetq.core.logging.Logger;
@@ -28,6 +30,8 @@
 import org.hornetq.core.paging.cursor.PageCursorProvider;
 import org.hornetq.core.paging.cursor.PagePosition;
 import org.hornetq.core.persistence.StorageManager;
+import org.hornetq.utils.ConcurrentHashSet;
+import org.hornetq.utils.ConcurrentSet;
 import org.hornetq.utils.ExecutorFactory;
 import org.hornetq.utils.SoftValueHashMap;
 import org.jboss.netty.util.internal.ConcurrentHashMap;
@@ -51,16 +55,20 @@
    // Attributes ----------------------------------------------------
 
    private final PagingStore pagingStore;
-   
+
    private final PagingManager pagingManager;
 
    private final StorageManager storageManager;
 
    private final ExecutorFactory executorFactory;
 
+   private final Executor executor;
+
    private SoftValueHashMap<Long, PageCache> softCache = new SoftValueHashMap<Long, PageCache>();
 
    private ConcurrentMap<Long, PageCursor> activeCursors = new ConcurrentHashMap<Long, PageCursor>();
+   
+   private ConcurrentSet<PageCursor> nonPersistentCursors = new ConcurrentHashSet<PageCursor>();
 
    // Static --------------------------------------------------------
 
@@ -74,6 +82,7 @@
       this.pagingManager = pagingStore.getPagingManager();
       this.storageManager = storageManager;
       this.executorFactory = executorFactory;
+      this.executor = executorFactory.getExecutor();
    }
 
    // Public --------------------------------------------------------
@@ -86,7 +95,7 @@
    /* (non-Javadoc)
     * @see org.hornetq.core.paging.cursor.PageCursorProvider#createCursor()
     */
-   public PageCursor getCursor(long cursorID)
+   public PageCursor getPersistentCursor(long cursorID)
    {
       PageCursor activeCursor = activeCursors.get(cursorID);
       if (activeCursor == null)
@@ -105,9 +114,11 @@
    /**
     * this will create a non-persistent cursor
     */
-   public PageCursor createCursor()
+   public PageCursor createNonPersistentCursor()
    {
-      return new PageCursorImpl(this, pagingStore, storageManager, executorFactory.getExecutor(), 0);
+      PageCursor cursor = new PageCursorImpl(this, pagingStore, storageManager, executorFactory.getExecutor(), 0);
+      nonPersistentCursors.add(cursor);
+      return cursor;
    }
 
    /* (non-Javadoc)
@@ -116,16 +127,15 @@
    public Pair<PagePosition, PagedMessage> getNext(final PageCursor cursor, PagePosition cursorPos) throws Exception
    {
 
-      while(true)
+      while (true)
       {
          Pair<PagePosition, PagedMessage> retPos = internalAfter(cursorPos);
-         
+
          if (retPos == null)
          {
             return null;
          }
-         else
-         if (retPos != null)
+         else if (retPos != null)
          {
             cursorPos = retPos.a;
             if (retPos.b.getTransactionID() != 0)
@@ -133,7 +143,9 @@
                PageTransactionInfo tx = pagingManager.getTransaction(retPos.b.getTransactionID());
                if (tx == null)
                {
-                  log.warn("Couldn't locate page transaction " + retPos.b.getTransactionID() + ", ignoring message on position " + retPos.a);
+                  log.warn("Couldn't locate page transaction " + retPos.b.getTransactionID() +
+                           ", ignoring message on position " +
+                           retPos.a);
                   cursor.positionIgnored(cursorPos);
                }
                else
@@ -151,7 +163,7 @@
          }
       }
    }
-   
+
    private Pair<PagePosition, PagedMessage> internalAfter(final PagePosition pos)
    {
       PagePosition retPos = pos.nextMessage();
@@ -174,9 +186,9 @@
             return null;
          }
       }
-      
+
       PagedMessage serverMessage = cache.getMessage(retPos.getMessageNr());
-      
+
       if (serverMessage != null)
       {
          return new Pair<PagePosition, PagedMessage>(retPos, cache.getMessage(retPos.getMessageNr()));
@@ -213,11 +225,10 @@
       }
       return cache;
    }
-   
+
    public synchronized void addPageCache(PageCache cache)
    {
-      // TODO: remove the type cast here
-      softCache.put((long)cache.getPage().getPageId(), cache);
+      softCache.put(cache.getPageId(), cache);
    }
 
    public synchronized int getCacheSize()
@@ -233,20 +244,53 @@
       }
    }
 
-
    public void stop()
    {
       for (PageCursor cursor : activeCursors.values())
       {
          cursor.stop();
       }
-      
+
       activeCursors.clear();
    }
-   
+
+   /* (non-Javadoc)
+    * @see org.hornetq.core.paging.cursor.PageCursorProvider#scheduleCleanup()
+    */
+   public void scheduleCleanup()
+   {
+      executor.execute(new Runnable()
+      {
+         public void run()
+         {
+            cleanup();
+         }
+      });
+   }
+
+   private void cleanup()
+   {
+      long minPage = getMinPageInUse();
+      // One depage()/delete() per page number from the store's first page up to, but excluding, minPage.
+      try
+      {
+         System.out.println("MinPage = " + minPage + " firstPage = "  + pagingStore.getFirstPage());
+         for (long i = pagingStore.getFirstPage(); i < minPage; i++)
+         {
+            Page page = pagingStore.depage(); // NOTE(review): result is not null-checked - confirm depage() cannot return null here
+            System.out.println("Deleting files associated with page " + page);
+            page.delete();
+         }
+      }
+      catch (Exception e)
+      {
+         log.warn("Couldn't complete cleanup on paging", e); // best effort: the failure is logged and this cleanup pass ends
+      }
+   }
+
    public void printDebug()
    {
-      for (PageCache cache: softCache.values())
+      for (PageCache cache : softCache.values())
       {
          System.out.println("Cache " + cache);
       }
@@ -264,6 +308,37 @@
 
    // Private -------------------------------------------------------
 
+   private long getMinPageInUse()
+   {
+      ArrayList<PageCursor> cursorList = new ArrayList<PageCursor>(); // snapshot, so the cursors are queried outside the lock
+      synchronized (this)
+      {
+         cursorList.addAll(activeCursors.values());
+         cursorList.addAll(nonPersistentCursors);
+      }
+      // With no cursor registered (persistent or non-persistent), 0 is reported as the minimum.
+      if (cursorList.size() == 0)
+      {
+         return 0l;
+      }
+
+      long minPage = Long.MAX_VALUE;
+      // Long.MAX_VALUE is the starting value; it is also the result when no cursor reports a smaller first page.
+      System.out.println("CursorList : " + cursorList.size());
+      // Take the smallest getFirstPage() value over the snapshot.
+      for (PageCursor cursor : cursorList)
+      {
+         long firstPage = cursor.getFirstPage();
+         if (firstPage < minPage)
+         {
+            minPage = firstPage;
+         }
+      }
+
+      return minPage;
+
+   }
+
    private PageCache getPageCache(final long pageId)
    {
       try

Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/TestSupportPageStore.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/TestSupportPageStore.java	2010-10-15 20:18:59 UTC (rev 9793)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/TestSupportPageStore.java	2010-10-16 04:00:15 UTC (rev 9794)
@@ -23,17 +23,6 @@
  */
 public interface TestSupportPageStore extends PagingStore
 {
-   /** 
-    * Remove the first page from the Writing Queue.
-    * The file will still exist until Page.delete is called, 
-    * So, case the system is reloaded the same Page will be loaded back if delete is not called.
-    *
-    * @throws Exception
-    * 
-    * Note: This should still be part of the interface, even though HornetQ only uses through the 
-    */
-   Page depage() throws Exception;
-
    void forceAnotherPage() throws Exception;
 
    /** @return true if paging was started, or false if paging was already started before this call */

Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java	2010-10-15 20:18:59 UTC (rev 9793)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java	2010-10-16 04:00:15 UTC (rev 9794)
@@ -1028,7 +1028,7 @@
                {
                   SimpleString address = queueInfo.getAddress();
                   PagingStore store = pagingManager.getPageStore(address);
-                  PageCursor cursor = store.getCursorProvier().getCursor(encoding.queueID);
+                  PageCursor cursor = store.getCursorProvier().getPersistentCursor(encoding.queueID);
                   cursor.reloadACK(encoding.position);
                }
                else

Modified: branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java
===================================================================
--- branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java	2010-10-15 20:18:59 UTC (rev 9793)
+++ branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java	2010-10-16 04:00:15 UTC (rev 9794)
@@ -106,7 +106,7 @@
    public void testSimpleCursor() throws Exception
    {
 
-      final int NUM_MESSAGES = 1000;
+      final int NUM_MESSAGES = 100;
 
       int numberOfPages = addMessages(NUM_MESSAGES, 1024 * 1024);
 
@@ -114,7 +114,7 @@
 
       PageCursorProviderImpl cursorProvider = new PageCursorProviderImpl(lookupPageStore(ADDRESS), server.getStorageManager(), server.getExecutorFactory());
       
-      PageCursor cursor = cursorProvider.createCursor();
+      PageCursor cursor = cursorProvider.createNonPersistentCursor();
       
       Pair<PagePosition, PagedMessage> msg;
       
@@ -162,7 +162,7 @@
       PageCursorProviderImpl cursorProvider = (PageCursorProviderImpl)this.server.getPagingManager().getPageStore(ADDRESS).getCursorProvier();
       
       
-      PageCursor cursor = this.server.getPagingManager().getPageStore(ADDRESS).getCursorProvier().getCursor(queue.getID());
+      PageCursor cursor = this.server.getPagingManager().getPageStore(ADDRESS).getCursorProvier().getPersistentCursor(queue.getID());
  
       PageCache firstPage = cursorProvider.getPageCache(new PagePositionImpl(server.getPagingManager().getPageStore(ADDRESS).getFirstPage(), 0));
 
@@ -194,7 +194,7 @@
       
       server.start();
       
-      cursor = this.server.getPagingManager().getPageStore(ADDRESS).getCursorProvier().getCursor(queue.getID());
+      cursor = this.server.getPagingManager().getPageStore(ADDRESS).getCursorProvier().getPersistentCursor(queue.getID());
       
       for (int i = firstPageSize; i < NUM_MESSAGES; i++)
       {
@@ -227,7 +227,7 @@
       PageCursorProvider cursorProvider = this.server.getPagingManager().getPageStore(ADDRESS).getCursorProvier();
       System.out.println("cursorProvider = " + cursorProvider);
       
-      PageCursor cursor = this.server.getPagingManager().getPageStore(ADDRESS).getCursorProvier().getCursor(queue.getID());
+      PageCursor cursor = this.server.getPagingManager().getPageStore(ADDRESS).getCursorProvier().getPersistentCursor(queue.getID());
       
       System.out.println("Cursor: " + cursor);
       for (int i = 0 ; i < 100 ; i++)
@@ -246,7 +246,7 @@
       
       server.start();
       
-      cursor = this.server.getPagingManager().getPageStore(ADDRESS).getCursorProvier().getCursor(queue.getID());
+      cursor = this.server.getPagingManager().getPageStore(ADDRESS).getCursorProvier().getPersistentCursor(queue.getID());
       
       for (int i = 10; i <= 20; i++)
       {
@@ -277,7 +277,7 @@
       PageCursorProvider cursorProvider = this.server.getPagingManager().getPageStore(ADDRESS).getCursorProvier();
       System.out.println("cursorProvider = " + cursorProvider);
       
-      PageCursor cursor = this.server.getPagingManager().getPageStore(ADDRESS).getCursorProvier().getCursor(queue.getID());
+      PageCursor cursor = this.server.getPagingManager().getPageStore(ADDRESS).getCursorProvier().getPersistentCursor(queue.getID());
       
       System.out.println("Cursor: " + cursor);
       
@@ -300,7 +300,7 @@
       
       server.start();
       
-      cursor = this.server.getPagingManager().getPageStore(ADDRESS).getCursorProvier().getCursor(queue.getID());
+      cursor = this.server.getPagingManager().getPageStore(ADDRESS).getCursorProvier().getPersistentCursor(queue.getID());
 
       tx = new TransactionImpl(server.getStorageManager(), 60 * 1000);
       
@@ -336,7 +336,7 @@
       PageCursorProvider cursorProvider = this.server.getPagingManager().getPageStore(ADDRESS).getCursorProvier();
       System.out.println("cursorProvider = " + cursorProvider);
       
-      PageCursor cursor = this.server.getPagingManager().getPageStore(ADDRESS).getCursorProvier().getCursor(queue.getID());
+      PageCursor cursor = this.server.getPagingManager().getPageStore(ADDRESS).getCursorProvier().getPersistentCursor(queue.getID());
       
       System.out.println("Cursor: " + cursor);
 
@@ -382,7 +382,7 @@
       PageCursorProvider cursorProvider = this.server.getPagingManager().getPageStore(ADDRESS).getCursorProvier();
       System.out.println("cursorProvider = " + cursorProvider);
        
-      PageCursor cursor = pageStore.getCursorProvier().getCursor(queue.getID());
+      PageCursor cursor = pageStore.getCursorProvier().getPersistentCursor(queue.getID());
       
       System.out.println("Cursor: " + cursor);
       
@@ -465,21 +465,6 @@
       pageStore.page(messages, pgParameter.getTransactionID());
    }
    
-   public void testRollbackScenariosOnACK() throws Exception
-   {
-      
-   }
-   
-   public void testReadRolledBackData() throws Exception
-   {
-      
-   }
-   
-   public void testRedeliveryScenarios() throws Exception
-   {
-      
-   }
-   
    public void testCleanupScenarios() throws Exception
    {
       // Validate the pages are being cleared (with multiple cursors)



More information about the hornetq-commits mailing list