[hornetq-commits] JBoss hornetq SVN: r9851 - in branches/Branch_New_Paging: src/main/org/hornetq/core/paging/impl and 2 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Fri Nov 5 20:03:13 EDT 2010


Author: clebert.suconic at jboss.com
Date: 2010-11-05 20:03:12 -0400 (Fri, 05 Nov 2010)
New Revision: 9851

Modified:
   branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/LivePageCacheImpl.java
   branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java
   branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PageTransactionInfoImpl.java
   branches/Branch_New_Paging/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
   branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java
Log:
a few fixes

Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/LivePageCacheImpl.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/LivePageCacheImpl.java	2010-11-05 16:00:14 UTC (rev 9850)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/LivePageCacheImpl.java	2010-11-06 00:03:12 UTC (rev 9851)
@@ -19,7 +19,6 @@
 import org.hornetq.core.paging.Page;
 import org.hornetq.core.paging.PagedMessage;
 import org.hornetq.core.paging.cursor.LivePageCache;
-import org.hornetq.core.server.ServerMessage;
 
 /**
  * This is the same as PageCache, however this is for the page that's being currently written.

Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java	2010-11-05 16:00:14 UTC (rev 9850)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java	2010-11-06 00:03:12 UTC (rev 9851)
@@ -322,6 +322,11 @@
             {
                return;
             }
+            
+            if (pagingStore.getNumberOfPages() == 0)
+            {
+               return;
+            }
 
             ArrayList<PageSubscription> cursorList = new ArrayList<PageSubscription>();
             cursorList.addAll(activeCursors.values());
@@ -498,7 +503,6 @@
 
                List<PagedMessage> pgdMessages = page.read();
 
-               int i = 0;
                for (PagedMessage pdgMessage : pgdMessages)
                {
                   pdgMessage.initMessage(storageManager);

Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PageTransactionInfoImpl.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PageTransactionInfoImpl.java	2010-11-05 16:00:14 UTC (rev 9850)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PageTransactionInfoImpl.java	2010-11-06 00:03:12 UTC (rev 9851)
@@ -145,8 +145,8 @@
          {
             pos.a.redeliver(pos.b);
          }
+         lateDeliveries.clear();
       }
-      lateDeliveries.clear();
       lateDeliveries = null;
    }
 

Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java	2010-11-05 16:00:14 UTC (rev 9850)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java	2010-11-06 00:03:12 UTC (rev 9851)
@@ -619,7 +619,7 @@
 
          boolean depage = tx.getProperty(TransactionPropertyIndexes.IS_DEPAGE) != null;
 
-         // if the TX paged at least one message on a give address, all the other message on the same address should also go towards
+         // if the TX paged at least one message on a given address, all the other messages on the same address should also go towards
          // paging cache now
          boolean alreadyPaging = false;
 

Modified: branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java
===================================================================
--- branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java	2010-11-05 16:00:14 UTC (rev 9850)
+++ branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java	2010-11-06 00:03:12 UTC (rev 9851)
@@ -712,6 +712,72 @@
 
    }
 
+
+   public void testLazyCommit() throws Exception
+   {
+      PagingStoreImpl pageStore = lookupPageStore(ADDRESS);
+
+      pageStore.startPaging();
+
+      final int NUM_MESSAGES = 100;
+
+      final int messageSize = 100 * 1024;
+
+      PageCursorProvider cursorProvider = this.server.getPagingManager().getPageStore(ADDRESS).getCursorProvier();
+      System.out.println("cursorProvider = " + cursorProvider);
+
+      PageSubscription cursor = this.server.getPagingManager()
+                                           .getPageStore(ADDRESS)
+                                           .getCursorProvier()
+                                           .getSubscription(queue.getID());
+      LinkedListIterator<PagedReference> iterator = cursor.iterator();
+
+      System.out.println("Cursor: " + cursor);
+
+      StorageManager storage = this.server.getStorageManager();
+
+      PageTransactionInfoImpl txLazy = new PageTransactionInfoImpl(storage.generateUniqueID());
+      
+      server.getPagingManager().addTransaction(txLazy);
+
+      pgMessages(storage, pageStore, txLazy, 0, NUM_MESSAGES, messageSize);
+
+      addMessages(100, NUM_MESSAGES, messageSize);
+
+      System.out.println("Number of pages - " + pageStore.getNumberOfPages());
+
+      // First consume what's already there without any tx as nothing was committed
+      for (int i = 100; i < 200; i++)
+      {
+         PagedReference pos = iterator.next();
+         assertNotNull("Null at position " + i, pos);
+         assertEquals(i, pos.getMessage().getIntProperty("key").intValue());
+         cursor.ack(pos);
+      }
+
+      assertNull(iterator.next());
+      
+      txLazy.commit();
+
+      for (int i = 0; i < 100; i++)
+      {
+         PagedReference pos = iterator.next();
+         assertNotNull("Null at position " + i, pos);
+         assertEquals(i, pos.getMessage().getIntProperty("key").intValue());
+         cursor.ack(pos);
+      }
+
+      assertNull(iterator.next());
+
+      waitCleanup();
+
+      server.stop();
+      createServer();
+      waitCleanup();
+      assertEquals(1, lookupPageStore(ADDRESS).getNumberOfPages());
+
+   }
+
    public void testCloseNonPersistentConsumer() throws Exception
    {
 



More information about the hornetq-commits mailing list