[hornetq-commits] JBoss hornetq SVN: r9833 - in branches/Branch_New_Paging: src/main/org/hornetq/core/postoffice/impl and 1 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Tue Nov 2 18:44:07 EDT 2010


Author: clebert.suconic at jboss.com
Date: 2010-11-02 18:44:05 -0400 (Tue, 02 Nov 2010)
New Revision: 9833

Modified:
   branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java
   branches/Branch_New_Paging/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
   branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java
Log:
Fixing tests

Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java	2010-11-02 03:33:07 UTC (rev 9832)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java	2010-11-02 22:44:05 UTC (rev 9833)
@@ -297,6 +297,7 @@
             PageCursorInfo info = getPageInfo(message.a, false);
             if (info != null && info.isRemoved(message.a))
             {
+               tmpPosition = message.a;
                valid = false;
             }
          }
@@ -907,8 +908,7 @@
       
       public boolean isRemoved(final PagePosition pos)
       {
-         return false;
-         //return removedReferences.contains(pos);
+         return removedReferences.contains(pos);
       }
       
       public void remove(final PagePosition position)

Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java	2010-11-02 03:33:07 UTC (rev 9832)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java	2010-11-02 22:44:05 UTC (rev 9833)
@@ -619,7 +619,7 @@
 
          boolean depage = tx.getProperty(TransactionPropertyIndexes.IS_DEPAGE) != null;
 
-         // if the TX paged at least one message on a give address, all the other addresses should also go towards
+         // if the TX paged at least one message on a given address, all the other messages on the same address should also go towards
          // paging cache now
          boolean alreadyPaging = false;
 

Modified: branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java
===================================================================
--- branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java	2010-11-02 03:33:07 UTC (rev 9832)
+++ branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java	2010-11-02 22:44:05 UTC (rev 9833)
@@ -15,6 +15,7 @@
 
 import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.List;
 
 import junit.framework.Assert;
@@ -129,9 +130,8 @@
          cursor.ack(msg.a);
       }
       assertEquals(NUM_MESSAGES, key);
-      
+
       server.getStorageManager().waitOnOperations();
-      
 
       waitCleanup();
 
@@ -202,7 +202,7 @@
          }
 
       });
-      
+
       queue.getPageSubscription().close();
 
       Pair<PagePosition, PagedMessage> msg;
@@ -210,7 +210,6 @@
       LinkedListIterator<Pair<PagePosition, PagedMessage>> iteratorEven = cursorEven.iterator();
 
       LinkedListIterator<Pair<PagePosition, PagedMessage>> iteratorOdd = cursorOdd.iterator();
-      
 
       int key = 0;
       while ((msg = iteratorEven.next()) != null)
@@ -235,7 +234,7 @@
 
       forceGC();
 
-   //   assertTrue(lookupCursorProvider().getCacheSize() < numberOfPages);
+      // assertTrue(lookupCursorProvider().getCacheSize() < numberOfPages);
 
       server.stop();
       createServer();
@@ -271,9 +270,9 @@
       PageCursorProvider cursorProvider = lookupCursorProvider();
 
       PageSubscription cursor = this.server.getPagingManager()
-                                     .getPageStore(ADDRESS)
-                                     .getCursorProvier()
-                                     .getSubscription(queue.getID());
+                                           .getPageStore(ADDRESS)
+                                           .getCursorProvier()
+                                           .getSubscription(queue.getID());
 
       PageCache firstPage = cursorProvider.getPageCache(new PagePositionImpl(server.getPagingManager()
                                                                                    .getPageStore(ADDRESS)
@@ -285,7 +284,7 @@
 
       System.out.println("Cursor: " + cursor);
       cursorProvider.printDebug();
-      
+
       LinkedListIterator<Pair<PagePosition, PagedMessage>> iterator = cursor.iterator();
 
       for (int i = 0; i < 1000; i++)
@@ -301,13 +300,10 @@
          }
       }
       cursorProvider.printDebug();
-      
 
       server.getStorageManager().waitOnOperations();
       lookupPageStore(ADDRESS).flushExecutors();
 
-
-
       // needs to clear the context since we are using the same thread over two distinct servers
       // otherwise we will get the old executor on the factory
       OperationContextImpl.clearContext();
@@ -316,13 +312,10 @@
 
       server.start();
 
-      cursor = this.server.getPagingManager()
-                          .getPageStore(ADDRESS)
-                          .getCursorProvier()
-                          .getSubscription(queue.getID());
+      cursor = this.server.getPagingManager().getPageStore(ADDRESS).getCursorProvier().getSubscription(queue.getID());
 
       iterator = cursor.iterator();
