Author: clebert.suconic(a)jboss.com
Date: 2010-10-05 13:33:33 -0400 (Tue, 05 Oct 2010)
New Revision: 9753
Modified:
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/PagingManager.java
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/PagingStore.java
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursor.java
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursorProvider.java
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCacheImpl.java
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorImpl.java
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PagingManagerImpl.java
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java
branches/Branch_New_Paging/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
branches/Branch_New_Paging/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
branches/Branch_New_Paging/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java
branches/Branch_New_Paging/tests/src/org/hornetq/tests/unit/core/postoffice/impl/DuplicateDetectionUnitTest.java
Log:
Reload Cursors
Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/PagingManager.java
===================================================================
---
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/PagingManager.java 2010-10-05
17:26:00 UTC (rev 9752)
+++
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/PagingManager.java 2010-10-05
17:33:33 UTC (rev 9753)
@@ -81,4 +81,6 @@
SimpleString[] getStoreNames();
void deletePageStore(SimpleString storeName) throws Exception;
+
+ void processReload();
}
Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/PagingStore.java
===================================================================
---
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/PagingStore.java 2010-10-05
17:26:00 UTC (rev 9752)
+++
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/PagingStore.java 2010-10-05
17:33:33 UTC (rev 9753)
@@ -63,6 +63,8 @@
Page createPage(final int page) throws Exception;
PageCursorProvider getCursorProvier();
+
+ void processReload();
/**
* @return false if a thread was already started, or if not in page mode
Modified:
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursor.java
===================================================================
---
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursor.java 2010-10-05
17:26:00 UTC (rev 9752)
+++
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursor.java 2010-10-05
17:33:33 UTC (rev 9753)
@@ -15,6 +15,7 @@
import org.hornetq.api.core.Pair;
import org.hornetq.core.server.ServerMessage;
+import org.hornetq.core.transaction.Transaction;
/**
* A PageCursor
@@ -26,28 +27,32 @@
public interface PageCursor
{
+ // Cursor query operations --------------------------------------
+
Pair<PagePosition, ServerMessage> moveNext() throws Exception;
- PagePosition getFirstPosition();
-
void ack(PagePosition position) throws Exception;
- void ackTx(long tx, PagePosition position) throws Exception;
+ void ackTx(Transaction tx, PagePosition position) throws Exception;
+ // Reload operations
+
/**
* @param position
*/
- void recoverACK(PagePosition position);
+ void reloadACK(PagePosition position);
/**
* To be used to avoid a redelivery of a prepared ACK after load
* @param position
*/
- void recoverPreparedACK(PagePosition position);
+ void reloadPreparedACK(Transaction tx, PagePosition position);
+
+ void processReload();
/**
* To be used on redeliveries
* @param position
*/
- void returnElement(PagePosition position);
+ void redeliver(PagePosition position);
}
Modified:
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursorProvider.java
===================================================================
---
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursorProvider.java 2010-10-05
17:26:00 UTC (rev 9752)
+++
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursorProvider.java 2010-10-05
17:33:33 UTC (rev 9753)
@@ -58,6 +58,8 @@
Pair<PagePosition, ServerMessage> getAfter(PagePosition pos) throws Exception;
+ void processReload();
+
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
Modified:
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCacheImpl.java
===================================================================
---
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCacheImpl.java 2010-10-05
17:26:00 UTC (rev 9752)
+++
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCacheImpl.java 2010-10-05
17:33:33 UTC (rev 9753)
@@ -58,7 +58,7 @@
*/
public Page getPage()
{
- return null;
+ return page;
}
/* (non-Javadoc)
Modified:
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorImpl.java
===================================================================
---
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorImpl.java 2010-10-05
17:26:00 UTC (rev 9752)
+++
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorImpl.java 2010-10-05
17:33:33 UTC (rev 9753)
@@ -13,6 +13,10 @@
package org.hornetq.core.paging.cursor.impl;
+import java.util.Collections;
+import java.util.LinkedList;
+import java.util.List;
+
import org.hornetq.api.core.Pair;
import org.hornetq.core.paging.PagingStore;
import org.hornetq.core.paging.cursor.PageCursor;
@@ -20,6 +24,7 @@
import org.hornetq.core.paging.cursor.PagePosition;
import org.hornetq.core.persistence.StorageManager;
import org.hornetq.core.server.ServerMessage;
+import org.hornetq.core.transaction.Transaction;
/**
* A PageCursorImpl
@@ -44,6 +49,8 @@
private final PageCursorProvider cursorProvider;
private volatile PagePosition lastPosition;
+
+ private List<PagePosition> recoveredACK;
// Static --------------------------------------------------------
@@ -94,64 +101,93 @@
store.storeCursorAcknowledge(cursorId, position);
}
- public void ackTx(final long tx, final PagePosition position) throws Exception
+ public void ackTx(final Transaction tx, final PagePosition position) throws Exception
{
- store.storeCursorAcknowledgeTransactional(tx, cursorId, position);
+ store.storeCursorAcknowledgeTransactional(tx.getID(), cursorId, position);
+ installTXCallback(tx, position);
+ // tx.afterCommit()
}
/* (non-Javadoc)
* @see
org.hornetq.core.paging.cursor.PageCursor#returnElement(org.hornetq.core.paging.cursor.PagePosition)
*/
- public void returnElement(final PagePosition position)
+ public void redeliver(final PagePosition position)
{
// TODO Auto-generated method stub
}
+
+ /**
+ * Theres no need to synchronize this method as it's only called from journal load
on startup
+ */
+ public void reloadACK(final PagePosition position)
+ {
+ internalAdd(position);
+
+ }
+
/* (non-Javadoc)
- * @see org.hornetq.core.paging.cursor.PageCursor#getFirstPosition()
+ * @see
org.hornetq.core.paging.cursor.PageCursor#recoverPreparedACK(org.hornetq.core.paging.cursor.PagePosition)
*/
- public PagePosition getFirstPosition()
+ public void reloadPreparedACK(final Transaction tx, final PagePosition position)
{
- // TODO Auto-generated method stub
- return null;
+ internalAdd(position);
+ installTXCallback(tx, position);
}
+ public void processReload()
+ {
+ if (this.recoveredACK != null)
+ {
+ Collections.sort(recoveredACK);
+ }
+ }
+
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
protected boolean match(final ServerMessage message)
{
+ // To be used with expressions
return true;
}
+
+
// Private -------------------------------------------------------
+
+ /**
+ * @param committedACK
+ */
+ private void internalAdd(final PagePosition committedACK)
+ {
+ if (recoveredACK == null)
+ {
+ recoveredACK = new LinkedList<PagePosition>();
+ }
+
+ recoveredACK.add(committedACK);
+ }
+
private PagePosition recoverLastPosition()
{
long firstPage = pageStore.getFirstPage();
return new PagePositionImpl(firstPage, -1);
}
+
- /* (non-Javadoc)
- * @see
org.hornetq.core.paging.cursor.PageCursor#recoverACK(org.hornetq.core.paging.cursor.PagePosition)
+ /**
+ * @param tx
+ * @param position
*/
- public void recoverACK(final PagePosition position)
+ private void installTXCallback(Transaction tx, PagePosition position)
{
- // TODO Auto-generated method stub
+ }
- }
- /* (non-Javadoc)
- * @see
org.hornetq.core.paging.cursor.PageCursor#recoverPreparedACK(org.hornetq.core.paging.cursor.PagePosition)
- */
- public void recoverPreparedACK(final PagePosition position)
- {
- // TODO Auto-generated method stub
-
- }
-
// Inner classes -------------------------------------------------
}
Modified:
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java
===================================================================
---
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java 2010-10-05
17:26:00 UTC (rev 9752)
+++
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java 2010-10-05
17:33:33 UTC (rev 9753)
@@ -94,6 +94,8 @@
*/
public Pair<PagePosition, ServerMessage> getAfter(final PagePosition pos) throws
Exception
{
+ // TODO: consider page transactions here to avoid receiving an uncommitted message
+ // TODO: consider the case where a page came empty because of an ignored PageTX
PagePosition retPos = pos.nextMessage();
PageCache cache = getPageCache(pos.getPageNr());
@@ -179,6 +181,14 @@
return softCache.size();
}
+ public void processReload()
+ {
+ for (PageCursor cursor : this.activeCursors.values())
+ {
+ cursor.processReload();
+ }
+ }
+
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
Modified:
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PagingManagerImpl.java
===================================================================
---
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PagingManagerImpl.java 2010-10-05
17:26:00 UTC (rev 9752)
+++
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PagingManagerImpl.java 2010-10-05
17:33:33 UTC (rev 9753)
@@ -228,7 +228,16 @@
}
}
}
+
+ public void processReload()
+ {
+ for (PagingStore store: stores.values())
+ {
+ store.processReload();
+ }
+ }
+
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
Modified:
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java
===================================================================
---
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java 2010-10-05
17:26:00 UTC (rev 9752)
+++
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java 2010-10-05
17:33:33 UTC (rev 9753)
@@ -355,7 +355,14 @@
currentPageLock.readLock().unlock();
}
}
+
+ public void processReload()
+ {
+ cursorProvider.processReload();
+ }
+
+
// HornetQComponent implementation
public synchronized boolean isStarted()
Modified:
branches/Branch_New_Paging/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
---
branches/Branch_New_Paging/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2010-10-05
17:26:00 UTC (rev 9752)
+++
branches/Branch_New_Paging/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2010-10-05
17:33:33 UTC (rev 9753)
@@ -1019,11 +1019,11 @@
SimpleString address = queueInfo.getAddress();
PagingStore store = pagingManager.getPageStore(address);
PageCursor cursor =
store.getCursorProvier().getCursor(encoding.queueID);
- cursor.recoverACK(encoding.position);
+ cursor.reloadACK(encoding.position);
}
else
{
- log.warn("Can't find queue " + queueInfo.getId() + "
while reloading ACKNOWLEDGE_CURSOR");
+ log.warn("Can't find queue " + encoding.queueID + "
while reloading ACKNOWLEDGE_CURSOR");
}
break;
Modified:
branches/Branch_New_Paging/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
---
branches/Branch_New_Paging/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2010-10-05
17:26:00 UTC (rev 9752)
+++
branches/Branch_New_Paging/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2010-10-05
17:33:33 UTC (rev 9753)
@@ -1114,6 +1114,8 @@
deploymentManager.start();
}
+ pagingManager.reloadStores();
+
pagingManager.resumeDepages();
final ServerInfo dumper = new ServerInfo(this, pagingManager);
Modified:
branches/Branch_New_Paging/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java
===================================================================
---
branches/Branch_New_Paging/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java 2010-10-05
17:26:00 UTC (rev 9752)
+++
branches/Branch_New_Paging/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java 2010-10-05
17:33:33 UTC (rev 9753)
@@ -1027,6 +1027,15 @@
return null;
}
+ /* (non-Javadoc)
+ * @see org.hornetq.core.paging.PagingManager#processReload()
+ */
+ public void processReload()
+ {
+ // TODO Auto-generated method stub
+
+ }
+
}
class FakeStorageManager implements StorageManager
Modified:
branches/Branch_New_Paging/tests/src/org/hornetq/tests/unit/core/postoffice/impl/DuplicateDetectionUnitTest.java
===================================================================
---
branches/Branch_New_Paging/tests/src/org/hornetq/tests/unit/core/postoffice/impl/DuplicateDetectionUnitTest.java 2010-10-05
17:26:00 UTC (rev 9752)
+++
branches/Branch_New_Paging/tests/src/org/hornetq/tests/unit/core/postoffice/impl/DuplicateDetectionUnitTest.java 2010-10-05
17:33:33 UTC (rev 9753)
@@ -323,6 +323,16 @@
return null;
}
+
+
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.paging.PagingManager#processReload()
+ */
+ public void processReload()
+ {
+ }
+
}
}