[hornetq-commits] JBoss hornetq SVN: r9822 - in branches/Branch_New_Paging: src/main/org/hornetq/core/paging/cursor/impl and 1 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Fri Oct 29 20:06:05 EDT 2010


Author: clebert.suconic at jboss.com
Date: 2010-10-29 20:06:04 -0400 (Fri, 29 Oct 2010)
New Revision: 9822

Modified:
   branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursor.java
   branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorImpl.java
   branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java
Log:
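Add PageCursor.iterator(), rename LocalIterator to CursorIterator (guarding against a null result from moveNext(position)), and switch PageCursorTest from cursor.moveNext() to the new iterator.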

Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursor.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursor.java	2010-10-29 23:28:22 UTC (rev 9821)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursor.java	2010-10-30 00:06:04 UTC (rev 9822)
@@ -16,6 +16,7 @@
 import org.hornetq.api.core.Pair;
 import org.hornetq.core.paging.PagedMessage;
 import org.hornetq.core.transaction.Transaction;
+import org.hornetq.utils.LinkedListIterator;
 
 /**
  * A PageCursor
@@ -37,6 +38,8 @@
    /** It will be 0 if non persistent cursor */
    public long getId();
    
+   public LinkedListIterator<Pair<PagePosition, PagedMessage>> iterator();
+   
    // To be called when the cursor is closed for good. Most likely when the queue is deleted
    void close() throws Exception;
    

Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorImpl.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorImpl.java	2010-10-29 23:28:22 UTC (rev 9821)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorImpl.java	2010-10-30 00:06:04 UTC (rev 9822)
@@ -152,7 +152,7 @@
    }
    
    
-   class LocalIterator implements LinkedListIterator<Pair<PagePosition, PagedMessage>>
+   class CursorIterator implements LinkedListIterator<Pair<PagePosition, PagedMessage>>
    {
       PagePosition position = getLastPosition();
       
@@ -191,7 +191,14 @@
          {
              Pair<PagePosition, PagedMessage> nextPos = moveNext(position);
              lastOperation = position;
-             position = nextPos.a;
+             if (nextPos == null)
+             {
+                position = null;
+             }
+             else
+             {
+                position = nextPos.a;
+             }
              return nextPos;
          }
          catch (Exception e)
@@ -219,8 +226,20 @@
       {
       }
    }
+   
+   
 
    /* (non-Javadoc)
+    * @see org.hornetq.core.paging.cursor.PageCursor#iterator()
+    */
+   public LinkedListIterator<Pair<PagePosition, PagedMessage>> iterator()
+   {
+      return new CursorIterator();
+   }
+
+
+
+   /* (non-Javadoc)
     * @see org.hornetq.core.paging.cursor.PageCursor#moveNext()
     */
    public synchronized Pair<PagePosition, PagedMessage> moveNext(PagePosition posision) throws Exception
@@ -923,5 +942,4 @@
       }
 
    }
-
 }

Modified: branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java
===================================================================
--- branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java	2010-10-29 23:28:22 UTC (rev 9821)
+++ branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java	2010-10-30 00:06:04 UTC (rev 9822)
@@ -46,6 +46,7 @@
 import org.hornetq.core.transaction.impl.TransactionImpl;
 import org.hornetq.tests.util.RandomUtil;
 import org.hornetq.tests.util.ServiceTestBase;
+import org.hornetq.utils.LinkedListIterator;
 
 /**
  * A PageCacheTest
@@ -118,14 +119,24 @@
 
       Pair<PagePosition, PagedMessage> msg;
 
+      LinkedListIterator<Pair<PagePosition, PagedMessage>> iterator = cursor.iterator();
       int key = 0;
-      while ((msg = cursor.moveNext()) != null)
+      while ((msg = iterator.next()) != null)
       {
          assertEquals(key++, msg.b.getMessage().getIntProperty("key").intValue());
          cursor.ack(msg.a);
       }
       assertEquals(NUM_MESSAGES, key);
+      
+      server.getStorageManager().waitOnOperations();
+      
 
+      waitCleanup();
+
+      assertFalse(lookupPageStore(ADDRESS).isPaging());
+
+      assertEquals(1, lookupPageStore(ADDRESS).getNumberOfPages());
+
       forceGC();
 
       assertTrue(lookupCursorProvider().getCacheSize() < numberOfPages);
@@ -194,8 +205,13 @@
 
       Pair<PagePosition, PagedMessage> msg;
 
+      LinkedListIterator<Pair<PagePosition, PagedMessage>> iteratorEven = cursorEven.iterator();
+
+      LinkedListIterator<Pair<PagePosition, PagedMessage>> iteratorOdd = cursorOdd.iterator();
+      
+
       int key = 0;
-      while ((msg = cursorEven.moveNext()) != null)
+      while ((msg = iteratorEven.next()) != null)
       {
          assertEquals(key, msg.b.getMessage().getIntProperty("key").intValue());
          assertTrue(msg.b.getMessage().getBooleanProperty("even").booleanValue());
@@ -205,7 +221,7 @@
       assertEquals(NUM_MESSAGES, key);
 
       key = 1;
-      while ((msg = cursorOdd.moveNext()) != null)
+      while ((msg = iteratorOdd.next()) != null)
       {
          assertEquals(key, msg.b.getMessage().getIntProperty("key").intValue());
          assertFalse(msg.b.getMessage().getBooleanProperty("even").booleanValue());
@@ -271,11 +287,13 @@
 
       System.out.println("Cursor: " + cursor);
       cursorProvider.printDebug();
+      
+      LinkedListIterator<Pair<PagePosition, PagedMessage>> iterator = cursor.iterator();
 
       for (int i = 0; i < 1000; i++)
       {
          System.out.println("Reading Msg : " + i);
-         Pair<PagePosition, PagedMessage> msg = cursor.moveNext();
+         Pair<PagePosition, PagedMessage> msg = iterator.next();
          assertNotNull(msg);
          assertEquals(i, msg.b.getMessage().getIntProperty("key").intValue());
 
@@ -305,10 +323,12 @@
                           .getCursorProvier()
                           .getPersistentCursor(queue.getID());
 
+      iterator = cursor.iterator();
+      
       for (int i = firstPageSize; i < NUM_MESSAGES; i++)
       {
          System.out.println("Received " + i);
-         Pair<PagePosition, PagedMessage> msg = cursor.moveNext();
+         Pair<PagePosition, PagedMessage> msg = iterator.next();
          assertNotNull(msg);
          assertEquals(i, msg.b.getMessage().getIntProperty("key").intValue());
 
@@ -356,9 +376,10 @@
                                      .createPersistentCursor(queue.getID(), null);
 
       System.out.println("Cursor: " + cursor);
+      LinkedListIterator<Pair<PagePosition, PagedMessage>> iterator = cursor.iterator();
       for (int i = 0; i < 100; i++)
       {
-         Pair<PagePosition, PagedMessage> msg = cursor.moveNext();
+         Pair<PagePosition, PagedMessage> msg = iterator.next();
          assertEquals(i, msg.b.getMessage().getIntProperty("key").intValue());
          if (i < 10 || i > 20)
          {
@@ -376,17 +397,18 @@
                           .getPageStore(ADDRESS)
                           .getCursorProvier()
                           .getPersistentCursor(queue.getID());
+      iterator = cursor.iterator();
 
       for (int i = 10; i <= 20; i++)
       {
-         Pair<PagePosition, PagedMessage> msg = cursor.moveNext();
+         Pair<PagePosition, PagedMessage> msg = iterator.next();
          assertEquals(i, msg.b.getMessage().getIntProperty("key").intValue());
          cursor.ack(msg.a);
       }
 
       for (int i = 100; i < NUM_MESSAGES; i++)
       {
-         Pair<PagePosition, PagedMessage> msg = cursor.moveNext();
+         Pair<PagePosition, PagedMessage> msg = iterator.next();
          assertEquals(i, msg.b.getMessage().getIntProperty("key").intValue());
          cursor.ack(msg.a);
       }
@@ -422,9 +444,12 @@
       System.out.println("Cursor: " + cursor);
 
       Transaction tx = new TransactionImpl(server.getStorageManager(), 60 * 1000);
+
+      LinkedListIterator<Pair<PagePosition, PagedMessage>> iterator = cursor.iterator();
+
       for (int i = 0; i < 100; i++)
       {
-         Pair<PagePosition, PagedMessage> msg = cursor.moveNext();
+         Pair<PagePosition, PagedMessage> msg = iterator.next();
          assertEquals(i, msg.b.getMessage().getIntProperty("key").intValue());
          if (i < 10 || i > 20)
          {
@@ -446,17 +471,18 @@
                           .getPersistentCursor(queue.getID());
 
       tx = new TransactionImpl(server.getStorageManager(), 60 * 1000);
+      iterator = cursor.iterator();
 
       for (int i = 10; i <= 20; i++)
       {
-         Pair<PagePosition, PagedMessage> msg = cursor.moveNext();
+         Pair<PagePosition, PagedMessage> msg = iterator.next();
          assertEquals(i, msg.b.getMessage().getIntProperty("key").intValue());
          cursor.ackTx(tx, msg.a);
       }
 
       for (int i = 100; i < NUM_MESSAGES; i++)
       {
-         Pair<PagePosition, PagedMessage> msg = cursor.moveNext();
+         Pair<PagePosition, PagedMessage> msg = iterator.next();
          assertEquals(i, msg.b.getMessage().getIntProperty("key").intValue());
          cursor.ackTx(tx, msg.a);
       }
@@ -495,6 +521,8 @@
 
       System.out.println("Cursor: " + cursor);
 
+      LinkedListIterator<Pair<PagePosition, PagedMessage>> iterator = cursor.iterator();
+
       for (int i = 0; i < NUM_MESSAGES; i++)
       {
          if (i % 100 == 0)
@@ -509,13 +537,13 @@
 
          Assert.assertTrue(pageStore.page(msg));
 
-         Pair<PagePosition, PagedMessage> readMessage = cursor.moveNext();
+         Pair<PagePosition, PagedMessage> readMessage = iterator.next();
 
          assertNotNull(readMessage);
 
          assertEquals(i, readMessage.b.getMessage().getIntProperty("key").intValue());
 
-         assertNull(cursor.moveNext());
+         assertNull(iterator.next());
       }
 
       server.stop();
@@ -530,6 +558,7 @@
                           .getPageStore(ADDRESS)
                           .getCursorProvier()
                           .getPersistentCursor(queue.getID());
+      iterator = cursor.iterator();
 
       for (int i = 0; i < NUM_MESSAGES * 2; i++)
       {
@@ -549,7 +578,7 @@
             Assert.assertTrue(pageStore.page(msg));
          }
 
-         Pair<PagePosition, PagedMessage> readMessage = cursor.moveNext();
+         Pair<PagePosition, PagedMessage> readMessage = iterator.next();
 
          assertNotNull(readMessage);
 
@@ -568,6 +597,7 @@
                           .getPageStore(ADDRESS)
                           .getCursorProvier()
                           .getPersistentCursor(queue.getID());
+      iterator = cursor.iterator();
 
       for (int i = 0; i < NUM_MESSAGES * 3; i++)
       {
@@ -587,7 +617,7 @@
             Assert.assertTrue(pageStore.page(msg));
          }
 
-         Pair<PagePosition, PagedMessage> readMessage = cursor.moveNext();
+         Pair<PagePosition, PagedMessage> readMessage = iterator.next();
 
          assertNotNull(readMessage);
 
@@ -596,7 +626,7 @@
          assertEquals(i, readMessage.b.getMessage().getIntProperty("key").intValue());
       }
       
-      Pair<PagePosition, PagedMessage> readMessage = cursor.moveNext();
+      Pair<PagePosition, PagedMessage> readMessage = iterator.next();
       
       assertEquals(NUM_MESSAGES * 3, readMessage.b.getMessage().getIntProperty("key").intValue());
       
@@ -658,6 +688,7 @@
                                      .getPageStore(ADDRESS)
                                      .getCursorProvier()
                                      .createPersistentCursor(queue.getID(), null);
+      LinkedListIterator<Pair<PagePosition, PagedMessage>> iterator = cursor.iterator();
 
       System.out.println("Cursor: " + cursor);
 
@@ -686,13 +717,13 @@
       // First consume what's already there without any tx as nothing was committed
       for (int i = 300; i < 400; i++)
       {
-         Pair<PagePosition, PagedMessage> pos = cursor.moveNext();
+         Pair<PagePosition, PagedMessage> pos = iterator.next();
          assertNotNull("Null at position " + i, pos);
          assertEquals(i, pos.b.getMessage().getIntProperty("key").intValue());
          cursor.ack(pos.a);
       }
 
-      assertNull(cursor.moveNext());
+      assertNull(iterator.next());
 
       cursor.printDebug();
       pgtxRollback.rollback();
@@ -703,13 +734,13 @@
       // Second:after pgtxCommit was done
       for (int i = 200; i < 300; i++)
       {
-         Pair<PagePosition, PagedMessage> pos = cursor.moveNext();
+         Pair<PagePosition, PagedMessage> pos = iterator.next();
          assertNotNull(pos);
          assertEquals(i, pos.b.getMessage().getIntProperty("key").intValue());
          cursor.ack(pos.a);
       }
 
-      assertNull(cursor.moveNext());
+      assertNull(iterator.next());
 
       server.stop();
       createServer();
@@ -733,9 +764,11 @@
       PageCursorImpl cursor2 = (PageCursorImpl)cursorProvider.createNonPersistentCursor(null);
 
       Pair<PagePosition, PagedMessage> msg;
+      LinkedListIterator<Pair<PagePosition, PagedMessage>> iterator = cursor.iterator();
+      LinkedListIterator<Pair<PagePosition, PagedMessage>> iterator2 = cursor2.iterator();
 
       int key = 0;
-      while ((msg = cursor.moveNext()) != null)
+      while ((msg = iterator.next()) != null)
       {
          assertEquals(key++, msg.b.getMessage().getIntProperty("key").intValue());
          cursor.ack(msg.a);
@@ -748,7 +781,7 @@
 
       for (int i = 0; i < 10; i++)
       {
-         msg = cursor2.moveNext();
+         msg = iterator2.next();
          assertEquals(i, msg.b.getMessage().getIntProperty("key").intValue());
       }
 
@@ -791,9 +824,10 @@
       msg = null;
 
       cache = null;
+      LinkedListIterator<Pair<PagePosition, PagedMessage>> iterator = cursor.iterator();
 
       Pair<PagePosition, PagedMessage> msgCursor = null;
-      while ((msgCursor = cursor.moveNext()) != null)
+      while ((msgCursor = iterator.next()) != null)
       {
          assertEquals(key++, msgCursor.b.getMessage().getIntProperty("key").intValue());
          cursor.ack(msgCursor.a);
@@ -840,8 +874,10 @@
 
       cache = null;
 
+      LinkedListIterator<Pair<PagePosition, PagedMessage>> iterator = cursor.iterator();
+
       Pair<PagePosition, PagedMessage> msgCursor = null;
-      while ((msgCursor = cursor.moveNext()) != null)
+      while ((msgCursor = iterator.next()) != null)
       {
          assertEquals(key++, msgCursor.b.getMessage().getIntProperty("key").intValue());
       }
@@ -856,7 +892,8 @@
       cursorProvider = lookupCursorProvider();
       cursor = cursorProvider.getPersistentCursor(queue.getID());
       key = initialKey;
-      while ((msgCursor = cursor.moveNext()) != null)
+      iterator = cursor.iterator();
+      while ((msgCursor = iterator.next()) != null)
       {
          assertEquals(key++, msgCursor.b.getMessage().getIntProperty("key").intValue());
          cursor.ack(msgCursor.a);



More information about the hornetq-commits mailing list