[hornetq-commits] JBoss hornetq SVN: r9809 - in branches/Branch_New_Paging: src/main/org/hornetq/core/paging/cursor/impl and 2 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Fri Oct 22 15:40:19 EDT 2010


Author: clebert.suconic at jboss.com
Date: 2010-10-22 15:40:19 -0400 (Fri, 22 Oct 2010)
New Revision: 9809

Modified:
   branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursorProvider.java
   branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorImpl.java
   branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java
   branches/Branch_New_Paging/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
   branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java
Log:
Adding filter on cursor

Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursorProvider.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursorProvider.java	2010-10-22 01:08:55 UTC (rev 9808)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursorProvider.java	2010-10-22 19:40:19 UTC (rev 9809)
@@ -14,6 +14,7 @@
 package org.hornetq.core.paging.cursor;
 
 import org.hornetq.api.core.Pair;
+import org.hornetq.core.filter.Filter;
 import org.hornetq.core.paging.PagedMessage;
 import org.hornetq.core.paging.PagingStore;
 
@@ -50,11 +51,13 @@
     */
    PageCursor getPersistentCursor(long queueId);
    
+   PageCursor createPersistentCursor(long queueId, Filter filter);
+   
    /**
     * Create a non persistent cursor, usually associated with browsing
     * @return
     */
-   PageCursor createNonPersistentCursor();
+   PageCursor createNonPersistentCursor(Filter filter);
 
    Pair<PagePosition, PagedMessage> getNext(PageCursor cursor, PagePosition pos) throws Exception;
    

Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorImpl.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorImpl.java	2010-10-22 01:08:55 UTC (rev 9808)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorImpl.java	2010-10-22 19:40:19 UTC (rev 9809)
@@ -28,6 +28,7 @@
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.hornetq.api.core.Pair;
+import org.hornetq.core.filter.Filter;
 import org.hornetq.core.journal.IOAsyncTask;
 import org.hornetq.core.logging.Logger;
 import org.hornetq.core.paging.PagedMessage;
@@ -70,6 +71,8 @@
    private final StorageManager store;
 
    private final long cursorId;
+   
+   private final Filter filter;
 
    private final PagingStore pageStore;
 
@@ -96,6 +99,7 @@
                          final PagingStore pageStore,
                          final StorageManager store,
                          final Executor executor,
+                         final Filter filter,
                          final long cursorId)
    {
       this.pageStore = pageStore;
@@ -103,6 +107,7 @@
       this.cursorProvider = cursorProvider;
       this.cursorId = cursorId;
       this.executor = executor;
+      this.filter = filter;
    }
 
    // Public --------------------------------------------------------
@@ -472,8 +477,14 @@
 
    protected boolean match(final ServerMessage message)
    {
-      // To be used with expressions
-      return true;
+      if (filter == null)
+      {
+         return true;
+      }
+      else
+      {
+         return filter.match(message);
+      }
    }
 
    // Private -------------------------------------------------------

Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java	2010-10-22 01:08:55 UTC (rev 9808)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java	2010-10-22 19:40:19 UTC (rev 9809)
@@ -19,6 +19,7 @@
 import java.util.concurrent.Executor;
 
 import org.hornetq.api.core.Pair;
+import org.hornetq.core.filter.Filter;
 import org.hornetq.core.logging.Logger;
 import org.hornetq.core.paging.Page;
 import org.hornetq.core.paging.PageTransactionInfo;
@@ -92,32 +93,34 @@
    {
       return pagingStore;
    }
+   
+   public PageCursor createPersistentCursor(long cursorID, Filter filter)
+   {
+      PageCursor activeCursor = activeCursors.get(cursorID);
+      if (activeCursor != null)
+      {
+         throw new IllegalStateException("Cursor " + cursorID + " had already been created");
+      }
+      
+      activeCursor = new PageCursorImpl(this, pagingStore, storageManager, executorFactory.getExecutor(), filter, cursorID);
+      activeCursors.put(cursorID, activeCursor);
+      return activeCursor;
+   }
 
    /* (non-Javadoc)
     * @see org.hornetq.core.paging.cursor.PageCursorProvider#createCursor()
     */
    public PageCursor getPersistentCursor(long cursorID)
    {
-      PageCursor activeCursor = activeCursors.get(cursorID);
-      if (activeCursor == null)
-      {
-         activeCursor = new PageCursorImpl(this, pagingStore, storageManager, executorFactory.getExecutor(), cursorID);
-         PageCursor previousValue = activeCursors.putIfAbsent(cursorID, activeCursor);
-         if (previousValue != null)
-         {
-            activeCursor = previousValue;
-         }
-      }
-
-      return activeCursor;
+      return activeCursors.get(cursorID);
    }
 
    /**
     * this will create a non-persistent cursor
     */
