Author: clebert.suconic(a)jboss.com
Date: 2010-10-05 18:18:27 -0400 (Tue, 05 Oct 2010)
New Revision: 9754
Modified:
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/PagingManager.java
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/PagingStore.java
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursor.java
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursorProvider.java
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PagePosition.java
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorImpl.java
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PagePositionImpl.java
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PagingManagerImpl.java
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java
branches/Branch_New_Paging/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java
Log:
First implementation of the redelivery and ack with reload of the cursors
Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/PagingManager.java
===================================================================
---
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/PagingManager.java 2010-10-05
17:33:33 UTC (rev 9753)
+++
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/PagingManager.java 2010-10-05
22:18:27 UTC (rev 9754)
@@ -82,5 +82,5 @@
void deletePageStore(SimpleString storeName) throws Exception;
- void processReload();
+ void processReload() throws Exception;
}
Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/PagingStore.java
===================================================================
---
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/PagingStore.java 2010-10-05
17:33:33 UTC (rev 9753)
+++
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/PagingStore.java 2010-10-05
22:18:27 UTC (rev 9754)
@@ -64,7 +64,7 @@
PageCursorProvider getCursorProvier();
- void processReload();
+ void processReload() throws Exception;
/**
* @return false if a thread was already started, or if not in page mode
Modified:
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursor.java
===================================================================
---
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursor.java 2010-10-05
17:33:33 UTC (rev 9753)
+++
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursor.java 2010-10-05
22:18:27 UTC (rev 9754)
@@ -48,7 +48,7 @@
*/
void reloadPreparedACK(Transaction tx, PagePosition position);
- void processReload();
+ void processReload() throws Exception;
/**
* To be used on redeliveries
Modified:
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursorProvider.java
===================================================================
---
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursorProvider.java 2010-10-05
17:33:33 UTC (rev 9753)
+++
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursorProvider.java 2010-10-05
22:18:27 UTC (rev 9754)
@@ -57,9 +57,13 @@
// PageCursor recoverCursor(PagePosition position);
Pair<PagePosition, ServerMessage> getAfter(PagePosition pos) throws Exception;
+
+ ServerMessage getMessage(PagePosition pos) throws Exception;
- void processReload();
+ void processReload() throws Exception;
+ void stop();
+
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
Modified:
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PagePosition.java
===================================================================
---
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PagePosition.java 2010-10-05
17:33:33 UTC (rev 9753)
+++
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PagePosition.java 2010-10-05
22:18:27 UTC (rev 9754)
@@ -37,5 +37,8 @@
PagePosition nextMessage();
PagePosition nextPage();
+
+ /** This will just test if the current position is the immediate next to the parameter
position */
+ boolean isRightAfter(PagePosition previous);
}
Modified:
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorImpl.java
===================================================================
---
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorImpl.java 2010-10-05
17:33:33 UTC (rev 9753)
+++
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorImpl.java 2010-10-05
22:18:27 UTC (rev 9754)
@@ -14,8 +14,10 @@
package org.hornetq.core.paging.cursor.impl;
import java.util.Collections;
+import java.util.Deque;
import java.util.LinkedList;
import java.util.List;
+import java.util.concurrent.ConcurrentLinkedQueue;
import org.hornetq.api.core.Pair;
import org.hornetq.core.paging.PagingStore;
@@ -23,6 +25,7 @@
import org.hornetq.core.paging.cursor.PageCursorProvider;
import org.hornetq.core.paging.cursor.PagePosition;
import org.hornetq.core.persistence.StorageManager;
+import org.hornetq.core.server.MessageReference;
import org.hornetq.core.server.ServerMessage;
import org.hornetq.core.transaction.Transaction;
@@ -49,9 +52,12 @@
private final PageCursorProvider cursorProvider;
private volatile PagePosition lastPosition;
-
+
private List<PagePosition> recoveredACK;
+ // We only store the position for redeliveries. They will be read from the SoftCache
again during delivery.
+ private final ConcurrentLinkedQueue<PagePosition> redeliveries = new
ConcurrentLinkedQueue<PagePosition>();
+
// Static --------------------------------------------------------
// Constructors --------------------------------------------------
@@ -74,11 +80,23 @@
*/
public synchronized Pair<PagePosition, ServerMessage> moveNext() throws
Exception
{
+ PagePosition redeliveryPos = null;
+
+ // Redeliveries will take precedence
+ if ((redeliveryPos = redeliveries.poll()) != null)
+ {
+ return new Pair<PagePosition, ServerMessage>(redeliveryPos,
cursorProvider.getMessage(redeliveryPos));
+ }
+
if (lastPosition == null)
{
- lastPosition = recoverLastPosition();
+ // it will start at the first available page
+ long firstPage = pageStore.getFirstPage();
+ lastPosition = new PagePositionImpl(firstPage, -1);
}
+ boolean match = false;
+
Pair<PagePosition, ServerMessage> message = null;
do
{
@@ -87,8 +105,14 @@
{
lastPosition = message.a;
}
+ match = match(message.b);
+
+ if (!match)
+ {
+ ignored(message.a);
+ }
}
- while (message != null && !match(message.b));
+ while (message != null && !match);
return message;
}
@@ -111,12 +135,10 @@
/* (non-Javadoc)
* @see
org.hornetq.core.paging.cursor.PageCursor#returnElement(org.hornetq.core.paging.cursor.PagePosition)
*/
- public void redeliver(final PagePosition position)
+ public synchronized void redeliver(final PagePosition position)
{
- // TODO Auto-generated method stub
-
+ this.redeliveries.add(position);
}
-
/**
* Theres no need to synchronize this method as it's only called from journal load
on startup
@@ -136,11 +158,49 @@
installTXCallback(tx, position);
}
- public void processReload()
+ public void processReload() throws Exception
{
if (this.recoveredACK != null)
{
+ System.out.println("********** processing reload!!!!!!!");
Collections.sort(recoveredACK);
+
+ PagePosition previousPos = null;
+ for (PagePosition pos : recoveredACK)
+ {
+ lastPosition = pos;
+ if (previousPos != null)
+ {
+ if (!previousPos.isRightAfter(previousPos))
+ {
+ PagePosition tmpPos = previousPos;
+ // looking for holes on the ack list for redelivery
+ while (true)
+ {
+ Pair<PagePosition, ServerMessage> msgCheck =
cursorProvider.getAfter(tmpPos);
+ // end of the hole, we can finish processing here
+ if (msgCheck == null || msgCheck.a.equals(pos))
+ {
+ break;
+ }
+ else
+ {
+ if (match(msgCheck.b))
+ {
+ redeliver(msgCheck.a);
+ }
+ }
+ tmpPos = msgCheck.a;
+ }
+ }
+ }
+
+ previousPos = pos;
+ System.out.println("pos: " + pos);
+ }
+
+ recoveredACK.clear();
+ recoveredACK = null;
}
}
@@ -153,11 +213,14 @@
// To be used with expressions
return true;
}
-
-
// Private -------------------------------------------------------
-
+
+ private void ignored(final PagePosition message)
+ {
+ // TODO: Update reference counts
+ }
+
/**
* @param committedACK
*/
@@ -167,27 +230,18 @@
{
recoveredACK = new LinkedList<PagePosition>();
}
-
+
recoveredACK.add(committedACK);
}
-
- private PagePosition recoverLastPosition()
- {
- long firstPage = pageStore.getFirstPage();
- return new PagePositionImpl(firstPage, -1);
- }
-
-
/**
* @param tx
* @param position
*/
private void installTXCallback(Transaction tx, PagePosition position)
{
- }
+ }
-
// Inner classes -------------------------------------------------
}
Modified:
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java
===================================================================
---
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java 2010-10-05
17:33:33 UTC (rev 9753)
+++
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java 2010-10-05
22:18:27 UTC (rev 9754)
@@ -78,7 +78,12 @@
PageCursor activeCursor = activeCursors.get(cursorID);
if (activeCursor == null)
{
- activeCursor = activeCursors.putIfAbsent(cursorID, new PageCursorImpl(this,
pagingStore, storageManager, cursorID));
+ activeCursor = new PageCursorImpl(this, pagingStore, storageManager, cursorID);
+ PageCursor previousValue = activeCursors.putIfAbsent(cursorID, activeCursor);
+ if (previousValue != null)
+ {
+ activeCursor = previousValue;
+ }
}
return activeCursor;
@@ -118,6 +123,19 @@
return new Pair<PagePosition, ServerMessage>(retPos,
cache.getMessage(retPos.getMessageNr()));
}
+
+ public ServerMessage getMessage(final PagePosition pos) throws Exception
+ {
+ PageCache cache = getPageCache(pos.getPageNr());
+
+ if (pos.getMessageNr() >= cache.getNumberOfMessages())
+ {
+ // sanity check, this should never happen unless there's a bug
+ throw new IllegalStateException("Invalid messageNumber passed = " +
pos);
+ }
+
+ return cache.getMessage(pos.getMessageNr());
+ }
public PageCache getPageCache(final long pageId) throws Exception
{
@@ -181,13 +199,18 @@
return softCache.size();
}
- public void processReload()
+ public void processReload() throws Exception
{
for (PageCursor cursor : this.activeCursors.values())
{
cursor.processReload();
}
}
+
+ public void stop()
+ {
+ activeCursors.clear();
+ }
// Package protected ---------------------------------------------
Modified:
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PagePositionImpl.java
===================================================================
---
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PagePositionImpl.java 2010-10-05
17:33:33 UTC (rev 9753)
+++
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PagePositionImpl.java 2010-10-05
22:18:27 UTC (rev 9754)
@@ -82,6 +82,11 @@
{
return messageNr;
}
+
+ public boolean isRightAfter(final PagePosition previous)
+ {
+ return this.pageNr == previous.getPageNr() && this.messageNr ==
previous.getMessageNr() + 1;
+ }
/* (non-Javadoc)
* @see java.lang.Comparable#compareTo(java.lang.Object)
@@ -158,4 +163,12 @@
return true;
}
+ @Override
+ public String toString()
+ {
+ return "PagePositionImpl [pageNr=" + pageNr + ", messageNr=" +
messageNr + ", recordID=" + recordID + "]";
+ }
+
+
+
}
Modified:
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PagingManagerImpl.java
===================================================================
---
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PagingManagerImpl.java 2010-10-05
17:33:33 UTC (rev 9753)
+++
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PagingManagerImpl.java 2010-10-05
22:18:27 UTC (rev 9754)
@@ -229,7 +229,7 @@
}
}
- public void processReload()
+ public void processReload() throws Exception
{
for (PagingStore store: stores.values())
{
Modified:
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java
===================================================================
---
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java 2010-10-05
17:33:33 UTC (rev 9753)
+++
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java 2010-10-05
22:18:27 UTC (rev 9754)
@@ -317,6 +317,13 @@
public boolean startDepaging()
{
+
+ // Disabled for now
+
+ return false;
+
+
+ /*
if (!running)
{
return false;
@@ -353,11 +360,11 @@
finally
{
currentPageLock.readLock().unlock();
- }
+ } */
}
- public void processReload()
+ public void processReload() throws Exception
{
cursorProvider.processReload();
}
@@ -396,6 +403,8 @@
currentPage.close();
currentPage = null;
}
+
+ cursorProvider.stop();
}
}
@@ -1218,7 +1227,7 @@
// Inner classes -------------------------------------------------
- private class DepageRunnable implements Runnable
+/* private class DepageRunnable implements Runnable
{
private final Executor followingExecutor;
@@ -1252,5 +1261,5 @@
PagingStoreImpl.log.error(e, e);
}
}
- }
+ } */
}
Modified:
branches/Branch_New_Paging/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
---
branches/Branch_New_Paging/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2010-10-05
17:33:33 UTC (rev 9753)
+++
branches/Branch_New_Paging/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2010-10-05
22:18:27 UTC (rev 9754)
@@ -1114,7 +1114,7 @@
deploymentManager.start();
}
- pagingManager.reloadStores();
+ pagingManager.processReload();
pagingManager.resumeDepages();
Modified:
branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java
===================================================================
---
branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java 2010-10-05
17:33:33 UTC (rev 9753)
+++
branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java 2010-10-05
22:18:27 UTC (rev 9754)
@@ -23,10 +23,12 @@
import org.hornetq.core.config.Configuration;
import org.hornetq.core.paging.cursor.PageCache;
import org.hornetq.core.paging.cursor.PageCursor;
+import org.hornetq.core.paging.cursor.PageCursorProvider;
import org.hornetq.core.paging.cursor.PagePosition;
import org.hornetq.core.paging.cursor.impl.PageCursorProviderImpl;
import org.hornetq.core.paging.impl.PagingStoreImpl;
import org.hornetq.core.server.HornetQServer;
+import org.hornetq.core.server.Queue;
import org.hornetq.core.server.ServerMessage;
import org.hornetq.core.server.impl.ServerMessageImpl;
import org.hornetq.core.settings.impl.AddressSettings;
@@ -50,6 +52,8 @@
private SimpleString ADDRESS = new SimpleString("test-add");
private HornetQServer server;
+
+ private Queue queue;
private static final int PAGE_MAX = -1;
@@ -135,6 +139,92 @@
}
+ public void testRestart() throws Exception
+ {
+ final int NUM_MESSAGES = 1000;
+
+ int numberOfPages = addMessages(NUM_MESSAGES, 100 * 1024);
+
+ System.out.println("Number of pages = " + numberOfPages);
+
+ PageCursorProvider cursorProvider =
this.server.getPagingManager().getPageStore(ADDRESS).getCursorProvier();
+ System.out.println("cursorProvider = " + cursorProvider);
+
+ PageCursor cursor =
this.server.getPagingManager().getPageStore(ADDRESS).getCursorProvier().getCursor(queue.getID());
+
+ System.out.println("Cursor: " + cursor);
+ for (int i = 0 ; i < 500 ; i++)
+ {
+ Pair<PagePosition, ServerMessage> msg = cursor.moveNext();
+ assertEquals(i, msg.b.getIntProperty("key").intValue());
+ cursor.ack(msg.a);
+ }
+
+ server.stop();
+
+ server.start();
+
+ cursor =
this.server.getPagingManager().getPageStore(ADDRESS).getCursorProvier().getCursor(queue.getID());
+
+ for (int i = 500; i < NUM_MESSAGES; i++)
+ {
+ Pair<PagePosition, ServerMessage> msg = cursor.moveNext();
+ assertEquals(i, msg.b.getIntProperty("key").intValue());
+ cursor.ack(msg.a);
+ }
+
+ }
+
+
+ public void testRestartWithHoleOnAck() throws Exception
+ {
+
+ final int NUM_MESSAGES = 1000;
+
+ int numberOfPages = addMessages(NUM_MESSAGES, 10 * 1024);
+
+ System.out.println("Number of pages = " + numberOfPages);
+
+ PageCursorProvider cursorProvider =
this.server.getPagingManager().getPageStore(ADDRESS).getCursorProvier();
+ System.out.println("cursorProvider = " + cursorProvider);
+
+ PageCursor cursor =
this.server.getPagingManager().getPageStore(ADDRESS).getCursorProvier().getCursor(queue.getID());
+
+ System.out.println("Cursor: " + cursor);
+ for (int i = 0 ; i < 100 ; i++)
+ {
+ Pair<PagePosition, ServerMessage> msg = cursor.moveNext();
+ assertEquals(i, msg.b.getIntProperty("key").intValue());
+ if (i < 10 || i > 20)
+ {
+ cursor.ack(msg.a);
+ }
+ }
+
+ server.stop();
+
+ server.start();
+
+ cursor =
this.server.getPagingManager().getPageStore(ADDRESS).getCursorProvier().getCursor(queue.getID());
+
+ for (int i = 10; i <= 20; i++)
+ {
+ Pair<PagePosition, ServerMessage> msg = cursor.moveNext();
+ assertEquals(i, msg.b.getIntProperty("key").intValue());
+ cursor.ack(msg.a);
+ }
+
+
+ for (int i = 100; i < NUM_MESSAGES; i++)
+ {
+ Pair<PagePosition, ServerMessage> msg = cursor.moveNext();
+ assertEquals(i, msg.b.getIntProperty("key").intValue());
+ cursor.ack(msg.a);
+ }
+
+ }
+
+
public void testRollbackScenarios() throws Exception
{
@@ -216,8 +306,10 @@
new HashMap<String, AddressSettings>());
server.start();
+
+ queue = server.createQueue(ADDRESS, ADDRESS, null, true, false);
- createQueue(ADDRESS.toString(), ADDRESS.toString());
+ //createQueue(ADDRESS.toString(), ADDRESS.toString());
}
protected void tearDown() throws Exception