Author: clebert.suconic(a)jboss.com
Date: 2010-10-11 17:16:17 -0400 (Mon, 11 Oct 2010)
New Revision: 9769
Modified:
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/PagingStore.java
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCache.java
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursorProvider.java
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PagePosition.java
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorImpl.java
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PagePositionImpl.java
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PageImpl.java
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PagingStoreFactoryNIO.java
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java
branches/Branch_New_Paging/src/main/org/hornetq/core/transaction/TransactionPropertyIndexes.java
branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/client/PagingTest.java
branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCrashTest.java
branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java
branches/Branch_New_Paging/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java
Log:
just backing up the current state of my work
Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/PagingStore.java
===================================================================
---
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/PagingStore.java 2010-10-11
13:42:51 UTC (rev 9768)
+++
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/PagingStore.java 2010-10-11
21:16:17 UTC (rev 9769)
@@ -37,6 +37,9 @@
SimpleString getAddress();
int getNumberOfPages();
+
+ // The page the system is currently writing to
+ int getCurrentWritingPage();
SimpleString getStoreName();
Modified:
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCache.java
===================================================================
---
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCache.java 2010-10-11
13:42:51 UTC (rev 9768)
+++
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCache.java 2010-10-11
21:16:17 UTC (rev 9769)
@@ -28,6 +28,8 @@
Page getPage();
int getNumberOfMessages();
+
+ void setMessages(ServerMessage[] messages);
/**
*
Modified:
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursorProvider.java
===================================================================
---
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursorProvider.java 2010-10-11
13:42:51 UTC (rev 9768)
+++
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursorProvider.java 2010-10-11
21:16:17 UTC (rev 9769)
@@ -14,6 +14,7 @@
package org.hornetq.core.paging.cursor;
import org.hornetq.api.core.Pair;
+import org.hornetq.core.paging.Page;
import org.hornetq.core.paging.PagingStore;
import org.hornetq.core.server.ServerMessage;
@@ -37,7 +38,7 @@
// Public --------------------------------------------------------
- PageCache getPageCache(long pageId) throws Exception;
+ PageCache getPageCache(PagePosition pos);
PagingStore getAssociatedStore();
Modified:
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PagePosition.java
===================================================================
---
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PagePosition.java 2010-10-11
13:42:51 UTC (rev 9768)
+++
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PagePosition.java 2010-10-11
21:16:17 UTC (rev 9769)
@@ -14,6 +14,7 @@
package org.hornetq.core.paging.cursor;
+
/**
* A PagePosition
*
@@ -33,6 +34,15 @@
long getPageNr();
int getMessageNr();
+
+ void setPageCache(PageCache pageCache);
+
+ /**
+ * PagePosition holds the page cache through a weak reference,
+ * so this may eventually return null in case the soft cache was released.
+ * @return
+ */
+ PageCache getPageCache();
PagePosition nextMessage();
Modified:
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorImpl.java
===================================================================
---
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorImpl.java 2010-10-11
13:42:51 UTC (rev 9768)
+++
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorImpl.java 2010-10-11
21:16:17 UTC (rev 9769)
@@ -14,20 +14,29 @@
package org.hornetq.core.paging.cursor.impl;
import java.util.Collections;
-import java.util.Deque;
+import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
+import java.util.Map;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.Map.Entry;
import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.Executor;
+import java.util.concurrent.atomic.AtomicInteger;
import org.hornetq.api.core.Pair;
+import org.hornetq.core.paging.Page;
import org.hornetq.core.paging.PagingStore;
+import org.hornetq.core.paging.cursor.PageCache;
import org.hornetq.core.paging.cursor.PageCursor;
import org.hornetq.core.paging.cursor.PageCursorProvider;
import org.hornetq.core.paging.cursor.PagePosition;
import org.hornetq.core.persistence.StorageManager;
-import org.hornetq.core.server.MessageReference;
import org.hornetq.core.server.ServerMessage;
import org.hornetq.core.transaction.Transaction;
+import org.hornetq.core.transaction.TransactionOperation;
+import org.hornetq.core.transaction.TransactionPropertyIndexes;
/**
* A PageCursorImpl
@@ -50,11 +59,15 @@
private final PagingStore pageStore;
private final PageCursorProvider cursorProvider;
+
+ private final Executor executor;
private volatile PagePosition lastPosition;
private List<PagePosition> recoveredACK;
+ private SortedMap<Long, PageCursorInfo> consumedPages =
Collections.synchronizedSortedMap(new TreeMap<Long, PageCursorInfo>());
+
// We only store the position for redeliveries. They will be read from the SoftCache
again during delivery.
private final ConcurrentLinkedQueue<PagePosition> redeliveries = new
ConcurrentLinkedQueue<PagePosition>();
@@ -65,12 +78,14 @@
public PageCursorImpl(final PageCursorProvider cursorProvider,
final PagingStore pageStore,
final StorageManager store,
+ final Executor executor,
final long cursorId)
{
this.pageStore = pageStore;
this.store = store;
this.cursorProvider = cursorProvider;
this.cursorId = cursorId;
+ this.executor = executor;
}
// Public --------------------------------------------------------
@@ -98,19 +113,23 @@
boolean match = false;
Pair<PagePosition, ServerMessage> message = null;
+
do
{
message = cursorProvider.getAfter(lastPosition);
+
if (message != null)
{
lastPosition = message.a;
- }
- match = match(message.b);
- if (!match)
- {
- ignored(message.a);
+ match = match(message.b);
+
+ if (!match)
+ {
+ confirmPagePosition(message.a);
+ }
}
+
}
while (message != null && !match);
@@ -130,11 +149,6 @@
store.storeCursorAcknowledgeTransactional(tx.getID(), cursorId, position);
installTXCallback(tx, position);
- // It needs to persist, otherwise the cursor will return to the fist page position
- tx.setContainsPersistent();
-
-
- // tx.afterCommit()
}
/* (non-Javadoc)
@@ -173,6 +187,11 @@
PagePosition previousPos = null;
for (PagePosition pos : recoveredACK)
{
+ PageCursorInfo positions = getPageInfo(pos);
+
+ positions.confirmed.incrementAndGet();
+ positions.acks.add(pos);
+
lastPosition = pos;
if (previousPos != null)
{
@@ -183,6 +202,9 @@
while (true)
{
Pair<PagePosition, ServerMessage> msgCheck =
cursorProvider.getAfter(tmpPos);
+
+ positions = getPageInfo(tmpPos);
+
// end of the hole, we can finish processing here
// It may be also that the next was just a next page, so we just
ignore it
if (msgCheck == null || msgCheck.a.equals(pos))
@@ -195,6 +217,12 @@
{
redeliver(msgCheck.a);
}
+ else
+ {
+ // The reference was ignored, but it must still be counted as confirmed on its page;
+ // otherwise the page will never be deleted, hence we would never leave paging even if everything was consumed
+ positions.confirmed.incrementAndGet();
+ }
}
tmpPos = msgCheck.a;
}
@@ -210,9 +238,29 @@
}
}
+ /**
+ * @param pos the position whose page information is looked up
+ * @return
+ */
+ private PageCursorInfo getPageInfo(PagePosition pos)
+ {
+ PageCursorInfo pageInfo = consumedPages.get(pos.getPageNr());
+
+ if (pageInfo == null)
+ {
+ PageCache cache = cursorProvider.getPageCache(pos);
+ pageInfo = new PageCursorInfo(pos.getPageNr(), cache.getNumberOfMessages());
+ consumedPages.put(pos.getPageNr(), pageInfo);
+ }
+
+ return pageInfo;
+ }
+
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
+
+
protected boolean match(final ServerMessage message)
{
@@ -221,10 +269,15 @@
}
// Private -------------------------------------------------------
-
- private void ignored(final PagePosition message)
+
+ private void confirmPagePosition(final PagePosition pos)
{
- // TODO: Update reference counts
+ PageCursorInfo info = getPageInfo(pos);
+
+ if (info.confirmed.incrementAndGet() == info.getNumberOfMessages())
+ {
+ // todo delete previous destinations
+ }
}
/**
@@ -246,9 +299,136 @@
*/
private void installTXCallback(Transaction tx, PagePosition position)
{
- //TODO: Play with rollbacks on the reference counts
+ // It needs to persist, otherwise the cursor will return to the first page position
+ tx.setContainsPersistent();
+
+ PageCursorTX cursorTX =
(PageCursorTX)tx.getProperty(TransactionPropertyIndexes.PAGE_CURSOR_POSITIONS);
+
+ if (cursorTX == null)
+ {
+ cursorTX = new PageCursorTX();
+ tx.putProperty(TransactionPropertyIndexes.PAGE_CURSOR_POSITIONS,cursorTX);
+ tx.addOperation(cursorTX);
+ }
+
+
}
// Inner classes -------------------------------------------------
+
+
+ private static class PageCursorInfo
+ {
+ // Number of messages existent on this page
+ private final int numberOfMessages;
+
+ private final long pageId;
+
+ // Confirmed ACKs on this page
+ private final List<PagePosition> acks = new
LinkedList<PagePosition>();
+
+ // We need a separate counter as the cursor may be ignoring certain values because
of incomplete transactions or expressions
+ private final AtomicInteger confirmed = new AtomicInteger(0);
+
+ public PageCursorInfo(final long pageId, final int numberOfMessages)
+ {
+ this.pageId = pageId;
+ this.numberOfMessages = numberOfMessages;
+ }
+ /**
+ * @return the numberOfMessages
+ */
+ public int getNumberOfMessages()
+ {
+ return numberOfMessages;
+ }
+
+ /**
+ * @return the pageId
+ */
+ public long getPageId()
+ {
+ return pageId;
+ }
+
+ public void addACK(final PagePosition posACK)
+ {
+ this.acks.add(posACK);
+ }
+
+ }
+
+ static class PageCursorTX implements TransactionOperation
+ {
+ HashMap<PageCursorImpl, List<PagePosition>> pendingPositions = new
HashMap<PageCursorImpl, List<PagePosition>>();
+
+ public void addPositionConfirmation(PageCursorImpl cursor, PagePosition position)
+ {
+ List<PagePosition> list = pendingPositions.get(cursor);
+
+ if (list == null)
+ {
+ list = new LinkedList<PagePosition>();
+ pendingPositions.put(cursor, list);
+ }
+
+ list.add(position);
+ }
+
+ /* (non-Javadoc)
+ * @see
org.hornetq.core.transaction.TransactionOperation#beforePrepare(org.hornetq.core.transaction.Transaction)
+ */
+ public void beforePrepare(Transaction tx) throws Exception
+ {
+ }
+
+ /* (non-Javadoc)
+ * @see
org.hornetq.core.transaction.TransactionOperation#afterPrepare(org.hornetq.core.transaction.Transaction)
+ */
+ public void afterPrepare(Transaction tx)
+ {
+ }
+
+ /* (non-Javadoc)
+ * @see
org.hornetq.core.transaction.TransactionOperation#beforeCommit(org.hornetq.core.transaction.Transaction)
+ */
+ public void beforeCommit(Transaction tx) throws Exception
+ {
+ }
+
+ /* (non-Javadoc)
+ * @see
org.hornetq.core.transaction.TransactionOperation#afterCommit(org.hornetq.core.transaction.Transaction)
+ */
+ public void afterCommit(Transaction tx)
+ {
+ for (Entry<PageCursorImpl, List<PagePosition>> entry :
this.pendingPositions.entrySet())
+ {
+ PageCursorImpl cursor = entry.getKey();
+
+ List<PagePosition> positions = entry.getValue();
+
+ for (PagePosition confirmed : positions)
+ {
+ cursor.confirmPagePosition(confirmed);
+ }
+
+ }
+ }
+
+ /* (non-Javadoc)
+ * @see
org.hornetq.core.transaction.TransactionOperation#beforeRollback(org.hornetq.core.transaction.Transaction)
+ */
+ public void beforeRollback(Transaction tx) throws Exception
+ {
+ }
+
+ /* (non-Javadoc)
+ * @see
org.hornetq.core.transaction.TransactionOperation#afterRollback(org.hornetq.core.transaction.Transaction)
+ */
+ public void afterRollback(Transaction tx)
+ {
+ }
+ }
+
}
Modified:
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java
===================================================================
---
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java 2010-10-11
13:42:51 UTC (rev 9768)
+++
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java 2010-10-11
21:16:17 UTC (rev 9769)
@@ -26,6 +26,7 @@
import org.hornetq.core.paging.cursor.PagePosition;
import org.hornetq.core.persistence.StorageManager;
import org.hornetq.core.server.ServerMessage;
+import org.hornetq.utils.ExecutorFactory;
import org.hornetq.utils.SoftValueHashMap;
import org.jboss.netty.util.internal.ConcurrentHashMap;
@@ -46,21 +47,26 @@
// Attributes ----------------------------------------------------
private final PagingStore pagingStore;
-
+
private final StorageManager storageManager;
-
- private SoftValueHashMap<Long, PageCacheImpl> softCache = new
SoftValueHashMap<Long, PageCacheImpl>();
-
+
+ private final ExecutorFactory executorFactory;
+
+ private SoftValueHashMap<Long, PageCache> softCache = new
SoftValueHashMap<Long, PageCache>();
+
private ConcurrentMap<Long, PageCursor> activeCursors = new
ConcurrentHashMap<Long, PageCursor>();
// Static --------------------------------------------------------
// Constructors --------------------------------------------------
- public PageCursorProviderImpl(final PagingStore pagingStore, final StorageManager
storageManager)
+ public PageCursorProviderImpl(final PagingStore pagingStore,
+ final StorageManager storageManager,
+ final ExecutorFactory executorFactory)
{
this.pagingStore = pagingStore;
this.storageManager = storageManager;
+ this.executorFactory = executorFactory;
}
// Public --------------------------------------------------------
@@ -78,20 +84,23 @@
PageCursor activeCursor = activeCursors.get(cursorID);
if (activeCursor == null)
{
- activeCursor = new PageCursorImpl(this, pagingStore, storageManager, cursorID);
+ activeCursor = new PageCursorImpl(this, pagingStore, storageManager,
executorFactory.getExecutor(), cursorID);
PageCursor previousValue = activeCursors.putIfAbsent(cursorID, activeCursor);
if (previousValue != null)
{
activeCursor = previousValue;
}
}
-
+
return activeCursor;
}
-
+
+ /**
+ * this will create a non-persistent cursor
+ */
public PageCursor createCursor()
{
- return new PageCursorImpl(this, pagingStore, storageManager, 0);
+ return new PageCursorImpl(this, pagingStore, storageManager,
executorFactory.getExecutor(), 0);
}
/* (non-Javadoc)
@@ -100,100 +109,55 @@
public Pair<PagePosition, ServerMessage> getAfter(final PagePosition pos) throws
Exception
{
// TODO: consider page transactions here to avoid receiving an uncommitted message
- // TODO: consider the case where a page came empty because of an ignored PageTX
+ // TODO: consider the case where a full page is ignored because of a TX
PagePosition retPos = pos.nextMessage();
-
- PageCache cache = getPageCache(pos.getPageNr());
-
+
+ PageCache cache = getPageCache(pos);
+
if (retPos.getMessageNr() >= cache.getNumberOfMessages())
{
retPos = pos.nextPage();
-
- cache = getPageCache(retPos.getPageNr());
+
+ cache = getPageCache(retPos);
+
if (cache == null)
{
return null;
}
-
+
if (retPos.getMessageNr() >= cache.getNumberOfMessages())
{
return null;
}
}
-
+
return new Pair<PagePosition, ServerMessage>(retPos,
cache.getMessage(retPos.getMessageNr()));
}
-
+
public ServerMessage getMessage(final PagePosition pos) throws Exception
{
- PageCache cache = getPageCache(pos.getPageNr());
-
+ PageCache cache = getPageCache(pos);
+
if (pos.getMessageNr() >= cache.getNumberOfMessages())
{
// sanity check, this should never happen unless there's a bug
throw new IllegalStateException("Invalid messageNumber passed = " +
pos);
}
-
+
return cache.getMessage(pos.getMessageNr());
}
- public PageCache getPageCache(final long pageId) throws Exception
+ public PageCache getPageCache(PagePosition pos)
{
- boolean needToRead = false;
- PageCacheImpl cache = null;
- synchronized (this)
+ PageCache cache = pos.getPageCache();
+ if (cache == null)
{
- if (pageId > pagingStore.getNumberOfPages())
- {
- return null;
- }
-
- cache = softCache.get(pageId);
- if (cache == null)
- {
- cache = createPageCache(pageId);
- needToRead = true;
- // anyone reading from this cache will have to wait reading to finish first
- // we also want only one thread reading this cache
- cache.lock();
- softCache.put(pageId, cache);
- }
+ cache = getPageCache(pos.getPageNr());
+ pos.setPageCache(cache);
}
-
- // Reading is done outside of the synchronized block, however
- // the page stays locked until the entire reading is finished
- if (needToRead)
- {
- try
- {
- Page page = pagingStore.createPage((int)pageId);
-
- page.open();
-
- List<PagedMessage> pgdMessages = page.read();
-
- ServerMessage srvMessages[] = new ServerMessage[pgdMessages.size()];
-
- int i = 0;
- for (PagedMessage pdgMessage : pgdMessages)
- {
- ServerMessage message = pdgMessage.getMessage(storageManager);
- srvMessages[i++] = message;
- }
-
- cache.setMessages(srvMessages);
-
- }
- finally
- {
- cache.unlock();
- }
- }
-
-
return cache;
}
-
+
public int getCacheSize()
{
return softCache.size();
@@ -206,7 +170,7 @@
cursor.processReload();
}
}
-
+
public void stop()
{
activeCursors.clear();
@@ -215,7 +179,7 @@
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
-
+
protected PageCacheImpl createPageCache(final long pageId) throws Exception
{
return new PageCacheImpl(pagingStore.createPage((int)pageId));
@@ -223,6 +187,69 @@
// Private -------------------------------------------------------
+ private PageCache getPageCache(final long pageId)
+ {
+ try
+ {
+ boolean needToRead = false;
+ PageCache cache = null;
+ synchronized (this)
+ {
+ if (pageId > pagingStore.getCurrentWritingPage())
+ {
+ return null;
+ }
+
+ cache = softCache.get(pageId);
+ if (cache == null)
+ {
+ cache = createPageCache(pageId);
+ needToRead = true;
+ // anyone reading from this cache will have to wait reading to finish
first
+ // we also want only one thread reading this cache
+ cache.lock();
+ softCache.put(pageId, cache);
+ }
+ }
+
+ // Reading is done outside of the synchronized block, however
+ // the page stays locked until the entire reading is finished
+ if (needToRead)
+ {
+ try
+ {
+ Page page = pagingStore.createPage((int)pageId);
+
+ page.open();
+
+ List<PagedMessage> pgdMessages = page.read();
+
+ ServerMessage srvMessages[] = new ServerMessage[pgdMessages.size()];
+
+ int i = 0;
+ for (PagedMessage pdgMessage : pgdMessages)
+ {
+ ServerMessage message = pdgMessage.getMessage(storageManager);
+ srvMessages[i++] = message;
+ }
+
+ cache.setMessages(srvMessages);
+
+ }
+ finally
+ {
+ cache.unlock();
+ }
+ }
+
+ return cache;
+ }
+ catch (Exception e)
+ {
+ throw new RuntimeException("Couldn't complete paging due to an IO
Exception on Paging - " + e.getMessage(), e);
+ }
+ }
+
// Inner classes -------------------------------------------------
}
Modified:
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PagePositionImpl.java
===================================================================
---
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PagePositionImpl.java 2010-10-11
13:42:51 UTC (rev 9768)
+++
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PagePositionImpl.java 2010-10-11
21:16:17 UTC (rev 9769)
@@ -13,6 +13,10 @@
package org.hornetq.core.paging.cursor.impl;
+import java.lang.ref.WeakReference;
+
+import org.hornetq.core.paging.Page;
+import org.hornetq.core.paging.cursor.PageCache;
import org.hornetq.core.paging.cursor.PagePosition;
/**
@@ -30,6 +34,8 @@
/** ID used for storage */
private long recordID;
+
+ private volatile WeakReference<PageCache> cacheReference;
/**
* @param pageNr
@@ -42,6 +48,12 @@
this.messageNr = messageNr;
}
+ public PagePositionImpl(long pageNr, int messageNr, PageCache pageCache)
+ {
+ this(pageNr, messageNr);
+ this.setPageCache(pageCache);
+ }
+
/**
* @param pageNr
* @param messageNr
@@ -52,6 +64,31 @@
}
/**
+ * The cached page associated with this position
+ * @return
+ */
+ public PageCache getPageCache()
+ {
+ if (cacheReference == null)
+ {
+ return null;
+ }
+ else
+ {
+ return cacheReference.get();
+ }
+ }
+
+ public void setPageCache(final PageCache cache)
+ {
+ if (cache != null)
+ {
+ this.cacheReference = new WeakReference<PageCache>(cache);
+ }
+ }
+
+
+ /**
* @return the recordID
*/
public long getRecordID()
@@ -117,7 +154,7 @@
public PagePosition nextMessage()
{
- return new PagePositionImpl(this.pageNr, this.messageNr + 1);
+ return new PagePositionImpl(this.pageNr, this.messageNr + 1, this.getPageCache());
}
public PagePosition nextPage()
Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PageImpl.java
===================================================================
---
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PageImpl.java 2010-10-11
13:42:51 UTC (rev 9768)
+++
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PageImpl.java 2010-10-11
21:16:17 UTC (rev 9769)
@@ -13,7 +13,6 @@
package org.hornetq.core.paging.impl;
-import java.nio.Buffer;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
@@ -35,7 +34,7 @@
* @author <a href="mailto:clebert.suconic@jboss.com">Clebert
Suconic</a>
*
*/
-public class PageImpl implements Page
+public class PageImpl implements Page, Comparable<Page>
{
// Constants -----------------------------------------------------
@@ -241,13 +240,54 @@
{
return "PageImpl::pageID=" + this.pageId + ", file=" +
this.file;
}
+
+ /* (non-Javadoc)
+ * @see java.lang.Comparable#compareTo(java.lang.Object)
+ */
+ public int compareTo(Page otherPage)
+ {
+ return otherPage.getPageId() - this.pageId;
+ }
+
+
+ /* (non-Javadoc)
+ * @see java.lang.Object#hashCode()
+ */
+ @Override
+ public int hashCode()
+ {
+ final int prime = 31;
+ int result = 1;
+ result = prime * result + pageId;
+ return result;
+ }
+
+
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
// Private -------------------------------------------------------
+ /* (non-Javadoc)
+ * @see java.lang.Object#equals(java.lang.Object)
+ */
+ @Override
+ public boolean equals(Object obj)
+ {
+ if (this == obj)
+ return true;
+ if (obj == null)
+ return false;
+ if (getClass() != obj.getClass())
+ return false;
+ PageImpl other = (PageImpl)obj;
+ if (pageId != other.pageId)
+ return false;
+ return true;
+ }
+
/**
* @param position
* @param msgNumber
Modified:
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PagingStoreFactoryNIO.java
===================================================================
---
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PagingStoreFactoryNIO.java 2010-10-11
13:42:51 UTC (rev 9768)
+++
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PagingStoreFactoryNIO.java 2010-10-11
21:16:17 UTC (rev 9769)
@@ -98,7 +98,7 @@
this,
address,
settings,
- executorFactory.getExecutor(),
+ executorFactory,
syncNonTransactional);
}
@@ -202,7 +202,7 @@
this,
address,
settings,
- executorFactory.getExecutor(),
+ executorFactory,
syncNonTransactional);
storesReturn.add(store);
Modified:
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java
===================================================================
---
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java 2010-10-11
13:42:51 UTC (rev 9768)
+++
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java 2010-10-11
21:16:17 UTC (rev 9769)
@@ -53,6 +53,7 @@
import org.hornetq.core.transaction.Transaction.State;
import org.hornetq.core.transaction.TransactionPropertyIndexes;
import org.hornetq.core.transaction.impl.TransactionImpl;
+import org.hornetq.utils.ExecutorFactory;
/**
*
@@ -150,7 +151,7 @@
final PagingStoreFactory storeFactory,
final SimpleString storeName,
final AddressSettings addressSettings,
- final Executor executor,
+ final ExecutorFactory executorFactory,
final boolean syncNonTransactional)
{
if (pagingManager == null)
@@ -182,7 +183,7 @@
pageSize);
}
- this.executor = executor;
+ this.executor = executorFactory.getExecutor();
this.pagingManager = pagingManager;
@@ -192,7 +193,7 @@
this.syncNonTransactional = syncNonTransactional;
- this.cursorProvider = new PageCursorProviderImpl(this, this.storageManager);
+ this.cursorProvider = new PageCursorProviderImpl(this, this.storageManager,
executorFactory);
// Post office could be null on the backup node
if (postOffice == null)
@@ -279,6 +280,11 @@
{
return numberOfPages;
}
+
+ public int getCurrentWritingPage()
+ {
+ return currentPageId;
+ }
public SimpleString getStoreName()
{
Modified:
branches/Branch_New_Paging/src/main/org/hornetq/core/transaction/TransactionPropertyIndexes.java
===================================================================
---
branches/Branch_New_Paging/src/main/org/hornetq/core/transaction/TransactionPropertyIndexes.java 2010-10-11
13:42:51 UTC (rev 9768)
+++
branches/Branch_New_Paging/src/main/org/hornetq/core/transaction/TransactionPropertyIndexes.java 2010-10-11
21:16:17 UTC (rev 9769)
@@ -31,4 +31,6 @@
public static final int REFS_OPERATION = 6;
public static final int PAGE_MESSAGES_OPERATION = 7;
+
+ public static final int PAGE_CURSOR_POSITIONS = 8;
}
Modified:
branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/client/PagingTest.java
===================================================================
---
branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/client/PagingTest.java 2010-10-11
13:42:51 UTC (rev 9768)
+++
branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/client/PagingTest.java 2010-10-11
21:16:17 UTC (rev 9769)
@@ -999,7 +999,7 @@
final PagingStoreFactory storeFactory,
final SimpleString storeName,
final AddressSettings addressSettings,
- final Executor executor,
+ final ExecutorFactory executorFactory,
final boolean syncNonTransactional)
{
super(address,
@@ -1010,7 +1010,7 @@
storeFactory,
storeName,
addressSettings,
- executor,
+ executorFactory,
syncNonTransactional);
}
@@ -1073,7 +1073,7 @@
this,
address,
settings,
- getExecutorFactory().getExecutor(),
+ getExecutorFactory(),
syncNonTransactional);
}
Modified:
branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCrashTest.java
===================================================================
---
branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCrashTest.java 2010-10-11
13:42:51 UTC (rev 9768)
+++
branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCrashTest.java 2010-10-11
21:16:17 UTC (rev 9769)
@@ -43,6 +43,7 @@
import org.hornetq.spi.core.security.HornetQSecurityManager;
import org.hornetq.spi.core.security.HornetQSecurityManagerImpl;
import org.hornetq.tests.util.ServiceTestBase;
+import org.hornetq.utils.ExecutorFactory;
import org.hornetq.utils.OrderedExecutorFactory;
/**
@@ -292,7 +293,7 @@
factoryField.setAccessible(true);
OrderedExecutorFactory factory =
(org.hornetq.utils.OrderedExecutorFactory)factoryField.get(this);
- return new FailingPagingStore(destinationName, settings,
factory.getExecutor(), syncNonTransactional);
+ return new FailingPagingStore(destinationName, settings, factory,
syncNonTransactional);
}
// Package protected ---------------------------------------------
@@ -312,7 +313,7 @@
*/
public FailingPagingStore(final SimpleString storeName,
final AddressSettings addressSettings,
- final Executor executor,
+ final ExecutorFactory executor,
final boolean syncNonTransactional)
{
super(storeName,
Modified:
branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java
===================================================================
---
branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java 2010-10-11
13:42:51 UTC (rev 9768)
+++
branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java 2010-10-11
21:16:17 UTC (rev 9769)
@@ -26,6 +26,7 @@
import org.hornetq.core.paging.cursor.PageCursorProvider;
import org.hornetq.core.paging.cursor.PagePosition;
import org.hornetq.core.paging.cursor.impl.PageCursorProviderImpl;
+import org.hornetq.core.paging.cursor.impl.PagePositionImpl;
import org.hornetq.core.paging.impl.PagingStoreImpl;
import org.hornetq.core.persistence.impl.journal.OperationContextImpl;
import org.hornetq.core.server.HornetQServer;
@@ -78,11 +79,11 @@
System.out.println("NumberOfPages = " + numberOfPages);
- PageCursorProviderImpl cursorProvider = new
PageCursorProviderImpl(lookupPageStore(ADDRESS), server.getStorageManager());
+ PageCursorProviderImpl cursorProvider = new
PageCursorProviderImpl(lookupPageStore(ADDRESS), server.getStorageManager(),
server.getExecutorFactory());
for (int i = 0; i < numberOfPages; i++)
{
- PageCache cache = cursorProvider.getPageCache(i + 1);
+ PageCache cache = cursorProvider.getPageCache(new PagePositionImpl(i + 1, 0));
System.out.println("Page " + i + " had " +
cache.getNumberOfMessages() + " messages");
}
@@ -104,7 +105,7 @@
System.out.println("NumberOfPages = " + numberOfPages);
- PageCursorProviderImpl cursorProvider = new
PageCursorProviderImpl(lookupPageStore(ADDRESS), server.getStorageManager());
+ PageCursorProviderImpl cursorProvider = new
PageCursorProviderImpl(lookupPageStore(ADDRESS), server.getStorageManager(),
server.getExecutorFactory());
PageCursor cursor = cursorProvider.createCursor();
@@ -134,9 +135,9 @@
System.out.println("NumberOfPages = " + numberOfPages);
- PageCursorProviderImpl cursorProvider = new
PageCursorProviderImpl(lookupPageStore(ADDRESS), server.getStorageManager());
+ PageCursorProviderImpl cursorProvider = new
PageCursorProviderImpl(lookupPageStore(ADDRESS), server.getStorageManager(),
server.getExecutorFactory());
- PageCache cache = cursorProvider.getPageCache(2);
+ PageCache cache = cursorProvider.getPageCache(new PagePositionImpl(2,0));
assertNull(cache);
}
@@ -287,11 +288,16 @@
}
- public void testRollbackScenarios() throws Exception
+ public void testRollbackScenariosOnACK() throws Exception
{
}
+ public void testReadRolledBackData() throws Exception
+ {
+
+ }
+
public void testPrepareScenarios() throws Exception
{
@@ -316,6 +322,11 @@
{
}
+
+ public void testFirstMessageInTheMiddle() throws Exception
+ {
+
+ }
/**
* @param numMessages
Modified:
branches/Branch_New_Paging/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java
===================================================================
---
branches/Branch_New_Paging/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java 2010-10-11
13:42:51 UTC (rev 9768)
+++
branches/Branch_New_Paging/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java 2010-10-11
21:16:17 UTC (rev 9769)
@@ -73,6 +73,7 @@
import org.hornetq.tests.unit.core.server.impl.fakes.FakePostOffice;
import org.hornetq.tests.util.RandomUtil;
import org.hornetq.tests.util.UnitTestCase;
+import org.hornetq.utils.ExecutorFactory;
import org.hornetq.utils.UUID;
/**
@@ -143,7 +144,7 @@
null,
PagingStoreImplTest.destinationTestName,
addressSettings,
- executor,
+ getExecutorFactory(),
true);
storeImpl.start();
@@ -179,7 +180,7 @@
storeFactory,
PagingStoreImplTest.destinationTestName,
addressSettings,
- executor,
+ getExecutorFactory(),
true);
storeImpl.start();
@@ -215,7 +216,7 @@
null,
PagingStoreImplTest.destinationTestName,
addressSettings,
- executor,
+ getExecutorFactory(),
true);
storeImpl.start();
@@ -242,7 +243,7 @@
storeFactory,
PagingStoreImplTest.destinationTestName,
addressSettings,
- executor,
+ getExecutorFactory(),
true);
storeImpl.start();
@@ -317,7 +318,7 @@
storeFactory,
PagingStoreImplTest.destinationTestName,
addressSettings,
- executor,
+ getExecutorFactory(),
true);
storeImpl.start();
@@ -464,7 +465,7 @@
storeFactory,
new
SimpleString("test"),
settings,
- executor,
+ getExecutorFactory(),
true);
storeImpl.start();
@@ -627,7 +628,7 @@
storeFactory,
new
SimpleString("test"),
settings,
- executor,
+ getExecutorFactory(),
true);
storeImpl2.start();
@@ -710,7 +711,7 @@
storeFactory,
new
SimpleString("test"),
settings,
- executor,
+ getExecutorFactory(),
true);
storeImpl.start();
@@ -854,6 +855,18 @@
{
return new FakePostOffice();
}
+
+ private ExecutorFactory getExecutorFactory()
+ {
+ return new ExecutorFactory()
+ {
+
+ public Executor getExecutor()
+ {
+ return executor;
+ }
+ };
+ }
private ServerMessage createMessage(final long id,
final PagingStore store,