-   public PageCursor createNonPersistentCursor()
+   public PageCursor createNonPersistentCursor(Filter filter)
    {
-      PageCursor cursor = new PageCursorImpl(this, pagingStore, storageManager, executorFactory.getExecutor(), 0);
+      PageCursor cursor = new PageCursorImpl(this, pagingStore, storageManager, executorFactory.getExecutor(), filter, 0);
       nonPersistentCursors.add(cursor);
       return cursor;
    }

Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java	2010-10-22 01:08:55 UTC (rev 9808)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java	2010-10-22 19:40:19 UTC (rev 9809)
@@ -21,8 +21,8 @@
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+	import java.util.Map.Entry;
 import java.util.Set;
-import java.util.Map.Entry;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -61,7 +61,6 @@
 import org.hornetq.core.logging.Logger;
 import org.hornetq.core.management.impl.HornetQServerControlImpl;
 import org.hornetq.core.paging.PagingManager;
-import org.hornetq.core.paging.cursor.PageCursorProvider;
 import org.hornetq.core.paging.impl.PagingManagerImpl;
 import org.hornetq.core.paging.impl.PagingStoreFactoryNIO;
 import org.hornetq.core.persistence.GroupingInfo;
@@ -1214,6 +1213,8 @@
 
          managementService.registerAddress(queueBindingInfo.getAddress());
          managementService.registerQueue(queue, queueBindingInfo.getAddress(), storageManager);
+         
+         pagingManager.getPageStore(queueBindingInfo.getAddress()).getCursorProvier().createPersistentCursor(queue.getID(), filter);
       }
 
       for (GroupingInfo groupingInfo : groupingInfos)

Modified: branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java
===================================================================
--- branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java	2010-10-22 01:08:55 UTC (rev 9808)
+++ branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java	2010-10-22 19:40:19 UTC (rev 9809)
@@ -23,6 +23,7 @@
 import org.hornetq.api.core.Pair;
 import org.hornetq.api.core.SimpleString;
 import org.hornetq.core.config.Configuration;
+import org.hornetq.core.filter.Filter;
 import org.hornetq.core.paging.PageTransactionInfo;
 import org.hornetq.core.paging.PagedMessage;
 import org.hornetq.core.paging.cursor.PageCache;
@@ -63,7 +64,7 @@
    private SimpleString ADDRESS = new SimpleString("test-add");
 
    private HornetQServer server;
-   
+
    private Queue queue;
 
    private static final int PAGE_MAX = -1;
@@ -86,7 +87,9 @@
 
       System.out.println("NumberOfPages = " + numberOfPages);
 
-      PageCursorProviderImpl cursorProvider = new PageCursorProviderImpl(lookupPageStore(ADDRESS), server.getStorageManager(), server.getExecutorFactory());
+      PageCursorProviderImpl cursorProvider = new PageCursorProviderImpl(lookupPageStore(ADDRESS),
+                                                                         server.getStorageManager(),
+                                                                         server.getExecutorFactory());
 
       for (int i = 0; i < numberOfPages; i++)
       {
@@ -94,15 +97,14 @@
          System.out.println("Page " + i + " had " + cache.getNumberOfMessages() + " messages");
 
       }
-      
+
       forceGC();
-      
+
       assertTrue(cursorProvider.getCacheSize() < numberOfPages);
-      
+
       System.out.println("Cache size = " + cursorProvider.getCacheSize());
    }
 
-
    public void testSimpleCursor() throws Exception
    {
 
@@ -111,11 +113,11 @@
       int numberOfPages = addMessages(NUM_MESSAGES, 1024 * 1024);
 
       System.out.println("NumberOfPages = " + numberOfPages);
-      
+
       PageCursor cursor = createNonPersistentCursor();
-      
+
       Pair<PagePosition, PagedMessage> msg;
-      
+
       int key = 0;
       while ((msg = cursor.moveNext()) != null)
       {
@@ -123,20 +125,104 @@
          cursor.ack(msg.a);
       }
       assertEquals(NUM_MESSAGES, key);
-      
-      
+
       forceGC();
-      
+
       assertTrue(lookupCursorProvider().getCacheSize() < numberOfPages);
-      
+
       server.stop();
       createServer();
       assertEquals(1, lookupPageStore(ADDRESS).getNumberOfPages());
 
    }
 