-      
+
       for (int i = firstPageSize; i < NUM_MESSAGES; i++)
       {
          System.out.println("Received " + i);
@@ -337,9 +330,9 @@
       }
 
       OperationContextImpl.getContext(null).waitCompletion();
-      
+
       lookupPageStore(ADDRESS).flushExecutors();
-      
+
       assertFalse(lookupPageStore(ADDRESS).isPaging());
 
       server.stop();
@@ -362,15 +355,10 @@
       PageCursorProvider cursorProvider = this.server.getPagingManager().getPageStore(ADDRESS).getCursorProvier();
       System.out.println("cursorProvider = " + cursorProvider);
 
-      // TODO: We should be using getPersisentCursor here but I can't change the method here until createQueue is not
-      // creating the cursor also
-      // need to change this after some integration
-      // PageCursor cursor =
-      // this.server.getPagingManager().getPageStore(ADDRESS).getCursorProvier().getPersistentCursor(queue.getID());
       PageSubscription cursor = this.server.getPagingManager()
-                                     .getPageStore(ADDRESS)
-                                     .getCursorProvier()
-                                     .getSubscription(queue.getID());
+                                           .getPageStore(ADDRESS)
+                                           .getCursorProvier()
+                                           .getSubscription(queue.getID());
 
       System.out.println("Cursor: " + cursor);
       LinkedListIterator<Pair<PagePosition, PagedMessage>> iterator = cursor.iterator();
@@ -390,10 +378,7 @@
 
       server.start();
 
-      cursor = this.server.getPagingManager()
-                          .getPageStore(ADDRESS)
-                          .getCursorProvier()
-                          .getSubscription(queue.getID());
+      cursor = this.server.getPagingManager().getPageStore(ADDRESS).getCursorProvier().getSubscription(queue.getID());
       iterator = cursor.iterator();
 
       for (int i = 10; i <= 20; i++)
@@ -428,15 +413,10 @@
       PageCursorProvider cursorProvider = this.server.getPagingManager().getPageStore(ADDRESS).getCursorProvier();
       System.out.println("cursorProvider = " + cursorProvider);
 
-      // TODO: We should be using getPersisentCursor here but I can't change the method here until createQueue is not
-      // creating the cursor also
-      // need to change this after some integration
-      // PageCursor cursor =
-      // this.server.getPagingManager().getPageStore(ADDRESS).getCursorProvier().getPersistentCursor(queue.getID());
       PageSubscription cursor = this.server.getPagingManager()
-                                     .getPageStore(ADDRESS)
-                                     .getCursorProvier()
-                                     .getSubscription(queue.getID());
+                                           .getPageStore(ADDRESS)
+                                           .getCursorProvier()
+                                           .getSubscription(queue.getID());
 
       System.out.println("Cursor: " + cursor);
 
@@ -462,10 +442,7 @@
 
       server.start();
 
-      cursor = this.server.getPagingManager()
-                          .getPageStore(ADDRESS)
-                          .getCursorProvier()
-                          .getSubscription(queue.getID());
+      cursor = this.server.getPagingManager().getPageStore(ADDRESS).getCursorProvier().getSubscription(queue.getID());
 
       tx = new TransactionImpl(server.getStorageManager(), 60 * 1000);
       iterator = cursor.iterator();
@@ -506,15 +483,10 @@
       PageCursorProvider cursorProvider = this.server.getPagingManager().getPageStore(ADDRESS).getCursorProvier();
       System.out.println("cursorProvider = " + cursorProvider);
 
-      // TODO: We should be using getPersisentCursor here but I can't change the method here until createQueue is not
-      // creating the cursor also
-      // need to change this after some integration
-      // PageCursor cursor =
-      // this.server.getPagingManager().getPageStore(ADDRESS).getCursorProvier().getPersistentCursor(queue.getID());
       PageSubscription cursor = this.server.getPagingManager()
-                                     .getPageStore(ADDRESS)
-                                     .getCursorProvier()
-                                     .getSubscription(queue.getID());
+                                           .getPageStore(ADDRESS)
+                                           .getCursorProvier()
+                                           .getSubscription(queue.getID());
 
       System.out.println("Cursor: " + cursor);
 
