Author: clebert.suconic(a)jboss.com
Date: 2010-11-05 20:03:12 -0400 (Fri, 05 Nov 2010)
New Revision: 9851
Modified:
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/LivePageCacheImpl.java
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PageTransactionInfoImpl.java
branches/Branch_New_Paging/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java
Log:
a few fixes
Modified:
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/LivePageCacheImpl.java
===================================================================
---
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/LivePageCacheImpl.java 2010-11-05
16:00:14 UTC (rev 9850)
+++
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/LivePageCacheImpl.java 2010-11-06
00:03:12 UTC (rev 9851)
@@ -19,7 +19,6 @@
import org.hornetq.core.paging.Page;
import org.hornetq.core.paging.PagedMessage;
import org.hornetq.core.paging.cursor.LivePageCache;
-import org.hornetq.core.server.ServerMessage;
/**
* This is the same as PageCache, however this is for the page that's being currently written.
Modified:
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java
===================================================================
---
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java 2010-11-05
16:00:14 UTC (rev 9850)
+++
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java 2010-11-06
00:03:12 UTC (rev 9851)
@@ -322,6 +322,11 @@
{
return;
}
+
+ if (pagingStore.getNumberOfPages() == 0)
+ {
+ return;
+ }
ArrayList<PageSubscription> cursorList = new ArrayList<PageSubscription>();
cursorList.addAll(activeCursors.values());
@@ -498,7 +503,6 @@
List<PagedMessage> pgdMessages = page.read();
- int i = 0;
for (PagedMessage pdgMessage : pgdMessages)
{
pdgMessage.initMessage(storageManager);
Modified:
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PageTransactionInfoImpl.java
===================================================================
---
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PageTransactionInfoImpl.java 2010-11-05
16:00:14 UTC (rev 9850)
+++
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PageTransactionInfoImpl.java 2010-11-06
00:03:12 UTC (rev 9851)
@@ -145,8 +145,8 @@
{
pos.a.redeliver(pos.b);
}
+ lateDeliveries.clear();
}
- lateDeliveries.clear();
lateDeliveries = null;
}
Modified:
branches/Branch_New_Paging/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
===================================================================
---
branches/Branch_New_Paging/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java 2010-11-05
16:00:14 UTC (rev 9850)
+++
branches/Branch_New_Paging/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java 2010-11-06
00:03:12 UTC (rev 9851)
@@ -619,7 +619,7 @@
boolean depage = tx.getProperty(TransactionPropertyIndexes.IS_DEPAGE) != null;
- // if the TX paged at least one message on a give address, all the other message on the same address should also go towards
+ // if the TX paged at least one message on a given address, all the other message on the same address should also go towards
// paging cache now
boolean alreadyPaging = false;
Modified:
branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java
===================================================================
---
branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java 2010-11-05
16:00:14 UTC (rev 9850)
+++
branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java 2010-11-06
00:03:12 UTC (rev 9851)
@@ -712,6 +712,72 @@
}
+
+ public void testLazyCommit() throws Exception
+ {
+ PagingStoreImpl pageStore = lookupPageStore(ADDRESS);
+
+ pageStore.startPaging();
+
+ final int NUM_MESSAGES = 100;
+
+ final int messageSize = 100 * 1024;
+
+ PageCursorProvider cursorProvider = this.server.getPagingManager().getPageStore(ADDRESS).getCursorProvier();
+ System.out.println("cursorProvider = " + cursorProvider);
+
+ PageSubscription cursor = this.server.getPagingManager()
+ .getPageStore(ADDRESS)
+ .getCursorProvier()
+ .getSubscription(queue.getID());
+ LinkedListIterator<PagedReference> iterator = cursor.iterator();
+
+ System.out.println("Cursor: " + cursor);
+
+ StorageManager storage = this.server.getStorageManager();
+
+ PageTransactionInfoImpl txLazy = new PageTransactionInfoImpl(storage.generateUniqueID());
+
+ server.getPagingManager().addTransaction(txLazy);
+
+ pgMessages(storage, pageStore, txLazy, 0, NUM_MESSAGES, messageSize);
+
+ addMessages(100, NUM_MESSAGES, messageSize);
+
+ System.out.println("Number of pages - " + pageStore.getNumberOfPages());
+
+ // First consume what's already there without any tx as nothing was committed
+ for (int i = 100; i < 200; i++)
+ {
+ PagedReference pos = iterator.next();
+ assertNotNull("Null at position " + i, pos);
+ assertEquals(i, pos.getMessage().getIntProperty("key").intValue());
+ cursor.ack(pos);
+ }
+
+ assertNull(iterator.next());
+
+ txLazy.commit();
+
+ for (int i = 0; i < 100; i++)
+ {
+ PagedReference pos = iterator.next();
+ assertNotNull("Null at position " + i, pos);
+ assertEquals(i, pos.getMessage().getIntProperty("key").intValue());
+ cursor.ack(pos);
+ }
+
+ assertNull(iterator.next());
+
+ waitCleanup();
+
+ server.stop();
+ createServer();
+ waitCleanup();
+ assertEquals(1, lookupPageStore(ADDRESS).getNumberOfPages());
+
+ }
+
public void testCloseNonPersistentConsumer() throws Exception
{