+   public void testSimpleCursorWithFilter() throws Exception
+   {
 
+      final int NUM_MESSAGES = 100;
 
+      int numberOfPages = addMessages(NUM_MESSAGES, 1024 * 1024);
+
+      System.out.println("NumberOfPages = " + numberOfPages);
+
+      PageCursor cursorEven = createNonPersistentCursor(new Filter()
+      {
+
+         public boolean match(ServerMessage message)
+         {
+            Boolean property = message.getBooleanProperty("even");
+            if (property == null)
+            {
+               return false;
+            }
+            else
+            {
+               return property.booleanValue();
+            }
+         }
+
+         public SimpleString getFilterString()
+         {
+            return new SimpleString("even=true");
+         }
+
+      });
+
+      PageCursor cursorOdd = createNonPersistentCursor(new Filter()
+      {
+
+         public boolean match(ServerMessage message)
+         {
+            Boolean property = message.getBooleanProperty("even");
+            if (property == null)
+            {
+               return false;
+            }
+            else
+            {
+               return !property.booleanValue();
+            }
+         }
+
+         public SimpleString getFilterString()
+         {
+            return new SimpleString("even=true");
+         }
+
+      });
+
+      Pair<PagePosition, PagedMessage> msg;
+
+      int key = 0;
+      while ((msg = cursorEven.moveNext()) != null)
+      {
+         assertEquals(key, msg.b.getMessage().getIntProperty("key").intValue());
+         assertTrue(msg.b.getMessage().getBooleanProperty("even").booleanValue());
+         key += 2;
+         cursorEven.ack(msg.a);
+      }
+      assertEquals(NUM_MESSAGES, key);
+
+      key = 1;
+      while ((msg = cursorOdd.moveNext()) != null)
+      {
+         assertEquals(key, msg.b.getMessage().getIntProperty("key").intValue());
+         assertFalse(msg.b.getMessage().getBooleanProperty("even").booleanValue());
+         key += 2;
+         cursorOdd.ack(msg.a);
+      }
+      assertEquals(NUM_MESSAGES + 1, key);
+
+      forceGC();
+
+      assertTrue(lookupCursorProvider().getCacheSize() < numberOfPages);
+
+      server.stop();
+      createServer();
+      assertEquals(1, lookupPageStore(ADDRESS).getNumberOfPages());
+
+   }
+
    public void testReadNextPage() throws Exception
    {
 
@@ -147,196 +233,226 @@
       System.out.println("NumberOfPages = " + numberOfPages);
 
       PageCursorProvider cursorProvider = lookupCursorProvider();
-      
-      PageCache cache = cursorProvider.getPageCache(new PagePositionImpl(2,0));
-      
+
+      PageCache cache = cursorProvider.getPageCache(new PagePositionImpl(2, 0));
+
       assertNull(cache);
    }
 
-   
-   
    public void testRestart() throws Exception
    {
       final int NUM_MESSAGES = 1000;
 
       int numberOfPages = addMessages(NUM_MESSAGES, 100 * 1024);
-      
+
       System.out.println("Number of pages = " + numberOfPages);
-      
+
       PageCursorProvider cursorProvider = lookupCursorProvider();
-      
-      
-      PageCursor cursor = this.server.getPagingManager().getPageStore(ADDRESS).getCursorProvier().getPersistentCursor(queue.getID());
- 
-      PageCache firstPage = cursorProvider.getPageCache(new PagePositionImpl(server.getPagingManager().getPageStore(ADDRESS).getFirstPage(), 0));
 
+      // TODO: We should be using getPersisentCursor here but I can't change the method here until createQueue is not
+      // creating the cursor also
+      // need to change this after some integration
+      // PageCursor cursor =
+      // this.server.getPagingManager().getPageStore(ADDRESS).getCursorProvier().getPersistentCursor(queue.getID());
+      PageCursor cursor = this.server.getPagingManager()
+                                     .getPageStore(ADDRESS)
+                                     .getCursorProvier()
+                                     .createPersistentCursor(queue.getID(), null);
+
+      PageCache firstPage = cursorProvider.getPageCache(new PagePositionImpl(server.getPagingManager()
+                                                                                   .getPageStore(ADDRESS)
+                                                                                   .getFirstPage(), 0));
+
       int firstPageSize = firstPage.getNumberOfMessages();
-      
+
       firstPage = null;
-      
+
       System.out.println("Cursor: " + cursor);
       cursorProvider.printDebug();
 
-      for (int i = 0 ; i < 1000 ; i++)
+      for (int i = 0; i < 1000; i++)
       {
-         Pair<PagePosition, PagedMessage> msg =  cursor.moveNext();
+         Pair<PagePosition, PagedMessage> msg = cursor.moveNext();
          assertNotNull(msg);
          assertEquals(i, msg.b.getMessage().getIntProperty("key").intValue());
-         
+
          if (i < firstPageSize)
          {
             cursor.ack(msg.a);
          }
       }
       cursorProvider.printDebug();
-     
+
       // needs to clear the context since we are using the same thread over two distinct servers
       // otherwise we will get the old executor on the factory
       OperationContextImpl.clearContext();
-      
+
       server.stop();
-      
+
       server.start();
-      
-      cursor = this.server.getPagingManager().getPageStore(ADDRESS).getCursorProvier().getPersistentCursor(queue.getID());
-      
+
+      cursor = this.server.getPagingManager()
+                          .getPageStore(ADDRESS)
+                          .getCursorProvier()
+                          .getPersistentCursor(queue.getID());
+
       for (int i = firstPageSize; i < NUM_MESSAGES; i++)
       {
          System.out.println("Received " + i);
-         Pair<PagePosition, PagedMessage> msg =  cursor.moveNext();
+         Pair<PagePosition, PagedMessage> msg = cursor.moveNext();
          assertNotNull(msg);
          assertEquals(i, msg.b.getMessage().getIntProperty("key").intValue());
-         
+
          cursor.ack(msg.a);
-         
+
          OperationContextImpl.getContext(null).waitCompletion();
-         
+
       }
 
-      OperationContextImpl.getContext(null).waitCompletion(); 
+      OperationContextImpl.getContext(null).waitCompletion();
       ((PageCursorImpl)cursor).printDebug();
-      
+
       server.stop();
       createServer();
       assertEquals(1, lookupPageStore(ADDRESS).getNumberOfPages());
 
    }