@@ -522,8 +494,8 @@
 
       for (int i = 0; i < NUM_MESSAGES; i++)
       {
-         //if (i % 100 == 0)
-            System.out.println("read/written " + i);
+         // if (i % 100 == 0)
+         System.out.println("read/written " + i);
 
          HornetQBuffer buffer = RandomUtil.randomBuffer(messageSize, i + 1l);
 
@@ -551,10 +523,7 @@
 
       pageStore = lookupPageStore(ADDRESS);
 
-      cursor = this.server.getPagingManager()
-                          .getPageStore(ADDRESS)
-                          .getCursorProvier()
-                          .getSubscription(queue.getID());
+      cursor = this.server.getPagingManager().getPageStore(ADDRESS).getCursorProvier().getSubscription(queue.getID());
       iterator = cursor.iterator();
 
       for (int i = 0; i < NUM_MESSAGES * 2; i++)
@@ -590,10 +559,7 @@
 
       pageStore = lookupPageStore(ADDRESS);
 
-      cursor = this.server.getPagingManager()
-                          .getPageStore(ADDRESS)
-                          .getCursorProvier()
-                          .getSubscription(queue.getID());
+      cursor = this.server.getPagingManager().getPageStore(ADDRESS).getCursorProvier().getSubscription(queue.getID());
       iterator = cursor.iterator();
 
       for (int i = 0; i < NUM_MESSAGES * 3; i++)
@@ -622,26 +588,26 @@
 
          assertEquals(i, readMessage.b.getMessage().getIntProperty("key").intValue());
       }
-      
+
       Pair<PagePosition, PagedMessage> readMessage = iterator.next();
-      
+
       assertEquals(NUM_MESSAGES * 3, readMessage.b.getMessage().getIntProperty("key").intValue());
-      
+
       cursor.ack(readMessage.a);
-      
+
       server.getStorageManager().waitOnOperations();
 
       pageStore.flushExecutors();
-      
+
       assertFalse(pageStore.isPaging());
 
       server.stop();
       createServer();
-      
+
       assertFalse(pageStore.isPaging());
 
       waitCleanup();
-      
+
       assertFalse(lookupPageStore(ADDRESS).isPaging());
 
    }
@@ -654,13 +620,14 @@
    {
       // The cleanup is done asynchronously, so we need to wait some time
       long timeout = System.currentTimeMillis() + 10000;
-      
+
       while (System.currentTimeMillis() < timeout && lookupPageStore(ADDRESS).getNumberOfPages() != 1)
       {
          Thread.sleep(100);
       }
 
-      assertTrue("expected " + lookupPageStore(ADDRESS).getNumberOfPages(), lookupPageStore(ADDRESS).getNumberOfPages() <= 2);
+      assertTrue("expected " + lookupPageStore(ADDRESS).getNumberOfPages(),
+                 lookupPageStore(ADDRESS).getNumberOfPages() <= 2);
    }
 
    public void testPrepareScenarios() throws Exception
@@ -676,15 +643,10 @@
       PageCursorProvider cursorProvider = this.server.getPagingManager().getPageStore(ADDRESS).getCursorProvier();
       System.out.println("cursorProvider = " + cursorProvider);
 
-      // TODO: We should be using getPersisentCursor here but I can't change the method here until createQueue is not
-      // creating the cursor also
-      // need to change this after some integration
-      // PageCursor cursor =
-      // this.server.getPagingManager().getPageStore(ADDRESS).getCursorProvier().getPersistentCursor(queue.getID());
       PageSubscription cursor = this.server.getPagingManager()
-                                     .getPageStore(ADDRESS)
-                                     .getCursorProvier()
-                                     .getSubscription(queue.getID());
+                                           .getPageStore(ADDRESS)
+                                           .getCursorProvier()
+                                           .getSubscription(queue.getID());
       LinkedListIterator<Pair<PagePosition, PagedMessage>> iterator = cursor.iterator();
 
       System.out.println("Cursor: " + cursor);
@@ -759,7 +721,7 @@
 
       PageSubscription cursor = cursorProvider.createSubscription(11, null, false);
       PageSubscriptionImpl cursor2 = (PageSubscriptionImpl)cursorProvider.createSubscription(12, null, false);
-      
+
       queue.getPageSubscription().close();
 
       Pair<PagePosition, PagedMessage> msg;
@@ -785,8 +747,7 @@
       assertSame(cursor2.getProvider(), cursorProvider);
 
       cursor2.close();
-      
-       
+
       lookupPageStore(ADDRESS).flushExecutors();
 
       server.stop();
@@ -796,18 +757,17 @@
 
    }
 
