Author: clebert.suconic(a)jboss.com
Date: 2010-10-15 16:07:39 -0400 (Fri, 15 Oct 2010)
New Revision: 9792
Modified:
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/PageTransactionInfo.java
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/PagingStore.java
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursor.java
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursorProvider.java
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorImpl.java
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PageTransactionInfoImpl.java
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PagedMessageImpl.java
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java
branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java
Log:
Transactions on Cursors
Modified:
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/PageTransactionInfo.java
===================================================================
---
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/PageTransactionInfo.java 2010-10-15
12:34:13 UTC (rev 9791)
+++
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/PageTransactionInfo.java 2010-10-15
20:07:39 UTC (rev 9792)
@@ -14,6 +14,8 @@
package org.hornetq.core.paging;
import org.hornetq.core.journal.EncodingSupport;
+import org.hornetq.core.paging.cursor.PageCursor;
+import org.hornetq.core.paging.cursor.PagePosition;
import org.hornetq.core.persistence.StorageManager;
import org.hornetq.core.transaction.Transaction;
@@ -48,4 +50,13 @@
void increment();
int getNumberOfMessages();
+
+ /**
+ * This method will hold the position to be delivered later in case this transaction
is pending.
+ * If the tx is not pending, it will return false, so the caller can deliver it right
away
+ * @param cursor
+ * @param cursorPos
+ * @return true if the message will be delivered later, false if it should be
delivered right away
+ */
+ boolean deliverAfterCommit(PageCursor cursor, PagePosition cursorPos);
}
Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/PagingStore.java
===================================================================
---
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/PagingStore.java 2010-10-15
12:34:13 UTC (rev 9791)
+++
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/PagingStore.java 2010-10-15
20:07:39 UTC (rev 9792)
@@ -65,6 +65,8 @@
Page createPage(final int page) throws Exception;
+ PagingManager getPagingManager();
+
PageCursorProvider getCursorProvier();
void processReload() throws Exception;
Modified:
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursor.java
===================================================================
---
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursor.java 2010-10-15
12:34:13 UTC (rev 9791)
+++
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursor.java 2010-10-15
20:07:39 UTC (rev 9792)
@@ -46,6 +46,12 @@
void reloadACK(PagePosition position);
/**
+ * To be called when the cursor decided to ignore a position.
+ * @param position
+ */
+ void positionIgnored(PagePosition position);
+
+ /**
* To be used to avoid a redelivery of a prepared ACK after load
* @param position
*/
Modified:
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursorProvider.java
===================================================================
---
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursorProvider.java 2010-10-15
12:34:13 UTC (rev 9791)
+++
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursorProvider.java 2010-10-15
20:07:39 UTC (rev 9792)
@@ -57,7 +57,7 @@
*/
PageCursor createCursor();
- Pair<PagePosition, PagedMessage> getAfter(PageCursor cursor, PagePosition pos)
throws Exception;
+ Pair<PagePosition, PagedMessage> getNext(PageCursor cursor, PagePosition pos)
throws Exception;
PagedMessage getMessage(PagePosition pos) throws Exception;
Modified:
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorImpl.java
===================================================================
---
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorImpl.java 2010-10-15
12:34:13 UTC (rev 9791)
+++
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorImpl.java 2010-10-15
20:07:39 UTC (rev 9792)
@@ -133,7 +133,7 @@
do
{
- message = cursorProvider.getAfter(this, lastPosition);
+ message = cursorProvider.getNext(this, lastPosition);
if (message != null)
{
@@ -217,10 +217,19 @@
*/
public void reloadPreparedACK(final Transaction tx, final PagePosition position)
{
- // internalAdd(position);
installTXCallback(tx, position);
}
+
+ /* (non-Javadoc)
+ * @see
org.hornetq.core.paging.cursor.PageCursor#positionIgnored(org.hornetq.core.paging.cursor.PagePosition)
+ */
+ public void positionIgnored(PagePosition position)
+ {
+ processACK(position);
+ }
+
+
public void processReload() throws Exception
{
if (recoveredACK != null)
@@ -247,7 +256,7 @@
// looking for holes on the ack list for redelivery
while (true)
{
- Pair<PagePosition, PagedMessage> msgCheck =
cursorProvider.getAfter(this, tmpPos);
+ Pair<PagePosition, PagedMessage> msgCheck =
cursorProvider.getNext(this, tmpPos);
positions = getPageInfo(tmpPos);
Modified:
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java
===================================================================
---
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java 2010-10-15
12:34:13 UTC (rev 9791)
+++
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java 2010-10-15
20:07:39 UTC (rev 9792)
@@ -17,15 +17,17 @@
import java.util.concurrent.ConcurrentMap;
import org.hornetq.api.core.Pair;
+import org.hornetq.core.logging.Logger;
import org.hornetq.core.paging.Page;
+import org.hornetq.core.paging.PageTransactionInfo;
import org.hornetq.core.paging.PagedMessage;
+import org.hornetq.core.paging.PagingManager;
import org.hornetq.core.paging.PagingStore;
import org.hornetq.core.paging.cursor.PageCache;
import org.hornetq.core.paging.cursor.PageCursor;
import org.hornetq.core.paging.cursor.PageCursorProvider;
import org.hornetq.core.paging.cursor.PagePosition;
import org.hornetq.core.persistence.StorageManager;
-import org.hornetq.core.server.ServerMessage;
import org.hornetq.utils.ExecutorFactory;
import org.hornetq.utils.SoftValueHashMap;
import org.jboss.netty.util.internal.ConcurrentHashMap;
@@ -44,9 +46,13 @@
{
// Constants -----------------------------------------------------
+ private static final Logger log = Logger.getLogger(PageCursorProviderImpl.class);
+
// Attributes ----------------------------------------------------
private final PagingStore pagingStore;
+
+ private final PagingManager pagingManager;
private final StorageManager storageManager;
@@ -65,6 +71,7 @@
final ExecutorFactory executorFactory)
{
this.pagingStore = pagingStore;
+ this.pagingManager = pagingStore.getPagingManager();
this.storageManager = storageManager;
this.executorFactory = executorFactory;
}
@@ -106,23 +113,47 @@
/* (non-Javadoc)
* @see
org.hornetq.core.paging.cursor.PageCursorProvider#getAfter(org.hornetq.core.paging.cursor.PagePosition)
*/
- public Pair<PagePosition, PagedMessage> getAfter(PageCursor cursor, final
PagePosition pos) throws Exception
+ public Pair<PagePosition, PagedMessage> getNext(final PageCursor cursor,
PagePosition cursorPos) throws Exception
{
while(true)
{
- Pair<PagePosition, PagedMessage> retPos = internalAfter(pos);
+ Pair<PagePosition, PagedMessage> retPos = internalAfter(cursorPos);
-
-
- return retPos;
+ if (retPos == null)
+ {
+ return null;
+ }
+ else
+ if (retPos != null)
+ {
+ cursorPos = retPos.a;
+ if (retPos.b.getTransactionID() != 0)
+ {
+ PageTransactionInfo tx =
pagingManager.getTransaction(retPos.b.getTransactionID());
+ if (tx == null)
+ {
+ log.warn("Couldn't locate page transaction " +
retPos.b.getTransactionID() + ", ignoring message on position " + retPos.a);
+ cursor.positionIgnored(cursorPos);
+ }
+ else
+ {
+ if (!tx.deliverAfterCommit(cursor, cursorPos))
+ {
+ return retPos;
+ }
+ }
+ }
+ else
+ {
+ return retPos;
+ }
+ }
}
}
private Pair<PagePosition, PagedMessage> internalAfter(final PagePosition pos)
{
- // TODO: consider page transactions here to avoid receiving an uncommitted message
- // TODO: consider the case where a full page is ignored because of a TX
PagePosition retPos = pos.nextMessage();
PageCache cache = getPageCache(pos);
Modified:
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PageTransactionInfoImpl.java
===================================================================
---
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PageTransactionInfoImpl.java 2010-10-15
12:34:13 UTC (rev 9791)
+++
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PageTransactionInfoImpl.java 2010-10-15
20:07:39 UTC (rev 9792)
@@ -13,12 +13,17 @@
package org.hornetq.core.paging.impl;
+import java.util.LinkedList;
+import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import org.hornetq.api.core.HornetQBuffer;
+import org.hornetq.api.core.Pair;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.paging.PageTransactionInfo;
import org.hornetq.core.paging.PagingManager;
+import org.hornetq.core.paging.cursor.PageCursor;
+import org.hornetq.core.paging.cursor.PagePosition;
import org.hornetq.core.persistence.StorageManager;
import org.hornetq.core.transaction.Transaction;
import org.hornetq.core.transaction.TransactionOperation;
@@ -46,6 +51,8 @@
private volatile boolean rolledback = false;
private AtomicInteger numberOfMessages = new AtomicInteger(0);
+
+ private List<Pair<PageCursor, PagePosition>> lateDeliveries;
// Static --------------------------------------------------------
@@ -132,6 +139,15 @@
public synchronized void commit()
{
committed = true;
+ if (lateDeliveries != null)
+ {
+ for (Pair<PageCursor, PagePosition> pos : lateDeliveries)
+ {
+ pos.a.redeliver(pos.b);
+ }
+ }
+ lateDeliveries.clear();
+ lateDeliveries = null;
}
public void store(final StorageManager storageManager, PagingManager pagingManager,
final Transaction tx) throws Exception
@@ -203,6 +219,32 @@
")";
}
+ /* (non-Javadoc)
+ * @see
org.hornetq.core.paging.PageTransactionInfo#deliverAfterCommit(org.hornetq.core.paging.cursor.PageCursor,
org.hornetq.core.paging.cursor.PagePosition)
+ */
+ public synchronized boolean deliverAfterCommit(PageCursor cursor, PagePosition
cursorPos)
+ {
+ if (committed)
+ {
+ return false;
+ }
+ else
+ if (rolledback)
+ {
+ cursor.positionIgnored(cursorPos);
+ return true;
+ }
+ else
+ {
+ if (lateDeliveries == null)
+ {
+ lateDeliveries = new LinkedList<Pair<PageCursor,
PagePosition>>();
+ }
+ lateDeliveries.add(new Pair<PageCursor, PagePosition>(cursor,
cursorPos));
+ return true;
+ }
+ }
+
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
Modified:
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PagedMessageImpl.java
===================================================================
---
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PagedMessageImpl.java 2010-10-15
12:34:13 UTC (rev 9791)
+++
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PagedMessageImpl.java 2010-10-15
20:07:39 UTC (rev 9792)
@@ -50,7 +50,7 @@
private ServerMessage message;
- private long transactionID = -1;
+ private long transactionID = 0;
public PagedMessageImpl(final ServerMessage message, final long transactionID)
{
Modified:
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java
===================================================================
---
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java 2010-10-15
12:34:13 UTC (rev 9791)
+++
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java 2010-10-15
20:07:39 UTC (rev 9792)
@@ -377,6 +377,10 @@
cursorProvider.processReload();
}
+ public PagingManager getPagingManager()
+ {
+ return pagingManager;
+ }
// HornetQComponent implementation
Modified:
branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java
===================================================================
---
branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java 2010-10-15
12:34:13 UTC (rev 9791)
+++
branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java 2010-10-15
20:07:39 UTC (rev 9792)
@@ -13,7 +13,9 @@
package org.hornetq.tests.integration.paging;
+import java.util.ArrayList;
import java.util.HashMap;
+import java.util.List;
import junit.framework.Assert;
@@ -21,6 +23,7 @@
import org.hornetq.api.core.Pair;
import org.hornetq.api.core.SimpleString;
import org.hornetq.core.config.Configuration;
+import org.hornetq.core.paging.PageTransactionInfo;
import org.hornetq.core.paging.PagedMessage;
import org.hornetq.core.paging.cursor.PageCache;
import org.hornetq.core.paging.cursor.PageCursor;
@@ -29,7 +32,9 @@
import org.hornetq.core.paging.cursor.impl.PageCursorImpl;
import org.hornetq.core.paging.cursor.impl.PageCursorProviderImpl;
import org.hornetq.core.paging.cursor.impl.PagePositionImpl;
+import org.hornetq.core.paging.impl.PageTransactionInfoImpl;
import org.hornetq.core.paging.impl.PagingStoreImpl;
+import org.hornetq.core.persistence.StorageManager;
import org.hornetq.core.persistence.impl.journal.OperationContextImpl;
import org.hornetq.core.server.HornetQServer;
import org.hornetq.core.server.Queue;
@@ -237,6 +242,8 @@
server.stop();
+ OperationContextImpl.clearContext();
+
server.start();
cursor =
this.server.getPagingManager().getPageStore(ADDRESS).getCursorProvier().getCursor(queue.getID());
@@ -289,6 +296,8 @@
server.stop();
+ OperationContextImpl.clearContext();
+
server.start();
cursor =
this.server.getPagingManager().getPageStore(ADDRESS).getCursorProvier().getCursor(queue.getID());
@@ -359,17 +368,102 @@
}
- public void testRollbackScenariosOnACK() throws Exception
+ public void testPrepareScenarios() throws Exception
{
+ PagingStoreImpl pageStore = lookupPageStore(ADDRESS);
+
+ pageStore.startPaging();
+
+ final int NUM_MESSAGES = 100;
+ final int messageSize = 10 * 1024;
+
+
+ PageCursorProvider cursorProvider =
this.server.getPagingManager().getPageStore(ADDRESS).getCursorProvier();
+ System.out.println("cursorProvider = " + cursorProvider);
+
+ PageCursor cursor =
this.server.getPagingManager().getPageStore(ADDRESS).getCursorProvier().getCursor(queue.getID());
+
+ System.out.println("Cursor: " + cursor);
+
+ StorageManager storage = this.server.getStorageManager();
+
+ PageTransactionInfoImpl pgtxRollback = new
PageTransactionInfoImpl(storage.generateUniqueID());
+ PageTransactionInfoImpl pgtxForgotten = new
PageTransactionInfoImpl(storage.generateUniqueID());
+ PageTransactionInfoImpl pgtxCommit = new
PageTransactionInfoImpl(storage.generateUniqueID());
+
+ this.server.getPagingManager().addTransaction(pgtxRollback);
+ this.server.getPagingManager().addTransaction(pgtxCommit);
+
+ pgMessages(storage, pageStore, pgtxRollback, 0, NUM_MESSAGES, messageSize);
+ pgMessages(storage, pageStore, pgtxForgotten, 100, NUM_MESSAGES, messageSize);
+ pgMessages(storage, pageStore, pgtxCommit, 200, NUM_MESSAGES, messageSize);
+
+ addMessages(300, NUM_MESSAGES, messageSize);
+
+
+ // First consume what's already there without any tx as nothing was committed
+ for (int i = 300; i < 400; i++)
+ {
+ Pair<PagePosition, PagedMessage> pos = cursor.moveNext();
+ assertNotNull("Null at position " + i, pos);
+ assertEquals(i, pos.b.getMessage().getIntProperty("key").intValue());
+ cursor.ack(pos.a);
+ }
+
+ assertNull(cursor.moveNext());
+
+ pgtxRollback.rollback();
+ pgtxCommit.commit();
+ // Second:after pgtxCommit was done
+ for (int i = 200; i < 300; i++)
+ {
+ Pair<PagePosition, PagedMessage> pos = cursor.moveNext();
+ assertNotNull(pos);
+ assertEquals(i, pos.b.getMessage().getIntProperty("key").intValue());
+ cursor.ack(pos.a);
+ }
+
+
}
+
+
+ /**
+ * @param storage
+ * @param pageStore
+ * @param pgParameter
+ * @param start
+ * @param NUM_MESSAGES
+ * @param messageSize
+ * @throws Exception
+ */
+ private void pgMessages(StorageManager storage,
+ PagingStoreImpl pageStore,
+ PageTransactionInfo pgParameter,
+ int start,
+ final int NUM_MESSAGES,
+ final int messageSize) throws Exception
+ {
+ List<ServerMessage> messages = new ArrayList<ServerMessage>();
+
+ for (int i = start ; i < start + NUM_MESSAGES; i++)
+ {
+ HornetQBuffer buffer = RandomUtil.randomBuffer(messageSize, i + 1l);
+ ServerMessage msg = new ServerMessageImpl(storage.generateUniqueID(),
buffer.writerIndex());
+ msg.getBodyBuffer().writeBytes(buffer, 0, buffer.writerIndex());
+ msg.putIntProperty("key", i);
+ messages.add(msg);
+ }
+
+ pageStore.page(messages, pgParameter.getTransactionID());
+ }
- public void testReadRolledBackData() throws Exception
+ public void testRollbackScenariosOnACK() throws Exception
{
}
- public void testPrepareScenarios() throws Exception
+ public void testReadRolledBackData() throws Exception
{
}
@@ -398,19 +492,24 @@
{
}
+
+ private int addMessages(final int numMessages, final int messageSize) throws
Exception
+ {
+ return addMessages(0, numMessages, messageSize);
+ }
/**
* @param numMessages
* @param pageStore
* @throws Exception
*/
- private int addMessages(final int numMessages, final int messageSize) throws
Exception
+ private int addMessages(final int start, final int numMessages, final int messageSize)
throws Exception
{
PagingStoreImpl pageStore = lookupPageStore(ADDRESS);
pageStore.startPaging();
- for (int i = 0; i < numMessages; i++)
+ for (int i = start; i < start + numMessages; i++)
{
if (i % 100 == 0) System.out.println("Paged " + i);
HornetQBuffer buffer = RandomUtil.randomBuffer(messageSize, i + 1l);
@@ -464,7 +563,6 @@
protected void tearDown() throws Exception
{
- OperationContextImpl.clearContext();
server.stop();
super.tearDown();
}