-   
+
    public void testRestartWithHoleOnAck() throws Exception
    {
 
       final int NUM_MESSAGES = 1000;
 
       int numberOfPages = addMessages(NUM_MESSAGES, 10 * 1024);
-      
+
       System.out.println("Number of pages = " + numberOfPages);
-      
+
       PageCursorProvider cursorProvider = this.server.getPagingManager().getPageStore(ADDRESS).getCursorProvier();
       System.out.println("cursorProvider = " + cursorProvider);
-      
-      PageCursor cursor = this.server.getPagingManager().getPageStore(ADDRESS).getCursorProvier().getPersistentCursor(queue.getID());
-      
+
+      // TODO: We should be using getPersisentCursor here but I can't change the method here until createQueue is not
+      // creating the cursor also
+      // need to change this after some integration
+      // PageCursor cursor =
+      // this.server.getPagingManager().getPageStore(ADDRESS).getCursorProvier().getPersistentCursor(queue.getID());
+      PageCursor cursor = this.server.getPagingManager()
+                                     .getPageStore(ADDRESS)
+                                     .getCursorProvier()
+                                     .createPersistentCursor(queue.getID(), null);
+
       System.out.println("Cursor: " + cursor);
-      for (int i = 0 ; i < 100 ; i++)
+      for (int i = 0; i < 100; i++)
       {
-         Pair<PagePosition, PagedMessage> msg =  cursor.moveNext();
+         Pair<PagePosition, PagedMessage> msg = cursor.moveNext();
          assertEquals(i, msg.b.getMessage().getIntProperty("key").intValue());
          if (i < 10 || i > 20)
          {
             cursor.ack(msg.a);
          }
       }
-      
+
       server.stop();
-      
+
       OperationContextImpl.clearContext();
-      
+
       server.start();
-      
-      cursor = this.server.getPagingManager().getPageStore(ADDRESS).getCursorProvier().getPersistentCursor(queue.getID());
-      
+
+      cursor = this.server.getPagingManager()
+                          .getPageStore(ADDRESS)
+                          .getCursorProvier()
+                          .getPersistentCursor(queue.getID());
+
       for (int i = 10; i <= 20; i++)
       {
-         Pair<PagePosition, PagedMessage> msg =  cursor.moveNext();
+         Pair<PagePosition, PagedMessage> msg = cursor.moveNext();
          assertEquals(i, msg.b.getMessage().getIntProperty("key").intValue());
          cursor.ack(msg.a);
       }
-    
-      
+
       for (int i = 100; i < NUM_MESSAGES; i++)
       {
-         Pair<PagePosition, PagedMessage> msg =  cursor.moveNext();
+         Pair<PagePosition, PagedMessage> msg = cursor.moveNext();
          assertEquals(i, msg.b.getMessage().getIntProperty("key").intValue());
          cursor.ack(msg.a);
       }
-      
+
       server.stop();
       createServer();
       assertEquals(1, lookupPageStore(ADDRESS).getNumberOfPages());
 
    }