-
-   public void testNoCursors() throws Exception // aki
+   public void testNoCursors() throws Exception
    {
 
       final int NUM_MESSAGES = 100;
 
       int numberOfPages = addMessages(NUM_MESSAGES, 1024 * 1024);
-      
+
       ClientSessionFactory sf = createInVMFactory();
       ClientSession session = sf.createSession();
       session.deleteQueue(ADDRESS);
-      
+
       System.out.println("NumberOfPages = " + numberOfPages);
 
       server.stop();
@@ -831,9 +791,9 @@
       PageCache cache = cursorProvider.getPageCache(new PagePositionImpl(5, 0));
 
       PageSubscription cursor = cursorProvider.createSubscription(2, null, false);
-      
+
       queue.getPageSubscription().close();
-      
+
       PagePosition startingPos = new PagePositionImpl(5, cache.getNumberOfMessages() / 2);
       cursor.bookmark(startingPos);
       PagedMessage msg = cache.getMessage(startingPos.getMessageNr() + 1);
@@ -855,7 +815,7 @@
 
       forceGC();
 
-     // assertTrue(cursorProvider.getCacheSize() < numberOfPages);
+      // assertTrue(cursorProvider.getCacheSize() < numberOfPages);
 
       server.stop();
       createServer();
@@ -876,11 +836,6 @@
 
       PageCache cache = cursorProvider.getPageCache(new PagePositionImpl(5, 0));
 
-      // TODO: We should be using getPersisentCursor here but I can't change the method here until createQueue is not
-      // creating the cursor also
-      // need to change this after some integration
-      // PageCursor cursor =
-      // this.server.getPagingManager().getPageStore(ADDRESS).getCursorProvier().getPersistentCursor(queue.getID());
       PageSubscription cursor = cursorProvider.getSubscription(queue.getID());
       PagePosition startingPos = new PagePositionImpl(5, cache.getNumberOfMessages() / 2);
       cursor.bookmark(startingPos);
@@ -928,7 +883,54 @@
       assertEquals(1, lookupPageStore(ADDRESS).getNumberOfPages());
 
    }
+   
+   private int tstProperty(ServerMessage msg)
+   {
+      return msg.getIntProperty("key").intValue();
+   }
 
+   public void testMultipleIterators() throws Exception
+   {
+
+      final int NUM_MESSAGES = 10;
+
+      int numberOfPages = addMessages(NUM_MESSAGES, 1024 * 1024);
+
+      System.out.println("NumberOfPages = " + numberOfPages);
+
+      PageCursorProvider cursorProvider = lookupCursorProvider();
+
+      PageSubscription cursor = cursorProvider.getSubscription(queue.getID());
+
+      Iterator<Pair<PagePosition, PagedMessage>> iter = cursor.iterator();
+      
+      Iterator<Pair<PagePosition, PagedMessage>> iter2 = cursor.iterator();
+      
+      assertTrue(iter.hasNext());
+      
+      Pair<PagePosition, PagedMessage> msg1 = iter.next();
+      
+      Pair<PagePosition, PagedMessage> msg2 = iter2.next();
+      
+      assertEquals(tstProperty(msg1.b.getMessage()), tstProperty(msg2.b.getMessage()));
+      
+      System.out.println("property = " + tstProperty(msg1.b.getMessage()));
+
+      msg1 = iter.next();
+      
+      assertEquals(1, tstProperty(msg1.b.getMessage()));
+      
+      iter.remove();
+      
+      msg2 = iter2.next();
+      
+      assertEquals(2, tstProperty(msg2.b.getMessage()));
+      
+      assertTrue(iter2.hasNext());
+      
+      
+   }
+
    private int addMessages(final int numMessages, final int messageSize) throws Exception
    {
       return addMessages(0, numMessages, messageSize);
@@ -1014,15 +1016,6 @@
     * @return
     * @throws Exception
     */
-   private PageSubscription createNonPersistentCursor() throws Exception
-   {
-      return lookupCursorProvider().createSubscription(server.getStorageManager().generateUniqueID(), null, false);
-   }
-
-   /**
-    * @return
-    * @throws Exception
-    */
    private PageSubscription createNonPersistentCursor(Filter filter) throws Exception
    {
       return lookupCursorProvider().createSubscription(server.getStorageManager().generateUniqueID(), filter, false);
@@ -1070,6 +1063,8 @@
    protected void tearDown() throws Exception
    {
       server.stop();
+      server = null;
+      queue = null;
       super.tearDown();
    }
 



More information about the hornetq-commits mailing list