Author: clebert.suconic(a)jboss.com
Date: 2010-10-12 20:10:30 -0400 (Tue, 12 Oct 2010)
New Revision: 9780
Added:
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/LivePageCache.java
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/LivePageCacheImpl.java
Modified:
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/Page.java
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCache.java
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursorProvider.java
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCacheImpl.java
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorImpl.java
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PageImpl.java
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java
branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCrashTest.java
branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java
Log:
Moving the cursor over live data as well
Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/Page.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/Page.java 2010-10-12
18:17:07 UTC (rev 9779)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/Page.java 2010-10-13
00:10:30 UTC (rev 9780)
@@ -15,6 +15,8 @@
import java.util.List;
+import org.hornetq.core.paging.cursor.LivePageCache;
+
/**
*
* @see PagingManager
@@ -28,6 +30,8 @@
void write(PagedMessage message) throws Exception;
List<PagedMessage> read() throws Exception;
+
+ void setLiveCache(LivePageCache pageCache);
int getSize();
Added:
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/LivePageCache.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/LivePageCache.java
(rev 0)
+++
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/LivePageCache.java 2010-10-13
00:10:30 UTC (rev 9780)
@@ -0,0 +1,31 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.paging.cursor;
+
+import org.hornetq.core.server.ServerMessage;
+
+/**
+ * A {@link PageCache} for a page that is still being written: messages are appended to it one at a
+ * time through {@link #addLiveMessage(ServerMessage)}, and {@link #close()} is called once the page
+ * itself is closed.
+ *
+ * @author clebertsuconic
+ */
+public interface LivePageCache extends PageCache
+{
+ /**
+ * Appends a message to this cache.
+ * @param message the message to add
+ */
+ void addLiveMessage(ServerMessage message);
+ /**
+ * Marks this cache as no longer being updated.
+ */
+ void close();
+}
Modified:
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCache.java
===================================================================
---
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCache.java 2010-10-12
18:17:07 UTC (rev 9779)
+++
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCache.java 2010-10-13
00:10:30 UTC (rev 9780)
@@ -30,6 +30,12 @@
int getNumberOfMessages();
void setMessages(ServerMessage[] messages);
+
+ /**
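+ * Whether this cache is still being updated, i.e. messages may still be added to it.
+ * @return true while the cache is still being updated, false otherwise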
+ */
+ boolean isLive();
/**
*
Modified:
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursorProvider.java
===================================================================
---
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursorProvider.java 2010-10-12
18:17:07 UTC (rev 9779)
+++
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursorProvider.java 2010-10-13
00:10:30 UTC (rev 9780)
@@ -39,6 +39,8 @@
// Public --------------------------------------------------------
PageCache getPageCache(PagePosition pos);
+
+ void addPageCache(PageCache cache);
PagingStore getAssociatedStore();
@@ -55,8 +57,6 @@
*/
PageCursor createCursor();
- // PageCursor recoverCursor(PagePosition position);
-
Pair<PagePosition, ServerMessage> getAfter(PagePosition pos) throws Exception;
ServerMessage getMessage(PagePosition pos) throws Exception;
Added:
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/LivePageCacheImpl.java
===================================================================
---
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/LivePageCacheImpl.java
(rev 0)
+++
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/LivePageCacheImpl.java 2010-10-13
00:10:30 UTC (rev 9780)
@@ -0,0 +1,145 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.paging.cursor.impl;
+
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+
+import org.hornetq.core.paging.Page;
+import org.hornetq.core.paging.cursor.LivePageCache;
+import org.hornetq.core.server.ServerMessage;
+
+/**
+ * The {@link LivePageCache} for the page that is currently being written.
+ * <p>
+ * It offers the same read operations as any other PageCache, but its content grows as messages are
+ * added through {@link #addLiveMessage(ServerMessage)}, until {@link #close()} marks it as no longer live.
+ * Access to the message list and to the live flag is synchronized on this instance.
+ *
+ * @author clebertsuconic
+ */
+public class LivePageCacheImpl implements LivePageCache
+{
+   // Constants -----------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+
+   /**
+    * Messages in the order they were added; the list index is the message number.
+    * An ArrayList is used because readers look messages up by index, which would be a
+    * linear scan per lookup on a LinkedList.
+    */
+   private final List<ServerMessage> messages = new ArrayList<ServerMessage>();
+
+   private final Page page;
+
+   /** True until {@link #close()} is called. */
+   private boolean isLive = true;
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   /**
+    * @param page the page this cache belongs to
+    */
+   public LivePageCacheImpl(final Page page)
+   {
+      this.page = page;
+   }
+
+   // Public --------------------------------------------------------
+
+   /**
+    * @return the page this cache was created for
+    * @see org.hornetq.core.paging.cursor.PageCache#getPage()
+    */
+   public Page getPage()
+   {
+      return page;
+   }
+
+   /**
+    * @return the number of messages added so far
+    * @see org.hornetq.core.paging.cursor.PageCache#getNumberOfMessages()
+    */
+   public synchronized int getNumberOfMessages()
+   {
+      return messages.size();
+   }
+
+   /**
+    * Appends every element of the given array to this cache, in array order.
+    * Note that existing messages are kept: this adds to the cache, it does not replace its content.
+    *
+    * @param messages the messages to append
+    * @see org.hornetq.core.paging.cursor.PageCache#setMessages(org.hornetq.core.server.ServerMessage[])
+    */
+   public synchronized void setMessages(ServerMessage[] messages)
+   {
+      // This method shouldn't be called on liveCache, but we will provide the implementation for it anyway
+      for (ServerMessage msg : messages)
+      {
+         addLiveMessage(msg);
+      }
+   }
+
+   /**
+    * @param messageNumber zero-based position of the message in this cache
+    * @return the message at that position, or null when no message has been added at that position yet
+    * @see org.hornetq.core.paging.cursor.PageCache#getMessage(int)
+    */
+   public synchronized ServerMessage getMessage(int messageNumber)
+   {
+      if (messageNumber < messages.size())
+      {
+         return messages.get(messageNumber);
+      }
+      else
+      {
+         return null;
+      }
+   }
+
+   /* (non-Javadoc)
+    * @see org.hornetq.core.paging.cursor.PageCache#lock()
+    */
+   public void lock()
+   {
+      // nothing to be done on live cache
+   }
+
+   /* (non-Javadoc)
+    * @see org.hornetq.core.paging.cursor.PageCache#unlock()
+    */
+   public void unlock()
+   {
+      // nothing to be done on live cache
+   }
+
+   /**
+    * @return true until {@link #close()} has been called
+    * @see org.hornetq.core.paging.cursor.PageCache#isLive()
+    */
+   public synchronized boolean isLive()
+   {
+      return isLive;
+   }
+
+   /**
+    * Appends a message at the end of this cache.
+    *
+    * @param message the message to append
+    * @see org.hornetq.core.paging.cursor.LivePageCache#addLiveMessage(org.hornetq.core.server.ServerMessage)
+    */
+   public synchronized void addLiveMessage(ServerMessage message)
+   {
+      this.messages.add(message);
+   }
+
+   /**
+    * Marks this cache as no longer live. The messages already added stay available.
+    *
+    * @see org.hornetq.core.paging.cursor.LivePageCache#close()
+    */
+   public synchronized void close()
+   {
+      this.isLive = false;
+   }
+
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   // Private -------------------------------------------------------
+
+   // Inner classes -------------------------------------------------
+
+}
Modified:
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCacheImpl.java
===================================================================
---
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCacheImpl.java 2010-10-12
18:17:07 UTC (rev 9779)
+++
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCacheImpl.java 2010-10-13
00:10:30 UTC (rev 9780)
@@ -22,8 +22,6 @@
/**
* The caching associated to a single page.
- *
- * TODO: Solve how to update the cache for the current page on PagingStore.
*
* @author <a href="mailto:clebert.suconic@jboss.org">Clebert
Suconic</a>
*
@@ -112,7 +110,15 @@
}
}
+   /**
+    * This cache is not the one updated while a page is being written (that is LivePageCacheImpl),
+    * so it must not report itself as live: PageCursorProviderImpl.getAfter only moves on to the
+    * next page when the current cache is not live.
+    *
+    * @return always false
+    * @see org.hornetq.core.paging.cursor.PageCache#isLive()
+    */
+   public boolean isLive()
+   {
+      return false;
+   }
+
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
Modified:
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorImpl.java
===================================================================
---
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorImpl.java 2010-10-12
18:17:07 UTC (rev 9779)
+++
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorImpl.java 2010-10-13
00:10:30 UTC (rev 9780)
@@ -368,7 +368,7 @@
final ArrayList<PageCursorInfo> completedPages = new
ArrayList<PageCursorInfo>();
- // First get the completed pages
+ // First get the completed pages using a lock
synchronized (this)
{
for (Entry<Long, PageCursorInfo> entry : consumedPages.entrySet())
Modified:
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java
===================================================================
---
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java 2010-10-12
18:17:07 UTC (rev 9779)
+++
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java 2010-10-13
00:10:30 UTC (rev 9780)
@@ -114,7 +114,7 @@
PageCache cache = getPageCache(pos);
- if (retPos.getMessageNr() >= cache.getNumberOfMessages())
+ if (!cache.isLive() && retPos.getMessageNr() >=
cache.getNumberOfMessages())
{
retPos = pos.nextPage();
@@ -130,8 +130,17 @@
return null;
}
}
-
- return new Pair<PagePosition, ServerMessage>(retPos,
cache.getMessage(retPos.getMessageNr()));
+
+ ServerMessage serverMessage = cache.getMessage(retPos.getMessageNr());
+
+ if (serverMessage != null)
+ {
+ return new Pair<PagePosition, ServerMessage>(retPos,
cache.getMessage(retPos.getMessageNr()));
+ }
+ else
+ {
+ return null;
+ }
}
public ServerMessage getMessage(final PagePosition pos) throws Exception
@@ -147,6 +156,9 @@
return cache.getMessage(pos.getMessageNr());
}
+ /**
+ * No need to synchronize this method, since the private getPageCache it delegates to makes a synchronized call.
+ */
public PageCache getPageCache(PagePosition pos)
{
PageCache cache = pos.getPageCache();
@@ -157,8 +169,14 @@
}
return cache;
}
+
+   /**
+    * Registers the given cache in the soft cache, keyed by the id of the page it belongs to.
+    *
+    * @param cache the cache to register
+    */
+   public synchronized void addPageCache(PageCache cache)
+   {
+      // TODO: remove the type cast here
+      final long pageId = (long)cache.getPage().getPageId();
+
+      softCache.put(pageId, cache);
+   }
- public int getCacheSize()
+ public synchronized int getCacheSize()
{
return softCache.size();
}
@@ -171,6 +189,7 @@
}
}
+
public void stop()
{
activeCursors.clear();
@@ -180,6 +199,7 @@
// Protected -----------------------------------------------------
+ /* Protected so that test cases may override it to instrument the test */
protected PageCacheImpl createPageCache(final long pageId) throws Exception
{
return new PageCacheImpl(pagingStore.createPage((int)pageId));
Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PageImpl.java
===================================================================
---
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PageImpl.java 2010-10-12
18:17:07 UTC (rev 9779)
+++
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PageImpl.java 2010-10-13
00:10:30 UTC (rev 9780)
@@ -26,6 +26,7 @@
import org.hornetq.core.logging.Logger;
import org.hornetq.core.paging.Page;
import org.hornetq.core.paging.PagedMessage;
+import org.hornetq.core.paging.cursor.LivePageCache;
import org.hornetq.core.persistence.StorageManager;
import org.hornetq.utils.DataConstants;
@@ -57,6 +58,11 @@
private final SequentialFile file;
private final SequentialFileFactory fileFactory;
+
+ /**
+ * The page cache that will be filled with data as we write more data
+ */
+ private volatile LivePageCache pageCache;
private final AtomicInteger size = new AtomicInteger(0);
@@ -89,6 +95,11 @@
{
return pageId;
}
+ /**
+ * Sets the live cache attached to this page. While one is set, every message written to the
+ * page is also added to it, and closing the page closes the cache and drops this reference.
+ * @param pageCache the live cache to fill as messages are written
+ */
+ public void setLiveCache(LivePageCache pageCache)
+ {
+ this.pageCache = pageCache;
+ }
public List<PagedMessage> read() throws Exception
{
@@ -167,6 +178,11 @@
buffer.rewind();
file.writeDirect(buffer, false);
+
+ if (pageCache != null)
+ {
+ pageCache.addLiveMessage(message.getMessage(storageManager));
+ }
numberOfMessages.incrementAndGet();
size.addAndGet(buffer.limit());
@@ -192,6 +208,12 @@
{
storageManager.pageClosed(storeName, pageId);
}
+ if (pageCache != null)
+ {
+ pageCache.close();
+ // leave it to the soft cache to decide when to release it now
+ pageCache = null;
+ }
file.close();
}
Modified:
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java
===================================================================
---
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java 2010-10-12
18:17:07 UTC (rev 9779)
+++
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java 2010-10-13
00:10:30 UTC (rev 9780)
@@ -40,7 +40,9 @@
import org.hornetq.core.paging.PagingManager;
import org.hornetq.core.paging.PagingStore;
import org.hornetq.core.paging.PagingStoreFactory;
+import org.hornetq.core.paging.cursor.LivePageCache;
import org.hornetq.core.paging.cursor.PageCursorProvider;
+import org.hornetq.core.paging.cursor.impl.LivePageCacheImpl;
import org.hornetq.core.paging.cursor.impl.PageCursorProviderImpl;
import org.hornetq.core.persistence.StorageManager;
import org.hornetq.core.postoffice.DuplicateIDCache;
@@ -545,9 +547,9 @@
return currentPage;
}
- public Page createPage(final int page) throws Exception
+ public Page createPage(final int pageNumber) throws Exception
{
- String fileName = createFileName(page);
+ String fileName = createFileName(pageNumber);
if (fileFactory == null)
{
@@ -555,14 +557,24 @@
}
SequentialFile file = fileFactory.createSequentialFile(fileName, 1000);
+
+ Page page = new PageImpl(storeName, storageManager, fileFactory, file,
pageNumber);
+
+ LivePageCache pageCache = new LivePageCacheImpl(page);
+
+ page.setLiveCache(pageCache);
+ cursorProvider.addPageCache(pageCache);
+
+ // To create the file
file.open();
file.position(0);
file.close();
+
- return new PageImpl(storeName, storageManager, fileFactory, file, page);
+ return page;
}
// TestSupportPageStore ------------------------------------------
Modified:
branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCrashTest.java
===================================================================
---
branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCrashTest.java 2010-10-12
18:17:07 UTC (rev 9779)
+++
branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCrashTest.java 2010-10-13
00:10:30 UTC (rev 9780)
@@ -18,7 +18,6 @@
import java.lang.reflect.Field;
import java.util.HashMap;
import java.util.List;
-import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import junit.framework.Assert;
@@ -34,6 +33,7 @@
import org.hornetq.core.paging.PagedMessage;
import org.hornetq.core.paging.PagingManager;
import org.hornetq.core.paging.PagingStore;
+import org.hornetq.core.paging.cursor.LivePageCache;
import org.hornetq.core.paging.impl.PagingManagerImpl;
import org.hornetq.core.paging.impl.PagingStoreFactoryNIO;
import org.hornetq.core.paging.impl.PagingStoreImpl;
@@ -434,6 +434,13 @@
{
delegatedPage = delegatePage;
}
+
+ /*
+ * Empty implementation: the supplied live cache is ignored
+ * (presumably this test wrapper does not need one - confirm).
+ * @see org.hornetq.core.paging.Page#setLiveCache(org.hornetq.core.paging.cursor.LivePageCache)
+ */
+ public void setLiveCache(LivePageCache pageCache)
+ {
+ }
}
}
Modified:
branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java
===================================================================
---
branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java 2010-10-12
18:17:07 UTC (rev 9779)
+++
branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java 2010-10-13
00:10:30 UTC (rev 9780)
@@ -292,7 +292,52 @@
}
+   /**
+    * Pages messages one at a time and checks, after each one, that the cursor returns exactly that
+    * message (identified by its "key" property) and then has nothing more to return.
+    */
+   public void testConsumeLivePage() throws Exception
+   {
+      final int NUM_MESSAGES = 1000;
+      final int messageSize = 1024 * 1024;
+
+      PagingStoreImpl pageStore = lookupPageStore(ADDRESS);
+      pageStore.startPaging();
+
+      PageCursorProvider cursorProvider = this.server.getPagingManager().getPageStore(ADDRESS).getCursorProvier();
+      System.out.println("cursorProvider = " + cursorProvider);
+
+      PageCursor cursor = this.server.getPagingManager()
+                                     .getPageStore(ADDRESS)
+                                     .getCursorProvier()
+                                     .getCursor(queue.getID());
+      System.out.println("Cursor: " + cursor);
+
+      for (int i = 0; i < NUM_MESSAGES; i++)
+      {
+         if (i % 100 == 0)
+         {
+            System.out.println("Paged " + i);
+         }
+
+         HornetQBuffer payload = RandomUtil.randomBuffer(messageSize, i + 1L);
+         final int payloadLength = payload.writerIndex();
+
+         ServerMessage message = new ServerMessageImpl(i, payloadLength);
+         message.putIntProperty("key", i);
+         message.getBodyBuffer().writeBytes(payload, 0, payloadLength);
+
+         Assert.assertTrue(pageStore.page(message));
+
+         // The message just paged must be readable straight away
+         Pair<PagePosition, ServerMessage> next = cursor.moveNext();
+         assertNotNull(next);
+
+         // TODO: ack on live data
+
+         assertEquals(i, next.b.getIntProperty("key").intValue());
+
+         // ... and nothing else must be available after it
+         assertNull(cursor.moveNext());
+      }
+   }
+
public void testRollbackScenariosOnACK() throws Exception
{