-   
-   
+
    public void testRestartWithHoleOnAckAndTransaction() throws Exception
    {
       final int NUM_MESSAGES = 1000;
 
       int numberOfPages = addMessages(NUM_MESSAGES, 10 * 1024);
-      
+
       System.out.println("Number of pages = " + numberOfPages);
-      
+
       PageCursorProvider cursorProvider = this.server.getPagingManager().getPageStore(ADDRESS).getCursorProvier();
       System.out.println("cursorProvider = " + cursorProvider);
-      
-      PageCursor cursor = this.server.getPagingManager().getPageStore(ADDRESS).getCursorProvier().getPersistentCursor(queue.getID());
-      
+
+      // TODO: We should be using getPersisentCursor here but I can't change the method here until createQueue is not
+      // creating the cursor also
+      // need to change this after some integration
+      // PageCursor cursor =
+      // this.server.getPagingManager().getPageStore(ADDRESS).getCursorProvier().getPersistentCursor(queue.getID());
+      PageCursor cursor = this.server.getPagingManager()
+                                     .getPageStore(ADDRESS)
+                                     .getCursorProvier()
+                                     .createPersistentCursor(queue.getID(), null);
+
       System.out.println("Cursor: " + cursor);
-      
+
       Transaction tx = new TransactionImpl(server.getStorageManager(), 60 * 1000);
-      for (int i = 0 ; i < 100 ; i++)
+      for (int i = 0; i < 100; i++)
       {
-         Pair<PagePosition, PagedMessage> msg =  cursor.moveNext();
+         Pair<PagePosition, PagedMessage> msg = cursor.moveNext();
          assertEquals(i, msg.b.getMessage().getIntProperty("key").intValue());
          if (i < 10 || i > 20)
          {
             cursor.ackTx(tx, msg.a);
          }
       }
-      
+
       tx.commit();
-      
+
       server.stop();
-      
+
       OperationContextImpl.clearContext();
-      
+
       server.start();
-      
-      cursor = this.server.getPagingManager().getPageStore(ADDRESS).getCursorProvier().getPersistentCursor(queue.getID());
 
+      cursor = this.server.getPagingManager()
+                          .getPageStore(ADDRESS)
+                          .getCursorProvier()
+                          .getPersistentCursor(queue.getID());
+
       tx = new TransactionImpl(server.getStorageManager(), 60 * 1000);
-      
+
       for (int i = 10; i <= 20; i++)
       {
-         Pair<PagePosition, PagedMessage> msg =  cursor.moveNext();
+         Pair<PagePosition, PagedMessage> msg = cursor.moveNext();
          assertEquals(i, msg.b.getMessage().getIntProperty("key").intValue());
-         cursor.ackTx(tx,msg.a);
+         cursor.ackTx(tx, msg.a);
       }
-    
+
       for (int i = 100; i < NUM_MESSAGES; i++)
       {
-         Pair<PagePosition, PagedMessage> msg =  cursor.moveNext();
+         Pair<PagePosition, PagedMessage> msg = cursor.moveNext();
          assertEquals(i, msg.b.getMessage().getIntProperty("key").intValue());
-         cursor.ackTx(tx,msg.a);
+         cursor.ackTx(tx, msg.a);
       }
-      
+
       tx.commit();
-      
+
       server.stop();
       createServer();
       assertEquals(1, lookupPageStore(ADDRESS).getNumberOfPages());
-      
+
    }
-   
+
    public void testConsumeLivePage() throws Exception
    {
       PagingStoreImpl pageStore = lookupPageStore(ADDRESS);
@@ -344,117 +460,131 @@
       pageStore.startPaging();
 
       final int NUM_MESSAGES = 100;
-      
+
       final int messageSize = 1024 * 1024;
-      
-      
+
       PageCursorProvider cursorProvider = this.server.getPagingManager().getPageStore(ADDRESS).getCursorProvier();
       System.out.println("cursorProvider = " + cursorProvider);
-      
-      PageCursor cursor = this.server.getPagingManager().getPageStore(ADDRESS).getCursorProvier().getPersistentCursor(queue.getID());
-      
+
+      // TODO: We should be using getPersisentCursor here but I can't change the method here until createQueue is not
+      // creating the cursor also
+      // need to change this after some integration
+      // PageCursor cursor =
+      // this.server.getPagingManager().getPageStore(ADDRESS).getCursorProvier().getPersistentCursor(queue.getID());
+      PageCursor cursor = this.server.getPagingManager()
+                                     .getPageStore(ADDRESS)
+                                     .getCursorProvier()
+                                     .createPersistentCursor(queue.getID(), null);
+
       System.out.println("Cursor: " + cursor);
 
-      
       for (int i = 0; i < NUM_MESSAGES; i++)
       {
-         if (i % 100 == 0) System.out.println("Paged " + i);
-         
+         if (i % 100 == 0)
+            System.out.println("Paged " + i);
+
          HornetQBuffer buffer = RandomUtil.randomBuffer(messageSize, i + 1l);
 
          ServerMessage msg = new ServerMessageImpl(i, buffer.writerIndex());
          msg.putIntProperty("key", i);
-         
+
          msg.getBodyBuffer().writeBytes(buffer, 0, buffer.writerIndex());
 
          Assert.assertTrue(pageStore.page(msg));
-         
+
          Pair<PagePosition, PagedMessage> readMessage = cursor.moveNext();
-         
+
          assertNotNull(readMessage);
-         
+
          assertEquals(i, readMessage.b.getMessage().getIntProperty("key").intValue());
-         
+
          assertNull(cursor.moveNext());
       }
-      
+
       server.stop();
-      
+
       OperationContextImpl.clearContext();
-      
+
       createServer();
-      
+
       pageStore = lookupPageStore(ADDRESS);
-      
-      cursor = this.server.getPagingManager().getPageStore(ADDRESS).getCursorProvier().getPersistentCursor(queue.getID());
-      
+
+      cursor = this.server.getPagingManager()
+                          .getPageStore(ADDRESS)
+                          .getCursorProvier()
+                          .getPersistentCursor(queue.getID());
+
       for (int i = 0; i < NUM_MESSAGES * 2; i++)
       {
-         if (i % 100 == 0) System.out.println("Paged " + i);
+         if (i % 100 == 0)
+            System.out.println("Paged " + i);
 
          if (i >= NUM_MESSAGES)
          {
-            
+
             HornetQBuffer buffer = RandomUtil.randomBuffer(messageSize, i + 1l);
-   
+
             ServerMessage msg = new ServerMessageImpl(i, buffer.writerIndex());
             msg.putIntProperty("key", i);
-            
+
             msg.getBodyBuffer().writeBytes(buffer, 0, buffer.writerIndex());
-   
+
             Assert.assertTrue(pageStore.page(msg));
          }
-         
+
          Pair<PagePosition, PagedMessage> readMessage = cursor.moveNext();
-         
+
          assertNotNull(readMessage);
-         
+
          assertEquals(i, readMessage.b.getMessage().getIntProperty("key").intValue());
       }
 
       server.stop();
-      
+
       OperationContextImpl.clearContext();
-      
+
       createServer();
-      
+
       pageStore = lookupPageStore(ADDRESS);
-      
-      cursor = this.server.getPagingManager().getPageStore(ADDRESS).getCursorProvier().getPersistentCursor(queue.getID());
-      
+
+      cursor = this.server.getPagingManager()
+                          .getPageStore(ADDRESS)
+                          .getCursorProvier()
+                          .getPersistentCursor(queue.getID());
+
       for (int i = 0; i < NUM_MESSAGES * 3; i++)
       {
-         if (i % 100 == 0) System.out.println("Paged " + i);
+         if (i % 100 == 0)
+            System.out.println("Paged " + i);
 
          if (i >= NUM_MESSAGES * 2)
          {
-            
+
             HornetQBuffer buffer = RandomUtil.randomBuffer(messageSize, i + 1l);
-   
+
             ServerMessage msg = new ServerMessageImpl(i, buffer.writerIndex());
             msg.putIntProperty("key", i);
-            
+
             msg.getBodyBuffer().writeBytes(buffer, 0, buffer.writerIndex());
-   
+
             Assert.assertTrue(pageStore.page(msg));
          }
-         
+
          Pair<PagePosition, PagedMessage> readMessage = cursor.moveNext();
-         
+
          assertNotNull(readMessage);
-         
+
          cursor.ack(readMessage.a);
-         
+
          assertEquals(i, readMessage.b.getMessage().getIntProperty("key").intValue());
       }
 
       server.stop();
       createServer();
       assertEquals(1, lookupPageStore(ADDRESS).getNumberOfPages());
- 
+
    }
-   
-   
+
    public void testPrepareScenarios() throws Exception
    {
       PagingStoreImpl pageStore = lookupPageStore(ADDRESS);
@@ -462,40 +592,46 @@
       pageStore.startPaging();
 
       final int NUM_MESSAGES = 100;
-      
+
       final int messageSize = 100 * 1024;
-      
-      
+
       PageCursorProvider cursorProvider = this.server.getPagingManager().getPageStore(ADDRESS).getCursorProvier();
       System.out.println("cursorProvider = " + cursorProvider);
-       
-      PageCursor cursor = pageStore.getCursorProvier().getPersistentCursor(queue.getID());
-      
+
+      // TODO: We should be using getPersisentCursor here but I can't change the method here until createQueue is not
+      // creating the cursor also
+      // need to change this after some integration
+      // PageCursor cursor =
+      // this.server.getPagingManager().getPageStore(ADDRESS).getCursorProvier().getPersistentCursor(queue.getID());
+      PageCursor cursor = this.server.getPagingManager()
+                                     .getPageStore(ADDRESS)
+                                     .getCursorProvier()
+                                     .createPersistentCursor(queue.getID(), null);
+
       System.out.println("Cursor: " + cursor);
-      
+
       StorageManager storage = this.server.getStorageManager();
-      
+
       PageTransactionInfoImpl pgtxRollback = new PageTransactionInfoImpl(storage.generateUniqueID());
       PageTransactionInfoImpl pgtxForgotten = new PageTransactionInfoImpl(storage.generateUniqueID());
       PageTransactionInfoImpl pgtxCommit = new PageTransactionInfoImpl(storage.generateUniqueID());
-      
+
       System.out.println("Forgetting tx " + pgtxForgotten.getTransactionID());
-      
+
       this.server.getPagingManager().addTransaction(pgtxRollback);
       this.server.getPagingManager().addTransaction(pgtxCommit);
-      
+
       pgMessages(storage, pageStore, pgtxRollback, 0, NUM_MESSAGES, messageSize);
       pageStore.forceAnotherPage();
       pgMessages(storage, pageStore, pgtxForgotten, 100, NUM_MESSAGES, messageSize);
       pageStore.forceAnotherPage();
       pgMessages(storage, pageStore, pgtxCommit, 200, NUM_MESSAGES, messageSize);
       pageStore.forceAnotherPage();
-      
+
       addMessages(300, NUM_MESSAGES, messageSize);
-      
+
       System.out.println("Number of pages - " + pageStore.getNumberOfPages());
 
-
       // First consume what's already there without any tx as nothing was committed
       for (int i = 300; i < 400; i++)
       {
@@ -506,10 +642,10 @@
       }
 
       assertNull(cursor.moveNext());
-      
+
       cursor.printDebug();
       pgtxRollback.rollback();
-      
+
       this.server.getPagingManager().removeTransaction(pgtxRollback.getTransactionID());
       pgtxCommit.commit();
 
@@ -521,16 +657,15 @@
          assertEquals(i, pos.b.getMessage().getIntProperty("key").intValue());
          cursor.ack(pos.a);
       }
-      
+
       assertNull(cursor.moveNext());
-      
+
       server.stop();
       createServer();
       assertEquals(1, lookupPageStore(ADDRESS).getNumberOfPages());
-   
-      
+
    }
-   
+
    public void testCloseNonPersistentConsumer() throws Exception
    {
 
@@ -541,12 +676,12 @@
       System.out.println("NumberOfPages = " + numberOfPages);
 
       PageCursorProvider cursorProvider = lookupCursorProvider();
-      
-      PageCursor cursor = cursorProvider.createNonPersistentCursor();
-      PageCursorImpl cursor2 = (PageCursorImpl)cursorProvider.createNonPersistentCursor();
-      
+
+      PageCursor cursor = cursorProvider.createNonPersistentCursor(null);
+      PageCursorImpl cursor2 = (PageCursorImpl)cursorProvider.createNonPersistentCursor(null);
+
       Pair<PagePosition, PagedMessage> msg;
-      
+
       int key = 0;
       while ((msg = cursor.moveNext()) != null)
       {
@@ -554,18 +689,17 @@
          cursor.ack(msg.a);
       }
       assertEquals(NUM_MESSAGES, key);
-      
-      
+
       forceGC();
-      
+
       assertTrue(cursorProvider.getCacheSize() < numberOfPages);
-      
-      for (int i = 0 ; i < 10; i++)
+
+      for (int i = 0; i < 10; i++)
       {
          msg = cursor2.moveNext();
          assertEquals(i, msg.b.getMessage().getIntProperty("key").intValue());
       }
-        
+
       assertSame(cursor2.getProvider(), cursorProvider);
 
       cursor2.close();
@@ -575,13 +709,12 @@
       assertEquals(1, lookupPageStore(ADDRESS).getNumberOfPages());
 
    }
-  
-   
+
    public void testLeavePageStateAndRestart() throws Exception
    {
-      // Validate the cursor are working fine when all the pages are gone, and then paging being restarted   
+      // Validate the cursor are working fine when all the pages are gone, and then paging being restarted
    }
-    
+
    public void testFirstMessageInTheMiddle() throws Exception
    {
 
@@ -592,20 +725,20 @@
       System.out.println("NumberOfPages = " + numberOfPages);
 
       PageCursorProvider cursorProvider = lookupCursorProvider();
-      
+
       PageCache cache = cursorProvider.getPageCache(new PagePositionImpl(5, 0));
 
-      PageCursor cursor = cursorProvider.createNonPersistentCursor();
-      PagePosition startingPos = new PagePositionImpl(5, cache.getNumberOfMessages()/2);
+      PageCursor cursor = cursorProvider.createNonPersistentCursor(null);
+      PagePosition startingPos = new PagePositionImpl(5, cache.getNumberOfMessages() / 2);
       cursor.bookmark(startingPos);
       PagedMessage msg = cache.getMessage(startingPos.getMessageNr() + 1);
       msg.initMessage(server.getStorageManager());
       int key = msg.getMessage().getIntProperty("key").intValue();
-      
+
       msg = null;
-      
+
       cache = null;
-      
+
       Pair<PagePosition, PagedMessage> msgCursor = null;
       while ((msgCursor = cursor.moveNext()) != null)
       {
@@ -613,18 +746,16 @@
          cursor.ack(msgCursor.a);
       }
       assertEquals(NUM_MESSAGES, key);
-      
-      
+
       forceGC();
-      
+
       assertTrue(cursorProvider.getCacheSize() < numberOfPages);
-      
+
       server.stop();
       createServer();
       assertEquals(1, lookupPageStore(ADDRESS).getNumberOfPages());
    }
-   
-   
+
    public void testFirstMessageInTheMiddlePersistent() throws Exception
    {
 
@@ -635,35 +766,39 @@
       System.out.println("NumberOfPages = " + numberOfPages);
 
       PageCursorProvider cursorProvider = lookupCursorProvider();
-      
+
       PageCache cache = cursorProvider.getPageCache(new PagePositionImpl(5, 0));
 
-      PageCursor cursor = cursorProvider.getPersistentCursor(queue.getID());
-      PagePosition startingPos = new PagePositionImpl(5, cache.getNumberOfMessages()/2);
+      // TODO: We should be using getPersisentCursor here but I can't change the method here until createQueue is not
+      // creating the cursor also
+      // need to change this after some integration
+      // PageCursor cursor =
+      // this.server.getPagingManager().getPageStore(ADDRESS).getCursorProvier().getPersistentCursor(queue.getID());
+      PageCursor cursor = cursorProvider.createPersistentCursor(queue.getID(), null);
+      PagePosition startingPos = new PagePositionImpl(5, cache.getNumberOfMessages() / 2);
       cursor.bookmark(startingPos);
       PagedMessage msg = cache.getMessage(startingPos.getMessageNr() + 1);
       msg.initMessage(server.getStorageManager());
       int initialKey = msg.getMessage().getIntProperty("key").intValue();
       int key = initialKey;
-      
+
       msg = null;
-      
+
       cache = null;
-      
+
       Pair<PagePosition, PagedMessage> msgCursor = null;
       while ((msgCursor = cursor.moveNext()) != null)
       {
          assertEquals(key++, msgCursor.b.getMessage().getIntProperty("key").intValue());
       }
       assertEquals(NUM_MESSAGES, key);
-      
-      
+
       server.stop();
-      
+
       OperationContextImpl.clearContext();
-      
+
       createServer();
-      
+
       cursorProvider = lookupCursorProvider();
       cursor = cursorProvider.getPersistentCursor(queue.getID());
       key = initialKey;
@@ -672,18 +807,17 @@
          assertEquals(key++, msgCursor.b.getMessage().getIntProperty("key").intValue());
          cursor.ack(msgCursor.a);
       }
-      
-      
+
       forceGC();
-      
+
       assertTrue(cursorProvider.getCacheSize() < numberOfPages);
-      
+
       server.stop();
       createServer();
       assertEquals(1, lookupPageStore(ADDRESS).getNumberOfPages());
 
    }
-   
+
    private int addMessages(final int numMessages, final int messageSize) throws Exception
    {
       return addMessages(0, numMessages, messageSize);
@@ -702,17 +836,20 @@
 
       for (int i = start; i < start + numMessages; i++)
       {
-         if (i % 100 == 0) System.out.println("Paged " + i);
+         if (i % 100 == 0)
+            System.out.println("Paged " + i);
          HornetQBuffer buffer = RandomUtil.randomBuffer(messageSize, i + 1l);
 
          ServerMessage msg = new ServerMessageImpl(i, buffer.writerIndex());
          msg.putIntProperty("key", i);
-         
+         // to be used on tests that are validating filters
+         msg.putBooleanProperty("even", i % 2 == 0);
+
          msg.getBodyBuffer().writeBytes(buffer, 0, buffer.writerIndex());
 
          Assert.assertTrue(pageStore.page(msg));
       }
-      
+
       return pageStore.getNumberOfPages();
    }
 
@@ -734,53 +871,56 @@
       super.setUp();
       OperationContextImpl.clearContext();
       System.out.println("Tmp:" + getTemporaryDir());
-      
+
       createServer();
-
-      //createQueue(ADDRESS.toString(), ADDRESS.toString());
    }
 
-
    /**
     * @throws Exception
     */
    private void createServer() throws Exception
    {
       OperationContextImpl.clearContext();
-      
+
       Configuration config = createDefaultConfig();
-      
+
       config.setJournalSyncNonTransactional(true);
 
-      server = createServer(true,
-                            config,
-                            PAGE_SIZE,
-                            PAGE_MAX,
-                            new HashMap<String, AddressSettings>());
+      server = createServer(true, config, PAGE_SIZE, PAGE_MAX, new HashMap<String, AddressSettings>());
 
       server.start();
-      
+
       try
       {
          queue = server.createQueue(ADDRESS, ADDRESS, null, true, false);
       }
       catch (Exception ignored)
       {
-       }
+      }
    }
+
    /**
     * @return
     * @throws Exception
     */
    private PageCursor createNonPersistentCursor() throws Exception
    {
-      return lookupCursorProvider().createNonPersistentCursor();
+      return lookupCursorProvider().createNonPersistentCursor(null);
    }
 
    /**
     * @return
     * @throws Exception
     */
+   private PageCursor createNonPersistentCursor(Filter filter) throws Exception
+   {
+      return lookupCursorProvider().createNonPersistentCursor(filter);
+   }
+
+   /**
+    * @return
+    * @throws Exception
+    */
    private PageCursorProvider lookupCursorProvider() throws Exception
    {
       return lookupPageStore(ADDRESS).getCursorProvier();
@@ -803,8 +943,8 @@
                            final int messageSize) throws Exception
    {
       List<ServerMessage> messages = new ArrayList<ServerMessage>();
-      
-      for (int i = start ; i < start + NUM_MESSAGES; i++)
+
+      for (int i = start; i < start + NUM_MESSAGES; i++)
       {
          HornetQBuffer buffer = RandomUtil.randomBuffer(messageSize, i + 1l);
          ServerMessage msg = new ServerMessageImpl(storage.generateUniqueID(), buffer.writerIndex());
@@ -812,10 +952,9 @@
          msg.putIntProperty("key", i);
          messages.add(msg);
       }
-      
+
       pageStore.page(messages, pgParameter.getTransactionID());
    }
-   
 
    protected void tearDown() throws Exception
    {



More information about the hornetq-commits mailing list